Added mechanism for blob manager to poll blob workers for their granule assignments, and used that to improve manager recovery
commit d0113a6776 (parent 25a0d857fa)
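Note: at a high level, the new recovery path works as sketched below (a condensed outline of recoverBlobManager as it appears in the BlobManager hunks; not a drop-in implementation):

// 1. Snapshot the currently-known blob workers, then let recruiting resume.
// 2. Read the in-progress split boundaries (with their epoch/seqno) from the DB.
// 3. Poll every live worker with GetGranuleAssignmentsRequest and merge each
//    reported (range, epoch, seqno) into a KeyRangeMap via addAssignment().
// 4. Read any gaps in the granule mapping from the DB, inserted with the
//    sentinel (epoch=0, seqno=1) meaning "mapped, but worker unknown".
// 5. Revoke the assignments that lost an epoch/seqno conflict, then re-insert
//    every surviving range, explicitly assigning ranges with no known owner.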
@@ -34,6 +34,7 @@ struct BlobWorkerInterface {
 	RequestStream<struct BlobGranuleFileRequest> blobGranuleFileRequest;
 	RequestStream<struct AssignBlobRangeRequest> assignBlobRangeRequest;
 	RequestStream<struct RevokeBlobRangeRequest> revokeBlobRangeRequest;
+	RequestStream<struct GetGranuleAssignmentsRequest> granuleAssignmentsRequest;
 	RequestStream<struct GranuleStatusStreamRequest> granuleStatusStreamRequest;
 	RequestStream<struct HaltBlobWorkerRequest> haltBlobWorker;

@@ -58,6 +59,7 @@ struct BlobWorkerInterface {
 		           blobGranuleFileRequest,
 		           assignBlobRangeRequest,
 		           revokeBlobRangeRequest,
+		           granuleAssignmentsRequest,
 		           granuleStatusStreamRequest,
 		           haltBlobWorker,
 		           locality,

@@ -116,6 +118,7 @@ struct RevokeBlobRangeRequest {
  * Reassign: when a new blob manager takes over, it sends Reassign requests to workers to redistribute granules
  * Normal: Neither continue nor reassign
  */
+// TODO REMOVE reassign now!
 enum AssignRequestType { Normal = 0, Continue = 1, Reassign = 2 };

 struct AssignBlobRangeRequest {

@@ -213,4 +216,42 @@ struct HaltBlobWorkerRequest {
 	}
 };

+struct GranuleAssignmentRef {
+	KeyRangeRef range;
+	int64_t epochAssigned;
+	int64_t seqnoAssigned;
+
+	GranuleAssignmentRef() {}
+
+	explicit GranuleAssignmentRef(KeyRangeRef range, int64_t epochAssigned, int64_t seqnoAssigned)
+	  : range(range), epochAssigned(epochAssigned), seqnoAssigned(seqnoAssigned) {}
+
+	template <class Ar>
+	void serialize(Ar& ar) {
+		serializer(ar, range, epochAssigned, seqnoAssigned);
+	}
+};
+
+struct GetGranuleAssignmentsReply {
+	constexpr static FileIdentifier file_identifier = 9191718;
+	Arena arena;
+	VectorRef<GranuleAssignmentRef> assignments;
+
+	template <class Ar>
+	void serialize(Ar& ar) {
+		serializer(ar, assignments, arena);
+	}
+};
+
+struct GetGranuleAssignmentsRequest {
+	constexpr static FileIdentifier file_identifier = 4121494;
+	int64_t managerEpoch;
+	ReplyPromise<GetGranuleAssignmentsReply> reply;
+
+	template <class Ar>
+	void serialize(Ar& ar) {
+		serializer(ar, managerEpoch, reply);
+	}
+};
+
 #endif
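Note: a minimal manager-side sketch of the poll these types enable (using Flow's timeout() and brokenPromiseToNever() helpers exactly as the BlobManager hunks below do; workerInterf stands in for a BlobWorkerInterface):

GetGranuleAssignmentsRequest req;
req.managerEpoch = bmData->epoch; // a stale epoch is rejected with blob_manager_replaced
Future<Optional<GetGranuleAssignmentsReply>> reply =
    timeout(brokenPromiseToNever(workerInterf.granuleAssignmentsRequest.getReply(req)),
            SERVER_KNOBS->BLOB_WORKER_TIMEOUT);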
@@ -1326,19 +1326,79 @@ ACTOR Future<Void> checkBlobWorkerList(Reference<BlobManagerData> bmData, Promis
 	}
 }

+// Shared code for handling KeyRangeMap<tuple(UID, epoch, seqno)> that is used several places in blob manager recovery
+// when there can be conflicting sources of what assignments exist or which worker owns a granule.
+// Resolves these conflicts by comparing the epoch + seqno for the range
+// Special epoch/seqnos:
+//   (0,0): range is not mapped
+//   (0,1): range is mapped, but worker is unknown
+static void addAssignment(KeyRangeMap<std::tuple<UID, int64_t, int64_t>>& map,
+                          const KeyRangeRef& newRange,
+                          UID newId,
+                          int64_t newEpoch,
+                          int64_t newSeqno,
+                          std::vector<std::pair<UID, KeyRange>>* outOfDate = nullptr) {
+	std::vector<std::pair<KeyRange, std::tuple<UID, int64_t, int64_t>>> newer;
+	auto intersecting = map.intersectingRanges(newRange);
+	bool allNewer = true;
+	for (auto& old : intersecting) {
+		UID oldWorker = std::get<0>(old.value());
+		int64_t oldEpoch = std::get<1>(old.value());
+		int64_t oldSeqno = std::get<2>(old.value());
+		if (oldEpoch > newEpoch || (oldEpoch == newEpoch && oldSeqno > newSeqno)) {
+			newer.push_back(std::pair(old.range(), std::tuple(oldWorker, oldEpoch, oldSeqno)));
+		} else {
+			allNewer = false;
+			if (newId != UID()) {
+				// different workers can't have same epoch and seqno for granule assignment
+				ASSERT(oldEpoch != newEpoch || oldSeqno != newSeqno);
+			}
+			if (outOfDate != nullptr && oldEpoch > 0) {
+				outOfDate->push_back(std::pair(oldWorker, old.range()));
+			}
+		}
+	}
+
+	if (!allNewer) {
+		// if this range supersedes an old range, insert it over that
+		map.insert(newRange, std::tuple(newId, newEpoch, newSeqno));
+
+		// then, if there were any ranges superseded by this one, insert them over this one
+		if (newer.size()) {
+			if (outOfDate != nullptr) {
+				outOfDate->push_back(std::pair(newId, newRange));
+			}
+			for (auto& it : newer) {
+				map.insert(it.first, it.second);
+			}
+		}
+	} else {
+		if (outOfDate != nullptr) {
+			outOfDate->push_back(std::pair(newId, newRange));
+		}
+	}
+}
+
 ACTOR Future<Void> recoverBlobManager(Reference<BlobManagerData> bmData) {
 	state Promise<Void> workerListReady;
 	bmData->addActor.send(checkBlobWorkerList(bmData, workerListReady));
 	wait(workerListReady.getFuture());

+	state std::vector<BlobWorkerInterface> startingWorkers;
+	for (auto& it : bmData->workersById) {
+		startingWorkers.push_back(it.second);
+	}
+
 	// Once we acknowledge the existing blob workers, we can go ahead and recruit new ones
 	bmData->startRecruiting.trigger();

-	// skip them rest of the algorithm for the first blob manager
+	// skip the rest of the algorithm for the first blob manager
 	if (bmData->epoch == 1) {
 		return Void();
 	}

+	wait(delay(0));
+
 	// At this point, bmData->workersById is a list of all alive blob workers, but could also include some dead BWs.
 	// The algorithm below works as follows:
 	// 1. We get the ongoing split boundaries to construct the set of granules we should have. For these splits, we
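Note: a hypothetical sequence showing how addAssignment resolves a conflict (workerA/workerB and the key ranges are invented for illustration):

KeyRangeMap<std::tuple<UID, int64_t, int64_t>> map;
map.insert(normalKeys, std::tuple(UID(), 0, 0)); // (0,0): nothing mapped yet
std::vector<std::pair<UID, KeyRange>> outOfDate;
UID workerA = deterministicRandom()->randomUniqueID();
UID workerB = deterministicRandom()->randomUniqueID();

// Worker A reports ["a","c") assigned at epoch 2, seqno 5
addAssignment(map, KeyRangeRef(LiteralStringRef("a"), LiteralStringRef("c")), workerA, 2, 5, &outOfDate);
// Worker B reports ["b","c") at epoch 3, seqno 1: the higher epoch wins, so
// ["b","c") now maps to B, and (A, ["b","c")) lands in outOfDate to be revoked later
addAssignment(map, KeyRangeRef(LiteralStringRef("b"), LiteralStringRef("c")), workerB, 3, 1, &outOfDate);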
@@ -1347,26 +1407,21 @@ ACTOR Future<Void> recoverBlobManager(Reference<BlobManagerData> bmData) {
 	// intended to give a split range to dies before the new BM recovers, then we'll simply assign the range to
 	// the next best worker.
 	//
-	// 2. We get the existing granule mappings that were persisted by blob workers who were assigned ranges and
-	// add them to bmData->granuleAssignments, which is a key range map.
-	// Details: re-assignments might have happened between the time the mapping was last updated and now.
-	// For example, suppose a blob manager sends requests to the range assigner stream to move a granule G.
-	// However, before sending those requests off to the workers, the BM dies. So the persisting mapping
-	// still has G->oldWorker. The following algorithm will re-assign G to oldWorker (as long as it is also still
-	// alive). Note that this is fine because it simply means that the range was not moved optimally, but it is
-	// still owned. In the above case, even if the revoke goes through, since we don't update the mapping during
-	// revokes, this is the same as the case above. Another case to consider is when a blob worker dies when the
-	// BM is recovering. Now the mapping at this time looks like G->deadBW. But the rangeAssigner handles this:
-	// we'll try to assign a range to a dead worker and fail and reassign it to the next best worker. It will also
-	// handle the case where the mapping does not reflect the desired set of granules based on the ongoing splits, and
-	// correct it.
+	// 2. We get the existing granule mappings. We do this by asking all active blob workers for their current granule
+	// assignments. This guarantees a consistent snapshot of the state of that worker's assignments: Any request it
+	// received and processed from the old manager before the granule assignment request will be included in the
+	// assignments, and any request it receives from the old manager afterwards will be rejected with
+	// blob_manager_replaced. We will then read any gaps in the mapping from the database. We will reconcile the set
+	// of ongoing splits to this mapping, and any ranges that are not already assigned to existing blob workers will
+	// be reassigned.
 	//
 	// 3. For every range in our granuleAssignments, we send an assign request to the stream of requests,
 	// ultimately giving every range back to some worker (trying to mimic the state of the old BM).
 	// If the worker already had the range, this is a no-op. If the worker didn't have it, it will
 	// begin persisting it. The worker that had the same range before will now be at a lower seqno.

-	state KeyRangeMap<Optional<UID>> workerAssignments;
+	state KeyRangeMap<std::tuple<UID, int64_t, int64_t>> workerAssignments;
+	workerAssignments.insert(normalKeys, std::tuple(UID(), 0, 0));
 	state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(bmData->db);

 	// TODO KNOB
@@ -1395,7 +1450,8 @@ ACTOR Future<Void> recoverBlobManager(Reference<BlobManagerData> bmData) {
 	// time, and we don't know which parts of those are reflected in the current set of worker assignments we read, we
 	// have to construct the current desired set of granules from the set of ongoing splits and merges. Then, if any of
 	// those are not represented in the worker mapping, we must add them.
-	state KeyRangeMap<std::pair<int64_t, int64_t>> inProgressSplits;
+	state KeyRangeMap<std::tuple<UID, int64_t, int64_t>> inProgressSplits;
+	inProgressSplits.insert(normalKeys, std::tuple(UID(), 0, 0));

 	tr->reset();
 	tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
@@ -1483,25 +1539,7 @@ ACTOR Future<Void> recoverBlobManager(Reference<BlobManagerData> bmData) {
 			if (BM_DEBUG) {
 				fmt::print(" [{0} - {1})\n", range.begin.printable(), range.end.printable());
 			}
-
-			// same algorithm as worker map. If we read boundary changes from the log out of order, save the newer
-			// ones, apply this one, and re-apply the other ones over this one don't concurrently modify with
-			// iterator
-			std::vector<std::pair<KeyRange, std::pair<int64_t, int64_t>>> newer;
-			newer.reserve(splitBoundaries.size() - 1);
-			auto intersecting = inProgressSplits.intersectingRanges(range);
-			for (auto& it : intersecting) {
-				if (splitEpochSeqno.first < it.value().first ||
-				    (splitEpochSeqno.first == it.value().first && splitEpochSeqno.second < it.value().second)) {
-					// range currently there is newer than this range.
-					newer.push_back(std::pair(it.range(), it.value()));
-				}
-			}
-			inProgressSplits.insert(range, splitEpochSeqno);
-
-			for (auto& it : newer) {
-				inProgressSplits.insert(it.first, it.second);
-			}
+			addAssignment(inProgressSplits, range, UID(), splitEpochSeqno.first, splitEpochSeqno.second);
 		}
 	}
 	splitBoundaries.clear();
@@ -1514,15 +1552,62 @@ ACTOR Future<Void> recoverBlobManager(Reference<BlobManagerData> bmData) {
 		nextParentID.reset();
 	}

-	if (BM_DEBUG) {
-		fmt::print("BM {0} found old assignments:\n", bmData->epoch);
-	}
-	// TODO could populate most/all of this list by just asking existing blob workers for their range sets to reduce DB
-	// read load on BM restart
+	// Step 3. Get the latest known mapping of granules to blob workers (i.e. assignments)
+	// This must happen causally AFTER reading the split boundaries, since the blob workers can clear the split
+	// boundaries for a granule as part of persisting their assignment.
+
+	// First, ask existing workers for their mapping
+	if (BM_DEBUG) {
+		fmt::print("BM {0} requesting assignments from {1} workers:\n", bmData->epoch, startingWorkers.size());
+	}
+	state std::vector<Future<Optional<GetGranuleAssignmentsReply>>> aliveAssignments;
+	aliveAssignments.reserve(startingWorkers.size());
+	for (auto& it : startingWorkers) {
+		GetGranuleAssignmentsRequest req;
+		req.managerEpoch = bmData->epoch;
+		aliveAssignments.push_back(timeout(brokenPromiseToNever(it.granuleAssignmentsRequest.getReply(req)),
+		                                   SERVER_KNOBS->BLOB_WORKER_TIMEOUT));
+	}
+	waitForAll(aliveAssignments);
+
+	state std::vector<std::pair<UID, KeyRange>> outOfDateAssignments;
+	state int successful = 0;
+	state int assignIdx = 0;
+	// FIXME: more CPU efficient to do sorted merge of assignments?
+	for (; assignIdx < aliveAssignments.size(); assignIdx++) {
+		Optional<GetGranuleAssignmentsReply> reply = wait(aliveAssignments[assignIdx]);
+		UID workerId = startingWorkers[assignIdx].id();
+		if (reply.present()) {
+			successful++;
+			for (auto& assignment : reply.get().assignments) {
+				bmData->knownBlobRanges.insert(assignment.range, true);
+				addAssignment(workerAssignments,
+				              assignment.range,
+				              workerId,
+				              assignment.epochAssigned,
+				              assignment.seqnoAssigned,
+				              &outOfDateAssignments);
+			}
+			wait(yield());
+		} else {
+			// TODO mark as failed and kill it
+		}
+	}
+
+	if (BM_DEBUG) {
+		fmt::print("BM {0} got assignments from {1}/{2} workers:\n", bmData->epoch, successful, startingWorkers.size());
+	}
+
+	if (BM_DEBUG) {
+		fmt::print("BM {0} found old assignments:\n", bmData->epoch);
+	}
+
+	// then, read any gaps in worker assignment from FDB
+	// With a small number of blob workers, if even one is missing, doing numGranules/numWorkers small range reads from
+	// FDB is probably less efficient than just reading the whole mapping anyway
+	// Plus, we don't have a consistent snapshot of the mapping ACROSS blob workers, so we need the DB to reconcile any
+	// differences (eg blob manager revoked from worker A, assigned to B, the revoke from A was processed but the assign
+	// to B wasn't, meaning in the snapshot nobody owns the granule)
 	state KeyRef beginKey = blobGranuleMappingKeys.begin;
 	loop {
 		try {
@@ -1543,7 +1628,8 @@ ACTOR Future<Void> recoverBlobManager(Reference<BlobManagerData> bmData) {
 				if (results[rangeIdx].value.size()) {
 					// note: if the old owner is dead, we handle this in rangeAssigner
 					UID existingOwner = decodeBlobGranuleMappingValue(results[rangeIdx].value);
-					workerAssignments.insert(KeyRangeRef(granuleStartKey, granuleEndKey), existingOwner);
+					addAssignment(workerAssignments, KeyRangeRef(granuleStartKey, granuleEndKey), existingOwner, 0, 1);
+
 					bmData->knownBlobRanges.insert(KeyRangeRef(granuleStartKey, granuleEndKey), true);
 					if (BM_DEBUG) {
 						fmt::print(" [{0} - {1})={2}\n",
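Note: the (0, 1) passed to addAssignment here is the "mapped, but worker unknown" sentinel defined above addAssignment: it beats the unmapped default (0, 0) but loses to anything a live worker actually reported (epoch >= 1), so a DB-derived owner never overrides a worker-reported assignment. Hypothetically:

addAssignment(map, range, workerA, 2, 5);    // reported by a live worker first
addAssignment(map, range, dbOwner, 0, 1);    // no-op for this range: (0,1) loses to (2,5)
addAssignment(map, gapRange, dbOwner, 0, 1); // gap seen only in the DB: (0,1) beats (0,0), so gapRange -> dbOwner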
@@ -1579,18 +1665,14 @@ ACTOR Future<Void> recoverBlobManager(Reference<BlobManagerData> bmData) {
 	// override and assign it to a new worker
 	auto splits = inProgressSplits.intersectingRanges(normalKeys);
 	for (auto& it : splits) {
-		if (it.value().first == 0 || it.value().second == 0) {
+		int64_t epoch = std::get<1>(it.value());
+		int64_t seqno = std::get<2>(it.value());
+		if (epoch == 0 || seqno == 0) {
 			// no in-progress splits for this range
 			continue;
 		}

-		auto r = workerAssignments.rangeContaining(it.begin());
-		// if this range is at all different from the worker mapping, the mapping is out of date
-		if (r.begin() != it.begin() || r.end() != it.end()) {
-			// the empty UID signifies that we need to find an owner (worker) for this range
-			workerAssignments.insert(it.range(), UID());
-			fmt::print(" [{0} - {1})\n", it.begin().printable().c_str(), it.end().printable().c_str());
-		}
+		addAssignment(workerAssignments, it.range(), UID(), epoch, seqno, &outOfDateAssignments);
 	}

 	// Step 4. Send assign requests for all the granules and transfer assignments
@@ -1611,30 +1693,67 @@ ACTOR Future<Void> recoverBlobManager(Reference<BlobManagerData> bmData) {
 		}
 	}

+	// revoke assignments that are old and incorrect
+	TEST(!outOfDateAssignments.empty()); // BM resolved conflicting assignments on recovery
+	for (auto& it : outOfDateAssignments) {
+		if (BM_DEBUG) {
+			fmt::print("BM {0} revoking out of date assignment [{1} - {2}): {3}:\n",
+			           bmData->epoch,
+			           it.second.begin.printable().c_str(),
+			           it.second.end.printable().c_str(),
+			           it.first.toString().c_str());
+		}
+		RangeAssignment raRevoke;
+		raRevoke.isAssign = false;
+		raRevoke.worker = it.first;
+		raRevoke.keyRange = it.second;
+		bmData->rangesToAssign.send(raRevoke);
+	}
+
 	if (BM_DEBUG) {
 		fmt::print("BM {0} final ranges:\n", bmData->epoch);
 	}

 	for (auto& range : workerAssignments.intersectingRanges(normalKeys)) {
-		if (!range.value().present()) {
+		int64_t epoch = std::get<1>(range.value());
+		int64_t seqno = std::get<2>(range.value());
+		if (epoch == 0 && seqno == 0) {
+			/*if (BM_DEBUG) {
+			    fmt::print(" [{0} - {1}) invalid\n", range.begin().printable(), range.end().printable());
+			}*/
 			continue;
 		}

+		UID workerId = std::get<0>(range.value());
+		bmData->workerAssignments.insert(range.range(), workerId);
+
 		if (BM_DEBUG) {
-			fmt::print(" [{0} - {1})\n", range.begin().printable(), range.end().printable());
+			fmt::print(" [{0} - {1}){2}\n",
+			           range.begin().printable(),
+			           range.end().printable(),
+			           workerId == UID() ? " (*)" : "");
 		}

-		bmData->workerAssignments.insert(range.range(), range.value().get());
-
-		RangeAssignment raAssign;
-		raAssign.isAssign = true;
-		raAssign.worker = range.value().get();
-		raAssign.keyRange = range.range();
-		raAssign.assign = RangeAssignmentData(AssignRequestType::Reassign);
-		bmData->rangesToAssign.send(raAssign);
+		// if worker id is already set to a known worker, range is already assigned there. If not, need to explicitly
+		// assign it
+		if (workerId == UID()) {
+			RangeAssignment raAssign;
+			raAssign.isAssign = true;
+			raAssign.worker = workerId;
+			raAssign.keyRange = range.range();
+			raAssign.assign = RangeAssignmentData(AssignRequestType::Normal);
+			bmData->rangesToAssign.send(raAssign);
+		}
 	}

+	// coalesce known blob ranges within boundaries at the very end
+	RangeResult results =
+	    wait(krmGetRanges(tr, blobRangeKeys.begin, KeyRange(normalKeys), 10000, GetRangeLimits::BYTE_LIMIT_UNLIMITED));
+	ASSERT(!results.more && results.size() < CLIENT_KNOBS->TOO_MANY);
+	for (int i = 0; i < results.size() - 1; i++) {
+		if (results[i].value.size()) {
+			bmData->knownBlobRanges.coalesce(KeyRangeRef(results[i].key, results[i + 1].key));
+		}
+	}
+
 	return Void();
@@ -2604,14 +2604,17 @@ static bool newerRangeAssignment(GranuleRangeMetadata oldMetadata, int64_t epoch
 // manager with a successful assignment And if the change produced a new granule that needs to start
 // doing work, returns the new granule so that the caller can start() it with the appropriate starting
 // state.
-ACTOR Future<bool> changeBlobRange(Reference<BlobWorkerData> bwData,
-                                   KeyRange keyRange,
-                                   int64_t epoch,
-                                   int64_t seqno,
-                                   bool active,
-                                   bool disposeOnCleanup,
-                                   bool selfReassign,
-                                   Optional<AssignRequestType> assignType = Optional<AssignRequestType>()) {
+
+// Not an actor because we need to guarantee it changes the mapping synchronously as part of the request
+static bool changeBlobRange(Reference<BlobWorkerData> bwData,
+                            KeyRange keyRange,
+                            int64_t epoch,
+                            int64_t seqno,
+                            bool active,
+                            bool disposeOnCleanup,
+                            bool selfReassign,
+                            std::vector<Future<Void>>& toWaitOut,
+                            Optional<AssignRequestType> assignType = Optional<AssignRequestType>()) {
 	// since changeBlobRange is used for assigns and revokes,
 	// we assert that assign type is specified iff this is an
 	ASSERT(active == assignType.present());
@@ -2632,23 +2635,11 @@ ACTOR Future<bool> changeBlobRange(Reference<BlobWorkerData> bwData,
 	// older, or newer. For each older range, cancel it if it is active. Insert the current range.
 	// Re-insert all newer ranges over the current range.

-	state std::vector<Future<Void>> futures;
-
-	state std::vector<std::pair<KeyRange, GranuleRangeMetadata>> newerRanges;
+	std::vector<std::pair<KeyRange, GranuleRangeMetadata>> newerRanges;

 	auto ranges = bwData->granuleMetadata.intersectingRanges(keyRange);
 	bool alreadyAssigned = false;
 	for (auto& r : ranges) {
-		// I don't think we need this?
-		/*if (!active) {
-		    if (r.value().activeMetadata.isValid() && r.value().activeMetadata->cancelled.canBeSet()) {
-		        if (BW_DEBUG) {
-		            printf("Cancelling activeMetadata\n");
-		        }
-		        bwData->stats.numRangesAssigned--;
-		        r.value().activeMetadata->cancelled.send(Void());
-		    }
-		}*/
 		bool thisAssignmentNewer = newerRangeAssignment(r.value(), epoch, seqno);
 		if (BW_DEBUG) {
 			fmt::print("thisAssignmentNewer={}\n", thisAssignmentNewer ? "true" : "false");
@@ -2684,7 +2675,7 @@ ACTOR Future<bool> changeBlobRange(Reference<BlobWorkerData> bwData,
 			}
 			// applied the same assignment twice, make idempotent
 			if (r.value().activeMetadata.isValid()) {
-				futures.push_back(success(r.value().assignFuture));
+				toWaitOut.push_back(success(r.value().assignFuture));
 			}
 			alreadyAssigned = true;
 			break;
@@ -2711,7 +2702,6 @@ ACTOR Future<bool> changeBlobRange(Reference<BlobWorkerData> bwData,
 	}

 	if (alreadyAssigned) {
-		wait(waitForAll(futures)); // already applied, nothing to do
 		return false;
 	}

@@ -2742,7 +2732,6 @@ ACTOR Future<bool> changeBlobRange(Reference<BlobWorkerData> bwData,
 		bwData->granuleMetadata.insert(it.first, it.second);
 	}

-	wait(waitForAll(futures));
 	return newerRanges.size() == 0;
 }

@@ -2809,6 +2798,8 @@ ACTOR Future<Void> registerBlobWorker(Reference<BlobWorkerData> bwData, BlobWork
 	}
 }

+// the contract of handleRangeAssign and handleRangeRevoke is that they change the mapping before doing any waiting.
+// This ensures GetGranuleAssignments returns an up-to-date set of ranges
 ACTOR Future<Void> handleRangeAssign(Reference<BlobWorkerData> bwData,
                                      AssignBlobRangeRequest req,
                                      bool isSelfReassign) {
@@ -2816,8 +2807,17 @@ ACTOR Future<Void> handleRangeAssign(Reference<BlobWorkerData> bwData,
 		if (req.type == AssignRequestType::Continue) {
 			resumeBlobRange(bwData, req.keyRange, req.managerEpoch, req.managerSeqno);
 		} else {
-			bool shouldStart = wait(changeBlobRange(
-			    bwData, req.keyRange, req.managerEpoch, req.managerSeqno, true, false, isSelfReassign, req.type));
+			std::vector<Future<Void>> toWait;
+			state bool shouldStart = changeBlobRange(bwData,
+			                                         req.keyRange,
+			                                         req.managerEpoch,
+			                                         req.managerSeqno,
+			                                         true,
+			                                         false,
+			                                         isSelfReassign,
+			                                         toWait,
+			                                         req.type);
+			wait(waitForAll(toWait));

 			if (shouldStart) {
 				bwData->stats.numRangesAssigned++;
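Note: the shape of the change in this hunk and the changeBlobRange hunks above: changeBlobRange is no longer an ACTOR, so it mutates the granule mapping synchronously and only collects the futures it previously waited on; the caller waits afterwards. Condensed:

std::vector<Future<Void>> toWait;
// synchronous: the mapping is fully updated before any suspension point, so a
// concurrent GetGranuleAssignmentsRequest observes a consistent snapshot
bool shouldStart = changeBlobRange(bwData, req.keyRange, req.managerEpoch, req.managerSeqno,
                                   true, false, isSelfReassign, toWait, req.type);
wait(waitForAll(toWait)); // deferred waiting happens only after the map change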
@@ -2863,8 +2863,9 @@ ACTOR Future<Void> handleRangeAssign(Reference<BlobWorkerData> bwData,

 ACTOR Future<Void> handleRangeRevoke(Reference<BlobWorkerData> bwData, RevokeBlobRangeRequest req) {
 	try {
-		wait(success(
-		    changeBlobRange(bwData, req.keyRange, req.managerEpoch, req.managerSeqno, false, req.dispose, false)));
+		std::vector<Future<Void>> toWait;
+		changeBlobRange(bwData, req.keyRange, req.managerEpoch, req.managerSeqno, false, req.dispose, false, toWait);
+		wait(waitForAll(toWait));
 		req.reply.send(Void());
 		return Void();
 	} catch (Error& e) {
@@ -2943,6 +2944,29 @@ ACTOR Future<Void> runGRVChecks(Reference<BlobWorkerData> bwData) {
 	}
 }

+static void handleGetGranuleAssignmentsRequest(Reference<BlobWorkerData> self,
+                                               const GetGranuleAssignmentsRequest& req) {
+	GetGranuleAssignmentsReply reply;
+	auto allRanges = self->granuleMetadata.intersectingRanges(normalKeys);
+	for (auto& it : allRanges) {
+		if (it.value().activeMetadata.isValid()) {
+			// range is active, copy into reply's arena
+			StringRef start = StringRef(reply.arena, it.begin());
+			StringRef end = StringRef(reply.arena, it.end());
+
+			reply.assignments.push_back(
+			    reply.arena, GranuleAssignmentRef(KeyRangeRef(start, end), it.value().lastEpoch, it.value().lastSeqno));
+		}
+	}
+	if (BW_DEBUG) {
+		fmt::print("Worker {0} sending {1} granule assignments back to BM {2}\n",
+		           self->id.toString(),
+		           reply.assignments.size(),
+		           req.managerEpoch);
+	}
+	req.reply.send(reply);
+}
+
 ACTOR Future<Void> blobWorker(BlobWorkerInterface bwInterf,
                               ReplyPromise<InitializeBlobWorkerReply> recruitReply,
                               Reference<AsyncVar<ServerDBInfo> const> dbInfo) {
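Note: in handleGetGranuleAssignmentsRequest above, it.begin()/it.end() point into the worker's own KeyRangeMap, so the keys are deep-copied into reply.arena before being pushed; the reply's VectorRef only borrows from that arena, which is serialized alongside the assignments. The same pattern in isolation (someRange, epoch, and seqno are placeholders):

GetGranuleAssignmentsReply reply;
StringRef start = StringRef(reply.arena, someRange.begin); // deep copy into the reply's arena
StringRef end = StringRef(reply.arena, someRange.end);
reply.assignments.push_back(reply.arena, GranuleAssignmentRef(KeyRangeRef(start, end), epoch, seqno));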
@@ -3060,6 +3084,18 @@ ACTOR Future<Void> blobWorker(BlobWorkerInterface bwInterf,
 			when(AssignBlobRangeRequest granuleToReassign = waitNext(self->granuleUpdateErrors.getFuture())) {
 				self->addActor.send(handleRangeAssign(self, granuleToReassign, true));
 			}
+			when(GetGranuleAssignmentsRequest req = waitNext(bwInterf.granuleAssignmentsRequest.getFuture())) {
+				if (self->managerEpochOk(req.managerEpoch)) {
+					if (BW_DEBUG) {
+						fmt::print("Worker {0} got granule assignments request from BM {1}\n",
+						           self->id.toString(),
+						           req.managerEpoch);
+					}
+					handleGetGranuleAssignmentsRequest(self, req);
+				} else {
+					req.reply.sendError(blob_manager_replaced());
+				}
+			}
 			when(HaltBlobWorkerRequest req = waitNext(bwInterf.haltBlobWorker.getFuture())) {
 				if (self->managerEpochOk(req.managerEpoch)) {
 					TraceEvent("BlobWorkerHalted", self->id)