diff --git a/fdbcli/ConfigureCommand.actor.cpp b/fdbcli/ConfigureCommand.actor.cpp index 84cc176e72..71453d5c4a 100644 --- a/fdbcli/ConfigureCommand.actor.cpp +++ b/fdbcli/ConfigureCommand.actor.cpp @@ -309,7 +309,7 @@ void configureGenerator(const char* text, "resolvers=", "perpetual_storage_wiggle=", "perpetual_storage_wiggle_locality=", - "perpetual_storage_engine=", + "perpetual_storage_wiggle_engine=", "storage_migration_type=", "tenant_mode=", "blob_granules_enabled=", diff --git a/fdbclient/DatabaseConfiguration.cpp b/fdbclient/DatabaseConfiguration.cpp index 5042436dd0..76ba07f1ff 100644 --- a/fdbclient/DatabaseConfiguration.cpp +++ b/fdbclient/DatabaseConfiguration.cpp @@ -386,7 +386,7 @@ StatusObject DatabaseConfiguration::toJSON(bool noPolicies) const { result["perpetual_storage_wiggle"] = perpetualStorageWiggleSpeed; result["perpetual_storage_wiggle_locality"] = perpetualStorageWiggleLocality; if (perpetualStoreType.storeType() != KeyValueStoreType::END) { - result["perpetual_storage_engine"] = perpetualStoreType.toString(); + result["perpetual_storage_wiggle_engine"] = perpetualStoreType.toString(); } result["storage_migration_type"] = storageMigrationType.toString(); result["blob_granules_enabled"] = (int32_t)blobGranulesEnabled; @@ -416,7 +416,7 @@ std::string DatabaseConfiguration::configureStringFromJSON(const StatusObject& j // Such properites are listed here: static std::set directSet = { "storage_migration_type", "tenant_mode", "encryption_at_rest_mode", - "storage_engine", "log_engine", "perpetual_storage_engine" + "storage_engine", "log_engine", "perpetual_storage_wiggle_engine" }; if (directSet.contains(kv.first)) { @@ -670,7 +670,7 @@ bool DatabaseConfiguration::setInternal(KeyRef key, ValueRef value) { return false; } perpetualStorageWiggleLocality = value.toString(); - } else if (ck == "perpetual_storage_engine"_sr) { + } else if (ck == "perpetual_storage_wiggle_engine"_sr) { parse((&type), value); perpetualStoreType = 
(KeyValueStoreType::StoreType)type; } else if (ck == "storage_migration_type"_sr) { diff --git a/fdbclient/ManagementAPI.actor.cpp b/fdbclient/ManagementAPI.actor.cpp index 16d2efa465..1ec9259c2f 100644 --- a/fdbclient/ManagementAPI.actor.cpp +++ b/fdbclient/ManagementAPI.actor.cpp @@ -236,7 +236,7 @@ std::map configForToken(std::string const& mode) { } } - if (key == "storage_engine" || key == "log_engine" || key == "perpetual_storage_engine") { + if (key == "storage_engine" || key == "log_engine" || key == "perpetual_storage_wiggle_engine") { StringRef s = value; // Parse as engine_name[:p=v]... to handle future storage engine params diff --git a/fdbserver/workloads/PerpetualWiggleStorageMigrationWorkload.actor.cpp b/fdbserver/workloads/PerpetualWiggleStorageMigrationWorkload.actor.cpp index 06b8de83ed..55f0d7c4c0 100644 --- a/fdbserver/workloads/PerpetualWiggleStorageMigrationWorkload.actor.cpp +++ b/fdbserver/workloads/PerpetualWiggleStorageMigrationWorkload.actor.cpp @@ -29,6 +29,7 @@ #include "fdbclient/VersionedMap.h" #include "fdbclient/ReadYourWrites.h" #include "fdbrpc/SimulatorProcessInfo.h" + #include "flow/actorcompiler.h" // This must be the last #include. namespace { @@ -46,11 +47,13 @@ ACTOR Future IssueConfigurationChange(Database cx, std::string config, boo struct PerpetualWiggleStorageMigrationWorkload : public TestWorkload { static constexpr auto NAME = "PerpetualWiggleStorageMigrationWorkload"; - StorageWiggleMetrics lastMetrics; PerpetualWiggleStorageMigrationWorkload(WorkloadContext const& wcx) : TestWorkload(wcx) {} - void disableFailureInjectionWorkloads(std::set& out) const override { out.insert("all"); } + void disableFailureInjectionWorkloads(std::set& out) const override { + // This test requires exclude/include to run smoothly, so we disable all the failure injection workloads. 
+ out.insert("all"); + } Future start(Database const& cx) override { if (clientId == 0) { @@ -63,119 +66,142 @@ struct PerpetualWiggleStorageMigrationWorkload : public TestWorkload { ACTOR static Future _start(PerpetualWiggleStorageMigrationWorkload* self, Database cx) { state std::vector storageServers = wait(getStorageServers(cx)); - if (storageServers.size() < 2) { - TraceEvent("ZZZZZTestDoesNotHaveEnoughStorageServer").detail("StorageServerCount", storageServers.size()); - return Void(); - } - state StorageServerInterface randomSS1 = + // The test should have enough storage servers to exclude. + ASSERT(storageServers.size() > 3); + + // Pick a storage process to exclude and later include. This process should always use storage engine from + // `storage_engine` configuration. + state StorageServerInterface ssToExcludeInclude = storageServers[deterministicRandom()->randomInt(0, storageServers.size())]; - state ISimulator::ProcessInfo* p = g_simulator->getProcessByAddress(randomSS1.address()); + state ISimulator::ProcessInfo* p = g_simulator->getProcessByAddress(ssToExcludeInclude.address()); while (!p->isReliable()) { - randomSS1 = storageServers[deterministicRandom()->randomInt(0, storageServers.size())]; - p = g_simulator->getProcessByAddress(randomSS1.address()); + ssToExcludeInclude = storageServers[deterministicRandom()->randomInt(0, storageServers.size())]; + p = g_simulator->getProcessByAddress(ssToExcludeInclude.address()); } - TraceEvent("ZZZZZFoundProcessToReboot") - .detail("ProcessID", randomSS1.locality.processId()) - .detail("Address", randomSS1.address()); + TraceEvent("Test_PickedProcessToExcludeInclude") + .detail("ProcessID", ssToExcludeInclude.locality.processId()) + .detail("Address", ssToExcludeInclude.address()); - state StorageServerInterface randomSS2 = + // Pick a storage process to migrate to storage engine specified in `perpetual_storage_wiggle_engine`. 
+ state StorageServerInterface ssToWiggle = storageServers[deterministicRandom()->randomInt(0, storageServers.size())]; - while (randomSS1.locality.processId() == randomSS2.locality.processId()) { - randomSS2 = storageServers[deterministicRandom()->randomInt(0, storageServers.size())]; + while (ssToExcludeInclude.locality.processId() == ssToWiggle.locality.processId()) { + ssToWiggle = storageServers[deterministicRandom()->randomInt(0, storageServers.size())]; } - TraceEvent("ZZZZZFoundProcessToMigrate") - .detail("ProcessID", randomSS2.locality.processId()) - .detail("Address", randomSS2.address()); + TraceEvent("Test_PickedProcessToMigrate") + .detail("ProcessID", ssToWiggle.locality.processId()) + .detail("Address", ssToWiggle.address()); + // Issue a configuration change to ONLY migrate `ssToWiggle` using perpetual wiggle. std::string migrationLocality = - LocalityData::keyProcessId.toString() + ":" + randomSS2.locality.processId()->toString(); - // std::string migrationLocality = LocalityData::keyProcessId.toString() + ":101010101"; + LocalityData::keyProcessId.toString() + ":" + ssToWiggle.locality.processId()->toString(); bool change = wait(IssueConfigurationChange(cx, - "perpetual_storage_engine=ssd-rocksdb-v1 perpetual_storage_wiggle=1 " + "perpetual_storage_wiggle_engine=ssd-rocksdb-v1 perpetual_storage_wiggle=1 " "storage_migration_type=gradual perpetual_storage_wiggle_locality=" + migrationLocality, true)); - TraceEvent("ZZZZZConfigChangeResult").detail("Success", change); + TraceEvent("Test_ConfigChangeDone").detail("Success", change); + ASSERT(change); - // g_simulator->rebootProcess(p, ISimulator::KillType::RebootProcessAndDelete); + // Now, let's exclude `ssToExcludeInclude` process and include it again. The new SS created on this process + // should always use `storage_engine` config, which is `ssd-2`. 
state std::vector servers; - servers.push_back(AddressExclusion(randomSS1.address().ip, randomSS1.address().port)); + servers.push_back(AddressExclusion(ssToExcludeInclude.address().ip, ssToExcludeInclude.address().port)); + + // Since we have enough storage servers and there won't be any failure, let's use exclude failed to make sure + // the exclude process can succeed. wait(excludeServers(cx, servers, true)); - TraceEvent("ZZZZZDoneExcludeServer").log(); + TraceEvent("Test_DoneExcludeServer").log(); try { - // timeoutError() is needed because sometimes excluding process can take forever - // state double timeout = 300.0; - // std::set inProgress = - // wait(timeoutError(checkForExcludingServers(cx, servers, true), timeout)); std::set inProgress = wait(checkForExcludingServers(cx, servers, true)); ASSERT(inProgress.empty()); } catch (Error& e) { if (e.code() == error_code_timed_out) { // it might never be excluded from serverList - TraceEvent("ZZZZZWaitingForExclusionTakeTooLong").log(); - return Void(); + TraceEvent(SevError, "Test_WaitingForExclusionTakeTooLong").log(); } throw e; } - TraceEvent("ZZZZZDoneCheckingExcludeServer").log(); + TraceEvent("Test_CheckingExcludeServerDone").log(); + // Include all the processes the cluster knows. wait(includeServers(cx, std::vector(1))); - TraceEvent("ZZZZZIncludeServer").log(); + TraceEvent("Test_IncludeServerDone").log(); - state std::vector allSSes; + wait(validateDatabase(cx, ssToExcludeInclude, ssToWiggle)); + return Void(); + } + + ACTOR static Future validateDatabase(Database cx, + StorageServerInterface ssToExcludeInclude, + StorageServerInterface ssToWiggle) { + // Wait until `ssToExcludeInclude` is recruited as a storage server again. 
state int missingTargetCount = 0; loop { - std::vector SSes = wait(getStorageServers(cx)); + std::vector allStorageServers = wait(getStorageServers(cx)); bool foundTarget = false; - for (auto& ss : SSes) { - if (ss.address() == randomSS1.address()) { + for (auto& ss : allStorageServers) { + if (ss.address() == ssToExcludeInclude.address()) { foundTarget = true; + break; } } if (foundTarget) { - allSSes = SSes; break; } ++missingTargetCount; if (missingTargetCount > 5) { - allSSes = SSes; + // Sometimes, the excluded storage process may not be recruited as storage server again (depending on + // the process class). So we don't wait indefinitely here. break; } wait(delay(20)); } + + // Wait until the wiggle process has migrated to the new storage engine. state int missingWiggleStorageCount = 0; + state std::vector allSSes; + state bool doneCheckingWiggleStorage = false; + state bool wiggleStorageGone = false; loop { std::vector SSes = wait(getStorageServers(cx)); allSSes = SSes; - TraceEvent("ZZZZZCheckingStorageEngineType").log(); + state int i = 0; - state bool doneCheckingWiggleStorage = false; state bool containWiggleStorage = false; + doneCheckingWiggleStorage = false; for (i = 0; i < allSSes.size(); ++i) { state StorageServerInterface ssInterface = allSSes[i]; state ReplyPromise typeReply; ErrorOr keyValueStoreType = wait(ssInterface.getKeyValueStoreType.getReplyUnlessFailedFor(typeReply, 2, 0)); if (keyValueStoreType.present()) { - TraceEvent("ZZZZZKvStorageType") - .detail("SS", ssInterface.address()) + TraceEvent(SevDebug, "Test_KvStorageType") + .detail("StorageServer", ssInterface.address()) .detail("StorageType", keyValueStoreType.get().toString()); - if (ssInterface.address() == randomSS1.address()) { + + if (ssInterface.address() == ssToExcludeInclude.address()) { + // If `ssToExcludeInclude` exists, it must remain using `storage_engine` type. 
ASSERT(keyValueStoreType.get().toString() == "ssd-2"); } - if (ssInterface.address() == randomSS2.address()) { + if (ssInterface.address() == ssToWiggle.address()) { + // If `ssToWiggle` exists, we wait until it is migrated to `perpetual_storage_wiggle_engine`. containWiggleStorage = true; if (keyValueStoreType.get().toString() == "ssd-rocksdb-v1") { - TraceEvent("ZZZZZWiggleDone").log(); doneCheckingWiggleStorage = true; } } } else { - TraceEvent("ZZZZZKvStorageType").detail("SS", ssInterface.address()).detail("StorageType", "None"); + TraceEvent(SevDebug, "Test_KvStorageType") + .detail("StorageServer", ssInterface.address()) + .detail("StorageType", "Unknown"); + if (ssInterface.address() == ssToWiggle.address()) { + wiggleStorageGone = true; + } } } if (doneCheckingWiggleStorage) { @@ -183,14 +209,20 @@ struct PerpetualWiggleStorageMigrationWorkload : public TestWorkload { } if (!containWiggleStorage) { ++missingWiggleStorageCount; - if (missingWiggleStorageCount == 6) { - TraceEvent("ZZZZTimeoutWaitingForWiggleStorageToShowUp").log(); + if (missingWiggleStorageCount > 5) { + TraceEvent("Test_TimeoutWaitingForWiggleStorageToShowUp").log(); break; } } wait(delay(20)); } - TraceEvent("ZZZZZFinishTest").log(); + + if (!doneCheckingWiggleStorage) { + // If we fail to validate that the wiggle storage has been migrated to new storage engine, sometimes it is + // because after exclusion, the process may not be recruited as storage server, so we must not see it as a + // storage engine in the last check. + ASSERT(wiggleStorageGone); + } return Void(); }