From b92c6ea92caf131bd9974b79ea7bbbf68c534743 Mon Sep 17 00:00:00 2001
From: Josh Slocum
Date: Thu, 24 Mar 2022 12:42:06 -0500
Subject: [PATCH] granule parallelism, metrics, and other improvements to bg
 client bindings

---
 bindings/c/fdb_c.cpp                        |   3 +-
 bindings/c/foundationdb/fdb_c.h             |   5 +-
 bindings/c/test/mako/mako.c                 |   3 +-
 fdbclient/BlobGranuleFiles.cpp              | 113 +++++++++++++-------
 fdbclient/ClientKnobs.cpp                   |   3 +
 fdbclient/ClientKnobs.h                     |   3 +
 fdbclient/DatabaseContext.h                 |   3 +-
 fdbclient/FDBTypes.h                        |   3 +
 fdbclient/MultiVersionTransaction.actor.cpp |   3 +-
 fdbclient/MultiVersionTransaction.h         |   3 +
 fdbclient/NativeAPI.actor.cpp               |  29 +++--
 11 files changed, 123 insertions(+), 48 deletions(-)

diff --git a/bindings/c/fdb_c.cpp b/bindings/c/fdb_c.cpp
index 7b6a9f089d..6ab52cd670 100644
--- a/bindings/c/fdb_c.cpp
+++ b/bindings/c/fdb_c.cpp
@@ -835,9 +835,10 @@ extern "C" DLLEXPORT FDBResult* fdb_transaction_read_blob_granules(FDBTransactio
 	context.get_load_f = granule_context.get_load_f;
 	context.free_load_f = granule_context.free_load_f;
 	context.debugNoMaterialize = granule_context.debugNoMaterialize;
+	context.granuleParallelism = granule_context.granuleParallelism;
 
 	Optional<Version> rv;
-	if (readVersion != invalidVersion) { rv = readVersion; }
+	if (readVersion != latestVersion) { rv = readVersion; }
 
 	return (FDBResult*)(TXN(tr)->readBlobGranules(range, beginVersion, rv, context).extractPtr()););
 }
diff --git a/bindings/c/foundationdb/fdb_c.h b/bindings/c/foundationdb/fdb_c.h
index 626faabb21..c2d4dfd377 100644
--- a/bindings/c/foundationdb/fdb_c.h
+++ b/bindings/c/foundationdb/fdb_c.h
@@ -196,6 +196,9 @@ typedef struct readgranulecontext {
 	/* Set this to true for testing if you don't want to read the granule files,
 	   just do the request to the blob workers */
 	fdb_bool_t debugNoMaterialize;
+
+	/* Number of granules to load in parallel */
+	int granuleParallelism;
 } FDBReadBlobGranuleContext;
 
 DLLEXPORT void fdb_future_cancel(FDBFuture* f);
@@ -447,7 +450,7 @@ DLLEXPORT WARN_UNUSED_RESULT FDBFuture* fdb_transaction_get_blob_granule_ranges(
                                                                                 uint8_t const* end_key_name,
                                                                                 int end_key_name_length);
 
-/* InvalidVersion (-1) for readVersion means get read version from transaction
+/* LatestVersion (-2) for readVersion means get read version from transaction
    Separated out as optional because BG reads can support longer-lived reads than normal FDB transactions */
 DLLEXPORT WARN_UNUSED_RESULT FDBResult* fdb_transaction_read_blob_granules(FDBTransaction* db,
                                                                            uint8_t const* begin_key_name,
diff --git a/bindings/c/test/mako/mako.c b/bindings/c/test/mako/mako.c
index fc69ae06b5..2f31f55646 100644
--- a/bindings/c/test/mako/mako.c
+++ b/bindings/c/test/mako/mako.c
@@ -682,6 +682,7 @@ int run_op_read_blob_granules(FDBTransaction* transaction,
 	granuleContext.get_load_f = &granule_get_load;
 	granuleContext.free_load_f = &granule_free_load;
 	granuleContext.debugNoMaterialize = !doMaterialize;
+	granuleContext.granuleParallelism = 2; // TODO make knob or setting for changing this?
 
 	r = fdb_transaction_read_blob_granules(transaction,
 	                                       (uint8_t*)keystr,
@@ -689,7 +690,7 @@ int run_op_read_blob_granules(FDBTransaction* transaction,
 	                                       (uint8_t*)keystr2,
 	                                       strlen(keystr2),
 	                                       0 /* beginVersion*/,
-	                                       -1, /* endVersion. -1 is use txn read version */
+	                                       -2, /* endVersion. -2 (latestVersion) means use txn read version */
 	                                       granuleContext);
 
 	free(fileContext.data_by_id);
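For orientation, here is a hypothetical caller-side sketch in C++ of the new context field and the -2 (latestVersion) sentinel, written against the fdb_c.h declarations above. The my_* callbacks and the my_loader pointer are illustrative placeholders for an application's file loader, and the API version define assumes the 7.1-era header; none of this code is part of the patch itself.

#define FDB_API_VERSION 710 // assumption: the 7.1-era API version this patch targets
#include <cstring>

#include "foundationdb/fdb_c.h"

// Application-provided loader callbacks; the signatures mirror the
// FDBReadBlobGranuleContext function pointers exercised by this patch.
extern int64_t my_start_load(const char* filename, int filenameLength, int64_t offset, int64_t length, void* context);
extern uint8_t* my_get_load(int64_t loadId, void* context);
extern void my_free_load(int64_t loadId, void* context);

FDBResult* read_granules_example(FDBTransaction* tr, const char* begin, const char* end, void* my_loader) {
    FDBReadBlobGranuleContext ctx = {};
    ctx.userContext = my_loader;
    ctx.start_load_f = &my_start_load;
    ctx.get_load_f = &my_get_load;
    ctx.free_load_f = &my_free_load;
    ctx.debugNoMaterialize = false;
    ctx.granuleParallelism = 2; // load the files of up to 2 granules concurrently

    // readVersion -2 (latestVersion) means use the transaction's read version
    return fdb_transaction_read_blob_granules(
        tr, (const uint8_t*)begin, strlen(begin), (const uint8_t*)end, strlen(end), 0 /* beginVersion */, -2, ctx);
}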
diff --git a/fdbclient/BlobGranuleFiles.cpp b/fdbclient/BlobGranuleFiles.cpp
index a7e5dda5a3..1fca4d3724 100644
--- a/fdbclient/BlobGranuleFiles.cpp
+++ b/fdbclient/BlobGranuleFiles.cpp
@@ -18,9 +18,12 @@
  * limitations under the License.
  */
 
+#include <vector>
+
 #include "contrib/fmt-8.1.1/include/fmt/format.h"
 #include "flow/serialize.h"
 #include "fdbclient/BlobGranuleFiles.h"
+#include "fdbclient/Knobs.h"
 #include "fdbclient/SystemData.h" // for allKeys unit test - could remove
 #include "flow/UnitTest.h"
@@ -211,50 +214,82 @@ RangeResult materializeBlobGranule(const BlobGranuleChunkRef& chunk,
 	return ret;
 }
 
+struct GranuleLoadIds {
+	Optional<int64_t> snapshotId;
+	std::vector<int64_t> deltaIds;
+};
+
+static void startLoad(const ReadBlobGranuleContext granuleContext,
+                      const BlobGranuleChunkRef& chunk,
+                      GranuleLoadIds& loadIds) {
+
+	// Start load process for all files in chunk
+	if (chunk.snapshotFile.present()) {
+		std::string snapshotFname = chunk.snapshotFile.get().filename.toString();
+		loadIds.snapshotId = granuleContext.start_load_f(snapshotFname.c_str(),
+		                                                 snapshotFname.size(),
+		                                                 chunk.snapshotFile.get().offset,
+		                                                 chunk.snapshotFile.get().length,
+		                                                 granuleContext.userContext);
+	}
+	loadIds.deltaIds.reserve(chunk.deltaFiles.size());
+	for (int deltaFileIdx = 0; deltaFileIdx < chunk.deltaFiles.size(); deltaFileIdx++) {
+		std::string deltaFName = chunk.deltaFiles[deltaFileIdx].filename.toString();
+		int64_t deltaLoadId = granuleContext.start_load_f(deltaFName.c_str(),
+		                                                  deltaFName.size(),
+		                                                  chunk.deltaFiles[deltaFileIdx].offset,
+		                                                  chunk.deltaFiles[deltaFileIdx].length,
+		                                                  granuleContext.userContext);
+		loadIds.deltaIds.push_back(deltaLoadId);
+	}
+}
+
 ErrorOr<RangeResult> loadAndMaterializeBlobGranules(const Standalone<VectorRef<BlobGranuleChunkRef>>& files,
                                                     const KeyRangeRef& keyRange,
                                                     Version beginVersion,
                                                     Version readVersion,
                                                     ReadBlobGranuleContext granuleContext) {
+	int64_t parallelism = granuleContext.granuleParallelism;
+	if (parallelism < 1) {
+		parallelism = 1;
+	}
+	if (parallelism >= CLIENT_KNOBS->BG_MAX_GRANULE_PARALLELISM) {
+		parallelism = CLIENT_KNOBS->BG_MAX_GRANULE_PARALLELISM;
+	}
+
+	GranuleLoadIds loadIds[files.size()];
+
+	// Kick off first file reads if parallelism > 1
+	for (int i = 0; i < parallelism - 1 && i < files.size(); i++) {
+		startLoad(granuleContext, files[i], loadIds[i]);
+	}
+
 	try {
 		RangeResult results;
-		// FIXME: could submit multiple chunks to start_load_f in parallel?
-		for (const BlobGranuleChunkRef& chunk : files) {
+		for (int chunkIdx = 0; chunkIdx < files.size(); chunkIdx++) {
+			// Kick off files for this granule if parallelism == 1, or future granule if parallelism > 1
+			if (chunkIdx + parallelism - 1 < files.size()) {
+				startLoad(granuleContext, files[chunkIdx + parallelism - 1], loadIds[chunkIdx + parallelism - 1]);
+			}
+
 			RangeResult chunkRows;
 
-			int64_t snapshotLoadId;
-			int64_t deltaLoadIds[chunk.deltaFiles.size()];
-
-			// Start load process for all files in chunk
-			// In V1 of api snapshot is required, optional is just for forward compatibility
-			ASSERT(chunk.snapshotFile.present());
-			std::string snapshotFname = chunk.snapshotFile.get().filename.toString();
-			snapshotLoadId = granuleContext.start_load_f(snapshotFname.c_str(),
-			                                             snapshotFname.size(),
-			                                             chunk.snapshotFile.get().offset,
-			                                             chunk.snapshotFile.get().length,
-			                                             granuleContext.userContext);
-			int64_t deltaLoadLengths[chunk.deltaFiles.size()];
-			StringRef deltaData[chunk.deltaFiles.size()];
-			for (int deltaFileIdx = 0; deltaFileIdx < chunk.deltaFiles.size(); deltaFileIdx++) {
-				std::string deltaFName = chunk.deltaFiles[deltaFileIdx].filename.toString();
-				deltaLoadIds[deltaFileIdx] = granuleContext.start_load_f(deltaFName.c_str(),
-				                                                         deltaFName.size(),
-				                                                         chunk.deltaFiles[deltaFileIdx].offset,
-				                                                         chunk.deltaFiles[deltaFileIdx].length,
-				                                                         granuleContext.userContext);
-				deltaLoadLengths[deltaFileIdx] = chunk.deltaFiles[deltaFileIdx].length;
-			}
-
-			// once all loads kicked off, load data for chunk
-			StringRef snapshotData(granuleContext.get_load_f(snapshotLoadId, granuleContext.userContext),
-			                       chunk.snapshotFile.get().length);
-			if (!snapshotData.begin()) {
-				return ErrorOr<RangeResult>(blob_granule_file_load_error());
+			Optional<StringRef> snapshotData;
+			if (files[chunkIdx].snapshotFile.present()) {
+				snapshotData =
+				    StringRef(granuleContext.get_load_f(loadIds[chunkIdx].snapshotId.get(), granuleContext.userContext),
+				              files[chunkIdx].snapshotFile.get().length);
+				if (!snapshotData.get().begin()) {
+					return ErrorOr<RangeResult>(blob_granule_file_load_error());
+				}
 			}
-			for (int i = 0; i < chunk.deltaFiles.size(); i++) {
-				deltaData[i] = StringRef(granuleContext.get_load_f(deltaLoadIds[i], granuleContext.userContext),
-				                         chunk.deltaFiles[i].length);
+
+			StringRef deltaData[files[chunkIdx].deltaFiles.size()];
+			for (int i = 0; i < files[chunkIdx].deltaFiles.size(); i++) {
+				deltaData[i] =
+				    StringRef(granuleContext.get_load_f(loadIds[chunkIdx].deltaIds[i], granuleContext.userContext),
+				              files[chunkIdx].deltaFiles[i].length);
 				// null data is error
 				if (!deltaData[i].begin()) {
 					return ErrorOr<RangeResult>(blob_granule_file_load_error());
@@ -262,14 +297,16 @@ ErrorOr<RangeResult> loadAndMaterializeBlobGranules(const Standalone<VectorRef<BlobGranuleChunkRef>>& files,
 			}
 
 			// materialize rows from chunk
-			chunkRows = materializeBlobGranule(chunk, keyRange, beginVersion, readVersion, snapshotData, deltaData);
+			chunkRows =
+			    materializeBlobGranule(files[chunkIdx], keyRange, beginVersion, readVersion, snapshotData, deltaData);
 
 			results.arena().dependsOn(chunkRows.arena());
 			results.append(results.arena(), chunkRows.begin(), chunkRows.size());
 
 			// free all loads
-			granuleContext.free_load_f(snapshotLoadId, granuleContext.userContext);
-			for (int i = 0; i < chunk.deltaFiles.size(); i++) {
-				granuleContext.free_load_f(deltaLoadIds[i], granuleContext.userContext);
+			if (files[chunkIdx].snapshotFile.present()) {
+				granuleContext.free_load_f(loadIds[chunkIdx].snapshotId.get(), granuleContext.userContext);
+			}
+			for (int i = 0; i < files[chunkIdx].deltaFiles.size(); i++) {
+				granuleContext.free_load_f(loadIds[chunkIdx].deltaIds[i], granuleContext.userContext);
 			}
 		}
 		return ErrorOr<RangeResult>(results);
@@ -278,6 +315,8 @@ ErrorOr<RangeResult> loadAndMaterializeBlobGranules(const Standalone<VectorRef<BlobGranuleChunkRef>>& files,
 	} catch (Error& e) {
 		return ErrorOr<RangeResult>(e);
 	}
 }
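The rewritten loop above implements a sliding prefetch window: with an effective parallelism P (clamped to BG_MAX_GRANULE_PARALLELISM), the loads for the first P - 1 granules are issued before the loop, and iteration i issues the load for granule i + P - 1 before blocking on granule i's data, so up to P granules' files are in flight at once. A standalone toy model of just that window arithmetic (illustrative C++, not FDB code):

#include <cstdio>

static void startLoad(int chunk) { std::printf("start load for chunk %d\n", chunk); }
static void consume(int chunk) { std::printf("consume chunk %d\n", chunk); } // blocks on chunk's loads

int main() {
    const int numChunks = 5;
    const int parallelism = 3; // window size: up to 3 chunks in flight

    // Prime the window with the first parallelism - 1 loads.
    for (int i = 0; i < parallelism - 1 && i < numChunks; i++) {
        startLoad(i);
    }
    for (int i = 0; i < numChunks; i++) {
        // Keep the window full: issue the load for chunk i + parallelism - 1
        // before consuming chunk i.
        if (i + parallelism - 1 < numChunks) {
            startLoad(i + parallelism - 1);
        }
        consume(i);
    }
    return 0;
}

With parallelism == 1 the priming loop does nothing and each iteration starts and then consumes the same chunk, which matches the old serial behavior.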
diff --git a/fdbclient/ClientKnobs.cpp b/fdbclient/ClientKnobs.cpp
--- a/fdbclient/ClientKnobs.cpp
+++ b/fdbclient/ClientKnobs.cpp
@@ ... @@
+
+	// blob granules
+	init( BG_MAX_GRANULE_PARALLELISM, 10 );
diff --git a/fdbclient/ClientKnobs.h b/fdbclient/ClientKnobs.h
--- a/fdbclient/ClientKnobs.h
+++ b/fdbclient/ClientKnobs.h
@@ ... @@
+
+	// blob granules
+	int BG_MAX_GRANULE_PARALLELISM;
diff --git a/fdbclient/DatabaseContext.h b/fdbclient/DatabaseContext.h
--- a/fdbclient/DatabaseContext.h
+++ b/fdbclient/DatabaseContext.h
@@ ... @@
 	ContinuousSample<double> latencies, readLatencies, commitLatencies, GRVLatencies, mutationsPerCommit,
-	    bytesPerCommit;
+	    bytesPerCommit, bgLatencies, bgGranulesPerRequest;
 
 	int outstandingWatches;
 	int maxOutstandingWatches;
@@ -538,6 +538,7 @@ public:
 	bool transactionTracingSample;
 	double verifyCausalReadsProp = 0.0;
 	bool blobGranuleNoMaterialize = false;
+	bool anyBlobGranuleRequests = false;
 
 	Future<Void> logger;
 	Future<Void> throttleExpirer;
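DatabaseContext grows two more ContinuousSample members, bgLatencies and bgGranulesPerRequest, sized 1000 like the existing ones. For readers unfamiliar with the type, here is a rough standalone sketch of the interface those members expose. This is an assumption-laden simplification: FDB's real ContinuousSample keeps a fixed-size random sample, while this stand-in simply stops collecting at capacity.

#include <algorithm>
#include <cstddef>
#include <vector>

struct BoundedSample {
    explicit BoundedSample(size_t cap) : cap(cap) {}
    void addSample(double s) {
        if (data.size() < cap) {
            data.push_back(s); // the real type replaces random entries once full
        }
    }
    double mean() const {
        double sum = 0;
        for (double d : data) {
            sum += d;
        }
        return data.empty() ? 0 : sum / data.size();
    }
    double median() {
        if (data.empty()) {
            return 0;
        }
        std::nth_element(data.begin(), data.begin() + data.size() / 2, data.end());
        return data[data.size() / 2];
    }
    double max() const { return data.empty() ? 0 : *std::max_element(data.begin(), data.end()); }
    void clear() { data.clear(); }

private:
    size_t cap;
    std::vector<double> data;
};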
diff --git a/fdbclient/FDBTypes.h b/fdbclient/FDBTypes.h
index 14fd1b023b..17c61a79ce 100644
--- a/fdbclient/FDBTypes.h
+++ b/fdbclient/FDBTypes.h
@@ -1353,6 +1353,9 @@ struct ReadBlobGranuleContext {
 	// Set this to true for testing if you don't want to read the granule files,
 	// just do the request to the blob workers
 	bool debugNoMaterialize;
+
+	// number of granules to load in parallel (default 1)
+	int granuleParallelism = 1;
 };
 
 // Store metadata associated with each storage server. Now it only contains data be used in perpetual storage wiggle.
diff --git a/fdbclient/MultiVersionTransaction.actor.cpp b/fdbclient/MultiVersionTransaction.actor.cpp
index a340eb3a5f..82dc7768e2 100644
--- a/fdbclient/MultiVersionTransaction.actor.cpp
+++ b/fdbclient/MultiVersionTransaction.actor.cpp
@@ -282,8 +282,9 @@ ThreadResult<RangeResult> DLTransaction::readBlobGranules(const KeyRangeRef& key
 	context.get_load_f = granuleContext.get_load_f;
 	context.free_load_f = granuleContext.free_load_f;
 	context.debugNoMaterialize = granuleContext.debugNoMaterialize;
+	context.granuleParallelism = granuleContext.granuleParallelism;
 
-	int64_t rv = readVersion.present() ? readVersion.get() : invalidVersion;
+	int64_t rv = readVersion.present() ? readVersion.get() : latestVersion;
 
 	FdbCApi::FDBResult* r = api->transactionReadBlobGranules(tr,
 	                                                         keyRange.begin.begin(),
diff --git a/fdbclient/MultiVersionTransaction.h b/fdbclient/MultiVersionTransaction.h
index c6b793a047..2bf37ec424 100644
--- a/fdbclient/MultiVersionTransaction.h
+++ b/fdbclient/MultiVersionTransaction.h
@@ -107,6 +107,9 @@ struct FdbCApi : public ThreadSafeReferenceCounted<FdbCApi> {
 		// set this to true for testing if you don't want to read the granule files, just
 		// do the request to the blob workers
 		fdb_bool_t debugNoMaterialize;
+
+		// number of granules to load in parallel (default 1)
+		int granuleParallelism;
 	} FDBReadBlobGranuleContext;
 
 	typedef void (*FDBCallback)(FDBFuture* future, void* callback_parameter);
diff --git a/fdbclient/NativeAPI.actor.cpp b/fdbclient/NativeAPI.actor.cpp
index a46c16641d..42fff88e85 100644
--- a/fdbclient/NativeAPI.actor.cpp
+++ b/fdbclient/NativeAPI.actor.cpp
@@ -533,6 +533,14 @@ ACTOR Future<Void> databaseLogger(DatabaseContext* cx) {
 			    .detail("MedianBytesPerCommit", cx->bytesPerCommit.median())
 			    .detail("MaxBytesPerCommit", cx->bytesPerCommit.max())
 			    .detail("NumLocalityCacheEntries", cx->locationCache.size());
+			if (cx->anyBlobGranuleRequests) {
+				ev.detail("MeanBGLatency", cx->bgLatencies.mean())
+				    .detail("MedianBGLatency", cx->bgLatencies.median())
+				    .detail("MaxBGLatency", cx->bgLatencies.max())
+				    .detail("MeanBGGranulesPerRequest", cx->bgGranulesPerRequest.mean())
+				    .detail("MedianBGGranulesPerRequest", cx->bgGranulesPerRequest.median())
+				    .detail("MaxBGGranulesPerRequest", cx->bgGranulesPerRequest.max());
+			}
 		}
 
 		cx->latencies.clear();
@@ -541,6 +549,8 @@ ACTOR Future<Void> databaseLogger(DatabaseContext* cx) {
 		cx->commitLatencies.clear();
 		cx->mutationsPerCommit.clear();
 		cx->bytesPerCommit.clear();
+		cx->bgLatencies.clear();
+		cx->bgGranulesPerRequest.clear();
 
 		lastLogged = now();
 	}
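The databaseLogger change gates the new fields on anyBlobGranuleRequests, so databases that never issue blob granule reads keep emitting exactly the old TransactionMetrics event, and each event summarizes only the window since the last one because the samples are cleared afterwards. A minimal sketch of that pattern, using plain threads and printf instead of flow actors and TraceEvent:

#include <chrono>
#include <cstdio>
#include <numeric>
#include <thread>
#include <vector>

int main() {
    std::vector<double> bgLatencies; // filled by request code elsewhere
    bool anyBlobGranuleRequests = false; // flipped by the first BG request

    for (int tick = 0; tick < 3; tick++) {
        std::this_thread::sleep_for(std::chrono::seconds(1));
        std::printf("TransactionMetrics: ...existing details...\n");
        if (anyBlobGranuleRequests) { // optional fields only when BG traffic exists
            double mean = bgLatencies.empty()
                              ? 0.0
                              : std::accumulate(bgLatencies.begin(), bgLatencies.end(), 0.0) / bgLatencies.size();
            std::printf("  MeanBGLatency=%f\n", mean);
        }
        bgLatencies.clear(); // each event covers one logging interval
    }
    return 0;
}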
@@ -1353,11 +1363,11 @@ DatabaseContext::DatabaseContext(Reference<AsyncVar<Reference<IClusterConnectionRecord>>> connectionRecord,
 	transactionsExpensiveClearCostEstCount("ExpensiveClearCostEstCount", cc),
 	transactionGrvFullBatches("NumGrvFullBatches", cc), transactionGrvTimedOutBatches("NumGrvTimedOutBatches", cc),
 	latencies(1000), readLatencies(1000), commitLatencies(1000), GRVLatencies(1000), mutationsPerCommit(1000),
-	bytesPerCommit(1000), outstandingWatches(0), lastGrvTime(0.0), cachedReadVersion(0),
-	lastRkBatchThrottleTime(0.0), lastRkDefaultThrottleTime(0.0), lastProxyRequestTime(0.0),
-	transactionTracingSample(false), taskID(taskID), clientInfo(clientInfo), clientInfoMonitor(clientInfoMonitor),
-	coordinator(coordinator), apiVersion(apiVersion), mvCacheInsertLocation(0), healthMetricsLastUpdated(0),
-	detailedHealthMetricsLastUpdated(0), smoothMidShardSize(CLIENT_KNOBS->SHARD_STAT_SMOOTH_AMOUNT),
+	bytesPerCommit(1000), bgLatencies(1000), bgGranulesPerRequest(1000), outstandingWatches(0), lastGrvTime(0.0),
+	cachedReadVersion(0), lastRkBatchThrottleTime(0.0), lastRkDefaultThrottleTime(0.0), lastProxyRequestTime(0.0),
+	transactionTracingSample(false), taskID(taskID), clientInfo(clientInfo), clientInfoMonitor(clientInfoMonitor),
+	coordinator(coordinator), apiVersion(apiVersion), mvCacheInsertLocation(0), healthMetricsLastUpdated(0),
+	detailedHealthMetricsLastUpdated(0), smoothMidShardSize(CLIENT_KNOBS->SHARD_STAT_SMOOTH_AMOUNT),
 	specialKeySpace(std::make_unique<SpecialKeySpace>(specialKeys.begin, specialKeys.end, /* test */ false)),
 	connectToDatabaseEventCacheHolder(format("ConnectToDatabase/%s", dbId.toString().c_str())) {
 	dbId = deterministicRandom()->randomUniqueID();
@@ -1619,7 +1629,8 @@ DatabaseContext::DatabaseContext(const Error& err)
 	transactionsExpensiveClearCostEstCount("ExpensiveClearCostEstCount", cc),
 	transactionGrvFullBatches("NumGrvFullBatches", cc), transactionGrvTimedOutBatches("NumGrvTimedOutBatches", cc),
 	latencies(1000), readLatencies(1000), commitLatencies(1000), GRVLatencies(1000), mutationsPerCommit(1000),
-	bytesPerCommit(1000), transactionTracingSample(false), smoothMidShardSize(CLIENT_KNOBS->SHARD_STAT_SMOOTH_AMOUNT),
+	bytesPerCommit(1000), bgLatencies(1000), bgGranulesPerRequest(1000), transactionTracingSample(false),
+	smoothMidShardSize(CLIENT_KNOBS->SHARD_STAT_SMOOTH_AMOUNT),
 	connectToDatabaseEventCacheHolder(format("ConnectToDatabase/%s", dbId.toString().c_str())) {}
 
 // Static constructor used by server processes to create a DatabaseContext
@@ -7340,6 +7351,7 @@ ACTOR Future<Standalone<VectorRef<BlobGranuleChunkRef>>> readBlobGranulesActor(
 
 	state Version rv;
 	state Standalone<VectorRef<BlobGranuleChunkRef>> results;
+	state double startTime = now();
 
 	if (read.present()) {
 		rv = read.get();
@@ -7514,6 +7526,11 @@ ACTOR Future<Standalone<VectorRef<BlobGranuleChunkRef>>> readBlobGranulesActor(
 			throw e;
 		}
 	}
+
+	self->trState->cx->anyBlobGranuleRequests = true;
+	self->trState->cx->bgGranulesPerRequest.addSample(results.size());
+	self->trState->cx->bgLatencies.addSample(now() - startTime);
+
 	if (readVersionOut != nullptr) {
 		*readVersionOut = rv;
 	}
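readBlobGranulesActor now records one latency sample and one granules-per-request sample per request, bracketing the whole request with now() at entry and at exit. An illustrative RAII helper showing the same bracketing in portable C++ (an assumption for exposition; the actor itself just subtracts two now() calls, as shown above):

#include <chrono>
#include <functional>
#include <utility>

class LatencyScope {
public:
    explicit LatencyScope(std::function<void(double)> record) : record(std::move(record)) {}
    ~LatencyScope() {
        std::chrono::duration<double> elapsed = std::chrono::steady_clock::now() - start;
        record(elapsed.count()); // one sample per completed request, in seconds
    }

private:
    std::function<void(double)> record;
    std::chrono::steady_clock::time_point start = std::chrono::steady_clock::now();
};

// usage sketch: LatencyScope scope([&](double seconds) { bgLatencies.addSample(seconds); });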