Optimization: support removing index conditionally (#7116)
This commit is contained in:
parent
85092ab53c
commit
853e6a346b
|
@ -655,6 +655,7 @@ extern "C" DLLEXPORT FDBFuture* fdb_transaction_get_mapped_range(FDBTransaction*
|
|||
int target_bytes,
|
||||
FDBStreamingMode mode,
|
||||
int iteration,
|
||||
int matchIndex,
|
||||
fdb_bool_t snapshot,
|
||||
fdb_bool_t reverse) {
|
||||
FDBFuture* r = validate_and_update_parameters(limit, target_bytes, mode, iteration, reverse);
|
||||
|
@ -667,6 +668,7 @@ extern "C" DLLEXPORT FDBFuture* fdb_transaction_get_mapped_range(FDBTransaction*
|
|||
KeySelectorRef(KeyRef(end_key_name, end_key_name_length), end_or_equal, end_offset),
|
||||
StringRef(mapper_name, mapper_name_length),
|
||||
GetRangeLimits(limit, target_bytes),
|
||||
matchIndex,
|
||||
snapshot,
|
||||
reverse)
|
||||
.extractPtr());
|
||||
|
|
|
@ -384,6 +384,7 @@ DLLEXPORT WARN_UNUSED_RESULT FDBFuture* fdb_transaction_get_mapped_range(FDBTran
|
|||
int target_bytes,
|
||||
FDBStreamingMode mode,
|
||||
int iteration,
|
||||
int matchIndex,
|
||||
fdb_bool_t snapshot,
|
||||
fdb_bool_t reverse);
|
||||
|
||||
|
|
|
@ -271,6 +271,7 @@ MappedKeyValueArrayFuture Transaction::get_mapped_range(const uint8_t* begin_key
|
|||
int target_bytes,
|
||||
FDBStreamingMode mode,
|
||||
int iteration,
|
||||
int matchIndex,
|
||||
fdb_bool_t snapshot,
|
||||
fdb_bool_t reverse) {
|
||||
return MappedKeyValueArrayFuture(fdb_transaction_get_mapped_range(tr_,
|
||||
|
@ -288,6 +289,7 @@ MappedKeyValueArrayFuture Transaction::get_mapped_range(const uint8_t* begin_key
|
|||
target_bytes,
|
||||
mode,
|
||||
iteration,
|
||||
matchIndex,
|
||||
snapshot,
|
||||
reverse));
|
||||
}
|
||||
|
|
|
@ -304,6 +304,7 @@ public:
|
|||
int target_bytes,
|
||||
FDBStreamingMode mode,
|
||||
int iteration,
|
||||
int matchIndex,
|
||||
fdb_bool_t snapshot,
|
||||
fdb_bool_t reverse);
|
||||
|
||||
|
|
|
@ -261,6 +261,7 @@ GetMappedRangeResult get_mapped_range(fdb::Transaction& tr,
|
|||
int target_bytes,
|
||||
FDBStreamingMode mode,
|
||||
int iteration,
|
||||
int matchIndex,
|
||||
fdb_bool_t snapshot,
|
||||
fdb_bool_t reverse) {
|
||||
fdb::MappedKeyValueArrayFuture f1 = tr.get_mapped_range(begin_key_name,
|
||||
|
@ -277,6 +278,7 @@ GetMappedRangeResult get_mapped_range(fdb::Transaction& tr,
|
|||
target_bytes,
|
||||
mode,
|
||||
iteration,
|
||||
matchIndex,
|
||||
snapshot,
|
||||
reverse);
|
||||
|
||||
|
@ -951,7 +953,11 @@ std::map<std::string, std::string> fillInRecords(int n) {
|
|||
return data;
|
||||
}
|
||||
|
||||
GetMappedRangeResult getMappedIndexEntries(int beginId, int endId, fdb::Transaction& tr, std::string mapper) {
|
||||
GetMappedRangeResult getMappedIndexEntries(int beginId,
|
||||
int endId,
|
||||
fdb::Transaction& tr,
|
||||
std::string mapper,
|
||||
int matchIndex = MATCH_INDEX_ALL) {
|
||||
std::string indexEntryKeyBegin = indexEntryKey(beginId);
|
||||
std::string indexEntryKeyEnd = indexEntryKey(endId);
|
||||
|
||||
|
@ -965,13 +971,17 @@ GetMappedRangeResult getMappedIndexEntries(int beginId, int endId, fdb::Transact
|
|||
/* target_bytes */ 0,
|
||||
/* FDBStreamingMode */ FDB_STREAMING_MODE_WANT_ALL,
|
||||
/* iteration */ 0,
|
||||
/* matchIndex */ matchIndex,
|
||||
/* snapshot */ false,
|
||||
/* reverse */ 0);
|
||||
}
|
||||
|
||||
GetMappedRangeResult getMappedIndexEntries(int beginId, int endId, fdb::Transaction& tr) {
|
||||
GetMappedRangeResult getMappedIndexEntries(int beginId,
|
||||
int endId,
|
||||
fdb::Transaction& tr,
|
||||
int matchIndex = MATCH_INDEX_ALL) {
|
||||
std::string mapper = Tuple().append(prefix).append(RECORD).append("{K[3]}"_sr).append("{...}"_sr).pack().toString();
|
||||
return getMappedIndexEntries(beginId, endId, tr, mapper);
|
||||
return getMappedIndexEntries(beginId, endId, tr, mapper, matchIndex);
|
||||
}
|
||||
|
||||
TEST_CASE("fdb_transaction_get_mapped_range") {
|
||||
|
@ -983,7 +993,8 @@ TEST_CASE("fdb_transaction_get_mapped_range") {
|
|||
while (1) {
|
||||
int beginId = 1;
|
||||
int endId = 19;
|
||||
auto result = getMappedIndexEntries(beginId, endId, tr);
|
||||
const int matchIndex = deterministicRandom()->random01() > 0.5 ? MATCH_INDEX_NONE : MATCH_INDEX_ALL;
|
||||
auto result = getMappedIndexEntries(beginId, endId, tr, matchIndex);
|
||||
|
||||
if (result.err) {
|
||||
fdb::EmptyFuture f1 = tr.on_error(result.err);
|
||||
|
@ -998,7 +1009,11 @@ TEST_CASE("fdb_transaction_get_mapped_range") {
|
|||
int id = beginId;
|
||||
for (int i = 0; i < expectSize; i++, id++) {
|
||||
const auto& [key, value, begin, end, range_results] = result.mkvs[i];
|
||||
CHECK(indexEntryKey(id).compare(key) == 0);
|
||||
if (matchIndex == MATCH_INDEX_ALL || i == 0 || i == expectSize - 1) {
|
||||
CHECK(indexEntryKey(id).compare(key) == 0);
|
||||
} else {
|
||||
CHECK(EMPTY.compare(key) == 0);
|
||||
}
|
||||
CHECK(EMPTY.compare(value) == 0);
|
||||
CHECK(range_results.size() == SPLIT_SIZE);
|
||||
for (int split = 0; split < SPLIT_SIZE; split++) {
|
||||
|
@ -1024,6 +1039,7 @@ TEST_CASE("fdb_transaction_get_mapped_range_restricted_to_serializable") {
|
|||
/* target_bytes */ 0,
|
||||
/* FDBStreamingMode */ FDB_STREAMING_MODE_WANT_ALL,
|
||||
/* iteration */ 0,
|
||||
/* matchIndex */ MATCH_INDEX_ALL,
|
||||
/* snapshot */ true, // Set snapshot to true
|
||||
/* reverse */ 0);
|
||||
ASSERT(result.err == error_code_unsupported_operation);
|
||||
|
@ -1043,6 +1059,7 @@ TEST_CASE("fdb_transaction_get_mapped_range_restricted_to_ryw_enable") {
|
|||
/* target_bytes */ 0,
|
||||
/* FDBStreamingMode */ FDB_STREAMING_MODE_WANT_ALL,
|
||||
/* iteration */ 0,
|
||||
/* matchIndex */ MATCH_INDEX_ALL,
|
||||
/* snapshot */ false,
|
||||
/* reverse */ 0);
|
||||
ASSERT(result.err == error_code_unsupported_operation);
|
||||
|
|
|
@ -960,6 +960,7 @@ JNIEXPORT jlong JNICALL Java_com_apple_foundationdb_FDBTransaction_Transaction_1
|
|||
jint targetBytes,
|
||||
jint streamingMode,
|
||||
jint iteration,
|
||||
jint matchIndex,
|
||||
jboolean snapshot,
|
||||
jboolean reverse) {
|
||||
if (!tPtr || !keyBeginBytes || !keyEndBytes || !mapperBytes) {
|
||||
|
@ -1007,6 +1008,7 @@ JNIEXPORT jlong JNICALL Java_com_apple_foundationdb_FDBTransaction_Transaction_1
|
|||
targetBytes,
|
||||
(FDBStreamingMode)streamingMode,
|
||||
iteration,
|
||||
matchIndex,
|
||||
snapshot,
|
||||
reverse);
|
||||
jenv->ReleaseByteArrayElements(keyBeginBytes, (jbyte*)barrBegin, JNI_ABORT);
|
||||
|
|
|
@ -192,12 +192,12 @@ class MappedRangeQueryIntegrationTest {
|
|||
|
||||
RangeQueryWithIndex mappedRangeQuery = (int begin, int end, Database db) -> db.run(tr -> {
|
||||
try {
|
||||
List<MappedKeyValue> kvs =
|
||||
tr.getMappedRange(KeySelector.firstGreaterOrEqual(indexEntryKey(begin)),
|
||||
KeySelector.firstGreaterOrEqual(indexEntryKey(end)), MAPPER,
|
||||
ReadTransaction.ROW_LIMIT_UNLIMITED, false, StreamingMode.WANT_ALL)
|
||||
.asList()
|
||||
.get();
|
||||
List<MappedKeyValue> kvs = tr.getMappedRange(KeySelector.firstGreaterOrEqual(indexEntryKey(begin)),
|
||||
KeySelector.firstGreaterOrEqual(indexEntryKey(end)), MAPPER,
|
||||
ReadTransaction.ROW_LIMIT_UNLIMITED,
|
||||
FDBTransaction.MATCH_INDEX_ALL, false, StreamingMode.WANT_ALL)
|
||||
.asList()
|
||||
.get();
|
||||
Assertions.assertEquals(end - begin, kvs.size());
|
||||
|
||||
if (validate) {
|
||||
|
|
|
@ -32,6 +32,10 @@ import com.apple.foundationdb.async.AsyncUtil;
|
|||
import com.apple.foundationdb.tuple.ByteArrayUtil;
|
||||
|
||||
class FDBTransaction extends NativeObjectWrapper implements Transaction, OptionConsumer {
|
||||
|
||||
static public final int MATCH_INDEX_ALL = 0;
|
||||
static public final int MATCH_INDEX_NONE = 1;
|
||||
|
||||
private final Database database;
|
||||
private final Executor executor;
|
||||
private final TransactionOptions options;
|
||||
|
@ -93,7 +97,8 @@ class FDBTransaction extends NativeObjectWrapper implements Transaction, OptionC
|
|||
|
||||
@Override
|
||||
public AsyncIterable<MappedKeyValue> getMappedRange(KeySelector begin, KeySelector end, byte[] mapper,
|
||||
int limit, boolean reverse, StreamingMode mode) {
|
||||
int limit, int matchIndex, boolean reverse,
|
||||
StreamingMode mode) {
|
||||
|
||||
throw new UnsupportedOperationException("getMappedRange is only supported in serializable");
|
||||
}
|
||||
|
@ -346,8 +351,8 @@ class FDBTransaction extends NativeObjectWrapper implements Transaction, OptionC
|
|||
}
|
||||
|
||||
@Override
|
||||
public AsyncIterable<MappedKeyValue> getMappedRange(KeySelector begin, KeySelector end, byte[] mapper,
|
||||
int limit, boolean reverse, StreamingMode mode) {
|
||||
public AsyncIterable<MappedKeyValue> getMappedRange(KeySelector begin, KeySelector end, byte[] mapper, int limit,
|
||||
int matchIndex, boolean reverse, StreamingMode mode) {
|
||||
if (mapper == null) {
|
||||
throw new IllegalArgumentException("Mapper must be non-null");
|
||||
}
|
||||
|
@ -467,9 +472,9 @@ class FDBTransaction extends NativeObjectWrapper implements Transaction, OptionC
|
|||
begin.toString(), end.toString(), rowLimit, targetBytes, streamingMode,
|
||||
iteration, Boolean.toString(isSnapshot), Boolean.toString(reverse)));*/
|
||||
return new FutureMappedResults(
|
||||
Transaction_getMappedRange(getPtr(), begin.getKey(), begin.orEqual(), begin.getOffset(),
|
||||
end.getKey(), end.orEqual(), end.getOffset(), mapper, rowLimit,
|
||||
targetBytes, streamingMode, iteration, isSnapshot, reverse),
|
||||
Transaction_getMappedRange(getPtr(), begin.getKey(), begin.orEqual(), begin.getOffset(), end.getKey(),
|
||||
end.orEqual(), end.getOffset(), mapper, rowLimit, targetBytes, streamingMode,
|
||||
iteration, MATCH_INDEX_ALL, isSnapshot, reverse),
|
||||
FDB.instance().isDirectBufferQueriesEnabled(), executor, eventKeeper);
|
||||
} finally {
|
||||
pointerReadLock.unlock();
|
||||
|
@ -809,12 +814,11 @@ class FDBTransaction extends NativeObjectWrapper implements Transaction, OptionC
|
|||
byte[] keyEnd, boolean orEqualEnd, int offsetEnd,
|
||||
int rowLimit, int targetBytes, int streamingMode, int iteration,
|
||||
boolean isSnapshot, boolean reverse);
|
||||
private native long Transaction_getMappedRange(long cPtr, byte[] keyBegin, boolean orEqualBegin,
|
||||
int offsetBegin, byte[] keyEnd, boolean orEqualEnd,
|
||||
int offsetEnd,
|
||||
byte[] mapper, // Nonnull
|
||||
int rowLimit, int targetBytes, int streamingMode, int iteration,
|
||||
boolean isSnapshot, boolean reverse);
|
||||
private native long Transaction_getMappedRange(long cPtr, byte[] keyBegin, boolean orEqualBegin, int offsetBegin,
|
||||
byte[] keyEnd, boolean orEqualEnd, int offsetEnd,
|
||||
byte[] mapper, // Nonnull
|
||||
int rowLimit, int targetBytes, int streamingMode, int iteration,
|
||||
int matchIndex, boolean isSnapshot, boolean reverse);
|
||||
private native void Transaction_addConflictRange(long cPtr,
|
||||
byte[] keyBegin, byte[] keyEnd, int conflictRangeType);
|
||||
private native void Transaction_set(long cPtr, byte[] key, byte[] value);
|
||||
|
|
|
@ -460,7 +460,7 @@ public interface ReadTransaction extends ReadTransactionContext {
|
|||
* @return a handle to access the results of the asynchronous call
|
||||
*/
|
||||
AsyncIterable<MappedKeyValue> getMappedRange(KeySelector begin, KeySelector end, byte[] mapper, int limit,
|
||||
boolean reverse, StreamingMode mode);
|
||||
int matchIndex, boolean reverse, StreamingMode mode);
|
||||
|
||||
/**
|
||||
* Gets an estimate for the number of bytes stored in the given range.
|
||||
|
|
|
@ -148,6 +148,9 @@ static const Tag invalidTag{ tagLocalitySpecial, 0 };
|
|||
static const Tag txsTag{ tagLocalitySpecial, 1 };
|
||||
static const Tag cacheTag{ tagLocalitySpecial, 2 };
|
||||
|
||||
const int MATCH_INDEX_ALL = 0;
|
||||
const int MATCH_INDEX_NONE = 1;
|
||||
|
||||
enum { txsTagOld = -1, invalidTagOld = -100 };
|
||||
|
||||
struct TagsAndMessage {
|
||||
|
|
|
@ -68,6 +68,7 @@ public:
|
|||
const KeySelectorRef& end,
|
||||
const StringRef& mapper,
|
||||
GetRangeLimits limits,
|
||||
int matchIndex = MATCH_INDEX_ALL,
|
||||
bool snapshot = false,
|
||||
bool reverse = false) = 0;
|
||||
virtual ThreadFuture<Standalone<VectorRef<const char*>>> getAddressesForKey(const KeyRef& key) = 0;
|
||||
|
|
|
@ -74,6 +74,7 @@ public:
|
|||
KeySelector end,
|
||||
Key mapper,
|
||||
GetRangeLimits limits,
|
||||
int matchIndex = MATCH_INDEX_ALL,
|
||||
Snapshot = Snapshot::False,
|
||||
Reverse = Reverse::False) = 0;
|
||||
virtual Future<Standalone<VectorRef<const char*>>> getAddressesForKey(Key const& key) = 0;
|
||||
|
|
|
@ -158,6 +158,7 @@ ThreadFuture<MappedRangeResult> DLTransaction::getMappedRange(const KeySelectorR
|
|||
const KeySelectorRef& end,
|
||||
const StringRef& mapper,
|
||||
GetRangeLimits limits,
|
||||
int matchIndex,
|
||||
bool snapshot,
|
||||
bool reverse) {
|
||||
FdbCApi::FDBFuture* f = api->transactionGetMappedRange(tr,
|
||||
|
@ -175,6 +176,7 @@ ThreadFuture<MappedRangeResult> DLTransaction::getMappedRange(const KeySelectorR
|
|||
limits.bytes,
|
||||
FDB_STREAMING_MODE_EXACT,
|
||||
0,
|
||||
matchIndex,
|
||||
snapshot,
|
||||
reverse);
|
||||
return toThreadFuture<MappedRangeResult>(api, f, [](FdbCApi::FDBFuture* f, FdbCApi* api) {
|
||||
|
@ -971,10 +973,11 @@ ThreadFuture<MappedRangeResult> MultiVersionTransaction::getMappedRange(const Ke
|
|||
const KeySelectorRef& end,
|
||||
const StringRef& mapper,
|
||||
GetRangeLimits limits,
|
||||
int matchIndex,
|
||||
bool snapshot,
|
||||
bool reverse) {
|
||||
auto tr = getTransaction();
|
||||
auto f = tr.transaction ? tr.transaction->getMappedRange(begin, end, mapper, limits, snapshot, reverse)
|
||||
auto f = tr.transaction ? tr.transaction->getMappedRange(begin, end, mapper, limits, matchIndex, snapshot, reverse)
|
||||
: makeTimeout<MappedRangeResult>();
|
||||
return abortableFuture(f, tr.onChange);
|
||||
}
|
||||
|
|
|
@ -218,6 +218,7 @@ struct FdbCApi : public ThreadSafeReferenceCounted<FdbCApi> {
|
|||
int targetBytes,
|
||||
FDBStreamingMode mode,
|
||||
int iteration,
|
||||
int matchIndex,
|
||||
fdb_bool_t snapshot,
|
||||
fdb_bool_t reverse);
|
||||
FDBFuture* (*transactionGetVersionstamp)(FDBTransaction* tr);
|
||||
|
@ -349,6 +350,7 @@ public:
|
|||
const KeySelectorRef& end,
|
||||
const StringRef& mapper,
|
||||
GetRangeLimits limits,
|
||||
int matchIndex,
|
||||
bool snapshot,
|
||||
bool reverse) override;
|
||||
ThreadFuture<Standalone<VectorRef<const char*>>> getAddressesForKey(const KeyRef& key) override;
|
||||
|
@ -537,6 +539,7 @@ public:
|
|||
const KeySelectorRef& end,
|
||||
const StringRef& mapper,
|
||||
GetRangeLimits limits,
|
||||
int matchIndex,
|
||||
bool snapshot,
|
||||
bool reverse) override;
|
||||
ThreadFuture<Standalone<VectorRef<const char*>>> getAddressesForKey(const KeyRef& key) override;
|
||||
|
|
|
@ -3791,12 +3791,24 @@ PublicRequestStream<GetKeyValuesFamilyRequest> StorageServerInterface::*getRange
|
|||
}
|
||||
}
|
||||
|
||||
template <class GetKeyValuesFamilyRequest>
|
||||
void setMatchIndex(GetKeyValuesFamilyRequest& req, int matchIndex) {
|
||||
if constexpr (std::is_same<GetKeyValuesFamilyRequest, GetKeyValuesRequest>::value) {
|
||||
// do nothing;
|
||||
} else if (std::is_same<GetKeyValuesFamilyRequest, GetMappedKeyValuesRequest>::value) {
|
||||
req.matchIndex = matchIndex;
|
||||
} else {
|
||||
UNREACHABLE();
|
||||
}
|
||||
}
|
||||
|
||||
ACTOR template <class GetKeyValuesFamilyRequest, class GetKeyValuesFamilyReply, class RangeResultFamily>
|
||||
Future<RangeResultFamily> getExactRange(Reference<TransactionState> trState,
|
||||
Version version,
|
||||
KeyRange keys,
|
||||
Key mapper,
|
||||
GetRangeLimits limits,
|
||||
int matchIndex,
|
||||
Reverse reverse,
|
||||
UseTenant useTenant) {
|
||||
state RangeResultFamily output;
|
||||
|
@ -3830,6 +3842,7 @@ Future<RangeResultFamily> getExactRange(Reference<TransactionState> trState,
|
|||
req.version = version;
|
||||
req.begin = firstGreaterOrEqual(range.begin);
|
||||
req.end = firstGreaterOrEqual(range.end);
|
||||
setMatchIndex<GetKeyValuesFamilyRequest>(req, matchIndex);
|
||||
req.spanContext = span.context;
|
||||
trState->cx->getLatestCommitVersions(
|
||||
locations[shard].locations, req.version, trState, req.ssLatestCommitVersions);
|
||||
|
@ -4004,6 +4017,7 @@ Future<RangeResultFamily> getRangeFallback(Reference<TransactionState> trState,
|
|||
KeySelector end,
|
||||
Key mapper,
|
||||
GetRangeLimits limits,
|
||||
int matchIndex,
|
||||
Reverse reverse,
|
||||
UseTenant useTenant) {
|
||||
if (version == latestVersion) {
|
||||
|
@ -4029,7 +4043,7 @@ Future<RangeResultFamily> getRangeFallback(Reference<TransactionState> trState,
|
|||
// or allKeys.begin exists in the database/tenant and will be part of the conflict range anyways
|
||||
|
||||
RangeResultFamily _r = wait(getExactRange<GetKeyValuesFamilyRequest, GetKeyValuesFamilyReply, RangeResultFamily>(
|
||||
trState, version, KeyRangeRef(b, e), mapper, limits, reverse, useTenant));
|
||||
trState, version, KeyRangeRef(b, e), mapper, limits, matchIndex, reverse, useTenant));
|
||||
RangeResultFamily r = _r;
|
||||
|
||||
if (b == allKeys.begin && ((reverse && !r.more) || !reverse))
|
||||
|
@ -4153,6 +4167,7 @@ Future<RangeResultFamily> getRange(Reference<TransactionState> trState,
|
|||
Key mapper,
|
||||
GetRangeLimits limits,
|
||||
Promise<std::pair<Key, Key>> conflictRange,
|
||||
int matchIndex,
|
||||
Snapshot snapshot,
|
||||
Reverse reverse,
|
||||
UseTenant useTenant = UseTenant::True) {
|
||||
|
@ -4205,7 +4220,7 @@ Future<RangeResultFamily> getRange(Reference<TransactionState> trState,
|
|||
state GetKeyValuesFamilyRequest req;
|
||||
req.mapper = mapper;
|
||||
req.arena.dependsOn(mapper.arena());
|
||||
|
||||
setMatchIndex<GetKeyValuesFamilyRequest>(req, matchIndex);
|
||||
req.tenantInfo = useTenant ? trState->getTenantInfo() : TenantInfo();
|
||||
req.isFetchKeys = (trState->taskID == TaskPriority::FetchKeys);
|
||||
req.version = readVersion;
|
||||
|
@ -4385,6 +4400,7 @@ Future<RangeResultFamily> getRange(Reference<TransactionState> trState,
|
|||
originalEnd,
|
||||
mapper,
|
||||
originalLimits,
|
||||
matchIndex,
|
||||
reverse,
|
||||
useTenant));
|
||||
getRangeFinished(
|
||||
|
@ -4425,6 +4441,7 @@ Future<RangeResultFamily> getRange(Reference<TransactionState> trState,
|
|||
originalEnd,
|
||||
mapper,
|
||||
originalLimits,
|
||||
matchIndex,
|
||||
reverse,
|
||||
useTenant));
|
||||
getRangeFinished(
|
||||
|
@ -5010,6 +5027,7 @@ Future<RangeResult> getRange(Reference<TransactionState> const& trState,
|
|||
""_sr,
|
||||
limits,
|
||||
Promise<std::pair<Key, Key>>(),
|
||||
MATCH_INDEX_ALL,
|
||||
Snapshot::True,
|
||||
reverse,
|
||||
useTenant);
|
||||
|
@ -5364,6 +5382,7 @@ Future<RangeResultFamily> Transaction::getRangeInternal(const KeySelector& begin
|
|||
const KeySelector& end,
|
||||
const Key& mapper,
|
||||
GetRangeLimits limits,
|
||||
int matchIndex,
|
||||
Snapshot snapshot,
|
||||
Reverse reverse) {
|
||||
++trState->cx->transactionLogicalReads;
|
||||
|
@ -5406,7 +5425,7 @@ Future<RangeResultFamily> Transaction::getRangeInternal(const KeySelector& begin
|
|||
}
|
||||
|
||||
return ::getRange<GetKeyValuesFamilyRequest, GetKeyValuesFamilyReply, RangeResultFamily>(
|
||||
trState, getReadVersion(), b, e, mapper, limits, conflictRange, snapshot, reverse);
|
||||
trState, getReadVersion(), b, e, mapper, limits, conflictRange, matchIndex, snapshot, reverse);
|
||||
}
|
||||
|
||||
Future<RangeResult> Transaction::getRange(const KeySelector& begin,
|
||||
|
@ -5415,17 +5434,18 @@ Future<RangeResult> Transaction::getRange(const KeySelector& begin,
|
|||
Snapshot snapshot,
|
||||
Reverse reverse) {
|
||||
return getRangeInternal<GetKeyValuesRequest, GetKeyValuesReply, RangeResult>(
|
||||
begin, end, ""_sr, limits, snapshot, reverse);
|
||||
begin, end, ""_sr, limits, MATCH_INDEX_ALL, snapshot, reverse);
|
||||
}
|
||||
|
||||
Future<MappedRangeResult> Transaction::getMappedRange(const KeySelector& begin,
|
||||
const KeySelector& end,
|
||||
const Key& mapper,
|
||||
GetRangeLimits limits,
|
||||
int matchIndex,
|
||||
Snapshot snapshot,
|
||||
Reverse reverse) {
|
||||
return getRangeInternal<GetMappedKeyValuesRequest, GetMappedKeyValuesReply, MappedRangeResult>(
|
||||
begin, end, mapper, limits, snapshot, reverse);
|
||||
begin, end, mapper, limits, matchIndex, snapshot, reverse);
|
||||
}
|
||||
|
||||
Future<RangeResult> Transaction::getRange(const KeySelector& begin,
|
||||
|
|
|
@ -329,6 +329,7 @@ public:
|
|||
const KeySelector& end,
|
||||
const Key& mapper,
|
||||
GetRangeLimits limits,
|
||||
int matchIndex = MATCH_INDEX_ALL,
|
||||
Snapshot = Snapshot::False,
|
||||
Reverse = Reverse::False);
|
||||
|
||||
|
@ -338,6 +339,7 @@ private:
|
|||
const KeySelector& end,
|
||||
const Key& mapper,
|
||||
GetRangeLimits limits,
|
||||
int matchIndex,
|
||||
Snapshot snapshot,
|
||||
Reverse reverse);
|
||||
|
||||
|
|
|
@ -54,6 +54,7 @@ public:
|
|||
KeySelector end,
|
||||
Key mapper,
|
||||
GetRangeLimits limits,
|
||||
int matchIndex = MATCH_INDEX_ALL,
|
||||
Snapshot = Snapshot::False,
|
||||
Reverse = Reverse::False) override {
|
||||
throw client_invalid_operation();
|
||||
|
|
|
@ -77,11 +77,12 @@ public:
|
|||
|
||||
template <bool reverse>
|
||||
struct GetMappedRangeReq {
|
||||
GetMappedRangeReq(KeySelector begin, KeySelector end, Key mapper, GetRangeLimits limits)
|
||||
: begin(begin), end(end), mapper(mapper), limits(limits) {}
|
||||
GetMappedRangeReq(KeySelector begin, KeySelector end, Key mapper, int matchIndex, GetRangeLimits limits)
|
||||
: begin(begin), end(end), mapper(mapper), limits(limits), matchIndex(matchIndex) {}
|
||||
KeySelector begin, end;
|
||||
Key mapper;
|
||||
GetRangeLimits limits;
|
||||
int matchIndex;
|
||||
using Result = MappedRangeResult;
|
||||
};
|
||||
|
||||
|
@ -1140,9 +1141,13 @@ public:
|
|||
else
|
||||
read.end = KeySelector(firstGreaterOrEqual(key), key.arena());
|
||||
}
|
||||
|
||||
MappedRangeResult v = wait(ryw->tr.getMappedRange(
|
||||
read.begin, read.end, read.mapper, read.limits, snapshot, backwards ? Reverse::True : Reverse::False));
|
||||
MappedRangeResult v = wait(ryw->tr.getMappedRange(read.begin,
|
||||
read.end,
|
||||
read.mapper,
|
||||
read.limits,
|
||||
read.matchIndex,
|
||||
snapshot,
|
||||
backwards ? Reverse::True : Reverse::False));
|
||||
return v;
|
||||
}
|
||||
|
||||
|
@ -1677,6 +1682,7 @@ Future<MappedRangeResult> ReadYourWritesTransaction::getMappedRange(KeySelector
|
|||
KeySelector end,
|
||||
Key mapper,
|
||||
GetRangeLimits limits,
|
||||
int matchIndex,
|
||||
Snapshot snapshot,
|
||||
Reverse reverse) {
|
||||
if (getDatabase()->apiVersionAtLeast(630)) {
|
||||
|
@ -1724,9 +1730,9 @@ Future<MappedRangeResult> ReadYourWritesTransaction::getMappedRange(KeySelector
|
|||
|
||||
Future<MappedRangeResult> result =
|
||||
reverse ? RYWImpl::readWithConflictRangeForGetMappedRange(
|
||||
this, RYWImpl::GetMappedRangeReq<true>(begin, end, mapper, limits), snapshot)
|
||||
this, RYWImpl::GetMappedRangeReq<true>(begin, end, mapper, matchIndex, limits), snapshot)
|
||||
: RYWImpl::readWithConflictRangeForGetMappedRange(
|
||||
this, RYWImpl::GetMappedRangeReq<false>(begin, end, mapper, limits), snapshot);
|
||||
this, RYWImpl::GetMappedRangeReq<false>(begin, end, mapper, matchIndex, limits), snapshot);
|
||||
|
||||
return result;
|
||||
}
|
||||
|
|
|
@ -112,6 +112,7 @@ public:
|
|||
KeySelector end,
|
||||
Key mapper,
|
||||
GetRangeLimits limits,
|
||||
int matchIndex,
|
||||
Snapshot = Snapshot::False,
|
||||
Reverse = Reverse::False) override;
|
||||
|
||||
|
|
|
@ -63,6 +63,7 @@ public:
|
|||
KeySelector end,
|
||||
Key mapper,
|
||||
GetRangeLimits limits,
|
||||
int matchIndex,
|
||||
Snapshot = Snapshot::False,
|
||||
Reverse = Reverse::False) override {
|
||||
throw client_invalid_operation();
|
||||
|
|
|
@ -426,6 +426,7 @@ struct GetMappedKeyValuesRequest : TimedRequest {
|
|||
KeyRef mapper;
|
||||
Version version; // or latestVersion
|
||||
int limit, limitBytes;
|
||||
int matchIndex;
|
||||
bool isFetchKeys;
|
||||
Optional<TagSet> tags;
|
||||
Optional<UID> debugID;
|
||||
|
@ -451,7 +452,8 @@ struct GetMappedKeyValuesRequest : TimedRequest {
|
|||
spanContext,
|
||||
tenantInfo,
|
||||
arena,
|
||||
ssLatestCommitVersions);
|
||||
ssLatestCommitVersions,
|
||||
matchIndex);
|
||||
}
|
||||
};
|
||||
|
||||
|
|
|
@ -306,6 +306,7 @@ ThreadFuture<MappedRangeResult> ThreadSafeTransaction::getMappedRange(const KeyS
|
|||
const KeySelectorRef& end,
|
||||
const StringRef& mapper,
|
||||
GetRangeLimits limits,
|
||||
int matchIndex,
|
||||
bool snapshot,
|
||||
bool reverse) {
|
||||
KeySelector b = begin;
|
||||
|
@ -313,9 +314,9 @@ ThreadFuture<MappedRangeResult> ThreadSafeTransaction::getMappedRange(const KeyS
|
|||
Key h = mapper;
|
||||
|
||||
ISingleThreadTransaction* tr = this->tr;
|
||||
return onMainThread([tr, b, e, h, limits, snapshot, reverse]() -> Future<MappedRangeResult> {
|
||||
return onMainThread([tr, b, e, h, limits, matchIndex, snapshot, reverse]() -> Future<MappedRangeResult> {
|
||||
tr->checkDeferredError();
|
||||
return tr->getMappedRange(b, e, h, limits, Snapshot{ snapshot }, Reverse{ reverse });
|
||||
return tr->getMappedRange(b, e, h, limits, matchIndex, Snapshot{ snapshot }, Reverse{ reverse });
|
||||
});
|
||||
}
|
||||
|
||||
|
|
|
@ -136,6 +136,7 @@ public:
|
|||
const KeySelectorRef& end,
|
||||
const StringRef& mapper,
|
||||
GetRangeLimits limits,
|
||||
int matchIndex,
|
||||
bool snapshot,
|
||||
bool reverse) override;
|
||||
ThreadFuture<Standalone<VectorRef<const char*>>> getAddressesForKey(const KeyRef& key) override;
|
||||
|
|
|
@ -3723,7 +3723,8 @@ ACTOR Future<GetMappedKeyValuesReply> mapKeyValues(StorageServer* data,
|
|||
StringRef mapper,
|
||||
// To provide span context, tags, debug ID to underlying lookups.
|
||||
GetMappedKeyValuesRequest* pOriginalReq,
|
||||
Optional<Key> tenantPrefix) {
|
||||
Optional<Key> tenantPrefix,
|
||||
int matchIndex) {
|
||||
state GetMappedKeyValuesReply result;
|
||||
result.version = input.version;
|
||||
result.more = input.more;
|
||||
|
@ -3741,15 +3742,20 @@ ACTOR Future<GetMappedKeyValuesReply> mapKeyValues(StorageServer* data,
|
|||
TraceEvent("MapperNotTuple").error(e).detail("Mapper", mapper.printable());
|
||||
throw mapper_not_tuple();
|
||||
}
|
||||
state KeyValueRef* it = input.data.begin();
|
||||
state std::vector<Optional<Tuple>> vt;
|
||||
state bool isRangeQuery = false;
|
||||
preprocessMappedKey(mappedKeyFormatTuple, vt, isRangeQuery);
|
||||
|
||||
for (; it != input.data.end(); it++) {
|
||||
state int sz = input.data.size();
|
||||
state int i = 0;
|
||||
for (; i < sz; i++) {
|
||||
KeyValueRef* it = &input.data[i];
|
||||
state MappedKeyValueRef kvm;
|
||||
kvm.key = it->key;
|
||||
kvm.value = it->value;
|
||||
// need to keep the boundary, so that caller can use it as a continuation.
|
||||
if ((i == 0 || i == sz - 1) || matchIndex == MATCH_INDEX_ALL) {
|
||||
kvm.key = it->key;
|
||||
kvm.value = it->value;
|
||||
}
|
||||
|
||||
state Key mappedKey = constructMappedKey(it, vt, mappedKeyTuple, mappedKeyFormatTuple);
|
||||
// Make sure the mappedKey is always available, so that it's good even we want to get key asynchronously.
|
||||
|
@ -4026,7 +4032,7 @@ ACTOR Future<Void> getMappedKeyValuesQ(StorageServer* data, GetMappedKeyValuesRe
|
|||
try {
|
||||
// Map the scanned range to another list of keys and look up.
|
||||
GetMappedKeyValuesReply _r =
|
||||
wait(mapKeyValues(data, getKeyValuesReply, req.mapper, &req, tenantPrefix));
|
||||
wait(mapKeyValues(data, getKeyValuesReply, req.mapper, &req, tenantPrefix, req.matchIndex));
|
||||
r = _r;
|
||||
} catch (Error& e) {
|
||||
TraceEvent("MapError").error(e);
|
||||
|
|
|
@ -56,6 +56,7 @@ struct TransactionWrapper : public ReferenceCounted<TransactionWrapper> {
|
|||
KeySelector& end,
|
||||
Key& mapper,
|
||||
GetRangeLimits limits,
|
||||
int matchIndex,
|
||||
Snapshot snapshot,
|
||||
Reverse reverse) = 0;
|
||||
|
||||
|
@ -128,9 +129,10 @@ struct FlowTransactionWrapper : public TransactionWrapper {
|
|||
KeySelector& end,
|
||||
Key& mapper,
|
||||
GetRangeLimits limits,
|
||||
int matchIndex,
|
||||
Snapshot snapshot,
|
||||
Reverse reverse) override {
|
||||
return transaction.getMappedRange(begin, end, mapper, limits, snapshot, reverse);
|
||||
return transaction.getMappedRange(begin, end, mapper, limits, matchIndex, snapshot, reverse);
|
||||
}
|
||||
|
||||
// Gets the key from the database specified by a given key selector
|
||||
|
@ -203,9 +205,11 @@ struct ThreadTransactionWrapper : public TransactionWrapper {
|
|||
KeySelector& end,
|
||||
Key& mapper,
|
||||
GetRangeLimits limits,
|
||||
int matchIndex,
|
||||
Snapshot snapshot,
|
||||
Reverse reverse) override {
|
||||
return unsafeThreadFutureToFuture(transaction->getMappedRange(begin, end, mapper, limits, snapshot, reverse));
|
||||
return unsafeThreadFutureToFuture(
|
||||
transaction->getMappedRange(begin, end, mapper, limits, matchIndex, snapshot, reverse));
|
||||
}
|
||||
|
||||
// Gets the key from the database specified by a given key selector
|
||||
|
|
|
@ -145,10 +145,18 @@ struct GetMappedRangeWorkload : ApiWorkload {
|
|||
}
|
||||
|
||||
// Return true if need to retry.
|
||||
static bool validateRecord(int expectedId, const MappedKeyValueRef* it, GetMappedRangeWorkload* self) {
|
||||
static bool validateRecord(int expectedId,
|
||||
const MappedKeyValueRef* it,
|
||||
GetMappedRangeWorkload* self,
|
||||
int matchIndex,
|
||||
bool isBoundary) {
|
||||
// std::cout << "validateRecord expectedId " << expectedId << " it->key " << printable(it->key) << "
|
||||
// indexEntryKey(expectedId) " << printable(indexEntryKey(expectedId)) << std::endl;
|
||||
ASSERT(it->key == indexEntryKey(expectedId));
|
||||
if (matchIndex == MATCH_INDEX_ALL || isBoundary) {
|
||||
ASSERT(it->key == indexEntryKey(expectedId));
|
||||
} else {
|
||||
ASSERT(it->key == EMPTY);
|
||||
}
|
||||
ASSERT(it->value == EMPTY);
|
||||
|
||||
if (self->SPLIT_RECORDS) {
|
||||
|
@ -189,7 +197,8 @@ struct GetMappedRangeWorkload : ApiWorkload {
|
|||
Key mapper,
|
||||
int limit,
|
||||
int expectedBeginId,
|
||||
GetMappedRangeWorkload* self) {
|
||||
GetMappedRangeWorkload* self,
|
||||
int matchIndex) {
|
||||
|
||||
std::cout << "start scanMappedRangeWithLimits beginSelector:" << beginSelector.toString()
|
||||
<< " endSelector:" << endSelector.toString() << " expectedBeginId:" << expectedBeginId
|
||||
|
@ -197,8 +206,13 @@ struct GetMappedRangeWorkload : ApiWorkload {
|
|||
loop {
|
||||
state Reference<TransactionWrapper> tr = self->createTransaction();
|
||||
try {
|
||||
MappedRangeResult result = wait(tr->getMappedRange(
|
||||
beginSelector, endSelector, mapper, GetRangeLimits(limit), self->snapshot, Reverse::False));
|
||||
MappedRangeResult result = wait(tr->getMappedRange(beginSelector,
|
||||
endSelector,
|
||||
mapper,
|
||||
GetRangeLimits(limit),
|
||||
matchIndex,
|
||||
self->snapshot,
|
||||
Reverse::False));
|
||||
// showResult(result);
|
||||
if (self->BAD_MAPPER) {
|
||||
TraceEvent("GetMappedRangeWorkloadShouldNotReachable").detail("ResultSize", result.size());
|
||||
|
@ -208,8 +222,10 @@ struct GetMappedRangeWorkload : ApiWorkload {
|
|||
ASSERT(result.size() <= limit);
|
||||
int expectedId = expectedBeginId;
|
||||
bool needRetry = false;
|
||||
for (const MappedKeyValueRef* it = result.begin(); it != result.end(); it++) {
|
||||
if (validateRecord(expectedId, it, self)) {
|
||||
int cnt = 0;
|
||||
const MappedKeyValueRef* it = result.begin();
|
||||
for (; cnt < result.size(); cnt++, it++) {
|
||||
if (validateRecord(expectedId, it, self, matchIndex, cnt == 0 || cnt == result.size() - 1)) {
|
||||
needRetry = true;
|
||||
break;
|
||||
}
|
||||
|
@ -236,7 +252,12 @@ struct GetMappedRangeWorkload : ApiWorkload {
|
|||
}
|
||||
}
|
||||
|
||||
ACTOR Future<Void> scanMappedRange(Database cx, int beginId, int endId, Key mapper, GetMappedRangeWorkload* self) {
|
||||
ACTOR Future<Void> scanMappedRange(Database cx,
|
||||
int beginId,
|
||||
int endId,
|
||||
Key mapper,
|
||||
GetMappedRangeWorkload* self,
|
||||
int matchIndex) {
|
||||
Key beginTuple = Tuple().append(prefix).append(INDEX).append(indexKey(beginId)).getDataAsStandalone();
|
||||
state KeySelector beginSelector = KeySelector(firstGreaterOrEqual(beginTuple));
|
||||
Key endTuple = Tuple().append(prefix).append(INDEX).append(indexKey(endId)).getDataAsStandalone();
|
||||
|
@ -244,14 +265,15 @@ struct GetMappedRangeWorkload : ApiWorkload {
|
|||
state int limit = 100;
|
||||
state int expectedBeginId = beginId;
|
||||
while (true) {
|
||||
MappedRangeResult result = wait(
|
||||
self->scanMappedRangeWithLimits(cx, beginSelector, endSelector, mapper, limit, expectedBeginId, self));
|
||||
MappedRangeResult result = wait(self->scanMappedRangeWithLimits(
|
||||
cx, beginSelector, endSelector, mapper, limit, expectedBeginId, self, matchIndex));
|
||||
expectedBeginId += result.size();
|
||||
if (result.more) {
|
||||
if (result.empty()) {
|
||||
// This is usually not expected.
|
||||
std::cout << "not result but have more, try again" << std::endl;
|
||||
} else {
|
||||
// auto& reqAndResult = std::get<GetRangeReqAndResultRef>(result.back().reqAndResult);
|
||||
beginSelector = KeySelector(firstGreaterThan(result.back().key));
|
||||
}
|
||||
} else {
|
||||
|
@ -296,6 +318,7 @@ struct GetMappedRangeWorkload : ApiWorkload {
|
|||
endSelector,
|
||||
mapper,
|
||||
GetRangeLimits(GetRangeLimits::ROW_LIMIT_UNLIMITED),
|
||||
MATCH_INDEX_ALL,
|
||||
self->snapshot,
|
||||
Reverse::False);
|
||||
}
|
||||
|
@ -394,7 +417,8 @@ struct GetMappedRangeWorkload : ApiWorkload {
|
|||
Key mapper = getMapper(self);
|
||||
// The scanned range cannot be too large to hit get_mapped_key_values_has_more. We have a unit validating the
|
||||
// error is thrown when the range is large.
|
||||
wait(self->scanMappedRange(cx, 10, 490, mapper, self));
|
||||
int matchIndex = deterministicRandom()->random01() > 0.5 ? MATCH_INDEX_NONE : MATCH_INDEX_ALL;
|
||||
wait(self->scanMappedRange(cx, 10, 490, mapper, self, matchIndex));
|
||||
return Void();
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue