Cleaning up debugging and fixing race in blob manager recruitment

Josh Slocum 2022-03-17 14:40:34 -05:00
parent a85b578d2b
commit 0f9e88572a
3 changed files with 52 additions and 30 deletions

View File

@@ -2735,17 +2735,18 @@ ACTOR Future<Void> blobManager(BlobManagerInterface bmInterf,
if (BM_DEBUG) {
fmt::print("BM {} exiting because it is replaced\n", self->epoch);
}
+TraceEvent("BlobManagerReplaced", bmInterf.id()).detail("Epoch", epoch);
break;
}
when(HaltBlobManagerRequest req = waitNext(bmInterf.haltBlobManager.getFuture())) {
req.reply.send(Void());
-TraceEvent("BlobManagerHalted", bmInterf.id()).detail("ReqID", req.requesterID);
+TraceEvent("BlobManagerHalted", bmInterf.id()).detail("Epoch", epoch).detail("ReqID", req.requesterID);
break;
}
when(state HaltBlobGranulesRequest req = waitNext(bmInterf.haltBlobGranules.getFuture())) {
wait(haltBlobGranules(self));
req.reply.send(Void());
-TraceEvent("BlobGranulesHalted", bmInterf.id()).detail("ReqID", req.requesterID);
+TraceEvent("BlobGranulesHalted", bmInterf.id()).detail("Epoch", epoch).detail("ReqID", req.requesterID);
break;
}
when(BlobManagerExclusionSafetyCheckRequest exclCheckReq =
@@ -2753,7 +2754,7 @@ ACTOR Future<Void> blobManager(BlobManagerInterface bmInterf,
blobManagerExclusionSafetyCheck(self, exclCheckReq);
}
when(wait(collection)) {
-TraceEvent("BlobManagerActorCollectionError");
+TraceEvent(SevError, "BlobManagerActorCollectionError");
ASSERT(false);
throw internal_error();
}

View File

@@ -2123,6 +2123,7 @@ ACTOR Future<int64_t> getNextBMEpoch(ClusterControllerData* self) {
tr->set(blobManagerEpochKey, blobManagerEpochValueFor(newEpoch));
wait(tr->commit());
+TraceEvent(SevDebug, "CCNextBlobManagerEpoch", self->id).detail("Epoch", newEpoch);
return newEpoch;
} catch (Error& e) {
wait(tr->onError(e));
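
This hunk shows only the tail of getNextBMEpoch, where the cluster controller persists the bumped epoch and, with this commit, also traces the new value at SevDebug. For orientation, the surrounding retry loop presumably looks roughly like the sketch below; the database handle (self->cx), the decodeBlobManagerEpochValue helper, and the transaction options are assumptions rather than quoted source.

// Sketch only: roughly how the cluster controller presumably bumps the blob manager epoch.
ACTOR Future<int64_t> getNextBMEpoch(ClusterControllerData* self) {
    state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(self->cx);
    loop {
        tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
        tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
        try {
            // Read the current epoch (if any), increment it, and write it back; transaction
            // conflict detection prevents two controllers from handing out the same epoch.
            Optional<Value> oldEpoch = wait(tr->get(blobManagerEpochKey));
            state int64_t newEpoch = oldEpoch.present() ? decodeBlobManagerEpochValue(oldEpoch.get()) + 1 : 1;
            tr->set(blobManagerEpochKey, blobManagerEpochValueFor(newEpoch));
            wait(tr->commit());
            TraceEvent(SevDebug, "CCNextBlobManagerEpoch", self->id).detail("Epoch", newEpoch);
            return newEpoch;
        } catch (Error& e) {
            wait(tr->onError(e));
        }
    }
}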

View File

@@ -524,20 +524,21 @@ std::vector<DiskStore> getDiskStores(std::string folder) {
// Register the worker interf to cluster controller (cc) and
// re-register the worker when key roles interface, e.g., cc, dd, ratekeeper, change.
-ACTOR Future<Void> registrationClient(Reference<AsyncVar<Optional<ClusterControllerFullInterface>> const> ccInterface,
-WorkerInterface interf,
-Reference<AsyncVar<ClusterControllerPriorityInfo>> asyncPriorityInfo,
-ProcessClass initialClass,
-Reference<AsyncVar<Optional<DataDistributorInterface>> const> ddInterf,
-Reference<AsyncVar<Optional<RatekeeperInterface>> const> rkInterf,
-Reference<AsyncVar<Optional<BlobManagerInterface>> const> bmInterf,
-Reference<AsyncVar<Optional<EncryptKeyProxyInterface>> const> ekpInterf,
-Reference<AsyncVar<bool> const> degraded,
-Reference<IClusterConnectionRecord> connRecord,
-Reference<AsyncVar<std::set<std::string>> const> issues,
-Reference<ConfigNode> configNode,
-Reference<LocalConfiguration> localConfig,
-Reference<AsyncVar<ServerDBInfo>> dbInfo) {
+ACTOR Future<Void> registrationClient(
+Reference<AsyncVar<Optional<ClusterControllerFullInterface>> const> ccInterface,
+WorkerInterface interf,
+Reference<AsyncVar<ClusterControllerPriorityInfo>> asyncPriorityInfo,
+ProcessClass initialClass,
+Reference<AsyncVar<Optional<DataDistributorInterface>> const> ddInterf,
+Reference<AsyncVar<Optional<RatekeeperInterface>> const> rkInterf,
+Reference<AsyncVar<Optional<std::pair<int64_t, BlobManagerInterface>>> const> bmInterf,
+Reference<AsyncVar<Optional<EncryptKeyProxyInterface>> const> ekpInterf,
+Reference<AsyncVar<bool> const> degraded,
+Reference<IClusterConnectionRecord> connRecord,
+Reference<AsyncVar<std::set<std::string>> const> issues,
+Reference<ConfigNode> configNode,
+Reference<LocalConfiguration> localConfig,
+Reference<AsyncVar<ServerDBInfo>> dbInfo) {
// Keeps the cluster controller (as it may be re-elected) informed that this worker exists
// The cluster controller uses waitFailureClient to find out if we die, and returns from registrationReply
// (requiring us to re-register) The registration request piggybacks optional distributor interface if it exists.
@@ -567,7 +568,8 @@ ACTOR Future<Void> registrationClient(Reference<AsyncVar<Optional<ClusterControl
requestGeneration++,
ddInterf->get(),
rkInterf->get(),
-bmInterf->get(),
+bmInterf->get().present() ? bmInterf->get().get().second
+: Optional<BlobManagerInterface>(),
ekpInterf->get(),
degraded->get(),
localConfig->lastSeenVersion(),
@@ -1374,6 +1376,24 @@ ACTOR Future<Void> chaosMetricsLogger() {
}
}
+// like genericactors setWhenDoneOrError, but we need to take into account the bm epoch. We don't want to reset it if
+// this manager was replaced by a later manager (with a higher epoch) on this worker
+ACTOR Future<Void> resetBlobManagerWhenDoneOrError(
+Future<Void> blobManagerProcess,
+Reference<AsyncVar<Optional<std::pair<int64_t, BlobManagerInterface>>>> var,
+int64_t epoch) {
+try {
+wait(blobManagerProcess);
+} catch (Error& e) {
+if (e.code() == error_code_actor_cancelled)
+throw;
+}
+if (var->get().present() && var->get().get().first == epoch) {
+var->set(Optional<std::pair<int64_t, BlobManagerInterface>>());
+}
+return Void();
+}
ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
Reference<AsyncVar<Optional<ClusterControllerFullInterface>> const> ccInterface,
LocalityData locality,
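
The resetBlobManagerWhenDoneOrError actor added above differs from the helper its comment references only by the epoch guard. For comparison, setWhenDoneOrError in flow/genericactors.actor.h is roughly the following (quoted from memory, so treat the exact signature as an assumption):

// Roughly the generic helper the new actor specializes (from memory; may differ in detail).
ACTOR template <class T>
Future<Void> setWhenDoneOrError(Future<Void> condition, Reference<AsyncVar<T>> var, T val) {
    try {
        wait(condition);
    } catch (Error& e) {
        if (e.code() == error_code_actor_cancelled)
            throw;
    }
    var->set(val); // unconditional: no check of which manager currently owns the variable
    return Void();
}

With the unconditional version, a replaced blob manager that exits late would clear the interface that a newer-epoch manager on the same worker had already published, and the worker would then re-register with the cluster controller as if it were running no blob manager at all. Checking that the stored epoch still matches the exiting manager's epoch closes that window.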
@@ -1395,8 +1415,8 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
state Reference<AsyncVar<Optional<DataDistributorInterface>>> ddInterf(
new AsyncVar<Optional<DataDistributorInterface>>());
state Reference<AsyncVar<Optional<RatekeeperInterface>>> rkInterf(new AsyncVar<Optional<RatekeeperInterface>>());
-state Reference<AsyncVar<Optional<BlobManagerInterface>>> bmInterf(new AsyncVar<Optional<BlobManagerInterface>>());
-state int64_t myBMEpoch = -1;
+state Reference<AsyncVar<Optional<std::pair<int64_t, BlobManagerInterface>>>> bmEpochAndInterf(
+new AsyncVar<Optional<std::pair<int64_t, BlobManagerInterface>>>());
state Reference<AsyncVar<Optional<EncryptKeyProxyInterface>>> ekpInterf(
new AsyncVar<Optional<EncryptKeyProxyInterface>>());
state Future<Void> handleErrors = workerHandleErrors(errors.getFuture()); // Needs to be stopped last
@@ -1672,7 +1692,7 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
initialClass,
ddInterf,
rkInterf,
-bmInterf,
+bmEpochAndInterf,
ekpInterf,
degraded,
connRecord,
@@ -1874,8 +1894,8 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
BlobManagerInterface recruited(locality, req.reqId);
recruited.initEndpoints();
-if (bmInterf->get().present() && myBMEpoch == req.epoch) {
-recruited = bmInterf->get().get();
+if (bmEpochAndInterf->get().present() && bmEpochAndInterf->get().get().first == req.epoch) {
+recruited = bmEpochAndInterf->get().get().second;
TEST(true); // Recruited while already a blob manager.
} else {
@@ -1884,7 +1904,6 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
// Also, not halting lets us handle the case here where the last BM had a higher
// epoch and somehow the epochs got out of order by a delayed initialize request. The one we start
// here will just halt on the lock check.
-myBMEpoch = req.epoch;
startRole(Role::BLOB_MANAGER, recruited.id(), interf.id());
DUMPTOKEN(recruited.waitFailure);
DUMPTOKEN(recruited.haltBlobManager);
@@ -1892,12 +1911,13 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
DUMPTOKEN(recruited.blobManagerExclCheckReq);
Future<Void> blobManagerProcess = blobManager(recruited, dbInfo, req.epoch);
-errorForwarders.add(forwardError(
-errors,
-Role::BLOB_MANAGER,
-recruited.id(),
-setWhenDoneOrError(blobManagerProcess, bmInterf, Optional<BlobManagerInterface>())));
-bmInterf->set(Optional<BlobManagerInterface>(recruited));
+errorForwarders.add(
+forwardError(errors,
+Role::BLOB_MANAGER,
+recruited.id(),
+resetBlobManagerWhenDoneOrError(blobManagerProcess, bmEpochAndInterf, req.epoch)));
+bmEpochAndInterf->set(
+Optional<std::pair<int64_t, BlobManagerInterface>>(std::pair(req.epoch, recruited)));
}
TraceEvent("BlobManagerReceived", req.reqId).detail("BlobManagerId", recruited.id());
req.reply.send(recruited);
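
Taken together, the worker now stores the (epoch, interface) pair rather than the bare interface, and only clears it when the exiting manager still owns it. The standalone snippet below (plain C++17, not FDB/flow code; BlobManagerInterfaceStub and resetIfSameEpoch are made-up names for illustration) walks through the interleaving the guard is meant to survive:

// Standalone illustration of the epoch-guarded reset; compile with: g++ -std=c++17 guard_demo.cpp
#include <cstdint>
#include <iostream>
#include <optional>
#include <string>
#include <utility>

struct BlobManagerInterfaceStub { std::string id; }; // stand-in for BlobManagerInterface

using BMVar = std::optional<std::pair<int64_t, BlobManagerInterfaceStub>>;

// Clear the shared variable only if it still belongs to the manager of this epoch.
void resetIfSameEpoch(BMVar& var, int64_t epoch) {
    if (var.has_value() && var->first == epoch) {
        var.reset();
    }
}

int main() {
    BMVar bmEpochAndInterf;

    // The worker is recruited as blob manager with epoch 5, then re-recruited with epoch 6.
    bmEpochAndInterf = std::pair<int64_t, BlobManagerInterfaceStub>(5, BlobManagerInterfaceStub{ "bm-5" });
    bmEpochAndInterf = std::pair<int64_t, BlobManagerInterfaceStub>(6, BlobManagerInterfaceStub{ "bm-6" });

    // The replaced epoch-5 manager finishes late; with the guard this is a no-op.
    resetIfSameEpoch(bmEpochAndInterf, 5);
    std::cout << "after epoch-5 exit: "
              << (bmEpochAndInterf ? bmEpochAndInterf->second.id : std::string("<empty>")) << "\n";

    // When the current epoch-6 manager finishes, the variable is cleared as intended.
    resetIfSameEpoch(bmEpochAndInterf, 6);
    std::cout << "after epoch-6 exit: "
              << (bmEpochAndInterf ? bmEpochAndInterf->second.id : std::string("<empty>")) << "\n";
    return 0;
}

Running it prints bm-6 after the stale epoch-5 exit and <empty> only after the current manager exits, which mirrors the intended behavior of resetBlobManagerWhenDoneOrError.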