Added unit test, and bug fixes.

This commit is contained in:
helium 2021-08-09 16:26:29 -07:00
parent 9a9346afaf
commit 15d050ece5
4 changed files with 51 additions and 7 deletions

View File

@ -115,12 +115,9 @@ template <class T>
class ThreadReturnPromiseStream : NonCopyable {
public:
ThreadReturnPromiseStream() {}
~ThreadReturnPromiseStream() {
if (!promiseStream.isValid())
sendError(broken_promise());
}
~ThreadReturnPromiseStream() {}
Future<T> getFuture() { // Call only on the originating thread!
FutureStream<T> getFuture() { // Call only on the originating thread!
return promiseStream.getFuture();
}

View File

@ -11,7 +11,11 @@
void forceLinkIThreadPoolTests() {}
static ThreadReturnPromiseStream<std::string> notifications;
struct ThreadNameReceiver final : IThreadPoolReceiver {
ThreadNameReceiver(const bool stream_signal = false) : stream_signal(stream_signal) {}
void init() override {}
struct GetNameAction final : TypedAction<ThreadNameReceiver, GetNameAction> {
@ -28,11 +32,19 @@ struct ThreadNameReceiver final : IThreadPoolReceiver {
if (err != 0) {
std::cout << "Get name failed with error code: " << err << std::endl;
a.name.sendError(platform_error());
if (stream_signal) {
notifications.sendError(platform_error());
}
return;
}
std::string s = name;
if (stream_signal) {
notifications.send(s);
}
a.name.send(std::move(s));
}
bool stream_signal;
};
TEST_CASE("/flow/IThreadPool/NamedThread") {
@ -60,6 +72,41 @@ TEST_CASE("/flow/IThreadPool/NamedThread") {
return Void();
}
TEST_CASE("/flow/IThreadPool/ThreadReturnPromiseStream") {
noUnseed = true;
state Reference<IThreadPool> pool = createGenericThreadPool();
pool->addThread(new ThreadNameReceiver(/*stream_signal=*/true), "thread-foo");
// Warning: this action is a little racy with the call to `pthread_setname_np`. In practice,
// ~nothing should depend on the thread name being set instantaneously. If this test ever
// flakes, we can make `startThread` in platform a little bit more complex to clearly order
// the actions.
state int num = 3;
for (int i = 0; i < num; ++i) {
auto* a = new ThreadNameReceiver::GetNameAction();
pool->post(a);
}
state FutureStream<std::string> futs = notifications.getFuture();
state int n = 0;
while (n < num) {
std::string name = waitNext(futs);
if (name != "thread-foo") {
std::cout << "Incorrect thread name: " << name << std::endl;
ASSERT(false);
}
++n;
}
ASSERT(n == num);
wait(pool->stop());
return Void();
}
#else
void forceLinkIThreadPoolTests() {}
#endif

View File

@ -1269,7 +1269,7 @@ void tagAndForwardError(Promise<T>* pOutputPromise, Error value, Future<Void> si
ACTOR template <class T>
void tagAndForwardError(PromiseStream<T>* pOutput, Error value, Future<Void> signal) {
wait(signal);
pOutput.sendError(value);
pOutput->sendError(value);
}
ACTOR template <class T>

View File

@ -4,4 +4,4 @@ useDB=false
testName=UnitTests
maxTestCases=0
testsMatching=/fdbserver/worker/
testsMatching=/flow/IThreadPool/