Move get_range retry logic to caller

This commit is contained in:
Lukas Joswiak 2020-10-09 15:55:25 -07:00
parent 2d8576c2df
commit 27c84550f1
1 changed files with 106 additions and 87 deletions

View File

@ -166,10 +166,13 @@ struct GetRangeResult {
std::vector<std::pair<std::string, std::string>> kvs;
// True if values remain in the key range requested.
bool more;
// Set to a non-zero value if an error occurred during the transaction.
fdb_error_t err;
};
// Helper function to get a range of kv pairs. Returns a GetRangeResult struct
// containing the results of the range read.
// containing the results of the range read. Caller is responsible for checking
// error on failure and retrying if necessary.
GetRangeResult
get_range(fdb::Transaction& tr, const uint8_t* begin_key_name,
int begin_key_name_length, fdb_bool_t begin_or_equal,
@ -177,32 +180,28 @@ get_range(fdb::Transaction& tr, const uint8_t* begin_key_name,
int end_key_name_length, fdb_bool_t end_or_equal, int end_offset,
int limit, int target_bytes, FDBStreamingMode mode,
int iteration, fdb_bool_t snapshot, fdb_bool_t reverse) {
while (1) {
fdb::KeyValueArrayFuture f1 = tr.get_range(
begin_key_name, begin_key_name_length, begin_or_equal, begin_offset,
end_key_name, end_key_name_length, end_or_equal, end_offset, limit,
target_bytes, mode, iteration, snapshot, reverse);
fdb::KeyValueArrayFuture f1 = tr.get_range(
begin_key_name, begin_key_name_length, begin_or_equal, begin_offset,
end_key_name, end_key_name_length, end_or_equal, end_offset, limit,
target_bytes, mode, iteration, snapshot, reverse);
fdb_error_t err = wait_future(f1);
if (err) {
fdb::EmptyFuture f2 = tr.on_error(err);
fdb_check(wait_future(f2));
continue;
}
const FDBKeyValue *out_kv;
int out_count;
fdb_bool_t out_more;
fdb_check(f1.get(&out_kv, &out_count, &out_more));
std::vector<std::pair<std::string, std::string>> results;
for (int i = 0; i < out_count; ++i) {
std::string key((const char *)out_kv[i].key, out_kv[i].key_length);
std::string value((const char *)out_kv[i].value, out_kv[i].value_length);
results.push_back(std::make_pair(key, value));
}
return GetRangeResult{results, out_more != 0};
fdb_error_t err = wait_future(f1);
if (err) {
return GetRangeResult{{}, false, err};
}
const FDBKeyValue *out_kv;
int out_count;
fdb_bool_t out_more;
fdb_check(f1.get(&out_kv, &out_count, &out_more));
std::vector<std::pair<std::string, std::string>> results;
for (int i = 0; i < out_count; ++i) {
std::string key((const char *)out_kv[i].key, out_kv[i].key_length);
std::string value((const char *)out_kv[i].value, out_kv[i].value_length);
results.push_back(std::make_pair(key, value));
}
return GetRangeResult{results, out_more != 0, 0};
}
// Clears all data in the database.
@ -791,65 +790,76 @@ TEST_CASE("fdb_transaction_get_range reverse") {
insert_data(db, data);
fdb::Transaction tr(db);
auto result = get_range(
tr, FDB_KEYSEL_FIRST_GREATER_OR_EQUAL(KEY("a"), KEYSIZE("a")),
FDB_KEYSEL_LAST_LESS_OR_EQUAL(KEY("d"), KEYSIZE("d")) + 1, /* limit */ 0,
/* target_bytes */ 0, /* FDBStreamingMode */ FDB_STREAMING_MODE_WANT_ALL,
/* iteration */ 0, /* snapshot */ false, /* reverse */ 1);
while (1) {
auto result = get_range(
tr, FDB_KEYSEL_FIRST_GREATER_OR_EQUAL(KEY("a"), KEYSIZE("a")),
FDB_KEYSEL_LAST_LESS_OR_EQUAL(KEY("d"), KEYSIZE("d")) + 1,
/* limit */ 0, /* target_bytes */ 0,
/* FDBStreamingMode */ FDB_STREAMING_MODE_WANT_ALL, /* iteration */ 0,
/* snapshot */ false, /* reverse */ 1);
CHECK(result.kvs.size() > 0);
CHECK(result.kvs.size() <= 4);
if (result.kvs.size() < 4) {
CHECK(result.more);
}
if (result.err) {
fdb::EmptyFuture f1 = tr.on_error(result.err);
fdb_check(wait_future(f1));
continue;
}
// Read data in reverse order, keeping in mind that out_count might be
// smaller than requested.
auto it = data.rbegin();
std::advance(it, data.size() - result.kvs.size());
for (auto results_it = result.kvs.begin(); it != data.rend(); ++it) {
std::string data_key = it->first;
std::string data_value = it->second;
CHECK(result.kvs.size() > 0);
CHECK(result.kvs.size() <= 4);
if (result.kvs.size() < 4) {
CHECK(result.more);
}
auto [key, value] = *results_it++;
// Read data in reverse order, keeping in mind that out_count might be
// smaller than requested.
auto it = data.rbegin();
std::advance(it, data.size() - result.kvs.size());
for (auto results_it = result.kvs.begin(); it != data.rend(); ++it) {
std::string data_key = it->first;
std::string data_value = it->second;
CHECK(data_key.compare(key) == 0);
CHECK(data[data_key].compare(value) == 0);
auto [key, value] = *results_it++;
CHECK(data_key.compare(key) == 0);
CHECK(data[data_key].compare(value) == 0);
}
break;
}
}
// Flaky (or broken)
// TEST_CASE("fdb_transaction_get_range limit") {
// std::map<std::string, std::string> data =
// create_data({ { "a", "1" }, { "b", "2" }, { "c", "3" }, { "d", "4" } });
// insert_data(db, data);
//
// fdb::Transaction tr(db);
//
// FDBKeyValue const *out_kv;
// int out_count;
// int out_more;
// get_range(tr, FDB_KEYSEL_FIRST_GREATER_OR_EQUAL(KEY("a"), KEYSIZE("a")),
// FDB_KEYSEL_LAST_LESS_OR_EQUAL(KEY("d"), KEYSIZE("d")) + 1,
// /* limit */ 2, /* target_bytes */ 0,
// /* FDBStreamingMode */ FDB_STREAMING_MODE_WANT_ALL, /* iteration */ 0,
// /* snapshot */ false, /* reverse */ 0, &out_kv, &out_count, &out_more);
//
// CHECK(out_count > 0);
// CHECK(out_count <= 2);
// if (out_count < 4) {
// CHECK(out_more);
// }
//
// for (int i = 0; i < out_count; ++i) {
// FDBKeyValue kv = *out_kv++;
//
// std::string key((const char *)kv.key, kv.key_length);
// std::string value((const char *)kv.value, kv.value_length);
//
// CHECK(data[key].compare(value) == 0);
// }
// }
TEST_CASE("fdb_transaction_get_range limit") {
std::map<std::string, std::string> data =
create_data({ { "a", "1" }, { "b", "2" }, { "c", "3" }, { "d", "4" } });
insert_data(db, data);
fdb::Transaction tr(db);
while (1) {
auto result = get_range(
tr, FDB_KEYSEL_FIRST_GREATER_OR_EQUAL(KEY("a"), KEYSIZE("a")),
FDB_KEYSEL_LAST_LESS_OR_EQUAL(KEY("d"), KEYSIZE("d")) + 1,
/* limit */ 2, /* target_bytes */ 0,
/* FDBStreamingMode */ FDB_STREAMING_MODE_WANT_ALL, /* iteration */ 0,
/* snapshot */ false, /* reverse */ 0);
if (result.err) {
fdb::EmptyFuture f1 = tr.on_error(result.err);
fdb_check(wait_future(f1));
continue;
}
CHECK(result.kvs.size() > 0);
CHECK(result.kvs.size() <= 2);
if (result.kvs.size() < 4) {
CHECK(result.more);
}
for (int i = 0; i < result.kvs.size(); ++i) {
auto [key, value] = result.kvs[i];
CHECK(data[key].compare(value) == 0);
}
break;
}
}
TEST_CASE("fdb_transaction_get_range FDB_STREAMING_MODE_EXACT") {
std::map<std::string, std::string> data =
@ -857,19 +867,28 @@ TEST_CASE("fdb_transaction_get_range FDB_STREAMING_MODE_EXACT") {
insert_data(db, data);
fdb::Transaction tr(db);
while (1) {
auto result = get_range(
tr, FDB_KEYSEL_FIRST_GREATER_OR_EQUAL(KEY("a"), KEYSIZE("a")),
FDB_KEYSEL_LAST_LESS_OR_EQUAL(KEY("d"), KEYSIZE("d")) + 1,
/* limit */ 3, /* target_bytes */ 0,
/* FDBStreamingMode */ FDB_STREAMING_MODE_EXACT, /* iteration */ 0,
/* snapshot */ false, /* reverse */ 0);
auto result = get_range(
tr, FDB_KEYSEL_FIRST_GREATER_OR_EQUAL(KEY("a"), KEYSIZE("a")),
FDB_KEYSEL_LAST_LESS_OR_EQUAL(KEY("d"), KEYSIZE("d")) + 1, /* limit */ 3,
/* target_bytes */ 0, /* FDBStreamingMode */ FDB_STREAMING_MODE_EXACT,
/* iteration */ 0, /* snapshot */ false, /* reverse */ 0);
if (result.err) {
fdb::EmptyFuture f1 = tr.on_error(result.err);
fdb_check(wait_future(f1));
continue;
}
CHECK(result.kvs.size() == 3);
CHECK(result.more);
CHECK(result.kvs.size() == 3);
CHECK(result.more);
for (int i = 0; i < result.kvs.size(); ++i) {
auto [key, value] = result.kvs[i];
CHECK(data[key].compare(value) == 0);
for (int i = 0; i < result.kvs.size(); ++i) {
auto [key, value] = result.kvs[i];
CHECK(data[key].compare(value) == 0);
}
break;
}
}