benchmark: add missing server side workloads (#3136)

* benchmark: add server side workloads
This commit is contained in:
zpencer 2017-08-05 21:00:55 -07:00 committed by GitHub
parent 16f4de4636
commit 6277c0ce4e
2 changed files with 161 additions and 54 deletions

View File

@ -34,10 +34,9 @@ import io.grpc.benchmarks.ByteBufOutputMarshaller;
import io.grpc.benchmarks.Utils;
import io.grpc.benchmarks.proto.BenchmarkServiceGrpc;
import io.grpc.benchmarks.proto.Control;
import io.grpc.benchmarks.proto.Messages;
import io.grpc.benchmarks.proto.Stats;
import io.grpc.benchmarks.qps.AsyncServer;
import io.grpc.internal.testing.TestUtils;
import io.grpc.stub.StreamObserver;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import java.io.File;
@ -74,9 +73,8 @@ final class LoadServer {
private static final Logger log = Logger.getLogger(LoadServer.class.getName());
private final Server server;
private final BenchmarkServiceImpl benchmarkService;
private final AsyncServer.BenchmarkServiceImpl benchmarkService;
private final OperatingSystemMXBean osBean;
private volatile boolean shutdown;
private final int port;
private ByteBuf genericResponse;
private long lastStatTime;
@ -121,7 +119,7 @@ final class LoadServer {
File key = TestUtils.loadCert("server1.key");
serverBuilder.useTransportSecurity(cert, key);
}
benchmarkService = new BenchmarkServiceImpl();
benchmarkService = new AsyncServer.BenchmarkServiceImpl();
if (config.getServerType() == Control.ServerType.ASYNC_GENERIC_SERVER) {
serverBuilder.addService(
ServerServiceDefinition
@ -192,45 +190,10 @@ final class LoadServer {
}
void shutdownNow() {
shutdown = true;
benchmarkService.shutdown();
server.shutdownNow();
}
private class BenchmarkServiceImpl extends BenchmarkServiceGrpc.BenchmarkServiceImplBase {
@Override
public void unaryCall(Messages.SimpleRequest request,
StreamObserver<Messages.SimpleResponse> responseObserver) {
responseObserver.onNext(Utils.makeResponse(request));
responseObserver.onCompleted();
}
@Override
public StreamObserver<Messages.SimpleRequest> streamingCall(
final StreamObserver<Messages.SimpleResponse> responseObserver) {
return new StreamObserver<Messages.SimpleRequest>() {
@Override
public void onNext(Messages.SimpleRequest value) {
if (!shutdown) {
responseObserver.onNext(Utils.makeResponse(value));
} else {
responseObserver.onCompleted();
}
}
@Override
public void onError(Throwable t) {
responseObserver.onError(t);
}
@Override
public void onCompleted() {
responseObserver.onCompleted();
}
};
}
}
private class GenericServiceCallHandler implements ServerCallHandler<ByteBuf, ByteBuf> {
@Override

View File

@ -17,15 +17,18 @@
package io.grpc.benchmarks.qps;
import com.google.common.util.concurrent.UncaughtExceptionHandlers;
import com.google.protobuf.ByteString;
import io.grpc.Server;
import io.grpc.Status;
import io.grpc.benchmarks.Utils;
import io.grpc.benchmarks.proto.BenchmarkServiceGrpc;
import io.grpc.benchmarks.proto.Messages.SimpleRequest;
import io.grpc.benchmarks.proto.Messages.SimpleResponse;
import io.grpc.benchmarks.proto.Messages;
import io.grpc.internal.testing.TestUtils;
import io.grpc.netty.GrpcSslContexts;
import io.grpc.netty.NettyServerBuilder;
import io.grpc.stub.ServerCallStreamObserver;
import io.grpc.stub.StreamObserver;
import io.grpc.stub.StreamObservers;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.ServerChannel;
import io.netty.channel.nio.NioEventLoopGroup;
@ -36,16 +39,20 @@ import io.netty.handler.ssl.SslProvider;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.io.File;
import java.io.IOException;
import java.util.Iterator;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinPool.ForkJoinWorkerThreadFactory;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Logger;
/**
* QPS server using the non-blocking API.
*/
public class AsyncServer {
private static final Logger log = Logger.getLogger(AsyncServer.class.getName());
/**
* checkstyle complains if there is no javadoc comment here.
@ -202,36 +209,173 @@ public class AsyncServer {
}
public static class BenchmarkServiceImpl extends BenchmarkServiceGrpc.BenchmarkServiceImplBase {
// Always use the same canned response for bidi. This is allowed by the spec.
private static final int BIDI_RESPONSE_BYTES = 100;
private static final Messages.SimpleResponse BIDI_RESPONSE = Messages.SimpleResponse
.newBuilder()
.setPayload(Messages.Payload.newBuilder()
.setBody(ByteString.copyFrom(new byte[BIDI_RESPONSE_BYTES])).build())
.build();
private final AtomicBoolean shutdown = new AtomicBoolean();
public BenchmarkServiceImpl() {
}
public void shutdown() {
shutdown.set(true);
}
@Override
public void unaryCall(SimpleRequest request, StreamObserver<SimpleResponse> responseObserver) {
SimpleResponse response = Utils.makeResponse(request);
responseObserver.onNext(response);
public void unaryCall(Messages.SimpleRequest request,
StreamObserver<Messages.SimpleResponse> responseObserver) {
responseObserver.onNext(Utils.makeResponse(request));
responseObserver.onCompleted();
}
@Override
public StreamObserver<SimpleRequest> streamingCall(
final StreamObserver<SimpleResponse> responseObserver) {
return new StreamObserver<SimpleRequest>() {
public StreamObserver<Messages.SimpleRequest> streamingCall(
final StreamObserver<Messages.SimpleResponse> observer) {
final ServerCallStreamObserver<Messages.SimpleResponse> responseObserver =
(ServerCallStreamObserver<Messages.SimpleResponse>) observer;
// TODO(spencerfang): flow control to stop reading when !responseObserver.isReady
return new StreamObserver<Messages.SimpleRequest>() {
@Override
public void onNext(SimpleRequest request) {
SimpleResponse response = Utils.makeResponse(request);
responseObserver.onNext(response);
public void onNext(Messages.SimpleRequest value) {
if (shutdown.get()) {
responseObserver.onCompleted();
return;
}
responseObserver.onNext(Utils.makeResponse(value));
}
@Override
public void onError(Throwable t) {
System.out.println("Encountered an error in streamingCall");
t.printStackTrace();
// other side closed with non OK
responseObserver.onError(t);
}
@Override
public void onCompleted() {
// other side closed with OK
responseObserver.onCompleted();
}
};
}
@Override
public StreamObserver<Messages.SimpleRequest> streamingFromClient(
final StreamObserver<Messages.SimpleResponse> responseObserver) {
return new StreamObserver<Messages.SimpleRequest>() {
Messages.SimpleRequest lastSeen = null;
@Override
public void onNext(Messages.SimpleRequest value) {
if (shutdown.get()) {
responseObserver.onCompleted();
return;
}
lastSeen = value;
}
@Override
public void onError(Throwable t) {
// other side closed with non OK
responseObserver.onError(t);
}
@Override
public void onCompleted() {
if (lastSeen != null) {
responseObserver.onNext(Utils.makeResponse(lastSeen));
responseObserver.onCompleted();
} else {
responseObserver.onError(
Status.FAILED_PRECONDITION
.withDescription("never received any requests").asException());
}
}
};
}
@Override
public void streamingFromServer(
final Messages.SimpleRequest request,
final StreamObserver<Messages.SimpleResponse> observer) {
// send forever, until the client cancels or we shut down
final Messages.SimpleResponse response = Utils.makeResponse(request);
final ServerCallStreamObserver<Messages.SimpleResponse> responseObserver =
(ServerCallStreamObserver<Messages.SimpleResponse>) observer;
// If the client cancels, copyWithFlowControl takes care of calling
// responseObserver.onCompleted() for us
StreamObservers.copyWithFlowControl(
new Iterator<Messages.SimpleResponse>() {
@Override
public boolean hasNext() {
return !shutdown.get() && !responseObserver.isCancelled();
}
@Override
public Messages.SimpleResponse next() {
return response;
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
},
responseObserver);
}
@Override
public StreamObserver<Messages.SimpleRequest> streamingBothWays(
final StreamObserver<Messages.SimpleResponse> observer) {
// receive data forever and send data forever until client cancels or we shut down.
final ServerCallStreamObserver<Messages.SimpleResponse> responseObserver =
(ServerCallStreamObserver<Messages.SimpleResponse>) observer;
// If the client cancels, copyWithFlowControl takes care of calling
// responseObserver.onCompleted() for us
StreamObservers.copyWithFlowControl(
new Iterator<Messages.SimpleResponse>() {
@Override
public boolean hasNext() {
return !shutdown.get() && !responseObserver.isCancelled();
}
@Override
public Messages.SimpleResponse next() {
return BIDI_RESPONSE;
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
},
responseObserver
);
return new StreamObserver<Messages.SimpleRequest>() {
@Override
public void onNext(final Messages.SimpleRequest request) {
// noop
}
@Override
public void onError(Throwable t) {
// other side cancelled
}
@Override
public void onCompleted() {
// Should never happen, because clients should cancel this call in order to stop
// the operation. Also because copyWithFlowControl hogs the inbound network thread
// via the handler for onReady, we would never expect this callback to be able to
// run anyways.
log.severe("clients should CANCEL the call to stop bidi streaming");
}
};
}
}
}