Shard based move (#6981)

* Shard based move.

* Clean up.

* Clear results on retry in getInitialDataDistribution.

* Remove assertion on SHARD_ENCODE_LOCATION_METADATA for compatibility.

* Resolved comments.

Co-authored-by: He Liu <heliu@apple.com>
He Liu 2022-07-07 20:49:16 -07:00 committed by GitHub
parent e63cb431e1
commit bc5bfaffda
30 changed files with 2054 additions and 213 deletions

View File

@ -111,6 +111,7 @@ void ClientKnobs::initialize(Randomize randomize) {
init( RANGESTREAM_BUFFERED_FRAGMENTS_LIMIT, 20 );
init( QUARANTINE_TSS_ON_MISMATCH, true ); if( randomize && BUGGIFY ) QUARANTINE_TSS_ON_MISMATCH = false; // if true, a tss mismatch will put the offending tss in quarantine. If false, it will just be killed
init( CHANGE_FEED_EMPTY_BATCH_TIME, 0.005 );
init( SHARD_ENCODE_LOCATION_METADATA, false ); if( randomize && BUGGIFY ) SHARD_ENCODE_LOCATION_METADATA = true;
//KeyRangeMap
init( KRM_GET_RANGE_LIMIT, 1e5 ); if( randomize && BUGGIFY ) KRM_GET_RANGE_LIMIT = 10;

View File

@ -7900,8 +7900,9 @@ Future<Standalone<VectorRef<BlobGranuleChunkRef>>> Transaction::readBlobGranules
return readBlobGranulesActor(this, range, begin, readVersion, readVersionOut);
}
ACTOR Future<Void> setPerpetualStorageWiggle(Database cx, bool enable, LockAware lockAware) {
ACTOR Future<Version> setPerpetualStorageWiggle(Database cx, bool enable, LockAware lockAware) {
state ReadYourWritesTransaction tr(cx);
state Version version = invalidVersion;
loop {
try {
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
@ -7911,12 +7912,13 @@ ACTOR Future<Void> setPerpetualStorageWiggle(Database cx, bool enable, LockAware
tr.set(perpetualStorageWiggleKey, enable ? "1"_sr : "0"_sr);
wait(tr.commit());
version = tr.getCommittedVersion();
break;
} catch (Error& e) {
wait(tr.onError(e));
}
}
return Void();
return version;
}
ACTOR Future<std::vector<std::pair<UID, StorageWiggleValue>>> readStorageWiggleValues(Database cx,
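With the new return type, a caller can observe the version at which the wiggle-flag update committed. A hypothetical caller sketch (not part of this commit; the actor and event names are made up for illustration):

ACTOR Future<Void> toggleWiggleExample(Database cx) {
    Version committedVersion = wait(setPerpetualStorageWiggle(cx, true));
    TraceEvent("PerpetualWiggleToggledExample").detail("CommittedVersion", committedVersion);
    return Void();
}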

View File

@ -584,6 +584,8 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( REMOVE_RETRY_DELAY, 1.0 );
init( MOVE_KEYS_KRM_LIMIT, 2000 ); if( randomize && BUGGIFY ) MOVE_KEYS_KRM_LIMIT = 2;
init( MOVE_KEYS_KRM_LIMIT_BYTES, 1e5 ); if( randomize && BUGGIFY ) MOVE_KEYS_KRM_LIMIT_BYTES = 5e4; //This must be sufficiently larger than CLIENT_KNOBS->KEY_SIZE_LIMIT (fdbclient/Knobs.h) to ensure that at least two entries will be returned from an attempt to read a key range map
init( MOVE_SHARD_KRM_ROW_LIMIT, 20000 );
init( MOVE_SHARD_KRM_BYTE_LIMIT, 1e6 );
init( MAX_SKIP_TAGS, 1 ); //The TLogs require tags to be densely packed to be memory efficient, so be careful increasing this knob
init( MAX_ADDED_SOURCES_MULTIPLIER, 2.0 );

View File

@ -27,6 +27,9 @@
#include "flow/serialize.h"
#include "flow/UnitTest.h"
FDB_DEFINE_BOOLEAN_PARAM(AssignEmptyRange);
FDB_DEFINE_BOOLEAN_PARAM(UnassignShard);
const KeyRef systemKeysPrefix = LiteralStringRef("\xff");
const KeyRangeRef normalKeys(KeyRef(), systemKeysPrefix);
const KeyRangeRef systemKeys(systemKeysPrefix, LiteralStringRef("\xff\xff"));
@ -43,6 +46,10 @@ const KeyRangeRef keyServersKeyServersKeys(LiteralStringRef("\xff/keyServers/\xf
LiteralStringRef("\xff/keyServers/\xff/keyServers0"));
const KeyRef keyServersKeyServersKey = keyServersKeyServersKeys.begin;
// These constants are selected to be easily recognized during debugging.
const UID anonymousShardId = UID(0x666666, 0x88888888);
const uint64_t emptyShardId = 0x7777777;
const Key keyServersKey(const KeyRef& k) {
return k.withPrefix(keyServersPrefix);
}
@ -87,6 +94,21 @@ const Value keyServersValue(RangeResult result, const std::vector<UID>& src, con
return keyServersValue(srcTag, destTag);
}
const Value keyServersValue(const std::vector<UID>& src,
const std::vector<UID>& dest,
const UID& srcID,
const UID& destID) {
BinaryWriter wr(IncludeVersion(ProtocolVersion::withShardEncodeLocationMetaData()));
if (dest.empty()) {
ASSERT(!destID.isValid());
wr << src << dest << srcID;
} else {
wr << src << dest << srcID << destID;
}
return wr.toValue();
}
const Value keyServersValue(const std::vector<Tag>& srcTag, const std::vector<Tag>& destTag) {
// src and dest are expected to be sorted
BinaryWriter wr(IncludeVersion(ProtocolVersion::withKeyServerValueV2()));
@ -106,6 +128,11 @@ void decodeKeyServersValue(RangeResult result,
}
BinaryReader rd(value, IncludeVersion());
if (rd.protocolVersion().hasShardEncodeLocationMetaData()) {
UID srcId, destId;
decodeKeyServersValue(result, value, src, dest, srcId, destId);
return;
}
if (!rd.protocolVersion().hasKeyServerValueV2()) {
rd >> src >> dest;
return;
@ -145,6 +172,42 @@ void decodeKeyServersValue(RangeResult result,
}
}
void decodeKeyServersValue(RangeResult result,
const ValueRef& value,
std::vector<UID>& src,
std::vector<UID>& dest,
UID& srcID,
UID& destID,
bool missingIsError) {
src.clear();
dest.clear();
srcID = UID();
destID = UID();
if (value.size() == 0) {
return;
}
BinaryReader rd(value, IncludeVersion());
if (rd.protocolVersion().hasShardEncodeLocationMetaData()) {
rd >> src >> dest >> srcID;
if (rd.empty()) {
ASSERT(dest.empty());
} else {
rd >> destID;
rd.assertEnd();
}
} else {
decodeKeyServersValue(result, value, src, dest, missingIsError);
if (!src.empty()) {
srcID = anonymousShardId;
}
if (!dest.empty()) {
destID = anonymousShardId;
}
}
}
void decodeKeyServersValue(std::map<Tag, UID> const& tag_uid,
const ValueRef& value,
std::vector<UID>& src,
@ -167,6 +230,16 @@ void decodeKeyServersValue(std::map<Tag, UID> const& tag_uid,
if (value.size() !=
sizeof(ProtocolVersion) + sizeof(int) + srcLen * sizeof(Tag) + sizeof(int) + destLen * sizeof(Tag)) {
rd >> src >> dest;
if (rd.protocolVersion().hasShardEncodeLocationMetaData()) {
UID srcId, destId;
rd >> srcId;
if (rd.empty()) {
ASSERT(dest.empty());
destId = UID();
} else {
rd >> destId;
}
}
rd.assertEnd();
return;
}
@ -242,6 +315,33 @@ CheckpointMetaData decodeCheckpointValue(const ValueRef& value) {
return checkpoint;
}
// "\xff/dataMoves/[[UID]] := [[DataMoveMetaData]]"
const KeyRangeRef dataMoveKeys("\xff/dataMoves/"_sr, "\xff/dataMoves0"_sr);
const Key dataMoveKeyFor(UID dataMoveId) {
BinaryWriter wr(Unversioned());
wr.serializeBytes(dataMoveKeys.begin);
wr << dataMoveId;
return wr.toValue();
}
const Value dataMoveValue(const DataMoveMetaData& dataMoveMetaData) {
return ObjectWriter::toValue(dataMoveMetaData, IncludeVersion());
}
UID decodeDataMoveKey(const KeyRef& key) {
UID id;
BinaryReader rd(key.removePrefix(dataMoveKeys.begin), Unversioned());
rd >> id;
return id;
}
DataMoveMetaData decodeDataMoveValue(const ValueRef& value) {
DataMoveMetaData dataMove;
ObjectReader reader(value.begin(), IncludeVersion());
reader.deserialize(dataMove);
return dataMove;
}
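As a quick sanity check on the new codec pair, a hypothetical round trip (not part of the diff, using only helpers introduced in this commit) would be expected to preserve the data move id:

UID id = deterministicRandom()->randomUniqueID();
DataMoveMetaData meta(id, normalKeys);
meta.setPhase(DataMoveMetaData::Prepare);
ASSERT(decodeDataMoveKey(dataMoveKeyFor(id)) == id);
ASSERT(decodeDataMoveValue(dataMoveValue(meta)).id == id);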
// "\xff/cacheServer/[[UID]] := StorageServerInterface"
const KeyRangeRef storageCacheServerKeys(LiteralStringRef("\xff/cacheServer/"), LiteralStringRef("\xff/cacheServer0"));
const KeyRef storageCacheServersPrefix = storageCacheServerKeys.begin;
@ -308,6 +408,20 @@ const ValueRef serverKeysTrue = "1"_sr, // compatible with what was serverKeysTr
serverKeysTrueEmptyRange = "3"_sr, // the server treats the range as empty.
serverKeysFalse;
const UID newShardId(const uint64_t physicalShardId, AssignEmptyRange assignEmptyRange, UnassignShard unassignShard) {
uint64_t split = 0;
if (assignEmptyRange) {
split = emptyShardId;
} else if (unassignShard) {
split = 0;
} else {
do {
split = deterministicRandom()->randomUInt64();
} while (split == anonymousShardId.second() || split == 0 || split == emptyShardId);
}
return UID(physicalShardId, split);
}
const Key serverKeysKey(UID serverID, const KeyRef& key) {
BinaryWriter wr(Unversioned());
wr.serializeBytes(serverKeysPrefix);
@ -342,7 +456,43 @@ std::pair<UID, Key> serverKeysDecodeServerBegin(const KeyRef& key) {
}
bool serverHasKey(ValueRef storedValue) {
return storedValue == serverKeysTrue || storedValue == serverKeysTrueEmptyRange;
UID teamId;
bool assigned, emptyRange;
decodeServerKeysValue(storedValue, assigned, emptyRange, teamId);
return assigned;
}
const Value serverKeysValue(const UID& id) {
if (!id.isValid()) {
return serverKeysFalse;
}
BinaryWriter wr(IncludeVersion(ProtocolVersion::withShardEncodeLocationMetaData()));
wr << id;
return wr.toValue();
}
void decodeServerKeysValue(const ValueRef& value, bool& assigned, bool& emptyRange, UID& id) {
if (value.size() == 0) {
id = UID();
assigned = false;
emptyRange = false;
} else if (value == serverKeysTrue) {
assigned = true;
emptyRange = false;
} else if (value == serverKeysTrueEmptyRange) {
assigned = true;
emptyRange = true;
} else if (value == serverKeysFalse) {
assigned = false;
emptyRange = false;
} else {
BinaryReader rd(value, IncludeVersion());
ASSERT(rd.protocolVersion().hasShardEncodeLocationMetaData());
rd >> id;
assigned = id.second() != 0;
emptyRange = id.second() == emptyShardId;
}
}
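The assignment semantics packed into the low 64 bits of the shard id (the UID's second() field) can be summarized with a standalone model. This is plain C++, not FDB code, and it only covers the new encoded form, not the legacy literal values handled earlier in decodeServerKeysValue; kEmptyShardId mirrors emptyShardId above:

#include <cassert>
#include <cstdint>

constexpr uint64_t kEmptyShardId = 0x7777777;

struct ShardAssignment {
    bool assigned;   // range is assigned to this server
    bool emptyRange; // server should treat the range as empty
};

ShardAssignment classify(uint64_t split) {
    return ShardAssignment{ split != 0, split == kEmptyShardId };
}

int main() {
    assert(!classify(0).assigned);                                                   // UnassignShard
    assert(classify(kEmptyShardId).assigned && classify(kEmptyShardId).emptyRange);  // AssignEmptyRange
    assert(classify(0xdeadbeefull).assigned && !classify(0xdeadbeefull).emptyRange); // normal shard
    return 0;
}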
const KeyRef cacheKeysPrefix = LiteralStringRef("\xff\x02/cacheKeys/");
@ -1466,3 +1616,91 @@ TEST_CASE("/SystemData/SerDes/SSI") {
return Void();
}
// Tests compatibility of different keyServersValue() and decodeKeyServersValue().
TEST_CASE("noSim/SystemData/compat/KeyServers") {
printf("testing keyServers serdes\n");
std::vector<UID> src, dest;
std::map<Tag, UID> tag_uid;
std::map<UID, Tag> uid_tag;
std::vector<Tag> srcTag, destTag;
const int n = 3;
int8_t locality = 1;
uint16_t id = 1;
UID srcId = deterministicRandom()->randomUniqueID(), destId = deterministicRandom()->randomUniqueID();
for (int i = 0; i < n; ++i) {
src.push_back(deterministicRandom()->randomUniqueID());
tag_uid.emplace(Tag(locality, id), src.back());
uid_tag.emplace(src.back(), Tag(locality, id++));
dest.push_back(deterministicRandom()->randomUniqueID());
tag_uid.emplace(Tag(locality, id), dest.back());
uid_tag.emplace(dest.back(), Tag(locality, id++));
}
std::sort(src.begin(), src.end());
std::sort(dest.begin(), dest.end());
RangeResult idTag;
for (int i = 0; i < src.size(); ++i) {
idTag.push_back_deep(idTag.arena(), KeyValueRef(serverTagKeyFor(src[i]), serverTagValue(uid_tag[src[i]])));
}
for (int i = 0; i < dest.size(); ++i) {
idTag.push_back_deep(idTag.arena(), KeyValueRef(serverTagKeyFor(dest[i]), serverTagValue(uid_tag[dest[i]])));
}
auto decodeAndVerify =
[&src, &dest, &tag_uid, &idTag](ValueRef v, const UID expectedSrcId, const UID expectedDestId) {
std::vector<UID> resSrc, resDest;
UID resSrcId, resDestId;
decodeKeyServersValue(idTag, v, resSrc, resDest, resSrcId, resDestId);
TraceEvent("VerifyKeyServersSerDes")
.detail("ExpectedSrc", describe(src))
.detail("ActualSrc", describe(resSrc))
.detail("ExpectedDest", describe(dest))
.detail("ActualDest", describe(resDest))
.detail("ExpectedDestID", expectedDestId)
.detail("ActualDestID", resDestId)
.detail("ExpectedSrcID", expectedSrcId)
.detail("ActualSrcID", resSrcId);
ASSERT(std::equal(src.begin(), src.end(), resSrc.begin()));
ASSERT(std::equal(dest.begin(), dest.end(), resDest.begin()));
ASSERT(resSrcId == expectedSrcId);
ASSERT(resDestId == expectedDestId);
resSrc.clear();
resDest.clear();
decodeKeyServersValue(idTag, v, resSrc, resDest);
ASSERT(std::equal(src.begin(), src.end(), resSrc.begin()));
ASSERT(std::equal(dest.begin(), dest.end(), resDest.begin()));
resSrc.clear();
resDest.clear();
decodeKeyServersValue(tag_uid, v, resSrc, resDest);
ASSERT(std::equal(src.begin(), src.end(), resSrc.begin()));
ASSERT(std::equal(dest.begin(), dest.end(), resDest.begin()));
};
Value v = keyServersValue(src, dest, srcId, destId);
decodeAndVerify(v, srcId, destId);
printf("ssi serdes test part.1 complete\n");
v = keyServersValue(idTag, src, dest);
decodeAndVerify(v, anonymousShardId, anonymousShardId);
printf("ssi serdes test part.2 complete\n");
dest.clear();
destId = UID();
v = keyServersValue(src, dest, srcId, destId);
decodeAndVerify(v, srcId, destId);
printf("ssi serdes test part.3 complete\n");
v = keyServersValue(idTag, src, dest);
decodeAndVerify(v, anonymousShardId, UID());
printf("ssi serdes test complete\n");
return Void();
}
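The backward-compatibility rule this test exercises can be condensed into a standalone model. This is plain C++ with simplified container types, not FDB serialization code; kAnonymousShardId mirrors anonymousShardId.second() above:

#include <cassert>
#include <cstdint>
#include <optional>
#include <vector>

constexpr uint64_t kAnonymousShardId = 0x88888888;

struct Decoded {
    std::vector<int> src, dest;
    uint64_t srcId = 0, destId = 0;
};

// Old-format values carry no shard ids, so the reader synthesizes the anonymous
// sentinel for any non-empty server list; new-format values store the ids directly,
// and destId is present only when dest is non-empty.
Decoded decode(const std::vector<int>& src,
               const std::vector<int>& dest,
               std::optional<uint64_t> encodedSrcId,
               std::optional<uint64_t> encodedDestId) {
    Decoded d{ src, dest };
    if (encodedSrcId) {
        d.srcId = *encodedSrcId;
        d.destId = encodedDestId.value_or(0);
    } else {
        if (!src.empty()) d.srcId = kAnonymousShardId;
        if (!dest.empty()) d.destId = kAnonymousShardId;
    }
    return d;
}

int main() {
    assert(decode({ 1, 2, 3 }, {}, std::nullopt, std::nullopt).srcId == kAnonymousShardId);
    assert(decode({ 1, 2, 3 }, { 4 }, 42, 43).destId == 43);
    return 0;
}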

View File

@ -110,6 +110,7 @@ public:
int RANGESTREAM_BUFFERED_FRAGMENTS_LIMIT;
bool QUARANTINE_TSS_ON_MISMATCH;
double CHANGE_FEED_EMPTY_BATCH_TIME;
bool SHARD_ENCODE_LOCATION_METADATA;
// KeyRangeMap
int KRM_GET_RANGE_LIMIT;

View File

@ -537,7 +537,8 @@ inline uint64_t getWriteOperationCost(uint64_t bytes) {
// Create a transaction to set the value of system key \xff/conf/perpetual_storage_wiggle. If enable == true, the value
// will be 1. Otherwise, the value will be 0.
ACTOR Future<Void> setPerpetualStorageWiggle(Database cx, bool enable, LockAware lockAware = LockAware::False);
// Returns the FDB version at which the transaction was committed.
ACTOR Future<Version> setPerpetualStorageWiggle(Database cx, bool enable, LockAware lockAware = LockAware::False);
ACTOR Future<std::vector<std::pair<UID, StorageWiggleValue>>> readStorageWiggleValues(Database cx,
bool primary,

View File

@ -520,6 +520,8 @@ public:
int MOVE_KEYS_KRM_LIMIT_BYTES; // This must be sufficiently larger than CLIENT_KNOBS->KEY_SIZE_LIMIT
// (fdbclient/Knobs.h) to ensure that at least two entries will be returned from an
// attempt to read a key range map
int MOVE_SHARD_KRM_ROW_LIMIT;
int MOVE_SHARD_KRM_BYTE_LIMIT;
int MAX_SKIP_TAGS;
double MAX_ADDED_SOURCES_MULTIPLIER;

View File

@ -86,4 +86,45 @@ struct CheckpointMetaData {
}
};
#endif
// A DataMoveMetaData object corresponds to a single data move.
struct DataMoveMetaData {
enum Phase {
InvalidPhase = 0,
Prepare = 1, // System keyspace is being modified.
Running = 2, // System keyspace has been modified, data move in action.
Completing = 3, // Data transfer has finished, finalizing system keyspace.
Deleting = 4, // Data move is cancelled.
};
constexpr static FileIdentifier file_identifier = 13804362;
UID id; // A unique id for this data move.
Version version;
KeyRange range;
int priority;
std::set<UID> src;
std::set<UID> dest;
int16_t phase; // DataMoveMetaData::Phase.
DataMoveMetaData() = default;
DataMoveMetaData(UID id, Version version, KeyRange range)
: id(id), version(version), range(std::move(range)), priority(0) {}
DataMoveMetaData(UID id, KeyRange range) : id(id), version(invalidVersion), range(std::move(range)), priority(0) {}
Phase getPhase() const { return static_cast<Phase>(phase); }
void setPhase(Phase phase) { this->phase = static_cast<int16_t>(phase); }
std::string toString() const {
std::string res = "DataMoveMetaData: [ID]: " + id.shortString() + " [Range]: " + range.toString() +
" [Phase]: " + std::to_string(static_cast<int>(phase)) +
" [Source Servers]: " + describe(src) + " [Destination Servers]: " + describe(dest);
return res;
}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, id, version, range, phase, src, dest);
}
};
#endif
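A toy model of the successful Phase progression described by the comments above (plain C++, not FDB code); cancellation to Deleting is not modeled here:

#include <cassert>
#include <cstdint>

enum class Phase : int16_t { InvalidPhase = 0, Prepare, Running, Completing, Deleting };

// Advance one step along the successful path; Completing and Deleting are terminal here.
Phase nextOnSuccess(Phase p) {
    switch (p) {
    case Phase::Prepare:
        return Phase::Running;    // system keyspace has been modified
    case Phase::Running:
        return Phase::Completing; // data transfer has finished
    default:
        return p;
    }
}

int main() {
    Phase p = Phase::Prepare;
    p = nextOnSuccess(p);
    assert(p == Phase::Running);
    p = nextOnSuccess(p);
    assert(p == Phase::Completing);
    return 0;
}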

View File

@ -33,6 +33,9 @@
#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wunused-variable"
FDB_DECLARE_BOOLEAN_PARAM(AssignEmptyRange);
FDB_DECLARE_BOOLEAN_PARAM(UnassignShard);
struct RestoreLoaderInterface;
struct RestoreApplierInterface;
struct RestoreMasterInterface;
@ -49,14 +52,27 @@ extern const KeyRef afterAllKeys;
// An internal mapping of where shards are located in the database. [[begin]] is the start of the shard range
// and the result is a list of serverIDs or Tags where these shards are located. These values can be changed
// as data movement occurs.
// With ShardEncodeLocationMetaData, the encoding format is:
// "\xff/keyServers/[[begin]]" := "[[std::vector<serverID>, std::vector<serverID>], srcID, destID]", where srcID
// and destID are the source and destination `shard id`, respectively.
extern const KeyRangeRef keyServersKeys, keyServersKeyServersKeys;
extern const KeyRef keyServersPrefix, keyServersEnd, keyServersKeyServersKey;
// Used during the transition to the new location metadata format with shard IDs.
// If `SHARD_ENCODE_LOCATION_METADATA` is enabled, any shard that doesn't have a shard ID will be assigned this
// temporary ID, until a permanent ID is assigned to it.
extern const UID anonymousShardId;
extern const uint64_t assignedEmptyShardId;
const Key keyServersKey(const KeyRef& k);
const KeyRef keyServersKey(const KeyRef& k, Arena& arena);
const Value keyServersValue(RangeResult result,
const std::vector<UID>& src,
const std::vector<UID>& dest = std::vector<UID>());
const Value keyServersValue(const std::vector<Tag>& srcTag, const std::vector<Tag>& destTag = std::vector<Tag>());
const Value keyServersValue(const std::vector<UID>& src,
const std::vector<UID>& dest,
const UID& srcID,
const UID& destID);
// `result` must be the full result of getting serverTagKeys
void decodeKeyServersValue(RangeResult result,
const ValueRef& value,
@ -67,6 +83,13 @@ void decodeKeyServersValue(std::map<Tag, UID> const& tag_uid,
const ValueRef& value,
std::vector<UID>& src,
std::vector<UID>& dest);
void decodeKeyServersValue(RangeResult result,
const ValueRef& value,
std::vector<UID>& src,
std::vector<UID>& dest,
UID& srcID,
UID& destID,
bool missingIsError = true);
extern const KeyRef clusterIdKey;
@ -77,6 +100,13 @@ const Value checkpointValue(const CheckpointMetaData& checkpoint);
UID decodeCheckpointKey(const KeyRef& key);
CheckpointMetaData decodeCheckpointValue(const ValueRef& value);
// "\xff/dataMoves/[[UID]] := [[DataMoveMetaData]]"
extern const KeyRangeRef dataMoveKeys;
const Key dataMoveKeyFor(UID dataMoveId);
const Value dataMoveValue(const DataMoveMetaData& dataMove);
UID decodeDataMoveKey(const KeyRef& key);
DataMoveMetaData decodeDataMoveValue(const ValueRef& value);
// "\xff/storageCacheServer/[[UID]] := StorageServerInterface"
// This will be added by the cache server on initialization and removed by DD
// TODO[mpilman]: We will need a way to map uint16_t ids to UIDs in a future
@ -102,11 +132,16 @@ void decodeStorageCacheValue(const ValueRef& value, std::vector<uint16_t>& serve
extern const KeyRangeRef serverKeysRange;
extern const KeyRef serverKeysPrefix;
extern const ValueRef serverKeysTrue, serverKeysTrueEmptyRange, serverKeysFalse;
const UID newShardId(const uint64_t physicalShardId,
AssignEmptyRange assignEmptyRange,
UnassignShard unassignShard = UnassignShard::False);
const Key serverKeysKey(UID serverID, const KeyRef& keys);
const Key serverKeysPrefixFor(UID serverID);
UID serverKeysDecodeServer(const KeyRef& key);
std::pair<UID, Key> serverKeysDecodeServerBegin(const KeyRef& key);
bool serverHasKey(ValueRef storedValue);
const Value serverKeysValue(const UID& id);
void decodeServerKeysValue(const ValueRef& value, bool& assigned, bool& emptyRange, UID& id);
extern const KeyRangeRef conflictingKeysRange;
extern const ValueRef conflictingKeysTrue, conflictingKeysFalse;

View File

@ -46,6 +46,7 @@ FDB_DEFINE_BOOLEAN_PARAM(PreferLowerDiskUtil);
FDB_DEFINE_BOOLEAN_PARAM(TeamMustHaveShards);
FDB_DEFINE_BOOLEAN_PARAM(ForReadBalance);
FDB_DEFINE_BOOLEAN_PARAM(PreferLowerReadUtil);
FDB_DEFINE_BOOLEAN_PARAM(FindTeamByServers);
class DDTeamCollectionImpl {
ACTOR static Future<Void> checkAndRemoveInvalidLocalityAddr(DDTeamCollection* self) {
@ -154,6 +155,19 @@ public:
return Void();
}
// Find the team whose storage servers exactly match req.src.
static void getTeamByServers(DDTeamCollection* self, GetTeamRequest req) {
const std::string servers = TCTeamInfo::serversToString(req.src);
Optional<Reference<IDataDistributionTeam>> res;
for (const auto& team : self->teams) {
if (team->getServerIDsStr() == servers) {
res = team;
break;
}
}
req.reply.send(std::make_pair(res, false));
}
// SOMEDAY: Make bestTeam better about deciding to leave a shard where it is (e.g. in PRIORITY_TEAM_HEALTHY case)
// use keys, src, dest, metrics, priority, system load, etc.. to decide...
ACTOR static Future<Void> getTeam(DDTeamCollection* self, GetTeamRequest req) {
@ -721,19 +735,19 @@ public:
bool recheck = !healthy && (lastReady != self->initialFailureReactionDelay.isReady() ||
(lastZeroHealthy && !self->zeroHealthyTeams->get()) || containsFailed);
//TraceEvent("TeamHealthChangeDetected", self->distributorId)
// .detail("Team", team->getDesc())
// .detail("ServersLeft", serversLeft)
// .detail("LastServersLeft", lastServersLeft)
// .detail("AnyUndesired", anyUndesired)
// .detail("LastAnyUndesired", lastAnyUndesired)
// .detail("AnyWrongConfiguration", anyWrongConfiguration)
// .detail("LastWrongConfiguration", lastWrongConfiguration)
// .detail("ContainsWigglingServer", anyWigglingServer)
// .detail("Recheck", recheck)
// .detail("BadTeam", badTeam)
// .detail("LastZeroHealthy", lastZeroHealthy)
// .detail("ZeroHealthyTeam", self->zeroHealthyTeams->get());
TraceEvent(SevVerbose, "TeamHealthChangeDetected", self->distributorId)
.detail("Team", team->getDesc())
.detail("ServersLeft", serversLeft)
.detail("LastServersLeft", lastServersLeft)
.detail("AnyUndesired", anyUndesired)
.detail("LastAnyUndesired", lastAnyUndesired)
.detail("AnyWrongConfiguration", anyWrongConfiguration)
.detail("LastWrongConfiguration", lastWrongConfiguration)
.detail("ContainsWigglingServer", anyWigglingServer)
.detail("Recheck", recheck)
.detail("BadTeam", badTeam)
.detail("LastZeroHealthy", lastZeroHealthy)
.detail("ZeroHealthyTeam", self->zeroHealthyTeams->get());
lastReady = self->initialFailureReactionDelay.isReady();
lastZeroHealthy = self->zeroHealthyTeams->get();
@ -867,6 +881,11 @@ public:
std::vector<KeyRange> shards = self->shardsAffectedByTeamFailure->getShardsFor(
ShardsAffectedByTeamFailure::Team(team->getServerIDs(), self->primary));
TraceEvent(SevVerbose, "ServerTeamRelocatingShards", self->distributorId)
.detail("Info", team->getDesc())
.detail("TeamID", team->getTeamID())
.detail("Shards", shards.size());
for (int i = 0; i < shards.size(); i++) {
// Make it high priority to move keys off failed server or else RelocateShards may never be
// addressed
@ -960,6 +979,7 @@ public:
} catch (Error& e) {
if (logTeamEvents) {
TraceEvent("TeamTrackerStopping", self->distributorId)
.errorUnsuppressed(e)
.detail("ServerPrimary", self->primary)
.detail("Team", team->getDesc())
.detail("Priority", team->getPriority());
@ -1608,10 +1628,10 @@ public:
// could cause us to not store the mutations sent to the short lived storage server.
if (ver > addedVersion + SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS) {
bool canRemove = wait(canRemoveStorageServer(tr, serverID));
// TraceEvent("WaitForAllDataRemoved")
// .detail("Server", serverID)
// .detail("CanRemove", canRemove)
// .detail("Shards", teams->shardsAffectedByTeamFailure->getNumberOfShards(serverID));
TraceEvent(SevVerbose, "WaitForAllDataRemoved")
.detail("Server", serverID)
.detail("CanRemove", canRemove)
.detail("Shards", teams->shardsAffectedByTeamFailure->getNumberOfShards(serverID));
ASSERT_GE(teams->shardsAffectedByTeamFailure->getNumberOfShards(serverID), 0);
if (canRemove && teams->shardsAffectedByTeamFailure->getNumberOfShards(serverID) == 0) {
return Void();
@ -2702,7 +2722,11 @@ public:
ACTOR static Future<Void> serverGetTeamRequests(DDTeamCollection* self, TeamCollectionInterface tci) {
loop {
GetTeamRequest req = waitNext(tci.getTeam.getFuture());
self->addActor.send(self->getTeam(req));
if (req.findTeamByServers) {
getTeamByServers(self, req);
} else {
self->addActor.send(self->getTeam(req));
}
}
}
@ -3134,12 +3158,19 @@ public:
.detail("Primary", self->isPrimary());
for (i = 0; i < teams.size(); i++) {
const auto& team = teams[i];
TraceEvent("ServerTeamInfo", self->getDistributorId())
.detail("TeamIndex", i)
.detail("Healthy", team->isHealthy())
.detail("TeamSize", team->size())
.detail("MemberIDs", team->getServerIDsStr())
.detail("Primary", self->isPrimary());
.detail("Primary", self->isPrimary())
.detail("TeamID", team->getTeamID())
.detail(
"Shards",
self->shardsAffectedByTeamFailure
->getShardsFor(ShardsAffectedByTeamFailure::Team(team->getServerIDs(), self->primary))
.size());
if (++traceEventsPrinted % SERVER_KNOBS->DD_TEAMS_INFO_PRINT_YIELD_COUNT == 0) {
wait(yield());
}

View File

@ -52,6 +52,57 @@
#include "flow/actorcompiler.h" // This must be the last #include.
void DataMove::validateShard(const DDShardInfo& shard, KeyRangeRef range, int priority) {
if (!valid) {
if (shard.hasDest && shard.destId != anonymousShardId) {
TraceEvent(SevError, "DataMoveValidationError")
.detail("Range", range)
.detail("Reason", "DataMoveMissing")
.detail("ShardPrimaryDest", describe(shard.primaryDest))
.detail("ShardRemoteDest", describe(shard.remoteDest));
}
return;
}
ASSERT(this->meta.range.contains(range));
if (!shard.hasDest) {
TraceEvent(SevError, "DataMoveValidationError")
.detail("Range", range)
.detail("Reason", "ShardMissingDest")
.detail("DataMoveMetaData", this->meta.toString())
.detail("DataMovePrimaryDest", describe(this->primaryDest))
.detail("DataMoveRemoteDest", describe(this->remoteDest));
cancelled = true;
return;
}
if (shard.destId != this->meta.id) {
TraceEvent(SevError, "DataMoveValidationError")
.detail("Range", range)
.detail("Reason", "DataMoveIDMissMatch")
.detail("DataMoveMetaData", this->meta.toString())
.detail("ShardMoveID", shard.destId);
cancelled = true;
return;
}
if (!std::includes(
this->primaryDest.begin(), this->primaryDest.end(), shard.primaryDest.begin(), shard.primaryDest.end()) ||
!std::includes(
this->remoteDest.begin(), this->remoteDest.end(), shard.remoteDest.begin(), shard.remoteDest.end())) {
TraceEvent(SevError, "DataMoveValidationError")
.detail("Range", range)
.detail("Reason", "DataMoveDestMissMatch")
.detail("DataMoveMetaData", this->meta.toString())
.detail("DataMovePrimaryDest", describe(this->primaryDest))
.detail("DataMoveRemoteDest", describe(this->remoteDest))
.detail("ShardPrimaryDest", describe(shard.primaryDest))
.detail("ShardRemoteDest", describe(shard.remoteDest));
cancelled = true;
}
}
// Read keyservers, return unique set of teams
ACTOR Future<Reference<InitialDataDistribution>> getInitialDataDistribution(Database cx,
UID distributorId,
@ -68,14 +119,18 @@ ACTOR Future<Reference<InitialDataDistribution>> getInitialDataDistribution(Data
state std::map<UID, Optional<Key>> server_dc;
state std::map<std::vector<UID>, std::pair<std::vector<UID>, std::vector<UID>>> team_cache;
state std::vector<std::pair<StorageServerInterface, ProcessClass>> tss_servers;
state int numDataMoves = 0;
// Get the server list in its own try/catch block since it modifies result. We don't want a subsequent failure
// causing entries to be duplicated
loop {
numDataMoves = 0;
server_dc.clear();
result->allServers.clear();
tss_servers.clear();
team_cache.clear();
succeeded = false;
try {
// Read healthyZone value which is later used to determine on/off of failure triggered DD
tr.setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
tr.setOption(FDBTransactionOptions::READ_LOCK_AWARE);
@ -113,8 +168,6 @@ ACTOR Future<Reference<InitialDataDistribution>> getInitialDataDistribution(Data
for (int i = 0; i < workers.get().size(); i++)
id_data[workers.get()[i].locality.processId()] = workers.get()[i];
succeeded = true;
for (int i = 0; i < serverList.get().size(); i++) {
auto ssi = decodeServerListValue(serverList.get()[i].value);
if (!ssi.isTss()) {
@ -125,6 +178,42 @@ ACTOR Future<Reference<InitialDataDistribution>> getInitialDataDistribution(Data
}
}
RangeResult dms = wait(tr.getRange(dataMoveKeys, CLIENT_KNOBS->TOO_MANY));
ASSERT(!dms.more && dms.size() < CLIENT_KNOBS->TOO_MANY);
for (int i = 0; i < dms.size(); ++i) {
auto dataMove = std::make_shared<DataMove>(decodeDataMoveValue(dms[i].value), true);
const DataMoveMetaData& meta = dataMove->meta;
for (const UID& id : meta.src) {
auto& dc = server_dc[id];
if (std::find(remoteDcIds.begin(), remoteDcIds.end(), dc) != remoteDcIds.end()) {
dataMove->remoteSrc.push_back(id);
} else {
dataMove->primarySrc.push_back(id);
}
}
for (const UID& id : meta.dest) {
auto& dc = server_dc[id];
if (std::find(remoteDcIds.begin(), remoteDcIds.end(), dc) != remoteDcIds.end()) {
dataMove->remoteDest.push_back(id);
} else {
dataMove->primaryDest.push_back(id);
}
}
std::sort(dataMove->primarySrc.begin(), dataMove->primarySrc.end());
std::sort(dataMove->remoteSrc.begin(), dataMove->remoteSrc.end());
std::sort(dataMove->primaryDest.begin(), dataMove->primaryDest.end());
std::sort(dataMove->remoteDest.begin(), dataMove->remoteDest.end());
auto ranges = result->dataMoveMap.intersectingRanges(meta.range);
for (auto& r : ranges) {
ASSERT(!r.value()->valid);
}
result->dataMoveMap.insert(meta.range, std::move(dataMove));
++numDataMoves;
}
succeeded = true;
break;
} catch (Error& e) {
wait(tr.onError(e));
@ -153,11 +242,12 @@ ACTOR Future<Reference<InitialDataDistribution>> getInitialDataDistribution(Data
succeeded = true;
std::vector<UID> src, dest, last;
UID srcId, destId;
// for each range
for (int i = 0; i < keyServers.size() - 1; i++) {
DDShardInfo info(keyServers[i].key);
decodeKeyServersValue(UIDtoTagMap, keyServers[i].value, src, dest);
decodeKeyServersValue(UIDtoTagMap, keyServers[i].value, src, dest, srcId, destId);
DDShardInfo info(keyServers[i].key, srcId, destId);
if (remoteDcIds.size()) {
auto srcIter = team_cache.find(src);
if (srcIter == team_cache.end()) {
@ -233,6 +323,14 @@ ACTOR Future<Reference<InitialDataDistribution>> getInitialDataDistribution(Data
// a dummy shard at the end with no keys or servers makes life easier for trackInitialShards()
result->shards.push_back(DDShardInfo(allKeys.end));
if (CLIENT_KNOBS->SHARD_ENCODE_LOCATION_METADATA && numDataMoves > 0) {
for (int shard = 0; shard < result->shards.size() - 1; ++shard) {
const DDShardInfo& iShard = result->shards[shard];
KeyRangeRef keys = KeyRangeRef(iShard.key, result->shards[shard + 1].key);
result->dataMoveMap[keys.begin]->validateShard(iShard, keys);
}
}
// add tss to server list AFTER teams are built
for (auto& it : tss_servers) {
result->allServers.push_back(it);
@ -438,6 +536,8 @@ static std::set<int> const& normalDDQueueErrors() {
if (s.empty()) {
s.insert(error_code_movekeys_conflict);
s.insert(error_code_broken_promise);
s.insert(error_code_data_move_cancelled);
s.insert(error_code_data_move_dest_team_not_found);
}
return s;
}
@ -644,39 +744,79 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributorData> self,
state int shard = 0;
for (; shard < initData->shards.size() - 1; shard++) {
KeyRangeRef keys = KeyRangeRef(initData->shards[shard].key, initData->shards[shard + 1].key);
const DDShardInfo& iShard = initData->shards[shard];
KeyRangeRef keys = KeyRangeRef(iShard.key, initData->shards[shard + 1].key);
shardsAffectedByTeamFailure->defineShard(keys);
std::vector<ShardsAffectedByTeamFailure::Team> teams;
teams.push_back(ShardsAffectedByTeamFailure::Team(initData->shards[shard].primarySrc, true));
teams.push_back(ShardsAffectedByTeamFailure::Team(iShard.primarySrc, true));
if (configuration.usableRegions > 1) {
teams.push_back(ShardsAffectedByTeamFailure::Team(initData->shards[shard].remoteSrc, false));
teams.push_back(ShardsAffectedByTeamFailure::Team(iShard.remoteSrc, false));
}
if (g_network->isSimulated()) {
TraceEvent("DDInitShard")
.detail("Keys", keys)
.detail("PrimarySrc", describe(initData->shards[shard].primarySrc))
.detail("RemoteSrc", describe(initData->shards[shard].remoteSrc))
.detail("PrimaryDest", describe(initData->shards[shard].primaryDest))
.detail("RemoteDest", describe(initData->shards[shard].remoteDest));
.detail("PrimarySrc", describe(iShard.primarySrc))
.detail("RemoteSrc", describe(iShard.remoteSrc))
.detail("PrimaryDest", describe(iShard.primaryDest))
.detail("RemoteDest", describe(iShard.remoteDest))
.detail("SrcID", iShard.srcId)
.detail("DestID", iShard.destId);
}
shardsAffectedByTeamFailure->moveShard(keys, teams);
if (initData->shards[shard].hasDest) {
if (iShard.hasDest && iShard.destId == anonymousShardId) {
// This shard is already in flight. Ideally we should use dest in ShardsAffectedByTeamFailure and
// generate a dataDistributionRelocator directly in DataDistributionQueue to track it, but it's
// easier to just (with low priority) schedule it for movement.
bool unhealthy = initData->shards[shard].primarySrc.size() != configuration.storageTeamSize;
bool unhealthy = iShard.primarySrc.size() != configuration.storageTeamSize;
if (!unhealthy && configuration.usableRegions > 1) {
unhealthy = initData->shards[shard].remoteSrc.size() != configuration.storageTeamSize;
unhealthy = iShard.remoteSrc.size() != configuration.storageTeamSize;
}
output.send(RelocateShard(keys,
unhealthy ? SERVER_KNOBS->PRIORITY_TEAM_UNHEALTHY
: SERVER_KNOBS->PRIORITY_RECOVER_MOVE,
RelocateReason::OTHER));
}
wait(yield(TaskPriority::DataDistribution));
}
state KeyRangeMap<std::shared_ptr<DataMove>>::iterator it = initData->dataMoveMap.ranges().begin();
for (; it != initData->dataMoveMap.ranges().end(); ++it) {
const DataMoveMetaData& meta = it.value()->meta;
if (it.value()->isCancelled() || (it.value()->valid && !CLIENT_KNOBS->SHARD_ENCODE_LOCATION_METADATA)) {
RelocateShard rs(meta.range, SERVER_KNOBS->PRIORITY_RECOVER_MOVE, RelocateReason::OTHER);
rs.dataMoveId = meta.id;
rs.cancelled = true;
output.send(rs);
TraceEvent("DDInitScheduledCancelDataMove", self->ddId).detail("DataMove", meta.toString());
} else if (it.value()->valid) {
TraceEvent(SevDebug, "DDInitFoundDataMove", self->ddId).detail("DataMove", meta.toString());
ASSERT(meta.range == it.range());
// TODO: Persist priority in DataMoveMetaData.
RelocateShard rs(meta.range, SERVER_KNOBS->PRIORITY_RECOVER_MOVE, RelocateReason::OTHER);
rs.dataMoveId = meta.id;
rs.dataMove = it.value();
std::vector<ShardsAffectedByTeamFailure::Team> teams;
teams.push_back(ShardsAffectedByTeamFailure::Team(rs.dataMove->primaryDest, true));
if (!rs.dataMove->remoteDest.empty()) {
teams.push_back(ShardsAffectedByTeamFailure::Team(rs.dataMove->remoteDest, false));
}
// Since a DataMove could cover more than one keyrange, e.g., during merge, we need to define
// the target shard and restart the shard tracker.
shardsAffectedByTeamFailure->restartShardTracker.send(rs.keys);
shardsAffectedByTeamFailure->defineShard(rs.keys);
// When restoring a DataMove, the destination team is determined, and hence we need to register
// the data move now, so that team failures can be captured.
shardsAffectedByTeamFailure->moveShard(rs.keys, teams);
output.send(rs);
wait(yield(TaskPriority::DataDistribution));
}
}
std::vector<TeamCollectionInterface> tcis;
Reference<AsyncVar<bool>> anyZeroHealthyTeams;
@ -859,6 +999,8 @@ static std::set<int> const& normalDataDistributorErrors() {
s.insert(error_code_actor_cancelled);
s.insert(error_code_please_reboot);
s.insert(error_code_movekeys_conflict);
s.insert(error_code_data_move_cancelled);
s.insert(error_code_data_move_dest_team_not_found);
}
return s;
}

View File

@ -64,6 +64,7 @@ struct RelocateData {
double startTime;
UID randomId;
UID dataMoveId;
int workFactor;
std::vector<UID> src;
std::vector<UID> completeSources;
@ -71,18 +72,24 @@ struct RelocateData {
bool wantsNewServers;
bool cancellable;
TraceInterval interval;
std::shared_ptr<DataMove> dataMove;
RelocateData()
: priority(-1), boundaryPriority(-1), healthPriority(-1), reason(RelocateReason::INVALID), startTime(-1),
workFactor(0), wantsNewServers(false), cancellable(false), interval("QueuedRelocation") {}
dataMoveId(anonymousShardId), workFactor(0), wantsNewServers(false), cancellable(false),
interval("QueuedRelocation") {}
explicit RelocateData(RelocateShard const& rs)
: keys(rs.keys), priority(rs.priority), boundaryPriority(isBoundaryPriority(rs.priority) ? rs.priority : -1),
healthPriority(isHealthPriority(rs.priority) ? rs.priority : -1), reason(rs.reason), startTime(now()),
randomId(deterministicRandom()->randomUniqueID()), workFactor(0),
randomId(deterministicRandom()->randomUniqueID()), dataMoveId(rs.dataMoveId), workFactor(0),
wantsNewServers(isMountainChopperPriority(rs.priority) || isValleyFillerPriority(rs.priority) ||
rs.priority == SERVER_KNOBS->PRIORITY_SPLIT_SHARD ||
rs.priority == SERVER_KNOBS->PRIORITY_TEAM_REDUNDANT),
cancellable(true), interval("QueuedRelocation") {}
cancellable(true), interval("QueuedRelocation"), dataMove(rs.dataMove) {
if (dataMove != nullptr) {
this->src.insert(this->src.end(), dataMove->meta.src.begin(), dataMove->meta.src.end());
}
}
static bool isHealthPriority(int priority) {
return priority == SERVER_KNOBS->PRIORITY_POPULATE_REGION ||
@ -97,6 +104,8 @@ struct RelocateData {
return priority == SERVER_KNOBS->PRIORITY_SPLIT_SHARD || priority == SERVER_KNOBS->PRIORITY_MERGE_SHARD;
}
bool isRestore() const { return this->dataMove != nullptr; }
bool operator>(const RelocateData& rhs) const {
return priority != rhs.priority
? priority > rhs.priority
@ -436,13 +445,26 @@ void complete(RelocateData const& relocation, std::map<UID, Busyness>& busymap,
completeDest(relocation, destBusymap);
}
// Cancels in-flight data moves intersecting with range.
ACTOR Future<Void> cancelDataMove(struct DDQueueData* self, KeyRange range, const DDEnabledState* ddEnabledState);
ACTOR Future<Void> dataDistributionRelocator(struct DDQueueData* self,
RelocateData rd,
Future<Void> prevCleanup,
const DDEnabledState* ddEnabledState);
struct DDQueueData {
ActorCollectionNoErrors noErrorActors; // has to be the last one to be destroyed because other Actors may use it.
struct DDDataMove {
DDDataMove() = default;
explicit DDDataMove(UID id) : id(id) {}
bool isValid() const { return id.isValid(); }
UID id;
Future<Void> cancel;
};
ActorCollectionNoErrors noErrorActors; // has to be the last one to be destroyed because other Actors may use it.
UID distributorId;
MoveKeysLock lock;
Database cx;
@ -454,6 +476,7 @@ struct DDQueueData {
FlowLock startMoveKeysParallelismLock;
FlowLock finishMoveKeysParallelismLock;
FlowLock cleanUpDataMoveParallelismLock;
Reference<FlowLock> fetchSourceLock;
int activeRelocations;
@ -478,6 +501,7 @@ struct DDQueueData {
KeyRangeMap<RelocateData> inFlight;
// Track all actors that relocates specified keys to a good place; Key: keyRange; Value: actor
KeyRangeActorMap inFlightActors;
KeyRangeMap<DDDataMove> dataMoves;
Promise<Void> error;
PromiseStream<RelocateData> dataTransferComplete;
@ -556,6 +580,7 @@ struct DDQueueData {
shardsAffectedByTeamFailure(sABTF), getAverageShardBytes(getAverageShardBytes),
startMoveKeysParallelismLock(SERVER_KNOBS->DD_MOVE_KEYS_PARALLELISM),
finishMoveKeysParallelismLock(SERVER_KNOBS->DD_MOVE_KEYS_PARALLELISM),
cleanUpDataMoveParallelismLock(SERVER_KNOBS->DD_MOVE_KEYS_PARALLELISM),
fetchSourceLock(new FlowLock(SERVER_KNOBS->DD_FETCH_SOURCE_PARALLELISM)), activeRelocations(0),
queuedRelocations(0), bytesWritten(0), teamSize(teamSize), singleRegionTeamSize(singleRegionTeamSize),
output(output), input(input), getShardMetrics(getShardMetrics), getTopKMetrics(getTopKMetrics), lastInterval(0),
@ -784,6 +809,10 @@ struct DDQueueData {
/*TraceEvent(rrs.interval.end(), mi.id()).detail("Result","Cancelled")
.detail("WasFetching", foundActiveFetching).detail("Contained", rd.keys.contains( rrs.keys ));*/
queuedRelocations--;
TraceEvent(SevVerbose, "QueuedRelocationsChanged")
.detail("DataMoveID", rrs.dataMoveId)
.detail("RandomID", rrs.randomId)
.detail("Total", queuedRelocations);
finishRelocation(rrs.priority, rrs.healthPriority);
}
}
@ -812,6 +841,10 @@ struct DDQueueData {
.detail("KeyBegin", rrs.keys.begin).detail("KeyEnd", rrs.keys.end)
.detail("Priority", rrs.priority).detail("WantsNewServers", rrs.wantsNewServers);*/
queuedRelocations++;
TraceEvent(SevVerbose, "QueuedRelocationsChanged")
.detail("DataMoveID", rrs.dataMoveId)
.detail("RandomID", rrs.randomId)
.detail("Total", queuedRelocations);
startRelocation(rrs.priority, rrs.healthPriority);
fetchingSourcesQueue.insert(rrs);
@ -834,6 +867,10 @@ struct DDQueueData {
.detail("Priority", newData.priority).detail("WantsNewServers",
newData.wantsNewServers);*/
queuedRelocations++;
TraceEvent(SevVerbose, "QueuedRelocationsChanged")
.detail("DataMoveID", newData.dataMoveId)
.detail("RandomID", newData.randomId)
.detail("Total", queuedRelocations);
startRelocation(newData.priority, newData.healthPriority);
foundActiveRelocation = true;
}
@ -945,6 +982,7 @@ struct DDQueueData {
}
if (overlappingInFlight) {
ASSERT(!rd.isRestore());
// logRelocation( rd, "SkippingOverlappingInFlight" );
continue;
}
@ -963,7 +1001,7 @@ struct DDQueueData {
// SOMEDAY: the list of source servers may be outdated since they were fetched when the work was put in the
// queue
// FIXME: we need spare capacity even when we're just going to be cancelling work via TEAM_HEALTHY
if (!canLaunchSrc(rd, teamSize, singleRegionTeamSize, busymap, cancellableRelocations)) {
if (!rd.isRestore() && !canLaunchSrc(rd, teamSize, singleRegionTeamSize, busymap, cancellableRelocations)) {
// logRelocation( rd, "SkippingQueuedRelocation" );
continue;
}
@ -974,14 +1012,23 @@ struct DDQueueData {
// logRelocation( rd, "LaunchingRelocation" );
//TraceEvent(rd.interval.end(), distributorId).detail("Result","Success");
queuedRelocations--;
finishRelocation(rd.priority, rd.healthPriority);
if (!rd.isRestore()) {
queuedRelocations--;
TraceEvent(SevVerbose, "QueuedRelocationsChanged")
.detail("DataMoveID", rd.dataMoveId)
.detail("RandomID", rd.randomId)
.detail("Total", queuedRelocations);
finishRelocation(rd.priority, rd.healthPriority);
// now we are launching: remove this entry from the queue of all the src servers
for (int i = 0; i < rd.src.size(); i++) {
ASSERT(queue[rd.src[i]].erase(rd));
// now we are launching: remove this entry from the queue of all the src servers
for (int i = 0; i < rd.src.size(); i++) {
ASSERT(queue[rd.src[i]].erase(rd));
}
}
Future<Void> fCleanup =
CLIENT_KNOBS->SHARD_ENCODE_LOCATION_METADATA ? cancelDataMove(this, rd.keys, ddEnabledState) : Void();
// If there is a job in flight that wants data relocation which we are about to cancel/modify,
// make sure that we keep the relocation intent for the job that we launch
auto f = inFlight.intersectingRanges(rd.keys);
@ -1000,12 +1047,29 @@ struct DDQueueData {
for (int r = 0; r < ranges.size(); r++) {
RelocateData& rrs = inFlight.rangeContaining(ranges[r].begin)->value();
rrs.keys = ranges[r];
if (rd.keys == ranges[r] && rd.isRestore()) {
ASSERT(rd.dataMove != nullptr);
ASSERT(CLIENT_KNOBS->SHARD_ENCODE_LOCATION_METADATA);
rrs.dataMoveId = rd.dataMove->meta.id;
} else {
ASSERT_WE_THINK(!rd.isRestore()); // Restored data move should not overlap.
// TODO(psm): The shard id is determined by DD.
rrs.dataMove.reset();
if (CLIENT_KNOBS->SHARD_ENCODE_LOCATION_METADATA) {
rrs.dataMoveId = deterministicRandom()->randomUniqueID();
} else {
rrs.dataMoveId = anonymousShardId;
}
}
launch(rrs, busymap, singleRegionTeamSize);
activeRelocations++;
TraceEvent(SevVerbose, "InFlightRelocationChange")
.detail("Launch", rrs.dataMoveId)
.detail("Total", activeRelocations);
startRelocation(rrs.priority, rrs.healthPriority);
// Start the actor that relocates data in the rrs.keys
inFlightActors.insert(rrs.keys, dataDistributionRelocator(this, rrs, ddEnabledState));
inFlightActors.insert(rrs.keys, dataDistributionRelocator(this, rrs, fCleanup, ddEnabledState));
}
// logRelocation( rd, "LaunchedRelocation" );
@ -1048,8 +1112,58 @@ struct DDQueueData {
for (auto& id : ids)
lastAsSource[id] = t;
}
// Schedules cancellation of a data move.
void enqueueCancelledDataMove(UID dataMoveId, KeyRange range, const DDEnabledState* ddEnabledState) {
std::vector<Future<Void>> cleanup;
auto f = this->dataMoves.intersectingRanges(range);
for (auto it = f.begin(); it != f.end(); ++it) {
if (it->value().isValid()) {
TraceEvent(SevError, "DDEnqueueCancelledDataMoveConflict", this->distributorId)
.detail("DataMoveID", dataMoveId)
.detail("CancelledRange", range)
.detail("ConflictingDataMoveID", it->value().id)
.detail("ConflictingRange", KeyRangeRef(it->range().begin, it->range().end));
return;
}
}
DDQueueData::DDDataMove dataMove(dataMoveId);
dataMove.cancel = cleanUpDataMove(
this->cx, dataMoveId, this->lock, &this->cleanUpDataMoveParallelismLock, range, ddEnabledState);
this->dataMoves.insert(range, dataMove);
TraceEvent(SevInfo, "DDEnqueuedCancelledDataMove", this->distributorId)
.detail("DataMoveID", dataMoveId)
.detail("Range", range);
}
};
ACTOR Future<Void> cancelDataMove(struct DDQueueData* self, KeyRange range, const DDEnabledState* ddEnabledState) {
std::vector<Future<Void>> cleanup;
auto f = self->dataMoves.intersectingRanges(range);
for (auto it = f.begin(); it != f.end(); ++it) {
if (!it->value().isValid()) {
continue;
}
KeyRange keys = KeyRangeRef(it->range().begin, it->range().end);
TraceEvent(SevInfo, "DDQueueCancelDataMove", self->distributorId)
.detail("DataMoveID", it->value().id)
.detail("DataMoveRange", keys)
.detail("Range", range);
if (!it->value().cancel.isValid()) {
it->value().cancel = cleanUpDataMove(
self->cx, it->value().id, self->lock, &self->cleanUpDataMoveParallelismLock, keys, ddEnabledState);
}
cleanup.push_back(it->value().cancel);
}
wait(waitForAll(cleanup));
auto ranges = self->dataMoves.getAffectedRangesAfterInsertion(range);
if (!ranges.empty()) {
self->dataMoves.insert(KeyRangeRef(ranges.front().begin, ranges.back().end), DDQueueData::DDDataMove());
}
return Void();
}
static std::string destServersString(std::vector<std::pair<Reference<IDataDistributionTeam>, bool>> const& bestTeams) {
std::stringstream ss;
@ -1064,7 +1178,10 @@ static std::string destServersString(std::vector<std::pair<Reference<IDataDistri
// This actor relocates the specified keys to a good place.
// The inFlightActor key range map stores the actor for each RelocateData
ACTOR Future<Void> dataDistributionRelocator(DDQueueData* self, RelocateData rd, const DDEnabledState* ddEnabledState) {
ACTOR Future<Void> dataDistributionRelocator(DDQueueData* self,
RelocateData rd,
Future<Void> prevCleanup,
const DDEnabledState* ddEnabledState) {
state Promise<Void> errorOut(self->error);
state TraceInterval relocateShardInterval("RelocateShard");
state PromiseStream<RelocateData> dataTransferComplete(self->dataTransferComplete);
@ -1101,6 +1218,29 @@ ACTOR Future<Void> dataDistributionRelocator(DDQueueData* self, RelocateData rd,
self->suppressIntervals = 0;
}
if (CLIENT_KNOBS->SHARD_ENCODE_LOCATION_METADATA) {
auto inFlightRange = self->inFlight.rangeContaining(rd.keys.begin);
ASSERT(inFlightRange.range() == rd.keys);
ASSERT(inFlightRange.value().randomId == rd.randomId);
ASSERT(inFlightRange.value().dataMoveId == rd.dataMoveId);
inFlightRange.value().cancellable = false;
wait(prevCleanup);
auto f = self->dataMoves.intersectingRanges(rd.keys);
for (auto it = f.begin(); it != f.end(); ++it) {
KeyRangeRef kr(it->range().begin, it->range().end);
const UID mId = it->value().id;
if (mId.isValid() && mId != rd.dataMoveId) {
TraceEvent("DDRelocatorConflictingDataMove", distributorId)
.detail("CurrentDataMoveID", rd.dataMoveId)
.detail("DataMoveID", mId)
.detail("Range", kr);
}
}
self->dataMoves.insert(rd.keys, DDQueueData::DDDataMove(rd.dataMoveId));
}
state StorageMetrics metrics =
wait(brokenPromiseToNever(self->getShardMetrics.getReply(GetMetricsRequest(rd.keys))));
@ -1112,6 +1252,7 @@ ACTOR Future<Void> dataDistributionRelocator(DDQueueData* self, RelocateData rd,
loop {
state int tciIndex = 0;
state bool foundTeams = true;
state bool bestTeamReady = false;
anyHealthy = false;
allHealthy = true;
anyWithSource = false;
@ -1119,58 +1260,83 @@ ACTOR Future<Void> dataDistributionRelocator(DDQueueData* self, RelocateData rd,
bestTeams.clear();
// Get team from teamCollections in different DCs and find the best one
while (tciIndex < self->teamCollections.size()) {
double inflightPenalty = SERVER_KNOBS->INFLIGHT_PENALTY_HEALTHY;
if (rd.healthPriority == SERVER_KNOBS->PRIORITY_TEAM_UNHEALTHY ||
rd.healthPriority == SERVER_KNOBS->PRIORITY_TEAM_2_LEFT)
inflightPenalty = SERVER_KNOBS->INFLIGHT_PENALTY_UNHEALTHY;
if (rd.healthPriority == SERVER_KNOBS->PRIORITY_POPULATE_REGION ||
rd.healthPriority == SERVER_KNOBS->PRIORITY_TEAM_1_LEFT ||
rd.healthPriority == SERVER_KNOBS->PRIORITY_TEAM_0_LEFT)
inflightPenalty = SERVER_KNOBS->INFLIGHT_PENALTY_ONE_LEFT;
auto req = GetTeamRequest(WantNewServers(rd.wantsNewServers),
WantTrueBest(isValleyFillerPriority(rd.priority)),
PreferLowerDiskUtil::True,
TeamMustHaveShards::False,
ForReadBalance(rd.reason == RelocateReason::REBALANCE_READ),
PreferLowerReadUtil::True,
inflightPenalty);
req.src = rd.src;
req.completeSources = rd.completeSources;
// bestTeam.second = false if the bestTeam in the teamCollection (in the DC) does not have any
// server that hosts the relocateData. This is possible, for example, in a fearless configuration
// when the remote DC is just brought up.
Future<std::pair<Optional<Reference<IDataDistributionTeam>>, bool>> fbestTeam =
brokenPromiseToNever(self->teamCollections[tciIndex].getTeam.getReply(req));
state bool bestTeamReady = fbestTeam.isReady();
std::pair<Optional<Reference<IDataDistributionTeam>>, bool> bestTeam = wait(fbestTeam);
if (tciIndex > 0 && !bestTeamReady) {
// self->shardsAffectedByTeamFailure->moveShard must be called without any waits after getting
// the destination team or we could miss failure notifications for the storage servers in the
// destination team
TraceEvent("BestTeamNotReady");
foundTeams = false;
break;
}
// If a DC has no healthy team, we stop checking the other DCs until
// the unhealthy DC is healthy again or is excluded.
if (!bestTeam.first.present()) {
foundTeams = false;
break;
}
if (!bestTeam.first.get()->isHealthy()) {
allHealthy = false;
} else {
if (CLIENT_KNOBS->SHARD_ENCODE_LOCATION_METADATA && rd.isRestore()) {
auto req = GetTeamRequest(tciIndex == 0 ? rd.dataMove->primaryDest : rd.dataMove->remoteDest);
Future<std::pair<Optional<Reference<IDataDistributionTeam>>, bool>> fbestTeam =
brokenPromiseToNever(self->teamCollections[tciIndex].getTeam.getReply(req));
bestTeamReady = fbestTeam.isReady();
std::pair<Optional<Reference<IDataDistributionTeam>>, bool> bestTeam = wait(fbestTeam);
if (tciIndex > 0 && !bestTeamReady) {
// self->shardsAffectedByTeamFailure->moveShard must be called without any waits after
// getting the destination team or we could miss failure notifications for the storage
// servers in the destination team
TraceEvent("BestTeamNotReady")
.detail("TeamCollectionIndex", tciIndex)
.detail("RestoreDataMoveForDest",
describe(tciIndex == 0 ? rd.dataMove->primaryDest : rd.dataMove->remoteDest));
foundTeams = false;
break;
}
if (!bestTeam.first.present() || !bestTeam.first.get()->isHealthy()) {
foundTeams = false;
break;
}
anyHealthy = true;
}
bestTeams.emplace_back(bestTeam.first.get(), bestTeam.second);
} else {
double inflightPenalty = SERVER_KNOBS->INFLIGHT_PENALTY_HEALTHY;
if (rd.healthPriority == SERVER_KNOBS->PRIORITY_TEAM_UNHEALTHY ||
rd.healthPriority == SERVER_KNOBS->PRIORITY_TEAM_2_LEFT)
inflightPenalty = SERVER_KNOBS->INFLIGHT_PENALTY_UNHEALTHY;
if (rd.healthPriority == SERVER_KNOBS->PRIORITY_POPULATE_REGION ||
rd.healthPriority == SERVER_KNOBS->PRIORITY_TEAM_1_LEFT ||
rd.healthPriority == SERVER_KNOBS->PRIORITY_TEAM_0_LEFT)
inflightPenalty = SERVER_KNOBS->INFLIGHT_PENALTY_ONE_LEFT;
if (bestTeam.second) {
anyWithSource = true;
}
auto req = GetTeamRequest(WantNewServers(rd.wantsNewServers),
WantTrueBest(isValleyFillerPriority(rd.priority)),
PreferLowerDiskUtil::True,
TeamMustHaveShards::False,
ForReadBalance(rd.reason == RelocateReason::REBALANCE_READ),
PreferLowerReadUtil::True,
inflightPenalty);
bestTeams.emplace_back(bestTeam.first.get(), bestTeam.second);
req.src = rd.src;
req.completeSources = rd.completeSources;
// bestTeam.second = false if the bestTeam in the teamCollection (in the DC) does not have any
// server that hosts the relocateData. This is possible, for example, in a fearless
// configuration when the remote DC is just brought up.
Future<std::pair<Optional<Reference<IDataDistributionTeam>>, bool>> fbestTeam =
brokenPromiseToNever(self->teamCollections[tciIndex].getTeam.getReply(req));
bestTeamReady = fbestTeam.isReady();
std::pair<Optional<Reference<IDataDistributionTeam>>, bool> bestTeam = wait(fbestTeam);
if (tciIndex > 0 && !bestTeamReady) {
// self->shardsAffectedByTeamFailure->moveShard must be called without any waits after
// getting the destination team or we could miss failure notifications for the storage
// servers in the destination team
TraceEvent("BestTeamNotReady");
foundTeams = false;
break;
}
// If a DC has no healthy team, we stop checking the other DCs until
// the unhealthy DC is healthy again or is excluded.
if (!bestTeam.first.present()) {
foundTeams = false;
break;
}
if (!bestTeam.first.get()->isHealthy()) {
allHealthy = false;
} else {
anyHealthy = true;
}
if (bestTeam.second) {
anyWithSource = true;
}
bestTeams.emplace_back(bestTeam.first.get(), bestTeam.second);
}
tciIndex++;
}
// once we've found healthy candidate teams, make sure they're not overloaded with outstanding moves
@ -1204,6 +1370,9 @@ ACTOR Future<Void> dataDistributionRelocator(DDQueueData* self, RelocateData rd,
.detail("TeamCollectionId", tciIndex)
.detail("AnyDestOverloaded", anyDestOverloaded)
.detail("NumOfTeamCollections", self->teamCollections.size());
if (rd.isRestore() && stuckCount > 50) {
throw data_move_dest_team_not_found();
}
wait(delay(SERVER_KNOBS->BEST_TEAM_STUCK_DELAY, TaskPriority::DataDistributionLaunch));
}
@ -1225,6 +1394,7 @@ ACTOR Future<Void> dataDistributionRelocator(DDQueueData* self, RelocateData rd,
auto& serverIds = bestTeams[i].first->getServerIDs();
destinationTeams.push_back(ShardsAffectedByTeamFailure::Team(serverIds, i == 0));
// TODO(psm): Make DataMoveMetaData aware of the two-step data move optimization.
if (allHealthy && anyWithSource && !bestTeams[i].second) {
// When all servers in bestTeams[i] do not hold the shard (!bestTeams[i].second), it indicates
// the bestTeams[i] is in a new DC where data has not been replicated to.
@ -1262,7 +1432,9 @@ ACTOR Future<Void> dataDistributionRelocator(DDQueueData* self, RelocateData rd,
.detail("DestTeamSize", totalIds);
}
self->shardsAffectedByTeamFailure->moveShard(rd.keys, destinationTeams);
if (!rd.isRestore()) {
self->shardsAffectedByTeamFailure->moveShard(rd.keys, destinationTeams);
}
// FIXME: do not add data in flight to servers that were already in the src.
healthyDestinations.addDataInFlightToTeam(+metrics.bytes);
@ -1296,6 +1468,7 @@ ACTOR Future<Void> dataDistributionRelocator(DDQueueData* self, RelocateData rd,
state Promise<Void> dataMovementComplete;
// Move keys from source to destination by changing the serverKeyList and keyServerList system keys
state Future<Void> doMoveKeys = moveKeys(self->cx,
rd.dataMoveId,
rd.keys,
destIds,
healthyIds,
@ -1305,7 +1478,8 @@ ACTOR Future<Void> dataDistributionRelocator(DDQueueData* self, RelocateData rd,
&self->finishMoveKeysParallelismLock,
self->teamCollections.size() > 1,
relocateShardInterval.pairID,
ddEnabledState);
ddEnabledState,
CancelConflictingDataMoves::False);
state Future<Void> pollHealth =
signalledTransferComplete ? Never()
: delay(SERVER_KNOBS->HEALTH_POLL_TIME, TaskPriority::DataDistributionLaunch);
@ -1319,6 +1493,7 @@ ACTOR Future<Void> dataDistributionRelocator(DDQueueData* self, RelocateData rd,
extraIds.clear();
ASSERT(totalIds == destIds.size()); // Sanity check the destIDs before we move keys
doMoveKeys = moveKeys(self->cx,
rd.dataMoveId,
rd.keys,
destIds,
healthyIds,
@ -1328,9 +1503,20 @@ ACTOR Future<Void> dataDistributionRelocator(DDQueueData* self, RelocateData rd,
&self->finishMoveKeysParallelismLock,
self->teamCollections.size() > 1,
relocateShardInterval.pairID,
ddEnabledState);
ddEnabledState,
CancelConflictingDataMoves::False);
} else {
self->fetchKeysComplete.insert(rd);
if (CLIENT_KNOBS->SHARD_ENCODE_LOCATION_METADATA) {
auto ranges = self->dataMoves.getAffectedRangesAfterInsertion(rd.keys);
if (ranges.size() == 1 && static_cast<KeyRange>(ranges[0]) == rd.keys &&
ranges[0].value.id == rd.dataMoveId && !ranges[0].value.cancel.isValid()) {
self->dataMoves.insert(rd.keys, DDQueueData::DDDataMove());
TraceEvent(SevVerbose, "DequeueDataMoveOnSuccess", self->distributorId)
.detail("DataMoveID", rd.dataMoveId)
.detail("DataMoveRange", rd.keys);
}
}
break;
}
}
@ -1423,12 +1609,13 @@ ACTOR Future<Void> dataDistributionRelocator(DDQueueData* self, RelocateData rd,
}
}
} catch (Error& e) {
state Error err = e;
TraceEvent(relocateShardInterval.end(), distributorId)
.errorUnsuppressed(e)
.errorUnsuppressed(err)
.detail("Duration", now() - startTime);
if (now() - startTime > 600) {
TraceEvent(SevWarnAlways, "RelocateShardTooLong")
.errorUnsuppressed(e)
.errorUnsuppressed(err)
.detail("Duration", now() - startTime)
.detail("Dest", describe(destIds))
.detail("Src", describe(rd.src));
@ -1438,12 +1625,16 @@ ACTOR Future<Void> dataDistributionRelocator(DDQueueData* self, RelocateData rd,
relocationComplete.send(rd);
if (e.code() != error_code_actor_cancelled) {
if (err.code() == error_code_data_move_dest_team_not_found) {
wait(cancelDataMove(self, rd.keys, ddEnabledState));
}
if (err.code() != error_code_actor_cancelled && err.code() != error_code_data_move_cancelled) {
if (errorOut.canBeSet()) {
errorOut.sendError(e);
errorOut.sendError(err);
}
}
throw;
throw err;
}
}
@ -2036,10 +2227,18 @@ ACTOR Future<Void> dataDistributionQueue(Database cx,
choose {
when(RelocateShard rs = waitNext(self.input)) {
bool wasEmpty = serversToLaunchFrom.empty();
self.queueRelocation(rs, serversToLaunchFrom);
if (wasEmpty && !serversToLaunchFrom.empty())
launchQueuedWorkTimeout = delay(0, TaskPriority::DataDistributionLaunch);
if (rs.isRestore()) {
ASSERT(rs.dataMove != nullptr);
ASSERT(rs.dataMoveId.isValid());
self.launchQueuedWork(RelocateData(rs), ddEnabledState);
} else if (rs.cancelled) {
self.enqueueCancelledDataMove(rs.dataMoveId, rs.keys, ddEnabledState);
} else {
bool wasEmpty = serversToLaunchFrom.empty();
self.queueRelocation(rs, serversToLaunchFrom);
if (wasEmpty && !serversToLaunchFrom.empty())
launchQueuedWorkTimeout = delay(0, TaskPriority::DataDistributionLaunch);
}
}
when(wait(launchQueuedWorkTimeout)) {
self.launchQueuedWork(serversToLaunchFrom, ddEnabledState);
@ -2059,6 +2258,10 @@ ACTOR Future<Void> dataDistributionQueue(Database cx,
}
when(RelocateData done = waitNext(self.relocationComplete.getFuture())) {
self.activeRelocations--;
TraceEvent(SevVerbose, "InFlightRelocationChange")
.detail("Complete", done.dataMoveId)
.detail("IsRestore", done.isRestore())
.detail("Total", self.activeRelocations);
self.finishRelocation(done.priority, done.healthPriority);
self.fetchKeysComplete.erase(done);
// self.logRelocation( done, "ShardRelocatorDone" );
@ -2123,7 +2326,8 @@ ACTOR Future<Void> dataDistributionQueue(Database cx,
} catch (Error& e) {
if (e.code() != error_code_broken_promise && // FIXME: Get rid of these broken_promise errors every time we
// are killed by the master dying
e.code() != error_code_movekeys_conflict)
e.code() != error_code_movekeys_conflict && e.code() != error_code_data_move_cancelled &&
e.code() != error_code_data_move_dest_team_not_found)
TraceEvent(SevError, "DataDistributionQueueError", distributorId).error(e);
throw e;
}
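The catch handler above widens the set of errors the DD queue treats as expected during shutdown or cancellation, so that movekeys_conflict, data_move_cancelled, and data_move_dest_team_not_found no longer produce a SevError trace. A standalone sketch of that classification, with a plain enum standing in for FDB error codes (all names below are illustrative):

#include <iostream>

enum class Err { broken_promise, movekeys_conflict, data_move_cancelled, data_move_dest_team_not_found, io_error };

// Errors that are an expected part of shutdown or data-move cancellation
// should not be logged as severe queue failures.
bool isExpectedQueueError(Err e) {
    return e == Err::broken_promise || e == Err::movekeys_conflict || e == Err::data_move_cancelled ||
           e == Err::data_move_dest_team_not_found;
}

int main() {
    std::cout << isExpectedQueueError(Err::data_move_cancelled) << "\n"; // 1
    std::cout << isExpectedQueueError(Err::io_error) << "\n";           // 0
}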

View File

@ -712,19 +712,18 @@ ACTOR Future<Void> shardEvaluator(DataDistributionTracker* self,
}
}
/*TraceEvent("EdgeCaseTraceShardEvaluator", self->distributorId)
// .detail("TrackerId", trackerID)
.detail("BeginKey", keys.begin.printableNonNull())
.detail("EndKey", keys.end.printableNonNull())
.detail("ShouldSplit", shouldSplit)
.detail("ShouldMerge", shouldMerge)
.detail("HasBeenTrueLongEnough", wantsToMerge->hasBeenTrueForLongEnough())
.detail("CurrentMetrics", stats.toString())
.detail("ShardBoundsMaxBytes", shardBounds.max.bytes)
.detail("ShardBoundsMinBytes", shardBounds.min.bytes)
.detail("WriteBandwitdhStatus", bandwidthStatus)
.detail("SplitBecauseHighWriteBandWidth", ( bandwidthStatus == BandwidthStatusHigh && keys.begin <
keyServersKeys.begin ) ? "Yes" :"No");*/
// TraceEvent("EdgeCaseTraceShardEvaluator", self->distributorId)
// .detail("BeginKey", keys.begin.printable())
// .detail("EndKey", keys.end.printable())
// .detail("ShouldSplit", shouldSplit)
// .detail("ShouldMerge", shouldMerge)
// .detail("HasBeenTrueLongEnough", wantsToMerge->hasBeenTrueForLongEnough())
// .detail("CurrentMetrics", stats.toString())
// .detail("ShardBoundsMaxBytes", shardBounds.max.bytes)
// .detail("ShardBoundsMinBytes", shardBounds.min.bytes)
// .detail("WriteBandwitdhStatus", bandwidthStatus)
// .detail("SplitBecauseHighWriteBandWidth",
// (bandwidthStatus == BandwidthStatusHigh && keys.begin < keyServersKeys.begin) ? "Yes" : "No");
if (!self->anyZeroHealthyTeams->get() && wantsToMerge->hasBeenTrueForLongEnough()) {
onChange = onChange || shardMerger(self, keys, shardSize);
@ -1050,6 +1049,9 @@ ACTOR Future<Void> dataDistributionTracker(Reference<InitialDataDistribution> in
self.sizeChanges.add(fetchShardMetricsList(&self, req));
}
when(wait(self.sizeChanges.getResult())) {}
when(KeyRange req = waitNext(self.shardsAffectedByTeamFailure->restartShardTracker.getFuture())) {
restartShardTrackers(&self, req);
}
}
} catch (Error& e) {
TraceEvent(SevError, "DataDistributionTrackerError", self.distributorId).error(e);

File diff suppressed because it is too large

View File

@ -142,6 +142,10 @@ int64_t getQueueSize(const TraceEventFields& md) {
return inputBytes - durableBytes;
}
int64_t getDurableVersion(const TraceEventFields& md) {
return boost::lexical_cast<int64_t>(md.getValue("DurableVersion"));
}
// Computes the popped version lag for tlogs
int64_t getPoppedVersionLag(const TraceEventFields& md) {
int64_t persistentDataDurableVersion = boost::lexical_cast<int64_t>(md.getValue("PersistentDataDurableVersion"));
@ -355,23 +359,42 @@ int64_t extractMaxQueueSize(const std::vector<Future<TraceEventFields>>& message
}
// Timeout wrapper when getting the storage metrics. This will do some additional tracing
ACTOR Future<TraceEventFields> getStorageMetricsTimeout(UID storage, WorkerInterface wi) {
state Future<TraceEventFields> result =
wi.eventLogRequest.getReply(EventLogRequest(StringRef(storage.toString() + "/StorageMetrics")));
state Future<Void> timeout = delay(1.0);
choose {
when(TraceEventFields res = wait(result)) { return res; }
when(wait(timeout)) {
ACTOR Future<TraceEventFields> getStorageMetricsTimeout(UID storage, WorkerInterface wi, Version version) {
state int retries = 0;
loop {
++retries;
state Future<TraceEventFields> result =
wi.eventLogRequest.getReply(EventLogRequest(StringRef(storage.toString() + "/StorageMetrics")));
state Future<Void> timeout = delay(30.0);
choose {
when(TraceEventFields res = wait(result)) {
if (version == invalidVersion || getDurableVersion(res) >= static_cast<int64_t>(version)) {
return res;
}
}
when(wait(timeout)) {
TraceEvent("QuietDatabaseFailure")
.detail("Reason", "Could not fetch StorageMetrics")
.detail("Storage", format("%016" PRIx64, storage.first()));
throw timed_out();
}
}
if (retries > 30) {
TraceEvent("QuietDatabaseFailure")
.detail("Reason", "Could not fetch StorageMetrics")
.detail("Storage", format("%016" PRIx64, storage.first()));
.detail("Reason", "Could not fetch StorageMetrics x30")
.detail("Storage", format("%016" PRIx64, storage.first()))
.detail("Version", version);
throw timed_out();
}
wait(delay(1.0));
}
};
}
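getStorageMetricsTimeout now polls until the reported DurableVersion catches up to the requested version, with a per-attempt timeout and a cap of 30 retries. A standalone sketch of that polling pattern, with a fake metrics source in place of eventLogRequest (fetchDurableVersion and waitForDurableVersion are illustrative names, not FDB APIs):

#include <chrono>
#include <iostream>
#include <optional>
#include <stdexcept>
#include <thread>

// Fake metrics source: the durable version advances on every poll and is
// unavailable for the first couple of attempts.
std::optional<long long> fetchDurableVersion(long long& state) {
    state += 10;
    if (state < 30)
        return std::nullopt; // simulate a server that has not published metrics yet
    return state;
}

// Poll until the durable version reaches 'target', giving up after a bounded number of retries.
long long waitForDurableVersion(long long target, int maxRetries = 30) {
    long long state = 0;
    for (int retries = 0; retries < maxRetries; ++retries) {
        auto v = fetchDurableVersion(state); // one attempt; a real caller would also bound its latency
        if (v && *v >= target)
            return *v;
        std::this_thread::sleep_for(std::chrono::milliseconds(10)); // back off before the next attempt
    }
    throw std::runtime_error("timed_out"); // mirrors throwing timed_out() after too many failed attempts
}

int main() {
    std::cout << waitForDurableVersion(100) << "\n"; // returns once the fake version reaches 100
}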
// Gets the maximum size of all the storage server queues
ACTOR Future<int64_t> getMaxStorageServerQueueSize(Database cx, Reference<AsyncVar<ServerDBInfo> const> dbInfo) {
ACTOR Future<int64_t> getMaxStorageServerQueueSize(Database cx,
Reference<AsyncVar<ServerDBInfo> const> dbInfo,
Version version) {
TraceEvent("MaxStorageServerQueueSize").detail("Stage", "ContactingStorageServers");
Future<std::vector<StorageServerInterface>> serversFuture = getStorageServers(cx);
@ -394,7 +417,7 @@ ACTOR Future<int64_t> getMaxStorageServerQueueSize(Database cx, Reference<AsyncV
.detail("SS", servers[i].id());
throw attribute_not_found();
}
messages.push_back(getStorageMetricsTimeout(servers[i].id(), itr->second));
messages.push_back(getStorageMetricsTimeout(servers[i].id(), itr->second, version));
}
wait(waitForAll(messages));
@ -767,7 +790,7 @@ ACTOR Future<Void> waitForQuietDatabase(Database cx,
// To get around this, quiet Database will disable the perpetual wiggle in the setup phase.
printf("Set perpetual_storage_wiggle=0 ...\n");
wait(setPerpetualStorageWiggle(cx, false, LockAware::True));
state Version version = wait(setPerpetualStorageWiggle(cx, false, LockAware::True));
printf("Set perpetual_storage_wiggle=0 Done.\n");
// Require 3 consecutive successful quiet database checks spaced 2 second apart
@ -784,7 +807,7 @@ ACTOR Future<Void> waitForQuietDatabase(Database cx,
tLogQueueInfo = getTLogQueueInfo(cx, dbInfo);
dataDistributionQueueSize = getDataDistributionQueueSize(cx, distributorWorker, dataInFlightGate == 0);
teamCollectionValid = getTeamCollectionValid(cx, distributorWorker);
storageQueueSize = getMaxStorageServerQueueSize(cx, dbInfo);
storageQueueSize = getMaxStorageServerQueueSize(cx, dbInfo, version);
dataDistributionActive = getDataDistributionActive(cx, distributorWorker);
storageServersRecruiting = getStorageServersRecruiting(cx, distributorWorker, distributorUID);
versionOffset = getVersionOffset(cx, distributorWorker, dbInfo);

View File

@ -1764,7 +1764,9 @@ void SimulationConfig::setProcessesPerMachine(const TestConfig& testConfig) {
// Also configures the cluster behaviour through setting some flags on the simulator.
void SimulationConfig::setTss(const TestConfig& testConfig) {
int tssCount = 0;
if (!testConfig.simpleConfig && !testConfig.disableTss && deterministicRandom()->random01() < 0.25) {
// TODO: Support TSS in SHARD_ENCODE_LOCATION_METADATA mode.
if (!testConfig.simpleConfig && !testConfig.disableTss && !CLIENT_KNOBS->SHARD_ENCODE_LOCATION_METADATA &&
deterministicRandom()->random01() < 0.25) {
// 1 or 2 tss
tssCount = deterministicRandom()->randomInt(1, 3);
}

View File

@ -319,6 +319,21 @@ TCTeamInfo::TCTeamInfo(std::vector<Reference<TCServerInfo>> const& servers, Opti
}
}
// static
std::string TCTeamInfo::serversToString(std::vector<UID> servers) {
if (servers.empty()) {
return "[unset]";
}
std::sort(servers.begin(), servers.end());
std::stringstream ss;
for (const auto& id : servers) {
ss << id.toString() << " ";
}
return ss.str();
}
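serversToString sorts the ids before joining them, so the same team prints identically no matter what order the ids were collected in. A standalone sketch of the same idea, with plain std::string ids standing in for UID:

#include <algorithm>
#include <iostream>
#include <sstream>
#include <string>
#include <vector>

std::string serversToString(std::vector<std::string> servers) {
    if (servers.empty())
        return "[unset]";
    std::sort(servers.begin(), servers.end()); // canonical order, independent of insertion order
    std::stringstream ss;
    for (const auto& id : servers)
        ss << id << " ";
    return ss.str();
}

int main() {
    std::cout << serversToString({ "ss-2", "ss-0", "ss-1" }) << "\n"; // "ss-0 ss-1 ss-2 "
    std::cout << serversToString({}) << "\n";                         // "[unset]"
}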
std::vector<StorageServerInterface> TCTeamInfo::getLastKnownServerInterfaces() const {
std::vector<StorageServerInterface> v;
v.reserve(servers.size());
@ -329,16 +344,7 @@ std::vector<StorageServerInterface> TCTeamInfo::getLastKnownServerInterfaces() c
}
std::string TCTeamInfo::getServerIDsStr() const {
std::stringstream ss;
if (serverIDs.empty())
return "[unset]";
for (const auto& id : serverIDs) {
ss << id.toString() << " ";
}
return std::move(ss).str();
return serversToString(this->serverIDs);
}
void TCTeamInfo::addDataInFlightToTeam(int64_t delta) {

View File

@ -24,23 +24,54 @@
#elif !defined(FDBSERVER_DATA_DISTRIBUTION_ACTOR_H)
#define FDBSERVER_DATA_DISTRIBUTION_ACTOR_H
#include <boost/heap/skew_heap.hpp>
#include <boost/heap/policies.hpp>
#include "fdbclient/NativeAPI.actor.h"
#include "fdbserver/MoveKeys.actor.h"
#include "fdbserver/LogSystem.h"
#include "fdbclient/RunTransaction.actor.h"
#include "fdbserver/Knobs.h"
#include "fdbserver/LogSystem.h"
#include "fdbserver/MoveKeys.actor.h"
#include <boost/heap/policies.hpp>
#include <boost/heap/skew_heap.hpp>
#include "flow/actorcompiler.h" // This must be the last #include.
enum class RelocateReason { INVALID = -1, OTHER, REBALANCE_DISK, REBALANCE_READ };
struct DDShardInfo;
// Represents a data move in DD.
struct DataMove {
DataMove() : meta(DataMoveMetaData()), restore(false), valid(false), cancelled(false) {}
explicit DataMove(DataMoveMetaData meta, bool restore)
: meta(std::move(meta)), restore(restore), valid(true), cancelled(meta.getPhase() == DataMoveMetaData::Deleting) {
}
// Checks if the DataMove is consistent with the shard.
void validateShard(const DDShardInfo& shard, KeyRangeRef range, int priority = SERVER_KNOBS->PRIORITY_RECOVER_MOVE);
bool isCancelled() const { return this->cancelled; }
const DataMoveMetaData meta;
bool restore; // The data move is scheduled by a previous DD, and is being recovered now.
bool valid; // The data move metadata is intact.
bool cancelled; // The data move has been cancelled.
std::vector<UID> primarySrc;
std::vector<UID> remoteSrc;
std::vector<UID> primaryDest;
std::vector<UID> remoteDest;
};
struct RelocateShard {
KeyRange keys;
int priority;
bool cancelled; // The data move should be cancelled.
std::shared_ptr<DataMove> dataMove; // Not null if this is a restored data move.
UID dataMoveId;
RelocateReason reason;
RelocateShard() : priority(0), reason(RelocateReason::INVALID) {}
RelocateShard() : priority(0), cancelled(false), dataMoveId(anonymousShardId), reason(RelocateReason::INVALID) {}
RelocateShard(KeyRange const& keys, int priority, RelocateReason reason)
: keys(keys), priority(priority), reason(reason) {}
: keys(keys), priority(priority), cancelled(false), dataMoveId(anonymousShardId), reason(reason) {}
bool isRestore() const { return this->dataMove != nullptr; }
};
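A RelocateShard can now describe three situations: a restored data move (dataMove != nullptr), a cancellation request (cancelled == true), or an ordinary relocation; the queue dispatches on exactly this distinction when draining its input. A standalone toy of that classification (ToyRelocate and its fields are illustrative stand-ins, not the FDB types):

#include <iostream>
#include <memory>
#include <string>

struct ToyDataMoveMeta {
    std::string id;
};

struct ToyRelocate {
    std::shared_ptr<ToyDataMoveMeta> dataMove; // non-null only for a restored data move
    bool cancelled = false;                    // true when an existing data move should be torn down
    bool isRestore() const { return dataMove != nullptr; }
};

std::string classify(const ToyRelocate& rs) {
    if (rs.isRestore())
        return "launch restored data move " + rs.dataMove->id;
    if (rs.cancelled)
        return "cancel existing data move";
    return "queue new relocation";
}

int main() {
    ToyRelocate restored;
    restored.dataMove = std::make_shared<ToyDataMoveMeta>();
    restored.dataMove->id = "dm-1";
    ToyRelocate cancel;
    cancel.cancelled = true;
    ToyRelocate fresh;
    std::cout << classify(restored) << "\n"; // launch restored data move dm-1
    std::cout << classify(cancel) << "\n";   // cancel existing data move
    std::cout << classify(fresh) << "\n";    // queue new relocation
}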
struct IDataDistributionTeam {
@ -88,6 +119,7 @@ FDB_DECLARE_BOOLEAN_PARAM(PreferLowerDiskUtil);
FDB_DECLARE_BOOLEAN_PARAM(TeamMustHaveShards);
FDB_DECLARE_BOOLEAN_PARAM(ForReadBalance);
FDB_DECLARE_BOOLEAN_PARAM(PreferLowerReadUtil);
FDB_DECLARE_BOOLEAN_PARAM(FindTeamByServers);
struct GetTeamRequest {
bool wantsNewServers; // In addition to servers in completeSources, try to find teams with new servers
@ -97,6 +129,7 @@ struct GetTeamRequest {
bool forReadBalance;
bool preferLowerReadUtil; // only makes sense when forReadBalance is true
double inflightPenalty;
bool findTeamByServers;
std::vector<UID> completeSources;
std::vector<UID> src;
Promise<std::pair<Optional<Reference<IDataDistributionTeam>>, bool>> reply;
@ -113,7 +146,13 @@ struct GetTeamRequest {
double inflightPenalty = 1.0)
: wantsNewServers(wantsNewServers), wantsTrueBest(wantsTrueBest), preferLowerDiskUtil(preferLowerDiskUtil),
teamMustHaveShards(teamMustHaveShards), forReadBalance(forReadBalance),
preferLowerReadUtil(preferLowerReadUtil), inflightPenalty(inflightPenalty) {}
preferLowerReadUtil(preferLowerReadUtil), inflightPenalty(inflightPenalty),
findTeamByServers(FindTeamByServers::False) {}
GetTeamRequest(std::vector<UID> servers)
: wantsNewServers(WantNewServers::False), wantsTrueBest(WantTrueBest::False),
preferLowerDiskUtil(PreferLowerDiskUtil::False), teamMustHaveShards(TeamMustHaveShards::False),
forReadBalance(ForReadBalance::False), preferLowerReadUtil(PreferLowerReadUtil::False), inflightPenalty(1.0),
findTeamByServers(FindTeamByServers::True), src(std::move(servers)) {}
// return true if a.score < b.score
[[nodiscard]] bool lessCompare(TeamRef a, TeamRef b, int64_t aLoadBytes, int64_t bLoadBytes) const {
@ -129,7 +168,8 @@ struct GetTeamRequest {
ss << "WantsNewServers:" << wantsNewServers << " WantsTrueBest:" << wantsTrueBest
<< " PreferLowerDiskUtil:" << preferLowerDiskUtil << " teamMustHaveShards:" << teamMustHaveShards
<< "forReadBalance" << forReadBalance << " inflightPenalty:" << inflightPenalty << ";";
<< "forReadBalance" << forReadBalance << " inflightPenalty:" << inflightPenalty
<< " findTeamByServers:" << findTeamByServers << ";";
ss << "CompleteSources:";
for (const auto& cs : completeSources) {
ss << cs.toString() << ",";
@ -224,6 +264,8 @@ public:
bool operator>=(const Team& r) const { return !(*this < r); }
bool operator==(const Team& r) const { return servers == r.servers && primary == r.primary; }
bool operator!=(const Team& r) const { return !(*this == r); }
std::string toString() const { return describe(servers); };
};
// This tracks the data distribution on the data distribution server so that teamTrackers can
@ -254,6 +296,8 @@ public:
void finishMove(KeyRangeRef keys);
void check() const;
PromiseStream<KeyRange> restartShardTracker;
private:
struct OrderByTeamKey {
bool operator()(const std::pair<Team, KeyRange>& lhs, const std::pair<Team, KeyRange>& rhs) const {
@ -283,17 +327,23 @@ struct DDShardInfo {
std::vector<UID> primaryDest;
std::vector<UID> remoteDest;
bool hasDest;
UID srcId;
UID destId;
explicit DDShardInfo(Key key) : key(key), hasDest(false) {}
DDShardInfo(Key key, UID srcId, UID destId) : key(key), hasDest(false), srcId(srcId), destId(destId) {}
};
struct InitialDataDistribution : ReferenceCounted<InitialDataDistribution> {
InitialDataDistribution() : dataMoveMap(std::make_shared<DataMove>()) {}
int mode;
std::vector<std::pair<StorageServerInterface, ProcessClass>> allServers;
std::set<std::vector<UID>> primaryTeams;
std::set<std::vector<UID>> remoteTeams;
std::vector<DDShardInfo> shards;
Optional<Key> initHealthyZoneValue;
KeyRangeMap<std::shared_ptr<DataMove>> dataMoveMap;
};
struct ShardMetrics {

View File

@ -25,12 +25,15 @@
#elif !defined(FDBSERVER_MOVEKEYS_ACTOR_H)
#define FDBSERVER_MOVEKEYS_ACTOR_H
#include "fdbclient/NativeAPI.actor.h"
#include "fdbclient/CommitTransaction.h"
#include "fdbclient/KeyRangeMap.h"
#include "fdbclient/NativeAPI.actor.h"
#include "fdbserver/MasterInterface.h"
#include "flow/BooleanParam.h"
#include "flow/actorcompiler.h"
FDB_DECLARE_BOOLEAN_PARAM(CancelConflictingDataMoves);
struct MoveKeysLock {
UID prevOwner, myOwner, prevWrite;
template <class Ar>
@ -66,7 +69,13 @@ void seedShardServers(Arena& trArena, CommitTransactionRef& tr, std::vector<Stor
// Called by the master server to write the very first transaction to the database
// establishing a set of shard servers and all invariants of the systemKeys.
// Eventually moves the given keys to the given destination team
// Caller is responsible for cancelling it before issuing an overlapping move,
// for restarting the remainder, and for not otherwise cancelling it before
// it returns (since it needs to execute the finishMoveKeys transaction).
// When dataMoveId.isValid(), the key range will be moved to the shard designated by dataMoveId.
ACTOR Future<Void> moveKeys(Database occ,
UID dataMoveId,
KeyRange keys,
std::vector<UID> destinationTeam,
std::vector<UID> healthyDestinations,
@ -76,11 +85,16 @@ ACTOR Future<Void> moveKeys(Database occ,
FlowLock* finishMoveKeysParallelismLock,
bool hasRemote,
UID relocationIntervalId, // for logging only
const DDEnabledState* ddEnabledState);
// Eventually moves the given keys to the given destination team
// Caller is responsible for cancelling it before issuing an overlapping move,
// for restarting the remainder, and for not otherwise cancelling it before
// it returns (since it needs to execute the finishMoveKeys transaction).
const DDEnabledState* ddEnabledState,
CancelConflictingDataMoves cancelConflictingDataMoves = CancelConflictingDataMoves::False);
// Cancels a data move designated by dataMoveId.
ACTOR Future<Void> cleanUpDataMove(Database occ,
UID dataMoveId,
MoveKeysLock lock,
FlowLock* cleanUpDataMoveParallelismLock,
KeyRange range,
const DDEnabledState* ddEnabledState);
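The comments above describe the contract around overlapping moves: each data move carries an id, and a conflicting in-flight move must either be cancelled first (cleanUpDataMove) or the new move must be started with CancelConflictingDataMoves::True. The sketch below is only a toy approximation of that contract under those assumptions; ToyMoveRegistry and its behavior are illustrative and do not reflect how moveKeys is implemented.

#include <iostream>
#include <map>
#include <stdexcept>
#include <string>

struct ToyMove {
    std::string begin, end; // [begin, end)
};

bool overlaps(const ToyMove& a, const ToyMove& b) {
    return a.begin < b.end && b.begin < a.end;
}

struct ToyMoveRegistry {
    std::map<std::string, ToyMove> inflight; // keyed by data move id

    // Start a move under the toy contract: a conflicting in-flight move either
    // has to be removed first, or the new move fails with a conflict.
    void start(const std::string& id, ToyMove m, bool cancelConflicting) {
        for (auto it = inflight.begin(); it != inflight.end();) {
            if (overlaps(it->second, m)) {
                if (!cancelConflicting)
                    throw std::runtime_error("movekeys_conflict"); // caller must cancel the existing move first
                it = inflight.erase(it);                           // emulate cleaning up the conflicting move
            } else {
                ++it;
            }
        }
        inflight[id] = m;
    }
};

int main() {
    ToyMoveRegistry reg;
    reg.start("dm-1", { "a", "m" }, false);
    try {
        reg.start("dm-2", { "b", "c" }, false); // conflicts with dm-1
    } catch (const std::exception& e) {
        std::cout << e.what() << "\n"; // movekeys_conflict
    }
    reg.start("dm-2", { "b", "c" }, true); // cancels the conflicting move, then starts
}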
ACTOR Future<std::pair<Version, Tag>> addStorageServer(Database cx, StorageServerInterface server);
// Adds a newly recruited storage server to a database (e.g. adding it to FF/serverList)

View File

@ -24,13 +24,16 @@
#include "fdbclient/NativeAPI.actor.h"
#include "fdbclient/DatabaseContext.h" // for clone()
#include "fdbclient/FDBTypes.h"
#include "fdbserver/TesterInterface.actor.h"
#include "fdbserver/WorkerInterface.actor.h"
Future<int64_t> getDataInFlight(Database const& cx, Reference<AsyncVar<struct ServerDBInfo> const> const&);
Future<std::pair<int64_t, int64_t>> getTLogQueueInfo(Database const& cx,
Reference<AsyncVar<struct ServerDBInfo> const> const&);
Future<int64_t> getMaxStorageServerQueueSize(Database const& cx, Reference<AsyncVar<struct ServerDBInfo> const> const&);
Future<int64_t> getMaxStorageServerQueueSize(Database const& cx,
Reference<AsyncVar<struct ServerDBInfo> const> const&,
Version const& version);
Future<int64_t> getDataDistributionQueueSize(Database const& cx,
Reference<AsyncVar<struct ServerDBInfo> const> const&,
bool const& reportInFlight);

View File

@ -184,6 +184,8 @@ public:
Optional<Reference<TCTenantInfo>>& getTenant() { return tenant; }
static std::string serversToString(std::vector<UID> servers);
std::string getTeamID() const override { return id.shortString(); }
std::vector<StorageServerInterface> getLastKnownServerInterfaces() const override;

View File

@ -6678,6 +6678,7 @@ private:
KeyRef startKey;
bool nowAssigned;
bool emptyRange;
UID dataMoveId;
bool processedStartKey;
KeyRef cacheStartKey;
@ -6712,8 +6713,7 @@ private:
// We can also ignore clearRanges, because they are always accompanied by such a pair of sets with the same
// keys
startKey = m.param1;
nowAssigned = m.param2 != serverKeysFalse;
emptyRange = m.param2 == serverKeysTrueEmptyRange;
decodeServerKeysValue(m.param2, nowAssigned, emptyRange, dataMoveId);
processedStartKey = true;
} else if (m.type == MutationRef::SetValue && m.param1 == lastEpochEndPrivateKey) {
// lastEpochEnd transactions are guaranteed by the master to be alone in their own batch (version)
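The mutation handler above replaces two literal comparisons with a single decodeServerKeysValue call that also yields the data move id when the new encoding is enabled. The actual serverKeys encoding is FDB-internal and not shown in this hunk; the toy model below only illustrates the shape of such a decoder (the string tags and decodeToyServerKeysValue are invented for illustration):

#include <iostream>
#include <string>

// Toy value model: "0" = not assigned, "1" = assigned, "e" = assigned empty range,
// "id:<x>" = assigned with a data move / physical shard id. Not the real FDB encoding.
void decodeToyServerKeysValue(const std::string& value, bool& nowAssigned, bool& emptyRange, std::string& dataMoveId) {
    nowAssigned = value != "0";
    emptyRange = value == "e";
    dataMoveId = value.rfind("id:", 0) == 0 ? value.substr(3) : "";
}

int main() {
    bool assigned = false, empty = false;
    std::string id;
    decodeToyServerKeysValue("id:7f3a", assigned, empty, id);
    std::cout << assigned << " " << empty << " " << id << "\n"; // 1 0 7f3a
}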

View File

@ -1652,7 +1652,7 @@ ACTOR Future<Void> runTests(Reference<AsyncVar<Optional<struct ClusterController
if (perpetualWiggleEnabled) { // restore the enabled perpetual storage wiggle setting
printf("Set perpetual_storage_wiggle=1 ...\n");
wait(setPerpetualStorageWiggle(cx, true, LockAware::True));
Version cVer = wait(setPerpetualStorageWiggle(cx, true, LockAware::True));
printf("Set perpetual_storage_wiggle=1 Done.\n");
}
}

View File

@ -24,6 +24,7 @@
#include "flow/IRandom.h"
#include "fdbclient/Tracing.h"
#include "fdbclient/NativeAPI.actor.h"
#include "fdbclient/FDBTypes.h"
#include "fdbserver/TesterInterface.actor.h"
#include "fdbserver/workloads/workloads.actor.h"
#include "flow/IRateControl.h"
@ -273,7 +274,8 @@ struct ConsistencyCheckWorkload : TestWorkload {
// Check that nothing is in the storage server queues
try {
int64_t maxStorageServerQueueSize = wait(getMaxStorageServerQueueSize(cx, self->dbInfo));
int64_t maxStorageServerQueueSize =
wait(getMaxStorageServerQueueSize(cx, self->dbInfo, invalidVersion));
if (maxStorageServerQueueSize > 0) {
TraceEvent("ConsistencyCheck_ExceedStorageServerQueueLimit")
.detail("MaxQueueSize", maxStorageServerQueueSize);

View File

@ -49,7 +49,7 @@ struct DataLossRecoveryWorkload : TestWorkload {
NetworkAddress addr;
DataLossRecoveryWorkload(WorkloadContext const& wcx)
: TestWorkload(wcx), startMoveKeysParallelismLock(1), finishMoveKeysParallelismLock(1), enabled(!clientId),
: TestWorkload(wcx), startMoveKeysParallelismLock(5), finishMoveKeysParallelismLock(5), enabled(!clientId),
pass(true) {}
void validationFailed(ErrorOr<Optional<Value>> expectedValue, ErrorOr<Optional<Value>> actualValue) {
@ -78,19 +78,26 @@ struct DataLossRecoveryWorkload : TestWorkload {
wait(self->writeAndVerify(self, cx, key, oldValue));
TraceEvent("DataLossRecovery").detail("Phase", "InitialWrites");
// Move [key, endKey) to team: {address}.
state NetworkAddress address = wait(self->disableDDAndMoveShard(self, cx, KeyRangeRef(key, endKey)));
TraceEvent("DataLossRecovery").detail("Phase", "Moved");
wait(self->readAndVerify(self, cx, key, oldValue));
TraceEvent("DataLossRecovery").detail("Phase", "ReadAfterMove");
// Kill team {address}, and expect read to timeout.
self->killProcess(self, address);
TraceEvent("DataLossRecovery").detail("Phase", "KilledProcess");
wait(self->readAndVerify(self, cx, key, timed_out()));
TraceEvent("DataLossRecovery").detail("Phase", "VerifiedReadTimeout");
// Re-enable DD and exclude the address as failed, so that [key, endKey) will be dropped and moved to a new team.
// Expect read to return 'value not found'.
int ignore = wait(setDDMode(cx, 1));
wait(self->exclude(cx, address));
TraceEvent("DataLossRecovery").detail("Phase", "Excluded");
wait(self->readAndVerify(self, cx, key, Optional<Value>()));
TraceEvent("DataLossRecovery").detail("Phase", "VerifiedDataDropped");
// Write will succeed.
wait(self->writeAndVerify(self, cx, key, newValue));
@ -172,6 +179,7 @@ struct DataLossRecoveryWorkload : TestWorkload {
ACTOR Future<NetworkAddress> disableDDAndMoveShard(DataLossRecoveryWorkload* self, Database cx, KeyRange keys) {
// Disable DD to avoid DD undoing of our move.
state int ignore = wait(setDDMode(cx, 0));
TraceEvent("DataLossRecovery").detail("Phase", "DisabledDD");
state NetworkAddress addr;
// Pick a random SS as the dest, keys will reside on a single server after the move.
@ -203,7 +211,9 @@ struct DataLossRecoveryWorkload : TestWorkload {
MoveKeysLock moveKeysLock;
moveKeysLock.myOwner = owner;
TraceEvent("DataLossRecovery").detail("Phase", "StartMoveKeys");
wait(moveKeys(cx,
deterministicRandom()->randomUniqueID(),
keys,
dest,
dest,
@ -213,9 +223,11 @@ struct DataLossRecoveryWorkload : TestWorkload {
&self->finishMoveKeysParallelismLock,
false,
UID(), // for logging only
&ddEnabledState));
&ddEnabledState,
CancelConflictingDataMoves::True));
break;
} catch (Error& e) {
TraceEvent("DataLossRecovery").error(e).detail("Phase", "MoveRangeError");
if (e.code() == error_code_movekeys_conflict) {
// Conflict on moveKeysLocks with the current running DD is expected, just retry.
tr.reset();

View File

@ -143,6 +143,7 @@ struct MoveKeysWorkload : TestWorkload {
state Promise<Void> signal;
state DDEnabledState ddEnabledState;
wait(moveKeys(cx,
deterministicRandom()->randomUniqueID(),
keys,
destinationTeamIDs,
destinationTeamIDs,
@ -152,7 +153,8 @@ struct MoveKeysWorkload : TestWorkload {
&fl2,
false,
relocateShardInterval.pairID,
&ddEnabledState));
&ddEnabledState,
CancelConflictingDataMoves::True));
TraceEvent(relocateShardInterval.end()).detail("Result", "Success");
return Void();
} catch (Error& e) {

View File

@ -209,7 +209,7 @@ void FlowKnobs::initialize(Randomize randomize, IsSimulated isSimulated) {
init( ZERO_LENGTH_FILE_PAD, 1 );
init( TRACE_FLUSH_INTERVAL, 0.25 );
init( TRACE_RETRY_OPEN_INTERVAL, 1.00 );
init( MIN_TRACE_SEVERITY, isSimulated ? 1 : 10 ); // Related to the trace severity in Trace.h
init( MIN_TRACE_SEVERITY, isSimulated ? 1 : 10 ); // Related to the trace severity in Trace.h
init( MAX_TRACE_SUPPRESSIONS, 1e4 );
init( TRACE_DATETIME_ENABLED, true ); // trace time in human readable format (always real time)
init( TRACE_SYNC_ENABLED, 0 );

View File

@ -173,6 +173,7 @@ public: // introduced features
PROTOCOL_VERSION_FEATURE(0x0FDB00B072000000LL, OTELSpanContext);
PROTOCOL_VERSION_FEATURE(0x0FDB00B072000000LL, SWVersionTracking);
PROTOCOL_VERSION_FEATURE(0x0FDB00B072000000LL, EncryptionAtRest);
PROTOCOL_VERSION_FEATURE(0x0FDB00B072000000LL, ShardEncodeLocationMetaData);
};
template <>

View File

@ -95,6 +95,8 @@ ERROR( page_encoding_not_supported, 1071, "Page encoding type is not supported o
ERROR( page_decoding_failed, 1072, "Page content decoding failed" )
ERROR( unexpected_encoding_type, 1073, "Page content decoding failed" )
ERROR( encryption_key_not_found, 1074, "Encryption key not found" )
ERROR( data_move_cancelled, 1075, "Data move was cancelled" )
ERROR( data_move_dest_team_not_found, 1076, "Dest team was not found for data move" )
ERROR( broken_promise, 1100, "Broken promise" )
ERROR( operation_cancelled, 1101, "Asynchronous operation cancelled" )

View File

@ -186,6 +186,7 @@ if(WITH_PYTHON)
add_fdb_test(TEST_FILES fast/WriteDuringRead.toml)
add_fdb_test(TEST_FILES fast/WriteDuringReadClean.toml)
add_fdb_test(TEST_FILES noSim/RandomUnitTests.toml UNIT)
add_fdb_test(TEST_FILES noSim/SystemDataTest.toml UNIT)
if (WITH_ROCKSDB_EXPERIMENTAL)
add_fdb_test(TEST_FILES noSim/KeyValueStoreRocksDBTest.toml)
add_fdb_test(TEST_FILES noSim/ShardedRocksDBTest.toml UNIT)