Merge pull request #8162 from sfc-gh-xwang/feature/main/moveKey

Implement txnProcessor->moveKeys(const MoveKeysParams& params)
Xiaoxi Wang 2022-09-13 14:11:49 -07:00 committed by GitHub
commit 2ae01bdf2d
7 changed files with 144 additions and 136 deletions
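The core change is a parameter-object refactor: the twelve-argument moveKeys call becomes a single MoveKeysParams struct passed through the new IDDTxnProcessor::moveKeys interface. Below is a minimal, self-contained sketch of that pattern with simplified stand-in types (MoveParams, doMoveOld, doMoveNew are illustrative names only, not FoundationDB's types); the real signatures appear in the diff hunks that follow.

#include <iostream>
#include <string>
#include <vector>

// Simplified stand-in for MoveKeysParams.
struct MoveParams {
    std::string dataMoveId;           // stands in for UID dataMoveId
    std::string keys;                 // stands in for KeyRange keys
    std::vector<int> destinationTeam; // stands in for std::vector<UID>
    bool hasRemote = false;
};

// Before: every value is a positional argument.
void doMoveOld(const std::string& dataMoveId,
               const std::string& keys,
               const std::vector<int>& destinationTeam,
               bool hasRemote) {
    std::cout << "moving " << keys << " (id " << dataMoveId << ")\n";
}

// After: one aggregate carries all parameters, so call sites stay readable
// and new fields can be added without touching every caller's argument list.
void doMoveNew(const MoveParams& params) {
    std::cout << "moving " << params.keys << " (id " << params.dataMoveId << ")\n";
}

int main() {
    doMoveOld("id-1", "[a, b)", { 1, 2, 3 }, false);
    doMoveNew(MoveParams{ "id-1", "[a, b)", { 1, 2, 3 }, false });
    return 0;
}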


@@ -1770,8 +1770,8 @@ ACTOR Future<Void> dataDistributionRelocator(DDQueue* self,
state Error error = success();
state Promise<Void> dataMovementComplete;
// Move keys from source to destination by changing the serverKeyList and keyServerList system keys
state Future<Void> doMoveKeys = moveKeys(self->cx,
rd.dataMoveId,
state Future<Void> doMoveKeys =
self->txnProcessor->moveKeys(MoveKeysParams{ rd.dataMoveId,
rd.keys,
destIds,
healthyIds,
@@ -1782,7 +1782,7 @@ ACTOR Future<Void> dataDistributionRelocator(DDQueue* self,
self->teamCollections.size() > 1,
relocateShardInterval.pairID,
ddEnabledState,
CancelConflictingDataMoves::False);
CancelConflictingDataMoves::False });
state Future<Void> pollHealth =
signalledTransferComplete ? Never()
: delay(SERVER_KNOBS->HEALTH_POLL_TIME, TaskPriority::DataDistributionLaunch);
@@ -1795,8 +1795,8 @@ ACTOR Future<Void> dataDistributionRelocator(DDQueue* self,
healthyIds.insert(healthyIds.end(), extraIds.begin(), extraIds.end());
extraIds.clear();
ASSERT(totalIds == destIds.size()); // Sanity check the destIDs before we move keys
doMoveKeys = moveKeys(self->cx,
rd.dataMoveId,
doMoveKeys =
self->txnProcessor->moveKeys(MoveKeysParams{ rd.dataMoveId,
rd.keys,
destIds,
healthyIds,
@@ -1807,7 +1807,7 @@ ACTOR Future<Void> dataDistributionRelocator(DDQueue* self,
self->teamCollections.size() > 1,
relocateShardInterval.pairID,
ddEnabledState,
CancelConflictingDataMoves::False);
CancelConflictingDataMoves::False });
} else {
self->fetchKeysComplete.insert(rd);
if (SERVER_KNOBS->SHARD_ENCODE_LOCATION_METADATA) {


@@ -282,6 +282,7 @@ Future<Void> checkMoveKeysLockReadOnly(Transaction* tr, MoveKeysLock lock, const
return checkMoveKeysLock(tr, lock, ddEnabledState, false);
}
namespace {
ACTOR Future<Optional<UID>> checkReadWrite(Future<ErrorOr<GetShardStateReply>> fReply, UID uid, Version version) {
ErrorOr<GetShardStateReply> reply = wait(fReply);
if (!reply.present() || reply.get().first < version)
@@ -1795,6 +1796,8 @@ ACTOR static Future<Void> finishMoveShards(Database occ,
return Void();
}
}; // anonymous namespace
ACTOR Future<std::pair<Version, Tag>> addStorageServer(Database cx, StorageServerInterface server) {
state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(cx);
state KeyBackedMap<UID, UID> tssMapDB = KeyBackedMap<UID, UID>(tssMappingKeys.begin);
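The two hunks above wrap the file-local helper actors (from checkReadWrite through finishMoveShards) in an anonymous namespace, giving them internal linkage so their names cannot collide with symbols in other translation units. A minimal sketch of the idiom (helperOnlyHere is an illustrative name):

#include <iostream>

namespace {                        // anonymous namespace: internal linkage
int helperOnlyHere(int x) {        // not visible outside this translation unit
    return x * 2;
}
} // anonymous namespace

int main() {
    std::cout << helperOnlyHere(21) << "\n"; // prints 42
    return 0;
}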
@@ -2444,76 +2447,75 @@ ACTOR Future<Void> cleanUpDataMove(Database occ,
return Void();
}
ACTOR Future<Void> moveKeys(Database cx,
UID dataMoveId,
KeyRange keys,
std::vector<UID> destinationTeam,
std::vector<UID> healthyDestinations,
MoveKeysLock lock,
Promise<Void> dataMovementComplete,
FlowLock* startMoveKeysParallelismLock,
FlowLock* finishMoveKeysParallelismLock,
bool hasRemote,
UID relocationIntervalId,
const DDEnabledState* ddEnabledState,
CancelConflictingDataMoves cancelConflictingDataMoves) {
ASSERT(destinationTeam.size());
std::sort(destinationTeam.begin(), destinationTeam.end());
Future<Void> startMovement(Database occ, MoveKeysParams& params, std::map<UID, StorageServerInterface>& tssMapping) {
if (SERVER_KNOBS->SHARD_ENCODE_LOCATION_METADATA) {
return startMoveShards(std::move(occ),
params.dataMoveId,
params.keys,
params.destinationTeam,
params.lock,
params.startMoveKeysParallelismLock,
params.relocationIntervalId,
params.ddEnabledState,
params.cancelConflictingDataMoves);
}
return startMoveKeys(std::move(occ),
params.keys,
params.destinationTeam,
params.lock,
params.startMoveKeysParallelismLock,
params.relocationIntervalId,
&tssMapping,
params.ddEnabledState);
}
Future<Void> finishMovement(Database occ,
MoveKeysParams& params,
const std::map<UID, StorageServerInterface>& tssMapping) {
if (SERVER_KNOBS->SHARD_ENCODE_LOCATION_METADATA) {
return finishMoveShards(std::move(occ),
params.dataMoveId,
params.keys,
params.destinationTeam,
params.lock,
params.finishMoveKeysParallelismLock,
params.hasRemote,
params.relocationIntervalId,
tssMapping,
params.ddEnabledState);
}
return finishMoveKeys(std::move(occ),
params.keys,
params.destinationTeam,
params.lock,
params.finishMoveKeysParallelismLock,
params.hasRemote,
params.relocationIntervalId,
tssMapping,
params.ddEnabledState);
}
ACTOR Future<Void> moveKeys(Database occ, MoveKeysParams params) {
ASSERT(params.destinationTeam.size());
std::sort(params.destinationTeam.begin(), params.destinationTeam.end());
state std::map<UID, StorageServerInterface> tssMapping;
if (SERVER_KNOBS->SHARD_ENCODE_LOCATION_METADATA) {
wait(startMoveShards(cx,
dataMoveId,
keys,
destinationTeam,
lock,
startMoveKeysParallelismLock,
relocationIntervalId,
ddEnabledState,
cancelConflictingDataMoves));
wait(startMovement(occ, params, tssMapping));
} else {
wait(startMoveKeys(cx,
keys,
destinationTeam,
lock,
startMoveKeysParallelismLock,
relocationIntervalId,
&tssMapping,
ddEnabledState));
}
state Future<Void> completionSignaller = checkFetchingState(occ,
params.healthyDestinations,
params.keys,
params.dataMovementComplete,
params.relocationIntervalId,
tssMapping);
state Future<Void> completionSignaller =
checkFetchingState(cx, healthyDestinations, keys, dataMovementComplete, relocationIntervalId, tssMapping);
if (SERVER_KNOBS->SHARD_ENCODE_LOCATION_METADATA) {
wait(finishMoveShards(cx,
dataMoveId,
keys,
destinationTeam,
lock,
finishMoveKeysParallelismLock,
hasRemote,
relocationIntervalId,
tssMapping,
ddEnabledState));
} else {
wait(finishMoveKeys(cx,
keys,
destinationTeam,
lock,
finishMoveKeysParallelismLock,
hasRemote,
relocationIntervalId,
tssMapping,
ddEnabledState));
}
wait(finishMovement(occ, params, tssMapping));
// This is defensive, but make sure that we always say that the movement is complete before moveKeys completes
completionSignaller.cancel();
if (!dataMovementComplete.isSet())
dataMovementComplete.send(Void());
if (!params.dataMovementComplete.isSet())
params.dataMovementComplete.send(Void());
return Void();
}
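The refactor pulls the SHARD_ENCODE_LOCATION_METADATA branching out of the actor body into the startMovement/finishMovement helpers, so moveKeys itself reads as a straight start, wait-for-fetch, finish sequence. A minimal sketch of that dispatch-in-a-helper shape, with a plain bool standing in for the knob and strings standing in for the real work (all names below are illustrative, not FDB APIs):

#include <iostream>
#include <string>

// Stand-in for SERVER_KNOBS->SHARD_ENCODE_LOCATION_METADATA (illustrative only).
static const bool kShardEncodeLocationMetadata = true;

std::string startMoveShards() { return "startMoveShards"; }
std::string startMoveKeys() { return "startMoveKeys"; }
std::string finishMoveShards() { return "finishMoveShards"; }
std::string finishMoveKeys() { return "finishMoveKeys"; }

// The knob check lives in the helpers, not in the main flow.
std::string startMovement() {
    return kShardEncodeLocationMetadata ? startMoveShards() : startMoveKeys();
}
std::string finishMovement() {
    return kShardEncodeLocationMetadata ? finishMoveShards() : finishMoveKeys();
}

int main() {
    // The main flow now reads linearly: start, then finish.
    std::cout << startMovement() << " -> " << finishMovement() << "\n";
    return 0;
}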


@@ -74,6 +74,8 @@ public:
const Optional<UID>& tssPairID,
const MoveKeysLock& lock,
const DDEnabledState* ddEnabledState) const = 0;
virtual Future<Void> moveKeys(const MoveKeysParams& params) const = 0;
};
class DDTxnProcessorImpl;
@@ -125,6 +127,8 @@ public:
const DDEnabledState* ddEnabledState) const override {
return ::removeStorageServer(cx, serverID, tssPairID, lock, ddEnabledState);
}
Future<Void> moveKeys(const MoveKeysParams& params) const override { return ::moveKeys(cx, params); }
};
// A mock transaction implementation for test usage.
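Routing the call through IDDTxnProcessor means the relocation queue no longer calls the free function ::moveKeys directly, so a mock processor can intercept data moves in tests. A minimal sketch of that interface-plus-forwarder shape with simplified types (IMover, RealMover, MockMover are illustrative names, not FDB classes):

#include <iostream>
#include <memory>
#include <string>

struct Params { std::string keys; };     // stands in for MoveKeysParams

void freeMoveKeys(const Params& p) {     // stands in for the free ::moveKeys
    std::cout << "really moving " << p.keys << "\n";
}

struct IMover {                          // stands in for IDDTxnProcessor
    virtual void moveKeys(const Params& p) const = 0;
    virtual ~IMover() = default;
};

struct RealMover : IMover {              // forwards to the free function
    void moveKeys(const Params& p) const override { freeMoveKeys(p); }
};

struct MockMover : IMover {              // test double: records instead of moving
    void moveKeys(const Params& p) const override {
        std::cout << "pretend-moving " << p.keys << "\n";
    }
};

int main() {
    std::unique_ptr<IMover> mover = std::make_unique<RealMover>();
    mover->moveKeys(Params{ "[a, b)" });
    mover = std::make_unique<MockMover>(); // swapped in for simulation/tests
    mover->moveKeys(Params{ "[a, b)" });
    return 0;
}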


@@ -56,6 +56,20 @@ public:
bool setDDEnabled(bool status, UID snapUID);
};
struct MoveKeysParams {
UID dataMoveId;
KeyRange keys;
std::vector<UID> destinationTeam, healthyDestinations;
MoveKeysLock lock;
Promise<Void> dataMovementComplete;
FlowLock* startMoveKeysParallelismLock = nullptr;
FlowLock* finishMoveKeysParallelismLock = nullptr;
bool hasRemote;
UID relocationIntervalId;
const DDEnabledState* ddEnabledState = nullptr;
CancelConflictingDataMoves cancelConflictingDataMoves = CancelConflictingDataMoves::False;
};
// Calling moveKeys, etc with the return value of this actor ensures that no movekeys, etc
// has been executed by a different locker since takeMoveKeysLock(), as calling
// takeMoveKeysLock() updates "moveKeysLockOwnerKey" to a random UID.
@@ -74,19 +88,7 @@ void seedShardServers(Arena& trArena, CommitTransactionRef& tr, std::vector<Stor
// for restarting the remainder, and for not otherwise cancelling it before
// it returns (since it needs to execute the finishMoveKeys transaction).
// When dataMoveId.isValid(), the keyrange will be moved to a shard designated as dataMoveId.
ACTOR Future<Void> moveKeys(Database occ,
UID dataMoveId,
KeyRange keys,
std::vector<UID> destinationTeam,
std::vector<UID> healthyDestinations,
MoveKeysLock lock,
Promise<Void> dataMovementComplete,
FlowLock* startMoveKeysParallelismLock,
FlowLock* finishMoveKeysParallelismLock,
bool hasRemote,
UID relocationIntervalId, // for logging only
const DDEnabledState* ddEnabledState,
CancelConflictingDataMoves cancelConflictingDataMoves = CancelConflictingDataMoves::False);
ACTOR Future<Void> moveKeys(Database occ, MoveKeysParams params);
// Cancels a data move designated by dataMoveId.
ACTOR Future<Void> cleanUpDataMove(Database occ,
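Because MoveKeysParams is a plain aggregate, the call sites in this PR initialize it with braced values in field-declaration order, so any new field has to be added with that order in mind. A minimal sketch of the two initialization styles with simplified stand-in fields (ParamsSketch is illustrative, not the real FDB types; the designated-initializer form needs C++20):

#include <string>
#include <vector>

struct ParamsSketch {                    // simplified stand-in for MoveKeysParams
    std::string dataMoveId;
    std::string keys;
    std::vector<int> destinationTeam;
    bool hasRemote = false;
    std::string relocationIntervalId;    // for logging only
};

int main() {
    // Positional aggregate init: values must follow declaration order.
    ParamsSketch a{ "id-1", "[a, b)", { 1, 2 }, false, "interval-1" };

    // C++20 designated initializers name each field explicitly.
    ParamsSketch b{ .dataMoveId = "id-1",
                    .keys = "[a, b)",
                    .destinationTeam = { 1, 2 },
                    .hasRemote = false,
                    .relocationIntervalId = "interval-1" };
    (void)a;
    (void)b;
    return 0;
}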


@@ -213,7 +213,7 @@ struct DataLossRecoveryWorkload : TestWorkload {
TraceEvent("DataLossRecovery").detail("Phase", "StartMoveKeys");
wait(moveKeys(cx,
deterministicRandom()->randomUniqueID(),
MoveKeysParams{ deterministicRandom()->randomUniqueID(),
keys,
dest,
dest,
@@ -224,7 +224,7 @@ struct DataLossRecoveryWorkload : TestWorkload {
false,
UID(), // for logging only
&ddEnabledState,
CancelConflictingDataMoves::True));
CancelConflictingDataMoves::True }));
break;
} catch (Error& e) {
TraceEvent("DataLossRecovery").error(e).detail("Phase", "MoveRangeError");


@@ -328,7 +328,7 @@ struct PhysicalShardMoveWorkLoad : TestWorkload {
TraceEvent("TestMoveShardStartMoveKeys").detail("DataMove", dataMoveId);
wait(moveKeys(cx,
dataMoveId,
MoveKeysParams{ dataMoveId,
keys,
dests,
dests,
@@ -338,7 +338,7 @@ struct PhysicalShardMoveWorkLoad : TestWorkload {
&self->finishMoveKeysParallelismLock,
false,
deterministicRandom()->randomUniqueID(), // for logging only
&ddEnabledState));
&ddEnabledState }));
break;
} catch (Error& e) {
if (e.code() == error_code_movekeys_conflict) {


@@ -143,7 +143,7 @@ struct MoveKeysWorkload : TestWorkload {
state Promise<Void> signal;
state DDEnabledState ddEnabledState;
wait(moveKeys(cx,
deterministicRandom()->randomUniqueID(),
MoveKeysParams{ deterministicRandom()->randomUniqueID(),
keys,
destinationTeamIDs,
destinationTeamIDs,
@@ -154,7 +154,7 @@ struct MoveKeysWorkload : TestWorkload {
false,
relocateShardInterval.pairID,
&ddEnabledState,
CancelConflictingDataMoves::True));
CancelConflictingDataMoves::True }));
TraceEvent(relocateShardInterval.end()).detail("Result", "Success");
return Void();
} catch (Error& e) {
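The workload hunks above call moveKeys directly inside try/catch blocks, breaking out of the loop on success and, in PhysicalShardMoveWorkLoad, retrying when the error is error_code_movekeys_conflict. A minimal sketch of that retry-on-conflict shape with stand-in types (ConflictError and attemptMove are illustrative, not FDB APIs):

#include <iostream>
#include <stdexcept>

struct ConflictError : std::runtime_error {      // stands in for error_code_movekeys_conflict
    ConflictError() : std::runtime_error("movekeys_conflict") {}
};

// Stand-in for a single moveKeys attempt; fails with a conflict the first time.
void attemptMove(int attempt) {
    if (attempt == 0)
        throw ConflictError();
}

int main() {
    for (int attempt = 0;; ++attempt) {
        try {
            attemptMove(attempt);                // would be wait(moveKeys(cx, MoveKeysParams{ ... }))
            std::cout << "move succeeded on attempt " << attempt << "\n";
            break;                               // success: leave the retry loop
        } catch (const ConflictError&) {
            std::cout << "conflict, retrying\n"; // only conflicts are retried
        }
    }
    return 0;
}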