Reworked blob manager recovery to be more efficient and handle overlapping ongoing splits

This commit is contained in:
Josh Slocum 2022-01-24 14:12:36 -06:00
parent 4262241c92
commit 1494f8216b
3 changed files with 196 additions and 192 deletions

View File

@ -1312,6 +1312,21 @@ const KeyRange blobGranuleSplitBoundaryKeyRangeFor(UID const& parentGranuleID) {
return KeyRangeRef(startKey, strinc(startKey));
}
// Builds the value stored for a split's special boundary entry: the given
// epoch followed by the given seqno, serialized with a protocol-version
// header (ProtocolVersion::withBlobGranule()).
const Key blobGranuleSplitBoundaryValueFor(int64_t epoch, int64_t seqno) {
	BinaryWriter writer(IncludeVersion(ProtocolVersion::withBlobGranule()));
	writer << epoch << seqno;
	return writer.toValue();
}
// Inverse of blobGranuleSplitBoundaryValueFor: reads the version header and
// then the two 64-bit integers from `value`, returning them as (epoch, seqno).
std::pair<int64_t, int64_t> decodeBlobGranuleSplitBoundaryValue(ValueRef const& value) {
	std::pair<int64_t, int64_t> epochAndSeqno;
	BinaryReader reader(value, IncludeVersion());
	// Fields are read in the same order they were written: epoch first, then seqno.
	reader >> epochAndSeqno.first >> epochAndSeqno.second;
	return epochAndSeqno;
}
const Key blobGranuleHistoryKeyFor(KeyRangeRef const& range, Version version) {
BinaryWriter wr(AssumeVersion(ProtocolVersion::withBlobGranule()));
wr.serializeBytes(blobGranuleHistoryKeys.begin);

View File

@ -596,6 +596,9 @@ const Key blobGranuleSplitBoundaryKeyFor(UID const& parentGranuleID, KeyRef cons
std::pair<UID, Key> decodeBlobGranuleSplitBoundaryKey(KeyRef const& key);
const KeyRange blobGranuleSplitBoundaryKeyRangeFor(UID const& parentGranuleID);
// Encodes the (epoch, seqno) of a split as the value stored under a split boundary key.
const Key blobGranuleSplitBoundaryValueFor(int64_t epoch, int64_t seqno);
// Decodes a value produced by blobGranuleSplitBoundaryValueFor back into its (epoch, seqno) pair.
std::pair<int64_t, int64_t> decodeBlobGranuleSplitBoundaryValue(ValueRef const& value);
const Key blobGranuleHistoryKeyFor(KeyRangeRef const& range, Version version);
std::pair<KeyRange, Version> decodeBlobGranuleHistoryKey(KeyRef const& key);
const KeyRange blobGranuleHistoryKeyRangeFor(KeyRangeRef const& range);

View File

@ -43,6 +43,11 @@
#define BM_DEBUG true
// DO NOT CHANGE THIS
// Special key where the value means the epoch + sequence number of the split, instead of the actual boundary
// Chosen because this should not be a start or end key in any split
// This key is written to the database next to the real split boundaries and is matched by equality when the
// boundaries are read back during recovery, so changing it would make already-persisted entries unrecognizable.
// Declared const so that it cannot be modified at runtime either.
static const Key splitBoundarySpecialKey = "\xff\xff\xff"_sr;
// TODO add comments + documentation
void handleClientBlobRange(KeyRangeMap<bool>* knownBlobRanges,
Arena& ar,
@ -830,6 +835,9 @@ ACTOR Future<Void> maybeSplitRange(Reference<BlobManagerData> bmData,
newGranuleIDs.push_back(deterministicRandom()->randomUniqueID());
}
state int64_t splitSeqno = bmData->seqNo;
bmData->seqNo++;
// Need to split range. Persist intent to split and split metadata to DB BEFORE sending split assignments to blob
// workers, so that nothing is lost on blob manager recovery
loop {
@ -904,6 +912,9 @@ ACTOR Future<Void> maybeSplitRange(Reference<BlobManagerData> bmData,
granuleRange.begin.printable(),
granuleRange.end.printable());*/
// The first split boundary entry is special: its key never occurs as a real boundary, and its value holds the (epoch, seqno) of the split
tr->set(blobGranuleSplitBoundaryKeyFor(granuleID, splitBoundarySpecialKey),
blobGranuleSplitBoundaryValueFor(bmData->epoch, splitSeqno));
for (int i = 0; i < newRanges.size() - 1; i++) {
/*fmt::print(" {0} [{1} - {2})\n",
newGranuleIDs[i].toString().substr(0, 6),
@ -1324,7 +1335,13 @@ ACTOR Future<Void> recoverBlobManager(Reference<BlobManagerData> bmData) {
// At this point, bmData->workersById is a list of all alive blob workers, but could also include some dead BWs.
// The algorithm below works as follows:
// 1. We get the existing granule mappings that were persisted by blob workers who were assigned ranges and
// 1. We get the ongoing split boundaries to construct the set of granules we should have. For these splits, we
// simply assign the range to the next best worker if it is not present in the assignment mapping. This is not
// any worse than what the old blob manager would have done. Details: Note that this means that if a worker we
// intended to give a split range to dies before the new BM recovers, then we'll simply assign the range to
// the next best worker.
//
// 2. We get the existing granule mappings that were persisted by blob workers who were assigned ranges and
// add them to bmData->granuleAssignments, which is a key range map.
// Details: re-assignments might have happened between the time the mapping was last updated and now.
// For example, suppose a blob manager sends requests to the range assigner stream to move a granule G.
@ -1334,35 +1351,169 @@ ACTOR Future<Void> recoverBlobManager(Reference<BlobManagerData> bmData) {
// still owned. In the above case, even if the revoke goes through, since we don't update the mapping during
// revokes, this is the same as the case above. Another case to consider is when a blob worker dies when the
// BM is recovering. Now the mapping at this time looks like G->deadBW. But the rangeAssigner handles this:
// we'll try to assign a range to a dead worker and fail and reassign it to the next best worker.
// we'll try to assign a range to a dead worker and fail and reassign it to the next best worker. It will also
// handle the case where the mapping does not reflect the desired set of granules based on the ongoing splits, and
// correct it.
//
// 3. We get the existing split intentions and boundaries that were Started but not acknowledged by any blob workers
// and add them to our key range map, bmData->granuleAssignments. Note that we are adding them on top of the
// granule mappings and since we are using a key range map, we end up with the same set of shard boundaries as
// the old blob manager had. For these splits, we simply assign the range to the next best worker. This is not
// any worst than what the old blob manager would have done. Details: Note that this means that if a worker we
// intended to give a splitted range to dies before the new BM recovers, then we'll simply assign the range to
// the next best worker.
//
// 4. For every range in our granuleAssignments, we send an assign request to the stream of requests,
// 3. For every range in our granuleAssignments, we send an assign request to the stream of requests,
// ultimately giving every range back to some worker (trying to mimic the state of the old BM).
// If the worker already had the range, this is a no-op. If the worker didn't have it, it will
// begin persisting it. The worker that had the same range before will now be at a lower seqno.
state KeyRangeMap<Optional<UID>> workerAssignments;
state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(bmData->db);
state std::unordered_map<UID, KeyRange> granuleIdToRange;
// TODO KNOB
state int rowLimit = BUGGIFY ? deterministicRandom()->randomInt(2, 10) : 10000;
if (BM_DEBUG) {
fmt::print("BM {0} recovering:\n", bmData->epoch);
fmt::print("BM {0} found old assignments:\n", bmData->epoch);
fmt::print("BM {0} found in progress splits:\n", bmData->epoch);
}
// TODO use range stream instead
state UID currentParentID = UID();
state Optional<UID> nextParentID;
state std::vector<Key> splitBoundaries;
state std::pair<int64_t, int64_t>
splitEpochSeqno; // used to order splits since we can have multiple splits of the same range in progress at once
state Key boundaryBeginKey = blobGranuleSplitBoundaryKeys.begin;
state RangeResult boundaryResult;
boundaryResult.readThrough = boundaryBeginKey;
boundaryResult.more = true;
state int boundaryResultIdx = 0;
// Step 1. Get the latest known split and merge state. Because we can have multiple splits in progress at the same
// time, and we don't know which parts of those are reflected in the current set of worker assignments we read, we
// have to construct the current desired set of granules from the set of ongoing splits and merges. Then, if any of
// those are not represented in the worker mapping, we must add them.
state KeyRangeMap<std::pair<int64_t, int64_t>> inProgressSplits;
tr->reset();
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
loop {
// Advance boundary reader
loop {
if (boundaryResultIdx >= boundaryResult.size()) {
if (!boundaryResult.more) {
break;
}
ASSERT(boundaryResult.readThrough.present() || boundaryResult.size() > 0);
boundaryBeginKey = boundaryResult.readThrough.present() ? boundaryResult.readThrough.get()
: keyAfter(boundaryResult.back().key);
loop {
try {
RangeResult r = wait(
tr->getRange(KeyRangeRef(boundaryBeginKey, blobGranuleSplitBoundaryKeys.end), rowLimit));
ASSERT(r.size() > 0 || !r.more);
boundaryResult = r;
boundaryResultIdx = 0;
break;
} catch (Error& e) {
if (BM_DEBUG) {
fmt::print("BM {0} got error advancing boundary cursor: {1}\n", bmData->epoch, e.name());
}
wait(tr->onError(e));
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
}
}
// if we got a response and there are zero rows, we are done
if (boundaryResult.empty()) {
break;
}
}
bool foundNext = false;
while (boundaryResultIdx < boundaryResult.size()) {
UID parentGranuleID;
Key boundaryKey;
std::tie(parentGranuleID, boundaryKey) =
decodeBlobGranuleSplitBoundaryKey(boundaryResult[boundaryResultIdx].key);
if (parentGranuleID != currentParentID) {
// reached the boundaries of a different parent granule: remember it as the next one and stop reading here
nextParentID = parentGranuleID;
foundNext = true;
break;
}
if (splitBoundarySpecialKey == boundaryKey) {
ASSERT(splitEpochSeqno.first == 0 && splitEpochSeqno.second == 0);
ASSERT(boundaryResult[boundaryResultIdx].value.size() > 0);
splitEpochSeqno = decodeBlobGranuleSplitBoundaryValue(boundaryResult[boundaryResultIdx].value);
ASSERT(splitEpochSeqno.first != 0 && splitEpochSeqno.second != 0);
} else {
ASSERT(boundaryResult[boundaryResultIdx].value.size() == 0);
splitBoundaries.push_back(boundaryKey);
}
boundaryResultIdx++;
}
if (foundNext) {
break;
}
}
// process this split
if (currentParentID != UID()) {
std::sort(splitBoundaries.begin(), splitBoundaries.end());
if (BM_DEBUG) {
fmt::print(" [{0} - {1}) {2} @ ({3}, {4}):\n",
splitBoundaries.front().printable(),
splitBoundaries.back().printable(),
currentParentID.toString().substr(0, 6),
splitEpochSeqno.first,
splitEpochSeqno.second);
}
for (int i = 0; i < splitBoundaries.size() - 1; i++) {
// if this split boundary has not been opened by a blob worker yet, or was not in the assignment list
// when we previously read it, we must ensure it gets assigned to one
KeyRange range = KeyRange(KeyRangeRef(splitBoundaries[i], splitBoundaries[i + 1]));
// Same algorithm as the worker map. If we read boundary changes from the log out of order, save the newer
// ones, apply this one, and then re-apply the newer ones over this one. The newer ones are collected first
// so that the map is not modified while it is being iterated.
std::vector<std::pair<KeyRange, std::pair<int64_t, int64_t>>> newer;
newer.reserve(splitBoundaries.size() - 1);
auto intersecting = inProgressSplits.intersectingRanges(range);
for (auto& it : intersecting) {
if (splitEpochSeqno.first < it.value().first ||
(splitEpochSeqno.first == it.value().first && splitEpochSeqno.second > it.value().second)) {
newer.push_back(std::pair(it.range(), it.value()));
}
}
inProgressSplits.insert(range, splitEpochSeqno);
for (auto& it : newer) {
inProgressSplits.insert(it.first, it.second);
}
if (BM_DEBUG) {
fmt::print(" [{0} - {1})\n", range.begin.printable(), range.end.printable());
}
}
}
splitBoundaries.clear();
splitEpochSeqno = std::pair(0, 0);
if (!nextParentID.present()) {
break;
}
currentParentID = nextParentID.get();
nextParentID.reset();
}
if (BM_DEBUG) {
fmt::print("BM {0} found old assignments:\n", bmData->epoch);
}
// TODO could populate most/all of this list by just asking existing blob workers for their range sets to reduce DB
// read load on BM restart.
// Step 2. Get the latest known mapping of granules to blob workers (i.e. assignments).
// This must happen causally AFTER reading the split boundaries, since the blob workers can clear the split
// boundaries for a granule as part of persisting their assignment.
state KeyRef beginKey = normalKeys.begin;
loop {
try {
@ -1413,190 +1564,25 @@ ACTOR Future<Void> recoverBlobManager(Reference<BlobManagerData> bmData) {
}
}
// TODO use range stream instead
state UID currentParentID = UID();
state Optional<UID> nextParentID;
state std::vector<Key> splitBoundaries;
state std::vector<std::pair<UID, BlobGranuleSplitState>> splitStates;
state Key splitBeginKey = blobGranuleSplitKeys.begin;
state RangeResult splitResult;
splitResult.readThrough = splitBeginKey;
splitResult.more = true;
state int splitResultIdx = 0;
state Key boundaryBeginKey = blobGranuleSplitBoundaryKeys.begin;
state RangeResult boundaryResult;
boundaryResult.readThrough = boundaryBeginKey;
boundaryResult.more = true;
state int boundaryResultIdx = 0;
// Step 3. Get the latest known split intentions and boundaries
tr->reset();
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
if (BM_DEBUG) {
fmt::print("BM {0} found in progress splits:\n", bmData->epoch);
printf("Splits overriding the following ranges:\n");
}
loop {
// Advance both split and boundary readers until we hit another granule or EOS, to get the full state for one
// granule split. Effectively a stream merge.
// Advance split reader
loop {
if (splitResultIdx >= splitResult.size()) {
if (!splitResult.more) {
break;
}
ASSERT(splitResult.readThrough.present() || splitResult.size() > 0);
splitBeginKey = splitResult.readThrough.present() ? splitResult.readThrough.get()
: keyAfter(splitResult.back().key);
loop {
try {
RangeResult r =
wait(tr->getRange(KeyRangeRef(splitBeginKey, blobGranuleSplitKeys.end), rowLimit));
ASSERT(r.size() > 0 || !r.more);
splitResult = r;
splitResultIdx = 0;
break;
} catch (Error& e) {
if (BM_DEBUG) {
fmt::print("BM {0} got error advancing split cursor: {1}\n", bmData->epoch, e.name());
}
wait(tr->onError(e));
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
}
}
// if we got a response and there are zero rows, we are done
if (splitResult.empty()) {
ASSERT(!splitResult.more);
break;
}
}
bool foundNext = false;
while (splitResultIdx < splitResult.size()) {
UID parentGranuleID, granuleID;
std::tie(parentGranuleID, granuleID) = decodeBlobGranuleSplitKey(splitResult[splitResultIdx].key);
if (parentGranuleID != currentParentID) {
nextParentID = parentGranuleID;
foundNext = true;
break;
}
BlobGranuleSplitState splitState;
Version version;
std::tie(splitState, version) = decodeBlobGranuleSplitValue(splitResult[splitResultIdx].value);
splitStates.push_back(std::pair(granuleID, splitState));
splitResultIdx++;
}
if (foundNext) {
break;
}
// Apply current granule boundaries to the assignment map. If they don't exactly match what is currently in the map,
// override and assign it to a new worker
auto splits = inProgressSplits.intersectingRanges(normalKeys);
for (auto& it : splits) {
if (it.value().first == 0 || it.value().second == 0) {
// no in-progress splits for this range
continue;
}
auto r = workerAssignments.rangeContaining(it.begin());
// Advance boundary reader
loop {
if (boundaryResultIdx >= boundaryResult.size()) {
if (!boundaryResult.more) {
break;
}
ASSERT(boundaryResult.readThrough.present() || boundaryResult.size() > 0);
boundaryBeginKey = boundaryResult.readThrough.present() ? boundaryResult.readThrough.get()
: keyAfter(boundaryResult.back().key);
loop {
try {
RangeResult r = wait(
tr->getRange(KeyRangeRef(boundaryBeginKey, blobGranuleSplitBoundaryKeys.end), rowLimit));
ASSERT(r.size() > 0 || !r.more);
boundaryResult = r;
boundaryResultIdx = 0;
break;
} catch (Error& e) {
if (BM_DEBUG) {
fmt::print("BM {0} got error advancing boundary cursor: {1}\n", bmData->epoch, e.name());
}
wait(tr->onError(e));
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
}
}
// if we got a response and there are zero rows, we are done
if (boundaryResult.empty()) {
break;
}
}
bool foundNext = false;
while (boundaryResultIdx < boundaryResult.size()) {
UID parentGranuleID;
Key boundaryKey;
std::tie(parentGranuleID, boundaryKey) =
decodeBlobGranuleSplitBoundaryKey(boundaryResult[boundaryResultIdx].key);
if (parentGranuleID != currentParentID) {
// nextParentID should have already been set by split reader
ASSERT(nextParentID.present());
ASSERT(nextParentID.get() == parentGranuleID);
foundNext = true;
break;
}
splitBoundaries.push_back(boundaryKey);
boundaryResultIdx++;
}
if (foundNext) {
break;
}
// if this range is at all different from the worker mapping, the mapping is out of date
if (r.begin() != it.begin() || r.end() != it.end()) {
// the empty UID signifies that we need to find an owner (worker) for this range
workerAssignments.insert(it.range(), UID());
fmt::print(" [{0} - {1})\n", it.begin().printable().c_str(), it.end().printable().c_str());
}
// process this split
if (currentParentID != UID()) {
ASSERT(splitStates.size() > 0);
ASSERT(splitBoundaries.size() - 1 == splitStates.size());
std::sort(splitBoundaries.begin(), splitBoundaries.end());
if (BM_DEBUG) {
fmt::print(" [{0} - {1}) {2}:\n",
splitBoundaries.front().printable(),
splitBoundaries.back().printable(),
currentParentID.toString().substr(0, 6));
}
for (int i = 0; i < splitStates.size(); i++) {
// if this split boundary had not been opened by a blob worker before the last manager crashed, we must
// ensure it gets assigned to one
KeyRange range = KeyRange(KeyRangeRef(splitBoundaries[i], splitBoundaries[i + 1]));
if (BM_DEBUG) {
printf(" ");
}
if (splitStates[i].second <= BlobGranuleSplitState::Initialized) {
// the empty UID signifies that we need to find an owner (worker) for this range
if (BM_DEBUG) {
printf("*** ");
}
workerAssignments.insert(range, UID());
}
if (BM_DEBUG) {
fmt::print("[{0} - {1}) {2}\n",
range.begin.printable(),
range.end.printable(),
splitStates[i].first.toString().substr(0, 6));
}
}
}
splitBoundaries.clear();
splitStates.clear();
if (!nextParentID.present()) {
break;
}
currentParentID = nextParentID.get();
nextParentID.reset();
}
// Step 3. Send assign requests for all the granules and transfer assignments