Handle blob work failure

This commit is contained in:
Suraj Gupta 2021-10-01 11:08:00 -04:00
parent abdf5d434c
commit dfb9655c57
3 changed files with 272 additions and 81 deletions

View File

@ -35,6 +35,8 @@ struct BlobWorkerInterface {
RequestStream<struct AssignBlobRangeRequest> assignBlobRangeRequest; RequestStream<struct AssignBlobRangeRequest> assignBlobRangeRequest;
RequestStream<struct RevokeBlobRangeRequest> revokeBlobRangeRequest; RequestStream<struct RevokeBlobRangeRequest> revokeBlobRangeRequest;
RequestStream<struct GranuleStatusStreamRequest> granuleStatusStreamRequest; RequestStream<struct GranuleStatusStreamRequest> granuleStatusStreamRequest;
RequestStream<struct HaltBlobWorkerRequest> haltBlobWorker;
struct LocalityData locality; struct LocalityData locality;
UID myId; UID myId;
@ -57,6 +59,7 @@ struct BlobWorkerInterface {
assignBlobRangeRequest, assignBlobRangeRequest,
revokeBlobRangeRequest, revokeBlobRangeRequest,
granuleStatusStreamRequest, granuleStatusStreamRequest,
haltBlobWorker,
locality, locality,
myId); myId);
} }
@ -182,4 +185,20 @@ struct GranuleStatusStreamRequest {
} }
}; };
struct HaltBlobWorkerRequest {
constexpr static FileIdentifier file_identifier = 1985879;
UID requesterID;
ReplyPromise<Void> reply;
int64_t managerEpoch;
HaltBlobWorkerRequest() {}
explicit HaltBlobWorkerRequest(int64_t managerEpoch, UID uid) : requesterID(uid), managerEpoch(managerEpoch) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, managerEpoch, requesterID, reply);
}
};
#endif #endif

View File

@ -208,7 +208,7 @@ struct BlobManagerData {
KeyRangeMap<UID> workerAssignments; KeyRangeMap<UID> workerAssignments;
KeyRangeMap<bool> knownBlobRanges; KeyRangeMap<bool> knownBlobRanges;
Debouncer restartRecruiting; AsyncVar<Void> restartRecruiting;
std::set<NetworkAddress> recruitingLocalities; // the addrs of the workers being recruited on std::set<NetworkAddress> recruitingLocalities; // the addrs of the workers being recruited on
int64_t epoch = -1; int64_t epoch = -1;
@ -221,8 +221,7 @@ struct BlobManagerData {
PromiseStream<RangeAssignment> rangesToAssign; PromiseStream<RangeAssignment> rangesToAssign;
BlobManagerData(UID id, Database db) BlobManagerData(UID id, Database db)
: id(id), db(db), knownBlobRanges(false, normalKeys.end), : id(id), db(db), knownBlobRanges(false, normalKeys.end), restartRecruiting() {}
restartRecruiting(SERVER_KNOBS->DEBOUNCE_RECRUITING_DELAY) {}
~BlobManagerData() { printf("Destroying blob manager data for %s\n", id.toString().c_str()); } ~BlobManagerData() { printf("Destroying blob manager data for %s\n", id.toString().c_str()); }
}; };
@ -283,6 +282,10 @@ static UID pickWorkerForAssign(BlobManagerData* bmData) {
} }
// pick a random worker out of the eligible workers // pick a random worker out of the eligible workers
if (eligibleWorkers.size() == 0) {
printf("%d eligible workers\n", bmData->workerStats.size());
}
ASSERT(eligibleWorkers.size() > 0); ASSERT(eligibleWorkers.size() > 0);
int idx = deterministicRandom()->randomInt(0, eligibleWorkers.size()); int idx = deterministicRandom()->randomInt(0, eligibleWorkers.size());
if (BM_DEBUG) { if (BM_DEBUG) {
@ -298,7 +301,7 @@ ACTOR Future<Void> doRangeAssignment(BlobManagerData* bmData, RangeAssignment as
if (BM_DEBUG) { if (BM_DEBUG) {
printf("BM %s %s range [%s - %s) @ (%lld, %lld)\n", printf("BM %s %s range [%s - %s) @ (%lld, %lld)\n",
workerID.toString().c_str(), bmData->id.toString().c_str(),
assignment.isAssign ? "assigning" : "revoking", assignment.isAssign ? "assigning" : "revoking",
assignment.keyRange.begin.printable().c_str(), assignment.keyRange.begin.printable().c_str(),
assignment.keyRange.end.printable().c_str(), assignment.keyRange.end.printable().c_str(),
@ -318,6 +321,11 @@ ACTOR Future<Void> doRangeAssignment(BlobManagerData* bmData, RangeAssignment as
req.managerEpoch = bmData->epoch; req.managerEpoch = bmData->epoch;
req.managerSeqno = seqNo; req.managerSeqno = seqNo;
req.continueAssignment = assignment.assign.get().continueAssignment; req.continueAssignment = assignment.assign.get().continueAssignment;
// if that worker isn't alive anymore, add the range back into the stream
if (bmData->workersById.count(workerID) == 0) {
throw granule_assignment_conflict(); // TODO: find a better error to throw
}
AssignBlobRangeReply _rep = wait(bmData->workersById[workerID].assignBlobRangeRequest.getReply(req)); AssignBlobRangeReply _rep = wait(bmData->workersById[workerID].assignBlobRangeRequest.getReply(req));
rep = _rep; rep = _rep;
} else { } else {
@ -331,8 +339,13 @@ ACTOR Future<Void> doRangeAssignment(BlobManagerData* bmData, RangeAssignment as
req.managerSeqno = seqNo; req.managerSeqno = seqNo;
req.dispose = assignment.revoke.get().dispose; req.dispose = assignment.revoke.get().dispose;
AssignBlobRangeReply _rep = wait(bmData->workersById[workerID].revokeBlobRangeRequest.getReply(req)); // if that worker isn't alive anymore, this is a noop
rep = _rep; if (bmData->workersById.count(workerID)) {
AssignBlobRangeReply _rep = wait(bmData->workersById[workerID].revokeBlobRangeRequest.getReply(req));
rep = _rep;
} else {
return Void();
}
} }
if (!rep.epochOk) { if (!rep.epochOk) {
if (BM_DEBUG) { if (BM_DEBUG) {
@ -349,7 +362,8 @@ ACTOR Future<Void> doRangeAssignment(BlobManagerData* bmData, RangeAssignment as
if (BM_DEBUG) { if (BM_DEBUG) {
printf("BM got error assigning range [%s - %s) to worker %s, requeueing\n", printf("BM got error assigning range [%s - %s) to worker %s, requeueing\n",
assignment.keyRange.begin.printable().c_str(), assignment.keyRange.begin.printable().c_str(),
assignment.keyRange.end.printable().c_str()); assignment.keyRange.end.printable().c_str(),
workerID.toString().c_str());
} }
// re-send revoke to queue to handle range being un-assigned from that worker before the new one // re-send revoke to queue to handle range being un-assigned from that worker before the new one
RangeAssignment revokeOld; RangeAssignment revokeOld;
@ -404,7 +418,9 @@ ACTOR Future<Void> rangeAssigner(BlobManagerData* bmData) {
// Ensure range isn't currently assigned anywhere, and there is only 1 intersecting range // Ensure range isn't currently assigned anywhere, and there is only 1 intersecting range
auto currentAssignments = bmData->workerAssignments.intersectingRanges(assignment.keyRange); auto currentAssignments = bmData->workerAssignments.intersectingRanges(assignment.keyRange);
int count = 0; int count = 0;
printf("intersecting ranges in currentAssignments:\n");
for (auto& it : currentAssignments) { for (auto& it : currentAssignments) {
printf("[%s - %s]\n", it.begin().printable().c_str(), it.end().printable().c_str());
if (assignment.assign.get().continueAssignment) { if (assignment.assign.get().continueAssignment) {
ASSERT(assignment.worker.present()); ASSERT(assignment.worker.present());
ASSERT(it.value() == assignment.worker.get()); ASSERT(it.value() == assignment.worker.get());
@ -419,6 +435,11 @@ ACTOR Future<Void> rangeAssigner(BlobManagerData* bmData) {
bmData->workerAssignments.insert(assignment.keyRange, workerId); bmData->workerAssignments.insert(assignment.keyRange, workerId);
bmData->workerStats[workerId].numGranulesAssigned += 1; bmData->workerStats[workerId].numGranulesAssigned += 1;
printf("current ranges after inserting assign: \n");
for (auto it : bmData->workerAssignments.ranges()) {
printf("[%s - %s]\n", it.begin().printable().c_str(), it.end().printable().c_str());
}
// FIXME: if range is assign, have some sort of semaphore for outstanding assignments so we don't assign // FIXME: if range is assign, have some sort of semaphore for outstanding assignments so we don't assign
// a ton ranges at once and blow up FDB with reading initial snapshots. // a ton ranges at once and blow up FDB with reading initial snapshots.
bmData->addActor.send(doRangeAssignment(bmData, assignment, workerId, seqNo)); bmData->addActor.send(doRangeAssignment(bmData, assignment, workerId, seqNo));
@ -432,12 +453,20 @@ ACTOR Future<Void> rangeAssigner(BlobManagerData* bmData) {
// It is fine for multiple disjoint sub-ranges to have the same sequence number since they were part of // It is fine for multiple disjoint sub-ranges to have the same sequence number since they were part of
// the same logical change // the same logical change
bmData->workerStats[it.value()].numGranulesAssigned -= 1;
if (!assignment.worker.present() || assignment.worker.get() == it.value()) if (bmData->workerStats.count(it.value())) {
bmData->addActor.send(doRangeAssignment(bmData, assignment, it.value(), seqNo)); bmData->workerStats[it.value()].numGranulesAssigned -= 1;
}
// revoke the range for the worker that owns it, not the worker specified in the revoke
bmData->addActor.send(doRangeAssignment(bmData, assignment, it.value(), seqNo));
} }
bmData->workerAssignments.insert(assignment.keyRange, UID()); bmData->workerAssignments.insert(assignment.keyRange, UID());
printf("current ranges after inserting revoke: \n");
for (auto it : bmData->workerAssignments.ranges()) {
printf("[%s - %s]\n", it.begin().printable().c_str(), it.end().printable().c_str());
}
} }
} }
} }
@ -701,6 +730,73 @@ ACTOR Future<Void> maybeSplitRange(BlobManagerData* bmData, UID currentWorkerId,
return Void(); return Void();
} }
void reassignRanges(BlobManagerData* bmData, UID bwId) {
printf("taking back ranges for worker %s\n", bwId.toString().c_str());
// for every range owned by this blob worker, we want to
// - send a revoke request for that range to the blob worker
// - add the range back to the stream of ranges to be assigned
for (auto& it : bmData->workerAssignments.ranges()) {
if (it.cvalue() == bwId) {
// Send revoke request to worker
RangeAssignment raRevoke;
raRevoke.isAssign = false;
raRevoke.worker = bwId;
raRevoke.keyRange = it.range();
raRevoke.revoke = RangeRevokeData(false);
bmData->rangesToAssign.send(raRevoke);
// Add range back into the stream of ranges to be assigned
RangeAssignment raAssign;
raAssign.isAssign = true;
raAssign.worker = Optional<UID>();
raAssign.keyRange = it.range();
raAssign.assign = RangeAssignmentData(false); // not a continue
bmData->rangesToAssign.send(raAssign);
}
}
}
void killBlobWorker(BlobManagerData* bmData, BlobWorkerInterface bwInterf) {
UID bwId = bwInterf.id();
// Remove blob worker from stats map so that when we try to find a worker to takeover the range,
// the one we just killed isn't considered.
// Remove it from workersById also since otherwise that addr will remain excluded
// when we try to recruit new blob workers.
printf("removing bw %s from BM workerStats\n", bwId.toString().c_str());
bmData->workerStats.erase(bwId);
bmData->workersById.erase(bwId);
// for every range owned by this blob worker, we want to
// - send a revoke request for that range to the blob worker
// - add the range back to the stream of ranges to be assigned
printf("taking back ranges from bw %s\n", bwId.toString().c_str());
for (auto& it : bmData->workerAssignments.ranges()) {
if (it.cvalue() == bwId) {
// Send revoke request to worker
RangeAssignment raRevoke;
raRevoke.isAssign = false;
raRevoke.worker = bwId;
raRevoke.keyRange = it.range();
raRevoke.revoke = RangeRevokeData(false);
bmData->rangesToAssign.send(raRevoke);
// Add range back into the stream of ranges to be assigned
RangeAssignment raAssign;
raAssign.isAssign = true;
raAssign.worker = Optional<UID>();
raAssign.keyRange = it.range();
raAssign.assign = RangeAssignmentData(false); // not a continue
bmData->rangesToAssign.send(raAssign);
}
}
// Send halt to blob worker, with no expectation of hearing back
printf("sending halt to bw %s\n", bwId.toString().c_str());
bmData->addActor.send(
brokenPromiseToNever(bwInterf.haltBlobWorker.getReply(HaltBlobWorkerRequest(bmData->epoch, bmData->id))));
}
ACTOR Future<Void> monitorBlobWorkerStatus(BlobManagerData* bmData, BlobWorkerInterface bwInterf) { ACTOR Future<Void> monitorBlobWorkerStatus(BlobManagerData* bmData, BlobWorkerInterface bwInterf) {
state KeyRangeMap<std::pair<int64_t, int64_t>> lastSeenSeqno; state KeyRangeMap<std::pair<int64_t, int64_t>> lastSeenSeqno;
// outer loop handles reconstructing stream if it got a retryable error // outer loop handles reconstructing stream if it got a retryable error
@ -711,6 +807,7 @@ ACTOR Future<Void> monitorBlobWorkerStatus(BlobManagerData* bmData, BlobWorkerIn
// read from stream until worker fails (should never get explicit end_of_stream) // read from stream until worker fails (should never get explicit end_of_stream)
loop { loop {
GranuleStatusReply rep = waitNext(statusStream.getFuture()); GranuleStatusReply rep = waitNext(statusStream.getFuture());
if (BM_DEBUG) { if (BM_DEBUG) {
printf("BM %lld got status of [%s - %s) @ (%lld, %lld) from BW %s: %s\n", printf("BM %lld got status of [%s - %s) @ (%lld, %lld) from BW %s: %s\n",
bmData->epoch, bmData->epoch,
@ -723,7 +820,8 @@ ACTOR Future<Void> monitorBlobWorkerStatus(BlobManagerData* bmData, BlobWorkerIn
} }
if (rep.epoch > bmData->epoch) { if (rep.epoch > bmData->epoch) {
if (BM_DEBUG) { if (BM_DEBUG) {
printf("BM heard from BW that there is a new manager with higher epoch\n"); printf("BM heard from BW %s that there is a new manager with higher epoch\n",
bwInterf.id().toString().c_str());
} }
if (bmData->iAmReplaced.canBeSet()) { if (bmData->iAmReplaced.canBeSet()) {
bmData->iAmReplaced.send(Void()); bmData->iAmReplaced.send(Void());
@ -734,8 +832,12 @@ ACTOR Future<Void> monitorBlobWorkerStatus(BlobManagerData* bmData, BlobWorkerIn
// to split the range. // to split the range.
ASSERT(rep.doSplit); ASSERT(rep.doSplit);
// FIXME: only evaluate for split if this worker currently owns the granule in this blob manager's auto currGranuleAssignment = bmData->workerAssignments.rangeContaining(rep.granuleRange.begin);
// mapping if (!(currGranuleAssignment.begin() == rep.granuleRange.begin &&
currGranuleAssignment.end() == rep.granuleRange.end &&
currGranuleAssignment.cvalue() == bwInterf.id())) {
continue;
}
auto lastReqForGranule = lastSeenSeqno.rangeContaining(rep.granuleRange.begin); auto lastReqForGranule = lastSeenSeqno.rangeContaining(rep.granuleRange.begin);
if (rep.granuleRange.begin == lastReqForGranule.begin() && if (rep.granuleRange.begin == lastReqForGranule.begin() &&
@ -797,10 +899,7 @@ ACTOR Future<Void> monitorBlobWorker(BlobManagerData* bmData, BlobWorkerInterfac
printf("BM %lld detected BW %s is dead\n", bmData->epoch, bwInterf.id().toString().c_str()); printf("BM %lld detected BW %s is dead\n", bmData->epoch, bwInterf.id().toString().c_str());
} }
TraceEvent("BlobWorkerFailed", bmData->id).detail("BlobWorkerID", bwInterf.id()); TraceEvent("BlobWorkerFailed", bmData->id).detail("BlobWorkerID", bwInterf.id());
// get all of its ranges killBlobWorker(bmData, bwInterf);
// send revoke request to get back all its ranges
// send halt (look at rangeMover)
// send all its ranges to assignranges stream
return Void(); return Void();
} }
when(wait(monitorStatus)) { when(wait(monitorStatus)) {
@ -820,6 +919,13 @@ ACTOR Future<Void> monitorBlobWorker(BlobManagerData* bmData, BlobWorkerInterfac
TraceEvent(SevError, "BWMonitoringFailed", bmData->id).detail("BlobWorkerID", bwInterf.id()).error(e); TraceEvent(SevError, "BWMonitoringFailed", bmData->id).detail("BlobWorkerID", bwInterf.id()).error(e);
throw e; throw e;
} }
// Trigger recruitment for a new blob worker
printf("restarting recruitment in monitorblobworker\n");
bmData->restartRecruiting.trigger();
printf("about to stop monitoring %s\n", bwInterf.id().toString().c_str());
return Void();
} }
// TODO this is only for chaos testing right now!! REMOVE LATER // TODO this is only for chaos testing right now!! REMOVE LATER
@ -937,8 +1043,8 @@ ACTOR Future<Void> initializeBlobWorker(BlobManagerData* self, RecruitBlobWorker
if (newBlobWorker.present()) { if (newBlobWorker.present()) {
BlobWorkerInterface bwi = newBlobWorker.get().interf; BlobWorkerInterface bwi = newBlobWorker.get().interf;
self->workersById.insert({ bwi.id(), bwi }); self->workersById[bwi.id()] = bwi;
self->workerStats.insert({ bwi.id(), BlobWorkerStats() }); self->workerStats[bwi.id()] = BlobWorkerStats();
self->addActor.send(monitorBlobWorker(self, bwi)); self->addActor.send(monitorBlobWorker(self, bwi));
TraceEvent("BMRecruiting") TraceEvent("BMRecruiting")
@ -984,6 +1090,10 @@ ACTOR Future<Void> blobWorkerRecruiter(
} }
TraceEvent("BMRecruiting").detail("State", "Sending request to CC"); TraceEvent("BMRecruiting").detail("State", "Sending request to CC");
printf("EXCLUDING THE FOLLOWING IN REQ:\n");
for (auto addr : recruitReq.excludeAddresses) {
printf("- %s\n", addr.toString().c_str());
}
if (!fCandidateWorker.isValid() || fCandidateWorker.isReady() || if (!fCandidateWorker.isValid() || fCandidateWorker.isReady() ||
recruitReq.excludeAddresses != lastRequest.excludeAddresses) { recruitReq.excludeAddresses != lastRequest.excludeAddresses) {
@ -1003,7 +1113,7 @@ ACTOR Future<Void> blobWorkerRecruiter(
when(wait(recruitBlobWorker->onChange())) { fCandidateWorker = Future<RecruitBlobWorkerReply>(); } when(wait(recruitBlobWorker->onChange())) { fCandidateWorker = Future<RecruitBlobWorkerReply>(); }
// signal used to restart the loop and try to recruit the next blob worker // signal used to restart the loop and try to recruit the next blob worker
when(wait(self->restartRecruiting.onTrigger())) {} when(wait(self->restartRecruiting.onChange())) { printf("RESTARTED RECRUITING. BACK TO TOP\n"); }
} }
wait(delay(FLOW_KNOBS->PREVENT_FAST_SPIN_DELAY, TaskPriority::BlobManager)); wait(delay(FLOW_KNOBS->PREVENT_FAST_SPIN_DELAY, TaskPriority::BlobManager));
} catch (Error& e) { } catch (Error& e) {

View File

@ -34,6 +34,7 @@
#include "fdbserver/MutationTracking.h" #include "fdbserver/MutationTracking.h"
#include "fdbserver/WaitFailure.h" #include "fdbserver/WaitFailure.h"
#include "flow/Arena.h" #include "flow/Arena.h"
#include "flow/Error.h"
#include "flow/IRandom.h" #include "flow/IRandom.h"
#include "flow/actorcompiler.h" // has to be last include #include "flow/actorcompiler.h" // has to be last include
#include "flow/flow.h" #include "flow/flow.h"
@ -41,8 +42,6 @@
#define BW_DEBUG true #define BW_DEBUG true
#define BW_REQUEST_DEBUG false #define BW_REQUEST_DEBUG false
// FIXME: change all BlobWorkerData* to Reference<BlobWorkerData> to avoid segfaults if core loop gets error
// TODO add comments + documentation // TODO add comments + documentation
struct BlobFileIndex { struct BlobFileIndex {
Version version; Version version;
@ -76,10 +75,12 @@ struct GranuleChangeFeedInfo {
// FIXME: the circular dependencies here are getting kind of gross // FIXME: the circular dependencies here are getting kind of gross
struct GranuleMetadata; struct GranuleMetadata;
struct BlobWorkerData; struct BlobWorkerData;
ACTOR Future<GranuleChangeFeedInfo> persistAssignWorkerRange(BlobWorkerData* bwData, AssignBlobRangeRequest req); ACTOR Future<GranuleChangeFeedInfo> persistAssignWorkerRange(Reference<BlobWorkerData> bwData,
ACTOR Future<Void> blobGranuleUpdateFiles(BlobWorkerData* bwData, Reference<GranuleMetadata> metadata); AssignBlobRangeRequest req);
ACTOR Future<Void> blobGranuleUpdateFiles(Reference<BlobWorkerData> bwData, Reference<GranuleMetadata> metadata);
// for a range that may or may not be set
// for a range that is active
struct GranuleMetadata : NonCopyable, ReferenceCounted<GranuleMetadata> { struct GranuleMetadata : NonCopyable, ReferenceCounted<GranuleMetadata> {
KeyRange keyRange; KeyRange keyRange;
@ -112,12 +113,17 @@ struct GranuleMetadata : NonCopyable, ReferenceCounted<GranuleMetadata> {
AssignBlobRangeRequest originalReq; AssignBlobRangeRequest originalReq;
Future<Void> start(BlobWorkerData* bwData, AssignBlobRangeRequest req) { Future<Void> start(Reference<BlobWorkerData> bwData, AssignBlobRangeRequest req) {
originalReq = req; originalReq = req;
assignFuture = persistAssignWorkerRange(bwData, req); assignFuture = persistAssignWorkerRange(bwData, req);
fileUpdaterFuture = blobGranuleUpdateFiles(bwData, Reference<GranuleMetadata>::addRef(this)); fileUpdaterFuture = blobGranuleUpdateFiles(bwData, Reference<GranuleMetadata>::addRef(this));
// bwData->actors.add(blobGranuleUpdateFiles(bwData, Reference<GranuleMetadata>::addRef(this)));
// this could be the cause of the seg fault. since this is not being waited on,
// when start get cancelled, blobGranuleUpdateFiles won't get cancelled. so instead I added it to actors, so
// that it is explicitly cancelled. maybe this fixes it?
return success(assignFuture); return success(assignFuture);
// return Void();
} }
void resume() { void resume() {
@ -145,7 +151,6 @@ struct GranuleMetadata : NonCopyable, ReferenceCounted<GranuleMetadata> {
} }
}; };
// for a range that may or may not be set
struct GranuleRangeMetadata { struct GranuleRangeMetadata {
int64_t lastEpoch; int64_t lastEpoch;
int64_t lastSeqno; int64_t lastSeqno;
@ -154,14 +159,25 @@ struct GranuleRangeMetadata {
GranuleRangeMetadata() : lastEpoch(0), lastSeqno(0) {} GranuleRangeMetadata() : lastEpoch(0), lastSeqno(0) {}
GranuleRangeMetadata(int64_t epoch, int64_t seqno, Reference<GranuleMetadata> activeMetadata) GranuleRangeMetadata(int64_t epoch, int64_t seqno, Reference<GranuleMetadata> activeMetadata)
: lastEpoch(epoch), lastSeqno(seqno), activeMetadata(activeMetadata) {} : lastEpoch(epoch), lastSeqno(seqno), activeMetadata(activeMetadata) {}
/*
~GranuleRangeMetadata() {
if (activeMetadata.isValid()) {
activeMetadata->cancel(false);
}
}
*/
}; };
struct BlobWorkerData { struct BlobWorkerData : NonCopyable, ReferenceCounted<BlobWorkerData> {
UID id; UID id;
Database db; Database db;
AsyncVar<bool> dead;
BlobWorkerStats stats; BlobWorkerStats stats;
PromiseStream<Future<Void>> addActor;
ActorCollection actors{ false };
LocalityData locality; LocalityData locality;
int64_t currentManagerEpoch = -1; int64_t currentManagerEpoch = -1;
@ -178,7 +194,8 @@ struct BlobWorkerData {
PromiseStream<AssignBlobRangeRequest> granuleUpdateErrors; PromiseStream<AssignBlobRangeRequest> granuleUpdateErrors;
BlobWorkerData(UID id, Database db) : id(id), db(db), stats(id, SERVER_KNOBS->WORKER_LOGGING_INTERVAL) {} BlobWorkerData(UID id, Database db)
: id(id), db(db), stats(id, SERVER_KNOBS->WORKER_LOGGING_INTERVAL), actors(false), dead(false) {}
~BlobWorkerData() { printf("Destroying blob worker data for %s\n", id.toString().c_str()); } ~BlobWorkerData() { printf("Destroying blob worker data for %s\n", id.toString().c_str()); }
bool managerEpochOk(int64_t epoch) { bool managerEpochOk(int64_t epoch) {
@ -481,7 +498,7 @@ static Value getFileValue(std::string fname, int64_t offset, int64_t length) {
// the data in it may not yet be committed, and even though previous delta fiels with lower versioned data may still be // the data in it may not yet be committed, and even though previous delta fiels with lower versioned data may still be
// in flight. The synchronization happens after the s3 file is written, but before we update the FDB index of what files // in flight. The synchronization happens after the s3 file is written, but before we update the FDB index of what files
// exist. Before updating FDB, we ensure the version is committed and all previous delta files have updated FDB. // exist. Before updating FDB, we ensure the version is committed and all previous delta files have updated FDB.
ACTOR Future<BlobFileIndex> writeDeltaFile(BlobWorkerData* bwData, ACTOR Future<BlobFileIndex> writeDeltaFile(Reference<BlobWorkerData> bwData,
KeyRange keyRange, KeyRange keyRange,
int64_t epoch, int64_t epoch,
int64_t seqno, int64_t seqno,
@ -589,7 +606,7 @@ ACTOR Future<BlobFileIndex> writeDeltaFile(BlobWorkerData* bwData,
} }
} }
ACTOR Future<BlobFileIndex> writeSnapshot(BlobWorkerData* bwData, ACTOR Future<BlobFileIndex> writeSnapshot(Reference<BlobWorkerData> bwData,
KeyRange keyRange, KeyRange keyRange,
int64_t epoch, int64_t epoch,
int64_t seqno, int64_t seqno,
@ -707,7 +724,8 @@ ACTOR Future<BlobFileIndex> writeSnapshot(BlobWorkerData* bwData,
return BlobFileIndex(version, fname, 0, serialized.size()); return BlobFileIndex(version, fname, 0, serialized.size());
} }
ACTOR Future<BlobFileIndex> dumpInitialSnapshotFromFDB(BlobWorkerData* bwData, Reference<GranuleMetadata> metadata) { ACTOR Future<BlobFileIndex> dumpInitialSnapshotFromFDB(Reference<BlobWorkerData> bwData,
Reference<GranuleMetadata> metadata) {
if (BW_DEBUG) { if (BW_DEBUG) {
printf("Dumping snapshot from FDB for [%s - %s)\n", printf("Dumping snapshot from FDB for [%s - %s)\n",
metadata->keyRange.begin.printable().c_str(), metadata->keyRange.begin.printable().c_str(),
@ -755,7 +773,7 @@ ACTOR Future<BlobFileIndex> dumpInitialSnapshotFromFDB(BlobWorkerData* bwData, R
} }
// files might not be the current set of files in metadata, in the case of doing the initial snapshot of a granule. // files might not be the current set of files in metadata, in the case of doing the initial snapshot of a granule.
ACTOR Future<BlobFileIndex> compactFromBlob(BlobWorkerData* bwData, ACTOR Future<BlobFileIndex> compactFromBlob(Reference<BlobWorkerData> bwData,
Reference<GranuleMetadata> metadata, Reference<GranuleMetadata> metadata,
GranuleFiles files) { GranuleFiles files) {
wait(delay(0, TaskPriority::BlobWorkerUpdateStorage)); wait(delay(0, TaskPriority::BlobWorkerUpdateStorage));
@ -874,7 +892,7 @@ static bool filterOldMutations(const KeyRange& range,
return false; return false;
} }
ACTOR Future<Void> handleCompletedDeltaFile(BlobWorkerData* bwData, ACTOR Future<Void> handleCompletedDeltaFile(Reference<BlobWorkerData> bwData,
Reference<GranuleMetadata> metadata, Reference<GranuleMetadata> metadata,
BlobFileIndex completedDeltaFile, BlobFileIndex completedDeltaFile,
Key cfKey, Key cfKey,
@ -892,12 +910,15 @@ ACTOR Future<Void> handleCompletedDeltaFile(BlobWorkerData* bwData,
// have completed // have completed
// FIXME: also have these be async, have each pop change feed wait on the prior one, wait on them before // FIXME: also have these be async, have each pop change feed wait on the prior one, wait on them before
// re-snapshotting // re-snapshotting
printf("in handleCompletedDeltaFile for BW %s\n", bwData->id.toString().c_str());
Future<Void> popFuture = bwData->db->popChangeFeedMutations(cfKey, completedDeltaFile.version); Future<Void> popFuture = bwData->db->popChangeFeedMutations(cfKey, completedDeltaFile.version);
wait(popFuture); wait(popFuture);
printf("popChangeFeedMutations returned\n");
} }
while (!rollbacksInProgress.empty() && completedDeltaFile.version >= rollbacksInProgress.front().first) { while (!rollbacksInProgress.empty() && completedDeltaFile.version >= rollbacksInProgress.front().first) {
rollbacksInProgress.pop_front(); rollbacksInProgress.pop_front();
} }
printf("removed rollbacks\n");
return Void(); return Void();
} }
@ -1026,7 +1047,7 @@ static Version doGranuleRollback(Reference<GranuleMetadata> metadata,
// updater for a single granule // updater for a single granule
// TODO: this is getting kind of large. Should try to split out this actor if it continues to grow? // TODO: this is getting kind of large. Should try to split out this actor if it continues to grow?
// FIXME: handle errors here (forward errors) // FIXME: handle errors here (forward errors)
ACTOR Future<Void> blobGranuleUpdateFiles(BlobWorkerData* bwData, Reference<GranuleMetadata> metadata) { ACTOR Future<Void> blobGranuleUpdateFiles(Reference<BlobWorkerData> bwData, Reference<GranuleMetadata> metadata) {
state PromiseStream<Standalone<VectorRef<MutationsAndVersionRef>>> oldChangeFeedStream; state PromiseStream<Standalone<VectorRef<MutationsAndVersionRef>>> oldChangeFeedStream;
state PromiseStream<Standalone<VectorRef<MutationsAndVersionRef>>> changeFeedStream; state PromiseStream<Standalone<VectorRef<MutationsAndVersionRef>>> changeFeedStream;
state Future<BlobFileIndex> inFlightBlobSnapshot; state Future<BlobFileIndex> inFlightBlobSnapshot;
@ -1146,6 +1167,7 @@ ACTOR Future<Void> blobGranuleUpdateFiles(BlobWorkerData* bwData, Reference<Gran
changeFeedInfo.granuleSplitFrom.get() /*metadata->keyRange*/); changeFeedInfo.granuleSplitFrom.get() /*metadata->keyRange*/);
} else { } else {
readOldChangeFeed = false; readOldChangeFeed = false;
printf("before getChangeFeedStream, my ID is %s\n", bwData->id.toString().c_str());
changeFeedFuture = bwData->db->getChangeFeedStream( changeFeedFuture = bwData->db->getChangeFeedStream(
changeFeedStream, cfKey, startVersion + 1, MAX_VERSION, metadata->keyRange); changeFeedStream, cfKey, startVersion + 1, MAX_VERSION, metadata->keyRange);
} }
@ -1347,12 +1369,15 @@ ACTOR Future<Void> blobGranuleUpdateFiles(BlobWorkerData* bwData, Reference<Gran
Optional<Void> result = wait(timeout(metadata->resumeSnapshot.getFuture(), 1.0)); Optional<Void> result = wait(timeout(metadata->resumeSnapshot.getFuture(), 1.0));
if (result.present()) { if (result.present()) {
break; break;
} else if (bwData->dead.get()) {
throw actor_cancelled();
} }
// FIXME: re-trigger this loop if blob manager status stream changes // FIXME: re-trigger this loop if blob manager status stream changes
if (BW_DEBUG) { if (BW_DEBUG) {
printf("Granule [%s - %s)\n, hasn't heard back from BM, re-sending status\n", printf("Granule [%s - %s)\n, hasn't heard back from BM in BW %s, re-sending status\n",
metadata->keyRange.begin.printable().c_str(), metadata->keyRange.begin.printable().c_str(),
metadata->keyRange.end.printable().c_str()); metadata->keyRange.end.printable().c_str(),
bwData->id.toString().c_str());
} }
} }
@ -1528,6 +1553,7 @@ ACTOR Future<Void> blobGranuleUpdateFiles(BlobWorkerData* bwData, Reference<Gran
} }
} catch (Error& e) { } catch (Error& e) {
printf("IN CATCH FOR blobGranuleUpdateFiles -----------------------------------\n ");
if (e.code() == error_code_operation_cancelled) { if (e.code() == error_code_operation_cancelled) {
throw; throw;
} }
@ -1642,7 +1668,7 @@ static Future<Void> waitForVersion(Reference<GranuleMetadata> metadata, Version
return waitForVersionActor(metadata, v); return waitForVersionActor(metadata, v);
} }
ACTOR Future<Void> handleBlobGranuleFileRequest(BlobWorkerData* bwData, BlobGranuleFileRequest req) { ACTOR Future<Void> handleBlobGranuleFileRequest(Reference<BlobWorkerData> bwData, BlobGranuleFileRequest req) {
try { try {
// TODO REMOVE in api V2 // TODO REMOVE in api V2
ASSERT(req.beginVersion == 0); ASSERT(req.beginVersion == 0);
@ -1713,7 +1739,10 @@ ACTOR Future<Void> handleBlobGranuleFileRequest(BlobWorkerData* bwData, BlobGran
choose { choose {
when(wait(waitForVersionFuture)) {} when(wait(waitForVersionFuture)) {}
when(wait(metadata->rollbackCount.onChange())) {} when(wait(metadata->rollbackCount.onChange())) {}
when(wait(metadata->cancelled.getFuture())) { throw wrong_shard_server(); } when(wait(metadata->cancelled.getFuture())) {
printf("metadata was cancelled\n");
throw wrong_shard_server();
}
} }
if (rollbackCount == metadata->rollbackCount.get()) { if (rollbackCount == metadata->rollbackCount.get()) {
@ -1821,7 +1850,8 @@ ACTOR Future<Void> handleBlobGranuleFileRequest(BlobWorkerData* bwData, BlobGran
return Void(); return Void();
} }
ACTOR Future<GranuleChangeFeedInfo> persistAssignWorkerRange(BlobWorkerData* bwData, AssignBlobRangeRequest req) { ACTOR Future<GranuleChangeFeedInfo> persistAssignWorkerRange(Reference<BlobWorkerData> bwData,
AssignBlobRangeRequest req) {
ASSERT(!req.continueAssignment); ASSERT(!req.continueAssignment);
state Transaction tr(bwData->db); state Transaction tr(bwData->db);
state Key lockKey = granuleLockKey(req.keyRange); state Key lockKey = granuleLockKey(req.keyRange);
@ -1971,7 +2001,7 @@ ACTOR Future<GranuleChangeFeedInfo> persistAssignWorkerRange(BlobWorkerData* bwD
} }
} }
static GranuleRangeMetadata constructActiveBlobRange(BlobWorkerData* bwData, static GranuleRangeMetadata constructActiveBlobRange(Reference<BlobWorkerData> bwData,
KeyRange keyRange, KeyRange keyRange,
int64_t epoch, int64_t epoch,
int64_t seqno) { int64_t seqno) {
@ -2013,7 +2043,7 @@ static bool newerRangeAssignment(GranuleRangeMetadata oldMetadata, int64_t epoch
// Returns future to wait on to ensure prior work of other granules is done before responding to the manager with a // Returns future to wait on to ensure prior work of other granules is done before responding to the manager with a
// successful assignment And if the change produced a new granule that needs to start doing work, returns the new // successful assignment And if the change produced a new granule that needs to start doing work, returns the new
// granule so that the caller can start() it with the appropriate starting state. // granule so that the caller can start() it with the appropriate starting state.
static std::pair<Future<Void>, Reference<GranuleMetadata>> changeBlobRange(BlobWorkerData* bwData, static std::pair<Future<Void>, Reference<GranuleMetadata>> changeBlobRange(Reference<BlobWorkerData> bwData,
KeyRange keyRange, KeyRange keyRange,
int64_t epoch, int64_t epoch,
int64_t seqno, int64_t seqno,
@ -2105,7 +2135,7 @@ static std::pair<Future<Void>, Reference<GranuleMetadata>> changeBlobRange(BlobW
return std::pair(waitForAll(futures), newMetadata.activeMetadata); return std::pair(waitForAll(futures), newMetadata.activeMetadata);
} }
static bool resumeBlobRange(BlobWorkerData* bwData, KeyRange keyRange, int64_t epoch, int64_t seqno) { static bool resumeBlobRange(Reference<BlobWorkerData> bwData, KeyRange keyRange, int64_t epoch, int64_t seqno) {
auto existingRange = bwData->granuleMetadata.rangeContaining(keyRange.begin); auto existingRange = bwData->granuleMetadata.rangeContaining(keyRange.begin);
// if range boundaries don't match, or this (epoch, seqno) is old or the granule is inactive, ignore // if range boundaries don't match, or this (epoch, seqno) is old or the granule is inactive, ignore
if (keyRange.begin != existingRange.begin() || keyRange.end != existingRange.end() || if (keyRange.begin != existingRange.begin() || keyRange.end != existingRange.end() ||
@ -2142,7 +2172,7 @@ static bool resumeBlobRange(BlobWorkerData* bwData, KeyRange keyRange, int64_t e
return true; return true;
} }
ACTOR Future<Void> registerBlobWorker(BlobWorkerData* bwData, BlobWorkerInterface interf) { ACTOR Future<Void> registerBlobWorker(Reference<BlobWorkerData> bwData, BlobWorkerInterface interf) {
state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(bwData->db); state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(bwData->db);
loop { loop {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
@ -2167,7 +2197,9 @@ ACTOR Future<Void> registerBlobWorker(BlobWorkerData* bwData, BlobWorkerInterfac
} }
} }
ACTOR Future<Void> handleRangeAssign(BlobWorkerData* bwData, AssignBlobRangeRequest req, bool isSelfReassign) { ACTOR Future<Void> handleRangeAssign(Reference<BlobWorkerData> bwData,
AssignBlobRangeRequest req,
bool isSelfReassign) {
try { try {
if (req.continueAssignment) { if (req.continueAssignment) {
resumeBlobRange(bwData, req.keyRange, req.managerEpoch, req.managerSeqno); resumeBlobRange(bwData, req.keyRange, req.managerEpoch, req.managerSeqno);
@ -2178,6 +2210,9 @@ ACTOR Future<Void> handleRangeAssign(BlobWorkerData* bwData, AssignBlobRangeRequ
wait(futureAndNewGranule.first); wait(futureAndNewGranule.first);
if (futureAndNewGranule.second.isValid()) { if (futureAndNewGranule.second.isValid()) {
printf("BW %s ABOUT TO WAIT IN HANDLERANGEASSIGN\n", bwData->id.toString().c_str());
// WAITING ON START BUT ITS NOT AN ACTOR!!!!!!! SO WHEN handlerangeassign gets operation_cancelled, it
// won't get propogated to start
wait(futureAndNewGranule.second->start(bwData, req)); wait(futureAndNewGranule.second->start(bwData, req));
} }
} }
@ -2187,6 +2222,8 @@ ACTOR Future<Void> handleRangeAssign(BlobWorkerData* bwData, AssignBlobRangeRequ
} }
return Void(); return Void();
} catch (Error& e) { } catch (Error& e) {
printf("BW %s GOT ERROR %s IN HANDLERANGEASSIGN\n", bwData->id.toString().c_str(), e.name());
state Error eState = e;
if (BW_DEBUG) { if (BW_DEBUG) {
printf("AssignRange [%s - %s) got error %s\n", printf("AssignRange [%s - %s) got error %s\n",
req.keyRange.begin.printable().c_str(), req.keyRange.begin.printable().c_str(),
@ -2194,16 +2231,23 @@ ACTOR Future<Void> handleRangeAssign(BlobWorkerData* bwData, AssignBlobRangeRequ
e.name()); e.name());
} }
/*
if (futureAndNewGranule.get().second.isValid()) {
wait(futureAndNewGranule.get().second->cancel(false));
}
*/
if (!isSelfReassign) { if (!isSelfReassign) {
if (canReplyWith(e)) { if (canReplyWith(eState)) {
req.reply.sendError(e); req.reply.sendError(eState);
} }
} }
throw;
throw eState;
} }
} }
ACTOR Future<Void> handleRangeRevoke(BlobWorkerData* bwData, RevokeBlobRangeRequest req) { ACTOR Future<Void> handleRangeRevoke(Reference<BlobWorkerData> bwData, RevokeBlobRangeRequest req) {
try { try {
wait( wait(
changeBlobRange(bwData, req.keyRange, req.managerEpoch, req.managerSeqno, false, req.dispose, false).first); changeBlobRange(bwData, req.keyRange, req.managerEpoch, req.managerSeqno, false, req.dispose, false).first);
@ -2229,7 +2273,7 @@ ACTOR Future<Void> handleRangeRevoke(BlobWorkerData* bwData, RevokeBlobRangeRequ
// uncommitted data. This means we must ensure the data is actually committed before "committing" those writes in // uncommitted data. This means we must ensure the data is actually committed before "committing" those writes in
// the blob granule. The simplest way to do this is to have the blob worker do a periodic GRV, which is guaranteed // the blob granule. The simplest way to do this is to have the blob worker do a periodic GRV, which is guaranteed
// to be an earlier committed version. // to be an earlier committed version.
ACTOR Future<Void> runCommitVersionChecks(BlobWorkerData* bwData) { ACTOR Future<Void> runCommitVersionChecks(Reference<BlobWorkerData> bwData) {
state Transaction tr(bwData->db); state Transaction tr(bwData->db);
loop { loop {
// only do grvs to get committed version if we need it to persist delta files // only do grvs to get committed version if we need it to persist delta files
@ -2262,9 +2306,12 @@ ACTOR Future<Void> runCommitVersionChecks(BlobWorkerData* bwData) {
ACTOR Future<Void> blobWorker(BlobWorkerInterface bwInterf, ACTOR Future<Void> blobWorker(BlobWorkerInterface bwInterf,
ReplyPromise<InitializeBlobWorkerReply> recruitReply, ReplyPromise<InitializeBlobWorkerReply> recruitReply,
Reference<AsyncVar<ServerDBInfo> const> dbInfo) { Reference<AsyncVar<ServerDBInfo> const> dbInfo) {
state BlobWorkerData self(bwInterf.id(), openDBOnServer(dbInfo, TaskPriority::DefaultEndpoint, LockAware::True)); state Reference<BlobWorkerData> self(
self.id = bwInterf.id(); new BlobWorkerData(bwInterf.id(), openDBOnServer(dbInfo, TaskPriority::DefaultEndpoint, LockAware::True)));
self.locality = bwInterf.locality; self->id = bwInterf.id();
self->locality = bwInterf.locality;
state Future<Void> collection = actorCollection(self->addActor.getFuture());
if (BW_DEBUG) { if (BW_DEBUG) {
printf("Initializing blob worker s3 stuff\n"); printf("Initializing blob worker s3 stuff\n");
@ -2275,19 +2322,19 @@ ACTOR Future<Void> blobWorker(BlobWorkerInterface bwInterf,
if (BW_DEBUG) { if (BW_DEBUG) {
printf("BW constructing simulated backup container\n"); printf("BW constructing simulated backup container\n");
} }
self.bstore = BackupContainerFileSystem::openContainerFS("file://fdbblob/"); self->bstore = BackupContainerFileSystem::openContainerFS("file://fdbblob/");
} else { } else {
if (BW_DEBUG) { if (BW_DEBUG) {
printf("BW constructing backup container from %s\n", SERVER_KNOBS->BG_URL.c_str()); printf("BW constructing backup container from %s\n", SERVER_KNOBS->BG_URL.c_str());
} }
self.bstore = BackupContainerFileSystem::openContainerFS(SERVER_KNOBS->BG_URL); self->bstore = BackupContainerFileSystem::openContainerFS(SERVER_KNOBS->BG_URL);
if (BW_DEBUG) { if (BW_DEBUG) {
printf("BW constructed backup container\n"); printf("BW constructed backup container\n");
} }
} }
// register the blob worker to the system keyspace // register the blob worker to the system keyspace
wait(registerBlobWorker(&self, bwInterf)); wait(registerBlobWorker(self, bwInterf));
} catch (Error& e) { } catch (Error& e) {
if (BW_DEBUG) { if (BW_DEBUG) {
printf("BW got backup container init error %s\n", e.name()); printf("BW got backup container init error %s\n", e.name());
@ -2307,11 +2354,8 @@ ACTOR Future<Void> blobWorker(BlobWorkerInterface bwInterf,
rep.interf = bwInterf; rep.interf = bwInterf;
recruitReply.send(rep); recruitReply.send(rep);
state PromiseStream<Future<Void>> addActor; self->actors.add(waitFailureServer(bwInterf.waitFailure.getFuture()));
state Future<Void> collection = actorCollection(addActor.getFuture()); self->actors.add(runCommitVersionChecks(self));
addActor.send(waitFailureServer(bwInterf.waitFailure.getFuture()));
addActor.send(runCommitVersionChecks(&self));
try { try {
loop choose { loop choose {
@ -2319,25 +2363,25 @@ ACTOR Future<Void> blobWorker(BlobWorkerInterface bwInterf,
/*printf("Got blob granule request [%s - %s)\n", /*printf("Got blob granule request [%s - %s)\n",
req.keyRange.begin.printable().c_str(), req.keyRange.begin.printable().c_str(),
req.keyRange.end.printable().c_str());*/ req.keyRange.end.printable().c_str());*/
++self.stats.readRequests; ++self->stats.readRequests;
++self.stats.activeReadRequests; ++self->stats.activeReadRequests;
addActor.send(handleBlobGranuleFileRequest(&self, req)); self->actors.add(handleBlobGranuleFileRequest(self, req));
} }
when(GranuleStatusStreamRequest req = waitNext(bwInterf.granuleStatusStreamRequest.getFuture())) { when(GranuleStatusStreamRequest req = waitNext(bwInterf.granuleStatusStreamRequest.getFuture())) {
if (self.managerEpochOk(req.managerEpoch)) { if (self->managerEpochOk(req.managerEpoch)) {
if (BW_DEBUG) { if (BW_DEBUG) {
printf("Worker %s got new granule status endpoint\n", self.id.toString().c_str()); printf("Worker %s got new granule status endpoint\n", self->id.toString().c_str());
} }
self.currentManagerStatusStream = req.reply; self->currentManagerStatusStream = req.reply;
} }
} }
when(AssignBlobRangeRequest _req = waitNext(bwInterf.assignBlobRangeRequest.getFuture())) { when(AssignBlobRangeRequest _req = waitNext(bwInterf.assignBlobRangeRequest.getFuture())) {
++self.stats.rangeAssignmentRequests; ++self->stats.rangeAssignmentRequests;
--self.stats.numRangesAssigned; --self->stats.numRangesAssigned;
state AssignBlobRangeRequest assignReq = _req; state AssignBlobRangeRequest assignReq = _req;
if (BW_DEBUG) { if (BW_DEBUG) {
printf("Worker %s assigned range [%s - %s) @ (%lld, %lld):\n continue=%s\n", printf("Worker %s assigned range [%s - %s) @ (%lld, %lld):\n continue=%s\n",
self.id.toString().c_str(), self->id.toString().c_str(),
assignReq.keyRange.begin.printable().c_str(), assignReq.keyRange.begin.printable().c_str(),
assignReq.keyRange.end.printable().c_str(), assignReq.keyRange.end.printable().c_str(),
assignReq.managerEpoch, assignReq.managerEpoch,
@ -2345,18 +2389,18 @@ ACTOR Future<Void> blobWorker(BlobWorkerInterface bwInterf,
assignReq.continueAssignment ? "T" : "F"); assignReq.continueAssignment ? "T" : "F");
} }
if (self.managerEpochOk(assignReq.managerEpoch)) { if (self->managerEpochOk(assignReq.managerEpoch)) {
addActor.send(handleRangeAssign(&self, assignReq, false)); self->actors.add(handleRangeAssign(self, assignReq, false));
} else { } else {
assignReq.reply.send(AssignBlobRangeReply(false)); assignReq.reply.send(AssignBlobRangeReply(false));
} }
} }
when(RevokeBlobRangeRequest _req = waitNext(bwInterf.revokeBlobRangeRequest.getFuture())) { when(RevokeBlobRangeRequest _req = waitNext(bwInterf.revokeBlobRangeRequest.getFuture())) {
state RevokeBlobRangeRequest revokeReq = _req; state RevokeBlobRangeRequest revokeReq = _req;
--self.stats.numRangesAssigned; --self->stats.numRangesAssigned;
if (BW_DEBUG) { if (BW_DEBUG) {
printf("Worker %s revoked range [%s - %s) @ (%lld, %lld):\n dispose=%s\n", printf("Worker %s revoked range [%s - %s) @ (%lld, %lld):\n dispose=%s\n",
self.id.toString().c_str(), self->id.toString().c_str(),
revokeReq.keyRange.begin.printable().c_str(), revokeReq.keyRange.begin.printable().c_str(),
revokeReq.keyRange.end.printable().c_str(), revokeReq.keyRange.end.printable().c_str(),
revokeReq.managerEpoch, revokeReq.managerEpoch,
@ -2364,30 +2408,48 @@ ACTOR Future<Void> blobWorker(BlobWorkerInterface bwInterf,
revokeReq.dispose ? "T" : "F"); revokeReq.dispose ? "T" : "F");
} }
if (self.managerEpochOk(revokeReq.managerEpoch)) { if (self->managerEpochOk(revokeReq.managerEpoch)) {
addActor.send(handleRangeRevoke(&self, revokeReq)); self->actors.add(handleRangeRevoke(self, revokeReq));
} else { } else {
revokeReq.reply.send(AssignBlobRangeReply(false)); revokeReq.reply.send(AssignBlobRangeReply(false));
} }
} }
when(AssignBlobRangeRequest granuleToReassign = waitNext(self.granuleUpdateErrors.getFuture())) { when(AssignBlobRangeRequest granuleToReassign = waitNext(self->granuleUpdateErrors.getFuture())) {
addActor.send(handleRangeAssign(&self, granuleToReassign, true)); self->actors.add(handleRangeAssign(self, granuleToReassign, true));
} }
when(HaltBlobWorkerRequest req = waitNext(bwInterf.haltBlobWorker.getFuture())) {
req.reply.send(Void());
if (self->managerEpochOk(req.managerEpoch)) {
TraceEvent("BlobWorkerHalted", bwInterf.id()).detail("ReqID", req.requesterID);
printf("BW %s was halted\n", bwInterf.id().toString().c_str());
break;
}
}
// when(wait(delay(10))) { throw granule_assignment_conflict(); }
when(wait(collection)) { when(wait(collection)) {
if (BW_DEBUG) { if (BW_DEBUG) {
printf("BW actor collection returned, exiting\n"); printf("BW actor collection returned, exiting\n");
} }
ASSERT(false); ASSERT(false);
throw internal_error(); throw granule_assignment_conflict();
} }
} }
} catch (Error& e) { } catch (Error& e) {
if (BW_DEBUG) { if (BW_DEBUG) {
printf("Blob worker got error %s, exiting\n", e.name()); printf("Blob worker got error %s, exiting\n", e.name());
} }
TraceEvent("BlobWorkerDied", self.id).error(e, true); TraceEvent("BlobWorkerDied", self->id).error(e, true);
throw e;
} }
printf("cancelling actors for BW %s\n", self->id.toString().c_str());
self->actors.clear(false);
// self->addActor.sendError(granule_assignment_conflict());
//
// self.addActor..clear(false); // tehcnically shouldn't need this since when self goes out of scope, so will
// self.actors
// at which point the cancels will be triggered?
self->dead.set(true);
return Void();
} }
// TODO add unit tests for assign/revoke range, especially version ordering // TODO add unit tests for assign/revoke range, especially version ordering