diff --git a/benchmarks/src/jmh/java/io/grpc/internal/SerializingExecutorBenchmark.java b/benchmarks/src/jmh/java/io/grpc/internal/SerializingExecutorBenchmark.java
new file mode 100644
index 0000000000..1ff1b4c448
--- /dev/null
+++ b/benchmarks/src/jmh/java/io/grpc/internal/SerializingExecutorBenchmark.java
@@ -0,0 +1,112 @@
+/*
+ * Copyright 2017, Google Inc. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ *
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package io.grpc.internal;
+
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Phaser;
+import java.util.concurrent.TimeUnit;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+
+/**
+ * SerializingExecutor benchmark.
+ *
+ *
Since this is a microbenchmark, don't actually believe the numbers in a strict sense. Instead,
+ * it is a gauge that the code is behaving roughly as expected, to increase confidence that our
+ * understanding of the code is correct (and will behave as expected in other cases). Even more
+ * helpfully it pushes the implementation, which should weed out many multithreading bugs.
+ */
+@State(Scope.Thread)
+public class SerializingExecutorBenchmark {
+
+ private ExecutorService executorService;
+ private Executor executor = new SerializingExecutor(executorService);
+
+ private static class IncrRunnable implements Runnable {
+ int val;
+
+ @Override
+ public void run() {
+ val++;
+ }
+ }
+
+ private final IncrRunnable incrRunnable = new IncrRunnable();
+
+ private final Phaser phaser = new Phaser(2);
+ private final Runnable phaserRunnable = new Runnable() {
+ @Override
+ public void run() {
+ phaser.arrive();
+ }
+ };
+
+ @TearDown
+ public void tearDown() throws Exception {
+ executorService.shutdownNow();
+ if (!executorService.awaitTermination(1, TimeUnit.SECONDS)) {
+ throw new RuntimeException("executor failed to shut down in a timely fashion");
+ }
+ }
+
+ @Benchmark
+ @BenchmarkMode(Mode.SampleTime)
+ @OutputTimeUnit(TimeUnit.NANOSECONDS)
+ public void oneRunnableLatency() throws Exception {
+ executor.execute(phaserRunnable);
+ phaser.arriveAndAwaitAdvance();
+ }
+
+ /**
+ * Queue many runnables, to better see queuing/consumption cost instead of just context switch.
+ */
+ @Benchmark
+ @BenchmarkMode(Mode.SampleTime)
+ @OutputTimeUnit(TimeUnit.NANOSECONDS)
+ public void manyRunnables() throws Exception {
+ incrRunnable.val = 0;
+ for (int i = 0; i < 500; i++) {
+ executor.execute(incrRunnable);
+ }
+ executor.execute(phaserRunnable);
+ phaser.arriveAndAwaitAdvance();
+ if (incrRunnable.val != 500) {
+ throw new AssertionError();
+ }
+ }
+}
\ No newline at end of file
diff --git a/core/src/main/java/io/grpc/internal/SerializingExecutor.java b/core/src/main/java/io/grpc/internal/SerializingExecutor.java
index 0355aa3857..b1afb839f5 100644
--- a/core/src/main/java/io/grpc/internal/SerializingExecutor.java
+++ b/core/src/main/java/io/grpc/internal/SerializingExecutor.java
@@ -31,13 +31,16 @@
package io.grpc.internal;
+import static com.google.common.base.Preconditions.checkNotNull;
+
import com.google.common.base.Preconditions;
-import java.util.ArrayDeque;
import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import java.util.logging.Logger;
-import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.Nullable;
/**
* Executor ensuring that all {@link Runnable} tasks submitted are executed in order
@@ -45,7 +48,7 @@ import javax.annotation.concurrent.GuardedBy;
* running at the same time.
*/
// TODO(madongfly): figure out a way to not expose it or move it to transport package.
-public final class SerializingExecutor implements Executor {
+public final class SerializingExecutor implements Executor, Runnable {
private static final Logger log =
Logger.getLogger(SerializingExecutor.class.getName());
@@ -53,24 +56,9 @@ public final class SerializingExecutor implements Executor {
private final Executor executor;
/** A list of Runnables to be run in order. */
- // Initial size set to 4 because it is a nice number and at least the size necessary for handling
- // a unary response: onHeaders + onPayload + onClose
- @GuardedBy("internalLock")
- private final Queue waitQueue = new ArrayDeque(4);
+ private final Queue runQueue = new ConcurrentLinkedQueue();
- /**
- * We explicitly keep track of if the TaskRunner is currently scheduled to
- * run. If it isn't, we start it. We can't just use
- * waitQueue.isEmpty() as a proxy because we need to ensure that only one
- * Runnable submitted is running at a time so even if waitQueue is empty
- * the isThreadScheduled isn't set to false until after the Runnable is
- * finished.
- */
- @GuardedBy("internalLock")
- private boolean isThreadScheduled = false;
-
- /** The object that actually runs the Runnables submitted, reused. */
- private final TaskRunner taskRunner = new TaskRunner();
+ private final AtomicBoolean running = new AtomicBoolean();
/**
* Creates a SerializingExecutor, running tasks using {@code executor}.
@@ -82,90 +70,62 @@ public final class SerializingExecutor implements Executor {
this.executor = executor;
}
- private final Object internalLock = new Object() {
- @Override public String toString() {
- return "SerializingExecutor lock: " + super.toString();
- }
- };
-
/**
* Runs the given runnable strictly after all Runnables that were submitted
* before it, and using the {@code executor} passed to the constructor. .
*/
@Override
public void execute(Runnable r) {
- Preconditions.checkNotNull(r, "'r' must not be null.");
- boolean scheduleTaskRunner = false;
- synchronized (internalLock) {
- waitQueue.add(r);
+ runQueue.add(checkNotNull(r, "'r' must not be null."));
+ schedule(r);
+ }
- if (!isThreadScheduled) {
- isThreadScheduled = true;
- scheduleTaskRunner = true;
- }
- }
- if (scheduleTaskRunner) {
- boolean threw = true;
+ private void schedule(@Nullable Runnable removable) {
+ if (running.compareAndSet(false, true)) {
+ boolean success = false;
try {
- executor.execute(taskRunner);
- threw = false;
+ executor.execute(this);
+ success = true;
} finally {
- if (threw) {
- synchronized (internalLock) {
- // It is possible that at this point that there are still tasks in
- // the queue, it would be nice to keep trying but the error may not
- // be recoverable. So we update our state and propogate so that if
- // our caller deems it recoverable we won't be stuck.
- isThreadScheduled = false;
+ // It is possible that at this point that there are still tasks in
+ // the queue, it would be nice to keep trying but the error may not
+ // be recoverable. So we update our state and propagate so that if
+ // our caller deems it recoverable we won't be stuck.
+ if (!success) {
+ if (removable != null) {
+ // This case can only be reached if 'this' was not currently running, and we failed to
+ // reschedule. The item should still be in the queue for removal.
+ // ConcurrentLinkedQueue claims that null elements are not allowed, but seems to not
+ // throw if the item to remove is null. If removable is present in the queue twice,
+ // the wrong one may be removed. It doesn't seem possible for this case to exist today.
+ // This is important to run in case of RejectedExectuionException, so that future calls
+ // to execute don't succeed and accidentally run a previous runnable.
+ runQueue.remove(removable);
}
+ running.set(false);
}
}
}
}
- /**
- * Task that actually runs the Runnables. It takes the Runnables off of the
- * queue one by one and runs them. After it is done with all Runnables and
- * there are no more to run, puts the SerializingExecutor in the state where
- * isThreadScheduled = false and returns. This allows the current worker
- * thread to return to the original pool.
- */
- private class TaskRunner implements Runnable {
- @Override
- public void run() {
- boolean stillRunning = true;
- try {
- while (true) {
- Runnable nextToRun;
- synchronized (internalLock) {
- Preconditions.checkState(isThreadScheduled);
- nextToRun = waitQueue.poll();
- if (nextToRun == null) {
- isThreadScheduled = false;
- stillRunning = false;
- break;
- }
- }
-
- // Always run while not holding the lock, to avoid deadlocks.
- try {
- nextToRun.run();
- } catch (RuntimeException e) {
- // Log it and keep going.
- log.log(Level.SEVERE, "Exception while executing runnable "
- + nextToRun, e);
- }
- }
- } finally {
- if (stillRunning) {
- // An Error is bubbling up, we should mark ourselves as no longer
- // running, that way if anyone tries to keep using us we won't be
- // corrupted.
- synchronized (internalLock) {
- isThreadScheduled = false;
- }
+ @Override
+ public void run() {
+ Runnable r;
+ try {
+ while ((r = runQueue.poll()) != null) {
+ try {
+ r.run();
+ } catch (RuntimeException e) {
+ // Log it and keep going.
+ log.log(Level.SEVERE, "Exception while executing runnable " + r, e);
}
}
+ } finally {
+ running.set(false);
+ }
+ if (!runQueue.isEmpty()) {
+ // we didn't enqueue anything but someone else did.
+ schedule(null);
}
}
}
diff --git a/core/src/test/java/io/grpc/internal/SerializingExecutorTest.java b/core/src/test/java/io/grpc/internal/SerializingExecutorTest.java
new file mode 100644
index 0000000000..aa82ed8dbc
--- /dev/null
+++ b/core/src/test/java/io/grpc/internal/SerializingExecutorTest.java
@@ -0,0 +1,259 @@
+/*
+ * Copyright 2017, Google Inc. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ *
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package io.grpc.internal;
+
+import static com.google.common.truth.Truth.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.fail;
+
+import com.google.common.util.concurrent.MoreExecutors;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Executor;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class SerializingExecutorTest {
+ private SingleExecutor singleExecutor = new SingleExecutor();
+ private SerializingExecutor executor = new SerializingExecutor(singleExecutor);
+ private List runs = new ArrayList();
+
+ private class AddToRuns implements Runnable {
+ private final int val;
+
+ public AddToRuns(int val) {
+ this.val = val;
+ }
+
+ @Override
+ public void run() {
+ runs.add(val);
+ }
+ }
+
+ @Test
+ public void resumable() {
+ class CoyExecutor implements Executor {
+ int runCount;
+
+ @Override
+ public void execute(Runnable command) {
+ runCount++;
+ if (runCount == 1) {
+ throw new RuntimeException();
+ }
+ command.run();
+ }
+ }
+
+ executor = new SerializingExecutor(new CoyExecutor());
+ try {
+ executor.execute(new AddToRuns(1));
+ fail();
+ } catch (RuntimeException expected) {
+ }
+
+ // Ensure that the runnable enqueued was actually removed on the failed execute above.
+ executor.execute(new AddToRuns(2));
+
+ assertThat(runs).containsExactly(2);
+ }
+
+
+ @Test
+ public void serial() {
+ executor.execute(new AddToRuns(1));
+ assertEquals(Collections.emptyList(), runs);
+ singleExecutor.drain();
+ assertEquals(Arrays.asList(1), runs);
+
+ executor.execute(new AddToRuns(2));
+ assertEquals(Arrays.asList(1), runs);
+ singleExecutor.drain();
+ assertEquals(Arrays.asList(1, 2), runs);
+ }
+
+ @Test
+ public void parallel() {
+ executor.execute(new AddToRuns(1));
+ executor.execute(new AddToRuns(2));
+ executor.execute(new AddToRuns(3));
+ assertEquals(Collections.emptyList(), runs);
+ singleExecutor.drain();
+ assertEquals(Arrays.asList(1, 2, 3), runs);
+ }
+
+ @Test
+ public void reentrant() {
+ executor.execute(new Runnable() {
+ @Override
+ public void run() {
+ executor.execute(new AddToRuns(3));
+ runs.add(1);
+ }
+ });
+ executor.execute(new AddToRuns(2));
+ singleExecutor.drain();
+ assertEquals(Arrays.asList(1, 2, 3), runs);
+ }
+
+ @Test
+ public void testFirstRunnableThrows() {
+ final RuntimeException ex = new RuntimeException();
+ executor.execute(new Runnable() {
+ @Override
+ public void run() {
+ runs.add(1);
+ throw ex;
+ }
+ });
+ executor.execute(new AddToRuns(2));
+ executor.execute(new AddToRuns(3));
+
+ singleExecutor.drain();
+
+ assertEquals(Arrays.asList(1, 2, 3), runs);
+ }
+
+ @Test
+ public void lastRunnableThrows() {
+ final RuntimeException ex = new RuntimeException();
+ executor.execute(new AddToRuns(1));
+ executor.execute(new AddToRuns(2));
+ executor.execute(new Runnable() {
+ @Override
+ public void run() {
+ runs.add(3);
+ throw ex;
+ }
+ });
+
+ singleExecutor.drain();
+ assertEquals(Arrays.asList(1, 2, 3), runs);
+
+ // Scheduling more still works
+ executor.execute(new AddToRuns(4));
+ assertEquals(Arrays.asList(1, 2, 3), runs);
+ singleExecutor.drain();
+ assertEquals(Arrays.asList(1, 2, 3, 4), runs);
+ }
+
+ @Test
+ public void firstExecuteThrows() {
+ final RuntimeException ex = new RuntimeException();
+ ForwardingExecutor forwardingExecutor = new ForwardingExecutor(new Executor() {
+ @Override
+ public void execute(Runnable r) {
+ throw ex;
+ }
+ });
+ executor = new SerializingExecutor(forwardingExecutor);
+ try {
+ executor.execute(new AddToRuns(1));
+ fail("expected exception");
+ } catch (RuntimeException e) {
+ assertSame(ex, e);
+ }
+ assertEquals(Collections.emptyList(), runs);
+
+ forwardingExecutor.executor = singleExecutor;
+ executor.execute(new AddToRuns(2));
+ executor.execute(new AddToRuns(3));
+ assertEquals(Collections.emptyList(), runs);
+ singleExecutor.drain();
+ assertEquals(Arrays.asList(2, 3), runs);
+ }
+
+ @Test
+ public void direct() {
+ executor = new SerializingExecutor(MoreExecutors.directExecutor());
+ executor.execute(new AddToRuns(1));
+ assertEquals(Arrays.asList(1), runs);
+ executor.execute(new AddToRuns(2));
+ assertEquals(Arrays.asList(1, 2), runs);
+ executor.execute(new AddToRuns(3));
+ assertEquals(Arrays.asList(1, 2, 3), runs);
+ }
+
+ @Test
+ public void testDirectReentrant() {
+ executor = new SerializingExecutor(MoreExecutors.directExecutor());
+ executor.execute(new Runnable() {
+ @Override
+ public void run() {
+ executor.execute(new AddToRuns(2));
+ runs.add(1);
+ }
+ });
+ assertEquals(Arrays.asList(1, 2), runs);
+ executor.execute(new AddToRuns(3));
+ assertEquals(Arrays.asList(1, 2, 3), runs);
+ }
+
+ private static class SingleExecutor implements Executor {
+ private Runnable runnable;
+
+ @Override
+ public void execute(Runnable r) {
+ if (runnable != null) {
+ fail("Already have runnable scheduled");
+ }
+ runnable = r;
+ }
+
+ public void drain() {
+ if (runnable != null) {
+ Runnable r = runnable;
+ runnable = null;
+ r.run();
+ }
+ }
+ }
+
+ private static class ForwardingExecutor implements Executor {
+ Executor executor;
+
+ public ForwardingExecutor(Executor executor) {
+ this.executor = executor;
+ }
+
+ @Override
+ public void execute(Runnable r) {
+ executor.execute(r);
+ }
+ }
+}