Improve thread safety of newStream()

OkHttp should now have a thread-safe implementation of newStream.
Previously 'lock' was not held when checking goAway and the checking in
AbstractClientTransport was redundant.

Netty was thread-safe, but it was very hard to tell what guarantees were
necessary and what guarantees each piece was providing.
This commit is contained in:
Eric Anderson 2015-02-06 09:53:56 -08:00
parent c84ef8332e
commit 8aa79b39fc
3 changed files with 36 additions and 92 deletions

View File

@ -1,79 +0,0 @@
/*
* Copyright 2014, Google Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package io.grpc.transport;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.AbstractService;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
/**
* Abstract base class for all {@link ClientTransport} implementations. Implements the
* {@link #newStream} method to perform a state check on the service before allowing stream
* creation.
*/
public abstract class AbstractClientTransport extends AbstractService implements ClientTransport {
@Override
public final ClientStream newStream(MethodDescriptor<?, ?> method,
Metadata.Headers headers,
ClientStreamListener listener) {
Preconditions.checkNotNull(method, "method");
Preconditions.checkNotNull(listener, "listener");
if (state() == State.STARTING) {
// Wait until the transport is running before creating the new stream.
awaitRunning();
}
if (state() != State.RUNNING) {
throw new IllegalStateException("Invalid state for creating new stream: " + state(),
failureCause());
}
// Create the stream.
return newStreamInternal(method, headers, listener);
}
/**
* Called by {@link #newStream} to perform the actual creation of the new {@link ClientStream}.
* This is only called after the transport has successfully transitioned to the {@code RUNNING}
* state.
*
* @param method the RPC method to be invoked on the server by the new stream.
* @param listener the listener for events on the new stream.
* @return the new stream.
*/
protected abstract ClientStream newStreamInternal(MethodDescriptor<?, ?> method,
Metadata.Headers headers,
ClientStreamListener listener);
}

View File

@ -34,13 +34,13 @@ package io.grpc.transport.netty;
import static io.netty.channel.ChannelOption.SO_KEEPALIVE;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.AbstractService;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.transport.AbstractClientTransport;
import io.grpc.transport.ClientStream;
import io.grpc.transport.ClientStreamListener;
import io.grpc.transport.ClientTransport;
@ -79,7 +79,7 @@ import javax.net.ssl.SSLParameters;
/**
* A Netty-based {@link ClientTransport} implementation.
*/
class NettyClientTransport extends AbstractClientTransport {
class NettyClientTransport extends AbstractService implements ClientTransport {
private final SocketAddress address;
private final EventLoopGroup eventGroup;
@ -87,7 +87,7 @@ class NettyClientTransport extends AbstractClientTransport {
private final NettyClientHandler handler;
private final boolean ssl;
private final AsciiString authority;
private Channel channel;
private volatile Channel channel;
NettyClientTransport(SocketAddress address, NegotiationType negotiationType,
EventLoopGroup eventGroup, SslContext sslContext) {
@ -140,8 +140,27 @@ class NettyClientTransport extends AbstractClientTransport {
}
@Override
protected ClientStream newStreamInternal(MethodDescriptor<?, ?> method, Metadata.Headers headers,
public ClientStream newStream(MethodDescriptor<?, ?> method, Metadata.Headers headers,
ClientStreamListener listener) {
Preconditions.checkNotNull(method, "method");
Preconditions.checkNotNull(headers, "headers");
Preconditions.checkNotNull(listener, "listener");
// We can't write to the channel until negotiation is complete. Use state() instead of blindly
// calling awaitRunning() in order to avoid obtaining a lock in the common case.
if (state() != State.RUNNING) {
try {
awaitRunning();
} catch (IllegalStateException ex) {
if (channel == null) {
// Negotiation did not complete, so still can't write to channel. Ex should already
// contain failureCause() information.
throw ex;
}
}
// channel is now guaranteed to be non-null
}
// Create the stream.
NettyClientStream stream = new NettyClientStream(listener, channel, handler);

View File

@ -33,6 +33,7 @@ package io.grpc.transport.okhttp;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.AbstractService;
import com.squareup.okhttp.internal.spdy.ErrorCode;
import com.squareup.okhttp.internal.spdy.FrameReader;
@ -46,7 +47,6 @@ import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.Status;
import io.grpc.Status.Code;
import io.grpc.transport.AbstractClientTransport;
import io.grpc.transport.ClientStream;
import io.grpc.transport.ClientStreamListener;
import io.grpc.transport.ClientTransport;
@ -75,7 +75,7 @@ import javax.net.ssl.SSLSocketFactory;
/**
* A okhttp-based {@link ClientTransport} implementation.
*/
public class OkHttpClientTransport extends AbstractClientTransport {
public class OkHttpClientTransport extends AbstractService implements ClientTransport {
/** The default initial window size in HTTP/2 is 64 KiB for the stream and connection. */
@VisibleForTesting
static final int DEFAULT_INITIAL_WINDOW_SIZE = 64 * 1024;
@ -164,15 +164,19 @@ public class OkHttpClientTransport extends AbstractClientTransport {
}
@Override
protected ClientStream newStreamInternal(MethodDescriptor<?, ?> method,
Metadata.Headers headers,
ClientStreamListener listener) {
public ClientStream newStream(MethodDescriptor<?, ?> method, Metadata.Headers headers,
ClientStreamListener listener) {
Preconditions.checkNotNull(method, "method");
Preconditions.checkNotNull(headers, "headers");
Preconditions.checkNotNull(listener, "listener");
OkHttpClientStream clientStream = OkHttpClientStream.newStream(listener,
frameWriter, this, outboundFlow);
if (goAway) {
clientStream.transportReportStatus(goAwayStatus, false, new Metadata.Trailers());
} else {
assignStreamId(clientStream);
synchronized (lock) {
if (goAway) {
throw new IllegalStateException("Transport not running", goAwayStatus.asRuntimeException());
} else {
assignStreamId(clientStream);
}
}
String defaultPath = "/" + method.getName();
frameWriter.synStream(false, false, clientStream.id(), 0,