diff --git a/netty/src/main/java/io/grpc/netty/InternalProtocolNegotiators.java b/netty/src/main/java/io/grpc/netty/InternalProtocolNegotiators.java index 3d1aa83d9ff..9619eb43141 100644 --- a/netty/src/main/java/io/grpc/netty/InternalProtocolNegotiators.java +++ b/netty/src/main/java/io/grpc/netty/InternalProtocolNegotiators.java @@ -75,6 +75,34 @@ public static InternalProtocolNegotiator.ProtocolNegotiator tls(SslContext sslCo return tls(sslContext, null, Optional.absent()); } + /** + * Returns a {@link ProtocolNegotiator} that ensures the pipeline is set up so that TLS will + * be negotiated, the {@code handler} is added and writes to the {@link io.netty.channel.Channel} + * may happen immediately, even before the TLS Handshake is complete. + */ + public static InternalProtocolNegotiator.ProtocolNegotiator ClientTls(SslContext sslContext) { + final io.grpc.netty.ProtocolNegotiator negotiator = ProtocolNegotiators.tls(sslContext); + final class ClientTlsNegotiator implements InternalProtocolNegotiator.ProtocolNegotiator { + + @Override + public AsciiString scheme() { + return negotiator.scheme(); + } + + @Override + public ChannelHandler newHandler(GrpcHttp2ConnectionHandler grpcHandler) { + return negotiator.newHandler(grpcHandler); + } + + @Override + public void close() { + negotiator.close(); + } + } + + return new ClientTlsNegotiator(); + } + /** * Returns a {@link ProtocolNegotiator} that ensures the pipeline is set up so that TLS will be * negotiated, the server TLS {@code handler} is added and writes to the {@link diff --git a/xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancer.java b/xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancer.java index fd4f49fbb83..0b49bfaa4fb 100644 --- a/xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancer.java @@ -175,6 +175,7 @@ public void requestConnection() { @Override public void shutdown() { + System.out.println("calling shutdown in ClusterImplLoadBalancer"); if (dropStats != null) { dropStats.release(); } @@ -346,6 +347,7 @@ private void updateMaxConcurrentRequests(@Nullable Long maxConcurrentRequests) { } private void updateSslContextProviderSupplier(@Nullable UpstreamTlsContext tlsContext) { + System.out.println("calling updateSslContextProviderSupplier in ClusterImplLoadBalancer"); UpstreamTlsContext currentTlsContext = sslContextProviderSupplier != null ? (UpstreamTlsContext)sslContextProviderSupplier.getTlsContext() diff --git a/xds/src/main/java/io/grpc/xds/XdsServerWrapper.java b/xds/src/main/java/io/grpc/xds/XdsServerWrapper.java index 3a9b98ee321..42f04a2e049 100644 --- a/xds/src/main/java/io/grpc/xds/XdsServerWrapper.java +++ b/xds/src/main/java/io/grpc/xds/XdsServerWrapper.java @@ -445,6 +445,7 @@ public void onError(final Status error) { } private void shutdown() { + System.out.println("calling shutdown in XdsServerWrapper"); stopped = true; cleanUpRouteDiscoveryStates(); logger.log(Level.FINE, "Stop watching LDS resource {0}", resourceName); diff --git a/xds/src/main/java/io/grpc/xds/internal/security/SecurityProtocolNegotiators.java b/xds/src/main/java/io/grpc/xds/internal/security/SecurityProtocolNegotiators.java index c34fab74032..f09aab0152f 100644 --- a/xds/src/main/java/io/grpc/xds/internal/security/SecurityProtocolNegotiators.java +++ b/xds/src/main/java/io/grpc/xds/internal/security/SecurityProtocolNegotiators.java @@ -134,6 +134,7 @@ public AsciiString scheme() { @Override public ChannelHandler newHandler(GrpcHttp2ConnectionHandler grpcHandler) { + System.out.println("inside newHandler()"); // check if SslContextProviderSupplier was passed via attributes SslContextProviderSupplier localSslContextProviderSupplier = grpcHandler.getEagAttributes().get(ATTR_SSL_CONTEXT_PROVIDER_SUPPLIER); @@ -202,10 +203,14 @@ public void handlerAdded(ChannelHandlerContext ctx) throws Exception { checkNotNull(grpcHandler, "grpcHandler"); this.grpcHandler = grpcHandler; this.sslContextProviderSupplier = sslContextProviderSupplier; + new Throwable().printStackTrace(); + System.out.println("SecurityProtocolNegotiators.ClientSecurityHandler instance=" + this); } @Override protected void handlerAdded0(final ChannelHandlerContext ctx) { + System.out.println("inside ClientSecurityHandler handlerAdded0()"); + System.out.println("ctx.name=" + ctx.name() + "ctx.handler=" + ctx.handler()); final BufferReadsHandler bufferReads = new BufferReadsHandler(); ctx.pipeline().addBefore(ctx.name(), null, bufferReads); @@ -215,6 +220,7 @@ protected void handlerAdded0(final ChannelHandlerContext ctx) { @Override public void updateSslContext(SslContext sslContext) { if (ctx.isRemoved()) { + System.out.println("ctx.isRemoved() invoked"); return; } logger.log( @@ -222,12 +228,15 @@ public void updateSslContext(SslContext sslContext) { "ClientSecurityHandler.updateSslContext authority={0}, ctx.name={1}", new Object[]{grpcHandler.getAuthority(), ctx.name()}); ChannelHandler handler = - InternalProtocolNegotiators.tls(sslContext).newHandler(grpcHandler); + InternalProtocolNegotiators.ClientTls(sslContext).newHandler(grpcHandler); // Delegate rest of handshake to TLS handler - ctx.pipeline().addAfter(ctx.name(), null, handler); - fireProtocolNegotiationEvent(ctx); - ctx.pipeline().remove(bufferReads); + //if(!ctx.isRemoved()) { + ctx.pipeline().addAfter(ctx.name(), null, handler); + fireProtocolNegotiationEvent(ctx); + ctx.pipeline().remove(bufferReads); + System.out.println("ClientSecurityHandler.updateSslContext invoked"); + //} } @Override @@ -308,14 +317,25 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc ctx.fireUserEventTriggered(pne); return; } else { - ctx.pipeline() + if (ctx.name().contains("ClientSecurityHandler")) { + ctx.pipeline() + .replace( + this, + null, + new ClientSecurityHandler( + grpcHandler, sslContextProviderSupplier)); + ctx.fireUserEventTriggered(pne); + return; + } else { + ctx.pipeline() .replace( this, null, new ServerSecurityHandler( grpcHandler, sslContextProviderSupplier)); - ctx.fireUserEventTriggered(pne); - return; + ctx.fireUserEventTriggered(pne); + return; + } } } else { super.userEventTriggered(ctx, evt); @@ -345,10 +365,13 @@ public void handlerAdded(ChannelHandlerContext ctx) throws Exception { checkNotNull(grpcHandler, "grpcHandler"); this.grpcHandler = grpcHandler; this.sslContextProviderSupplier = sslContextProviderSupplier; + new Throwable().printStackTrace(); + System.out.println("SecurityProtocolNegotiators.ServerSecurityHandler instance=" + this); } @Override protected void handlerAdded0(final ChannelHandlerContext ctx) { + System.out.println("inside ServerSecurityHandler handlerAdded0()"); final BufferReadsHandler bufferReads = new BufferReadsHandler(); ctx.pipeline().addBefore(ctx.name(), null, bufferReads); diff --git a/xds/src/main/java/io/grpc/xds/internal/security/certprovider/FileWatcherCertificateProvider.java b/xds/src/main/java/io/grpc/xds/internal/security/certprovider/FileWatcherCertificateProvider.java index 304124cc7f2..c6dd52fa2fb 100644 --- a/xds/src/main/java/io/grpc/xds/internal/security/certprovider/FileWatcherCertificateProvider.java +++ b/xds/src/main/java/io/grpc/xds/internal/security/certprovider/FileWatcherCertificateProvider.java @@ -90,6 +90,7 @@ final class FileWatcherCertificateProvider extends CertificateProvider implement @Override public void start() { scheduleNextRefreshCertificate(/* delayInSeconds= */0); + System.out.println("Executed start()"); } @Override @@ -101,6 +102,8 @@ public synchronized void close() { scheduledFuture = null; } getWatcher().close(); + //System.out.println("FWCP close()=" + this); + System.out.println("Executed close()"); } private synchronized void scheduleNextRefreshCertificate(long delayInSeconds) {