netty: provide an option to lower netty allocator chunk size from 16MB to 2MB (#6407)

This would reduce the amount of direct buffer allocations, especially with light traffic. This should mitigate internal issue b/143075435

The change is currently optional and is only effective if system property "io.grpc.netty.useCustomAllocator" is set to "true" ignoring the case.

Internal benchmark results (median of 5 runs) doesn't show any significant change:
```
                          Before (STDEV)           After (STDEV)
grpc-java-java-multi-qps-integrity_only
Actual QPS               717,848 (7,445)         715,061 (2,122) 
QPS per Client CPU        23,768   (799)          23,842   (295)

grpc-java-java-multi-throughput-integrity_only
Actual QPS                35,631   (204)          35,298    (25) 
QPS per Client CPU         3,362    (56)           3,316    (18)

grpc-java-java-single-latency-integrity_only
Median latency (us)          130  (1.82)             125  (5.36)

grpc-java-java-single-throughput-integrity_only
Actual QPS                    593 (5.14)             587  (3.76)
QPS per Client CPU            502 (4.51)             494  (6.92)

```
This commit is contained in:
Kun Zhang 2019-11-14 15:50:11 -08:00 committed by GitHub
parent 194d1512c0
commit 89cd64328d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 95 additions and 12 deletions

View File

@ -40,6 +40,7 @@ import io.grpc.internal.KeepAliveManager;
import io.grpc.internal.ObjectPool;
import io.grpc.internal.SharedResourcePool;
import io.grpc.internal.TransportTracer;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFactory;
import io.netty.channel.ChannelOption;
@ -74,6 +75,8 @@ public final class NettyChannelBuilder
new ReflectiveChannelFactory<>(Utils.DEFAULT_CLIENT_CHANNEL_TYPE);
private static final ObjectPool<? extends EventLoopGroup> DEFAULT_EVENT_LOOP_GROUP_POOL =
SharedResourcePool.forResource(Utils.DEFAULT_WORKER_EVENT_LOOP_GROUP);
private static final ObjectPool<ByteBufAllocator> ALLOCATOR_POOL =
SharedResourcePool.forResource(Utils.BYTE_BUF_ALLOCATOR);
private final Map<ChannelOption<?>, Object> channelOptions =
new HashMap<>();
@ -420,7 +423,7 @@ public final class NettyChannelBuilder
return new NettyTransportFactory(
negotiator, channelFactory, channelOptions,
eventLoopGroupPool, flowControlWindow, maxInboundMessageSize(),
eventLoopGroupPool, ALLOCATOR_POOL, flowControlWindow, maxInboundMessageSize(),
maxHeaderListSize, keepAliveTimeNanos, keepAliveTimeoutNanos, keepAliveWithoutCalls,
transportTracerFactory, localSocketPicker, useGetForSafeMethods);
}
@ -535,6 +538,8 @@ public final class NettyChannelBuilder
private final Map<ChannelOption<?>, ?> channelOptions;
private final ObjectPool<? extends EventLoopGroup> groupPool;
private final EventLoopGroup group;
private final ObjectPool<? extends ByteBufAllocator> allocatorPool;
private final ByteBufAllocator allocator;
private final int flowControlWindow;
private final int maxMessageSize;
private final int maxHeaderListSize;
@ -550,6 +555,7 @@ public final class NettyChannelBuilder
NettyTransportFactory(ProtocolNegotiator protocolNegotiator,
ChannelFactory<? extends Channel> channelFactory,
Map<ChannelOption<?>, ?> channelOptions, ObjectPool<? extends EventLoopGroup> groupPool,
ObjectPool<? extends ByteBufAllocator> allocatorPool,
int flowControlWindow, int maxMessageSize, int maxHeaderListSize,
long keepAliveTimeNanos, long keepAliveTimeoutNanos, boolean keepAliveWithoutCalls,
TransportTracer.Factory transportTracerFactory, LocalSocketPicker localSocketPicker,
@ -559,6 +565,8 @@ public final class NettyChannelBuilder
this.channelOptions = new HashMap<ChannelOption<?>, Object>(channelOptions);
this.groupPool = groupPool;
this.group = groupPool.getObject();
this.allocatorPool = allocatorPool;
this.allocator = allocatorPool.getObject();
this.flowControlWindow = flowControlWindow;
this.maxMessageSize = maxMessageSize;
this.maxHeaderListSize = maxHeaderListSize;
@ -597,7 +605,7 @@ public final class NettyChannelBuilder
// TODO(carl-mastrangelo): Pass channelLogger in.
NettyClientTransport transport = new NettyClientTransport(
serverAddress, channelFactory, channelOptions, group,
serverAddress, channelFactory, channelOptions, group, allocator,
localNegotiator, flowControlWindow,
maxMessageSize, maxHeaderListSize, keepAliveTimeNanosState.get(), keepAliveTimeoutNanos,
keepAliveWithoutCalls, options.getAuthority(), options.getUserAgent(),
@ -618,6 +626,7 @@ public final class NettyChannelBuilder
}
closed = true;
allocatorPool.returnObject(allocator);
protocolNegotiator.close();
groupPool.returnObject(group);
}

View File

@ -17,6 +17,7 @@
package io.grpc.netty;
import static io.grpc.internal.GrpcUtil.KEEPALIVE_TIME_NANOS_DISABLED;
import static io.netty.channel.ChannelOption.ALLOCATOR;
import static io.netty.channel.ChannelOption.SO_KEEPALIVE;
import com.google.common.annotations.VisibleForTesting;
@ -43,6 +44,7 @@ import io.grpc.internal.StatsTraceContext;
import io.grpc.internal.TransportTracer;
import io.grpc.netty.NettyChannelBuilder.LocalSocketPicker;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFactory;
import io.netty.channel.ChannelFuture;
@ -74,6 +76,7 @@ class NettyClientTransport implements ConnectionClientTransport {
private final SocketAddress remoteAddress;
private final ChannelFactory<? extends Channel> channelFactory;
private final EventLoopGroup group;
private final ByteBufAllocator allocator;
private final ProtocolNegotiator negotiator;
private final String authorityString;
private final AsciiString authority;
@ -105,6 +108,7 @@ class NettyClientTransport implements ConnectionClientTransport {
NettyClientTransport(
SocketAddress address, ChannelFactory<? extends Channel> channelFactory,
Map<ChannelOption<?>, ?> channelOptions, EventLoopGroup group,
ByteBufAllocator allocator,
ProtocolNegotiator negotiator, int flowControlWindow, int maxMessageSize,
int maxHeaderListSize, long keepAliveTimeNanos, long keepAliveTimeoutNanos,
boolean keepAliveWithoutCalls, String authority, @Nullable String userAgent,
@ -115,6 +119,7 @@ class NettyClientTransport implements ConnectionClientTransport {
this.negotiationScheme = this.negotiator.scheme();
this.remoteAddress = Preconditions.checkNotNull(address, "address");
this.group = Preconditions.checkNotNull(group, "group");
this.allocator = Preconditions.checkNotNull(allocator, "allocator");
this.channelFactory = channelFactory;
this.channelOptions = Preconditions.checkNotNull(channelOptions, "channelOptions");
this.flowControlWindow = flowControlWindow;
@ -225,6 +230,7 @@ class NettyClientTransport implements ConnectionClientTransport {
ChannelHandler negotiationHandler = negotiator.newHandler(handler);
Bootstrap b = new Bootstrap();
b.option(ALLOCATOR, allocator);
b.attr(LOGGER_KEY, channelLogger);
b.group(eventLoop);
b.channelFactory(channelFactory);

View File

@ -18,6 +18,7 @@ package io.grpc.netty;
import static com.google.common.base.Preconditions.checkNotNull;
import static io.grpc.netty.NettyServerBuilder.MAX_CONNECTION_AGE_NANOS_DISABLED;
import static io.netty.channel.ChannelOption.ALLOCATOR;
import static io.netty.channel.ChannelOption.SO_BACKLOG;
import static io.netty.channel.ChannelOption.SO_KEEPALIVE;
@ -37,6 +38,7 @@ import io.grpc.internal.ServerListener;
import io.grpc.internal.ServerTransportListener;
import io.grpc.internal.TransportTracer;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFactory;
import io.netty.channel.ChannelFuture;
@ -73,8 +75,10 @@ class NettyServer implements InternalServer, InternalWithLogId {
private final int maxStreamsPerConnection;
private final ObjectPool<? extends EventLoopGroup> bossGroupPool;
private final ObjectPool<? extends EventLoopGroup> workerGroupPool;
private final ObjectPool<? extends ByteBufAllocator> allocatorPool;
private EventLoopGroup bossGroup;
private EventLoopGroup workerGroup;
private ByteBufAllocator allocator;
private ServerListener listener;
private Channel channel;
private final int flowControlWindow;
@ -87,7 +91,8 @@ class NettyServer implements InternalServer, InternalWithLogId {
private final long maxConnectionAgeGraceInNanos;
private final boolean permitKeepAliveWithoutCalls;
private final long permitKeepAliveTimeInNanos;
private final ReferenceCounted eventLoopReferenceCounter = new EventLoopReferenceCounter();
private final ReferenceCounted sharedResourceReferenceCounter =
new SharedResourceReferenceCounter();
private final List<? extends ServerStreamTracer.Factory> streamTracerFactories;
private final TransportTracer.Factory transportTracerFactory;
private final InternalChannelz channelz;
@ -100,6 +105,7 @@ class NettyServer implements InternalServer, InternalWithLogId {
Map<ChannelOption<?>, ?> channelOptions,
ObjectPool<? extends EventLoopGroup> bossGroupPool,
ObjectPool<? extends EventLoopGroup> workerGroupPool,
ObjectPool<? extends ByteBufAllocator> allocatorPool,
ProtocolNegotiator protocolNegotiator,
List<? extends ServerStreamTracer.Factory> streamTracerFactories,
TransportTracer.Factory transportTracerFactory,
@ -115,8 +121,10 @@ class NettyServer implements InternalServer, InternalWithLogId {
this.channelOptions = new HashMap<ChannelOption<?>, Object>(channelOptions);
this.bossGroupPool = checkNotNull(bossGroupPool, "bossGroupPool");
this.workerGroupPool = checkNotNull(workerGroupPool, "workerGroupPool");
this.allocatorPool = checkNotNull(allocatorPool, "allocatorPool");
this.bossGroup = bossGroupPool.getObject();
this.workerGroup = workerGroupPool.getObject();
this.allocator = allocatorPool.getObject();
this.protocolNegotiator = checkNotNull(protocolNegotiator, "protocolNegotiator");
this.streamTracerFactories = checkNotNull(streamTracerFactories, "streamTracerFactories");
this.transportTracerFactory = transportTracerFactory;
@ -155,6 +163,8 @@ class NettyServer implements InternalServer, InternalWithLogId {
listener = checkNotNull(serverListener, "serverListener");
ServerBootstrap b = new ServerBootstrap();
b.option(ALLOCATOR, allocator);
b.childOption(ALLOCATOR, allocator);
b.group(bossGroup, workerGroup);
b.channelFactory(channelFactory);
// For non-socket based channel, the option will be ignored.
@ -210,7 +220,7 @@ class NettyServer implements InternalServer, InternalWithLogId {
}
// `channel` shutdown can race with `ch` initialization, so this is only safe to increment
// inside the lock.
eventLoopReferenceCounter.retain();
sharedResourceReferenceCounter.retain();
transportListener = listener.transportCreated(transport);
}
@ -224,7 +234,7 @@ class NettyServer implements InternalServer, InternalWithLogId {
public void operationComplete(ChannelFuture future) throws Exception {
if (!done) {
done = true;
eventLoopReferenceCounter.release();
sharedResourceReferenceCounter.release();
}
}
}
@ -281,7 +291,7 @@ class NettyServer implements InternalServer, InternalWithLogId {
synchronized (NettyServer.this) {
listener.serverShutdown();
}
eventLoopReferenceCounter.release();
sharedResourceReferenceCounter.release();
}
});
try {
@ -305,7 +315,7 @@ class NettyServer implements InternalServer, InternalWithLogId {
.toString();
}
class EventLoopReferenceCounter extends AbstractReferenceCounted {
class SharedResourceReferenceCounter extends AbstractReferenceCounted {
@Override
protected void deallocate() {
try {
@ -320,6 +330,13 @@ class NettyServer implements InternalServer, InternalWithLogId {
}
} finally {
workerGroup = null;
try {
if (allocator != null) {
allocatorPool.returnObject(allocator);
}
} finally {
allocator = null;
}
}
}
}

View File

@ -35,6 +35,7 @@ import io.grpc.internal.GrpcUtil;
import io.grpc.internal.KeepAliveManager;
import io.grpc.internal.ObjectPool;
import io.grpc.internal.SharedResourcePool;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelFactory;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
@ -79,6 +80,8 @@ public final class NettyServerBuilder extends AbstractServerImplBuilder<NettySer
SharedResourcePool.forResource(Utils.DEFAULT_BOSS_EVENT_LOOP_GROUP);
private static final ObjectPool<? extends EventLoopGroup> DEFAULT_WORKER_EVENT_LOOP_GROUP_POOL =
SharedResourcePool.forResource(Utils.DEFAULT_WORKER_EVENT_LOOP_GROUP);
private static final ObjectPool<ByteBufAllocator> ALLOCATOR_POOL =
SharedResourcePool.forResource(Utils.BYTE_BUF_ALLOCATOR);
private final List<SocketAddress> listenAddresses = new ArrayList<>();
@ -541,7 +544,7 @@ public final class NettyServerBuilder extends AbstractServerImplBuilder<NettySer
for (SocketAddress listenAddress : listenAddresses) {
NettyServer transportServer = new NettyServer(
listenAddress, channelFactory, channelOptions, bossEventLoopGroupPool,
workerEventLoopGroupPool, negotiator, streamTracerFactories,
workerEventLoopGroupPool, ALLOCATOR_POOL, negotiator, streamTracerFactories,
getTransportTracerFactory(), maxConcurrentCallsPerConnection, flowControlWindow,
maxMessageSize, maxHeaderListSize, keepAliveTimeInNanos, keepAliveTimeoutInNanos,
maxConnectionIdleInNanos, maxConnectionAgeInNanos, maxConnectionAgeGraceInNanos,

View File

@ -34,6 +34,8 @@ import io.grpc.internal.GrpcUtil;
import io.grpc.internal.SharedResourceHolder.Resource;
import io.grpc.netty.GrpcHttp2HeadersUtils.GrpcHttp2InboundHeaders;
import io.grpc.netty.NettySocketSupport.NativeSocketOptions;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelFactory;
@ -83,6 +85,42 @@ class Utils {
public static final Resource<EventLoopGroup> DEFAULT_BOSS_EVENT_LOOP_GROUP;
public static final Resource<EventLoopGroup> DEFAULT_WORKER_EVENT_LOOP_GROUP;
public static final Resource<ByteBufAllocator> BYTE_BUF_ALLOCATOR =
new Resource<ByteBufAllocator>() {
@Override
public ByteBufAllocator create() {
if (Boolean.parseBoolean(
System.getProperty("io.grpc.netty.useCustomAllocator", "false"))) {
int maxOrder;
if (System.getProperty("io.netty.allocator.maxOrder") == null) {
// See the implementation of PooledByteBufAllocator. DEFAULT_MAX_ORDER in there is
// 11, which makes chunk size to be 8192 << 11 = 16 MiB. We want the chunk size to be
// 2MiB, thus reducing the maxOrder to 8.
maxOrder = 8;
} else {
maxOrder = PooledByteBufAllocator.defaultMaxOrder();
}
return new PooledByteBufAllocator(
PooledByteBufAllocator.defaultPreferDirect(),
PooledByteBufAllocator.defaultNumHeapArena(),
PooledByteBufAllocator.defaultNumDirectArena(),
PooledByteBufAllocator.defaultPageSize(),
maxOrder,
PooledByteBufAllocator.defaultTinyCacheSize(),
PooledByteBufAllocator.defaultSmallCacheSize(),
PooledByteBufAllocator.defaultNormalCacheSize(),
PooledByteBufAllocator.defaultUseCacheForAllThreads());
} else {
return ByteBufAllocator.DEFAULT;
}
}
@Override
public void close(ByteBufAllocator allocator) {
// PooledByteBufAllocator doesn't provide a shutdown method. Leaving it to GC.
}
};
public static final ChannelFactory<? extends ServerChannel> DEFAULT_SERVER_CHANNEL_FACTORY;
public static final Class<? extends Channel> DEFAULT_CLIENT_CHANNEL_TYPE;

View File

@ -62,9 +62,12 @@ import io.grpc.internal.ServerStream;
import io.grpc.internal.ServerStreamListener;
import io.grpc.internal.ServerTransport;
import io.grpc.internal.ServerTransportListener;
import io.grpc.internal.SharedResourceHolder;
import io.grpc.internal.SharedResourcePool;
import io.grpc.internal.TransportTracer;
import io.grpc.internal.testing.TestUtils;
import io.grpc.netty.NettyChannelBuilder.LocalSocketPicker;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelDuplexHandler;
@ -123,6 +126,7 @@ public class NettyClientTransportTest {
private final LinkedBlockingQueue<Attributes> serverTransportAttributesList =
new LinkedBlockingQueue<>();
private final NioEventLoopGroup group = new NioEventLoopGroup(1);
private final ByteBufAllocator allocator = SharedResourceHolder.get(Utils.BYTE_BUF_ALLOCATOR);
private final EchoServerListener serverListener = new EchoServerListener();
private final InternalChannelz channelz = new InternalChannelz();
private Runnable tooManyPingsRunnable = new Runnable() {
@ -153,6 +157,7 @@ public class NettyClientTransportTest {
}
group.shutdownGracefully(0, 10, TimeUnit.SECONDS);
SharedResourceHolder.release(Utils.BYTE_BUF_ALLOCATOR, allocator);
}
@Test
@ -190,7 +195,7 @@ public class NettyClientTransportTest {
channelOptions.put(ChannelOption.SO_LINGER, soLinger);
NettyClientTransport transport = new NettyClientTransport(
address, new ReflectiveChannelFactory<>(NioSocketChannel.class), channelOptions, group,
newNegotiator(), DEFAULT_WINDOW_SIZE, DEFAULT_MAX_MESSAGE_SIZE,
allocator, newNegotiator(), DEFAULT_WINDOW_SIZE, DEFAULT_MAX_MESSAGE_SIZE,
GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, KEEPALIVE_TIME_NANOS_DISABLED, 1L, false, authority,
null /* user agent */, tooManyPingsRunnable, new TransportTracer(), Attributes.EMPTY,
new SocketPicker(), new FakeChannelLogger(), false);
@ -435,7 +440,7 @@ public class NettyClientTransportTest {
authority = GrpcUtil.authorityFromHostAndPort(address.getHostString(), address.getPort());
NettyClientTransport transport = new NettyClientTransport(
address, new ReflectiveChannelFactory<>(CantConstructChannel.class),
new HashMap<ChannelOption<?>, Object>(), group,
new HashMap<ChannelOption<?>, Object>(), group, allocator,
newNegotiator(), DEFAULT_WINDOW_SIZE, DEFAULT_MAX_MESSAGE_SIZE,
GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, KEEPALIVE_TIME_NANOS_DISABLED, 1, false, authority,
null, tooManyPingsRunnable, new TransportTracer(), Attributes.EMPTY, new SocketPicker(),
@ -705,7 +710,7 @@ public class NettyClientTransportTest {
keepAliveTimeNano = KEEPALIVE_TIME_NANOS_DISABLED;
}
NettyClientTransport transport = new NettyClientTransport(
address, channelFactory, new HashMap<ChannelOption<?>, Object>(), group,
address, channelFactory, new HashMap<ChannelOption<?>, Object>(), group, allocator,
negotiator, DEFAULT_WINDOW_SIZE, maxMsgSize, maxHeaderListSize,
keepAliveTimeNano, keepAliveTimeoutNano,
false, authority, userAgent, tooManyPingsRunnable,
@ -723,7 +728,8 @@ public class NettyClientTransportTest {
TestUtils.testServerAddress(new InetSocketAddress(0)),
new ReflectiveChannelFactory<>(NioServerSocketChannel.class),
new HashMap<ChannelOption<?>, Object>(),
new FixedObjectPool<>(group), new FixedObjectPool<>(group), negotiator,
new FixedObjectPool<>(group), new FixedObjectPool<>(group),
SharedResourcePool.forResource(Utils.BYTE_BUF_ALLOCATOR), negotiator,
Collections.<ServerStreamTracer.Factory>emptyList(),
TransportTracer.getDefaultFactory(),
maxStreamsPerConnection,

View File

@ -62,6 +62,7 @@ public class NettyServerTest {
new HashMap<ChannelOption<?>, Object>(),
SharedResourcePool.forResource(Utils.DEFAULT_BOSS_EVENT_LOOP_GROUP),
SharedResourcePool.forResource(Utils.DEFAULT_WORKER_EVENT_LOOP_GROUP),
SharedResourcePool.forResource(Utils.BYTE_BUF_ALLOCATOR),
ProtocolNegotiators.plaintext(),
Collections.<ServerStreamTracer.Factory>emptyList(),
TransportTracer.getDefaultFactory(),
@ -100,6 +101,7 @@ public class NettyServerTest {
new HashMap<ChannelOption<?>, Object>(),
SharedResourcePool.forResource(Utils.DEFAULT_BOSS_EVENT_LOOP_GROUP),
SharedResourcePool.forResource(Utils.DEFAULT_WORKER_EVENT_LOOP_GROUP),
SharedResourcePool.forResource(Utils.BYTE_BUF_ALLOCATOR),
ProtocolNegotiators.plaintext(),
Collections.<ServerStreamTracer.Factory>emptyList(),
TransportTracer.getDefaultFactory(),
@ -138,6 +140,7 @@ public class NettyServerTest {
channelOptions,
SharedResourcePool.forResource(Utils.DEFAULT_BOSS_EVENT_LOOP_GROUP),
SharedResourcePool.forResource(Utils.DEFAULT_WORKER_EVENT_LOOP_GROUP),
SharedResourcePool.forResource(Utils.BYTE_BUF_ALLOCATOR),
ProtocolNegotiators.plaintext(),
Collections.<ServerStreamTracer.Factory>emptyList(),
TransportTracer.getDefaultFactory(),
@ -188,6 +191,7 @@ public class NettyServerTest {
new HashMap<ChannelOption<?>, Object>(),
SharedResourcePool.forResource(Utils.DEFAULT_BOSS_EVENT_LOOP_GROUP),
SharedResourcePool.forResource(Utils.DEFAULT_WORKER_EVENT_LOOP_GROUP),
SharedResourcePool.forResource(Utils.BYTE_BUF_ALLOCATOR),
ProtocolNegotiators.plaintext(),
Collections.<ServerStreamTracer.Factory>emptyList(),
TransportTracer.getDefaultFactory(),