mirror of https://github.com/ByConity/ByConity
Merge 'fix-node-selector-extreme-unbalance' into 'cnch-dev'
fix(clickhousech@m-5285095990): fix extreme uneven part distribution

See merge request: !24925
This commit is contained in:
parent 8300dfb78e
commit 9dd300f6f9

@@ -1,4 +1,5 @@
+#include <algorithm>
 #include <functional>
 #include <iterator>
 #include <limits>
 #include <map>
@@ -292,8 +293,43 @@ NodeSelectorResult LocalNodeSelector::select(PlanSegment *, ContextPtr query_con
     return result;
 }
 
+/// workers will be assigned tasks in order of their data size
+std::vector<const AddressInfo *> orderAddrByRows(std::unordered_map<AddressInfo, SourceTaskPayloadOnWorker, AddressInfo::Hash> & payloads)
+{
+    std::vector<const AddressInfo *> ordered_addrs;
+    ordered_addrs.reserve(payloads.size());
+    for (const auto & p : payloads)
+        ordered_addrs.emplace_back(&p.first);
+    /// order workers by weight
+    std::sort(ordered_addrs.begin(), ordered_addrs.end(), [&](auto * l, auto * r) { return payloads[*l].rows > payloads[*r].rows; });
+    return ordered_addrs;
+}
+
+/// assign one task for each non-empty worker
+size_t initNodeSelectorResult(
+    const std::vector<const AddressInfo *> & ordered_addrs,
+    std::unordered_map<AddressInfo, SourceTaskPayloadOnWorker, AddressInfo::Hash> & payloads,
+    size_t parallel_size,
+    NodeSelectorResult & result,
+    std::function<void(const AddressInfo &)> init_source_func)
+{
+    size_t assigned_instances = 0;
+    for (const auto * addr_p : ordered_addrs)
+    {
+        const auto & addr = *addr_p;
+        const auto & payload_on_worker = payloads[addr];
+        if (payload_on_worker.rows > 0 && parallel_size > 0)
+        {
+            init_source_func(addr);
+            result.worker_nodes.emplace_back(WorkerNode(addr, NodeType::Remote, payload_on_worker.worker_id));
+            assigned_instances++;
+        }
+    }
+    return assigned_instances;
+}
+
 void divideSourceTaskByBucket(
-    const std::unordered_map<AddressInfo, SourceTaskPayloadOnWorker, AddressInfo::Hash> & payloads,
+    std::unordered_map<AddressInfo, SourceTaskPayloadOnWorker, AddressInfo::Hash> & payloads,
     size_t weight_sum,
     size_t parallel_size,
     NodeSelectorResult & result)
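
The two helpers added above carry the core of the fix: workers are visited largest-first, and every worker that actually holds data is guaranteed one plan segment instance before any proportional division starts. A minimal standalone sketch of those two phases, with simplified types (Payload and the hostname keys here are stand-ins for the real AddressInfo/SourceTaskPayloadOnWorker map, not ByConity code):

    #include <algorithm>
    #include <string>
    #include <unordered_map>
    #include <vector>

    // Stand-in for SourceTaskPayloadOnWorker; only the field the sketch needs.
    struct Payload
    {
        size_t rows = 0;
    };

    int main()
    {
        std::unordered_map<std::string, Payload> payloads
            = {{"host1", {1}}, {"host2", {1}}, {"host3", {1}}, {"host4", {30}}};

        // Phase 1: order workers by data size, largest first (mirrors orderAddrByRows).
        std::vector<const std::string *> ordered;
        for (const auto & p : payloads)
            ordered.push_back(&p.first);
        std::sort(ordered.begin(), ordered.end(),
                  [&](auto * l, auto * r) { return payloads[*l].rows > payloads[*r].rows; });

        // Phase 2: seed one task per non-empty worker (mirrors initNodeSelectorResult).
        size_t assigned = 0;
        for (const auto * addr : ordered)
            if (payloads[*addr].rows > 0)
                assigned++; // the real code also records a WorkerNode here

        // With 4 non-empty workers and parallel_size == 4, seeding alone fills
        // every slot: no worker is starved and none is over-assigned.
        return assigned == 4 ? 0 : 1;
    }

Seeding first is what prevents the extreme imbalance named in the commit title: a worker holding a tiny share of rows can no longer be skipped entirely while one large worker absorbs every slot.
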
@@ -303,13 +339,19 @@ void divideSourceTaskByBucket(
             ErrorCodes::LOGICAL_ERROR,
             fmt::format("Invalid argument for divideSourceTaskByBucket payloads.size:{} parallel_size:{}", payloads.size(), parallel_size));
 
-    size_t assigned_instances = 0;
+    std::vector<const AddressInfo *> ordered_addrs = orderAddrByRows(payloads);
+    size_t assigned_instances = initNodeSelectorResult(ordered_addrs, payloads, parallel_size, result, [&](const AddressInfo & addr) {
+        result.buckets_on_workers[addr].emplace_back(std::set<Int64>{});
+    });
+
     while (assigned_instances < parallel_size)
     {
-        for (const auto & [addr, payload_on_worker] : payloads)
+        for (const auto & addr_p : ordered_addrs)
         {
-            size_t weight = payload_on_worker.rows;
-            size_t to_be_assigned = weight_sum == 0 ? 0 : weight * parallel_size / weight_sum;
+            const auto & addr = *addr_p;
+            const auto & payload_on_worker = payloads.find(addr)->second;
+
+            size_t to_be_assigned = weight_sum == 0 ? 0 : payload_on_worker.rows * parallel_size / weight_sum;
             /// to_be_assigned <= bucket_groups.size, so as to avoid an empty plan segment instance.
             to_be_assigned = std::min(to_be_assigned, payload_on_worker.bucket_groups.size());
             size_t already_assigned = std::min(to_be_assigned, result.buckets_on_workers[addr].size());
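
For intuition about the quota arithmetic, run the numbers from divideSourceTaskByBucketTestCase1 in the test file below: two workers with 3 rows each, weight_sum 6, parallel_size 4. A hedged sketch of just that arithmetic, with plain integers in place of the ByConity types (the real function also distributes the concrete bucket ids):

    #include <algorithm>
    #include <cassert>
    #include <cstddef>

    int main()
    {
        const size_t parallel_size = 4;
        const size_t weight_sum = 6;            // total rows across workers
        const size_t rows[2] = {3, 3};          // rows per worker
        const size_t bucket_groups[2] = {3, 3}; // bucket groups per worker
        size_t assigned[2] = {1, 1};            // one instance seeded per non-empty worker
        size_t total = 2;

        while (total < parallel_size)
            for (size_t w = 0; w < 2 && total < parallel_size; ++w)
            {
                size_t to_be = rows[w] * parallel_size / weight_sum; // 3 * 4 / 6 = 2
                to_be = std::min(to_be, bucket_groups[w]);           // never exceed bucket groups
                size_t already = std::min(to_be, assigned[w]);
                to_be = std::max<size_t>(1, to_be - already);        // progress guarantee
                for (size_t i = 0; i < to_be && total < parallel_size; ++i)
                {
                    ++assigned[w];
                    ++total;
                }
            }

        assert(assigned[0] == 2 && assigned[1] == 2); // matches the test's expectation
        return 0;
    }
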
@@ -357,7 +399,7 @@ void divideSourceTaskByBucket(
 }
 
 void divideSourceTaskByPart(
-    const std::unordered_map<AddressInfo, SourceTaskPayloadOnWorker, AddressInfo::Hash> & payloads,
+    std::unordered_map<AddressInfo, SourceTaskPayloadOnWorker, AddressInfo::Hash> & payloads,
     size_t weight_sum,
     size_t parallel_size,
     NodeSelectorResult & result)
@@ -367,25 +409,29 @@ void divideSourceTaskByPart(
             ErrorCodes::LOGICAL_ERROR,
             fmt::format("Invalid argument for divideSourceTaskByPart payloads.size:{} parallel_size:{}", payloads.size(), parallel_size));
 
-    size_t assigned_instances = 0;
+    std::vector<const AddressInfo *> ordered_addrs = orderAddrByRows(payloads);
+    size_t assigned_instances = initNodeSelectorResult(
+        ordered_addrs, payloads, parallel_size, result, [&](const AddressInfo & addr) { result.source_task_count_on_workers[addr]++; });
 
     while (assigned_instances < parallel_size)
     {
-        for (auto iter = payloads.begin(); iter != payloads.end() && assigned_instances < parallel_size; iter++)
+        for (const auto & addr_p : ordered_addrs)
         {
-            const auto & addr = iter->first;
-            const auto & payload_on_worker = iter->second;
-            size_t weight = payload_on_worker.rows;
-            size_t to_be_assigned = weight_sum == 0 ? 0 : weight * parallel_size / weight_sum;
+            const auto & addr = *addr_p;
+            const auto & payload_on_worker = payloads.find(addr)->second;
+
+            size_t to_be_assigned = weight_sum == 0 ? 0 : payload_on_worker.rows * parallel_size / weight_sum;
             /// to_be_assigned <= part num, so as to avoid an empty plan segment instance.
             to_be_assigned = std::min(to_be_assigned, payload_on_worker.part_num);
             size_t already_assigned = std::min(to_be_assigned, result.source_task_count_on_workers[addr]);
             to_be_assigned = to_be_assigned - already_assigned;
             /// make sure there is no infinite loop
             to_be_assigned = std::max(1UL, to_be_assigned);
 
             for (size_t p = 0; p < to_be_assigned && assigned_instances < parallel_size; p++)
             {
                 result.source_task_count_on_workers[addr]++;
-                result.worker_nodes.emplace_back(WorkerNode(addr, NodeType::Remote, iter->second.worker_id));
+                result.worker_nodes.emplace_back(WorkerNode(addr, NodeType::Remote, payload_on_worker.worker_id));
                 assigned_instances++;
             }
         }
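
This function covers the extreme case from the commit title. With the figures of divideTaskByPartTestCase6 further down (rows 1/1/1/30, parallel_size 4), the seeding pass alone fills all four slots and the while-loop never runs; under the removed iteration over the unordered map, the 30-row worker's quota of 30 * 4 / 33 = 3 could claim three of the four slots before some of the 1-row workers were ever visited. A small sketch of the new arithmetic under those assumptions:

    #include <cassert>
    #include <cstddef>

    int main()
    {
        const size_t parallel_size = 4;
        const size_t rows[4] = {30, 1, 1, 1}; // already ordered, largest first
        size_t assigned_instances = 0;

        // Seeding: one instance per worker with rows > 0.
        for (size_t w = 0; w < 4; ++w)
            if (rows[w] > 0)
                ++assigned_instances;

        // assigned_instances == parallel_size, so the proportional while-loop
        // never runs and every worker ends up with exactly one instance.
        assert(assigned_instances == parallel_size);
        return 0;
    }
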
@@ -452,65 +498,6 @@ NodeSelectorResult SourceNodeSelector::select(PlanSegment * plan_segment_ptr, Co
             worker_number);
     }
 
-    /// will be obsolete in the future
-    auto old_func = [&](std::unordered_map<AddressInfo, SourceTaskPayloadOnWorker, AddressInfo::Hash> & payload_on_workers,
-                        size_t rows_count,
-                        size_t parallel_size) {
-        size_t avg = rows_count / parallel_size + 1;
-        if (rows_count < parallel_size)
-            rows_count = 0;
-        if (rows_count > 0)
-        {
-            // Assign parallelism according to the regular average size.
-            for (auto & [addr, payload] : payload_on_workers)
-            {
-                size_t s = payload.rows;
-                size_t p = s / avg;
-                if (p > 0)
-                {
-                    s = s % avg;
-                }
-                if (p == 0 && s > 0)
-                {
-                    p = 1;
-                    s = 0;
-                }
-                for (size_t i = 0; i < p; i++)
-                {
-                    result.worker_nodes.emplace_back(addr, NodeType::Remote, payload.worker_id);
-                    result.source_task_count_on_workers[addr]++;
-                }
-            }
-        }
-        // Assign parallelism according to the major part (>0.5) of the average size, if needed.
-        if (result.worker_nodes.size() < parallel_size)
-        {
-            for (auto & [addr, payload] : payload_on_workers)
-            {
-                size_t s = payload.rows;
-                if (s * 2 > avg)
-                {
-                    result.worker_nodes.emplace_back(addr, NodeType::Remote, payload.worker_id);
-                    result.source_task_count_on_workers[addr]++;
-                    s = 0;
-                }
-                if (result.worker_nodes.size() == parallel_size)
-                    break;
-            }
-        }
-        // Assign parallelism to each worker until none is left.
-        while (result.worker_nodes.size() < parallel_size)
-        {
-            for (const auto & [addr, payload] : payload_on_workers)
-            {
-                result.worker_nodes.emplace_back(addr, NodeType::Remote, payload.worker_id);
-                result.source_task_count_on_workers[addr]++;
-                if (result.worker_nodes.size() == parallel_size)
-                    break;
-            }
-        }
-    };
-
     // If parallelism is greater than the worker number, we split the parts according to the input size.
     if (plan_segment_ptr->getParallelSize() > worker_number)
     {
@@ -560,7 +547,7 @@ NodeSelectorResult SourceNodeSelector::select(PlanSegment * plan_segment_ptr, Co
         if (is_bucket_valid)
             divideSourceTaskByBucket(payload_on_workers, rows_count, plan_segment_ptr->getParallelSize(), result);
         else
-            old_func(payload_on_workers, rows_count, plan_segment_ptr->getParallelSize());
+            divideSourceTaskByPart(payload_on_workers, rows_count, plan_segment_ptr->getParallelSize(), result);
     }
     else
    {

@@ -91,12 +91,12 @@ struct ClusterNodes
 
 struct NodeSelectorResult;
 void divideSourceTaskByBucket(
-    const std::unordered_map<AddressInfo, SourceTaskPayloadOnWorker, AddressInfo::Hash> & payloads,
+    std::unordered_map<AddressInfo, SourceTaskPayloadOnWorker, AddressInfo::Hash> & payloads,
     size_t weight_sum,
     size_t parallel_size,
     NodeSelectorResult & result);
 void divideSourceTaskByPart(
-    const std::unordered_map<AddressInfo, SourceTaskPayloadOnWorker, AddressInfo::Hash> & payloads,
+    std::unordered_map<AddressInfo, SourceTaskPayloadOnWorker, AddressInfo::Hash> & payloads,
     size_t weight_sum,
     size_t parallel_size,
     NodeSelectorResult & result);
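
Both declarations drop the const qualifier on the payloads map. That appears to be required by the new helpers rather than cosmetic: the sort comparator in orderAddrByRows reads payloads[*l].rows, and std::unordered_map::operator[] has no const overload because it may default-insert a missing key. A minimal standalone illustration of the constraint (not ByConity code):

    #include <string>
    #include <unordered_map>

    int lookup(std::unordered_map<std::string, int> & m)
    {
        return m["key"]; // ok: non-const operator[] may insert a default value
    }

    // int lookup_const(const std::unordered_map<std::string, int> & m)
    // {
    //     return m["key"]; // does not compile: operator[] has no const overload
    // }

    int main()
    {
        std::unordered_map<std::string, int> m;
        return lookup(m); // inserts {"key", 0} and returns 0
    }
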
@@ -53,6 +53,8 @@ void checkDistributeBucketResultMap(
 TEST(NodeSelectorTest, divideSourceTaskByBucketTestCase1)
 {
     std::vector<DB::AddressInfo> hosts{DB::AddressInfo("host1", 0, "", "", 0), DB::AddressInfo("host2", 0, "", "", 0)};
+    std::vector<DB::WorkerNode> workers{
+        DB::WorkerNode(hosts[0], Coordination::NodeType::Remote, "1"), DB::WorkerNode(hosts[1], Coordination::NodeType::Remote, "2")};
     std::unordered_map<DB::AddressInfo, DB::SourceTaskPayloadOnWorker, DB::AddressInfo::Hash> payload_on_worker;
     auto p1 = DB::SourceTaskPayloadOnWorker{.worker_id = "1", .rows = 3, .part_num = 3, .bucket_groups = {{0, {0}}, {2, {2}}, {4, {4}}}};
     payload_on_worker.insert({hosts[0], std::move(p1)});
@@ -61,10 +63,10 @@ TEST(NodeSelectorTest, divideSourceTaskByBucketTestCase1)
     DB::NodeSelectorResult expected_result;
     expected_result.buckets_on_workers.insert({hosts[0], {{0}, {2, 4}}});
     expected_result.buckets_on_workers.insert({hosts[1], {{1}, {3, 5}}});
-    expected_result.worker_nodes.emplace_back(DB::WorkerNode(hosts[1], Coordination::NodeType::Remote, "2"));
-    expected_result.worker_nodes.emplace_back(DB::WorkerNode(hosts[1], Coordination::NodeType::Remote, "2"));
-    expected_result.worker_nodes.emplace_back(DB::WorkerNode(hosts[0], Coordination::NodeType::Remote, "1"));
-    expected_result.worker_nodes.emplace_back(DB::WorkerNode(hosts[0], Coordination::NodeType::Remote, "1"));
+    expected_result.worker_nodes.emplace_back(workers[1]);
+    expected_result.worker_nodes.emplace_back(workers[0]);
+    expected_result.worker_nodes.emplace_back(workers[1]);
+    expected_result.worker_nodes.emplace_back(workers[0]);
     size_t rows_sum = 6;
     size_t parallel_size = 4;
 
@@ -78,6 +80,8 @@ TEST(NodeSelectorTest, divideSourceTaskByBucketTestCase1)
 TEST(NodeSelectorTest, divideSourceTaskByBucketCase2)
 {
     std::vector<DB::AddressInfo> hosts{DB::AddressInfo("host1", 0, "", "", 0), DB::AddressInfo("host2", 0, "", "", 0)};
+    std::vector<DB::WorkerNode> workers{
+        DB::WorkerNode(hosts[0], Coordination::NodeType::Remote, "1"), DB::WorkerNode(hosts[1], Coordination::NodeType::Remote, "2")};
     std::unordered_map<DB::AddressInfo, DB::SourceTaskPayloadOnWorker, DB::AddressInfo::Hash> payload_on_worker;
     auto p1 = DB::SourceTaskPayloadOnWorker{
         .worker_id = "1",
@@ -90,10 +94,10 @@ TEST(NodeSelectorTest, divideSourceTaskByBucketCase2)
     DB::NodeSelectorResult expected_result;
     expected_result.buckets_on_workers.insert({hosts[0], {{0, 2}, {4, 6}, {8, 10, 12}}});
     expected_result.buckets_on_workers.insert({hosts[1], {{1, 3}}});
-    expected_result.worker_nodes.emplace_back(DB::WorkerNode(hosts[1], Coordination::NodeType::Remote, "2"));
-    expected_result.worker_nodes.emplace_back(DB::WorkerNode(hosts[0], Coordination::NodeType::Remote, "1"));
-    expected_result.worker_nodes.emplace_back(DB::WorkerNode(hosts[0], Coordination::NodeType::Remote, "1"));
-    expected_result.worker_nodes.emplace_back(DB::WorkerNode(hosts[0], Coordination::NodeType::Remote, "1"));
+    expected_result.worker_nodes.emplace_back(workers[0]);
+    expected_result.worker_nodes.emplace_back(workers[1]);
+    expected_result.worker_nodes.emplace_back(workers[0]);
+    expected_result.worker_nodes.emplace_back(workers[0]);
     size_t rows_sum = 9;
     size_t parallel_size = 4;
 
@@ -107,6 +111,8 @@ TEST(NodeSelectorTest, divideSourceTaskByBucketCase2)
 TEST(NodeSelectorTest, divideSourceTaskByBucketCase3)
 {
     std::vector<DB::AddressInfo> hosts{DB::AddressInfo("host1", 0, "", "", 0), DB::AddressInfo("host2", 0, "", "", 0)};
+    std::vector<DB::WorkerNode> workers{
+        DB::WorkerNode(hosts[0], Coordination::NodeType::Remote, "1"), DB::WorkerNode(hosts[1], Coordination::NodeType::Remote, "2")};
     std::unordered_map<DB::AddressInfo, DB::SourceTaskPayloadOnWorker, DB::AddressInfo::Hash> payload_on_worker;
     auto p1 = DB::SourceTaskPayloadOnWorker{.worker_id = "1", .rows = 2, .part_num = 2, .bucket_groups = {{0, {0}}, {4, {4}}}};
     payload_on_worker.insert({hosts[0], std::move(p1)});
@@ -115,10 +121,10 @@ TEST(NodeSelectorTest, divideSourceTaskByBucketCase3)
     DB::NodeSelectorResult expected_result;
     expected_result.buckets_on_workers.insert({hosts[0], {{0}, {4}}});
     expected_result.buckets_on_workers.insert({hosts[1], {{3}, {-1}}});
-    expected_result.worker_nodes.emplace_back(DB::WorkerNode(hosts[1], Coordination::NodeType::Remote, "2"));
-    expected_result.worker_nodes.emplace_back(DB::WorkerNode(hosts[0], Coordination::NodeType::Remote, "1"));
-    expected_result.worker_nodes.emplace_back(DB::WorkerNode(hosts[0], Coordination::NodeType::Remote, "1"));
-    expected_result.worker_nodes.emplace_back(DB::WorkerNode(hosts[1], Coordination::NodeType::Remote, "2"));
+    expected_result.worker_nodes.emplace_back(workers[0]);
+    expected_result.worker_nodes.emplace_back(workers[1]);
+    expected_result.worker_nodes.emplace_back(workers[0]);
+    expected_result.worker_nodes.emplace_back(workers[1]);
     size_t rows_sum = 3;
     size_t parallel_size = 4;
 
@@ -132,6 +138,8 @@ TEST(NodeSelectorTest, divideSourceTaskByBucketCase3)
 TEST(NodeSelectorTest, divideSourceTaskByBucketCase4)
 {
     std::vector<DB::AddressInfo> hosts{DB::AddressInfo("host1", 0, "", "", 0), DB::AddressInfo("host2", 0, "", "", 0)};
+    std::vector<DB::WorkerNode> workers{
+        DB::WorkerNode(hosts[0], Coordination::NodeType::Remote, "1"), DB::WorkerNode(hosts[1], Coordination::NodeType::Remote, "2")};
     std::unordered_map<DB::AddressInfo, DB::SourceTaskPayloadOnWorker, DB::AddressInfo::Hash> payload_on_worker;
     auto p1 = DB::SourceTaskPayloadOnWorker{.worker_id = "1"};
     payload_on_worker.insert({hosts[0], std::move(p1)});
@@ -140,10 +148,10 @@ TEST(NodeSelectorTest, divideSourceTaskByBucketCase4)
     DB::NodeSelectorResult expected_result;
     expected_result.buckets_on_workers.insert({hosts[0], {{-1}, {-1}}});
     expected_result.buckets_on_workers.insert({hosts[1], {{-1}, {-1}}});
-    expected_result.worker_nodes.emplace_back(DB::WorkerNode(hosts[1], Coordination::NodeType::Remote, "2"));
-    expected_result.worker_nodes.emplace_back(DB::WorkerNode(hosts[0], Coordination::NodeType::Remote, "1"));
-    expected_result.worker_nodes.emplace_back(DB::WorkerNode(hosts[1], Coordination::NodeType::Remote, "2"));
-    expected_result.worker_nodes.emplace_back(DB::WorkerNode(hosts[0], Coordination::NodeType::Remote, "1"));
+    expected_result.worker_nodes.emplace_back(workers[1]);
+    expected_result.worker_nodes.emplace_back(workers[0]);
+    expected_result.worker_nodes.emplace_back(workers[1]);
+    expected_result.worker_nodes.emplace_back(workers[0]);
     size_t rows_sum = 0;
     size_t parallel_size = 4;
 
@@ -157,6 +165,8 @@ TEST(NodeSelectorTest, divideSourceTaskByBucketCase4)
 TEST(NodeSelectorTest, divideSourceTaskByBucketCase5)
 {
     std::vector<DB::AddressInfo> hosts{DB::AddressInfo("host1", 0, "", "", 0), DB::AddressInfo("host2", 0, "", "", 0)};
+    std::vector<DB::WorkerNode> workers{
+        DB::WorkerNode(hosts[0], Coordination::NodeType::Remote, "1"), DB::WorkerNode(hosts[1], Coordination::NodeType::Remote, "2")};
     std::unordered_map<DB::AddressInfo, DB::SourceTaskPayloadOnWorker, DB::AddressInfo::Hash> payload_on_worker;
     // the first table contains buckets [0, 1, 2, 3, 5]; its max bucket number is 8
     // the second table contains buckets [0, 1, 3]; its max bucket number is 4
@@ -165,12 +175,12 @@ TEST(NodeSelectorTest, divideSourceTaskByBucketCase5)
     auto p2 = DB::SourceTaskPayloadOnWorker{.worker_id = "2", .rows = 5, .part_num = 5, .bucket_groups = {{1, {1, 5}}, {3, {3}}}};
     payload_on_worker.insert({hosts[1], std::move(p2)});
     DB::NodeSelectorResult expected_result;
-    expected_result.buckets_on_workers.insert({hosts[0], {{0, 2}}});
-    expected_result.buckets_on_workers.insert({hosts[1], {{1, 5}, {3}, {-1}}});
-    expected_result.worker_nodes.emplace_back(DB::WorkerNode(hosts[1], Coordination::NodeType::Remote, "2"));
-    expected_result.worker_nodes.emplace_back(DB::WorkerNode(hosts[1], Coordination::NodeType::Remote, "2"));
-    expected_result.worker_nodes.emplace_back(DB::WorkerNode(hosts[0], Coordination::NodeType::Remote, "1"));
-    expected_result.worker_nodes.emplace_back(DB::WorkerNode(hosts[1], Coordination::NodeType::Remote, "2"));
+    expected_result.buckets_on_workers.insert({hosts[0], {{0}, {2}}});
+    expected_result.buckets_on_workers.insert({hosts[1], {{1, 5}, {3}}});
+    expected_result.worker_nodes.emplace_back(workers[1]);
+    expected_result.worker_nodes.emplace_back(workers[0]);
+    expected_result.worker_nodes.emplace_back(workers[1]);
+    expected_result.worker_nodes.emplace_back(workers[0]);
     size_t rows_sum = 8;
     size_t parallel_size = 4;
 
@@ -184,6 +194,8 @@ TEST(NodeSelectorTest, divideSourceTaskByBucketCase5)
 TEST(NodeSelectorTest, divideSourceTaskByBucketCase6)
 {
     std::vector<DB::AddressInfo> hosts{DB::AddressInfo("host1", 0, "", "", 0), DB::AddressInfo("host2", 0, "", "", 0)};
+    std::vector<DB::WorkerNode> workers{
+        DB::WorkerNode(hosts[0], Coordination::NodeType::Remote, "1"), DB::WorkerNode(hosts[1], Coordination::NodeType::Remote, "2")};
     std::unordered_map<DB::AddressInfo, DB::SourceTaskPayloadOnWorker, DB::AddressInfo::Hash> payload_on_worker;
     auto p1 = DB::SourceTaskPayloadOnWorker{.worker_id = "1", .rows = 3, .part_num = 3, .bucket_groups = {{0, {0}}, {2, {2}}}};
     payload_on_worker.insert({hosts[0], std::move(p1)});
@@ -192,10 +204,10 @@ TEST(NodeSelectorTest, divideSourceTaskByBucketCase6)
     DB::NodeSelectorResult expected_result;
     expected_result.buckets_on_workers.insert({hosts[0], {{0}, {2}}});
     expected_result.buckets_on_workers.insert({hosts[1], {{1, 5}, {-1}}});
-    expected_result.worker_nodes.emplace_back(DB::WorkerNode(hosts[1], Coordination::NodeType::Remote, "2"));
-    expected_result.worker_nodes.emplace_back(DB::WorkerNode(hosts[0], Coordination::NodeType::Remote, "1"));
-    expected_result.worker_nodes.emplace_back(DB::WorkerNode(hosts[1], Coordination::NodeType::Remote, "2"));
-    expected_result.worker_nodes.emplace_back(DB::WorkerNode(hosts[0], Coordination::NodeType::Remote, "1"));
+    expected_result.worker_nodes.emplace_back(workers[1]);
+    expected_result.worker_nodes.emplace_back(workers[0]);
+    expected_result.worker_nodes.emplace_back(workers[1]);
+    expected_result.worker_nodes.emplace_back(workers[0]);
     size_t rows_sum = 12;
     size_t parallel_size = 4;
 
@@ -205,6 +217,46 @@ TEST(NodeSelectorTest, divideSourceTaskByBucketCase6)
     checkDistributeBucketResultMap(expected_result, result, hosts);
 }
 
+/// extreme uneven case
+TEST(NodeSelectorTest, divideSourceTaskByBucketCase7)
+{
+    std::vector<DB::AddressInfo> hosts{
+        DB::AddressInfo("host1", 0, "", "", 0),
+        DB::AddressInfo("host2", 0, "", "", 0),
+        DB::AddressInfo("host3", 0, "", "", 0),
+        DB::AddressInfo("host4", 0, "", "", 0)};
+    std::vector<DB::WorkerNode> workers{
+        DB::WorkerNode(hosts[0], Coordination::NodeType::Remote, "1"),
+        DB::WorkerNode(hosts[1], Coordination::NodeType::Remote, "2"),
+        DB::WorkerNode(hosts[2], Coordination::NodeType::Remote, "3"),
+        DB::WorkerNode(hosts[3], Coordination::NodeType::Remote, "4")};
+    std::unordered_map<DB::AddressInfo, DB::SourceTaskPayloadOnWorker, DB::AddressInfo::Hash> payload_on_worker;
+    auto p1 = DB::SourceTaskPayloadOnWorker{.worker_id = "1", .rows = 1, .part_num = 1, .bucket_groups = {{0, {0}}}};
+    payload_on_worker.insert({workers[0].address, std::move(p1)});
+    auto p2 = DB::SourceTaskPayloadOnWorker{.worker_id = "2", .rows = 1, .part_num = 1, .bucket_groups = {{1, {1}}}};
+    payload_on_worker.insert({workers[1].address, std::move(p2)});
+    auto p3 = DB::SourceTaskPayloadOnWorker{.worker_id = "3", .rows = 1, .part_num = 1, .bucket_groups = {{2, {2}}}};
+    payload_on_worker.insert({workers[2].address, std::move(p3)});
+    auto p4
+        = DB::SourceTaskPayloadOnWorker{.worker_id = "4", .rows = 30, .part_num = 30, .bucket_groups = {{3, {3}}, {7, {7}}, {11, {11}}}};
+    payload_on_worker.insert({workers[3].address, std::move(p4)});
+    DB::NodeSelectorResult expected_result;
+    expected_result.buckets_on_workers.insert({hosts[0], {{0}}});
+    expected_result.buckets_on_workers.insert({hosts[1], {{1}}});
+    expected_result.buckets_on_workers.insert({hosts[2], {{2}}});
+    expected_result.buckets_on_workers.insert({hosts[3], {{3, 7, 11}}});
+    expected_result.worker_nodes.emplace_back(workers[3]);
+    expected_result.worker_nodes.emplace_back(workers[2]);
+    expected_result.worker_nodes.emplace_back(workers[1]);
+    expected_result.worker_nodes.emplace_back(workers[0]);
+    size_t rows_sum = 33;
+    size_t parallel_size = 4;
+
+    DB::NodeSelectorResult result;
+    Coordination::divideSourceTaskByBucket(payload_on_worker, rows_sum, parallel_size, result);
+    checkDistributeBucketResultMap(expected_result, result, hosts);
+}
+
 std::string formatDistributedPartsResultMap(const std::multimap<size_t, size_t> & map)
 {
     std::stringstream ss;
@@ -232,18 +284,20 @@ void checkDistributePartsResultMap(
 TEST(NodeSelectorTest, divideTaskByPartTestCase1)
 {
     std::vector<DB::AddressInfo> hosts{DB::AddressInfo("host1", 0, "", "", 0), DB::AddressInfo("host2", 0, "", "", 0)};
+    std::vector<DB::WorkerNode> workers{
+        DB::WorkerNode(hosts[0], Coordination::NodeType::Remote, "1"), DB::WorkerNode(hosts[1], Coordination::NodeType::Remote, "2")};
     std::unordered_map<DB::AddressInfo, DB::SourceTaskPayloadOnWorker, DB::AddressInfo::Hash> payload_on_worker;
     auto p1 = DB::SourceTaskPayloadOnWorker{.worker_id = "1", .rows = 2, .part_num = 2};
     payload_on_worker.insert({hosts[0], std::move(p1)});
     auto p2 = DB::SourceTaskPayloadOnWorker{.worker_id = "2", .rows = 3, .part_num = 3};
     payload_on_worker.insert({hosts[1], std::move(p2)});
     DB::NodeSelectorResult expected_result;
-    expected_result.source_task_count_on_workers.insert({hosts[0], 1});
-    expected_result.source_task_count_on_workers.insert({hosts[1], 3});
-    expected_result.worker_nodes.emplace_back(DB::WorkerNode(hosts[1], Coordination::NodeType::Remote, "2"));
-    expected_result.worker_nodes.emplace_back(DB::WorkerNode(hosts[1], Coordination::NodeType::Remote, "2"));
-    expected_result.worker_nodes.emplace_back(DB::WorkerNode(hosts[0], Coordination::NodeType::Remote, "1"));
-    expected_result.worker_nodes.emplace_back(DB::WorkerNode(hosts[1], Coordination::NodeType::Remote, "2"));
+    expected_result.source_task_count_on_workers.insert({hosts[0], 2});
+    expected_result.source_task_count_on_workers.insert({hosts[1], 2});
+    expected_result.worker_nodes.emplace_back(workers[1]);
+    expected_result.worker_nodes.emplace_back(workers[0]);
+    expected_result.worker_nodes.emplace_back(workers[1]);
+    expected_result.worker_nodes.emplace_back(workers[0]);
     size_t rows_sum = 5;
     size_t parallel_size = 4;
 
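
The revised expectations follow directly from the new arithmetic. Here rows are 2 and 3 with weight_sum 5 and parallel_size 4: seeding gives each worker one instance; the 3-row worker's quota is 3 * 4 / 5 = 2, so it gains one more; the 2-row worker's quota of 2 * 4 / 5 = 1 is already met, but the progress floor of 1 still grants one more, ending at 2 and 2 instead of the previous 1 and 3. A tiny standalone check of that arithmetic:

    #include <cassert>
    #include <cstddef>

    int main()
    {
        // divideTaskByPartTestCase1 figures: rows 3 and 2, weight_sum 5, parallel 4.
        const size_t parallel = 4, weight_sum = 5;
        size_t count_big = 1, count_small = 1; // one instance seeded per worker
        size_t total = 2;

        // Largest worker first: quota 3*4/5 = 2; one already seeded, so add one.
        size_t quota_big = 3 * parallel / weight_sum; // == 2
        count_big += quota_big - 1;
        total += quota_big - 1;

        // Smaller worker: quota 2*4/5 = 1 is met; the progress floor adds one.
        count_small += 1;
        total += 1;

        assert(total == parallel && count_big == 2 && count_small == 2);
        return 0;
    }
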
@@ -257,6 +311,8 @@ TEST(NodeSelectorTest, divideTaskByPartTestCase1)
 TEST(NodeSelectorTest, divideTaskByPartTestCase2)
 {
     std::vector<DB::AddressInfo> hosts{DB::AddressInfo("host1", 0, "", "", 0), DB::AddressInfo("host2", 0, "", "", 0)};
+    std::vector<DB::WorkerNode> workers{
+        DB::WorkerNode(hosts[0], Coordination::NodeType::Remote, "1"), DB::WorkerNode(hosts[1], Coordination::NodeType::Remote, "2")};
     std::unordered_map<DB::AddressInfo, DB::SourceTaskPayloadOnWorker, DB::AddressInfo::Hash> payload_on_worker;
     auto p1 = DB::SourceTaskPayloadOnWorker{.worker_id = "1", .rows = 7, .part_num = 7};
     payload_on_worker.insert({hosts[0], std::move(p1)});
@@ -265,10 +321,10 @@ TEST(NodeSelectorTest, divideTaskByPartTestCase2)
     DB::NodeSelectorResult expected_result;
     expected_result.source_task_count_on_workers.insert({hosts[0], 3});
     expected_result.source_task_count_on_workers.insert({hosts[1], 1});
-    expected_result.worker_nodes.emplace_back(DB::WorkerNode(hosts[1], Coordination::NodeType::Remote, "2"));
-    expected_result.worker_nodes.emplace_back(DB::WorkerNode(hosts[0], Coordination::NodeType::Remote, "1"));
-    expected_result.worker_nodes.emplace_back(DB::WorkerNode(hosts[0], Coordination::NodeType::Remote, "1"));
-    expected_result.worker_nodes.emplace_back(DB::WorkerNode(hosts[0], Coordination::NodeType::Remote, "1"));
+    expected_result.worker_nodes.emplace_back(workers[0]);
+    expected_result.worker_nodes.emplace_back(workers[1]);
+    expected_result.worker_nodes.emplace_back(workers[0]);
+    expected_result.worker_nodes.emplace_back(workers[0]);
     size_t rows_sum = 9;
     size_t parallel_size = 4;
 
@@ -282,18 +338,20 @@ TEST(NodeSelectorTest, divideTaskByPartTestCase2)
 TEST(NodeSelectorTest, divideTaskByPartTestCase3)
 {
     std::vector<DB::AddressInfo> hosts{DB::AddressInfo("host1", 0, "", "", 0), DB::AddressInfo("host2", 0, "", "", 0)};
+    std::vector<DB::WorkerNode> workers{
+        DB::WorkerNode(hosts[0], Coordination::NodeType::Remote, "1"), DB::WorkerNode(hosts[1], Coordination::NodeType::Remote, "2")};
     std::unordered_map<DB::AddressInfo, DB::SourceTaskPayloadOnWorker, DB::AddressInfo::Hash> payload_on_worker;
     auto p1 = DB::SourceTaskPayloadOnWorker{.worker_id = "1", .rows = 1, .part_num = 1};
     payload_on_worker.insert({hosts[0], std::move(p1)});
     auto p2 = DB::SourceTaskPayloadOnWorker{.worker_id = "2", .rows = 2, .part_num = 2};
     payload_on_worker.insert({hosts[1], std::move(p2)});
     DB::NodeSelectorResult expected_result;
-    expected_result.source_task_count_on_workers.insert({hosts[0], 1});
-    expected_result.source_task_count_on_workers.insert({hosts[1], 3});
-    expected_result.worker_nodes.emplace_back(DB::WorkerNode(hosts[1], Coordination::NodeType::Remote, "2"));
-    expected_result.worker_nodes.emplace_back(DB::WorkerNode(hosts[1], Coordination::NodeType::Remote, "2"));
-    expected_result.worker_nodes.emplace_back(DB::WorkerNode(hosts[0], Coordination::NodeType::Remote, "1"));
-    expected_result.worker_nodes.emplace_back(DB::WorkerNode(hosts[1], Coordination::NodeType::Remote, "2"));
+    expected_result.source_task_count_on_workers.insert({hosts[0], 2});
+    expected_result.source_task_count_on_workers.insert({hosts[1], 2});
+    expected_result.worker_nodes.emplace_back(workers[1]);
+    expected_result.worker_nodes.emplace_back(workers[0]);
+    expected_result.worker_nodes.emplace_back(workers[1]);
+    expected_result.worker_nodes.emplace_back(workers[0]);
     size_t rows_sum = 3;
     size_t parallel_size = 4;
 
@@ -307,6 +365,8 @@ TEST(NodeSelectorTest, divideTaskByPartTestCase3)
 TEST(NodeSelectorTest, divideTaskByPartTestCase4)
 {
     std::vector<DB::AddressInfo> hosts{DB::AddressInfo("host1", 0, "", "", 0), DB::AddressInfo("host2", 0, "", "", 0)};
+    std::vector<DB::WorkerNode> workers{
+        DB::WorkerNode(hosts[0], Coordination::NodeType::Remote, "1"), DB::WorkerNode(hosts[1], Coordination::NodeType::Remote, "2")};
     std::unordered_map<DB::AddressInfo, DB::SourceTaskPayloadOnWorker, DB::AddressInfo::Hash> payload_on_worker;
     auto p1 = DB::SourceTaskPayloadOnWorker{.worker_id = "1"};
     payload_on_worker.insert({hosts[0], std::move(p1)});
@@ -315,10 +375,10 @@ TEST(NodeSelectorTest, divideTaskByPartTestCase4)
     DB::NodeSelectorResult expected_result;
     expected_result.source_task_count_on_workers.insert({hosts[0], 2});
     expected_result.source_task_count_on_workers.insert({hosts[1], 2});
-    expected_result.worker_nodes.emplace_back(DB::WorkerNode(hosts[1], Coordination::NodeType::Remote, "2"));
-    expected_result.worker_nodes.emplace_back(DB::WorkerNode(hosts[0], Coordination::NodeType::Remote, "1"));
-    expected_result.worker_nodes.emplace_back(DB::WorkerNode(hosts[1], Coordination::NodeType::Remote, "2"));
-    expected_result.worker_nodes.emplace_back(DB::WorkerNode(hosts[0], Coordination::NodeType::Remote, "1"));
+    expected_result.worker_nodes.emplace_back(workers[1]);
+    expected_result.worker_nodes.emplace_back(workers[0]);
+    expected_result.worker_nodes.emplace_back(workers[1]);
+    expected_result.worker_nodes.emplace_back(workers[0]);
     size_t rows_sum = 0;
     size_t parallel_size = 4;
 
@@ -332,6 +392,8 @@ TEST(NodeSelectorTest, divideTaskByPartTestCase4)
 TEST(NodeSelectorTest, divideTaskByPartTestCase5)
 {
     std::vector<DB::AddressInfo> hosts{DB::AddressInfo("host1", 0, "", "", 0), DB::AddressInfo("host2", 0, "", "", 0)};
+    std::vector<DB::WorkerNode> workers{
+        DB::WorkerNode(hosts[0], Coordination::NodeType::Remote, "1"), DB::WorkerNode(hosts[1], Coordination::NodeType::Remote, "2")};
     std::unordered_map<DB::AddressInfo, DB::SourceTaskPayloadOnWorker, DB::AddressInfo::Hash> payload_on_worker;
     auto p1 = DB::SourceTaskPayloadOnWorker{.worker_id = "1", .rows = 3, .part_num = 3};
     payload_on_worker.insert({hosts[0], std::move(p1)});
@@ -340,10 +402,10 @@ TEST(NodeSelectorTest, divideTaskByPartTestCase5)
     DB::NodeSelectorResult expected_result;
     expected_result.source_task_count_on_workers.insert({hosts[0], 2});
     expected_result.source_task_count_on_workers.insert({hosts[1], 2});
-    expected_result.worker_nodes.emplace_back(DB::WorkerNode(hosts[1], Coordination::NodeType::Remote, "2"));
-    expected_result.worker_nodes.emplace_back(DB::WorkerNode(hosts[0], Coordination::NodeType::Remote, "1"));
-    expected_result.worker_nodes.emplace_back(DB::WorkerNode(hosts[1], Coordination::NodeType::Remote, "2"));
-    expected_result.worker_nodes.emplace_back(DB::WorkerNode(hosts[0], Coordination::NodeType::Remote, "1"));
+    expected_result.worker_nodes.emplace_back(workers[1]);
+    expected_result.worker_nodes.emplace_back(workers[0]);
+    expected_result.worker_nodes.emplace_back(workers[1]);
+    expected_result.worker_nodes.emplace_back(workers[0]);
     size_t rows_sum = 12;
     size_t parallel_size = 4;
 
@@ -353,6 +415,45 @@ TEST(NodeSelectorTest, divideTaskByPartTestCase5)
     checkDistributePartsResultMap(expected_result, result, hosts);
 }
 
+/// extreme uneven case
+TEST(NodeSelectorTest, divideTaskByPartTestCase6)
+{
+    std::vector<DB::AddressInfo> hosts{
+        DB::AddressInfo("host1", 0, "", "", 0),
+        DB::AddressInfo("host2", 0, "", "", 0),
+        DB::AddressInfo("host3", 0, "", "", 0),
+        DB::AddressInfo("host4", 0, "", "", 0)};
+    std::vector<DB::WorkerNode> workers{
+        DB::WorkerNode(hosts[0], Coordination::NodeType::Remote, "1"),
+        DB::WorkerNode(hosts[1], Coordination::NodeType::Remote, "2"),
+        DB::WorkerNode(hosts[2], Coordination::NodeType::Remote, "3"),
+        DB::WorkerNode(hosts[3], Coordination::NodeType::Remote, "4")};
+    std::unordered_map<DB::AddressInfo, DB::SourceTaskPayloadOnWorker, DB::AddressInfo::Hash> payload_on_worker;
+    auto p1 = DB::SourceTaskPayloadOnWorker{.worker_id = "1", .rows = 1, .part_num = 1};
+    payload_on_worker.insert({workers[0].address, std::move(p1)});
+    auto p2 = DB::SourceTaskPayloadOnWorker{.worker_id = "2", .rows = 1, .part_num = 1};
+    payload_on_worker.insert({workers[1].address, std::move(p2)});
+    auto p3 = DB::SourceTaskPayloadOnWorker{.worker_id = "3", .rows = 1, .part_num = 1};
+    payload_on_worker.insert({workers[2].address, std::move(p3)});
+    auto p4 = DB::SourceTaskPayloadOnWorker{.worker_id = "4", .rows = 30, .part_num = 30};
+    payload_on_worker.insert({workers[3].address, std::move(p4)});
+    DB::NodeSelectorResult expected_result;
+    expected_result.source_task_count_on_workers.insert({hosts[0], 1});
+    expected_result.source_task_count_on_workers.insert({hosts[1], 1});
+    expected_result.source_task_count_on_workers.insert({hosts[2], 1});
+    expected_result.source_task_count_on_workers.insert({hosts[3], 1});
+    expected_result.worker_nodes.emplace_back(workers[3]);
+    expected_result.worker_nodes.emplace_back(workers[2]);
+    expected_result.worker_nodes.emplace_back(workers[1]);
+    expected_result.worker_nodes.emplace_back(workers[0]);
+    size_t rows_sum = 33;
+    size_t parallel_size = 4;
+
+    DB::NodeSelectorResult result;
+    Coordination::divideSourceTaskByPart(payload_on_worker, rows_sum, parallel_size, result);
+    checkDistributePartsResultMap(expected_result, result, hosts);
+}
+
 std::string printPartitionCoalescingExpectedResult(const std::map<DB::PlanSegmentInstanceId, std::vector<UInt32>> & result)
 {
     std::stringstream ss;