Revert matchIndex feature
It is not protocol compatible, revert it to avoid deployment issue. Will have a new PR to have the feature if moving forward.
This commit is contained in:
parent
f93a4c8779
commit
29161b2fda
|
@ -1075,7 +1075,6 @@ extern "C" DLLEXPORT FDBFuture* fdb_transaction_get_mapped_range(FDBTransaction*
|
|||
int target_bytes,
|
||||
FDBStreamingMode mode,
|
||||
int iteration,
|
||||
int matchIndex,
|
||||
fdb_bool_t snapshot,
|
||||
fdb_bool_t reverse) {
|
||||
FDBFuture* r = validate_and_update_parameters(limit, target_bytes, mode, iteration, reverse);
|
||||
|
@ -1088,7 +1087,6 @@ extern "C" DLLEXPORT FDBFuture* fdb_transaction_get_mapped_range(FDBTransaction*
|
|||
KeySelectorRef(KeyRef(end_key_name, end_key_name_length), end_or_equal, end_offset),
|
||||
StringRef(mapper_name, mapper_name_length),
|
||||
GetRangeLimits(limit, target_bytes),
|
||||
matchIndex,
|
||||
snapshot,
|
||||
reverse)
|
||||
.extractPtr());
|
||||
|
|
|
@ -599,7 +599,6 @@ DLLEXPORT WARN_UNUSED_RESULT FDBFuture* fdb_transaction_get_mapped_range(FDBTran
|
|||
int target_bytes,
|
||||
FDBStreamingMode mode,
|
||||
int iteration,
|
||||
int matchIndex,
|
||||
fdb_bool_t snapshot,
|
||||
fdb_bool_t reverse);
|
||||
|
||||
|
|
|
@ -310,7 +310,6 @@ MappedKeyValueArrayFuture Transaction::get_mapped_range(const uint8_t* begin_key
|
|||
int target_bytes,
|
||||
FDBStreamingMode mode,
|
||||
int iteration,
|
||||
int matchIndex,
|
||||
fdb_bool_t snapshot,
|
||||
fdb_bool_t reverse) {
|
||||
return MappedKeyValueArrayFuture(fdb_transaction_get_mapped_range(tr_,
|
||||
|
@ -328,7 +327,6 @@ MappedKeyValueArrayFuture Transaction::get_mapped_range(const uint8_t* begin_key
|
|||
target_bytes,
|
||||
mode,
|
||||
iteration,
|
||||
matchIndex,
|
||||
snapshot,
|
||||
reverse));
|
||||
}
|
||||
|
|
|
@ -344,7 +344,6 @@ public:
|
|||
int target_bytes,
|
||||
FDBStreamingMode mode,
|
||||
int iteration,
|
||||
int matchIndex,
|
||||
fdb_bool_t snapshot,
|
||||
fdb_bool_t reverse);
|
||||
|
||||
|
|
|
@ -269,7 +269,6 @@ GetMappedRangeResult get_mapped_range(fdb::Transaction& tr,
|
|||
int target_bytes,
|
||||
FDBStreamingMode mode,
|
||||
int iteration,
|
||||
int matchIndex,
|
||||
fdb_bool_t snapshot,
|
||||
fdb_bool_t reverse) {
|
||||
fdb::MappedKeyValueArrayFuture f1 = tr.get_mapped_range(begin_key_name,
|
||||
|
@ -286,7 +285,6 @@ GetMappedRangeResult get_mapped_range(fdb::Transaction& tr,
|
|||
target_bytes,
|
||||
mode,
|
||||
iteration,
|
||||
matchIndex,
|
||||
snapshot,
|
||||
reverse);
|
||||
|
||||
|
@ -961,11 +959,7 @@ std::map<std::string, std::string> fillInRecords(int n) {
|
|||
return data;
|
||||
}
|
||||
|
||||
GetMappedRangeResult getMappedIndexEntries(int beginId,
|
||||
int endId,
|
||||
fdb::Transaction& tr,
|
||||
std::string mapper,
|
||||
int matchIndex) {
|
||||
GetMappedRangeResult getMappedIndexEntries(int beginId, int endId, fdb::Transaction& tr, std::string mapper) {
|
||||
std::string indexEntryKeyBegin = indexEntryKey(beginId);
|
||||
std::string indexEntryKeyEnd = indexEntryKey(endId);
|
||||
|
||||
|
@ -979,19 +973,14 @@ GetMappedRangeResult getMappedIndexEntries(int beginId,
|
|||
/* target_bytes */ 0,
|
||||
/* FDBStreamingMode */ FDB_STREAMING_MODE_WANT_ALL,
|
||||
/* iteration */ 0,
|
||||
/* matchIndex */ matchIndex,
|
||||
/* snapshot */ false,
|
||||
/* reverse */ 0);
|
||||
}
|
||||
|
||||
GetMappedRangeResult getMappedIndexEntries(int beginId,
|
||||
int endId,
|
||||
fdb::Transaction& tr,
|
||||
int matchIndex,
|
||||
bool allMissing) {
|
||||
GetMappedRangeResult getMappedIndexEntries(int beginId, int endId, fdb::Transaction& tr, bool allMissing) {
|
||||
std::string mapper =
|
||||
Tuple::makeTuple(prefix, RECORD, (allMissing ? "{K[2]}"_sr : "{K[3]}"_sr), "{...}"_sr).pack().toString();
|
||||
return getMappedIndexEntries(beginId, endId, tr, mapper, matchIndex);
|
||||
return getMappedIndexEntries(beginId, endId, tr, mapper);
|
||||
}
|
||||
|
||||
TEST_CASE("versionstamp_unit_test") {
|
||||
|
@ -1070,16 +1059,7 @@ TEST_CASE("fdb_transaction_get_mapped_range") {
|
|||
while (1) {
|
||||
int beginId = 1;
|
||||
int endId = 19;
|
||||
const double r = deterministicRandom()->random01();
|
||||
int matchIndex = MATCH_INDEX_ALL;
|
||||
if (r < 0.25) {
|
||||
matchIndex = MATCH_INDEX_NONE;
|
||||
} else if (r < 0.5) {
|
||||
matchIndex = MATCH_INDEX_MATCHED_ONLY;
|
||||
} else if (r < 0.75) {
|
||||
matchIndex = MATCH_INDEX_UNMATCHED_ONLY;
|
||||
}
|
||||
auto result = getMappedIndexEntries(beginId, endId, tr, matchIndex, false);
|
||||
auto result = getMappedIndexEntries(beginId, endId, tr, false);
|
||||
|
||||
if (result.err) {
|
||||
fdb::EmptyFuture f1 = tr.on_error(result.err);
|
||||
|
@ -1094,15 +1074,7 @@ TEST_CASE("fdb_transaction_get_mapped_range") {
|
|||
int id = beginId;
|
||||
for (int i = 0; i < expectSize; i++, id++) {
|
||||
const auto& mkv = result.mkvs[i];
|
||||
if (matchIndex == MATCH_INDEX_ALL || i == 0 || i == expectSize - 1) {
|
||||
CHECK(indexEntryKey(id).compare(mkv.key) == 0);
|
||||
} else if (matchIndex == MATCH_INDEX_MATCHED_ONLY) {
|
||||
CHECK(indexEntryKey(id).compare(mkv.key) == 0);
|
||||
} else if (matchIndex == MATCH_INDEX_UNMATCHED_ONLY) {
|
||||
CHECK(EMPTY.compare(mkv.key) == 0);
|
||||
} else {
|
||||
CHECK(EMPTY.compare(mkv.key) == 0);
|
||||
}
|
||||
CHECK(indexEntryKey(id).compare(mkv.key) == 0);
|
||||
CHECK(EMPTY.compare(mkv.value) == 0);
|
||||
CHECK(mkv.range_results.size() == SPLIT_SIZE);
|
||||
for (int split = 0; split < SPLIT_SIZE; split++) {
|
||||
|
@ -1124,16 +1096,7 @@ TEST_CASE("fdb_transaction_get_mapped_range_missing_all_secondary") {
|
|||
while (1) {
|
||||
int beginId = 1;
|
||||
int endId = 19;
|
||||
const double r = deterministicRandom()->random01();
|
||||
int matchIndex = MATCH_INDEX_ALL;
|
||||
if (r < 0.25) {
|
||||
matchIndex = MATCH_INDEX_NONE;
|
||||
} else if (r < 0.5) {
|
||||
matchIndex = MATCH_INDEX_MATCHED_ONLY;
|
||||
} else if (r < 0.75) {
|
||||
matchIndex = MATCH_INDEX_UNMATCHED_ONLY;
|
||||
}
|
||||
auto result = getMappedIndexEntries(beginId, endId, tr, matchIndex, true);
|
||||
auto result = getMappedIndexEntries(beginId, endId, tr, true);
|
||||
|
||||
if (result.err) {
|
||||
fdb::EmptyFuture f1 = tr.on_error(result.err);
|
||||
|
@ -1148,15 +1111,7 @@ TEST_CASE("fdb_transaction_get_mapped_range_missing_all_secondary") {
|
|||
int id = beginId;
|
||||
for (int i = 0; i < expectSize; i++, id++) {
|
||||
const auto& mkv = result.mkvs[i];
|
||||
if (matchIndex == MATCH_INDEX_ALL || i == 0 || i == expectSize - 1) {
|
||||
CHECK(indexEntryKey(id).compare(mkv.key) == 0);
|
||||
} else if (matchIndex == MATCH_INDEX_MATCHED_ONLY) {
|
||||
CHECK(EMPTY.compare(mkv.key) == 0);
|
||||
} else if (matchIndex == MATCH_INDEX_UNMATCHED_ONLY) {
|
||||
CHECK(indexEntryKey(id).compare(mkv.key) == 0);
|
||||
} else {
|
||||
CHECK(EMPTY.compare(mkv.key) == 0);
|
||||
}
|
||||
CHECK(indexEntryKey(id).compare(mkv.key) == 0);
|
||||
CHECK(EMPTY.compare(mkv.value) == 0);
|
||||
}
|
||||
break;
|
||||
|
@ -1176,7 +1131,6 @@ TEST_CASE("fdb_transaction_get_mapped_range_restricted_to_serializable") {
|
|||
/* target_bytes */ 0,
|
||||
/* FDBStreamingMode */ FDB_STREAMING_MODE_WANT_ALL,
|
||||
/* iteration */ 0,
|
||||
/* matchIndex */ MATCH_INDEX_ALL,
|
||||
/* snapshot */ true, // Set snapshot to true
|
||||
/* reverse */ 0);
|
||||
ASSERT(result.err == error_code_unsupported_operation);
|
||||
|
@ -1196,7 +1150,6 @@ TEST_CASE("fdb_transaction_get_mapped_range_restricted_to_ryw_enable") {
|
|||
/* target_bytes */ 0,
|
||||
/* FDBStreamingMode */ FDB_STREAMING_MODE_WANT_ALL,
|
||||
/* iteration */ 0,
|
||||
/* matchIndex */ MATCH_INDEX_ALL,
|
||||
/* snapshot */ false,
|
||||
/* reverse */ 0);
|
||||
ASSERT(result.err == error_code_unsupported_operation);
|
||||
|
@ -1225,7 +1178,7 @@ TEST_CASE("fdb_transaction_get_mapped_range_fail_on_mapper_not_tuple") {
|
|||
};
|
||||
assertNotTuple(mapper);
|
||||
fdb::Transaction tr(db);
|
||||
auto result = getMappedIndexEntries(1, 3, tr, mapper, MATCH_INDEX_ALL);
|
||||
auto result = getMappedIndexEntries(1, 3, tr, mapper);
|
||||
ASSERT(result.err == error_code_mapper_not_tuple);
|
||||
}
|
||||
|
||||
|
|
|
@ -1614,7 +1614,6 @@ JNIEXPORT jlong JNICALL Java_com_apple_foundationdb_FDBTransaction_Transaction_1
|
|||
jint targetBytes,
|
||||
jint streamingMode,
|
||||
jint iteration,
|
||||
jint matchIndex,
|
||||
jboolean snapshot,
|
||||
jboolean reverse) {
|
||||
if (!tPtr || !keyBeginBytes || !keyEndBytes || !mapperBytes) {
|
||||
|
@ -1662,7 +1661,6 @@ JNIEXPORT jlong JNICALL Java_com_apple_foundationdb_FDBTransaction_Transaction_1
|
|||
targetBytes,
|
||||
(FDBStreamingMode)streamingMode,
|
||||
iteration,
|
||||
matchIndex,
|
||||
snapshot,
|
||||
reverse);
|
||||
jenv->ReleaseByteArrayElements(keyBeginBytes, (jbyte*)barrBegin, JNI_ABORT);
|
||||
|
|
|
@ -192,12 +192,12 @@ class MappedRangeQueryIntegrationTest {
|
|||
|
||||
RangeQueryWithIndex mappedRangeQuery = (int begin, int end, Database db) -> db.run(tr -> {
|
||||
try {
|
||||
List<MappedKeyValue> kvs = tr.getMappedRange(KeySelector.firstGreaterOrEqual(indexEntryKey(begin)),
|
||||
KeySelector.firstGreaterOrEqual(indexEntryKey(end)), MAPPER,
|
||||
ReadTransaction.ROW_LIMIT_UNLIMITED,
|
||||
FDBTransaction.MATCH_INDEX_ALL, false, StreamingMode.WANT_ALL)
|
||||
.asList()
|
||||
.get();
|
||||
List<MappedKeyValue> kvs =
|
||||
tr.getMappedRange(KeySelector.firstGreaterOrEqual(indexEntryKey(begin)),
|
||||
KeySelector.firstGreaterOrEqual(indexEntryKey(end)), MAPPER,
|
||||
ReadTransaction.ROW_LIMIT_UNLIMITED, false, StreamingMode.WANT_ALL)
|
||||
.asList()
|
||||
.get();
|
||||
Assertions.assertEquals(end - begin, kvs.size());
|
||||
|
||||
if (validate) {
|
||||
|
|
|
@ -33,11 +33,6 @@ import com.apple.foundationdb.tuple.ByteArrayUtil;
|
|||
|
||||
class FDBTransaction extends NativeObjectWrapper implements Transaction, OptionConsumer {
|
||||
|
||||
static public final int MATCH_INDEX_ALL = 0;
|
||||
static public final int MATCH_INDEX_NONE = 1;
|
||||
static public final int MATCH_INDEX_MATCHED_ONLY = 2;
|
||||
static public final int MATCH_INDEX_UNMATCHED_ONLY = 3;
|
||||
|
||||
private final Database database;
|
||||
private final Executor executor;
|
||||
private final TransactionOptions options;
|
||||
|
@ -104,8 +99,7 @@ class FDBTransaction extends NativeObjectWrapper implements Transaction, OptionC
|
|||
|
||||
@Override
|
||||
public AsyncIterable<MappedKeyValue> getMappedRange(KeySelector begin, KeySelector end, byte[] mapper,
|
||||
int limit, int matchIndex, boolean reverse,
|
||||
StreamingMode mode) {
|
||||
int limit, boolean reverse, StreamingMode mode) {
|
||||
|
||||
throw new UnsupportedOperationException("getMappedRange is only supported in serializable");
|
||||
}
|
||||
|
@ -369,12 +363,11 @@ class FDBTransaction extends NativeObjectWrapper implements Transaction, OptionC
|
|||
|
||||
@Override
|
||||
public AsyncIterable<MappedKeyValue> getMappedRange(KeySelector begin, KeySelector end, byte[] mapper, int limit,
|
||||
int matchIndex, boolean reverse, StreamingMode mode) {
|
||||
boolean reverse, StreamingMode mode) {
|
||||
if (mapper == null) {
|
||||
throw new IllegalArgumentException("Mapper must be non-null");
|
||||
}
|
||||
return new MappedRangeQuery(FDBTransaction.this, false, begin, end, mapper, limit, matchIndex, reverse, mode,
|
||||
eventKeeper);
|
||||
return new MappedRangeQuery(FDBTransaction.this, false, begin, end, mapper, limit, reverse, mode, eventKeeper);
|
||||
}
|
||||
|
||||
///////////////////
|
||||
|
@ -479,8 +472,7 @@ class FDBTransaction extends NativeObjectWrapper implements Transaction, OptionC
|
|||
protected FutureMappedResults getMappedRange_internal(KeySelector begin, KeySelector end,
|
||||
byte[] mapper, // Nullable
|
||||
int rowLimit, int targetBytes, int streamingMode,
|
||||
int iteration, boolean isSnapshot, boolean reverse,
|
||||
int matchIndex) {
|
||||
int iteration, boolean isSnapshot, boolean reverse) {
|
||||
if (eventKeeper != null) {
|
||||
eventKeeper.increment(Events.JNI_CALL);
|
||||
}
|
||||
|
@ -490,11 +482,11 @@ class FDBTransaction extends NativeObjectWrapper implements Transaction, OptionC
|
|||
" -- range get: (%s, %s) limit: %d, bytes: %d, mode: %d, iteration: %d, snap: %s, reverse %s",
|
||||
begin.toString(), end.toString(), rowLimit, targetBytes, streamingMode,
|
||||
iteration, Boolean.toString(isSnapshot), Boolean.toString(reverse)));*/
|
||||
return new FutureMappedResults(
|
||||
Transaction_getMappedRange(getPtr(), begin.getKey(), begin.orEqual(), begin.getOffset(), end.getKey(),
|
||||
end.orEqual(), end.getOffset(), mapper, rowLimit, targetBytes, streamingMode,
|
||||
iteration, matchIndex, isSnapshot, reverse),
|
||||
FDB.instance().isDirectBufferQueriesEnabled(), executor, eventKeeper);
|
||||
return new FutureMappedResults(Transaction_getMappedRange(getPtr(), begin.getKey(), begin.orEqual(),
|
||||
begin.getOffset(), end.getKey(), end.orEqual(),
|
||||
end.getOffset(), mapper, rowLimit, targetBytes,
|
||||
streamingMode, iteration, isSnapshot, reverse),
|
||||
FDB.instance().isDirectBufferQueriesEnabled(), executor, eventKeeper);
|
||||
} finally {
|
||||
pointerReadLock.unlock();
|
||||
}
|
||||
|
@ -836,7 +828,7 @@ class FDBTransaction extends NativeObjectWrapper implements Transaction, OptionC
|
|||
byte[] keyEnd, boolean orEqualEnd, int offsetEnd,
|
||||
byte[] mapper, // Nonnull
|
||||
int rowLimit, int targetBytes, int streamingMode, int iteration,
|
||||
int matchIndex, boolean isSnapshot, boolean reverse);
|
||||
boolean isSnapshot, boolean reverse);
|
||||
private native void Transaction_addConflictRange(long cPtr,
|
||||
byte[] keyBegin, byte[] keyEnd, int conflictRangeType);
|
||||
private native void Transaction_set(long cPtr, byte[] key, byte[] value);
|
||||
|
|
|
@ -53,21 +53,18 @@ class MappedRangeQuery implements AsyncIterable<MappedKeyValue> {
|
|||
private final byte[] mapper; // Nonnull
|
||||
private final boolean snapshot;
|
||||
private final int rowLimit;
|
||||
private final int matchIndex;
|
||||
private final boolean reverse;
|
||||
private final StreamingMode streamingMode;
|
||||
private final EventKeeper eventKeeper;
|
||||
|
||||
MappedRangeQuery(FDBTransaction transaction, boolean isSnapshot, KeySelector begin, KeySelector end, byte[] mapper,
|
||||
int rowLimit, int matchIndex, boolean reverse, StreamingMode streamingMode,
|
||||
EventKeeper eventKeeper) {
|
||||
int rowLimit, boolean reverse, StreamingMode streamingMode, EventKeeper eventKeeper) {
|
||||
this.tr = transaction;
|
||||
this.begin = begin;
|
||||
this.end = end;
|
||||
this.mapper = mapper;
|
||||
this.snapshot = isSnapshot;
|
||||
this.rowLimit = rowLimit;
|
||||
this.matchIndex = matchIndex;
|
||||
this.reverse = reverse;
|
||||
this.streamingMode = streamingMode;
|
||||
this.eventKeeper = eventKeeper;
|
||||
|
@ -91,14 +88,14 @@ class MappedRangeQuery implements AsyncIterable<MappedKeyValue> {
|
|||
|
||||
FutureMappedResults range =
|
||||
tr.getMappedRange_internal(this.begin, this.end, this.mapper, this.rowLimit, 0,
|
||||
StreamingMode.EXACT.code(), 1, this.snapshot, this.reverse, this.matchIndex);
|
||||
StreamingMode.EXACT.code(), 1, this.snapshot, this.reverse);
|
||||
return range.thenApply(result -> result.get().values).whenComplete((result, e) -> range.close());
|
||||
}
|
||||
|
||||
// If the streaming mode is not EXACT, simply collect the results of an
|
||||
// iteration into a list
|
||||
return AsyncUtil.collect(
|
||||
new MappedRangeQuery(tr, snapshot, begin, end, mapper, rowLimit, matchIndex, reverse, mode, eventKeeper),
|
||||
new MappedRangeQuery(tr, snapshot, begin, end, mapper, rowLimit, reverse, mode, eventKeeper),
|
||||
tr.getExecutor());
|
||||
}
|
||||
|
||||
|
@ -109,7 +106,7 @@ class MappedRangeQuery implements AsyncIterable<MappedKeyValue> {
|
|||
*/
|
||||
@Override
|
||||
public AsyncRangeIterator iterator() {
|
||||
return new AsyncRangeIterator(this.rowLimit, this.matchIndex, this.reverse, this.streamingMode);
|
||||
return new AsyncRangeIterator(this.rowLimit, this.reverse, this.streamingMode);
|
||||
}
|
||||
|
||||
private class AsyncRangeIterator implements AsyncIterator<MappedKeyValue> {
|
||||
|
@ -117,7 +114,6 @@ class MappedRangeQuery implements AsyncIterable<MappedKeyValue> {
|
|||
private final boolean rowsLimited;
|
||||
private final boolean reverse;
|
||||
private final StreamingMode streamingMode;
|
||||
private final int matchIndex;
|
||||
|
||||
// There is the chance for parallelism in the two "chunks" for fetched data
|
||||
private MappedRangeResult chunk = null;
|
||||
|
@ -135,13 +131,12 @@ class MappedRangeQuery implements AsyncIterable<MappedKeyValue> {
|
|||
private CompletableFuture<Boolean> nextFuture;
|
||||
private boolean isCancelled = false;
|
||||
|
||||
private AsyncRangeIterator(int rowLimit, int matchIndex, boolean reverse, StreamingMode streamingMode) {
|
||||
private AsyncRangeIterator(int rowLimit, boolean reverse, StreamingMode streamingMode) {
|
||||
this.begin = MappedRangeQuery.this.begin;
|
||||
this.end = MappedRangeQuery.this.end;
|
||||
this.rowsLimited = rowLimit != 0;
|
||||
this.rowsRemaining = rowLimit;
|
||||
this.reverse = reverse;
|
||||
this.matchIndex = matchIndex;
|
||||
this.streamingMode = streamingMode;
|
||||
|
||||
startNextFetch();
|
||||
|
@ -222,9 +217,8 @@ class MappedRangeQuery implements AsyncIterable<MappedKeyValue> {
|
|||
|
||||
nextFuture = new CompletableFuture<>();
|
||||
final long sTime = System.nanoTime();
|
||||
fetchingChunk =
|
||||
tr.getMappedRange_internal(begin, end, mapper, rowsLimited ? rowsRemaining : 0, 0, streamingMode.code(),
|
||||
++iteration, snapshot, reverse, matchIndex);
|
||||
fetchingChunk = tr.getMappedRange_internal(begin, end, mapper, rowsLimited ? rowsRemaining : 0, 0,
|
||||
streamingMode.code(), ++iteration, snapshot, reverse);
|
||||
|
||||
BiConsumer<MappedRangeResultInfo, Throwable> cons = new FetchComplete(fetchingChunk, nextFuture);
|
||||
if (eventKeeper != null) {
|
||||
|
|
|
@ -440,12 +440,6 @@ public interface ReadTransaction extends ReadTransactionContext {
|
|||
* <i>first</i> keys in the range. Pass {@link #ROW_LIMIT_UNLIMITED} if this query
|
||||
* should not limit the number of results. If {@code reverse} is {@code true} rows
|
||||
* will be limited starting at the end of the range.
|
||||
* @param matchIndex the mode to return index entries based on whether their
|
||||
* corresponding records are present, examples:
|
||||
* {@link FDBTransaction#MATCH_INDEX_ALL}
|
||||
* {@link FDBTransaction#MATCH_INDEX_NONE}
|
||||
* {@link FDBTransaction#MATCH_INDEX_MATCHED_ONLY}
|
||||
* {@link FDBTransaction#MATCH_INDEX_UNMATCHED_ONLY}
|
||||
* @param reverse return results starting at the end of the range in reverse order.
|
||||
* Reading ranges in reverse is supported natively by the database and should
|
||||
* have minimal extra cost.
|
||||
|
@ -466,7 +460,7 @@ public interface ReadTransaction extends ReadTransactionContext {
|
|||
* @return a handle to access the results of the asynchronous call
|
||||
*/
|
||||
AsyncIterable<MappedKeyValue> getMappedRange(KeySelector begin, KeySelector end, byte[] mapper, int limit,
|
||||
int matchIndex, boolean reverse, StreamingMode mode);
|
||||
boolean reverse, StreamingMode mode);
|
||||
|
||||
/**
|
||||
* Gets an estimate for the number of bytes stored in the given range.
|
||||
|
|
|
@ -173,7 +173,6 @@ ThreadFuture<MappedRangeResult> DLTransaction::getMappedRange(const KeySelectorR
|
|||
const KeySelectorRef& end,
|
||||
const StringRef& mapper,
|
||||
GetRangeLimits limits,
|
||||
int matchIndex,
|
||||
bool snapshot,
|
||||
bool reverse) {
|
||||
FdbCApi::FDBFuture* f = api->transactionGetMappedRange(tr,
|
||||
|
@ -191,7 +190,6 @@ ThreadFuture<MappedRangeResult> DLTransaction::getMappedRange(const KeySelectorR
|
|||
limits.bytes,
|
||||
FDB_STREAMING_MODE_EXACT,
|
||||
0,
|
||||
matchIndex,
|
||||
snapshot,
|
||||
reverse);
|
||||
return toThreadFuture<MappedRangeResult>(api, f, [](FdbCApi::FDBFuture* f, FdbCApi* api) {
|
||||
|
@ -1530,7 +1528,6 @@ ThreadFuture<MappedRangeResult> MultiVersionTransaction::getMappedRange(const Ke
|
|||
const KeySelectorRef& end,
|
||||
const StringRef& mapper,
|
||||
GetRangeLimits limits,
|
||||
int matchIndex,
|
||||
bool snapshot,
|
||||
bool reverse) {
|
||||
return executeOperation(&ITransaction::getMappedRange,
|
||||
|
@ -1538,7 +1535,6 @@ ThreadFuture<MappedRangeResult> MultiVersionTransaction::getMappedRange(const Ke
|
|||
end,
|
||||
mapper,
|
||||
std::forward<GetRangeLimits>(limits),
|
||||
std::forward<int>(matchIndex),
|
||||
std::forward<bool>(snapshot),
|
||||
std::forward<bool>(reverse));
|
||||
}
|
||||
|
|
|
@ -4154,23 +4154,11 @@ PublicRequestStream<GetKeyValuesFamilyRequest> StorageServerInterface::*getRange
|
|||
}
|
||||
}
|
||||
|
||||
template <class GetKeyValuesFamilyRequest>
|
||||
void setMatchIndex(GetKeyValuesFamilyRequest& req, int matchIndex) {
|
||||
if constexpr (std::is_same<GetKeyValuesFamilyRequest, GetKeyValuesRequest>::value) {
|
||||
// do nothing;
|
||||
} else if (std::is_same<GetKeyValuesFamilyRequest, GetMappedKeyValuesRequest>::value) {
|
||||
req.matchIndex = matchIndex;
|
||||
} else {
|
||||
UNREACHABLE();
|
||||
}
|
||||
}
|
||||
|
||||
ACTOR template <class GetKeyValuesFamilyRequest, class GetKeyValuesFamilyReply, class RangeResultFamily>
|
||||
Future<RangeResultFamily> getExactRange(Reference<TransactionState> trState,
|
||||
KeyRange keys,
|
||||
Key mapper,
|
||||
GetRangeLimits limits,
|
||||
int matchIndex,
|
||||
Reverse reverse,
|
||||
UseTenant useTenant) {
|
||||
state RangeResultFamily output;
|
||||
|
@ -4205,7 +4193,6 @@ Future<RangeResultFamily> getExactRange(Reference<TransactionState> trState,
|
|||
req.begin = firstGreaterOrEqual(range.begin);
|
||||
req.end = firstGreaterOrEqual(range.end);
|
||||
|
||||
setMatchIndex<GetKeyValuesFamilyRequest>(req, matchIndex);
|
||||
req.spanContext = span.context;
|
||||
trState->cx->getLatestCommitVersions(locations[shard].locations, trState, req.ssLatestCommitVersions);
|
||||
|
||||
|
@ -4376,7 +4363,6 @@ Future<RangeResultFamily> getRangeFallback(Reference<TransactionState> trState,
|
|||
KeySelector end,
|
||||
Key mapper,
|
||||
GetRangeLimits limits,
|
||||
int matchIndex,
|
||||
Reverse reverse,
|
||||
UseTenant useTenant) {
|
||||
Future<Key> fb = resolveKey(trState, begin, useTenant);
|
||||
|
@ -4393,7 +4379,7 @@ Future<RangeResultFamily> getRangeFallback(Reference<TransactionState> trState,
|
|||
// or allKeys.begin exists in the database/tenant and will be part of the conflict range anyways
|
||||
|
||||
RangeResultFamily _r = wait(getExactRange<GetKeyValuesFamilyRequest, GetKeyValuesFamilyReply, RangeResultFamily>(
|
||||
trState, KeyRangeRef(b, e), mapper, limits, matchIndex, reverse, useTenant));
|
||||
trState, KeyRangeRef(b, e), mapper, limits, reverse, useTenant));
|
||||
RangeResultFamily r = _r;
|
||||
|
||||
if (b == allKeys.begin && ((reverse && !r.more) || !reverse))
|
||||
|
@ -4516,7 +4502,6 @@ Future<RangeResultFamily> getRange(Reference<TransactionState> trState,
|
|||
Key mapper,
|
||||
GetRangeLimits limits,
|
||||
Promise<std::pair<Key, Key>> conflictRange,
|
||||
int matchIndex,
|
||||
Snapshot snapshot,
|
||||
Reverse reverse,
|
||||
UseTenant useTenant = UseTenant::True) {
|
||||
|
@ -4562,7 +4547,6 @@ Future<RangeResultFamily> getRange(Reference<TransactionState> trState,
|
|||
state GetKeyValuesFamilyRequest req;
|
||||
req.mapper = mapper;
|
||||
req.arena.dependsOn(mapper.arena());
|
||||
setMatchIndex<GetKeyValuesFamilyRequest>(req, matchIndex);
|
||||
req.tenantInfo = useTenant ? trState->getTenantInfo() : TenantInfo();
|
||||
req.options = trState->readOptions;
|
||||
req.version = trState->readVersion();
|
||||
|
@ -4736,14 +4720,7 @@ Future<RangeResultFamily> getRange(Reference<TransactionState> trState,
|
|||
if (!rep.data.size()) {
|
||||
RangeResultFamily result = wait(
|
||||
getRangeFallback<GetKeyValuesFamilyRequest, GetKeyValuesFamilyReply, RangeResultFamily>(
|
||||
trState,
|
||||
originalBegin,
|
||||
originalEnd,
|
||||
mapper,
|
||||
originalLimits,
|
||||
matchIndex,
|
||||
reverse,
|
||||
useTenant));
|
||||
trState, originalBegin, originalEnd, mapper, originalLimits, reverse, useTenant));
|
||||
getRangeFinished(
|
||||
trState, startTime, originalBegin, originalEnd, snapshot, conflictRange, reverse, result);
|
||||
return result;
|
||||
|
@ -4775,14 +4752,7 @@ Future<RangeResultFamily> getRange(Reference<TransactionState> trState,
|
|||
if (e.code() == error_code_wrong_shard_server) {
|
||||
RangeResultFamily result = wait(
|
||||
getRangeFallback<GetKeyValuesFamilyRequest, GetKeyValuesFamilyReply, RangeResultFamily>(
|
||||
trState,
|
||||
originalBegin,
|
||||
originalEnd,
|
||||
mapper,
|
||||
originalLimits,
|
||||
matchIndex,
|
||||
reverse,
|
||||
useTenant));
|
||||
trState, originalBegin, originalEnd, mapper, originalLimits, reverse, useTenant));
|
||||
getRangeFinished(
|
||||
trState, startTime, originalBegin, originalEnd, snapshot, conflictRange, reverse, result);
|
||||
return result;
|
||||
|
@ -5349,16 +5319,8 @@ Future<RangeResult> getRange(Reference<TransactionState> const& trState,
|
|||
GetRangeLimits const& limits,
|
||||
Reverse const& reverse,
|
||||
UseTenant const& useTenant) {
|
||||
return getRange<GetKeyValuesRequest, GetKeyValuesReply, RangeResult>(trState,
|
||||
begin,
|
||||
end,
|
||||
""_sr,
|
||||
limits,
|
||||
Promise<std::pair<Key, Key>>(),
|
||||
MATCH_INDEX_ALL,
|
||||
Snapshot::True,
|
||||
reverse,
|
||||
useTenant);
|
||||
return getRange<GetKeyValuesRequest, GetKeyValuesReply, RangeResult>(
|
||||
trState, begin, end, ""_sr, limits, Promise<std::pair<Key, Key>>(), Snapshot::True, reverse, useTenant);
|
||||
}
|
||||
|
||||
bool DatabaseContext::debugUseTags = false;
|
||||
|
@ -5723,7 +5685,6 @@ Future<RangeResultFamily> Transaction::getRangeInternal(const KeySelector& begin
|
|||
const KeySelector& end,
|
||||
const Key& mapper,
|
||||
GetRangeLimits limits,
|
||||
int matchIndex,
|
||||
Snapshot snapshot,
|
||||
Reverse reverse) {
|
||||
++trState->cx->transactionLogicalReads;
|
||||
|
@ -5766,7 +5727,7 @@ Future<RangeResultFamily> Transaction::getRangeInternal(const KeySelector& begin
|
|||
}
|
||||
|
||||
return ::getRange<GetKeyValuesFamilyRequest, GetKeyValuesFamilyReply, RangeResultFamily>(
|
||||
trState, b, e, mapper, limits, conflictRange, matchIndex, snapshot, reverse);
|
||||
trState, b, e, mapper, limits, conflictRange, snapshot, reverse);
|
||||
}
|
||||
|
||||
Future<RangeResult> Transaction::getRange(const KeySelector& begin,
|
||||
|
@ -5775,18 +5736,17 @@ Future<RangeResult> Transaction::getRange(const KeySelector& begin,
|
|||
Snapshot snapshot,
|
||||
Reverse reverse) {
|
||||
return getRangeInternal<GetKeyValuesRequest, GetKeyValuesReply, RangeResult>(
|
||||
begin, end, ""_sr, limits, MATCH_INDEX_ALL, snapshot, reverse);
|
||||
begin, end, ""_sr, limits, snapshot, reverse);
|
||||
}
|
||||
|
||||
Future<MappedRangeResult> Transaction::getMappedRange(const KeySelector& begin,
|
||||
const KeySelector& end,
|
||||
const Key& mapper,
|
||||
GetRangeLimits limits,
|
||||
int matchIndex,
|
||||
Snapshot snapshot,
|
||||
Reverse reverse) {
|
||||
return getRangeInternal<GetMappedKeyValuesRequest, GetMappedKeyValuesReply, MappedRangeResult>(
|
||||
begin, end, mapper, limits, matchIndex, snapshot, reverse);
|
||||
begin, end, mapper, limits, snapshot, reverse);
|
||||
}
|
||||
|
||||
Future<RangeResult> Transaction::getRange(const KeySelector& begin,
|
||||
|
|
|
@ -77,12 +77,11 @@ public:
|
|||
|
||||
template <bool reverse>
|
||||
struct GetMappedRangeReq {
|
||||
GetMappedRangeReq(KeySelector begin, KeySelector end, Key mapper, int matchIndex, GetRangeLimits limits)
|
||||
: begin(begin), end(end), mapper(mapper), limits(limits), matchIndex(matchIndex) {}
|
||||
GetMappedRangeReq(KeySelector begin, KeySelector end, Key mapper, GetRangeLimits limits)
|
||||
: begin(begin), end(end), mapper(mapper), limits(limits) {}
|
||||
KeySelector begin, end;
|
||||
Key mapper;
|
||||
GetRangeLimits limits;
|
||||
int matchIndex;
|
||||
using Result = MappedRangeResult;
|
||||
};
|
||||
|
||||
|
@ -1156,13 +1155,8 @@ public:
|
|||
else
|
||||
read.end = KeySelector(firstGreaterOrEqual(key), key.arena());
|
||||
}
|
||||
MappedRangeResult v = wait(ryw->tr.getMappedRange(read.begin,
|
||||
read.end,
|
||||
read.mapper,
|
||||
read.limits,
|
||||
read.matchIndex,
|
||||
snapshot,
|
||||
backwards ? Reverse::True : Reverse::False));
|
||||
MappedRangeResult v = wait(ryw->tr.getMappedRange(
|
||||
read.begin, read.end, read.mapper, read.limits, snapshot, backwards ? Reverse::True : Reverse::False));
|
||||
return v;
|
||||
}
|
||||
|
||||
|
@ -1786,7 +1780,6 @@ Future<MappedRangeResult> ReadYourWritesTransaction::getMappedRange(KeySelector
|
|||
KeySelector end,
|
||||
Key mapper,
|
||||
GetRangeLimits limits,
|
||||
int matchIndex,
|
||||
Snapshot snapshot,
|
||||
Reverse reverse) {
|
||||
if (getDatabase()->apiVersionAtLeast(630)) {
|
||||
|
@ -1834,9 +1827,9 @@ Future<MappedRangeResult> ReadYourWritesTransaction::getMappedRange(KeySelector
|
|||
|
||||
Future<MappedRangeResult> result =
|
||||
reverse ? RYWImpl::readWithConflictRangeForGetMappedRange(
|
||||
this, RYWImpl::GetMappedRangeReq<true>(begin, end, mapper, matchIndex, limits), snapshot)
|
||||
this, RYWImpl::GetMappedRangeReq<true>(begin, end, mapper, limits), snapshot)
|
||||
: RYWImpl::readWithConflictRangeForGetMappedRange(
|
||||
this, RYWImpl::GetMappedRangeReq<false>(begin, end, mapper, matchIndex, limits), snapshot);
|
||||
this, RYWImpl::GetMappedRangeReq<false>(begin, end, mapper, limits), snapshot);
|
||||
|
||||
return result;
|
||||
}
|
||||
|
|
|
@ -498,7 +498,6 @@ ThreadFuture<MappedRangeResult> ThreadSafeTransaction::getMappedRange(const KeyS
|
|||
const KeySelectorRef& end,
|
||||
const StringRef& mapper,
|
||||
GetRangeLimits limits,
|
||||
int matchIndex,
|
||||
bool snapshot,
|
||||
bool reverse) {
|
||||
KeySelector b = begin;
|
||||
|
@ -506,9 +505,9 @@ ThreadFuture<MappedRangeResult> ThreadSafeTransaction::getMappedRange(const KeyS
|
|||
Key h = mapper;
|
||||
|
||||
ISingleThreadTransaction* tr = this->tr;
|
||||
return onMainThread([tr, b, e, h, limits, matchIndex, snapshot, reverse]() -> Future<MappedRangeResult> {
|
||||
return onMainThread([tr, b, e, h, limits, snapshot, reverse]() -> Future<MappedRangeResult> {
|
||||
tr->checkDeferredError();
|
||||
return tr->getMappedRange(b, e, h, limits, matchIndex, Snapshot{ snapshot }, Reverse{ reverse });
|
||||
return tr->getMappedRange(b, e, h, limits, Snapshot{ snapshot }, Reverse{ reverse });
|
||||
});
|
||||
}
|
||||
|
||||
|
|
|
@ -149,11 +149,6 @@ static const Tag invalidTag{ tagLocalitySpecial, 0 };
|
|||
static const Tag txsTag{ tagLocalitySpecial, 1 };
|
||||
static const Tag cacheTag{ tagLocalitySpecial, 2 };
|
||||
|
||||
const int MATCH_INDEX_ALL = 0;
|
||||
const int MATCH_INDEX_NONE = 1;
|
||||
const int MATCH_INDEX_MATCHED_ONLY = 2;
|
||||
const int MATCH_INDEX_UNMATCHED_ONLY = 3;
|
||||
|
||||
enum { txsTagOld = -1, invalidTagOld = -100 };
|
||||
|
||||
struct TagsAndMessage {
|
||||
|
|
|
@ -68,7 +68,6 @@ public:
|
|||
const KeySelectorRef& end,
|
||||
const StringRef& mapper,
|
||||
GetRangeLimits limits,
|
||||
int matchIndex = MATCH_INDEX_ALL,
|
||||
bool snapshot = false,
|
||||
bool reverse = false) = 0;
|
||||
virtual ThreadFuture<Standalone<VectorRef<const char*>>> getAddressesForKey(const KeyRef& key) = 0;
|
||||
|
|
|
@ -74,7 +74,6 @@ public:
|
|||
KeySelector end,
|
||||
Key mapper,
|
||||
GetRangeLimits limits,
|
||||
int matchIndex = MATCH_INDEX_ALL,
|
||||
Snapshot = Snapshot::False,
|
||||
Reverse = Reverse::False) = 0;
|
||||
virtual Future<Standalone<VectorRef<const char*>>> getAddressesForKey(Key const& key) = 0;
|
||||
|
|
|
@ -329,7 +329,6 @@ struct FdbCApi : public ThreadSafeReferenceCounted<FdbCApi> {
|
|||
int targetBytes,
|
||||
FDBStreamingMode mode,
|
||||
int iteration,
|
||||
int matchIndex,
|
||||
fdb_bool_t snapshot,
|
||||
fdb_bool_t reverse);
|
||||
FDBFuture* (*transactionGetVersionstamp)(FDBTransaction* tr);
|
||||
|
@ -492,7 +491,6 @@ public:
|
|||
const KeySelectorRef& end,
|
||||
const StringRef& mapper,
|
||||
GetRangeLimits limits,
|
||||
int matchIndex,
|
||||
bool snapshot,
|
||||
bool reverse) override;
|
||||
ThreadFuture<Standalone<VectorRef<const char*>>> getAddressesForKey(const KeyRef& key) override;
|
||||
|
@ -731,7 +729,6 @@ public:
|
|||
const KeySelectorRef& end,
|
||||
const StringRef& mapper,
|
||||
GetRangeLimits limits,
|
||||
int matchIndex,
|
||||
bool snapshot,
|
||||
bool reverse) override;
|
||||
ThreadFuture<Standalone<VectorRef<const char*>>> getAddressesForKey(const KeyRef& key) override;
|
||||
|
|
|
@ -396,7 +396,6 @@ public:
|
|||
const KeySelector& end,
|
||||
const Key& mapper,
|
||||
GetRangeLimits limits,
|
||||
int matchIndex = MATCH_INDEX_ALL,
|
||||
Snapshot = Snapshot::False,
|
||||
Reverse = Reverse::False);
|
||||
|
||||
|
@ -406,7 +405,6 @@ private:
|
|||
const KeySelector& end,
|
||||
const Key& mapper,
|
||||
GetRangeLimits limits,
|
||||
int matchIndex,
|
||||
Snapshot snapshot,
|
||||
Reverse reverse);
|
||||
|
||||
|
|
|
@ -54,7 +54,6 @@ public:
|
|||
KeySelector end,
|
||||
Key mapper,
|
||||
GetRangeLimits limits,
|
||||
int matchIndex = MATCH_INDEX_ALL,
|
||||
Snapshot = Snapshot::False,
|
||||
Reverse = Reverse::False) override {
|
||||
throw client_invalid_operation();
|
||||
|
|
|
@ -115,7 +115,6 @@ public:
|
|||
KeySelector end,
|
||||
Key mapper,
|
||||
GetRangeLimits limits,
|
||||
int matchIndex,
|
||||
Snapshot = Snapshot::False,
|
||||
Reverse = Reverse::False) override;
|
||||
|
||||
|
|
|
@ -63,7 +63,6 @@ public:
|
|||
KeySelector end,
|
||||
Key mapper,
|
||||
GetRangeLimits limits,
|
||||
int matchIndex,
|
||||
Snapshot = Snapshot::False,
|
||||
Reverse = Reverse::False) override {
|
||||
throw client_invalid_operation();
|
||||
|
|
|
@ -453,7 +453,6 @@ struct GetMappedKeyValuesRequest : TimedRequest {
|
|||
KeyRef mapper;
|
||||
Version version; // or latestVersion
|
||||
int limit, limitBytes;
|
||||
int matchIndex;
|
||||
Optional<TagSet> tags;
|
||||
Optional<ReadOptions> options;
|
||||
ReplyPromise<GetMappedKeyValuesReply> reply;
|
||||
|
@ -480,7 +479,6 @@ struct GetMappedKeyValuesRequest : TimedRequest {
|
|||
tenantInfo,
|
||||
options,
|
||||
ssLatestCommitVersions,
|
||||
matchIndex,
|
||||
arena);
|
||||
}
|
||||
};
|
||||
|
|
|
@ -165,7 +165,6 @@ public:
|
|||
const KeySelectorRef& end,
|
||||
const StringRef& mapper,
|
||||
GetRangeLimits limits,
|
||||
int matchIndex,
|
||||
bool snapshot,
|
||||
bool reverse) override;
|
||||
ThreadFuture<Standalone<VectorRef<const char*>>> getAddressesForKey(const KeyRef& key) override;
|
||||
|
|
|
@ -55,7 +55,6 @@ struct TransactionWrapper : public ReferenceCounted<TransactionWrapper> {
|
|||
KeySelector& end,
|
||||
Key& mapper,
|
||||
GetRangeLimits limits,
|
||||
int matchIndex,
|
||||
Snapshot snapshot,
|
||||
Reverse reverse) = 0;
|
||||
|
||||
|
@ -128,10 +127,9 @@ struct FlowTransactionWrapper : public TransactionWrapper {
|
|||
KeySelector& end,
|
||||
Key& mapper,
|
||||
GetRangeLimits limits,
|
||||
int matchIndex,
|
||||
Snapshot snapshot,
|
||||
Reverse reverse) override {
|
||||
return transaction.getMappedRange(begin, end, mapper, limits, matchIndex, snapshot, reverse);
|
||||
return transaction.getMappedRange(begin, end, mapper, limits, snapshot, reverse);
|
||||
}
|
||||
|
||||
// Gets the key from the database specified by a given key selector
|
||||
|
@ -204,11 +202,9 @@ struct ThreadTransactionWrapper : public TransactionWrapper {
|
|||
KeySelector& end,
|
||||
Key& mapper,
|
||||
GetRangeLimits limits,
|
||||
int matchIndex,
|
||||
Snapshot snapshot,
|
||||
Reverse reverse) override {
|
||||
return unsafeThreadFutureToFuture(
|
||||
transaction->getMappedRange(begin, end, mapper, limits, matchIndex, snapshot, reverse));
|
||||
return unsafeThreadFutureToFuture(transaction->getMappedRange(begin, end, mapper, limits, snapshot, reverse));
|
||||
}
|
||||
|
||||
// Gets the key from the database specified by a given key selector
|
||||
|
|
|
@ -5079,7 +5079,6 @@ ACTOR Future<Void> mapSubquery(StorageServer* data,
|
|||
Version version,
|
||||
GetMappedKeyValuesRequest* pOriginalReq,
|
||||
Arena* pArena,
|
||||
int matchIndex,
|
||||
bool isRangeQuery,
|
||||
KeyValueRef* it,
|
||||
MappedKeyValueRef* kvm,
|
||||
|
@ -5087,11 +5086,8 @@ ACTOR Future<Void> mapSubquery(StorageServer* data,
|
|||
if (isRangeQuery) {
|
||||
// Use the mappedKey as the prefix of the range query.
|
||||
GetRangeReqAndResultRef getRange = wait(quickGetKeyValues(data, mappedKey, version, pArena, pOriginalReq));
|
||||
if ((!getRange.result.empty() && matchIndex == MATCH_INDEX_MATCHED_ONLY) ||
|
||||
(getRange.result.empty() && matchIndex == MATCH_INDEX_UNMATCHED_ONLY) || matchIndex == MATCH_INDEX_ALL) {
|
||||
kvm->key = it->key;
|
||||
kvm->value = it->value;
|
||||
}
|
||||
kvm->key = it->key;
|
||||
kvm->value = it->value;
|
||||
kvm->reqAndResult = getRange;
|
||||
} else {
|
||||
GetValueReqAndResultRef getValue = wait(quickGetValue(data, mappedKey, version, pArena, pOriginalReq));
|
||||
|
@ -5120,7 +5116,6 @@ ACTOR Future<GetMappedKeyValuesReply> mapKeyValues(StorageServer* data,
|
|||
StringRef mapper,
|
||||
// To provide span context, tags, debug ID to underlying lookups.
|
||||
GetMappedKeyValuesRequest* pOriginalReq,
|
||||
int matchIndex,
|
||||
int* remainingLimitBytes) {
|
||||
state GetMappedKeyValuesReply result;
|
||||
result.version = input.version;
|
||||
|
@ -5168,8 +5163,8 @@ ACTOR Future<GetMappedKeyValuesReply> mapKeyValues(StorageServer* data,
|
|||
// std::cout << "key:" << printable(kvm->key) << ", value:" << printable(kvm->value)
|
||||
// << ", mappedKey:" << printable(mappedKey) << std::endl;
|
||||
|
||||
subqueries.push_back(mapSubquery(
|
||||
data, input.version, pOriginalReq, &result.arena, matchIndex, isRangeQuery, it, kvm, mappedKey));
|
||||
subqueries.push_back(
|
||||
mapSubquery(data, input.version, pOriginalReq, &result.arena, isRangeQuery, it, kvm, mappedKey));
|
||||
}
|
||||
wait(waitForAll(subqueries));
|
||||
if (pOriginalReq->options.present() && pOriginalReq->options.get().debugID.present())
|
||||
|
@ -5533,7 +5528,7 @@ ACTOR Future<Void> getMappedKeyValuesQ(StorageServer* data, GetMappedKeyValuesRe
|
|||
try {
|
||||
// Map the scanned range to another list of keys and look up.
|
||||
GetMappedKeyValuesReply _r =
|
||||
wait(mapKeyValues(data, getKeyValuesReply, req.mapper, &req, req.matchIndex, &remainingLimitBytes));
|
||||
wait(mapKeyValues(data, getKeyValuesReply, req.mapper, &req, &remainingLimitBytes));
|
||||
r = _r;
|
||||
} catch (Error& e) {
|
||||
// catch txn_too_old here if prefetch runs for too long, and returns it back to client
|
||||
|
|
|
@ -163,21 +163,11 @@ struct GetMappedRangeWorkload : ApiWorkload {
|
|||
static bool validateRecord(int expectedId,
|
||||
const MappedKeyValueRef* it,
|
||||
GetMappedRangeWorkload* self,
|
||||
int matchIndex,
|
||||
bool isBoundary,
|
||||
bool allMissing) {
|
||||
// std::cout << "validateRecord expectedId " << expectedId << " it->key " << printable(it->key)
|
||||
// << " indexEntryKey(expectedId) " << printable(indexEntryKey(expectedId))
|
||||
// << " matchIndex: " << matchIndex << std::endl;
|
||||
if (matchIndex == MATCH_INDEX_ALL || isBoundary) {
|
||||
ASSERT(it->key == indexEntryKey(expectedId));
|
||||
} else if (matchIndex == MATCH_INDEX_MATCHED_ONLY) {
|
||||
ASSERT(it->key == (allMissing ? EMPTY : indexEntryKey(expectedId)));
|
||||
} else if (matchIndex == MATCH_INDEX_UNMATCHED_ONLY) {
|
||||
ASSERT(it->key == (allMissing ? indexEntryKey(expectedId) : EMPTY));
|
||||
} else {
|
||||
ASSERT(it->key == EMPTY);
|
||||
}
|
||||
// << " indexEntryKey(expectedId) " << printable(indexEntryKey(expectedId)) << std::endl;
|
||||
|
||||
ASSERT(it->key == indexEntryKey(expectedId));
|
||||
ASSERT(it->value == EMPTY);
|
||||
|
||||
if (self->SPLIT_RECORDS) {
|
||||
|
@ -223,7 +213,6 @@ struct GetMappedRangeWorkload : ApiWorkload {
|
|||
int byteLimit,
|
||||
int expectedBeginId,
|
||||
GetMappedRangeWorkload* self,
|
||||
int matchIndex,
|
||||
bool allMissing) {
|
||||
|
||||
std::cout << "start scanMappedRangeWithLimits beginSelector:" << beginSelector.toString()
|
||||
|
@ -238,7 +227,6 @@ struct GetMappedRangeWorkload : ApiWorkload {
|
|||
endSelector,
|
||||
mapper,
|
||||
GetRangeLimits(limit, byteLimit),
|
||||
matchIndex,
|
||||
self->snapshot,
|
||||
Reverse::False));
|
||||
// showResult(result);
|
||||
|
@ -253,8 +241,7 @@ struct GetMappedRangeWorkload : ApiWorkload {
|
|||
int cnt = 0;
|
||||
const MappedKeyValueRef* it = result.begin();
|
||||
for (; cnt < result.size(); cnt++, it++) {
|
||||
if (validateRecord(
|
||||
expectedId, it, self, matchIndex, cnt == 0 || cnt == result.size() - 1, allMissing)) {
|
||||
if (validateRecord(expectedId, it, self, allMissing)) {
|
||||
needRetry = true;
|
||||
break;
|
||||
}
|
||||
|
@ -301,7 +288,7 @@ struct GetMappedRangeWorkload : ApiWorkload {
|
|||
state int byteLimit = 10000;
|
||||
while (true) {
|
||||
MappedRangeResult result = wait(self->scanMappedRangeWithLimits(
|
||||
cx, beginSelector, endSelector, mapper, limit, byteLimit, beginId, self, MATCH_INDEX_ALL, false));
|
||||
cx, beginSelector, endSelector, mapper, limit, byteLimit, beginId, self, false));
|
||||
if (result.empty()) {
|
||||
TraceEvent("EmptyResult");
|
||||
}
|
||||
|
@ -315,7 +302,6 @@ struct GetMappedRangeWorkload : ApiWorkload {
|
|||
int endId,
|
||||
Key mapper,
|
||||
GetMappedRangeWorkload* self,
|
||||
int matchIndex,
|
||||
bool allMissing = false) {
|
||||
Key beginTuple = Tuple::makeTuple(prefix, INDEX, indexKey(beginId)).getDataAsStandalone();
|
||||
state KeySelector beginSelector = KeySelector(firstGreaterOrEqual(beginTuple));
|
||||
|
@ -328,16 +314,8 @@ struct GetMappedRangeWorkload : ApiWorkload {
|
|||
<< " FRACTION_INDEX_BYTELIMIT_PREFETCH: " << SERVER_KNOBS->FRACTION_INDEX_BYTELIMIT_PREFETCH
|
||||
<< " MAX_PARALLEL_QUICK_GET_VALUE: " << SERVER_KNOBS->MAX_PARALLEL_QUICK_GET_VALUE << std::endl;
|
||||
while (true) {
|
||||
MappedRangeResult result = wait(self->scanMappedRangeWithLimits(cx,
|
||||
beginSelector,
|
||||
endSelector,
|
||||
mapper,
|
||||
limit,
|
||||
byteLimit,
|
||||
expectedBeginId,
|
||||
self,
|
||||
matchIndex,
|
||||
allMissing));
|
||||
MappedRangeResult result = wait(self->scanMappedRangeWithLimits(
|
||||
cx, beginSelector, endSelector, mapper, limit, byteLimit, expectedBeginId, self, allMissing));
|
||||
expectedBeginId += result.size();
|
||||
if (result.more) {
|
||||
if (result.empty()) {
|
||||
|
@ -410,7 +388,6 @@ struct GetMappedRangeWorkload : ApiWorkload {
|
|||
endSelector,
|
||||
mapper,
|
||||
GetRangeLimits(GetRangeLimits::ROW_LIMIT_UNLIMITED),
|
||||
MATCH_INDEX_ALL,
|
||||
self->snapshot,
|
||||
Reverse::False);
|
||||
}
|
||||
|
@ -570,23 +547,9 @@ struct GetMappedRangeWorkload : ApiWorkload {
|
|||
state Key mapper = getMapper(self, false);
|
||||
// The scanned range cannot be too large to hit get_mapped_key_values_has_more. We have a unit validating the
|
||||
// error is thrown when the range is large.
|
||||
const double r = deterministicRandom()->random01();
|
||||
int matchIndex = MATCH_INDEX_ALL;
|
||||
if (r < 0.25) {
|
||||
matchIndex = MATCH_INDEX_NONE;
|
||||
} else if (r < 0.5) {
|
||||
matchIndex = MATCH_INDEX_MATCHED_ONLY;
|
||||
} else if (r < 0.75) {
|
||||
matchIndex = MATCH_INDEX_UNMATCHED_ONLY;
|
||||
}
|
||||
state bool originalStrictlyEnforeByteLimit = SERVER_KNOBS->STRICTLY_ENFORCE_BYTE_LIMIT;
|
||||
(const_cast<ServerKnobs*> SERVER_KNOBS)->STRICTLY_ENFORCE_BYTE_LIMIT = deterministicRandom()->coinflip();
|
||||
wait(self->scanMappedRange(cx, 10, 490, mapper, self, matchIndex));
|
||||
|
||||
{
|
||||
Key mapperMissing = getMapper(self, true);
|
||||
wait(self->scanMappedRange(cx, 10, 490, mapperMissing, self, MATCH_INDEX_UNMATCHED_ONLY, true));
|
||||
}
|
||||
wait(self->scanMappedRange(cx, 10, 490, mapper, self));
|
||||
wait(testMetric(cx, self, 10, 490, mapper, self->checkStorageQueueSeconds));
|
||||
|
||||
// reset it to default
|
||||
|
|
Loading…
Reference in New Issue