Merge remote-tracking branch 'origin/main' into authz-code-probe-coverage
This commit is contained in:
commit
fa154ff978
|
@ -198,7 +198,7 @@ if(NOT WIN32)
|
|||
target_link_libraries(fdb_c_client_memory_test PRIVATE fdb_c Threads::Threads)
|
||||
|
||||
target_include_directories(fdb_c_api_tester_impl PUBLIC ${CMAKE_CURRENT_SOURCE_DIR} ${CMAKE_CURRENT_BINARY_DIR}/foundationdb/ ${CMAKE_SOURCE_DIR}/flow/include ${CMAKE_BINARY_DIR}/flow/include)
|
||||
target_link_libraries(fdb_c_api_tester_impl PRIVATE fdb_cpp toml11_target Threads::Threads fmt::fmt boost_target)
|
||||
target_link_libraries(fdb_c_api_tester_impl PRIVATE fdb_cpp toml11_target Threads::Threads fmt::fmt boost_target stdc++fs)
|
||||
target_link_libraries(fdb_c_api_tester_impl PRIVATE SimpleOpt)
|
||||
|
||||
target_include_directories(fdb_c_unit_tests_impl PUBLIC ${CMAKE_CURRENT_SOURCE_DIR} ${CMAKE_CURRENT_BINARY_DIR}/foundationdb/)
|
||||
|
|
|
@ -107,11 +107,11 @@ void ApiWorkload::randomOperation(TTaskFct cont) {
|
|||
}
|
||||
|
||||
fdb::Key ApiWorkload::randomKeyName() {
|
||||
return keyPrefix + Random::get().randomStringLowerCase(minKeyLength, maxKeyLength);
|
||||
return keyPrefix + Random::get().randomByteStringLowerCase(minKeyLength, maxKeyLength);
|
||||
}
|
||||
|
||||
fdb::Value ApiWorkload::randomValue() {
|
||||
return Random::get().randomStringLowerCase(minValueLength, maxValueLength);
|
||||
return Random::get().randomByteStringLowerCase(minValueLength, maxValueLength);
|
||||
}
|
||||
|
||||
fdb::Key ApiWorkload::randomNotExistingKey() {
|
||||
|
|
|
@ -78,7 +78,7 @@ private:
|
|||
seenReadSuccess = true;
|
||||
}
|
||||
if (err.code() != expectedError) {
|
||||
info(fmt::format("incorrect error. Expected {}, Got {}", err.code(), expectedError));
|
||||
info(fmt::format("incorrect error. Expected {}, Got {}", expectedError, err.code()));
|
||||
if (err.code() == error_code_blob_granule_transaction_too_old) {
|
||||
ASSERT(!seenReadSuccess);
|
||||
ctx->done();
|
||||
|
|
|
@ -35,8 +35,8 @@ public:
|
|||
void start() override { setAndGet(NO_OP_TASK); }
|
||||
|
||||
void setAndGet(TTaskFct cont) {
|
||||
fdb::Key key = keyPrefix + random.randomStringLowerCase(10, 100);
|
||||
fdb::Value value = random.randomStringLowerCase(10, 1000);
|
||||
fdb::Key key = keyPrefix + random.randomByteStringLowerCase(10, 100);
|
||||
fdb::Value value = random.randomByteStringLowerCase(10, 1000);
|
||||
execTransaction(
|
||||
[key, value](auto ctx) {
|
||||
ctx->tx().set(key, value);
|
||||
|
|
|
@ -65,6 +65,10 @@ std::unordered_map<std::string, std::function<void(const std::string& value, Tes
|
|||
[](const std::string& value, TestSpec* spec) { //
|
||||
spec->databasePerTransaction = (value == "true");
|
||||
} },
|
||||
{ "tamperClusterFile",
|
||||
[](const std::string& value, TestSpec* spec) { //
|
||||
spec->tamperClusterFile = (value == "true");
|
||||
} },
|
||||
{ "minFdbThreads",
|
||||
[](const std::string& value, TestSpec* spec) { //
|
||||
processIntOption(value, "minFdbThreads", spec->minFdbThreads, 1, 1000);
|
||||
|
|
|
@ -58,6 +58,9 @@ struct TestSpec {
|
|||
// Execute each transaction in a separate database instance
|
||||
bool databasePerTransaction = false;
|
||||
|
||||
// Test tampering the cluster file
|
||||
bool tamperClusterFile = false;
|
||||
|
||||
// Size of the FDB client thread pool (a random number in the [min,max] range)
|
||||
int minFdbThreads = 1;
|
||||
int maxFdbThreads = 1;
|
||||
|
|
|
@ -31,6 +31,7 @@
|
|||
#include <chrono>
|
||||
#include <thread>
|
||||
#include <fmt/format.h>
|
||||
#include <filesystem>
|
||||
|
||||
namespace FdbApiTester {
|
||||
|
||||
|
@ -84,7 +85,7 @@ public:
|
|||
Random::get().randomBool(executor->getOptions().databaseCreateErrorRatio);
|
||||
fdb::Database db;
|
||||
if (databaseCreateErrorInjected) {
|
||||
db = fdb::Database("not_existing_file");
|
||||
db = fdb::Database(executor->getClusterFileForErrorInjection());
|
||||
} else {
|
||||
db = executor->selectDatabase();
|
||||
}
|
||||
|
@ -97,7 +98,7 @@ public:
|
|||
// IN_PROGRESS -> (ON_ERROR -> IN_PROGRESS)* [-> ON_ERROR] -> DONE
|
||||
enum class TxState { IN_PROGRESS, ON_ERROR, DONE };
|
||||
|
||||
fdb::Transaction tx() override { return fdbTx; }
|
||||
fdb::Transaction tx() override { return fdbTx.atomic_load(); }
|
||||
|
||||
// Set a continuation to be executed when a future gets ready
|
||||
void continueAfter(fdb::Future f, TTaskFct cont, bool retryOnError) override {
|
||||
|
@ -135,9 +136,11 @@ public:
|
|||
retriedErrors.size(),
|
||||
fmt::join(retriedErrorCodes(), ", "));
|
||||
}
|
||||
|
||||
// cancel transaction so that any pending operations on it
|
||||
// fail gracefully
|
||||
fdbTx.cancel();
|
||||
|
||||
txActor->complete(fdb::Error::success());
|
||||
cleanUp();
|
||||
ASSERT(txState == TxState::DONE);
|
||||
|
@ -164,13 +167,12 @@ public:
|
|||
|
||||
ASSERT(!onErrorFuture);
|
||||
|
||||
if (databaseCreateErrorInjected) {
|
||||
if (databaseCreateErrorInjected && canBeInjectedDatabaseCreateError(err.code())) {
|
||||
// Failed to create a database because of failure injection
|
||||
// Restart by recreating the transaction in a valid database
|
||||
scheduler->schedule([this]() {
|
||||
databaseCreateErrorInjected = false;
|
||||
fdb::Database db = executor->selectDatabase();
|
||||
fdbTx = db.createTransaction();
|
||||
fdbTx.atomic_store(db.createTransaction());
|
||||
restartTransaction();
|
||||
});
|
||||
} else {
|
||||
|
@ -188,10 +190,17 @@ protected:
|
|||
// Clean up transaction state after completing the transaction
|
||||
// Note that the object may live longer, because it is referenced
|
||||
// by not yet triggered callbacks
|
||||
virtual void cleanUp() {
|
||||
void cleanUp() {
|
||||
ASSERT(txState == TxState::DONE);
|
||||
ASSERT(!onErrorFuture);
|
||||
txActor = {};
|
||||
cancelPendingFutures();
|
||||
}
|
||||
|
||||
virtual void cancelPendingFutures() {}
|
||||
|
||||
bool canBeInjectedDatabaseCreateError(fdb::Error::CodeType errCode) {
|
||||
return errCode == error_code_no_cluster_file_found || errCode == error_code_connection_string_invalid;
|
||||
}
|
||||
|
||||
// Complete the transaction with an (unretriable) error
|
||||
|
@ -225,8 +234,9 @@ protected:
|
|||
}
|
||||
|
||||
void restartTransaction() {
|
||||
std::unique_lock<std::mutex> lock(mutex);
|
||||
ASSERT(txState == TxState::ON_ERROR);
|
||||
cancelPendingFutures();
|
||||
std::unique_lock<std::mutex> lock(mutex);
|
||||
txState = TxState::IN_PROGRESS;
|
||||
commitCalled = false;
|
||||
lock.unlock();
|
||||
|
@ -415,7 +425,7 @@ protected:
|
|||
if (txState != TxState::IN_PROGRESS) {
|
||||
return;
|
||||
}
|
||||
callbackMap[f] = CallbackInfo{ f, cont, shared_from_this(), retryOnError, timeNow() };
|
||||
callbackMap[f] = CallbackInfo{ f, cont, shared_from_this(), retryOnError, timeNow(), false };
|
||||
lock.unlock();
|
||||
try {
|
||||
f.then([this](fdb::Future f) { futureReadyCallback(f, this); });
|
||||
|
@ -462,7 +472,7 @@ protected:
|
|||
err.code(),
|
||||
err.what());
|
||||
}
|
||||
if (err.code() == error_code_transaction_cancelled) {
|
||||
if (err.code() == error_code_transaction_cancelled || cbInfo.cancelled) {
|
||||
return;
|
||||
}
|
||||
if (err.code() == error_code_success || !cbInfo.retryOnError) {
|
||||
|
@ -518,17 +528,17 @@ protected:
|
|||
scheduler->schedule([thisRef]() { thisRef->handleOnErrorResult(); });
|
||||
}
|
||||
|
||||
void cleanUp() override {
|
||||
TransactionContextBase::cleanUp();
|
||||
|
||||
void cancelPendingFutures() override {
|
||||
// Cancel all pending operations
|
||||
// Note that the callbacks of the cancelled futures will still be called
|
||||
std::unique_lock<std::mutex> lock(mutex);
|
||||
std::vector<fdb::Future> futures;
|
||||
for (auto& iter : callbackMap) {
|
||||
iter.second.cancelled = true;
|
||||
futures.push_back(iter.second.future);
|
||||
}
|
||||
lock.unlock();
|
||||
|
||||
for (auto& f : futures) {
|
||||
f.cancel();
|
||||
}
|
||||
|
@ -548,6 +558,7 @@ protected:
|
|||
std::shared_ptr<ITransactionContext> thisRef;
|
||||
bool retryOnError;
|
||||
TimePoint startTime;
|
||||
bool cancelled;
|
||||
};
|
||||
|
||||
// Map for keeping track of future waits and holding necessary object references
|
||||
|
@ -567,10 +578,53 @@ class TransactionExecutorBase : public ITransactionExecutor {
|
|||
public:
|
||||
TransactionExecutorBase(const TransactionExecutorOptions& options) : options(options), scheduler(nullptr) {}
|
||||
|
||||
~TransactionExecutorBase() {
|
||||
if (tamperClusterFileThread.joinable()) {
|
||||
tamperClusterFileThread.join();
|
||||
}
|
||||
}
|
||||
|
||||
void init(IScheduler* scheduler, const char* clusterFile, const std::string& bgBasePath) override {
|
||||
this->scheduler = scheduler;
|
||||
this->clusterFile = clusterFile;
|
||||
this->bgBasePath = bgBasePath;
|
||||
|
||||
ASSERT(!options.tmpDir.empty());
|
||||
emptyClusterFile.create(options.tmpDir, "fdbempty.cluster");
|
||||
invalidClusterFile.create(options.tmpDir, "fdbinvalid.cluster");
|
||||
invalidClusterFile.write(Random().get().randomStringLowerCase<std::string>(1, 100));
|
||||
|
||||
emptyListClusterFile.create(options.tmpDir, "fdbemptylist.cluster");
|
||||
emptyListClusterFile.write(fmt::format("{}:{}@",
|
||||
Random().get().randomStringLowerCase<std::string>(3, 8),
|
||||
Random().get().randomStringLowerCase<std::string>(1, 100)));
|
||||
|
||||
if (options.tamperClusterFile) {
|
||||
tamperedClusterFile.create(options.tmpDir, "fdb.cluster");
|
||||
originalClusterFile = clusterFile;
|
||||
this->clusterFile = tamperedClusterFile.getFileName();
|
||||
|
||||
// begin with a valid cluster file, but with non existing address
|
||||
tamperedClusterFile.write(fmt::format("{}:{}@192.168.{}.{}:{}",
|
||||
Random().get().randomStringLowerCase<std::string>(3, 8),
|
||||
Random().get().randomStringLowerCase<std::string>(1, 100),
|
||||
Random().get().randomInt(1, 254),
|
||||
Random().get().randomInt(1, 254),
|
||||
Random().get().randomInt(2000, 10000)));
|
||||
|
||||
tamperClusterFileThread = std::thread([this]() {
|
||||
std::this_thread::sleep_for(std::chrono::seconds(2));
|
||||
// now write an invalid connection string
|
||||
tamperedClusterFile.write(fmt::format("{}:{}@",
|
||||
Random().get().randomStringLowerCase<std::string>(3, 8),
|
||||
Random().get().randomStringLowerCase<std::string>(1, 100)));
|
||||
std::this_thread::sleep_for(std::chrono::seconds(2));
|
||||
// finally use correct cluster file contents
|
||||
std::filesystem::copy_file(std::filesystem::path(originalClusterFile),
|
||||
std::filesystem::path(tamperedClusterFile.getFileName()),
|
||||
std::filesystem::copy_options::overwrite_existing);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
const TransactionExecutorOptions& getOptions() override { return options; }
|
||||
|
@ -593,11 +647,30 @@ public:
|
|||
}
|
||||
}
|
||||
|
||||
std::string getClusterFileForErrorInjection() override {
|
||||
switch (Random::get().randomInt(0, 3)) {
|
||||
case 0:
|
||||
return fmt::format("{}{}", "not-existing-file", Random::get().randomStringLowerCase<std::string>(0, 2));
|
||||
case 1:
|
||||
return emptyClusterFile.getFileName();
|
||||
case 2:
|
||||
return invalidClusterFile.getFileName();
|
||||
default: // case 3
|
||||
return emptyListClusterFile.getFileName();
|
||||
}
|
||||
}
|
||||
|
||||
protected:
|
||||
TransactionExecutorOptions options;
|
||||
std::string bgBasePath;
|
||||
std::string clusterFile;
|
||||
IScheduler* scheduler;
|
||||
TmpFile emptyClusterFile;
|
||||
TmpFile invalidClusterFile;
|
||||
TmpFile emptyListClusterFile;
|
||||
TmpFile tamperedClusterFile;
|
||||
std::thread tamperClusterFileThread;
|
||||
std::string originalClusterFile;
|
||||
};
|
||||
|
||||
/**
|
||||
|
@ -612,7 +685,7 @@ public:
|
|||
void init(IScheduler* scheduler, const char* clusterFile, const std::string& bgBasePath) override {
|
||||
TransactionExecutorBase::init(scheduler, clusterFile, bgBasePath);
|
||||
for (int i = 0; i < options.numDatabases; i++) {
|
||||
fdb::Database db(clusterFile);
|
||||
fdb::Database db(this->clusterFile);
|
||||
databases.push_back(db);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -127,8 +127,11 @@ struct TransactionExecutorOptions {
|
|||
// Enable injection of database create errors
|
||||
bool injectDatabaseCreateErrors = false;
|
||||
|
||||
// Test tampering cluster file contents
|
||||
bool tamperClusterFile = false;
|
||||
|
||||
// The probability of injected database create errors
|
||||
// Used if buggify = true
|
||||
// Used if injectDatabaseCreateErrors = true
|
||||
double databaseCreateErrorRatio = 0.1;
|
||||
|
||||
// The size of the database instance pool
|
||||
|
@ -136,6 +139,9 @@ struct TransactionExecutorOptions {
|
|||
|
||||
// Maximum number of retries per transaction (0 - unlimited)
|
||||
int transactionRetryLimit = 0;
|
||||
|
||||
// Temporary directory
|
||||
std::string tmpDir;
|
||||
};
|
||||
|
||||
/**
|
||||
|
@ -149,6 +155,7 @@ public:
|
|||
virtual void init(IScheduler* sched, const char* clusterFile, const std::string& bgBasePath) = 0;
|
||||
virtual void execute(std::shared_ptr<ITransactionActor> tx, TTaskFct cont) = 0;
|
||||
virtual fdb::Database selectDatabase() = 0;
|
||||
virtual std::string getClusterFileForErrorInjection() = 0;
|
||||
virtual const TransactionExecutorOptions& getOptions() = 0;
|
||||
};
|
||||
|
||||
|
|
|
@ -23,6 +23,9 @@
|
|||
#include <algorithm>
|
||||
#include <ctype.h>
|
||||
#include <chrono>
|
||||
#include <filesystem>
|
||||
#include <fstream>
|
||||
#include <string>
|
||||
|
||||
namespace FdbApiTester {
|
||||
|
||||
|
@ -46,16 +49,6 @@ Random& Random::get() {
|
|||
return random;
|
||||
}
|
||||
|
||||
fdb::ByteString Random::randomStringLowerCase(int minLength, int maxLength) {
|
||||
int length = randomInt(minLength, maxLength);
|
||||
fdb::ByteString str;
|
||||
str.reserve(length);
|
||||
for (int i = 0; i < length; i++) {
|
||||
str += (char)randomInt('a', 'z');
|
||||
}
|
||||
return str;
|
||||
}
|
||||
|
||||
bool Random::randomBool(double trueRatio) {
|
||||
return std::uniform_real_distribution<double>(0.0, 1.0)(random) <= trueRatio;
|
||||
}
|
||||
|
@ -119,4 +112,39 @@ GranuleSummaryArray copyGranuleSummaryArray(fdb::future_var::GranuleSummaryRefAr
|
|||
return out;
|
||||
};
|
||||
|
||||
TmpFile::~TmpFile() {
|
||||
if (!filename.empty()) {
|
||||
remove();
|
||||
}
|
||||
}
|
||||
|
||||
void TmpFile::create(std::string_view dir, std::string_view prefix) {
|
||||
while (true) {
|
||||
filename = fmt::format("{}/{}-{}", dir, prefix, Random::get().randomStringLowerCase<std::string>(6, 6));
|
||||
if (!std::filesystem::exists(std::filesystem::path(filename))) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// Create an empty tmp file
|
||||
std::fstream tmpFile(filename, std::fstream::out);
|
||||
if (!tmpFile.good()) {
|
||||
throw TesterError(fmt::format("Failed to create temporary file {}\n", filename));
|
||||
}
|
||||
}
|
||||
|
||||
void TmpFile::write(std::string_view data) {
|
||||
std::ofstream ofs(filename, std::fstream::out | std::fstream::binary);
|
||||
if (!ofs.good()) {
|
||||
throw TesterError(fmt::format("Failed to write to the temporary file {}\n", filename));
|
||||
}
|
||||
ofs.write(data.data(), data.size());
|
||||
}
|
||||
|
||||
void TmpFile::remove() {
|
||||
if (!std::filesystem::remove(std::filesystem::path(filename))) {
|
||||
fmt::print(stderr, "Failed to remove file {}\n", filename);
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace FdbApiTester
|
|
@ -66,7 +66,20 @@ public:
|
|||
|
||||
int randomInt(int min, int max);
|
||||
|
||||
fdb::ByteString randomStringLowerCase(int minLength, int maxLength);
|
||||
template <class StringType>
|
||||
StringType randomStringLowerCase(int minLength, int maxLength) {
|
||||
int length = randomInt(minLength, maxLength);
|
||||
StringType str;
|
||||
str.reserve(length);
|
||||
for (int i = 0; i < length; i++) {
|
||||
str += (char)randomInt('a', 'z');
|
||||
}
|
||||
return str;
|
||||
}
|
||||
|
||||
fdb::ByteString randomByteStringLowerCase(int minLength, int maxLength) {
|
||||
return randomStringLowerCase<fdb::ByteString>(minLength, maxLength);
|
||||
}
|
||||
|
||||
bool randomBool(double trueRatio);
|
||||
|
||||
|
@ -142,6 +155,19 @@ static fdb::ByteString toByteString(T value) {
|
|||
return output;
|
||||
}
|
||||
|
||||
// Creates a temporary file; file gets destroyed/deleted along with object destruction.
|
||||
struct TmpFile {
|
||||
public:
|
||||
~TmpFile();
|
||||
void create(std::string_view dir, std::string_view prefix);
|
||||
void write(std::string_view data);
|
||||
void remove();
|
||||
const std::string& getFileName() const { return filename; }
|
||||
|
||||
private:
|
||||
std::string filename;
|
||||
};
|
||||
|
||||
} // namespace FdbApiTester
|
||||
|
||||
#endif
|
||||
|
|
|
@ -361,6 +361,8 @@ bool runWorkloads(TesterOptions& options) {
|
|||
// 7.1 and older releases crash on database create errors
|
||||
txExecOptions.injectDatabaseCreateErrors = options.testSpec.buggify && options.apiVersion > 710;
|
||||
txExecOptions.transactionRetryLimit = options.transactionRetryLimit;
|
||||
txExecOptions.tmpDir = options.tmpDir.empty() ? std::string("/tmp") : options.tmpDir;
|
||||
txExecOptions.tamperClusterFile = options.testSpec.tamperClusterFile;
|
||||
|
||||
std::vector<std::shared_ptr<IWorkload>> workloads;
|
||||
workloads.reserve(options.testSpec.workloads.size() * options.numClients);
|
||||
|
|
|
@ -0,0 +1,24 @@
|
|||
[[test]]
|
||||
title = 'Test tampering the cluster file'
|
||||
multiThreaded = true
|
||||
buggify = true
|
||||
tamperClusterFile = true
|
||||
minFdbThreads = 2
|
||||
maxFdbThreads = 4
|
||||
minDatabases = 2
|
||||
maxDatabases = 4
|
||||
minClientThreads = 2
|
||||
maxClientThreads = 4
|
||||
minClients = 2
|
||||
maxClients = 4
|
||||
|
||||
[[test.workload]]
|
||||
name = 'ApiCorrectness'
|
||||
minKeyLength = 1
|
||||
maxKeyLength = 64
|
||||
minValueLength = 1
|
||||
maxValueLength = 1000
|
||||
maxKeysPerTransaction = 50
|
||||
initialSize = 100
|
||||
numRandomOperations = 100
|
||||
readExistingKeysRatio = 0.9
|
|
@ -505,6 +505,14 @@ public:
|
|||
Transaction(const Transaction&) noexcept = default;
|
||||
Transaction& operator=(const Transaction&) noexcept = default;
|
||||
|
||||
void atomic_store(Transaction other) { std::atomic_store(&tr, other.tr); }
|
||||
|
||||
Transaction atomic_load() {
|
||||
Transaction retVal;
|
||||
retVal.tr = std::atomic_load(&tr);
|
||||
return retVal;
|
||||
}
|
||||
|
||||
bool valid() const noexcept { return tr != nullptr; }
|
||||
|
||||
explicit operator bool() const noexcept { return valid(); }
|
||||
|
|
|
@ -200,6 +200,9 @@ else()
|
|||
URL "https://github.com/ToruNiina/toml11/archive/v3.4.0.tar.gz"
|
||||
URL_HASH SHA256=bc6d733efd9216af8c119d8ac64a805578c79cc82b813e4d1d880ca128bd154d
|
||||
CMAKE_CACHE_ARGS
|
||||
-DCMAKE_BUILD_TYPE:STRING=${CMAKE_BUILD_TYPE}
|
||||
-DCMAKE_C_COMPILER:FILEPATH=${CMAKE_C_COMPILER}
|
||||
-DCMAKE_CXX_COMPILER:FILEPATH=${CMAKE_CXX_COMPILER}
|
||||
-DCMAKE_INSTALL_PREFIX:PATH=${CMAKE_CURRENT_BINARY_DIR}/toml11
|
||||
-Dtoml11_BUILD_TEST:BOOL=OFF
|
||||
BUILD_ALWAYS ON)
|
||||
|
|
|
@ -14,7 +14,7 @@ ExternalProject_add(Jemalloc_project
|
|||
BUILD_BYPRODUCTS "${JEMALLOC_DIR}/include/jemalloc/jemalloc.h"
|
||||
"${JEMALLOC_DIR}/lib/libjemalloc.a"
|
||||
"${JEMALLOC_DIR}/lib/libjemalloc_pic.a"
|
||||
CONFIGURE_COMMAND ./configure --prefix=${JEMALLOC_DIR} --enable-static --disable-cxx --enable-prof
|
||||
CONFIGURE_COMMAND CC=${CMAKE_C_COMPILER} CXX=${CMAKE_CXX_COMPILER} ./configure --prefix=${JEMALLOC_DIR} --enable-static --disable-cxx --enable-prof
|
||||
BUILD_IN_SOURCE ON
|
||||
BUILD_COMMAND make
|
||||
INSTALL_DIR "${JEMALLOC_DIR}"
|
||||
|
@ -24,4 +24,4 @@ add_dependencies(im_jemalloc_pic Jemalloc_project)
|
|||
set_target_properties(im_jemalloc_pic PROPERTIES IMPORTED_LOCATION "${JEMALLOC_DIR}/lib/libjemalloc_pic.a")
|
||||
set_target_properties(im_jemalloc PROPERTIES IMPORTED_LOCATION "${JEMALLOC_DIR}/lib/libjemalloc.a")
|
||||
target_include_directories(jemalloc INTERFACE "${JEMALLOC_DIR}/include")
|
||||
target_link_libraries(jemalloc INTERFACE im_jemalloc_pic im_jemalloc)
|
||||
target_link_libraries(jemalloc INTERFACE im_jemalloc_pic im_jemalloc)
|
||||
|
|
|
@ -259,7 +259,6 @@ ACTOR Future<bool> configureCommandActor(Reference<IDatabase> db,
|
|||
fprintf(stderr,
|
||||
"Type `configure perpetual_storage_wiggle=1' to enable the perpetual wiggle, or `configure "
|
||||
"storage_migration_type=gradual' to set the gradual migration type.\n");
|
||||
ret = false;
|
||||
break;
|
||||
case ConfigurationResult::SUCCESS_WARN_ROCKSDB_EXPERIMENTAL:
|
||||
printf("Configuration changed\n");
|
||||
|
|
|
@ -18,8 +18,9 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#include "flow/BlobCipher.h"
|
||||
#include "fdbclient/BlobCipher.h"
|
||||
|
||||
#include "fdbclient/Knobs.h"
|
||||
#include "flow/Arena.h"
|
||||
#include "flow/EncryptUtils.h"
|
||||
#include "flow/Knobs.h"
|
||||
|
@ -27,6 +28,7 @@
|
|||
#include "flow/FastRef.h"
|
||||
#include "flow/IRandom.h"
|
||||
#include "flow/ITrace.h"
|
||||
#include "flow/Platform.h"
|
||||
#include "flow/flow.h"
|
||||
#include "flow/network.h"
|
||||
#include "flow/Trace.h"
|
||||
|
@ -54,6 +56,42 @@ bool isEncryptHeaderAuthTokenModeValid(const EncryptAuthTokenMode mode) {
|
|||
}
|
||||
} // namespace
|
||||
|
||||
// BlobCipherMetrics methods
|
||||
|
||||
BlobCipherMetrics::CounterSet::CounterSet(CounterCollection& cc, std::string name)
|
||||
: encryptCPUTimeNS(name + "EncryptCPUTimeNS", cc), decryptCPUTimeNS(name + "DecryptCPUTimeNS", cc),
|
||||
getCipherKeysLatency(name + "GetCipherKeysLatency",
|
||||
UID(),
|
||||
FLOW_KNOBS->ENCRYPT_KEY_CACHE_LOGGING_INTERVAL,
|
||||
FLOW_KNOBS->ENCRYPT_KEY_CACHE_LOGGING_SAMPLE_SIZE),
|
||||
getLatestCipherKeysLatency(name + "GetLatestCipherKeysLatency",
|
||||
UID(),
|
||||
FLOW_KNOBS->ENCRYPT_KEY_CACHE_LOGGING_INTERVAL,
|
||||
FLOW_KNOBS->ENCRYPT_KEY_CACHE_LOGGING_SAMPLE_SIZE) {}
|
||||
|
||||
BlobCipherMetrics::BlobCipherMetrics()
|
||||
: cc("BlobCipher"), cipherKeyCacheHit("CipherKeyCacheHit", cc), cipherKeyCacheMiss("CipherKeyCacheMiss", cc),
|
||||
cipherKeyCacheExpired("CipherKeyCacheExpired", cc), latestCipherKeyCacheHit("LatestCipherKeyCacheHit", cc),
|
||||
latestCipherKeyCacheMiss("LatestCipherKeyCacheMiss", cc),
|
||||
latestCipherKeyCacheNeedsRefresh("LatestCipherKeyCacheNeedsRefresh", cc),
|
||||
getCipherKeysLatency("GetCipherKeysLatency",
|
||||
UID(),
|
||||
FLOW_KNOBS->ENCRYPT_KEY_CACHE_LOGGING_INTERVAL,
|
||||
FLOW_KNOBS->ENCRYPT_KEY_CACHE_LOGGING_SAMPLE_SIZE),
|
||||
getLatestCipherKeysLatency("GetLatestCipherKeysLatency",
|
||||
UID(),
|
||||
FLOW_KNOBS->ENCRYPT_KEY_CACHE_LOGGING_INTERVAL,
|
||||
FLOW_KNOBS->ENCRYPT_KEY_CACHE_LOGGING_SAMPLE_SIZE),
|
||||
counterSets({ CounterSet(cc, "TLog"),
|
||||
CounterSet(cc, "KVMemory"),
|
||||
CounterSet(cc, "KVRedwood"),
|
||||
CounterSet(cc, "BlobGranule"),
|
||||
CounterSet(cc, "Backup"),
|
||||
CounterSet(cc, "Test") }) {
|
||||
specialCounter(cc, "CacheSize", []() { return BlobCipherKeyCache::getInstance()->getSize(); });
|
||||
traceFuture = traceCounters("BlobCipherMetrics", UID(), FLOW_KNOBS->ENCRYPT_KEY_CACHE_LOGGING_INTERVAL, &cc);
|
||||
}
|
||||
|
||||
// BlobCipherKey class methods
|
||||
|
||||
BlobCipherKey::BlobCipherKey(const EncryptCipherDomainId& domainId,
|
||||
|
@ -144,11 +182,9 @@ void BlobCipherKey::reset() {
|
|||
|
||||
// BlobKeyIdCache class methods
|
||||
|
||||
BlobCipherKeyIdCache::BlobCipherKeyIdCache()
|
||||
: domainId(ENCRYPT_INVALID_DOMAIN_ID), latestBaseCipherKeyId(), latestRandomSalt() {}
|
||||
|
||||
BlobCipherKeyIdCache::BlobCipherKeyIdCache(EncryptCipherDomainId dId)
|
||||
: domainId(dId), latestBaseCipherKeyId(), latestRandomSalt() {
|
||||
BlobCipherKeyIdCache::BlobCipherKeyIdCache(EncryptCipherDomainId dId, size_t* sizeStat)
|
||||
: domainId(dId), latestBaseCipherKeyId(), latestRandomSalt(), sizeStat(sizeStat) {
|
||||
ASSERT(sizeStat != nullptr);
|
||||
TraceEvent(SevInfo, "BlobCipher.KeyIdCacheInit").detail("DomainId", domainId);
|
||||
}
|
||||
|
||||
|
@ -223,6 +259,7 @@ Reference<BlobCipherKey> BlobCipherKeyIdCache::insertBaseCipherKey(const Encrypt
|
|||
latestBaseCipherKeyId = baseCipherId;
|
||||
latestRandomSalt = cipherKey->getSalt();
|
||||
|
||||
(*sizeStat)++;
|
||||
return cipherKey;
|
||||
}
|
||||
|
||||
|
@ -267,6 +304,7 @@ Reference<BlobCipherKey> BlobCipherKeyIdCache::insertBaseCipherKey(const Encrypt
|
|||
Reference<BlobCipherKey> cipherKey =
|
||||
makeReference<BlobCipherKey>(domainId, baseCipherId, baseCipher, baseCipherLen, salt, refreshAt, expireAt);
|
||||
keyIdCache.emplace(cacheKey, cipherKey);
|
||||
(*sizeStat)++;
|
||||
return cipherKey;
|
||||
}
|
||||
|
||||
|
@ -298,19 +336,19 @@ Reference<BlobCipherKey> BlobCipherKeyCache::insertCipherKey(const EncryptCipher
|
|||
throw encrypt_invalid_id();
|
||||
}
|
||||
|
||||
Reference<BlobCipherKey> cipherKey;
|
||||
|
||||
try {
|
||||
auto domainItr = domainCacheMap.find(domainId);
|
||||
if (domainItr == domainCacheMap.end()) {
|
||||
// Add mapping to track new encryption domain
|
||||
Reference<BlobCipherKeyIdCache> keyIdCache = makeReference<BlobCipherKeyIdCache>(domainId);
|
||||
Reference<BlobCipherKey> cipherKey =
|
||||
keyIdCache->insertBaseCipherKey(baseCipherId, baseCipher, baseCipherLen, refreshAt, expireAt);
|
||||
Reference<BlobCipherKeyIdCache> keyIdCache = makeReference<BlobCipherKeyIdCache>(domainId, &size);
|
||||
cipherKey = keyIdCache->insertBaseCipherKey(baseCipherId, baseCipher, baseCipherLen, refreshAt, expireAt);
|
||||
domainCacheMap.emplace(domainId, keyIdCache);
|
||||
return cipherKey;
|
||||
} else {
|
||||
// Track new baseCipher keys
|
||||
Reference<BlobCipherKeyIdCache> keyIdCache = domainItr->second;
|
||||
return keyIdCache->insertBaseCipherKey(baseCipherId, baseCipher, baseCipherLen, refreshAt, expireAt);
|
||||
cipherKey = keyIdCache->insertBaseCipherKey(baseCipherId, baseCipher, baseCipherLen, refreshAt, expireAt);
|
||||
}
|
||||
} catch (Error& e) {
|
||||
TraceEvent(SevWarn, "BlobCipher.InsertCipherKeyFailed")
|
||||
|
@ -318,6 +356,7 @@ Reference<BlobCipherKey> BlobCipherKeyCache::insertCipherKey(const EncryptCipher
|
|||
.detail("DomainId", domainId);
|
||||
throw;
|
||||
}
|
||||
return cipherKey;
|
||||
}
|
||||
|
||||
Reference<BlobCipherKey> BlobCipherKeyCache::insertCipherKey(const EncryptCipherDomainId& domainId,
|
||||
|
@ -337,7 +376,7 @@ Reference<BlobCipherKey> BlobCipherKeyCache::insertCipherKey(const EncryptCipher
|
|||
auto domainItr = domainCacheMap.find(domainId);
|
||||
if (domainItr == domainCacheMap.end()) {
|
||||
// Add mapping to track new encryption domain
|
||||
Reference<BlobCipherKeyIdCache> keyIdCache = makeReference<BlobCipherKeyIdCache>(domainId);
|
||||
Reference<BlobCipherKeyIdCache> keyIdCache = makeReference<BlobCipherKeyIdCache>(domainId, &size);
|
||||
cipherKey =
|
||||
keyIdCache->insertBaseCipherKey(baseCipherId, baseCipher, baseCipherLen, salt, refreshAt, expireAt);
|
||||
domainCacheMap.emplace(domainId, keyIdCache);
|
||||
|
@ -354,7 +393,6 @@ Reference<BlobCipherKey> BlobCipherKeyCache::insertCipherKey(const EncryptCipher
|
|||
.detail("Salt", salt);
|
||||
throw;
|
||||
}
|
||||
|
||||
return cipherKey;
|
||||
}
|
||||
|
||||
|
@ -373,15 +411,20 @@ Reference<BlobCipherKey> BlobCipherKeyCache::getLatestCipherKey(const EncryptCip
|
|||
Reference<BlobCipherKey> cipherKey = keyIdCache->getLatestCipherKey();
|
||||
|
||||
// Ensure 'freshness' guarantees for the latestCipher
|
||||
if (cipherKey.isValid() && cipherKey->needsRefresh()) {
|
||||
if (cipherKey.isValid()) {
|
||||
if (cipherKey->needsRefresh()) {
|
||||
#if BLOB_CIPHER_DEBUG
|
||||
TraceEvent("SevDebug, BlobCipher.GetLatestNeedsRefresh")
|
||||
.detail("DomainId", domainId)
|
||||
.detail("Now", now())
|
||||
.detail("RefreshAt", cipherKey->getRefreshAtTS());
|
||||
TraceEvent("SevDebug, BlobCipher.GetLatestNeedsRefresh")
|
||||
.detail("DomainId", domainId)
|
||||
.detail("Now", now())
|
||||
.detail("RefreshAt", cipherKey->getRefreshAtTS());
|
||||
#endif
|
||||
|
||||
return Reference<BlobCipherKey>();
|
||||
++BlobCipherMetrics::getInstance()->latestCipherKeyCacheNeedsRefresh;
|
||||
return Reference<BlobCipherKey>();
|
||||
}
|
||||
++BlobCipherMetrics::getInstance()->latestCipherKeyCacheHit;
|
||||
} else {
|
||||
++BlobCipherMetrics::getInstance()->latestCipherKeyCacheMiss;
|
||||
}
|
||||
|
||||
return cipherKey;
|
||||
|
@ -399,16 +442,21 @@ Reference<BlobCipherKey> BlobCipherKeyCache::getCipherKey(const EncryptCipherDom
|
|||
Reference<BlobCipherKey> cipherKey = keyIdCache->getCipherByBaseCipherId(baseCipherId, salt);
|
||||
|
||||
// Ensure 'liveness' guarantees for the cipher
|
||||
if (cipherKey.isValid() && cipherKey->isExpired()) {
|
||||
if (cipherKey.isValid()) {
|
||||
if (cipherKey->isExpired()) {
|
||||
#if BLOB_CIPHER_DEBUG
|
||||
TraceEvent(SevDebug, "BlobCipher.GetCipherExpired")
|
||||
.detail("DomainId", domainId)
|
||||
.detail("BaseCipherId", baseCipherId)
|
||||
.detail("Now", now())
|
||||
.detail("ExpireAt", cipherKey->getExpireAtTS());
|
||||
TraceEvent(SevDebug, "BlobCipher.GetCipherExpired")
|
||||
.detail("DomainId", domainId)
|
||||
.detail("BaseCipherId", baseCipherId)
|
||||
.detail("Now", now())
|
||||
.detail("ExpireAt", cipherKey->getExpireAtTS());
|
||||
#endif
|
||||
|
||||
return Reference<BlobCipherKey>();
|
||||
++BlobCipherMetrics::getInstance()->cipherKeyCacheExpired;
|
||||
return Reference<BlobCipherKey>();
|
||||
}
|
||||
++BlobCipherMetrics::getInstance()->cipherKeyCacheHit;
|
||||
} else {
|
||||
++BlobCipherMetrics::getInstance()->cipherKeyCacheMiss;
|
||||
}
|
||||
|
||||
return cipherKey;
|
||||
|
@ -421,6 +469,8 @@ void BlobCipherKeyCache::resetEncryptDomainId(const EncryptCipherDomainId domain
|
|||
}
|
||||
|
||||
Reference<BlobCipherKeyIdCache> keyIdCache = domainItr->second;
|
||||
ASSERT(keyIdCache->getSize() <= size);
|
||||
size -= keyIdCache->getSize();
|
||||
keyIdCache->cleanup();
|
||||
TraceEvent(SevInfo, "BlobCipher.ResetEncryptDomainId").detail("DomainId", domainId);
|
||||
}
|
||||
|
@ -437,6 +487,7 @@ void BlobCipherKeyCache::cleanup() noexcept {
|
|||
}
|
||||
|
||||
instance->domainCacheMap.clear();
|
||||
instance->size = 0;
|
||||
}
|
||||
|
||||
std::vector<Reference<BlobCipherKey>> BlobCipherKeyCache::getAllCiphers(const EncryptCipherDomainId& domainId) {
|
||||
|
@ -455,8 +506,10 @@ EncryptBlobCipherAes265Ctr::EncryptBlobCipherAes265Ctr(Reference<BlobCipherKey>
|
|||
Reference<BlobCipherKey> hCipherKey,
|
||||
const uint8_t* cipherIV,
|
||||
const int ivLen,
|
||||
const EncryptAuthTokenMode mode)
|
||||
: ctx(EVP_CIPHER_CTX_new()), textCipherKey(tCipherKey), headerCipherKey(hCipherKey), authTokenMode(mode) {
|
||||
const EncryptAuthTokenMode mode,
|
||||
BlobCipherMetrics::UsageType usageType)
|
||||
: ctx(EVP_CIPHER_CTX_new()), textCipherKey(tCipherKey), headerCipherKey(hCipherKey), authTokenMode(mode),
|
||||
usageType(usageType) {
|
||||
ASSERT(isEncryptHeaderAuthTokenModeValid(mode));
|
||||
ASSERT_EQ(ivLen, AES_256_IV_LENGTH);
|
||||
memcpy(&iv[0], cipherIV, ivLen);
|
||||
|
@ -465,8 +518,10 @@ EncryptBlobCipherAes265Ctr::EncryptBlobCipherAes265Ctr(Reference<BlobCipherKey>
|
|||
|
||||
EncryptBlobCipherAes265Ctr::EncryptBlobCipherAes265Ctr(Reference<BlobCipherKey> tCipherKey,
|
||||
Reference<BlobCipherKey> hCipherKey,
|
||||
const EncryptAuthTokenMode mode)
|
||||
: ctx(EVP_CIPHER_CTX_new()), textCipherKey(tCipherKey), headerCipherKey(hCipherKey), authTokenMode(mode) {
|
||||
const EncryptAuthTokenMode mode,
|
||||
BlobCipherMetrics::UsageType usageType)
|
||||
: ctx(EVP_CIPHER_CTX_new()), textCipherKey(tCipherKey), headerCipherKey(hCipherKey), authTokenMode(mode),
|
||||
usageType(usageType) {
|
||||
ASSERT(isEncryptHeaderAuthTokenModeValid(mode));
|
||||
deterministicRandom()->randomBytes(iv, AES_256_IV_LENGTH);
|
||||
init();
|
||||
|
@ -488,6 +543,10 @@ Reference<EncryptBuf> EncryptBlobCipherAes265Ctr::encrypt(const uint8_t* plainte
|
|||
const int plaintextLen,
|
||||
BlobCipherEncryptHeader* header,
|
||||
Arena& arena) {
|
||||
double startTime = 0.0;
|
||||
if (CLIENT_KNOBS->ENABLE_ENCRYPTION_CPU_TIME_LOGGING) {
|
||||
startTime = timer_monotonic();
|
||||
}
|
||||
CODE_PROBE(true, "Encrypting data with BlobCipher");
|
||||
|
||||
memset(reinterpret_cast<uint8_t*>(header), 0, sizeof(BlobCipherEncryptHeader));
|
||||
|
@ -577,11 +636,18 @@ Reference<EncryptBuf> EncryptBlobCipherAes265Ctr::encrypt(const uint8_t* plainte
|
|||
}
|
||||
|
||||
encryptBuf->setLogicalSize(plaintextLen);
|
||||
if (CLIENT_KNOBS->ENABLE_ENCRYPTION_CPU_TIME_LOGGING) {
|
||||
BlobCipherMetrics::counters(usageType).encryptCPUTimeNS += int64_t((timer_monotonic() - startTime) * 1e9);
|
||||
}
|
||||
return encryptBuf;
|
||||
}
|
||||
|
||||
Standalone<StringRef> EncryptBlobCipherAes265Ctr::encryptBlobGranuleChunk(const uint8_t* plaintext,
|
||||
const int plaintextLen) {
|
||||
double startTime = 0.0;
|
||||
if (CLIENT_KNOBS->ENABLE_ENCRYPTION_CPU_TIME_LOGGING) {
|
||||
startTime = timer_monotonic();
|
||||
}
|
||||
Standalone<StringRef> encrypted = makeString(plaintextLen);
|
||||
uint8_t* ciphertext = mutateString(encrypted);
|
||||
int bytes{ 0 };
|
||||
|
@ -605,6 +671,9 @@ Standalone<StringRef> EncryptBlobCipherAes265Ctr::encryptBlobGranuleChunk(const
|
|||
.detail("EncryptedBufLen", bytes + finalBytes);
|
||||
throw encrypt_ops_error();
|
||||
}
|
||||
if (CLIENT_KNOBS->ENABLE_ENCRYPTION_CPU_TIME_LOGGING) {
|
||||
BlobCipherMetrics::counters(usageType).encryptCPUTimeNS += int64_t((timer_monotonic() - startTime) * 1e9);
|
||||
}
|
||||
return encrypted;
|
||||
}
|
||||
|
||||
|
@ -618,9 +687,10 @@ EncryptBlobCipherAes265Ctr::~EncryptBlobCipherAes265Ctr() {
|
|||
|
||||
DecryptBlobCipherAes256Ctr::DecryptBlobCipherAes256Ctr(Reference<BlobCipherKey> tCipherKey,
|
||||
Reference<BlobCipherKey> hCipherKey,
|
||||
const uint8_t* iv)
|
||||
const uint8_t* iv,
|
||||
BlobCipherMetrics::UsageType usageType)
|
||||
: ctx(EVP_CIPHER_CTX_new()), textCipherKey(tCipherKey), headerCipherKey(hCipherKey),
|
||||
headerAuthTokenValidationDone(false), authTokensValidationDone(false) {
|
||||
headerAuthTokenValidationDone(false), authTokensValidationDone(false), usageType(usageType) {
|
||||
if (ctx == nullptr) {
|
||||
throw encrypt_ops_error();
|
||||
}
|
||||
|
@ -754,6 +824,11 @@ Reference<EncryptBuf> DecryptBlobCipherAes256Ctr::decrypt(const uint8_t* ciphert
|
|||
Arena& arena) {
|
||||
CODE_PROBE(true, "Decrypting data with BlobCipher");
|
||||
|
||||
double startTime = 0.0;
|
||||
if (CLIENT_KNOBS->ENABLE_ENCRYPTION_CPU_TIME_LOGGING) {
|
||||
startTime = timer_monotonic();
|
||||
}
|
||||
|
||||
verifyEncryptHeaderMetadata(header);
|
||||
|
||||
if (header.flags.authTokenMode != ENCRYPT_HEADER_AUTH_TOKEN_MODE_NONE && !headerCipherKey.isValid()) {
|
||||
|
@ -797,6 +872,9 @@ Reference<EncryptBuf> DecryptBlobCipherAes256Ctr::decrypt(const uint8_t* ciphert
|
|||
}
|
||||
|
||||
decrypted->setLogicalSize(ciphertextLen);
|
||||
if (CLIENT_KNOBS->ENABLE_ENCRYPTION_CPU_TIME_LOGGING) {
|
||||
BlobCipherMetrics::counters(usageType).decryptCPUTimeNS += int64_t((timer_monotonic() - startTime) * 1e9);
|
||||
}
|
||||
return decrypted;
|
||||
}
|
||||
|
||||
|
@ -1017,8 +1095,12 @@ TEST_CASE("flow/BlobCipher") {
|
|||
{
|
||||
TraceEvent("NoneAuthMode.Start").log();
|
||||
|
||||
EncryptBlobCipherAes265Ctr encryptor(
|
||||
cipherKey, Reference<BlobCipherKey>(), iv, AES_256_IV_LENGTH, ENCRYPT_HEADER_AUTH_TOKEN_MODE_NONE);
|
||||
EncryptBlobCipherAes265Ctr encryptor(cipherKey,
|
||||
Reference<BlobCipherKey>(),
|
||||
iv,
|
||||
AES_256_IV_LENGTH,
|
||||
ENCRYPT_HEADER_AUTH_TOKEN_MODE_NONE,
|
||||
BlobCipherMetrics::TEST);
|
||||
BlobCipherEncryptHeader header;
|
||||
Reference<EncryptBuf> encrypted = encryptor.encrypt(&orgData[0], bufLen, &header, arena);
|
||||
|
||||
|
@ -1038,7 +1120,8 @@ TEST_CASE("flow/BlobCipher") {
|
|||
header.cipherTextDetails.baseCipherId,
|
||||
header.cipherTextDetails.salt);
|
||||
ASSERT(tCipherKeyKey->isEqual(cipherKey));
|
||||
DecryptBlobCipherAes256Ctr decryptor(tCipherKeyKey, Reference<BlobCipherKey>(), &header.iv[0]);
|
||||
DecryptBlobCipherAes256Ctr decryptor(
|
||||
tCipherKeyKey, Reference<BlobCipherKey>(), &header.iv[0], BlobCipherMetrics::TEST);
|
||||
Reference<EncryptBuf> decrypted = decryptor.decrypt(encrypted->begin(), bufLen, header, arena);
|
||||
|
||||
ASSERT_EQ(decrypted->getLogicalSize(), bufLen);
|
||||
|
@ -1053,7 +1136,8 @@ TEST_CASE("flow/BlobCipher") {
|
|||
headerCopy.flags.headerVersion += 1;
|
||||
try {
|
||||
encrypted = encryptor.encrypt(&orgData[0], bufLen, &header, arena);
|
||||
DecryptBlobCipherAes256Ctr decryptor(tCipherKeyKey, Reference<BlobCipherKey>(), header.iv);
|
||||
DecryptBlobCipherAes256Ctr decryptor(
|
||||
tCipherKeyKey, Reference<BlobCipherKey>(), header.iv, BlobCipherMetrics::TEST);
|
||||
decrypted = decryptor.decrypt(encrypted->begin(), bufLen, headerCopy, arena);
|
||||
ASSERT(false); // error expected
|
||||
} catch (Error& e) {
|
||||
|
@ -1069,7 +1153,8 @@ TEST_CASE("flow/BlobCipher") {
|
|||
headerCopy.flags.encryptMode += 1;
|
||||
try {
|
||||
encrypted = encryptor.encrypt(&orgData[0], bufLen, &header, arena);
|
||||
DecryptBlobCipherAes256Ctr decryptor(tCipherKeyKey, Reference<BlobCipherKey>(), header.iv);
|
||||
DecryptBlobCipherAes256Ctr decryptor(
|
||||
tCipherKeyKey, Reference<BlobCipherKey>(), header.iv, BlobCipherMetrics::TEST);
|
||||
decrypted = decryptor.decrypt(encrypted->begin(), bufLen, headerCopy, arena);
|
||||
ASSERT(false); // error expected
|
||||
} catch (Error& e) {
|
||||
|
@ -1085,7 +1170,8 @@ TEST_CASE("flow/BlobCipher") {
|
|||
memcpy(encrypted->begin(), &temp[0], bufLen);
|
||||
int tIdx = deterministicRandom()->randomInt(0, bufLen - 1);
|
||||
temp[tIdx] += 1;
|
||||
DecryptBlobCipherAes256Ctr decryptor(tCipherKeyKey, Reference<BlobCipherKey>(), header.iv);
|
||||
DecryptBlobCipherAes256Ctr decryptor(
|
||||
tCipherKeyKey, Reference<BlobCipherKey>(), header.iv, BlobCipherMetrics::TEST);
|
||||
decrypted = decryptor.decrypt(&temp[0], bufLen, header, arena);
|
||||
} catch (Error& e) {
|
||||
// No authToken, hence, no corruption detection supported
|
||||
|
@ -1099,8 +1185,12 @@ TEST_CASE("flow/BlobCipher") {
|
|||
{
|
||||
TraceEvent("SingleAuthMode.Start").log();
|
||||
|
||||
EncryptBlobCipherAes265Ctr encryptor(
|
||||
cipherKey, headerCipherKey, iv, AES_256_IV_LENGTH, ENCRYPT_HEADER_AUTH_TOKEN_MODE_SINGLE);
|
||||
EncryptBlobCipherAes265Ctr encryptor(cipherKey,
|
||||
headerCipherKey,
|
||||
iv,
|
||||
AES_256_IV_LENGTH,
|
||||
ENCRYPT_HEADER_AUTH_TOKEN_MODE_SINGLE,
|
||||
BlobCipherMetrics::TEST);
|
||||
BlobCipherEncryptHeader header;
|
||||
Reference<EncryptBuf> encrypted = encryptor.encrypt(&orgData[0], bufLen, &header, arena);
|
||||
|
||||
|
@ -1125,7 +1215,7 @@ TEST_CASE("flow/BlobCipher") {
|
|||
header.cipherHeaderDetails.baseCipherId,
|
||||
header.cipherHeaderDetails.salt);
|
||||
ASSERT(tCipherKeyKey->isEqual(cipherKey));
|
||||
DecryptBlobCipherAes256Ctr decryptor(tCipherKeyKey, hCipherKey, header.iv);
|
||||
DecryptBlobCipherAes256Ctr decryptor(tCipherKeyKey, hCipherKey, header.iv, BlobCipherMetrics::TEST);
|
||||
Reference<EncryptBuf> decrypted = decryptor.decrypt(encrypted->begin(), bufLen, header, arena);
|
||||
|
||||
ASSERT_EQ(decrypted->getLogicalSize(), bufLen);
|
||||
|
@ -1140,7 +1230,7 @@ TEST_CASE("flow/BlobCipher") {
|
|||
sizeof(BlobCipherEncryptHeader));
|
||||
headerCopy.flags.headerVersion += 1;
|
||||
try {
|
||||
DecryptBlobCipherAes256Ctr decryptor(tCipherKeyKey, hCipherKey, header.iv);
|
||||
DecryptBlobCipherAes256Ctr decryptor(tCipherKeyKey, hCipherKey, header.iv, BlobCipherMetrics::TEST);
|
||||
decrypted = decryptor.decrypt(encrypted->begin(), bufLen, headerCopy, arena);
|
||||
ASSERT(false); // error expected
|
||||
} catch (Error& e) {
|
||||
|
@ -1156,7 +1246,7 @@ TEST_CASE("flow/BlobCipher") {
|
|||
sizeof(BlobCipherEncryptHeader));
|
||||
headerCopy.flags.encryptMode += 1;
|
||||
try {
|
||||
DecryptBlobCipherAes256Ctr decryptor(tCipherKeyKey, hCipherKey, header.iv);
|
||||
DecryptBlobCipherAes256Ctr decryptor(tCipherKeyKey, hCipherKey, header.iv, BlobCipherMetrics::TEST);
|
||||
decrypted = decryptor.decrypt(encrypted->begin(), bufLen, headerCopy, arena);
|
||||
ASSERT(false); // error expected
|
||||
} catch (Error& e) {
|
||||
|
@ -1173,7 +1263,7 @@ TEST_CASE("flow/BlobCipher") {
|
|||
int hIdx = deterministicRandom()->randomInt(0, AUTH_TOKEN_SIZE - 1);
|
||||
headerCopy.singleAuthToken.authToken[hIdx] += 1;
|
||||
try {
|
||||
DecryptBlobCipherAes256Ctr decryptor(tCipherKeyKey, hCipherKey, header.iv);
|
||||
DecryptBlobCipherAes256Ctr decryptor(tCipherKeyKey, hCipherKey, header.iv, BlobCipherMetrics::TEST);
|
||||
decrypted = decryptor.decrypt(encrypted->begin(), bufLen, headerCopy, arena);
|
||||
ASSERT(false); // error expected
|
||||
} catch (Error& e) {
|
||||
|
@ -1189,7 +1279,7 @@ TEST_CASE("flow/BlobCipher") {
|
|||
memcpy(encrypted->begin(), &temp[0], bufLen);
|
||||
int tIdx = deterministicRandom()->randomInt(0, bufLen - 1);
|
||||
temp[tIdx] += 1;
|
||||
DecryptBlobCipherAes256Ctr decryptor(tCipherKeyKey, hCipherKey, header.iv);
|
||||
DecryptBlobCipherAes256Ctr decryptor(tCipherKeyKey, hCipherKey, header.iv, BlobCipherMetrics::TEST);
|
||||
decrypted = decryptor.decrypt(&temp[0], bufLen, header, arena);
|
||||
} catch (Error& e) {
|
||||
if (e.code() != error_code_encrypt_header_authtoken_mismatch) {
|
||||
|
@ -1204,8 +1294,12 @@ TEST_CASE("flow/BlobCipher") {
|
|||
{
|
||||
TraceEvent("MultiAuthMode.Start").log();
|
||||
|
||||
EncryptBlobCipherAes265Ctr encryptor(
|
||||
cipherKey, headerCipherKey, iv, AES_256_IV_LENGTH, ENCRYPT_HEADER_AUTH_TOKEN_MODE_MULTI);
|
||||
EncryptBlobCipherAes265Ctr encryptor(cipherKey,
|
||||
headerCipherKey,
|
||||
iv,
|
||||
AES_256_IV_LENGTH,
|
||||
ENCRYPT_HEADER_AUTH_TOKEN_MODE_MULTI,
|
||||
BlobCipherMetrics::TEST);
|
||||
BlobCipherEncryptHeader header;
|
||||
Reference<EncryptBuf> encrypted = encryptor.encrypt(&orgData[0], bufLen, &header, arena);
|
||||
|
||||
|
@ -1231,7 +1325,7 @@ TEST_CASE("flow/BlobCipher") {
|
|||
header.cipherHeaderDetails.salt);
|
||||
|
||||
ASSERT(tCipherKey->isEqual(cipherKey));
|
||||
DecryptBlobCipherAes256Ctr decryptor(tCipherKey, hCipherKey, header.iv);
|
||||
DecryptBlobCipherAes256Ctr decryptor(tCipherKey, hCipherKey, header.iv, BlobCipherMetrics::TEST);
|
||||
Reference<EncryptBuf> decrypted = decryptor.decrypt(encrypted->begin(), bufLen, header, arena);
|
||||
|
||||
ASSERT_EQ(decrypted->getLogicalSize(), bufLen);
|
||||
|
@ -1246,7 +1340,7 @@ TEST_CASE("flow/BlobCipher") {
|
|||
sizeof(BlobCipherEncryptHeader));
|
||||
headerCopy.flags.headerVersion += 1;
|
||||
try {
|
||||
DecryptBlobCipherAes256Ctr decryptor(tCipherKey, hCipherKey, header.iv);
|
||||
DecryptBlobCipherAes256Ctr decryptor(tCipherKey, hCipherKey, header.iv, BlobCipherMetrics::TEST);
|
||||
decrypted = decryptor.decrypt(encrypted->begin(), bufLen, headerCopy, arena);
|
||||
ASSERT(false); // error expected
|
||||
} catch (Error& e) {
|
||||
|
@ -1262,7 +1356,7 @@ TEST_CASE("flow/BlobCipher") {
|
|||
sizeof(BlobCipherEncryptHeader));
|
||||
headerCopy.flags.encryptMode += 1;
|
||||
try {
|
||||
DecryptBlobCipherAes256Ctr decryptor(tCipherKey, hCipherKey, header.iv);
|
||||
DecryptBlobCipherAes256Ctr decryptor(tCipherKey, hCipherKey, header.iv, BlobCipherMetrics::TEST);
|
||||
decrypted = decryptor.decrypt(encrypted->begin(), bufLen, headerCopy, arena);
|
||||
ASSERT(false); // error expected
|
||||
} catch (Error& e) {
|
||||
|
@ -1279,7 +1373,7 @@ TEST_CASE("flow/BlobCipher") {
|
|||
int hIdx = deterministicRandom()->randomInt(0, AUTH_TOKEN_SIZE - 1);
|
||||
headerCopy.multiAuthTokens.cipherTextAuthToken[hIdx] += 1;
|
||||
try {
|
||||
DecryptBlobCipherAes256Ctr decryptor(tCipherKey, hCipherKey, header.iv);
|
||||
DecryptBlobCipherAes256Ctr decryptor(tCipherKey, hCipherKey, header.iv, BlobCipherMetrics::TEST);
|
||||
decrypted = decryptor.decrypt(encrypted->begin(), bufLen, headerCopy, arena);
|
||||
ASSERT(false); // error expected
|
||||
} catch (Error& e) {
|
||||
|
@ -1296,7 +1390,7 @@ TEST_CASE("flow/BlobCipher") {
|
|||
hIdx = deterministicRandom()->randomInt(0, AUTH_TOKEN_SIZE - 1);
|
||||
headerCopy.multiAuthTokens.headerAuthToken[hIdx] += 1;
|
||||
try {
|
||||
DecryptBlobCipherAes256Ctr decryptor(tCipherKey, hCipherKey, header.iv);
|
||||
DecryptBlobCipherAes256Ctr decryptor(tCipherKey, hCipherKey, header.iv, BlobCipherMetrics::TEST);
|
||||
decrypted = decryptor.decrypt(encrypted->begin(), bufLen, headerCopy, arena);
|
||||
ASSERT(false); // error expected
|
||||
} catch (Error& e) {
|
||||
|
@ -1311,7 +1405,7 @@ TEST_CASE("flow/BlobCipher") {
|
|||
memcpy(encrypted->begin(), &temp[0], bufLen);
|
||||
int tIdx = deterministicRandom()->randomInt(0, bufLen - 1);
|
||||
temp[tIdx] += 1;
|
||||
DecryptBlobCipherAes256Ctr decryptor(tCipherKey, hCipherKey, header.iv);
|
||||
DecryptBlobCipherAes256Ctr decryptor(tCipherKey, hCipherKey, header.iv, BlobCipherMetrics::TEST);
|
||||
decrypted = decryptor.decrypt(&temp[0], bufLen, header, arena);
|
||||
} catch (Error& e) {
|
||||
if (e.code() != error_code_encrypt_header_authtoken_mismatch) {
|
|
@ -20,6 +20,7 @@
|
|||
|
||||
#include "fdbclient/BlobGranuleFiles.h"
|
||||
|
||||
#include "fdbclient/BlobCipher.h"
|
||||
#include "fdbclient/BlobGranuleCommon.h"
|
||||
#include "fdbclient/ClientKnobs.h"
|
||||
#include "fdbclient/CommitTransaction.h"
|
||||
|
@ -27,7 +28,6 @@
|
|||
#include "fdbclient/SystemData.h" // for allKeys unit test - could remove
|
||||
|
||||
#include "flow/Arena.h"
|
||||
#include "flow/BlobCipher.h"
|
||||
#include "flow/CompressionUtils.h"
|
||||
#include "flow/DeterministicRandom.h"
|
||||
#include "flow/IRandom.h"
|
||||
|
@ -304,7 +304,8 @@ struct IndexBlockRef {
|
|||
eKeys.headerCipherKey,
|
||||
cipherKeysCtx.ivRef.begin(),
|
||||
AES_256_IV_LENGTH,
|
||||
ENCRYPT_HEADER_AUTH_TOKEN_MODE_SINGLE);
|
||||
ENCRYPT_HEADER_AUTH_TOKEN_MODE_SINGLE,
|
||||
BlobCipherMetrics::BLOB_GRANULE);
|
||||
Value serializedBuff = ObjectWriter::toValue(block, IncludeVersion(ProtocolVersion::withBlobGranuleFile()));
|
||||
BlobCipherEncryptHeader header;
|
||||
buffer = encryptor.encrypt(serializedBuff.contents().begin(), serializedBuff.contents().size(), &header, arena)
|
||||
|
@ -332,7 +333,8 @@ struct IndexBlockRef {
|
|||
|
||||
validateEncryptionHeaderDetails(eKeys, header, cipherKeysCtx.ivRef);
|
||||
|
||||
DecryptBlobCipherAes256Ctr decryptor(eKeys.textCipherKey, eKeys.headerCipherKey, cipherKeysCtx.ivRef.begin());
|
||||
DecryptBlobCipherAes256Ctr decryptor(
|
||||
eKeys.textCipherKey, eKeys.headerCipherKey, cipherKeysCtx.ivRef.begin(), BlobCipherMetrics::BLOB_GRANULE);
|
||||
StringRef decrypted =
|
||||
decryptor.decrypt(idxRef.buffer.begin(), idxRef.buffer.size(), header, arena)->toStringRef();
|
||||
|
||||
|
@ -425,7 +427,8 @@ struct IndexBlobGranuleFileChunkRef {
|
|||
eKeys.headerCipherKey,
|
||||
cipherKeysCtx.ivRef.begin(),
|
||||
AES_256_IV_LENGTH,
|
||||
ENCRYPT_HEADER_AUTH_TOKEN_MODE_SINGLE);
|
||||
ENCRYPT_HEADER_AUTH_TOKEN_MODE_SINGLE,
|
||||
BlobCipherMetrics::BLOB_GRANULE);
|
||||
BlobCipherEncryptHeader header;
|
||||
chunkRef.buffer =
|
||||
encryptor.encrypt(chunkRef.buffer.begin(), chunkRef.buffer.size(), &header, arena)->toStringRef();
|
||||
|
@ -454,7 +457,8 @@ struct IndexBlobGranuleFileChunkRef {
|
|||
|
||||
validateEncryptionHeaderDetails(eKeys, header, cipherKeysCtx.ivRef);
|
||||
|
||||
DecryptBlobCipherAes256Ctr decryptor(eKeys.textCipherKey, eKeys.headerCipherKey, cipherKeysCtx.ivRef.begin());
|
||||
DecryptBlobCipherAes256Ctr decryptor(
|
||||
eKeys.textCipherKey, eKeys.headerCipherKey, cipherKeysCtx.ivRef.begin(), BlobCipherMetrics::BLOB_GRANULE);
|
||||
StringRef decrypted =
|
||||
decryptor.decrypt(chunkRef.buffer.begin(), chunkRef.buffer.size(), header, arena)->toStringRef();
|
||||
|
||||
|
|
|
@ -291,6 +291,8 @@ void ClientKnobs::initialize(Randomize randomize) {
|
|||
init( METACLUSTER_ASSIGNMENT_FIRST_CHOICE_DELAY, 1.0 ); if ( randomize && BUGGIFY ) METACLUSTER_ASSIGNMENT_FIRST_CHOICE_DELAY = deterministicRandom()->random01() * 60;
|
||||
init( METACLUSTER_ASSIGNMENT_AVAILABILITY_TIMEOUT, 10.0 ); if ( randomize && BUGGIFY ) METACLUSTER_ASSIGNMENT_AVAILABILITY_TIMEOUT = 1 + deterministicRandom()->random01() * 59;
|
||||
init( TENANT_ENTRY_CACHE_LIST_REFRESH_INTERVAL, 2 ); if( randomize && BUGGIFY ) TENANT_ENTRY_CACHE_LIST_REFRESH_INTERVAL = deterministicRandom()->randomInt(1, 10);
|
||||
|
||||
init( ENABLE_ENCRYPTION_CPU_TIME_LOGGING, false );
|
||||
// clang-format on
|
||||
}
|
||||
|
||||
|
|
|
@ -1355,6 +1355,7 @@ ACTOR Future<Void> excludeServers(Database cx, std::vector<AddressExclusion> ser
|
|||
state ReadYourWritesTransaction ryw(cx);
|
||||
loop {
|
||||
try {
|
||||
ryw.setOption(FDBTransactionOptions::RAW_ACCESS);
|
||||
ryw.setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
|
||||
ryw.set(
|
||||
SpecialKeySpace::getManagementApiCommandOptionSpecialKey(failed ? "failed" : "excluded", "force"),
|
||||
|
@ -1417,6 +1418,7 @@ ACTOR Future<Void> excludeLocalities(Database cx, std::unordered_set<std::string
|
|||
state ReadYourWritesTransaction ryw(cx);
|
||||
loop {
|
||||
try {
|
||||
ryw.setOption(FDBTransactionOptions::RAW_ACCESS);
|
||||
ryw.setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
|
||||
ryw.set(SpecialKeySpace::getManagementApiCommandOptionSpecialKey(
|
||||
failed ? "failed_locality" : "excluded_locality", "force"),
|
||||
|
@ -1459,6 +1461,7 @@ ACTOR Future<Void> includeServers(Database cx, std::vector<AddressExclusion> ser
|
|||
state ReadYourWritesTransaction ryw(cx);
|
||||
loop {
|
||||
try {
|
||||
ryw.setOption(FDBTransactionOptions::RAW_ACCESS);
|
||||
ryw.setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
|
||||
for (auto& s : servers) {
|
||||
if (!s.isValid()) {
|
||||
|
@ -1562,6 +1565,7 @@ ACTOR Future<Void> includeLocalities(Database cx, std::vector<std::string> local
|
|||
state ReadYourWritesTransaction ryw(cx);
|
||||
loop {
|
||||
try {
|
||||
ryw.setOption(FDBTransactionOptions::RAW_ACCESS);
|
||||
ryw.setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
|
||||
if (includeAll) {
|
||||
if (failed) {
|
||||
|
|
|
@ -861,6 +861,7 @@ ACTOR Future<MonitorLeaderInfo> monitorProxiesOneGeneration(
|
|||
for (const auto& c : cs.coords) {
|
||||
clientLeaderServers.push_back(ClientLeaderRegInterface(c));
|
||||
}
|
||||
ASSERT(clientLeaderServers.size() > 0);
|
||||
|
||||
deterministicRandom()->randomShuffle(clientLeaderServers);
|
||||
|
||||
|
@ -880,7 +881,7 @@ ACTOR Future<MonitorLeaderInfo> monitorProxiesOneGeneration(
|
|||
bool upToDate = wait(connRecord->upToDate(storedConnectionString));
|
||||
if (upToDate) {
|
||||
incorrectTime = Optional<double>();
|
||||
} else if (allConnectionsFailed) {
|
||||
} else if (allConnectionsFailed && storedConnectionString.getNumberOfCoordinators() > 0) {
|
||||
// Failed to connect to all coordinators from the current connection string,
|
||||
// so it is not possible to get any new updates from the cluster. It can be that
|
||||
// all the coordinators have changed, but the client missed that, because it had
|
||||
|
@ -938,6 +939,7 @@ ACTOR Future<MonitorLeaderInfo> monitorProxiesOneGeneration(
|
|||
.detail("OldConnStr", info.intermediateConnRecord->getConnectionString().toString());
|
||||
info.intermediateConnRecord = connRecord->makeIntermediateRecord(
|
||||
ClusterConnectionString(rep.get().read().forward.get().toString()));
|
||||
ASSERT(info.intermediateConnRecord->getConnectionString().getNumberOfCoordinators() > 0);
|
||||
return info;
|
||||
}
|
||||
if (connRecord != info.intermediateConnRecord) {
|
||||
|
@ -984,6 +986,7 @@ ACTOR Future<Void> monitorProxies(
|
|||
Key traceLogGroup) {
|
||||
state MonitorLeaderInfo info(connRecord->get());
|
||||
loop {
|
||||
ASSERT(connRecord->get().isValid());
|
||||
choose {
|
||||
when(MonitorLeaderInfo _info = wait(monitorProxiesOneGeneration(
|
||||
connRecord->get(), clientInfo, coordinator, info, supportedVersions, traceLogGroup))) {
|
||||
|
|
|
@ -652,7 +652,7 @@ ThreadFuture<bool> DLDatabase::blobbifyRange(const KeyRangeRef& keyRange) {
|
|||
db, keyRange.begin.begin(), keyRange.begin.size(), keyRange.end.begin(), keyRange.end.size());
|
||||
|
||||
return toThreadFuture<bool>(api, f, [](FdbCApi::FDBFuture* f, FdbCApi* api) {
|
||||
bool ret = false;
|
||||
FdbCApi::fdb_bool_t ret = false;
|
||||
ASSERT(!api->futureGetBool(f, &ret));
|
||||
return ret;
|
||||
});
|
||||
|
@ -667,7 +667,7 @@ ThreadFuture<bool> DLDatabase::unblobbifyRange(const KeyRangeRef& keyRange) {
|
|||
db, keyRange.begin.begin(), keyRange.begin.size(), keyRange.end.begin(), keyRange.end.size());
|
||||
|
||||
return toThreadFuture<bool>(api, f, [](FdbCApi::FDBFuture* f, FdbCApi* api) {
|
||||
bool ret = false;
|
||||
FdbCApi::fdb_bool_t ret = false;
|
||||
ASSERT(!api->futureGetBool(f, &ret));
|
||||
return ret;
|
||||
});
|
||||
|
|
|
@ -1296,12 +1296,17 @@ struct SingleSpecialKeyImpl : SpecialKeyRangeReadImpl {
|
|||
});
|
||||
}
|
||||
|
||||
SingleSpecialKeyImpl(KeyRef k, const std::function<Future<Optional<Value>>(ReadYourWritesTransaction*)>& f)
|
||||
: SpecialKeyRangeReadImpl(singleKeyRange(k)), k(k), f(f) {}
|
||||
SingleSpecialKeyImpl(KeyRef k,
|
||||
const std::function<Future<Optional<Value>>(ReadYourWritesTransaction*)>& f,
|
||||
bool supportsTenants = false)
|
||||
: SpecialKeyRangeReadImpl(singleKeyRange(k)), k(k), f(f), tenantSupport(supportsTenants) {}
|
||||
|
||||
bool supportsTenants() const override { return tenantSupport; };
|
||||
|
||||
private:
|
||||
Key k;
|
||||
std::function<Future<Optional<Value>>(ReadYourWritesTransaction*)> f;
|
||||
bool tenantSupport;
|
||||
};
|
||||
|
||||
class HealthMetricsRangeImpl : public SpecialKeyRangeAsyncImpl {
|
||||
|
@ -1511,7 +1516,8 @@ DatabaseContext::DatabaseContext(Reference<AsyncVar<Reference<IClusterConnection
|
|||
return Optional<Value>(ryw->getSpecialKeySpaceErrorMsg().get());
|
||||
else
|
||||
return Optional<Value>();
|
||||
}));
|
||||
},
|
||||
true));
|
||||
registerSpecialKeysImpl(
|
||||
SpecialKeySpace::MODULE::MANAGEMENT,
|
||||
SpecialKeySpace::IMPLTYPE::READWRITE,
|
||||
|
@ -1658,7 +1664,8 @@ DatabaseContext::DatabaseContext(Reference<AsyncVar<Reference<IClusterConnection
|
|||
} else {
|
||||
return Optional<Value>();
|
||||
}
|
||||
}));
|
||||
},
|
||||
true));
|
||||
registerSpecialKeysImpl(SpecialKeySpace::MODULE::CLUSTERFILEPATH,
|
||||
SpecialKeySpace::IMPLTYPE::READONLY,
|
||||
std::make_unique<SingleSpecialKeyImpl>(
|
||||
|
@ -1675,7 +1682,8 @@ DatabaseContext::DatabaseContext(Reference<AsyncVar<Reference<IClusterConnection
|
|||
return e;
|
||||
}
|
||||
return Optional<Value>();
|
||||
}));
|
||||
},
|
||||
true));
|
||||
|
||||
registerSpecialKeysImpl(
|
||||
SpecialKeySpace::MODULE::CONNECTIONSTRING,
|
||||
|
@ -1693,22 +1701,25 @@ DatabaseContext::DatabaseContext(Reference<AsyncVar<Reference<IClusterConnection
|
|||
return e;
|
||||
}
|
||||
return Optional<Value>();
|
||||
}));
|
||||
registerSpecialKeysImpl(
|
||||
SpecialKeySpace::MODULE::CLUSTERID,
|
||||
SpecialKeySpace::IMPLTYPE::READONLY,
|
||||
std::make_unique<SingleSpecialKeyImpl>(
|
||||
LiteralStringRef("\xff\xff/cluster_id"), [](ReadYourWritesTransaction* ryw) -> Future<Optional<Value>> {
|
||||
try {
|
||||
if (ryw->getDatabase().getPtr()) {
|
||||
return map(getClusterId(ryw->getDatabase()),
|
||||
[](UID id) { return Optional<Value>(StringRef(id.toString())); });
|
||||
}
|
||||
} catch (Error& e) {
|
||||
return e;
|
||||
}
|
||||
return Optional<Value>();
|
||||
}));
|
||||
},
|
||||
true));
|
||||
registerSpecialKeysImpl(SpecialKeySpace::MODULE::CLUSTERID,
|
||||
SpecialKeySpace::IMPLTYPE::READONLY,
|
||||
std::make_unique<SingleSpecialKeyImpl>(
|
||||
LiteralStringRef("\xff\xff/cluster_id"),
|
||||
[](ReadYourWritesTransaction* ryw) -> Future<Optional<Value>> {
|
||||
try {
|
||||
if (ryw->getDatabase().getPtr()) {
|
||||
return map(getClusterId(ryw->getDatabase()), [](UID id) {
|
||||
return Optional<Value>(StringRef(id.toString()));
|
||||
});
|
||||
}
|
||||
} catch (Error& e) {
|
||||
return e;
|
||||
}
|
||||
return Optional<Value>();
|
||||
},
|
||||
true));
|
||||
|
||||
registerSpecialKeysImpl(
|
||||
SpecialKeySpace::MODULE::MANAGEMENT,
|
||||
|
@ -6112,30 +6123,46 @@ ACTOR Future<Optional<ClientTrCommitCostEstimation>> estimateCommitCosts(Referen
|
|||
// TODO: send the prefix as part of the commit request and ship it all the way
|
||||
// through to the storage servers
|
||||
void applyTenantPrefix(CommitTransactionRequest& req, Key tenantPrefix) {
|
||||
VectorRef<MutationRef> updatedMutations;
|
||||
updatedMutations.reserve(req.arena, req.transaction.mutations.size());
|
||||
for (auto& m : req.transaction.mutations) {
|
||||
StringRef param1 = m.param1;
|
||||
StringRef param2 = m.param2;
|
||||
if (m.param1 != metadataVersionKey) {
|
||||
m.param1 = m.param1.withPrefix(tenantPrefix, req.arena);
|
||||
param1 = m.param1.withPrefix(tenantPrefix, req.arena);
|
||||
if (m.type == MutationRef::ClearRange) {
|
||||
m.param2 = m.param2.withPrefix(tenantPrefix, req.arena);
|
||||
param2 = m.param2.withPrefix(tenantPrefix, req.arena);
|
||||
} else if (m.type == MutationRef::SetVersionstampedKey) {
|
||||
uint8_t* key = mutateString(m.param1);
|
||||
int* offset = reinterpret_cast<int*>(&key[m.param1.size() - 4]);
|
||||
uint8_t* key = mutateString(param1);
|
||||
int* offset = reinterpret_cast<int*>(&key[param1.size() - 4]);
|
||||
*offset += tenantPrefix.size();
|
||||
}
|
||||
}
|
||||
updatedMutations.push_back(req.arena, MutationRef(MutationRef::Type(m.type), param1, param2));
|
||||
}
|
||||
req.transaction.mutations = updatedMutations;
|
||||
|
||||
for (auto& rc : req.transaction.read_conflict_ranges) {
|
||||
VectorRef<KeyRangeRef> updatedReadConflictRanges;
|
||||
updatedReadConflictRanges.reserve(req.arena, req.transaction.read_conflict_ranges.size());
|
||||
for (auto const& rc : req.transaction.read_conflict_ranges) {
|
||||
if (rc.begin != metadataVersionKey) {
|
||||
rc = rc.withPrefix(tenantPrefix, req.arena);
|
||||
updatedReadConflictRanges.push_back(req.arena, rc.withPrefix(tenantPrefix, req.arena));
|
||||
} else {
|
||||
updatedReadConflictRanges.push_back(req.arena, rc);
|
||||
}
|
||||
}
|
||||
req.transaction.read_conflict_ranges = updatedReadConflictRanges;
|
||||
|
||||
VectorRef<KeyRangeRef> updatedWriteConflictRanges;
|
||||
updatedWriteConflictRanges.reserve(req.arena, req.transaction.write_conflict_ranges.size());
|
||||
for (auto& wc : req.transaction.write_conflict_ranges) {
|
||||
if (wc.begin != metadataVersionKey) {
|
||||
wc = wc.withPrefix(tenantPrefix, req.arena);
|
||||
updatedWriteConflictRanges.push_back(req.arena, wc.withPrefix(tenantPrefix, req.arena));
|
||||
} else {
|
||||
updatedWriteConflictRanges.push_back(req.arena, wc);
|
||||
}
|
||||
}
|
||||
req.transaction.write_conflict_ranges = updatedWriteConflictRanges;
|
||||
}
|
||||
|
||||
ACTOR static Future<Void> tryCommit(Reference<TransactionState> trState,
|
||||
|
|
|
@ -156,6 +156,11 @@ ACTOR Future<Void> moveKeySelectorOverRangeActor(const SpecialKeyRangeReadImpl*
|
|||
// never being called if KeySelector is already normalized
|
||||
ASSERT(ks->offset != 1);
|
||||
|
||||
// Throw error if module doesn't support tenants and we have a tenant
|
||||
if (ryw->getTenant().present() && !skrImpl->supportsTenants()) {
|
||||
throw illegal_tenant_access();
|
||||
}
|
||||
|
||||
state Key startKey(skrImpl->getKeyRange().begin);
|
||||
state Key endKey(skrImpl->getKeyRange().end);
|
||||
state RangeResult result;
|
||||
|
@ -376,6 +381,21 @@ ACTOR Future<RangeResult> SpecialKeySpace::getRangeAggregationActor(SpecialKeySp
|
|||
}
|
||||
state RangeMap<Key, SpecialKeyRangeReadImpl*, KeyRangeRef>::Ranges ranges =
|
||||
sks->getReadImpls().intersectingRanges(KeyRangeRef(begin.getKey(), end.getKey()));
|
||||
|
||||
// Check tenant legality separately from below iterations
|
||||
// because it may be partially completed and returned
|
||||
// before illegal range is checked due to the limits handler
|
||||
if (ryw->getTenant().present()) {
|
||||
for (auto iter : ranges) {
|
||||
if (iter->value() == nullptr) {
|
||||
continue;
|
||||
}
|
||||
if (!iter->value()->supportsTenants()) {
|
||||
throw illegal_tenant_access();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TODO : workaround to write this two together to make the code compact
|
||||
// The issue here is boost::iterator_range<> doest not provide rbegin(), rend()
|
||||
iter = reverse ? ranges.end() : ranges.begin();
|
||||
|
@ -501,6 +521,9 @@ void SpecialKeySpace::set(ReadYourWritesTransaction* ryw, const KeyRef& key, con
|
|||
.detail("Value", value.toString());
|
||||
throw special_keys_no_write_module_found();
|
||||
}
|
||||
if (!impl->supportsTenants() && ryw->getTenant().present()) {
|
||||
throw illegal_tenant_access();
|
||||
}
|
||||
return impl->set(ryw, key, value);
|
||||
}
|
||||
|
||||
|
@ -518,6 +541,9 @@ void SpecialKeySpace::clear(ReadYourWritesTransaction* ryw, const KeyRangeRef& r
|
|||
TraceEvent(SevDebug, "SpecialKeySpaceNoWriteModuleFound").detail("Range", range);
|
||||
throw special_keys_no_write_module_found();
|
||||
}
|
||||
if (!begin->supportsTenants() && ryw->getTenant().present()) {
|
||||
throw illegal_tenant_access();
|
||||
}
|
||||
return begin->clear(ryw, range);
|
||||
}
|
||||
|
||||
|
@ -527,6 +553,9 @@ void SpecialKeySpace::clear(ReadYourWritesTransaction* ryw, const KeyRef& key) {
|
|||
auto impl = writeImpls[key];
|
||||
if (impl == nullptr)
|
||||
throw special_keys_no_write_module_found();
|
||||
if (!impl->supportsTenants() && ryw->getTenant().present()) {
|
||||
throw illegal_tenant_access();
|
||||
}
|
||||
return impl->clear(ryw, key);
|
||||
}
|
||||
|
||||
|
@ -614,6 +643,16 @@ ACTOR Future<Void> commitActor(SpecialKeySpace* sks, ReadYourWritesTransaction*
|
|||
++iter;
|
||||
}
|
||||
state std::vector<SpecialKeyRangeRWImpl*>::const_iterator it;
|
||||
// Check validity of tenant support before iterating through
|
||||
// module ptrs and potentially getting partial commits
|
||||
if (ryw->getTenant().present()) {
|
||||
for (it = writeModulePtrs.begin(); it != writeModulePtrs.end(); ++it) {
|
||||
if (!(*it)->supportsTenants()) {
|
||||
throw illegal_tenant_access();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for (it = writeModulePtrs.begin(); it != writeModulePtrs.end(); ++it) {
|
||||
Optional<std::string> msg = wait((*it)->commit(ryw));
|
||||
if (msg.present()) {
|
||||
|
|
|
@ -171,7 +171,7 @@ ThreadFuture<bool> ThreadSafeDatabase::unblobbifyRange(const KeyRangeRef& keyRan
|
|||
KeyRange range = keyRange;
|
||||
return onMainThread([=]() -> Future<bool> {
|
||||
db->checkDeferredError();
|
||||
return db->blobbifyRange(range);
|
||||
return db->unblobbifyRange(range);
|
||||
});
|
||||
}
|
||||
|
||||
|
|
|
@ -17,10 +17,11 @@
|
|||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
#ifndef FLOW_BLOB_CIPHER_H
|
||||
#define FLOW_BLOB_CIPHER_H
|
||||
#ifndef FDBCLIENT_BLOB_CIPHER_H
|
||||
#define FDBCLIENT_BLOB_CIPHER_H
|
||||
#pragma once
|
||||
|
||||
#include "fdbrpc/Stats.h"
|
||||
#include "flow/Arena.h"
|
||||
#include "flow/EncryptUtils.h"
|
||||
#include "flow/FastRef.h"
|
||||
|
@ -28,6 +29,7 @@
|
|||
#include "flow/genericactors.actor.h"
|
||||
#include "flow/Knobs.h"
|
||||
#include "flow/network.h"
|
||||
#include "flow/Platform.h"
|
||||
#include "flow/ProtocolVersion.h"
|
||||
#include "flow/serialize.h"
|
||||
|
||||
|
@ -50,6 +52,59 @@
|
|||
#define AES_256_KEY_LENGTH 32
|
||||
#define AES_256_IV_LENGTH 16
|
||||
|
||||
class BlobCipherMetrics : public NonCopyable {
|
||||
public:
|
||||
static BlobCipherMetrics* getInstance() {
|
||||
static BlobCipherMetrics* instance = nullptr;
|
||||
if (instance == nullptr) {
|
||||
instance = new BlobCipherMetrics;
|
||||
}
|
||||
return instance;
|
||||
}
|
||||
|
||||
// Order of this enum has to match initializer of counterSets.
|
||||
enum UsageType : int {
|
||||
TLOG = 0,
|
||||
KV_MEMORY,
|
||||
KV_REDWOOD,
|
||||
BLOB_GRANULE,
|
||||
BACKUP,
|
||||
TEST,
|
||||
MAX,
|
||||
};
|
||||
|
||||
struct CounterSet {
|
||||
Counter encryptCPUTimeNS;
|
||||
Counter decryptCPUTimeNS;
|
||||
LatencySample getCipherKeysLatency;
|
||||
LatencySample getLatestCipherKeysLatency;
|
||||
|
||||
CounterSet(CounterCollection& cc, std::string name);
|
||||
};
|
||||
|
||||
static CounterSet& counters(UsageType t) {
|
||||
ASSERT(t < UsageType::MAX);
|
||||
return getInstance()->counterSets[int(t)];
|
||||
}
|
||||
|
||||
private:
|
||||
BlobCipherMetrics();
|
||||
|
||||
CounterCollection cc;
|
||||
Future<Void> traceFuture;
|
||||
|
||||
public:
|
||||
Counter cipherKeyCacheHit;
|
||||
Counter cipherKeyCacheMiss;
|
||||
Counter cipherKeyCacheExpired;
|
||||
Counter latestCipherKeyCacheHit;
|
||||
Counter latestCipherKeyCacheMiss;
|
||||
Counter latestCipherKeyCacheNeedsRefresh;
|
||||
LatencySample getCipherKeysLatency;
|
||||
LatencySample getLatestCipherKeysLatency;
|
||||
std::array<CounterSet, int(UsageType::MAX)> counterSets;
|
||||
};
|
||||
|
||||
// Encryption operations buffer management
|
||||
// Approach limits number of copies needed during encryption or decryption operations.
|
||||
// For encryption EncryptBuf is allocated using client supplied Arena and provided to AES library to capture
|
||||
|
@ -324,8 +379,7 @@ using BlobCipherKeyIdCacheMapCItr =
|
|||
|
||||
struct BlobCipherKeyIdCache : ReferenceCounted<BlobCipherKeyIdCache> {
|
||||
public:
|
||||
BlobCipherKeyIdCache();
|
||||
explicit BlobCipherKeyIdCache(EncryptCipherDomainId dId);
|
||||
explicit BlobCipherKeyIdCache(EncryptCipherDomainId dId, size_t* sizeStat);
|
||||
|
||||
BlobCipherKeyIdCacheKey getCacheKey(const EncryptCipherBaseKeyId& baseCipherId,
|
||||
const EncryptCipherRandomSalt& salt);
|
||||
|
@ -378,11 +432,15 @@ public:
|
|||
// API returns list of all 'cached' cipherKeys
|
||||
std::vector<Reference<BlobCipherKey>> getAllCipherKeys();
|
||||
|
||||
// Return number of cipher keys in the cahce.
|
||||
size_t getSize() const { return keyIdCache.size(); }
|
||||
|
||||
private:
|
||||
EncryptCipherDomainId domainId;
|
||||
BlobCipherKeyIdCacheMap keyIdCache;
|
||||
Optional<EncryptCipherBaseKeyId> latestBaseCipherKeyId;
|
||||
Optional<EncryptCipherRandomSalt> latestRandomSalt;
|
||||
size_t* sizeStat; // pointer to the outer BlobCipherKeyCache size count.
|
||||
};
|
||||
|
||||
using BlobCipherDomainCacheMap = std::unordered_map<EncryptCipherDomainId, Reference<BlobCipherKeyIdCache>>;
|
||||
|
@ -447,10 +505,19 @@ public:
|
|||
|
||||
// API enables dropping all 'cached' cipherKeys for a given encryption domain Id.
|
||||
// Useful to cleanup cache if an encryption domain gets removed/destroyed etc.
|
||||
|
||||
void resetEncryptDomainId(const EncryptCipherDomainId domainId);
|
||||
|
||||
// Total number of cipher keys in the cache.
|
||||
size_t getSize() const { return size; }
|
||||
|
||||
static Reference<BlobCipherKeyCache> getInstance() {
|
||||
static bool cleanupRegistered = false;
|
||||
if (!cleanupRegistered) {
|
||||
// We try to avoid cipher keys appear in core dumps, so we clean them up before crash.
|
||||
// TODO(yiwu): use of MADV_DONTDUMP instead of the crash handler.
|
||||
registerCrashHandlerCallback(BlobCipherKeyCache::cleanup);
|
||||
cleanupRegistered = true;
|
||||
}
|
||||
if (g_network->isSimulated()) {
|
||||
return FlowSingleton<BlobCipherKeyCache>::getInstance(
|
||||
[]() { return makeReference<BlobCipherKeyCache>(g_network->isSimulated()); });
|
||||
|
@ -466,6 +533,7 @@ public:
|
|||
|
||||
private:
|
||||
BlobCipherDomainCacheMap domainCacheMap;
|
||||
size_t size = 0;
|
||||
|
||||
BlobCipherKeyCache() {}
|
||||
};
|
||||
|
@ -483,10 +551,12 @@ public:
|
|||
Reference<BlobCipherKey> hCipherKey,
|
||||
const uint8_t* iv,
|
||||
const int ivLen,
|
||||
const EncryptAuthTokenMode mode);
|
||||
const EncryptAuthTokenMode mode,
|
||||
BlobCipherMetrics::UsageType usageType);
|
||||
EncryptBlobCipherAes265Ctr(Reference<BlobCipherKey> tCipherKey,
|
||||
Reference<BlobCipherKey> hCipherKey,
|
||||
const EncryptAuthTokenMode mode);
|
||||
const EncryptAuthTokenMode mode,
|
||||
BlobCipherMetrics::UsageType usageType);
|
||||
~EncryptBlobCipherAes265Ctr();
|
||||
|
||||
Reference<EncryptBuf> encrypt(const uint8_t* plaintext,
|
||||
|
@ -501,6 +571,7 @@ private:
|
|||
Reference<BlobCipherKey> headerCipherKey;
|
||||
EncryptAuthTokenMode authTokenMode;
|
||||
uint8_t iv[AES_256_IV_LENGTH];
|
||||
BlobCipherMetrics::UsageType usageType;
|
||||
|
||||
void init();
|
||||
};
|
||||
|
@ -512,7 +583,8 @@ class DecryptBlobCipherAes256Ctr final : NonCopyable, public ReferenceCounted<De
|
|||
public:
|
||||
DecryptBlobCipherAes256Ctr(Reference<BlobCipherKey> tCipherKey,
|
||||
Reference<BlobCipherKey> hCipherKey,
|
||||
const uint8_t* iv);
|
||||
const uint8_t* iv,
|
||||
BlobCipherMetrics::UsageType usageType);
|
||||
~DecryptBlobCipherAes256Ctr();
|
||||
|
||||
Reference<EncryptBuf> decrypt(const uint8_t* ciphertext,
|
||||
|
@ -531,6 +603,7 @@ private:
|
|||
Reference<BlobCipherKey> headerCipherKey;
|
||||
bool headerAuthTokenValidationDone;
|
||||
bool authTokensValidationDone;
|
||||
BlobCipherMetrics::UsageType usageType;
|
||||
|
||||
void verifyEncryptHeaderMetadata(const BlobCipherEncryptHeader& header);
|
||||
void verifyAuthTokens(const uint8_t* ciphertext,
|
||||
|
@ -567,4 +640,4 @@ StringRef computeAuthToken(const uint8_t* payload,
|
|||
const int keyLen,
|
||||
Arena& arena);
|
||||
|
||||
#endif // FLOW_BLOB_CIPHER_H
|
||||
#endif // FDBCLIENT_BLOB_CIPHER_H
|
|
@ -22,10 +22,10 @@
|
|||
#define FDBCLIENT_BLOBGRANULECOMMON_H
|
||||
#pragma once
|
||||
|
||||
#include "fdbclient/BlobCipher.h"
|
||||
#include "fdbclient/CommitTransaction.h"
|
||||
#include "fdbclient/FDBTypes.h"
|
||||
|
||||
#include "flow/BlobCipher.h"
|
||||
#include "flow/EncryptUtils.h"
|
||||
#include "flow/IRandom.h"
|
||||
#include "flow/serialize.h"
|
||||
|
|
|
@ -286,6 +286,9 @@ public:
|
|||
double METACLUSTER_ASSIGNMENT_AVAILABILITY_TIMEOUT;
|
||||
int TENANT_ENTRY_CACHE_LIST_REFRESH_INTERVAL; // How often the TenantEntryCache is refreshed
|
||||
|
||||
// Encryption-at-rest
|
||||
bool ENABLE_ENCRYPTION_CPU_TIME_LOGGING;
|
||||
|
||||
ClientKnobs(Randomize randomize);
|
||||
void initialize(Randomize randomize);
|
||||
};
|
||||
|
|
|
@ -22,10 +22,10 @@
|
|||
#define FLOW_FDBCLIENT_COMMITTRANSACTION_H
|
||||
#pragma once
|
||||
|
||||
#include "fdbclient/BlobCipher.h"
|
||||
#include "fdbclient/FDBTypes.h"
|
||||
#include "fdbclient/Knobs.h"
|
||||
#include "fdbclient/Tracing.h"
|
||||
#include "flow/BlobCipher.h"
|
||||
|
||||
// The versioned message has wire format : -1, version, messages
|
||||
static const int32_t VERSION_HEADER = -1;
|
||||
|
@ -141,7 +141,8 @@ struct MutationRef {
|
|||
|
||||
MutationRef encrypt(const std::unordered_map<EncryptCipherDomainId, Reference<BlobCipherKey>>& cipherKeys,
|
||||
const EncryptCipherDomainId& domainId,
|
||||
Arena& arena) const {
|
||||
Arena& arena,
|
||||
BlobCipherMetrics::UsageType usageType) const {
|
||||
ASSERT_NE(domainId, ENCRYPT_INVALID_DOMAIN_ID);
|
||||
auto textCipherItr = cipherKeys.find(domainId);
|
||||
auto headerCipherItr = cipherKeys.find(ENCRYPT_HEADER_DOMAIN_ID);
|
||||
|
@ -155,7 +156,8 @@ struct MutationRef {
|
|||
headerCipherItr->second,
|
||||
iv,
|
||||
AES_256_IV_LENGTH,
|
||||
ENCRYPT_HEADER_AUTH_TOKEN_MODE_SINGLE);
|
||||
ENCRYPT_HEADER_AUTH_TOKEN_MODE_SINGLE,
|
||||
usageType);
|
||||
BlobCipherEncryptHeader* header = new (arena) BlobCipherEncryptHeader;
|
||||
StringRef headerRef(reinterpret_cast<const uint8_t*>(header), sizeof(BlobCipherEncryptHeader));
|
||||
StringRef payload =
|
||||
|
@ -164,19 +166,21 @@ struct MutationRef {
|
|||
}
|
||||
|
||||
MutationRef encryptMetadata(const std::unordered_map<EncryptCipherDomainId, Reference<BlobCipherKey>>& cipherKeys,
|
||||
Arena& arena) const {
|
||||
return encrypt(cipherKeys, SYSTEM_KEYSPACE_ENCRYPT_DOMAIN_ID, arena);
|
||||
Arena& arena,
|
||||
BlobCipherMetrics::UsageType usageType) const {
|
||||
return encrypt(cipherKeys, SYSTEM_KEYSPACE_ENCRYPT_DOMAIN_ID, arena, usageType);
|
||||
}
|
||||
|
||||
MutationRef decrypt(const std::unordered_map<BlobCipherDetails, Reference<BlobCipherKey>>& cipherKeys,
|
||||
Arena& arena,
|
||||
BlobCipherMetrics::UsageType usageType,
|
||||
StringRef* buf = nullptr) const {
|
||||
const BlobCipherEncryptHeader* header = encryptionHeader();
|
||||
auto textCipherItr = cipherKeys.find(header->cipherTextDetails);
|
||||
auto headerCipherItr = cipherKeys.find(header->cipherHeaderDetails);
|
||||
ASSERT(textCipherItr != cipherKeys.end() && textCipherItr->second.isValid());
|
||||
ASSERT(headerCipherItr != cipherKeys.end() && headerCipherItr->second.isValid());
|
||||
DecryptBlobCipherAes256Ctr cipher(textCipherItr->second, headerCipherItr->second, header->iv);
|
||||
DecryptBlobCipherAes256Ctr cipher(textCipherItr->second, headerCipherItr->second, header->iv, usageType);
|
||||
StringRef plaintext = cipher.decrypt(param2.begin(), param2.size(), *header, arena)->toStringRef();
|
||||
if (buf != nullptr) {
|
||||
*buf = plaintext;
|
||||
|
|
|
@ -86,6 +86,8 @@ public:
|
|||
std::vector<NetworkAddress> coords;
|
||||
std::vector<Hostname> hostnames;
|
||||
|
||||
size_t getNumberOfCoordinators() const { return coords.size() + hostnames.size(); }
|
||||
|
||||
bool operator==(const ClusterConnectionString& other) const noexcept {
|
||||
return key == other.key && keyDesc == other.keyDesc && coords == other.coords && hostnames == other.hostnames;
|
||||
}
|
||||
|
|
|
@ -26,7 +26,7 @@
|
|||
#define FDBCLIENT_EVENTTYPES_ACTOR_G_H
|
||||
#include "fdbclient/EventTypes.actor.g.h"
|
||||
#elif !defined(FDBCLIENT_EVENTTYPES_ACTOR_H)
|
||||
#define FDBCLIENT_EVENTTYPESS_ACTOR_H
|
||||
#define FDBCLIENT_EVENTTYPES_ACTOR_H
|
||||
|
||||
#include "flow/flow.h"
|
||||
#include "flow/TDMetric.actor.h"
|
||||
|
|
|
@ -24,8 +24,10 @@
|
|||
#elif !defined(FDBCLIENT_GETCIPHERKEYS_ACTOR_H)
|
||||
#define FDBCLIENT_GETCIPHERKEYS_ACTOR_H
|
||||
|
||||
#include "fdbclient/BlobCipher.h"
|
||||
#include "fdbclient/EncryptKeyProxyInterface.h"
|
||||
#include "flow/BlobCipher.h"
|
||||
#include "fdbrpc/Stats.h"
|
||||
#include "flow/Knobs.h"
|
||||
#include "flow/IRandom.h"
|
||||
|
||||
#include <unordered_map>
|
||||
|
@ -88,7 +90,8 @@ Future<EKPGetLatestBaseCipherKeysReply> getUncachedLatestEncryptCipherKeys(Refer
|
|||
ACTOR template <class T>
|
||||
Future<std::unordered_map<EncryptCipherDomainId, Reference<BlobCipherKey>>> getLatestEncryptCipherKeys(
|
||||
Reference<AsyncVar<T> const> db,
|
||||
std::unordered_map<EncryptCipherDomainId, EncryptCipherDomainNameRef> domains) {
|
||||
std::unordered_map<EncryptCipherDomainId, EncryptCipherDomainNameRef> domains,
|
||||
BlobCipherMetrics::UsageType usageType) {
|
||||
state Reference<BlobCipherKeyCache> cipherKeyCache = BlobCipherKeyCache::getInstance();
|
||||
state std::unordered_map<EncryptCipherDomainId, Reference<BlobCipherKey>> cipherKeys;
|
||||
state EKPGetLatestBaseCipherKeysRequest request;
|
||||
|
@ -112,6 +115,7 @@ Future<std::unordered_map<EncryptCipherDomainId, Reference<BlobCipherKey>>> getL
|
|||
return cipherKeys;
|
||||
}
|
||||
// Fetch any uncached cipher keys.
|
||||
state double startTime = now();
|
||||
loop choose {
|
||||
when(EKPGetLatestBaseCipherKeysReply reply = wait(getUncachedLatestEncryptCipherKeys(db, request))) {
|
||||
// Insert base cipher keys into cache and construct result.
|
||||
|
@ -140,6 +144,9 @@ Future<std::unordered_map<EncryptCipherDomainId, Reference<BlobCipherKey>>> getL
|
|||
// In case encryptKeyProxy has changed, retry the request.
|
||||
when(wait(onEncryptKeyProxyChange(db))) {}
|
||||
}
|
||||
double elapsed = now() - startTime;
|
||||
BlobCipherMetrics::getInstance()->getLatestCipherKeysLatency.addMeasurement(elapsed);
|
||||
BlobCipherMetrics::counters(usageType).getLatestCipherKeysLatency.addMeasurement(elapsed);
|
||||
return cipherKeys;
|
||||
}
|
||||
|
||||
|
@ -178,7 +185,8 @@ using BaseCipherIndex = std::pair<EncryptCipherDomainId, EncryptCipherBaseKeyId>
|
|||
ACTOR template <class T>
|
||||
Future<std::unordered_map<BlobCipherDetails, Reference<BlobCipherKey>>> getEncryptCipherKeys(
|
||||
Reference<AsyncVar<T> const> db,
|
||||
std::unordered_set<BlobCipherDetails> cipherDetails) {
|
||||
std::unordered_set<BlobCipherDetails> cipherDetails,
|
||||
BlobCipherMetrics::UsageType usageType) {
|
||||
state Reference<BlobCipherKeyCache> cipherKeyCache = BlobCipherKeyCache::getInstance();
|
||||
state std::unordered_map<BlobCipherDetails, Reference<BlobCipherKey>> cipherKeys;
|
||||
state std::unordered_set<BaseCipherIndex, boost::hash<BaseCipherIndex>> uncachedBaseCipherIds;
|
||||
|
@ -207,6 +215,7 @@ Future<std::unordered_map<BlobCipherDetails, Reference<BlobCipherKey>>> getEncry
|
|||
id.first /*domainId*/, id.second /*baseCipherId*/, StringRef() /*domainName*/, request.arena);
|
||||
}
|
||||
// Fetch any uncached cipher keys.
|
||||
state double startTime = now();
|
||||
loop choose {
|
||||
when(EKPGetBaseCipherKeysByIdsReply reply = wait(getUncachedEncryptCipherKeys(db, request))) {
|
||||
std::unordered_map<BaseCipherIndex, EKPBaseCipherDetails, boost::hash<BaseCipherIndex>> baseCipherKeys;
|
||||
|
@ -242,6 +251,9 @@ Future<std::unordered_map<BlobCipherDetails, Reference<BlobCipherKey>>> getEncry
|
|||
// In case encryptKeyProxy has changed, retry the request.
|
||||
when(wait(onEncryptKeyProxyChange(db))) {}
|
||||
}
|
||||
double elapsed = now() - startTime;
|
||||
BlobCipherMetrics::getInstance()->getCipherKeysLatency.addMeasurement(elapsed);
|
||||
BlobCipherMetrics::counters(usageType).getCipherKeysLatency.addMeasurement(elapsed);
|
||||
return cipherKeys;
|
||||
}
|
||||
|
||||
|
@ -253,12 +265,13 @@ struct TextAndHeaderCipherKeys {
|
|||
ACTOR template <class T>
|
||||
Future<TextAndHeaderCipherKeys> getLatestEncryptCipherKeysForDomain(Reference<AsyncVar<T> const> db,
|
||||
EncryptCipherDomainId domainId,
|
||||
EncryptCipherDomainNameRef domainName) {
|
||||
EncryptCipherDomainNameRef domainName,
|
||||
BlobCipherMetrics::UsageType usageType) {
|
||||
std::unordered_map<EncryptCipherDomainId, EncryptCipherDomainNameRef> domains;
|
||||
domains[domainId] = domainName;
|
||||
domains[ENCRYPT_HEADER_DOMAIN_ID] = FDB_DEFAULT_ENCRYPT_DOMAIN_NAME;
|
||||
std::unordered_map<EncryptCipherDomainId, Reference<BlobCipherKey>> cipherKeys =
|
||||
wait(getLatestEncryptCipherKeys(db, domains));
|
||||
wait(getLatestEncryptCipherKeys(db, domains, usageType));
|
||||
ASSERT(cipherKeys.count(domainId) > 0);
|
||||
ASSERT(cipherKeys.count(ENCRYPT_HEADER_DOMAIN_ID) > 0);
|
||||
TextAndHeaderCipherKeys result{ cipherKeys.at(domainId), cipherKeys.at(ENCRYPT_HEADER_DOMAIN_ID) };
|
||||
|
@ -268,15 +281,19 @@ Future<TextAndHeaderCipherKeys> getLatestEncryptCipherKeysForDomain(Reference<As
|
|||
}
|
||||
|
||||
template <class T>
|
||||
Future<TextAndHeaderCipherKeys> getLatestSystemEncryptCipherKeys(const Reference<AsyncVar<T> const>& db) {
|
||||
return getLatestEncryptCipherKeysForDomain(db, SYSTEM_KEYSPACE_ENCRYPT_DOMAIN_ID, FDB_DEFAULT_ENCRYPT_DOMAIN_NAME);
|
||||
Future<TextAndHeaderCipherKeys> getLatestSystemEncryptCipherKeys(const Reference<AsyncVar<T> const>& db,
|
||||
BlobCipherMetrics::UsageType usageType) {
|
||||
return getLatestEncryptCipherKeysForDomain(
|
||||
db, SYSTEM_KEYSPACE_ENCRYPT_DOMAIN_ID, FDB_DEFAULT_ENCRYPT_DOMAIN_NAME, usageType);
|
||||
}
|
||||
|
||||
ACTOR template <class T>
|
||||
Future<TextAndHeaderCipherKeys> getEncryptCipherKeys(Reference<AsyncVar<T> const> db, BlobCipherEncryptHeader header) {
|
||||
Future<TextAndHeaderCipherKeys> getEncryptCipherKeys(Reference<AsyncVar<T> const> db,
|
||||
BlobCipherEncryptHeader header,
|
||||
BlobCipherMetrics::UsageType usageType) {
|
||||
std::unordered_set<BlobCipherDetails> cipherDetails{ header.cipherTextDetails, header.cipherHeaderDetails };
|
||||
std::unordered_map<BlobCipherDetails, Reference<BlobCipherKey>> cipherKeys =
|
||||
wait(getEncryptCipherKeys(db, cipherDetails));
|
||||
wait(getEncryptCipherKeys(db, cipherDetails, usageType));
|
||||
ASSERT(cipherKeys.count(header.cipherTextDetails) > 0);
|
||||
ASSERT(cipherKeys.count(header.cipherHeaderDetails) > 0);
|
||||
TextAndHeaderCipherKeys result{ cipherKeys.at(header.cipherTextDetails),
|
||||
|
|
|
@ -368,7 +368,7 @@ struct FdbCApi : public ThreadSafeReferenceCounted<FdbCApi> {
|
|||
fdb_error_t (*futureGetDatabase)(FDBFuture* f, FDBDatabase** outDb);
|
||||
fdb_error_t (*futureGetInt64)(FDBFuture* f, int64_t* outValue);
|
||||
fdb_error_t (*futureGetUInt64)(FDBFuture* f, uint64_t* outValue);
|
||||
fdb_error_t (*futureGetBool)(FDBFuture* f, bool* outValue);
|
||||
fdb_error_t (*futureGetBool)(FDBFuture* f, fdb_bool_t* outValue);
|
||||
fdb_error_t (*futureGetError)(FDBFuture* f);
|
||||
fdb_error_t (*futureGetKey)(FDBFuture* f, uint8_t const** outKey, int* outKeyLength);
|
||||
fdb_error_t (*futureGetValue)(FDBFuture* f, fdb_bool_t* outPresent, uint8_t const** outValue, int* outValueLength);
|
||||
|
|
|
@ -60,6 +60,8 @@ public:
|
|||
// TODO : give this function a more descriptive name
|
||||
virtual bool isAsync() const { return false; }
|
||||
|
||||
virtual bool supportsTenants() const { return false; }
|
||||
|
||||
virtual ~SpecialKeyRangeReadImpl() {}
|
||||
|
||||
protected:
|
||||
|
@ -301,6 +303,7 @@ public:
|
|||
Future<RangeResult> getRange(ReadYourWritesTransaction* ryw,
|
||||
KeyRangeRef kr,
|
||||
GetRangeLimits limitsHint) const override;
|
||||
bool supportsTenants() const override { return true; };
|
||||
};
|
||||
|
||||
class ReadConflictRangeImpl : public SpecialKeyRangeReadImpl {
|
||||
|
@ -309,6 +312,7 @@ public:
|
|||
Future<RangeResult> getRange(ReadYourWritesTransaction* ryw,
|
||||
KeyRangeRef kr,
|
||||
GetRangeLimits limitsHint) const override;
|
||||
bool supportsTenants() const override { return true; };
|
||||
};
|
||||
|
||||
class WriteConflictRangeImpl : public SpecialKeyRangeReadImpl {
|
||||
|
@ -317,6 +321,7 @@ public:
|
|||
Future<RangeResult> getRange(ReadYourWritesTransaction* ryw,
|
||||
KeyRangeRef kr,
|
||||
GetRangeLimits limitsHint) const override;
|
||||
bool supportsTenants() const override { return true; };
|
||||
};
|
||||
|
||||
class DDStatsRangeImpl : public SpecialKeyRangeAsyncImpl {
|
||||
|
|
|
@ -168,7 +168,7 @@ private:
|
|||
} else {
|
||||
ASSERT(cipherKeys != nullptr);
|
||||
Arena arena;
|
||||
toCommit->writeTypedMessage(m.encryptMetadata(*cipherKeys, arena));
|
||||
toCommit->writeTypedMessage(m.encryptMetadata(*cipherKeys, arena, BlobCipherMetrics::TLOG));
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -20,6 +20,7 @@
|
|||
|
||||
#include "fdbclient/BackupAgent.actor.h"
|
||||
#include "fdbclient/BackupContainer.h"
|
||||
#include "fdbclient/BlobCipher.h"
|
||||
#include "fdbclient/DatabaseContext.h"
|
||||
#include "fdbclient/CommitProxyInterface.h"
|
||||
#include "fdbclient/SystemData.h"
|
||||
|
@ -79,7 +80,7 @@ struct VersionedMessage {
|
|||
// In case the mutation is encrypted, get the decrypted mutation and also update message to point to
|
||||
// the decrypted mutation.
|
||||
// We use dedicated arena for decrypt buffer, as the other arena is used to count towards backup lock bytes.
|
||||
*m = m->decrypt(cipherKeys, decryptArena, &message);
|
||||
*m = m->decrypt(cipherKeys, decryptArena, BlobCipherMetrics::BACKUP, &message);
|
||||
}
|
||||
return normalKeys.contains(m->param1) || m->param1 == metadataVersionKey;
|
||||
}
|
||||
|
@ -780,7 +781,7 @@ ACTOR Future<Void> saveMutationsToFile(BackupData* self,
|
|||
// Fetch cipher keys if any of the messages are encrypted.
|
||||
if (!cipherDetails.empty()) {
|
||||
std::unordered_map<BlobCipherDetails, Reference<BlobCipherKey>> getCipherKeysResult =
|
||||
wait(getEncryptCipherKeys(self->db, cipherDetails));
|
||||
wait(getEncryptCipherKeys(self->db, cipherDetails, BlobCipherMetrics::BLOB_GRANULE));
|
||||
cipherKeys = getCipherKeysResult;
|
||||
}
|
||||
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
*/
|
||||
|
||||
#include "fdbclient/ClientBooleanParams.h"
|
||||
#include "fdbclient/BlobCipher.h"
|
||||
#include "fdbclient/BlobGranuleFiles.h"
|
||||
#include "fdbclient/FDBTypes.h"
|
||||
#include "fdbclient/KeyRangeMap.h"
|
||||
|
@ -44,7 +45,6 @@
|
|||
#include "fdbserver/WaitFailure.h"
|
||||
|
||||
#include "flow/Arena.h"
|
||||
#include "flow/BlobCipher.h"
|
||||
#include "flow/CompressionUtils.h"
|
||||
#include "flow/EncryptUtils.h"
|
||||
#include "flow/Error.h"
|
||||
|
@ -360,13 +360,14 @@ ACTOR Future<BlobGranuleCipherKeysCtx> getLatestGranuleCipherKeys(Reference<Blob
|
|||
std::unordered_map<EncryptCipherDomainId, EncryptCipherDomainNameRef> domains;
|
||||
domains.emplace(tenantData->entry.id, StringRef(*arena, tenantData->name));
|
||||
std::unordered_map<EncryptCipherDomainId, Reference<BlobCipherKey>> domainKeyMap =
|
||||
wait(getLatestEncryptCipherKeys(bwData->dbInfo, domains));
|
||||
wait(getLatestEncryptCipherKeys(bwData->dbInfo, domains, BlobCipherMetrics::BLOB_GRANULE));
|
||||
|
||||
auto domainKeyItr = domainKeyMap.find(tenantData->entry.id);
|
||||
ASSERT(domainKeyItr != domainKeyMap.end());
|
||||
cipherKeysCtx.textCipherKey = BlobGranuleCipherKey::fromBlobCipherKey(domainKeyItr->second, *arena);
|
||||
|
||||
TextAndHeaderCipherKeys systemCipherKeys = wait(getLatestSystemEncryptCipherKeys(bwData->dbInfo));
|
||||
TextAndHeaderCipherKeys systemCipherKeys =
|
||||
wait(getLatestSystemEncryptCipherKeys(bwData->dbInfo, BlobCipherMetrics::BLOB_GRANULE));
|
||||
cipherKeysCtx.headerCipherKey = BlobGranuleCipherKey::fromBlobCipherKey(systemCipherKeys.cipherHeaderKey, *arena);
|
||||
|
||||
cipherKeysCtx.ivRef = makeString(AES_256_IV_LENGTH, *arena);
|
||||
|
@ -392,7 +393,7 @@ ACTOR Future<BlobGranuleCipherKey> lookupCipherKey(Reference<BlobWorkerData> bwD
|
|||
std::unordered_set<BlobCipherDetails> cipherDetailsSet;
|
||||
cipherDetailsSet.emplace(cipherDetails);
|
||||
state std::unordered_map<BlobCipherDetails, Reference<BlobCipherKey>> cipherKeyMap =
|
||||
wait(getEncryptCipherKeys(bwData->dbInfo, cipherDetailsSet));
|
||||
wait(getEncryptCipherKeys(bwData->dbInfo, cipherDetailsSet, BlobCipherMetrics::BLOB_GRANULE));
|
||||
|
||||
ASSERT(cipherKeyMap.size() == 1);
|
||||
|
||||
|
|
|
@ -22,6 +22,7 @@
|
|||
#include <tuple>
|
||||
|
||||
#include "fdbclient/Atomic.h"
|
||||
#include "fdbclient/BlobCipher.h"
|
||||
#include "fdbclient/CommitTransaction.h"
|
||||
#include "fdbclient/DatabaseContext.h"
|
||||
#include "fdbclient/FDBTypes.h"
|
||||
|
@ -50,7 +51,6 @@
|
|||
#include "fdbserver/WaitFailure.h"
|
||||
#include "fdbserver/WorkerInterface.actor.h"
|
||||
#include "flow/ActorCollection.h"
|
||||
#include "flow/BlobCipher.h"
|
||||
#include "flow/Error.h"
|
||||
#include "flow/IRandom.h"
|
||||
#include "flow/Knobs.h"
|
||||
|
@ -932,7 +932,7 @@ ACTOR Future<Void> getResolution(CommitBatchContext* self) {
|
|||
encryptDomains[tenantId] = tenantName.get();
|
||||
}
|
||||
}
|
||||
getCipherKeys = getLatestEncryptCipherKeys(pProxyCommitData->db, encryptDomains);
|
||||
getCipherKeys = getLatestEncryptCipherKeys(pProxyCommitData->db, encryptDomains, BlobCipherMetrics::TLOG);
|
||||
}
|
||||
|
||||
self->releaseFuture = releaseResolvingAfter(pProxyCommitData, self->releaseDelay, self->localBatchNumber);
|
||||
|
@ -1167,7 +1167,8 @@ void writeMutation(CommitBatchContext* self, int64_t tenantId, const MutationRef
|
|||
self->toCommit.writeTypedMessage(mutation);
|
||||
} else {
|
||||
Arena arena;
|
||||
self->toCommit.writeTypedMessage(mutation.encrypt(self->cipherKeys, tenantId /*domainId*/, arena));
|
||||
self->toCommit.writeTypedMessage(
|
||||
mutation.encrypt(self->cipherKeys, tenantId /*domainId*/, arena, BlobCipherMetrics::TLOG));
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -21,7 +21,6 @@
|
|||
#include "fdbclient/EncryptKeyProxyInterface.h"
|
||||
|
||||
#include "fdbrpc/Locality.h"
|
||||
#include "fdbrpc/Stats.h"
|
||||
#include "fdbserver/KmsConnector.h"
|
||||
#include "fdbserver/KmsConnectorInterface.h"
|
||||
#include "fdbserver/Knobs.h"
|
||||
|
@ -223,6 +222,10 @@ public:
|
|||
Counter blobMetadataRefreshed;
|
||||
Counter numBlobMetadataRefreshErrors;
|
||||
|
||||
LatencySample kmsLookupByIdsReqLatency;
|
||||
LatencySample kmsLookupByDomainIdsReqLatency;
|
||||
LatencySample kmsBlobMetadataReqLatency;
|
||||
|
||||
explicit EncryptKeyProxyData(UID id)
|
||||
: myId(id), ekpCacheMetrics("EKPMetrics", myId.toString()),
|
||||
baseCipherKeyIdCacheMisses("EKPCipherIdCacheMisses", ekpCacheMetrics),
|
||||
|
@ -235,7 +238,19 @@ public:
|
|||
blobMetadataCacheHits("EKPBlobMetadataCacheHits", ekpCacheMetrics),
|
||||
blobMetadataCacheMisses("EKPBlobMetadataCacheMisses", ekpCacheMetrics),
|
||||
blobMetadataRefreshed("EKPBlobMetadataRefreshed", ekpCacheMetrics),
|
||||
numBlobMetadataRefreshErrors("EKPBlobMetadataRefreshErrors", ekpCacheMetrics) {}
|
||||
numBlobMetadataRefreshErrors("EKPBlobMetadataRefreshErrors", ekpCacheMetrics),
|
||||
kmsLookupByIdsReqLatency("EKPKmsLookupByIdsReqLatency",
|
||||
id,
|
||||
SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL,
|
||||
SERVER_KNOBS->LATENCY_SAMPLE_SIZE),
|
||||
kmsLookupByDomainIdsReqLatency("EKPKmsLookupByDomainIdsReqLatency",
|
||||
id,
|
||||
SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL,
|
||||
SERVER_KNOBS->LATENCY_SAMPLE_SIZE),
|
||||
kmsBlobMetadataReqLatency("EKPKmsBlobMetadataReqLatency",
|
||||
id,
|
||||
SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL,
|
||||
SERVER_KNOBS->LATENCY_SAMPLE_SIZE) {}
|
||||
|
||||
EncryptBaseCipherDomainIdKeyIdCacheKey getBaseCipherDomainIdKeyIdCacheKey(
|
||||
const EncryptCipherDomainId domainId,
|
||||
|
@ -375,7 +390,9 @@ ACTOR Future<Void> getCipherKeysByBaseCipherKeyIds(Reference<EncryptKeyProxyData
|
|||
keysByIdsReq.arena, item.second.domainId, item.second.baseCipherId, item.second.domainName);
|
||||
}
|
||||
keysByIdsReq.debugId = keysByIds.debugId;
|
||||
state double startTime = now();
|
||||
KmsConnLookupEKsByKeyIdsRep keysByIdsRep = wait(kmsConnectorInf.ekLookupByIds.getReply(keysByIdsReq));
|
||||
ekpProxyData->kmsLookupByIdsReqLatency.addMeasurement(now() - startTime);
|
||||
|
||||
for (const auto& item : keysByIdsRep.cipherKeyDetails) {
|
||||
keyIdsReply.baseCipherDetails.emplace_back(
|
||||
|
@ -511,8 +528,10 @@ ACTOR Future<Void> getLatestCipherKeys(Reference<EncryptKeyProxyData> ekpProxyDa
|
|||
}
|
||||
keysByDomainIdReq.debugId = latestKeysReq.debugId;
|
||||
|
||||
state double startTime = now();
|
||||
KmsConnLookupEKsByDomainIdsRep keysByDomainIdRep =
|
||||
wait(kmsConnectorInf.ekLookupByDomainIds.getReply(keysByDomainIdReq));
|
||||
ekpProxyData->kmsLookupByDomainIdsReqLatency.addMeasurement(now() - startTime);
|
||||
|
||||
for (auto& item : keysByDomainIdRep.cipherKeyDetails) {
|
||||
CipherKeyValidityTS validityTS = getCipherKeyValidityTS(item.refreshAfterSec, item.expireAfterSec);
|
||||
|
@ -609,7 +628,9 @@ ACTOR Future<Void> refreshEncryptionKeysCore(Reference<EncryptKeyProxyData> ekpP
|
|||
}
|
||||
}
|
||||
|
||||
state double startTime = now();
|
||||
KmsConnLookupEKsByDomainIdsRep rep = wait(kmsConnectorInf.ekLookupByDomainIds.getReply(req));
|
||||
ekpProxyData->kmsLookupByDomainIdsReqLatency.addMeasurement(now() - startTime);
|
||||
for (const auto& item : rep.cipherKeyDetails) {
|
||||
const auto itr = ekpProxyData->baseCipherDomainIdCache.find(item.encryptDomainId);
|
||||
if (itr == ekpProxyData->baseCipherDomainIdCache.end()) {
|
||||
|
@ -707,7 +728,9 @@ ACTOR Future<Void> getLatestBlobMetadata(Reference<EncryptKeyProxyData> ekpProxy
|
|||
if (!lookupDomains.empty()) {
|
||||
try {
|
||||
KmsConnBlobMetadataReq kmsReq(lookupDomains, req.debugId);
|
||||
state double startTime = now();
|
||||
KmsConnBlobMetadataRep kmsRep = wait(kmsConnectorInf.blobMetadataReq.getReply(kmsReq));
|
||||
ekpProxyData->kmsBlobMetadataReqLatency.addMeasurement(now() - startTime);
|
||||
metadataDetails.arena().dependsOn(kmsRep.metadataDetails.arena());
|
||||
|
||||
for (auto& item : kmsRep.metadataDetails) {
|
||||
|
@ -753,7 +776,9 @@ ACTOR Future<Void> refreshBlobMetadataCore(Reference<EncryptKeyProxyData> ekpPro
|
|||
for (auto& item : ekpProxyData->blobMetadataDomainIdCache) {
|
||||
req.domainIds.emplace_back(item.first);
|
||||
}
|
||||
state double startTime = now();
|
||||
KmsConnBlobMetadataRep rep = wait(kmsConnectorInf.blobMetadataReq.getReply(req));
|
||||
ekpProxyData->kmsBlobMetadataReqLatency.addMeasurement(now() - startTime);
|
||||
for (auto& item : rep.metadataDetails) {
|
||||
ekpProxyData->insertIntoBlobMetadataCache(item.domainId, item);
|
||||
t.detail("BM" + std::to_string(item.domainId), "");
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#include "fdbclient/BlobCipher.h"
|
||||
#include "fdbclient/Knobs.h"
|
||||
#include "fdbclient/Notified.h"
|
||||
#include "fdbclient/SystemData.h"
|
||||
|
@ -488,8 +489,10 @@ private:
|
|||
|
||||
ASSERT(cipherKeys.cipherTextKey.isValid());
|
||||
ASSERT(cipherKeys.cipherHeaderKey.isValid());
|
||||
EncryptBlobCipherAes265Ctr cipher(
|
||||
cipherKeys.cipherTextKey, cipherKeys.cipherHeaderKey, ENCRYPT_HEADER_AUTH_TOKEN_MODE_SINGLE);
|
||||
EncryptBlobCipherAes265Ctr cipher(cipherKeys.cipherTextKey,
|
||||
cipherKeys.cipherHeaderKey,
|
||||
ENCRYPT_HEADER_AUTH_TOKEN_MODE_SINGLE,
|
||||
BlobCipherMetrics::KV_MEMORY);
|
||||
BlobCipherEncryptHeader cipherHeader;
|
||||
Arena arena;
|
||||
StringRef ciphertext =
|
||||
|
@ -527,8 +530,10 @@ private:
|
|||
return data;
|
||||
}
|
||||
state BlobCipherEncryptHeader cipherHeader = *(BlobCipherEncryptHeader*)data.begin();
|
||||
TextAndHeaderCipherKeys cipherKeys = wait(getEncryptCipherKeys(self->db, cipherHeader));
|
||||
DecryptBlobCipherAes256Ctr cipher(cipherKeys.cipherTextKey, cipherKeys.cipherHeaderKey, cipherHeader.iv);
|
||||
TextAndHeaderCipherKeys cipherKeys =
|
||||
wait(getEncryptCipherKeys(self->db, cipherHeader, BlobCipherMetrics::KV_MEMORY));
|
||||
DecryptBlobCipherAes256Ctr cipher(
|
||||
cipherKeys.cipherTextKey, cipherKeys.cipherHeaderKey, cipherHeader.iv, BlobCipherMetrics::KV_MEMORY);
|
||||
Arena arena;
|
||||
StringRef plaintext = cipher
|
||||
.decrypt(data.begin() + BlobCipherEncryptHeader::headerSize,
|
||||
|
@ -961,7 +966,8 @@ private:
|
|||
}
|
||||
|
||||
ACTOR static Future<Void> updateCipherKeys(KeyValueStoreMemory* self) {
|
||||
TextAndHeaderCipherKeys cipherKeys = wait(getLatestSystemEncryptCipherKeys(self->db));
|
||||
TextAndHeaderCipherKeys cipherKeys =
|
||||
wait(getLatestSystemEncryptCipherKeys(self->db, BlobCipherMetrics::KV_MEMORY));
|
||||
self->cipherKeys = cipherKeys;
|
||||
return Void();
|
||||
}
|
||||
|
|
|
@ -22,11 +22,11 @@
|
|||
|
||||
#include "fmt/format.h"
|
||||
#include "fdbrpc/sim_validation.h"
|
||||
#include "fdbclient/BlobCipher.h"
|
||||
#include "fdbserver/KmsConnectorInterface.h"
|
||||
#include "fdbserver/Knobs.h"
|
||||
#include "flow/ActorCollection.h"
|
||||
#include "flow/Arena.h"
|
||||
#include "flow/BlobCipher.h"
|
||||
#include "flow/EncryptUtils.h"
|
||||
#include "flow/Error.h"
|
||||
#include "flow/FastRef.h"
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#include "fdbclient/BlobCipher.h"
|
||||
#include "fdbclient/FDBTypes.h"
|
||||
#include "fdbserver/OTELSpanContextMessage.h"
|
||||
#include "flow/Arena.h"
|
||||
|
@ -1925,7 +1926,7 @@ ACTOR Future<Void> pullAsyncData(StorageCacheData* data) {
|
|||
cipherDetails.insert(header->cipherHeaderDetails);
|
||||
collectingCipherKeys = true;
|
||||
} else {
|
||||
msg = msg.decrypt(cipherKeys.get(), cloneReader.arena());
|
||||
msg = msg.decrypt(cipherKeys.get(), cloneReader.arena(), BlobCipherMetrics::TLOG);
|
||||
}
|
||||
}
|
||||
if (!collectingCipherKeys) {
|
||||
|
@ -1946,7 +1947,7 @@ ACTOR Future<Void> pullAsyncData(StorageCacheData* data) {
|
|||
|
||||
if (collectingCipherKeys) {
|
||||
std::unordered_map<BlobCipherDetails, Reference<BlobCipherKey>> result =
|
||||
wait(getEncryptCipherKeys(data->db, cipherDetails));
|
||||
wait(getEncryptCipherKeys(data->db, cipherDetails, BlobCipherMetrics::TLOG));
|
||||
cipherKeys = result;
|
||||
collectingCipherKeys = false;
|
||||
} else {
|
||||
|
@ -2021,7 +2022,7 @@ ACTOR Future<Void> pullAsyncData(StorageCacheData* data) {
|
|||
MutationRef msg;
|
||||
reader >> msg;
|
||||
if (msg.isEncrypted()) {
|
||||
msg = msg.decrypt(cipherKeys.get(), reader.arena());
|
||||
msg = msg.decrypt(cipherKeys.get(), reader.arena(), BlobCipherMetrics::TLOG);
|
||||
}
|
||||
|
||||
if (ver != invalidVersion) // This change belongs to a version < minVersion
|
||||
|
|
|
@ -25,6 +25,7 @@
|
|||
#include <cstddef>
|
||||
|
||||
#include "fdbclient/BackupAgent.actor.h"
|
||||
#include "fdbclient/BlobCipher.h"
|
||||
#include "fdbclient/MutationList.h"
|
||||
#include "fdbclient/Notified.h"
|
||||
#include "fdbclient/StorageServerInterface.h"
|
||||
|
@ -33,7 +34,6 @@
|
|||
#include "fdbserver/LogProtocolMessage.h"
|
||||
#include "fdbserver/LogSystem.h"
|
||||
#include "fdbserver/ProxyCommitData.actor.h"
|
||||
#include "flow/BlobCipher.h"
|
||||
#include "flow/FastRef.h"
|
||||
|
||||
// Resolver's data for applyMetadataMutations() calls.
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#include "fdbclient/BlobCipher.h"
|
||||
#if defined(NO_INTELLISENSE) && !defined(FDBSERVER_IENCRYPTIONKEYPROVIDER_ACTOR_G_H)
|
||||
#define FDBSERVER_IENCRYPTIONKEYPROVIDER_ACTOR_G_H
|
||||
#include "fdbserver/IEncryptionKeyProvider.actor.g.h"
|
||||
|
@ -207,7 +208,8 @@ public:
|
|||
TraceEvent("TenantAwareEncryptionKeyProvider_CipherHeaderMissing");
|
||||
throw encrypt_ops_error();
|
||||
}
|
||||
TextAndHeaderCipherKeys cipherKeys = wait(getEncryptCipherKeys(self->db, key.cipherHeader.get()));
|
||||
TextAndHeaderCipherKeys cipherKeys =
|
||||
wait(getEncryptCipherKeys(self->db, key.cipherHeader.get(), BlobCipherMetrics::KV_REDWOOD));
|
||||
EncryptionKey s = key;
|
||||
s.cipherKeys = cipherKeys;
|
||||
return s;
|
||||
|
@ -218,7 +220,8 @@ public:
|
|||
ACTOR static Future<EncryptionKey> getByRange(TenantAwareEncryptionKeyProvider* self, KeyRef begin, KeyRef end) {
|
||||
EncryptCipherDomainNameRef domainName;
|
||||
EncryptCipherDomainId domainId = self->getEncryptionDomainId(begin, end, &domainName);
|
||||
TextAndHeaderCipherKeys cipherKeys = wait(getLatestEncryptCipherKeysForDomain(self->db, domainId, domainName));
|
||||
TextAndHeaderCipherKeys cipherKeys =
|
||||
wait(getLatestEncryptCipherKeysForDomain(self->db, domainId, domainName, BlobCipherMetrics::KV_REDWOOD));
|
||||
EncryptionKey s;
|
||||
s.cipherKeys = cipherKeys;
|
||||
return s;
|
||||
|
|
|
@ -19,15 +19,16 @@
|
|||
*/
|
||||
#pragma once
|
||||
|
||||
#include "fdbclient/BlobCipher.h"
|
||||
#ifndef FDBSERVER_IPAGER_H
|
||||
#define FDBSERVER_IPAGER_H
|
||||
#include <cstddef>
|
||||
#include <stdint.h>
|
||||
#include "fdbclient/BlobCipher.h"
|
||||
#include "fdbclient/FDBTypes.h"
|
||||
#include "fdbclient/Tenant.h"
|
||||
#include "fdbserver/IEncryptionKeyProvider.actor.h"
|
||||
#include "fdbserver/IKeyValueStore.h"
|
||||
#include "flow/BlobCipher.h"
|
||||
#include "flow/Error.h"
|
||||
#include "flow/FastAlloc.h"
|
||||
#include "flow/flow.h"
|
||||
|
@ -341,8 +342,10 @@ public:
|
|||
BlobCipherEncryptHeader header;
|
||||
|
||||
void encode(const TextAndHeaderCipherKeys& cipherKeys, uint8_t* payload, int len) {
|
||||
EncryptBlobCipherAes265Ctr cipher(
|
||||
cipherKeys.cipherTextKey, cipherKeys.cipherHeaderKey, ENCRYPT_HEADER_AUTH_TOKEN_MODE_SINGLE);
|
||||
EncryptBlobCipherAes265Ctr cipher(cipherKeys.cipherTextKey,
|
||||
cipherKeys.cipherHeaderKey,
|
||||
ENCRYPT_HEADER_AUTH_TOKEN_MODE_SINGLE,
|
||||
BlobCipherMetrics::KV_REDWOOD);
|
||||
Arena arena;
|
||||
StringRef ciphertext = cipher.encrypt(payload, len, &header, arena)->toStringRef();
|
||||
ASSERT_EQ(len, ciphertext.size());
|
||||
|
@ -350,7 +353,8 @@ public:
|
|||
}
|
||||
|
||||
void decode(const TextAndHeaderCipherKeys& cipherKeys, uint8_t* payload, int len) {
|
||||
DecryptBlobCipherAes256Ctr cipher(cipherKeys.cipherTextKey, cipherKeys.cipherHeaderKey, header.iv);
|
||||
DecryptBlobCipherAes256Ctr cipher(
|
||||
cipherKeys.cipherTextKey, cipherKeys.cipherHeaderKey, header.iv, BlobCipherMetrics::KV_REDWOOD);
|
||||
Arena arena;
|
||||
StringRef plaintext = cipher.decrypt(payload, len, header, arena)->toStringRef();
|
||||
ASSERT_EQ(len, plaintext.size());
|
||||
|
|
|
@ -22,8 +22,8 @@
|
|||
#define SIM_KMS_CONNECTOR_H
|
||||
#pragma once
|
||||
|
||||
#include "fdbclient/BlobCipher.h"
|
||||
#include "fdbserver/KmsConnector.h"
|
||||
#include "flow/BlobCipher.h"
|
||||
|
||||
class SimKmsConnector : public KmsConnector {
|
||||
public:
|
||||
|
|
|
@ -23,6 +23,7 @@
|
|||
#include <type_traits>
|
||||
#include <unordered_map>
|
||||
|
||||
#include "fdbclient/BlobCipher.h"
|
||||
#include "fdbclient/BlobGranuleCommon.h"
|
||||
#include "flow/ApiVersion.h"
|
||||
#include "fmt/format.h"
|
||||
|
@ -8218,7 +8219,7 @@ ACTOR Future<Void> update(StorageServer* data, bool* pReceivedUpdate) {
|
|||
cipherDetails.insert(header->cipherHeaderDetails);
|
||||
collectingCipherKeys = true;
|
||||
} else {
|
||||
msg = msg.decrypt(cipherKeys.get(), eager.arena);
|
||||
msg = msg.decrypt(cipherKeys.get(), eager.arena, BlobCipherMetrics::TLOG);
|
||||
}
|
||||
}
|
||||
// TraceEvent(SevDebug, "SSReadingLog", data->thisServerID).detail("Mutation", msg);
|
||||
|
@ -8241,7 +8242,7 @@ ACTOR Future<Void> update(StorageServer* data, bool* pReceivedUpdate) {
|
|||
|
||||
if (collectingCipherKeys) {
|
||||
std::unordered_map<BlobCipherDetails, Reference<BlobCipherKey>> getCipherKeysResult =
|
||||
wait(getEncryptCipherKeys(data->db, cipherDetails));
|
||||
wait(getEncryptCipherKeys(data->db, cipherDetails, BlobCipherMetrics::TLOG));
|
||||
cipherKeys = getCipherKeysResult;
|
||||
collectingCipherKeys = false;
|
||||
eager = UpdateEagerReadInfo();
|
||||
|
@ -8366,7 +8367,7 @@ ACTOR Future<Void> update(StorageServer* data, bool* pReceivedUpdate) {
|
|||
rd >> msg;
|
||||
if (msg.isEncrypted()) {
|
||||
ASSERT(cipherKeys.present());
|
||||
msg = msg.decrypt(cipherKeys.get(), rd.arena());
|
||||
msg = msg.decrypt(cipherKeys.get(), rd.arena(), BlobCipherMetrics::TLOG);
|
||||
}
|
||||
|
||||
Span span("SS:update"_loc, spanContext);
|
||||
|
|
|
@ -140,6 +140,9 @@ struct ChangeConfigWorkload : TestWorkload {
|
|||
if (autoChange) { // if auto, we first get the desired addresses by read \xff\xff/management/auto_coordinators
|
||||
loop {
|
||||
try {
|
||||
// Set RAW_ACCESS to explicitly avoid using tenants because
|
||||
// access to management keys is denied for tenant transactions
|
||||
tr.setOption(FDBTransactionOptions::RAW_ACCESS);
|
||||
Optional<Value> newCoordinatorsKey = wait(tr.get(
|
||||
LiteralStringRef("auto_coordinators")
|
||||
.withPrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::MANAGEMENT).begin)));
|
||||
|
|
|
@ -18,12 +18,12 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#include "fdbclient/BlobCipher.h"
|
||||
#include "fdbclient/DatabaseContext.h"
|
||||
#include "fdbclient/NativeAPI.actor.h"
|
||||
#include "flow/EncryptUtils.h"
|
||||
#include "flow/Error.h"
|
||||
#include "flow/IRandom.h"
|
||||
#include "flow/BlobCipher.h"
|
||||
#include "fdbserver/workloads/workloads.actor.h"
|
||||
#include "flow/flow.h"
|
||||
#include "flow/ITrace.h"
|
||||
|
@ -274,7 +274,8 @@ struct EncryptionOpsWorkload : TestWorkload {
|
|||
BlobCipherEncryptHeader* header) {
|
||||
uint8_t iv[AES_256_IV_LENGTH];
|
||||
deterministicRandom()->randomBytes(&iv[0], AES_256_IV_LENGTH);
|
||||
EncryptBlobCipherAes265Ctr encryptor(textCipherKey, headerCipherKey, &iv[0], AES_256_IV_LENGTH, authMode);
|
||||
EncryptBlobCipherAes265Ctr encryptor(
|
||||
textCipherKey, headerCipherKey, &iv[0], AES_256_IV_LENGTH, authMode, BlobCipherMetrics::TEST);
|
||||
|
||||
auto start = std::chrono::high_resolution_clock::now();
|
||||
Reference<EncryptBuf> encrypted = encryptor.encrypt(payload, len, header, arena);
|
||||
|
@ -306,7 +307,7 @@ struct EncryptionOpsWorkload : TestWorkload {
|
|||
ASSERT(cipherKey.isValid());
|
||||
ASSERT(cipherKey->isEqual(orgCipherKey));
|
||||
|
||||
DecryptBlobCipherAes256Ctr decryptor(cipherKey, headerCipherKey, header.iv);
|
||||
DecryptBlobCipherAes256Ctr decryptor(cipherKey, headerCipherKey, header.iv, BlobCipherMetrics::TEST);
|
||||
const bool validateHeaderAuthToken = deterministicRandom()->randomInt(0, 100) < 65;
|
||||
|
||||
auto start = std::chrono::high_resolution_clock::now();
|
||||
|
|
|
@ -749,6 +749,8 @@ struct FuzzApiCorrectnessWorkload : TestWorkload {
|
|||
std::make_pair(error_code_tenant_not_found,
|
||||
ExceptionContract::possibleIf(!workload->canUseTenant(tr->getTenant()))),
|
||||
std::make_pair(error_code_invalid_option,
|
||||
ExceptionContract::possibleIf(tr->getTenant().present() && specialKeys.contains(key))),
|
||||
std::make_pair(error_code_illegal_tenant_access,
|
||||
ExceptionContract::possibleIf(tr->getTenant().present() && specialKeys.contains(key)))
|
||||
};
|
||||
}
|
||||
|
@ -829,6 +831,8 @@ struct FuzzApiCorrectnessWorkload : TestWorkload {
|
|||
std::make_pair(error_code_tenant_not_found,
|
||||
ExceptionContract::possibleIf(!workload->canUseTenant(tr->getTenant()))),
|
||||
std::make_pair(error_code_invalid_option,
|
||||
ExceptionContract::possibleIf(tr->getTenant().present() && isSpecialKeyRange)),
|
||||
std::make_pair(error_code_illegal_tenant_access,
|
||||
ExceptionContract::possibleIf(tr->getTenant().present() && isSpecialKeyRange))
|
||||
};
|
||||
}
|
||||
|
@ -875,7 +879,7 @@ struct FuzzApiCorrectnessWorkload : TestWorkload {
|
|||
std::make_pair(error_code_accessed_unreadable, ExceptionContract::Possible),
|
||||
std::make_pair(error_code_tenant_not_found,
|
||||
ExceptionContract::possibleIf(!workload->canUseTenant(tr->getTenant()))),
|
||||
std::make_pair(error_code_invalid_option,
|
||||
std::make_pair(error_code_illegal_tenant_access,
|
||||
ExceptionContract::possibleIf(tr->getTenant().present() && isSpecialKeyRange))
|
||||
};
|
||||
}
|
||||
|
@ -945,6 +949,8 @@ struct FuzzApiCorrectnessWorkload : TestWorkload {
|
|||
std::make_pair(error_code_tenant_not_found,
|
||||
ExceptionContract::possibleIf(!workload->canUseTenant(tr->getTenant()))),
|
||||
std::make_pair(error_code_invalid_option,
|
||||
ExceptionContract::possibleIf(tr->getTenant().present() && isSpecialKeyRange)),
|
||||
std::make_pair(error_code_illegal_tenant_access,
|
||||
ExceptionContract::possibleIf(tr->getTenant().present() && isSpecialKeyRange))
|
||||
};
|
||||
}
|
||||
|
@ -1002,6 +1008,8 @@ struct FuzzApiCorrectnessWorkload : TestWorkload {
|
|||
std::make_pair(error_code_tenant_not_found,
|
||||
ExceptionContract::possibleIf(!workload->canUseTenant(tr->getTenant()))),
|
||||
std::make_pair(error_code_invalid_option,
|
||||
ExceptionContract::possibleIf(tr->getTenant().present() && isSpecialKeyRange)),
|
||||
std::make_pair(error_code_illegal_tenant_access,
|
||||
ExceptionContract::possibleIf(tr->getTenant().present() && isSpecialKeyRange))
|
||||
};
|
||||
}
|
||||
|
|
|
@ -28,6 +28,7 @@
|
|||
#include "fdbclient/Schemas.h"
|
||||
#include "fdbclient/SpecialKeySpace.actor.h"
|
||||
#include "fdbserver/Knobs.h"
|
||||
#include "fdbclient/TenantManagement.actor.h"
|
||||
#include "fdbserver/TesterInterface.actor.h"
|
||||
#include "fdbserver/workloads/workloads.actor.h"
|
||||
#include "flow/IRandom.h"
|
||||
|
@ -298,8 +299,133 @@ struct SpecialKeySpaceCorrectnessWorkload : TestWorkload {
|
|||
}
|
||||
|
||||
ACTOR Future<Void> testSpecialKeySpaceErrors(Database cx_, SpecialKeySpaceCorrectnessWorkload* self) {
|
||||
Database cx = cx_->clone();
|
||||
state Database cx = cx_->clone();
|
||||
try {
|
||||
wait(success(TenantAPI::createTenant(cx.getReference(), TenantName("foo"_sr))));
|
||||
} catch (Error& e) {
|
||||
ASSERT(e.code() == error_code_tenant_already_exists || e.code() == error_code_actor_cancelled);
|
||||
}
|
||||
state Reference<ReadYourWritesTransaction> tx = makeReference<ReadYourWritesTransaction>(cx);
|
||||
state Reference<ReadYourWritesTransaction> tenantTx =
|
||||
makeReference<ReadYourWritesTransaction>(cx, TenantName("foo"_sr));
|
||||
// Use new transactions that may use default tenant rather than re-use tx
|
||||
// This is because tx will reject raw access for later tests if default tenant is set
|
||||
state Reference<ReadYourWritesTransaction> defaultTx1 = makeReference<ReadYourWritesTransaction>(cx);
|
||||
state Reference<ReadYourWritesTransaction> defaultTx2 = makeReference<ReadYourWritesTransaction>(cx);
|
||||
state bool disableRyw = deterministicRandom()->coinflip();
|
||||
// tenant transaction accessing modules that do not support tenants
|
||||
// tenant getRange
|
||||
try {
|
||||
wait(success(tenantTx->getRange(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::MANAGEMENT),
|
||||
CLIENT_KNOBS->TOO_MANY)));
|
||||
ASSERT(false);
|
||||
} catch (Error& e) {
|
||||
if (e.code() == error_code_actor_cancelled)
|
||||
throw;
|
||||
ASSERT(e.code() == error_code_illegal_tenant_access);
|
||||
tenantTx->reset();
|
||||
}
|
||||
// tenant set + commit
|
||||
try {
|
||||
tenantTx->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
|
||||
tenantTx->set(SpecialKeySpace::getManagementApiCommandPrefix("consistencycheck"), ValueRef());
|
||||
wait(tenantTx->commit());
|
||||
ASSERT(false);
|
||||
} catch (Error& e) {
|
||||
if (e.code() == error_code_actor_cancelled)
|
||||
throw;
|
||||
ASSERT(e.code() == error_code_illegal_tenant_access);
|
||||
tenantTx->reset();
|
||||
}
|
||||
// tenant clear
|
||||
try {
|
||||
tenantTx->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
|
||||
tenantTx->clear(SpecialKeySpace::getManagementApiCommandRange("exclude"));
|
||||
ASSERT(false);
|
||||
} catch (Error& e) {
|
||||
if (e.code() == error_code_actor_cancelled)
|
||||
throw;
|
||||
ASSERT(e.code() == error_code_illegal_tenant_access);
|
||||
tenantTx->reset();
|
||||
}
|
||||
// tenant check that conflict ranges stay the same after commit
|
||||
// and depending on if RYW is disabled
|
||||
{
|
||||
state RangeResult readresult1;
|
||||
state RangeResult readresult2;
|
||||
state RangeResult writeResult1;
|
||||
state RangeResult writeResult2;
|
||||
try {
|
||||
if (disableRyw) {
|
||||
defaultTx1->setOption(FDBTransactionOptions::READ_YOUR_WRITES_DISABLE);
|
||||
}
|
||||
defaultTx1->addReadConflictRange(singleKeyRange(LiteralStringRef("testKeylll")));
|
||||
defaultTx1->addWriteConflictRange(singleKeyRange(LiteralStringRef("testKeylll")));
|
||||
wait(store(readresult1, defaultTx1->getRange(readConflictRangeKeysRange, CLIENT_KNOBS->TOO_MANY)));
|
||||
wait(store(writeResult1, defaultTx1->getRange(writeConflictRangeKeysRange, CLIENT_KNOBS->TOO_MANY)));
|
||||
wait(defaultTx1->commit());
|
||||
CODE_PROBE(true, "conflict range tenant commit succeeded");
|
||||
} catch (Error& e) {
|
||||
if (e.code() == error_code_actor_cancelled)
|
||||
throw;
|
||||
CODE_PROBE(true, "conflict range tenant commit error thrown");
|
||||
}
|
||||
wait(store(readresult2, defaultTx1->getRange(readConflictRangeKeysRange, CLIENT_KNOBS->TOO_MANY)));
|
||||
wait(store(writeResult2, defaultTx1->getRange(writeConflictRangeKeysRange, CLIENT_KNOBS->TOO_MANY)));
|
||||
ASSERT(readresult1 == readresult2);
|
||||
ASSERT(writeResult1 == writeResult2);
|
||||
defaultTx1->reset();
|
||||
}
|
||||
// proper conflict ranges
|
||||
loop {
|
||||
try {
|
||||
if (disableRyw) {
|
||||
defaultTx1->setOption(FDBTransactionOptions::READ_YOUR_WRITES_DISABLE);
|
||||
defaultTx2->setOption(FDBTransactionOptions::READ_YOUR_WRITES_DISABLE);
|
||||
}
|
||||
defaultTx1->setOption(FDBTransactionOptions::REPORT_CONFLICTING_KEYS);
|
||||
defaultTx2->setOption(FDBTransactionOptions::REPORT_CONFLICTING_KEYS);
|
||||
wait(success(defaultTx1->getReadVersion()));
|
||||
wait(success(defaultTx2->getReadVersion()));
|
||||
defaultTx1->addReadConflictRange(singleKeyRange(LiteralStringRef("foo")));
|
||||
defaultTx1->addWriteConflictRange(singleKeyRange(LiteralStringRef("foo")));
|
||||
defaultTx2->addWriteConflictRange(singleKeyRange(LiteralStringRef("foo")));
|
||||
wait(defaultTx2->commit());
|
||||
try {
|
||||
wait(defaultTx1->commit());
|
||||
ASSERT(false);
|
||||
} catch (Error& e) {
|
||||
state Error err = e;
|
||||
if (err.code() != error_code_not_committed) {
|
||||
wait(defaultTx1->onError(err));
|
||||
wait(defaultTx2->onError(err));
|
||||
continue;
|
||||
}
|
||||
// Read conflict ranges of defaultTx1 and check for "foo" with no tenant prefix
|
||||
state RangeResult readConflictRange =
|
||||
wait(defaultTx1->getRange(readConflictRangeKeysRange, CLIENT_KNOBS->TOO_MANY));
|
||||
state RangeResult writeConflictRange =
|
||||
wait(defaultTx1->getRange(writeConflictRangeKeysRange, CLIENT_KNOBS->TOO_MANY));
|
||||
state RangeResult conflictKeys =
|
||||
wait(defaultTx1->getRange(conflictingKeysRange, CLIENT_KNOBS->TOO_MANY));
|
||||
|
||||
// size is 2 because singleKeyRange includes the key after
|
||||
ASSERT(readConflictRange.size() == 2 &&
|
||||
readConflictRange.begin()->key == readConflictRangeKeysRange.begin.withSuffix("foo"_sr));
|
||||
ASSERT(writeConflictRange.size() == 2 &&
|
||||
writeConflictRange.begin()->key == writeConflictRangeKeysRange.begin.withSuffix("foo"_sr));
|
||||
ASSERT(conflictKeys.size() == 2 &&
|
||||
conflictKeys.begin()->key == conflictingKeysRange.begin.withSuffix("foo"_sr));
|
||||
defaultTx1->reset();
|
||||
defaultTx2->reset();
|
||||
break;
|
||||
}
|
||||
} catch (Error& e) {
|
||||
if (e.code() == error_code_actor_cancelled)
|
||||
throw;
|
||||
wait(defaultTx2->onError(e));
|
||||
}
|
||||
}
|
||||
// begin key outside module range
|
||||
try {
|
||||
tx->setOption(FDBTransactionOptions::RAW_ACCESS);
|
||||
|
|
|
@ -296,6 +296,8 @@ void FlowKnobs::initialize(Randomize randomize, IsSimulated isSimulated) {
|
|||
init( ENCRYPT_KEY_REFRESH_INTERVAL, isSimulated ? 60 : 8 * 60 );
|
||||
if ( randomize && BUGGIFY) { ENCRYPT_KEY_REFRESH_INTERVAL = deterministicRandom()->randomInt(2, 10); }
|
||||
init( TOKEN_CACHE_SIZE, 100 );
|
||||
init( ENCRYPT_KEY_CACHE_LOGGING_INTERVAL, 5.0 );
|
||||
init( ENCRYPT_KEY_CACHE_LOGGING_SAMPLE_SIZE, 1000 );
|
||||
|
||||
// REST Client
|
||||
init( RESTCLIENT_MAX_CONNECTIONPOOL_SIZE, 10 );
|
||||
|
|
|
@ -32,7 +32,6 @@
|
|||
#include "flow/Arena.h"
|
||||
|
||||
#include "flow/StreamCipher.h"
|
||||
#include "flow/BlobCipher.h"
|
||||
#include "flow/ScopeExit.h"
|
||||
#include "flow/Trace.h"
|
||||
#include "flow/Error.h"
|
||||
|
@ -3623,6 +3622,12 @@ void platformInit() {
|
|||
#endif
|
||||
}
|
||||
|
||||
std::vector<std::function<void()>> g_crashHandlerCallbacks;
|
||||
|
||||
void registerCrashHandlerCallback(void (*f)()) {
|
||||
g_crashHandlerCallbacks.push_back(f);
|
||||
}
|
||||
|
||||
// The crashHandler function is registered to handle signals before the process terminates.
|
||||
// Basic information about the crash is printed/traced, and stdout and trace events are flushed.
|
||||
void crashHandler(int sig) {
|
||||
|
@ -3635,7 +3640,10 @@ void crashHandler(int sig) {
|
|||
|
||||
StreamCipherKey::cleanup();
|
||||
StreamCipher::cleanup();
|
||||
BlobCipherKeyCache::cleanup();
|
||||
|
||||
for (auto& f : g_crashHandlerCallbacks) {
|
||||
f();
|
||||
}
|
||||
|
||||
fprintf(error ? stderr : stdout, "SIGNAL: %s (%d)\n", strsignal(sig), sig);
|
||||
if (error) {
|
||||
|
|
|
@ -22,11 +22,11 @@
|
|||
|
||||
// When actually compiled (NO_INTELLISENSE), include the generated version of this file. In intellisense use the source
|
||||
// version.
|
||||
#if defined(NO_INTELLISENSE) && !defined(FDBCLIENT_EVENTTYPES_ACTOR_G_H)
|
||||
#define FDBCLIENT_EVENTTYPES_ACTOR_G_H
|
||||
#if defined(NO_INTELLISENSE) && !defined(FLOW_EVENTTYPES_ACTOR_G_H)
|
||||
#define FLOW_EVENTTYPES_ACTOR_G_H
|
||||
#include "flow/EventTypes.actor.g.h"
|
||||
#elif !defined(FDBCLIENT_EVENTTYPES_ACTOR_H)
|
||||
#define FDBCLIENT_EVENTTYPESS_ACTOR_H
|
||||
#elif !defined(FLOW_EVENTTYPES_ACTOR_H)
|
||||
#define FLOW_EVENTTYPES_ACTOR_H
|
||||
|
||||
#include "flow/flow.h"
|
||||
#include "flow/TDMetric.actor.h"
|
||||
|
|
|
@ -359,6 +359,8 @@ public:
|
|||
// Encryption
|
||||
int64_t ENCRYPT_CIPHER_KEY_CACHE_TTL;
|
||||
int64_t ENCRYPT_KEY_REFRESH_INTERVAL;
|
||||
double ENCRYPT_KEY_CACHE_LOGGING_INTERVAL;
|
||||
double ENCRYPT_KEY_CACHE_LOGGING_SAMPLE_SIZE;
|
||||
|
||||
// Authorization
|
||||
int TOKEN_CACHE_SIZE;
|
||||
|
|
|
@ -794,7 +794,14 @@ EXTERNC void flushAndExit(int exitCode);
|
|||
// Initilization code that's run at the beginning of every entry point (except fdbmonitor)
|
||||
void platformInit();
|
||||
|
||||
// Register a callback which will run as part of the crash handler. Use in conjunction with registerCrashHandler.
|
||||
// The callback being added should be simple and unlikely to fail, otherwise it will fail the crash handler,
|
||||
// preventing necessary logging being printed. Also, the crash handler may not be comprehensive in handling all
|
||||
// failure cases.
|
||||
void registerCrashHandlerCallback(void (*f)());
|
||||
|
||||
void registerCrashHandler();
|
||||
|
||||
void setupRunLoopProfiler();
|
||||
EXTERNC void setProfilingEnabled(int enabled);
|
||||
|
||||
|
|
|
@ -20,6 +20,7 @@ PROGRESS_CHECK_TIMEOUT_SEC = 30
|
|||
TESTER_STATS_INTERVAL_SEC = 5
|
||||
TRANSACTION_RETRY_LIMIT = 100
|
||||
RUN_WITH_GDB = False
|
||||
CLEANUP_ON_EXIT = True
|
||||
|
||||
|
||||
def version_from_str(ver_str):
|
||||
|
@ -166,7 +167,8 @@ class UpgradeTest:
|
|||
|
||||
def __exit__(self, xc_type, exc_value, traceback):
|
||||
self.cluster.stop_cluster()
|
||||
shutil.rmtree(self.tmp_dir)
|
||||
if CLEANUP_ON_EXIT:
|
||||
shutil.rmtree(self.tmp_dir)
|
||||
|
||||
# Determine FDB API version matching the upgrade path
|
||||
def determine_api_version(self):
|
||||
|
@ -425,6 +427,11 @@ if __name__ == "__main__":
|
|||
help="Do not dump cluster log on error",
|
||||
action="store_true",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--no-cleanup-on-error",
|
||||
help="In case of an error do not remove any of the generated files",
|
||||
action="store_true",
|
||||
)
|
||||
parser.add_argument("--blob-granules-enabled", help="Enable blob granules", action="store_true")
|
||||
parser.add_argument("--run-with-gdb", help="Execute the tester binary from gdb", action="store_true")
|
||||
args = parser.parse_args()
|
||||
|
@ -450,5 +457,7 @@ if __name__ == "__main__":
|
|||
test.dump_warnings_in_logs()
|
||||
if errcode != 0 and not args.disable_log_dump:
|
||||
test.dump_cluster_logs()
|
||||
if errcode != 0 and args.no_cleanup_on_error:
|
||||
CLEANUP_ON_EXIT = False
|
||||
|
||||
sys.exit(errcode)
|
||||
|
|
|
@ -1,3 +1,6 @@
|
|||
[configuration]
|
||||
allowDisablingTenants = false
|
||||
|
||||
[[test]]
|
||||
testTitle = 'SpecialKeySpaceCorrectnessTest'
|
||||
|
||||
|
|
Loading…
Reference in New Issue