Introduce getRangeAndHop to push computations down to FDB
This commit is contained in:
parent
ed0558bad9
commit
0853661d13
|
@ -436,21 +436,12 @@ extern "C" DLLEXPORT FDBFuture* fdb_transaction_get_addresses_for_key(FDBTransac
|
|||
return (FDBFuture*)(TXN(tr)->getAddressesForKey(KeyRef(key_name, key_name_length)).extractPtr());
|
||||
}
|
||||
|
||||
FDBFuture* fdb_transaction_get_range_impl(FDBTransaction* tr,
|
||||
uint8_t const* begin_key_name,
|
||||
int begin_key_name_length,
|
||||
fdb_bool_t begin_or_equal,
|
||||
int begin_offset,
|
||||
uint8_t const* end_key_name,
|
||||
int end_key_name_length,
|
||||
fdb_bool_t end_or_equal,
|
||||
int end_offset,
|
||||
int limit,
|
||||
int target_bytes,
|
||||
// Set to the actual limit, target_bytes, and reverse.
|
||||
FDBFuture* validate_and_update_parameters(int& limit,
|
||||
int& target_bytes,
|
||||
FDBStreamingMode mode,
|
||||
int iteration,
|
||||
fdb_bool_t snapshot,
|
||||
fdb_bool_t reverse) {
|
||||
fdb_bool_t& reverse) {
|
||||
/* This method may be called with a runtime API version of 13, in
|
||||
which negative row limits are a reverse range read */
|
||||
if (g_api_version <= 13 && limit < 0) {
|
||||
|
@ -500,6 +491,27 @@ FDBFuture* fdb_transaction_get_range_impl(FDBTransaction* tr,
|
|||
else if (mode_bytes != GetRangeLimits::BYTE_LIMIT_UNLIMITED)
|
||||
target_bytes = std::min(target_bytes, mode_bytes);
|
||||
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
FDBFuture* fdb_transaction_get_range_impl(FDBTransaction* tr,
|
||||
uint8_t const* begin_key_name,
|
||||
int begin_key_name_length,
|
||||
fdb_bool_t begin_or_equal,
|
||||
int begin_offset,
|
||||
uint8_t const* end_key_name,
|
||||
int end_key_name_length,
|
||||
fdb_bool_t end_or_equal,
|
||||
int end_offset,
|
||||
int limit,
|
||||
int target_bytes,
|
||||
FDBStreamingMode mode,
|
||||
int iteration,
|
||||
fdb_bool_t snapshot,
|
||||
fdb_bool_t reverse) {
|
||||
FDBFuture* r = validate_and_update_parameters(limit, target_bytes, mode, iteration, reverse);
|
||||
if (r != nullptr)
|
||||
return r;
|
||||
return (
|
||||
FDBFuture*)(TXN(tr)
|
||||
->getRange(
|
||||
|
@ -511,6 +523,60 @@ FDBFuture* fdb_transaction_get_range_impl(FDBTransaction* tr,
|
|||
.extractPtr());
|
||||
}
|
||||
|
||||
FDBFuture* fdb_transaction_get_range_and_hop_impl(FDBTransaction* tr,
|
||||
uint8_t const* begin_key_name,
|
||||
int begin_key_name_length,
|
||||
fdb_bool_t begin_or_equal,
|
||||
int begin_offset,
|
||||
uint8_t const* end_key_name,
|
||||
int end_key_name_length,
|
||||
fdb_bool_t end_or_equal,
|
||||
int end_offset,
|
||||
uint8_t const* hop_info_name,
|
||||
int hop_info_name_length,
|
||||
int limit,
|
||||
int target_bytes,
|
||||
FDBStreamingMode mode,
|
||||
int iteration,
|
||||
fdb_bool_t snapshot,
|
||||
fdb_bool_t reverse) {
|
||||
FDBFuture* r = validate_and_update_parameters(limit, target_bytes, mode, iteration, reverse);
|
||||
if (r != nullptr)
|
||||
return r;
|
||||
return (
|
||||
FDBFuture*)(TXN(tr)
|
||||
->getRangeAndHop(
|
||||
KeySelectorRef(KeyRef(begin_key_name, begin_key_name_length), begin_or_equal, begin_offset),
|
||||
KeySelectorRef(KeyRef(end_key_name, end_key_name_length), end_or_equal, end_offset),
|
||||
StringRef(hop_info_name, hop_info_name_length),
|
||||
GetRangeLimits(limit, target_bytes),
|
||||
snapshot,
|
||||
reverse)
|
||||
.extractPtr());
|
||||
}
|
||||
|
||||
// TODO: Support FDB_API_ADDED in generate_asm.py and then this can be replaced with fdb_api_ptr_unimpl.
|
||||
FDBFuture* fdb_transaction_get_range_and_hop_v699(FDBTransaction* tr,
|
||||
uint8_t const* begin_key_name,
|
||||
int begin_key_name_length,
|
||||
fdb_bool_t begin_or_equal,
|
||||
int begin_offset,
|
||||
uint8_t const* end_key_name,
|
||||
int end_key_name_length,
|
||||
fdb_bool_t end_or_equal,
|
||||
int end_offset,
|
||||
uint8_t const* hop_info_name,
|
||||
int hop_info_name_length,
|
||||
int limit,
|
||||
int target_bytes,
|
||||
FDBStreamingMode mode,
|
||||
int iteration,
|
||||
fdb_bool_t snapshot,
|
||||
fdb_bool_t reverse) {
|
||||
fprintf(stderr, "UNIMPLEMENTED FDB API FUNCTION\n");
|
||||
abort();
|
||||
}
|
||||
|
||||
FDBFuture* fdb_transaction_get_range_selector_v13(FDBTransaction* tr,
|
||||
uint8_t const* begin_key_name,
|
||||
int begin_key_name_length,
|
||||
|
@ -702,6 +768,7 @@ extern "C" DLLEXPORT fdb_error_t fdb_select_api_version_impl(int runtime_version
|
|||
// WARNING: use caution when implementing removed functions by calling public API functions. This can lead to
|
||||
// undesired behavior when using the multi-version API. Instead, it is better to have both the removed and public
|
||||
// functions call an internal implementation function. See fdb_create_database_impl for an example.
|
||||
FDB_API_CHANGED(fdb_transaction_get_range_and_hop, 700);
|
||||
FDB_API_REMOVED(fdb_future_get_version, 620);
|
||||
FDB_API_REMOVED(fdb_create_cluster, 610);
|
||||
FDB_API_REMOVED(fdb_cluster_create_database, 610);
|
||||
|
|
|
@ -244,6 +244,24 @@ DLLEXPORT WARN_UNUSED_RESULT FDBFuture* fdb_transaction_get_range(FDBTransaction
|
|||
fdb_bool_t reverse);
|
||||
#endif
|
||||
|
||||
DLLEXPORT WARN_UNUSED_RESULT FDBFuture* fdb_transaction_get_range_and_hop(FDBTransaction* tr,
|
||||
uint8_t const* begin_key_name,
|
||||
int begin_key_name_length,
|
||||
fdb_bool_t begin_or_equal,
|
||||
int begin_offset,
|
||||
uint8_t const* end_key_name,
|
||||
int end_key_name_length,
|
||||
fdb_bool_t end_or_equal,
|
||||
int end_offset,
|
||||
uint8_t const* hop_info_name,
|
||||
int hop_info_name_length,
|
||||
int limit,
|
||||
int target_bytes,
|
||||
FDBStreamingMode mode,
|
||||
int iteration,
|
||||
fdb_bool_t snapshot,
|
||||
fdb_bool_t reverse);
|
||||
|
||||
DLLEXPORT void fdb_transaction_set(FDBTransaction* tr,
|
||||
uint8_t const* key_name,
|
||||
int key_name_length,
|
||||
|
|
|
@ -193,6 +193,41 @@ KeyValueArrayFuture Transaction::get_range(const uint8_t* begin_key_name,
|
|||
reverse));
|
||||
}
|
||||
|
||||
KeyValueArrayFuture Transaction::get_range_and_hop(const uint8_t* begin_key_name,
|
||||
int begin_key_name_length,
|
||||
fdb_bool_t begin_or_equal,
|
||||
int begin_offset,
|
||||
const uint8_t* end_key_name,
|
||||
int end_key_name_length,
|
||||
fdb_bool_t end_or_equal,
|
||||
int end_offset,
|
||||
const uint8_t* hop_info_name,
|
||||
int hop_info_name_length,
|
||||
int limit,
|
||||
int target_bytes,
|
||||
FDBStreamingMode mode,
|
||||
int iteration,
|
||||
fdb_bool_t snapshot,
|
||||
fdb_bool_t reverse) {
|
||||
return KeyValueArrayFuture(fdb_transaction_get_range_and_hop(tr_,
|
||||
begin_key_name,
|
||||
begin_key_name_length,
|
||||
begin_or_equal,
|
||||
begin_offset,
|
||||
end_key_name,
|
||||
end_key_name_length,
|
||||
end_or_equal,
|
||||
end_offset,
|
||||
hop_info_name,
|
||||
hop_info_name_length,
|
||||
limit,
|
||||
target_bytes,
|
||||
mode,
|
||||
iteration,
|
||||
snapshot,
|
||||
reverse));
|
||||
}
|
||||
|
||||
EmptyFuture Transaction::watch(std::string_view key) {
|
||||
return EmptyFuture(fdb_transaction_watch(tr_, (const uint8_t*)key.data(), key.size()));
|
||||
}
|
||||
|
|
|
@ -219,6 +219,24 @@ public:
|
|||
fdb_bool_t snapshot,
|
||||
fdb_bool_t reverse);
|
||||
|
||||
// Returns a future which will be set to an FDBKeyValue array.
|
||||
KeyValueArrayFuture get_range_and_hop(const uint8_t* begin_key_name,
|
||||
int begin_key_name_length,
|
||||
fdb_bool_t begin_or_equal,
|
||||
int begin_offset,
|
||||
const uint8_t* end_key_name,
|
||||
int end_key_name_length,
|
||||
fdb_bool_t end_or_equal,
|
||||
int end_offset,
|
||||
const uint8_t* hop_info_name,
|
||||
int hop_info_name_length,
|
||||
int limit,
|
||||
int target_bytes,
|
||||
FDBStreamingMode mode,
|
||||
int iteration,
|
||||
fdb_bool_t snapshot,
|
||||
fdb_bool_t reverse);
|
||||
|
||||
// Wrapper around fdb_transaction_watch. Returns a future representing an
|
||||
// empty value.
|
||||
EmptyFuture watch(std::string_view key);
|
||||
|
|
|
@ -40,6 +40,7 @@
|
|||
#define DOCTEST_CONFIG_IMPLEMENT
|
||||
#include "doctest.h"
|
||||
#include "fdbclient/rapidjson/document.h"
|
||||
#include "fdbclient/Tuple.h"
|
||||
|
||||
#include "flow/config.h"
|
||||
|
||||
|
@ -76,7 +77,7 @@ fdb_error_t wait_future(fdb::Future& f) {
|
|||
// Given a string s, returns the "lowest" string greater than any string that
|
||||
// starts with s. Taken from
|
||||
// https://github.com/apple/foundationdb/blob/e7d72f458c6a985fdfa677ae021f357d6f49945b/flow/flow.cpp#L223.
|
||||
std::string strinc(const std::string& s) {
|
||||
std::string strinc_str(const std::string& s) {
|
||||
int index = -1;
|
||||
for (index = s.size() - 1; index >= 0; --index) {
|
||||
if ((uint8_t)s[index] != 255) {
|
||||
|
@ -92,16 +93,16 @@ std::string strinc(const std::string& s) {
|
|||
return r;
|
||||
}
|
||||
|
||||
TEST_CASE("strinc") {
|
||||
CHECK(strinc("a").compare("b") == 0);
|
||||
CHECK(strinc("y").compare("z") == 0);
|
||||
CHECK(strinc("!").compare("\"") == 0);
|
||||
CHECK(strinc("*").compare("+") == 0);
|
||||
CHECK(strinc("fdb").compare("fdc") == 0);
|
||||
CHECK(strinc("foundation database 6").compare("foundation database 7") == 0);
|
||||
TEST_CASE("strinc_str") {
|
||||
CHECK(strinc_str("a").compare("b") == 0);
|
||||
CHECK(strinc_str("y").compare("z") == 0);
|
||||
CHECK(strinc_str("!").compare("\"") == 0);
|
||||
CHECK(strinc_str("*").compare("+") == 0);
|
||||
CHECK(strinc_str("fdb").compare("fdc") == 0);
|
||||
CHECK(strinc_str("foundation database 6").compare("foundation database 7") == 0);
|
||||
|
||||
char terminated[] = { 'a', 'b', '\xff' };
|
||||
CHECK(strinc(std::string(terminated, 3)).compare("ac") == 0);
|
||||
CHECK(strinc_str(std::string(terminated, 3)).compare("ac") == 0);
|
||||
}
|
||||
|
||||
// Helper function to add `prefix` to all keys in the given map. Returns a new
|
||||
|
@ -117,7 +118,7 @@ std::map<std::string, std::string> create_data(std::map<std::string, std::string
|
|||
// Clears all data in the database, then inserts the given key value pairs.
|
||||
void insert_data(FDBDatabase* db, const std::map<std::string, std::string>& data) {
|
||||
fdb::Transaction tr(db);
|
||||
auto end_key = strinc(prefix);
|
||||
auto end_key = strinc_str(prefix);
|
||||
while (1) {
|
||||
tr.clear_range(prefix, end_key);
|
||||
for (const auto& [key, val] : data) {
|
||||
|
@ -224,6 +225,59 @@ GetRangeResult get_range(fdb::Transaction& tr,
|
|||
return GetRangeResult{ results, out_more != 0, 0 };
|
||||
}
|
||||
|
||||
GetRangeResult get_range_and_hop(fdb::Transaction& tr,
|
||||
const uint8_t* begin_key_name,
|
||||
int begin_key_name_length,
|
||||
fdb_bool_t begin_or_equal,
|
||||
int begin_offset,
|
||||
const uint8_t* end_key_name,
|
||||
int end_key_name_length,
|
||||
fdb_bool_t end_or_equal,
|
||||
int end_offset,
|
||||
const uint8_t* hop_info_name,
|
||||
int hop_info_name_length,
|
||||
int limit,
|
||||
int target_bytes,
|
||||
FDBStreamingMode mode,
|
||||
int iteration,
|
||||
fdb_bool_t snapshot,
|
||||
fdb_bool_t reverse) {
|
||||
fdb::KeyValueArrayFuture f1 = tr.get_range_and_hop(begin_key_name,
|
||||
begin_key_name_length,
|
||||
begin_or_equal,
|
||||
begin_offset,
|
||||
end_key_name,
|
||||
end_key_name_length,
|
||||
end_or_equal,
|
||||
end_offset,
|
||||
hop_info_name,
|
||||
hop_info_name_length,
|
||||
limit,
|
||||
target_bytes,
|
||||
mode,
|
||||
iteration,
|
||||
snapshot,
|
||||
reverse);
|
||||
|
||||
fdb_error_t err = wait_future(f1);
|
||||
if (err) {
|
||||
return GetRangeResult{ {}, false, err };
|
||||
}
|
||||
|
||||
const FDBKeyValue* out_kv;
|
||||
int out_count;
|
||||
fdb_bool_t out_more;
|
||||
fdb_check(f1.get(&out_kv, &out_count, &out_more));
|
||||
|
||||
std::vector<std::pair<std::string, std::string>> results;
|
||||
for (int i = 0; i < out_count; ++i) {
|
||||
std::string key((const char*)out_kv[i].key, out_kv[i].key_length);
|
||||
std::string value((const char*)out_kv[i].value, out_kv[i].value_length);
|
||||
results.emplace_back(key, value);
|
||||
}
|
||||
return GetRangeResult{ results, out_more != 0, 0 };
|
||||
}
|
||||
|
||||
// Clears all data in the database.
|
||||
void clear_data(FDBDatabase* db) {
|
||||
insert_data(db, {});
|
||||
|
@ -819,6 +873,86 @@ TEST_CASE("fdb_transaction_set_read_version future_version") {
|
|||
CHECK(err == 1009); // future_version
|
||||
}
|
||||
|
||||
const std::string EMPTY = Tuple().pack().toString();
|
||||
const KeyRef RECORD = "RECORD"_sr;
|
||||
const KeyRef INDEX = "INDEX"_sr;
|
||||
static KeyRef primaryKey(const int i) {
|
||||
return KeyRef(format("primary-key-of-record-%08d", i));
|
||||
}
|
||||
static KeyRef indexKey(const int i) {
|
||||
return KeyRef(format("index-key-of-record-%08d", i));
|
||||
}
|
||||
static ValueRef dataOfRecord(const int i) {
|
||||
return KeyRef(format("data-of-record-%08d", i));
|
||||
}
|
||||
static std::string indexEntryKey(const int i) {
|
||||
return Tuple().append(prefix).append(INDEX).append(indexKey(i)).append(primaryKey(i)).pack().toString();
|
||||
}
|
||||
static std::string recordKey(const int i) {
|
||||
return Tuple().append(prefix).append(RECORD).append(primaryKey(i)).pack().toString();
|
||||
}
|
||||
static std::string recordValue(const int i) {
|
||||
return Tuple().append(dataOfRecord(i)).pack().toString();
|
||||
}
|
||||
|
||||
TEST_CASE("fdb_transaction_get_range_and_hop") {
|
||||
// Note: The user requested `prefix` should be added as the first element of the tuple that forms the key, rather
|
||||
// than the prefix of the key. So we don't use key() or create_data() in this test.
|
||||
std::map<std::string, std::string> data;
|
||||
for (int i = 0; i < 3; i++) {
|
||||
data[indexEntryKey(i)] = EMPTY;
|
||||
data[recordKey(i)] = recordValue(i);
|
||||
}
|
||||
insert_data(db, data);
|
||||
|
||||
std::string hop_info = Tuple().append(prefix).append(RECORD).append("{K[3]}"_sr).pack().toString();
|
||||
|
||||
fdb::Transaction tr(db);
|
||||
// get_range_and_hop is only support without RYW. This is a must!!!
|
||||
fdb_check(tr.set_option(FDB_TR_OPTION_READ_YOUR_WRITES_DISABLE, nullptr, 0));
|
||||
while (1) {
|
||||
auto result = get_range_and_hop(
|
||||
tr,
|
||||
// [0, 1]
|
||||
FDB_KEYSEL_FIRST_GREATER_OR_EQUAL((const uint8_t*)indexEntryKey(0).c_str(), indexEntryKey(0).size()),
|
||||
FDB_KEYSEL_FIRST_GREATER_THAN((const uint8_t*)indexEntryKey(1).c_str(), indexEntryKey(1).size()),
|
||||
(const uint8_t*)hop_info.c_str(),
|
||||
hop_info.size(),
|
||||
/* limit */ 0,
|
||||
/* target_bytes */ 0,
|
||||
/* FDBStreamingMode */ FDB_STREAMING_MODE_WANT_ALL,
|
||||
/* iteration */ 0,
|
||||
/* snapshot */ false,
|
||||
/* reverse */ 0);
|
||||
|
||||
if (result.err) {
|
||||
fdb::EmptyFuture f1 = tr.on_error(result.err);
|
||||
fdb_check(wait_future(f1));
|
||||
continue;
|
||||
}
|
||||
|
||||
// Only the first 2 records are supposed to be returned.
|
||||
if (result.kvs.size() < 2) {
|
||||
CHECK(result.more);
|
||||
// Retry.
|
||||
continue;
|
||||
}
|
||||
|
||||
CHECK(result.kvs.size() == 2);
|
||||
CHECK(!result.more);
|
||||
for (int i = 0; i < 2; i++) {
|
||||
const auto& [key, value] = result.kvs[i];
|
||||
std::cout << "result[" << i << "]: key=" << key << ", value=" << value << std::endl;
|
||||
// OUTPUT:
|
||||
// result[0]: key=fdbRECORDprimary-key-of-record-00000000, value=data-of-record-00000000
|
||||
// result[1]: key=fdbRECORDprimary-key-of-record-00000001, value=data-of-record-00000001
|
||||
CHECK(recordKey(i).compare(key) == 0);
|
||||
CHECK(recordValue(i).compare(value) == 0);
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
TEST_CASE("fdb_transaction_get_range reverse") {
|
||||
std::map<std::string, std::string> data = create_data({ { "a", "1" }, { "b", "2" }, { "c", "3" }, { "d", "4" } });
|
||||
insert_data(db, data);
|
||||
|
@ -1726,7 +1860,7 @@ TEST_CASE("fdb_transaction_add_conflict_range") {
|
|||
|
||||
fdb::Transaction tr2(db);
|
||||
while (1) {
|
||||
fdb_check(tr2.add_conflict_range(key("a"), strinc(key("a")), FDB_CONFLICT_RANGE_TYPE_WRITE));
|
||||
fdb_check(tr2.add_conflict_range(key("a"), strinc_str(key("a")), FDB_CONFLICT_RANGE_TYPE_WRITE));
|
||||
fdb::EmptyFuture f1 = tr2.commit();
|
||||
|
||||
fdb_error_t err = wait_future(f1);
|
||||
|
@ -1739,8 +1873,8 @@ TEST_CASE("fdb_transaction_add_conflict_range") {
|
|||
}
|
||||
|
||||
while (1) {
|
||||
fdb_check(tr.add_conflict_range(key("a"), strinc(key("a")), FDB_CONFLICT_RANGE_TYPE_READ));
|
||||
fdb_check(tr.add_conflict_range(key("a"), strinc(key("a")), FDB_CONFLICT_RANGE_TYPE_WRITE));
|
||||
fdb_check(tr.add_conflict_range(key("a"), strinc_str(key("a")), FDB_CONFLICT_RANGE_TYPE_READ));
|
||||
fdb_check(tr.add_conflict_range(key("a"), strinc_str(key("a")), FDB_CONFLICT_RANGE_TYPE_WRITE));
|
||||
fdb::EmptyFuture f1 = tr.commit();
|
||||
|
||||
fdb_error_t err = wait_future(f1);
|
||||
|
@ -2217,7 +2351,7 @@ TEST_CASE("commit_does_not_reset") {
|
|||
continue;
|
||||
}
|
||||
|
||||
fdb_check(tr2.add_conflict_range(key("foo"), strinc(key("foo")), FDB_CONFLICT_RANGE_TYPE_READ));
|
||||
fdb_check(tr2.add_conflict_range(key("foo"), strinc_str(key("foo")), FDB_CONFLICT_RANGE_TYPE_READ));
|
||||
tr2.set(key("foo"), "bar");
|
||||
fdb::EmptyFuture tr2CommitFuture = tr2.commit();
|
||||
err = wait_future(tr2CommitFuture);
|
||||
|
|
|
@ -756,6 +756,75 @@ JNIEXPORT jlong JNICALL Java_com_apple_foundationdb_FDBTransaction_Transaction_1
|
|||
return (jlong)f;
|
||||
}
|
||||
|
||||
JNIEXPORT jlong JNICALL Java_com_apple_foundationdb_FDBTransaction_Transaction_1getRangeAndHop(JNIEnv* jenv,
|
||||
jobject,
|
||||
jlong tPtr,
|
||||
jbyteArray keyBeginBytes,
|
||||
jboolean orEqualBegin,
|
||||
jint offsetBegin,
|
||||
jbyteArray keyEndBytes,
|
||||
jboolean orEqualEnd,
|
||||
jint offsetEnd,
|
||||
jbyteArray hopInfoBytes,
|
||||
jint rowLimit,
|
||||
jint targetBytes,
|
||||
jint streamingMode,
|
||||
jint iteration,
|
||||
jboolean snapshot,
|
||||
jboolean reverse) {
|
||||
if (!tPtr || !keyBeginBytes || !keyEndBytes || !hopInfoBytes) {
|
||||
throwParamNotNull(jenv);
|
||||
return 0;
|
||||
}
|
||||
FDBTransaction* tr = (FDBTransaction*)tPtr;
|
||||
|
||||
uint8_t* barrBegin = (uint8_t*)jenv->GetByteArrayElements(keyBeginBytes, JNI_NULL);
|
||||
if (!barrBegin) {
|
||||
if (!jenv->ExceptionOccurred())
|
||||
throwRuntimeEx(jenv, "Error getting handle to native resources");
|
||||
return 0;
|
||||
}
|
||||
|
||||
uint8_t* barrEnd = (uint8_t*)jenv->GetByteArrayElements(keyEndBytes, JNI_NULL);
|
||||
if (!barrEnd) {
|
||||
jenv->ReleaseByteArrayElements(keyBeginBytes, (jbyte*)barrBegin, JNI_ABORT);
|
||||
if (!jenv->ExceptionOccurred())
|
||||
throwRuntimeEx(jenv, "Error getting handle to native resources");
|
||||
return 0;
|
||||
}
|
||||
|
||||
uint8_t* barrHopInfo = (uint8_t*)jenv->GetByteArrayElements(hopInfoBytes, JNI_NULL);
|
||||
if (!barrHopInfo) {
|
||||
jenv->ReleaseByteArrayElements(keyBeginBytes, (jbyte*)barrBegin, JNI_ABORT);
|
||||
jenv->ReleaseByteArrayElements(keyEndBytes, (jbyte*)barrEnd, JNI_ABORT);
|
||||
if (!jenv->ExceptionOccurred())
|
||||
throwRuntimeEx(jenv, "Error getting handle to native resources");
|
||||
return 0;
|
||||
}
|
||||
|
||||
FDBFuture* f = fdb_transaction_get_range_and_hop(tr,
|
||||
barrBegin,
|
||||
jenv->GetArrayLength(keyBeginBytes),
|
||||
orEqualBegin,
|
||||
offsetBegin,
|
||||
barrEnd,
|
||||
jenv->GetArrayLength(keyEndBytes),
|
||||
orEqualEnd,
|
||||
offsetEnd,
|
||||
barrHopInfo,
|
||||
jenv->GetArrayLength(hopInfoBytes),
|
||||
rowLimit,
|
||||
targetBytes,
|
||||
(FDBStreamingMode)streamingMode,
|
||||
iteration,
|
||||
snapshot,
|
||||
reverse);
|
||||
jenv->ReleaseByteArrayElements(keyBeginBytes, (jbyte*)barrBegin, JNI_ABORT);
|
||||
jenv->ReleaseByteArrayElements(keyEndBytes, (jbyte*)barrEnd, JNI_ABORT);
|
||||
jenv->ReleaseByteArrayElements(hopInfoBytes, (jbyte*)barrHopInfo, JNI_ABORT);
|
||||
return (jlong)f;
|
||||
}
|
||||
|
||||
JNIEXPORT void JNICALL Java_com_apple_foundationdb_FutureResults_FutureResults_1getDirect(JNIEnv* jenv,
|
||||
jobject,
|
||||
jlong future,
|
||||
|
|
|
@ -0,0 +1,252 @@
|
|||
/*
|
||||
* RangeAndHopQueryIntegrationTest.java
|
||||
*
|
||||
* This source file is part of the FoundationDB open source project
|
||||
*
|
||||
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package com.apple.foundationdb;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import com.apple.foundationdb.async.AsyncIterable;
|
||||
import com.apple.foundationdb.async.AsyncUtil;
|
||||
import com.apple.foundationdb.tuple.ByteArrayUtil;
|
||||
import com.apple.foundationdb.tuple.Tuple;
|
||||
import org.junit.jupiter.api.AfterEach;
|
||||
import org.junit.jupiter.api.Assertions;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
|
||||
@ExtendWith(RequiresDatabase.class)
|
||||
class RangeAndHopQueryIntegrationTest {
|
||||
private static final FDB fdb = FDB.selectAPIVersion(710);
|
||||
public String databaseArg = null;
|
||||
private Database openFDB() { return fdb.open(databaseArg); }
|
||||
|
||||
@BeforeEach
|
||||
@AfterEach
|
||||
void clearDatabase() throws Exception {
|
||||
/*
|
||||
* Empty the database before and after each run, just in case
|
||||
*/
|
||||
try (Database db = openFDB()) {
|
||||
db.run(tr -> {
|
||||
tr.clear(Range.startsWith(new byte[] { (byte)0x00 }));
|
||||
return null;
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
static private final byte[] EMPTY = Tuple.from().pack();
|
||||
static private final String PREFIX = "prefix";
|
||||
static private final String RECORD = "RECORD";
|
||||
static private final String INDEX = "INDEX";
|
||||
static private String primaryKey(int i) { return String.format("primary-key-of-record-%08d", i); }
|
||||
static private String indexKey(int i) { return String.format("index-key-of-record-%08d", i); }
|
||||
static private String dataOfRecord(int i) { return String.format("data-of-record-%08d", i); }
|
||||
|
||||
static byte[] HOP_INFO = Tuple.from(PREFIX, RECORD, "{K[3]}").pack();
|
||||
static private byte[] indexEntryKey(final int i) {
|
||||
return Tuple.from(PREFIX, INDEX, indexKey(i), primaryKey(i)).pack();
|
||||
}
|
||||
static private byte[] recordKey(final int i) { return Tuple.from(PREFIX, RECORD, primaryKey(i)).pack(); }
|
||||
static private byte[] recordValue(final int i) { return Tuple.from(dataOfRecord(i)).pack(); }
|
||||
|
||||
static private void insertRecordWithIndex(final Transaction tr, final int i) {
|
||||
tr.set(indexEntryKey(i), EMPTY);
|
||||
tr.set(recordKey(i), recordValue(i));
|
||||
}
|
||||
|
||||
private static String getArgFromEnv() {
|
||||
String[] clusterFiles = MultiClientHelper.readClusterFromEnv();
|
||||
String cluster = clusterFiles[0];
|
||||
System.out.printf("Using Cluster: %s\n", cluster);
|
||||
return cluster;
|
||||
}
|
||||
public static void main(String[] args) throws Exception {
|
||||
final RangeAndHopQueryIntegrationTest test = new RangeAndHopQueryIntegrationTest();
|
||||
test.databaseArg = getArgFromEnv();
|
||||
test.clearDatabase();
|
||||
test.comparePerformance();
|
||||
test.clearDatabase();
|
||||
}
|
||||
|
||||
int numRecords = 10000;
|
||||
int numQueries = 10000;
|
||||
int numRecordsPerQuery = 100;
|
||||
boolean validate = false;
|
||||
@Test
|
||||
void comparePerformance() {
|
||||
FDB fdb = FDB.selectAPIVersion(710);
|
||||
try (Database db = openFDB()) {
|
||||
insertRecordsWithIndexes(numRecords, db);
|
||||
instrument(rangeQueryAndGet, "rangeQueryAndGet", db);
|
||||
instrument(rangeQueryAndHop, "rangeQueryAndHop", db);
|
||||
}
|
||||
}
|
||||
|
||||
private void instrument(final RangeQueryWithIndex query, final String name, final Database db) {
|
||||
System.out.printf("Starting %s (numQueries:%d, numRecordsPerQuery:%d)\n", name, numQueries, numRecordsPerQuery);
|
||||
long startTime = System.currentTimeMillis();
|
||||
for (int queryId = 0; queryId < numQueries; queryId++) {
|
||||
int begin = ThreadLocalRandom.current().nextInt(numRecords - numRecordsPerQuery);
|
||||
query.run(begin, begin + numRecordsPerQuery, db);
|
||||
}
|
||||
long time = System.currentTimeMillis() - startTime;
|
||||
System.out.printf("Finished %s, it takes %d ms for %d queries (%d qps)\n", name, time, numQueries,
|
||||
numQueries * 1000L / time);
|
||||
}
|
||||
|
||||
static private final int RECORDS_PER_TXN = 100;
|
||||
static private void insertRecordsWithIndexes(int n, Database db) {
|
||||
int i = 0;
|
||||
while (i < n) {
|
||||
int begin = i;
|
||||
int end = Math.min(n, i + RECORDS_PER_TXN);
|
||||
// insert [begin, end) in one transaction
|
||||
db.run(tr -> {
|
||||
for (int t = begin; t < end; t++) {
|
||||
insertRecordWithIndex(tr, t);
|
||||
}
|
||||
return null;
|
||||
});
|
||||
i = end;
|
||||
}
|
||||
}
|
||||
|
||||
public interface RangeQueryWithIndex {
|
||||
void run(int begin, int end, Database db);
|
||||
}
|
||||
|
||||
RangeQueryWithIndex rangeQueryAndGet = (int begin, int end, Database db) -> db.run(tr -> {
|
||||
try {
|
||||
List<KeyValue> kvs = tr.getRange(KeySelector.firstGreaterOrEqual(indexEntryKey(begin)),
|
||||
KeySelector.firstGreaterOrEqual(indexEntryKey(end)),
|
||||
ReadTransaction.ROW_LIMIT_UNLIMITED, false, StreamingMode.WANT_ALL)
|
||||
.asList()
|
||||
.get();
|
||||
Assertions.assertEquals(end - begin, kvs.size());
|
||||
|
||||
// Get the records of each index entry IN PARALLEL.
|
||||
List<CompletableFuture<byte[]>> resultFutures = new ArrayList<>();
|
||||
// In reality, we need to get the record key by parsing the index entry key. But considering this is a
|
||||
// performance test, we just ignore the returned key and simply generate it from recordKey.
|
||||
for (int id = begin; id < end; id++) {
|
||||
resultFutures.add(tr.get(recordKey(id)));
|
||||
}
|
||||
AsyncUtil.whenAll(resultFutures).get();
|
||||
|
||||
if (validate) {
|
||||
final Iterator<KeyValue> indexes = kvs.iterator();
|
||||
final Iterator<CompletableFuture<byte[]>> records = resultFutures.iterator();
|
||||
for (int id = begin; id < end; id++) {
|
||||
Assertions.assertTrue(indexes.hasNext());
|
||||
assertByteArrayEquals(indexEntryKey(id), indexes.next().getKey());
|
||||
Assertions.assertTrue(records.hasNext());
|
||||
assertByteArrayEquals(recordValue(id), records.next().get());
|
||||
}
|
||||
Assertions.assertFalse(indexes.hasNext());
|
||||
Assertions.assertFalse(records.hasNext());
|
||||
}
|
||||
} catch (Exception e) {
|
||||
Assertions.fail("Unexpected exception", e);
|
||||
}
|
||||
return null;
|
||||
});
|
||||
|
||||
RangeQueryWithIndex rangeQueryAndHop = (int begin, int end, Database db) -> db.run(tr -> {
|
||||
try {
|
||||
tr.options().setReadYourWritesDisable();
|
||||
List<KeyValue> kvs = tr.getRangeAndHop(KeySelector.firstGreaterOrEqual(indexEntryKey(begin)),
|
||||
KeySelector.firstGreaterOrEqual(indexEntryKey(end)), HOP_INFO,
|
||||
ReadTransaction.ROW_LIMIT_UNLIMITED, false, StreamingMode.WANT_ALL)
|
||||
.asList()
|
||||
.get();
|
||||
Assertions.assertEquals(end - begin, kvs.size());
|
||||
|
||||
if (validate) {
|
||||
final Iterator<KeyValue> results = kvs.iterator();
|
||||
for (int id = begin; id < end; id++) {
|
||||
Assertions.assertTrue(results.hasNext());
|
||||
assertByteArrayEquals(recordValue(id), results.next().getValue());
|
||||
}
|
||||
Assertions.assertFalse(results.hasNext());
|
||||
}
|
||||
} catch (Exception e) {
|
||||
Assertions.fail("Unexpected exception", e);
|
||||
}
|
||||
return null;
|
||||
});
|
||||
|
||||
void assertByteArrayEquals(byte[] expected, byte[] actual) {
|
||||
Assertions.assertEquals(ByteArrayUtil.printable(expected), ByteArrayUtil.printable(actual));
|
||||
}
|
||||
|
||||
@Test
|
||||
void rangeAndHopQueryOverMultipleRows() throws Exception {
|
||||
try (Database db = openFDB()) {
|
||||
insertRecordsWithIndexes(3, db);
|
||||
|
||||
List<byte[]> expected_data_of_records = new ArrayList<>();
|
||||
for (int i = 0; i <= 1; i++) {
|
||||
expected_data_of_records.add(recordValue(i));
|
||||
}
|
||||
|
||||
db.run(tr -> {
|
||||
// getRangeAndHop is only support without RYW. This is a must!!!
|
||||
tr.options().setReadYourWritesDisable();
|
||||
|
||||
Iterator<KeyValue> kvs =
|
||||
tr.getRangeAndHop(KeySelector.firstGreaterOrEqual(indexEntryKey(0)),
|
||||
KeySelector.firstGreaterThan(indexEntryKey(1)), HOP_INFO,
|
||||
ReadTransaction.ROW_LIMIT_UNLIMITED, false, StreamingMode.WANT_ALL)
|
||||
.iterator();
|
||||
Iterator<byte[]> expected_data_of_records_iter = expected_data_of_records.iterator();
|
||||
while (expected_data_of_records_iter.hasNext()) {
|
||||
Assertions.assertTrue(kvs.hasNext(), "iterator ended too early");
|
||||
KeyValue kv = kvs.next();
|
||||
byte[] actual_data_of_record = kv.getValue();
|
||||
byte[] expected_data_of_record = expected_data_of_records_iter.next();
|
||||
|
||||
// System.out.println("result key:" + ByteArrayUtil.printable(kv.getKey()) + " value:" +
|
||||
// ByteArrayUtil.printable(kv.getValue())); Output:
|
||||
// result
|
||||
// key:\x02prefix\x00\x02INDEX\x00\x02index-key-of-record-0\x00\x02primary-key-of-record-0\x00
|
||||
// value:\x02data-of-record-0\x00
|
||||
// result
|
||||
// key:\x02prefix\x00\x02INDEX\x00\x02index-key-of-record-1\x00\x02primary-key-of-record-1\x00
|
||||
// value:\x02data-of-record-1\x00
|
||||
|
||||
// For now, we don't guarantee what that the returned keys mean.
|
||||
Assertions.assertArrayEquals(expected_data_of_record, actual_data_of_record,
|
||||
"Incorrect data of record!");
|
||||
}
|
||||
Assertions.assertFalse(kvs.hasNext(), "Iterator returned too much data");
|
||||
|
||||
return null;
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
|
@ -88,8 +88,11 @@ public class FakeFDBTransaction extends FDBTransaction {
|
|||
public int getNumRangeCalls() { return numRangeCalls; }
|
||||
|
||||
@Override
|
||||
protected FutureResults getRange_internal(KeySelector begin, KeySelector end, int rowLimit, int targetBytes,
|
||||
int streamingMode, int iteration, boolean isSnapshot, boolean reverse) {
|
||||
protected FutureResults getRange_internal(KeySelector begin, KeySelector end,
|
||||
// TODO: hop is not supported in FakeFDBTransaction yet.
|
||||
byte[] hopInfo, // Nullable
|
||||
int rowLimit, int targetBytes, int streamingMode, int iteration,
|
||||
boolean isSnapshot, boolean reverse) {
|
||||
numRangeCalls++;
|
||||
// TODO this is probably not correct for all KeySelector instances--we'll want to match with real behavior
|
||||
NavigableMap<byte[], byte[]> range =
|
||||
|
|
|
@ -91,6 +91,15 @@ class FDBTransaction extends NativeObjectWrapper implements Transaction, OptionC
|
|||
return FDBTransaction.this.getRangeSplitPoints(range, chunkSize);
|
||||
}
|
||||
|
||||
@Override
|
||||
public AsyncIterable<KeyValue> getRangeAndHop(KeySelector begin, KeySelector end, byte[] hopInfo, int limit,
|
||||
boolean reverse, StreamingMode mode) {
|
||||
if (hopInfo == null) {
|
||||
throw new IllegalArgumentException("HopInfo must be non-null");
|
||||
}
|
||||
return new RangeQuery(FDBTransaction.this, true, begin, end, hopInfo, limit, reverse, mode, eventKeeper);
|
||||
}
|
||||
|
||||
///////////////////
|
||||
// getRange -> KeySelectors
|
||||
///////////////////
|
||||
|
@ -338,6 +347,15 @@ class FDBTransaction extends NativeObjectWrapper implements Transaction, OptionC
|
|||
return this.getRangeSplitPoints(range.begin, range.end, chunkSize);
|
||||
}
|
||||
|
||||
@Override
|
||||
public AsyncIterable<KeyValue> getRangeAndHop(KeySelector begin, KeySelector end, byte[] hopInfo, int limit,
|
||||
boolean reverse, StreamingMode mode) {
|
||||
if (hopInfo == null) {
|
||||
throw new IllegalArgumentException("HopInfo must be non-null");
|
||||
}
|
||||
return new RangeQuery(this, false, begin, end, hopInfo, limit, reverse, mode, eventKeeper);
|
||||
}
|
||||
|
||||
///////////////////
|
||||
// getRange -> KeySelectors
|
||||
///////////////////
|
||||
|
@ -415,10 +433,10 @@ class FDBTransaction extends NativeObjectWrapper implements Transaction, OptionC
|
|||
}
|
||||
|
||||
// Users of this function must close the returned FutureResults when finished
|
||||
protected FutureResults getRange_internal(
|
||||
KeySelector begin, KeySelector end,
|
||||
int rowLimit, int targetBytes, int streamingMode,
|
||||
int iteration, boolean isSnapshot, boolean reverse) {
|
||||
protected FutureResults getRange_internal(KeySelector begin, KeySelector end,
|
||||
byte[] hopInfo, // Nullable
|
||||
int rowLimit, int targetBytes, int streamingMode, int iteration,
|
||||
boolean isSnapshot, boolean reverse) {
|
||||
if (eventKeeper != null) {
|
||||
eventKeeper.increment(Events.JNI_CALL);
|
||||
}
|
||||
|
@ -429,10 +447,14 @@ class FDBTransaction extends NativeObjectWrapper implements Transaction, OptionC
|
|||
begin.toString(), end.toString(), rowLimit, targetBytes, streamingMode,
|
||||
iteration, Boolean.toString(isSnapshot), Boolean.toString(reverse)));*/
|
||||
return new FutureResults(
|
||||
Transaction_getRange(getPtr(), begin.getKey(), begin.orEqual(), begin.getOffset(),
|
||||
end.getKey(), end.orEqual(), end.getOffset(), rowLimit, targetBytes,
|
||||
streamingMode, iteration, isSnapshot, reverse),
|
||||
FDB.instance().isDirectBufferQueriesEnabled(), executor, eventKeeper);
|
||||
hopInfo == null
|
||||
? Transaction_getRange(getPtr(), begin.getKey(), begin.orEqual(), begin.getOffset(), end.getKey(),
|
||||
end.orEqual(), end.getOffset(), rowLimit, targetBytes, streamingMode,
|
||||
iteration, isSnapshot, reverse)
|
||||
: Transaction_getRangeAndHop(getPtr(), begin.getKey(), begin.orEqual(), begin.getOffset(),
|
||||
end.getKey(), end.orEqual(), end.getOffset(), hopInfo, rowLimit,
|
||||
targetBytes, streamingMode, iteration, isSnapshot, reverse),
|
||||
FDB.instance().isDirectBufferQueriesEnabled(), executor, eventKeeper);
|
||||
} finally {
|
||||
pointerReadLock.unlock();
|
||||
}
|
||||
|
@ -771,6 +793,11 @@ class FDBTransaction extends NativeObjectWrapper implements Transaction, OptionC
|
|||
byte[] keyEnd, boolean orEqualEnd, int offsetEnd,
|
||||
int rowLimit, int targetBytes, int streamingMode, int iteration,
|
||||
boolean isSnapshot, boolean reverse);
|
||||
private native long Transaction_getRangeAndHop(long cPtr, byte[] keyBegin, boolean orEqualBegin, int offsetBegin,
|
||||
byte[] keyEnd, boolean orEqualEnd, int offsetEnd,
|
||||
byte[] hopInfo, // Nonnull
|
||||
int rowLimit, int targetBytes, int streamingMode, int iteration,
|
||||
boolean isSnapshot, boolean reverse);
|
||||
private native void Transaction_addConflictRange(long cPtr,
|
||||
byte[] keyBegin, byte[] keyEnd, int conflictRangeType);
|
||||
private native void Transaction_set(long cPtr, byte[] key, byte[] value);
|
||||
|
|
|
@ -49,17 +49,19 @@ class RangeQuery implements AsyncIterable<KeyValue> {
|
|||
private final FDBTransaction tr;
|
||||
private final KeySelector begin;
|
||||
private final KeySelector end;
|
||||
private final byte[] hopInfo; // Nullable
|
||||
private final boolean snapshot;
|
||||
private final int rowLimit;
|
||||
private final boolean reverse;
|
||||
private final StreamingMode streamingMode;
|
||||
private final EventKeeper eventKeeper;
|
||||
|
||||
RangeQuery(FDBTransaction transaction, boolean isSnapshot, KeySelector begin, KeySelector end, int rowLimit,
|
||||
boolean reverse, StreamingMode streamingMode, EventKeeper eventKeeper) {
|
||||
RangeQuery(FDBTransaction transaction, boolean isSnapshot, KeySelector begin, KeySelector end, byte[] hopInfo,
|
||||
int rowLimit, boolean reverse, StreamingMode streamingMode, EventKeeper eventKeeper) {
|
||||
this.tr = transaction;
|
||||
this.begin = begin;
|
||||
this.end = end;
|
||||
this.hopInfo = hopInfo;
|
||||
this.snapshot = isSnapshot;
|
||||
this.rowLimit = rowLimit;
|
||||
this.reverse = reverse;
|
||||
|
@ -67,6 +69,12 @@ class RangeQuery implements AsyncIterable<KeyValue> {
|
|||
this.eventKeeper = eventKeeper;
|
||||
}
|
||||
|
||||
// RangeQueryAndHop
|
||||
RangeQuery(FDBTransaction transaction, boolean isSnapshot, KeySelector begin, KeySelector end, int rowLimit,
|
||||
boolean reverse, StreamingMode streamingMode, EventKeeper eventKeeper) {
|
||||
this(transaction, isSnapshot, begin, end, null, rowLimit, reverse, streamingMode, eventKeeper);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns all the results from the range requested as a {@code List}. If there were no
|
||||
* limits on the original query and there is a large amount of data in the database
|
||||
|
@ -83,17 +91,17 @@ class RangeQuery implements AsyncIterable<KeyValue> {
|
|||
|
||||
// if the streaming mode is EXACT, try and grab things as one chunk
|
||||
if(mode == StreamingMode.EXACT) {
|
||||
FutureResults range = tr.getRange_internal(
|
||||
this.begin, this.end, this.rowLimit, 0, StreamingMode.EXACT.code(),
|
||||
1, this.snapshot, this.reverse);
|
||||
|
||||
FutureResults range = tr.getRange_internal(this.begin, this.end, this.hopInfo, this.rowLimit, 0,
|
||||
StreamingMode.EXACT.code(), 1, this.snapshot, this.reverse);
|
||||
return range.thenApply(result -> result.get().values)
|
||||
.whenComplete((result, e) -> range.close());
|
||||
}
|
||||
|
||||
// If the streaming mode is not EXACT, simply collect the results of an
|
||||
// iteration into a list
|
||||
return AsyncUtil.collect(new RangeQuery(tr, snapshot, begin, end, rowLimit, reverse, mode, eventKeeper),
|
||||
tr.getExecutor());
|
||||
return AsyncUtil.collect(
|
||||
new RangeQuery(tr, snapshot, begin, end, hopInfo, rowLimit, reverse, mode, eventKeeper), tr.getExecutor());
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -221,8 +229,8 @@ class RangeQuery implements AsyncIterable<KeyValue> {
|
|||
|
||||
nextFuture = new CompletableFuture<>();
|
||||
final long sTime = System.nanoTime();
|
||||
fetchingChunk = tr.getRange_internal(begin, end, rowsLimited ? rowsRemaining : 0, 0, streamingMode.code(),
|
||||
++iteration, snapshot, reverse);
|
||||
fetchingChunk = tr.getRange_internal(begin, end, hopInfo, rowsLimited ? rowsRemaining : 0, 0,
|
||||
streamingMode.code(), ++iteration, snapshot, reverse);
|
||||
|
||||
BiConsumer<RangeResultInfo,Throwable> cons = new FetchComplete(fetchingChunk,nextFuture);
|
||||
if(eventKeeper!=null){
|
||||
|
|
|
@ -424,6 +424,42 @@ public interface ReadTransaction extends ReadTransactionContext {
|
|||
AsyncIterable<KeyValue> getRange(Range range,
|
||||
int limit, boolean reverse, StreamingMode mode);
|
||||
|
||||
/**
|
||||
* Gets an ordered range of keys and values from the database. The begin
|
||||
* and end keys are specified by {@code KeySelector}s, with the begin
|
||||
* {@code KeySelector} inclusive and the end {@code KeySelector} exclusive.
|
||||
*
|
||||
* @see KeySelector
|
||||
* @see AsyncIterator
|
||||
*
|
||||
* @param begin the beginning of the range (inclusive)
|
||||
* @param end the end of the range (exclusive)
|
||||
* @param hopInfo TODO
|
||||
* @param limit the maximum number of results to return. Limits results to the
|
||||
* <i>first</i> keys in the range. Pass {@link #ROW_LIMIT_UNLIMITED} if this query
|
||||
* should not limit the number of results. If {@code reverse} is {@code true} rows
|
||||
* will be limited starting at the end of the range.
|
||||
* @param reverse return results starting at the end of the range in reverse order.
|
||||
* Reading ranges in reverse is supported natively by the database and should
|
||||
* have minimal extra cost.
|
||||
* @param mode provide a hint about how the results are to be used. This
|
||||
* can provide speed improvements or efficiency gains based on the caller's
|
||||
* knowledge of the upcoming access pattern.
|
||||
*
|
||||
* <p>
|
||||
* When converting the result of this query to a list using {@link AsyncIterable#asList()} with the {@code
|
||||
* ITERATOR} streaming mode, the query is automatically modified to fetch results in larger batches. This is done
|
||||
* because it is known in advance that the {@link AsyncIterable#asList()} function will fetch all results in the
|
||||
* range. If a limit is specified, the {@code EXACT} streaming mode will be used, and otherwise it will use {@code
|
||||
* WANT_ALL}.
|
||||
*
|
||||
* To achieve comparable performance when iterating over an entire range without using {@link
|
||||
* AsyncIterable#asList()}, the same streaming mode would need to be used.
|
||||
* </p>
|
||||
* @return a handle to access the results of the asynchronous call
|
||||
*/
|
||||
AsyncIterable<KeyValue> getRangeAndHop(KeySelector begin, KeySelector end, byte[] hopInfo, int limit,
|
||||
boolean reverse, StreamingMode mode);
|
||||
|
||||
/**
|
||||
* Gets an estimate for the number of bytes stored in the given range.
|
||||
|
|
|
@ -52,6 +52,7 @@ set(JAVA_INTEGRATION_TESTS
|
|||
src/integration/com/apple/foundationdb/CycleMultiClientIntegrationTest.java
|
||||
src/integration/com/apple/foundationdb/SidebandMultiThreadClientTest.java
|
||||
src/integration/com/apple/foundationdb/RepeatableReadMultiThreadClientTest.java
|
||||
src/integration/com/apple/foundationdb/RangeAndHopQueryIntegrationTest.java
|
||||
)
|
||||
|
||||
# Resources that are used in integration testing, but are not explicitly test files (JUnit rules,
|
||||
|
|
|
@ -30,6 +30,7 @@ Features
|
|||
* Improved the efficiency with which storage servers replicate data between themselves. `(PR #5017) <https://github.com/apple/foundationdb/pull/5017>`_
|
||||
* Added support to ``exclude command`` to exclude based on locality match. `(PR #5113) <https://github.com/apple/foundationdb/pull/5113>`_
|
||||
* Add the ``trace_partial_file_suffix`` network option. This option will give unfinished trace files a special suffix to indicate they're not complete yet. When the trace file is complete, it is renamed to remove the suffix. `(PR #5328) <https://github.com/apple/foundationdb/pull/5328>`_
|
||||
* Added "get range and hop" feature with new APIs (see Bindings section). Storage servers are able to generate the keys in the queries based on another query. With this, upper layer can push some computations down to FDB, to improve latency and bandwidth when read. `(PR #5609) <https://github.com/apple/foundationdb/pull/5609>`_
|
||||
|
||||
Performance
|
||||
-----------
|
||||
|
@ -86,6 +87,8 @@ Bindings
|
|||
* C: Added a function, ``fdb_database_create_snapshot``, to create a snapshot of the database. `(PR #4241) <https://github.com/apple/foundationdb/pull/4241/files>`_
|
||||
* C: Added ``fdb_database_get_main_thread_busyness`` function to report how busy a client's main thread is. `(PR #4504) <https://github.com/apple/foundationdb/pull/4504>`_
|
||||
* Java: Added ``Database.getMainThreadBusyness`` function to report how busy a client's main thread is. `(PR #4564) <https://github.com/apple/foundationdb/pull/4564>`_
|
||||
* C: Added ``fdb_transaction_get_range_and_hop`` function to support running queries based on another query in one request. `(PR #5609) <https://github.com/apple/foundationdb/pull/5609>`_
|
||||
* Java: Added ``Transaction.getRangeAndHop`` function to support running queries based on another query in one request. `(PR #5609) <https://github.com/apple/foundationdb/pull/5609>`_
|
||||
|
||||
Other Changes
|
||||
-------------
|
||||
|
|
|
@ -369,6 +369,7 @@ public:
|
|||
Counter transactionGetKeyRequests;
|
||||
Counter transactionGetValueRequests;
|
||||
Counter transactionGetRangeRequests;
|
||||
Counter transactionGetRangeAndHopRequests;
|
||||
Counter transactionGetRangeStreamRequests;
|
||||
Counter transactionWatchRequests;
|
||||
Counter transactionGetAddressesForKeyRequests;
|
||||
|
|
|
@ -59,6 +59,12 @@ public:
|
|||
GetRangeLimits limits,
|
||||
bool snapshot = false,
|
||||
bool reverse = false) = 0;
|
||||
virtual ThreadFuture<RangeResult> getRangeAndHop(const KeySelectorRef& begin,
|
||||
const KeySelectorRef& end,
|
||||
const StringRef& hopInfo,
|
||||
GetRangeLimits limits,
|
||||
bool snapshot = false,
|
||||
bool reverse = false) = 0;
|
||||
virtual ThreadFuture<Standalone<VectorRef<const char*>>> getAddressesForKey(const KeyRef& key) = 0;
|
||||
virtual ThreadFuture<Standalone<StringRef>> getVersionstamp() = 0;
|
||||
|
||||
|
|
|
@ -63,6 +63,12 @@ public:
|
|||
GetRangeLimits limits,
|
||||
Snapshot = Snapshot::False,
|
||||
Reverse = Reverse::False) = 0;
|
||||
virtual Future<RangeResult> getRangeAndHop(KeySelector begin,
|
||||
KeySelector end,
|
||||
Key hopInfo,
|
||||
GetRangeLimits limits,
|
||||
Snapshot = Snapshot::False,
|
||||
Reverse = Reverse::False) = 0;
|
||||
virtual Future<Standalone<VectorRef<const char*>>> getAddressesForKey(Key const& key) = 0;
|
||||
virtual Future<Standalone<VectorRef<KeyRef>>> getRangeSplitPoints(KeyRange const& range, int64_t chunkSize) = 0;
|
||||
virtual Future<int64_t> getEstimatedRangeSizeBytes(KeyRange const& keys) = 0;
|
||||
|
|
|
@ -141,6 +141,41 @@ ThreadFuture<RangeResult> DLTransaction::getRange(const KeyRangeRef& keys,
|
|||
return getRange(firstGreaterOrEqual(keys.begin), firstGreaterOrEqual(keys.end), limits, snapshot, reverse);
|
||||
}
|
||||
|
||||
ThreadFuture<RangeResult> DLTransaction::getRangeAndHop(const KeySelectorRef& begin,
|
||||
const KeySelectorRef& end,
|
||||
const StringRef& hopInfo,
|
||||
GetRangeLimits limits,
|
||||
bool snapshot,
|
||||
bool reverse) {
|
||||
FdbCApi::FDBFuture* f = api->transactionGetRangeAndHop(tr,
|
||||
begin.getKey().begin(),
|
||||
begin.getKey().size(),
|
||||
begin.orEqual,
|
||||
begin.offset,
|
||||
end.getKey().begin(),
|
||||
end.getKey().size(),
|
||||
end.orEqual,
|
||||
end.offset,
|
||||
hopInfo.begin(),
|
||||
hopInfo.size(),
|
||||
limits.rows,
|
||||
limits.bytes,
|
||||
FDB_STREAMING_MODE_EXACT,
|
||||
0,
|
||||
snapshot,
|
||||
reverse);
|
||||
return toThreadFuture<RangeResult>(api, f, [](FdbCApi::FDBFuture* f, FdbCApi* api) {
|
||||
const FdbCApi::FDBKeyValue* kvs;
|
||||
int count;
|
||||
FdbCApi::fdb_bool_t more;
|
||||
FdbCApi::fdb_error_t error = api->futureGetKeyValueArray(f, &kvs, &count, &more);
|
||||
ASSERT(!error);
|
||||
|
||||
// The memory for this is stored in the FDBFuture and is released when the future gets destroyed
|
||||
return RangeResult(RangeResultRef(VectorRef<KeyValueRef>((KeyValueRef*)kvs, count), more), Arena());
|
||||
});
|
||||
}
|
||||
|
||||
ThreadFuture<Standalone<VectorRef<const char*>>> DLTransaction::getAddressesForKey(const KeyRef& key) {
|
||||
FdbCApi::FDBFuture* f = api->transactionGetAddressesForKey(tr, key.begin(), key.size());
|
||||
|
||||
|
@ -452,6 +487,7 @@ void DLApi::init() {
|
|||
loadClientFunction(&api->transactionGetKey, lib, fdbCPath, "fdb_transaction_get_key");
|
||||
loadClientFunction(&api->transactionGetAddressesForKey, lib, fdbCPath, "fdb_transaction_get_addresses_for_key");
|
||||
loadClientFunction(&api->transactionGetRange, lib, fdbCPath, "fdb_transaction_get_range");
|
||||
loadClientFunction(&api->transactionGetRangeAndHop, lib, fdbCPath, "fdb_transaction_get_range_and_hop");
|
||||
loadClientFunction(
|
||||
&api->transactionGetVersionstamp, lib, fdbCPath, "fdb_transaction_get_versionstamp", headerVersion >= 410);
|
||||
loadClientFunction(&api->transactionSet, lib, fdbCPath, "fdb_transaction_set");
|
||||
|
@ -731,6 +767,18 @@ ThreadFuture<RangeResult> MultiVersionTransaction::getRange(const KeyRangeRef& k
|
|||
return abortableFuture(f, tr.onChange);
|
||||
}
|
||||
|
||||
ThreadFuture<RangeResult> MultiVersionTransaction::getRangeAndHop(const KeySelectorRef& begin,
|
||||
const KeySelectorRef& end,
|
||||
const StringRef& hopInfo,
|
||||
GetRangeLimits limits,
|
||||
bool snapshot,
|
||||
bool reverse) {
|
||||
auto tr = getTransaction();
|
||||
auto f = tr.transaction ? tr.transaction->getRangeAndHop(begin, end, hopInfo, limits, snapshot, reverse)
|
||||
: makeTimeout<RangeResult>();
|
||||
return abortableFuture(f, tr.onChange);
|
||||
}
|
||||
|
||||
ThreadFuture<Standalone<StringRef>> MultiVersionTransaction::getVersionstamp() {
|
||||
auto tr = getTransaction();
|
||||
auto f = tr.transaction ? tr.transaction->getVersionstamp() : makeTimeout<Standalone<StringRef>>();
|
||||
|
|
|
@ -118,6 +118,23 @@ struct FdbCApi : public ThreadSafeReferenceCounted<FdbCApi> {
|
|||
int iteration,
|
||||
fdb_bool_t snapshot,
|
||||
fdb_bool_t reverse);
|
||||
FDBFuture* (*transactionGetRangeAndHop)(FDBTransaction* tr,
|
||||
uint8_t const* beginKeyName,
|
||||
int beginKeyNameLength,
|
||||
fdb_bool_t beginOrEqual,
|
||||
int beginOffset,
|
||||
uint8_t const* endKeyName,
|
||||
int endKeyNameLength,
|
||||
fdb_bool_t endOrEqual,
|
||||
int endOffset,
|
||||
uint8_t const* hop_info_name,
|
||||
int hop_info_name_length,
|
||||
int limit,
|
||||
int targetBytes,
|
||||
FDBStreamingMode mode,
|
||||
int iteration,
|
||||
fdb_bool_t snapshot,
|
||||
fdb_bool_t reverse);
|
||||
FDBFuture* (*transactionGetVersionstamp)(FDBTransaction* tr);
|
||||
|
||||
void (*transactionSet)(FDBTransaction* tr,
|
||||
|
@ -219,6 +236,12 @@ public:
|
|||
GetRangeLimits limits,
|
||||
bool snapshot = false,
|
||||
bool reverse = false) override;
|
||||
ThreadFuture<RangeResult> getRangeAndHop(const KeySelectorRef& begin,
|
||||
const KeySelectorRef& end,
|
||||
const StringRef& hopInfo,
|
||||
GetRangeLimits limits,
|
||||
bool snapshot,
|
||||
bool reverse) override;
|
||||
ThreadFuture<Standalone<VectorRef<const char*>>> getAddressesForKey(const KeyRef& key) override;
|
||||
ThreadFuture<Standalone<StringRef>> getVersionstamp() override;
|
||||
ThreadFuture<int64_t> getEstimatedRangeSizeBytes(const KeyRangeRef& keys) override;
|
||||
|
@ -360,6 +383,12 @@ public:
|
|||
GetRangeLimits limits,
|
||||
bool snapshot = false,
|
||||
bool reverse = false) override;
|
||||
ThreadFuture<RangeResult> getRangeAndHop(const KeySelectorRef& begin,
|
||||
const KeySelectorRef& end,
|
||||
const StringRef& hopInfo,
|
||||
GetRangeLimits limits,
|
||||
bool snapshot,
|
||||
bool reverse) override;
|
||||
ThreadFuture<Standalone<VectorRef<const char*>>> getAddressesForKey(const KeyRef& key) override;
|
||||
ThreadFuture<Standalone<StringRef>> getVersionstamp() override;
|
||||
|
||||
|
|
|
@ -160,6 +160,8 @@ void DatabaseContext::addTssMapping(StorageServerInterface const& ssi, StorageSe
|
|||
TSSEndpointData(tssi.id(), tssi.getKey.getEndpoint(), metrics));
|
||||
queueModel.updateTssEndpoint(ssi.getKeyValues.getEndpoint().token.first(),
|
||||
TSSEndpointData(tssi.id(), tssi.getKeyValues.getEndpoint(), metrics));
|
||||
queueModel.updateTssEndpoint(ssi.getKeyValuesAndHop.getEndpoint().token.first(),
|
||||
TSSEndpointData(tssi.id(), tssi.getKeyValuesAndHop.getEndpoint(), metrics));
|
||||
queueModel.updateTssEndpoint(ssi.getKeyValuesStream.getEndpoint().token.first(),
|
||||
TSSEndpointData(tssi.id(), tssi.getKeyValuesStream.getEndpoint(), metrics));
|
||||
|
||||
|
@ -183,6 +185,7 @@ void DatabaseContext::removeTssMapping(StorageServerInterface const& ssi) {
|
|||
queueModel.removeTssEndpoint(ssi.getValue.getEndpoint().token.first());
|
||||
queueModel.removeTssEndpoint(ssi.getKey.getEndpoint().token.first());
|
||||
queueModel.removeTssEndpoint(ssi.getKeyValues.getEndpoint().token.first());
|
||||
queueModel.removeTssEndpoint(ssi.getKeyValuesAndHop.getEndpoint().token.first());
|
||||
queueModel.removeTssEndpoint(ssi.getKeyValuesStream.getEndpoint().token.first());
|
||||
|
||||
queueModel.removeTssEndpoint(ssi.watchValue.getEndpoint().token.first());
|
||||
|
@ -1196,6 +1199,7 @@ DatabaseContext::DatabaseContext(Reference<AsyncVar<Reference<IClusterConnection
|
|||
transactionPhysicalReadsCompleted("PhysicalReadRequestsCompleted", cc),
|
||||
transactionGetKeyRequests("GetKeyRequests", cc), transactionGetValueRequests("GetValueRequests", cc),
|
||||
transactionGetRangeRequests("GetRangeRequests", cc),
|
||||
transactionGetRangeAndHopRequests("GetRangeAndHopRequests", cc),
|
||||
transactionGetRangeStreamRequests("GetRangeStreamRequests", cc), transactionWatchRequests("WatchRequests", cc),
|
||||
transactionGetAddressesForKeyRequests("GetAddressesForKeyRequests", cc), transactionBytesRead("BytesRead", cc),
|
||||
transactionKeysRead("KeysRead", cc), transactionMetadataVersionReads("MetadataVersionReads", cc),
|
||||
|
@ -1451,6 +1455,7 @@ DatabaseContext::DatabaseContext(const Error& err)
|
|||
transactionPhysicalReadsCompleted("PhysicalReadRequestsCompleted", cc),
|
||||
transactionGetKeyRequests("GetKeyRequests", cc), transactionGetValueRequests("GetValueRequests", cc),
|
||||
transactionGetRangeRequests("GetRangeRequests", cc),
|
||||
transactionGetRangeAndHopRequests("GetRangeAndHopRequests", cc),
|
||||
transactionGetRangeStreamRequests("GetRangeStreamRequests", cc), transactionWatchRequests("WatchRequests", cc),
|
||||
transactionGetAddressesForKeyRequests("GetAddressesForKeyRequests", cc), transactionBytesRead("BytesRead", cc),
|
||||
transactionKeysRead("KeysRead", cc), transactionMetadataVersionReads("MetadataVersionReads", cc),
|
||||
|
@ -3029,7 +3034,8 @@ ACTOR Future<Void> watchValueMap(Future<Version> version,
|
|||
return Void();
|
||||
}
|
||||
|
||||
void transformRangeLimits(GetRangeLimits limits, Reverse reverse, GetKeyValuesRequest& req) {
|
||||
template <class GetKeyValuesMaybeHopRequest>
|
||||
void transformRangeLimits(GetRangeLimits limits, Reverse reverse, GetKeyValuesMaybeHopRequest& req) {
|
||||
if (limits.bytes != 0) {
|
||||
if (!limits.hasRowLimit())
|
||||
req.limit = CLIENT_KNOBS->REPLY_BYTE_LIMIT; // Can't get more than this many rows anyway
|
||||
|
@ -3049,26 +3055,47 @@ void transformRangeLimits(GetRangeLimits limits, Reverse reverse, GetKeyValuesRe
|
|||
}
|
||||
}
|
||||
|
||||
ACTOR Future<RangeResult> getExactRange(Database cx,
|
||||
Version version,
|
||||
KeyRange keys,
|
||||
GetRangeLimits limits,
|
||||
Reverse reverse,
|
||||
TransactionInfo info,
|
||||
TagSet tags) {
|
||||
template <class GetKeyValuesMaybeHopRequest>
|
||||
RequestStream<GetKeyValuesMaybeHopRequest> StorageServerInterface::*getRangeRequestStream() {
|
||||
if constexpr (std::is_same<GetKeyValuesMaybeHopRequest, GetKeyValuesRequest>::value) {
|
||||
return &StorageServerInterface::getKeyValues;
|
||||
} else if (std::is_same<GetKeyValuesMaybeHopRequest, GetKeyValuesAndHopRequest>::value) {
|
||||
return &StorageServerInterface::getKeyValuesAndHop;
|
||||
} else {
|
||||
UNREACHABLE();
|
||||
}
|
||||
}
|
||||
|
||||
ACTOR template <class GetKeyValuesMaybeHopRequest, class GetKeyValuesMaybeHopReply>
|
||||
Future<RangeResult> getExactRange(Database cx,
|
||||
Version version,
|
||||
KeyRange keys,
|
||||
Key hopInfo,
|
||||
GetRangeLimits limits,
|
||||
Reverse reverse,
|
||||
TransactionInfo info,
|
||||
TagSet tags) {
|
||||
state RangeResult output;
|
||||
state Span span("NAPI:getExactRange"_loc, info.spanID);
|
||||
|
||||
// printf("getExactRange( '%s', '%s' )\n", keys.begin.toString().c_str(), keys.end.toString().c_str());
|
||||
loop {
|
||||
state std::vector<std::pair<KeyRange, Reference<LocationInfo>>> locations = wait(getKeyRangeLocations(
|
||||
cx, keys, CLIENT_KNOBS->GET_RANGE_SHARD_LIMIT, reverse, &StorageServerInterface::getKeyValues, info));
|
||||
state std::vector<std::pair<KeyRange, Reference<LocationInfo>>> locations =
|
||||
wait(getKeyRangeLocations(cx,
|
||||
keys,
|
||||
CLIENT_KNOBS->GET_RANGE_SHARD_LIMIT,
|
||||
reverse,
|
||||
getRangeRequestStream<GetKeyValuesMaybeHopRequest>(),
|
||||
info));
|
||||
ASSERT(locations.size());
|
||||
state int shard = 0;
|
||||
loop {
|
||||
const KeyRangeRef& range = locations[shard].first;
|
||||
|
||||
GetKeyValuesRequest req;
|
||||
GetKeyValuesMaybeHopRequest req;
|
||||
req.hopInfo = hopInfo;
|
||||
req.arena.dependsOn(hopInfo.arena());
|
||||
|
||||
req.version = version;
|
||||
req.begin = firstGreaterOrEqual(range.begin);
|
||||
req.end = firstGreaterOrEqual(range.end);
|
||||
|
@ -3098,14 +3125,14 @@ ACTOR Future<RangeResult> getExactRange(Database cx,
|
|||
.detail("Servers", locations[shard].second->description());*/
|
||||
}
|
||||
++cx->transactionPhysicalReads;
|
||||
state GetKeyValuesReply rep;
|
||||
state GetKeyValuesMaybeHopReply rep;
|
||||
try {
|
||||
choose {
|
||||
when(wait(cx->connectionFileChanged())) { throw transaction_too_old(); }
|
||||
when(GetKeyValuesReply _rep =
|
||||
when(GetKeyValuesMaybeHopReply _rep =
|
||||
wait(loadBalance(cx.getPtr(),
|
||||
locations[shard].second,
|
||||
&StorageServerInterface::getKeyValues,
|
||||
getRangeRequestStream<GetKeyValuesMaybeHopRequest>(),
|
||||
req,
|
||||
TaskPriority::DefaultPromiseEndpoint,
|
||||
AtMostOnce::False,
|
||||
|
@ -3155,7 +3182,7 @@ ACTOR Future<RangeResult> getExactRange(Database cx,
|
|||
.detail("BlockBytes", rep.data.expectedSize());
|
||||
ASSERT(false);
|
||||
}
|
||||
TEST(true); // GetKeyValuesReply.more in getExactRange
|
||||
TEST(true); // GetKeyValuesMaybeHopReply.more in getExactRange
|
||||
// Make next request to the same shard with a beginning key just after the last key returned
|
||||
if (reverse)
|
||||
locations[shard].first =
|
||||
|
@ -3231,14 +3258,16 @@ Future<Key> resolveKey(Database const& cx,
|
|||
return getKey(cx, key, version, info, tags);
|
||||
}
|
||||
|
||||
ACTOR Future<RangeResult> getRangeFallback(Database cx,
|
||||
Version version,
|
||||
KeySelector begin,
|
||||
KeySelector end,
|
||||
GetRangeLimits limits,
|
||||
Reverse reverse,
|
||||
TransactionInfo info,
|
||||
TagSet tags) {
|
||||
ACTOR template <class GetKeyValuesMaybeHopRequest, class GetKeyValuesMaybeHopReply>
|
||||
Future<RangeResult> getRangeFallback(Database cx,
|
||||
Version version,
|
||||
KeySelector begin,
|
||||
KeySelector end,
|
||||
Key hopInfo,
|
||||
GetRangeLimits limits,
|
||||
Reverse reverse,
|
||||
TransactionInfo info,
|
||||
TagSet tags) {
|
||||
if (version == latestVersion) {
|
||||
state Transaction transaction(cx);
|
||||
transaction.setOption(FDBTransactionOptions::CAUSAL_READ_RISKY);
|
||||
|
@ -3261,7 +3290,8 @@ ACTOR Future<RangeResult> getRangeFallback(Database cx,
|
|||
// if b is allKeys.begin, we have either read through the beginning of the database,
|
||||
// or allKeys.begin exists in the database and will be part of the conflict range anyways
|
||||
|
||||
RangeResult _r = wait(getExactRange(cx, version, KeyRangeRef(b, e), limits, reverse, info, tags));
|
||||
RangeResult _r = wait(getExactRange<GetKeyValuesMaybeHopRequest, GetKeyValuesMaybeHopReply>(
|
||||
cx, version, KeyRangeRef(b, e), hopInfo, limits, reverse, info, tags));
|
||||
RangeResult r = _r;
|
||||
|
||||
if (b == allKeys.begin && ((reverse && !r.more) || !reverse))
|
||||
|
@ -3286,6 +3316,7 @@ ACTOR Future<RangeResult> getRangeFallback(Database cx,
|
|||
return r;
|
||||
}
|
||||
|
||||
// TODO: Client should add hop keys to conflict ranges.
|
||||
void getRangeFinished(Database cx,
|
||||
Reference<TransactionLogInfo> trLogInfo,
|
||||
double startTime,
|
||||
|
@ -3340,17 +3371,23 @@ void getRangeFinished(Database cx,
|
|||
}
|
||||
}
|
||||
|
||||
ACTOR Future<RangeResult> getRange(Database cx,
|
||||
Reference<TransactionLogInfo> trLogInfo,
|
||||
Future<Version> fVersion,
|
||||
KeySelector begin,
|
||||
KeySelector end,
|
||||
GetRangeLimits limits,
|
||||
Promise<std::pair<Key, Key>> conflictRange,
|
||||
Snapshot snapshot,
|
||||
Reverse reverse,
|
||||
TransactionInfo info,
|
||||
TagSet tags) {
|
||||
// GetKeyValuesMaybeHopRequest: GetKeyValuesRequest or GetKeyValuesAndHopRequest
|
||||
// GetKeyValuesMaybeHopReply: GetKeyValuesReply or GetKeyValuesAndHopReply
|
||||
// Sadly we need GetKeyValuesMaybeHopReply because cannot do something like: state
|
||||
// REPLY_TYPE(GetKeyValuesMaybeHopRequest) rep;
|
||||
ACTOR template <class GetKeyValuesMaybeHopRequest, class GetKeyValuesMaybeHopReply>
|
||||
Future<RangeResult> getRange(Database cx,
|
||||
Reference<TransactionLogInfo> trLogInfo,
|
||||
Future<Version> fVersion,
|
||||
KeySelector begin,
|
||||
KeySelector end,
|
||||
Key hopInfo,
|
||||
GetRangeLimits limits,
|
||||
Promise<std::pair<Key, Key>> conflictRange,
|
||||
Snapshot snapshot,
|
||||
Reverse reverse,
|
||||
TransactionInfo info,
|
||||
TagSet tags) {
|
||||
state GetRangeLimits originalLimits(limits);
|
||||
state KeySelector originalBegin = begin;
|
||||
state KeySelector originalEnd = end;
|
||||
|
@ -3384,11 +3421,13 @@ ACTOR Future<RangeResult> getRange(Database cx,
|
|||
|
||||
Key locationKey = reverse ? Key(end.getKey(), end.arena()) : Key(begin.getKey(), begin.arena());
|
||||
Reverse locationBackward{ reverse ? (end - 1).isBackward() : begin.isBackward() };
|
||||
state std::pair<KeyRange, Reference<LocationInfo>> beginServer =
|
||||
wait(getKeyLocation(cx, locationKey, &StorageServerInterface::getKeyValues, info, locationBackward));
|
||||
state std::pair<KeyRange, Reference<LocationInfo>> beginServer = wait(getKeyLocation(
|
||||
cx, locationKey, getRangeRequestStream<GetKeyValuesMaybeHopRequest>(), info, locationBackward));
|
||||
state KeyRange shard = beginServer.first;
|
||||
state bool modifiedSelectors = false;
|
||||
state GetKeyValuesRequest req;
|
||||
state GetKeyValuesMaybeHopRequest req;
|
||||
req.hopInfo = hopInfo;
|
||||
req.arena.dependsOn(hopInfo.arena());
|
||||
|
||||
req.isFetchKeys = (info.taskID == TaskPriority::FetchKeys);
|
||||
req.version = readVersion;
|
||||
|
@ -3447,17 +3486,17 @@ ACTOR Future<RangeResult> getRange(Database cx,
|
|||
}
|
||||
|
||||
++cx->transactionPhysicalReads;
|
||||
state GetKeyValuesReply rep;
|
||||
state GetKeyValuesMaybeHopReply rep;
|
||||
try {
|
||||
if (CLIENT_BUGGIFY_WITH_PROB(.01)) {
|
||||
throw deterministicRandom()->randomChoice(
|
||||
std::vector<Error>{ transaction_too_old(), future_version() });
|
||||
}
|
||||
// state AnnotateActor annotation(currentLineage);
|
||||
GetKeyValuesReply _rep =
|
||||
GetKeyValuesMaybeHopReply _rep =
|
||||
wait(loadBalance(cx.getPtr(),
|
||||
beginServer.second,
|
||||
&StorageServerInterface::getKeyValues,
|
||||
getRangeRequestStream<GetKeyValuesMaybeHopRequest>(),
|
||||
req,
|
||||
TaskPriority::DefaultPromiseEndpoint,
|
||||
AtMostOnce::False,
|
||||
|
@ -3557,11 +3596,12 @@ ACTOR Future<RangeResult> getRange(Database cx,
|
|||
|
||||
if (!rep.more) {
|
||||
ASSERT(modifiedSelectors);
|
||||
TEST(true); // !GetKeyValuesReply.more and modifiedSelectors in getRange
|
||||
TEST(true); // !GetKeyValuesMaybeHopReply.more and modifiedSelectors in getRange
|
||||
|
||||
if (!rep.data.size()) {
|
||||
RangeResult result = wait(getRangeFallback(
|
||||
cx, version, originalBegin, originalEnd, originalLimits, reverse, info, tags));
|
||||
RangeResult result =
|
||||
wait(getRangeFallback<GetKeyValuesMaybeHopRequest, GetKeyValuesMaybeHopReply>(
|
||||
cx, version, originalBegin, originalEnd, hopInfo, originalLimits, reverse, info, tags));
|
||||
getRangeFinished(cx,
|
||||
trLogInfo,
|
||||
startTime,
|
||||
|
@ -3579,7 +3619,7 @@ ACTOR Future<RangeResult> getRange(Database cx,
|
|||
else
|
||||
begin = firstGreaterOrEqual(shard.end);
|
||||
} else {
|
||||
TEST(true); // GetKeyValuesReply.more in getRange
|
||||
TEST(true); // GetKeyValuesMaybeHopReply.more in getRange
|
||||
if (reverse)
|
||||
end = firstGreaterOrEqual(output[output.size() - 1].key);
|
||||
else
|
||||
|
@ -3597,8 +3637,9 @@ ACTOR Future<RangeResult> getRange(Database cx,
|
|||
Reverse{ reverse ? (end - 1).isBackward() : begin.isBackward() });
|
||||
|
||||
if (e.code() == error_code_wrong_shard_server) {
|
||||
RangeResult result = wait(getRangeFallback(
|
||||
cx, version, originalBegin, originalEnd, originalLimits, reverse, info, tags));
|
||||
RangeResult result =
|
||||
wait(getRangeFallback<GetKeyValuesMaybeHopRequest, GetKeyValuesMaybeHopReply>(
|
||||
cx, version, originalBegin, originalEnd, hopInfo, originalLimits, reverse, info, tags));
|
||||
getRangeFinished(cx,
|
||||
trLogInfo,
|
||||
startTime,
|
||||
|
@ -4164,17 +4205,18 @@ Future<RangeResult> getRange(Database const& cx,
|
|||
Reverse const& reverse,
|
||||
TransactionInfo const& info,
|
||||
TagSet const& tags) {
|
||||
return getRange(cx,
|
||||
Reference<TransactionLogInfo>(),
|
||||
fVersion,
|
||||
begin,
|
||||
end,
|
||||
limits,
|
||||
Promise<std::pair<Key, Key>>(),
|
||||
Snapshot::True,
|
||||
reverse,
|
||||
info,
|
||||
tags);
|
||||
return getRange<GetKeyValuesRequest, GetKeyValuesReply>(cx,
|
||||
Reference<TransactionLogInfo>(),
|
||||
fVersion,
|
||||
begin,
|
||||
end,
|
||||
""_sr,
|
||||
limits,
|
||||
Promise<std::pair<Key, Key>>(),
|
||||
Snapshot::True,
|
||||
reverse,
|
||||
info,
|
||||
tags);
|
||||
}
|
||||
|
||||
bool DatabaseContext::debugUseTags = false;
|
||||
|
@ -4469,13 +4511,26 @@ Future<Key> Transaction::getKey(const KeySelector& key, Snapshot snapshot) {
|
|||
return getKeyAndConflictRange(cx, key, getReadVersion(), conflictRange, info, options.readTags);
|
||||
}
|
||||
|
||||
Future<RangeResult> Transaction::getRange(const KeySelector& begin,
|
||||
const KeySelector& end,
|
||||
GetRangeLimits limits,
|
||||
Snapshot snapshot,
|
||||
Reverse reverse) {
|
||||
template <class GetKeyValuesMaybeHopRequest>
|
||||
void increaseCounterForRequest(Database cx) {
|
||||
if constexpr (std::is_same<GetKeyValuesMaybeHopRequest, GetKeyValuesRequest>::value) {
|
||||
++cx->transactionGetRangeRequests;
|
||||
} else if (std::is_same<GetKeyValuesMaybeHopRequest, GetKeyValuesAndHopRequest>::value) {
|
||||
++cx->transactionGetRangeAndHopRequests;
|
||||
} else {
|
||||
UNREACHABLE();
|
||||
}
|
||||
}
|
||||
|
||||
template <class GetKeyValuesMaybeHopRequest, class GetKeyValuesMaybeHopReply>
|
||||
Future<RangeResult> Transaction::getRangeMaybeHop(const KeySelector& begin,
|
||||
const KeySelector& end,
|
||||
const Key& hopInfo,
|
||||
GetRangeLimits limits,
|
||||
Snapshot snapshot,
|
||||
Reverse reverse) {
|
||||
++cx->transactionLogicalReads;
|
||||
++cx->transactionGetRangeRequests;
|
||||
increaseCounterForRequest<GetKeyValuesMaybeHopRequest>(cx);
|
||||
|
||||
if (limits.isReached())
|
||||
return RangeResult();
|
||||
|
@ -4507,8 +4562,37 @@ Future<RangeResult> Transaction::getRange(const KeySelector& begin,
|
|||
extraConflictRanges.push_back(conflictRange.getFuture());
|
||||
}
|
||||
|
||||
return ::getRange(
|
||||
cx, trLogInfo, getReadVersion(), b, e, limits, conflictRange, snapshot, reverse, info, options.readTags);
|
||||
return ::getRange<GetKeyValuesMaybeHopRequest, GetKeyValuesMaybeHopReply>(cx,
|
||||
trLogInfo,
|
||||
getReadVersion(),
|
||||
b,
|
||||
e,
|
||||
hopInfo,
|
||||
limits,
|
||||
conflictRange,
|
||||
snapshot,
|
||||
reverse,
|
||||
info,
|
||||
options.readTags);
|
||||
}
|
||||
|
||||
Future<RangeResult> Transaction::getRange(const KeySelector& begin,
|
||||
const KeySelector& end,
|
||||
GetRangeLimits limits,
|
||||
Snapshot snapshot,
|
||||
Reverse reverse) {
|
||||
return getRangeMaybeHop<GetKeyValuesRequest, GetKeyValuesReply>(begin, end, ""_sr, limits, snapshot, reverse);
|
||||
}
|
||||
|
||||
Future<RangeResult> Transaction::getRangeAndHop(const KeySelector& begin,
|
||||
const KeySelector& end,
|
||||
const Key& hopInfo,
|
||||
GetRangeLimits limits,
|
||||
Snapshot snapshot,
|
||||
Reverse reverse) {
|
||||
|
||||
return getRangeMaybeHop<GetKeyValuesAndHopRequest, GetKeyValuesAndHopReply>(
|
||||
begin, end, hopInfo, limits, snapshot, reverse);
|
||||
}
|
||||
|
||||
Future<RangeResult> Transaction::getRange(const KeySelector& begin,
|
||||
|
|
|
@ -289,6 +289,23 @@ public:
|
|||
reverse);
|
||||
}
|
||||
|
||||
[[nodiscard]] Future<RangeResult> getRangeAndHop(const KeySelector& begin,
|
||||
const KeySelector& end,
|
||||
const Key& hopInfo,
|
||||
GetRangeLimits limits,
|
||||
Snapshot = Snapshot::False,
|
||||
Reverse = Reverse::False);
|
||||
|
||||
private:
|
||||
template <class GetKeyValuesMaybeHopRequest, class GetKeyValuesMaybeHopReply>
|
||||
Future<RangeResult> getRangeMaybeHop(const KeySelector& begin,
|
||||
const KeySelector& end,
|
||||
const Key& hopInfo,
|
||||
GetRangeLimits limits,
|
||||
Snapshot snapshot,
|
||||
Reverse reverse);
|
||||
|
||||
public:
|
||||
// A method for streaming data from the storage server that is more efficient than getRange when reading large
|
||||
// amounts of data
|
||||
[[nodiscard]] Future<Void> getRangeStream(const PromiseStream<Standalone<RangeResultRef>>& results,
|
||||
|
|
|
@ -50,6 +50,14 @@ public:
|
|||
GetRangeLimits limits,
|
||||
Snapshot = Snapshot::False,
|
||||
Reverse = Reverse::False) override;
|
||||
Future<RangeResult> getRangeAndHop(KeySelector begin,
|
||||
KeySelector end,
|
||||
Key hopInfo,
|
||||
GetRangeLimits limits,
|
||||
Snapshot = Snapshot::False,
|
||||
Reverse = Reverse::False) override {
|
||||
throw client_invalid_operation();
|
||||
}
|
||||
void set(KeyRef const& key, ValueRef const& value) override;
|
||||
void clear(KeyRangeRef const&) override { throw client_invalid_operation(); }
|
||||
void clear(KeyRef const&) override;
|
||||
|
|
|
@ -74,6 +74,16 @@ public:
|
|||
using Result = RangeResult;
|
||||
};
|
||||
|
||||
template <bool reverse>
|
||||
struct GetRangeAndHopReq {
|
||||
GetRangeAndHopReq(KeySelector begin, KeySelector end, Key hopInfo, GetRangeLimits limits)
|
||||
: begin(begin), end(end), hopInfo(hopInfo), limits(limits) {}
|
||||
KeySelector begin, end;
|
||||
Key hopInfo;
|
||||
GetRangeLimits limits;
|
||||
using Result = RangeResult;
|
||||
};
|
||||
|
||||
// read() Performs a read (get, getKey, getRange, etc), in the context of the given transaction. Snapshot or RYW
|
||||
// reads are distingushed by the type Iter being SnapshotCache::iterator or RYWIterator. Fills in the snapshot cache
|
||||
// as a side effect but does not affect conflict ranges. Some (indicated) overloads of read are required to update
|
||||
|
@ -203,6 +213,36 @@ public:
|
|||
return v;
|
||||
}
|
||||
|
||||
ACTOR template <bool backwards>
|
||||
static Future<RangeResult> readThroughAndHop(ReadYourWritesTransaction* ryw,
|
||||
GetRangeAndHopReq<backwards> read,
|
||||
Snapshot snapshot) {
|
||||
if (backwards && read.end.offset > 1) {
|
||||
// FIXME: Optimistically assume that this will not run into the system keys, and only reissue if the result
|
||||
// actually does.
|
||||
Key key = wait(ryw->tr.getKey(read.end, snapshot));
|
||||
if (key > ryw->getMaxReadKey())
|
||||
read.end = firstGreaterOrEqual(ryw->getMaxReadKey());
|
||||
else
|
||||
read.end = KeySelector(firstGreaterOrEqual(key), key.arena());
|
||||
}
|
||||
|
||||
RangeResult v = wait(ryw->tr.getRangeAndHop(
|
||||
read.begin, read.end, read.hopInfo, read.limits, snapshot, backwards ? Reverse::True : Reverse::False));
|
||||
KeyRef maxKey = ryw->getMaxReadKey();
|
||||
if (v.size() > 0) {
|
||||
if (!backwards && v[v.size() - 1].key >= maxKey) {
|
||||
state RangeResult _v = v;
|
||||
int i = _v.size() - 2;
|
||||
for (; i >= 0 && _v[i].key >= maxKey; --i) {
|
||||
}
|
||||
return RangeResult(RangeResultRef(VectorRef<KeyValueRef>(&_v[0], i + 1), false), _v.arena());
|
||||
}
|
||||
}
|
||||
|
||||
return v;
|
||||
}
|
||||
|
||||
// addConflictRange(ryw,read,result) is called after a serializable read and is responsible for adding the relevant
|
||||
// conflict range
|
||||
|
||||
|
@ -309,6 +349,15 @@ public:
|
|||
}
|
||||
}
|
||||
ACTOR template <class Req>
|
||||
static Future<typename Req::Result> readWithConflictRangeThroughAndHop(ReadYourWritesTransaction* ryw,
|
||||
Req req,
|
||||
Snapshot snapshot) {
|
||||
choose {
|
||||
when(typename Req::Result result = wait(readThroughAndHop(ryw, req, snapshot))) { return result; }
|
||||
when(wait(ryw->resetPromise.getFuture())) { throw internal_error(); }
|
||||
}
|
||||
}
|
||||
ACTOR template <class Req>
|
||||
static Future<typename Req::Result> readWithConflictRangeSnapshot(ReadYourWritesTransaction* ryw, Req req) {
|
||||
state SnapshotCache::iterator it(&ryw->cache, &ryw->writes);
|
||||
choose {
|
||||
|
@ -344,6 +393,20 @@ public:
|
|||
return readWithConflictRangeRYW(ryw, req, snapshot);
|
||||
}
|
||||
|
||||
template <class Req>
|
||||
static inline Future<typename Req::Result> readWithConflictRangeAndHop(ReadYourWritesTransaction* ryw,
|
||||
Req const& req,
|
||||
Snapshot snapshot) {
|
||||
if (ryw->options.readYourWritesDisabled) {
|
||||
return readWithConflictRangeThroughAndHop(ryw, req, snapshot);
|
||||
} else if (snapshot && ryw->options.snapshotRywEnabled <= 0) {
|
||||
TEST(true); // readWithConflictRangeSnapshot not supported for hop
|
||||
throw client_invalid_operation();
|
||||
}
|
||||
TEST(true); // readWithConflictRangeRYW not supported for hop
|
||||
throw client_invalid_operation();
|
||||
}
|
||||
|
||||
template <class Iter>
|
||||
static void resolveKeySelectorFromCache(KeySelector& key,
|
||||
Iter& it,
|
||||
|
@ -1509,6 +1572,65 @@ Future<RangeResult> ReadYourWritesTransaction::getRange(const KeySelector& begin
|
|||
return getRange(begin, end, GetRangeLimits(limit), snapshot, reverse);
|
||||
}
|
||||
|
||||
Future<RangeResult> ReadYourWritesTransaction::getRangeAndHop(KeySelector begin,
|
||||
KeySelector end,
|
||||
Key hopInfo,
|
||||
GetRangeLimits limits,
|
||||
Snapshot snapshot,
|
||||
Reverse reverse) {
|
||||
if (getDatabase()->apiVersionAtLeast(630)) {
|
||||
if (specialKeys.contains(begin.getKey()) && specialKeys.begin <= end.getKey() &&
|
||||
end.getKey() <= specialKeys.end) {
|
||||
TEST(true); // Special key space get range (Hop)
|
||||
throw client_invalid_operation(); // Not support special keys.
|
||||
}
|
||||
} else {
|
||||
if (begin.getKey() == LiteralStringRef("\xff\xff/worker_interfaces")) {
|
||||
throw client_invalid_operation(); // Not support special keys.
|
||||
}
|
||||
}
|
||||
|
||||
if (checkUsedDuringCommit()) {
|
||||
return used_during_commit();
|
||||
}
|
||||
|
||||
if (resetPromise.isSet())
|
||||
return resetPromise.getFuture().getError();
|
||||
|
||||
KeyRef maxKey = getMaxReadKey();
|
||||
if (begin.getKey() > maxKey || end.getKey() > maxKey)
|
||||
return key_outside_legal_range();
|
||||
|
||||
// This optimization prevents nullptr operations from being added to the conflict range
|
||||
if (limits.isReached()) {
|
||||
TEST(true); // RYW range read limit 0 (Hop)
|
||||
return RangeResult();
|
||||
}
|
||||
|
||||
if (!limits.isValid())
|
||||
return range_limits_invalid();
|
||||
|
||||
if (begin.orEqual)
|
||||
begin.removeOrEqual(begin.arena());
|
||||
|
||||
if (end.orEqual)
|
||||
end.removeOrEqual(end.arena());
|
||||
|
||||
if (begin.offset >= end.offset && begin.getKey() >= end.getKey()) {
|
||||
TEST(true); // RYW range inverted (Hop)
|
||||
return RangeResult();
|
||||
}
|
||||
|
||||
Future<RangeResult> result =
|
||||
reverse ? RYWImpl::readWithConflictRangeAndHop(
|
||||
this, RYWImpl::GetRangeAndHopReq<true>(begin, end, hopInfo, limits), snapshot)
|
||||
: RYWImpl::readWithConflictRangeAndHop(
|
||||
this, RYWImpl::GetRangeAndHopReq<false>(begin, end, hopInfo, limits), snapshot);
|
||||
|
||||
reading.add(success(result));
|
||||
return result;
|
||||
}
|
||||
|
||||
Future<Standalone<VectorRef<const char*>>> ReadYourWritesTransaction::getAddressesForKey(const Key& key) {
|
||||
if (checkUsedDuringCommit()) {
|
||||
return used_during_commit();
|
||||
|
|
|
@ -104,6 +104,12 @@ public:
|
|||
snapshot,
|
||||
reverse);
|
||||
}
|
||||
Future<RangeResult> getRangeAndHop(KeySelector begin,
|
||||
KeySelector end,
|
||||
Key hopInfo,
|
||||
GetRangeLimits limits,
|
||||
Snapshot = Snapshot::False,
|
||||
Reverse = Reverse::False) override;
|
||||
|
||||
[[nodiscard]] Future<Standalone<VectorRef<const char*>>> getAddressesForKey(const Key& key) override;
|
||||
Future<Standalone<VectorRef<KeyRef>>> getRangeSplitPoints(const KeyRange& range, int64_t chunkSize) override;
|
||||
|
|
|
@ -644,6 +644,8 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
|
|||
init( MAX_STORAGE_COMMIT_TIME, 120.0 ); //The max fsync stall time on the storage server and tlog before marking a disk as failed
|
||||
init( RANGESTREAM_LIMIT_BYTES, 2e6 ); if( randomize && BUGGIFY ) RANGESTREAM_LIMIT_BYTES = 1;
|
||||
init( ENABLE_CLEAR_RANGE_EAGER_READS, true );
|
||||
init( QUICK_GET_VALUE_FALLBACK, true );
|
||||
init( QUICK_GET_KEY_VALUES_FALLBACK, true );
|
||||
|
||||
//Wait Failure
|
||||
init( MAX_OUTSTANDING_WAIT_FAILURE_REQUESTS, 250 ); if( randomize && BUGGIFY ) MAX_OUTSTANDING_WAIT_FAILURE_REQUESTS = 2;
|
||||
|
|
|
@ -585,6 +585,8 @@ public:
|
|||
double MAX_STORAGE_COMMIT_TIME;
|
||||
int64_t RANGESTREAM_LIMIT_BYTES;
|
||||
bool ENABLE_CLEAR_RANGE_EAGER_READS;
|
||||
bool QUICK_GET_VALUE_FALLBACK;
|
||||
bool QUICK_GET_KEY_VALUES_FALLBACK;
|
||||
|
||||
// Wait Failure
|
||||
int MAX_OUTSTANDING_WAIT_FAILURE_REQUESTS;
|
||||
|
|
|
@ -59,6 +59,14 @@ public:
|
|||
GetRangeLimits limits,
|
||||
Snapshot = Snapshot::False,
|
||||
Reverse = Reverse::False) override;
|
||||
Future<RangeResult> getRangeAndHop(KeySelector begin,
|
||||
KeySelector end,
|
||||
Key hopInfo,
|
||||
GetRangeLimits limits,
|
||||
Snapshot = Snapshot::False,
|
||||
Reverse = Reverse::False) override {
|
||||
throw client_invalid_operation();
|
||||
}
|
||||
Future<Void> commit() override;
|
||||
Version getCommittedVersion() const override;
|
||||
void setOption(FDBTransactionOptions::Option option, Optional<StringRef> value = Optional<StringRef>()) override;
|
||||
|
|
|
@ -152,6 +152,45 @@ void TSS_traceMismatch(TraceEvent& event,
|
|||
.detail("TSSReply", tssResultsString);
|
||||
}
|
||||
|
||||
// range reads and hop
|
||||
template <>
|
||||
bool TSS_doCompare(const GetKeyValuesAndHopReply& src, const GetKeyValuesAndHopReply& tss) {
|
||||
return src.more == tss.more && src.data == tss.data;
|
||||
}
|
||||
|
||||
template <>
|
||||
const char* TSS_mismatchTraceName(const GetKeyValuesAndHopRequest& req) {
|
||||
return "TSSMismatchGetKeyValuesAndHop";
|
||||
}
|
||||
|
||||
template <>
|
||||
void TSS_traceMismatch(TraceEvent& event,
|
||||
const GetKeyValuesAndHopRequest& req,
|
||||
const GetKeyValuesAndHopReply& src,
|
||||
const GetKeyValuesAndHopReply& tss) {
|
||||
std::string ssResultsString = format("(%d)%s:\n", src.data.size(), src.more ? "+" : "");
|
||||
for (auto& it : src.data) {
|
||||
ssResultsString += "\n" + it.key.printable() + "=" + traceChecksumValue(it.value);
|
||||
}
|
||||
|
||||
std::string tssResultsString = format("(%d)%s:\n", tss.data.size(), tss.more ? "+" : "");
|
||||
for (auto& it : tss.data) {
|
||||
tssResultsString += "\n" + it.key.printable() + "=" + traceChecksumValue(it.value);
|
||||
}
|
||||
event
|
||||
.detail(
|
||||
"Begin",
|
||||
format("%s%s:%d", req.begin.orEqual ? "=" : "", req.begin.getKey().printable().c_str(), req.begin.offset))
|
||||
.detail("End",
|
||||
format("%s%s:%d", req.end.orEqual ? "=" : "", req.end.getKey().printable().c_str(), req.end.offset))
|
||||
.detail("Version", req.version)
|
||||
.detail("Limit", req.limit)
|
||||
.detail("LimitBytes", req.limitBytes)
|
||||
.setMaxFieldLength(FLOW_KNOBS->TSS_LARGE_TRACE_SIZE * 4 / 10)
|
||||
.detail("SSReply", ssResultsString)
|
||||
.detail("TSSReply", tssResultsString);
|
||||
}
|
||||
|
||||
// streaming range reads
|
||||
template <>
|
||||
bool TSS_doCompare(const GetKeyValuesStreamReply& src, const GetKeyValuesStreamReply& tss) {
|
||||
|
@ -356,6 +395,12 @@ void TSSMetrics::recordLatency(const GetKeyValuesRequest& req, double ssLatency,
|
|||
TSSgetKeyValuesLatency.addSample(tssLatency);
|
||||
}
|
||||
|
||||
template <>
|
||||
void TSSMetrics::recordLatency(const GetKeyValuesAndHopRequest& req, double ssLatency, double tssLatency) {
|
||||
SSgetKeyValuesAndHopLatency.addSample(ssLatency);
|
||||
TSSgetKeyValuesAndHopLatency.addSample(tssLatency);
|
||||
}
|
||||
|
||||
template <>
|
||||
void TSSMetrics::recordLatency(const WatchValueRequest& req, double ssLatency, double tssLatency) {}
|
||||
|
||||
|
|
|
@ -22,6 +22,7 @@
|
|||
#define FDBCLIENT_STORAGESERVERINTERFACE_H
|
||||
#pragma once
|
||||
|
||||
#include <ostream>
|
||||
#include "fdbclient/FDBTypes.h"
|
||||
#include "fdbrpc/Locality.h"
|
||||
#include "fdbrpc/QueueModel.h"
|
||||
|
@ -65,6 +66,7 @@ struct StorageServerInterface {
|
|||
// Throws a wrong_shard_server if the keys in the request or result depend on data outside this server OR if a large
|
||||
// selector offset prevents all data from being read in one range read
|
||||
RequestStream<struct GetKeyValuesRequest> getKeyValues;
|
||||
RequestStream<struct GetKeyValuesAndHopRequest> getKeyValuesAndHop;
|
||||
|
||||
RequestStream<struct GetShardStateRequest> getShardState;
|
||||
RequestStream<struct WaitMetricsRequest> waitMetrics;
|
||||
|
@ -129,6 +131,8 @@ struct StorageServerInterface {
|
|||
RequestStream<struct OverlappingChangeFeedsRequest>(getValue.getEndpoint().getAdjustedEndpoint(15));
|
||||
changeFeedPop =
|
||||
RequestStream<struct ChangeFeedPopRequest>(getValue.getEndpoint().getAdjustedEndpoint(16));
|
||||
getKeyValuesAndHop =
|
||||
RequestStream<struct GetKeyValuesAndHopRequest>(getValue.getEndpoint().getAdjustedEndpoint(17));
|
||||
}
|
||||
} else {
|
||||
ASSERT(Ar::isDeserializing);
|
||||
|
@ -174,6 +178,7 @@ struct StorageServerInterface {
|
|||
streams.push_back(changeFeedStream.getReceiver());
|
||||
streams.push_back(overlappingChangeFeeds.getReceiver());
|
||||
streams.push_back(changeFeedPop.getReceiver());
|
||||
streams.push_back(getKeyValuesAndHop.getReceiver(TaskPriority::LoadBalancedEndpoint));
|
||||
FlowTransport::transport().addEndpoints(streams);
|
||||
}
|
||||
};
|
||||
|
@ -296,6 +301,9 @@ struct GetKeyValuesRequest : TimedRequest {
|
|||
SpanID spanContext;
|
||||
Arena arena;
|
||||
KeySelectorRef begin, end;
|
||||
// This is a dummy field there has never been used.
|
||||
// TODO: Get rid of this by constexpr or other template magic in getRange
|
||||
KeyRef hopInfo = KeyRef();
|
||||
Version version; // or latestVersion
|
||||
int limit, limitBytes;
|
||||
bool isFetchKeys;
|
||||
|
@ -310,6 +318,44 @@ struct GetKeyValuesRequest : TimedRequest {
|
|||
}
|
||||
};
|
||||
|
||||
struct GetKeyValuesAndHopReply : public LoadBalancedReply {
|
||||
constexpr static FileIdentifier file_identifier = 1783067;
|
||||
Arena arena;
|
||||
// The key is the key in the requested range rather than the hop key.
|
||||
VectorRef<KeyValueRef, VecSerStrategy::String> data;
|
||||
Version version; // useful when latestVersion was requested
|
||||
bool more;
|
||||
bool cached = false;
|
||||
|
||||
GetKeyValuesAndHopReply() : version(invalidVersion), more(false), cached(false) {}
|
||||
|
||||
template <class Ar>
|
||||
void serialize(Ar& ar) {
|
||||
serializer(ar, LoadBalancedReply::penalty, LoadBalancedReply::error, data, version, more, cached, arena);
|
||||
}
|
||||
};
|
||||
|
||||
struct GetKeyValuesAndHopRequest : TimedRequest {
|
||||
constexpr static FileIdentifier file_identifier = 6795747;
|
||||
SpanID spanContext;
|
||||
Arena arena;
|
||||
KeySelectorRef begin, end;
|
||||
KeyRef hopInfo;
|
||||
Version version; // or latestVersion
|
||||
int limit, limitBytes;
|
||||
bool isFetchKeys;
|
||||
Optional<TagSet> tags;
|
||||
Optional<UID> debugID;
|
||||
ReplyPromise<GetKeyValuesAndHopReply> reply;
|
||||
|
||||
GetKeyValuesAndHopRequest() : isFetchKeys(false) {}
|
||||
template <class Ar>
|
||||
void serialize(Ar& ar) {
|
||||
serializer(
|
||||
ar, begin, end, hopInfo, version, limit, limitBytes, isFetchKeys, tags, debugID, reply, spanContext, arena);
|
||||
}
|
||||
};
|
||||
|
||||
struct GetKeyValuesStreamReply : public ReplyPromiseStreamReply {
|
||||
constexpr static FileIdentifier file_identifier = 1783066;
|
||||
Arena arena;
|
||||
|
|
|
@ -257,6 +257,23 @@ ThreadFuture<RangeResult> ThreadSafeTransaction::getRange(const KeySelectorRef&
|
|||
});
|
||||
}
|
||||
|
||||
ThreadFuture<RangeResult> ThreadSafeTransaction::getRangeAndHop(const KeySelectorRef& begin,
|
||||
const KeySelectorRef& end,
|
||||
const StringRef& hopInfo,
|
||||
GetRangeLimits limits,
|
||||
bool snapshot,
|
||||
bool reverse) {
|
||||
KeySelector b = begin;
|
||||
KeySelector e = end;
|
||||
Key h = hopInfo;
|
||||
|
||||
ISingleThreadTransaction* tr = this->tr;
|
||||
return onMainThread([tr, b, e, h, limits, snapshot, reverse]() -> Future<RangeResult> {
|
||||
tr->checkDeferredError();
|
||||
return tr->getRangeAndHop(b, e, h, limits, Snapshot{ snapshot }, Reverse{ reverse });
|
||||
});
|
||||
}
|
||||
|
||||
ThreadFuture<Standalone<VectorRef<const char*>>> ThreadSafeTransaction::getAddressesForKey(const KeyRef& key) {
|
||||
Key k = key;
|
||||
|
||||
|
|
|
@ -106,6 +106,12 @@ public:
|
|||
bool reverse = false) override {
|
||||
return getRange(firstGreaterOrEqual(keys.begin), firstGreaterOrEqual(keys.end), limits, snapshot, reverse);
|
||||
}
|
||||
ThreadFuture<RangeResult> getRangeAndHop(const KeySelectorRef& begin,
|
||||
const KeySelectorRef& end,
|
||||
const StringRef& hopInfo,
|
||||
GetRangeLimits limits,
|
||||
bool snapshot,
|
||||
bool reverse) override;
|
||||
ThreadFuture<Standalone<VectorRef<const char*>>> getAddressesForKey(const KeyRef& key) override;
|
||||
ThreadFuture<Standalone<StringRef>> getVersionstamp() override;
|
||||
ThreadFuture<int64_t> getEstimatedRangeSizeBytes(const KeyRangeRef& keys) override;
|
||||
|
|
|
@ -51,10 +51,12 @@ struct TSSMetrics : ReferenceCounted<TSSMetrics>, NonCopyable {
|
|||
ContinuousSample<double> SSgetValueLatency;
|
||||
ContinuousSample<double> SSgetKeyLatency;
|
||||
ContinuousSample<double> SSgetKeyValuesLatency;
|
||||
ContinuousSample<double> SSgetKeyValuesAndHopLatency;
|
||||
|
||||
ContinuousSample<double> TSSgetValueLatency;
|
||||
ContinuousSample<double> TSSgetKeyLatency;
|
||||
ContinuousSample<double> TSSgetKeyValuesLatency;
|
||||
ContinuousSample<double> TSSgetKeyValuesAndHopLatency;
|
||||
|
||||
std::unordered_map<int, uint64_t> ssErrorsByCode;
|
||||
std::unordered_map<int, uint64_t> tssErrorsByCode;
|
||||
|
@ -103,7 +105,8 @@ struct TSSMetrics : ReferenceCounted<TSSMetrics>, NonCopyable {
|
|||
: cc("TSSClientMetrics"), requests("Requests", cc), streamComparisons("StreamComparisons", cc),
|
||||
ssErrors("SSErrors", cc), tssErrors("TSSErrors", cc), tssTimeouts("TSSTimeouts", cc),
|
||||
mismatches("Mismatches", cc), SSgetValueLatency(1000), SSgetKeyLatency(1000), SSgetKeyValuesLatency(1000),
|
||||
TSSgetValueLatency(1000), TSSgetKeyLatency(1000), TSSgetKeyValuesLatency(1000) {}
|
||||
SSgetKeyValuesAndHopLatency(1000), TSSgetValueLatency(1000), TSSgetKeyLatency(1000),
|
||||
TSSgetKeyValuesLatency(1000), TSSgetKeyValuesAndHopLatency(1000) {}
|
||||
};
|
||||
|
||||
template <class Rep>
|
||||
|
|
|
@ -208,6 +208,7 @@ set(FDBSERVER_SRCS
|
|||
workloads/MemoryLifetime.actor.cpp
|
||||
workloads/MetricLogging.actor.cpp
|
||||
workloads/MutationLogReaderCorrectness.actor.cpp
|
||||
workloads/IndexPrefetchDemo.actor.cpp
|
||||
workloads/ParallelRestore.actor.cpp
|
||||
workloads/Performance.actor.cpp
|
||||
workloads/Ping.actor.cpp
|
||||
|
|
|
@ -42,6 +42,7 @@
|
|||
#include "fdbclient/NativeAPI.actor.h"
|
||||
#include "fdbclient/Notified.h"
|
||||
#include "fdbclient/StatusClient.h"
|
||||
#include "fdbclient/Tuple.h"
|
||||
#include "fdbclient/SystemData.h"
|
||||
#include "fdbclient/TransactionLineage.h"
|
||||
#include "fdbclient/VersionedMap.h"
|
||||
|
@ -779,8 +780,9 @@ public:
|
|||
|
||||
struct Counters {
|
||||
CounterCollection cc;
|
||||
Counter allQueries, getKeyQueries, getValueQueries, getRangeQueries, getRangeStreamQueries, finishedQueries,
|
||||
lowPriorityQueries, rowsQueried, bytesQueried, watchQueries, emptyQueries;
|
||||
Counter allQueries, getKeyQueries, getValueQueries, getRangeQueries, getRangeAndHopQueries,
|
||||
getRangeStreamQueries, finishedQueries, lowPriorityQueries, rowsQueried, bytesQueried, watchQueries,
|
||||
emptyQueries;
|
||||
|
||||
// Bytes of the mutations that have been added to the memory of the storage server. When the data is durable
|
||||
// and cleared from the memory, we do not subtract it but add it to bytesDurable.
|
||||
|
@ -807,6 +809,9 @@ public:
|
|||
Counter wrongShardServer;
|
||||
Counter fetchedVersions;
|
||||
Counter fetchesFromLogs;
|
||||
// The following counters measure how many of "hop"s in the getRangeAndHopQueries are effective. "Miss" means
|
||||
// fallback if fallback is enabled, otherwise means failure (so that another layer could implement fallback).
|
||||
Counter quickGetValueHit, quickGetValueMiss, quickGetKeyValuesHit, quickGetKeyValuesMiss;
|
||||
|
||||
LatencySample readLatencySample;
|
||||
LatencyBands readLatencyBands;
|
||||
|
@ -814,22 +819,25 @@ public:
|
|||
Counters(StorageServer* self)
|
||||
: cc("StorageServer", self->thisServerID.toString()), allQueries("QueryQueue", cc),
|
||||
getKeyQueries("GetKeyQueries", cc), getValueQueries("GetValueQueries", cc),
|
||||
getRangeQueries("GetRangeQueries", cc), getRangeStreamQueries("GetRangeStreamQueries", cc),
|
||||
finishedQueries("FinishedQueries", cc), lowPriorityQueries("LowPriorityQueries", cc),
|
||||
rowsQueried("RowsQueried", cc), bytesQueried("BytesQueried", cc), watchQueries("WatchQueries", cc),
|
||||
emptyQueries("EmptyQueries", cc), bytesInput("BytesInput", cc), bytesDurable("BytesDurable", cc),
|
||||
bytesFetched("BytesFetched", cc), mutationBytes("MutationBytes", cc),
|
||||
sampledBytesCleared("SampledBytesCleared", cc), kvFetched("KVFetched", cc), mutations("Mutations", cc),
|
||||
setMutations("SetMutations", cc), clearRangeMutations("ClearRangeMutations", cc),
|
||||
atomicMutations("AtomicMutations", cc), updateBatches("UpdateBatches", cc),
|
||||
updateVersions("UpdateVersions", cc), loops("Loops", cc), fetchWaitingMS("FetchWaitingMS", cc),
|
||||
fetchWaitingCount("FetchWaitingCount", cc), fetchExecutingMS("FetchExecutingMS", cc),
|
||||
fetchExecutingCount("FetchExecutingCount", cc), readsRejected("ReadsRejected", cc),
|
||||
wrongShardServer("WrongShardServer", cc), fetchedVersions("FetchedVersions", cc),
|
||||
fetchesFromLogs("FetchesFromLogs", cc), readLatencySample("ReadLatencyMetrics",
|
||||
self->thisServerID,
|
||||
SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL,
|
||||
SERVER_KNOBS->LATENCY_SAMPLE_SIZE),
|
||||
getRangeQueries("GetRangeQueries", cc), getRangeAndHopQueries("GetRangeAndHopQueries", cc),
|
||||
getRangeStreamQueries("GetRangeStreamQueries", cc), finishedQueries("FinishedQueries", cc),
|
||||
lowPriorityQueries("LowPriorityQueries", cc), rowsQueried("RowsQueried", cc),
|
||||
bytesQueried("BytesQueried", cc), watchQueries("WatchQueries", cc), emptyQueries("EmptyQueries", cc),
|
||||
bytesInput("BytesInput", cc), bytesDurable("BytesDurable", cc), bytesFetched("BytesFetched", cc),
|
||||
mutationBytes("MutationBytes", cc), sampledBytesCleared("SampledBytesCleared", cc),
|
||||
kvFetched("KVFetched", cc), mutations("Mutations", cc), setMutations("SetMutations", cc),
|
||||
clearRangeMutations("ClearRangeMutations", cc), atomicMutations("AtomicMutations", cc),
|
||||
updateBatches("UpdateBatches", cc), updateVersions("UpdateVersions", cc), loops("Loops", cc),
|
||||
fetchWaitingMS("FetchWaitingMS", cc), fetchWaitingCount("FetchWaitingCount", cc),
|
||||
fetchExecutingMS("FetchExecutingMS", cc), fetchExecutingCount("FetchExecutingCount", cc),
|
||||
readsRejected("ReadsRejected", cc), wrongShardServer("WrongShardServer", cc),
|
||||
fetchedVersions("FetchedVersions", cc), fetchesFromLogs("FetchesFromLogs", cc),
|
||||
quickGetValueHit("QuickGetValueHit", cc), quickGetValueMiss("QuickGetValueMiss", cc),
|
||||
quickGetKeyValuesHit("QuickGetKeyValuesHit", cc), quickGetKeyValuesMiss("QuickGetKeyValuesMiss", cc),
|
||||
readLatencySample("ReadLatencyMetrics",
|
||||
self->thisServerID,
|
||||
SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL,
|
||||
SERVER_KNOBS->LATENCY_SAMPLE_SIZE),
|
||||
readLatencyBands("ReadLatencyBands", self->thisServerID, SERVER_KNOBS->STORAGE_LOGGING_DELAY) {
|
||||
specialCounter(cc, "LastTLogVersion", [self]() { return self->lastTLogVersion; });
|
||||
specialCounter(cc, "Version", [self]() { return self->version.get(); });
|
||||
|
@ -1985,6 +1993,37 @@ void merge(Arena& arena,
|
|||
}
|
||||
}
|
||||
|
||||
ACTOR Future<Optional<Value>> quickGetValue(StorageServer* data, StringRef key, Version version) {
|
||||
if (data->shards[key]->isReadable()) {
|
||||
try {
|
||||
// TODO: Use a lower level API may be better? Or tweak priorities?
|
||||
GetValueRequest req(Span().context, key, version, Optional<TagSet>(), Optional<UID>());
|
||||
data->actors.add(data->readGuard(req, getValueQ));
|
||||
GetValueReply reply = wait(req.reply.getFuture());
|
||||
++data->counters.quickGetValueHit;
|
||||
return reply.value;
|
||||
} catch (Error& e) {
|
||||
// Fallback.
|
||||
}
|
||||
} else {
|
||||
// Fallback.
|
||||
}
|
||||
|
||||
++data->counters.quickGetValueMiss;
|
||||
if (SERVER_KNOBS->QUICK_GET_VALUE_FALLBACK) {
|
||||
state Transaction tr(data->cx);
|
||||
tr.setVersion(version);
|
||||
// TODO: is DefaultPromiseEndpoint the best priority for this?
|
||||
tr.info.taskID = TaskPriority::DefaultPromiseEndpoint;
|
||||
Future<Optional<Value>> valueFuture = tr.get(key, Snapshot::True);
|
||||
// TODO: async in case it needs to read from other servers.
|
||||
state Optional<Value> valueOption = wait(valueFuture);
|
||||
return valueOption;
|
||||
} else {
|
||||
throw hop_quick_get_value_miss();
|
||||
}
|
||||
};
|
||||
|
||||
// If limit>=0, it returns the first rows in the range (sorted ascending), otherwise the last rows (sorted descending).
|
||||
// readRange has O(|result|) + O(log |data|) cost
|
||||
ACTOR Future<GetKeyValuesReply> readRange(StorageServer* data,
|
||||
|
@ -2470,6 +2509,440 @@ ACTOR Future<Void> getKeyValuesQ(StorageServer* data, GetKeyValuesRequest req)
|
|||
return Void();
|
||||
}
|
||||
|
||||
ACTOR Future<RangeResult> quickGetKeyValues(StorageServer* data, StringRef prefix, Version version) {
|
||||
try {
|
||||
// TODO: Use a lower level API may be better? Or tweak priorities?
|
||||
GetKeyValuesRequest req;
|
||||
req.spanContext = Span().context;
|
||||
req.arena = Arena();
|
||||
req.begin = firstGreaterOrEqual(KeyRef(req.arena, prefix));
|
||||
req.end = firstGreaterOrEqual(strinc(prefix, req.arena));
|
||||
req.version = version;
|
||||
|
||||
data->actors.add(data->readGuard(req, getKeyValuesQ));
|
||||
GetKeyValuesReply reply = wait(req.reply.getFuture());
|
||||
++data->counters.quickGetKeyValuesHit;
|
||||
|
||||
// Convert GetKeyValuesReply to RangeResult.
|
||||
return RangeResult(RangeResultRef(reply.data, reply.more), reply.arena);
|
||||
} catch (Error& e) {
|
||||
// Fallback.
|
||||
}
|
||||
|
||||
++data->counters.quickGetKeyValuesMiss;
|
||||
if (SERVER_KNOBS->QUICK_GET_KEY_VALUES_FALLBACK) {
|
||||
state Transaction tr(data->cx);
|
||||
tr.setVersion(version);
|
||||
// TODO: is DefaultPromiseEndpoint the best priority for this?
|
||||
tr.info.taskID = TaskPriority::DefaultPromiseEndpoint;
|
||||
Future<RangeResult> rangeResultFuture = tr.getRange(prefixRange(prefix), Snapshot::True);
|
||||
// TODO: async in case it needs to read from other servers.
|
||||
RangeResult rangeResult = wait(rangeResultFuture);
|
||||
return rangeResult;
|
||||
} else {
|
||||
throw hop_quick_get_key_values_miss();
|
||||
}
|
||||
};
|
||||
|
||||
// Build the "hop" key for one record by instantiating the hop-info format tuple
// against the record's key and value.
//
// Each BYTES/UTF8 element of hopKeyFormatTuple may be:
//   * "{K[i]}"  -> replaced with element i of the record key (unpacked as a tuple)
//   * "{V[i]}"  -> replaced with element i of the record value (unpacked as a tuple)
//   * "{...}"   -> marks a range query; must be the last element and is not emitted
//   * text containing "{{" / "}}" -> emitted with the escapes collapsed to "{" / "}"
//   * any other string, and every non-string element -> copied verbatim
//
// Sets isRangeQuery to true iff the format ends with "{...}".
// Throws hop_bad_index for an unparsable or out-of-range index and
// hop_bad_range_decriptor when "{...}" is not the last element.
Key constructHopKey(KeyValueRef* keyValue, Tuple& hopKeyFormatTuple, bool& isRangeQuery) {
	// Lazily parse key and/or value to tuple because they may not need to be a tuple if not used.
	Optional<Tuple> keyTuple;
	Optional<Tuple> valueTuple;

	Tuple hopKeyTuple;
	for (int i = 0; i < hopKeyFormatTuple.size(); i++) {
		Tuple::ElementType type = hopKeyFormatTuple.getType(i);
		if (type == Tuple::BYTES || type == Tuple::UTF8) {
			std::string s = hopKeyFormatTuple.getString(i).toString();
			auto sz = s.size();

			// Handle escape: collapse "{{" -> "{" and "}}" -> "}".
			bool escaped = false;
			size_t p = 0;
			while (true) {
				size_t found = s.find("{{", p);
				if (found == std::string::npos) {
					break;
				}
				s.replace(found, 2, "{");
				// Resume scanning immediately after the replacement character.
				// Advancing by only one from the previous position could pair the
				// replacement with a following brace across the splice, e.g. turning
				// "x{{{" into "x{" instead of the correct "x{{".
				p = found + 1;
				escaped = true;
			}
			p = 0;
			while (true) {
				size_t found = s.find("}}", p);
				if (found == std::string::npos) {
					break;
				}
				s.replace(found, 2, "}");
				p = found + 1;
				escaped = true;
			}
			if (escaped) {
				// If the element uses escape, copy the escaped version.
				hopKeyTuple.append(s);
			}
			// {K[??]} or {V[??]}
			else if (sz > 5 && s[0] == '{' && (s[1] == 'K' || s[1] == 'V') && s[2] == '[' && s[sz - 2] == ']' &&
			         s[sz - 1] == '}') {
				int idx;
				try {
					idx = std::stoi(s.substr(3, sz - 5));
				} catch (std::exception& e) {
					throw hop_bad_index();
				}
				Tuple* referenceTuple;
				if (s[1] == 'K') {
					// Use keyTuple as reference.
					if (!keyTuple.present()) {
						// May throw exception if the key is not parsable as a tuple.
						keyTuple = Tuple::unpack(keyValue->key);
					}
					referenceTuple = &keyTuple.get();
				} else if (s[1] == 'V') {
					// Use valueTuple as reference.
					if (!valueTuple.present()) {
						// May throw exception if the value is not parsable as a tuple.
						valueTuple = Tuple::unpack(keyValue->value);
					}
					referenceTuple = &valueTuple.get();
				} else {
					// Unreachable: s[1] was checked to be 'K' or 'V' above.
					ASSERT(false);
					throw internal_error();
				}

				if (idx < 0 || idx >= referenceTuple->size()) {
					throw hop_bad_index();
				}
				hopKeyTuple.append(referenceTuple->subTuple(idx, idx + 1));
			} else if (s == "{...}") {
				// Range query.
				if (i != hopKeyFormatTuple.size() - 1) {
					// It must be the last element of the hop info tuple
					throw hop_bad_range_decriptor();
				}
				// Every record will try to set it. It's ugly, but not wrong.
				isRangeQuery = true;
				// Do not add it to the hop key.
			} else {
				// If the element is a string but neither escaped nor descriptors, just copy it.
				hopKeyTuple.append(hopKeyFormatTuple.subTuple(i, i + 1));
			}
		} else {
			// If the element not a string, just copy it.
			hopKeyTuple.append(hopKeyFormatTuple.subTuple(i, i + 1));
		}
	}
	return hopKeyTuple.getDataAsStandalone();
}
|
||||
|
||||
// Unit tests for constructHopKey: descriptor substitution, brace escaping, the
// range marker, and the error paths for bad indexes and misplaced "{...}".
TEST_CASE("/fdbserver/storageserver/constructHopKey") {
	Key key = Tuple().append("key-0"_sr).append("key-1"_sr).append("key-2"_sr).getDataAsStandalone();
	Value value = Tuple().append("value-0"_sr).append("value-1"_sr).append("value-2"_sr).getDataAsStandalone();
	state KeyValueRef kvr(key, value);
	{
		// Every element kind at once: literal, escaped braces, key index, value index, range marker.
		Tuple hopInfoTuple = Tuple()
		                         .append("normal"_sr)
		                         .append("{{escaped}}"_sr)
		                         .append("{K[2]}"_sr)
		                         .append("{V[0]}"_sr)
		                         .append("{...}"_sr);

		bool isRangeQuery = false;
		Key hopKey = constructHopKey(&kvr, hopInfoTuple, isRangeQuery);

		Key expectedHopKey = Tuple()
		                         .append("normal"_sr)
		                         .append("{escaped}"_sr)
		                         .append("key-2"_sr)
		                         .append("value-0"_sr)
		                         .getDataAsStandalone();
		// std::cout << printable(hopKey) << " == " << printable(expectedHopKey) << std::endl;
		ASSERT(hopKey.compare(expectedHopKey) == 0);
		ASSERT(isRangeQuery == true);
	}
	{
		// Escape-only elements. (A second identical copy of this block was removed —
		// it was an exact duplicate and exercised nothing new.)
		Tuple hopInfoTuple = Tuple().append("{{{{}}"_sr).append("}}"_sr);

		bool isRangeQuery = false;
		Key hopKey = constructHopKey(&kvr, hopInfoTuple, isRangeQuery);

		Key expectedHopKey = Tuple().append("{{}"_sr).append("}"_sr).getDataAsStandalone();
		// std::cout << printable(hopKey) << " == " << printable(expectedHopKey) << std::endl;
		ASSERT(hopKey.compare(expectedHopKey) == 0);
		ASSERT(isRangeQuery == false);
	}
	{
		// Out-of-range key index must throw hop_bad_index.
		Tuple hopInfoTuple = Tuple().append("{K[100]}"_sr);
		bool isRangeQuery = false;
		state bool throwException = false;
		try {
			Key hopKey = constructHopKey(&kvr, hopInfoTuple, isRangeQuery);
		} catch (Error& e) {
			ASSERT(e.code() == error_code_hop_bad_index);
			throwException = true;
		}
		ASSERT(throwException);
	}
	{
		// "{...}" anywhere but the last position must throw hop_bad_range_decriptor.
		Tuple hopInfoTuple = Tuple().append("{...}"_sr).append("last-element"_sr);
		bool isRangeQuery = false;
		state bool throwException2 = false;
		try {
			Key hopKey = constructHopKey(&kvr, hopInfoTuple, isRangeQuery);
		} catch (Error& e) {
			ASSERT(e.code() == error_code_hop_bad_range_decriptor);
			throwException2 = true;
		}
		ASSERT(throwException2);
	}
	{
		// Non-numeric index must throw hop_bad_index.
		Tuple hopInfoTuple = Tuple().append("{K[not-a-number]}"_sr);
		bool isRangeQuery = false;
		state bool throwException3 = false;
		try {
			Key hopKey = constructHopKey(&kvr, hopInfoTuple, isRangeQuery);
		} catch (Error& e) {
			ASSERT(e.code() == error_code_hop_bad_index);
			throwException3 = true;
		}
		ASSERT(throwException3);
	}
	return Void();
}
|
||||
|
||||
// Post-process a raw range-read reply: for every row, build its hop key from
// hopInfo (see constructHopKey) and replace the row with the key/value(s) the hop
// key points at, fetched locally via quickGetValue / quickGetKeyValues.
// When the hop info ends with "{...}", each hop key is treated as a range prefix
// and all of its rows are flattened into the reply.
ACTOR Future<GetKeyValuesAndHopReply> hop(StorageServer* data, GetKeyValuesReply input, StringRef hopInfo) {
	state GetKeyValuesAndHopReply result;
	result.version = input.version;
	result.more = input.more;
	result.cached = input.cached;
	// Keep the input rows alive: hop keys are built from memory owned by input.arena.
	result.arena.dependsOn(input.arena);

	result.data.reserve(result.arena, input.data.size());
	state bool isRangeQuery = false;
	state Tuple hopKeyFormatTuple = Tuple::unpack(hopInfo);
	state KeyValueRef* it = input.data.begin();
	for (; it != input.data.end(); it++) {
		// (A dead `state StringRef key = it->key;` was removed here — it was never read.)
		state Key hopKey = constructHopKey(it, hopKeyFormatTuple, isRangeQuery);
		// Make sure the hopKey is always available, so that it's good even we want to get key asynchronously.
		result.arena.dependsOn(hopKey.arena());

		if (isRangeQuery) {
			// Use the hopKey as the prefix of the range query.
			RangeResult rangeResult = wait(quickGetKeyValues(data, hopKey, input.version));

			if (rangeResult.more) {
				// Probably the fan out is too large. The user should use the old way to query.
				throw hop_quick_get_key_values_has_more();
			}
			result.arena.dependsOn(rangeResult.arena());
			for (int i = 0; i < rangeResult.size(); i++) {
				result.data.emplace_back(result.arena, rangeResult[i].key, rangeResult[i].value);
			}
		} else {
			// Point lookup: the hop key itself becomes the returned key.
			Optional<Value> valueOption = wait(quickGetValue(data, hopKey, input.version));

			if (valueOption.present()) {
				Value value = valueOption.get();
				result.arena.dependsOn(value.arena());
				result.data.emplace_back(result.arena, hopKey, value);
			} else {
				// TODO: Shall we throw exception if the key doesn't exist or the range is empty?
				// throw hop_no_such_key();
			}
		}
	}
	return result;
}
|
||||
|
||||
// Most of the actor is copied from getKeyValuesQ. I tried to use templates but things become nearly impossible after
// combining actor shenanigans with template shenanigans.
//
// Serves one GetKeyValuesAndHop request: resolves the request's key selectors,
// performs the underlying range read, then maps every row through hop() before
// replying. Shares the counters/latency bookkeeping pattern of getKeyValuesQ.
ACTOR Future<Void> getKeyValuesAndHopQ(StorageServer* data, GetKeyValuesAndHopRequest req)
// Throws a wrong_shard_server if the keys in the request or result depend on data outside this server OR if a large
// selector offset prevents all data from being read in one range read
{
	state Span span("SS:getKeyValuesAndHop"_loc, { req.spanContext });
	state int64_t resultSize = 0;
	state IKeyValueStore::ReadType type =
	    req.isFetchKeys ? IKeyValueStore::ReadType::FETCH : IKeyValueStore::ReadType::NORMAL;
	getCurrentLineage()->modify(&TransactionLineage::txID) = req.spanContext.first();

	++data->counters.getRangeAndHopQueries;
	++data->counters.allQueries;
	++data->readQueueSizeMetric;
	data->maxQueryQueue = std::max<int>(
	    data->maxQueryQueue, data->counters.allQueries.getValue() - data->counters.finishedQueries.getValue());

	// Active load balancing runs at a very high priority (to obtain accurate queue lengths)
	// so we need to downgrade here
	if (SERVER_KNOBS->FETCH_KEYS_LOWER_PRIORITY && req.isFetchKeys) {
		wait(delay(0, TaskPriority::FetchKeys));
	} else {
		wait(data->getQueryDelay());
	}

	try {
		if (req.debugID.present())
			g_traceBatch.addEvent(
			    "TransactionDebug", req.debugID.get().first(), "storageserver.getKeyValuesAndHop.Before");
		state Version version = wait(waitForVersion(data, req.version, span.context));

		// Snapshot the shard-change counter so we can detect a shard boundary change
		// that raced with this read (checkChangeCounter below throws in that case).
		state uint64_t changeCounter = data->shardChangeCounter;
		// try {
		state KeyRange shard = getShardKeyRange(data, req.begin);

		if (req.debugID.present())
			g_traceBatch.addEvent(
			    "TransactionDebug", req.debugID.get().first(), "storageserver.getKeyValuesAndHop.AfterVersion");
		//.detail("ShardBegin", shard.begin).detail("ShardEnd", shard.end);
		//} catch (Error& e) { TraceEvent("WrongShardServer", data->thisServerID).detail("Begin",
		// req.begin.toString()).detail("End", req.end.toString()).detail("Version", version).detail("Shard",
		//"None").detail("In", "getKeyValuesAndHop>getShardKeyRange"); throw e; }

		if (!selectorInRange(req.end, shard) && !(req.end.isFirstGreaterOrEqual() && req.end.getKey() == shard.end)) {
			// TraceEvent("WrongShardServer1", data->thisServerID).detail("Begin",
			// req.begin.toString()).detail("End", req.end.toString()).detail("Version", version).detail("ShardBegin",
			// shard.begin).detail("ShardEnd", shard.end).detail("In", "getKeyValuesAndHop>checkShardExtents");
			throw wrong_shard_server();
		}

		state int offset1 = 0;
		// NOTE(review): offset2 is left uninitialized and is only written by findKey;
		// when req.end.isFirstGreaterOrEqual() it is read below with an indeterminate
		// value. getKeyValuesQ has the same shape — confirm this is intended.
		state int offset2;
		state Future<Key> fBegin = req.begin.isFirstGreaterOrEqual()
		                               ? Future<Key>(req.begin.getKey())
		                               : findKey(data, req.begin, version, shard, &offset1, span.context, type);
		state Future<Key> fEnd = req.end.isFirstGreaterOrEqual()
		                             ? Future<Key>(req.end.getKey())
		                             : findKey(data, req.end, version, shard, &offset2, span.context, type);
		state Key begin = wait(fBegin);
		state Key end = wait(fEnd);

		if (req.debugID.present())
			g_traceBatch.addEvent(
			    "TransactionDebug", req.debugID.get().first(), "storageserver.getKeyValuesAndHop.AfterKeys");
		//.detail("Off1",offset1).detail("Off2",offset2).detail("ReqBegin",req.begin.getKey()).detail("ReqEnd",req.end.getKey());

		// Offsets of zero indicate begin/end keys in this shard, which obviously means we can answer the query
		// An end offset of 1 is also OK because the end key is exclusive, so if the first key of the next shard is the
		// end the last actual key returned must be from this shard. A begin offset of 1 is also OK because then either
		// begin is past end or equal to end (so the result is definitely empty)
		if ((offset1 && offset1 != 1) || (offset2 && offset2 != 1)) {
			TEST(true); // wrong_shard_server due to offset in getKeyValuesWithHopQ
			// We could detect when offset1 takes us off the beginning of the database or offset2 takes us off the end,
			// and return a clipped range rather than an error (since that is what the NativeAPI.getRange will do anyway
			// via its "slow path"), but we would have to add some flags to the response to encode whether we went off
			// the beginning and the end, since it needs that information.
			//TraceEvent("WrongShardServer2", data->thisServerID).detail("Begin", req.begin.toString()).detail("End", req.end.toString()).detail("Version", version).detail("ShardBegin", shard.begin).detail("ShardEnd", shard.end).detail("In", "getKeyValuesAndHop>checkOffsets").detail("BeginKey", begin).detail("EndKey", end).detail("BeginOffset", offset1).detail("EndOffset", offset2);
			throw wrong_shard_server();
		}

		if (begin >= end) {
			// Empty range: reply immediately without touching storage.
			if (req.debugID.present())
				g_traceBatch.addEvent(
				    "TransactionDebug", req.debugID.get().first(), "storageserver.getKeyValuesAndHop.Send");
			//.detail("Begin",begin).detail("End",end);

			GetKeyValuesAndHopReply none;
			none.version = version;
			none.more = false;
			none.penalty = data->getPenalty();

			data->checkChangeCounter(changeCounter,
			                         KeyRangeRef(std::min<KeyRef>(req.begin.getKey(), req.end.getKey()),
			                                     std::max<KeyRef>(req.begin.getKey(), req.end.getKey())));
			req.reply.send(none);
		} else {
			state int remainingLimitBytes = req.limitBytes;

			GetKeyValuesReply _r = wait(
			    readRange(data, version, KeyRangeRef(begin, end), req.limit, &remainingLimitBytes, span.context, type));

			// Hop!!!
			state GetKeyValuesAndHopReply r = wait(hop(data, _r, req.hopInfo));

			if (req.debugID.present())
				g_traceBatch.addEvent(
				    "TransactionDebug", req.debugID.get().first(), "storageserver.getKeyValuesAndHop.AfterReadRange");
			//.detail("Begin",begin).detail("End",end).detail("SizeOf",r.data.size());
			data->checkChangeCounter(
			    changeCounter,
			    KeyRangeRef(std::min<KeyRef>(begin, std::min<KeyRef>(req.begin.getKey(), req.end.getKey())),
			                std::max<KeyRef>(end, std::max<KeyRef>(req.begin.getKey(), req.end.getKey()))));
			if (EXPENSIVE_VALIDATION) {
				// TODO: Only hop keys are returned, which are not supposed to be in the range.
				// for (int i = 0; i < r.data.size(); i++)
				// 	ASSERT(r.data[i].key >= begin && r.data[i].key < end);
				// TODO: GetKeyValuesWithHopRequest doesn't respect limit yet.
				// ASSERT(r.data.size() <= std::abs(req.limit));
			}

			/*for( int i = 0; i < r.data.size(); i++ ) {
			    StorageMetrics m;
			    m.bytesPerKSecond = r.data[i].expectedSize();
			    m.iosPerKSecond = 1; //FIXME: this should be 1/r.data.size(), but we cannot do that because it is an int
			    data->metrics.notify(r.data[i].key, m);
			}*/

			// For performance concerns, the cost of a range read is billed to the start key and end key of the range.
			int64_t totalByteSize = 0;
			for (int i = 0; i < r.data.size(); i++) {
				totalByteSize += r.data[i].expectedSize();
			}
			if (totalByteSize > 0 && SERVER_KNOBS->READ_SAMPLING_ENABLED) {
				int64_t bytesReadPerKSecond = std::max(totalByteSize, SERVER_KNOBS->EMPTY_READ_PENALTY) / 2;
				data->metrics.notifyBytesReadPerKSecond(r.data[0].key, bytesReadPerKSecond);
				data->metrics.notifyBytesReadPerKSecond(r.data[r.data.size() - 1].key, bytesReadPerKSecond);
			}

			r.penalty = data->getPenalty();
			req.reply.send(r);

			resultSize = req.limitBytes - remainingLimitBytes;
			data->counters.bytesQueried += resultSize;
			data->counters.rowsQueried += r.data.size();
			if (r.data.size() == 0) {
				++data->counters.emptyQueries;
			}
		}
	} catch (Error& e) {
		if (!canReplyWith(e))
			throw;
		data->sendErrorWithPenalty(req.reply, e, data->getPenalty());
	}

	data->transactionTagCounter.addRequest(req.tags, resultSize);
	++data->counters.finishedQueries;
	--data->readQueueSizeMetric;

	double duration = g_network->timer() - req.requestTime();
	data->counters.readLatencySample.addMeasurement(duration);
	if (data->latencyBandConfig.present()) {
		int maxReadBytes =
		    data->latencyBandConfig.get().readConfig.maxReadBytes.orDefault(std::numeric_limits<int>::max());
		int maxSelectorOffset =
		    data->latencyBandConfig.get().readConfig.maxKeySelectorOffset.orDefault(std::numeric_limits<int>::max());
		data->counters.readLatencyBands.addMeasurement(duration,
		                                               resultSize > maxReadBytes ||
		                                                   abs(req.begin.offset) > maxSelectorOffset ||
		                                                   abs(req.end.offset) > maxSelectorOffset);
	}

	return Void();
}
|
||||
|
||||
ACTOR Future<Void> getKeyValuesStreamQ(StorageServer* data, GetKeyValuesStreamRequest req)
|
||||
// Throws a wrong_shard_server if the keys in the request or result depend on data outside this server OR if a large
|
||||
// selector offset prevents all data from being read in one range read
|
||||
|
@ -5690,6 +6163,19 @@ ACTOR Future<Void> serveGetKeyValuesRequests(StorageServer* self, FutureStream<G
|
|||
}
|
||||
}
|
||||
|
||||
// Dispatch loop for the getKeyValuesAndHop endpoint: pulls each request off the
// interface stream and hands it to getKeyValuesAndHopQ behind the read guard.
ACTOR Future<Void> serveGetKeyValuesAndHopRequests(StorageServer* self,
                                                   FutureStream<GetKeyValuesAndHopRequest> getKeyValuesAndHop) {
	// TODO: Is it fine to keep TransactionLineage::Operation::GetKeyValues here?
	getCurrentLineage()->modify(&TransactionLineage::operation) = TransactionLineage::Operation::GetKeyValues;
	loop {
		GetKeyValuesAndHopRequest request = waitNext(getKeyValuesAndHop);

		// Warning: requests arrive at extremely high priority (TaskPriority::LoadBalancedEndpoint),
		// so the handler must downgrade before doing real work.
		self->actors.add(self->readGuard(request, getKeyValuesAndHopQ));
	}
}
|
||||
|
||||
ACTOR Future<Void> serveGetKeyValuesStreamRequests(StorageServer* self,
|
||||
FutureStream<GetKeyValuesStreamRequest> getKeyValuesStream) {
|
||||
loop {
|
||||
|
@ -5889,6 +6375,7 @@ ACTOR Future<Void> storageServerCore(StorageServer* self, StorageServerInterface
|
|||
self->actors.add(checkBehind(self));
|
||||
self->actors.add(serveGetValueRequests(self, ssi.getValue.getFuture()));
|
||||
self->actors.add(serveGetKeyValuesRequests(self, ssi.getKeyValues.getFuture()));
|
||||
self->actors.add(serveGetKeyValuesAndHopRequests(self, ssi.getKeyValuesAndHop.getFuture()));
|
||||
self->actors.add(serveGetKeyValuesStreamRequests(self, ssi.getKeyValuesStream.getFuture()));
|
||||
self->actors.add(serveGetKeyRequests(self, ssi.getKey.getFuture()));
|
||||
self->actors.add(serveWatchValueRequests(self, ssi.watchValue.getFuture()));
|
||||
|
|
|
@ -1097,6 +1097,7 @@ ACTOR Future<Void> storageServerRollbackRebooter(std::set<std::pair<UID, KeyValu
|
|||
DUMPTOKEN(recruited.getValue);
|
||||
DUMPTOKEN(recruited.getKey);
|
||||
DUMPTOKEN(recruited.getKeyValues);
|
||||
DUMPTOKEN(recruited.getKeyValuesAndHop);
|
||||
DUMPTOKEN(recruited.getShardState);
|
||||
DUMPTOKEN(recruited.waitMetrics);
|
||||
DUMPTOKEN(recruited.splitMetrics);
|
||||
|
@ -1108,6 +1109,7 @@ ACTOR Future<Void> storageServerRollbackRebooter(std::set<std::pair<UID, KeyValu
|
|||
DUMPTOKEN(recruited.getKeyValueStoreType);
|
||||
DUMPTOKEN(recruited.watchValue);
|
||||
DUMPTOKEN(recruited.getKeyValuesStream);
|
||||
DUMPTOKEN(recruited.getKeyValuesAndHop);
|
||||
|
||||
prevStorageServer =
|
||||
storageServer(store, recruited, db, folder, Promise<Void>(), Reference<IClusterConnectionRecord>(nullptr));
|
||||
|
@ -1478,6 +1480,7 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
|
|||
DUMPTOKEN(recruited.getKeyValueStoreType);
|
||||
DUMPTOKEN(recruited.watchValue);
|
||||
DUMPTOKEN(recruited.getKeyValuesStream);
|
||||
DUMPTOKEN(recruited.getKeyValuesAndHop);
|
||||
|
||||
Promise<Void> recovery;
|
||||
Future<Void> f = storageServer(kv, recruited, dbInfo, folder, recovery, connRecord);
|
||||
|
@ -1574,6 +1577,7 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
|
|||
DUMPTOKEN(recruited.getValue);
|
||||
DUMPTOKEN(recruited.getKey);
|
||||
DUMPTOKEN(recruited.getKeyValues);
|
||||
DUMPTOKEN(recruited.getKeyValuesAndHop);
|
||||
DUMPTOKEN(recruited.getShardState);
|
||||
DUMPTOKEN(recruited.waitMetrics);
|
||||
DUMPTOKEN(recruited.splitMetrics);
|
||||
|
@ -1931,6 +1935,7 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
|
|||
DUMPTOKEN(recruited.getKeyValueStoreType);
|
||||
DUMPTOKEN(recruited.watchValue);
|
||||
DUMPTOKEN(recruited.getKeyValuesStream);
|
||||
DUMPTOKEN(recruited.getKeyValuesAndHop);
|
||||
// printf("Recruited as storageServer\n");
|
||||
|
||||
std::string filename =
|
||||
|
|
|
@ -0,0 +1,144 @@
|
|||
/*
|
||||
* IndexPrefetchDemo.actor.cpp
|
||||
*
|
||||
* This source file is part of the FoundationDB open source project
|
||||
*
|
||||
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#include <cstdint>
|
||||
#include <limits>
|
||||
#include "fdbrpc/simulator.h"
|
||||
#include "fdbclient/MutationLogReader.actor.h"
|
||||
#include "fdbclient/Tuple.h"
|
||||
#include "fdbserver/workloads/workloads.actor.h"
|
||||
#include "flow/Error.h"
|
||||
#include "flow/IRandom.h"
|
||||
#include "flow/flow.h"
|
||||
#include "flow/actorcompiler.h" // This must be the last #include.
|
||||
|
||||
const KeyRef prefix = "prefix"_sr;
|
||||
const KeyRef RECORD = "RECORD"_sr;
|
||||
const KeyRef INDEX = "INDEX"_sr;
|
||||
|
||||
struct IndexPrefetchDemoWorkload : TestWorkload {
|
||||
bool enabled;
|
||||
|
||||
IndexPrefetchDemoWorkload(WorkloadContext const& wcx) : TestWorkload(wcx) {
|
||||
enabled = !clientId; // only do this on the "first" client
|
||||
}
|
||||
|
||||
std::string description() const override { return "IndexPrefetchDemo"; }
|
||||
|
||||
Future<Void> start(Database const& cx) override {
|
||||
if (enabled) {
|
||||
return _start(cx, this);
|
||||
}
|
||||
return Void();
|
||||
}
|
||||
|
||||
static KeyRef primaryKey(int i) { return KeyRef("primary-key-of-record-" + std::to_string(i)); }
|
||||
static KeyRef indexKey(int i) { return KeyRef("index-key-of-record-" + std::to_string(i)); }
|
||||
static KeyRef data(int i) { return KeyRef("data-of-record-" + std::to_string(i)); }
|
||||
|
||||
ACTOR Future<Void> fillInRecords(Database cx, int n) {
|
||||
std::cout << "start fillInRecords n=" << n << std::endl;
|
||||
// TODO: When n is large, split into multiple transactions.
|
||||
state Transaction tr(cx);
|
||||
try {
|
||||
tr.reset();
|
||||
for (int i = 0; i < n; i++) {
|
||||
tr.set(Tuple().append(prefix).append(RECORD).append(primaryKey(i)).pack(),
|
||||
Tuple().append(data(i)).pack());
|
||||
tr.set(Tuple().append(prefix).append(INDEX).append(indexKey(i)).append(primaryKey(i)).pack(),
|
||||
Tuple().pack());
|
||||
}
|
||||
wait(tr.commit());
|
||||
std::cout << "finished fillInRecords" << std::endl;
|
||||
} catch (Error& e) {
|
||||
std::cout << "failed fillInRecords" << std::endl;
|
||||
wait(tr.onError(e));
|
||||
}
|
||||
return Void();
|
||||
}
|
||||
|
||||
static void showResult(const RangeResult& result) {
|
||||
std::cout << "result size: " << result.size() << std::endl;
|
||||
const KeyValueRef* it = result.begin();
|
||||
for (; it != result.end(); it++) {
|
||||
std::cout << "key=" << it->key.printable() << ", value=" << it->value.printable() << std::endl;
|
||||
}
|
||||
}
|
||||
|
||||
ACTOR Future<Void> scanRange(Database cx, KeyRangeRef range) {
|
||||
std::cout << "start scanRange " << range.toString() << std::endl;
|
||||
// TODO: When n is large, split into multiple transactions.
|
||||
state Transaction tr(cx);
|
||||
try {
|
||||
tr.reset();
|
||||
RangeResult result = wait(tr.getRange(range, CLIENT_KNOBS->TOO_MANY));
|
||||
showResult(result);
|
||||
} catch (Error& e) {
|
||||
wait(tr.onError(e));
|
||||
}
|
||||
std::cout << "finished scanRange" << std::endl;
|
||||
return Void();
|
||||
}
|
||||
|
||||
ACTOR Future<Void> scanRangeAndHop(Database cx, KeyRange range, Key hopInfo) {
|
||||
std::cout << "start scanRangeAndHop " << range.toString() << std::endl;
|
||||
// TODO: When n is large, split into multiple transactions.
|
||||
state Transaction tr(cx);
|
||||
try {
|
||||
tr.reset();
|
||||
RangeResult result = wait(tr.getRangeAndHop(KeySelector(firstGreaterOrEqual(range.begin), range.arena()),
|
||||
KeySelector(firstGreaterOrEqual(range.end), range.arena()),
|
||||
hopInfo,
|
||||
GetRangeLimits(CLIENT_KNOBS->TOO_MANY)));
|
||||
showResult(result);
|
||||
// result size: 2
|
||||
// key=\x01prefix\x00\x01RECORD\x00\x01primary-key-of-record-2\x00, value=\x01data-of-record-2\x00
|
||||
// key=\x01prefix\x00\x01RECORD\x00\x01primary-key-of-record-3\x00, value=\x01data-of-record-3\x00
|
||||
} catch (Error& e) {
|
||||
wait(tr.onError(e));
|
||||
}
|
||||
std::cout << "finished scanRangeAndHop" << std::endl;
|
||||
return Void();
|
||||
}
|
||||
|
||||
ACTOR Future<Void> _start(Database cx, IndexPrefetchDemoWorkload* self) {
|
||||
// TODO: Use toml to config
|
||||
wait(self->fillInRecords(cx, 5));
|
||||
|
||||
wait(self->scanRange(cx, normalKeys));
|
||||
|
||||
Key someIndexesBegin = Tuple().append(prefix).append(INDEX).append(indexKey(2)).getDataAsStandalone();
|
||||
Key someIndexesEnd = Tuple().append(prefix).append(INDEX).append(indexKey(4)).getDataAsStandalone();
|
||||
state KeyRange someIndexes = KeyRangeRef(someIndexesBegin, someIndexesEnd);
|
||||
wait(self->scanRange(cx, someIndexes));
|
||||
|
||||
Tuple hopInfoTuple;
|
||||
hopInfoTuple << prefix << RECORD << "{K[3]}"_sr;
|
||||
Key hopInfo = hopInfoTuple.getDataAsStandalone();
|
||||
wait(self->scanRangeAndHop(cx, someIndexes, hopInfo));
|
||||
return Void();
|
||||
}
|
||||
|
||||
Future<bool> check(Database const& cx) override { return true; }
|
||||
|
||||
void getMetrics(std::vector<PerfMetric>& m) override {}
|
||||
};
|
||||
|
||||
WorkloadFactory<IndexPrefetchDemoWorkload> IndexPrefetchDemoWorkloadFactory("IndexPrefetchDemo");
|
|
@ -613,6 +613,7 @@ inline static void flushOutputStreams() {
|
|||
#if defined(_MSC_VER)
|
||||
#define DLLEXPORT __declspec(dllexport)
|
||||
#elif defined(__GNUG__)
|
||||
#undef DLLEXPORT
|
||||
#define DLLEXPORT __attribute__((visibility("default")))
|
||||
#else
|
||||
#error Missing symbol export
|
||||
|
|
|
@ -159,6 +159,12 @@ ERROR( blocked_from_network_thread, 2026, "Detected a deadlock in a callback cal
|
|||
ERROR( invalid_config_db_range_read, 2027, "Invalid configuration database range read" )
|
||||
ERROR( invalid_config_db_key, 2028, "Invalid configuration database key provided" )
|
||||
ERROR( invalid_config_path, 2029, "Invalid configuration path" )
|
||||
ERROR( hop_bad_index, 2030, "The index in K[] or V[] is not a valid number or out of range" )
|
||||
ERROR( hop_no_such_key, 2031, "A hop key is not set in database" )
|
||||
ERROR( hop_bad_range_decriptor, 2032, "\"{...}\" must be the last element of the hop info tuple" )
|
||||
ERROR( hop_quick_get_key_values_has_more, 2033, "One of the secondary range queries is too large" )
|
||||
ERROR( hop_quick_get_value_miss, 2034, "Find a hop key that is not served in the same SS" )
|
||||
ERROR( hop_quick_get_key_values_miss, 2035, "Find a hop range that is not served in the same SS" )
|
||||
|
||||
ERROR( incompatible_protocol_version, 2100, "Incompatible protocol version" )
|
||||
ERROR( transaction_too_large, 2101, "Transaction exceeds byte limit" )
|
||||
|
|
|
@ -150,6 +150,7 @@ if(WITH_PYTHON)
|
|||
add_fdb_test(TEST_FILES fast/MemoryLifetime.toml)
|
||||
add_fdb_test(TEST_FILES fast/MoveKeysCycle.toml)
|
||||
add_fdb_test(TEST_FILES fast/MutationLogReaderCorrectness.toml)
|
||||
add_fdb_test(TEST_FILES fast/IndexPrefetchDemo.toml)
|
||||
add_fdb_test(TEST_FILES fast/ProtocolVersion.toml)
|
||||
add_fdb_test(TEST_FILES fast/RandomSelector.toml)
|
||||
add_fdb_test(TEST_FILES fast/RandomUnitTests.toml)
|
||||
|
|
|
@ -0,0 +1,6 @@
|
|||
[[test]]
|
||||
testTitle = 'IndexPrefetchDemo'
|
||||
useDB = true
|
||||
|
||||
[[test.workload]]
|
||||
testName = 'IndexPrefetchDemo'
|
Loading…
Reference in New Issue