Added FDBResult and made readBlobGranules use it

This commit is contained in:
Josh Slocum 2021-12-01 14:43:23 -06:00
parent 329091e14f
commit 7f4fcc8c2c
17 changed files with 269 additions and 101 deletions

View File

@ -35,6 +35,7 @@ int g_api_version = 0;
*
* type mapping:
* FDBFuture -> ThreadSingleAssignmentVarBase
* FDBResult -> ThreadSingleAssignmentVarBase
* FDBDatabase -> IDatabase
* FDBTransaction -> ITransaction
*/
@ -275,6 +276,19 @@ extern "C" DLLEXPORT fdb_error_t fdb_future_get_key_array(FDBFuture* f, FDBKey c
*out_count = na.size(););
}
extern "C" DLLEXPORT void fdb_result_destroy(FDBResult* r) {
CATCH_AND_DIE(TSAVB(r)->cancel(););
}
fdb_error_t fdb_result_get_keyvalue_array(FDBResult* r,
FDBKeyValue const** out_kv,
int* out_count,
fdb_bool_t* out_more) {
CATCH_AND_RETURN(RangeResult rr = TSAV(RangeResult, r)->get(); *out_kv = (FDBKeyValue*)rr.begin();
*out_count = rr.size();
*out_more = rr.more;);
}
FDBFuture* fdb_create_cluster_v609(const char* cluster_file_path) {
char* path;
if (cluster_file_path) {
@ -740,7 +754,7 @@ extern "C" DLLEXPORT FDBFuture* fdb_transaction_get_blob_granule_ranges(FDBTrans
return (FDBFuture*)(TXN(tr)->getBlobGranuleRanges(range).extractPtr());
}
extern "C" DLLEXPORT FDBFuture* fdb_transaction_read_blob_granules(FDBTransaction* tr,
extern "C" DLLEXPORT FDBResult* fdb_transaction_read_blob_granules(FDBTransaction* tr,
uint8_t const* begin_key_name,
int begin_key_name_length,
uint8_t const* end_key_name,
@ -763,7 +777,7 @@ extern "C" DLLEXPORT FDBFuture* fdb_transaction_read_blob_granules(FDBTransactio
rv = readVersion;
}
return (FDBFuture*)(TXN(tr)->readBlobGranules(range, beginVersion, rv, context).extractPtr());
return (FDBResult*)(TXN(tr)->readBlobGranules(range, beginVersion, rv, context).extractPtr());
}
#include "fdb_c_function_pointers.g.h"

View File

@ -37,6 +37,7 @@
#endif
typedef struct FDB_future FDBFuture;
typedef struct FDB_result FDBResult;
typedef struct FDB_database FDBDatabase;
typedef struct FDB_transaction FDBTransaction;

View File

@ -65,6 +65,7 @@ extern "C" {
/* Pointers to these opaque types represent objects in the FDB API */
typedef struct FDB_future FDBFuture;
typedef struct FDB_result FDBResult;
typedef struct FDB_database FDBDatabase;
typedef struct FDB_transaction FDBTransaction;
@ -180,6 +181,16 @@ DLLEXPORT WARN_UNUSED_RESULT fdb_error_t fdb_future_get_keyrange_array(FDBFuture
FDBKeyRange const** out_ranges,
int* out_count);
// FDBResult is a synchronous computation result, as opposed to a future that is asynchronous.
DLLEXPORT void fdb_result_destroy(FDBResult* r);
DLLEXPORT WARN_UNUSED_RESULT fdb_error_t fdb_result_get_keyvalue_array(FDBResult* r,
FDBKeyValue const** out_kv,
int* out_count,
fdb_bool_t* out_more);
// add other return types as we need them
DLLEXPORT WARN_UNUSED_RESULT fdb_error_t fdb_create_database(const char* cluster_file_path, FDBDatabase** out_database);
DLLEXPORT void fdb_database_destroy(FDBDatabase* d);
@ -356,7 +367,7 @@ DLLEXPORT WARN_UNUSED_RESULT FDBFuture* fdb_transaction_get_blob_granule_ranges(
/* InvalidVersion (-1) for readVersion means get read version from transaction
Separated out as optional because BG reads can support longer-lived reads than normal FDB transactions */
DLLEXPORT WARN_UNUSED_RESULT FDBFuture* fdb_transaction_read_blob_granules(FDBTransaction* db,
DLLEXPORT WARN_UNUSED_RESULT FDBResult* fdb_transaction_read_blob_granules(FDBTransaction* db,
uint8_t const* begin_key_name,
int begin_key_name_length,
uint8_t const* end_key_name,

View File

@ -625,7 +625,7 @@ int run_op_read_blob_granules(FDBTransaction* transaction,
char* keystr2,
bool doMaterialize,
char* bgFilePath) {
FDBFuture* f;
FDBResult* r;
fdb_error_t err;
FDBKeyValue const* out_kv;
int out_count;
@ -651,7 +651,7 @@ int run_op_read_blob_granules(FDBTransaction* transaction,
granuleContext.free_load_f = &granule_free_load;
granuleContext.debugNoMaterialize = !doMaterialize;
f = fdb_transaction_read_blob_granules(transaction,
r = fdb_transaction_read_blob_granules(transaction,
(uint8_t*)keystr,
strlen(keystr),
(uint8_t*)keystr2,
@ -660,18 +660,22 @@ int run_op_read_blob_granules(FDBTransaction* transaction,
-1, /* endVersion. -1 is use txn read version */
granuleContext);
wait_future(f);
err = fdb_future_get_keyvalue_array(f, &out_kv, &out_count, &out_more);
free(fileContext.data_by_id);
err = fdb_result_get_keyvalue_array(r, &out_kv, &out_count, &out_more);
if (err) {
fprintf(stderr, "ERROR: fdb_future_get_keyvalue_array: %s\n", fdb_get_error(err));
fdb_future_destroy(f);
if (err != 2037 /* blob_granule_not_materialized */) {
fprintf(stderr, "ERROR: fdb_result_get_keyvalue_array: %s\n", fdb_get_error(err));
fdb_result_destroy(r);
return FDB_ERROR_RETRY;
} else {
return FDB_SUCCESS;
}
fdb_future_destroy(f);
}
fdb_result_destroy(r);
return FDB_SUCCESS;
}

View File

@ -90,6 +90,17 @@ void Future::cancel() {
return fdb_future_get_keyvalue_array(future_, out_kv, out_count, out_more);
}
// Result
Result::~Result() {
fdb_result_destroy(result_);
}
// KeyValueArrayResult
[[nodiscard]] fdb_error_t KeyValueArrayResult::get(const FDBKeyValue** out_kv, int* out_count, fdb_bool_t* out_more) {
return fdb_result_get_keyvalue_array(result_, out_kv, out_count, out_more);
}
// Database
Int64Future Database::reboot_worker(FDBDatabase* db,
const uint8_t* address,
@ -281,12 +292,12 @@ KeyRangeArrayFuture Transaction::get_blob_granule_ranges(std::string_view begin_
return KeyRangeArrayFuture(fdb_transaction_get_blob_granule_ranges(
tr_, (const uint8_t*)begin_key.data(), begin_key.size(), (const uint8_t*)end_key.data(), end_key.size()));
}
KeyValueArrayFuture Transaction::read_blob_granules(std::string_view begin_key,
KeyValueArrayResult Transaction::read_blob_granules(std::string_view begin_key,
std::string_view end_key,
int64_t beginVersion,
int64_t readVersion,
FDBReadBlobGranuleContext granuleContext) {
return KeyValueArrayFuture(fdb_transaction_read_blob_granules(tr_,
return KeyValueArrayResult(fdb_transaction_read_blob_granules(tr_,
(const uint8_t*)begin_key.data(),
begin_key.size(),
(const uint8_t*)end_key.data(),

View File

@ -154,6 +154,27 @@ private:
EmptyFuture(FDBFuture* f) : Future(f) {}
};
class Result {
public:
virtual ~Result() = 0;
protected:
Result(FDBResult* r) : result_(r) {}
FDBResult* result_;
};
class KeyValueArrayResult : public Result {
public:
// Call this function instead of fdb_result_get_keyvalue_array when using
// the KeyValueArrayREsult type. It's behavior is identical to
// fdb_result_get_keyvalue_array.
fdb_error_t get(const FDBKeyValue** out_kv, int* out_count, fdb_bool_t* out_more);
private:
friend class Transaction;
KeyValueArrayResult(FDBResult* r) : Result(r) {}
};
// Wrapper around FDBDatabase, providing database-level API
class Database final {
public:
@ -280,7 +301,7 @@ public:
fdb_error_t add_conflict_range(std::string_view begin_key, std::string_view end_key, FDBConflictRangeType type);
KeyRangeArrayFuture get_blob_granule_ranges(std::string_view begin_key, std::string_view end_key);
KeyValueArrayFuture read_blob_granules(std::string_view begin_key,
KeyValueArrayResult read_blob_granules(std::string_view begin_key,
std::string_view end_key,
int64_t beginVersion,
int64_t endVersion,

View File

@ -166,7 +166,7 @@ static Arena loadDeltaFile(StringRef deltaData,
return parseArena;
}
RangeResult materializeBlobGranule(BlobGranuleChunkRef chunk,
RangeResult materializeBlobGranule(const BlobGranuleChunkRef& chunk,
KeyRangeRef keyRange,
Version readVersion,
Optional<StringRef> snapshotData,
@ -210,6 +210,73 @@ RangeResult materializeBlobGranule(BlobGranuleChunkRef chunk,
return ret;
}
ErrorOr<RangeResult> loadAndMaterializeBlobGranules(const Standalone<VectorRef<BlobGranuleChunkRef>>& files,
const KeyRangeRef& keyRange,
Version beginVersion,
Version readVersion,
ReadBlobGranuleContext granuleContext) {
try {
RangeResult results;
// FIXME: could submit multiple chunks to start_load_f in parallel?
for (const BlobGranuleChunkRef& chunk : files) {
RangeResult chunkRows;
int64_t snapshotLoadId;
int64_t deltaLoadIds[chunk.deltaFiles.size()];
// Start load process for all files in chunk
// In V1 of api snapshot is required, optional is just for forward compatibility
ASSERT(chunk.snapshotFile.present());
std::string snapshotFname = chunk.snapshotFile.get().filename.toString();
snapshotLoadId = granuleContext.start_load_f(snapshotFname.c_str(),
snapshotFname.size(),
chunk.snapshotFile.get().offset,
chunk.snapshotFile.get().length,
granuleContext.userContext);
int64_t deltaLoadLengths[chunk.deltaFiles.size()];
StringRef deltaData[chunk.deltaFiles.size()];
for (int deltaFileIdx = 0; deltaFileIdx < chunk.deltaFiles.size(); deltaFileIdx++) {
std::string deltaFName = chunk.deltaFiles[deltaFileIdx].filename.toString();
deltaLoadIds[deltaFileIdx] = granuleContext.start_load_f(deltaFName.c_str(),
deltaFName.size(),
chunk.deltaFiles[deltaFileIdx].offset,
chunk.deltaFiles[deltaFileIdx].length,
granuleContext.userContext);
deltaLoadLengths[deltaFileIdx] = chunk.deltaFiles[deltaFileIdx].length;
}
// once all loads kicked off, load data for chunk
StringRef snapshotData(granuleContext.get_load_f(snapshotLoadId, granuleContext.userContext),
chunk.snapshotFile.get().length);
if (!snapshotData.begin()) {
return ErrorOr<RangeResult>(blob_granule_file_load_error());
}
for (int i = 0; i < chunk.deltaFiles.size(); i++) {
deltaData[i] = StringRef(granuleContext.get_load_f(deltaLoadIds[i], granuleContext.userContext),
chunk.deltaFiles[i].length);
// null data is error
if (!deltaData[i].begin()) {
return ErrorOr<RangeResult>(blob_granule_file_load_error());
}
}
// materialize rows from chunk
chunkRows = materializeBlobGranule(chunk, keyRange, readVersion, snapshotData, deltaData);
results.arena().dependsOn(chunkRows.arena());
results.append(results.arena(), chunkRows.begin(), chunkRows.size());
granuleContext.free_load_f(snapshotLoadId, granuleContext.userContext);
for (int i = 0; i < chunk.deltaFiles.size(); i++) {
granuleContext.free_load_f(deltaLoadIds[i], granuleContext.userContext);
}
}
return ErrorOr<RangeResult>(results);
} catch (Error& e) {
return ErrorOr<RangeResult>(e);
}
}
// FIXME: re-enable test!
TEST_CASE(":/blobgranule/files/applyDelta") {
printf("Testing blob granule delta applying\n");

View File

@ -21,11 +21,17 @@
#ifndef FDBCLIENT_BLOBGRANULEFILES_H
#define FDBCLIENT_BLOBGRANULEFILES_H
// This file contains a single function, for readers who want to materialize blob granules from the underlying files
// This file contains functions for readers who want to materialize blob granules from the underlying files
#include "fdbclient/BlobGranuleCommon.h"
RangeResult materializeBlobGranule(BlobGranuleChunkRef chunk,
ErrorOr<RangeResult> loadAndMaterializeBlobGranules(const Standalone<VectorRef<BlobGranuleChunkRef>>& files,
const KeyRangeRef& keyRange,
Version beginVersion,
Version readVersion,
ReadBlobGranuleContext granuleContext);
RangeResult materializeBlobGranule(const BlobGranuleChunkRef& chunk,
KeyRangeRef keyRange,
Version readVersion,
Optional<StringRef> snapshotData,

View File

@ -259,6 +259,7 @@ void ClientKnobs::initialize(Randomize randomize) {
init( MVC_CLIENTLIB_CHUNKS_PER_TRANSACTION, 32 );
// blob granules
// CHANGE BACK
init( ENABLE_BLOB_GRANULES, false );
// clang-format on

View File

@ -75,7 +75,7 @@ public:
virtual ThreadFuture<Standalone<VectorRef<KeyRangeRef>>> getBlobGranuleRanges(const KeyRangeRef& keyRange) = 0;
virtual ThreadFuture<RangeResult> readBlobGranules(const KeyRangeRef& keyRange,
virtual ThreadResult<RangeResult> readBlobGranules(const KeyRangeRef& keyRange,
Version beginVersion,
Optional<Version> readVersion,
ReadBlobGranuleContext granuleContext) = 0;

View File

@ -262,7 +262,7 @@ ThreadFuture<Standalone<VectorRef<KeyRangeRef>>> DLTransaction::getBlobGranuleRa
});
}
ThreadFuture<RangeResult> DLTransaction::readBlobGranules(const KeyRangeRef& keyRange,
ThreadResult<RangeResult> DLTransaction::readBlobGranules(const KeyRangeRef& keyRange,
Version beginVersion,
Optional<Version> readVersion,
ReadBlobGranuleContext granuleContext) {
@ -280,7 +280,7 @@ ThreadFuture<RangeResult> DLTransaction::readBlobGranules(const KeyRangeRef& key
int64_t rv = readVersion.present() ? readVersion.get() : invalidVersion;
FdbCApi::FDBFuture* f = api->transactionReadBlobGranules(tr,
FdbCApi::FDBResult* r = api->transactionReadBlobGranules(tr,
keyRange.begin.begin(),
keyRange.begin.size(),
keyRange.end.begin(),
@ -288,16 +288,15 @@ ThreadFuture<RangeResult> DLTransaction::readBlobGranules(const KeyRangeRef& key
beginVersion,
rv,
context);
return toThreadFuture<RangeResult>(api, f, [](FdbCApi::FDBFuture* f, FdbCApi* api) {
const FdbCApi::FDBKeyValue* kvs;
int count;
FdbCApi::fdb_bool_t more;
FdbCApi::fdb_error_t error = api->futureGetKeyValueArray(f, &kvs, &count, &more);
FdbCApi::fdb_error_t error = api->resultGetKeyValueArray(r, &kvs, &count, &more);
ASSERT(!error);
// The memory for this is stored in the FDBFuture and is released when the future gets destroyed
return RangeResult(RangeResultRef(VectorRef<KeyValueRef>((KeyValueRef*)kvs, count), more), Arena());
});
// The memory for this is stored in the FDBResult and is released when the result gets destroyed
return ThreadResult<RangeResult>(
RangeResult(RangeResultRef(VectorRef<KeyValueRef>((KeyValueRef*)kvs, count), more), Arena()));
}
void DLTransaction::addReadConflictRange(const KeyRangeRef& keys) {
@ -599,6 +598,10 @@ void DLApi::init() {
loadClientFunction(&api->futureCancel, lib, fdbCPath, "fdb_future_cancel");
loadClientFunction(&api->futureDestroy, lib, fdbCPath, "fdb_future_destroy");
loadClientFunction(
&api->resultGetKeyValueArray, lib, fdbCPath, "fdb_result_get_keyvalue_array", headerVersion >= 710);
loadClientFunction(&api->resultDestroy, lib, fdbCPath, "fdb_result_destroy", headerVersion >= 710);
loadClientFunction(&api->futureGetDatabase, lib, fdbCPath, "fdb_future_get_database", headerVersion < 610);
loadClientFunction(&api->createCluster, lib, fdbCPath, "fdb_create_cluster", headerVersion < 610);
loadClientFunction(&api->clusterCreateDatabase, lib, fdbCPath, "fdb_cluster_create_database", headerVersion < 610);
@ -890,14 +893,17 @@ ThreadFuture<Standalone<VectorRef<KeyRangeRef>>> MultiVersionTransaction::getBlo
return abortableFuture(f, tr.onChange);
}
ThreadFuture<RangeResult> MultiVersionTransaction::readBlobGranules(const KeyRangeRef& keyRange,
ThreadResult<RangeResult> MultiVersionTransaction::readBlobGranules(const KeyRangeRef& keyRange,
Version beginVersion,
Optional<Version> readVersion,
ReadBlobGranuleContext granuleContext) {
auto tr = getTransaction();
auto f = tr.transaction ? tr.transaction->readBlobGranules(keyRange, beginVersion, readVersion, granuleContext)
: makeTimeout<RangeResult>();
return abortableFuture(f, tr.onChange);
if (tr.transaction) {
return tr.transaction->readBlobGranules(keyRange, beginVersion, readVersion, granuleContext);
} else {
// FIXME: handle abortable future + timeout properly
return ThreadResult<RangeResult>(transaction_cancelled());
}
}
void MultiVersionTransaction::atomicOp(const KeyRef& key, const ValueRef& value, uint32_t operationType) {

View File

@ -33,6 +33,7 @@
// All of the required functions loaded from that external library are stored in function pointers in this struct.
struct FdbCApi : public ThreadSafeReferenceCounted<FdbCApi> {
typedef struct FDB_future FDBFuture;
typedef struct FDB_result FDBResult;
typedef struct FDB_cluster FDBCluster;
typedef struct FDB_database FDBDatabase;
typedef struct FDB_transaction FDBTransaction;
@ -190,7 +191,7 @@ struct FdbCApi : public ThreadSafeReferenceCounted<FdbCApi> {
uint8_t const* end_key_name,
int end_key_name_length);
FDBFuture* (*transactionReadBlobGranules)(FDBTransaction* db,
FDBResult* (*transactionReadBlobGranules)(FDBTransaction* db,
uint8_t const* begin_key_name,
int begin_key_name_length,
uint8_t const* end_key_name,
@ -230,6 +231,9 @@ struct FdbCApi : public ThreadSafeReferenceCounted<FdbCApi> {
void (*futureCancel)(FDBFuture* f);
void (*futureDestroy)(FDBFuture* f);
fdb_error_t (*resultGetKeyValueArray)(FDBResult* f, FDBKeyValue const** outKV, int* outCount, fdb_bool_t* outMore);
void (*resultDestroy)(FDBResult* f);
// Legacy Support
FDBFuture* (*createCluster)(const char* clusterFilePath);
FDBFuture* (*clusterCreateDatabase)(FDBCluster* cluster, uint8_t* dbName, int dbNameLength);
@ -281,7 +285,7 @@ public:
int64_t chunkSize) override;
ThreadFuture<Standalone<VectorRef<KeyRangeRef>>> getBlobGranuleRanges(const KeyRangeRef& keyRange) override;
ThreadFuture<RangeResult> readBlobGranules(const KeyRangeRef& keyRange,
ThreadResult<RangeResult> readBlobGranules(const KeyRangeRef& keyRange,
Version beginVersion,
Optional<Version> readVersion,
ReadBlobGranuleContext granule_context) override;
@ -437,7 +441,7 @@ public:
int64_t chunkSize) override;
ThreadFuture<Standalone<VectorRef<KeyRangeRef>>> getBlobGranuleRanges(const KeyRangeRef& keyRange) override;
ThreadFuture<RangeResult> readBlobGranules(const KeyRangeRef& keyRange,
ThreadResult<RangeResult> readBlobGranules(const KeyRangeRef& keyRange,
Version beginVersion,
Optional<Version> readVersion,
ReadBlobGranuleContext granule_context) override;

View File

@ -1696,7 +1696,7 @@ Future<Standalone<VectorRef<BlobGranuleChunkRef>>> ReadYourWritesTransaction::re
ASSERT(begin == 0);
if (!options.readYourWritesDisabled) {
return bg_no_ryw();
return blob_granule_no_ryw();
}
if (checkUsedDuringCommit()) {

View File

@ -296,13 +296,15 @@ ThreadFuture<Standalone<VectorRef<KeyRangeRef>>> ThreadSafeTransaction::getBlobG
});
}
ThreadFuture<RangeResult> ThreadSafeTransaction::readBlobGranules(const KeyRangeRef& keyRange,
ThreadResult<RangeResult> ThreadSafeTransaction::readBlobGranules(const KeyRangeRef& keyRange,
Version beginVersion,
Optional<Version> readVersion,
ReadBlobGranuleContext granule_context) {
// In V1 of api this is required, field is just for forward compatibility
ASSERT(beginVersion == 0);
// FIXME: prevent from calling this from another main thread!
bool doMaterialize = !granule_context.debugNoMaterialize;
ISingleThreadTransaction* tr = this->tr;
@ -320,69 +322,16 @@ ThreadFuture<RangeResult> ThreadSafeTransaction::readBlobGranules(const KeyRange
// propagate error to client
if (getFilesFuture.isError()) {
return ThreadFuture<RangeResult>(getFilesFuture.getError());
return ThreadResult<RangeResult>(getFilesFuture.getError());
}
Standalone<VectorRef<BlobGranuleChunkRef>> files = getFilesFuture.get();
try {
RangeResult results;
// FIXME: could submit multiple chunks to start_load_f in parallel?
for (BlobGranuleChunkRef& chunk : files) {
RangeResult chunkRows;
int64_t snapshotLoadId;
int64_t deltaLoadIds[chunk.deltaFiles.size()];
// FIXME: move to transactions?
// do this work off of fdb network threads for performance!
if (doMaterialize) {
// Start load process for all files in chunk
// In V1 of api snapshot is required, optional is just for forward compatibility
ASSERT(chunk.snapshotFile.present());
std::string snapshotFname = chunk.snapshotFile.get().filename.toString();
snapshotLoadId = granule_context.start_load_f(snapshotFname.c_str(),
snapshotFname.size(),
chunk.snapshotFile.get().offset,
chunk.snapshotFile.get().length,
granule_context.userContext);
int64_t deltaLoadIds[chunk.deltaFiles.size()];
int64_t deltaLoadLengths[chunk.deltaFiles.size()];
StringRef deltaData[chunk.deltaFiles.size()];
for (int deltaFileIdx = 0; deltaFileIdx < chunk.deltaFiles.size(); deltaFileIdx++) {
std::string deltaFName = chunk.deltaFiles[deltaFileIdx].filename.toString();
deltaLoadIds[deltaFileIdx] = granule_context.start_load_f(deltaFName.c_str(),
deltaFName.size(),
chunk.deltaFiles[deltaFileIdx].offset,
chunk.deltaFiles[deltaFileIdx].length,
granule_context.userContext);
deltaLoadLengths[deltaFileIdx] = chunk.deltaFiles[deltaFileIdx].length;
}
// once all loads kicked off, load data for chunk
StringRef snapshotData(granule_context.get_load_f(snapshotLoadId, granule_context.userContext),
chunk.snapshotFile.get().length);
for (int i = 0; i < chunk.deltaFiles.size(); i++) {
deltaData[i] = StringRef(granule_context.get_load_f(deltaLoadIds[i], granule_context.userContext),
chunk.deltaFiles[i].length);
}
// materialize rows from chunk
chunkRows = materializeBlobGranule(chunk, keyRange, readVersionOut, snapshotData, deltaData);
}
results.arena().dependsOn(chunkRows.arena());
results.append(results.arena(), chunkRows.begin(), chunkRows.size());
if (doMaterialize) {
granule_context.free_load_f(snapshotLoadId, granule_context.userContext);
for (int i = 0; i < chunk.deltaFiles.size(); i++) {
granule_context.free_load_f(deltaLoadIds[i], granule_context.userContext);
}
}
}
return results;
} catch (Error& e) {
return ThreadFuture<RangeResult>(e);
return loadAndMaterializeBlobGranules(files, keyRange, beginVersion, readVersionOut, granule_context);
} else {
return ThreadResult<RangeResult>(blob_granule_not_materialized());
}
}

View File

@ -120,7 +120,7 @@ public:
ThreadFuture<Standalone<VectorRef<KeyRangeRef>>> getBlobGranuleRanges(const KeyRangeRef& keyRange) override;
ThreadFuture<RangeResult> readBlobGranules(const KeyRangeRef& keyRange,
ThreadResult<RangeResult> readBlobGranules(const KeyRangeRef& keyRange,
Version beginVersion,
Optional<Version> readVersion,
ReadBlobGranuleContext granuleContext) override;

View File

@ -727,5 +727,76 @@ private:
ThreadSpinLock lock;
};
// Like a future (very similar to ThreadFuture) but only for computations that already completed. Reuses the SAV's
// implementation for memory management error handling though. Essentially a future that's returned from a synchronous
// computation and guaranteed to be complete.
template <class T>
class ThreadResult {
public:
T get() { return sav->get(); }
bool isValid() const { return sav != 0; }
bool isError() { return sav->isError(); }
Error& getError() {
if (!isError())
throw future_not_error();
return sav->error;
}
void cancel() { extractPtr()->cancel(); }
ThreadResult() : sav(0) {}
explicit ThreadResult(ThreadSingleAssignmentVar<T>* sav) : sav(sav) {
// sav->addref();
}
ThreadResult(const ThreadResult<T>& rhs) : sav(rhs.sav) {
if (sav)
sav->addref();
}
ThreadResult(ThreadResult<T>&& rhs) noexcept : sav(rhs.sav) { rhs.sav = 0; }
ThreadResult(const T& presentValue) : sav(new ThreadSingleAssignmentVar<T>()) { sav->send(presentValue); }
ThreadResult(const Error& error) : sav(new ThreadSingleAssignmentVar<T>()) { sav->sendError(error); }
ThreadResult(const ErrorOr<T> errorOr) : sav(new ThreadSingleAssignmentVar<T>()) {
if (errorOr.isError()) {
sav->sendError(errorOr.getError());
} else {
sav->send(errorOr.get());
}
}
~ThreadResult() {
if (sav)
sav->delref();
}
void operator=(const ThreadFuture<T>& rhs) {
if (rhs.sav)
rhs.sav->addref();
if (sav)
sav->delref();
sav = rhs.sav;
}
void operator=(ThreadFuture<T>&& rhs) noexcept {
if (sav != rhs.sav) {
if (sav)
sav->delref();
sav = rhs.sav;
rhs.sav = 0;
}
}
bool operator==(const ThreadResult& rhs) { return rhs.sav == sav; }
bool operator!=(const ThreadResult& rhs) { return rhs.sav != sav; }
ThreadSingleAssignmentVarBase* getPtr() const { return sav; }
ThreadSingleAssignmentVarBase* extractPtr() {
auto* p = sav;
sav = nullptr;
return p;
}
private:
ThreadSingleAssignmentVar<T>* sav;
};
#include "flow/unactorcompiler.h"
#endif

View File

@ -82,6 +82,7 @@ ERROR( wrong_format_version, 1058, "Format version not recognized" )
ERROR( unknown_change_feed, 1059, "Change feed not found" )
ERROR( change_feed_not_registered, 1060, "Change feed not registered" )
ERROR( granule_assignment_conflict, 1061, "Conflicting attempts to assign blob granules" )
ERROR( blob_granule_file_load_error, 1062, "Error loading a blob file during granule materialization" )
ERROR( broken_promise, 1100, "Broken promise" )
ERROR( operation_cancelled, 1101, "Asynchronous operation cancelled" )
@ -165,7 +166,8 @@ ERROR( mapper_bad_range_decriptor, 2032, "\"{...}\" must be the last element of
ERROR( quick_get_key_values_has_more, 2033, "One of the mapped range queries is too large" )
ERROR( quick_get_value_miss, 2034, "Found a mapped key that is not served in the same SS" )
ERROR( quick_get_key_values_miss, 2035, "Found a mapped range that is not served in the same SS" )
ERROR( bg_no_ryw, 2036, "Blob Granule Read Transactions must be specified as ryw-disabled" )
ERROR( blob_granule_no_ryw, 2036, "Blob Granule Read Transactions must be specified as ryw-disabled" )
ERROR( blob_granule_not_materialized, 2037, "Blob Granule Read Transactions must be specified as ryw-disabled" )
ERROR( incompatible_protocol_version, 2100, "Incompatible protocol version" )
ERROR( transaction_too_large, 2101, "Transaction exceeds byte limit" )