ensure a worker cannot run multiple blob worker roles

This commit is contained in:
Evan Tschannen 2023-02-23 09:51:26 -08:00
parent 9047edd14f
commit a581a55452
5 changed files with 77 additions and 51 deletions

View File

@@ -1040,8 +1040,9 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( BLOB_WORKER_DO_REJECT_WHEN_FULL, true ); if ( randomize && BUGGIFY ) BLOB_WORKER_DO_REJECT_WHEN_FULL = false;
init( BLOB_WORKER_REJECT_WHEN_FULL_THRESHOLD, 0.9 );
init( BLOB_WORKER_FORCE_FLUSH_CLEANUP_DELAY, 30.0 ); if ( randomize && BUGGIFY ) BLOB_WORKER_FORCE_FLUSH_CLEANUP_DELAY = deterministicRandom()->randomInt(0, 10) - 1;
init( BLOB_WORKER_STORE_TYPE, 3 ); if ( randomize && BUGGIFY ) BLOB_WORKER_STORE_TYPE = 3;
init( BLOB_WORKER_REJOIN_TIME, 10.0 ); if ( randomize && BUGGIFY ) BLOB_WORKER_STORE_TYPE = 10.0;
init( BLOB_WORKER_DISK_ENABLED, false ); if ( randomize && BUGGIFY ) BLOB_WORKER_DISK_ENABLED = true;
init( BLOB_WORKER_STORE_TYPE, 3 );
init( BLOB_WORKER_REJOIN_TIME, 10.0 );
init( BLOB_MANAGER_STATUS_EXP_BACKOFF_MIN, 0.1 );
init( BLOB_MANAGER_STATUS_EXP_BACKOFF_MAX, 5.0 );

View File

@@ -1016,6 +1016,7 @@ public:
bool BLOB_WORKER_DO_REJECT_WHEN_FULL;
double BLOB_WORKER_REJECT_WHEN_FULL_THRESHOLD;
double BLOB_WORKER_FORCE_FLUSH_CLEANUP_DELAY;
bool BLOB_WORKER_DISK_ENABLED;
int BLOB_WORKER_STORE_TYPE;
double BLOB_WORKER_REJOIN_TIME;

View File

@@ -2760,6 +2760,8 @@ ACTOR Future<Void> killBlobWorker(Reference<BlobManagerData> bmData, BlobWorkerI
successor = Optional<UID>();
}
CODE_PROBE(successor.present(), "Blob worker has affinity after failure");
for (auto& it : rangesToMove) {
// Send revoke request
RangeAssignment raRevoke;
@@ -3802,6 +3804,7 @@ ACTOR Future<Void> recoverBlobManager(Reference<BlobManagerData> bmData) {
}
while (bmData->workerAffinities.count(workerId)) {
workerId = bmData->workerAffinities[workerId];
CODE_PROBE(true, "Blob worker has affinity after reboot");
}
// prevent racing status updates from old owner from causing issues until this request gets sent out
// properly
@@ -3938,8 +3941,7 @@ ACTOR Future<Void> initializeBlobWorker(Reference<BlobManagerData> self, Recruit
state InitializeBlobWorkerRequest initReq;
initReq.reqId = deterministicRandom()->randomUniqueID();
initReq.interfaceId = interfaceId;
initReq.storeType = (KeyValueStoreType::StoreType)(
SERVER_KNOBS->BLOB_WORKER_STORE_TYPE < 0 ? KeyValueStoreType::END : SERVER_KNOBS->BLOB_WORKER_STORE_TYPE);
initReq.storeType = (KeyValueStoreType::StoreType)(SERVER_KNOBS->BLOB_WORKER_STORE_TYPE);
// acknowledge that this worker is currently being recruited on
self->recruitingLocalities.insert(candidateWorker.worker.stableAddress());

View File

@@ -5580,9 +5580,7 @@ ACTOR Future<Void> blobWorker(BlobWorkerInterface bwInterf,
// Since the blob worker gets initalized through the blob manager it is more reliable to fetch the encryption
// state using the DB Config rather than passing it through the initalization request for the blob manager and
// blob worker
DatabaseConfiguration config = wait(getDatabaseConfiguration(cx));
self->encryptMode = config.encryptionAtRestMode;
TraceEvent("BWEncryptionAtRestMode", self->id).detail("Mode", self->encryptMode.toString());
state Future<DatabaseConfiguration> configFuture = getDatabaseConfiguration(cx);
if (self->storage) {
wait(self->storage->init());
@@ -5626,6 +5624,10 @@ ACTOR Future<Void> blobWorker(BlobWorkerInterface bwInterf,
rep.interf = bwInterf;
recruitReply.send(rep);
DatabaseConfiguration config = wait(configFuture);
self->encryptMode = config.encryptionAtRestMode;
TraceEvent("BWEncryptionAtRestMode", self->id).detail("Mode", self->encryptMode.toString());
TraceEvent("BlobWorkerInit", self->id).log();
wait(blobWorkerCore(bwInterf, self));
return Void();
@@ -5642,7 +5644,7 @@ ACTOR Future<UID> restorePersistentState(Reference<BlobWorkerData> self) {
wait(waitForAll(std::vector{ fID }));
if (!fID.get().present()) {
if (!SERVER_KNOBS->BLOB_WORKER_DISK_ENABLED || !fID.get().present()) {
CODE_PROBE(true, "Restored uninitialized blob worker");
throw worker_removed();
}
@@ -5665,12 +5667,14 @@ ACTOR Future<Void> blobWorker(BlobWorkerInterface bwInterf,
wait(self->storage->commit());
state UID previous = wait(restorePersistentState(self));
if (recovered.canBeSet()) {
recovered.send(Void());
}
// Since the blob worker gets initalized through the blob manager it is more reliable to fetch the encryption
// state using the DB Config rather than passing it through the initalization request for the blob manager and
// blob worker
DatabaseConfiguration config = wait(getDatabaseConfiguration(cx));
self->encryptMode = config.encryptionAtRestMode;
TraceEvent("BWEncryptionAtRestMode", self->id).detail("Mode", self->encryptMode.toString());
state Future<DatabaseConfiguration> configFuture = getDatabaseConfiguration(cx);
if (BW_DEBUG) {
printf("Initializing blob worker s3 stuff\n");
@@ -5695,13 +5699,16 @@ ACTOR Future<Void> blobWorker(BlobWorkerInterface bwInterf,
throw e;
}
if (recovered.canBeSet()) {
recovered.send(Void());
}
// Only update the ID on disk after registering with the database so that if this process is rebooted after
// registration and before the ID is updated on disk the next generation will consider its ID from two
// generations ago its successor.
self->storage->set(KeyValueRef(persistID, BinaryWriter::toValue(self->id, Unversioned())));
wait(self->storage->commit());
DatabaseConfiguration config = wait(configFuture);
self->encryptMode = config.encryptionAtRestMode;
TraceEvent("BWEncryptionAtRestMode", self->id).detail("Mode", self->encryptMode.toString());
TraceEvent("BlobWorkerInit", self->id).log();
wait(blobWorkerCore(bwInterf, self));
return Void();

View File

@@ -1790,7 +1790,7 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
state std::map<SharedLogsKey, std::vector<SharedLogsValue>> sharedLogs;
state Reference<AsyncVar<UID>> activeSharedTLog(new AsyncVar<UID>());
state WorkerCache<InitializeBackupReply> backupWorkerCache;
state WorkerCache<InitializeBlobWorkerReply> blobWorkerCache;
state Future<Void> blobWorkerFuture = Void();
state WorkerSnapRequest lastSnapReq;
// Here the key is UID+role, as we still send duplicate requests to a process which is both storage and tlog
@@ -2031,41 +2031,56 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
logData.back().uid = s.storeID;
errorForwarders.add(forwardError(errors, Role::SHARED_TRANSACTION_LOG, s.storeID, tl));
} else if (s.storedComponent == DiskStore::BlobWorker) {
LocalLineage _;
getCurrentLineage()->modify(&RoleLineage::role) = ProcessClass::ClusterRole::BlobWorker;
if (blobWorkerFuture.isReady()) {
LocalLineage _;
getCurrentLineage()->modify(&RoleLineage::role) = ProcessClass::ClusterRole::BlobWorker;
BlobWorkerInterface recruited(locality, deterministicRandom()->randomUniqueID());
recruited.initEndpoints();
BlobWorkerInterface recruited(locality, deterministicRandom()->randomUniqueID());
recruited.initEndpoints();
std::map<std::string, std::string> details;
details["StorageEngine"] = s.storeType.toString();
startRole(Role::BLOB_WORKER, recruited.id(), interf.id(), details, "Restored");
std::map<std::string, std::string> details;
details["StorageEngine"] = s.storeType.toString();
startRole(Role::BLOB_WORKER, recruited.id(), interf.id(), details, "Restored");
DUMPTOKEN(recruited.waitFailure);
DUMPTOKEN(recruited.blobGranuleFileRequest);
DUMPTOKEN(recruited.assignBlobRangeRequest);
DUMPTOKEN(recruited.revokeBlobRangeRequest);
DUMPTOKEN(recruited.granuleAssignmentsRequest);
DUMPTOKEN(recruited.granuleStatusStreamRequest);
DUMPTOKEN(recruited.haltBlobWorker);
DUMPTOKEN(recruited.minBlobVersionRequest);
DUMPTOKEN(recruited.waitFailure);
DUMPTOKEN(recruited.blobGranuleFileRequest);
DUMPTOKEN(recruited.assignBlobRangeRequest);
DUMPTOKEN(recruited.revokeBlobRangeRequest);
DUMPTOKEN(recruited.granuleAssignmentsRequest);
DUMPTOKEN(recruited.granuleStatusStreamRequest);
DUMPTOKEN(recruited.haltBlobWorker);
DUMPTOKEN(recruited.minBlobVersionRequest);
IKeyValueStore* data = openKVStore(s.storeType,
s.filename,
recruited.id(),
memoryLimit,
false,
false,
false,
dbInfo,
EncryptionAtRestMode());
filesClosed.add(data->onClosed());
IKeyValueStore* data = openKVStore(s.storeType,
s.filename,
recruited.id(),
memoryLimit,
false,
false,
false,
dbInfo,
EncryptionAtRestMode());
filesClosed.add(data->onClosed());
Promise<Void> recovery;
Future<Void> bw = blobWorker(recruited, recovery, dbInfo, data);
recoveries.push_back(recovery.getFuture());
bw = handleIOErrors(bw, data, recruited.id());
errorForwarders.add(forwardError(errors, Role::BLOB_WORKER, recruited.id(), bw));
Promise<Void> recovery;
Future<Void> bw = blobWorker(recruited, recovery, dbInfo, data);
recoveries.push_back(recovery.getFuture());
bw = handleIOErrors(bw, data, recruited.id());
blobWorkerFuture = bw;
errorForwarders.add(forwardError(errors, Role::BLOB_WORKER, recruited.id(), bw));
} else {
CODE_PROBE(true, "Multiple blob workers after reboot", probe::decoration::rare);
IKeyValueStore* data = openKVStore(s.storeType,
s.filename,
UID(),
memoryLimit,
false,
false,
false,
dbInfo,
EncryptionAtRestMode());
data->dispose();
}
}
}
@@ -2681,7 +2696,7 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
}
}
when(InitializeBlobWorkerRequest req = waitNext(interf.blobWorker.getFuture())) {
if (!blobWorkerCache.exists(req.reqId)) {
if (blobWorkerFuture.isReady()) {
LocalLineage _;
getCurrentLineage()->modify(&RoleLineage::role) = ProcessClass::ClusterRole::BlobWorker;
@@ -2699,7 +2714,7 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
DUMPTOKEN(recruited.minBlobVersionRequest);
IKeyValueStore* data = nullptr;
if (req.storeType != KeyValueStoreType::END) {
if (SERVER_KNOBS->BLOB_WORKER_DISK_ENABLED && req.storeType != KeyValueStoreType::END) {
std::string filename =
filenameFromId(req.storeType, folder, fileBlobWorkerPrefix.toString(), recruited.id());
data = openKVStore(req.storeType,
@@ -2716,13 +2731,13 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
ReplyPromise<InitializeBlobWorkerReply> blobWorkerReady = req.reply;
Future<Void> bw = blobWorker(recruited, blobWorkerReady, dbInfo, data);
if (req.storeType != KeyValueStoreType::END) {
if (SERVER_KNOBS->BLOB_WORKER_DISK_ENABLED && req.storeType != KeyValueStoreType::END) {
bw = handleIOErrors(bw, data, recruited.id());
}
blobWorkerFuture = bw;
errorForwarders.add(forwardError(errors, Role::BLOB_WORKER, recruited.id(), bw));
} else {
forwardPromise(req.reply, blobWorkerCache.get(req.reqId));
req.reply.sendError(recruitment_failed());
}
}
when(InitializeCommitProxyRequest req = waitNext(interf.commitProxy.getFuture())) {