mirror of https://github.com/grpc/grpc-java.git
xds: Fix flow control data race in ControlPlaneClient
As discovered by TSAN, the adsStream field is not synchronized. ``` WARNING: ThreadSanitizer: data race (pid=1625) Read of size 4 at 0x00009b66fc88 by thread T23 (mutexes: write M0): #0 io.grpc.xds.ControlPlaneClient.isReady()Z ControlPlaneClient.java:203 #1 io.grpc.xds.ControlPlaneClient.readyHandler()V ControlPlaneClient.java:211 #2 io.grpc.xds.ControlPlaneClient$AdsStream.onReady()V ControlPlaneClient.java:328 #3 io.grpc.xds.GrpcXdsTransportFactory$EventHandlerToCallListenerAdapter.onReady()V GrpcXdsTransportFactory.java:145 #4 io.grpc.PartialForwardingClientCallListener.onReady()V PartialForwardingClientCallListener.java:44 #5 io.grpc.ForwardingClientCallListener.onReady()V ForwardingClientCallListener.java:23 #6 io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onReady()V ForwardingClientCallListener.java:40 #7 io.grpc.PartialForwardingClientCallListener.onReady()V PartialForwardingClientCallListener.java:44 #8 io.grpc.ForwardingClientCallListener.onReady()V ForwardingClientCallListener.java:23 #9 io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onReady()V ForwardingClientCallListener.java:40 #10 io.grpc.internal.DelayedClientCall$DelayedListener.onReady()V DelayedClientCall.java:497 #11 io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamOnReady.runInternal()V ClientCallImpl.java:781 #12 io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamOnReady.runInContext()V ClientCallImpl.java:772 #13 io.grpc.internal.ContextRunnable.run()V ContextRunnable.java:37 #14 io.grpc.internal.SerializingExecutor.run()V SerializingExecutor.java:133 #15 java.util.concurrent.ThreadPoolExecutor.runWorker(Ljava/util/concurrent/ThreadPoolExecutor$Worker;)V ThreadPoolExecutor.java:1130 #16 java.util.concurrent.ThreadPoolExecutor$Worker.run()V ThreadPoolExecutor.java:630 #17 java.lang.Thread.run()V Thread.java:830 #18 (Generated Stub) <null> Previous write of size 4 at 0x00009b66fc88 by thread T4 (mutexes: write M1, write M2, write M3, write M4, write M5): #0 io.grpc.xds.ControlPlaneClient$AdsStream.cleanUp()V ControlPlaneClient.java:424 #1 io.grpc.xds.ControlPlaneClient$AdsStream.close(Ljava/lang/Exception;)V ControlPlaneClient.java:418 #2 io.grpc.xds.ControlPlaneClient$1.run()V ControlPlaneClient.java:130 #3 io.grpc.SynchronizationContext.drain()V SynchronizationContext.java:94 #4 io.grpc.SynchronizationContext.execute(Ljava/lang/Runnable;)V SynchronizationContext.java:126 #5 io.grpc.xds.XdsClientImpl.shutdown()V XdsClientImpl.java:207 #6 io.grpc.xds.SharedXdsClientPoolProvider$RefCountedXdsClientObjectPool.returnObject(Ljava/lang/Object;)Lio/grpc/xds/XdsClient; SharedXdsClientPoolProvider.java:144 #7 io.grpc.xds.SharedXdsClientPoolProvider$RefCountedXdsClientObjectPool.returnObject(Ljava/lang/Object;)Ljava/lang/Object; SharedXdsClientPoolProvider.java:102 #8 io.grpc.xds.XdsClientFederationTest.cleanUp()V XdsClientFederationTest.java:86 ```
This commit is contained in:
parent
de7e649e04
commit
d7628a3aba
|
@ -199,6 +199,7 @@ final class ControlPlaneClient {
|
|||
return rpcRetryTimer != null && rpcRetryTimer.isPending();
|
||||
}
|
||||
|
||||
// Must be synchronized.
|
||||
boolean isReady() {
|
||||
return adsStream != null && adsStream.call != null && adsStream.call.isReady();
|
||||
}
|
||||
|
@ -207,6 +208,7 @@ final class ControlPlaneClient {
|
|||
* Starts a timer for each requested resource that hasn't been responded to and
|
||||
* has been waiting for the channel to get ready.
|
||||
*/
|
||||
// Must be synchronized.
|
||||
void readyHandler() {
|
||||
if (!isReady()) {
|
||||
return;
|
||||
|
@ -325,7 +327,7 @@ final class ControlPlaneClient {
|
|||
|
||||
@Override
|
||||
public void onReady() {
|
||||
readyHandler();
|
||||
syncContext.execute(ControlPlaneClient.this::readyHandler);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
Loading…
Reference in New Issue