fix: do not allow more than one blob worker per address

Evan Tschannen 2021-12-03 10:29:22 -08:00
parent f12ac4468e
commit f2838740f1
2 changed files with 49 additions and 14 deletions

File 1 of 2:

@@ -204,6 +204,7 @@ struct BlobManagerData {
     std::unordered_map<UID, BlobWorkerInterface> workersById;
     std::unordered_map<UID, BlobWorkerStats> workerStats; // mapping between workerID -> workerStats
+    std::unordered_set<NetworkAddress> workerAddresses;
     KeyRangeMap<UID> workerAssignments;
     KeyRangeMap<bool> knownBlobRanges;
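
The new workerAddresses set is what enforces the commit's invariant: at most one blob worker may be registered per stable address, and a worker is only added to workersById/workerStats (and monitored) if its address was not already claimed. A minimal standalone sketch of that guard, in plain C++ with hypothetical stand-in types rather than the real NetworkAddress/BlobWorkerInterface:

#include <cstdint>
#include <string>
#include <unordered_map>
#include <unordered_set>

// Hypothetical stand-ins for the real UID / NetworkAddress / BlobWorkerInterface types.
struct Worker {
    uint64_t id = 0;
    std::string address; // plays the role of stableAddress()
};

struct ManagerState {
    std::unordered_set<std::string> workerAddresses; // at most one registered worker per address
    std::unordered_map<uint64_t, Worker> workersById;

    // Register a worker only if no other worker already owns its address.
    // Returns false when the address is taken; the caller should then kill the duplicate.
    bool tryRegister(const Worker& w) {
        if (!workerAddresses.insert(w.address).second) {
            return false;
        }
        workersById[w.id] = w;
        return true;
    }
};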
@ -771,15 +772,19 @@ ACTOR Future<Void> deregisterBlobWorker(BlobManagerData* bmData, BlobWorkerInter
}
}
ACTOR Future<Void> killBlobWorker(BlobManagerData* bmData, BlobWorkerInterface bwInterf) {
ACTOR Future<Void> killBlobWorker(BlobManagerData* bmData, BlobWorkerInterface bwInterf, bool registered) {
UID bwId = bwInterf.id();
// Remove blob worker from stats map so that when we try to find a worker to takeover the range,
// the one we just killed isn't considered.
// Remove it from workersById also since otherwise that worker addr will remain excluded
// when we try to recruit new blob workers.
bmData->workerStats.erase(bwId);
bmData->workersById.erase(bwId);
if (registered) {
bmData->workerStats.erase(bwId);
bmData->workersById.erase(bwId);
bmData->workerAddresses.erase(bwInterf.stableAddress());
}
// Remove blob worker from persisted list of blob workers
Future<Void> deregister = deregisterBlobWorker(bmData, bwInterf);
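
The registered flag added to killBlobWorker distinguishes tearing down a monitored worker from rejecting an unregistered duplicate: only in the former case does the manager own entries in workerStats, workersById, and workerAddresses, and erasing the address unconditionally could evict the worker that legitimately holds it. A standalone sketch of that conditional cleanup (plain C++, hypothetical names):

#include <cstdint>
#include <string>
#include <unordered_map>
#include <unordered_set>

// Hypothetical bookkeeping mirroring workerStats / workersById / workerAddresses.
struct Bookkeeping {
    std::unordered_map<uint64_t, int> workerStats; // stats payload elided
    std::unordered_map<uint64_t, std::string> workersById;
    std::unordered_set<std::string> workerAddresses;
};

// Tear down manager-side state only for a worker that was actually registered,
// so killing an unregistered duplicate never erases the address owned by the
// legitimate worker on that machine.
void onKillWorker(Bookkeeping& bk, uint64_t workerId, const std::string& address, bool registered) {
    if (registered) {
        bk.workerStats.erase(workerId);
        bk.workersById.erase(workerId);
        bk.workerAddresses.erase(address);
    }
    // In both cases the worker itself is still halted and removed from the
    // persisted blob worker list (not modeled in this sketch).
}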
@@ -961,7 +966,7 @@ ACTOR Future<Void> monitorBlobWorker(BlobManagerData* bmData, BlobWorkerInterfac
     }
     // kill the blob worker
-    wait(killBlobWorker(bmData, bwInterf));
+    wait(killBlobWorker(bmData, bwInterf, true));
     if (BM_DEBUG) {
         printf("No longer monitoring BW %s\n", bwInterf.id().toString().c_str());
@@ -969,6 +974,23 @@ ACTOR Future<Void> monitorBlobWorker(BlobManagerData* bmData, BlobWorkerInterfac
     return Void();
 }
+ACTOR Future<Void> checkBlobWorkerList(BlobManagerData* bmData) {
+    loop {
+        wait(delay(1.0));
+        std::vector<BlobWorkerInterface> blobWorkers = wait(getBlobWorkers(bmData->db));
+        for (auto& worker : blobWorkers) {
+            if (!bmData->workerAddresses.count(worker.stableAddress())) {
+                bmData->workerAddresses.insert(worker.stableAddress());
+                bmData->workersById[worker.id()] = worker;
+                bmData->workerStats[worker.id()] = BlobWorkerStats();
+                bmData->addActor.send(monitorBlobWorker(bmData, worker));
+            } else if (!bmData->workersById.count(worker.id())) {
+                bmData->addActor.send(killBlobWorker(bmData, worker, false));
+            }
+        }
+    }
+}
 ACTOR Future<Void> recoverBlobManager(BlobManagerData* bmData) {
     // skip this entire algorithm for the first blob manager
     if (bmData->epoch == 1) {
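
checkBlobWorkerList acts as a safety net: once a second it re-reads the persisted blob worker list and reconciles it with in-memory state, registering and monitoring the first worker seen on a new address and killing (with registered == false) any additional worker whose address is already owned. A rough standalone sketch of a single reconciliation pass in plain C++; the names are hypothetical, and the real actor runs this inside Flow's loop with wait(delay(1.0)) and getBlobWorkers():

#include <cstdint>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <vector>

struct WorkerInfo {
    uint64_t id = 0;
    std::string address;
};

struct ReconcileResult {
    std::vector<WorkerInfo> toMonitor; // newly registered, start monitoring
    std::vector<WorkerInfo> toKill;    // duplicate address, kill without unregistering
};

// One pass of the reconciliation that the real actor repeats on a timer.
ReconcileResult reconcile(std::unordered_set<std::string>& knownAddresses,
                          std::unordered_map<uint64_t, WorkerInfo>& knownWorkers,
                          const std::vector<WorkerInfo>& persistedList) {
    ReconcileResult result;
    for (const WorkerInfo& w : persistedList) {
        if (knownAddresses.insert(w.address).second) {
            knownWorkers[w.id] = w;       // first worker seen on this address wins
            result.toMonitor.push_back(w);
        } else if (!knownWorkers.count(w.id)) {
            result.toKill.push_back(w);   // someone else already owns this address
        }
    }
    return result;
}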
@@ -982,12 +1004,19 @@ ACTOR Future<Void> recoverBlobManager(BlobManagerData* bmData) {
     std::vector<BlobWorkerInterface> blobWorkers = wait(getBlobWorkers(bmData->db));
     // add all blob workers to this new blob manager's records and start monitoring it
-    for (auto worker : blobWorkers) {
-        bmData->workersById[worker.id()] = worker;
-        bmData->workerStats[worker.id()] = BlobWorkerStats();
-        bmData->addActor.send(monitorBlobWorker(bmData, worker));
+    for (auto& worker : blobWorkers) {
+        if (!bmData->workerAddresses.count(worker.stableAddress())) {
+            bmData->workerAddresses.insert(worker.stableAddress());
+            bmData->workersById[worker.id()] = worker;
+            bmData->workerStats[worker.id()] = BlobWorkerStats();
+            bmData->addActor.send(monitorBlobWorker(bmData, worker));
+        } else if (!bmData->workersById.count(worker.id())) {
+            bmData->addActor.send(killBlobWorker(bmData, worker, false));
+        }
     }
+    bmData->addActor.send(checkBlobWorkerList(bmData));
     // Once we acknowledge the existing blob workers, we can go ahead and recruit new ones
     bmData->startRecruiting.trigger();
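
During recovery the new blob manager applies the same first-wins rule to whatever workers it finds: the first worker seen on an address is kept and monitored, later workers on that address are killed as unregistered duplicates, and only then does recruiting restart. A tiny self-contained example of that rule with made-up worker data:

#include <cstdint>
#include <iostream>
#include <string>
#include <unordered_set>
#include <vector>

struct WorkerInfo {
    uint64_t id = 0;
    std::string address;
};

int main() {
    // Two workers claim 10.0.0.2:4500, e.g. a worker that restarted before its
    // old registration was cleaned up (hypothetical data).
    std::vector<WorkerInfo> recovered = {
        { 1, "10.0.0.1:4500" },
        { 2, "10.0.0.2:4500" },
        { 3, "10.0.0.2:4500" },
    };

    std::unordered_set<std::string> seenAddresses;
    for (const WorkerInfo& w : recovered) {
        if (seenAddresses.insert(w.address).second) {
            std::cout << "monitor worker " << w.id << " at " << w.address << "\n";
        } else {
            std::cout << "kill duplicate worker " << w.id << " at " << w.address << "\n";
        }
    }
    // Prints: monitor worker 1, monitor worker 2, kill duplicate worker 3.
}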
@@ -1229,6 +1258,7 @@ ACTOR Future<Void> initializeBlobWorker(BlobManagerData* self, RecruitBlobWorker
     if (newBlobWorker.present()) {
         BlobWorkerInterface bwi = newBlobWorker.get().interf;
+        self->workerAddresses.insert(bwi.stableAddress());
         self->workersById[bwi.id()] = bwi;
         self->workerStats[bwi.id()] = BlobWorkerStats();
         self->addActor.send(monitorBlobWorker(self, bwi));

File 2 of 2:

@@ -1394,6 +1394,7 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
     state std::map<SharedLogsKey, SharedLogsValue> sharedLogs;
     state Reference<AsyncVar<UID>> activeSharedTLog(new AsyncVar<UID>());
     state WorkerCache<InitializeBackupReply> backupWorkerCache;
+    state WorkerCache<InitializeBlobWorkerReply> blobWorkerCache;
     state std::string coordFolder = abspath(_coordFolder);
@@ -2036,13 +2037,17 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
            }
        }
        when(InitializeBlobWorkerRequest req = waitNext(interf.blobWorker.getFuture())) {
-           BlobWorkerInterface recruited(locality, req.interfaceId);
-           recruited.initEndpoints();
-           startRole(Role::BLOB_WORKER, recruited.id(), interf.id());
+           if (!blobWorkerCache.exists(req.reqId)) {
+               BlobWorkerInterface recruited(locality, req.interfaceId);
+               recruited.initEndpoints();
+               startRole(Role::BLOB_WORKER, recruited.id(), interf.id());
 
-           ReplyPromise<InitializeBlobWorkerReply> blobWorkerReady = req.reply;
-           Future<Void> bw = blobWorker(recruited, blobWorkerReady, dbInfo);
-           errorForwarders.add(forwardError(errors, Role::BLOB_WORKER, recruited.id(), bw));
+               ReplyPromise<InitializeBlobWorkerReply> blobWorkerReady = req.reply;
+               Future<Void> bw = blobWorker(recruited, blobWorkerReady, dbInfo);
+               errorForwarders.add(forwardError(errors, Role::BLOB_WORKER, recruited.id(), bw));
+           } else {
+               forwardPromise(req.reply, blobWorkerCache.get(req.reqId));
+           }
        }
        when(InitializeCommitProxyRequest req = waitNext(interf.commitProxy.getFuture())) {
            LocalLineage _;
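
On the worker process side, blobWorkerCache (a WorkerCache keyed by the request id) keeps a retransmitted InitializeBlobWorkerRequest from starting a second blob worker in the same process: only the first request with a given reqId starts the role, and duplicates just get the original reply forwarded. A simplified standalone model of that pattern in plain C++; the real WorkerCache stores the reply promise, so even requests that race before the reply is ready are deduplicated, which this synchronous sketch does not capture:

#include <cstdint>
#include <functional>
#include <iostream>
#include <unordered_map>

// Hypothetical reply type for a successful blob worker initialization.
struct InitReply {
    uint64_t workerId = 0;
};

// Minimal model of a per-role request cache: the first request with a given
// request id starts the role; duplicates reuse the original reply.
class InitRequestCache {
public:
    InitReply handle(uint64_t reqId, const std::function<InitReply()>& startRole) {
        auto it = cache.find(reqId);
        if (it != cache.end()) {
            std::cout << "duplicate request " << reqId << ", forwarding cached reply\n";
            return it->second;
        }
        InitReply reply = startRole(); // actually start the blob worker, once
        cache.emplace(reqId, reply);
        return reply;
    }

private:
    std::unordered_map<uint64_t, InitReply> cache;
};

int main() {
    InitRequestCache cache;
    auto start = [] { std::cout << "starting blob worker\n"; return InitReply{ 42 }; };
    cache.handle(7, start); // starts the role
    cache.handle(7, start); // retransmission: no second worker is started
}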