Remove TesterApiWrapper, replace its uses with fdb_api.hpp
This commit is contained in:
parent
3e79735b2f
commit
d41ec69b23
|
@ -121,8 +121,6 @@ if(NOT WIN32)
|
|||
test/apitester/fdb_c_api_tester.cpp
|
||||
test/apitester/TesterApiWorkload.cpp
|
||||
test/apitester/TesterApiWorkload.h
|
||||
test/apitester/TesterApiWrapper.cpp
|
||||
test/apitester/TesterApiWrapper.h
|
||||
test/apitester/TesterTestSpec.cpp
|
||||
test/apitester/TesterTestSpec.h
|
||||
test/apitester/TesterBlobGranuleCorrectnessWorkload.cpp
|
||||
|
@ -191,9 +189,9 @@ if(NOT WIN32)
|
|||
target_link_libraries(disconnected_timeout_unit_tests PRIVATE fdb_c Threads::Threads)
|
||||
|
||||
if(USE_SANITIZER)
|
||||
target_link_libraries(fdb_c_api_tester PRIVATE fdb_c toml11_target Threads::Threads fmt::fmt boost_asan)
|
||||
target_link_libraries(fdb_c_api_tester PRIVATE fdb_c fdb_cpp toml11_target Threads::Threads fmt::fmt boost_asan)
|
||||
else()
|
||||
target_link_libraries(fdb_c_api_tester PRIVATE fdb_c toml11_target Threads::Threads fmt::fmt boost_target)
|
||||
target_link_libraries(fdb_c_api_tester PRIVATE fdb_c fdb_cpp toml11_target Threads::Threads fmt::fmt boost_target)
|
||||
endif()
|
||||
|
||||
# do not set RPATH for mako
|
||||
|
|
|
@ -20,6 +20,7 @@
|
|||
|
||||
#include "TesterApiWorkload.h"
|
||||
#include "TesterUtil.h"
|
||||
#include "test/fdb_api.hpp"
|
||||
#include <fmt/format.h>
|
||||
|
||||
namespace FdbApiTester {
|
||||
|
@ -35,7 +36,7 @@ ApiWorkload::ApiWorkload(const WorkloadConfig& config) : WorkloadBase(config) {
|
|||
runUntilStop = config.getBoolOption("runUntilStop", false);
|
||||
numRandomOperations = config.getIntOption("numRandomOperations", 1000);
|
||||
numOperationsForProgressCheck = config.getIntOption("numOperationsForProgressCheck", 10);
|
||||
keyPrefix = fmt::format("{}/", workloadId);
|
||||
keyPrefix = fdb::toBytesRef(fmt::format("{}/", workloadId));
|
||||
numRandomOpLeft = 0;
|
||||
stopReceived = false;
|
||||
checkingProgress = false;
|
||||
|
@ -105,26 +106,26 @@ void ApiWorkload::randomOperation(TTaskFct cont) {
|
|||
ASSERT(false);
|
||||
}
|
||||
|
||||
std::string ApiWorkload::randomKeyName() {
|
||||
fdb::Key ApiWorkload::randomKeyName() {
|
||||
return keyPrefix + Random::get().randomStringLowerCase(minKeyLength, maxKeyLength);
|
||||
}
|
||||
|
||||
std::string ApiWorkload::randomValue() {
|
||||
fdb::Value ApiWorkload::randomValue() {
|
||||
return Random::get().randomStringLowerCase(minValueLength, maxValueLength);
|
||||
}
|
||||
|
||||
std::string ApiWorkload::randomNotExistingKey() {
|
||||
fdb::Key ApiWorkload::randomNotExistingKey() {
|
||||
while (true) {
|
||||
std::string key = randomKeyName();
|
||||
fdb::Key key = randomKeyName();
|
||||
if (!store.exists(key)) {
|
||||
return key;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
std::string ApiWorkload::randomExistingKey() {
|
||||
std::string genKey = randomKeyName();
|
||||
std::string key = store.getKey(genKey, true, 1);
|
||||
fdb::Key ApiWorkload::randomExistingKey() {
|
||||
fdb::Key genKey = randomKeyName();
|
||||
fdb::Key key = store.getKey(genKey, true, 1);
|
||||
if (key != store.endKey()) {
|
||||
return key;
|
||||
}
|
||||
|
@ -136,7 +137,7 @@ std::string ApiWorkload::randomExistingKey() {
|
|||
return genKey;
|
||||
}
|
||||
|
||||
std::string ApiWorkload::randomKey(double existingKeyRatio) {
|
||||
fdb::Key ApiWorkload::randomKey(double existingKeyRatio) {
|
||||
if (Random::get().randomBool(existingKeyRatio)) {
|
||||
return randomExistingKey();
|
||||
} else {
|
||||
|
@ -146,19 +147,19 @@ std::string ApiWorkload::randomKey(double existingKeyRatio) {
|
|||
|
||||
void ApiWorkload::populateDataTx(TTaskFct cont) {
|
||||
int numKeys = maxKeysPerTransaction;
|
||||
auto kvPairs = std::make_shared<std::vector<KeyValue>>();
|
||||
auto kvPairs = std::make_shared<std::vector<fdb::KeyValue>>();
|
||||
for (int i = 0; i < numKeys; i++) {
|
||||
kvPairs->push_back(KeyValue{ randomNotExistingKey(), randomValue() });
|
||||
kvPairs->push_back(fdb::KeyValue{ randomNotExistingKey(), randomValue() });
|
||||
}
|
||||
execTransaction(
|
||||
[kvPairs](auto ctx) {
|
||||
for (const KeyValue& kv : *kvPairs) {
|
||||
for (const fdb::KeyValue& kv : *kvPairs) {
|
||||
ctx->tx()->set(kv.key, kv.value);
|
||||
}
|
||||
ctx->commit();
|
||||
},
|
||||
[this, kvPairs, cont]() {
|
||||
for (const KeyValue& kv : *kvPairs) {
|
||||
for (const fdb::KeyValue& kv : *kvPairs) {
|
||||
store.set(kv.key, kv.value);
|
||||
}
|
||||
schedule(cont);
|
||||
|
@ -168,7 +169,7 @@ void ApiWorkload::populateDataTx(TTaskFct cont) {
|
|||
void ApiWorkload::clearData(TTaskFct cont) {
|
||||
execTransaction(
|
||||
[this](auto ctx) {
|
||||
ctx->tx()->clearRange(keyPrefix, fmt::format("{}\xff", keyPrefix));
|
||||
ctx->tx()->clearRange(keyPrefix, keyPrefix + fdb::Key(1, '\xff'));
|
||||
ctx->commit();
|
||||
},
|
||||
[this, cont]() { schedule(cont); });
|
||||
|
@ -185,19 +186,19 @@ void ApiWorkload::populateData(TTaskFct cont) {
|
|||
|
||||
void ApiWorkload::randomInsertOp(TTaskFct cont) {
|
||||
int numKeys = Random::get().randomInt(1, maxKeysPerTransaction);
|
||||
auto kvPairs = std::make_shared<std::vector<KeyValue>>();
|
||||
auto kvPairs = std::make_shared<std::vector<fdb::KeyValue>>();
|
||||
for (int i = 0; i < numKeys; i++) {
|
||||
kvPairs->push_back(KeyValue{ randomNotExistingKey(), randomValue() });
|
||||
kvPairs->push_back(fdb::KeyValue{ randomNotExistingKey(), randomValue() });
|
||||
}
|
||||
execTransaction(
|
||||
[kvPairs](auto ctx) {
|
||||
for (const KeyValue& kv : *kvPairs) {
|
||||
for (const fdb::KeyValue& kv : *kvPairs) {
|
||||
ctx->tx()->set(kv.key, kv.value);
|
||||
}
|
||||
ctx->commit();
|
||||
},
|
||||
[this, kvPairs, cont]() {
|
||||
for (const KeyValue& kv : *kvPairs) {
|
||||
for (const fdb::KeyValue& kv : *kvPairs) {
|
||||
store.set(kv.key, kv.value);
|
||||
}
|
||||
schedule(cont);
|
||||
|
@ -206,7 +207,7 @@ void ApiWorkload::randomInsertOp(TTaskFct cont) {
|
|||
|
||||
void ApiWorkload::randomClearOp(TTaskFct cont) {
|
||||
int numKeys = Random::get().randomInt(1, maxKeysPerTransaction);
|
||||
auto keys = std::make_shared<std::vector<std::string>>();
|
||||
auto keys = std::make_shared<std::vector<fdb::Key>>();
|
||||
for (int i = 0; i < numKeys; i++) {
|
||||
keys->push_back(randomExistingKey());
|
||||
}
|
||||
|
@ -226,8 +227,8 @@ void ApiWorkload::randomClearOp(TTaskFct cont) {
|
|||
}
|
||||
|
||||
void ApiWorkload::randomClearRangeOp(TTaskFct cont) {
|
||||
std::string begin = randomKeyName();
|
||||
std::string end = randomKeyName();
|
||||
fdb::Key begin = randomKeyName();
|
||||
fdb::Key end = randomKeyName();
|
||||
if (begin > end) {
|
||||
std::swap(begin, end);
|
||||
}
|
||||
|
|
|
@ -94,7 +94,7 @@ protected:
|
|||
std::atomic<int> numRandomOpLeft;
|
||||
|
||||
// Key prefix
|
||||
std::string keyPrefix;
|
||||
fdb::Key keyPrefix;
|
||||
|
||||
// In-memory store maintaining expected database state
|
||||
KeyValueStore store;
|
||||
|
@ -102,11 +102,11 @@ protected:
|
|||
ApiWorkload(const WorkloadConfig& config);
|
||||
|
||||
// Methods for generating random keys and values
|
||||
std::string randomKeyName();
|
||||
std::string randomValue();
|
||||
std::string randomNotExistingKey();
|
||||
std::string randomExistingKey();
|
||||
std::string randomKey(double existingKeyRatio);
|
||||
fdb::Key randomKeyName();
|
||||
fdb::Value randomValue();
|
||||
fdb::Key randomNotExistingKey();
|
||||
fdb::Key randomExistingKey();
|
||||
fdb::Key randomKey(double existingKeyRatio);
|
||||
|
||||
// Generate initial random data for the workload
|
||||
void populateData(TTaskFct cont);
|
||||
|
@ -127,4 +127,4 @@ private:
|
|||
|
||||
} // namespace FdbApiTester
|
||||
|
||||
#endif
|
||||
#endif
|
||||
|
|
|
@ -1,255 +0,0 @@
|
|||
/*
|
||||
* TesterApiWrapper.cpp
|
||||
*
|
||||
* This source file is part of the FoundationDB open source project
|
||||
*
|
||||
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
#include "TesterApiWrapper.h"
|
||||
#include <cstdint>
|
||||
#include <fmt/format.h>
|
||||
#include <fstream>
|
||||
|
||||
namespace FdbApiTester {
|
||||
|
||||
namespace {
|
||||
|
||||
void fdb_check(fdb_error_t e) {
|
||||
if (e) {
|
||||
fmt::print(stderr, "Unexpected error: {}\n", fdb_get_error(e));
|
||||
std::abort();
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace
|
||||
|
||||
Future::Future(FDBFuture* f) : future_(f, fdb_future_destroy) {}
|
||||
|
||||
void Future::reset() {
|
||||
future_.reset();
|
||||
}
|
||||
|
||||
void Future::cancel() {
|
||||
ASSERT(future_);
|
||||
fdb_future_cancel(future_.get());
|
||||
}
|
||||
|
||||
fdb_error_t Future::getError() const {
|
||||
ASSERT(future_);
|
||||
return fdb_future_get_error(future_.get());
|
||||
}
|
||||
|
||||
std::optional<std::string> ValueFuture::getValue() const {
|
||||
ASSERT(future_);
|
||||
int out_present;
|
||||
const std::uint8_t* val;
|
||||
int vallen;
|
||||
fdb_check(fdb_future_get_value(future_.get(), &out_present, &val, &vallen));
|
||||
return out_present ? std::make_optional(std::string((const char*)val, vallen)) : std::nullopt;
|
||||
}
|
||||
|
||||
std::vector<KeyValue> KeyRangesFuture::getKeyRanges() const {
|
||||
ASSERT(future_);
|
||||
|
||||
int count;
|
||||
const FDBKeyRange* ranges;
|
||||
|
||||
fdb_check(fdb_future_get_keyrange_array(future_.get(), &ranges, &count));
|
||||
std::vector<KeyValue> result;
|
||||
result.reserve(count);
|
||||
for (int i = 0; i < count; i++) {
|
||||
FDBKeyRange kr = *ranges++;
|
||||
KeyValue rkv;
|
||||
rkv.key = std::string((const char*)kr.begin_key, kr.begin_key_length);
|
||||
rkv.value = std::string((const char*)kr.end_key, kr.end_key_length);
|
||||
result.push_back(rkv);
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
Result::Result(FDBResult* r) : result_(r, fdb_result_destroy) {}
|
||||
|
||||
std::vector<KeyValue> KeyValuesResult::getKeyValues(bool* more_out) {
|
||||
ASSERT(result_);
|
||||
|
||||
int count;
|
||||
const FDBKeyValue* kvs;
|
||||
int more;
|
||||
|
||||
std::vector<KeyValue> result;
|
||||
|
||||
error_ = fdb_result_get_keyvalue_array(result_.get(), &kvs, &count, &more);
|
||||
|
||||
if (error_ != error_code_success) {
|
||||
return result;
|
||||
}
|
||||
|
||||
result.reserve(count);
|
||||
for (int i = 0; i < count; i++) {
|
||||
FDBKeyValue kv = *kvs++;
|
||||
KeyValue rkv;
|
||||
rkv.key = std::string((const char*)kv.key, kv.key_length);
|
||||
rkv.value = std::string((const char*)kv.value, kv.value_length);
|
||||
result.push_back(rkv);
|
||||
}
|
||||
*more_out = more;
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
// Given an FDBDatabase, initializes a new transaction.
|
||||
Transaction::Transaction(FDBTransaction* tx) : tx_(tx, fdb_transaction_destroy) {}
|
||||
|
||||
ValueFuture Transaction::get(std::string_view key, fdb_bool_t snapshot) {
|
||||
ASSERT(tx_);
|
||||
return ValueFuture(fdb_transaction_get(tx_.get(), (const uint8_t*)key.data(), key.size(), snapshot));
|
||||
}
|
||||
|
||||
void Transaction::set(std::string_view key, std::string_view value) {
|
||||
ASSERT(tx_);
|
||||
fdb_transaction_set(tx_.get(), (const uint8_t*)key.data(), key.size(), (const uint8_t*)value.data(), value.size());
|
||||
}
|
||||
|
||||
void Transaction::clear(std::string_view key) {
|
||||
ASSERT(tx_);
|
||||
fdb_transaction_clear(tx_.get(), (const uint8_t*)key.data(), key.size());
|
||||
}
|
||||
|
||||
void Transaction::clearRange(std::string_view begin, std::string_view end) {
|
||||
ASSERT(tx_);
|
||||
fdb_transaction_clear_range(
|
||||
tx_.get(), (const uint8_t*)begin.data(), begin.size(), (const uint8_t*)end.data(), end.size());
|
||||
}
|
||||
|
||||
Future Transaction::commit() {
|
||||
ASSERT(tx_);
|
||||
return Future(fdb_transaction_commit(tx_.get()));
|
||||
}
|
||||
|
||||
void Transaction::cancel() {
|
||||
ASSERT(tx_);
|
||||
fdb_transaction_cancel(tx_.get());
|
||||
}
|
||||
|
||||
Future Transaction::onError(fdb_error_t err) {
|
||||
ASSERT(tx_);
|
||||
return Future(fdb_transaction_on_error(tx_.get(), err));
|
||||
}
|
||||
|
||||
void Transaction::reset() {
|
||||
ASSERT(tx_);
|
||||
fdb_transaction_reset(tx_.get());
|
||||
}
|
||||
|
||||
fdb_error_t Transaction::setOption(FDBTransactionOption option) {
|
||||
ASSERT(tx_);
|
||||
return fdb_transaction_set_option(tx_.get(), option, reinterpret_cast<const uint8_t*>(""), 0);
|
||||
}
|
||||
|
||||
class TesterGranuleContext {
|
||||
public:
|
||||
std::unordered_map<int64_t, uint8_t*> loadsInProgress;
|
||||
int64_t nextId = 0;
|
||||
std::string basePath;
|
||||
|
||||
~TesterGranuleContext() {
|
||||
// if there was an error or not all loads finished, delete data
|
||||
for (auto& it : loadsInProgress) {
|
||||
uint8_t* dataToFree = it.second;
|
||||
delete[] dataToFree;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
static int64_t granule_start_load(const char* filename,
|
||||
int filenameLength,
|
||||
int64_t offset,
|
||||
int64_t length,
|
||||
int64_t fullFileLength,
|
||||
void* context) {
|
||||
|
||||
TesterGranuleContext* ctx = (TesterGranuleContext*)context;
|
||||
int64_t loadId = ctx->nextId++;
|
||||
|
||||
uint8_t* buffer = new uint8_t[length];
|
||||
std::ifstream fin(ctx->basePath + std::string(filename, filenameLength), std::ios::in | std::ios::binary);
|
||||
fin.seekg(offset);
|
||||
fin.read((char*)buffer, length);
|
||||
|
||||
ctx->loadsInProgress.insert({ loadId, buffer });
|
||||
|
||||
return loadId;
|
||||
}
|
||||
|
||||
static uint8_t* granule_get_load(int64_t loadId, void* context) {
|
||||
TesterGranuleContext* ctx = (TesterGranuleContext*)context;
|
||||
return ctx->loadsInProgress.at(loadId);
|
||||
}
|
||||
|
||||
static void granule_free_load(int64_t loadId, void* context) {
|
||||
TesterGranuleContext* ctx = (TesterGranuleContext*)context;
|
||||
auto it = ctx->loadsInProgress.find(loadId);
|
||||
uint8_t* dataToFree = it->second;
|
||||
delete[] dataToFree;
|
||||
|
||||
ctx->loadsInProgress.erase(it);
|
||||
}
|
||||
|
||||
KeyValuesResult Transaction::readBlobGranules(std::string_view begin,
|
||||
std::string_view end,
|
||||
const std::string& basePath) {
|
||||
ASSERT(tx_);
|
||||
|
||||
TesterGranuleContext testerContext;
|
||||
testerContext.basePath = basePath;
|
||||
|
||||
FDBReadBlobGranuleContext granuleContext;
|
||||
granuleContext.userContext = &testerContext;
|
||||
granuleContext.debugNoMaterialize = false;
|
||||
granuleContext.granuleParallelism = 1;
|
||||
granuleContext.start_load_f = &granule_start_load;
|
||||
granuleContext.get_load_f = &granule_get_load;
|
||||
granuleContext.free_load_f = &granule_free_load;
|
||||
|
||||
return KeyValuesResult(fdb_transaction_read_blob_granules(tx_.get(),
|
||||
(const uint8_t*)begin.data(),
|
||||
begin.size(),
|
||||
(const uint8_t*)end.data(),
|
||||
end.size(),
|
||||
0 /* beginVersion */,
|
||||
-2 /* latest read version */,
|
||||
granuleContext));
|
||||
}
|
||||
|
||||
KeyRangesFuture Transaction::getBlobGranuleRanges(std::string_view begin, std::string_view end) {
|
||||
ASSERT(tx_);
|
||||
return KeyRangesFuture(fdb_transaction_get_blob_granule_ranges(
|
||||
tx_.get(), (const uint8_t*)begin.data(), begin.size(), (const uint8_t*)end.data(), end.size()));
|
||||
}
|
||||
|
||||
fdb_error_t FdbApi::setOption(FDBNetworkOption option, std::string_view value) {
|
||||
return fdb_network_set_option(option, reinterpret_cast<const uint8_t*>(value.data()), value.size());
|
||||
}
|
||||
|
||||
fdb_error_t FdbApi::setOption(FDBNetworkOption option, int64_t value) {
|
||||
return fdb_network_set_option(option, reinterpret_cast<const uint8_t*>(&value), sizeof(value));
|
||||
}
|
||||
|
||||
fdb_error_t FdbApi::setOption(FDBNetworkOption option) {
|
||||
return fdb_network_set_option(option, reinterpret_cast<const uint8_t*>(""), 0);
|
||||
}
|
||||
|
||||
} // namespace FdbApiTester
|
|
@ -1,129 +0,0 @@
|
|||
/*
|
||||
* TesterApiWrapper.h
|
||||
*
|
||||
* This source file is part of the FoundationDB open source project
|
||||
*
|
||||
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#ifndef APITESTER_API_WRAPPER_H
|
||||
#define APITESTER_API_WRAPPER_H
|
||||
|
||||
#include <string_view>
|
||||
#include <optional>
|
||||
#include <memory>
|
||||
#include <unordered_map>
|
||||
|
||||
#define FDB_API_VERSION 720
|
||||
#include "bindings/c/foundationdb/fdb_c.h"
|
||||
|
||||
#undef ERROR
|
||||
#define ERROR(name, number, description) enum { error_code_##name = number };
|
||||
|
||||
#include "flow/error_definitions.h"
|
||||
|
||||
#include "TesterUtil.h"
|
||||
|
||||
namespace FdbApiTester {
|
||||
|
||||
// Wrapper parent class to manage memory of an FDBFuture pointer. Cleans up
|
||||
// FDBFuture when this instance goes out of scope.
|
||||
class Future {
|
||||
public:
|
||||
Future() = default;
|
||||
Future(FDBFuture* f);
|
||||
|
||||
FDBFuture* fdbFuture() { return future_.get(); };
|
||||
|
||||
fdb_error_t getError() const;
|
||||
explicit operator bool() const { return future_ != nullptr; };
|
||||
void reset();
|
||||
void cancel();
|
||||
|
||||
protected:
|
||||
std::shared_ptr<FDBFuture> future_;
|
||||
};
|
||||
|
||||
class ValueFuture : public Future {
|
||||
public:
|
||||
ValueFuture() = default;
|
||||
ValueFuture(FDBFuture* f) : Future(f) {}
|
||||
std::optional<std::string> getValue() const;
|
||||
};
|
||||
|
||||
class KeyRangesFuture : public Future {
|
||||
public:
|
||||
KeyRangesFuture() = default;
|
||||
KeyRangesFuture(FDBFuture* f) : Future(f) {}
|
||||
std::vector<KeyValue> getKeyRanges() const;
|
||||
};
|
||||
|
||||
class Result {
|
||||
public:
|
||||
Result() = default;
|
||||
Result(FDBResult* r);
|
||||
|
||||
FDBResult* fdbResult() { return result_.get(); };
|
||||
|
||||
fdb_error_t getError() const { return error_; }
|
||||
|
||||
explicit operator bool() const { return result_ != nullptr; };
|
||||
|
||||
fdb_error_t error_ = error_code_client_invalid_operation; // have to call getX function to set this
|
||||
|
||||
protected:
|
||||
std::shared_ptr<FDBResult> result_;
|
||||
};
|
||||
|
||||
class KeyValuesResult : public Result {
|
||||
public:
|
||||
KeyValuesResult() = default;
|
||||
KeyValuesResult(FDBResult* f) : Result(f) {}
|
||||
std::vector<KeyValue> getKeyValues(bool* more_out);
|
||||
};
|
||||
|
||||
class Transaction {
|
||||
public:
|
||||
Transaction() = default;
|
||||
Transaction(FDBTransaction* tx);
|
||||
ValueFuture get(std::string_view key, fdb_bool_t snapshot);
|
||||
void set(std::string_view key, std::string_view value);
|
||||
void clear(std::string_view key);
|
||||
void clearRange(std::string_view begin, std::string_view end);
|
||||
Future commit();
|
||||
void cancel();
|
||||
Future onError(fdb_error_t err);
|
||||
void reset();
|
||||
fdb_error_t setOption(FDBTransactionOption option);
|
||||
|
||||
KeyValuesResult readBlobGranules(std::string_view begin, std::string_view end, const std::string& basePath);
|
||||
KeyRangesFuture getBlobGranuleRanges(std::string_view begin, std::string_view end);
|
||||
|
||||
private:
|
||||
std::shared_ptr<FDBTransaction> tx_;
|
||||
};
|
||||
|
||||
class FdbApi {
|
||||
public:
|
||||
static fdb_error_t setOption(FDBNetworkOption option, std::string_view value);
|
||||
static fdb_error_t setOption(FDBNetworkOption option, int64_t value);
|
||||
static fdb_error_t setOption(FDBNetworkOption option);
|
||||
};
|
||||
|
||||
} // namespace FdbApiTester
|
||||
|
||||
#endif
|
|
@ -24,6 +24,55 @@
|
|||
|
||||
namespace FdbApiTester {
|
||||
|
||||
class TesterGranuleContext {
|
||||
public:
|
||||
std::unordered_map<int64_t, uint8_t*> loadsInProgress;
|
||||
int64_t nextId = 0;
|
||||
std::string basePath;
|
||||
|
||||
~TesterGranuleContext() {
|
||||
// if there was an error or not all loads finished, delete data
|
||||
for (auto& it : loadsInProgress) {
|
||||
uint8_t* dataToFree = it.second;
|
||||
delete[] dataToFree;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
static int64_t granule_start_load(const char* filename,
|
||||
int filenameLength,
|
||||
int64_t offset,
|
||||
int64_t length,
|
||||
int64_t fullFileLength,
|
||||
void* context) {
|
||||
|
||||
TesterGranuleContext* ctx = (TesterGranuleContext*)context;
|
||||
int64_t loadId = ctx->nextId++;
|
||||
|
||||
uint8_t* buffer = new uint8_t[length];
|
||||
std::ifstream fin(ctx->basePath + std::string(filename, filenameLength), std::ios::in | std::ios::binary);
|
||||
fin.seekg(offset);
|
||||
fin.read((char*)buffer, length);
|
||||
|
||||
ctx->loadsInProgress.insert({ loadId, buffer });
|
||||
|
||||
return loadId;
|
||||
}
|
||||
|
||||
static uint8_t* granule_get_load(int64_t loadId, void* context) {
|
||||
TesterGranuleContext* ctx = (TesterGranuleContext*)context;
|
||||
return ctx->loadsInProgress.at(loadId);
|
||||
}
|
||||
|
||||
static void granule_free_load(int64_t loadId, void* context) {
|
||||
TesterGranuleContext* ctx = (TesterGranuleContext*)context;
|
||||
auto it = ctx->loadsInProgress.find(loadId);
|
||||
uint8_t* dataToFree = it->second;
|
||||
delete[] dataToFree;
|
||||
|
||||
ctx->loadsInProgress.erase(it);
|
||||
}
|
||||
|
||||
class ApiBlobGranuleCorrectnessWorkload : public ApiWorkload {
|
||||
public:
|
||||
ApiBlobGranuleCorrectnessWorkload(const WorkloadConfig& config) : ApiWorkload(config) {
|
||||
|
@ -42,9 +91,9 @@ private:
|
|||
bool seenReadSuccess = false;
|
||||
|
||||
void randomReadOp(TTaskFct cont) {
|
||||
std::string begin = randomKeyName();
|
||||
std::string end = randomKeyName();
|
||||
auto results = std::make_shared<std::vector<KeyValue>>();
|
||||
fdb::Key begin = randomKeyName();
|
||||
fdb::Key end = randomKeyName();
|
||||
auto results = std::make_shared<std::vector<fdb::KeyValue>>();
|
||||
auto tooOld = std::make_shared<bool>(false);
|
||||
if (begin > end) {
|
||||
std::swap(begin, end);
|
||||
|
@ -52,18 +101,31 @@ private:
|
|||
execTransaction(
|
||||
[this, begin, end, results, tooOld](auto ctx) {
|
||||
ctx->tx()->setOption(FDB_TR_OPTION_READ_YOUR_WRITES_DISABLE);
|
||||
KeyValuesResult res = ctx->tx()->readBlobGranules(begin, end, ctx->getBGBasePath());
|
||||
bool more = false;
|
||||
(*results) = res.getKeyValues(&more);
|
||||
if (res.getError() == error_code_blob_granule_transaction_too_old) {
|
||||
TesterGranuleContext testerContext;
|
||||
testerContext.basePath = ctx->getBGBasePath();
|
||||
|
||||
fdb::native::FDBReadBlobGranuleContext granuleContext;
|
||||
granuleContext.userContext = &testerContext;
|
||||
granuleContext.debugNoMaterialize = false;
|
||||
granuleContext.granuleParallelism = 1;
|
||||
granuleContext.start_load_f = &granule_start_load;
|
||||
granuleContext.get_load_f = &granule_get_load;
|
||||
granuleContext.free_load_f = &granule_free_load;
|
||||
|
||||
fdb::Result res = ctx->tx()->readBlobGranules(
|
||||
begin, end, 0 /* beginVersion */, -2 /* latest read version */, granuleContext);
|
||||
auto out = fdb::Result::NativeKeyValueArray{};
|
||||
fdb::Error err = res.getKeyValueArrayNothrow(out);
|
||||
if (err.code() == error_code_blob_granule_transaction_too_old) {
|
||||
info("BlobGranuleCorrectness::randomReadOp bg too old\n");
|
||||
ASSERT(!seenReadSuccess);
|
||||
*tooOld = true;
|
||||
ctx->done();
|
||||
} else if (res.getError() != error_code_success) {
|
||||
ctx->onError(res.getError());
|
||||
} else if (err.code() != error_code_success) {
|
||||
ctx->onError(err);
|
||||
} else {
|
||||
ASSERT(!more);
|
||||
auto& [out_kv, out_count, out_more] = out;
|
||||
ASSERT(!out_more);
|
||||
if (!seenReadSuccess) {
|
||||
info("BlobGranuleCorrectness::randomReadOp first success\n");
|
||||
}
|
||||
|
@ -73,7 +135,7 @@ private:
|
|||
},
|
||||
[this, begin, end, results, tooOld, cont]() {
|
||||
if (!*tooOld) {
|
||||
std::vector<KeyValue> expected = store.getRange(begin, end, store.size(), false);
|
||||
std::vector<fdb::KeyValue> expected = store.getRange(begin, end, store.size(), false);
|
||||
if (results->size() != expected.size()) {
|
||||
error(fmt::format("randomReadOp result size mismatch. expected: {} actual: {}",
|
||||
expected.size(),
|
||||
|
@ -86,8 +148,8 @@ private:
|
|||
error(fmt::format("randomReadOp key mismatch at {}/{}. expected: {} actual: {}",
|
||||
i,
|
||||
results->size(),
|
||||
expected[i].key,
|
||||
(*results)[i].key));
|
||||
fdb::toCharsRef(expected[i].key),
|
||||
fdb::toCharsRef((*results)[i].key)));
|
||||
}
|
||||
ASSERT((*results)[i].key == expected[i].key);
|
||||
|
||||
|
@ -96,9 +158,9 @@ private:
|
|||
"randomReadOp value mismatch at {}/{}. key: {} expected: {:.80} actual: {:.80}",
|
||||
i,
|
||||
results->size(),
|
||||
expected[i].key,
|
||||
expected[i].value,
|
||||
(*results)[i].value));
|
||||
fdb::toCharsRef(expected[i].key),
|
||||
fdb::toCharsRef(expected[i].value),
|
||||
fdb::toCharsRef((*results)[i].value)));
|
||||
}
|
||||
ASSERT((*results)[i].value == expected[i].value);
|
||||
}
|
||||
|
@ -108,19 +170,19 @@ private:
|
|||
}
|
||||
|
||||
void randomGetRangesOp(TTaskFct cont) {
|
||||
std::string begin = randomKeyName();
|
||||
std::string end = randomKeyName();
|
||||
auto results = std::make_shared<std::vector<KeyValue>>();
|
||||
fdb::Key begin = randomKeyName();
|
||||
fdb::Key end = randomKeyName();
|
||||
auto results = std::make_shared<std::vector<fdb::KeyRange>>();
|
||||
if (begin > end) {
|
||||
std::swap(begin, end);
|
||||
}
|
||||
execTransaction(
|
||||
[begin, end, results](auto ctx) {
|
||||
KeyRangesFuture f = ctx->tx()->getBlobGranuleRanges(begin, end);
|
||||
fdb::Future f = ctx->tx()->getBlobGranuleRanges(begin, end).eraseType();
|
||||
ctx->continueAfter(
|
||||
f,
|
||||
[ctx, f, results]() {
|
||||
(*results) = f.getKeyRanges();
|
||||
*results = f.get<fdb::future_var::KeyRangeArray>();
|
||||
ctx->done();
|
||||
},
|
||||
true);
|
||||
|
@ -128,18 +190,18 @@ private:
|
|||
[this, begin, end, results, cont]() {
|
||||
if (seenReadSuccess) {
|
||||
ASSERT(results->size() > 0);
|
||||
ASSERT(results->front().key <= begin);
|
||||
ASSERT(results->back().value >= end);
|
||||
ASSERT(results->front().beginKey <= begin);
|
||||
ASSERT(results->back().endKey >= end);
|
||||
}
|
||||
|
||||
for (int i = 0; i < results->size(); i++) {
|
||||
// no empty or inverted ranges
|
||||
ASSERT((*results)[i].key < (*results)[i].value);
|
||||
ASSERT((*results)[i].beginKey < (*results)[i].endKey);
|
||||
}
|
||||
|
||||
for (int i = 1; i < results->size(); i++) {
|
||||
// ranges contain entire requested key range
|
||||
ASSERT((*results)[i].key == (*results)[i - 1].value);
|
||||
ASSERT((*results)[i].beginKey == (*results)[i - 1].endKey);
|
||||
}
|
||||
|
||||
schedule(cont);
|
||||
|
@ -174,4 +236,4 @@ private:
|
|||
WorkloadFactory<ApiBlobGranuleCorrectnessWorkload> ApiBlobGranuleCorrectnessWorkloadFactory(
|
||||
"ApiBlobGranuleCorrectness");
|
||||
|
||||
} // namespace FdbApiTester
|
||||
} // namespace FdbApiTester
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
*/
|
||||
#include "TesterApiWorkload.h"
|
||||
#include "TesterUtil.h"
|
||||
#include "test/fdb_api.hpp"
|
||||
|
||||
namespace FdbApiTester {
|
||||
|
||||
|
@ -32,15 +33,15 @@ private:
|
|||
// Start multiple concurrent gets and cancel the transaction
|
||||
void randomCancelGetTx(TTaskFct cont) {
|
||||
int numKeys = Random::get().randomInt(1, maxKeysPerTransaction);
|
||||
auto keys = std::make_shared<std::vector<std::string>>();
|
||||
auto keys = std::make_shared<std::vector<fdb::Key>>();
|
||||
for (int i = 0; i < numKeys; i++) {
|
||||
keys->push_back(randomKey(readExistingKeysRatio));
|
||||
}
|
||||
execTransaction(
|
||||
[keys](auto ctx) {
|
||||
std::vector<Future> futures;
|
||||
std::vector<fdb::Future> futures;
|
||||
for (const auto& key : *keys) {
|
||||
futures.push_back(ctx->tx()->get(key, false));
|
||||
futures.push_back(ctx->tx()->get(key, false).eraseType());
|
||||
}
|
||||
ctx->done();
|
||||
},
|
||||
|
@ -50,24 +51,25 @@ private:
|
|||
// Start multiple concurrent gets and cancel the transaction after the first get returns
|
||||
void randomCancelAfterFirstResTx(TTaskFct cont) {
|
||||
int numKeys = Random::get().randomInt(1, maxKeysPerTransaction);
|
||||
auto keys = std::make_shared<std::vector<std::string>>();
|
||||
auto keys = std::make_shared<std::vector<fdb::Key>>();
|
||||
for (int i = 0; i < numKeys; i++) {
|
||||
keys->push_back(randomKey(readExistingKeysRatio));
|
||||
}
|
||||
execTransaction(
|
||||
[this, keys](auto ctx) {
|
||||
std::vector<ValueFuture> futures;
|
||||
std::vector<fdb::Future> futures;
|
||||
for (const auto& key : *keys) {
|
||||
futures.push_back(ctx->tx()->get(key, false));
|
||||
futures.push_back(ctx->tx()->get(key, false).eraseType());
|
||||
}
|
||||
for (int i = 0; i < keys->size(); i++) {
|
||||
ValueFuture f = futures[i];
|
||||
fdb::Future f = futures[i];
|
||||
auto expectedVal = store.get((*keys)[i]);
|
||||
ctx->continueAfter(f, [expectedVal, f, this, ctx]() {
|
||||
auto val = f.getValue();
|
||||
auto val = f.get<fdb::future_var::OptionalValue>();
|
||||
if (expectedVal != val) {
|
||||
error(fmt::format(
|
||||
"cancelAfterFirstResTx mismatch. expected: {:.80} actual: {:.80}", expectedVal, val));
|
||||
error(fmt::format("cancelAfterFirstResTx mismatch. expected: {:.80} actual: {:.80}",
|
||||
fdb::toCharsRef(expectedVal.value()),
|
||||
fdb::toCharsRef(val.value())));
|
||||
}
|
||||
ctx->done();
|
||||
});
|
||||
|
@ -91,4 +93,4 @@ private:
|
|||
|
||||
WorkloadFactory<CancelTransactionWorkload> MiscTestWorkloadFactory("CancelTransaction");
|
||||
|
||||
} // namespace FdbApiTester
|
||||
} // namespace FdbApiTester
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
*/
|
||||
#include "TesterApiWorkload.h"
|
||||
#include "TesterUtil.h"
|
||||
#include "test/fdb_api.hpp"
|
||||
#include <memory>
|
||||
#include <fmt/format.h>
|
||||
|
||||
|
@ -33,36 +34,36 @@ private:
|
|||
|
||||
void randomCommitReadOp(TTaskFct cont) {
|
||||
int numKeys = Random::get().randomInt(1, maxKeysPerTransaction);
|
||||
auto kvPairs = std::make_shared<std::vector<KeyValue>>();
|
||||
auto kvPairs = std::make_shared<std::vector<fdb::KeyValue>>();
|
||||
for (int i = 0; i < numKeys; i++) {
|
||||
kvPairs->push_back(KeyValue{ randomKey(readExistingKeysRatio), randomValue() });
|
||||
kvPairs->push_back(fdb::KeyValue{ randomKey(readExistingKeysRatio), randomValue() });
|
||||
}
|
||||
execTransaction(
|
||||
[kvPairs](auto ctx) {
|
||||
for (const KeyValue& kv : *kvPairs) {
|
||||
for (const fdb::KeyValue& kv : *kvPairs) {
|
||||
ctx->tx()->set(kv.key, kv.value);
|
||||
}
|
||||
ctx->commit();
|
||||
},
|
||||
[this, kvPairs, cont]() {
|
||||
for (const KeyValue& kv : *kvPairs) {
|
||||
for (const fdb::KeyValue& kv : *kvPairs) {
|
||||
store.set(kv.key, kv.value);
|
||||
}
|
||||
auto results = std::make_shared<std::vector<std::optional<std::string>>>();
|
||||
auto results = std::make_shared<std::vector<std::optional<fdb::Value>>>();
|
||||
execTransaction(
|
||||
[kvPairs, results, this](auto ctx) {
|
||||
if (apiVersion >= 710) {
|
||||
// Test GRV caching in 7.1 and later
|
||||
ctx->tx()->setOption(FDB_TR_OPTION_USE_GRV_CACHE);
|
||||
}
|
||||
auto futures = std::make_shared<std::vector<Future>>();
|
||||
auto futures = std::make_shared<std::vector<fdb::Future>>();
|
||||
for (const auto& kv : *kvPairs) {
|
||||
futures->push_back(ctx->tx()->get(kv.key, false));
|
||||
}
|
||||
ctx->continueAfterAll(*futures, [ctx, futures, results]() {
|
||||
results->clear();
|
||||
for (auto& f : *futures) {
|
||||
results->push_back(((ValueFuture&)f).getValue());
|
||||
results->push_back(f.get<fdb::future_var::OptionalValue>());
|
||||
}
|
||||
ASSERT(results->size() == futures->size());
|
||||
ctx->done();
|
||||
|
@ -76,9 +77,9 @@ private:
|
|||
if (actual != expected) {
|
||||
error(
|
||||
fmt::format("randomCommitReadOp mismatch. key: {} expected: {:.80} actual: {:.80}",
|
||||
(*kvPairs)[i].key,
|
||||
expected,
|
||||
actual));
|
||||
fdb::toCharsRef((*kvPairs)[i].key),
|
||||
fdb::toCharsRef(expected.value()),
|
||||
fdb::toCharsRef(actual.value())));
|
||||
ASSERT(false);
|
||||
}
|
||||
}
|
||||
|
@ -89,21 +90,21 @@ private:
|
|||
|
||||
void randomGetOp(TTaskFct cont) {
|
||||
int numKeys = Random::get().randomInt(1, maxKeysPerTransaction);
|
||||
auto keys = std::make_shared<std::vector<std::string>>();
|
||||
auto results = std::make_shared<std::vector<std::optional<std::string>>>();
|
||||
auto keys = std::make_shared<std::vector<fdb::Key>>();
|
||||
auto results = std::make_shared<std::vector<std::optional<fdb::Value>>>();
|
||||
for (int i = 0; i < numKeys; i++) {
|
||||
keys->push_back(randomKey(readExistingKeysRatio));
|
||||
}
|
||||
execTransaction(
|
||||
[keys, results](auto ctx) {
|
||||
auto futures = std::make_shared<std::vector<Future>>();
|
||||
auto futures = std::make_shared<std::vector<fdb::Future>>();
|
||||
for (const auto& key : *keys) {
|
||||
futures->push_back(ctx->tx()->get(key, false));
|
||||
}
|
||||
ctx->continueAfterAll(*futures, [ctx, futures, results]() {
|
||||
results->clear();
|
||||
for (auto& f : *futures) {
|
||||
results->push_back(((ValueFuture&)f).getValue());
|
||||
results->push_back(f.get<fdb::future_var::OptionalValue>());
|
||||
}
|
||||
ASSERT(results->size() == futures->size());
|
||||
ctx->done();
|
||||
|
@ -115,9 +116,9 @@ private:
|
|||
auto expected = store.get((*keys)[i]);
|
||||
if ((*results)[i] != expected) {
|
||||
error(fmt::format("randomGetOp mismatch. key: {} expected: {:.80} actual: {:.80}",
|
||||
(*keys)[i],
|
||||
expected,
|
||||
(*results)[i]));
|
||||
fdb::toCharsRef((*keys)[i]),
|
||||
fdb::toCharsRef(expected.value()),
|
||||
fdb::toCharsRef((*results)[i].value())));
|
||||
}
|
||||
}
|
||||
schedule(cont);
|
||||
|
@ -148,4 +149,4 @@ private:
|
|||
|
||||
WorkloadFactory<ApiCorrectnessWorkload> ApiCorrectnessWorkloadFactory("ApiCorrectness");
|
||||
|
||||
} // namespace FdbApiTester
|
||||
} // namespace FdbApiTester
|
||||
|
|
|
@ -23,26 +23,26 @@
|
|||
namespace FdbApiTester {
|
||||
|
||||
// Get the value associated with a key
|
||||
std::optional<std::string> KeyValueStore::get(std::string_view key) const {
|
||||
std::optional<fdb::Value> KeyValueStore::get(fdb::KeyRef key) const {
|
||||
std::unique_lock<std::mutex> lock(mutex);
|
||||
auto value = store.find(std::string(key));
|
||||
auto value = store.find(fdb::Key(key));
|
||||
if (value != store.end())
|
||||
return value->second;
|
||||
else
|
||||
return std::optional<std::string>();
|
||||
return std::optional<fdb::Value>();
|
||||
}
|
||||
|
||||
// Checks if the key exists
|
||||
bool KeyValueStore::exists(std::string_view key) {
|
||||
bool KeyValueStore::exists(fdb::KeyRef key) {
|
||||
std::unique_lock<std::mutex> lock(mutex);
|
||||
return (store.find(std::string(key)) != store.end());
|
||||
return (store.find(fdb::Key(key)) != store.end());
|
||||
}
|
||||
|
||||
// Returns the key designated by a key selector
|
||||
std::string KeyValueStore::getKey(std::string_view keyName, bool orEqual, int offset) const {
|
||||
fdb::Key KeyValueStore::getKey(fdb::KeyRef keyName, bool orEqual, int offset) const {
|
||||
std::unique_lock<std::mutex> lock(mutex);
|
||||
// Begin by getting the start key referenced by the key selector
|
||||
std::map<std::string, std::string>::const_iterator mapItr = store.lower_bound(keyName);
|
||||
std::map<fdb::Key, fdb::Value>::const_iterator mapItr = store.lower_bound(keyName);
|
||||
|
||||
// Update the iterator position if necessary based on the value of orEqual
|
||||
int count = 0;
|
||||
|
@ -88,28 +88,25 @@ std::string KeyValueStore::getKey(std::string_view keyName, bool orEqual, int of
|
|||
}
|
||||
|
||||
// Gets a range of key-value pairs, returning a maximum of <limit> results
|
||||
std::vector<KeyValue> KeyValueStore::getRange(std::string_view begin,
|
||||
std::string_view end,
|
||||
int limit,
|
||||
bool reverse) const {
|
||||
std::vector<fdb::KeyValue> KeyValueStore::getRange(fdb::KeyRef begin, fdb::KeyRef end, int limit, bool reverse) const {
|
||||
std::unique_lock<std::mutex> lock(mutex);
|
||||
std::vector<KeyValue> results;
|
||||
std::vector<fdb::KeyValue> results;
|
||||
if (!reverse) {
|
||||
std::map<std::string, std::string>::const_iterator mapItr = store.lower_bound(begin);
|
||||
std::map<fdb::Key, fdb::Value>::const_iterator mapItr = store.lower_bound(begin);
|
||||
|
||||
for (; mapItr != store.end() && mapItr->first < end && results.size() < limit; mapItr++)
|
||||
results.push_back(KeyValue{ mapItr->first, mapItr->second });
|
||||
results.push_back(fdb::KeyValue{ mapItr->first, mapItr->second });
|
||||
}
|
||||
|
||||
// Support for reverse getRange queries is supported, but not tested at this time. This is because reverse range
|
||||
// queries have been disallowed by the database at the API level
|
||||
else {
|
||||
std::map<std::string, std::string>::const_iterator mapItr = store.lower_bound(end);
|
||||
std::map<fdb::Key, fdb::Value>::const_iterator mapItr = store.lower_bound(end);
|
||||
if (mapItr == store.begin())
|
||||
return results;
|
||||
|
||||
for (--mapItr; mapItr->first >= begin && results.size() < abs(limit); mapItr--) {
|
||||
results.push_back(KeyValue{ mapItr->first, mapItr->second });
|
||||
results.push_back(fdb::KeyValue{ mapItr->first, mapItr->second });
|
||||
if (mapItr == store.begin())
|
||||
break;
|
||||
}
|
||||
|
@ -119,13 +116,13 @@ std::vector<KeyValue> KeyValueStore::getRange(std::string_view begin,
|
|||
}
|
||||
|
||||
// Stores a key-value pair in the database
|
||||
void KeyValueStore::set(std::string_view key, std::string_view value) {
|
||||
void KeyValueStore::set(fdb::KeyRef key, fdb::ValueRef value) {
|
||||
std::unique_lock<std::mutex> lock(mutex);
|
||||
store[std::string(key)] = value;
|
||||
store[fdb::Key(key)] = value;
|
||||
}
|
||||
|
||||
// Removes a key from the database
|
||||
void KeyValueStore::clear(std::string_view key) {
|
||||
void KeyValueStore::clear(fdb::KeyRef key) {
|
||||
std::unique_lock<std::mutex> lock(mutex);
|
||||
auto iter = store.find(key);
|
||||
if (iter != store.end()) {
|
||||
|
@ -134,7 +131,7 @@ void KeyValueStore::clear(std::string_view key) {
|
|||
}
|
||||
|
||||
// Removes a range of keys from the database
|
||||
void KeyValueStore::clear(std::string_view begin, std::string_view end) {
|
||||
void KeyValueStore::clear(fdb::KeyRef begin, fdb::KeyRef end) {
|
||||
std::unique_lock<std::mutex> lock(mutex);
|
||||
store.erase(store.lower_bound(begin), store.lower_bound(end));
|
||||
}
|
||||
|
@ -146,22 +143,22 @@ uint64_t KeyValueStore::size() const {
|
|||
}
|
||||
|
||||
// The first key in the database; returned by key selectors that choose a key off the front
|
||||
std::string KeyValueStore::startKey() const {
|
||||
return "";
|
||||
fdb::Key KeyValueStore::startKey() const {
|
||||
return fdb::Key();
|
||||
}
|
||||
|
||||
// The last key in the database; returned by key selectors that choose a key off the back
|
||||
std::string KeyValueStore::endKey() const {
|
||||
return "\xff";
|
||||
fdb::Key KeyValueStore::endKey() const {
|
||||
return fdb::Key(1, '\xff');
|
||||
}
|
||||
|
||||
// Debugging function that prints all key-value pairs
|
||||
void KeyValueStore::printContents() const {
|
||||
std::unique_lock<std::mutex> lock(mutex);
|
||||
printf("Contents:\n");
|
||||
std::map<std::string, std::string>::const_iterator mapItr;
|
||||
std::map<fdb::Key, fdb::Value>::const_iterator mapItr;
|
||||
for (mapItr = store.begin(); mapItr != store.end(); mapItr++)
|
||||
printf("%s\n", mapItr->first.c_str());
|
||||
}
|
||||
|
||||
} // namespace FdbApiTester
|
||||
} // namespace FdbApiTester
|
||||
|
|
|
@ -37,44 +37,44 @@ namespace FdbApiTester {
|
|||
class KeyValueStore {
|
||||
public:
|
||||
// Get the value associated with a key
|
||||
std::optional<std::string> get(std::string_view key) const;
|
||||
std::optional<fdb::Value> get(fdb::KeyRef key) const;
|
||||
|
||||
// Checks if the key exists
|
||||
bool exists(std::string_view key);
|
||||
bool exists(fdb::KeyRef key);
|
||||
|
||||
// Returns the key designated by a key selector
|
||||
std::string getKey(std::string_view keyName, bool orEqual, int offset) const;
|
||||
fdb::Key getKey(fdb::KeyRef keyName, bool orEqual, int offset) const;
|
||||
|
||||
// Gets a range of key-value pairs, returning a maximum of <limit> results
|
||||
std::vector<KeyValue> getRange(std::string_view begin, std::string_view end, int limit, bool reverse) const;
|
||||
std::vector<fdb::KeyValue> getRange(fdb::KeyRef begin, fdb::KeyRef end, int limit, bool reverse) const;
|
||||
|
||||
// Stores a key-value pair in the database
|
||||
void set(std::string_view key, std::string_view value);
|
||||
void set(fdb::KeyRef key, fdb::ValueRef value);
|
||||
|
||||
// Removes a key from the database
|
||||
void clear(std::string_view key);
|
||||
void clear(fdb::KeyRef key);
|
||||
|
||||
// Removes a range of keys from the database
|
||||
void clear(std::string_view begin, std::string_view end);
|
||||
void clear(fdb::KeyRef begin, fdb::KeyRef end);
|
||||
|
||||
// The number of keys in the database
|
||||
uint64_t size() const;
|
||||
|
||||
// The first key in the database; returned by key selectors that choose a key off the front
|
||||
std::string startKey() const;
|
||||
fdb::Key startKey() const;
|
||||
|
||||
// The last key in the database; returned by key selectors that choose a key off the back
|
||||
std::string endKey() const;
|
||||
fdb::Key endKey() const;
|
||||
|
||||
// Debugging function that prints all key-value pairs
|
||||
void printContents() const;
|
||||
|
||||
private:
|
||||
// A map holding the key-value pairs
|
||||
std::map<std::string, std::string, std::less<>> store;
|
||||
std::map<fdb::Key, fdb::Value, std::less<>> store;
|
||||
mutable std::mutex mutex;
|
||||
};
|
||||
|
||||
} // namespace FdbApiTester
|
||||
|
||||
#endif
|
||||
#endif
|
||||
|
|
|
@ -22,6 +22,7 @@
|
|||
#include "TesterUtil.h"
|
||||
#include "foundationdb/fdb_c_types.h"
|
||||
#include "test/apitester/TesterScheduler.h"
|
||||
#include "test/fdb_api.hpp"
|
||||
#include <memory>
|
||||
#include <stdexcept>
|
||||
#include <unordered_map>
|
||||
|
@ -36,24 +37,24 @@ namespace FdbApiTester {
|
|||
constexpr int LONG_WAIT_TIME_US = 2000000;
|
||||
constexpr int LARGE_NUMBER_OF_RETRIES = 10;
|
||||
|
||||
void TransactionActorBase::complete(fdb_error_t err) {
|
||||
void TransactionActorBase::complete(fdb::Error err) {
|
||||
error = err;
|
||||
context = {};
|
||||
}
|
||||
|
||||
void ITransactionContext::continueAfterAll(std::vector<Future> futures, TTaskFct cont) {
|
||||
void ITransactionContext::continueAfterAll(std::vector<fdb::Future> futures, TTaskFct cont) {
|
||||
auto counter = std::make_shared<std::atomic<int>>(futures.size());
|
||||
auto errorCode = std::make_shared<std::atomic<fdb_error_t>>(error_code_success);
|
||||
auto errorCode = std::make_shared<std::atomic<fdb::Error>>(fdb::Error::success());
|
||||
auto thisPtr = shared_from_this();
|
||||
for (auto& f : futures) {
|
||||
continueAfter(
|
||||
f,
|
||||
[thisPtr, f, counter, errorCode, cont]() {
|
||||
if (f.getError() != error_code_success) {
|
||||
(*errorCode) = f.getError();
|
||||
if (f.error().code() != error_code_success) {
|
||||
(*errorCode) = f.error();
|
||||
}
|
||||
if (--(*counter) == 0) {
|
||||
if (*errorCode == error_code_success) {
|
||||
if (errorCode->load().code() == error_code_success) {
|
||||
// all futures successful -> continue
|
||||
cont();
|
||||
} else {
|
||||
|
@ -71,7 +72,7 @@ void ITransactionContext::continueAfterAll(std::vector<Future> futures, TTaskFct
|
|||
*/
|
||||
class TransactionContextBase : public ITransactionContext {
|
||||
public:
|
||||
TransactionContextBase(FDBTransaction* tx,
|
||||
TransactionContextBase(fdb::Transaction tx,
|
||||
std::shared_ptr<ITransactionActor> txActor,
|
||||
TTaskFct cont,
|
||||
IScheduler* scheduler,
|
||||
|
@ -84,10 +85,12 @@ public:
|
|||
// IN_PROGRESS -> (ON_ERROR -> IN_PROGRESS)* [-> ON_ERROR] -> DONE
|
||||
enum class TxState { IN_PROGRESS, ON_ERROR, DONE };
|
||||
|
||||
Transaction* tx() override { return &fdbTx; }
|
||||
fdb::Transaction* tx() override { return &fdbTx; }
|
||||
|
||||
// Set a continuation to be executed when a future gets ready
|
||||
void continueAfter(Future f, TTaskFct cont, bool retryOnError) override { doContinueAfter(f, cont, retryOnError); }
|
||||
void continueAfter(fdb::Future f, TTaskFct cont, bool retryOnError) override {
|
||||
doContinueAfter(f, cont, retryOnError);
|
||||
}
|
||||
|
||||
// Complete the transaction with a commit
|
||||
void commit() override {
|
||||
|
@ -97,7 +100,7 @@ public:
|
|||
}
|
||||
commitCalled = true;
|
||||
lock.unlock();
|
||||
Future f = fdbTx.commit();
|
||||
fdb::Future f = fdbTx.commit();
|
||||
auto thisRef = shared_from_this();
|
||||
doContinueAfter(
|
||||
f, [thisRef]() { thisRef->done(); }, true);
|
||||
|
@ -114,12 +117,12 @@ public:
|
|||
if (retriedErrors.size() >= LARGE_NUMBER_OF_RETRIES) {
|
||||
fmt::print("Transaction succeeded after {} retries on errors: {}\n",
|
||||
retriedErrors.size(),
|
||||
fmt::join(retriedErrors, ", "));
|
||||
fmt::join(retriedErrorCodes(), ", "));
|
||||
}
|
||||
// cancel transaction so that any pending operations on it
|
||||
// fail gracefully
|
||||
fdbTx.cancel();
|
||||
txActor->complete(error_code_success);
|
||||
txActor->complete(fdb::Error::success());
|
||||
cleanUp();
|
||||
contAfterDone();
|
||||
}
|
||||
|
@ -127,7 +130,7 @@ public:
|
|||
std::string getBGBasePath() override { return bgBasePath; }
|
||||
|
||||
protected:
|
||||
virtual void doContinueAfter(Future f, TTaskFct cont, bool retryOnError) = 0;
|
||||
virtual void doContinueAfter(fdb::Future f, TTaskFct cont, bool retryOnError) = 0;
|
||||
|
||||
// Clean up transaction state after completing the transaction
|
||||
// Note that the object may live longer, because it is referenced
|
||||
|
@ -139,8 +142,8 @@ protected:
|
|||
}
|
||||
|
||||
// Complete the transaction with an (unretriable) error
|
||||
void transactionFailed(fdb_error_t err) {
|
||||
ASSERT(err != error_code_success);
|
||||
void transactionFailed(fdb::Error err) {
|
||||
ASSERT(err);
|
||||
std::unique_lock<std::mutex> lock(mutex);
|
||||
if (txState == TxState::DONE) {
|
||||
return;
|
||||
|
@ -155,7 +158,7 @@ protected:
|
|||
// Handle result of an a transaction onError call
|
||||
void handleOnErrorResult() {
|
||||
ASSERT(txState == TxState::ON_ERROR);
|
||||
fdb_error_t err = onErrorFuture.getError();
|
||||
fdb::Error err = onErrorFuture.error();
|
||||
onErrorFuture = {};
|
||||
if (err) {
|
||||
transactionFailed(err);
|
||||
|
@ -169,24 +172,32 @@ protected:
|
|||
}
|
||||
|
||||
// Checks if a transaction can be retried. Fails the transaction if the check fails
|
||||
bool canRetry(fdb_error_t lastErr) {
|
||||
bool canRetry(fdb::Error lastErr) {
|
||||
ASSERT(txState == TxState::ON_ERROR);
|
||||
retriedErrors.push_back(lastErr);
|
||||
if (retryLimit == 0 || retriedErrors.size() <= retryLimit) {
|
||||
if (retriedErrors.size() == LARGE_NUMBER_OF_RETRIES) {
|
||||
fmt::print("Transaction already retried {} times, on errors: {}\n",
|
||||
retriedErrors.size(),
|
||||
fmt::join(retriedErrors, ", "));
|
||||
fmt::join(retriedErrorCodes(), ", "));
|
||||
}
|
||||
return true;
|
||||
}
|
||||
fmt::print("Transaction retry limit reached. Retried on errors: {}\n", fmt::join(retriedErrors, ", "));
|
||||
fmt::print("Transaction retry limit reached. Retried on errors: {}\n", fmt::join(retriedErrorCodes(), ", "));
|
||||
transactionFailed(lastErr);
|
||||
return false;
|
||||
}
|
||||
|
||||
std::vector<fdb::Error::CodeType> retriedErrorCodes() {
|
||||
std::vector<fdb::Error::CodeType> retriedErrorCodes;
|
||||
for (auto e : retriedErrors) {
|
||||
retriedErrorCodes.push_back(e.code());
|
||||
}
|
||||
return retriedErrorCodes;
|
||||
}
|
||||
|
||||
// FDB transaction
|
||||
Transaction fdbTx;
|
||||
fdb::Transaction fdbTx;
|
||||
|
||||
// Actor implementing the transaction worklflow
|
||||
std::shared_ptr<ITransactionActor> txActor;
|
||||
|
@ -207,10 +218,10 @@ protected:
|
|||
TxState txState;
|
||||
|
||||
// onError future used in ON_ERROR state
|
||||
Future onErrorFuture;
|
||||
fdb::Future onErrorFuture;
|
||||
|
||||
// The error code on which onError was called
|
||||
fdb_error_t onErrorArg;
|
||||
fdb::Error onErrorArg;
|
||||
|
||||
// The time point of calling onError
|
||||
TimePoint onErrorCallTimePoint;
|
||||
|
@ -219,7 +230,7 @@ protected:
|
|||
bool commitCalled;
|
||||
|
||||
// A history of errors on which the transaction was retried
|
||||
std::vector<fdb_error_t> retriedErrors;
|
||||
std::vector<fdb::Error> retriedErrors;
|
||||
|
||||
// blob granule base path
|
||||
std::string bgBasePath;
|
||||
|
@ -230,7 +241,7 @@ protected:
|
|||
*/
|
||||
class BlockingTransactionContext : public TransactionContextBase {
|
||||
public:
|
||||
BlockingTransactionContext(FDBTransaction* tx,
|
||||
BlockingTransactionContext(fdb::Transaction tx,
|
||||
std::shared_ptr<ITransactionActor> txActor,
|
||||
TTaskFct cont,
|
||||
IScheduler* scheduler,
|
||||
|
@ -239,37 +250,37 @@ public:
|
|||
: TransactionContextBase(tx, txActor, cont, scheduler, retryLimit, bgBasePath) {}
|
||||
|
||||
protected:
|
||||
void doContinueAfter(Future f, TTaskFct cont, bool retryOnError) override {
|
||||
void doContinueAfter(fdb::Future f, TTaskFct cont, bool retryOnError) override {
|
||||
auto thisRef = std::static_pointer_cast<BlockingTransactionContext>(shared_from_this());
|
||||
scheduler->schedule(
|
||||
[thisRef, f, cont, retryOnError]() mutable { thisRef->blockingContinueAfter(f, cont, retryOnError); });
|
||||
}
|
||||
|
||||
void blockingContinueAfter(Future f, TTaskFct cont, bool retryOnError) {
|
||||
void blockingContinueAfter(fdb::Future f, TTaskFct cont, bool retryOnError) {
|
||||
std::unique_lock<std::mutex> lock(mutex);
|
||||
if (txState != TxState::IN_PROGRESS) {
|
||||
return;
|
||||
}
|
||||
lock.unlock();
|
||||
auto start = timeNow();
|
||||
fdb_error_t err = fdb_future_block_until_ready(f.fdbFuture());
|
||||
fdb::Error err = f.blockUntilReady();
|
||||
if (err) {
|
||||
transactionFailed(err);
|
||||
return;
|
||||
}
|
||||
err = f.getError();
|
||||
err = f.error();
|
||||
auto waitTimeUs = timeElapsedInUs(start);
|
||||
if (waitTimeUs > LONG_WAIT_TIME_US) {
|
||||
fmt::print("Long waiting time on a future: {:.3f}s, return code {} ({}), commit called: {}\n",
|
||||
microsecToSec(waitTimeUs),
|
||||
err,
|
||||
fdb_get_error(err),
|
||||
err.code(),
|
||||
err.what(),
|
||||
commitCalled);
|
||||
}
|
||||
if (err == error_code_transaction_cancelled) {
|
||||
if (err.code() == error_code_transaction_cancelled) {
|
||||
return;
|
||||
}
|
||||
if (err == error_code_success || !retryOnError) {
|
||||
if (err.code() == error_code_success || !retryOnError) {
|
||||
scheduler->schedule([cont]() { cont(); });
|
||||
return;
|
||||
}
|
||||
|
@ -277,7 +288,7 @@ protected:
|
|||
onError(err);
|
||||
}
|
||||
|
||||
virtual void onError(fdb_error_t err) override {
|
||||
virtual void onError(fdb::Error err) override {
|
||||
std::unique_lock<std::mutex> lock(mutex);
|
||||
if (txState != TxState::IN_PROGRESS) {
|
||||
// Ignore further errors, if the transaction is in the error handing mode or completed
|
||||
|
@ -295,19 +306,19 @@ protected:
|
|||
onErrorArg = err;
|
||||
|
||||
auto start = timeNow();
|
||||
fdb_error_t err2 = fdb_future_block_until_ready(onErrorFuture.fdbFuture());
|
||||
fdb::Error err2 = onErrorFuture.blockUntilReady();
|
||||
if (err2) {
|
||||
transactionFailed(err2);
|
||||
return;
|
||||
}
|
||||
auto waitTimeUs = timeElapsedInUs(start);
|
||||
if (waitTimeUs > LONG_WAIT_TIME_US) {
|
||||
fdb_error_t err3 = onErrorFuture.getError();
|
||||
fdb::Error err3 = onErrorFuture.error();
|
||||
fmt::print("Long waiting time on onError({}) future: {:.3f}s, return code {} ({})\n",
|
||||
onErrorArg,
|
||||
onErrorArg.code(),
|
||||
microsecToSec(waitTimeUs),
|
||||
err3,
|
||||
fdb_get_error(err3));
|
||||
err3.code(),
|
||||
err3.what());
|
||||
}
|
||||
auto thisRef = std::static_pointer_cast<BlockingTransactionContext>(shared_from_this());
|
||||
scheduler->schedule([thisRef]() { thisRef->handleOnErrorResult(); });
|
||||
|
@ -319,7 +330,7 @@ protected:
|
|||
*/
|
||||
class AsyncTransactionContext : public TransactionContextBase {
|
||||
public:
|
||||
AsyncTransactionContext(FDBTransaction* tx,
|
||||
AsyncTransactionContext(fdb::Transaction tx,
|
||||
std::shared_ptr<ITransactionActor> txActor,
|
||||
TTaskFct cont,
|
||||
IScheduler* scheduler,
|
||||
|
@ -328,23 +339,24 @@ public:
|
|||
: TransactionContextBase(tx, txActor, cont, scheduler, retryLimit, bgBasePath) {}
|
||||
|
||||
protected:
|
||||
void doContinueAfter(Future f, TTaskFct cont, bool retryOnError) override {
|
||||
void doContinueAfter(fdb::Future f, TTaskFct cont, bool retryOnError) override {
|
||||
std::unique_lock<std::mutex> lock(mutex);
|
||||
if (txState != TxState::IN_PROGRESS) {
|
||||
return;
|
||||
}
|
||||
callbackMap[f.fdbFuture()] = CallbackInfo{ f, cont, shared_from_this(), retryOnError, timeNow() };
|
||||
callbackMap[f] = CallbackInfo{ f, cont, shared_from_this(), retryOnError, timeNow() };
|
||||
lock.unlock();
|
||||
fdb_error_t err = fdb_future_set_callback(f.fdbFuture(), futureReadyCallback, this);
|
||||
if (err) {
|
||||
try {
|
||||
f.then([this](fdb::Future f) { futureReadyCallback(f, this); });
|
||||
} catch (std::runtime_error& err) {
|
||||
lock.lock();
|
||||
callbackMap.erase(f.fdbFuture());
|
||||
callbackMap.erase(f);
|
||||
lock.unlock();
|
||||
transactionFailed(err);
|
||||
transactionFailed(fdb::Error(error_code_operation_failed));
|
||||
}
|
||||
}
|
||||
|
||||
static void futureReadyCallback(FDBFuture* f, void* param) {
|
||||
static void futureReadyCallback(fdb::Future f, void* param) {
|
||||
try {
|
||||
AsyncTransactionContext* txCtx = (AsyncTransactionContext*)param;
|
||||
txCtx->onFutureReady(f);
|
||||
|
@ -357,7 +369,7 @@ protected:
|
|||
}
|
||||
}
|
||||
|
||||
void onFutureReady(FDBFuture* f) {
|
||||
void onFutureReady(fdb::Future f) {
|
||||
auto endTime = timeNow();
|
||||
injectRandomSleep();
|
||||
// Hold a reference to this to avoid it to be
|
||||
|
@ -372,25 +384,25 @@ protected:
|
|||
return;
|
||||
}
|
||||
lock.unlock();
|
||||
fdb_error_t err = fdb_future_get_error(f);
|
||||
fdb::Error err = f.error();
|
||||
auto waitTimeUs = timeElapsedInUs(cbInfo.startTime, endTime);
|
||||
if (waitTimeUs > LONG_WAIT_TIME_US) {
|
||||
fmt::print("Long waiting time on a future: {:.3f}s, return code {} ({})\n",
|
||||
microsecToSec(waitTimeUs),
|
||||
err,
|
||||
fdb_get_error(err));
|
||||
err.code(),
|
||||
err.what());
|
||||
}
|
||||
if (err == error_code_transaction_cancelled) {
|
||||
if (err.code() == error_code_transaction_cancelled) {
|
||||
return;
|
||||
}
|
||||
if (err == error_code_success || !cbInfo.retryOnError) {
|
||||
if (err.code() == error_code_success || !cbInfo.retryOnError) {
|
||||
scheduler->schedule(cbInfo.cont);
|
||||
return;
|
||||
}
|
||||
onError(err);
|
||||
}
|
||||
|
||||
virtual void onError(fdb_error_t err) override {
|
||||
virtual void onError(fdb::Error err) override {
|
||||
std::unique_lock<std::mutex> lock(mutex);
|
||||
if (txState != TxState::IN_PROGRESS) {
|
||||
// Ignore further errors, if the transaction is in the error handing mode or completed
|
||||
|
@ -408,14 +420,15 @@ protected:
|
|||
onErrorFuture = tx()->onError(err);
|
||||
onErrorCallTimePoint = timeNow();
|
||||
onErrorThisRef = std::static_pointer_cast<AsyncTransactionContext>(shared_from_this());
|
||||
fdb_error_t err2 = fdb_future_set_callback(onErrorFuture.fdbFuture(), onErrorReadyCallback, this);
|
||||
if (err2) {
|
||||
try {
|
||||
onErrorFuture.then([this](fdb::Future f) { onErrorReadyCallback(f, this); });
|
||||
} catch (...) {
|
||||
onErrorFuture = {};
|
||||
transactionFailed(err2);
|
||||
transactionFailed(fdb::Error(error_code_operation_failed));
|
||||
}
|
||||
}
|
||||
|
||||
static void onErrorReadyCallback(FDBFuture* f, void* param) {
|
||||
static void onErrorReadyCallback(fdb::Future f, void* param) {
|
||||
try {
|
||||
AsyncTransactionContext* txCtx = (AsyncTransactionContext*)param;
|
||||
txCtx->onErrorReady(f);
|
||||
|
@ -428,15 +441,15 @@ protected:
|
|||
}
|
||||
}
|
||||
|
||||
void onErrorReady(FDBFuture* f) {
|
||||
void onErrorReady(fdb::Future f) {
|
||||
auto waitTimeUs = timeElapsedInUs(onErrorCallTimePoint);
|
||||
if (waitTimeUs > LONG_WAIT_TIME_US) {
|
||||
fdb_error_t err = onErrorFuture.getError();
|
||||
fdb::Error err = onErrorFuture.error();
|
||||
fmt::print("Long waiting time on onError({}): {:.3f}s, return code {} ({})\n",
|
||||
onErrorArg,
|
||||
onErrorArg.code(),
|
||||
microsecToSec(waitTimeUs),
|
||||
err,
|
||||
fdb_get_error(err));
|
||||
err.code(),
|
||||
err.what());
|
||||
}
|
||||
injectRandomSleep();
|
||||
auto thisRef = onErrorThisRef;
|
||||
|
@ -450,7 +463,7 @@ protected:
|
|||
// Cancel all pending operations
|
||||
// Note that the callbacks of the cancelled futures will still be called
|
||||
std::unique_lock<std::mutex> lock(mutex);
|
||||
std::vector<Future> futures;
|
||||
std::vector<fdb::Future> futures;
|
||||
for (auto& iter : callbackMap) {
|
||||
futures.push_back(iter.second.future);
|
||||
}
|
||||
|
@ -469,7 +482,7 @@ protected:
|
|||
|
||||
// Object references for a future callback
|
||||
struct CallbackInfo {
|
||||
Future future;
|
||||
fdb::Future future;
|
||||
TTaskFct cont;
|
||||
std::shared_ptr<ITransactionContext> thisRef;
|
||||
bool retryOnError;
|
||||
|
@ -477,7 +490,7 @@ protected:
|
|||
};
|
||||
|
||||
// Map for keeping track of future waits and holding necessary object references
|
||||
std::unordered_map<FDBFuture*, CallbackInfo> callbackMap;
|
||||
std::unordered_map<fdb::Future, CallbackInfo, fdb::FutureHash, fdb::FutureEquals> callbackMap;
|
||||
|
||||
// Holding reference to this for onError future C callback
|
||||
std::shared_ptr<AsyncTransactionContext> onErrorThisRef;
|
||||
|
@ -498,13 +511,9 @@ public:
|
|||
|
||||
protected:
|
||||
// Execute the transaction on the given database instance
|
||||
void executeOnDatabase(FDBDatabase* db, std::shared_ptr<ITransactionActor> txActor, TTaskFct cont) {
|
||||
FDBTransaction* tx;
|
||||
fdb_error_t err = fdb_database_create_transaction(db, &tx);
|
||||
if (err != error_code_success) {
|
||||
txActor->complete(err);
|
||||
cont();
|
||||
} else {
|
||||
void executeOnDatabase(fdb::Database db, std::shared_ptr<ITransactionActor> txActor, TTaskFct cont) {
|
||||
try {
|
||||
fdb::Transaction tx = db.createTransaction();
|
||||
std::shared_ptr<ITransactionContext> ctx;
|
||||
if (options.blockOnFutures) {
|
||||
ctx = std::make_shared<BlockingTransactionContext>(
|
||||
|
@ -515,6 +524,9 @@ protected:
|
|||
}
|
||||
txActor->init(ctx);
|
||||
txActor->start();
|
||||
} catch (...) {
|
||||
txActor->complete(fdb::Error(error_code_operation_failed));
|
||||
cont();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -537,14 +549,7 @@ public:
|
|||
void init(IScheduler* scheduler, const char* clusterFile, const std::string& bgBasePath) override {
|
||||
TransactionExecutorBase::init(scheduler, clusterFile, bgBasePath);
|
||||
for (int i = 0; i < options.numDatabases; i++) {
|
||||
FDBDatabase* db;
|
||||
fdb_error_t err = fdb_create_database(clusterFile, &db);
|
||||
if (err != error_code_success) {
|
||||
throw TesterError(fmt::format("Failed create database with the cluster file '{}'. Error: {}({})",
|
||||
clusterFile,
|
||||
err,
|
||||
fdb_get_error(err)));
|
||||
}
|
||||
fdb::Database db(clusterFile);
|
||||
databases.push_back(db);
|
||||
}
|
||||
}
|
||||
|
@ -554,14 +559,10 @@ public:
|
|||
executeOnDatabase(databases[idx], txActor, cont);
|
||||
}
|
||||
|
||||
void release() {
|
||||
for (FDBDatabase* db : databases) {
|
||||
fdb_database_destroy(db);
|
||||
}
|
||||
}
|
||||
void release() { databases.clear(); }
|
||||
|
||||
private:
|
||||
std::vector<FDBDatabase*> databases;
|
||||
std::vector<fdb::Database> databases;
|
||||
};
|
||||
|
||||
/**
|
||||
|
@ -572,16 +573,8 @@ public:
|
|||
DBPerTransactionExecutor(const TransactionExecutorOptions& options) : TransactionExecutorBase(options) {}
|
||||
|
||||
void execute(std::shared_ptr<ITransactionActor> txActor, TTaskFct cont) override {
|
||||
FDBDatabase* db = nullptr;
|
||||
fdb_error_t err = fdb_create_database(clusterFile.c_str(), &db);
|
||||
if (err != error_code_success) {
|
||||
txActor->complete(err);
|
||||
cont();
|
||||
}
|
||||
executeOnDatabase(db, txActor, [cont, db]() {
|
||||
fdb_database_destroy(db);
|
||||
cont();
|
||||
});
|
||||
fdb::Database db(clusterFile.c_str());
|
||||
executeOnDatabase(db, txActor, cont);
|
||||
}
|
||||
};
|
||||
|
||||
|
@ -593,4 +586,4 @@ std::unique_ptr<ITransactionExecutor> createTransactionExecutor(const Transactio
|
|||
}
|
||||
}
|
||||
|
||||
} // namespace FdbApiTester
|
||||
} // namespace FdbApiTester
|
||||
|
|
|
@ -23,8 +23,8 @@
|
|||
#ifndef APITESTER_TRANSACTION_EXECUTOR_H
|
||||
#define APITESTER_TRANSACTION_EXECUTOR_H
|
||||
|
||||
#include "test/fdb_api.hpp"
|
||||
#include "TesterOptions.h"
|
||||
#include "TesterApiWrapper.h"
|
||||
#include "TesterScheduler.h"
|
||||
#include <string_view>
|
||||
#include <memory>
|
||||
|
@ -39,18 +39,18 @@ public:
|
|||
virtual ~ITransactionContext() {}
|
||||
|
||||
// Current FDB transaction
|
||||
virtual Transaction* tx() = 0;
|
||||
virtual fdb::Transaction* tx() = 0;
|
||||
|
||||
// Schedule a continuation to be executed when the future gets ready
|
||||
// retryOnError controls whether transaction is retried in case of an error instead
|
||||
// of calling the continuation
|
||||
virtual void continueAfter(Future f, TTaskFct cont, bool retryOnError = true) = 0;
|
||||
virtual void continueAfter(fdb::Future f, TTaskFct cont, bool retryOnError = true) = 0;
|
||||
|
||||
// Complete the transaction with a commit
|
||||
virtual void commit() = 0;
|
||||
|
||||
// retry transaction on error
|
||||
virtual void onError(fdb_error_t err) = 0;
|
||||
virtual void onError(fdb::Error err) = 0;
|
||||
|
||||
// Mark the transaction as completed without committing it (for read transactions)
|
||||
virtual void done() = 0;
|
||||
|
@ -59,7 +59,7 @@ public:
|
|||
virtual std::string getBGBasePath() = 0;
|
||||
|
||||
// A continuation to be executed when all of the given futures get ready
|
||||
virtual void continueAfterAll(std::vector<Future> futures, TTaskFct cont);
|
||||
virtual void continueAfterAll(std::vector<fdb::Future> futures, TTaskFct cont);
|
||||
};
|
||||
|
||||
/**
|
||||
|
@ -76,10 +76,10 @@ public:
|
|||
virtual void start() = 0;
|
||||
|
||||
// Transaction completion result (error_code_success in case of success)
|
||||
virtual fdb_error_t getErrorCode() = 0;
|
||||
virtual fdb::Error getError() = 0;
|
||||
|
||||
// Notification about the completion of the transaction
|
||||
virtual void complete(fdb_error_t err) = 0;
|
||||
virtual void complete(fdb::Error err) = 0;
|
||||
};
|
||||
|
||||
/**
|
||||
|
@ -88,15 +88,15 @@ public:
|
|||
class TransactionActorBase : public ITransactionActor {
|
||||
public:
|
||||
void init(std::shared_ptr<ITransactionContext> ctx) override { context = ctx; }
|
||||
fdb_error_t getErrorCode() override { return error; }
|
||||
void complete(fdb_error_t err) override;
|
||||
fdb::Error getError() override { return error; }
|
||||
void complete(fdb::Error err) override;
|
||||
|
||||
protected:
|
||||
std::shared_ptr<ITransactionContext> ctx() { return context; }
|
||||
|
||||
private:
|
||||
std::shared_ptr<ITransactionContext> context;
|
||||
fdb_error_t error = error_code_success;
|
||||
fdb::Error error = fdb::Error::success();
|
||||
};
|
||||
|
||||
// Type of the lambda functions implementing a transaction
|
||||
|
@ -148,4 +148,4 @@ std::unique_ptr<ITransactionExecutor> createTransactionExecutor(const Transactio
|
|||
|
||||
} // namespace FdbApiTester
|
||||
|
||||
#endif
|
||||
#endif
|
||||
|
|
|
@ -26,8 +26,8 @@
|
|||
|
||||
namespace FdbApiTester {
|
||||
|
||||
std::string lowerCase(const std::string& str) {
|
||||
std::string res = str;
|
||||
fdb::ByteString lowerCase(fdb::BytesRef str) {
|
||||
fdb::ByteString res(str);
|
||||
std::transform(res.begin(), res.end(), res.begin(), ::tolower);
|
||||
return res;
|
||||
}
|
||||
|
@ -46,9 +46,9 @@ Random& Random::get() {
|
|||
return random;
|
||||
}
|
||||
|
||||
std::string Random::randomStringLowerCase(int minLength, int maxLength) {
|
||||
fdb::ByteString Random::randomStringLowerCase(int minLength, int maxLength) {
|
||||
int length = randomInt(minLength, maxLength);
|
||||
std::string str;
|
||||
fdb::ByteString str;
|
||||
str.reserve(length);
|
||||
for (int i = 0; i < length; i++) {
|
||||
str += (char)randomInt('a', 'z');
|
||||
|
|
|
@ -29,6 +29,8 @@
|
|||
#include <fmt/format.h>
|
||||
#include <chrono>
|
||||
|
||||
#include "test/fdb_api.hpp"
|
||||
|
||||
namespace fmt {
|
||||
|
||||
// fmt::format formatting for std::optional<T>
|
||||
|
@ -49,12 +51,7 @@ struct formatter<std::optional<T>> : fmt::formatter<T> {
|
|||
|
||||
namespace FdbApiTester {
|
||||
|
||||
struct KeyValue {
|
||||
std::string key;
|
||||
std::string value;
|
||||
};
|
||||
|
||||
std::string lowerCase(const std::string& str);
|
||||
fdb::ByteString lowerCase(fdb::BytesRef str);
|
||||
|
||||
class Random {
|
||||
public:
|
||||
|
@ -64,7 +61,7 @@ public:
|
|||
|
||||
int randomInt(int min, int max);
|
||||
|
||||
std::string randomStringLowerCase(int minLength, int maxLength);
|
||||
fdb::ByteString randomStringLowerCase(int minLength, int maxLength);
|
||||
|
||||
bool randomBool(double trueRatio);
|
||||
|
||||
|
@ -112,4 +109,4 @@ static inline double microsecToSec(TimeDuration timeUs) {
|
|||
|
||||
} // namespace FdbApiTester
|
||||
|
||||
#endif
|
||||
#endif
|
||||
|
|
|
@ -66,7 +66,7 @@ bool WorkloadConfig::getBoolOption(const std::string& name, bool defaultVal) con
|
|||
if (iter == options.end()) {
|
||||
return defaultVal;
|
||||
} else {
|
||||
std::string val = lowerCase(iter->second);
|
||||
std::string val(fdb::toCharsRef(lowerCase(fdb::toBytesRef(iter->second))));
|
||||
if (val == "true") {
|
||||
return true;
|
||||
} else if (val == "false") {
|
||||
|
@ -111,11 +111,11 @@ void WorkloadBase::execTransaction(std::shared_ptr<ITransactionActor> tx, TTaskF
|
|||
tasksScheduled++;
|
||||
manager->txExecutor->execute(tx, [this, tx, cont, failOnError]() {
|
||||
numTxCompleted++;
|
||||
fdb_error_t err = tx->getErrorCode();
|
||||
if (tx->getErrorCode() == error_code_success) {
|
||||
fdb::Error err = tx->getError();
|
||||
if (err.code() == error_code_success) {
|
||||
cont();
|
||||
} else {
|
||||
std::string msg = fmt::format("Transaction failed with error: {} ({})", err, fdb_get_error(err));
|
||||
std::string msg = fmt::format("Transaction failed with error: {} ({})", err.code(), err.what());
|
||||
if (failOnError) {
|
||||
error(msg);
|
||||
failed = true;
|
||||
|
|
|
@ -25,7 +25,7 @@
|
|||
#include "TesterTestSpec.h"
|
||||
#include "TesterUtil.h"
|
||||
#include "flow/SimpleOpt.h"
|
||||
#include "bindings/c/foundationdb/fdb_c.h"
|
||||
#include "test/fdb_api.hpp"
|
||||
|
||||
#include <memory>
|
||||
#include <stdexcept>
|
||||
|
@ -270,27 +270,26 @@ bool parseArgs(TesterOptions& options, int argc, char** argv) {
|
|||
return true;
|
||||
}
|
||||
|
||||
void fdb_check(fdb_error_t e) {
|
||||
void fdb_check(fdb::native::fdb_error_t e) {
|
||||
if (e) {
|
||||
fmt::print(stderr, "Unexpected FDB error: {}({})\n", e, fdb_get_error(e));
|
||||
fmt::print(stderr, "Unexpected FDB error: {}({})\n", e, fdb::native::fdb_get_error(e));
|
||||
std::abort();
|
||||
}
|
||||
}
|
||||
|
||||
void applyNetworkOptions(TesterOptions& options) {
|
||||
if (!options.tmpDir.empty()) {
|
||||
fdb_check(FdbApi::setOption(FDBNetworkOption::FDB_NET_OPTION_CLIENT_TMP_DIR, options.tmpDir));
|
||||
fdb::network::setOption(FDBNetworkOption::FDB_NET_OPTION_CLIENT_TMP_DIR, options.tmpDir);
|
||||
}
|
||||
if (!options.externalClientLibrary.empty()) {
|
||||
fdb_check(FdbApi::setOption(FDBNetworkOption::FDB_NET_OPTION_DISABLE_LOCAL_CLIENT));
|
||||
fdb_check(
|
||||
FdbApi::setOption(FDBNetworkOption::FDB_NET_OPTION_EXTERNAL_CLIENT_LIBRARY, options.externalClientLibrary));
|
||||
fdb::network::setOption(FDBNetworkOption::FDB_NET_OPTION_DISABLE_LOCAL_CLIENT);
|
||||
fdb::network::setOption(FDBNetworkOption::FDB_NET_OPTION_EXTERNAL_CLIENT_LIBRARY,
|
||||
options.externalClientLibrary);
|
||||
} else if (!options.externalClientDir.empty()) {
|
||||
if (options.disableLocalClient) {
|
||||
fdb_check(FdbApi::setOption(FDBNetworkOption::FDB_NET_OPTION_DISABLE_LOCAL_CLIENT));
|
||||
fdb::network::setOption(FDBNetworkOption::FDB_NET_OPTION_DISABLE_LOCAL_CLIENT);
|
||||
}
|
||||
fdb_check(
|
||||
FdbApi::setOption(FDBNetworkOption::FDB_NET_OPTION_EXTERNAL_CLIENT_DIRECTORY, options.externalClientDir));
|
||||
fdb::network::setOption(FDBNetworkOption::FDB_NET_OPTION_EXTERNAL_CLIENT_DIRECTORY, options.externalClientDir);
|
||||
} else {
|
||||
if (options.disableLocalClient) {
|
||||
throw TesterError("Invalid options: Cannot disable local client if no external library is provided");
|
||||
|
@ -298,39 +297,38 @@ void applyNetworkOptions(TesterOptions& options) {
|
|||
}
|
||||
|
||||
if (options.testSpec.multiThreaded) {
|
||||
fdb_check(
|
||||
FdbApi::setOption(FDBNetworkOption::FDB_NET_OPTION_CLIENT_THREADS_PER_VERSION, options.numFdbThreads));
|
||||
fdb::network::setOption(FDBNetworkOption::FDB_NET_OPTION_CLIENT_THREADS_PER_VERSION, options.numFdbThreads);
|
||||
}
|
||||
|
||||
if (options.testSpec.fdbCallbacksOnExternalThreads) {
|
||||
fdb_check(FdbApi::setOption(FDBNetworkOption::FDB_NET_OPTION_CALLBACKS_ON_EXTERNAL_THREADS));
|
||||
fdb::network::setOption(FDBNetworkOption::FDB_NET_OPTION_CALLBACKS_ON_EXTERNAL_THREADS);
|
||||
}
|
||||
|
||||
if (options.testSpec.buggify) {
|
||||
fdb_check(FdbApi::setOption(FDBNetworkOption::FDB_NET_OPTION_CLIENT_BUGGIFY_ENABLE));
|
||||
fdb::network::setOption(FDBNetworkOption::FDB_NET_OPTION_CLIENT_BUGGIFY_ENABLE);
|
||||
}
|
||||
|
||||
if (options.trace) {
|
||||
fdb_check(FdbApi::setOption(FDBNetworkOption::FDB_NET_OPTION_TRACE_ENABLE, options.traceDir));
|
||||
fdb_check(FdbApi::setOption(FDBNetworkOption::FDB_NET_OPTION_TRACE_FORMAT, options.traceFormat));
|
||||
fdb_check(FdbApi::setOption(FDBNetworkOption::FDB_NET_OPTION_TRACE_LOG_GROUP, options.logGroup));
|
||||
fdb::network::setOption(FDBNetworkOption::FDB_NET_OPTION_TRACE_ENABLE, options.traceDir);
|
||||
fdb::network::setOption(FDBNetworkOption::FDB_NET_OPTION_TRACE_FORMAT, options.traceFormat);
|
||||
fdb::network::setOption(FDBNetworkOption::FDB_NET_OPTION_TRACE_LOG_GROUP, options.logGroup);
|
||||
}
|
||||
|
||||
for (auto knob : options.knobs) {
|
||||
fdb_check(FdbApi::setOption(FDBNetworkOption::FDB_NET_OPTION_KNOB,
|
||||
fmt::format("{}={}", knob.first.c_str(), knob.second.c_str())));
|
||||
fdb::network::setOption(FDBNetworkOption::FDB_NET_OPTION_KNOB,
|
||||
fmt::format("{}={}", knob.first.c_str(), knob.second.c_str()));
|
||||
}
|
||||
|
||||
if (!options.tlsCertFile.empty()) {
|
||||
fdb_check(FdbApi::setOption(FDBNetworkOption::FDB_NET_OPTION_TLS_CERT_PATH, options.tlsCertFile));
|
||||
fdb::network::setOption(FDBNetworkOption::FDB_NET_OPTION_TLS_CERT_PATH, options.tlsCertFile);
|
||||
}
|
||||
|
||||
if (!options.tlsKeyFile.empty()) {
|
||||
fdb_check(FdbApi::setOption(FDBNetworkOption::FDB_NET_OPTION_TLS_KEY_PATH, options.tlsKeyFile));
|
||||
fdb::network::setOption(FDBNetworkOption::FDB_NET_OPTION_TLS_KEY_PATH, options.tlsKeyFile);
|
||||
}
|
||||
|
||||
if (!options.tlsCaFile.empty()) {
|
||||
fdb_check(FdbApi::setOption(FDBNetworkOption::FDB_NET_OPTION_TLS_CA_PATH, options.tlsCaFile));
|
||||
fdb::network::setOption(FDBNetworkOption::FDB_NET_OPTION_TLS_CA_PATH, options.tlsCaFile);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -400,17 +398,17 @@ int main(int argc, char** argv) {
|
|||
}
|
||||
randomizeOptions(options);
|
||||
|
||||
fdb_check(fdb_select_api_version(options.apiVersion));
|
||||
fdb_check(fdb::native::fdb_select_api_version(options.apiVersion));
|
||||
applyNetworkOptions(options);
|
||||
fdb_check(fdb_setup_network());
|
||||
fdb_check(fdb::native::fdb_setup_network());
|
||||
|
||||
std::thread network_thread{ &fdb_run_network };
|
||||
std::thread network_thread{ &fdb::native::fdb_run_network };
|
||||
|
||||
if (!runWorkloads(options)) {
|
||||
retCode = 1;
|
||||
}
|
||||
|
||||
fdb_check(fdb_stop_network());
|
||||
fdb_check(fdb::native::fdb_stop_network());
|
||||
network_thread.join();
|
||||
} catch (const std::runtime_error& err) {
|
||||
fmt::print(stderr, "ERROR: {}\n", err.what());
|
||||
|
|
|
@ -29,14 +29,21 @@
|
|||
#include <cassert>
|
||||
#include <cstdint>
|
||||
#include <memory>
|
||||
#include <optional>
|
||||
#include <stdexcept>
|
||||
#include <string>
|
||||
#include <string_view>
|
||||
#include <vector>
|
||||
#include <fmt/format.h>
|
||||
|
||||
// introduce the option enums
|
||||
#include <fdb_c_options.g.h>
|
||||
|
||||
#undef ERROR
|
||||
#define ERROR(name, number, description) enum { error_code_##name = number };
|
||||
|
||||
#include "flow/error_definitions.h"
|
||||
|
||||
namespace fdb {
|
||||
|
||||
// hide C API to discourage mixing C/C++ API
|
||||
|
@ -47,9 +54,20 @@ namespace native {
|
|||
using ByteString = std::basic_string<uint8_t>;
|
||||
using BytesRef = std::basic_string_view<uint8_t>;
|
||||
using CharsRef = std::string_view;
|
||||
using Key = ByteString;
|
||||
using KeyRef = BytesRef;
|
||||
using Value = ByteString;
|
||||
using ValueRef = BytesRef;
|
||||
|
||||
struct KeyValue {
|
||||
Key key;
|
||||
Value value;
|
||||
};
|
||||
struct KeyRange {
|
||||
Key beginKey;
|
||||
Key endKey;
|
||||
};
|
||||
|
||||
inline uint8_t const* toBytePtr(char const* ptr) noexcept {
|
||||
return reinterpret_cast<uint8_t const*>(ptr);
|
||||
}
|
||||
|
@ -96,6 +114,8 @@ public:
|
|||
|
||||
bool retryable() const noexcept { return native::fdb_error_predicate(FDB_ERROR_PREDICATE_RETRYABLE, err) != 0; }
|
||||
|
||||
static Error success() { return Error(error_code_success); }
|
||||
|
||||
private:
|
||||
CodeType err;
|
||||
};
|
||||
|
@ -113,14 +133,24 @@ struct Int64 {
|
|||
return Error(native::fdb_future_get_int64(f, &out));
|
||||
}
|
||||
};
|
||||
struct Key {
|
||||
struct NativeKey {
|
||||
using Type = std::pair<uint8_t const*, int>;
|
||||
static Error extract(native::FDBFuture* f, Type& out) noexcept {
|
||||
auto& [out_key, out_key_length] = out;
|
||||
return Error(native::fdb_future_get_key(f, &out_key, &out_key_length));
|
||||
}
|
||||
};
|
||||
struct Value {
|
||||
struct Key {
|
||||
using Type = fdb::Key;
|
||||
static Error extract(native::FDBFuture* f, Type& out) noexcept {
|
||||
NativeKey::Type native_out{};
|
||||
auto err = NativeKey::extract(f, native_out);
|
||||
auto& [out_key, out_key_length] = native_out;
|
||||
out = fdb::Key(out_key, out_key_length);
|
||||
return Error(err);
|
||||
}
|
||||
};
|
||||
struct NativeValue {
|
||||
using Type = std::tuple<bool, uint8_t const*, int>;
|
||||
static Error extract(native::FDBFuture* f, Type& out) noexcept {
|
||||
auto& [out_present, out_value, out_value_length] = out;
|
||||
|
@ -130,6 +160,16 @@ struct Value {
|
|||
return Error(err);
|
||||
}
|
||||
};
|
||||
struct OptionalValue {
|
||||
using Type = std::optional<fdb::Value>;
|
||||
static Error extract(native::FDBFuture* f, Type& out) noexcept {
|
||||
NativeValue::Type native_out{};
|
||||
auto err = NativeValue::extract(f, native_out);
|
||||
auto& [out_present, out_value, out_value_length] = native_out;
|
||||
out = out_present ? std::make_optional(fdb::Value(out_value, out_value_length)) : std::nullopt;
|
||||
return Error(err);
|
||||
}
|
||||
};
|
||||
struct StringArray {
|
||||
using Type = std::pair<const char**, int>;
|
||||
static Error extract(native::FDBFuture* f, Type& out) noexcept {
|
||||
|
@ -137,7 +177,7 @@ struct StringArray {
|
|||
return Error(native::fdb_future_get_string_array(f, &out_strings, &out_count));
|
||||
}
|
||||
};
|
||||
struct KeyValueArray {
|
||||
struct NativeKeyValueArray {
|
||||
using Type = std::tuple<native::FDBKeyValue const*, int, bool>;
|
||||
static Error extract(native::FDBFuture* f, Type& out) noexcept {
|
||||
auto& [out_kv, out_count, out_more] = out;
|
||||
|
@ -147,6 +187,52 @@ struct KeyValueArray {
|
|||
return Error(err);
|
||||
}
|
||||
};
|
||||
struct KeyValueArray {
|
||||
using Type = std::pair<std::vector<KeyValue>, bool>;
|
||||
static Error extract(native::FDBFuture* f, Type& out) noexcept {
|
||||
NativeKeyValueArray::Type native_out{};
|
||||
auto err = NativeKeyValueArray::extract(f, native_out);
|
||||
auto [kvs, count, more] = native_out;
|
||||
|
||||
auto& [out_kv, out_more] = out;
|
||||
out_more = more;
|
||||
out_kv.clear();
|
||||
for (int i = 0; i < count; ++i) {
|
||||
fdb::native::FDBKeyValue nativeKv = *kvs++;
|
||||
KeyValue kv;
|
||||
kv.key = fdb::Key(nativeKv.key, nativeKv.key_length);
|
||||
kv.value = fdb::Value(nativeKv.value, nativeKv.value_length);
|
||||
out_kv.push_back(kv);
|
||||
}
|
||||
return Error(err);
|
||||
}
|
||||
};
|
||||
struct NativeKeyRangeArray {
|
||||
using Type = std::tuple<native::FDBKeyRange const*, int>;
|
||||
static Error extract(native::FDBFuture* f, Type& out) noexcept {
|
||||
auto& [out_kv, out_count] = out;
|
||||
auto err = native::fdb_future_get_keyrange_array(f, &out_kv, &out_count);
|
||||
return Error(err);
|
||||
}
|
||||
};
|
||||
struct KeyRangeArray {
|
||||
using Type = std::vector<KeyRange>;
|
||||
static Error extract(native::FDBFuture* f, Type& out) noexcept {
|
||||
NativeKeyRangeArray::Type native_out{};
|
||||
auto err = NativeKeyRangeArray::extract(f, native_out);
|
||||
auto [ranges, count] = native_out;
|
||||
out.clear();
|
||||
for (int i = 0; i < count; ++i) {
|
||||
fdb::native::FDBKeyRange nativeKr = *ranges++;
|
||||
KeyRange range;
|
||||
range.beginKey = fdb::Key(nativeKr.begin_key, nativeKr.begin_key_length);
|
||||
range.endKey = fdb::Key(nativeKr.end_key, nativeKr.end_key_length);
|
||||
out.push_back(range);
|
||||
}
|
||||
return Error(err);
|
||||
}
|
||||
};
|
||||
|
||||
} // namespace future_var
|
||||
|
||||
[[noreturn]] inline void throwError(std::string_view preamble, Error err) {
|
||||
|
@ -175,11 +261,19 @@ inline Error setOptionNothrow(FDBNetworkOption option, BytesRef str) noexcept {
|
|||
return Error(native::fdb_network_set_option(option, str.data(), intSize(str)));
|
||||
}
|
||||
|
||||
inline Error setOptionNothrow(FDBNetworkOption option, CharsRef str) noexcept {
|
||||
return setOptionNothrow(option, toBytesRef(str));
|
||||
}
|
||||
|
||||
inline Error setOptionNothrow(FDBNetworkOption option, int64_t value) noexcept {
|
||||
return Error(native::fdb_network_set_option(
|
||||
option, reinterpret_cast<const uint8_t*>(&value), static_cast<int>(sizeof(value))));
|
||||
}
|
||||
|
||||
inline Error setOptionNothrow(FDBNetworkOption option) noexcept {
|
||||
return setOptionNothrow(option, "");
|
||||
}
|
||||
|
||||
inline void setOption(FDBNetworkOption option, BytesRef str) {
|
||||
if (auto err = setOptionNothrow(option, str)) {
|
||||
throwError(fmt::format("ERROR: fdb_network_set_option({}): ",
|
||||
|
@ -188,6 +282,14 @@ inline void setOption(FDBNetworkOption option, BytesRef str) {
|
|||
}
|
||||
}
|
||||
|
||||
inline void setOption(FDBNetworkOption option, CharsRef str) {
|
||||
if (auto err = setOptionNothrow(option, str)) {
|
||||
throwError(fmt::format("ERROR: fdb_network_set_option({}): ",
|
||||
static_cast<std::underlying_type_t<FDBNetworkOption>>(option)),
|
||||
err);
|
||||
}
|
||||
}
|
||||
|
||||
inline void setOption(FDBNetworkOption option, int64_t value) {
|
||||
if (auto err = setOptionNothrow(option, value)) {
|
||||
throwError(fmt::format("ERROR: fdb_network_set_option({}, {}): ",
|
||||
|
@ -197,6 +299,14 @@ inline void setOption(FDBNetworkOption option, int64_t value) {
|
|||
}
|
||||
}
|
||||
|
||||
inline void setOption(FDBNetworkOption option) {
|
||||
if (auto err = setOptionNothrow(option)) {
|
||||
throwError(fmt::format("ERROR: fdb_network_set_option({}): ",
|
||||
static_cast<std::underlying_type_t<FDBNetworkOption>>(option)),
|
||||
err);
|
||||
}
|
||||
}
|
||||
|
||||
inline Error setupNothrow() noexcept {
|
||||
return Error(native::fdb_setup_network());
|
||||
}
|
||||
|
@ -229,9 +339,9 @@ class Result {
|
|||
}
|
||||
|
||||
public:
|
||||
using KeyValueArray = future_var::KeyValueArray::Type;
|
||||
using NativeKeyValueArray = future_var::NativeKeyValueArray::Type;
|
||||
|
||||
Error getKeyValueArrayNothrow(KeyValueArray& out) const noexcept {
|
||||
Error getKeyValueArrayNothrow(NativeKeyValueArray& out) const noexcept {
|
||||
auto out_more_native = native::fdb_bool_t{};
|
||||
auto& [out_kv, out_count, out_more] = out;
|
||||
auto err_raw = native::fdb_result_get_keyvalue_array(r.get(), &out_kv, &out_count, &out_more_native);
|
||||
|
@ -239,8 +349,8 @@ public:
|
|||
return Error(err_raw);
|
||||
}
|
||||
|
||||
KeyValueArray getKeyValueArray() const {
|
||||
auto ret = KeyValueArray{};
|
||||
NativeKeyValueArray getKeyValueArray() const {
|
||||
auto ret = NativeKeyValueArray{};
|
||||
if (auto err = getKeyValueArrayNothrow(ret))
|
||||
throwError("ERROR: result_get_keyvalue_array(): ", err);
|
||||
return ret;
|
||||
|
@ -250,6 +360,8 @@ public:
|
|||
class Future {
|
||||
protected:
|
||||
friend class Transaction;
|
||||
friend struct FutureHash;
|
||||
friend struct FutureEquals;
|
||||
std::shared_ptr<native::FDBFuture> f;
|
||||
|
||||
Future(native::FDBFuture* future) {
|
||||
|
@ -332,6 +444,14 @@ public:
|
|||
}
|
||||
};
|
||||
|
||||
struct FutureHash {
|
||||
size_t operator()(const Future& f) const { return std::hash<native::FDBFuture*>{}(f.f.get()); }
|
||||
};
|
||||
|
||||
struct FutureEquals {
|
||||
bool operator()(const Future& a, const Future& b) const { return a.f.get() == b.f.get(); }
|
||||
};
|
||||
|
||||
template <typename VarTraits>
|
||||
class TypedFuture : public Future {
|
||||
friend class Future;
|
||||
|
@ -413,6 +533,12 @@ public:
|
|||
return Error(native::fdb_transaction_set_option(tr.get(), option, str.data(), intSize(str)));
|
||||
}
|
||||
|
||||
Error setOptionNothrow(FDBTransactionOption option, CharsRef str) noexcept {
|
||||
return setOptionNothrow(option, toBytesRef(str));
|
||||
}
|
||||
|
||||
Error setOptionNothrow(FDBTransactionOption option) noexcept { return setOptionNothrow(option, ""); }
|
||||
|
||||
void setOption(FDBTransactionOption option, int64_t value) {
|
||||
if (auto err = setOptionNothrow(option, value)) {
|
||||
throwError(fmt::format("transaction_set_option({}, {}) returned error: ",
|
||||
|
@ -430,6 +556,22 @@ public:
|
|||
}
|
||||
}
|
||||
|
||||
void setOption(FDBTransactionOption option, CharsRef str) {
|
||||
if (auto err = setOptionNothrow(option, str)) {
|
||||
throwError(fmt::format("transaction_set_option({}) returned error: ",
|
||||
static_cast<std::underlying_type_t<FDBTransactionOption>>(option)),
|
||||
err);
|
||||
}
|
||||
}
|
||||
|
||||
void setOption(FDBTransactionOption option) {
|
||||
if (auto err = setOptionNothrow(option)) {
|
||||
throwError(fmt::format("transaction_set_option({}) returned error: ",
|
||||
static_cast<std::underlying_type_t<FDBTransactionOption>>(option)),
|
||||
err);
|
||||
}
|
||||
}
|
||||
|
||||
TypedFuture<future_var::Int64> getReadVersion() { return native::fdb_transaction_get_read_version(tr.get()); }
|
||||
|
||||
Error getCommittedVersionNothrow(int64_t& out) {
|
||||
|
@ -448,7 +590,7 @@ public:
|
|||
return native::fdb_transaction_get_key(tr.get(), sel.key, sel.keyLength, sel.orEqual, sel.offset, snapshot);
|
||||
}
|
||||
|
||||
TypedFuture<future_var::Value> get(KeyRef key, bool snapshot) {
|
||||
TypedFuture<future_var::OptionalValue> get(KeyRef key, bool snapshot) {
|
||||
return native::fdb_transaction_get(tr.get(), key.data(), intSize(key), snapshot);
|
||||
}
|
||||
|
||||
|
@ -479,6 +621,11 @@ public:
|
|||
reverse);
|
||||
}
|
||||
|
||||
TypedFuture<future_var::KeyRangeArray> getBlobGranuleRanges(KeyRef begin, KeyRef end) {
|
||||
return native::fdb_transaction_get_blob_granule_ranges(
|
||||
tr.get(), begin.data(), intSize(begin), end.data(), intSize(end));
|
||||
}
|
||||
|
||||
Result readBlobGranules(KeyRef begin,
|
||||
KeyRef end,
|
||||
int64_t begin_version,
|
||||
|
@ -494,6 +641,8 @@ public:
|
|||
|
||||
void reset() { return native::fdb_transaction_reset(tr.get()); }
|
||||
|
||||
void cancel() { return native::fdb_transaction_cancel(tr.get()); }
|
||||
|
||||
void set(KeyRef key, ValueRef value) {
|
||||
native::fdb_transaction_set(tr.get(), key.data(), intSize(key), value.data(), intSize(value));
|
||||
}
|
||||
|
|
|
@ -51,7 +51,7 @@ const std::array<Operation, MAX_OP> opTable{
|
|||
},
|
||||
[](Future& f, Transaction&, Arguments const&, ByteString&, ByteString&, ByteString& val) {
|
||||
if (f && !f.error()) {
|
||||
f.get<future_var::Value>();
|
||||
f.get<future_var::NativeValue>();
|
||||
}
|
||||
} } },
|
||||
1,
|
||||
|
@ -72,7 +72,7 @@ const std::array<Operation, MAX_OP> opTable{
|
|||
},
|
||||
[](Future& f, Transaction&, Arguments const&, ByteString&, ByteString&, ByteString& val) {
|
||||
if (f && !f.error()) {
|
||||
f.get<future_var::KeyValueArray>();
|
||||
f.get<future_var::NativeKeyValueArray>();
|
||||
}
|
||||
} } },
|
||||
1,
|
||||
|
@ -84,7 +84,7 @@ const std::array<Operation, MAX_OP> opTable{
|
|||
},
|
||||
[](Future& f, Transaction&, Arguments const&, ByteString&, ByteString&, ByteString& val) {
|
||||
if (f && !f.error()) {
|
||||
f.get<future_var::Value>();
|
||||
f.get<future_var::NativeValue>();
|
||||
}
|
||||
} } },
|
||||
1,
|
||||
|
@ -107,7 +107,7 @@ const std::array<Operation, MAX_OP> opTable{
|
|||
},
|
||||
[](Future& f, Transaction&, Arguments const&, ByteString&, ByteString&, ByteString& val) {
|
||||
if (f && !f.error()) {
|
||||
f.get<future_var::KeyValueArray>();
|
||||
f.get<future_var::NativeKeyValueArray>();
|
||||
}
|
||||
} } },
|
||||
1,
|
||||
|
@ -119,7 +119,7 @@ const std::array<Operation, MAX_OP> opTable{
|
|||
},
|
||||
[](Future& f, Transaction&, Arguments const&, ByteString&, ByteString&, ByteString& val) {
|
||||
if (f && !f.error()) {
|
||||
f.get<future_var::Value>();
|
||||
f.get<future_var::NativeValue>();
|
||||
}
|
||||
} },
|
||||
{ StepKind::IMM,
|
||||
|
@ -257,7 +257,7 @@ const std::array<Operation, MAX_OP> opTable{
|
|||
|
||||
user_context.clear();
|
||||
|
||||
auto out = Result::KeyValueArray{};
|
||||
auto out = Result::NativeKeyValueArray{};
|
||||
err = r.getKeyValueArrayNothrow(out);
|
||||
if (!err || err.is(2037 /*blob_granule_not_materialized*/))
|
||||
return Future();
|
||||
|
|
Loading…
Reference in New Issue