Merge pull request #7502 from jzhou77/main

Add pipelining for secondary queries in index prefetch

commit d60cab788e
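This change pipelines the secondary lookups that a mapped range read (index prefetch) performs on the storage server: instead of issuing one quickGetValue()/quickGetKeyValues() at a time and waiting for each result, mapKeyValues() now launches batches of up to MAX_PARALLEL_QUICK_GET_VALUE subqueries and waits on the whole batch together. Below is a minimal standalone sketch of that batching pattern, using std::async in place of Flow futures; the names (lookupSecondary, mapKeysPipelined, maxParallel) are illustrative, not from the FoundationDB source.

// A minimal sketch of the batching pattern, with std::async standing in for
// Flow futures. All names below are illustrative, not from the FDB source.
#include <algorithm>
#include <future>
#include <iostream>
#include <string>
#include <vector>

// Stand-in for one secondary lookup (quickGetValue / quickGetKeyValues).
std::string lookupSecondary(int key) {
	return "value-for-" + std::to_string(key);
}

std::vector<std::string> mapKeysPipelined(const std::vector<int>& keys, int maxParallel) {
	const int sz = static_cast<int>(keys.size());
	std::vector<std::string> results(sz);
	// Walk the input in batches of at most maxParallel concurrent subqueries.
	for (int offset = 0; offset < sz; offset += maxParallel) {
		const int batchEnd = std::min(sz, offset + maxParallel);
		std::vector<std::future<std::string>> batch;
		for (int i = offset; i < batchEnd; i++) {
			// Launch the lookups for this batch without waiting in between.
			batch.push_back(std::async(std::launch::async, lookupSecondary, keys[i]));
		}
		// Analogue of wait(waitForAll(subqueries)): block until the batch is
		// done, storing results in input order.
		for (int i = offset; i < batchEnd; i++) {
			results[i] = batch[i - offset].get();
		}
	}
	return results;
}

int main() {
	std::vector<int> keys{ 1, 2, 3, 4, 5 };
	for (const auto& v : mapKeysPipelined(keys, /*maxParallel=*/2)) {
		std::cout << v << "\n";
	}
	return 0;
}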
@@ -20,6 +20,7 @@
 
 #include "fdbclient/ServerKnobs.h"
+#include "flow/IRandom.h"
 #include "flow/flow.h"
 
 #define init(...) KNOB_FN(__VA_ARGS__, INIT_ATOMIC_KNOB, INIT_KNOB)(__VA_ARGS__)
 
@@ -733,6 +734,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
 	init( CHECKPOINT_TRANSFER_BLOCK_BYTES, 40e6 );
 	init( QUICK_GET_VALUE_FALLBACK, true );
 	init( QUICK_GET_KEY_VALUES_FALLBACK, true );
+	init( MAX_PARALLEL_QUICK_GET_VALUE, 50 ); if ( randomize && BUGGIFY ) MAX_PARALLEL_QUICK_GET_VALUE = deterministicRandom()->randomInt(1, 100);
 	init( QUICK_GET_KEY_VALUES_LIMIT, 2000 );
 	init( QUICK_GET_KEY_VALUES_LIMIT_BYTES, 1e7 );
 
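The new knob caps how many secondary lookups a single mapped-range request keeps in flight (default 50); under simulation, BUGGIFY randomizes it to a value in [1, 100), so tests cover batch sizes from nearly serial to highly parallel.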
@@ -689,6 +689,7 @@ public:
 	bool ENABLE_CLEAR_RANGE_EAGER_READS;
 	bool QUICK_GET_VALUE_FALLBACK;
 	bool QUICK_GET_KEY_VALUES_FALLBACK;
+	int MAX_PARALLEL_QUICK_GET_VALUE;
 	int CHECKPOINT_TRANSFER_BLOCK_BYTES;
 	int QUICK_GET_KEY_VALUES_LIMIT;
 	int QUICK_GET_KEY_VALUES_LIMIT_BYTES;
@@ -114,7 +114,7 @@ bool canReplyWith(Error e) {
 		return true;
 	default:
 		return false;
-	};
+	}
 }
 } // namespace
 
@@ -1675,7 +1675,7 @@ ACTOR Future<Void> getValueQ(StorageServer* data, GetValueRequest req) {
 	}
 
 	return Void();
-};
+}
 
 // Pessimistic estimate the number of overhead bytes used by each
 // watch. Watch key references are stored in an AsyncMap<Key,bool>, and actors
@@ -2937,7 +2937,7 @@ ACTOR Future<GetValueReqAndResultRef> quickGetValue(StorageServer* data,
 	} else {
 		throw quick_get_value_miss();
 	}
-};
+}
 
 // If limit>=0, it returns the first rows in the range (sorted ascending), otherwise the last rows (sorted descending).
 // readRange has O(|result|) + O(log |data|) cost
@@ -3551,7 +3551,7 @@ ACTOR Future<GetRangeReqAndResultRef> quickGetKeyValues(
 	} else {
 		throw quick_get_key_values_miss();
 	}
-};
+}
 
 void unpackKeyTuple(Tuple** referenceTuple, Optional<Tuple>& keyTuple, KeyValueRef* keyValue) {
 	if (!keyTuple.present()) {
@@ -3800,6 +3800,36 @@ TEST_CASE("/fdbserver/storageserver/constructMappedKey") {
 	return Void();
 }
 
+// Issues a secondary query (either range and point read) and fills results into "kvm".
+ACTOR Future<Void> mapSubquery(StorageServer* data,
+                               Version version,
+                               GetMappedKeyValuesRequest* pOriginalReq,
+                               Arena* pArena,
+                               int matchIndex,
+                               bool isRangeQuery,
+                               bool isBoundary,
+                               KeyValueRef* it,
+                               MappedKeyValueRef* kvm,
+                               Key mappedKey) {
+	if (isRangeQuery) {
+		// Use the mappedKey as the prefix of the range query.
+		GetRangeReqAndResultRef getRange = wait(quickGetKeyValues(data, mappedKey, version, pArena, pOriginalReq));
+		if ((!getRange.result.empty() && matchIndex == MATCH_INDEX_MATCHED_ONLY) ||
+		    (getRange.result.empty() && matchIndex == MATCH_INDEX_UNMATCHED_ONLY)) {
+			kvm->key = it->key;
+			kvm->value = it->value;
+		}
+
+		kvm->boundaryAndExist = isBoundary && !getRange.result.empty();
+		kvm->reqAndResult = getRange;
+	} else {
+		GetValueReqAndResultRef getValue = wait(quickGetValue(data, mappedKey, version, pArena, pOriginalReq));
+		kvm->reqAndResult = getValue;
+		kvm->boundaryAndExist = isBoundary && getValue.result.present();
+	}
+	return Void();
+}
+
 ACTOR Future<GetMappedKeyValuesReply> mapKeyValues(StorageServer* data,
                                                    GetKeyValuesReply input,
                                                    StringRef mapper,
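Factoring the per-row lookup into mapSubquery() turns each secondary read into an independent Future<Void> that the caller can collect with waitForAll(); the actor writes its result into the caller-owned kvms slot rather than returning it, so the batch's output stays in input order.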
@@ -3829,43 +3859,49 @@ ACTOR Future<GetMappedKeyValuesReply> mapKeyValues(StorageServer* data,
 	preprocessMappedKey(mappedKeyFormatTuple, vt, isRangeQuery);
 
 	state int sz = input.data.size();
-	state int i = 0;
-	for (; i < sz; i++) {
-		state KeyValueRef* it = &input.data[i];
-		state MappedKeyValueRef kvm;
-		state bool isBoundary = i == 0 || i == sz - 1;
-		// need to keep the boundary, so that caller can use it as a continuation.
-		if (isBoundary || matchIndex == MATCH_INDEX_ALL) {
-			kvm.key = it->key;
-			kvm.value = it->value;
-		}
-
-		state Key mappedKey = constructMappedKey(it, vt, mappedKeyTuple, mappedKeyFormatTuple);
-		// Make sure the mappedKey is always available, so that it's good even we want to get key asynchronously.
-		result.arena.dependsOn(mappedKey.arena());
-
-		// std::cout << "key:" << printable(kvm.key) << ", value:" << printable(kvm.value)
-		//           << ", mappedKey:" << printable(mappedKey) << std::endl;
-
-		if (isRangeQuery) {
-			// Use the mappedKey as the prefix of the range query.
-			GetRangeReqAndResultRef getRange =
-			    wait(quickGetKeyValues(data, mappedKey, input.version, &(result.arena), pOriginalReq));
-			if ((!getRange.result.empty() && matchIndex == MATCH_INDEX_MATCHED_ONLY) ||
-			    (getRange.result.empty() && matchIndex == MATCH_INDEX_UNMATCHED_ONLY)) {
-				kvm.key = it->key;
-				kvm.value = it->value;
-			}
-
-			kvm.boundaryAndExist = isBoundary && !getRange.result.empty();
-			kvm.reqAndResult = getRange;
-		} else {
-			GetValueReqAndResultRef getValue =
-			    wait(quickGetValue(data, mappedKey, input.version, &(result.arena), pOriginalReq));
-			kvm.reqAndResult = getValue;
-			kvm.boundaryAndExist = isBoundary && getValue.result.present();
-		}
-		result.data.push_back(result.arena, kvm);
+	const int k = std::min(sz, SERVER_KNOBS->MAX_PARALLEL_QUICK_GET_VALUE);
+	state std::vector<MappedKeyValueRef> kvms(k);
+	state std::vector<Future<Void>> subqueries;
+	state int offset = 0;
+	for (; offset < sz; offset += SERVER_KNOBS->MAX_PARALLEL_QUICK_GET_VALUE) {
+		// Divide into batches of MAX_PARALLEL_QUICK_GET_VALUE subqueries
+		for (int i = 0; i + offset < sz && i < SERVER_KNOBS->MAX_PARALLEL_QUICK_GET_VALUE; i++) {
+			KeyValueRef* it = &input.data[i + offset];
+			MappedKeyValueRef* kvm = &kvms[i];
+			bool isBoundary = (i + offset) == 0 || (i + offset) == sz - 1;
+			// need to keep the boundary, so that caller can use it as a continuation.
+			if (isBoundary || matchIndex == MATCH_INDEX_ALL) {
+				kvm->key = it->key;
+				kvm->value = it->value;
+			} else {
+				// Clear key value to the default.
+				kvm->key = ""_sr;
+				kvm->value = ""_sr;
+			}
+
+			Key mappedKey = constructMappedKey(it, vt, mappedKeyTuple, mappedKeyFormatTuple);
+			// Make sure the mappedKey is always available, so that it's good even we want to get key asynchronously.
+			result.arena.dependsOn(mappedKey.arena());
+
+			// std::cout << "key:" << printable(kvm->key) << ", value:" << printable(kvm->value)
+			//           << ", mappedKey:" << printable(mappedKey) << std::endl;
+
+			subqueries.push_back(mapSubquery(data,
+			                                 input.version,
+			                                 pOriginalReq,
+			                                 &result.arena,
+			                                 matchIndex,
+			                                 isRangeQuery,
+			                                 isBoundary,
+			                                 it,
+			                                 kvm,
+			                                 mappedKey));
+		}
+		wait(waitForAll(subqueries));
+		subqueries.clear();
+		for (int i = 0; i + offset < sz && i < SERVER_KNOBS->MAX_PARALLEL_QUICK_GET_VALUE; i++) {
+			result.data.push_back(result.arena, kvms[i]);
+		}
 	}
 	return result;
 }
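Batching in groups of MAX_PARALLEL_QUICK_GET_VALUE bounds both the number of outstanding reads and the size of the kvms staging buffer, while wait(waitForAll(subqueries)) overlaps the latencies of all subqueries within a batch. Results are copied into result.data in input order after each batch completes, so the boundary rows used for continuations are preserved exactly as before.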
@@ -6225,7 +6261,7 @@ ACTOR Future<Void> fetchKeys(StorageServer* data, AddingShard* shard) {
 	}
 
 	return Void();
-};
+}
 
 AddingShard::AddingShard(StorageServer* server, KeyRangeRef const& keys)
   : keys(keys), server(server), transferredVersion(invalidVersion), fetchVersion(invalidVersion), phase(WaitPrevious) {