diff --git a/bindings/c/fdb_c.cpp b/bindings/c/fdb_c.cpp index ecb78e4df7..e5545251f2 100644 --- a/bindings/c/fdb_c.cpp +++ b/bindings/c/fdb_c.cpp @@ -436,21 +436,12 @@ extern "C" DLLEXPORT FDBFuture* fdb_transaction_get_addresses_for_key(FDBTransac return (FDBFuture*)(TXN(tr)->getAddressesForKey(KeyRef(key_name, key_name_length)).extractPtr()); } -FDBFuture* fdb_transaction_get_range_impl(FDBTransaction* tr, - uint8_t const* begin_key_name, - int begin_key_name_length, - fdb_bool_t begin_or_equal, - int begin_offset, - uint8_t const* end_key_name, - int end_key_name_length, - fdb_bool_t end_or_equal, - int end_offset, - int limit, - int target_bytes, +// Rewrites limit, target_bytes, and reverse to their effective values; a non-null return is handed back to the caller. +FDBFuture* validate_and_update_parameters(int& limit, + int& target_bytes, FDBStreamingMode mode, int iteration, - fdb_bool_t snapshot, - fdb_bool_t reverse) { + fdb_bool_t& reverse) { /* This method may be called with a runtime API version of 13, in which negative row limits are a reverse range read */ if (g_api_version <= 13 && limit < 0) { @@ -500,6 +491,27 @@ FDBFuture* fdb_transaction_get_range_impl(FDBTransaction* tr, else if (mode_bytes != GetRangeLimits::BYTE_LIMIT_UNLIMITED) target_bytes = std::min(target_bytes, mode_bytes); + return nullptr; +} + +FDBFuture* fdb_transaction_get_range_impl(FDBTransaction* tr, + uint8_t const* begin_key_name, + int begin_key_name_length, + fdb_bool_t begin_or_equal, + int begin_offset, + uint8_t const* end_key_name, + int end_key_name_length, + fdb_bool_t end_or_equal, + int end_offset, + int limit, + int target_bytes, + FDBStreamingMode mode, + int iteration, + fdb_bool_t snapshot, + fdb_bool_t reverse) { + FDBFuture* r = validate_and_update_parameters(limit, target_bytes, mode, iteration, reverse); + if (r != nullptr) + return r; return ( FDBFuture*)(TXN(tr) ->getRange( @@ -511,6 +523,60 @@ FDBFuture* fdb_transaction_get_range_impl(FDBTransaction* tr, .extractPtr()); } +FDBFuture* 
fdb_transaction_get_range_and_flat_map_impl(FDBTransaction* tr, + uint8_t const* begin_key_name, + int begin_key_name_length, + fdb_bool_t begin_or_equal, + int begin_offset, + uint8_t const* end_key_name, + int end_key_name_length, + fdb_bool_t end_or_equal, + int end_offset, + uint8_t const* mapper_name, + int mapper_name_length, + int limit, + int target_bytes, + FDBStreamingMode mode, + int iteration, + fdb_bool_t snapshot, + fdb_bool_t reverse) { + FDBFuture* r = validate_and_update_parameters(limit, target_bytes, mode, iteration, reverse); + if (r != nullptr) + return r; + return ( + FDBFuture*)(TXN(tr) + ->getRangeAndFlatMap( + KeySelectorRef(KeyRef(begin_key_name, begin_key_name_length), begin_or_equal, begin_offset), + KeySelectorRef(KeyRef(end_key_name, end_key_name_length), end_or_equal, end_offset), + StringRef(mapper_name, mapper_name_length), + GetRangeLimits(limit, target_bytes), + snapshot, + reverse) + .extractPtr()); +} + +// TODO: Support FDB_API_ADDED in generate_asm.py and then this can be replaced with fdb_api_ptr_unimpl. +FDBFuture* fdb_transaction_get_range_and_flat_map_v699(FDBTransaction* tr, + uint8_t const* begin_key_name, + int begin_key_name_length, + fdb_bool_t begin_or_equal, + int begin_offset, + uint8_t const* end_key_name, + int end_key_name_length, + fdb_bool_t end_or_equal, + int end_offset, + uint8_t const* mapper_name, + int mapper_name_length, + int limit, + int target_bytes, + FDBStreamingMode mode, + int iteration, + fdb_bool_t snapshot, + fdb_bool_t reverse) { + fprintf(stderr, "UNIMPLEMENTED FDB API FUNCTION\n"); + abort(); +} + FDBFuture* fdb_transaction_get_range_selector_v13(FDBTransaction* tr, uint8_t const* begin_key_name, int begin_key_name_length, @@ -702,6 +768,7 @@ extern "C" DLLEXPORT fdb_error_t fdb_select_api_version_impl(int runtime_version // WARNING: use caution when implementing removed functions by calling public API functions. This can lead to // undesired behavior when using the multi-version API. 
Instead, it is better to have both the removed and public // functions call an internal implementation function. See fdb_create_database_impl for an example. + FDB_API_CHANGED(fdb_transaction_get_range_and_flat_map, 700); FDB_API_REMOVED(fdb_future_get_version, 620); FDB_API_REMOVED(fdb_create_cluster, 610); FDB_API_REMOVED(fdb_cluster_create_database, 610); diff --git a/bindings/c/foundationdb/fdb_c.h b/bindings/c/foundationdb/fdb_c.h index 81bf10d8a8..d3c65537c5 100644 --- a/bindings/c/foundationdb/fdb_c.h +++ b/bindings/c/foundationdb/fdb_c.h @@ -244,6 +244,24 @@ DLLEXPORT WARN_UNUSED_RESULT FDBFuture* fdb_transaction_get_range(FDBTransaction fdb_bool_t reverse); #endif +DLLEXPORT WARN_UNUSED_RESULT FDBFuture* fdb_transaction_get_range_and_flat_map(FDBTransaction* tr, + uint8_t const* begin_key_name, + int begin_key_name_length, + fdb_bool_t begin_or_equal, + int begin_offset, + uint8_t const* end_key_name, + int end_key_name_length, + fdb_bool_t end_or_equal, + int end_offset, + uint8_t const* mapper_name, + int mapper_name_length, + int limit, + int target_bytes, + FDBStreamingMode mode, + int iteration, + fdb_bool_t snapshot, + fdb_bool_t reverse); + DLLEXPORT void fdb_transaction_set(FDBTransaction* tr, uint8_t const* key_name, int key_name_length, diff --git a/bindings/c/test/unit/fdb_api.cpp b/bindings/c/test/unit/fdb_api.cpp index e59085eeb9..f15db95c62 100644 --- a/bindings/c/test/unit/fdb_api.cpp +++ b/bindings/c/test/unit/fdb_api.cpp @@ -193,6 +193,41 @@ KeyValueArrayFuture Transaction::get_range(const uint8_t* begin_key_name, reverse)); } +KeyValueArrayFuture Transaction::get_range_and_flat_map(const uint8_t* begin_key_name, + int begin_key_name_length, + fdb_bool_t begin_or_equal, + int begin_offset, + const uint8_t* end_key_name, + int end_key_name_length, + fdb_bool_t end_or_equal, + int end_offset, + const uint8_t* mapper_name, + int mapper_name_length, + int limit, + int target_bytes, + FDBStreamingMode mode, + int iteration, + fdb_bool_t 
snapshot, + fdb_bool_t reverse) { + return KeyValueArrayFuture(fdb_transaction_get_range_and_flat_map(tr_, + begin_key_name, + begin_key_name_length, + begin_or_equal, + begin_offset, + end_key_name, + end_key_name_length, + end_or_equal, + end_offset, + mapper_name, + mapper_name_length, + limit, + target_bytes, + mode, + iteration, + snapshot, + reverse)); +} + EmptyFuture Transaction::watch(std::string_view key) { return EmptyFuture(fdb_transaction_watch(tr_, (const uint8_t*)key.data(), key.size())); } diff --git a/bindings/c/test/unit/fdb_api.hpp b/bindings/c/test/unit/fdb_api.hpp index 17f25d55ee..fb1304a26e 100644 --- a/bindings/c/test/unit/fdb_api.hpp +++ b/bindings/c/test/unit/fdb_api.hpp @@ -219,6 +219,24 @@ public: fdb_bool_t snapshot, fdb_bool_t reverse); + // Returns a future which will be set to an FDBKeyValue array. + KeyValueArrayFuture get_range_and_flat_map(const uint8_t* begin_key_name, + int begin_key_name_length, + fdb_bool_t begin_or_equal, + int begin_offset, + const uint8_t* end_key_name, + int end_key_name_length, + fdb_bool_t end_or_equal, + int end_offset, + const uint8_t* mapper_name, + int mapper_name_length, + int limit, + int target_bytes, + FDBStreamingMode mode, + int iteration, + fdb_bool_t snapshot, + fdb_bool_t reverse); + // Wrapper around fdb_transaction_watch. Returns a future representing an // empty value. EmptyFuture watch(std::string_view key); diff --git a/bindings/c/test/unit/unit_tests.cpp b/bindings/c/test/unit/unit_tests.cpp index fe88e6b96f..f59c7f953b 100644 --- a/bindings/c/test/unit/unit_tests.cpp +++ b/bindings/c/test/unit/unit_tests.cpp @@ -40,6 +40,7 @@ #define DOCTEST_CONFIG_IMPLEMENT #include "doctest.h" #include "fdbclient/rapidjson/document.h" +#include "fdbclient/Tuple.h" #include "flow/config.h" @@ -76,7 +77,7 @@ fdb_error_t wait_future(fdb::Future& f) { // Given a string s, returns the "lowest" string greater than any string that // starts with s. 
Taken from // https://github.com/apple/foundationdb/blob/e7d72f458c6a985fdfa677ae021f357d6f49945b/flow/flow.cpp#L223. -std::string strinc(const std::string& s) { +std::string strinc_str(const std::string& s) { int index = -1; for (index = s.size() - 1; index >= 0; --index) { if ((uint8_t)s[index] != 255) { @@ -92,16 +93,16 @@ std::string strinc(const std::string& s) { return r; } -TEST_CASE("strinc") { - CHECK(strinc("a").compare("b") == 0); - CHECK(strinc("y").compare("z") == 0); - CHECK(strinc("!").compare("\"") == 0); - CHECK(strinc("*").compare("+") == 0); - CHECK(strinc("fdb").compare("fdc") == 0); - CHECK(strinc("foundation database 6").compare("foundation database 7") == 0); +TEST_CASE("strinc_str") { + CHECK(strinc_str("a").compare("b") == 0); + CHECK(strinc_str("y").compare("z") == 0); + CHECK(strinc_str("!").compare("\"") == 0); + CHECK(strinc_str("*").compare("+") == 0); + CHECK(strinc_str("fdb").compare("fdc") == 0); + CHECK(strinc_str("foundation database 6").compare("foundation database 7") == 0); char terminated[] = { 'a', 'b', '\xff' }; - CHECK(strinc(std::string(terminated, 3)).compare("ac") == 0); + CHECK(strinc_str(std::string(terminated, 3)).compare("ac") == 0); } // Helper function to add `prefix` to all keys in the given map. 
Returns a new @@ -117,7 +118,7 @@ std::map create_data(std::map& data) { fdb::Transaction tr(db); - auto end_key = strinc(prefix); + auto end_key = strinc_str(prefix); while (1) { tr.clear_range(prefix, end_key); for (const auto& [key, val] : data) { @@ -224,6 +225,59 @@ GetRangeResult get_range(fdb::Transaction& tr, return GetRangeResult{ results, out_more != 0, 0 }; } +GetRangeResult get_range_and_flat_map(fdb::Transaction& tr, + const uint8_t* begin_key_name, + int begin_key_name_length, + fdb_bool_t begin_or_equal, + int begin_offset, + const uint8_t* end_key_name, + int end_key_name_length, + fdb_bool_t end_or_equal, + int end_offset, + const uint8_t* mapper_name, + int mapper_name_length, + int limit, + int target_bytes, + FDBStreamingMode mode, + int iteration, + fdb_bool_t snapshot, + fdb_bool_t reverse) { + fdb::KeyValueArrayFuture f1 = tr.get_range_and_flat_map(begin_key_name, + begin_key_name_length, + begin_or_equal, + begin_offset, + end_key_name, + end_key_name_length, + end_or_equal, + end_offset, + mapper_name, + mapper_name_length, + limit, + target_bytes, + mode, + iteration, + snapshot, + reverse); + + fdb_error_t err = wait_future(f1); + if (err) { + return GetRangeResult{ {}, false, err }; + } + + const FDBKeyValue* out_kv; + int out_count; + fdb_bool_t out_more; + fdb_check(f1.get(&out_kv, &out_count, &out_more)); + + std::vector> results; + for (int i = 0; i < out_count; ++i) { + std::string key((const char*)out_kv[i].key, out_kv[i].key_length); + std::string value((const char*)out_kv[i].value, out_kv[i].value_length); + results.emplace_back(key, value); + } + return GetRangeResult{ results, out_more != 0, 0 }; +} + // Clears all data in the database. 
void clear_data(FDBDatabase* db) { insert_data(db, {}); @@ -819,6 +873,86 @@ TEST_CASE("fdb_transaction_set_read_version future_version") { CHECK(err == 1009); // future_version } +const std::string EMPTY = Tuple().pack().toString(); +const KeyRef RECORD = "RECORD"_sr; +const KeyRef INDEX = "INDEX"_sr; +static Key primaryKey(const int i) { + return Key(format("primary-key-of-record-%08d", i)); +} +static Key indexKey(const int i) { + return Key(format("index-key-of-record-%08d", i)); +} +static Value dataOfRecord(const int i) { + return Value(format("data-of-record-%08d", i)); +} +static std::string indexEntryKey(const int i) { + return Tuple().append(prefix).append(INDEX).append(indexKey(i)).append(primaryKey(i)).pack().toString(); +} +static std::string recordKey(const int i) { + return Tuple().append(prefix).append(RECORD).append(primaryKey(i)).pack().toString(); +} +static std::string recordValue(const int i) { + return Tuple().append(dataOfRecord(i)).pack().toString(); +} + +TEST_CASE("fdb_transaction_get_range_and_flat_map") { + // Note: The user requested `prefix` should be added as the first element of the tuple that forms the key, rather + // than the prefix of the key. So we don't use key() or create_data() in this test. + std::map data; + for (int i = 0; i < 3; i++) { + data[indexEntryKey(i)] = EMPTY; + data[recordKey(i)] = recordValue(i); + } + insert_data(db, data); + + std::string mapper = Tuple().append(prefix).append(RECORD).append("{K[3]}"_sr).pack().toString(); + + fdb::Transaction tr(db); + // get_range_and_flat_map is only supported without RYW. This is a must!!! 
+ fdb_check(tr.set_option(FDB_TR_OPTION_READ_YOUR_WRITES_DISABLE, nullptr, 0)); + while (1) { + auto result = get_range_and_flat_map( + tr, + // [0, 1] + FDB_KEYSEL_FIRST_GREATER_OR_EQUAL((const uint8_t*)indexEntryKey(0).c_str(), indexEntryKey(0).size()), + FDB_KEYSEL_FIRST_GREATER_THAN((const uint8_t*)indexEntryKey(1).c_str(), indexEntryKey(1).size()), + (const uint8_t*)mapper.c_str(), + mapper.size(), + /* limit */ 0, + /* target_bytes */ 0, + /* FDBStreamingMode */ FDB_STREAMING_MODE_WANT_ALL, + /* iteration */ 0, + /* snapshot */ false, + /* reverse */ 0); + + if (result.err) { + fdb::EmptyFuture f1 = tr.on_error(result.err); + fdb_check(wait_future(f1)); + continue; + } + + // Only the first 2 records are supposed to be returned. + if (result.kvs.size() < 2) { + CHECK(result.more); + // Retry. + continue; + } + + CHECK(result.kvs.size() == 2); + CHECK(!result.more); + for (int i = 0; i < 2; i++) { + const auto& [key, value] = result.kvs[i]; + std::cout << "result[" << i << "]: key=" << key << ", value=" << value << std::endl; + // OUTPUT: + // result[0]: key=fdbRECORDprimary-key-of-record-00000000, value=data-of-record-00000000 + // result[1]: key=fdbRECORDprimary-key-of-record-00000001, value=data-of-record-00000001 + CHECK(recordKey(i).compare(key) == 0); + CHECK(recordValue(i).compare(value) == 0); + } + break; + } +} + TEST_CASE("fdb_transaction_get_range reverse") { std::map data = create_data({ { "a", "1" }, { "b", "2" }, { "c", "3" }, { "d", "4" } }); insert_data(db, data); @@ -1726,7 +1860,7 @@ TEST_CASE("fdb_transaction_add_conflict_range") { fdb::Transaction tr2(db); while (1) { - fdb_check(tr2.add_conflict_range(key("a"), strinc(key("a")), FDB_CONFLICT_RANGE_TYPE_WRITE)); + fdb_check(tr2.add_conflict_range(key("a"), strinc_str(key("a")), FDB_CONFLICT_RANGE_TYPE_WRITE)); fdb::EmptyFuture f1 = tr2.commit(); fdb_error_t err = wait_future(f1); @@ -1739,8 +1873,8 @@ TEST_CASE("fdb_transaction_add_conflict_range") { } while (1) { - 
fdb_check(tr.add_conflict_range(key("a"), strinc(key("a")), FDB_CONFLICT_RANGE_TYPE_READ)); - fdb_check(tr.add_conflict_range(key("a"), strinc(key("a")), FDB_CONFLICT_RANGE_TYPE_WRITE)); + fdb_check(tr.add_conflict_range(key("a"), strinc_str(key("a")), FDB_CONFLICT_RANGE_TYPE_READ)); + fdb_check(tr.add_conflict_range(key("a"), strinc_str(key("a")), FDB_CONFLICT_RANGE_TYPE_WRITE)); fdb::EmptyFuture f1 = tr.commit(); fdb_error_t err = wait_future(f1); @@ -2217,7 +2351,7 @@ TEST_CASE("commit_does_not_reset") { continue; } - fdb_check(tr2.add_conflict_range(key("foo"), strinc(key("foo")), FDB_CONFLICT_RANGE_TYPE_READ)); + fdb_check(tr2.add_conflict_range(key("foo"), strinc_str(key("foo")), FDB_CONFLICT_RANGE_TYPE_READ)); tr2.set(key("foo"), "bar"); fdb::EmptyFuture tr2CommitFuture = tr2.commit(); err = wait_future(tr2CommitFuture); diff --git a/bindings/java/fdbJNI.cpp b/bindings/java/fdbJNI.cpp index 587190d3a5..2a4ba668be 100644 --- a/bindings/java/fdbJNI.cpp +++ b/bindings/java/fdbJNI.cpp @@ -756,6 +756,76 @@ JNIEXPORT jlong JNICALL Java_com_apple_foundationdb_FDBTransaction_Transaction_1 return (jlong)f; } +JNIEXPORT jlong JNICALL +Java_com_apple_foundationdb_FDBTransaction_Transaction_1getRangeAndFlatMap(JNIEnv* jenv, + jobject, + jlong tPtr, + jbyteArray keyBeginBytes, + jboolean orEqualBegin, + jint offsetBegin, + jbyteArray keyEndBytes, + jboolean orEqualEnd, + jint offsetEnd, + jbyteArray mapperBytes, + jint rowLimit, + jint targetBytes, + jint streamingMode, + jint iteration, + jboolean snapshot, + jboolean reverse) { + if (!tPtr || !keyBeginBytes || !keyEndBytes || !mapperBytes) { + throwParamNotNull(jenv); + return 0; + } + FDBTransaction* tr = (FDBTransaction*)tPtr; + + uint8_t* barrBegin = (uint8_t*)jenv->GetByteArrayElements(keyBeginBytes, JNI_NULL); + if (!barrBegin) { + if (!jenv->ExceptionOccurred()) + throwRuntimeEx(jenv, "Error getting handle to native resources"); + return 0; + } + + uint8_t* barrEnd = 
(uint8_t*)jenv->GetByteArrayElements(keyEndBytes, JNI_NULL); + if (!barrEnd) { + jenv->ReleaseByteArrayElements(keyBeginBytes, (jbyte*)barrBegin, JNI_ABORT); + if (!jenv->ExceptionOccurred()) + throwRuntimeEx(jenv, "Error getting handle to native resources"); + return 0; + } + + uint8_t* barrMapper = (uint8_t*)jenv->GetByteArrayElements(mapperBytes, JNI_NULL); + if (!barrMapper) { + jenv->ReleaseByteArrayElements(keyBeginBytes, (jbyte*)barrBegin, JNI_ABORT); + jenv->ReleaseByteArrayElements(keyEndBytes, (jbyte*)barrEnd, JNI_ABORT); + if (!jenv->ExceptionOccurred()) + throwRuntimeEx(jenv, "Error getting handle to native resources"); + return 0; + } + + FDBFuture* f = fdb_transaction_get_range_and_flat_map(tr, + barrBegin, + jenv->GetArrayLength(keyBeginBytes), + orEqualBegin, + offsetBegin, + barrEnd, + jenv->GetArrayLength(keyEndBytes), + orEqualEnd, + offsetEnd, + barrMapper, + jenv->GetArrayLength(mapperBytes), + rowLimit, + targetBytes, + (FDBStreamingMode)streamingMode, + iteration, + snapshot, + reverse); + jenv->ReleaseByteArrayElements(keyBeginBytes, (jbyte*)barrBegin, JNI_ABORT); + jenv->ReleaseByteArrayElements(keyEndBytes, (jbyte*)barrEnd, JNI_ABORT); + jenv->ReleaseByteArrayElements(mapperBytes, (jbyte*)barrMapper, JNI_ABORT); + return (jlong)f; +} + JNIEXPORT void JNICALL Java_com_apple_foundationdb_FutureResults_FutureResults_1getDirect(JNIEnv* jenv, jobject, jlong future, diff --git a/bindings/java/src/integration/com/apple/foundationdb/RangeAndFlatMapQueryIntegrationTest.java b/bindings/java/src/integration/com/apple/foundationdb/RangeAndFlatMapQueryIntegrationTest.java new file mode 100644 index 0000000000..e2801aafd6 --- /dev/null +++ b/bindings/java/src/integration/com/apple/foundationdb/RangeAndFlatMapQueryIntegrationTest.java @@ -0,0 +1,253 @@ +/* + * RangeAndFlatMapQueryIntegrationTest.java + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2013-2018 Apple Inc. 
and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.apple.foundationdb; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicReference; + +import com.apple.foundationdb.async.AsyncIterable; +import com.apple.foundationdb.async.AsyncUtil; +import com.apple.foundationdb.tuple.ByteArrayUtil; +import com.apple.foundationdb.tuple.Tuple; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; + +@ExtendWith(RequiresDatabase.class) +class RangeAndFlatMapQueryIntegrationTest { + private static final FDB fdb = FDB.selectAPIVersion(710); + public String databaseArg = null; + private Database openFDB() { return fdb.open(databaseArg); } + + @BeforeEach + @AfterEach + void clearDatabase() throws Exception { + /* + * Empty the database before and after each run, just in case + */ + try (Database db = openFDB()) { + db.run(tr -> { + tr.clear(Range.startsWith(new byte[] { (byte)0x00 })); + return null; + }); + } + } + + static private final byte[] EMPTY = Tuple.from().pack(); + static private 
final String PREFIX = "prefix"; + static private final String RECORD = "RECORD"; + static private final String INDEX = "INDEX"; + static private String primaryKey(int i) { return String.format("primary-key-of-record-%08d", i); } + static private String indexKey(int i) { return String.format("index-key-of-record-%08d", i); } + static private String dataOfRecord(int i) { return String.format("data-of-record-%08d", i); } + + static byte[] MAPPER = Tuple.from(PREFIX, RECORD, "{K[3]}").pack(); + static private byte[] indexEntryKey(final int i) { + return Tuple.from(PREFIX, INDEX, indexKey(i), primaryKey(i)).pack(); + } + static private byte[] recordKey(final int i) { return Tuple.from(PREFIX, RECORD, primaryKey(i)).pack(); } + static private byte[] recordValue(final int i) { return Tuple.from(dataOfRecord(i)).pack(); } + + static private void insertRecordWithIndex(final Transaction tr, final int i) { + tr.set(indexEntryKey(i), EMPTY); + tr.set(recordKey(i), recordValue(i)); + } + + private static String getArgFromEnv() { + String[] clusterFiles = MultiClientHelper.readClusterFromEnv(); + String cluster = clusterFiles[0]; + System.out.printf("Using Cluster: %s\n", cluster); + return cluster; + } + public static void main(String[] args) throws Exception { + final RangeAndFlatMapQueryIntegrationTest test = new RangeAndFlatMapQueryIntegrationTest(); + test.databaseArg = getArgFromEnv(); + test.clearDatabase(); + test.comparePerformance(); + test.clearDatabase(); + } + + int numRecords = 10000; + int numQueries = 10000; + int numRecordsPerQuery = 100; + boolean validate = false; + @Test + void comparePerformance() { + FDB fdb = FDB.selectAPIVersion(710); + try (Database db = openFDB()) { + insertRecordsWithIndexes(numRecords, db); + instrument(rangeQueryAndGet, "rangeQueryAndGet", db); + instrument(rangeQueryAndFlatMap, "rangeQueryAndFlatMap", db); + } + } + + private void instrument(final RangeQueryWithIndex query, final String name, final Database db) { + 
System.out.printf("Starting %s (numQueries:%d, numRecordsPerQuery:%d)\n", name, numQueries, numRecordsPerQuery); + long startTime = System.currentTimeMillis(); + for (int queryId = 0; queryId < numQueries; queryId++) { + int begin = ThreadLocalRandom.current().nextInt(numRecords - numRecordsPerQuery); + query.run(begin, begin + numRecordsPerQuery, db); + } + long time = System.currentTimeMillis() - startTime; + System.out.printf("Finished %s, it takes %d ms for %d queries (%d qps)\n", name, time, numQueries, + numQueries * 1000L / time); + } + + static private final int RECORDS_PER_TXN = 100; + static private void insertRecordsWithIndexes(int n, Database db) { + int i = 0; + while (i < n) { + int begin = i; + int end = Math.min(n, i + RECORDS_PER_TXN); + // insert [begin, end) in one transaction + db.run(tr -> { + for (int t = begin; t < end; t++) { + insertRecordWithIndex(tr, t); + } + return null; + }); + i = end; + } + } + + public interface RangeQueryWithIndex { + void run(int begin, int end, Database db); + } + + RangeQueryWithIndex rangeQueryAndGet = (int begin, int end, Database db) -> db.run(tr -> { + try { + List kvs = tr.getRange(KeySelector.firstGreaterOrEqual(indexEntryKey(begin)), + KeySelector.firstGreaterOrEqual(indexEntryKey(end)), + ReadTransaction.ROW_LIMIT_UNLIMITED, false, StreamingMode.WANT_ALL) + .asList() + .get(); + Assertions.assertEquals(end - begin, kvs.size()); + + // Get the records of each index entry IN PARALLEL. + List> resultFutures = new ArrayList<>(); + // In reality, we need to get the record key by parsing the index entry key. But considering this is a + // performance test, we just ignore the returned key and simply generate it from recordKey. 
+ for (int id = begin; id < end; id++) { + resultFutures.add(tr.get(recordKey(id))); + } + AsyncUtil.whenAll(resultFutures).get(); + + if (validate) { + final Iterator indexes = kvs.iterator(); + final Iterator> records = resultFutures.iterator(); + for (int id = begin; id < end; id++) { + Assertions.assertTrue(indexes.hasNext()); + assertByteArrayEquals(indexEntryKey(id), indexes.next().getKey()); + Assertions.assertTrue(records.hasNext()); + assertByteArrayEquals(recordValue(id), records.next().get()); + } + Assertions.assertFalse(indexes.hasNext()); + Assertions.assertFalse(records.hasNext()); + } + } catch (Exception e) { + Assertions.fail("Unexpected exception", e); + } + return null; + }); + + RangeQueryWithIndex rangeQueryAndFlatMap = (int begin, int end, Database db) -> db.run(tr -> { + try { + tr.options().setReadYourWritesDisable(); + List kvs = + tr.getRangeAndFlatMap(KeySelector.firstGreaterOrEqual(indexEntryKey(begin)), + KeySelector.firstGreaterOrEqual(indexEntryKey(end)), MAPPER, + ReadTransaction.ROW_LIMIT_UNLIMITED, false, StreamingMode.WANT_ALL) + .asList() + .get(); + Assertions.assertEquals(end - begin, kvs.size()); + + if (validate) { + final Iterator results = kvs.iterator(); + for (int id = begin; id < end; id++) { + Assertions.assertTrue(results.hasNext()); + assertByteArrayEquals(recordValue(id), results.next().getValue()); + } + Assertions.assertFalse(results.hasNext()); + } + } catch (Exception e) { + Assertions.fail("Unexpected exception", e); + } + return null; + }); + + void assertByteArrayEquals(byte[] expected, byte[] actual) { + Assertions.assertEquals(ByteArrayUtil.printable(expected), ByteArrayUtil.printable(actual)); + } + + @Test + void rangeAndFlatMapQueryOverMultipleRows() throws Exception { + try (Database db = openFDB()) { + insertRecordsWithIndexes(3, db); + + List expected_data_of_records = new ArrayList<>(); + for (int i = 0; i <= 1; i++) { + expected_data_of_records.add(recordValue(i)); + } + + db.run(tr -> { + // 
getRangeAndFlatMap is only supported without RYW. This is a must!!! + tr.options().setReadYourWritesDisable(); + + Iterator kvs = + tr.getRangeAndFlatMap(KeySelector.firstGreaterOrEqual(indexEntryKey(0)), + KeySelector.firstGreaterThan(indexEntryKey(1)), MAPPER, + ReadTransaction.ROW_LIMIT_UNLIMITED, false, StreamingMode.WANT_ALL) + .iterator(); + Iterator expected_data_of_records_iter = expected_data_of_records.iterator(); + while (expected_data_of_records_iter.hasNext()) { + Assertions.assertTrue(kvs.hasNext(), "iterator ended too early"); + KeyValue kv = kvs.next(); + byte[] actual_data_of_record = kv.getValue(); + byte[] expected_data_of_record = expected_data_of_records_iter.next(); + + // System.out.println("result key:" + ByteArrayUtil.printable(kv.getKey()) + " value:" + + // ByteArrayUtil.printable(kv.getValue())); Output: + // result + // key:\x02prefix\x00\x02INDEX\x00\x02index-key-of-record-0\x00\x02primary-key-of-record-0\x00 + // value:\x02data-of-record-0\x00 + // result + // key:\x02prefix\x00\x02INDEX\x00\x02index-key-of-record-1\x00\x02primary-key-of-record-1\x00 + // value:\x02data-of-record-1\x00 + + // For now, we don't guarantee what the returned keys mean. 
+ Assertions.assertArrayEquals(expected_data_of_record, actual_data_of_record, + "Incorrect data of record!"); + } + Assertions.assertFalse(kvs.hasNext(), "Iterator returned too much data"); + + return null; + }); + } + } +} diff --git a/bindings/java/src/junit/com/apple/foundationdb/FakeFDBTransaction.java b/bindings/java/src/junit/com/apple/foundationdb/FakeFDBTransaction.java index f154790b2b..0c5a121c64 100644 --- a/bindings/java/src/junit/com/apple/foundationdb/FakeFDBTransaction.java +++ b/bindings/java/src/junit/com/apple/foundationdb/FakeFDBTransaction.java @@ -88,8 +88,11 @@ public class FakeFDBTransaction extends FDBTransaction { public int getNumRangeCalls() { return numRangeCalls; } @Override - protected FutureResults getRange_internal(KeySelector begin, KeySelector end, int rowLimit, int targetBytes, - int streamingMode, int iteration, boolean isSnapshot, boolean reverse) { + protected FutureResults getRange_internal(KeySelector begin, KeySelector end, + // TODO: map is not supported in FakeFDBTransaction yet. 
+ byte[] mapper, // Nullable + int rowLimit, int targetBytes, int streamingMode, int iteration, + boolean isSnapshot, boolean reverse) { numRangeCalls++; // TODO this is probably not correct for all KeySelector instances--we'll want to match with real behavior NavigableMap range = diff --git a/bindings/java/src/main/com/apple/foundationdb/FDBTransaction.java b/bindings/java/src/main/com/apple/foundationdb/FDBTransaction.java index 05431a0fba..9bd99c892d 100644 --- a/bindings/java/src/main/com/apple/foundationdb/FDBTransaction.java +++ b/bindings/java/src/main/com/apple/foundationdb/FDBTransaction.java @@ -91,6 +91,15 @@ class FDBTransaction extends NativeObjectWrapper implements Transaction, OptionC return FDBTransaction.this.getRangeSplitPoints(range, chunkSize); } + @Override + public AsyncIterable getRangeAndFlatMap(KeySelector begin, KeySelector end, byte[] mapper, int limit, + boolean reverse, StreamingMode mode) { + if (mapper == null) { + throw new IllegalArgumentException("Mapper must be non-null"); + } + return new RangeQuery(FDBTransaction.this, true, begin, end, mapper, limit, reverse, mode, eventKeeper); + } + /////////////////// // getRange -> KeySelectors /////////////////// @@ -338,6 +347,15 @@ class FDBTransaction extends NativeObjectWrapper implements Transaction, OptionC return this.getRangeSplitPoints(range.begin, range.end, chunkSize); } + @Override + public AsyncIterable getRangeAndFlatMap(KeySelector begin, KeySelector end, byte[] mapper, int limit, + boolean reverse, StreamingMode mode) { + if (mapper == null) { + throw new IllegalArgumentException("Mapper must be non-null"); + } + return new RangeQuery(this, false, begin, end, mapper, limit, reverse, mode, eventKeeper); + } + /////////////////// // getRange -> KeySelectors /////////////////// @@ -415,10 +433,10 @@ class FDBTransaction extends NativeObjectWrapper implements Transaction, OptionC } // Users of this function must close the returned FutureResults when finished - protected 
FutureResults getRange_internal( - KeySelector begin, KeySelector end, - int rowLimit, int targetBytes, int streamingMode, - int iteration, boolean isSnapshot, boolean reverse) { + protected FutureResults getRange_internal(KeySelector begin, KeySelector end, + byte[] mapper, // Nullable + int rowLimit, int targetBytes, int streamingMode, int iteration, + boolean isSnapshot, boolean reverse) { if (eventKeeper != null) { eventKeeper.increment(Events.JNI_CALL); } @@ -429,10 +447,14 @@ class FDBTransaction extends NativeObjectWrapper implements Transaction, OptionC begin.toString(), end.toString(), rowLimit, targetBytes, streamingMode, iteration, Boolean.toString(isSnapshot), Boolean.toString(reverse)));*/ return new FutureResults( - Transaction_getRange(getPtr(), begin.getKey(), begin.orEqual(), begin.getOffset(), - end.getKey(), end.orEqual(), end.getOffset(), rowLimit, targetBytes, - streamingMode, iteration, isSnapshot, reverse), - FDB.instance().isDirectBufferQueriesEnabled(), executor, eventKeeper); + mapper == null + ? 
Transaction_getRange(getPtr(), begin.getKey(), begin.orEqual(), begin.getOffset(), end.getKey(), + end.orEqual(), end.getOffset(), rowLimit, targetBytes, streamingMode, + iteration, isSnapshot, reverse) + : Transaction_getRangeAndFlatMap(getPtr(), begin.getKey(), begin.orEqual(), begin.getOffset(), + end.getKey(), end.orEqual(), end.getOffset(), mapper, rowLimit, + targetBytes, streamingMode, iteration, isSnapshot, reverse), + FDB.instance().isDirectBufferQueriesEnabled(), executor, eventKeeper); } finally { pointerReadLock.unlock(); } @@ -771,6 +793,12 @@ class FDBTransaction extends NativeObjectWrapper implements Transaction, OptionC byte[] keyEnd, boolean orEqualEnd, int offsetEnd, int rowLimit, int targetBytes, int streamingMode, int iteration, boolean isSnapshot, boolean reverse); + private native long Transaction_getRangeAndFlatMap(long cPtr, byte[] keyBegin, boolean orEqualBegin, + int offsetBegin, byte[] keyEnd, boolean orEqualEnd, + int offsetEnd, + byte[] mapper, // Nonnull + int rowLimit, int targetBytes, int streamingMode, int iteration, + boolean isSnapshot, boolean reverse); private native void Transaction_addConflictRange(long cPtr, byte[] keyBegin, byte[] keyEnd, int conflictRangeType); private native void Transaction_set(long cPtr, byte[] key, byte[] value); diff --git a/bindings/java/src/main/com/apple/foundationdb/RangeQuery.java b/bindings/java/src/main/com/apple/foundationdb/RangeQuery.java index d518a0b9db..f91b00471a 100644 --- a/bindings/java/src/main/com/apple/foundationdb/RangeQuery.java +++ b/bindings/java/src/main/com/apple/foundationdb/RangeQuery.java @@ -49,17 +49,19 @@ class RangeQuery implements AsyncIterable { private final FDBTransaction tr; private final KeySelector begin; private final KeySelector end; + private final byte[] mapper; // Nullable private final boolean snapshot; private final int rowLimit; private final boolean reverse; private final StreamingMode streamingMode; private final EventKeeper eventKeeper; - 
RangeQuery(FDBTransaction transaction, boolean isSnapshot, KeySelector begin, KeySelector end, int rowLimit, - boolean reverse, StreamingMode streamingMode, EventKeeper eventKeeper) { + RangeQuery(FDBTransaction transaction, boolean isSnapshot, KeySelector begin, KeySelector end, byte[] mapper, + int rowLimit, boolean reverse, StreamingMode streamingMode, EventKeeper eventKeeper) { this.tr = transaction; this.begin = begin; this.end = end; + this.mapper = mapper; this.snapshot = isSnapshot; this.rowLimit = rowLimit; this.reverse = reverse; @@ -67,6 +69,12 @@ class RangeQuery implements AsyncIterable { this.eventKeeper = eventKeeper; } + // RangeQueryAndFlatMap + RangeQuery(FDBTransaction transaction, boolean isSnapshot, KeySelector begin, KeySelector end, int rowLimit, + boolean reverse, StreamingMode streamingMode, EventKeeper eventKeeper) { + this(transaction, isSnapshot, begin, end, null, rowLimit, reverse, streamingMode, eventKeeper); + } + /** * Returns all the results from the range requested as a {@code List}. 
If there were no * limits on the original query and there is a large amount of data in the database @@ -83,16 +91,16 @@ class RangeQuery implements AsyncIterable { // if the streaming mode is EXACT, try and grab things as one chunk if(mode == StreamingMode.EXACT) { - FutureResults range = tr.getRange_internal( - this.begin, this.end, this.rowLimit, 0, StreamingMode.EXACT.code(), - 1, this.snapshot, this.reverse); + + FutureResults range = tr.getRange_internal(this.begin, this.end, this.mapper, this.rowLimit, 0, + StreamingMode.EXACT.code(), 1, this.snapshot, this.reverse); return range.thenApply(result -> result.get().values) .whenComplete((result, e) -> range.close()); } // If the streaming mode is not EXACT, simply collect the results of an // iteration into a list - return AsyncUtil.collect(new RangeQuery(tr, snapshot, begin, end, rowLimit, reverse, mode, eventKeeper), + return AsyncUtil.collect(new RangeQuery(tr, snapshot, begin, end, mapper, rowLimit, reverse, mode, eventKeeper), tr.getExecutor()); } @@ -221,8 +229,8 @@ class RangeQuery implements AsyncIterable { nextFuture = new CompletableFuture<>(); final long sTime = System.nanoTime(); - fetchingChunk = tr.getRange_internal(begin, end, rowsLimited ? rowsRemaining : 0, 0, streamingMode.code(), - ++iteration, snapshot, reverse); + fetchingChunk = tr.getRange_internal(begin, end, mapper, rowsLimited ? 
rowsRemaining : 0, 0, + streamingMode.code(), ++iteration, snapshot, reverse); BiConsumer cons = new FetchComplete(fetchingChunk,nextFuture); if(eventKeeper!=null){ diff --git a/bindings/java/src/main/com/apple/foundationdb/ReadTransaction.java b/bindings/java/src/main/com/apple/foundationdb/ReadTransaction.java index 1dabc08c93..699dfd3ec0 100644 --- a/bindings/java/src/main/com/apple/foundationdb/ReadTransaction.java +++ b/bindings/java/src/main/com/apple/foundationdb/ReadTransaction.java @@ -424,6 +424,42 @@ public interface ReadTransaction extends ReadTransactionContext { AsyncIterable getRange(Range range, int limit, boolean reverse, StreamingMode mode); + /** + * Gets an ordered range of keys and values from the database. The begin + * and end keys are specified by {@code KeySelector}s, with the begin + * {@code KeySelector} inclusive and the end {@code KeySelector} exclusive. + * + * @see KeySelector + * @see AsyncIterator + * + * @param begin the beginning of the range (inclusive) + * @param end the end of the range (exclusive) + * @param mapper TODO + * @param limit the maximum number of results to return. Limits results to the + * first keys in the range. Pass {@link #ROW_LIMIT_UNLIMITED} if this query + * should not limit the number of results. If {@code reverse} is {@code true} rows + * will be limited starting at the end of the range. + * @param reverse return results starting at the end of the range in reverse order. + * Reading ranges in reverse is supported natively by the database and should + * have minimal extra cost. + * @param mode provide a hint about how the results are to be used. This + * can provide speed improvements or efficiency gains based on the caller's + * knowledge of the upcoming access pattern. + * + *

+ * When converting the result of this query to a list using {@link AsyncIterable#asList()} with the {@code + * ITERATOR} streaming mode, the query is automatically modified to fetch results in larger batches. This is done + * because it is known in advance that the {@link AsyncIterable#asList()} function will fetch all results in the + * range. If a limit is specified, the {@code EXACT} streaming mode will be used, and otherwise it will use {@code + * WANT_ALL}. + * + * To achieve comparable performance when iterating over an entire range without using {@link + * AsyncIterable#asList()}, the same streaming mode would need to be used. + *

+ * @return a handle to access the results of the asynchronous call + */ + AsyncIterable getRangeAndFlatMap(KeySelector begin, KeySelector end, byte[] mapper, int limit, + boolean reverse, StreamingMode mode); /** * Gets an estimate for the number of bytes stored in the given range. diff --git a/bindings/java/src/tests.cmake b/bindings/java/src/tests.cmake index 3e9dce6657..b84c148ac2 100644 --- a/bindings/java/src/tests.cmake +++ b/bindings/java/src/tests.cmake @@ -52,6 +52,7 @@ set(JAVA_INTEGRATION_TESTS src/integration/com/apple/foundationdb/CycleMultiClientIntegrationTest.java src/integration/com/apple/foundationdb/SidebandMultiThreadClientTest.java src/integration/com/apple/foundationdb/RepeatableReadMultiThreadClientTest.java + src/integration/com/apple/foundationdb/RangeAndFlatMapQueryIntegrationTest.java ) # Resources that are used in integration testing, but are not explicitly test files (JUnit rules, diff --git a/documentation/sphinx/source/release-notes/release-notes-700.rst b/documentation/sphinx/source/release-notes/release-notes-700.rst index 770c4c9af5..a1abd08366 100644 --- a/documentation/sphinx/source/release-notes/release-notes-700.rst +++ b/documentation/sphinx/source/release-notes/release-notes-700.rst @@ -30,6 +30,7 @@ Features * Improved the efficiency with which storage servers replicate data between themselves. `(PR #5017) `_ * Added support to ``exclude command`` to exclude based on locality match. `(PR #5113) `_ * Add the ``trace_partial_file_suffix`` network option. This option will give unfinished trace files a special suffix to indicate they're not complete yet. When the trace file is complete, it is renamed to remove the suffix. `(PR #5328) `_ +* Added "get range and flat map" feature with new APIs (see Bindings section). Storage servers are able to generate the keys in the queries based on another query. With this, upper layer can push some computations down to FDB, to improve latency and bandwidth when read. 
`(PR #5609) `_ Performance ----------- @@ -86,6 +87,8 @@ Bindings * C: Added a function, ``fdb_database_create_snapshot``, to create a snapshot of the database. `(PR #4241) `_ * C: Added ``fdb_database_get_main_thread_busyness`` function to report how busy a client's main thread is. `(PR #4504) `_ * Java: Added ``Database.getMainThreadBusyness`` function to report how busy a client's main thread is. `(PR #4564) `_ +* C: Added ``fdb_transaction_get_range_and_flat_map`` function to support running queries based on another query in one request. `(PR #5609) `_ +* Java: Added ``Transaction.getRangeAndFlatMap`` function to support running queries based on another query in one request. `(PR #5609) `_ Other Changes ------------- diff --git a/fdbclient/DatabaseContext.h b/fdbclient/DatabaseContext.h index 837d4ec793..4c038a6fc0 100644 --- a/fdbclient/DatabaseContext.h +++ b/fdbclient/DatabaseContext.h @@ -369,6 +369,7 @@ public: Counter transactionGetKeyRequests; Counter transactionGetValueRequests; Counter transactionGetRangeRequests; + Counter transactionGetRangeAndFlatMapRequests; Counter transactionGetRangeStreamRequests; Counter transactionWatchRequests; Counter transactionGetAddressesForKeyRequests; diff --git a/fdbclient/IClientApi.h b/fdbclient/IClientApi.h index cf304202bb..5562ad3ba6 100644 --- a/fdbclient/IClientApi.h +++ b/fdbclient/IClientApi.h @@ -59,6 +59,12 @@ public: GetRangeLimits limits, bool snapshot = false, bool reverse = false) = 0; + virtual ThreadFuture getRangeAndFlatMap(const KeySelectorRef& begin, + const KeySelectorRef& end, + const StringRef& mapper, + GetRangeLimits limits, + bool snapshot = false, + bool reverse = false) = 0; virtual ThreadFuture>> getAddressesForKey(const KeyRef& key) = 0; virtual ThreadFuture> getVersionstamp() = 0; diff --git a/fdbclient/ISingleThreadTransaction.h b/fdbclient/ISingleThreadTransaction.h index 9228184593..62336a15d7 100644 --- a/fdbclient/ISingleThreadTransaction.h +++ b/fdbclient/ISingleThreadTransaction.h 
@@ -63,6 +63,12 @@ public: GetRangeLimits limits, Snapshot = Snapshot::False, Reverse = Reverse::False) = 0; + virtual Future getRangeAndFlatMap(KeySelector begin, + KeySelector end, + Key mapper, + GetRangeLimits limits, + Snapshot = Snapshot::False, + Reverse = Reverse::False) = 0; virtual Future>> getAddressesForKey(Key const& key) = 0; virtual Future>> getRangeSplitPoints(KeyRange const& range, int64_t chunkSize) = 0; virtual Future getEstimatedRangeSizeBytes(KeyRange const& keys) = 0; diff --git a/fdbclient/MultiVersionTransaction.actor.cpp b/fdbclient/MultiVersionTransaction.actor.cpp index 9d701439d9..32603d1f53 100644 --- a/fdbclient/MultiVersionTransaction.actor.cpp +++ b/fdbclient/MultiVersionTransaction.actor.cpp @@ -141,6 +141,41 @@ ThreadFuture DLTransaction::getRange(const KeyRangeRef& keys, return getRange(firstGreaterOrEqual(keys.begin), firstGreaterOrEqual(keys.end), limits, snapshot, reverse); } +ThreadFuture DLTransaction::getRangeAndFlatMap(const KeySelectorRef& begin, + const KeySelectorRef& end, + const StringRef& mapper, + GetRangeLimits limits, + bool snapshot, + bool reverse) { + FdbCApi::FDBFuture* f = api->transactionGetRangeAndFlatMap(tr, + begin.getKey().begin(), + begin.getKey().size(), + begin.orEqual, + begin.offset, + end.getKey().begin(), + end.getKey().size(), + end.orEqual, + end.offset, + mapper.begin(), + mapper.size(), + limits.rows, + limits.bytes, + FDB_STREAMING_MODE_EXACT, + 0, + snapshot, + reverse); + return toThreadFuture(api, f, [](FdbCApi::FDBFuture* f, FdbCApi* api) { + const FdbCApi::FDBKeyValue* kvs; + int count; + FdbCApi::fdb_bool_t more; + FdbCApi::fdb_error_t error = api->futureGetKeyValueArray(f, &kvs, &count, &more); + ASSERT(!error); + + // The memory for this is stored in the FDBFuture and is released when the future gets destroyed + return RangeResult(RangeResultRef(VectorRef((KeyValueRef*)kvs, count), more), Arena()); + }); +} + ThreadFuture>> DLTransaction::getAddressesForKey(const KeyRef& key) { 
FdbCApi::FDBFuture* f = api->transactionGetAddressesForKey(tr, key.begin(), key.size()); @@ -452,6 +487,7 @@ void DLApi::init() { loadClientFunction(&api->transactionGetKey, lib, fdbCPath, "fdb_transaction_get_key"); loadClientFunction(&api->transactionGetAddressesForKey, lib, fdbCPath, "fdb_transaction_get_addresses_for_key"); loadClientFunction(&api->transactionGetRange, lib, fdbCPath, "fdb_transaction_get_range"); + loadClientFunction(&api->transactionGetRangeAndFlatMap, lib, fdbCPath, "fdb_transaction_get_range_and_flat_map"); loadClientFunction( &api->transactionGetVersionstamp, lib, fdbCPath, "fdb_transaction_get_versionstamp", headerVersion >= 410); loadClientFunction(&api->transactionSet, lib, fdbCPath, "fdb_transaction_set"); @@ -731,6 +767,18 @@ ThreadFuture MultiVersionTransaction::getRange(const KeyRangeRef& k return abortableFuture(f, tr.onChange); } +ThreadFuture MultiVersionTransaction::getRangeAndFlatMap(const KeySelectorRef& begin, + const KeySelectorRef& end, + const StringRef& mapper, + GetRangeLimits limits, + bool snapshot, + bool reverse) { + auto tr = getTransaction(); + auto f = tr.transaction ? tr.transaction->getRangeAndFlatMap(begin, end, mapper, limits, snapshot, reverse) + : makeTimeout(); + return abortableFuture(f, tr.onChange); +} + ThreadFuture> MultiVersionTransaction::getVersionstamp() { auto tr = getTransaction(); auto f = tr.transaction ? 
tr.transaction->getVersionstamp() : makeTimeout>(); diff --git a/fdbclient/MultiVersionTransaction.h b/fdbclient/MultiVersionTransaction.h index 95d9a8b14c..50e21bca57 100644 --- a/fdbclient/MultiVersionTransaction.h +++ b/fdbclient/MultiVersionTransaction.h @@ -118,6 +118,23 @@ struct FdbCApi : public ThreadSafeReferenceCounted { int iteration, fdb_bool_t snapshot, fdb_bool_t reverse); + FDBFuture* (*transactionGetRangeAndFlatMap)(FDBTransaction* tr, + uint8_t const* beginKeyName, + int beginKeyNameLength, + fdb_bool_t beginOrEqual, + int beginOffset, + uint8_t const* endKeyName, + int endKeyNameLength, + fdb_bool_t endOrEqual, + int endOffset, + uint8_t const* mapper_name, + int mapper_name_length, + int limit, + int targetBytes, + FDBStreamingMode mode, + int iteration, + fdb_bool_t snapshot, + fdb_bool_t reverse); FDBFuture* (*transactionGetVersionstamp)(FDBTransaction* tr); void (*transactionSet)(FDBTransaction* tr, @@ -219,6 +236,12 @@ public: GetRangeLimits limits, bool snapshot = false, bool reverse = false) override; + ThreadFuture getRangeAndFlatMap(const KeySelectorRef& begin, + const KeySelectorRef& end, + const StringRef& mapper, + GetRangeLimits limits, + bool snapshot, + bool reverse) override; ThreadFuture>> getAddressesForKey(const KeyRef& key) override; ThreadFuture> getVersionstamp() override; ThreadFuture getEstimatedRangeSizeBytes(const KeyRangeRef& keys) override; @@ -360,6 +383,12 @@ public: GetRangeLimits limits, bool snapshot = false, bool reverse = false) override; + ThreadFuture getRangeAndFlatMap(const KeySelectorRef& begin, + const KeySelectorRef& end, + const StringRef& mapper, + GetRangeLimits limits, + bool snapshot, + bool reverse) override; ThreadFuture>> getAddressesForKey(const KeyRef& key) override; ThreadFuture> getVersionstamp() override; diff --git a/fdbclient/NativeAPI.actor.cpp b/fdbclient/NativeAPI.actor.cpp index 117524a43a..91c80a5ad9 100644 --- a/fdbclient/NativeAPI.actor.cpp +++ b/fdbclient/NativeAPI.actor.cpp @@ 
-160,6 +160,8 @@ void DatabaseContext::addTssMapping(StorageServerInterface const& ssi, StorageSe TSSEndpointData(tssi.id(), tssi.getKey.getEndpoint(), metrics)); queueModel.updateTssEndpoint(ssi.getKeyValues.getEndpoint().token.first(), TSSEndpointData(tssi.id(), tssi.getKeyValues.getEndpoint(), metrics)); + queueModel.updateTssEndpoint(ssi.getKeyValuesAndFlatMap.getEndpoint().token.first(), + TSSEndpointData(tssi.id(), tssi.getKeyValuesAndFlatMap.getEndpoint(), metrics)); queueModel.updateTssEndpoint(ssi.getKeyValuesStream.getEndpoint().token.first(), TSSEndpointData(tssi.id(), tssi.getKeyValuesStream.getEndpoint(), metrics)); @@ -183,6 +185,7 @@ void DatabaseContext::removeTssMapping(StorageServerInterface const& ssi) { queueModel.removeTssEndpoint(ssi.getValue.getEndpoint().token.first()); queueModel.removeTssEndpoint(ssi.getKey.getEndpoint().token.first()); queueModel.removeTssEndpoint(ssi.getKeyValues.getEndpoint().token.first()); + queueModel.removeTssEndpoint(ssi.getKeyValuesAndFlatMap.getEndpoint().token.first()); queueModel.removeTssEndpoint(ssi.getKeyValuesStream.getEndpoint().token.first()); queueModel.removeTssEndpoint(ssi.watchValue.getEndpoint().token.first()); @@ -1196,6 +1199,7 @@ DatabaseContext::DatabaseContext(Reference watchValueMap(Future version, return Void(); } -void transformRangeLimits(GetRangeLimits limits, Reverse reverse, GetKeyValuesRequest& req) { +template +void transformRangeLimits(GetRangeLimits limits, Reverse reverse, GetKeyValuesFamilyRequest& req) { if (limits.bytes != 0) { if (!limits.hasRowLimit()) req.limit = CLIENT_KNOBS->REPLY_BYTE_LIMIT; // Can't get more than this many rows anyway @@ -3049,26 +3055,47 @@ void transformRangeLimits(GetRangeLimits limits, Reverse reverse, GetKeyValuesRe } } -ACTOR Future getExactRange(Database cx, - Version version, - KeyRange keys, - GetRangeLimits limits, - Reverse reverse, - TransactionInfo info, - TagSet tags) { +template +RequestStream StorageServerInterface::*getRangeRequestStream() 
{ + if constexpr (std::is_same::value) { + return &StorageServerInterface::getKeyValues; + } else if (std::is_same::value) { + return &StorageServerInterface::getKeyValuesAndFlatMap; + } else { + UNREACHABLE(); + } +} + +ACTOR template +Future getExactRange(Database cx, + Version version, + KeyRange keys, + Key mapper, + GetRangeLimits limits, + Reverse reverse, + TransactionInfo info, + TagSet tags) { state RangeResult output; state Span span("NAPI:getExactRange"_loc, info.spanID); // printf("getExactRange( '%s', '%s' )\n", keys.begin.toString().c_str(), keys.end.toString().c_str()); loop { - state std::vector>> locations = wait(getKeyRangeLocations( - cx, keys, CLIENT_KNOBS->GET_RANGE_SHARD_LIMIT, reverse, &StorageServerInterface::getKeyValues, info)); + state std::vector>> locations = + wait(getKeyRangeLocations(cx, + keys, + CLIENT_KNOBS->GET_RANGE_SHARD_LIMIT, + reverse, + getRangeRequestStream(), + info)); ASSERT(locations.size()); state int shard = 0; loop { const KeyRangeRef& range = locations[shard].first; - GetKeyValuesRequest req; + GetKeyValuesFamilyRequest req; + req.mapper = mapper; + req.arena.dependsOn(mapper.arena()); + req.version = version; req.begin = firstGreaterOrEqual(range.begin); req.end = firstGreaterOrEqual(range.end); @@ -3098,14 +3125,14 @@ ACTOR Future getExactRange(Database cx, .detail("Servers", locations[shard].second->description());*/ } ++cx->transactionPhysicalReads; - state GetKeyValuesReply rep; + state GetKeyValuesFamilyReply rep; try { choose { when(wait(cx->connectionFileChanged())) { throw transaction_too_old(); } - when(GetKeyValuesReply _rep = + when(GetKeyValuesFamilyReply _rep = wait(loadBalance(cx.getPtr(), locations[shard].second, - &StorageServerInterface::getKeyValues, + getRangeRequestStream(), req, TaskPriority::DefaultPromiseEndpoint, AtMostOnce::False, @@ -3155,7 +3182,7 @@ ACTOR Future getExactRange(Database cx, .detail("BlockBytes", rep.data.expectedSize()); ASSERT(false); } - TEST(true); // 
GetKeyValuesReply.more in getExactRange + TEST(true); // GetKeyValuesFamilyReply.more in getExactRange // Make next request to the same shard with a beginning key just after the last key returned if (reverse) locations[shard].first = @@ -3231,14 +3258,16 @@ Future resolveKey(Database const& cx, return getKey(cx, key, version, info, tags); } -ACTOR Future getRangeFallback(Database cx, - Version version, - KeySelector begin, - KeySelector end, - GetRangeLimits limits, - Reverse reverse, - TransactionInfo info, - TagSet tags) { +ACTOR template +Future getRangeFallback(Database cx, + Version version, + KeySelector begin, + KeySelector end, + Key mapper, + GetRangeLimits limits, + Reverse reverse, + TransactionInfo info, + TagSet tags) { if (version == latestVersion) { state Transaction transaction(cx); transaction.setOption(FDBTransactionOptions::CAUSAL_READ_RISKY); @@ -3261,7 +3290,8 @@ ACTOR Future getRangeFallback(Database cx, // if b is allKeys.begin, we have either read through the beginning of the database, // or allKeys.begin exists in the database and will be part of the conflict range anyways - RangeResult _r = wait(getExactRange(cx, version, KeyRangeRef(b, e), limits, reverse, info, tags)); + RangeResult _r = wait(getExactRange( + cx, version, KeyRangeRef(b, e), mapper, limits, reverse, info, tags)); RangeResult r = _r; if (b == allKeys.begin && ((reverse && !r.more) || !reverse)) @@ -3286,6 +3316,7 @@ ACTOR Future getRangeFallback(Database cx, return r; } +// TODO: Client should add mapped keys to conflict ranges. 
void getRangeFinished(Database cx, Reference trLogInfo, double startTime, @@ -3340,17 +3371,23 @@ void getRangeFinished(Database cx, } } -ACTOR Future getRange(Database cx, - Reference trLogInfo, - Future fVersion, - KeySelector begin, - KeySelector end, - GetRangeLimits limits, - Promise> conflictRange, - Snapshot snapshot, - Reverse reverse, - TransactionInfo info, - TagSet tags) { +// GetKeyValuesFamilyRequest: GetKeyValuesRequest or GetKeyValuesAndFlatMapRequest +// GetKeyValuesFamilyReply: GetKeyValuesReply or GetKeyValuesAndFlatMapReply +// Sadly we need GetKeyValuesFamilyReply because cannot do something like: state +// REPLY_TYPE(GetKeyValuesFamilyRequest) rep; +ACTOR template +Future getRange(Database cx, + Reference trLogInfo, + Future fVersion, + KeySelector begin, + KeySelector end, + Key mapper, + GetRangeLimits limits, + Promise> conflictRange, + Snapshot snapshot, + Reverse reverse, + TransactionInfo info, + TagSet tags) { state GetRangeLimits originalLimits(limits); state KeySelector originalBegin = begin; state KeySelector originalEnd = end; @@ -3384,11 +3421,13 @@ ACTOR Future getRange(Database cx, Key locationKey = reverse ? Key(end.getKey(), end.arena()) : Key(begin.getKey(), begin.arena()); Reverse locationBackward{ reverse ? 
(end - 1).isBackward() : begin.isBackward() }; - state std::pair> beginServer = - wait(getKeyLocation(cx, locationKey, &StorageServerInterface::getKeyValues, info, locationBackward)); + state std::pair> beginServer = wait(getKeyLocation( + cx, locationKey, getRangeRequestStream(), info, locationBackward)); state KeyRange shard = beginServer.first; state bool modifiedSelectors = false; - state GetKeyValuesRequest req; + state GetKeyValuesFamilyRequest req; + req.mapper = mapper; + req.arena.dependsOn(mapper.arena()); req.isFetchKeys = (info.taskID == TaskPriority::FetchKeys); req.version = readVersion; @@ -3447,17 +3486,17 @@ ACTOR Future getRange(Database cx, } ++cx->transactionPhysicalReads; - state GetKeyValuesReply rep; + state GetKeyValuesFamilyReply rep; try { if (CLIENT_BUGGIFY_WITH_PROB(.01)) { throw deterministicRandom()->randomChoice( std::vector{ transaction_too_old(), future_version() }); } // state AnnotateActor annotation(currentLineage); - GetKeyValuesReply _rep = + GetKeyValuesFamilyReply _rep = wait(loadBalance(cx.getPtr(), beginServer.second, - &StorageServerInterface::getKeyValues, + getRangeRequestStream(), req, TaskPriority::DefaultPromiseEndpoint, AtMostOnce::False, @@ -3557,11 +3596,11 @@ ACTOR Future getRange(Database cx, if (!rep.more) { ASSERT(modifiedSelectors); - TEST(true); // !GetKeyValuesReply.more and modifiedSelectors in getRange + TEST(true); // !GetKeyValuesFamilyReply.more and modifiedSelectors in getRange if (!rep.data.size()) { - RangeResult result = wait(getRangeFallback( - cx, version, originalBegin, originalEnd, originalLimits, reverse, info, tags)); + RangeResult result = wait(getRangeFallback( + cx, version, originalBegin, originalEnd, mapper, originalLimits, reverse, info, tags)); getRangeFinished(cx, trLogInfo, startTime, @@ -3579,7 +3618,7 @@ ACTOR Future getRange(Database cx, else begin = firstGreaterOrEqual(shard.end); } else { - TEST(true); // GetKeyValuesReply.more in getRange + TEST(true); // 
GetKeyValuesFamilyReply.more in getRange if (reverse) end = firstGreaterOrEqual(output[output.size() - 1].key); else @@ -3597,8 +3636,8 @@ ACTOR Future getRange(Database cx, Reverse{ reverse ? (end - 1).isBackward() : begin.isBackward() }); if (e.code() == error_code_wrong_shard_server) { - RangeResult result = wait(getRangeFallback( - cx, version, originalBegin, originalEnd, originalLimits, reverse, info, tags)); + RangeResult result = wait(getRangeFallback( + cx, version, originalBegin, originalEnd, mapper, originalLimits, reverse, info, tags)); getRangeFinished(cx, trLogInfo, startTime, @@ -4164,17 +4203,18 @@ Future getRange(Database const& cx, Reverse const& reverse, TransactionInfo const& info, TagSet const& tags) { - return getRange(cx, - Reference(), - fVersion, - begin, - end, - limits, - Promise>(), - Snapshot::True, - reverse, - info, - tags); + return getRange(cx, + Reference(), + fVersion, + begin, + end, + ""_sr, + limits, + Promise>(), + Snapshot::True, + reverse, + info, + tags); } bool DatabaseContext::debugUseTags = false; @@ -4469,13 +4509,26 @@ Future Transaction::getKey(const KeySelector& key, Snapshot snapshot) { return getKeyAndConflictRange(cx, key, getReadVersion(), conflictRange, info, options.readTags); } -Future Transaction::getRange(const KeySelector& begin, - const KeySelector& end, - GetRangeLimits limits, - Snapshot snapshot, - Reverse reverse) { +template +void increaseCounterForRequest(Database cx) { + if constexpr (std::is_same::value) { + ++cx->transactionGetRangeRequests; + } else if (std::is_same::value) { + ++cx->transactionGetRangeAndFlatMapRequests; + } else { + UNREACHABLE(); + } +} + +template +Future Transaction::getRangeInternal(const KeySelector& begin, + const KeySelector& end, + const Key& mapper, + GetRangeLimits limits, + Snapshot snapshot, + Reverse reverse) { ++cx->transactionLogicalReads; - ++cx->transactionGetRangeRequests; + increaseCounterForRequest(cx); if (limits.isReached()) return RangeResult(); @@ -4507,8 
+4560,37 @@ Future Transaction::getRange(const KeySelector& begin, extraConflictRanges.push_back(conflictRange.getFuture()); } - return ::getRange( - cx, trLogInfo, getReadVersion(), b, e, limits, conflictRange, snapshot, reverse, info, options.readTags); + return ::getRange(cx, + trLogInfo, + getReadVersion(), + b, + e, + mapper, + limits, + conflictRange, + snapshot, + reverse, + info, + options.readTags); +} + +Future Transaction::getRange(const KeySelector& begin, + const KeySelector& end, + GetRangeLimits limits, + Snapshot snapshot, + Reverse reverse) { + return getRangeInternal(begin, end, ""_sr, limits, snapshot, reverse); +} + +Future Transaction::getRangeAndFlatMap(const KeySelector& begin, + const KeySelector& end, + const Key& mapper, + GetRangeLimits limits, + Snapshot snapshot, + Reverse reverse) { + + return getRangeInternal( + begin, end, mapper, limits, snapshot, reverse); } Future Transaction::getRange(const KeySelector& begin, diff --git a/fdbclient/NativeAPI.actor.h b/fdbclient/NativeAPI.actor.h index 6bd5ab892e..af5b2b7419 100644 --- a/fdbclient/NativeAPI.actor.h +++ b/fdbclient/NativeAPI.actor.h @@ -289,6 +289,23 @@ public: reverse); } + [[nodiscard]] Future getRangeAndFlatMap(const KeySelector& begin, + const KeySelector& end, + const Key& mapper, + GetRangeLimits limits, + Snapshot = Snapshot::False, + Reverse = Reverse::False); + +private: + template + Future getRangeInternal(const KeySelector& begin, + const KeySelector& end, + const Key& mapper, + GetRangeLimits limits, + Snapshot snapshot, + Reverse reverse); + +public: // A method for streaming data from the storage server that is more efficient than getRange when reading large // amounts of data [[nodiscard]] Future getRangeStream(const PromiseStream>& results, diff --git a/fdbclient/PaxosConfigTransaction.h b/fdbclient/PaxosConfigTransaction.h index 4dfceb7a28..3854d4be96 100644 --- a/fdbclient/PaxosConfigTransaction.h +++ b/fdbclient/PaxosConfigTransaction.h @@ -50,6 +50,14 @@ 
public: GetRangeLimits limits, Snapshot = Snapshot::False, Reverse = Reverse::False) override; + Future getRangeAndFlatMap(KeySelector begin, + KeySelector end, + Key mapper, + GetRangeLimits limits, + Snapshot = Snapshot::False, + Reverse = Reverse::False) override { + throw client_invalid_operation(); + } void set(KeyRef const& key, ValueRef const& value) override; void clear(KeyRangeRef const&) override { throw client_invalid_operation(); } void clear(KeyRef const&) override; diff --git a/fdbclient/ReadYourWrites.actor.cpp b/fdbclient/ReadYourWrites.actor.cpp index f156a36c85..56ce22fd07 100644 --- a/fdbclient/ReadYourWrites.actor.cpp +++ b/fdbclient/ReadYourWrites.actor.cpp @@ -74,6 +74,16 @@ public: using Result = RangeResult; }; + template + struct GetRangeAndFlatMapReq { + GetRangeAndFlatMapReq(KeySelector begin, KeySelector end, Key mapper, GetRangeLimits limits) + : begin(begin), end(end), mapper(mapper), limits(limits) {} + KeySelector begin, end; + Key mapper; + GetRangeLimits limits; + using Result = RangeResult; + }; + // read() Performs a read (get, getKey, getRange, etc), in the context of the given transaction. Snapshot or RYW // reads are distingushed by the type Iter being SnapshotCache::iterator or RYWIterator. Fills in the snapshot cache // as a side effect but does not affect conflict ranges. Some (indicated) overloads of read are required to update @@ -203,6 +213,36 @@ public: return v; } + ACTOR template + static Future readThroughAndFlatMap(ReadYourWritesTransaction* ryw, + GetRangeAndFlatMapReq read, + Snapshot snapshot) { + if (backwards && read.end.offset > 1) { + // FIXME: Optimistically assume that this will not run into the system keys, and only reissue if the result + // actually does. 
+ Key key = wait(ryw->tr.getKey(read.end, snapshot)); + if (key > ryw->getMaxReadKey()) + read.end = firstGreaterOrEqual(ryw->getMaxReadKey()); + else + read.end = KeySelector(firstGreaterOrEqual(key), key.arena()); + } + + RangeResult v = wait(ryw->tr.getRangeAndFlatMap( + read.begin, read.end, read.mapper, read.limits, snapshot, backwards ? Reverse::True : Reverse::False)); + KeyRef maxKey = ryw->getMaxReadKey(); + if (v.size() > 0) { + if (!backwards && v[v.size() - 1].key >= maxKey) { + state RangeResult _v = v; + int i = _v.size() - 2; + for (; i >= 0 && _v[i].key >= maxKey; --i) { + } + return RangeResult(RangeResultRef(VectorRef(&_v[0], i + 1), false), _v.arena()); + } + } + + return v; + } + // addConflictRange(ryw,read,result) is called after a serializable read and is responsible for adding the relevant // conflict range @@ -309,6 +349,15 @@ public: } } ACTOR template + static Future readWithConflictRangeThroughAndFlatMap(ReadYourWritesTransaction* ryw, + Req req, + Snapshot snapshot) { + choose { + when(typename Req::Result result = wait(readThroughAndFlatMap(ryw, req, snapshot))) { return result; } + when(wait(ryw->resetPromise.getFuture())) { throw internal_error(); } + } + } + ACTOR template static Future readWithConflictRangeSnapshot(ReadYourWritesTransaction* ryw, Req req) { state SnapshotCache::iterator it(&ryw->cache, &ryw->writes); choose { @@ -344,6 +393,20 @@ public: return readWithConflictRangeRYW(ryw, req, snapshot); } + template + static inline Future readWithConflictRangeAndFlatMap(ReadYourWritesTransaction* ryw, + Req const& req, + Snapshot snapshot) { + if (ryw->options.readYourWritesDisabled) { + return readWithConflictRangeThroughAndFlatMap(ryw, req, snapshot); + } else if (snapshot && ryw->options.snapshotRywEnabled <= 0) { + TEST(true); // readWithConflictRangeSnapshot not supported for getRangeAndFlatMap + throw client_invalid_operation(); + } + TEST(true); // readWithConflictRangeRYW not supported for getRangeAndFlatMap + throw 
client_invalid_operation(); + } + template static void resolveKeySelectorFromCache(KeySelector& key, Iter& it, @@ -1509,6 +1572,65 @@ Future ReadYourWritesTransaction::getRange(const KeySelector& begin return getRange(begin, end, GetRangeLimits(limit), snapshot, reverse); } +Future ReadYourWritesTransaction::getRangeAndFlatMap(KeySelector begin, + KeySelector end, + Key mapper, + GetRangeLimits limits, + Snapshot snapshot, + Reverse reverse) { + if (getDatabase()->apiVersionAtLeast(630)) { + if (specialKeys.contains(begin.getKey()) && specialKeys.begin <= end.getKey() && + end.getKey() <= specialKeys.end) { + TEST(true); // Special key space get range (FlatMap) + throw client_invalid_operation(); // Not support special keys. + } + } else { + if (begin.getKey() == LiteralStringRef("\xff\xff/worker_interfaces")) { + throw client_invalid_operation(); // Not support special keys. + } + } + + if (checkUsedDuringCommit()) { + return used_during_commit(); + } + + if (resetPromise.isSet()) + return resetPromise.getFuture().getError(); + + KeyRef maxKey = getMaxReadKey(); + if (begin.getKey() > maxKey || end.getKey() > maxKey) + return key_outside_legal_range(); + + // This optimization prevents nullptr operations from being added to the conflict range + if (limits.isReached()) { + TEST(true); // RYW range read limit 0 (FlatMap) + return RangeResult(); + } + + if (!limits.isValid()) + return range_limits_invalid(); + + if (begin.orEqual) + begin.removeOrEqual(begin.arena()); + + if (end.orEqual) + end.removeOrEqual(end.arena()); + + if (begin.offset >= end.offset && begin.getKey() >= end.getKey()) { + TEST(true); // RYW range inverted (FlatMap) + return RangeResult(); + } + + Future result = + reverse ? 
RYWImpl::readWithConflictRangeAndFlatMap( + this, RYWImpl::GetRangeAndFlatMapReq(begin, end, mapper, limits), snapshot) + : RYWImpl::readWithConflictRangeAndFlatMap( + this, RYWImpl::GetRangeAndFlatMapReq(begin, end, mapper, limits), snapshot); + + reading.add(success(result)); + return result; +} + Future>> ReadYourWritesTransaction::getAddressesForKey(const Key& key) { if (checkUsedDuringCommit()) { return used_during_commit(); diff --git a/fdbclient/ReadYourWrites.h b/fdbclient/ReadYourWrites.h index 3ac84a7658..19ce5c8775 100644 --- a/fdbclient/ReadYourWrites.h +++ b/fdbclient/ReadYourWrites.h @@ -104,6 +104,12 @@ public: snapshot, reverse); } + Future getRangeAndFlatMap(KeySelector begin, + KeySelector end, + Key mapper, + GetRangeLimits limits, + Snapshot = Snapshot::False, + Reverse = Reverse::False) override; [[nodiscard]] Future>> getAddressesForKey(const Key& key) override; Future>> getRangeSplitPoints(const KeyRange& range, int64_t chunkSize) override; diff --git a/fdbclient/ServerKnobs.cpp b/fdbclient/ServerKnobs.cpp index 44f0ec6e2c..6ee837b8a1 100644 --- a/fdbclient/ServerKnobs.cpp +++ b/fdbclient/ServerKnobs.cpp @@ -644,6 +644,8 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi init( MAX_STORAGE_COMMIT_TIME, 120.0 ); //The max fsync stall time on the storage server and tlog before marking a disk as failed init( RANGESTREAM_LIMIT_BYTES, 2e6 ); if( randomize && BUGGIFY ) RANGESTREAM_LIMIT_BYTES = 1; init( ENABLE_CLEAR_RANGE_EAGER_READS, true ); + init( QUICK_GET_VALUE_FALLBACK, true ); + init( QUICK_GET_KEY_VALUES_FALLBACK, true ); //Wait Failure init( MAX_OUTSTANDING_WAIT_FAILURE_REQUESTS, 250 ); if( randomize && BUGGIFY ) MAX_OUTSTANDING_WAIT_FAILURE_REQUESTS = 2; diff --git a/fdbclient/ServerKnobs.h b/fdbclient/ServerKnobs.h index 6a35065204..b8357535c8 100644 --- a/fdbclient/ServerKnobs.h +++ b/fdbclient/ServerKnobs.h @@ -585,6 +585,8 @@ public: double MAX_STORAGE_COMMIT_TIME; int64_t RANGESTREAM_LIMIT_BYTES; bool 
ENABLE_CLEAR_RANGE_EAGER_READS; + bool QUICK_GET_VALUE_FALLBACK; + bool QUICK_GET_KEY_VALUES_FALLBACK; // Wait Failure int MAX_OUTSTANDING_WAIT_FAILURE_REQUESTS; diff --git a/fdbclient/SimpleConfigTransaction.h b/fdbclient/SimpleConfigTransaction.h index 36423b1515..168b1a6c29 100644 --- a/fdbclient/SimpleConfigTransaction.h +++ b/fdbclient/SimpleConfigTransaction.h @@ -59,6 +59,14 @@ public: GetRangeLimits limits, Snapshot = Snapshot::False, Reverse = Reverse::False) override; + Future getRangeAndFlatMap(KeySelector begin, + KeySelector end, + Key mapper, + GetRangeLimits limits, + Snapshot = Snapshot::False, + Reverse = Reverse::False) override { + throw client_invalid_operation(); + } Future commit() override; Version getCommittedVersion() const override; void setOption(FDBTransactionOptions::Option option, Optional value = Optional()) override; diff --git a/fdbclient/StorageServerInterface.cpp b/fdbclient/StorageServerInterface.cpp index e2d403fc2f..11c9f46f56 100644 --- a/fdbclient/StorageServerInterface.cpp +++ b/fdbclient/StorageServerInterface.cpp @@ -152,6 +152,45 @@ void TSS_traceMismatch(TraceEvent& event, .detail("TSSReply", tssResultsString); } +// range reads and flat map +template <> +bool TSS_doCompare(const GetKeyValuesAndFlatMapReply& src, const GetKeyValuesAndFlatMapReply& tss) { + return src.more == tss.more && src.data == tss.data; +} + +template <> +const char* TSS_mismatchTraceName(const GetKeyValuesAndFlatMapRequest& req) { + return "TSSMismatchGetKeyValuesAndFlatMap"; +} + +template <> +void TSS_traceMismatch(TraceEvent& event, + const GetKeyValuesAndFlatMapRequest& req, + const GetKeyValuesAndFlatMapReply& src, + const GetKeyValuesAndFlatMapReply& tss) { + std::string ssResultsString = format("(%d)%s:\n", src.data.size(), src.more ? 
"+" : ""); + for (auto& it : src.data) { + ssResultsString += "\n" + it.key.printable() + "=" + traceChecksumValue(it.value); + } + + std::string tssResultsString = format("(%d)%s:\n", tss.data.size(), tss.more ? "+" : ""); + for (auto& it : tss.data) { + tssResultsString += "\n" + it.key.printable() + "=" + traceChecksumValue(it.value); + } + event + .detail( + "Begin", + format("%s%s:%d", req.begin.orEqual ? "=" : "", req.begin.getKey().printable().c_str(), req.begin.offset)) + .detail("End", + format("%s%s:%d", req.end.orEqual ? "=" : "", req.end.getKey().printable().c_str(), req.end.offset)) + .detail("Version", req.version) + .detail("Limit", req.limit) + .detail("LimitBytes", req.limitBytes) + .setMaxFieldLength(FLOW_KNOBS->TSS_LARGE_TRACE_SIZE * 4 / 10) + .detail("SSReply", ssResultsString) + .detail("TSSReply", tssResultsString); +} + // streaming range reads template <> bool TSS_doCompare(const GetKeyValuesStreamReply& src, const GetKeyValuesStreamReply& tss) { @@ -356,6 +395,12 @@ void TSSMetrics::recordLatency(const GetKeyValuesRequest& req, double ssLatency, TSSgetKeyValuesLatency.addSample(tssLatency); } +template <> +void TSSMetrics::recordLatency(const GetKeyValuesAndFlatMapRequest& req, double ssLatency, double tssLatency) { + SSgetKeyValuesAndFlatMapLatency.addSample(ssLatency); + TSSgetKeyValuesAndFlatMapLatency.addSample(tssLatency); +} + template <> void TSSMetrics::recordLatency(const WatchValueRequest& req, double ssLatency, double tssLatency) {} diff --git a/fdbclient/StorageServerInterface.h b/fdbclient/StorageServerInterface.h index 1671abe67d..ba912adbb1 100644 --- a/fdbclient/StorageServerInterface.h +++ b/fdbclient/StorageServerInterface.h @@ -22,6 +22,7 @@ #define FDBCLIENT_STORAGESERVERINTERFACE_H #pragma once +#include #include "fdbclient/FDBTypes.h" #include "fdbrpc/Locality.h" #include "fdbrpc/QueueModel.h" @@ -65,6 +66,7 @@ struct StorageServerInterface { // Throws a wrong_shard_server if the keys in the request or result depend on 
data outside this server OR if a large // selector offset prevents all data from being read in one range read RequestStream getKeyValues; + RequestStream getKeyValuesAndFlatMap; RequestStream getShardState; RequestStream waitMetrics; @@ -129,6 +131,8 @@ struct StorageServerInterface { RequestStream(getValue.getEndpoint().getAdjustedEndpoint(15)); changeFeedPop = RequestStream(getValue.getEndpoint().getAdjustedEndpoint(16)); + getKeyValuesAndFlatMap = + RequestStream(getValue.getEndpoint().getAdjustedEndpoint(17)); } } else { ASSERT(Ar::isDeserializing); @@ -174,6 +178,7 @@ struct StorageServerInterface { streams.push_back(changeFeedStream.getReceiver()); streams.push_back(overlappingChangeFeeds.getReceiver()); streams.push_back(changeFeedPop.getReceiver()); + streams.push_back(getKeyValuesAndFlatMap.getReceiver(TaskPriority::LoadBalancedEndpoint)); FlowTransport::transport().addEndpoints(streams); } }; @@ -296,6 +301,9 @@ struct GetKeyValuesRequest : TimedRequest { SpanID spanContext; Arena arena; KeySelectorRef begin, end; + // This is a dummy field that has never been used. 
+ // TODO: Get rid of this by constexpr or other template magic in getRange + KeyRef mapper = KeyRef(); Version version; // or latestVersion int limit, limitBytes; bool isFetchKeys; @@ -310,6 +318,43 @@ struct GetKeyValuesRequest : TimedRequest { } }; +struct GetKeyValuesAndFlatMapReply : public LoadBalancedReply { + constexpr static FileIdentifier file_identifier = 1783067; + Arena arena; + VectorRef data; + Version version; // useful when latestVersion was requested + bool more; + bool cached = false; + + GetKeyValuesAndFlatMapReply() : version(invalidVersion), more(false), cached(false) {} + + template + void serialize(Ar& ar) { + serializer(ar, LoadBalancedReply::penalty, LoadBalancedReply::error, data, version, more, cached, arena); + } +}; + +struct GetKeyValuesAndFlatMapRequest : TimedRequest { + constexpr static FileIdentifier file_identifier = 6795747; + SpanID spanContext; + Arena arena; + KeySelectorRef begin, end; + KeyRef mapper; + Version version; // or latestVersion + int limit, limitBytes; + bool isFetchKeys; + Optional tags; + Optional debugID; + ReplyPromise reply; + + GetKeyValuesAndFlatMapRequest() : isFetchKeys(false) {} + template + void serialize(Ar& ar) { + serializer( + ar, begin, end, mapper, version, limit, limitBytes, isFetchKeys, tags, debugID, reply, spanContext, arena); + } +}; + struct GetKeyValuesStreamReply : public ReplyPromiseStreamReply { constexpr static FileIdentifier file_identifier = 1783066; Arena arena; diff --git a/fdbclient/ThreadSafeTransaction.cpp b/fdbclient/ThreadSafeTransaction.cpp index 3810d08191..ace522cac0 100644 --- a/fdbclient/ThreadSafeTransaction.cpp +++ b/fdbclient/ThreadSafeTransaction.cpp @@ -257,6 +257,23 @@ ThreadFuture ThreadSafeTransaction::getRange(const KeySelectorRef& }); } +ThreadFuture ThreadSafeTransaction::getRangeAndFlatMap(const KeySelectorRef& begin, + const KeySelectorRef& end, + const StringRef& mapper, + GetRangeLimits limits, + bool snapshot, + bool reverse) { + KeySelector b = begin; + 
KeySelector e = end; + Key h = mapper; + + ISingleThreadTransaction* tr = this->tr; + return onMainThread([tr, b, e, h, limits, snapshot, reverse]() -> Future { + tr->checkDeferredError(); + return tr->getRangeAndFlatMap(b, e, h, limits, Snapshot{ snapshot }, Reverse{ reverse }); + }); +} + ThreadFuture>> ThreadSafeTransaction::getAddressesForKey(const KeyRef& key) { Key k = key; diff --git a/fdbclient/ThreadSafeTransaction.h b/fdbclient/ThreadSafeTransaction.h index 75faa67745..85ae27c1fe 100644 --- a/fdbclient/ThreadSafeTransaction.h +++ b/fdbclient/ThreadSafeTransaction.h @@ -106,6 +106,12 @@ public: bool reverse = false) override { return getRange(firstGreaterOrEqual(keys.begin), firstGreaterOrEqual(keys.end), limits, snapshot, reverse); } + ThreadFuture getRangeAndFlatMap(const KeySelectorRef& begin, + const KeySelectorRef& end, + const StringRef& mapper, + GetRangeLimits limits, + bool snapshot, + bool reverse) override; ThreadFuture>> getAddressesForKey(const KeyRef& key) override; ThreadFuture> getVersionstamp() override; ThreadFuture getEstimatedRangeSizeBytes(const KeyRangeRef& keys) override; diff --git a/fdbrpc/TSSComparison.h b/fdbrpc/TSSComparison.h index af5080af6f..54114c2db8 100644 --- a/fdbrpc/TSSComparison.h +++ b/fdbrpc/TSSComparison.h @@ -51,10 +51,12 @@ struct TSSMetrics : ReferenceCounted, NonCopyable { ContinuousSample SSgetValueLatency; ContinuousSample SSgetKeyLatency; ContinuousSample SSgetKeyValuesLatency; + ContinuousSample SSgetKeyValuesAndFlatMapLatency; ContinuousSample TSSgetValueLatency; ContinuousSample TSSgetKeyLatency; ContinuousSample TSSgetKeyValuesLatency; + ContinuousSample TSSgetKeyValuesAndFlatMapLatency; std::unordered_map ssErrorsByCode; std::unordered_map tssErrorsByCode; @@ -103,7 +105,8 @@ struct TSSMetrics : ReferenceCounted, NonCopyable { : cc("TSSClientMetrics"), requests("Requests", cc), streamComparisons("StreamComparisons", cc), ssErrors("SSErrors", cc), tssErrors("TSSErrors", cc), tssTimeouts("TSSTimeouts", 
cc), mismatches("Mismatches", cc), SSgetValueLatency(1000), SSgetKeyLatency(1000), SSgetKeyValuesLatency(1000), - TSSgetValueLatency(1000), TSSgetKeyLatency(1000), TSSgetKeyValuesLatency(1000) {} + SSgetKeyValuesAndFlatMapLatency(1000), TSSgetValueLatency(1000), TSSgetKeyLatency(1000), + TSSgetKeyValuesLatency(1000), TSSgetKeyValuesAndFlatMapLatency(1000) {} }; template diff --git a/fdbserver/CMakeLists.txt b/fdbserver/CMakeLists.txt index ca7d7d6db5..29f01a1595 100644 --- a/fdbserver/CMakeLists.txt +++ b/fdbserver/CMakeLists.txt @@ -208,6 +208,7 @@ set(FDBSERVER_SRCS workloads/MemoryLifetime.actor.cpp workloads/MetricLogging.actor.cpp workloads/MutationLogReaderCorrectness.actor.cpp + workloads/IndexPrefetchDemo.actor.cpp workloads/ParallelRestore.actor.cpp workloads/Performance.actor.cpp workloads/Ping.actor.cpp diff --git a/fdbserver/storageserver.actor.cpp b/fdbserver/storageserver.actor.cpp index c7bb89afc6..35a8ad0eaf 100644 --- a/fdbserver/storageserver.actor.cpp +++ b/fdbserver/storageserver.actor.cpp @@ -42,6 +42,7 @@ #include "fdbclient/NativeAPI.actor.h" #include "fdbclient/Notified.h" #include "fdbclient/StatusClient.h" +#include "fdbclient/Tuple.h" #include "fdbclient/SystemData.h" #include "fdbclient/TransactionLineage.h" #include "fdbclient/VersionedMap.h" @@ -779,8 +780,9 @@ public: struct Counters { CounterCollection cc; - Counter allQueries, getKeyQueries, getValueQueries, getRangeQueries, getRangeStreamQueries, finishedQueries, - lowPriorityQueries, rowsQueried, bytesQueried, watchQueries, emptyQueries; + Counter allQueries, getKeyQueries, getValueQueries, getRangeQueries, getRangeAndFlatMapQueries, + getRangeStreamQueries, finishedQueries, lowPriorityQueries, rowsQueried, bytesQueried, watchQueries, + emptyQueries; // Bytes of the mutations that have been added to the memory of the storage server. When the data is durable // and cleared from the memory, we do not subtract it but add it to bytesDurable. 
@@ -807,6 +809,10 @@ public: Counter wrongShardServer; Counter fetchedVersions; Counter fetchesFromLogs; + // The following counters measure how many of lookups in the getRangeAndFlatMapQueries are effective. "Miss" + // means fallback if fallback is enabled, otherwise means failure (so that another layer could implement + // fallback). + Counter quickGetValueHit, quickGetValueMiss, quickGetKeyValuesHit, quickGetKeyValuesMiss; LatencySample readLatencySample; LatencyBands readLatencyBands; @@ -814,22 +820,25 @@ public: Counters(StorageServer* self) : cc("StorageServer", self->thisServerID.toString()), allQueries("QueryQueue", cc), getKeyQueries("GetKeyQueries", cc), getValueQueries("GetValueQueries", cc), - getRangeQueries("GetRangeQueries", cc), getRangeStreamQueries("GetRangeStreamQueries", cc), - finishedQueries("FinishedQueries", cc), lowPriorityQueries("LowPriorityQueries", cc), - rowsQueried("RowsQueried", cc), bytesQueried("BytesQueried", cc), watchQueries("WatchQueries", cc), - emptyQueries("EmptyQueries", cc), bytesInput("BytesInput", cc), bytesDurable("BytesDurable", cc), - bytesFetched("BytesFetched", cc), mutationBytes("MutationBytes", cc), - sampledBytesCleared("SampledBytesCleared", cc), kvFetched("KVFetched", cc), mutations("Mutations", cc), - setMutations("SetMutations", cc), clearRangeMutations("ClearRangeMutations", cc), - atomicMutations("AtomicMutations", cc), updateBatches("UpdateBatches", cc), - updateVersions("UpdateVersions", cc), loops("Loops", cc), fetchWaitingMS("FetchWaitingMS", cc), - fetchWaitingCount("FetchWaitingCount", cc), fetchExecutingMS("FetchExecutingMS", cc), - fetchExecutingCount("FetchExecutingCount", cc), readsRejected("ReadsRejected", cc), - wrongShardServer("WrongShardServer", cc), fetchedVersions("FetchedVersions", cc), - fetchesFromLogs("FetchesFromLogs", cc), readLatencySample("ReadLatencyMetrics", - self->thisServerID, - SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL, - SERVER_KNOBS->LATENCY_SAMPLE_SIZE), + 
getRangeQueries("GetRangeQueries", cc), getRangeAndFlatMapQueries("GetRangeAndFlatMapQueries", cc), + getRangeStreamQueries("GetRangeStreamQueries", cc), finishedQueries("FinishedQueries", cc), + lowPriorityQueries("LowPriorityQueries", cc), rowsQueried("RowsQueried", cc), + bytesQueried("BytesQueried", cc), watchQueries("WatchQueries", cc), emptyQueries("EmptyQueries", cc), + bytesInput("BytesInput", cc), bytesDurable("BytesDurable", cc), bytesFetched("BytesFetched", cc), + mutationBytes("MutationBytes", cc), sampledBytesCleared("SampledBytesCleared", cc), + kvFetched("KVFetched", cc), mutations("Mutations", cc), setMutations("SetMutations", cc), + clearRangeMutations("ClearRangeMutations", cc), atomicMutations("AtomicMutations", cc), + updateBatches("UpdateBatches", cc), updateVersions("UpdateVersions", cc), loops("Loops", cc), + fetchWaitingMS("FetchWaitingMS", cc), fetchWaitingCount("FetchWaitingCount", cc), + fetchExecutingMS("FetchExecutingMS", cc), fetchExecutingCount("FetchExecutingCount", cc), + readsRejected("ReadsRejected", cc), wrongShardServer("WrongShardServer", cc), + fetchedVersions("FetchedVersions", cc), fetchesFromLogs("FetchesFromLogs", cc), + quickGetValueHit("QuickGetValueHit", cc), quickGetValueMiss("QuickGetValueMiss", cc), + quickGetKeyValuesHit("QuickGetKeyValuesHit", cc), quickGetKeyValuesMiss("QuickGetKeyValuesMiss", cc), + readLatencySample("ReadLatencyMetrics", + self->thisServerID, + SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL, + SERVER_KNOBS->LATENCY_SAMPLE_SIZE), readLatencyBands("ReadLatencyBands", self->thisServerID, SERVER_KNOBS->STORAGE_LOGGING_DELAY) { specialCounter(cc, "LastTLogVersion", [self]() { return self->lastTLogVersion; }); specialCounter(cc, "Version", [self]() { return self->version.get(); }); @@ -1985,6 +1994,37 @@ void merge(Arena& arena, } } +ACTOR Future> quickGetValue(StorageServer* data, StringRef key, Version version) { + if (data->shards[key]->isReadable()) { + try { + // TODO: Use a lower level API may 
be better? Or tweak priorities? + GetValueRequest req(Span().context, key, version, Optional(), Optional()); + data->actors.add(data->readGuard(req, getValueQ)); + GetValueReply reply = wait(req.reply.getFuture()); + ++data->counters.quickGetValueHit; + return reply.value; + } catch (Error& e) { + // Fallback. + } + } else { + // Fallback. + } + + ++data->counters.quickGetValueMiss; + if (SERVER_KNOBS->QUICK_GET_VALUE_FALLBACK) { + state Transaction tr(data->cx); + tr.setVersion(version); + // TODO: is DefaultPromiseEndpoint the best priority for this? + tr.info.taskID = TaskPriority::DefaultPromiseEndpoint; + Future> valueFuture = tr.get(key, Snapshot::True); + // TODO: async in case it needs to read from other servers. + state Optional valueOption = wait(valueFuture); + return valueOption; + } else { + throw quick_get_value_miss(); + } +}; + // If limit>=0, it returns the first rows in the range (sorted ascending), otherwise the last rows (sorted descending). // readRange has O(|result|) + O(log |data|) cost ACTOR Future readRange(StorageServer* data, @@ -2470,6 +2510,440 @@ ACTOR Future getKeyValuesQ(StorageServer* data, GetKeyValuesRequest req) return Void(); } +// Reads the key-value pairs in [firstGreaterOrEqual(prefix), firstGreaterOrEqual(strinc(prefix))) at `version`. +// It first submits a GetKeyValuesRequest to getKeyValuesQ on this server and counts a quickGetKeyValuesHit on +// success. Any Error from that attempt is swallowed and counted as quickGetKeyValuesMiss; the read is then either +// repeated through a Transaction over prefixRange(prefix) (when QUICK_GET_KEY_VALUES_FALLBACK is set) or reported +// by throwing quick_get_key_values_miss. +ACTOR Future quickGetKeyValues(StorageServer* data, StringRef prefix, Version version) { + try { + // TODO: Would a lower-level API be better? Or should the priorities be tweaked? + GetKeyValuesRequest req; + req.spanContext = Span().context; + req.arena = Arena(); + req.begin = firstGreaterOrEqual(KeyRef(req.arena, prefix)); + req.end = firstGreaterOrEqual(strinc(prefix, req.arena)); + req.version = version; + // NOTE(review): req.limit and req.limitBytes are never assigned on this path. Confirm that GetKeyValuesRequest + // default-initializes them; otherwise getKeyValuesQ would read indeterminate limits. + + data->actors.add(data->readGuard(req, getKeyValuesQ)); + GetKeyValuesReply reply = wait(req.reply.getFuture()); + ++data->counters.quickGetKeyValuesHit; + + // Convert GetKeyValuesReply to RangeResult. + return RangeResult(RangeResultRef(reply.data, reply.more), reply.arena); + } catch (Error& e) { + // Fallback.
+ } + + ++data->counters.quickGetKeyValuesMiss; + if (SERVER_KNOBS->QUICK_GET_KEY_VALUES_FALLBACK) { + state Transaction tr(data->cx); + tr.setVersion(version); + // TODO: is DefaultPromiseEndpoint the best priority for this? + tr.info.taskID = TaskPriority::DefaultPromiseEndpoint; + Future rangeResultFuture = tr.getRange(prefixRange(prefix), Snapshot::True); + // TODO: async in case it needs to read from other servers. + RangeResult rangeResult = wait(rangeResultFuture); + return rangeResult; + } else { + throw quick_get_key_values_miss(); + } +}; + +// Builds the key to look up for one scanned record by walking mappedKeyFormatTuple element by element. +// - A string element containing "{{" or "}}" is unescaped ("{{" -> "{", "}}" -> "}") and appended as a literal. +// - "{K[i]}" / "{V[i]}" is replaced by element i of the record's key / value, unpacked as a tuple. +// - "{...}" must be the last element; it sets isRangeQuery and is not appended. +// - Any other element is copied unchanged. +// Throws mapper_bad_index for an unparsable or out-of-range index and mapper_bad_range_decriptor when "{...}" is not +// the last element. Tuple::unpack may also throw if the key or value is not parsable as a tuple. +Key constructMappedKey(KeyValueRef* keyValue, Tuple& mappedKeyFormatTuple, bool& isRangeQuery) { + // Lazily parse key and/or value to tuple because they may not need to be a tuple if not used. + Optional keyTuple; + Optional valueTuple; + + Tuple mappedKeyTuple; + for (int i = 0; i < mappedKeyFormatTuple.size(); i++) { + Tuple::ElementType type = mappedKeyFormatTuple.getType(i); + if (type == Tuple::BYTES || type == Tuple::UTF8) { + std::string s = mappedKeyFormatTuple.getString(i).toString(); + auto sz = s.size(); + + // Handle escape. After each replacement, resume the search just past the brace that was written, so an + // already-unescaped brace is never paired with the next one (e.g. "ab{{{{" must become "ab{{", not "ab{"). + bool escaped = false; + size_t p = 0; + while (true) { + size_t found = s.find("{{", p); + if (found == std::string::npos) { + break; + } + s.replace(found, 2, "{"); + p = found + 1; + escaped = true; + } + p = 0; + while (true) { + size_t found = s.find("}}", p); + if (found == std::string::npos) { + break; + } + s.replace(found, 2, "}"); + p = found + 1; + escaped = true; + } + if (escaped) { + // If the element uses escape, copy the escaped version. + mappedKeyTuple.append(s); + } + // {K[??]} or {V[??]} + else if (sz > 5 && s[0] == '{' && (s[1] == 'K' || s[1] == 'V') && s[2] == '[' && s[sz - 2] == ']' && + s[sz - 1] == '}') { + int idx; + try { + idx = std::stoi(s.substr(3, sz - 5)); + } catch (std::exception& e) { + throw mapper_bad_index(); + } + Tuple* referenceTuple; + if (s[1] == 'K') { + // Use keyTuple as reference.
+ if (!keyTuple.present()) { + // May throw exception if the key is not parsable as a tuple. + keyTuple = Tuple::unpack(keyValue->key); + } + referenceTuple = &keyTuple.get(); + } else if (s[1] == 'V') { + // Use valueTuple as reference. + if (!valueTuple.present()) { + // May throw exception if the value is not parsable as a tuple. + valueTuple = Tuple::unpack(keyValue->value); + } + referenceTuple = &valueTuple.get(); + } else { + ASSERT(false); + throw internal_error(); + } + + if (idx < 0 || idx >= referenceTuple->size()) { + throw mapper_bad_index(); + } + mappedKeyTuple.append(referenceTuple->subTuple(idx, idx + 1)); + } else if (s == "{...}") { + // Range query. + if (i != mappedKeyFormatTuple.size() - 1) { + // It must be the last element of the mapper tuple + throw mapper_bad_range_decriptor(); + } + // Every record will try to set it. It's ugly, but not wrong. + isRangeQuery = true; + // Do not add it to the mapped key. + } else { + // If the element is a string but neither escaped nor descriptors, just copy it. + mappedKeyTuple.append(mappedKeyFormatTuple.subTuple(i, i + 1)); + } + } else { + // If the element is not a string, just copy it.
+ mappedKeyTuple.append(mappedKeyFormatTuple.subTuple(i, i + 1)); + } + } + return mappedKeyTuple.getDataAsStandalone(); +} + +TEST_CASE("/fdbserver/storageserver/constructMappedKey") { + Key key = Tuple().append("key-0"_sr).append("key-1"_sr).append("key-2"_sr).getDataAsStandalone(); + Value value = Tuple().append("value-0"_sr).append("value-1"_sr).append("value-2"_sr).getDataAsStandalone(); + state KeyValueRef kvr(key, value); + { + Tuple mapperTuple = Tuple() + .append("normal"_sr) + .append("{{escaped}}"_sr) + .append("{K[2]}"_sr) + .append("{V[0]}"_sr) + .append("{...}"_sr); + + bool isRangeQuery = false; + Key mappedKey = constructMappedKey(&kvr, mapperTuple, isRangeQuery); + + Key expectedMappedKey = Tuple() + .append("normal"_sr) + .append("{escaped}"_sr) + .append("key-2"_sr) + .append("value-0"_sr) + .getDataAsStandalone(); + // std::cout << printable(mappedKey) << " == " << printable(expectedMappedKey) << std::endl; + ASSERT(mappedKey.compare(expectedMappedKey) == 0); + ASSERT(isRangeQuery == true); + } + { + Tuple mapperTuple = Tuple().append("{{{{}}"_sr).append("}}"_sr); + + bool isRangeQuery = false; + Key mappedKey = constructMappedKey(&kvr, mapperTuple, isRangeQuery); + + Key expectedMappedKey = Tuple().append("{{}"_sr).append("}"_sr).getDataAsStandalone(); + // std::cout << printable(mappedKey) << " == " << printable(expectedMappedKey) << std::endl; + ASSERT(mappedKey.compare(expectedMappedKey) == 0); + ASSERT(isRangeQuery == false); + } + { + // Escapes that do not start at offset 0: each doubled brace must collapse exactly once. + Tuple mapperTuple = Tuple().append("ab{{{{"_sr).append("x}}}}"_sr); + + bool isRangeQuery = false; + Key mappedKey = constructMappedKey(&kvr, mapperTuple, isRangeQuery); + + Key expectedMappedKey = Tuple().append("ab{{"_sr).append("x}}"_sr).getDataAsStandalone(); + // std::cout << printable(mappedKey) << " == " << printable(expectedMappedKey) << std::endl; + ASSERT(mappedKey.compare(expectedMappedKey) == 0); + ASSERT(isRangeQuery == false); + } + { + Tuple mapperTuple = Tuple().append("{K[100]}"_sr); + bool
isRangeQuery = false; + state bool throwException = false; + try { + Key mappedKey = constructMappedKey(&kvr, mapperTuple, isRangeQuery); + } catch (Error& e) { + ASSERT(e.code() == error_code_mapper_bad_index); + throwException = true; + } + ASSERT(throwException); + } + { + Tuple mapperTuple = Tuple().append("{...}"_sr).append("last-element"_sr); + bool isRangeQuery = false; + state bool throwException2 = false; + try { + Key mappedKey = constructMappedKey(&kvr, mapperTuple, isRangeQuery); + } catch (Error& e) { + ASSERT(e.code() == error_code_mapper_bad_range_decriptor); + throwException2 = true; + } + ASSERT(throwException2); + } + { + Tuple mapperTuple = Tuple().append("{K[not-a-number]}"_sr); + bool isRangeQuery = false; + state bool throwException3 = false; + try { + Key mappedKey = constructMappedKey(&kvr, mapperTuple, isRangeQuery); + } catch (Error& e) { + ASSERT(e.code() == error_code_mapper_bad_index); + throwException3 = true; + } + ASSERT(throwException3); + } + return Void(); +} + +ACTOR Future flatMap(StorageServer* data, GetKeyValuesReply input, StringRef mapper) { + state GetKeyValuesAndFlatMapReply result; + result.version = input.version; + result.more = input.more; + result.cached = input.cached; + result.arena.dependsOn(input.arena); + + result.data.reserve(result.arena, input.data.size()); + state bool isRangeQuery = false; + state Tuple mappedKeyFormatTuple = Tuple::unpack(mapper); + state KeyValueRef* it = input.data.begin(); + for (; it != input.data.end(); it++) { + state StringRef key = it->key; + + state Key mappedKey = constructMappedKey(it, mappedKeyFormatTuple, isRangeQuery); + // Make sure the mappedKey is always available, so that it's good even we want to get key asynchronously. + result.arena.dependsOn(mappedKey.arena()); + + if (isRangeQuery) { + // Use the mappedKey as the prefix of the range query. 
+ RangeResult rangeResult = wait(quickGetKeyValues(data, mappedKey, input.version)); + + if (rangeResult.more) { + // Probably the fan out is too large. The user should use the old way to query. + throw quick_get_key_values_has_more(); + } + result.arena.dependsOn(rangeResult.arena()); + for (int i = 0; i < rangeResult.size(); i++) { + result.data.emplace_back(result.arena, rangeResult[i].key, rangeResult[i].value); + } + } else { + Optional valueOption = wait(quickGetValue(data, mappedKey, input.version)); + + if (valueOption.present()) { + Value value = valueOption.get(); + result.arena.dependsOn(value.arena()); + result.data.emplace_back(result.arena, mappedKey, value); + } else { + // TODO: Shall we throw exception if the key doesn't exist or the range is empty? + } + } + } + return result; +} + +// Most of the actor is copied from getKeyValuesQ. I tried to use templates but things become nearly impossible after +// combining actor shenanigans with template shenanigans. +ACTOR Future getKeyValuesAndFlatMapQ(StorageServer* data, GetKeyValuesAndFlatMapRequest req) +// Throws a wrong_shard_server if the keys in the request or result depend on data outside this server OR if a large +// selector offset prevents all data from being read in one range read +{ + state Span span("SS:getKeyValuesAndFlatMap"_loc, { req.spanContext }); + state int64_t resultSize = 0; + state IKeyValueStore::ReadType type = + req.isFetchKeys ? 
IKeyValueStore::ReadType::FETCH : IKeyValueStore::ReadType::NORMAL; + getCurrentLineage()->modify(&TransactionLineage::txID) = req.spanContext.first(); + + ++data->counters.getRangeAndFlatMapQueries; + ++data->counters.allQueries; + ++data->readQueueSizeMetric; + data->maxQueryQueue = std::max( + data->maxQueryQueue, data->counters.allQueries.getValue() - data->counters.finishedQueries.getValue()); + + // Active load balancing runs at a very high priority (to obtain accurate queue lengths) + // so we need to downgrade here + if (SERVER_KNOBS->FETCH_KEYS_LOWER_PRIORITY && req.isFetchKeys) { + wait(delay(0, TaskPriority::FetchKeys)); + } else { + wait(data->getQueryDelay()); + } + + try { + if (req.debugID.present()) + g_traceBatch.addEvent( + "TransactionDebug", req.debugID.get().first(), "storageserver.getKeyValuesAndFlatMap.Before"); + state Version version = wait(waitForVersion(data, req.version, span.context)); + + state uint64_t changeCounter = data->shardChangeCounter; + // try { + state KeyRange shard = getShardKeyRange(data, req.begin); + + if (req.debugID.present()) + g_traceBatch.addEvent( + "TransactionDebug", req.debugID.get().first(), "storageserver.getKeyValuesAndFlatMap.AfterVersion"); + //.detail("ShardBegin", shard.begin).detail("ShardEnd", shard.end); + //} catch (Error& e) { TraceEvent("WrongShardServer", data->thisServerID).detail("Begin", + // req.begin.toString()).detail("End", req.end.toString()).detail("Version", version).detail("Shard", + //"None").detail("In", "getKeyValuesAndFlatMap>getShardKeyRange"); throw e; } + + if (!selectorInRange(req.end, shard) && !(req.end.isFirstGreaterOrEqual() && req.end.getKey() == shard.end)) { + // TraceEvent("WrongShardServer1", data->thisServerID).detail("Begin", + // req.begin.toString()).detail("End", req.end.toString()).detail("Version", version).detail("ShardBegin", + // shard.begin).detail("ShardEnd", shard.end).detail("In", "getKeyValuesAndFlatMap>checkShardExtents"); + throw wrong_shard_server(); + 
} + + state int offset1 = 0; + state int offset2; + state Future fBegin = req.begin.isFirstGreaterOrEqual() + ? Future(req.begin.getKey()) + : findKey(data, req.begin, version, shard, &offset1, span.context, type); + state Future fEnd = req.end.isFirstGreaterOrEqual() + ? Future(req.end.getKey()) + : findKey(data, req.end, version, shard, &offset2, span.context, type); + state Key begin = wait(fBegin); + state Key end = wait(fEnd); + + if (req.debugID.present()) + g_traceBatch.addEvent( + "TransactionDebug", req.debugID.get().first(), "storageserver.getKeyValuesAndFlatMap.AfterKeys"); + //.detail("Off1",offset1).detail("Off2",offset2).detail("ReqBegin",req.begin.getKey()).detail("ReqEnd",req.end.getKey()); + + // Offsets of zero indicate begin/end keys in this shard, which obviously means we can answer the query + // An end offset of 1 is also OK because the end key is exclusive, so if the first key of the next shard is the + // end the last actual key returned must be from this shard. A begin offset of 1 is also OK because then either + // begin is past end or equal to end (so the result is definitely empty) + if ((offset1 && offset1 != 1) || (offset2 && offset2 != 1)) { + TEST(true); // wrong_shard_server due to offset in getKeyValuesAndFlatMapQ + // We could detect when offset1 takes us off the beginning of the database or offset2 takes us off the end, + // and return a clipped range rather than an error (since that is what the NativeAPI.getRange will do anyway + // via its "slow path"), but we would have to add some flags to the response to encode whether we went off + // the beginning and the end, since it needs that information. 
+ //TraceEvent("WrongShardServer2", data->thisServerID).detail("Begin", req.begin.toString()).detail("End", req.end.toString()).detail("Version", version).detail("ShardBegin", shard.begin).detail("ShardEnd", shard.end).detail("In", "getKeyValuesAndFlatMap>checkOffsets").detail("BeginKey", begin).detail("EndKey", end).detail("BeginOffset", offset1).detail("EndOffset", offset2); + throw wrong_shard_server(); + } + + if (begin >= end) { + if (req.debugID.present()) + g_traceBatch.addEvent( + "TransactionDebug", req.debugID.get().first(), "storageserver.getKeyValuesAndFlatMap.Send"); + //.detail("Begin",begin).detail("End",end); + + GetKeyValuesAndFlatMapReply none; + none.version = version; + none.more = false; + none.penalty = data->getPenalty(); + + data->checkChangeCounter(changeCounter, + KeyRangeRef(std::min(req.begin.getKey(), req.end.getKey()), + std::max(req.begin.getKey(), req.end.getKey()))); + req.reply.send(none); + } else { + state int remainingLimitBytes = req.limitBytes; + + GetKeyValuesReply _r = wait( + readRange(data, version, KeyRangeRef(begin, end), req.limit, &remainingLimitBytes, span.context, type)); + + // Map the scanned range to another list of keys and look up. + state GetKeyValuesAndFlatMapReply r = wait(flatMap(data, _r, req.mapper)); + + if (req.debugID.present()) + g_traceBatch.addEvent("TransactionDebug", + req.debugID.get().first(), + "storageserver.getKeyValuesAndFlatMap.AfterReadRange"); + //.detail("Begin",begin).detail("End",end).detail("SizeOf",r.data.size()); + data->checkChangeCounter( + changeCounter, + KeyRangeRef(std::min(begin, std::min(req.begin.getKey(), req.end.getKey())), + std::max(end, std::max(req.begin.getKey(), req.end.getKey())))); + if (EXPENSIVE_VALIDATION) { + // TODO: Only mapped keys are returned, which are not supposed to be in the range. + // for (int i = 0; i < r.data.size(); i++) + // ASSERT(r.data[i].key >= begin && r.data[i].key < end); + // TODO: GetKeyValuesWithFlatMapRequest doesn't respect limit yet. 
+ // ASSERT(r.data.size() <= std::abs(req.limit)); + } + + /*for( int i = 0; i < r.data.size(); i++ ) { + StorageMetrics m; + m.bytesPerKSecond = r.data[i].expectedSize(); + m.iosPerKSecond = 1; //FIXME: this should be 1/r.data.size(), but we cannot do that because it is an int + data->metrics.notify(r.data[i].key, m); + }*/ + + // For performance concerns, the cost of a range read is billed to the start key and end key of the range. + int64_t totalByteSize = 0; + for (int i = 0; i < r.data.size(); i++) { + totalByteSize += r.data[i].expectedSize(); + } + if (totalByteSize > 0 && SERVER_KNOBS->READ_SAMPLING_ENABLED) { + int64_t bytesReadPerKSecond = std::max(totalByteSize, SERVER_KNOBS->EMPTY_READ_PENALTY) / 2; + data->metrics.notifyBytesReadPerKSecond(r.data[0].key, bytesReadPerKSecond); + data->metrics.notifyBytesReadPerKSecond(r.data[r.data.size() - 1].key, bytesReadPerKSecond); + } + + r.penalty = data->getPenalty(); + req.reply.send(r); + + resultSize = req.limitBytes - remainingLimitBytes; + data->counters.bytesQueried += resultSize; + data->counters.rowsQueried += r.data.size(); + if (r.data.size() == 0) { + ++data->counters.emptyQueries; + } + } + } catch (Error& e) { + if (!canReplyWith(e)) + throw; + data->sendErrorWithPenalty(req.reply, e, data->getPenalty()); + } + + data->transactionTagCounter.addRequest(req.tags, resultSize); + ++data->counters.finishedQueries; + --data->readQueueSizeMetric; + + double duration = g_network->timer() - req.requestTime(); + data->counters.readLatencySample.addMeasurement(duration); + if (data->latencyBandConfig.present()) { + int maxReadBytes = + data->latencyBandConfig.get().readConfig.maxReadBytes.orDefault(std::numeric_limits::max()); + int maxSelectorOffset = + data->latencyBandConfig.get().readConfig.maxKeySelectorOffset.orDefault(std::numeric_limits::max()); + data->counters.readLatencyBands.addMeasurement(duration, + resultSize > maxReadBytes || + abs(req.begin.offset) > maxSelectorOffset || + abs(req.end.offset) 
> maxSelectorOffset); + } + + return Void(); +} + ACTOR Future getKeyValuesStreamQ(StorageServer* data, GetKeyValuesStreamRequest req) // Throws a wrong_shard_server if the keys in the request or result depend on data outside this server OR if a large // selector offset prevents all data from being read in one range read @@ -5690,6 +6164,20 @@ ACTOR Future serveGetKeyValuesRequests(StorageServer* self, FutureStream serveGetKeyValuesAndFlatMapRequests( + StorageServer* self, + FutureStream getKeyValuesAndFlatMap) { + // TODO: Is it fine to keep TransactionLineage::Operation::GetKeyValues here? + getCurrentLineage()->modify(&TransactionLineage::operation) = TransactionLineage::Operation::GetKeyValues; + loop { + GetKeyValuesAndFlatMapRequest req = waitNext(getKeyValuesAndFlatMap); + + // Warning: This code is executed at extremely high priority (TaskPriority::LoadBalancedEndpoint), so downgrade + // before doing real work + self->actors.add(self->readGuard(req, getKeyValuesAndFlatMapQ)); + } +} + ACTOR Future serveGetKeyValuesStreamRequests(StorageServer* self, FutureStream getKeyValuesStream) { loop { @@ -5889,6 +6377,7 @@ ACTOR Future storageServerCore(StorageServer* self, StorageServerInterface self->actors.add(checkBehind(self)); self->actors.add(serveGetValueRequests(self, ssi.getValue.getFuture())); self->actors.add(serveGetKeyValuesRequests(self, ssi.getKeyValues.getFuture())); + self->actors.add(serveGetKeyValuesAndFlatMapRequests(self, ssi.getKeyValuesAndFlatMap.getFuture())); self->actors.add(serveGetKeyValuesStreamRequests(self, ssi.getKeyValuesStream.getFuture())); self->actors.add(serveGetKeyRequests(self, ssi.getKey.getFuture())); self->actors.add(serveWatchValueRequests(self, ssi.watchValue.getFuture())); diff --git a/fdbserver/worker.actor.cpp b/fdbserver/worker.actor.cpp index c42fcff082..1e50036fd8 100644 --- a/fdbserver/worker.actor.cpp +++ b/fdbserver/worker.actor.cpp @@ -1097,6 +1097,7 @@ ACTOR Future storageServerRollbackRebooter(std::set 
storageServerRollbackRebooter(std::set(), Reference(nullptr)); @@ -1478,6 +1480,7 @@ ACTOR Future workerServer(Reference connRecord, DUMPTOKEN(recruited.getKeyValueStoreType); DUMPTOKEN(recruited.watchValue); DUMPTOKEN(recruited.getKeyValuesStream); + DUMPTOKEN(recruited.getKeyValuesAndFlatMap); Promise recovery; Future f = storageServer(kv, recruited, dbInfo, folder, recovery, connRecord); @@ -1574,6 +1577,7 @@ ACTOR Future workerServer(Reference connRecord, DUMPTOKEN(recruited.getValue); DUMPTOKEN(recruited.getKey); DUMPTOKEN(recruited.getKeyValues); + DUMPTOKEN(recruited.getKeyValuesAndFlatMap); DUMPTOKEN(recruited.getShardState); DUMPTOKEN(recruited.waitMetrics); DUMPTOKEN(recruited.splitMetrics); @@ -1931,6 +1935,7 @@ ACTOR Future workerServer(Reference connRecord, DUMPTOKEN(recruited.getKeyValueStoreType); DUMPTOKEN(recruited.watchValue); DUMPTOKEN(recruited.getKeyValuesStream); + DUMPTOKEN(recruited.getKeyValuesAndFlatMap); // printf("Recruited as storageServer\n"); std::string filename = diff --git a/fdbserver/workloads/IndexPrefetchDemo.actor.cpp b/fdbserver/workloads/IndexPrefetchDemo.actor.cpp new file mode 100644 index 0000000000..a614e80462 --- /dev/null +++ b/fdbserver/workloads/IndexPrefetchDemo.actor.cpp @@ -0,0 +1,145 @@ +/* + * IndexPrefetchDemo.actor.cpp + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <iostream>
+#include <string>
+#include "fdbrpc/simulator.h"
+#include "fdbclient/MutationLogReader.actor.h"
+#include "fdbclient/Tuple.h"
+#include "fdbserver/workloads/workloads.actor.h"
+#include "flow/Error.h"
+#include "flow/IRandom.h"
+#include "flow/flow.h"
+#include "flow/actorcompiler.h" // This must be the last #include.
+
+// Tuple elements shared by every key this workload writes.
+const KeyRef prefix = "prefix"_sr;
+const KeyRef RECORD = "RECORD"_sr;
+const KeyRef INDEX = "INDEX"_sr;
+
+// Demo workload for Transaction::getRangeAndFlatMap.
+//
+// It writes a handful of records plus one index entry per record, prints a plain scan of the data, and then
+// runs getRangeAndFlatMap over a slice of the index entries with a mapper tuple, printing what comes back.
+// Only the first client does any work; check() always reports success.
+struct IndexPrefetchDemoWorkload : TestWorkload {
+    bool enabled;
+
+    IndexPrefetchDemoWorkload(WorkloadContext const& wcx) : TestWorkload(wcx) {
+        enabled = !clientId; // only do this on the "first" client
+    }
+
+    std::string description() const override { return "IndexPrefetchDemo"; }
+
+    Future<Void> start(Database const& cx) override {
+        if (enabled) {
+            return _start(cx, this);
+        }
+        return Void();
+    }
+
+    // Copies `text` into a Key that owns its bytes. The key builders below must return an owning Key: a KeyRef
+    // built from a temporary std::string would dangle as soon as the builder returned.
+    static Key ownedKey(const std::string& text) { return Key(KeyRef(text)); }
+
+    // Builders for the primary key, index key and payload of record `i`.
+    static Key primaryKey(int i) { return ownedKey("primary-key-of-record-" + std::to_string(i)); }
+    static Key indexKey(int i) { return ownedKey("index-key-of-record-" + std::to_string(i)); }
+    static Key data(int i) { return ownedKey("data-of-record-" + std::to_string(i)); }
+
+    // Writes records 0..n-1 in a single transaction. For each record i it sets
+    //   (prefix, RECORD, primaryKey(i))              -> (data(i))
+    //   (prefix, INDEX, indexKey(i), primaryKey(i))  -> ()
+    // and retries through Transaction::onError until the commit succeeds.
+    ACTOR Future<Void> fillInRecords(Database cx, int n) {
+        std::cout << "start fillInRecords n=" << n << std::endl;
+        // TODO: When n is large, split into multiple transactions.
+        state Transaction tr(cx);
+        loop {
+            try {
+                for (int i = 0; i < n; i++) {
+                    tr.set(Tuple().append(prefix).append(RECORD).append(primaryKey(i)).pack(),
+                           Tuple().append(data(i)).pack());
+                    tr.set(Tuple().append(prefix).append(INDEX).append(indexKey(i)).append(primaryKey(i)).pack(),
+                           Tuple().pack());
+                }
+                wait(tr.commit());
+                std::cout << "finished fillInRecords" << std::endl;
+                break;
+            } catch (Error& e) {
+                std::cout << "failed fillInRecords" << std::endl;
+                // onError rethrows non-retryable errors; otherwise it readies `tr` for another attempt.
+                wait(tr.onError(e));
+            }
+        }
+        return Void();
+    }
+
+    // Prints the number of rows in `result` followed by one "key=..., value=..." line per row.
+    static void showResult(const RangeResult& result) {
+        std::cout << "result size: " << result.size() << std::endl;
+        const KeyValueRef* it = result.begin();
+        for (; it != result.end(); it++) {
+            std::cout << "key=" << it->key.printable() << ", value=" << it->value.printable() << std::endl;
+        }
+    }
+
+    // Reads `range` with a plain getRange (limit CLIENT_KNOBS->TOO_MANY) and prints the rows.
+    // `range` is taken as an owning KeyRange so it stays valid across the waits of a retry.
+    ACTOR Future<Void> scanRange(Database cx, KeyRange range) {
+        std::cout << "start scanRange " << range.toString() << std::endl;
+        // TODO: When n is large, split into multiple transactions.
+        state Transaction tr(cx);
+        loop {
+            try {
+                RangeResult result = wait(tr.getRange(range, CLIENT_KNOBS->TOO_MANY));
+                showResult(result);
+                break;
+            } catch (Error& e) {
+                wait(tr.onError(e));
+            }
+        }
+        std::cout << "finished scanRange" << std::endl;
+        return Void();
+    }
+
+    // Runs getRangeAndFlatMap over `range` with the given `mapper` and prints the rows it returns.
+    ACTOR Future<Void> scanRangeAndFlatMap(Database cx, KeyRange range, Key mapper) {
+        std::cout << "start scanRangeAndFlatMap " << range.toString() << std::endl;
+        // TODO: When n is large, split into multiple transactions.
+        state Transaction tr(cx);
+        loop {
+            try {
+                RangeResult result =
+                    wait(tr.getRangeAndFlatMap(KeySelector(firstGreaterOrEqual(range.begin), range.arena()),
+                                               KeySelector(firstGreaterOrEqual(range.end), range.arena()),
+                                               mapper,
+                                               GetRangeLimits(CLIENT_KNOBS->TOO_MANY)));
+                showResult(result);
+                // result size: 2
+                // key=\x01prefix\x00\x01RECORD\x00\x01primary-key-of-record-2\x00, value=\x01data-of-record-2\x00
+                // key=\x01prefix\x00\x01RECORD\x00\x01primary-key-of-record-3\x00, value=\x01data-of-record-3\x00
+                break;
+            } catch (Error& e) {
+                wait(tr.onError(e));
+            }
+        }
+        std::cout << "finished scanRangeAndFlatMap" << std::endl;
+        return Void();
+    }
+
+    // Drives the demo: populate 5 records, dump all of normalKeys, dump the index entries for records 2 and 3,
+    // then fetch through those same index entries with getRangeAndFlatMap.
+    ACTOR Future<Void> _start(Database cx, IndexPrefetchDemoWorkload* self) {
+        // TODO: Use toml to config
+        wait(self->fillInRecords(cx, 5));
+
+        wait(self->scanRange(cx, normalKeys));
+
+        Key someIndexesBegin = Tuple().append(prefix).append(INDEX).append(indexKey(2)).getDataAsStandalone();
+        Key someIndexesEnd = Tuple().append(prefix).append(INDEX).append(indexKey(4)).getDataAsStandalone();
+        state KeyRange someIndexes = KeyRangeRef(someIndexesBegin, someIndexesEnd);
+        wait(self->scanRange(cx, someIndexes));
+
+        // NOTE(review): "{K[3]}" presumably stands for element 3 (0-based) of each scanned index key's tuple,
+        // i.e. the primary key, so the mapped key is (prefix, RECORD, primaryKey). Confirm against the mapper
+        // implementation in the storage server.
+        Tuple mapperTuple;
+        mapperTuple << prefix << RECORD << "{K[3]}"_sr;
+        Key mapper = mapperTuple.getDataAsStandalone();
+        wait(self->scanRangeAndFlatMap(cx, someIndexes, mapper));
+        return Void();
+    }
+
+    // The demo only prints results, so there is nothing to verify.
+    Future<bool> check(Database const& cx) override { return true; }
+
+    void getMetrics(std::vector<PerfMetric>& m) override {}
+};
+
+WorkloadFactory<IndexPrefetchDemoWorkload> IndexPrefetchDemoWorkloadFactory("IndexPrefetchDemo");
diff --git a/flow/Platform.h b/flow/Platform.h
index e07bc1a332..889d2a0b17 100644
--- a/flow/Platform.h
+++ b/flow/Platform.h
@@ -613,6 +613,7 @@ inline static void flushOutputStreams() {
 #if defined(_MSC_VER)
 #define DLLEXPORT __declspec(dllexport)
 #elif defined(__GNUG__)
+#undef DLLEXPORT
 #define DLLEXPORT __attribute__((visibility("default")))
 #else
 #error Missing symbol export
diff --git a/flow/error_definitions.h
b/flow/error_definitions.h index e468e46801..5e815c5798 100755 --- a/flow/error_definitions.h +++ b/flow/error_definitions.h @@ -159,6 +159,12 @@ ERROR( blocked_from_network_thread, 2026, "Detected a deadlock in a callback cal ERROR( invalid_config_db_range_read, 2027, "Invalid configuration database range read" ) ERROR( invalid_config_db_key, 2028, "Invalid configuration database key provided" ) ERROR( invalid_config_path, 2029, "Invalid configuration path" ) +ERROR( mapper_bad_index, 2030, "The index in K[] or V[] is not a valid number or out of range" ) +ERROR( mapper_no_such_key, 2031, "A mapped key is not set in database" ) +ERROR( mapper_bad_range_decriptor, 2032, "\"{...}\" must be the last element of the mapper tuple" ) +ERROR( quick_get_key_values_has_more, 2033, "One of the mapped range queries is too large" ) +ERROR( quick_get_value_miss, 2034, "Found a mapped key that is not served in the same SS" ) +ERROR( quick_get_key_values_miss, 2035, "Found a mapped range that is not served in the same SS" ) ERROR( incompatible_protocol_version, 2100, "Incompatible protocol version" ) ERROR( transaction_too_large, 2101, "Transaction exceeds byte limit" ) diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 22c77e091d..e5f52a2de3 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -150,6 +150,7 @@ if(WITH_PYTHON) add_fdb_test(TEST_FILES fast/MemoryLifetime.toml) add_fdb_test(TEST_FILES fast/MoveKeysCycle.toml) add_fdb_test(TEST_FILES fast/MutationLogReaderCorrectness.toml) + add_fdb_test(TEST_FILES fast/IndexPrefetchDemo.toml) add_fdb_test(TEST_FILES fast/ProtocolVersion.toml) add_fdb_test(TEST_FILES fast/RandomSelector.toml) add_fdb_test(TEST_FILES fast/RandomUnitTests.toml) diff --git a/tests/fast/IndexPrefetchDemo.toml b/tests/fast/IndexPrefetchDemo.toml new file mode 100644 index 0000000000..dbdca31f8d --- /dev/null +++ b/tests/fast/IndexPrefetchDemo.toml @@ -0,0 +1,6 @@ +[[test]] +testTitle = 'IndexPrefetchDemo' +useDB = true + + 
[[test.workload]] + testName = 'IndexPrefetchDemo'