diff --git a/src/Interpreters/NodeSelector.cpp b/src/Interpreters/NodeSelector.cpp
index 787f0afe53..f40b2e6a62 100644
--- a/src/Interpreters/NodeSelector.cpp
+++ b/src/Interpreters/NodeSelector.cpp
@@ -1,4 +1,5 @@
 #include <...>
+#include <...>
 #include <...>
 #include <...>
 #include <...>
@@ -292,8 +293,43 @@ NodeSelectorResult LocalNodeSelector::select(PlanSegment *, ContextPtr query_context)
     return result;
 }
 
+/// Workers are assigned tasks in descending order of their data size.
+std::vector<const AddressInfo *> orderAddrByRows(std::unordered_map<AddressInfo, SourceTaskPayloadOnWorker, AddressInfo::Hash> & payloads)
+{
+    std::vector<const AddressInfo *> ordered_addrs;
+    ordered_addrs.reserve(payloads.size());
+    for (const auto & p : payloads)
+        ordered_addrs.emplace_back(&p.first);
+    /// order workers by weight
+    std::sort(ordered_addrs.begin(), ordered_addrs.end(), [&](auto * l, auto * r) { return payloads[*l].rows > payloads[*r].rows; });
+    return ordered_addrs;
+}
+
+/// assign one task for each non-empty worker
+size_t initNodeSelectorResult(
+    const std::vector<const AddressInfo *> & ordered_addrs,
+    std::unordered_map<AddressInfo, SourceTaskPayloadOnWorker, AddressInfo::Hash> & payloads,
+    size_t parallel_size,
+    NodeSelectorResult & result,
+    std::function<void(const AddressInfo &)> init_source_func)
+{
+    size_t assigned_instances = 0;
+    for (const auto * addr_p : ordered_addrs)
+    {
+        const auto & addr = *addr_p;
+        const auto & payload_on_worker = payloads[addr];
+        if (payload_on_worker.rows > 0 && parallel_size > 0)
+        {
+            init_source_func(addr);
+            result.worker_nodes.emplace_back(WorkerNode(addr, NodeType::Remote, payload_on_worker.worker_id));
+            assigned_instances++;
+        }
+    }
+    return assigned_instances;
+}
+
 void divideSourceTaskByBucket(
-    const std::unordered_map<AddressInfo, SourceTaskPayloadOnWorker, AddressInfo::Hash> & payloads,
+    std::unordered_map<AddressInfo, SourceTaskPayloadOnWorker, AddressInfo::Hash> & payloads,
     size_t weight_sum,
     size_t parallel_size,
     NodeSelectorResult & result)
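The two new helpers implement a sort-then-seed scheme: workers are visited heaviest-first, and every worker that holds any data is guaranteed one plan segment instance before the proportional loop below runs. A minimal standalone sketch of that scheme, using simplified stand-in types rather than the real AddressInfo/NodeSelectorResult:

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Simplified stand-in; the real code works on AddressInfo-keyed maps.
struct Payload
{
    std::string addr;
    size_t rows = 0;
};

int main()
{
    std::vector<Payload> payloads{{"host1", 3}, {"host2", 0}, {"host3", 5}};

    // Step 1 (orderAddrByRows): heaviest worker first.
    std::sort(payloads.begin(), payloads.end(),
              [](const Payload & l, const Payload & r) { return l.rows > r.rows; });
    assert(payloads.front().addr == "host3");

    // Step 2 (initNodeSelectorResult): seed one instance per non-empty worker,
    // in that order; empty workers get nothing. The real select() has already
    // checked that payloads.size() <= parallel_size.
    std::vector<std::string> instances;
    for (const auto & p : payloads)
        if (p.rows > 0)
            instances.push_back(p.addr);

    assert(instances.size() == 2); // host2 (0 rows) is skipped
    assert(instances[0] == "host3" && instances[1] == "host1");
    return 0;
}
```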
@@ -303,13 +339,19 @@ void divideSourceTaskByBucket(
             ErrorCodes::LOGICAL_ERROR,
             fmt::format("Invalid argument for divideSourceTaskByBucket payloads.size:{} parallel_size:{}", payloads.size(), parallel_size));
 
-    size_t assigned_instances = 0;
+    std::vector<const AddressInfo *> ordered_addrs = orderAddrByRows(payloads);
+    size_t assigned_instances = initNodeSelectorResult(ordered_addrs, payloads, parallel_size, result, [&](const AddressInfo & addr) {
+        result.buckets_on_workers[addr].emplace_back(std::set<Int64>{});
+    });
+
     while (assigned_instances < parallel_size)
     {
-        for (const auto & [addr, payload_on_worker] : payloads)
+        for (const auto & addr_p : ordered_addrs)
         {
-            size_t weight = payload_on_worker.rows;
-            size_t to_be_assigned = weight_sum == 0 ? 0 : weight * parallel_size / weight_sum;
+            const auto & addr = *addr_p;
+            const auto & payload_on_worker = payloads.find(addr)->second;
+
+            size_t to_be_assigned = weight_sum == 0 ? 0 : payload_on_worker.rows * parallel_size / weight_sum;
             /// to_be_assigned <= bucket_groups.size, as to avoid empty plan segment instance.
             to_be_assigned = std::min(to_be_assigned, payload_on_worker.bucket_groups.size());
             size_t already_assigned = std::min(to_be_assigned, result.buckets_on_workers[addr].size());
@@ -357,7 +399,7 @@ void divideSourceTaskByBucket(
 }
 
 void divideSourceTaskByPart(
-    const std::unordered_map<AddressInfo, SourceTaskPayloadOnWorker, AddressInfo::Hash> & payloads,
+    std::unordered_map<AddressInfo, SourceTaskPayloadOnWorker, AddressInfo::Hash> & payloads,
     size_t weight_sum,
     size_t parallel_size,
     NodeSelectorResult & result)
@@ -367,25 +409,29 @@ void divideSourceTaskByPart(
             ErrorCodes::LOGICAL_ERROR,
             fmt::format("Invalid argument for divideSourceTaskByPart payloads.size:{} parallel_size:{}", payloads.size(), parallel_size));
 
-    size_t assigned_instances = 0;
+    std::vector<const AddressInfo *> ordered_addrs = orderAddrByRows(payloads);
+    size_t assigned_instances = initNodeSelectorResult(
+        ordered_addrs, payloads, parallel_size, result, [&](const AddressInfo & addr) { result.source_task_count_on_workers[addr]++; });
+
     while (assigned_instances < parallel_size)
     {
-        for (auto iter = payloads.begin(); iter != payloads.end() && assigned_instances < parallel_size; iter++)
+        for (const auto & addr_p : ordered_addrs)
         {
-            const auto & addr = iter->first;
-            const auto & payload_on_worker = iter->second;
-            size_t weight = payload_on_worker.rows;
-            size_t to_be_assigned = weight_sum == 0 ? 0 : weight * parallel_size / weight_sum;
+            const auto & addr = *addr_p;
+            const auto & payload_on_worker = payloads.find(addr)->second;
+
+            size_t to_be_assigned = weight_sum == 0 ? 0 : payload_on_worker.rows * parallel_size / weight_sum;
             /// to_be_assigned <= part num, as to avoid empty plan segment instance.
             to_be_assigned = std::min(to_be_assigned, payload_on_worker.part_num);
             size_t already_assigned = std::min(to_be_assigned, result.source_task_count_on_workers[addr]);
             to_be_assigned = to_be_assigned - already_assigned;
             /// make sure there is no infinte loop
             to_be_assigned = std::max(1UL, to_be_assigned);
+
             for (size_t p = 0; p < to_be_assigned && assigned_instances < parallel_size; p++)
             {
                 result.source_task_count_on_workers[addr]++;
-                result.worker_nodes.emplace_back(WorkerNode(addr, NodeType::Remote, iter->second.worker_id));
+                result.worker_nodes.emplace_back(WorkerNode(addr, NodeType::Remote, payload_on_worker.worker_id));
                 assigned_instances++;
             }
         }
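Both divide functions now share the same per-round arithmetic: an integer proportional share of parallel_size, clamped to the worker's bucket-group/part capacity, minus what the worker already holds, floored at 1 so each pass makes progress. A hedged sketch of one round, isolated as a pure function (the name toBeAssigned is illustrative, not part of the codebase):

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>

// One round of the shared assignment arithmetic for a single worker:
// how many additional instances it should receive on this pass.
size_t toBeAssigned(size_t rows, size_t weight_sum, size_t parallel_size,
                    size_t capacity, size_t already_on_worker)
{
    // Integer proportional share of the total parallelism.
    size_t share = weight_sum == 0 ? 0 : rows * parallel_size / weight_sum;
    // Never more instances than bucket groups / parts: avoids empty instances.
    share = std::min(share, capacity);
    // Subtract what this worker already holds (including the seeded instance).
    share -= std::min(share, already_on_worker);
    // A floor of 1 guarantees progress, so the outer while loop terminates.
    return std::max<size_t>(1, share);
}

int main()
{
    // divideTaskByPartTestCase2 data: rows 7 and 2, weight_sum 9, parallel 4.
    // After the seeding pass each worker already holds one instance.
    assert(toBeAssigned(7, 9, 4, /*capacity*/ 7, /*already*/ 1) == 2); // 28/9 == 3, minus 1
    assert(toBeAssigned(2, 9, 4, /*capacity*/ 2, /*already*/ 1) == 1); // 8/9 == 0, floored
    return 0;
}
```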
@@ -452,65 +498,6 @@ NodeSelectorResult SourceNodeSelector::select(PlanSegment * plan_segment_ptr, ContextPtr query_context)
             worker_number);
     }
 
-    /// will be obsolete in the future
-    auto old_func = [&](std::unordered_map<AddressInfo, SourceTaskPayloadOnWorker, AddressInfo::Hash> & payload_on_workers,
-                        size_t rows_count,
-                        size_t parallel_size) {
-        size_t avg = rows_count / parallel_size + 1;
-        if (rows_count < parallel_size)
-            rows_count = 0;
-        if (rows_count > 0)
-        {
-            // Assign parallelism accroding to regular average size.
-            for (auto & [addr, payload] : payload_on_workers)
-            {
-                size_t s = payload.rows;
-                size_t p = s / avg;
-                if (p > 0)
-                {
-                    s = s % avg;
-                }
-                if (p == 0 && s > 0)
-                {
-                    p = 1;
-                    s = 0;
-                }
-                for (size_t i = 0; i < p; i++)
-                {
-                    result.worker_nodes.emplace_back(addr, NodeType::Remote, payload.worker_id);
-                    result.source_task_count_on_workers[addr]++;
-                }
-            }
-        }
-        // Assign parallelism according to major part(>0.5) of average size, if needed.
-        if (result.worker_nodes.size() < parallel_size)
-        {
-            for (auto & [addr, payload] : payload_on_workers)
-            {
-                size_t s = payload.rows;
-                if (s * 2 > avg)
-                {
-                    result.worker_nodes.emplace_back(addr, NodeType::Remote, payload.worker_id);
-                    result.source_task_count_on_workers[addr]++;
-                    s = 0;
-                }
-                if (result.worker_nodes.size() == parallel_size)
-                    break;
-            }
-        }
-        // Assign parallelism to each worker until no one left.
-        while (result.worker_nodes.size() < parallel_size)
-        {
-            for (const auto & [addr, payload] : payload_on_workers)
-            {
-                result.worker_nodes.emplace_back(addr, NodeType::Remote, payload.worker_id);
-                result.source_task_count_on_workers[addr]++;
-                if (result.worker_nodes.size() == parallel_size)
-                    break;
-            }
-        }
-    };
-
     // If parallelism is greater than the worker number, we split the parts according to the input size.
     if (plan_segment_ptr->getParallelSize() > worker_number)
     {
@@ -560,7 +547,7 @@ NodeSelectorResult SourceNodeSelector::select(PlanSegment * plan_segment_ptr, ContextPtr query_context)
         if (is_bucket_valid)
             divideSourceTaskByBucket(payload_on_workers, rows_count, plan_segment_ptr->getParallelSize(), result);
         else
-            old_func(payload_on_workers, rows_count, plan_segment_ptr->getParallelSize());
+            divideSourceTaskByPart(payload_on_workers, rows_count, plan_segment_ptr->getParallelSize(), result);
     }
     else
     {
diff --git a/src/Interpreters/NodeSelector.h b/src/Interpreters/NodeSelector.h
index d75c7c17a8..f1d5b03e8f 100644
--- a/src/Interpreters/NodeSelector.h
+++ b/src/Interpreters/NodeSelector.h
@@ -91,12 +91,12 @@ struct ClusterNodes
 struct NodeSelectorResult;
 
 void divideSourceTaskByBucket(
-    const std::unordered_map<AddressInfo, SourceTaskPayloadOnWorker, AddressInfo::Hash> & payloads,
+    std::unordered_map<AddressInfo, SourceTaskPayloadOnWorker, AddressInfo::Hash> & payloads,
     size_t weight_sum,
     size_t parallel_size,
    NodeSelectorResult & result);
 
 void divideSourceTaskByPart(
-    const std::unordered_map<AddressInfo, SourceTaskPayloadOnWorker, AddressInfo::Hash> & payloads,
+    std::unordered_map<AddressInfo, SourceTaskPayloadOnWorker, AddressInfo::Hash> & payloads,
     size_t weight_sum,
     size_t parallel_size,
     NodeSelectorResult & result);
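The const qualifier is dropped from the payload maps because orderAddrByRows indexes them with operator[] inside its sort comparator, and std::unordered_map::operator[] is non-const (it may insert a default-constructed value). A small illustration of that constraint; keeping the maps const would mean switching those lookups to at() or find():

```cpp
#include <cassert>
#include <string>
#include <unordered_map>

int main()
{
    std::unordered_map<std::string, size_t> rows{{"host1", 3}};
    const auto & cref = rows;

    // cref["host1"];              // does not compile: operator[] may insert,
    //                             // so it has no const overload
    assert(cref.at("host1") == 3); // at()/find() remain usable through const

    rows["host2"];                 // non-const access inserts {"host2", 0}
    assert(rows.size() == 2);
    return 0;
}
```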
diff --git a/src/Interpreters/tests/gtest_node_selector.cpp b/src/Interpreters/tests/gtest_node_selector.cpp
index d5e2f5ee2d..d75b26e805 100644
--- a/src/Interpreters/tests/gtest_node_selector.cpp
+++ b/src/Interpreters/tests/gtest_node_selector.cpp
@@ -53,6 +53,8 @@ void checkDistributeBucketResultMap(
 TEST(NodeSelectorTest, divideSourceTaskByBucketTestCase1)
 {
     std::vector hosts{DB::AddressInfo("host1", 0, "", "", 0), DB::AddressInfo("host2", 0, "", "", 0)};
+    std::vector workers{
+        DB::WorkerNode(hosts[0], Coordination::NodeType::Remote, "1"), DB::WorkerNode(hosts[1], Coordination::NodeType::Remote, "2")};
     std::unordered_map<DB::AddressInfo, DB::SourceTaskPayloadOnWorker, DB::AddressInfo::Hash> payload_on_worker;
     auto p1 = DB::SourceTaskPayloadOnWorker{.worker_id = "1", .rows = 3, .part_num = 3, .bucket_groups = {{0, {0}}, {2, {2}}, {4, {4}}}};
     payload_on_worker.insert({hosts[0], std::move(p1)});
@@ -61,10 +63,10 @@ TEST(NodeSelectorTest, divideSourceTaskByBucketTestCase1)
     DB::NodeSelectorResult expected_result;
     expected_result.buckets_on_workers.insert({hosts[0], {{0}, {2, 4}}});
     expected_result.buckets_on_workers.insert({hosts[1], {{1}, {3, 5}}});
-    expected_result.worker_nodes.emplace_back(DB::WorkerNode(hosts[1], Coordination::NodeType::Remote, "2"));
-    expected_result.worker_nodes.emplace_back(DB::WorkerNode(hosts[1], Coordination::NodeType::Remote, "2"));
-    expected_result.worker_nodes.emplace_back(DB::WorkerNode(hosts[0], Coordination::NodeType::Remote, "1"));
-    expected_result.worker_nodes.emplace_back(DB::WorkerNode(hosts[0], Coordination::NodeType::Remote, "1"));
+    expected_result.worker_nodes.emplace_back(workers[1]);
+    expected_result.worker_nodes.emplace_back(workers[0]);
+    expected_result.worker_nodes.emplace_back(workers[1]);
+    expected_result.worker_nodes.emplace_back(workers[0]);
     size_t rows_sum = 6;
     size_t parallel_size = 4;
 
@@ -78,6 +80,8 @@
 TEST(NodeSelectorTest, divideSourceTaskByBucketCase2)
 {
     std::vector hosts{DB::AddressInfo("host1", 0, "", "", 0), DB::AddressInfo("host2", 0, "", "", 0)};
+    std::vector workers{
+        DB::WorkerNode(hosts[0], Coordination::NodeType::Remote, "1"), DB::WorkerNode(hosts[1], Coordination::NodeType::Remote, "2")};
     std::unordered_map<DB::AddressInfo, DB::SourceTaskPayloadOnWorker, DB::AddressInfo::Hash> payload_on_worker;
     auto p1 = DB::SourceTaskPayloadOnWorker{
         .worker_id = "1",
@@ -90,10 +94,10 @@ TEST(NodeSelectorTest, divideSourceTaskByBucketCase2)
     DB::NodeSelectorResult expected_result;
     expected_result.buckets_on_workers.insert({hosts[0], {{0, 2}, {4, 6}, {8, 10, 12}}});
     expected_result.buckets_on_workers.insert({hosts[1], {{1, 3}}});
-    expected_result.worker_nodes.emplace_back(DB::WorkerNode(hosts[1], Coordination::NodeType::Remote, "2"));
-    expected_result.worker_nodes.emplace_back(DB::WorkerNode(hosts[0], Coordination::NodeType::Remote, "1"));
-    expected_result.worker_nodes.emplace_back(DB::WorkerNode(hosts[0], Coordination::NodeType::Remote, "1"));
-    expected_result.worker_nodes.emplace_back(DB::WorkerNode(hosts[0], Coordination::NodeType::Remote, "1"));
+    expected_result.worker_nodes.emplace_back(workers[0]);
+    expected_result.worker_nodes.emplace_back(workers[1]);
+    expected_result.worker_nodes.emplace_back(workers[0]);
+    expected_result.worker_nodes.emplace_back(workers[0]);
     size_t rows_sum = 9;
     size_t parallel_size = 4;
 
@@ -107,6 +111,8 @@
 TEST(NodeSelectorTest, divideSourceTaskByBucketCase3)
 {
     std::vector hosts{DB::AddressInfo("host1", 0, "", "", 0), DB::AddressInfo("host2", 0, "", "", 0)};
+    std::vector workers{
+        DB::WorkerNode(hosts[0], Coordination::NodeType::Remote, "1"), DB::WorkerNode(hosts[1], Coordination::NodeType::Remote, "2")};
     std::unordered_map<DB::AddressInfo, DB::SourceTaskPayloadOnWorker, DB::AddressInfo::Hash> payload_on_worker;
     auto p1 = DB::SourceTaskPayloadOnWorker{.worker_id = "1", .rows = 2, .part_num = 2, .bucket_groups = {{0, {0}}, {4, {4}}}};
     payload_on_worker.insert({hosts[0], std::move(p1)});
@@ -115,10 +121,10 @@ TEST(NodeSelectorTest, divideSourceTaskByBucketCase3)
     DB::NodeSelectorResult expected_result;
     expected_result.buckets_on_workers.insert({hosts[0], {{0}, {4}}});
     expected_result.buckets_on_workers.insert({hosts[1], {{3}, {-1}}});
-    expected_result.worker_nodes.emplace_back(DB::WorkerNode(hosts[1], Coordination::NodeType::Remote, "2"));
-    expected_result.worker_nodes.emplace_back(DB::WorkerNode(hosts[0], Coordination::NodeType::Remote, "1"));
-    expected_result.worker_nodes.emplace_back(DB::WorkerNode(hosts[0], Coordination::NodeType::Remote, "1"));
-    expected_result.worker_nodes.emplace_back(DB::WorkerNode(hosts[1], Coordination::NodeType::Remote, "2"));
+    expected_result.worker_nodes.emplace_back(workers[0]);
+    expected_result.worker_nodes.emplace_back(workers[1]);
+    expected_result.worker_nodes.emplace_back(workers[0]);
+    expected_result.worker_nodes.emplace_back(workers[1]);
     size_t rows_sum = 3;
     size_t parallel_size = 4;
 
@@ -132,6 +138,8 @@
 TEST(NodeSelectorTest, divideSourceTaskByBucketCase4)
 {
     std::vector hosts{DB::AddressInfo("host1", 0, "", "", 0), DB::AddressInfo("host2", 0, "", "", 0)};
+    std::vector workers{
+        DB::WorkerNode(hosts[0], Coordination::NodeType::Remote, "1"), DB::WorkerNode(hosts[1], Coordination::NodeType::Remote, "2")};
     std::unordered_map<DB::AddressInfo, DB::SourceTaskPayloadOnWorker, DB::AddressInfo::Hash> payload_on_worker;
     auto p1 = DB::SourceTaskPayloadOnWorker{.worker_id = "1"};
     payload_on_worker.insert({hosts[0], std::move(p1)});
@@ -140,10 +148,10 @@ TEST(NodeSelectorTest, divideSourceTaskByBucketCase4)
     DB::NodeSelectorResult expected_result;
     expected_result.buckets_on_workers.insert({hosts[0], {{-1}, {-1}}});
     expected_result.buckets_on_workers.insert({hosts[1], {{-1}, {-1}}});
-    expected_result.worker_nodes.emplace_back(DB::WorkerNode(hosts[1], Coordination::NodeType::Remote, "2"));
-    expected_result.worker_nodes.emplace_back(DB::WorkerNode(hosts[0], Coordination::NodeType::Remote, "1"));
-    expected_result.worker_nodes.emplace_back(DB::WorkerNode(hosts[1], Coordination::NodeType::Remote, "2"));
-    expected_result.worker_nodes.emplace_back(DB::WorkerNode(hosts[0], Coordination::NodeType::Remote, "1"));
+    expected_result.worker_nodes.emplace_back(workers[1]);
+    expected_result.worker_nodes.emplace_back(workers[0]);
+    expected_result.worker_nodes.emplace_back(workers[1]);
+    expected_result.worker_nodes.emplace_back(workers[0]);
     size_t rows_sum = 0;
     size_t parallel_size = 4;
 
@@ -157,6 +165,8 @@
 TEST(NodeSelectorTest, divideSourceTaskByBucketCase5)
 {
     std::vector hosts{DB::AddressInfo("host1", 0, "", "", 0), DB::AddressInfo("host2", 0, "", "", 0)};
+    std::vector workers{
+        DB::WorkerNode(hosts[0], Coordination::NodeType::Remote, "1"), DB::WorkerNode(hosts[1], Coordination::NodeType::Remote, "2")};
     std::unordered_map<DB::AddressInfo, DB::SourceTaskPayloadOnWorker, DB::AddressInfo::Hash> payload_on_worker;
     // first table containing buckets [0, 1, 2, 3, 5], max bucket number is 8
     // second table containing buckets [0, 1, 3], max bucket number is 4
@@ -165,12 +175,12 @@ TEST(NodeSelectorTest, divideSourceTaskByBucketCase5)
     auto p2 = DB::SourceTaskPayloadOnWorker{.worker_id = "2", .rows = 5, .part_num = 5, .bucket_groups = {{1, {1, 5}}, {3, {3}}}};
     payload_on_worker.insert({hosts[1], std::move(p2)});
     DB::NodeSelectorResult expected_result;
-    expected_result.buckets_on_workers.insert({hosts[0], {{0, 2}}});
-    expected_result.buckets_on_workers.insert({hosts[1], {{1, 5}, {3}, {-1}}});
-    expected_result.worker_nodes.emplace_back(DB::WorkerNode(hosts[1], Coordination::NodeType::Remote, "2"));
-    expected_result.worker_nodes.emplace_back(DB::WorkerNode(hosts[1], Coordination::NodeType::Remote, "2"));
-    expected_result.worker_nodes.emplace_back(DB::WorkerNode(hosts[0], Coordination::NodeType::Remote, "1"));
-    expected_result.worker_nodes.emplace_back(DB::WorkerNode(hosts[1], Coordination::NodeType::Remote, "2"));
+    expected_result.buckets_on_workers.insert({hosts[0], {{0}, {2}}});
+    expected_result.buckets_on_workers.insert({hosts[1], {{1, 5}, {3}}});
+    expected_result.worker_nodes.emplace_back(workers[1]);
+    expected_result.worker_nodes.emplace_back(workers[0]);
+    expected_result.worker_nodes.emplace_back(workers[1]);
+    expected_result.worker_nodes.emplace_back(workers[0]);
     size_t rows_sum = 8;
     size_t parallel_size = 4;
 
@@ -184,6 +194,8 @@
 TEST(NodeSelectorTest, divideSourceTaskByBucketCase6)
 {
     std::vector hosts{DB::AddressInfo("host1", 0, "", "", 0), DB::AddressInfo("host2", 0, "", "", 0)};
+    std::vector workers{
+        DB::WorkerNode(hosts[0], Coordination::NodeType::Remote, "1"), DB::WorkerNode(hosts[1], Coordination::NodeType::Remote, "2")};
     std::unordered_map<DB::AddressInfo, DB::SourceTaskPayloadOnWorker, DB::AddressInfo::Hash> payload_on_worker;
     auto p1 = DB::SourceTaskPayloadOnWorker{.worker_id = "1", .rows = 3, .part_num = 3, .bucket_groups = {{0, {0}}, {2, {2}}}};
     payload_on_worker.insert({hosts[0], std::move(p1)});
@@ -192,10 +204,10 @@ TEST(NodeSelectorTest, divideSourceTaskByBucketCase6)
     DB::NodeSelectorResult expected_result;
     expected_result.buckets_on_workers.insert({hosts[0], {{0}, {2}}});
     expected_result.buckets_on_workers.insert({hosts[1], {{1, 5}, {-1}}});
-    expected_result.worker_nodes.emplace_back(DB::WorkerNode(hosts[1], Coordination::NodeType::Remote, "2"));
-    expected_result.worker_nodes.emplace_back(DB::WorkerNode(hosts[0], Coordination::NodeType::Remote, "1"));
-    expected_result.worker_nodes.emplace_back(DB::WorkerNode(hosts[1], Coordination::NodeType::Remote, "2"));
-    expected_result.worker_nodes.emplace_back(DB::WorkerNode(hosts[0], Coordination::NodeType::Remote, "1"));
+    expected_result.worker_nodes.emplace_back(workers[1]);
+    expected_result.worker_nodes.emplace_back(workers[0]);
+    expected_result.worker_nodes.emplace_back(workers[1]);
+    expected_result.worker_nodes.emplace_back(workers[0]);
     size_t rows_sum = 12;
     size_t parallel_size = 4;
 
@@ -205,6 +217,46 @@
     checkDistributeBucketResultMap(expected_result, result, hosts);
 }
 
+/// extreme uneven case
+TEST(NodeSelectorTest, divideSourceTaskByBucketCase7)
+{
+    std::vector hosts{
+        DB::AddressInfo("host1", 0, "", "", 0),
+        DB::AddressInfo("host2", 0, "", "", 0),
+        DB::AddressInfo("host3", 0, "", "", 0),
+        DB::AddressInfo("host4", 0, "", "", 0)};
+    std::vector workers{
+        DB::WorkerNode(hosts[0], Coordination::NodeType::Remote, "1"),
+        DB::WorkerNode(hosts[1], Coordination::NodeType::Remote, "2"),
+        DB::WorkerNode(hosts[2], Coordination::NodeType::Remote, "3"),
+        DB::WorkerNode(hosts[3], Coordination::NodeType::Remote, "4")};
+    std::unordered_map<DB::AddressInfo, DB::SourceTaskPayloadOnWorker, DB::AddressInfo::Hash> payload_on_worker;
+    auto p1 = DB::SourceTaskPayloadOnWorker{.worker_id = "1", .rows = 1, .part_num = 1, .bucket_groups = {{0, {0}}}};
+    payload_on_worker.insert({workers[0].address, std::move(p1)});
+    auto p2 = DB::SourceTaskPayloadOnWorker{.worker_id = "2", .rows = 1, .part_num = 1, .bucket_groups = {{1, {1}}}};
+    payload_on_worker.insert({workers[1].address, std::move(p2)});
+    auto p3 = DB::SourceTaskPayloadOnWorker{.worker_id = "3", .rows = 1, .part_num = 1, .bucket_groups = {{2, {2}}}};
+    payload_on_worker.insert({workers[2].address, std::move(p3)});
+    auto p4
+        = DB::SourceTaskPayloadOnWorker{.worker_id = "4", .rows = 30, .part_num = 30, .bucket_groups = {{3, {3}}, {7, {7}}, {11, {11}}}};
+    payload_on_worker.insert({workers[3].address, std::move(p4)});
+    DB::NodeSelectorResult expected_result;
+    expected_result.buckets_on_workers.insert({hosts[0], {{0}}});
+    expected_result.buckets_on_workers.insert({hosts[1], {{1}}});
+    expected_result.buckets_on_workers.insert({hosts[2], {{2}}});
+    expected_result.buckets_on_workers.insert({hosts[3], {{3, 7, 11}}});
+    expected_result.worker_nodes.emplace_back(workers[3]);
+    expected_result.worker_nodes.emplace_back(workers[2]);
+    expected_result.worker_nodes.emplace_back(workers[1]);
+    expected_result.worker_nodes.emplace_back(workers[0]);
+    size_t rows_sum = 33;
+    size_t parallel_size = 4;
+
+    DB::NodeSelectorResult result;
+    Coordination::divideSourceTaskByBucket(payload_on_worker, rows_sum, parallel_size, result);
+    checkDistributeBucketResultMap(expected_result, result, hosts);
+}
+
 std::string formatDistributedPartsResultMap(const std::multimap<...> & map)
 {
     std::stringstream ss;
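Case7 pins down exactly the situation the seeding step exists for: with rows 30/1/1/1 the integer shares come out 3/0/0/0, yet every worker is expected to receive one instance. A quick check of that arithmetic with plain integers:

```cpp
#include <cassert>
#include <cstddef>

int main()
{
    // divideSourceTaskByBucketCase7 data: weight_sum 33, parallel_size 4.
    size_t rows[] = {30, 1, 1, 1};
    size_t weight_sum = 33;
    size_t parallel_size = 4;

    // Pure proportional shares would starve the three small workers:
    assert(rows[0] * parallel_size / weight_sum == 3);
    assert(rows[1] * parallel_size / weight_sum == 0);

    // The seeding pass hands one instance to each non-empty worker first,
    // which already consumes all four slots; the proportional loop never
    // runs, and the heavy worker keeps its three bucket groups inside its
    // single instance.
    size_t seeded = 0;
    for (size_t r : rows)
        if (r > 0)
            ++seeded;
    assert(seeded == parallel_size);
    return 0;
}
```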
= "2", .rows = 3, .part_num = 3}; payload_on_worker.insert({hosts[1], std::move(p2)}); DB::NodeSelectorResult expected_result; - expected_result.source_task_count_on_workers.insert({hosts[0], 1}); - expected_result.source_task_count_on_workers.insert({hosts[1], 3}); - expected_result.worker_nodes.emplace_back(DB::WorkerNode(hosts[1], Coordination::NodeType::Remote, "2")); - expected_result.worker_nodes.emplace_back(DB::WorkerNode(hosts[1], Coordination::NodeType::Remote, "2")); - expected_result.worker_nodes.emplace_back(DB::WorkerNode(hosts[0], Coordination::NodeType::Remote, "1")); - expected_result.worker_nodes.emplace_back(DB::WorkerNode(hosts[1], Coordination::NodeType::Remote, "2")); + expected_result.source_task_count_on_workers.insert({hosts[0], 2}); + expected_result.source_task_count_on_workers.insert({hosts[1], 2}); + expected_result.worker_nodes.emplace_back(workers[1]); + expected_result.worker_nodes.emplace_back(workers[0]); + expected_result.worker_nodes.emplace_back(workers[1]); + expected_result.worker_nodes.emplace_back(workers[0]); size_t rows_sum = 5; size_t parallel_size = 4; @@ -257,6 +311,8 @@ TEST(NodeSelectorTest, divideTaskByPartTestCase1) TEST(NodeSelectorTest, divideTaskByPartTestCase2) { std::vector hosts{DB::AddressInfo("host1", 0, "", "", 0), DB::AddressInfo("host2", 0, "", "", 0)}; + std::vector workers{ + DB::WorkerNode(hosts[0], Coordination::NodeType::Remote, "1"), DB::WorkerNode(hosts[1], Coordination::NodeType::Remote, "2")}; std::unordered_map payload_on_worker; auto p1 = DB::SourceTaskPayloadOnWorker{.worker_id = "1", .rows = 7, .part_num = 7}; payload_on_worker.insert({hosts[0], std::move(p1)}); @@ -265,10 +321,10 @@ TEST(NodeSelectorTest, divideTaskByPartTestCase2) DB::NodeSelectorResult expected_result; expected_result.source_task_count_on_workers.insert({hosts[0], 3}); expected_result.source_task_count_on_workers.insert({hosts[1], 1}); - expected_result.worker_nodes.emplace_back(DB::WorkerNode(hosts[1], Coordination::NodeType::Remote, "2")); - expected_result.worker_nodes.emplace_back(DB::WorkerNode(hosts[0], Coordination::NodeType::Remote, "1")); - expected_result.worker_nodes.emplace_back(DB::WorkerNode(hosts[0], Coordination::NodeType::Remote, "1")); - expected_result.worker_nodes.emplace_back(DB::WorkerNode(hosts[0], Coordination::NodeType::Remote, "1")); + expected_result.worker_nodes.emplace_back(workers[0]); + expected_result.worker_nodes.emplace_back(workers[1]); + expected_result.worker_nodes.emplace_back(workers[0]); + expected_result.worker_nodes.emplace_back(workers[0]); size_t rows_sum = 9; size_t parallel_size = 4; @@ -282,18 +338,20 @@ TEST(NodeSelectorTest, divideTaskByPartTestCase2) TEST(NodeSelectorTest, divideTaskByPartTestCase3) { std::vector hosts{DB::AddressInfo("host1", 0, "", "", 0), DB::AddressInfo("host2", 0, "", "", 0)}; + std::vector workers{ + DB::WorkerNode(hosts[0], Coordination::NodeType::Remote, "1"), DB::WorkerNode(hosts[1], Coordination::NodeType::Remote, "2")}; std::unordered_map payload_on_worker; auto p1 = DB::SourceTaskPayloadOnWorker{.worker_id = "1", .rows = 1, .part_num = 1}; payload_on_worker.insert({hosts[0], std::move(p1)}); auto p2 = DB::SourceTaskPayloadOnWorker{.worker_id = "2", .rows = 2, .part_num = 2}; payload_on_worker.insert({hosts[1], std::move(p2)}); DB::NodeSelectorResult expected_result; - expected_result.source_task_count_on_workers.insert({hosts[0], 1}); - expected_result.source_task_count_on_workers.insert({hosts[1], 3}); - 
@@ -257,6 +311,8 @@
 TEST(NodeSelectorTest, divideTaskByPartTestCase2)
 {
     std::vector hosts{DB::AddressInfo("host1", 0, "", "", 0), DB::AddressInfo("host2", 0, "", "", 0)};
+    std::vector workers{
+        DB::WorkerNode(hosts[0], Coordination::NodeType::Remote, "1"), DB::WorkerNode(hosts[1], Coordination::NodeType::Remote, "2")};
     std::unordered_map<DB::AddressInfo, DB::SourceTaskPayloadOnWorker, DB::AddressInfo::Hash> payload_on_worker;
     auto p1 = DB::SourceTaskPayloadOnWorker{.worker_id = "1", .rows = 7, .part_num = 7};
     payload_on_worker.insert({hosts[0], std::move(p1)});
@@ -265,10 +321,10 @@ TEST(NodeSelectorTest, divideTaskByPartTestCase2)
     DB::NodeSelectorResult expected_result;
     expected_result.source_task_count_on_workers.insert({hosts[0], 3});
     expected_result.source_task_count_on_workers.insert({hosts[1], 1});
-    expected_result.worker_nodes.emplace_back(DB::WorkerNode(hosts[1], Coordination::NodeType::Remote, "2"));
-    expected_result.worker_nodes.emplace_back(DB::WorkerNode(hosts[0], Coordination::NodeType::Remote, "1"));
-    expected_result.worker_nodes.emplace_back(DB::WorkerNode(hosts[0], Coordination::NodeType::Remote, "1"));
-    expected_result.worker_nodes.emplace_back(DB::WorkerNode(hosts[0], Coordination::NodeType::Remote, "1"));
+    expected_result.worker_nodes.emplace_back(workers[0]);
+    expected_result.worker_nodes.emplace_back(workers[1]);
+    expected_result.worker_nodes.emplace_back(workers[0]);
+    expected_result.worker_nodes.emplace_back(workers[0]);
     size_t rows_sum = 9;
     size_t parallel_size = 4;
 
@@ -282,18 +338,20 @@
 TEST(NodeSelectorTest, divideTaskByPartTestCase3)
 {
     std::vector hosts{DB::AddressInfo("host1", 0, "", "", 0), DB::AddressInfo("host2", 0, "", "", 0)};
+    std::vector workers{
+        DB::WorkerNode(hosts[0], Coordination::NodeType::Remote, "1"), DB::WorkerNode(hosts[1], Coordination::NodeType::Remote, "2")};
     std::unordered_map<DB::AddressInfo, DB::SourceTaskPayloadOnWorker, DB::AddressInfo::Hash> payload_on_worker;
     auto p1 = DB::SourceTaskPayloadOnWorker{.worker_id = "1", .rows = 1, .part_num = 1};
     payload_on_worker.insert({hosts[0], std::move(p1)});
     auto p2 = DB::SourceTaskPayloadOnWorker{.worker_id = "2", .rows = 2, .part_num = 2};
     payload_on_worker.insert({hosts[1], std::move(p2)});
     DB::NodeSelectorResult expected_result;
-    expected_result.source_task_count_on_workers.insert({hosts[0], 1});
-    expected_result.source_task_count_on_workers.insert({hosts[1], 3});
-    expected_result.worker_nodes.emplace_back(DB::WorkerNode(hosts[1], Coordination::NodeType::Remote, "2"));
-    expected_result.worker_nodes.emplace_back(DB::WorkerNode(hosts[1], Coordination::NodeType::Remote, "2"));
-    expected_result.worker_nodes.emplace_back(DB::WorkerNode(hosts[0], Coordination::NodeType::Remote, "1"));
-    expected_result.worker_nodes.emplace_back(DB::WorkerNode(hosts[1], Coordination::NodeType::Remote, "2"));
+    expected_result.source_task_count_on_workers.insert({hosts[0], 2});
+    expected_result.source_task_count_on_workers.insert({hosts[1], 2});
+    expected_result.worker_nodes.emplace_back(workers[1]);
+    expected_result.worker_nodes.emplace_back(workers[0]);
+    expected_result.worker_nodes.emplace_back(workers[1]);
+    expected_result.worker_nodes.emplace_back(workers[0]);
     size_t rows_sum = 3;
     size_t parallel_size = 4;
 
@@ -307,6 +365,8 @@
 TEST(NodeSelectorTest, divideTaskByPartTestCase4)
 {
     std::vector hosts{DB::AddressInfo("host1", 0, "", "", 0), DB::AddressInfo("host2", 0, "", "", 0)};
+    std::vector workers{
+        DB::WorkerNode(hosts[0], Coordination::NodeType::Remote, "1"), DB::WorkerNode(hosts[1], Coordination::NodeType::Remote, "2")};
     std::unordered_map<DB::AddressInfo, DB::SourceTaskPayloadOnWorker, DB::AddressInfo::Hash> payload_on_worker;
     auto p1 = DB::SourceTaskPayloadOnWorker{.worker_id = "1"};
     payload_on_worker.insert({hosts[0], std::move(p1)});
@@ -315,10 +375,10 @@ TEST(NodeSelectorTest, divideTaskByPartTestCase4)
     DB::NodeSelectorResult expected_result;
     expected_result.source_task_count_on_workers.insert({hosts[0], 2});
     expected_result.source_task_count_on_workers.insert({hosts[1], 2});
-    expected_result.worker_nodes.emplace_back(DB::WorkerNode(hosts[1], Coordination::NodeType::Remote, "2"));
-    expected_result.worker_nodes.emplace_back(DB::WorkerNode(hosts[0], Coordination::NodeType::Remote, "1"));
-    expected_result.worker_nodes.emplace_back(DB::WorkerNode(hosts[1], Coordination::NodeType::Remote, "2"));
-    expected_result.worker_nodes.emplace_back(DB::WorkerNode(hosts[0], Coordination::NodeType::Remote, "1"));
+    expected_result.worker_nodes.emplace_back(workers[1]);
+    expected_result.worker_nodes.emplace_back(workers[0]);
+    expected_result.worker_nodes.emplace_back(workers[1]);
+    expected_result.worker_nodes.emplace_back(workers[0]);
     size_t rows_sum = 0;
     size_t parallel_size = 4;
 
@@ -332,6 +392,8 @@
 TEST(NodeSelectorTest, divideTaskByPartTestCase5)
 {
     std::vector hosts{DB::AddressInfo("host1", 0, "", "", 0), DB::AddressInfo("host2", 0, "", "", 0)};
+    std::vector workers{
+        DB::WorkerNode(hosts[0], Coordination::NodeType::Remote, "1"), DB::WorkerNode(hosts[1], Coordination::NodeType::Remote, "2")};
     std::unordered_map<DB::AddressInfo, DB::SourceTaskPayloadOnWorker, DB::AddressInfo::Hash> payload_on_worker;
     auto p1 = DB::SourceTaskPayloadOnWorker{.worker_id = "1", .rows = 3, .part_num = 3};
     payload_on_worker.insert({hosts[0], std::move(p1)});
@@ -340,10 +402,10 @@ TEST(NodeSelectorTest, divideTaskByPartTestCase5)
     DB::NodeSelectorResult expected_result;
     expected_result.source_task_count_on_workers.insert({hosts[0], 2});
     expected_result.source_task_count_on_workers.insert({hosts[1], 2});
-    expected_result.worker_nodes.emplace_back(DB::WorkerNode(hosts[1], Coordination::NodeType::Remote, "2"));
-    expected_result.worker_nodes.emplace_back(DB::WorkerNode(hosts[0], Coordination::NodeType::Remote, "1"));
-    expected_result.worker_nodes.emplace_back(DB::WorkerNode(hosts[1], Coordination::NodeType::Remote, "2"));
-    expected_result.worker_nodes.emplace_back(DB::WorkerNode(hosts[0], Coordination::NodeType::Remote, "1"));
+    expected_result.worker_nodes.emplace_back(workers[1]);
+    expected_result.worker_nodes.emplace_back(workers[0]);
+    expected_result.worker_nodes.emplace_back(workers[1]);
+    expected_result.worker_nodes.emplace_back(workers[0]);
     size_t rows_sum = 12;
     size_t parallel_size = 4;
 
@@ -353,6 +415,45 @@
     checkDistributePartsResultMap(expected_result, result, hosts);
 }
 
+/// extreme uneven case
+TEST(NodeSelectorTest, divideTaskByPartTestCase6)
+{
+    std::vector hosts{
+        DB::AddressInfo("host1", 0, "", "", 0),
+        DB::AddressInfo("host2", 0, "", "", 0),
+        DB::AddressInfo("host3", 0, "", "", 0),
+        DB::AddressInfo("host4", 0, "", "", 0)};
+    std::vector workers{
+        DB::WorkerNode(hosts[0], Coordination::NodeType::Remote, "1"),
+        DB::WorkerNode(hosts[1], Coordination::NodeType::Remote, "2"),
+        DB::WorkerNode(hosts[2], Coordination::NodeType::Remote, "3"),
+        DB::WorkerNode(hosts[3], Coordination::NodeType::Remote, "4")};
+    std::unordered_map<DB::AddressInfo, DB::SourceTaskPayloadOnWorker, DB::AddressInfo::Hash> payload_on_worker;
+    auto p1 = DB::SourceTaskPayloadOnWorker{.worker_id = "1", .rows = 1, .part_num = 1};
+    payload_on_worker.insert({workers[0].address, std::move(p1)});
+    auto p2 = DB::SourceTaskPayloadOnWorker{.worker_id = "2", .rows = 1, .part_num = 1};
+    payload_on_worker.insert({workers[1].address, std::move(p2)});
+    auto p3 = DB::SourceTaskPayloadOnWorker{.worker_id = "3", .rows = 1, .part_num = 1};
+    payload_on_worker.insert({workers[2].address, std::move(p3)});
+    auto p4 = DB::SourceTaskPayloadOnWorker{.worker_id = "4", .rows = 30, .part_num = 30};
+    payload_on_worker.insert({workers[3].address, std::move(p4)});
+    DB::NodeSelectorResult expected_result;
+    expected_result.source_task_count_on_workers.insert({hosts[0], 1});
+    expected_result.source_task_count_on_workers.insert({hosts[1], 1});
+    expected_result.source_task_count_on_workers.insert({hosts[2], 1});
+    expected_result.source_task_count_on_workers.insert({hosts[3], 1});
+    expected_result.worker_nodes.emplace_back(workers[3]);
+    expected_result.worker_nodes.emplace_back(workers[2]);
+    expected_result.worker_nodes.emplace_back(workers[1]);
+    expected_result.worker_nodes.emplace_back(workers[0]);
+    size_t rows_sum = 33;
+    size_t parallel_size = 4;
+
+    DB::NodeSelectorResult result;
+    Coordination::divideSourceTaskByPart(payload_on_worker, rows_sum, parallel_size, result);
+    checkDistributePartsResultMap(expected_result, result, hosts);
+}
+
 std::string printPartitionCoalescingExpectedResult(const std::map<...> & result)
 {
     std::stringstream ss;