benchmarks: add fork join pool executor to load server too

This commit is contained in:
Carl Mastrangelo 2016-08-04 17:34:41 -07:00
parent 130d3815cf
commit 54f5c4ba89
1 changed files with 28 additions and 6 deletions

View File

@ -31,6 +31,10 @@
package io.grpc.benchmarks.driver;
import static java.util.concurrent.ForkJoinPool.defaultForkJoinWorkerThreadFactory;
import com.google.common.util.concurrent.UncaughtExceptionHandlers;
import com.sun.management.OperatingSystemMXBean;
import io.grpc.Metadata;
@ -52,12 +56,15 @@ import io.grpc.stub.StreamObserver;
import io.grpc.testing.TestUtils;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.io.File;
import java.lang.management.ManagementFactory;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.Executor;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinPool.ForkJoinWorkerThreadFactory;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;
@ -110,8 +117,7 @@ final class LoadServer {
// fully async.
switch (config.getServerType()) {
case ASYNC_SERVER: {
serverBuilder.executor(Executors.newFixedThreadPool(asyncThreads,
new DefaultThreadFactory("server-worker", true)));
serverBuilder.executor(getExecutor(asyncThreads));
break;
}
case SYNC_SERVER: {
@ -119,8 +125,7 @@ final class LoadServer {
break;
}
case ASYNC_GENERIC_SERVER: {
serverBuilder.executor(Executors.newFixedThreadPool(asyncThreads,
new DefaultThreadFactory("server-worker", true)));
serverBuilder.executor(getExecutor(asyncThreads));
// Create buffers for the generic service
PooledByteBufAllocator alloc = PooledByteBufAllocator.DEFAULT;
genericResponse = alloc.buffer(config.getPayloadConfig().getBytebufParams().getRespSize());
@ -160,6 +165,23 @@ final class LoadServer {
}
}
Executor getExecutor(int asyncThreads) {
// TODO(carl-mastrangelo): This should not be necessary. I don't know where this should be
// put. Move it somewhere else, or remove it if no longer necessary.
// See: https://github.com/grpc/grpc-java/issues/2119
return new ForkJoinPool(asyncThreads,
new ForkJoinWorkerThreadFactory() {
final AtomicInteger num = new AtomicInteger();
@Override
public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
ForkJoinWorkerThread thread = defaultForkJoinWorkerThreadFactory.newThread(pool);
thread.setDaemon(true);
thread.setName("server-worker-" + "-" + num.getAndIncrement());
return thread;
}
}, UncaughtExceptionHandlers.systemExit(), true /* async */);
}
int getPort() {
return port;
}