xds: Implement XdsServingStatusListener as per the new xDS server gRFC (#7876)

This commit is contained in:
sanjaypujare 2021-02-12 09:22:26 -08:00 committed by GitHub
parent 7b70161eef
commit 8030c3a11d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 136 additions and 90 deletions

View File

@ -104,7 +104,6 @@ public final class XdsClientWrapperForServerSds {
throw new XdsInitializationException("No management server provided by bootstrap");
}
} catch (XdsInitializationException e) {
reportError(Status.fromThrowable(e));
throw new IOException(e);
}
Node node = bootstrapInfo.getNode();
@ -116,9 +115,7 @@ public final class XdsClientWrapperForServerSds {
newServerApi = serverInfo.isUseProtocolV3() && experimentalNewServerApiEnvVar;
String grpcServerResourceId = bootstrapInfo.getGrpcServerResourceId();
if (newServerApi && grpcServerResourceId == null) {
reportError(
Status.INVALID_ARGUMENT.withDescription("missing grpc_server_resource_name_id value"));
throw new IOException("missing grpc_server_resource_name_id value");
throw new IOException("missing grpc_server_resource_name_id value in xds bootstrap");
}
XdsClient xdsClientImpl =
new ServerXdsClient(
@ -152,14 +149,14 @@ public final class XdsClientWrapperForServerSds {
public void onResourceDoesNotExist(String resourceName) {
logger.log(Level.WARNING, "Resource {0} is unavailable", resourceName);
curListener = null;
reportError(Status.NOT_FOUND.withDescription(resourceName));
reportError(Status.NOT_FOUND.withDescription(resourceName).asException());
}
@Override
public void onError(Status error) {
logger.log(
Level.WARNING, "ListenerWatcher in XdsClientWrapperForServerSds: {0}", error);
reportError(error);
reportError(error.asException());
}
};
xdsClient.watchListenerData(port, listenerWatcher);
@ -225,9 +222,9 @@ public final class XdsClientWrapperForServerSds {
}
}
private void reportError(Status status) {
private void reportError(Throwable throwable) {
for (ServerWatcher watcher : getServerWatchers()) {
watcher.onError(status);
watcher.onError(throwable);
}
}
@ -249,7 +246,7 @@ public final class XdsClientWrapperForServerSds {
public interface ServerWatcher {
/** Called to report errors from the control plane including "not found". */
void onError(Status error);
void onError(Throwable throwable);
/** Called to report successful receipt of server config. */
void onSuccess(DownstreamTlsContext downstreamTlsContext);

View File

@ -16,6 +16,7 @@
package io.grpc.xds;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import com.google.common.annotations.VisibleForTesting;
@ -26,7 +27,6 @@ import io.grpc.Internal;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.ServerCredentials;
import io.grpc.Status;
import io.grpc.netty.InternalNettyServerBuilder;
import io.grpc.netty.NettyServerBuilder;
import io.grpc.xds.internal.sds.SdsProtocolNegotiators;
@ -42,12 +42,13 @@ public final class XdsServerBuilder extends ForwardingServerBuilder<XdsServerBui
private final NettyServerBuilder delegate;
private final int port;
private ErrorNotifier errorNotifier;
private XdsServingStatusListener xdsServingStatusListener;
private AtomicBoolean isServerBuilt = new AtomicBoolean(false);
private XdsServerBuilder(NettyServerBuilder nettyDelegate, int port) {
this.delegate = nettyDelegate;
this.port = port;
xdsServingStatusListener = new DefaultListener("port:" + port);
}
@Override
@ -56,9 +57,11 @@ public final class XdsServerBuilder extends ForwardingServerBuilder<XdsServerBui
return delegate;
}
/** Set the {@link ErrorNotifier}. Pass null to unset a previously set value. */
public XdsServerBuilder errorNotifier(ErrorNotifier errorNotifier) {
this.errorNotifier = errorNotifier;
/** Set the {@link XdsServingStatusListener}. */
public XdsServerBuilder xdsServingStatusListener(
XdsServingStatusListener xdsServingStatusListener) {
this.xdsServingStatusListener =
checkNotNull(xdsServingStatusListener, "xdsServingStatusListener");
return this;
}
@ -91,7 +94,7 @@ public final class XdsServerBuilder extends ForwardingServerBuilder<XdsServerBui
InternalNettyServerBuilder.eagAttributes(delegate, Attributes.newBuilder()
.set(SdsProtocolNegotiators.SERVER_XDS_CLIENT, xdsClient)
.build());
return new ServerWrapperForXds(delegate.build(), xdsClient, errorNotifier);
return new ServerWrapperForXds(delegate.build(), xdsClient, xdsServingStatusListener);
}
public ServerBuilder<?> transportBuilder() {
@ -99,8 +102,40 @@ public final class XdsServerBuilder extends ForwardingServerBuilder<XdsServerBui
}
/** Watcher to receive error notifications from xDS control plane during {@code start()}. */
public interface ErrorNotifier {
public interface XdsServingStatusListener {
void onError(Status error);
/** Callback invoked when server begins serving. */
void onServing();
/** Callback invoked when server is forced to be "not serving" due to an error.
* @param throwable cause of the error
*/
void onNotServing(Throwable throwable);
}
/** Default implementation that logs at WARNING level. */
private static class DefaultListener implements XdsServingStatusListener {
XdsLogger xdsLogger;
boolean notServing;
DefaultListener(String prefix) {
xdsLogger = XdsLogger.withPrefix(prefix);
notServing = true;
}
/** Log calls to onServing() following a call to onNotServing() at WARNING level. */
@Override
public void onServing() {
if (notServing) {
notServing = false;
xdsLogger.log(XdsLogger.XdsLogLevel.WARNING, "Entering serving state.");
}
}
@Override
public void onNotServing(Throwable throwable) {
xdsLogger.log(XdsLogger.XdsLogLevel.WARNING, throwable.getMessage());
notServing = true;
}
}
}

View File

@ -23,7 +23,6 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.SettableFuture;
import io.grpc.Server;
import io.grpc.ServerServiceDefinition;
import io.grpc.Status;
import io.grpc.xds.EnvoyServerProtoData;
import io.grpc.xds.XdsClientWrapperForServerSds;
import io.grpc.xds.XdsServerBuilder;
@ -45,7 +44,7 @@ import javax.annotation.Nullable;
public final class ServerWrapperForXds extends Server {
private final Server delegate;
private final XdsClientWrapperForServerSds xdsClientWrapperForServerSds;
@Nullable XdsServerBuilder.ErrorNotifier errorNotifier;
private XdsServerBuilder.XdsServingStatusListener xdsServingStatusListener;
@Nullable XdsClientWrapperForServerSds.ServerWatcher serverWatcher;
private AtomicBoolean started = new AtomicBoolean();
@ -53,11 +52,12 @@ public final class ServerWrapperForXds extends Server {
public ServerWrapperForXds(
Server delegate,
XdsClientWrapperForServerSds xdsClientWrapperForServerSds,
@Nullable XdsServerBuilder.ErrorNotifier errorNotifier) {
XdsServerBuilder.XdsServingStatusListener xdsServingStatusListener) {
this.delegate = checkNotNull(delegate, "delegate");
this.xdsClientWrapperForServerSds =
checkNotNull(xdsClientWrapperForServerSds, "xdsClientWrapperForServerSds");
this.errorNotifier = errorNotifier;
this.xdsServingStatusListener =
checkNotNull(xdsServingStatusListener, "xdsServingStatusListener");
}
@Override
@ -77,6 +77,7 @@ public final class ServerWrapperForXds extends Server {
throw new RuntimeException(ex);
}
delegate.start();
xdsServingStatusListener.onServing();
return this;
}
@ -86,15 +87,12 @@ public final class ServerWrapperForXds extends Server {
serverWatcher =
new XdsClientWrapperForServerSds.ServerWatcher() {
@Override
public void onError(Status error) {
if (errorNotifier != null) {
errorNotifier.onError(error);
}
public void onError(Throwable throwable) {
xdsServingStatusListener.onNotServing(throwable);
}
@Override
public void onSuccess(EnvoyServerProtoData.DownstreamTlsContext downstreamTlsContext) {
removeServerWatcher();
settableFuture.set(downstreamTlsContext);
}
};

View File

@ -28,6 +28,7 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import io.grpc.Status;
import io.grpc.StatusException;
import io.grpc.inprocess.InProcessSocketAddress;
import io.grpc.xds.EnvoyServerProtoData.DownstreamTlsContext;
import io.grpc.xds.internal.sds.CommonTlsContextTestsUtil;
@ -171,12 +172,19 @@ public class XdsClientWrapperForServerSdsTestMisc {
mock(XdsClientWrapperForServerSds.ServerWatcher.class);
xdsClientWrapperForServerSds.addServerWatcher(mockServerWatcher);
registeredWatcher.onError(Status.INTERNAL);
verify(mockServerWatcher).onError(eq(Status.INTERNAL));
ArgumentCaptor<Throwable> argCaptor = ArgumentCaptor.forClass(null);
verify(mockServerWatcher).onError(argCaptor.capture());
Throwable throwable = argCaptor.getValue();
assertThat(throwable).isInstanceOf(StatusException.class);
Status captured = ((StatusException)throwable).getStatus();
assertThat(captured.getCode()).isEqualTo(Status.Code.INTERNAL);
reset(mockServerWatcher);
registeredWatcher.onResourceDoesNotExist("not-found Error");
ArgumentCaptor<Status> argCaptor = ArgumentCaptor.forClass(null);
verify(mockServerWatcher).onError(argCaptor.capture());
Status captured = argCaptor.getValue();
ArgumentCaptor<Throwable> argCaptor1 = ArgumentCaptor.forClass(null);
verify(mockServerWatcher).onError(argCaptor1.capture());
throwable = argCaptor1.getValue();
assertThat(throwable).isInstanceOf(StatusException.class);
captured = ((StatusException)throwable).getStatus();
assertThat(captured.getCode()).isEqualTo(Status.Code.NOT_FOUND);
assertThat(captured.getDescription()).isEqualTo("not-found Error");
InetAddress ipLocalAddress = InetAddress.getByName("10.1.2.3");
@ -203,14 +211,7 @@ public class XdsClientWrapperForServerSdsTestMisc {
.hasMessageThat()
.contains("Cannot find bootstrap configuration");
}
ArgumentCaptor<Status> argCaptor = ArgumentCaptor.forClass(null);
verify(mockServerWatcher).onError(argCaptor.capture());
Status captured = argCaptor.getValue();
assertThat(captured.getCode()).isEqualTo(Status.Code.UNKNOWN);
assertThat(captured.getCause()).isInstanceOf(XdsInitializationException.class);
assertThat(captured.getCause())
.hasMessageThat()
.contains("Cannot find bootstrap configuration");
verify(mockServerWatcher, never()).onError(any(Throwable.class));
}
private DownstreamTlsContext sendListenerUpdate(

View File

@ -28,6 +28,7 @@ import static org.mockito.Mockito.verify;
import com.google.common.util.concurrent.SettableFuture;
import io.grpc.InsecureServerCredentials;
import io.grpc.Status;
import io.grpc.StatusException;
import io.grpc.testing.GrpcCleanupRule;
import io.grpc.xds.internal.sds.CommonTlsContextTestsUtil;
import io.grpc.xds.internal.sds.ServerWrapperForXds;
@ -62,14 +63,15 @@ public class XdsServerBuilderTest {
private XdsClientWrapperForServerSds xdsClientWrapperForServerSds;
private XdsServerBuilder buildServer(
XdsServerBuilder.ErrorNotifier errorNotifier, boolean injectMockXdsClient)
XdsServerBuilder.XdsServingStatusListener xdsServingStatusListener,
boolean injectMockXdsClient)
throws IOException {
port = XdsServerTestHelper.findFreePort();
XdsServerBuilder builder =
XdsServerBuilder.forPort(
port, XdsServerCredentials.create(InsecureServerCredentials.create()));
if (errorNotifier != null) {
builder = builder.errorNotifier(errorNotifier);
if (xdsServingStatusListener != null) {
builder = builder.xdsServingStatusListener(xdsServingStatusListener);
}
xdsClientWrapperForServerSds = new XdsClientWrapperForServerSds(port);
if (injectMockXdsClient) {
@ -82,7 +84,9 @@ public class XdsServerBuilderTest {
}
private void verifyServer(
Future<Throwable> future, XdsServerBuilder.ErrorNotifier mockErrorNotifier)
Future<Throwable> future,
XdsServerBuilder.XdsServingStatusListener mockXdsServingStatusListener,
Status notServingStatus)
throws InterruptedException, ExecutionException, TimeoutException {
if (future != null) {
Throwable exception = future.get(5, TimeUnit.SECONDS);
@ -93,10 +97,18 @@ public class XdsServerBuilderTest {
InetSocketAddress socketAddress = (InetSocketAddress) list.get(0);
assertThat(socketAddress.getAddress().isAnyLocalAddress()).isTrue();
assertThat(socketAddress.getPort()).isEqualTo(port);
if (mockErrorNotifier != null) {
verify(mockErrorNotifier, never()).onError(any(Status.class));
if (mockXdsServingStatusListener != null) {
if (notServingStatus != null) {
ArgumentCaptor<Throwable> argCaptor = ArgumentCaptor.forClass(null);
verify(mockXdsServingStatusListener, times(1)).onNotServing(argCaptor.capture());
Throwable throwable = argCaptor.getValue();
assertThat(throwable).isInstanceOf(StatusException.class);
assertThat(((StatusException) throwable).getStatus()).isEqualTo(notServingStatus);
} else {
verify(mockXdsServingStatusListener, never()).onNotServing(any(Throwable.class));
verify(mockXdsServingStatusListener, times(1)).onServing();
}
}
assertThat(xdsClientWrapperForServerSds.serverWatchers).isEmpty();
}
private void verifyShutdown() throws InterruptedException {
@ -135,7 +147,7 @@ public class XdsServerBuilderTest {
port,
CommonTlsContextTestsUtil.buildTestInternalDownstreamTlsContext("CERT1", "VA1"),
null);
verifyServer(future, null);
verifyServer(future, null, null);
verifyShutdown();
}
@ -155,125 +167,128 @@ public class XdsServerBuilderTest {
} catch (IllegalStateException expected) {
assertThat(expected).hasMessageThat().contains("Already started");
}
verifyServer(null,null);
verifyServer(null,null, null);
verifyShutdown();
}
@Test
public void xdsServerStartAndShutdownWithErrorNotifier()
public void xdsServerStartAndShutdownWithXdsServingStatusListener()
throws IOException, InterruptedException, TimeoutException, ExecutionException {
XdsServerBuilder.ErrorNotifier mockErrorNotifier = mock(XdsServerBuilder.ErrorNotifier.class);
buildServer(mockErrorNotifier, true);
XdsServerBuilder.XdsServingStatusListener mockXdsServingStatusListener =
mock(XdsServerBuilder.XdsServingStatusListener.class);
buildServer(mockXdsServingStatusListener, true);
Future<Throwable> future = startServerAsync();
XdsServerTestHelper.generateListenerUpdate(
listenerWatcher,
port,
CommonTlsContextTestsUtil.buildTestInternalDownstreamTlsContext("CERT1", "VA1"),
CommonTlsContextTestsUtil.buildTestInternalDownstreamTlsContext("CERT1", "VA1"),
null);
verifyServer(future, mockErrorNotifier);
verifyServer(future, mockXdsServingStatusListener, null);
verifyShutdown();
}
@Test
public void xdsServer_serverWatcher()
throws IOException, InterruptedException, TimeoutException, ExecutionException {
XdsServerBuilder.ErrorNotifier mockErrorNotifier = mock(XdsServerBuilder.ErrorNotifier.class);
buildServer(mockErrorNotifier, true);
XdsServerBuilder.XdsServingStatusListener mockXdsServingStatusListener =
mock(XdsServerBuilder.XdsServingStatusListener.class);
buildServer(mockXdsServingStatusListener, true);
Future<Throwable> future = startServerAsync();
listenerWatcher.onError(Status.ABORTED);
verify(mockErrorNotifier).onError(Status.ABORTED);
ArgumentCaptor<Throwable> argCaptor = ArgumentCaptor.forClass(null);
verify(mockXdsServingStatusListener).onNotServing(argCaptor.capture());
Throwable throwable = argCaptor.getValue();
assertThat(throwable).isInstanceOf(StatusException.class);
Status captured = ((StatusException) throwable).getStatus();
assertThat(captured.getCode()).isEqualTo(Status.Code.ABORTED);
assertThat(xdsClientWrapperForServerSds.serverWatchers).hasSize(1);
assertThat(future.isDone()).isFalse();
reset(mockErrorNotifier);
reset(mockXdsServingStatusListener);
listenerWatcher.onResourceDoesNotExist("not found error");
ArgumentCaptor<Status> argCaptor = ArgumentCaptor.forClass(null);
verify(mockErrorNotifier).onError(argCaptor.capture());
Status captured = argCaptor.getValue();
argCaptor = ArgumentCaptor.forClass(null);
verify(mockXdsServingStatusListener).onNotServing(argCaptor.capture());
throwable = argCaptor.getValue();
assertThat(throwable).isInstanceOf(StatusException.class);
captured = ((StatusException) throwable).getStatus();
assertThat(captured.getCode()).isEqualTo(Status.Code.NOT_FOUND);
assertThat(captured.getDescription()).isEqualTo("not found error");
assertThat(xdsClientWrapperForServerSds.serverWatchers).hasSize(1);
assertThat(future.isDone()).isFalse();
reset(mockErrorNotifier);
reset(mockXdsServingStatusListener);
XdsServerTestHelper.generateListenerUpdate(
listenerWatcher,
port,
CommonTlsContextTestsUtil.buildTestInternalDownstreamTlsContext("CERT1", "VA1"),
CommonTlsContextTestsUtil.buildTestInternalDownstreamTlsContext("CERT1", "VA1"),
null);
verifyServer(future, mockErrorNotifier);
verifyServer(future, mockXdsServingStatusListener, null);
verifyShutdown();
}
@Test
public void xdsServer_startError()
throws IOException, InterruptedException, TimeoutException, ExecutionException {
XdsServerBuilder.ErrorNotifier mockErrorNotifier = mock(XdsServerBuilder.ErrorNotifier.class);
buildServer(mockErrorNotifier, true);
XdsServerBuilder.XdsServingStatusListener mockXdsServingStatusListener =
mock(XdsServerBuilder.XdsServingStatusListener.class);
buildServer(mockXdsServingStatusListener, true);
Future<Throwable> future = startServerAsync();
// create port conflict for start to fail
ServerSocket serverSocket = new ServerSocket(port);
XdsServerTestHelper.generateListenerUpdate(
listenerWatcher,
port,
CommonTlsContextTestsUtil.buildTestInternalDownstreamTlsContext("CERT1", "VA1"),
CommonTlsContextTestsUtil.buildTestInternalDownstreamTlsContext("CERT1", "VA1"),
null);
Throwable exception = future.get(5, TimeUnit.SECONDS);
assertThat(exception).isInstanceOf(IOException.class);
assertThat(exception).hasMessageThat().contains("Failed to bind");
verify(mockErrorNotifier, never()).onError(any(Status.class));
verify(mockXdsServingStatusListener, never()).onNotServing(any(Throwable.class));
serverSocket.close();
}
@Test
public void xdsServerWithoutMockXdsClient_startError()
throws IOException, InterruptedException, TimeoutException, ExecutionException {
XdsServerBuilder.ErrorNotifier mockErrorNotifier = mock(XdsServerBuilder.ErrorNotifier.class);
buildServer(mockErrorNotifier, false);
throws IOException, InterruptedException, TimeoutException, ExecutionException {
XdsServerBuilder.XdsServingStatusListener mockXdsServingStatusListener =
mock(XdsServerBuilder.XdsServingStatusListener.class);
buildServer(mockXdsServingStatusListener, false);
try {
xdsServer.start();
fail("exception expected");
} catch (IOException expected) {
assertThat(expected)
.hasMessageThat()
.contains("Cannot find bootstrap configuration");
assertThat(expected).hasMessageThat().contains("Cannot find bootstrap configuration");
}
ArgumentCaptor<Status> argCaptor = ArgumentCaptor.forClass(null);
verify(mockErrorNotifier).onError(argCaptor.capture());
Status captured = argCaptor.getValue();
assertThat(captured.getCode()).isEqualTo(Status.Code.UNKNOWN);
assertThat(captured.getCause()).isInstanceOf(XdsInitializationException.class);
assertThat(captured.getCause())
.hasMessageThat()
.contains("Cannot find bootstrap configuration");
verify(mockXdsServingStatusListener, never()).onNotServing(any(Throwable.class));
}
@Test
public void xdsServerStartSecondUpdateAndError()
throws IOException, InterruptedException, TimeoutException, ExecutionException {
XdsServerBuilder.ErrorNotifier mockErrorNotifier = mock(XdsServerBuilder.ErrorNotifier.class);
buildServer(mockErrorNotifier, true);
XdsServerBuilder.XdsServingStatusListener mockXdsServingStatusListener =
mock(XdsServerBuilder.XdsServingStatusListener.class);
buildServer(mockXdsServingStatusListener, true);
Future<Throwable> future = startServerAsync();
XdsServerTestHelper.generateListenerUpdate(
listenerWatcher,
port,
CommonTlsContextTestsUtil.buildTestInternalDownstreamTlsContext("CERT1", "VA1"),
CommonTlsContextTestsUtil.buildTestInternalDownstreamTlsContext("CERT1", "VA1"),
null);
XdsServerTestHelper.generateListenerUpdate(
listenerWatcher,
port,
CommonTlsContextTestsUtil.buildTestInternalDownstreamTlsContext("CERT1", "VA1"),
CommonTlsContextTestsUtil.buildTestInternalDownstreamTlsContext("CERT1", "VA1"),
null);
verify(mockErrorNotifier, never()).onError(any(Status.class));
verifyServer(future, mockErrorNotifier);
verify(mockXdsServingStatusListener, never()).onNotServing(any(Throwable.class));
verifyServer(future, mockXdsServingStatusListener, null);
listenerWatcher.onError(Status.ABORTED);
verify(mockErrorNotifier, never()).onError(any(Status.class));
verifyServer(null, mockErrorNotifier);
verifyServer(null, mockXdsServingStatusListener, Status.ABORTED);
verifyShutdown();
}
@Test
public void xdsServer_2ndBuild_expectException() throws IOException {
XdsServerBuilder.ErrorNotifier mockErrorNotifier = mock(XdsServerBuilder.ErrorNotifier.class);
XdsServerBuilder builder = buildServer(mockErrorNotifier, true);
XdsServerBuilder.XdsServingStatusListener mockXdsServingStatusListener =
mock(XdsServerBuilder.XdsServingStatusListener.class);
XdsServerBuilder builder = buildServer(mockXdsServingStatusListener, true);
try {
builder.build();
fail("exception expected");