From ad7d66921884de6ab1b08d6e80108b4e62f8edc2 Mon Sep 17 00:00:00 2001 From: Naveen Prasanna V Date: Wed, 8 Jan 2025 07:12:48 +0000 Subject: [PATCH] Issue 11732 - s2a tests are order-dependent --- examples/example-xds/build.gradle | 4 +- .../grpc/netty/NettyClientTransportTest.java | 44 ++++++++--- .../io/grpc/s2a/S2AChannelCredentials.java | 16 +++- .../channel/S2AChannelPool.java | 2 +- .../channel/S2AGrpcChannelPool.java | 2 +- .../channel/S2AHandshakerServiceChannel.java | 6 +- .../handshaker/ConnectionClosedException.java | 4 +- .../GetAuthenticationMechanisms.java | 12 +-- .../{ => internal}/handshaker/ProtoUtil.java | 12 +-- .../handshaker/S2AConnectionException.java | 4 +- .../handshaker/S2AIdentity.java | 3 +- .../handshaker/S2APrivateKeyMethod.java | 14 ++-- .../S2AProtocolNegotiatorFactory.java | 19 ++--- .../{ => internal}/handshaker/S2AStub.java | 46 +++++++---- .../handshaker/S2ATrustManager.java | 11 ++- .../handshaker/SslContextFactory.java | 13 +++- .../tokenmanager/AccessTokenManager.java | 6 +- .../tokenmanager/SingleTokenFetcher.java | 14 +++- .../handshaker/tokenmanager/TokenFetcher.java | 4 +- .../s2a/channel/S2AGrpcChannelPoolTest.java | 2 + .../S2AHandshakerServiceChannelTest.java | 3 +- .../io/grpc/s2a/handshaker/FakeWriter.java | 1 + .../GetAuthenticationMechanismsTest.java | 13 +++- .../grpc/s2a/handshaker/IntegrationTest.java | 1 - .../io/grpc/s2a/handshaker/ProtoUtilTest.java | 1 + .../handshaker/S2APrivateKeyMethodTest.java | 5 +- .../S2AProtocolNegotiatorFactoryTest.java | 11 +-- .../io/grpc/s2a/handshaker/S2AStubTest.java | 9 ++- .../s2a/handshaker/S2ATrustManagerTest.java | 4 +- .../s2a/handshaker/SslContextFactoryTest.java | 5 +- .../SingleTokenAccessTokenManagerTest.java | 15 +++- .../AsyncServletOutputStreamWriter.java | 60 +++++++++----- ...vletOutputStreamWriterConcurrencyTest.java | 78 ++++++++++++++----- .../java/io/grpc/xds/XdsNameResolver.java | 36 +++++---- 34 files changed, 327 insertions(+), 153 deletions(-) rename s2a/src/main/java/io/grpc/s2a/{ => internal}/channel/S2AChannelPool.java (97%) rename s2a/src/main/java/io/grpc/s2a/{ => internal}/channel/S2AGrpcChannelPool.java (98%) rename s2a/src/main/java/io/grpc/s2a/{ => internal}/channel/S2AHandshakerServiceChannel.java (97%) rename s2a/src/main/java/io/grpc/s2a/{ => internal}/handshaker/ConnectionClosedException.java (88%) rename s2a/src/main/java/io/grpc/s2a/{ => internal}/handshaker/GetAuthenticationMechanisms.java (82%) rename s2a/src/main/java/io/grpc/s2a/{ => internal}/handshaker/ProtoUtil.java (90%) rename s2a/src/main/java/io/grpc/s2a/{ => internal}/handshaker/S2AConnectionException.java (90%) rename s2a/src/main/java/io/grpc/s2a/{ => internal}/handshaker/S2AIdentity.java (96%) rename s2a/src/main/java/io/grpc/s2a/{ => internal}/handshaker/S2APrivateKeyMethod.java (92%) rename s2a/src/main/java/io/grpc/s2a/{ => internal}/handshaker/S2AProtocolNegotiatorFactory.java (94%) rename s2a/src/main/java/io/grpc/s2a/{ => internal}/handshaker/S2AStub.java (87%) rename s2a/src/main/java/io/grpc/s2a/{ => internal}/handshaker/S2ATrustManager.java (94%) rename s2a/src/main/java/io/grpc/s2a/{ => internal}/handshaker/SslContextFactory.java (94%) rename s2a/src/main/java/io/grpc/s2a/{ => internal}/handshaker/tokenmanager/AccessTokenManager.java (91%) rename s2a/src/main/java/io/grpc/s2a/{ => internal}/handshaker/tokenmanager/SingleTokenFetcher.java (85%) rename s2a/src/main/java/io/grpc/s2a/{ => internal}/handshaker/tokenmanager/TokenFetcher.java (89%) diff --git a/examples/example-xds/build.gradle b/examples/example-xds/build.gradle index a3e23a19601..5d0bcc34d87 100644 --- a/examples/example-xds/build.gradle +++ b/examples/example-xds/build.gradle @@ -14,8 +14,8 @@ repositories { } java { - sourceCompatibility = JavaVersion.VERSION_1_8 - targetCompatibility = JavaVersion.VERSION_1_8 + sourceCompatibility = JavaVersion.VERSION_23 + targetCompatibility = JavaVersion.VERSION_23 } // IMPORTANT: You probably want the non-SNAPSHOT version of gRPC. Make sure you diff --git a/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java b/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java index 9777bb0926c..4d308ad3de5 100644 --- a/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java +++ b/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java @@ -128,7 +128,9 @@ */ @RunWith(JUnit4.class) public class NettyClientTransportTest { - @Rule public final MockitoRule mocks = MockitoJUnit.rule(); + + @Rule + public final MockitoRule mocks = MockitoJUnit.rule(); private static final SslContext SSL_CONTEXT = createSslContext(); @@ -143,7 +145,9 @@ public class NettyClientTransportTest { private final InternalChannelz channelz = new InternalChannelz(); private Runnable tooManyPingsRunnable = new Runnable() { // Throwing is useless in this method, because Netty doesn't propagate the exception - @Override public void run() {} + @Override + public void run() { + } }; private Attributes eagAttributes = Attributes.EMPTY; @@ -415,7 +419,7 @@ public void bufferedStreamsShouldBeClosedWhenConnectionTerminates() throws Excep new Rpc(transport).halfClose().waitForResponse(); // Create 3 streams, but don't half-close. The transport will buffer the second and third. - Rpc[] rpcs = new Rpc[] { new Rpc(transport), new Rpc(transport), new Rpc(transport) }; + Rpc[] rpcs = new Rpc[]{new Rpc(transport), new Rpc(transport), new Rpc(transport)}; // Wait for the response for the stream that was actually created. rpcs[0].waitForResponse(); @@ -439,7 +443,10 @@ public void bufferedStreamsShouldBeClosedWhenConnectionTerminates() throws Excep } public static class CantConstructChannel extends NioSocketChannel { - /** Constructor. It doesn't work. Feel free to try. But it doesn't work. */ + + /** + * Constructor. It doesn't work. Feel free to try. But it doesn't work. + */ public CantConstructChannel() { // Use an Error because we've seen cases of channels failing to construct due to classloading // problems (like mixing different versions of Netty), and those involve Errors. @@ -447,7 +454,9 @@ public CantConstructChannel() { } } - private static class CantConstructChannelError extends Error {} + private static class CantConstructChannelError extends Error { + + } @Test public void failingToConstructChannelShouldFailGracefully() throws Exception { @@ -601,6 +610,7 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) @Test public void maxHeaderListSizeShouldBeEnforcedOnServer() throws Exception { + startServer(100, 1); NettyClientTransport transport = newTransport(newNegotiator()); @@ -615,6 +625,7 @@ public void maxHeaderListSizeShouldBeEnforcedOnServer() throws Exception { Status status = Status.fromThrowable(e); assertEquals(status.toString(), Status.Code.INTERNAL, status.getCode()); } + } @Test @@ -866,6 +877,7 @@ private static SslContext createSslContext() { } private static class Rpc { + static final String MESSAGE = "hello"; static final MethodDescriptor METHOD = MethodDescriptor.newBuilder() @@ -885,7 +897,8 @@ private static class Rpc { Rpc(NettyClientTransport transport, Metadata headers) { stream = transport.newStream( METHOD, headers, CallOptions.DEFAULT, - new ClientStreamTracer[]{ new ClientStreamTracer() {} }); + new ClientStreamTracer[]{new ClientStreamTracer() { + }}); stream.start(listener); stream.request(1); stream.writeMessage(new ByteArrayInputStream(MESSAGE.getBytes(UTF_8))); @@ -907,6 +920,7 @@ void waitForClose() throws InterruptedException, ExecutionException, TimeoutExce } private static final class TestClientStreamListener implements ClientStreamListener { + final SettableFuture closedFuture = SettableFuture.create(); final SettableFuture responseFuture = SettableFuture.create(); @@ -938,6 +952,7 @@ public void onReady() { } private static final class EchoServerStreamListener implements ServerStreamListener { + final ServerStream stream; final Metadata headers; @@ -972,9 +987,10 @@ public void closed(Status status) { } private final class EchoServerListener implements ServerListener { + final List transports = new ArrayList<>(); final List streamListeners = - Collections.synchronizedList(new ArrayList()); + Collections.synchronizedList(new ArrayList()); @Override public ServerTransportListener transportCreated(final ServerTransport transport) { @@ -996,7 +1012,8 @@ public Attributes transportReady(Attributes transportAttrs) { } @Override - public void transportTerminated() {} + public void transportTerminated() { + } }; } @@ -1006,6 +1023,7 @@ public void serverShutdown() { } private static final class StringMarshaller implements Marshaller { + static final StringMarshaller INSTANCE = new StringMarshaller(); @Override @@ -1042,6 +1060,7 @@ public void fail(ChannelHandlerContext ctx, Throwable cause) { } private static class NoopProtocolNegotiator implements ProtocolNegotiator { + GrpcHttp2ConnectionHandler grpcHandler; NoopHandler handler; @@ -1057,7 +1076,8 @@ public AsciiString scheme() { } @Override - public void close() {} + public void close() { + } } private static final class SocketPicker extends LocalSocketPicker { @@ -1072,9 +1092,11 @@ public SocketAddress createSocketAddress(SocketAddress remoteAddress, Attributes private static final class FakeChannelLogger extends ChannelLogger { @Override - public void log(ChannelLogLevel level, String message) {} + public void log(ChannelLogLevel level, String message) { + } @Override - public void log(ChannelLogLevel level, String messageFormat, Object... args) {} + public void log(ChannelLogLevel level, String messageFormat, Object... args) { + } } } diff --git a/s2a/src/main/java/io/grpc/s2a/S2AChannelCredentials.java b/s2a/src/main/java/io/grpc/s2a/S2AChannelCredentials.java index ba0f6d72de1..685c9df9b05 100644 --- a/s2a/src/main/java/io/grpc/s2a/S2AChannelCredentials.java +++ b/s2a/src/main/java/io/grpc/s2a/S2AChannelCredentials.java @@ -21,6 +21,7 @@ import static com.google.common.base.Preconditions.checkState; import static com.google.common.base.Strings.isNullOrEmpty; +import com.google.common.annotations.VisibleForTesting; import com.google.errorprone.annotations.CanIgnoreReturnValue; import io.grpc.Channel; import io.grpc.ChannelCredentials; @@ -30,9 +31,10 @@ import io.grpc.internal.SharedResourcePool; import io.grpc.netty.InternalNettyChannelCredentials; import io.grpc.netty.InternalProtocolNegotiator; -import io.grpc.s2a.channel.S2AHandshakerServiceChannel; -import io.grpc.s2a.handshaker.S2AIdentity; -import io.grpc.s2a.handshaker.S2AProtocolNegotiatorFactory; +import io.grpc.s2a.internal.channel.S2AHandshakerServiceChannel; +import io.grpc.s2a.internal.handshaker.S2AIdentity; +import io.grpc.s2a.internal.handshaker.S2AProtocolNegotiatorFactory; +import io.grpc.s2a.internal.handshaker.S2AStub; import javax.annotation.concurrent.NotThreadSafe; import org.checkerframework.checker.nullness.qual.Nullable; @@ -60,6 +62,7 @@ public static final class Builder { private ObjectPool s2aChannelPool; private ChannelCredentials s2aChannelCredentials; private @Nullable S2AIdentity localIdentity = null; + private S2AStub stub; Builder(String s2aAddress) { this.s2aAddress = s2aAddress; @@ -113,6 +116,13 @@ public Builder setS2AChannelCredentials(ChannelCredentials s2aChannelCredentials return this; } + @VisibleForTesting + Builder setStub(S2AStub stub) { + checkNotNull(stub); + this.stub = stub; + return this; + } + public ChannelCredentials build() { checkState(!isNullOrEmpty(s2aAddress), "S2A address must not be null or empty."); ObjectPool s2aChannelPool = diff --git a/s2a/src/main/java/io/grpc/s2a/channel/S2AChannelPool.java b/s2a/src/main/java/io/grpc/s2a/internal/channel/S2AChannelPool.java similarity index 97% rename from s2a/src/main/java/io/grpc/s2a/channel/S2AChannelPool.java rename to s2a/src/main/java/io/grpc/s2a/internal/channel/S2AChannelPool.java index e5caf5e69bd..aaaa0fffd53 100644 --- a/s2a/src/main/java/io/grpc/s2a/channel/S2AChannelPool.java +++ b/s2a/src/main/java/io/grpc/s2a/internal/channel/S2AChannelPool.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package io.grpc.s2a.channel; +package io.grpc.s2a.internal.channel; import com.google.errorprone.annotations.CanIgnoreReturnValue; import io.grpc.Channel; diff --git a/s2a/src/main/java/io/grpc/s2a/channel/S2AGrpcChannelPool.java b/s2a/src/main/java/io/grpc/s2a/internal/channel/S2AGrpcChannelPool.java similarity index 98% rename from s2a/src/main/java/io/grpc/s2a/channel/S2AGrpcChannelPool.java rename to s2a/src/main/java/io/grpc/s2a/internal/channel/S2AGrpcChannelPool.java index 4794cd9ee49..af911185e6c 100644 --- a/s2a/src/main/java/io/grpc/s2a/channel/S2AGrpcChannelPool.java +++ b/s2a/src/main/java/io/grpc/s2a/internal/channel/S2AGrpcChannelPool.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package io.grpc.s2a.channel; +package io.grpc.s2a.internal.channel; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; diff --git a/s2a/src/main/java/io/grpc/s2a/channel/S2AHandshakerServiceChannel.java b/s2a/src/main/java/io/grpc/s2a/internal/channel/S2AHandshakerServiceChannel.java similarity index 97% rename from s2a/src/main/java/io/grpc/s2a/channel/S2AHandshakerServiceChannel.java rename to s2a/src/main/java/io/grpc/s2a/internal/channel/S2AHandshakerServiceChannel.java index 443ea553e52..73a4d9aea37 100644 --- a/s2a/src/main/java/io/grpc/s2a/channel/S2AHandshakerServiceChannel.java +++ b/s2a/src/main/java/io/grpc/s2a/internal/channel/S2AHandshakerServiceChannel.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package io.grpc.s2a.channel; +package io.grpc.s2a.internal.channel; import static com.google.common.base.Preconditions.checkNotNull; import static java.util.concurrent.TimeUnit.SECONDS; @@ -122,12 +122,12 @@ public String toString() { * Manages a channel using a {@link ManagedChannel} instance. */ @VisibleForTesting - static class HandshakerServiceChannel extends Channel { + public static class HandshakerServiceChannel extends Channel { private static final Logger logger = Logger.getLogger(S2AHandshakerServiceChannel.class.getName()); private final ManagedChannel delegate; - static HandshakerServiceChannel create(ManagedChannel delegate) { + public static HandshakerServiceChannel create(ManagedChannel delegate) { checkNotNull(delegate); return new HandshakerServiceChannel(delegate); } diff --git a/s2a/src/main/java/io/grpc/s2a/handshaker/ConnectionClosedException.java b/s2a/src/main/java/io/grpc/s2a/internal/handshaker/ConnectionClosedException.java similarity index 88% rename from s2a/src/main/java/io/grpc/s2a/handshaker/ConnectionClosedException.java rename to s2a/src/main/java/io/grpc/s2a/internal/handshaker/ConnectionClosedException.java index 1a7f86bda91..0a2cbabe84f 100644 --- a/s2a/src/main/java/io/grpc/s2a/handshaker/ConnectionClosedException.java +++ b/s2a/src/main/java/io/grpc/s2a/internal/handshaker/ConnectionClosedException.java @@ -14,13 +14,13 @@ * limitations under the License. */ -package io.grpc.s2a.handshaker; +package io.grpc.s2a.internal.handshaker; import java.io.IOException; /** Indicates that a connection has been closed. */ @SuppressWarnings("serial") // This class is never serialized. -final class ConnectionClosedException extends IOException { +public final class ConnectionClosedException extends IOException { public ConnectionClosedException(String errorMessage) { super(errorMessage); } diff --git a/s2a/src/main/java/io/grpc/s2a/handshaker/GetAuthenticationMechanisms.java b/s2a/src/main/java/io/grpc/s2a/internal/handshaker/GetAuthenticationMechanisms.java similarity index 82% rename from s2a/src/main/java/io/grpc/s2a/handshaker/GetAuthenticationMechanisms.java rename to s2a/src/main/java/io/grpc/s2a/internal/handshaker/GetAuthenticationMechanisms.java index 56d74a9b766..29c27f9ec39 100644 --- a/s2a/src/main/java/io/grpc/s2a/handshaker/GetAuthenticationMechanisms.java +++ b/s2a/src/main/java/io/grpc/s2a/internal/handshaker/GetAuthenticationMechanisms.java @@ -14,16 +14,17 @@ * limitations under the License. */ -package io.grpc.s2a.handshaker; +package io.grpc.s2a.internal.handshaker; import com.google.errorprone.annotations.Immutable; -import io.grpc.s2a.handshaker.S2AIdentity; -import io.grpc.s2a.handshaker.tokenmanager.AccessTokenManager; +//import io.grpc.s2a.internal.handshaker.GetAuthenticationMechanisms; +import io.grpc.s2a.handshaker.AuthenticationMechanism; +import io.grpc.s2a.internal.handshaker.tokenmanager.AccessTokenManager; import java.util.Optional; /** Retrieves the authentication mechanism for a given local identity. */ @Immutable -final class GetAuthenticationMechanisms { +public final class GetAuthenticationMechanisms { private static final Optional TOKEN_MANAGER = AccessTokenManager.create(); /** @@ -32,7 +33,8 @@ final class GetAuthenticationMechanisms { * @param localIdentity the identity for which to fetch a token. * @return an {@link AuthenticationMechanism} for the given local identity. */ - static Optional getAuthMechanism(Optional localIdentity) { + public static Optional getAuthMechanism( + Optional localIdentity) { if (!TOKEN_MANAGER.isPresent()) { return Optional.empty(); } diff --git a/s2a/src/main/java/io/grpc/s2a/handshaker/ProtoUtil.java b/s2a/src/main/java/io/grpc/s2a/internal/handshaker/ProtoUtil.java similarity index 90% rename from s2a/src/main/java/io/grpc/s2a/handshaker/ProtoUtil.java rename to s2a/src/main/java/io/grpc/s2a/internal/handshaker/ProtoUtil.java index 59e3931d9e6..17b75cfd859 100644 --- a/s2a/src/main/java/io/grpc/s2a/handshaker/ProtoUtil.java +++ b/s2a/src/main/java/io/grpc/s2a/internal/handshaker/ProtoUtil.java @@ -14,12 +14,14 @@ * limitations under the License. */ -package io.grpc.s2a.handshaker; +package io.grpc.s2a.internal.handshaker; import com.google.common.collect.ImmutableSet; +import io.grpc.s2a.handshaker.Ciphersuite; +import io.grpc.s2a.handshaker.TLSVersion; /** Converts proto messages to Netty strings. */ -final class ProtoUtil { +public final class ProtoUtil { /** * Converts {@link Ciphersuite} to its {@link String} representation. * @@ -27,7 +29,7 @@ final class ProtoUtil { * @return a {@link String} representing the ciphersuite. * @throws AssertionError if the {@link Ciphersuite} is not one of the supported ciphersuites. */ - static String convertCiphersuite(Ciphersuite ciphersuite) { + public static String convertCiphersuite(Ciphersuite ciphersuite) { switch (ciphersuite) { case CIPHERSUITE_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256: return "TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256"; @@ -54,7 +56,7 @@ static String convertCiphersuite(Ciphersuite ciphersuite) { * @return a {@link String} representation of the TLS version. * @throws AssertionError if the {@code tlsVersion} is not one of the supported TLS versions. */ - static String convertTlsProtocolVersion(TLSVersion tlsVersion) { + public static String convertTlsProtocolVersion(TLSVersion tlsVersion) { switch (tlsVersion) { case TLS_VERSION_1_3: return "TLSv1.3"; @@ -74,7 +76,7 @@ static String convertTlsProtocolVersion(TLSVersion tlsVersion) { * Builds a set of strings representing all {@link TLSVersion}s between {@code minTlsVersion} and * {@code maxTlsVersion}. */ - static ImmutableSet buildTlsProtocolVersionSet( + public static ImmutableSet buildTlsProtocolVersionSet( TLSVersion minTlsVersion, TLSVersion maxTlsVersion) { ImmutableSet.Builder tlsVersions = ImmutableSet.builder(); for (TLSVersion tlsVersion : TLSVersion.values()) { diff --git a/s2a/src/main/java/io/grpc/s2a/handshaker/S2AConnectionException.java b/s2a/src/main/java/io/grpc/s2a/internal/handshaker/S2AConnectionException.java similarity index 90% rename from s2a/src/main/java/io/grpc/s2a/handshaker/S2AConnectionException.java rename to s2a/src/main/java/io/grpc/s2a/internal/handshaker/S2AConnectionException.java index d976308ad22..1301967afda 100644 --- a/s2a/src/main/java/io/grpc/s2a/handshaker/S2AConnectionException.java +++ b/s2a/src/main/java/io/grpc/s2a/internal/handshaker/S2AConnectionException.java @@ -14,12 +14,12 @@ * limitations under the License. */ -package io.grpc.s2a.handshaker; +package io.grpc.s2a.internal.handshaker; /** Exception that denotes a runtime error that was encountered when talking to the S2A server. */ @SuppressWarnings("serial") // This class is never serialized. public class S2AConnectionException extends RuntimeException { - S2AConnectionException(String message) { + public S2AConnectionException(String message) { super(message); } } \ No newline at end of file diff --git a/s2a/src/main/java/io/grpc/s2a/handshaker/S2AIdentity.java b/s2a/src/main/java/io/grpc/s2a/internal/handshaker/S2AIdentity.java similarity index 96% rename from s2a/src/main/java/io/grpc/s2a/handshaker/S2AIdentity.java rename to s2a/src/main/java/io/grpc/s2a/internal/handshaker/S2AIdentity.java index c4fed7377ac..769bbb0636b 100644 --- a/s2a/src/main/java/io/grpc/s2a/handshaker/S2AIdentity.java +++ b/s2a/src/main/java/io/grpc/s2a/internal/handshaker/S2AIdentity.java @@ -14,11 +14,12 @@ * limitations under the License. */ -package io.grpc.s2a.handshaker; +package io.grpc.s2a.internal.handshaker; import static com.google.common.base.Preconditions.checkNotNull; import com.google.errorprone.annotations.ThreadSafe; +import io.grpc.s2a.handshaker.Identity; /** * Stores an identity in such a way that it can be sent to the S2A handshaker service. The identity diff --git a/s2a/src/main/java/io/grpc/s2a/handshaker/S2APrivateKeyMethod.java b/s2a/src/main/java/io/grpc/s2a/internal/handshaker/S2APrivateKeyMethod.java similarity index 92% rename from s2a/src/main/java/io/grpc/s2a/handshaker/S2APrivateKeyMethod.java rename to s2a/src/main/java/io/grpc/s2a/internal/handshaker/S2APrivateKeyMethod.java index fb6d5761355..0921f0174ce 100644 --- a/s2a/src/main/java/io/grpc/s2a/handshaker/S2APrivateKeyMethod.java +++ b/s2a/src/main/java/io/grpc/s2a/internal/handshaker/S2APrivateKeyMethod.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package io.grpc.s2a.handshaker; +package io.grpc.s2a.internal.handshaker; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; @@ -22,7 +22,10 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableMap; import com.google.protobuf.ByteString; -import io.grpc.s2a.handshaker.S2AIdentity; +import io.grpc.s2a.handshaker.OffloadPrivateKeyOperationReq; +import io.grpc.s2a.handshaker.SessionReq; +import io.grpc.s2a.handshaker.SessionResp; +import io.grpc.s2a.handshaker.SignatureAlgorithm; import io.netty.handler.ssl.OpenSslPrivateKeyMethod; import java.io.IOException; import java.util.Optional; @@ -40,7 +43,7 @@ * GrpcSslContexts.configure(SslContextBuilder.forClient());}. */ @NotThreadSafe -final class S2APrivateKeyMethod implements OpenSslPrivateKeyMethod { +public final class S2APrivateKeyMethod implements OpenSslPrivateKeyMethod { private final S2AStub stub; private final Optional localIdentity; private static final ImmutableMap @@ -65,7 +68,8 @@ final class S2APrivateKeyMethod implements OpenSslPrivateKeyMethod { OpenSslPrivateKeyMethod.SSL_SIGN_RSA_PSS_RSAE_SHA512, SignatureAlgorithm.S2A_SSL_SIGN_RSA_PSS_RSAE_SHA512); - public static S2APrivateKeyMethod create(S2AStub stub, Optional localIdentity) { + public static S2APrivateKeyMethod create( + S2AStub stub, Optional localIdentity) { checkNotNull(stub); return new S2APrivateKeyMethod(stub, localIdentity); } @@ -84,7 +88,7 @@ private S2APrivateKeyMethod(S2AStub stub, Optional localIdentity) { * @throws UnsupportedOperationException if the algorithm is not supported by S2A. */ @VisibleForTesting - static SignatureAlgorithm convertOpenSslSignAlgToS2ASignAlg(int signatureAlgorithm) { + public static SignatureAlgorithm convertOpenSslSignAlgToS2ASignAlg(int signatureAlgorithm) { SignatureAlgorithm sig = OPENSSL_TO_S2A_SIGNATURE_ALGORITHM_MAP.get(signatureAlgorithm); if (sig == null) { throw new UnsupportedOperationException( diff --git a/s2a/src/main/java/io/grpc/s2a/handshaker/S2AProtocolNegotiatorFactory.java b/s2a/src/main/java/io/grpc/s2a/internal/handshaker/S2AProtocolNegotiatorFactory.java similarity index 94% rename from s2a/src/main/java/io/grpc/s2a/handshaker/S2AProtocolNegotiatorFactory.java rename to s2a/src/main/java/io/grpc/s2a/internal/handshaker/S2AProtocolNegotiatorFactory.java index 14bdc05238d..752b209d05a 100644 --- a/s2a/src/main/java/io/grpc/s2a/handshaker/S2AProtocolNegotiatorFactory.java +++ b/s2a/src/main/java/io/grpc/s2a/internal/handshaker/S2AProtocolNegotiatorFactory.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package io.grpc.s2a.handshaker; +package io.grpc.s2a.internal.handshaker; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; @@ -37,9 +37,9 @@ import io.grpc.netty.InternalProtocolNegotiator.ProtocolNegotiator; import io.grpc.netty.InternalProtocolNegotiators; import io.grpc.netty.InternalProtocolNegotiators.ProtocolNegotiationHandler; -import io.grpc.s2a.channel.S2AChannelPool; -import io.grpc.s2a.channel.S2AGrpcChannelPool; -import io.grpc.s2a.handshaker.S2AIdentity; +import io.grpc.s2a.handshaker.S2AServiceGrpc; +import io.grpc.s2a.internal.channel.S2AChannelPool; +import io.grpc.s2a.internal.channel.S2AGrpcChannelPool; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; @@ -55,7 +55,8 @@ /** Factory for performing negotiation of a secure channel using the S2A. */ @ThreadSafe public final class S2AProtocolNegotiatorFactory { - @VisibleForTesting static final int DEFAULT_PORT = 443; + @VisibleForTesting + public static final int DEFAULT_PORT = 443; private static final AsciiString SCHEME = AsciiString.of("https"); /** @@ -98,14 +99,14 @@ public int getDefaultPort() { /** Negotiates the TLS handshake using S2A. */ @VisibleForTesting - static final class S2AProtocolNegotiator implements ProtocolNegotiator { + public static final class S2AProtocolNegotiator implements ProtocolNegotiator { private final S2AChannelPool channelPool; private final Optional localIdentity; private final ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1)); - static S2AProtocolNegotiator createForClient( + public static S2AProtocolNegotiator createForClient( S2AChannelPool channelPool, @Nullable S2AIdentity localIdentity) { checkNotNull(channelPool, "Channel pool should not be null."); if (localIdentity == null) { @@ -116,7 +117,7 @@ static S2AProtocolNegotiator createForClient( } @VisibleForTesting - static @Nullable String getHostNameFromAuthority(@Nullable String authority) { + public static @Nullable String getHostNameFromAuthority(@Nullable String authority) { if (authority == null) { return null; } @@ -150,7 +151,7 @@ public void close() { } @VisibleForTesting - static class BufferReadsHandler extends ChannelInboundHandlerAdapter { + public static class BufferReadsHandler extends ChannelInboundHandlerAdapter { private final List reads = new ArrayList<>(); private boolean readComplete; diff --git a/s2a/src/main/java/io/grpc/s2a/handshaker/S2AStub.java b/s2a/src/main/java/io/grpc/s2a/internal/handshaker/S2AStub.java similarity index 87% rename from s2a/src/main/java/io/grpc/s2a/handshaker/S2AStub.java rename to s2a/src/main/java/io/grpc/s2a/internal/handshaker/S2AStub.java index 8249ca59d09..2f718f7ac20 100644 --- a/s2a/src/main/java/io/grpc/s2a/handshaker/S2AStub.java +++ b/s2a/src/main/java/io/grpc/s2a/internal/handshaker/S2AStub.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package io.grpc.s2a.handshaker; +package io.grpc.s2a.internal.handshaker; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; @@ -22,6 +22,9 @@ import static java.util.concurrent.TimeUnit.SECONDS; import com.google.common.annotations.VisibleForTesting; +import io.grpc.s2a.handshaker.S2AServiceGrpc; +import io.grpc.s2a.handshaker.SessionReq; +import io.grpc.s2a.handshaker.SessionResp; import io.grpc.stub.StreamObserver; import java.io.IOException; import java.util.Optional; @@ -31,9 +34,12 @@ import java.util.logging.Logger; import javax.annotation.concurrent.NotThreadSafe; -/** Reads and writes messages to and from the S2A. */ +/** + * Reads and writes messages to and from the S2A. + */ @NotThreadSafe -class S2AStub implements AutoCloseable { +public class S2AStub implements AutoCloseable { + private static final Logger logger = Logger.getLogger(S2AStub.class.getName()); private static final long HANDSHAKE_RPC_DEADLINE_SECS = 20; private final StreamObserver reader = new Reader(); @@ -43,13 +49,14 @@ class S2AStub implements AutoCloseable { private boolean doneReading = false; private boolean doneWriting = false; - static S2AStub newInstance(S2AServiceGrpc.S2AServiceStub serviceStub) { + @VisibleForTesting + public static S2AStub newInstance(S2AServiceGrpc.S2AServiceStub serviceStub) { checkNotNull(serviceStub); return new S2AStub(serviceStub); } @VisibleForTesting - static S2AStub newInstanceForTesting(StreamObserver writer) { + public static S2AStub newInstanceForTesting(StreamObserver writer) { checkNotNull(writer); return new S2AStub(writer); } @@ -63,12 +70,12 @@ private S2AStub(StreamObserver writer) { } @VisibleForTesting - StreamObserver getReader() { + public StreamObserver getReader() { return reader; } @VisibleForTesting - BlockingQueue getResponses() { + public BlockingQueue getResponses() { return responses; } @@ -79,10 +86,10 @@ BlockingQueue getResponses() { * * @param req the {@code SessionReq} message to be sent to the S2A server. * @return the {@code SessionResp} message received from the S2A server. - * @throws ConnectionClosedException if {@code reader} or {@code writer} calls their {@code - * onCompleted} method. - * @throws IOException if an unexpected response is received, or if the {@code reader} or {@code - * writer} calls their {@code onError} method. + * @throws ConnectionClosedException if {@code reader} or {@code writer} calls their + * {@code onCompleted} method. + * @throws IOException if an unexpected response is received, or if the {@code reader} or + * {@code writer} calls their {@code onError} method. */ public SessionResp send(SessionReq req) throws IOException, InterruptedException { if (doneWriting && doneReading) { @@ -138,7 +145,9 @@ public void close() { } } - /** Create a new writer if the writer is null. */ + /** + * Create a new writer if the writer is null. + */ private void createWriterIfNull() { if (writer == null) { writer = @@ -150,9 +159,10 @@ private void createWriterIfNull() { } private class Reader implements StreamObserver { + /** - * Places a {@code SessionResp} message in the {@code responses} queue, or an {@code - * IOException} if reading is complete. + * Places a {@code SessionResp} message in the {@code responses} queue, or an + * {@code IOException} if reading is complete. * * @param resp the {@code SessionResp} message received from the S2A handshaker module. */ @@ -165,7 +175,8 @@ public void onNext(SessionResp resp) { /** * Places a {@code Throwable} in the {@code responses} queue. * - * @param t the {@code Throwable} caught when reading the stream to the S2A handshaker module. + * @param t the {@code Throwable} caught when reading the stream to the S2A handshaker + * module. */ @Override public void onError(Throwable t) { @@ -187,6 +198,7 @@ public void onCompleted() { } private static final class Result { + private final Optional response; private final Optional throwable; @@ -204,7 +216,9 @@ private Result(Optional response, Optional throwable) { this.throwable = throwable; } - /** Throws {@code throwable} if present, and returns {@code response} otherwise. */ + /** + * Throws {@code throwable} if present, and returns {@code response} otherwise. + */ SessionResp getResultOrThrow() throws IOException { if (throwable.isPresent()) { if (throwable.get() instanceof ConnectionClosedException) { diff --git a/s2a/src/main/java/io/grpc/s2a/handshaker/S2ATrustManager.java b/s2a/src/main/java/io/grpc/s2a/internal/handshaker/S2ATrustManager.java similarity index 94% rename from s2a/src/main/java/io/grpc/s2a/handshaker/S2ATrustManager.java rename to s2a/src/main/java/io/grpc/s2a/internal/handshaker/S2ATrustManager.java index aafbb94c047..902efcd9a95 100644 --- a/s2a/src/main/java/io/grpc/s2a/handshaker/S2ATrustManager.java +++ b/s2a/src/main/java/io/grpc/s2a/internal/handshaker/S2ATrustManager.java @@ -14,15 +14,18 @@ * limitations under the License. */ -package io.grpc.s2a.handshaker; +package io.grpc.s2a.internal.handshaker; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; import com.google.common.collect.ImmutableList; import com.google.protobuf.ByteString; -import io.grpc.s2a.handshaker.S2AIdentity; +import io.grpc.s2a.handshaker.SessionReq; +import io.grpc.s2a.handshaker.SessionResp; +import io.grpc.s2a.handshaker.ValidatePeerCertificateChainReq; import io.grpc.s2a.handshaker.ValidatePeerCertificateChainReq.VerificationMode; +import io.grpc.s2a.handshaker.ValidatePeerCertificateChainResp; import java.io.IOException; import java.security.cert.CertificateEncodingException; import java.security.cert.CertificateException; @@ -34,12 +37,12 @@ /** Offloads verification of the peer certificate chain to S2A. */ @NotThreadSafe -final class S2ATrustManager implements X509TrustManager { +public final class S2ATrustManager implements X509TrustManager { private final Optional localIdentity; private final S2AStub stub; private final String hostname; - static S2ATrustManager createForClient( + public static S2ATrustManager createForClient( S2AStub stub, String hostname, Optional localIdentity) { checkNotNull(stub); checkNotNull(hostname); diff --git a/s2a/src/main/java/io/grpc/s2a/handshaker/SslContextFactory.java b/s2a/src/main/java/io/grpc/s2a/internal/handshaker/SslContextFactory.java similarity index 94% rename from s2a/src/main/java/io/grpc/s2a/handshaker/SslContextFactory.java rename to s2a/src/main/java/io/grpc/s2a/internal/handshaker/SslContextFactory.java index 1ac5887ebc4..f70e3e9c52c 100644 --- a/s2a/src/main/java/io/grpc/s2a/handshaker/SslContextFactory.java +++ b/s2a/src/main/java/io/grpc/s2a/internal/handshaker/SslContextFactory.java @@ -14,14 +14,19 @@ * limitations under the License. */ -package io.grpc.s2a.handshaker; +package io.grpc.s2a.internal.handshaker; import static com.google.common.base.Preconditions.checkNotNull; import static java.nio.charset.StandardCharsets.UTF_8; import com.google.common.collect.ImmutableSet; import io.grpc.netty.GrpcSslContexts; -import io.grpc.s2a.handshaker.S2AIdentity; +import io.grpc.s2a.handshaker.AuthenticationMechanism; +import io.grpc.s2a.handshaker.ConnectionSide; +import io.grpc.s2a.handshaker.GetTlsConfigurationReq; +import io.grpc.s2a.handshaker.GetTlsConfigurationResp; +import io.grpc.s2a.handshaker.SessionReq; +import io.grpc.s2a.handshaker.SessionResp; import io.netty.handler.ssl.OpenSslContextOption; import io.netty.handler.ssl.OpenSslSessionContext; import io.netty.handler.ssl.OpenSslX509KeyManagerFactory; @@ -41,7 +46,7 @@ import javax.net.ssl.SSLSessionContext; /** Creates {@link SslContext} objects with TLS configurations from S2A server. */ -final class SslContextFactory { +public final class SslContextFactory { /** * Creates {@link SslContext} objects for client with TLS configurations from S2A server. @@ -55,7 +60,7 @@ final class SslContextFactory { * @throws IOException if an unexpected response from S2A server is received. * @throws InterruptedException if {@code stub} is closed. */ - static SslContext createForClient( + public static SslContext createForClient( S2AStub stub, String targetName, Optional localIdentity) throws IOException, InterruptedException, diff --git a/s2a/src/main/java/io/grpc/s2a/handshaker/tokenmanager/AccessTokenManager.java b/s2a/src/main/java/io/grpc/s2a/internal/handshaker/tokenmanager/AccessTokenManager.java similarity index 91% rename from s2a/src/main/java/io/grpc/s2a/handshaker/tokenmanager/AccessTokenManager.java rename to s2a/src/main/java/io/grpc/s2a/internal/handshaker/tokenmanager/AccessTokenManager.java index 94549d11c87..979ed24956f 100644 --- a/s2a/src/main/java/io/grpc/s2a/handshaker/tokenmanager/AccessTokenManager.java +++ b/s2a/src/main/java/io/grpc/s2a/internal/handshaker/tokenmanager/AccessTokenManager.java @@ -14,9 +14,9 @@ * limitations under the License. */ -package io.grpc.s2a.handshaker.tokenmanager; +package io.grpc.s2a.internal.handshaker.tokenmanager; -import io.grpc.s2a.handshaker.S2AIdentity; +import io.grpc.s2a.internal.handshaker.S2AIdentity; import java.lang.reflect.Method; import java.util.Optional; import javax.annotation.concurrent.ThreadSafe; @@ -32,7 +32,7 @@ public static Optional create() { Optional tokenFetcher; try { Class singleTokenFetcherClass = - Class.forName("io.grpc.s2a.handshaker.tokenmanager.SingleTokenFetcher"); + Class.forName("io.grpc.s2a.internal.handshaker.tokenmanager.SingleTokenFetcher"); Method createTokenFetcher = singleTokenFetcherClass.getMethod("create"); tokenFetcher = (Optional) createTokenFetcher.invoke(null); } catch (ClassNotFoundException e) { diff --git a/s2a/src/main/java/io/grpc/s2a/handshaker/tokenmanager/SingleTokenFetcher.java b/s2a/src/main/java/io/grpc/s2a/internal/handshaker/tokenmanager/SingleTokenFetcher.java similarity index 85% rename from s2a/src/main/java/io/grpc/s2a/handshaker/tokenmanager/SingleTokenFetcher.java rename to s2a/src/main/java/io/grpc/s2a/internal/handshaker/tokenmanager/SingleTokenFetcher.java index c3dffd2b715..b7512f591c7 100644 --- a/s2a/src/main/java/io/grpc/s2a/handshaker/tokenmanager/SingleTokenFetcher.java +++ b/s2a/src/main/java/io/grpc/s2a/internal/handshaker/tokenmanager/SingleTokenFetcher.java @@ -14,15 +14,18 @@ * limitations under the License. */ -package io.grpc.s2a.handshaker.tokenmanager; +package io.grpc.s2a.internal.handshaker.tokenmanager; import com.google.common.annotations.VisibleForTesting; -import io.grpc.s2a.handshaker.S2AIdentity; +import io.grpc.s2a.internal.handshaker.S2AIdentity; import java.util.Optional; -/** Fetches a single access token via an environment variable. */ +/** + * Fetches a single access token via an environment variable. + */ @SuppressWarnings("NonFinalStaticField") public final class SingleTokenFetcher implements TokenFetcher { + private static final String ENVIRONMENT_VARIABLE = "S2A_ACCESS_TOKEN"; private static String accessToken = System.getenv(ENVIRONMENT_VARIABLE); @@ -41,6 +44,11 @@ public static void setAccessToken(String token) { accessToken = token; } + @VisibleForTesting + public static String getAccessToken() { + return accessToken; + } + private SingleTokenFetcher(String token) { this.token = token; } diff --git a/s2a/src/main/java/io/grpc/s2a/handshaker/tokenmanager/TokenFetcher.java b/s2a/src/main/java/io/grpc/s2a/internal/handshaker/tokenmanager/TokenFetcher.java similarity index 89% rename from s2a/src/main/java/io/grpc/s2a/handshaker/tokenmanager/TokenFetcher.java rename to s2a/src/main/java/io/grpc/s2a/internal/handshaker/tokenmanager/TokenFetcher.java index 9eeddaad844..6827f095afe 100644 --- a/s2a/src/main/java/io/grpc/s2a/handshaker/tokenmanager/TokenFetcher.java +++ b/s2a/src/main/java/io/grpc/s2a/internal/handshaker/tokenmanager/TokenFetcher.java @@ -14,9 +14,9 @@ * limitations under the License. */ -package io.grpc.s2a.handshaker.tokenmanager; +package io.grpc.s2a.internal.handshaker.tokenmanager; -import io.grpc.s2a.handshaker.S2AIdentity; +import io.grpc.s2a.internal.handshaker.S2AIdentity; /** Fetches tokens used to authenticate to S2A. */ interface TokenFetcher { diff --git a/s2a/src/test/java/io/grpc/s2a/channel/S2AGrpcChannelPoolTest.java b/s2a/src/test/java/io/grpc/s2a/channel/S2AGrpcChannelPoolTest.java index 260129f8f56..91b9354ae37 100644 --- a/s2a/src/test/java/io/grpc/s2a/channel/S2AGrpcChannelPoolTest.java +++ b/s2a/src/test/java/io/grpc/s2a/channel/S2AGrpcChannelPoolTest.java @@ -22,6 +22,8 @@ import io.grpc.Channel; import io.grpc.internal.ObjectPool; +import io.grpc.s2a.internal.channel.S2AChannelPool; +import io.grpc.s2a.internal.channel.S2AGrpcChannelPool; import org.checkerframework.checker.nullness.qual.Nullable; import org.junit.Test; import org.junit.runner.RunWith; diff --git a/s2a/src/test/java/io/grpc/s2a/channel/S2AHandshakerServiceChannelTest.java b/s2a/src/test/java/io/grpc/s2a/channel/S2AHandshakerServiceChannelTest.java index 7845e7c3bcb..ae4aef6aa07 100644 --- a/s2a/src/test/java/io/grpc/s2a/channel/S2AHandshakerServiceChannelTest.java +++ b/s2a/src/test/java/io/grpc/s2a/channel/S2AHandshakerServiceChannelTest.java @@ -36,7 +36,8 @@ import io.grpc.benchmarks.Utils; import io.grpc.internal.SharedResourceHolder.Resource; import io.grpc.netty.NettyServerBuilder; -import io.grpc.s2a.channel.S2AHandshakerServiceChannel.HandshakerServiceChannel; +import io.grpc.s2a.internal.channel.S2AHandshakerServiceChannel; +import io.grpc.s2a.internal.channel.S2AHandshakerServiceChannel.HandshakerServiceChannel; import io.grpc.stub.StreamObserver; import io.grpc.testing.GrpcCleanupRule; import io.grpc.testing.protobuf.SimpleRequest; diff --git a/s2a/src/test/java/io/grpc/s2a/handshaker/FakeWriter.java b/s2a/src/test/java/io/grpc/s2a/handshaker/FakeWriter.java index 45961b81b7b..7b751ef2e17 100644 --- a/s2a/src/test/java/io/grpc/s2a/handshaker/FakeWriter.java +++ b/s2a/src/test/java/io/grpc/s2a/handshaker/FakeWriter.java @@ -22,6 +22,7 @@ import com.google.common.collect.ImmutableMap; import com.google.errorprone.annotations.CanIgnoreReturnValue; import com.google.protobuf.ByteString; +import io.grpc.s2a.internal.handshaker.S2AConnectionException; import io.grpc.stub.StreamObserver; import java.io.IOException; import java.security.KeyFactory; diff --git a/s2a/src/test/java/io/grpc/s2a/handshaker/GetAuthenticationMechanismsTest.java b/s2a/src/test/java/io/grpc/s2a/handshaker/GetAuthenticationMechanismsTest.java index 884e1ec88eb..01ae0e57731 100644 --- a/s2a/src/test/java/io/grpc/s2a/handshaker/GetAuthenticationMechanismsTest.java +++ b/s2a/src/test/java/io/grpc/s2a/handshaker/GetAuthenticationMechanismsTest.java @@ -17,9 +17,11 @@ package io.grpc.s2a.handshaker; import com.google.common.truth.Expect; -import io.grpc.s2a.handshaker.S2AIdentity; -import io.grpc.s2a.handshaker.tokenmanager.SingleTokenFetcher; +import io.grpc.s2a.internal.handshaker.GetAuthenticationMechanisms; +import io.grpc.s2a.internal.handshaker.S2AIdentity; +import io.grpc.s2a.internal.handshaker.tokenmanager.SingleTokenFetcher; import java.util.Optional; +import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; @@ -31,13 +33,20 @@ public final class GetAuthenticationMechanismsTest { @Rule public final Expect expect = Expect.create(); private static final String TOKEN = "access_token"; + private static String originalAccessToken; @BeforeClass public static void setUpClass() { + originalAccessToken = SingleTokenFetcher.getAccessToken(); // Set the token that the client will use to authenticate to the S2A. SingleTokenFetcher.setAccessToken(TOKEN); } + @AfterClass + public static void tearDownClass() { + SingleTokenFetcher.setAccessToken(originalAccessToken); + } + @Test public void getAuthMechanisms_emptyIdentity_success() { expect diff --git a/s2a/src/test/java/io/grpc/s2a/handshaker/IntegrationTest.java b/s2a/src/test/java/io/grpc/s2a/handshaker/IntegrationTest.java index bae58f2f9ec..8ff7e462a23 100644 --- a/s2a/src/test/java/io/grpc/s2a/handshaker/IntegrationTest.java +++ b/s2a/src/test/java/io/grpc/s2a/handshaker/IntegrationTest.java @@ -32,7 +32,6 @@ import io.grpc.netty.NettyServerBuilder; import io.grpc.s2a.MtlsToS2AChannelCredentials; import io.grpc.s2a.S2AChannelCredentials; -import io.grpc.s2a.handshaker.FakeS2AServer; import io.grpc.stub.StreamObserver; import io.grpc.testing.protobuf.SimpleRequest; import io.grpc.testing.protobuf.SimpleResponse; diff --git a/s2a/src/test/java/io/grpc/s2a/handshaker/ProtoUtilTest.java b/s2a/src/test/java/io/grpc/s2a/handshaker/ProtoUtilTest.java index 6d134b43f7a..7ebef430833 100644 --- a/s2a/src/test/java/io/grpc/s2a/handshaker/ProtoUtilTest.java +++ b/s2a/src/test/java/io/grpc/s2a/handshaker/ProtoUtilTest.java @@ -20,6 +20,7 @@ import com.google.common.collect.ImmutableSet; import com.google.common.truth.Expect; +import io.grpc.s2a.internal.handshaker.ProtoUtil; import org.junit.Rule; import org.junit.Test; import org.junit.runner.RunWith; diff --git a/s2a/src/test/java/io/grpc/s2a/handshaker/S2APrivateKeyMethodTest.java b/s2a/src/test/java/io/grpc/s2a/handshaker/S2APrivateKeyMethodTest.java index 8252aa245d7..823c869ea51 100644 --- a/s2a/src/test/java/io/grpc/s2a/handshaker/S2APrivateKeyMethodTest.java +++ b/s2a/src/test/java/io/grpc/s2a/handshaker/S2APrivateKeyMethodTest.java @@ -26,7 +26,10 @@ import com.google.common.truth.Expect; import com.google.protobuf.ByteString; import io.grpc.netty.GrpcSslContexts; -import io.grpc.s2a.handshaker.S2AIdentity; +import io.grpc.s2a.internal.handshaker.S2AConnectionException; +import io.grpc.s2a.internal.handshaker.S2AIdentity; +import io.grpc.s2a.internal.handshaker.S2APrivateKeyMethod; +import io.grpc.s2a.internal.handshaker.S2AStub; import io.netty.handler.ssl.OpenSslPrivateKeyMethod; import io.netty.handler.ssl.SslContextBuilder; import java.io.ByteArrayInputStream; diff --git a/s2a/src/test/java/io/grpc/s2a/handshaker/S2AProtocolNegotiatorFactoryTest.java b/s2a/src/test/java/io/grpc/s2a/handshaker/S2AProtocolNegotiatorFactoryTest.java index 404910e8be0..8d9e72e82b3 100644 --- a/s2a/src/test/java/io/grpc/s2a/handshaker/S2AProtocolNegotiatorFactoryTest.java +++ b/s2a/src/test/java/io/grpc/s2a/handshaker/S2AProtocolNegotiatorFactoryTest.java @@ -35,11 +35,12 @@ import io.grpc.netty.GrpcHttp2ConnectionHandler; import io.grpc.netty.InternalProtocolNegotiator; import io.grpc.netty.InternalProtocolNegotiator.ProtocolNegotiator; -import io.grpc.s2a.channel.S2AChannelPool; -import io.grpc.s2a.channel.S2AGrpcChannelPool; -import io.grpc.s2a.channel.S2AHandshakerServiceChannel; -import io.grpc.s2a.handshaker.S2AIdentity; -import io.grpc.s2a.handshaker.S2AProtocolNegotiatorFactory.S2AProtocolNegotiator; +import io.grpc.s2a.internal.channel.S2AChannelPool; +import io.grpc.s2a.internal.channel.S2AGrpcChannelPool; +import io.grpc.s2a.internal.channel.S2AHandshakerServiceChannel; +import io.grpc.s2a.internal.handshaker.S2AIdentity; +import io.grpc.s2a.internal.handshaker.S2AProtocolNegotiatorFactory; +import io.grpc.s2a.internal.handshaker.S2AProtocolNegotiatorFactory.S2AProtocolNegotiator; import io.grpc.stub.StreamObserver; import io.netty.channel.ChannelDuplexHandler; import io.netty.channel.ChannelHandler; diff --git a/s2a/src/test/java/io/grpc/s2a/handshaker/S2AStubTest.java b/s2a/src/test/java/io/grpc/s2a/handshaker/S2AStubTest.java index 47fd154d949..c118aaf3d6a 100644 --- a/s2a/src/test/java/io/grpc/s2a/handshaker/S2AStubTest.java +++ b/s2a/src/test/java/io/grpc/s2a/handshaker/S2AStubTest.java @@ -23,9 +23,12 @@ import com.google.common.truth.Expect; import io.grpc.InsecureChannelCredentials; import io.grpc.internal.SharedResourcePool; -import io.grpc.s2a.channel.S2AChannelPool; -import io.grpc.s2a.channel.S2AGrpcChannelPool; -import io.grpc.s2a.channel.S2AHandshakerServiceChannel; +import io.grpc.s2a.internal.channel.S2AChannelPool; +import io.grpc.s2a.internal.channel.S2AGrpcChannelPool; +import io.grpc.s2a.internal.channel.S2AHandshakerServiceChannel; +import io.grpc.s2a.internal.handshaker.ConnectionClosedException; +import io.grpc.s2a.internal.handshaker.S2AConnectionException; +import io.grpc.s2a.internal.handshaker.S2AStub; import io.grpc.stub.StreamObserver; import java.io.IOException; import org.junit.Before; diff --git a/s2a/src/test/java/io/grpc/s2a/handshaker/S2ATrustManagerTest.java b/s2a/src/test/java/io/grpc/s2a/handshaker/S2ATrustManagerTest.java index 384e1aba5cc..fe2bc2129e9 100644 --- a/s2a/src/test/java/io/grpc/s2a/handshaker/S2ATrustManagerTest.java +++ b/s2a/src/test/java/io/grpc/s2a/handshaker/S2ATrustManagerTest.java @@ -19,7 +19,9 @@ import static com.google.common.truth.Truth.assertThat; import static org.junit.Assert.assertThrows; -import io.grpc.s2a.handshaker.S2AIdentity; +import io.grpc.s2a.internal.handshaker.S2AIdentity; +import io.grpc.s2a.internal.handshaker.S2AStub; +import io.grpc.s2a.internal.handshaker.S2ATrustManager; import java.io.ByteArrayInputStream; import java.security.cert.CertificateException; import java.security.cert.CertificateFactory; diff --git a/s2a/src/test/java/io/grpc/s2a/handshaker/SslContextFactoryTest.java b/s2a/src/test/java/io/grpc/s2a/handshaker/SslContextFactoryTest.java index a2a66a7b563..634a2e7bbff 100644 --- a/s2a/src/test/java/io/grpc/s2a/handshaker/SslContextFactoryTest.java +++ b/s2a/src/test/java/io/grpc/s2a/handshaker/SslContextFactoryTest.java @@ -20,7 +20,10 @@ import static org.junit.Assert.assertThrows; import com.google.common.truth.Expect; -import io.grpc.s2a.handshaker.S2AIdentity; +import io.grpc.s2a.internal.handshaker.S2AConnectionException; +import io.grpc.s2a.internal.handshaker.S2AIdentity; +import io.grpc.s2a.internal.handshaker.S2AStub; +import io.grpc.s2a.internal.handshaker.SslContextFactory; import io.netty.handler.ssl.OpenSslSessionContext; import io.netty.handler.ssl.SslContext; import java.security.GeneralSecurityException; diff --git a/s2a/src/test/java/io/grpc/s2a/handshaker/tokenmanager/SingleTokenAccessTokenManagerTest.java b/s2a/src/test/java/io/grpc/s2a/handshaker/tokenmanager/SingleTokenAccessTokenManagerTest.java index 80adba07f20..fbc337c6d60 100644 --- a/s2a/src/test/java/io/grpc/s2a/handshaker/tokenmanager/SingleTokenAccessTokenManagerTest.java +++ b/s2a/src/test/java/io/grpc/s2a/handshaker/tokenmanager/SingleTokenAccessTokenManagerTest.java @@ -18,8 +18,11 @@ import static com.google.common.truth.Truth.assertThat; -import io.grpc.s2a.handshaker.S2AIdentity; +import io.grpc.s2a.internal.handshaker.S2AIdentity; +import io.grpc.s2a.internal.handshaker.tokenmanager.AccessTokenManager; +import io.grpc.s2a.internal.handshaker.tokenmanager.SingleTokenFetcher; import java.util.Optional; +import org.junit.After; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -27,14 +30,24 @@ @RunWith(JUnit4.class) public final class SingleTokenAccessTokenManagerTest { + private static final S2AIdentity IDENTITY = S2AIdentity.fromSpiffeId("spiffe_id"); private static final String TOKEN = "token"; + private String originalAccessToken; @Before public void setUp() { + + originalAccessToken = SingleTokenFetcher.getAccessToken(); SingleTokenFetcher.setAccessToken(null); } + @After + public void tearDown() { + SingleTokenFetcher.setAccessToken(originalAccessToken); + } + + @Test public void getDefaultToken_success() throws Exception { SingleTokenFetcher.setAccessToken(TOKEN); diff --git a/servlet/src/main/java/io/grpc/servlet/AsyncServletOutputStreamWriter.java b/servlet/src/main/java/io/grpc/servlet/AsyncServletOutputStreamWriter.java index cfd29b1a2fd..de463f626e9 100644 --- a/servlet/src/main/java/io/grpc/servlet/AsyncServletOutputStreamWriter.java +++ b/servlet/src/main/java/io/grpc/servlet/AsyncServletOutputStreamWriter.java @@ -39,7 +39,9 @@ import javax.servlet.AsyncContext; import javax.servlet.ServletOutputStream; -/** Handles write actions from the container thread and the application thread. */ +/** + * Handles write actions from the container thread and the application thread. + */ final class AsyncServletOutputStreamWriter { /** @@ -58,9 +60,9 @@ final class AsyncServletOutputStreamWriter { * * *

There are two threads, the container thread (calling {@code onWritePossible()}) and the - * application thread (calling {@code runOrBuffer()}) that read and update the - * writeState. Only onWritePossible() may turn {@code readyAndDrained} from false to true, and - * only runOrBuffer() may turn it from true to false. + * application thread (calling {@code runOrBuffer()}) that read and update the writeState. Only + * onWritePossible() may turn {@code readyAndDrained} from false to true, and only runOrBuffer() + * may turn it from true to false. */ private final AtomicReference writeState = new AtomicReference<>(WriteState.DEFAULT); @@ -132,10 +134,13 @@ public void finest(String str, Object... params) { } /** - * Constructor without java.util.logging and javax.servlet.* dependency, so that Lincheck can run. + * Constructor without java.util.logging and javax.servlet.* dependency, so that Lincheck can + * run. * - * @param writeAction Provides an {@link ActionItem} to write given bytes with specified length. - * @param isReady Indicates whether the writer can write bytes at the moment (asynchronously). + * @param writeAction Provides an {@link ActionItem} to write given bytes with specified + * length. + * @param isReady Indicates whether the writer can write bytes at the moment + * (asynchronously). */ @VisibleForTesting AsyncServletOutputStreamWriter( @@ -151,17 +156,23 @@ public void finest(String str, Object... params) { this.log = log; } - /** Called from application thread. */ + /** + * Called from application thread. + */ void writeBytes(byte[] bytes, int numBytes) throws IOException { runOrBuffer(writeAction.apply(bytes, numBytes)); } - /** Called from application thread. */ + /** + * Called from application thread. + */ void flush() throws IOException { runOrBuffer(flushAction); } - /** Called from application thread. */ + /** + * Called from application thread. + */ void complete() { try { runOrBuffer(completeAction); @@ -170,7 +181,9 @@ void complete() { } } - /** Called from the container thread {@link javax.servlet.WriteListener#onWritePossible()}. */ + /** + * Called from the container thread {@link javax.servlet.WriteListener#onWritePossible()}. + */ void onWritePossible() throws IOException { log.finest("onWritePossible: ENTRY. The servlet output stream becomes ready"); assureReadyAndDrainedTurnsFalse(); @@ -204,6 +217,7 @@ private void assureReadyAndDrainedTurnsFalse() { parkingThread = Thread.currentThread(); // Try to sleep for an extremely long time to avoid writeState being changed at exactly // the time when sleep time expires (in extreme scenario, such as #9917). + //LockSupport.parkNanos(Duration.ofMinutes(1).toNanos()); //Previous changes LockSupport.parkNanos(Duration.ofHours(1).toNanos()); // should return immediately } parkingThread = null; @@ -244,22 +258,28 @@ private void runOrBuffer(ActionItem actionItem) throws IOException { } } - /** Write actions, e.g. writeBytes, flush, complete. */ + /** + * Write actions, e.g. writeBytes, flush, complete. + */ @FunctionalInterface @VisibleForTesting interface ActionItem { + void run() throws IOException; } @VisibleForTesting // Lincheck test can not run with java.util.logging dependency. interface Log { + default boolean isLoggable(Level level) { - return false; + return false; } - default void fine(String str, Object...params) {} + default void fine(String str, Object... params) { + } - default void finest(String str, Object...params) {} + default void finest(String str, Object... params) { + } } private static final class WriteState { @@ -274,9 +294,9 @@ private static final class WriteState { * check of {@link javax.servlet.ServletOutputStream#isReady()} is true. * *

readyAndDrained turns from true to false when: - * {@code runOrBuffer()} exits while either the action item is written directly to the - * servlet output stream and the check of {@link javax.servlet.ServletOutputStream#isReady()} - * right after that returns false, or the action item is buffered into the writeChain. + * {@code runOrBuffer()} exits while either the action item is written directly to the servlet + * output stream and the check of {@link javax.servlet.ServletOutputStream#isReady()} right + * after that returns false, or the action item is buffered into the writeChain. */ final boolean readyAndDrained; @@ -285,8 +305,8 @@ private static final class WriteState { } /** - * Only {@code onWritePossible()} can set readyAndDrained to true, and only {@code - * runOrBuffer()} can set it to false. + * Only {@code onWritePossible()} can set readyAndDrained to true, and only + * {@code runOrBuffer()} can set it to false. */ @CheckReturnValue WriteState withReadyAndDrained(boolean readyAndDrained) { diff --git a/servlet/src/threadingTest/java/io/grpc/servlet/AsyncServletOutputStreamWriterConcurrencyTest.java b/servlet/src/threadingTest/java/io/grpc/servlet/AsyncServletOutputStreamWriterConcurrencyTest.java index 61da2bf4c69..f4677080c8c 100644 --- a/servlet/src/threadingTest/java/io/grpc/servlet/AsyncServletOutputStreamWriterConcurrencyTest.java +++ b/servlet/src/threadingTest/java/io/grpc/servlet/AsyncServletOutputStreamWriterConcurrencyTest.java @@ -23,6 +23,8 @@ import io.grpc.servlet.AsyncServletOutputStreamWriter.Log; import java.io.IOException; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiFunction; import org.jetbrains.kotlinx.lincheck.LinChecker; @@ -37,6 +39,7 @@ import org.junit.runner.RunWith; import org.junit.runners.JUnit4; + /** * Test concurrency correctness of {@link AsyncServletOutputStreamWriter} using model checking with * Lincheck. @@ -50,12 +53,14 @@ * operations are linearizable in each interleave scenario. */ @ModelCheckingCTest +@ModelCheckingCTest @OpGroupConfig(name = "update", nonParallel = true) @OpGroupConfig(name = "write", nonParallel = true) @Param(name = "keepReady", gen = BooleanGen.class) @RunWith(JUnit4.class) public class AsyncServletOutputStreamWriterConcurrencyTest extends VerifierState { - private static final int OPERATIONS_PER_THREAD = 6; + + private static final int OPERATIONS_PER_THREAD = 4; private final AsyncServletOutputStreamWriter writer; private final boolean[] keepReadyArray = new boolean[OPERATIONS_PER_THREAD]; @@ -67,7 +72,9 @@ public class AsyncServletOutputStreamWriterConcurrencyTest extends VerifierState private int consumerIndex; private int bytesWritten; - /** Public no-args constructor. */ + /** + * Public no-args constructor. + */ public AsyncServletOutputStreamWriterConcurrencyTest() { BiFunction writeAction = (bytes, numBytes) -> () -> { @@ -88,9 +95,11 @@ public AsyncServletOutputStreamWriterConcurrencyTest() { writer = new AsyncServletOutputStreamWriter( writeAction, flushAction, - () -> { }, + () -> { + }, this::isReady, - new Log() {}); + new Log() { + }); } private void writeOrFlush() { @@ -113,9 +122,8 @@ private boolean isReady() { /** * Writes a single byte with value equal to {@link #producerIndex}. * - * @param keepReady when the byte is written: - * the ServletOutputStream should remain ready if keepReady == true; - * the ServletOutputStream should become unready if keepReady == false. + * @param keepReady when the byte is written: the ServletOutputStream should remain ready if + * keepReady == true; the ServletOutputStream should become unready if keepReady == false. */ // @com.google.errorprone.annotations.Keep @Operation(group = "write") @@ -128,9 +136,8 @@ public void write(@Param(name = "keepReady") boolean keepReady) throws IOExcepti /** * Flushes the writer. * - * @param keepReady when flushing: - * the ServletOutputStream should remain ready if keepReady == true; - * the ServletOutputStream should become unready if keepReady == false. + * @param keepReady when flushing: the ServletOutputStream should remain ready if keepReady == + * true; the ServletOutputStream should become unready if keepReady == false. */ // @com.google.errorprone.annotations.Keep // called by lincheck reflectively @Operation(group = "write") @@ -140,7 +147,9 @@ public void flush(@Param(name = "keepReady") boolean keepReady) throws IOExcepti producerIndex++; } - /** If the writer is not ready, let it turn ready and call writer.onWritePossible(). */ + /** + * If the writer is not ready, let it turn ready and call writer.onWritePossible(). + */ // @com.google.errorprone.annotations.Keep // called by lincheck reflectively @Operation(group = "update") public void maybeOnWritePossible() throws IOException { @@ -156,19 +165,46 @@ protected Object extractState() { return bytesWritten; } + // @Test + // public void linCheck() { + // { + // ModelCheckingOptions options = new ModelCheckingOptions() + // .actorsBefore(0) + // .threads(2) + // .actorsPerThread(OPERATIONS_PER_THREAD) + // .actorsAfter(0) + // .addGuarantee( + // forClasses( + // ConcurrentLinkedQueue.class.getName(), + // AtomicReference.class.getName()) + // .allMethods() + // .treatAsAtomic()); + // LinChecker.check(AsyncServletOutputStreamWriterConcurrencyTest.class, options); + // System.out.println("linCheck() Executed..."); + // } + // } + @Test public void linCheck() { - ModelCheckingOptions options = new ModelCheckingOptions() - .actorsBefore(0) - .threads(2) - .actorsPerThread(OPERATIONS_PER_THREAD) - .actorsAfter(0) - .addGuarantee( - forClasses( + { + for (int i = 0; i < 5; i++) { + ModelCheckingOptions options = new ModelCheckingOptions() + .actorsBefore(0) + .threads(2) + .actorsPerThread(OPERATIONS_PER_THREAD) + .actorsAfter(0) + .addGuarantee( + forClasses( ConcurrentLinkedQueue.class.getName(), AtomicReference.class.getName()) - .allMethods() - .treatAsAtomic()); - LinChecker.check(AsyncServletOutputStreamWriterConcurrencyTest.class, options); + .allMethods() + .treatAsAtomic()); + LinChecker.check(AsyncServletOutputStreamWriterConcurrencyTest.class, options); + System.out.println("linCheck() execution: " + i); + } + } + + } + } diff --git a/xds/src/main/java/io/grpc/xds/XdsNameResolver.java b/xds/src/main/java/io/grpc/xds/XdsNameResolver.java index ca73b7d8451..11a975f2e27 100644 --- a/xds/src/main/java/io/grpc/xds/XdsNameResolver.java +++ b/xds/src/main/java/io/grpc/xds/XdsNameResolver.java @@ -156,15 +156,15 @@ final class XdsNameResolver extends NameResolver { // The name might have multiple slashes so encode it before verifying. serviceAuthority = checkNotNull(name, "name"); - this.encodedServiceAuthority = - GrpcUtil.checkAuthority(GrpcUtil.AuthorityEscaper.encodeAuthority(serviceAuthority)); + this.encodedServiceAuthority = + GrpcUtil.checkAuthority(GrpcUtil.AuthorityEscaper.encodeAuthority(serviceAuthority)); this.overrideAuthority = overrideAuthority; this.serviceConfigParser = checkNotNull(serviceConfigParser, "serviceConfigParser"); this.syncContext = checkNotNull(syncContext, "syncContext"); this.scheduler = checkNotNull(scheduler, "scheduler"); this.xdsClientPoolFactory = bootstrapOverride == null ? checkNotNull(xdsClientPoolFactory, - "xdsClientPoolFactory") : new SharedXdsClientPoolProvider(); + "xdsClientPoolFactory") : new SharedXdsClientPoolProvider(); this.xdsClientPoolFactory.setBootstrapOverride(bootstrapOverride); this.random = checkNotNull(random, "random"); this.filterRegistry = checkNotNull(filterRegistry, "filterRegistry"); @@ -209,7 +209,7 @@ public void start(Listener2 listener) { } String ldsResourceName = expandPercentS(listenerNameTemplate, replacement); if (!XdsClient.isResourceNameValid(ldsResourceName, XdsListenerResource.getInstance().typeUrl()) - ) { + ) { listener.onError(Status.INVALID_ARGUMENT.withDescription( "invalid listener resource URI for service authority: " + serviceAuthority)); return; @@ -372,6 +372,7 @@ static boolean matchHostName(String hostName, String pattern) { } private final class ConfigSelector extends InternalConfigSelector { + @Override public Result selectConfig(PickSubchannelArgs args) { String cluster = null; @@ -385,7 +386,7 @@ public Result selectConfig(PickSubchannelArgs args) { selectedOverrideConfigs = new HashMap<>(routingCfg.virtualHostOverrideConfig); for (Route route : routingCfg.routes) { if (RoutingUtils.matchRoute( - route.routeMatch(), "/" + args.getMethodDescriptor().getFullMethodName(), + route.routeMatch(), "/" + args.getMethodDescriptor().getFullMethodName(), headers, random)) { selectedRoute = route; selectedOverrideConfigs.putAll(route.filterConfigOverrides()); @@ -466,6 +467,7 @@ public Result selectConfig(PickSubchannelArgs args) { final String finalCluster = cluster; final long hash = generateHash(selectedRoute.routeAction().hashPolicies(), headers); class ClusterSelectionInterceptor implements ClientInterceptor { + @Override public ClientCall interceptCall( final MethodDescriptor method, CallOptions callOptions, @@ -555,7 +557,7 @@ private long generateHash(List hashPolicies, Metadata headers) { } else if (policy.type() == HashPolicy.Type.CHANNEL_ID) { newHash = hashFunc.hashLong(randomChannelId); } - if (newHash != null ) { + if (newHash != null) { // Rotating the old value prevents duplicate hash rules from cancelling each other out // and preserves all of the entropy. long oldHash = hash != null ? ((hash << 1L) | (hash >> 63L)) : 0; @@ -613,6 +615,7 @@ private static String prefixedClusterSpecifierPluginName(String pluginName) { } private static final class FailingConfigSelector extends InternalConfigSelector { + private final Result result; public FailingConfigSelector(Status error) { @@ -626,6 +629,7 @@ public Result selectConfig(PickSubchannelArgs args) { } private class ResolveState implements ResourceWatcher { + private final ConfigOrError emptyServiceConfig = serviceConfigParser.parseServiceConfig(Collections.emptyMap()); private final String ldsResourceName; @@ -664,12 +668,12 @@ public void onChanged(final XdsListenerResource.LdsUpdate update) { @Override public void onError(final Status error) { - if (stopped || receivedConfig) { + if (stopped) { return; } listener.onError(Status.UNAVAILABLE.withCause(error.getCause()).withDescription( String.format("Unable to load LDS %s. xDS server returned: %s: %s", - ldsResourceName, error.getCode(), error.getDescription()))); + ldsResourceName, error.getCode(), error.getDescription()))); } @Override @@ -819,9 +823,9 @@ private void cleanUpRoutes(String error) { error + ", xDS node ID: " + xdsClient.getBootstrapInfo().node().getId(); listener.onResult(ResolutionResult.newBuilder() .setAttributes(Attributes.newBuilder() - .set(InternalConfigSelector.KEY, - new FailingConfigSelector(Status.UNAVAILABLE.withDescription(errorWithNodeId))) - .build()) + .set(InternalConfigSelector.KEY, + new FailingConfigSelector(Status.UNAVAILABLE.withDescription(errorWithNodeId))) + .build()) .setServiceConfig(emptyServiceConfig) .build()); receivedConfig = true; @@ -842,6 +846,7 @@ private void cleanUpRouteDiscoveryState() { * update. */ private class RouteDiscoveryState implements ResourceWatcher { + private final String resourceName; private final long httpMaxStreamDurationNano; @Nullable @@ -865,12 +870,12 @@ public void onChanged(final RdsUpdate update) { @Override public void onError(final Status error) { - if (RouteDiscoveryState.this != routeDiscoveryState || receivedConfig) { + if (RouteDiscoveryState.this != routeDiscoveryState) { return; } listener.onError(Status.UNAVAILABLE.withCause(error.getCause()).withDescription( String.format("Unable to load RDS %s. xDS server returned: %s: %s", - resourceName, error.getCode(), error.getDescription()))); + resourceName, error.getCode(), error.getDescription()))); } @Override @@ -889,10 +894,12 @@ public void onResourceDoesNotExist(final String resourceName) { * VirtualHost-level configuration for request routing. */ private static class RoutingConfig { + private final long fallbackTimeoutNano; final List routes; // Null if HttpFilter is not supported. - @Nullable final List filterChain; + @Nullable + final List filterChain; final Map virtualHostOverrideConfig; private static RoutingConfig empty = new RoutingConfig( @@ -910,6 +917,7 @@ private RoutingConfig( } private static class ClusterRefState { + final AtomicInteger refCount; @Nullable final String traditionalCluster;