mirror of https://github.com/ByConity/ByConity
Merge pull request #18199 from ClickHouse/add_one_more_test_in_util
Add one more case in zk test util
This commit is contained in:
commit
dbfe50c0c1
|
@ -6,10 +6,15 @@
|
|||
#include <common/LineReader.h>
|
||||
#include <common/logger_useful.h>
|
||||
#include <fmt/format.h>
|
||||
#include <random>
|
||||
#include <iterator>
|
||||
#include <algorithm>
|
||||
#include <chrono>
|
||||
|
||||
#include <iostream>
|
||||
#include <sstream>
|
||||
#include <exception>
|
||||
#include <future>
|
||||
|
||||
using namespace std;
|
||||
|
||||
|
@ -144,9 +149,98 @@ void testMultiRequest(zkutil::ZooKeeper & zk)
|
|||
checkEq(zk, "/data/multirequest", "bbb");
|
||||
}
|
||||
|
||||
std::mutex elements_mutex;
|
||||
std::vector<int> current_elements;
|
||||
std::atomic<int> watches_triggered = 0;
|
||||
|
||||
void triggerWatch(const Coordination::WatchResponse &)
|
||||
{
|
||||
watches_triggered++;
|
||||
}
|
||||
|
||||
template<typename Iter, typename RandomGenerator>
|
||||
Iter select_randomly(Iter start, Iter end, RandomGenerator& g)
|
||||
{
|
||||
std::uniform_int_distribution<> dis(0, std::distance(start, end) - 1);
|
||||
std::advance(start, dis(g));
|
||||
return start;
|
||||
}
|
||||
|
||||
template<typename Iter>
|
||||
Iter select_randomly(Iter start, Iter end)
|
||||
{
|
||||
static std::random_device rd;
|
||||
static std::mt19937 gen(rd());
|
||||
return select_randomly(start, end, gen);
|
||||
}
|
||||
|
||||
std::atomic<int> element_counter = 0;
|
||||
std::atomic<int> failed_setup_counter = 0;
|
||||
|
||||
void createPathAndSetWatch(zkutil::ZooKeeper & zk, const String & path_prefix, size_t total)
|
||||
{
|
||||
for (size_t i = 0; i < total; ++i)
|
||||
{
|
||||
int element = element_counter++;
|
||||
zk.createIfNotExists(path_prefix + "/" + std::to_string(element), "");
|
||||
|
||||
std::string result;
|
||||
if (!zk.tryGetWatch(path_prefix + "/" + std::to_string(element), result, nullptr, triggerWatch))
|
||||
failed_setup_counter++;
|
||||
|
||||
{
|
||||
std::lock_guard lock(elements_mutex);
|
||||
current_elements.push_back(element);
|
||||
}
|
||||
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(200));
|
||||
|
||||
{
|
||||
std::lock_guard lock(elements_mutex);
|
||||
if (current_elements.empty())
|
||||
continue;
|
||||
element = *select_randomly(current_elements.begin(), current_elements.end());
|
||||
current_elements.erase(std::remove(current_elements.begin(), current_elements.end(), element), current_elements.end());
|
||||
}
|
||||
zk.tryRemove(path_prefix + "/" + std::to_string(element));
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
void tryConcurrentWatches(zkutil::ZooKeeper & zk)
|
||||
{
|
||||
std::string path_prefix = "/concurrent_watches";
|
||||
std::vector<std::future<void>> asyncs;
|
||||
zk.createIfNotExists(path_prefix, "");
|
||||
for (size_t i = 0; i < 100; ++i)
|
||||
{
|
||||
auto callback = [&zk, path_prefix] ()
|
||||
{
|
||||
createPathAndSetWatch(zk, path_prefix, 100);
|
||||
};
|
||||
asyncs.push_back(std::async(std::launch::async, callback));
|
||||
}
|
||||
|
||||
for (auto & async : asyncs)
|
||||
{
|
||||
async.wait();
|
||||
}
|
||||
|
||||
size_t counter = 0;
|
||||
while (watches_triggered != 100 * 100)
|
||||
{
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(100));
|
||||
if (counter++ > 20)
|
||||
break;
|
||||
}
|
||||
|
||||
std::cerr << "Failed setup counter:" << failed_setup_counter << std::endl;
|
||||
std::cerr << "Current elements size:" << current_elements.size() << std::endl;
|
||||
std::cerr << "WatchesTriggered:" << watches_triggered << std::endl;
|
||||
}
|
||||
|
||||
int main(int argc, char *argv[])
|
||||
{
|
||||
|
||||
if (argc != 2)
|
||||
{
|
||||
std::cerr << "usage: " << argv[0] << " hosts" << std::endl;
|
||||
|
@ -168,6 +262,7 @@ int main(int argc, char *argv[])
|
|||
testMultiRequest(zk);
|
||||
testCreateSetWatchEvent(zk);
|
||||
testCreateListWatchEvent(zk);
|
||||
tryConcurrentWatches(zk);
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
|
|
Loading…
Reference in New Issue