Added local granule file reading to mako

Josh Slocum 2021-11-03 09:33:30 -05:00
parent 382882f1c1
commit 5b2617a524
8 changed files with 146 additions and 19 deletions

View File

@@ -123,8 +123,8 @@ typedef struct keyrange {
 typedef struct readgranulecontext {
     void* userContext;
     int64_t (*start_load_f)(const char*, int, int64_t, int64_t, void*);
-    uint8_t (*get_load_f)(int64_t, void*);
-    void (*free_load_f)(uint8_t*);
+    uint8_t* (*get_load_f)(int64_t, void*);
+    void (*free_load_f)(int64_t, void*);
 } FDBReadBlobGranuleContext;
 DLLEXPORT void fdb_future_cancel(FDBFuture* f);
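The signature changes above alter the load protocol: get_load_f now returns the loaded bytes rather than a status byte, and free_load_f releases a load by its id instead of by pointer. Below is a minimal, self-contained sketch (not part of this commit; the ToyLoader and all toy_* names are hypothetical) of the call order the client library drives through these callbacks:

#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>

/* local copy of the callback context, mirroring the header above */
typedef struct readgranulecontext {
    void* userContext;
    int64_t (*start_load_f)(const char*, int, int64_t, int64_t, void*);
    uint8_t* (*get_load_f)(int64_t, void*);
    void (*free_load_f)(int64_t, void*);
} FDBReadBlobGranuleContext;

typedef struct {
    uint8_t* buf; /* single outstanding load, for simplicity */
} ToyLoader;

static int64_t toy_start_load(const char* name, int nameLen, int64_t offset, int64_t length, void* uc) {
    ToyLoader* l = (ToyLoader*)uc;
    (void)name;
    (void)nameLen;
    (void)offset;
    l->buf = (uint8_t*)calloc(1, (size_t)length); /* stand-in for reading a granule file */
    return l->buf ? 0 : -1;                       /* hand back load id 0, or -1 on failure */
}

static uint8_t* toy_get_load(int64_t id, void* uc) {
    (void)id;
    return ((ToyLoader*)uc)->buf;
}

static void toy_free_load(int64_t id, void* uc) {
    (void)id;
    free(((ToyLoader*)uc)->buf);
    ((ToyLoader*)uc)->buf = NULL;
}

int main(void) {
    ToyLoader loader = { NULL };
    FDBReadBlobGranuleContext ctx;
    ctx.userContext = &loader;
    ctx.start_load_f = &toy_start_load;
    ctx.get_load_f = &toy_get_load;
    ctx.free_load_f = &toy_free_load;

    /* start a load, fetch its bytes by id, then free by id */
    int64_t id = ctx.start_load_f("snapshot.bin", 12, 0 /* offset */, 1024 /* length */, ctx.userContext);
    uint8_t* bytes = ctx.get_load_f(id, ctx.userContext);
    printf("load id %lld -> %p\n", (long long)id, (void*)bytes);
    ctx.free_load_f(id, ctx.userContext);
    return 0;
}

The consumer of these callbacks is ThreadSafeDatabase::readBlobGranules (changed later in this commit): it starts a load for the snapshot file and each delta file of a chunk, fetches every buffer with get_load_f, and finally releases each load id with free_load_f.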

View File

@@ -541,7 +541,91 @@ int run_op_clearrange(FDBTransaction* transaction, char* keystr, char* keystr2)
     return FDB_SUCCESS;
 }
-int run_op_read_blob_granules(FDBDatabase* database, char* keystr, char* keystr2, int64_t readVersion) {
+// TODO: could always abstract this into something more generically usable by something other than mako.
+// But outside of testing there are likely few use cases for local granules
+int MAX_BG_IDS = 1000;
+
+typedef struct {
+    char* bgFilePath;
+    int nextId;
+    uint8_t** data_by_id;
+} BGLocalFileContext;
+
+int64_t granule_start_load(const char* filename,
+                           int filenameLength,
+                           int64_t offset,
+                           int64_t length,
+                           void* userContext) {
+    FILE* fp;
+    char full_fname[PATH_MAX];
+    int loadId;
+    uint8_t* data;
+    size_t readSize;
+
+    BGLocalFileContext* context = (BGLocalFileContext*)userContext;
+    loadId = context->nextId;
+    if (context->data_by_id[loadId] != 0) {
+        fprintf(stderr, "ERROR: too many granule file loads at once: %d\n", MAX_BG_IDS);
+        return -1;
+    }
+    context->nextId = (context->nextId + 1) % MAX_BG_IDS;
+
+    int ret = snprintf(full_fname, PATH_MAX, "%s%s", context->bgFilePath, filename);
+    if (ret < 0 || ret >= PATH_MAX) {
+        fprintf(stderr, "ERROR: BG filename too long: %s%s\n", context->bgFilePath, filename);
+        return -1;
+    }
+
+    fp = fopen(full_fname, "r");
+    if (!fp) {
+        fprintf(stderr, "ERROR: BG could not open file: %s\n", full_fname);
+        return -1;
+    }
+
+    // don't seek if offset == 0
+    if (offset && fseek(fp, offset, SEEK_SET)) {
+        // if fseek was non-zero, it failed
+        fprintf(stderr, "ERROR: BG could not seek to %lld in file %s\n", offset, full_fname);
+        fclose(fp);
+        return -1;
+    }
+
+    data = (uint8_t*)malloc(length);
+    readSize = fread(data, sizeof(uint8_t), length, fp);
+    fclose(fp);
+
+    if (readSize != length) {
+        fprintf(stderr, "ERROR: BG could not read %lld bytes from file: %s\n", length, full_fname);
+        return -1;
+    }
+
+    context->data_by_id[loadId] = data;
+    return loadId;
+}
+
+uint8_t* granule_get_load(int64_t loadId, void* userContext) {
+    BGLocalFileContext* context = (BGLocalFileContext*)userContext;
+    if (context->data_by_id[loadId] == 0) {
+        fprintf(stderr, "ERROR: BG loadId invalid for get_load: %lld\n", loadId);
+        return 0;
+    }
+    return context->data_by_id[loadId];
+}
+
+void granule_free_load(int64_t loadId, void* userContext) {
+    BGLocalFileContext* context = (BGLocalFileContext*)userContext;
+    if (context->data_by_id[loadId] == 0) {
+        fprintf(stderr, "ERROR: BG loadId invalid for free_load: %lld\n", loadId);
+    }
+    free(context->data_by_id[loadId]);
+    context->data_by_id[loadId] = 0;
+}
+
+int run_op_read_blob_granules(FDBDatabase* database,
+                              char* keystr,
+                              char* keystr2,
+                              int64_t readVersion,
+                              char* bgFilePath) {
     FDBFuture* f;
     fdb_error_t err;
     FDBKeyValue const* out_kv;
@@ -549,7 +633,19 @@ int run_op_read_blob_granules(FDBDatabase* database, char* keystr, char* keystr2
     int out_more;
     // Not used currently! FIXME: fix warning
-    FDBReadBlobGranuleContext context;
+    // Allocate a separate context per call to avoid multiple threads accessing the same context
+    BGLocalFileContext fileContext;
+    fileContext.bgFilePath = bgFilePath;
+    fileContext.nextId = 0;
+    fileContext.data_by_id = (uint8_t**)malloc(MAX_BG_IDS * sizeof(uint8_t*));
+    memset(fileContext.data_by_id, 0, MAX_BG_IDS * sizeof(uint8_t*));
+
+    FDBReadBlobGranuleContext granuleContext;
+    granuleContext.userContext = &fileContext;
+    granuleContext.start_load_f = &granule_start_load;
+    granuleContext.get_load_f = &granule_get_load;
+    granuleContext.free_load_f = &granule_free_load;
+
     f = fdb_database_read_blob_granules(database,
                                         (uint8_t*)keystr,
@@ -558,11 +654,14 @@ int run_op_read_blob_granules(FDBDatabase* database, char* keystr, char* keystr2
                                         strlen(keystr2),
                                         0 /* beginVersion*/,
                                         readVersion,
-                                        context);
+                                        granuleContext);
     wait_future(f);

     err = fdb_future_get_keyvalue_array(f, &out_kv, &out_count, &out_more);
+    free(fileContext.data_by_id);
     if (err) {
         fprintf(stderr, "ERROR: fdb_future_get_keyvalue_array: %s\n", fdb_get_error(err));
         fdb_future_destroy(f);
@@ -819,7 +918,7 @@ retryTxn:
             break;
         case OP_READ_BG:
             // Requires that there is an explicit grv before bg
-            rc = run_op_read_blob_granules(database, keystr, keystr2, readversion);
+            rc = run_op_read_blob_granules(database, keystr, keystr2, readversion, args->bg_file_path);
             break;
         default:
             fprintf(stderr, "ERROR: Unknown Operation %d\n", i);
@@ -1391,8 +1490,9 @@ int worker_process_main(mako_args_t* args, int worker_id, mako_shmhdr_t* shm, pi
         if (args->disable_ryw) {
             fdb_database_set_option(process.databases[i], FDB_DB_OPTION_SNAPSHOT_RYW_DISABLE, (uint8_t*)NULL, 0);
         }
-        // always do not materialize blob granules
-        fdb_database_set_option(process.databases[i], FDB_DB_OPTION_TEST_BG_NO_MATERIALIZE, (uint8_t*)NULL, 0);
+        if (!args->bg_materialize_files) {
+            fdb_database_set_option(process.databases[i], FDB_DB_OPTION_TEST_BG_NO_MATERIALIZE, (uint8_t*)NULL, 0);
+        }
     }
 #endif
@@ -1519,6 +1619,8 @@ int init_args(mako_args_t* args) {
     args->client_threads_per_version = 0;
     args->disable_ryw = 0;
     args->json_output_path[0] = '\0';
+    args->bg_materialize_files = false;
+    args->bg_file_path[0] = '\0';
     return 0;
 }
@@ -1695,6 +1797,9 @@ void usage() {
     printf("%-24s %s\n", " --streaming", "Streaming mode: all (default), iterator, small, medium, large, serial");
     printf("%-24s %s\n", " --disable_ryw", "Disable snapshot read-your-writes");
     printf("%-24s %s\n", " --json_report=PATH", "Output stats to the specified json file (Default: mako.json)");
+    printf("%-24s %s\n",
+           " --bg_file_path=PATH",
+           "Read blob granule files from the local filesystem at PATH and materialize the results.");
 }

 /* parse benchmark paramters */
@@ -1744,6 +1849,7 @@ int parse_args(int argc, char* argv[], mako_args_t* args) {
         { "client_threads_per_version", required_argument, NULL, ARG_CLIENT_THREADS_PER_VERSION },
         { "disable_ryw", no_argument, NULL, ARG_DISABLE_RYW },
         { "json_report", optional_argument, NULL, ARG_JSON_REPORT },
+        { "bg_file_path", required_argument, NULL, ARG_BG_FILE_PATH },
         { NULL, 0, NULL, 0 }
     };
     idx = 0;
@@ -1924,6 +2030,12 @@ int parse_args(int argc, char* argv[], mako_args_t* args) {
                 strncpy(args->json_output_path, optarg, strlen(optarg) + 1);
             }
             break;
+        case ARG_BG_FILE_PATH:
+            // TODO REMOVE
+            printf("BG file path arg: %s\n", optarg);
+            args->bg_materialize_files = true;
+            strncpy(args->bg_file_path, optarg, strlen(optarg) + 1);
+            break;
         }
     }
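The loader above is wired into fdb through run_op_read_blob_granules, but it can also be exercised on its own. The scratch program below is a sketch, not part of this commit: it assumes the BGLocalFileContext type and the granule_start_load / granule_get_load / granule_free_load functions from the hunks above are copied into scope, and the /tmp/ directory and bg_scratch.bin file name are placeholders. Note that bgFilePath is concatenated directly with the requested file name, so the configured path needs a trailing path separator.

#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

int main(void) {
    /* write a small scratch "granule file" to read back through the loader */
    const char* dir = "/tmp/";            /* hypothetical directory, with trailing '/' */
    const char* fname = "bg_scratch.bin"; /* hypothetical file name */
    char full[4096];
    snprintf(full, sizeof(full), "%s%s", dir, fname);
    FILE* fp = fopen(full, "w");
    if (!fp)
        return 1;
    fwrite("granule-bytes", 1, 13, fp);
    fclose(fp);

    /* mirror the setup run_op_read_blob_granules does before calling into fdb */
    BGLocalFileContext fileContext;
    fileContext.bgFilePath = (char*)dir;
    fileContext.nextId = 0;
    fileContext.data_by_id = (uint8_t**)calloc(MAX_BG_IDS, sizeof(uint8_t*));

    int64_t id = granule_start_load(fname, (int)strlen(fname), 0 /* offset */, 13 /* length */, &fileContext);
    if (id < 0)
        return 1;
    uint8_t* data = granule_get_load(id, &fileContext);
    printf("read back: %.13s\n", (const char*)data);
    granule_free_load(id, &fileContext);
    free(fileContext.data_by_id);
    return 0;
}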

View File

@@ -84,7 +84,8 @@ enum Arguments {
     ARG_STREAMING_MODE,
     ARG_DISABLE_RYW,
     ARG_CLIENT_THREADS_PER_VERSION,
-    ARG_JSON_REPORT
+    ARG_JSON_REPORT,
+    ARG_BG_FILE_PATH // if blob granule files are stored locally, mako will read and materialize them if this is set
 };
 enum TPSChangeTypes { TPS_SIN, TPS_SQUARE, TPS_PULSE };

@@ -147,6 +148,8 @@ typedef struct {
     uint32_t client_threads_per_version;
     int disable_ryw;
     char json_output_path[PATH_MAX];
+    bool bg_materialize_files;
+    char bg_file_path[PATH_MAX];
 } mako_args_t;
 /* shared memory */

View File

@@ -1181,8 +1181,8 @@ inline bool isValidPerpetualStorageWiggleLocality(std::string locality) {
 struct ReadBlobGranuleContext {
     void* userContext;
     int64_t (*start_load_f)(const char*, int, int64_t, int64_t, void*);
-    uint8_t (*get_load_f)(int64_t, void*);
-    void (*free_load_f)(uint8_t*);
+    uint8_t* (*get_load_f)(int64_t, void*);
+    void (*free_load_f)(int64_t, void*);
 };
 #endif

View File

@@ -59,8 +59,8 @@ struct FdbCApi : public ThreadSafeReferenceCounted<FdbCApi> {
     typedef struct readgranulecontext {
         void* userContext;
         int64_t (*start_load_f)(const char*, int, int64_t, int64_t, void*);
-        uint8_t (*get_load_f)(int64_t, void*);
-        void (*free_load_f)(uint8_t*);
+        uint8_t* (*get_load_f)(int64_t, void*);
+        void (*free_load_f)(int64_t, void*);
     } FDBReadBlobGranuleContext;
     typedef int fdb_error_t;

View File

@@ -145,8 +145,6 @@ ThreadFuture<RangeResult> ThreadSafeDatabase::readBlobGranules(const KeyRangeRef
         RangeResult chunkRows;
         if (!db->blobGranuleNoMaterialize) {
-            // FIXME: actually implement file loading and materialization! For now just printing stuff out
             // Start load process for all files in chunk
             // In V1 of api snapshot is required, optional is just for forward compatibility
             ASSERT(chunk.snapshotFile.present());

@@ -156,9 +154,9 @@ ThreadFuture<RangeResult> ThreadSafeDatabase::readBlobGranules(const KeyRangeRef
                                                               chunk.snapshotFile.get().offset,
                                                               chunk.snapshotFile.get().length,
                                                               granule_context.userContext);
-            printf(" S_ID=%lld\n", snapshotLoadId); // TODO REMOVE
             int64_t deltaLoadIds[chunk.deltaFiles.size()];
+            int64_t deltaLoadLengths[chunk.deltaFiles.size()];
+            uint8_t* deltaData[chunk.deltaFiles.size()];
             for (int deltaFileIdx = 0; deltaFileIdx < chunk.deltaFiles.size(); deltaFileIdx++) {
                 std::string deltaFName = chunk.deltaFiles[deltaFileIdx].filename.toString();
                 deltaLoadIds[deltaFileIdx] = granule_context.start_load_f(deltaFName.c_str(),

@@ -166,7 +164,20 @@ ThreadFuture<RangeResult> ThreadSafeDatabase::readBlobGranules(const KeyRangeRef
                                                                          chunk.deltaFiles[deltaFileIdx].offset,
                                                                          chunk.deltaFiles[deltaFileIdx].length,
                                                                          granule_context.userContext);
-                printf(" D_ID=%lld\n", deltaLoadIds[deltaFileIdx]); // TODO REMOVE
+                deltaLoadLengths[deltaFileIdx] = chunk.deltaFiles[deltaFileIdx].length;
             }
+
+            // once all loads kicked off, load data for chunk
+            uint8_t* snapshotData = granule_context.get_load_f(snapshotLoadId, granule_context.userContext);
+            for (int i = 0; i < chunk.deltaFiles.size(); i++) {
+                deltaData[i] = granule_context.get_load_f(deltaLoadIds[i], granule_context.userContext);
+            }
+
+            // FIXME: use bytes from snapshot and delta to materialize chunkRows
+
+            granule_context.free_load_f(snapshotLoadId, granule_context.userContext);
+            for (int i = 0; i < chunk.deltaFiles.size(); i++) {
+                granule_context.free_load_f(deltaLoadIds[i], granule_context.userContext);
+            }
         }
 }

View File

@@ -203,7 +203,7 @@ description is not currently required but encouraged.
     <Option name="test_causal_read_risky" code="900"
             description="An integer between 0 and 100 (default is 0) expressing the probability that a client will verify it can't read stale data whenever it detects a recovery." />
     <Option name="test_bg_no_materialize" code="901"
-            description="An integer between 0 and 100 (default is 0) expressing the probability that a client will verify it can't read stale data whenever it detects a recovery." />
+            description="Blob Granule reads do not actually materialize the result. Used for testing the server-side load of blob granule reading without the client side." />
 </Scope>
 <Scope name="TransactionOption">
<Scope name="TransactionOption"> <Scope name="TransactionOption">

View File

@@ -1822,6 +1822,7 @@ ACTOR Future<Void> handleBlobGranuleFileRequest(Reference<BlobWorkerData> bwData
     }
     state Reference<GranuleMetadata> metadata = m;
+    // don't do 'if (canBeSet)'
     if (metadata->readable.canBeSet()) {
         wait(metadata->readable.getFuture());
     }