smaller function

This commit is contained in:
Xiaoxi Wang 2022-07-18 14:21:50 -07:00
parent 02ab3b05ab
commit 2909614b0c
2 changed files with 21 additions and 10 deletions

View File

@ -311,7 +311,12 @@ struct DataDistributor : NonCopyable, ReferenceCounted<DataDistributor> {
// fully-functional.
DDTeamCollection* teamCollection;
Reference<ShardsAffectedByTeamFailure> shardsAffectedByTeamFailure;
PromiseStream<RelocateShard> relocationProducer, relocationConsumer; // comsumer is a yield stream from producer
// consumer is a yield stream from producer. The RelocateShard is pushed into relocationProducer and popped from
// relocationConsumer (by DDQueue)
PromiseStream<RelocateShard> relocationProducer, relocationConsumer;
std::vector<TeamCollectionInterface> tcis; // primary and remote region interface
Reference<AsyncVar<bool>> anyZeroHealthyTeams; // true if primary or remote has zero healthy team
std::vector<Reference<AsyncVar<bool>>> zeroHealthyTeams; // primary and remote
DataDistributor(Reference<AsyncVar<ServerDBInfo> const> const& db, UID id)
: dbInfo(db), ddId(id), txnProcessor(nullptr), initialDDEventHolder(makeReference<EventCacheHolder>("InitialDD")),
@ -436,11 +441,7 @@ struct DataDistributor : NonCopyable, ReferenceCounted<DataDistributor> {
return Void();
}
// Resume inflight relocations from the previous DD
// TODO: add a test to verify the inflight relocation correctness and measure the memory usage with 4 million shards
ACTOR static Future<Void> resumeRelocations(Reference<DataDistributor> self) {
ASSERT(self->shardsAffectedByTeamFailure); // has to be allocated
ACTOR static Future<Void> resumeFromInitShards(Reference<DataDistributor> self) {
state int shard = 0;
for (; shard < self->initData->shards.size() - 1; shard++) {
const DDShardInfo& iShard = self->initData->shards[shard];
@ -480,7 +481,10 @@ struct DataDistributor : NonCopyable, ReferenceCounted<DataDistributor> {
wait(yield(TaskPriority::DataDistribution));
}
return Void();
}
ACTOR static Future<Void> resumeFromInitDataMoveMap(Reference<DataDistributor> self) {
state KeyRangeMap<std::shared_ptr<DataMove>>::iterator it = self->initData->dataMoveMap.ranges().begin();
for (; it != self->initData->dataMoveMap.ranges().end(); ++it) {
const DataMoveMetaData& meta = it.value()->meta;
@ -517,6 +521,14 @@ struct DataDistributor : NonCopyable, ReferenceCounted<DataDistributor> {
}
return Void();
}
// Resume inflight relocations from the previous DD
// TODO: add a test to verify the inflight relocation correctness and measure the memory usage with 4 million shards
Future<Void> resumeRelocations() {
ASSERT(shardsAffectedByTeamFailure); // has to be allocated
return runAfter(resumeFromInitShards(Reference<DataDistributor>::addRef(this)),
resumeFromInitDataMoveMap(Reference<DataDistributor>::addRef(this)));
}
};
// Runs the data distribution algorithm for FDB, including the DD Queue, DD tracker, and DD team collection
@ -565,7 +577,7 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributor> self,
state Reference<AsyncVar<bool>> processingWiggle(new AsyncVar<bool>(false));
state Promise<Void> readyToStart;
self->shardsAffectedByTeamFailure = makeReference<ShardsAffectedByTeamFailure>();
wait(DataDistributor::resumeRelocations(self));
wait(self->resumeRelocations());
std::vector<TeamCollectionInterface> tcis;
@ -575,7 +587,7 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributor> self,
zeroHealthyTeams.push_back(makeReference<AsyncVar<bool>>(true));
int storageTeamSize = self->configuration.storageTeamSize;
std::vector<Future<Void>> actors;
std::vector<Future<Void>> actors; // the container of ACTORs
if (self->configuration.usableRegions > 1) {
tcis.push_back(TeamCollectionInterface());
storageTeamSize = 2 * self->configuration.storageTeamSize;

View File

@ -813,7 +813,6 @@ struct DDQueueData {
}
ACTOR static Future<Void> getSourceServersForRange(DDQueueData* self,
Database cx,
RelocateData input,
PromiseStream<RelocateData> output,
Reference<FlowLock> fetchLock) {
@ -929,7 +928,7 @@ struct DDQueueData {
fetchingSourcesQueue.insert(rrs);
getSourceActors.insert(
rrs.keys, getSourceServersForRange(this, cx, rrs, fetchSourceServersComplete, fetchSourceLock));
rrs.keys, getSourceServersForRange(this, rrs, fetchSourceServersComplete, fetchSourceLock));
} else {
RelocateData newData(rrs);
newData.keys = affectedQueuedItems[r];