Added newDataMoveId(). (#9647)
* Added newDataMoveId(). * Added `ENABLE_DD_PHYSICAL_SHARD_MOVE` * fmt. * Replace `teamId` with `shardId`.
This commit is contained in:
parent
aeaedb147f
commit
0f5e75b34b
|
@ -171,6 +171,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
|
|||
// Data distribution
|
||||
init( SHARD_ENCODE_LOCATION_METADATA, false ); if( randomize && BUGGIFY ) SHARD_ENCODE_LOCATION_METADATA = true;
|
||||
init( ENABLE_DD_PHYSICAL_SHARD, false ); // EXPERIMENTAL; If true, SHARD_ENCODE_LOCATION_METADATA must be true; When true, optimization of data move between DCs is disabled
|
||||
init( ENABLE_DD_PHYSICAL_SHARD_MOVE, false );
|
||||
init( MAX_PHYSICAL_SHARD_BYTES, 10000000 ); // 10 MB; for ENABLE_DD_PHYSICAL_SHARD; smaller leads to larger number of physicalShard per storage server
|
||||
init( PHYSICAL_SHARD_METRICS_DELAY, 300.0 ); // 300 seconds; for ENABLE_DD_PHYSICAL_SHARD
|
||||
init( ANONYMOUS_PHYSICAL_SHARD_TRANSITION_TIME, 600.0 ); if( randomize && BUGGIFY ) ANONYMOUS_PHYSICAL_SHARD_TRANSITION_TIME = 0.0; // 600 seconds; for ENABLE_DD_PHYSICAL_SHARD
|
||||
|
|
|
@ -29,6 +29,7 @@
|
|||
|
||||
FDB_DEFINE_BOOLEAN_PARAM(AssignEmptyRange);
|
||||
FDB_DEFINE_BOOLEAN_PARAM(UnassignShard);
|
||||
FDB_DEFINE_BOOLEAN_PARAM(EnablePhysicalShardMove);
|
||||
|
||||
const KeyRef systemKeysPrefix = "\xff"_sr;
|
||||
const KeyRangeRef normalKeys(KeyRef(), systemKeysPrefix);
|
||||
|
@ -47,8 +48,9 @@ const KeyRangeRef keyServersKeyServersKeys("\xff/keyServers/\xff/keyServers/"_sr
|
|||
const KeyRef keyServersKeyServersKey = keyServersKeyServersKeys.begin;
|
||||
|
||||
// These constants are selected to be easily recognized during debugging.
|
||||
// Note that the last bit of the follwing constants is 0, indicating that physical shard move is disabled.
|
||||
const UID anonymousShardId = UID(0x666666, 0x88888888);
|
||||
const uint64_t emptyShardId = 0x7777777;
|
||||
const uint64_t emptyShardId = 0x2222222;
|
||||
|
||||
const Key keyServersKey(const KeyRef& k) {
|
||||
return k.withPrefix(keyServersPrefix);
|
||||
|
@ -457,7 +459,10 @@ const ValueRef serverKeysTrue = "1"_sr, // compatible with what was serverKeysTr
|
|||
serverKeysTrueEmptyRange = "3"_sr, // the server treats the range as empty.
|
||||
serverKeysFalse;
|
||||
|
||||
const UID newShardId(const uint64_t physicalShardId, AssignEmptyRange assignEmptyRange, UnassignShard unassignShard) {
|
||||
const UID newDataMoveId(const uint64_t physicalShardId,
|
||||
AssignEmptyRange assignEmptyRange,
|
||||
EnablePhysicalShardMove enablePSM,
|
||||
UnassignShard unassignShard) {
|
||||
uint64_t split = 0;
|
||||
if (assignEmptyRange) {
|
||||
split = emptyShardId;
|
||||
|
@ -466,6 +471,11 @@ const UID newShardId(const uint64_t physicalShardId, AssignEmptyRange assignEmpt
|
|||
} else {
|
||||
do {
|
||||
split = deterministicRandom()->randomUInt64();
|
||||
if (enablePSM) {
|
||||
split |= 1U;
|
||||
} else {
|
||||
split &= ~1U;
|
||||
}
|
||||
} while (split == anonymousShardId.second() || split == 0 || split == emptyShardId);
|
||||
}
|
||||
return UID(physicalShardId, split);
|
||||
|
@ -505,9 +515,10 @@ std::pair<UID, Key> serverKeysDecodeServerBegin(const KeyRef& key) {
|
|||
}
|
||||
|
||||
bool serverHasKey(ValueRef storedValue) {
|
||||
UID teamId;
|
||||
UID shardId;
|
||||
bool assigned, emptyRange;
|
||||
decodeServerKeysValue(storedValue, assigned, emptyRange, teamId);
|
||||
EnablePhysicalShardMove enablePSM = EnablePhysicalShardMove::False;
|
||||
decodeServerKeysValue(storedValue, assigned, emptyRange, enablePSM, shardId);
|
||||
return assigned;
|
||||
}
|
||||
|
||||
|
@ -521,7 +532,12 @@ const Value serverKeysValue(const UID& id) {
|
|||
return wr.toValue();
|
||||
}
|
||||
|
||||
void decodeServerKeysValue(const ValueRef& value, bool& assigned, bool& emptyRange, UID& id) {
|
||||
void decodeServerKeysValue(const ValueRef& value,
|
||||
bool& assigned,
|
||||
bool& emptyRange,
|
||||
EnablePhysicalShardMove& enablePSM,
|
||||
UID& id) {
|
||||
enablePSM = EnablePhysicalShardMove::False;
|
||||
if (value.size() == 0) {
|
||||
assigned = false;
|
||||
emptyRange = false;
|
||||
|
@ -544,6 +560,9 @@ void decodeServerKeysValue(const ValueRef& value, bool& assigned, bool& emptyRan
|
|||
rd >> id;
|
||||
assigned = id.second() != 0;
|
||||
emptyRange = id.second() == emptyShardId;
|
||||
if (id.second() & 1U) {
|
||||
enablePSM = EnablePhysicalShardMove::True;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -164,6 +164,7 @@ public:
|
|||
// Data distribution
|
||||
bool SHARD_ENCODE_LOCATION_METADATA; // If true, location metadata will contain shard ID.
|
||||
bool ENABLE_DD_PHYSICAL_SHARD; // EXPERIMENTAL; If true, SHARD_ENCODE_LOCATION_METADATA must be true.
|
||||
bool ENABLE_DD_PHYSICAL_SHARD_MOVE; // Enable physical shard move.
|
||||
int64_t MAX_PHYSICAL_SHARD_BYTES;
|
||||
double PHYSICAL_SHARD_METRICS_DELAY;
|
||||
double ANONYMOUS_PHYSICAL_SHARD_TRANSITION_TIME;
|
||||
|
|
|
@ -36,6 +36,7 @@
|
|||
|
||||
FDB_DECLARE_BOOLEAN_PARAM(AssignEmptyRange);
|
||||
FDB_DECLARE_BOOLEAN_PARAM(UnassignShard);
|
||||
FDB_DECLARE_BOOLEAN_PARAM(EnablePhysicalShardMove);
|
||||
|
||||
struct RestoreLoaderInterface;
|
||||
struct RestoreApplierInterface;
|
||||
|
@ -145,16 +146,21 @@ void decodeStorageCacheValue(const ValueRef& value, std::vector<uint16_t>& serve
|
|||
extern const KeyRangeRef serverKeysRange;
|
||||
extern const KeyRef serverKeysPrefix;
|
||||
extern const ValueRef serverKeysTrue, serverKeysTrueEmptyRange, serverKeysFalse;
|
||||
const UID newShardId(const uint64_t physicalShardId,
|
||||
AssignEmptyRange assignEmptyRange,
|
||||
UnassignShard unassignShard = UnassignShard::False);
|
||||
const UID newDataMoveId(const uint64_t physicalShardId,
|
||||
AssignEmptyRange assignEmptyRange,
|
||||
EnablePhysicalShardMove enablePSM = EnablePhysicalShardMove::False,
|
||||
UnassignShard unassignShard = UnassignShard::False);
|
||||
const Key serverKeysKey(UID serverID, const KeyRef& keys);
|
||||
const Key serverKeysPrefixFor(UID serverID);
|
||||
UID serverKeysDecodeServer(const KeyRef& key);
|
||||
std::pair<UID, Key> serverKeysDecodeServerBegin(const KeyRef& key);
|
||||
bool serverHasKey(ValueRef storedValue);
|
||||
const Value serverKeysValue(const UID& id);
|
||||
void decodeServerKeysValue(const ValueRef& value, bool& assigned, bool& emptyRange, UID& id);
|
||||
void decodeServerKeysValue(const ValueRef& value,
|
||||
bool& assigned,
|
||||
bool& emptyRange,
|
||||
EnablePhysicalShardMove& enablePSM,
|
||||
UID& id);
|
||||
|
||||
extern const KeyRangeRef conflictingKeysRange;
|
||||
extern const ValueRef conflictingKeysTrue, conflictingKeysFalse;
|
||||
|
|
|
@ -1253,7 +1253,14 @@ struct DDQueue : public IDDRelocationQueue {
|
|||
if (SERVER_KNOBS->ENABLE_DD_PHYSICAL_SHARD) {
|
||||
rrs.dataMoveId = UID();
|
||||
} else {
|
||||
rrs.dataMoveId = deterministicRandom()->randomUniqueID();
|
||||
rrs.dataMoveId =
|
||||
newDataMoveId(deterministicRandom()->randomUInt64(),
|
||||
AssignEmptyRange::False,
|
||||
EnablePhysicalShardMove(SERVER_KNOBS->ENABLE_DD_PHYSICAL_SHARD_MOVE));
|
||||
TraceEvent(SevInfo, "DDDataMoveInitiatedWithRandomDestID")
|
||||
.detail("DataMoveID", rrs.dataMoveId.toString())
|
||||
.detail("Range", rrs.keys)
|
||||
.detail("Reason", rrs.reason.toString());
|
||||
}
|
||||
} else {
|
||||
rrs.dataMoveId = anonymousShardId;
|
||||
|
@ -1746,7 +1753,12 @@ ACTOR Future<Void> dataDistributionRelocator(DDQueue* self,
|
|||
} else {
|
||||
self->moveCreateNewPhysicalShard++;
|
||||
}
|
||||
rd.dataMoveId = newShardId(physicalShardIDCandidate, AssignEmptyRange::False);
|
||||
rd.dataMoveId = newDataMoveId(physicalShardIDCandidate,
|
||||
AssignEmptyRange::False,
|
||||
EnablePhysicalShardMove(SERVER_KNOBS->ENABLE_DD_PHYSICAL_SHARD_MOVE));
|
||||
TraceEvent(SevInfo, "DDDataMoveInitiated")
|
||||
.detail("DataMoveID", rd.dataMoveId.toString())
|
||||
.detail("Reason", rd.reason.toString());
|
||||
auto inFlightRange = self->inFlight.rangeContaining(rd.keys.begin);
|
||||
inFlightRange.value().dataMoveId = rd.dataMoveId;
|
||||
auto f = self->dataMoves.intersectingRanges(rd.keys);
|
||||
|
|
|
@ -2043,9 +2043,10 @@ ACTOR Future<bool> canRemoveStorageServer(Reference<ReadYourWritesTransaction> t
|
|||
|
||||
// Return true if the entire range is false. Since these values are coalesced, we can return false if there is more
|
||||
// than one result
|
||||
UID teamId;
|
||||
UID shardId;
|
||||
bool assigned, emptyRange;
|
||||
decodeServerKeysValue(keys[0].value, assigned, emptyRange, teamId);
|
||||
EnablePhysicalShardMove enablePSM = EnablePhysicalShardMove::False;
|
||||
decodeServerKeysValue(keys[0].value, assigned, emptyRange, enablePSM, shardId);
|
||||
TraceEvent(SevVerbose, "CanRemoveStorageServer")
|
||||
.detail("ServerID", serverID)
|
||||
.detail("Key1", keys[0].key)
|
||||
|
@ -2274,7 +2275,8 @@ ACTOR Future<Void> removeKeysFromFailedServer(Database cx,
|
|||
}
|
||||
}
|
||||
|
||||
const UID shardId = newShardId(deterministicRandom()->randomUInt64(), AssignEmptyRange::True);
|
||||
const UID shardId =
|
||||
newDataMoveId(deterministicRandom()->randomUInt64(), AssignEmptyRange::True);
|
||||
|
||||
// Assign the shard to teamForDroppedRange in keyServer space.
|
||||
if (SERVER_KNOBS->SHARD_ENCODE_LOCATION_METADATA) {
|
||||
|
@ -2653,13 +2655,13 @@ void seedShardServers(Arena& arena, CommitTransactionRef& tr, std::vector<Storag
|
|||
// to a specific
|
||||
// key (keyServersKeyServersKey)
|
||||
if (SERVER_KNOBS->SHARD_ENCODE_LOCATION_METADATA) {
|
||||
const UID teamId = deterministicRandom()->randomUniqueID();
|
||||
ksValue = keyServersValue(serverSrcUID, /*dest=*/std::vector<UID>(), teamId, UID());
|
||||
const UID shardId = deterministicRandom()->randomUniqueID();
|
||||
ksValue = keyServersValue(serverSrcUID, /*dest=*/std::vector<UID>(), shardId, UID());
|
||||
krmSetPreviouslyEmptyRange(tr, arena, keyServersPrefix, KeyRangeRef(KeyRef(), allKeys.end), ksValue, Value());
|
||||
|
||||
for (auto& s : servers) {
|
||||
krmSetPreviouslyEmptyRange(
|
||||
tr, arena, serverKeysPrefixFor(s.id()), allKeys, serverKeysValue(teamId), serverKeysFalse);
|
||||
tr, arena, serverKeysPrefixFor(s.id()), allKeys, serverKeysValue(shardId), serverKeysFalse);
|
||||
}
|
||||
} else {
|
||||
krmSetPreviouslyEmptyRange(tr, arena, keyServersPrefix, KeyRangeRef(KeyRef(), allKeys.end), ksValue, Value());
|
||||
|
|
|
@ -8713,7 +8713,8 @@ private:
|
|||
// We can also ignore clearRanges, because they are always accompanied by such a pair of sets with the same
|
||||
// keys
|
||||
startKey = m.param1;
|
||||
decodeServerKeysValue(m.param2, nowAssigned, emptyRange, dataMoveId);
|
||||
EnablePhysicalShardMove enablePSM = EnablePhysicalShardMove::False;
|
||||
decodeServerKeysValue(m.param2, nowAssigned, emptyRange, enablePSM, dataMoveId);
|
||||
processedStartKey = true;
|
||||
} else if (m.type == MutationRef::SetValue && m.param1 == lastEpochEndPrivateKey) {
|
||||
// lastEpochEnd transactions are guaranteed by the master to be alone in their own batch (version)
|
||||
|
|
Loading…
Reference in New Issue