From 0eefa5263b79548dc2b62e192e15b15428bd6241 Mon Sep 17 00:00:00 2001 From: Eric Anderson Date: Fri, 20 Jul 2018 16:36:37 -0700 Subject: [PATCH] inprocess: Add maxInboundMetadataSize --- .../inprocess/InProcessChannelBuilder.java | 33 ++++- .../io/grpc/inprocess/InProcessServer.java | 12 +- .../inprocess/InProcessServerBuilder.java | 26 +++- .../io/grpc/inprocess/InProcessTransport.java | 118 +++++++++++++++--- .../grpc/inprocess/InProcessServerTest.java | 9 +- .../inprocess/InProcessTransportTest.java | 29 +---- 6 files changed, 175 insertions(+), 52 deletions(-) diff --git a/core/src/main/java/io/grpc/inprocess/InProcessChannelBuilder.java b/core/src/main/java/io/grpc/inprocess/InProcessChannelBuilder.java index 8606e75ac6..92be845d3b 100644 --- a/core/src/main/java/io/grpc/inprocess/InProcessChannelBuilder.java +++ b/core/src/main/java/io/grpc/inprocess/InProcessChannelBuilder.java @@ -16,6 +16,7 @@ package io.grpc.inprocess; +import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; import io.grpc.ExperimentalApi; @@ -67,6 +68,7 @@ public final class InProcessChannelBuilder extends private final String name; private ScheduledExecutorService scheduledExecutorService; + private int maxInboundMetadataSize = Integer.MAX_VALUE; private InProcessChannelBuilder(String name) { super(new InProcessSocketAddress(name), "localhost"); @@ -145,10 +147,30 @@ public final class InProcessChannelBuilder extends return this; } + /** + * Sets the maximum size of metadata allowed to be received. {@code Integer.MAX_VALUE} disables + * the enforcement. Defaults to no limit ({@code Integer.MAX_VALUE}). + * + *

There is potential for performance penalty when this setting is enabled, as the Metadata + * must actually be serialized. Since the current implementation of Metadata pre-serializes, it's + * currently negligible. But Metadata is free to change its implementation. + * + * @param bytes the maximum size of received metadata + * @return this + * @throws IllegalArgumentException if bytes is non-positive + * @since 1.17.0 + */ + public InProcessChannelBuilder maxInboundMetadataSize(int bytes) { + checkArgument(bytes > 0, "maxInboundMetadataSize must be > 0"); + this.maxInboundMetadataSize = bytes; + return this; + } + @Override @Internal protected ClientTransportFactory buildTransportFactory() { - return new InProcessClientTransportFactory(name, scheduledExecutorService); + return new InProcessClientTransportFactory( + name, scheduledExecutorService, maxInboundMetadataSize); } /** @@ -158,14 +180,18 @@ public final class InProcessChannelBuilder extends private final String name; private final ScheduledExecutorService timerService; private final boolean useSharedTimer; + private final int maxInboundMetadataSize; private boolean closed; private InProcessClientTransportFactory( - String name, @Nullable ScheduledExecutorService scheduledExecutorService) { + String name, + @Nullable ScheduledExecutorService scheduledExecutorService, + int maxInboundMetadataSize) { this.name = name; useSharedTimer = scheduledExecutorService == null; timerService = useSharedTimer ? SharedResourceHolder.get(GrpcUtil.TIMER_SERVICE) : scheduledExecutorService; + this.maxInboundMetadataSize = maxInboundMetadataSize; } @Override @@ -174,7 +200,8 @@ public final class InProcessChannelBuilder extends if (closed) { throw new IllegalStateException("The transport factory is closed."); } - return new InProcessTransport(name, options.getAuthority(), options.getUserAgent()); + return new InProcessTransport( + name, maxInboundMetadataSize, options.getAuthority(), options.getUserAgent()); } @Override diff --git a/core/src/main/java/io/grpc/inprocess/InProcessServer.java b/core/src/main/java/io/grpc/inprocess/InProcessServer.java index 93709a07a9..3362cac70c 100644 --- a/core/src/main/java/io/grpc/inprocess/InProcessServer.java +++ b/core/src/main/java/io/grpc/inprocess/InProcessServer.java @@ -44,6 +44,7 @@ final class InProcessServer implements InternalServer { } private final String name; + private final int maxInboundMetadataSize; private final List streamTracerFactories; private ServerListener listener; private boolean shutdown; @@ -56,10 +57,11 @@ final class InProcessServer implements InternalServer { private ScheduledExecutorService scheduler; InProcessServer( - String name, ObjectPool schedulerPool, + InProcessServerBuilder builder, List streamTracerFactories) { - this.name = name; - this.schedulerPool = schedulerPool; + this.name = builder.name; + this.schedulerPool = builder.schedulerPool; + this.maxInboundMetadataSize = builder.maxInboundMetadataSize; this.streamTracerFactories = Collections.unmodifiableList(checkNotNull(streamTracerFactories, "streamTracerFactories")); } @@ -112,6 +114,10 @@ final class InProcessServer implements InternalServer { return schedulerPool; } + int getMaxInboundMetadataSize() { + return maxInboundMetadataSize; + } + List getStreamTracerFactories() { return streamTracerFactories; } diff --git a/core/src/main/java/io/grpc/inprocess/InProcessServerBuilder.java b/core/src/main/java/io/grpc/inprocess/InProcessServerBuilder.java index 205eb4bddf..4e461a544e 100644 --- a/core/src/main/java/io/grpc/inprocess/InProcessServerBuilder.java +++ b/core/src/main/java/io/grpc/inprocess/InProcessServerBuilder.java @@ -91,8 +91,9 @@ public final class InProcessServerBuilder return UUID.randomUUID().toString(); } - private final String name; - private ObjectPool schedulerPool = + final String name; + int maxInboundMetadataSize = Integer.MAX_VALUE; + ObjectPool schedulerPool = SharedResourcePool.forResource(GrpcUtil.TIMER_SERVICE); private InProcessServerBuilder(String name) { @@ -123,10 +124,29 @@ public final class InProcessServerBuilder return this; } + /** + * Sets the maximum size of metadata allowed to be received. {@code Integer.MAX_VALUE} disables + * the enforcement. Defaults to no limit ({@code Integer.MAX_VALUE}). + * + *

There is potential for performance penalty when this setting is enabled, as the Metadata + * must actually be serialized. Since the current implementation of Metadata pre-serializes, it's + * currently negligible. But Metadata is free to change its implementation. + * + * @param bytes the maximum size of received metadata + * @return this + * @throws IllegalArgumentException if bytes is non-positive + * @since 1.17.0 + */ + public InProcessServerBuilder maxInboundMetadataSize(int bytes) { + Preconditions.checkArgument(bytes > 0, "maxInboundMetadataSize must be > 0"); + this.maxInboundMetadataSize = bytes; + return this; + } + @Override protected InProcessServer buildTransportServer( List streamTracerFactories) { - return new InProcessServer(name, schedulerPool, streamTracerFactories); + return new InProcessServer(this, streamTracerFactories); } @Override diff --git a/core/src/main/java/io/grpc/inprocess/InProcessTransport.java b/core/src/main/java/io/grpc/inprocess/InProcessTransport.java index 6f9c3b3cf5..ec25bc3430 100644 --- a/core/src/main/java/io/grpc/inprocess/InProcessTransport.java +++ b/core/src/main/java/io/grpc/inprocess/InProcessTransport.java @@ -32,6 +32,7 @@ import io.grpc.DecompressorRegistry; import io.grpc.Grpc; import io.grpc.InternalChannelz.SocketStats; import io.grpc.InternalLogId; +import io.grpc.InternalMetadata; import io.grpc.Metadata; import io.grpc.MethodDescriptor; import io.grpc.SecurityLevel; @@ -74,8 +75,10 @@ final class InProcessTransport implements ServerTransport, ConnectionClientTrans private final InternalLogId logId = InternalLogId.allocate(getClass().getName()); private final String name; + private final int clientMaxInboundMetadataSize; private final String authority; private final String userAgent; + private int serverMaxInboundMetadataSize; private ObjectPool serverSchedulerPool; private ScheduledExecutorService serverScheduler; private ServerTransportListener serverTransportListener; @@ -108,8 +111,10 @@ final class InProcessTransport implements ServerTransport, ConnectionClientTrans } }; - public InProcessTransport(String name, String authority, String userAgent) { + public InProcessTransport( + String name, int maxInboundMetadataSize, String authority, String userAgent) { this.name = name; + this.clientMaxInboundMetadataSize = maxInboundMetadataSize; this.authority = authority; this.userAgent = GrpcUtil.getGrpcUserAgent("inprocess", userAgent); } @@ -120,6 +125,7 @@ final class InProcessTransport implements ServerTransport, ConnectionClientTrans this.clientTransportListener = listener; InProcessServer server = InProcessServer.findServer(name); if (server != null) { + serverMaxInboundMetadataSize = server.getMaxInboundMetadataSize(); serverSchedulerPool = server.getScheduledExecutorServicePool(); serverScheduler = serverSchedulerPool.getObject(); serverStreamTracerFactories = server.getStreamTracerFactories(); @@ -159,20 +165,43 @@ final class InProcessTransport implements ServerTransport, ConnectionClientTrans public synchronized ClientStream newStream( final MethodDescriptor method, final Metadata headers, final CallOptions callOptions) { if (shutdownStatus != null) { - final Status capturedStatus = shutdownStatus; - final StatsTraceContext statsTraceCtx = - StatsTraceContext.newClientContext(callOptions, headers); - return new NoopClientStream() { + return failedClientStream( + StatsTraceContext.newClientContext(callOptions, headers), shutdownStatus); + } + + headers.put(GrpcUtil.USER_AGENT_KEY, userAgent); + + if (serverMaxInboundMetadataSize != Integer.MAX_VALUE) { + int metadataSize = metadataSize(headers); + if (metadataSize > serverMaxInboundMetadataSize) { + // Other transports would compute a status with: + // GrpcUtil.httpStatusToGrpcStatus(431 /* Request Header Fields Too Large */); + // However, that isn't handled specially today, so we'd leak HTTP-isms even though we're + // in-process. We go ahead and make a Status, which may need to be updated if + // statuscodes.md is updated. + Status status = Status.RESOURCE_EXHAUSTED.withDescription( + String.format( + "Request metadata larger than %d: %d", + serverMaxInboundMetadataSize, + metadataSize)); + return failedClientStream( + StatsTraceContext.newClientContext(callOptions, headers), status); + } + } + + return new InProcessStream(method, headers, callOptions, authority).clientStream; + } + + private ClientStream failedClientStream( + final StatsTraceContext statsTraceCtx, final Status status) { + return new NoopClientStream() { @Override public void start(ClientStreamListener listener) { statsTraceCtx.clientOutboundHeaders(); - statsTraceCtx.streamClosed(capturedStatus); - listener.closed(capturedStatus, new Metadata()); + statsTraceCtx.streamClosed(status); + listener.closed(status, new Metadata()); } }; - } - headers.put(GrpcUtil.USER_AGENT_KEY, userAgent); - return new InProcessStream(method, headers, callOptions, authority).clientStream; } @Override @@ -281,6 +310,21 @@ final class InProcessTransport implements ServerTransport, ConnectionClientTrans } } + private static int metadataSize(Metadata metadata) { + byte[][] serialized = InternalMetadata.serialize(metadata); + if (serialized == null) { + return 0; + } + // Calculate based on SETTINGS_MAX_HEADER_LIST_SIZE in RFC 7540 ยง6.5.2. We could use something + // different, but it's "sane." + long size = 0; + for (int i = 0; i < serialized.length; i += 2) { + size += 32 + serialized[i].length + serialized[i + 1].length; + } + size = Math.min(size, Integer.MAX_VALUE); + return (int) size; + } + private class InProcessStream { private final InProcessClientStream clientStream; private final InProcessServerStream serverStream; @@ -424,12 +468,32 @@ final class InProcessTransport implements ServerTransport, ConnectionClientTrans } @Override - public synchronized void writeHeaders(Metadata headers) { - if (closed) { - return; + public void writeHeaders(Metadata headers) { + if (clientMaxInboundMetadataSize != Integer.MAX_VALUE) { + int metadataSize = metadataSize(headers); + if (metadataSize > clientMaxInboundMetadataSize) { + Status serverStatus = Status.CANCELLED.withDescription("Client cancelled the RPC"); + clientStream.serverClosed(serverStatus, serverStatus); + // Other transports provide very little information in this case. We go ahead and make a + // Status, which may need to be updated if statuscodes.md is updated. + Status failedStatus = Status.RESOURCE_EXHAUSTED.withDescription( + String.format( + "Response header metadata larger than %d: %d", + clientMaxInboundMetadataSize, + metadataSize)); + notifyClientClose(failedStatus, new Metadata()); + return; + } + } + + synchronized (this) { + if (closed) { + return; + } + + clientStream.statsTraceCtx.clientInboundHeaders(); + clientStreamListener.headersRead(headers); } - clientStream.statsTraceCtx.clientInboundHeaders(); - clientStreamListener.headersRead(headers); } @Override @@ -440,6 +504,30 @@ final class InProcessTransport implements ServerTransport, ConnectionClientTrans // calling internalCancel(). clientStream.serverClosed(Status.OK, status); + if (clientMaxInboundMetadataSize != Integer.MAX_VALUE) { + int statusSize = status.getDescription() == null ? 0 : status.getDescription().length(); + // Go ahead and throw in the status description's length, since that could be very long. + int metadataSize = metadataSize(trailers) + statusSize; + if (metadataSize > clientMaxInboundMetadataSize) { + // Override the status for the client, but not the server. Transports do not guarantee + // notifying the server of the failure. + + // Other transports provide very little information in this case. We go ahead and make a + // Status, which may need to be updated if statuscodes.md is updated. + status = Status.RESOURCE_EXHAUSTED.withDescription( + String.format( + "Response header metadata larger than %d: %d", + clientMaxInboundMetadataSize, + metadataSize)); + trailers = new Metadata(); + } + } + + notifyClientClose(status, trailers); + } + + /** clientStream.serverClosed() must be called before this method */ + private void notifyClientClose(Status status, Metadata trailers) { Status clientStatus = stripCause(status); synchronized (this) { if (closed) { diff --git a/core/src/test/java/io/grpc/inprocess/InProcessServerTest.java b/core/src/test/java/io/grpc/inprocess/InProcessServerTest.java index 2a45f15d9e..c988770541 100644 --- a/core/src/test/java/io/grpc/inprocess/InProcessServerTest.java +++ b/core/src/test/java/io/grpc/inprocess/InProcessServerTest.java @@ -19,12 +19,10 @@ package io.grpc.inprocess; import com.google.common.truth.Truth; import io.grpc.ServerStreamTracer; import io.grpc.internal.FakeClock; -import io.grpc.internal.GrpcUtil; import io.grpc.internal.ObjectPool; import io.grpc.internal.ServerListener; import io.grpc.internal.ServerTransport; import io.grpc.internal.ServerTransportListener; -import io.grpc.internal.SharedResourcePool; import java.util.Collections; import java.util.concurrent.ScheduledExecutorService; import org.junit.Test; @@ -33,12 +31,12 @@ import org.junit.runners.JUnit4; @RunWith(JUnit4.class) public class InProcessServerTest { + private InProcessServerBuilder builder = InProcessServerBuilder.forName("name"); @Test public void getPort_notStarted() throws Exception { InProcessServer s = - new InProcessServer("name", SharedResourcePool.forResource(GrpcUtil.TIMER_SERVICE), - Collections.emptyList()); + new InProcessServer(builder, Collections.emptyList()); Truth.assertThat(s.getPort()).isEqualTo(-1); } @@ -63,8 +61,9 @@ public class InProcessServerTest { } RefCountingObjectPool pool = new RefCountingObjectPool(); + builder.schedulerPool = pool; InProcessServer s = - new InProcessServer("name", pool, Collections.emptyList()); + new InProcessServer(builder, Collections.emptyList()); Truth.assertThat(pool.count).isEqualTo(0); s.start(new ServerListener() { @Override public ServerTransportListener transportCreated(ServerTransport transport) { diff --git a/core/src/test/java/io/grpc/inprocess/InProcessTransportTest.java b/core/src/test/java/io/grpc/inprocess/InProcessTransportTest.java index 14953e8967..d09009260a 100644 --- a/core/src/test/java/io/grpc/inprocess/InProcessTransportTest.java +++ b/core/src/test/java/io/grpc/inprocess/InProcessTransportTest.java @@ -20,7 +20,6 @@ import io.grpc.ServerStreamTracer; import io.grpc.internal.GrpcUtil; import io.grpc.internal.InternalServer; import io.grpc.internal.ManagedClientTransport; -import io.grpc.internal.SharedResourcePool; import io.grpc.internal.testing.AbstractTransportTest; import java.util.List; import org.junit.Ignore; @@ -37,9 +36,10 @@ public class InProcessTransportTest extends AbstractTransportTest { @Override protected InternalServer newServer(List streamTracerFactories) { - return new InProcessServer( - TRANSPORT_NAME, - SharedResourcePool.forResource(GrpcUtil.TIMER_SERVICE), streamTracerFactories); + InProcessServerBuilder builder = InProcessServerBuilder + .forName(TRANSPORT_NAME) + .maxInboundMetadataSize(GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE); + return new InProcessServer(builder, streamTracerFactories); } @Override @@ -55,7 +55,8 @@ public class InProcessTransportTest extends AbstractTransportTest { @Override protected ManagedClientTransport newClientTransport(InternalServer server) { - return new InProcessTransport(TRANSPORT_NAME, testAuthority(server), USER_AGENT); + return new InProcessTransport( + TRANSPORT_NAME, GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, testAuthority(server), USER_AGENT); } @Override @@ -71,22 +72,4 @@ public class InProcessTransportTest extends AbstractTransportTest { public void socketStats() throws Exception { // test does not apply to in-process } - - // not yet implemented - @Test - @Ignore - @Override - public void serverChecksInboundMetadataSize() {} - - // not yet implemented - @Test - @Ignore - @Override - public void clientChecksInboundMetadataSize_header() {} - - // not yet implemented - @Test - @Ignore - @Override - public void clientChecksInboundMetadataSize_trailer() {} }