Fix some timing hole in these two testcases

This commit is contained in:
Jesse Lee 2020-05-27 14:22:37 -04:00
parent e311473401
commit 3b80c10b1e
2 changed files with 19 additions and 8 deletions

View File

@ -55,6 +55,7 @@ private:
uint32_t last_input_; uint32_t last_input_;
uint32_t sleep_ms_ = 0; uint32_t sleep_ms_ = 0;
std::vector<uint32_t> input_; std::vector<uint32_t> input_;
WaitPost wp;
// This worker loop is to be called by a single thread. It will pop my_conn Connector // This worker loop is to be called by a single thread. It will pop my_conn Connector
// and populate output vector // and populate output vector
@ -129,11 +130,13 @@ MindDataTestConnector::MindDataTestConnector() : tg_(new TaskGroup()) {
for (int i = 1; i <= last_input_; i++) { for (int i = 1; i <= last_input_; i++) {
input_.push_back(i); input_.push_back(i);
} }
wp.Register(tg_.get());
} }
Status MindDataTestConnector::Run_test_0() { Status MindDataTestConnector::Run_test_0() {
Status rc; Status rc;
std::vector<uint32_t> output; std::vector<uint32_t> output;
wp.Clear();
auto my_conn = std::make_shared<Connector<uint32_t>>(1, // num of producers auto my_conn = std::make_shared<Connector<uint32_t>>(1, // num of producers
1, // num of consumers 1, // num of consumers
10); // capacity of each queue 10); // capacity of each queue
@ -160,8 +163,11 @@ Status MindDataTestConnector::Run_test_0() {
my_conn, my_conn,
&output)); &output));
RETURN_IF_NOT_OK(rc); RETURN_IF_NOT_OK(rc);
// Wait for the threads to finish.
tg_->join_all(); rc = wp.Wait();
EXPECT_TRUE(rc.IsOk());
tg_->interrupt_all();
tg_->join_all(Task::WaitFlag::kNonBlocking);
my_conn.reset(); my_conn.reset();
return ValidateOutput(output); return ValidateOutput(output);
} }
@ -169,6 +175,7 @@ Status MindDataTestConnector::Run_test_0() {
Status MindDataTestConnector::Run_test_1() { Status MindDataTestConnector::Run_test_1() {
std::vector<uint32_t> output; std::vector<uint32_t> output;
Status rc; Status rc;
wp.Clear();
// number of threads in each layer // number of threads in each layer
int l1_threads = 15; int l1_threads = 15;
@ -223,8 +230,11 @@ Status MindDataTestConnector::Run_test_1() {
conn2, // poping the data from conn2 conn2, // poping the data from conn2
&output)); &output));
RETURN_IF_NOT_OK(rc); RETURN_IF_NOT_OK(rc);
// Wait for the threads to finish.
tg_->join_all(); rc = wp.Wait();
EXPECT_TRUE(rc.IsOk());
tg_->interrupt_all();
tg_->join_all(Task::WaitFlag::kNonBlocking);
conn1.reset(); conn1.reset();
conn2.reset(); conn2.reset();
@ -250,11 +260,11 @@ Status MindDataTestConnector::SerialWorkerPull(
GoToSleep(sleep_ms_); GoToSleep(sleep_ms_);
} }
// Raising interrupt after it processed the last_input_. // Signal master thread after it processed the last_input_.
// This will trigger the MidWorkerJob threads to quit their worker loop. // This will trigger the MidWorkerJob threads to quit their worker loop.
if (res == last_input_) { if (res == last_input_) {
MS_LOG(INFO) << "All data is collected."; MS_LOG(INFO) << "All data is collected.";
tg_->interrupt_all(); wp.Set();
break; break;
} }
} }

View File

@ -38,12 +38,13 @@ TEST_F(MindDataTestTaskManager, Test1) {
ASSERT_TRUE(vg_rc.IsOk() || vg_rc.IsOutofMemory()); ASSERT_TRUE(vg_rc.IsOk() || vg_rc.IsOutofMemory());
ASSERT_TRUE(vg.join_all().IsOk()); ASSERT_TRUE(vg.join_all().IsOk());
ASSERT_TRUE(vg.GetTaskErrorIfAny().IsOutofMemory()); ASSERT_TRUE(vg.GetTaskErrorIfAny().IsOutofMemory());
// Test the error is passed back to the master thread. // Test the error is passed back to the master thread if vg_rc above is OK.
// If vg_rc is kOutOfMemory, the group error is already passed back.
// Some compiler may choose to run the next line in parallel with the above 3 lines // Some compiler may choose to run the next line in parallel with the above 3 lines
// and this will cause some mismatch once a while. // and this will cause some mismatch once a while.
// To block this racing condition, we need to create a dependency that the next line // To block this racing condition, we need to create a dependency that the next line
// depends on previous lines. // depends on previous lines.
if (vg.GetTaskErrorIfAny().IsError()) { if (vg.GetTaskErrorIfAny().IsError() && vg_rc.IsOk()) {
Status rc = TaskManager::GetMasterThreadRc(); Status rc = TaskManager::GetMasterThreadRc();
ASSERT_TRUE(rc.IsOutofMemory()); ASSERT_TRUE(rc.IsOutofMemory());
} }