From 6d46b036518b5665d1f1aa61d6a9b7616f1022f3 Mon Sep 17 00:00:00 2001 From: "Bharadwaj V.R" Date: Wed, 9 Feb 2022 22:22:56 -0800 Subject: [PATCH 01/14] Add some unit tests for DD team selection --- fdbserver/DDTeamCollection.actor.cpp | 566 +++++++++++++++++++++++++++ fdbserver/DataDistribution.actor.cpp | 248 ------------ 2 files changed, 566 insertions(+), 248 deletions(-) diff --git a/fdbserver/DDTeamCollection.actor.cpp b/fdbserver/DDTeamCollection.actor.cpp index f81e81ed0e..f43842e1b2 100644 --- a/fdbserver/DDTeamCollection.actor.cpp +++ b/fdbserver/DDTeamCollection.actor.cpp @@ -3132,3 +3132,569 @@ Future DDTeamCollection::readStorageWiggleMap() { Future DDTeamCollection::readOrCreateStorageMetadata(TCServerInfo* server) { return DDTeamCollectionImpl::readOrCreateStorageMetadata(this, server); } + +std::unique_ptr testTeamCollection(int teamSize, + Reference policy, + int processCount) { + Database database = DatabaseContext::create( + makeReference>(), Never(), LocalityData(), EnableLocalityLoadBalance::False); + + DatabaseConfiguration conf; + conf.storageTeamSize = teamSize; + conf.storagePolicy = policy; + + auto collection = + std::unique_ptr(new DDTeamCollection(database, + UID(0, 0), + MoveKeysLock(), + PromiseStream(), + makeReference(), + conf, + {}, + {}, + Future(Void()), + makeReference>(true), + IsPrimary::True, + makeReference>(false), + makeReference>(false), + PromiseStream(), + Promise(), + PromiseStream>())); + + for (int id = 1; id <= processCount; ++id) { + UID uid(id, 0); + StorageServerInterface interface; + interface.uniqueID = uid; + interface.locality.set(LiteralStringRef("machineid"), Standalone(std::to_string(id))); + interface.locality.set(LiteralStringRef("zoneid"), Standalone(std::to_string(id % 5))); + interface.locality.set(LiteralStringRef("data_hall"), Standalone(std::to_string(id % 3))); + collection->server_info[uid] = makeReference( + interface, collection.get(), ProcessClass(), true, collection->storageServerSet); + collection->server_status.set(uid, ServerStatus(false, false, false, interface.locality)); + collection->checkAndCreateMachine(collection->server_info[uid]); + } + + return collection; +} + +std::unique_ptr testMachineTeamCollection(int teamSize, + Reference policy, + int processCount) { + Database database = DatabaseContext::create( + makeReference>(), Never(), LocalityData(), EnableLocalityLoadBalance::False); + + DatabaseConfiguration conf; + conf.storageTeamSize = teamSize; + conf.storagePolicy = policy; + + auto collection = + std::unique_ptr(new DDTeamCollection(database, + UID(0, 0), + MoveKeysLock(), + PromiseStream(), + makeReference(), + conf, + {}, + {}, + Future(Void()), + makeReference>(true), + IsPrimary::True, + makeReference>(false), + makeReference>(false), + PromiseStream(), + Promise(), + PromiseStream>())); + + for (int id = 1; id <= processCount; id++) { + UID uid(id, 0); + StorageServerInterface interface; + interface.uniqueID = uid; + int process_id = id; + int dc_id = process_id / 1000; + int data_hall_id = process_id / 100; + int zone_id = process_id / 10; + int machine_id = process_id / 5; + + printf("testMachineTeamCollection: process_id:%d zone_id:%d machine_id:%d ip_addr:%s\n", + process_id, + zone_id, + machine_id, + interface.address().toString().c_str()); + interface.locality.set(LiteralStringRef("processid"), Standalone(std::to_string(process_id))); + interface.locality.set(LiteralStringRef("machineid"), Standalone(std::to_string(machine_id))); + interface.locality.set(LiteralStringRef("zoneid"), Standalone(std::to_string(zone_id))); + interface.locality.set(LiteralStringRef("data_hall"), Standalone(std::to_string(data_hall_id))); + interface.locality.set(LiteralStringRef("dcid"), Standalone(std::to_string(dc_id))); + collection->server_info[uid] = makeReference( + interface, collection.get(), ProcessClass(), true, collection->storageServerSet); + + collection->server_status.set(uid, ServerStatus(false, false, false, interface.locality)); + } + + int totalServerIndex = collection->constructMachinesFromServers(); + printf("testMachineTeamCollection: construct machines for %d servers\n", totalServerIndex); + + return collection; +} + +TEST_CASE("DataDistribution/AddTeamsBestOf/UseMachineID") { + wait(Future(Void())); + + int teamSize = 3; // replication size + int processSize = 60; + int desiredTeams = SERVER_KNOBS->DESIRED_TEAMS_PER_SERVER * processSize; + int maxTeams = SERVER_KNOBS->MAX_TEAMS_PER_SERVER * processSize; + + Reference policy = Reference( + new PolicyAcross(teamSize, "zoneid", Reference(new PolicyOne()))); + state std::unique_ptr collection = testMachineTeamCollection(teamSize, policy, processSize); + + collection->addTeamsBestOf(30, desiredTeams, maxTeams); + + ASSERT(collection->sanityCheckTeams() == true); + + return Void(); +} + +TEST_CASE("DataDistribution/AddTeamsBestOf/NotUseMachineID") { + wait(Future(Void())); + + int teamSize = 3; // replication size + int processSize = 60; + int desiredTeams = SERVER_KNOBS->DESIRED_TEAMS_PER_SERVER * processSize; + int maxTeams = SERVER_KNOBS->MAX_TEAMS_PER_SERVER * processSize; + + Reference policy = Reference( + new PolicyAcross(teamSize, "zoneid", Reference(new PolicyOne()))); + state std::unique_ptr collection = testMachineTeamCollection(teamSize, policy, processSize); + + if (collection == nullptr) { + fprintf(stderr, "collection is null\n"); + return Void(); + } + + collection->addBestMachineTeams(30); // Create machine teams to help debug + collection->addTeamsBestOf(30, desiredTeams, maxTeams); + collection->sanityCheckTeams(); // Server team may happen to be on the same machine team, although unlikely + + return Void(); +} + +TEST_CASE("DataDistribution/AddAllTeams/isExhaustive") { + Reference policy = + Reference(new PolicyAcross(3, "zoneid", Reference(new PolicyOne()))); + state int processSize = 10; + state int desiredTeams = SERVER_KNOBS->DESIRED_TEAMS_PER_SERVER * processSize; + state int maxTeams = SERVER_KNOBS->MAX_TEAMS_PER_SERVER * processSize; + state std::unique_ptr collection = testTeamCollection(3, policy, processSize); + + int result = collection->addTeamsBestOf(200, desiredTeams, maxTeams); + + // The maximum number of available server teams without considering machine locality is 120 + // The maximum number of available server teams with machine locality constraint is 120 - 40, because + // the 40 (5*4*2) server teams whose servers come from the same machine are invalid. + ASSERT(result == 80); + + return Void(); +} + +TEST_CASE("/DataDistribution/AddAllTeams/withLimit") { + Reference policy = + Reference(new PolicyAcross(3, "zoneid", Reference(new PolicyOne()))); + state int processSize = 10; + state int desiredTeams = SERVER_KNOBS->DESIRED_TEAMS_PER_SERVER * processSize; + state int maxTeams = SERVER_KNOBS->MAX_TEAMS_PER_SERVER * processSize; + + state std::unique_ptr collection = testTeamCollection(3, policy, processSize); + + int result = collection->addTeamsBestOf(10, desiredTeams, maxTeams); + + ASSERT(result >= 10); + + return Void(); +} + +TEST_CASE("/DataDistribution/AddTeamsBestOf/SkippingBusyServers") { + wait(Future(Void())); + Reference policy = + Reference(new PolicyAcross(3, "zoneid", Reference(new PolicyOne()))); + state int processSize = 10; + state int desiredTeams = SERVER_KNOBS->DESIRED_TEAMS_PER_SERVER * processSize; + state int maxTeams = SERVER_KNOBS->MAX_TEAMS_PER_SERVER * processSize; + state int teamSize = 3; + // state int targetTeamsPerServer = SERVER_KNOBS->DESIRED_TEAMS_PER_SERVER * (teamSize + 1) / 2; + state std::unique_ptr collection = testTeamCollection(teamSize, policy, processSize); + + collection->addTeam(std::set({ UID(1, 0), UID(2, 0), UID(3, 0) }), true); + collection->addTeam(std::set({ UID(1, 0), UID(3, 0), UID(4, 0) }), true); + + state int result = collection->addTeamsBestOf(8, desiredTeams, maxTeams); + + ASSERT(result >= 8); + + for (auto process = collection->server_info.begin(); process != collection->server_info.end(); process++) { + auto teamCount = process->second->teams.size(); + ASSERT(teamCount >= 1); + // ASSERT(teamCount <= targetTeamsPerServer); + } + + return Void(); +} + +// Due to the randomness in choosing the machine team and the server team from the machine team, it is possible that +// we may not find the remaining several (e.g., 1 or 2) available teams. +// It is hard to conclude what is the minimum number of teams the addTeamsBestOf() should create in this situation. +TEST_CASE("/DataDistribution/AddTeamsBestOf/NotEnoughServers") { + wait(Future(Void())); + + Reference policy = + Reference(new PolicyAcross(3, "zoneid", Reference(new PolicyOne()))); + state int processSize = 5; + state int desiredTeams = SERVER_KNOBS->DESIRED_TEAMS_PER_SERVER * processSize; + state int maxTeams = SERVER_KNOBS->MAX_TEAMS_PER_SERVER * processSize; + state int teamSize = 3; + state std::unique_ptr collection = testTeamCollection(teamSize, policy, processSize); + + collection->addTeam(std::set({ UID(1, 0), UID(2, 0), UID(3, 0) }), true); + collection->addTeam(std::set({ UID(1, 0), UID(3, 0), UID(4, 0) }), true); + + collection->addBestMachineTeams(10); + int result = collection->addTeamsBestOf(10, desiredTeams, maxTeams); + + if (collection->machineTeams.size() != 10 || result != 8) { + collection->traceAllInfo(true); // Debug message + } + + // NOTE: Due to the pure randomness in selecting a machine for a machine team, + // we cannot guarantee that all machine teams are created. + // When we chnage the selectReplicas function to achieve such guarantee, we can enable the following ASSERT + ASSERT(collection->machineTeams.size() == 10); // Should create all machine teams + + // We need to guarantee a server always have at least a team so that the server can participate in data distribution + for (auto process = collection->server_info.begin(); process != collection->server_info.end(); process++) { + auto teamCount = process->second->teams.size(); + ASSERT(teamCount >= 1); + } + + // If we find all available teams, result will be 8 because we prebuild 2 teams + ASSERT(result == 8); + + return Void(); +} + +TEST_CASE("/DataDistribution/GetTeam/NewServersNotNeeded") { + wait(Future(Void())); + + Reference policy = + Reference(new PolicyAcross(3, "zoneid", Reference(new PolicyOne()))); + state int processSize = 5; + state int teamSize = 3; + state std::unique_ptr collection = testTeamCollection(teamSize, policy, processSize); + + GetStorageMetricsReply mid_avail; + mid_avail.capacity.bytes = 1000 * 1024 * 1024; + mid_avail.available.bytes = 400 * 1024 * 1024; + mid_avail.load.bytes = 100 * 1024 * 1024; + + GetStorageMetricsReply high_avail; + high_avail.capacity.bytes = 1000 * 1024 * 1024; + high_avail.available.bytes = 800 * 1024 * 1024; + high_avail.load.bytes = 90 * 1024 * 1024; + + collection->addTeam(std::set({ UID(1, 0), UID(2, 0), UID(3, 0) }), true); + collection->addTeam(std::set({ UID(2, 0), UID(3, 0), UID(4, 0) }), true); + collection->doBuildTeams = false; + + collection->server_info[UID(1, 0)]->serverMetrics = mid_avail; + collection->server_info[UID(2, 0)]->serverMetrics = high_avail; + collection->server_info[UID(3, 0)]->serverMetrics = high_avail; + collection->server_info[UID(4, 0)]->serverMetrics = high_avail; + + bool wantsNewServers = false; + bool wantsTrueBest = true; + bool preferLowerUtilization = true; + bool teamMustHaveShards = false; + std::vector completeSources{ UID(1, 0), UID(2, 0), UID(3, 0) }; + + state GetTeamRequest req(wantsNewServers, wantsTrueBest, preferLowerUtilization, teamMustHaveShards); + req.completeSources = completeSources; + + Future a = collection->getTeam(req); + wait(a); + + std::pair>, bool> resTeam = req.reply.getFuture().get(); + + std::set expectedServers{ UID(1, 0), UID(2, 0), UID(3, 0)}; + ASSERT(resTeam.first.present()); + auto servers = resTeam.first.get()->getServerIDs(); + const std::set selectedServers(servers.begin(), servers.end()); + ASSERT(expectedServers == selectedServers); + + return Void(); +} + +TEST_CASE("/DataDistribution/GetTeam/HealthyCompleteSource") { + wait(Future(Void())); + + Reference policy = + Reference(new PolicyAcross(3, "zoneid", Reference(new PolicyOne()))); + state int processSize = 5; + state int teamSize = 3; + state std::unique_ptr collection = testTeamCollection(teamSize, policy, processSize); + + GetStorageMetricsReply mid_avail; + mid_avail.capacity.bytes = 1000 * 1024 * 1024; + mid_avail.available.bytes = 400 * 1024 * 1024; + mid_avail.load.bytes = 100 * 1024 * 1024; + + GetStorageMetricsReply high_avail; + high_avail.capacity.bytes = 1000 * 1024 * 1024; + high_avail.available.bytes = 800 * 1024 * 1024; + high_avail.load.bytes = 90 * 1024 * 1024; + + collection->addTeam(std::set({ UID(1, 0), UID(2, 0), UID(3, 0) }), true); + collection->addTeam(std::set({ UID(2, 0), UID(3, 0), UID(4, 0) }), true); + collection->doBuildTeams = false; + + collection->server_info[UID(1, 0)]->serverMetrics = mid_avail; + collection->server_info[UID(2, 0)]->serverMetrics = high_avail; + collection->server_info[UID(3, 0)]->serverMetrics = high_avail; + collection->server_info[UID(4, 0)]->serverMetrics = high_avail; + ServerStatus server1status = collection->server_status.get(UID(1, 0)); + server1status.isUndesired = true; + collection->server_status.set(UID(1, 0), server1status); + + bool wantsNewServers = false; + bool wantsTrueBest = true; + bool preferLowerUtilization = true; + bool teamMustHaveShards = false; + std::vector completeSources{ UID(1, 0), UID(2, 0), UID(3, 0), UID(4, 0) }; + + state GetTeamRequest req(wantsNewServers, wantsTrueBest, preferLowerUtilization, teamMustHaveShards); + req.completeSources = completeSources; + + collection->traceAllInfo(); + collection->traceServerInfo(); + collection->traceServerTeamInfo(); + + Future a = collection->getTeam(req); + wait(a); + + std::pair>, bool> resTeam = req.reply.getFuture().get(); + + std::set expectedServers{ UID(2, 0), UID(3, 0), UID(4, 0)}; + ASSERT(resTeam.first.present()); + auto servers = resTeam.first.get()->getServerIDs(); + const std::set selectedServers(servers.begin(), servers.end()); + + ASSERT(expectedServers == selectedServers); + return Void(); +} + +TEST_CASE("/DataDistribution/GetTeam/TrueBestLeastUtilized") { + wait(Future(Void())); + + Reference policy = + Reference(new PolicyAcross(3, "zoneid", Reference(new PolicyOne()))); + state int processSize = 5; + state int teamSize = 3; + state std::unique_ptr collection = testTeamCollection(teamSize, policy, processSize); + + GetStorageMetricsReply mid_avail; + mid_avail.capacity.bytes = 1000 * 1024 * 1024; + mid_avail.available.bytes = 400 * 1024 * 1024; + mid_avail.load.bytes = 100 * 1024 * 1024; + + GetStorageMetricsReply high_avail; + high_avail.capacity.bytes = 1000 * 1024 * 1024; + high_avail.available.bytes = 800 * 1024 * 1024; + high_avail.load.bytes = 90 * 1024 * 1024; + + collection->addTeam(std::set({ UID(1, 0), UID(2, 0), UID(3, 0) }), true); + collection->addTeam(std::set({ UID(2, 0), UID(3, 0), UID(4, 0) }), true); + collection->doBuildTeams = false; + + collection->server_info[UID(1, 0)]->serverMetrics = mid_avail; + collection->server_info[UID(2, 0)]->serverMetrics = high_avail; + collection->server_info[UID(3, 0)]->serverMetrics = high_avail; + collection->server_info[UID(4, 0)]->serverMetrics = high_avail; + + bool wantsNewServers = true; + bool wantsTrueBest = true; + bool preferLowerUtilization = true; + bool teamMustHaveShards = false; + std::vector completeSources{ UID(1, 0), UID(2, 0), UID(3, 0) }; + + state GetTeamRequest req(wantsNewServers, wantsTrueBest, preferLowerUtilization, teamMustHaveShards); + req.completeSources = completeSources; + + Future a = collection->getTeam(req); + wait(a); + + std::pair>, bool> resTeam = req.reply.getFuture().get(); + + std::set expectedServers{ UID(2, 0), UID(3, 0), UID(4, 0)}; + ASSERT(resTeam.first.present()); + auto servers = resTeam.first.get()->getServerIDs(); + const std::set selectedServers(servers.begin(), servers.end()); + ASSERT(expectedServers == selectedServers); + + return Void(); +} + +TEST_CASE("/DataDistribution/GetTeam/TrueBestMostUtilized") { + wait(Future(Void())); + + Reference policy = + Reference(new PolicyAcross(3, "zoneid", Reference(new PolicyOne()))); + state int processSize = 5; + state int teamSize = 3; + state std::unique_ptr collection = testTeamCollection(teamSize, policy, processSize); + + GetStorageMetricsReply mid_avail; + mid_avail.capacity.bytes = 1000 * 1024 * 1024; + mid_avail.available.bytes = 400 * 1024 * 1024; + mid_avail.load.bytes = 100 * 1024 * 1024; + + GetStorageMetricsReply high_avail; + high_avail.capacity.bytes = 1000 * 1024 * 1024; + high_avail.available.bytes = 800 * 1024 * 1024; + high_avail.load.bytes = 90 * 1024 * 1024; + + collection->addTeam(std::set({ UID(1, 0), UID(2, 0), UID(3, 0) }), true); + collection->addTeam(std::set({ UID(2, 0), UID(3, 0), UID(4, 0) }), true); + collection->doBuildTeams = false; + + collection->server_info[UID(1, 0)]->serverMetrics = mid_avail; + collection->server_info[UID(2, 0)]->serverMetrics = high_avail; + collection->server_info[UID(3, 0)]->serverMetrics = high_avail; + collection->server_info[UID(4, 0)]->serverMetrics = high_avail; + + bool wantsNewServers = true; + bool wantsTrueBest = true; + bool preferLowerUtilization = false; + bool teamMustHaveShards = false; + std::vector completeSources{ UID(1, 0), UID(2, 0), UID(3, 0) }; + + state GetTeamRequest req(wantsNewServers, wantsTrueBest, preferLowerUtilization, teamMustHaveShards); + req.completeSources = completeSources; + + Future a = collection->getTeam(req); + wait(a); + + std::pair>, bool> resTeam = req.reply.getFuture().get(); + + std::set expectedServers{ UID(1, 0), UID(2, 0), UID(3, 0)}; + ASSERT(resTeam.first.present()); + auto servers = resTeam.first.get()->getServerIDs(); + const std::set selectedServers(servers.begin(), servers.end()); + ASSERT(expectedServers == selectedServers); + + return Void(); +} + +TEST_CASE("/DataDistribution/GetTeam/ServerUtilizationBelowCutoff") { + wait(Future(Void())); + + Reference policy = + Reference(new PolicyAcross(3, "zoneid", Reference(new PolicyOne()))); + state int processSize = 5; + state int teamSize = 3; + state std::unique_ptr collection = testTeamCollection(teamSize, policy, processSize); + + GetStorageMetricsReply low_avail; + low_avail.capacity.bytes = 2000 * 1024 * 1024; + low_avail.available.bytes = 50 * 1024 * 1024; + low_avail.load.bytes = 90 * 1024 * 1024; + + GetStorageMetricsReply high_avail; + high_avail.capacity.bytes = 2000 * 1024 * 1024; + high_avail.available.bytes = 800 * 1024 * 1024; + high_avail.load.bytes = 90 * 1024 * 1024; + + collection->addTeam(std::set({ UID(1, 0), UID(2, 0), UID(3, 0) }), true); + collection->addTeam(std::set({ UID(2, 0), UID(3, 0), UID(4, 0) }), true); + collection->doBuildTeams = false; + + collection->server_info[UID(1, 0)]->serverMetrics = high_avail; + collection->server_info[UID(2, 0)]->serverMetrics = low_avail; + collection->server_info[UID(3, 0)]->serverMetrics = high_avail; + collection->server_info[UID(4, 0)]->serverMetrics = low_avail; + ServerStatus server1status = collection->server_status.get(UID(1, 0)); + server1status.isUndesired = true; + collection->server_status.set(UID(1, 0), server1status); + + bool wantsNewServers = true; + bool wantsTrueBest = true; + bool preferLowerUtilization = true; + bool teamMustHaveShards = false; + std::vector completeSources{ UID(1, 0), UID(2, 0), UID(3, 0) }; + + state GetTeamRequest req(wantsNewServers, wantsTrueBest, preferLowerUtilization, teamMustHaveShards); + req.completeSources = completeSources; + + Future a = collection->getTeam(req); + wait(a); + + std::pair>, bool> resTeam = req.reply.getFuture().get(); + + ASSERT(!resTeam.first.present()); + + return Void(); +} + +TEST_CASE("/DataDistribution/GetTeam/ServerUtilizationNearCutoff") { + wait(Future(Void())); + + Reference policy = + Reference(new PolicyAcross(3, "zoneid", Reference(new PolicyOne()))); + state int processSize = 5; + state int teamSize = 3; + state std::unique_ptr collection = testTeamCollection(teamSize, policy, processSize); + + GetStorageMetricsReply low_avail; + low_avail.capacity.bytes = 2000 * 1024 * 1024; + low_avail.available.bytes = 150 * 1024 * 1024; + low_avail.load.bytes = 90 * 1024 * 1024; + + GetStorageMetricsReply high_avail; + high_avail.capacity.bytes = 2000 * 1024 * 1024; + high_avail.available.bytes = 800 * 1024 * 1024; + high_avail.load.bytes = 90 * 1024 * 1024; + + collection->addTeam(std::set({ UID(1, 0), UID(2, 0), UID(3, 0) }), true); + collection->addTeam(std::set({ UID(2, 0), UID(3, 0), UID(4, 0) }), true); + collection->addTeam(std::set({ UID(3, 0), UID(4, 0), UID(5, 0) }), true); + collection->doBuildTeams = false; + + collection->server_info[UID(1, 0)]->serverMetrics = high_avail; + collection->server_info[UID(2, 0)]->serverMetrics = low_avail; + collection->server_info[UID(3, 0)]->serverMetrics = high_avail; + collection->server_info[UID(4, 0)]->serverMetrics = low_avail; + collection->server_info[UID(5, 0)]->serverMetrics = high_avail; + ServerStatus server1status = collection->server_status.get(UID(1, 0)); + server1status.isUndesired = true; + collection->server_status.set(UID(1, 0), server1status); + + bool wantsNewServers = true; + bool wantsTrueBest = true; + bool preferLowerUtilization = true; + bool teamMustHaveShards = false; + std::vector completeSources{ UID(1, 0), UID(2, 0), UID(3, 0) }; + + state GetTeamRequest req(wantsNewServers, wantsTrueBest, preferLowerUtilization, teamMustHaveShards); + req.completeSources = completeSources; + + Future a = collection->getTeam(req); + wait(a); + + std::pair>, bool> resTeam = req.reply.getFuture().get(); + + std::set expectedServers{ UID(2, 0), UID(3, 0), UID(4, 0)}; + ASSERT(resTeam.first.present()); + auto servers = resTeam.first.get()->getServerIDs(); + const std::set selectedServers(servers.begin(), servers.end()); + ASSERT(expectedServers == selectedServers); + + return Void(); +} \ No newline at end of file diff --git a/fdbserver/DataDistribution.actor.cpp b/fdbserver/DataDistribution.actor.cpp index 47811392b8..0e0e5b66e3 100644 --- a/fdbserver/DataDistribution.actor.cpp +++ b/fdbserver/DataDistribution.actor.cpp @@ -1603,251 +1603,3 @@ ACTOR Future dataDistributor(DataDistributorInterface di, Reference testTeamCollection(int teamSize, - Reference policy, - int processCount) { - Database database = DatabaseContext::create( - makeReference>(), Never(), LocalityData(), EnableLocalityLoadBalance::False); - - DatabaseConfiguration conf; - conf.storageTeamSize = teamSize; - conf.storagePolicy = policy; - - auto collection = - std::unique_ptr(new DDTeamCollection(database, - UID(0, 0), - MoveKeysLock(), - PromiseStream(), - makeReference(), - conf, - {}, - {}, - Future(Void()), - makeReference>(true), - IsPrimary::True, - makeReference>(false), - makeReference>(false), - PromiseStream(), - Promise(), - PromiseStream>())); - - for (int id = 1; id <= processCount; ++id) { - UID uid(id, 0); - StorageServerInterface interface; - interface.uniqueID = uid; - interface.locality.set(LiteralStringRef("machineid"), Standalone(std::to_string(id))); - interface.locality.set(LiteralStringRef("zoneid"), Standalone(std::to_string(id % 5))); - interface.locality.set(LiteralStringRef("data_hall"), Standalone(std::to_string(id % 3))); - collection->server_info[uid] = makeReference( - interface, collection.get(), ProcessClass(), true, collection->storageServerSet); - collection->server_status.set(uid, ServerStatus(false, false, false, interface.locality)); - collection->checkAndCreateMachine(collection->server_info[uid]); - } - - return collection; -} - -std::unique_ptr testMachineTeamCollection(int teamSize, - Reference policy, - int processCount) { - Database database = DatabaseContext::create( - makeReference>(), Never(), LocalityData(), EnableLocalityLoadBalance::False); - - DatabaseConfiguration conf; - conf.storageTeamSize = teamSize; - conf.storagePolicy = policy; - - auto collection = - std::unique_ptr(new DDTeamCollection(database, - UID(0, 0), - MoveKeysLock(), - PromiseStream(), - makeReference(), - conf, - {}, - {}, - Future(Void()), - makeReference>(true), - IsPrimary::True, - makeReference>(false), - makeReference>(false), - PromiseStream(), - Promise(), - PromiseStream>())); - - for (int id = 1; id <= processCount; id++) { - UID uid(id, 0); - StorageServerInterface interface; - interface.uniqueID = uid; - int process_id = id; - int dc_id = process_id / 1000; - int data_hall_id = process_id / 100; - int zone_id = process_id / 10; - int machine_id = process_id / 5; - - printf("testMachineTeamCollection: process_id:%d zone_id:%d machine_id:%d ip_addr:%s\n", - process_id, - zone_id, - machine_id, - interface.address().toString().c_str()); - interface.locality.set(LiteralStringRef("processid"), Standalone(std::to_string(process_id))); - interface.locality.set(LiteralStringRef("machineid"), Standalone(std::to_string(machine_id))); - interface.locality.set(LiteralStringRef("zoneid"), Standalone(std::to_string(zone_id))); - interface.locality.set(LiteralStringRef("data_hall"), Standalone(std::to_string(data_hall_id))); - interface.locality.set(LiteralStringRef("dcid"), Standalone(std::to_string(dc_id))); - collection->server_info[uid] = makeReference( - interface, collection.get(), ProcessClass(), true, collection->storageServerSet); - - collection->server_status.set(uid, ServerStatus(false, false, false, interface.locality)); - } - - int totalServerIndex = collection->constructMachinesFromServers(); - printf("testMachineTeamCollection: construct machines for %d servers\n", totalServerIndex); - - return collection; -} - -TEST_CASE("DataDistribution/AddTeamsBestOf/UseMachineID") { - wait(Future(Void())); - - int teamSize = 3; // replication size - int processSize = 60; - int desiredTeams = SERVER_KNOBS->DESIRED_TEAMS_PER_SERVER * processSize; - int maxTeams = SERVER_KNOBS->MAX_TEAMS_PER_SERVER * processSize; - - Reference policy = Reference( - new PolicyAcross(teamSize, "zoneid", Reference(new PolicyOne()))); - state std::unique_ptr collection = testMachineTeamCollection(teamSize, policy, processSize); - - collection->addTeamsBestOf(30, desiredTeams, maxTeams); - - ASSERT(collection->sanityCheckTeams() == true); - - return Void(); -} - -TEST_CASE("DataDistribution/AddTeamsBestOf/NotUseMachineID") { - wait(Future(Void())); - - int teamSize = 3; // replication size - int processSize = 60; - int desiredTeams = SERVER_KNOBS->DESIRED_TEAMS_PER_SERVER * processSize; - int maxTeams = SERVER_KNOBS->MAX_TEAMS_PER_SERVER * processSize; - - Reference policy = Reference( - new PolicyAcross(teamSize, "zoneid", Reference(new PolicyOne()))); - state std::unique_ptr collection = testMachineTeamCollection(teamSize, policy, processSize); - - if (collection == nullptr) { - fprintf(stderr, "collection is null\n"); - return Void(); - } - - collection->addBestMachineTeams(30); // Create machine teams to help debug - collection->addTeamsBestOf(30, desiredTeams, maxTeams); - collection->sanityCheckTeams(); // Server team may happen to be on the same machine team, although unlikely - - return Void(); -} - -TEST_CASE("DataDistribution/AddAllTeams/isExhaustive") { - Reference policy = - Reference(new PolicyAcross(3, "zoneid", Reference(new PolicyOne()))); - state int processSize = 10; - state int desiredTeams = SERVER_KNOBS->DESIRED_TEAMS_PER_SERVER * processSize; - state int maxTeams = SERVER_KNOBS->MAX_TEAMS_PER_SERVER * processSize; - state std::unique_ptr collection = testTeamCollection(3, policy, processSize); - - int result = collection->addTeamsBestOf(200, desiredTeams, maxTeams); - - // The maximum number of available server teams without considering machine locality is 120 - // The maximum number of available server teams with machine locality constraint is 120 - 40, because - // the 40 (5*4*2) server teams whose servers come from the same machine are invalid. - ASSERT(result == 80); - - return Void(); -} - -TEST_CASE("/DataDistribution/AddAllTeams/withLimit") { - Reference policy = - Reference(new PolicyAcross(3, "zoneid", Reference(new PolicyOne()))); - state int processSize = 10; - state int desiredTeams = SERVER_KNOBS->DESIRED_TEAMS_PER_SERVER * processSize; - state int maxTeams = SERVER_KNOBS->MAX_TEAMS_PER_SERVER * processSize; - - state std::unique_ptr collection = testTeamCollection(3, policy, processSize); - - int result = collection->addTeamsBestOf(10, desiredTeams, maxTeams); - - ASSERT(result >= 10); - - return Void(); -} - -TEST_CASE("/DataDistribution/AddTeamsBestOf/SkippingBusyServers") { - wait(Future(Void())); - Reference policy = - Reference(new PolicyAcross(3, "zoneid", Reference(new PolicyOne()))); - state int processSize = 10; - state int desiredTeams = SERVER_KNOBS->DESIRED_TEAMS_PER_SERVER * processSize; - state int maxTeams = SERVER_KNOBS->MAX_TEAMS_PER_SERVER * processSize; - state int teamSize = 3; - // state int targetTeamsPerServer = SERVER_KNOBS->DESIRED_TEAMS_PER_SERVER * (teamSize + 1) / 2; - state std::unique_ptr collection = testTeamCollection(teamSize, policy, processSize); - - collection->addTeam(std::set({ UID(1, 0), UID(2, 0), UID(3, 0) }), true); - collection->addTeam(std::set({ UID(1, 0), UID(3, 0), UID(4, 0) }), true); - - state int result = collection->addTeamsBestOf(8, desiredTeams, maxTeams); - - ASSERT(result >= 8); - - for (auto process = collection->server_info.begin(); process != collection->server_info.end(); process++) { - auto teamCount = process->second->teams.size(); - ASSERT(teamCount >= 1); - // ASSERT(teamCount <= targetTeamsPerServer); - } - - return Void(); -} - -// Due to the randomness in choosing the machine team and the server team from the machine team, it is possible that -// we may not find the remaining several (e.g., 1 or 2) available teams. -// It is hard to conclude what is the minimum number of teams the addTeamsBestOf() should create in this situation. -TEST_CASE("/DataDistribution/AddTeamsBestOf/NotEnoughServers") { - wait(Future(Void())); - - Reference policy = - Reference(new PolicyAcross(3, "zoneid", Reference(new PolicyOne()))); - state int processSize = 5; - state int desiredTeams = SERVER_KNOBS->DESIRED_TEAMS_PER_SERVER * processSize; - state int maxTeams = SERVER_KNOBS->MAX_TEAMS_PER_SERVER * processSize; - state int teamSize = 3; - state std::unique_ptr collection = testTeamCollection(teamSize, policy, processSize); - - collection->addTeam(std::set({ UID(1, 0), UID(2, 0), UID(3, 0) }), true); - collection->addTeam(std::set({ UID(1, 0), UID(3, 0), UID(4, 0) }), true); - - collection->addBestMachineTeams(10); - int result = collection->addTeamsBestOf(10, desiredTeams, maxTeams); - - if (collection->machineTeams.size() != 10 || result != 8) { - collection->traceAllInfo(true); // Debug message - } - - // NOTE: Due to the pure randomness in selecting a machine for a machine team, - // we cannot guarantee that all machine teams are created. - // When we chnage the selectReplicas function to achieve such guarantee, we can enable the following ASSERT - ASSERT(collection->machineTeams.size() == 10); // Should create all machine teams - - // We need to guarantee a server always have at least a team so that the server can participate in data distribution - for (auto process = collection->server_info.begin(); process != collection->server_info.end(); process++) { - auto teamCount = process->second->teams.size(); - ASSERT(teamCount >= 1); - } - - // If we find all available teams, result will be 8 because we prebuild 2 teams - ASSERT(result == 8); - - return Void(); -} From ab34d0553dd4246e7dcdbb18a009ad36ef27cb73 Mon Sep 17 00:00:00 2001 From: "Bharadwaj V.R" Date: Wed, 9 Feb 2022 22:50:01 -0800 Subject: [PATCH 02/14] Add unit tests for DD to test the team selection logic --- tests/CMakeLists.txt | 1 + tests/rare/DDUnitTests.toml | 9 +++++++++ 2 files changed, 10 insertions(+) create mode 100644 tests/rare/DDUnitTests.toml diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 948e45d94c..afc0bcb5ca 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -188,6 +188,7 @@ if(WITH_PYTHON) add_fdb_test(TEST_FILES rare/ConflictRangeRYOWCheck.toml) add_fdb_test(TEST_FILES rare/CycleRollbackClogged.toml) add_fdb_test(TEST_FILES rare/CycleWithKills.toml) + add_fdb_test(TEST_FILES rare/DDUnitTests.toml) add_fdb_test(TEST_FILES rare/FuzzTest.toml) add_fdb_test(TEST_FILES rare/HighContentionPrefixAllocator.toml) add_fdb_test(TEST_FILES rare/InventoryTestHeavyWrites.toml) diff --git a/tests/rare/DDUnitTests.toml b/tests/rare/DDUnitTests.toml new file mode 100644 index 0000000000..a4efb5ee06 --- /dev/null +++ b/tests/rare/DDUnitTests.toml @@ -0,0 +1,9 @@ +[[test]] +testTitle = 'UnitTests' +useDB = false +startDelay = 0 + + [[test.workload]] + testName = 'UnitTests' + #maxTestCases = 1 + testsMatching = '/DataDistribution/' From 0a1bcf7a29867afb80980d4d87de3fbbd1bb6c0f Mon Sep 17 00:00:00 2001 From: "A.J. Beamon" Date: Thu, 10 Feb 2022 13:57:48 -0800 Subject: [PATCH 03/14] Log protocol version as a hex string rather than a uint64_t. --- fdbrpc/FlowTransport.actor.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/fdbrpc/FlowTransport.actor.cpp b/fdbrpc/FlowTransport.actor.cpp index 26b6e89b87..6e4bb083a5 100644 --- a/fdbrpc/FlowTransport.actor.cpp +++ b/fdbrpc/FlowTransport.actor.cpp @@ -1205,8 +1205,8 @@ ACTOR static Future connectionReader(TransportData* transport, FLOW_KNOBS->CONNECTION_REJECTED_MESSAGE_DELAY) { TraceEvent(SevWarn, "ConnectionRejected", conn->getDebugID()) .detail("Reason", "IncompatibleProtocolVersion") - .detail("LocalVersion", g_network->protocolVersion().version()) - .detail("RejectedVersion", pkt.protocolVersion.version()) + .detail("LocalVersion", g_network->protocolVersion()) + .detail("RejectedVersion", pkt.protocolVersion) .detail("Peer", pkt.canonicalRemotePort ? NetworkAddress(pkt.canonicalRemoteIp(), pkt.canonicalRemotePort) From 5096a312ae97313c091dd8a453c96c0ad159e205 Mon Sep 17 00:00:00 2001 From: "Bharadwaj V.R" Date: Thu, 10 Feb 2022 14:56:54 -0800 Subject: [PATCH 04/14] Mark new DD unit tests as such in cmakelists --- tests/CMakeLists.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index afc0bcb5ca..c5eff04fb4 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -188,7 +188,7 @@ if(WITH_PYTHON) add_fdb_test(TEST_FILES rare/ConflictRangeRYOWCheck.toml) add_fdb_test(TEST_FILES rare/CycleRollbackClogged.toml) add_fdb_test(TEST_FILES rare/CycleWithKills.toml) - add_fdb_test(TEST_FILES rare/DDUnitTests.toml) + add_fdb_test(TEST_FILES rare/DDUnitTests.toml UNIT) add_fdb_test(TEST_FILES rare/FuzzTest.toml) add_fdb_test(TEST_FILES rare/HighContentionPrefixAllocator.toml) add_fdb_test(TEST_FILES rare/InventoryTestHeavyWrites.toml) From 7790d57c06c7ef8156fe86169e7de9c88c2cb1dd Mon Sep 17 00:00:00 2001 From: "Bharadwaj V.R" Date: Thu, 10 Feb 2022 18:52:40 -0800 Subject: [PATCH 05/14] Mask the use of the delay in getTeam and the buildTeams path in the DD unit test delay doesn't work as expected in the UT envionment --- fdbserver/DDTeamCollection.actor.cpp | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/fdbserver/DDTeamCollection.actor.cpp b/fdbserver/DDTeamCollection.actor.cpp index f43842e1b2..ab99cea241 100644 --- a/fdbserver/DDTeamCollection.actor.cpp +++ b/fdbserver/DDTeamCollection.actor.cpp @@ -19,6 +19,7 @@ */ #include "fdbserver/DDTeamCollection.h" +#include "flow/Trace.h" #include "flow/actorcompiler.h" // This must be the last #include. FDB_DEFINE_BOOLEAN_PARAM(IsPrimary); @@ -3404,6 +3405,7 @@ TEST_CASE("/DataDistribution/GetTeam/NewServersNotNeeded") { collection->addTeam(std::set({ UID(1, 0), UID(2, 0), UID(3, 0) }), true); collection->addTeam(std::set({ UID(2, 0), UID(3, 0), UID(4, 0) }), true); collection->doBuildTeams = false; + collection->checkTeamDelay = Void(); collection->server_info[UID(1, 0)]->serverMetrics = mid_avail; collection->server_info[UID(2, 0)]->serverMetrics = high_avail; @@ -3455,14 +3457,13 @@ TEST_CASE("/DataDistribution/GetTeam/HealthyCompleteSource") { collection->addTeam(std::set({ UID(1, 0), UID(2, 0), UID(3, 0) }), true); collection->addTeam(std::set({ UID(2, 0), UID(3, 0), UID(4, 0) }), true); collection->doBuildTeams = false; + collection->checkTeamDelay = Void(); collection->server_info[UID(1, 0)]->serverMetrics = mid_avail; collection->server_info[UID(2, 0)]->serverMetrics = high_avail; collection->server_info[UID(3, 0)]->serverMetrics = high_avail; collection->server_info[UID(4, 0)]->serverMetrics = high_avail; - ServerStatus server1status = collection->server_status.get(UID(1, 0)); - server1status.isUndesired = true; - collection->server_status.set(UID(1, 0), server1status); + collection->server_info[UID(1, 0)]->teams[0]->setHealthy(false); bool wantsNewServers = false; bool wantsTrueBest = true; @@ -3513,6 +3514,7 @@ TEST_CASE("/DataDistribution/GetTeam/TrueBestLeastUtilized") { collection->addTeam(std::set({ UID(1, 0), UID(2, 0), UID(3, 0) }), true); collection->addTeam(std::set({ UID(2, 0), UID(3, 0), UID(4, 0) }), true); collection->doBuildTeams = false; + collection->checkTeamDelay = Void(); collection->server_info[UID(1, 0)]->serverMetrics = mid_avail; collection->server_info[UID(2, 0)]->serverMetrics = high_avail; @@ -3564,6 +3566,7 @@ TEST_CASE("/DataDistribution/GetTeam/TrueBestMostUtilized") { collection->addTeam(std::set({ UID(1, 0), UID(2, 0), UID(3, 0) }), true); collection->addTeam(std::set({ UID(2, 0), UID(3, 0), UID(4, 0) }), true); collection->doBuildTeams = false; + collection->checkTeamDelay = Void(); collection->server_info[UID(1, 0)]->serverMetrics = mid_avail; collection->server_info[UID(2, 0)]->serverMetrics = high_avail; @@ -3615,14 +3618,13 @@ TEST_CASE("/DataDistribution/GetTeam/ServerUtilizationBelowCutoff") { collection->addTeam(std::set({ UID(1, 0), UID(2, 0), UID(3, 0) }), true); collection->addTeam(std::set({ UID(2, 0), UID(3, 0), UID(4, 0) }), true); collection->doBuildTeams = false; + collection->checkTeamDelay = Void(); collection->server_info[UID(1, 0)]->serverMetrics = high_avail; collection->server_info[UID(2, 0)]->serverMetrics = low_avail; collection->server_info[UID(3, 0)]->serverMetrics = high_avail; collection->server_info[UID(4, 0)]->serverMetrics = low_avail; - ServerStatus server1status = collection->server_status.get(UID(1, 0)); - server1status.isUndesired = true; - collection->server_status.set(UID(1, 0), server1status); + collection->server_info[UID(1, 0)]->teams[0]->setHealthy(false); bool wantsNewServers = true; bool wantsTrueBest = true; @@ -3666,15 +3668,14 @@ TEST_CASE("/DataDistribution/GetTeam/ServerUtilizationNearCutoff") { collection->addTeam(std::set({ UID(2, 0), UID(3, 0), UID(4, 0) }), true); collection->addTeam(std::set({ UID(3, 0), UID(4, 0), UID(5, 0) }), true); collection->doBuildTeams = false; + collection->checkTeamDelay = Void(); collection->server_info[UID(1, 0)]->serverMetrics = high_avail; collection->server_info[UID(2, 0)]->serverMetrics = low_avail; collection->server_info[UID(3, 0)]->serverMetrics = high_avail; collection->server_info[UID(4, 0)]->serverMetrics = low_avail; collection->server_info[UID(5, 0)]->serverMetrics = high_avail; - ServerStatus server1status = collection->server_status.get(UID(1, 0)); - server1status.isUndesired = true; - collection->server_status.set(UID(1, 0), server1status); + collection->server_info[UID(1, 0)]->teams[0]->setHealthy(false); bool wantsNewServers = true; bool wantsTrueBest = true; From 83b7f1fcbf6273341c33168945f6da865a937a1d Mon Sep 17 00:00:00 2001 From: "Bharadwaj V.R" Date: Thu, 10 Feb 2022 19:29:34 -0800 Subject: [PATCH 06/14] Add test descriptions for DD unit tests --- fdbserver/DDTeamCollection.actor.cpp | 38 +++++++++++++++++++++++++++- 1 file changed, 37 insertions(+), 1 deletion(-) diff --git a/fdbserver/DDTeamCollection.actor.cpp b/fdbserver/DDTeamCollection.actor.cpp index ab99cea241..bad2211925 100644 --- a/fdbserver/DDTeamCollection.actor.cpp +++ b/fdbserver/DDTeamCollection.actor.cpp @@ -3410,7 +3410,14 @@ TEST_CASE("/DataDistribution/GetTeam/NewServersNotNeeded") { collection->server_info[UID(1, 0)]->serverMetrics = mid_avail; collection->server_info[UID(2, 0)]->serverMetrics = high_avail; collection->server_info[UID(3, 0)]->serverMetrics = high_avail; - collection->server_info[UID(4, 0)]->serverMetrics = high_avail; + collection->server_info[UID(4, 0)]->serverMetrics = high_avail; + + /* + * Suppose 1, 2 and 3 are complete sources, i.e., they have all shards in + * the key range being considered for movement. If the caller says that they + * don't strictly need new servers and all of these servers are healthy, + * maintain status quo. + */ bool wantsNewServers = false; bool wantsTrueBest = true; @@ -3465,6 +3472,13 @@ TEST_CASE("/DataDistribution/GetTeam/HealthyCompleteSource") { collection->server_info[UID(4, 0)]->serverMetrics = high_avail; collection->server_info[UID(1, 0)]->teams[0]->setHealthy(false); + /* + * Suppose 1, 2, 3 and 4 are complete sources, i.e., they have all shards in + * the key range being considered for movement. If the caller says that they don't + * strictly need new servers but '1' is not healthy, see that the other team of + * complete sources is selected. + */ + bool wantsNewServers = false; bool wantsTrueBest = true; bool preferLowerUtilization = true; @@ -3516,6 +3530,11 @@ TEST_CASE("/DataDistribution/GetTeam/TrueBestLeastUtilized") { collection->doBuildTeams = false; collection->checkTeamDelay = Void(); + /* + * Among server teams that have healthy space available, pick the team that is + * least utilized, if the caller says they preferLowerUtilization. + */ + collection->server_info[UID(1, 0)]->serverMetrics = mid_avail; collection->server_info[UID(2, 0)]->serverMetrics = high_avail; collection->server_info[UID(3, 0)]->serverMetrics = high_avail; @@ -3573,6 +3592,11 @@ TEST_CASE("/DataDistribution/GetTeam/TrueBestMostUtilized") { collection->server_info[UID(3, 0)]->serverMetrics = high_avail; collection->server_info[UID(4, 0)]->serverMetrics = high_avail; + /* + * Among server teams that have healthy space available, pick the team that is + * most utilized, if the caller says they don't preferLowerUtilization. + */ + bool wantsNewServers = true; bool wantsTrueBest = true; bool preferLowerUtilization = false; @@ -3626,6 +3650,12 @@ TEST_CASE("/DataDistribution/GetTeam/ServerUtilizationBelowCutoff") { collection->server_info[UID(4, 0)]->serverMetrics = low_avail; collection->server_info[UID(1, 0)]->teams[0]->setHealthy(false); + /* + * If the only available team is one where at least one server is low on + * space, decline to pick that team. Every server must have some minimum + * free space defined by the MIN_AVAILABLE_SPACE server knob. + */ + bool wantsNewServers = true; bool wantsTrueBest = true; bool preferLowerUtilization = true; @@ -3677,6 +3707,12 @@ TEST_CASE("/DataDistribution/GetTeam/ServerUtilizationNearCutoff") { collection->server_info[UID(5, 0)]->serverMetrics = high_avail; collection->server_info[UID(1, 0)]->teams[0]->setHealthy(false); + /* + * If the only available team is one where all servers are low on space, + * test that each server has at least MIN_AVAILABLE_SPACE_RATIO (server knob) + * percentage points of capacity free before picking that team. + */ + bool wantsNewServers = true; bool wantsTrueBest = true; bool preferLowerUtilization = true; From df2f5c5709155466dec71785463783256d25dee5 Mon Sep 17 00:00:00 2001 From: "Bharadwaj V.R" Date: Thu, 10 Feb 2022 21:52:30 -0800 Subject: [PATCH 07/14] Make UT parameters sensitive to server-knobs --- fdbserver/DDTeamCollection.actor.cpp | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/fdbserver/DDTeamCollection.actor.cpp b/fdbserver/DDTeamCollection.actor.cpp index bad2211925..c9dddf6071 100644 --- a/fdbserver/DDTeamCollection.actor.cpp +++ b/fdbserver/DDTeamCollection.actor.cpp @@ -3630,8 +3630,8 @@ TEST_CASE("/DataDistribution/GetTeam/ServerUtilizationBelowCutoff") { state std::unique_ptr collection = testTeamCollection(teamSize, policy, processSize); GetStorageMetricsReply low_avail; - low_avail.capacity.bytes = 2000 * 1024 * 1024; - low_avail.available.bytes = 50 * 1024 * 1024; + low_avail.capacity.bytes = SERVER_KNOBS->MIN_AVAILABLE_SPACE * 20; + low_avail.available.bytes = SERVER_KNOBS->MIN_AVAILABLE_SPACE / 2; low_avail.load.bytes = 90 * 1024 * 1024; GetStorageMetricsReply high_avail; @@ -3685,8 +3685,13 @@ TEST_CASE("/DataDistribution/GetTeam/ServerUtilizationNearCutoff") { state std::unique_ptr collection = testTeamCollection(teamSize, policy, processSize); GetStorageMetricsReply low_avail; - low_avail.capacity.bytes = 2000 * 1024 * 1024; - low_avail.available.bytes = 150 * 1024 * 1024; + if (SERVER_KNOBS->MIN_AVAILABLE_SPACE_RATIO > 0) { + /* Pick a capacity where MIN_AVAILABLE_SPACE_RATIO of the capacity would be higher than MIN_AVAILABLE_SPACE */ + low_avail.capacity.bytes = SERVER_KNOBS->MIN_AVAILABLE_SPACE * (2 / SERVER_KNOBS->MIN_AVAILABLE_SPACE_RATIO); + } else { + low_avail.capacity.bytes = 2000 * 1024 * 1024; + } + low_avail.available.bytes = (SERVER_KNOBS->MIN_AVAILABLE_SPACE_RATIO * 1.1) * low_avail.capacity.bytes; low_avail.load.bytes = 90 * 1024 * 1024; GetStorageMetricsReply high_avail; From 41bd39a82a31b1485a4964fba5f980660025f363 Mon Sep 17 00:00:00 2001 From: "Bharadwaj V.R" Date: Thu, 10 Feb 2022 22:10:50 -0800 Subject: [PATCH 08/14] Fix code formatting --- fdbserver/DDTeamCollection.actor.cpp | 76 ++++++++++++++-------------- fdbserver/DataDistribution.actor.cpp | 1 - 2 files changed, 38 insertions(+), 39 deletions(-) diff --git a/fdbserver/DDTeamCollection.actor.cpp b/fdbserver/DDTeamCollection.actor.cpp index 934663329e..8216211a94 100644 --- a/fdbserver/DDTeamCollection.actor.cpp +++ b/fdbserver/DDTeamCollection.actor.cpp @@ -5003,7 +5003,7 @@ TEST_CASE("/DataDistribution/GetTeam/NewServersNotNeeded") { Reference policy = Reference(new PolicyAcross(3, "zoneid", Reference(new PolicyOne()))); state int processSize = 5; - state int teamSize = 3; + state int teamSize = 3; state std::unique_ptr collection = testTeamCollection(teamSize, policy, processSize); GetStorageMetricsReply mid_avail; @@ -5020,10 +5020,10 @@ TEST_CASE("/DataDistribution/GetTeam/NewServersNotNeeded") { collection->addTeam(std::set({ UID(2, 0), UID(3, 0), UID(4, 0) }), true); collection->doBuildTeams = false; collection->checkTeamDelay = Void(); - + collection->server_info[UID(1, 0)]->serverMetrics = mid_avail; - collection->server_info[UID(2, 0)]->serverMetrics = high_avail; - collection->server_info[UID(3, 0)]->serverMetrics = high_avail; + collection->server_info[UID(2, 0)]->serverMetrics = high_avail; + collection->server_info[UID(3, 0)]->serverMetrics = high_avail; collection->server_info[UID(4, 0)]->serverMetrics = high_avail; /* @@ -5041,13 +5041,13 @@ TEST_CASE("/DataDistribution/GetTeam/NewServersNotNeeded") { state GetTeamRequest req(wantsNewServers, wantsTrueBest, preferLowerUtilization, teamMustHaveShards); req.completeSources = completeSources; - + Future a = collection->getTeam(req); wait(a); std::pair>, bool> resTeam = req.reply.getFuture().get(); - std::set expectedServers{ UID(1, 0), UID(2, 0), UID(3, 0)}; + std::set expectedServers{ UID(1, 0), UID(2, 0), UID(3, 0) }; ASSERT(resTeam.first.present()); auto servers = resTeam.first.get()->getServerIDs(); const std::set selectedServers(servers.begin(), servers.end()); @@ -5062,7 +5062,7 @@ TEST_CASE("/DataDistribution/GetTeam/HealthyCompleteSource") { Reference policy = Reference(new PolicyAcross(3, "zoneid", Reference(new PolicyOne()))); state int processSize = 5; - state int teamSize = 3; + state int teamSize = 3; state std::unique_ptr collection = testTeamCollection(teamSize, policy, processSize); GetStorageMetricsReply mid_avail; @@ -5079,11 +5079,11 @@ TEST_CASE("/DataDistribution/GetTeam/HealthyCompleteSource") { collection->addTeam(std::set({ UID(2, 0), UID(3, 0), UID(4, 0) }), true); collection->doBuildTeams = false; collection->checkTeamDelay = Void(); - + collection->server_info[UID(1, 0)]->serverMetrics = mid_avail; - collection->server_info[UID(2, 0)]->serverMetrics = high_avail; - collection->server_info[UID(3, 0)]->serverMetrics = high_avail; - collection->server_info[UID(4, 0)]->serverMetrics = high_avail; + collection->server_info[UID(2, 0)]->serverMetrics = high_avail; + collection->server_info[UID(3, 0)]->serverMetrics = high_avail; + collection->server_info[UID(4, 0)]->serverMetrics = high_avail; collection->server_info[UID(1, 0)]->teams[0]->setHealthy(false); /* @@ -5105,13 +5105,13 @@ TEST_CASE("/DataDistribution/GetTeam/HealthyCompleteSource") { collection->traceAllInfo(); collection->traceServerInfo(); collection->traceServerTeamInfo(); - + Future a = collection->getTeam(req); wait(a); std::pair>, bool> resTeam = req.reply.getFuture().get(); - std::set expectedServers{ UID(2, 0), UID(3, 0), UID(4, 0)}; + std::set expectedServers{ UID(2, 0), UID(3, 0), UID(4, 0) }; ASSERT(resTeam.first.present()); auto servers = resTeam.first.get()->getServerIDs(); const std::set selectedServers(servers.begin(), servers.end()); @@ -5126,9 +5126,9 @@ TEST_CASE("/DataDistribution/GetTeam/TrueBestLeastUtilized") { Reference policy = Reference(new PolicyAcross(3, "zoneid", Reference(new PolicyOne()))); state int processSize = 5; - state int teamSize = 3; + state int teamSize = 3; state std::unique_ptr collection = testTeamCollection(teamSize, policy, processSize); - + GetStorageMetricsReply mid_avail; mid_avail.capacity.bytes = 1000 * 1024 * 1024; mid_avail.available.bytes = 400 * 1024 * 1024; @@ -5150,9 +5150,9 @@ TEST_CASE("/DataDistribution/GetTeam/TrueBestLeastUtilized") { */ collection->server_info[UID(1, 0)]->serverMetrics = mid_avail; - collection->server_info[UID(2, 0)]->serverMetrics = high_avail; - collection->server_info[UID(3, 0)]->serverMetrics = high_avail; - collection->server_info[UID(4, 0)]->serverMetrics = high_avail; + collection->server_info[UID(2, 0)]->serverMetrics = high_avail; + collection->server_info[UID(3, 0)]->serverMetrics = high_avail; + collection->server_info[UID(4, 0)]->serverMetrics = high_avail; bool wantsNewServers = true; bool wantsTrueBest = true; @@ -5162,13 +5162,13 @@ TEST_CASE("/DataDistribution/GetTeam/TrueBestLeastUtilized") { state GetTeamRequest req(wantsNewServers, wantsTrueBest, preferLowerUtilization, teamMustHaveShards); req.completeSources = completeSources; - + Future a = collection->getTeam(req); wait(a); std::pair>, bool> resTeam = req.reply.getFuture().get(); - std::set expectedServers{ UID(2, 0), UID(3, 0), UID(4, 0)}; + std::set expectedServers{ UID(2, 0), UID(3, 0), UID(4, 0) }; ASSERT(resTeam.first.present()); auto servers = resTeam.first.get()->getServerIDs(); const std::set selectedServers(servers.begin(), servers.end()); @@ -5183,9 +5183,9 @@ TEST_CASE("/DataDistribution/GetTeam/TrueBestMostUtilized") { Reference policy = Reference(new PolicyAcross(3, "zoneid", Reference(new PolicyOne()))); state int processSize = 5; - state int teamSize = 3; + state int teamSize = 3; state std::unique_ptr collection = testTeamCollection(teamSize, policy, processSize); - + GetStorageMetricsReply mid_avail; mid_avail.capacity.bytes = 1000 * 1024 * 1024; mid_avail.available.bytes = 400 * 1024 * 1024; @@ -5202,9 +5202,9 @@ TEST_CASE("/DataDistribution/GetTeam/TrueBestMostUtilized") { collection->checkTeamDelay = Void(); collection->server_info[UID(1, 0)]->serverMetrics = mid_avail; - collection->server_info[UID(2, 0)]->serverMetrics = high_avail; - collection->server_info[UID(3, 0)]->serverMetrics = high_avail; - collection->server_info[UID(4, 0)]->serverMetrics = high_avail; + collection->server_info[UID(2, 0)]->serverMetrics = high_avail; + collection->server_info[UID(3, 0)]->serverMetrics = high_avail; + collection->server_info[UID(4, 0)]->serverMetrics = high_avail; /* * Among server teams that have healthy space available, pick the team that is @@ -5219,13 +5219,13 @@ TEST_CASE("/DataDistribution/GetTeam/TrueBestMostUtilized") { state GetTeamRequest req(wantsNewServers, wantsTrueBest, preferLowerUtilization, teamMustHaveShards); req.completeSources = completeSources; - + Future a = collection->getTeam(req); wait(a); std::pair>, bool> resTeam = req.reply.getFuture().get(); - std::set expectedServers{ UID(1, 0), UID(2, 0), UID(3, 0)}; + std::set expectedServers{ UID(1, 0), UID(2, 0), UID(3, 0) }; ASSERT(resTeam.first.present()); auto servers = resTeam.first.get()->getServerIDs(); const std::set selectedServers(servers.begin(), servers.end()); @@ -5240,9 +5240,9 @@ TEST_CASE("/DataDistribution/GetTeam/ServerUtilizationBelowCutoff") { Reference policy = Reference(new PolicyAcross(3, "zoneid", Reference(new PolicyOne()))); state int processSize = 5; - state int teamSize = 3; + state int teamSize = 3; state std::unique_ptr collection = testTeamCollection(teamSize, policy, processSize); - + GetStorageMetricsReply low_avail; low_avail.capacity.bytes = SERVER_KNOBS->MIN_AVAILABLE_SPACE * 20; low_avail.available.bytes = SERVER_KNOBS->MIN_AVAILABLE_SPACE / 2; @@ -5259,8 +5259,8 @@ TEST_CASE("/DataDistribution/GetTeam/ServerUtilizationBelowCutoff") { collection->checkTeamDelay = Void(); collection->server_info[UID(1, 0)]->serverMetrics = high_avail; - collection->server_info[UID(2, 0)]->serverMetrics = low_avail; - collection->server_info[UID(3, 0)]->serverMetrics = high_avail; + collection->server_info[UID(2, 0)]->serverMetrics = low_avail; + collection->server_info[UID(3, 0)]->serverMetrics = high_avail; collection->server_info[UID(4, 0)]->serverMetrics = low_avail; collection->server_info[UID(1, 0)]->teams[0]->setHealthy(false); @@ -5278,7 +5278,7 @@ TEST_CASE("/DataDistribution/GetTeam/ServerUtilizationBelowCutoff") { state GetTeamRequest req(wantsNewServers, wantsTrueBest, preferLowerUtilization, teamMustHaveShards); req.completeSources = completeSources; - + Future a = collection->getTeam(req); wait(a); @@ -5295,9 +5295,9 @@ TEST_CASE("/DataDistribution/GetTeam/ServerUtilizationNearCutoff") { Reference policy = Reference(new PolicyAcross(3, "zoneid", Reference(new PolicyOne()))); state int processSize = 5; - state int teamSize = 3; + state int teamSize = 3; state std::unique_ptr collection = testTeamCollection(teamSize, policy, processSize); - + GetStorageMetricsReply low_avail; if (SERVER_KNOBS->MIN_AVAILABLE_SPACE_RATIO > 0) { /* Pick a capacity where MIN_AVAILABLE_SPACE_RATIO of the capacity would be higher than MIN_AVAILABLE_SPACE */ @@ -5320,8 +5320,8 @@ TEST_CASE("/DataDistribution/GetTeam/ServerUtilizationNearCutoff") { collection->checkTeamDelay = Void(); collection->server_info[UID(1, 0)]->serverMetrics = high_avail; - collection->server_info[UID(2, 0)]->serverMetrics = low_avail; - collection->server_info[UID(3, 0)]->serverMetrics = high_avail; + collection->server_info[UID(2, 0)]->serverMetrics = low_avail; + collection->server_info[UID(3, 0)]->serverMetrics = high_avail; collection->server_info[UID(4, 0)]->serverMetrics = low_avail; collection->server_info[UID(5, 0)]->serverMetrics = high_avail; collection->server_info[UID(1, 0)]->teams[0]->setHealthy(false); @@ -5340,13 +5340,13 @@ TEST_CASE("/DataDistribution/GetTeam/ServerUtilizationNearCutoff") { state GetTeamRequest req(wantsNewServers, wantsTrueBest, preferLowerUtilization, teamMustHaveShards); req.completeSources = completeSources; - + Future a = collection->getTeam(req); wait(a); std::pair>, bool> resTeam = req.reply.getFuture().get(); - std::set expectedServers{ UID(2, 0), UID(3, 0), UID(4, 0)}; + std::set expectedServers{ UID(2, 0), UID(3, 0), UID(4, 0) }; ASSERT(resTeam.first.present()); auto servers = resTeam.first.get()->getServerIDs(); const std::set selectedServers(servers.begin(), servers.end()); diff --git a/fdbserver/DataDistribution.actor.cpp b/fdbserver/DataDistribution.actor.cpp index 8bc7fe6991..2f59028920 100644 --- a/fdbserver/DataDistribution.actor.cpp +++ b/fdbserver/DataDistribution.actor.cpp @@ -1590,4 +1590,3 @@ ACTOR Future dataDistributor(DataDistributorInterface di, Reference Date: Thu, 10 Feb 2022 22:23:03 -0800 Subject: [PATCH 09/14] Removed some trace statements that were accidentally left behind in the code --- fdbserver/DDTeamCollection.actor.cpp | 4 ---- 1 file changed, 4 deletions(-) diff --git a/fdbserver/DDTeamCollection.actor.cpp b/fdbserver/DDTeamCollection.actor.cpp index 8216211a94..3940eb7c36 100644 --- a/fdbserver/DDTeamCollection.actor.cpp +++ b/fdbserver/DDTeamCollection.actor.cpp @@ -5102,10 +5102,6 @@ TEST_CASE("/DataDistribution/GetTeam/HealthyCompleteSource") { state GetTeamRequest req(wantsNewServers, wantsTrueBest, preferLowerUtilization, teamMustHaveShards); req.completeSources = completeSources; - collection->traceAllInfo(); - collection->traceServerInfo(); - collection->traceServerTeamInfo(); - Future a = collection->getTeam(req); wait(a); From c3bb248d61b272e190e33f292d079834cde2be37 Mon Sep 17 00:00:00 2001 From: "Bharadwaj V.R" Date: Fri, 11 Feb 2022 13:56:00 -0800 Subject: [PATCH 10/14] Adapt to members of DDCollection class that were accessed directly in test cases becoming private --- fdbserver/DDTeamCollection.actor.cpp | 24 ++++++++++++------------ fdbserver/DDTeamCollection.h | 6 ++++++ 2 files changed, 18 insertions(+), 12 deletions(-) diff --git a/fdbserver/DDTeamCollection.actor.cpp b/fdbserver/DDTeamCollection.actor.cpp index 1e6b5be3a0..5c56f58f82 100644 --- a/fdbserver/DDTeamCollection.actor.cpp +++ b/fdbserver/DDTeamCollection.actor.cpp @@ -5359,8 +5359,8 @@ TEST_CASE("/DataDistribution/GetTeam/NewServersNotNeeded") { collection->addTeam(std::set({ UID(1, 0), UID(2, 0), UID(3, 0) }), true); collection->addTeam(std::set({ UID(2, 0), UID(3, 0), UID(4, 0) }), true); - collection->doBuildTeams = false; - collection->checkTeamDelay = Void(); + collection->enableBuildingTeams(false); + collection->clearCheckTeamDelay(); collection->server_info[UID(1, 0)]->serverMetrics = mid_avail; collection->server_info[UID(2, 0)]->serverMetrics = high_avail; @@ -5418,8 +5418,8 @@ TEST_CASE("/DataDistribution/GetTeam/HealthyCompleteSource") { collection->addTeam(std::set({ UID(1, 0), UID(2, 0), UID(3, 0) }), true); collection->addTeam(std::set({ UID(2, 0), UID(3, 0), UID(4, 0) }), true); - collection->doBuildTeams = false; - collection->checkTeamDelay = Void(); + collection->enableBuildingTeams(false); + collection->clearCheckTeamDelay(); collection->server_info[UID(1, 0)]->serverMetrics = mid_avail; collection->server_info[UID(2, 0)]->serverMetrics = high_avail; @@ -5478,8 +5478,8 @@ TEST_CASE("/DataDistribution/GetTeam/TrueBestLeastUtilized") { collection->addTeam(std::set({ UID(1, 0), UID(2, 0), UID(3, 0) }), true); collection->addTeam(std::set({ UID(2, 0), UID(3, 0), UID(4, 0) }), true); - collection->doBuildTeams = false; - collection->checkTeamDelay = Void(); + collection->enableBuildingTeams(false); + collection->clearCheckTeamDelay(); /* * Among server teams that have healthy space available, pick the team that is @@ -5535,8 +5535,8 @@ TEST_CASE("/DataDistribution/GetTeam/TrueBestMostUtilized") { collection->addTeam(std::set({ UID(1, 0), UID(2, 0), UID(3, 0) }), true); collection->addTeam(std::set({ UID(2, 0), UID(3, 0), UID(4, 0) }), true); - collection->doBuildTeams = false; - collection->checkTeamDelay = Void(); + collection->enableBuildingTeams(false); + collection->clearCheckTeamDelay(); collection->server_info[UID(1, 0)]->serverMetrics = mid_avail; collection->server_info[UID(2, 0)]->serverMetrics = high_avail; @@ -5592,8 +5592,8 @@ TEST_CASE("/DataDistribution/GetTeam/ServerUtilizationBelowCutoff") { collection->addTeam(std::set({ UID(1, 0), UID(2, 0), UID(3, 0) }), true); collection->addTeam(std::set({ UID(2, 0), UID(3, 0), UID(4, 0) }), true); - collection->doBuildTeams = false; - collection->checkTeamDelay = Void(); + collection->enableBuildingTeams(false); + collection->clearCheckTeamDelay(); collection->server_info[UID(1, 0)]->serverMetrics = high_avail; collection->server_info[UID(2, 0)]->serverMetrics = low_avail; @@ -5653,8 +5653,8 @@ TEST_CASE("/DataDistribution/GetTeam/ServerUtilizationNearCutoff") { collection->addTeam(std::set({ UID(1, 0), UID(2, 0), UID(3, 0) }), true); collection->addTeam(std::set({ UID(2, 0), UID(3, 0), UID(4, 0) }), true); collection->addTeam(std::set({ UID(3, 0), UID(4, 0), UID(5, 0) }), true); - collection->doBuildTeams = false; - collection->checkTeamDelay = Void(); + collection->enableBuildingTeams(false); + collection->clearCheckTeamDelay(); collection->server_info[UID(1, 0)]->serverMetrics = high_avail; collection->server_info[UID(2, 0)]->serverMetrics = low_avail; diff --git a/fdbserver/DDTeamCollection.h b/fdbserver/DDTeamCollection.h index 5d237b2902..4eed754397 100644 --- a/fdbserver/DDTeamCollection.h +++ b/fdbserver/DDTeamCollection.h @@ -595,6 +595,12 @@ public: void addTeam(std::set const& team, bool isInitialTeam) { addTeam(team.begin(), team.end(), isInitialTeam); } + // FIXME: Public for testing only + void enableBuildingTeams(bool doBuildTeams) { this->doBuildTeams = doBuildTeams; } + + // FIXME: Public for testing only + void clearCheckTeamDelay() { this->checkTeamDelay = Void(); } + // FIXME: Public for testing only // Group storage servers (process) based on their machineId in LocalityData // All created machines are healthy From a14377641c1160129886a34eb5bd4643ac084fa0 Mon Sep 17 00:00:00 2001 From: "Bharadwaj V.R" Date: Sun, 13 Feb 2022 23:53:12 -0800 Subject: [PATCH 11/14] Renamed a couple of test-only collection methods and make the test case waits more succinct --- fdbserver/DDTeamCollection.actor.cpp | 42 ++++++++++++---------------- fdbserver/DDTeamCollection.h | 4 +-- 2 files changed, 20 insertions(+), 26 deletions(-) diff --git a/fdbserver/DDTeamCollection.actor.cpp b/fdbserver/DDTeamCollection.actor.cpp index 5c56f58f82..ae456376b3 100644 --- a/fdbserver/DDTeamCollection.actor.cpp +++ b/fdbserver/DDTeamCollection.actor.cpp @@ -5359,8 +5359,8 @@ TEST_CASE("/DataDistribution/GetTeam/NewServersNotNeeded") { collection->addTeam(std::set({ UID(1, 0), UID(2, 0), UID(3, 0) }), true); collection->addTeam(std::set({ UID(2, 0), UID(3, 0), UID(4, 0) }), true); - collection->enableBuildingTeams(false); - collection->clearCheckTeamDelay(); + collection->disableBuildingTeams(); + collection->setCheckTeamDelay(); collection->server_info[UID(1, 0)]->serverMetrics = mid_avail; collection->server_info[UID(2, 0)]->serverMetrics = high_avail; @@ -5383,8 +5383,7 @@ TEST_CASE("/DataDistribution/GetTeam/NewServersNotNeeded") { state GetTeamRequest req(wantsNewServers, wantsTrueBest, preferLowerUtilization, teamMustHaveShards); req.completeSources = completeSources; - Future a = collection->getTeam(req); - wait(a); + wait(collection->getTeam(req)); std::pair>, bool> resTeam = req.reply.getFuture().get(); @@ -5418,8 +5417,8 @@ TEST_CASE("/DataDistribution/GetTeam/HealthyCompleteSource") { collection->addTeam(std::set({ UID(1, 0), UID(2, 0), UID(3, 0) }), true); collection->addTeam(std::set({ UID(2, 0), UID(3, 0), UID(4, 0) }), true); - collection->enableBuildingTeams(false); - collection->clearCheckTeamDelay(); + collection->disableBuildingTeams(); + collection->setCheckTeamDelay(); collection->server_info[UID(1, 0)]->serverMetrics = mid_avail; collection->server_info[UID(2, 0)]->serverMetrics = high_avail; @@ -5443,8 +5442,7 @@ TEST_CASE("/DataDistribution/GetTeam/HealthyCompleteSource") { state GetTeamRequest req(wantsNewServers, wantsTrueBest, preferLowerUtilization, teamMustHaveShards); req.completeSources = completeSources; - Future a = collection->getTeam(req); - wait(a); + wait(collection->getTeam(req)); std::pair>, bool> resTeam = req.reply.getFuture().get(); @@ -5478,8 +5476,8 @@ TEST_CASE("/DataDistribution/GetTeam/TrueBestLeastUtilized") { collection->addTeam(std::set({ UID(1, 0), UID(2, 0), UID(3, 0) }), true); collection->addTeam(std::set({ UID(2, 0), UID(3, 0), UID(4, 0) }), true); - collection->enableBuildingTeams(false); - collection->clearCheckTeamDelay(); + collection->disableBuildingTeams(); + collection->setCheckTeamDelay(); /* * Among server teams that have healthy space available, pick the team that is @@ -5500,8 +5498,7 @@ TEST_CASE("/DataDistribution/GetTeam/TrueBestLeastUtilized") { state GetTeamRequest req(wantsNewServers, wantsTrueBest, preferLowerUtilization, teamMustHaveShards); req.completeSources = completeSources; - Future a = collection->getTeam(req); - wait(a); + wait(collection->getTeam(req)); std::pair>, bool> resTeam = req.reply.getFuture().get(); @@ -5535,8 +5532,8 @@ TEST_CASE("/DataDistribution/GetTeam/TrueBestMostUtilized") { collection->addTeam(std::set({ UID(1, 0), UID(2, 0), UID(3, 0) }), true); collection->addTeam(std::set({ UID(2, 0), UID(3, 0), UID(4, 0) }), true); - collection->enableBuildingTeams(false); - collection->clearCheckTeamDelay(); + collection->disableBuildingTeams(); + collection->setCheckTeamDelay(); collection->server_info[UID(1, 0)]->serverMetrics = mid_avail; collection->server_info[UID(2, 0)]->serverMetrics = high_avail; @@ -5557,8 +5554,7 @@ TEST_CASE("/DataDistribution/GetTeam/TrueBestMostUtilized") { state GetTeamRequest req(wantsNewServers, wantsTrueBest, preferLowerUtilization, teamMustHaveShards); req.completeSources = completeSources; - Future a = collection->getTeam(req); - wait(a); + wait(collection->getTeam(req)); std::pair>, bool> resTeam = req.reply.getFuture().get(); @@ -5592,8 +5588,8 @@ TEST_CASE("/DataDistribution/GetTeam/ServerUtilizationBelowCutoff") { collection->addTeam(std::set({ UID(1, 0), UID(2, 0), UID(3, 0) }), true); collection->addTeam(std::set({ UID(2, 0), UID(3, 0), UID(4, 0) }), true); - collection->enableBuildingTeams(false); - collection->clearCheckTeamDelay(); + collection->disableBuildingTeams(); + collection->setCheckTeamDelay(); collection->server_info[UID(1, 0)]->serverMetrics = high_avail; collection->server_info[UID(2, 0)]->serverMetrics = low_avail; @@ -5616,8 +5612,7 @@ TEST_CASE("/DataDistribution/GetTeam/ServerUtilizationBelowCutoff") { state GetTeamRequest req(wantsNewServers, wantsTrueBest, preferLowerUtilization, teamMustHaveShards); req.completeSources = completeSources; - Future a = collection->getTeam(req); - wait(a); + wait(collection->getTeam(req)); std::pair>, bool> resTeam = req.reply.getFuture().get(); @@ -5653,8 +5648,8 @@ TEST_CASE("/DataDistribution/GetTeam/ServerUtilizationNearCutoff") { collection->addTeam(std::set({ UID(1, 0), UID(2, 0), UID(3, 0) }), true); collection->addTeam(std::set({ UID(2, 0), UID(3, 0), UID(4, 0) }), true); collection->addTeam(std::set({ UID(3, 0), UID(4, 0), UID(5, 0) }), true); - collection->enableBuildingTeams(false); - collection->clearCheckTeamDelay(); + collection->disableBuildingTeams(); + collection->setCheckTeamDelay(); collection->server_info[UID(1, 0)]->serverMetrics = high_avail; collection->server_info[UID(2, 0)]->serverMetrics = low_avail; @@ -5678,8 +5673,7 @@ TEST_CASE("/DataDistribution/GetTeam/ServerUtilizationNearCutoff") { state GetTeamRequest req(wantsNewServers, wantsTrueBest, preferLowerUtilization, teamMustHaveShards); req.completeSources = completeSources; - Future a = collection->getTeam(req); - wait(a); + wait(collection->getTeam(req)); std::pair>, bool> resTeam = req.reply.getFuture().get(); diff --git a/fdbserver/DDTeamCollection.h b/fdbserver/DDTeamCollection.h index 4eed754397..ed4d27d248 100644 --- a/fdbserver/DDTeamCollection.h +++ b/fdbserver/DDTeamCollection.h @@ -596,10 +596,10 @@ public: void addTeam(std::set const& team, bool isInitialTeam) { addTeam(team.begin(), team.end(), isInitialTeam); } // FIXME: Public for testing only - void enableBuildingTeams(bool doBuildTeams) { this->doBuildTeams = doBuildTeams; } + void disableBuildingTeams() { doBuildTeams = false; } // FIXME: Public for testing only - void clearCheckTeamDelay() { this->checkTeamDelay = Void(); } + void setCheckTeamDelay() { this->checkTeamDelay = Void(); } // FIXME: Public for testing only // Group storage servers (process) based on their machineId in LocalityData From eaef2a373e930e72662f22efe6e7a4c26a00deaf Mon Sep 17 00:00:00 2001 From: "Bharadwaj V.R" Date: Mon, 14 Feb 2022 10:11:00 -0800 Subject: [PATCH 12/14] Remove superfluous wait statements in DD unit tests. They wait on the getTeam future --- fdbserver/DDTeamCollection.actor.cpp | 6 ------ 1 file changed, 6 deletions(-) diff --git a/fdbserver/DDTeamCollection.actor.cpp b/fdbserver/DDTeamCollection.actor.cpp index ae456376b3..c6dc10b0f1 100644 --- a/fdbserver/DDTeamCollection.actor.cpp +++ b/fdbserver/DDTeamCollection.actor.cpp @@ -5339,7 +5339,6 @@ TEST_CASE("/DataDistribution/AddTeamsBestOf/NotEnoughServers") { } TEST_CASE("/DataDistribution/GetTeam/NewServersNotNeeded") { - wait(Future(Void())); Reference policy = Reference(new PolicyAcross(3, "zoneid", Reference(new PolicyOne()))); @@ -5397,7 +5396,6 @@ TEST_CASE("/DataDistribution/GetTeam/NewServersNotNeeded") { } TEST_CASE("/DataDistribution/GetTeam/HealthyCompleteSource") { - wait(Future(Void())); Reference policy = Reference(new PolicyAcross(3, "zoneid", Reference(new PolicyOne()))); @@ -5456,7 +5454,6 @@ TEST_CASE("/DataDistribution/GetTeam/HealthyCompleteSource") { } TEST_CASE("/DataDistribution/GetTeam/TrueBestLeastUtilized") { - wait(Future(Void())); Reference policy = Reference(new PolicyAcross(3, "zoneid", Reference(new PolicyOne()))); @@ -5512,7 +5509,6 @@ TEST_CASE("/DataDistribution/GetTeam/TrueBestLeastUtilized") { } TEST_CASE("/DataDistribution/GetTeam/TrueBestMostUtilized") { - wait(Future(Void())); Reference policy = Reference(new PolicyAcross(3, "zoneid", Reference(new PolicyOne()))); @@ -5568,7 +5564,6 @@ TEST_CASE("/DataDistribution/GetTeam/TrueBestMostUtilized") { } TEST_CASE("/DataDistribution/GetTeam/ServerUtilizationBelowCutoff") { - wait(Future(Void())); Reference policy = Reference(new PolicyAcross(3, "zoneid", Reference(new PolicyOne()))); @@ -5622,7 +5617,6 @@ TEST_CASE("/DataDistribution/GetTeam/ServerUtilizationBelowCutoff") { } TEST_CASE("/DataDistribution/GetTeam/ServerUtilizationNearCutoff") { - wait(Future(Void())); Reference policy = Reference(new PolicyAcross(3, "zoneid", Reference(new PolicyOne()))); From 3420654813c70c2f458319097882a3ae94930735 Mon Sep 17 00:00:00 2001 From: "Bharadwaj V.R" Date: Mon, 14 Feb 2022 10:30:20 -0800 Subject: [PATCH 13/14] Remove DDUnitTests.toml; Between AllSimUnitTests and SpecificUnitTests, there are enough ways available to exercise the DD unit tests --- tests/CMakeLists.txt | 1 - tests/rare/DDUnitTests.toml | 9 --------- 2 files changed, 10 deletions(-) delete mode 100644 tests/rare/DDUnitTests.toml diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 2c5ac26839..df6cf64466 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -189,7 +189,6 @@ if(WITH_PYTHON) add_fdb_test(TEST_FILES rare/ConflictRangeRYOWCheck.toml) add_fdb_test(TEST_FILES rare/CycleRollbackClogged.toml) add_fdb_test(TEST_FILES rare/CycleWithKills.toml) - add_fdb_test(TEST_FILES rare/DDUnitTests.toml UNIT) add_fdb_test(TEST_FILES rare/FuzzTest.toml) add_fdb_test(TEST_FILES rare/HighContentionPrefixAllocator.toml) add_fdb_test(TEST_FILES rare/InventoryTestHeavyWrites.toml) diff --git a/tests/rare/DDUnitTests.toml b/tests/rare/DDUnitTests.toml deleted file mode 100644 index a4efb5ee06..0000000000 --- a/tests/rare/DDUnitTests.toml +++ /dev/null @@ -1,9 +0,0 @@ -[[test]] -testTitle = 'UnitTests' -useDB = false -startDelay = 0 - - [[test.workload]] - testName = 'UnitTests' - #maxTestCases = 1 - testsMatching = '/DataDistribution/' From 092b5cee4b4438218189c6ee14e3f22bf3b4144b Mon Sep 17 00:00:00 2001 From: Vaidas Gasiunas Date: Tue, 8 Feb 2022 09:14:12 +0100 Subject: [PATCH 14/14] MVC2.0: Rollback added code --- fdbclient/CMakeLists.txt | 2 - fdbclient/ClientKnobs.cpp | 4 - fdbclient/ClientKnobs.h | 4 - fdbclient/ClientLibManagement.actor.cpp | 801 ------------------ fdbclient/ClientLibManagement.actor.h | 146 ---- fdbclient/CommitProxyInterface.h | 5 +- fdbclient/DatabaseContext.h | 2 - fdbclient/NativeAPI.actor.cpp | 14 +- fdbclient/Schemas.cpp | 16 - fdbclient/Schemas.h | 1 - fdbclient/SystemData.cpp | 10 - fdbclient/SystemData.h | 10 - fdbserver/CMakeLists.txt | 1 - fdbserver/ClusterController.actor.cpp | 43 - .../ClientLibManagementWorkload.actor.cpp | 464 ---------- 15 files changed, 3 insertions(+), 1520 deletions(-) delete mode 100644 fdbclient/ClientLibManagement.actor.cpp delete mode 100644 fdbclient/ClientLibManagement.actor.h delete mode 100644 fdbserver/workloads/ClientLibManagementWorkload.actor.cpp diff --git a/fdbclient/CMakeLists.txt b/fdbclient/CMakeLists.txt index e442c0d86f..8a0350e3a2 100644 --- a/fdbclient/CMakeLists.txt +++ b/fdbclient/CMakeLists.txt @@ -86,8 +86,6 @@ set(FDBCLIENT_SRCS MonitorLeader.actor.cpp MonitorLeader.h MultiVersionAssignmentVars.h - ClientLibManagement.actor.cpp - ClientLibManagement.actor.h MultiVersionTransaction.actor.cpp MultiVersionTransaction.h MutationList.h diff --git a/fdbclient/ClientKnobs.cpp b/fdbclient/ClientKnobs.cpp index 51d7e4ae82..6b896cca85 100644 --- a/fdbclient/ClientKnobs.cpp +++ b/fdbclient/ClientKnobs.cpp @@ -263,10 +263,6 @@ void ClientKnobs::initialize(Randomize randomize) { init( BUSYNESS_SPIKE_START_THRESHOLD, 0.100 ); init( BUSYNESS_SPIKE_SATURATED_THRESHOLD, 0.500 ); - // multi-version client control - init( MVC_CLIENTLIB_CHUNK_SIZE, 8*1024 ); - init( MVC_CLIENTLIB_CHUNKS_PER_TRANSACTION, 32 ); - // blob granules init( ENABLE_BLOB_GRANULES, false ); diff --git a/fdbclient/ClientKnobs.h b/fdbclient/ClientKnobs.h index 6755a07896..43dec60277 100644 --- a/fdbclient/ClientKnobs.h +++ b/fdbclient/ClientKnobs.h @@ -254,10 +254,6 @@ public: double BUSYNESS_SPIKE_START_THRESHOLD; double BUSYNESS_SPIKE_SATURATED_THRESHOLD; - // multi-version client control - int MVC_CLIENTLIB_CHUNK_SIZE; - int MVC_CLIENTLIB_CHUNKS_PER_TRANSACTION; - // blob granules bool ENABLE_BLOB_GRANULES; diff --git a/fdbclient/ClientLibManagement.actor.cpp b/fdbclient/ClientLibManagement.actor.cpp deleted file mode 100644 index 4fae229da6..0000000000 --- a/fdbclient/ClientLibManagement.actor.cpp +++ /dev/null @@ -1,801 +0,0 @@ -/* - * ClientLibManagement.actor.cpp - * - * This source file is part of the FoundationDB open source project - * - * Copyright 2013-2021 Apple Inc. and the FoundationDB project authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include "fdbclient/ClientLibManagement.actor.h" -#include "fdbclient/Schemas.h" -#include "fdbclient/NativeAPI.actor.h" -#include "fdbclient/ManagementAPI.actor.h" -#include "fdbclient/ClientKnobs.h" -#include "fdbclient/SystemData.h" -#include "fdbclient/versions.h" -#include "fdbrpc/IAsyncFile.h" -#include "flow/Platform.h" - -#include -#include -#include - -#include "flow/Trace.h" -#include "flow/actorcompiler.h" // This must be the last #include. - -namespace ClientLibManagement { - -struct ClientLibBinaryInfo { - size_t totalBytes = 0; - size_t chunkCnt = 0; - size_t chunkSize = 0; - Standalone sumBytes; -}; - -#define ASSERT_INDEX_IN_RANGE(idx, arr) ASSERT(idx >= 0 && idx < sizeof(arr) / sizeof(arr[0])) - -const std::string& getStatusName(ClientLibStatus status) { - static const std::string statusNames[] = { "disabled", "uploading", "download", "active" }; - int idx = static_cast(status); - ASSERT_INDEX_IN_RANGE(idx, statusNames); - return statusNames[idx]; -} - -ClientLibStatus getStatusByName(std::string_view statusName) { - static std::map statusByName; - // initialize the map on demand - if (statusByName.empty()) { - for (int i = 0; i < static_cast(ClientLibStatus::COUNT); i++) { - ClientLibStatus status = static_cast(i); - statusByName[getStatusName(status)] = status; - } - } - auto statusIter = statusByName.find(statusName); - if (statusIter == statusByName.cend()) { - TraceEvent(SevWarnAlways, "ClientLibraryInvalidMetadata") - .detail("Error", format("Unknown status value %s", std::string(statusName).c_str())); - throw client_lib_invalid_metadata(); - } - return statusIter->second; -} - -const std::string& getPlatformName(ClientLibPlatform platform) { - static const std::string platformNames[] = { "unknown", "x84_64-linux", "x86_64-windows", "x86_64-macos" }; - int idx = static_cast(platform); - ASSERT_INDEX_IN_RANGE(idx, platformNames); - return platformNames[idx]; -} - -ClientLibPlatform getPlatformByName(std::string_view platformName) { - static std::map platformByName; - // initialize the map on demand - if (platformByName.empty()) { - for (int i = 0; i < static_cast(ClientLibPlatform::COUNT); i++) { - ClientLibPlatform platform = static_cast(i); - platformByName[getPlatformName(platform)] = platform; - } - } - auto platfIter = platformByName.find(platformName); - if (platfIter == platformByName.cend()) { - TraceEvent(SevWarnAlways, "ClientLibraryInvalidMetadata") - .detail("Error", format("Unknown platform value %s", std::string(platformName).c_str())); - throw client_lib_invalid_metadata(); - } - return platfIter->second; -} - -const std::string& getChecksumAlgName(ClientLibChecksumAlg checksumAlg) { - static const std::string checksumAlgNames[] = { "md5" }; - int idx = static_cast(checksumAlg); - ASSERT_INDEX_IN_RANGE(idx, checksumAlgNames); - return checksumAlgNames[idx]; -} - -ClientLibChecksumAlg getChecksumAlgByName(std::string_view checksumAlgName) { - static std::map checksumAlgByName; - // initialize the map on demand - if (checksumAlgByName.empty()) { - for (int i = 0; i < (int)ClientLibChecksumAlg::COUNT; i++) { - ClientLibChecksumAlg checksumAlg = static_cast(i); - checksumAlgByName[getChecksumAlgName(checksumAlg)] = checksumAlg; - } - } - auto iter = checksumAlgByName.find(checksumAlgName); - if (iter == checksumAlgByName.cend()) { - TraceEvent(SevWarnAlways, "ClientLibraryInvalidMetadata") - .detail("Error", format("Unknown checksum algorithm %s", std::string(checksumAlgName).c_str())); - throw client_lib_invalid_metadata(); - } - return iter->second; -} - -namespace { - -bool isValidTargetStatus(ClientLibStatus status) { - return status == ClientLibStatus::DISABLED || status == ClientLibStatus::DOWNLOAD || - status == ClientLibStatus::ACTIVE; -} - -bool isAvailableForDownload(ClientLibStatus status) { - return status == ClientLibStatus::DOWNLOAD || status == ClientLibStatus::ACTIVE; -} - -void updateClientLibChangeCounter(Transaction& tr, ClientLibStatus prevStatus, ClientLibStatus newStatus) { - static const int64_t counterIncVal = 1; - if ((prevStatus != newStatus) && - (newStatus == ClientLibStatus::DOWNLOAD || newStatus == ClientLibStatus::ACTIVE || - prevStatus == ClientLibStatus::DOWNLOAD || prevStatus == ClientLibStatus::ACTIVE)) { - tr.atomicOp(clientLibChangeCounterKey, - StringRef(reinterpret_cast(&counterIncVal), sizeof(counterIncVal)), - MutationRef::AddValue); - } -} - -json_spirit::mObject parseMetadataJson(StringRef metadataString) { - json_spirit::mValue parsedMetadata; - if (!json_spirit::read_string(metadataString.toString(), parsedMetadata) || - parsedMetadata.type() != json_spirit::obj_type) { - TraceEvent(SevWarnAlways, "ClientLibraryInvalidMetadata") - .detail("Reason", "InvalidJSON") - .detail("Configuration", metadataString); - throw client_lib_invalid_metadata(); - } - - return parsedMetadata.get_obj(); -} - -const std::string& getMetadataStrAttr(const json_spirit::mObject& metadataJson, const std::string& attrName) { - auto attrIter = metadataJson.find(attrName); - if (attrIter == metadataJson.cend() || attrIter->second.type() != json_spirit::str_type) { - TraceEvent(SevWarnAlways, "ClientLibraryInvalidMetadata") - .detail("Error", format("Missing attribute %s", attrName.c_str())); - throw client_lib_invalid_metadata(); - } - return attrIter->second.get_str(); -} - -int getMetadataIntAttr(const json_spirit::mObject& metadataJson, const std::string& attrName) { - auto attrIter = metadataJson.find(attrName); - if (attrIter == metadataJson.cend() || attrIter->second.type() != json_spirit::int_type) { - TraceEvent(SevWarnAlways, "ClientLibraryInvalidMetadata") - .detail("Error", format("Missing attribute %s", attrName.c_str())); - throw client_lib_invalid_metadata(); - } - return attrIter->second.get_int(); -} - -bool validVersionPartNum(int num) { - return (num >= 0 && num < 1000); -} - -int getNumericVersionEncoding(const std::string& versionStr) { - int major, minor, patch; - int charsScanned; - int numScanned = sscanf(versionStr.c_str(), "%d.%d.%d%n", &major, &minor, &patch, &charsScanned); - if (numScanned != 3 || !validVersionPartNum(major) || !validVersionPartNum(minor) || !validVersionPartNum(patch) || - charsScanned != versionStr.size()) { - TraceEvent(SevWarnAlways, "ClientLibraryInvalidMetadata") - .detail("Error", format("Invalid version string %s", versionStr.c_str())); - throw client_lib_invalid_metadata(); - } - return ((major * 1000) + minor) * 1000 + patch; -} - -Standalone getIdFromMetadataJson(const json_spirit::mObject& metadataJson) { - std::ostringstream libIdBuilder; - libIdBuilder << getMetadataStrAttr(metadataJson, CLIENTLIB_ATTR_PLATFORM) << "/"; - libIdBuilder << format("%09d", getNumericVersionEncoding(getMetadataStrAttr(metadataJson, CLIENTLIB_ATTR_VERSION))) - << "/"; - libIdBuilder << getMetadataStrAttr(metadataJson, CLIENTLIB_ATTR_TYPE) << "/"; - libIdBuilder << getMetadataStrAttr(metadataJson, CLIENTLIB_ATTR_CHECKSUM); - return Standalone(libIdBuilder.str()); -} - -Key metadataKeyFromId(StringRef clientLibId) { - return clientLibId.withPrefix(clientLibMetadataPrefix); -} - -Key chunkKeyPrefixFromId(StringRef clientLibId) { - return clientLibId.withPrefix(clientLibBinaryPrefix).withSuffix(LiteralStringRef("/")); -} - -KeyRef chunkKeyFromNo(StringRef clientLibBinPrefix, size_t chunkNo, Arena& arena) { - return clientLibBinPrefix.withSuffix(format("%06zu", chunkNo), arena); -} - -[[maybe_unused]] ClientLibPlatform getCurrentClientPlatform() { -#ifdef __x86_64__ -#if defined(_WIN32) - return ClientLibPlatform::X86_64_WINDOWS; -#elif defined(__linux__) - return ClientLibPlatform::X86_64_LINUX; -#elif defined(__FreeBSD__) || defined(__APPLE__) - return ClientLibPlatform::X86_64_MACOS; -#else - return ClientLibPlatform::UNKNOWN; -#endif -#else // not __x86_64__ - return ClientLibPlatform::UNKNOWN; -#endif -} - -Standalone byteArrayToHexString(StringRef input) { - static const char* digits = "0123456789abcdef"; - Standalone output = makeString(input.size() * 2); - char* pout = reinterpret_cast(mutateString(output)); - for (const uint8_t* pin = input.begin(); pin != input.end(); ++pin) { - *pout++ = digits[(*pin >> 4) & 0xF]; - *pout++ = digits[(*pin) & 0xF]; - } - return output; -} - -} // namespace - -Standalone md5SumToHexString(MD5_CTX& sum) { - Standalone sumBytes = makeString(16); - ::MD5_Final(mutateString(sumBytes), &sum); - return byteArrayToHexString(sumBytes); -} - -ClientLibFilter& ClientLibFilter::filterNewerPackageVersion(const std::string& versionStr) { - matchNewerPackageVersion = true; - this->numericPkgVersion = getNumericVersionEncoding(versionStr); - return *this; -} - -Standalone getClientLibIdFromMetadataJson(StringRef metadataString) { - json_spirit::mObject parsedMetadata = parseMetadataJson(metadataString); - return getIdFromMetadataJson(parsedMetadata); -} - -namespace { - -ACTOR Future uploadClientLibBinary(Database db, - StringRef libFilePath, - KeyRef chunkKeyPrefix, - ClientLibBinaryInfo* binInfo) { - - state int chunkSize = getAlignedUpperBound(CLIENT_KNOBS->MVC_CLIENTLIB_CHUNK_SIZE, 1024); - state int transactionSize = std::max(CLIENT_KNOBS->MVC_CLIENTLIB_CHUNKS_PER_TRANSACTION, 1) * chunkSize; - state size_t fileOffset = 0; - state size_t chunkNo = 0; - state MD5_CTX sum; - state Arena arena; - state StringRef buf; - state Transaction tr; - state size_t firstChunkNo; - - // Disabling AIO, because it currently supports only page-aligned writes, but the size of a client library - // is not necessariliy page-aligned, need to investigate if it is a limitation of AIO or just the way - // we are wrapping it - state Reference fClientLib = wait(IAsyncFileSystem::filesystem()->open( - libFilePath.toString(), IAsyncFile::OPEN_READONLY | IAsyncFile::OPEN_UNCACHED | IAsyncFile::OPEN_NO_AIO, 0)); - - ::MD5_Init(&sum); - - loop { - arena = Arena(); - // Use page-aligned buffers for enabling possible future use with AIO - buf = makeAlignedString(_PAGE_SIZE, transactionSize, arena); - state int bytesRead = wait(fClientLib->read(mutateString(buf), transactionSize, fileOffset)); - fileOffset += bytesRead; - if (bytesRead <= 0) { - break; - } - - ::MD5_Update(&sum, buf.begin(), bytesRead); - - tr = Transaction(db); - firstChunkNo = chunkNo; - loop { - try { - tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); - tr.setOption(FDBTransactionOptions::LOCK_AWARE); - int bufferOffset = 0; - chunkNo = firstChunkNo; - while (bufferOffset < bytesRead) { - size_t chunkLen = std::min(chunkSize, bytesRead - bufferOffset); - KeyRef chunkKey = chunkKeyFromNo(chunkKeyPrefix, chunkNo, arena); - chunkNo++; - tr.set(chunkKey, ValueRef(mutateString(buf) + bufferOffset, chunkLen)); - bufferOffset += chunkLen; - } - wait(tr.commit()); - break; - } catch (Error& e) { - wait(tr.onError(e)); - } - } - - if (bytesRead < transactionSize) { - break; - } - } - binInfo->totalBytes = fileOffset; - binInfo->chunkCnt = chunkNo; - binInfo->chunkSize = chunkSize; - binInfo->sumBytes = md5SumToHexString(sum); - return Void(); -} - -} // namespace - -ACTOR Future uploadClientLibrary(Database db, - Standalone metadataString, - Standalone libFilePath) { - state json_spirit::mObject metadataJson; - state Standalone clientLibId; - state Key clientLibMetaKey; - state Key clientLibBinPrefix; - state std::string jsStr; - state Transaction tr; - state ClientLibBinaryInfo binInfo; - state ClientLibStatus targetStatus; - - metadataJson = parseMetadataJson(metadataString); - - json_spirit::mValue schema; - if (!json_spirit::read_string(JSONSchemas::clientLibMetadataSchema.toString(), schema)) { - ASSERT(false); - } - - std::string errorStr; - if (!schemaMatch(schema.get_obj(), metadataJson, errorStr, SevWarnAlways)) { - TraceEvent(SevWarnAlways, "ClientLibraryInvalidMetadata") - .detail("Reason", "SchemaMismatch") - .detail("Configuration", metadataString) - .detail("Error", errorStr); - throw client_lib_invalid_metadata(); - } - - clientLibId = getIdFromMetadataJson(metadataJson); - clientLibMetaKey = metadataKeyFromId(clientLibId); - clientLibBinPrefix = chunkKeyPrefixFromId(clientLibId); - - targetStatus = getStatusByName(getMetadataStrAttr(metadataJson, CLIENTLIB_ATTR_STATUS)); - if (!isValidTargetStatus(targetStatus)) { - TraceEvent(SevWarnAlways, "ClientLibraryInvalidMetadata") - .detail("Reason", "InvalidTargetStatus") - .detail("Configuration", metadataString); - throw client_lib_invalid_metadata(); - } - - // check if checksumalg and platform attributes have valid values - getChecksumAlgByName(getMetadataStrAttr(metadataJson, CLIENTLIB_ATTR_CHECKSUM_ALG)); - getPlatformByName(getMetadataStrAttr(metadataJson, CLIENTLIB_ATTR_PLATFORM)); - - // Check if further mandatory attributes are set - getMetadataStrAttr(metadataJson, CLIENTLIB_ATTR_GIT_HASH); - getMetadataStrAttr(metadataJson, CLIENTLIB_ATTR_PROTOCOL); - getMetadataIntAttr(metadataJson, CLIENTLIB_ATTR_API_VERSION); - - metadataJson[CLIENTLIB_ATTR_STATUS] = getStatusName(ClientLibStatus::UPLOADING); - jsStr = json_spirit::write_string(json_spirit::mValue(metadataJson)); - - /* - * Check if the client library with the same identifier already exists. - * If not, write its metadata with "uploading" state to prevent concurrent uploads - */ - tr = Transaction(db); - loop { - try { - tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); - tr.setOption(FDBTransactionOptions::LOCK_AWARE); - Optional existingMeta = wait(tr.get(clientLibMetaKey)); - if (existingMeta.present()) { - TraceEvent(SevWarnAlways, "ClientLibraryAlreadyExists") - .detail("Key", clientLibMetaKey) - .detail("ExistingMetadata", existingMeta.get().toString()); - throw client_lib_already_exists(); - } - - TraceEvent("ClientLibraryBeginUpload").detail("Key", clientLibMetaKey); - - tr.set(clientLibMetaKey, ValueRef(jsStr)); - wait(tr.commit()); - break; - } catch (Error& e) { - wait(tr.onError(e)); - } - } - - /* - * Upload the binary of the client library in chunks - */ - wait(uploadClientLibBinary(db, libFilePath, clientLibBinPrefix, &binInfo)); - - std::string checkSum = getMetadataStrAttr(metadataJson, CLIENTLIB_ATTR_CHECKSUM); - if (binInfo.sumBytes != StringRef(checkSum)) { - TraceEvent(SevWarnAlways, "ClientLibraryChecksumMismatch") - .detail("Expected", checkSum) - .detail("Actual", binInfo.sumBytes) - .detail("Configuration", metadataString); - // Rollback the upload operation - try { - wait(deleteClientLibrary(db, clientLibId)); - } catch (Error& e) { - TraceEvent(SevError, "ClientLibraryUploadRollbackFailed").error(e); - } - throw client_lib_invalid_binary(); - } - - /* - * Update the metadata entry, with additional information about the binary - * and change its state from "uploading" to the given one - */ - metadataJson[CLIENTLIB_ATTR_SIZE] = static_cast(binInfo.totalBytes); - metadataJson[CLIENTLIB_ATTR_CHUNK_COUNT] = static_cast(binInfo.chunkCnt); - metadataJson[CLIENTLIB_ATTR_CHUNK_SIZE] = static_cast(binInfo.chunkSize); - metadataJson[CLIENTLIB_ATTR_FILENAME] = basename(libFilePath.toString()); - metadataJson[CLIENTLIB_ATTR_STATUS] = getStatusName(targetStatus); - jsStr = json_spirit::write_string(json_spirit::mValue(metadataJson)); - - tr.reset(); - loop { - try { - tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); - tr.setOption(FDBTransactionOptions::LOCK_AWARE); - tr.set(clientLibMetaKey, ValueRef(jsStr)); - updateClientLibChangeCounter(tr, ClientLibStatus::DISABLED, targetStatus); - wait(tr.commit()); - break; - } catch (Error& e) { - wait(tr.onError(e)); - } - } - - TraceEvent("ClientLibraryUploadDone").detail("Key", clientLibMetaKey); - return Void(); -} - -ACTOR Future downloadClientLibrary(Database db, - Standalone clientLibId, - Standalone libFilePath) { - state Key clientLibMetaKey = metadataKeyFromId(clientLibId); - state Key chunkKeyPrefix = chunkKeyPrefixFromId(clientLibId); - state int chunksPerTransaction = std::max(CLIENT_KNOBS->MVC_CLIENTLIB_CHUNKS_PER_TRANSACTION, 1); - state int transactionSize; - state json_spirit::mObject metadataJson; - state std::string checkSum; - state size_t chunkCount; - state size_t binarySize; - state size_t expectedChunkSize; - state Transaction tr; - state size_t fileOffset; - state MD5_CTX sum; - state Arena arena; - state StringRef buf; - state size_t bufferOffset; - state size_t fromChunkNo; - state size_t toChunkNo; - state std::vector>> chunkFutures; - - TraceEvent("ClientLibraryBeginDownload").detail("Key", clientLibMetaKey); - - /* - * First read the metadata to get information about the status and - * the chunk count of the client library - */ - loop { - tr = Transaction(db); - try { - tr.setOption(FDBTransactionOptions::READ_SYSTEM_KEYS); - tr.setOption(FDBTransactionOptions::READ_LOCK_AWARE); - Optional metadataOpt = wait(tr.get(clientLibMetaKey)); - if (!metadataOpt.present()) { - TraceEvent(SevWarnAlways, "ClientLibraryNotFound").detail("Key", clientLibMetaKey); - throw client_lib_not_found(); - } - metadataJson = parseMetadataJson(metadataOpt.get()); - break; - } catch (Error& e) { - wait(tr.onError(e)); - } - } - - // Prevent downloading not yet uploaded and disabled libraries - if (!isAvailableForDownload(getStatusByName(getMetadataStrAttr(metadataJson, CLIENTLIB_ATTR_STATUS)))) { - throw client_lib_not_available(); - } - - // Disabling AIO, because it currently supports only page-aligned writes, but the size of a client library - // is not necessariliy page-aligned, need to investigate if it is a limitation of AIO or just the way - // we are wrapping it - int64_t flags = IAsyncFile::OPEN_ATOMIC_WRITE_AND_CREATE | IAsyncFile::OPEN_READWRITE | IAsyncFile::OPEN_CREATE | - IAsyncFile::OPEN_UNCACHED | IAsyncFile::OPEN_NO_AIO; - state Reference fClientLib = - wait(IAsyncFileSystem::filesystem()->open(libFilePath.toString(), flags, 0666)); - - checkSum = getMetadataStrAttr(metadataJson, CLIENTLIB_ATTR_CHECKSUM); - chunkCount = getMetadataIntAttr(metadataJson, CLIENTLIB_ATTR_CHUNK_COUNT); - binarySize = getMetadataIntAttr(metadataJson, CLIENTLIB_ATTR_SIZE); - expectedChunkSize = getMetadataIntAttr(metadataJson, CLIENTLIB_ATTR_CHUNK_SIZE); - transactionSize = chunksPerTransaction * expectedChunkSize; - fileOffset = 0; - fromChunkNo = 0; - - ::MD5_Init(&sum); - - arena = Arena(); - // Use page-aligned buffers for enabling possible future use with AIO - buf = makeAlignedString(_PAGE_SIZE, transactionSize, arena); - - loop { - if (fromChunkNo == chunkCount) { - break; - } - - tr = Transaction(db); - toChunkNo = std::min(chunkCount, fromChunkNo + chunksPerTransaction); - - // read a batch of file chunks concurrently - loop { - try { - tr.setOption(FDBTransactionOptions::READ_SYSTEM_KEYS); - tr.setOption(FDBTransactionOptions::READ_LOCK_AWARE); - - chunkFutures.clear(); - for (size_t chunkNo = fromChunkNo; chunkNo < toChunkNo; chunkNo++) { - KeyRef chunkKey = chunkKeyFromNo(chunkKeyPrefix, chunkNo, arena); - chunkFutures.push_back(tr.get(chunkKey)); - } - - wait(waitForAll(chunkFutures)); - break; - } catch (Error& e) { - wait(tr.onError(e)); - } - } - - // check the read chunks and copy them to a buffer - bufferOffset = 0; - size_t chunkNo = fromChunkNo; - for (auto chunkOptFuture : chunkFutures) { - if (!chunkOptFuture.get().present()) { - TraceEvent(SevWarnAlways, "ClientLibraryChunkNotFound") - .detail("Key", chunkKeyFromNo(chunkKeyPrefix, chunkNo, arena)); - throw client_lib_invalid_binary(); - } - StringRef chunkVal = chunkOptFuture.get().get(); - - // All chunks exept for the last one must be of the expected size to guarantee - // alignment when writing to file - if ((chunkNo != (chunkCount - 1) && chunkVal.size() != expectedChunkSize) || - chunkVal.size() > expectedChunkSize) { - TraceEvent(SevWarnAlways, "ClientLibraryInvalidChunkSize") - .detail("Key", chunkKeyFromNo(chunkKeyPrefix, chunkNo, arena)) - .detail("MaxSize", expectedChunkSize) - .detail("ActualSize", chunkVal.size()); - throw client_lib_invalid_binary(); - } - - memcpy(mutateString(buf) + bufferOffset, chunkVal.begin(), chunkVal.size()); - bufferOffset += chunkVal.size(); - chunkNo++; - } - - // write the chunks to the file, update checksum - if (bufferOffset > 0) { - wait(fClientLib->write(buf.begin(), bufferOffset, fileOffset)); - fileOffset += bufferOffset; - ::MD5_Update(&sum, buf.begin(), bufferOffset); - } - - // move to the next batch - fromChunkNo = toChunkNo; - } - - // check if the downloaded file size is as expected - if (fileOffset != binarySize) { - TraceEvent(SevWarnAlways, "ClientLibraryInvalidSize") - .detail("ExpectedSize", binarySize) - .detail("ActualSize", fileOffset); - throw client_lib_invalid_binary(); - } - - // check if the checksum of downloaded file is as expected - Standalone sumBytesStr = md5SumToHexString(sum); - if (sumBytesStr != StringRef(checkSum)) { - TraceEvent(SevWarnAlways, "ClientLibraryChecksumMismatch") - .detail("Expected", checkSum) - .detail("Actual", sumBytesStr) - .detail("Key", clientLibMetaKey); - throw client_lib_invalid_binary(); - } - - wait(fClientLib->sync()); - - TraceEvent("ClientLibraryDownloadDone").detail("Key", clientLibMetaKey); - return Void(); -} - -ACTOR Future deleteClientLibrary(Database db, Standalone clientLibId) { - state Key clientLibMetaKey = metadataKeyFromId(clientLibId.toString()); - state Key chunkKeyPrefix = chunkKeyPrefixFromId(clientLibId.toString()); - - TraceEvent("ClientLibraryBeginDelete").detail("Key", clientLibMetaKey); - - loop { - state Transaction tr(db); - try { - tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); - tr.setOption(FDBTransactionOptions::LOCK_AWARE); - Optional metadataOpt = wait(tr.get(clientLibMetaKey)); - if (!metadataOpt.present()) { - TraceEvent(SevWarnAlways, "ClientLibraryNotFound").detail("Key", clientLibMetaKey); - throw client_lib_not_found(); - } - json_spirit::mObject metadataJson = parseMetadataJson(metadataOpt.get()); - ClientLibStatus status = getStatusByName(getMetadataStrAttr(metadataJson, CLIENTLIB_ATTR_STATUS)); - tr.clear(prefixRange(chunkKeyPrefix)); - tr.clear(clientLibMetaKey); - updateClientLibChangeCounter(tr, status, ClientLibStatus::DISABLED); - wait(tr.commit()); - break; - } catch (Error& e) { - wait(tr.onError(e)); - } - } - - TraceEvent("ClientLibraryDeleteDone").detail("Key", clientLibMetaKey); - return Void(); -} - -namespace { - -void applyClientLibFilter(const ClientLibFilter& filter, - const RangeResultRef& scanResults, - Standalone>& filteredResults) { - for (const auto& [k, v] : scanResults) { - try { - json_spirit::mObject metadataJson = parseMetadataJson(v); - if (filter.matchAvailableOnly && - !isAvailableForDownload(getStatusByName(getMetadataStrAttr(metadataJson, CLIENTLIB_ATTR_STATUS)))) { - continue; - } - if (filter.matchCompatibleAPI && - getMetadataIntAttr(metadataJson, CLIENTLIB_ATTR_API_VERSION) < filter.apiVersion) { - continue; - } - if (filter.matchNewerPackageVersion && !filter.matchPlatform && - getNumericVersionEncoding(getMetadataStrAttr(metadataJson, CLIENTLIB_ATTR_VERSION)) <= - filter.numericPkgVersion) { - continue; - } - filteredResults.push_back_deep(filteredResults.arena(), v); - } catch (Error& e) { - // Entries with invalid metadata on the cluster - // Can happen only if the official management interface is bypassed - ASSERT(e.code() == error_code_client_lib_invalid_metadata); - TraceEvent(SevError, "ClientLibraryIgnoringInvalidMetadata").detail("Metadata", v); - } - } -} - -} // namespace - -ACTOR Future>> listClientLibraries(Database db, ClientLibFilter filter) { - state Standalone> result; - state Transaction tr(db); - state PromiseStream> scanResults; - state Key fromKey; - state Key toKey; - state KeyRangeRef scanRange; - state Future stream; - - loop { - try { - tr.setOption(FDBTransactionOptions::READ_SYSTEM_KEYS); - tr.setOption(FDBTransactionOptions::READ_LOCK_AWARE); - if (filter.matchPlatform) { - Key prefixWithPlatform = - clientLibMetadataPrefix.withSuffix(std::string(getPlatformName(filter.platformVal))); - fromKey = prefixWithPlatform.withSuffix(LiteralStringRef("/")); - if (filter.matchNewerPackageVersion) { - fromKey = fromKey.withSuffix(format("%09d", filter.numericPkgVersion + 1)); - } - toKey = prefixWithPlatform.withSuffix(LiteralStringRef("0")); - scanRange = KeyRangeRef(fromKey, toKey); - } else { - scanRange = clientLibMetadataKeys; - } - scanResults = PromiseStream>(); - stream = tr.getRangeStream(scanResults, scanRange, GetRangeLimits()); - loop { - Standalone scanResultRange = waitNext(scanResults.getFuture()); - applyClientLibFilter(filter, scanResultRange, result); - } - } catch (Error& e) { - if (e.code() == error_code_end_of_stream) { - break; - } - wait(tr.onError(e)); - } - } - return result; -} - -ACTOR Future getClientLibraryStatus(Database db, Standalone clientLibId) { - state Key clientLibMetaKey = metadataKeyFromId(clientLibId); - state Transaction tr(db); - loop { - try { - tr.setOption(FDBTransactionOptions::READ_SYSTEM_KEYS); - tr.setOption(FDBTransactionOptions::READ_LOCK_AWARE); - Optional metadataOpt = wait(tr.get(clientLibMetaKey)); - if (!metadataOpt.present()) { - TraceEvent(SevWarnAlways, "ClientLibraryNotFound").detail("Key", clientLibMetaKey); - throw client_lib_not_found(); - } - json_spirit::mObject metadataJson = parseMetadataJson(metadataOpt.get()); - return getStatusByName(getMetadataStrAttr(metadataJson, CLIENTLIB_ATTR_STATUS)); - } catch (Error& e) { - wait(tr.onError(e)); - } - } -} - -ACTOR Future changeClientLibraryStatus(Database db, - Standalone clientLibId, - ClientLibStatus newStatus) { - state Key clientLibMetaKey = metadataKeyFromId(clientLibId); - state json_spirit::mObject metadataJson; - state std::string jsStr; - state Transaction tr; - - if (!isValidTargetStatus(newStatus)) { - TraceEvent(SevWarnAlways, "ClientLibraryInvalidMetadata") - .detail("Reason", "InvalidTargetStatus") - .detail("Status", getStatusName(newStatus)); - throw client_lib_invalid_metadata(); - } - - loop { - tr = Transaction(db); - try { - tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); - tr.setOption(FDBTransactionOptions::LOCK_AWARE); - Optional metadataOpt = wait(tr.get(clientLibMetaKey)); - if (!metadataOpt.present()) { - TraceEvent(SevWarnAlways, "ClientLibraryNotFound").detail("Key", clientLibMetaKey); - throw client_lib_not_found(); - } - metadataJson = parseMetadataJson(metadataOpt.get()); - ClientLibStatus prevStatus = getStatusByName(getMetadataStrAttr(metadataJson, CLIENTLIB_ATTR_STATUS)); - if (prevStatus == newStatus) { - return Void(); - } - metadataJson[CLIENTLIB_ATTR_STATUS] = getStatusName(newStatus); - jsStr = json_spirit::write_string(json_spirit::mValue(metadataJson)); - tr.set(clientLibMetaKey, ValueRef(jsStr)); - - updateClientLibChangeCounter(tr, prevStatus, newStatus); - - wait(tr.commit()); - break; - } catch (Error& e) { - if (e.code() == error_code_client_lib_not_found) { - throw; - } - wait(tr.onError(e)); - } - } - - TraceEvent("ClientLibraryStatusChanged").detail("Key", clientLibMetaKey).detail("Status", getStatusName(newStatus)); - return Void(); -} - -} // namespace ClientLibManagement diff --git a/fdbclient/ClientLibManagement.actor.h b/fdbclient/ClientLibManagement.actor.h deleted file mode 100644 index ab3a48ba8f..0000000000 --- a/fdbclient/ClientLibManagement.actor.h +++ /dev/null @@ -1,146 +0,0 @@ -/* - * ClientLibManagement.actor.h - * - * This source file is part of the FoundationDB open source project - * - * Copyright 2013-2021 Apple Inc. and the FoundationDB project authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once -#if defined(NO_INTELLISENSE) && !defined(FDBCLIENT_MULTI_VERSION_CLIENT_CONTROL_ACTOR_G_H) -#define FDBCLIENT_MULTI_VERSION_CLIENT_CONTROL_ACTOR_G_H -#include "fdbclient/ClientLibManagement.actor.g.h" -#elif !defined(FDBCLIENT_MULTI_VERSION_CLIENT_CONTROL_ACTOR_H) -#define FDBCLIENT_MULTI_VERSION_CLIENT_CONTROL_ACTOR_H - -#include -#include "fdbclient/NativeAPI.actor.h" -#include "fdbclient/md5/md5.h" - -#include "flow/actorcompiler.h" // has to be last include - -namespace ClientLibManagement { - -enum class ClientLibStatus { - DISABLED = 0, - UPLOADING, // 1 - DOWNLOAD, // 2 - ACTIVE, // 3 - COUNT // must be the last one -}; - -enum class ClientLibPlatform { - UNKNOWN = 0, - X86_64_LINUX, - X86_64_WINDOWS, - X86_64_MACOS, - COUNT // must be the last one -}; - -// Currently we support only one, -// but we may want to change it in the future -enum class ClientLibChecksumAlg { - MD5 = 0, - COUNT // must be the last one -}; - -inline const std::string CLIENTLIB_ATTR_PLATFORM{ "platform" }; -inline const std::string CLIENTLIB_ATTR_STATUS{ "status" }; -inline const std::string CLIENTLIB_ATTR_CHECKSUM{ "checksum" }; -inline const std::string CLIENTLIB_ATTR_VERSION{ "version" }; -inline const std::string CLIENTLIB_ATTR_TYPE{ "type" }; -inline const std::string CLIENTLIB_ATTR_API_VERSION{ "apiversion" }; -inline const std::string CLIENTLIB_ATTR_PROTOCOL{ "protocol" }; -inline const std::string CLIENTLIB_ATTR_GIT_HASH{ "githash" }; -inline const std::string CLIENTLIB_ATTR_FILENAME{ "filename" }; -inline const std::string CLIENTLIB_ATTR_SIZE{ "size" }; -inline const std::string CLIENTLIB_ATTR_CHUNK_COUNT{ "chunkcount" }; -inline const std::string CLIENTLIB_ATTR_CHUNK_SIZE{ "chunksize" }; -inline const std::string CLIENTLIB_ATTR_CHECKSUM_ALG{ "checksumalg" }; - -struct ClientLibFilter { - bool matchAvailableOnly = false; - bool matchPlatform = false; - bool matchCompatibleAPI = false; - bool matchNewerPackageVersion = false; - ClientLibPlatform platformVal = ClientLibPlatform::UNKNOWN; - int apiVersion = 0; - int numericPkgVersion = 0; - - ClientLibFilter& filterAvailable() { - matchAvailableOnly = true; - return *this; - } - - ClientLibFilter& filterPlatform(ClientLibPlatform platformVal) { - matchPlatform = true; - this->platformVal = platformVal; - return *this; - } - - ClientLibFilter& filterCompatibleAPI(int apiVersion) { - matchCompatibleAPI = true; - this->apiVersion = apiVersion; - return *this; - } - - // expects a version string like "6.3.10" - ClientLibFilter& filterNewerPackageVersion(const std::string& versionStr); -}; - -const std::string& getStatusName(ClientLibStatus status); -ClientLibStatus getStatusByName(std::string_view statusName); - -const std::string& getPlatformName(ClientLibPlatform platform); -ClientLibPlatform getPlatformByName(std::string_view platformName); - -const std::string& getChecksumAlgName(ClientLibChecksumAlg checksumAlg); -ClientLibChecksumAlg getChecksumAlgByName(std::string_view checksumAlgName); - -// encodes MD5 result to a hexadecimal string to be provided in the checksum attribute -Standalone md5SumToHexString(MD5_CTX& sum); - -// Upload a client library binary from a file and associated metadata JSON -// to the system keyspace of the database -ACTOR Future uploadClientLibrary(Database db, - Standalone metadataString, - Standalone libFilePath); - -// Determine clientLibId from the relevant attributes of the metadata JSON -Standalone getClientLibIdFromMetadataJson(StringRef metadataString); - -// Download a client library binary from the system keyspace of the database -// and save it at the given file path -ACTOR Future downloadClientLibrary(Database db, - Standalone clientLibId, - Standalone libFilePath); - -// Delete the client library binary from to the system keyspace of the database -ACTOR Future deleteClientLibrary(Database db, Standalone clientLibId); - -// List client libraries available on the cluster, with the specified filter -// Returns metadata JSON of each library -ACTOR Future>> listClientLibraries(Database db, ClientLibFilter filter); - -// Get the current status of an uploaded client library -ACTOR Future getClientLibraryStatus(Database db, Standalone clientLibId); - -// Change client library metadata status -ACTOR Future changeClientLibraryStatus(Database db, Standalone clientLibId, ClientLibStatus newStatus); - -} // namespace ClientLibManagement - -#include "flow/unactorcompiler.h" -#endif \ No newline at end of file diff --git a/fdbclient/CommitProxyInterface.h b/fdbclient/CommitProxyInterface.h index c46bdcb709..ecb745d318 100644 --- a/fdbclient/CommitProxyInterface.h +++ b/fdbclient/CommitProxyInterface.h @@ -115,9 +115,6 @@ struct ClientDBInfo { firstCommitProxy; // not serialized, used for commitOnFirstProxy when the commit proxies vector has been shrunk Optional forward; std::vector history; - // a counter increased every time a change of uploaded client libraries - // happens, the clients need to be aware of - uint64_t clientLibChangeCounter = 0; ClientDBInfo() {} @@ -129,7 +126,7 @@ struct ClientDBInfo { if constexpr (!is_fb_function) { ASSERT(ar.protocolVersion().isValid()); } - serializer(ar, grvProxies, commitProxies, id, forward, history, clientLibChangeCounter); + serializer(ar, grvProxies, commitProxies, id, forward, history); } }; diff --git a/fdbclient/DatabaseContext.h b/fdbclient/DatabaseContext.h index b863961f8f..bd7b6f0e3a 100644 --- a/fdbclient/DatabaseContext.h +++ b/fdbclient/DatabaseContext.h @@ -249,7 +249,6 @@ public: Future> getCommitProxiesFuture(UseProvisionalProxies useProvisionalProxies); Reference getGrvProxies(UseProvisionalProxies useProvisionalProxies); Future onProxiesChanged() const; - Future onClientLibStatusChanged() const; Future getHealthMetrics(bool detailed); // Pass a negative value for `shardLimit` to indicate no limit on the shard number. Future getStorageMetrics(KeyRange const& keys, int shardLimit); @@ -347,7 +346,6 @@ public: // Key DB-specific information Reference>> connectionRecord; AsyncTrigger proxiesChangeTrigger; - AsyncTrigger clientLibChangeTrigger; Future clientDBInfoMonitor; Future monitorTssInfoChange; Future tssMismatchHandler; diff --git a/fdbclient/NativeAPI.actor.cpp b/fdbclient/NativeAPI.actor.cpp index 79165dbf4c..b2019b989b 100644 --- a/fdbclient/NativeAPI.actor.cpp +++ b/fdbclient/NativeAPI.actor.cpp @@ -732,15 +732,12 @@ Future attemptGRVFromOldProxies(std::vector oldProxies, ACTOR static Future monitorClientDBInfoChange(DatabaseContext* cx, Reference const> clientDBInfo, - AsyncTrigger* proxyChangeTrigger, - AsyncTrigger* clientLibChangeTrigger) { + AsyncTrigger* proxyChangeTrigger) { state std::vector curCommitProxies; state std::vector curGrvProxies; state ActorCollection actors(false); - state uint64_t curClientLibChangeCounter; curCommitProxies = clientDBInfo->get().commitProxies; curGrvProxies = clientDBInfo->get().grvProxies; - curClientLibChangeCounter = clientDBInfo->get().clientLibChangeCounter; loop { choose { @@ -763,9 +760,6 @@ ACTOR static Future monitorClientDBInfoChange(DatabaseContext* cx, curGrvProxies = clientDBInfo->get().grvProxies; proxyChangeTrigger->trigger(); } - if (curClientLibChangeCounter != clientDBInfo->get().clientLibChangeCounter) { - clientLibChangeTrigger->trigger(); - } } when(wait(actors.getResult())) { UNSTOPPABLE_ASSERT(false); } } @@ -1255,7 +1249,7 @@ DatabaseContext::DatabaseContext(Reference DatabaseContext::onProxiesChanged() const { return this->proxiesChangeTrigger.onTrigger(); } -Future DatabaseContext::onClientLibStatusChanged() const { - return this->clientLibChangeTrigger.onTrigger(); -} - bool DatabaseContext::sampleReadTags() const { double sampleRate = GlobalConfig::globalConfig().get(transactionTagSampleRate, CLIENT_KNOBS->READ_TAG_SAMPLE_RATE); return sampleRate > 0 && deterministicRandom()->random01() <= sampleRate; diff --git a/fdbclient/Schemas.cpp b/fdbclient/Schemas.cpp index 9ca3c122f1..2e9fcc53a6 100644 --- a/fdbclient/Schemas.cpp +++ b/fdbclient/Schemas.cpp @@ -1077,19 +1077,3 @@ const KeyRef JSONSchemas::managementApiErrorSchema = LiteralStringRef(R"""( "message": "The reason of the error" } )"""); - -const KeyRef JSONSchemas::clientLibMetadataSchema = LiteralStringRef(R"""( -{ - "platform": "x86_64-linux", - "version": "7.1.0", - "githash": "e28fef6264d05ab0c9488238022d1ee885a30bea", - "type": "debug", - "checksum": "fcef53fb4ae86d2c4fff4dc17c7e5d08", - "checksumalg": "md5", - "apiversion": 710, - "protocol": "fdb00b07001001", - "filename": "libfdb_c.7.1.0.so", - "size" : 19467552, - "chunkcount" : 2377, - "status": "available" -})"""); diff --git a/fdbclient/Schemas.h b/fdbclient/Schemas.h index 9686cf3cdb..88424d6a87 100644 --- a/fdbclient/Schemas.h +++ b/fdbclient/Schemas.h @@ -35,7 +35,6 @@ struct JSONSchemas { static const KeyRef storageHealthSchema; static const KeyRef aggregateHealthSchema; static const KeyRef managementApiErrorSchema; - static const KeyRef clientLibMetadataSchema; }; #endif /* FDBCLIENT_SCHEMAS_H */ diff --git a/fdbclient/SystemData.cpp b/fdbclient/SystemData.cpp index 4d8d4bf56a..fe10cae868 100644 --- a/fdbclient/SystemData.cpp +++ b/fdbclient/SystemData.cpp @@ -1033,16 +1033,6 @@ std::pair decodeHealthyZoneValue(ValueRef const& value) { return std::make_pair(zoneId, version); } -const KeyRangeRef clientLibMetadataKeys(LiteralStringRef("\xff\x02/clientlib/meta/"), - LiteralStringRef("\xff\x02/clientlib/meta0")); -const KeyRef clientLibMetadataPrefix = clientLibMetadataKeys.begin; - -const KeyRangeRef clientLibBinaryKeys(LiteralStringRef("\xff\x02/clientlib/bin/"), - LiteralStringRef("\xff\x02/clientlib/bin0")); -const KeyRef clientLibBinaryPrefix = clientLibBinaryKeys.begin; - -const KeyRef clientLibChangeCounterKey = "\xff\x02/clientlib/changeCounter"_sr; - const KeyRangeRef testOnlyTxnStateStorePrefixRange(LiteralStringRef("\xff/TESTONLYtxnStateStore/"), LiteralStringRef("\xff/TESTONLYtxnStateStore0")); diff --git a/fdbclient/SystemData.h b/fdbclient/SystemData.h index 2cc507cae4..14234151a3 100644 --- a/fdbclient/SystemData.h +++ b/fdbclient/SystemData.h @@ -488,16 +488,6 @@ extern const KeyRef rebalanceDDIgnoreKey; const Value healthyZoneValue(StringRef const& zoneId, Version version); std::pair decodeHealthyZoneValue(ValueRef const&); -// Key ranges reserved for storing client library binaries and respective -// json documents with the metadata describing the libaries -extern const KeyRangeRef clientLibMetadataKeys; -extern const KeyRef clientLibMetadataPrefix; - -extern const KeyRangeRef clientLibBinaryKeys; -extern const KeyRef clientLibBinaryPrefix; - -extern const KeyRef clientLibChangeCounterKey; - // All mutations done to this range are blindly copied into txnStateStore. // Used to create artifically large txnStateStore instances in testing. extern const KeyRangeRef testOnlyTxnStateStorePrefixRange; diff --git a/fdbserver/CMakeLists.txt b/fdbserver/CMakeLists.txt index c207ee694f..3e919258ea 100644 --- a/fdbserver/CMakeLists.txt +++ b/fdbserver/CMakeLists.txt @@ -174,7 +174,6 @@ set(FDBSERVER_SRCS workloads/Cache.actor.cpp workloads/ChangeConfig.actor.cpp workloads/ClearSingleRange.actor.cpp - workloads/ClientLibManagementWorkload.actor.cpp workloads/ClientTransactionProfileCorrectness.actor.cpp workloads/TriggerRecovery.actor.cpp workloads/SuspendProcesses.actor.cpp diff --git a/fdbserver/ClusterController.actor.cpp b/fdbserver/ClusterController.actor.cpp index d187e40191..99eac07eaa 100644 --- a/fdbserver/ClusterController.actor.cpp +++ b/fdbserver/ClusterController.actor.cpp @@ -1633,48 +1633,6 @@ ACTOR Future monitorGlobalConfig(ClusterControllerData::DBInfo* db) { } } -ACTOR Future monitorClientLibChangeCounter(ClusterControllerData::DBInfo* db) { - state ClientDBInfo clientInfo; - state ReadYourWritesTransaction tr; - state Future clientLibChangeFuture; - - loop { - tr = ReadYourWritesTransaction(db->db); - loop { - try { - tr.setOption(FDBTransactionOptions::READ_SYSTEM_KEYS); - tr.setOption(FDBTransactionOptions::READ_LOCK_AWARE); - - Optional counterVal = wait(tr.get(clientLibChangeCounterKey)); - if (counterVal.present() && counterVal.get().size() == sizeof(uint64_t)) { - uint64_t changeCounter = *reinterpret_cast(counterVal.get().begin()); - - clientInfo = db->serverInfo->get().client; - if (changeCounter != clientInfo.clientLibChangeCounter) { - TraceEvent("ClientLibChangeCounterChanged").detail("Value", changeCounter); - clientInfo.id = deterministicRandom()->randomUniqueID(); - clientInfo.clientLibChangeCounter = changeCounter; - db->clientInfo->set(clientInfo); - - ServerDBInfo serverInfo = db->serverInfo->get(); - serverInfo.id = deterministicRandom()->randomUniqueID(); - serverInfo.infoGeneration = ++db->dbInfoCount; - serverInfo.client = clientInfo; - db->serverInfo->set(serverInfo); - } - } - - clientLibChangeFuture = tr.watch(clientLibChangeCounterKey); - wait(tr.commit()); - wait(clientLibChangeFuture); - break; - } catch (Error& e) { - wait(tr.onError(e)); - } - } - } -} - ACTOR Future updatedChangingDatacenters(ClusterControllerData* self) { // do not change the cluster controller until all the processes have had a chance to register wait(delay(SERVER_KNOBS->WAIT_FOR_GOOD_RECRUITMENT_DELAY)); @@ -2466,7 +2424,6 @@ ACTOR Future clusterControllerCore(ClusterControllerFullInterface interf, self.addActor.send(monitorProcessClasses(&self)); self.addActor.send(monitorServerInfoConfig(&self.db)); self.addActor.send(monitorGlobalConfig(&self.db)); - self.addActor.send(monitorClientLibChangeCounter(&self.db)); self.addActor.send(updatedChangingDatacenters(&self)); self.addActor.send(updatedChangedDatacenters(&self)); self.addActor.send(updateDatacenterVersionDifference(&self)); diff --git a/fdbserver/workloads/ClientLibManagementWorkload.actor.cpp b/fdbserver/workloads/ClientLibManagementWorkload.actor.cpp deleted file mode 100644 index b97a74320b..0000000000 --- a/fdbserver/workloads/ClientLibManagementWorkload.actor.cpp +++ /dev/null @@ -1,464 +0,0 @@ -/* - * ClientLibManagementWorkload.actor.cpp - * - * This source file is part of the FoundationDB open source project - * - * Copyright 2013-2021 Apple Inc. and the FoundationDB project authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include "fdbrpc/IAsyncFile.h" -#include "fdbserver/workloads/workloads.actor.h" -#include "fdbclient/ClientLibManagement.actor.h" -#include "fdbserver/workloads/AsyncFile.actor.h" -#include "fdbclient/md5/md5.h" -#include "flow/Error.h" -#include "flow/IRandom.h" -#include "flow/actorcompiler.h" // This must be the last #include. - -using namespace ClientLibManagement; - -/** - * Workload for testing ClientLib management operations, declared in - * MultiVersionClientControl.actor.h - */ -struct ClientLibManagementWorkload : public TestWorkload { - static constexpr size_t FILE_CHUNK_SIZE = 128 * 1024; // Used for test setup only - - size_t testFileSize = 0; - RandomByteGenerator rbg; - Standalone uploadedClientLibId; - json_spirit::mObject uploadedMetadataJson; - Standalone generatedChecksum; - std::string generatedFileName; - bool success = true; - - /*---------------------------------------------------------------- - * Interface - */ - - ClientLibManagementWorkload(WorkloadContext const& wcx) : TestWorkload(wcx) { - int minTestFileSize = getOption(options, LiteralStringRef("minTestFileSize"), 0); - int maxTestFileSize = getOption(options, LiteralStringRef("maxTestFileSize"), 1024 * 1024); - testFileSize = deterministicRandom()->randomInt(minTestFileSize, maxTestFileSize + 1); - } - - std::string description() const override { return "ClientLibManagement"; } - - Future setup(Database const& cx) override { return _setup(this); } - - Future start(Database const& cx) override { return _start(this, cx); } - - Future check(Database const& cx) override { return success; } - - void getMetrics(std::vector& m) override {} - - /*---------------------------------------------------------------- - * Setup - */ - - ACTOR Future _setup(ClientLibManagementWorkload* self) { - state Reference data = self->allocateBuffer(FILE_CHUNK_SIZE); - state size_t fileOffset; - state MD5_CTX sum; - state size_t bytesToWrite; - - self->generatedFileName = format("clientLibUpload%d", self->clientId); - int64_t flags = IAsyncFile::OPEN_ATOMIC_WRITE_AND_CREATE | IAsyncFile::OPEN_READWRITE | - IAsyncFile::OPEN_CREATE | IAsyncFile::OPEN_UNCACHED | IAsyncFile::OPEN_NO_AIO; - state Reference file = - wait(IAsyncFileSystem::filesystem()->open(self->generatedFileName, flags, 0666)); - - ::MD5_Init(&sum); - - for (fileOffset = 0; fileOffset < self->testFileSize; fileOffset += FILE_CHUNK_SIZE) { - self->rbg.writeRandomBytesToBuffer(data->buffer, FILE_CHUNK_SIZE); - bytesToWrite = std::min(FILE_CHUNK_SIZE, self->testFileSize - fileOffset); - wait(file->write(data->buffer, bytesToWrite, fileOffset)); - - ::MD5_Update(&sum, data->buffer, bytesToWrite); - } - wait(file->sync()); - - self->generatedChecksum = md5SumToHexString(sum); - - return Void(); - } - - /*---------------------------------------------------------------- - * Tests - */ - - ACTOR static Future _start(ClientLibManagementWorkload* self, Database cx) { - wait(testUploadClientLibInvalidInput(self, cx)); - wait(testClientLibUploadFileDoesNotExist(self, cx)); - wait(testUploadClientLib(self, cx)); - wait(testClientLibListAfterUpload(self, cx)); - wait(testDownloadClientLib(self, cx)); - wait(testClientLibDownloadNotExisting(self, cx)); - wait(testChangeClientLibStatusErrors(self, cx)); - wait(testDisableClientLib(self, cx)); - wait(testChangeStateToDownload(self, cx)); - wait(testDeleteClientLib(self, cx)); - wait(testUploadedClientLibInList(self, cx, ClientLibFilter(), false, "No filter, after delete")); - return Void(); - } - - ACTOR static Future testUploadClientLibInvalidInput(ClientLibManagementWorkload* self, Database cx) { - state std::vector invalidMetadataStrs = { - "{foo", // invalid json - "[]", // json array - }; - state StringRef metadataStr; - - // add garbage attribute - json_spirit::mObject metadataJson; - validClientLibMetadataSample(metadataJson); - metadataJson["unknownattr"] = "someval"; - invalidMetadataStrs.push_back(json_spirit::write_string(json_spirit::mValue(metadataJson))); - - const std::string mandatoryAttrs[] = { CLIENTLIB_ATTR_PLATFORM, CLIENTLIB_ATTR_VERSION, - CLIENTLIB_ATTR_CHECKSUM, CLIENTLIB_ATTR_TYPE, - CLIENTLIB_ATTR_GIT_HASH, CLIENTLIB_ATTR_PROTOCOL, - CLIENTLIB_ATTR_API_VERSION, CLIENTLIB_ATTR_CHECKSUM_ALG }; - - for (const std::string& attr : mandatoryAttrs) { - validClientLibMetadataSample(metadataJson); - metadataJson.erase(attr); - invalidMetadataStrs.push_back(json_spirit::write_string(json_spirit::mValue(metadataJson))); - } - - for (auto& testMetadataStr : invalidMetadataStrs) { - metadataStr = StringRef(testMetadataStr); - wait(testExpectedError(uploadClientLibrary(cx, metadataStr, StringRef(self->generatedFileName)), - "uploadClientLibrary with invalid metadata", - client_lib_invalid_metadata(), - &self->success, - { { "Metadata", metadataStr.toString().c_str() } })); - } - - return Void(); - } - - ACTOR static Future testClientLibUploadFileDoesNotExist(ClientLibManagementWorkload* self, Database cx) { - state Standalone metadataStr; - json_spirit::mObject metadataJson; - validClientLibMetadataSample(metadataJson); - metadataStr = StringRef(json_spirit::write_string(json_spirit::mValue(metadataJson))); - wait(testExpectedError(uploadClientLibrary(cx, metadataStr, "some_not_existing_file_name"_sr), - "uploadClientLibrary with a not existing file", - file_not_found(), - &self->success)); - return Void(); - } - - ACTOR static Future testUploadClientLibWrongChecksum(ClientLibManagementWorkload* self, Database cx) { - state Standalone metadataStr; - validClientLibMetadataSample(self->uploadedMetadataJson); - metadataStr = StringRef(json_spirit::write_string(json_spirit::mValue(self->uploadedMetadataJson))); - self->uploadedClientLibId = getClientLibIdFromMetadataJson(metadataStr); - wait(testExpectedError(uploadClientLibrary(cx, metadataStr, StringRef(self->generatedFileName)), - "uploadClientLibrary wrong checksum", - client_lib_invalid_binary(), - &self->success)); - wait(testUploadedClientLibInList(self, cx, ClientLibFilter(), false, "After upload with wrong checksum")); - return Void(); - } - - ACTOR static Future testUploadClientLib(ClientLibManagementWorkload* self, Database cx) { - state Standalone metadataStr; - state std::vector>> concurrentUploads; - state Future clientLibChanged = cx->onClientLibStatusChanged(); - - validClientLibMetadataSample(self->uploadedMetadataJson); - self->uploadedMetadataJson[CLIENTLIB_ATTR_CHECKSUM] = self->generatedChecksum.toString(); - // avoid clientLibId clashes, when multiple clients try to upload the same file - self->uploadedMetadataJson[CLIENTLIB_ATTR_TYPE] = format("devbuild%d", self->clientId); - self->uploadedMetadataJson[CLIENTLIB_ATTR_STATUS] = getStatusName(ClientLibStatus::ACTIVE); - metadataStr = StringRef(json_spirit::write_string(json_spirit::mValue(self->uploadedMetadataJson))); - self->uploadedClientLibId = getClientLibIdFromMetadataJson(metadataStr); - - // Test two concurrent uploads of the same library, one of the must fail and another succeed - for (int i1 = 0; i1 < 2; i1++) { - Future uploadActor = uploadClientLibrary(cx, metadataStr, StringRef(self->generatedFileName)); - concurrentUploads.push_back(errorOr(uploadActor)); - } - - wait(waitForAll(concurrentUploads)); - - int successCnt = 0; - for (auto uploadRes : concurrentUploads) { - if (uploadRes.get().isError()) { - self->testErrorCode( - "concurrent client lib upload", client_lib_already_exists(), uploadRes.get().getError()); - } else { - successCnt++; - } - } - - if (successCnt == 0) { - TraceEvent(SevError, "ClientLibUploadFailed").log(); - self->success = false; - throw operation_failed(); - } else if (successCnt > 1) { - TraceEvent(SevError, "ClientLibConflictingUpload").log(); - self->success = false; - } - - // Clients should be notified about upload of a library with the active status - Optional notificationWait = wait(timeout(clientLibChanged, 100.0)); - if (!notificationWait.present()) { - TraceEvent(SevError, "ClientLibChangeNotificationFailed").log(); - self->success = false; - } - - return Void(); - } - - ACTOR static Future testClientLibDownloadNotExisting(ClientLibManagementWorkload* self, Database cx) { - // Generate a random valid clientLibId - state Standalone clientLibId; - state std::string destFileName; - json_spirit::mObject metadataJson; - validClientLibMetadataSample(metadataJson); - Standalone metadataStr = StringRef(json_spirit::write_string(json_spirit::mValue(metadataJson))); - clientLibId = getClientLibIdFromMetadataJson(metadataStr); - - destFileName = format("clientLibDownload%d", self->clientId); - wait(testExpectedError(downloadClientLibrary(cx, StringRef(clientLibId), StringRef(destFileName)), - "download not existing client library", - client_lib_not_found(), - &self->success)); - return Void(); - } - - ACTOR static Future testDownloadClientLib(ClientLibManagementWorkload* self, Database cx) { - state std::string destFileName = format("clientLibDownload%d", self->clientId); - wait(downloadClientLibrary(cx, self->uploadedClientLibId, StringRef(destFileName))); - - FILE* f = fopen(destFileName.c_str(), "r"); - if (f == nullptr) { - TraceEvent(SevError, "ClientLibDownloadFileDoesNotExist").detail("FileName", destFileName); - self->success = false; - } else { - fseek(f, 0L, SEEK_END); - size_t fileSize = ftell(f); - if (fileSize != self->testFileSize) { - TraceEvent(SevError, "ClientLibDownloadFileSizeMismatch") - .detail("ExpectedSize", self->testFileSize) - .detail("ActualSize", fileSize); - self->success = false; - } - fclose(f); - } - - return Void(); - } - - ACTOR static Future testDeleteClientLib(ClientLibManagementWorkload* self, Database cx) { - state Future clientLibChanged = cx->onClientLibStatusChanged(); - - wait(deleteClientLibrary(cx, self->uploadedClientLibId)); - - // Clients should be notified about deletion of the library, because it has "download" status - Optional notificationWait = wait(timeout(clientLibChanged, 100.0)); - if (!notificationWait.present()) { - TraceEvent(SevError, "ClientLibChangeNotificationFailed").log(); - } - return Void(); - } - - ACTOR static Future testClientLibListAfterUpload(ClientLibManagementWorkload* self, Database cx) { - state int uploadedApiVersion = self->uploadedMetadataJson[CLIENTLIB_ATTR_API_VERSION].get_int(); - state ClientLibPlatform uploadedPlatform = - getPlatformByName(self->uploadedMetadataJson[CLIENTLIB_ATTR_PLATFORM].get_str()); - state std::string uploadedVersion = self->uploadedMetadataJson[CLIENTLIB_ATTR_VERSION].get_str(); - state ClientLibFilter filter; - - filter = ClientLibFilter(); - wait(testUploadedClientLibInList(self, cx, filter, true, "No filter")); - filter = ClientLibFilter().filterAvailable(); - wait(testUploadedClientLibInList(self, cx, filter, true, "Filter available")); - filter = ClientLibFilter().filterAvailable().filterCompatibleAPI(uploadedApiVersion); - wait(testUploadedClientLibInList(self, cx, filter, true, "Filter available, the same API")); - filter = ClientLibFilter().filterAvailable().filterCompatibleAPI(uploadedApiVersion + 1); - wait(testUploadedClientLibInList(self, cx, filter, false, "Filter available, newer API")); - filter = ClientLibFilter().filterCompatibleAPI(uploadedApiVersion).filterPlatform(uploadedPlatform); - wait(testUploadedClientLibInList(self, cx, filter, true, "Filter the same API, the same platform")); - ASSERT(uploadedPlatform != ClientLibPlatform::X86_64_WINDOWS); - filter = ClientLibFilter().filterAvailable().filterPlatform(ClientLibPlatform::X86_64_WINDOWS); - wait(testUploadedClientLibInList(self, cx, filter, false, "Filter available, different platform")); - filter = ClientLibFilter().filterAvailable().filterNewerPackageVersion(uploadedVersion); - wait(testUploadedClientLibInList(self, cx, filter, false, "Filter available, the same version")); - filter = - ClientLibFilter().filterAvailable().filterNewerPackageVersion("1.15.10").filterPlatform(uploadedPlatform); - wait(testUploadedClientLibInList( - self, cx, filter, true, "Filter available, an older version, the same platform")); - filter = ClientLibFilter() - .filterAvailable() - .filterNewerPackageVersion(uploadedVersion) - .filterPlatform(uploadedPlatform); - wait(testUploadedClientLibInList( - self, cx, filter, false, "Filter available, the same version, the same platform")); - filter = ClientLibFilter().filterNewerPackageVersion("100.1.1"); - wait(testUploadedClientLibInList(self, cx, filter, false, "Filter a newer version")); - filter = ClientLibFilter().filterNewerPackageVersion("1.15.10"); - wait(testUploadedClientLibInList(self, cx, filter, true, "Filter an older version")); - return Void(); - } - - ACTOR static Future testUploadedClientLibInList(ClientLibManagementWorkload* self, - Database cx, - ClientLibFilter filter, - bool expectInList, - const char* testDescr) { - Standalone> allLibs = wait(listClientLibraries(cx, filter)); - bool found = false; - for (StringRef metadataJson : allLibs) { - Standalone clientLibId; - clientLibId = getClientLibIdFromMetadataJson(metadataJson); - if (clientLibId == self->uploadedClientLibId) { - found = true; - } - } - if (found != expectInList) { - TraceEvent(SevError, "ClientLibInListTestFailed") - .detail("Test", testDescr) - .detail("ClientLibId", self->uploadedClientLibId) - .detail("Expected", expectInList) - .detail("Actual", found); - self->success = false; - } - return Void(); - } - - ACTOR static Future testChangeClientLibStatusErrors(ClientLibManagementWorkload* self, Database cx) { - wait(testExpectedError(changeClientLibraryStatus(cx, self->uploadedClientLibId, ClientLibStatus::UPLOADING), - "Setting invalid client library status", - client_lib_invalid_metadata(), - &self->success)); - - wait(testExpectedError(changeClientLibraryStatus(cx, "notExistingClientLib"_sr, ClientLibStatus::DOWNLOAD), - "Changing not existing client library status", - client_lib_not_found(), - &self->success)); - return Void(); - } - - ACTOR static Future testDisableClientLib(ClientLibManagementWorkload* self, Database cx) { - state std::string destFileName = format("clientLibDownload%d", self->clientId); - state Future clientLibChanged = cx->onClientLibStatusChanged(); - - // Set disabled status on the uploaded library - wait(changeClientLibraryStatus(cx, self->uploadedClientLibId, ClientLibStatus::DISABLED)); - state ClientLibStatus newStatus = wait(getClientLibraryStatus(cx, self->uploadedClientLibId)); - if (newStatus != ClientLibStatus::DISABLED) { - TraceEvent(SevError, "ClientLibDisableClientLibFailed") - .detail("Reason", "Unexpected status") - .detail("Expected", ClientLibStatus::DISABLED) - .detail("Actual", newStatus); - self->success = false; - } - - // Clients should be notified about an active library being disabled - Optional notificationWait = wait(timeout(clientLibChanged, 100.0)); - if (!notificationWait.present()) { - TraceEvent(SevError, "ClientLibChangeNotificationFailed").log(); - self->success = false; - } - - // It should not be possible to download a disabled client library - wait(testExpectedError(downloadClientLibrary(cx, self->uploadedClientLibId, StringRef(destFileName)), - "Downloading disabled client library", - client_lib_not_available(), - &self->success)); - - return Void(); - } - - ACTOR static Future testChangeStateToDownload(ClientLibManagementWorkload* self, Database cx) { - state std::string destFileName = format("clientLibDownload%d", self->clientId); - state Future clientLibChanged = cx->onClientLibStatusChanged(); - - // Set disabled status on the uploaded library - wait(changeClientLibraryStatus(cx, self->uploadedClientLibId, ClientLibStatus::DOWNLOAD)); - state ClientLibStatus newStatus = wait(getClientLibraryStatus(cx, self->uploadedClientLibId)); - if (newStatus != ClientLibStatus::DOWNLOAD) { - TraceEvent(SevError, "ClientLibChangeStatusFailed") - .detail("Reason", "Unexpected status") - .detail("Expected", ClientLibStatus::DOWNLOAD) - .detail("Actual", newStatus); - self->success = false; - } - - Optional notificationWait = wait(timeout(clientLibChanged, 100.0)); - if (!notificationWait.present()) { - TraceEvent(SevError, "ClientLibChangeNotificationFailed").log(); - self->success = false; - } - - return Void(); - } - - /* ---------------------------------------------------------------- - * Utility methods - */ - - Reference allocateBuffer(size_t size) { return makeReference(size, false); } - - static std::string randomHexadecimalStr(int length) { - std::string s; - s.reserve(length); - for (int i = 0; i < length; i++) { - uint32_t hexDigit = static_cast(deterministicRandom()->randomUInt32() % 16); - char ch = (hexDigit >= 10 ? hexDigit - 10 + 'a' : hexDigit + '0'); - s += ch; - } - return s; - } - - static void validClientLibMetadataSample(json_spirit::mObject& metadataJson) { - metadataJson.clear(); - metadataJson[CLIENTLIB_ATTR_PLATFORM] = getPlatformName(ClientLibPlatform::X86_64_LINUX); - metadataJson[CLIENTLIB_ATTR_VERSION] = "7.1.0"; - metadataJson[CLIENTLIB_ATTR_GIT_HASH] = randomHexadecimalStr(40); - metadataJson[CLIENTLIB_ATTR_TYPE] = "debug"; - metadataJson[CLIENTLIB_ATTR_CHECKSUM] = randomHexadecimalStr(32); - metadataJson[CLIENTLIB_ATTR_STATUS] = getStatusName(ClientLibStatus::DOWNLOAD); - metadataJson[CLIENTLIB_ATTR_API_VERSION] = 710; - metadataJson[CLIENTLIB_ATTR_PROTOCOL] = "fdb00b07001001"; - metadataJson[CLIENTLIB_ATTR_CHECKSUM_ALG] = "md5"; - } - - void testErrorCode(const char* testDescr, - Error expectedError, - Error actualError, - std::map details = {}, - UID id = UID()) { - ASSERT(expectedError.isValid()); - ASSERT(actualError.isValid()); - if (expectedError.code() != actualError.code()) { - TraceEvent evt(SevError, "TestErrorCodeFailed", id); - evt.detail("TestDescription", testDescr); - evt.detail("ExpectedError", expectedError.code()); - evt.error(actualError); - for (auto& p : details) { - evt.detail(p.first.c_str(), p.second); - } - success = false; - } - } -}; - -WorkloadFactory ClientLibOperationsWorkloadFactory("ClientLibManagement");