Fix simulation failure with the new MoveKeysParams
This commit is contained in:
parent
5f51129834
commit
7e55522cb0
|
@ -1859,19 +1859,35 @@ ACTOR Future<Void> dataDistributionRelocator(DDQueue* self,
|
|||
state Error error = success();
|
||||
state Promise<Void> dataMovementComplete;
|
||||
// Move keys from source to destination by changing the serverKeyList and keyServerList system keys
|
||||
state Future<Void> doMoveKeys =
|
||||
self->txnProcessor->moveKeys(MoveKeysParams(rd.dataMoveId,
|
||||
rd.keys,
|
||||
destIds,
|
||||
healthyIds,
|
||||
self->lock,
|
||||
dataMovementComplete,
|
||||
&self->startMoveKeysParallelismLock,
|
||||
&self->finishMoveKeysParallelismLock,
|
||||
self->teamCollections.size() > 1,
|
||||
relocateShardInterval.pairID,
|
||||
ddEnabledState,
|
||||
CancelConflictingDataMoves::False));
|
||||
std::unique_ptr<MoveKeysParams> params;
|
||||
if (SERVER_KNOBS->SHARD_ENCODE_LOCATION_METADATA) {
|
||||
params = std::make_unique<MoveKeysParams>(rd.dataMoveId,
|
||||
std::vector<KeyRange>{ rd.keys },
|
||||
destIds,
|
||||
healthyIds,
|
||||
self->lock,
|
||||
dataMovementComplete,
|
||||
&self->startMoveKeysParallelismLock,
|
||||
&self->finishMoveKeysParallelismLock,
|
||||
self->teamCollections.size() > 1,
|
||||
relocateShardInterval.pairID,
|
||||
ddEnabledState,
|
||||
CancelConflictingDataMoves::False);
|
||||
} else {
|
||||
params = std::make_unique<MoveKeysParams>(rd.dataMoveId,
|
||||
rd.keys,
|
||||
destIds,
|
||||
healthyIds,
|
||||
self->lock,
|
||||
dataMovementComplete,
|
||||
&self->startMoveKeysParallelismLock,
|
||||
&self->finishMoveKeysParallelismLock,
|
||||
self->teamCollections.size() > 1,
|
||||
relocateShardInterval.pairID,
|
||||
ddEnabledState,
|
||||
CancelConflictingDataMoves::False);
|
||||
}
|
||||
state Future<Void> doMoveKeys = self->txnProcessor->moveKeys(*params);
|
||||
state Future<Void> pollHealth =
|
||||
signalledTransferComplete ? Never()
|
||||
: delay(SERVER_KNOBS->HEALTH_POLL_TIME, TaskPriority::DataDistributionLaunch);
|
||||
|
@ -1884,19 +1900,35 @@ ACTOR Future<Void> dataDistributionRelocator(DDQueue* self,
|
|||
healthyIds.insert(healthyIds.end(), extraIds.begin(), extraIds.end());
|
||||
extraIds.clear();
|
||||
ASSERT(totalIds == destIds.size()); // Sanity check the destIDs before we move keys
|
||||
doMoveKeys =
|
||||
self->txnProcessor->moveKeys(MoveKeysParams(rd.dataMoveId,
|
||||
rd.keys,
|
||||
destIds,
|
||||
healthyIds,
|
||||
self->lock,
|
||||
Promise<Void>(),
|
||||
&self->startMoveKeysParallelismLock,
|
||||
&self->finishMoveKeysParallelismLock,
|
||||
self->teamCollections.size() > 1,
|
||||
relocateShardInterval.pairID,
|
||||
ddEnabledState,
|
||||
CancelConflictingDataMoves::False));
|
||||
std::unique_ptr<MoveKeysParams> params;
|
||||
if (SERVER_KNOBS->SHARD_ENCODE_LOCATION_METADATA) {
|
||||
params = std::make_unique<MoveKeysParams>(rd.dataMoveId,
|
||||
std::vector<KeyRange>{ rd.keys },
|
||||
destIds,
|
||||
healthyIds,
|
||||
self->lock,
|
||||
Promise<Void>(),
|
||||
&self->startMoveKeysParallelismLock,
|
||||
&self->finishMoveKeysParallelismLock,
|
||||
self->teamCollections.size() > 1,
|
||||
relocateShardInterval.pairID,
|
||||
ddEnabledState,
|
||||
CancelConflictingDataMoves::False);
|
||||
} else {
|
||||
params = std::make_unique<MoveKeysParams>(rd.dataMoveId,
|
||||
rd.keys,
|
||||
destIds,
|
||||
healthyIds,
|
||||
self->lock,
|
||||
Promise<Void>(),
|
||||
&self->startMoveKeysParallelismLock,
|
||||
&self->finishMoveKeysParallelismLock,
|
||||
self->teamCollections.size() > 1,
|
||||
relocateShardInterval.pairID,
|
||||
ddEnabledState,
|
||||
CancelConflictingDataMoves::False);
|
||||
}
|
||||
doMoveKeys = self->txnProcessor->moveKeys(*params);
|
||||
} else {
|
||||
self->fetchKeysComplete.insert(rd);
|
||||
if (SERVER_KNOBS->SHARD_ENCODE_LOCATION_METADATA) {
|
||||
|
|
|
@ -58,8 +58,12 @@ public:
|
|||
|
||||
struct MoveKeysParams {
|
||||
UID dataMoveId;
|
||||
|
||||
// Only one of `keys` and `ranges` can be set. `ranges` is created mainly for physical shard moves to move a full
|
||||
// physical shard with multiple key ranges.
|
||||
Optional<KeyRange> keys;
|
||||
Optional<std::vector<KeyRange>> ranges;
|
||||
|
||||
std::vector<UID> destinationTeam, healthyDestinations;
|
||||
MoveKeysLock lock;
|
||||
Promise<Void> dataMovementComplete;
|
||||
|
@ -80,7 +84,7 @@ struct MoveKeysParams {
|
|||
const Promise<Void>& dataMovementComplete,
|
||||
FlowLock* startMoveKeysParallelismLock,
|
||||
FlowLock* finishMoveKeysParallelismLock,
|
||||
bool hasRemove,
|
||||
bool hasRemote,
|
||||
UID relocationIntervalId,
|
||||
const DDEnabledState* ddEnabledState,
|
||||
CancelConflictingDataMoves cancelConflictingDataMoves)
|
||||
|
@ -99,7 +103,7 @@ struct MoveKeysParams {
|
|||
const Promise<Void>& dataMovementComplete,
|
||||
FlowLock* startMoveKeysParallelismLock,
|
||||
FlowLock* finishMoveKeysParallelismLock,
|
||||
bool hasRemove,
|
||||
bool hasRemote,
|
||||
UID relocationIntervalId,
|
||||
const DDEnabledState* ddEnabledState,
|
||||
CancelConflictingDataMoves cancelConflictingDataMoves)
|
||||
|
|
|
@ -25,6 +25,7 @@
|
|||
#include "fdbclient/ManagementAPI.actor.h"
|
||||
#include "fdbserver/MoveKeys.actor.h"
|
||||
#include "fdbserver/QuietDatabase.h"
|
||||
#include "fdbserver/Knobs.h"
|
||||
#include "fdbrpc/simulator.h"
|
||||
#include "fdbserver/workloads/workloads.actor.h"
|
||||
#include "flow/Error.h"
|
||||
|
@ -215,19 +216,35 @@ struct DataLossRecoveryWorkload : TestWorkload {
|
|||
moveKeysLock.myOwner = owner;
|
||||
|
||||
TraceEvent("DataLossRecovery").detail("Phase", "StartMoveKeys");
|
||||
wait(moveKeys(cx,
|
||||
MoveKeysParams(deterministicRandom()->randomUniqueID(),
|
||||
keys,
|
||||
dest,
|
||||
dest,
|
||||
moveKeysLock,
|
||||
Promise<Void>(),
|
||||
&self->startMoveKeysParallelismLock,
|
||||
&self->finishMoveKeysParallelismLock,
|
||||
false,
|
||||
UID(), // for logging only
|
||||
&ddEnabledState,
|
||||
CancelConflictingDataMoves::True)));
|
||||
std::unique_ptr<MoveKeysParams> params;
|
||||
if (SERVER_KNOBS->SHARD_ENCODE_LOCATION_METADATA) {
|
||||
params = std::make_unique<MoveKeysParams>(deterministicRandom()->randomUniqueID(),
|
||||
std::vector<KeyRange>{ keys },
|
||||
dest,
|
||||
dest,
|
||||
moveKeysLock,
|
||||
Promise<Void>(),
|
||||
&self->startMoveKeysParallelismLock,
|
||||
&self->finishMoveKeysParallelismLock,
|
||||
false,
|
||||
UID(), // for logging only
|
||||
&ddEnabledState,
|
||||
CancelConflictingDataMoves::True);
|
||||
} else {
|
||||
params = std::make_unique<MoveKeysParams>(deterministicRandom()->randomUniqueID(),
|
||||
keys,
|
||||
dest,
|
||||
dest,
|
||||
moveKeysLock,
|
||||
Promise<Void>(),
|
||||
&self->startMoveKeysParallelismLock,
|
||||
&self->finishMoveKeysParallelismLock,
|
||||
false,
|
||||
UID(), // for logging only
|
||||
&ddEnabledState,
|
||||
CancelConflictingDataMoves::True);
|
||||
}
|
||||
wait(moveKeys(cx, *params));
|
||||
break;
|
||||
} catch (Error& e) {
|
||||
TraceEvent("DataLossRecovery").error(e).detail("Phase", "MoveRangeError");
|
||||
|
|
|
@ -25,6 +25,7 @@
|
|||
#include "fdbserver/MoveKeys.actor.h"
|
||||
#include "fdbclient/StorageServerInterface.h"
|
||||
#include "fdbserver/workloads/workloads.actor.h"
|
||||
#include "fdbserver/Knobs.h"
|
||||
#include "fdbclient/VersionedMap.h"
|
||||
#include "flow/actorcompiler.h" // This must be the last #include.
|
||||
|
||||
|
@ -343,18 +344,33 @@ struct IDDTxnProcessorApiWorkload : TestWorkload {
|
|||
KeyRange keys = self->getRandomKeys();
|
||||
std::vector<UID> destTeam = self->getRandomTeam();
|
||||
std::sort(destTeam.begin(), destTeam.end());
|
||||
return MoveKeysParams(deterministicRandom()->randomUniqueID(),
|
||||
keys,
|
||||
destTeam,
|
||||
destTeam,
|
||||
lock,
|
||||
Promise<Void>(),
|
||||
nullptr,
|
||||
nullptr,
|
||||
false,
|
||||
UID(),
|
||||
self->ddContext.ddEnabledState.get(),
|
||||
CancelConflictingDataMoves::True);
|
||||
if (SERVER_KNOBS->SHARD_ENCODE_LOCATION_METADATA) {
|
||||
return MoveKeysParams(deterministicRandom()->randomUniqueID(),
|
||||
std::vector<KeyRange>{ keys },
|
||||
destTeam,
|
||||
destTeam,
|
||||
lock,
|
||||
Promise<Void>(),
|
||||
nullptr,
|
||||
nullptr,
|
||||
false,
|
||||
UID(),
|
||||
self->ddContext.ddEnabledState.get(),
|
||||
CancelConflictingDataMoves::True);
|
||||
} else {
|
||||
return MoveKeysParams(deterministicRandom()->randomUniqueID(),
|
||||
keys,
|
||||
destTeam,
|
||||
destTeam,
|
||||
lock,
|
||||
Promise<Void>(),
|
||||
nullptr,
|
||||
nullptr,
|
||||
false,
|
||||
UID(),
|
||||
self->ddContext.ddEnabledState.get(),
|
||||
CancelConflictingDataMoves::True);
|
||||
}
|
||||
}
|
||||
|
||||
ACTOR static Future<Void> testMoveKeys(IDDTxnProcessorApiWorkload* self) {
|
||||
|
|
|
@ -345,7 +345,7 @@ struct PhysicalShardMoveWorkLoad : TestWorkload {
|
|||
TraceEvent("TestMoveShardStartMoveKeys").detail("DataMove", dataMoveId);
|
||||
wait(moveKeys(cx,
|
||||
MoveKeysParams(dataMoveId,
|
||||
keys,
|
||||
std::vector<KeyRange>{ keys },
|
||||
dests,
|
||||
dests,
|
||||
moveKeysLock,
|
||||
|
|
|
@ -25,6 +25,7 @@
|
|||
#include "fdbserver/MoveKeys.actor.h"
|
||||
#include "fdbclient/NativeAPI.actor.h"
|
||||
#include "fdbserver/workloads/workloads.actor.h"
|
||||
#include "fdbserver/Knobs.h"
|
||||
#include "fdbserver/ServerDBInfo.h"
|
||||
#include "fdbserver/QuietDatabase.h"
|
||||
#include "flow/DeterministicRandom.h"
|
||||
|
@ -155,19 +156,35 @@ struct MoveKeysWorkload : FailureInjectionWorkload {
|
|||
try {
|
||||
state Promise<Void> signal;
|
||||
state DDEnabledState ddEnabledState;
|
||||
wait(moveKeys(cx,
|
||||
MoveKeysParams(deterministicRandom()->randomUniqueID(),
|
||||
keys,
|
||||
destinationTeamIDs,
|
||||
destinationTeamIDs,
|
||||
lock,
|
||||
signal,
|
||||
&fl1,
|
||||
&fl2,
|
||||
false,
|
||||
relocateShardInterval.pairID,
|
||||
&ddEnabledState,
|
||||
CancelConflictingDataMoves::True)));
|
||||
std::unique_ptr<MoveKeysParams> params;
|
||||
if (SERVER_KNOBS->SHARD_ENCODE_LOCATION_METADATA) {
|
||||
params = std::make_unique<MoveKeysParams>(deterministicRandom()->randomUniqueID(),
|
||||
std::vector<KeyRange>{ keys },
|
||||
destinationTeamIDs,
|
||||
destinationTeamIDs,
|
||||
lock,
|
||||
signal,
|
||||
&fl1,
|
||||
&fl2,
|
||||
false,
|
||||
relocateShardInterval.pairID,
|
||||
&ddEnabledState,
|
||||
CancelConflictingDataMoves::True);
|
||||
} else {
|
||||
params = std::make_unique<MoveKeysParams>(deterministicRandom()->randomUniqueID(),
|
||||
keys,
|
||||
destinationTeamIDs,
|
||||
destinationTeamIDs,
|
||||
lock,
|
||||
signal,
|
||||
&fl1,
|
||||
&fl2,
|
||||
false,
|
||||
relocateShardInterval.pairID,
|
||||
&ddEnabledState,
|
||||
CancelConflictingDataMoves::True);
|
||||
}
|
||||
wait(moveKeys(cx, *params));
|
||||
TraceEvent(relocateShardInterval.end()).detail("Result", "Success");
|
||||
return Void();
|
||||
} catch (Error& e) {
|
||||
|
|
Loading…
Reference in New Issue