Merge pull request #7786 from sfc-gh-mdvorsky/mdvorsky/net2_unittest

Add Net2 unittests for ThreadSafeQueue and Net2::onMainThread
This commit is contained in:
Markus Pilman 2022-08-08 11:23:31 -06:00 committed by GitHub
commit c9865809e0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 135 additions and 63 deletions

View File

@ -52,6 +52,8 @@
#include "flow/TLSConfig.actor.h"
#include "flow/genericactors.actor.h"
#include "flow/Util.h"
#include "flow/UnitTest.h"
#include "flow/ScopeExit.h"
#ifdef ADDRESS_SANITIZER
#include <sanitizer/lsan_interface.h>
@ -2138,71 +2140,141 @@ void startThreadF(F&& func) {
}
};
Thing* t = new Thing(std::move(func));
startThread(Thing::start, t);
g_network->startThread(Thing::start, t);
}
TEST_CASE("/flow/Net2/ThreadSafeQueue/Interface") {
ThreadSafeQueue<int> tq;
ASSERT(!tq.pop().present());
ASSERT(tq.canSleep());
ASSERT(tq.push(1) == true);
ASSERT(!tq.canSleep());
ASSERT(!tq.canSleep());
ASSERT(tq.push(2) == false);
ASSERT(tq.push(3) == false);
ASSERT(tq.pop().get() == 1);
ASSERT(tq.pop().get() == 2);
ASSERT(tq.push(4) == false);
ASSERT(tq.pop().get() == 3);
ASSERT(tq.pop().get() == 4);
ASSERT(!tq.pop().present());
ASSERT(tq.canSleep());
return Void();
}
// A helper struct used by queueing tests which use multiple threads.
struct QueueTestThreadState {
QueueTestThreadState(int threadId, int toProduce) : threadId(threadId), toProduce(toProduce) {}
int threadId;
int toProduce;
int produced = 0;
Promise<Void> doneProducing;
int consumed = 0;
static int valueToThreadId(int value) { return value >> 20; }
int elementValue(int index) { return index + (threadId << 20); }
int nextProduced() { return elementValue(produced++); }
int nextConsumed() { return elementValue(consumed++); }
void checkDone() {
ASSERT_EQ(produced, toProduce);
ASSERT_EQ(consumed, produced);
}
};
TEST_CASE("/flow/Net2/ThreadSafeQueue/Threaded") {
// Uses ThreadSafeQueue from multiple threads. Verifies that all pushed elements are popped, maintaining the
// ordering within a thread.
ThreadSafeQueue<int> queue;
state std::vector<QueueTestThreadState> perThread = { QueueTestThreadState(0, 1000000),
QueueTestThreadState(1, 100000),
QueueTestThreadState(2, 1000000) };
state std::vector<Future<Void>> doneProducing;
int total = 0;
for (int t = 0; t < perThread.size(); ++t) {
auto& s = perThread[t];
doneProducing.push_back(s.doneProducing.getFuture());
total += s.toProduce;
startThreadF([&queue, &s]() {
printf("Thread%d\n", s.threadId);
int nextYield = 0;
while (s.produced < s.toProduce) {
queue.push(s.nextProduced());
if (nextYield-- == 0) {
std::this_thread::yield();
nextYield = nondeterministicRandom()->randomInt(0, 100);
}
}
printf("T%dDone\n", s.threadId);
s.doneProducing.send(Void());
});
}
int consumed = 0;
while (consumed < total) {
Optional<int> element = queue.pop();
if (element.present()) {
int v = element.get();
auto& s = perThread[QueueTestThreadState::valueToThreadId(v)];
++consumed;
ASSERT(v == s.nextConsumed());
} else {
std::this_thread::yield();
}
if ((consumed & 3) == 0)
queue.canSleep();
}
wait(waitForAll(doneProducing));
for (int t = 0; t < std::size(perThread); ++t) {
perThread[t].checkDone();
}
return Void();
}
// NB: This could be a test for any INetwork implementation, but Sim2 doesn't
// satisfy this requirement yet.
TEST_CASE("noSim/flow/Net2/onMainThreadFIFO") {
// Verifies that signals processed by onMainThread() are executed in order.
state std::vector<QueueTestThreadState> perThread = { QueueTestThreadState(0, 1000000),
QueueTestThreadState(1, 100000),
QueueTestThreadState(2, 1000000) };
state std::vector<Future<Void>> doneProducing;
for (int t = 0; t < perThread.size(); ++t) {
auto& s = perThread[t];
doneProducing.push_back(s.doneProducing.getFuture());
startThreadF([&s]() {
int nextYield = 0;
while (s.produced < s.toProduce) {
if (nextYield-- == 0) {
std::this_thread::yield();
nextYield = nondeterministicRandom()->randomInt(0, 100);
}
int v = s.nextProduced();
onMainThreadVoid([&s, v]() { ASSERT_EQ(v, s.nextConsumed()); });
}
s.doneProducing.send(Void());
});
}
wait(waitForAll(doneProducing));
// Wait for one more onMainThread to wait for all scheduled signals to be executed.
Promise<Void> signal;
state Future<Void> doneConsuming = signal.getFuture();
g_network->onMainThread(std::move(signal), TaskPriority::DefaultOnMainThread);
wait(doneConsuming);
for (int t = 0; t < std::size(perThread); ++t) {
perThread[t].checkDone();
}
return Void();
}
void net2_test(){
/*printf("ThreadSafeQueue test\n");
printf(" Interface: ");
ThreadSafeQueue<int> tq;
ASSERT( tq.canSleep() == true );
ASSERT( tq.push( 1 ) == true ) ;
ASSERT( tq.push( 2 ) == false );
ASSERT( tq.push( 3 ) == false );
ASSERT( tq.pop().get() == 1 );
ASSERT( tq.pop().get() == 2 );
ASSERT( tq.push( 4 ) == false );
ASSERT( tq.pop().get() == 3 );
ASSERT( tq.pop().get() == 4 );
ASSERT( !tq.pop().present() );
printf("OK\n");
printf("Threaded: ");
Event finished, finished2;
int thread1Iterations = 1000000, thread2Iterations = 100000;
if (thread1Iterations)
startThreadF([&](){
printf("Thread1\n");
for(int i=0; i<thread1Iterations; i++)
tq.push(i);
printf("T1Done\n");
finished.set();
});
if (thread2Iterations)
startThreadF([&](){
printf("Thread2\n");
for(int i=0; i<thread2Iterations; i++)
tq.push(i + (1<<20));
printf("T2Done\n");
finished2.set();
});
int c = 0, mx[2]={0, 1<<20}, p = 0;
while (c < thread1Iterations + thread2Iterations)
{
Optional<int> i = tq.pop();
if (i.present()) {
int v = i.get();
++c;
if (mx[v>>20] != v)
printf("Wrong value dequeued!\n");
ASSERT( mx[v>>20] == v );
mx[v>>20] = v + 1;
} else {
++p;
_mm_pause();
}
if ((c&3)==0) tq.canSleep();
}
printf("%d %d %x %x %s\n", c, p, mx[0], mx[1], mx[0]==thread1Iterations && mx[1]==(1<<20)+thread2Iterations ? "OK" :
"FAIL");
finished.block();
finished2.block();
/*
g_network = newNet2(); // for promise serialization below
Endpoint destination;

View File

@ -579,7 +579,7 @@ public:
// Returns true if the current thread is the main thread
virtual void onMainThread(Promise<Void>&& signal, TaskPriority taskID) = 0;
// Executes signal.send(Void()) on a/the thread belonging to this network
// Executes signal.send(Void()) on a/the thread belonging to this network in FIFO order
virtual THREAD_HANDLE startThread(THREAD_FUNC_RETURN (*func)(void*),
void* arg,