Added an explicit error for an old blob manager instead of attaching an epochOk flag to reply objects

Josh Slocum 2022-01-20 11:43:34 -06:00
parent 3be1bcd588
commit 62acbcfe19
4 changed files with 58 additions and 40 deletions
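
The change follows one pattern across all four files: the epochOk flag carried in AssignBlobRangeReply goes away, the reply types shrink to ReplyPromise<Void>, and a blob worker that sees a stale manager epoch now rejects the request with the new blob_manager_replaced error. A condensed before/after sketch of the worker-side handling, drawn from the hunks below rather than quoted verbatim:

// before: the worker always replies, and the manager has to inspect epochOk
if (self->managerEpochOk(assignReq.managerEpoch)) {
    self->addActor.send(handleRangeAssign(self, assignReq, false));
} else {
    assignReq.reply.send(AssignBlobRangeReply(false));
}

// after: a stale manager gets an explicit error, which surfaces as a thrown Error at its wait()
if (self->managerEpochOk(assignReq.managerEpoch)) {
    self->addActor.send(handleRangeAssign(self, assignReq, false));
} else {
    assignReq.reply.sendError(blob_manager_replaced());
}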

View File

@@ -94,19 +94,6 @@ struct BlobGranuleFileRequest {
}
};
struct AssignBlobRangeReply {
constexpr static FileIdentifier file_identifier = 6431923;
bool epochOk; // false if the worker has seen a new manager
AssignBlobRangeReply() {}
explicit AssignBlobRangeReply(bool epochOk) : epochOk(epochOk) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, epochOk);
}
};
struct RevokeBlobRangeRequest {
constexpr static FileIdentifier file_identifier = 4844288;
Arena arena;
@@ -114,7 +101,7 @@ struct RevokeBlobRangeRequest {
int64_t managerEpoch;
int64_t managerSeqno;
bool dispose;
ReplyPromise<AssignBlobRangeReply> reply;
ReplyPromise<Void> reply;
RevokeBlobRangeRequest() {}
@@ -142,7 +129,7 @@ struct AssignBlobRangeRequest {
AssignRequestType type;
ReplyPromise<AssignBlobRangeReply> reply;
ReplyPromise<Void> reply;
AssignBlobRangeRequest() {}
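
With the reply type narrowed to ReplyPromise<Void>, the manager side no longer binds or inspects a reply value; a stale epoch now arrives as an error thrown out of wait(). A simplified sketch of the manager-side pattern used in the doRangeAssignment hunks below (the real actor also handles granule_assignment_conflict and retries other failures):

try {
    wait(bmData->workersById[workerID].assignBlobRangeRequest.getReply(req));
} catch (Error& e) {
    if (e.code() == error_code_operation_cancelled) {
        throw;
    }
    if (e.code() == error_code_blob_manager_replaced) {
        // a worker rejected this manager's epoch; hand control to the replacement path
        if (bmData->iAmReplaced.canBeSet()) {
            bmData->iAmReplaced.send(Void());
        }
    }
}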

View File

@@ -341,7 +341,6 @@ ACTOR Future<Void> doRangeAssignment(Reference<BlobManagerData> bmData,
}
try {
state AssignBlobRangeReply rep;
if (assignment.isAssign) {
ASSERT(assignment.assign.present());
ASSERT(!assignment.revoke.present());
@@ -357,8 +356,7 @@ ACTOR Future<Void> doRangeAssignment(Reference<BlobManagerData> bmData,
if (bmData->workersById.count(workerID) == 0) {
throw no_more_servers();
}
AssignBlobRangeReply _rep = wait(bmData->workersById[workerID].assignBlobRangeRequest.getReply(req));
rep = _rep;
wait(bmData->workersById[workerID].assignBlobRangeRequest.getReply(req));
} else {
ASSERT(!assignment.assign.present());
ASSERT(assignment.revoke.present());
@@ -372,24 +370,21 @@ ACTOR Future<Void> doRangeAssignment(Reference<BlobManagerData> bmData,
// if that worker isn't alive anymore, this is a noop
if (bmData->workersById.count(workerID)) {
AssignBlobRangeReply _rep = wait(bmData->workersById[workerID].revokeBlobRangeRequest.getReply(req));
rep = _rep;
wait(bmData->workersById[workerID].revokeBlobRangeRequest.getReply(req));
} else {
return Void();
}
}
if (!rep.epochOk) {
if (BM_DEBUG) {
printf("BM heard from BW that there is a new manager with higher epoch\n");
}
if (bmData->iAmReplaced.canBeSet()) {
bmData->iAmReplaced.send(Void());
}
}
} catch (Error& e) {
if (e.code() == error_code_operation_cancelled) {
throw;
}
if (e.code() == error_code_blob_manager_replaced) {
if (bmData->iAmReplaced.canBeSet()) {
bmData->iAmReplaced.send(Void());
}
return Void();
}
if (e.code() == error_code_granule_assignment_conflict) {
// Another blob worker already owns the range, don't retry.
// And, if it was us that sent the request to another worker for this range, this actor should have been
@@ -565,7 +560,7 @@ ACTOR Future<Void> checkManagerLock(Reference<ReadYourWritesTransaction> tr, Ref
bmData->iAmReplaced.send(Void());
}
throw granule_assignment_conflict();
throw blob_manager_replaced();
}
tr->addReadConflictRange(singleKeyRange(blobManagerEpochKey));
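
Only the tail of checkManagerLock appears in this hunk. For context, the enclosing logic reads the manager epoch key in the same transaction and now throws the new error when a newer manager holds the lock; roughly (a paraphrased sketch; decodeBlobManagerEpochValue is assumed from context, not quoted from the file):

Optional<Value> currentLockValue = wait(tr->get(blobManagerEpochKey));
ASSERT(currentLockValue.present());
int64_t currentEpoch = decodeBlobManagerEpochValue(currentLockValue.get());
if (currentEpoch != bmData->epoch) {
    ASSERT(currentEpoch > bmData->epoch); // only a newer manager can take the lock away
    if (bmData->iAmReplaced.canBeSet()) {
        bmData->iAmReplaced.send(Void());
    }
    throw blob_manager_replaced(); // was granule_assignment_conflict() before this commit
}
tr->addReadConflictRange(singleKeyRange(blobManagerEpochKey));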
@@ -973,6 +968,31 @@ ACTOR Future<Void> deregisterBlobWorker(Reference<BlobManagerData> bmData, BlobW
}
}
ACTOR Future<Void> haltBlobWorker(Reference<BlobManagerData> bmData, BlobWorkerInterface bwInterf) {
loop {
try {
wait(bwInterf.haltBlobWorker.getReply(HaltBlobWorkerRequest(bmData->epoch, bmData->id)));
return Void();
} catch (Error& e) {
// throw other errors instead of returning?
if (e.code() == error_code_operation_cancelled) {
throw;
}
// TODO REMOVE
fmt::print("BM {0} got error {1} trying to halt blob worker {2}\n",
bmData->epoch,
e.name(),
bwInterf.id().toString());
if (e.code() != error_code_blob_manager_replaced) {
return Void();
}
if (bmData->iAmReplaced.canBeSet()) {
bmData->iAmReplaced.send(Void());
}
}
}
}
ACTOR Future<Void> killBlobWorker(Reference<BlobManagerData> bmData, BlobWorkerInterface bwInterf, bool registered) {
state UID bwId = bwInterf.id();
@@ -1028,8 +1048,7 @@ ACTOR Future<Void> killBlobWorker(Reference<BlobManagerData> bmData, BlobWorkerI
if (BM_DEBUG) {
fmt::print("Sending halt to BW {}\n", bwId.toString());
}
bmData->addActor.send(
brokenPromiseToNever(bwInterf.haltBlobWorker.getReply(HaltBlobWorkerRequest(bmData->epoch, bmData->id))));
bmData->addActor.send(haltBlobWorker(bmData, bwInterf));
wait(deregister);
@@ -1121,6 +1140,14 @@ ACTOR Future<Void> monitorBlobWorkerStatus(Reference<BlobManagerData> bmData, Bl
throw e;
}
// if manager is replaced, die
if (e.code() == error_code_blob_manager_replaced) {
if (bmData->iAmReplaced.canBeSet()) {
bmData->iAmReplaced.send(Void());
}
return Void();
}
// if we got an error constructing or reading from stream that is retryable, wait and retry.
// Sometimes we get connection_failed without the failure monitor tripping. One example is simulation's
// rollRandomClose. In this case, just reconstruct the stream. If it was a transient failure, it works, and
@@ -1766,8 +1793,7 @@ ACTOR Future<Void> haltBlobGranules(Reference<BlobManagerData> bmData) {
std::vector<Future<Void>> deregisterBlobWorkers;
for (auto& worker : blobWorkers) {
// TODO: send a special req to blob workers so they clean up granules/CFs
bmData->addActor.send(
brokenPromiseToNever(worker.haltBlobWorker.getReply(HaltBlobWorkerRequest(bmData->epoch, bmData->id))));
bmData->addActor.send(haltBlobWorker(bmData, worker));
deregisterBlobWorkers.emplace_back(deregisterBlobWorker(bmData, worker));
}
waitForAll(deregisterBlobWorkers);

View File

@@ -2774,7 +2774,7 @@ ACTOR Future<Void> handleRangeAssign(Reference<BlobWorkerData> bwData,
}
if (!isSelfReassign) {
ASSERT(!req.reply.isSet());
req.reply.send(AssignBlobRangeReply(true));
req.reply.send(Void());
}
return Void();
} catch (Error& e) {
@@ -2807,9 +2807,9 @@ ACTOR Future<Void> handleRangeAssign(Reference<BlobWorkerData> bwData,
ACTOR Future<Void> handleRangeRevoke(Reference<BlobWorkerData> bwData, RevokeBlobRangeRequest req) {
try {
bool _shouldStart =
wait(changeBlobRange(bwData, req.keyRange, req.managerEpoch, req.managerSeqno, false, req.dispose, false));
req.reply.send(AssignBlobRangeReply(true));
wait(success(
changeBlobRange(bwData, req.keyRange, req.managerEpoch, req.managerSeqno, false, req.dispose, false)));
req.reply.send(Void());
return Void();
} catch (Error& e) {
// FIXME: retry on error if dispose fails?
@@ -2952,6 +2952,8 @@ ACTOR Future<Void> blobWorker(BlobWorkerInterface bwInterf,
// TODO: pick a reasonable byte limit instead of just piggy-backing
req.reply.setByteLimit(SERVER_KNOBS->RANGESTREAM_LIMIT_BYTES);
self->currentManagerStatusStream.set(req.reply);
} else {
req.reply.sendError(blob_manager_replaced());
}
}
when(AssignBlobRangeRequest _req = waitNext(bwInterf.assignBlobRangeRequest.getFuture())) {
@@ -2970,7 +2972,7 @@ ACTOR Future<Void> blobWorker(BlobWorkerInterface bwInterf,
if (self->managerEpochOk(assignReq.managerEpoch)) {
self->addActor.send(handleRangeAssign(self, assignReq, false));
} else {
assignReq.reply.send(AssignBlobRangeReply(false));
assignReq.reply.sendError(blob_manager_replaced());
}
}
when(RevokeBlobRangeRequest _req = waitNext(bwInterf.revokeBlobRangeRequest.getFuture())) {
@@ -2988,14 +2990,13 @@ ACTOR Future<Void> blobWorker(BlobWorkerInterface bwInterf,
if (self->managerEpochOk(revokeReq.managerEpoch)) {
self->addActor.send(handleRangeRevoke(self, revokeReq));
} else {
revokeReq.reply.send(AssignBlobRangeReply(false));
revokeReq.reply.sendError(blob_manager_replaced());
}
}
when(AssignBlobRangeRequest granuleToReassign = waitNext(self->granuleUpdateErrors.getFuture())) {
self->addActor.send(handleRangeAssign(self, granuleToReassign, true));
}
when(HaltBlobWorkerRequest req = waitNext(bwInterf.haltBlobWorker.getFuture())) {
req.reply.send(Void());
if (self->managerEpochOk(req.managerEpoch)) {
TraceEvent("BlobWorkerHalted", self->id)
.detail("ReqID", req.requesterID)
@@ -3003,7 +3004,10 @@ ACTOR Future<Void> blobWorker(BlobWorkerInterface bwInterf,
if (BW_DEBUG) {
fmt::print("BW {0} was halted by manager {1}\n", bwInterf.id().toString(), req.managerEpoch);
}
req.reply.send(Void());
break;
} else {
req.reply.sendError(blob_manager_replaced());
}
}
when(wait(collection)) {
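
The gate used throughout these worker-side hunks is managerEpochOk(), which is not part of this diff. Conceptually it accepts a request only when the manager's epoch is at least as new as any epoch the worker has already seen; a hypothetical sketch of that check (member and variable names here are illustrative, not the actual BlobWorkerData implementation):

// Illustrative only: reject requests from managers the worker knows to be superseded.
bool managerEpochOk(int64_t epoch) {
    if (epoch < currentManagerEpoch) {
        return false; // caller replies with sendError(blob_manager_replaced())
    }
    currentManagerEpoch = epoch;
    return true;
}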

View File

@@ -85,6 +85,7 @@ ERROR( granule_assignment_conflict, 1061, "Conflicting attempts to assign blob g
ERROR( change_feed_cancelled, 1062, "Change feed was cancelled" )
ERROR( blob_granule_file_load_error, 1063, "Error loading a blob file during granule materialization" )
ERROR( blob_granule_transaction_too_old, 1064, "Read version is older than blob granule history supports" )
ERROR( blob_manager_replaced, 1065, "This blob manager has been replaced." )
ERROR( broken_promise, 1100, "Broken promise" )
ERROR( operation_cancelled, 1101, "Asynchronous operation cancelled" )
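
For reference, the round trip the new error makes: the worker's reply.sendError(blob_manager_replaced()) is rethrown at the manager's wait(... .getReply(...)) as an Error whose code is error_code_blob_manager_replaced, the constant generated from the ERROR entry above. A minimal flow-style illustration of that propagation, not code from this commit:

ACTOR Future<Void> replacedErrorRoundTrip() {
    state Promise<Void> reply;
    reply.sendError(blob_manager_replaced()); // worker side: reject a stale manager
    try {
        wait(reply.getFuture()); // manager side: getReply() resolves like this wait
        ASSERT(false); // unreachable; the error sent above is rethrown here
    } catch (Error& e) {
        ASSERT(e.code() == error_code_blob_manager_replaced);
    }
    return Void();
}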