diff --git a/fdbclient/ServerKnobs.cpp b/fdbclient/ServerKnobs.cpp
index 58e1c29247..c18984f058 100644
--- a/fdbclient/ServerKnobs.cpp
+++ b/fdbclient/ServerKnobs.cpp
@@ -20,6 +20,7 @@
 
 #include "fdbclient/ServerKnobs.h"
 #include "flow/IRandom.h"
+#include "flow/flow.h"
 
 #define init(...) KNOB_FN(__VA_ARGS__, INIT_ATOMIC_KNOB, INIT_KNOB)(__VA_ARGS__)
 
@@ -733,6 +734,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
 	init( CHECKPOINT_TRANSFER_BLOCK_BYTES, 40e6 );
 	init( QUICK_GET_VALUE_FALLBACK, true );
 	init( QUICK_GET_KEY_VALUES_FALLBACK, true );
+	init( MAX_PARALLEL_QUICK_GET_VALUE, 50 ); if ( randomize && BUGGIFY ) MAX_PARALLEL_QUICK_GET_VALUE = deterministicRandom()->randomInt(1, 100);
 	init( QUICK_GET_KEY_VALUES_LIMIT, 2000 );
 	init( QUICK_GET_KEY_VALUES_LIMIT_BYTES, 1e7 );
diff --git a/fdbclient/include/fdbclient/ServerKnobs.h b/fdbclient/include/fdbclient/ServerKnobs.h
index 329fc55e08..c6c926ab7a 100644
--- a/fdbclient/include/fdbclient/ServerKnobs.h
+++ b/fdbclient/include/fdbclient/ServerKnobs.h
@@ -689,6 +689,7 @@ public:
 	bool ENABLE_CLEAR_RANGE_EAGER_READS;
 	bool QUICK_GET_VALUE_FALLBACK;
 	bool QUICK_GET_KEY_VALUES_FALLBACK;
+	int MAX_PARALLEL_QUICK_GET_VALUE;
 	int CHECKPOINT_TRANSFER_BLOCK_BYTES;
 	int QUICK_GET_KEY_VALUES_LIMIT;
 	int QUICK_GET_KEY_VALUES_LIMIT_BYTES;
diff --git a/fdbserver/storageserver.actor.cpp b/fdbserver/storageserver.actor.cpp
index c5b4ab653a..3368c8097c 100644
--- a/fdbserver/storageserver.actor.cpp
+++ b/fdbserver/storageserver.actor.cpp
@@ -114,7 +114,7 @@ bool canReplyWith(Error e) {
 		return true;
 	default:
 		return false;
-	};
+	}
 }
 
 } // namespace
@@ -1675,7 +1675,7 @@ ACTOR Future<Void> getValueQ(StorageServer* data, GetValueRequest req) {
 	}
 
 	return Void();
-};
+}
 
 // Pessimistic estimate the number of overhead bytes used by each
 // Watch key references are stored in an AsyncMap, and actors
@@ -2937,7 +2937,7 @@ ACTOR Future<GetValueReqAndResultRef> quickGetValue(StorageServer* data,
 	} else {
 		throw quick_get_value_miss();
 	}
-};
+}
 
 // If limit>=0, it returns the first rows in the range (sorted ascending), otherwise the last rows (sorted descending).
 // readRange has O(|result|) + O(log |data|) cost
@@ -3551,7 +3551,7 @@ ACTOR Future<GetRangeReqAndResultRef> quickGetKeyValues(
 	} else {
 		throw quick_get_key_values_miss();
 	}
-};
+}
 
 void unpackKeyTuple(Tuple** referenceTuple, Optional<Tuple>& keyTuple, KeyValueRef* keyValue) {
 	if (!keyTuple.present()) {
@@ -3800,6 +3800,36 @@ TEST_CASE("/fdbserver/storageserver/constructMappedKey") {
 	return Void();
 }
 
+// Issues a secondary query (either range and point read) and fills results into "kvm".
+ACTOR Future<Void> mapSubquery(StorageServer* data,
+                               Version version,
+                               GetMappedKeyValuesRequest* pOriginalReq,
+                               Arena* pArena,
+                               int matchIndex,
+                               bool isRangeQuery,
+                               bool isBoundary,
+                               KeyValueRef* it,
+                               MappedKeyValueRef* kvm,
+                               Key mappedKey) {
+	if (isRangeQuery) {
+		// Use the mappedKey as the prefix of the range query.
+		GetRangeReqAndResultRef getRange = wait(quickGetKeyValues(data, mappedKey, version, pArena, pOriginalReq));
+		if ((!getRange.result.empty() && matchIndex == MATCH_INDEX_MATCHED_ONLY) ||
+		    (getRange.result.empty() && matchIndex == MATCH_INDEX_UNMATCHED_ONLY)) {
+			kvm->key = it->key;
+			kvm->value = it->value;
+		}
+
+		kvm->boundaryAndExist = isBoundary && !getRange.result.empty();
+		kvm->reqAndResult = getRange;
+	} else {
+		GetValueReqAndResultRef getValue = wait(quickGetValue(data, mappedKey, version, pArena, pOriginalReq));
+		kvm->reqAndResult = getValue;
+		kvm->boundaryAndExist = isBoundary && getValue.result.present();
+	}
+	return Void();
+}
+
 ACTOR Future<GetMappedKeyValuesReply> mapKeyValues(StorageServer* data,
                                                    GetKeyValuesReply input,
                                                    StringRef mapper,
@@ -3829,43 +3859,49 @@ ACTOR Future<GetMappedKeyValuesReply> mapKeyValues(StorageServer* data,
 	preprocessMappedKey(mappedKeyFormatTuple, vt, isRangeQuery);
 
 	state int sz = input.data.size();
-	state int i = 0;
-	for (; i < sz; i++) {
-		state KeyValueRef* it = &input.data[i];
-		state MappedKeyValueRef kvm;
-		state bool isBoundary = i == 0 || i == sz - 1;
-		// need to keep the boundary, so that caller can use it as a continuation.
-		if (isBoundary || matchIndex == MATCH_INDEX_ALL) {
-			kvm.key = it->key;
-			kvm.value = it->value;
-		}
-
-		state Key mappedKey = constructMappedKey(it, vt, mappedKeyTuple, mappedKeyFormatTuple);
-		// Make sure the mappedKey is always available, so that it's good even we want to get key asynchronously.
-		result.arena.dependsOn(mappedKey.arena());
-
-		// std::cout << "key:" << printable(kvm.key) << ", value:" << printable(kvm.value)
-		//           << ", mappedKey:" << printable(mappedKey) << std::endl;
-
-		if (isRangeQuery) {
-			// Use the mappedKey as the prefix of the range query.
-			GetRangeReqAndResultRef getRange =
-			    wait(quickGetKeyValues(data, mappedKey, input.version, &(result.arena), pOriginalReq));
-			if ((!getRange.result.empty() && matchIndex == MATCH_INDEX_MATCHED_ONLY) ||
-			    (getRange.result.empty() && matchIndex == MATCH_INDEX_UNMATCHED_ONLY)) {
-				kvm.key = it->key;
-				kvm.value = it->value;
+	const int k = std::min(sz, SERVER_KNOBS->MAX_PARALLEL_QUICK_GET_VALUE);
+	state std::vector<MappedKeyValueRef> kvms(k);
+	state std::vector<Future<Void>> subqueries;
+	state int offset = 0;
+	for (; offset < sz; offset += SERVER_KNOBS->MAX_PARALLEL_QUICK_GET_VALUE) {
+		// Divide into batches of MAX_PARALLEL_QUICK_GET_VALUE subqueries
+		for (int i = 0; i + offset < sz && i < SERVER_KNOBS->MAX_PARALLEL_QUICK_GET_VALUE; i++) {
+			KeyValueRef* it = &input.data[i + offset];
+			MappedKeyValueRef* kvm = &kvms[i];
+			bool isBoundary = (i + offset) == 0 || (i + offset) == sz - 1;
+			// need to keep the boundary, so that caller can use it as a continuation.
+			if (isBoundary || matchIndex == MATCH_INDEX_ALL) {
+				kvm->key = it->key;
+				kvm->value = it->value;
+			} else {
+				// Clear key value to the default.
+				kvm->key = ""_sr;
+				kvm->value = ""_sr;
 			}
-			kvm.boundaryAndExist = isBoundary && !getRange.result.empty();
-			kvm.reqAndResult = getRange;
-		} else {
-			GetValueReqAndResultRef getValue =
-			    wait(quickGetValue(data, mappedKey, input.version, &(result.arena), pOriginalReq));
-			kvm.reqAndResult = getValue;
-			kvm.boundaryAndExist = isBoundary && getValue.result.present();
+			Key mappedKey = constructMappedKey(it, vt, mappedKeyTuple, mappedKeyFormatTuple);
+			// Make sure the mappedKey is always available, so that it's good even we want to get key asynchronously.
+			result.arena.dependsOn(mappedKey.arena());
+
+			// std::cout << "key:" << printable(kvm->key) << ", value:" << printable(kvm->value)
+			//           << ", mappedKey:" << printable(mappedKey) << std::endl;
+
+			subqueries.push_back(mapSubquery(data,
+			                                 input.version,
+			                                 pOriginalReq,
+			                                 &result.arena,
+			                                 matchIndex,
+			                                 isRangeQuery,
+			                                 isBoundary,
+			                                 it,
+			                                 kvm,
+			                                 mappedKey));
+		}
+		wait(waitForAll(subqueries));
+		subqueries.clear();
+		for (int i = 0; i + offset < sz && i < SERVER_KNOBS->MAX_PARALLEL_QUICK_GET_VALUE; i++) {
+			result.data.push_back(result.arena, kvms[i]);
 		}
-		result.data.push_back(result.arena, kvm);
 	}
 	return result;
 }
@@ -6225,7 +6261,7 @@ ACTOR Future<Void> fetchKeys(StorageServer* data, AddingShard* shard) {
 	}
 
 	return Void();
-};
+}
 
 AddingShard::AddingShard(StorageServer* server, KeyRangeRef const& keys)
   : keys(keys), server(server), transferredVersion(invalidVersion), fetchVersion(invalidVersion), phase(WaitPrevious) {