From 8d88b7ca41c50ead42f9333b30c5050f971b1d4b Mon Sep 17 00:00:00 2001 From: Josh Slocum Date: Mon, 14 Mar 2022 14:39:00 -0500 Subject: [PATCH] making range assigner boundary changes fully synchronous to ensure no races with things reading assignments --- fdbclient/NativeAPI.actor.cpp | 2 +- fdbserver/BlobManager.actor.cpp | 95 ++++++++++++++++++++------------- 2 files changed, 60 insertions(+), 37 deletions(-) diff --git a/fdbclient/NativeAPI.actor.cpp b/fdbclient/NativeAPI.actor.cpp index e82c767dcd..07d983cd64 100644 --- a/fdbclient/NativeAPI.actor.cpp +++ b/fdbclient/NativeAPI.actor.cpp @@ -7596,7 +7596,7 @@ Future ChangeFeedData::whenAtLeast(Version version) { return changeFeedWhenAtLatest(Reference::addRef(this), version); } -#define DEBUG_CF_CLIENT_TRACE true +#define DEBUG_CF_CLIENT_TRACE false ACTOR Future partialChangeFeedStream(StorageServerInterface interf, PromiseStream> results, diff --git a/fdbserver/BlobManager.actor.cpp b/fdbserver/BlobManager.actor.cpp index 9a2e1cdb69..712e069dee 100644 --- a/fdbserver/BlobManager.actor.cpp +++ b/fdbserver/BlobManager.actor.cpp @@ -342,6 +342,8 @@ ACTOR Future pickWorkerForAssign(Reference bmData) { } bmData->restartRecruiting.trigger(); wait(bmData->recruitingStream.onChange() || bmData->foundBlobWorkers.getFuture()); + // FIXME: may want to have some buffer here so zero-worker recruiting case doesn't assign every single pending + // range to the first worker recruited } int minGranulesAssigned = INT_MAX; @@ -374,8 +376,23 @@ ACTOR Future pickWorkerForAssign(Reference bmData) { ACTOR Future doRangeAssignment(Reference bmData, RangeAssignment assignment, - UID workerID, + Optional workerID, int64_t seqNo) { + // WorkerId is set, except in case of assigning to any worker. Then we pick the worker to assign to in here + + // inject delay into range assignments + if (BUGGIFY_WITH_PROB(0.05)) { + wait(delay(deterministicRandom()->random01())); + } + + if (!workerID.present()) { + ASSERT(assignment.isAssign); + UID _workerId = wait(pickWorkerForAssign(bmData)); + if (BM_DEBUG) { + fmt::print("Chose BW {0} for seqno {1} in BM {2}\n", _workerId.toString(), seqNo, bmData->epoch); + } + workerID = _workerId; + } if (BM_DEBUG) { fmt::print("BM {0} {1} range [{2} - {3}) @ ({4}, {5}) to {6}\n", @@ -385,7 +402,7 @@ ACTOR Future doRangeAssignment(Reference bmData, assignment.keyRange.end.printable(), bmData->epoch, seqNo, - workerID.toString()); + workerID.get().toString()); } try { @@ -401,10 +418,10 @@ ACTOR Future doRangeAssignment(Reference bmData, req.type = assignment.assign.get().type; // if that worker isn't alive anymore, add the range back into the stream - if (bmData->workersById.count(workerID) == 0) { + if (bmData->workersById.count(workerID.get()) == 0) { throw no_more_servers(); } - wait(bmData->workersById[workerID].assignBlobRangeRequest.getReply(req)); + wait(bmData->workersById[workerID.get()].assignBlobRangeRequest.getReply(req)); } else { ASSERT(!assignment.assign.present()); ASSERT(assignment.revoke.present()); @@ -417,8 +434,8 @@ ACTOR Future doRangeAssignment(Reference bmData, req.dispose = assignment.revoke.get().dispose; // if that worker isn't alive anymore, this is a noop - if (bmData->workersById.count(workerID)) { - wait(bmData->workersById[workerID].revokeBlobRangeRequest.getReply(req)); + if (bmData->workersById.count(workerID.get())) { + wait(bmData->workersById[workerID.get()].revokeBlobRangeRequest.getReply(req)); } else { return Void(); } @@ -443,7 +460,7 @@ ACTOR Future doRangeAssignment(Reference bmData, bmData->epoch, assignment.keyRange.begin.printable(), assignment.keyRange.end.printable(), - workerID.toString()); + workerID.get().toString()); } if (bmData->doLockCheck.canBeSet()) { bmData->doLockCheck.send(Void()); @@ -472,7 +489,7 @@ ACTOR Future doRangeAssignment(Reference bmData, e.name(), assignment.keyRange.begin.printable(), assignment.keyRange.end.printable(), - workerID.toString()); + workerID.get().toString()); } // re-send revoke to queue to handle range being un-assigned from that worker before the new one @@ -518,10 +535,7 @@ ACTOR Future doRangeAssignment(Reference bmData, ACTOR Future rangeAssigner(Reference bmData) { loop { - // inject delay into range assignments - if (BUGGIFY_WITH_PROB(0.05)) { - wait(delay(deterministicRandom()->random01())); - } + state RangeAssignment assignment = waitNext(bmData->rangesToAssign.getFuture()); state int64_t seqNo = bmData->seqNo; bmData->seqNo++; @@ -568,22 +582,22 @@ ACTOR Future rangeAssigner(Reference bmData) { bmData->id.toString()); } workerId = assignment.worker.get(); + + bmData->workerAssignments.insert(assignment.keyRange, workerId); + bmData->assignsInProgress.insert(assignment.keyRange, + doRangeAssignment(bmData, assignment, workerId, seqNo)); } else { - UID _workerId = wait(pickWorkerForAssign(bmData)); - if (BM_DEBUG) { - fmt::print("Chose BW {0} for seqno {1} in BM {2}\n", _workerId.toString(), seqNo, bmData->epoch); - } - workerId = _workerId; + // Ensure the key boundaries are updated before we pick a worker + bmData->workerAssignments.insert(assignment.keyRange, UID()); + bmData->assignsInProgress.insert(assignment.keyRange, + doRangeAssignment(bmData, assignment, Optional(), seqNo)); } - bmData->workerAssignments.insert(assignment.keyRange, workerId); // If we know about the worker and this is not a continue, then this is a new range for the worker if (bmData->workerStats.count(workerId) && assignment.assign.get().type != AssignRequestType::Continue) { bmData->workerStats[workerId].numGranulesAssigned += 1; } - bmData->assignsInProgress.insert(assignment.keyRange, - doRangeAssignment(bmData, assignment, workerId, seqNo)); } else { if (assignment.worker.present()) { // revoke this specific range from this specific worker. Either part of recovery or failing a worker @@ -778,7 +792,7 @@ ACTOR Future monitorClientRanges(Reference bmData) { ra.assign = RangeAssignmentData(); // type=normal bmData->rangesToAssign.send(ra); } - wait(bmData->rangesToAssign.onEmpty()); + ASSERT(bmData->rangesToAssign.isEmpty()); } lastChangeKeyValue = @@ -956,6 +970,23 @@ ACTOR Future maybeSplitRange(Reference bmData, existingState.size() + 2)); // +2 because this is boundaries and existingState was granules, and to ensure it doesn't set more ASSERT(!existingBoundaries.more); + // TODO remove debugging printf + if (existingBoundaries.size() != existingState.size() + 1) { + printf("DBG: EB=%d, ES=%d for [%s - %s)\n", + existingBoundaries.size(), + existingState.size(), + granuleRange.begin.printable().c_str(), + granuleRange.end.printable().c_str()); + printf("Boundaries:\n"); + for (auto& it : existingBoundaries) { + printf(" %s\n", it.key.removePrefix(blobGranuleMappingKeys.begin).printable().c_str()); + } + printf("State:\n"); + for (auto& it : existingState) { + std::pair k = decodeBlobGranuleSplitKey(it.key); + printf(" %s\n", k.second.toString().substr(0, 6).c_str()); + } + } ASSERT(existingBoundaries.size() == existingState.size() + 1); newRanges.clear(); newRanges.arena().dependsOn(existingBoundaries.arena()); @@ -1100,18 +1131,9 @@ ACTOR Future maybeSplitRange(Reference bmData, bmData->rangesToAssign.send(raAssignSplit); } - if (BM_DEBUG) { - fmt::print( - "Splitting range [{0} - {1}) into {2} granules @ {3} sent assignments, waiting for them to be processed:\n", - granuleRange.begin.printable(), - granuleRange.end.printable(), - newRanges.size() - 1, - latestVersion); - } - // Ensure the new assignments actually got processed and the split boundaries are reflected in the granule mapping // before returning. This prevents a race with a subsequent split evaluation - wait(bmData->rangesToAssign.onEmpty()); + ASSERT(bmData->rangesToAssign.isEmpty()); if (BM_DEBUG) { fmt::print("Splitting range [{0} - {1}) into {2} granules @ {3} got assignments processed\n", @@ -1221,6 +1243,8 @@ ACTOR Future killBlobWorker(Reference bmData, BlobWorkerI bmData->rangesToAssign.send(raAssign); } + ASSERT(bmData->rangesToAssign.isEmpty()); + // Send halt to blob worker, with no expectation of hearing back if (BM_DEBUG) { fmt::print("Sending halt to BW {}\n", bwId.toString()); @@ -1229,10 +1253,7 @@ ACTOR Future killBlobWorker(Reference bmData, BlobWorkerI // wait for blob worker to be removed from DB and in-memory mapping to have reassigned all shards from this worker // before removing it from deadWorkers, to avoid a race with checkBlobWorkerList - wait(deregister && bmData->rangesToAssign.onEmpty()); - // delay(0) after onEmpty to yield back to the range assigner on the final pop to ensure it gets processed before - // deadWorkers.erase - wait(delay(0)); + wait(deregister); // restart recruiting to replace the dead blob worker bmData->restartRecruiting.trigger(); @@ -2686,9 +2707,12 @@ ACTOR Future blobManager(BlobManagerInterface bmInterf, self->epoch = epoch; + // start rangeAssigner first since other actors can send messages to it + self->addActor.send(rangeAssigner(self)); // although we start the recruiter, we wait until existing workers are ack'd auto recruitBlobWorker = IAsyncListener>::create( dbInfo, [](auto const& info) { return info.clusterInterface.recruitBlobWorker; }); + self->addActor.send(blobWorkerRecruiter(self, recruitBlobWorker)); // we need to recover the old blob manager's state (e.g. granule assignments) before @@ -2697,7 +2721,6 @@ ACTOR Future blobManager(BlobManagerInterface bmInterf, self->addActor.send(doLockChecks(self)); self->addActor.send(monitorClientRanges(self)); - self->addActor.send(rangeAssigner(self)); self->addActor.send(monitorPruneKeys(self)); if (BUGGIFY) {