Fixing granule revoke on recover
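Range revokes that name a specific worker (as recovery and worker-failure handling do) are now sent to that worker directly, instead of always being fanned out to whichever workers the manager's current mapping says own the range. Recovery also rebuilds knownBlobRanges and per-worker granule counts from the workers' replies, and sets RangeRevokeData when revoking out-of-date assignments. In outline (a condensed sketch of the new rangeAssigner branch in the first hunk below, with the workerStats bookkeeping omitted; all names come from the surrounding file):

	// Sketch only, not the literal patch: the new revoke dispatch.
	if (assignment.worker.present()) {
		// Targeted revoke (recovery or failing a worker): go straight to that worker.
		bmData->addActor.send(doRangeAssignment(bmData, assignment, assignment.worker.get(), seqNo));
	} else {
		// General revoke: notify every worker that currently owns part of the range.
		auto currentAssignments = bmData->workerAssignments.intersectingRanges(assignment.keyRange);
		for (auto& it : currentAssignments) {
			bmData->addActor.send(doRangeAssignment(bmData, assignment, it.value(), seqNo));
		}
	}
	bmData->assignsInProgress.cancel(assignment.keyRange);
	bmData->workerAssignments.insert(assignment.keyRange, UID());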

Josh Slocum 2022-01-31 19:46:20 -06:00
parent 30d8c593c8
commit d46ef148bb
1 changed file with 46 additions and 20 deletions

@@ -531,26 +531,32 @@ ACTOR Future<Void> rangeAssigner(Reference<BlobManagerData> bmData) {
 			bmData->assignsInProgress.insert(assignment.keyRange,
 			                                 doRangeAssignment(bmData, assignment, workerId, seqNo));
 		} else {
-			// Revoking a range could be a large range that contains multiple ranges.
-			auto currentAssignments = bmData->workerAssignments.intersectingRanges(assignment.keyRange);
-			for (auto& it : currentAssignments) {
-				// ensure range doesn't truncate existing ranges
-				ASSERT(it.begin() >= assignment.keyRange.begin);
-				ASSERT(it.end() <= assignment.keyRange.end);
-				// It is fine for multiple disjoint sub-ranges to have the same sequence number since they were part of
-				// the same logical change
-				if (bmData->workerStats.count(it.value())) {
-					bmData->workerStats[it.value()].numGranulesAssigned -= 1;
+			if (assignment.worker.present()) {
+				// revoke this specific range from this specific worker. Either part of recovery or failing a worker
+				if (bmData->workerStats.count(assignment.worker.get())) {
+					bmData->workerStats[assignment.worker.get()].numGranulesAssigned -= 1;
 				}
+				bmData->addActor.send(doRangeAssignment(bmData, assignment, assignment.worker.get(), seqNo));
+			} else {
+				auto currentAssignments = bmData->workerAssignments.intersectingRanges(assignment.keyRange);
+				for (auto& it : currentAssignments) {
+					// ensure range doesn't truncate existing ranges
+					ASSERT(it.begin() >= assignment.keyRange.begin);
+					ASSERT(it.end() <= assignment.keyRange.end);
-				bmData->assignsInProgress.cancel(assignment.keyRange);
+					// It is fine for multiple disjoint sub-ranges to have the same sequence number since they were part
+					// of the same logical change
-				// revoke the range for the worker that owns it, not the worker specified in the revoke
-				bmData->addActor.send(doRangeAssignment(bmData, assignment, it.value(), seqNo));
+					if (bmData->workerStats.count(it.value())) {
+						bmData->workerStats[it.value()].numGranulesAssigned -= 1;
+					}
+					// revoke the range for the worker that owns it, not the worker specified in the revoke
+					bmData->addActor.send(doRangeAssignment(bmData, assignment, it.value(), seqNo));
+				}
 			}
+			bmData->assignsInProgress.cancel(assignment.keyRange);
 			bmData->workerAssignments.insert(assignment.keyRange, UID());
 		}
 	}
@@ -1573,13 +1579,25 @@ ACTOR Future<Void> recoverBlobManager(Reference<BlobManagerData> bmData) {
 	state std::vector<std::pair<UID, KeyRange>> outOfDateAssignments;
 	state int successful = 0;
 	state int assignIdx = 0;
+	// FIXME: more CPU efficient to do sorted merge of assignments?
 	for (; assignIdx < aliveAssignments.size(); assignIdx++) {
 		Optional<GetGranuleAssignmentsReply> reply = wait(aliveAssignments[assignIdx]);
 		UID workerId = startingWorkers[assignIdx].id();
 		if (reply.present()) {
+			if (BM_DEBUG) {
+				fmt::print(" Worker {}: ({})\n", workerId.toString().substr(0, 5), reply.get().assignments.size());
+			}
 			successful++;
 			for (auto& assignment : reply.get().assignments) {
+				if (BM_DEBUG) {
+					fmt::print(" [{0} - {1}): ({2}, {3})\n",
+					           assignment.range.begin.printable(),
+					           assignment.range.end.printable(),
+					           assignment.epochAssigned,
+					           assignment.seqnoAssigned);
+				}
+				bmData->knownBlobRanges.insert(assignment.range, true);
 				addAssignment(workerAssignments,
 				              assignment.range,
@@ -1588,9 +1606,14 @@ ACTOR Future<Void> recoverBlobManager(Reference<BlobManagerData> bmData) {
 				              assignment.seqnoAssigned,
 				              &outOfDateAssignments);
 			}
 			wait(yield());
+			if (bmData->workerStats.count(workerId)) {
+				bmData->workerStats[workerId].numGranulesAssigned = reply.get().assignments.size();
+			}
 		} else {
 			// TODO mark as failed and kill it
+			if (BM_DEBUG) {
+				fmt::print(" Worker {}: failed\n", workerId.toString().substr(0, 5));
+			}
 		}
 	}
@@ -1635,7 +1658,7 @@ ACTOR Future<Void> recoverBlobManager(Reference<BlobManagerData> bmData) {
 				fmt::print(" [{0} - {1})={2}\n",
 				           granuleStartKey.printable(),
 				           granuleEndKey.printable(),
-				           results[rangeIdx].value.printable());
+				           existingOwner.toString().substr(0, 5));
 			}
 		} else {
 			if (BM_DEBUG) {
@@ -1697,7 +1720,7 @@ ACTOR Future<Void> recoverBlobManager(Reference<BlobManagerData> bmData) {
 	TEST(!outOfDateAssignments.empty()); // BM resolved conflicting assignments on recovery
 	for (auto& it : outOfDateAssignments) {
 		if (BM_DEBUG) {
-			fmt::print("BM {0} revoking out of date assignment [%s - %s): %s:\n",
+			fmt::print("BM {0} revoking out of date assignment [{1} - {2}): {3}:\n",
 			           bmData->epoch,
 			           it.second.begin.printable().c_str(),
 			           it.second.end.printable().c_str(),
@@ -1707,6 +1730,7 @@ ACTOR Future<Void> recoverBlobManager(Reference<BlobManagerData> bmData) {
 		raRevoke.isAssign = false;
 		raRevoke.worker = it.first;
 		raRevoke.keyRange = it.second;
+		raRevoke.revoke = RangeRevokeData(false);
 		bmData->rangesToAssign.send(raRevoke);
 	}
@@ -1728,10 +1752,12 @@ ACTOR Future<Void> recoverBlobManager(Reference<BlobManagerData> bmData) {
 		bmData->workerAssignments.insert(range.range(), workerId);
 		if (BM_DEBUG) {
-			fmt::print(" [{0} - {1}){2}\n",
+			fmt::print(" [{0} - {1}) @ ({2}, {3}): {4}\n",
 			           range.begin().printable(),
 			           range.end().printable(),
-			           workerId == UID() || epoch == 0 ? " (*)" : "");
+			           epoch,
+			           seqno,
+			           workerId == UID() || epoch == 0 ? " (?)" : workerId.toString().substr(0, 5).c_str());
 		}
 		// if worker id is already set to a known worker that replied with it in the mapping, range is already assigned