Return error when getRangeAndFlatMap has more & Improve simulation tests (#6029)

This commit is contained in:
Tao Lin 2021-12-03 12:50:07 -08:00 committed by GitHub
parent c5562852a5
commit 9b0a9c4503
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 166 additions and 98 deletions

View File

@ -895,35 +895,48 @@ static std::string recordValue(const int i) {
return Tuple().append(dataOfRecord(i)).pack().toString();
}
TEST_CASE("fdb_transaction_get_range_and_flat_map") {
std::map<std::string, std::string> fillInRecords(int n) {
// Note: The user requested `prefix` should be added as the first element of the tuple that forms the key, rather
// than the prefix of the key. So we don't use key() or create_data() in this test.
std::map<std::string, std::string> data;
for (int i = 0; i < 3; i++) {
for (int i = 0; i < n; i++) {
data[indexEntryKey(i)] = EMPTY;
data[recordKey(i)] = recordValue(i);
}
insert_data(db, data);
return data;
}
GetRangeResult getIndexEntriesAndMap(int beginId, int endId, fdb::Transaction& tr) {
std::string indexEntryKeyBegin = indexEntryKey(beginId);
std::string indexEntryKeyEnd = indexEntryKey(endId);
std::string mapper = Tuple().append(prefix).append(RECORD).append("{K[3]}"_sr).pack().toString();
return get_range_and_flat_map(
tr,
FDB_KEYSEL_FIRST_GREATER_OR_EQUAL((const uint8_t*)indexEntryKeyBegin.c_str(), indexEntryKeyBegin.size()),
FDB_KEYSEL_FIRST_GREATER_OR_EQUAL((const uint8_t*)indexEntryKeyEnd.c_str(), indexEntryKeyEnd.size()),
(const uint8_t*)mapper.c_str(),
mapper.size(),
/* limit */ 0,
/* target_bytes */ 0,
/* FDBStreamingMode */ FDB_STREAMING_MODE_WANT_ALL,
/* iteration */ 0,
/* snapshot */ true,
/* reverse */ 0);
}
TEST_CASE("fdb_transaction_get_range_and_flat_map") {
fillInRecords(20);
fdb::Transaction tr(db);
// get_range_and_flat_map is only support without RYW. This is a must!!!
fdb_check(tr.set_option(FDB_TR_OPTION_READ_YOUR_WRITES_DISABLE, nullptr, 0));
while (1) {
auto result = get_range_and_flat_map(
tr,
// [0, 1]
FDB_KEYSEL_FIRST_GREATER_OR_EQUAL((const uint8_t*)indexEntryKey(0).c_str(), indexEntryKey(0).size()),
FDB_KEYSEL_FIRST_GREATER_THAN((const uint8_t*)indexEntryKey(1).c_str(), indexEntryKey(1).size()),
(const uint8_t*)mapper.c_str(),
mapper.size(),
/* limit */ 0,
/* target_bytes */ 0,
/* FDBStreamingMode */ FDB_STREAMING_MODE_WANT_ALL,
/* iteration */ 0,
/* snapshot */ true,
/* reverse */ 0);
int beginId = 1;
int endId = 19;
auto result = getIndexEntriesAndMap(beginId, endId, tr);
if (result.err) {
fdb::EmptyFuture f1 = tr.on_error(result.err);
@ -931,28 +944,28 @@ TEST_CASE("fdb_transaction_get_range_and_flat_map") {
continue;
}
// Only the first 2 records are supposed to be returned.
if (result.kvs.size() < 2) {
CHECK(result.more);
// Retry.
continue;
}
CHECK(result.kvs.size() == 2);
int expectSize = endId - beginId;
CHECK(result.kvs.size() == expectSize);
CHECK(!result.more);
for (int i = 0; i < 2; i++) {
int id = beginId;
for (int i = 0; i < result.kvs.size(); i++, id++) {
const auto& [key, value] = result.kvs[i];
std::cout << "result[" << i << "]: key=" << key << ", value=" << value << std::endl;
// OUTPUT:
// result[0]: key=fdbRECORDprimary-key-of-record-00000000, value=data-of-record-00000000
// result[1]: key=fdbRECORDprimary-key-of-record-00000001, value=data-of-record-00000001
CHECK(recordKey(i).compare(key) == 0);
CHECK(recordValue(i).compare(value) == 0);
CHECK(recordKey(id).compare(key) == 0);
CHECK(recordValue(id).compare(value) == 0);
}
break;
}
}
TEST_CASE("fdb_transaction_get_range_and_flat_map get_key_values_and_map_has_more") {
fillInRecords(2000);
fdb::Transaction tr(db);
fdb_check(tr.set_option(FDB_TR_OPTION_READ_YOUR_WRITES_DISABLE, nullptr, 0));
auto result = getIndexEntriesAndMap(100, 1900, tr);
CHECK(result.err == error_code_get_key_values_and_map_has_more);
}
TEST_CASE("fdb_transaction_get_range_and_flat_map_restricted_to_snapshot") {
std::string mapper = Tuple().append(prefix).append(RECORD).append("{K[3]}"_sr).pack().toString();
fdb::Transaction tr(db);

View File

@ -649,8 +649,8 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( MAX_STORAGE_COMMIT_TIME, 120.0 ); //The max fsync stall time on the storage server and tlog before marking a disk as failed
init( RANGESTREAM_LIMIT_BYTES, 2e6 ); if( randomize && BUGGIFY ) RANGESTREAM_LIMIT_BYTES = 1;
init( ENABLE_CLEAR_RANGE_EAGER_READS, true );
init( QUICK_GET_VALUE_FALLBACK, true );
init( QUICK_GET_KEY_VALUES_FALLBACK, true );
init( QUICK_GET_VALUE_FALLBACK, false );
init( QUICK_GET_KEY_VALUES_FALLBACK, false );
//Wait Failure
init( MAX_OUTSTANDING_WAIT_FAILURE_REQUESTS, 250 ); if( randomize && BUGGIFY ) MAX_OUTSTANDING_WAIT_FAILURE_REQUESTS = 2;

View File

@ -210,7 +210,7 @@ set(FDBSERVER_SRCS
workloads/MemoryLifetime.actor.cpp
workloads/MetricLogging.actor.cpp
workloads/MutationLogReaderCorrectness.actor.cpp
workloads/IndexPrefetchDemo.actor.cpp
workloads/GetRangeAndMap.actor.cpp
workloads/ParallelRestore.actor.cpp
workloads/Performance.actor.cpp
workloads/Ping.actor.cpp

View File

@ -92,6 +92,7 @@ bool canReplyWith(Error e) {
case error_code_quick_get_key_values_has_more:
case error_code_quick_get_value_miss:
case error_code_quick_get_key_values_miss:
case error_code_get_key_values_and_map_has_more:
// case error_code_all_alternatives_failed:
return true;
default:
@ -2072,21 +2073,30 @@ void merge(Arena& arena,
}
}
ACTOR Future<Optional<Value>> quickGetValue(StorageServer* data, StringRef key, Version version) {
ACTOR Future<Optional<Value>> quickGetValue(StorageServer* data,
StringRef key,
Version version,
// To provide span context, tags, debug ID to underlying lookups.
GetKeyValuesAndFlatMapRequest* pOriginalReq) {
if (data->shards[key]->isReadable()) {
try {
// TODO: Use a lower level API may be better? Or tweak priorities?
GetValueRequest req(Span().context, key, version, Optional<TagSet>(), Optional<UID>());
data->actors.add(data->readGuard(req, getValueQ));
GetValueRequest req(pOriginalReq->spanContext, key, version, pOriginalReq->tags, pOriginalReq->debugID);
// Note that it does not use readGuard to avoid server being overloaded here. Throttling is enforced at the
// original request level, rather than individual underlying lookups. The reason is that throttle any
// individual underlying lookup will fail the original request, which is not productive.
data->actors.add(getValueQ(data, req));
GetValueReply reply = wait(req.reply.getFuture());
++data->counters.quickGetValueHit;
return reply.value;
if (!reply.error.present()) {
++data->counters.quickGetValueHit;
return reply.value;
}
// Otherwise fallback.
} catch (Error& e) {
// Fallback.
}
} else {
// Fallback.
}
// Otherwise fallback.
++data->counters.quickGetValueMiss;
if (SERVER_KNOBS->QUICK_GET_VALUE_FALLBACK) {
@ -2588,22 +2598,33 @@ ACTOR Future<Void> getKeyValuesQ(StorageServer* data, GetKeyValuesRequest req)
return Void();
}
ACTOR Future<RangeResult> quickGetKeyValues(StorageServer* data, StringRef prefix, Version version) {
ACTOR Future<RangeResult> quickGetKeyValues(StorageServer* data,
StringRef prefix,
Version version,
// To provide span context, tags, debug ID to underlying lookups.
GetKeyValuesAndFlatMapRequest* pOriginalReq) {
try {
// TODO: Use a lower level API may be better? Or tweak priorities?
GetKeyValuesRequest req;
req.spanContext = Span().context;
req.spanContext = pOriginalReq->spanContext;
req.arena = Arena();
req.begin = firstGreaterOrEqual(KeyRef(req.arena, prefix));
req.end = firstGreaterOrEqual(strinc(prefix, req.arena));
req.version = version;
req.tags = pOriginalReq->tags;
req.debugID = pOriginalReq->debugID;
data->actors.add(data->readGuard(req, getKeyValuesQ));
// Note that it does not use readGuard to avoid server being overloaded here. Throttling is enforced at the
// original request level, rather than individual underlying lookups. The reason is that throttle any individual
// underlying lookup will fail the original request, which is not productive.
data->actors.add(getKeyValuesQ(data, req));
GetKeyValuesReply reply = wait(req.reply.getFuture());
++data->counters.quickGetKeyValuesHit;
// Convert GetKeyValuesReply to RangeResult.
return RangeResult(RangeResultRef(reply.data, reply.more), reply.arena);
if (!reply.error.present()) {
++data->counters.quickGetKeyValuesHit;
// Convert GetKeyValuesReply to RangeResult.
return RangeResult(RangeResultRef(reply.data, reply.more), reply.arena);
}
// Otherwise fallback.
} catch (Error& e) {
// Fallback.
}
@ -2801,9 +2822,16 @@ TEST_CASE("/fdbserver/storageserver/constructMappedKey") {
return Void();
}
ACTOR Future<GetKeyValuesAndFlatMapReply> flatMap(StorageServer* data, GetKeyValuesReply input, StringRef mapper) {
ACTOR Future<GetKeyValuesAndFlatMapReply> flatMap(StorageServer* data,
GetKeyValuesReply input,
StringRef mapper,
// To provide span context, tags, debug ID to underlying lookups.
GetKeyValuesAndFlatMapRequest* pOriginalReq) {
state GetKeyValuesAndFlatMapReply result;
result.version = input.version;
if (input.more) {
throw get_key_values_and_map_has_more();
}
result.more = input.more;
result.cached = input.cached;
result.arena.dependsOn(input.arena);
@ -2821,7 +2849,7 @@ ACTOR Future<GetKeyValuesAndFlatMapReply> flatMap(StorageServer* data, GetKeyVal
if (isRangeQuery) {
// Use the mappedKey as the prefix of the range query.
RangeResult rangeResult = wait(quickGetKeyValues(data, mappedKey, input.version));
RangeResult rangeResult = wait(quickGetKeyValues(data, mappedKey, input.version, pOriginalReq));
if (rangeResult.more) {
// Probably the fan out is too large. The user should use the old way to query.
@ -2832,7 +2860,7 @@ ACTOR Future<GetKeyValuesAndFlatMapReply> flatMap(StorageServer* data, GetKeyVal
result.data.emplace_back(result.arena, rangeResult[i].key, rangeResult[i].value);
}
} else {
Optional<Value> valueOption = wait(quickGetValue(data, mappedKey, input.version));
Optional<Value> valueOption = wait(quickGetValue(data, mappedKey, input.version, pOriginalReq));
if (valueOption.present()) {
Value value = valueOption.get();
@ -2951,7 +2979,7 @@ ACTOR Future<Void> getKeyValuesAndFlatMapQ(StorageServer* data, GetKeyValuesAndF
state GetKeyValuesAndFlatMapReply r;
try {
// Map the scanned range to another list of keys and look up.
GetKeyValuesAndFlatMapReply _r = wait(flatMap(data, getKeyValuesReply, req.mapper));
GetKeyValuesAndFlatMapReply _r = wait(flatMap(data, getKeyValuesReply, req.mapper, &req));
r = _r;
} catch (Error& e) {
TraceEvent("FlatMapError").error(e);

View File

@ -1,5 +1,5 @@
/*
* IndexPrefetchDemo.actor.cpp
* GetRangeAndMap.actor.cpp
*
* This source file is part of the FoundationDB open source project
*
@ -24,24 +24,26 @@
#include "fdbclient/MutationLogReader.actor.h"
#include "fdbclient/Tuple.h"
#include "fdbserver/workloads/workloads.actor.h"
#include "fdbserver/Knobs.h"
#include "flow/Error.h"
#include "flow/IRandom.h"
#include "flow/flow.h"
#include "flow/actorcompiler.h" // This must be the last #include.
const Value EMPTY = Tuple().pack();
const KeyRef prefix = "prefix"_sr;
const KeyRef RECORD = "RECORD"_sr;
const KeyRef INDEX = "INDEX"_sr;
struct IndexPrefetchDemoWorkload : TestWorkload {
struct GetRangeAndMapWorkload : TestWorkload {
bool enabled;
const bool BAD_MAPPER = deterministicRandom()->random01() < 0.1;
IndexPrefetchDemoWorkload(WorkloadContext const& wcx) : TestWorkload(wcx) {
GetRangeAndMapWorkload(WorkloadContext const& wcx) : TestWorkload(wcx) {
enabled = !clientId; // only do this on the "first" client
}
std::string description() const override { return "IndexPrefetchDemo"; }
std::string description() const override { return "GetRangeAndMap"; }
Future<Void> start(Database const& cx) override {
if (enabled) {
@ -50,35 +52,41 @@ struct IndexPrefetchDemoWorkload : TestWorkload {
return Void();
}
static Key primaryKey(int i) { return KeyRef("primary-key-of-record-" + std::to_string(i)); }
static Key indexKey(int i) { return KeyRef("index-key-of-record-" + std::to_string(i)); }
static Key data(int i) { return KeyRef("data-of-record-" + std::to_string(i)); }
static Key primaryKey(int i) { return Key(format("primary-key-of-record-%08d", i)); }
static Key indexKey(int i) { return Key(format("index-key-of-record-%08d", i)); }
static Value dataOfRecord(int i) { return Key(format("data-of-record-%08d", i)); }
static Key indexEntryKey(int i) {
return Tuple().append(prefix).append(INDEX).append(indexKey(i)).append(primaryKey(i)).pack();
}
static Key recordKey(int i) { return Tuple().append(prefix).append(RECORD).append(primaryKey(i)).pack(); }
static Value recordValue(int i) { return Tuple().append(dataOfRecord(i)).pack(); }
ACTOR Future<Void> fillInRecords(Database cx, int n) {
std::cout << "start fillInRecords n=" << n << std::endl;
// TODO: When n is large, split into multiple transactions.
state Transaction tr(cx);
try {
tr.reset();
for (int i = 0; i < n; i++) {
tr.set(Tuple().append(prefix).append(RECORD).append(primaryKey(i)).pack(),
Tuple().append(data(i)).pack());
tr.set(Tuple().append(prefix).append(INDEX).append(indexKey(i)).append(primaryKey(i)).pack(),
Tuple().pack());
loop {
std::cout << "start fillInRecords n=" << n << std::endl;
// TODO: When n is large, split into multiple transactions.
state Transaction tr(cx);
try {
tr.reset();
for (int i = 0; i < n; i++) {
tr.set(recordKey(i), recordValue(i));
tr.set(indexEntryKey(i), EMPTY);
}
wait(tr.commit());
std::cout << "finished fillInRecords with version " << tr.getCommittedVersion() << std::endl;
break;
} catch (Error& e) {
std::cout << "failed fillInRecords, retry" << std::endl;
wait(tr.onError(e));
}
wait(tr.commit());
std::cout << "finished fillInRecords" << std::endl;
} catch (Error& e) {
std::cout << "failed fillInRecords" << std::endl;
wait(tr.onError(e));
}
return Void();
}
static void showResult(const RangeResult& result) {
std::cout << "result size: " << result.size() << std::endl;
const KeyValueRef* it = result.begin();
for (; it != result.end(); it++) {
for (const KeyValueRef* it = result.begin(); it != result.end(); it++) {
std::cout << "key=" << it->key.printable() << ", value=" << it->value.printable() << std::endl;
}
}
@ -90,7 +98,7 @@ struct IndexPrefetchDemoWorkload : TestWorkload {
try {
tr.reset();
RangeResult result = wait(tr.getRange(range, CLIENT_KNOBS->TOO_MANY));
showResult(result);
// showResult(result);
} catch (Error& e) {
wait(tr.onError(e));
}
@ -98,7 +106,15 @@ struct IndexPrefetchDemoWorkload : TestWorkload {
return Void();
}
ACTOR Future<Void> scanRangeAndFlatMap(Database cx, KeyRange range, Key mapper, IndexPrefetchDemoWorkload* self) {
ACTOR Future<Void> scanRangeAndFlatMap(Database cx,
int beginId,
int endId,
Key mapper,
GetRangeAndMapWorkload* self) {
Key someIndexesBegin = Tuple().append(prefix).append(INDEX).append(indexKey(beginId)).getDataAsStandalone();
Key someIndexesEnd = Tuple().append(prefix).append(INDEX).append(indexKey(endId)).getDataAsStandalone();
state KeyRange range = KeyRangeRef(someIndexesBegin, someIndexesEnd);
std::cout << "start scanRangeAndFlatMap " << range.toString() << std::endl;
// TODO: When n is large, split into multiple transactions.
state Transaction tr(cx);
@ -110,16 +126,27 @@ struct IndexPrefetchDemoWorkload : TestWorkload {
mapper,
GetRangeLimits(CLIENT_KNOBS->TOO_MANY),
Snapshot::True));
showResult(result);
// showResult(result);
if (self->BAD_MAPPER) {
TraceEvent("IndexPrefetchDemoWorkloadShouldNotReachable").detail("ResultSize", result.size());
TraceEvent("GetRangeAndMapWorkloadShouldNotReachable").detail("ResultSize", result.size());
}
// result size: 2
// Examples:
// key=\x01prefix\x00\x01RECORD\x00\x01primary-key-of-record-2\x00, value=\x01data-of-record-2\x00
// key=\x01prefix\x00\x01RECORD\x00\x01primary-key-of-record-3\x00, value=\x01data-of-record-3\x00
std::cout << "result.size()=" << result.size() << std::endl;
std::cout << "result.more=" << result.more << std::endl;
ASSERT(result.size() == endId - beginId);
int id = beginId;
for (const KeyValueRef* it = result.begin(); it != result.end(); it++) {
ASSERT(it->key == recordKey(id));
ASSERT(it->value == recordValue(id));
id++;
}
} catch (Error& e) {
if (self->BAD_MAPPER && e.code() == error_code_mapper_bad_index) {
TraceEvent("IndexPrefetchDemoWorkloadBadMapperDetected").error(e);
if ((self->BAD_MAPPER && e.code() == error_code_mapper_bad_index) ||
(!SERVER_KNOBS->QUICK_GET_VALUE_FALLBACK && e.code() == error_code_quick_get_value_miss) ||
(!SERVER_KNOBS->QUICK_GET_KEY_VALUES_FALLBACK && e.code() == error_code_quick_get_key_values_miss)) {
TraceEvent("GetRangeAndMapWorkloadExpectedErrorDetected").error(e);
} else {
wait(tr.onError(e));
}
@ -128,18 +155,15 @@ struct IndexPrefetchDemoWorkload : TestWorkload {
return Void();
}
ACTOR Future<Void> _start(Database cx, IndexPrefetchDemoWorkload* self) {
TraceEvent("IndexPrefetchDemoWorkloadConfig").detail("BadMapper", self->BAD_MAPPER);
ACTOR Future<Void> _start(Database cx, GetRangeAndMapWorkload* self) {
TraceEvent("GetRangeAndMapWorkloadConfig").detail("BadMapper", self->BAD_MAPPER);
// TODO: Use toml to config
wait(self->fillInRecords(cx, 5));
wait(self->fillInRecords(cx, 200));
wait(self->scanRange(cx, normalKeys));
Key someIndexesBegin = Tuple().append(prefix).append(INDEX).append(indexKey(2)).getDataAsStandalone();
Key someIndexesEnd = Tuple().append(prefix).append(INDEX).append(indexKey(4)).getDataAsStandalone();
state KeyRange someIndexes = KeyRangeRef(someIndexesBegin, someIndexesEnd);
wait(self->scanRange(cx, someIndexes));
// wait(self->scanRange(cx, someIndexes));
Tuple mapperTuple;
if (self->BAD_MAPPER) {
@ -148,7 +172,9 @@ struct IndexPrefetchDemoWorkload : TestWorkload {
mapperTuple << prefix << RECORD << "{K[3]}"_sr;
}
Key mapper = mapperTuple.getDataAsStandalone();
wait(self->scanRangeAndFlatMap(cx, someIndexes, mapper, self));
// The scanned range cannot be too large to hit get_key_values_and_map_has_more. We have a unit validating the
// error is thrown when the range is large.
wait(self->scanRangeAndFlatMap(cx, 10, 190, mapper, self));
return Void();
}
@ -157,4 +183,4 @@ struct IndexPrefetchDemoWorkload : TestWorkload {
void getMetrics(std::vector<PerfMetric>& m) override {}
};
WorkloadFactory<IndexPrefetchDemoWorkload> IndexPrefetchDemoWorkloadFactory("IndexPrefetchDemo");
WorkloadFactory<GetRangeAndMapWorkload> GetRangeAndMapWorkloadFactory("GetRangeAndMap");

View File

@ -170,6 +170,7 @@ ERROR( quick_get_value_miss, 2034, "Found a mapped key that is not served in the
ERROR( quick_get_key_values_miss, 2035, "Found a mapped range that is not served in the same SS" )
ERROR( blob_granule_no_ryw, 2036, "Blob Granule Read Transactions must be specified as ryw-disabled" )
ERROR( blob_granule_not_materialized, 2037, "Blob Granule Read Transactions must be specified as ryw-disabled" )
ERROR( get_key_values_and_map_has_more, 2038, "getRangeAndFlatMap does not support continuation for now" )
ERROR( incompatible_protocol_version, 2100, "Incompatible protocol version" )
ERROR( transaction_too_large, 2101, "Transaction exceeds byte limit" )

View File

@ -152,7 +152,7 @@ if(WITH_PYTHON)
add_fdb_test(TEST_FILES fast/MemoryLifetime.toml)
add_fdb_test(TEST_FILES fast/MoveKeysCycle.toml)
add_fdb_test(TEST_FILES fast/MutationLogReaderCorrectness.toml)
add_fdb_test(TEST_FILES fast/IndexPrefetchDemo.toml)
add_fdb_test(TEST_FILES fast/GetRangeAndMap.toml)
add_fdb_test(TEST_FILES fast/ProtocolVersion.toml)
add_fdb_test(TEST_FILES fast/RandomSelector.toml)
add_fdb_test(TEST_FILES fast/RandomUnitTests.toml)

View File

@ -0,0 +1,6 @@
[[test]]
testTitle = 'GetRangeAndMap'
useDB = true
[[test.workload]]
testName = 'GetRangeAndMap'

View File

@ -1,6 +0,0 @@
[[test]]
testTitle = 'IndexPrefetchDemo'
useDB = true
[[test.workload]]
testName = 'IndexPrefetchDemo'