From 2909614b0cceff403d68f74897acef45acd73b3d Mon Sep 17 00:00:00 2001 From: Xiaoxi Wang Date: Mon, 18 Jul 2022 14:21:50 -0700 Subject: [PATCH 1/6] smaller function --- fdbserver/DataDistribution.actor.cpp | 28 ++++++++++++++++------- fdbserver/DataDistributionQueue.actor.cpp | 3 +-- 2 files changed, 21 insertions(+), 10 deletions(-) diff --git a/fdbserver/DataDistribution.actor.cpp b/fdbserver/DataDistribution.actor.cpp index e4682f44bd..900da635f5 100644 --- a/fdbserver/DataDistribution.actor.cpp +++ b/fdbserver/DataDistribution.actor.cpp @@ -311,7 +311,12 @@ struct DataDistributor : NonCopyable, ReferenceCounted { // fully-functional. DDTeamCollection* teamCollection; Reference shardsAffectedByTeamFailure; - PromiseStream relocationProducer, relocationConsumer; // comsumer is a yield stream from producer + // consumer is a yield stream from producer. The RelocateShard is pushed into relocationProducer and popped from + // relocationConsumer (by DDQueue) + PromiseStream relocationProducer, relocationConsumer; + std::vector tcis; // primary and remote region interface + Reference> anyZeroHealthyTeams; // true if primary or remote has zero healthy team + std::vector>> zeroHealthyTeams; // primary and remote DataDistributor(Reference const> const& db, UID id) : dbInfo(db), ddId(id), txnProcessor(nullptr), initialDDEventHolder(makeReference("InitialDD")), @@ -436,11 +441,7 @@ struct DataDistributor : NonCopyable, ReferenceCounted { return Void(); } - // Resume inflight relocations from the previous DD - // TODO: add a test to verify the inflight relocation correctness and measure the memory usage with 4 million shards - ACTOR static Future resumeRelocations(Reference self) { - ASSERT(self->shardsAffectedByTeamFailure); // has to be allocated - + ACTOR static Future resumeFromInitShards(Reference self) { state int shard = 0; for (; shard < self->initData->shards.size() - 1; shard++) { const DDShardInfo& iShard = self->initData->shards[shard]; @@ -480,7 +481,10 @@ struct DataDistributor : NonCopyable, ReferenceCounted { wait(yield(TaskPriority::DataDistribution)); } + return Void(); + } + ACTOR static Future resumeFromInitDataMoveMap(Reference self) { state KeyRangeMap>::iterator it = self->initData->dataMoveMap.ranges().begin(); for (; it != self->initData->dataMoveMap.ranges().end(); ++it) { const DataMoveMetaData& meta = it.value()->meta; @@ -517,6 +521,14 @@ struct DataDistributor : NonCopyable, ReferenceCounted { } return Void(); } + + // Resume inflight relocations from the previous DD + // TODO: add a test to verify the inflight relocation correctness and measure the memory usage with 4 million shards + Future resumeRelocations() { + ASSERT(shardsAffectedByTeamFailure); // has to be allocated + return runAfter(resumeFromInitShards(Reference::addRef(this)), + resumeFromInitDataMoveMap(Reference::addRef(this))); + } }; // Runs the data distribution algorithm for FDB, including the DD Queue, DD tracker, and DD team collection @@ -565,7 +577,7 @@ ACTOR Future dataDistribution(Reference self, state Reference> processingWiggle(new AsyncVar(false)); state Promise readyToStart; self->shardsAffectedByTeamFailure = makeReference(); - wait(DataDistributor::resumeRelocations(self)); + wait(self->resumeRelocations()); std::vector tcis; @@ -575,7 +587,7 @@ ACTOR Future dataDistribution(Reference self, zeroHealthyTeams.push_back(makeReference>(true)); int storageTeamSize = self->configuration.storageTeamSize; - std::vector> actors; + std::vector> actors; // the container of ACTORs if (self->configuration.usableRegions > 1) { tcis.push_back(TeamCollectionInterface()); storageTeamSize = 2 * self->configuration.storageTeamSize; diff --git a/fdbserver/DataDistributionQueue.actor.cpp b/fdbserver/DataDistributionQueue.actor.cpp index 6553132bcb..e7e9baea4a 100644 --- a/fdbserver/DataDistributionQueue.actor.cpp +++ b/fdbserver/DataDistributionQueue.actor.cpp @@ -813,7 +813,6 @@ struct DDQueueData { } ACTOR static Future getSourceServersForRange(DDQueueData* self, - Database cx, RelocateData input, PromiseStream output, Reference fetchLock) { @@ -929,7 +928,7 @@ struct DDQueueData { fetchingSourcesQueue.insert(rrs); getSourceActors.insert( - rrs.keys, getSourceServersForRange(this, cx, rrs, fetchSourceServersComplete, fetchSourceLock)); + rrs.keys, getSourceServersForRange(this, rrs, fetchSourceServersComplete, fetchSourceLock)); } else { RelocateData newData(rrs); newData.keys = affectedQueuedItems[r]; From 1c543e3b4a3d2883a33e5dbc92675a44beefa4b7 Mon Sep 17 00:00:00 2001 From: Xiaoxi Wang Date: Tue, 19 Jul 2022 11:58:24 -0700 Subject: [PATCH 2/6] add unittest --- fdbserver/DataDistribution.actor.cpp | 58 +++++++++++++++++++++++++--- 1 file changed, 52 insertions(+), 6 deletions(-) diff --git a/fdbserver/DataDistribution.actor.cpp b/fdbserver/DataDistribution.actor.cpp index 900da635f5..9dd6f83c25 100644 --- a/fdbserver/DataDistribution.actor.cpp +++ b/fdbserver/DataDistribution.actor.cpp @@ -41,6 +41,7 @@ #include "fdbserver/TenantCache.h" #include "fdbserver/TLogInterface.h" #include "fdbserver/WaitFailure.h" +#include "fdbserver/workloads/workloads.actor.h" #include "flow/ActorCollection.h" #include "flow/Arena.h" #include "flow/BooleanParam.h" @@ -290,6 +291,7 @@ ACTOR Future pollMoveKeysLock(Database cx, MoveKeysLock lock, const DDEnab } struct DataDistributor : NonCopyable, ReferenceCounted { +public: Reference const> dbInfo; UID ddId; PromiseStream> addActor; @@ -314,9 +316,6 @@ struct DataDistributor : NonCopyable, ReferenceCounted { // consumer is a yield stream from producer. The RelocateShard is pushed into relocationProducer and popped from // relocationConsumer (by DDQueue) PromiseStream relocationProducer, relocationConsumer; - std::vector tcis; // primary and remote region interface - Reference> anyZeroHealthyTeams; // true if primary or remote has zero healthy team - std::vector>> zeroHealthyTeams; // primary and remote DataDistributor(Reference const> const& db, UID id) : dbInfo(db), ddId(id), txnProcessor(nullptr), initialDDEventHolder(makeReference("InitialDD")), @@ -579,10 +578,10 @@ ACTOR Future dataDistribution(Reference self, self->shardsAffectedByTeamFailure = makeReference(); wait(self->resumeRelocations()); - std::vector tcis; + std::vector tcis; // primary and remote region interface + Reference> anyZeroHealthyTeams; // true if primary or remote has zero healthy team + std::vector>> zeroHealthyTeams; // primary and remote - Reference> anyZeroHealthyTeams; - std::vector>> zeroHealthyTeams; tcis.push_back(TeamCollectionInterface()); zeroHealthyTeams.push_back(makeReference>(true)); int storageTeamSize = self->configuration.storageTeamSize; @@ -1391,6 +1390,16 @@ static Future> badTestFuture(double duration, Error e) { return tag(delay(duration), ErrorOr(e)); } +inline DDShardInfo doubleToNoLocationShardInfo(double d, bool hasDest) { + DDShardInfo res(doubleToTestKey(d), anonymousShardId, anonymousShardId); + res.primarySrc.emplace_back((uint64_t)d, 0); + if (hasDest) { + res.primaryDest.emplace_back((uint64_t)d + 1, 0); + res.hasDest = true; + } + return res; +} + } // namespace data_distribution_test TEST_CASE("/DataDistribution/WaitForMost") { @@ -1451,4 +1460,41 @@ TEST_CASE("/DataDistributor/StorageWiggler/Order") { } ASSERT(!wiggler.getNextServerId().present()); return Void(); +} + +TEST_CASE("/DataDistributor/Initialization/ResumeFromShard") { + state Reference const> dbInfo; + state Reference self(new DataDistributor(dbInfo, UID())); + + self->shardsAffectedByTeamFailure = makeReference(); + self->initData = makeReference(); + self->configuration.usableRegions = 1; + self->configuration.storageTeamSize = 1; + + // add DDShardInfo + int shardNum = 1000000; // 2000000000; OOM + std::cout << "generating " << shardNum << "shards...\n"; + for (int i = 1; i <= SERVER_KNOBS->DD_MOVE_KEYS_PARALLELISM; ++i) { + self->initData->shards.emplace_back(data_distribution_test::doubleToNoLocationShardInfo(i, true)); + } + for (int i = SERVER_KNOBS->DD_MOVE_KEYS_PARALLELISM + 1; i <= shardNum; ++i) { + self->initData->shards.emplace_back(data_distribution_test::doubleToNoLocationShardInfo(i, false)); + } + self->initData->shards.emplace_back(DDShardInfo(allKeys.end)); + std::cout << "Start resuming...\n"; + wait(DataDistributor::resumeFromInitShards(self)); + std::cout << "Start validation...\n"; + auto relocateFuture = self->relocationProducer.getFuture(); + for (int i = 0; i < SERVER_KNOBS->DD_MOVE_KEYS_PARALLELISM; ++i) { + ASSERT(relocateFuture.isReady()); + auto rs = relocateFuture.pop(); + ASSERT(rs.isRestore() == false); + ASSERT(rs.cancelled == false); + ASSERT(rs.dataMoveId == anonymousShardId); + ASSERT(rs.priority == SERVER_KNOBS->PRIORITY_RECOVER_MOVE); + // std::cout << rs.keys.begin.toString() << " " << self->initData->shards[i].key.toString() << " \n"; + ASSERT(rs.keys.begin.compare(self->initData->shards[i].key) == 0); + ASSERT(rs.keys.end == self->initData->shards[i + 1].key); + } + return Void(); } \ No newline at end of file From a5fba6031fd1570a09ad2503940f3f5164f3b2a6 Mon Sep 17 00:00:00 2001 From: Xiaoxi Wang Date: Tue, 19 Jul 2022 13:55:51 -0700 Subject: [PATCH 3/6] resolve conflict upstream/main --- fdbserver/DataDistribution.actor.cpp | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/fdbserver/DataDistribution.actor.cpp b/fdbserver/DataDistribution.actor.cpp index 45e97e6635..d5d1d2aad2 100644 --- a/fdbserver/DataDistribution.actor.cpp +++ b/fdbserver/DataDistribution.actor.cpp @@ -1461,7 +1461,6 @@ TEST_CASE("/DataDistributor/StorageWiggler/Order") { ASSERT(!wiggler.getNextServerId().present()); return Void(); } -<<<<<<< HEAD TEST_CASE("/DataDistributor/Initialization/ResumeFromShard") { state Reference const> dbInfo; @@ -1498,6 +1497,4 @@ TEST_CASE("/DataDistributor/Initialization/ResumeFromShard") { ASSERT(rs.keys.end == self->initData->shards[i + 1].key); } return Void(); -} -======= ->>>>>>> dea775ad0d18c745b09cb81425471ec29f701b4a +} \ No newline at end of file From 41032500d0b90ee736d97295e4638f906d2260c3 Mon Sep 17 00:00:00 2001 From: Xiaoxi Wang Date: Wed, 20 Jul 2022 14:47:54 -0700 Subject: [PATCH 4/6] only do check after bootstraping test --- fdbserver/DataDistribution.actor.cpp | 18 +++++++++++------- fdbserver/DataDistributionTracker.actor.cpp | 8 +++++++- .../include/fdbserver/DataDistribution.actor.h | 4 ++++ 3 files changed, 22 insertions(+), 8 deletions(-) diff --git a/fdbserver/DataDistribution.actor.cpp b/fdbserver/DataDistribution.actor.cpp index d5d1d2aad2..32c55426af 100644 --- a/fdbserver/DataDistribution.actor.cpp +++ b/fdbserver/DataDistribution.actor.cpp @@ -440,7 +440,7 @@ public: return Void(); } - ACTOR static Future resumeFromInitShards(Reference self) { + ACTOR static Future resumeFromInitShards(Reference self, bool traceShard) { state int shard = 0; for (; shard < self->initData->shards.size() - 1; shard++) { const DDShardInfo& iShard = self->initData->shards[shard]; @@ -452,8 +452,8 @@ public: if (self->configuration.usableRegions > 1) { teams.push_back(ShardsAffectedByTeamFailure::Team(iShard.remoteSrc, false)); } - if (g_network->isSimulated()) { - TraceEvent("DDInitShard") + if (traceShard) { + TraceEvent(SevDebug, "DDInitShard") .detail("Keys", keys) .detail("PrimarySrc", describe(iShard.primarySrc)) .detail("RemoteSrc", describe(iShard.remoteSrc)) @@ -525,7 +525,7 @@ public: // TODO: add a test to verify the inflight relocation correctness and measure the memory usage with 4 million shards Future resumeRelocations() { ASSERT(shardsAffectedByTeamFailure); // has to be allocated - return runAfter(resumeFromInitShards(Reference::addRef(this)), + return runAfter(resumeFromInitShards(Reference::addRef(this), g_network->isSimulated()), resumeFromInitDataMoveMap(Reference::addRef(this))); } }; @@ -1472,8 +1472,10 @@ TEST_CASE("/DataDistributor/Initialization/ResumeFromShard") { self->configuration.storageTeamSize = 1; // add DDShardInfo - int shardNum = 1000000; // 2000000000; OOM - std::cout << "generating " << shardNum << "shards...\n"; + self->shardsAffectedByTeamFailure->setCheckMode( + ShardsAffectedByTeamFailure::CheckMode::ForceNoCheck); // skip check when build + int shardNum = deterministicRandom()->randomInt(1000, CLIENT_KNOBS->TOO_MANY * 5); // 2000000000; OOM + std::cout << "generating " << shardNum << " shards...\n"; for (int i = 1; i <= SERVER_KNOBS->DD_MOVE_KEYS_PARALLELISM; ++i) { self->initData->shards.emplace_back(data_distribution_test::doubleToNoLocationShardInfo(i, true)); } @@ -1482,7 +1484,7 @@ TEST_CASE("/DataDistributor/Initialization/ResumeFromShard") { } self->initData->shards.emplace_back(DDShardInfo(allKeys.end)); std::cout << "Start resuming...\n"; - wait(DataDistributor::resumeFromInitShards(self)); + wait(DataDistributor::resumeFromInitShards(self, false)); std::cout << "Start validation...\n"; auto relocateFuture = self->relocationProducer.getFuture(); for (int i = 0; i < SERVER_KNOBS->DD_MOVE_KEYS_PARALLELISM; ++i) { @@ -1496,5 +1498,7 @@ TEST_CASE("/DataDistributor/Initialization/ResumeFromShard") { ASSERT(rs.keys.begin.compare(self->initData->shards[i].key) == 0); ASSERT(rs.keys.end == self->initData->shards[i + 1].key); } + self->shardsAffectedByTeamFailure->setCheckMode(ShardsAffectedByTeamFailure::CheckMode::ForceCheck); + self->shardsAffectedByTeamFailure->check(); return Void(); } \ No newline at end of file diff --git a/fdbserver/DataDistributionTracker.actor.cpp b/fdbserver/DataDistributionTracker.actor.cpp index 6b504a6f8a..f8d5fa2352 100644 --- a/fdbserver/DataDistributionTracker.actor.cpp +++ b/fdbserver/DataDistributionTracker.actor.cpp @@ -1191,8 +1191,14 @@ void ShardsAffectedByTeamFailure::finishMove(KeyRangeRef keys) { } } +void ShardsAffectedByTeamFailure::setCheckMode(CheckMode mode) { + checkMode = mode; +} + void ShardsAffectedByTeamFailure::check() const { - if (EXPENSIVE_VALIDATION) { + if (checkMode == CheckMode::ForceNoCheck) + return; + if (EXPENSIVE_VALIDATION || checkMode == CheckMode::ForceCheck) { for (auto t = team_shards.begin(); t != team_shards.end(); ++t) { auto i = shard_teams.rangeContaining(t->second.begin); if (i->range() != t->second || !std::count(i->value().first.begin(), i->value().first.end(), t->first)) { diff --git a/fdbserver/include/fdbserver/DataDistribution.actor.h b/fdbserver/include/fdbserver/DataDistribution.actor.h index 508b7a11ab..e67b3695e0 100644 --- a/fdbserver/include/fdbserver/DataDistribution.actor.h +++ b/fdbserver/include/fdbserver/DataDistribution.actor.h @@ -269,6 +269,7 @@ class ShardsAffectedByTeamFailure : public ReferenceCounted servers; // sorted bool primary; @@ -318,6 +319,8 @@ public: void finishMove(KeyRangeRef keys); void check() const; + void setCheckMode(CheckMode); + PromiseStream restartShardTracker; private: @@ -331,6 +334,7 @@ private: } }; + CheckMode checkMode = CheckMode::Normal; KeyRangeMap, std::vector>> shard_teams; // A shard can be affected by the failure of multiple teams if it is a queued merge, or when // usable_regions > 1 From 7b8ea1d14f2f00266c6d0ceeb29c2ec0d9196cbd Mon Sep 17 00:00:00 2001 From: Xiaoxi Wang Date: Thu, 21 Jul 2022 09:19:05 -0700 Subject: [PATCH 5/6] function naming using intention rather than implementation details --- fdbserver/DataDistribution.actor.cpp | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/fdbserver/DataDistribution.actor.cpp b/fdbserver/DataDistribution.actor.cpp index 32c55426af..7d4e31ec70 100644 --- a/fdbserver/DataDistribution.actor.cpp +++ b/fdbserver/DataDistribution.actor.cpp @@ -440,7 +440,7 @@ public: return Void(); } - ACTOR static Future resumeFromInitShards(Reference self, bool traceShard) { + ACTOR static Future resumeFromShards(Reference self, bool traceShard) { state int shard = 0; for (; shard < self->initData->shards.size() - 1; shard++) { const DDShardInfo& iShard = self->initData->shards[shard]; @@ -483,7 +483,7 @@ public: return Void(); } - ACTOR static Future resumeFromInitDataMoveMap(Reference self) { + ACTOR static Future resumeFromDataMoves(Reference self) { state KeyRangeMap>::iterator it = self->initData->dataMoveMap.ranges().begin(); for (; it != self->initData->dataMoveMap.ranges().end(); ++it) { const DataMoveMetaData& meta = it.value()->meta; @@ -525,8 +525,8 @@ public: // TODO: add a test to verify the inflight relocation correctness and measure the memory usage with 4 million shards Future resumeRelocations() { ASSERT(shardsAffectedByTeamFailure); // has to be allocated - return runAfter(resumeFromInitShards(Reference::addRef(this), g_network->isSimulated()), - resumeFromInitDataMoveMap(Reference::addRef(this))); + return runAfter(resumeFromShards(Reference::addRef(this), g_network->isSimulated()), + resumeFromDataMoves(Reference::addRef(this))); } }; @@ -1484,7 +1484,7 @@ TEST_CASE("/DataDistributor/Initialization/ResumeFromShard") { } self->initData->shards.emplace_back(DDShardInfo(allKeys.end)); std::cout << "Start resuming...\n"; - wait(DataDistributor::resumeFromInitShards(self, false)); + wait(DataDistributor::resumeFromShards(self, false)); std::cout << "Start validation...\n"; auto relocateFuture = self->relocationProducer.getFuture(); for (int i = 0; i < SERVER_KNOBS->DD_MOVE_KEYS_PARALLELISM; ++i) { From 01569963da0e45f9e810e59a9157c27ff1a0dc60 Mon Sep 17 00:00:00 2001 From: Xiaoxi Wang Date: Thu, 21 Jul 2022 09:26:20 -0700 Subject: [PATCH 6/6] update comments --- fdbserver/DataDistribution.actor.cpp | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/fdbserver/DataDistribution.actor.cpp b/fdbserver/DataDistribution.actor.cpp index 7d4e31ec70..0dd21f8dca 100644 --- a/fdbserver/DataDistribution.actor.cpp +++ b/fdbserver/DataDistribution.actor.cpp @@ -483,6 +483,7 @@ public: return Void(); } + // TODO: unit test needed ACTOR static Future resumeFromDataMoves(Reference self) { state KeyRangeMap>::iterator it = self->initData->dataMoveMap.ranges().begin(); for (; it != self->initData->dataMoveMap.ranges().end(); ++it) { @@ -522,7 +523,9 @@ public: } // Resume inflight relocations from the previous DD - // TODO: add a test to verify the inflight relocation correctness and measure the memory usage with 4 million shards + // TODO: The initialDataDistribution is unused once resumeRelocations and + // DataDistributionTracker::trackInitialShards are done. In the future, we can release the object to save memory + // usage if it turns out to be a problem. Future resumeRelocations() { ASSERT(shardsAffectedByTeamFailure); // has to be allocated return runAfter(resumeFromShards(Reference::addRef(this), g_network->isSimulated()),