mirror of https://github.com/grpc/grpc-java.git
inprocess: Add maxInboundMetadataSize
This commit is contained in:
parent
0fbc1153bd
commit
0eefa5263b
|
@ -16,6 +16,7 @@
|
|||
|
||||
package io.grpc.inprocess;
|
||||
|
||||
import static com.google.common.base.Preconditions.checkArgument;
|
||||
import static com.google.common.base.Preconditions.checkNotNull;
|
||||
|
||||
import io.grpc.ExperimentalApi;
|
||||
|
@ -67,6 +68,7 @@ public final class InProcessChannelBuilder extends
|
|||
|
||||
private final String name;
|
||||
private ScheduledExecutorService scheduledExecutorService;
|
||||
private int maxInboundMetadataSize = Integer.MAX_VALUE;
|
||||
|
||||
private InProcessChannelBuilder(String name) {
|
||||
super(new InProcessSocketAddress(name), "localhost");
|
||||
|
@ -145,10 +147,30 @@ public final class InProcessChannelBuilder extends
|
|||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the maximum size of metadata allowed to be received. {@code Integer.MAX_VALUE} disables
|
||||
* the enforcement. Defaults to no limit ({@code Integer.MAX_VALUE}).
|
||||
*
|
||||
* <p>There is potential for performance penalty when this setting is enabled, as the Metadata
|
||||
* must actually be serialized. Since the current implementation of Metadata pre-serializes, it's
|
||||
* currently negligible. But Metadata is free to change its implementation.
|
||||
*
|
||||
* @param bytes the maximum size of received metadata
|
||||
* @return this
|
||||
* @throws IllegalArgumentException if bytes is non-positive
|
||||
* @since 1.17.0
|
||||
*/
|
||||
public InProcessChannelBuilder maxInboundMetadataSize(int bytes) {
|
||||
checkArgument(bytes > 0, "maxInboundMetadataSize must be > 0");
|
||||
this.maxInboundMetadataSize = bytes;
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
@Internal
|
||||
protected ClientTransportFactory buildTransportFactory() {
|
||||
return new InProcessClientTransportFactory(name, scheduledExecutorService);
|
||||
return new InProcessClientTransportFactory(
|
||||
name, scheduledExecutorService, maxInboundMetadataSize);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -158,14 +180,18 @@ public final class InProcessChannelBuilder extends
|
|||
private final String name;
|
||||
private final ScheduledExecutorService timerService;
|
||||
private final boolean useSharedTimer;
|
||||
private final int maxInboundMetadataSize;
|
||||
private boolean closed;
|
||||
|
||||
private InProcessClientTransportFactory(
|
||||
String name, @Nullable ScheduledExecutorService scheduledExecutorService) {
|
||||
String name,
|
||||
@Nullable ScheduledExecutorService scheduledExecutorService,
|
||||
int maxInboundMetadataSize) {
|
||||
this.name = name;
|
||||
useSharedTimer = scheduledExecutorService == null;
|
||||
timerService = useSharedTimer
|
||||
? SharedResourceHolder.get(GrpcUtil.TIMER_SERVICE) : scheduledExecutorService;
|
||||
this.maxInboundMetadataSize = maxInboundMetadataSize;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -174,7 +200,8 @@ public final class InProcessChannelBuilder extends
|
|||
if (closed) {
|
||||
throw new IllegalStateException("The transport factory is closed.");
|
||||
}
|
||||
return new InProcessTransport(name, options.getAuthority(), options.getUserAgent());
|
||||
return new InProcessTransport(
|
||||
name, maxInboundMetadataSize, options.getAuthority(), options.getUserAgent());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -44,6 +44,7 @@ final class InProcessServer implements InternalServer {
|
|||
}
|
||||
|
||||
private final String name;
|
||||
private final int maxInboundMetadataSize;
|
||||
private final List<ServerStreamTracer.Factory> streamTracerFactories;
|
||||
private ServerListener listener;
|
||||
private boolean shutdown;
|
||||
|
@ -56,10 +57,11 @@ final class InProcessServer implements InternalServer {
|
|||
private ScheduledExecutorService scheduler;
|
||||
|
||||
InProcessServer(
|
||||
String name, ObjectPool<ScheduledExecutorService> schedulerPool,
|
||||
InProcessServerBuilder builder,
|
||||
List<ServerStreamTracer.Factory> streamTracerFactories) {
|
||||
this.name = name;
|
||||
this.schedulerPool = schedulerPool;
|
||||
this.name = builder.name;
|
||||
this.schedulerPool = builder.schedulerPool;
|
||||
this.maxInboundMetadataSize = builder.maxInboundMetadataSize;
|
||||
this.streamTracerFactories =
|
||||
Collections.unmodifiableList(checkNotNull(streamTracerFactories, "streamTracerFactories"));
|
||||
}
|
||||
|
@ -112,6 +114,10 @@ final class InProcessServer implements InternalServer {
|
|||
return schedulerPool;
|
||||
}
|
||||
|
||||
int getMaxInboundMetadataSize() {
|
||||
return maxInboundMetadataSize;
|
||||
}
|
||||
|
||||
List<ServerStreamTracer.Factory> getStreamTracerFactories() {
|
||||
return streamTracerFactories;
|
||||
}
|
||||
|
|
|
@ -91,8 +91,9 @@ public final class InProcessServerBuilder
|
|||
return UUID.randomUUID().toString();
|
||||
}
|
||||
|
||||
private final String name;
|
||||
private ObjectPool<ScheduledExecutorService> schedulerPool =
|
||||
final String name;
|
||||
int maxInboundMetadataSize = Integer.MAX_VALUE;
|
||||
ObjectPool<ScheduledExecutorService> schedulerPool =
|
||||
SharedResourcePool.forResource(GrpcUtil.TIMER_SERVICE);
|
||||
|
||||
private InProcessServerBuilder(String name) {
|
||||
|
@ -123,10 +124,29 @@ public final class InProcessServerBuilder
|
|||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the maximum size of metadata allowed to be received. {@code Integer.MAX_VALUE} disables
|
||||
* the enforcement. Defaults to no limit ({@code Integer.MAX_VALUE}).
|
||||
*
|
||||
* <p>There is potential for performance penalty when this setting is enabled, as the Metadata
|
||||
* must actually be serialized. Since the current implementation of Metadata pre-serializes, it's
|
||||
* currently negligible. But Metadata is free to change its implementation.
|
||||
*
|
||||
* @param bytes the maximum size of received metadata
|
||||
* @return this
|
||||
* @throws IllegalArgumentException if bytes is non-positive
|
||||
* @since 1.17.0
|
||||
*/
|
||||
public InProcessServerBuilder maxInboundMetadataSize(int bytes) {
|
||||
Preconditions.checkArgument(bytes > 0, "maxInboundMetadataSize must be > 0");
|
||||
this.maxInboundMetadataSize = bytes;
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected InProcessServer buildTransportServer(
|
||||
List<ServerStreamTracer.Factory> streamTracerFactories) {
|
||||
return new InProcessServer(name, schedulerPool, streamTracerFactories);
|
||||
return new InProcessServer(this, streamTracerFactories);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -32,6 +32,7 @@ import io.grpc.DecompressorRegistry;
|
|||
import io.grpc.Grpc;
|
||||
import io.grpc.InternalChannelz.SocketStats;
|
||||
import io.grpc.InternalLogId;
|
||||
import io.grpc.InternalMetadata;
|
||||
import io.grpc.Metadata;
|
||||
import io.grpc.MethodDescriptor;
|
||||
import io.grpc.SecurityLevel;
|
||||
|
@ -74,8 +75,10 @@ final class InProcessTransport implements ServerTransport, ConnectionClientTrans
|
|||
|
||||
private final InternalLogId logId = InternalLogId.allocate(getClass().getName());
|
||||
private final String name;
|
||||
private final int clientMaxInboundMetadataSize;
|
||||
private final String authority;
|
||||
private final String userAgent;
|
||||
private int serverMaxInboundMetadataSize;
|
||||
private ObjectPool<ScheduledExecutorService> serverSchedulerPool;
|
||||
private ScheduledExecutorService serverScheduler;
|
||||
private ServerTransportListener serverTransportListener;
|
||||
|
@ -108,8 +111,10 @@ final class InProcessTransport implements ServerTransport, ConnectionClientTrans
|
|||
}
|
||||
};
|
||||
|
||||
public InProcessTransport(String name, String authority, String userAgent) {
|
||||
public InProcessTransport(
|
||||
String name, int maxInboundMetadataSize, String authority, String userAgent) {
|
||||
this.name = name;
|
||||
this.clientMaxInboundMetadataSize = maxInboundMetadataSize;
|
||||
this.authority = authority;
|
||||
this.userAgent = GrpcUtil.getGrpcUserAgent("inprocess", userAgent);
|
||||
}
|
||||
|
@ -120,6 +125,7 @@ final class InProcessTransport implements ServerTransport, ConnectionClientTrans
|
|||
this.clientTransportListener = listener;
|
||||
InProcessServer server = InProcessServer.findServer(name);
|
||||
if (server != null) {
|
||||
serverMaxInboundMetadataSize = server.getMaxInboundMetadataSize();
|
||||
serverSchedulerPool = server.getScheduledExecutorServicePool();
|
||||
serverScheduler = serverSchedulerPool.getObject();
|
||||
serverStreamTracerFactories = server.getStreamTracerFactories();
|
||||
|
@ -159,20 +165,43 @@ final class InProcessTransport implements ServerTransport, ConnectionClientTrans
|
|||
public synchronized ClientStream newStream(
|
||||
final MethodDescriptor<?, ?> method, final Metadata headers, final CallOptions callOptions) {
|
||||
if (shutdownStatus != null) {
|
||||
final Status capturedStatus = shutdownStatus;
|
||||
final StatsTraceContext statsTraceCtx =
|
||||
StatsTraceContext.newClientContext(callOptions, headers);
|
||||
return new NoopClientStream() {
|
||||
return failedClientStream(
|
||||
StatsTraceContext.newClientContext(callOptions, headers), shutdownStatus);
|
||||
}
|
||||
|
||||
headers.put(GrpcUtil.USER_AGENT_KEY, userAgent);
|
||||
|
||||
if (serverMaxInboundMetadataSize != Integer.MAX_VALUE) {
|
||||
int metadataSize = metadataSize(headers);
|
||||
if (metadataSize > serverMaxInboundMetadataSize) {
|
||||
// Other transports would compute a status with:
|
||||
// GrpcUtil.httpStatusToGrpcStatus(431 /* Request Header Fields Too Large */);
|
||||
// However, that isn't handled specially today, so we'd leak HTTP-isms even though we're
|
||||
// in-process. We go ahead and make a Status, which may need to be updated if
|
||||
// statuscodes.md is updated.
|
||||
Status status = Status.RESOURCE_EXHAUSTED.withDescription(
|
||||
String.format(
|
||||
"Request metadata larger than %d: %d",
|
||||
serverMaxInboundMetadataSize,
|
||||
metadataSize));
|
||||
return failedClientStream(
|
||||
StatsTraceContext.newClientContext(callOptions, headers), status);
|
||||
}
|
||||
}
|
||||
|
||||
return new InProcessStream(method, headers, callOptions, authority).clientStream;
|
||||
}
|
||||
|
||||
private ClientStream failedClientStream(
|
||||
final StatsTraceContext statsTraceCtx, final Status status) {
|
||||
return new NoopClientStream() {
|
||||
@Override
|
||||
public void start(ClientStreamListener listener) {
|
||||
statsTraceCtx.clientOutboundHeaders();
|
||||
statsTraceCtx.streamClosed(capturedStatus);
|
||||
listener.closed(capturedStatus, new Metadata());
|
||||
statsTraceCtx.streamClosed(status);
|
||||
listener.closed(status, new Metadata());
|
||||
}
|
||||
};
|
||||
}
|
||||
headers.put(GrpcUtil.USER_AGENT_KEY, userAgent);
|
||||
return new InProcessStream(method, headers, callOptions, authority).clientStream;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -281,6 +310,21 @@ final class InProcessTransport implements ServerTransport, ConnectionClientTrans
|
|||
}
|
||||
}
|
||||
|
||||
private static int metadataSize(Metadata metadata) {
|
||||
byte[][] serialized = InternalMetadata.serialize(metadata);
|
||||
if (serialized == null) {
|
||||
return 0;
|
||||
}
|
||||
// Calculate based on SETTINGS_MAX_HEADER_LIST_SIZE in RFC 7540 §6.5.2. We could use something
|
||||
// different, but it's "sane."
|
||||
long size = 0;
|
||||
for (int i = 0; i < serialized.length; i += 2) {
|
||||
size += 32 + serialized[i].length + serialized[i + 1].length;
|
||||
}
|
||||
size = Math.min(size, Integer.MAX_VALUE);
|
||||
return (int) size;
|
||||
}
|
||||
|
||||
private class InProcessStream {
|
||||
private final InProcessClientStream clientStream;
|
||||
private final InProcessServerStream serverStream;
|
||||
|
@ -424,12 +468,32 @@ final class InProcessTransport implements ServerTransport, ConnectionClientTrans
|
|||
}
|
||||
|
||||
@Override
|
||||
public synchronized void writeHeaders(Metadata headers) {
|
||||
if (closed) {
|
||||
return;
|
||||
public void writeHeaders(Metadata headers) {
|
||||
if (clientMaxInboundMetadataSize != Integer.MAX_VALUE) {
|
||||
int metadataSize = metadataSize(headers);
|
||||
if (metadataSize > clientMaxInboundMetadataSize) {
|
||||
Status serverStatus = Status.CANCELLED.withDescription("Client cancelled the RPC");
|
||||
clientStream.serverClosed(serverStatus, serverStatus);
|
||||
// Other transports provide very little information in this case. We go ahead and make a
|
||||
// Status, which may need to be updated if statuscodes.md is updated.
|
||||
Status failedStatus = Status.RESOURCE_EXHAUSTED.withDescription(
|
||||
String.format(
|
||||
"Response header metadata larger than %d: %d",
|
||||
clientMaxInboundMetadataSize,
|
||||
metadataSize));
|
||||
notifyClientClose(failedStatus, new Metadata());
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
synchronized (this) {
|
||||
if (closed) {
|
||||
return;
|
||||
}
|
||||
|
||||
clientStream.statsTraceCtx.clientInboundHeaders();
|
||||
clientStreamListener.headersRead(headers);
|
||||
}
|
||||
clientStream.statsTraceCtx.clientInboundHeaders();
|
||||
clientStreamListener.headersRead(headers);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -440,6 +504,30 @@ final class InProcessTransport implements ServerTransport, ConnectionClientTrans
|
|||
// calling internalCancel().
|
||||
clientStream.serverClosed(Status.OK, status);
|
||||
|
||||
if (clientMaxInboundMetadataSize != Integer.MAX_VALUE) {
|
||||
int statusSize = status.getDescription() == null ? 0 : status.getDescription().length();
|
||||
// Go ahead and throw in the status description's length, since that could be very long.
|
||||
int metadataSize = metadataSize(trailers) + statusSize;
|
||||
if (metadataSize > clientMaxInboundMetadataSize) {
|
||||
// Override the status for the client, but not the server. Transports do not guarantee
|
||||
// notifying the server of the failure.
|
||||
|
||||
// Other transports provide very little information in this case. We go ahead and make a
|
||||
// Status, which may need to be updated if statuscodes.md is updated.
|
||||
status = Status.RESOURCE_EXHAUSTED.withDescription(
|
||||
String.format(
|
||||
"Response header metadata larger than %d: %d",
|
||||
clientMaxInboundMetadataSize,
|
||||
metadataSize));
|
||||
trailers = new Metadata();
|
||||
}
|
||||
}
|
||||
|
||||
notifyClientClose(status, trailers);
|
||||
}
|
||||
|
||||
/** clientStream.serverClosed() must be called before this method */
|
||||
private void notifyClientClose(Status status, Metadata trailers) {
|
||||
Status clientStatus = stripCause(status);
|
||||
synchronized (this) {
|
||||
if (closed) {
|
||||
|
|
|
@ -19,12 +19,10 @@ package io.grpc.inprocess;
|
|||
import com.google.common.truth.Truth;
|
||||
import io.grpc.ServerStreamTracer;
|
||||
import io.grpc.internal.FakeClock;
|
||||
import io.grpc.internal.GrpcUtil;
|
||||
import io.grpc.internal.ObjectPool;
|
||||
import io.grpc.internal.ServerListener;
|
||||
import io.grpc.internal.ServerTransport;
|
||||
import io.grpc.internal.ServerTransportListener;
|
||||
import io.grpc.internal.SharedResourcePool;
|
||||
import java.util.Collections;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import org.junit.Test;
|
||||
|
@ -33,12 +31,12 @@ import org.junit.runners.JUnit4;
|
|||
|
||||
@RunWith(JUnit4.class)
|
||||
public class InProcessServerTest {
|
||||
private InProcessServerBuilder builder = InProcessServerBuilder.forName("name");
|
||||
|
||||
@Test
|
||||
public void getPort_notStarted() throws Exception {
|
||||
InProcessServer s =
|
||||
new InProcessServer("name", SharedResourcePool.forResource(GrpcUtil.TIMER_SERVICE),
|
||||
Collections.<ServerStreamTracer.Factory>emptyList());
|
||||
new InProcessServer(builder, Collections.<ServerStreamTracer.Factory>emptyList());
|
||||
|
||||
Truth.assertThat(s.getPort()).isEqualTo(-1);
|
||||
}
|
||||
|
@ -63,8 +61,9 @@ public class InProcessServerTest {
|
|||
}
|
||||
|
||||
RefCountingObjectPool pool = new RefCountingObjectPool();
|
||||
builder.schedulerPool = pool;
|
||||
InProcessServer s =
|
||||
new InProcessServer("name", pool, Collections.<ServerStreamTracer.Factory>emptyList());
|
||||
new InProcessServer(builder, Collections.<ServerStreamTracer.Factory>emptyList());
|
||||
Truth.assertThat(pool.count).isEqualTo(0);
|
||||
s.start(new ServerListener() {
|
||||
@Override public ServerTransportListener transportCreated(ServerTransport transport) {
|
||||
|
|
|
@ -20,7 +20,6 @@ import io.grpc.ServerStreamTracer;
|
|||
import io.grpc.internal.GrpcUtil;
|
||||
import io.grpc.internal.InternalServer;
|
||||
import io.grpc.internal.ManagedClientTransport;
|
||||
import io.grpc.internal.SharedResourcePool;
|
||||
import io.grpc.internal.testing.AbstractTransportTest;
|
||||
import java.util.List;
|
||||
import org.junit.Ignore;
|
||||
|
@ -37,9 +36,10 @@ public class InProcessTransportTest extends AbstractTransportTest {
|
|||
|
||||
@Override
|
||||
protected InternalServer newServer(List<ServerStreamTracer.Factory> streamTracerFactories) {
|
||||
return new InProcessServer(
|
||||
TRANSPORT_NAME,
|
||||
SharedResourcePool.forResource(GrpcUtil.TIMER_SERVICE), streamTracerFactories);
|
||||
InProcessServerBuilder builder = InProcessServerBuilder
|
||||
.forName(TRANSPORT_NAME)
|
||||
.maxInboundMetadataSize(GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE);
|
||||
return new InProcessServer(builder, streamTracerFactories);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -55,7 +55,8 @@ public class InProcessTransportTest extends AbstractTransportTest {
|
|||
|
||||
@Override
|
||||
protected ManagedClientTransport newClientTransport(InternalServer server) {
|
||||
return new InProcessTransport(TRANSPORT_NAME, testAuthority(server), USER_AGENT);
|
||||
return new InProcessTransport(
|
||||
TRANSPORT_NAME, GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, testAuthority(server), USER_AGENT);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -71,22 +72,4 @@ public class InProcessTransportTest extends AbstractTransportTest {
|
|||
public void socketStats() throws Exception {
|
||||
// test does not apply to in-process
|
||||
}
|
||||
|
||||
// not yet implemented
|
||||
@Test
|
||||
@Ignore
|
||||
@Override
|
||||
public void serverChecksInboundMetadataSize() {}
|
||||
|
||||
// not yet implemented
|
||||
@Test
|
||||
@Ignore
|
||||
@Override
|
||||
public void clientChecksInboundMetadataSize_header() {}
|
||||
|
||||
// not yet implemented
|
||||
@Test
|
||||
@Ignore
|
||||
@Override
|
||||
public void clientChecksInboundMetadataSize_trailer() {}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue