Merge pull request #10856 from halfprice/zhewu/wiggle-locality-list

Make `perpetual_storage_wiggle_locality` database option to take a list of localities
This commit is contained in:
Zhe Wu 2023-09-20 15:42:25 -07:00 committed by GitHub
commit 87083652a3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 208 additions and 72 deletions

View File

@ -21,6 +21,7 @@
#include "fdbclient/FDBTypes.h"
#include "fdbclient/Knobs.h"
#include "fdbclient/NativeAPI.actor.h"
#include <boost/algorithm/string.hpp>
KeyRangeRef toPrefixRelativeRange(KeyRangeRef range, Optional<KeyRef> prefix) {
if (!prefix.present() || prefix.get().empty()) {
@ -251,4 +252,143 @@ KeyValueStoreType KeyValueStoreType::fromString(const std::string& str) {
throw unknown_storage_engine();
}
return it->second;
}
TEST_CASE("/PerpetualStorageWiggleLocality/Validation") {
ASSERT(isValidPerpetualStorageWiggleLocality("aaa:bbb"));
ASSERT(isValidPerpetualStorageWiggleLocality("aaa:bbb;ccc:ddd"));
ASSERT(isValidPerpetualStorageWiggleLocality("0"));
ASSERT(!isValidPerpetualStorageWiggleLocality("aaa:bbb;"));
ASSERT(!isValidPerpetualStorageWiggleLocality("aaa:bbb;ccc"));
ASSERT(!isValidPerpetualStorageWiggleLocality(""));
return Void();
}
std::vector<std::pair<Optional<Value>, Optional<Value>>> ParsePerpetualStorageWiggleLocality(
const std::string& localityKeyValues) {
// parsing format is like "datahall:0<;locality:filter>"
ASSERT(isValidPerpetualStorageWiggleLocality(localityKeyValues));
std::vector<std::pair<Optional<Value>, Optional<Value>>> parsedLocalities;
if (localityKeyValues == "0") {
return parsedLocalities;
}
std::vector<std::string> splitLocalityKeyValues;
boost::split(splitLocalityKeyValues, localityKeyValues, [](char c) { return c == ';'; });
for (const auto& localityKeyValue : splitLocalityKeyValues) {
ASSERT(!localityKeyValue.empty());
// get key and value from perpetual_storage_wiggle_locality.
int split = localityKeyValue.find(':');
auto key = Optional<Value>(ValueRef((uint8_t*)localityKeyValue.c_str(), split));
auto value = Optional<Value>(
ValueRef((uint8_t*)localityKeyValue.c_str() + split + 1, localityKeyValue.size() - split - 1));
parsedLocalities.push_back(std::make_pair(key, value));
}
return parsedLocalities;
}
bool localityMatchInList(const std::vector<std::pair<Optional<Value>, Optional<Value>>>& localityKeyValues,
const LocalityData& locality) {
for (const auto& [localityKey, localityValue] : localityKeyValues) {
if (locality.get(localityKey.get()) == localityValue) {
return true;
}
}
return false;
}
TEST_CASE("/PerpetualStorageWiggleLocality/ParsePerpetualStorageWiggleLocality") {
{
auto localityKeyValues = ParsePerpetualStorageWiggleLocality("aaa:bbb");
ASSERT(localityKeyValues.size() == 1);
ASSERT(localityKeyValues[0].first.get() == "aaa");
ASSERT(localityKeyValues[0].second.get() == "bbb");
{
LocalityData locality;
locality.set("aaa"_sr, "bbb"_sr);
ASSERT(localityMatchInList(localityKeyValues, locality));
}
{
LocalityData locality;
locality.set("aaa"_sr, "ccc"_sr);
ASSERT(!localityMatchInList(localityKeyValues, locality));
}
}
{
auto localityKeyValues = ParsePerpetualStorageWiggleLocality("aaa:bbb;ccc:ddd");
ASSERT(localityKeyValues.size() == 2);
ASSERT(localityKeyValues[0].first.get() == "aaa");
ASSERT(localityKeyValues[0].second.get() == "bbb");
ASSERT(localityKeyValues[1].first.get() == "ccc");
ASSERT(localityKeyValues[1].second.get() == "ddd");
{
LocalityData locality;
locality.set("aaa"_sr, "bbb"_sr);
ASSERT(localityMatchInList(localityKeyValues, locality));
}
{
LocalityData locality;
locality.set("ccc"_sr, "ddd"_sr);
ASSERT(localityMatchInList(localityKeyValues, locality));
}
{
LocalityData locality;
locality.set("aaa"_sr, "ddd"_sr);
ASSERT(!localityMatchInList(localityKeyValues, locality));
}
}
{
auto localityKeyValues = ParsePerpetualStorageWiggleLocality("aaa:111;bbb:222;ccc:3dd");
ASSERT(localityKeyValues.size() == 3);
ASSERT(localityKeyValues[0].first.get() == "aaa");
ASSERT(localityKeyValues[0].second.get() == "111");
ASSERT(localityKeyValues[1].first.get() == "bbb");
ASSERT(localityKeyValues[1].second.get() == "222");
ASSERT(localityKeyValues[2].first.get() == "ccc");
ASSERT(localityKeyValues[2].second.get() == "3dd");
{
LocalityData locality;
locality.set("aaa"_sr, "111"_sr);
ASSERT(localityMatchInList(localityKeyValues, locality));
}
{
LocalityData locality;
locality.set("bbb"_sr, "222"_sr);
ASSERT(localityMatchInList(localityKeyValues, locality));
}
{
LocalityData locality;
locality.set("ccc"_sr, "222"_sr);
ASSERT(!localityMatchInList(localityKeyValues, locality));
}
}
{
auto localityKeyValues = ParsePerpetualStorageWiggleLocality("0");
ASSERT(localityKeyValues.empty());
{
LocalityData locality;
locality.set("aaa"_sr, "111"_sr);
ASSERT(!localityMatchInList(localityKeyValues, locality));
}
}
return Void();
}

View File

@ -24,6 +24,7 @@
#include <algorithm>
#include <array>
#include <cinttypes>
#include <regex>
#include <set>
#include <string>
#include <variant>
@ -35,6 +36,7 @@
#include "flow/ProtocolVersion.h"
#include "flow/flow.h"
#include "fdbclient/Status.h"
#include "fdbrpc/Locality.h"
typedef int64_t Version;
typedef uint64_t LogEpoch;
@ -1612,12 +1614,23 @@ struct DatabaseSharedState {
: protocolVersion(currentProtocolVersion()), mutexLock(Mutex()), grvCacheSpace(GRVCacheSpace()), refCount(0) {}
};
const static std::regex wiggleLocalityValidation("(\\w+:\\w+)(;\\w+:\\w+)*");
inline bool isValidPerpetualStorageWiggleLocality(std::string locality) {
int pos = locality.find(':');
// locality should be either 0 or in the format '<non_empty_string>:<non_empty_string>'
return ((pos > 0 && pos < locality.size() - 1) || locality == "0");
if (locality == "0") {
return true;
}
return std::regex_match(locality, wiggleLocalityValidation);
}
// Parses `perpetual_storage_wiggle_locality` database option.
std::vector<std::pair<Optional<Value>, Optional<Value>>> ParsePerpetualStorageWiggleLocality(
const std::string& localityKeyValues);
// Whether the locality matches any locality filter in `localityKeyValues` (which is supposed to be parsed from
// ParsePerpetualStorageWiggleLocality).
bool localityMatchInList(const std::vector<std::pair<Optional<Value>, Optional<Value>>>& localityKeyValues,
const LocalityData& locality);
// matches what's in fdb_c.h
struct ReadBlobGranuleContext {
// User context to pass along to functions

View File

@ -42,19 +42,6 @@ auto get(MapContainer& m, K const& k) -> decltype(m.at(k)) {
return it->second;
}
void ParsePerpetualStorageWiggleLocality(const std::string& localityKeyValue,
Optional<Value>* localityKey,
Optional<Value>* localityValue) {
// parsing format is like "datahall:0"
ASSERT(isValidPerpetualStorageWiggleLocality(localityKeyValue));
// get key and value from perpetual_storage_wiggle_locality.
int split = localityKeyValue.find(':');
*localityKey = Optional<Value>(ValueRef((uint8_t*)localityKeyValue.c_str(), split));
*localityValue =
Optional<Value>(ValueRef((uint8_t*)localityKeyValue.c_str() + split + 1, localityKeyValue.size() - split - 1));
}
} // namespace
namespace data_distribution {
@ -2421,11 +2408,9 @@ public:
if (self->configuration.perpetualStorageWiggleLocality == "0") {
isr.storeType = self->configuration.perpetualStoreType;
} else {
Optional<Value> localityKey;
Optional<Value> localityValue;
ParsePerpetualStorageWiggleLocality(
self->configuration.perpetualStorageWiggleLocality, &localityKey, &localityValue);
if (candidateWorker.worker.locality.get(localityKey.get()) == localityValue) {
std::vector<std::pair<Optional<Value>, Optional<Value>>> localityKeyValues =
ParsePerpetualStorageWiggleLocality(self->configuration.perpetualStorageWiggleLocality);
if (localityMatchInList(localityKeyValues, candidateWorker.worker.locality)) {
isr.storeType = self->configuration.perpetualStoreType;
}
}
@ -2973,10 +2958,11 @@ public:
}
}
ACTOR static Future<UID> getNextWigglingServerID(Reference<StorageWiggler> wiggler,
Optional<Value> localityKey = Optional<Value>(),
Optional<Value> localityValue = Optional<Value>(),
DDTeamCollection* teamCollection = nullptr) {
ACTOR static Future<UID> getNextWigglingServerID(
Reference<StorageWiggler> wiggler,
std::vector<std::pair<Optional<Value>, Optional<Value>>> localityKeyValues =
std::vector<std::pair<Optional<Value>, Optional<Value>>>(),
DDTeamCollection* teamCollection = nullptr) {
ASSERT(wiggler->teamCollection == teamCollection);
loop {
// when the DC need more
@ -2988,12 +2974,11 @@ public:
}
// if perpetual_storage_wiggle_locality has value and not 0(disabled).
if (localityKey.present()) {
if (!localityKeyValues.empty()) {
// Whether the selected server matches the locality
auto server = teamCollection->server_info.at(id.get());
// TraceEvent("PerpetualLocality").detail("Server", server->getLastKnownInterface().locality.get(localityKey)).detail("Desire", localityValue);
if (server->getLastKnownInterface().locality.get(localityKey.get()) == localityValue) {
if (localityMatchInList(localityKeyValues, server->getLastKnownInterface().locality)) {
return id.get();
}
@ -3018,21 +3003,21 @@ public:
// SOMEDAY: support wiggle multiple SS at once
ASSERT(!self->wigglingId.present()); // only single process wiggle is allowed
Optional<Value> localityKey;
Optional<Value> localityValue;
std::vector<std::pair<Optional<Value>, Optional<Value>>> localityKeyValues;
if (self->configuration.perpetualStorageWiggleLocality != "0") {
ParsePerpetualStorageWiggleLocality(
self->configuration.perpetualStorageWiggleLocality, &localityKey, &localityValue);
localityKeyValues =
ParsePerpetualStorageWiggleLocality(self->configuration.perpetualStorageWiggleLocality);
}
// if perpetual_storage_wiggle_locality has value and not 0(disabled).
if (localityKey.present()) {
if (!localityKeyValues.empty()) {
if (self->server_info.count(res.begin()->first)) {
auto server = self->server_info.at(res.begin()->first);
// Update the wigglingId only if it matches the locality.
if (server->getLastKnownInterface().locality.get(localityKey.get()) == localityValue) {
self->wigglingId = res.begin()->first;
for (const auto& [localityKey, localityValue] : localityKeyValues) {
// Update the wigglingId only if it matches the locality.
if (server->getLastKnownInterface().locality.get(localityKey.get()) == localityValue) {
self->wigglingId = res.begin()->first;
}
}
}
} else {
@ -3956,16 +3941,14 @@ Future<Void> DDTeamCollection::monitorHealthyTeams() {
}
Future<UID> DDTeamCollection::getNextWigglingServerID() {
Optional<Value> localityKey;
Optional<Value> localityValue;
std::vector<std::pair<Optional<Value>, Optional<Value>>> localityKeyValues;
// NOTE: because normal \xff/conf change through `changeConfig` now will cause DD throw `movekeys_conflict()`
// then recruit a new DD, we only need to read current configuration once
if (configuration.perpetualStorageWiggleLocality != "0") {
ParsePerpetualStorageWiggleLocality(configuration.perpetualStorageWiggleLocality, &localityKey, &localityValue);
localityKeyValues = ParsePerpetualStorageWiggleLocality(configuration.perpetualStorageWiggleLocality);
}
return DDTeamCollectionImpl::getNextWigglingServerID(storageWiggler, localityKey, localityValue, this);
return DDTeamCollectionImpl::getNextWigglingServerID(storageWiggler, localityKeyValues, this);
}
Future<Void> DDTeamCollection::readStorageWiggleMap() {
@ -6740,8 +6723,7 @@ TEST_CASE("/DataDistribution/StorageWiggler/NextIdWithTSS") {
KeyValueStoreType::SSD_BTREE_V2));
ASSERT(!wiggler->getNextServerId(true).present());
ASSERT(wiggler->getNextServerId(collection->reachTSSPairTarget()) == UID(1, 0));
UID id = wait(
DDTeamCollectionImpl::getNextWigglingServerID(wiggler, Optional<Value>(), Optional<Value>(), collection.get()));
UID id = wait(DDTeamCollectionImpl::getNextWigglingServerID(wiggler, {}, collection.get()));
ASSERT(now() - startTime < SERVER_KNOBS->DD_STORAGE_WIGGLE_MIN_SS_AGE_SEC + 150.0);
ASSERT(id == UID(2, 0));
return Void();

View File

@ -18,6 +18,8 @@
* limitations under the License.
*/
#include <algorithm>
#include "fdbclient/FDBTypes.h"
#include "fdbclient/NativeAPI.actor.h"
#include "fdbserver/TesterInterface.actor.h"
@ -305,14 +307,9 @@ struct ConfigureDatabaseWorkload : TestWorkload {
state DatabaseConfiguration conf = wait(getDatabaseConfiguration(cx));
state std::string wiggleLocalityKeyValue = conf.perpetualStorageWiggleLocality;
state std::string wiggleLocalityKey;
state std::string wiggleLocalityValue;
state std::vector<std::pair<Optional<Value>, Optional<Value>>> wiggleLocalityKeyValues =
ParsePerpetualStorageWiggleLocality(wiggleLocalityKeyValue);
state int i;
if (wiggleLocalityKeyValue != "0") {
int split = wiggleLocalityKeyValue.find(':');
wiggleLocalityKey = wiggleLocalityKeyValue.substr(0, split);
wiggleLocalityValue = wiggleLocalityKeyValue.substr(split + 1);
}
state bool pass = true;
state std::vector<StorageServerInterface> storageServers = wait(getStorageServers(cx));
@ -321,8 +318,7 @@ struct ConfigureDatabaseWorkload : TestWorkload {
// Check that each storage server has the correct key value store type
if (!storageServers[i].isTss() &&
(wiggleLocalityKeyValue == "0" ||
(storageServers[i].locality.get(wiggleLocalityKey).present() &&
storageServers[i].locality.get(wiggleLocalityKey).get().toString() == wiggleLocalityValue))) {
localityMatchInList(wiggleLocalityKeyValues, storageServers[i].locality))) {
ReplyPromise<KeyValueStoreType> typeReply;
ErrorOr<KeyValueStoreType> keyValueStoreType =
wait(storageServers[i].getKeyValueStoreType.getReplyUnlessFailedFor(typeReply, 2, 0));
@ -478,19 +474,31 @@ struct ConfigureDatabaseWorkload : TestWorkload {
state std::string randomPerpetualWiggleLocality;
if (deterministicRandom()->random01() < 0.25) {
state std::vector<StorageServerInterface> storageServers = wait(getStorageServers(cx));
StorageServerInterface randomSS =
storageServers[deterministicRandom()->randomInt(0, storageServers.size())];
std::string localityFilter;
int selectSSCount =
deterministicRandom()->randomInt(1, std::min(4, (int)(storageServers.size())));
std::vector<StringRef> localityKeys = { LocalityData::keyDcId,
LocalityData::keyDataHallId,
LocalityData::keyZoneId,
LocalityData::keyMachineId,
LocalityData::keyProcessId };
StringRef randomLocalityKey =
localityKeys[deterministicRandom()->randomInt(0, localityKeys.size())];
if (randomSS.locality.isPresent(randomLocalityKey)) {
randomPerpetualWiggleLocality =
" perpetual_storage_wiggle_locality=" + randomLocalityKey.toString() + ":" +
randomSS.locality.get(randomLocalityKey).get().toString();
for (int i = 0; i < selectSSCount; ++i) {
StorageServerInterface randomSS =
storageServers[deterministicRandom()->randomInt(0, storageServers.size())];
StringRef randomLocalityKey =
localityKeys[deterministicRandom()->randomInt(0, localityKeys.size())];
if (randomSS.locality.isPresent(randomLocalityKey)) {
if (localityFilter.size() > 0) {
localityFilter += ";";
}
localityFilter += randomLocalityKey.toString() + ":" +
randomSS.locality.get(randomLocalityKey).get().toString();
}
}
if (localityFilter.size() > 0) {
TraceEvent("ConfigureTestSettingWiggleLocality").detail("LocalityFilter", localityFilter);
randomPerpetualWiggleLocality = " perpetual_storage_wiggle_locality=" + localityFilter;
}
}

View File

@ -931,13 +931,8 @@ struct ConsistencyCheckWorkload : TestWorkload {
state int j;
state std::vector<StorageServerInterface> storageServers = wait(getStorageServers(cx));
state std::string wiggleLocalityKeyValue = configuration.perpetualStorageWiggleLocality;
state std::string wiggleLocalityKey;
state std::string wiggleLocalityValue;
if (wiggleLocalityKeyValue != "0") {
int split = wiggleLocalityKeyValue.find(':');
wiggleLocalityKey = wiggleLocalityKeyValue.substr(0, split);
wiggleLocalityValue = wiggleLocalityKeyValue.substr(split + 1);
}
state std::vector<std::pair<Optional<Value>, Optional<Value>>> wiggleLocalityKeyValues =
ParsePerpetualStorageWiggleLocality(configuration.perpetualStorageWiggleLocality);
// Check each pair of storage servers for an address match
for (i = 0; i < storageServers.size(); i++) {
@ -953,8 +948,7 @@ struct ConsistencyCheckWorkload : TestWorkload {
// Perpetual storage wiggle is used to migrate storage. Check that the matched storage servers are
// correctly migrated.
if (wiggleLocalityKeyValue == "0" ||
(storageServers[i].locality.get(wiggleLocalityKey).present() &&
storageServers[i].locality.get(wiggleLocalityKey).get().toString() == wiggleLocalityValue)) {
localityMatchInList(wiggleLocalityKeyValues, storageServers[i].locality)) {
if (keyValueStoreType.get() != configuration.perpetualStoreType) {
TraceEvent("ConsistencyCheck_WrongKeyValueStoreType")
.detail("ServerID", storageServers[i].id())
@ -981,8 +975,7 @@ struct ConsistencyCheckWorkload : TestWorkload {
(storageServers[i].isTss() &&
keyValueStoreType.get() != configuration.testingStorageServerStoreType)) &&
(wiggleLocalityKeyValue == "0" ||
(storageServers[i].locality.get(wiggleLocalityKey).present() &&
storageServers[i].locality.get(wiggleLocalityKey).get().toString() == wiggleLocalityValue))) {
localityMatchInList(wiggleLocalityKeyValues, storageServers[i].locality))) {
TraceEvent("ConsistencyCheck_WrongKeyValueStoreType")
.detail("ServerID", storageServers[i].id())
.detail("StoreType", keyValueStoreType.get().toString())