diff --git a/fdbclient/ServerKnobs.cpp b/fdbclient/ServerKnobs.cpp index 4522024f3c..0d09978daf 100644 --- a/fdbclient/ServerKnobs.cpp +++ b/fdbclient/ServerKnobs.cpp @@ -1040,8 +1040,9 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi init( BLOB_WORKER_DO_REJECT_WHEN_FULL, true ); if ( randomize && BUGGIFY ) BLOB_WORKER_DO_REJECT_WHEN_FULL = false; init( BLOB_WORKER_REJECT_WHEN_FULL_THRESHOLD, 0.9 ); init( BLOB_WORKER_FORCE_FLUSH_CLEANUP_DELAY, 30.0 ); if ( randomize && BUGGIFY ) BLOB_WORKER_FORCE_FLUSH_CLEANUP_DELAY = deterministicRandom()->randomInt(0, 10) - 1; - init( BLOB_WORKER_STORE_TYPE, 3 ); if ( randomize && BUGGIFY ) BLOB_WORKER_STORE_TYPE = 3; - init( BLOB_WORKER_REJOIN_TIME, 10.0 ); if ( randomize && BUGGIFY ) BLOB_WORKER_STORE_TYPE = 10.0; + init( BLOB_WORKER_DISK_ENABLED, false ); if ( randomize && BUGGIFY ) BLOB_WORKER_DISK_ENABLED = true; + init( BLOB_WORKER_STORE_TYPE, 3 ); + init( BLOB_WORKER_REJOIN_TIME, 10.0 ); init( BLOB_MANAGER_STATUS_EXP_BACKOFF_MIN, 0.1 ); init( BLOB_MANAGER_STATUS_EXP_BACKOFF_MAX, 5.0 ); diff --git a/fdbclient/include/fdbclient/ServerKnobs.h b/fdbclient/include/fdbclient/ServerKnobs.h index 36e277b6bc..07a58e962f 100644 --- a/fdbclient/include/fdbclient/ServerKnobs.h +++ b/fdbclient/include/fdbclient/ServerKnobs.h @@ -1016,6 +1016,7 @@ public: bool BLOB_WORKER_DO_REJECT_WHEN_FULL; double BLOB_WORKER_REJECT_WHEN_FULL_THRESHOLD; double BLOB_WORKER_FORCE_FLUSH_CLEANUP_DELAY; + bool BLOB_WORKER_DISK_ENABLED; int BLOB_WORKER_STORE_TYPE; double BLOB_WORKER_REJOIN_TIME; diff --git a/fdbserver/BlobManager.actor.cpp b/fdbserver/BlobManager.actor.cpp index ca65e0a7ae..7f4d9288e4 100644 --- a/fdbserver/BlobManager.actor.cpp +++ b/fdbserver/BlobManager.actor.cpp @@ -2760,6 +2760,8 @@ ACTOR Future killBlobWorker(Reference bmData, BlobWorkerI successor = Optional(); } + CODE_PROBE(successor.present(), "Blob worker has affinity after failure"); + for (auto& it : rangesToMove) { // Send revoke request RangeAssignment raRevoke; @@ -3802,6 +3804,7 @@ ACTOR Future recoverBlobManager(Reference bmData) { } while (bmData->workerAffinities.count(workerId)) { workerId = bmData->workerAffinities[workerId]; + CODE_PROBE(true, "Blob worker has affinity after reboot"); } // prevent racing status updates from old owner from causing issues until this request gets sent out // properly @@ -3938,8 +3941,7 @@ ACTOR Future initializeBlobWorker(Reference self, Recruit state InitializeBlobWorkerRequest initReq; initReq.reqId = deterministicRandom()->randomUniqueID(); initReq.interfaceId = interfaceId; - initReq.storeType = (KeyValueStoreType::StoreType)( - SERVER_KNOBS->BLOB_WORKER_STORE_TYPE < 0 ? KeyValueStoreType::END : SERVER_KNOBS->BLOB_WORKER_STORE_TYPE); + initReq.storeType = (KeyValueStoreType::StoreType)(SERVER_KNOBS->BLOB_WORKER_STORE_TYPE); // acknowledge that this worker is currently being recruited on self->recruitingLocalities.insert(candidateWorker.worker.stableAddress()); diff --git a/fdbserver/BlobWorker.actor.cpp b/fdbserver/BlobWorker.actor.cpp index 378394fadb..363e720f98 100644 --- a/fdbserver/BlobWorker.actor.cpp +++ b/fdbserver/BlobWorker.actor.cpp @@ -5580,9 +5580,7 @@ ACTOR Future blobWorker(BlobWorkerInterface bwInterf, // Since the blob worker gets initalized through the blob manager it is more reliable to fetch the encryption // state using the DB Config rather than passing it through the initalization request for the blob manager and // blob worker - DatabaseConfiguration config = wait(getDatabaseConfiguration(cx)); - self->encryptMode = config.encryptionAtRestMode; - TraceEvent("BWEncryptionAtRestMode", self->id).detail("Mode", self->encryptMode.toString()); + state Future configFuture = getDatabaseConfiguration(cx); if (self->storage) { wait(self->storage->init()); @@ -5626,6 +5624,10 @@ ACTOR Future blobWorker(BlobWorkerInterface bwInterf, rep.interf = bwInterf; recruitReply.send(rep); + DatabaseConfiguration config = wait(configFuture); + self->encryptMode = config.encryptionAtRestMode; + TraceEvent("BWEncryptionAtRestMode", self->id).detail("Mode", self->encryptMode.toString()); + TraceEvent("BlobWorkerInit", self->id).log(); wait(blobWorkerCore(bwInterf, self)); return Void(); @@ -5642,7 +5644,7 @@ ACTOR Future restorePersistentState(Reference self) { wait(waitForAll(std::vector{ fID })); - if (!fID.get().present()) { + if (!SERVER_KNOBS->BLOB_WORKER_DISK_ENABLED || !fID.get().present()) { CODE_PROBE(true, "Restored uninitialized blob worker"); throw worker_removed(); } @@ -5665,12 +5667,14 @@ ACTOR Future blobWorker(BlobWorkerInterface bwInterf, wait(self->storage->commit()); state UID previous = wait(restorePersistentState(self)); + if (recovered.canBeSet()) { + recovered.send(Void()); + } + // Since the blob worker gets initalized through the blob manager it is more reliable to fetch the encryption // state using the DB Config rather than passing it through the initalization request for the blob manager and // blob worker - DatabaseConfiguration config = wait(getDatabaseConfiguration(cx)); - self->encryptMode = config.encryptionAtRestMode; - TraceEvent("BWEncryptionAtRestMode", self->id).detail("Mode", self->encryptMode.toString()); + state Future configFuture = getDatabaseConfiguration(cx); if (BW_DEBUG) { printf("Initializing blob worker s3 stuff\n"); @@ -5695,13 +5699,16 @@ ACTOR Future blobWorker(BlobWorkerInterface bwInterf, throw e; } - if (recovered.canBeSet()) { - recovered.send(Void()); - } - + // Only update the ID on disk after registering with the database so that if this process is rebooted after + // registration and before the ID is updated on disk the next generation will consider its ID from two + // generations ago its successor. self->storage->set(KeyValueRef(persistID, BinaryWriter::toValue(self->id, Unversioned()))); wait(self->storage->commit()); + DatabaseConfiguration config = wait(configFuture); + self->encryptMode = config.encryptionAtRestMode; + TraceEvent("BWEncryptionAtRestMode", self->id).detail("Mode", self->encryptMode.toString()); + TraceEvent("BlobWorkerInit", self->id).log(); wait(blobWorkerCore(bwInterf, self)); return Void(); diff --git a/fdbserver/worker.actor.cpp b/fdbserver/worker.actor.cpp index d238ab4873..d947445554 100644 --- a/fdbserver/worker.actor.cpp +++ b/fdbserver/worker.actor.cpp @@ -1790,7 +1790,7 @@ ACTOR Future workerServer(Reference connRecord, state std::map> sharedLogs; state Reference> activeSharedTLog(new AsyncVar()); state WorkerCache backupWorkerCache; - state WorkerCache blobWorkerCache; + state Future blobWorkerFuture = Void(); state WorkerSnapRequest lastSnapReq; // Here the key is UID+role, as we still send duplicate requests to a process which is both storage and tlog @@ -2031,41 +2031,56 @@ ACTOR Future workerServer(Reference connRecord, logData.back().uid = s.storeID; errorForwarders.add(forwardError(errors, Role::SHARED_TRANSACTION_LOG, s.storeID, tl)); } else if (s.storedComponent == DiskStore::BlobWorker) { - LocalLineage _; - getCurrentLineage()->modify(&RoleLineage::role) = ProcessClass::ClusterRole::BlobWorker; + if (blobWorkerFuture.isReady()) { + LocalLineage _; + getCurrentLineage()->modify(&RoleLineage::role) = ProcessClass::ClusterRole::BlobWorker; - BlobWorkerInterface recruited(locality, deterministicRandom()->randomUniqueID()); - recruited.initEndpoints(); + BlobWorkerInterface recruited(locality, deterministicRandom()->randomUniqueID()); + recruited.initEndpoints(); - std::map details; - details["StorageEngine"] = s.storeType.toString(); - startRole(Role::BLOB_WORKER, recruited.id(), interf.id(), details, "Restored"); + std::map details; + details["StorageEngine"] = s.storeType.toString(); + startRole(Role::BLOB_WORKER, recruited.id(), interf.id(), details, "Restored"); - DUMPTOKEN(recruited.waitFailure); - DUMPTOKEN(recruited.blobGranuleFileRequest); - DUMPTOKEN(recruited.assignBlobRangeRequest); - DUMPTOKEN(recruited.revokeBlobRangeRequest); - DUMPTOKEN(recruited.granuleAssignmentsRequest); - DUMPTOKEN(recruited.granuleStatusStreamRequest); - DUMPTOKEN(recruited.haltBlobWorker); - DUMPTOKEN(recruited.minBlobVersionRequest); + DUMPTOKEN(recruited.waitFailure); + DUMPTOKEN(recruited.blobGranuleFileRequest); + DUMPTOKEN(recruited.assignBlobRangeRequest); + DUMPTOKEN(recruited.revokeBlobRangeRequest); + DUMPTOKEN(recruited.granuleAssignmentsRequest); + DUMPTOKEN(recruited.granuleStatusStreamRequest); + DUMPTOKEN(recruited.haltBlobWorker); + DUMPTOKEN(recruited.minBlobVersionRequest); - IKeyValueStore* data = openKVStore(s.storeType, - s.filename, - recruited.id(), - memoryLimit, - false, - false, - false, - dbInfo, - EncryptionAtRestMode()); - filesClosed.add(data->onClosed()); + IKeyValueStore* data = openKVStore(s.storeType, + s.filename, + recruited.id(), + memoryLimit, + false, + false, + false, + dbInfo, + EncryptionAtRestMode()); + filesClosed.add(data->onClosed()); - Promise recovery; - Future bw = blobWorker(recruited, recovery, dbInfo, data); - recoveries.push_back(recovery.getFuture()); - bw = handleIOErrors(bw, data, recruited.id()); - errorForwarders.add(forwardError(errors, Role::BLOB_WORKER, recruited.id(), bw)); + Promise recovery; + Future bw = blobWorker(recruited, recovery, dbInfo, data); + recoveries.push_back(recovery.getFuture()); + bw = handleIOErrors(bw, data, recruited.id()); + blobWorkerFuture = bw; + errorForwarders.add(forwardError(errors, Role::BLOB_WORKER, recruited.id(), bw)); + } else { + CODE_PROBE(true, "Multiple blob workers after reboot", probe::decoration::rare); + IKeyValueStore* data = openKVStore(s.storeType, + s.filename, + UID(), + memoryLimit, + false, + false, + false, + dbInfo, + EncryptionAtRestMode()); + data->dispose(); + } } } @@ -2681,7 +2696,7 @@ ACTOR Future workerServer(Reference connRecord, } } when(InitializeBlobWorkerRequest req = waitNext(interf.blobWorker.getFuture())) { - if (!blobWorkerCache.exists(req.reqId)) { + if (blobWorkerFuture.isReady()) { LocalLineage _; getCurrentLineage()->modify(&RoleLineage::role) = ProcessClass::ClusterRole::BlobWorker; @@ -2699,7 +2714,7 @@ ACTOR Future workerServer(Reference connRecord, DUMPTOKEN(recruited.minBlobVersionRequest); IKeyValueStore* data = nullptr; - if (req.storeType != KeyValueStoreType::END) { + if (SERVER_KNOBS->BLOB_WORKER_DISK_ENABLED && req.storeType != KeyValueStoreType::END) { std::string filename = filenameFromId(req.storeType, folder, fileBlobWorkerPrefix.toString(), recruited.id()); data = openKVStore(req.storeType, @@ -2716,13 +2731,13 @@ ACTOR Future workerServer(Reference connRecord, ReplyPromise blobWorkerReady = req.reply; Future bw = blobWorker(recruited, blobWorkerReady, dbInfo, data); - if (req.storeType != KeyValueStoreType::END) { + if (SERVER_KNOBS->BLOB_WORKER_DISK_ENABLED && req.storeType != KeyValueStoreType::END) { bw = handleIOErrors(bw, data, recruited.id()); } + blobWorkerFuture = bw; errorForwarders.add(forwardError(errors, Role::BLOB_WORKER, recruited.id(), bw)); - } else { - forwardPromise(req.reply, blobWorkerCache.get(req.reqId)); + req.reply.sendError(recruitment_failed()); } } when(InitializeCommitProxyRequest req = waitNext(interf.commitProxy.getFuture())) {