From e80b93939f451202e579624602ef3ec8ac596a07 Mon Sep 17 00:00:00 2001 From: alesapin Date: Thu, 17 Dec 2020 18:09:09 +0300 Subject: [PATCH 1/2] Add one more case in zk test uti --- utils/zookeeper-test/main.cpp | 95 ++++++++++++++++++++++++++++++++++- 1 file changed, 94 insertions(+), 1 deletion(-) diff --git a/utils/zookeeper-test/main.cpp b/utils/zookeeper-test/main.cpp index fe7bf93fd1..1854471add 100644 --- a/utils/zookeeper-test/main.cpp +++ b/utils/zookeeper-test/main.cpp @@ -6,10 +6,14 @@ #include #include #include +#include +#include +#include #include #include #include +#include using namespace std; @@ -144,9 +148,97 @@ void testMultiRequest(zkutil::ZooKeeper & zk) checkEq(zk, "/data/multirequest", "bbb"); } +std::mutex elements_mutex; +std::vector current_elements; +std::atomic watches_triggered = 0; + +void triggerWatch(const Coordination::WatchResponse &) +{ + watches_triggered++; +} + +template +Iter select_randomly(Iter start, Iter end, RandomGenerator& g) +{ + std::uniform_int_distribution<> dis(0, std::distance(start, end) - 1); + std::advance(start, dis(g)); + return start; +} + +template +Iter select_randomly(Iter start, Iter end) +{ + static std::random_device rd; + static std::mt19937 gen(rd()); + return select_randomly(start, end, gen); +} + +std::atomic element_counter = 0; +std::atomic failed_setup_counter = 0; + +void createPathAndSetWatch(zkutil::ZooKeeper & zk, const String & path_prefix, size_t total) +{ + for (size_t i = 0; i < total; ++i) + { + int element = element_counter++; + zk.createIfNotExists(path_prefix + "/" + std::to_string(element), ""); + + std::string result; + if (!zk.tryGetWatch(path_prefix + "/" + std::to_string(element), result, nullptr, triggerWatch)) + failed_setup_counter++; + + { + std::lock_guard lock(elements_mutex); + current_elements.push_back(element); + } + sleep(0.2); + + { + std::lock_guard lock(elements_mutex); + if (current_elements.empty()) + continue; + element = *select_randomly(current_elements.begin(), current_elements.end()); + current_elements.erase(std::remove(current_elements.begin(), current_elements.end(), element), current_elements.end()); + } + zk.tryRemove(path_prefix + "/" + std::to_string(element)); + } + +} + +void tryConcurrentWatches(zkutil::ZooKeeper & zk) +{ + std::string path_prefix = "/concurrent_watches"; + std::vector> asyncs; + zk.createIfNotExists(path_prefix, ""); + for (size_t i = 0; i < 100; ++i) + { + auto callback = [&zk, path_prefix] () + { + createPathAndSetWatch(zk, path_prefix, 100); + }; + asyncs.push_back(std::async(std::launch::async, callback)); + } + + for (auto & async : asyncs) + { + async.wait(); + } + + size_t counter = 0; + while (watches_triggered != 100 * 100) + { + sleep(0.1); + if (counter++ > 20) + break; + } + + std::cerr << "Failed setup counter:" << failed_setup_counter << std::endl; + std::cerr << "Current elements size:" << current_elements.size() << std::endl; + std::cerr << "WatchesTriggered:" << watches_triggered << std::endl; +} + int main(int argc, char *argv[]) { - if (argc != 2) { std::cerr << "usage: " << argv[0] << " hosts" << std::endl; @@ -168,6 +260,7 @@ int main(int argc, char *argv[]) testMultiRequest(zk); testCreateSetWatchEvent(zk); testCreateListWatchEvent(zk); + tryConcurrentWatches(zk); } catch (...) { From a207e3db7555b2984275a55a2116af48ece3af48 Mon Sep 17 00:00:00 2001 From: alesapin Date: Thu, 17 Dec 2020 19:21:46 +0300 Subject: [PATCH 2/2] Fix bad sleep --- utils/zookeeper-test/main.cpp | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/utils/zookeeper-test/main.cpp b/utils/zookeeper-test/main.cpp index 1854471add..8f8aac0086 100644 --- a/utils/zookeeper-test/main.cpp +++ b/utils/zookeeper-test/main.cpp @@ -9,6 +9,7 @@ #include #include #include +#include #include #include @@ -191,7 +192,8 @@ void createPathAndSetWatch(zkutil::ZooKeeper & zk, const String & path_prefix, s std::lock_guard lock(elements_mutex); current_elements.push_back(element); } - sleep(0.2); + + std::this_thread::sleep_for(std::chrono::milliseconds(200)); { std::lock_guard lock(elements_mutex); @@ -227,7 +229,7 @@ void tryConcurrentWatches(zkutil::ZooKeeper & zk) size_t counter = 0; while (watches_triggered != 100 * 100) { - sleep(0.1); + std::this_thread::sleep_for(std::chrono::milliseconds(100)); if (counter++ > 20) break; }