Don't rely on database config to be updated.

This commit is contained in:
Suraj Gupta 2021-12-09 16:22:11 -05:00 committed by Josh Slocum
parent 6765d3dff7
commit 968a4f9f50
9 changed files with 53 additions and 33 deletions

View File

@ -141,6 +141,7 @@ public:
std::map<NetworkAddress, std::pair<double, OpenDatabaseRequest>> clientStatus; std::map<NetworkAddress, std::pair<double, OpenDatabaseRequest>> clientStatus;
Future<Void> clientCounter; Future<Void> clientCounter;
int clientCount; int clientCount;
AsyncVar<bool> blobGranulesEnabled;
DBInfo() DBInfo()
: clientInfo(new AsyncVar<ClientDBInfo>()), serverInfo(new AsyncVar<ServerDBInfo>()), : clientInfo(new AsyncVar<ClientDBInfo>()), serverInfo(new AsyncVar<ServerDBInfo>()),
@ -151,7 +152,8 @@ public:
EnableLocalityLoadBalance::True, EnableLocalityLoadBalance::True,
TaskPriority::DefaultEndpoint, TaskPriority::DefaultEndpoint,
LockAware::True)), // SOMEDAY: Locality! LockAware::True)), // SOMEDAY: Locality!
unfinishedRecoveries(0), logGenerations(0), cachePopulated(false), clientCount(0) { unfinishedRecoveries(0), logGenerations(0), cachePopulated(false), clientCount(0),
blobGranulesEnabled(config.blobGranulesEnabled) {
clientCounter = countClients(this); clientCounter = countClients(this);
} }
@ -3727,7 +3729,7 @@ void checkBetterSingletons(ClusterControllerData* self) {
WorkerDetails newDDWorker = findNewProcessForSingleton(self, ProcessClass::DataDistributor, id_used); WorkerDetails newDDWorker = findNewProcessForSingleton(self, ProcessClass::DataDistributor, id_used);
WorkerDetails newBMWorker; WorkerDetails newBMWorker;
if (self->db.config.blobGranulesEnabled) { if (self->db.blobGranulesEnabled.get()) {
newBMWorker = findNewProcessForSingleton(self, ProcessClass::BlobManager, id_used); newBMWorker = findNewProcessForSingleton(self, ProcessClass::BlobManager, id_used);
} }
@ -3736,7 +3738,7 @@ void checkBetterSingletons(ClusterControllerData* self) {
auto bestFitnessForDD = findBestFitnessForSingleton(self, newDDWorker, ProcessClass::DataDistributor); auto bestFitnessForDD = findBestFitnessForSingleton(self, newDDWorker, ProcessClass::DataDistributor);
ProcessClass::Fitness bestFitnessForBM; ProcessClass::Fitness bestFitnessForBM;
if (self->db.config.blobGranulesEnabled) { if (self->db.blobGranulesEnabled.get()) {
bestFitnessForBM = findBestFitnessForSingleton(self, newBMWorker, ProcessClass::BlobManager); bestFitnessForBM = findBestFitnessForSingleton(self, newBMWorker, ProcessClass::BlobManager);
} }
@ -3754,7 +3756,7 @@ void checkBetterSingletons(ClusterControllerData* self) {
self, newDDWorker, ddSingleton, bestFitnessForDD, self->recruitingDistributorID); self, newDDWorker, ddSingleton, bestFitnessForDD, self->recruitingDistributorID);
bool bmHealthy = true; bool bmHealthy = true;
if (self->db.config.blobGranulesEnabled) { if (self->db.blobGranulesEnabled.get()) {
bmHealthy = isHealthySingleton<BlobManagerInterface>( bmHealthy = isHealthySingleton<BlobManagerInterface>(
self, newBMWorker, bmSingleton, bestFitnessForBM, self->recruitingBlobManagerID); self, newBMWorker, bmSingleton, bestFitnessForBM, self->recruitingBlobManagerID);
} }
@ -3773,7 +3775,7 @@ void checkBetterSingletons(ClusterControllerData* self) {
Optional<Standalone<StringRef>> newDDProcessId = newDDWorker.interf.locality.processId(); Optional<Standalone<StringRef>> newDDProcessId = newDDWorker.interf.locality.processId();
Optional<Standalone<StringRef>> currBMProcessId, newBMProcessId; Optional<Standalone<StringRef>> currBMProcessId, newBMProcessId;
if (self->db.config.blobGranulesEnabled) { if (self->db.blobGranulesEnabled.get()) {
currBMProcessId = bmSingleton.interface.get().locality.processId(); currBMProcessId = bmSingleton.interface.get().locality.processId();
newBMProcessId = newBMWorker.interf.locality.processId(); newBMProcessId = newBMWorker.interf.locality.processId();
@ -3781,7 +3783,7 @@ void checkBetterSingletons(ClusterControllerData* self) {
std::vector<Optional<Standalone<StringRef>>> currPids = { currRKProcessId, currDDProcessId }; std::vector<Optional<Standalone<StringRef>>> currPids = { currRKProcessId, currDDProcessId };
std::vector<Optional<Standalone<StringRef>>> newPids = { newRKProcessId, newDDProcessId }; std::vector<Optional<Standalone<StringRef>>> newPids = { newRKProcessId, newDDProcessId };
if (self->db.config.blobGranulesEnabled) { if (self->db.blobGranulesEnabled.get()) {
currPids.emplace_back(currBMProcessId); currPids.emplace_back(currBMProcessId);
newPids.emplace_back(newBMProcessId); newPids.emplace_back(newBMProcessId);
} }
@ -3790,7 +3792,7 @@ void checkBetterSingletons(ClusterControllerData* self) {
auto newColocMap = getColocCounts(newPids); auto newColocMap = getColocCounts(newPids);
// if the knob is disabled, the BM coloc counts should have no affect on the coloc counts check below // if the knob is disabled, the BM coloc counts should have no affect on the coloc counts check below
if (!self->db.config.blobGranulesEnabled) { if (!self->db.blobGranulesEnabled.get()) {
ASSERT(currColocMap[currBMProcessId] == 0); ASSERT(currColocMap[currBMProcessId] == 0);
ASSERT(newColocMap[newBMProcessId] == 0); ASSERT(newColocMap[newBMProcessId] == 0);
} }
@ -3804,7 +3806,7 @@ void checkBetterSingletons(ClusterControllerData* self) {
rkSingleton.recruit(self); rkSingleton.recruit(self);
} else if (newColocMap[newDDProcessId] < currColocMap[currDDProcessId]) { } else if (newColocMap[newDDProcessId] < currColocMap[currDDProcessId]) {
ddSingleton.recruit(self); ddSingleton.recruit(self);
} else if (self->db.config.blobGranulesEnabled && newColocMap[newBMProcessId] < currColocMap[currBMProcessId]) { } else if (self->db.blobGranulesEnabled.get() && newColocMap[newBMProcessId] < currColocMap[currBMProcessId]) {
bmSingleton.recruit(self); bmSingleton.recruit(self);
} }
} }
@ -3826,7 +3828,7 @@ ACTOR Future<Void> doCheckOutstandingRequests(ClusterControllerData* self) {
checkOutstandingRecruitmentRequests(self); checkOutstandingRecruitmentRequests(self);
checkOutstandingStorageRequests(self); checkOutstandingStorageRequests(self);
if (self->db.config.blobGranulesEnabled) { if (self->db.blobGranulesEnabled.get()) {
checkOutstandingBlobWorkerRequests(self); checkOutstandingBlobWorkerRequests(self);
} }
checkBetterSingletons(self); checkBetterSingletons(self);
@ -4376,7 +4378,7 @@ void registerWorker(RegisterWorkerRequest req, ClusterControllerData* self, Conf
self, w, currSingleton, registeringSingleton, self->recruitingRatekeeperID); self, w, currSingleton, registeringSingleton, self->recruitingRatekeeperID);
} }
if (self->db.config.blobGranulesEnabled && req.blobManagerInterf.present()) { if (self->db.blobGranulesEnabled.get() && req.blobManagerInterf.present()) {
auto currSingleton = BlobManagerSingleton(self->db.serverInfo->get().blobManager); auto currSingleton = BlobManagerSingleton(self->db.serverInfo->get().blobManager);
auto registeringSingleton = BlobManagerSingleton(req.blobManagerInterf); auto registeringSingleton = BlobManagerSingleton(req.blobManagerInterf);
haltRegisteringOrCurrentSingleton<BlobManagerInterface>( haltRegisteringOrCurrentSingleton<BlobManagerInterface>(
@ -5358,16 +5360,24 @@ ACTOR Future<Void> startBlobManager(ClusterControllerData* self) {
ACTOR Future<Void> watchBlobGranulesConfigKey(ClusterControllerData* self) { ACTOR Future<Void> watchBlobGranulesConfigKey(ClusterControllerData* self) {
state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(self->cx); state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(self->cx);
state Key blobGranuleConfigKey = configKeysPrefix.withSuffix(StringRef("blob_granules_enabled"));
loop { loop {
try { try {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE); tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
Key blobGranuleConfigKey = configKeysPrefix.withSuffix(StringRef("blob_granules_enabled"));
state Future<Void> watch = tr->watch(blobGranuleConfigKey); state Future<Void> watch = tr->watch(blobGranuleConfigKey);
wait(tr->commit()); wait(tr->commit());
wait(watch); wait(watch);
return Void();
tr->reset();
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
Optional<Value> blobConfig = wait(tr->get(blobGranuleConfigKey));
if (blobConfig.present()) {
self->db.blobGranulesEnabled.set(blobConfig.get() == LiteralStringRef("1"));
}
} catch (Error& e) { } catch (Error& e) {
wait(tr->onError(e)); wait(tr->onError(e));
} }
@ -5392,19 +5402,19 @@ ACTOR Future<Void> monitorBlobManager(ClusterControllerData* self) {
when(wait(self->recruitBlobManager.onChange())) {} when(wait(self->recruitBlobManager.onChange())) {}
when(wait(watchConfigChange)) { when(wait(watchConfigChange)) {
// if there is a blob manager present but blob granules are now disabled, stop the BM // if there is a blob manager present but blob granules are now disabled, stop the BM
if (!self->db.config.blobGranulesEnabled) { if (!self->db.blobGranulesEnabled.get()) {
const auto& blobManager = self->db.serverInfo->get().blobManager; const auto& blobManager = self->db.serverInfo->get().blobManager;
BlobManagerSingleton(blobManager) BlobManagerSingleton(blobManager)
.haltBlobGranules(self, blobManager.get().locality.processId()); .haltBlobGranules(self, blobManager.get().locality.processId());
} }
} }
} }
} else if (self->db.config.blobGranulesEnabled) { } else if (self->db.blobGranulesEnabled.get()) {
// if there is no blob manager present but blob granules are now enabled, recruit a BM // if there is no blob manager present but blob granules are now enabled, recruit a BM
wait(startBlobManager(self)); wait(startBlobManager(self));
} else { } else {
// if there is no blob manager present and blob granules are disabled, wait for a config change // if there is no blob manager present and blob granules are disabled, wait for a config change
wait(watchConfigChange); wait(self->db.blobGranulesEnabled.onChange());
} }
} }
} }
@ -5557,6 +5567,7 @@ ACTOR Future<Void> clusterControllerCore(ClusterControllerFullInterface interf,
self.addActor.send(monitorDataDistributor(&self)); self.addActor.send(monitorDataDistributor(&self));
self.addActor.send(monitorRatekeeper(&self)); self.addActor.send(monitorRatekeeper(&self));
self.addActor.send(monitorBlobManager(&self)); self.addActor.send(monitorBlobManager(&self));
self.addActor.send(watchBlobGranulesConfigKey(&self));
// self.addActor.send(monitorTSSMapping(&self)); // self.addActor.send(monitorTSSMapping(&self));
self.addActor.send(dbInfoUpdater(&self)); self.addActor.send(dbInfoUpdater(&self));
self.addActor.send(traceCounters("ClusterControllerMetrics", self.addActor.send(traceCounters("ClusterControllerMetrics",

View File

@ -263,6 +263,9 @@ class TestConfig {
configDBType = configDBTypeFromString(value); configDBType = configDBTypeFromString(value);
} }
} }
if (attrib == "blobGranulesEnabled") {
blobGranulesEnabled = strcmp(value.c_str(), "true") == 0;
}
} }
ifs.close(); ifs.close();
@ -297,6 +300,7 @@ public:
Optional<bool> generateFearless, buggify; Optional<bool> generateFearless, buggify;
Optional<int> datacenters, desiredTLogCount, commitProxyCount, grvProxyCount, resolverCount, storageEngineType, Optional<int> datacenters, desiredTLogCount, commitProxyCount, grvProxyCount, resolverCount, storageEngineType,
stderrSeverity, machineCount, processesPerMachine, coordinators; stderrSeverity, machineCount, processesPerMachine, coordinators;
bool blobGranulesEnabled = false;
Optional<std::string> config; Optional<std::string> config;
ConfigDBType getConfigDBType() const { return configDBType; } ConfigDBType getConfigDBType() const { return configDBType; }
@ -348,7 +352,8 @@ public:
.add("processesPerMachine", &processesPerMachine) .add("processesPerMachine", &processesPerMachine)
.add("coordinators", &coordinators) .add("coordinators", &coordinators)
.add("configDB", &configDBType) .add("configDB", &configDBType)
.add("extraMachineCountDC", &extraMachineCountDC); .add("extraMachineCountDC", &extraMachineCountDC)
.add("blobGranulesEnabled", &blobGranulesEnabled);
try { try {
auto file = toml::parse(testFile); auto file = toml::parse(testFile);
if (file.contains("configuration") && toml::find(file, "configuration").is_table()) { if (file.contains("configuration") && toml::find(file, "configuration").is_table()) {
@ -2021,7 +2026,7 @@ void setupSimulatedSystem(std::vector<Future<Void>>* systemActors,
// TODO: caching disabled for this merge // TODO: caching disabled for this merge
int storageCacheMachines = dc == 0 ? 1 : 0; int storageCacheMachines = dc == 0 ? 1 : 0;
int blobWorkerMachines = 0; int blobWorkerMachines = 0;
if (simconfig.db.blobGranulesEnabled) { if (testConfig.blobGranulesEnabled) {
int blobWorkerProcesses = 1 + deterministicRandom()->randomInt(0, NUM_EXTRA_BW_MACHINES + 1); int blobWorkerProcesses = 1 + deterministicRandom()->randomInt(0, NUM_EXTRA_BW_MACHINES + 1);
blobWorkerMachines = std::max(1, blobWorkerProcesses / processesPerMachine); blobWorkerMachines = std::max(1, blobWorkerProcesses / processesPerMachine);
} }

View File

@ -137,11 +137,7 @@ struct BlobGranuleVerifierWorkload : TestWorkload {
return Void(); return Void();
} }
DatabaseConfiguration _config = wait(getDatabaseConfiguration(cx)); wait(success(ManagementAPI::changeConfig(cx.getReference(), "blob_granules_enabled=1", true)));
self->config = _config;
if (!self->config.blobGranulesEnabled) {
return Void();
}
double initialDelay = deterministicRandom()->random01() * (self->maxDelay - self->minDelay) + self->minDelay; double initialDelay = deterministicRandom()->random01() * (self->maxDelay - self->minDelay) + self->minDelay;
if (BGV_DEBUG) { if (BGV_DEBUG) {
@ -387,10 +383,6 @@ struct BlobGranuleVerifierWorkload : TestWorkload {
} }
Future<Void> start(Database const& cx) override { Future<Void> start(Database const& cx) override {
if (!config.blobGranulesEnabled) {
return Void();
}
clients.reserve(threads + 1); clients.reserve(threads + 1);
clients.push_back(timeout(findGranules(cx, this), testDuration, Void())); clients.push_back(timeout(findGranules(cx, this), testDuration, Void()));
for (int i = 0; i < threads; i++) { for (int i = 0; i < threads; i++) {
@ -466,13 +458,7 @@ struct BlobGranuleVerifierWorkload : TestWorkload {
return availabilityPassed && self->mismatches == 0 && checks > 0 && self->timeTravelTooOld == 0; return availabilityPassed && self->mismatches == 0 && checks > 0 && self->timeTravelTooOld == 0;
} }
Future<bool> check(Database const& cx) override { Future<bool> check(Database const& cx) override { return _check(cx, this); }
if (!config.blobGranulesEnabled) {
return true;
}
return _check(cx, this);
}
void getMetrics(std::vector<PerfMetric>& m) override {} void getMetrics(std::vector<PerfMetric>& m) override {}
}; };

View File

@ -1,3 +1,6 @@
[configuration]
blobGranulesEnabled = true
[[test]] [[test]]
testTitle = 'BlobGranuleCorrectnessTest' testTitle = 'BlobGranuleCorrectnessTest'

View File

@ -1,3 +1,6 @@
[configuration]
blobGranulesEnabled = true
[[test]] [[test]]
testTitle = 'BlobGranuleCorrectnessCleanTest' testTitle = 'BlobGranuleCorrectnessCleanTest'

View File

@ -1,3 +1,6 @@
[configuration]
blobGranulesEnabled = true
[[test]] [[test]]
testTitle = 'BlobGranuleCycle' testTitle = 'BlobGranuleCycle'

View File

@ -1,3 +1,6 @@
[configuration]
blobGranulesEnabled = true
[[test]] [[test]]
testTitle = 'BlobGranuleCycleClean' testTitle = 'BlobGranuleCycleClean'

View File

@ -1,3 +1,6 @@
[configuration]
blobGranulesEnabled = true
[[test]] [[test]]
testTitle = 'BlobGranuleCorrectnessLargeTest' testTitle = 'BlobGranuleCorrectnessLargeTest'

View File

@ -1,3 +1,6 @@
[configuration]
blobGranulesEnabled = true
[[test]] [[test]]
testTitle = 'BlobGranuleCorrectnessLargeCleanTest' testTitle = 'BlobGranuleCorrectnessLargeCleanTest'