diff --git a/bindings/c/foundationdb/fdb_c.h b/bindings/c/foundationdb/fdb_c.h index 2b98d639a1..e6676455f3 100644 --- a/bindings/c/foundationdb/fdb_c.h +++ b/bindings/c/foundationdb/fdb_c.h @@ -123,8 +123,8 @@ typedef struct keyrange { typedef struct readgranulecontext { void* userContext; int64_t (*start_load_f)(const char*, int, int64_t, int64_t, void*); - uint8_t (*get_load_f)(int64_t, void*); - void (*free_load_f)(uint8_t*); + uint8_t* (*get_load_f)(int64_t, void*); + void (*free_load_f)(int64_t, void*); } FDBReadBlobGranuleContext; DLLEXPORT void fdb_future_cancel(FDBFuture* f); diff --git a/bindings/c/test/mako/mako.c b/bindings/c/test/mako/mako.c index 3b1c377763..539b7ac10f 100644 --- a/bindings/c/test/mako/mako.c +++ b/bindings/c/test/mako/mako.c @@ -541,7 +541,91 @@ int run_op_clearrange(FDBTransaction* transaction, char* keystr, char* keystr2) return FDB_SUCCESS; } -int run_op_read_blob_granules(FDBDatabase* database, char* keystr, char* keystr2, int64_t readVersion) { +// TODO: could always abstract this into something more generically usable by something other than mako. +// But outside of testing there are likely few use cases for local granules +int MAX_BG_IDS = 1000; +typedef struct { + char* bgFilePath; + int nextId; + uint8_t** data_by_id; +} BGLocalFileContext; + +int64_t granule_start_load(const char* filename, + int filenameLength, + int64_t offset, + int64_t length, + void* userContext) { + FILE* fp; + char full_fname[PATH_MAX]; + int loadId; + uint8_t* data; + size_t readSize; + + BGLocalFileContext* context = (BGLocalFileContext*)userContext; + + loadId = context->nextId; + if (context->data_by_id[loadId] != 0) { + fprintf(stderr, "ERROR: too many granule file loads at once: %d\n", MAX_BG_IDS); + return -1; + } + context->nextId = (context->nextId + 1) % MAX_BG_IDS; + + int ret = snprintf(full_fname, PATH_MAX, "%s%s", context->bgFilePath, filename); + if (ret < 0 || ret >= PATH_MAX) { + fprintf(stderr, "ERROR: BG filename too long: %s%s\n", context->bgFilePath, filename); + return -1; + } + + fp = fopen(full_fname, "r"); + if (!fp) { + fprintf(stderr, "ERROR: BG could not open file: %s\n", full_fname); + return -1; + } + + // don't seek if offset == 0 + if (offset && fseek(fp, offset, SEEK_SET)) { + // if fseek was non-zero, it failed + fprintf(stderr, "ERROR: BG could not seek to %lld in file %s\n", offset, full_fname); + fclose(fp); + return -1; + } + + data = (uint8_t*)malloc(length); + readSize = fread(data, sizeof(uint8_t), length, fp); + fclose(fp); + + if (readSize != length) { + fprintf(stderr, "ERROR: BG could not read %lld bytes from file: %s\n", full_fname); + return -1; + } + + context->data_by_id[loadId] = data; + return loadId; +} + +uint8_t* granule_get_load(int64_t loadId, void* userContext) { + BGLocalFileContext* context = (BGLocalFileContext*)userContext; + if (context->data_by_id[loadId] == 0) { + fprintf(stderr, "ERROR: BG loadId invalid for get_load: %lld\n", loadId); + return 0; + } + return context->data_by_id[loadId]; +} + +void granule_free_load(int64_t loadId, void* userContext) { + BGLocalFileContext* context = (BGLocalFileContext*)userContext; + if (context->data_by_id[loadId] == 0) { + fprintf(stderr, "ERROR: BG loadId invalid for free_load: %lld\n", loadId); + } + free(context->data_by_id[loadId]); + context->data_by_id[loadId] = 0; +} + +int run_op_read_blob_granules(FDBDatabase* database, + char* keystr, + char* keystr2, + int64_t readVersion, + char* bgFilePath) { FDBFuture* f; fdb_error_t err; FDBKeyValue const* out_kv; @@ -549,7 +633,19 @@ int run_op_read_blob_granules(FDBDatabase* database, char* keystr, char* keystr2 int out_more; // Not used currently! FIXME: fix warning - FDBReadBlobGranuleContext context; + + // Allocate a separate context per call to avoid multiple threads accessing + BGLocalFileContext fileContext; + fileContext.bgFilePath = bgFilePath; + fileContext.nextId = 0; + fileContext.data_by_id = (uint8_t**)malloc(MAX_BG_IDS * sizeof(uint8_t*)); + memset(fileContext.data_by_id, 0, MAX_BG_IDS * sizeof(uint8_t*)); + + FDBReadBlobGranuleContext granuleContext; + granuleContext.userContext = &fileContext; + granuleContext.start_load_f = &granule_start_load; + granuleContext.get_load_f = &granule_get_load; + granuleContext.free_load_f = &granule_free_load; f = fdb_database_read_blob_granules(database, (uint8_t*)keystr, @@ -558,11 +654,14 @@ int run_op_read_blob_granules(FDBDatabase* database, char* keystr, char* keystr2 strlen(keystr2), 0 /* beginVersion*/, readVersion, - context); + granuleContext); wait_future(f); err = fdb_future_get_keyvalue_array(f, &out_kv, &out_count, &out_more); + + free(fileContext.data_by_id); + if (err) { fprintf(stderr, "ERROR: fdb_future_get_keyvalue_array: %s\n", fdb_get_error(err)); fdb_future_destroy(f); @@ -819,7 +918,7 @@ retryTxn: break; case OP_READ_BG: // Requires that there is an explicit grv before bg - rc = run_op_read_blob_granules(database, keystr, keystr2, readversion); + rc = run_op_read_blob_granules(database, keystr, keystr2, readversion, args->bg_file_path); break; default: fprintf(stderr, "ERROR: Unknown Operation %d\n", i); @@ -1391,8 +1490,9 @@ int worker_process_main(mako_args_t* args, int worker_id, mako_shmhdr_t* shm, pi if (args->disable_ryw) { fdb_database_set_option(process.databases[i], FDB_DB_OPTION_SNAPSHOT_RYW_DISABLE, (uint8_t*)NULL, 0); } - // always do not materialize blob granules - fdb_database_set_option(process.databases[i], FDB_DB_OPTION_TEST_BG_NO_MATERIALIZE, (uint8_t*)NULL, 0); + if (!args->bg_materialize_files) { + fdb_database_set_option(process.databases[i], FDB_DB_OPTION_TEST_BG_NO_MATERIALIZE, (uint8_t*)NULL, 0); + } } #endif @@ -1519,6 +1619,8 @@ int init_args(mako_args_t* args) { args->client_threads_per_version = 0; args->disable_ryw = 0; args->json_output_path[0] = '\0'; + args->bg_materialize_files = false; + args->bg_file_path[0] = '\0'; return 0; } @@ -1695,6 +1797,9 @@ void usage() { printf("%-24s %s\n", " --streaming", "Streaming mode: all (default), iterator, small, medium, large, serial"); printf("%-24s %s\n", " --disable_ryw", "Disable snapshot read-your-writes"); printf("%-24s %s\n", " --json_report=PATH", "Output stats to the specified json file (Default: mako.json)"); + printf("%-24s %s\n", + " --bg_file_path=PATH", + "Read blob granule files from the local filesystem at PATH and materialize the results."); } /* parse benchmark paramters */ @@ -1744,6 +1849,7 @@ int parse_args(int argc, char* argv[], mako_args_t* args) { { "client_threads_per_version", required_argument, NULL, ARG_CLIENT_THREADS_PER_VERSION }, { "disable_ryw", no_argument, NULL, ARG_DISABLE_RYW }, { "json_report", optional_argument, NULL, ARG_JSON_REPORT }, + { "bg_file_path", required_argument, NULL, ARG_BG_FILE_PATH }, { NULL, 0, NULL, 0 } }; idx = 0; @@ -1924,6 +2030,12 @@ int parse_args(int argc, char* argv[], mako_args_t* args) { strncpy(args->json_output_path, optarg, strlen(optarg) + 1); } break; + case ARG_BG_FILE_PATH: + // TODO REMOVE + + printf("BG file path arg: %s\n", optarg); + args->bg_materialize_files = true; + strncpy(args->bg_file_path, optarg, strlen(optarg) + 1); } } diff --git a/bindings/c/test/mako/mako.h b/bindings/c/test/mako/mako.h index e6a0d7e35e..be1bfabd9b 100644 --- a/bindings/c/test/mako/mako.h +++ b/bindings/c/test/mako/mako.h @@ -84,7 +84,8 @@ enum Arguments { ARG_STREAMING_MODE, ARG_DISABLE_RYW, ARG_CLIENT_THREADS_PER_VERSION, - ARG_JSON_REPORT + ARG_JSON_REPORT, + ARG_BG_FILE_PATH // if blob granule files are stored locally, mako will read and materialize them if this is set }; enum TPSChangeTypes { TPS_SIN, TPS_SQUARE, TPS_PULSE }; @@ -147,6 +148,8 @@ typedef struct { uint32_t client_threads_per_version; int disable_ryw; char json_output_path[PATH_MAX]; + bool bg_materialize_files; + char bg_file_path[PATH_MAX]; } mako_args_t; /* shared memory */ diff --git a/fdbclient/FDBTypes.h b/fdbclient/FDBTypes.h index f9f871245d..19274ead36 100644 --- a/fdbclient/FDBTypes.h +++ b/fdbclient/FDBTypes.h @@ -1181,8 +1181,8 @@ inline bool isValidPerpetualStorageWiggleLocality(std::string locality) { struct ReadBlobGranuleContext { void* userContext; int64_t (*start_load_f)(const char*, int, int64_t, int64_t, void*); - uint8_t (*get_load_f)(int64_t, void*); - void (*free_load_f)(uint8_t*); + uint8_t* (*get_load_f)(int64_t, void*); + void (*free_load_f)(int64_t, void*); }; #endif diff --git a/fdbclient/MultiVersionTransaction.h b/fdbclient/MultiVersionTransaction.h index c83746ef4b..46dfd258de 100644 --- a/fdbclient/MultiVersionTransaction.h +++ b/fdbclient/MultiVersionTransaction.h @@ -59,8 +59,8 @@ struct FdbCApi : public ThreadSafeReferenceCounted { typedef struct readgranulecontext { void* userContext; int64_t (*start_load_f)(const char*, int, int64_t, int64_t, void*); - uint8_t (*get_load_f)(int64_t, void*); - void (*free_load_f)(uint8_t*); + uint8_t* (*get_load_f)(int64_t, void*); + void (*free_load_f)(int64_t, void*); } FDBReadBlobGranuleContext; typedef int fdb_error_t; diff --git a/fdbclient/ThreadSafeTransaction.cpp b/fdbclient/ThreadSafeTransaction.cpp index 73a49570c8..9e07b93315 100644 --- a/fdbclient/ThreadSafeTransaction.cpp +++ b/fdbclient/ThreadSafeTransaction.cpp @@ -145,8 +145,6 @@ ThreadFuture ThreadSafeDatabase::readBlobGranules(const KeyRangeRef RangeResult chunkRows; if (!db->blobGranuleNoMaterialize) { - // FIXME: actually implement file loading and materialization! For now just printing stuff out - // Start load process for all files in chunk // In V1 of api snapshot is required, optional is just for forward compatibility ASSERT(chunk.snapshotFile.present()); @@ -156,9 +154,9 @@ ThreadFuture ThreadSafeDatabase::readBlobGranules(const KeyRangeRef chunk.snapshotFile.get().offset, chunk.snapshotFile.get().length, granule_context.userContext); - printf(" S_ID=%lld\n", snapshotLoadId); // TODO REMOVE - int64_t deltaLoadIds[chunk.deltaFiles.size()]; + int64_t deltaLoadLengths[chunk.deltaFiles.size()]; + uint8_t* deltaData[chunk.deltaFiles.size()]; for (int deltaFileIdx = 0; deltaFileIdx < chunk.deltaFiles.size(); deltaFileIdx++) { std::string deltaFName = chunk.deltaFiles[deltaFileIdx].filename.toString(); deltaLoadIds[deltaFileIdx] = granule_context.start_load_f(deltaFName.c_str(), @@ -166,7 +164,20 @@ ThreadFuture ThreadSafeDatabase::readBlobGranules(const KeyRangeRef chunk.deltaFiles[deltaFileIdx].offset, chunk.deltaFiles[deltaFileIdx].length, granule_context.userContext); - printf(" D_ID=%lld\n", deltaLoadIds[deltaFileIdx]); // TODO REMOVE + deltaLoadLengths[deltaFileIdx] = chunk.deltaFiles[deltaFileIdx].length; + } + + // once all loads kicked off, load data for chunk + uint8_t* snapshotData = granule_context.get_load_f(snapshotLoadId, granule_context.userContext); + for (int i = 0; i < chunk.deltaFiles.size(); i++) { + deltaData[i] = granule_context.get_load_f(deltaLoadIds[i], granule_context.userContext); + } + + // FIXME: use bytes from snapshot and delta to materialize chunkRows + + granule_context.free_load_f(snapshotLoadId, granule_context.userContext); + for (int i = 0; i < chunk.deltaFiles.size(); i++) { + granule_context.free_load_f(deltaLoadIds[i], granule_context.userContext); } } diff --git a/fdbclient/vexillographer/fdb.options b/fdbclient/vexillographer/fdb.options index c18b294073..4f1b9972c6 100644 --- a/fdbclient/vexillographer/fdb.options +++ b/fdbclient/vexillographer/fdb.options @@ -203,7 +203,7 @@ description is not currently required but encouraged.