Make range assigner boundary changes fully synchronous to ensure no races with actors reading assignments

Josh Slocum 2022-03-14 14:39:00 -05:00
parent 98eeaac1b7
commit 8d88b7ca41
2 changed files with 60 additions and 37 deletions


@@ -7596,7 +7596,7 @@ Future<Void> ChangeFeedData::whenAtLeast(Version version) {
     return changeFeedWhenAtLatest(Reference<ChangeFeedData>::addRef(this), version);
 }
-#define DEBUG_CF_CLIENT_TRACE true
+#define DEBUG_CF_CLIENT_TRACE false
 ACTOR Future<Void> partialChangeFeedStream(StorageServerInterface interf,
                                            PromiseStream<Standalone<MutationsAndVersionRef>> results,


@@ -342,6 +342,8 @@ ACTOR Future<UID> pickWorkerForAssign(Reference<BlobManagerData> bmData) {
         }
         bmData->restartRecruiting.trigger();
         wait(bmData->recruitingStream.onChange() || bmData->foundBlobWorkers.getFuture());
+        // FIXME: may want to have some buffer here so zero-worker recruiting case doesn't assign every single pending
+        // range to the first worker recruited
     }
     int minGranulesAssigned = INT_MAX;
@@ -374,8 +376,23 @@ ACTOR Future<UID> pickWorkerForAssign(Reference<BlobManagerData> bmData) {
 ACTOR Future<Void> doRangeAssignment(Reference<BlobManagerData> bmData,
                                      RangeAssignment assignment,
-                                     UID workerID,
+                                     Optional<UID> workerID,
                                      int64_t seqNo) {
+    // WorkerId is set, except in case of assigning to any worker. Then we pick the worker to assign to in here
+    // inject delay into range assignments
+    if (BUGGIFY_WITH_PROB(0.05)) {
+        wait(delay(deterministicRandom()->random01()));
+    }
+    if (!workerID.present()) {
+        ASSERT(assignment.isAssign);
+        UID _workerId = wait(pickWorkerForAssign(bmData));
+        if (BM_DEBUG) {
+            fmt::print("Chose BW {0} for seqno {1} in BM {2}\n", _workerId.toString(), seqNo, bmData->epoch);
+        }
+        workerID = _workerId;
+    }
     if (BM_DEBUG) {
         fmt::print("BM {0} {1} range [{2} - {3}) @ ({4}, {5}) to {6}\n",
@@ -385,7 +402,7 @@ ACTOR Future<Void> doRangeAssignment(Reference<BlobManagerData> bmData,
                    assignment.keyRange.end.printable(),
                    bmData->epoch,
                    seqNo,
-                   workerID.toString());
+                   workerID.get().toString());
     }
     try {
@@ -401,10 +418,10 @@ ACTOR Future<Void> doRangeAssignment(Reference<BlobManagerData> bmData,
             req.type = assignment.assign.get().type;
             // if that worker isn't alive anymore, add the range back into the stream
-            if (bmData->workersById.count(workerID) == 0) {
+            if (bmData->workersById.count(workerID.get()) == 0) {
                 throw no_more_servers();
             }
-            wait(bmData->workersById[workerID].assignBlobRangeRequest.getReply(req));
+            wait(bmData->workersById[workerID.get()].assignBlobRangeRequest.getReply(req));
         } else {
             ASSERT(!assignment.assign.present());
             ASSERT(assignment.revoke.present());
@@ -417,8 +434,8 @@ ACTOR Future<Void> doRangeAssignment(Reference<BlobManagerData> bmData,
             req.dispose = assignment.revoke.get().dispose;
             // if that worker isn't alive anymore, this is a noop
-            if (bmData->workersById.count(workerID)) {
-                wait(bmData->workersById[workerID].revokeBlobRangeRequest.getReply(req));
+            if (bmData->workersById.count(workerID.get())) {
+                wait(bmData->workersById[workerID.get()].revokeBlobRangeRequest.getReply(req));
             } else {
                 return Void();
             }
@@ -443,7 +460,7 @@ ACTOR Future<Void> doRangeAssignment(Reference<BlobManagerData> bmData,
                            bmData->epoch,
                            assignment.keyRange.begin.printable(),
                            assignment.keyRange.end.printable(),
-                           workerID.toString());
+                           workerID.get().toString());
             }
             if (bmData->doLockCheck.canBeSet()) {
                 bmData->doLockCheck.send(Void());
@@ -472,7 +489,7 @@ ACTOR Future<Void> doRangeAssignment(Reference<BlobManagerData> bmData,
                        e.name(),
                        assignment.keyRange.begin.printable(),
                        assignment.keyRange.end.printable(),
-                       workerID.toString());
+                       workerID.get().toString());
         }
         // re-send revoke to queue to handle range being un-assigned from that worker before the new one
@@ -518,10 +535,7 @@ ACTOR Future<Void> doRangeAssignment(Reference<BlobManagerData> bmData,
 ACTOR Future<Void> rangeAssigner(Reference<BlobManagerData> bmData) {
     loop {
-        // inject delay into range assignments
-        if (BUGGIFY_WITH_PROB(0.05)) {
-            wait(delay(deterministicRandom()->random01()));
-        }
         state RangeAssignment assignment = waitNext(bmData->rangesToAssign.getFuture());
         state int64_t seqNo = bmData->seqNo;
         bmData->seqNo++;
@@ -568,22 +582,22 @@ ACTOR Future<Void> rangeAssigner(Reference<BlobManagerData> bmData) {
                                bmData->id.toString());
                 }
                 workerId = assignment.worker.get();
+                bmData->workerAssignments.insert(assignment.keyRange, workerId);
+                bmData->assignsInProgress.insert(assignment.keyRange,
+                                                 doRangeAssignment(bmData, assignment, workerId, seqNo));
             } else {
-                UID _workerId = wait(pickWorkerForAssign(bmData));
-                if (BM_DEBUG) {
-                    fmt::print("Chose BW {0} for seqno {1} in BM {2}\n", _workerId.toString(), seqNo, bmData->epoch);
-                }
-                workerId = _workerId;
+                // Ensure the key boundaries are updated before we pick a worker
+                bmData->workerAssignments.insert(assignment.keyRange, UID());
+                bmData->assignsInProgress.insert(assignment.keyRange,
+                                                 doRangeAssignment(bmData, assignment, Optional<UID>(), seqNo));
             }
-            bmData->workerAssignments.insert(assignment.keyRange, workerId);
             // If we know about the worker and this is not a continue, then this is a new range for the worker
             if (bmData->workerStats.count(workerId) && assignment.assign.get().type != AssignRequestType::Continue) {
                 bmData->workerStats[workerId].numGranulesAssigned += 1;
             }
-            bmData->assignsInProgress.insert(assignment.keyRange,
-                                             doRangeAssignment(bmData, assignment, workerId, seqNo));
         } else {
             if (assignment.worker.present()) {
                 // revoke this specific range from this specific worker. Either part of recovery or failing a worker
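
For contrast with the deleted branch: the old code waited on pickWorkerForAssign before touching workerAssignments, so any reader scheduled during that wait observed boundaries that did not yet include the pending assignment. The fixed ordering, in the same illustrative model as above:

    // BEFORE (racy):  assignment arrives -> wait(pickWorkerForAssign) -> boundary inserted
    // AFTER  (fixed): assignment arrives -> boundary inserted with placeholder -> deferred pick
    void rangeAssignerStep(const std::string& rangeBegin, bool workerSpecified, WorkerId chosen) {
        if (workerSpecified) {
            workerAssignments[rangeBegin] = chosen;       // boundary + owner in one synchronous step
        } else {
            workerAssignments[rangeBegin] = kUnsetWorker; // boundary now, owner filled in later
            deferredWork.push_back(
                [rangeBegin] { workerAssignments[rangeBegin] = pickWorkerForAssign(); });
        }
    }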
@@ -778,7 +792,7 @@ ACTOR Future<Void> monitorClientRanges(Reference<BlobManagerData> bmData) {
                 ra.assign = RangeAssignmentData(); // type=normal
                 bmData->rangesToAssign.send(ra);
             }
-            wait(bmData->rangesToAssign.onEmpty());
+            ASSERT(bmData->rangesToAssign.isEmpty());
         }
         lastChangeKeyValue =
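
The switch from wait(onEmpty()) to ASSERT(isEmpty()) relies on Flow's single-threaded execution: when an actor is already blocked in waitNext, send() runs its continuation before returning, and since rangeAssigner no longer waits before updating boundaries (the BUGGIFY delay moved into doRangeAssignment), every send is fully consumed by the time the send loop finishes. A compilable model of that property follows; SyncStream is a hypothetical stand-in for PromiseStream, not FDB code:

    #include <cassert>
    #include <deque>
    #include <functional>

    // Models a stream whose consumer runs synchronously inside send()
    // whenever it is already parked on waitNext, as Flow futures do.
    template <typename T>
    struct SyncStream {
        std::deque<T> queue;
        std::function<void(T)> consumer; // set when a consumer is "blocked" on waitNext
        void send(T v) {
            if (consumer) consumer(std::move(v)); // delivered before send() returns
            else queue.push_back(std::move(v));
        }
        bool isEmpty() const { return queue.empty(); }
    };

    int main() {
        SyncStream<int> rangesToAssign;
        rangesToAssign.consumer = [](int) { /* update boundaries; no waiting here */ };
        for (int i = 0; i < 3; i++) rangesToAssign.send(i);
        assert(rangesToAssign.isEmpty()); // mirrors the new ASSERT in monitorClientRanges
    }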
@@ -956,6 +970,23 @@ ACTOR Future<Void> maybeSplitRange(Reference<BlobManagerData> bmData,
                                                   existingState.size() + 2));
     // +2 because this is boundaries and existingState was granules, and to ensure it doesn't set more
     ASSERT(!existingBoundaries.more);
+    // TODO remove debugging printf
+    if (existingBoundaries.size() != existingState.size() + 1) {
+        printf("DBG: EB=%d, ES=%d for [%s - %s)\n",
+               existingBoundaries.size(),
+               existingState.size(),
+               granuleRange.begin.printable().c_str(),
+               granuleRange.end.printable().c_str());
+        printf("Boundaries:\n");
+        for (auto& it : existingBoundaries) {
+            printf(" %s\n", it.key.removePrefix(blobGranuleMappingKeys.begin).printable().c_str());
+        }
+        printf("State:\n");
+        for (auto& it : existingState) {
+            std::pair<UID, UID> k = decodeBlobGranuleSplitKey(it.key);
+            printf(" %s\n", k.second.toString().substr(0, 6).c_str());
+        }
+    }
     ASSERT(existingBoundaries.size() == existingState.size() + 1);
     newRanges.clear();
     newRanges.arena().dependsOn(existingBoundaries.arena());
@@ -1100,18 +1131,9 @@ ACTOR Future<Void> maybeSplitRange(Reference<BlobManagerData> bmData,
         bmData->rangesToAssign.send(raAssignSplit);
     }
-    if (BM_DEBUG) {
-        fmt::print(
-            "Splitting range [{0} - {1}) into {2} granules @ {3} sent assignments, waiting for them to be processed:\n",
-            granuleRange.begin.printable(),
-            granuleRange.end.printable(),
-            newRanges.size() - 1,
-            latestVersion);
-    }
     // Ensure the new assignments actually got processed and the split boundaries are reflected in the granule mapping
     // before returning. This prevents a race with a subsequent split evaluation
-    wait(bmData->rangesToAssign.onEmpty());
+    ASSERT(bmData->rangesToAssign.isEmpty());
     if (BM_DEBUG) {
         fmt::print("Splitting range [{0} - {1}) into {2} granules @ {3} got assignments processed\n",
@@ -1221,6 +1243,8 @@ ACTOR Future<Void> killBlobWorker(Reference<BlobManagerData> bmData, BlobWorkerI
         bmData->rangesToAssign.send(raAssign);
     }
+    ASSERT(bmData->rangesToAssign.isEmpty());
     // Send halt to blob worker, with no expectation of hearing back
     if (BM_DEBUG) {
         fmt::print("Sending halt to BW {}\n", bwId.toString());
@@ -1229,10 +1253,7 @@ ACTOR Future<Void> killBlobWorker(Reference<BlobManagerData> bmData, BlobWorkerI
     // wait for blob worker to be removed from DB and in-memory mapping to have reassigned all shards from this worker
     // before removing it from deadWorkers, to avoid a race with checkBlobWorkerList
-    wait(deregister && bmData->rangesToAssign.onEmpty());
-    // delay(0) after onEmpty to yield back to the range assigner on the final pop to ensure it gets processed before
-    // deadWorkers.erase
-    wait(delay(0));
+    wait(deregister);
     // restart recruiting to replace the dead blob worker
     bmData->restartRecruiting.trigger();
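
The deleted delay(0) dance existed only to let the range assigner finish its final pop before deadWorkers.erase ran. With synchronous consumption there is no in-flight pop left to wait for, so waiting on deregister alone suffices. In the SyncStream model above:

    SyncStream<int> revokes;
    revokes.consumer = [](int) { /* reassign the range */ };
    revokes.send(1);           // consumer already ran inside send()
    assert(revokes.isEmpty()); // safe to erase from deadWorkers immediately after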
@@ -2686,9 +2707,12 @@ ACTOR Future<Void> blobManager(BlobManagerInterface bmInterf,
     self->epoch = epoch;
+    // start rangeAssigner first since other actors can send messages to it
+    self->addActor.send(rangeAssigner(self));
+    // although we start the recruiter, we wait until existing workers are ack'd
     auto recruitBlobWorker = IAsyncListener<RequestStream<RecruitBlobWorkerRequest>>::create(
         dbInfo, [](auto const& info) { return info.clusterInterface.recruitBlobWorker; });
     self->addActor.send(blobWorkerRecruiter(self, recruitBlobWorker));
     // we need to recover the old blob manager's state (e.g. granule assignments) before
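
This startup reordering is what makes the isEmpty() asserts safe: the consumer must already be parked on rangesToAssign before any other actor (recruiter, recovery, monitorClientRanges) sends to it, otherwise sends buffer in the stream and the invariant breaks. Again in the illustrative model:

    SyncStream<int> s;
    s.send(1);            // no consumer registered yet: the value buffers
    assert(!s.isEmpty()); // the new ASSERTs would fire in exactly this situation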
@@ -2697,7 +2721,6 @@ ACTOR Future<Void> blobManager(BlobManagerInterface bmInterf,
     self->addActor.send(doLockChecks(self));
     self->addActor.send(monitorClientRanges(self));
-    self->addActor.send(rangeAssigner(self));
     self->addActor.send(monitorPruneKeys(self));
     if (BUGGIFY) {