Added Blob Granule API Tester
This commit is contained in:
parent
8fa25aa013
commit
40052c1394
|
@ -106,6 +106,7 @@ if(NOT WIN32)
|
|||
test/apitester/TesterApiWrapper.h
|
||||
test/apitester/TesterTestSpec.cpp
|
||||
test/apitester/TesterTestSpec.h
|
||||
test/apitester/TesterBlobGranuleCorrectnessWorkload.cpp
|
||||
test/apitester/TesterCancelTransactionWorkload.cpp
|
||||
test/apitester/TesterCorrectnessWorkload.cpp
|
||||
test/apitester/TesterKeyValueStore.cpp
|
||||
|
@ -266,6 +267,8 @@ endif()
|
|||
${CMAKE_CURRENT_BINARY_DIR}/libfdb_c_external.so
|
||||
--test-dir
|
||||
${CMAKE_SOURCE_DIR}/bindings/c/test/apitester/tests
|
||||
--bg-enabled
|
||||
@DATA_DIR@/fdbblob/
|
||||
)
|
||||
|
||||
if(CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT USE_SANITIZER)
|
||||
|
|
|
@ -18,9 +18,9 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
#include "TesterApiWrapper.h"
|
||||
#include "TesterUtil.h"
|
||||
#include <cstdint>
|
||||
#include <fmt/format.h>
|
||||
#include <fstream>
|
||||
|
||||
namespace FdbApiTester {
|
||||
|
||||
|
@ -60,6 +60,56 @@ std::optional<std::string> ValueFuture::getValue() const {
|
|||
return out_present ? std::make_optional(std::string((const char*)val, vallen)) : std::nullopt;
|
||||
}
|
||||
|
||||
std::vector<KeyValue> KeyRangesFuture::getKeyRanges() const {
|
||||
ASSERT(future_);
|
||||
|
||||
int count;
|
||||
const FDBKeyRange* ranges;
|
||||
|
||||
fdb_check(fdb_future_get_keyrange_array(future_.get(), &ranges, &count));
|
||||
std::vector<KeyValue> result;
|
||||
result.reserve(count);
|
||||
for (int i = 0; i < count; i++) {
|
||||
FDBKeyRange kr = *ranges++;
|
||||
KeyValue rkv;
|
||||
rkv.key = std::string((const char*)kr.begin_key, kr.begin_key_length);
|
||||
rkv.value = std::string((const char*)kr.end_key, kr.end_key_length);
|
||||
result.push_back(rkv);
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
Result::Result(FDBResult* r) : result_(r, fdb_result_destroy) {}
|
||||
|
||||
std::vector<KeyValue> KeyValuesResult::getKeyValues(bool* more_out) {
|
||||
ASSERT(result_);
|
||||
|
||||
int count;
|
||||
const FDBKeyValue* kvs;
|
||||
int more;
|
||||
|
||||
std::vector<KeyValue> result;
|
||||
|
||||
error_ = fdb_result_get_keyvalue_array(result_.get(), &kvs, &count, &more);
|
||||
|
||||
if (error_ != error_code_success) {
|
||||
return result;
|
||||
}
|
||||
|
||||
result.reserve(count);
|
||||
for (int i = 0; i < count; i++) {
|
||||
FDBKeyValue kv = *kvs++;
|
||||
KeyValue rkv;
|
||||
rkv.key = std::string((const char*)kv.key, kv.key_length);
|
||||
rkv.value = std::string((const char*)kv.value, kv.value_length);
|
||||
result.push_back(rkv);
|
||||
}
|
||||
*more_out = more;
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
// Given an FDBDatabase, initializes a new transaction.
|
||||
Transaction::Transaction(FDBTransaction* tx) : tx_(tx, fdb_transaction_destroy) {}
|
||||
|
||||
|
@ -109,6 +159,85 @@ fdb_error_t Transaction::setOption(FDBTransactionOption option) {
|
|||
return fdb_transaction_set_option(tx_.get(), option, reinterpret_cast<const uint8_t*>(""), 0);
|
||||
}
|
||||
|
||||
class TesterGranuleContext {
|
||||
public:
|
||||
std::unordered_map<int64_t, uint8_t*> loadsInProgress;
|
||||
int64_t nextId = 0;
|
||||
std::string basePath;
|
||||
|
||||
~TesterGranuleContext() {
|
||||
// if there was an error or not all loads finished, delete data
|
||||
for (auto& it : loadsInProgress) {
|
||||
uint8_t* dataToFree = it.second;
|
||||
delete dataToFree;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
static int64_t granule_start_load(const char* filename,
|
||||
int filenameLength,
|
||||
int64_t offset,
|
||||
int64_t length,
|
||||
int64_t fullFileLength,
|
||||
void* context) {
|
||||
|
||||
TesterGranuleContext* ctx = (TesterGranuleContext*)context;
|
||||
int64_t loadId = ctx->nextId++;
|
||||
|
||||
uint8_t* buffer = new uint8_t[length];
|
||||
std::ifstream fin(ctx->basePath + std::string(filename, filenameLength), std::ios::in | std::ios::binary);
|
||||
fin.seekg(offset);
|
||||
fin.read((char*)buffer, length);
|
||||
|
||||
ctx->loadsInProgress.insert({ loadId, buffer });
|
||||
|
||||
return loadId;
|
||||
}
|
||||
|
||||
static uint8_t* granule_get_load(int64_t loadId, void* context) {
|
||||
TesterGranuleContext* ctx = (TesterGranuleContext*)context;
|
||||
return ctx->loadsInProgress.at(loadId);
|
||||
}
|
||||
|
||||
static void granule_free_load(int64_t loadId, void* context) {
|
||||
TesterGranuleContext* ctx = (TesterGranuleContext*)context;
|
||||
auto it = ctx->loadsInProgress.find(loadId);
|
||||
uint8_t* dataToFree = it->second;
|
||||
delete dataToFree;
|
||||
|
||||
ctx->loadsInProgress.erase(it);
|
||||
}
|
||||
|
||||
KeyValuesResult Transaction::readBlobGranules(std::string_view begin, std::string_view end, std::string basePath) {
|
||||
ASSERT(tx_);
|
||||
|
||||
TesterGranuleContext testerContext;
|
||||
testerContext.basePath = basePath;
|
||||
|
||||
FDBReadBlobGranuleContext granuleContext;
|
||||
granuleContext.userContext = &testerContext;
|
||||
granuleContext.debugNoMaterialize = false;
|
||||
granuleContext.granuleParallelism = 1;
|
||||
granuleContext.start_load_f = &granule_start_load;
|
||||
granuleContext.get_load_f = &granule_get_load;
|
||||
granuleContext.free_load_f = &granule_free_load;
|
||||
|
||||
return KeyValuesResult(fdb_transaction_read_blob_granules(tx_.get(),
|
||||
(const uint8_t*)begin.data(),
|
||||
begin.size(),
|
||||
(const uint8_t*)end.data(),
|
||||
end.size(),
|
||||
0 /* beginVersion */,
|
||||
-2 /* latest read version */,
|
||||
granuleContext));
|
||||
}
|
||||
|
||||
KeyRangesFuture Transaction::getBlobGranuleRanges(std::string_view begin, std::string_view end) {
|
||||
ASSERT(tx_);
|
||||
return KeyRangesFuture(fdb_transaction_get_blob_granule_ranges(
|
||||
tx_.get(), (const uint8_t*)begin.data(), begin.size(), (const uint8_t*)end.data(), end.size()));
|
||||
}
|
||||
|
||||
fdb_error_t FdbApi::setOption(FDBNetworkOption option, std::string_view value) {
|
||||
return fdb_network_set_option(option, reinterpret_cast<const uint8_t*>(value.data()), value.size());
|
||||
}
|
||||
|
|
|
@ -26,6 +26,7 @@
|
|||
#include <string_view>
|
||||
#include <optional>
|
||||
#include <memory>
|
||||
#include <unordered_map>
|
||||
|
||||
#define FDB_API_VERSION 720
|
||||
#include "bindings/c/foundationdb/fdb_c.h"
|
||||
|
@ -35,6 +36,8 @@
|
|||
|
||||
#include "flow/error_definitions.h"
|
||||
|
||||
#include "TesterUtil.h"
|
||||
|
||||
namespace FdbApiTester {
|
||||
|
||||
// Wrapper parent class to manage memory of an FDBFuture pointer. Cleans up
|
||||
|
@ -62,6 +65,37 @@ public:
|
|||
std::optional<std::string> getValue() const;
|
||||
};
|
||||
|
||||
class KeyRangesFuture : public Future {
|
||||
public:
|
||||
KeyRangesFuture() = default;
|
||||
KeyRangesFuture(FDBFuture* f) : Future(f) {}
|
||||
std::vector<KeyValue> getKeyRanges() const;
|
||||
};
|
||||
|
||||
class Result {
|
||||
public:
|
||||
Result() = default;
|
||||
Result(FDBResult* r);
|
||||
|
||||
FDBResult* fdbResult() { return result_.get(); };
|
||||
|
||||
fdb_error_t getError() const { return error_; }
|
||||
|
||||
explicit operator bool() const { return result_ != nullptr; };
|
||||
|
||||
fdb_error_t error_ = error_code_client_invalid_operation; // have to call getX function to set this
|
||||
|
||||
protected:
|
||||
std::shared_ptr<FDBResult> result_;
|
||||
};
|
||||
|
||||
class KeyValuesResult : public Result {
|
||||
public:
|
||||
KeyValuesResult() = default;
|
||||
KeyValuesResult(FDBResult* f) : Result(f) {}
|
||||
std::vector<KeyValue> getKeyValues(bool* more_out);
|
||||
};
|
||||
|
||||
class Transaction {
|
||||
public:
|
||||
Transaction() = default;
|
||||
|
@ -76,6 +110,9 @@ public:
|
|||
void reset();
|
||||
fdb_error_t setOption(FDBTransactionOption option);
|
||||
|
||||
KeyValuesResult readBlobGranules(std::string_view begin, std::string_view end, std::string basePath);
|
||||
KeyRangesFuture getBlobGranuleRanges(std::string_view begin, std::string_view end);
|
||||
|
||||
private:
|
||||
std::shared_ptr<FDBTransaction> tx_;
|
||||
};
|
||||
|
|
|
@ -0,0 +1,218 @@
|
|||
/*
|
||||
* TesterBlobGranuleCorrectnessWorkload.cpp
|
||||
*
|
||||
* This source file is part of the FoundationDB open source project
|
||||
*
|
||||
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
#include "TesterApiWorkload.h"
|
||||
#include "TesterUtil.h"
|
||||
#include <memory>
|
||||
#include <fmt/format.h>
|
||||
|
||||
namespace FdbApiTester {
|
||||
|
||||
class ApiBlobGranuleCorrectnessWorkload : public ApiWorkload {
|
||||
public:
|
||||
ApiBlobGranuleCorrectnessWorkload(const WorkloadConfig& config) : ApiWorkload(config) {
|
||||
// sometimes don't do range clears
|
||||
if (Random::get().randomInt(0, 1) == 0) {
|
||||
excludedOpTypes.push_back(OP_CLEAR_RANGE);
|
||||
}
|
||||
}
|
||||
|
||||
private:
|
||||
enum OpType { OP_INSERT, OP_CLEAR, OP_CLEAR_RANGE, OP_READ, OP_GET_RANGES, OP_LAST = OP_GET_RANGES };
|
||||
std::vector<OpType> excludedOpTypes;
|
||||
|
||||
void randomInsertOp(TTaskFct cont) {
|
||||
int numKeys = Random::get().randomInt(1, maxKeysPerTransaction);
|
||||
auto kvPairs = std::make_shared<std::vector<KeyValue>>();
|
||||
for (int i = 0; i < numKeys; i++) {
|
||||
kvPairs->push_back(KeyValue{ randomNotExistingKey(), randomValue() });
|
||||
}
|
||||
execTransaction(
|
||||
[kvPairs](auto ctx) {
|
||||
for (const KeyValue& kv : *kvPairs) {
|
||||
ctx->tx()->set(kv.key, kv.value);
|
||||
}
|
||||
ctx->commit();
|
||||
},
|
||||
[this, kvPairs, cont]() {
|
||||
for (const KeyValue& kv : *kvPairs) {
|
||||
store.set(kv.key, kv.value);
|
||||
}
|
||||
schedule(cont);
|
||||
});
|
||||
}
|
||||
|
||||
void randomClearOp(TTaskFct cont) {
|
||||
int numKeys = Random::get().randomInt(1, maxKeysPerTransaction);
|
||||
auto keys = std::make_shared<std::vector<std::string>>();
|
||||
for (int i = 0; i < numKeys; i++) {
|
||||
keys->push_back(randomExistingKey());
|
||||
}
|
||||
execTransaction(
|
||||
[keys](auto ctx) {
|
||||
for (const auto& key : *keys) {
|
||||
ctx->tx()->clear(key);
|
||||
}
|
||||
ctx->commit();
|
||||
},
|
||||
[this, keys, cont]() {
|
||||
for (const auto& key : *keys) {
|
||||
store.clear(key);
|
||||
}
|
||||
schedule(cont);
|
||||
});
|
||||
}
|
||||
|
||||
void randomClearRangeOp(TTaskFct cont) {
|
||||
std::string begin = randomKeyName();
|
||||
std::string end = randomKeyName();
|
||||
if (begin > end) {
|
||||
std::swap(begin, end);
|
||||
}
|
||||
execTransaction(
|
||||
[begin, end](auto ctx) {
|
||||
ctx->tx()->clearRange(begin, end);
|
||||
ctx->commit();
|
||||
},
|
||||
[this, begin, end, cont]() {
|
||||
store.clear(begin, end);
|
||||
schedule(cont);
|
||||
});
|
||||
}
|
||||
|
||||
void randomReadOp(TTaskFct cont) {
|
||||
std::string begin = randomKeyName();
|
||||
std::string end = randomKeyName();
|
||||
auto results = std::make_shared<std::vector<KeyValue>>();
|
||||
if (begin > end) {
|
||||
std::swap(begin, end);
|
||||
}
|
||||
execTransaction(
|
||||
[begin, end, results](auto ctx) {
|
||||
ctx->tx()->setOption(FDB_TR_OPTION_READ_YOUR_WRITES_DISABLE);
|
||||
KeyValuesResult res = ctx->tx()->readBlobGranules(begin, end, ctx->getBGBasePath());
|
||||
bool more;
|
||||
(*results) = res.getKeyValues(&more);
|
||||
ASSERT(!more);
|
||||
if (res.getError() != error_code_success) {
|
||||
ctx->onError(res.getError());
|
||||
} else {
|
||||
ctx->done();
|
||||
}
|
||||
},
|
||||
[this, begin, end, results, cont]() {
|
||||
std::vector<KeyValue> expected = store.getRange(begin, end, store.size(), false);
|
||||
if (results->size() != expected.size()) {
|
||||
error(fmt::format("randomReadOp result size mismatch. expected: {} actual: {}",
|
||||
expected.size(),
|
||||
results->size()));
|
||||
}
|
||||
ASSERT(results->size() == expected.size());
|
||||
|
||||
for (int i = 0; i < results->size(); i++) {
|
||||
if ((*results)[i].key != expected[i].key) {
|
||||
error(fmt::format("randomReadOp key mismatch at {}/{}. expected: {} actual: {}",
|
||||
i,
|
||||
results->size(),
|
||||
expected[i].key,
|
||||
(*results)[i].key));
|
||||
}
|
||||
ASSERT((*results)[i].key == expected[i].key);
|
||||
|
||||
if ((*results)[i].value != expected[i].value) {
|
||||
error(
|
||||
fmt::format("randomReadOp value mismatch at {}/{}. key: {} expected: {:.80} actual: {:.80}",
|
||||
i,
|
||||
results->size(),
|
||||
expected[i].key,
|
||||
expected[i].value,
|
||||
(*results)[i].value));
|
||||
}
|
||||
ASSERT((*results)[i].value == expected[i].value);
|
||||
}
|
||||
schedule(cont);
|
||||
});
|
||||
}
|
||||
|
||||
void randomGetRangesOp(TTaskFct cont) {
|
||||
std::string begin = randomKeyName();
|
||||
std::string end = randomKeyName();
|
||||
auto results = std::make_shared<std::vector<KeyValue>>();
|
||||
if (begin > end) {
|
||||
std::swap(begin, end);
|
||||
}
|
||||
execTransaction(
|
||||
[begin, end, results](auto ctx) {
|
||||
KeyRangesFuture f = ctx->tx()->getBlobGranuleRanges(begin, end);
|
||||
ctx->continueAfter(
|
||||
f,
|
||||
[ctx, f, results]() {
|
||||
(*results) = f.getKeyRanges();
|
||||
ctx->done();
|
||||
},
|
||||
true);
|
||||
},
|
||||
[this, begin, end, results, cont]() {
|
||||
ASSERT(results->size() > 0);
|
||||
ASSERT(results->front().key <= begin);
|
||||
ASSERT(results->back().value >= end);
|
||||
|
||||
for (int i = 0; i < results->size(); i++) {
|
||||
// no empty or inverted ranges
|
||||
ASSERT((*results)[i].key < (*results)[i].value);
|
||||
}
|
||||
|
||||
for (int i = 1; i < results->size(); i++) {
|
||||
// ranges contain entire requested key range
|
||||
ASSERT((*results)[i].key == (*results)[i - 1].value);
|
||||
}
|
||||
|
||||
schedule(cont);
|
||||
});
|
||||
}
|
||||
|
||||
void randomOperation(TTaskFct cont) {
|
||||
OpType txType = (store.size() == 0) ? OP_INSERT : (OpType)Random::get().randomInt(0, OP_LAST);
|
||||
while (std::count(excludedOpTypes.begin(), excludedOpTypes.end(), txType)) {
|
||||
txType = (OpType)Random::get().randomInt(0, OP_LAST);
|
||||
}
|
||||
switch (txType) {
|
||||
case OP_INSERT:
|
||||
randomInsertOp(cont);
|
||||
break;
|
||||
case OP_CLEAR:
|
||||
randomClearOp(cont);
|
||||
break;
|
||||
case OP_CLEAR_RANGE:
|
||||
randomClearRangeOp(cont);
|
||||
break;
|
||||
case OP_READ:
|
||||
randomReadOp(cont);
|
||||
break;
|
||||
case OP_GET_RANGES:
|
||||
randomGetRangesOp(cont);
|
||||
break;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
WorkloadFactory<ApiBlobGranuleCorrectnessWorkload> ApiBlobGranuleCorrectnessWorkloadFactory(
|
||||
"ApiBlobGranuleCorrectness");
|
||||
|
||||
} // namespace FdbApiTester
|
|
@ -30,12 +30,9 @@
|
|||
#include <vector>
|
||||
#include <mutex>
|
||||
|
||||
namespace FdbApiTester {
|
||||
#include "TesterUtil.h"
|
||||
|
||||
struct KeyValue {
|
||||
std::string key;
|
||||
std::string value;
|
||||
};
|
||||
namespace FdbApiTester {
|
||||
|
||||
class KeyValueStore {
|
||||
public:
|
||||
|
|
|
@ -49,6 +49,7 @@ public:
|
|||
int numClients;
|
||||
std::vector<std::pair<std::string, std::string>> knobs;
|
||||
TestSpec testSpec;
|
||||
std::string bgBasePath;
|
||||
};
|
||||
|
||||
} // namespace FdbApiTester
|
||||
|
|
|
@ -75,9 +75,10 @@ public:
|
|||
std::shared_ptr<ITransactionActor> txActor,
|
||||
TTaskFct cont,
|
||||
IScheduler* scheduler,
|
||||
int retryLimit)
|
||||
int retryLimit,
|
||||
std::string bgBasePath)
|
||||
: fdbTx(tx), txActor(txActor), contAfterDone(cont), scheduler(scheduler), retryLimit(retryLimit),
|
||||
txState(TxState::IN_PROGRESS), commitCalled(false) {}
|
||||
txState(TxState::IN_PROGRESS), commitCalled(false), bgBasePath(bgBasePath) {}
|
||||
|
||||
// A state machine:
|
||||
// IN_PROGRESS -> (ON_ERROR -> IN_PROGRESS)* [-> ON_ERROR] -> DONE
|
||||
|
@ -123,6 +124,8 @@ public:
|
|||
contAfterDone();
|
||||
}
|
||||
|
||||
std::string getBGBasePath() override { return bgBasePath; }
|
||||
|
||||
protected:
|
||||
virtual void doContinueAfter(Future f, TTaskFct cont, bool retryOnError) = 0;
|
||||
|
||||
|
@ -217,6 +220,9 @@ protected:
|
|||
|
||||
// A history of errors on which the transaction was retried
|
||||
std::vector<fdb_error_t> retriedErrors;
|
||||
|
||||
// blob granule base path
|
||||
std::string bgBasePath;
|
||||
};
|
||||
|
||||
/**
|
||||
|
@ -228,8 +234,9 @@ public:
|
|||
std::shared_ptr<ITransactionActor> txActor,
|
||||
TTaskFct cont,
|
||||
IScheduler* scheduler,
|
||||
int retryLimit)
|
||||
: TransactionContextBase(tx, txActor, cont, scheduler, retryLimit) {}
|
||||
int retryLimit,
|
||||
std::string bgBasePath)
|
||||
: TransactionContextBase(tx, txActor, cont, scheduler, retryLimit, bgBasePath) {}
|
||||
|
||||
protected:
|
||||
void doContinueAfter(Future f, TTaskFct cont, bool retryOnError) override {
|
||||
|
@ -316,8 +323,9 @@ public:
|
|||
std::shared_ptr<ITransactionActor> txActor,
|
||||
TTaskFct cont,
|
||||
IScheduler* scheduler,
|
||||
int retryLimit)
|
||||
: TransactionContextBase(tx, txActor, cont, scheduler, retryLimit) {}
|
||||
int retryLimit,
|
||||
std::string bgBasePath)
|
||||
: TransactionContextBase(tx, txActor, cont, scheduler, retryLimit, bgBasePath) {}
|
||||
|
||||
protected:
|
||||
void doContinueAfter(Future f, TTaskFct cont, bool retryOnError) override {
|
||||
|
@ -482,9 +490,10 @@ class TransactionExecutorBase : public ITransactionExecutor {
|
|||
public:
|
||||
TransactionExecutorBase(const TransactionExecutorOptions& options) : options(options), scheduler(nullptr) {}
|
||||
|
||||
void init(IScheduler* scheduler, const char* clusterFile) override {
|
||||
void init(IScheduler* scheduler, const char* clusterFile, std::string bgBasePath) override {
|
||||
this->scheduler = scheduler;
|
||||
this->clusterFile = clusterFile;
|
||||
this->bgBasePath = bgBasePath;
|
||||
}
|
||||
|
||||
protected:
|
||||
|
@ -499,10 +508,10 @@ protected:
|
|||
std::shared_ptr<ITransactionContext> ctx;
|
||||
if (options.blockOnFutures) {
|
||||
ctx = std::make_shared<BlockingTransactionContext>(
|
||||
tx, txActor, cont, scheduler, options.transactionRetryLimit);
|
||||
tx, txActor, cont, scheduler, options.transactionRetryLimit, bgBasePath);
|
||||
} else {
|
||||
ctx = std::make_shared<AsyncTransactionContext>(
|
||||
tx, txActor, cont, scheduler, options.transactionRetryLimit);
|
||||
tx, txActor, cont, scheduler, options.transactionRetryLimit, bgBasePath);
|
||||
}
|
||||
txActor->init(ctx);
|
||||
txActor->start();
|
||||
|
@ -511,6 +520,7 @@ protected:
|
|||
|
||||
protected:
|
||||
TransactionExecutorOptions options;
|
||||
std::string bgBasePath;
|
||||
std::string clusterFile;
|
||||
IScheduler* scheduler;
|
||||
};
|
||||
|
@ -524,8 +534,8 @@ public:
|
|||
|
||||
~DBPoolTransactionExecutor() override { release(); }
|
||||
|
||||
void init(IScheduler* scheduler, const char* clusterFile) override {
|
||||
TransactionExecutorBase::init(scheduler, clusterFile);
|
||||
void init(IScheduler* scheduler, const char* clusterFile, std::string bgBasePath) override {
|
||||
TransactionExecutorBase::init(scheduler, clusterFile, bgBasePath);
|
||||
for (int i = 0; i < options.numDatabases; i++) {
|
||||
FDBDatabase* db;
|
||||
fdb_error_t err = fdb_create_database(clusterFile, &db);
|
||||
|
|
|
@ -55,6 +55,9 @@ public:
|
|||
// Mark the transaction as completed without committing it (for read transactions)
|
||||
virtual void done() = 0;
|
||||
|
||||
// Plumbing for blob granule base path
|
||||
virtual std::string getBGBasePath() = 0;
|
||||
|
||||
// A continuation to be executed when all of the given futures get ready
|
||||
virtual void continueAfterAll(std::vector<Future> futures, TTaskFct cont);
|
||||
};
|
||||
|
@ -136,7 +139,7 @@ struct TransactionExecutorOptions {
|
|||
class ITransactionExecutor {
|
||||
public:
|
||||
virtual ~ITransactionExecutor() {}
|
||||
virtual void init(IScheduler* sched, const char* clusterFile) = 0;
|
||||
virtual void init(IScheduler* sched, const char* clusterFile, std::string bgBasePath = "") = 0;
|
||||
virtual void execute(std::shared_ptr<ITransactionActor> tx, TTaskFct cont) = 0;
|
||||
};
|
||||
|
||||
|
|
|
@ -49,6 +49,11 @@ struct formatter<std::optional<T>> : fmt::formatter<T> {
|
|||
|
||||
namespace FdbApiTester {
|
||||
|
||||
struct KeyValue {
|
||||
std::string key;
|
||||
std::string value;
|
||||
};
|
||||
|
||||
std::string lowerCase(const std::string& str);
|
||||
|
||||
class Random {
|
||||
|
|
|
@ -51,7 +51,8 @@ enum TesterOptionId {
|
|||
OPT_INPUT_PIPE,
|
||||
OPT_OUTPUT_PIPE,
|
||||
OPT_FDB_API_VERSION,
|
||||
OPT_TRANSACTION_RETRY_LIMIT
|
||||
OPT_TRANSACTION_RETRY_LIMIT,
|
||||
OPT_BG_LOCAL_BASE_PATH
|
||||
};
|
||||
|
||||
CSimpleOpt::SOption TesterOptionDefs[] = //
|
||||
|
@ -73,6 +74,7 @@ CSimpleOpt::SOption TesterOptionDefs[] = //
|
|||
{ OPT_OUTPUT_PIPE, "--output-pipe", SO_REQ_SEP },
|
||||
{ OPT_FDB_API_VERSION, "--api-version", SO_REQ_SEP },
|
||||
{ OPT_TRANSACTION_RETRY_LIMIT, "--transaction-retry-limit", SO_REQ_SEP },
|
||||
{ OPT_BG_LOCAL_BASE_PATH, "--bg-local-base-path", SO_REQ_SEP },
|
||||
SO_END_OF_OPTIONS };
|
||||
|
||||
void printProgramUsage(const char* execName) {
|
||||
|
@ -108,6 +110,8 @@ void printProgramUsage(const char* execName) {
|
|||
" Required FDB API version (default %d).\n"
|
||||
" --transaction-retry-limit NUMBER\n"
|
||||
" Maximum number of retries per tranaction (default: 0 - unlimited)\n"
|
||||
" --bg-local-base-path PATH\n"
|
||||
" Path to blob granule files on local filesystem\n"
|
||||
" -f, --test-file FILE\n"
|
||||
" Test file to run.\n"
|
||||
" -h, --help Display this help and exit.\n",
|
||||
|
@ -200,6 +204,9 @@ bool processArg(TesterOptions& options, const CSimpleOpt& args) {
|
|||
case OPT_TRANSACTION_RETRY_LIMIT:
|
||||
processIntOption(args.OptionText(), args.OptionArg(), 0, 1000, options.transactionRetryLimit);
|
||||
break;
|
||||
case OPT_BG_LOCAL_BASE_PATH:
|
||||
options.bgBasePath = args.OptionArg();
|
||||
break;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
@ -295,7 +302,7 @@ bool runWorkloads(TesterOptions& options) {
|
|||
|
||||
std::unique_ptr<IScheduler> scheduler = createScheduler(options.numClientThreads);
|
||||
std::unique_ptr<ITransactionExecutor> txExecutor = createTransactionExecutor(txExecOptions);
|
||||
txExecutor->init(scheduler.get(), options.clusterFile.c_str());
|
||||
txExecutor->init(scheduler.get(), options.clusterFile.c_str(), options.bgBasePath);
|
||||
|
||||
WorkloadManager workloadMgr(txExecutor.get(), scheduler.get());
|
||||
for (const auto& workloadSpec : options.testSpec.workloads) {
|
||||
|
|
|
@ -53,6 +53,9 @@ def run_tester(args, test_file):
|
|||
args.cluster_file, "--test-file", test_file]
|
||||
if args.external_client_library is not None:
|
||||
cmd += ["--external-client-library", args.external_client_library]
|
||||
|
||||
if args.bg_enabled is not None:
|
||||
cmd += ["--bg-local-base-path", args.bg_enabled]
|
||||
|
||||
get_logger().info('\nRunning tester \'%s\'...' % ' '.join(cmd))
|
||||
proc = Popen(cmd, stdout=sys.stdout, stderr=sys.stderr)
|
||||
|
@ -79,11 +82,15 @@ def run_tester(args, test_file):
|
|||
get_logger().info('')
|
||||
return ret_code
|
||||
|
||||
def include_bg_test(bg_enabled, fname):
|
||||
is_bg_test = fname.startswith("CApiBlobGranule")
|
||||
return (not is_bg_test) or bg_enabled
|
||||
|
||||
def run_tests(args):
|
||||
num_failed = 0
|
||||
# don't run blob granule tests unless bg_enabled
|
||||
test_files = [f for f in os.listdir(args.test_dir)
|
||||
if os.path.isfile(os.path.join(args.test_dir, f)) and f.endswith(".toml")]
|
||||
if os.path.isfile(os.path.join(args.test_dir, f)) and f.endswith(".toml") and include_bg_test(args.bg_enabled, f)]
|
||||
|
||||
for test_file in test_files:
|
||||
get_logger().info('=========================================================')
|
||||
|
@ -111,6 +118,8 @@ def parse_args(argv):
|
|||
help='The timeout in seconds for running each individual test. (default 300)')
|
||||
parser.add_argument('--logging-level', type=str, default='INFO',
|
||||
choices=['ERROR', 'WARNING', 'INFO', 'DEBUG'], help='Specifies the level of detail in the tester output (default=\'INFO\').')
|
||||
parser.add_argument('--bg-enabled', type=str, default=None,
|
||||
help='Enable blob granule tests if set, value is path to local bg files')
|
||||
|
||||
return parser.parse_args(argv)
|
||||
|
||||
|
|
|
@ -0,0 +1,24 @@
|
|||
[[test]]
|
||||
title = 'Blob GranuleAPI Correctness Blocking'
|
||||
multiThreaded = true
|
||||
buggify = true
|
||||
blockOnFutures = true
|
||||
minFdbThreads = 2
|
||||
maxFdbThreads = 8
|
||||
minDatabases = 2
|
||||
maxDatabases = 8
|
||||
minClientThreads = 2
|
||||
maxClientThreads = 8
|
||||
minClients = 2
|
||||
maxClients = 8
|
||||
|
||||
|
||||
[[test.workload]]
|
||||
name = 'ApiBlobGranuleCorrectness'
|
||||
minKeyLength = 1
|
||||
maxKeyLength = 64
|
||||
minValueLength = 1
|
||||
maxValueLength = 1000
|
||||
maxKeysPerTransaction = 50
|
||||
initialSize = 100
|
||||
numRandomOperations = 100
|
|
@ -0,0 +1,23 @@
|
|||
[[test]]
|
||||
title = 'Blob Granule API Correctness Multi Threaded'
|
||||
multiThreaded = true
|
||||
buggify = true
|
||||
minFdbThreads = 2
|
||||
maxFdbThreads = 8
|
||||
minDatabases = 2
|
||||
maxDatabases = 8
|
||||
minClientThreads = 2
|
||||
maxClientThreads = 8
|
||||
minClients = 2
|
||||
maxClients = 8
|
||||
|
||||
[[test.workload]]
|
||||
name = 'ApiBlobGranuleCorrectness'
|
||||
minKeyLength = 1
|
||||
maxKeyLength = 64
|
||||
minValueLength = 1
|
||||
maxValueLength = 1000
|
||||
maxKeysPerTransaction = 50
|
||||
initialSize = 100
|
||||
numRandomOperations = 100
|
||||
|
|
@ -0,0 +1,15 @@
|
|||
[[test]]
|
||||
title = 'Blob Granule API Correctness Single Threaded'
|
||||
minClients = 1
|
||||
maxClients = 3
|
||||
multiThreaded = false
|
||||
|
||||
[[test.workload]]
|
||||
name = 'ApiBlobGranuleCorrectness'
|
||||
minKeyLength = 1
|
||||
maxKeyLength = 64
|
||||
minValueLength = 1
|
||||
maxValueLength = 1000
|
||||
maxKeysPerTransaction = 50
|
||||
initialSize = 100
|
||||
numRandomOperations = 100
|
|
@ -300,7 +300,9 @@ ThreadResult<RangeResult> DLTransaction::readBlobGranules(const KeyRangeRef& key
|
|||
int count;
|
||||
FdbCApi::fdb_bool_t more;
|
||||
FdbCApi::fdb_error_t error = api->resultGetKeyValueArray(r, &kvs, &count, &more);
|
||||
ASSERT(!error);
|
||||
if (error) {
|
||||
return ThreadResult<RangeResult>(Error(error));
|
||||
}
|
||||
|
||||
// The memory for this is stored in the FDBResult and is released when the result gets destroyed
|
||||
return ThreadResult<RangeResult>(
|
||||
|
|
|
@ -111,6 +111,8 @@ if __name__ == "__main__":
|
|||
cmd_args.append(str(cluster.log))
|
||||
elif cmd == "@ETC_DIR@":
|
||||
cmd_args.append(str(cluster.etc))
|
||||
elif cmd.startswith("@DATA_DIR@"):
|
||||
cmd_args.append(str(cluster.data) + cmd[len("@DATA_DIR@"):])
|
||||
else:
|
||||
cmd_args.append(cmd)
|
||||
env = dict(**os.environ)
|
||||
|
|
Loading…
Reference in New Issue