Merge pull request #10754 from halfprice/zhewu/wiggle-storage-enging-locality
Perpetual wiggle storage migration using `perpetual_storage_wiggle_engine` config
This commit is contained in:
commit
bb970d6d61
|
@ -309,6 +309,8 @@ void configureGenerator(const char* text,
|
|||
"resolvers=",
|
||||
"perpetual_storage_wiggle=",
|
||||
"perpetual_storage_wiggle_locality=",
|
||||
// TODO(zhewu): update fdbcli command documentation.
|
||||
"perpetual_storage_wiggle_engine=",
|
||||
"storage_migration_type=",
|
||||
"tenant_mode=",
|
||||
"blob_granules_enabled=",
|
||||
|
|
|
@ -37,7 +37,8 @@ void DatabaseConfiguration::resetInternal() {
|
|||
commitProxyCount = grvProxyCount = resolverCount = desiredTLogCount = tLogWriteAntiQuorum = tLogReplicationFactor =
|
||||
storageTeamSize = desiredLogRouterCount = -1;
|
||||
tLogVersion = TLogVersion::DEFAULT;
|
||||
tLogDataStoreType = storageServerStoreType = testingStorageServerStoreType = KeyValueStoreType::END;
|
||||
tLogDataStoreType = storageServerStoreType = testingStorageServerStoreType = perpetualStoreType =
|
||||
KeyValueStoreType::END;
|
||||
desiredTSSCount = 0;
|
||||
tLogSpillType = TLogSpillType::DEFAULT;
|
||||
autoCommitProxyCount = CLIENT_KNOBS->DEFAULT_AUTO_COMMIT_PROXIES;
|
||||
|
@ -384,6 +385,9 @@ StatusObject DatabaseConfiguration::toJSON(bool noPolicies) const {
|
|||
result["backup_worker_enabled"] = (int32_t)backupWorkerEnabled;
|
||||
result["perpetual_storage_wiggle"] = perpetualStorageWiggleSpeed;
|
||||
result["perpetual_storage_wiggle_locality"] = perpetualStorageWiggleLocality;
|
||||
if (perpetualStoreType.storeType() != KeyValueStoreType::END) {
|
||||
result["perpetual_storage_wiggle_engine"] = perpetualStoreType.toString();
|
||||
}
|
||||
result["storage_migration_type"] = storageMigrationType.toString();
|
||||
result["blob_granules_enabled"] = (int32_t)blobGranulesEnabled;
|
||||
result["tenant_mode"] = tenantMode.toString();
|
||||
|
@ -411,7 +415,8 @@ std::string DatabaseConfiguration::configureStringFromJSON(const StatusObject& j
|
|||
// For string values, some properties can set with a "<name>=<value>" syntax in "configure"
|
||||
// Such properites are listed here:
|
||||
static std::set<std::string> directSet = {
|
||||
"storage_migration_type", "tenant_mode", "encryption_at_rest_mode", "storage_engine", "log_engine"
|
||||
"storage_migration_type", "tenant_mode", "encryption_at_rest_mode",
|
||||
"storage_engine", "log_engine", "perpetual_storage_wiggle_engine"
|
||||
};
|
||||
|
||||
if (directSet.contains(kv.first)) {
|
||||
|
@ -665,6 +670,9 @@ bool DatabaseConfiguration::setInternal(KeyRef key, ValueRef value) {
|
|||
return false;
|
||||
}
|
||||
perpetualStorageWiggleLocality = value.toString();
|
||||
} else if (ck == "perpetual_storage_wiggle_engine"_sr) {
|
||||
parse((&type), value);
|
||||
perpetualStoreType = (KeyValueStoreType::StoreType)type;
|
||||
} else if (ck == "storage_migration_type"_sr) {
|
||||
parse((&type), value);
|
||||
storageMigrationType = (StorageMigrationType::MigrationType)type;
|
||||
|
|
|
@ -236,7 +236,7 @@ std::map<std::string, std::string> configForToken(std::string const& mode) {
|
|||
}
|
||||
}
|
||||
|
||||
if (key == "storage_engine" || key == "log_engine") {
|
||||
if (key == "storage_engine" || key == "log_engine" || key == "perpetual_storage_wiggle_engine") {
|
||||
StringRef s = value;
|
||||
|
||||
// Parse as engine_name[:p=v]... to handle future storage engine params
|
||||
|
|
|
@ -252,6 +252,7 @@ struct DatabaseConfiguration {
|
|||
// Perpetual Storage Setting
|
||||
int32_t perpetualStorageWiggleSpeed;
|
||||
std::string perpetualStorageWiggleLocality;
|
||||
KeyValueStoreType perpetualStoreType;
|
||||
|
||||
// Storage Migration Type
|
||||
StorageMigrationType storageMigrationType;
|
||||
|
|
|
@ -38,6 +38,19 @@ auto get(MapContainer& m, K const& k) -> decltype(m.at(k)) {
|
|||
return it->second;
|
||||
}
|
||||
|
||||
void ParsePerpetualStorageWiggleLocality(const std::string& localityKeyValue,
|
||||
Optional<Value>* localityKey,
|
||||
Optional<Value>* localityValue) {
|
||||
// parsing format is like "datahall:0"
|
||||
ASSERT(isValidPerpetualStorageWiggleLocality(localityKeyValue));
|
||||
|
||||
// get key and value from perpetual_storage_wiggle_locality.
|
||||
int split = localityKeyValue.find(':');
|
||||
*localityKey = Optional<Value>(ValueRef((uint8_t*)localityKeyValue.c_str(), split));
|
||||
*localityValue =
|
||||
Optional<Value>(ValueRef((uint8_t*)localityKeyValue.c_str() + split + 1, localityKeyValue.size() - split - 1));
|
||||
}
|
||||
|
||||
} // namespace
|
||||
|
||||
namespace data_distribution {
|
||||
|
@ -2385,6 +2398,24 @@ public:
|
|||
state InitializeStorageRequest isr;
|
||||
isr.storeType = recruitTss ? self->configuration.testingStorageServerStoreType
|
||||
: self->configuration.storageServerStoreType;
|
||||
|
||||
// Check if perpetual storage wiggle is enabled and perpetualStoreType is set. If so, we use
|
||||
// perpetualStoreType for all new SSes that match perpetualStorageWiggleLocality.
|
||||
// Note that this only applies to regular storage servers, not TSS.
|
||||
if (!recruitTss && self->configuration.storageMigrationType == StorageMigrationType::GRADUAL &&
|
||||
self->configuration.perpetualStoreType.storeType() != KeyValueStoreType::END) {
|
||||
if (self->configuration.perpetualStorageWiggleLocality == "0") {
|
||||
isr.storeType = self->configuration.perpetualStoreType;
|
||||
} else {
|
||||
Optional<Value> localityKey;
|
||||
Optional<Value> localityValue;
|
||||
ParsePerpetualStorageWiggleLocality(
|
||||
self->configuration.perpetualStorageWiggleLocality, &localityKey, &localityValue);
|
||||
if (candidateWorker.worker.locality.get(localityKey.get()) == localityValue) {
|
||||
isr.storeType = self->configuration.perpetualStoreType;
|
||||
}
|
||||
}
|
||||
}
|
||||
isr.seedTag = invalidTag;
|
||||
isr.reqId = deterministicRandom()->randomUniqueID();
|
||||
isr.interfaceId = interfaceId;
|
||||
|
@ -2489,7 +2520,8 @@ public:
|
|||
.detail("WorkerLocality", candidateWorker.worker.locality.toString())
|
||||
.detail("Interf", interfaceId)
|
||||
.detail("Addr", candidateWorker.worker.address())
|
||||
.detail("RecruitingStream", self->recruitingStream.get());
|
||||
.detail("RecruitingStream", self->recruitingStream.get())
|
||||
.detail("StoreType", isr.storeType);
|
||||
|
||||
if (newServer.present()) {
|
||||
UID id = newServer.get().interf.id();
|
||||
|
@ -3863,14 +3895,7 @@ Future<UID> DDTeamCollection::getNextWigglingServerID() {
|
|||
// NOTE: because normal \xff/conf change through `changeConfig` now will cause DD throw `movekeys_conflict()`
|
||||
// then recruit a new DD, we only need to read current configuration once
|
||||
if (configuration.perpetualStorageWiggleLocality != "0") {
|
||||
// parsing format is like "datahall:0"
|
||||
std::string& localityKeyValue = configuration.perpetualStorageWiggleLocality;
|
||||
ASSERT(isValidPerpetualStorageWiggleLocality(localityKeyValue));
|
||||
// get key and value from perpetual_storage_wiggle_locality.
|
||||
int split = localityKeyValue.find(':');
|
||||
localityKey = Optional<Value>(ValueRef((uint8_t*)localityKeyValue.c_str(), split));
|
||||
localityValue = Optional<Value>(
|
||||
ValueRef((uint8_t*)localityKeyValue.c_str() + split + 1, localityKeyValue.size() - split - 1));
|
||||
ParsePerpetualStorageWiggleLocality(configuration.perpetualStorageWiggleLocality, &localityKey, &localityValue);
|
||||
}
|
||||
|
||||
return DDTeamCollectionImpl::getNextWigglingServerID(storageWiggler, localityKey, localityValue, this);
|
||||
|
|
|
@ -162,7 +162,7 @@ void DataMove::validateShard(const DDShardInfo& shard, KeyRangeRef range, int pr
|
|||
this->primaryDest.begin(), this->primaryDest.end(), shard.primaryDest.begin(), shard.primaryDest.end()) ||
|
||||
!std::equal(
|
||||
this->remoteDest.begin(), this->remoteDest.end(), shard.remoteDest.begin(), shard.remoteDest.end())) {
|
||||
TraceEvent(SevError, "DataMoveValidationError")
|
||||
TraceEvent(g_network->isSimulated() ? SevWarn : SevError, "DataMoveValidationError")
|
||||
.detail("Range", range)
|
||||
.detail("Reason", "DataMoveDestMissMatch")
|
||||
.detail("DataMoveMetaData", this->meta.toString())
|
||||
|
|
|
@ -348,7 +348,22 @@ public:
|
|||
std::set<Optional<Standalone<StringRef>>> includeDCs(req.includeDCs.begin(), req.includeDCs.end());
|
||||
std::set<AddressExclusion> excludedAddresses(req.excludeAddresses.begin(), req.excludeAddresses.end());
|
||||
|
||||
for (auto& it : id_worker)
|
||||
for (auto& it : id_worker) {
|
||||
TraceEvent(SevVerbose, "RecruitStorageTry")
|
||||
.detail("Worker", it.second.details.interf.address())
|
||||
.detail("WorkerAvailable", workerAvailable(it.second, false))
|
||||
.detail("RecoverDiskFiles", it.second.details.recoveredDiskFiles)
|
||||
.detail("NotExcludedMachine", !excludedMachines.count(it.second.details.interf.locality.zoneId()))
|
||||
.detail("IncludeDC",
|
||||
(includeDCs.size() == 0 || includeDCs.count(it.second.details.interf.locality.dcId())))
|
||||
.detail("NotExcludedAddress", !addressExcluded(excludedAddresses, it.second.details.interf.address()))
|
||||
.detail("NotExcludedAddress2",
|
||||
(!it.second.details.interf.secondaryAddress().present() ||
|
||||
!addressExcluded(excludedAddresses, it.second.details.interf.secondaryAddress().get())))
|
||||
.detail("MachineFitnessMatch",
|
||||
it.second.details.processClass.machineClassFitness(ProcessClass::Storage) <=
|
||||
ProcessClass::UnsetFit)
|
||||
.detail("MachineFitness", it.second.details.processClass.machineClassFitness(ProcessClass::Storage));
|
||||
if (workerAvailable(it.second, false) && it.second.details.recoveredDiskFiles &&
|
||||
!excludedMachines.count(it.second.details.interf.locality.zoneId()) &&
|
||||
(includeDCs.size() == 0 || includeDCs.count(it.second.details.interf.locality.dcId())) &&
|
||||
|
@ -358,6 +373,7 @@ public:
|
|||
it.second.details.processClass.machineClassFitness(ProcessClass::Storage) <= ProcessClass::UnsetFit) {
|
||||
return it.second.details;
|
||||
}
|
||||
}
|
||||
|
||||
if (req.criticalRecruitment) {
|
||||
ProcessClass::Fitness bestFit = ProcessClass::NeverAssign;
|
||||
|
|
|
@ -949,6 +949,33 @@ struct ConsistencyCheckWorkload : TestWorkload {
|
|||
if (!keyValueStoreType.present()) {
|
||||
TraceEvent("ConsistencyCheck_ServerUnavailable").detail("ServerID", storageServers[i].id());
|
||||
self->testFailure("Storage server unavailable");
|
||||
} else if (configuration.perpetualStoreType.storeType() != KeyValueStoreType::END) {
|
||||
// Perpetual storage wiggle is used to migrate storage. Check that the matched storage servers are
|
||||
// correctly migrated.
|
||||
if (wiggleLocalityKeyValue == "0" ||
|
||||
(storageServers[i].locality.get(wiggleLocalityKey).present() &&
|
||||
storageServers[i].locality.get(wiggleLocalityKey).get().toString() == wiggleLocalityValue)) {
|
||||
if (keyValueStoreType.get() != configuration.perpetualStoreType) {
|
||||
TraceEvent("ConsistencyCheck_WrongKeyValueStoreType")
|
||||
.detail("ServerID", storageServers[i].id())
|
||||
.detail("StoreType", keyValueStoreType.get().toString())
|
||||
.detail("DesiredType", configuration.perpetualStoreType.toString())
|
||||
.detail("IsPerpetualStoreType", true);
|
||||
self->testFailure("Storage server has wrong key-value store type");
|
||||
return true;
|
||||
}
|
||||
} else if ((!storageServers[i].isTss() &&
|
||||
keyValueStoreType.get() != configuration.storageServerStoreType) ||
|
||||
(storageServers[i].isTss() &&
|
||||
keyValueStoreType.get() != configuration.testingStorageServerStoreType)) {
|
||||
TraceEvent("ConsistencyCheck_WrongKeyValueStoreType")
|
||||
.detail("ServerID", storageServers[i].id())
|
||||
.detail("StoreType", keyValueStoreType.get().toString())
|
||||
.detail("DesiredType", configuration.perpetualStoreType.toString())
|
||||
.detail("IsPerpetualStoreType", false);
|
||||
self->testFailure("Storage server has wrong key-value store type");
|
||||
return true;
|
||||
}
|
||||
} else if (((!storageServers[i].isTss() &&
|
||||
keyValueStoreType.get() != configuration.storageServerStoreType) ||
|
||||
(storageServers[i].isTss() &&
|
||||
|
@ -959,7 +986,8 @@ struct ConsistencyCheckWorkload : TestWorkload {
|
|||
TraceEvent("ConsistencyCheck_WrongKeyValueStoreType")
|
||||
.detail("ServerID", storageServers[i].id())
|
||||
.detail("StoreType", keyValueStoreType.get().toString())
|
||||
.detail("DesiredType", configuration.storageServerStoreType.toString());
|
||||
.detail("DesiredType", configuration.storageServerStoreType.toString())
|
||||
.detail("IsPerpetualStoreType", false);
|
||||
self->testFailure("Storage server has wrong key-value store type");
|
||||
return true;
|
||||
}
|
||||
|
|
|
@ -45,6 +45,7 @@ bool storageWiggleStatsEqual(StorageWiggleMetrics const& a, StorageWiggleMetrics
|
|||
return res;
|
||||
}
|
||||
|
||||
namespace {
|
||||
ACTOR Future<bool> IssueConfigurationChange(Database cx, std::string config, bool force) {
|
||||
printf("Issuing configuration change: %s\n", config.c_str());
|
||||
state ConfigurationResult res = wait(ManagementAPI::changeConfig(cx.getReference(), config, force));
|
||||
|
@ -54,6 +55,7 @@ ACTOR Future<bool> IssueConfigurationChange(Database cx, std::string config, boo
|
|||
wait(delay(5.0)); // wait for read window
|
||||
return true;
|
||||
}
|
||||
} // namespace
|
||||
|
||||
// a wrapper for test protected method
|
||||
struct DDTeamCollectionTester : public DDTeamCollection {
|
||||
|
|
|
@ -0,0 +1,230 @@
|
|||
/*
|
||||
* PerpetualWiggleStorageMigrationWorkload.actor.cpp
|
||||
*
|
||||
* This source file is part of the FoundationDB open source project
|
||||
*
|
||||
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#include "fdbserver/DDTeamCollection.h"
|
||||
#include "fdbclient/FDBOptions.g.h"
|
||||
#include "fdbclient/ManagementAPI.actor.h"
|
||||
#include "fdbserver/DDSharedContext.h"
|
||||
#include "fdbserver/DDTxnProcessor.h"
|
||||
#include "fdbserver/MoveKeys.actor.h"
|
||||
#include "fdbclient/StorageServerInterface.h"
|
||||
#include "fdbserver/workloads/workloads.actor.h"
|
||||
#include "fdbclient/VersionedMap.h"
|
||||
#include "fdbclient/ReadYourWrites.h"
|
||||
#include "fdbrpc/SimulatorProcessInfo.h"
|
||||
|
||||
#include "flow/actorcompiler.h" // This must be the last #include.
|
||||
|
||||
namespace {
|
||||
ACTOR Future<bool> IssueConfigurationChange(Database cx, std::string config, bool force) {
|
||||
printf("Issuing configuration change: %s\n", config.c_str());
|
||||
state ConfigurationResult res = wait(ManagementAPI::changeConfig(cx.getReference(), config, force));
|
||||
if (res != ConfigurationResult::SUCCESS) {
|
||||
return false;
|
||||
}
|
||||
wait(delay(5.0)); // wait for read window
|
||||
return true;
|
||||
}
|
||||
} // namespace
|
||||
|
||||
struct PerpetualWiggleStorageMigrationWorkload : public TestWorkload {
|
||||
|
||||
static constexpr auto NAME = "PerpetualWiggleStorageMigrationWorkload";
|
||||
|
||||
PerpetualWiggleStorageMigrationWorkload(WorkloadContext const& wcx) : TestWorkload(wcx) {}
|
||||
|
||||
void disableFailureInjectionWorkloads(std::set<std::string>& out) const override {
|
||||
// This test requires exclude/include runs smoothly, so we disable all the failure injection workloads.
|
||||
out.insert("all");
|
||||
}
|
||||
|
||||
Future<Void> start(Database const& cx) override {
|
||||
if (clientId == 0) {
|
||||
return _start(this, cx);
|
||||
}
|
||||
return Void();
|
||||
};
|
||||
|
||||
Future<bool> check(Database const& cx) override { return true; };
|
||||
|
||||
ACTOR static Future<Void> _start(PerpetualWiggleStorageMigrationWorkload* self, Database cx) {
|
||||
state std::vector<StorageServerInterface> storageServers = wait(getStorageServers(cx));
|
||||
// The test should have enough storage servers to exclude.
|
||||
ASSERT(storageServers.size() > 3);
|
||||
|
||||
// Pick a storage process to exclude and later include. This process should always use storage engine from
|
||||
// `storage_engine` configuration.
|
||||
state StorageServerInterface ssToExcludeInclude =
|
||||
storageServers[deterministicRandom()->randomInt(0, storageServers.size())];
|
||||
state ISimulator::ProcessInfo* p = g_simulator->getProcessByAddress(ssToExcludeInclude.address());
|
||||
while (!p->isReliable()) {
|
||||
ssToExcludeInclude = storageServers[deterministicRandom()->randomInt(0, storageServers.size())];
|
||||
p = g_simulator->getProcessByAddress(ssToExcludeInclude.address());
|
||||
}
|
||||
|
||||
TraceEvent("Test_PickedProcessToExcludeInclude")
|
||||
.detail("ProcessID", ssToExcludeInclude.locality.processId())
|
||||
.detail("Address", ssToExcludeInclude.address());
|
||||
|
||||
// Pick a storage process to migrate to storage engine specified in `perpetual_storage_wiggle_engine`.
|
||||
state StorageServerInterface ssToWiggle =
|
||||
storageServers[deterministicRandom()->randomInt(0, storageServers.size())];
|
||||
while (ssToExcludeInclude.locality.processId() == ssToWiggle.locality.processId()) {
|
||||
ssToWiggle = storageServers[deterministicRandom()->randomInt(0, storageServers.size())];
|
||||
}
|
||||
TraceEvent("Test_PickedProcessToMigrate")
|
||||
.detail("ProcessID", ssToWiggle.locality.processId())
|
||||
.detail("Address", ssToWiggle.address());
|
||||
|
||||
// Issue a configuration change to ONLY migrate `ssToWiggle` using perpetual wiggle.
|
||||
std::string migrationLocality =
|
||||
LocalityData::keyProcessId.toString() + ":" + ssToWiggle.locality.processId()->toString();
|
||||
bool change =
|
||||
wait(IssueConfigurationChange(cx,
|
||||
"perpetual_storage_wiggle_engine=ssd-rocksdb-v1 perpetual_storage_wiggle=1 "
|
||||
"storage_migration_type=gradual perpetual_storage_wiggle_locality=" +
|
||||
migrationLocality,
|
||||
true));
|
||||
TraceEvent("Test_ConfigChangeDone").detail("Success", change);
|
||||
ASSERT(change);
|
||||
|
||||
// Now, let's exclude `ssToExcludeInclude` process and include it again. The new SS created on this process
|
||||
// should always uses `storage_engine` config, which is `ssd-2`.
|
||||
state std::vector<AddressExclusion> servers;
|
||||
servers.push_back(AddressExclusion(ssToExcludeInclude.address().ip, ssToExcludeInclude.address().port));
|
||||
|
||||
// Since we have enough storage servers and there won't be any failure, let's use exclude failed to make sure
|
||||
// the exclude process can succeed.
|
||||
wait(excludeServers(cx, servers, true));
|
||||
TraceEvent("Test_DoneExcludeServer").log();
|
||||
|
||||
try {
|
||||
std::set<NetworkAddress> inProgress = wait(checkForExcludingServers(cx, servers, true));
|
||||
ASSERT(inProgress.empty());
|
||||
} catch (Error& e) {
|
||||
if (e.code() == error_code_timed_out) {
|
||||
// it might never be excluded from serverList
|
||||
TraceEvent(SevError, "Test_WaitingForExclusionTakeTooLong").log();
|
||||
}
|
||||
throw e;
|
||||
}
|
||||
|
||||
TraceEvent("Test_CheckingExcludeServerDone").log();
|
||||
|
||||
// Include all the processes the cluster knows.
|
||||
wait(includeServers(cx, std::vector<AddressExclusion>(1)));
|
||||
TraceEvent("Test_IncludeServerDone").log();
|
||||
|
||||
wait(validateDatabase(cx, ssToExcludeInclude, ssToWiggle));
|
||||
return Void();
|
||||
}
|
||||
|
||||
ACTOR static Future<Void> validateDatabase(Database cx,
|
||||
StorageServerInterface ssToExcludeInclude,
|
||||
StorageServerInterface ssToWiggle) {
|
||||
// Wait until `ssToExcludeInclude` to be recruited as storage server again.
|
||||
state int missingTargetCount = 0;
|
||||
loop {
|
||||
std::vector<StorageServerInterface> allStorageServers = wait(getStorageServers(cx));
|
||||
bool foundTarget = false;
|
||||
for (auto& ss : allStorageServers) {
|
||||
if (ss.address() == ssToExcludeInclude.address()) {
|
||||
foundTarget = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (foundTarget) {
|
||||
break;
|
||||
}
|
||||
++missingTargetCount;
|
||||
if (missingTargetCount > 5) {
|
||||
// Sometimes, the excluded storage process may not be recruited as storage server again (depending on
|
||||
// the process class). So we don't wait indefinitely here.
|
||||
break;
|
||||
}
|
||||
wait(delay(20));
|
||||
}
|
||||
|
||||
// Wait until wiggle process to migrate to new storage engine.
|
||||
state int missingWiggleStorageCount = 0;
|
||||
state std::vector<StorageServerInterface> allSSes;
|
||||
state bool doneCheckingWiggleStorage = false;
|
||||
state bool containWiggleStorage = false;
|
||||
loop {
|
||||
std::vector<StorageServerInterface> SSes = wait(getStorageServers(cx));
|
||||
allSSes = SSes;
|
||||
|
||||
state int i = 0;
|
||||
containWiggleStorage = false;
|
||||
doneCheckingWiggleStorage = false;
|
||||
for (i = 0; i < allSSes.size(); ++i) {
|
||||
state StorageServerInterface ssInterface = allSSes[i];
|
||||
state ReplyPromise<KeyValueStoreType> typeReply;
|
||||
ErrorOr<KeyValueStoreType> keyValueStoreType =
|
||||
wait(ssInterface.getKeyValueStoreType.getReplyUnlessFailedFor(typeReply, 2, 0));
|
||||
if (keyValueStoreType.present()) {
|
||||
TraceEvent(SevDebug, "Test_KvStorageType")
|
||||
.detail("StorageServer", ssInterface.address())
|
||||
.detail("StorageType", keyValueStoreType.get().toString());
|
||||
|
||||
if (ssInterface.address() == ssToExcludeInclude.address()) {
|
||||
// If `ssToExcludeInclude` exists, it must remain using `storage_engine` type.
|
||||
ASSERT(keyValueStoreType.get().toString() == "ssd-2");
|
||||
}
|
||||
if (ssInterface.address() == ssToWiggle.address()) {
|
||||
// If `ssToWiggle` exists, we wait until it is migrate to `perpetual_storage_wiggle_engine`.
|
||||
containWiggleStorage = true;
|
||||
if (keyValueStoreType.get().toString() == "ssd-rocksdb-v1") {
|
||||
doneCheckingWiggleStorage = true;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
TraceEvent(SevDebug, "Test_KvStorageType")
|
||||
.detail("StorageServer", ssInterface.address())
|
||||
.detail("StorageType", "Unknown")
|
||||
.detail("Error", keyValueStoreType.getError().name());
|
||||
}
|
||||
}
|
||||
if (doneCheckingWiggleStorage) {
|
||||
break;
|
||||
}
|
||||
if (!containWiggleStorage) {
|
||||
++missingWiggleStorageCount;
|
||||
if (missingWiggleStorageCount > 5) {
|
||||
TraceEvent("Test_TimeoutWaitingForWiggleStorageToShowUp").log();
|
||||
break;
|
||||
}
|
||||
}
|
||||
wait(delay(20));
|
||||
}
|
||||
|
||||
if (!doneCheckingWiggleStorage) {
|
||||
// If we fail to validate that the wiggle storage has been migrated to new storage engine, sometimes it is
|
||||
// because after exclusion, the process may not be recruited as storage server, so we must not see it as a
|
||||
// storage engine in the last check.
|
||||
ASSERT(!containWiggleStorage);
|
||||
}
|
||||
return Void();
|
||||
}
|
||||
|
||||
void getMetrics(std::vector<PerfMetric>& m) override { return; }
|
||||
};
|
||||
|
||||
WorkloadFactory<PerpetualWiggleStorageMigrationWorkload> PerpetualWiggleStorageMigrationWorkload;
|
|
@ -275,6 +275,7 @@ if(WITH_PYTHON)
|
|||
add_fdb_test(TEST_FILES rare/InventoryTestHeavyWrites.toml)
|
||||
add_fdb_test(TEST_FILES rare/LargeApiCorrectness.toml)
|
||||
add_fdb_test(TEST_FILES rare/LargeApiCorrectnessStatus.toml)
|
||||
add_fdb_test(TEST_FILES rare/PerpetualWiggleStorageMigration.toml)
|
||||
add_fdb_test(TEST_FILES rare/RYWDisable.toml)
|
||||
add_fdb_test(TEST_FILES rare/RandomReadWriteTest.toml)
|
||||
add_fdb_test(TEST_FILES rare/ReadSkewReadWrite.toml)
|
||||
|
|
|
@ -0,0 +1,16 @@
|
|||
[configuration]
|
||||
config = 'triple'
|
||||
storageEngineType = 0
|
||||
processesPerMachine = 2
|
||||
coordinators = 3
|
||||
machineCount = 45
|
||||
asanMachineCount = 20
|
||||
encryptModes=['disabled']
|
||||
tenantModes=['disabled']
|
||||
|
||||
[[test]]
|
||||
testTitle = 'PerpetualWiggleStorageMigration'
|
||||
useDB = true
|
||||
|
||||
[[test.workload]]
|
||||
testName = 'PerpetualWiggleStorageMigrationWorkload'
|
Loading…
Reference in New Issue