Merge remote-tracking branch 'origin/main' into change-data-hall

This commit is contained in:
sfc-gh-tclinkenbeard 2022-03-28 14:43:54 -07:00
commit f7f4d0247e
8 changed files with 231 additions and 22 deletions

View File

@ -449,7 +449,7 @@ DLLEXPORT WARN_UNUSED_RESULT FDBFuture* fdb_transaction_get_range_split_points(F
int end_key_name_length,
int64_t chunk_size);
DLLEXPORT WARN_UNUSED_RESULT FDBFuture* fdb_transaction_get_blob_granule_ranges(FDBTransaction* db,
DLLEXPORT WARN_UNUSED_RESULT FDBFuture* fdb_transaction_get_blob_granule_ranges(FDBTransaction* tr,
uint8_t const* begin_key_name,
int begin_key_name_length,
uint8_t const* end_key_name,
@ -457,7 +457,7 @@ DLLEXPORT WARN_UNUSED_RESULT FDBFuture* fdb_transaction_get_blob_granule_ranges(
/* LatestVersion (-2) for readVersion means get read version from transaction
Separated out as optional because BG reads can support longer-lived reads than normal FDB transactions */
DLLEXPORT WARN_UNUSED_RESULT FDBResult* fdb_transaction_read_blob_granules(FDBTransaction* db,
DLLEXPORT WARN_UNUSED_RESULT FDBResult* fdb_transaction_read_blob_granules(FDBTransaction* tr,
uint8_t const* begin_key_name,
int begin_key_name_length,
uint8_t const* end_key_name,

View File

@ -20,6 +20,7 @@
// Unit tests for the FoundationDB C API.
#include "fdb_c_options.g.h"
#define FDB_API_VERSION 710
#include <foundationdb/fdb_c.h>
#include <assert.h>
@ -2430,6 +2431,38 @@ TEST_CASE("Tenant create, access, and delete") {
break;
}
while (1) {
StringRef begin = "\xff\xff/management/tenant_map/"_sr;
StringRef end = "\xff\xff/management/tenant_map0"_sr;
fdb_check(tr.set_option(FDB_TR_OPTION_SPECIAL_KEY_SPACE_ENABLE_WRITES, nullptr, 0));
fdb::KeyValueArrayFuture f = tr.get_range(FDB_KEYSEL_FIRST_GREATER_OR_EQUAL(begin.begin(), begin.size()),
FDB_KEYSEL_FIRST_GREATER_OR_EQUAL(end.begin(), end.size()),
/* limit */ 0,
/* target_bytes */ 0,
/* FDBStreamingMode */ FDB_STREAMING_MODE_WANT_ALL,
/* iteration */ 0,
/* snapshot */ false,
/* reverse */ 0);
fdb_error_t err = wait_future(f);
if (err) {
fdb::EmptyFuture f2 = tr.on_error(err);
fdb_check(wait_future(f2));
continue;
}
FDBKeyValue const* outKv;
int outCount;
int outMore;
fdb_check(f.get(&outKv, &outCount, &outMore));
CHECK(outCount == 1);
CHECK(StringRef(outKv->key, outKv->key_length) == StringRef(tenantName).withPrefix(begin));
tr.reset();
break;
}
fdb::Tenant tenant(db, reinterpret_cast<const uint8_t*>(tenantName.c_str()), tenantName.size());
fdb::Transaction tr2(tenant);
@ -2505,6 +2538,152 @@ TEST_CASE("Tenant create, access, and delete") {
}
}
int64_t granule_start_load_fail(const char* filename,
int filenameLength,
int64_t offset,
int64_t length,
int64_t fullFileLength,
void* userContext) {
CHECK(false);
return -1;
}
uint8_t* granule_get_load_fail(int64_t loadId, void* userContext) {
CHECK(false);
return nullptr;
}
void granule_free_load_fail(int64_t loadId, void* userContext) {
CHECK(false);
}
TEST_CASE("Blob Granule Functions") {
auto confValue =
get_value("\xff/conf/blob_granules_enabled", /* snapshot */ false, { FDB_TR_OPTION_READ_SYSTEM_KEYS });
if (!confValue.has_value() || confValue.value() != "1") {
return;
}
// write some data
insert_data(db, create_data({ { "bg1", "a" }, { "bg2", "b" }, { "bg3", "c" } }));
// because wiring up files is non-trivial, just test the calls complete with the expected no_materialize error
FDBReadBlobGranuleContext granuleContext;
granuleContext.userContext = nullptr;
granuleContext.start_load_f = &granule_start_load_fail;
granuleContext.get_load_f = &granule_get_load_fail;
granuleContext.free_load_f = &granule_free_load_fail;
granuleContext.debugNoMaterialize = true;
granuleContext.granuleParallelism = 1;
// dummy values
FDBKeyValue const* out_kv;
int out_count;
int out_more;
fdb::Transaction tr(db);
int64_t originalReadVersion = -1;
// test no materialize gets error but completes, save read version
while (1) {
fdb_check(tr.set_option(FDB_TR_OPTION_READ_YOUR_WRITES_DISABLE, nullptr, 0));
// -2 is latest version
fdb::KeyValueArrayResult r = tr.read_blob_granules(key("bg"), key("bh"), 0, -2, granuleContext);
fdb_error_t err = r.get(&out_kv, &out_count, &out_more);
if (err && err != 2037 /* blob_granule_not_materialized */) {
fdb::EmptyFuture f2 = tr.on_error(err);
fdb_check(wait_future(f2));
continue;
}
CHECK(err == 2037 /* blob_granule_not_materialized */);
// If read done, save read version. Should have already used read version so this shouldn't error
fdb::Int64Future grvFuture = tr.get_read_version();
fdb_error_t grvErr = wait_future(grvFuture);
CHECK(!grvErr);
CHECK(!grvFuture.get(&originalReadVersion));
CHECK(originalReadVersion > 0);
tr.reset();
break;
}
// test with begin version > 0
while (1) {
fdb_check(tr.set_option(FDB_TR_OPTION_READ_YOUR_WRITES_DISABLE, nullptr, 0));
// -2 is latest version, read version should be >= originalReadVersion
fdb::KeyValueArrayResult r =
tr.read_blob_granules(key("bg"), key("bh"), originalReadVersion, -2, granuleContext);
fdb_error_t err = r.get(&out_kv, &out_count, &out_more);
;
if (err && err != 2037 /* blob_granule_not_materialized */) {
fdb::EmptyFuture f2 = tr.on_error(err);
fdb_check(wait_future(f2));
continue;
}
CHECK(err == 2037 /* blob_granule_not_materialized */);
tr.reset();
break;
}
// test with prior read version completes after delay larger than normal MVC window
// TODO: should we not do this?
std::this_thread::sleep_for(std::chrono::milliseconds(6000));
while (1) {
fdb_check(tr.set_option(FDB_TR_OPTION_READ_YOUR_WRITES_DISABLE, nullptr, 0));
fdb::KeyValueArrayResult r =
tr.read_blob_granules(key("bg"), key("bh"), 0, originalReadVersion, granuleContext);
fdb_error_t err = r.get(&out_kv, &out_count, &out_more);
if (err && err != 2037 /* blob_granule_not_materialized */) {
fdb::EmptyFuture f2 = tr.on_error(err);
fdb_check(wait_future(f2));
continue;
}
CHECK(err == 2037 /* blob_granule_not_materialized */);
tr.reset();
break;
}
// test ranges
while (1) {
fdb::KeyRangeArrayFuture f = tr.get_blob_granule_ranges(key("bg"), key("bh"));
fdb_error_t err = wait_future(f);
if (err) {
fdb::EmptyFuture f2 = tr.on_error(err);
fdb_check(wait_future(f2));
continue;
}
const FDBKeyRange* out_kr;
int out_count;
fdb_check(f.get(&out_kr, &out_count));
CHECK(out_count >= 1);
// check key ranges are in order
for (int i = 0; i < out_count; i++) {
// key range start < end
CHECK(std::string((const char*)out_kr[i].begin_key, out_kr[i].begin_key_length) <
std::string((const char*)out_kr[i].end_key, out_kr[i].end_key_length));
}
// Ranges themselves are sorted
for (int i = 0; i < out_count - 1; i++) {
CHECK(std::string((const char*)out_kr[i].end_key, out_kr[i].end_key_length) <=
std::string((const char*)out_kr[i + 1].begin_key, out_kr[i + 1].begin_key_length));
}
tr.reset();
break;
}
}
int main(int argc, char** argv) {
if (argc < 3) {
std::cout << "Unit tests for the FoundationDB C API.\n"

View File

@ -238,7 +238,7 @@ ACTOR Future<Void> echoClient() {
return Void();
}
struct SimpleKeyValueStoreInteface {
struct SimpleKeyValueStoreInterface {
constexpr static FileIdentifier file_identifier = 8226647;
RequestStream<struct GetKVInterface> connect;
RequestStream<struct GetRequest> get;
@ -253,7 +253,7 @@ struct SimpleKeyValueStoreInteface {
struct GetKVInterface {
constexpr static FileIdentifier file_identifier = 8062308;
ReplyPromise<SimpleKeyValueStoreInteface> reply;
ReplyPromise<SimpleKeyValueStoreInterface> reply;
template <class Ar>
void serialize(Ar& ar) {
@ -297,7 +297,7 @@ struct ClearRequest {
};
ACTOR Future<Void> kvStoreServer() {
state SimpleKeyValueStoreInteface inf;
state SimpleKeyValueStoreInterface inf;
state std::map<std::string, std::string> store;
inf.connect.makeWellKnownEndpoint(WLTOKEN_SIMPLE_KV_SERVER, TaskPriority::DefaultEndpoint);
loop {
@ -333,17 +333,17 @@ ACTOR Future<Void> kvStoreServer() {
}
}
ACTOR Future<SimpleKeyValueStoreInteface> connect() {
ACTOR Future<SimpleKeyValueStoreInterface> connect() {
std::cout << format("%llu: Connect...\n", uint64_t(g_network->now()));
SimpleKeyValueStoreInteface c;
SimpleKeyValueStoreInterface c;
c.connect = RequestStream<GetKVInterface>(Endpoint::wellKnown({ serverAddress }, WLTOKEN_SIMPLE_KV_SERVER));
SimpleKeyValueStoreInteface result = wait(c.connect.getReply(GetKVInterface()));
SimpleKeyValueStoreInterface result = wait(c.connect.getReply(GetKVInterface()));
std::cout << format("%llu: done..\n", uint64_t(g_network->now()));
return result;
}
ACTOR Future<Void> kvSimpleClient() {
state SimpleKeyValueStoreInteface server = wait(connect());
state SimpleKeyValueStoreInterface server = wait(connect());
std::cout << format("Set %s -> %s\n", "foo", "bar");
SetRequest setRequest;
setRequest.key = "foo";
@ -356,7 +356,7 @@ ACTOR Future<Void> kvSimpleClient() {
return Void();
}
ACTOR Future<Void> kvClient(SimpleKeyValueStoreInteface server, std::shared_ptr<uint64_t> ops) {
ACTOR Future<Void> kvClient(SimpleKeyValueStoreInterface server, std::shared_ptr<uint64_t> ops) {
state Future<Void> timeout = delay(20);
state int rangeSize = 2 << 12;
loop {
@ -397,7 +397,7 @@ ACTOR Future<Void> throughputMeasurement(std::shared_ptr<uint64_t> operations) {
}
ACTOR Future<Void> multipleClients() {
SimpleKeyValueStoreInteface server = wait(connect());
SimpleKeyValueStoreInterface server = wait(connect());
auto ops = std::make_shared<uint64_t>(0);
std::vector<Future<Void>> clients(100);
for (auto& f : clients) {

View File

@ -8707,11 +8707,24 @@ ACTOR Future<Void> singleChangeFeedStreamInternal(KeyRange range,
results->lastReturnedVersion.set(feedReply.mutations.back().version);
}
if (refresh.canBeSet() && !atLatest && feedReply.atLatestVersion) {
if (!refresh.canBeSet()) {
try {
// refresh is set if and only if this actor is cancelled
wait(Future<Void>(Void()));
// Catch any unexpected behavior if the above contract is broken
ASSERT(false);
} catch (Error& e) {
ASSERT(e.code() == error_code_actor_cancelled);
throw;
}
}
if (!atLatest && feedReply.atLatestVersion) {
atLatest = true;
results->notAtLatest.set(0);
}
if (refresh.canBeSet() && feedReply.minStreamVersion > results->storageData[0]->version.get()) {
if (feedReply.minStreamVersion > results->storageData[0]->version.get()) {
results->storageData[0]->version.set(feedReply.minStreamVersion);
}
}

View File

@ -2704,16 +2704,23 @@ Future<Optional<std::string>> FailedLocalitiesRangeImpl::commit(ReadYourWritesTr
}
ACTOR Future<RangeResult> getTenantList(ReadYourWritesTransaction* ryw, KeyRangeRef kr, GetRangeLimits limitsHint) {
KeyRangeRef tenantRange =
kr.removePrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::MANAGEMENT).begin)
.removePrefix(TenantMapRangeImpl::submoduleRange.begin);
state KeyRef managementPrefix =
kr.begin.substr(0,
SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::MANAGEMENT).begin.size() +
TenantMapRangeImpl::submoduleRange.begin.size());
std::map<TenantName, TenantMapEntry> tenants = wait(ManagementAPI::listTenantsTransaction(
&ryw->getTransaction(), tenantRange.begin, tenantRange.end, limitsHint.rows));
kr = kr.removePrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::MANAGEMENT).begin);
TenantNameRef beginTenant = kr.begin.removePrefix(TenantMapRangeImpl::submoduleRange.begin);
TenantNameRef endTenant = kr.end;
if (endTenant.startsWith(TenantMapRangeImpl::submoduleRange.begin)) {
endTenant = endTenant.removePrefix(TenantMapRangeImpl::submoduleRange.begin);
} else {
endTenant = "\xff"_sr;
}
std::map<TenantName, TenantMapEntry> tenants =
wait(ManagementAPI::listTenantsTransaction(&ryw->getTransaction(), beginTenant, endTenant, limitsHint.rows));
RangeResult results;
for (auto tenant : tenants) {
@ -2783,7 +2790,7 @@ Future<Optional<std::string>> TenantMapRangeImpl::commit(ReadYourWritesTransacti
TenantNameRef endTenant = range.end().removePrefix(
SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::MANAGEMENT).begin);
if (endTenant.startsWith(submoduleRange.begin)) {
endTenant = endTenant.removePrefix(submoduleRange.end);
endTenant = endTenant.removePrefix(submoduleRange.begin);
} else {
endTenant = "\xff"_sr;
}

View File

@ -319,9 +319,6 @@ ThreadResult<RangeResult> ThreadSafeTransaction::readBlobGranules(const KeyRange
Version beginVersion,
Optional<Version> readVersion,
ReadBlobGranuleContext granule_context) {
// In V1 of api this is required, field is just for forward compatibility
ASSERT(beginVersion == 0);
// FIXME: prevent from calling this from another main thread!
ISingleThreadTransaction* tr = this->tr;

View File

@ -52,4 +52,7 @@ enum WellKnownEndpoints {
WLTOKEN_RESERVED_COUNT // 23
};
static_assert(WLTOKEN_PROTOCOL_INFO ==
10); // Enforce that the value of this endpoint does not change per comment above.
#endif

View File

@ -10,6 +10,16 @@ endif()
# as soon as we get rid of the old build system
target_link_libraries(fdbmonitor PUBLIC Threads::Threads)
# We don't compile fdbmonitor with thread sanitizer instrumentation, since this
# appears to change its behavior (it no longer seems to restart killed
# processes). fdbmonitor is single-threaded anyway.
get_target_property(fdbmonitor_options fdbmonitor COMPILE_OPTIONS)
list(REMOVE_ITEM fdbmonitor_options "-fsanitize=thread")
set_property(TARGET fdbmonitor PROPERTY COMPILE_OPTIONS ${target_options})
get_target_property(fdbmonitor_options fdbmonitor LINK_OPTIONS)
list(REMOVE_ITEM fdbmonitor_options "-fsanitize=thread")
set_property(TARGET fdbmonitor PROPERTY LINK_OPTIONS ${target_options})
if(GENERATE_DEBUG_PACKAGES)
fdb_install(TARGETS fdbmonitor DESTINATION fdbmonitor COMPONENT server)
else()