Merge pull request #6900 from sfc-gh-jslocum/bg_client_api_tests

Blob granule client API tests
This commit is contained in:
Vaidas Gasiunas 2022-04-22 09:21:31 +02:00 committed by GitHub
commit caaf43da12
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 562 additions and 87 deletions

View File

@ -106,6 +106,7 @@ if(NOT WIN32)
test/apitester/TesterApiWrapper.h
test/apitester/TesterTestSpec.cpp
test/apitester/TesterTestSpec.h
test/apitester/TesterBlobGranuleCorrectnessWorkload.cpp
test/apitester/TesterCancelTransactionWorkload.cpp
test/apitester/TesterCorrectnessWorkload.cpp
test/apitester/TesterKeyValueStore.cpp
@ -253,6 +254,23 @@ endif()
${CMAKE_SOURCE_DIR}/bindings/c/test/apitester/tests
)
add_fdbclient_test(
NAME fdb_c_api_tests_blob_granule
DISABLE_LOG_DUMP
API_TEST_BLOB_GRANULES_ENABLED
COMMAND ${CMAKE_SOURCE_DIR}/bindings/c/test/apitester/run_c_api_tests.py
--cluster-file
@CLUSTER_FILE@
--tester-binary
$<TARGET_FILE:fdb_c_api_tester>
--external-client-library
${CMAKE_CURRENT_BINARY_DIR}/libfdb_c_external.so
--test-dir
${CMAKE_SOURCE_DIR}/bindings/c/test/apitester/blobgranuletests
--blob-granule-local-file-path
@DATA_DIR@/fdbblob/
)
if(CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT USE_SANITIZER)
add_test(NAME fdb_c_upgrade_single_threaded_630api
COMMAND ${CMAKE_SOURCE_DIR}/tests/TestRunner/upgrade_test.py

View File

@ -183,4 +183,63 @@ void ApiWorkload::populateData(TTaskFct cont) {
}
}
void ApiWorkload::randomInsertOp(TTaskFct cont) {
int numKeys = Random::get().randomInt(1, maxKeysPerTransaction);
auto kvPairs = std::make_shared<std::vector<KeyValue>>();
for (int i = 0; i < numKeys; i++) {
kvPairs->push_back(KeyValue{ randomNotExistingKey(), randomValue() });
}
execTransaction(
[kvPairs](auto ctx) {
for (const KeyValue& kv : *kvPairs) {
ctx->tx()->set(kv.key, kv.value);
}
ctx->commit();
},
[this, kvPairs, cont]() {
for (const KeyValue& kv : *kvPairs) {
store.set(kv.key, kv.value);
}
schedule(cont);
});
}
void ApiWorkload::randomClearOp(TTaskFct cont) {
int numKeys = Random::get().randomInt(1, maxKeysPerTransaction);
auto keys = std::make_shared<std::vector<std::string>>();
for (int i = 0; i < numKeys; i++) {
keys->push_back(randomExistingKey());
}
execTransaction(
[keys](auto ctx) {
for (const auto& key : *keys) {
ctx->tx()->clear(key);
}
ctx->commit();
},
[this, keys, cont]() {
for (const auto& key : *keys) {
store.clear(key);
}
schedule(cont);
});
}
void ApiWorkload::randomClearRangeOp(TTaskFct cont) {
std::string begin = randomKeyName();
std::string end = randomKeyName();
if (begin > end) {
std::swap(begin, end);
}
execTransaction(
[begin, end](auto ctx) {
ctx->tx()->clearRange(begin, end);
ctx->commit();
},
[this, begin, end, cont]() {
store.clear(begin, end);
schedule(cont);
});
}
} // namespace FdbApiTester

View File

@ -114,6 +114,11 @@ protected:
// Clear the data of the workload
void clearData(TTaskFct cont);
// common operations
void randomInsertOp(TTaskFct cont);
void randomClearOp(TTaskFct cont);
void randomClearRangeOp(TTaskFct cont);
private:
void populateDataTx(TTaskFct cont);

View File

@ -18,9 +18,9 @@
* limitations under the License.
*/
#include "TesterApiWrapper.h"
#include "TesterUtil.h"
#include <cstdint>
#include <fmt/format.h>
#include <fstream>
namespace FdbApiTester {
@ -60,6 +60,56 @@ std::optional<std::string> ValueFuture::getValue() const {
return out_present ? std::make_optional(std::string((const char*)val, vallen)) : std::nullopt;
}
std::vector<KeyValue> KeyRangesFuture::getKeyRanges() const {
ASSERT(future_);
int count;
const FDBKeyRange* ranges;
fdb_check(fdb_future_get_keyrange_array(future_.get(), &ranges, &count));
std::vector<KeyValue> result;
result.reserve(count);
for (int i = 0; i < count; i++) {
FDBKeyRange kr = *ranges++;
KeyValue rkv;
rkv.key = std::string((const char*)kr.begin_key, kr.begin_key_length);
rkv.value = std::string((const char*)kr.end_key, kr.end_key_length);
result.push_back(rkv);
}
return result;
}
Result::Result(FDBResult* r) : result_(r, fdb_result_destroy) {}
std::vector<KeyValue> KeyValuesResult::getKeyValues(bool* more_out) {
ASSERT(result_);
int count;
const FDBKeyValue* kvs;
int more;
std::vector<KeyValue> result;
error_ = fdb_result_get_keyvalue_array(result_.get(), &kvs, &count, &more);
if (error_ != error_code_success) {
return result;
}
result.reserve(count);
for (int i = 0; i < count; i++) {
FDBKeyValue kv = *kvs++;
KeyValue rkv;
rkv.key = std::string((const char*)kv.key, kv.key_length);
rkv.value = std::string((const char*)kv.value, kv.value_length);
result.push_back(rkv);
}
*more_out = more;
return result;
}
// Given an FDBDatabase, initializes a new transaction.
Transaction::Transaction(FDBTransaction* tx) : tx_(tx, fdb_transaction_destroy) {}
@ -109,6 +159,87 @@ fdb_error_t Transaction::setOption(FDBTransactionOption option) {
return fdb_transaction_set_option(tx_.get(), option, reinterpret_cast<const uint8_t*>(""), 0);
}
class TesterGranuleContext {
public:
std::unordered_map<int64_t, uint8_t*> loadsInProgress;
int64_t nextId = 0;
std::string basePath;
~TesterGranuleContext() {
// if there was an error or not all loads finished, delete data
for (auto& it : loadsInProgress) {
uint8_t* dataToFree = it.second;
delete dataToFree;
}
}
};
static int64_t granule_start_load(const char* filename,
int filenameLength,
int64_t offset,
int64_t length,
int64_t fullFileLength,
void* context) {
TesterGranuleContext* ctx = (TesterGranuleContext*)context;
int64_t loadId = ctx->nextId++;
uint8_t* buffer = new uint8_t[length];
std::ifstream fin(ctx->basePath + std::string(filename, filenameLength), std::ios::in | std::ios::binary);
fin.seekg(offset);
fin.read((char*)buffer, length);
ctx->loadsInProgress.insert({ loadId, buffer });
return loadId;
}
static uint8_t* granule_get_load(int64_t loadId, void* context) {
TesterGranuleContext* ctx = (TesterGranuleContext*)context;
return ctx->loadsInProgress.at(loadId);
}
static void granule_free_load(int64_t loadId, void* context) {
TesterGranuleContext* ctx = (TesterGranuleContext*)context;
auto it = ctx->loadsInProgress.find(loadId);
uint8_t* dataToFree = it->second;
delete dataToFree;
ctx->loadsInProgress.erase(it);
}
KeyValuesResult Transaction::readBlobGranules(std::string_view begin,
std::string_view end,
const std::string& basePath) {
ASSERT(tx_);
TesterGranuleContext testerContext;
testerContext.basePath = basePath;
FDBReadBlobGranuleContext granuleContext;
granuleContext.userContext = &testerContext;
granuleContext.debugNoMaterialize = false;
granuleContext.granuleParallelism = 1;
granuleContext.start_load_f = &granule_start_load;
granuleContext.get_load_f = &granule_get_load;
granuleContext.free_load_f = &granule_free_load;
return KeyValuesResult(fdb_transaction_read_blob_granules(tx_.get(),
(const uint8_t*)begin.data(),
begin.size(),
(const uint8_t*)end.data(),
end.size(),
0 /* beginVersion */,
-2 /* latest read version */,
granuleContext));
}
KeyRangesFuture Transaction::getBlobGranuleRanges(std::string_view begin, std::string_view end) {
ASSERT(tx_);
return KeyRangesFuture(fdb_transaction_get_blob_granule_ranges(
tx_.get(), (const uint8_t*)begin.data(), begin.size(), (const uint8_t*)end.data(), end.size()));
}
fdb_error_t FdbApi::setOption(FDBNetworkOption option, std::string_view value) {
return fdb_network_set_option(option, reinterpret_cast<const uint8_t*>(value.data()), value.size());
}

View File

@ -26,6 +26,7 @@
#include <string_view>
#include <optional>
#include <memory>
#include <unordered_map>
#define FDB_API_VERSION 720
#include "bindings/c/foundationdb/fdb_c.h"
@ -35,6 +36,8 @@
#include "flow/error_definitions.h"
#include "TesterUtil.h"
namespace FdbApiTester {
// Wrapper parent class to manage memory of an FDBFuture pointer. Cleans up
@ -62,6 +65,37 @@ public:
std::optional<std::string> getValue() const;
};
class KeyRangesFuture : public Future {
public:
KeyRangesFuture() = default;
KeyRangesFuture(FDBFuture* f) : Future(f) {}
std::vector<KeyValue> getKeyRanges() const;
};
class Result {
public:
Result() = default;
Result(FDBResult* r);
FDBResult* fdbResult() { return result_.get(); };
fdb_error_t getError() const { return error_; }
explicit operator bool() const { return result_ != nullptr; };
fdb_error_t error_ = error_code_client_invalid_operation; // have to call getX function to set this
protected:
std::shared_ptr<FDBResult> result_;
};
class KeyValuesResult : public Result {
public:
KeyValuesResult() = default;
KeyValuesResult(FDBResult* f) : Result(f) {}
std::vector<KeyValue> getKeyValues(bool* more_out);
};
class Transaction {
public:
Transaction() = default;
@ -76,6 +110,9 @@ public:
void reset();
fdb_error_t setOption(FDBTransactionOption option);
KeyValuesResult readBlobGranules(std::string_view begin, std::string_view end, const std::string& basePath);
KeyRangesFuture getBlobGranuleRanges(std::string_view begin, std::string_view end);
private:
std::shared_ptr<FDBTransaction> tx_;
};

View File

@ -0,0 +1,159 @@
/*
* TesterBlobGranuleCorrectnessWorkload.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "TesterApiWorkload.h"
#include "TesterUtil.h"
#include <memory>
#include <fmt/format.h>
namespace FdbApiTester {
class ApiBlobGranuleCorrectnessWorkload : public ApiWorkload {
public:
ApiBlobGranuleCorrectnessWorkload(const WorkloadConfig& config) : ApiWorkload(config) {
// sometimes don't do range clears
if (Random::get().randomInt(0, 1) == 0) {
excludedOpTypes.push_back(OP_CLEAR_RANGE);
}
}
private:
enum OpType { OP_INSERT, OP_CLEAR, OP_CLEAR_RANGE, OP_READ, OP_GET_RANGES, OP_LAST = OP_GET_RANGES };
std::vector<OpType> excludedOpTypes;
void randomReadOp(TTaskFct cont) {
std::string begin = randomKeyName();
std::string end = randomKeyName();
auto results = std::make_shared<std::vector<KeyValue>>();
if (begin > end) {
std::swap(begin, end);
}
execTransaction(
[begin, end, results](auto ctx) {
ctx->tx()->setOption(FDB_TR_OPTION_READ_YOUR_WRITES_DISABLE);
KeyValuesResult res = ctx->tx()->readBlobGranules(begin, end, ctx->getBGBasePath());
bool more;
(*results) = res.getKeyValues(&more);
ASSERT(!more);
if (res.getError() != error_code_success) {
ctx->onError(res.getError());
} else {
ctx->done();
}
},
[this, begin, end, results, cont]() {
std::vector<KeyValue> expected = store.getRange(begin, end, store.size(), false);
if (results->size() != expected.size()) {
error(fmt::format("randomReadOp result size mismatch. expected: {} actual: {}",
expected.size(),
results->size()));
}
ASSERT(results->size() == expected.size());
for (int i = 0; i < results->size(); i++) {
if ((*results)[i].key != expected[i].key) {
error(fmt::format("randomReadOp key mismatch at {}/{}. expected: {} actual: {}",
i,
results->size(),
expected[i].key,
(*results)[i].key));
}
ASSERT((*results)[i].key == expected[i].key);
if ((*results)[i].value != expected[i].value) {
error(
fmt::format("randomReadOp value mismatch at {}/{}. key: {} expected: {:.80} actual: {:.80}",
i,
results->size(),
expected[i].key,
expected[i].value,
(*results)[i].value));
}
ASSERT((*results)[i].value == expected[i].value);
}
schedule(cont);
});
}
void randomGetRangesOp(TTaskFct cont) {
std::string begin = randomKeyName();
std::string end = randomKeyName();
auto results = std::make_shared<std::vector<KeyValue>>();
if (begin > end) {
std::swap(begin, end);
}
execTransaction(
[begin, end, results](auto ctx) {
KeyRangesFuture f = ctx->tx()->getBlobGranuleRanges(begin, end);
ctx->continueAfter(
f,
[ctx, f, results]() {
(*results) = f.getKeyRanges();
ctx->done();
},
true);
},
[this, begin, end, results, cont]() {
ASSERT(results->size() > 0);
ASSERT(results->front().key <= begin);
ASSERT(results->back().value >= end);
for (int i = 0; i < results->size(); i++) {
// no empty or inverted ranges
ASSERT((*results)[i].key < (*results)[i].value);
}
for (int i = 1; i < results->size(); i++) {
// ranges contain entire requested key range
ASSERT((*results)[i].key == (*results)[i - 1].value);
}
schedule(cont);
});
}
void randomOperation(TTaskFct cont) {
OpType txType = (store.size() == 0) ? OP_INSERT : (OpType)Random::get().randomInt(0, OP_LAST);
while (std::count(excludedOpTypes.begin(), excludedOpTypes.end(), txType)) {
txType = (OpType)Random::get().randomInt(0, OP_LAST);
}
switch (txType) {
case OP_INSERT:
randomInsertOp(cont);
break;
case OP_CLEAR:
randomClearOp(cont);
break;
case OP_CLEAR_RANGE:
randomClearRangeOp(cont);
break;
case OP_READ:
randomReadOp(cont);
break;
case OP_GET_RANGES:
randomGetRangesOp(cont);
break;
}
}
};
WorkloadFactory<ApiBlobGranuleCorrectnessWorkload> ApiBlobGranuleCorrectnessWorkloadFactory(
"ApiBlobGranuleCorrectness");
} // namespace FdbApiTester

View File

@ -31,27 +31,6 @@ public:
private:
enum OpType { OP_INSERT, OP_GET, OP_CLEAR, OP_CLEAR_RANGE, OP_COMMIT_READ, OP_LAST = OP_COMMIT_READ };
void randomInsertOp(TTaskFct cont) {
int numKeys = Random::get().randomInt(1, maxKeysPerTransaction);
auto kvPairs = std::make_shared<std::vector<KeyValue>>();
for (int i = 0; i < numKeys; i++) {
kvPairs->push_back(KeyValue{ randomNotExistingKey(), randomValue() });
}
execTransaction(
[kvPairs](auto ctx) {
for (const KeyValue& kv : *kvPairs) {
ctx->tx()->set(kv.key, kv.value);
}
ctx->commit();
},
[this, kvPairs, cont]() {
for (const KeyValue& kv : *kvPairs) {
store.set(kv.key, kv.value);
}
schedule(cont);
});
}
void randomCommitReadOp(TTaskFct cont) {
int numKeys = Random::get().randomInt(1, maxKeysPerTransaction);
auto kvPairs = std::make_shared<std::vector<KeyValue>>();
@ -145,44 +124,6 @@ private:
});
}
void randomClearOp(TTaskFct cont) {
int numKeys = Random::get().randomInt(1, maxKeysPerTransaction);
auto keys = std::make_shared<std::vector<std::string>>();
for (int i = 0; i < numKeys; i++) {
keys->push_back(randomExistingKey());
}
execTransaction(
[keys](auto ctx) {
for (const auto& key : *keys) {
ctx->tx()->clear(key);
}
ctx->commit();
},
[this, keys, cont]() {
for (const auto& key : *keys) {
store.clear(key);
}
schedule(cont);
});
}
void randomClearRangeOp(TTaskFct cont) {
std::string begin = randomKeyName();
std::string end = randomKeyName();
if (begin > end) {
std::swap(begin, end);
}
execTransaction(
[begin, end](auto ctx) {
ctx->tx()->clearRange(begin, end);
ctx->commit();
},
[this, begin, end, cont]() {
store.clear(begin, end);
schedule(cont);
});
}
void randomOperation(TTaskFct cont) {
OpType txType = (store.size() == 0) ? OP_INSERT : (OpType)Random::get().randomInt(0, OP_LAST);
switch (txType) {

View File

@ -30,12 +30,9 @@
#include <vector>
#include <mutex>
namespace FdbApiTester {
#include "TesterUtil.h"
struct KeyValue {
std::string key;
std::string value;
};
namespace FdbApiTester {
class KeyValueStore {
public:

View File

@ -49,6 +49,7 @@ public:
int numClients;
std::vector<std::pair<std::string, std::string>> knobs;
TestSpec testSpec;
std::string bgBasePath;
};
} // namespace FdbApiTester

View File

@ -75,9 +75,10 @@ public:
std::shared_ptr<ITransactionActor> txActor,
TTaskFct cont,
IScheduler* scheduler,
int retryLimit)
int retryLimit,
std::string bgBasePath)
: fdbTx(tx), txActor(txActor), contAfterDone(cont), scheduler(scheduler), retryLimit(retryLimit),
txState(TxState::IN_PROGRESS), commitCalled(false) {}
txState(TxState::IN_PROGRESS), commitCalled(false), bgBasePath(bgBasePath) {}
// A state machine:
// IN_PROGRESS -> (ON_ERROR -> IN_PROGRESS)* [-> ON_ERROR] -> DONE
@ -123,6 +124,8 @@ public:
contAfterDone();
}
std::string getBGBasePath() override { return bgBasePath; }
protected:
virtual void doContinueAfter(Future f, TTaskFct cont, bool retryOnError) = 0;
@ -217,6 +220,9 @@ protected:
// A history of errors on which the transaction was retried
std::vector<fdb_error_t> retriedErrors;
// blob granule base path
std::string bgBasePath;
};
/**
@ -228,8 +234,9 @@ public:
std::shared_ptr<ITransactionActor> txActor,
TTaskFct cont,
IScheduler* scheduler,
int retryLimit)
: TransactionContextBase(tx, txActor, cont, scheduler, retryLimit) {}
int retryLimit,
std::string bgBasePath)
: TransactionContextBase(tx, txActor, cont, scheduler, retryLimit, bgBasePath) {}
protected:
void doContinueAfter(Future f, TTaskFct cont, bool retryOnError) override {
@ -316,8 +323,9 @@ public:
std::shared_ptr<ITransactionActor> txActor,
TTaskFct cont,
IScheduler* scheduler,
int retryLimit)
: TransactionContextBase(tx, txActor, cont, scheduler, retryLimit) {}
int retryLimit,
std::string bgBasePath)
: TransactionContextBase(tx, txActor, cont, scheduler, retryLimit, bgBasePath) {}
protected:
void doContinueAfter(Future f, TTaskFct cont, bool retryOnError) override {
@ -482,9 +490,10 @@ class TransactionExecutorBase : public ITransactionExecutor {
public:
TransactionExecutorBase(const TransactionExecutorOptions& options) : options(options), scheduler(nullptr) {}
void init(IScheduler* scheduler, const char* clusterFile) override {
void init(IScheduler* scheduler, const char* clusterFile, const std::string& bgBasePath) override {
this->scheduler = scheduler;
this->clusterFile = clusterFile;
this->bgBasePath = bgBasePath;
}
protected:
@ -499,10 +508,10 @@ protected:
std::shared_ptr<ITransactionContext> ctx;
if (options.blockOnFutures) {
ctx = std::make_shared<BlockingTransactionContext>(
tx, txActor, cont, scheduler, options.transactionRetryLimit);
tx, txActor, cont, scheduler, options.transactionRetryLimit, bgBasePath);
} else {
ctx = std::make_shared<AsyncTransactionContext>(
tx, txActor, cont, scheduler, options.transactionRetryLimit);
tx, txActor, cont, scheduler, options.transactionRetryLimit, bgBasePath);
}
txActor->init(ctx);
txActor->start();
@ -511,6 +520,7 @@ protected:
protected:
TransactionExecutorOptions options;
std::string bgBasePath;
std::string clusterFile;
IScheduler* scheduler;
};
@ -524,8 +534,8 @@ public:
~DBPoolTransactionExecutor() override { release(); }
void init(IScheduler* scheduler, const char* clusterFile) override {
TransactionExecutorBase::init(scheduler, clusterFile);
void init(IScheduler* scheduler, const char* clusterFile, const std::string& bgBasePath) override {
TransactionExecutorBase::init(scheduler, clusterFile, bgBasePath);
for (int i = 0; i < options.numDatabases; i++) {
FDBDatabase* db;
fdb_error_t err = fdb_create_database(clusterFile, &db);

View File

@ -55,6 +55,9 @@ public:
// Mark the transaction as completed without committing it (for read transactions)
virtual void done() = 0;
// Plumbing for blob granule base path
virtual std::string getBGBasePath() = 0;
// A continuation to be executed when all of the given futures get ready
virtual void continueAfterAll(std::vector<Future> futures, TTaskFct cont);
};
@ -136,7 +139,7 @@ struct TransactionExecutorOptions {
class ITransactionExecutor {
public:
virtual ~ITransactionExecutor() {}
virtual void init(IScheduler* sched, const char* clusterFile) = 0;
virtual void init(IScheduler* sched, const char* clusterFile, const std::string& bgBasePath) = 0;
virtual void execute(std::shared_ptr<ITransactionActor> tx, TTaskFct cont) = 0;
};

View File

@ -49,6 +49,11 @@ struct formatter<std::optional<T>> : fmt::formatter<T> {
namespace FdbApiTester {
struct KeyValue {
std::string key;
std::string value;
};
std::string lowerCase(const std::string& str);
class Random {

View File

@ -0,0 +1,24 @@
[[test]]
title = 'Blob GranuleAPI Correctness Blocking'
multiThreaded = true
buggify = true
blockOnFutures = true
minFdbThreads = 2
maxFdbThreads = 8
minDatabases = 2
maxDatabases = 8
minClientThreads = 2
maxClientThreads = 8
minClients = 2
maxClients = 8
[[test.workload]]
name = 'ApiBlobGranuleCorrectness'
minKeyLength = 1
maxKeyLength = 64
minValueLength = 1
maxValueLength = 1000
maxKeysPerTransaction = 50
initialSize = 100
numRandomOperations = 100

View File

@ -0,0 +1,23 @@
[[test]]
title = 'Blob Granule API Correctness Multi Threaded'
multiThreaded = true
buggify = true
minFdbThreads = 2
maxFdbThreads = 8
minDatabases = 2
maxDatabases = 8
minClientThreads = 2
maxClientThreads = 8
minClients = 2
maxClients = 8
[[test.workload]]
name = 'ApiBlobGranuleCorrectness'
minKeyLength = 1
maxKeyLength = 64
minValueLength = 1
maxValueLength = 1000
maxKeysPerTransaction = 50
initialSize = 100
numRandomOperations = 100

View File

@ -0,0 +1,15 @@
[[test]]
title = 'Blob Granule API Correctness Single Threaded'
minClients = 1
maxClients = 3
multiThreaded = false
[[test.workload]]
name = 'ApiBlobGranuleCorrectness'
minKeyLength = 1
maxKeyLength = 64
minValueLength = 1
maxValueLength = 1000
maxKeysPerTransaction = 50
initialSize = 100
numRandomOperations = 100

View File

@ -51,7 +51,8 @@ enum TesterOptionId {
OPT_INPUT_PIPE,
OPT_OUTPUT_PIPE,
OPT_FDB_API_VERSION,
OPT_TRANSACTION_RETRY_LIMIT
OPT_TRANSACTION_RETRY_LIMIT,
OPT_BLOB_GRANULE_LOCAL_FILE_PATH
};
CSimpleOpt::SOption TesterOptionDefs[] = //
@ -73,6 +74,7 @@ CSimpleOpt::SOption TesterOptionDefs[] = //
{ OPT_OUTPUT_PIPE, "--output-pipe", SO_REQ_SEP },
{ OPT_FDB_API_VERSION, "--api-version", SO_REQ_SEP },
{ OPT_TRANSACTION_RETRY_LIMIT, "--transaction-retry-limit", SO_REQ_SEP },
{ OPT_BLOB_GRANULE_LOCAL_FILE_PATH, "--blob-granule-local-file-path", SO_REQ_SEP },
SO_END_OF_OPTIONS };
void printProgramUsage(const char* execName) {
@ -108,6 +110,8 @@ void printProgramUsage(const char* execName) {
" Required FDB API version (default %d).\n"
" --transaction-retry-limit NUMBER\n"
" Maximum number of retries per tranaction (default: 0 - unlimited)\n"
" --blob-granule-local-file-path PATH\n"
" Path to blob granule files on local filesystem\n"
" -f, --test-file FILE\n"
" Test file to run.\n"
" -h, --help Display this help and exit.\n",
@ -200,6 +204,9 @@ bool processArg(TesterOptions& options, const CSimpleOpt& args) {
case OPT_TRANSACTION_RETRY_LIMIT:
processIntOption(args.OptionText(), args.OptionArg(), 0, 1000, options.transactionRetryLimit);
break;
case OPT_BLOB_GRANULE_LOCAL_FILE_PATH:
options.bgBasePath = args.OptionArg();
break;
}
return true;
}
@ -295,7 +302,7 @@ bool runWorkloads(TesterOptions& options) {
std::unique_ptr<IScheduler> scheduler = createScheduler(options.numClientThreads);
std::unique_ptr<ITransactionExecutor> txExecutor = createTransactionExecutor(txExecOptions);
txExecutor->init(scheduler.get(), options.clusterFile.c_str());
txExecutor->init(scheduler.get(), options.clusterFile.c_str(), options.bgBasePath);
WorkloadManager workloadMgr(txExecutor.get(), scheduler.get());
for (const auto& workloadSpec : options.testSpec.workloads) {

View File

@ -53,6 +53,9 @@ def run_tester(args, test_file):
args.cluster_file, "--test-file", test_file]
if args.external_client_library is not None:
cmd += ["--external-client-library", args.external_client_library]
if args.blob_granule_local_file_path is not None:
cmd += ["--blob-granule-local-file-path", args.blob_granule_local_file_path]
get_logger().info('\nRunning tester \'%s\'...' % ' '.join(cmd))
proc = Popen(cmd, stdout=sys.stdout, stderr=sys.stderr)
@ -79,11 +82,9 @@ def run_tester(args, test_file):
get_logger().info('')
return ret_code
def run_tests(args):
num_failed = 0
test_files = [f for f in os.listdir(args.test_dir)
if os.path.isfile(os.path.join(args.test_dir, f)) and f.endswith(".toml")]
test_files = [f for f in os.listdir(args.test_dir) if os.path.isfile(os.path.join(args.test_dir, f)) and f.endswith(".toml")]
for test_file in test_files:
get_logger().info('=========================================================')
@ -111,6 +112,8 @@ def parse_args(argv):
help='The timeout in seconds for running each individual test. (default 300)')
parser.add_argument('--logging-level', type=str, default='INFO',
choices=['ERROR', 'WARNING', 'INFO', 'DEBUG'], help='Specifies the level of detail in the tester output (default=\'INFO\').')
parser.add_argument('--blob-granule-local-file-path', type=str, default=None,
help='Enable blob granule tests if set, value is path to local blob granule files')
return parser.parse_args(argv)

View File

@ -405,6 +405,7 @@ endfunction()
# Creates a single cluster before running the specified command (usually a ctest test)
function(add_fdbclient_test)
set(options DISABLED ENABLED DISABLE_LOG_DUMP)
set(options DISABLED ENABLED API_TEST_BLOB_GRANULES_ENABLED)
set(oneValueArgs NAME PROCESS_NUMBER TEST_TIMEOUT WORKING_DIRECTORY)
set(multiValueArgs COMMAND)
cmake_parse_arguments(T "${options}" "${oneValueArgs}" "${multiValueArgs}" "${ARGN}")
@ -431,6 +432,9 @@ function(add_fdbclient_test)
if(T_DISABLE_LOG_DUMP)
list(APPEND TMP_CLUSTER_CMD --disable-log-dump)
endif()
if(T_API_TEST_BLOB_GRANULES_ENABLED)
list(APPEND TMP_CLUSTER_CMD --blob-granules-enabled)
endif()
message(STATUS "Adding Client test ${T_NAME}")
add_test(NAME "${T_NAME}"
WORKING_DIRECTORY ${T_WORKING_DIRECTORY}

View File

@ -300,7 +300,9 @@ ThreadResult<RangeResult> DLTransaction::readBlobGranules(const KeyRangeRef& key
int count;
FdbCApi::fdb_bool_t more;
FdbCApi::fdb_error_t error = api->resultGetKeyValueArray(r, &kvs, &count, &more);
ASSERT(!error);
if (error) {
return ThreadResult<RangeResult>(Error(error));
}
// The memory for this is stored in the FDBResult and is released when the result gets destroyed
return ThreadResult<RangeResult>(

View File

@ -66,6 +66,7 @@ public-address = {ip_address}:$ID
listen-address = public
datadir = {datadir}/$ID
logdir = {logdir}
{bg_knob_line}
# logsize = 10MiB
# maxlogssize = 100MiB
# machine-id =
@ -82,7 +83,7 @@ logdir = {logdir}
"""
def __init__(self, basedir: str, fdbserver_binary: str, fdbmonitor_binary: str, fdbcli_binary: str,
process_number: int, create_config=True, port=None, ip_address=None):
process_number: int, create_config=True, port=None, ip_address=None, blob_granules_enabled: bool=False):
self.basedir = Path(basedir)
self.etc = self.basedir.joinpath('etc')
self.log = self.basedir.joinpath('log')
@ -100,6 +101,11 @@ logdir = {logdir}
self.process_number = process_number
self.ip_address = '127.0.0.1' if ip_address is None else ip_address
self.first_port = port
self.blob_granules_enabled = blob_granules_enabled
if (blob_granules_enabled):
# add extra process for blob_worker
self.process_number += 1
if (self.first_port is not None):
self.last_used_port = int(self.first_port)-1
self.server_ports = [self.__next_port()
@ -111,6 +117,7 @@ logdir = {logdir}
self.process = None
self.fdbmonitor_logfile = None
self.use_legacy_conf_syntax = False
if create_config:
self.create_cluster_file()
self.save_config()
@ -126,14 +133,18 @@ logdir = {logdir}
new_conf_file = self.conf_file.parent / (self.conf_file.name + '.new')
with open(new_conf_file, 'x') as f:
conf_template = LocalCluster.configuration_template
bg_knob_line = ""
if (self.use_legacy_conf_syntax):
conf_template = conf_template.replace("-", "_")
if (self.blob_granules_enabled):
bg_knob_line = "knob_bg_url=file://" + str(self.data) + "/fdbblob/"
f.write(conf_template.format(
etcdir=self.etc,
fdbserver_bin=self.fdbserver_binary,
datadir=self.data,
logdir=self.log,
ip_address=self.ip_address
ip_address=self.ip_address,
bg_knob_line=bg_knob_line
))
# By default, the cluster only has one process
# If a port number is given and process_number > 1, we will use subsequent numbers
@ -143,6 +154,9 @@ logdir = {logdir}
for port in self.server_ports:
f.write('[fdbserver.{server_port}]\n'.format(
server_port=port))
if (self.blob_granules_enabled):
# make last process a blob_worker class
f.write('class = blob_worker')
f.flush()
os.fsync(f.fileno())
@ -202,12 +216,21 @@ logdir = {logdir}
db_config = 'configure new single {}'.format(storage)
if (enable_tenants):
db_config += " tenant_mode=optional_experimental"
if (self.blob_granules_enabled):
db_config += " blob_granules_enabled:=1"
args = [self.fdbcli_binary, '-C',
self.cluster_file, '--exec', db_config]
res = subprocess.run(args, env=self.process_env())
assert res.returncode == 0, "Create database failed with {}".format(
res.returncode)
if (self.blob_granules_enabled):
bg_args = [self.fdbcli_binary, '-C',
self.cluster_file, '--exec', 'blobrange start \\x00 \\xff']
bg_res = subprocess.run(bg_args, env=self.process_env())
assert bg_res.returncode == 0, "Start blob granules failed with {}".format(bg_res.returncode)
def get_status(self):
args = [self.fdbcli_binary, '-C', self.cluster_file, '--exec',
'status json']

View File

@ -11,7 +11,7 @@ from pathlib import Path
class TempCluster:
def __init__(self, build_dir: str, process_number: int = 1, port: str = None):
def __init__(self, build_dir: str, process_number: int = 1, port: str = None, blob_granules_enabled: bool = False):
self.build_dir = Path(build_dir).resolve()
assert self.build_dir.exists(), "{} does not exist".format(build_dir)
assert self.build_dir.is_dir(), "{} is not a directory".format(build_dir)
@ -27,6 +27,7 @@ class TempCluster:
self.build_dir.joinpath("bin", "fdbcli"),
process_number,
port=port,
blob_granules_enabled=blob_granules_enabled
)
self.log = self.cluster.log
self.etc = self.cluster.etc
@ -88,9 +89,14 @@ if __name__ == "__main__":
help='Do not dump cluster log on error',
action="store_true"
)
parser.add_argument(
'--blob-granules-enabled',
help='Enable blob granules',
action="store_true"
)
args = parser.parse_args()
errcode = 1
with TempCluster(args.build_dir, args.process_number) as cluster:
with TempCluster(args.build_dir, args.process_number, blob_granules_enabled=args.blob_granules_enabled) as cluster:
print("log-dir: {}".format(cluster.log))
print("etc-dir: {}".format(cluster.etc))
print("data-dir: {}".format(cluster.data))
@ -105,6 +111,8 @@ if __name__ == "__main__":
cmd_args.append(str(cluster.log))
elif cmd == "@ETC_DIR@":
cmd_args.append(str(cluster.etc))
elif cmd.startswith("@DATA_DIR@"):
cmd_args.append(str(cluster.data) + cmd[len("@DATA_DIR@"):])
else:
cmd_args.append(cmd)
env = dict(**os.environ)