Reworked multi-version client readBlobGranules to not get stuck on client version changes (#8017)

* Reworked multi-version client readBlobGranules to not get stuck on version changes

* Addressing review comments
This commit is contained in:
Josh Slocum 2022-08-30 02:16:09 -05:00 committed by GitHub
parent ac6889286c
commit 825a58880e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 257 additions and 40 deletions

View File

@ -345,7 +345,6 @@ if(NOT WIN32)
)
set_tests_properties("fdb_c_upgrade_to_future_version" PROPERTIES ENVIRONMENT "${SANITIZER_OPTIONS}")
if (0) # reenable after stabilizing the test
add_test(NAME fdb_c_upgrade_to_future_version_blob_granules
COMMAND ${CMAKE_SOURCE_DIR}/tests/TestRunner/upgrade_test.py
--build-dir ${CMAKE_BINARY_DIR}
@ -354,7 +353,6 @@ if (0) # reenable after stabilizing the test
--blob-granules-enabled
--process-number 3
)
endif()
if(CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT USE_SANITIZER)
add_test(NAME fdb_c_upgrade_single_threaded_630api

View File

@ -943,6 +943,57 @@ extern "C" DLLEXPORT FDBResult* fdb_transaction_read_blob_granules(FDBTransactio
return (FDBResult*)(TXN(tr)->readBlobGranules(range, beginVersion, rv, context).extractPtr()););
}
extern "C" DLLEXPORT FDBFuture* fdb_transaction_read_blob_granules_start(FDBTransaction* tr,
uint8_t const* begin_key_name,
int begin_key_name_length,
uint8_t const* end_key_name,
int end_key_name_length,
int64_t beginVersion,
int64_t readVersion,
int64_t* readVersionOut) {
Optional<Version> rv;
if (readVersion != latestVersion) {
rv = readVersion;
}
return (FDBFuture*)(TXN(tr)
->readBlobGranulesStart(KeyRangeRef(KeyRef(begin_key_name, begin_key_name_length),
KeyRef(end_key_name, end_key_name_length)),
beginVersion,
rv,
readVersionOut)
.extractPtr());
}
extern "C" DLLEXPORT FDBResult* fdb_transaction_read_blob_granules_finish(FDBTransaction* tr,
FDBFuture* f,
uint8_t const* begin_key_name,
int begin_key_name_length,
uint8_t const* end_key_name,
int end_key_name_length,
int64_t beginVersion,
int64_t readVersion,
FDBReadBlobGranuleContext* granule_context) {
// FIXME: better way to convert?
ReadBlobGranuleContext context;
context.userContext = granule_context->userContext;
context.start_load_f = granule_context->start_load_f;
context.get_load_f = granule_context->get_load_f;
context.free_load_f = granule_context->free_load_f;
context.debugNoMaterialize = granule_context->debugNoMaterialize;
context.granuleParallelism = granule_context->granuleParallelism;
ThreadFuture<Standalone<VectorRef<BlobGranuleChunkRef>>> startFuture(
TSAV(Standalone<VectorRef<BlobGranuleChunkRef>>, f));
return (FDBResult*)(TXN(tr)
->readBlobGranulesFinish(startFuture,
KeyRangeRef(KeyRef(begin_key_name, begin_key_name_length),
KeyRef(end_key_name, end_key_name_length)),
beginVersion,
readVersion,
context)
.extractPtr());
}
#include "fdb_c_function_pointers.g.h"
#define FDB_API_CHANGED(func, ver) \

View File

@ -51,6 +51,27 @@ DLLEXPORT WARN_UNUSED_RESULT fdb_error_t fdb_create_database_from_connection_str
DLLEXPORT void fdb_use_future_protocol_version();
// the logical read_blob_granules is broken out (at different points depending on the client type) into the asynchronous
// start() that happens on the fdb network thread, and synchronous finish() that happens off it
DLLEXPORT FDBFuture* fdb_transaction_read_blob_granules_start(FDBTransaction* tr,
uint8_t const* begin_key_name,
int begin_key_name_length,
uint8_t const* end_key_name,
int end_key_name_length,
int64_t beginVersion,
int64_t readVersion,
int64_t* readVersionOut);
DLLEXPORT FDBResult* fdb_transaction_read_blob_granules_finish(FDBTransaction* tr,
FDBFuture* f,
uint8_t const* begin_key_name,
int begin_key_name_length,
uint8_t const* end_key_name,
int end_key_name_length,
int64_t beginVersion,
int64_t readVersion,
FDBReadBlobGranuleContext* granuleContext);
#ifdef __cplusplus
}
#endif

View File

@ -280,10 +280,46 @@ ThreadResult<RangeResult> DLTransaction::readBlobGranules(const KeyRangeRef& key
Version beginVersion,
Optional<Version> readVersion,
ReadBlobGranuleContext granuleContext) {
if (!api->transactionReadBlobGranules) {
return unsupported_operation();
}
ThreadFuture<Standalone<VectorRef<BlobGranuleChunkRef>>> DLTransaction::readBlobGranulesStart(
const KeyRangeRef& keyRange,
Version beginVersion,
Optional<Version> readVersion,
Version* readVersionOut) {
if (!api->transactionReadBlobGranulesStart) {
return unsupported_operation();
}
int64_t rv = readVersion.present() ? readVersion.get() : latestVersion;
FdbCApi::FDBFuture* f = api->transactionReadBlobGranulesStart(tr,
keyRange.begin.begin(),
keyRange.begin.size(),
keyRange.end.begin(),
keyRange.end.size(),
beginVersion,
rv,
readVersionOut);
return ThreadFuture<Standalone<VectorRef<BlobGranuleChunkRef>>>(
(ThreadSingleAssignmentVar<Standalone<VectorRef<BlobGranuleChunkRef>>>*)(f));
};
ThreadResult<RangeResult> DLTransaction::readBlobGranulesFinish(
ThreadFuture<Standalone<VectorRef<BlobGranuleChunkRef>>> startFuture,
const KeyRangeRef& keyRange,
Version beginVersion,
Version readVersion,
ReadBlobGranuleContext granuleContext) {
if (!api->transactionReadBlobGranulesFinish) {
return unsupported_operation();
}
// convert back to fdb future for API
FdbCApi::FDBFuture* f = (FdbCApi::FDBFuture*)(startFuture.extractPtr());
// FIXME: better way to convert here?
FdbCApi::FDBReadBlobGranuleContext context;
context.userContext = granuleContext.userContext;
@ -293,18 +329,18 @@ ThreadResult<RangeResult> DLTransaction::readBlobGranules(const KeyRangeRef& key
context.debugNoMaterialize = granuleContext.debugNoMaterialize;
context.granuleParallelism = granuleContext.granuleParallelism;
int64_t rv = readVersion.present() ? readVersion.get() : latestVersion;
FdbCApi::FDBResult* r = api->transactionReadBlobGranulesFinish(tr,
f,
keyRange.begin.begin(),
keyRange.begin.size(),
keyRange.end.begin(),
keyRange.end.size(),
beginVersion,
readVersion,
&context);
FdbCApi::FDBResult* r = api->transactionReadBlobGranules(tr,
keyRange.begin.begin(),
keyRange.begin.size(),
keyRange.end.begin(),
keyRange.end.size(),
beginVersion,
rv,
context);
return ThreadResult<RangeResult>((ThreadSingleAssignmentVar<RangeResult>*)(r));
}
};
void DLTransaction::addReadConflictRange(const KeyRangeRef& keys) {
throwIfError(api->transactionAddConflictRange(
@ -812,6 +848,16 @@ void DLApi::init() {
headerVersion >= 710);
loadClientFunction(
&api->transactionReadBlobGranules, lib, fdbCPath, "fdb_transaction_read_blob_granules", headerVersion >= 710);
loadClientFunction(&api->transactionReadBlobGranulesStart,
lib,
fdbCPath,
"fdb_transaction_read_blob_granules_start",
headerVersion >= 720);
loadClientFunction(&api->transactionReadBlobGranulesFinish,
lib,
fdbCPath,
"fdb_transaction_read_blob_granules_finish",
headerVersion >= 720);
loadClientFunction(&api->futureGetInt64,
lib,
fdbCPath,
@ -1165,14 +1211,45 @@ ThreadResult<RangeResult> MultiVersionTransaction::readBlobGranules(const KeyRan
Version beginVersion,
Optional<Version> readVersion,
ReadBlobGranuleContext granuleContext) {
// FIXME: prevent from calling this from another main thread?
auto tr = getTransaction();
if (tr.transaction) {
return tr.transaction->readBlobGranules(keyRange, beginVersion, readVersion, granuleContext);
Version readVersionOut;
auto f = tr.transaction->readBlobGranulesStart(keyRange, beginVersion, readVersion, &readVersionOut);
auto abortableF = abortableFuture(f, tr.onChange);
abortableF.blockUntilReadyCheckOnMainThread();
if (abortableF.isError()) {
return ThreadResult<RangeResult>(abortableF.getError());
}
if (granuleContext.debugNoMaterialize) {
return ThreadResult<RangeResult>(blob_granule_not_materialized());
}
return tr.transaction->readBlobGranulesFinish(
abortableF, keyRange, beginVersion, readVersionOut, granuleContext);
} else {
return abortableTimeoutResult<RangeResult>(tr.onChange);
}
}
ThreadFuture<Standalone<VectorRef<BlobGranuleChunkRef>>> MultiVersionTransaction::readBlobGranulesStart(
const KeyRangeRef& keyRange,
Version beginVersion,
Optional<Version> readVersion,
Version* readVersionOut) {
// can't call this directly
return ThreadFuture<Standalone<VectorRef<BlobGranuleChunkRef>>>(unsupported_operation());
}
ThreadResult<RangeResult> MultiVersionTransaction::readBlobGranulesFinish(
ThreadFuture<Standalone<VectorRef<BlobGranuleChunkRef>>> startFuture,
const KeyRangeRef& keyRange,
Version beginVersion,
Version readVersion,
ReadBlobGranuleContext granuleContext) {
// can't call this directly
return ThreadResult<RangeResult>(unsupported_operation());
}
void MultiVersionTransaction::atomicOp(const KeyRef& key, const ValueRef& value, uint32_t operationType) {
auto tr = getTransaction();
if (tr.transaction) {

View File

@ -400,34 +400,33 @@ ThreadResult<RangeResult> ThreadSafeTransaction::readBlobGranules(const KeyRange
Version beginVersion,
Optional<Version> readVersion,
ReadBlobGranuleContext granule_context) {
// FIXME: prevent from calling this from another main thread!
// This should not be called directly, bypassMultiversionApi should not be set
return ThreadResult<RangeResult>(unsupported_operation());
}
ThreadFuture<Standalone<VectorRef<BlobGranuleChunkRef>>> ThreadSafeTransaction::readBlobGranulesStart(
const KeyRangeRef& keyRange,
Version beginVersion,
Optional<Version> readVersion,
Version* readVersionOut) {
ISingleThreadTransaction* tr = this->tr;
KeyRange r = keyRange;
int64_t readVersionOut;
ThreadFuture<Standalone<VectorRef<BlobGranuleChunkRef>>> getFilesFuture = onMainThread(
[tr, r, beginVersion, readVersion, &readVersionOut]() -> Future<Standalone<VectorRef<BlobGranuleChunkRef>>> {
return onMainThread(
[tr, r, beginVersion, readVersion, readVersionOut]() -> Future<Standalone<VectorRef<BlobGranuleChunkRef>>> {
tr->checkDeferredError();
return tr->readBlobGranules(r, beginVersion, readVersion, &readVersionOut);
return tr->readBlobGranules(r, beginVersion, readVersion, readVersionOut);
});
// FIXME: can this safely avoid another main thread jump?
getFilesFuture.blockUntilReadyCheckOnMainThread();
// propagate error to client
if (getFilesFuture.isError()) {
return ThreadResult<RangeResult>(getFilesFuture.getError());
}
Standalone<VectorRef<BlobGranuleChunkRef>> files = getFilesFuture.get();
}
ThreadResult<RangeResult> ThreadSafeTransaction::readBlobGranulesFinish(
ThreadFuture<Standalone<VectorRef<BlobGranuleChunkRef>>> startFuture,
const KeyRangeRef& keyRange,
Version beginVersion,
Version readVersion,
ReadBlobGranuleContext granuleContext) {
// do this work off of fdb network threads for performance!
if (granule_context.debugNoMaterialize) {
return ThreadResult<RangeResult>(blob_granule_not_materialized());
} else {
return loadAndMaterializeBlobGranules(files, keyRange, beginVersion, readVersionOut, granule_context);
}
Standalone<VectorRef<BlobGranuleChunkRef>> files = startFuture.get();
return loadAndMaterializeBlobGranules(files, keyRange, beginVersion, readVersion, granuleContext);
}
void ThreadSafeTransaction::addReadConflictRange(const KeyRangeRef& keys) {

View File

@ -22,6 +22,7 @@
#define FDBCLIENT_ICLIENTAPI_H
#pragma once
#include "fdbclient/BlobGranuleCommon.h"
#include "fdbclient/FDBOptions.g.h"
#include "fdbclient/FDBTypes.h"
#include "fdbclient/Tenant.h"
@ -86,6 +87,19 @@ public:
Optional<Version> readVersion,
ReadBlobGranuleContext granuleContext) = 0;
virtual ThreadFuture<Standalone<VectorRef<BlobGranuleChunkRef>>> readBlobGranulesStart(
const KeyRangeRef& keyRange,
Version beginVersion,
Optional<Version> readVersion,
Version* readVersionOut) = 0;
virtual ThreadResult<RangeResult> readBlobGranulesFinish(
ThreadFuture<Standalone<VectorRef<BlobGranuleChunkRef>>> startFuture,
const KeyRangeRef& keyRange,
Version beginVersion,
Version readVersion,
ReadBlobGranuleContext granuleContext) = 0;
virtual void atomicOp(const KeyRef& key, const ValueRef& value, uint32_t operationType) = 0;
virtual void set(const KeyRef& key, const ValueRef& value) = 0;
virtual void clear(const KeyRef& begin, const KeyRef& end) = 0;

View File

@ -298,21 +298,39 @@ struct FdbCApi : public ThreadSafeReferenceCounted<FdbCApi> {
int end_key_name_length,
int64_t chunkSize);
FDBFuture* (*transactionGetBlobGranuleRanges)(FDBTransaction* db,
FDBFuture* (*transactionGetBlobGranuleRanges)(FDBTransaction* tr,
uint8_t const* begin_key_name,
int begin_key_name_length,
uint8_t const* end_key_name,
int end_key_name_length,
int rangeLimit);
FDBResult* (*transactionReadBlobGranules)(FDBTransaction* db,
FDBResult* (*transactionReadBlobGranules)(FDBTransaction* tr,
uint8_t const* begin_key_name,
int begin_key_name_length,
uint8_t const* end_key_name,
int end_key_name_length,
int64_t beginVersion,
int64_t readVersion,
FDBReadBlobGranuleContext granule_context);
int64_t readVersion);
FDBFuture* (*transactionReadBlobGranulesStart)(FDBTransaction* tr,
uint8_t const* begin_key_name,
int begin_key_name_length,
uint8_t const* end_key_name,
int end_key_name_length,
int64_t beginVersion,
int64_t readVersion,
int64_t* readVersionOut);
FDBResult* (*transactionReadBlobGranulesFinish)(FDBTransaction* tr,
FDBFuture* startFuture,
uint8_t const* begin_key_name,
int begin_key_name_length,
uint8_t const* end_key_name,
int end_key_name_length,
int64_t beginVersion,
int64_t readVersion,
FDBReadBlobGranuleContext* granule_context);
FDBFuture* (*transactionCommit)(FDBTransaction* tr);
fdb_error_t (*transactionGetCommittedVersion)(FDBTransaction* tr, int64_t* outVersion);
@ -411,6 +429,18 @@ public:
Optional<Version> readVersion,
ReadBlobGranuleContext granule_context) override;
ThreadFuture<Standalone<VectorRef<BlobGranuleChunkRef>>> readBlobGranulesStart(const KeyRangeRef& keyRange,
Version beginVersion,
Optional<Version> readVersion,
Version* readVersionOut) override;
ThreadResult<RangeResult> readBlobGranulesFinish(
ThreadFuture<Standalone<VectorRef<BlobGranuleChunkRef>>> startFuture,
const KeyRangeRef& keyRange,
Version beginVersion,
Version readVersion,
ReadBlobGranuleContext granuleContext) override;
void addReadConflictRange(const KeyRangeRef& keys) override;
void atomicOp(const KeyRef& key, const ValueRef& value, uint32_t operationType) override;
@ -616,6 +646,18 @@ public:
Optional<Version> readVersion,
ReadBlobGranuleContext granule_context) override;
ThreadFuture<Standalone<VectorRef<BlobGranuleChunkRef>>> readBlobGranulesStart(const KeyRangeRef& keyRange,
Version beginVersion,
Optional<Version> readVersion,
Version* readVersionOut) override;
ThreadResult<RangeResult> readBlobGranulesFinish(
ThreadFuture<Standalone<VectorRef<BlobGranuleChunkRef>>> startFuture,
const KeyRangeRef& keyRange,
Version beginVersion,
Version readVersion,
ReadBlobGranuleContext granuleContext) override;
void atomicOp(const KeyRef& key, const ValueRef& value, uint32_t operationType) override;
void set(const KeyRef& key, const ValueRef& value) override;
void clear(const KeyRef& begin, const KeyRef& end) override;
@ -681,6 +723,9 @@ private:
template <class T>
ThreadResult<T> abortableTimeoutResult(ThreadFuture<Void> abortSignal);
template <class T>
ThreadResult<T> abortableResult(ThreadResult<T> result, ThreadFuture<Void> abortSignal);
TransactionInfo transaction;
TransactionInfo getTransaction();

View File

@ -164,6 +164,18 @@ public:
Optional<Version> readVersion,
ReadBlobGranuleContext granuleContext) override;
ThreadFuture<Standalone<VectorRef<BlobGranuleChunkRef>>> readBlobGranulesStart(const KeyRangeRef& keyRange,
Version beginVersion,
Optional<Version> readVersion,
Version* readVersionOut) override;
ThreadResult<RangeResult> readBlobGranulesFinish(
ThreadFuture<Standalone<VectorRef<BlobGranuleChunkRef>>> startFuture,
const KeyRangeRef& keyRange,
Version beginVersion,
Version readVersion,
ReadBlobGranuleContext granuleContext) override;
void addReadConflictRange(const KeyRangeRef& keys) override;
void makeSelfConflicting();

View File

@ -18,7 +18,7 @@ CLUSTER_ACTIONS = ["wiggle"]
HEALTH_CHECK_TIMEOUT_SEC = 5
PROGRESS_CHECK_TIMEOUT_SEC = 30
TESTER_STATS_INTERVAL_SEC = 5
TRANSACTION_RETRY_LIMIT = 100
TRANSACTION_RETRY_LIMIT = 1000 # TODO change back!!!! after other pr is merged
RUN_WITH_GDB = False