From 5fa6c687d672857a276a33fddc3e12ab2c00dd0f Mon Sep 17 00:00:00 2001 From: Suraj Gupta Date: Wed, 15 Sep 2021 11:35:58 -0400 Subject: [PATCH] Add blob manager as a singleton. --- fdbclient/Schemas.cpp | 2 + fdbclient/ServerKnobs.cpp | 2 + fdbclient/ServerKnobs.h | 2 + fdbclient/StorageServerInterface.h | 1 - fdbrpc/Locality.cpp | 17 ++ fdbrpc/Locality.h | 5 + fdbrpc/simulator.h | 2 + fdbserver/BlobManager.actor.cpp | 74 ++++--- fdbserver/BlobManagerInterface.h | 47 ++++- fdbserver/ClusterController.actor.cpp | 190 +++++++++++++++++- fdbserver/Ratekeeper.actor.cpp | 10 - fdbserver/ServerDBInfo.actor.h | 3 + fdbserver/Status.actor.cpp | 4 + fdbserver/WorkerInterface.actor.h | 28 ++- fdbserver/worker.actor.cpp | 41 +++- .../workloads/ConsistencyCheck.actor.cpp | 16 ++ 16 files changed, 373 insertions(+), 71 deletions(-) diff --git a/fdbclient/Schemas.cpp b/fdbclient/Schemas.cpp index 0cecd29250..982aa6eba0 100644 --- a/fdbclient/Schemas.cpp +++ b/fdbclient/Schemas.cpp @@ -94,6 +94,7 @@ const KeyRef JSONSchemas::statusSchema = LiteralStringRef(R"statusSchema( "cluster_controller", "data_distributor", "ratekeeper", + "blob_manager", "storage_cache", "router", "coordinator" @@ -492,6 +493,7 @@ const KeyRef JSONSchemas::statusSchema = LiteralStringRef(R"statusSchema( "unreachable_master_worker", "unreachable_dataDistributor_worker", "unreachable_ratekeeper_worker", + "unreachable_blobManager_worker", "unreadable_configuration", "full_replication_timeout", "client_issues", diff --git a/fdbclient/ServerKnobs.cpp b/fdbclient/ServerKnobs.cpp index c1599fc604..bc43c64cd3 100644 --- a/fdbclient/ServerKnobs.cpp +++ b/fdbclient/ServerKnobs.cpp @@ -461,12 +461,14 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi init( ATTEMPT_RECRUITMENT_DELAY, 0.035 ); init( WAIT_FOR_DISTRIBUTOR_JOIN_DELAY, 1.0 ); init( WAIT_FOR_RATEKEEPER_JOIN_DELAY, 1.0 ); + init( WAIT_FOR_BLOB_MANAGER_JOIN_DELAY, 1.0 ); init( WORKER_FAILURE_TIME, 1.0 ); if( randomize && BUGGIFY ) WORKER_FAILURE_TIME = 10.0; init( CHECK_OUTSTANDING_INTERVAL, 0.5 ); if( randomize && BUGGIFY ) CHECK_OUTSTANDING_INTERVAL = 0.001; init( VERSION_LAG_METRIC_INTERVAL, 0.5 ); if( randomize && BUGGIFY ) VERSION_LAG_METRIC_INTERVAL = 10.0; init( MAX_VERSION_DIFFERENCE, 20 * VERSIONS_PER_SECOND ); init( FORCE_RECOVERY_CHECK_DELAY, 5.0 ); init( RATEKEEPER_FAILURE_TIME, 1.0 ); + init( BLOB_MANAGER_FAILURE_TIME, 1.0 ); init( REPLACE_INTERFACE_DELAY, 60.0 ); init( REPLACE_INTERFACE_CHECK_DELAY, 5.0 ); init( COORDINATOR_REGISTER_INTERVAL, 5.0 ); diff --git a/fdbclient/ServerKnobs.h b/fdbclient/ServerKnobs.h index e509246024..979f9a8e94 100644 --- a/fdbclient/ServerKnobs.h +++ b/fdbclient/ServerKnobs.h @@ -384,6 +384,7 @@ public: double ATTEMPT_RECRUITMENT_DELAY; double WAIT_FOR_DISTRIBUTOR_JOIN_DELAY; double WAIT_FOR_RATEKEEPER_JOIN_DELAY; + double WAIT_FOR_BLOB_MANAGER_JOIN_DELAY; double WORKER_FAILURE_TIME; double CHECK_OUTSTANDING_INTERVAL; double INCOMPATIBLE_PEERS_LOGGING_INTERVAL; @@ -391,6 +392,7 @@ public: int64_t MAX_VERSION_DIFFERENCE; double FORCE_RECOVERY_CHECK_DELAY; double RATEKEEPER_FAILURE_TIME; + double BLOB_MANAGER_FAILURE_TIME; double REPLACE_INTERFACE_DELAY; double REPLACE_INTERFACE_CHECK_DELAY; double COORDINATOR_REGISTER_INTERVAL; diff --git a/fdbclient/StorageServerInterface.h b/fdbclient/StorageServerInterface.h index 1239f499f0..1c8a8a8004 100644 --- a/fdbclient/StorageServerInterface.h +++ b/fdbclient/StorageServerInterface.h @@ -30,7 +30,6 @@ #include "fdbrpc/Stats.h" #include "fdbrpc/TimedRequest.h" #include "fdbrpc/TSSComparison.h" -#include "fdbclient/TagThrottle.actor.h" #include "fdbclient/BlobGranuleCommon.h" #include "fdbclient/CommitTransaction.h" #include "fdbclient/TagThrottle.actor.h" diff --git a/fdbrpc/Locality.cpp b/fdbrpc/Locality.cpp index b897322f7b..693ac2cb5c 100644 --- a/fdbrpc/Locality.cpp +++ b/fdbrpc/Locality.cpp @@ -229,6 +229,23 @@ ProcessClass::Fitness ProcessClass::machineClassFitness(ClusterRole role) const default: return ProcessClass::WorstFit; } + case ProcessClass::BlobManager: + switch (_class) { + case ProcessClass::BlobManagerClass: + return ProcessClass::BestFit; + case ProcessClass::StatelessClass: + return ProcessClass::GoodFit; + case ProcessClass::UnsetClass: + return ProcessClass::UnsetFit; + case ProcessClass::MasterClass: + return ProcessClass::OkayFit; + case ProcessClass::CoordinatorClass: + case ProcessClass::TesterClass: + case ProcessClass::StorageCacheClass: + return ProcessClass::NeverAssign; + default: + return ProcessClass::WorstFit; + } case ProcessClass::StorageCache: switch (_class) { case ProcessClass::StorageCacheClass: diff --git a/fdbrpc/Locality.h b/fdbrpc/Locality.h index 4d964eba9c..d1bb4fd10b 100644 --- a/fdbrpc/Locality.h +++ b/fdbrpc/Locality.h @@ -46,6 +46,7 @@ struct ProcessClass { StorageCacheClass, BackupClass, GrvProxyClass, + BlobManagerClass, InvalidClass = -1 }; @@ -69,6 +70,7 @@ struct ProcessClass { ClusterController, DataDistributor, Ratekeeper, + BlobManager, StorageCache, Backup, Worker, // used for actor lineage tracking @@ -104,6 +106,7 @@ public: else if (s=="data_distributor") _class = DataDistributorClass; else if (s=="coordinator") _class = CoordinatorClass; else if (s=="ratekeeper") _class = RatekeeperClass; + else if (s=="blob_manager") _class = BlobManagerClass; else if (s=="storage_cache") _class = StorageCacheClass; else if (s=="backup") _class = BackupClass; else _class = InvalidClass; @@ -131,6 +134,7 @@ public: else if (classStr=="data_distributor") _class = DataDistributorClass; else if (classStr=="coordinator") _class = CoordinatorClass; else if (classStr=="ratekeeper") _class = RatekeeperClass; + else if (classStr=="blob_manager") _class = BlobManagerClass; else if (classStr=="storage_cache") _class = StorageCacheClass; else if (classStr=="backup") _class = BackupClass; else _class = InvalidClass; @@ -168,6 +172,7 @@ public: case DataDistributorClass: return "data_distributor"; case CoordinatorClass: return "coordinator"; case RatekeeperClass: return "ratekeeper"; + case BlobManagerClass: return "blob_manager"; case StorageCacheClass: return "storage_cache"; case BackupClass: return "backup"; default: return "invalid"; diff --git a/fdbrpc/simulator.h b/fdbrpc/simulator.h index ff40737fe1..11f03f5795 100644 --- a/fdbrpc/simulator.h +++ b/fdbrpc/simulator.h @@ -154,6 +154,8 @@ public: return false; case ProcessClass::RatekeeperClass: return false; + case ProcessClass::BlobManagerClass: + return false; case ProcessClass::StorageCacheClass: return false; case ProcessClass::BackupClass: diff --git a/fdbserver/BlobManager.actor.cpp b/fdbserver/BlobManager.actor.cpp index 7d12c7c681..90a8fd9793 100644 --- a/fdbserver/BlobManager.actor.cpp +++ b/fdbserver/BlobManager.actor.cpp @@ -197,6 +197,7 @@ struct BlobWorkerStats { struct BlobManagerData { UID id; Database db; + PromiseStream> addActor; std::unordered_map workersById; std::unordered_map workerStats; // mapping between workerID -> workerStats @@ -481,35 +482,6 @@ ACTOR Future checkManagerLock(Reference tr, Blo return Void(); } -// TODO eventually CC should probably do this and pass it as part of recruitment? -ACTOR Future acquireManagerLock(BlobManagerData* bmData) { - state Reference tr = makeReference(bmData->db); - - loop { - tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); - tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE); - try { - Optional oldEpoch = wait(tr->get(blobManagerEpochKey)); - state int64_t newEpoch; - if (oldEpoch.present()) { - newEpoch = decodeBlobManagerEpochValue(oldEpoch.get()) + 1; - } else { - newEpoch = 1; // start at 1 - } - - tr->set(blobManagerEpochKey, blobManagerEpochValueFor(newEpoch)); - - wait(tr->commit()); - return newEpoch; - } catch (Error& e) { - if (BM_DEBUG) { - printf("Acquiring blob manager lock got error %s\n", e.name()); - } - wait(tr->onError(e)); - } - } -} - // FIXME: this does all logic in one transaction. Adding a giant range to an existing database to hybridize would spread // require doing a ton of storage metrics calls, which we should split up across multiple transactions likely. ACTOR Future monitorClientRanges(BlobManagerData* bmData) { @@ -853,9 +825,9 @@ ACTOR Future rangeMover(BlobManagerData* bmData) { } } -// TODO MOVE ELSEWHERE -// TODO replace locality with full BlobManagerInterface eventually -ACTOR Future blobManager(LocalityData locality, Reference const> dbInfo) { +ACTOR Future blobManager(BlobManagerInterface bmInterf, + Reference const> dbInfo, + int64_t epoch) { state BlobManagerData self(deterministicRandom()->randomUniqueID(), openDBOnServer(dbInfo, TaskPriority::DefaultEndpoint, LockAware::True)); @@ -873,15 +845,41 @@ ACTOR Future blobManager(LocalityData locality, Reference tr = makeReference(self.db); + + loop { + tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); + tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE); + try { + wait(checkManagerLock(tr, &self)); + wait(tr->commit()); + break; + } catch (Error& e) { + if (e.code() == error_code_granule_assignment_conflict) { + if (BM_DEBUG) { + printf("Blob manager dying...\n"); + } + throw; + } + + // if we get here, most likely error is a read-write conflict + if (BM_DEBUG) { + printf("Blob manager lock check got unexpected error %s\n", e.name()); + } + wait(tr->onError(e)); + } + } + if (BM_DEBUG) { - printf("Blob manager acquired lock at epoch %lld\n", _epoch); + printf("Blob manager acquired lock at epoch %lld\n", epoch); } int numWorkers = 2; for (int i = 0; i < numWorkers; i++) { - state BlobWorkerInterface bwInterf(locality, deterministicRandom()->randomUniqueID()); + state BlobWorkerInterface bwInterf(bmInterf.locality, deterministicRandom()->randomUniqueID()); bwInterf.initEndpoints(); self.workersById.insert({ bwInterf.id(), bwInterf }); self.workerStats.insert({ bwInterf.id(), BlobWorkerStats() }); @@ -918,8 +916,8 @@ ACTOR Future blobManager(LocalityData locality, Reference knownBlobRanges(false, normalKeys.end); Arena ar; diff --git a/fdbserver/BlobManagerInterface.h b/fdbserver/BlobManagerInterface.h index dd22076d03..4f6efcd8c5 100644 --- a/fdbserver/BlobManagerInterface.h +++ b/fdbserver/BlobManagerInterface.h @@ -22,17 +22,44 @@ #define FDBSERVER_BLOBMANAGERINTERFACE_H #pragma once -#include "fdbserver/ServerDBInfo.h" +#include "fdbclient/FDBTypes.h" +#include "fdbrpc/Locality.h" +#include "fdbrpc/fdbrpc.h" -// TODO everything below here should go away once blob manager isn't embedded in another role +struct BlobManagerInterface { + constexpr static FileIdentifier file_identifier = 369169; + RequestStream> waitFailure; + RequestStream haltBlobManager; + struct LocalityData locality; + UID myId; -// TODO add actual interface, remove functionality stuff hack for ratekeeper -void updateClientBlobRanges(KeyRangeMap* knownBlobRanges, - RangeResult dbBlobRanges, - Arena a, - VectorRef* rangesToAdd, - VectorRef* rangesToRemove); + BlobManagerInterface() {} + explicit BlobManagerInterface(const struct LocalityData& l, UID id) : locality(l), myId(id) {} -Future blobManager(const LocalityData& locality, const Reference const>& dbInfo); + void initEndpoints() {} + UID id() const { return myId; } + NetworkAddress address() const { return waitFailure.getEndpoint().getPrimaryAddress(); } + bool operator==(const BlobManagerInterface& r) const { return id() == r.id(); } + bool operator!=(const BlobManagerInterface& r) const { return !(*this == r); } -#endif \ No newline at end of file + template + void serialize(Archive& ar) { + serializer(ar, waitFailure, haltBlobManager, locality, myId); + } +}; + +struct HaltBlobManagerRequest { + constexpr static FileIdentifier file_identifier = 4149140; + UID requesterID; + ReplyPromise reply; + + HaltBlobManagerRequest() {} + explicit HaltBlobManagerRequest(UID uid) : requesterID(uid) {} + + template + void serialize(Ar& ar) { + serializer(ar, requesterID, reply); + } +}; + +#endif diff --git a/fdbserver/ClusterController.actor.cpp b/fdbserver/ClusterController.actor.cpp index d5c8902686..d2d8ac1112 100644 --- a/fdbserver/ClusterController.actor.cpp +++ b/fdbserver/ClusterController.actor.cpp @@ -38,6 +38,7 @@ #include "fdbserver/LogSystemConfig.h" #include "fdbserver/WaitFailure.h" #include "fdbserver/RatekeeperInterface.h" +#include "fdbserver/BlobManagerInterface.h" #include "fdbserver/ServerDBInfo.h" #include "fdbserver/Status.h" #include "fdbserver/LatencyBandConfig.h" @@ -69,6 +70,7 @@ struct WorkerInfo : NonCopyable { WorkerDetails details; Future haltRatekeeper; Future haltDistributor; + Future haltBlobManager; Standalone> issues; WorkerInfo() @@ -89,7 +91,8 @@ struct WorkerInfo : NonCopyable { WorkerInfo(WorkerInfo&& r) noexcept : watcher(std::move(r.watcher)), reply(std::move(r.reply)), gen(r.gen), reboots(r.reboots), initialClass(r.initialClass), priorityInfo(r.priorityInfo), details(std::move(r.details)), - haltRatekeeper(r.haltRatekeeper), haltDistributor(r.haltDistributor), issues(r.issues) {} + haltRatekeeper(r.haltRatekeeper), haltDistributor(r.haltDistributor), haltBlobManager(r.haltBlobManager), + issues(r.issues) {} void operator=(WorkerInfo&& r) noexcept { watcher = std::move(r.watcher); reply = std::move(r.reply); @@ -100,6 +103,7 @@ struct WorkerInfo : NonCopyable { details = std::move(r.details); haltRatekeeper = r.haltRatekeeper; haltDistributor = r.haltDistributor; + haltBlobManager = r.haltBlobManager; issues = r.issues; } }; @@ -160,6 +164,14 @@ public: serverInfo->set(newInfo); } + void setBlobManager(const BlobManagerInterface& interf) { + auto newInfo = serverInfo->get(); + newInfo.id = deterministicRandom()->randomUniqueID(); + newInfo.infoGeneration = ++dbInfoCount; + newInfo.blobManager = interf; + serverInfo->set(newInfo); + } + void clearInterf(ProcessClass::ClassType t) { auto newInfo = serverInfo->get(); newInfo.id = deterministicRandom()->randomUniqueID(); @@ -168,6 +180,8 @@ public: newInfo.distributor = Optional(); } else if (t == ProcessClass::RatekeeperClass) { newInfo.ratekeeper = Optional(); + } else if (t == ProcessClass::BlobManagerClass) { + newInfo.blobManager = Optional(); } serverInfo->set(newInfo); } @@ -242,7 +256,9 @@ public: return (db.serverInfo->get().distributor.present() && db.serverInfo->get().distributor.get().locality.processId() == processId) || (db.serverInfo->get().ratekeeper.present() && - db.serverInfo->get().ratekeeper.get().locality.processId() == processId); + db.serverInfo->get().ratekeeper.get().locality.processId() == processId) || + (db.serverInfo->get().blobManager.present() && + db.serverInfo->get().blobManager.get().locality.processId() == processId); } WorkerDetails getStorageWorker(RecruitStorageRequest const& req) { @@ -2755,7 +2771,8 @@ public: bool onMasterIsBetter(const WorkerDetails& worker, ProcessClass::ClusterRole role) const { ASSERT(masterProcessId.present()); const auto& pid = worker.interf.locality.processId(); - if ((role != ProcessClass::DataDistributor && role != ProcessClass::Ratekeeper) || + if ((role != ProcessClass::DataDistributor && role != ProcessClass::Ratekeeper && + role != ProcessClass::BlobManager) || pid == masterProcessId.get()) { return false; } @@ -3042,6 +3059,8 @@ public: Optional recruitingDistributorID; AsyncVar recruitRatekeeper; Optional recruitingRatekeeperID; + AsyncVar recruitBlobManager; + Optional recruitingBlobManagerID; // Stores the health information from a particular worker's perspective. struct WorkerHealth { @@ -3077,7 +3096,7 @@ public: clusterControllerDcId(locality.dcId()), id(ccInterface.id()), ac(false), outstandingRequestChecker(Void()), outstandingRemoteRequestChecker(Void()), startTime(now()), goodRecruitmentTime(Never()), goodRemoteRecruitmentTime(Never()), datacenterVersionDifference(0), versionDifferenceUpdated(false), - recruitDistributor(false), recruitRatekeeper(false), + recruitDistributor(false), recruitRatekeeper(false), recruitBlobManager(false), clusterControllerMetrics("ClusterController", id.toString()), openDatabaseRequests("OpenDatabaseRequests", clusterControllerMetrics), registerWorkerRequests("RegisterWorkerRequests", clusterControllerMetrics), @@ -3158,6 +3177,27 @@ struct DataDistributorSingleton : Singleton { void recruit(ClusterControllerData* cc) const { cc->recruitDistributor.set(true); } }; +struct BlobManagerSingleton : Singleton { + + BlobManagerSingleton(const Optional& interface) : Singleton(interface) {} + + Role getRole() const { return Role::BLOB_MANAGER; } + ProcessClass::ClusterRole getClusterRole() const { return ProcessClass::BlobManager; } + + void setInterfaceToDbInfo(ClusterControllerData* cc) const { + if (interface.present()) { + cc->db.setBlobManager(interface.get()); + } + } + void halt(ClusterControllerData* cc, Optional> pid) const { + if (interface.present()) { + cc->id_worker[pid].haltBlobManager = + brokenPromiseToNever(interface.get().haltBlobManager.getReply(HaltBlobManagerRequest(cc->id))); + } + } + void recruit(ClusterControllerData* cc) const { cc->recruitBlobManager.set(true); } +}; + ACTOR Future clusterWatchDatabase(ClusterControllerData* cluster, ClusterControllerData::DBInfo* db) { state MasterInterface iMaster; @@ -3216,6 +3256,7 @@ ACTOR Future clusterWatchDatabase(ClusterControllerData* cluster, ClusterC dbInfo.clusterInterface = db->serverInfo->get().clusterInterface; dbInfo.distributor = db->serverInfo->get().distributor; dbInfo.ratekeeper = db->serverInfo->get().ratekeeper; + dbInfo.blobManager = db->serverInfo->get().blobManager; dbInfo.latencyBandConfig = db->serverInfo->get().latencyBandConfig; TraceEvent("CCWDB", cluster->id) @@ -3481,14 +3522,17 @@ void checkBetterSingletons(ClusterControllerData* self) { // Try to find a new process for each singleton. WorkerDetails newRKWorker = findNewProcessForSingleton(self, ProcessClass::Ratekeeper, id_used); WorkerDetails newDDWorker = findNewProcessForSingleton(self, ProcessClass::DataDistributor, id_used); + WorkerDetails newBMWorker = findNewProcessForSingleton(self, ProcessClass::BlobManager, id_used); // Find best possible fitnesses for each singleton. auto bestFitnessForRK = findBestFitnessForSingleton(self, newRKWorker, ProcessClass::Ratekeeper); auto bestFitnessForDD = findBestFitnessForSingleton(self, newDDWorker, ProcessClass::DataDistributor); + auto bestFitnessForBM = findBestFitnessForSingleton(self, newBMWorker, ProcessClass::BlobManager); auto& db = self->db.serverInfo->get(); auto rkSingleton = RatekeeperSingleton(db.ratekeeper); auto ddSingleton = DataDistributorSingleton(db.distributor); + auto bmSingleton = BlobManagerSingleton(db.blobManager); // Check if the singletons are healthy. // side effect: try to rerecruit the singletons to more optimal processes @@ -3498,31 +3542,40 @@ void checkBetterSingletons(ClusterControllerData* self) { bool ddHealthy = isHealthySingleton( self, newDDWorker, ddSingleton, bestFitnessForDD, self->recruitingDistributorID); + bool bmHealthy = isHealthySingleton( + self, newBMWorker, bmSingleton, bestFitnessForBM, self->recruitingBlobManagerID); + // if any of the singletons are unhealthy (rerecruited or not stable), then do not // consider any further re-recruitments - if (!(rkHealthy && ddHealthy)) { + if (!(rkHealthy && ddHealthy && bmHealthy)) { return; } // if we reach here, we know that the singletons are healthy so let's // check if we can colocate the singletons in a more optimal way + // TODO: verify that we don't need to get the pid from the worker like we were doing before Optional> currRKProcessId = rkSingleton.interface.get().locality.processId(); Optional> currDDProcessId = ddSingleton.interface.get().locality.processId(); + Optional> currBMProcessId = bmSingleton.interface.get().locality.processId(); Optional> newRKProcessId = newRKWorker.interf.locality.processId(); Optional> newDDProcessId = newDDWorker.interf.locality.processId(); + Optional> newBMProcessId = newBMWorker.interf.locality.processId(); - auto currColocMap = getColocCounts({ currRKProcessId, currDDProcessId }); - auto newColocMap = getColocCounts({ newRKProcessId, newDDProcessId }); + auto currColocMap = getColocCounts({ currRKProcessId, currDDProcessId, currBMProcessId }); + auto newColocMap = getColocCounts({ newRKProcessId, newDDProcessId, newBMProcessId }); - // if the new coloc counts are not worse (i.e. each singleton's coloc count has not increased) + // if the new coloc counts are collectively better (i.e. each singleton's coloc count has not increased) if (newColocMap[newRKProcessId] <= currColocMap[currRKProcessId] && - newColocMap[newDDProcessId] <= currColocMap[currDDProcessId]) { + newColocMap[newDDProcessId] <= currColocMap[currDDProcessId] && + newColocMap[newBMProcessId] <= currColocMap[currBMProcessId]) { // rerecruit the singleton for which we have found a better process, if any if (newColocMap[newRKProcessId] < currColocMap[currRKProcessId]) { rkSingleton.recruit(self); } else if (newColocMap[newDDProcessId] < currColocMap[currDDProcessId]) { ddSingleton.recruit(self); + } else if (newColocMap[newBMProcessId] < currColocMap[currBMProcessId]) { + bmSingleton.recruit(self); } } } @@ -4050,6 +4103,13 @@ void registerWorker(RegisterWorkerRequest req, ClusterControllerData* self, Conf self, w, currSingleton, registeringSingleton, self->recruitingRatekeeperID); } + if (req.blobManagerInterf.present()) { + auto currSingleton = BlobManagerSingleton(self->db.serverInfo->get().blobManager); + auto registeringSingleton = BlobManagerSingleton(req.blobManagerInterf); + haltRegisteringOrCurrentSingleton( + self, w, currSingleton, registeringSingleton, self->recruitingBlobManagerID); + } + // Notify the worker to register again with new process class/exclusive property if (!req.reply.isSet() && newPriorityInfo != req.priorityInfo) { req.reply.send(RegisterWorkerReply(newProcessClass, newPriorityInfo)); @@ -4861,6 +4921,117 @@ ACTOR Future monitorRatekeeper(ClusterControllerData* self) { } } +// Acquires the BM lock by getting the next epoch no. +ACTOR Future getNextBMEpoch(ClusterControllerData* self) { + state Reference tr = makeReference(self->cx); + + loop { + tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); + tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE); + try { + Optional oldEpoch = wait(tr->get(blobManagerEpochKey)); + state int64_t newEpoch = oldEpoch.present() ? decodeBlobManagerEpochValue(oldEpoch.get()) + 1 : 1; + tr->set(blobManagerEpochKey, blobManagerEpochValueFor(newEpoch)); + + wait(tr->commit()); + return newEpoch; + } catch (Error& e) { + printf("Acquiring blob manager lock got error %s\n", e.name()); + wait(tr->onError(e)); + } + } +} + +ACTOR Future startBlobManager(ClusterControllerData* self) { + wait(delay(0.0)); // If master fails at the same time, give it a chance to clear master PID. + + TraceEvent("CCStartBlobManager", self->id).log(); + loop { + try { + state bool noBlobManager = !self->db.serverInfo->get().blobManager.present(); + while (!self->masterProcessId.present() || + self->masterProcessId != self->db.serverInfo->get().master.locality.processId() || + self->db.serverInfo->get().recoveryState < RecoveryState::ACCEPTING_COMMITS) { + wait(self->db.serverInfo->onChange() || delay(SERVER_KNOBS->WAIT_FOR_GOOD_RECRUITMENT_DELAY)); + } + if (noBlobManager && self->db.serverInfo->get().blobManager.present()) { + // Existing blob manager registers while waiting, so skip. + return Void(); + } + + state std::map>, int> id_used = self->getUsedIds(); + state WorkerFitnessInfo bmWorker = self->getWorkerForRoleInDatacenter(self->clusterControllerDcId, + ProcessClass::BlobManager, + ProcessClass::NeverAssign, + self->db.config, + id_used); + + int64_t nextEpoch = wait(getNextBMEpoch(self)); + InitializeBlobManagerRequest req(deterministicRandom()->randomUniqueID(), nextEpoch); + state WorkerDetails worker = bmWorker.worker; + if (self->onMasterIsBetter(worker, ProcessClass::BlobManager)) { + worker = self->id_worker[self->masterProcessId.get()].details; + } + + self->recruitingBlobManagerID = req.reqId; + TraceEvent("CCRecruitBlobManager", self->id) + .detail("Addr", worker.interf.address()) + .detail("BMID", req.reqId); + + ErrorOr interf = wait(worker.interf.blobManager.getReplyUnlessFailedFor( + req, SERVER_KNOBS->WAIT_FOR_BLOB_MANAGER_JOIN_DELAY, 0)); + if (interf.present()) { + self->recruitBlobManager.set(false); + self->recruitingBlobManagerID = interf.get().id(); + const auto& blobManager = self->db.serverInfo->get().blobManager; + TraceEvent("CCBlobManagerRecruited", self->id) + .detail("Addr", worker.interf.address()) + .detail("BMID", interf.get().id()); + if (blobManager.present() && blobManager.get().id() != interf.get().id() && + self->id_worker.count(blobManager.get().locality.processId())) { + TraceEvent("CCHaltBlobManagerAfterRecruit", self->id) + .detail("BMID", blobManager.get().id()) + .detail("DcID", printable(self->clusterControllerDcId)); + BlobManagerSingleton(blobManager).halt(self, blobManager.get().locality.processId()); + } + if (!blobManager.present() || blobManager.get().id() != interf.get().id()) { + self->db.setBlobManager(interf.get()); + } + checkOutstandingRequests(self); + return Void(); + } + } catch (Error& e) { + TraceEvent("CCBlobManagerRecruitError", self->id).error(e); + if (e.code() != error_code_no_more_servers) { + throw; + } + } + wait(lowPriorityDelay(SERVER_KNOBS->ATTEMPT_RECRUITMENT_DELAY)); + } +} + +ACTOR Future monitorBlobManager(ClusterControllerData* self) { + while (self->db.serverInfo->get().recoveryState < RecoveryState::ACCEPTING_COMMITS) { + wait(self->db.serverInfo->onChange()); + } + + loop { + if (self->db.serverInfo->get().blobManager.present() && !self->recruitBlobManager.get()) { + choose { + when(wait(waitFailureClient(self->db.serverInfo->get().blobManager.get().waitFailure, + SERVER_KNOBS->BLOB_MANAGER_FAILURE_TIME))) { + TraceEvent("CCBlobManagerDied", self->id) + .detail("BMID", self->db.serverInfo->get().blobManager.get().id()); + self->db.clearInterf(ProcessClass::BlobManagerClass); + } + when(wait(self->recruitBlobManager.onChange())) {} + } + } else { + wait(startBlobManager(self)); + } + } +} + ACTOR Future dbInfoUpdater(ClusterControllerData* self) { state Future dbInfoChange = self->db.serverInfo->onChange(); state Future updateDBInfo = self->updateDBInfo.onTrigger(); @@ -4989,6 +5160,7 @@ ACTOR Future clusterControllerCore(ClusterControllerFullInterface interf, self.addActor.send(handleForcedRecoveries(&self, interf)); self.addActor.send(monitorDataDistributor(&self)); self.addActor.send(monitorRatekeeper(&self)); + self.addActor.send(monitorBlobManager(&self)); // self.addActor.send(monitorTSSMapping(&self)); self.addActor.send(dbInfoUpdater(&self)); self.addActor.send(traceCounters("ClusterControllerMetrics", diff --git a/fdbserver/Ratekeeper.actor.cpp b/fdbserver/Ratekeeper.actor.cpp index 63529882e9..cea5c74d4c 100644 --- a/fdbserver/Ratekeeper.actor.cpp +++ b/fdbserver/Ratekeeper.actor.cpp @@ -25,8 +25,6 @@ #include "fdbrpc/simulator.h" #include "fdbclient/DatabaseContext.h" #include "fdbclient/ReadYourWrites.h" -#include "fdbclient/BlobWorkerInterface.h" // TODO REMOVE -#include "fdbserver/BlobManagerInterface.h" // TODO REMOVE #include "fdbclient/TagThrottle.actor.h" #include "fdbserver/Knobs.h" #include "fdbserver/DataDistribution.actor.h" @@ -1440,14 +1438,6 @@ ACTOR Future ratekeeper(RatekeeperInterface rkInterf, ReferenceTAG_MEASUREMENT_INTERVAL)); - // TODO MOVE eventually - if (!g_network->isSimulated() && SERVER_KNOBS->BG_URL == "") { - printf("Not starting blob manager poc, no url configured\n"); - } else { - printf("Starting blob manager with url=%s\n", SERVER_KNOBS->BG_URL.c_str()); - self.addActor.send(blobManager(rkInterf.locality, dbInfo)); - } - TraceEvent("RkTLogQueueSizeParameters", rkInterf.id()) .detail("Target", SERVER_KNOBS->TARGET_BYTES_PER_TLOG) .detail("Spring", SERVER_KNOBS->SPRING_BYTES_TLOG) diff --git a/fdbserver/ServerDBInfo.actor.h b/fdbserver/ServerDBInfo.actor.h index f0d5d8c53c..e134b9ba82 100644 --- a/fdbserver/ServerDBInfo.actor.h +++ b/fdbserver/ServerDBInfo.actor.h @@ -30,6 +30,7 @@ #include "fdbserver/MasterInterface.h" #include "fdbserver/LogSystemConfig.h" #include "fdbserver/RatekeeperInterface.h" +#include "fdbserver/BlobManagerInterface.h" #include "fdbserver/RecoveryState.h" #include "fdbserver/LatencyBandConfig.h" #include "fdbserver/WorkerInterface.actor.h" @@ -47,6 +48,7 @@ struct ServerDBInfo { Optional distributor; // The best guess of current data distributor. MasterInterface master; // The best guess as to the most recent master, which might still be recovering Optional ratekeeper; + Optional blobManager; std::vector resolvers; DBRecoveryCount recoveryCount; // A recovery count from DBCoreState. A successful master recovery increments it twice; @@ -78,6 +80,7 @@ struct ServerDBInfo { distributor, master, ratekeeper, + blobManager, resolvers, recoveryCount, recoveryState, diff --git a/fdbserver/Status.actor.cpp b/fdbserver/Status.actor.cpp index 7bedbfc757..b560a044f9 100644 --- a/fdbserver/Status.actor.cpp +++ b/fdbserver/Status.actor.cpp @@ -795,6 +795,10 @@ ACTOR static Future processStatusFetcher( roles.addRole("ratekeeper", db->get().ratekeeper.get()); } + if (db->get().blobManager.present()) { + roles.addRole("blob_manager", db->get().blobManager.get()); + } + for (auto& tLogSet : db->get().logSystemConfig.tLogs) { for (auto& it : tLogSet.logRouters) { if (it.present()) { diff --git a/fdbserver/WorkerInterface.actor.h b/fdbserver/WorkerInterface.actor.h index 5c4607b83f..393862ea86 100644 --- a/fdbserver/WorkerInterface.actor.h +++ b/fdbserver/WorkerInterface.actor.h @@ -30,6 +30,7 @@ #include "fdbserver/MasterInterface.h" #include "fdbserver/TLogInterface.h" #include "fdbserver/RatekeeperInterface.h" +#include "fdbserver/BlobManagerInterface.h" #include "fdbserver/ResolverInterface.h" #include "fdbclient/ClientBooleanParams.h" #include "fdbclient/StorageServerInterface.h" @@ -52,6 +53,7 @@ struct WorkerInterface { RequestStream grvProxy; RequestStream dataDistributor; RequestStream ratekeeper; + RequestStream blobManager; RequestStream resolver; RequestStream storage; RequestStream logRouter; @@ -105,6 +107,7 @@ struct WorkerInterface { grvProxy, dataDistributor, ratekeeper, + blobManager, resolver, storage, logRouter, @@ -375,6 +378,7 @@ struct RegisterWorkerRequest { Generation generation; Optional distributorInterf; Optional ratekeeperInterf; + Optional blobManagerInterf; Standalone> issues; std::vector incompatiblePeers; ReplyPromise reply; @@ -391,12 +395,13 @@ struct RegisterWorkerRequest { Generation generation, Optional ddInterf, Optional rkInterf, + Optional bmInterf, bool degraded, Version lastSeenKnobVersion, ConfigClassSet knobConfigClassSet) : wi(wi), initialClass(initialClass), processClass(processClass), priorityInfo(priorityInfo), - generation(generation), distributorInterf(ddInterf), ratekeeperInterf(rkInterf), degraded(degraded), - lastSeenKnobVersion(lastSeenKnobVersion), knobConfigClassSet(knobConfigClassSet) {} + generation(generation), distributorInterf(ddInterf), ratekeeperInterf(rkInterf), blobManagerInterf(bmInterf), + degraded(degraded), lastSeenKnobVersion(lastSeenKnobVersion), knobConfigClassSet(knobConfigClassSet) {} template void serialize(Ar& ar) { @@ -408,6 +413,7 @@ struct RegisterWorkerRequest { generation, distributorInterf, ratekeeperInterf, + blobManagerInterf, issues, incompatiblePeers, reply, @@ -613,6 +619,20 @@ struct InitializeRatekeeperRequest { } }; +struct InitializeBlobManagerRequest { + constexpr static FileIdentifier file_identifier = 2567474; + UID reqId; + int64_t epoch; + ReplyPromise reply; + + InitializeBlobManagerRequest() {} + explicit InitializeBlobManagerRequest(UID uid, int64_t epoch) : reqId(uid), epoch(epoch) {} + template + void serialize(Ar& ar) { + serializer(ar, reqId, epoch, reply); + } +}; + struct InitializeResolverRequest { constexpr static FileIdentifier file_identifier = 7413317; uint64_t recoveryCount; @@ -813,6 +833,7 @@ struct Role { static const Role LOG_ROUTER; static const Role DATA_DISTRIBUTOR; static const Role RATEKEEPER; + static const Role BLOB_MANAGER; static const Role STORAGE_CACHE; static const Role COORDINATOR; static const Role BACKUP; @@ -843,6 +864,8 @@ struct Role { return DATA_DISTRIBUTOR; case ProcessClass::Ratekeeper: return RATEKEEPER; + case ProcessClass::BlobManager: + return BLOB_MANAGER; case ProcessClass::StorageCache: return STORAGE_CACHE; case ProcessClass::Backup: @@ -958,6 +981,7 @@ ACTOR Future logRouter(TLogInterface interf, Reference const> db); ACTOR Future dataDistributor(DataDistributorInterface ddi, Reference const> db); ACTOR Future ratekeeper(RatekeeperInterface rki, Reference const> db); +ACTOR Future blobManager(BlobManagerInterface bmi, Reference const> db, int64_t epoch); ACTOR Future storageCacheServer(StorageServerInterface interf, uint16_t id, Reference const> db); diff --git a/fdbserver/worker.actor.cpp b/fdbserver/worker.actor.cpp index b175d4f3d0..0e6f2239b1 100644 --- a/fdbserver/worker.actor.cpp +++ b/fdbserver/worker.actor.cpp @@ -42,6 +42,7 @@ #include "fdbserver/IDiskQueue.h" #include "fdbclient/DatabaseContext.h" #include "fdbserver/DataDistributorInterface.h" +#include "fdbserver/BlobManagerInterface.h" #include "fdbserver/ServerDBInfo.h" #include "fdbserver/FDBExecHelper.actor.h" #include "fdbserver/CoordinationInterface.h" @@ -517,6 +518,7 @@ ACTOR Future registrationClient(Reference> const> ddInterf, Reference> const> rkInterf, + Reference> const> bmInterf, Reference const> degraded, Reference connFile, Reference> const> issues, @@ -539,6 +541,7 @@ ACTOR Future registrationClient(Referenceget(), rkInterf->get(), + bmInterf->get(), degraded->get(), localConfig->lastSeenVersion(), localConfig->configClassSet()); @@ -596,6 +599,7 @@ ACTOR Future registrationClient(ReferenceonChange())) { break; } when(wait(ddInterf->onChange())) { break; } when(wait(rkInterf->onChange())) { break; } + when(wait(bmInterf->onChange())) { break; } when(wait(degraded->onChange())) { break; } when(wait(FlowTransport::transport().onIncompatibleChanged())) { break; } when(wait(issues->onChange())) { break; } @@ -619,6 +623,10 @@ bool addressInDbAndPrimaryDc(const NetworkAddress& address, Reference workerServer(Reference connFile, state Reference>> ddInterf( new AsyncVar>()); state Reference>> rkInterf(new AsyncVar>()); + state Reference>> bmInterf(new AsyncVar>()); state Future handleErrors = workerHandleErrors(errors.getFuture()); // Needs to be stopped last state ActorCollection errorForwarders(false); state Future loggingTrigger = Void(); @@ -1493,6 +1502,7 @@ ACTOR Future workerServer(Reference connFile, initialClass, ddInterf, rkInterf, + bmInterf, degraded, connFile, issues, @@ -1531,7 +1541,9 @@ ACTOR Future workerServer(Reference connFile, .detail("RatekeeperID", localInfo.ratekeeper.present() ? localInfo.ratekeeper.get().id() : UID()) .detail("DataDistributorID", - localInfo.distributor.present() ? localInfo.distributor.get().id() : UID()); + localInfo.distributor.present() ? localInfo.distributor.get().id() : UID()) + .detail("BlobManagerID", + localInfo.blobManager.present() ? localInfo.blobManager.get().id() : UID()); dbInfo->set(localInfo); } errorForwarders.add( @@ -1665,6 +1677,32 @@ ACTOR Future workerServer(Reference connFile, TraceEvent("Ratekeeper_InitRequest", req.reqId).detail("RatekeeperId", recruited.id()); req.reply.send(recruited); } + when(InitializeBlobManagerRequest req = waitNext(interf.blobManager.getFuture())) { + BlobManagerInterface recruited(locality, req.reqId); + recruited.initEndpoints(); + + if (bmInterf->get().present()) { + recruited = bmInterf->get().get(); + TEST(true); // Recruited while already a blob manager. + // What if this one is the new one that caused the old one to error out, because it up'ed the epoch. + // But now the new one can't register because bmInterf is already set. + // Should be okay based on logic of haltRegisteringorPreviousSingleton bc references to bmInterf are + // passed around. + } else { + startRole(Role::BLOB_MANAGER, recruited.id(), interf.id()); + DUMPTOKEN(recruited.waitFailure); + + Future blobManagerProcess = blobManager(recruited, dbInfo, req.epoch); + errorForwarders.add(forwardError( + errors, + Role::BLOB_MANAGER, + recruited.id(), + setWhenDoneOrError(blobManagerProcess, bmInterf, Optional()))); + bmInterf->set(Optional(recruited)); + } + TraceEvent("BlobManagerReceived", req.reqId).detail("BlobManagerId", recruited.id()); + req.reply.send(recruited); + } when(InitializeBackupRequest req = waitNext(interf.backup.getFuture())) { if (!backupWorkerCache.exists(req.reqId)) { LocalLineage _; @@ -2486,6 +2524,7 @@ const Role Role::TESTER("Tester", "TS"); const Role Role::LOG_ROUTER("LogRouter", "LR"); const Role Role::DATA_DISTRIBUTOR("DataDistributor", "DD"); const Role Role::RATEKEEPER("Ratekeeper", "RK"); +const Role Role::BLOB_MANAGER("BlobManager", "BM"); const Role Role::STORAGE_CACHE("StorageCache", "SC"); const Role Role::COORDINATOR("Coordinator", "CD"); const Role Role::BACKUP("Backup", "BK"); diff --git a/fdbserver/workloads/ConsistencyCheck.actor.cpp b/fdbserver/workloads/ConsistencyCheck.actor.cpp index c79cde3200..b769797337 100644 --- a/fdbserver/workloads/ConsistencyCheck.actor.cpp +++ b/fdbserver/workloads/ConsistencyCheck.actor.cpp @@ -2257,6 +2257,22 @@ struct ConsistencyCheckWorkload : TestWorkload { return false; } + // Check BlobManager + if (db.blobManager.present() && + (!nonExcludedWorkerProcessMap.count(db.blobManager.get().address()) || + nonExcludedWorkerProcessMap[db.blobManager.get().address()].processClass.machineClassFitness( + ProcessClass::BlobManager) > fitnessLowerBound)) { + TraceEvent("ConsistencyCheck_BlobManagerNotBest") + .detail("BestBlobManagerFitness", fitnessLowerBound) + .detail( + "ExistingBlobManagerFitness", + nonExcludedWorkerProcessMap.count(db.blobManager.get().address()) + ? nonExcludedWorkerProcessMap[db.blobManager.get().address()].processClass.machineClassFitness( + ProcessClass::BlobManager) + : -1); + return false; + } + // TODO: Check Tlog return true;