Merge pull request #7621 from sfc-gh-xwang/feature/dd-refactor-incremental

[DD Testability] Add unit test for resuming inflight relocations
Xiaoxi Wang 2022-07-23 22:14:45 -07:00 committed by GitHub
commit 7b6f1ca712
4 changed files with 90 additions and 16 deletions


@@ -41,6 +41,7 @@
#include "fdbserver/TenantCache.h"
#include "fdbserver/TLogInterface.h"
#include "fdbserver/WaitFailure.h"
#include "fdbserver/workloads/workloads.actor.h"
#include "flow/ActorCollection.h"
#include "flow/Arena.h"
#include "flow/BooleanParam.h"
@@ -290,6 +291,7 @@ ACTOR Future<Void> pollMoveKeysLock(Database cx, MoveKeysLock lock, const DDEnab
}
struct DataDistributor : NonCopyable, ReferenceCounted<DataDistributor> {
public:
Reference<AsyncVar<ServerDBInfo> const> dbInfo;
UID ddId;
PromiseStream<Future<Void>> addActor;
@@ -311,7 +313,9 @@ struct DataDistributor : NonCopyable, ReferenceCounted<DataDistributor> {
// fully-functional.
DDTeamCollection* teamCollection;
Reference<ShardsAffectedByTeamFailure> shardsAffectedByTeamFailure;
PromiseStream<RelocateShard> relocationProducer, relocationConsumer; // comsumer is a yield stream from producer
// consumer is a yield stream from producer. The RelocateShard is pushed into relocationProducer and popped from
// relocationConsumer (by DDQueue)
PromiseStream<RelocateShard> relocationProducer, relocationConsumer;
DataDistributor(Reference<AsyncVar<ServerDBInfo> const> const& db, UID id)
: dbInfo(db), ddId(id), txnProcessor(nullptr), initialDDEventHolder(makeReference<EventCacheHolder>("InitialDD")),
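The new comment spells out the contract between the two streams: DD pushes RelocateShard requests into relocationProducer, a yield stream forwards them, and DDQueue pops them from relocationConsumer. A minimal sketch of the pop side (hypothetical actor name; assumes only flow's FutureStream/waitNext primitives, not DDQueue itself):
ACTOR Future<Void> consumeRelocationsSketch(FutureStream<RelocateShard> consumer) {
	loop {
		// Pops the next request that the producer side pushed with send().
		RelocateShard rs = waitNext(consumer);
		// ... hand rs to the relocation queue ...
	}
}
The push side is simply relocationProducer.send(rs); the consumer stream is obtained via relocationConsumer.getFuture().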
@@ -436,11 +440,7 @@ struct DataDistributor : NonCopyable, ReferenceCounted<DataDistributor> {
return Void();
}
// Resume inflight relocations from the previous DD
// TODO: add a test to verify the inflight relocation correctness and measure the memory usage with 4 million shards
ACTOR static Future<Void> resumeRelocations(Reference<DataDistributor> self) {
ASSERT(self->shardsAffectedByTeamFailure); // has to be allocated
ACTOR static Future<Void> resumeFromShards(Reference<DataDistributor> self, bool traceShard) {
state int shard = 0;
for (; shard < self->initData->shards.size() - 1; shard++) {
const DDShardInfo& iShard = self->initData->shards[shard];
@@ -452,8 +452,8 @@ struct DataDistributor : NonCopyable, ReferenceCounted<DataDistributor> {
if (self->configuration.usableRegions > 1) {
teams.push_back(ShardsAffectedByTeamFailure::Team(iShard.remoteSrc, false));
}
if (g_network->isSimulated()) {
TraceEvent("DDInitShard")
if (traceShard) {
TraceEvent(SevDebug, "DDInitShard")
.detail("Keys", keys)
.detail("PrimarySrc", describe(iShard.primarySrc))
.detail("RemoteSrc", describe(iShard.remoteSrc))
@@ -480,7 +480,11 @@ struct DataDistributor : NonCopyable, ReferenceCounted<DataDistributor> {
wait(yield(TaskPriority::DataDistribution));
}
return Void();
}
// TODO: unit test needed
ACTOR static Future<Void> resumeFromDataMoves(Reference<DataDistributor> self) {
state KeyRangeMap<std::shared_ptr<DataMove>>::iterator it = self->initData->dataMoveMap.ranges().begin();
for (; it != self->initData->dataMoveMap.ranges().end(); ++it) {
const DataMoveMetaData& meta = it.value()->meta;
@@ -517,6 +521,16 @@ struct DataDistributor : NonCopyable, ReferenceCounted<DataDistributor> {
}
return Void();
}
// Resume inflight relocations from the previous DD
// TODO: The initialDataDistribution is unused once resumeRelocations and
// DataDistributionTracker::trackInitialShards are done. In the future, we can release the object to save memory
// usage if it turns out to be a problem.
Future<Void> resumeRelocations() {
ASSERT(shardsAffectedByTeamFailure); // has to be allocated
return runAfter(resumeFromShards(Reference<DataDistributor>::addRef(this), g_network->isSimulated()),
resumeFromDataMoves(Reference<DataDistributor>::addRef(this)));
}
};
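resumeRelocations() chains the two phases with runAfter, so the data-move resume step is not waited on until every shard from initData has been re-registered. A hedged sketch of the equivalent ordering (the real call creates both actor futures eagerly; runAfter only sequences the waits):
ACTOR Future<Void> resumeRelocationsSketch(Reference<DataDistributor> self) {
	// Phase 1: rebuild the shard -> team mappings and re-queue unfinished moves.
	wait(DataDistributor::resumeFromShards(self, g_network->isSimulated()));
	// Phase 2: resume or cancel the data moves persisted in dataMoveMap.
	wait(DataDistributor::resumeFromDataMoves(self));
	return Void();
}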
// Runs the data distribution algorithm for FDB, including the DD Queue, DD tracker, and DD team collection
@@ -565,17 +579,17 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributor> self,
state Reference<AsyncVar<bool>> processingWiggle(new AsyncVar<bool>(false));
state Promise<Void> readyToStart;
self->shardsAffectedByTeamFailure = makeReference<ShardsAffectedByTeamFailure>();
wait(DataDistributor::resumeRelocations(self));
wait(self->resumeRelocations());
std::vector<TeamCollectionInterface> tcis;
std::vector<TeamCollectionInterface> tcis; // primary and remote region interface
Reference<AsyncVar<bool>> anyZeroHealthyTeams; // true if primary or remote has zero healthy team
std::vector<Reference<AsyncVar<bool>>> zeroHealthyTeams; // primary and remote
Reference<AsyncVar<bool>> anyZeroHealthyTeams;
std::vector<Reference<AsyncVar<bool>>> zeroHealthyTeams;
tcis.push_back(TeamCollectionInterface());
zeroHealthyTeams.push_back(makeReference<AsyncVar<bool>>(true));
int storageTeamSize = self->configuration.storageTeamSize;
std::vector<Future<Void>> actors;
std::vector<Future<Void>> actors; // the container of ACTORs
if (self->configuration.usableRegions > 1) {
tcis.push_back(TeamCollectionInterface());
storageTeamSize = 2 * self->configuration.storageTeamSize;
@@ -1379,6 +1393,16 @@ static Future<ErrorOr<Void>> badTestFuture(double duration, Error e) {
return tag(delay(duration), ErrorOr<Void>(e));
}
inline DDShardInfo doubleToNoLocationShardInfo(double d, bool hasDest) {
DDShardInfo res(doubleToTestKey(d), anonymousShardId, anonymousShardId);
res.primarySrc.emplace_back((uint64_t)d, 0);
if (hasDest) {
res.primaryDest.emplace_back((uint64_t)d + 1, 0);
res.hasDest = true;
}
return res;
}
} // namespace data_distribution_test
TEST_CASE("/DataDistribution/WaitForMost") {
@@ -1440,3 +1464,44 @@ TEST_CASE("/DataDistributor/StorageWiggler/Order") {
ASSERT(!wiggler.getNextServerId().present());
return Void();
}
TEST_CASE("/DataDistributor/Initialization/ResumeFromShard") {
state Reference<AsyncVar<ServerDBInfo> const> dbInfo;
state Reference<DataDistributor> self(new DataDistributor(dbInfo, UID()));
self->shardsAffectedByTeamFailure = makeReference<ShardsAffectedByTeamFailure>();
self->initData = makeReference<InitialDataDistribution>();
self->configuration.usableRegions = 1;
self->configuration.storageTeamSize = 1;
// add DDShardInfo
self->shardsAffectedByTeamFailure->setCheckMode(
ShardsAffectedByTeamFailure::CheckMode::ForceNoCheck); // skip the expensive check while bulk-building the map
int shardNum = deterministicRandom()->randomInt(1000, CLIENT_KNOBS->TOO_MANY * 5); // 2000000000 shards would OOM
std::cout << "generating " << shardNum << " shards...\n";
for (int i = 1; i <= SERVER_KNOBS->DD_MOVE_KEYS_PARALLELISM; ++i) {
self->initData->shards.emplace_back(data_distribution_test::doubleToNoLocationShardInfo(i, true));
}
for (int i = SERVER_KNOBS->DD_MOVE_KEYS_PARALLELISM + 1; i <= shardNum; ++i) {
self->initData->shards.emplace_back(data_distribution_test::doubleToNoLocationShardInfo(i, false));
}
self->initData->shards.emplace_back(DDShardInfo(allKeys.end));
std::cout << "Start resuming...\n";
wait(DataDistributor::resumeFromShards(self, false));
std::cout << "Start validation...\n";
auto relocateFuture = self->relocationProducer.getFuture();
for (int i = 0; i < SERVER_KNOBS->DD_MOVE_KEYS_PARALLELISM; ++i) {
ASSERT(relocateFuture.isReady());
auto rs = relocateFuture.pop();
ASSERT(rs.isRestore() == false);
ASSERT(rs.cancelled == false);
ASSERT(rs.dataMoveId == anonymousShardId);
ASSERT(rs.priority == SERVER_KNOBS->PRIORITY_RECOVER_MOVE);
// std::cout << rs.keys.begin.toString() << " " << self->initData->shards[i].key.toString() << " \n";
ASSERT(rs.keys.begin.compare(self->initData->shards[i].key) == 0);
ASSERT(rs.keys.end == self->initData->shards[i + 1].key);
}
self->shardsAffectedByTeamFailure->setCheckMode(ShardsAffectedByTeamFailure::CheckMode::ForceCheck);
self->shardsAffectedByTeamFailure->check();
return Void();
}


@@ -813,7 +813,6 @@ struct DDQueueData {
}
ACTOR static Future<Void> getSourceServersForRange(DDQueueData* self,
Database cx,
RelocateData input,
PromiseStream<RelocateData> output,
Reference<FlowLock> fetchLock) {
@@ -929,7 +928,7 @@ struct DDQueueData {
fetchingSourcesQueue.insert(rrs);
getSourceActors.insert(
rrs.keys, getSourceServersForRange(this, cx, rrs, fetchSourceServersComplete, fetchSourceLock));
rrs.keys, getSourceServersForRange(this, rrs, fetchSourceServersComplete, fetchSourceLock));
} else {
RelocateData newData(rrs);
newData.keys = affectedQueuedItems[r];


@@ -1189,8 +1189,14 @@ void ShardsAffectedByTeamFailure::finishMove(KeyRangeRef keys) {
}
}
void ShardsAffectedByTeamFailure::setCheckMode(CheckMode mode) {
checkMode = mode;
}
void ShardsAffectedByTeamFailure::check() const {
if (EXPENSIVE_VALIDATION) {
if (checkMode == CheckMode::ForceNoCheck)
return;
if (EXPENSIVE_VALIDATION || checkMode == CheckMode::ForceCheck) {
for (auto t = team_shards.begin(); t != team_shards.end(); ++t) {
auto i = shard_teams.rangeContaining(t->second.begin);
if (i->range() != t->second || !std::count(i->value().first.begin(), i->value().first.end(), t->first)) {
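The effect of the new CheckMode: ForceNoCheck short-circuits check() entirely, while ForceCheck runs the full validation even in builds without EXPENSIVE_VALIDATION. A minimal usage sketch of the pattern the unit test above relies on:
auto shardMap = makeReference<ShardsAffectedByTeamFailure>();
shardMap->setCheckMode(ShardsAffectedByTeamFailure::CheckMode::ForceNoCheck);
// ... bulk-build the shard -> team mapping without per-step validation ...
shardMap->setCheckMode(ShardsAffectedByTeamFailure::CheckMode::ForceCheck);
shardMap->check(); // single full validation pass over team_shards / shard_teams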


@@ -286,6 +286,7 @@ class ShardsAffectedByTeamFailure : public ReferenceCounted<ShardsAffectedByTeam
public:
ShardsAffectedByTeamFailure() {}
enum class CheckMode { Normal = 0, ForceCheck, ForceNoCheck };
struct Team {
std::vector<UID> servers; // sorted
bool primary;
@@ -335,6 +336,8 @@ public:
void finishMove(KeyRangeRef keys);
void check() const;
void setCheckMode(CheckMode);
PromiseStream<KeyRange> restartShardTracker;
private:
@@ -348,6 +351,7 @@ private:
}
};
CheckMode checkMode = CheckMode::Normal;
KeyRangeMap<std::pair<std::vector<Team>, std::vector<Team>>>
shard_teams; // A shard can be affected by the failure of multiple teams if it is a queued merge, or when
// usable_regions > 1
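To make the comment concrete, a hedged illustration of how one range can carry teams from both regions (hypothetical UIDs and keys; assumes Team can be constructed from a sorted server list plus the primary flag):
// With usable_regions > 1, a single shard maps to a primary team and a remote
// team; losing either team affects the same key range.
std::vector<Team> primaryTeams = { Team({ UID(1, 0), UID(2, 0) }, true) };
std::vector<Team> remoteTeams = { Team({ UID(3, 0), UID(4, 0) }, false) };
shard_teams.insert(KeyRangeRef(LiteralStringRef("a"), LiteralStringRef("b")),
                   std::make_pair(primaryTeams, remoteTeams));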