diff --git a/fdbserver/DDRelocationQueue.actor.cpp b/fdbserver/DDRelocationQueue.actor.cpp index ba0d8fbd2d..b50bd7992d 100644 --- a/fdbserver/DDRelocationQueue.actor.cpp +++ b/fdbserver/DDRelocationQueue.actor.cpp @@ -1770,19 +1770,19 @@ ACTOR Future dataDistributionRelocator(DDQueue* self, state Error error = success(); state Promise dataMovementComplete; // Move keys from source to destination by changing the serverKeyList and keyServerList system keys - state Future doMoveKeys = moveKeys(self->cx, - rd.dataMoveId, - rd.keys, - destIds, - healthyIds, - self->lock, - dataMovementComplete, - &self->startMoveKeysParallelismLock, - &self->finishMoveKeysParallelismLock, - self->teamCollections.size() > 1, - relocateShardInterval.pairID, - ddEnabledState, - CancelConflictingDataMoves::False); + state Future doMoveKeys = + self->txnProcessor->moveKeys(MoveKeysParams{ rd.dataMoveId, + rd.keys, + destIds, + healthyIds, + self->lock, + dataMovementComplete, + &self->startMoveKeysParallelismLock, + &self->finishMoveKeysParallelismLock, + self->teamCollections.size() > 1, + relocateShardInterval.pairID, + ddEnabledState, + CancelConflictingDataMoves::False }); state Future pollHealth = signalledTransferComplete ? Never() : delay(SERVER_KNOBS->HEALTH_POLL_TIME, TaskPriority::DataDistributionLaunch); @@ -1795,19 +1795,19 @@ ACTOR Future dataDistributionRelocator(DDQueue* self, healthyIds.insert(healthyIds.end(), extraIds.begin(), extraIds.end()); extraIds.clear(); ASSERT(totalIds == destIds.size()); // Sanity check the destIDs before we move keys - doMoveKeys = moveKeys(self->cx, - rd.dataMoveId, - rd.keys, - destIds, - healthyIds, - self->lock, - Promise(), - &self->startMoveKeysParallelismLock, - &self->finishMoveKeysParallelismLock, - self->teamCollections.size() > 1, - relocateShardInterval.pairID, - ddEnabledState, - CancelConflictingDataMoves::False); + doMoveKeys = + self->txnProcessor->moveKeys(MoveKeysParams{ rd.dataMoveId, + rd.keys, + destIds, + healthyIds, + self->lock, + Promise(), + &self->startMoveKeysParallelismLock, + &self->finishMoveKeysParallelismLock, + self->teamCollections.size() > 1, + relocateShardInterval.pairID, + ddEnabledState, + CancelConflictingDataMoves::False }); } else { self->fetchKeysComplete.insert(rd); if (SERVER_KNOBS->SHARD_ENCODE_LOCATION_METADATA) { diff --git a/fdbserver/MoveKeys.actor.cpp b/fdbserver/MoveKeys.actor.cpp index 9e7f9e30c4..7cd38d9281 100644 --- a/fdbserver/MoveKeys.actor.cpp +++ b/fdbserver/MoveKeys.actor.cpp @@ -282,6 +282,7 @@ Future checkMoveKeysLockReadOnly(Transaction* tr, MoveKeysLock lock, const return checkMoveKeysLock(tr, lock, ddEnabledState, false); } +namespace { ACTOR Future> checkReadWrite(Future> fReply, UID uid, Version version) { ErrorOr reply = wait(fReply); if (!reply.present() || reply.get().first < version) @@ -1795,6 +1796,8 @@ ACTOR static Future finishMoveShards(Database occ, return Void(); } +}; // anonymous namespace + ACTOR Future> addStorageServer(Database cx, StorageServerInterface server) { state Reference tr = makeReference(cx); state KeyBackedMap tssMapDB = KeyBackedMap(tssMappingKeys.begin); @@ -2444,76 +2447,75 @@ ACTOR Future cleanUpDataMove(Database occ, return Void(); } -ACTOR Future moveKeys(Database cx, - UID dataMoveId, - KeyRange keys, - std::vector destinationTeam, - std::vector healthyDestinations, - MoveKeysLock lock, - Promise dataMovementComplete, - FlowLock* startMoveKeysParallelismLock, - FlowLock* finishMoveKeysParallelismLock, - bool hasRemote, - UID relocationIntervalId, - const DDEnabledState* ddEnabledState, - CancelConflictingDataMoves cancelConflictingDataMoves) { - ASSERT(destinationTeam.size()); - std::sort(destinationTeam.begin(), destinationTeam.end()); +Future startMovement(Database occ, MoveKeysParams& params, std::map& tssMapping) { + if (SERVER_KNOBS->SHARD_ENCODE_LOCATION_METADATA) { + return startMoveShards(std::move(occ), + params.dataMoveId, + params.keys, + params.destinationTeam, + params.lock, + params.startMoveKeysParallelismLock, + params.relocationIntervalId, + params.ddEnabledState, + params.cancelConflictingDataMoves); + } + return startMoveKeys(std::move(occ), + params.keys, + params.destinationTeam, + params.lock, + params.startMoveKeysParallelismLock, + params.relocationIntervalId, + &tssMapping, + params.ddEnabledState); +} + +Future finishMovement(Database occ, + MoveKeysParams& params, + const std::map& tssMapping) { + if (SERVER_KNOBS->SHARD_ENCODE_LOCATION_METADATA) { + return finishMoveShards(std::move(occ), + params.dataMoveId, + params.keys, + params.destinationTeam, + params.lock, + params.finishMoveKeysParallelismLock, + params.hasRemote, + params.relocationIntervalId, + tssMapping, + params.ddEnabledState); + } + return finishMoveKeys(std::move(occ), + params.keys, + params.destinationTeam, + params.lock, + params.finishMoveKeysParallelismLock, + params.hasRemote, + params.relocationIntervalId, + tssMapping, + params.ddEnabledState); +} + +ACTOR Future moveKeys(Database occ, MoveKeysParams params) { + ASSERT(params.destinationTeam.size()); + std::sort(params.destinationTeam.begin(), params.destinationTeam.end()); state std::map tssMapping; - if (SERVER_KNOBS->SHARD_ENCODE_LOCATION_METADATA) { - wait(startMoveShards(cx, - dataMoveId, - keys, - destinationTeam, - lock, - startMoveKeysParallelismLock, - relocationIntervalId, - ddEnabledState, - cancelConflictingDataMoves)); + wait(startMovement(occ, params, tssMapping)); - } else { - wait(startMoveKeys(cx, - keys, - destinationTeam, - lock, - startMoveKeysParallelismLock, - relocationIntervalId, - &tssMapping, - ddEnabledState)); - } + state Future completionSignaller = checkFetchingState(occ, + params.healthyDestinations, + params.keys, + params.dataMovementComplete, + params.relocationIntervalId, + tssMapping); - state Future completionSignaller = - checkFetchingState(cx, healthyDestinations, keys, dataMovementComplete, relocationIntervalId, tssMapping); - - if (SERVER_KNOBS->SHARD_ENCODE_LOCATION_METADATA) { - wait(finishMoveShards(cx, - dataMoveId, - keys, - destinationTeam, - lock, - finishMoveKeysParallelismLock, - hasRemote, - relocationIntervalId, - tssMapping, - ddEnabledState)); - } else { - wait(finishMoveKeys(cx, - keys, - destinationTeam, - lock, - finishMoveKeysParallelismLock, - hasRemote, - relocationIntervalId, - tssMapping, - ddEnabledState)); - } + wait(finishMovement(occ, params, tssMapping)); // This is defensive, but make sure that we always say that the movement is complete before moveKeys completes completionSignaller.cancel(); - if (!dataMovementComplete.isSet()) - dataMovementComplete.send(Void()); + if (!params.dataMovementComplete.isSet()) + params.dataMovementComplete.send(Void()); return Void(); } diff --git a/fdbserver/include/fdbserver/DDTxnProcessor.h b/fdbserver/include/fdbserver/DDTxnProcessor.h index 451dc84c96..00410a54f8 100644 --- a/fdbserver/include/fdbserver/DDTxnProcessor.h +++ b/fdbserver/include/fdbserver/DDTxnProcessor.h @@ -74,6 +74,8 @@ public: const Optional& tssPairID, const MoveKeysLock& lock, const DDEnabledState* ddEnabledState) const = 0; + + virtual Future moveKeys(const MoveKeysParams& params) const = 0; }; class DDTxnProcessorImpl; @@ -125,6 +127,8 @@ public: const DDEnabledState* ddEnabledState) const override { return ::removeStorageServer(cx, serverID, tssPairID, lock, ddEnabledState); } + + Future moveKeys(const MoveKeysParams& params) const override { return ::moveKeys(cx, params); } }; // A mock transaction implementation for test usage. diff --git a/fdbserver/include/fdbserver/MoveKeys.actor.h b/fdbserver/include/fdbserver/MoveKeys.actor.h index b809b02521..f0c142a9b4 100644 --- a/fdbserver/include/fdbserver/MoveKeys.actor.h +++ b/fdbserver/include/fdbserver/MoveKeys.actor.h @@ -56,6 +56,20 @@ public: bool setDDEnabled(bool status, UID snapUID); }; +struct MoveKeysParams { + UID dataMoveId; + KeyRange keys; + std::vector destinationTeam, healthyDestinations; + MoveKeysLock lock; + Promise dataMovementComplete; + FlowLock* startMoveKeysParallelismLock = nullptr; + FlowLock* finishMoveKeysParallelismLock = nullptr; + bool hasRemote; + UID relocationIntervalId; + const DDEnabledState* ddEnabledState = nullptr; + CancelConflictingDataMoves cancelConflictingDataMoves = CancelConflictingDataMoves::False; +}; + // Calling moveKeys, etc with the return value of this actor ensures that no movekeys, etc // has been executed by a different locker since takeMoveKeysLock(), as calling // takeMoveKeysLock() updates "moveKeysLockOwnerKey" to a random UID. @@ -74,19 +88,7 @@ void seedShardServers(Arena& trArena, CommitTransactionRef& tr, std::vector moveKeys(Database occ, - UID dataMoveId, - KeyRange keys, - std::vector destinationTeam, - std::vector healthyDestinations, - MoveKeysLock lock, - Promise dataMovementComplete, - FlowLock* startMoveKeysParallelismLock, - FlowLock* finishMoveKeysParallelismLock, - bool hasRemote, - UID relocationIntervalId, // for logging only - const DDEnabledState* ddEnabledState, - CancelConflictingDataMoves cancelConflictingDataMoves = CancelConflictingDataMoves::False); +ACTOR Future moveKeys(Database occ, MoveKeysParams params); // Cancels a data move designated by dataMoveId. ACTOR Future cleanUpDataMove(Database occ, diff --git a/fdbserver/workloads/DataLossRecovery.actor.cpp b/fdbserver/workloads/DataLossRecovery.actor.cpp index 048b6d4bec..011de1eedb 100644 --- a/fdbserver/workloads/DataLossRecovery.actor.cpp +++ b/fdbserver/workloads/DataLossRecovery.actor.cpp @@ -213,18 +213,18 @@ struct DataLossRecoveryWorkload : TestWorkload { TraceEvent("DataLossRecovery").detail("Phase", "StartMoveKeys"); wait(moveKeys(cx, - deterministicRandom()->randomUniqueID(), - keys, - dest, - dest, - moveKeysLock, - Promise(), - &self->startMoveKeysParallelismLock, - &self->finishMoveKeysParallelismLock, - false, - UID(), // for logging only - &ddEnabledState, - CancelConflictingDataMoves::True)); + MoveKeysParams{ deterministicRandom()->randomUniqueID(), + keys, + dest, + dest, + moveKeysLock, + Promise(), + &self->startMoveKeysParallelismLock, + &self->finishMoveKeysParallelismLock, + false, + UID(), // for logging only + &ddEnabledState, + CancelConflictingDataMoves::True })); break; } catch (Error& e) { TraceEvent("DataLossRecovery").error(e).detail("Phase", "MoveRangeError"); diff --git a/fdbserver/workloads/PhysicalShardMove.actor.cpp b/fdbserver/workloads/PhysicalShardMove.actor.cpp index 9fa605a167..80bdcddbee 100644 --- a/fdbserver/workloads/PhysicalShardMove.actor.cpp +++ b/fdbserver/workloads/PhysicalShardMove.actor.cpp @@ -328,17 +328,17 @@ struct PhysicalShardMoveWorkLoad : TestWorkload { TraceEvent("TestMoveShardStartMoveKeys").detail("DataMove", dataMoveId); wait(moveKeys(cx, - dataMoveId, - keys, - dests, - dests, - moveKeysLock, - Promise(), - &self->startMoveKeysParallelismLock, - &self->finishMoveKeysParallelismLock, - false, - deterministicRandom()->randomUniqueID(), // for logging only - &ddEnabledState)); + MoveKeysParams{ dataMoveId, + keys, + dests, + dests, + moveKeysLock, + Promise(), + &self->startMoveKeysParallelismLock, + &self->finishMoveKeysParallelismLock, + false, + deterministicRandom()->randomUniqueID(), // for logging only + &ddEnabledState })); break; } catch (Error& e) { if (e.code() == error_code_movekeys_conflict) { diff --git a/fdbserver/workloads/RandomMoveKeys.actor.cpp b/fdbserver/workloads/RandomMoveKeys.actor.cpp index 6ec85f8381..09de18e68b 100644 --- a/fdbserver/workloads/RandomMoveKeys.actor.cpp +++ b/fdbserver/workloads/RandomMoveKeys.actor.cpp @@ -143,18 +143,18 @@ struct MoveKeysWorkload : TestWorkload { state Promise signal; state DDEnabledState ddEnabledState; wait(moveKeys(cx, - deterministicRandom()->randomUniqueID(), - keys, - destinationTeamIDs, - destinationTeamIDs, - lock, - signal, - &fl1, - &fl2, - false, - relocateShardInterval.pairID, - &ddEnabledState, - CancelConflictingDataMoves::True)); + MoveKeysParams{ deterministicRandom()->randomUniqueID(), + keys, + destinationTeamIDs, + destinationTeamIDs, + lock, + signal, + &fl1, + &fl2, + false, + relocateShardInterval.pairID, + &ddEnabledState, + CancelConflictingDataMoves::True })); TraceEvent(relocateShardInterval.end()).detail("Result", "Success"); return Void(); } catch (Error& e) {