// fdbserver/BlobManager.actor.cpp
/*
* BlobManager.actor.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <vector>
#include <unordered_map>

#include "fdbclient/BlobGranuleCommon.h"
#include "fdbclient/BlobWorkerInterface.h"
#include "fdbclient/KeyRangeMap.h"
#include "fdbclient/ReadYourWrites.h"
#include "fdbclient/SystemData.h"
#include "fdbserver/BlobManagerInterface.h"
#include "fdbserver/Knobs.h"
#include "fdbserver/QuietDatabase.h"
#include "fdbserver/WaitFailure.h"
#include "fdbserver/WorkerInterface.actor.h"
#include "flow/IRandom.h"
#include "flow/UnitTest.h"
#include "flow/actorcompiler.h" // has to be last include
#define BM_DEBUG true
// FIXME: change all BlobManagerData* to Reference<BlobManagerData> to avoid segfaults if core loop gets error
// TODO add comments + documentation
// Diffs the client-requested blob state for [rangeStart, rangeEnd) against the
// manager's current view in knownBlobRanges. Each sub-range whose active flag
// changes is appended (deep-copied into ar) to rangesToAdd when it becomes
// active, or rangesToRemove when it becomes inactive. Finally records the new
// state for the whole range in knownBlobRanges.
void handleClientBlobRange(KeyRangeMap<bool>* knownBlobRanges,
                           Arena ar,
                           VectorRef<KeyRangeRef>* rangesToAdd,
                           VectorRef<KeyRangeRef>* rangesToRemove,
                           KeyRef rangeStart,
                           KeyRef rangeEnd,
                           bool rangeActive) {
    if (BM_DEBUG) {
        printf("db range [%s - %s): %s\n",
               rangeStart.printable().c_str(),
               rangeEnd.printable().c_str(),
               rangeActive ? "T" : "F");
    }
    KeyRange keyRange(KeyRangeRef(rangeStart, rangeEnd));
    for (auto& existing : knownBlobRanges->intersectingRanges(keyRange)) {
        if (existing.value() == rangeActive) {
            continue; // this piece already has the desired state
        }
        // Clip the existing entry to the part that actually intersects [rangeStart, rangeEnd)
        KeyRef clippedBegin = (existing.begin() > keyRange.begin) ? existing.begin() : keyRange.begin;
        KeyRef clippedEnd = (keyRange.end < existing.end()) ? keyRange.end : existing.end();
        KeyRangeRef clipped(clippedBegin, clippedEnd);
        if (rangeActive) {
            if (BM_DEBUG) {
                printf("BM Adding client range [%s - %s)\n",
                       clippedBegin.printable().c_str(),
                       clippedEnd.printable().c_str());
            }
            rangesToAdd->push_back_deep(ar, clipped);
        } else {
            if (BM_DEBUG) {
                printf("BM Removing client range [%s - %s)\n",
                       clippedBegin.printable().c_str(),
                       clippedEnd.printable().c_str());
            }
            rangesToRemove->push_back_deep(ar, clipped);
        }
    }
    knownBlobRanges->insert(keyRange, rangeActive);
}
// Reconciles the manager's in-memory view (knownBlobRanges) with the blob range
// configuration just read from the database. dbBlobRanges is a krm-style boundary
// list: consecutive keys delimit ranges, and a boundary value of "1" marks the
// start of an active range. Differences are appended to rangesToAdd /
// rangesToRemove (allocated from ar) via handleClientBlobRange.
void updateClientBlobRanges(KeyRangeMap<bool>* knownBlobRanges,
                            RangeResult dbBlobRanges,
                            Arena ar,
                            VectorRef<KeyRangeRef>* rangesToAdd,
                            VectorRef<KeyRangeRef>* rangesToRemove) {
    if (BM_DEBUG) {
        printf("Updating %d client blob ranges", dbBlobRanges.size() / 2);
        for (int i = 0; i < dbBlobRanges.size() - 1; i += 2) {
            printf(" [%s - %s)", dbBlobRanges[i].key.printable().c_str(), dbBlobRanges[i + 1].key.printable().c_str());
        }
        printf("\n");
    }
    // essentially do merge diff of current known blob ranges and new ranges, to assign new ranges to
    // workers and revoke old ranges from workers
    // basically, for any range that is set in results that isn't set in ranges, assign the range to the
    // worker. for any range that isn't set in results that is set in ranges, revoke the range from the
    // worker. and, update ranges to match results as you go
    // FIXME: could change this to O(N) instead of O(NLogN) by doing a sorted merge instead of requesting the
    // intersection for each insert, but this operation is pretty infrequent so it's probably not necessary
    if (dbBlobRanges.size() == 0) {
        // special case. Nothing in the DB, reset knownBlobRanges and revoke all existing ranges from workers
        handleClientBlobRange(
            knownBlobRanges, ar, rangesToAdd, rangesToRemove, normalKeys.begin, normalKeys.end, false);
    } else {
        // Everything before the first boundary key is implicitly inactive.
        if (dbBlobRanges[0].key > normalKeys.begin) {
            handleClientBlobRange(
                knownBlobRanges, ar, rangesToAdd, rangesToRemove, normalKeys.begin, dbBlobRanges[0].key, false);
        }
        for (int i = 0; i < dbBlobRanges.size() - 1; i++) {
            // Stop at anything that starts at or beyond the end of the normal keyspace.
            if (dbBlobRanges[i].key >= normalKeys.end) {
                if (BM_DEBUG) {
                    printf("Found invalid blob range start %s\n", dbBlobRanges[i].key.printable().c_str());
                }
                break;
            }
            bool active = dbBlobRanges[i].value == LiteralStringRef("1");
            if (active) {
                // An active boundary must be followed by an empty-valued boundary (the range end).
                ASSERT(dbBlobRanges[i + 1].value == StringRef());
                if (BM_DEBUG) {
                    printf("BM sees client range [%s - %s)\n",
                           dbBlobRanges[i].key.printable().c_str(),
                           dbBlobRanges[i + 1].key.printable().c_str());
                }
            }
            KeyRef endKey = dbBlobRanges[i + 1].key;
            if (endKey > normalKeys.end) {
                // Clamp ranges that spill past the normal keyspace into system keys.
                if (BM_DEBUG) {
                    printf("Removing system keyspace from blob range [%s - %s)\n",
                           dbBlobRanges[i].key.printable().c_str(),
                           endKey.printable().c_str());
                }
                endKey = normalKeys.end;
            }
            handleClientBlobRange(
                knownBlobRanges, ar, rangesToAdd, rangesToRemove, dbBlobRanges[i].key, endKey, active);
        }
        // Everything after the last boundary key is implicitly inactive.
        if (dbBlobRanges[dbBlobRanges.size() - 1].key < normalKeys.end) {
            handleClientBlobRange(knownBlobRanges,
                                  ar,
                                  rangesToAdd,
                                  rangesToRemove,
                                  dbBlobRanges[dbBlobRanges.size() - 1].key,
                                  normalKeys.end,
                                  false);
        }
    }
    knownBlobRanges->coalesce(normalKeys);
}
// Snapshots the current blob-range map into a flat list of (range, active) pairs.
void getRanges(std::vector<std::pair<KeyRangeRef, bool>>& results, KeyRangeMap<bool>& knownBlobRanges) {
    if (BM_DEBUG) {
        printf("Getting ranges:\n");
    }
    for (auto& entry : knownBlobRanges.ranges()) {
        bool active = entry.value();
        results.emplace_back(entry.range(), active);
        if (BM_DEBUG) {
            printf(" [%s - %s): %s\n",
                   entry.begin().printable().c_str(),
                   entry.end().printable().c_str(),
                   active ? "T" : "F");
        }
    }
}
// Payload for an "assign" request: continueAssignment distinguishes a continuation
// of an assignment the worker already holds from a brand-new one, and
// specialAssignment flags assignments pinned to a particular worker.
// TODO: put these in an enum
struct RangeAssignmentData {
    bool continueAssignment = false;
    bool specialAssignment = false;

    RangeAssignmentData() = default;
    RangeAssignmentData(bool continueAssignment, bool specialAssignment = false)
      : continueAssignment(continueAssignment), specialAssignment(specialAssignment) {}
};
// Payload for a "revoke" request. Callers pass dispose=true when the granule's
// state should be cleaned up, not just unassigned (see the dispose-retry path in
// doRangeAssignment).
struct RangeRevokeData {
    // Fix: the default constructor previously left `dispose` uninitialized, so a
    // default-constructed revoke could read an indeterminate value. Default to the
    // safe "no dispose" state.
    bool dispose = false;

    RangeRevokeData() = default;
    RangeRevokeData(bool dispose) : dispose(dispose) {}
};
// A unit of work for the rangeAssigner loop: either an assignment (assign present)
// or a revoke (revoke present) of keyRange, optionally targeted at a specific worker.
struct RangeAssignment {
    // Fix: previously uninitialized; a default-constructed RangeAssignment could
    // take either branch in rangeAssigner nondeterministically. Default to false.
    bool isAssign = false;
    KeyRange keyRange;
    Optional<UID> worker; // set when the caller wants a specific worker

    // I tried doing this with a union and it was just kind of messy
    Optional<RangeAssignmentData> assign; // present iff isAssign (asserted in doRangeAssignment)
    Optional<RangeRevokeData> revoke;     // present iff !isAssign (asserted in doRangeAssignment)
};
// TODO: track worker's reads/writes eventually
// Per-worker load counter used to pick the least-loaded worker for new assignments.
struct BlobWorkerStats {
    int numGranulesAssigned = 0;

    BlobWorkerStats() = default;
    BlobWorkerStats(int numGranulesAssigned) : numGranulesAssigned(numGranulesAssigned) {}
};
struct BlobManagerData {
UID id;
Database db;
2021-09-15 23:35:58 +08:00
PromiseStream<Future<Void>> addActor;
2021-09-03 00:09:37 +08:00
std::unordered_map<UID, BlobWorkerInterface> workersById;
std::unordered_map<UID, BlobWorkerStats> workerStats; // mapping between workerID -> workerStats
2021-08-31 02:07:25 +08:00
KeyRangeMap<UID> workerAssignments;
KeyRangeMap<bool> knownBlobRanges;
2021-10-22 05:39:38 +08:00
AsyncTrigger startRecruiting;
2021-10-09 01:46:06 +08:00
Debouncer restartRecruiting;
std::set<NetworkAddress> recruitingLocalities; // the addrs of the workers being recruited on
2021-10-22 05:39:38 +08:00
AsyncVar<int> recruitingStream;
int64_t epoch = -1;
int64_t seqNo = 1;
2021-08-31 02:07:25 +08:00
Promise<Void> iAmReplaced;
// The order maintained here is important. The order ranges are put into the promise stream is the order they get
// assigned sequence numbers
PromiseStream<RangeAssignment> rangesToAssign;
BlobManagerData(UID id, Database db)
2021-10-09 01:46:06 +08:00
: id(id), db(db), knownBlobRanges(false, normalKeys.end),
2021-10-22 05:39:38 +08:00
restartRecruiting(SERVER_KNOBS->DEBOUNCE_RECRUITING_DELAY), recruitingStream(0) {}
~BlobManagerData() { printf("Destroying blob manager data for %s\n", id.toString().c_str()); }
};
// Estimates the size of `range` from storage metrics and, if it exceeds the blob
// granule snapshot target size, returns boundary keys splitting it into
// roughly target-sized chunks (split only by byte count). Otherwise returns just
// the range's own endpoints. Retries on transaction errors via tr->onError.
ACTOR Future<Standalone<VectorRef<KeyRef>>> splitRange(Reference<ReadYourWritesTransaction> tr, KeyRange range) {
    // TODO is it better to just pass empty metrics to estimated?
    // redo split if previous txn failed to calculate it
    loop {
        try {
            if (BM_DEBUG) {
                printf(
                    "Splitting new range [%s - %s)\n", range.begin.printable().c_str(), range.end.printable().c_str());
            }
            StorageMetrics estimated = wait(tr->getTransaction().getStorageMetrics(range, CLIENT_KNOBS->TOO_MANY));

            if (BM_DEBUG) {
                printf("Estimated bytes for [%s - %s): %lld\n",
                       range.begin.printable().c_str(),
                       range.end.printable().c_str(),
                       estimated.bytes);
            }

            if (estimated.bytes > SERVER_KNOBS->BG_SNAPSHOT_FILE_TARGET_BYTES) {
                // printf(" Splitting range\n");
                // only split on bytes
                StorageMetrics splitMetrics;
                splitMetrics.bytes = SERVER_KNOBS->BG_SNAPSHOT_FILE_TARGET_BYTES;
                // Set other dimensions to infinity so only the byte count drives the split.
                splitMetrics.bytesPerKSecond = splitMetrics.infinity;
                splitMetrics.iosPerKSecond = splitMetrics.infinity;
                splitMetrics.bytesReadPerKSecond = splitMetrics.infinity; // Don't split by readBandwidth
                Standalone<VectorRef<KeyRef>> keys =
                    wait(tr->getTransaction().splitStorageMetrics(range, splitMetrics, estimated));
                return keys;
            } else {
                // printf(" Not splitting range\n");
                // Too small to split: the result is just [begin, end].
                Standalone<VectorRef<KeyRef>> keys;
                keys.push_back_deep(keys.arena(), range.begin);
                keys.push_back_deep(keys.arena(), range.end);
                return keys;
            }
        } catch (Error& e) {
            wait(tr->onError(e));
        }
    }
}
// Picks a worker with the fewest number of already assigned ranges.
// If there is a tie, picks one such worker at random.
2021-10-22 05:39:38 +08:00
ACTOR Future<UID> pickWorkerForAssign(BlobManagerData* bmData) {
state int minGranulesAssigned = INT_MAX;
state std::vector<UID> eligibleWorkers;
printf("workerStats.size()=%d, recruitingStream=%d\n", bmData->workerStats.size(), bmData->recruitingStream.get());
while (bmData->workerStats.size() == 0) {
printf("looping waiting for workerstats\n");
wait(bmData->recruitingStream.onChange());
}
printf("workerStats.size()=%d\n", bmData->workerStats.size());
for (auto const& worker : bmData->workerStats) {
UID currId = worker.first;
int granulesAssigned = worker.second.numGranulesAssigned;
if (granulesAssigned < minGranulesAssigned) {
eligibleWorkers.resize(0);
minGranulesAssigned = granulesAssigned;
eligibleWorkers.emplace_back(currId);
} else if (granulesAssigned == minGranulesAssigned) {
eligibleWorkers.emplace_back(currId);
}
}
2021-08-31 02:07:25 +08:00
// pick a random worker out of the eligible workers
ASSERT(eligibleWorkers.size() > 0);
int idx = deterministicRandom()->randomInt(0, eligibleWorkers.size());
if (BM_DEBUG) {
printf("picked worker %s, which has a minimal number (%d) of granules assigned\n",
eligibleWorkers[idx].toString().c_str(),
minGranulesAssigned);
}
return eligibleWorkers[idx];
2021-08-31 02:07:25 +08:00
}
// Sends a single assign or revoke request (stamped with this manager's epoch and
// the given seqNo) to the target worker, and re-queues work appropriately if the
// request fails.
ACTOR Future<Void> doRangeAssignment(BlobManagerData* bmData, RangeAssignment assignment, UID workerID, int64_t seqNo) {
    if (BM_DEBUG) {
        printf("BM %s %s range [%s - %s) @ (%lld, %lld)\n",
               bmData->id.toString().c_str(),
               assignment.isAssign ? "assigning" : "revoking",
               assignment.keyRange.begin.printable().c_str(),
               assignment.keyRange.end.printable().c_str(),
               bmData->epoch,
               seqNo);
    }

    try {
        state AssignBlobRangeReply rep;
        if (assignment.isAssign) {
            ASSERT(assignment.assign.present());
            ASSERT(!assignment.revoke.present());

            AssignBlobRangeRequest req;
            req.keyRange = KeyRangeRef(StringRef(req.arena, assignment.keyRange.begin),
                                       StringRef(req.arena, assignment.keyRange.end));
            req.managerEpoch = bmData->epoch;
            req.managerSeqno = seqNo;
            req.continueAssignment = assignment.assign.get().continueAssignment;
            req.specialAssignment = assignment.assign.get().specialAssignment;

            // if that worker isn't alive anymore, add the range back into the stream
            if (bmData->workersById.count(workerID) == 0) {
                throw no_more_servers();
            }
            AssignBlobRangeReply _rep = wait(bmData->workersById[workerID].assignBlobRangeRequest.getReply(req));
            rep = _rep;
        } else {
            ASSERT(!assignment.assign.present());
            ASSERT(assignment.revoke.present());

            RevokeBlobRangeRequest req;
            req.keyRange = KeyRangeRef(StringRef(req.arena, assignment.keyRange.begin),
                                       StringRef(req.arena, assignment.keyRange.end));
            req.managerEpoch = bmData->epoch;
            req.managerSeqno = seqNo;
            req.dispose = assignment.revoke.get().dispose;

            // if that worker isn't alive anymore, this is a noop
            if (bmData->workersById.count(workerID)) {
                AssignBlobRangeReply _rep = wait(bmData->workersById[workerID].revokeBlobRangeRequest.getReply(req));
                rep = _rep;
            } else {
                return Void();
            }
        }

        if (!rep.epochOk) {
            if (BM_DEBUG) {
                printf("BM heard from BW that there is a new manager with higher epoch\n");
            }
            if (bmData->iAmReplaced.canBeSet()) {
                bmData->iAmReplaced.send(Void());
            }
        }
    } catch (Error& e) {
        // TODO confirm: using reliable delivery this should only trigger if the worker is marked as failed, right?
        // So assignment needs to be retried elsewhere, and a revoke is trivially complete
        if (assignment.isAssign) {
            if (BM_DEBUG) {
                printf("BM got error assigning range [%s - %s) to worker %s, requeueing\n",
                       assignment.keyRange.begin.printable().c_str(),
                       assignment.keyRange.end.printable().c_str(),
                       workerID.toString().c_str());
            }

            // re-send revoke to queue to handle range being un-assigned from that worker before the new one
            RangeAssignment revokeOld;
            revokeOld.isAssign = false;
            revokeOld.worker = workerID;
            revokeOld.keyRange = assignment.keyRange;
            revokeOld.revoke = RangeRevokeData(false);
            bmData->rangesToAssign.send(revokeOld);

            // send assignment back to queue as is, clearing the designated worker unless this is a
            // special assignment pinned to a specific worker.
            // NOTE(review): this reads assignment.worker.get() when specialAssignment is set —
            // confirm special assignments always carry a designated worker.
            if (!(assignment.assign.get().specialAssignment && assignment.worker.get() != UID())) {
                assignment.worker.reset();
            }
            bmData->rangesToAssign.send(assignment);
            // FIXME: improvement would be to add history of failed workers to assignment so it can try other ones first
        } else {
            if (BM_DEBUG) {
                // Fix: this format string has three %s specifiers but previously only two
                // arguments were passed (undefined behavior); pass the worker id as well.
                printf("BM got error revoking range [%s - %s) from worker %s",
                       assignment.keyRange.begin.printable().c_str(),
                       assignment.keyRange.end.printable().c_str(),
                       workerID.toString().c_str());
            }

            if (assignment.revoke.get().dispose) {
                if (BM_DEBUG) {
                    printf(", retrying for dispose\n");
                }
                // send assignment back to queue as is, clearing designated worker if present
                assignment.worker.reset();
                bmData->rangesToAssign.send(assignment);
            } else {
                if (BM_DEBUG) {
                    printf(", ignoring\n");
                }
            }
        }
    }
    return Void();
}
// Core assignment loop: drains rangesToAssign in order, stamps each request with a
// monotonically increasing seqno, updates the in-memory ownership map, and sends
// the request to a worker via doRangeAssignment.
ACTOR Future<Void> rangeAssigner(BlobManagerData* bmData) {
    loop {
        // inject delay into range assignments
        if (BUGGIFY_WITH_PROB(0.05)) {
            wait(delay(deterministicRandom()->random01()));
        }
        state RangeAssignment assignment = waitNext(bmData->rangesToAssign.getFuture());
        state int64_t seqNo = bmData->seqNo;
        bmData->seqNo++;

        // modify the in-memory assignment data structures, and send request off to worker
        state UID workerId;
        if (assignment.isAssign) {
            // Ensure range isn't currently assigned anywhere, and there is only 1 intersecting range
            auto currentAssignments = bmData->workerAssignments.intersectingRanges(assignment.keyRange);
            int count = 0;
            for (auto& it : currentAssignments) {
                if (assignment.assign.get().continueAssignment) {
                    // A continue must target the worker that already owns the range.
                    ASSERT(assignment.worker.present());
                    ASSERT(it.value() == assignment.worker.get());
                } else {
                    // ASSERT(it.value() == UID());
                }
                count++;
            }
            ASSERT(count == 1);

            // Use the designated worker if one was supplied; otherwise pick the least-loaded one.
            if (assignment.worker.present() && assignment.worker.get().isValid()) {
                workerId = assignment.worker.get();
            } else {
                UID _workerId = wait(pickWorkerForAssign(bmData));
                workerId = _workerId;
            }
            bmData->workerAssignments.insert(assignment.keyRange, workerId);

            /*
            ASSERT(bmData->workerStats.count(workerId));
            if (!assignment.assign.get().continueAssignment) {
                bmData->workerStats[workerId].numGranulesAssigned += 1;
            }
            */

            // FIXME: if range is assign, have some sort of semaphore for outstanding assignments so we don't assign
            // a ton ranges at once and blow up FDB with reading initial snapshots.
            bmData->addActor.send(doRangeAssignment(bmData, assignment, workerId, seqNo));
        } else {
            // Revoking a range could be a large range that contains multiple ranges.
            auto currentAssignments = bmData->workerAssignments.intersectingRanges(assignment.keyRange);
            for (auto& it : currentAssignments) {
                // ensure range doesn't truncate existing ranges
                ASSERT(it.begin() >= assignment.keyRange.begin);
                ASSERT(it.end() <= assignment.keyRange.end);

                // It is fine for multiple disjoint sub-ranges to have the same sequence number since they were part of
                // the same logical change
                if (bmData->workerStats.count(it.value())) {
                    bmData->workerStats[it.value()].numGranulesAssigned -= 1;
                }
                // revoke the range for the worker that owns it, not the worker specified in the revoke
                bmData->addActor.send(doRangeAssignment(bmData, assignment, it.value(), seqNo));
            }
            // Mark the whole revoked range as unassigned.
            bmData->workerAssignments.insert(assignment.keyRange, UID());
        }
    }
}
// Verifies, inside the given transaction, that this manager still holds the
// manager-epoch lock. If a newer manager has taken over, signals iAmReplaced and
// throws granule_assignment_conflict. Otherwise adds the lock key to the read
// conflict set so a concurrent epoch change aborts the transaction at commit.
ACTOR Future<Void> checkManagerLock(Reference<ReadYourWritesTransaction> tr, BlobManagerData* bmData) {
    Optional<Value> currentLockValue = wait(tr->get(blobManagerEpochKey));
    ASSERT(currentLockValue.present());
    int64_t currentEpoch = decodeBlobManagerEpochValue(currentLockValue.get());
    if (currentEpoch != bmData->epoch) {
        ASSERT(currentEpoch > bmData->epoch);

        if (BM_DEBUG) {
            // Fix: epochs are int64_t, so use %lld rather than %d (which is
            // undefined behavior for 64-bit arguments).
            printf("BM %s found new epoch %lld > %lld in lock check\n",
                   bmData->id.toString().c_str(),
                   currentEpoch,
                   bmData->epoch);
        }
        if (bmData->iAmReplaced.canBeSet()) {
            bmData->iAmReplaced.send(Void());
        }

        throw granule_assignment_conflict();
    }
    tr->addReadConflictRange(singleKeyRange(blobManagerEpochKey));

    return Void();
}
// FIXME: this does all logic in one transaction. Adding a giant range to an existing database to blobify would
// require doing a ton of storage metrics calls, which we should split up across multiple transactions likely.
// Watches the blob range configuration; on each change, diffs it against the
// in-memory view, revokes (with dispose) removed ranges, and splits + assigns
// newly added ranges.
ACTOR Future<Void> monitorClientRanges(BlobManagerData* bmData) {
    loop {
        state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(bmData->db);
        if (BM_DEBUG) {
            printf("Blob manager checking for range updates\n");
        }
        loop {
            try {
                tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
                tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);

                // TODO probably knobs here? This should always be pretty small though
                RangeResult results = wait(krmGetRanges(
                    tr, blobRangeKeys.begin, KeyRange(normalKeys), 10000, GetRangeLimits::BYTE_LIMIT_UNLIMITED));
                ASSERT(!results.more && results.size() < CLIENT_KNOBS->TOO_MANY);

                state Arena ar;
                // Keep the result keys alive for the lifetime of the diff vectors below.
                ar.dependsOn(results.arena());
                VectorRef<KeyRangeRef> rangesToAdd;
                VectorRef<KeyRangeRef> rangesToRemove;
                updateClientBlobRanges(&bmData->knownBlobRanges, results, ar, &rangesToAdd, &rangesToRemove);

                for (KeyRangeRef range : rangesToRemove) {
                    if (BM_DEBUG) {
                        printf("BM Got range to revoke [%s - %s)\n",
                               range.begin.printable().c_str(),
                               range.end.printable().c_str());
                    }

                    RangeAssignment ra;
                    ra.isAssign = false;
                    ra.keyRange = range;
                    ra.revoke = RangeRevokeData(true); // dispose=true
                    bmData->rangesToAssign.send(ra);
                }

                state std::vector<Future<Standalone<VectorRef<KeyRef>>>> splitFutures;
                // Divide new ranges up into equal chunks by using SS byte sample
                for (KeyRangeRef range : rangesToAdd) {
                    // assert that this range contains no currently assigned ranges in this
                    splitFutures.push_back(splitRange(tr, range));
                }

                for (auto f : splitFutures) {
                    Standalone<VectorRef<KeyRef>> splits = wait(f);
                    if (BM_DEBUG) {
                        printf("Split client range [%s - %s) into %d ranges:\n",
                               splits[0].printable().c_str(),
                               splits[splits.size() - 1].printable().c_str(),
                               splits.size() - 1);
                    }

                    // Each consecutive pair of split points becomes one new assignment.
                    for (int i = 0; i < splits.size() - 1; i++) {
                        KeyRange range = KeyRange(KeyRangeRef(splits[i], splits[i + 1]));
                        if (BM_DEBUG) {
                            printf(" [%s - %s)\n", range.begin.printable().c_str(), range.end.printable().c_str());
                        }

                        RangeAssignment ra;
                        ra.isAssign = true;
                        ra.keyRange = range;
                        ra.assign = RangeAssignmentData(false); // continue=false
                        bmData->rangesToAssign.send(ra);
                    }
                }

                // Set the watch before committing so no change between commit and watch is missed.
                state Future<Void> watchFuture = tr->watch(blobRangeChangeKey);
                wait(tr->commit());
                if (BM_DEBUG) {
                    printf("Blob manager done processing client ranges, awaiting update\n");
                }
                wait(watchFuture);
                break;
            } catch (Error& e) {
                if (BM_DEBUG) {
                    printf("Blob manager got error looking for range updates %s\n", e.name());
                }
                wait(tr->onError(e));
            }
        }
    }
}
// Evaluates a worker-requested split of granuleRange. If the range is not large
// enough to split, simply re-issues a continue-assignment to the current worker.
// Otherwise persists the split intent and per-child history to the DB (while
// holding the manager lock and re-acquiring the granule lock), then revokes the
// old granule and queues assignments for the new sub-ranges.
ACTOR Future<Void> maybeSplitRange(BlobManagerData* bmData,
                                   UID currentWorkerId,
                                   KeyRange granuleRange,
                                   UID granuleID,
                                   Version granuleStartVersion,
                                   Version latestVersion) {
    state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(bmData->db);
    state Standalone<VectorRef<KeyRef>> newRanges;
    state int64_t newLockSeqno = -1;

    // first get ranges to split
    if (newRanges.empty()) {
        Standalone<VectorRef<KeyRef>> _newRanges = wait(splitRange(tr, granuleRange));
        newRanges = _newRanges;
    }

    if (newRanges.size() == 2) {
        // not large enough to split, just reassign back to worker
        if (BM_DEBUG) {
            printf("Not splitting existing range [%s - %s). Continuing assignment to %s\n",
                   granuleRange.begin.printable().c_str(),
                   granuleRange.end.printable().c_str(),
                   currentWorkerId.toString().c_str());
        }
        RangeAssignment raContinue;
        raContinue.isAssign = true;
        raContinue.worker = currentWorkerId;
        raContinue.keyRange = granuleRange;
        raContinue.assign = RangeAssignmentData(true); // continue assignment and re-snapshot
        bmData->rangesToAssign.send(raContinue);
        return Void();
    }

    // Need to split range. Persist intent to split and split metadata to DB BEFORE sending split requests
    loop {
        try {
            tr->reset();
            tr->setOption(FDBTransactionOptions::Option::PRIORITY_SYSTEM_IMMEDIATE);
            tr->setOption(FDBTransactionOptions::Option::ACCESS_SYSTEM_KEYS);
            ASSERT(newRanges.size() >= 2);

            // make sure we're still manager when this transaction gets committed
            wait(checkManagerLock(tr, bmData));

            // acquire lock for old granule to make sure nobody else modifies it
            state Key lockKey = blobGranuleLockKeyFor(granuleRange);
            Optional<Value> lockValue = wait(tr->get(lockKey));
            ASSERT(lockValue.present());
            std::tuple<int64_t, int64_t, UID> prevGranuleLock = decodeBlobGranuleLockValue(lockValue.get());
            if (std::get<0>(prevGranuleLock) > bmData->epoch) {
                if (BM_DEBUG) {
                    // Fix: epochs are int64_t, so use %lld rather than %d (undefined
                    // behavior for 64-bit arguments).
                    printf("BM %s found a higher epoch %lld than %lld for granule lock of [%s - %s)\n",
                           bmData->id.toString().c_str(),
                           std::get<0>(prevGranuleLock),
                           bmData->epoch,
                           granuleRange.begin.printable().c_str(),
                           granuleRange.end.printable().c_str());
                }
                if (bmData->iAmReplaced.canBeSet()) {
                    bmData->iAmReplaced.send(Void());
                }
                return Void();
            }

            if (newLockSeqno == -1) {
                newLockSeqno = bmData->seqNo;
                bmData->seqNo++;
                ASSERT(newLockSeqno > std::get<1>(prevGranuleLock));
            } else {
                // previous transaction could have succeeded but got commit_unknown_result
                ASSERT(newLockSeqno >= std::get<1>(prevGranuleLock));
            }

            // acquire granule lock so nobody else can make changes to this granule.
            tr->set(lockKey, blobGranuleLockValueFor(bmData->epoch, newLockSeqno, std::get<2>(prevGranuleLock)));

            // set up split metadata
            for (int i = 0; i < newRanges.size() - 1; i++) {
                UID newGranuleID = deterministicRandom()->randomUniqueID();

                Key splitKey = blobGranuleSplitKeyFor(granuleID, newGranuleID);
                tr->atomicOp(splitKey,
                             blobGranuleSplitValueFor(BlobGranuleSplitState::Started),
                             MutationRef::SetVersionstampedValue);

                Key historyKey = blobGranuleHistoryKeyFor(KeyRangeRef(newRanges[i], newRanges[i + 1]), latestVersion);
                Standalone<BlobGranuleHistoryValue> historyValue;
                historyValue.granuleID = newGranuleID;
                historyValue.parentGranules.push_back(historyValue.arena(),
                                                      std::pair(granuleRange, granuleStartVersion));
                tr->set(historyKey, blobGranuleHistoryValueFor(historyValue));
            }

            wait(tr->commit());
            break;
        } catch (Error& e) {
            if (e.code() == error_code_granule_assignment_conflict) {
                if (bmData->iAmReplaced.canBeSet()) {
                    bmData->iAmReplaced.send(Void());
                }
                return Void();
            }
            wait(tr->onError(e));
        }
    }

    if (BM_DEBUG) {
        printf("Splitting range [%s - %s) into (%d):\n",
               granuleRange.begin.printable().c_str(),
               granuleRange.end.printable().c_str(),
               newRanges.size() - 1);
        for (int i = 0; i < newRanges.size() - 1; i++) {
            printf(" [%s - %s)\n", newRanges[i].printable().c_str(), newRanges[i + 1].printable().c_str());
        }
    }

    // transaction committed, send range assignments
    // revoke from current worker
    RangeAssignment raRevoke;
    raRevoke.isAssign = false;
    raRevoke.worker = currentWorkerId;
    raRevoke.keyRange = granuleRange;
    raRevoke.revoke = RangeRevokeData(false); // not a dispose
    bmData->rangesToAssign.send(raRevoke);

    for (int i = 0; i < newRanges.size() - 1; i++) {
        // reassign new range and do handover of previous range
        RangeAssignment raAssignSplit;
        raAssignSplit.isAssign = true;
        raAssignSplit.keyRange = KeyRangeRef(newRanges[i], newRanges[i + 1]);
        raAssignSplit.assign = RangeAssignmentData(false);
        // don't care who this range gets assigned to
        bmData->rangesToAssign.send(raAssignSplit);
    }

    return Void();
}
// Clears the given worker's entry from the blob worker list in the database.
// Retries until the clear commits.
ACTOR Future<Void> deregisterBlobWorker(BlobManagerData* bmData, BlobWorkerInterface interf) {
    state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(bmData->db);
    loop {
        tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
        tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
        try {
            Key blobWorkerListKey = blobWorkerListKeyFor(interf.id());
            // Conflict on the list key so a concurrent write to it aborts this transaction.
            tr->addReadConflictRange(singleKeyRange(blobWorkerListKey));
            tr->clear(blobWorkerListKey);

            wait(tr->commit());

            if (BM_DEBUG) {
                printf("Deregistered blob worker %s\n", interf.id().toString().c_str());
            }
            return Void();
        } catch (Error& e) {
            if (BM_DEBUG) {
                printf("Deregistering blob worker %s got error %s\n", interf.id().toString().c_str(), e.name());
            }
            wait(tr->onError(e));
        }
    }
}
// Tears down a blob worker: removes it from the in-memory maps, kicks recruiting
// to find a replacement, revokes and re-queues every range it owned, sends it a
// halt (fire-and-forget), and deregisters it from the database.
ACTOR Future<Void> killBlobWorker(BlobManagerData* bmData, BlobWorkerInterface bwInterf) {
    UID bwId = bwInterf.id();

    // Remove blob worker from stats map so that when we try to find a worker to takeover the range,
    // the one we just killed isn't considered.
    // Remove it from workersById also since otherwise that worker addr will remain excluded
    // when we try to recruit new blob workers.
    bmData->workerStats.erase(bwId);
    bmData->workersById.erase(bwId);

    // Start the DB-side cleanup concurrently; it is waited on at the end.
    Future<Void> deregister = deregisterBlobWorker(bmData, bwInterf);
    bmData->restartRecruiting.trigger();

    // for every range owned by this blob worker, we want to
    // - send a revoke request for that range
    // - add the range back to the stream of ranges to be assigned
    if (BM_DEBUG) {
        printf("Taking back ranges from BW %s\n", bwId.toString().c_str());
    }
    for (auto& it : bmData->workerAssignments.ranges()) {
        if (it.cvalue() == bwId) {
            // Send revoke request
            RangeAssignment raRevoke;
            raRevoke.isAssign = false;
            raRevoke.keyRange = it.range();
            raRevoke.revoke = RangeRevokeData(false);
            bmData->rangesToAssign.send(raRevoke);

            // Add range back into the stream of ranges to be assigned
            RangeAssignment raAssign;
            raAssign.isAssign = true;
            raAssign.worker = Optional<UID>(); // no designated worker; let the assigner pick
            raAssign.keyRange = it.range();
            raAssign.assign = RangeAssignmentData(false); // not a continue
            bmData->rangesToAssign.send(raAssign);
        }
    }

    // Send halt to blob worker, with no expectation of hearing back
    if (BM_DEBUG) {
        printf("Sending halt to BW %s\n", bwId.toString().c_str());
    }
    bmData->addActor.send(
        brokenPromiseToNever(bwInterf.haltBlobWorker.getReply(HaltBlobWorkerRequest(bmData->epoch, bmData->id))));

    wait(deregister);

    return Void();
}
// Listens to one blob worker's granule status stream. For each (split) report, it
// verifies the worker still owns the granule in this manager's mapping and that
// the report is not a duplicate, then kicks off maybeSplitRange. The stream is
// rebuilt on retryable network errors.
ACTOR Future<Void> monitorBlobWorkerStatus(BlobManagerData* bmData, BlobWorkerInterface bwInterf) {
    // (epoch, seqno) of the last split request seen for each granule range, for dedup.
    state KeyRangeMap<std::pair<int64_t, int64_t>> lastSeenSeqno;
    // outer loop handles reconstructing stream if it got a retryable error
    loop {
        try {
            state ReplyPromiseStream<GranuleStatusReply> statusStream =
                bwInterf.granuleStatusStreamRequest.getReplyStream(GranuleStatusStreamRequest(bmData->epoch));
            // read from stream until worker fails (should never get explicit end_of_stream)
            loop {
                GranuleStatusReply rep = waitNext(statusStream.getFuture());

                if (BM_DEBUG) {
                    printf("BM %lld got status of [%s - %s) @ (%lld, %lld) from BW %s: %s\n",
                           bmData->epoch,
                           rep.granuleRange.begin.printable().c_str(),
                           rep.granuleRange.end.printable().c_str(),
                           rep.epoch,
                           rep.seqno,
                           bwInterf.id().toString().c_str(),
                           rep.doSplit ? "split" : "");
                }
                if (rep.epoch > bmData->epoch) {
                    if (BM_DEBUG) {
                        printf("BM heard from BW %s that there is a new manager with higher epoch\n",
                               bwInterf.id().toString().c_str());
                    }
                    if (bmData->iAmReplaced.canBeSet()) {
                        bmData->iAmReplaced.send(Void());
                    }
                }

                // TODO maybe this won't be true eventually, but right now the only time the blob worker reports back is
                // to split the range.
                ASSERT(rep.doSplit);

                // only evaluate for split if this worker currently owns the granule in this blob manager's mapping
                auto currGranuleAssignment = bmData->workerAssignments.rangeContaining(rep.granuleRange.begin);
                if (!(currGranuleAssignment.begin() == rep.granuleRange.begin &&
                      currGranuleAssignment.end() == rep.granuleRange.end &&
                      currGranuleAssignment.cvalue() == bwInterf.id())) {
                    continue;
                }

                auto lastReqForGranule = lastSeenSeqno.rangeContaining(rep.granuleRange.begin);
                if (rep.granuleRange.begin == lastReqForGranule.begin() &&
                    rep.granuleRange.end == lastReqForGranule.end() && rep.epoch == lastReqForGranule.value().first &&
                    rep.seqno == lastReqForGranule.value().second) {
                    if (BM_DEBUG) {
                        // Fix: this format string has four specifiers but previously only three
                        // arguments were passed (undefined behavior); supply the seqno.
                        printf("Manager %lld received repeat status for the same granule [%s - %s) @ %lld, ignoring.",
                               bmData->epoch,
                               rep.granuleRange.begin.printable().c_str(),
                               rep.granuleRange.end.printable().c_str(),
                               rep.seqno);
                    }
                } else {
                    if (BM_DEBUG) {
                        printf("Manager %lld evaluating [%s - %s) for split\n",
                               bmData->epoch,
                               rep.granuleRange.begin.printable().c_str(),
                               rep.granuleRange.end.printable().c_str());
                    }
                    lastSeenSeqno.insert(rep.granuleRange, std::pair(rep.epoch, rep.seqno));
                    bmData->addActor.send(maybeSplitRange(
                        bmData, bwInterf.id(), rep.granuleRange, rep.granuleID, rep.startVersion, rep.latestVersion));
                }
            }
        } catch (Error& e) {
            // Fix: this print was previously unconditional; guard it with BM_DEBUG for
            // consistency with the rest of this file.
            if (BM_DEBUG) {
                printf("BM %s got error %s while monitoring BW %s---------------\n",
                       bmData->id.toString().c_str(),
                       e.name(),
                       bwInterf.id().toString().c_str());
            }
            if (e.code() == error_code_operation_cancelled) {
                throw e;
            }

            // if we got an error constructing or reading from stream that is retryable, wait and retry.
            ASSERT(e.code() != error_code_end_of_stream);
            if (e.code() == error_code_connection_failed || e.code() == error_code_request_maybe_delivered) {
                // FIXME: this could throw connection_failed and we could handle catch this the same as the failure
                // detection triggering
                wait(delay(FLOW_KNOBS->PREVENT_FAST_SPIN_DELAY));
                continue;
            } else {
                if (BM_DEBUG) {
                    printf("BM got unexpected error %s monitoring BW %s status\n",
                           e.name(),
                           bwInterf.id().toString().c_str());
                }
                // TODO change back from SevError?
                TraceEvent(SevError, "BWStatusMonitoringFailed", bmData->id)
                    .detail("BlobWorkerID", bwInterf.id())
                    .error(e);
                throw e;
            }
        }
    }
}
// Monitors a single blob worker for failure (via waitFailureClient) and for unexpected
// termination of its status-stream monitor. On either event, the worker is killed and
// we fall through to (eventually) trigger replacement recruitment.
ACTOR Future<Void> monitorBlobWorker(BlobManagerData* bmData, BlobWorkerInterface bwInterf) {
	try {
		if (BM_DEBUG) {
			printf("adding waitFailure for BW %s\n", bwInterf.id().toString().c_str());
		}
		state Future<Void> waitFailure = waitFailureClient(bwInterf.waitFailure, SERVER_KNOBS->BLOB_WORKER_TIMEOUT);
		state Future<Void> monitorStatus = monitorBlobWorkerStatus(bmData, bwInterf);
		choose {
			when(wait(waitFailure)) {
				if (BM_DEBUG) {
					printf("BM %lld detected BW %s is dead\n", bmData->epoch, bwInterf.id().toString().c_str());
				}
				TraceEvent("BlobWorkerFailed", bmData->id).detail("BlobWorkerID", bwInterf.id());
			}
			when(wait(monitorStatus)) {
				// monitorBlobWorkerStatus either runs forever or throws; it must never return Void
				ASSERT(false);
				throw internal_error();
			}
		}
	} catch (Error& e) {
		// TODO: will blob worker get cleaned up in this case?
		if (e.code() == error_code_operation_cancelled) {
			throw e;
		}
		// FIXME: forward errors somewhere from here
		if (BM_DEBUG) {
			printf("BM got unexpected error %s monitoring BW %s\n", e.name(), bwInterf.id().toString().c_str());
		}
		// TODO change back from SevError?
		TraceEvent(SevError, "BWMonitoringFailed", bmData->id).detail("BlobWorkerID", bwInterf.id()).error(e);
		throw e;
	}

	// kill the blob worker: revoke its ranges and remove it from the manager's records
	wait(killBlobWorker(bmData, bwInterf));

	// Trigger recruitment for a new blob worker
	if (BM_DEBUG) {
		printf("Restarting recruitment to replace dead BW %s\n", bwInterf.id().toString().c_str());
	}
	// bmData->restartRecruiting.trigger();

	if (BM_DEBUG) {
		printf("No longer monitoring BW %s\n", bwInterf.id().toString().c_str());
	}
	return Void();
}
2021-10-20 23:54:19 +08:00
ACTOR Future<Void> ackExistingBlobWorkers(BlobManagerData* bmData) {
2021-10-22 05:39:38 +08:00
printf("acking\n");
2021-10-20 23:54:19 +08:00
// get list of last known blob workers
// note: the list will include every blob worker that the old manager knew about
// but it might also contain blob workers that died while the new manager was being recruited
std::vector<BlobWorkerInterface> blobWorkers = wait(getBlobWorkers(bmData->db));
// add all blob workers to this new blob manager's records
for (auto worker : blobWorkers) {
bmData->workersById[worker.id()] = worker;
bmData->workerStats[worker.id()] = BlobWorkerStats();
// if the worker died when the new BM was being recruited, the monitor will fail and we
// will clean up the dead blob worker (and recruit new ones in place)
2021-10-22 05:39:38 +08:00
printf("adding monitor for BW %s\n", worker.id().toString().c_str());
2021-10-20 23:54:19 +08:00
bmData->addActor.send(monitorBlobWorker(bmData, worker));
}
2021-10-22 05:39:38 +08:00
bmData->startRecruiting.trigger();
2021-10-20 23:54:19 +08:00
2021-10-22 05:39:38 +08:00
// TODO: is there anyway we can guarantee the monitor ran so that we have a complete set of alive BWs?
// A not so great way of doing this is timeouts
2021-10-20 23:54:19 +08:00
// At this point, bmData->workersById is a complete list of blob workers
2021-10-22 05:39:38 +08:00
printf("after adding monitors\n");
2021-10-20 23:54:19 +08:00
state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(bmData->db);
2021-10-22 05:39:38 +08:00
// If blob manager dies, and while another is being recruited, a blob worker dies, we need to handle sending
2021-10-20 23:54:19 +08:00
// the ranges for the dead blob worker to new workers. For every range persisted in the DB, send an assign request
// to the worker that it was intended for. If the worker indeed had it, its a noop. If the worker didn't have it,
// it'll persist it. If the worker is dead, i.e. it died while a new BM was being recruited, (easy check: just check
// if that worker is still a worker that we know about via getBlobWorkers), let other workers take over its ranges.
//
// TODO: confirm the below are solved now with the new way of checking BWs liveness.
// Problem: when a blob worker dies while a blob manager is being recruited, we wouldn't recruit new BW to replace
// that one, even though we should have. Easy fix: when going through the list of persisted blob workers
// assignments, if any of them are dead, just recruit a new one. Problem: if the blob worker that died didn't have
// any assignments, it wouldn't be in this list, and so we wouldn't rerecruit for it.
// 1. Get existing assignments using blobGranuleMappingKeys.begin
// 2. Get split intentions using blobGranuleSplitKeys
// 3. Start sending assign requests to blob workers
//
//
// Cases to consider:
// - Blob Manager revokes and re-assigns a range but only the revoke went through before BM died. Then,
// granuleMappingKeys still has G->oldBW so
// the worst case here is that we end up giving G back to the oldBW
// - Blob Manager revokes and re-assigns a range but neither went through before BM died. Same as above.
// - While the BM is recovering, a BW dies. Then, when the BM comes back, it seems granuleMappings of the form
// G->deadBW. Since
// we have a complete list of alive BWs by now, we can just reassign these G's to the next best worker
// - BM persisted intent to split ranges but died before sending them. We just have to re-assign these. If any of
// the new workers died, then
// it's one of the cases above.
//
// We have to think about how to recreate bmData->granuleAssignments and the order in which to send these assigns.
// If we apply all of granuleMappingKeys in a keyrangemap and then apply the splits over it, I think that works.
// We can't just send assigns for granuleMappingKeys because entries there might be dated.
//
// We need a special type of assign req (something like continue) that if fails, doesn't try to find a different
// worker. The reason is that if we can guarantee the worker was alive by the time it got the req, then if the req
// failed,
//
state RangeResult blobGranuleMappings;
state RangeResult blobGranuleSplits;
// get assignments
loop {
try {
2021-10-22 05:39:38 +08:00
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
// tr->reset();
2021-10-20 23:54:19 +08:00
wait(checkManagerLock(tr, bmData));
2021-10-22 05:39:38 +08:00
RangeResult _results = wait(krmGetRanges(
tr, blobGranuleMappingKeys.begin, KeyRange(normalKeys), 10000, GetRangeLimits::BYTE_LIMIT_UNLIMITED));
2021-10-20 23:54:19 +08:00
blobGranuleMappings = _results;
if (blobGranuleMappings.more) {
2021-10-22 05:39:38 +08:00
// TODO: accumulate resutls
2021-10-20 23:54:19 +08:00
}
2021-10-22 05:39:38 +08:00
wait(tr->commit()); // TODO: don't need commit
break;
2021-10-20 23:54:19 +08:00
} catch (Error& e) {
wait(tr->onError(e));
}
}
2021-10-22 05:39:38 +08:00
for (int rangeIdx = 0; rangeIdx < blobGranuleMappings.size() - 1; rangeIdx++) {
Key granuleStartKey = blobGranuleMappings[rangeIdx].key;
Key granuleEndKey = blobGranuleMappings[rangeIdx + 1].key;
if (blobGranuleMappings[rangeIdx].value.size()) {
UID existingOwner = decodeBlobGranuleMappingValue(blobGranuleMappings[rangeIdx].value);
UID newOwner = bmData->workersById.count(existingOwner) ? existingOwner : UID();
printf("about to insert [%s-%s] into workerassignments\n",
granuleStartKey.printable().c_str(),
granuleStartKey.printable().c_str());
bmData->workerAssignments.insert(KeyRangeRef(granuleStartKey, granuleEndKey), newOwner);
}
}
2021-10-20 23:54:19 +08:00
// get splits
2021-10-22 05:39:38 +08:00
tr->reset();
2021-10-20 23:54:19 +08:00
loop {
try {
2021-10-22 05:39:38 +08:00
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
2021-10-20 23:54:19 +08:00
wait(checkManagerLock(tr, bmData));
2021-10-22 05:39:38 +08:00
// tr->reset();
RangeResult _results =
wait(tr->getRange(KeyRangeRef(blobGranuleSplitKeys.begin, blobGranuleSplitKeys.end), 10000));
blobGranuleSplits = _results;
if (blobGranuleSplits.more) {
// TODO: accumulate resutls
2021-10-20 23:54:19 +08:00
}
2021-10-22 05:39:38 +08:00
wait(tr->commit()); // don't need commit
break;
2021-10-20 23:54:19 +08:00
} catch (Error& e) {
wait(tr->onError(e));
}
}
2021-10-22 05:39:38 +08:00
for (auto split : blobGranuleSplits) {
UID parentGranuleID, granuleID; //
BlobGranuleSplitState splitState; // enum
Version version; // version at which split happened
if (split.expectedSize() == 0) {
continue;
}
std::tie(parentGranuleID, granuleID) = decodeBlobGranuleSplitKey(split.key);
std::tie(splitState, version) = decodeBlobGranuleSplitValue(split.value);
const KeyRange range = blobGranuleSplitKeyRangeFor(parentGranuleID);
UID owner = UID();
// TODO: need actual owner otherwise retry logic will be messed up. we might have sent the
// req to a worker who didn't finish it yet and so the assignment is not in granuleAssignments
// and if we try to assign to someone else, there will be two workers owning the same range which is
// problematic. Is this problematic though? One of the seqno's will becomes stale!
// bmData->workerAssignments.insert(range, UID());
if (splitState <= BlobGranuleSplitState::Started) {
printf("about to insert [%s-%s] into workerassignments\n",
range.begin.printable().c_str(),
range.end.printable().c_str());
bmData->workerAssignments.insert(range, UID());
}
2021-10-20 23:54:19 +08:00
}
2021-10-22 05:39:38 +08:00
if (bmData->workerAssignments.size() == 1) {
return Void();
}
2021-10-20 23:54:19 +08:00
2021-10-22 05:39:38 +08:00
for (auto& range : bmData->workerAssignments.ranges()) {
printf("assigning range [%s-%s]\n",
range.range().begin.printable().c_str(),
range.range().end.printable().c_str());
// what should we do if we get here and the worker was alive, but between now and the assignment, the
// worker dies. what's the retry logic going to be?
RangeAssignment raAssign;
raAssign.isAssign = true;
raAssign.worker = range.value();
raAssign.keyRange = range.range();
raAssign.assign = RangeAssignmentData(false, true); // special assignment
bmData->rangesToAssign.send(raAssign);
}
2021-10-20 23:54:19 +08:00
return Void();
}
2021-10-15 07:25:34 +08:00
ACTOR Future<Void> chaosRangeMover(BlobManagerData* bmData) {
2021-09-25 23:30:27 +08:00
ASSERT(g_network->isSimulated());
2021-08-31 02:59:53 +08:00
loop {
wait(delay(30.0));
2021-09-25 23:30:27 +08:00
if (g_simulator.speedUpSimulation) {
if (BM_DEBUG) {
printf("Range mover stopping\n");
}
return Void();
}
2021-08-31 02:59:53 +08:00
if (bmData->workersById.size() > 1) {
int tries = 10;
while (tries > 0) {
tries--;
auto randomRange = bmData->workerAssignments.randomRange();
if (randomRange.value() != UID()) {
if (BM_DEBUG) {
printf("Range mover moving range [%s - %s): %s\n",
randomRange.begin().printable().c_str(),
randomRange.end().printable().c_str(),
randomRange.value().toString().c_str());
}
2021-08-31 02:59:53 +08:00
2021-10-22 05:39:38 +08:00
// FIXME: with low probability, could immediately revoke it from the new assignment and move
// it back right after to test that race
2021-09-24 22:55:37 +08:00
RangeAssignment revokeOld;
revokeOld.isAssign = false;
revokeOld.keyRange = randomRange.range();
revokeOld.revoke = RangeRevokeData(false);
bmData->rangesToAssign.send(revokeOld);
RangeAssignment assignNew;
assignNew.isAssign = true;
assignNew.keyRange = randomRange.range();
2021-09-23 01:46:20 +08:00
assignNew.assign = RangeAssignmentData(false); // not a continue
bmData->rangesToAssign.send(assignNew);
2021-08-31 02:59:53 +08:00
break;
}
}
if (tries == 0 && BM_DEBUG) {
2021-09-23 01:46:20 +08:00
printf("Range mover couldn't find random range to move, skipping\n");
2021-08-31 02:59:53 +08:00
}
} else if (BM_DEBUG) {
2021-08-31 02:59:53 +08:00
printf("Range mover found %d workers, skipping\n", bmData->workerAssignments.size());
}
}
}
2021-09-29 07:15:32 +08:00
// Returns the number of blob workers on addr
int numExistingBWOnAddr(BlobManagerData* self, const AddressExclusion& addr) {
int numExistingBW = 0;
for (auto& server : self->workersById) {
const NetworkAddress& netAddr = server.second.stableAddress();
AddressExclusion usedAddr(netAddr.ip, netAddr.port);
if (usedAddr == addr) {
++numExistingBW;
}
}
return numExistingBW;
}
2021-09-29 07:15:32 +08:00
// Tries to recruit a blob worker on the candidateWorker process
ACTOR Future<Void> initializeBlobWorker(BlobManagerData* self, RecruitBlobWorkerReply candidateWorker) {
const NetworkAddress& netAddr = candidateWorker.worker.stableAddress();
AddressExclusion workerAddr(netAddr.ip, netAddr.port);
2021-10-22 05:39:38 +08:00
self->recruitingStream.set(self->recruitingStream.get() + 1);
// Ask the candidateWorker to initialize a BW only if the worker does not have a pending request
if (numExistingBWOnAddr(self, workerAddr) == 0 &&
self->recruitingLocalities.count(candidateWorker.worker.stableAddress()) == 0) {
state UID interfaceId = deterministicRandom()->randomUniqueID();
state InitializeBlobWorkerRequest initReq;
initReq.reqId = deterministicRandom()->randomUniqueID();
initReq.interfaceId = interfaceId;
2021-09-29 07:15:32 +08:00
// acknowledge that this worker is currently being recruited on
self->recruitingLocalities.insert(candidateWorker.worker.stableAddress());
TraceEvent("BMRecruiting")
.detail("State", "Sending request to worker")
.detail("WorkerID", candidateWorker.worker.id())
.detail("WorkerLocality", candidateWorker.worker.locality.toString())
.detail("Interf", interfaceId)
.detail("Addr", candidateWorker.worker.address());
2021-09-29 07:15:32 +08:00
// send initialization request to worker (i.e. worker.actor.cpp)
// here, the worker will construct the blob worker at which point the BW will start!
Future<ErrorOr<InitializeBlobWorkerReply>> fRecruit =
candidateWorker.worker.blobWorker.tryGetReply(initReq, TaskPriority::BlobManager);
2021-09-29 07:15:32 +08:00
// wait on the reply to the request
state ErrorOr<InitializeBlobWorkerReply> newBlobWorker = wait(fRecruit);
2021-09-29 07:15:32 +08:00
// if the initialization failed in an unexpected way, then kill the BM.
// if it failed in an expected way, add some delay before we try to recruit again
// on this worker
if (newBlobWorker.isError()) {
TraceEvent(SevWarn, "BMRecruitmentError").error(newBlobWorker.getError());
if (!newBlobWorker.isError(error_code_recruitment_failed) &&
!newBlobWorker.isError(error_code_request_maybe_delivered)) {
throw newBlobWorker.getError();
}
wait(delay(SERVER_KNOBS->STORAGE_RECRUITMENT_DELAY, TaskPriority::BlobManager));
}
2021-09-29 07:15:32 +08:00
// if the initialization succeeded, add the blob worker's interface to
// the blob manager's data and start monitoring the blob worker
if (newBlobWorker.present()) {
BlobWorkerInterface bwi = newBlobWorker.get().interf;
2021-10-22 05:39:38 +08:00
printf("Adding worker %s to BM Records\n", bwi.id().toString().c_str());
2021-10-01 23:08:00 +08:00
self->workersById[bwi.id()] = bwi;
self->workerStats[bwi.id()] = BlobWorkerStats();
self->addActor.send(monitorBlobWorker(self, bwi));
TraceEvent("BMRecruiting")
.detail("State", "Finished request")
.detail("WorkerID", candidateWorker.worker.id())
.detail("WorkerLocality", candidateWorker.worker.locality.toString())
.detail("Interf", interfaceId)
.detail("Addr", candidateWorker.worker.address());
}
2021-09-29 07:15:32 +08:00
// acknowledge that this worker is not actively being recruited on anymore.
// if the initialization did succeed, then this worker will still be excluded
// since it was added to workersById.
self->recruitingLocalities.erase(candidateWorker.worker.stableAddress());
}
// try to recruit more blob workers
2021-10-22 05:39:38 +08:00
self->recruitingStream.set(self->recruitingStream.get() - 1);
self->restartRecruiting.trigger();
return Void();
}
2021-09-29 07:15:32 +08:00
// Recruits blob workers in a loop
ACTOR Future<Void> blobWorkerRecruiter(
BlobManagerData* self,
Reference<IAsyncListener<RequestStream<RecruitBlobWorkerRequest>>> recruitBlobWorker) {
state Future<RecruitBlobWorkerReply> fCandidateWorker;
state RecruitBlobWorkerRequest lastRequest;
2021-10-22 05:39:38 +08:00
loop choose {
when(wait(self->startRecruiting.onTrigger())) { break; }
}
loop {
try {
state RecruitBlobWorkerRequest recruitReq;
2021-09-29 07:15:32 +08:00
// workers that are used by existing blob workers should be excluded
for (auto const& [bwId, bwInterf] : self->workersById) {
auto addr = bwInterf.stableAddress();
AddressExclusion addrExcl(addr.ip, addr.port);
recruitReq.excludeAddresses.emplace_back(addrExcl);
}
2021-09-29 07:15:32 +08:00
// workers that are used by blob workers that are currently being recruited should be excluded
for (auto addr : self->recruitingLocalities) {
recruitReq.excludeAddresses.emplace_back(AddressExclusion(addr.ip, addr.port));
}
2021-10-22 05:39:38 +08:00
printf("Recruiting now. Excluding: \n");
for (auto addr : recruitReq.excludeAddresses) {
printf(" - %s\n", addr.toString().c_str());
}
TraceEvent("BMRecruiting").detail("State", "Sending request to CC");
if (!fCandidateWorker.isValid() || fCandidateWorker.isReady() ||
recruitReq.excludeAddresses != lastRequest.excludeAddresses) {
lastRequest = recruitReq;
2021-09-29 07:15:32 +08:00
// send req to cluster controller to get back a candidate worker we can recruit on
fCandidateWorker =
brokenPromiseToNever(recruitBlobWorker->get().getReply(recruitReq, TaskPriority::BlobManager));
}
choose {
2021-10-22 05:39:38 +08:00
// when we get back a worker we can use, we will try to initialize a blob worker onto that
// process
when(RecruitBlobWorkerReply candidateWorker = wait(fCandidateWorker)) {
2021-10-22 05:39:38 +08:00
printf("About to initialize BW\n");
self->addActor.send(initializeBlobWorker(self, candidateWorker));
}
2021-09-29 07:15:32 +08:00
// when the CC changes, so does the request stream so we need to restart recruiting here
when(wait(recruitBlobWorker->onChange())) { fCandidateWorker = Future<RecruitBlobWorkerReply>(); }
2021-09-29 07:15:32 +08:00
// signal used to restart the loop and try to recruit the next blob worker
2021-10-13 04:36:05 +08:00
when(wait(self->restartRecruiting.onTrigger())) {}
}
wait(delay(FLOW_KNOBS->PREVENT_FAST_SPIN_DELAY, TaskPriority::BlobManager));
} catch (Error& e) {
if (e.code() != error_code_timed_out) {
throw;
}
TEST(true); // Blob worker recruitment timed out
}
}
}
2021-09-15 23:35:58 +08:00
ACTOR Future<Void> blobManager(BlobManagerInterface bmInterf,
Reference<AsyncVar<ServerDBInfo> const> dbInfo,
int64_t epoch) {
2021-08-31 02:07:25 +08:00
state BlobManagerData self(deterministicRandom()->randomUniqueID(),
openDBOnServer(dbInfo, TaskPriority::DefaultEndpoint, LockAware::True));
state Future<Void> collection = actorCollection(self.addActor.getFuture());
2021-08-31 02:07:25 +08:00
if (BM_DEBUG) {
printf("Blob manager starting...\n");
}
2021-09-15 23:35:58 +08:00
self.epoch = epoch;
// make sure the epoch hasn't gotten stale
state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(self.db);
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
try {
wait(checkManagerLock(tr, &self));
} catch (Error& e) {
if (BM_DEBUG) {
printf("Blob manager lock check got unexpected error %s. Dying...\n", e.name());
2021-09-15 23:35:58 +08:00
}
return Void();
2021-09-15 23:35:58 +08:00
}
if (BM_DEBUG) {
2021-09-15 23:35:58 +08:00
printf("Blob manager acquired lock at epoch %lld\n", epoch);
}
2021-08-31 02:07:25 +08:00
auto recruitBlobWorker = IAsyncListener<RequestStream<RecruitBlobWorkerRequest>>::create(
dbInfo, [](auto const& info) { return info.clusterInterface.recruitBlobWorker; });
2021-08-31 02:07:25 +08:00
2021-10-22 05:39:38 +08:00
self.addActor.send(blobWorkerRecruiter(&self, recruitBlobWorker));
2021-10-20 23:54:19 +08:00
// we need to acknowledge existing blob workers before recruiting any new ones
wait(ackExistingBlobWorkers(&self));
2021-10-22 05:39:38 +08:00
// self.addActor.send(blobWorkerRecruiter(&self, recruitBlobWorker));
self.addActor.send(monitorClientRanges(&self));
self.addActor.send(rangeAssigner(&self));
2021-09-23 01:46:20 +08:00
if (BUGGIFY) {
2021-10-15 07:25:34 +08:00
self.addActor.send(chaosRangeMover(&self));
2021-09-23 01:46:20 +08:00
}
2021-08-31 02:07:25 +08:00
// TODO probably other things here eventually
try {
loop choose {
when(wait(self.iAmReplaced.getFuture())) {
if (BM_DEBUG) {
printf("Blob Manager exiting because it is replaced\n");
}
break;
}
when(HaltBlobManagerRequest req = waitNext(bmInterf.haltBlobManager.getFuture())) {
req.reply.send(Void());
TraceEvent("BlobManagerHalted", bmInterf.id()).detail("ReqID", req.requesterID);
break;
}
when(wait(collection)) {
TraceEvent("BlobManagerActorCollectionError");
ASSERT(false);
throw internal_error();
}
2021-08-31 02:07:25 +08:00
}
} catch (Error& err) {
TraceEvent("BlobManagerDied", bmInterf.id()).error(err, true);
2021-08-31 02:07:25 +08:00
}
return Void();
2021-08-31 02:07:25 +08:00
}
// Test:
// start empty
// DB has [A - B). That should show up in knownBlobRanges and should be in added
// DB has nothing. knownBlobRanges should be empty and [A - B) should be in removed
// DB has [A - B) and [C - D). They should both show up in knownBlobRanges and added.
// DB has [A - D). It should show up coalesced in knownBlobRanges, and [B - C) should be in added.
// DB has [A - C). It should show up coalesced in knownBlobRanges, and [C - D) should be in removed.
// DB has [B - C). It should show up coalesced in knownBlobRanges, and [A - B) should be removed.
// DB has [B - D). It should show up coalesced in knownBlobRanges, and [C - D) should be removed.
// DB has [A - D). It should show up coalesced in knownBlobRanges, and [A - B) should be removed.
// DB has [A - B) and [C - D). They should show up in knownBlobRanges, and [B - C) should be in removed.
// DB has [B - C). It should show up in knownBlobRanges, [B - C) should be in added, and [A - B) and [C - D)
// should be in removed.
// Unit test for updateClientBlobRanges: walks through the scenarios listed in the
// comment above, checking the added/removed deltas and the resulting coalesced
// contents of knownBlobRanges after each database-state change.
TEST_CASE("/blobmanager/updateranges") {
	KeyRangeMap<bool> knownBlobRanges(false, normalKeys.end);
	Arena ar;

	VectorRef<KeyRangeRef> added;
	VectorRef<KeyRangeRef> removed;

	StringRef active = LiteralStringRef("1");
	StringRef inactive = StringRef();

	RangeResult dbDataEmpty;
	std::vector<std::pair<KeyRangeRef, bool>> kbrRanges;

	StringRef keyA = StringRef(ar, LiteralStringRef("A"));
	StringRef keyB = StringRef(ar, LiteralStringRef("B"));
	StringRef keyC = StringRef(ar, LiteralStringRef("C"));
	StringRef keyD = StringRef(ar, LiteralStringRef("D"));

	// db data setup
	RangeResult dbDataAB;
	dbDataAB.emplace_back(ar, keyA, active);
	dbDataAB.emplace_back(ar, keyB, inactive);

	RangeResult dbDataAC;
	dbDataAC.emplace_back(ar, keyA, active);
	dbDataAC.emplace_back(ar, keyC, inactive);

	RangeResult dbDataAD;
	dbDataAD.emplace_back(ar, keyA, active);
	dbDataAD.emplace_back(ar, keyD, inactive);

	RangeResult dbDataBC;
	dbDataBC.emplace_back(ar, keyB, active);
	dbDataBC.emplace_back(ar, keyC, inactive);

	RangeResult dbDataBD;
	dbDataBD.emplace_back(ar, keyB, active);
	dbDataBD.emplace_back(ar, keyD, inactive);

	RangeResult dbDataCD;
	dbDataCD.emplace_back(ar, keyC, active);
	dbDataCD.emplace_back(ar, keyD, inactive);

	RangeResult dbDataAB_CD;
	dbDataAB_CD.emplace_back(ar, keyA, active);
	dbDataAB_CD.emplace_back(ar, keyB, inactive);
	dbDataAB_CD.emplace_back(ar, keyC, active);
	dbDataAB_CD.emplace_back(ar, keyD, inactive);

	// key ranges setup
	KeyRangeRef rangeAB = KeyRangeRef(keyA, keyB);
	KeyRangeRef rangeAC = KeyRangeRef(keyA, keyC);
	KeyRangeRef rangeAD = KeyRangeRef(keyA, keyD);

	KeyRangeRef rangeBC = KeyRangeRef(keyB, keyC);
	KeyRangeRef rangeBD = KeyRangeRef(keyB, keyD);

	KeyRangeRef rangeCD = KeyRangeRef(keyC, keyD);

	KeyRangeRef rangeStartToA = KeyRangeRef(normalKeys.begin, keyA);
	KeyRangeRef rangeStartToB = KeyRangeRef(normalKeys.begin, keyB);
	KeyRangeRef rangeStartToC = KeyRangeRef(normalKeys.begin, keyC);
	KeyRangeRef rangeBToEnd = KeyRangeRef(keyB, normalKeys.end);
	KeyRangeRef rangeCToEnd = KeyRangeRef(keyC, normalKeys.end);
	KeyRangeRef rangeDToEnd = KeyRangeRef(keyD, normalKeys.end);

	// actual test

	getRanges(kbrRanges, knownBlobRanges);
	ASSERT(kbrRanges.size() == 1);
	ASSERT(kbrRanges[0].first == normalKeys);
	ASSERT(!kbrRanges[0].second);

	// DB has [A - B)
	kbrRanges.clear();
	added.clear();
	removed.clear();
	updateClientBlobRanges(&knownBlobRanges, dbDataAB, ar, &added, &removed);

	ASSERT(added.size() == 1);
	ASSERT(added[0] == rangeAB);

	ASSERT(removed.size() == 0);

	getRanges(kbrRanges, knownBlobRanges);
	ASSERT(kbrRanges.size() == 3);
	ASSERT(kbrRanges[0].first == rangeStartToA);
	ASSERT(!kbrRanges[0].second);
	ASSERT(kbrRanges[1].first == rangeAB);
	ASSERT(kbrRanges[1].second);
	ASSERT(kbrRanges[2].first == rangeBToEnd);
	ASSERT(!kbrRanges[2].second);

	// DB has nothing
	kbrRanges.clear();
	added.clear();
	removed.clear();
	updateClientBlobRanges(&knownBlobRanges, dbDataEmpty, ar, &added, &removed);

	ASSERT(added.size() == 0);

	ASSERT(removed.size() == 1);
	ASSERT(removed[0] == rangeAB);

	getRanges(kbrRanges, knownBlobRanges);
	ASSERT(kbrRanges[0].first == normalKeys);
	ASSERT(!kbrRanges[0].second);

	// DB has [A - B) and [C - D)
	kbrRanges.clear();
	added.clear();
	removed.clear();
	updateClientBlobRanges(&knownBlobRanges, dbDataAB_CD, ar, &added, &removed);

	ASSERT(added.size() == 2);
	ASSERT(added[0] == rangeAB);
	ASSERT(added[1] == rangeCD);

	ASSERT(removed.size() == 0);

	getRanges(kbrRanges, knownBlobRanges);
	ASSERT(kbrRanges.size() == 5);
	ASSERT(kbrRanges[0].first == rangeStartToA);
	ASSERT(!kbrRanges[0].second);
	ASSERT(kbrRanges[1].first == rangeAB);
	ASSERT(kbrRanges[1].second);
	ASSERT(kbrRanges[2].first == rangeBC);
	ASSERT(!kbrRanges[2].second);
	ASSERT(kbrRanges[3].first == rangeCD);
	ASSERT(kbrRanges[3].second);
	ASSERT(kbrRanges[4].first == rangeDToEnd);
	ASSERT(!kbrRanges[4].second);

	// DB has [A - D)
	kbrRanges.clear();
	added.clear();
	removed.clear();
	updateClientBlobRanges(&knownBlobRanges, dbDataAD, ar, &added, &removed);

	ASSERT(added.size() == 1);
	ASSERT(added[0] == rangeBC);

	ASSERT(removed.size() == 0);

	getRanges(kbrRanges, knownBlobRanges);
	ASSERT(kbrRanges.size() == 3);
	ASSERT(kbrRanges[0].first == rangeStartToA);
	ASSERT(!kbrRanges[0].second);
	ASSERT(kbrRanges[1].first == rangeAD);
	ASSERT(kbrRanges[1].second);
	ASSERT(kbrRanges[2].first == rangeDToEnd);
	ASSERT(!kbrRanges[2].second);

	// DB has [A - C)
	kbrRanges.clear();
	added.clear();
	removed.clear();
	updateClientBlobRanges(&knownBlobRanges, dbDataAC, ar, &added, &removed);

	ASSERT(added.size() == 0);

	ASSERT(removed.size() == 1);
	ASSERT(removed[0] == rangeCD);

	getRanges(kbrRanges, knownBlobRanges);
	ASSERT(kbrRanges.size() == 3);
	ASSERT(kbrRanges[0].first == rangeStartToA);
	ASSERT(!kbrRanges[0].second);
	ASSERT(kbrRanges[1].first == rangeAC);
	ASSERT(kbrRanges[1].second);
	ASSERT(kbrRanges[2].first == rangeCToEnd);
	ASSERT(!kbrRanges[2].second);

	// DB has [B - C)
	kbrRanges.clear();
	added.clear();
	removed.clear();
	updateClientBlobRanges(&knownBlobRanges, dbDataBC, ar, &added, &removed);

	ASSERT(added.size() == 0);

	ASSERT(removed.size() == 1);
	ASSERT(removed[0] == rangeAB);

	getRanges(kbrRanges, knownBlobRanges);
	ASSERT(kbrRanges.size() == 3);
	ASSERT(kbrRanges[0].first == rangeStartToB);
	ASSERT(!kbrRanges[0].second);
	ASSERT(kbrRanges[1].first == rangeBC);
	ASSERT(kbrRanges[1].second);
	ASSERT(kbrRanges[2].first == rangeCToEnd);
	ASSERT(!kbrRanges[2].second);

	// DB has [B - D)
	kbrRanges.clear();
	added.clear();
	removed.clear();
	updateClientBlobRanges(&knownBlobRanges, dbDataBD, ar, &added, &removed);

	ASSERT(added.size() == 1);
	ASSERT(added[0] == rangeCD);

	ASSERT(removed.size() == 0);

	getRanges(kbrRanges, knownBlobRanges);
	ASSERT(kbrRanges.size() == 3);
	ASSERT(kbrRanges[0].first == rangeStartToB);
	ASSERT(!kbrRanges[0].second);
	ASSERT(kbrRanges[1].first == rangeBD);
	ASSERT(kbrRanges[1].second);
	ASSERT(kbrRanges[2].first == rangeDToEnd);
	ASSERT(!kbrRanges[2].second);

	// DB has [A - D)
	kbrRanges.clear();
	added.clear();
	removed.clear();
	updateClientBlobRanges(&knownBlobRanges, dbDataAD, ar, &added, &removed);

	ASSERT(added.size() == 1);
	ASSERT(added[0] == rangeAB);

	ASSERT(removed.size() == 0);

	getRanges(kbrRanges, knownBlobRanges);
	ASSERT(kbrRanges.size() == 3);
	ASSERT(kbrRanges[0].first == rangeStartToA);
	ASSERT(!kbrRanges[0].second);
	ASSERT(kbrRanges[1].first == rangeAD);
	ASSERT(kbrRanges[1].second);
	ASSERT(kbrRanges[2].first == rangeDToEnd);
	ASSERT(!kbrRanges[2].second);

	// DB has [A - B) and [C - D)
	kbrRanges.clear();
	added.clear();
	removed.clear();
	updateClientBlobRanges(&knownBlobRanges, dbDataAB_CD, ar, &added, &removed);

	ASSERT(added.size() == 0);

	ASSERT(removed.size() == 1);
	ASSERT(removed[0] == rangeBC);

	getRanges(kbrRanges, knownBlobRanges);
	ASSERT(kbrRanges.size() == 5);
	ASSERT(kbrRanges[0].first == rangeStartToA);
	ASSERT(!kbrRanges[0].second);
	ASSERT(kbrRanges[1].first == rangeAB);
	ASSERT(kbrRanges[1].second);
	ASSERT(kbrRanges[2].first == rangeBC);
	ASSERT(!kbrRanges[2].second);
	ASSERT(kbrRanges[3].first == rangeCD);
	ASSERT(kbrRanges[3].second);
	ASSERT(kbrRanges[4].first == rangeDToEnd);
	ASSERT(!kbrRanges[4].second);

	// DB has [B - C)
	kbrRanges.clear();
	added.clear();
	removed.clear();
	updateClientBlobRanges(&knownBlobRanges, dbDataBC, ar, &added, &removed);

	ASSERT(added.size() == 1);
	ASSERT(added[0] == rangeBC);

	ASSERT(removed.size() == 2);
	ASSERT(removed[0] == rangeAB);
	ASSERT(removed[1] == rangeCD);

	getRanges(kbrRanges, knownBlobRanges);
	ASSERT(kbrRanges.size() == 3);
	ASSERT(kbrRanges[0].first == rangeStartToB);
	ASSERT(!kbrRanges[0].second);
	ASSERT(kbrRanges[1].first == rangeBC);
	ASSERT(kbrRanges[1].second);
	ASSERT(kbrRanges[2].first == rangeCToEnd);
	ASSERT(!kbrRanges[2].second);

	return Void();
}