Making PerpetualWiggleStorageMigration pass reliably

This commit is contained in:
Zhe Wu 2023-08-08 20:58:53 -07:00
parent f65e3a35f2
commit cdca09dc93
4 changed files with 63 additions and 32 deletions

View File

@ -349,6 +349,7 @@ public:
std::set<AddressExclusion> excludedAddresses(req.excludeAddresses.begin(), req.excludeAddresses.end());
for (auto& it : id_worker) {
/*
TraceEvent("ZZZZZRecruitStorageTry")
.detail("Worker", it.second.details.interf.address())
.detail("WorkerAvailable", workerAvailable(it.second, false))
@ -364,7 +365,8 @@ public:
it.second.details.processClass.machineClassFitness(ProcessClass::Storage) <=
ProcessClass::UnsetFit)
.detail("MachineFitness", it.second.details.processClass.machineClassFitness(ProcessClass::Storage));
if (workerAvailable(it.second, false) && it.second.details.recoveredDiskFiles &&
*/
if (workerAvailable(it.second, false) && it.second.details.recoveredDiskFiles &&
!excludedMachines.count(it.second.details.interf.locality.zoneId()) &&
(includeDCs.size() == 0 || includeDCs.count(it.second.details.interf.locality.dcId())) &&
!addressExcluded(excludedAddresses, it.second.details.interf.address()) &&

View File

@ -50,20 +50,23 @@ struct PerpetualWiggleStorageMigrationWorkload : public TestWorkload {
PerpetualWiggleStorageMigrationWorkload(WorkloadContext const& wcx) : TestWorkload(wcx) {}
void disableFailureInjectionWorkloads(std::set<std::string>& out) const override { out.insert("all"); }
/*
ACTOR static Future<Void> _setup(PerpetualWiggleStorageMigrationWorkload* self, Database cx) {
wait(success(setHealthyZone(cx, ignoreSSFailuresZoneString, 0)));
bool success = wait(IssueConfigurationChange(cx, "storage_migration_type=disabled", true));
ASSERT(success);
wait(delay(30.0)); // make sure the DD has already quit before the test start
return Void();
// wait(success(setHealthyZone(cx, ignoreSSFailuresZoneString, 0)));
// bool success = wait(IssueConfigurationChange(cx, "storage_migration_type=disabled", true));
// ASSERT(success);
// wait(delay(30.0)); // make sure the DD has already quit before the test start
return Void();
}
Future<Void> setup(Database const& cx) override {
if (clientId == 0) {
return _setup(this, cx); // force to disable DD
}
return Void();
}
if (clientId == 0) {
return _setup(this, cx); // force to disable DD
}
return Void();
} */
Future<Void> start(Database const& cx) override {
if (clientId == 0) {
@ -92,7 +95,8 @@ struct PerpetualWiggleStorageMigrationWorkload : public TestWorkload {
.detail("ProcessID", randomSS1.locality.processId())
.detail("Address", randomSS1.address());
StorageServerInterface randomSS2 = storageServers[deterministicRandom()->randomInt(0, storageServers.size())];
state StorageServerInterface randomSS2 =
storageServers[deterministicRandom()->randomInt(0, storageServers.size())];
while (randomSS1.locality.processId() == randomSS2.locality.processId()) {
randomSS2 = storageServers[deterministicRandom()->randomInt(0, storageServers.size())];
}
@ -100,9 +104,9 @@ struct PerpetualWiggleStorageMigrationWorkload : public TestWorkload {
.detail("ProcessID", randomSS2.locality.processId())
.detail("Address", randomSS2.address());
// std::string migrationLocality = LocalityData::keyProcessId.toString() + ":" +
// randomSS2.locality.processId()->toString();
std::string migrationLocality = LocalityData::keyProcessId.toString() + ":101010101";
std::string migrationLocality =
LocalityData::keyProcessId.toString() + ":" + randomSS2.locality.processId()->toString();
// std::string migrationLocality = LocalityData::keyProcessId.toString() + ":101010101";
bool change =
wait(IssueConfigurationChange(cx,
"storage_engine=ssd-rocksdb-v1 perpetual_storage_wiggle=1 "
@ -116,12 +120,16 @@ struct PerpetualWiggleStorageMigrationWorkload : public TestWorkload {
servers.push_back(AddressExclusion(randomSS1.address().ip, randomSS1.address().port));
state double timeout = 100.0;
wait(excludeServers(cx, servers));
TraceEvent("ZZZZZDoneExcludeServer").log();
// timeoutError() is needed because sometimes excluding process can take forever
std::set<NetworkAddress> inProgress = wait(timeoutError(checkForExcludingServers(cx, servers, true), timeout));
// std::set<NetworkAddress> inProgress = wait(timeoutError(checkForExcludingServers(cx, servers, true),
// timeout));
std::set<NetworkAddress> inProgress = wait(checkForExcludingServers(cx, servers, true));
TraceEvent("ZZZZZDoneCheckingExcludeServer").log();
wait(includeServers(cx, std::vector<AddressExclusion>(1)));
TraceEvent("ZZZZZIncludeServer").log();
// wait(delay(300));
state std::vector<StorageServerInterface> allSSes;
loop {
std::vector<StorageServerInterface> SSes = wait(getStorageServers(cx));
@ -135,23 +143,40 @@ struct PerpetualWiggleStorageMigrationWorkload : public TestWorkload {
allSSes = SSes;
break;
}
wait(delay(10));
}
state int i = 0;
for (i = 0; i < allSSes.size(); ++i) {
state StorageServerInterface ssInterface = allSSes[i];
state ReplyPromise<KeyValueStoreType> typeReply;
ErrorOr<KeyValueStoreType> keyValueStoreType =
wait(ssInterface.getKeyValueStoreType.getReplyUnlessFailedFor(typeReply, 2, 0));
if (keyValueStoreType.present()) {
TraceEvent("ZZZZZKvStorageType")
.detail("SS", ssInterface.address())
.detail("StorageType", keyValueStoreType.get().toString());
} else {
TraceEvent("ZZZZZKvStorageType").detail("SS", ssInterface.address()).detail("StorageType", "None");
loop {
std::vector<StorageServerInterface> SSes = wait(getStorageServers(cx));
allSSes = SSes;
TraceEvent("ZZZZZCheckingStorageEngineType").log();
state int i = 0;
state bool doneCheckingWiggleStorage = false;
for (i = 0; i < allSSes.size(); ++i) {
state StorageServerInterface ssInterface = allSSes[i];
state ReplyPromise<KeyValueStoreType> typeReply;
ErrorOr<KeyValueStoreType> keyValueStoreType =
wait(ssInterface.getKeyValueStoreType.getReplyUnlessFailedFor(typeReply, 2, 0));
if (keyValueStoreType.present()) {
TraceEvent("ZZZZZKvStorageType")
.detail("SS", ssInterface.address())
.detail("StorageType", keyValueStoreType.get().toString());
if (ssInterface.address() == randomSS1.address()) {
ASSERT(keyValueStoreType.get().toString() == "ssd-rocksdb-v1");
}
if (ssInterface.address() == randomSS2.address()) {
if (keyValueStoreType.get().toString() == "ssd-rocksdb-v1") {
TraceEvent("ZZZZZWiggleDone").log();
doneCheckingWiggleStorage = true;
}
}
} else {
TraceEvent("ZZZZZKvStorageType").detail("SS", ssInterface.address()).detail("StorageType", "None");
}
}
if (ssInterface.address() == randomSS1.address()) {
ASSERT(keyValueStoreType.get().toString() == "ssd-rocksdb-v1");
if (doneCheckingWiggleStorage) {
break;
}
wait(delay(10));
}
TraceEvent("ZZZZZFinishTest").log();
return Void();

View File

@ -8,7 +8,7 @@ set(TEST_KEEP_LOGS "FAILED" CACHE STRING "Which logs to keep (NONE, FAILED, ALL)
set(TEST_KEEP_SIMDIR "NONE" CACHE STRING "Which simfdb directories to keep (NONE, FAILED, ALL)")
set(TEST_AGGREGATE_TRACES "NONE" CACHE STRING "Create aggregated trace files (NONE, FAILED, ALL)")
set(TEST_LOG_FORMAT "xml" CACHE STRING "Format for test trace files (xml, json)")
set(TEST_INCLUDE ".*" CACHE STRING "Include only tests that match the given regex")
set(TEST_INCLUDE ".*PerpetualWiggleStorageMigration.*" CACHE STRING "Include only tests that match the given regex")
set(TEST_EXCLUDE ".^" CACHE STRING "Exclude all tests matching the given regex")
set(SANITIZER_OPTIONS "UBSAN_OPTIONS=print_stacktrace=1:halt_on_error=1;TSAN_OPTIONS=suppressions=${CMAKE_SOURCE_DIR}/contrib/tsan.suppressions;LSAN_OPTIONS=suppressions=${CMAKE_SOURCE_DIR}/contrib/lsan.suppressions" CACHE STRING "Environment variables setting sanitizer options")

View File

@ -1,6 +1,10 @@
[configuration]
config = 'triple'
storageEngineType = 0
processesPerMachine = 2
coordinators = 3
machineCount = 45
asanMachineCount = 20
encryptModes=['disabled']
tenantModes=['disabled']