diff --git a/fdbclient/BlobWorkerInterface.h b/fdbclient/BlobWorkerInterface.h index eb2be3623e..eafa8eb338 100644 --- a/fdbclient/BlobWorkerInterface.h +++ b/fdbclient/BlobWorkerInterface.h @@ -35,6 +35,8 @@ struct BlobWorkerInterface { RequestStream assignBlobRangeRequest; RequestStream revokeBlobRangeRequest; RequestStream granuleStatusStreamRequest; + RequestStream haltBlobWorker; + struct LocalityData locality; UID myId; @@ -57,6 +59,7 @@ struct BlobWorkerInterface { assignBlobRangeRequest, revokeBlobRangeRequest, granuleStatusStreamRequest, + haltBlobWorker, locality, myId); } @@ -182,4 +185,20 @@ struct GranuleStatusStreamRequest { } }; +struct HaltBlobWorkerRequest { + constexpr static FileIdentifier file_identifier = 1985879; + UID requesterID; + ReplyPromise reply; + + int64_t managerEpoch; + + HaltBlobWorkerRequest() {} + explicit HaltBlobWorkerRequest(int64_t managerEpoch, UID uid) : requesterID(uid), managerEpoch(managerEpoch) {} + + template + void serialize(Ar& ar) { + serializer(ar, managerEpoch, requesterID, reply); + } +}; + #endif diff --git a/fdbserver/BlobManager.actor.cpp b/fdbserver/BlobManager.actor.cpp index f1ad4ffc3f..4da0fde2b9 100644 --- a/fdbserver/BlobManager.actor.cpp +++ b/fdbserver/BlobManager.actor.cpp @@ -208,7 +208,7 @@ struct BlobManagerData { KeyRangeMap workerAssignments; KeyRangeMap knownBlobRanges; - Debouncer restartRecruiting; + AsyncVar restartRecruiting; std::set recruitingLocalities; // the addrs of the workers being recruited on int64_t epoch = -1; @@ -221,8 +221,7 @@ struct BlobManagerData { PromiseStream rangesToAssign; BlobManagerData(UID id, Database db) - : id(id), db(db), knownBlobRanges(false, normalKeys.end), - restartRecruiting(SERVER_KNOBS->DEBOUNCE_RECRUITING_DELAY) {} + : id(id), db(db), knownBlobRanges(false, normalKeys.end), restartRecruiting() {} ~BlobManagerData() { printf("Destroying blob manager data for %s\n", id.toString().c_str()); } }; @@ -283,6 +282,10 @@ static UID pickWorkerForAssign(BlobManagerData* bmData) { } // pick a random worker out of the eligible workers + if (eligibleWorkers.size() == 0) { + printf("%d eligible workers\n", bmData->workerStats.size()); + } + ASSERT(eligibleWorkers.size() > 0); int idx = deterministicRandom()->randomInt(0, eligibleWorkers.size()); if (BM_DEBUG) { @@ -298,7 +301,7 @@ ACTOR Future doRangeAssignment(BlobManagerData* bmData, RangeAssignment as if (BM_DEBUG) { printf("BM %s %s range [%s - %s) @ (%lld, %lld)\n", - workerID.toString().c_str(), + bmData->id.toString().c_str(), assignment.isAssign ? "assigning" : "revoking", assignment.keyRange.begin.printable().c_str(), assignment.keyRange.end.printable().c_str(), @@ -318,6 +321,11 @@ ACTOR Future doRangeAssignment(BlobManagerData* bmData, RangeAssignment as req.managerEpoch = bmData->epoch; req.managerSeqno = seqNo; req.continueAssignment = assignment.assign.get().continueAssignment; + + // if that worker isn't alive anymore, add the range back into the stream + if (bmData->workersById.count(workerID) == 0) { + throw granule_assignment_conflict(); // TODO: find a better error to throw + } AssignBlobRangeReply _rep = wait(bmData->workersById[workerID].assignBlobRangeRequest.getReply(req)); rep = _rep; } else { @@ -331,8 +339,13 @@ ACTOR Future doRangeAssignment(BlobManagerData* bmData, RangeAssignment as req.managerSeqno = seqNo; req.dispose = assignment.revoke.get().dispose; - AssignBlobRangeReply _rep = wait(bmData->workersById[workerID].revokeBlobRangeRequest.getReply(req)); - rep = _rep; + // if that worker isn't alive anymore, this is a noop + if (bmData->workersById.count(workerID)) { + AssignBlobRangeReply _rep = wait(bmData->workersById[workerID].revokeBlobRangeRequest.getReply(req)); + rep = _rep; + } else { + return Void(); + } } if (!rep.epochOk) { if (BM_DEBUG) { @@ -349,7 +362,8 @@ ACTOR Future doRangeAssignment(BlobManagerData* bmData, RangeAssignment as if (BM_DEBUG) { printf("BM got error assigning range [%s - %s) to worker %s, requeueing\n", assignment.keyRange.begin.printable().c_str(), - assignment.keyRange.end.printable().c_str()); + assignment.keyRange.end.printable().c_str(), + workerID.toString().c_str()); } // re-send revoke to queue to handle range being un-assigned from that worker before the new one RangeAssignment revokeOld; @@ -404,7 +418,9 @@ ACTOR Future rangeAssigner(BlobManagerData* bmData) { // Ensure range isn't currently assigned anywhere, and there is only 1 intersecting range auto currentAssignments = bmData->workerAssignments.intersectingRanges(assignment.keyRange); int count = 0; + printf("intersecting ranges in currentAssignments:\n"); for (auto& it : currentAssignments) { + printf("[%s - %s]\n", it.begin().printable().c_str(), it.end().printable().c_str()); if (assignment.assign.get().continueAssignment) { ASSERT(assignment.worker.present()); ASSERT(it.value() == assignment.worker.get()); @@ -419,6 +435,11 @@ ACTOR Future rangeAssigner(BlobManagerData* bmData) { bmData->workerAssignments.insert(assignment.keyRange, workerId); bmData->workerStats[workerId].numGranulesAssigned += 1; + printf("current ranges after inserting assign: \n"); + for (auto it : bmData->workerAssignments.ranges()) { + printf("[%s - %s]\n", it.begin().printable().c_str(), it.end().printable().c_str()); + } + // FIXME: if range is assign, have some sort of semaphore for outstanding assignments so we don't assign // a ton ranges at once and blow up FDB with reading initial snapshots. bmData->addActor.send(doRangeAssignment(bmData, assignment, workerId, seqNo)); @@ -432,12 +453,20 @@ ACTOR Future rangeAssigner(BlobManagerData* bmData) { // It is fine for multiple disjoint sub-ranges to have the same sequence number since they were part of // the same logical change - bmData->workerStats[it.value()].numGranulesAssigned -= 1; - if (!assignment.worker.present() || assignment.worker.get() == it.value()) - bmData->addActor.send(doRangeAssignment(bmData, assignment, it.value(), seqNo)); + + if (bmData->workerStats.count(it.value())) { + bmData->workerStats[it.value()].numGranulesAssigned -= 1; + } + + // revoke the range for the worker that owns it, not the worker specified in the revoke + bmData->addActor.send(doRangeAssignment(bmData, assignment, it.value(), seqNo)); } bmData->workerAssignments.insert(assignment.keyRange, UID()); + printf("current ranges after inserting revoke: \n"); + for (auto it : bmData->workerAssignments.ranges()) { + printf("[%s - %s]\n", it.begin().printable().c_str(), it.end().printable().c_str()); + } } } } @@ -701,6 +730,73 @@ ACTOR Future maybeSplitRange(BlobManagerData* bmData, UID currentWorkerId, return Void(); } +void reassignRanges(BlobManagerData* bmData, UID bwId) { + printf("taking back ranges for worker %s\n", bwId.toString().c_str()); + // for every range owned by this blob worker, we want to + // - send a revoke request for that range to the blob worker + // - add the range back to the stream of ranges to be assigned + for (auto& it : bmData->workerAssignments.ranges()) { + if (it.cvalue() == bwId) { + // Send revoke request to worker + RangeAssignment raRevoke; + raRevoke.isAssign = false; + raRevoke.worker = bwId; + raRevoke.keyRange = it.range(); + raRevoke.revoke = RangeRevokeData(false); + bmData->rangesToAssign.send(raRevoke); + + // Add range back into the stream of ranges to be assigned + RangeAssignment raAssign; + raAssign.isAssign = true; + raAssign.worker = Optional(); + raAssign.keyRange = it.range(); + raAssign.assign = RangeAssignmentData(false); // not a continue + bmData->rangesToAssign.send(raAssign); + } + } +} + +void killBlobWorker(BlobManagerData* bmData, BlobWorkerInterface bwInterf) { + UID bwId = bwInterf.id(); + + // Remove blob worker from stats map so that when we try to find a worker to takeover the range, + // the one we just killed isn't considered. + // Remove it from workersById also since otherwise that addr will remain excluded + // when we try to recruit new blob workers. + printf("removing bw %s from BM workerStats\n", bwId.toString().c_str()); + bmData->workerStats.erase(bwId); + bmData->workersById.erase(bwId); + + // for every range owned by this blob worker, we want to + // - send a revoke request for that range to the blob worker + // - add the range back to the stream of ranges to be assigned + printf("taking back ranges from bw %s\n", bwId.toString().c_str()); + for (auto& it : bmData->workerAssignments.ranges()) { + if (it.cvalue() == bwId) { + // Send revoke request to worker + RangeAssignment raRevoke; + raRevoke.isAssign = false; + raRevoke.worker = bwId; + raRevoke.keyRange = it.range(); + raRevoke.revoke = RangeRevokeData(false); + bmData->rangesToAssign.send(raRevoke); + + // Add range back into the stream of ranges to be assigned + RangeAssignment raAssign; + raAssign.isAssign = true; + raAssign.worker = Optional(); + raAssign.keyRange = it.range(); + raAssign.assign = RangeAssignmentData(false); // not a continue + bmData->rangesToAssign.send(raAssign); + } + } + + // Send halt to blob worker, with no expectation of hearing back + printf("sending halt to bw %s\n", bwId.toString().c_str()); + bmData->addActor.send( + brokenPromiseToNever(bwInterf.haltBlobWorker.getReply(HaltBlobWorkerRequest(bmData->epoch, bmData->id)))); +} + ACTOR Future monitorBlobWorkerStatus(BlobManagerData* bmData, BlobWorkerInterface bwInterf) { state KeyRangeMap> lastSeenSeqno; // outer loop handles reconstructing stream if it got a retryable error @@ -711,6 +807,7 @@ ACTOR Future monitorBlobWorkerStatus(BlobManagerData* bmData, BlobWorkerIn // read from stream until worker fails (should never get explicit end_of_stream) loop { GranuleStatusReply rep = waitNext(statusStream.getFuture()); + if (BM_DEBUG) { printf("BM %lld got status of [%s - %s) @ (%lld, %lld) from BW %s: %s\n", bmData->epoch, @@ -723,7 +820,8 @@ ACTOR Future monitorBlobWorkerStatus(BlobManagerData* bmData, BlobWorkerIn } if (rep.epoch > bmData->epoch) { if (BM_DEBUG) { - printf("BM heard from BW that there is a new manager with higher epoch\n"); + printf("BM heard from BW %s that there is a new manager with higher epoch\n", + bwInterf.id().toString().c_str()); } if (bmData->iAmReplaced.canBeSet()) { bmData->iAmReplaced.send(Void()); @@ -734,8 +832,12 @@ ACTOR Future monitorBlobWorkerStatus(BlobManagerData* bmData, BlobWorkerIn // to split the range. ASSERT(rep.doSplit); - // FIXME: only evaluate for split if this worker currently owns the granule in this blob manager's - // mapping + auto currGranuleAssignment = bmData->workerAssignments.rangeContaining(rep.granuleRange.begin); + if (!(currGranuleAssignment.begin() == rep.granuleRange.begin && + currGranuleAssignment.end() == rep.granuleRange.end && + currGranuleAssignment.cvalue() == bwInterf.id())) { + continue; + } auto lastReqForGranule = lastSeenSeqno.rangeContaining(rep.granuleRange.begin); if (rep.granuleRange.begin == lastReqForGranule.begin() && @@ -797,10 +899,7 @@ ACTOR Future monitorBlobWorker(BlobManagerData* bmData, BlobWorkerInterfac printf("BM %lld detected BW %s is dead\n", bmData->epoch, bwInterf.id().toString().c_str()); } TraceEvent("BlobWorkerFailed", bmData->id).detail("BlobWorkerID", bwInterf.id()); - // get all of its ranges - // send revoke request to get back all its ranges - // send halt (look at rangeMover) - // send all its ranges to assignranges stream + killBlobWorker(bmData, bwInterf); return Void(); } when(wait(monitorStatus)) { @@ -820,6 +919,13 @@ ACTOR Future monitorBlobWorker(BlobManagerData* bmData, BlobWorkerInterfac TraceEvent(SevError, "BWMonitoringFailed", bmData->id).detail("BlobWorkerID", bwInterf.id()).error(e); throw e; } + + // Trigger recruitment for a new blob worker + printf("restarting recruitment in monitorblobworker\n"); + bmData->restartRecruiting.trigger(); + + printf("about to stop monitoring %s\n", bwInterf.id().toString().c_str()); + return Void(); } // TODO this is only for chaos testing right now!! REMOVE LATER @@ -937,8 +1043,8 @@ ACTOR Future initializeBlobWorker(BlobManagerData* self, RecruitBlobWorker if (newBlobWorker.present()) { BlobWorkerInterface bwi = newBlobWorker.get().interf; - self->workersById.insert({ bwi.id(), bwi }); - self->workerStats.insert({ bwi.id(), BlobWorkerStats() }); + self->workersById[bwi.id()] = bwi; + self->workerStats[bwi.id()] = BlobWorkerStats(); self->addActor.send(monitorBlobWorker(self, bwi)); TraceEvent("BMRecruiting") @@ -984,6 +1090,10 @@ ACTOR Future blobWorkerRecruiter( } TraceEvent("BMRecruiting").detail("State", "Sending request to CC"); + printf("EXCLUDING THE FOLLOWING IN REQ:\n"); + for (auto addr : recruitReq.excludeAddresses) { + printf("- %s\n", addr.toString().c_str()); + } if (!fCandidateWorker.isValid() || fCandidateWorker.isReady() || recruitReq.excludeAddresses != lastRequest.excludeAddresses) { @@ -1003,7 +1113,7 @@ ACTOR Future blobWorkerRecruiter( when(wait(recruitBlobWorker->onChange())) { fCandidateWorker = Future(); } // signal used to restart the loop and try to recruit the next blob worker - when(wait(self->restartRecruiting.onTrigger())) {} + when(wait(self->restartRecruiting.onChange())) { printf("RESTARTED RECRUITING. BACK TO TOP\n"); } } wait(delay(FLOW_KNOBS->PREVENT_FAST_SPIN_DELAY, TaskPriority::BlobManager)); } catch (Error& e) { diff --git a/fdbserver/BlobWorker.actor.cpp b/fdbserver/BlobWorker.actor.cpp index 0782cbddab..f4f648c493 100644 --- a/fdbserver/BlobWorker.actor.cpp +++ b/fdbserver/BlobWorker.actor.cpp @@ -34,6 +34,7 @@ #include "fdbserver/MutationTracking.h" #include "fdbserver/WaitFailure.h" #include "flow/Arena.h" +#include "flow/Error.h" #include "flow/IRandom.h" #include "flow/actorcompiler.h" // has to be last include #include "flow/flow.h" @@ -41,8 +42,6 @@ #define BW_DEBUG true #define BW_REQUEST_DEBUG false -// FIXME: change all BlobWorkerData* to Reference to avoid segfaults if core loop gets error - // TODO add comments + documentation struct BlobFileIndex { Version version; @@ -76,10 +75,12 @@ struct GranuleChangeFeedInfo { // FIXME: the circular dependencies here are getting kind of gross struct GranuleMetadata; struct BlobWorkerData; -ACTOR Future persistAssignWorkerRange(BlobWorkerData* bwData, AssignBlobRangeRequest req); -ACTOR Future blobGranuleUpdateFiles(BlobWorkerData* bwData, Reference metadata); +ACTOR Future persistAssignWorkerRange(Reference bwData, + AssignBlobRangeRequest req); +ACTOR Future blobGranuleUpdateFiles(Reference bwData, Reference metadata); + +// for a range that may or may not be set -// for a range that is active struct GranuleMetadata : NonCopyable, ReferenceCounted { KeyRange keyRange; @@ -112,12 +113,17 @@ struct GranuleMetadata : NonCopyable, ReferenceCounted { AssignBlobRangeRequest originalReq; - Future start(BlobWorkerData* bwData, AssignBlobRangeRequest req) { + Future start(Reference bwData, AssignBlobRangeRequest req) { originalReq = req; assignFuture = persistAssignWorkerRange(bwData, req); fileUpdaterFuture = blobGranuleUpdateFiles(bwData, Reference::addRef(this)); + // bwData->actors.add(blobGranuleUpdateFiles(bwData, Reference::addRef(this))); + // this could be the cause of the seg fault. since this is not being waited on, + // when start get cancelled, blobGranuleUpdateFiles won't get cancelled. so instead I added it to actors, so + // that it is explicitly cancelled. maybe this fixes it? return success(assignFuture); + // return Void(); } void resume() { @@ -145,7 +151,6 @@ struct GranuleMetadata : NonCopyable, ReferenceCounted { } }; -// for a range that may or may not be set struct GranuleRangeMetadata { int64_t lastEpoch; int64_t lastSeqno; @@ -154,14 +159,25 @@ struct GranuleRangeMetadata { GranuleRangeMetadata() : lastEpoch(0), lastSeqno(0) {} GranuleRangeMetadata(int64_t epoch, int64_t seqno, Reference activeMetadata) : lastEpoch(epoch), lastSeqno(seqno), activeMetadata(activeMetadata) {} + /* + ~GranuleRangeMetadata() { + if (activeMetadata.isValid()) { + activeMetadata->cancel(false); + } + } + */ }; -struct BlobWorkerData { +struct BlobWorkerData : NonCopyable, ReferenceCounted { UID id; Database db; + AsyncVar dead; BlobWorkerStats stats; + PromiseStream> addActor; + ActorCollection actors{ false }; + LocalityData locality; int64_t currentManagerEpoch = -1; @@ -178,7 +194,8 @@ struct BlobWorkerData { PromiseStream granuleUpdateErrors; - BlobWorkerData(UID id, Database db) : id(id), db(db), stats(id, SERVER_KNOBS->WORKER_LOGGING_INTERVAL) {} + BlobWorkerData(UID id, Database db) + : id(id), db(db), stats(id, SERVER_KNOBS->WORKER_LOGGING_INTERVAL), actors(false), dead(false) {} ~BlobWorkerData() { printf("Destroying blob worker data for %s\n", id.toString().c_str()); } bool managerEpochOk(int64_t epoch) { @@ -481,7 +498,7 @@ static Value getFileValue(std::string fname, int64_t offset, int64_t length) { // the data in it may not yet be committed, and even though previous delta fiels with lower versioned data may still be // in flight. The synchronization happens after the s3 file is written, but before we update the FDB index of what files // exist. Before updating FDB, we ensure the version is committed and all previous delta files have updated FDB. -ACTOR Future writeDeltaFile(BlobWorkerData* bwData, +ACTOR Future writeDeltaFile(Reference bwData, KeyRange keyRange, int64_t epoch, int64_t seqno, @@ -589,7 +606,7 @@ ACTOR Future writeDeltaFile(BlobWorkerData* bwData, } } -ACTOR Future writeSnapshot(BlobWorkerData* bwData, +ACTOR Future writeSnapshot(Reference bwData, KeyRange keyRange, int64_t epoch, int64_t seqno, @@ -707,7 +724,8 @@ ACTOR Future writeSnapshot(BlobWorkerData* bwData, return BlobFileIndex(version, fname, 0, serialized.size()); } -ACTOR Future dumpInitialSnapshotFromFDB(BlobWorkerData* bwData, Reference metadata) { +ACTOR Future dumpInitialSnapshotFromFDB(Reference bwData, + Reference metadata) { if (BW_DEBUG) { printf("Dumping snapshot from FDB for [%s - %s)\n", metadata->keyRange.begin.printable().c_str(), @@ -755,7 +773,7 @@ ACTOR Future dumpInitialSnapshotFromFDB(BlobWorkerData* bwData, R } // files might not be the current set of files in metadata, in the case of doing the initial snapshot of a granule. -ACTOR Future compactFromBlob(BlobWorkerData* bwData, +ACTOR Future compactFromBlob(Reference bwData, Reference metadata, GranuleFiles files) { wait(delay(0, TaskPriority::BlobWorkerUpdateStorage)); @@ -874,7 +892,7 @@ static bool filterOldMutations(const KeyRange& range, return false; } -ACTOR Future handleCompletedDeltaFile(BlobWorkerData* bwData, +ACTOR Future handleCompletedDeltaFile(Reference bwData, Reference metadata, BlobFileIndex completedDeltaFile, Key cfKey, @@ -892,12 +910,15 @@ ACTOR Future handleCompletedDeltaFile(BlobWorkerData* bwData, // have completed // FIXME: also have these be async, have each pop change feed wait on the prior one, wait on them before // re-snapshotting + printf("in handleCompletedDeltaFile for BW %s\n", bwData->id.toString().c_str()); Future popFuture = bwData->db->popChangeFeedMutations(cfKey, completedDeltaFile.version); wait(popFuture); + printf("popChangeFeedMutations returned\n"); } while (!rollbacksInProgress.empty() && completedDeltaFile.version >= rollbacksInProgress.front().first) { rollbacksInProgress.pop_front(); } + printf("removed rollbacks\n"); return Void(); } @@ -1026,7 +1047,7 @@ static Version doGranuleRollback(Reference metadata, // updater for a single granule // TODO: this is getting kind of large. Should try to split out this actor if it continues to grow? // FIXME: handle errors here (forward errors) -ACTOR Future blobGranuleUpdateFiles(BlobWorkerData* bwData, Reference metadata) { +ACTOR Future blobGranuleUpdateFiles(Reference bwData, Reference metadata) { state PromiseStream>> oldChangeFeedStream; state PromiseStream>> changeFeedStream; state Future inFlightBlobSnapshot; @@ -1146,6 +1167,7 @@ ACTOR Future blobGranuleUpdateFiles(BlobWorkerData* bwData, ReferencekeyRange*/); } else { readOldChangeFeed = false; + printf("before getChangeFeedStream, my ID is %s\n", bwData->id.toString().c_str()); changeFeedFuture = bwData->db->getChangeFeedStream( changeFeedStream, cfKey, startVersion + 1, MAX_VERSION, metadata->keyRange); } @@ -1347,12 +1369,15 @@ ACTOR Future blobGranuleUpdateFiles(BlobWorkerData* bwData, Reference result = wait(timeout(metadata->resumeSnapshot.getFuture(), 1.0)); if (result.present()) { break; + } else if (bwData->dead.get()) { + throw actor_cancelled(); } // FIXME: re-trigger this loop if blob manager status stream changes if (BW_DEBUG) { - printf("Granule [%s - %s)\n, hasn't heard back from BM, re-sending status\n", + printf("Granule [%s - %s)\n, hasn't heard back from BM in BW %s, re-sending status\n", metadata->keyRange.begin.printable().c_str(), - metadata->keyRange.end.printable().c_str()); + metadata->keyRange.end.printable().c_str(), + bwData->id.toString().c_str()); } } @@ -1528,6 +1553,7 @@ ACTOR Future blobGranuleUpdateFiles(BlobWorkerData* bwData, Reference waitForVersion(Reference metadata, Version return waitForVersionActor(metadata, v); } -ACTOR Future handleBlobGranuleFileRequest(BlobWorkerData* bwData, BlobGranuleFileRequest req) { +ACTOR Future handleBlobGranuleFileRequest(Reference bwData, BlobGranuleFileRequest req) { try { // TODO REMOVE in api V2 ASSERT(req.beginVersion == 0); @@ -1713,7 +1739,10 @@ ACTOR Future handleBlobGranuleFileRequest(BlobWorkerData* bwData, BlobGran choose { when(wait(waitForVersionFuture)) {} when(wait(metadata->rollbackCount.onChange())) {} - when(wait(metadata->cancelled.getFuture())) { throw wrong_shard_server(); } + when(wait(metadata->cancelled.getFuture())) { + printf("metadata was cancelled\n"); + throw wrong_shard_server(); + } } if (rollbackCount == metadata->rollbackCount.get()) { @@ -1821,7 +1850,8 @@ ACTOR Future handleBlobGranuleFileRequest(BlobWorkerData* bwData, BlobGran return Void(); } -ACTOR Future persistAssignWorkerRange(BlobWorkerData* bwData, AssignBlobRangeRequest req) { +ACTOR Future persistAssignWorkerRange(Reference bwData, + AssignBlobRangeRequest req) { ASSERT(!req.continueAssignment); state Transaction tr(bwData->db); state Key lockKey = granuleLockKey(req.keyRange); @@ -1971,7 +2001,7 @@ ACTOR Future persistAssignWorkerRange(BlobWorkerData* bwD } } -static GranuleRangeMetadata constructActiveBlobRange(BlobWorkerData* bwData, +static GranuleRangeMetadata constructActiveBlobRange(Reference bwData, KeyRange keyRange, int64_t epoch, int64_t seqno) { @@ -2013,7 +2043,7 @@ static bool newerRangeAssignment(GranuleRangeMetadata oldMetadata, int64_t epoch // Returns future to wait on to ensure prior work of other granules is done before responding to the manager with a // successful assignment And if the change produced a new granule that needs to start doing work, returns the new // granule so that the caller can start() it with the appropriate starting state. -static std::pair, Reference> changeBlobRange(BlobWorkerData* bwData, +static std::pair, Reference> changeBlobRange(Reference bwData, KeyRange keyRange, int64_t epoch, int64_t seqno, @@ -2105,7 +2135,7 @@ static std::pair, Reference> changeBlobRange(BlobW return std::pair(waitForAll(futures), newMetadata.activeMetadata); } -static bool resumeBlobRange(BlobWorkerData* bwData, KeyRange keyRange, int64_t epoch, int64_t seqno) { +static bool resumeBlobRange(Reference bwData, KeyRange keyRange, int64_t epoch, int64_t seqno) { auto existingRange = bwData->granuleMetadata.rangeContaining(keyRange.begin); // if range boundaries don't match, or this (epoch, seqno) is old or the granule is inactive, ignore if (keyRange.begin != existingRange.begin() || keyRange.end != existingRange.end() || @@ -2142,7 +2172,7 @@ static bool resumeBlobRange(BlobWorkerData* bwData, KeyRange keyRange, int64_t e return true; } -ACTOR Future registerBlobWorker(BlobWorkerData* bwData, BlobWorkerInterface interf) { +ACTOR Future registerBlobWorker(Reference bwData, BlobWorkerInterface interf) { state Reference tr = makeReference(bwData->db); loop { tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); @@ -2167,7 +2197,9 @@ ACTOR Future registerBlobWorker(BlobWorkerData* bwData, BlobWorkerInterfac } } -ACTOR Future handleRangeAssign(BlobWorkerData* bwData, AssignBlobRangeRequest req, bool isSelfReassign) { +ACTOR Future handleRangeAssign(Reference bwData, + AssignBlobRangeRequest req, + bool isSelfReassign) { try { if (req.continueAssignment) { resumeBlobRange(bwData, req.keyRange, req.managerEpoch, req.managerSeqno); @@ -2178,6 +2210,9 @@ ACTOR Future handleRangeAssign(BlobWorkerData* bwData, AssignBlobRangeRequ wait(futureAndNewGranule.first); if (futureAndNewGranule.second.isValid()) { + printf("BW %s ABOUT TO WAIT IN HANDLERANGEASSIGN\n", bwData->id.toString().c_str()); + // WAITING ON START BUT ITS NOT AN ACTOR!!!!!!! SO WHEN handlerangeassign gets operation_cancelled, it + // won't get propogated to start wait(futureAndNewGranule.second->start(bwData, req)); } } @@ -2187,6 +2222,8 @@ ACTOR Future handleRangeAssign(BlobWorkerData* bwData, AssignBlobRangeRequ } return Void(); } catch (Error& e) { + printf("BW %s GOT ERROR %s IN HANDLERANGEASSIGN\n", bwData->id.toString().c_str(), e.name()); + state Error eState = e; if (BW_DEBUG) { printf("AssignRange [%s - %s) got error %s\n", req.keyRange.begin.printable().c_str(), @@ -2194,16 +2231,23 @@ ACTOR Future handleRangeAssign(BlobWorkerData* bwData, AssignBlobRangeRequ e.name()); } + /* + if (futureAndNewGranule.get().second.isValid()) { + wait(futureAndNewGranule.get().second->cancel(false)); + } + */ + if (!isSelfReassign) { - if (canReplyWith(e)) { - req.reply.sendError(e); + if (canReplyWith(eState)) { + req.reply.sendError(eState); } } - throw; + + throw eState; } } -ACTOR Future handleRangeRevoke(BlobWorkerData* bwData, RevokeBlobRangeRequest req) { +ACTOR Future handleRangeRevoke(Reference bwData, RevokeBlobRangeRequest req) { try { wait( changeBlobRange(bwData, req.keyRange, req.managerEpoch, req.managerSeqno, false, req.dispose, false).first); @@ -2229,7 +2273,7 @@ ACTOR Future handleRangeRevoke(BlobWorkerData* bwData, RevokeBlobRangeRequ // uncommitted data. This means we must ensure the data is actually committed before "committing" those writes in // the blob granule. The simplest way to do this is to have the blob worker do a periodic GRV, which is guaranteed // to be an earlier committed version. -ACTOR Future runCommitVersionChecks(BlobWorkerData* bwData) { +ACTOR Future runCommitVersionChecks(Reference bwData) { state Transaction tr(bwData->db); loop { // only do grvs to get committed version if we need it to persist delta files @@ -2262,9 +2306,12 @@ ACTOR Future runCommitVersionChecks(BlobWorkerData* bwData) { ACTOR Future blobWorker(BlobWorkerInterface bwInterf, ReplyPromise recruitReply, Reference const> dbInfo) { - state BlobWorkerData self(bwInterf.id(), openDBOnServer(dbInfo, TaskPriority::DefaultEndpoint, LockAware::True)); - self.id = bwInterf.id(); - self.locality = bwInterf.locality; + state Reference self( + new BlobWorkerData(bwInterf.id(), openDBOnServer(dbInfo, TaskPriority::DefaultEndpoint, LockAware::True))); + self->id = bwInterf.id(); + self->locality = bwInterf.locality; + + state Future collection = actorCollection(self->addActor.getFuture()); if (BW_DEBUG) { printf("Initializing blob worker s3 stuff\n"); @@ -2275,19 +2322,19 @@ ACTOR Future blobWorker(BlobWorkerInterface bwInterf, if (BW_DEBUG) { printf("BW constructing simulated backup container\n"); } - self.bstore = BackupContainerFileSystem::openContainerFS("file://fdbblob/"); + self->bstore = BackupContainerFileSystem::openContainerFS("file://fdbblob/"); } else { if (BW_DEBUG) { printf("BW constructing backup container from %s\n", SERVER_KNOBS->BG_URL.c_str()); } - self.bstore = BackupContainerFileSystem::openContainerFS(SERVER_KNOBS->BG_URL); + self->bstore = BackupContainerFileSystem::openContainerFS(SERVER_KNOBS->BG_URL); if (BW_DEBUG) { printf("BW constructed backup container\n"); } } // register the blob worker to the system keyspace - wait(registerBlobWorker(&self, bwInterf)); + wait(registerBlobWorker(self, bwInterf)); } catch (Error& e) { if (BW_DEBUG) { printf("BW got backup container init error %s\n", e.name()); @@ -2307,11 +2354,8 @@ ACTOR Future blobWorker(BlobWorkerInterface bwInterf, rep.interf = bwInterf; recruitReply.send(rep); - state PromiseStream> addActor; - state Future collection = actorCollection(addActor.getFuture()); - - addActor.send(waitFailureServer(bwInterf.waitFailure.getFuture())); - addActor.send(runCommitVersionChecks(&self)); + self->actors.add(waitFailureServer(bwInterf.waitFailure.getFuture())); + self->actors.add(runCommitVersionChecks(self)); try { loop choose { @@ -2319,25 +2363,25 @@ ACTOR Future blobWorker(BlobWorkerInterface bwInterf, /*printf("Got blob granule request [%s - %s)\n", req.keyRange.begin.printable().c_str(), req.keyRange.end.printable().c_str());*/ - ++self.stats.readRequests; - ++self.stats.activeReadRequests; - addActor.send(handleBlobGranuleFileRequest(&self, req)); + ++self->stats.readRequests; + ++self->stats.activeReadRequests; + self->actors.add(handleBlobGranuleFileRequest(self, req)); } when(GranuleStatusStreamRequest req = waitNext(bwInterf.granuleStatusStreamRequest.getFuture())) { - if (self.managerEpochOk(req.managerEpoch)) { + if (self->managerEpochOk(req.managerEpoch)) { if (BW_DEBUG) { - printf("Worker %s got new granule status endpoint\n", self.id.toString().c_str()); + printf("Worker %s got new granule status endpoint\n", self->id.toString().c_str()); } - self.currentManagerStatusStream = req.reply; + self->currentManagerStatusStream = req.reply; } } when(AssignBlobRangeRequest _req = waitNext(bwInterf.assignBlobRangeRequest.getFuture())) { - ++self.stats.rangeAssignmentRequests; - --self.stats.numRangesAssigned; + ++self->stats.rangeAssignmentRequests; + --self->stats.numRangesAssigned; state AssignBlobRangeRequest assignReq = _req; if (BW_DEBUG) { printf("Worker %s assigned range [%s - %s) @ (%lld, %lld):\n continue=%s\n", - self.id.toString().c_str(), + self->id.toString().c_str(), assignReq.keyRange.begin.printable().c_str(), assignReq.keyRange.end.printable().c_str(), assignReq.managerEpoch, @@ -2345,18 +2389,18 @@ ACTOR Future blobWorker(BlobWorkerInterface bwInterf, assignReq.continueAssignment ? "T" : "F"); } - if (self.managerEpochOk(assignReq.managerEpoch)) { - addActor.send(handleRangeAssign(&self, assignReq, false)); + if (self->managerEpochOk(assignReq.managerEpoch)) { + self->actors.add(handleRangeAssign(self, assignReq, false)); } else { assignReq.reply.send(AssignBlobRangeReply(false)); } } when(RevokeBlobRangeRequest _req = waitNext(bwInterf.revokeBlobRangeRequest.getFuture())) { state RevokeBlobRangeRequest revokeReq = _req; - --self.stats.numRangesAssigned; + --self->stats.numRangesAssigned; if (BW_DEBUG) { printf("Worker %s revoked range [%s - %s) @ (%lld, %lld):\n dispose=%s\n", - self.id.toString().c_str(), + self->id.toString().c_str(), revokeReq.keyRange.begin.printable().c_str(), revokeReq.keyRange.end.printable().c_str(), revokeReq.managerEpoch, @@ -2364,30 +2408,48 @@ ACTOR Future blobWorker(BlobWorkerInterface bwInterf, revokeReq.dispose ? "T" : "F"); } - if (self.managerEpochOk(revokeReq.managerEpoch)) { - addActor.send(handleRangeRevoke(&self, revokeReq)); + if (self->managerEpochOk(revokeReq.managerEpoch)) { + self->actors.add(handleRangeRevoke(self, revokeReq)); } else { revokeReq.reply.send(AssignBlobRangeReply(false)); } } - when(AssignBlobRangeRequest granuleToReassign = waitNext(self.granuleUpdateErrors.getFuture())) { - addActor.send(handleRangeAssign(&self, granuleToReassign, true)); + when(AssignBlobRangeRequest granuleToReassign = waitNext(self->granuleUpdateErrors.getFuture())) { + self->actors.add(handleRangeAssign(self, granuleToReassign, true)); } + when(HaltBlobWorkerRequest req = waitNext(bwInterf.haltBlobWorker.getFuture())) { + req.reply.send(Void()); + if (self->managerEpochOk(req.managerEpoch)) { + TraceEvent("BlobWorkerHalted", bwInterf.id()).detail("ReqID", req.requesterID); + printf("BW %s was halted\n", bwInterf.id().toString().c_str()); + break; + } + } + // when(wait(delay(10))) { throw granule_assignment_conflict(); } when(wait(collection)) { if (BW_DEBUG) { printf("BW actor collection returned, exiting\n"); } ASSERT(false); - throw internal_error(); + throw granule_assignment_conflict(); } } } catch (Error& e) { if (BW_DEBUG) { printf("Blob worker got error %s, exiting\n", e.name()); } - TraceEvent("BlobWorkerDied", self.id).error(e, true); - throw e; + TraceEvent("BlobWorkerDied", self->id).error(e, true); } + + printf("cancelling actors for BW %s\n", self->id.toString().c_str()); + self->actors.clear(false); + // self->addActor.sendError(granule_assignment_conflict()); + // + // self.addActor..clear(false); // tehcnically shouldn't need this since when self goes out of scope, so will + // self.actors + // at which point the cancels will be triggered? + self->dead.set(true); + return Void(); } // TODO add unit tests for assign/revoke range, especially version ordering