Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xds: listener type validation #11933

Open
wants to merge 20 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions xds/src/main/java/io/grpc/xds/XdsListenerResource.java
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,12 @@
if (proto.getAddress().hasSocketAddress()) {
SocketAddress socketAddress = proto.getAddress().getSocketAddress();
address = socketAddress.getAddress();
if (address.trim().isEmpty()) {
throw new ResourceInvalidException("Invalid address: Empty address is not allowed.");

Check warning on line 169 in xds/src/main/java/io/grpc/xds/XdsListenerResource.java

View check run for this annotation

Codecov / codecov/patch

xds/src/main/java/io/grpc/xds/XdsListenerResource.java#L169

Added line #L169 was not covered by tests
}
if (socketAddress.hasNamedPort()) {
throw new ResourceInvalidException("NAMED_PORT is not supported in gRPC.");

Check warning on line 172 in xds/src/main/java/io/grpc/xds/XdsListenerResource.java

View check run for this annotation

Codecov / codecov/patch

xds/src/main/java/io/grpc/xds/XdsListenerResource.java#L172

Added line #L172 was not covered by tests
}
switch (socketAddress.getPortSpecifierCase()) {
case NAMED_PORT:
address = address + ":" + socketAddress.getNamedPort();
Expand Down
4 changes: 4 additions & 0 deletions xds/src/main/java/io/grpc/xds/XdsNameResolver.java
Original file line number Diff line number Diff line change
Expand Up @@ -660,6 +660,10 @@ public void onChanged(final XdsListenerResource.LdsUpdate update) {
}
logger.log(XdsLogLevel.INFO, "Receive LDS resource update: {0}", update);
HttpConnectionManager httpConnectionManager = update.httpConnectionManager();
if (httpConnectionManager == null) {
onResourceDoesNotExist("API Listener: httpConnectionManager");
return;
}
List<VirtualHost> virtualHosts = httpConnectionManager.virtualHosts();
String rdsName = httpConnectionManager.rdsName();
ImmutableList<NamedFilterConfig> filterConfigs = httpConnectionManager.httpFilterConfigs();
Expand Down
45 changes: 42 additions & 3 deletions xds/src/main/java/io/grpc/xds/XdsServerWrapper.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.net.HostAndPort;
import com.google.common.net.InetAddresses;
import com.google.common.util.concurrent.SettableFuture;
import io.grpc.Attributes;
import io.grpc.InternalServerInterceptors;
Expand Down Expand Up @@ -57,6 +59,7 @@
import io.grpc.xds.client.XdsClient.ResourceWatcher;
import io.grpc.xds.internal.security.SslContextProviderSupplier;
import java.io.IOException;
import java.net.InetAddress;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.HashMap;
Expand Down Expand Up @@ -383,7 +386,29 @@
return;
}
logger.log(Level.FINEST, "Received Lds update {0}", update);
checkNotNull(update.listener(), "update");
if (update.listener() == null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add unit test for this case as well.

onResourceDoesNotExist("Non-API");
return;

Check warning on line 391 in xds/src/main/java/io/grpc/xds/XdsServerWrapper.java

View check run for this annotation

Codecov / codecov/patch

xds/src/main/java/io/grpc/xds/XdsServerWrapper.java#L390-L391

Added lines #L390 - L391 were not covered by tests
}

boolean isUdpListener = false;
for (FilterChain filterChain : update.listener().filterChains()) {
String transportProtocol = filterChain.filterChainMatch().transportProtocol();
if (transportProtocol != null && !"raw_buffer".equals(transportProtocol)) {
isUdpListener = true;
break;
}
}

String ldsAddress = update.listener().address();
if (!isUdpListener && ldsAddress != null && !ipAddressesMatch(ldsAddress)) {
handleConfigNotFoundOrMismatch(
Status.UNKNOWN.withDescription(
String.format(
"Listener address mismatch: expected %s, but got %s.",
listenerAddress, ldsAddress)).asException());
return;
}
if (!pendingRds.isEmpty()) {
// filter chain state has not yet been applied to filterChainSelectorManager and there
// are two sets of sslContextProviderSuppliers, so we release the old ones.
Expand Down Expand Up @@ -432,6 +457,20 @@
}
}

private boolean ipAddressesMatch(String ldsAddress) {
HostAndPort ldsAddressHnP = HostAndPort.fromString(ldsAddress);
HostAndPort listenerAddressHnP = HostAndPort.fromString(listenerAddress);

InetAddress listenerIp = InetAddresses.forString(listenerAddressHnP.getHost());
InetAddress ldsIp = InetAddresses.forString(ldsAddressHnP.getHost());
if (!ldsAddressHnP.hasPort() || !listenerAddressHnP.hasPort()
|| ldsAddressHnP.getPort() != listenerAddressHnP.getPort()) {
return false;

Check warning on line 468 in xds/src/main/java/io/grpc/xds/XdsServerWrapper.java

View check run for this annotation

Codecov / codecov/patch

xds/src/main/java/io/grpc/xds/XdsServerWrapper.java#L468

Added line #L468 was not covered by tests
}
Comment on lines +459 to +462
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we have a unit test for this if block?


return listenerIp.equals(ldsIp);
}

@Override
public void onResourceDoesNotExist(final String resourceName) {
if (stopped) {
Expand All @@ -440,7 +479,7 @@
StatusException statusException = Status.UNAVAILABLE.withDescription(
String.format("Listener %s unavailable, xDS node ID: %s", resourceName,
xdsClient.getBootstrapInfo().node().getId())).asException();
handleConfigNotFound(statusException);
handleConfigNotFoundOrMismatch(statusException);
}

@Override
Expand Down Expand Up @@ -673,7 +712,7 @@
};
}

private void handleConfigNotFound(StatusException exception) {
private void handleConfigNotFoundOrMismatch(StatusException exception) {
cleanUpRouteDiscoveryStates();
shutdownActiveFilters();
List<SslContextProviderSupplier> toRelease = getSuppliersInUse();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ public void run() {
EnvoyServerProtoData.Listener tcpListener =
EnvoyServerProtoData.Listener.create(
"listener1",
"10.1.2.3",
"0.0.0.0:7000",
ImmutableList.of(),
null);
LdsUpdate listenerUpdate = LdsUpdate.forTcpListener(tcpListener);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -592,7 +592,7 @@ private void buildServer(
tlsContextManagerForServer = new TlsContextManagerImpl(bootstrapInfoForServer);
XdsServerWrapper xdsServer = (XdsServerWrapper) builder.build();
SettableFuture<Throwable> startFuture = startServerAsync(xdsServer);
EnvoyServerProtoData.Listener listener = buildListener("listener1", "10.1.2.3",
EnvoyServerProtoData.Listener listener = buildListener("listener1", "0.0.0.0:0",
downstreamTlsContext, tlsContextManagerForServer);
LdsUpdate listenerUpdate = LdsUpdate.forTcpListener(listener);
xdsClient.deliverLdsUpdate(listenerUpdate);
Expand Down
12 changes: 9 additions & 3 deletions xds/src/test/java/io/grpc/xds/XdsServerBuilderTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package io.grpc.xds;

import static com.google.common.truth.Truth.assertThat;
import static io.grpc.xds.XdsServerTestHelper.buildTestListener;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.mock;
Expand All @@ -26,13 +27,15 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.SettableFuture;
import io.grpc.BindableService;
import io.grpc.InsecureServerCredentials;
import io.grpc.ServerServiceDefinition;
import io.grpc.Status;
import io.grpc.StatusException;
import io.grpc.testing.GrpcCleanupRule;
import io.grpc.xds.XdsListenerResource.LdsUpdate;
import io.grpc.xds.XdsServerTestHelper.FakeXdsClient;
import io.grpc.xds.XdsServerTestHelper.FakeXdsClientPoolFactory;
import io.grpc.xds.internal.security.CommonTlsContextTestsUtil;
Expand Down Expand Up @@ -221,10 +224,13 @@ public void xdsServer_startError()
buildServer(mockXdsServingStatusListener);
Future<Throwable> future = startServerAsync();
// create port conflict for start to fail
XdsServerTestHelper.generateListenerUpdate(
xdsClient,
EnvoyServerProtoData.Listener listener = buildTestListener(
"listener1", "0.0.0.0:" + port, ImmutableList.of(),
CommonTlsContextTestsUtil.buildTestInternalDownstreamTlsContext("CERT1", "VA1"),
tlsContextManager);
null, tlsContextManager);
LdsUpdate listenerUpdate = LdsUpdate.forTcpListener(listener);
xdsClient.deliverLdsUpdate(listenerUpdate);

Throwable exception = future.get(5, TimeUnit.SECONDS);
assertThat(exception).isInstanceOf(IOException.class);
assertThat(exception).hasMessageThat().contains("Failed to bind");
Expand Down
5 changes: 3 additions & 2 deletions xds/src/test/java/io/grpc/xds/XdsServerTestHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public class XdsServerTestHelper {
static void generateListenerUpdate(FakeXdsClient xdsClient,
EnvoyServerProtoData.DownstreamTlsContext tlsContext,
TlsContextManager tlsContextManager) {
EnvoyServerProtoData.Listener listener = buildTestListener("listener1", "10.1.2.3",
EnvoyServerProtoData.Listener listener = buildTestListener("listener1", "0.0.0.0:0",
ImmutableList.of(), tlsContext, null, tlsContextManager);
LdsUpdate listenerUpdate = LdsUpdate.forTcpListener(listener);
xdsClient.deliverLdsUpdate(listenerUpdate);
Expand All @@ -85,7 +85,8 @@ static void generateListenerUpdate(
EnvoyServerProtoData.DownstreamTlsContext tlsContext,
EnvoyServerProtoData.DownstreamTlsContext tlsContextForDefaultFilterChain,
TlsContextManager tlsContextManager) {
EnvoyServerProtoData.Listener listener = buildTestListener("listener1", "10.1.2.3", sourcePorts,
EnvoyServerProtoData.Listener listener = buildTestListener(
"listener1", "0.0.0.0:7000", sourcePorts,
tlsContext, tlsContextForDefaultFilterChain, tlsContextManager);
LdsUpdate listenerUpdate = LdsUpdate.forTcpListener(listener);
xdsClient.deliverLdsUpdate(listenerUpdate);
Expand Down
44 changes: 43 additions & 1 deletion xds/src/test/java/io/grpc/xds/XdsServerWrapperTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,15 @@
import io.grpc.xds.EnvoyServerProtoData.CidrRange;
import io.grpc.xds.EnvoyServerProtoData.FilterChain;
import io.grpc.xds.EnvoyServerProtoData.FilterChainMatch;
import io.grpc.xds.EnvoyServerProtoData.Listener;
import io.grpc.xds.Filter.FilterConfig;
import io.grpc.xds.Filter.NamedFilterConfig;
import io.grpc.xds.FilterChainMatchingProtocolNegotiators.FilterChainMatchingHandler.FilterChainSelector;
import io.grpc.xds.StatefulFilter.Config;
import io.grpc.xds.VirtualHost.Route;
import io.grpc.xds.VirtualHost.Route.RouteMatch;
import io.grpc.xds.VirtualHost.Route.RouteMatch.PathMatcher;
import io.grpc.xds.XdsListenerResource.LdsUpdate;
import io.grpc.xds.XdsRouteConfigureResource.RdsUpdate;
import io.grpc.xds.XdsServerBuilder.XdsServingStatusListener;
import io.grpc.xds.XdsServerTestHelper.FakeXdsClient;
Expand Down Expand Up @@ -538,6 +540,46 @@ public void run() {
verify(mockServer).start();
}

@Test
public void onChanged_listenerAddressMismatch()
throws ExecutionException, InterruptedException, TimeoutException {

when(mockBuilder.build()).thenReturn(mockServer);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is already done in @Before. Delete it?

xdsServerWrapper = new XdsServerWrapper("10.1.2.3:1", mockBuilder, listener,
selectorManager, new FakeXdsClientPoolFactory(xdsClient),
filterRegistry, executor.getScheduledExecutorService());

final SettableFuture<Server> start = SettableFuture.create();
Executors.newSingleThreadExecutor().execute(new Runnable() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This thread is going to be hanging and outlive the test. Run xdsServerWrapper.shutdownNow() at the end to let the start() return. Ideally you'd wait for the start future to complete as well (which will throw an exception), so you know the thread has (essentially) returned.

Copy link
Member Author

@shivaspeaks shivaspeaks Mar 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a teardown happening after every test in @After which does xdsServerWrapper.shutdownNow(). So I doubt if we should be adding in the test as well!

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, shutdownNow can be called twice. I'm not wild about the lifecycles in the test as we won't notice if it is broken. But I think a lot of these tests are bad already, so let's call it good enough as-is.

@Override
public void run() {
try {
start.set(xdsServerWrapper.start());
} catch (Exception ex) {
start.setException(ex);
}
}
});
String ldsResource = xdsClient.ldsResource.get(5, TimeUnit.SECONDS);
assertThat(ldsResource).isEqualTo("grpc/server?udpa.resource.listening_address=10.1.2.3:1");

VirtualHost virtualHost =
VirtualHost.create(
"virtual-host", Collections.singletonList("auth"), new ArrayList<Route>(),
ImmutableMap.<String, FilterConfig>of());
HttpConnectionManager httpConnectionManager = HttpConnectionManager.forVirtualHosts(
0L, Collections.singletonList(virtualHost), new ArrayList<NamedFilterConfig>());
EnvoyServerProtoData.FilterChain filterChain = EnvoyServerProtoData.FilterChain.create(
"filter-chain-foo", createMatch(), httpConnectionManager, createTls(),
mock(TlsContextManager.class));

LdsUpdate listenerUpdate = LdsUpdate.forTcpListener(
Listener.create("listener", "20.3.4.5:1",
ImmutableList.copyOf(Collections.singletonList(filterChain)), null));
xdsClient.deliverLdsUpdate(listenerUpdate);
verify(listener, timeout(10000)).onNotServing(any());
}

@Test
public void discoverState_rds() throws Exception {
final SettableFuture<Server> start = SettableFuture.create();
Expand Down Expand Up @@ -1820,7 +1862,7 @@ private static FilterChainMatch createMatch() {
EnvoyServerProtoData.ConnectionSourceType.ANY,
ImmutableList.of(),
ImmutableList.of(),
"");
"raw_buffer");
}

private static FilterChainMatch createMatchSrcIp(String srcCidr) {
Expand Down