Merge branch 'apple:main' into block-down

Bharadwaj V.R 2022-04-22 06:19:38 -07:00 committed by GitHub
commit ed08cfbf52
53 changed files with 1239 additions and 414 deletions

View File

@ -106,6 +106,7 @@ if(NOT WIN32)
test/apitester/TesterApiWrapper.h
test/apitester/TesterTestSpec.cpp
test/apitester/TesterTestSpec.h
test/apitester/TesterBlobGranuleCorrectnessWorkload.cpp
test/apitester/TesterCancelTransactionWorkload.cpp
test/apitester/TesterCorrectnessWorkload.cpp
test/apitester/TesterKeyValueStore.cpp
@ -253,6 +254,23 @@ endif()
${CMAKE_SOURCE_DIR}/bindings/c/test/apitester/tests
)
add_fdbclient_test(
NAME fdb_c_api_tests_blob_granule
DISABLE_LOG_DUMP
API_TEST_BLOB_GRANULES_ENABLED
COMMAND ${CMAKE_SOURCE_DIR}/bindings/c/test/apitester/run_c_api_tests.py
--cluster-file
@CLUSTER_FILE@
--tester-binary
$<TARGET_FILE:fdb_c_api_tester>
--external-client-library
${CMAKE_CURRENT_BINARY_DIR}/libfdb_c_external.so
--test-dir
${CMAKE_SOURCE_DIR}/bindings/c/test/apitester/blobgranuletests
--blob-granule-local-file-path
@DATA_DIR@/fdbblob/
)
if(CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT USE_SANITIZER)
add_test(NAME fdb_c_upgrade_single_threaded_630api
COMMAND ${CMAKE_SOURCE_DIR}/tests/TestRunner/upgrade_test.py

View File

@ -183,4 +183,63 @@ void ApiWorkload::populateData(TTaskFct cont) {
}
}
void ApiWorkload::randomInsertOp(TTaskFct cont) {
int numKeys = Random::get().randomInt(1, maxKeysPerTransaction);
auto kvPairs = std::make_shared<std::vector<KeyValue>>();
for (int i = 0; i < numKeys; i++) {
kvPairs->push_back(KeyValue{ randomNotExistingKey(), randomValue() });
}
execTransaction(
[kvPairs](auto ctx) {
for (const KeyValue& kv : *kvPairs) {
ctx->tx()->set(kv.key, kv.value);
}
ctx->commit();
},
[this, kvPairs, cont]() {
for (const KeyValue& kv : *kvPairs) {
store.set(kv.key, kv.value);
}
schedule(cont);
});
}
void ApiWorkload::randomClearOp(TTaskFct cont) {
int numKeys = Random::get().randomInt(1, maxKeysPerTransaction);
auto keys = std::make_shared<std::vector<std::string>>();
for (int i = 0; i < numKeys; i++) {
keys->push_back(randomExistingKey());
}
execTransaction(
[keys](auto ctx) {
for (const auto& key : *keys) {
ctx->tx()->clear(key);
}
ctx->commit();
},
[this, keys, cont]() {
for (const auto& key : *keys) {
store.clear(key);
}
schedule(cont);
});
}
void ApiWorkload::randomClearRangeOp(TTaskFct cont) {
std::string begin = randomKeyName();
std::string end = randomKeyName();
if (begin > end) {
std::swap(begin, end);
}
execTransaction(
[begin, end](auto ctx) {
ctx->tx()->clearRange(begin, end);
ctx->commit();
},
[this, begin, end, cont]() {
store.clear(begin, end);
schedule(cont);
});
}
} // namespace FdbApiTester

View File

@ -114,6 +114,11 @@ protected:
// Clear the data of the workload
void clearData(TTaskFct cont);
// common operations
void randomInsertOp(TTaskFct cont);
void randomClearOp(TTaskFct cont);
void randomClearRangeOp(TTaskFct cont);
private:
void populateDataTx(TTaskFct cont);

View File

@ -18,9 +18,9 @@
* limitations under the License.
*/
#include "TesterApiWrapper.h"
#include "TesterUtil.h"
#include <cstdint>
#include <fmt/format.h>
#include <fstream>
namespace FdbApiTester {
@ -60,6 +60,56 @@ std::optional<std::string> ValueFuture::getValue() const {
return out_present ? std::make_optional(std::string((const char*)val, vallen)) : std::nullopt;
}
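// Returns the granule ranges as KeyValue pairs, reusing KeyValue to hold a
// range: the begin key goes in 'key' and the end key in 'value'.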
std::vector<KeyValue> KeyRangesFuture::getKeyRanges() const {
ASSERT(future_);
int count;
const FDBKeyRange* ranges;
fdb_check(fdb_future_get_keyrange_array(future_.get(), &ranges, &count));
std::vector<KeyValue> result;
result.reserve(count);
for (int i = 0; i < count; i++) {
FDBKeyRange kr = *ranges++;
KeyValue rkv;
rkv.key = std::string((const char*)kr.begin_key, kr.begin_key_length);
rkv.value = std::string((const char*)kr.end_key, kr.end_key_length);
result.push_back(rkv);
}
return result;
}
Result::Result(FDBResult* r) : result_(r, fdb_result_destroy) {}
std::vector<KeyValue> KeyValuesResult::getKeyValues(bool* more_out) {
ASSERT(result_);
int count;
const FDBKeyValue* kvs;
int more;
std::vector<KeyValue> result;
error_ = fdb_result_get_keyvalue_array(result_.get(), &kvs, &count, &more);
if (error_ != error_code_success) {
return result;
}
result.reserve(count);
for (int i = 0; i < count; i++) {
FDBKeyValue kv = *kvs++;
KeyValue rkv;
rkv.key = std::string((const char*)kv.key, kv.key_length);
rkv.value = std::string((const char*)kv.value, kv.value_length);
result.push_back(rkv);
}
*more_out = more;
return result;
}
// Given an FDBDatabase, initializes a new transaction.
Transaction::Transaction(FDBTransaction* tx) : tx_(tx, fdb_transaction_destroy) {}
@ -109,6 +159,87 @@ fdb_error_t Transaction::setOption(FDBTransactionOption option) {
return fdb_transaction_set_option(tx_.get(), option, reinterpret_cast<const uint8_t*>(""), 0);
}
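// Context shared with the blob granule read callbacks below: tracks in-flight
// granule file loads by id and frees any buffers still outstanding when the
// context is destroyed (e.g. after an error).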
class TesterGranuleContext {
public:
std::unordered_map<int64_t, uint8_t*> loadsInProgress;
int64_t nextId = 0;
std::string basePath;
~TesterGranuleContext() {
// if there was an error or not all loads finished, delete data
for (auto& it : loadsInProgress) {
uint8_t* dataToFree = it.second;
delete[] dataToFree; // allocated with new[] in granule_start_load
}
}
};
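// Reads [offset, offset + length) of the named granule file (relative to the
// context's base path) into a newly allocated buffer and returns a fresh load id.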
static int64_t granule_start_load(const char* filename,
int filenameLength,
int64_t offset,
int64_t length,
int64_t fullFileLength,
void* context) {
TesterGranuleContext* ctx = (TesterGranuleContext*)context;
int64_t loadId = ctx->nextId++;
uint8_t* buffer = new uint8_t[length];
std::ifstream fin(ctx->basePath + std::string(filename, filenameLength), std::ios::in | std::ios::binary);
fin.seekg(offset);
fin.read((char*)buffer, length);
ctx->loadsInProgress.insert({ loadId, buffer });
return loadId;
}
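// Returns the buffer for a previously started load; loads here are synchronous,
// so the data has already been fully read.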
static uint8_t* granule_get_load(int64_t loadId, void* context) {
TesterGranuleContext* ctx = (TesterGranuleContext*)context;
return ctx->loadsInProgress.at(loadId);
}
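// Frees the buffer for a load and removes it from the in-progress map.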
static void granule_free_load(int64_t loadId, void* context) {
TesterGranuleContext* ctx = (TesterGranuleContext*)context;
auto it = ctx->loadsInProgress.find(loadId);
uint8_t* dataToFree = it->second;
delete[] dataToFree; // matches the new[] allocation in granule_start_load
ctx->loadsInProgress.erase(it);
}
KeyValuesResult Transaction::readBlobGranules(std::string_view begin,
std::string_view end,
const std::string& basePath) {
ASSERT(tx_);
TesterGranuleContext testerContext;
testerContext.basePath = basePath;
FDBReadBlobGranuleContext granuleContext;
granuleContext.userContext = &testerContext;
granuleContext.debugNoMaterialize = false;
granuleContext.granuleParallelism = 1;
granuleContext.start_load_f = &granule_start_load;
granuleContext.get_load_f = &granule_get_load;
granuleContext.free_load_f = &granule_free_load;
return KeyValuesResult(fdb_transaction_read_blob_granules(tx_.get(),
(const uint8_t*)begin.data(),
begin.size(),
(const uint8_t*)end.data(),
end.size(),
0 /* beginVersion */,
-2 /* latest read version */,
granuleContext));
}
KeyRangesFuture Transaction::getBlobGranuleRanges(std::string_view begin, std::string_view end) {
ASSERT(tx_);
return KeyRangesFuture(fdb_transaction_get_blob_granule_ranges(
tx_.get(), (const uint8_t*)begin.data(), begin.size(), (const uint8_t*)end.data(), end.size()));
}
fdb_error_t FdbApi::setOption(FDBNetworkOption option, std::string_view value) {
return fdb_network_set_option(option, reinterpret_cast<const uint8_t*>(value.data()), value.size());
}

View File

@ -26,6 +26,7 @@
#include <string_view>
#include <optional>
#include <memory>
#include <unordered_map>
#define FDB_API_VERSION 720
#include "bindings/c/foundationdb/fdb_c.h"
@ -35,6 +36,8 @@
#include "flow/error_definitions.h"
#include "TesterUtil.h"
namespace FdbApiTester {
// Wrapper parent class to manage memory of an FDBFuture pointer. Cleans up
@ -62,6 +65,37 @@ public:
std::optional<std::string> getValue() const;
};
class KeyRangesFuture : public Future {
public:
KeyRangesFuture() = default;
KeyRangesFuture(FDBFuture* f) : Future(f) {}
std::vector<KeyValue> getKeyRanges() const;
};
class Result {
public:
Result() = default;
Result(FDBResult* r);
FDBResult* fdbResult() { return result_.get(); };
fdb_error_t getError() const { return error_; }
explicit operator bool() const { return result_ != nullptr; };
fdb_error_t error_ = error_code_client_invalid_operation; // have to call getX function to set this
protected:
std::shared_ptr<FDBResult> result_;
};
class KeyValuesResult : public Result {
public:
KeyValuesResult() = default;
KeyValuesResult(FDBResult* f) : Result(f) {}
std::vector<KeyValue> getKeyValues(bool* more_out);
};
class Transaction {
public:
Transaction() = default;
@ -76,6 +110,9 @@ public:
void reset();
fdb_error_t setOption(FDBTransactionOption option);
KeyValuesResult readBlobGranules(std::string_view begin, std::string_view end, const std::string& basePath);
KeyRangesFuture getBlobGranuleRanges(std::string_view begin, std::string_view end);
private:
std::shared_ptr<FDBTransaction> tx_;
};

View File

@ -0,0 +1,159 @@
/*
* TesterBlobGranuleCorrectnessWorkload.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "TesterApiWorkload.h"
#include "TesterUtil.h"
#include <memory>
#include <fmt/format.h>
namespace FdbApiTester {
class ApiBlobGranuleCorrectnessWorkload : public ApiWorkload {
public:
ApiBlobGranuleCorrectnessWorkload(const WorkloadConfig& config) : ApiWorkload(config) {
// sometimes don't do range clears
if (Random::get().randomInt(0, 1) == 0) {
excludedOpTypes.push_back(OP_CLEAR_RANGE);
}
}
private:
enum OpType { OP_INSERT, OP_CLEAR, OP_CLEAR_RANGE, OP_READ, OP_GET_RANGES, OP_LAST = OP_GET_RANGES };
std::vector<OpType> excludedOpTypes;
void randomReadOp(TTaskFct cont) {
std::string begin = randomKeyName();
std::string end = randomKeyName();
auto results = std::make_shared<std::vector<KeyValue>>();
if (begin > end) {
std::swap(begin, end);
}
execTransaction(
[begin, end, results](auto ctx) {
ctx->tx()->setOption(FDB_TR_OPTION_READ_YOUR_WRITES_DISABLE);
KeyValuesResult res = ctx->tx()->readBlobGranules(begin, end, ctx->getBGBasePath());
bool more;
(*results) = res.getKeyValues(&more);
ASSERT(!more);
if (res.getError() != error_code_success) {
ctx->onError(res.getError());
} else {
ctx->done();
}
},
[this, begin, end, results, cont]() {
std::vector<KeyValue> expected = store.getRange(begin, end, store.size(), false);
if (results->size() != expected.size()) {
error(fmt::format("randomReadOp result size mismatch. expected: {} actual: {}",
expected.size(),
results->size()));
}
ASSERT(results->size() == expected.size());
for (int i = 0; i < results->size(); i++) {
if ((*results)[i].key != expected[i].key) {
error(fmt::format("randomReadOp key mismatch at {}/{}. expected: {} actual: {}",
i,
results->size(),
expected[i].key,
(*results)[i].key));
}
ASSERT((*results)[i].key == expected[i].key);
if ((*results)[i].value != expected[i].value) {
error(
fmt::format("randomReadOp value mismatch at {}/{}. key: {} expected: {:.80} actual: {:.80}",
i,
results->size(),
expected[i].key,
expected[i].value,
(*results)[i].value));
}
ASSERT((*results)[i].value == expected[i].value);
}
schedule(cont);
});
}
void randomGetRangesOp(TTaskFct cont) {
std::string begin = randomKeyName();
std::string end = randomKeyName();
auto results = std::make_shared<std::vector<KeyValue>>();
if (begin > end) {
std::swap(begin, end);
}
execTransaction(
[begin, end, results](auto ctx) {
KeyRangesFuture f = ctx->tx()->getBlobGranuleRanges(begin, end);
ctx->continueAfter(
f,
[ctx, f, results]() {
(*results) = f.getKeyRanges();
ctx->done();
},
true);
},
[this, begin, end, results, cont]() {
ASSERT(results->size() > 0);
ASSERT(results->front().key <= begin);
ASSERT(results->back().value >= end);
for (int i = 0; i < results->size(); i++) {
// no empty or inverted ranges
ASSERT((*results)[i].key < (*results)[i].value);
}
for (int i = 1; i < results->size(); i++) {
// ranges contain entire requested key range
ASSERT((*results)[i].key == (*results)[i - 1].value);
}
schedule(cont);
});
}
void randomOperation(TTaskFct cont) {
OpType txType = (store.size() == 0) ? OP_INSERT : (OpType)Random::get().randomInt(0, OP_LAST);
while (std::count(excludedOpTypes.begin(), excludedOpTypes.end(), txType)) {
txType = (OpType)Random::get().randomInt(0, OP_LAST);
}
switch (txType) {
case OP_INSERT:
randomInsertOp(cont);
break;
case OP_CLEAR:
randomClearOp(cont);
break;
case OP_CLEAR_RANGE:
randomClearRangeOp(cont);
break;
case OP_READ:
randomReadOp(cont);
break;
case OP_GET_RANGES:
randomGetRangesOp(cont);
break;
}
}
};
WorkloadFactory<ApiBlobGranuleCorrectnessWorkload> ApiBlobGranuleCorrectnessWorkloadFactory(
"ApiBlobGranuleCorrectness");
} // namespace FdbApiTester

View File

@ -31,27 +31,6 @@ public:
private:
enum OpType { OP_INSERT, OP_GET, OP_CLEAR, OP_CLEAR_RANGE, OP_COMMIT_READ, OP_LAST = OP_COMMIT_READ };
void randomInsertOp(TTaskFct cont) {
int numKeys = Random::get().randomInt(1, maxKeysPerTransaction);
auto kvPairs = std::make_shared<std::vector<KeyValue>>();
for (int i = 0; i < numKeys; i++) {
kvPairs->push_back(KeyValue{ randomNotExistingKey(), randomValue() });
}
execTransaction(
[kvPairs](auto ctx) {
for (const KeyValue& kv : *kvPairs) {
ctx->tx()->set(kv.key, kv.value);
}
ctx->commit();
},
[this, kvPairs, cont]() {
for (const KeyValue& kv : *kvPairs) {
store.set(kv.key, kv.value);
}
schedule(cont);
});
}
void randomCommitReadOp(TTaskFct cont) {
int numKeys = Random::get().randomInt(1, maxKeysPerTransaction);
auto kvPairs = std::make_shared<std::vector<KeyValue>>();
@ -145,44 +124,6 @@ private:
});
}
void randomClearOp(TTaskFct cont) {
int numKeys = Random::get().randomInt(1, maxKeysPerTransaction);
auto keys = std::make_shared<std::vector<std::string>>();
for (int i = 0; i < numKeys; i++) {
keys->push_back(randomExistingKey());
}
execTransaction(
[keys](auto ctx) {
for (const auto& key : *keys) {
ctx->tx()->clear(key);
}
ctx->commit();
},
[this, keys, cont]() {
for (const auto& key : *keys) {
store.clear(key);
}
schedule(cont);
});
}
void randomClearRangeOp(TTaskFct cont) {
std::string begin = randomKeyName();
std::string end = randomKeyName();
if (begin > end) {
std::swap(begin, end);
}
execTransaction(
[begin, end](auto ctx) {
ctx->tx()->clearRange(begin, end);
ctx->commit();
},
[this, begin, end, cont]() {
store.clear(begin, end);
schedule(cont);
});
}
void randomOperation(TTaskFct cont) {
OpType txType = (store.size() == 0) ? OP_INSERT : (OpType)Random::get().randomInt(0, OP_LAST);
switch (txType) {

View File

@ -30,12 +30,9 @@
#include <vector>
#include <mutex>
namespace FdbApiTester {
#include "TesterUtil.h"
struct KeyValue {
std::string key;
std::string value;
};
namespace FdbApiTester {
class KeyValueStore {
public:

View File

@ -49,6 +49,7 @@ public:
int numClients;
std::vector<std::pair<std::string, std::string>> knobs;
TestSpec testSpec;
std::string bgBasePath;
};
} // namespace FdbApiTester

View File

@ -75,9 +75,10 @@ public:
std::shared_ptr<ITransactionActor> txActor,
TTaskFct cont,
IScheduler* scheduler,
int retryLimit)
int retryLimit,
std::string bgBasePath)
: fdbTx(tx), txActor(txActor), contAfterDone(cont), scheduler(scheduler), retryLimit(retryLimit),
txState(TxState::IN_PROGRESS), commitCalled(false) {}
txState(TxState::IN_PROGRESS), commitCalled(false), bgBasePath(bgBasePath) {}
// A state machine:
// IN_PROGRESS -> (ON_ERROR -> IN_PROGRESS)* [-> ON_ERROR] -> DONE
@ -123,6 +124,8 @@ public:
contAfterDone();
}
std::string getBGBasePath() override { return bgBasePath; }
protected:
virtual void doContinueAfter(Future f, TTaskFct cont, bool retryOnError) = 0;
@ -217,6 +220,9 @@ protected:
// A history of errors on which the transaction was retried
std::vector<fdb_error_t> retriedErrors;
// blob granule base path
std::string bgBasePath;
};
/**
@ -228,8 +234,9 @@ public:
std::shared_ptr<ITransactionActor> txActor,
TTaskFct cont,
IScheduler* scheduler,
int retryLimit)
: TransactionContextBase(tx, txActor, cont, scheduler, retryLimit) {}
int retryLimit,
std::string bgBasePath)
: TransactionContextBase(tx, txActor, cont, scheduler, retryLimit, bgBasePath) {}
protected:
void doContinueAfter(Future f, TTaskFct cont, bool retryOnError) override {
@ -316,8 +323,9 @@ public:
std::shared_ptr<ITransactionActor> txActor,
TTaskFct cont,
IScheduler* scheduler,
int retryLimit)
: TransactionContextBase(tx, txActor, cont, scheduler, retryLimit) {}
int retryLimit,
std::string bgBasePath)
: TransactionContextBase(tx, txActor, cont, scheduler, retryLimit, bgBasePath) {}
protected:
void doContinueAfter(Future f, TTaskFct cont, bool retryOnError) override {
@ -482,9 +490,10 @@ class TransactionExecutorBase : public ITransactionExecutor {
public:
TransactionExecutorBase(const TransactionExecutorOptions& options) : options(options), scheduler(nullptr) {}
void init(IScheduler* scheduler, const char* clusterFile) override {
void init(IScheduler* scheduler, const char* clusterFile, const std::string& bgBasePath) override {
this->scheduler = scheduler;
this->clusterFile = clusterFile;
this->bgBasePath = bgBasePath;
}
protected:
@ -499,10 +508,10 @@ protected:
std::shared_ptr<ITransactionContext> ctx;
if (options.blockOnFutures) {
ctx = std::make_shared<BlockingTransactionContext>(
tx, txActor, cont, scheduler, options.transactionRetryLimit);
tx, txActor, cont, scheduler, options.transactionRetryLimit, bgBasePath);
} else {
ctx = std::make_shared<AsyncTransactionContext>(
tx, txActor, cont, scheduler, options.transactionRetryLimit);
tx, txActor, cont, scheduler, options.transactionRetryLimit, bgBasePath);
}
txActor->init(ctx);
txActor->start();
@ -511,6 +520,7 @@ protected:
protected:
TransactionExecutorOptions options;
std::string bgBasePath;
std::string clusterFile;
IScheduler* scheduler;
};
@ -524,8 +534,8 @@ public:
~DBPoolTransactionExecutor() override { release(); }
void init(IScheduler* scheduler, const char* clusterFile) override {
TransactionExecutorBase::init(scheduler, clusterFile);
void init(IScheduler* scheduler, const char* clusterFile, const std::string& bgBasePath) override {
TransactionExecutorBase::init(scheduler, clusterFile, bgBasePath);
for (int i = 0; i < options.numDatabases; i++) {
FDBDatabase* db;
fdb_error_t err = fdb_create_database(clusterFile, &db);

View File

@ -55,6 +55,9 @@ public:
// Mark the transaction as completed without committing it (for read transactions)
virtual void done() = 0;
// Plumbing for blob granule base path
virtual std::string getBGBasePath() = 0;
// A continuation to be executed when all of the given futures get ready
virtual void continueAfterAll(std::vector<Future> futures, TTaskFct cont);
};
@ -136,7 +139,7 @@ struct TransactionExecutorOptions {
class ITransactionExecutor {
public:
virtual ~ITransactionExecutor() {}
virtual void init(IScheduler* sched, const char* clusterFile) = 0;
virtual void init(IScheduler* sched, const char* clusterFile, const std::string& bgBasePath) = 0;
virtual void execute(std::shared_ptr<ITransactionActor> tx, TTaskFct cont) = 0;
};

View File

@ -49,6 +49,11 @@ struct formatter<std::optional<T>> : fmt::formatter<T> {
namespace FdbApiTester {
struct KeyValue {
std::string key;
std::string value;
};
std::string lowerCase(const std::string& str);
class Random {

View File

@ -0,0 +1,24 @@
[[test]]
title = 'Blob Granule API Correctness Blocking'
multiThreaded = true
buggify = true
blockOnFutures = true
minFdbThreads = 2
maxFdbThreads = 8
minDatabases = 2
maxDatabases = 8
minClientThreads = 2
maxClientThreads = 8
minClients = 2
maxClients = 8
[[test.workload]]
name = 'ApiBlobGranuleCorrectness'
minKeyLength = 1
maxKeyLength = 64
minValueLength = 1
maxValueLength = 1000
maxKeysPerTransaction = 50
initialSize = 100
numRandomOperations = 100

View File

@ -0,0 +1,23 @@
[[test]]
title = 'Blob Granule API Correctness Multi Threaded'
multiThreaded = true
buggify = true
minFdbThreads = 2
maxFdbThreads = 8
minDatabases = 2
maxDatabases = 8
minClientThreads = 2
maxClientThreads = 8
minClients = 2
maxClients = 8
[[test.workload]]
name = 'ApiBlobGranuleCorrectness'
minKeyLength = 1
maxKeyLength = 64
minValueLength = 1
maxValueLength = 1000
maxKeysPerTransaction = 50
initialSize = 100
numRandomOperations = 100

View File

@ -0,0 +1,15 @@
[[test]]
title = 'Blob Granule API Correctness Single Threaded'
minClients = 1
maxClients = 3
multiThreaded = false
[[test.workload]]
name = 'ApiBlobGranuleCorrectness'
minKeyLength = 1
maxKeyLength = 64
minValueLength = 1
maxValueLength = 1000
maxKeysPerTransaction = 50
initialSize = 100
numRandomOperations = 100

View File

@ -51,7 +51,8 @@ enum TesterOptionId {
OPT_INPUT_PIPE,
OPT_OUTPUT_PIPE,
OPT_FDB_API_VERSION,
OPT_TRANSACTION_RETRY_LIMIT
OPT_TRANSACTION_RETRY_LIMIT,
OPT_BLOB_GRANULE_LOCAL_FILE_PATH
};
CSimpleOpt::SOption TesterOptionDefs[] = //
@ -73,6 +74,7 @@ CSimpleOpt::SOption TesterOptionDefs[] = //
{ OPT_OUTPUT_PIPE, "--output-pipe", SO_REQ_SEP },
{ OPT_FDB_API_VERSION, "--api-version", SO_REQ_SEP },
{ OPT_TRANSACTION_RETRY_LIMIT, "--transaction-retry-limit", SO_REQ_SEP },
{ OPT_BLOB_GRANULE_LOCAL_FILE_PATH, "--blob-granule-local-file-path", SO_REQ_SEP },
SO_END_OF_OPTIONS };
void printProgramUsage(const char* execName) {
@ -108,6 +110,8 @@ void printProgramUsage(const char* execName) {
" Required FDB API version (default %d).\n"
" --transaction-retry-limit NUMBER\n"
" Maximum number of retries per tranaction (default: 0 - unlimited)\n"
" --blob-granule-local-file-path PATH\n"
" Path to blob granule files on local filesystem\n"
" -f, --test-file FILE\n"
" Test file to run.\n"
" -h, --help Display this help and exit.\n",
@ -200,6 +204,9 @@ bool processArg(TesterOptions& options, const CSimpleOpt& args) {
case OPT_TRANSACTION_RETRY_LIMIT:
processIntOption(args.OptionText(), args.OptionArg(), 0, 1000, options.transactionRetryLimit);
break;
case OPT_BLOB_GRANULE_LOCAL_FILE_PATH:
options.bgBasePath = args.OptionArg();
break;
}
return true;
}
@ -295,7 +302,7 @@ bool runWorkloads(TesterOptions& options) {
std::unique_ptr<IScheduler> scheduler = createScheduler(options.numClientThreads);
std::unique_ptr<ITransactionExecutor> txExecutor = createTransactionExecutor(txExecOptions);
txExecutor->init(scheduler.get(), options.clusterFile.c_str());
txExecutor->init(scheduler.get(), options.clusterFile.c_str(), options.bgBasePath);
WorkloadManager workloadMgr(txExecutor.get(), scheduler.get());
for (const auto& workloadSpec : options.testSpec.workloads) {

View File

@ -53,6 +53,9 @@ def run_tester(args, test_file):
args.cluster_file, "--test-file", test_file]
if args.external_client_library is not None:
cmd += ["--external-client-library", args.external_client_library]
if args.blob_granule_local_file_path is not None:
cmd += ["--blob-granule-local-file-path", args.blob_granule_local_file_path]
get_logger().info('\nRunning tester \'%s\'...' % ' '.join(cmd))
proc = Popen(cmd, stdout=sys.stdout, stderr=sys.stderr)
@ -79,11 +82,9 @@ def run_tester(args, test_file):
get_logger().info('')
return ret_code
def run_tests(args):
num_failed = 0
test_files = [f for f in os.listdir(args.test_dir)
if os.path.isfile(os.path.join(args.test_dir, f)) and f.endswith(".toml")]
test_files = [f for f in os.listdir(args.test_dir) if os.path.isfile(os.path.join(args.test_dir, f)) and f.endswith(".toml")]
for test_file in test_files:
get_logger().info('=========================================================')
@ -111,6 +112,8 @@ def parse_args(argv):
help='The timeout in seconds for running each individual test. (default 300)')
parser.add_argument('--logging-level', type=str, default='INFO',
choices=['ERROR', 'WARNING', 'INFO', 'DEBUG'], help='Specifies the level of detail in the tester output (default=\'INFO\').')
parser.add_argument('--blob-granule-local-file-path', type=str, default=None,
help='Enable blob granule tests if set, value is path to local blob granule files')
return parser.parse_args(argv)

View File

@ -405,6 +405,7 @@ endfunction()
# Creates a single cluster before running the specified command (usually a ctest test)
function(add_fdbclient_test)
set(options DISABLED ENABLED DISABLE_LOG_DUMP API_TEST_BLOB_GRANULES_ENABLED)
set(oneValueArgs NAME PROCESS_NUMBER TEST_TIMEOUT WORKING_DIRECTORY)
set(multiValueArgs COMMAND)
cmake_parse_arguments(T "${options}" "${oneValueArgs}" "${multiValueArgs}" "${ARGN}")
@ -431,6 +432,9 @@ function(add_fdbclient_test)
if(T_DISABLE_LOG_DUMP)
list(APPEND TMP_CLUSTER_CMD --disable-log-dump)
endif()
if(T_API_TEST_BLOB_GRANULES_ENABLED)
list(APPEND TMP_CLUSTER_CMD --blob-granules-enabled)
endif()
message(STATUS "Adding Client test ${T_NAME}")
add_test(NAME "${T_NAME}"
WORKING_DIRECTORY ${T_WORKING_DIRECTORY}

View File

@ -17,6 +17,8 @@ API version 720
General
-------
* Special keys ``\xff\xff/management/profiling/<client_txn_sample_rate|client_txn_size_limit>`` are removed in 7.2 and the functionalities they provide are now covered by the global configuration module.
.. _api-version-upgrade-guide-710:
API version 710

View File

@ -736,6 +736,7 @@
"ssd-2",
"ssd-redwood-1-experimental",
"ssd-rocksdb-v1",
"ssd-sharded-rocksdb",
"memory",
"memory-1",
"memory-2",
@ -749,6 +750,7 @@
"ssd-2",
"ssd-redwood-1-experimental",
"ssd-rocksdb-v1",
"ssd-sharded-rocksdb",
"memory",
"memory-1",
"memory-2",

View File

@ -191,7 +191,6 @@ that process, and wait for necessary data to be moved away.
#. ``\xff\xff/management/options/excluded/force`` Read/write. Setting this key disables safety checks for writes to ``\xff\xff/management/excluded/<exclusion>``. Setting this key only has an effect in the current transaction and is not persisted on commit.
#. ``\xff\xff/management/options/failed/force`` Read/write. Setting this key disables safety checks for writes to ``\xff\xff/management/failed/<exclusion>``. Setting this key only has an effect in the current transaction and is not persisted on commit.
#. ``\xff\xff/management/min_required_commit_version`` Read/write. Changing this key will change the corresponding system key ``\xff/minRequiredCommitVersion = [[Version]]``. The value of this special key is the literal text of the underlying ``Version``, which is ``int64_t``. If the value cannot be parsed as ``int64_t``, ``special_keys_api_failure`` will be thrown. In addition, the given ``Version`` should be larger than the current read version and smaller than the upper bound (``2**63-1-version_per_second*3600*24*365*1000``). Otherwise, ``special_keys_api_failure`` is thrown. For more details, see the help text of the ``fdbcli`` command ``advanceversion``.
#. ``\xff\xff/management/profiling/<client_txn_sample_rate|client_txn_size_limit>`` Read/write. Changing these two keys will change the corresponding system keys ``\xff\x02/fdbClientInfo/<client_txn_sample_rate|client_txn_size_limit>``, respectively. The value of ``\xff\xff/management/client_txn_sample_rate`` is a literal text of ``double``, and the value of ``\xff\xff/management/client_txn_size_limit`` is a literal text of ``int64_t``. A special value ``default`` can be set to or read from these two keys, representing that client profiling is disabled. In addition, ``clear`` in this range is not allowed. For more details, see the help text of the ``fdbcli`` command ``profile client``.
#. ``\xff\xff/management/maintenance/<zone_id> := <seconds>`` Read/write. Setting or clearing a key in this range will change the corresponding system key ``\xff\x02/healthyZone``. The value is a literal text of a non-negative ``double`` which represents the remaining time for the zone to be in maintenance. Committing with an invalid value will throw ``special_keys_api_failure``. Only one zone is allowed to be in maintenance at the same time. Setting a new key in the range will override the old one, and the transaction will throw a ``special_keys_api_failure`` error if more than one zone is given. For more details, see the help text of the ``fdbcli`` command ``maintenance``.
In addition, a special key ``\xff\xff/management/maintenance/IgnoreSSFailures`` in the range, if set, will disable data distribution for storage server failures.
This is equivalent to the fdbcli command ``datadistribution disable ssfailure``.
@ -264,6 +263,26 @@ clients can connect FoundationDB transactions to outside events.
#. ``\xff\xff/tracing/transaction_id := <transaction_id>`` Read/write. A 64-bit integer transaction ID which follows the transaction as it moves through FoundationDB. All transactions are assigned a random transaction ID on creation, and this key can be read to surface the randomly generated ID. Alternatively, set this key to provide a custom identifier. When setting this key, provide a string in the form of a 64-bit integer, which will be automatically converted to the appropriate type.
#. ``\xff\xff/tracing/token := <tracing_enabled>`` Read/write. Set to true/false to enable or disable tracing for the transaction, respectively. If read, returns a 64-bit integer set to 0 if tracing has been disabled, or a random 64-bit integer otherwise (this integer's value has no meaning to the client other than to determine whether the transaction will be traced).
.. _special-key-space-deprecation:
Deprecated Keys
===============
Listed below are the special keys that have been deprecated. Special key(s) will no longer be accessible when the client specifies an API version equal to or larger than the version where they were deprecated. Clients specifying older API versions will be able to continue using the deprecated key(s).
#. ``\xff\xff/management/profiling/<client_txn_sample_rate|client_txn_size_limit>`` Deprecated as of API version 7.2. The corresponding functionalities are now covered by the global configuration module. For details, see :doc:`global-configuration`. Read/write. Changing these two keys will change the corresponding system keys ``\xff\x02/fdbClientInfo/<client_txn_sample_rate|client_txn_size_limit>``, respectively. The value of ``\xff\xff/management/client_txn_sample_rate`` is a literal text of ``double``, and the value of ``\xff\xff/management/client_txn_size_limit`` is a literal text of ``int64_t``. A special value ``default`` can be set to or read from these two keys, representing that client profiling is disabled. In addition, ``clear`` in this range is not allowed. For more details, see the help text of the ``fdbcli`` command ``profile client``.
Versioning
==========
For how FDB clients deal with versioning, see :ref:`api-versions`. The special key space deals with versioning by using the ``API_VERSION`` passed to initialize the client. Any module added at a version larger than the API version set by the client will be inaccessible. For example, if a module is added in version 7.0 and the client sets its API version to 630, then the module will not be available. When removing or updating existing modules, module developers need to continue to provide the old behavior for clients that specify old API versions.
To remove the functionality of a special key (or keys), specify the API version at which it is deprecated in the ``registerSpecialKeysImpl`` call. When a client specifies an API version greater than or equal to the deprecation version, the functionality will not be available. Move and update its documentation to :ref:`special-key-space-deprecation`.
To update the implementation of any special keys, add the new implementation and use ``API_VERSION`` to switch between different implementations.
Add notes in ``api-version-upgrade-guide.rst`` whenever you remove or update a special key implementation.
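For illustration, a minimal sketch of a deprecation registration, mirroring how this commit deprecates the ``profiling`` module at API version 720 in ``NativeAPI.actor.cpp``::

    registerSpecialKeysImpl(
        SpecialKeySpace::MODULE::MANAGEMENT,
        SpecialKeySpace::IMPLTYPE::READWRITE,
        std::make_unique<ClientProfilingImpl>(
            KeyRangeRef(LiteralStringRef("profiling/"), LiteralStringRef("profiling0"))
                .withPrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::MANAGEMENT).begin)),
        /* deprecated */ 720); // clients at API version >= 720 will not see these keys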
.. [#conflicting_keys] In practice, the transaction probably committed successfully. However, if you're running multiple resolvers then it's possible for a transaction to cause another to abort even if it doesn't commit successfully.
.. [#max_read_transaction_life_versions] The number 5000000 comes from the server knob MAX_READ_TRANSACTION_LIFE_VERSIONS
.. [#special_key_space_enable_writes] Enabling this option enables other transaction options, such as ``ACCESS_SYSTEM_KEYS``. This may change in the future.

View File

@ -195,6 +195,12 @@ ACTOR Future<bool> configureCommandActor(Reference<IDatabase> db,
fprintf(stderr,
"WARN: RocksDB storage engine type is still in experimental stage, not yet production tested.\n");
break;
case ConfigurationResult::DATABASE_CREATED_WARN_SHARDED_ROCKSDB_EXPERIMENTAL:
printf("Database created\n");
fprintf(
stderr,
"WARN: Sharded RocksDB storage engine type is still in experimental stage, not yet production tested.\n");
break;
case ConfigurationResult::DATABASE_UNAVAILABLE:
fprintf(stderr, "ERROR: The database is unavailable\n");
fprintf(stderr, "Type `configure FORCE <TOKEN...>' to configure without this check\n");
@ -260,6 +266,12 @@ ACTOR Future<bool> configureCommandActor(Reference<IDatabase> db,
fprintf(stderr,
"WARN: RocksDB storage engine type is still in experimental stage, not yet production tested.\n");
break;
case ConfigurationResult::SUCCESS_WARN_SHARDED_ROCKSDB_EXPERIMENTAL:
printf("Configuration changed\n");
fprintf(
stderr,
"WARN: Sharded RocksDB storage engine type is still in experimental stage, not yet production tested.\n");
break;
default:
ASSERT(false);
ret = false;

View File

@ -303,6 +303,9 @@ StatusObject DatabaseConfiguration::toJSON(bool noPolicies) const {
} else if (tLogDataStoreType == KeyValueStoreType::SSD_BTREE_V2 &&
storageServerStoreType == KeyValueStoreType::SSD_ROCKSDB_V1) {
result["storage_engine"] = "ssd-rocksdb-v1";
} else if (tLogDataStoreType == KeyValueStoreType::SSD_BTREE_V2 &&
storageServerStoreType == KeyValueStoreType::SSD_SHARDED_ROCKSDB) {
result["storage_engine"] = "ssd-sharded-rocksdb";
} else if (tLogDataStoreType == KeyValueStoreType::MEMORY && storageServerStoreType == KeyValueStoreType::MEMORY) {
result["storage_engine"] = "memory-1";
} else if (tLogDataStoreType == KeyValueStoreType::SSD_BTREE_V2 &&
@ -325,6 +328,8 @@ StatusObject DatabaseConfiguration::toJSON(bool noPolicies) const {
result["tss_storage_engine"] = "ssd-redwood-1-experimental";
} else if (testingStorageServerStoreType == KeyValueStoreType::SSD_ROCKSDB_V1) {
result["tss_storage_engine"] = "ssd-rocksdb-v1";
} else if (testingStorageServerStoreType == KeyValueStoreType::SSD_SHARDED_ROCKSDB) {
result["tss_storage_engine"] = "ssd-sharded-rocksdb";
} else if (testingStorageServerStoreType == KeyValueStoreType::MEMORY_RADIXTREE) {
result["tss_storage_engine"] = "memory-radixtree-beta";
} else if (testingStorageServerStoreType == KeyValueStoreType::MEMORY) {

View File

@ -594,9 +594,10 @@ public:
AsyncTrigger updateCache;
std::vector<std::unique_ptr<SpecialKeyRangeReadImpl>> specialKeySpaceModules;
std::unique_ptr<SpecialKeySpace> specialKeySpace;
void registerSpecialKeySpaceModule(SpecialKeySpace::MODULE module,
SpecialKeySpace::IMPLTYPE type,
std::unique_ptr<SpecialKeyRangeReadImpl>&& impl);
void registerSpecialKeysImpl(SpecialKeySpace::MODULE module,
SpecialKeySpace::IMPLTYPE type,
std::unique_ptr<SpecialKeyRangeReadImpl>&& impl,
int deprecatedVersion = -1);
static bool debugUseTags;
static const std::vector<std::string> debugTransactionTagChoices;

View File

@ -825,7 +825,16 @@ struct KeyValueStoreType {
// These enumerated values are stored in the database configuration, so should NEVER be changed.
// Only add new ones just before END.
// SS storeType is END before the storageServerInterface is initialized.
enum StoreType { SSD_BTREE_V1, MEMORY, SSD_BTREE_V2, SSD_REDWOOD_V1, MEMORY_RADIXTREE, SSD_ROCKSDB_V1, END };
enum StoreType {
SSD_BTREE_V1,
MEMORY,
SSD_BTREE_V2,
SSD_REDWOOD_V1,
MEMORY_RADIXTREE,
SSD_ROCKSDB_V1,
SSD_SHARDED_ROCKSDB,
END
};
KeyValueStoreType() : type(END) {}
KeyValueStoreType(StoreType type) : type(type) {
@ -850,6 +859,8 @@ struct KeyValueStoreType {
return "ssd-redwood-1-experimental";
case SSD_ROCKSDB_V1:
return "ssd-rocksdb-v1";
case SSD_SHARDED_ROCKSDB:
return "ssd-sharded-rocksdb";
case MEMORY:
return "memory";
case MEMORY_RADIXTREE:

View File

@ -66,7 +66,9 @@ enum class ConfigurationResult {
SUCCESS_WARN_PPW_GRADUAL,
SUCCESS,
SUCCESS_WARN_ROCKSDB_EXPERIMENTAL,
SUCCESS_WARN_SHARDED_ROCKSDB_EXPERIMENTAL,
DATABASE_CREATED_WARN_ROCKSDB_EXPERIMENTAL,
DATABASE_CREATED_WARN_SHARDED_ROCKSDB_EXPERIMENTAL,
};
enum class CoordinatorsResult {
@ -293,6 +295,7 @@ Future<ConfigurationResult> changeConfig(Reference<DB> db, std::map<std::string,
state bool warnPPWGradual = false;
state bool warnChangeStorageNoMigrate = false;
state bool warnRocksDBIsExperimental = false;
state bool warnShardedRocksDBIsExperimental = false;
loop {
try {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
@ -483,6 +486,9 @@ Future<ConfigurationResult> changeConfig(Reference<DB> db, std::map<std::string,
} else if (newConfig.storageServerStoreType != oldConfig.storageServerStoreType &&
newConfig.storageServerStoreType == KeyValueStoreType::SSD_ROCKSDB_V1) {
warnRocksDBIsExperimental = true;
} else if (newConfig.storageServerStoreType != oldConfig.storageServerStoreType &&
newConfig.storageServerStoreType == KeyValueStoreType::SSD_SHARDED_ROCKSDB) {
warnShardedRocksDBIsExperimental = true;
}
}
}
@ -534,6 +540,9 @@ Future<ConfigurationResult> changeConfig(Reference<DB> db, std::map<std::string,
else if (m[configKeysPrefix.toString() + "storage_engine"] ==
std::to_string(KeyValueStoreType::SSD_ROCKSDB_V1))
return ConfigurationResult::DATABASE_CREATED_WARN_ROCKSDB_EXPERIMENTAL;
else if (m[configKeysPrefix.toString() + "storage_engine"] ==
std::to_string(KeyValueStoreType::SSD_SHARDED_ROCKSDB))
return ConfigurationResult::DATABASE_CREATED_WARN_SHARDED_ROCKSDB_EXPERIMENTAL;
else
return ConfigurationResult::DATABASE_CREATED;
} catch (Error& e2) {
@ -549,6 +558,8 @@ Future<ConfigurationResult> changeConfig(Reference<DB> db, std::map<std::string,
return ConfigurationResult::SUCCESS_WARN_PPW_GRADUAL;
} else if (warnRocksDBIsExperimental) {
return ConfigurationResult::SUCCESS_WARN_ROCKSDB_EXPERIMENTAL;
} else if (warnShardedRocksDBIsExperimental) {
return ConfigurationResult::SUCCESS_WARN_SHARDED_ROCKSDB_EXPERIMENTAL;
} else {
return ConfigurationResult::SUCCESS;
}

View File

@ -217,6 +217,9 @@ std::map<std::string, std::string> configForToken(std::string const& mode) {
} else if (mode == "ssd-rocksdb-v1") {
logType = KeyValueStoreType::SSD_BTREE_V2;
storeType = KeyValueStoreType::SSD_ROCKSDB_V1;
} else if (mode == "ssd-sharded-rocksdb") {
logType = KeyValueStoreType::SSD_BTREE_V2;
storeType = KeyValueStoreType::SSD_SHARDED_ROCKSDB;
} else if (mode == "memory" || mode == "memory-2") {
logType = KeyValueStoreType::SSD_BTREE_V2;
storeType = KeyValueStoreType::MEMORY;

View File

@ -300,7 +300,9 @@ ThreadResult<RangeResult> DLTransaction::readBlobGranules(const KeyRangeRef& key
int count;
FdbCApi::fdb_bool_t more;
FdbCApi::fdb_error_t error = api->resultGetKeyValueArray(r, &kvs, &count, &more);
ASSERT(!error);
if (error) {
return ThreadResult<RangeResult>(Error(error));
}
// The memory for this is stored in the FDBResult and is released when the result gets destroyed
return ThreadResult<RangeResult>(

View File

@ -1231,11 +1231,16 @@ Future<HealthMetrics> DatabaseContext::getHealthMetrics(bool detailed = false) {
return getHealthMetricsActor(this, detailed);
}
void DatabaseContext::registerSpecialKeySpaceModule(SpecialKeySpace::MODULE module,
SpecialKeySpace::IMPLTYPE type,
std::unique_ptr<SpecialKeyRangeReadImpl>&& impl) {
specialKeySpace->registerKeyRange(module, type, impl->getKeyRange(), impl.get());
specialKeySpaceModules.push_back(std::move(impl));
// register a special key(s) implementation under the specified module
void DatabaseContext::registerSpecialKeysImpl(SpecialKeySpace::MODULE module,
SpecialKeySpace::IMPLTYPE type,
std::unique_ptr<SpecialKeyRangeReadImpl>&& impl,
int deprecatedVersion) {
// if deprecated, add the implementation when the api version is less than the deprecated version
if (deprecatedVersion == -1 || apiVersion < deprecatedVersion) {
specialKeySpace->registerKeyRange(module, type, impl->getKeyRange(), impl.get());
specialKeySpaceModules.push_back(std::move(impl));
}
}
ACTOR Future<RangeResult> getWorkerInterfaces(Reference<IClusterConnectionRecord> clusterRecord);
@ -1475,188 +1480,188 @@ DatabaseContext::DatabaseContext(Reference<AsyncVar<Reference<IClusterConnection
smoothMidShardSize.reset(CLIENT_KNOBS->INIT_MID_SHARD_BYTES);
if (apiVersionAtLeast(710)) {
registerSpecialKeySpaceModule(
registerSpecialKeysImpl(
SpecialKeySpace::MODULE::MANAGEMENT,
SpecialKeySpace::IMPLTYPE::READWRITE,
std::make_unique<TenantMapRangeImpl>(SpecialKeySpace::getManagementApiCommandRange("tenantmap")));
}
if (apiVersionAtLeast(700)) {
registerSpecialKeySpaceModule(SpecialKeySpace::MODULE::ERRORMSG,
SpecialKeySpace::IMPLTYPE::READONLY,
std::make_unique<SingleSpecialKeyImpl>(
SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::ERRORMSG).begin,
[](ReadYourWritesTransaction* ryw) -> Future<Optional<Value>> {
if (ryw->getSpecialKeySpaceErrorMsg().present())
return Optional<Value>(ryw->getSpecialKeySpaceErrorMsg().get());
else
return Optional<Value>();
}));
registerSpecialKeySpaceModule(
registerSpecialKeysImpl(SpecialKeySpace::MODULE::ERRORMSG,
SpecialKeySpace::IMPLTYPE::READONLY,
std::make_unique<SingleSpecialKeyImpl>(
SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::ERRORMSG).begin,
[](ReadYourWritesTransaction* ryw) -> Future<Optional<Value>> {
if (ryw->getSpecialKeySpaceErrorMsg().present())
return Optional<Value>(ryw->getSpecialKeySpaceErrorMsg().get());
else
return Optional<Value>();
}));
registerSpecialKeysImpl(
SpecialKeySpace::MODULE::MANAGEMENT,
SpecialKeySpace::IMPLTYPE::READWRITE,
std::make_unique<ManagementCommandsOptionsImpl>(
KeyRangeRef(LiteralStringRef("options/"), LiteralStringRef("options0"))
.withPrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::MANAGEMENT).begin)));
registerSpecialKeySpaceModule(
registerSpecialKeysImpl(
SpecialKeySpace::MODULE::MANAGEMENT,
SpecialKeySpace::IMPLTYPE::READWRITE,
std::make_unique<ExcludeServersRangeImpl>(SpecialKeySpace::getManagementApiCommandRange("exclude")));
registerSpecialKeySpaceModule(
registerSpecialKeysImpl(
SpecialKeySpace::MODULE::MANAGEMENT,
SpecialKeySpace::IMPLTYPE::READWRITE,
std::make_unique<FailedServersRangeImpl>(SpecialKeySpace::getManagementApiCommandRange("failed")));
registerSpecialKeySpaceModule(SpecialKeySpace::MODULE::MANAGEMENT,
SpecialKeySpace::IMPLTYPE::READWRITE,
std::make_unique<ExcludedLocalitiesRangeImpl>(
SpecialKeySpace::getManagementApiCommandRange("excludedlocality")));
registerSpecialKeySpaceModule(SpecialKeySpace::MODULE::MANAGEMENT,
SpecialKeySpace::IMPLTYPE::READWRITE,
std::make_unique<FailedLocalitiesRangeImpl>(
SpecialKeySpace::getManagementApiCommandRange("failedlocality")));
registerSpecialKeySpaceModule(
registerSpecialKeysImpl(SpecialKeySpace::MODULE::MANAGEMENT,
SpecialKeySpace::IMPLTYPE::READWRITE,
std::make_unique<ExcludedLocalitiesRangeImpl>(
SpecialKeySpace::getManagementApiCommandRange("excludedlocality")));
registerSpecialKeysImpl(SpecialKeySpace::MODULE::MANAGEMENT,
SpecialKeySpace::IMPLTYPE::READWRITE,
std::make_unique<FailedLocalitiesRangeImpl>(
SpecialKeySpace::getManagementApiCommandRange("failedlocality")));
registerSpecialKeysImpl(
SpecialKeySpace::MODULE::MANAGEMENT,
SpecialKeySpace::IMPLTYPE::READONLY,
std::make_unique<ExclusionInProgressRangeImpl>(
KeyRangeRef(LiteralStringRef("in_progress_exclusion/"), LiteralStringRef("in_progress_exclusion0"))
.withPrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::MANAGEMENT).begin)));
registerSpecialKeySpaceModule(
registerSpecialKeysImpl(
SpecialKeySpace::MODULE::CONFIGURATION,
SpecialKeySpace::IMPLTYPE::READWRITE,
std::make_unique<ProcessClassRangeImpl>(
KeyRangeRef(LiteralStringRef("process/class_type/"), LiteralStringRef("process/class_type0"))
.withPrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::CONFIGURATION).begin)));
registerSpecialKeySpaceModule(
registerSpecialKeysImpl(
SpecialKeySpace::MODULE::CONFIGURATION,
SpecialKeySpace::IMPLTYPE::READONLY,
std::make_unique<ProcessClassSourceRangeImpl>(
KeyRangeRef(LiteralStringRef("process/class_source/"), LiteralStringRef("process/class_source0"))
.withPrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::CONFIGURATION).begin)));
registerSpecialKeySpaceModule(
registerSpecialKeysImpl(
SpecialKeySpace::MODULE::MANAGEMENT,
SpecialKeySpace::IMPLTYPE::READWRITE,
std::make_unique<LockDatabaseImpl>(
singleKeyRange(LiteralStringRef("db_locked"))
.withPrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::MANAGEMENT).begin)));
registerSpecialKeySpaceModule(
registerSpecialKeysImpl(
SpecialKeySpace::MODULE::MANAGEMENT,
SpecialKeySpace::IMPLTYPE::READWRITE,
std::make_unique<ConsistencyCheckImpl>(
singleKeyRange(LiteralStringRef("consistency_check_suspended"))
.withPrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::MANAGEMENT).begin)));
registerSpecialKeySpaceModule(
registerSpecialKeysImpl(
SpecialKeySpace::MODULE::GLOBALCONFIG,
SpecialKeySpace::IMPLTYPE::READWRITE,
std::make_unique<GlobalConfigImpl>(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::GLOBALCONFIG)));
registerSpecialKeySpaceModule(
registerSpecialKeysImpl(
SpecialKeySpace::MODULE::TRACING,
SpecialKeySpace::IMPLTYPE::READWRITE,
std::make_unique<TracingOptionsImpl>(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::TRACING)));
registerSpecialKeySpaceModule(
registerSpecialKeysImpl(
SpecialKeySpace::MODULE::CONFIGURATION,
SpecialKeySpace::IMPLTYPE::READWRITE,
std::make_unique<CoordinatorsImpl>(
KeyRangeRef(LiteralStringRef("coordinators/"), LiteralStringRef("coordinators0"))
.withPrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::CONFIGURATION).begin)));
registerSpecialKeySpaceModule(
registerSpecialKeysImpl(
SpecialKeySpace::MODULE::MANAGEMENT,
SpecialKeySpace::IMPLTYPE::READONLY,
std::make_unique<CoordinatorsAutoImpl>(
singleKeyRange(LiteralStringRef("auto_coordinators"))
.withPrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::MANAGEMENT).begin)));
registerSpecialKeySpaceModule(
registerSpecialKeysImpl(
SpecialKeySpace::MODULE::MANAGEMENT,
SpecialKeySpace::IMPLTYPE::READWRITE,
std::make_unique<AdvanceVersionImpl>(
singleKeyRange(LiteralStringRef("min_required_commit_version"))
.withPrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::MANAGEMENT).begin)));
registerSpecialKeySpaceModule(
registerSpecialKeysImpl(
SpecialKeySpace::MODULE::MANAGEMENT,
SpecialKeySpace::IMPLTYPE::READWRITE,
std::make_unique<VersionEpochImpl>(
singleKeyRange(LiteralStringRef("version_epoch"))
.withPrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::MANAGEMENT).begin)));
registerSpecialKeySpaceModule(
registerSpecialKeysImpl(
SpecialKeySpace::MODULE::MANAGEMENT,
SpecialKeySpace::IMPLTYPE::READWRITE,
std::make_unique<ClientProfilingImpl>(
KeyRangeRef(LiteralStringRef("profiling/"), LiteralStringRef("profiling0"))
.withPrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::MANAGEMENT).begin)));
registerSpecialKeySpaceModule(
.withPrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::MANAGEMENT).begin)),
/* deprecated */ 720);
registerSpecialKeysImpl(
SpecialKeySpace::MODULE::MANAGEMENT,
SpecialKeySpace::IMPLTYPE::READWRITE,
std::make_unique<MaintenanceImpl>(
KeyRangeRef(LiteralStringRef("maintenance/"), LiteralStringRef("maintenance0"))
.withPrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::MANAGEMENT).begin)));
registerSpecialKeySpaceModule(
registerSpecialKeysImpl(
SpecialKeySpace::MODULE::MANAGEMENT,
SpecialKeySpace::IMPLTYPE::READWRITE,
std::make_unique<DataDistributionImpl>(
KeyRangeRef(LiteralStringRef("data_distribution/"), LiteralStringRef("data_distribution0"))
.withPrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::MANAGEMENT).begin)));
registerSpecialKeySpaceModule(
registerSpecialKeysImpl(
SpecialKeySpace::MODULE::ACTORLINEAGE,
SpecialKeySpace::IMPLTYPE::READONLY,
std::make_unique<ActorLineageImpl>(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::ACTORLINEAGE)));
registerSpecialKeySpaceModule(SpecialKeySpace::MODULE::ACTOR_PROFILER_CONF,
SpecialKeySpace::IMPLTYPE::READWRITE,
std::make_unique<ActorProfilerConf>(SpecialKeySpace::getModuleRange(
SpecialKeySpace::MODULE::ACTOR_PROFILER_CONF)));
registerSpecialKeysImpl(SpecialKeySpace::MODULE::ACTOR_PROFILER_CONF,
SpecialKeySpace::IMPLTYPE::READWRITE,
std::make_unique<ActorProfilerConf>(
SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::ACTOR_PROFILER_CONF)));
}
if (apiVersionAtLeast(630)) {
registerSpecialKeySpaceModule(SpecialKeySpace::MODULE::TRANSACTION,
SpecialKeySpace::IMPLTYPE::READONLY,
std::make_unique<ConflictingKeysImpl>(conflictingKeysRange));
registerSpecialKeySpaceModule(SpecialKeySpace::MODULE::TRANSACTION,
SpecialKeySpace::IMPLTYPE::READONLY,
std::make_unique<ReadConflictRangeImpl>(readConflictRangeKeysRange));
registerSpecialKeySpaceModule(SpecialKeySpace::MODULE::TRANSACTION,
SpecialKeySpace::IMPLTYPE::READONLY,
std::make_unique<WriteConflictRangeImpl>(writeConflictRangeKeysRange));
registerSpecialKeySpaceModule(SpecialKeySpace::MODULE::METRICS,
SpecialKeySpace::IMPLTYPE::READONLY,
std::make_unique<DDStatsRangeImpl>(ddStatsRange));
registerSpecialKeySpaceModule(
registerSpecialKeysImpl(SpecialKeySpace::MODULE::TRANSACTION,
SpecialKeySpace::IMPLTYPE::READONLY,
std::make_unique<ConflictingKeysImpl>(conflictingKeysRange));
registerSpecialKeysImpl(SpecialKeySpace::MODULE::TRANSACTION,
SpecialKeySpace::IMPLTYPE::READONLY,
std::make_unique<ReadConflictRangeImpl>(readConflictRangeKeysRange));
registerSpecialKeysImpl(SpecialKeySpace::MODULE::TRANSACTION,
SpecialKeySpace::IMPLTYPE::READONLY,
std::make_unique<WriteConflictRangeImpl>(writeConflictRangeKeysRange));
registerSpecialKeysImpl(SpecialKeySpace::MODULE::METRICS,
SpecialKeySpace::IMPLTYPE::READONLY,
std::make_unique<DDStatsRangeImpl>(ddStatsRange));
registerSpecialKeysImpl(
SpecialKeySpace::MODULE::METRICS,
SpecialKeySpace::IMPLTYPE::READONLY,
std::make_unique<HealthMetricsRangeImpl>(KeyRangeRef(LiteralStringRef("\xff\xff/metrics/health/"),
LiteralStringRef("\xff\xff/metrics/health0"))));
registerSpecialKeySpaceModule(
registerSpecialKeysImpl(
SpecialKeySpace::MODULE::WORKERINTERFACE,
SpecialKeySpace::IMPLTYPE::READONLY,
std::make_unique<WorkerInterfacesSpecialKeyImpl>(KeyRangeRef(
LiteralStringRef("\xff\xff/worker_interfaces/"), LiteralStringRef("\xff\xff/worker_interfaces0"))));
registerSpecialKeySpaceModule(
SpecialKeySpace::MODULE::STATUSJSON,
SpecialKeySpace::IMPLTYPE::READONLY,
std::make_unique<SingleSpecialKeyImpl>(LiteralStringRef("\xff\xff/status/json"),
[](ReadYourWritesTransaction* ryw) -> Future<Optional<Value>> {
if (ryw->getDatabase().getPtr() &&
ryw->getDatabase()->getConnectionRecord()) {
++ryw->getDatabase()->transactionStatusRequests;
return getJSON(ryw->getDatabase());
} else {
return Optional<Value>();
}
}));
registerSpecialKeySpaceModule(
SpecialKeySpace::MODULE::CLUSTERFILEPATH,
SpecialKeySpace::IMPLTYPE::READONLY,
std::make_unique<SingleSpecialKeyImpl>(
LiteralStringRef("\xff\xff/cluster_file_path"),
[](ReadYourWritesTransaction* ryw) -> Future<Optional<Value>> {
try {
if (ryw->getDatabase().getPtr() && ryw->getDatabase()->getConnectionRecord()) {
Optional<Value> output =
StringRef(ryw->getDatabase()->getConnectionRecord()->getLocation());
return output;
}
} catch (Error& e) {
return e;
}
return Optional<Value>();
}));
registerSpecialKeysImpl(SpecialKeySpace::MODULE::STATUSJSON,
SpecialKeySpace::IMPLTYPE::READONLY,
std::make_unique<SingleSpecialKeyImpl>(
LiteralStringRef("\xff\xff/status/json"),
[](ReadYourWritesTransaction* ryw) -> Future<Optional<Value>> {
if (ryw->getDatabase().getPtr() && ryw->getDatabase()->getConnectionRecord()) {
++ryw->getDatabase()->transactionStatusRequests;
return getJSON(ryw->getDatabase());
} else {
return Optional<Value>();
}
}));
registerSpecialKeysImpl(SpecialKeySpace::MODULE::CLUSTERFILEPATH,
SpecialKeySpace::IMPLTYPE::READONLY,
std::make_unique<SingleSpecialKeyImpl>(
LiteralStringRef("\xff\xff/cluster_file_path"),
[](ReadYourWritesTransaction* ryw) -> Future<Optional<Value>> {
try {
if (ryw->getDatabase().getPtr() &&
ryw->getDatabase()->getConnectionRecord()) {
Optional<Value> output =
StringRef(ryw->getDatabase()->getConnectionRecord()->getLocation());
return output;
}
} catch (Error& e) {
return e;
}
return Optional<Value>();
}));
registerSpecialKeySpaceModule(
registerSpecialKeysImpl(
SpecialKeySpace::MODULE::CONNECTIONSTRING,
SpecialKeySpace::IMPLTYPE::READONLY,
std::make_unique<SingleSpecialKeyImpl>(
@ -9392,4 +9397,4 @@ ACTOR Future<Void> waitPurgeGranulesCompleteActor(Reference<DatabaseContext> db,
Future<Void> DatabaseContext::waitPurgeGranulesComplete(Key purgeKey) {
return waitPurgeGranulesCompleteActor(Reference<DatabaseContext>::addRef(this), purgeKey);
}
}

View File

@ -769,6 +769,7 @@ const KeyRef JSONSchemas::statusSchema = LiteralStringRef(R"statusSchema(
"ssd-2",
"ssd-redwood-1-experimental",
"ssd-rocksdb-v1",
"ssd-sharded-rocksdb",
"memory",
"memory-1",
"memory-2",
@ -782,6 +783,7 @@ const KeyRef JSONSchemas::statusSchema = LiteralStringRef(R"statusSchema(
"ssd-2",
"ssd-redwood-1-experimental",
"ssd-rocksdb-v1",
"ssd-sharded-rocksdb",
"memory",
"memory-1",
"memory-2",

View File

@ -381,7 +381,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( DEFAULT_FDB_ROCKSDB_COLUMN_FAMILY, "fdb");
init( ROCKSDB_PERFCONTEXT_ENABLE, false ); if( randomize && BUGGIFY ) ROCKSDB_PERFCONTEXT_ENABLE = deterministicRandom()->coinflip() ? false : true;
init( ROCKSDB_PERFCONTEXT_SAMPLE_RATE, 0.0001 );
init( ROCKSDB_MAX_SUBCOMPACTIONS, 2 );
init( ROCKSDB_SOFT_PENDING_COMPACT_BYTES_LIMIT, 64000000000 ); // 64GB, Rocksdb option, Writes will slow down.
init( ROCKSDB_HARD_PENDING_COMPACT_BYTES_LIMIT, 100000000000 ); // 100GB, Rocksdb option, Writes will stall.
@ -393,6 +393,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( ROCKSDB_CAN_COMMIT_DELAY_TIMES_ON_OVERLOAD, 5 );
init( ROCKSDB_COMPACTION_READAHEAD_SIZE, 32768 ); // 32 KB, performs bigger reads when doing compaction.
init( ROCKSDB_BLOCK_SIZE, 32768 ); // 32 KB, size of the block in rocksdb cache.
init( ENABLE_SHARDED_ROCKSDB, false );
// Leader election
bool longLeaderElection = randomize && BUGGIFY;

View File

@ -322,6 +322,7 @@ public:
int ROCKSDB_CAN_COMMIT_DELAY_TIMES_ON_OVERLOAD;
int64_t ROCKSDB_COMPACTION_READAHEAD_SIZE;
int64_t ROCKSDB_BLOCK_SIZE;
bool ENABLE_SHARDED_ROCKSDB;
// Leader election
int MAX_NOTIFICATIONS;

View File

@ -548,6 +548,8 @@ void SpecialKeySpace::registerKeyRange(SpecialKeySpace::MODULE module,
SpecialKeySpace::IMPLTYPE type,
const KeyRangeRef& kr,
SpecialKeyRangeReadImpl* impl) {
// Not allowed to register an empty range
ASSERT(!kr.empty());
// module boundary check
if (module == SpecialKeySpace::MODULE::TESTONLY) {
ASSERT(normalKeys.contains(kr));
@ -1999,7 +2001,6 @@ ACTOR static Future<RangeResult> ClientProfilingGetRangeActor(ReadYourWritesTran
return result;
}
// TODO : add limitation on set operation
Future<RangeResult> ClientProfilingImpl::getRange(ReadYourWritesTransaction* ryw,
KeyRangeRef kr,
GetRangeLimits limitsHint) const {

View File

@ -485,6 +485,7 @@ public:
Future<Optional<std::string>> commit(ReadYourWritesTransaction* ryw) override;
};
// Deprecated as of 7.2
class ClientProfilingImpl : public SpecialKeyRangeRWImpl {
public:
explicit ClientProfilingImpl(KeyRangeRef kr);

View File

@ -37,6 +37,7 @@
#include "flow/network.h"
#include <boost/mpl/not.hpp>
#include <utility>
#include "flow/actorcompiler.h" // This must be the last #include.
@ -155,25 +156,26 @@ ACTOR Future<Void> getCipherKeysByBaseCipherKeyIds(Reference<EncryptKeyProxyData
// Scan the cached cipher-keys and filter our baseCipherIds locally cached
// for the rest, reachout to KMS to fetch the required details
std::vector<EncryptBaseCipherId> lookupCipherIds;
state std::unordered_map<EncryptBaseCipherId, Standalone<StringRef>> cachedKeys;
for (EncryptBaseCipherId id : req.baseCipherIds) {
const auto itr = ekpProxyData->baseCipherKeyIdCache.find(id);
if (itr != ekpProxyData->baseCipherKeyIdCache.end()) {
ASSERT(itr->second.isValid());
cachedKeys.emplace(id, itr->second.baseCipherKey);
} else {
lookupCipherIds.push_back(id);
}
}
ekpProxyData->baseCipherKeyIdCacheHits += cachedKeys.size();
ekpProxyData->baseCipherKeyIdCacheMisses += lookupCipherIds.size();
std::vector<std::pair<EncryptBaseCipherId, EncryptDomainId>> lookupCipherIds;
state std::vector<EKPBaseCipherDetails> cachedCipherDetails;
state EKPGetBaseCipherKeysByIdsRequest keysByIds = req;
state EKPGetBaseCipherKeysByIdsReply keyIdsReply;
for (const auto& item : req.baseCipherIds) {
const auto itr = ekpProxyData->baseCipherKeyIdCache.find(item.first);
if (itr != ekpProxyData->baseCipherKeyIdCache.end()) {
ASSERT(itr->second.isValid());
cachedCipherDetails.emplace_back(
itr->second.domainId, itr->second.baseCipherId, itr->second.baseCipherKey, keyIdsReply.arena);
} else {
lookupCipherIds.emplace_back(std::make_pair(item.first, item.second));
}
}
ekpProxyData->baseCipherKeyIdCacheHits += cachedCipherDetails.size();
ekpProxyData->baseCipherKeyIdCacheMisses += lookupCipherIds.size();
if (g_network->isSimulated()) {
if (!lookupCipherIds.empty()) {
try {
@ -181,16 +183,17 @@ ACTOR Future<Void> getCipherKeysByBaseCipherKeyIds(Reference<EncryptKeyProxyData
SimGetEncryptKeysByKeyIdsReply simKeyIdsReply =
wait(simKmsInterface.encryptKeyLookupByKeyIds.getReply(simKeyIdsReq));
for (const auto& item : simKeyIdsReply.encryptKeyMap) {
keyIdsReply.baseCipherMap.emplace(item.first, StringRef(keyIdsReply.arena, item.second));
for (const auto& item : simKeyIdsReply.encryptKeyDetails) {
keyIdsReply.baseCipherDetails.emplace_back(
item.encryptDomainId, item.encryptKeyId, item.encryptKey, keyIdsReply.arena);
}
// Record the fetched cipher details in the local cache for future reference
// Note: cache warm-up is done after responding to the caller
for (auto& item : simKeyIdsReply.encryptKeyMap) {
for (auto& item : simKeyIdsReply.encryptKeyDetails) {
// DomainId isn't available here, the caller must know the encryption domainId
ekpProxyData->insertIntoBaseCipherIdCache(0, item.first, item.second);
ekpProxyData->insertIntoBaseCipherIdCache(item.encryptDomainId, item.encryptKeyId, item.encryptKey);
}
} catch (Error& e) {
if (!canReplyWith(e)) {
@ -207,11 +210,11 @@ ACTOR Future<Void> getCipherKeysByBaseCipherKeyIds(Reference<EncryptKeyProxyData
throw not_implemented();
}
for (auto& item : cachedKeys) {
keyIdsReply.baseCipherMap.emplace(item.first, item.second);
}
// Append cached cipherKeyDetails to the result-set
keyIdsReply.baseCipherDetails.insert(
keyIdsReply.baseCipherDetails.end(), cachedCipherDetails.begin(), cachedCipherDetails.end());
keyIdsReply.numHits = cachedKeys.size();
keyIdsReply.numHits = cachedCipherDetails.size();
keysByIds.reply.send(keyIdsReply);
return Void();
@ -223,7 +226,7 @@ ACTOR Future<Void> getLatestCipherKeys(Reference<EncryptKeyProxyData> ekpProxyDa
// Scan the cached cipher-keys and filter out the baseCipherIds that are locally cached;
// for the rest, reach out to the KMS to fetch the required details
state std::unordered_map<EncryptBaseCipherId, EKPBaseCipherDetails> cachedKeys;
state std::vector<EKPBaseCipherDetails> cachedCipherDetails;
state EKPGetLatestBaseCipherKeysRequest latestKeysReq = req;
state EKPGetLatestBaseCipherKeysReply latestCipherReply;
state Arena& arena = latestCipherReply.arena;
@ -231,17 +234,17 @@ ACTOR Future<Void> getLatestCipherKeys(Reference<EncryptKeyProxyData> ekpProxyDa
// First, check if the requested information is already cached by the server.
// Ensure the cached information is within FLOW_KNOBS->ENCRYPT_CIPHER_KEY_CACHE_TTL time window.
std::vector<EncryptBaseCipherId> lookupCipherDomains;
std::vector<EncryptDomainId> lookupCipherDomains;
for (EncryptDomainId id : req.encryptDomainIds) {
const auto itr = ekpProxyData->baseCipherDomainIdCache.find(id);
if (itr != ekpProxyData->baseCipherDomainIdCache.end() && itr->second.isValid()) {
cachedKeys.emplace(id, EKPBaseCipherDetails(itr->second.baseCipherId, itr->second.baseCipherKey, arena));
cachedCipherDetails.emplace_back(id, itr->second.baseCipherId, itr->second.baseCipherKey, arena);
} else {
lookupCipherDomains.push_back(id);
}
}
ekpProxyData->baseCipherDomainIdCacheHits += cachedKeys.size();
ekpProxyData->baseCipherDomainIdCacheHits += cachedCipherDetails.size();
ekpProxyData->baseCipherDomainIdCacheMisses += lookupCipherDomains.size();
if (g_network->isSimulated()) {
@ -251,13 +254,12 @@ ACTOR Future<Void> getLatestCipherKeys(Reference<EncryptKeyProxyData> ekpProxyDa
SimGetEncryptKeyByDomainIdReply simKeysByDomainIdRep =
wait(simKmsInterface.encryptKeyLookupByDomainId.getReply(simKeysByDomainIdReq));
for (auto& item : simKeysByDomainIdRep.encryptKeyMap) {
latestCipherReply.baseCipherDetailMap.emplace(
item.first, EKPBaseCipherDetails(item.second.encryptKeyId, item.second.encryptKey, arena));
for (auto& item : simKeysByDomainIdRep.encryptKeyDetails) {
latestCipherReply.baseCipherDetails.emplace_back(
item.encryptDomainId, item.encryptKeyId, item.encryptKey, arena);
// Record the fetched cipher details in the local cache for future reference
ekpProxyData->insertIntoBaseDomainIdCache(
item.first, item.second.encryptKeyId, item.second.encryptKey);
ekpProxyData->insertIntoBaseDomainIdCache(item.encryptDomainId, item.encryptKeyId, item.encryptKey);
}
} catch (Error& e) {
if (!canReplyWith(e)) {
@ -274,12 +276,12 @@ ACTOR Future<Void> getLatestCipherKeys(Reference<EncryptKeyProxyData> ekpProxyDa
throw not_implemented();
}
for (auto& item : cachedKeys) {
latestCipherReply.baseCipherDetailMap.emplace(
item.first, EKPBaseCipherDetails(item.second.baseCipherId, item.second.baseCipherKey, arena));
for (auto& item : cachedCipherDetails) {
latestCipherReply.baseCipherDetails.emplace_back(
item.encryptDomainId, item.baseCipherId, item.baseCipherKey, arena);
}
latestCipherReply.numHits = cachedKeys.size();
latestCipherReply.numHits = cachedCipherDetails.size();
latestKeysReq.reply.send(latestCipherReply);
return Void();
@ -300,12 +302,12 @@ ACTOR Future<Void> refreshEncryptionKeysUsingSimKms(Reference<EncryptKeyProxyDat
req.encryptDomainIds.emplace_back(item.first);
}
SimGetEncryptKeyByDomainIdReply rep = wait(simKmsInterface.encryptKeyLookupByDomainId.getReply(req));
for (auto& item : rep.encryptKeyMap) {
ekpProxyData->insertIntoBaseDomainIdCache(item.first, item.second.encryptKeyId, item.second.encryptKey);
for (auto& item : rep.encryptKeyDetails) {
ekpProxyData->insertIntoBaseDomainIdCache(item.encryptDomainId, item.encryptKeyId, item.encryptKey);
}
ekpProxyData->baseCipherKeysRefreshed += rep.encryptKeyMap.size();
TraceEvent("RefreshEKs_Done", ekpProxyData->myId).detail("KeyCount", rep.encryptKeyMap.size());
ekpProxyData->baseCipherKeysRefreshed += rep.encryptKeyDetails.size();
TraceEvent("RefreshEKs_Done", ekpProxyData->myId).detail("KeyCount", rep.encryptKeyDetails.size());
} catch (Error& e) {
if (!canReplyWith(e)) {
TraceEvent("RefreshEncryptionKeys_Error").error(e);

View File

@ -90,10 +90,26 @@ struct HaltEncryptKeyProxyRequest {
}
};
struct EKPBaseCipherDetails {
constexpr static FileIdentifier file_identifier = 2149615;
int64_t encryptDomainId;
uint64_t baseCipherId;
StringRef baseCipherKey;
EKPBaseCipherDetails() : encryptDomainId(0), baseCipherId(0), baseCipherKey(StringRef()) {}
explicit EKPBaseCipherDetails(int64_t dId, uint64_t id, StringRef key, Arena& arena)
: encryptDomainId(dId), baseCipherId(id), baseCipherKey(StringRef(arena, key)) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, encryptDomainId, baseCipherId, baseCipherKey);
}
};
struct EKPGetBaseCipherKeysByIdsReply {
constexpr static FileIdentifier file_identifier = 9485259;
Arena arena;
std::unordered_map<uint64_t, StringRef> baseCipherMap;
std::vector<EKPBaseCipherDetails> baseCipherDetails;
int numHits;
Optional<Error> error;
@ -101,18 +117,18 @@ struct EKPGetBaseCipherKeysByIdsReply {
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, arena, baseCipherMap, numHits, error);
serializer(ar, arena, baseCipherDetails, numHits, error);
}
};
struct EKPGetBaseCipherKeysByIdsRequest {
constexpr static FileIdentifier file_identifier = 4930263;
UID requesterID;
std::vector<uint64_t> baseCipherIds;
std::vector<std::pair<uint64_t, int64_t>> baseCipherIds;
ReplyPromise<EKPGetBaseCipherKeysByIdsReply> reply;
EKPGetBaseCipherKeysByIdsRequest() : requesterID(deterministicRandom()->randomUniqueID()) {}
explicit EKPGetBaseCipherKeysByIdsRequest(UID uid, const std::vector<uint64_t>& ids)
explicit EKPGetBaseCipherKeysByIdsRequest(UID uid, const std::vector<std::pair<uint64_t, int64_t>>& ids)
: requesterID(uid), baseCipherIds(ids) {}
template <class Ar>
@ -121,35 +137,20 @@ struct EKPGetBaseCipherKeysByIdsRequest {
}
};
struct EKPBaseCipherDetails {
constexpr static FileIdentifier file_identifier = 2149615;
uint64_t baseCipherId;
StringRef baseCipherKey;
EKPBaseCipherDetails() : baseCipherId(0), baseCipherKey(StringRef()) {}
explicit EKPBaseCipherDetails(uint64_t id, StringRef key, Arena& arena)
: baseCipherId(id), baseCipherKey(StringRef(arena, key)) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, baseCipherId, baseCipherKey);
}
};
struct EKPGetLatestBaseCipherKeysReply {
constexpr static FileIdentifier file_identifier = 4831583;
Arena arena;
std::unordered_map<uint64_t, EKPBaseCipherDetails> baseCipherDetailMap;
std::vector<EKPBaseCipherDetails> baseCipherDetails;
int numHits;
Optional<Error> error;
EKPGetLatestBaseCipherKeysReply() : numHits(0) {}
explicit EKPGetLatestBaseCipherKeysReply(const std::unordered_map<uint64_t, EKPBaseCipherDetails>& cipherMap)
: baseCipherDetailMap(cipherMap), numHits(0) {}
explicit EKPGetLatestBaseCipherKeysReply(const std::vector<EKPBaseCipherDetails>& cipherDetails)
: baseCipherDetails(cipherDetails), numHits(0) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, arena, baseCipherDetailMap, numHits, error);
serializer(ar, arena, baseCipherDetails, numHits, error);
}
};
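The hunks above replace the id-keyed maps in the EKP replies with flat vectors of EKPBaseCipherDetails, and the request now ships (baseCipherId, domainId) pairs so the proxy can warm its domain-indexed cache from the response. A minimal standalone sketch of the before/after shape (plain std types only; the Arena/serializer machinery is omitted and the names are illustrative, not the FDB API):

#include <cstdint>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

struct BaseCipherDetails {
    int64_t encryptDomainId; // the encryption domain now travels with every entry
    uint64_t baseCipherId;
    std::string baseCipherKey;
};

// Before: results keyed by cipher id alone, losing the encryption domain.
using OldReply = std::unordered_map<uint64_t, std::string>;

// After: a flat list; the same baseCipherId may legitimately appear under
// different encryption domains, which an id-keyed map cannot express.
using NewReply = std::vector<BaseCipherDetails>;

// Requests now carry (baseCipherId, domainId) pairs instead of bare ids.
using NewRequestIds = std::vector<std::pair<uint64_t, int64_t>>;

int main() {
    NewRequestIds ids = { { 42, 1 } };
    NewReply reply;
    reply.push_back({ /*domainId*/ 1, /*cipherId*/ 42, "key-bytes" });
    return reply.size() == ids.size() ? 0 : 1;
}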

View File

@ -187,6 +187,8 @@ inline IKeyValueStore* openKVStore(KeyValueStoreType storeType,
return keyValueStoreRedwoodV1(filename, logID);
case KeyValueStoreType::SSD_ROCKSDB_V1:
return keyValueStoreRocksDB(filename, logID, storeType);
case KeyValueStoreType::SSD_SHARDED_ROCKSDB:
return keyValueStoreRocksDB(filename, logID, storeType); // TODO: replace this KVS in the future
case KeyValueStoreType::MEMORY_RADIXTREE:
return keyValueStoreMemory(filename,
logID,

View File

@ -1809,7 +1809,14 @@ struct RocksDBKeyValueStore : IKeyValueStore {
void close() override { doClose(this, false); }
KeyValueStoreType getType() const override { return KeyValueStoreType(KeyValueStoreType::SSD_ROCKSDB_V1); }
KeyValueStoreType getType() const override {
if (SERVER_KNOBS->ENABLE_SHARDED_ROCKSDB)
// KVSRocks pretends to be KVSShardedRocksDB
// TODO: remove when the ShardedRocksDB KVS implementation is added in the future
return KeyValueStoreType(KeyValueStoreType::SSD_SHARDED_ROCKSDB);
else
return KeyValueStoreType(KeyValueStoreType::SSD_ROCKSDB_V1);
}
Future<Void> init() override {
if (openFuture.isValid()) {

View File

@ -20,6 +20,7 @@
#include <memory>
#include <unordered_map>
#include <utility>
#include "fdbrpc/sim_validation.h"
#include "fdbserver/Knobs.h"
@ -29,6 +30,7 @@
#include "flow/IRandom.h"
#include "flow/ITrace.h"
#include "flow/StreamCipher.h"
#include "flow/Trace.h"
#include "flow/UnitTest.h"
#include "flow/actorcompiler.h" // This must be the last #include.
#include "flow/network.h"
@ -73,11 +75,14 @@ ACTOR Future<Void> simEncryptKmsProxyCore(SimKmsProxyInterface interf) {
state SimGetEncryptKeysByKeyIdsReply keysByIdsRep;
// Lookup corresponding EncryptKeyCtx for input keyId
for (SimEncryptKeyId keyId : req.encryptKeyIds) {
const auto& itr = kmsProxyCtx.simEncryptKeyStore.find(keyId);
for (const auto& item : req.encryptKeyIds) {
const auto& itr = kmsProxyCtx.simEncryptKeyStore.find(item.first);
if (itr != kmsProxyCtx.simEncryptKeyStore.end()) {
keysByIdsRep.encryptKeyMap.emplace(keyId,
StringRef(keysByIdsRep.arena, itr->second.get()->key));
keysByIdsRep.encryptKeyDetails.emplace_back(
item.second,
itr->first,
StringRef(keysByIdsRep.arena, itr->second.get()->key),
keysByIdsRep.arena);
} else {
success = false;
break;
@ -99,9 +104,8 @@ ACTOR Future<Void> simEncryptKmsProxyCore(SimKmsProxyInterface interf) {
SimEncryptKeyId keyId = domainId % SERVER_KNOBS->SIM_KMS_MAX_KEYS;
const auto& itr = kmsProxyCtx.simEncryptKeyStore.find(keyId);
if (itr != kmsProxyCtx.simEncryptKeyStore.end()) {
keysByDomainIdRep.encryptKeyMap.emplace(
domainId,
SimEncryptKeyDetails(keyId, StringRef(itr->second.get()->key), keysByDomainIdRep.arena));
keysByDomainIdRep.encryptKeyDetails.emplace_back(
domainId, keyId, StringRef(itr->second.get()->key), keysByDomainIdRep.arena);
} else {
success = false;
break;
@ -137,32 +141,41 @@ ACTOR Future<Void> testRunWorkload(SimKmsProxyInterface inf, uint32_t nEncryptio
domainIdsReq.encryptDomainIds.push_back(i);
}
SimGetEncryptKeyByDomainIdReply domainIdsReply = wait(inf.encryptKeyLookupByDomainId.getReply(domainIdsReq));
for (auto& element : domainIdsReply.encryptKeyMap) {
domainIdKeyMap.emplace(element.first,
std::make_unique<SimEncryptKeyCtx>(element.second.encryptKeyId,
element.second.encryptKey.toString().c_str()));
for (auto& element : domainIdsReply.encryptKeyDetails) {
domainIdKeyMap.emplace(
element.encryptDomainId,
std::make_unique<SimEncryptKeyCtx>(element.encryptKeyId, element.encryptKey.toString().c_str()));
}
// randomly pick any domainId and validate if lookupByKeyId result matches
SimGetEncryptKeysByKeyIdsRequest keyIdsReq;
state std::unordered_map<SimEncryptKeyId, StringRef> validationMap;
std::unordered_map<SimEncryptKeyId, SimEncryptDomainId> idsToLookup;
for (i = 0; i < maxIterations; i++) {
state int idx = deterministicRandom()->randomInt(0, maxDomainIds);
state SimEncryptKeyCtx* ctx = domainIdKeyMap[idx].get();
keyIdsReq.encryptKeyIds.push_back(ctx->id);
validationMap[ctx->id] = StringRef(ctx->key);
idsToLookup.emplace(ctx->id, idx);
}
SimGetEncryptKeysByKeyIdsReply keyIdsReply = wait(inf.encryptKeyLookupByKeyIds.getReply(keyIdsReq));
ASSERT(keyIdsReply.encryptKeyMap.size() == validationMap.size());
for (const auto& element : keyIdsReply.encryptKeyMap) {
ASSERT(validationMap[element.first].compare(element.second) == 0);
state SimGetEncryptKeysByKeyIdsRequest keyIdsReq;
for (const auto& item : idsToLookup) {
keyIdsReq.encryptKeyIds.emplace_back(std::make_pair(item.first, item.second));
}
state SimGetEncryptKeysByKeyIdsReply keyIdsReply = wait(inf.encryptKeyLookupByKeyIds.getReply(keyIdsReq));
/* TraceEvent("Lookup")
.detail("KeyIdReqSize", keyIdsReq.encryptKeyIds.size())
.detail("KeyIdsRepSz", keyIdsReply.encryptKeyDetails.size())
.detail("ValSz", validationMap.size()); */
ASSERT(keyIdsReply.encryptKeyDetails.size() == validationMap.size());
for (const auto& element : keyIdsReply.encryptKeyDetails) {
ASSERT(validationMap[element.encryptDomainId].compare(element.encryptKey) == 0);
}
}
{
// Verify unknown key access returns the error
state SimGetEncryptKeysByKeyIdsRequest req;
req.encryptKeyIds.push_back(maxEncryptionKeys + 1);
req.encryptKeyIds.emplace_back(std::make_pair(maxEncryptionKeys + 1, 1));
try {
SimGetEncryptKeysByKeyIdsReply reply = wait(inf.encryptKeyLookupByKeyIds.getReply(req));
} catch (Error& e) {

View File

@ -71,26 +71,43 @@ struct SimKmsProxyInterface {
}
};
struct SimEncryptKeyDetails {
constexpr static FileIdentifier file_identifier = 1227025;
SimEncryptDomainId encryptDomainId;
SimEncryptKeyId encryptKeyId;
StringRef encryptKey;
SimEncryptKeyDetails() {}
explicit SimEncryptKeyDetails(SimEncryptDomainId domainId, SimEncryptKeyId keyId, StringRef key, Arena& arena)
: encryptDomainId(domainId), encryptKeyId(keyId), encryptKey(StringRef(arena, key)) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, encryptDomainId, encryptKeyId, encryptKey);
}
};
struct SimGetEncryptKeysByKeyIdsReply {
constexpr static FileIdentifier file_identifier = 2313778;
Arena arena;
std::unordered_map<SimEncryptKeyId, StringRef> encryptKeyMap;
std::vector<SimEncryptKeyDetails> encryptKeyDetails;
SimGetEncryptKeysByKeyIdsReply() {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, arena, encryptKeyMap);
serializer(ar, arena, encryptKeyDetails);
}
};
struct SimGetEncryptKeysByKeyIdsRequest {
constexpr static FileIdentifier file_identifier = 6913396;
std::vector<SimEncryptKeyId> encryptKeyIds;
std::vector<std::pair<SimEncryptKeyId, SimEncryptDomainId>> encryptKeyIds;
ReplyPromise<SimGetEncryptKeysByKeyIdsReply> reply;
SimGetEncryptKeysByKeyIdsRequest() {}
explicit SimGetEncryptKeysByKeyIdsRequest(const std::vector<SimEncryptKeyId>& keyIds) : encryptKeyIds(keyIds) {}
explicit SimGetEncryptKeysByKeyIdsRequest(const std::vector<std::pair<SimEncryptKeyId, SimEncryptDomainId>>& keyIds)
: encryptKeyIds(keyIds) {}
template <class Ar>
void serialize(Ar& ar) {
@ -98,31 +115,16 @@ struct SimGetEncryptKeysByKeyIdsRequest {
}
};
struct SimEncryptKeyDetails {
constexpr static FileIdentifier file_identifier = 1227025;
SimEncryptKeyId encryptKeyId;
StringRef encryptKey;
SimEncryptKeyDetails() {}
explicit SimEncryptKeyDetails(SimEncryptKeyId keyId, StringRef key, Arena& arena)
: encryptKeyId(keyId), encryptKey(StringRef(arena, key)) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, encryptKeyId, encryptKey);
}
};
struct SimGetEncryptKeyByDomainIdReply {
constexpr static FileIdentifier file_identifier = 3009025;
Arena arena;
std::unordered_map<SimEncryptDomainId, SimEncryptKeyDetails> encryptKeyMap;
std::vector<SimEncryptKeyDetails> encryptKeyDetails;
SimGetEncryptKeyByDomainIdReply() {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, arena, encryptKeyMap);
serializer(ar, arena, encryptKeyDetails);
}
};

View File

@ -310,6 +310,7 @@ public:
// 2 = "memory-radixtree-beta"
// 3 = "ssd-redwood-1-experimental"
// 4 = "ssd-rocksdb-v1"
// 5 = "ssd-sharded-rocksdb"
// Requires a comma-separated list of numbers WITHOUT whitespaces
std::vector<int> storageEngineExcludeTypes;
// Set the maximum TLog version that can be selected for a test
@ -1421,6 +1422,16 @@ void SimulationConfig::setStorageEngine(const TestConfig& testConfig) {
noUnseed = true;
break;
}
case 5: {
TEST(true); // Simulated cluster using Sharded RocksDB storage engine
set_config("ssd-sharded-rocksdb");
// Tests using the RocksDB engine are necessarily non-deterministic because of RocksDB
// background threads.
TraceEvent(SevWarnAlways, "RocksDBNonDeterminism")
.detail("Explanation", "The Sharded RocksDB storage engine is threaded and non-deterministic");
noUnseed = true;
break;
}
default:
ASSERT(false); // Programmer forgot to adjust cases.
}
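Since the new engine registers as type 5 in the storage-engine numbering from the comment above, a simulation test that cannot tolerate RocksDB's threaded non-determinism would exclude both RocksDB variants. A hypothetical test-spec fragment (the option name comes from that comment; per the same comment the value is a comma-separated list without whitespace):

storageEngineExcludeTypes=4,5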

View File

@ -331,6 +331,9 @@ KeyValueStoreSuffix redwoodSuffix = { KeyValueStoreType::SSD_REDWOOD_V1, ".redwo
KeyValueStoreSuffix rocksdbSuffix = { KeyValueStoreType::SSD_ROCKSDB_V1,
".rocksdb",
FilesystemCheck::DIRECTORIES_ONLY };
KeyValueStoreSuffix shardedRocksdbSuffix = { KeyValueStoreType::SSD_SHARDED_ROCKSDB,
".shardedrocksdb",
FilesystemCheck::DIRECTORIES_ONLY };
std::string validationFilename = "_validate";
@ -345,6 +348,8 @@ std::string filenameFromSample(KeyValueStoreType storeType, std::string folder,
return joinPath(folder, sample_filename);
else if (storeType == KeyValueStoreType::SSD_ROCKSDB_V1)
return joinPath(folder, sample_filename);
else if (storeType == KeyValueStoreType::SSD_SHARDED_ROCKSDB)
return joinPath(folder, sample_filename);
UNREACHABLE();
}
@ -360,6 +365,8 @@ std::string filenameFromId(KeyValueStoreType storeType, std::string folder, std:
return joinPath(folder, prefix + id.toString() + ".redwood-v1");
else if (storeType == KeyValueStoreType::SSD_ROCKSDB_V1)
return joinPath(folder, prefix + id.toString() + ".rocksdb");
else if (storeType == KeyValueStoreType::SSD_SHARDED_ROCKSDB)
return joinPath(folder, prefix + id.toString() + ".shardedrocksdb");
TraceEvent(SevError, "UnknownStoreType").detail("StoreType", storeType.toString());
UNREACHABLE();
@ -536,6 +543,9 @@ std::vector<DiskStore> getDiskStores(std::string folder) {
result.insert(result.end(), result4.begin(), result4.end());
auto result5 = getDiskStores(folder, rocksdbSuffix.suffix, rocksdbSuffix.type, rocksdbSuffix.check);
result.insert(result.end(), result5.begin(), result5.end());
auto result6 =
getDiskStores(folder, shardedRocksdbSuffix.suffix, shardedRocksdbSuffix.type, shardedRocksdbSuffix.check);
result.insert(result.end(), result6.begin(), result6.end());
return result;
}
@ -1665,6 +1675,7 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
SERVER_KNOBS->REMOTE_KV_STORE && /* testing mixed mode in simulation if remote kvs enabled */
(g_network->isSimulated()
? (/* Disable for RocksDB */ s.storeType != KeyValueStoreType::SSD_ROCKSDB_V1 &&
s.storeType != KeyValueStoreType::SSD_SHARDED_ROCKSDB &&
deterministicRandom()->coinflip())
: true));
Future<Void> kvClosed = kv->onClosed();
@ -2242,6 +2253,7 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
SERVER_KNOBS->REMOTE_KV_STORE && /* testing mixed mode in simulation if remote kvs enabled */
(g_network->isSimulated()
? (/* Disable for RocksDB */ req.storeType != KeyValueStoreType::SSD_ROCKSDB_V1 &&
req.storeType != KeyValueStoreType::SSD_SHARDED_ROCKSDB &&
deterministicRandom()->coinflip())
: true));
@ -2440,6 +2452,9 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
} else if (d.storeType == KeyValueStoreType::SSD_ROCKSDB_V1) {
included = fileExists(joinPath(d.filename, "CURRENT")) &&
fileExists(joinPath(d.filename, "IDENTITY"));
} else if (d.storeType == KeyValueStoreType::SSD_SHARDED_ROCKSDB) {
included = fileExists(joinPath(d.filename, "CURRENT")) &&
fileExists(joinPath(d.filename, "IDENTITY"));
} else if (d.storeType == KeyValueStoreType::MEMORY) {
included = fileExists(d.filename + "1.fdq");
} else {

View File

@ -33,6 +33,7 @@
#include <atomic>
#include <boost/range/const_iterator.hpp>
#include <utility>
#include "flow/actorcompiler.h" // This must be the last #include.
@ -76,10 +77,17 @@ struct EncryptKeyProxyTestWorkload : TestWorkload {
if (rep.present()) {
ASSERT(!rep.get().error.present());
ASSERT_EQ(rep.get().baseCipherDetailMap.size(), self->domainIds.size());
ASSERT_EQ(rep.get().baseCipherDetails.size(), self->domainIds.size());
for (const uint64_t id : self->domainIds) {
ASSERT(rep.get().baseCipherDetailMap.find(id) != rep.get().baseCipherDetailMap.end());
bool found = false;
for (const auto& item : rep.get().baseCipherDetails) {
if (item.baseCipherId == id) {
found = true;
break;
}
}
ASSERT(found);
}
// Ensure no hits reported by the cache.
@ -127,10 +135,17 @@ struct EncryptKeyProxyTestWorkload : TestWorkload {
ErrorOr<EKPGetLatestBaseCipherKeysReply> rep = wait(self->ekpInf.getLatestBaseCipherKeys.tryGetReply(req));
if (rep.present()) {
ASSERT(!rep.get().error.present());
ASSERT_EQ(rep.get().baseCipherDetailMap.size(), self->domainIds.size());
ASSERT_EQ(rep.get().baseCipherDetails.size(), self->domainIds.size());
for (const uint64_t id : self->domainIds) {
ASSERT(rep.get().baseCipherDetailMap.find(id) != rep.get().baseCipherDetailMap.end());
bool found = false;
for (const auto& item : rep.get().baseCipherDetails) {
if (item.baseCipherId == id) {
found = true;
break;
}
}
ASSERT(found);
}
// Ensure desired cache-hit counts
@ -165,16 +180,23 @@ struct EncryptKeyProxyTestWorkload : TestWorkload {
EKPGetLatestBaseCipherKeysReply rep = wait(self->ekpInf.getLatestBaseCipherKeys.getReply(req));
ASSERT(!rep.error.present());
ASSERT_EQ(rep.baseCipherDetailMap.size(), self->domainIds.size());
ASSERT_EQ(rep.baseCipherDetails.size(), self->domainIds.size());
for (const uint64_t id : self->domainIds) {
ASSERT(rep.baseCipherDetailMap.find(id) != rep.baseCipherDetailMap.end());
bool found = false;
for (const auto& item : rep.baseCipherDetails) {
if (item.baseCipherId == id) {
found = true;
break;
}
}
ASSERT(found);
}
self->cipherIdMap.clear();
self->cipherIds.clear();
for (auto& item : rep.baseCipherDetailMap) {
self->cipherIdMap.emplace(item.second.baseCipherId, StringRef(self->arena, item.second.baseCipherKey));
self->cipherIds.emplace_back(item.second.baseCipherId);
for (auto& item : rep.baseCipherDetails) {
self->cipherIdMap.emplace(item.baseCipherId, StringRef(self->arena, item.baseCipherKey));
self->cipherIds.emplace_back(item.baseCipherId);
}
state int numIterations = deterministicRandom()->randomInt(512, 786);
@ -184,28 +206,28 @@ struct EncryptKeyProxyTestWorkload : TestWorkload {
EKPGetBaseCipherKeysByIdsRequest req;
for (int i = idx; i < nIds && i < self->cipherIds.size(); i++) {
req.baseCipherIds.emplace_back(self->cipherIds[i]);
req.baseCipherIds.emplace_back(std::make_pair(self->cipherIds[i], 1));
}
expectedHits = req.baseCipherIds.size();
EKPGetBaseCipherKeysByIdsReply rep = wait(self->ekpInf.getBaseCipherKeysByIds.getReply(req));
ASSERT(!rep.error.present());
ASSERT_EQ(rep.baseCipherMap.size(), expectedHits);
ASSERT_EQ(rep.baseCipherDetails.size(), expectedHits);
ASSERT_EQ(rep.numHits, expectedHits);
// Validate the 'cipherKey' content against the one read while querying by domainIds
for (auto& item : rep.baseCipherMap) {
const auto itr = self->cipherIdMap.find(item.first);
for (auto& item : rep.baseCipherDetails) {
const auto itr = self->cipherIdMap.find(item.baseCipherId);
ASSERT(itr != self->cipherIdMap.end());
Standalone<StringRef> toCompare = self->cipherIdMap[item.first];
if (toCompare.compare(item.second) != 0) {
Standalone<StringRef> toCompare = self->cipherIdMap[item.baseCipherId];
if (toCompare.compare(item.baseCipherKey) != 0) {
TraceEvent("Mismatch")
.detail("Id", item.first)
.detail("Id", item.baseCipherId)
.detail("CipherMapDataHash", XXH3_64bits(toCompare.begin(), toCompare.size()))
.detail("CipherMapSize", toCompare.size())
.detail("CipherMapValue", toCompare.toString())
.detail("ReadDataHash", XXH3_64bits(item.second.begin(), item.second.size()))
.detail("ReadValue", item.second.toString())
.detail("ReadDataSize", item.second.size());
.detail("ReadDataHash", XXH3_64bits(item.baseCipherKey.begin(), item.baseCipherKey.size()))
.detail("ReadValue", item.baseCipherKey.toString())
.detail("ReadDataSize", item.baseCipherKey.size());
ASSERT(false);
}
}
@ -219,12 +241,15 @@ struct EncryptKeyProxyTestWorkload : TestWorkload {
TraceEvent("SimLookupInvalidKeyId_Start").log();
// Prepare a lookup with valid and invalid keyIds - SimEncryptKmsProxy should throw encrypt_key_not_found()
std::vector<uint64_t> baseCipherIds(self->cipherIds);
baseCipherIds.emplace_back(SERVER_KNOBS->SIM_KMS_MAX_KEYS + 10);
std::vector<std::pair<uint64_t, int64_t>> baseCipherIds;
for (auto id : self->cipherIds) {
baseCipherIds.emplace_back(std::make_pair(id, 1));
}
baseCipherIds.emplace_back(std::make_pair(SERVER_KNOBS->SIM_KMS_MAX_KEYS + 10, 1));
EKPGetBaseCipherKeysByIdsRequest req(deterministicRandom()->randomUniqueID(), baseCipherIds);
EKPGetBaseCipherKeysByIdsReply rep = wait(self->ekpInf.getBaseCipherKeysByIds.getReply(req));
ASSERT_EQ(rep.baseCipherMap.size(), 0);
ASSERT_EQ(rep.baseCipherDetails.size(), 0);
ASSERT(rep.error.present());
ASSERT_EQ(rep.error.get().code(), error_code_encrypt_key_not_found);

View File

@ -121,6 +121,7 @@ struct EncryptionOpsWorkload : TestWorkload {
EncryptCipherDomainId maxDomainId;
EncryptCipherBaseKeyId minBaseCipherId;
EncryptCipherBaseKeyId headerBaseCipherId;
EncryptCipherRandomSalt headerRandomSalt;
EncryptionOpsWorkload(WorkloadContext const& wcx) : TestWorkload(wcx) {
mode = getOption(options, LiteralStringRef("fixedSize"), 1);
@ -181,11 +182,20 @@ struct EncryptionOpsWorkload : TestWorkload {
ASSERT_EQ(cipherKeys.size(), 1);
}
// insert the Encrypt Header cipherKey
// insert the Encrypt Header cipherKey; record cipherDetails as getLatestCipher() may not work with multiple
// test clients
generateRandomBaseCipher(AES_256_KEY_LENGTH, &buff[0], &cipherLen);
cipherKeyCache->insertCipherKey(ENCRYPT_HEADER_DOMAIN_ID, headerBaseCipherId, buff, cipherLen);
Reference<BlobCipherKey> latestCipher = cipherKeyCache->getLatestCipherKey(ENCRYPT_HEADER_DOMAIN_ID);
ASSERT_EQ(latestCipher->getBaseCipherId(), headerBaseCipherId);
ASSERT_EQ(memcmp(latestCipher->rawBaseCipher(), buff, cipherLen), 0);
headerRandomSalt = latestCipher->getSalt();
TraceEvent("SetupCipherEssentials_Done").detail("MinDomainId", minDomainId).detail("MaxDomainId", maxDomainId);
TraceEvent("SetupCipherEssentials_Done")
.detail("MinDomainId", minDomainId)
.detail("MaxDomainId", maxDomainId)
.detail("HeaderBaseCipherId", headerBaseCipherId)
.detail("HeaderRandomSalt", headerRandomSalt);
}
void resetCipherEssentials() {
@ -209,6 +219,29 @@ struct EncryptionOpsWorkload : TestWorkload {
TraceEvent("UpdateBaseCipher").detail("DomainId", encryptDomainId).detail("BaseCipherId", *nextBaseCipherId);
}
Reference<BlobCipherKey> getEncryptionKey(const EncryptCipherDomainId& domainId,
const EncryptCipherBaseKeyId& baseCipherId,
const EncryptCipherRandomSalt& salt) {
const bool simCacheMiss = deterministicRandom()->randomInt(1, 100) < 15;
Reference<BlobCipherKeyCache> cipherKeyCache = BlobCipherKeyCache::getInstance();
Reference<BlobCipherKey> cipherKey = cipherKeyCache->getCipherKey(domainId, baseCipherId, salt);
if (simCacheMiss) {
TraceEvent("SimKeyCacheMiss").detail("EncyrptDomainId", domainId).detail("BaseCipherId", baseCipherId);
// simulate KeyCache miss that may happen during decryption; insert a CipherKey with known 'salt'
cipherKeyCache->insertCipherKey(domainId,
baseCipherId,
cipherKey->rawBaseCipher(),
cipherKey->getBaseCipherLen(),
cipherKey->getSalt());
// Ensure the update was a NOP
Reference<BlobCipherKey> cKey = cipherKeyCache->getCipherKey(domainId, baseCipherId, salt);
ASSERT(cKey->isEqual(cipherKey));
}
return cipherKey;
}
Reference<EncryptBuf> doEncryption(Reference<BlobCipherKey> textCipherKey,
Reference<BlobCipherKey> headerCipherKey,
uint8_t* payload,
@ -240,11 +273,12 @@ struct EncryptionOpsWorkload : TestWorkload {
ASSERT_EQ(header.flags.headerVersion, EncryptBlobCipherAes265Ctr::ENCRYPT_HEADER_VERSION);
ASSERT_EQ(header.flags.encryptMode, ENCRYPT_CIPHER_MODE_AES_256_CTR);
Reference<BlobCipherKeyCache> cipherKeyCache = BlobCipherKeyCache::getInstance();
Reference<BlobCipherKey> cipherKey = cipherKeyCache->getCipherKey(header.cipherTextDetails.encryptDomainId,
header.cipherTextDetails.baseCipherId);
Reference<BlobCipherKey> headerCipherKey = cipherKeyCache->getCipherKey(
header.cipherHeaderDetails.encryptDomainId, header.cipherHeaderDetails.baseCipherId);
Reference<BlobCipherKey> cipherKey = getEncryptionKey(header.cipherTextDetails.encryptDomainId,
header.cipherTextDetails.baseCipherId,
header.cipherTextDetails.salt);
Reference<BlobCipherKey> headerCipherKey = getEncryptionKey(header.cipherHeaderDetails.encryptDomainId,
header.cipherHeaderDetails.baseCipherId,
header.cipherHeaderDetails.salt);
ASSERT(cipherKey.isValid());
ASSERT(cipherKey->isEqual(orgCipherKey));
@ -297,7 +331,7 @@ struct EncryptionOpsWorkload : TestWorkload {
Reference<BlobCipherKey> cipherKey = cipherKeyCache->getLatestCipherKey(encryptDomainId);
// Each client working with their own version of encryptHeaderCipherKey, avoid using getLatest()
Reference<BlobCipherKey> headerCipherKey =
cipherKeyCache->getCipherKey(ENCRYPT_HEADER_DOMAIN_ID, headerBaseCipherId);
cipherKeyCache->getCipherKey(ENCRYPT_HEADER_DOMAIN_ID, headerBaseCipherId, headerRandomSalt);
auto end = std::chrono::high_resolution_clock::now();
metrics->updateKeyDerivationTime(std::chrono::duration<double, std::nano>(end - start).count());

View File

@ -144,7 +144,8 @@ struct GetMappedRangeWorkload : ApiWorkload {
return Void();
}
static void validateRecord(int expectedId, const MappedKeyValueRef* it, GetMappedRangeWorkload* self) {
// Returns true if a retry is needed.
static bool validateRecord(int expectedId, const MappedKeyValueRef* it, GetMappedRangeWorkload* self) {
// std::cout << "validateRecord expectedId " << expectedId << " it->key " << printable(it->key) << "
// indexEntryKey(expectedId) " << printable(indexEntryKey(expectedId)) << std::endl;
ASSERT(it->key == indexEntryKey(expectedId));
@ -155,7 +156,12 @@ struct GetMappedRangeWorkload : ApiWorkload {
auto& getRange = std::get<GetRangeReqAndResultRef>(it->reqAndResult);
auto& rangeResult = getRange.result;
// std::cout << "rangeResult.size()=" << rangeResult.size() << std::endl;
ASSERT(rangeResult.more == false);
// In the future, we may be able to do the continuation more efficiently by combining partial results
// together and then validating.
if (rangeResult.more) {
// Retry if the underlying request is not fully completed.
return true;
}
ASSERT(rangeResult.size() == SPLIT_SIZE);
for (int split = 0; split < SPLIT_SIZE; split++) {
auto& kv = rangeResult[split];
@ -174,6 +180,7 @@ struct GetMappedRangeWorkload : ApiWorkload {
ASSERT(getValue.result.present());
ASSERT(getValue.result.get() == recordValue(expectedId));
}
return false;
}
ACTOR Future<MappedRangeResult> scanMappedRangeWithLimits(Database cx,
@ -200,10 +207,17 @@ struct GetMappedRangeWorkload : ApiWorkload {
std::cout << "result.more=" << result.more << std::endl;
ASSERT(result.size() <= limit);
int expectedId = expectedBeginId;
bool needRetry = false;
for (const MappedKeyValueRef* it = result.begin(); it != result.end(); it++) {
validateRecord(expectedId, it, self);
if (validateRecord(expectedId, it, self)) {
needRetry = true;
break;
}
expectedId++;
}
if (needRetry) {
continue;
}
std::cout << "finished scanMappedRangeWithLimits" << std::endl;
return result;
} catch (Error& e) {

View File

@ -388,6 +388,9 @@ ACTOR Future<Void> testKVStore(KVStoreTestWorkload* workload) {
test.store = keyValueStoreRedwoodV1(fn, id);
else if (workload->storeType == "ssd-rocksdb-v1")
test.store = keyValueStoreRocksDB(fn, id, KeyValueStoreType::SSD_ROCKSDB_V1);
else if (workload->storeType == "ssd-sharded-rocksdb")
test.store = keyValueStoreRocksDB(
fn, id, KeyValueStoreType::SSD_SHARDED_ROCKSDB); // TODO: replace this KVS in the future
else if (workload->storeType == "memory")
test.store = keyValueStoreMemory(fn, id, 500e6);
else if (workload->storeType == "memory-radixtree-beta")

View File

@ -1172,51 +1172,6 @@ struct SpecialKeySpaceCorrectnessWorkload : TestWorkload {
} catch (Error& e) {
wait(tx->onError(e));
}
// profile client get
loop {
try {
tx->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
// client_txn_sample_rate
state Optional<Value> txnSampleRate =
wait(tx->get(LiteralStringRef("client_txn_sample_rate")
.withPrefix(SpecialKeySpace::getManagementApiCommandPrefix("profile"))));
ASSERT(txnSampleRate.present());
Optional<Value> txnSampleRateKey = wait(tx->get(fdbClientInfoTxnSampleRate));
if (txnSampleRateKey.present()) {
const double sampleRateDbl =
BinaryReader::fromStringRef<double>(txnSampleRateKey.get(), Unversioned());
if (!std::isinf(sampleRateDbl)) {
ASSERT(txnSampleRate.get().toString() == boost::lexical_cast<std::string>(sampleRateDbl));
} else {
ASSERT(txnSampleRate.get().toString() == "default");
}
} else {
ASSERT(txnSampleRate.get().toString() == "default");
}
// client_txn_size_limit
state Optional<Value> txnSizeLimit =
wait(tx->get(LiteralStringRef("client_txn_size_limit")
.withPrefix(SpecialKeySpace::getManagementApiCommandPrefix("profile"))));
ASSERT(txnSizeLimit.present());
Optional<Value> txnSizeLimitKey = wait(tx->get(fdbClientInfoTxnSizeLimit));
if (txnSizeLimitKey.present()) {
const int64_t sizeLimit =
BinaryReader::fromStringRef<int64_t>(txnSizeLimitKey.get(), Unversioned());
if (sizeLimit != -1) {
ASSERT(txnSizeLimit.get().toString() == boost::lexical_cast<std::string>(sizeLimit));
} else {
ASSERT(txnSizeLimit.get().toString() == "default");
}
} else {
ASSERT(txnSizeLimit.get().toString() == "default");
}
tx->reset();
break;
} catch (Error& e) {
TraceEvent(SevDebug, "ProfileClientGet").error(e);
wait(tx->onError(e));
}
}
// data_distribution & maintenance get
loop {
try {

View File

@ -19,6 +19,7 @@
*/
#include "flow/BlobCipher.h"
#include "flow/EncryptUtils.h"
#include "flow/Knobs.h"
#include "flow/Error.h"
@ -32,6 +33,7 @@
#include <cstring>
#include <memory>
#include <string>
#include <utility>
#if ENCRYPTION_ENABLED
@ -54,12 +56,14 @@ BlobCipherKey::BlobCipherKey(const EncryptCipherDomainId& domainId,
salt = nondeterministicRandom()->randomUInt64();
}
initKey(domainId, baseCiph, baseCiphLen, baseCiphId, salt);
/*TraceEvent("BlobCipherKey")
.detail("DomainId", domainId)
.detail("BaseCipherId", baseCipherId)
.detail("BaseCipherLen", baseCipherLen)
.detail("RandomSalt", randomSalt)
.detail("CreationTime", creationTime);*/
}
BlobCipherKey::BlobCipherKey(const EncryptCipherDomainId& domainId,
const EncryptCipherBaseKeyId& baseCiphId,
const uint8_t* baseCiph,
int baseCiphLen,
const EncryptCipherRandomSalt& salt) {
initKey(domainId, baseCiph, baseCiphLen, baseCiphId, salt);
}
void BlobCipherKey::initKey(const EncryptCipherDomainId& domainId,
@ -82,6 +86,13 @@ void BlobCipherKey::initKey(const EncryptCipherDomainId& domainId,
applyHmacSha256Derivation();
// update the key creation time
creationTime = now();
TraceEvent("BlobCipherKey")
.detail("DomainId", domainId)
.detail("BaseCipherId", baseCipherId)
.detail("BaseCipherLen", baseCipherLen)
.detail("RandomSalt", randomSalt)
.detail("CreationTime", creationTime);
}
void BlobCipherKey::applyHmacSha256Derivation() {
@ -105,32 +116,98 @@ void BlobCipherKey::reset() {
// BlobKeyIdCache class methods
BlobCipherKeyIdCache::BlobCipherKeyIdCache()
: domainId(ENCRYPT_INVALID_DOMAIN_ID), latestBaseCipherKeyId(ENCRYPT_INVALID_CIPHER_KEY_ID) {}
: domainId(ENCRYPT_INVALID_DOMAIN_ID), latestBaseCipherKeyId(ENCRYPT_INVALID_CIPHER_KEY_ID),
latestRandomSalt(ENCRYPT_INVALID_RANDOM_SALT) {}
BlobCipherKeyIdCache::BlobCipherKeyIdCache(EncryptCipherDomainId dId)
: domainId(dId), latestBaseCipherKeyId(ENCRYPT_INVALID_CIPHER_KEY_ID) {
: domainId(dId), latestBaseCipherKeyId(ENCRYPT_INVALID_CIPHER_KEY_ID), latestRandomSalt(ENCRYPT_INVALID_RANDOM_SALT) {
TraceEvent("Init_BlobCipherKeyIdCache").detail("DomainId", domainId);
}
Reference<BlobCipherKey> BlobCipherKeyIdCache::getLatestCipherKey() {
return getCipherByBaseCipherId(latestBaseCipherKeyId);
BlobCipherKeyIdCacheKey BlobCipherKeyIdCache::getCacheKey(const EncryptCipherBaseKeyId& baseCipherKeyId,
const EncryptCipherRandomSalt& salt) {
if (baseCipherKeyId == ENCRYPT_INVALID_CIPHER_KEY_ID || salt == ENCRYPT_INVALID_RANDOM_SALT) {
throw encrypt_invalid_id();
}
return std::make_pair(baseCipherKeyId, salt);
}
Reference<BlobCipherKey> BlobCipherKeyIdCache::getCipherByBaseCipherId(EncryptCipherBaseKeyId baseCipherKeyId) {
BlobCipherKeyIdCacheMapCItr itr = keyIdCache.find(baseCipherKeyId);
Reference<BlobCipherKey> BlobCipherKeyIdCache::getLatestCipherKey() {
if (keyIdCache.empty()) {
// Cache is empty, nothing more to do.
throw encrypt_key_not_found();
}
// Ensure latestCipher details sanity
ASSERT_GT(latestBaseCipherKeyId, ENCRYPT_INVALID_CIPHER_KEY_ID);
ASSERT_GT(latestRandomSalt, ENCRYPT_INVALID_RANDOM_SALT);
return getCipherByBaseCipherId(latestBaseCipherKeyId, latestRandomSalt);
}
Reference<BlobCipherKey> BlobCipherKeyIdCache::getCipherByBaseCipherId(const EncryptCipherBaseKeyId& baseCipherKeyId,
const EncryptCipherRandomSalt& salt) {
BlobCipherKeyIdCacheMapCItr itr = keyIdCache.find(getCacheKey(baseCipherKeyId, salt));
if (itr == keyIdCache.end()) {
TraceEvent("CipherByBaseCipherId_KeyMissing")
.detail("DomainId", domainId)
.detail("BaseCipherId", baseCipherKeyId)
.detail("Salt", salt);
throw encrypt_key_not_found();
}
return itr->second;
}
void BlobCipherKeyIdCache::insertBaseCipherKey(EncryptCipherBaseKeyId baseCipherId,
void BlobCipherKeyIdCache::insertBaseCipherKey(const EncryptCipherBaseKeyId& baseCipherId,
const uint8_t* baseCipher,
int baseCipherLen) {
ASSERT_GT(baseCipherId, ENCRYPT_INVALID_CIPHER_KEY_ID);
// BaseCipherKeys are immutable; since this routine's invocation updates 'latestCipher',
// ensure no key tampering occurs
try {
Reference<BlobCipherKey> cipherKey = getLatestCipherKey();
if (cipherKey.isValid() && cipherKey->getBaseCipherId() == baseCipherId) {
if (memcmp(cipherKey->rawBaseCipher(), baseCipher, baseCipherLen) == 0) {
TraceEvent("InsertBaseCipherKey_AlreadyPresent")
.detail("BaseCipherKeyId", baseCipherId)
.detail("DomainId", domainId);
// Key is already present; nothing more to do.
return;
} else {
TraceEvent("InsertBaseCipherKey_UpdateCipher")
.detail("BaseCipherKeyId", baseCipherId)
.detail("DomainId", domainId);
throw encrypt_update_cipher();
}
}
} catch (Error& e) {
if (e.code() != error_code_encrypt_key_not_found) {
throw e;
}
}
Reference<BlobCipherKey> cipherKey =
makeReference<BlobCipherKey>(domainId, baseCipherId, baseCipher, baseCipherLen);
BlobCipherKeyIdCacheKey cacheKey = getCacheKey(cipherKey->getBaseCipherId(), cipherKey->getSalt());
keyIdCache.emplace(cacheKey, cipherKey);
// Update the latest BaseCipherKeyId for the given encryption domain
latestBaseCipherKeyId = baseCipherId;
latestRandomSalt = cipherKey->getSalt();
}
void BlobCipherKeyIdCache::insertBaseCipherKey(const EncryptCipherBaseKeyId& baseCipherId,
const uint8_t* baseCipher,
int baseCipherLen,
const EncryptCipherRandomSalt& salt) {
ASSERT_GT(baseCipherId, ENCRYPT_INVALID_CIPHER_KEY_ID);
ASSERT_GT(salt, ENCRYPT_INVALID_RANDOM_SALT);
BlobCipherKeyIdCacheKey cacheKey = getCacheKey(baseCipherId, salt);
// BaseCipherKeys are immutable, ensure that cached value doesn't get updated.
BlobCipherKeyIdCacheMapCItr itr = keyIdCache.find(baseCipherId);
BlobCipherKeyIdCacheMapCItr itr = keyIdCache.find(cacheKey);
if (itr != keyIdCache.end()) {
if (memcmp(itr->second->rawBaseCipher(), baseCipher, baseCipherLen) == 0) {
TraceEvent("InsertBaseCipherKey_AlreadyPresent")
@ -146,9 +223,9 @@ void BlobCipherKeyIdCache::insertBaseCipherKey(EncryptCipherBaseKeyId baseCipher
}
}
keyIdCache.emplace(baseCipherId, makeReference<BlobCipherKey>(domainId, baseCipherId, baseCipher, baseCipherLen));
// Update the latest BaseCipherKeyId for the given encryption domain
latestBaseCipherKeyId = baseCipherId;
Reference<BlobCipherKey> cipherKey =
makeReference<BlobCipherKey>(domainId, baseCipherId, baseCipher, baseCipherLen, salt);
keyIdCache.emplace(cacheKey, cipherKey);
}
void BlobCipherKeyIdCache::cleanup() {
@ -197,6 +274,42 @@ void BlobCipherKeyCache::insertCipherKey(const EncryptCipherDomainId& domainId,
}
}
void BlobCipherKeyCache::insertCipherKey(const EncryptCipherDomainId& domainId,
const EncryptCipherBaseKeyId& baseCipherId,
const uint8_t* baseCipher,
int baseCipherLen,
const EncryptCipherRandomSalt& salt) {
if (domainId == ENCRYPT_INVALID_DOMAIN_ID || baseCipherId == ENCRYPT_INVALID_CIPHER_KEY_ID ||
salt == ENCRYPT_INVALID_RANDOM_SALT) {
throw encrypt_invalid_id();
}
try {
auto domainItr = domainCacheMap.find(domainId);
if (domainItr == domainCacheMap.end()) {
// Add mapping to track new encryption domain
Reference<BlobCipherKeyIdCache> keyIdCache = makeReference<BlobCipherKeyIdCache>(domainId);
keyIdCache->insertBaseCipherKey(baseCipherId, baseCipher, baseCipherLen, salt);
domainCacheMap.emplace(domainId, keyIdCache);
} else {
// Track new baseCipher keys
Reference<BlobCipherKeyIdCache> keyIdCache = domainItr->second;
keyIdCache->insertBaseCipherKey(baseCipherId, baseCipher, baseCipherLen, salt);
}
TraceEvent("InsertCipherKey")
.detail("DomainId", domainId)
.detail("BaseCipherKeyId", baseCipherId)
.detail("Salt", salt);
} catch (Error& e) {
TraceEvent("InsertCipherKey_Failed")
.detail("BaseCipherKeyId", baseCipherId)
.detail("DomainId", domainId)
.detail("Salt", salt);
throw;
}
}
Reference<BlobCipherKey> BlobCipherKeyCache::getLatestCipherKey(const EncryptCipherDomainId& domainId) {
auto domainItr = domainCacheMap.find(domainId);
if (domainItr == domainCacheMap.end()) {
@ -217,17 +330,19 @@ Reference<BlobCipherKey> BlobCipherKeyCache::getLatestCipherKey(const EncryptCip
}
Reference<BlobCipherKey> BlobCipherKeyCache::getCipherKey(const EncryptCipherDomainId& domainId,
const EncryptCipherBaseKeyId& baseCipherId) {
const EncryptCipherBaseKeyId& baseCipherId,
const EncryptCipherRandomSalt& salt) {
auto domainItr = domainCacheMap.find(domainId);
if (domainItr == domainCacheMap.end()) {
TraceEvent("GetCipherKey_MissingDomainId").detail("DomainId", domainId);
throw encrypt_key_not_found();
}
Reference<BlobCipherKeyIdCache> keyIdCache = domainItr->second;
return keyIdCache->getCipherByBaseCipherId(baseCipherId);
return keyIdCache->getCipherByBaseCipherId(baseCipherId, salt);
}
void BlobCipherKeyCache::resetEncyrptDomainId(const EncryptCipherDomainId domainId) {
void BlobCipherKeyCache::resetEncryptDomainId(const EncryptCipherDomainId domainId) {
auto domainItr = domainCacheMap.find(domainId);
if (domainItr == domainCacheMap.end()) {
throw encrypt_key_not_found();
@ -291,8 +406,8 @@ Reference<EncryptBuf> EncryptBlobCipherAes265Ctr::encrypt(const uint8_t* plainte
memset(reinterpret_cast<uint8_t*>(header), 0, sizeof(BlobCipherEncryptHeader));
// Alloc buffer computation accounts for 'header authentication' generation scheme. If single-auth-token needs to be
// generated, allocate buffer sufficient to append header to the cipherText optimizing memcpy cost.
// Alloc buffer computation accounts for 'header authentication' generation scheme. If single-auth-token needs
// to be generated, allocate buffer sufficient to append header to the cipherText optimizing memcpy cost.
const int allocSize = authTokenMode == ENCRYPT_HEADER_AUTH_TOKEN_MODE_SINGLE
? plaintextLen + AES_BLOCK_SIZE + sizeof(BlobCipherEncryptHeader)
@ -340,6 +455,7 @@ Reference<EncryptBuf> EncryptBlobCipherAes265Ctr::encrypt(const uint8_t* plainte
// Populate header encryption-key details
header->cipherHeaderDetails.encryptDomainId = headerCipherKey->getDomainId();
header->cipherHeaderDetails.baseCipherId = headerCipherKey->getBaseCipherId();
header->cipherHeaderDetails.salt = headerCipherKey->getSalt();
// Populate header authToken details
if (header->flags.authTokenMode == ENCRYPT_HEADER_AUTH_TOKEN_MODE_SINGLE) {
@ -624,8 +740,8 @@ void forceLinkBlobCipherTests() {}
// 3. Inserting of 'identical' cipherKey (already cached) more than once works as desired.
// 4. Inserting of 'non-identical' cipherKey (already cached) more than once works as desired.
// 5. Validation encryption ops (correctness):
// 5.1. Encyrpt a buffer followed by decryption of the buffer, validate the contents.
// 5.2. Simulate anomalies such as: EncyrptionHeader corruption, authToken mismatch / encryptionMode mismatch etc.
// 5.1. Encrypt a buffer followed by decryption of the buffer, validate the contents.
// 5.2. Simulate anomalies such as: EncryptionHeader corruption, authToken mismatch / encryptionMode mismatch etc.
// 6. Cache cleanup
// 6.1 cleanup cipherKeys by given encryptDomainId
// 6.2. Cleanup all cached cipherKeys
@ -639,6 +755,7 @@ TEST_CASE("flow/BlobCipher") {
int len;
EncryptCipherBaseKeyId keyId;
std::unique_ptr<uint8_t[]> key;
EncryptCipherRandomSalt generatedSalt;
BaseCipher(const EncryptCipherDomainId& dId, const EncryptCipherBaseKeyId& kId)
: domainId(dId), len(deterministicRandom()->randomInt(AES_256_KEY_LENGTH / 2, AES_256_KEY_LENGTH + 1)),
@ -671,6 +788,8 @@ TEST_CASE("flow/BlobCipher") {
cipherKeyCache->insertCipherKey(
baseCipher->domainId, baseCipher->keyId, baseCipher->key.get(), baseCipher->len);
Reference<BlobCipherKey> fetchedKey = cipherKeyCache->getLatestCipherKey(baseCipher->domainId);
baseCipher->generatedSalt = fetchedKey->getSalt();
}
}
// insert EncryptHeader BlobCipher key
@ -684,7 +803,8 @@ TEST_CASE("flow/BlobCipher") {
for (auto& domainItr : domainKeyMap) {
for (auto& baseKeyItr : domainItr.second) {
Reference<BaseCipher> baseCipher = baseKeyItr.second;
Reference<BlobCipherKey> cipherKey = cipherKeyCache->getCipherKey(baseCipher->domainId, baseCipher->keyId);
Reference<BlobCipherKey> cipherKey =
cipherKeyCache->getCipherKey(baseCipher->domainId, baseCipher->keyId, baseCipher->generatedSalt);
ASSERT(cipherKey.isValid());
// validate common cipher properties - domainId, baseCipherId, baseCipherLen, rawBaseCipher
ASSERT_EQ(cipherKey->getBaseCipherId(), baseCipher->keyId);
@ -759,7 +879,8 @@ TEST_CASE("flow/BlobCipher") {
.detail("BaseCipherId", header.cipherTextDetails.baseCipherId);
Reference<BlobCipherKey> tCipherKeyKey = cipherKeyCache->getCipherKey(header.cipherTextDetails.encryptDomainId,
header.cipherTextDetails.baseCipherId);
header.cipherTextDetails.baseCipherId,
header.cipherTextDetails.salt);
ASSERT(tCipherKeyKey->isEqual(cipherKey));
DecryptBlobCipherAes256Ctr decryptor(
tCipherKeyKey, Reference<BlobCipherKey>(), &header.cipherTextDetails.iv[0]);
@ -846,9 +967,11 @@ TEST_CASE("flow/BlobCipher") {
StringRef(arena, &header.singleAuthToken.authToken[0], AUTH_TOKEN_SIZE).toString());
Reference<BlobCipherKey> tCipherKeyKey = cipherKeyCache->getCipherKey(header.cipherTextDetails.encryptDomainId,
header.cipherTextDetails.baseCipherId);
header.cipherTextDetails.baseCipherId,
header.cipherTextDetails.salt);
Reference<BlobCipherKey> hCipherKey = cipherKeyCache->getCipherKey(header.cipherHeaderDetails.encryptDomainId,
header.cipherHeaderDetails.baseCipherId);
header.cipherHeaderDetails.baseCipherId,
header.cipherHeaderDetails.salt);
ASSERT(tCipherKeyKey->isEqual(cipherKey));
DecryptBlobCipherAes256Ctr decryptor(tCipherKeyKey, hCipherKey, &header.cipherTextDetails.iv[0]);
Reference<EncryptBuf> decrypted = decryptor.decrypt(encrypted->begin(), bufLen, header, arena);
@ -949,9 +1072,11 @@ TEST_CASE("flow/BlobCipher") {
StringRef(arena, &header.singleAuthToken.authToken[0], AUTH_TOKEN_SIZE).toString());
Reference<BlobCipherKey> tCipherKey = cipherKeyCache->getCipherKey(header.cipherTextDetails.encryptDomainId,
header.cipherTextDetails.baseCipherId);
header.cipherTextDetails.baseCipherId,
header.cipherTextDetails.salt);
Reference<BlobCipherKey> hCipherKey = cipherKeyCache->getCipherKey(header.cipherHeaderDetails.encryptDomainId,
header.cipherHeaderDetails.baseCipherId);
header.cipherHeaderDetails.baseCipherId,
header.cipherHeaderDetails.salt);
ASSERT(tCipherKey->isEqual(cipherKey));
DecryptBlobCipherAes256Ctr decryptor(tCipherKey, hCipherKey, &header.cipherTextDetails.iv[0]);
@ -1047,7 +1172,7 @@ TEST_CASE("flow/BlobCipher") {
// Validate dropping encryptDomainId cached keys
const EncryptCipherDomainId candidate = deterministicRandom()->randomInt(minDomainId, maxDomainId);
cipherKeyCache->resetEncyrptDomainId(candidate);
cipherKeyCache->resetEncryptDomainId(candidate);
std::vector<Reference<BlobCipherKey>> cachedKeys = cipherKeyCache->getAllCiphers(candidate);
ASSERT(cachedKeys.empty());

View File

@ -82,11 +82,11 @@ private:
// This header is persisted along with the encrypted buffer; it contains the information necessary
// to assist in decrypting the buffers to serve read requests.
//
// The total space overhead is 96 bytes.
// The total space overhead is 104 bytes.
#pragma pack(push, 1) // exact fit - no padding
typedef struct BlobCipherEncryptHeader {
static constexpr int headerSize = 96;
static constexpr int headerSize = 104;
union {
struct {
uint8_t size; // reading first byte is sufficient to determine header
@ -101,7 +101,7 @@ typedef struct BlobCipherEncryptHeader {
// Cipher text encryption information
struct {
// Encyrption domain boundary identifier.
// Encryption domain boundary identifier.
EncryptCipherDomainId encryptDomainId{};
// BaseCipher encryption key identifier
EncryptCipherBaseKeyId baseCipherId{};
@ -116,6 +116,8 @@ typedef struct BlobCipherEncryptHeader {
EncryptCipherDomainId encryptDomainId{};
// BaseCipher encryption key identifier.
EncryptCipherBaseKeyId baseCipherId{};
// Random salt
EncryptCipherRandomSalt salt{};
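// This one added field accounts exactly for the headerSize change above:
// assuming EncryptCipherRandomSalt is an 8-byte integer (its initialization via
// randomUInt64() elsewhere suggests so), the exact-fit packed struct grows
// from 96 + 8 = 104 bytes.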
} cipherHeaderDetails;
// Encryption header is stored as plaintext on a persistent storage to assist reconstruction of cipher-key(s) for
@ -164,6 +166,11 @@ public:
const EncryptCipherBaseKeyId& baseCiphId,
const uint8_t* baseCiph,
int baseCiphLen);
BlobCipherKey(const EncryptCipherDomainId& domainId,
const EncryptCipherBaseKeyId& baseCiphId,
const uint8_t* baseCiph,
int baseCiphLen,
const EncryptCipherRandomSalt& salt);
uint8_t* data() const { return cipher.get(); }
uint64_t getCreationTime() const { return creationTime; }
@ -206,7 +213,7 @@ private:
// This interface allows FDB processes participating in encryption to store and
// index recently used encryption cipher keys. FDB encryption has two dimensions:
// 1. Mapping on cipher encryption keys per "encryption domains"
// 2. Per encryption domain, the cipher keys are indexed using "baseCipherKeyId".
// 2. Per encryption domain, the cipher keys are indexed using the {baseCipherKeyId, salt} tuple.
//
// The design supports the NIST recommendation of limiting the lifetime of an encryption
// key. For details refer to:
@ -214,10 +221,10 @@ private:
//
// Below is a pictorial representation of the in-memory data structure implemented
// to index encryption keys:
// { encryptionDomain -> { baseCipherId -> cipherKey } }
// { encryptionDomain -> { {baseCipherId, salt} -> cipherKey } }
//
// Supported cache lookup schemes:
// 1. Lookup cipher based on { encryptionDomainId, baseCipherKeyId } tuple.
// 1. Lookup cipher based on { encryptionDomainId, baseCipherKeyId, salt } triplet.
// 2. Lookup latest cipher key for a given encryptionDomainId.
//
// The client is responsible for handling the cache-miss usecase; the corrective operation
// might vary based on the encryption participant. For instance: an EncryptKeyProxy cache-miss shall look up the
// required encryption key, whereas a CP/SS cache-miss would result in an RPC to the
// EncryptKeyServer to refresh the desired encryption key.
using BlobCipherKeyIdCacheMap = std::unordered_map<EncryptCipherBaseKeyId, Reference<BlobCipherKey>>;
struct pair_hash {
template <class T1, class T2>
std::size_t operator()(const std::pair<T1, T2>& pair) const {
auto hash1 = std::hash<T1>{}(pair.first);
auto hash2 = std::hash<T2>{}(pair.second);
// The XOR of two equal hashes would be zero, so fall back to one of them.
return hash1 == hash2 ? hash1 : hash1 ^ hash2;
}
};
using BlobCipherKeyIdCacheKey = std::pair<EncryptCipherBaseKeyId, EncryptCipherRandomSalt>;
using BlobCipherKeyIdCacheMap = std::unordered_map<BlobCipherKeyIdCacheKey, Reference<BlobCipherKey>, pair_hash>;
using BlobCipherKeyIdCacheMapCItr =
std::unordered_map<EncryptCipherBaseKeyId, Reference<BlobCipherKey>>::const_iterator;
std::unordered_map<BlobCipherKeyIdCacheKey, Reference<BlobCipherKey>, pair_hash>::const_iterator;
struct BlobCipherKeyIdCache : ReferenceCounted<BlobCipherKeyIdCache> {
public:
BlobCipherKeyIdCache();
explicit BlobCipherKeyIdCache(EncryptCipherDomainId dId);
BlobCipherKeyIdCacheKey getCacheKey(const EncryptCipherBaseKeyId& baseCipherId,
const EncryptCipherRandomSalt& salt);
// API returns the last inserted cipherKey.
// If none exists, 'encrypt_key_not_found' is thrown.
@ -243,14 +264,33 @@ public:
// API returns cipherKey corresponding to input 'baseCipherKeyId'.
// If none exists, 'encrypt_key_not_found' is thrown.
Reference<BlobCipherKey> getCipherByBaseCipherId(EncryptCipherBaseKeyId baseCipherKeyId);
Reference<BlobCipherKey> getCipherByBaseCipherId(const EncryptCipherBaseKeyId& baseCipherKeyId,
const EncryptCipherRandomSalt& salt);
// API enables inserting base encryption cipher details to the BlobCipherKeyIdCache.
// Given cipherKeys are immutable, attempting to re-insert same 'identical' cipherKey
// is treated as a NOP (success), however, an attempt to update cipherKey would throw
// 'encrypt_update_cipher' exception.
//
// API NOTE: The recommended usecase is when the encryption cipher-key is updated by the external
// keyManagementSolution to limit an encryption key's lifetime
void insertBaseCipherKey(EncryptCipherBaseKeyId baseCipherId, const uint8_t* baseCipher, int baseCipherLen);
void insertBaseCipherKey(const EncryptCipherBaseKeyId& baseCipherId, const uint8_t* baseCipher, int baseCipherLen);
// API enables inserting base encryption cipher details to the BlobCipherKeyIdCache
// Given cipherKeys are immutable, attempting to re-insert same 'identical' cipherKey
// is treated as a NOP (success), however, an attempt to update cipherKey would throw
// 'encrypt_update_cipher' exception.
//
// API NOTE: The recommended usecase is encryption cipher-key regeneration while performing
// decryption. The encryption header carries the relevant details, including: 'encryptDomainId',
// 'baseCipherId' & 'salt'. The caller needs to fetch the 'baseCipherKey' details and re-populate the KeyCache.
// Also, the invocation will NOT update the latest cipher-key details.
void insertBaseCipherKey(const EncryptCipherBaseKeyId& baseCipherId,
const uint8_t* baseCipher,
int baseCipherLen,
const EncryptCipherRandomSalt& salt);
// API cleanup the cache by dropping all cached cipherKeys
void cleanup();
@ -262,6 +302,7 @@ private:
EncryptCipherDomainId domainId;
BlobCipherKeyIdCacheMap keyIdCache;
EncryptCipherBaseKeyId latestBaseCipherKeyId;
EncryptCipherRandomSalt latestRandomSalt;
};
using BlobCipherDomainCacheMap = std::unordered_map<EncryptCipherDomainId, Reference<BlobCipherKeyIdCache>>;
@ -277,12 +318,32 @@ public:
// The cipherKeys are indexed using 'baseCipherId', given cipherKeys are immutable,
// attempting to re-insert same 'identical' cipherKey is treated as a NOP (success),
// however, an attempt to update cipherKey would throw 'encrypt_update_cipher' exception.
//
// API NOTE: The recommended usecase is when the encryption cipher-key is updated by the external
// keyManagementSolution to limit an encryption key's lifetime
void insertCipherKey(const EncryptCipherDomainId& domainId,
const EncryptCipherBaseKeyId& baseCipherId,
const uint8_t* baseCipher,
int baseCipherLen);
// API returns the last insert cipherKey for a given encyryption domain Id.
// Enable clients to insert base encryption cipher details into the BlobCipherKeyCache.
// The cipherKeys are indexed using 'baseCipherId'; given cipherKeys are immutable,
// attempting to re-insert the same 'identical' cipherKey is treated as a NOP (success);
// however, an attempt to update a cipherKey would throw an 'encrypt_update_cipher' exception.
//
// API NOTE: The recommended use case is re-populating a cipher-key while performing decryption.
// The encryption header contains the relevant details, including: 'encryptDomainId',
// 'baseCipherId' & 'salt'. The caller needs to fetch the 'baseCipherKey' details and re-populate
// the KeyCache. Note that this invocation will NOT update the latest cipher-key details.
void insertCipherKey(const EncryptCipherDomainId& domainId,
const EncryptCipherBaseKeyId& baseCipherId,
const uint8_t* baseCipher,
int baseCipherLen,
const EncryptCipherRandomSalt& salt);
// API returns the last inserted cipherKey for a given encryption domain Id.
// If none exists, it throws an 'encrypt_key_not_found' exception.
Reference<BlobCipherKey> getLatestCipherKey(const EncryptCipherDomainId& domainId);
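A hedged sketch of the encryption-path round trip, using only signatures declared in this class ('domainId', 'baseCipherId', 'baseCipher' and 'baseCipherLen' are assumed to come from the external key-management solution):

// Register the latest base cipher for the domain, then fetch it for encryption.
Reference<BlobCipherKeyCache> cipherKeyCache = BlobCipherKeyCache::getInstance();
cipherKeyCache->insertCipherKey(domainId, baseCipherId, baseCipher, baseCipherLen);
Reference<BlobCipherKey> latestKey = cipherKeyCache->getLatestCipherKey(domainId);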
@ -291,14 +352,16 @@ public:
// If none exists, it throws an 'encrypt_key_not_found' exception.
Reference<BlobCipherKey> getCipherKey(const EncryptCipherDomainId& domainId,
const EncryptCipherBaseKeyId& baseCipherId);
const EncryptCipherBaseKeyId& baseCipherId,
const EncryptCipherRandomSalt& salt);
// API returns a point-in-time list of all 'cached' cipherKeys for a given encryption domainId.
std::vector<Reference<BlobCipherKey>> getAllCiphers(const EncryptCipherDomainId& domainId);
// API enables dropping all 'cached' cipherKeys for a given encryption domain Id.
// Useful for cleaning up the cache if an encryption domain gets removed/destroyed, etc.
void resetEncyrptDomainId(const EncryptCipherDomainId domainId);
void resetEncryptDomainId(const EncryptCipherDomainId domainId);
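For illustration, a one-line sketch of the cleanup path this enables (assuming 'domainId' refers to a domain that was just removed):

// Drop every cached cipherKey for the destroyed encryption domain.
BlobCipherKeyCache::getInstance()->resetEncryptDomainId(domainId);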
static Reference<BlobCipherKeyCache> getInstance() {
if (g_network->isSimulated()) {
@ -364,7 +427,7 @@ public:
const BlobCipherEncryptHeader& header,
Arena&);
// Enable caller to validate encryption header auth-token (if available) without needing to read the full encyrpted
// Enable caller to validate encryption header auth-token (if available) without needing to read the full encrypted
// payload. The call is a NOP unless header.flags.authTokenMode == ENCRYPT_HEADER_AUTH_TOKEN_MODE_MULTI.
void verifyHeaderAuthToken(const BlobCipherEncryptHeader& header, Arena& arena);
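A hedged sketch of the early-validation pattern this enables ('decryptor' is an illustrative name for the decryption helper, whose declaration is truncated in this hunk):

// NOP unless header.flags.authTokenMode == ENCRYPT_HEADER_AUTH_TOKEN_MODE_MULTI;
// throws 'encrypt_header_authtoken_mismatch' on a bad token, before the caller
// spends any effort fetching or decrypting the full payload.
Arena arena;
decryptor.verifyHeaderAuthToken(header, arena);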

View File

@ -27,6 +27,7 @@
#define ENCRYPT_INVALID_DOMAIN_ID 0
#define ENCRYPT_INVALID_CIPHER_KEY_ID 0
#define ENCRYPT_INVALID_RANDOM_SALT 0
#define AUTH_TOKEN_SIZE 16

View File

@ -306,7 +306,7 @@ ERROR( encrypt_key_not_found, 2702, "Expected encryption key is missing")
ERROR( encrypt_key_ttl_expired, 2703, "Expected encryption key TTL has expired")
ERROR( encrypt_header_authtoken_mismatch, 2704, "Encryption header authentication token mismatch")
ERROR( encrypt_update_cipher, 2705, "Attempt to update encryption cipher key")
ERROR( encrypt_invalid_id, 2706, "Invalid encryption domainId or encryption cipher key id")
ERROR( encrypt_invalid_id, 2706, "Invalid encryption cipher details")
// 4xxx Internal errors (those that should be generated only by bugs) are decimal 4xxx
ERROR( unknown_error, 4000, "An unknown error occurred" ) // C++ exception not of type Error
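Since several of the cache APIs above are documented to throw these errors, a hedged sketch of the expected caller pattern follows (it assumes the conventional 'error_code_<name>' constants generated by the ERROR macro, plus the getCipherKey signature declared earlier in this diff):

try {
    Reference<BlobCipherKey> cipherKey = cipherKeyCache->getCipherKey(domainId, baseCipherId, salt);
} catch (Error& e) {
    if (e.code() != error_code_encrypt_key_not_found) {
        throw;
    }
    // Cache miss: fetch the base cipher from the KMS, insertCipherKey(), and retry the lookup.
}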

View File

@ -66,6 +66,7 @@ public-address = {ip_address}:$ID
listen-address = public
datadir = {datadir}/$ID
logdir = {logdir}
{bg_knob_line}
# logsize = 10MiB
# maxlogssize = 100MiB
# machine-id =
@ -82,7 +83,7 @@ logdir = {logdir}
"""
def __init__(self, basedir: str, fdbserver_binary: str, fdbmonitor_binary: str, fdbcli_binary: str,
process_number: int, create_config=True, port=None, ip_address=None):
process_number: int, create_config=True, port=None, ip_address=None, blob_granules_enabled: bool=False):
self.basedir = Path(basedir)
self.etc = self.basedir.joinpath('etc')
self.log = self.basedir.joinpath('log')
@ -100,6 +101,11 @@ logdir = {logdir}
self.process_number = process_number
self.ip_address = '127.0.0.1' if ip_address is None else ip_address
self.first_port = port
self.blob_granules_enabled = blob_granules_enabled
if (blob_granules_enabled):
# add extra process for blob_worker
self.process_number += 1
if (self.first_port is not None):
self.last_used_port = int(self.first_port)-1
self.server_ports = [self.__next_port()
@ -111,6 +117,7 @@ logdir = {logdir}
self.process = None
self.fdbmonitor_logfile = None
self.use_legacy_conf_syntax = False
if create_config:
self.create_cluster_file()
self.save_config()
@ -126,14 +133,18 @@ logdir = {logdir}
new_conf_file = self.conf_file.parent / (self.conf_file.name + '.new')
with open(new_conf_file, 'x') as f:
conf_template = LocalCluster.configuration_template
bg_knob_line = ""
if (self.use_legacy_conf_syntax):
conf_template = conf_template.replace("-", "_")
if (self.blob_granules_enabled):
bg_knob_line = "knob_bg_url=file://" + str(self.data) + "/fdbblob/"
f.write(conf_template.format(
etcdir=self.etc,
fdbserver_bin=self.fdbserver_binary,
datadir=self.data,
logdir=self.log,
ip_address=self.ip_address
ip_address=self.ip_address,
bg_knob_line=bg_knob_line
))
# By default, the cluster only has one process
# If a port number is given and process_number > 1, we will use subsequent numbers
@ -143,6 +154,9 @@ logdir = {logdir}
for port in self.server_ports:
f.write('[fdbserver.{server_port}]\n'.format(
server_port=port))
if (self.blob_granules_enabled):
# make last process a blob_worker class
f.write('class = blob_worker')
f.flush()
os.fsync(f.fileno())
@ -202,12 +216,21 @@ logdir = {logdir}
db_config = 'configure new single {}'.format(storage)
if (enable_tenants):
db_config += " tenant_mode=optional_experimental"
if (self.blob_granules_enabled):
db_config += " blob_granules_enabled:=1"
args = [self.fdbcli_binary, '-C',
self.cluster_file, '--exec', db_config]
res = subprocess.run(args, env=self.process_env())
assert res.returncode == 0, "Create database failed with {}".format(
res.returncode)
if (self.blob_granules_enabled):
bg_args = [self.fdbcli_binary, '-C',
self.cluster_file, '--exec', 'blobrange start \\x00 \\xff']
bg_res = subprocess.run(bg_args, env=self.process_env())
assert bg_res.returncode == 0, "Start blob granules failed with {}".format(bg_res.returncode)
def get_status(self):
args = [self.fdbcli_binary, '-C', self.cluster_file, '--exec',
'status json']

View File

@ -11,7 +11,7 @@ from pathlib import Path
class TempCluster:
def __init__(self, build_dir: str, process_number: int = 1, port: str = None):
def __init__(self, build_dir: str, process_number: int = 1, port: str = None, blob_granules_enabled: bool = False):
self.build_dir = Path(build_dir).resolve()
assert self.build_dir.exists(), "{} does not exist".format(build_dir)
assert self.build_dir.is_dir(), "{} is not a directory".format(build_dir)
@ -27,6 +27,7 @@ class TempCluster:
self.build_dir.joinpath("bin", "fdbcli"),
process_number,
port=port,
blob_granules_enabled=blob_granules_enabled
)
self.log = self.cluster.log
self.etc = self.cluster.etc
@ -88,9 +89,14 @@ if __name__ == "__main__":
help='Do not dump cluster log on error',
action="store_true"
)
parser.add_argument(
'--blob-granules-enabled',
help='Enable blob granules',
action="store_true"
)
args = parser.parse_args()
errcode = 1
with TempCluster(args.build_dir, args.process_number) as cluster:
with TempCluster(args.build_dir, args.process_number, blob_granules_enabled=args.blob_granules_enabled) as cluster:
print("log-dir: {}".format(cluster.log))
print("etc-dir: {}".format(cluster.etc))
print("data-dir: {}".format(cluster.data))
@ -105,6 +111,8 @@ if __name__ == "__main__":
cmd_args.append(str(cluster.log))
elif cmd == "@ETC_DIR@":
cmd_args.append(str(cluster.etc))
elif cmd.startswith("@DATA_DIR@"):
cmd_args.append(str(cluster.data) + cmd[len("@DATA_DIR@"):])
else:
cmd_args.append(cmd)
env = dict(**os.environ)