Merge pull request #18 from sfc-gh-sgupta/gate-blob-related-work
Add a knob to gate blob-related work.
This commit is contained in:
commit
abdf5d434c
|
@ -251,6 +251,9 @@ void ClientKnobs::initialize(Randomize randomize) {
|
||||||
init( BUSYNESS_SPIKE_START_THRESHOLD, 0.100 );
|
init( BUSYNESS_SPIKE_START_THRESHOLD, 0.100 );
|
||||||
init( BUSYNESS_SPIKE_SATURATED_THRESHOLD, 0.500 );
|
init( BUSYNESS_SPIKE_SATURATED_THRESHOLD, 0.500 );
|
||||||
|
|
||||||
|
// blob granules
|
||||||
|
init( ENABLE_BLOB_GRANULES, true );
|
||||||
|
|
||||||
// clang-format on
|
// clang-format on
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -242,6 +242,9 @@ public:
|
||||||
double BUSYNESS_SPIKE_START_THRESHOLD;
|
double BUSYNESS_SPIKE_START_THRESHOLD;
|
||||||
double BUSYNESS_SPIKE_SATURATED_THRESHOLD;
|
double BUSYNESS_SPIKE_SATURATED_THRESHOLD;
|
||||||
|
|
||||||
|
// blob granules
|
||||||
|
bool ENABLE_BLOB_GRANULES;
|
||||||
|
|
||||||
ClientKnobs(Randomize randomize);
|
ClientKnobs(Randomize randomize);
|
||||||
void initialize(Randomize randomize);
|
void initialize(Randomize randomize);
|
||||||
};
|
};
|
||||||
|
|
|
@ -7096,6 +7096,9 @@ ACTOR Future<Void> getBlobGranuleRangesStreamActor(Reference<DatabaseContext> db
|
||||||
}
|
}
|
||||||
|
|
||||||
Future<Void> DatabaseContext::getBlobGranuleRangesStream(const PromiseStream<KeyRange>& results, KeyRange range) {
|
Future<Void> DatabaseContext::getBlobGranuleRangesStream(const PromiseStream<KeyRange>& results, KeyRange range) {
|
||||||
|
if (!CLIENT_KNOBS->ENABLE_BLOB_GRANULES) {
|
||||||
|
throw client_invalid_operation();
|
||||||
|
}
|
||||||
return getBlobGranuleRangesStreamActor(Reference<DatabaseContext>::addRef(this), results, range);
|
return getBlobGranuleRangesStreamActor(Reference<DatabaseContext>::addRef(this), results, range);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -7296,6 +7299,9 @@ Future<Void> DatabaseContext::readBlobGranulesStream(const PromiseStream<Standal
|
||||||
KeyRange range,
|
KeyRange range,
|
||||||
Version begin,
|
Version begin,
|
||||||
Version end) {
|
Version end) {
|
||||||
|
if (!CLIENT_KNOBS->ENABLE_BLOB_GRANULES) {
|
||||||
|
throw client_invalid_operation();
|
||||||
|
}
|
||||||
return readBlobGranulesStreamActor(Reference<DatabaseContext>::addRef(this), results, range, begin, end);
|
return readBlobGranulesStreamActor(Reference<DatabaseContext>::addRef(this), results, range, begin, end);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -5241,7 +5241,9 @@ ACTOR Future<Void> clusterControllerCore(ClusterControllerFullInterface interf,
|
||||||
self.addActor.send(handleForcedRecoveries(&self, interf));
|
self.addActor.send(handleForcedRecoveries(&self, interf));
|
||||||
self.addActor.send(monitorDataDistributor(&self));
|
self.addActor.send(monitorDataDistributor(&self));
|
||||||
self.addActor.send(monitorRatekeeper(&self));
|
self.addActor.send(monitorRatekeeper(&self));
|
||||||
|
if (CLIENT_KNOBS->ENABLE_BLOB_GRANULES) {
|
||||||
self.addActor.send(monitorBlobManager(&self));
|
self.addActor.send(monitorBlobManager(&self));
|
||||||
|
}
|
||||||
// self.addActor.send(monitorTSSMapping(&self));
|
// self.addActor.send(monitorTSSMapping(&self));
|
||||||
self.addActor.send(dbInfoUpdater(&self));
|
self.addActor.send(dbInfoUpdater(&self));
|
||||||
self.addActor.send(traceCounters("ClusterControllerMetrics",
|
self.addActor.send(traceCounters("ClusterControllerMetrics",
|
||||||
|
|
|
@ -2002,7 +2002,10 @@ void setupSimulatedSystem(vector<Future<Void>>* systemActors,
|
||||||
// TODO: caching disabled for this merge
|
// TODO: caching disabled for this merge
|
||||||
// FIXME: we hardcode some machines to specifically test storage cache and blob workers
|
// FIXME: we hardcode some machines to specifically test storage cache and blob workers
|
||||||
int storageCacheMachines = dc == 0 ? 1 : 0;
|
int storageCacheMachines = dc == 0 ? 1 : 0;
|
||||||
int blobWorkerMachines = 2 + deterministicRandom()->randomInt(0, NUM_EXTRA_BW_MACHINES + 1);
|
int blobWorkerMachines = 0;
|
||||||
|
if (CLIENT_KNOBS->ENABLE_BLOB_GRANULES) {
|
||||||
|
blobWorkerMachines = 2 + deterministicRandom()->randomInt(0, NUM_EXTRA_BW_MACHINES + 1);
|
||||||
|
}
|
||||||
|
|
||||||
int totalMachines = machines + storageCacheMachines + blobWorkerMachines;
|
int totalMachines = machines + storageCacheMachines + blobWorkerMachines;
|
||||||
int useSeedForMachine = deterministicRandom()->randomInt(0, totalMachines);
|
int useSeedForMachine = deterministicRandom()->randomInt(0, totalMachines);
|
||||||
|
|
|
@ -123,6 +123,10 @@ struct BlobGranuleVerifierWorkload : TestWorkload {
|
||||||
|
|
||||||
std::string description() const override { return "BlobGranuleVerifier"; }
|
std::string description() const override { return "BlobGranuleVerifier"; }
|
||||||
Future<Void> setup(Database const& cx) override {
|
Future<Void> setup(Database const& cx) override {
|
||||||
|
if (!CLIENT_KNOBS->ENABLE_BLOB_GRANULES) {
|
||||||
|
return Void();
|
||||||
|
}
|
||||||
|
|
||||||
if (doSetup) {
|
if (doSetup) {
|
||||||
double initialDelay = deterministicRandom()->random01() * (maxDelay - minDelay) + minDelay;
|
double initialDelay = deterministicRandom()->random01() * (maxDelay - minDelay) + minDelay;
|
||||||
if (BGV_DEBUG) {
|
if (BGV_DEBUG) {
|
||||||
|
@ -380,6 +384,10 @@ struct BlobGranuleVerifierWorkload : TestWorkload {
|
||||||
}
|
}
|
||||||
|
|
||||||
Future<Void> start(Database const& cx) override {
|
Future<Void> start(Database const& cx) override {
|
||||||
|
if (!CLIENT_KNOBS->ENABLE_BLOB_GRANULES) {
|
||||||
|
return Void();
|
||||||
|
}
|
||||||
|
|
||||||
clients.reserve(threads + 1);
|
clients.reserve(threads + 1);
|
||||||
clients.push_back(timeout(findGranules(cx, this), testDuration, Void()));
|
clients.push_back(timeout(findGranules(cx, this), testDuration, Void()));
|
||||||
for (int i = 0; i < threads; i++) {
|
for (int i = 0; i < threads; i++) {
|
||||||
|
@ -445,7 +453,13 @@ struct BlobGranuleVerifierWorkload : TestWorkload {
|
||||||
return self->mismatches == 0 && checks > 0;
|
return self->mismatches == 0 && checks > 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
Future<bool> check(Database const& cx) override { return _check(cx, this); }
|
Future<bool> check(Database const& cx) override {
|
||||||
|
if (!CLIENT_KNOBS->ENABLE_BLOB_GRANULES) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
return _check(cx, this);
|
||||||
|
}
|
||||||
void getMetrics(vector<PerfMetric>& m) override {}
|
void getMetrics(vector<PerfMetric>& m) override {}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
|
@ -296,9 +296,11 @@ struct ConsistencyCheckWorkload : TestWorkload {
|
||||||
wait(::success(self->checkForExtraDataStores(cx, self)));
|
wait(::success(self->checkForExtraDataStores(cx, self)));
|
||||||
|
|
||||||
// Check blob workers are operating as expected
|
// Check blob workers are operating as expected
|
||||||
|
if (CLIENT_KNOBS->ENABLE_BLOB_GRANULES) {
|
||||||
bool blobWorkersCorrect = wait(self->checkBlobWorkers(cx, configuration, self));
|
bool blobWorkersCorrect = wait(self->checkBlobWorkers(cx, configuration, self));
|
||||||
if (!blobWorkersCorrect)
|
if (!blobWorkersCorrect)
|
||||||
self->testFailure("Blob workers incorrect");
|
self->testFailure("Blob workers incorrect");
|
||||||
|
}
|
||||||
|
|
||||||
// Check that each machine is operating as its desired class
|
// Check that each machine is operating as its desired class
|
||||||
bool usingDesiredClasses = wait(self->checkUsingDesiredClasses(cx, self));
|
bool usingDesiredClasses = wait(self->checkUsingDesiredClasses(cx, self));
|
||||||
|
@ -2335,7 +2337,7 @@ struct ConsistencyCheckWorkload : TestWorkload {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check BlobManager
|
// Check BlobManager
|
||||||
if (db.blobManager.present() &&
|
if (CLIENT_KNOBS->ENABLE_BLOB_GRANULES && db.blobManager.present() &&
|
||||||
(!nonExcludedWorkerProcessMap.count(db.blobManager.get().address()) ||
|
(!nonExcludedWorkerProcessMap.count(db.blobManager.get().address()) ||
|
||||||
nonExcludedWorkerProcessMap[db.blobManager.get().address()].processClass.machineClassFitness(
|
nonExcludedWorkerProcessMap[db.blobManager.get().address()].processClass.machineClassFitness(
|
||||||
ProcessClass::BlobManager) > fitnessLowerBound)) {
|
ProcessClass::BlobManager) > fitnessLowerBound)) {
|
||||||
|
|
Loading…
Reference in New Issue