diff --git a/fdbserver/storageserver.actor.cpp b/fdbserver/storageserver.actor.cpp index 6ee72c396c..90fc288bf5 100644 --- a/fdbserver/storageserver.actor.cpp +++ b/fdbserver/storageserver.actor.cpp @@ -159,6 +159,7 @@ struct AddingShard : NonCopyable { struct StorageServer* server; Version transferredVersion; + Version fetchVersion; // To learn more details of the phase transitions, see function fetchKeys(). The phases below are sorted in // chronological order and do not go back. @@ -179,7 +180,7 @@ struct AddingShard : NonCopyable { // When fetchKeys "partially completes" (splits an adding shard in two), this is used to construct the left half AddingShard(AddingShard* prev, KeyRange const& keys) : keys(keys), fetchClient(prev->fetchClient), server(prev->server), transferredVersion(prev->transferredVersion), - phase(prev->phase) {} + fetchVersion(prev->fetchVersion), phase(prev->phase) {} ~AddingShard() { if (!fetchComplete.isSet()) fetchComplete.send(Void()); @@ -4548,6 +4549,7 @@ ACTOR Future fetchKeys(StorageServer* data, AddingShard* shard) { // Get the history state int debug_getRangeRetries = 0; state int debug_nextRetryToLog = 1; + state Error lastError(); // FIXME: The client cache does not notice when servers are added to a team. To read from a local storage server // we must refresh the cache manually. @@ -4555,8 +4557,27 @@ ACTOR Future fetchKeys(StorageServer* data, AddingShard* shard) { loop { state Transaction tr(data->cx); - fetchVersion = data->version.get(); - + // fetchVersion = data->version.get(); + // A quick fix: + // By default, we use data->version as the fetchVersion. + // In the case where dest SS falls far behind src SS, we use GRV as the fetchVersion instead of + // data->version, and then the dest SS waits for catching up the fetchVersion outside the + // fetchKeysParallelismLock. + // For example, consider dest SS falls far behind src SS. + // At iteration 0, dest SS selects its version as fetchVersion, + // but cannot read src SS and result in error_code_transaction_too_old. + // Due to error_code_transaction_too_old, dest SS starts iteration 1. + // At iteration 1, dest SS selects GRV as fetchVersion and (suppose) can read the data from src SS. + // Then dest SS waits its version catch up with this GRV version and write the data to disk. + // Note that dest SS waits outside the fetchKeysParallelismLock. + if (lastError.code() == error_code_transaction_too_old) { + Version grvVersion = wait(tr.getRawReadVersion()); + fetchVersion = std::max(grvVersion, data->version.get()); + } else { + fetchVersion = std::max(shard->fetchVersion, data->version.get()); + } + ASSERT(fetchVersion >= shard->fetchVersion); // at this point, shard->fetchVersion is the last fetchVersion + shard->fetchVersion = fetchVersion; TraceEvent(SevDebug, "FetchKeysUnblocked", data->thisServerID) .detail("FKID", interval.pairID) .detail("Version", fetchVersion); @@ -4633,6 +4654,7 @@ ACTOR Future fetchKeys(StorageServer* data, AddingShard* shard) { e.code() != error_code_process_behind && e.code() != error_code_server_overloaded) { throw; } + lastError = e; if (nfk == keys.begin) { TraceEvent("FKBlockFail", data->thisServerID) .errorUnsuppressed(e) @@ -4705,10 +4727,11 @@ ACTOR Future fetchKeys(StorageServer* data, AddingShard* shard) { // we have written) state Future fetchDurable = data->durableVersion.whenAtLeast(data->storageVersion() + 1); + state Future dataArrive = data->version.whenAtLeast(fetchVersion); wait(dispatchChangeFeeds(data, fetchKeysID, keys, fetchVersion)); holdingFKPL.release(); - wait(fetchDurable); + wait(dataArrive && fetchDurable); TraceEvent(SevDebug, "FKAfterFinalCommit", data->thisServerID) .detail("FKID", interval.pairID) @@ -4726,7 +4749,7 @@ ACTOR Future fetchKeys(StorageServer* data, AddingShard* shard) { TraceEvent(SevDebug, "FKUpdateBatch", data->thisServerID).detail("FKID", interval.pairID); shard->phase = AddingShard::Waiting; - + ASSERT(data->version.get() >= fetchVersion); // Choose a transferredVersion. This choice and timing ensure that // * The transferredVersion can be mutated in versionedData // * The transferredVersion isn't yet committed to storage (so we can write the availability status change) @@ -4746,6 +4769,9 @@ ACTOR Future fetchKeys(StorageServer* data, AddingShard* shard) { .detail("StorageVersion", data->storageVersion()); validate(data); + // the minimal version in updates must be larger than fetchVersion + ASSERT(shard->updates.empty() || shard->updates[0].version > fetchVersion); + // Put the updates that were collected during the FinalCommit phase into the batch at the transferredVersion. // Eager reads will be done for them by update(), and the mutations will come back through // AddingShard::addMutations and be applied to versionedMap and mutationLog as normal. The lie about their @@ -4835,11 +4861,15 @@ ACTOR Future fetchKeys(StorageServer* data, AddingShard* shard) { }; AddingShard::AddingShard(StorageServer* server, KeyRangeRef const& keys) - : keys(keys), server(server), transferredVersion(invalidVersion), phase(WaitPrevious) { + : keys(keys), server(server), transferredVersion(invalidVersion), fetchVersion(invalidVersion), phase(WaitPrevious) { fetchClient = fetchKeys(server, this); } void AddingShard::addMutation(Version version, bool fromFetch, MutationRef const& mutation) { + if (version <= fetchVersion) { + return; + } + server->counters.logicalBytesMoveInOverhead += mutation.expectedSize(); if (mutation.type == mutation.ClearRange) { ASSERT(keys.begin <= mutation.param1 && mutation.param2 <= keys.end);