diff --git a/fdbclient/BlobWorkerInterface.h b/fdbclient/BlobWorkerInterface.h
index c40a64ee28..480883b263 100644
--- a/fdbclient/BlobWorkerInterface.h
+++ b/fdbclient/BlobWorkerInterface.h
@@ -94,19 +94,6 @@ struct BlobGranuleFileRequest {
     }
 };
 
-struct AssignBlobRangeReply {
-    constexpr static FileIdentifier file_identifier = 6431923;
-    bool epochOk; // false if the worker has seen a new manager
-
-    AssignBlobRangeReply() {}
-    explicit AssignBlobRangeReply(bool epochOk) : epochOk(epochOk) {}
-
-    template <class Ar>
-    void serialize(Ar& ar) {
-        serializer(ar, epochOk);
-    }
-};
-
 struct RevokeBlobRangeRequest {
     constexpr static FileIdentifier file_identifier = 4844288;
     Arena arena;
@@ -114,7 +101,7 @@ struct RevokeBlobRangeRequest {
     int64_t managerEpoch;
     int64_t managerSeqno;
     bool dispose;
-    ReplyPromise<AssignBlobRangeReply> reply;
+    ReplyPromise<Void> reply;
 
     RevokeBlobRangeRequest() {}
 
@@ -142,7 +129,7 @@ struct AssignBlobRangeRequest {
 
     AssignRequestType type;
 
-    ReplyPromise<AssignBlobRangeReply> reply;
+    ReplyPromise<Void> reply;
 
     AssignBlobRangeRequest() {}
 
diff --git a/fdbserver/BlobManager.actor.cpp b/fdbserver/BlobManager.actor.cpp
index c4a7b66262..5750965068 100644
--- a/fdbserver/BlobManager.actor.cpp
+++ b/fdbserver/BlobManager.actor.cpp
@@ -341,7 +341,6 @@ ACTOR Future<Void> doRangeAssignment(Reference<BlobManagerData> bmData,
     }
 
     try {
-        state AssignBlobRangeReply rep;
         if (assignment.isAssign) {
             ASSERT(assignment.assign.present());
             ASSERT(!assignment.revoke.present());
@@ -357,8 +356,7 @@ ACTOR Future<Void> doRangeAssignment(Reference<BlobManagerData> bmData,
             if (bmData->workersById.count(workerID) == 0) {
                 throw no_more_servers();
             }
-            AssignBlobRangeReply _rep = wait(bmData->workersById[workerID].assignBlobRangeRequest.getReply(req));
-            rep = _rep;
+            wait(bmData->workersById[workerID].assignBlobRangeRequest.getReply(req));
         } else {
             ASSERT(!assignment.assign.present());
             ASSERT(assignment.revoke.present());
@@ -372,24 +370,21 @@ ACTOR Future<Void> doRangeAssignment(Reference<BlobManagerData> bmData,
 
             // if that worker isn't alive anymore, this is a noop
             if (bmData->workersById.count(workerID)) {
-                AssignBlobRangeReply _rep = wait(bmData->workersById[workerID].revokeBlobRangeRequest.getReply(req));
-                rep = _rep;
+                wait(bmData->workersById[workerID].revokeBlobRangeRequest.getReply(req));
             } else {
                 return Void();
             }
         }
-        if (!rep.epochOk) {
-            if (BM_DEBUG) {
-                printf("BM heard from BW that there is a new manager with higher epoch\n");
-            }
-            if (bmData->iAmReplaced.canBeSet()) {
-                bmData->iAmReplaced.send(Void());
-            }
-        }
     } catch (Error& e) {
         if (e.code() == error_code_operation_cancelled) {
             throw;
         }
+        if (e.code() == error_code_blob_manager_replaced) {
+            if (bmData->iAmReplaced.canBeSet()) {
+                bmData->iAmReplaced.send(Void());
+            }
+            return Void();
+        }
         if (e.code() == error_code_granule_assignment_conflict) {
             // Another blob worker already owns the range, don't retry.
             // And, if it was us that send the request to another worker for this range, this actor should have been
@@ -565,7 +560,7 @@ ACTOR Future<Void> checkManagerLock(Reference<ReadYourWritesTransaction> tr, Ref
             bmData->iAmReplaced.send(Void());
         }
 
-        throw granule_assignment_conflict();
+        throw blob_manager_replaced();
     }
     tr->addReadConflictRange(singleKeyRange(blobManagerEpochKey));
 
@@ -973,6 +968,31 @@ ACTOR Future<Void> deregisterBlobWorker(Reference<BlobManagerData> bmData, BlobW
     }
 }
 
+ACTOR Future<Void> haltBlobWorker(Reference<BlobManagerData> bmData, BlobWorkerInterface bwInterf) {
+    loop {
+        try {
+            wait(bwInterf.haltBlobWorker.getReply(HaltBlobWorkerRequest(bmData->epoch, bmData->id)));
+            return Void();
+        } catch (Error& e) {
+            // throw other errors instead of returning?
+            if (e.code() == error_code_operation_cancelled) {
+                throw;
+            }
+            // TODO REMOVE
+            fmt::print("BM {0} got error {1} trying to halt blob worker {2}\n",
+                       bmData->epoch,
+                       e.name(),
+                       bwInterf.id().toString());
+            if (e.code() != error_code_blob_manager_replaced) {
+                return Void();
+            }
+            if (bmData->iAmReplaced.canBeSet()) {
+                bmData->iAmReplaced.send(Void());
+            }
+        }
+    }
+}
+
 ACTOR Future<Void> killBlobWorker(Reference<BlobManagerData> bmData, BlobWorkerInterface bwInterf, bool registered) {
     state UID bwId = bwInterf.id();
 
@@ -1028,8 +1048,7 @@ ACTOR Future<Void> killBlobWorker(Reference<BlobManagerData> bmData, BlobWorkerI
     if (BM_DEBUG) {
         fmt::print("Sending halt to BW {}\n", bwId.toString());
     }
-    bmData->addActor.send(
-        brokenPromiseToNever(bwInterf.haltBlobWorker.getReply(HaltBlobWorkerRequest(bmData->epoch, bmData->id))));
+    bmData->addActor.send(haltBlobWorker(bmData, bwInterf));
 
     wait(deregister);
 
@@ -1121,6 +1140,14 @@ ACTOR Future<Void> monitorBlobWorkerStatus(Reference<BlobManagerData> bmData, Bl
                 throw e;
             }
 
+            // if manager is replaced, die
+            if (e.code() == error_code_blob_manager_replaced) {
+                if (bmData->iAmReplaced.canBeSet()) {
+                    bmData->iAmReplaced.send(Void());
+                }
+                return Void();
+            }
+
             // if we got an error constructing or reading from stream that is retryable, wait and retry.
             // Sometimes we get connection_failed without the failure monitor tripping. One example is simulation's
             // rollRandomClose. In this case, just reconstruct the stream. If it was a transient failure, it works, and
@@ -1766,8 +1793,7 @@ ACTOR Future<Void> haltBlobGranules(Reference<BlobManagerData> bmData) {
     std::vector<Future<Void>> deregisterBlobWorkers;
     for (auto& worker : blobWorkers) {
         // TODO: send a special req to blob workers so they clean up granules/CFs
-        bmData->addActor.send(
-            brokenPromiseToNever(worker.haltBlobWorker.getReply(HaltBlobWorkerRequest(bmData->epoch, bmData->id))));
+        bmData->addActor.send(haltBlobWorker(bmData, worker));
         deregisterBlobWorkers.emplace_back(deregisterBlobWorker(bmData, worker));
     }
     waitForAll(deregisterBlobWorkers);
diff --git a/fdbserver/BlobWorker.actor.cpp b/fdbserver/BlobWorker.actor.cpp
index a9488a5594..5e049265f0 100644
--- a/fdbserver/BlobWorker.actor.cpp
+++ b/fdbserver/BlobWorker.actor.cpp
@@ -2774,7 +2774,7 @@ ACTOR Future<Void> handleRangeAssign(Reference<BlobWorkerData> bwData,
         }
         if (!isSelfReassign) {
             ASSERT(!req.reply.isSet());
-            req.reply.send(AssignBlobRangeReply(true));
+            req.reply.send(Void());
         }
         return Void();
     } catch (Error& e) {
@@ -2807,9 +2807,9 @@ ACTOR Future<Void> handleRangeRevoke(Reference<BlobWorkerData> bwData, RevokeBlobRangeRequest req) {
     try {
-        bool _shouldStart =
-            wait(changeBlobRange(bwData, req.keyRange, req.managerEpoch, req.managerSeqno, false, req.dispose, false));
-        req.reply.send(AssignBlobRangeReply(true));
+        wait(success(
+            changeBlobRange(bwData, req.keyRange, req.managerEpoch, req.managerSeqno, false, req.dispose, false)));
+        req.reply.send(Void());
         return Void();
     } catch (Error& e) {
         // FIXME: retry on error if dispose fails?
@@ -2952,6 +2952,8 @@ ACTOR Future<Void> blobWorker(BlobWorkerInterface bwInterf,
                     // TODO: pick a reasonable byte limit instead of just piggy-backing
                     req.reply.setByteLimit(SERVER_KNOBS->RANGESTREAM_LIMIT_BYTES);
                     self->currentManagerStatusStream.set(req.reply);
+                } else {
+                    req.reply.sendError(blob_manager_replaced());
                 }
             }
             when(AssignBlobRangeRequest _req = waitNext(bwInterf.assignBlobRangeRequest.getFuture())) {
@@ -2970,7 +2972,7 @@ ACTOR Future<Void> blobWorker(BlobWorkerInterface bwInterf,
                 if (self->managerEpochOk(assignReq.managerEpoch)) {
                     self->addActor.send(handleRangeAssign(self, assignReq, false));
                 } else {
-                    assignReq.reply.send(AssignBlobRangeReply(false));
+                    assignReq.reply.sendError(blob_manager_replaced());
                 }
             }
             when(RevokeBlobRangeRequest _req = waitNext(bwInterf.revokeBlobRangeRequest.getFuture())) {
@@ -2988,14 +2990,13 @@ ACTOR Future<Void> blobWorker(BlobWorkerInterface bwInterf,
                 if (self->managerEpochOk(revokeReq.managerEpoch)) {
                     self->addActor.send(handleRangeRevoke(self, revokeReq));
                 } else {
-                    revokeReq.reply.send(AssignBlobRangeReply(false));
+                    revokeReq.reply.sendError(blob_manager_replaced());
                 }
             }
             when(AssignBlobRangeRequest granuleToReassign = waitNext(self->granuleUpdateErrors.getFuture())) {
                 self->addActor.send(handleRangeAssign(self, granuleToReassign, true));
             }
             when(HaltBlobWorkerRequest req = waitNext(bwInterf.haltBlobWorker.getFuture())) {
-                req.reply.send(Void());
                 if (self->managerEpochOk(req.managerEpoch)) {
                     TraceEvent("BlobWorkerHalted", self->id)
                         .detail("ReqID", req.requesterID)
@@ -3003,7 +3004,10 @@ ACTOR Future<Void> blobWorker(BlobWorkerInterface bwInterf,
                     if (BW_DEBUG) {
                         fmt::print("BW {0} was halted by manager {1}\n", bwInterf.id().toString(), req.managerEpoch);
                     }
+                    req.reply.send(Void());
                     break;
+                } else {
+                    req.reply.sendError(blob_manager_replaced());
                 }
             }
             when(wait(collection)) {
diff --git a/flow/error_definitions.h b/flow/error_definitions.h
index 9b5bffeba9..41d69a0573 100755
--- a/flow/error_definitions.h
+++ b/flow/error_definitions.h
@@ -85,6 +85,7 @@ ERROR( granule_assignment_conflict, 1061, "Conflicting attempts to assign blob g
 ERROR( change_feed_cancelled, 1062, "Change feed was cancelled" )
 ERROR( blob_granule_file_load_error, 1063, "Error loading a blob file during granule materialization" )
 ERROR( blob_granule_transaction_too_old, 1064, "Read version is older than blob granule history supports" )
+ERROR( blob_manager_replaced, 1065, "This blob manager has been replaced." )
 
 ERROR( broken_promise, 1100, "Broken promise" )
 ERROR( operation_cancelled, 1101, "Asynchronous operation cancelled" )
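The patch above replaces the old epochOk flag in AssignBlobRangeReply with a dedicated blob_manager_replaced error: a blob worker that has already seen a newer manager epoch now rejects assign, revoke, and halt requests via sendError(), and the manager treats that error as the signal to set iAmReplaced and stop acting. The following is a minimal, self-contained C++ sketch of that fencing pattern; Worker, Manager, and BlobManagerReplaced are hypothetical stand-ins for the Flow actors, ReplyPromise<Void>, and error_code_blob_manager_replaced in the actual patch, so this is an illustration of the idea rather than FoundationDB code.

// Minimal model of epoch fencing via a dedicated "replaced" error.
#include <cstdint>
#include <iostream>
#include <stdexcept>

struct BlobManagerReplaced : std::runtime_error {
    BlobManagerReplaced() : std::runtime_error("blob_manager_replaced") {}
};

// Worker side: track the highest manager epoch seen so far and reject requests
// carrying an older epoch, mirroring managerEpochOk() in BlobWorker.actor.cpp.
struct Worker {
    int64_t currentManagerEpoch = 0;

    bool managerEpochOk(int64_t reqEpoch) {
        if (reqEpoch < currentManagerEpoch) {
            return false;
        }
        currentManagerEpoch = reqEpoch;
        return true;
    }

    // Instead of replying with an epochOk=false flag, a stale manager now gets
    // an error it cannot forget to check.
    void assignRange(int64_t reqEpoch) {
        if (!managerEpochOk(reqEpoch)) {
            throw BlobManagerReplaced();
        }
        // ... perform the assignment, then reply Void() ...
    }
};

// Manager side: on BlobManagerReplaced, mark itself replaced and stop retrying,
// analogous to sending bmData->iAmReplaced in doRangeAssignment.
struct Manager {
    int64_t epoch;
    bool replaced = false;

    void tryAssign(Worker& worker) {
        try {
            worker.assignRange(epoch);
        } catch (const BlobManagerReplaced&) {
            replaced = true; // die / hand off instead of continuing to act
        }
    }
};

int main() {
    Worker worker;
    Manager oldManager{ 1 };
    Manager newManager{ 2 };

    newManager.tryAssign(worker); // worker adopts epoch 2
    oldManager.tryAssign(worker); // epoch 1 is stale and gets fenced off
    std::cout << "old manager replaced: " << std::boolalpha << oldManager.replaced << "\n";
    return 0;
}

Signaling replacement as an error rather than as a field in the reply means every request path fails the same way, including halt, whose reply is a bare Void and had no room for an epochOk flag under the old scheme.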