From 15d050ece59030092c09584f22aac109d337c295 Mon Sep 17 00:00:00 2001 From: helium Date: Mon, 9 Aug 2021 16:26:29 -0700 Subject: [PATCH] Added unit test, and bug fixes. --- flow/IThreadPool.h | 7 ++--- flow/IThreadPoolTest.actor.cpp | 47 ++++++++++++++++++++++++++++++++++ flow/genericactors.actor.h | 2 +- tests/WorkerTests.txt | 2 +- 4 files changed, 51 insertions(+), 7 deletions(-) diff --git a/flow/IThreadPool.h b/flow/IThreadPool.h index d1700951a9..a34f6c708d 100644 --- a/flow/IThreadPool.h +++ b/flow/IThreadPool.h @@ -115,12 +115,9 @@ template class ThreadReturnPromiseStream : NonCopyable { public: ThreadReturnPromiseStream() {} - ~ThreadReturnPromiseStream() { - if (!promiseStream.isValid()) - sendError(broken_promise()); - } + ~ThreadReturnPromiseStream() {} - Future getFuture() { // Call only on the originating thread! + FutureStream getFuture() { // Call only on the originating thread! return promiseStream.getFuture(); } diff --git a/flow/IThreadPoolTest.actor.cpp b/flow/IThreadPoolTest.actor.cpp index 371717f55b..44e5889b92 100644 --- a/flow/IThreadPoolTest.actor.cpp +++ b/flow/IThreadPoolTest.actor.cpp @@ -11,7 +11,11 @@ void forceLinkIThreadPoolTests() {} +static ThreadReturnPromiseStream notifications; + struct ThreadNameReceiver final : IThreadPoolReceiver { + ThreadNameReceiver(const bool stream_signal = false) : stream_signal(stream_signal) {} + void init() override {} struct GetNameAction final : TypedAction { @@ -28,11 +32,19 @@ struct ThreadNameReceiver final : IThreadPoolReceiver { if (err != 0) { std::cout << "Get name failed with error code: " << err << std::endl; a.name.sendError(platform_error()); + if (stream_signal) { + notifications.sendError(platform_error()); + } return; } std::string s = name; + if (stream_signal) { + notifications.send(s); + } a.name.send(std::move(s)); } + + bool stream_signal; }; TEST_CASE("/flow/IThreadPool/NamedThread") { @@ -60,6 +72,41 @@ TEST_CASE("/flow/IThreadPool/NamedThread") { return Void(); } +TEST_CASE("/flow/IThreadPool/ThreadReturnPromiseStream") { + noUnseed = true; + + state Reference pool = createGenericThreadPool(); + pool->addThread(new ThreadNameReceiver(/*stream_signal=*/true), "thread-foo"); + + // Warning: this action is a little racy with the call to `pthread_setname_np`. In practice, + // ~nothing should depend on the thread name being set instantaneously. If this test ever + // flakes, we can make `startThread` in platform a little bit more complex to clearly order + // the actions. + state int num = 3; + for (int i = 0; i < num; ++i) { + auto* a = new ThreadNameReceiver::GetNameAction(); + pool->post(a); + } + + state FutureStream futs = notifications.getFuture(); + + state int n = 0; + while (n < num) { + std::string name = waitNext(futs); + if (name != "thread-foo") { + std::cout << "Incorrect thread name: " << name << std::endl; + ASSERT(false); + } + ++n; + } + + ASSERT(n == num); + + wait(pool->stop()); + + return Void(); +} + #else void forceLinkIThreadPoolTests() {} #endif diff --git a/flow/genericactors.actor.h b/flow/genericactors.actor.h index 52e5907989..6d71a65b28 100644 --- a/flow/genericactors.actor.h +++ b/flow/genericactors.actor.h @@ -1269,7 +1269,7 @@ void tagAndForwardError(Promise* pOutputPromise, Error value, Future si ACTOR template void tagAndForwardError(PromiseStream* pOutput, Error value, Future signal) { wait(signal); - pOutput.sendError(value); + pOutput->sendError(value); } ACTOR template diff --git a/tests/WorkerTests.txt b/tests/WorkerTests.txt index cd94c2b112..d1175f82e8 100644 --- a/tests/WorkerTests.txt +++ b/tests/WorkerTests.txt @@ -4,4 +4,4 @@ useDB=false testName=UnitTests maxTestCases=0 - testsMatching=/fdbserver/worker/ + testsMatching=/flow/IThreadPool/