added a disk to blob workers, and allowed blob workers to rejoin the blob manager after a reboot

This commit is contained in:
Evan Tschannen 2023-02-10 10:29:01 -08:00
parent 05a8a90830
commit 18bc099ca6
7 changed files with 707 additions and 366 deletions

View File

@ -8123,6 +8123,10 @@ ACTOR Future<Standalone<VectorRef<BlobGranuleChunkRef>>> readBlobGranulesActor(
continue;
}
workerId = decodeBlobGranuleMappingValue(blobGranuleMapping[i].value);
if (!self->trState->cx->blobWorker_interf.count(workerId)) {
throw connection_failed();
}
// prune first/last granules to requested range
if (keyRange.begin > granuleStartKey) {
granuleStartKey = keyRange.begin;

View File

@ -1043,6 +1043,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( BLOB_WORKER_DO_REJECT_WHEN_FULL, true ); if ( randomize && BUGGIFY ) BLOB_WORKER_DO_REJECT_WHEN_FULL = false;
init( BLOB_WORKER_REJECT_WHEN_FULL_THRESHOLD, 0.9 );
init( BLOB_WORKER_FORCE_FLUSH_CLEANUP_DELAY, 30.0 ); if ( randomize && BUGGIFY ) BLOB_WORKER_FORCE_FLUSH_CLEANUP_DELAY = deterministicRandom()->randomInt(0, 10) - 1;
init( BLOB_WORKER_STATEFUL, true ); //if ( randomize && BUGGIFY ) BLOB_WORKER_STATEFUL = false;
init( BLOB_MANAGER_STATUS_EXP_BACKOFF_MIN, 0.1 );
init( BLOB_MANAGER_STATUS_EXP_BACKOFF_MAX, 5.0 );

View File

@ -1010,6 +1010,7 @@ public:
bool BLOB_WORKER_DO_REJECT_WHEN_FULL;
double BLOB_WORKER_REJECT_WHEN_FULL_THRESHOLD;
double BLOB_WORKER_FORCE_FLUSH_CLEANUP_DELAY;
bool BLOB_WORKER_STATEFUL;
double BLOB_MANAGER_STATUS_EXP_BACKOFF_MIN;
double BLOB_MANAGER_STATUS_EXP_BACKOFF_MAX;

View File

@ -283,7 +283,7 @@ struct BlobManagerStats {
explicit BlobManagerStats(UID id,
double interval,
int64_t epoch,
std::unordered_map<UID, BlobWorkerInterface>* workers,
std::unordered_map<UID, Reference<AsyncVar<BlobWorkerInterface>>>* workers,
std::unordered_map<Key, bool>* mergeHardBoundaries,
std::unordered_map<Key, BlobGranuleMergeBoundary>* mergeBoundaries)
: cc("BlobManagerStats", id.toString()), granuleSplits("GranuleSplits", cc),
@ -357,7 +357,7 @@ struct BlobManagerData : NonCopyable, ReferenceCounted<BlobManagerData> {
Reference<BlobConnectionProvider> bstore;
std::unordered_map<UID, BlobWorkerInterface> workersById;
std::unordered_map<UID, Reference<AsyncVar<BlobWorkerInterface>>> workersById;
std::unordered_map<UID, BlobWorkerInfo> workerStats; // mapping between workerID -> workerStats
std::unordered_set<NetworkAddress> workerAddresses;
std::unordered_set<UID> deadWorkers;
@ -858,32 +858,39 @@ ACTOR Future<Void> doRangeAssignment(Reference<BlobManagerData> bmData,
try {
if (assignment.isAssign) {
ASSERT(assignment.assign.present());
ASSERT(!assignment.revoke.present());
loop {
ASSERT(assignment.assign.present());
ASSERT(!assignment.revoke.present());
AssignBlobRangeRequest req;
req.keyRange = KeyRangeRef(StringRef(req.arena, assignment.keyRange.begin),
StringRef(req.arena, assignment.keyRange.end));
req.managerEpoch = epoch;
req.managerSeqno = seqNo;
req.type = assignment.assign.get().type;
AssignBlobRangeRequest req;
req.keyRange = KeyRangeRef(StringRef(req.arena, assignment.keyRange.begin),
StringRef(req.arena, assignment.keyRange.end));
req.managerEpoch = epoch;
req.managerSeqno = seqNo;
req.type = assignment.assign.get().type;
// if that worker isn't alive anymore, add the range back into the stream
if (bmData->workersById.count(workerID.get()) == 0) {
throw no_more_servers();
}
state Future<Void> assignFuture = bmData->workersById[workerID.get()].assignBlobRangeRequest.getReply(req);
// if that worker isn't alive anymore, add the range back into the stream
if (bmData->workersById.count(workerID.get()) == 0) {
throw no_more_servers();
}
state Future<Void> assignFuture =
bmData->workersById[workerID.get()]->get().assignBlobRangeRequest.getReply(req);
state Future<Void> onChangeFuture = bmData->workersById[workerID.get()]->onChange();
if (BUGGIFY) {
// wait for request to actually send
wait(delay(0));
if (bmData->maybeInjectTargetedRestart()) {
throw blob_manager_replaced();
if (BUGGIFY) {
// wait for request to actually send
wait(delay(0));
if (bmData->maybeInjectTargetedRestart()) {
throw blob_manager_replaced();
}
}
choose {
when(wait(assignFuture)) { break; }
when(wait(onChangeFuture)) {}
}
}
wait(assignFuture);
if (assignment.previousFailure.present()) {
// previous assign failed and this one succeeded
--bmData->stats.blockedAssignments;
@ -891,21 +898,28 @@ ACTOR Future<Void> doRangeAssignment(Reference<BlobManagerData> bmData,
return Void();
} else {
ASSERT(!assignment.assign.present());
ASSERT(assignment.revoke.present());
loop {
ASSERT(!assignment.assign.present());
ASSERT(assignment.revoke.present());
RevokeBlobRangeRequest req;
req.keyRange = KeyRangeRef(StringRef(req.arena, assignment.keyRange.begin),
StringRef(req.arena, assignment.keyRange.end));
req.managerEpoch = epoch;
req.managerSeqno = seqNo;
req.dispose = assignment.revoke.get().dispose;
RevokeBlobRangeRequest req;
req.keyRange = KeyRangeRef(StringRef(req.arena, assignment.keyRange.begin),
StringRef(req.arena, assignment.keyRange.end));
req.managerEpoch = epoch;
req.managerSeqno = seqNo;
req.dispose = assignment.revoke.get().dispose;
// if that worker isn't alive anymore, this is a noop
if (bmData->workersById.count(workerID.get())) {
wait(bmData->workersById[workerID.get()].revokeBlobRangeRequest.getReply(req));
} else {
return Void();
// if that worker isn't alive anymore, this is a noop
if (bmData->workersById.count(workerID.get())) {
choose {
when(wait(bmData->workersById[workerID.get()]->get().revokeBlobRangeRequest.getReply(req))) {
break;
}
when(wait(bmData->workersById[workerID.get()]->onChange())) {}
}
} else {
return Void();
}
}
}
} catch (Error& e) {
@ -2799,6 +2813,9 @@ ACTOR Future<Void> killBlobWorker(Reference<BlobManagerData> bmData, BlobWorkerI
TraceEvent("KillBlobWorker", bmData->id).detail("Epoch", bmData->epoch).detail("WorkerId", bwId);
// Remove blob worker from persisted list of blob workers
Future<Void> deregister = deregisterBlobWorker(bmData, bwInterf);
if (registered) {
bmData->deadWorkers.insert(bwId);
bmData->workerStats.erase(bwId);
@ -2806,9 +2823,6 @@ ACTOR Future<Void> killBlobWorker(Reference<BlobManagerData> bmData, BlobWorkerI
bmData->workerAddresses.erase(bwInterf.stableAddress());
}
// Remove blob worker from persisted list of blob workers
Future<Void> deregister = deregisterBlobWorker(bmData, bwInterf);
// for every range owned by this blob worker, we want to
// - send a revoke request for that range
// - add the range back to the stream of ranges to be assigned
@ -2859,7 +2873,8 @@ ACTOR Future<Void> killBlobWorker(Reference<BlobManagerData> bmData, BlobWorkerI
return Void();
}
ACTOR Future<Void> monitorBlobWorkerStatus(Reference<BlobManagerData> bmData, BlobWorkerInterface bwInterf) {
ACTOR Future<Void> monitorBlobWorkerStatus(Reference<BlobManagerData> bmData,
Reference<AsyncVar<BlobWorkerInterface>> bwInterf) {
// outer loop handles reconstructing stream if it got a retryable error
// do backoff, we can get a lot of retries in a row
@ -2871,229 +2886,240 @@ ACTOR Future<Void> monitorBlobWorkerStatus(Reference<BlobManagerData> bmData, Bl
loop {
try {
state ReplyPromiseStream<GranuleStatusReply> statusStream =
bwInterf.granuleStatusStreamRequest.getReplyStream(GranuleStatusStreamRequest(bmData->epoch));
bwInterf->get().granuleStatusStreamRequest.getReplyStream(GranuleStatusStreamRequest(bmData->epoch));
// read from stream until worker fails (should never get explicit end_of_stream)
loop {
GranuleStatusReply rep = waitNext(statusStream.getFuture());
choose {
when(GranuleStatusReply rep = waitNext(statusStream.getFuture())) {
if (BM_DEBUG) {
fmt::print("BM {0} got status of [{1} - {2}) @ ({3}, {4}) from BW {5}: {6} {7}\n",
bmData->epoch,
rep.granuleRange.begin.printable(),
rep.granuleRange.end.printable(),
rep.continueEpoch,
rep.continueSeqno,
bwInterf.id().toString(),
rep.doSplit ? "split" : (rep.mergeCandidate ? "merge" : ""),
rep.mergeCandidate
? ""
: (rep.writeHotSplit ? "hot" : (rep.initialSplitTooBig ? "toobig" : "normal")));
}
ASSERT(rep.doSplit || rep.mergeCandidate);
// if we get a reply from the stream, reset backoff
backoff = SERVER_KNOBS->BLOB_MANAGER_STATUS_EXP_BACKOFF_MIN;
if (rep.continueEpoch > bmData->epoch) {
if (BM_DEBUG) {
fmt::print("BM {0} heard from BW {1} that there is a new manager with higher epoch\n",
bmData->epoch,
bwInterf.id().toString());
}
if (bmData->iAmReplaced.canBeSet()) {
bmData->iAmReplaced.send(Void());
}
throw blob_manager_replaced();
}
BoundaryEvaluation newEval(rep.continueEpoch,
rep.continueSeqno,
rep.doSplit ? BoundaryEvalType::SPLIT : BoundaryEvalType::MERGE,
rep.originalEpoch,
rep.originalSeqno);
bool ignore = false;
Optional<std::pair<KeyRange, BoundaryEvaluation>> existingInProgress;
auto lastBoundaryEvals = bmData->boundaryEvaluations.intersectingRanges(rep.granuleRange);
for (auto& lastBoundaryEval : lastBoundaryEvals) {
if (ignore) {
break;
}
if (rep.granuleRange.begin == lastBoundaryEval.begin() &&
rep.granuleRange.end == lastBoundaryEval.end() && newEval == lastBoundaryEval.cvalue()) {
if (BM_DEBUG) {
fmt::print("BM {0} received repeat status for the same granule [{1} - {2}) {3}, "
"ignoring.\n",
bmData->epoch,
rep.granuleRange.begin.printable(),
rep.granuleRange.end.printable(),
newEval.toString());
}
ignore = true;
} else if (newEval < lastBoundaryEval.cvalue()) {
CODE_PROBE(true, "BM got out-of-date split request");
if (BM_DEBUG) {
fmt::print("BM {0} ignoring status from BW {1} for granule [{2} - {3}) {4} since it "
"already processed [{5} - {6}) {7}.\n",
bmData->epoch,
bwInterf.id().toString().substr(0, 5),
rep.granuleRange.begin.printable(),
rep.granuleRange.end.printable(),
newEval.toString(),
lastBoundaryEval.begin().printable(),
lastBoundaryEval.end().printable(),
lastBoundaryEval.cvalue().toString());
fmt::print(
"BM {0} got status of [{1} - {2}) @ ({3}, {4}) from BW {5}: {6} {7}\n",
bmData->epoch,
rep.granuleRange.begin.printable(),
rep.granuleRange.end.printable(),
rep.continueEpoch,
rep.continueSeqno,
bwInterf->get().id().toString(),
rep.doSplit ? "split" : (rep.mergeCandidate ? "merge" : ""),
rep.mergeCandidate
? ""
: (rep.writeHotSplit ? "hot" : (rep.initialSplitTooBig ? "toobig" : "normal")));
}
// only revoke if original epoch + seqno is older, different assignment
if (newEval.isOlderThanOriginal(lastBoundaryEval.cvalue())) {
// revoke range from out-of-date worker, but bypass rangeAssigner and hack (epoch, seqno) to
// be (requesting epoch, requesting seqno + 1) to ensure no race with then reassigning the
// range to the worker at a later version
ASSERT(rep.doSplit || rep.mergeCandidate);
// if we get a reply from the stream, reset backoff
backoff = SERVER_KNOBS->BLOB_MANAGER_STATUS_EXP_BACKOFF_MIN;
if (rep.continueEpoch > bmData->epoch) {
if (BM_DEBUG) {
fmt::print("BM {0} revoking from BW {1} granule [{2} - {3}) {4} with original ({5}, "
"{6}) since it already processed original ({7}, {8}).\n",
fmt::print("BM {0} heard from BW {1} that there is a new manager with higher epoch\n",
bmData->epoch,
bwInterf.id().toString().substr(0, 5),
rep.granuleRange.begin.printable(),
rep.granuleRange.end.printable(),
newEval.toString(),
newEval.originalEpoch,
newEval.originalSeqno,
lastBoundaryEval.cvalue().originalEpoch,
lastBoundaryEval.cvalue().originalSeqno);
bwInterf->get().id().toString());
}
RangeAssignment revokeOld;
revokeOld.isAssign = false;
revokeOld.worker = bwInterf.id();
revokeOld.keyRange = rep.granuleRange;
revokeOld.revoke = RangeRevokeData(false);
bmData->addActor.send(doRangeAssignment(
bmData, revokeOld, bwInterf.id(), rep.continueEpoch, rep.continueSeqno + 1));
if (bmData->iAmReplaced.canBeSet()) {
bmData->iAmReplaced.send(Void());
}
throw blob_manager_replaced();
}
ignore = true;
} else if (lastBoundaryEval.cvalue().inProgress.isValid() &&
!lastBoundaryEval.cvalue().inProgress.isReady()) {
existingInProgress = std::pair(lastBoundaryEval.range(), lastBoundaryEval.value());
}
}
if (rep.doSplit && !ignore) {
ASSERT(!rep.mergeCandidate);
BoundaryEvaluation newEval(rep.continueEpoch,
rep.continueSeqno,
rep.doSplit ? BoundaryEvalType::SPLIT : BoundaryEvalType::MERGE,
rep.originalEpoch,
rep.originalSeqno);
bool clearMergeCandidate = !existingInProgress.present() ||
existingInProgress.get().second.type != BoundaryEvalType::MERGE;
bool ignore = false;
Optional<std::pair<KeyRange, BoundaryEvaluation>> existingInProgress;
auto lastBoundaryEvals = bmData->boundaryEvaluations.intersectingRanges(rep.granuleRange);
for (auto& lastBoundaryEval : lastBoundaryEvals) {
if (ignore) {
break;
}
if (rep.granuleRange.begin == lastBoundaryEval.begin() &&
rep.granuleRange.end == lastBoundaryEval.end() &&
newEval == lastBoundaryEval.cvalue()) {
if (BM_DEBUG) {
fmt::print("BM {0} received repeat status for the same granule [{1} - {2}) {3}, "
"ignoring.\n",
bmData->epoch,
rep.granuleRange.begin.printable(),
rep.granuleRange.end.printable(),
newEval.toString());
}
ignore = true;
} else if (newEval < lastBoundaryEval.cvalue()) {
CODE_PROBE(true, "BM got out-of-date split request");
if (BM_DEBUG) {
fmt::print(
"BM {0} ignoring status from BW {1} for granule [{2} - {3}) {4} since it "
"already processed [{5} - {6}) {7}.\n",
bmData->epoch,
bwInterf->get().id().toString().substr(0, 5),
rep.granuleRange.begin.printable(),
rep.granuleRange.end.printable(),
newEval.toString(),
lastBoundaryEval.begin().printable(),
lastBoundaryEval.end().printable(),
lastBoundaryEval.cvalue().toString());
}
// Check for split/merge race
Version inProgressMergeVersion = bmData->activeMergeVersion(rep.granuleRange);
// only revoke if original epoch + seqno is older, different assignment
if (newEval.isOlderThanOriginal(lastBoundaryEval.cvalue())) {
// revoke range from out-of-date worker, but bypass rangeAssigner and hack (epoch,
// seqno) to be (requesting epoch, requesting seqno + 1) to ensure no race with then
// reassigning the range to the worker at a later version
if (BM_DEBUG) {
fmt::print("BM {0} splt eval [{1} - {2}). existing={3}, inProgressMergeVersion={4}, "
"blockedVersion={5}\n",
bmData->epoch,
rep.granuleRange.begin.printable().c_str(),
rep.granuleRange.end.printable().c_str(),
existingInProgress.present() ? "T" : "F",
inProgressMergeVersion,
rep.blockedVersion);
}
if (BM_DEBUG) {
fmt::print(
"BM {0} revoking from BW {1} granule [{2} - {3}) {4} with original ({5}, "
"{6}) since it already processed original ({7}, {8}).\n",
bmData->epoch,
bwInterf->get().id().toString().substr(0, 5),
rep.granuleRange.begin.printable(),
rep.granuleRange.end.printable(),
newEval.toString(),
newEval.originalEpoch,
newEval.originalSeqno,
lastBoundaryEval.cvalue().originalEpoch,
lastBoundaryEval.cvalue().originalSeqno);
}
RangeAssignment revokeOld;
revokeOld.isAssign = false;
revokeOld.worker = bwInterf->get().id();
revokeOld.keyRange = rep.granuleRange;
revokeOld.revoke = RangeRevokeData(false);
bmData->addActor.send(doRangeAssignment(bmData,
revokeOld,
bwInterf->get().id(),
rep.continueEpoch,
rep.continueSeqno + 1));
}
ignore = true;
} else if (lastBoundaryEval.cvalue().inProgress.isValid() &&
!lastBoundaryEval.cvalue().inProgress.isReady()) {
existingInProgress = std::pair(lastBoundaryEval.range(), lastBoundaryEval.value());
}
}
if (rep.doSplit && !ignore) {
ASSERT(!rep.mergeCandidate);
bool clearMergeCandidate = !existingInProgress.present() ||
existingInProgress.get().second.type != BoundaryEvalType::MERGE;
// Check for split/merge race
Version inProgressMergeVersion = bmData->activeMergeVersion(rep.granuleRange);
// If the in progress one is a merge, and the blockedVersion < the mergeVersion, this granule
// needs to continue to flush up to the merge version. If the merge intent is still not
// persisted, the version will be invalidVersion, so this should only happen after the merge
// intent is persisted and the merge version is fixed. This can happen if a merge candidate
// suddenly gets a burst of writes after a decision to merge is made
if (inProgressMergeVersion != invalidVersion) {
if (rep.blockedVersion < inProgressMergeVersion) {
CODE_PROBE(true, "merge blocking re-snapshot");
if (BM_DEBUG) {
fmt::print("BM {0} MERGE @ {1} blocking re-snapshot [{2} - {3}) @ {4}, "
"continuing snapshot\n",
fmt::print("BM {0} splt eval [{1} - {2}). existing={3}, inProgressMergeVersion={4}, "
"blockedVersion={5}\n",
bmData->epoch,
rep.granuleRange.begin.printable().c_str(),
rep.granuleRange.end.printable().c_str(),
existingInProgress.present() ? "T" : "F",
inProgressMergeVersion,
rep.granuleRange.begin.printable(),
rep.granuleRange.end.printable(),
rep.blockedVersion);
}
RangeAssignment raContinue;
raContinue.isAssign = true;
raContinue.worker = bwInterf.id();
raContinue.keyRange = rep.granuleRange;
raContinue.assign =
RangeAssignmentData(AssignRequestType::Continue); // continue assignment and re-snapshot
handleRangeAssign(bmData, raContinue);
}
clearMergeCandidate = false;
ignore = true;
} else if (existingInProgress.present()) {
// For example, one worker asked BM to split, then died, granule was moved, new worker asks
// to split on recovery. We need to ensure that they are semantically the same split. We
// will just rely on the in-progress split to finish
if (BM_DEBUG) {
fmt::print("BM {0} got request for [{1} - {2}) {3}, but already in "
"progress from [{4} - {5}) {6}\n",
bmData->epoch,
rep.granuleRange.begin.printable(),
rep.granuleRange.end.printable(),
newEval.toString(),
existingInProgress.get().first.begin.printable(),
existingInProgress.get().first.end.printable(),
existingInProgress.get().second.toString());
}
// ignore the request, they will retry
ignore = true;
}
if (!ignore) {
if (BM_DEBUG) {
fmt::print("BM {0} evaluating [{1} - {2}) {3}\n",
bmData->epoch,
rep.granuleRange.begin.printable().c_str(),
rep.granuleRange.end.printable().c_str(),
newEval.toString());
}
if (rep.initialSplitTooBig) {
ASSERT(rep.proposedSplitKey.present());
newEval.inProgress = reevaluateInitialSplit(bmData,
bwInterf.id(),
rep.granuleRange,
rep.granuleID,
rep.originalEpoch,
rep.originalSeqno,
rep.proposedSplitKey.get());
} else {
newEval.inProgress = maybeSplitRange(bmData,
bwInterf.id(),
rep.granuleRange,
rep.granuleID,
rep.startVersion,
rep.writeHotSplit,
rep.originalEpoch,
rep.originalSeqno);
}
bmData->boundaryEvaluations.insert(rep.granuleRange, newEval);
}
// If the in progress one is a merge, and the blockedVersion < the mergeVersion, this
// granule needs to continue to flush up to the merge version. If the merge intent is still
// not persisted, the version will be invalidVersion, so this should only happen after the
// merge intent is persisted and the merge version is fixed. This can happen if a merge
// candidate suddenly gets a burst of writes after a decision to merge is made
if (inProgressMergeVersion != invalidVersion) {
if (rep.blockedVersion < inProgressMergeVersion) {
CODE_PROBE(true, "merge blocking re-snapshot");
if (BM_DEBUG) {
fmt::print("BM {0} MERGE @ {1} blocking re-snapshot [{2} - {3}) @ {4}, "
"continuing snapshot\n",
bmData->epoch,
inProgressMergeVersion,
rep.granuleRange.begin.printable(),
rep.granuleRange.end.printable(),
rep.blockedVersion);
}
RangeAssignment raContinue;
raContinue.isAssign = true;
raContinue.worker = bwInterf->get().id();
raContinue.keyRange = rep.granuleRange;
raContinue.assign = RangeAssignmentData(
AssignRequestType::Continue); // continue assignment and re-snapshot
handleRangeAssign(bmData, raContinue);
}
clearMergeCandidate = false;
ignore = true;
} else if (existingInProgress.present()) {
// For example, one worker asked BM to split, then died, granule was moved, new worker
// asks to split on recovery. We need to ensure that they are semantically the same
// split. We will just rely on the in-progress split to finish
if (BM_DEBUG) {
fmt::print("BM {0} got request for [{1} - {2}) {3}, but already in "
"progress from [{4} - {5}) {6}\n",
bmData->epoch,
rep.granuleRange.begin.printable(),
rep.granuleRange.end.printable(),
newEval.toString(),
existingInProgress.get().first.begin.printable(),
existingInProgress.get().first.end.printable(),
existingInProgress.get().second.toString());
}
// clear merge candidates for range, if not already merging
if (clearMergeCandidate) {
bmData->clearMergeCandidate(rep.granuleRange);
// ignore the request, they will retry
ignore = true;
}
if (!ignore) {
if (BM_DEBUG) {
fmt::print("BM {0} evaluating [{1} - {2}) {3}\n",
bmData->epoch,
rep.granuleRange.begin.printable().c_str(),
rep.granuleRange.end.printable().c_str(),
newEval.toString());
}
if (rep.initialSplitTooBig) {
ASSERT(rep.proposedSplitKey.present());
newEval.inProgress = reevaluateInitialSplit(bmData,
bwInterf->get().id(),
rep.granuleRange,
rep.granuleID,
rep.originalEpoch,
rep.originalSeqno,
rep.proposedSplitKey.get());
} else {
newEval.inProgress = maybeSplitRange(bmData,
bwInterf->get().id(),
rep.granuleRange,
rep.granuleID,
rep.startVersion,
rep.writeHotSplit,
rep.originalEpoch,
rep.originalSeqno);
}
bmData->boundaryEvaluations.insert(rep.granuleRange, newEval);
}
// clear merge candidates for range, if not already merging
if (clearMergeCandidate) {
bmData->clearMergeCandidate(rep.granuleRange);
}
}
if (rep.mergeCandidate && !ignore) {
// mark granule as merge candidate
ASSERT(!rep.doSplit);
CODE_PROBE(true, "Granule merge candidate");
if (BM_DEBUG) {
fmt::print("Manager {0} merge candidate granule [{1} - {2}) {3}\n",
bmData->epoch,
rep.granuleRange.begin.printable().c_str(),
rep.granuleRange.end.printable().c_str(),
newEval.toString());
}
bmData->boundaryEvaluations.insert(rep.granuleRange, newEval);
bmData->setMergeCandidate(rep.granuleRange, rep.granuleID, rep.startVersion);
}
}
}
if (rep.mergeCandidate && !ignore) {
// mark granule as merge candidate
ASSERT(!rep.doSplit);
CODE_PROBE(true, "Granule merge candidate");
if (BM_DEBUG) {
fmt::print("Manager {0} merge candidate granule [{1} - {2}) {3}\n",
bmData->epoch,
rep.granuleRange.begin.printable().c_str(),
rep.granuleRange.end.printable().c_str(),
newEval.toString());
}
bmData->boundaryEvaluations.insert(rep.granuleRange, newEval);
bmData->setMergeCandidate(rep.granuleRange, rep.granuleID, rep.startVersion);
when(wait(bwInterf->onChange())) { throw connection_failed(); }
}
}
} catch (Error& e) {
@ -3140,21 +3166,32 @@ ACTOR Future<Void> monitorBlobWorkerStatus(Reference<BlobManagerData> bmData, Bl
}
}
ACTOR Future<Void> monitorBlobWorker(Reference<BlobManagerData> bmData, BlobWorkerInterface bwInterf) {
ACTOR Future<Void> monitorBlobWorker(Reference<BlobManagerData> bmData,
Reference<AsyncVar<BlobWorkerInterface>> bwInterf) {
state Future<Void> monitorStatus = monitorBlobWorkerStatus(bmData, bwInterf);
try {
state Future<Void> waitFailure = waitFailureClient(bwInterf.waitFailure, SERVER_KNOBS->BLOB_WORKER_TIMEOUT);
state Future<Void> monitorStatus = monitorBlobWorkerStatus(bmData, bwInterf);
choose {
when(wait(waitFailure)) {
if (BM_DEBUG) {
fmt::print("BM {0} detected BW {1} is dead\n", bmData->epoch, bwInterf.id().toString());
loop {
state Future<Void> waitFailure =
waitFailureClient(bwInterf->get().waitFailure, SERVER_KNOBS->BLOB_WORKER_TIMEOUT);
choose {
when(wait(waitFailure)) {
if (BM_DEBUG) {
fmt::print("BM {0} detected BW {1} is dead\n", bmData->epoch, bwInterf->get().id().toString());
}
choose {
when(wait(delay(30))) { // FIXME: knob
TraceEvent("BlobWorkerFailed", bmData->id).detail("BlobWorkerID", bwInterf->get().id());
break;
}
when(wait(bwInterf->onChange())) {}
}
}
TraceEvent("BlobWorkerFailed", bmData->id).detail("BlobWorkerID", bwInterf.id());
}
when(wait(monitorStatus)) {
// should only return when manager got replaced
ASSERT(!bmData->iAmReplaced.canBeSet());
when(wait(monitorStatus)) {
// should only return when manager got replaced
ASSERT(!bmData->iAmReplaced.canBeSet());
break;
}
when(wait(bwInterf->onChange())) {}
}
}
} catch (Error& e) {
@ -3164,14 +3201,17 @@ ACTOR Future<Void> monitorBlobWorker(Reference<BlobManagerData> bmData, BlobWork
}
if (BM_DEBUG) {
fmt::print(
"BM {0} got monitoring error {1} from BW {2}\n", bmData->epoch, e.name(), bwInterf.id().toString());
fmt::print("BM {0} got monitoring error {1} from BW {2}\n",
bmData->epoch,
e.name(),
bwInterf->get().id().toString());
}
// Expected errors here are: [broken_promise]
if (e.code() != error_code_broken_promise) {
if (BM_DEBUG) {
fmt::print("BM got unexpected error {0} monitoring BW {1}\n", e.name(), bwInterf.id().toString());
fmt::print(
"BM got unexpected error {0} monitoring BW {1}\n", e.name(), bwInterf->get().id().toString());
}
TraceEvent(SevError, "BlobManagerUnexpectedErrorMonitorBW", bmData->id)
.error(e)
@ -3186,10 +3226,10 @@ ACTOR Future<Void> monitorBlobWorker(Reference<BlobManagerData> bmData, BlobWork
}
// kill the blob worker
wait(killBlobWorker(bmData, bwInterf, true));
wait(killBlobWorker(bmData, bwInterf->get(), true));
if (BM_DEBUG) {
fmt::print("No longer monitoring BW {0}\n", bwInterf.id().toString());
fmt::print("No longer monitoring BW {0}\n", bwInterf->get().id().toString());
}
return Void();
}
@ -3206,14 +3246,21 @@ ACTOR Future<Void> checkBlobWorkerList(Reference<BlobManagerData> bmData, Promis
bool foundAnyNew = false;
for (auto& worker : blobWorkers) {
if (!bmData->deadWorkers.count(worker.id())) {
if (!bmData->workerAddresses.count(worker.stableAddress()) &&
worker.locality.dcId() == bmData->dcId) {
if (bmData->workersById.count(worker.id())) {
if (bmData->workersById[worker.id()]->get().waitFailure.getEndpoint().token !=
worker.waitFailure.getEndpoint().token) {
bmData->workersById[worker.id()]->setUnconditional(worker);
}
} else if (!bmData->workerAddresses.count(worker.stableAddress()) &&
worker.locality.dcId() == bmData->dcId) {
bmData->workerAddresses.insert(worker.stableAddress());
bmData->workersById[worker.id()] = worker;
Reference<AsyncVar<BlobWorkerInterface>> interf =
makeReference<AsyncVar<BlobWorkerInterface>>(worker);
bmData->workersById[worker.id()] = interf;
bmData->workerStats[worker.id()] = BlobWorkerInfo();
bmData->addActor.send(monitorBlobWorker(bmData, worker));
bmData->addActor.send(monitorBlobWorker(bmData, interf));
foundAnyNew = true;
} else if (!bmData->workersById.count(worker.id())) {
} else {
bmData->addActor.send(killBlobWorker(bmData, worker, false));
}
}
@ -3529,13 +3576,29 @@ ACTOR Future<Void> updateEpoch(Reference<BlobManagerData> bmData, int64_t epoch)
}
}
// Fetches the current granule-assignment snapshot from a single blob worker.
// The worker is tracked through an AsyncVar so that if the worker's interface
// changes while the request is outstanding (e.g. the worker restarted and
// re-registered with new endpoints — TODO confirm this is the only trigger of
// onChange), the stale reply future is abandoned and the request is reissued
// against the new interface instead of hanging forever.
// Returns the worker's reply, or an empty Optional if no reply arrived within
// BLOB_WORKER_TIMEOUT (timeout() yields an empty Optional on expiry).
ACTOR Future<Optional<GetGranuleAssignmentsReply>> granuleAssignment(Reference<BlobManagerData> bmData,
Reference<AsyncVar<BlobWorkerInterface>> interf) {
loop {
// Request is stamped with this manager's epoch so an out-of-date manager's
// request can be rejected by the worker.
GetGranuleAssignmentsRequest req;
req.managerEpoch = bmData->epoch;
choose {
when(Optional<GetGranuleAssignmentsReply> rep =
wait(timeout(brokenPromiseToNever(interf->get().granuleAssignmentsRequest.getReply(req)),
SERVER_KNOBS->BLOB_WORKER_TIMEOUT))) {
// brokenPromiseToNever: a dead endpoint waits forever instead of
// throwing, letting timeout() convert it into an empty reply.
return rep;
}
// Interface changed: loop and retry against the updated worker interface.
when(wait(interf->onChange())) {}
}
}
}
ACTOR Future<Void> recoverBlobManager(Reference<BlobManagerData> bmData) {
state double recoveryStartTime = now();
state Promise<Void> workerListReady;
bmData->addActor.send(checkBlobWorkerList(bmData, workerListReady));
wait(workerListReady.getFuture());
state std::vector<BlobWorkerInterface> startingWorkers;
state std::vector<Reference<AsyncVar<BlobWorkerInterface>>> startingWorkers;
for (auto& it : bmData->workersById) {
startingWorkers.push_back(it.second);
}
@ -3609,13 +3672,14 @@ ACTOR Future<Void> recoverBlobManager(Reference<BlobManagerData> bmData) {
// At this point, bmData->workersById is a list of all alive blob workers, but could also include some dead BWs.
// The algorithm below works as follows:
//
// 1. We get the existing granule mappings. We do this by asking all active blob workers for their current granule
// assignments. This guarantees a consistent snapshot of the state of that worker's assignments: Any request it
received and processed from the old manager before the granule assignment request will be included in the
assignments, and any request it receives from the old manager afterwards will be rejected with
// 1. We get the existing granule mappings. We do this by asking all active blob workers for their current
// granule
// assignments. This guarantees a consistent snapshot of the state of that worker's assignments: Any request
it received and processed from the old manager before the granule assignment request will be included in
the assignments, and any request it receives from the old manager afterwards will be rejected with
// blob_manager_replaced. We then read from the database as the source of truth for the assignment. We will
// reconcile the set of ongoing splits to this mapping, and any ranges that are not already assigned to existing
// blob workers will be reassigned.
// reconcile the set of ongoing splits to this mapping, and any ranges that are not already assigned to
// existing blob workers will be reassigned.
//
// 2. For every range in our granuleAssignments, we send an assign request to the stream of requests,
// ultimately giving every range back to some worker (trying to mimic the state of the old BM).
@ -3643,10 +3707,7 @@ ACTOR Future<Void> recoverBlobManager(Reference<BlobManagerData> bmData) {
state std::vector<Future<Optional<GetGranuleAssignmentsReply>>> aliveAssignments;
aliveAssignments.reserve(startingWorkers.size());
for (auto& it : startingWorkers) {
GetGranuleAssignmentsRequest req;
req.managerEpoch = bmData->epoch;
aliveAssignments.push_back(timeout(brokenPromiseToNever(it.granuleAssignmentsRequest.getReply(req)),
SERVER_KNOBS->BLOB_WORKER_TIMEOUT));
aliveAssignments.push_back(granuleAssignment(bmData, it));
}
state std::vector<std::pair<UID, KeyRange>> outOfDateAssignments;
@ -3655,7 +3716,7 @@ ACTOR Future<Void> recoverBlobManager(Reference<BlobManagerData> bmData) {
for (; assignIdx < aliveAssignments.size(); assignIdx++) {
Optional<GetGranuleAssignmentsReply> reply = wait(aliveAssignments[assignIdx]);
UID workerId = startingWorkers[assignIdx].id();
UID workerId = startingWorkers[assignIdx]->get().id();
if (reply.present()) {
if (BM_DEBUG) {
@ -3700,9 +3761,9 @@ ACTOR Future<Void> recoverBlobManager(Reference<BlobManagerData> bmData) {
// DB is the source of truth, so read from here, and resolve any conflicts with current worker mapping
// We don't have a consistent snapshot of the mapping ACROSS blob workers, so we need the DB to reconcile any
// differences (eg blob manager revoked from worker A, assigned to B, the revoke from A was processed but the assign
// to B wasn't, meaning in the snapshot nobody owns the granule). This also handles races with a BM persisting a
// boundary change, then dying before notifying the workers
// differences (eg blob manager revoked from worker A, assigned to B, the revoke from A was processed but the
// assign to B wasn't, meaning in the snapshot nobody owns the granule). This also handles races with a BM
// persisting a boundary change, then dying before notifying the workers
state Key beginKey = blobGranuleMappingKeys.begin;
loop {
try {
@ -3835,8 +3896,8 @@ ACTOR Future<Void> recoverBlobManager(Reference<BlobManagerData> bmData) {
workerId == UID() || epoch == 0 ? " (?)" : workerId.toString().substr(0, 5).c_str());
}
// if worker id is already set to a known worker that replied with it in the mapping, range is already assigned
// there. If not, need to explicitly assign it to someone
// if worker id is already set to a known worker that replied with it in the mapping, range is already
// assigned there. If not, need to explicitly assign it to someone
if (workerId == UID() || epoch == 0 || !endingWorkers.count(workerId)) {
// prevent racing status updates from old owner from causing issues until this request gets sent out
// properly
@ -3947,7 +4008,7 @@ ACTOR Future<Void> chaosRangeMover(Reference<BlobManagerData> bmData) {
int numExistingBWOnAddr(Reference<BlobManagerData> self, const AddressExclusion& addr) {
int numExistingBW = 0;
for (auto& server : self->workersById) {
const NetworkAddress& netAddr = server.second.stableAddress();
const NetworkAddress& netAddr = server.second->get().stableAddress();
AddressExclusion usedAddr(netAddr.ip, netAddr.port);
if (usedAddr == addr) {
++numExistingBW;
@ -3971,6 +4032,8 @@ ACTOR Future<Void> initializeBlobWorker(Reference<BlobManagerData> self, Recruit
state InitializeBlobWorkerRequest initReq;
initReq.reqId = deterministicRandom()->randomUniqueID();
initReq.interfaceId = interfaceId;
initReq.storeType =
SERVER_KNOBS->BLOB_WORKER_STATEFUL ? KeyValueStoreType::SSD_BTREE_V2 : KeyValueStoreType::END;
// acknowledge that this worker is currently being recruited on
self->recruitingLocalities.insert(candidateWorker.worker.stableAddress());
@ -4012,12 +4075,18 @@ ACTOR Future<Void> initializeBlobWorker(Reference<BlobManagerData> self, Recruit
BlobWorkerInterface bwi = newBlobWorker.get().interf;
if (!self->deadWorkers.count(bwi.id())) {
if (!self->workerAddresses.count(bwi.stableAddress()) && bwi.locality.dcId() == self->dcId) {
if (self->workersById.count(bwi.id())) {
if (self->workersById[bwi.id()]->get().waitFailure.getEndpoint().token !=
bwi.waitFailure.getEndpoint().token) {
self->workersById[bwi.id()]->setUnconditional(bwi);
}
} else if (!self->workerAddresses.count(bwi.stableAddress()) && bwi.locality.dcId() == self->dcId) {
self->workerAddresses.insert(bwi.stableAddress());
self->workersById[bwi.id()] = bwi;
Reference<AsyncVar<BlobWorkerInterface>> interf = makeReference<AsyncVar<BlobWorkerInterface>>(bwi);
self->workersById[bwi.id()] = interf;
self->workerStats[bwi.id()] = BlobWorkerInfo();
self->addActor.send(monitorBlobWorker(self, bwi));
} else if (!self->workersById.count(bwi.id())) {
self->addActor.send(monitorBlobWorker(self, interf));
} else {
self->addActor.send(killBlobWorker(self, bwi, false));
}
}
@ -4063,7 +4132,7 @@ ACTOR Future<Void> blobWorkerRecruiter(
// workers that are used by existing blob workers should be excluded
for (auto const& [bwId, bwInterf] : self->workersById) {
auto addr = bwInterf.stableAddress();
auto addr = bwInterf->get().stableAddress();
AddressExclusion addrExcl(addr.ip, addr.port);
recruitReq.excludeAddresses.emplace_back(addrExcl);
}
@ -4192,9 +4261,9 @@ ACTOR Future<bool> canDeleteFullGranuleSplit(Reference<BlobManagerData> self, UI
}
ASSERT(st == BlobGranuleSplitState::Assigned);
// if assigned, granule may or may not have snapshotted. Check files to confirm. Since a re-snapshot is
// the first file written for a new granule, any files present mean it has re-snapshotted from this
// granule
// if assigned, granule may or may not have snapshotted. Check files to confirm. Since a re-snapshot
// is the first file written for a new granule, any files present mean it has re-snapshotted from
// this granule
KeyRange granuleFileRange = blobGranuleFileKeyRangeFor(child);
RangeResult files = wait(tr.getRange(granuleFileRange, 1));
if (files.empty()) {
@ -4321,9 +4390,9 @@ ACTOR Future<Void> fullyDeleteGranule(Reference<BlobManagerData> self,
// if granule is still splitting and files are needed for new sub-granules to re-snapshot, we can only partially
// delete the granule, since we need to keep the last snapshot and deltas for splitting
// Or, if the granule isn't finalized (still needs the history entry for the old change feed id, because all data
// from the old change feed hasn't yet been persisted in blob), we can delete the files but need to keep the granule
// history entry.
// Or, if the granule isn't finalized (still needs the history entry for the old change feed id, because all
// data from the old change feed hasn't yet been persisted in blob), we can delete the files but need to keep
// the granule history entry.
state bool canDeleteHistoryKey;
if (force) {
canDeleteHistoryKey = true;
@ -4601,11 +4670,11 @@ ACTOR Future<Void> purgeRange(Reference<BlobManagerData> self, KeyRangeRef range
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
try {
// set force purged range, but don't clear mapping range yet, so that if a new BM recovers in the middle
// of purging, it still knows what granules to purge
// set force purged range, but don't clear mapping range yet, so that if a new BM recovers in the
// middle of purging, it still knows what granules to purge
wait(checkManagerLock(&tr, self));
// FIXME: need to handle this better if range is unaligned. Need to not truncate existing granules, and
// instead cover whole of intersecting granules at begin/end
// FIXME: need to handle this better if range is unaligned. Need to not truncate existing granules,
// and instead cover whole of intersecting granules at begin/end
wait(krmSetRangeCoalescing(&tr, blobGranuleForcePurgedKeys.begin, range, normalKeys, "1"_sr));
wait(tr.commit());
@ -4894,8 +4963,8 @@ ACTOR Future<Void> purgeRange(Reference<BlobManagerData> self, KeyRangeRef range
if (BM_PURGE_DEBUG) {
fmt::print("BM {0}: {1} granules to fully delete\n", self->epoch, toFullyDelete.size());
}
// Go backwards through set of granules to guarantee deleting oldest first. This avoids orphaning granules in the
// deletion process
// Go backwards through set of granules to guarantee deleting oldest first. This avoids orphaning granules in
// the deletion process
if (!toFullyDelete.empty()) {
state std::vector<Future<Void>> fullDeletions;
KeyRangeMap<std::pair<Version, Future<Void>>> parentDelete;
@ -4973,8 +5042,8 @@ ACTOR Future<Void> purgeRange(Reference<BlobManagerData> self, KeyRangeRef range
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
loop {
try {
// clear mapping range, so that a new BM doesn't try to recover force purged granules, and clients can't
// read them
// clear mapping range, so that a new BM doesn't try to recover force purged granules, and clients
// can't read them
wait(checkManagerLock(&tr, self));
wait(krmSetRange(&tr, blobGranuleMappingKeys.begin, range, blobGranuleMappingValueFor(UID())));
// FIXME: there is probably a cleaner fix than setting extra keys in the database if someone does a
@ -5206,7 +5275,7 @@ static void blobManagerExclusionSafetyCheck(Reference<BlobManagerData> self,
}
for (const AddressExclusion& excl : req.exclusions) {
for (auto& worker : self->workersById) {
if (excl.excludes(worker.second.address())) {
if (excl.excludes(worker.second->get().address())) {
remainingWorkers.erase(worker.first);
}
}
@ -5314,8 +5383,8 @@ ACTOR Future<Void> bgConsistencyCheck(Reference<BlobManagerData> bmData) {
++bmData->stats.ccErrors;
}
}
// wait at least some interval if snapshot is small and to not overwhelm the system with reads (for example,
// empty database with one empty granule)
// wait at least some interval if snapshot is small and to not overwhelm the system with reads (for
// example, empty database with one empty granule)
wait(rateLimiter->getAllowance(allowanceBytes) && delay(SERVER_KNOBS->BGCC_MIN_INTERVAL));
} else {
if (BM_DEBUG) {

View File

@ -43,6 +43,7 @@
#include "fdbserver/MutationTracking.h"
#include "fdbserver/ServerDBInfo.h"
#include "fdbserver/WaitFailure.h"
#include "fdbserver/IKeyValueStore.h"
#include "flow/Arena.h"
#include "flow/CompressionUtils.h"
@ -245,6 +246,7 @@ struct GranuleHistoryEntry : NonCopyable, ReferenceCounted<GranuleHistoryEntry>
struct BlobWorkerData : NonCopyable, ReferenceCounted<BlobWorkerData> {
UID id;
Database db;
IKeyValueStore* storage;
PromiseStream<Future<Void>> addActor;
@ -294,8 +296,8 @@ struct BlobWorkerData : NonCopyable, ReferenceCounted<BlobWorkerData> {
bool isFullRestoreMode = false;
BlobWorkerData(UID id, Reference<AsyncVar<ServerDBInfo> const> dbInfo, Database db)
: id(id), db(db), tenantData(BGTenantMap(dbInfo)), dbInfo(dbInfo),
BlobWorkerData(UID id, Reference<AsyncVar<ServerDBInfo> const> dbInfo, Database db, IKeyValueStore* storage)
: id(id), db(db), storage(storage), tenantData(BGTenantMap(dbInfo)), dbInfo(dbInfo),
initialSnapshotLock(new FlowLock(SERVER_KNOBS->BLOB_WORKER_INITIAL_SNAPSHOT_PARALLELISM)),
resnapshotBudget(new FlowLock(SERVER_KNOBS->BLOB_WORKER_RESNAPSHOT_BUDGET_BYTES)),
deltaWritesBudget(new FlowLock(SERVER_KNOBS->BLOB_WORKER_DELTA_WRITE_BUDGET_BYTES)),
@ -4722,6 +4724,33 @@ ACTOR Future<Void> handleRangeRevoke(Reference<BlobWorkerData> bwData, RevokeBlo
}
}
// Re-issues a granule range assignment to this worker after a reboot, for a range the
// persisted mapping says this worker already owns. Reads the range's lock key from the DB so
// the restarted assignment carries the manager epoch/seqno that originally granted ownership.
ACTOR Future<Void> restartRangeAssignment(Reference<BlobWorkerData> self, Key begin, Key end) {
    state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(self->db);
    state AssignBlobRangeRequest req;
    req.keyRange = KeyRangeRef(begin, end);
    req.type = AssignRequestType::Normal;
    loop {
        try {
            tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
            tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
            tr->setOption(FDBTransactionOptions::LOCK_AWARE);
            // The granule lock value records (epoch, seqno, UID) of the current owner; if no
            // lock is present the request is sent with default epoch/seqno.
            Optional<Value> lockValue = wait(tr->get(blobGranuleLockKeyFor(req.keyRange)));
            if (lockValue.present()) {
                std::tuple<int64_t, int64_t, UID> currentOwner = decodeBlobGranuleLockValue(lockValue.get());
                req.managerEpoch = std::get<0>(currentOwner);
                req.managerSeqno = std::get<1>(currentOwner);
            }
            break;
        } catch (Error& e) {
            wait(tr->onError(e));
        }
    }
    // NOTE(review): the boolean appears to mark this as a worker-initiated restart assignment
    // rather than a fresh manager assignment — confirm against handleRangeAssign's signature.
    wait(handleRangeAssign(self, req, true));
    return Void();
}
void handleBlobVersionRequest(Reference<BlobWorkerData> bwData, MinBlobVersionRequest req) {
bwData->db->setDesiredChangeFeedVersion(
std::max<Version>(0, req.grv - (SERVER_KNOBS->TARGET_BW_LAG_UPDATE * SERVER_KNOBS->VERSIONS_PER_SECOND)));
@ -4733,17 +4762,28 @@ void handleBlobVersionRequest(Reference<BlobWorkerData> bwData, MinBlobVersionRe
req.reply.send(rep);
}
ACTOR Future<Void> registerBlobWorker(Reference<BlobWorkerData> bwData, BlobWorkerInterface interf) {
ACTOR Future<Void> registerBlobWorker(Reference<BlobWorkerData> bwData,
BlobWorkerInterface interf,
bool replaceExisting) {
state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(bwData->db);
state Key blobWorkerListKey = blobWorkerListKeyFor(interf.id());
TraceEvent("BlobWorkerRegister", bwData->id);
loop {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
try {
Key blobWorkerListKey = blobWorkerListKeyFor(interf.id());
// FIXME: should be able to remove this conflict range
tr->addReadConflictRange(singleKeyRange(blobWorkerListKey));
if (replaceExisting) {
Optional<Value> val = wait(tr->get(blobWorkerListKey));
if (!val.present()) {
throw worker_removed();
}
} else {
// FIXME: should be able to remove this conflict range
tr->addReadConflictRange(singleKeyRange(blobWorkerListKey));
}
tr->set(blobWorkerListKey, blobWorkerListValue(interf));
// Get manager lock from DB
@ -5181,57 +5221,9 @@ ACTOR Future<Void> simForceFullMemory(Reference<BlobWorkerData> bwData) {
}
}
ACTOR Future<Void> blobWorker(BlobWorkerInterface bwInterf,
ReplyPromise<InitializeBlobWorkerReply> recruitReply,
Reference<AsyncVar<ServerDBInfo> const> dbInfo) {
state Database cx = openDBOnServer(dbInfo, TaskPriority::DefaultEndpoint, LockAware::True);
state Reference<BlobWorkerData> self(new BlobWorkerData(bwInterf.id(), dbInfo, cx));
self->id = bwInterf.id();
self->locality = bwInterf.locality;
// Since the blob worker gets initialized through the blob manager it is more reliable to fetch the encryption state
// using the DB Config rather than passing it through the initialization request for the blob manager and blob worker
DatabaseConfiguration config = wait(getDatabaseConfiguration(cx));
self->encryptMode = config.encryptionAtRestMode;
TraceEvent("BWEncryptionAtRestMode").detail("Mode", self->encryptMode.toString());
ACTOR Future<Void> blobWorkerCore(BlobWorkerInterface bwInterf, Reference<BlobWorkerData> self) {
state Future<Void> collection = actorCollection(self->addActor.getFuture());
if (BW_DEBUG) {
printf("Initializing blob worker s3 stuff\n");
}
try {
if (SERVER_KNOBS->BG_METADATA_SOURCE != "tenant") {
if (BW_DEBUG) {
fmt::print("BW constructing backup container from {0}\n", SERVER_KNOBS->BG_URL);
}
self->bstore = BlobConnectionProvider::newBlobConnectionProvider(SERVER_KNOBS->BG_URL);
if (BW_DEBUG) {
printf("BW constructed backup container\n");
}
}
// register the blob worker to the system keyspace
wait(registerBlobWorker(self, bwInterf));
} catch (Error& e) {
if (BW_DEBUG) {
fmt::print("BW got init error {0}\n", e.name());
}
// if any errors came up while initializing the blob worker, let the blob manager know
// that recruitment failed
if (!recruitReply.isSet()) {
recruitReply.sendError(recruitment_failed());
}
throw e;
}
// By now, we know that initialization was successful, so
// respond to the initialization request with the interface itself
// Note: this response gets picked up by the blob manager
InitializeBlobWorkerReply rep;
rep.interf = bwInterf;
recruitReply.send(rep);
self->addActor.send(waitFailureServer(bwInterf.waitFailure.getFuture()));
self->addActor.send(runGRVChecks(self));
self->addActor.send(monitorTenants(self));
@ -5244,8 +5236,6 @@ ACTOR Future<Void> blobWorker(BlobWorkerInterface bwInterf,
self->addActor.send(simForceFullMemory(self));
}
TraceEvent("BlobWorkerInit", self->id).log();
try {
loop choose {
when(BlobGranuleFileRequest req = waitNext(bwInterf.blobGranuleFileRequest.getFuture())) {
@ -5405,7 +5395,227 @@ ACTOR Future<Void> blobWorker(BlobWorkerInterface bwInterf,
self->shuttingDown = true;
wait(self->granuleMetadata.clearAsync());
throw worker_removed();
}
// Cleanup hook for a blob worker that exited with an error. Disposes of (deletes) or merely
// closes the worker's disk store depending on whether the worker can ever run again, and
// reports whether the error is an expected/clean termination (return true) or should be
// propagated to the caller (return false).
bool blobWorkerTerminated(Reference<BlobWorkerData> self, IKeyValueStore* persistentData, Error const& e) {
    // A removed or failed-to-recruit worker will never be restarted from this store, so its
    // on-disk state can be deleted; any other error keeps the data for a possible restart.
    const bool permanentlyGone =
        e.code() == error_code_worker_removed || e.code() == error_code_recruitment_failed;

    if (persistentData) {
        if (permanentlyGone) {
            persistentData->dispose();
        } else {
            persistentData->close();
        }
    }

    if (!permanentlyGone && e.code() != error_code_file_not_found && e.code() != error_code_actor_cancelled) {
        // Unexpected error: let the caller rethrow it.
        return false;
    }
    TraceEvent("BlobWorkerTerminated", self->id).errorUnsuppressed(e);
    return true;
}
#define PERSIST_PREFIX "\xff\xff"
static const KeyRef persistID = PERSIST_PREFIX "ID"_sr;
// Entry point for a freshly recruited blob worker (no prior on-disk state). Initializes the
// optional persistent store, registers the worker in the system keyspace, replies to the blob
// manager's recruitment request, and then runs the main request loop (blobWorkerCore).
// persistentData may be nullptr when the worker is configured to run stateless.
ACTOR Future<Void> blobWorker(BlobWorkerInterface bwInterf,
                              ReplyPromise<InitializeBlobWorkerReply> recruitReply,
                              Reference<AsyncVar<ServerDBInfo> const> dbInfo,
                              IKeyValueStore* persistentData) {
    state Database cx = openDBOnServer(dbInfo, TaskPriority::DefaultEndpoint, LockAware::True);
    state Reference<BlobWorkerData> self(new BlobWorkerData(bwInterf.id(), dbInfo, cx, persistentData));
    self->id = bwInterf.id();
    self->locality = bwInterf.locality;

    try {
        // Since the blob worker gets initialized through the blob manager it is more reliable to fetch
        // the encryption state using the DB Config rather than passing it through the initialization
        // request for the blob manager and blob worker
        DatabaseConfiguration config = wait(getDatabaseConfiguration(cx));
        self->encryptMode = config.encryptionAtRestMode;
        TraceEvent("BWEncryptionAtRestMode").detail("Mode", self->encryptMode.toString());

        if (self->storage) {
            TraceEvent("BlobWorkerInit4", self->id);
            wait(self->storage->init());
            TraceEvent("BlobWorkerInit5", self->id);
            // Persist this worker's ID so that a rebooted process can verify the disk store
            // belongs to it (see restorePersistentState).
            self->storage->set(KeyValueRef(persistID, BinaryWriter::toValue(self->id, Unversioned())));
            wait(self->storage->commit());
            TraceEvent("BlobWorkerInit6", self->id);
        }

        if (BW_DEBUG) {
            printf("Initializing blob worker s3 stuff\n");
        }

        try {
            if (SERVER_KNOBS->BG_METADATA_SOURCE != "tenant") {
                if (BW_DEBUG) {
                    fmt::print("BW constructing backup container from {0}\n", SERVER_KNOBS->BG_URL);
                }
                self->bstore = BlobConnectionProvider::newBlobConnectionProvider(SERVER_KNOBS->BG_URL);
                if (BW_DEBUG) {
                    printf("BW constructed backup container\n");
                }
            }

            // register the blob worker to the system keyspace (replaceExisting=false: this is a
            // brand-new recruitment, so no existing list entry is expected)
            wait(registerBlobWorker(self, bwInterf, false));
        } catch (Error& e) {
            if (BW_DEBUG) {
                fmt::print("BW got init error {0}\n", e.name());
            }
            // if any errors came up while initializing the blob worker, let the blob manager know
            // that recruitment failed
            if (!recruitReply.isSet()) {
                recruitReply.sendError(recruitment_failed());
            }
            throw e;
        }

        // By now, we know that initialization was successful, so
        // respond to the initialization request with the interface itself
        // Note: this response gets picked up by the blob manager
        InitializeBlobWorkerReply rep;
        rep.interf = bwInterf;
        recruitReply.send(rep);

        TraceEvent("BlobWorkerInit", self->id).log();
        wait(blobWorkerCore(bwInterf, self));
        return Void();
    } catch (Error& e) {
        // Clean termination (worker removed, recruitment failed, etc.) is swallowed after
        // disposing/closing the disk store; anything else propagates.
        if (blobWorkerTerminated(self, persistentData, e)) {
            return Void();
        }
        throw e;
    }
}
// Validates this blob worker's on-disk state after a reboot. Reads back the persisted worker
// ID and checks it matches the ID this worker was restarted with (the ID is derived from the
// disk store's filename by the worker server). A store with no ID was never fully initialized
// (the process died before the first commit), so the worker cannot rejoin and is treated as
// removed.
// Throws: worker_removed if the store was never initialized.
ACTOR Future<Void> restorePersistentState(Reference<BlobWorkerData> self) {
    // Only a single key is read here, so wait on it directly instead of building a
    // one-element vector and waitForAll-ing it.
    Optional<Value> persistedID = wait(self->storage->readValue(persistID));

    if (!persistedID.present()) {
        CODE_PROBE(true, "Restored uninitialized blob worker");
        throw worker_removed();
    }

    UID recoveredID = BinaryReader::fromStringRef<UID>(persistedID.get(), Unversioned());
    // The recovered ID must agree with the ID the store was opened under.
    ASSERT(recoveredID == self->id);
    return Void();
}
// After a reboot, scans the persisted granule-to-worker mapping and re-issues an assignment
// (restartRangeAssignment) for every granule range the mapping still attributes to this
// worker. Pages through blobGranuleMappingKeys in rowLimit-sized chunks; consecutive mapping
// keys define a range, so each page must contain at least two rows to yield a range.
ACTOR Future<Void> restoreGranules(Reference<BlobWorkerData> self) {
    state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(self->db);
    state Key beginKey = blobGranuleMappingKeys.begin;
    // FIXME: use range stream instead
    // Small page sizes under BUGGIFY exercise the paging logic in simulation.
    state int rowLimit = BUGGIFY ? deterministicRandom()->randomInt(2, 10) : 10000;
    state std::vector<Future<Void>> assignments;
    loop {
        try {
            tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
            tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
            tr->setOption(FDBTransactionOptions::LOCK_AWARE);
            KeyRange nextRange(KeyRangeRef(beginKey, blobGranuleMappingKeys.end));
            // using the krm functions can produce incorrect behavior here as it does weird stuff with beginKey
            state GetRangeLimits limits(rowLimit, GetRangeLimits::BYTE_LIMIT_UNLIMITED);
            // A range needs a start and an end key, so never return fewer than two rows.
            limits.minRows = 2;
            RangeResult results = wait(tr->getRange(nextRange, limits));

            // Add the mappings to our in memory key range map. The last row is excluded here
            // because it only serves as the end key of the previous range; it is re-read as
            // the start of the next page below.
            for (int rangeIdx = 0; rangeIdx < results.size() - 1; rangeIdx++) {
                Key granuleStartKey = results[rangeIdx].key.removePrefix(blobGranuleMappingKeys.begin);
                Key granuleEndKey = results[rangeIdx + 1].key.removePrefix(blobGranuleMappingKeys.begin);
                if (results[rangeIdx].value.size()) {
                    UID existingOwner = decodeBlobGranuleMappingValue(results[rangeIdx].value);
                    // Only re-assign ranges the mapping says this worker already owns.
                    if (existingOwner == self->id) {
                        assignments.push_back(restartRangeAssignment(self, granuleStartKey, granuleEndKey));
                    }
                }
            }
            if (!results.more || results.size() <= 1) {
                break;
            }

            // re-read last key to get range that starts there
            beginKey = results.back().key;
        } catch (Error& e) {
            wait(tr->onError(e));
        }
    }
    wait(waitForAll(assignments));
    return Void();
}
// Entry point for a blob worker restarted from an existing disk store (found by the worker
// server on reboot). Recovers and validates the persisted state, re-issues assignments for
// granules this worker still owns, re-registers the existing system-keyspace entry, then runs
// the main request loop. `recovered` is fulfilled once local recovery is done (or on error),
// so the worker server can proceed.
ACTOR Future<Void> blobWorker(BlobWorkerInterface bwInterf,
                              Promise<Void> recovered,
                              Reference<AsyncVar<ServerDBInfo> const> dbInfo,
                              IKeyValueStore* persistentData) {
    state Database cx = openDBOnServer(dbInfo, TaskPriority::DefaultEndpoint, LockAware::True);
    state Reference<BlobWorkerData> self(new BlobWorkerData(bwInterf.id(), dbInfo, cx, persistentData));
    self->id = bwInterf.id();
    self->locality = bwInterf.locality;

    try {
        // Since the blob worker gets initialized through the blob manager it is more reliable to fetch
        // the encryption state using the DB Config rather than passing it through the initialization
        // request for the blob manager and blob worker
        DatabaseConfiguration config = wait(getDatabaseConfiguration(cx));
        self->encryptMode = config.encryptionAtRestMode;
        TraceEvent("BWEncryptionAtRestMode").detail("Mode", self->encryptMode.toString());

        TraceEvent("BlobWorkerInit1", self->id);
        wait(self->storage->init());
        TraceEvent("BlobWorkerInit2", self->id);
        wait(self->storage->commit());
        TraceEvent("BlobWorkerInit3", self->id);
        // Verify the store's persisted worker ID matches ours; throws worker_removed if the
        // store was never initialized.
        wait(restorePersistentState(self));
        TraceEvent("BlobWorkerInit4", self->id);

        if (BW_DEBUG) {
            printf("Initializing blob worker s3 stuff\n");
        }

        try {
            if (SERVER_KNOBS->BG_METADATA_SOURCE != "tenant") {
                if (BW_DEBUG) {
                    fmt::print("BW constructing backup container from {0}\n", SERVER_KNOBS->BG_URL);
                }
                self->bstore = BlobConnectionProvider::newBlobConnectionProvider(SERVER_KNOBS->BG_URL);
                if (BW_DEBUG) {
                    printf("BW constructed backup container\n");
                }
            }

            // Re-issue assignments for granules the persisted mapping still attributes to us.
            wait(restoreGranules(self));

            // register the blob worker to the system keyspace (replaceExisting=true: the list
            // entry written at first recruitment must still exist, else worker_removed)
            wait(registerBlobWorker(self, bwInterf, true));
        } catch (Error& e) {
            if (BW_DEBUG) {
                fmt::print("BW got init error {0}\n", e.name());
            }
            throw e;
        }

        // Local recovery is complete; unblock the worker server.
        if (recovered.canBeSet()) {
            recovered.send(Void());
        }

        TraceEvent("BlobWorkerInit", self->id).log();
        wait(blobWorkerCore(bwInterf, self));
        return Void();
    } catch (Error& e) {
        // Always release the waiter, even on failure, so the worker server is not stuck.
        if (recovered.canBeSet())
            recovered.send(Void());
        // Clean termination disposes/closes the disk store and is swallowed; anything else
        // propagates.
        if (blobWorkerTerminated(self, persistentData, e)) {
            return Void();
        }
        throw e;
    }
}
// TODO add unit tests for assign/revoke range, especially version ordering

View File

@ -887,11 +887,12 @@ struct InitializeBlobWorkerRequest {
constexpr static FileIdentifier file_identifier = 5838547;
UID reqId;
UID interfaceId;
KeyValueStoreType storeType;
ReplyPromise<InitializeBlobWorkerReply> reply;
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, reqId, interfaceId, reply);
serializer(ar, reqId, interfaceId, storeType, reply);
}
};
@ -1177,16 +1178,22 @@ ACTOR Future<Void> clusterController(Reference<IClusterConnectionRecord> ccr,
ConfigDBType configDBType,
Reference<AsyncVar<Optional<UID>>> clusterId);
ACTOR Future<Void> blobWorker(BlobWorkerInterface bwi,
ReplyPromise<InitializeBlobWorkerReply> blobWorkerReady,
Reference<AsyncVar<ServerDBInfo> const> dbInfo);
ACTOR Future<Void> encryptKeyProxyServer(EncryptKeyProxyInterface ei, Reference<AsyncVar<ServerDBInfo>> db);
// These servers are started by workerServer
class IKeyValueStore;
class ServerCoordinators;
class IDiskQueue;
class IPageEncryptionKeyProvider;
ACTOR Future<Void> blobWorker(BlobWorkerInterface bwi,
ReplyPromise<InitializeBlobWorkerReply> blobWorkerReady,
Reference<AsyncVar<ServerDBInfo> const> dbInfo,
IKeyValueStore* persistentData);
ACTOR Future<Void> blobWorker(BlobWorkerInterface bwi,
Promise<Void> recovered,
Reference<AsyncVar<ServerDBInfo> const> dbInfo,
IKeyValueStore* persistentData);
ACTOR Future<Void> encryptKeyProxyServer(EncryptKeyProxyInterface ei, Reference<AsyncVar<ServerDBInfo>> db);
ACTOR Future<Void> storageServer(IKeyValueStore* persistentData,
StorageServerInterface ssi,
Tag seedTag,

View File

@ -332,6 +332,7 @@ StringRef fileLogDataPrefix = "log-"_sr;
StringRef fileVersionedLogDataPrefix = "log2-"_sr;
StringRef fileLogQueuePrefix = "logqueue-"_sr;
StringRef tlogQueueExtension = "fdq"_sr;
StringRef fileBlobWorkerPrefix = "bw-"_sr;
enum class FilesystemCheck {
FILES_ONLY,
@ -483,7 +484,7 @@ TLogFn tLogFnForOptions(TLogOptions options) {
}
struct DiskStore {
enum COMPONENT { TLogData, Storage, UNSET };
enum COMPONENT { TLogData, Storage, BlobWorker, UNSET };
UID storeID = UID();
std::string filename = ""; // For KVStoreMemory just the base filename to be passed to IDiskQueue
@ -542,6 +543,9 @@ std::vector<DiskStore> getDiskStores(std::string folder,
store.tLogOptions.version = TLogVersion::V2;
store.tLogOptions.spillType = TLogSpillType::VALUE;
prefix = fileLogDataPrefix;
} else if (filename.startsWith(fileBlobWorkerPrefix)) {
store.storedComponent = DiskStore::BlobWorker;
prefix = fileBlobWorkerPrefix;
} else {
continue;
}
@ -2033,6 +2037,35 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
logData.back().actor = oldLog.getFuture() || tl;
logData.back().uid = s.storeID;
errorForwarders.add(forwardError(errors, Role::SHARED_TRANSACTION_LOG, s.storeID, tl));
} else if (s.storedComponent == DiskStore::BlobWorker) {
LocalLineage _;
getCurrentLineage()->modify(&RoleLineage::role) = ProcessClass::ClusterRole::BlobWorker;
BlobWorkerInterface recruited(locality, s.storeID);
recruited.initEndpoints();
std::map<std::string, std::string> details;
details["StorageEngine"] = s.storeType.toString();
startRole(Role::BLOB_WORKER, recruited.id(), interf.id(), details, "Restored");
DUMPTOKEN(recruited.waitFailure);
DUMPTOKEN(recruited.blobGranuleFileRequest);
DUMPTOKEN(recruited.assignBlobRangeRequest);
DUMPTOKEN(recruited.revokeBlobRangeRequest);
DUMPTOKEN(recruited.granuleAssignmentsRequest);
DUMPTOKEN(recruited.granuleStatusStreamRequest);
DUMPTOKEN(recruited.haltBlobWorker);
DUMPTOKEN(recruited.minBlobVersionRequest);
TraceEvent("BlobWorkerOpen1", recruited.id()).detail("Filename", s.filename);
IKeyValueStore* data = openKVStore(s.storeType, s.filename, recruited.id(), memoryLimit);
filesClosed.add(data->onClosed());
Promise<Void> recovery;
Future<Void> bw = blobWorker(recruited, recovery, dbInfo, data);
recoveries.push_back(recovery.getFuture());
bw = handleIOErrors(bw, data, recruited.id());
errorForwarders.add(forwardError(errors, Role::BLOB_WORKER, recruited.id(), bw));
}
}
@ -2651,6 +2684,9 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
}
when(InitializeBlobWorkerRequest req = waitNext(interf.blobWorker.getFuture())) {
if (!blobWorkerCache.exists(req.reqId)) {
LocalLineage _;
getCurrentLineage()->modify(&RoleLineage::role) = ProcessClass::ClusterRole::BlobWorker;
BlobWorkerInterface recruited(locality, req.interfaceId);
recruited.initEndpoints();
startRole(Role::BLOB_WORKER, recruited.id(), interf.id());
@ -2664,8 +2700,21 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
DUMPTOKEN(recruited.haltBlobWorker);
DUMPTOKEN(recruited.minBlobVersionRequest);
IKeyValueStore* data = nullptr;
if (req.storeType != KeyValueStoreType::END) {
std::string filename =
filenameFromId(req.storeType, folder, fileBlobWorkerPrefix.toString(), recruited.id());
TraceEvent("BlobWorkerOpen2", recruited.id()).detail("Filename", filename);
data = openKVStore(req.storeType, filename, recruited.id(), memoryLimit);
filesClosed.add(data->onClosed());
}
ReplyPromise<InitializeBlobWorkerReply> blobWorkerReady = req.reply;
Future<Void> bw = blobWorker(recruited, blobWorkerReady, dbInfo);
Future<Void> bw = blobWorker(recruited, blobWorkerReady, dbInfo, data);
if (req.storeType != KeyValueStoreType::END) {
bw = handleIOErrors(bw, data, recruited.id());
}
errorForwarders.add(forwardError(errors, Role::BLOB_WORKER, recruited.id(), bw));
} else {