reject blob workers joining from the wrong data center

we must run the checkBlobWorkerList actor even on epoch 1
check for an already-killed worker even right after it is recruited
Evan Tschannen 2021-12-05 15:02:25 -08:00
parent 98b4299fb2
commit 13ef5afb9c
1 changed file with 21 additions and 14 deletions


@@ -200,6 +200,7 @@ struct BlobWorkerStats {
 struct BlobManagerData {
     UID id;
     Database db;
+    Optional<Key> dcId;
     PromiseStream<Future<Void>> addActor;
     std::unordered_map<UID, BlobWorkerInterface> workersById;
@@ -223,8 +224,8 @@ struct BlobManagerData {
     // assigned sequence numbers
     PromiseStream<RangeAssignment> rangesToAssign;
-    BlobManagerData(UID id, Database db)
-      : id(id), db(db), knownBlobRanges(false, normalKeys.end),
+    BlobManagerData(UID id, Database db, Optional<Key> dcId)
+      : id(id), db(db), dcId(dcId), knownBlobRanges(false, normalKeys.end),
        restartRecruiting(SERVER_KNOBS->DEBOUNCE_RECRUITING_DELAY), recruitingStream(0) {}
     ~BlobManagerData() { printf("Destroying blob manager data for %s\n", id.toString().c_str()); }
 };
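
For illustration only, a minimal standalone sketch of the change above, with simplified stand-ins (std::optional<std::string> in place of flow's Optional<Key>, and the Database member omitted): the manager now records the data center it was constructed for, so later checks can compare worker localities against it.

#include <iostream>
#include <optional>
#include <string>

// Simplified stand-in for BlobManagerData (illustrative, not FDB code):
// the manager remembers the dcId it was handed at construction time.
struct ManagerData {
    std::string id;
    std::optional<std::string> dcId; // data center this manager is pinned to

    ManagerData(std::string id, std::optional<std::string> dcId)
      : id(std::move(id)), dcId(std::move(dcId)) {}
};

int main() {
    ManagerData mgr("BM-1", std::string("dc0"));
    std::cout << "manager " << mgr.id << " pinned to "
              << mgr.dcId.value_or("<none>") << "\n";
    return 0;
}
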
@@ -990,7 +991,7 @@ ACTOR Future<Void> checkBlobWorkerList(BlobManagerData* bmData, Promise<Void> wo
     // add all blob workers to this new blob manager's records and start monitoring it
     for (auto& worker : blobWorkers) {
         if (!bmData->deadWorkers.count(worker.id())) {
-            if (!bmData->workerAddresses.count(worker.stableAddress())) {
+            if (!bmData->workerAddresses.count(worker.stableAddress()) && worker.locality.dcId() == bmData->dcId) {
                 bmData->workerAddresses.insert(worker.stableAddress());
                 bmData->workersById[worker.id()] = worker;
                 bmData->workerStats[worker.id()] = BlobWorkerStats();
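
The check above is the core of the commit: a discovered worker is added to the manager's records only if it is not already known dead, its stable address is new, and its locality's dcId equals the manager's dcId. A standalone sketch of that predicate, using hypothetical Worker/Manager types and string addresses instead of FDB's interfaces:

#include <iostream>
#include <optional>
#include <string>
#include <unordered_set>
#include <vector>

struct Worker {
    std::string id;
    std::string address;             // stand-in for stableAddress()
    std::optional<std::string> dcId; // stand-in for locality.dcId()
};

struct Manager {
    std::optional<std::string> dcId;
    std::unordered_set<std::string> deadWorkers;
    std::unordered_set<std::string> workerAddresses;

    // Mirror of the admission check: not dead, new address, matching data center.
    bool shouldTrack(const Worker& w) const {
        return !deadWorkers.count(w.id) && !workerAddresses.count(w.address) && w.dcId == dcId;
    }
};

int main() {
    Manager m{std::string("dc0"), {}, {}};
    std::vector<Worker> discovered = {
        { "w1", "10.0.0.1:4500", std::string("dc0") }, // same DC: tracked
        { "w2", "10.0.0.2:4500", std::string("dc1") }, // wrong DC: rejected
    };
    for (const auto& w : discovered) {
        if (m.shouldTrack(w)) {
            m.workerAddresses.insert(w.address);
            std::cout << "tracking " << w.id << "\n";
        }
    }
    return 0;
}
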
@@ -1008,12 +1009,6 @@ ACTOR Future<Void> checkBlobWorkerList(BlobManagerData* bmData, Promise<Void> wo
 }

 ACTOR Future<Void> recoverBlobManager(BlobManagerData* bmData) {
-    // skip this entire algorithm for the first blob manager
-    if (bmData->epoch == 1) {
-        bmData->startRecruiting.trigger();
-        return Void();
-    }
-
     state Promise<Void> workerListReady;
     bmData->addActor.send(checkBlobWorkerList(bmData, workerListReady));
     wait(workerListReady.getFuture());
@@ -1021,6 +1016,11 @@ ACTOR Future<Void> recoverBlobManager(BlobManagerData* bmData) {
     // Once we acknowledge the existing blob workers, we can go ahead and recruit new ones
     bmData->startRecruiting.trigger();

+    // skip the rest of the algorithm for the first blob manager
+    if (bmData->epoch == 1) {
+        return Void();
+    }
+
     // At this point, bmData->workersById is a list of all alive blob workers, but could also include some dead BWs.
     // The algorithm below works as follows:
     // 1. We get the existing granule mappings that were persisted by blob workers who were assigned ranges and
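
Taken together, the two hunks above reorder recovery: the checkBlobWorkerList actor and the recruiting trigger now run even for the first blob manager, and only the recovery of previously persisted granule assignments is skipped when epoch == 1. A plain-C++ control-flow sketch of that ordering (the three helpers are hypothetical stand-ins, not the actual actors):

#include <cstdint>
#include <iostream>

void checkWorkerList()        { std::cout << "checked existing workers\n"; }
void triggerRecruiting()      { std::cout << "recruiting started\n"; }
void recoverGranuleMappings() { std::cout << "recovered persisted assignments\n"; }

void recover(int64_t epoch) {
    checkWorkerList();   // now runs even on epoch 1
    triggerRecruiting(); // now runs even on epoch 1
    if (epoch == 1) {
        return;          // only the assignment-recovery work is skipped
    }
    recoverGranuleMappings();
}

int main() {
    recover(1); // first manager: nothing persisted yet, so nothing to recover
    recover(2); // later manager: full recovery path
    return 0;
}
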
@@ -1259,10 +1259,16 @@ ACTOR Future<Void> initializeBlobWorker(BlobManagerData* self, RecruitBlobWorker
     if (newBlobWorker.present()) {
         BlobWorkerInterface bwi = newBlobWorker.get().interf;
+        if (!self->deadWorkers.count(bwi.id())) {
+            if (!self->workerAddresses.count(bwi.stableAddress()) && bwi.locality.dcId() == self->dcId) {
+                self->workerAddresses.insert(bwi.stableAddress());
+                self->workersById[bwi.id()] = bwi;
+                self->workerStats[bwi.id()] = BlobWorkerStats();
+                self->addActor.send(monitorBlobWorker(self, bwi));
+            } else if (!self->workersById.count(bwi.id())) {
+                self->addActor.send(killBlobWorker(self, bwi, false));
+            }
+        }

         TraceEvent("BMRecruiting")
             .detail("State", "Finished request")
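
The same gate is applied at recruitment time: a newly recruited worker is monitored only if its address is new and it is in the manager's data center; otherwise, if the manager is not already tracking it, it goes through the killBlobWorker path. A simplified sketch with hypothetical types (strings in place of UIDs and network addresses, prints in place of the monitor/kill actors):

#include <iostream>
#include <optional>
#include <string>
#include <unordered_map>
#include <unordered_set>

struct Recruit {
    std::string id;
    std::string address;
    std::optional<std::string> dcId;
};

struct Manager {
    std::optional<std::string> dcId;
    std::unordered_set<std::string> deadWorkers;
    std::unordered_set<std::string> workerAddresses;
    std::unordered_map<std::string, Recruit> workersById;

    void onRecruit(const Recruit& r) {
        if (deadWorkers.count(r.id)) {
            return; // already marked dead: ignore the recruit entirely
        }
        if (!workerAddresses.count(r.address) && r.dcId == dcId) {
            workerAddresses.insert(r.address);
            workersById[r.id] = r;
            std::cout << "monitoring " << r.id << "\n"; // stand-in for monitorBlobWorker
        } else if (!workersById.count(r.id)) {
            std::cout << "killing " << r.id << "\n";    // stand-in for killBlobWorker
        }
    }
};

int main() {
    Manager m{std::string("dc0"), {}, {}, {}};
    m.onRecruit({ "w1", "10.0.0.1:4500", std::string("dc0") }); // accepted and monitored
    m.onRecruit({ "w2", "10.0.0.2:4500", std::string("dc1") }); // wrong data center: killed
    return 0;
}
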
@@ -1349,7 +1355,8 @@ ACTOR Future<Void> blobManager(BlobManagerInterface bmInterf,
                                Reference<AsyncVar<ServerDBInfo> const> dbInfo,
                                int64_t epoch) {
     state BlobManagerData self(deterministicRandom()->randomUniqueID(),
-                               openDBOnServer(dbInfo, TaskPriority::DefaultEndpoint, LockAware::True));
+                               openDBOnServer(dbInfo, TaskPriority::DefaultEndpoint, LockAware::True),
+                               bmInterf.locality.dcId());
     state Future<Void> collection = actorCollection(self.addActor.getFuture());