diff --git a/core/src/main/java/io/grpc/internal/ServerImpl.java b/core/src/main/java/io/grpc/internal/ServerImpl.java index 149d83944e..5f4c731716 100644 --- a/core/src/main/java/io/grpc/internal/ServerImpl.java +++ b/core/src/main/java/io/grpc/internal/ServerImpl.java @@ -302,7 +302,7 @@ public final class ServerImpl extends io.grpc.Server implements WithLogId { } } - private class ServerListenerImpl implements ServerListener { + private final class ServerListenerImpl implements ServerListener { @Override public ServerTransportListener transportCreated(ServerTransport transport) { synchronized (lock) { @@ -336,11 +336,11 @@ public final class ServerImpl extends io.grpc.Server implements WithLogId { } } - private class ServerTransportListenerImpl implements ServerTransportListener { + private final class ServerTransportListenerImpl implements ServerTransportListener { private final ServerTransport transport; private Attributes attributes; - public ServerTransportListenerImpl(ServerTransport transport) { + ServerTransportListenerImpl(ServerTransport transport) { this.transport = transport; } @@ -398,41 +398,49 @@ public final class ServerImpl extends io.grpc.Server implements WithLogId { // Run in wrappedExecutor so jumpListener.setListener() is called before any callbacks // are delivered, including any errors. Callbacks can still be triggered, but they will be // queued. - wrappedExecutor.execute(new ContextRunnable(context) { - @Override - public void runInContext() { - ServerStreamListener listener = NOOP_LISTENER; - try { - ServerMethodDefinition method = registry.lookupMethod(methodName); - if (method == null) { - method = fallbackRegistry.lookupMethod(methodName, stream.getAuthority()); - } - if (method == null) { - Status status = Status.UNIMPLEMENTED.withDescription( - "Method not found: " + methodName); - // TODO(zhangkun83): this error may be recorded by the tracer, and if it's kept in - // memory as a map whose key is the method name, this would allow a misbehaving - // client to blow up the server in-memory stats storage by sending large number of - // distinct unimplemented method - // names. (https://github.com/grpc/grpc-java/issues/2285) - stream.close(status, new Metadata()); - context.cancel(null); - return; - } - listener = startCall(stream, methodName, method, headers, context, statsTraceCtx); - } catch (RuntimeException e) { - stream.close(Status.fromThrowable(e), new Metadata()); - context.cancel(null); - throw e; - } catch (Error e) { - stream.close(Status.fromThrowable(e), new Metadata()); - context.cancel(null); - throw e; - } finally { - jumpListener.setListener(listener); + + final class StreamCreated extends ContextRunnable { + + StreamCreated() { + super(context); + } + + @Override + public void runInContext() { + ServerStreamListener listener = NOOP_LISTENER; + try { + ServerMethodDefinition method = registry.lookupMethod(methodName); + if (method == null) { + method = fallbackRegistry.lookupMethod(methodName, stream.getAuthority()); } + if (method == null) { + Status status = Status.UNIMPLEMENTED.withDescription( + "Method not found: " + methodName); + // TODO(zhangkun83): this error may be recorded by the tracer, and if it's kept in + // memory as a map whose key is the method name, this would allow a misbehaving + // client to blow up the server in-memory stats storage by sending large number of + // distinct unimplemented method + // names. (https://github.com/grpc/grpc-java/issues/2285) + stream.close(status, new Metadata()); + context.cancel(null); + return; + } + listener = startCall(stream, methodName, method, headers, context, statsTraceCtx); + } catch (RuntimeException e) { + stream.close(Status.fromThrowable(e), new Metadata()); + context.cancel(null); + throw e; + } catch (Error e) { + stream.close(Status.fromThrowable(e), new Metadata()); + context.cancel(null); + throw e; + } finally { + jumpListener.setListener(listener); } - }); + } + } + + wrappedExecutor.execute(new StreamCreated()); } private Context.CancellableContext createContext( @@ -447,7 +455,7 @@ public final class ServerImpl extends io.grpc.Server implements WithLogId { Context.CancellableContext context = baseContext.withDeadlineAfter( timeoutNanos, NANOSECONDS, transport.getScheduledExecutorService()); - context.addListener(new Context.CancellationListener() { + final class ServerStreamCancellationListener implements Context.CancellationListener { @Override public void cancelled(Context context) { Status status = statusFromCancelled(context); @@ -457,7 +465,9 @@ public final class ServerImpl extends io.grpc.Server implements WithLogId { stream.cancel(status); } } - }, directExecutor()); + } + + context.addListener(new ServerStreamCancellationListener(), directExecutor()); return context; } @@ -489,7 +499,7 @@ public final class ServerImpl extends io.grpc.Server implements WithLogId { return logId; } - private static class NoopListener implements ServerStreamListener { + private static final class NoopListener implements ServerStreamListener { @Override public void messagesAvailable(MessageProducer producer) { InputStream message; @@ -526,7 +536,7 @@ public final class ServerImpl extends io.grpc.Server implements WithLogId { * exceptions. */ @VisibleForTesting - static class JumpToApplicationThreadServerStreamListener implements ServerStreamListener { + static final class JumpToApplicationThreadServerStreamListener implements ServerStreamListener { private final Executor callExecutor; private final Executor cancelExecutor; private final Context.CancellableContext context; @@ -569,7 +579,13 @@ public final class ServerImpl extends io.grpc.Server implements WithLogId { @Override public void messagesAvailable(final MessageProducer producer) { - callExecutor.execute(new ContextRunnable(context) { + + final class MessagesAvailable extends ContextRunnable { + + MessagesAvailable() { + super(context); + } + @Override public void runInContext() { try { @@ -582,12 +598,18 @@ public final class ServerImpl extends io.grpc.Server implements WithLogId { throw e; } } - }); + } + + callExecutor.execute(new MessagesAvailable()); } @Override public void halfClosed() { - callExecutor.execute(new ContextRunnable(context) { + final class HalfClosed extends ContextRunnable { + HalfClosed() { + super(context); + } + @Override public void runInContext() { try { @@ -600,7 +622,9 @@ public final class ServerImpl extends io.grpc.Server implements WithLogId { throw e; } } - }); + } + + callExecutor.execute(new HalfClosed()); } @Override @@ -612,17 +636,28 @@ public final class ServerImpl extends io.grpc.Server implements WithLogId { // is not serializing. cancelExecutor.execute(new ContextCloser(context, status.getCause())); } - callExecutor.execute(new ContextRunnable(context) { + + final class Closed extends ContextRunnable { + Closed() { + super(context); + } + @Override public void runInContext() { getListener().closed(status); } - }); + } + + callExecutor.execute(new Closed()); } @Override public void onReady() { - callExecutor.execute(new ContextRunnable(context) { + final class OnReady extends ContextRunnable { + OnReady() { + super(context); + } + @Override public void runInContext() { try { @@ -635,12 +670,14 @@ public final class ServerImpl extends io.grpc.Server implements WithLogId { throw e; } } - }); + } + + callExecutor.execute(new OnReady()); } } @VisibleForTesting - static class ContextCloser implements Runnable { + static final class ContextCloser implements Runnable { private final Context.CancellableContext context; private final Throwable cause;