Add tenant support back to mapped range requests. Fix ACTOR warning.
This commit is contained in:
parent
e8077b65e1
commit
a04934465c
|
@ -3900,7 +3900,7 @@ int64_t inline getRangeResultFamilyBytes(MappedRangeResultRef result) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: Client should add mapped keys to conflict ranges.
|
// TODO: Client should add mapped keys to conflict ranges.
|
||||||
ACTOR template <class RangeResultFamily> // RangeResult or MappedRangeResult
|
template <class RangeResultFamily> // RangeResult or MappedRangeResult
|
||||||
void getRangeFinished(Reference<TransactionState> trState,
|
void getRangeFinished(Reference<TransactionState> trState,
|
||||||
double startTime,
|
double startTime,
|
||||||
KeySelector begin,
|
KeySelector begin,
|
||||||
|
|
|
@ -2224,8 +2224,12 @@ ACTOR Future<GetValueReqAndResultRef> quickGetValue(StorageServer* data,
|
||||||
if (data->shards[key]->isReadable()) {
|
if (data->shards[key]->isReadable()) {
|
||||||
try {
|
try {
|
||||||
// TODO: Use a lower level API may be better? Or tweak priorities?
|
// TODO: Use a lower level API may be better? Or tweak priorities?
|
||||||
GetValueRequest req(
|
GetValueRequest req(pOriginalReq->spanContext,
|
||||||
pOriginalReq->spanContext, TenantInfo(), key, version, pOriginalReq->tags, pOriginalReq->debugID);
|
pOriginalReq->tenantInfo,
|
||||||
|
key,
|
||||||
|
version,
|
||||||
|
pOriginalReq->tags,
|
||||||
|
pOriginalReq->debugID);
|
||||||
// Note that it does not use readGuard to avoid server being overloaded here. Throttling is enforced at the
|
// Note that it does not use readGuard to avoid server being overloaded here. Throttling is enforced at the
|
||||||
// original request level, rather than individual underlying lookups. The reason is that throttle any
|
// original request level, rather than individual underlying lookups. The reason is that throttle any
|
||||||
// individual underlying lookup will fail the original request, which is not productive.
|
// individual underlying lookup will fail the original request, which is not productive.
|
||||||
|
@ -2245,7 +2249,7 @@ ACTOR Future<GetValueReqAndResultRef> quickGetValue(StorageServer* data,
|
||||||
|
|
||||||
++data->counters.quickGetValueMiss;
|
++data->counters.quickGetValueMiss;
|
||||||
if (SERVER_KNOBS->QUICK_GET_VALUE_FALLBACK) {
|
if (SERVER_KNOBS->QUICK_GET_VALUE_FALLBACK) {
|
||||||
state Transaction tr(data->cx);
|
state Transaction tr(data->cx, pOriginalReq->tenantInfo.name);
|
||||||
tr.setVersion(version);
|
tr.setVersion(version);
|
||||||
// TODO: is DefaultPromiseEndpoint the best priority for this?
|
// TODO: is DefaultPromiseEndpoint the best priority for this?
|
||||||
tr.trState->taskID = TaskPriority::DefaultPromiseEndpoint;
|
tr.trState->taskID = TaskPriority::DefaultPromiseEndpoint;
|
||||||
|
@ -2816,6 +2820,7 @@ ACTOR Future<GetRangeReqAndResultRef> quickGetKeyValues(
|
||||||
req.begin = getRange.begin;
|
req.begin = getRange.begin;
|
||||||
req.end = getRange.end;
|
req.end = getRange.end;
|
||||||
req.version = version;
|
req.version = version;
|
||||||
|
req.tenantInfo = pOriginalReq->tenantInfo;
|
||||||
// TODO: Validate when the underlying range query exceeds the limit.
|
// TODO: Validate when the underlying range query exceeds the limit.
|
||||||
// TODO: Use remainingLimit, remainingLimitBytes rather than separate knobs.
|
// TODO: Use remainingLimit, remainingLimitBytes rather than separate knobs.
|
||||||
req.limit = SERVER_KNOBS->QUICK_GET_KEY_VALUES_LIMIT;
|
req.limit = SERVER_KNOBS->QUICK_GET_KEY_VALUES_LIMIT;
|
||||||
|
@ -2843,7 +2848,7 @@ ACTOR Future<GetRangeReqAndResultRef> quickGetKeyValues(
|
||||||
|
|
||||||
++data->counters.quickGetKeyValuesMiss;
|
++data->counters.quickGetKeyValuesMiss;
|
||||||
if (SERVER_KNOBS->QUICK_GET_KEY_VALUES_FALLBACK) {
|
if (SERVER_KNOBS->QUICK_GET_KEY_VALUES_FALLBACK) {
|
||||||
state Transaction tr(data->cx);
|
state Transaction tr(data->cx, pOriginalReq->tenantInfo.name);
|
||||||
tr.setVersion(version);
|
tr.setVersion(version);
|
||||||
// TODO: is DefaultPromiseEndpoint the best priority for this?
|
// TODO: is DefaultPromiseEndpoint the best priority for this?
|
||||||
tr.trState->taskID = TaskPriority::DefaultPromiseEndpoint;
|
tr.trState->taskID = TaskPriority::DefaultPromiseEndpoint;
|
||||||
|
@ -2858,10 +2863,7 @@ ACTOR Future<GetRangeReqAndResultRef> quickGetKeyValues(
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
Key constructMappedKey(KeyValueRef* keyValue,
|
Key constructMappedKey(KeyValueRef* keyValue, Tuple& mappedKeyFormatTuple, bool& isRangeQuery) {
|
||||||
Tuple& mappedKeyFormatTuple,
|
|
||||||
bool& isRangeQuery,
|
|
||||||
Optional<Key> tenantPrefix) {
|
|
||||||
// Lazily parse key and/or value to tuple because they may not need to be a tuple if not used.
|
// Lazily parse key and/or value to tuple because they may not need to be a tuple if not used.
|
||||||
Optional<Tuple> keyTuple;
|
Optional<Tuple> keyTuple;
|
||||||
Optional<Tuple> valueTuple;
|
Optional<Tuple> valueTuple;
|
||||||
|
@ -2951,13 +2953,7 @@ Key constructMappedKey(KeyValueRef* keyValue,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
KeyRef mappedKey = mappedKeyTuple.pack();
|
return mappedKeyTuple.pack();
|
||||||
|
|
||||||
if (tenantPrefix.present()) {
|
|
||||||
return mappedKey.withPrefix(tenantPrefix.get());
|
|
||||||
}
|
|
||||||
|
|
||||||
return mappedKey;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST_CASE("/fdbserver/storageserver/constructMappedKey") {
|
TEST_CASE("/fdbserver/storageserver/constructMappedKey") {
|
||||||
|
@ -2973,7 +2969,7 @@ TEST_CASE("/fdbserver/storageserver/constructMappedKey") {
|
||||||
.append("{...}"_sr);
|
.append("{...}"_sr);
|
||||||
|
|
||||||
bool isRangeQuery = false;
|
bool isRangeQuery = false;
|
||||||
Key mappedKey = constructMappedKey(&kvr, mapperTuple, isRangeQuery, Optional<Key>());
|
Key mappedKey = constructMappedKey(&kvr, mapperTuple, isRangeQuery);
|
||||||
|
|
||||||
Key expectedMappedKey = Tuple()
|
Key expectedMappedKey = Tuple()
|
||||||
.append("normal"_sr)
|
.append("normal"_sr)
|
||||||
|
@ -2985,33 +2981,11 @@ TEST_CASE("/fdbserver/storageserver/constructMappedKey") {
|
||||||
ASSERT(mappedKey.compare(expectedMappedKey) == 0);
|
ASSERT(mappedKey.compare(expectedMappedKey) == 0);
|
||||||
ASSERT(isRangeQuery == true);
|
ASSERT(isRangeQuery == true);
|
||||||
}
|
}
|
||||||
{
|
|
||||||
Tuple mapperTuple = Tuple()
|
|
||||||
.append("normal"_sr)
|
|
||||||
.append("{{escaped}}"_sr)
|
|
||||||
.append("{K[2]}"_sr)
|
|
||||||
.append("{V[0]}"_sr)
|
|
||||||
.append("{...}"_sr);
|
|
||||||
|
|
||||||
bool isRangeQuery = false;
|
|
||||||
Key mappedKey = constructMappedKey(&kvr, mapperTuple, isRangeQuery, TenantMapEntry(1, "foo"_sr).prefix);
|
|
||||||
|
|
||||||
Key expectedMappedKey = Tuple()
|
|
||||||
.append("normal"_sr)
|
|
||||||
.append("{escaped}"_sr)
|
|
||||||
.append("key-2"_sr)
|
|
||||||
.append("value-0"_sr)
|
|
||||||
.getDataAsStandalone()
|
|
||||||
.withPrefix("foo\x00\x00\x00\x00\x00\x00\x00\x01"_sr);
|
|
||||||
// std::cout << printable(mappedKey) << " == " << printable(expectedMappedKey) << std::endl;
|
|
||||||
ASSERT(mappedKey.compare(expectedMappedKey) == 0);
|
|
||||||
ASSERT(isRangeQuery == true);
|
|
||||||
}
|
|
||||||
{
|
{
|
||||||
Tuple mapperTuple = Tuple().append("{{{{}}"_sr).append("}}"_sr);
|
Tuple mapperTuple = Tuple().append("{{{{}}"_sr).append("}}"_sr);
|
||||||
|
|
||||||
bool isRangeQuery = false;
|
bool isRangeQuery = false;
|
||||||
Key mappedKey = constructMappedKey(&kvr, mapperTuple, isRangeQuery, Optional<Key>());
|
Key mappedKey = constructMappedKey(&kvr, mapperTuple, isRangeQuery);
|
||||||
|
|
||||||
Key expectedMappedKey = Tuple().append("{{}"_sr).append("}"_sr).getDataAsStandalone();
|
Key expectedMappedKey = Tuple().append("{{}"_sr).append("}"_sr).getDataAsStandalone();
|
||||||
// std::cout << printable(mappedKey) << " == " << printable(expectedMappedKey) << std::endl;
|
// std::cout << printable(mappedKey) << " == " << printable(expectedMappedKey) << std::endl;
|
||||||
|
@ -3022,7 +2996,7 @@ TEST_CASE("/fdbserver/storageserver/constructMappedKey") {
|
||||||
Tuple mapperTuple = Tuple().append("{{{{}}"_sr).append("}}"_sr);
|
Tuple mapperTuple = Tuple().append("{{{{}}"_sr).append("}}"_sr);
|
||||||
|
|
||||||
bool isRangeQuery = false;
|
bool isRangeQuery = false;
|
||||||
Key mappedKey = constructMappedKey(&kvr, mapperTuple, isRangeQuery, Optional<Key>());
|
Key mappedKey = constructMappedKey(&kvr, mapperTuple, isRangeQuery);
|
||||||
|
|
||||||
Key expectedMappedKey = Tuple().append("{{}"_sr).append("}"_sr).getDataAsStandalone();
|
Key expectedMappedKey = Tuple().append("{{}"_sr).append("}"_sr).getDataAsStandalone();
|
||||||
// std::cout << printable(mappedKey) << " == " << printable(expectedMappedKey) << std::endl;
|
// std::cout << printable(mappedKey) << " == " << printable(expectedMappedKey) << std::endl;
|
||||||
|
@ -3034,7 +3008,7 @@ TEST_CASE("/fdbserver/storageserver/constructMappedKey") {
|
||||||
bool isRangeQuery = false;
|
bool isRangeQuery = false;
|
||||||
state bool throwException = false;
|
state bool throwException = false;
|
||||||
try {
|
try {
|
||||||
Key mappedKey = constructMappedKey(&kvr, mapperTuple, isRangeQuery, Optional<Key>());
|
Key mappedKey = constructMappedKey(&kvr, mapperTuple, isRangeQuery);
|
||||||
} catch (Error& e) {
|
} catch (Error& e) {
|
||||||
ASSERT(e.code() == error_code_mapper_bad_index);
|
ASSERT(e.code() == error_code_mapper_bad_index);
|
||||||
throwException = true;
|
throwException = true;
|
||||||
|
@ -3046,7 +3020,7 @@ TEST_CASE("/fdbserver/storageserver/constructMappedKey") {
|
||||||
bool isRangeQuery = false;
|
bool isRangeQuery = false;
|
||||||
state bool throwException2 = false;
|
state bool throwException2 = false;
|
||||||
try {
|
try {
|
||||||
Key mappedKey = constructMappedKey(&kvr, mapperTuple, isRangeQuery, Optional<Key>());
|
Key mappedKey = constructMappedKey(&kvr, mapperTuple, isRangeQuery);
|
||||||
} catch (Error& e) {
|
} catch (Error& e) {
|
||||||
ASSERT(e.code() == error_code_mapper_bad_range_decriptor);
|
ASSERT(e.code() == error_code_mapper_bad_range_decriptor);
|
||||||
throwException2 = true;
|
throwException2 = true;
|
||||||
|
@ -3058,7 +3032,7 @@ TEST_CASE("/fdbserver/storageserver/constructMappedKey") {
|
||||||
bool isRangeQuery = false;
|
bool isRangeQuery = false;
|
||||||
state bool throwException3 = false;
|
state bool throwException3 = false;
|
||||||
try {
|
try {
|
||||||
Key mappedKey = constructMappedKey(&kvr, mapperTuple, isRangeQuery, Optional<Key>());
|
Key mappedKey = constructMappedKey(&kvr, mapperTuple, isRangeQuery);
|
||||||
} catch (Error& e) {
|
} catch (Error& e) {
|
||||||
ASSERT(e.code() == error_code_mapper_bad_index);
|
ASSERT(e.code() == error_code_mapper_bad_index);
|
||||||
throwException3 = true;
|
throwException3 = true;
|
||||||
|
@ -3090,7 +3064,7 @@ ACTOR Future<GetMappedKeyValuesReply> mapKeyValues(StorageServer* data,
|
||||||
kvm.value = it->value;
|
kvm.value = it->value;
|
||||||
|
|
||||||
state bool isRangeQuery = false;
|
state bool isRangeQuery = false;
|
||||||
state Key mappedKey = constructMappedKey(it, mappedKeyFormatTuple, isRangeQuery, tenantPrefix);
|
state Key mappedKey = constructMappedKey(it, mappedKeyFormatTuple, isRangeQuery);
|
||||||
// Make sure the mappedKey is always available, so that it's good even we want to get key asynchronously.
|
// Make sure the mappedKey is always available, so that it's good even we want to get key asynchronously.
|
||||||
result.arena.dependsOn(mappedKey.arena());
|
result.arena.dependsOn(mappedKey.arena());
|
||||||
|
|
||||||
|
@ -3101,12 +3075,10 @@ ACTOR Future<GetMappedKeyValuesReply> mapKeyValues(StorageServer* data,
|
||||||
// Use the mappedKey as the prefix of the range query.
|
// Use the mappedKey as the prefix of the range query.
|
||||||
GetRangeReqAndResultRef getRange =
|
GetRangeReqAndResultRef getRange =
|
||||||
wait(quickGetKeyValues(data, mappedKey, input.version, &(result.arena), pOriginalReq));
|
wait(quickGetKeyValues(data, mappedKey, input.version, &(result.arena), pOriginalReq));
|
||||||
// TODO: Remove tenant prefixes in the keys if they haven't been removed?
|
|
||||||
kvm.reqAndResult = getRange;
|
kvm.reqAndResult = getRange;
|
||||||
} else {
|
} else {
|
||||||
GetValueReqAndResultRef getValue =
|
GetValueReqAndResultRef getValue =
|
||||||
wait(quickGetValue(data, mappedKey, input.version, &(result.arena), pOriginalReq));
|
wait(quickGetValue(data, mappedKey, input.version, &(result.arena), pOriginalReq));
|
||||||
// TODO: Remove tenant prefixes in the keys if they haven't been removed?
|
|
||||||
kvm.reqAndResult = getValue;
|
kvm.reqAndResult = getValue;
|
||||||
}
|
}
|
||||||
result.data.push_back(result.arena, kvm);
|
result.data.push_back(result.arena, kvm);
|
||||||
|
|
Loading…
Reference in New Issue