granule parallelism, metrics, and other improvements to bg client bindings

This commit is contained in:
Josh Slocum 2022-03-24 12:42:06 -05:00
parent 017709aec6
commit b92c6ea92c
11 changed files with 123 additions and 48 deletions

View File

@ -835,9 +835,10 @@ extern "C" DLLEXPORT FDBResult* fdb_transaction_read_blob_granules(FDBTransactio
context.get_load_f = granule_context.get_load_f;
context.free_load_f = granule_context.free_load_f;
context.debugNoMaterialize = granule_context.debugNoMaterialize;
context.granuleParallelism = granule_context.granuleParallelism;
Optional<Version> rv;
if (readVersion != invalidVersion) { rv = readVersion; }
if (readVersion != latestVersion) { rv = readVersion; }
return (FDBResult*)(TXN(tr)->readBlobGranules(range, beginVersion, rv, context).extractPtr()););
}

View File

@ -196,6 +196,9 @@ typedef struct readgranulecontext {
/* Set this to true for testing if you don't want to read the granule files,
just do the request to the blob workers */
fdb_bool_t debugNoMaterialize;
/* Number of granules to load in parallel */
int granuleParallelism;
} FDBReadBlobGranuleContext;
DLLEXPORT void fdb_future_cancel(FDBFuture* f);
@ -447,7 +450,7 @@ DLLEXPORT WARN_UNUSED_RESULT FDBFuture* fdb_transaction_get_blob_granule_ranges(
uint8_t const* end_key_name,
int end_key_name_length);
/* InvalidVersion (-1) for readVersion means get read version from transaction
/* LatestVersion (-2) for readVersion means get read version from transaction
Separated out as optional because BG reads can support longer-lived reads than normal FDB transactions */
DLLEXPORT WARN_UNUSED_RESULT FDBResult* fdb_transaction_read_blob_granules(FDBTransaction* db,
uint8_t const* begin_key_name,

View File

@ -682,6 +682,7 @@ int run_op_read_blob_granules(FDBTransaction* transaction,
granuleContext.get_load_f = &granule_get_load;
granuleContext.free_load_f = &granule_free_load;
granuleContext.debugNoMaterialize = !doMaterialize;
granuleContext.granuleParallelism = 2; // TODO make knob or setting for changing this?
r = fdb_transaction_read_blob_granules(transaction,
(uint8_t*)keystr,
@ -689,7 +690,7 @@ int run_op_read_blob_granules(FDBTransaction* transaction,
(uint8_t*)keystr2,
strlen(keystr2),
0 /* beginVersion*/,
-1, /* endVersion. -1 is use txn read version */
-2, /* endVersion. -2 (latestVersion) is use txn read version */
granuleContext);
free(fileContext.data_by_id);

View File

@ -18,9 +18,12 @@
* limitations under the License.
*/
#include <vector>
#include "contrib/fmt-8.1.1/include/fmt/format.h"
#include "flow/serialize.h"
#include "fdbclient/BlobGranuleFiles.h"
#include "fdbclient/Knobs.h"
#include "fdbclient/SystemData.h" // for allKeys unit test - could remove
#include "flow/UnitTest.h"
@ -211,50 +214,82 @@ RangeResult materializeBlobGranule(const BlobGranuleChunkRef& chunk,
return ret;
}
// Load handles returned by the start_load_f callback for the files of a single granule chunk.
struct GranuleLoadIds {
// Handle for the chunk's snapshot file load; stays unset when the chunk has no snapshot file.
Optional<int64_t> snapshotId;
// Handles for the chunk's delta file loads, in the same order as the chunk's deltaFiles.
std::vector<int64_t> deltaIds;
};
// Calls start_load_f for every file referenced by `chunk` and records the returned load
// handles in `loadIds`: the snapshot file first (only when the chunk has one), followed by
// each delta file in chunk order. This only starts the loads; the loaded bytes are obtained
// separately through get_load_f using the handles stored here.
static void startLoad(const ReadBlobGranuleContext granuleContext,
                      const BlobGranuleChunkRef& chunk,
                      GranuleLoadIds& loadIds) {
	// Snapshot file is optional; snapshotId is left untouched when it is absent.
	if (chunk.snapshotFile.present()) {
		const auto& snapshot = chunk.snapshotFile.get();
		// Copy into a std::string so a NUL-terminated name can be handed to the C callback.
		const std::string snapshotName = snapshot.filename.toString();
		loadIds.snapshotId = granuleContext.start_load_f(
		    snapshotName.c_str(), snapshotName.size(), snapshot.offset, snapshot.length, granuleContext.userContext);
	}

	// One handle per delta file, appended in the same order as chunk.deltaFiles.
	loadIds.deltaIds.reserve(chunk.deltaFiles.size());
	for (const auto& deltaFile : chunk.deltaFiles) {
		const std::string deltaName = deltaFile.filename.toString();
		const int64_t handle = granuleContext.start_load_f(
		    deltaName.c_str(), deltaName.size(), deltaFile.offset, deltaFile.length, granuleContext.userContext);
		loadIds.deltaIds.push_back(handle);
	}
}
ErrorOr<RangeResult> loadAndMaterializeBlobGranules(const Standalone<VectorRef<BlobGranuleChunkRef>>& files,
const KeyRangeRef& keyRange,
Version beginVersion,
Version readVersion,
ReadBlobGranuleContext granuleContext) {
int64_t parallelism = granuleContext.granuleParallelism;
if (parallelism < 1) {
parallelism = 1;
}
if (parallelism >= CLIENT_KNOBS->BG_MAX_GRANULE_PARALLELISM) {
parallelism = CLIENT_KNOBS->BG_MAX_GRANULE_PARALLELISM;
}
GranuleLoadIds loadIds[files.size()];
// Kick off first file reads if parallelism > 1
for (int i = 0; i < parallelism - 1 && i < files.size(); i++) {
startLoad(granuleContext, files[i], loadIds[i]);
}
try {
RangeResult results;
// FIXME: could submit multiple chunks to start_load_f in parallel?
for (const BlobGranuleChunkRef& chunk : files) {
for (int chunkIdx = 0; chunkIdx < files.size(); chunkIdx++) {
// Kick off files for this granule if parallelism == 1, or future granule if parallelism > 1
if (chunkIdx + parallelism - 1 < files.size()) {
startLoad(granuleContext, files[chunkIdx + parallelism - 1], loadIds[chunkIdx + parallelism - 1]);
}
RangeResult chunkRows;
int64_t snapshotLoadId;
int64_t deltaLoadIds[chunk.deltaFiles.size()];
// Start load process for all files in chunk
// In V1 of api snapshot is required, optional is just for forward compatibility
ASSERT(chunk.snapshotFile.present());
std::string snapshotFname = chunk.snapshotFile.get().filename.toString();
snapshotLoadId = granuleContext.start_load_f(snapshotFname.c_str(),
snapshotFname.size(),
chunk.snapshotFile.get().offset,
chunk.snapshotFile.get().length,
granuleContext.userContext);
int64_t deltaLoadLengths[chunk.deltaFiles.size()];
StringRef deltaData[chunk.deltaFiles.size()];
for (int deltaFileIdx = 0; deltaFileIdx < chunk.deltaFiles.size(); deltaFileIdx++) {
std::string deltaFName = chunk.deltaFiles[deltaFileIdx].filename.toString();
deltaLoadIds[deltaFileIdx] = granuleContext.start_load_f(deltaFName.c_str(),
deltaFName.size(),
chunk.deltaFiles[deltaFileIdx].offset,
chunk.deltaFiles[deltaFileIdx].length,
granuleContext.userContext);
deltaLoadLengths[deltaFileIdx] = chunk.deltaFiles[deltaFileIdx].length;
}
// once all loads kicked off, load data for chunk
StringRef snapshotData(granuleContext.get_load_f(snapshotLoadId, granuleContext.userContext),
chunk.snapshotFile.get().length);
if (!snapshotData.begin()) {
return ErrorOr<RangeResult>(blob_granule_file_load_error());
Optional<StringRef> snapshotData;
if (files[chunkIdx].snapshotFile.present()) {
snapshotData =
StringRef(granuleContext.get_load_f(loadIds[chunkIdx].snapshotId.get(), granuleContext.userContext),
files[chunkIdx].snapshotFile.get().length);
if (!snapshotData.get().begin()) {
return ErrorOr<RangeResult>(blob_granule_file_load_error());
}
}
for (int i = 0; i < chunk.deltaFiles.size(); i++) {
deltaData[i] = StringRef(granuleContext.get_load_f(deltaLoadIds[i], granuleContext.userContext),
chunk.deltaFiles[i].length);
StringRef deltaData[files[chunkIdx].deltaFiles.size()];
for (int i = 0; i < files[chunkIdx].deltaFiles.size(); i++) {
deltaData[i] =
StringRef(granuleContext.get_load_f(loadIds[chunkIdx].deltaIds[i], granuleContext.userContext),
files[chunkIdx].deltaFiles[i].length);
// null data is error
if (!deltaData[i].begin()) {
return ErrorOr<RangeResult>(blob_granule_file_load_error());
@ -262,14 +297,16 @@ ErrorOr<RangeResult> loadAndMaterializeBlobGranules(const Standalone<VectorRef<B
}
// materialize rows from chunk
chunkRows = materializeBlobGranule(chunk, keyRange, readVersion, snapshotData, deltaData);
chunkRows = materializeBlobGranule(files[chunkIdx], keyRange, readVersion, snapshotData, deltaData);
results.arena().dependsOn(chunkRows.arena());
results.append(results.arena(), chunkRows.begin(), chunkRows.size());
granuleContext.free_load_f(snapshotLoadId, granuleContext.userContext);
for (int i = 0; i < chunk.deltaFiles.size(); i++) {
granuleContext.free_load_f(deltaLoadIds[i], granuleContext.userContext);
if (loadIds[chunkIdx].snapshotId.present()) {
granuleContext.free_load_f(loadIds[chunkIdx].snapshotId.get(), granuleContext.userContext);
}
for (int i = 0; i < loadIds[chunkIdx].deltaIds.size(); i++) {
granuleContext.free_load_f(loadIds[chunkIdx].deltaIds[i], granuleContext.userContext);
}
}
return ErrorOr<RangeResult>(results);
@ -278,6 +315,8 @@ ErrorOr<RangeResult> loadAndMaterializeBlobGranules(const Standalone<VectorRef<B
}
}
// FIXME: add unit tests for materializeBlobGranule and loadAndMaterializeBlobGranules
// FIXME: re-enable test!
TEST_CASE(":/blobgranule/files/applyDelta") {
printf("Testing blob granule delta applying\n");

View File

@ -280,6 +280,9 @@ void ClientKnobs::initialize(Randomize randomize) {
init( MVC_CLIENTLIB_CHUNK_SIZE, 8*1024 );
init( MVC_CLIENTLIB_CHUNKS_PER_TRANSACTION, 32 );
// Blob granules
init( BG_MAX_GRANULE_PARALLELISM, 10 );
// clang-format on
}

View File

@ -272,6 +272,9 @@ public:
int MVC_CLIENTLIB_CHUNK_SIZE;
int MVC_CLIENTLIB_CHUNKS_PER_TRANSACTION;
// Blob Granules
int BG_MAX_GRANULE_PARALLELISM;
ClientKnobs(Randomize randomize);
void initialize(Randomize randomize);
};

View File

@ -514,7 +514,7 @@ public:
Counter transactionGrvTimedOutBatches;
ContinuousSample<double> latencies, readLatencies, commitLatencies, GRVLatencies, mutationsPerCommit,
bytesPerCommit;
bytesPerCommit, bgLatencies, bgGranulesPerRequest;
int outstandingWatches;
int maxOutstandingWatches;
@ -538,6 +538,7 @@ public:
bool transactionTracingSample;
double verifyCausalReadsProp = 0.0;
bool blobGranuleNoMaterialize = false;
bool anyBlobGranuleRequests = false;
Future<Void> logger;
Future<Void> throttleExpirer;

View File

@ -1353,6 +1353,9 @@ struct ReadBlobGranuleContext {
// Set this to true for testing if you don't want to read the granule files,
// just do the request to the blob workers
bool debugNoMaterialize;
// number of granules to load in parallel (default 1)
int granuleParallelism = 1;
};
// Store metadata associated with each storage server. Currently it only contains data used by the perpetual storage wiggle.

View File

@ -282,8 +282,9 @@ ThreadResult<RangeResult> DLTransaction::readBlobGranules(const KeyRangeRef& key
context.get_load_f = granuleContext.get_load_f;
context.free_load_f = granuleContext.free_load_f;
context.debugNoMaterialize = granuleContext.debugNoMaterialize;
context.granuleParallelism = granuleContext.granuleParallelism;
int64_t rv = readVersion.present() ? readVersion.get() : invalidVersion;
int64_t rv = readVersion.present() ? readVersion.get() : latestVersion;
FdbCApi::FDBResult* r = api->transactionReadBlobGranules(tr,
keyRange.begin.begin(),

View File

@ -107,6 +107,9 @@ struct FdbCApi : public ThreadSafeReferenceCounted<FdbCApi> {
// set this to true for testing if you don't want to read the granule files, just
// do the request to the blob workers
fdb_bool_t debugNoMaterialize;
// number of granules to load in parallel (default 1)
int granuleParallelism;
} FDBReadBlobGranuleContext;
typedef void (*FDBCallback)(FDBFuture* future, void* callback_parameter);

View File

@ -533,6 +533,14 @@ ACTOR Future<Void> databaseLogger(DatabaseContext* cx) {
.detail("MedianBytesPerCommit", cx->bytesPerCommit.median())
.detail("MaxBytesPerCommit", cx->bytesPerCommit.max())
.detail("NumLocalityCacheEntries", cx->locationCache.size());
if (cx->anyBlobGranuleRequests) {
ev.detail("MeanBGLatency", cx->bgLatencies.mean())
.detail("MedianBGLatency", cx->bgLatencies.median())
.detail("MaxBGLatency", cx->bgLatencies.max())
.detail("MeanBGGranulesPerRequest", cx->bgGranulesPerRequest.mean())
.detail("MedianBGGranulesPerRequest", cx->bgGranulesPerRequest.median())
.detail("MaxBGGranulesPerRequest", cx->bgGranulesPerRequest.max());
}
}
cx->latencies.clear();
@ -541,6 +549,8 @@ ACTOR Future<Void> databaseLogger(DatabaseContext* cx) {
cx->commitLatencies.clear();
cx->mutationsPerCommit.clear();
cx->bytesPerCommit.clear();
cx->bgLatencies.clear();
cx->bgGranulesPerRequest.clear();
lastLogged = now();
}
@ -1353,11 +1363,11 @@ DatabaseContext::DatabaseContext(Reference<AsyncVar<Reference<IClusterConnection
transactionsExpensiveClearCostEstCount("ExpensiveClearCostEstCount", cc),
transactionGrvFullBatches("NumGrvFullBatches", cc), transactionGrvTimedOutBatches("NumGrvTimedOutBatches", cc),
latencies(1000), readLatencies(1000), commitLatencies(1000), GRVLatencies(1000), mutationsPerCommit(1000),
bytesPerCommit(1000), outstandingWatches(0), lastGrvTime(0.0), cachedReadVersion(0), lastRkBatchThrottleTime(0.0),
lastRkDefaultThrottleTime(0.0), lastProxyRequestTime(0.0), transactionTracingSample(false), taskID(taskID),
clientInfo(clientInfo), clientInfoMonitor(clientInfoMonitor), coordinator(coordinator), apiVersion(apiVersion),
mvCacheInsertLocation(0), healthMetricsLastUpdated(0), detailedHealthMetricsLastUpdated(0),
smoothMidShardSize(CLIENT_KNOBS->SHARD_STAT_SMOOTH_AMOUNT),
bytesPerCommit(1000), bgLatencies(1000), bgGranulesPerRequest(1000), outstandingWatches(0), lastGrvTime(0.0),
cachedReadVersion(0), lastRkBatchThrottleTime(0.0), lastRkDefaultThrottleTime(0.0), lastProxyRequestTime(0.0),
transactionTracingSample(false), taskID(taskID), clientInfo(clientInfo), clientInfoMonitor(clientInfoMonitor),
coordinator(coordinator), apiVersion(apiVersion), mvCacheInsertLocation(0), healthMetricsLastUpdated(0),
detailedHealthMetricsLastUpdated(0), smoothMidShardSize(CLIENT_KNOBS->SHARD_STAT_SMOOTH_AMOUNT),
specialKeySpace(std::make_unique<SpecialKeySpace>(specialKeys.begin, specialKeys.end, /* test */ false)),
connectToDatabaseEventCacheHolder(format("ConnectToDatabase/%s", dbId.toString().c_str())) {
dbId = deterministicRandom()->randomUniqueID();
@ -1619,7 +1629,8 @@ DatabaseContext::DatabaseContext(const Error& err)
transactionsExpensiveClearCostEstCount("ExpensiveClearCostEstCount", cc),
transactionGrvFullBatches("NumGrvFullBatches", cc), transactionGrvTimedOutBatches("NumGrvTimedOutBatches", cc),
latencies(1000), readLatencies(1000), commitLatencies(1000), GRVLatencies(1000), mutationsPerCommit(1000),
bytesPerCommit(1000), transactionTracingSample(false), smoothMidShardSize(CLIENT_KNOBS->SHARD_STAT_SMOOTH_AMOUNT),
bytesPerCommit(1000), bgLatencies(1000), bgGranulesPerRequest(1000), transactionTracingSample(false),
smoothMidShardSize(CLIENT_KNOBS->SHARD_STAT_SMOOTH_AMOUNT),
connectToDatabaseEventCacheHolder(format("ConnectToDatabase/%s", dbId.toString().c_str())) {}
// Static constructor used by server processes to create a DatabaseContext
@ -7340,6 +7351,7 @@ ACTOR Future<Standalone<VectorRef<BlobGranuleChunkRef>>> readBlobGranulesActor(
state Version rv;
state Standalone<VectorRef<BlobGranuleChunkRef>> results;
state double startTime = now();
if (read.present()) {
rv = read.get();
@ -7514,6 +7526,11 @@ ACTOR Future<Standalone<VectorRef<BlobGranuleChunkRef>>> readBlobGranulesActor(
throw e;
}
}
self->trState->cx->anyBlobGranuleRequests = true;
self->trState->cx->bgGranulesPerRequest.addSample(results.size());
self->trState->cx->bgLatencies.addSample(now() - startTime);
if (readVersionOut != nullptr) {
*readVersionOut = rv;
}