Add PerpetualWiggleStorageMigrationWorkload documentation.

This commit is contained in:
Zhe Wu 2023-08-09 10:56:48 -07:00
parent 17ae952f15
commit ab4ae712e8
4 changed files with 88 additions and 56 deletions

View File

@ -309,7 +309,7 @@ void configureGenerator(const char* text,
"resolvers=",
"perpetual_storage_wiggle=",
"perpetual_storage_wiggle_locality=",
"perpetual_storage_engine=",
"perpetual_storage_wiggle_engine=",
"storage_migration_type=",
"tenant_mode=",
"blob_granules_enabled=",

View File

@ -386,7 +386,7 @@ StatusObject DatabaseConfiguration::toJSON(bool noPolicies) const {
result["perpetual_storage_wiggle"] = perpetualStorageWiggleSpeed;
result["perpetual_storage_wiggle_locality"] = perpetualStorageWiggleLocality;
if (perpetualStoreType.storeType() != KeyValueStoreType::END) {
result["perpetual_storage_engine"] = perpetualStoreType.toString();
result["perpetual_storage_wiggle_engine"] = perpetualStoreType.toString();
}
result["storage_migration_type"] = storageMigrationType.toString();
result["blob_granules_enabled"] = (int32_t)blobGranulesEnabled;
@ -416,7 +416,7 @@ std::string DatabaseConfiguration::configureStringFromJSON(const StatusObject& j
// Such properties are listed here:
static std::set<std::string> directSet = {
"storage_migration_type", "tenant_mode", "encryption_at_rest_mode",
"storage_engine", "log_engine", "perpetual_storage_engine"
"storage_engine", "log_engine", "perpetual_storage_wiggle_engine"
};
if (directSet.contains(kv.first)) {
@ -670,7 +670,7 @@ bool DatabaseConfiguration::setInternal(KeyRef key, ValueRef value) {
return false;
}
perpetualStorageWiggleLocality = value.toString();
} else if (ck == "perpetual_storage_engine"_sr) {
} else if (ck == "perpetual_storage_wiggle_engine"_sr) {
parse((&type), value);
perpetualStoreType = (KeyValueStoreType::StoreType)type;
} else if (ck == "storage_migration_type"_sr) {

View File

@ -236,7 +236,7 @@ std::map<std::string, std::string> configForToken(std::string const& mode) {
}
}
if (key == "storage_engine" || key == "log_engine" || key == "perpetual_storage_engine") {
if (key == "storage_engine" || key == "log_engine" || key == "perpetual_storage_wiggle_engine") {
StringRef s = value;
// Parse as engine_name[:p=v]... to handle future storage engine params

View File

@ -29,6 +29,7 @@
#include "fdbclient/VersionedMap.h"
#include "fdbclient/ReadYourWrites.h"
#include "fdbrpc/SimulatorProcessInfo.h"
#include "flow/actorcompiler.h" // This must be the last #include.
namespace {
@ -46,11 +47,13 @@ ACTOR Future<bool> IssueConfigurationChange(Database cx, std::string config, boo
struct PerpetualWiggleStorageMigrationWorkload : public TestWorkload {
static constexpr auto NAME = "PerpetualWiggleStorageMigrationWorkload";
StorageWiggleMetrics lastMetrics;
PerpetualWiggleStorageMigrationWorkload(WorkloadContext const& wcx) : TestWorkload(wcx) {}
void disableFailureInjectionWorkloads(std::set<std::string>& out) const override { out.insert("all"); }
void disableFailureInjectionWorkloads(std::set<std::string>& out) const override {
// This test requires exclude/include to run smoothly, so we disable all the failure injection workloads.
out.insert("all");
}
Future<Void> start(Database const& cx) override {
if (clientId == 0) {
@ -63,119 +66,142 @@ struct PerpetualWiggleStorageMigrationWorkload : public TestWorkload {
ACTOR static Future<Void> _start(PerpetualWiggleStorageMigrationWorkload* self, Database cx) {
state std::vector<StorageServerInterface> storageServers = wait(getStorageServers(cx));
if (storageServers.size() < 2) {
TraceEvent("ZZZZZTestDoesNotHaveEnoughStorageServer").detail("StorageServerCount", storageServers.size());
return Void();
}
state StorageServerInterface randomSS1 =
// The test should have enough storage servers to exclude.
ASSERT(storageServers.size() > 3);
// Pick a storage process to exclude and later include. This process should always use the storage engine from
// the `storage_engine` configuration.
state StorageServerInterface ssToExcludeInclude =
storageServers[deterministicRandom()->randomInt(0, storageServers.size())];
state ISimulator::ProcessInfo* p = g_simulator->getProcessByAddress(randomSS1.address());
state ISimulator::ProcessInfo* p = g_simulator->getProcessByAddress(ssToExcludeInclude.address());
while (!p->isReliable()) {
randomSS1 = storageServers[deterministicRandom()->randomInt(0, storageServers.size())];
p = g_simulator->getProcessByAddress(randomSS1.address());
ssToExcludeInclude = storageServers[deterministicRandom()->randomInt(0, storageServers.size())];
p = g_simulator->getProcessByAddress(ssToExcludeInclude.address());
}
TraceEvent("ZZZZZFoundProcessToReboot")
.detail("ProcessID", randomSS1.locality.processId())
.detail("Address", randomSS1.address());
TraceEvent("Test_PickedProcessToExcludeInclude")
.detail("ProcessID", ssToExcludeInclude.locality.processId())
.detail("Address", ssToExcludeInclude.address());
state StorageServerInterface randomSS2 =
// Pick a storage process to migrate to storage engine specified in `perpetual_storage_wiggle_engine`.
state StorageServerInterface ssToWiggle =
storageServers[deterministicRandom()->randomInt(0, storageServers.size())];
while (randomSS1.locality.processId() == randomSS2.locality.processId()) {
randomSS2 = storageServers[deterministicRandom()->randomInt(0, storageServers.size())];
while (ssToExcludeInclude.locality.processId() == ssToWiggle.locality.processId()) {
ssToWiggle = storageServers[deterministicRandom()->randomInt(0, storageServers.size())];
}
TraceEvent("ZZZZZFoundProcessToMigrate")
.detail("ProcessID", randomSS2.locality.processId())
.detail("Address", randomSS2.address());
TraceEvent("Test_PickedProcessToMigrate")
.detail("ProcessID", ssToWiggle.locality.processId())
.detail("Address", ssToWiggle.address());
// Issue a configuration change to ONLY migrate `ssToWiggle` using perpetual wiggle.
std::string migrationLocality =
LocalityData::keyProcessId.toString() + ":" + randomSS2.locality.processId()->toString();
// std::string migrationLocality = LocalityData::keyProcessId.toString() + ":101010101";
LocalityData::keyProcessId.toString() + ":" + ssToWiggle.locality.processId()->toString();
bool change =
wait(IssueConfigurationChange(cx,
"perpetual_storage_engine=ssd-rocksdb-v1 perpetual_storage_wiggle=1 "
"perpetual_storage_wiggle_engine=ssd-rocksdb-v1 perpetual_storage_wiggle=1 "
"storage_migration_type=gradual perpetual_storage_wiggle_locality=" +
migrationLocality,
true));
TraceEvent("ZZZZZConfigChangeResult").detail("Success", change);
TraceEvent("Test_ConfigChangeDone").detail("Success", change);
ASSERT(change);
// g_simulator->rebootProcess(p, ISimulator::KillType::RebootProcessAndDelete);
// Now, let's exclude `ssToExcludeInclude` process and include it again. The new SS created on this process
// should always use the `storage_engine` config, which is `ssd-2`.
state std::vector<AddressExclusion> servers;
servers.push_back(AddressExclusion(randomSS1.address().ip, randomSS1.address().port));
servers.push_back(AddressExclusion(ssToExcludeInclude.address().ip, ssToExcludeInclude.address().port));
// Since we have enough storage servers and there won't be any failure, let's use exclude failed to make sure
// the exclude process can succeed.
wait(excludeServers(cx, servers, true));
TraceEvent("ZZZZZDoneExcludeServer").log();
TraceEvent("Test_DoneExcludeServer").log();
try {
// timeoutError() is needed because sometimes excluding process can take forever
// state double timeout = 300.0;
// std::set<NetworkAddress> inProgress =
// wait(timeoutError(checkForExcludingServers(cx, servers, true), timeout));
std::set<NetworkAddress> inProgress = wait(checkForExcludingServers(cx, servers, true));
ASSERT(inProgress.empty());
} catch (Error& e) {
if (e.code() == error_code_timed_out) {
// it might never be excluded from serverList
TraceEvent("ZZZZZWaitingForExclusionTakeTooLong").log();
return Void();
TraceEvent(SevError, "Test_WaitingForExclusionTakeTooLong").log();
}
throw e;
}
TraceEvent("ZZZZZDoneCheckingExcludeServer").log();
TraceEvent("Test_CheckingExcludeServerDone").log();
// Include all the processes the cluster knows.
wait(includeServers(cx, std::vector<AddressExclusion>(1)));
TraceEvent("ZZZZZIncludeServer").log();
TraceEvent("Test_IncludeServerDone").log();
state std::vector<StorageServerInterface> allSSes;
wait(validateDatabase(cx, ssToExcludeInclude, ssToWiggle));
return Void();
}
ACTOR static Future<Void> validateDatabase(Database cx,
StorageServerInterface ssToExcludeInclude,
StorageServerInterface ssToWiggle) {
// Wait until `ssToExcludeInclude` is recruited as a storage server again.
state int missingTargetCount = 0;
loop {
std::vector<StorageServerInterface> SSes = wait(getStorageServers(cx));
std::vector<StorageServerInterface> allStorageServers = wait(getStorageServers(cx));
bool foundTarget = false;
for (auto& ss : SSes) {
if (ss.address() == randomSS1.address()) {
for (auto& ss : allStorageServers) {
if (ss.address() == ssToExcludeInclude.address()) {
foundTarget = true;
break;
}
}
if (foundTarget) {
allSSes = SSes;
break;
}
++missingTargetCount;
if (missingTargetCount > 5) {
allSSes = SSes;
// Sometimes, the excluded storage process may not be recruited as storage server again (depending on
// the process class). So we don't wait indefinitely here.
break;
}
wait(delay(20));
}
// Wait until the wiggled process has migrated to the new storage engine.
state int missingWiggleStorageCount = 0;
state std::vector<StorageServerInterface> allSSes;
state bool doneCheckingWiggleStorage = false;
state bool wiggleStorageGone = false;
loop {
std::vector<StorageServerInterface> SSes = wait(getStorageServers(cx));
allSSes = SSes;
TraceEvent("ZZZZZCheckingStorageEngineType").log();
state int i = 0;
state bool doneCheckingWiggleStorage = false;
state bool containWiggleStorage = false;
doneCheckingWiggleStorage = false;
for (i = 0; i < allSSes.size(); ++i) {
state StorageServerInterface ssInterface = allSSes[i];
state ReplyPromise<KeyValueStoreType> typeReply;
ErrorOr<KeyValueStoreType> keyValueStoreType =
wait(ssInterface.getKeyValueStoreType.getReplyUnlessFailedFor(typeReply, 2, 0));
if (keyValueStoreType.present()) {
TraceEvent("ZZZZZKvStorageType")
.detail("SS", ssInterface.address())
TraceEvent(SevDebug, "Test_KvStorageType")
.detail("StorageServer", ssInterface.address())
.detail("StorageType", keyValueStoreType.get().toString());
if (ssInterface.address() == randomSS1.address()) {
if (ssInterface.address() == ssToExcludeInclude.address()) {
// If `ssToExcludeInclude` exists, it must remain using `storage_engine` type.
ASSERT(keyValueStoreType.get().toString() == "ssd-2");
}
if (ssInterface.address() == randomSS2.address()) {
if (ssInterface.address() == ssToWiggle.address()) {
// If `ssToWiggle` exists, we wait until it is migrated to `perpetual_storage_wiggle_engine`.
containWiggleStorage = true;
if (keyValueStoreType.get().toString() == "ssd-rocksdb-v1") {
TraceEvent("ZZZZZWiggleDone").log();
doneCheckingWiggleStorage = true;
}
}
} else {
TraceEvent("ZZZZZKvStorageType").detail("SS", ssInterface.address()).detail("StorageType", "None");
TraceEvent(SevDebug, "Test_KvStorageType")
.detail("StorageServer", ssInterface.address())
.detail("StorageType", "Unknown");
if (ssInterface.address() == ssToWiggle.address()) {
wiggleStorageGone = true;
}
}
}
if (doneCheckingWiggleStorage) {
@ -183,14 +209,20 @@ struct PerpetualWiggleStorageMigrationWorkload : public TestWorkload {
}
if (!containWiggleStorage) {
++missingWiggleStorageCount;
if (missingWiggleStorageCount == 6) {
TraceEvent("ZZZZTimeoutWaitingForWiggleStorageToShowUp").log();
if (missingWiggleStorageCount > 5) {
TraceEvent("Test_TimeoutWaitingForWiggleStorageToShowUp").log();
break;
}
}
wait(delay(20));
}
TraceEvent("ZZZZZFinishTest").log();
if (!doneCheckingWiggleStorage) {
// If we fail to validate that the wiggle storage has been migrated to new storage engine, sometimes it is
// because after exclusion, the process may not be recruited as storage server, so we must not see it as a
// storage engine in the last check.
ASSERT(wiggleStorageGone);
}
return Void();
}