Add blob manager as a singleton.

This commit is contained in:
Suraj Gupta 2021-09-15 11:35:58 -04:00
parent 95c004f80b
commit 5fa6c687d6
16 changed files with 373 additions and 71 deletions

View File

@ -94,6 +94,7 @@ const KeyRef JSONSchemas::statusSchema = LiteralStringRef(R"statusSchema(
"cluster_controller",
"data_distributor",
"ratekeeper",
"blob_manager",
"storage_cache",
"router",
"coordinator"
@ -492,6 +493,7 @@ const KeyRef JSONSchemas::statusSchema = LiteralStringRef(R"statusSchema(
"unreachable_master_worker",
"unreachable_dataDistributor_worker",
"unreachable_ratekeeper_worker",
"unreachable_blobManager_worker",
"unreadable_configuration",
"full_replication_timeout",
"client_issues",

View File

@ -461,12 +461,14 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( ATTEMPT_RECRUITMENT_DELAY, 0.035 );
init( WAIT_FOR_DISTRIBUTOR_JOIN_DELAY, 1.0 );
init( WAIT_FOR_RATEKEEPER_JOIN_DELAY, 1.0 );
init( WAIT_FOR_BLOB_MANAGER_JOIN_DELAY, 1.0 );
init( WORKER_FAILURE_TIME, 1.0 ); if( randomize && BUGGIFY ) WORKER_FAILURE_TIME = 10.0;
init( CHECK_OUTSTANDING_INTERVAL, 0.5 ); if( randomize && BUGGIFY ) CHECK_OUTSTANDING_INTERVAL = 0.001;
init( VERSION_LAG_METRIC_INTERVAL, 0.5 ); if( randomize && BUGGIFY ) VERSION_LAG_METRIC_INTERVAL = 10.0;
init( MAX_VERSION_DIFFERENCE, 20 * VERSIONS_PER_SECOND );
init( FORCE_RECOVERY_CHECK_DELAY, 5.0 );
init( RATEKEEPER_FAILURE_TIME, 1.0 );
init( BLOB_MANAGER_FAILURE_TIME, 1.0 );
init( REPLACE_INTERFACE_DELAY, 60.0 );
init( REPLACE_INTERFACE_CHECK_DELAY, 5.0 );
init( COORDINATOR_REGISTER_INTERVAL, 5.0 );

View File

@ -384,6 +384,7 @@ public:
double ATTEMPT_RECRUITMENT_DELAY;
double WAIT_FOR_DISTRIBUTOR_JOIN_DELAY;
double WAIT_FOR_RATEKEEPER_JOIN_DELAY;
double WAIT_FOR_BLOB_MANAGER_JOIN_DELAY;
double WORKER_FAILURE_TIME;
double CHECK_OUTSTANDING_INTERVAL;
double INCOMPATIBLE_PEERS_LOGGING_INTERVAL;
@ -391,6 +392,7 @@ public:
int64_t MAX_VERSION_DIFFERENCE;
double FORCE_RECOVERY_CHECK_DELAY;
double RATEKEEPER_FAILURE_TIME;
double BLOB_MANAGER_FAILURE_TIME;
double REPLACE_INTERFACE_DELAY;
double REPLACE_INTERFACE_CHECK_DELAY;
double COORDINATOR_REGISTER_INTERVAL;

View File

@ -30,7 +30,6 @@
#include "fdbrpc/Stats.h"
#include "fdbrpc/TimedRequest.h"
#include "fdbrpc/TSSComparison.h"
#include "fdbclient/TagThrottle.actor.h"
#include "fdbclient/BlobGranuleCommon.h"
#include "fdbclient/CommitTransaction.h"
#include "fdbclient/TagThrottle.actor.h"

View File

@ -229,6 +229,23 @@ ProcessClass::Fitness ProcessClass::machineClassFitness(ClusterRole role) const
default:
return ProcessClass::WorstFit;
}
case ProcessClass::BlobManager:
switch (_class) {
case ProcessClass::BlobManagerClass:
return ProcessClass::BestFit;
case ProcessClass::StatelessClass:
return ProcessClass::GoodFit;
case ProcessClass::UnsetClass:
return ProcessClass::UnsetFit;
case ProcessClass::MasterClass:
return ProcessClass::OkayFit;
case ProcessClass::CoordinatorClass:
case ProcessClass::TesterClass:
case ProcessClass::StorageCacheClass:
return ProcessClass::NeverAssign;
default:
return ProcessClass::WorstFit;
}
case ProcessClass::StorageCache:
switch (_class) {
case ProcessClass::StorageCacheClass:

View File

@ -46,6 +46,7 @@ struct ProcessClass {
StorageCacheClass,
BackupClass,
GrvProxyClass,
BlobManagerClass,
InvalidClass = -1
};
@ -69,6 +70,7 @@ struct ProcessClass {
ClusterController,
DataDistributor,
Ratekeeper,
BlobManager,
StorageCache,
Backup,
Worker, // used for actor lineage tracking
@ -104,6 +106,7 @@ public:
else if (s=="data_distributor") _class = DataDistributorClass;
else if (s=="coordinator") _class = CoordinatorClass;
else if (s=="ratekeeper") _class = RatekeeperClass;
else if (s=="blob_manager") _class = BlobManagerClass;
else if (s=="storage_cache") _class = StorageCacheClass;
else if (s=="backup") _class = BackupClass;
else _class = InvalidClass;
@ -131,6 +134,7 @@ public:
else if (classStr=="data_distributor") _class = DataDistributorClass;
else if (classStr=="coordinator") _class = CoordinatorClass;
else if (classStr=="ratekeeper") _class = RatekeeperClass;
else if (classStr=="blob_manager") _class = BlobManagerClass;
else if (classStr=="storage_cache") _class = StorageCacheClass;
else if (classStr=="backup") _class = BackupClass;
else _class = InvalidClass;
@ -168,6 +172,7 @@ public:
case DataDistributorClass: return "data_distributor";
case CoordinatorClass: return "coordinator";
case RatekeeperClass: return "ratekeeper";
case BlobManagerClass: return "blob_manager";
case StorageCacheClass: return "storage_cache";
case BackupClass: return "backup";
default: return "invalid";

View File

@ -154,6 +154,8 @@ public:
return false;
case ProcessClass::RatekeeperClass:
return false;
case ProcessClass::BlobManagerClass:
return false;
case ProcessClass::StorageCacheClass:
return false;
case ProcessClass::BackupClass:

View File

@ -197,6 +197,7 @@ struct BlobWorkerStats {
struct BlobManagerData {
UID id;
Database db;
PromiseStream<Future<Void>> addActor;
std::unordered_map<UID, BlobWorkerInterface> workersById;
std::unordered_map<UID, BlobWorkerStats> workerStats; // mapping between workerID -> workerStats
@ -481,35 +482,6 @@ ACTOR Future<Void> checkManagerLock(Reference<ReadYourWritesTransaction> tr, Blo
return Void();
}
// TODO eventually CC should probably do this and pass it as part of recruitment?
//
// Claims the blob manager lock: reads the value stored at blobManagerEpochKey, writes back the next epoch
// (stored epoch + 1, or 1 when the key is absent), commits, and returns the epoch that was written.
// Any error is handed to the transaction's onError() and the attempt is repeated.
ACTOR Future<int64_t> acquireManagerLock(BlobManagerData* bmData) {
    state Reference<ReadYourWritesTransaction> txn = makeReference<ReadYourWritesTransaction>(bmData->db);
    loop {
        // the options are applied at the top of every attempt
        txn->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
        txn->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
        try {
            Optional<Value> storedEpoch = wait(txn->get(blobManagerEpochKey));
            // start at 1 when no epoch has been stored yet
            state int64_t claimedEpoch =
                storedEpoch.present() ? decodeBlobManagerEpochValue(storedEpoch.get()) + 1 : 1;
            txn->set(blobManagerEpochKey, blobManagerEpochValueFor(claimedEpoch));
            wait(txn->commit());
            return claimedEpoch;
        } catch (Error& err) {
            if (BM_DEBUG) {
                printf("Acquiring blob manager lock got error %s\n", err.name());
            }
            wait(txn->onError(err));
        }
    }
}
// FIXME: this does all logic in one transaction. Adding a giant range to an existing database to hybridize would
// require doing a ton of storage metrics calls, which we should likely split up across multiple transactions.
ACTOR Future<Void> monitorClientRanges(BlobManagerData* bmData) {
@ -853,9 +825,9 @@ ACTOR Future<Void> rangeMover(BlobManagerData* bmData) {
}
}
// TODO MOVE ELSEWHERE
// TODO replace locality with full BlobManagerInterface eventually
ACTOR Future<Void> blobManager(LocalityData locality, Reference<AsyncVar<ServerDBInfo> const> dbInfo) {
ACTOR Future<Void> blobManager(BlobManagerInterface bmInterf,
Reference<AsyncVar<ServerDBInfo> const> dbInfo,
int64_t epoch) {
state BlobManagerData self(deterministicRandom()->randomUniqueID(),
openDBOnServer(dbInfo, TaskPriority::DefaultEndpoint, LockAware::True));
@ -873,15 +845,41 @@ ACTOR Future<Void> blobManager(LocalityData locality, Reference<AsyncVar<ServerD
printf("Blob manager taking lock\n");
}
int64_t _epoch = wait(acquireManagerLock(&self));
self.epoch = _epoch;
self.epoch = epoch;
// make sure the epoch hasn't gotten stale
state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(self.db);
loop {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
try {
wait(checkManagerLock(tr, &self));
wait(tr->commit());
break;
} catch (Error& e) {
if (e.code() == error_code_granule_assignment_conflict) {
if (BM_DEBUG) {
printf("Blob manager dying...\n");
}
throw;
}
// if we get here, most likely error is a read-write conflict
if (BM_DEBUG) {
printf("Blob manager lock check got unexpected error %s\n", e.name());
}
wait(tr->onError(e));
}
}
if (BM_DEBUG) {
printf("Blob manager acquired lock at epoch %lld\n", _epoch);
printf("Blob manager acquired lock at epoch %lld\n", epoch);
}
int numWorkers = 2;
for (int i = 0; i < numWorkers; i++) {
state BlobWorkerInterface bwInterf(locality, deterministicRandom()->randomUniqueID());
state BlobWorkerInterface bwInterf(bmInterf.locality, deterministicRandom()->randomUniqueID());
bwInterf.initEndpoints();
self.workersById.insert({ bwInterf.id(), bwInterf });
self.workerStats.insert({ bwInterf.id(), BlobWorkerStats() });
@ -918,8 +916,8 @@ ACTOR Future<Void> blobManager(LocalityData locality, Reference<AsyncVar<ServerD
// DB has [B - D). It should show up coalesced in knownBlobRanges, and [C - D) should be removed.
// DB has [A - D). It should show up coalesced in knownBlobRanges, and [A - B) should be removed.
// DB has [A - B) and [C - D). They should show up in knownBlobRanges, and [B - C) should be in removed.
// DB has [B - C). It should show up in knownBlobRanges, [B - C) should be in added, and [A - B) and [C - D) should be
// in removed.
// DB has [B - C). It should show up in knownBlobRanges, [B - C) should be in added, and [A - B) and [C - D) should
// be in removed.
TEST_CASE("/blobmanager/updateranges") {
KeyRangeMap<bool> knownBlobRanges(false, normalKeys.end);
Arena ar;

View File

@ -22,17 +22,44 @@
#define FDBSERVER_BLOBMANAGERINTERFACE_H
#pragma once
#include "fdbserver/ServerDBInfo.h"
#include "fdbclient/FDBTypes.h"
#include "fdbrpc/Locality.h"
#include "fdbrpc/fdbrpc.h"
// TODO everything below here should go away once blob manager isn't embedded in another role
struct BlobManagerInterface {
constexpr static FileIdentifier file_identifier = 369169;
RequestStream<ReplyPromise<Void>> waitFailure;
RequestStream<struct HaltBlobManagerRequest> haltBlobManager;
struct LocalityData locality;
UID myId;
// TODO add actual interface, remove functionality stuff hack for ratekeeper
void updateClientBlobRanges(KeyRangeMap<bool>* knownBlobRanges,
RangeResult dbBlobRanges,
Arena a,
VectorRef<KeyRangeRef>* rangesToAdd,
VectorRef<KeyRangeRef>* rangesToRemove);
BlobManagerInterface() {}
explicit BlobManagerInterface(const struct LocalityData& l, UID id) : locality(l), myId(id) {}
Future<Void> blobManager(const LocalityData& locality, const Reference<AsyncVar<ServerDBInfo> const>& dbInfo);
void initEndpoints() {}
UID id() const { return myId; }
NetworkAddress address() const { return waitFailure.getEndpoint().getPrimaryAddress(); }
bool operator==(const BlobManagerInterface& r) const { return id() == r.id(); }
bool operator!=(const BlobManagerInterface& r) const { return !(*this == r); }
#endif
template <class Archive>
void serialize(Archive& ar) {
serializer(ar, waitFailure, haltBlobManager, locality, myId);
}
};
struct HaltBlobManagerRequest {
constexpr static FileIdentifier file_identifier = 4149140;
UID requesterID;
ReplyPromise<Void> reply;
HaltBlobManagerRequest() {}
explicit HaltBlobManagerRequest(UID uid) : requesterID(uid) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, requesterID, reply);
}
};
#endif

View File

@ -38,6 +38,7 @@
#include "fdbserver/LogSystemConfig.h"
#include "fdbserver/WaitFailure.h"
#include "fdbserver/RatekeeperInterface.h"
#include "fdbserver/BlobManagerInterface.h"
#include "fdbserver/ServerDBInfo.h"
#include "fdbserver/Status.h"
#include "fdbserver/LatencyBandConfig.h"
@ -69,6 +70,7 @@ struct WorkerInfo : NonCopyable {
WorkerDetails details;
Future<Void> haltRatekeeper;
Future<Void> haltDistributor;
Future<Void> haltBlobManager;
Standalone<VectorRef<StringRef>> issues;
WorkerInfo()
@ -89,7 +91,8 @@ struct WorkerInfo : NonCopyable {
WorkerInfo(WorkerInfo&& r) noexcept
: watcher(std::move(r.watcher)), reply(std::move(r.reply)), gen(r.gen), reboots(r.reboots),
initialClass(r.initialClass), priorityInfo(r.priorityInfo), details(std::move(r.details)),
haltRatekeeper(r.haltRatekeeper), haltDistributor(r.haltDistributor), issues(r.issues) {}
haltRatekeeper(r.haltRatekeeper), haltDistributor(r.haltDistributor), haltBlobManager(r.haltBlobManager),
issues(r.issues) {}
void operator=(WorkerInfo&& r) noexcept {
watcher = std::move(r.watcher);
reply = std::move(r.reply);
@ -100,6 +103,7 @@ struct WorkerInfo : NonCopyable {
details = std::move(r.details);
haltRatekeeper = r.haltRatekeeper;
haltDistributor = r.haltDistributor;
haltBlobManager = r.haltBlobManager;
issues = r.issues;
}
};
@ -160,6 +164,14 @@ public:
serverInfo->set(newInfo);
}
// Publishes `interf` as the blob manager: takes a copy of the current server info, installs the interface,
// gives the copy a new random id and the next infoGeneration, and stores it back into serverInfo.
void setBlobManager(const BlobManagerInterface& interf) {
    auto updatedInfo = serverInfo->get();
    updatedInfo.blobManager = interf;
    updatedInfo.id = deterministicRandom()->randomUniqueID();
    updatedInfo.infoGeneration = ++dbInfoCount;
    serverInfo->set(updatedInfo);
}
void clearInterf(ProcessClass::ClassType t) {
auto newInfo = serverInfo->get();
newInfo.id = deterministicRandom()->randomUniqueID();
@ -168,6 +180,8 @@ public:
newInfo.distributor = Optional<DataDistributorInterface>();
} else if (t == ProcessClass::RatekeeperClass) {
newInfo.ratekeeper = Optional<RatekeeperInterface>();
} else if (t == ProcessClass::BlobManagerClass) {
newInfo.blobManager = Optional<BlobManagerInterface>();
}
serverInfo->set(newInfo);
}
@ -242,7 +256,9 @@ public:
return (db.serverInfo->get().distributor.present() &&
db.serverInfo->get().distributor.get().locality.processId() == processId) ||
(db.serverInfo->get().ratekeeper.present() &&
db.serverInfo->get().ratekeeper.get().locality.processId() == processId);
db.serverInfo->get().ratekeeper.get().locality.processId() == processId) ||
(db.serverInfo->get().blobManager.present() &&
db.serverInfo->get().blobManager.get().locality.processId() == processId);
}
WorkerDetails getStorageWorker(RecruitStorageRequest const& req) {
@ -2755,7 +2771,8 @@ public:
bool onMasterIsBetter(const WorkerDetails& worker, ProcessClass::ClusterRole role) const {
ASSERT(masterProcessId.present());
const auto& pid = worker.interf.locality.processId();
if ((role != ProcessClass::DataDistributor && role != ProcessClass::Ratekeeper) ||
if ((role != ProcessClass::DataDistributor && role != ProcessClass::Ratekeeper &&
role != ProcessClass::BlobManager) ||
pid == masterProcessId.get()) {
return false;
}
@ -3042,6 +3059,8 @@ public:
Optional<UID> recruitingDistributorID;
AsyncVar<bool> recruitRatekeeper;
Optional<UID> recruitingRatekeeperID;
AsyncVar<bool> recruitBlobManager;
Optional<UID> recruitingBlobManagerID;
// Stores the health information from a particular worker's perspective.
struct WorkerHealth {
@ -3077,7 +3096,7 @@ public:
clusterControllerDcId(locality.dcId()), id(ccInterface.id()), ac(false), outstandingRequestChecker(Void()),
outstandingRemoteRequestChecker(Void()), startTime(now()), goodRecruitmentTime(Never()),
goodRemoteRecruitmentTime(Never()), datacenterVersionDifference(0), versionDifferenceUpdated(false),
recruitDistributor(false), recruitRatekeeper(false),
recruitDistributor(false), recruitRatekeeper(false), recruitBlobManager(false),
clusterControllerMetrics("ClusterController", id.toString()),
openDatabaseRequests("OpenDatabaseRequests", clusterControllerMetrics),
registerWorkerRequests("RegisterWorkerRequests", clusterControllerMetrics),
@ -3158,6 +3177,27 @@ struct DataDistributorSingleton : Singleton<DataDistributorInterface> {
void recruit(ClusterControllerData* cc) const { cc->recruitDistributor.set(true); }
};
// Singleton wrapper for the blob manager role. It can publish the wrapped interface into the cluster
// controller's DBInfo, ask the wrapped blob manager to halt, and flag that a blob manager should be recruited.
struct BlobManagerSingleton : Singleton<BlobManagerInterface> {
    BlobManagerSingleton(const Optional<BlobManagerInterface>& interface) : Singleton(interface) {}

    Role getRole() const { return Role::BLOB_MANAGER; }
    ProcessClass::ClusterRole getClusterRole() const { return ProcessClass::BlobManager; }

    // Stores the wrapped interface in cc->db; does nothing when no interface is held.
    void setInterfaceToDbInfo(ClusterControllerData* cc) const {
        if (!interface.present()) {
            return;
        }
        cc->db.setBlobManager(interface.get());
    }

    // Sends a HaltBlobManagerRequest (tagged with cc->id) to the wrapped blob manager and keeps the reply
    // future in the id_worker entry for `pid`; does nothing when no interface is held.
    void halt(ClusterControllerData* cc, Optional<Standalone<StringRef>> pid) const {
        if (!interface.present()) {
            return;
        }
        // The request is issued before the worker entry is looked up, as in a single assignment expression.
        Future<Void> haltReply =
            brokenPromiseToNever(interface.get().haltBlobManager.getReply(HaltBlobManagerRequest(cc->id)));
        cc->id_worker[pid].haltBlobManager = haltReply;
    }

    // Raises the cluster controller's recruitBlobManager flag.
    void recruit(ClusterControllerData* cc) const { cc->recruitBlobManager.set(true); }
};
ACTOR Future<Void> clusterWatchDatabase(ClusterControllerData* cluster, ClusterControllerData::DBInfo* db) {
state MasterInterface iMaster;
@ -3216,6 +3256,7 @@ ACTOR Future<Void> clusterWatchDatabase(ClusterControllerData* cluster, ClusterC
dbInfo.clusterInterface = db->serverInfo->get().clusterInterface;
dbInfo.distributor = db->serverInfo->get().distributor;
dbInfo.ratekeeper = db->serverInfo->get().ratekeeper;
dbInfo.blobManager = db->serverInfo->get().blobManager;
dbInfo.latencyBandConfig = db->serverInfo->get().latencyBandConfig;
TraceEvent("CCWDB", cluster->id)
@ -3481,14 +3522,17 @@ void checkBetterSingletons(ClusterControllerData* self) {
// Try to find a new process for each singleton.
WorkerDetails newRKWorker = findNewProcessForSingleton(self, ProcessClass::Ratekeeper, id_used);
WorkerDetails newDDWorker = findNewProcessForSingleton(self, ProcessClass::DataDistributor, id_used);
WorkerDetails newBMWorker = findNewProcessForSingleton(self, ProcessClass::BlobManager, id_used);
// Find best possible fitnesses for each singleton.
auto bestFitnessForRK = findBestFitnessForSingleton(self, newRKWorker, ProcessClass::Ratekeeper);
auto bestFitnessForDD = findBestFitnessForSingleton(self, newDDWorker, ProcessClass::DataDistributor);
auto bestFitnessForBM = findBestFitnessForSingleton(self, newBMWorker, ProcessClass::BlobManager);
auto& db = self->db.serverInfo->get();
auto rkSingleton = RatekeeperSingleton(db.ratekeeper);
auto ddSingleton = DataDistributorSingleton(db.distributor);
auto bmSingleton = BlobManagerSingleton(db.blobManager);
// Check if the singletons are healthy.
// side effect: try to rerecruit the singletons to more optimal processes
@ -3498,31 +3542,40 @@ void checkBetterSingletons(ClusterControllerData* self) {
bool ddHealthy = isHealthySingleton<DataDistributorInterface>(
self, newDDWorker, ddSingleton, bestFitnessForDD, self->recruitingDistributorID);
bool bmHealthy = isHealthySingleton<BlobManagerInterface>(
self, newBMWorker, bmSingleton, bestFitnessForBM, self->recruitingBlobManagerID);
// if any of the singletons are unhealthy (rerecruited or not stable), then do not
// consider any further re-recruitments
if (!(rkHealthy && ddHealthy)) {
if (!(rkHealthy && ddHealthy && bmHealthy)) {
return;
}
// if we reach here, we know that the singletons are healthy so let's
// check if we can colocate the singletons in a more optimal way
// TODO: verify that we don't need to get the pid from the worker like we were doing before
Optional<Standalone<StringRef>> currRKProcessId = rkSingleton.interface.get().locality.processId();
Optional<Standalone<StringRef>> currDDProcessId = ddSingleton.interface.get().locality.processId();
Optional<Standalone<StringRef>> currBMProcessId = bmSingleton.interface.get().locality.processId();
Optional<Standalone<StringRef>> newRKProcessId = newRKWorker.interf.locality.processId();
Optional<Standalone<StringRef>> newDDProcessId = newDDWorker.interf.locality.processId();
Optional<Standalone<StringRef>> newBMProcessId = newBMWorker.interf.locality.processId();
auto currColocMap = getColocCounts({ currRKProcessId, currDDProcessId });
auto newColocMap = getColocCounts({ newRKProcessId, newDDProcessId });
auto currColocMap = getColocCounts({ currRKProcessId, currDDProcessId, currBMProcessId });
auto newColocMap = getColocCounts({ newRKProcessId, newDDProcessId, newBMProcessId });
// if the new coloc counts are not worse (i.e. each singleton's coloc count has not increased)
// if the new coloc counts are collectively better (i.e. each singleton's coloc count has not increased)
if (newColocMap[newRKProcessId] <= currColocMap[currRKProcessId] &&
newColocMap[newDDProcessId] <= currColocMap[currDDProcessId]) {
newColocMap[newDDProcessId] <= currColocMap[currDDProcessId] &&
newColocMap[newBMProcessId] <= currColocMap[currBMProcessId]) {
// rerecruit the singleton for which we have found a better process, if any
if (newColocMap[newRKProcessId] < currColocMap[currRKProcessId]) {
rkSingleton.recruit(self);
} else if (newColocMap[newDDProcessId] < currColocMap[currDDProcessId]) {
ddSingleton.recruit(self);
} else if (newColocMap[newBMProcessId] < currColocMap[currBMProcessId]) {
bmSingleton.recruit(self);
}
}
}
@ -4050,6 +4103,13 @@ void registerWorker(RegisterWorkerRequest req, ClusterControllerData* self, Conf
self, w, currSingleton, registeringSingleton, self->recruitingRatekeeperID);
}
if (req.blobManagerInterf.present()) {
auto currSingleton = BlobManagerSingleton(self->db.serverInfo->get().blobManager);
auto registeringSingleton = BlobManagerSingleton(req.blobManagerInterf);
haltRegisteringOrCurrentSingleton<BlobManagerInterface>(
self, w, currSingleton, registeringSingleton, self->recruitingBlobManagerID);
}
// Notify the worker to register again with new process class/exclusive property
if (!req.reply.isSet() && newPriorityInfo != req.priorityInfo) {
req.reply.send(RegisterWorkerReply(newProcessClass, newPriorityInfo));
@ -4861,6 +4921,117 @@ ACTOR Future<Void> monitorRatekeeper(ClusterControllerData* self) {
}
}
// Acquires the BM lock by getting the next epoch no.
//
// Reads the value stored at blobManagerEpochKey, writes back the next epoch (stored epoch + 1, or 1 when
// the key is absent), commits, and returns the epoch that was written. Errors are traced and then handed
// to the transaction's onError(), after which the attempt is repeated.
ACTOR Future<int64_t> getNextBMEpoch(ClusterControllerData* self) {
    state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(self->cx);
    loop {
        // the options are applied at the top of every attempt
        tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
        tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
        try {
            Optional<Value> oldEpoch = wait(tr->get(blobManagerEpochKey));
            state int64_t newEpoch = oldEpoch.present() ? decodeBlobManagerEpochValue(oldEpoch.get()) + 1 : 1;
            tr->set(blobManagerEpochKey, blobManagerEpochValueFor(newEpoch));
            wait(tr->commit());
            return newEpoch;
        } catch (Error& e) {
            // Report through the trace log (as the other recruitment errors in this file do) instead of
            // printing unconditionally to stdout on every retry.
            TraceEvent("CCGetNextBMEpochError", self->id).error(e);
            wait(tr->onError(e));
        }
    }
}
// Recruits a blob manager on a worker selected by the cluster controller and publishes the resulting
// interface through self->db. Returns once a blob manager has been recruited, or once one is found to have
// registered while this actor was waiting. A no_more_servers error is retried after a delay; any other
// error is rethrown.
ACTOR Future<Void> startBlobManager(ClusterControllerData* self) {
wait(delay(0.0)); // If master fails at the same time, give it a chance to clear master PID.
TraceEvent("CCStartBlobManager", self->id).log();
loop {
try {
// Remember whether a blob manager was already published before we start waiting.
state bool noBlobManager = !self->db.serverInfo->get().blobManager.present();
// Wait until the master process id is known, matches the master in serverInfo, and the recovery
// state has reached ACCEPTING_COMMITS. Re-check on every serverInfo change or after the knob delay.
while (!self->masterProcessId.present() ||
self->masterProcessId != self->db.serverInfo->get().master.locality.processId() ||
self->db.serverInfo->get().recoveryState < RecoveryState::ACCEPTING_COMMITS) {
wait(self->db.serverInfo->onChange() || delay(SERVER_KNOBS->WAIT_FOR_GOOD_RECRUITMENT_DELAY));
}
if (noBlobManager && self->db.serverInfo->get().blobManager.present()) {
// Existing blob manager registers while waiting, so skip.
return Void();
}
// Pick a candidate worker for the BlobManager role in the cluster controller's datacenter.
state std::map<Optional<Standalone<StringRef>>, int> id_used = self->getUsedIds();
state WorkerFitnessInfo bmWorker = self->getWorkerForRoleInDatacenter(self->clusterControllerDcId,
ProcessClass::BlobManager,
ProcessClass::NeverAssign,
self->db.config,
id_used);
// The epoch is obtained before the recruitment request is built and is sent along with it.
int64_t nextEpoch = wait(getNextBMEpoch(self));
InitializeBlobManagerRequest req(deterministicRandom()->randomUniqueID(), nextEpoch);
state WorkerDetails worker = bmWorker.worker;
// Use the master's worker instead when onMasterIsBetter() says so.
if (self->onMasterIsBetter(worker, ProcessClass::BlobManager)) {
worker = self->id_worker[self->masterProcessId.get()].details;
}
// Until a reply arrives, the recruiting id is the request id.
self->recruitingBlobManagerID = req.reqId;
TraceEvent("CCRecruitBlobManager", self->id)
.detail("Addr", worker.interf.address())
.detail("BMID", req.reqId);
ErrorOr<BlobManagerInterface> interf = wait(worker.interf.blobManager.getReplyUnlessFailedFor(
req, SERVER_KNOBS->WAIT_FOR_BLOB_MANAGER_JOIN_DELAY, 0));
// When the reply holds no interface, fall through to the retry delay at the bottom of the loop.
if (interf.present()) {
self->recruitBlobManager.set(false);
self->recruitingBlobManagerID = interf.get().id();
const auto& blobManager = self->db.serverInfo->get().blobManager;
TraceEvent("CCBlobManagerRecruited", self->id)
.detail("Addr", worker.interf.address())
.detail("BMID", interf.get().id());
// Halt the previously published blob manager if it is a different instance and its process is
// still known in id_worker.
if (blobManager.present() && blobManager.get().id() != interf.get().id() &&
self->id_worker.count(blobManager.get().locality.processId())) {
TraceEvent("CCHaltBlobManagerAfterRecruit", self->id)
.detail("BMID", blobManager.get().id())
.detail("DcID", printable(self->clusterControllerDcId));
BlobManagerSingleton(blobManager).halt(self, blobManager.get().locality.processId());
}
// Publish the new interface unless the same instance is already published.
if (!blobManager.present() || blobManager.get().id() != interf.get().id()) {
self->db.setBlobManager(interf.get());
}
checkOutstandingRequests(self);
return Void();
}
} catch (Error& e) {
TraceEvent("CCBlobManagerRecruitError", self->id).error(e);
// Only no_more_servers is retried; everything else propagates to the caller.
if (e.code() != error_code_no_more_servers) {
throw;
}
}
// Back off before the next recruitment attempt.
wait(lowPriorityDelay(SERVER_KNOBS->ATTEMPT_RECRUITMENT_DELAY));
}
}
// Keeps a blob manager running. While one is published and no re-recruitment has been requested, it watches
// the blob manager for failure; otherwise it runs startBlobManager(). The loop never exits on its own.
ACTOR Future<Void> monitorBlobManager(ClusterControllerData* self) {
// Do nothing until the recovery state has reached ACCEPTING_COMMITS.
while (self->db.serverInfo->get().recoveryState < RecoveryState::ACCEPTING_COMMITS) {
wait(self->db.serverInfo->onChange());
}
loop {
if (self->db.serverInfo->get().blobManager.present() && !self->recruitBlobManager.get()) {
choose {
// The failure monitor completed: trace it and clear the published blob manager interface.
when(wait(waitFailureClient(self->db.serverInfo->get().blobManager.get().waitFailure,
SERVER_KNOBS->BLOB_MANAGER_FAILURE_TIME))) {
TraceEvent("CCBlobManagerDied", self->id)
.detail("BMID", self->db.serverInfo->get().blobManager.get().id());
self->db.clearInterf(ProcessClass::BlobManagerClass);
}
// The recruit flag changed: go around the loop and re-evaluate the condition above.
when(wait(self->recruitBlobManager.onChange())) {}
}
} else {
// No blob manager is published, or a re-recruitment was requested.
wait(startBlobManager(self));
}
}
}
ACTOR Future<Void> dbInfoUpdater(ClusterControllerData* self) {
state Future<Void> dbInfoChange = self->db.serverInfo->onChange();
state Future<Void> updateDBInfo = self->updateDBInfo.onTrigger();
@ -4989,6 +5160,7 @@ ACTOR Future<Void> clusterControllerCore(ClusterControllerFullInterface interf,
self.addActor.send(handleForcedRecoveries(&self, interf));
self.addActor.send(monitorDataDistributor(&self));
self.addActor.send(monitorRatekeeper(&self));
self.addActor.send(monitorBlobManager(&self));
// self.addActor.send(monitorTSSMapping(&self));
self.addActor.send(dbInfoUpdater(&self));
self.addActor.send(traceCounters("ClusterControllerMetrics",

View File

@ -25,8 +25,6 @@
#include "fdbrpc/simulator.h"
#include "fdbclient/DatabaseContext.h"
#include "fdbclient/ReadYourWrites.h"
#include "fdbclient/BlobWorkerInterface.h" // TODO REMOVE
#include "fdbserver/BlobManagerInterface.h" // TODO REMOVE
#include "fdbclient/TagThrottle.actor.h"
#include "fdbserver/Knobs.h"
#include "fdbserver/DataDistribution.actor.h"
@ -1440,14 +1438,6 @@ ACTOR Future<Void> ratekeeper(RatekeeperInterface rkInterf, Reference<AsyncVar<S
self.addActor.send(
recurring([selfPtr]() { refreshStorageServerCommitCost(selfPtr); }, SERVER_KNOBS->TAG_MEASUREMENT_INTERVAL));
// TODO MOVE eventually
if (!g_network->isSimulated() && SERVER_KNOBS->BG_URL == "") {
printf("Not starting blob manager poc, no url configured\n");
} else {
printf("Starting blob manager with url=%s\n", SERVER_KNOBS->BG_URL.c_str());
self.addActor.send(blobManager(rkInterf.locality, dbInfo));
}
TraceEvent("RkTLogQueueSizeParameters", rkInterf.id())
.detail("Target", SERVER_KNOBS->TARGET_BYTES_PER_TLOG)
.detail("Spring", SERVER_KNOBS->SPRING_BYTES_TLOG)

View File

@ -30,6 +30,7 @@
#include "fdbserver/MasterInterface.h"
#include "fdbserver/LogSystemConfig.h"
#include "fdbserver/RatekeeperInterface.h"
#include "fdbserver/BlobManagerInterface.h"
#include "fdbserver/RecoveryState.h"
#include "fdbserver/LatencyBandConfig.h"
#include "fdbserver/WorkerInterface.actor.h"
@ -47,6 +48,7 @@ struct ServerDBInfo {
Optional<DataDistributorInterface> distributor; // The best guess of current data distributor.
MasterInterface master; // The best guess as to the most recent master, which might still be recovering
Optional<RatekeeperInterface> ratekeeper;
Optional<BlobManagerInterface> blobManager;
std::vector<ResolverInterface> resolvers;
DBRecoveryCount
recoveryCount; // A recovery count from DBCoreState. A successful master recovery increments it twice;
@ -78,6 +80,7 @@ struct ServerDBInfo {
distributor,
master,
ratekeeper,
blobManager,
resolvers,
recoveryCount,
recoveryState,

View File

@ -795,6 +795,10 @@ ACTOR static Future<JsonBuilderObject> processStatusFetcher(
roles.addRole("ratekeeper", db->get().ratekeeper.get());
}
if (db->get().blobManager.present()) {
roles.addRole("blob_manager", db->get().blobManager.get());
}
for (auto& tLogSet : db->get().logSystemConfig.tLogs) {
for (auto& it : tLogSet.logRouters) {
if (it.present()) {

View File

@ -30,6 +30,7 @@
#include "fdbserver/MasterInterface.h"
#include "fdbserver/TLogInterface.h"
#include "fdbserver/RatekeeperInterface.h"
#include "fdbserver/BlobManagerInterface.h"
#include "fdbserver/ResolverInterface.h"
#include "fdbclient/ClientBooleanParams.h"
#include "fdbclient/StorageServerInterface.h"
@ -52,6 +53,7 @@ struct WorkerInterface {
RequestStream<struct InitializeGrvProxyRequest> grvProxy;
RequestStream<struct InitializeDataDistributorRequest> dataDistributor;
RequestStream<struct InitializeRatekeeperRequest> ratekeeper;
RequestStream<struct InitializeBlobManagerRequest> blobManager;
RequestStream<struct InitializeResolverRequest> resolver;
RequestStream<struct InitializeStorageRequest> storage;
RequestStream<struct InitializeLogRouterRequest> logRouter;
@ -105,6 +107,7 @@ struct WorkerInterface {
grvProxy,
dataDistributor,
ratekeeper,
blobManager,
resolver,
storage,
logRouter,
@ -375,6 +378,7 @@ struct RegisterWorkerRequest {
Generation generation;
Optional<DataDistributorInterface> distributorInterf;
Optional<RatekeeperInterface> ratekeeperInterf;
Optional<BlobManagerInterface> blobManagerInterf;
Standalone<VectorRef<StringRef>> issues;
std::vector<NetworkAddress> incompatiblePeers;
ReplyPromise<RegisterWorkerReply> reply;
@ -391,12 +395,13 @@ struct RegisterWorkerRequest {
Generation generation,
Optional<DataDistributorInterface> ddInterf,
Optional<RatekeeperInterface> rkInterf,
Optional<BlobManagerInterface> bmInterf,
bool degraded,
Version lastSeenKnobVersion,
ConfigClassSet knobConfigClassSet)
: wi(wi), initialClass(initialClass), processClass(processClass), priorityInfo(priorityInfo),
generation(generation), distributorInterf(ddInterf), ratekeeperInterf(rkInterf), degraded(degraded),
lastSeenKnobVersion(lastSeenKnobVersion), knobConfigClassSet(knobConfigClassSet) {}
generation(generation), distributorInterf(ddInterf), ratekeeperInterf(rkInterf), blobManagerInterf(bmInterf),
degraded(degraded), lastSeenKnobVersion(lastSeenKnobVersion), knobConfigClassSet(knobConfigClassSet) {}
template <class Ar>
void serialize(Ar& ar) {
@ -408,6 +413,7 @@ struct RegisterWorkerRequest {
generation,
distributorInterf,
ratekeeperInterf,
blobManagerInterf,
issues,
incompatiblePeers,
reply,
@ -613,6 +619,20 @@ struct InitializeRatekeeperRequest {
}
};
// Request sent over WorkerInterface::blobManager. It carries a request id and the epoch for the new
// blob manager; the reply is the BlobManagerInterface.
struct InitializeBlobManagerRequest {
    constexpr static FileIdentifier file_identifier = 2567474;

    UID reqId;
    // Initialized in-class so a default-constructed request never holds an indeterminate epoch.
    int64_t epoch = 0;
    ReplyPromise<BlobManagerInterface> reply;

    InitializeBlobManagerRequest() {}
    explicit InitializeBlobManagerRequest(UID uid, int64_t epoch) : reqId(uid), epoch(epoch) {}

    // Fields are serialized in the order: reqId, epoch, reply.
    template <class Ar>
    void serialize(Ar& ar) {
        serializer(ar, reqId, epoch, reply);
    }
};
struct InitializeResolverRequest {
constexpr static FileIdentifier file_identifier = 7413317;
uint64_t recoveryCount;
@ -813,6 +833,7 @@ struct Role {
static const Role LOG_ROUTER;
static const Role DATA_DISTRIBUTOR;
static const Role RATEKEEPER;
static const Role BLOB_MANAGER;
static const Role STORAGE_CACHE;
static const Role COORDINATOR;
static const Role BACKUP;
@ -843,6 +864,8 @@ struct Role {
return DATA_DISTRIBUTOR;
case ProcessClass::Ratekeeper:
return RATEKEEPER;
case ProcessClass::BlobManager:
return BLOB_MANAGER;
case ProcessClass::StorageCache:
return STORAGE_CACHE;
case ProcessClass::Backup:
@ -958,6 +981,7 @@ ACTOR Future<Void> logRouter(TLogInterface interf,
Reference<AsyncVar<ServerDBInfo> const> db);
ACTOR Future<Void> dataDistributor(DataDistributorInterface ddi, Reference<AsyncVar<ServerDBInfo> const> db);
ACTOR Future<Void> ratekeeper(RatekeeperInterface rki, Reference<AsyncVar<ServerDBInfo> const> db);
ACTOR Future<Void> blobManager(BlobManagerInterface bmi, Reference<AsyncVar<ServerDBInfo> const> db, int64_t epoch);
ACTOR Future<Void> storageCacheServer(StorageServerInterface interf,
uint16_t id,
Reference<AsyncVar<ServerDBInfo> const> db);

View File

@ -42,6 +42,7 @@
#include "fdbserver/IDiskQueue.h"
#include "fdbclient/DatabaseContext.h"
#include "fdbserver/DataDistributorInterface.h"
#include "fdbserver/BlobManagerInterface.h"
#include "fdbserver/ServerDBInfo.h"
#include "fdbserver/FDBExecHelper.actor.h"
#include "fdbserver/CoordinationInterface.h"
@ -517,6 +518,7 @@ ACTOR Future<Void> registrationClient(Reference<AsyncVar<Optional<ClusterControl
ProcessClass initialClass,
Reference<AsyncVar<Optional<DataDistributorInterface>> const> ddInterf,
Reference<AsyncVar<Optional<RatekeeperInterface>> const> rkInterf,
Reference<AsyncVar<Optional<BlobManagerInterface>> const> bmInterf,
Reference<AsyncVar<bool> const> degraded,
Reference<ClusterConnectionFile> connFile,
Reference<AsyncVar<std::set<std::string>> const> issues,
@ -539,6 +541,7 @@ ACTOR Future<Void> registrationClient(Reference<AsyncVar<Optional<ClusterControl
requestGeneration++,
ddInterf->get(),
rkInterf->get(),
bmInterf->get(),
degraded->get(),
localConfig->lastSeenVersion(),
localConfig->configClassSet());
@ -596,6 +599,7 @@ ACTOR Future<Void> registrationClient(Reference<AsyncVar<Optional<ClusterControl
when(wait(ccInterface->onChange())) { break; }
when(wait(ddInterf->onChange())) { break; }
when(wait(rkInterf->onChange())) { break; }
when(wait(bmInterf->onChange())) { break; }
when(wait(degraded->onChange())) { break; }
when(wait(FlowTransport::transport().onIncompatibleChanged())) { break; }
when(wait(issues->onChange())) { break; }
@ -619,6 +623,10 @@ bool addressInDbAndPrimaryDc(const NetworkAddress& address, Reference<AsyncVar<S
return true;
}
if (dbi.blobManager.present() && dbi.blobManager.get().address() == address) {
return true;
}
for (const auto& resolver : dbi.resolvers) {
if (resolver.address() == address) {
return true;
@ -1232,6 +1240,7 @@ ACTOR Future<Void> workerServer(Reference<ClusterConnectionFile> connFile,
state Reference<AsyncVar<Optional<DataDistributorInterface>>> ddInterf(
new AsyncVar<Optional<DataDistributorInterface>>());
state Reference<AsyncVar<Optional<RatekeeperInterface>>> rkInterf(new AsyncVar<Optional<RatekeeperInterface>>());
state Reference<AsyncVar<Optional<BlobManagerInterface>>> bmInterf(new AsyncVar<Optional<BlobManagerInterface>>());
state Future<Void> handleErrors = workerHandleErrors(errors.getFuture()); // Needs to be stopped last
state ActorCollection errorForwarders(false);
state Future<Void> loggingTrigger = Void();
@ -1493,6 +1502,7 @@ ACTOR Future<Void> workerServer(Reference<ClusterConnectionFile> connFile,
initialClass,
ddInterf,
rkInterf,
bmInterf,
degraded,
connFile,
issues,
@ -1531,7 +1541,9 @@ ACTOR Future<Void> workerServer(Reference<ClusterConnectionFile> connFile,
.detail("RatekeeperID",
localInfo.ratekeeper.present() ? localInfo.ratekeeper.get().id() : UID())
.detail("DataDistributorID",
localInfo.distributor.present() ? localInfo.distributor.get().id() : UID());
localInfo.distributor.present() ? localInfo.distributor.get().id() : UID())
.detail("BlobManagerID",
localInfo.blobManager.present() ? localInfo.blobManager.get().id() : UID());
dbInfo->set(localInfo);
}
errorForwarders.add(
@ -1665,6 +1677,32 @@ ACTOR Future<Void> workerServer(Reference<ClusterConnectionFile> connFile,
TraceEvent("Ratekeeper_InitRequest", req.reqId).detail("RatekeeperId", recruited.id());
req.reply.send(recruited);
}
// Handles a request to recruit this worker as the blob manager.
// If bmInterf is already set, no second blob manager is started and the existing interface is sent
// back; otherwise the role is started with the epoch carried by the request.
when(InitializeBlobManagerRequest req = waitNext(interf.blobManager.getFuture())) {
BlobManagerInterface recruited(locality, req.reqId);
recruited.initEndpoints();
if (bmInterf->get().present()) {
// A blob manager is already registered on this worker: reply with it instead of the fresh interface.
recruited = bmInterf->get().get();
TEST(true); // Recruited while already a blob manager.
// Open question: this request may be the newer recruitment that caused the previous blob manager
// to error out by raising the epoch, yet the new one cannot be registered here because bmInterf
// is still set.
// This should be okay given the logic of haltRegisteringorPreviousSingleton, because references
// to bmInterf are passed around.
} else {
startRole(Role::BLOB_MANAGER, recruited.id(), interf.id());
DUMPTOKEN(recruited.waitFailure);
Future<Void> blobManagerProcess = blobManager(recruited, dbInfo, req.epoch);
// NOTE(review): setWhenDoneOrError presumably resets bmInterf to an empty Optional once the blob
// manager actor finishes or fails, allowing a later recruitment on this worker - confirm.
errorForwarders.add(forwardError(
errors,
Role::BLOB_MANAGER,
recruited.id(),
setWhenDoneOrError(blobManagerProcess, bmInterf, Optional<BlobManagerInterface>())));
// Record the new interface so that subsequent requests take the "already a blob manager" branch.
bmInterf->set(Optional<BlobManagerInterface>(recruited));
}
// In both branches the requester receives the interface that is actually serving the role.
TraceEvent("BlobManagerReceived", req.reqId).detail("BlobManagerId", recruited.id());
req.reply.send(recruited);
}
when(InitializeBackupRequest req = waitNext(interf.backup.getFuture())) {
if (!backupWorkerCache.exists(req.reqId)) {
LocalLineage _;
@ -2486,6 +2524,7 @@ const Role Role::TESTER("Tester", "TS");
const Role Role::LOG_ROUTER("LogRouter", "LR");
const Role Role::DATA_DISTRIBUTOR("DataDistributor", "DD");
const Role Role::RATEKEEPER("Ratekeeper", "RK");
const Role Role::BLOB_MANAGER("BlobManager", "BM");
const Role Role::STORAGE_CACHE("StorageCache", "SC");
const Role Role::COORDINATOR("Coordinator", "CD");
const Role Role::BACKUP("Backup", "BK");

View File

@ -2257,6 +2257,22 @@ struct ConsistencyCheckWorkload : TestWorkload {
return false;
}
// Check BlobManager
// The check fails when db reports a blob manager and either its address is not a key of
// nonExcludedWorkerProcessMap, or the machine-class fitness of that process for
// ProcessClass::BlobManager is greater than fitnessLowerBound.
// The map is indexed with operator[] only after count() has confirmed the key (short-circuit ||).
if (db.blobManager.present() &&
(!nonExcludedWorkerProcessMap.count(db.blobManager.get().address()) ||
nonExcludedWorkerProcessMap[db.blobManager.get().address()].processClass.machineClassFitness(
ProcessClass::BlobManager) > fitnessLowerBound)) {
TraceEvent("ConsistencyCheck_BlobManagerNotBest")
.detail("BestBlobManagerFitness", fitnessLowerBound)
.detail(
"ExistingBlobManagerFitness",
// -1 is logged when the blob manager's address is missing from the map.
nonExcludedWorkerProcessMap.count(db.blobManager.get().address())
? nonExcludedWorkerProcessMap[db.blobManager.get().address()].processClass.machineClassFitness(
ProcessClass::BlobManager)
: -1);
return false;
}
// TODO: Check Tlog
return true;