Merge remote-tracking branch 'origin/main' into features/validate-trace-events-in-simulation

This commit is contained in:
Markus Pilman 2022-04-22 15:39:55 -06:00
commit 9e65e15b45
66 changed files with 1617 additions and 664 deletions

View File

@ -106,6 +106,7 @@ if(NOT WIN32)
test/apitester/TesterApiWrapper.h
test/apitester/TesterTestSpec.cpp
test/apitester/TesterTestSpec.h
test/apitester/TesterBlobGranuleCorrectnessWorkload.cpp
test/apitester/TesterCancelTransactionWorkload.cpp
test/apitester/TesterCorrectnessWorkload.cpp
test/apitester/TesterKeyValueStore.cpp
@ -251,6 +252,25 @@ endif()
${CMAKE_CURRENT_BINARY_DIR}/libfdb_c_external.so
--test-dir
${CMAKE_SOURCE_DIR}/bindings/c/test/apitester/tests
--tmp-dir
@TMP_DIR@
)
add_fdbclient_test(
NAME fdb_c_api_tests_blob_granule
DISABLE_LOG_DUMP
API_TEST_BLOB_GRANULES_ENABLED
COMMAND ${CMAKE_SOURCE_DIR}/bindings/c/test/apitester/run_c_api_tests.py
--cluster-file
@CLUSTER_FILE@
--tester-binary
$<TARGET_FILE:fdb_c_api_tester>
--external-client-library
${CMAKE_CURRENT_BINARY_DIR}/libfdb_c_external.so
--test-dir
${CMAKE_SOURCE_DIR}/bindings/c/test/apitester/blobgranuletests
--blob-granule-local-file-path
@DATA_DIR@/fdbblob/
)
if(CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT USE_SANITIZER)

View File

@ -183,4 +183,63 @@ void ApiWorkload::populateData(TTaskFct cont) {
}
}
void ApiWorkload::randomInsertOp(TTaskFct cont) {
int numKeys = Random::get().randomInt(1, maxKeysPerTransaction);
auto kvPairs = std::make_shared<std::vector<KeyValue>>();
for (int i = 0; i < numKeys; i++) {
kvPairs->push_back(KeyValue{ randomNotExistingKey(), randomValue() });
}
execTransaction(
[kvPairs](auto ctx) {
for (const KeyValue& kv : *kvPairs) {
ctx->tx()->set(kv.key, kv.value);
}
ctx->commit();
},
[this, kvPairs, cont]() {
for (const KeyValue& kv : *kvPairs) {
store.set(kv.key, kv.value);
}
schedule(cont);
});
}
void ApiWorkload::randomClearOp(TTaskFct cont) {
int numKeys = Random::get().randomInt(1, maxKeysPerTransaction);
auto keys = std::make_shared<std::vector<std::string>>();
for (int i = 0; i < numKeys; i++) {
keys->push_back(randomExistingKey());
}
execTransaction(
[keys](auto ctx) {
for (const auto& key : *keys) {
ctx->tx()->clear(key);
}
ctx->commit();
},
[this, keys, cont]() {
for (const auto& key : *keys) {
store.clear(key);
}
schedule(cont);
});
}
void ApiWorkload::randomClearRangeOp(TTaskFct cont) {
std::string begin = randomKeyName();
std::string end = randomKeyName();
if (begin > end) {
std::swap(begin, end);
}
execTransaction(
[begin, end](auto ctx) {
ctx->tx()->clearRange(begin, end);
ctx->commit();
},
[this, begin, end, cont]() {
store.clear(begin, end);
schedule(cont);
});
}
} // namespace FdbApiTester

View File

@ -114,6 +114,11 @@ protected:
// Clear the data of the workload
void clearData(TTaskFct cont);
// common operations
void randomInsertOp(TTaskFct cont);
void randomClearOp(TTaskFct cont);
void randomClearRangeOp(TTaskFct cont);
private:
void populateDataTx(TTaskFct cont);

View File

@ -18,9 +18,9 @@
* limitations under the License.
*/
#include "TesterApiWrapper.h"
#include "TesterUtil.h"
#include <cstdint>
#include <fmt/format.h>
#include <fstream>
namespace FdbApiTester {
@ -60,6 +60,56 @@ std::optional<std::string> ValueFuture::getValue() const {
return out_present ? std::make_optional(std::string((const char*)val, vallen)) : std::nullopt;
}
std::vector<KeyValue> KeyRangesFuture::getKeyRanges() const {
ASSERT(future_);
int count;
const FDBKeyRange* ranges;
fdb_check(fdb_future_get_keyrange_array(future_.get(), &ranges, &count));
std::vector<KeyValue> result;
result.reserve(count);
for (int i = 0; i < count; i++) {
FDBKeyRange kr = *ranges++;
KeyValue rkv;
rkv.key = std::string((const char*)kr.begin_key, kr.begin_key_length);
rkv.value = std::string((const char*)kr.end_key, kr.end_key_length);
result.push_back(rkv);
}
return result;
}
Result::Result(FDBResult* r) : result_(r, fdb_result_destroy) {}
std::vector<KeyValue> KeyValuesResult::getKeyValues(bool* more_out) {
ASSERT(result_);
int count;
const FDBKeyValue* kvs;
int more;
std::vector<KeyValue> result;
error_ = fdb_result_get_keyvalue_array(result_.get(), &kvs, &count, &more);
if (error_ != error_code_success) {
return result;
}
result.reserve(count);
for (int i = 0; i < count; i++) {
FDBKeyValue kv = *kvs++;
KeyValue rkv;
rkv.key = std::string((const char*)kv.key, kv.key_length);
rkv.value = std::string((const char*)kv.value, kv.value_length);
result.push_back(rkv);
}
*more_out = more;
return result;
}
// Given an FDBDatabase, initializes a new transaction.
Transaction::Transaction(FDBTransaction* tx) : tx_(tx, fdb_transaction_destroy) {}
@ -109,6 +159,87 @@ fdb_error_t Transaction::setOption(FDBTransactionOption option) {
return fdb_transaction_set_option(tx_.get(), option, reinterpret_cast<const uint8_t*>(""), 0);
}
class TesterGranuleContext {
public:
std::unordered_map<int64_t, uint8_t*> loadsInProgress;
int64_t nextId = 0;
std::string basePath;
~TesterGranuleContext() {
// if there was an error or not all loads finished, delete data
for (auto& it : loadsInProgress) {
uint8_t* dataToFree = it.second;
delete dataToFree;
}
}
};
static int64_t granule_start_load(const char* filename,
int filenameLength,
int64_t offset,
int64_t length,
int64_t fullFileLength,
void* context) {
TesterGranuleContext* ctx = (TesterGranuleContext*)context;
int64_t loadId = ctx->nextId++;
uint8_t* buffer = new uint8_t[length];
std::ifstream fin(ctx->basePath + std::string(filename, filenameLength), std::ios::in | std::ios::binary);
fin.seekg(offset);
fin.read((char*)buffer, length);
ctx->loadsInProgress.insert({ loadId, buffer });
return loadId;
}
static uint8_t* granule_get_load(int64_t loadId, void* context) {
TesterGranuleContext* ctx = (TesterGranuleContext*)context;
return ctx->loadsInProgress.at(loadId);
}
static void granule_free_load(int64_t loadId, void* context) {
TesterGranuleContext* ctx = (TesterGranuleContext*)context;
auto it = ctx->loadsInProgress.find(loadId);
uint8_t* dataToFree = it->second;
delete dataToFree;
ctx->loadsInProgress.erase(it);
}
KeyValuesResult Transaction::readBlobGranules(std::string_view begin,
std::string_view end,
const std::string& basePath) {
ASSERT(tx_);
TesterGranuleContext testerContext;
testerContext.basePath = basePath;
FDBReadBlobGranuleContext granuleContext;
granuleContext.userContext = &testerContext;
granuleContext.debugNoMaterialize = false;
granuleContext.granuleParallelism = 1;
granuleContext.start_load_f = &granule_start_load;
granuleContext.get_load_f = &granule_get_load;
granuleContext.free_load_f = &granule_free_load;
return KeyValuesResult(fdb_transaction_read_blob_granules(tx_.get(),
(const uint8_t*)begin.data(),
begin.size(),
(const uint8_t*)end.data(),
end.size(),
0 /* beginVersion */,
-2 /* latest read version */,
granuleContext));
}
KeyRangesFuture Transaction::getBlobGranuleRanges(std::string_view begin, std::string_view end) {
ASSERT(tx_);
return KeyRangesFuture(fdb_transaction_get_blob_granule_ranges(
tx_.get(), (const uint8_t*)begin.data(), begin.size(), (const uint8_t*)end.data(), end.size()));
}
fdb_error_t FdbApi::setOption(FDBNetworkOption option, std::string_view value) {
return fdb_network_set_option(option, reinterpret_cast<const uint8_t*>(value.data()), value.size());
}

View File

@ -26,6 +26,7 @@
#include <string_view>
#include <optional>
#include <memory>
#include <unordered_map>
#define FDB_API_VERSION 720
#include "bindings/c/foundationdb/fdb_c.h"
@ -35,6 +36,8 @@
#include "flow/error_definitions.h"
#include "TesterUtil.h"
namespace FdbApiTester {
// Wrapper parent class to manage memory of an FDBFuture pointer. Cleans up
@ -62,6 +65,37 @@ public:
std::optional<std::string> getValue() const;
};
class KeyRangesFuture : public Future {
public:
KeyRangesFuture() = default;
KeyRangesFuture(FDBFuture* f) : Future(f) {}
std::vector<KeyValue> getKeyRanges() const;
};
class Result {
public:
Result() = default;
Result(FDBResult* r);
FDBResult* fdbResult() { return result_.get(); };
fdb_error_t getError() const { return error_; }
explicit operator bool() const { return result_ != nullptr; };
fdb_error_t error_ = error_code_client_invalid_operation; // have to call getX function to set this
protected:
std::shared_ptr<FDBResult> result_;
};
class KeyValuesResult : public Result {
public:
KeyValuesResult() = default;
KeyValuesResult(FDBResult* f) : Result(f) {}
std::vector<KeyValue> getKeyValues(bool* more_out);
};
class Transaction {
public:
Transaction() = default;
@ -76,6 +110,9 @@ public:
void reset();
fdb_error_t setOption(FDBTransactionOption option);
KeyValuesResult readBlobGranules(std::string_view begin, std::string_view end, const std::string& basePath);
KeyRangesFuture getBlobGranuleRanges(std::string_view begin, std::string_view end);
private:
std::shared_ptr<FDBTransaction> tx_;
};

View File

@ -0,0 +1,159 @@
/*
* TesterBlobGranuleCorrectnessWorkload.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "TesterApiWorkload.h"
#include "TesterUtil.h"
#include <memory>
#include <fmt/format.h>
namespace FdbApiTester {
class ApiBlobGranuleCorrectnessWorkload : public ApiWorkload {
public:
ApiBlobGranuleCorrectnessWorkload(const WorkloadConfig& config) : ApiWorkload(config) {
// sometimes don't do range clears
if (Random::get().randomInt(0, 1) == 0) {
excludedOpTypes.push_back(OP_CLEAR_RANGE);
}
}
private:
enum OpType { OP_INSERT, OP_CLEAR, OP_CLEAR_RANGE, OP_READ, OP_GET_RANGES, OP_LAST = OP_GET_RANGES };
std::vector<OpType> excludedOpTypes;
void randomReadOp(TTaskFct cont) {
std::string begin = randomKeyName();
std::string end = randomKeyName();
auto results = std::make_shared<std::vector<KeyValue>>();
if (begin > end) {
std::swap(begin, end);
}
execTransaction(
[begin, end, results](auto ctx) {
ctx->tx()->setOption(FDB_TR_OPTION_READ_YOUR_WRITES_DISABLE);
KeyValuesResult res = ctx->tx()->readBlobGranules(begin, end, ctx->getBGBasePath());
bool more;
(*results) = res.getKeyValues(&more);
ASSERT(!more);
if (res.getError() != error_code_success) {
ctx->onError(res.getError());
} else {
ctx->done();
}
},
[this, begin, end, results, cont]() {
std::vector<KeyValue> expected = store.getRange(begin, end, store.size(), false);
if (results->size() != expected.size()) {
error(fmt::format("randomReadOp result size mismatch. expected: {} actual: {}",
expected.size(),
results->size()));
}
ASSERT(results->size() == expected.size());
for (int i = 0; i < results->size(); i++) {
if ((*results)[i].key != expected[i].key) {
error(fmt::format("randomReadOp key mismatch at {}/{}. expected: {} actual: {}",
i,
results->size(),
expected[i].key,
(*results)[i].key));
}
ASSERT((*results)[i].key == expected[i].key);
if ((*results)[i].value != expected[i].value) {
error(
fmt::format("randomReadOp value mismatch at {}/{}. key: {} expected: {:.80} actual: {:.80}",
i,
results->size(),
expected[i].key,
expected[i].value,
(*results)[i].value));
}
ASSERT((*results)[i].value == expected[i].value);
}
schedule(cont);
});
}
void randomGetRangesOp(TTaskFct cont) {
std::string begin = randomKeyName();
std::string end = randomKeyName();
auto results = std::make_shared<std::vector<KeyValue>>();
if (begin > end) {
std::swap(begin, end);
}
execTransaction(
[begin, end, results](auto ctx) {
KeyRangesFuture f = ctx->tx()->getBlobGranuleRanges(begin, end);
ctx->continueAfter(
f,
[ctx, f, results]() {
(*results) = f.getKeyRanges();
ctx->done();
},
true);
},
[this, begin, end, results, cont]() {
ASSERT(results->size() > 0);
ASSERT(results->front().key <= begin);
ASSERT(results->back().value >= end);
for (int i = 0; i < results->size(); i++) {
// no empty or inverted ranges
ASSERT((*results)[i].key < (*results)[i].value);
}
for (int i = 1; i < results->size(); i++) {
// ranges contain entire requested key range
ASSERT((*results)[i].key == (*results)[i - 1].value);
}
schedule(cont);
});
}
void randomOperation(TTaskFct cont) {
OpType txType = (store.size() == 0) ? OP_INSERT : (OpType)Random::get().randomInt(0, OP_LAST);
while (std::count(excludedOpTypes.begin(), excludedOpTypes.end(), txType)) {
txType = (OpType)Random::get().randomInt(0, OP_LAST);
}
switch (txType) {
case OP_INSERT:
randomInsertOp(cont);
break;
case OP_CLEAR:
randomClearOp(cont);
break;
case OP_CLEAR_RANGE:
randomClearRangeOp(cont);
break;
case OP_READ:
randomReadOp(cont);
break;
case OP_GET_RANGES:
randomGetRangesOp(cont);
break;
}
}
};
WorkloadFactory<ApiBlobGranuleCorrectnessWorkload> ApiBlobGranuleCorrectnessWorkloadFactory(
"ApiBlobGranuleCorrectness");
} // namespace FdbApiTester

View File

@ -31,27 +31,6 @@ public:
private:
enum OpType { OP_INSERT, OP_GET, OP_CLEAR, OP_CLEAR_RANGE, OP_COMMIT_READ, OP_LAST = OP_COMMIT_READ };
void randomInsertOp(TTaskFct cont) {
int numKeys = Random::get().randomInt(1, maxKeysPerTransaction);
auto kvPairs = std::make_shared<std::vector<KeyValue>>();
for (int i = 0; i < numKeys; i++) {
kvPairs->push_back(KeyValue{ randomNotExistingKey(), randomValue() });
}
execTransaction(
[kvPairs](auto ctx) {
for (const KeyValue& kv : *kvPairs) {
ctx->tx()->set(kv.key, kv.value);
}
ctx->commit();
},
[this, kvPairs, cont]() {
for (const KeyValue& kv : *kvPairs) {
store.set(kv.key, kv.value);
}
schedule(cont);
});
}
void randomCommitReadOp(TTaskFct cont) {
int numKeys = Random::get().randomInt(1, maxKeysPerTransaction);
auto kvPairs = std::make_shared<std::vector<KeyValue>>();
@ -145,44 +124,6 @@ private:
});
}
void randomClearOp(TTaskFct cont) {
int numKeys = Random::get().randomInt(1, maxKeysPerTransaction);
auto keys = std::make_shared<std::vector<std::string>>();
for (int i = 0; i < numKeys; i++) {
keys->push_back(randomExistingKey());
}
execTransaction(
[keys](auto ctx) {
for (const auto& key : *keys) {
ctx->tx()->clear(key);
}
ctx->commit();
},
[this, keys, cont]() {
for (const auto& key : *keys) {
store.clear(key);
}
schedule(cont);
});
}
void randomClearRangeOp(TTaskFct cont) {
std::string begin = randomKeyName();
std::string end = randomKeyName();
if (begin > end) {
std::swap(begin, end);
}
execTransaction(
[begin, end](auto ctx) {
ctx->tx()->clearRange(begin, end);
ctx->commit();
},
[this, begin, end, cont]() {
store.clear(begin, end);
schedule(cont);
});
}
void randomOperation(TTaskFct cont) {
OpType txType = (store.size() == 0) ? OP_INSERT : (OpType)Random::get().randomInt(0, OP_LAST);
switch (txType) {

View File

@ -30,12 +30,9 @@
#include <vector>
#include <mutex>
namespace FdbApiTester {
#include "TesterUtil.h"
struct KeyValue {
std::string key;
std::string value;
};
namespace FdbApiTester {
class KeyValueStore {
public:

View File

@ -38,6 +38,7 @@ public:
std::string logGroup;
std::string externalClientLibrary;
std::string externalClientDir;
std::string tmpDir;
bool disableLocalClient = false;
std::string testFile;
std::string inputPipeName;
@ -49,6 +50,7 @@ public:
int numClients;
std::vector<std::pair<std::string, std::string>> knobs;
TestSpec testSpec;
std::string bgBasePath;
};
} // namespace FdbApiTester

View File

@ -75,9 +75,10 @@ public:
std::shared_ptr<ITransactionActor> txActor,
TTaskFct cont,
IScheduler* scheduler,
int retryLimit)
int retryLimit,
std::string bgBasePath)
: fdbTx(tx), txActor(txActor), contAfterDone(cont), scheduler(scheduler), retryLimit(retryLimit),
txState(TxState::IN_PROGRESS), commitCalled(false) {}
txState(TxState::IN_PROGRESS), commitCalled(false), bgBasePath(bgBasePath) {}
// A state machine:
// IN_PROGRESS -> (ON_ERROR -> IN_PROGRESS)* [-> ON_ERROR] -> DONE
@ -123,6 +124,8 @@ public:
contAfterDone();
}
std::string getBGBasePath() override { return bgBasePath; }
protected:
virtual void doContinueAfter(Future f, TTaskFct cont, bool retryOnError) = 0;
@ -217,6 +220,9 @@ protected:
// A history of errors on which the transaction was retried
std::vector<fdb_error_t> retriedErrors;
// blob granule base path
std::string bgBasePath;
};
/**
@ -228,8 +234,9 @@ public:
std::shared_ptr<ITransactionActor> txActor,
TTaskFct cont,
IScheduler* scheduler,
int retryLimit)
: TransactionContextBase(tx, txActor, cont, scheduler, retryLimit) {}
int retryLimit,
std::string bgBasePath)
: TransactionContextBase(tx, txActor, cont, scheduler, retryLimit, bgBasePath) {}
protected:
void doContinueAfter(Future f, TTaskFct cont, bool retryOnError) override {
@ -316,8 +323,9 @@ public:
std::shared_ptr<ITransactionActor> txActor,
TTaskFct cont,
IScheduler* scheduler,
int retryLimit)
: TransactionContextBase(tx, txActor, cont, scheduler, retryLimit) {}
int retryLimit,
std::string bgBasePath)
: TransactionContextBase(tx, txActor, cont, scheduler, retryLimit, bgBasePath) {}
protected:
void doContinueAfter(Future f, TTaskFct cont, bool retryOnError) override {
@ -482,9 +490,10 @@ class TransactionExecutorBase : public ITransactionExecutor {
public:
TransactionExecutorBase(const TransactionExecutorOptions& options) : options(options), scheduler(nullptr) {}
void init(IScheduler* scheduler, const char* clusterFile) override {
void init(IScheduler* scheduler, const char* clusterFile, const std::string& bgBasePath) override {
this->scheduler = scheduler;
this->clusterFile = clusterFile;
this->bgBasePath = bgBasePath;
}
protected:
@ -499,10 +508,10 @@ protected:
std::shared_ptr<ITransactionContext> ctx;
if (options.blockOnFutures) {
ctx = std::make_shared<BlockingTransactionContext>(
tx, txActor, cont, scheduler, options.transactionRetryLimit);
tx, txActor, cont, scheduler, options.transactionRetryLimit, bgBasePath);
} else {
ctx = std::make_shared<AsyncTransactionContext>(
tx, txActor, cont, scheduler, options.transactionRetryLimit);
tx, txActor, cont, scheduler, options.transactionRetryLimit, bgBasePath);
}
txActor->init(ctx);
txActor->start();
@ -511,6 +520,7 @@ protected:
protected:
TransactionExecutorOptions options;
std::string bgBasePath;
std::string clusterFile;
IScheduler* scheduler;
};
@ -524,8 +534,8 @@ public:
~DBPoolTransactionExecutor() override { release(); }
void init(IScheduler* scheduler, const char* clusterFile) override {
TransactionExecutorBase::init(scheduler, clusterFile);
void init(IScheduler* scheduler, const char* clusterFile, const std::string& bgBasePath) override {
TransactionExecutorBase::init(scheduler, clusterFile, bgBasePath);
for (int i = 0; i < options.numDatabases; i++) {
FDBDatabase* db;
fdb_error_t err = fdb_create_database(clusterFile, &db);

View File

@ -55,6 +55,9 @@ public:
// Mark the transaction as completed without committing it (for read transactions)
virtual void done() = 0;
// Plumbing for blob granule base path
virtual std::string getBGBasePath() = 0;
// A continuation to be executed when all of the given futures get ready
virtual void continueAfterAll(std::vector<Future> futures, TTaskFct cont);
};
@ -136,7 +139,7 @@ struct TransactionExecutorOptions {
class ITransactionExecutor {
public:
virtual ~ITransactionExecutor() {}
virtual void init(IScheduler* sched, const char* clusterFile) = 0;
virtual void init(IScheduler* sched, const char* clusterFile, const std::string& bgBasePath) = 0;
virtual void execute(std::shared_ptr<ITransactionActor> tx, TTaskFct cont) = 0;
};

View File

@ -49,6 +49,11 @@ struct formatter<std::optional<T>> : fmt::formatter<T> {
namespace FdbApiTester {
struct KeyValue {
std::string key;
std::string value;
};
std::string lowerCase(const std::string& str);
class Random {

View File

@ -0,0 +1,24 @@
[[test]]
title = 'Blob GranuleAPI Correctness Blocking'
multiThreaded = true
buggify = true
blockOnFutures = true
minFdbThreads = 2
maxFdbThreads = 8
minDatabases = 2
maxDatabases = 8
minClientThreads = 2
maxClientThreads = 8
minClients = 2
maxClients = 8
[[test.workload]]
name = 'ApiBlobGranuleCorrectness'
minKeyLength = 1
maxKeyLength = 64
minValueLength = 1
maxValueLength = 1000
maxKeysPerTransaction = 50
initialSize = 100
numRandomOperations = 100

View File

@ -0,0 +1,23 @@
[[test]]
title = 'Blob Granule API Correctness Multi Threaded'
multiThreaded = true
buggify = true
minFdbThreads = 2
maxFdbThreads = 8
minDatabases = 2
maxDatabases = 8
minClientThreads = 2
maxClientThreads = 8
minClients = 2
maxClients = 8
[[test.workload]]
name = 'ApiBlobGranuleCorrectness'
minKeyLength = 1
maxKeyLength = 64
minValueLength = 1
maxValueLength = 1000
maxKeysPerTransaction = 50
initialSize = 100
numRandomOperations = 100

View File

@ -0,0 +1,15 @@
[[test]]
title = 'Blob Granule API Correctness Single Threaded'
minClients = 1
maxClients = 3
multiThreaded = false
[[test.workload]]
name = 'ApiBlobGranuleCorrectness'
minKeyLength = 1
maxKeyLength = 64
minValueLength = 1
maxValueLength = 1000
maxKeysPerTransaction = 50
initialSize = 100
numRandomOperations = 100

View File

@ -46,12 +46,14 @@ enum TesterOptionId {
OPT_KNOB,
OPT_EXTERNAL_CLIENT_LIBRARY,
OPT_EXTERNAL_CLIENT_DIRECTORY,
OPT_TMP_DIR,
OPT_DISABLE_LOCAL_CLIENT,
OPT_TEST_FILE,
OPT_INPUT_PIPE,
OPT_OUTPUT_PIPE,
OPT_FDB_API_VERSION,
OPT_TRANSACTION_RETRY_LIMIT
OPT_TRANSACTION_RETRY_LIMIT,
OPT_BLOB_GRANULE_LOCAL_FILE_PATH
};
CSimpleOpt::SOption TesterOptionDefs[] = //
@ -66,6 +68,7 @@ CSimpleOpt::SOption TesterOptionDefs[] = //
{ OPT_KNOB, "--knob-", SO_REQ_SEP },
{ OPT_EXTERNAL_CLIENT_LIBRARY, "--external-client-library", SO_REQ_SEP },
{ OPT_EXTERNAL_CLIENT_DIRECTORY, "--external-client-dir", SO_REQ_SEP },
{ OPT_TMP_DIR, "--tmp-dir", SO_REQ_SEP },
{ OPT_DISABLE_LOCAL_CLIENT, "--disable-local-client", SO_NONE },
{ OPT_TEST_FILE, "-f", SO_REQ_SEP },
{ OPT_TEST_FILE, "--test-file", SO_REQ_SEP },
@ -73,6 +76,7 @@ CSimpleOpt::SOption TesterOptionDefs[] = //
{ OPT_OUTPUT_PIPE, "--output-pipe", SO_REQ_SEP },
{ OPT_FDB_API_VERSION, "--api-version", SO_REQ_SEP },
{ OPT_TRANSACTION_RETRY_LIMIT, "--transaction-retry-limit", SO_REQ_SEP },
{ OPT_BLOB_GRANULE_LOCAL_FILE_PATH, "--blob-granule-local-file-path", SO_REQ_SEP },
SO_END_OF_OPTIONS };
void printProgramUsage(const char* execName) {
@ -98,6 +102,8 @@ void printProgramUsage(const char* execName) {
" Path to the external client library.\n"
" --external-client-dir DIR\n"
" Directory containing external client libraries.\n"
" --tmp-dir DIR\n"
" Directory for temporary files of the client.\n"
" --disable-local-client DIR\n"
" Disable the local client, i.e. use only external client libraries.\n"
" --input-pipe NAME\n"
@ -108,6 +114,8 @@ void printProgramUsage(const char* execName) {
" Required FDB API version (default %d).\n"
" --transaction-retry-limit NUMBER\n"
" Maximum number of retries per tranaction (default: 0 - unlimited)\n"
" --blob-granule-local-file-path PATH\n"
" Path to blob granule files on local filesystem\n"
" -f, --test-file FILE\n"
" Test file to run.\n"
" -h, --help Display this help and exit.\n",
@ -180,6 +188,9 @@ bool processArg(TesterOptions& options, const CSimpleOpt& args) {
case OPT_EXTERNAL_CLIENT_DIRECTORY:
options.externalClientDir = args.OptionArg();
break;
case OPT_TMP_DIR:
options.tmpDir = args.OptionArg();
break;
case OPT_DISABLE_LOCAL_CLIENT:
options.disableLocalClient = true;
break;
@ -200,6 +211,9 @@ bool processArg(TesterOptions& options, const CSimpleOpt& args) {
case OPT_TRANSACTION_RETRY_LIMIT:
processIntOption(args.OptionText(), args.OptionArg(), 0, 1000, options.transactionRetryLimit);
break;
case OPT_BLOB_GRANULE_LOCAL_FILE_PATH:
options.bgBasePath = args.OptionArg();
break;
}
return true;
}
@ -236,6 +250,9 @@ void fdb_check(fdb_error_t e) {
}
void applyNetworkOptions(TesterOptions& options) {
if (!options.tmpDir.empty()) {
fdb_check(FdbApi::setOption(FDBNetworkOption::FDB_NET_OPTION_CLIENT_TMP_DIR, options.tmpDir));
}
if (!options.externalClientLibrary.empty()) {
fdb_check(FdbApi::setOption(FDBNetworkOption::FDB_NET_OPTION_DISABLE_LOCAL_CLIENT));
fdb_check(
@ -295,7 +312,7 @@ bool runWorkloads(TesterOptions& options) {
std::unique_ptr<IScheduler> scheduler = createScheduler(options.numClientThreads);
std::unique_ptr<ITransactionExecutor> txExecutor = createTransactionExecutor(txExecOptions);
txExecutor->init(scheduler.get(), options.clusterFile.c_str());
txExecutor->init(scheduler.get(), options.clusterFile.c_str(), options.bgBasePath);
WorkloadManager workloadMgr(txExecutor.get(), scheduler.get());
for (const auto& workloadSpec : options.testSpec.workloads) {

View File

@ -49,10 +49,16 @@ def initialize_logger_level(logging_level):
def run_tester(args, test_file):
cmd = [args.tester_binary, "--cluster-file",
args.cluster_file, "--test-file", test_file]
cmd = [args.tester_binary,
"--cluster-file", args.cluster_file,
"--test-file", test_file]
if args.external_client_library is not None:
cmd += ["--external-client-library", args.external_client_library]
if args.tmp_dir is not None:
cmd += ["--tmp-dir", args.tmp_dir]
if args.blob_granule_local_file_path is not None:
cmd += ["--blob-granule-local-file-path",
args.blob_granule_local_file_path]
get_logger().info('\nRunning tester \'%s\'...' % ' '.join(cmd))
proc = Popen(cmd, stdout=sys.stdout, stderr=sys.stderr)
@ -82,8 +88,8 @@ def run_tester(args, test_file):
def run_tests(args):
num_failed = 0
test_files = [f for f in os.listdir(args.test_dir)
if os.path.isfile(os.path.join(args.test_dir, f)) and f.endswith(".toml")]
test_files = [f for f in os.listdir(args.test_dir) if os.path.isfile(
os.path.join(args.test_dir, f)) and f.endswith(".toml")]
for test_file in test_files:
get_logger().info('=========================================================')
@ -111,6 +117,10 @@ def parse_args(argv):
help='The timeout in seconds for running each individual test. (default 300)')
parser.add_argument('--logging-level', type=str, default='INFO',
choices=['ERROR', 'WARNING', 'INFO', 'DEBUG'], help='Specifies the level of detail in the tester output (default=\'INFO\').')
parser.add_argument('--tmp-dir', type=str, default=None,
help='The directory for storing temporary files (default: None)')
parser.add_argument('--blob-granule-local-file-path', type=str, default=None,
help='Enable blob granule tests if set, value is path to local blob granule files')
return parser.parse_args(argv)

View File

@ -285,6 +285,13 @@ func (o NetworkOptions) SetDistributedClientTracer(param string) error {
return o.setOpt(90, []byte(param))
}
// Sets the directory for storing temporary files created by FDB client, such as temporary copies of client libraries. Defaults to /tmp
//
// Parameter: Client directory for temporary files.
func (o NetworkOptions) SetClientTmpDir(param string) error {
return o.setOpt(90, []byte(param))
}
// Set the size of the client location cache. Raising this value can boost performance in very large databases where clients access data in a near-random pattern. Defaults to 100000.
//
// Parameter: Max location cache entries

View File

@ -405,6 +405,7 @@ endfunction()
# Creates a single cluster before running the specified command (usually a ctest test)
function(add_fdbclient_test)
set(options DISABLED ENABLED DISABLE_LOG_DUMP)
set(options DISABLED ENABLED API_TEST_BLOB_GRANULES_ENABLED)
set(oneValueArgs NAME PROCESS_NUMBER TEST_TIMEOUT WORKING_DIRECTORY)
set(multiValueArgs COMMAND)
cmake_parse_arguments(T "${options}" "${oneValueArgs}" "${multiValueArgs}" "${ARGN}")
@ -431,6 +432,9 @@ function(add_fdbclient_test)
if(T_DISABLE_LOG_DUMP)
list(APPEND TMP_CLUSTER_CMD --disable-log-dump)
endif()
if(T_API_TEST_BLOB_GRANULES_ENABLED)
list(APPEND TMP_CLUSTER_CMD --blob-granules-enabled)
endif()
message(STATUS "Adding Client test ${T_NAME}")
add_test(NAME "${T_NAME}"
WORKING_DIRECTORY ${T_WORKING_DIRECTORY}

View File

@ -17,6 +17,8 @@ API version 720
General
-------
* Special keys ``\xff\xff/management/profiling/<client_txn_sample_rate|client_txn_size_limit>`` are removed in 7.2 and the functionalities they provide are now covered by the global configuration module.
.. _api-version-upgrade-guide-710:
API version 710

View File

@ -736,6 +736,7 @@
"ssd-2",
"ssd-redwood-1-experimental",
"ssd-rocksdb-v1",
"ssd-sharded-rocksdb",
"memory",
"memory-1",
"memory-2",
@ -749,6 +750,7 @@
"ssd-2",
"ssd-redwood-1-experimental",
"ssd-rocksdb-v1",
"ssd-sharded-rocksdb",
"memory",
"memory-1",
"memory-2",

View File

@ -191,7 +191,6 @@ that process, and wait for necessary data to be moved away.
#. ``\xff\xff/management/options/excluded/force`` Read/write. Setting this key disables safety checks for writes to ``\xff\xff/management/excluded/<exclusion>``. Setting this key only has an effect in the current transaction and is not persisted on commit.
#. ``\xff\xff/management/options/failed/force`` Read/write. Setting this key disables safety checks for writes to ``\xff\xff/management/failed/<exclusion>``. Setting this key only has an effect in the current transaction and is not persisted on commit.
#. ``\xff\xff/management/min_required_commit_version`` Read/write. Changing this key will change the corresponding system key ``\xff/minRequiredCommitVersion = [[Version]]``. The value of this special key is the literal text of the underlying ``Version``, which is ``int64_t``. If you set the key with a value failed to be parsed as ``int64_t``, ``special_keys_api_failure`` will be thrown. In addition, the given ``Version`` should be larger than the current read version and smaller than the upper bound(``2**63-1-version_per_second*3600*24*365*1000``). Otherwise, ``special_keys_api_failure`` is thrown. For more details, see help text of ``fdbcli`` command ``advanceversion``.
#. ``\xff\xff/management/profiling/<client_txn_sample_rate|client_txn_size_limit>`` Read/write. Changing these two keys will change the corresponding system keys ``\xff\x02/fdbClientInfo/<client_txn_sample_rate|client_txn_size_limit>``, respectively. The value of ``\xff\xff/management/client_txn_sample_rate`` is a literal text of ``double``, and the value of ``\xff\xff/management/client_txn_size_limit`` is a literal text of ``int64_t``. A special value ``default`` can be set to or read from these two keys, representing the client profiling is disabled. In addition, ``clear`` in this range is not allowed. For more details, see help text of ``fdbcli`` command ``profile client``.
#. ``\xff\xff/management/maintenance/<zone_id> := <seconds>`` Read/write. Set/clear a key in this range will change the corresponding system key ``\xff\x02/healthyZone``. The value is a literal text of a non-negative ``double`` which represents the remaining time for the zone to be in maintenance. Commiting with an invalid value will throw ``special_keys_api_failure``. Only one zone is allowed to be in maintenance at the same time. Setting a new key in the range will override the old one and the transaction will throw ``special_keys_api_failure`` error if more than one zone is given. For more details, see help text of ``fdbcli`` command ``maintenance``.
In addition, a special key ``\xff\xff/management/maintenance/IgnoreSSFailures`` in the range, if set, will disable datadistribution for storage server failures.
It is doing the same thing as the fdbcli command ``datadistribution disable ssfailure``.
@ -264,6 +263,26 @@ clients can connect FoundationDB transactions to outside events.
#. ``\xff\xff/tracing/transaction_id := <transaction_id>`` Read/write. A 64-bit integer transaction ID which follows the transaction as it moves through FoundationDB. All transactions are assigned a random transaction ID on creation, and this key can be read to surface the randomly generated ID. Alternatively, set this key to provide a custom identifier. When setting this key, provide a string in the form of a 64-bit integer, which will be automatically converted to the appropriate type.
#. ``\xff\xff/tracing/token := <tracing_enabled>`` Read/write. Set to true/false to enable or disable tracing for the transaction, respectively. If read, returns a 64-bit integer set to 0 if tracing has been disabled, or a random 64-bit integer otherwise (this integers value has no meaning to the client other than to determine whether the transaction will be traced).
.. _special-key-space-deprecation:
Deprecated Keys
===============
Listed below are the special keys that have been deprecated. Special key(s) will no longer be accessible when the client specifies an API version equal to or larger than the version where they were deprecated. Clients specifying older API versions will be able to continue using the deprecated key(s).
#. ``\xff\xff/management/profiling/<client_txn_sample_rate|client_txn_size_limit>`` Deprecated as of API version 7.2. The corresponding functionalities are now covered by the global configuration module. For details, see :doc:`global-configuration`. Read/write. Changing these two keys will change the corresponding system keys ``\xff\x02/fdbClientInfo/<client_txn_sample_rate|client_txn_size_limit>``, respectively. The value of ``\xff\xff/management/client_txn_sample_rate`` is a literal text of ``double``, and the value of ``\xff\xff/management/client_txn_size_limit`` is a literal text of ``int64_t``. A special value ``default`` can be set to or read from these two keys, representing the client profiling is disabled. In addition, ``clear`` in this range is not allowed. For more details, see help text of ``fdbcli`` command ``profile client``.
Versioning
==========
For how FDB clients deal with versioning, see :ref:`api-versions`. The special key space deals with versioning by using the ``API_VERSION`` passed to initialize the client. Any module added at a version larger than the API version set by the client will be inaccessible. For example, if a module is added in version 7.0 and the client sets its API version to 630, then the module will not available. When removing or updating existing modules, module developers need to continue to provide the old behavior for clients that specify old API versions.
To remove the functionality of a certain special key(s), specify the API version where the function is being deprecated in the ``registerSpecialKeysImpl`` function. When a client specifies an API version greater than or equal to the deprecation version, the functionality will not be available. Move and update its documentation to :ref:`special-key-space-deprecation`.
To update the implementation of any special keys, add the new implementation and use ``API_VERSION`` to switch between different implementations.
Add notes in ``api-version-upgrade-guide.rst`` if you either remove or update a special key(s) implementation.
.. [#conflicting_keys] In practice, the transaction probably committed successfully. However, if you're running multiple resolvers then it's possible for a transaction to cause another to abort even if it doesn't commit successfully.
.. [#max_read_transaction_life_versions] The number 5000000 comes from the server knob MAX_READ_TRANSACTION_LIFE_VERSIONS
.. [#special_key_space_enable_writes] Enabling this option enables other transaction options, such as ``ACCESS_SYSTEM_KEYS``. This may change in the future.

View File

@ -195,6 +195,12 @@ ACTOR Future<bool> configureCommandActor(Reference<IDatabase> db,
fprintf(stderr,
"WARN: RocksDB storage engine type is still in experimental stage, not yet production tested.\n");
break;
case ConfigurationResult::DATABASE_CREATED_WARN_SHARDED_ROCKSDB_EXPERIMENTAL:
printf("Database created\n");
fprintf(
stderr,
"WARN: Sharded RocksDB storage engine type is still in experimental stage, not yet production tested.\n");
break;
case ConfigurationResult::DATABASE_UNAVAILABLE:
fprintf(stderr, "ERROR: The database is unavailable\n");
fprintf(stderr, "Type `configure FORCE <TOKEN...>' to configure without this check\n");
@ -260,6 +266,12 @@ ACTOR Future<bool> configureCommandActor(Reference<IDatabase> db,
fprintf(stderr,
"WARN: RocksDB storage engine type is still in experimental stage, not yet production tested.\n");
break;
case ConfigurationResult::SUCCESS_WARN_SHARDED_ROCKSDB_EXPERIMENTAL:
printf("Configuration changed\n");
fprintf(
stderr,
"WARN: Sharded RocksDB storage engine type is still in experimental stage, not yet production tested.\n");
break;
default:
ASSERT(false);
ret = false;

View File

@ -303,6 +303,9 @@ StatusObject DatabaseConfiguration::toJSON(bool noPolicies) const {
} else if (tLogDataStoreType == KeyValueStoreType::SSD_BTREE_V2 &&
storageServerStoreType == KeyValueStoreType::SSD_ROCKSDB_V1) {
result["storage_engine"] = "ssd-rocksdb-v1";
} else if (tLogDataStoreType == KeyValueStoreType::SSD_BTREE_V2 &&
storageServerStoreType == KeyValueStoreType::SSD_SHARDED_ROCKSDB) {
result["storage_engine"] = "ssd-sharded-rocksdb";
} else if (tLogDataStoreType == KeyValueStoreType::MEMORY && storageServerStoreType == KeyValueStoreType::MEMORY) {
result["storage_engine"] = "memory-1";
} else if (tLogDataStoreType == KeyValueStoreType::SSD_BTREE_V2 &&
@ -325,6 +328,8 @@ StatusObject DatabaseConfiguration::toJSON(bool noPolicies) const {
result["tss_storage_engine"] = "ssd-redwood-1-experimental";
} else if (testingStorageServerStoreType == KeyValueStoreType::SSD_ROCKSDB_V1) {
result["tss_storage_engine"] = "ssd-rocksdb-v1";
} else if (testingStorageServerStoreType == KeyValueStoreType::SSD_SHARDED_ROCKSDB) {
result["tss_storage_engine"] = "ssd-sharded-rocksdb";
} else if (testingStorageServerStoreType == KeyValueStoreType::MEMORY_RADIXTREE) {
result["tss_storage_engine"] = "memory-radixtree-beta";
} else if (testingStorageServerStoreType == KeyValueStoreType::MEMORY) {

View File

@ -594,9 +594,10 @@ public:
AsyncTrigger updateCache;
std::vector<std::unique_ptr<SpecialKeyRangeReadImpl>> specialKeySpaceModules;
std::unique_ptr<SpecialKeySpace> specialKeySpace;
void registerSpecialKeySpaceModule(SpecialKeySpace::MODULE module,
SpecialKeySpace::IMPLTYPE type,
std::unique_ptr<SpecialKeyRangeReadImpl>&& impl);
void registerSpecialKeysImpl(SpecialKeySpace::MODULE module,
SpecialKeySpace::IMPLTYPE type,
std::unique_ptr<SpecialKeyRangeReadImpl>&& impl,
int deprecatedVersion = -1);
static bool debugUseTags;
static const std::vector<std::string> debugTransactionTagChoices;

View File

@ -825,7 +825,16 @@ struct KeyValueStoreType {
// These enumerated values are stored in the database configuration, so should NEVER be changed.
// Only add new ones just before END.
// SS storeType is END before the storageServerInterface is initialized.
enum StoreType { SSD_BTREE_V1, MEMORY, SSD_BTREE_V2, SSD_REDWOOD_V1, MEMORY_RADIXTREE, SSD_ROCKSDB_V1, END };
enum StoreType {
SSD_BTREE_V1,
MEMORY,
SSD_BTREE_V2,
SSD_REDWOOD_V1,
MEMORY_RADIXTREE,
SSD_ROCKSDB_V1,
SSD_SHARDED_ROCKSDB,
END
};
KeyValueStoreType() : type(END) {}
KeyValueStoreType(StoreType type) : type(type) {
@ -850,6 +859,8 @@ struct KeyValueStoreType {
return "ssd-redwood-1-experimental";
case SSD_ROCKSDB_V1:
return "ssd-rocksdb-v1";
case SSD_SHARDED_ROCKSDB:
return "ssd-sharded-rocksdb";
case MEMORY:
return "memory";
case MEMORY_RADIXTREE:

View File

@ -66,7 +66,9 @@ enum class ConfigurationResult {
SUCCESS_WARN_PPW_GRADUAL,
SUCCESS,
SUCCESS_WARN_ROCKSDB_EXPERIMENTAL,
SUCCESS_WARN_SHARDED_ROCKSDB_EXPERIMENTAL,
DATABASE_CREATED_WARN_ROCKSDB_EXPERIMENTAL,
DATABASE_CREATED_WARN_SHARDED_ROCKSDB_EXPERIMENTAL,
};
enum class CoordinatorsResult {
@ -293,6 +295,7 @@ Future<ConfigurationResult> changeConfig(Reference<DB> db, std::map<std::string,
state bool warnPPWGradual = false;
state bool warnChangeStorageNoMigrate = false;
state bool warnRocksDBIsExperimental = false;
state bool warnShardedRocksDBIsExperimental = false;
loop {
try {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
@ -483,6 +486,9 @@ Future<ConfigurationResult> changeConfig(Reference<DB> db, std::map<std::string,
} else if (newConfig.storageServerStoreType != oldConfig.storageServerStoreType &&
newConfig.storageServerStoreType == KeyValueStoreType::SSD_ROCKSDB_V1) {
warnRocksDBIsExperimental = true;
} else if (newConfig.storageServerStoreType != oldConfig.storageServerStoreType &&
newConfig.storageServerStoreType == KeyValueStoreType::SSD_SHARDED_ROCKSDB) {
warnShardedRocksDBIsExperimental = true;
}
}
}
@ -534,6 +540,9 @@ Future<ConfigurationResult> changeConfig(Reference<DB> db, std::map<std::string,
else if (m[configKeysPrefix.toString() + "storage_engine"] ==
std::to_string(KeyValueStoreType::SSD_ROCKSDB_V1))
return ConfigurationResult::DATABASE_CREATED_WARN_ROCKSDB_EXPERIMENTAL;
else if (m[configKeysPrefix.toString() + "storage_engine"] ==
std::to_string(KeyValueStoreType::SSD_SHARDED_ROCKSDB))
return ConfigurationResult::DATABASE_CREATED_WARN_SHARDED_ROCKSDB_EXPERIMENTAL;
else
return ConfigurationResult::DATABASE_CREATED;
} catch (Error& e2) {
@ -549,6 +558,8 @@ Future<ConfigurationResult> changeConfig(Reference<DB> db, std::map<std::string,
return ConfigurationResult::SUCCESS_WARN_PPW_GRADUAL;
} else if (warnRocksDBIsExperimental) {
return ConfigurationResult::SUCCESS_WARN_ROCKSDB_EXPERIMENTAL;
} else if (warnShardedRocksDBIsExperimental) {
return ConfigurationResult::SUCCESS_WARN_SHARDED_ROCKSDB_EXPERIMENTAL;
} else {
return ConfigurationResult::SUCCESS;
}

View File

@ -217,6 +217,9 @@ std::map<std::string, std::string> configForToken(std::string const& mode) {
} else if (mode == "ssd-rocksdb-v1") {
logType = KeyValueStoreType::SSD_BTREE_V2;
storeType = KeyValueStoreType::SSD_ROCKSDB_V1;
} else if (mode == "ssd-sharded-rocksdb") {
logType = KeyValueStoreType::SSD_BTREE_V2;
storeType = KeyValueStoreType::SSD_SHARDED_ROCKSDB;
} else if (mode == "memory" || mode == "memory-2") {
logType = KeyValueStoreType::SSD_BTREE_V2;
storeType = KeyValueStoreType::MEMORY;

View File

@ -300,7 +300,9 @@ ThreadResult<RangeResult> DLTransaction::readBlobGranules(const KeyRangeRef& key
int count;
FdbCApi::fdb_bool_t more;
FdbCApi::fdb_error_t error = api->resultGetKeyValueArray(r, &kvs, &count, &more);
ASSERT(!error);
if (error) {
return ThreadResult<RangeResult>(Error(error));
}
// The memory for this is stored in the FDBResult and is released when the result gets destroyed
return ThreadResult<RangeResult>(
@ -1996,8 +1998,9 @@ std::vector<std::pair<std::string, bool>> MultiVersionApi::copyExternalLibraryPe
for (int ii = 0; ii < threadCount; ++ii) {
std::string filename = basename(path);
char tempName[PATH_MAX + 12];
sprintf(tempName, "/tmp/%s-XXXXXX", filename.c_str());
constexpr int MAX_TMP_NAME_LENGTH = PATH_MAX + 12;
char tempName[MAX_TMP_NAME_LENGTH];
snprintf(tempName, MAX_TMP_NAME_LENGTH, "%s/%s-XXXXXX", tmpDir.c_str(), filename.c_str());
int tempFd = mkstemp(tempName);
int fd;
@ -2143,6 +2146,9 @@ void MultiVersionApi::setNetworkOptionInternal(FDBNetworkOptions::Option option,
// multiple client threads are not supported on windows.
threadCount = extractIntOption(value, 1, 1);
#endif
} else if (option == FDBNetworkOptions::CLIENT_TMP_DIR) {
validateOption(value, true, false, false);
tmpDir = abspath(value.get().toString());
} else {
forwardOption = true;
}
@ -2518,7 +2524,8 @@ void MultiVersionApi::loadEnvironmentVariableNetworkOptions() {
MultiVersionApi::MultiVersionApi()
: callbackOnMainThread(true), localClientDisabled(false), networkStartSetup(false), networkSetup(false),
bypassMultiClientApi(false), externalClient(false), apiVersion(0), threadCount(0), envOptionsLoaded(false) {}
bypassMultiClientApi(false), externalClient(false), apiVersion(0), threadCount(0), tmpDir("/tmp"),
envOptionsLoaded(false) {}
MultiVersionApi* MultiVersionApi::api = new MultiVersionApi();

View File

@ -898,6 +898,7 @@ private:
int nextThread = 0;
int threadCount;
std::string tmpDir;
Mutex lock;
std::vector<std::pair<FDBNetworkOptions::Option, Optional<Standalone<StringRef>>>> options;

View File

@ -1231,11 +1231,16 @@ Future<HealthMetrics> DatabaseContext::getHealthMetrics(bool detailed = false) {
return getHealthMetricsActor(this, detailed);
}
void DatabaseContext::registerSpecialKeySpaceModule(SpecialKeySpace::MODULE module,
SpecialKeySpace::IMPLTYPE type,
std::unique_ptr<SpecialKeyRangeReadImpl>&& impl) {
specialKeySpace->registerKeyRange(module, type, impl->getKeyRange(), impl.get());
specialKeySpaceModules.push_back(std::move(impl));
// register a special key(s) implementation under the specified module
void DatabaseContext::registerSpecialKeysImpl(SpecialKeySpace::MODULE module,
SpecialKeySpace::IMPLTYPE type,
std::unique_ptr<SpecialKeyRangeReadImpl>&& impl,
int deprecatedVersion) {
// if deprecated, add the implementation when the api version is less than the deprecated version
if (deprecatedVersion == -1 || apiVersion < deprecatedVersion) {
specialKeySpace->registerKeyRange(module, type, impl->getKeyRange(), impl.get());
specialKeySpaceModules.push_back(std::move(impl));
}
}
ACTOR Future<RangeResult> getWorkerInterfaces(Reference<IClusterConnectionRecord> clusterRecord);
@ -1475,188 +1480,188 @@ DatabaseContext::DatabaseContext(Reference<AsyncVar<Reference<IClusterConnection
smoothMidShardSize.reset(CLIENT_KNOBS->INIT_MID_SHARD_BYTES);
if (apiVersionAtLeast(710)) {
registerSpecialKeySpaceModule(
registerSpecialKeysImpl(
SpecialKeySpace::MODULE::MANAGEMENT,
SpecialKeySpace::IMPLTYPE::READWRITE,
std::make_unique<TenantMapRangeImpl>(SpecialKeySpace::getManagementApiCommandRange("tenantmap")));
}
if (apiVersionAtLeast(700)) {
registerSpecialKeySpaceModule(SpecialKeySpace::MODULE::ERRORMSG,
SpecialKeySpace::IMPLTYPE::READONLY,
std::make_unique<SingleSpecialKeyImpl>(
SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::ERRORMSG).begin,
[](ReadYourWritesTransaction* ryw) -> Future<Optional<Value>> {
if (ryw->getSpecialKeySpaceErrorMsg().present())
return Optional<Value>(ryw->getSpecialKeySpaceErrorMsg().get());
else
return Optional<Value>();
}));
registerSpecialKeySpaceModule(
registerSpecialKeysImpl(SpecialKeySpace::MODULE::ERRORMSG,
SpecialKeySpace::IMPLTYPE::READONLY,
std::make_unique<SingleSpecialKeyImpl>(
SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::ERRORMSG).begin,
[](ReadYourWritesTransaction* ryw) -> Future<Optional<Value>> {
if (ryw->getSpecialKeySpaceErrorMsg().present())
return Optional<Value>(ryw->getSpecialKeySpaceErrorMsg().get());
else
return Optional<Value>();
}));
registerSpecialKeysImpl(
SpecialKeySpace::MODULE::MANAGEMENT,
SpecialKeySpace::IMPLTYPE::READWRITE,
std::make_unique<ManagementCommandsOptionsImpl>(
KeyRangeRef(LiteralStringRef("options/"), LiteralStringRef("options0"))
.withPrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::MANAGEMENT).begin)));
registerSpecialKeySpaceModule(
registerSpecialKeysImpl(
SpecialKeySpace::MODULE::MANAGEMENT,
SpecialKeySpace::IMPLTYPE::READWRITE,
std::make_unique<ExcludeServersRangeImpl>(SpecialKeySpace::getManagementApiCommandRange("exclude")));
registerSpecialKeySpaceModule(
registerSpecialKeysImpl(
SpecialKeySpace::MODULE::MANAGEMENT,
SpecialKeySpace::IMPLTYPE::READWRITE,
std::make_unique<FailedServersRangeImpl>(SpecialKeySpace::getManagementApiCommandRange("failed")));
registerSpecialKeySpaceModule(SpecialKeySpace::MODULE::MANAGEMENT,
SpecialKeySpace::IMPLTYPE::READWRITE,
std::make_unique<ExcludedLocalitiesRangeImpl>(
SpecialKeySpace::getManagementApiCommandRange("excludedlocality")));
registerSpecialKeySpaceModule(SpecialKeySpace::MODULE::MANAGEMENT,
SpecialKeySpace::IMPLTYPE::READWRITE,
std::make_unique<FailedLocalitiesRangeImpl>(
SpecialKeySpace::getManagementApiCommandRange("failedlocality")));
registerSpecialKeySpaceModule(
registerSpecialKeysImpl(SpecialKeySpace::MODULE::MANAGEMENT,
SpecialKeySpace::IMPLTYPE::READWRITE,
std::make_unique<ExcludedLocalitiesRangeImpl>(
SpecialKeySpace::getManagementApiCommandRange("excludedlocality")));
registerSpecialKeysImpl(SpecialKeySpace::MODULE::MANAGEMENT,
SpecialKeySpace::IMPLTYPE::READWRITE,
std::make_unique<FailedLocalitiesRangeImpl>(
SpecialKeySpace::getManagementApiCommandRange("failedlocality")));
registerSpecialKeysImpl(
SpecialKeySpace::MODULE::MANAGEMENT,
SpecialKeySpace::IMPLTYPE::READONLY,
std::make_unique<ExclusionInProgressRangeImpl>(
KeyRangeRef(LiteralStringRef("in_progress_exclusion/"), LiteralStringRef("in_progress_exclusion0"))
.withPrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::MANAGEMENT).begin)));
registerSpecialKeySpaceModule(
registerSpecialKeysImpl(
SpecialKeySpace::MODULE::CONFIGURATION,
SpecialKeySpace::IMPLTYPE::READWRITE,
std::make_unique<ProcessClassRangeImpl>(
KeyRangeRef(LiteralStringRef("process/class_type/"), LiteralStringRef("process/class_type0"))
.withPrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::CONFIGURATION).begin)));
registerSpecialKeySpaceModule(
registerSpecialKeysImpl(
SpecialKeySpace::MODULE::CONFIGURATION,
SpecialKeySpace::IMPLTYPE::READONLY,
std::make_unique<ProcessClassSourceRangeImpl>(
KeyRangeRef(LiteralStringRef("process/class_source/"), LiteralStringRef("process/class_source0"))
.withPrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::CONFIGURATION).begin)));
registerSpecialKeySpaceModule(
registerSpecialKeysImpl(
SpecialKeySpace::MODULE::MANAGEMENT,
SpecialKeySpace::IMPLTYPE::READWRITE,
std::make_unique<LockDatabaseImpl>(
singleKeyRange(LiteralStringRef("db_locked"))
.withPrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::MANAGEMENT).begin)));
registerSpecialKeySpaceModule(
registerSpecialKeysImpl(
SpecialKeySpace::MODULE::MANAGEMENT,
SpecialKeySpace::IMPLTYPE::READWRITE,
std::make_unique<ConsistencyCheckImpl>(
singleKeyRange(LiteralStringRef("consistency_check_suspended"))
.withPrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::MANAGEMENT).begin)));
registerSpecialKeySpaceModule(
registerSpecialKeysImpl(
SpecialKeySpace::MODULE::GLOBALCONFIG,
SpecialKeySpace::IMPLTYPE::READWRITE,
std::make_unique<GlobalConfigImpl>(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::GLOBALCONFIG)));
registerSpecialKeySpaceModule(
registerSpecialKeysImpl(
SpecialKeySpace::MODULE::TRACING,
SpecialKeySpace::IMPLTYPE::READWRITE,
std::make_unique<TracingOptionsImpl>(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::TRACING)));
registerSpecialKeySpaceModule(
registerSpecialKeysImpl(
SpecialKeySpace::MODULE::CONFIGURATION,
SpecialKeySpace::IMPLTYPE::READWRITE,
std::make_unique<CoordinatorsImpl>(
KeyRangeRef(LiteralStringRef("coordinators/"), LiteralStringRef("coordinators0"))
.withPrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::CONFIGURATION).begin)));
registerSpecialKeySpaceModule(
registerSpecialKeysImpl(
SpecialKeySpace::MODULE::MANAGEMENT,
SpecialKeySpace::IMPLTYPE::READONLY,
std::make_unique<CoordinatorsAutoImpl>(
singleKeyRange(LiteralStringRef("auto_coordinators"))
.withPrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::MANAGEMENT).begin)));
registerSpecialKeySpaceModule(
registerSpecialKeysImpl(
SpecialKeySpace::MODULE::MANAGEMENT,
SpecialKeySpace::IMPLTYPE::READWRITE,
std::make_unique<AdvanceVersionImpl>(
singleKeyRange(LiteralStringRef("min_required_commit_version"))
.withPrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::MANAGEMENT).begin)));
registerSpecialKeySpaceModule(
registerSpecialKeysImpl(
SpecialKeySpace::MODULE::MANAGEMENT,
SpecialKeySpace::IMPLTYPE::READWRITE,
std::make_unique<VersionEpochImpl>(
singleKeyRange(LiteralStringRef("version_epoch"))
.withPrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::MANAGEMENT).begin)));
registerSpecialKeySpaceModule(
registerSpecialKeysImpl(
SpecialKeySpace::MODULE::MANAGEMENT,
SpecialKeySpace::IMPLTYPE::READWRITE,
std::make_unique<ClientProfilingImpl>(
KeyRangeRef(LiteralStringRef("profiling/"), LiteralStringRef("profiling0"))
.withPrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::MANAGEMENT).begin)));
registerSpecialKeySpaceModule(
.withPrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::MANAGEMENT).begin)),
/* deprecated */ 720);
registerSpecialKeysImpl(
SpecialKeySpace::MODULE::MANAGEMENT,
SpecialKeySpace::IMPLTYPE::READWRITE,
std::make_unique<MaintenanceImpl>(
KeyRangeRef(LiteralStringRef("maintenance/"), LiteralStringRef("maintenance0"))
.withPrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::MANAGEMENT).begin)));
registerSpecialKeySpaceModule(
registerSpecialKeysImpl(
SpecialKeySpace::MODULE::MANAGEMENT,
SpecialKeySpace::IMPLTYPE::READWRITE,
std::make_unique<DataDistributionImpl>(
KeyRangeRef(LiteralStringRef("data_distribution/"), LiteralStringRef("data_distribution0"))
.withPrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::MANAGEMENT).begin)));
registerSpecialKeySpaceModule(
registerSpecialKeysImpl(
SpecialKeySpace::MODULE::ACTORLINEAGE,
SpecialKeySpace::IMPLTYPE::READONLY,
std::make_unique<ActorLineageImpl>(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::ACTORLINEAGE)));
registerSpecialKeySpaceModule(SpecialKeySpace::MODULE::ACTOR_PROFILER_CONF,
SpecialKeySpace::IMPLTYPE::READWRITE,
std::make_unique<ActorProfilerConf>(SpecialKeySpace::getModuleRange(
SpecialKeySpace::MODULE::ACTOR_PROFILER_CONF)));
registerSpecialKeysImpl(SpecialKeySpace::MODULE::ACTOR_PROFILER_CONF,
SpecialKeySpace::IMPLTYPE::READWRITE,
std::make_unique<ActorProfilerConf>(
SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::ACTOR_PROFILER_CONF)));
}
if (apiVersionAtLeast(630)) {
registerSpecialKeySpaceModule(SpecialKeySpace::MODULE::TRANSACTION,
SpecialKeySpace::IMPLTYPE::READONLY,
std::make_unique<ConflictingKeysImpl>(conflictingKeysRange));
registerSpecialKeySpaceModule(SpecialKeySpace::MODULE::TRANSACTION,
SpecialKeySpace::IMPLTYPE::READONLY,
std::make_unique<ReadConflictRangeImpl>(readConflictRangeKeysRange));
registerSpecialKeySpaceModule(SpecialKeySpace::MODULE::TRANSACTION,
SpecialKeySpace::IMPLTYPE::READONLY,
std::make_unique<WriteConflictRangeImpl>(writeConflictRangeKeysRange));
registerSpecialKeySpaceModule(SpecialKeySpace::MODULE::METRICS,
SpecialKeySpace::IMPLTYPE::READONLY,
std::make_unique<DDStatsRangeImpl>(ddStatsRange));
registerSpecialKeySpaceModule(
registerSpecialKeysImpl(SpecialKeySpace::MODULE::TRANSACTION,
SpecialKeySpace::IMPLTYPE::READONLY,
std::make_unique<ConflictingKeysImpl>(conflictingKeysRange));
registerSpecialKeysImpl(SpecialKeySpace::MODULE::TRANSACTION,
SpecialKeySpace::IMPLTYPE::READONLY,
std::make_unique<ReadConflictRangeImpl>(readConflictRangeKeysRange));
registerSpecialKeysImpl(SpecialKeySpace::MODULE::TRANSACTION,
SpecialKeySpace::IMPLTYPE::READONLY,
std::make_unique<WriteConflictRangeImpl>(writeConflictRangeKeysRange));
registerSpecialKeysImpl(SpecialKeySpace::MODULE::METRICS,
SpecialKeySpace::IMPLTYPE::READONLY,
std::make_unique<DDStatsRangeImpl>(ddStatsRange));
registerSpecialKeysImpl(
SpecialKeySpace::MODULE::METRICS,
SpecialKeySpace::IMPLTYPE::READONLY,
std::make_unique<HealthMetricsRangeImpl>(KeyRangeRef(LiteralStringRef("\xff\xff/metrics/health/"),
LiteralStringRef("\xff\xff/metrics/health0"))));
registerSpecialKeySpaceModule(
registerSpecialKeysImpl(
SpecialKeySpace::MODULE::WORKERINTERFACE,
SpecialKeySpace::IMPLTYPE::READONLY,
std::make_unique<WorkerInterfacesSpecialKeyImpl>(KeyRangeRef(
LiteralStringRef("\xff\xff/worker_interfaces/"), LiteralStringRef("\xff\xff/worker_interfaces0"))));
registerSpecialKeySpaceModule(
SpecialKeySpace::MODULE::STATUSJSON,
SpecialKeySpace::IMPLTYPE::READONLY,
std::make_unique<SingleSpecialKeyImpl>(LiteralStringRef("\xff\xff/status/json"),
[](ReadYourWritesTransaction* ryw) -> Future<Optional<Value>> {
if (ryw->getDatabase().getPtr() &&
ryw->getDatabase()->getConnectionRecord()) {
++ryw->getDatabase()->transactionStatusRequests;
return getJSON(ryw->getDatabase());
} else {
return Optional<Value>();
}
}));
registerSpecialKeySpaceModule(
SpecialKeySpace::MODULE::CLUSTERFILEPATH,
SpecialKeySpace::IMPLTYPE::READONLY,
std::make_unique<SingleSpecialKeyImpl>(
LiteralStringRef("\xff\xff/cluster_file_path"),
[](ReadYourWritesTransaction* ryw) -> Future<Optional<Value>> {
try {
if (ryw->getDatabase().getPtr() && ryw->getDatabase()->getConnectionRecord()) {
Optional<Value> output =
StringRef(ryw->getDatabase()->getConnectionRecord()->getLocation());
return output;
}
} catch (Error& e) {
return e;
}
return Optional<Value>();
}));
registerSpecialKeysImpl(SpecialKeySpace::MODULE::STATUSJSON,
SpecialKeySpace::IMPLTYPE::READONLY,
std::make_unique<SingleSpecialKeyImpl>(
LiteralStringRef("\xff\xff/status/json"),
[](ReadYourWritesTransaction* ryw) -> Future<Optional<Value>> {
if (ryw->getDatabase().getPtr() && ryw->getDatabase()->getConnectionRecord()) {
++ryw->getDatabase()->transactionStatusRequests;
return getJSON(ryw->getDatabase());
} else {
return Optional<Value>();
}
}));
registerSpecialKeysImpl(SpecialKeySpace::MODULE::CLUSTERFILEPATH,
SpecialKeySpace::IMPLTYPE::READONLY,
std::make_unique<SingleSpecialKeyImpl>(
LiteralStringRef("\xff\xff/cluster_file_path"),
[](ReadYourWritesTransaction* ryw) -> Future<Optional<Value>> {
try {
if (ryw->getDatabase().getPtr() &&
ryw->getDatabase()->getConnectionRecord()) {
Optional<Value> output =
StringRef(ryw->getDatabase()->getConnectionRecord()->getLocation());
return output;
}
} catch (Error& e) {
return e;
}
return Optional<Value>();
}));
registerSpecialKeySpaceModule(
registerSpecialKeysImpl(
SpecialKeySpace::MODULE::CONNECTIONSTRING,
SpecialKeySpace::IMPLTYPE::READONLY,
std::make_unique<SingleSpecialKeyImpl>(

View File

@ -769,6 +769,7 @@ const KeyRef JSONSchemas::statusSchema = LiteralStringRef(R"statusSchema(
"ssd-2",
"ssd-redwood-1-experimental",
"ssd-rocksdb-v1",
"ssd-sharded-rocksdb",
"memory",
"memory-1",
"memory-2",
@ -782,6 +783,7 @@ const KeyRef JSONSchemas::statusSchema = LiteralStringRef(R"statusSchema(
"ssd-2",
"ssd-redwood-1-experimental",
"ssd-rocksdb-v1",
"ssd-sharded-rocksdb",
"memory",
"memory-1",
"memory-2",

View File

@ -381,7 +381,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( DEFAULT_FDB_ROCKSDB_COLUMN_FAMILY, "fdb");
init( ROCKSDB_PERFCONTEXT_ENABLE, false ); if( randomize && BUGGIFY ) ROCKSDB_PERFCONTEXT_ENABLE = deterministicRandom()->coinflip() ? false : true;
init( ROCKSDB_PERFCONTEXT_SAMPLE_RATE, 0.0001 );
init( ROCKSDB_PERFCONTEXT_SAMPLE_RATE, 0.0001 );
init( ROCKSDB_MAX_SUBCOMPACTIONS, 2 );
init( ROCKSDB_SOFT_PENDING_COMPACT_BYTES_LIMIT, 64000000000 ); // 64GB, Rocksdb option, Writes will slow down.
init( ROCKSDB_HARD_PENDING_COMPACT_BYTES_LIMIT, 100000000000 ); // 100GB, Rocksdb option, Writes will stall.
@ -393,6 +393,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( ROCKSDB_CAN_COMMIT_DELAY_TIMES_ON_OVERLOAD, 5 );
init( ROCKSDB_COMPACTION_READAHEAD_SIZE, 32768 ); // 32 KB, performs bigger reads when doing compaction.
init( ROCKSDB_BLOCK_SIZE, 32768 ); // 32 KB, size of the block in rocksdb cache.
init( ENABLE_SHARDED_ROCKSDB, false );
// Leader election
bool longLeaderElection = randomize && BUGGIFY;
@ -854,6 +855,10 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( ENCRYPTION_MODE, "AES-256-CTR");
init( SIM_KMS_MAX_KEYS, 4096);
// Support KmsConnector types are:
// KMS_CONNECTOR_TYPE_HTTP -> 1
init( KMS_CONNECTOR_TYPE, "HttpKmsConnector");
// Blob granlues
init( BG_URL, isSimulated ? "file://fdbblob/" : "" ); // TODO: store in system key space or something, eventually
init( BG_SNAPSHOT_FILE_TARGET_BYTES, 10000000 ); if( buggifySmallShards ) BG_SNAPSHOT_FILE_TARGET_BYTES = 100000; else if (simulationMediumShards || (randomize && BUGGIFY) ) BG_SNAPSHOT_FILE_TARGET_BYTES = 1000000;

View File

@ -322,6 +322,7 @@ public:
int ROCKSDB_CAN_COMMIT_DELAY_TIMES_ON_OVERLOAD;
int64_t ROCKSDB_COMPACTION_READAHEAD_SIZE;
int64_t ROCKSDB_BLOCK_SIZE;
bool ENABLE_SHARDED_ROCKSDB;
// Leader election
int MAX_NOTIFICATIONS;
@ -821,6 +822,9 @@ public:
std::string ENCRYPTION_MODE;
int SIM_KMS_MAX_KEYS;
// Key Management Service (KMS) Connector
std::string KMS_CONNECTOR_TYPE;
// blob granule stuff
// FIXME: configure url with database configuration instead of knob eventually
std::string BG_URL;

View File

@ -548,6 +548,8 @@ void SpecialKeySpace::registerKeyRange(SpecialKeySpace::MODULE module,
SpecialKeySpace::IMPLTYPE type,
const KeyRangeRef& kr,
SpecialKeyRangeReadImpl* impl) {
// Not allowed to register an empty range
ASSERT(!kr.empty());
// module boundary check
if (module == SpecialKeySpace::MODULE::TESTONLY) {
ASSERT(normalKeys.contains(kr));
@ -1999,7 +2001,6 @@ ACTOR static Future<RangeResult> ClientProfilingGetRangeActor(ReadYourWritesTran
return result;
}
// TODO : add limitation on set operation
Future<RangeResult> ClientProfilingImpl::getRange(ReadYourWritesTransaction* ryw,
KeyRangeRef kr,
GetRangeLimits limitsHint) const {

View File

@ -485,6 +485,7 @@ public:
Future<Optional<std::string>> commit(ReadYourWritesTransaction* ryw) override;
};
// Deprecated as of 7.2
class ClientProfilingImpl : public SpecialKeyRangeRWImpl {
public:
explicit ClientProfilingImpl(KeyRangeRef kr);

View File

@ -134,6 +134,9 @@ description is not currently required but encouraged.
<Option name="distributed_client_tracer" code="90"
paramType="String" paramDescription="Distributed tracer type. Choose from none, log_file, or network_lossy"
description="Set a tracer to run on the client. Should be set to the same value as the tracer set on the server." />
<Option name="client_tmp_dir" code="90"
paramType="String" paramDescription="Client directory for temporary files. "
description="Sets the directory for storing temporary files created by FDB client, such as temporary copies of client libraries. Defaults to /tmp" />
<Option name="supported_client_versions" code="1000"
paramType="String" paramDescription="[release version],[source version],[protocol version];..."
description="This option is set automatically to communicate the list of supported clients to the active client."

View File

@ -55,6 +55,8 @@ set(FDBSERVER_SRCS
KeyValueStoreMemory.actor.cpp
KeyValueStoreRocksDB.actor.cpp
KeyValueStoreSQLite.actor.cpp
KmsConnector.h
KmsConnectorInterface.h
KnobProtectiveGroups.cpp
KnobProtectiveGroups.h
Knobs.h
@ -134,8 +136,8 @@ set(FDBSERVER_SRCS
ServerDBInfo.actor.h
ServerDBInfo.h
SigStack.cpp
SimEncryptKmsProxy.actor.cpp
SimEncryptKmsProxy.actor.h
SimKmsConnector.actor.h
SimKmsConnector.actor.cpp
SimpleConfigConsumer.actor.cpp
SimpleConfigConsumer.h
SimulatedCluster.actor.cpp

View File

@ -2631,7 +2631,11 @@ public:
.detail("TSSID", tssId)
.detail("Reason",
self->zeroHealthyTeams->get() ? "ZeroHealthyTeams" : "TooMany");
Promise<Void> shutdown = self->shutdown;
killPromise.send(Void());
if (!shutdown.canBeSet()) {
return Void(); // "self" got destroyed, so return.
}
}
}
}
@ -3582,6 +3586,8 @@ DDTeamCollection::DDTeamCollection(Database const& cx,
DDTeamCollection::~DDTeamCollection() {
TraceEvent("DDTeamCollectionDestructed", distributorId).detail("Primary", primary);
// Signal that the object is being destroyed.
shutdown.send(Void());
// Cancel the teamBuilder to avoid creating new teams after teams are cancelled.
teamBuilder.cancel();

View File

@ -272,6 +272,12 @@ class DDTeamCollection : public ReferenceCounted<DDTeamCollection> {
LocalityMap<UID> machineLocalityMap; // locality info of machines
// A mechanism to tell actors that reference a DDTeamCollection object through a direct
// pointer (without doing reference counting) that the object is being destroyed.
// (Introduced to solve the problem of "self" getting destroyed from underneath the
// "storageRecruiter" actor).
Promise<Void> shutdown;
// Randomly choose one machine team that has chosenServer and has the correct size
// When configuration is changed, we may have machine teams with old storageTeamSize
Reference<TCMachineTeamInfo> findOneRandomMachineTeam(TCServerInfo const& chosenServer) const;

View File

@ -386,15 +386,19 @@ void launchDest(RelocateData& relocation,
}
}
void completeDest(RelocateData const& relocation, std::map<UID, Busyness>& destBusymap) {
int destWorkFactor = getDestWorkFactor();
for (UID id : relocation.completeDests) {
destBusymap[id].removeWork(relocation.priority, destWorkFactor);
}
}
void complete(RelocateData const& relocation, std::map<UID, Busyness>& busymap, std::map<UID, Busyness>& destBusymap) {
ASSERT(relocation.workFactor > 0);
for (int i = 0; i < relocation.src.size(); i++)
busymap[relocation.src[i]].removeWork(relocation.priority, relocation.workFactor);
int destWorkFactor = getDestWorkFactor();
for (UID id : relocation.completeDests) {
destBusymap[id].removeWork(relocation.priority, destWorkFactor);
}
completeDest(relocation, destBusymap);
}
ACTOR Future<Void> dataDistributionRelocator(struct DDQueueData* self,
@ -1389,6 +1393,8 @@ ACTOR Future<Void> dataDistributionRelocator(DDQueueData* self, RelocateData rd,
} else {
TEST(true); // move to removed server
healthyDestinations.addDataInFlightToTeam(-metrics.bytes);
completeDest(rd, self->destBusymap);
rd.completeDests.clear();
wait(delay(SERVER_KNOBS->RETRY_RELOCATESHARD_DELAY, TaskPriority::DataDistributionLaunch));
}

View File

@ -21,11 +21,15 @@
#include "fdbrpc/Locality.h"
#include "fdbrpc/Stats.h"
#include "fdbserver/EncryptKeyProxyInterface.h"
#include "fdbserver/KmsConnector.h"
#include "fdbserver/KmsConnectorInterface.h"
#include "fdbserver/Knobs.h"
#include "fdbserver/ServerDBInfo.actor.h"
#include "fdbserver/SimEncryptKmsProxy.actor.h"
#include "fdbserver/SimKmsConnector.actor.h"
#include "fdbserver/WorkerInterface.actor.h"
#include "fdbserver/ServerDBInfo.h"
#include "flow/Arena.h"
#include "flow/EncryptUtils.h"
#include "flow/Error.h"
#include "flow/EventTypes.actor.h"
#include "flow/FastRef.h"
@ -37,12 +41,11 @@
#include "flow/network.h"
#include <boost/mpl/not.hpp>
#include <utility>
#include <memory>
#include "flow/actorcompiler.h" // This must be the last #include.
using EncryptDomainId = uint64_t;
using EncryptBaseCipherId = uint64_t;
namespace {
bool canReplyWith(Error e) {
switch (e.code()) {
@ -56,16 +59,16 @@ bool canReplyWith(Error e) {
} // namespace
struct EncryptBaseCipherKey {
EncryptDomainId domainId;
EncryptBaseCipherId baseCipherId;
EncryptCipherDomainId domainId;
EncryptCipherBaseKeyId baseCipherId;
Standalone<StringRef> baseCipherKey;
uint64_t creationTimeSec;
bool noExpiry;
EncryptBaseCipherKey()
: domainId(0), baseCipherId(0), baseCipherKey(StringRef()), creationTimeSec(0), noExpiry(false) {}
explicit EncryptBaseCipherKey(EncryptDomainId dId,
EncryptBaseCipherId cipherId,
explicit EncryptBaseCipherKey(EncryptCipherDomainId dId,
EncryptCipherBaseKeyId cipherId,
StringRef cipherKey,
bool neverExpire)
: domainId(dId), baseCipherId(cipherId), baseCipherKey(cipherKey), creationTimeSec(now()), noExpiry(neverExpire) {
@ -74,8 +77,8 @@ struct EncryptBaseCipherKey {
bool isValid() { return noExpiry ? true : ((now() - creationTimeSec) < FLOW_KNOBS->ENCRYPT_CIPHER_KEY_CACHE_TTL); }
};
using EncryptBaseDomainIdCache = std::unordered_map<EncryptDomainId, EncryptBaseCipherKey>;
using EncryptBaseCipherKeyIdCache = std::unordered_map<EncryptBaseCipherId, EncryptBaseCipherKey>;
using EncryptBaseDomainIdCache = std::unordered_map<EncryptCipherDomainId, EncryptBaseCipherKey>;
using EncryptBaseCipherKeyIdCache = std::unordered_map<EncryptCipherBaseKeyId, EncryptBaseCipherKey>;
struct EncryptKeyProxyData : NonCopyable, ReferenceCounted<EncryptKeyProxyData> {
public:
@ -86,6 +89,8 @@ public:
EncryptBaseDomainIdCache baseCipherDomainIdCache;
EncryptBaseCipherKeyIdCache baseCipherKeyIdCache;
std::unique_ptr<KmsConnector> kmsConnector;
CounterCollection ekpCacheMetrics;
Counter baseCipherKeyIdCacheMisses;
@ -106,8 +111,8 @@ public:
numResponseWithErrors("EKPNumResponseWithErrors", ekpCacheMetrics),
numEncryptionKeyRefreshErrors("EKPNumEncryptionKeyRefreshErrors", ekpCacheMetrics) {}
void insertIntoBaseDomainIdCache(const EncryptDomainId domainId,
const EncryptBaseCipherId baseCipherId,
void insertIntoBaseDomainIdCache(const EncryptCipherDomainId domainId,
const EncryptCipherBaseKeyId baseCipherId,
const StringRef baseCipherKey) {
// Entries in domainId cache are eligible for periodic refreshes to support 'limiting lifetime of encryption
// key' support if enabled on external KMS solutions.
@ -118,8 +123,8 @@ public:
insertIntoBaseCipherIdCache(domainId, baseCipherId, baseCipherKey);
}
void insertIntoBaseCipherIdCache(const EncryptDomainId domainId,
const EncryptBaseCipherId baseCipherId,
void insertIntoBaseCipherIdCache(const EncryptCipherDomainId domainId,
const EncryptCipherBaseKeyId baseCipherId,
const StringRef baseCipherKey) {
// Given an cipherKey is immutable, it is OK to NOT expire cached information.
// TODO: Update cache to support LRU eviction policy to limit the total cache size.
@ -150,162 +155,168 @@ public:
};
ACTOR Future<Void> getCipherKeysByBaseCipherKeyIds(Reference<EncryptKeyProxyData> ekpProxyData,
SimKmsProxyInterface simKmsInterface,
KmsConnectorInterface kmsConnectorInf,
EKPGetBaseCipherKeysByIdsRequest req) {
// Scan the cached cipher-keys and filter our baseCipherIds locally cached
// for the rest, reachout to KMS to fetch the required details
std::vector<EncryptBaseCipherId> lookupCipherIds;
state std::unordered_map<EncryptBaseCipherId, Standalone<StringRef>> cachedKeys;
for (EncryptBaseCipherId id : req.baseCipherIds) {
const auto itr = ekpProxyData->baseCipherKeyIdCache.find(id);
if (itr != ekpProxyData->baseCipherKeyIdCache.end()) {
ASSERT(itr->second.isValid());
cachedKeys.emplace(id, itr->second.baseCipherKey);
} else {
lookupCipherIds.push_back(id);
}
}
ekpProxyData->baseCipherKeyIdCacheHits += cachedKeys.size();
ekpProxyData->baseCipherKeyIdCacheMisses += lookupCipherIds.size();
std::vector<std::pair<EncryptCipherBaseKeyId, EncryptCipherDomainId>> lookupCipherIds;
state std::vector<EKPBaseCipherDetails> cachedCipherDetails;
state EKPGetBaseCipherKeysByIdsRequest keysByIds = req;
state EKPGetBaseCipherKeysByIdsReply keyIdsReply;
if (g_network->isSimulated()) {
if (!lookupCipherIds.empty()) {
try {
SimGetEncryptKeysByKeyIdsRequest simKeyIdsReq(lookupCipherIds);
SimGetEncryptKeysByKeyIdsReply simKeyIdsReply =
wait(simKmsInterface.encryptKeyLookupByKeyIds.getReply(simKeyIdsReq));
// Dedup the requested pair<baseCipherId, encryptDomainId>
// TODO: endpoint serialization of std::unordered_set isn't working at the moment
std::unordered_set<std::pair<EncryptCipherBaseKeyId, EncryptCipherDomainId>,
boost::hash<std::pair<EncryptCipherBaseKeyId, EncryptCipherDomainId>>>
dedupedCipherIds;
for (const auto& item : req.baseCipherIds) {
dedupedCipherIds.emplace(item);
}
for (const auto& item : simKeyIdsReply.encryptKeyMap) {
keyIdsReply.baseCipherMap.emplace(item.first, StringRef(keyIdsReply.arena, item.second));
}
// Record the fetched cipher details to the local cache for the future references
// Note: cache warm-up is done after reponding to the caller
for (auto& item : simKeyIdsReply.encryptKeyMap) {
// DomainId isn't available here, the caller must know the encryption domainId
ekpProxyData->insertIntoBaseCipherIdCache(0, item.first, item.second);
}
} catch (Error& e) {
if (!canReplyWith(e)) {
TraceEvent("GetCipherKeysByIds", ekpProxyData->myId).error(e);
throw;
}
TraceEvent("GetCipherKeysByIds", ekpProxyData->myId).detail("ErrorCode", e.code());
ekpProxyData->sendErrorResponse(keysByIds.reply, e);
return Void();
}
for (const auto& item : dedupedCipherIds) {
const auto itr = ekpProxyData->baseCipherKeyIdCache.find(item.first);
if (itr != ekpProxyData->baseCipherKeyIdCache.end()) {
ASSERT(itr->second.isValid());
cachedCipherDetails.emplace_back(
itr->second.domainId, itr->second.baseCipherId, itr->second.baseCipherKey, keyIdsReply.arena);
} else {
lookupCipherIds.emplace_back(std::make_pair(item.first, item.second));
}
} else {
// TODO: Call to non-FDB KMS connector process.
throw not_implemented();
}
for (auto& item : cachedKeys) {
keyIdsReply.baseCipherMap.emplace(item.first, item.second);
ekpProxyData->baseCipherKeyIdCacheHits += cachedCipherDetails.size();
ekpProxyData->baseCipherKeyIdCacheMisses += lookupCipherIds.size();
if (!lookupCipherIds.empty()) {
try {
KmsConnLookupEKsByKeyIdsReq keysByIdsReq(lookupCipherIds);
KmsConnLookupEKsByKeyIdsRep keysByIdsRep = wait(kmsConnectorInf.ekLookupByIds.getReply(keysByIdsReq));
for (const auto& item : keysByIdsRep.cipherKeyDetails) {
keyIdsReply.baseCipherDetails.emplace_back(
item.encryptDomainId, item.encryptKeyId, item.encryptKey, keyIdsReply.arena);
}
// Record the fetched cipher details to the local cache for the future references
// Note: cache warm-up is done after reponding to the caller
for (auto& item : keysByIdsRep.cipherKeyDetails) {
// DomainId isn't available here, the caller must know the encryption domainId
ekpProxyData->insertIntoBaseCipherIdCache(item.encryptDomainId, item.encryptKeyId, item.encryptKey);
}
} catch (Error& e) {
if (!canReplyWith(e)) {
TraceEvent("GetCipherKeysByIds", ekpProxyData->myId).error(e);
throw;
}
TraceEvent("GetCipherKeysByIds", ekpProxyData->myId).detail("ErrorCode", e.code());
ekpProxyData->sendErrorResponse(keysByIds.reply, e);
return Void();
}
}
keyIdsReply.numHits = cachedKeys.size();
// Append cached cipherKeyDetails to the result-set
keyIdsReply.baseCipherDetails.insert(
keyIdsReply.baseCipherDetails.end(), cachedCipherDetails.begin(), cachedCipherDetails.end());
keyIdsReply.numHits = cachedCipherDetails.size();
keysByIds.reply.send(keyIdsReply);
return Void();
}
ACTOR Future<Void> getLatestCipherKeys(Reference<EncryptKeyProxyData> ekpProxyData,
SimKmsProxyInterface simKmsInterface,
KmsConnectorInterface kmsConnectorInf,
EKPGetLatestBaseCipherKeysRequest req) {
// Scan the cached cipher-keys and filter our baseCipherIds locally cached
// for the rest, reachout to KMS to fetch the required details
state std::unordered_map<EncryptBaseCipherId, EKPBaseCipherDetails> cachedKeys;
state std::vector<EKPBaseCipherDetails> cachedCipherDetails;
state EKPGetLatestBaseCipherKeysRequest latestKeysReq = req;
state EKPGetLatestBaseCipherKeysReply latestCipherReply;
state Arena& arena = latestCipherReply.arena;
// Dedup the requested domainIds.
// TODO: endpoint serialization of std::unordered_set isn't working at the moment
std::unordered_set<EncryptCipherDomainId> dedupedDomainIds;
for (EncryptCipherDomainId id : req.encryptDomainIds) {
dedupedDomainIds.emplace(id);
}
// First, check if the requested information is already cached by the server.
// Ensure the cached information is within FLOW_KNOBS->ENCRYPT_CIPHER_KEY_CACHE_TTL time window.
std::vector<EncryptBaseCipherId> lookupCipherDomains;
for (EncryptDomainId id : req.encryptDomainIds) {
std::vector<EncryptCipherDomainId> lookupCipherDomains;
for (EncryptCipherDomainId id : dedupedDomainIds) {
const auto itr = ekpProxyData->baseCipherDomainIdCache.find(id);
if (itr != ekpProxyData->baseCipherDomainIdCache.end() && itr->second.isValid()) {
cachedKeys.emplace(id, EKPBaseCipherDetails(itr->second.baseCipherId, itr->second.baseCipherKey, arena));
cachedCipherDetails.emplace_back(id, itr->second.baseCipherId, itr->second.baseCipherKey, arena);
} else {
lookupCipherDomains.push_back(id);
lookupCipherDomains.emplace_back(id);
}
}
ekpProxyData->baseCipherDomainIdCacheHits += cachedKeys.size();
ekpProxyData->baseCipherDomainIdCacheHits += cachedCipherDetails.size();
ekpProxyData->baseCipherDomainIdCacheMisses += lookupCipherDomains.size();
if (g_network->isSimulated()) {
if (!lookupCipherDomains.empty()) {
try {
SimGetEncryptKeysByDomainIdsRequest simKeysByDomainIdReq(lookupCipherDomains);
SimGetEncryptKeyByDomainIdReply simKeysByDomainIdRep =
wait(simKmsInterface.encryptKeyLookupByDomainId.getReply(simKeysByDomainIdReq));
if (!lookupCipherDomains.empty()) {
try {
KmsConnLookupEKsByDomainIdsReq keysByDomainIdReq(lookupCipherDomains);
KmsConnLookupEKsByDomainIdsRep keysByDomainIdRep =
wait(kmsConnectorInf.ekLookupByDomainIds.getReply(keysByDomainIdReq));
for (auto& item : simKeysByDomainIdRep.encryptKeyMap) {
latestCipherReply.baseCipherDetailMap.emplace(
item.first, EKPBaseCipherDetails(item.second.encryptKeyId, item.second.encryptKey, arena));
for (auto& item : keysByDomainIdRep.cipherKeyDetails) {
latestCipherReply.baseCipherDetails.emplace_back(
item.encryptDomainId, item.encryptKeyId, item.encryptKey, arena);
// Record the fetched cipher details to the local cache for the future references
ekpProxyData->insertIntoBaseDomainIdCache(
item.first, item.second.encryptKeyId, item.second.encryptKey);
}
} catch (Error& e) {
if (!canReplyWith(e)) {
TraceEvent("GetLatestCipherKeys", ekpProxyData->myId).error(e);
throw;
}
TraceEvent("GetLatestCipherKeys", ekpProxyData->myId).detail("ErrorCode", e.code());
ekpProxyData->sendErrorResponse(latestKeysReq.reply, e);
return Void();
// Record the fetched cipher details to the local cache for the future references
ekpProxyData->insertIntoBaseDomainIdCache(item.encryptDomainId, item.encryptKeyId, item.encryptKey);
}
} catch (Error& e) {
if (!canReplyWith(e)) {
TraceEvent("GetLatestCipherKeys", ekpProxyData->myId).error(e);
throw;
}
TraceEvent("GetLatestCipherKeys", ekpProxyData->myId).detail("ErrorCode", e.code());
ekpProxyData->sendErrorResponse(latestKeysReq.reply, e);
return Void();
}
} else {
// TODO: Call to non-FDB KMS connector process.
throw not_implemented();
}
for (auto& item : cachedKeys) {
latestCipherReply.baseCipherDetailMap.emplace(
item.first, EKPBaseCipherDetails(item.second.baseCipherId, item.second.baseCipherKey, arena));
for (auto& item : cachedCipherDetails) {
latestCipherReply.baseCipherDetails.emplace_back(
item.encryptDomainId, item.baseCipherId, item.baseCipherKey, arena);
}
latestCipherReply.numHits = cachedKeys.size();
latestCipherReply.numHits = cachedCipherDetails.size();
latestKeysReq.reply.send(latestCipherReply);
return Void();
}
ACTOR Future<Void> refreshEncryptionKeysUsingSimKms(Reference<EncryptKeyProxyData> ekpProxyData,
SimKmsProxyInterface simKmsInterface) {
ACTOR Future<Void> refreshEncryptionKeysCore(Reference<EncryptKeyProxyData> ekpProxyData,
KmsConnectorInterface kmsConnectorInf) {
ASSERT(g_network->isSimulated());
TraceEvent("RefreshEKs_Start", ekpProxyData->myId).detail("Inf", simKmsInterface.id());
TraceEvent("RefreshEKs_Start", ekpProxyData->myId).detail("KmsConnInf", kmsConnectorInf.id());
try {
SimGetEncryptKeysByDomainIdsRequest req;
KmsConnLookupEKsByDomainIdsReq req;
req.encryptDomainIds.reserve(ekpProxyData->baseCipherDomainIdCache.size());
for (auto& item : ekpProxyData->baseCipherDomainIdCache) {
req.encryptDomainIds.emplace_back(item.first);
}
SimGetEncryptKeyByDomainIdReply rep = wait(simKmsInterface.encryptKeyLookupByDomainId.getReply(req));
for (auto& item : rep.encryptKeyMap) {
ekpProxyData->insertIntoBaseDomainIdCache(item.first, item.second.encryptKeyId, item.second.encryptKey);
KmsConnLookupEKsByDomainIdsRep rep = wait(kmsConnectorInf.ekLookupByDomainIds.getReply(req));
for (auto& item : rep.cipherKeyDetails) {
ekpProxyData->insertIntoBaseDomainIdCache(item.encryptDomainId, item.encryptKeyId, item.encryptKey);
}
ekpProxyData->baseCipherKeysRefreshed += rep.encryptKeyMap.size();
TraceEvent("RefreshEKs_Done", ekpProxyData->myId).detail("KeyCount", rep.encryptKeyMap.size());
ekpProxyData->baseCipherKeysRefreshed += rep.cipherKeyDetails.size();
TraceEvent("RefreshEKs_Done", ekpProxyData->myId).detail("KeyCount", rep.cipherKeyDetails.size());
} catch (Error& e) {
if (!canReplyWith(e)) {
TraceEvent("RefreshEncryptionKeys_Error").error(e);
@ -318,30 +329,34 @@ ACTOR Future<Void> refreshEncryptionKeysUsingSimKms(Reference<EncryptKeyProxyDat
return Void();
}
ACTOR Future<Void> refreshEncryptionKeysUsingKms(Reference<EncryptKeyProxyData> ekpProxyData) {
wait(delay(0)); // compiler needs to be happy
throw not_implemented();
void refreshEncryptionKeys(Reference<EncryptKeyProxyData> ekpProxyData, KmsConnectorInterface kmsConnectorInf) {
Future<Void> ignored = refreshEncryptionKeysCore(ekpProxyData, kmsConnectorInf);
}
void refreshEncryptionKeys(Reference<EncryptKeyProxyData> ekpProxyData, SimKmsProxyInterface simKmsInterface) {
Future<Void> ignored;
void activateKmsConnector(Reference<EncryptKeyProxyData> ekpProxyData, KmsConnectorInterface kmsConnectorInf) {
if (g_network->isSimulated()) {
ignored = refreshEncryptionKeysUsingSimKms(ekpProxyData, simKmsInterface);
ekpProxyData->kmsConnector = std::make_unique<SimKmsConnector>();
} else if (SERVER_KNOBS->KMS_CONNECTOR_TYPE.compare("HttpKmsConnector")) {
throw not_implemented();
} else {
ignored = refreshEncryptionKeysUsingKms(ekpProxyData);
throw not_implemented();
}
TraceEvent("EKP_ActiveKmsConnector", ekpProxyData->myId).detail("ConnectorType", SERVER_KNOBS->KMS_CONNECTOR_TYPE);
ekpProxyData->addActor.send(ekpProxyData->kmsConnector->connectorCore(kmsConnectorInf));
}
ACTOR Future<Void> encryptKeyProxyServer(EncryptKeyProxyInterface ekpInterface, Reference<AsyncVar<ServerDBInfo>> db) {
state Reference<EncryptKeyProxyData> self(new EncryptKeyProxyData(ekpInterface.id()));
state PromiseStream<Future<Void>> addActor;
state Future<Void> collection = actorCollection(self->addActor.getFuture());
self->addActor.send(traceRole(Role::ENCRYPT_KEY_PROXY, ekpInterface.id()));
state SimKmsProxyInterface simKmsProxyInf;
state KmsConnectorInterface kmsConnectorInf;
kmsConnectorInf.initEndpoints();
TraceEvent("EKP_Start", self->myId).log();
TraceEvent("EKP_Start", self->myId).detail("KmsConnectorInf", kmsConnectorInf.id());
activateKmsConnector(self, kmsConnectorInf);
// Register a recurring task to refresh the cached Encryption keys.
// Approach avoids external RPCs due to EncryptionKey refreshes for the inline write encryption codepath such as:
@ -350,31 +365,17 @@ ACTOR Future<Void> encryptKeyProxyServer(EncryptKeyProxyInterface ekpInterface,
// FLOW_KNOB->ENCRRYPTION_KEY_REFRESH_INTERVAL_SEC, allowing the interactions with external Encryption Key Manager
// mostly not co-inciding with FDB process encryption key refresh attempts.
if (g_network->isSimulated()) {
// In simulation construct an Encryption KMSProxy actor to satisfy encryption keys lookups otherwise satisfied
// by integrating external Encryption Key Management solutions.
simKmsProxyInf.initEndpoints();
self->addActor.send(simEncryptKmsProxyCore(simKmsProxyInf));
TraceEvent("EKP_InitSimKmsInf", self->myId).detail("Inf", simKmsProxyInf.id());
self->encryptionKeyRefresher = recurring([&]() { refreshEncryptionKeys(self, simKmsProxyInf); },
FLOW_KNOBS->ENCRYPT_KEY_REFRESH_INTERVAL,
TaskPriority::Worker);
} else {
// TODO: Add recurring actor to talk to external KMS proxy process
throw not_implemented();
}
self->encryptionKeyRefresher = recurring([&]() { refreshEncryptionKeys(self, kmsConnectorInf); },
FLOW_KNOBS->ENCRYPT_KEY_REFRESH_INTERVAL,
TaskPriority::Worker);
try {
loop choose {
when(EKPGetBaseCipherKeysByIdsRequest req = waitNext(ekpInterface.getBaseCipherKeysByIds.getFuture())) {
wait(getCipherKeysByBaseCipherKeyIds(self, simKmsProxyInf, req));
wait(getCipherKeysByBaseCipherKeyIds(self, kmsConnectorInf, req));
}
when(EKPGetLatestBaseCipherKeysRequest req = waitNext(ekpInterface.getLatestBaseCipherKeys.getFuture())) {
wait(getLatestCipherKeys(self, simKmsProxyInf, req));
wait(getLatestCipherKeys(self, kmsConnectorInf, req));
}
when(HaltEncryptKeyProxyRequest req = waitNext(ekpInterface.haltEncryptKeyProxy.getFuture())) {
TraceEvent("EKP_Halted", self->myId).detail("ReqID", req.requesterID);

View File

@ -90,10 +90,26 @@ struct HaltEncryptKeyProxyRequest {
}
};
struct EKPBaseCipherDetails {
constexpr static FileIdentifier file_identifier = 2149615;
int64_t encryptDomainId;
uint64_t baseCipherId;
StringRef baseCipherKey;
EKPBaseCipherDetails() : encryptDomainId(0), baseCipherId(0), baseCipherKey(StringRef()) {}
explicit EKPBaseCipherDetails(int64_t dId, uint64_t id, StringRef key, Arena& arena)
: encryptDomainId(dId), baseCipherId(id), baseCipherKey(StringRef(arena, key)) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, encryptDomainId, baseCipherId, baseCipherKey);
}
};
struct EKPGetBaseCipherKeysByIdsReply {
constexpr static FileIdentifier file_identifier = 9485259;
Arena arena;
std::unordered_map<uint64_t, StringRef> baseCipherMap;
std::vector<EKPBaseCipherDetails> baseCipherDetails;
int numHits;
Optional<Error> error;
@ -101,18 +117,18 @@ struct EKPGetBaseCipherKeysByIdsReply {
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, arena, baseCipherMap, numHits, error);
serializer(ar, arena, baseCipherDetails, numHits, error);
}
};
struct EKPGetBaseCipherKeysByIdsRequest {
constexpr static FileIdentifier file_identifier = 4930263;
UID requesterID;
std::vector<uint64_t> baseCipherIds;
std::vector<std::pair<uint64_t, int64_t>> baseCipherIds;
ReplyPromise<EKPGetBaseCipherKeysByIdsReply> reply;
EKPGetBaseCipherKeysByIdsRequest() : requesterID(deterministicRandom()->randomUniqueID()) {}
explicit EKPGetBaseCipherKeysByIdsRequest(UID uid, const std::vector<uint64_t>& ids)
explicit EKPGetBaseCipherKeysByIdsRequest(UID uid, const std::vector<std::pair<uint64_t, int64_t>>& ids)
: requesterID(uid), baseCipherIds(ids) {}
template <class Ar>
@ -121,35 +137,20 @@ struct EKPGetBaseCipherKeysByIdsRequest {
}
};
struct EKPBaseCipherDetails {
constexpr static FileIdentifier file_identifier = 2149615;
uint64_t baseCipherId;
StringRef baseCipherKey;
EKPBaseCipherDetails() : baseCipherId(0), baseCipherKey(StringRef()) {}
explicit EKPBaseCipherDetails(uint64_t id, StringRef key, Arena& arena)
: baseCipherId(id), baseCipherKey(StringRef(arena, key)) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, baseCipherId, baseCipherKey);
}
};
struct EKPGetLatestBaseCipherKeysReply {
constexpr static FileIdentifier file_identifier = 4831583;
Arena arena;
std::unordered_map<uint64_t, EKPBaseCipherDetails> baseCipherDetailMap;
std::vector<EKPBaseCipherDetails> baseCipherDetails;
int numHits;
Optional<Error> error;
EKPGetLatestBaseCipherKeysReply() : numHits(0) {}
explicit EKPGetLatestBaseCipherKeysReply(const std::unordered_map<uint64_t, EKPBaseCipherDetails>& cipherMap)
: baseCipherDetailMap(cipherMap), numHits(0) {}
explicit EKPGetLatestBaseCipherKeysReply(const std::vector<EKPBaseCipherDetails>& cipherDetails)
: baseCipherDetails(cipherDetails), numHits(0) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, arena, baseCipherDetailMap, numHits, error);
serializer(ar, arena, baseCipherDetails, numHits, error);
}
};

View File

@ -187,6 +187,8 @@ inline IKeyValueStore* openKVStore(KeyValueStoreType storeType,
return keyValueStoreRedwoodV1(filename, logID);
case KeyValueStoreType::SSD_ROCKSDB_V1:
return keyValueStoreRocksDB(filename, logID, storeType);
case KeyValueStoreType::SSD_SHARDED_ROCKSDB:
return keyValueStoreRocksDB(filename, logID, storeType); // TODO: to replace the KVS in the future
case KeyValueStoreType::MEMORY_RADIXTREE:
return keyValueStoreMemory(filename,
logID,

View File

@ -1809,7 +1809,14 @@ struct RocksDBKeyValueStore : IKeyValueStore {
void close() override { doClose(this, false); }
KeyValueStoreType getType() const override { return KeyValueStoreType(KeyValueStoreType::SSD_ROCKSDB_V1); }
KeyValueStoreType getType() const override {
if (SERVER_KNOBS->ENABLE_SHARDED_ROCKSDB)
// KVSRocks pretends as KVSShardedRocksDB
// TODO: to remove when the ShardedRocksDB KVS implementation is added in the future
return KeyValueStoreType(KeyValueStoreType::SSD_SHARDED_ROCKSDB);
else
return KeyValueStoreType(KeyValueStoreType::SSD_ROCKSDB_V1);
}
Future<Void> init() override {
if (openFuture.isValid()) {

43
fdbserver/KmsConnector.h Normal file
View File

@ -0,0 +1,43 @@
/*
* KmsConnector.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef KMS_CONNECTOR_H
#define KMS_CONNECTOR_H
#pragma once
#include "fdbserver/KmsConnectorInterface.h"
#include "flow/Arena.h"
#include "flow/EncryptUtils.h"
// FDB encryption needs to interact with external Key Management Services (KMS) solutions to lookup/refresh encryption
// keys. KmsConnector interface is an abstract interface enabling implementing specialized KMS connector
// implementations.
// FDB KMSConnector implementation should inherit from KmsConnector and implement pure virtual function,
// EncryptKeyProxyServer instantiates desired implementation object based on SERVER_KNOB->KMS_CONNECTOR_TYPE knob.
class KmsConnector : public NonCopyable {
public:
KmsConnector() {}
virtual ~KmsConnector() {}
virtual Future<Void> connectorCore(struct KmsConnectorInterface interf) = 0;
};
#endif

View File

@ -0,0 +1,144 @@
/*
* KmsConnectorInterface.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef FDBSERVER_KMSCONNECTORINTERFACE_H
#define FDBSERVER_KMSCONNECTORINTERFACE_H
#pragma once
#include "fdbrpc/fdbrpc.h"
#include "flow/EncryptUtils.h"
#include "flow/FileIdentifier.h"
#include "flow/Trace.h"
#include "flow/flow.h"
#include "flow/network.h"
struct KmsConnectorInterface {
constexpr static FileIdentifier file_identifier = 2416711;
RequestStream<ReplyPromise<Void>> waitFailure;
RequestStream<struct KmsConnLookupEKsByKeyIdsReq> ekLookupByIds;
RequestStream<struct KmsConnLookupEKsByDomainIdsReq> ekLookupByDomainIds;
KmsConnectorInterface() {}
UID id() const { return ekLookupByIds.getEndpoint().token; }
template <class Archive>
void serialize(Archive& ar) {
if constexpr (!is_fb_function<Archive>) {
ASSERT(ar.protocolVersion().isValid());
}
serializer(ar, waitFailure);
if (Archive::isDeserializing) {
ekLookupByIds =
RequestStream<struct KmsConnLookupEKsByKeyIdsReq>(waitFailure.getEndpoint().getAdjustedEndpoint(1));
ekLookupByDomainIds =
RequestStream<struct KmsConnLookupEKsByDomainIdsReq>(waitFailure.getEndpoint().getAdjustedEndpoint(2));
}
}
void initEndpoints() {
std::vector<std::pair<FlowReceiver*, TaskPriority>> streams;
streams.push_back(waitFailure.getReceiver());
streams.push_back(ekLookupByIds.getReceiver(TaskPriority::Worker));
streams.push_back(ekLookupByDomainIds.getReceiver(TaskPriority::Worker));
FlowTransport::transport().addEndpoints(streams);
}
};
struct EncryptCipherKeyDetails {
constexpr static FileIdentifier file_identifier = 1227025;
EncryptCipherDomainId encryptDomainId;
EncryptCipherBaseKeyId encryptKeyId;
StringRef encryptKey;
EncryptCipherKeyDetails() {}
explicit EncryptCipherKeyDetails(EncryptCipherDomainId dId,
EncryptCipherBaseKeyId keyId,
StringRef key,
Arena& arena)
: encryptDomainId(dId), encryptKeyId(keyId), encryptKey(StringRef(arena, key)) {}
bool operator==(const EncryptCipherKeyDetails& toCompare) {
return encryptDomainId == toCompare.encryptDomainId && encryptKeyId == toCompare.encryptKeyId &&
encryptKey.compare(toCompare.encryptKey) == 0;
}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, encryptDomainId, encryptKeyId, encryptKey);
}
};
struct KmsConnLookupEKsByKeyIdsRep {
constexpr static FileIdentifier file_identifier = 2313778;
Arena arena;
std::vector<EncryptCipherKeyDetails> cipherKeyDetails;
KmsConnLookupEKsByKeyIdsRep() {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, arena, cipherKeyDetails);
}
};
struct KmsConnLookupEKsByKeyIdsReq {
constexpr static FileIdentifier file_identifier = 6913396;
std::vector<std::pair<EncryptCipherBaseKeyId, EncryptCipherDomainId>> encryptKeyIds;
ReplyPromise<KmsConnLookupEKsByKeyIdsRep> reply;
KmsConnLookupEKsByKeyIdsReq() {}
explicit KmsConnLookupEKsByKeyIdsReq(
const std::vector<std::pair<EncryptCipherBaseKeyId, EncryptCipherDomainId>>& keyIds)
: encryptKeyIds(keyIds) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, encryptKeyIds, reply);
}
};
struct KmsConnLookupEKsByDomainIdsRep {
constexpr static FileIdentifier file_identifier = 3009025;
Arena arena;
std::vector<EncryptCipherKeyDetails> cipherKeyDetails;
KmsConnLookupEKsByDomainIdsRep() {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, arena, cipherKeyDetails);
}
};
struct KmsConnLookupEKsByDomainIdsReq {
constexpr static FileIdentifier file_identifier = 9918682;
std::vector<EncryptCipherDomainId> encryptDomainIds;
ReplyPromise<KmsConnLookupEKsByDomainIdsRep> reply;
KmsConnLookupEKsByDomainIdsReq() {}
explicit KmsConnLookupEKsByDomainIdsReq(const std::vector<EncryptCipherDomainId>& ids) : encryptDomainIds(ids) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, encryptDomainIds, reply);
}
};
#endif

View File

@ -1,188 +0,0 @@
/*
* SimEncryptKmsProxy.actor.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <memory>
#include <unordered_map>
#include "fdbrpc/sim_validation.h"
#include "fdbserver/Knobs.h"
#include "fdbserver/SimEncryptKmsProxy.actor.h"
#include "flow/ActorCollection.h"
#include "flow/Error.h"
#include "flow/IRandom.h"
#include "flow/ITrace.h"
#include "flow/StreamCipher.h"
#include "flow/UnitTest.h"
#include "flow/actorcompiler.h" // This must be the last #include.
#include "flow/network.h"
struct SimEncryptKeyCtx {
SimEncryptKeyId id;
SimEncryptKey key;
SimEncryptKeyCtx() : id(0) {}
explicit SimEncryptKeyCtx(SimEncryptKeyId kId, const char* data) : id(kId), key(data) {}
};
struct SimEncryptKmsProxyContext {
uint32_t maxEncryptionKeys;
std::unordered_map<SimEncryptKeyId, std::unique_ptr<SimEncryptKeyCtx>> simEncryptKeyStore;
SimEncryptKmsProxyContext() : maxEncryptionKeys(0) {}
explicit SimEncryptKmsProxyContext(uint32_t keyCount) : maxEncryptionKeys(keyCount) {
uint8_t buffer[AES_256_KEY_LENGTH];
// Construct encryption keyStore.
for (int i = 0; i < maxEncryptionKeys; i++) {
generateRandomData(&buffer[0], AES_256_KEY_LENGTH);
SimEncryptKeyCtx ctx(i, reinterpret_cast<const char*>(buffer));
simEncryptKeyStore[i] = std::make_unique<SimEncryptKeyCtx>(i, reinterpret_cast<const char*>(buffer));
}
}
};
ACTOR Future<Void> simEncryptKmsProxyCore(SimKmsProxyInterface interf) {
state SimEncryptKmsProxyContext kmsProxyCtx(SERVER_KNOBS->SIM_KMS_MAX_KEYS);
ASSERT(kmsProxyCtx.simEncryptKeyStore.size() == SERVER_KNOBS->SIM_KMS_MAX_KEYS);
TraceEvent("SimEncryptKmsProxy_Init", interf.id()).detail("MaxEncryptKeys", SERVER_KNOBS->SIM_KMS_MAX_KEYS);
state bool success = true;
loop {
choose {
when(SimGetEncryptKeysByKeyIdsRequest req = waitNext(interf.encryptKeyLookupByKeyIds.getFuture())) {
state SimGetEncryptKeysByKeyIdsRequest keysByIdsReq = req;
state SimGetEncryptKeysByKeyIdsReply keysByIdsRep;
// Lookup corresponding EncryptKeyCtx for input keyId
for (SimEncryptKeyId keyId : req.encryptKeyIds) {
const auto& itr = kmsProxyCtx.simEncryptKeyStore.find(keyId);
if (itr != kmsProxyCtx.simEncryptKeyStore.end()) {
keysByIdsRep.encryptKeyMap.emplace(keyId,
StringRef(keysByIdsRep.arena, itr->second.get()->key));
} else {
success = false;
break;
}
}
wait(delayJittered(1.0)); // simulate network delay
success ? keysByIdsReq.reply.send(keysByIdsRep) : keysByIdsReq.reply.sendError(encrypt_key_not_found());
}
when(SimGetEncryptKeysByDomainIdsRequest req = waitNext(interf.encryptKeyLookupByDomainId.getFuture())) {
state SimGetEncryptKeysByDomainIdsRequest keysByDomainIdReq = req;
state SimGetEncryptKeyByDomainIdReply keysByDomainIdRep;
// Map encryptionDomainId to corresponding EncryptKeyCtx element using a modulo operation. This would
// mean multiple domains gets mapped to the same encryption key which is fine, the EncryptKeyStore
// guarantees that keyId -> plaintext encryptKey mapping is idempotent.
for (SimEncryptDomainId domainId : req.encryptDomainIds) {
SimEncryptKeyId keyId = domainId % SERVER_KNOBS->SIM_KMS_MAX_KEYS;
const auto& itr = kmsProxyCtx.simEncryptKeyStore.find(keyId);
if (itr != kmsProxyCtx.simEncryptKeyStore.end()) {
keysByDomainIdRep.encryptKeyMap.emplace(
domainId,
SimEncryptKeyDetails(keyId, StringRef(itr->second.get()->key), keysByDomainIdRep.arena));
} else {
success = false;
break;
}
}
wait(delayJittered(1.0)); // simulate network delay
success ? keysByDomainIdReq.reply.send(keysByDomainIdRep)
: keysByDomainIdReq.reply.sendError(encrypt_key_not_found());
}
}
}
}
void forceLinkSimEncryptKmsProxyTests() {}
namespace {
ACTOR Future<Void> testRunWorkload(SimKmsProxyInterface inf, uint32_t nEncryptionKeys) {
state uint32_t maxEncryptionKeys = nEncryptionKeys;
state int maxDomainIds = deterministicRandom()->randomInt(121, 295);
state int maxIterations = deterministicRandom()->randomInt(786, 1786);
state std::unordered_map<SimEncryptDomainId, std::unique_ptr<SimEncryptKeyCtx>> domainIdKeyMap;
state int i = 0;
TraceEvent("RunWorkloadStart").detail("MaxDomainIds", maxDomainIds).detail("MaxIterations", maxIterations);
{
// construct domainId to EncryptKeyCtx map
SimGetEncryptKeysByDomainIdsRequest domainIdsReq;
for (i = 0; i < maxDomainIds; i++) {
domainIdsReq.encryptDomainIds.push_back(i);
}
SimGetEncryptKeyByDomainIdReply domainIdsReply = wait(inf.encryptKeyLookupByDomainId.getReply(domainIdsReq));
for (auto& element : domainIdsReply.encryptKeyMap) {
domainIdKeyMap.emplace(element.first,
std::make_unique<SimEncryptKeyCtx>(element.second.encryptKeyId,
element.second.encryptKey.toString().c_str()));
}
// randomly pick any domainId and validate if lookupByKeyId result matches
SimGetEncryptKeysByKeyIdsRequest keyIdsReq;
state std::unordered_map<SimEncryptKeyId, StringRef> validationMap;
for (i = 0; i < maxIterations; i++) {
state int idx = deterministicRandom()->randomInt(0, maxDomainIds);
state SimEncryptKeyCtx* ctx = domainIdKeyMap[idx].get();
keyIdsReq.encryptKeyIds.push_back(ctx->id);
validationMap[ctx->id] = StringRef(ctx->key);
}
SimGetEncryptKeysByKeyIdsReply keyIdsReply = wait(inf.encryptKeyLookupByKeyIds.getReply(keyIdsReq));
ASSERT(keyIdsReply.encryptKeyMap.size() == validationMap.size());
for (const auto& element : keyIdsReply.encryptKeyMap) {
ASSERT(validationMap[element.first].compare(element.second) == 0);
}
}
{
// Verify unknown key access returns the error
state SimGetEncryptKeysByKeyIdsRequest req;
req.encryptKeyIds.push_back(maxEncryptionKeys + 1);
try {
SimGetEncryptKeysByKeyIdsReply reply = wait(inf.encryptKeyLookupByKeyIds.getReply(req));
} catch (Error& e) {
ASSERT(e.code() == error_code_encrypt_key_not_found);
}
}
TraceEvent("RunWorkloadDone").log();
return Void();
}
} // namespace
TEST_CASE("fdbserver/SimEncryptKmsProxy") {
state SimKmsProxyInterface inf;
state uint32_t maxEncryptKeys = 64;
loop choose {
when(wait(simEncryptKmsProxyCore(inf))) { throw internal_error(); }
when(wait(testRunWorkload(inf, maxEncryptKeys))) { break; }
}
return Void();
}

View File

@ -71,26 +71,43 @@ struct SimKmsProxyInterface {
}
};
struct SimEncryptKeyDetails {
constexpr static FileIdentifier file_identifier = 1227025;
SimEncryptDomainId encryptDomainId;
SimEncryptKeyId encryptKeyId;
StringRef encryptKey;
SimEncryptKeyDetails() {}
explicit SimEncryptKeyDetails(SimEncryptDomainId domainId, SimEncryptKeyId keyId, StringRef key, Arena& arena)
: encryptDomainId(domainId), encryptKeyId(keyId), encryptKey(StringRef(arena, key)) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, encryptDomainId, encryptKeyId, encryptKey);
}
};
struct SimGetEncryptKeysByKeyIdsReply {
constexpr static FileIdentifier file_identifier = 2313778;
Arena arena;
std::unordered_map<SimEncryptKeyId, StringRef> encryptKeyMap;
std::vector<SimEncryptKeyDetails> encryptKeyDetails;
SimGetEncryptKeysByKeyIdsReply() {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, arena, encryptKeyMap);
serializer(ar, arena, encryptKeyDetails);
}
};
struct SimGetEncryptKeysByKeyIdsRequest {
constexpr static FileIdentifier file_identifier = 6913396;
std::vector<SimEncryptKeyId> encryptKeyIds;
std::vector<std::pair<SimEncryptKeyId, SimEncryptDomainId>> encryptKeyIds;
ReplyPromise<SimGetEncryptKeysByKeyIdsReply> reply;
SimGetEncryptKeysByKeyIdsRequest() {}
explicit SimGetEncryptKeysByKeyIdsRequest(const std::vector<SimEncryptKeyId>& keyIds) : encryptKeyIds(keyIds) {}
explicit SimGetEncryptKeysByKeyIdsRequest(const std::vector<std::pair<SimEncryptKeyId, SimEncryptDomainId>>& keyIds)
: encryptKeyIds(keyIds) {}
template <class Ar>
void serialize(Ar& ar) {
@ -98,31 +115,16 @@ struct SimGetEncryptKeysByKeyIdsRequest {
}
};
struct SimEncryptKeyDetails {
constexpr static FileIdentifier file_identifier = 1227025;
SimEncryptKeyId encryptKeyId;
StringRef encryptKey;
SimEncryptKeyDetails() {}
explicit SimEncryptKeyDetails(SimEncryptKeyId keyId, StringRef key, Arena& arena)
: encryptKeyId(keyId), encryptKey(StringRef(arena, key)) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, encryptKeyId, encryptKey);
}
};
struct SimGetEncryptKeyByDomainIdReply {
constexpr static FileIdentifier file_identifier = 3009025;
Arena arena;
std::unordered_map<SimEncryptDomainId, SimEncryptKeyDetails> encryptKeyMap;
std::vector<SimEncryptKeyDetails> encryptKeyDetails;
SimGetEncryptKeyByDomainIdReply() {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, arena, encryptKeyMap);
serializer(ar, arena, encryptKeyDetails);
}
};

View File

@ -0,0 +1,207 @@
/*
* SimEncryptKmsProxy.actor.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "fdbserver/SimKmsConnector.actor.h"
#include "fdbrpc/sim_validation.h"
#include "fdbserver/Knobs.h"
#include "flow/ActorCollection.h"
#include "flow/EncryptUtils.h"
#include "flow/Error.h"
#include "flow/FastRef.h"
#include "flow/IRandom.h"
#include "flow/ITrace.h"
#include "flow/network.h"
#include "flow/UnitTest.h"
#include <memory>
#include <unordered_map>
#include <utility>
#include "flow/actorcompiler.h" // This must be the last #include.
using SimEncryptKey = std::string;
struct SimEncryptKeyCtx {
EncryptCipherBaseKeyId id;
SimEncryptKey key;
explicit SimEncryptKeyCtx(EncryptCipherBaseKeyId kId, const char* data) : id(kId), key(data) {}
};
struct SimKmsConnectorContext {
uint32_t maxEncryptionKeys;
std::unordered_map<EncryptCipherBaseKeyId, std::unique_ptr<SimEncryptKeyCtx>> simEncryptKeyStore;
explicit SimKmsConnectorContext(uint32_t keyCount) : maxEncryptionKeys(keyCount) {
uint8_t buffer[AES_256_KEY_LENGTH];
// Construct encryption keyStore.
for (int i = 0; i < maxEncryptionKeys; i++) {
generateRandomData(&buffer[0], AES_256_KEY_LENGTH);
SimEncryptKeyCtx ctx(i, reinterpret_cast<const char*>(buffer));
simEncryptKeyStore[i] = std::make_unique<SimEncryptKeyCtx>(i, reinterpret_cast<const char*>(buffer));
}
}
};
ACTOR Future<Void> simKmsConnectorCore_impl(KmsConnectorInterface interf) {
TraceEvent("SimEncryptKmsProxy_Init", interf.id()).detail("MaxEncryptKeys", SERVER_KNOBS->SIM_KMS_MAX_KEYS);
state bool success = true;
state std::unique_ptr<SimKmsConnectorContext> ctx =
std::make_unique<SimKmsConnectorContext>(SERVER_KNOBS->SIM_KMS_MAX_KEYS);
ASSERT_EQ(ctx->simEncryptKeyStore.size(), SERVER_KNOBS->SIM_KMS_MAX_KEYS);
loop {
choose {
when(KmsConnLookupEKsByKeyIdsReq req = waitNext(interf.ekLookupByIds.getFuture())) {
state KmsConnLookupEKsByKeyIdsReq keysByIdsReq = req;
state KmsConnLookupEKsByKeyIdsRep keysByIdsRep;
// Lookup corresponding EncryptKeyCtx for input keyId
for (const auto& item : req.encryptKeyIds) {
const auto& itr = ctx->simEncryptKeyStore.find(item.first);
if (itr != ctx->simEncryptKeyStore.end()) {
keysByIdsRep.cipherKeyDetails.emplace_back(
item.second,
itr->first,
StringRef(keysByIdsRep.arena, itr->second.get()->key),
keysByIdsRep.arena);
} else {
success = false;
break;
}
}
wait(delayJittered(1.0)); // simulate network delay
success ? keysByIdsReq.reply.send(keysByIdsRep) : keysByIdsReq.reply.sendError(encrypt_key_not_found());
}
when(KmsConnLookupEKsByDomainIdsReq req = waitNext(interf.ekLookupByDomainIds.getFuture())) {
state KmsConnLookupEKsByDomainIdsReq keysByDomainIdReq = req;
state KmsConnLookupEKsByDomainIdsRep keysByDomainIdRep;
// Map encryptionDomainId to corresponding EncryptKeyCtx element using a modulo operation. This would
// mean multiple domains gets mapped to the same encryption key which is fine, the EncryptKeyStore
// guarantees that keyId -> plaintext encryptKey mapping is idempotent.
for (EncryptCipherDomainId domainId : req.encryptDomainIds) {
EncryptCipherBaseKeyId keyId = domainId % SERVER_KNOBS->SIM_KMS_MAX_KEYS;
const auto& itr = ctx->simEncryptKeyStore.find(keyId);
if (itr != ctx->simEncryptKeyStore.end()) {
keysByDomainIdRep.cipherKeyDetails.emplace_back(
domainId, keyId, StringRef(itr->second.get()->key), keysByDomainIdRep.arena);
} else {
success = false;
break;
}
}
wait(delayJittered(1.0)); // simulate network delay
success ? keysByDomainIdReq.reply.send(keysByDomainIdRep)
: keysByDomainIdReq.reply.sendError(encrypt_key_not_found());
}
}
}
}
Future<Void> SimKmsConnector::connectorCore(KmsConnectorInterface interf) {
return simKmsConnectorCore_impl(interf);
}
void forceLinkSimKmsConnectorTests() {}
namespace {
ACTOR Future<Void> testRunWorkload(KmsConnectorInterface inf, uint32_t nEncryptionKeys) {
state uint32_t maxEncryptionKeys = nEncryptionKeys;
state int maxDomainIds = deterministicRandom()->randomInt(121, 295);
state int maxIterations = deterministicRandom()->randomInt(786, 1786);
state std::unordered_map<EncryptCipherDomainId, std::unique_ptr<SimEncryptKeyCtx>> domainIdKeyMap;
state int i = 0;
TraceEvent("RunWorkloadStart").detail("MaxDomainIds", maxDomainIds).detail("MaxIterations", maxIterations);
{
// construct domainId to EncryptKeyCtx map
KmsConnLookupEKsByDomainIdsReq domainIdsReq;
for (i = 0; i < maxDomainIds; i++) {
domainIdsReq.encryptDomainIds.push_back(i);
}
KmsConnLookupEKsByDomainIdsRep domainIdsRep = wait(inf.ekLookupByDomainIds.getReply(domainIdsReq));
for (auto& element : domainIdsRep.cipherKeyDetails) {
domainIdKeyMap.emplace(
element.encryptDomainId,
std::make_unique<SimEncryptKeyCtx>(element.encryptKeyId, element.encryptKey.toString().c_str()));
}
// randomly pick any domainId and validate if lookupByKeyId result matches
state std::unordered_map<EncryptCipherBaseKeyId, StringRef> validationMap;
std::unordered_map<EncryptCipherBaseKeyId, EncryptCipherDomainId> idsToLookup;
for (i = 0; i < maxIterations; i++) {
state int idx = deterministicRandom()->randomInt(0, maxDomainIds);
state SimEncryptKeyCtx* ctx = domainIdKeyMap[idx].get();
validationMap[ctx->id] = StringRef(ctx->key);
idsToLookup.emplace(ctx->id, idx);
}
state KmsConnLookupEKsByKeyIdsReq keyIdsReq;
for (const auto& item : idsToLookup) {
keyIdsReq.encryptKeyIds.emplace_back(std::make_pair(item.first, item.second));
}
state KmsConnLookupEKsByKeyIdsRep keyIdsReply = wait(inf.ekLookupByIds.getReply(keyIdsReq));
/* TraceEvent("Lookup")
.detail("KeyIdReqSize", keyIdsReq.encryptKeyIds.size())
.detail("KeyIdsRepSz", keyIdsReply.encryptKeyDetails.size())
.detail("ValSz", validationMap.size()); */
ASSERT(keyIdsReply.cipherKeyDetails.size() == validationMap.size());
for (const auto& element : keyIdsReply.cipherKeyDetails) {
ASSERT(validationMap[element.encryptKeyId].compare(element.encryptKey) == 0);
}
}
{
// Verify unknown key access returns the error
state KmsConnLookupEKsByKeyIdsReq req;
req.encryptKeyIds.emplace_back(std::make_pair(maxEncryptionKeys + 1, 1));
try {
KmsConnLookupEKsByKeyIdsRep reply = wait(inf.ekLookupByIds.getReply(req));
} catch (Error& e) {
ASSERT(e.code() == error_code_encrypt_key_not_found);
}
}
TraceEvent("RunWorkloadDone").log();
return Void();
}
} // namespace
TEST_CASE("fdbserver/SimKmsConnector") {
state KmsConnectorInterface inf;
state uint32_t maxEncryptKeys = 64;
state SimKmsConnector connector;
loop choose {
when(wait(connector.connectorCore(inf))) { throw internal_error(); }
when(wait(testRunWorkload(inf, maxEncryptKeys))) { break; }
}
return Void();
}

View File

@ -0,0 +1,40 @@
/*
* SimKmsConnector.actor.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#if defined(NO_INTELLISENSE) && !defined(FDBSERVER_SIMKMSCONNECTOR_ACTOR_G_H)
#define FDBSERVER_SIMKMSCONNECTOR_ACTOR_G_H
#include "fdbserver/SimKmsConnector.actor.g.h"
#elif !defined(FDBSERVER_SIMKMSCONNECTOR_ACTOR_H)
#define FDBSERVER_SIMKMSCONNECTOR_ACTOR_H
#include "fdbserver/KmsConnector.h"
#include "flow/BlobCipher.h"
#include "flow/actorcompiler.h" // This must be the last #include.
class SimKmsConnector : public KmsConnector {
public:
SimKmsConnector() = default;
Future<Void> connectorCore(KmsConnectorInterface interf);
};
#endif

View File

@ -310,6 +310,7 @@ public:
// 2 = "memory-radixtree-beta"
// 3 = "ssd-redwood-1-experimental"
// 4 = "ssd-rocksdb-v1"
// 5 = "ssd-sharded-rocksdb"
// Requires a comma-separated list of numbers WITHOUT whitespaces
std::vector<int> storageEngineExcludeTypes;
// Set the maximum TLog version that can be selected for a test
@ -1420,6 +1421,16 @@ void SimulationConfig::setStorageEngine(const TestConfig& testConfig) {
noUnseed = true;
break;
}
case 5: {
TEST(true); // Simulated cluster using Sharded RocksDB storage engine
set_config("ssd-sharded-rocksdb");
// Tests using the RocksDB engine are necessarily non-deterministic because of RocksDB
// background threads.
TraceEvent(SevWarnAlways, "RocksDBNonDeterminism")
.detail("Explanation", "The Sharded RocksDB storage engine is threaded and non-deterministic");
noUnseed = true;
break;
}
default:
ASSERT(false); // Programmer forgot to adjust cases.
}

View File

@ -323,6 +323,9 @@ KeyValueStoreSuffix redwoodSuffix = { KeyValueStoreType::SSD_REDWOOD_V1, ".redwo
KeyValueStoreSuffix rocksdbSuffix = { KeyValueStoreType::SSD_ROCKSDB_V1,
".rocksdb",
FilesystemCheck::DIRECTORIES_ONLY };
KeyValueStoreSuffix shardedRocksdbSuffix = { KeyValueStoreType::SSD_SHARDED_ROCKSDB,
".shardedrocksdb",
FilesystemCheck::DIRECTORIES_ONLY };
std::string validationFilename = "_validate";
@ -337,6 +340,8 @@ std::string filenameFromSample(KeyValueStoreType storeType, std::string folder,
return joinPath(folder, sample_filename);
else if (storeType == KeyValueStoreType::SSD_ROCKSDB_V1)
return joinPath(folder, sample_filename);
else if (storeType == KeyValueStoreType::SSD_SHARDED_ROCKSDB)
return joinPath(folder, sample_filename);
UNREACHABLE();
}
@ -352,6 +357,8 @@ std::string filenameFromId(KeyValueStoreType storeType, std::string folder, std:
return joinPath(folder, prefix + id.toString() + ".redwood-v1");
else if (storeType == KeyValueStoreType::SSD_ROCKSDB_V1)
return joinPath(folder, prefix + id.toString() + ".rocksdb");
else if (storeType == KeyValueStoreType::SSD_SHARDED_ROCKSDB)
return joinPath(folder, prefix + id.toString() + ".shardedrocksdb");
TraceEvent(SevError, "UnknownStoreType").detail("StoreType", storeType.toString());
UNREACHABLE();
@ -528,6 +535,9 @@ std::vector<DiskStore> getDiskStores(std::string folder) {
result.insert(result.end(), result4.begin(), result4.end());
auto result5 = getDiskStores(folder, rocksdbSuffix.suffix, rocksdbSuffix.type, rocksdbSuffix.check);
result.insert(result.end(), result5.begin(), result5.end());
auto result6 =
getDiskStores(folder, shardedRocksdbSuffix.suffix, shardedRocksdbSuffix.type, shardedRocksdbSuffix.check);
result.insert(result.end(), result6.begin(), result6.end());
return result;
}
@ -1657,6 +1667,7 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
SERVER_KNOBS->REMOTE_KV_STORE && /* testing mixed mode in simulation if remote kvs enabled */
(g_network->isSimulated()
? (/* Disable for RocksDB */ s.storeType != KeyValueStoreType::SSD_ROCKSDB_V1 &&
s.storeType != KeyValueStoreType::SSD_SHARDED_ROCKSDB &&
deterministicRandom()->coinflip())
: true));
Future<Void> kvClosed = kv->onClosed();
@ -2234,6 +2245,7 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
SERVER_KNOBS->REMOTE_KV_STORE && /* testing mixed mode in simulation if remote kvs enabled */
(g_network->isSimulated()
? (/* Disable for RocksDB */ req.storeType != KeyValueStoreType::SSD_ROCKSDB_V1 &&
req.storeType != KeyValueStoreType::SSD_SHARDED_ROCKSDB &&
deterministicRandom()->coinflip())
: true));
@ -2432,6 +2444,9 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
} else if (d.storeType == KeyValueStoreType::SSD_ROCKSDB_V1) {
included = fileExists(joinPath(d.filename, "CURRENT")) &&
fileExists(joinPath(d.filename, "IDENTITY"));
} else if (d.storeType == KeyValueStoreType::SSD_SHARDED_ROCKSDB) {
included = fileExists(joinPath(d.filename, "CURRENT")) &&
fileExists(joinPath(d.filename, "IDENTITY"));
} else if (d.storeType == KeyValueStoreType::MEMORY) {
included = fileExists(d.filename + "1.fdq");
} else {

View File

@ -33,6 +33,7 @@
#include <atomic>
#include <boost/range/const_iterator.hpp>
#include <utility>
#include "flow/actorcompiler.h" // This must be the last #include.
@ -76,10 +77,17 @@ struct EncryptKeyProxyTestWorkload : TestWorkload {
if (rep.present()) {
ASSERT(!rep.get().error.present());
ASSERT_EQ(rep.get().baseCipherDetailMap.size(), self->domainIds.size());
ASSERT_EQ(rep.get().baseCipherDetails.size(), self->domainIds.size());
for (const uint64_t id : self->domainIds) {
ASSERT(rep.get().baseCipherDetailMap.find(id) != rep.get().baseCipherDetailMap.end());
bool found = false;
for (const auto& item : rep.get().baseCipherDetails) {
if (item.baseCipherId == id) {
found = true;
break;
}
}
ASSERT(found);
}
// Ensure no hits reported by the cache.
@ -127,10 +135,17 @@ struct EncryptKeyProxyTestWorkload : TestWorkload {
ErrorOr<EKPGetLatestBaseCipherKeysReply> rep = wait(self->ekpInf.getLatestBaseCipherKeys.tryGetReply(req));
if (rep.present()) {
ASSERT(!rep.get().error.present());
ASSERT_EQ(rep.get().baseCipherDetailMap.size(), self->domainIds.size());
ASSERT_EQ(rep.get().baseCipherDetails.size(), self->domainIds.size());
for (const uint64_t id : self->domainIds) {
ASSERT(rep.get().baseCipherDetailMap.find(id) != rep.get().baseCipherDetailMap.end());
bool found = false;
for (const auto& item : rep.get().baseCipherDetails) {
if (item.baseCipherId == id) {
found = true;
break;
}
}
ASSERT(found);
}
// Ensure desired cache-hit counts
@ -165,16 +180,23 @@ struct EncryptKeyProxyTestWorkload : TestWorkload {
EKPGetLatestBaseCipherKeysReply rep = wait(self->ekpInf.getLatestBaseCipherKeys.getReply(req));
ASSERT(!rep.error.present());
ASSERT_EQ(rep.baseCipherDetailMap.size(), self->domainIds.size());
ASSERT_EQ(rep.baseCipherDetails.size(), self->domainIds.size());
for (const uint64_t id : self->domainIds) {
ASSERT(rep.baseCipherDetailMap.find(id) != rep.baseCipherDetailMap.end());
bool found = false;
for (const auto& item : rep.baseCipherDetails) {
if (item.baseCipherId == id) {
found = true;
break;
}
}
ASSERT(found);
}
self->cipherIdMap.clear();
self->cipherIds.clear();
for (auto& item : rep.baseCipherDetailMap) {
self->cipherIdMap.emplace(item.second.baseCipherId, StringRef(self->arena, item.second.baseCipherKey));
self->cipherIds.emplace_back(item.second.baseCipherId);
for (auto& item : rep.baseCipherDetails) {
self->cipherIdMap.emplace(item.baseCipherId, StringRef(self->arena, item.baseCipherKey));
self->cipherIds.emplace_back(item.baseCipherId);
}
state int numIterations = deterministicRandom()->randomInt(512, 786);
@ -184,28 +206,28 @@ struct EncryptKeyProxyTestWorkload : TestWorkload {
EKPGetBaseCipherKeysByIdsRequest req;
for (int i = idx; i < nIds && i < self->cipherIds.size(); i++) {
req.baseCipherIds.emplace_back(self->cipherIds[i]);
req.baseCipherIds.emplace_back(std::make_pair(self->cipherIds[i], 1));
}
expectedHits = req.baseCipherIds.size();
EKPGetBaseCipherKeysByIdsReply rep = wait(self->ekpInf.getBaseCipherKeysByIds.getReply(req));
ASSERT(!rep.error.present());
ASSERT_EQ(rep.baseCipherMap.size(), expectedHits);
ASSERT_EQ(rep.baseCipherDetails.size(), expectedHits);
ASSERT_EQ(rep.numHits, expectedHits);
// Valdiate the 'cipherKey' content against the one read while querying by domainIds
for (auto& item : rep.baseCipherMap) {
const auto itr = self->cipherIdMap.find(item.first);
for (auto& item : rep.baseCipherDetails) {
const auto itr = self->cipherIdMap.find(item.baseCipherId);
ASSERT(itr != self->cipherIdMap.end());
Standalone<StringRef> toCompare = self->cipherIdMap[item.first];
if (toCompare.compare(item.second) != 0) {
Standalone<StringRef> toCompare = self->cipherIdMap[item.baseCipherId];
if (toCompare.compare(item.baseCipherKey) != 0) {
TraceEvent("Mismatch")
.detail("Id", item.first)
.detail("Id", item.baseCipherId)
.detail("CipherMapDataHash", XXH3_64bits(toCompare.begin(), toCompare.size()))
.detail("CipherMapSize", toCompare.size())
.detail("CipherMapValue", toCompare.toString())
.detail("ReadDataHash", XXH3_64bits(item.second.begin(), item.second.size()))
.detail("ReadValue", item.second.toString())
.detail("ReadDataSize", item.second.size());
.detail("ReadDataHash", XXH3_64bits(item.baseCipherKey.begin(), item.baseCipherKey.size()))
.detail("ReadValue", item.baseCipherKey.toString())
.detail("ReadDataSize", item.baseCipherKey.size());
ASSERT(false);
}
}
@ -219,12 +241,15 @@ struct EncryptKeyProxyTestWorkload : TestWorkload {
TraceEvent("SimLookupInvalidKeyId_Start").log();
// Prepare a lookup with valid and invalid keyIds - SimEncryptKmsProxy should throw encrypt_key_not_found()
std::vector<uint64_t> baseCipherIds(self->cipherIds);
baseCipherIds.emplace_back(SERVER_KNOBS->SIM_KMS_MAX_KEYS + 10);
std::vector<std::pair<uint64_t, int64_t>> baseCipherIds;
for (auto id : self->cipherIds) {
baseCipherIds.emplace_back(std::make_pair(id, 1));
}
baseCipherIds.emplace_back(std::make_pair(SERVER_KNOBS->SIM_KMS_MAX_KEYS + 10, 1));
EKPGetBaseCipherKeysByIdsRequest req(deterministicRandom()->randomUniqueID(), baseCipherIds);
EKPGetBaseCipherKeysByIdsReply rep = wait(self->ekpInf.getBaseCipherKeysByIds.getReply(req));
ASSERT_EQ(rep.baseCipherMap.size(), 0);
ASSERT_EQ(rep.baseCipherDetails.size(), 0);
ASSERT(rep.error.present());
ASSERT_EQ(rep.error.get().code(), error_code_encrypt_key_not_found);

View File

@ -135,7 +135,6 @@ struct EncryptionOpsWorkload : TestWorkload {
maxDomainId = deterministicRandom()->randomInt(minDomainId, minDomainId + 10) + 5;
minBaseCipherId = 100;
headerBaseCipherId = wcx.clientId * 100 + 1;
headerRandomSalt = wcx.clientId * 100 + 1;
metrics = std::make_unique<WorkloadMetrics>();
@ -183,12 +182,20 @@ struct EncryptionOpsWorkload : TestWorkload {
ASSERT_EQ(cipherKeys.size(), 1);
}
// insert the Encrypt Header cipherKey
// insert the Encrypt Header cipherKey; record cipherDetails as getLatestCipher() may not work with multiple
// test clients
generateRandomBaseCipher(AES_256_KEY_LENGTH, &buff[0], &cipherLen);
cipherKeyCache->insertCipherKey(
ENCRYPT_HEADER_DOMAIN_ID, headerBaseCipherId, buff, cipherLen, headerRandomSalt);
cipherKeyCache->insertCipherKey(ENCRYPT_HEADER_DOMAIN_ID, headerBaseCipherId, buff, cipherLen);
Reference<BlobCipherKey> latestCipher = cipherKeyCache->getLatestCipherKey(ENCRYPT_HEADER_DOMAIN_ID);
ASSERT_EQ(latestCipher->getBaseCipherId(), headerBaseCipherId);
ASSERT_EQ(memcmp(latestCipher->rawBaseCipher(), buff, cipherLen), 0);
headerRandomSalt = latestCipher->getSalt();
TraceEvent("SetupCipherEssentials_Done").detail("MinDomainId", minDomainId).detail("MaxDomainId", maxDomainId);
TraceEvent("SetupCipherEssentials_Done")
.detail("MinDomainId", minDomainId)
.detail("MaxDomainId", maxDomainId)
.detail("HeaderBaseCipherId", headerBaseCipherId)
.detail("HeaderRandomSalt", headerRandomSalt);
}
void resetCipherEssentials() {

View File

@ -144,7 +144,8 @@ struct GetMappedRangeWorkload : ApiWorkload {
return Void();
}
static void validateRecord(int expectedId, const MappedKeyValueRef* it, GetMappedRangeWorkload* self) {
// Return true if need to retry.
static bool validateRecord(int expectedId, const MappedKeyValueRef* it, GetMappedRangeWorkload* self) {
// std::cout << "validateRecord expectedId " << expectedId << " it->key " << printable(it->key) << "
// indexEntryKey(expectedId) " << printable(indexEntryKey(expectedId)) << std::endl;
ASSERT(it->key == indexEntryKey(expectedId));
@ -155,7 +156,12 @@ struct GetMappedRangeWorkload : ApiWorkload {
auto& getRange = std::get<GetRangeReqAndResultRef>(it->reqAndResult);
auto& rangeResult = getRange.result;
// std::cout << "rangeResult.size()=" << rangeResult.size() << std::endl;
ASSERT(rangeResult.more == false);
// In the future, we may be able to do the continuation more efficiently by combining partial results
// together and then validate.
if (rangeResult.more) {
// Retry if the underlying request is not fully completed.
return true;
}
ASSERT(rangeResult.size() == SPLIT_SIZE);
for (int split = 0; split < SPLIT_SIZE; split++) {
auto& kv = rangeResult[split];
@ -174,6 +180,7 @@ struct GetMappedRangeWorkload : ApiWorkload {
ASSERT(getValue.result.present());
ASSERT(getValue.result.get() == recordValue(expectedId));
}
return false;
}
ACTOR Future<MappedRangeResult> scanMappedRangeWithLimits(Database cx,
@ -200,10 +207,17 @@ struct GetMappedRangeWorkload : ApiWorkload {
std::cout << "result.more=" << result.more << std::endl;
ASSERT(result.size() <= limit);
int expectedId = expectedBeginId;
bool needRetry = false;
for (const MappedKeyValueRef* it = result.begin(); it != result.end(); it++) {
validateRecord(expectedId, it, self);
if (validateRecord(expectedId, it, self)) {
needRetry = true;
break;
}
expectedId++;
}
if (needRetry) {
continue;
}
std::cout << "finished scanMappedRangeWithLimits" << std::endl;
return result;
} catch (Error& e) {

View File

@ -388,6 +388,9 @@ ACTOR Future<Void> testKVStore(KVStoreTestWorkload* workload) {
test.store = keyValueStoreRedwoodV1(fn, id);
else if (workload->storeType == "ssd-rocksdb-v1")
test.store = keyValueStoreRocksDB(fn, id, KeyValueStoreType::SSD_ROCKSDB_V1);
else if (workload->storeType == "ssd-sharded-rocksdb")
test.store = keyValueStoreRocksDB(
fn, id, KeyValueStoreType::SSD_SHARDED_ROCKSDB); // TODO: to replace the KVS in the future
else if (workload->storeType == "memory")
test.store = keyValueStoreMemory(fn, id, 500e6);
else if (workload->storeType == "memory-radixtree-beta")

View File

@ -1172,51 +1172,6 @@ struct SpecialKeySpaceCorrectnessWorkload : TestWorkload {
} catch (Error& e) {
wait(tx->onError(e));
}
// profile client get
loop {
try {
tx->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
// client_txn_sample_rate
state Optional<Value> txnSampleRate =
wait(tx->get(LiteralStringRef("client_txn_sample_rate")
.withPrefix(SpecialKeySpace::getManagementApiCommandPrefix("profile"))));
ASSERT(txnSampleRate.present());
Optional<Value> txnSampleRateKey = wait(tx->get(fdbClientInfoTxnSampleRate));
if (txnSampleRateKey.present()) {
const double sampleRateDbl =
BinaryReader::fromStringRef<double>(txnSampleRateKey.get(), Unversioned());
if (!std::isinf(sampleRateDbl)) {
ASSERT(txnSampleRate.get().toString() == boost::lexical_cast<std::string>(sampleRateDbl));
} else {
ASSERT(txnSampleRate.get().toString() == "default");
}
} else {
ASSERT(txnSampleRate.get().toString() == "default");
}
// client_txn_size_limit
state Optional<Value> txnSizeLimit =
wait(tx->get(LiteralStringRef("client_txn_size_limit")
.withPrefix(SpecialKeySpace::getManagementApiCommandPrefix("profile"))));
ASSERT(txnSizeLimit.present());
Optional<Value> txnSizeLimitKey = wait(tx->get(fdbClientInfoTxnSizeLimit));
if (txnSizeLimitKey.present()) {
const int64_t sizeLimit =
BinaryReader::fromStringRef<int64_t>(txnSizeLimitKey.get(), Unversioned());
if (sizeLimit != -1) {
ASSERT(txnSizeLimit.get().toString() == boost::lexical_cast<std::string>(sizeLimit));
} else {
ASSERT(txnSizeLimit.get().toString() == "default");
}
} else {
ASSERT(txnSizeLimit.get().toString() == "default");
}
tx->reset();
break;
} catch (Error& e) {
TraceEvent(SevDebug, "ProfileClientGet").error(e);
wait(tx->onError(e));
}
}
// data_distribution & maintenance get
loop {
try {

View File

@ -35,7 +35,7 @@ void forceLinkBLockCiherTests();
void forceLinkParallelStreamTests();
void forceLinkSimExternalConnectionTests();
void forceLinkMutationLogReaderTests();
void forceLinkSimEncryptKmsProxyTests();
void forceLinkSimKmsConnectorTests();
void forceLinkIThreadPoolTests();
void forceLinkTokenSignTests();
void forceLinkVersionVectorTests();
@ -84,7 +84,7 @@ struct UnitTestWorkload : TestWorkload {
forceLinkParallelStreamTests();
forceLinkSimExternalConnectionTests();
forceLinkMutationLogReaderTests();
forceLinkSimEncryptKmsProxyTests();
forceLinkSimKmsConnectorTests();
forceLinkIThreadPoolTests();
forceLinkTokenSignTests();
forceLinkVersionVectorTests();

View File

@ -116,19 +116,32 @@ void BlobCipherKey::reset() {
// BlobKeyIdCache class methods
BlobCipherKeyIdCache::BlobCipherKeyIdCache()
: domainId(ENCRYPT_INVALID_DOMAIN_ID), latestBaseCipherKeyId(ENCRYPT_INVALID_CIPHER_KEY_ID) {}
: domainId(ENCRYPT_INVALID_DOMAIN_ID), latestBaseCipherKeyId(ENCRYPT_INVALID_CIPHER_KEY_ID),
latestRandomSalt(ENCRYPT_INVALID_RANDOM_SALT) {}
BlobCipherKeyIdCache::BlobCipherKeyIdCache(EncryptCipherDomainId dId)
: domainId(dId), latestBaseCipherKeyId(ENCRYPT_INVALID_CIPHER_KEY_ID) {
: domainId(dId), latestBaseCipherKeyId(ENCRYPT_INVALID_CIPHER_KEY_ID), latestRandomSalt(ENCRYPT_INVALID_RANDOM_SALT) {
TraceEvent("Init_BlobCipherKeyIdCache").detail("DomainId", domainId);
}
BlobCipherKeyIdCacheKey BlobCipherKeyIdCache::getCacheKey(const EncryptCipherBaseKeyId& baseCipherKeyId,
const EncryptCipherRandomSalt& salt) {
if (baseCipherKeyId == ENCRYPT_INVALID_CIPHER_KEY_ID || salt == ENCRYPT_INVALID_RANDOM_SALT) {
throw encrypt_invalid_id();
}
return std::make_pair(baseCipherKeyId, salt);
}
Reference<BlobCipherKey> BlobCipherKeyIdCache::getLatestCipherKey() {
if (keyIdCache.empty()) {
// Cache is empty, nothing more to do.
throw encrypt_key_not_found();
}
// Ensure latestCipher details sanity
ASSERT_GT(latestBaseCipherKeyId, ENCRYPT_INVALID_CIPHER_KEY_ID);
ASSERT_GT(latestRandomSalt, ENCRYPT_INVALID_RANDOM_SALT);
return getCipherByBaseCipherId(latestBaseCipherKeyId, latestRandomSalt);
}
@ -154,7 +167,7 @@ void BlobCipherKeyIdCache::insertBaseCipherKey(const EncryptCipherBaseKeyId& bas
// ensure no key-tampering is done
try {
Reference<BlobCipherKey> cipherKey = getLatestCipherKey();
if (cipherKey->getBaseCipherId() == baseCipherId) {
if (cipherKey.isValid() && cipherKey->getBaseCipherId() == baseCipherId) {
if (memcmp(cipherKey->rawBaseCipher(), baseCipher, baseCipherLen) == 0) {
TraceEvent("InsertBaseCipherKey_AlreadyPresent")
.detail("BaseCipherKeyId", baseCipherId)
@ -189,6 +202,7 @@ void BlobCipherKeyIdCache::insertBaseCipherKey(const EncryptCipherBaseKeyId& bas
int baseCipherLen,
const EncryptCipherRandomSalt& salt) {
ASSERT_GT(baseCipherId, ENCRYPT_INVALID_CIPHER_KEY_ID);
ASSERT_GT(salt, ENCRYPT_INVALID_RANDOM_SALT);
BlobCipherKeyIdCacheKey cacheKey = getCacheKey(baseCipherId, salt);
@ -265,7 +279,8 @@ void BlobCipherKeyCache::insertCipherKey(const EncryptCipherDomainId& domainId,
const uint8_t* baseCipher,
int baseCipherLen,
const EncryptCipherRandomSalt& salt) {
if (domainId == ENCRYPT_INVALID_DOMAIN_ID || baseCipherId == ENCRYPT_INVALID_CIPHER_KEY_ID) {
if (domainId == ENCRYPT_INVALID_DOMAIN_ID || baseCipherId == ENCRYPT_INVALID_CIPHER_KEY_ID ||
salt == ENCRYPT_INVALID_RANDOM_SALT) {
throw encrypt_invalid_id();
}

View File

@ -25,7 +25,7 @@
#include <unordered_map>
#include <vector>
#if (!defined(TLS_DISABLED) && !defined(_WIN32))
#if (!defined(TLS_DISABLED))
#define ENCRYPTION_ENABLED 1
#else
#define ENCRYPTION_ENABLED 0
@ -233,20 +233,12 @@ private:
// required encryption key, however, CPs/SSs cache-miss would result in RPC to
// EncryptKeyServer to refresh the desired encryption key.
struct pair_hash {
template <class T1, class T2>
std::size_t operator()(const std::pair<T1, T2>& pair) const {
auto hash1 = std::hash<T1>{}(pair.first);
auto hash2 = std::hash<T2>{}(pair.second);
// Equal hashes XOR would be ZERO.
return hash1 == hash2 ? hash1 : hash1 ^ hash2;
}
};
using BlobCipherKeyIdCacheKey = std::pair<EncryptCipherBaseKeyId, EncryptCipherRandomSalt>;
using BlobCipherKeyIdCacheMap = std::unordered_map<BlobCipherKeyIdCacheKey, Reference<BlobCipherKey>, pair_hash>;
using BlobCipherKeyIdCacheKeyHash = boost::hash<BlobCipherKeyIdCacheKey>;
using BlobCipherKeyIdCacheMap =
std::unordered_map<BlobCipherKeyIdCacheKey, Reference<BlobCipherKey>, BlobCipherKeyIdCacheKeyHash>;
using BlobCipherKeyIdCacheMapCItr =
std::unordered_map<BlobCipherKeyIdCacheKey, Reference<BlobCipherKey>, pair_hash>::const_iterator;
std::unordered_map<BlobCipherKeyIdCacheKey, Reference<BlobCipherKey>, BlobCipherKeyIdCacheKeyHash>::const_iterator;
struct BlobCipherKeyIdCache : ReferenceCounted<BlobCipherKeyIdCache> {
public:

View File

@ -27,6 +27,7 @@
#define ENCRYPT_INVALID_DOMAIN_ID 0
#define ENCRYPT_INVALID_CIPHER_KEY_ID 0
#define ENCRYPT_INVALID_RANDOM_SALT 0
#define AUTH_TOKEN_SIZE 16

View File

@ -305,7 +305,7 @@ ERROR( encrypt_key_not_found, 2702, "Expected encryption key is missing")
ERROR( encrypt_key_ttl_expired, 2703, "Expected encryption key TTL has expired")
ERROR( encrypt_header_authtoken_mismatch, 2704, "Encryption header authentication token mismatch")
ERROR( encrypt_update_cipher, 2705, "Attempt to update encryption cipher key")
ERROR( encrypt_invalid_id, 2706, "Invalid encryption domainId or encryption cipher key id")
ERROR( encrypt_invalid_id, 2706, "Invalid encryption cipher details")
// 4xxx Internal errors (those that should be generated only by bugs) are decimal 4xxx
ERROR( unknown_error, 4000, "An unknown error occurred" ) // C++ exception not of type Error

View File

@ -66,6 +66,7 @@ public-address = {ip_address}:$ID
listen-address = public
datadir = {datadir}/$ID
logdir = {logdir}
{bg_knob_line}
# logsize = 10MiB
# maxlogssize = 100MiB
# machine-id =
@ -82,7 +83,7 @@ logdir = {logdir}
"""
def __init__(self, basedir: str, fdbserver_binary: str, fdbmonitor_binary: str, fdbcli_binary: str,
process_number: int, create_config=True, port=None, ip_address=None):
process_number: int, create_config=True, port=None, ip_address=None, blob_granules_enabled: bool=False):
self.basedir = Path(basedir)
self.etc = self.basedir.joinpath('etc')
self.log = self.basedir.joinpath('log')
@ -100,6 +101,11 @@ logdir = {logdir}
self.process_number = process_number
self.ip_address = '127.0.0.1' if ip_address is None else ip_address
self.first_port = port
self.blob_granules_enabled = blob_granules_enabled
if (blob_granules_enabled):
# add extra process for blob_worker
self.process_number += 1
if (self.first_port is not None):
self.last_used_port = int(self.first_port)-1
self.server_ports = [self.__next_port()
@ -111,6 +117,7 @@ logdir = {logdir}
self.process = None
self.fdbmonitor_logfile = None
self.use_legacy_conf_syntax = False
if create_config:
self.create_cluster_file()
self.save_config()
@ -126,14 +133,18 @@ logdir = {logdir}
new_conf_file = self.conf_file.parent / (self.conf_file.name + '.new')
with open(new_conf_file, 'x') as f:
conf_template = LocalCluster.configuration_template
bg_knob_line = ""
if (self.use_legacy_conf_syntax):
conf_template = conf_template.replace("-", "_")
if (self.blob_granules_enabled):
bg_knob_line = "knob_bg_url=file://" + str(self.data) + "/fdbblob/"
f.write(conf_template.format(
etcdir=self.etc,
fdbserver_bin=self.fdbserver_binary,
datadir=self.data,
logdir=self.log,
ip_address=self.ip_address
ip_address=self.ip_address,
bg_knob_line=bg_knob_line
))
# By default, the cluster only has one process
# If a port number is given and process_number > 1, we will use subsequent numbers
@ -143,6 +154,9 @@ logdir = {logdir}
for port in self.server_ports:
f.write('[fdbserver.{server_port}]\n'.format(
server_port=port))
if (self.blob_granules_enabled):
# make last process a blob_worker class
f.write('class = blob_worker')
f.flush()
os.fsync(f.fileno())
@ -202,12 +216,21 @@ logdir = {logdir}
db_config = 'configure new single {}'.format(storage)
if (enable_tenants):
db_config += " tenant_mode=optional_experimental"
if (self.blob_granules_enabled):
db_config += " blob_granules_enabled:=1"
args = [self.fdbcli_binary, '-C',
self.cluster_file, '--exec', db_config]
res = subprocess.run(args, env=self.process_env())
assert res.returncode == 0, "Create database failed with {}".format(
res.returncode)
if (self.blob_granules_enabled):
bg_args = [self.fdbcli_binary, '-C',
self.cluster_file, '--exec', 'blobrange start \\x00 \\xff']
bg_res = subprocess.run(bg_args, env=self.process_env())
assert bg_res.returncode == 0, "Start blob granules failed with {}".format(bg_res.returncode)
def get_status(self):
args = [self.fdbcli_binary, '-C', self.cluster_file, '--exec',
'status json']

View File

@ -11,7 +11,7 @@ from pathlib import Path
class TempCluster:
def __init__(self, build_dir: str, process_number: int = 1, port: str = None):
def __init__(self, build_dir: str, process_number: int = 1, port: str = None, blob_granules_enabled: bool = False):
self.build_dir = Path(build_dir).resolve()
assert self.build_dir.exists(), "{} does not exist".format(build_dir)
assert self.build_dir.is_dir(), "{} is not a directory".format(build_dir)
@ -27,6 +27,7 @@ class TempCluster:
self.build_dir.joinpath("bin", "fdbcli"),
process_number,
port=port,
blob_granules_enabled=blob_granules_enabled
)
self.log = self.cluster.log
self.etc = self.cluster.etc
@ -88,9 +89,14 @@ if __name__ == "__main__":
help='Do not dump cluster log on error',
action="store_true"
)
parser.add_argument(
'--blob-granules-enabled',
help='Enable blob granules',
action="store_true"
)
args = parser.parse_args()
errcode = 1
with TempCluster(args.build_dir, args.process_number) as cluster:
with TempCluster(args.build_dir, args.process_number, blob_granules_enabled=args.blob_granules_enabled) as cluster:
print("log-dir: {}".format(cluster.log))
print("etc-dir: {}".format(cluster.etc))
print("data-dir: {}".format(cluster.data))
@ -105,6 +111,10 @@ if __name__ == "__main__":
cmd_args.append(str(cluster.log))
elif cmd == "@ETC_DIR@":
cmd_args.append(str(cluster.etc))
elif cmd == "@TMP_DIR@":
cmd_args.append(str(cluster.tmp_dir))
elif cmd.startswith("@DATA_DIR@"):
cmd_args.append(str(cluster.data) + cmd[len("@DATA_DIR@"):])
else:
cmd_args.append(cmd)
env = dict(**os.environ)

View File

@ -14,6 +14,7 @@ from threading import Thread, Event
import traceback
import time
from urllib import request
import hashlib
from local_cluster import LocalCluster, random_secret_string
@ -30,6 +31,7 @@ CURRENT_VERSION = "7.2.0"
HEALTH_CHECK_TIMEOUT_SEC = 5
PROGRESS_CHECK_TIMEOUT_SEC = 30
TRANSACTION_RETRY_LIMIT = 100
MAX_DOWNLOAD_ATTEMPTS = 5
RUN_WITH_GDB = False
@ -66,6 +68,23 @@ def random_sleep(minSec, maxSec):
time.sleep(timeSec)
def compute_sha256(filename):
hash = hashlib.sha256()
with open(filename, 'rb') as f:
while True:
data = f.read(128*1024)
if not data:
break
hash.update(data)
return hash.hexdigest()
def read_to_str(filename):
with open(filename, 'r') as f:
return f.read()
class UpgradeTest:
def __init__(self, build_dir: str, upgrade_path: list, process_number: int = 1, port: str = None):
self.build_dir = Path(build_dir).resolve()
@ -133,10 +152,29 @@ class UpgradeTest:
parents=True, exist_ok=True)
remote_file = "{}{}/{}".format(FDB_DOWNLOAD_ROOT,
version, remote_bin_name)
print("Downloading '{}' to '{}'...".format(remote_file, local_file))
request.urlretrieve(remote_file, local_file)
print("Download complete")
assert local_file.exists(), "{} does not exist".format(local_file)
remote_sha256 = "{}.sha256".format(remote_file)
local_sha256 = Path("{}.sha256".format(local_file))
for attempt_cnt in range(MAX_DOWNLOAD_ATTEMPTS):
print("Downloading '{}' to '{}'...".format(remote_file, local_file))
request.urlretrieve(remote_file, local_file)
print("Downloading '{}' to '{}'...".format(
remote_sha256, local_sha256))
request.urlretrieve(remote_sha256, local_sha256)
print("Download complete")
assert local_file.exists(), "{} does not exist".format(local_file)
assert local_sha256.exists(), "{} does not exist".format(local_sha256)
expected_checksum = read_to_str(local_sha256)
actual_checkum = compute_sha256(local_file)
if (expected_checksum == actual_checkum):
print("Checksum OK")
break
print("Checksum mismatch. Expected: {} Actual: {}".format(
expected_checksum, actual_checkum))
if attempt_cnt == MAX_DOWNLOAD_ATTEMPTS-1:
assert False, "Failed to download {} after {} attempts".format(
local_file, MAX_DOWNLOAD_ATTEMPTS)
if makeExecutable:
make_executable(local_file)
@ -246,6 +284,7 @@ class UpgradeTest:
'--api-version', str(self.api_version),
'--log',
'--log-dir', self.log,
'--tmp-dir', self.tmp_dir,
'--transaction-retry-limit', str(TRANSACTION_RETRY_LIMIT)]
if (RUN_WITH_GDB):
cmd_args = ['gdb', '-ex', 'run', '--args'] + cmd_args