Merge branch 'main' of github.com:apple/foundationdb into network-disable-bypass

This commit is contained in:
Jon Fu 2022-09-21 11:33:27 -07:00
commit 0343ca9c53
27 changed files with 935 additions and 393 deletions

1
.gitignore vendored
View File

@ -64,6 +64,7 @@ packaging/msi/obj
simfdb
tests/oldBinaries
trace.*.xml
trace.*.json
.venv
# Editor files

View File

@ -1359,7 +1359,7 @@ else:
except:
# The system python on OS X can't find the library installed to /usr/local/lib if SIP is enabled
# find_library does find the location in /usr/local/lib, so if the above fails fallback to using it
lib_path = ctypes.util.find_library(capi_name)
lib_path = ctypes.util.find_library("fdb_c")
if lib_path is not None:
try:
_capi = ctypes.CDLL(lib_path)

View File

@ -25,6 +25,7 @@ env_set(STATIC_LINK_LIBCXX "${_static_link_libcxx}" BOOL "Statically link libstd
env_set(TRACE_PC_GUARD_INSTRUMENTATION_LIB "" STRING "Path to a library containing an implementation for __sanitizer_cov_trace_pc_guard. See https://clang.llvm.org/docs/SanitizerCoverage.html for more info.")
env_set(PROFILE_INSTR_GENERATE OFF BOOL "If set, build FDB as an instrumentation build to generate profiles")
env_set(PROFILE_INSTR_USE "" STRING "If set, build FDB with profile")
env_set(FULL_DEBUG_SYMBOLS OFF BOOL "Generate full debug symbols")
set(USE_SANITIZER OFF)
if(USE_ASAN OR USE_VALGRIND OR USE_MSAN OR USE_TSAN OR USE_UBSAN)
@ -164,9 +165,20 @@ else()
set(SANITIZER_COMPILE_OPTIONS)
set(SANITIZER_LINK_OPTIONS)
# we always compile with debug symbols. CPack will strip them out
# we always compile with debug symbols. For release builds CPack will strip them out
# and create a debuginfo rpm
add_compile_options(-ggdb -fno-omit-frame-pointer)
add_compile_options(-fno-omit-frame-pointer -gz)
add_link_options(-gz)
if(FDB_RELEASE OR FULL_DEBUG_SYMBOLS OR CMAKE_BUILD_TYPE STREQUAL "Debug")
# Configure with FULL_DEBUG_SYMBOLS=ON to generate all symbols for debugging with gdb
# Also generating full debug symbols in release builds, because they are packaged
# separately and installed optionally
add_compile_options(-ggdb)
else()
# Generating minimal debug symbols by default. They are sufficient for testing purposes
add_compile_options(-ggdb1)
endif()
if(TRACE_PC_GUARD_INSTRUMENTATION_LIB)
add_compile_options(-fsanitize-coverage=trace-pc-guard)
link_libraries(${TRACE_PC_GUARD_INSTRUMENTATION_LIB})

View File

@ -2,10 +2,8 @@ project(awssdk-download NONE)
# Compile the sdk with clang and libc++, since otherwise we get libc++ vs libstdc++ link errors when compiling fdb with clang
set(AWSSDK_COMPILER_FLAGS "")
set(AWSSDK_LINK_FLAGS "")
if(APPLE OR CLANG OR USE_LIBCXX)
if(APPLE OR USE_LIBCXX)
set(AWSSDK_COMPILER_FLAGS -stdlib=libc++ -nostdlib++)
set(AWSSDK_LINK_FLAGS -stdlib=libc++ -lc++abi)
endif()
include(ExternalProject)
@ -21,11 +19,11 @@ ExternalProject_Add(awssdk_project
-DSIMPLE_INSTALL=ON
-DCMAKE_INSTALL_PREFIX=install # need to specify an install prefix so it doesn't install in /usr/lib - FIXME: use absolute path
-DBYO_CRYPTO=ON # we have our own crypto libraries that conflict if we let aws sdk build and link its own
-DBUILD_CURL=ON
-DBUILD_ZLIB=ON
-DCMAKE_CXX_COMPILER=${CMAKE_CXX_COMPILER}
-DCMAKE_EXE_LINKER_FLAGS=${AWSSDK_COMPILER_FLAGS}
-DCMAKE_CXX_FLAGS=${AWSSDK_LINK_FLAGS}
-DCMAKE_CXX_FLAGS=${AWSSDK_COMPILER_FLAGS}
TEST_COMMAND ""
# the sdk build produces a ton of artifacts, with their own dependency tree, so there is a very specific dependency order they must be linked in
BUILD_BYPRODUCTS "${CMAKE_CURRENT_BINARY_DIR}/awssdk-build/install/lib64/libaws-cpp-sdk-core.a"
@ -41,6 +39,8 @@ ExternalProject_Add(awssdk_project
"${CMAKE_CURRENT_BINARY_DIR}/awssdk-build/install/lib64/libaws-c-compression.a"
"${CMAKE_CURRENT_BINARY_DIR}/awssdk-build/install/lib64/libaws-c-cal.a"
"${CMAKE_CURRENT_BINARY_DIR}/awssdk-build/install/lib64/libaws-c-common.a"
"${CMAKE_CURRENT_BINARY_DIR}/awssdk-build/install/external-install/curl/lib/libcurl.a"
"${CMAKE_CURRENT_BINARY_DIR}/awssdk-build/install/external-install/zlib/lib/libz.a"
)
add_library(awssdk_core STATIC IMPORTED)
@ -96,7 +96,15 @@ add_library(awssdk_c_common STATIC IMPORTED)
add_dependencies(awssdk_c_common awssdk_project)
set_target_properties(awssdk_c_common PROPERTIES IMPORTED_LOCATION "${CMAKE_CURRENT_BINARY_DIR}/awssdk-build/install/lib64/libaws-c-common.a")
add_library(curl STATIC IMPORTED)
add_dependencies(curl awssdk_project)
set_property(TARGET curl PROPERTY IMPORTED_LOCATION "${CMAKE_CURRENT_BINARY_DIR}/awssdk-build/install/external-install/curl/lib/libcurl.a")
add_library(zlib STATIC IMPORTED)
add_dependencies(zlib awssdk_project)
set_property(TARGET zlib PROPERTY IMPORTED_LOCATION "${CMAKE_CURRENT_BINARY_DIR}/awssdk-build/install/external-install/zlib/lib/libz.a")
# link them all together in one interface target
add_library(awssdk_target INTERFACE)
target_include_directories(awssdk_target SYSTEM INTERFACE ${CMAKE_CURRENT_BINARY_DIR}/awssdk-build/install/include)
target_link_libraries(awssdk_target INTERFACE awssdk_core awssdk_crt awssdk_c_s3 awssdk_c_auth awssdk_c_eventstream awssdk_c_http awssdk_c_mqtt awssdk_c_sdkutils awssdk_c_io awssdk_checksums awssdk_c_compression awssdk_c_cal awssdk_c_common curl)
target_link_libraries(awssdk_target INTERFACE awssdk_core awssdk_crt awssdk_c_s3 awssdk_c_auth awssdk_c_eventstream awssdk_c_http awssdk_c_mqtt awssdk_c_sdkutils awssdk_c_io awssdk_checksums awssdk_c_compression awssdk_c_cal awssdk_c_common zlib curl)

View File

@ -22,6 +22,9 @@
#include <time.h>
#include "fdbclient/BackupAgent.actor.h"
#include "fdbclient/BlobCipher.h"
#include "fdbclient/GetEncryptCipherKeys.actor.h"
#include "fdbclient/DatabaseContext.h"
#include "fdbrpc/simulator.h"
#include "flow/ActorCollection.h"
#include "flow/actorcompiler.h" // has to be last include
@ -253,16 +256,18 @@ std::pair<Version, uint32_t> decodeBKMutationLogKey(Key key) {
bigEndian32(*(int32_t*)(key.begin() + backupLogPrefixBytes + sizeof(UID) + sizeof(uint8_t) + sizeof(int64_t))));
}
void decodeBackupLogValue(Arena& arena,
VectorRef<MutationRef>& result,
int& mutationSize,
StringRef value,
StringRef addPrefix,
StringRef removePrefix,
Version version,
Reference<KeyRangeMap<Version>> key_version) {
ACTOR static Future<Void> decodeBackupLogValue(Arena* arena,
VectorRef<MutationRef>* result,
VectorRef<Optional<MutationRef>>* encryptedResult,
int* mutationSize,
Standalone<StringRef> value,
Key addPrefix,
Key removePrefix,
Version version,
Reference<KeyRangeMap<Version>> key_version,
Database cx) {
try {
uint64_t offset(0);
state uint64_t offset(0);
uint64_t protocolVersion = 0;
memcpy(&protocolVersion, value.begin(), sizeof(uint64_t));
offset += sizeof(uint64_t);
@ -274,36 +279,48 @@ void decodeBackupLogValue(Arena& arena,
throw incompatible_protocol_version();
}
uint32_t totalBytes = 0;
state uint32_t totalBytes = 0;
memcpy(&totalBytes, value.begin() + offset, sizeof(uint32_t));
offset += sizeof(uint32_t);
uint32_t consumed = 0;
state uint32_t consumed = 0;
if (totalBytes + offset > value.size())
throw restore_missing_data();
int originalOffset = offset;
state int originalOffset = offset;
while (consumed < totalBytes) {
uint32_t type = 0;
memcpy(&type, value.begin() + offset, sizeof(uint32_t));
offset += sizeof(uint32_t);
uint32_t len1 = 0;
state uint32_t len1 = 0;
memcpy(&len1, value.begin() + offset, sizeof(uint32_t));
offset += sizeof(uint32_t);
uint32_t len2 = 0;
state uint32_t len2 = 0;
memcpy(&len2, value.begin() + offset, sizeof(uint32_t));
offset += sizeof(uint32_t);
ASSERT(offset + len1 + len2 <= value.size() && isValidMutationType(type));
MutationRef logValue;
Arena tempArena;
state MutationRef logValue;
state Arena tempArena;
logValue.type = type;
logValue.param1 = value.substr(offset, len1);
offset += len1;
logValue.param2 = value.substr(offset, len2);
offset += len2;
state Optional<MutationRef> encryptedLogValue = Optional<MutationRef>();
// Decrypt mutation ref if encrypted
if (logValue.isEncrypted()) {
encryptedLogValue = logValue;
Reference<AsyncVar<ClientDBInfo> const> dbInfo = cx->clientInfo;
TextAndHeaderCipherKeys cipherKeys =
wait(getEncryptCipherKeys(dbInfo, *logValue.encryptionHeader(), BlobCipherMetrics::BACKUP));
logValue = logValue.decrypt(cipherKeys, tempArena, BlobCipherMetrics::BACKUP);
}
ASSERT(!logValue.isEncrypted());
MutationRef originalLogValue = logValue;
if (logValue.type == MutationRef::ClearRange) {
KeyRangeRef range(logValue.param1, logValue.param2);
@ -320,8 +337,8 @@ void decodeBackupLogValue(Arena& arena,
logValue.param1 = logValue.param1.withPrefix(addPrefix, tempArena);
}
logValue.param2 = addPrefix == StringRef() ? normalKeys.end : strinc(addPrefix, tempArena);
result.push_back_deep(arena, logValue);
mutationSize += logValue.expectedSize();
result->push_back_deep(*arena, logValue);
*mutationSize += logValue.expectedSize();
} else {
logValue.param1 = std::max(r.range().begin, range.begin);
logValue.param2 = minKey;
@ -333,8 +350,13 @@ void decodeBackupLogValue(Arena& arena,
logValue.param1 = logValue.param1.withPrefix(addPrefix, tempArena);
logValue.param2 = logValue.param2.withPrefix(addPrefix, tempArena);
}
result.push_back_deep(arena, logValue);
mutationSize += logValue.expectedSize();
result->push_back_deep(*arena, logValue);
*mutationSize += logValue.expectedSize();
}
if (originalLogValue.param1 == logValue.param1 && originalLogValue.param2 == logValue.param2) {
encryptedResult->push_back_deep(*arena, encryptedLogValue);
} else {
encryptedResult->push_back_deep(*arena, Optional<MutationRef>());
}
}
}
@ -348,8 +370,15 @@ void decodeBackupLogValue(Arena& arena,
if (addPrefix.size()) {
logValue.param1 = logValue.param1.withPrefix(addPrefix, tempArena);
}
result.push_back_deep(arena, logValue);
mutationSize += logValue.expectedSize();
result->push_back_deep(*arena, logValue);
*mutationSize += logValue.expectedSize();
// If we did not remove/add prefixes to the mutation then keep the original encrypted mutation so we
// do not have to re-encrypt unnecessarily
if (originalLogValue.param1 == logValue.param1 && originalLogValue.param2 == logValue.param2) {
encryptedResult->push_back_deep(*arena, encryptedLogValue);
} else {
encryptedResult->push_back_deep(*arena, Optional<MutationRef>());
}
}
}
@ -374,6 +403,7 @@ void decodeBackupLogValue(Arena& arena,
.detail("Value", value);
throw;
}
return Void();
}
static double lastErrorTime = 0;
@ -614,21 +644,24 @@ ACTOR Future<int> dumpData(Database cx,
state int mutationSize = 0;
loop {
try {
RCGroup group = waitNext(results.getFuture());
state RCGroup group = waitNext(results.getFuture());
lock->release(group.items.expectedSize());
BinaryWriter bw(Unversioned());
for (int i = 0; i < group.items.size(); ++i) {
bw.serializeBytes(group.items[i].value);
}
decodeBackupLogValue(req.arena,
req.transaction.mutations,
mutationSize,
bw.toValue(),
addPrefix,
removePrefix,
group.groupKey,
keyVersion);
Standalone<StringRef> value = bw.toValue();
wait(decodeBackupLogValue(&req.arena,
&req.transaction.mutations,
&req.transaction.encryptedMutations,
&mutationSize,
value,
addPrefix,
removePrefix,
group.groupKey,
keyVersion,
cx));
newBeginVersion = group.groupKey + 1;
if (mutationSize >= CLIENT_KNOBS->BACKUP_LOG_WRITE_BATCH_MAX_SIZE) {
break;
@ -652,8 +685,10 @@ ACTOR Future<int> dumpData(Database cx,
Key rangeEnd = getApplyKey(newBeginVersion, uid);
req.transaction.mutations.push_back_deep(req.arena, MutationRef(MutationRef::SetValue, applyBegin, versionKey));
req.transaction.encryptedMutations.push_back_deep(req.arena, Optional<MutationRef>());
req.transaction.write_conflict_ranges.push_back_deep(req.arena, singleKeyRange(applyBegin));
req.transaction.mutations.push_back_deep(req.arena, MutationRef(MutationRef::ClearRange, rangeBegin, rangeEnd));
req.transaction.encryptedMutations.push_back_deep(req.arena, Optional<MutationRef>());
req.transaction.write_conflict_ranges.push_back_deep(req.arena, singleKeyRange(rangeBegin));
// The commit request contains no read conflict ranges, so regardless of what read version we

View File

@ -152,7 +152,7 @@ void BlobCipherKey::initKey(const EncryptCipherDomainId& domainId,
expireAtTS = expireAt;
#if BLOB_CIPHER_DEBUG
TraceEvent(SevDebug, "BlobCipher.KeyInit")
TraceEvent(SevDebug, "BlobCipherKeyInit")
.detail("DomainId", domainId)
.detail("BaseCipherId", baseCipherId)
.detail("BaseCipherLen", baseCipherLen)
@ -168,10 +168,10 @@ void BlobCipherKey::applyHmacSha256Derivation() {
memcpy(&buf[0], baseCipher.get(), baseCipherLen);
memcpy(&buf[0] + baseCipherLen, &randomSalt, sizeof(EncryptCipherRandomSalt));
HmacSha256DigestGen hmacGen(baseCipher.get(), baseCipherLen);
StringRef digest = hmacGen.digest(&buf[0], baseCipherLen + sizeof(EncryptCipherRandomSalt), arena);
std::copy(digest.begin(), digest.end(), cipher.get());
if (digest.size() < AES_256_KEY_LENGTH) {
memcpy(cipher.get() + digest.size(), buf, AES_256_KEY_LENGTH - digest.size());
unsigned int digestLen =
hmacGen.digest(&buf[0], baseCipherLen + sizeof(EncryptCipherRandomSalt), cipher.get(), AUTH_TOKEN_SIZE);
if (digestLen < AES_256_KEY_LENGTH) {
memcpy(cipher.get() + digestLen, buf, AES_256_KEY_LENGTH - digestLen);
}
}
@ -185,7 +185,7 @@ void BlobCipherKey::reset() {
BlobCipherKeyIdCache::BlobCipherKeyIdCache(EncryptCipherDomainId dId, size_t* sizeStat)
: domainId(dId), latestBaseCipherKeyId(), latestRandomSalt(), sizeStat(sizeStat) {
ASSERT(sizeStat != nullptr);
TraceEvent(SevInfo, "BlobCipher.KeyIdCacheInit").detail("DomainId", domainId);
TraceEvent(SevInfo, "BlobCipherKeyIdCacheInit").detail("DomainId", domainId);
}
BlobCipherKeyIdCacheKey BlobCipherKeyIdCache::getCacheKey(const EncryptCipherBaseKeyId& baseCipherKeyId,
@ -229,7 +229,7 @@ Reference<BlobCipherKey> BlobCipherKeyIdCache::insertBaseCipherKey(const Encrypt
if (latestCipherKey.isValid() && latestCipherKey->getBaseCipherId() == baseCipherId) {
if (memcmp(latestCipherKey->rawBaseCipher(), baseCipher, baseCipherLen) == 0) {
#if BLOB_CIPHER_DEBUG
TraceEvent(SevDebug, "InsertBaseCipherKey_AlreadyPresent")
TraceEvent(SevDebug, "InsertBaseCipherKeyAlreadyPresent")
.detail("BaseCipherKeyId", baseCipherId)
.detail("DomainId", domainId);
#endif
@ -237,14 +237,14 @@ Reference<BlobCipherKey> BlobCipherKeyIdCache::insertBaseCipherKey(const Encrypt
// Key is already present; nothing more to do.
return latestCipherKey;
} else {
TraceEvent(SevInfo, "BlobCipher.UpdatetBaseCipherKey")
TraceEvent(SevInfo, "BlobCipherUpdatetBaseCipherKey")
.detail("BaseCipherKeyId", baseCipherId)
.detail("DomainId", domainId);
throw encrypt_update_cipher();
}
}
TraceEvent(SevInfo, "BlobCipherKey.InsertBaseCipherKeyLatest")
TraceEvent(SevInfo, "BlobCipherKeyInsertBaseCipherKeyLatest")
.detail("DomainId", domainId)
.detail("BaseCipherId", baseCipherId)
.detail("RefreshAt", refreshAt)
@ -279,7 +279,7 @@ Reference<BlobCipherKey> BlobCipherKeyIdCache::insertBaseCipherKey(const Encrypt
if (itr != keyIdCache.end()) {
if (memcmp(itr->second->rawBaseCipher(), baseCipher, baseCipherLen) == 0) {
#if BLOB_CIPHER_DEBUG
TraceEvent(SevDebug, "InsertBaseCipherKey_AlreadyPresent")
TraceEvent(SevDebug, "InsertBaseCipherKeyAlreadyPresent")
.detail("BaseCipherKeyId", baseCipherId)
.detail("DomainId", domainId);
#endif
@ -287,14 +287,14 @@ Reference<BlobCipherKey> BlobCipherKeyIdCache::insertBaseCipherKey(const Encrypt
// Key is already present; nothing more to do.
return itr->second;
} else {
TraceEvent(SevInfo, "BlobCipher.UpdateBaseCipherKey")
TraceEvent(SevInfo, "BlobCipherUpdateBaseCipherKey")
.detail("BaseCipherKeyId", baseCipherId)
.detail("DomainId", domainId);
throw encrypt_update_cipher();
}
}
TraceEvent(SevInfo, "BlobCipherKey.InsertBaseCipherKey")
TraceEvent(SevInfo, "BlobCipherKeyInsertBaseCipherKey")
.detail("DomainId", domainId)
.detail("BaseCipherId", baseCipherId)
.detail("Salt", salt)
@ -351,7 +351,7 @@ Reference<BlobCipherKey> BlobCipherKeyCache::insertCipherKey(const EncryptCipher
cipherKey = keyIdCache->insertBaseCipherKey(baseCipherId, baseCipher, baseCipherLen, refreshAt, expireAt);
}
} catch (Error& e) {
TraceEvent(SevWarn, "BlobCipher.InsertCipherKeyFailed")
TraceEvent(SevWarn, "BlobCipherInsertCipherKeyFailed")
.detail("BaseCipherKeyId", baseCipherId)
.detail("DomainId", domainId);
throw;
@ -387,7 +387,7 @@ Reference<BlobCipherKey> BlobCipherKeyCache::insertCipherKey(const EncryptCipher
keyIdCache->insertBaseCipherKey(baseCipherId, baseCipher, baseCipherLen, salt, refreshAt, expireAt);
}
} catch (Error& e) {
TraceEvent(SevWarn, "BlobCipher.InsertCipherKey_Failed")
TraceEvent(SevWarn, "BlobCipherInsertCipherKey_Failed")
.detail("BaseCipherKeyId", baseCipherId)
.detail("DomainId", domainId)
.detail("Salt", salt);
@ -398,12 +398,12 @@ Reference<BlobCipherKey> BlobCipherKeyCache::insertCipherKey(const EncryptCipher
Reference<BlobCipherKey> BlobCipherKeyCache::getLatestCipherKey(const EncryptCipherDomainId& domainId) {
if (domainId == INVALID_ENCRYPT_DOMAIN_ID) {
TraceEvent(SevWarn, "BlobCipher.GetLatestCipherKeyInvalidID").detail("DomainId", domainId);
TraceEvent(SevWarn, "BlobCipherGetLatestCipherKeyInvalidID").detail("DomainId", domainId);
throw encrypt_invalid_id();
}
auto domainItr = domainCacheMap.find(domainId);
if (domainItr == domainCacheMap.end()) {
TraceEvent(SevInfo, "BlobCipher.GetLatestCipherKeyDomainNotFound").detail("DomainId", domainId);
TraceEvent(SevInfo, "BlobCipherGetLatestCipherKeyDomainNotFound").detail("DomainId", domainId);
return Reference<BlobCipherKey>();
}
@ -414,7 +414,7 @@ Reference<BlobCipherKey> BlobCipherKeyCache::getLatestCipherKey(const EncryptCip
if (cipherKey.isValid()) {
if (cipherKey->needsRefresh()) {
#if BLOB_CIPHER_DEBUG
TraceEvent("SevDebug, BlobCipher.GetLatestNeedsRefresh")
TraceEvent("SevDebug, BlobCipherGetLatestNeedsRefresh")
.detail("DomainId", domainId)
.detail("Now", now())
.detail("RefreshAt", cipherKey->getRefreshAtTS());
@ -445,7 +445,7 @@ Reference<BlobCipherKey> BlobCipherKeyCache::getCipherKey(const EncryptCipherDom
if (cipherKey.isValid()) {
if (cipherKey->isExpired()) {
#if BLOB_CIPHER_DEBUG
TraceEvent(SevDebug, "BlobCipher.GetCipherExpired")
TraceEvent(SevDebug, "BlobCipherGetCipherExpired")
.detail("DomainId", domainId)
.detail("BaseCipherId", baseCipherId)
.detail("Now", now())
@ -472,18 +472,18 @@ void BlobCipherKeyCache::resetEncryptDomainId(const EncryptCipherDomainId domain
ASSERT(keyIdCache->getSize() <= size);
size -= keyIdCache->getSize();
keyIdCache->cleanup();
TraceEvent(SevInfo, "BlobCipher.ResetEncryptDomainId").detail("DomainId", domainId);
TraceEvent(SevInfo, "BlobCipherResetEncryptDomainId").detail("DomainId", domainId);
}
void BlobCipherKeyCache::cleanup() noexcept {
Reference<BlobCipherKeyCache> instance = BlobCipherKeyCache::getInstance();
TraceEvent(SevInfo, "BlobCipherKeyCache.Cleanup").log();
TraceEvent(SevInfo, "BlobCipherKeyCacheCleanup").log();
for (auto& domainItr : instance->domainCacheMap) {
Reference<BlobCipherKeyIdCache> keyIdCache = domainItr.second;
keyIdCache->cleanup();
TraceEvent(SevInfo, "BlobCipher.KeyCacheCleanup").detail("DomainId", domainItr.first);
TraceEvent(SevInfo, "BlobCipherKeyCacheCleanup").detail("DomainId", domainItr.first);
}
instance->domainCacheMap.clear();
@ -547,7 +547,6 @@ Reference<EncryptBuf> EncryptBlobCipherAes265Ctr::encrypt(const uint8_t* plainte
if (CLIENT_KNOBS->ENABLE_ENCRYPTION_CPU_TIME_LOGGING) {
startTime = timer_monotonic();
}
CODE_PROBE(true, "Encrypting data with BlobCipher");
memset(reinterpret_cast<uint8_t*>(header), 0, sizeof(BlobCipherEncryptHeader));
@ -561,7 +560,7 @@ Reference<EncryptBuf> EncryptBlobCipherAes265Ctr::encrypt(const uint8_t* plainte
uint8_t* ciphertext = encryptBuf->begin();
int bytes{ 0 };
if (EVP_EncryptUpdate(ctx, ciphertext, &bytes, plaintext, plaintextLen) != 1) {
TraceEvent(SevWarn, "BlobCipher.EncryptUpdateFailed")
TraceEvent(SevWarn, "BlobCipherEncryptUpdateFailed")
.detail("BaseCipherId", textCipherKey->getBaseCipherId())
.detail("EncryptDomainId", textCipherKey->getDomainId());
throw encrypt_ops_error();
@ -569,14 +568,14 @@ Reference<EncryptBuf> EncryptBlobCipherAes265Ctr::encrypt(const uint8_t* plainte
int finalBytes{ 0 };
if (EVP_EncryptFinal_ex(ctx, ciphertext + bytes, &finalBytes) != 1) {
TraceEvent(SevWarn, "BlobCipher.EncryptFinalFailed")
TraceEvent(SevWarn, "BlobCipherEncryptFinalFailed")
.detail("BaseCipherId", textCipherKey->getBaseCipherId())
.detail("EncryptDomainId", textCipherKey->getDomainId());
throw encrypt_ops_error();
}
if ((bytes + finalBytes) != plaintextLen) {
TraceEvent(SevWarn, "BlobCipher.EncryptUnexpectedCipherLen")
TraceEvent(SevWarn, "BlobCipherEncryptUnexpectedCipherLen")
.detail("PlaintextLen", plaintextLen)
.detail("EncryptedBufLen", bytes + finalBytes);
throw encrypt_ops_error();
@ -610,73 +609,41 @@ Reference<EncryptBuf> EncryptBlobCipherAes265Ctr::encrypt(const uint8_t* plainte
memcpy(&ciphertext[bytes + finalBytes],
reinterpret_cast<const uint8_t*>(header),
sizeof(BlobCipherEncryptHeader));
StringRef authToken = computeAuthToken(ciphertext,
bytes + finalBytes + sizeof(BlobCipherEncryptHeader),
headerCipherKey->rawCipher(),
AES_256_KEY_LENGTH,
arena);
memcpy(&header->singleAuthToken.authToken[0], authToken.begin(), AUTH_TOKEN_SIZE);
computeAuthToken(ciphertext,
bytes + finalBytes + sizeof(BlobCipherEncryptHeader),
headerCipherKey->rawCipher(),
AES_256_KEY_LENGTH,
&header->singleAuthToken.authToken[0],
AUTH_TOKEN_SIZE);
} else {
ASSERT_EQ(header->flags.authTokenMode, ENCRYPT_HEADER_AUTH_TOKEN_MODE_MULTI);
StringRef cipherTextAuthToken =
computeAuthToken(ciphertext,
bytes + finalBytes,
reinterpret_cast<const uint8_t*>(&header->cipherTextDetails.salt),
sizeof(EncryptCipherRandomSalt),
arena);
memcpy(&header->multiAuthTokens.cipherTextAuthToken[0], cipherTextAuthToken.begin(), AUTH_TOKEN_SIZE);
StringRef headerAuthToken = computeAuthToken(reinterpret_cast<const uint8_t*>(header),
sizeof(BlobCipherEncryptHeader),
headerCipherKey->rawCipher(),
AES_256_KEY_LENGTH,
arena);
memcpy(&header->multiAuthTokens.headerAuthToken[0], headerAuthToken.begin(), AUTH_TOKEN_SIZE);
computeAuthToken(ciphertext,
bytes + finalBytes,
reinterpret_cast<const uint8_t*>(&header->cipherTextDetails.salt),
sizeof(EncryptCipherRandomSalt),
&header->multiAuthTokens.cipherTextAuthToken[0],
AUTH_TOKEN_SIZE);
computeAuthToken(reinterpret_cast<const uint8_t*>(header),
sizeof(BlobCipherEncryptHeader),
headerCipherKey->rawCipher(),
AES_256_KEY_LENGTH,
&header->multiAuthTokens.headerAuthToken[0],
AUTH_TOKEN_SIZE);
}
}
encryptBuf->setLogicalSize(plaintextLen);
if (CLIENT_KNOBS->ENABLE_ENCRYPTION_CPU_TIME_LOGGING) {
BlobCipherMetrics::counters(usageType).encryptCPUTimeNS += int64_t((timer_monotonic() - startTime) * 1e9);
}
CODE_PROBE(true, "Encrypting data with BlobCipher");
return encryptBuf;
}
Standalone<StringRef> EncryptBlobCipherAes265Ctr::encryptBlobGranuleChunk(const uint8_t* plaintext,
const int plaintextLen) {
double startTime = 0.0;
if (CLIENT_KNOBS->ENABLE_ENCRYPTION_CPU_TIME_LOGGING) {
startTime = timer_monotonic();
}
Standalone<StringRef> encrypted = makeString(plaintextLen);
uint8_t* ciphertext = mutateString(encrypted);
int bytes{ 0 };
if (EVP_EncryptUpdate(ctx, ciphertext, &bytes, plaintext, plaintextLen) != 1) {
TraceEvent(SevWarn, "BlobCipher.EncryptUpdateFailed")
.detail("BaseCipherId", textCipherKey->getBaseCipherId())
.detail("EncryptDomainId", textCipherKey->getDomainId());
throw encrypt_ops_error();
}
int finalBytes{ 0 };
if (EVP_EncryptFinal_ex(ctx, ciphertext + bytes, &finalBytes) != 1) {
TraceEvent(SevWarn, "BlobCipher.EncryptFinalFailed")
.detail("BaseCipherId", textCipherKey->getBaseCipherId())
.detail("EncryptDomainId", textCipherKey->getDomainId());
throw encrypt_ops_error();
}
if ((bytes + finalBytes) != plaintextLen) {
TraceEvent(SevWarn, "BlobCipher.EncryptUnexpectedCipherLen")
.detail("PlaintextLen", plaintextLen)
.detail("EncryptedBufLen", bytes + finalBytes);
throw encrypt_ops_error();
}
if (CLIENT_KNOBS->ENABLE_ENCRYPTION_CPU_TIME_LOGGING) {
BlobCipherMetrics::counters(usageType).encryptCPUTimeNS += int64_t((timer_monotonic() - startTime) * 1e9);
}
return encrypted;
}
EncryptBlobCipherAes265Ctr::~EncryptBlobCipherAes265Ctr() {
if (ctx != nullptr) {
EVP_CIPHER_CTX_free(ctx);
@ -716,18 +683,20 @@ void DecryptBlobCipherAes256Ctr::verifyHeaderAuthToken(const BlobCipherEncryptHe
reinterpret_cast<const uint8_t*>(&header),
sizeof(BlobCipherEncryptHeader));
memset(reinterpret_cast<uint8_t*>(&headerCopy.multiAuthTokens.headerAuthToken), 0, AUTH_TOKEN_SIZE);
StringRef computedHeaderAuthToken = computeAuthToken(reinterpret_cast<const uint8_t*>(&headerCopy),
sizeof(BlobCipherEncryptHeader),
headerCipherKey->rawCipher(),
AES_256_KEY_LENGTH,
arena);
if (memcmp(&header.multiAuthTokens.headerAuthToken[0], computedHeaderAuthToken.begin(), AUTH_TOKEN_SIZE) != 0) {
TraceEvent(SevWarn, "BlobCipher.VerifyEncryptBlobHeaderAuthTokenMismatch")
uint8_t computedHeaderAuthToken[AUTH_TOKEN_SIZE];
computeAuthToken(reinterpret_cast<const uint8_t*>(&headerCopy),
sizeof(BlobCipherEncryptHeader),
headerCipherKey->rawCipher(),
AES_256_KEY_LENGTH,
&computedHeaderAuthToken[0],
AUTH_TOKEN_SIZE);
if (memcmp(&header.multiAuthTokens.headerAuthToken[0], &computedHeaderAuthToken[0], AUTH_TOKEN_SIZE) != 0) {
TraceEvent(SevWarn, "BlobCipherVerifyEncryptBlobHeaderAuthTokenMismatch")
.detail("HeaderVersion", header.flags.headerVersion)
.detail("HeaderMode", header.flags.encryptMode)
.detail("MultiAuthHeaderAuthToken",
StringRef(arena, &header.multiAuthTokens.headerAuthToken[0], AUTH_TOKEN_SIZE).toString())
.detail("ComputedHeaderAuthToken", computedHeaderAuthToken.toString());
.detail("ComputedHeaderAuthToken", StringRef(computedHeaderAuthToken, AUTH_TOKEN_SIZE));
throw encrypt_header_authtoken_mismatch();
}
@ -749,15 +718,20 @@ void DecryptBlobCipherAes256Ctr::verifyHeaderSingleAuthToken(const uint8_t* ciph
BlobCipherEncryptHeader* eHeader = (BlobCipherEncryptHeader*)(&buff[ciphertextLen]);
memset(reinterpret_cast<uint8_t*>(&eHeader->singleAuthToken), 0, 2 * AUTH_TOKEN_SIZE);
StringRef computed = computeAuthToken(
buff, ciphertextLen + sizeof(BlobCipherEncryptHeader), headerCipherKey->rawCipher(), AES_256_KEY_LENGTH, arena);
if (memcmp(&header.singleAuthToken.authToken[0], computed.begin(), AUTH_TOKEN_SIZE) != 0) {
TraceEvent(SevWarn, "BlobCipher.VerifyEncryptBlobHeaderAuthTokenMismatch")
uint8_t computed[AUTH_TOKEN_SIZE];
computeAuthToken(buff,
ciphertextLen + sizeof(BlobCipherEncryptHeader),
headerCipherKey->rawCipher(),
AES_256_KEY_LENGTH,
&computed[0],
AUTH_TOKEN_SIZE);
if (memcmp(&header.singleAuthToken.authToken[0], &computed[0], AUTH_TOKEN_SIZE) != 0) {
TraceEvent(SevWarn, "BlobCipherVerifyEncryptBlobHeaderAuthTokenMismatch")
.detail("HeaderVersion", header.flags.headerVersion)
.detail("HeaderMode", header.flags.encryptMode)
.detail("SingleAuthToken",
StringRef(arena, &header.singleAuthToken.authToken[0], AUTH_TOKEN_SIZE).toString())
.detail("ComputedSingleAuthToken", computed.toString());
.detail("ComputedSingleAuthToken", StringRef(computed, AUTH_TOKEN_SIZE));
throw encrypt_header_authtoken_mismatch();
}
}
@ -770,20 +744,20 @@ void DecryptBlobCipherAes256Ctr::verifyHeaderMultiAuthToken(const uint8_t* ciphe
if (!headerAuthTokenValidationDone) {
verifyHeaderAuthToken(header, arena);
}
StringRef computedCipherTextAuthToken =
computeAuthToken(ciphertext,
ciphertextLen,
reinterpret_cast<const uint8_t*>(&header.cipherTextDetails.salt),
sizeof(EncryptCipherRandomSalt),
arena);
if (memcmp(&header.multiAuthTokens.cipherTextAuthToken[0], computedCipherTextAuthToken.begin(), AUTH_TOKEN_SIZE) !=
0) {
TraceEvent(SevWarn, "BlobCipher.VerifyEncryptBlobHeaderAuthTokenMismatch")
uint8_t computedCipherTextAuthToken[AUTH_TOKEN_SIZE];
computeAuthToken(ciphertext,
ciphertextLen,
reinterpret_cast<const uint8_t*>(&header.cipherTextDetails.salt),
sizeof(EncryptCipherRandomSalt),
&computedCipherTextAuthToken[0],
AUTH_TOKEN_SIZE);
if (memcmp(&header.multiAuthTokens.cipherTextAuthToken[0], &computedCipherTextAuthToken[0], AUTH_TOKEN_SIZE) != 0) {
TraceEvent(SevWarn, "BlobCipherVerifyEncryptBlobHeaderAuthTokenMismatch")
.detail("HeaderVersion", header.flags.headerVersion)
.detail("HeaderMode", header.flags.encryptMode)
.detail("MultiAuthCipherTextAuthToken",
StringRef(arena, &header.multiAuthTokens.cipherTextAuthToken[0], AUTH_TOKEN_SIZE).toString())
.detail("ComputedCipherTextAuthToken", computedCipherTextAuthToken.toString());
.detail("ComputedCipherTextAuthToken", StringRef(computedCipherTextAuthToken, AUTH_TOKEN_SIZE));
throw encrypt_header_authtoken_mismatch();
}
}
@ -808,7 +782,7 @@ void DecryptBlobCipherAes256Ctr::verifyEncryptHeaderMetadata(const BlobCipherEnc
if (header.flags.headerVersion != EncryptBlobCipherAes265Ctr::ENCRYPT_HEADER_VERSION ||
header.flags.encryptMode != ENCRYPT_CIPHER_MODE_AES_256_CTR ||
!isEncryptHeaderAuthTokenModeValid((EncryptAuthTokenMode)header.flags.authTokenMode)) {
TraceEvent(SevWarn, "BlobCipher.VerifyEncryptBlobHeader")
TraceEvent(SevWarn, "BlobCipherVerifyEncryptBlobHeader")
.detail("HeaderVersion", header.flags.headerVersion)
.detail("ExpectedVersion", EncryptBlobCipherAes265Ctr::ENCRYPT_HEADER_VERSION)
.detail("EncryptCipherMode", header.flags.encryptMode)
@ -822,8 +796,6 @@ Reference<EncryptBuf> DecryptBlobCipherAes256Ctr::decrypt(const uint8_t* ciphert
const int ciphertextLen,
const BlobCipherEncryptHeader& header,
Arena& arena) {
CODE_PROBE(true, "Decrypting data with BlobCipher");
double startTime = 0.0;
if (CLIENT_KNOBS->ENABLE_ENCRYPTION_CPU_TIME_LOGGING) {
startTime = timer_monotonic();
@ -832,7 +804,7 @@ Reference<EncryptBuf> DecryptBlobCipherAes256Ctr::decrypt(const uint8_t* ciphert
verifyEncryptHeaderMetadata(header);
if (header.flags.authTokenMode != ENCRYPT_HEADER_AUTH_TOKEN_MODE_NONE && !headerCipherKey.isValid()) {
TraceEvent(SevWarn, "BlobCipher.DecryptInvalidHeaderCipherKey")
TraceEvent(SevWarn, "BlobCipherDecryptInvalidHeaderCipherKey")
.detail("AuthTokenMode", header.flags.authTokenMode);
throw encrypt_ops_error();
}
@ -850,7 +822,7 @@ Reference<EncryptBuf> DecryptBlobCipherAes256Ctr::decrypt(const uint8_t* ciphert
uint8_t* plaintext = decrypted->begin();
int bytesDecrypted{ 0 };
if (!EVP_DecryptUpdate(ctx, plaintext, &bytesDecrypted, ciphertext, ciphertextLen)) {
TraceEvent(SevWarn, "BlobCipher.DecryptUpdateFailed")
TraceEvent(SevWarn, "BlobCipherDecryptUpdateFailed")
.detail("BaseCipherId", header.cipherTextDetails.baseCipherId)
.detail("EncryptDomainId", header.cipherTextDetails.encryptDomainId);
throw encrypt_ops_error();
@ -858,23 +830,27 @@ Reference<EncryptBuf> DecryptBlobCipherAes256Ctr::decrypt(const uint8_t* ciphert
int finalBlobBytes{ 0 };
if (EVP_DecryptFinal_ex(ctx, plaintext + bytesDecrypted, &finalBlobBytes) <= 0) {
TraceEvent(SevWarn, "BlobCipher.DecryptFinalFailed")
TraceEvent(SevWarn, "BlobCipherDecryptFinalFailed")
.detail("BaseCipherId", header.cipherTextDetails.baseCipherId)
.detail("EncryptDomainId", header.cipherTextDetails.encryptDomainId);
throw encrypt_ops_error();
}
if ((bytesDecrypted + finalBlobBytes) != ciphertextLen) {
TraceEvent(SevWarn, "BlobCipher.EncryptUnexpectedPlaintextLen")
TraceEvent(SevWarn, "BlobCipherEncryptUnexpectedPlaintextLen")
.detail("CiphertextLen", ciphertextLen)
.detail("DecryptedBufLen", bytesDecrypted + finalBlobBytes);
throw encrypt_ops_error();
}
decrypted->setLogicalSize(ciphertextLen);
if (CLIENT_KNOBS->ENABLE_ENCRYPTION_CPU_TIME_LOGGING) {
BlobCipherMetrics::counters(usageType).decryptCPUTimeNS += int64_t((timer_monotonic() - startTime) * 1e9);
}
CODE_PROBE(true, "Decrypting data with BlobCipher");
return decrypted;
}
@ -898,32 +874,38 @@ HmacSha256DigestGen::~HmacSha256DigestGen() {
}
}
StringRef HmacSha256DigestGen::digest(const unsigned char* data, size_t len, Arena& arena) {
CODE_PROBE(true, "Digest generation");
unsigned int digestLen = HMAC_size(ctx);
auto digest = new (arena) unsigned char[digestLen];
unsigned int HmacSha256DigestGen::digest(const unsigned char* data,
size_t len,
unsigned char* buf,
unsigned int bufLen) {
ASSERT_EQ(bufLen, HMAC_size(ctx));
if (HMAC_Update(ctx, data, len) != 1) {
throw encrypt_ops_error();
}
if (HMAC_Final(ctx, digest, &digestLen) != 1) {
unsigned int digestLen = 0;
if (HMAC_Final(ctx, buf, &digestLen) != 1) {
throw encrypt_ops_error();
}
return StringRef(arena, digest, digestLen);
CODE_PROBE(true, "Digest generation");
return digestLen;
}
StringRef computeAuthToken(const uint8_t* payload,
const int payloadLen,
const uint8_t* key,
const int keyLen,
Arena& arena) {
CODE_PROBE(true, "Auth token generation");
void computeAuthToken(const uint8_t* payload,
const int payloadLen,
const uint8_t* key,
const int keyLen,
unsigned char* digestBuf,
unsigned int digestBufSz) {
HmacSha256DigestGen hmacGenerator(key, keyLen);
StringRef digest = hmacGenerator.digest(payload, payloadLen, arena);
unsigned int digestLen = hmacGenerator.digest(payload, payloadLen, digestBuf, digestBufSz);
ASSERT_GE(digest.size(), AUTH_TOKEN_SIZE);
return digest;
ASSERT_EQ(digestLen, digestBufSz);
CODE_PROBE(true, "Auth token generation");
}
// Only used to link unit tests
@ -941,7 +923,7 @@ void forceLinkBlobCipherTests() {}
// 6.1 cleanup cipherKeys by given encryptDomainId
// 6.2. Cleanup all cached cipherKeys
TEST_CASE("flow/BlobCipher") {
TraceEvent("BlobCipherTest.Start").log();
TraceEvent("BlobCipherTestStart").log();
// Construct a dummy External Key Manager representation and populate with some keys
class BaseCipher : public ReferenceCounted<BaseCipher>, NonCopyable {
@ -985,7 +967,7 @@ TEST_CASE("flow/BlobCipher") {
Reference<BlobCipherKeyCache> cipherKeyCache = BlobCipherKeyCache::getInstance();
// validate getLatestCipherKey return empty when there's no cipher key
TraceEvent("BlobCipherTest.LatestKeyNotExists").log();
TraceEvent("BlobCipherTestLatestKeyNotExists").log();
Reference<BlobCipherKey> latestKeyNonexists =
cipherKeyCache->getLatestCipherKey(deterministicRandom()->randomInt(minDomainId, maxDomainId));
ASSERT(!latestKeyNonexists.isValid());
@ -997,7 +979,7 @@ TEST_CASE("flow/BlobCipher") {
}
// insert BlobCipher keys into BlobCipherKeyCache map and validate
TraceEvent("BlobCipherTest_InsertKeys").log();
TraceEvent("BlobCipherTestInsertKeys").log();
for (auto& domainItr : domainKeyMap) {
for (auto& baseKeyItr : domainItr.second) {
Reference<BaseCipher> baseCipher = baseKeyItr.second;
@ -1022,7 +1004,7 @@ TEST_CASE("flow/BlobCipher") {
headerBaseCipher->refreshAt,
headerBaseCipher->expireAt);
TraceEvent("BlobCipherTest.InsertKeysDone").log();
TraceEvent("BlobCipherTestInsertKeysDone").log();
// validate the cipherKey lookups work as desired
for (auto& domainItr : domainKeyMap) {
@ -1041,7 +1023,7 @@ TEST_CASE("flow/BlobCipher") {
ASSERT_NE(std::memcmp(cipherKey->rawCipher(), baseCipher->key.get(), cipherKey->getBaseCipherLen()), 0);
}
}
TraceEvent("BlobCipherTest.LooksupDone").log();
TraceEvent("BlobCipherTestLooksupDone").log();
// Ensure attemtping to insert existing cipherKey (identical) more than once is treated as a NOP
try {
@ -1055,7 +1037,7 @@ TEST_CASE("flow/BlobCipher") {
} catch (Error& e) {
throw;
}
TraceEvent("BlobCipherTest.ReinsertIdempotentKeyDone").log();
TraceEvent("BlobCipherTestReinsertIdempotentKeyDone").log();
// Ensure attemtping to insert an existing cipherKey (modified) fails with appropriate error
try {
@ -1077,7 +1059,7 @@ TEST_CASE("flow/BlobCipher") {
throw;
}
}
TraceEvent("BlobCipherTest.ReinsertNonIdempotentKeyDone").log();
TraceEvent("BlobCipherTestReinsertNonIdempotentKeyDone").log();
// Validate Encryption ops
Reference<BlobCipherKey> cipherKey = cipherKeyCache->getLatestCipherKey(minDomainId);
@ -1093,7 +1075,7 @@ TEST_CASE("flow/BlobCipher") {
BlobCipherEncryptHeader headerCopy;
// validate basic encrypt followed by decrypt operation for AUTH_MODE_NONE
{
TraceEvent("NoneAuthMode.Start").log();
TraceEvent("NoneAuthModeStart").log();
EncryptBlobCipherAes265Ctr encryptor(cipherKey,
Reference<BlobCipherKey>(),
@ -1110,7 +1092,7 @@ TEST_CASE("flow/BlobCipher") {
ASSERT_EQ(header.flags.encryptMode, ENCRYPT_CIPHER_MODE_AES_256_CTR);
ASSERT_EQ(header.flags.authTokenMode, ENCRYPT_HEADER_AUTH_TOKEN_MODE_NONE);
TraceEvent("BlobCipherTest.EncryptDone")
TraceEvent("BlobCipherTestEncryptDone")
.detail("HeaderVersion", header.flags.headerVersion)
.detail("HeaderEncryptMode", header.flags.encryptMode)
.detail("DomainId", header.cipherTextDetails.encryptDomainId)
@ -1127,7 +1109,7 @@ TEST_CASE("flow/BlobCipher") {
ASSERT_EQ(decrypted->getLogicalSize(), bufLen);
ASSERT_EQ(memcmp(decrypted->begin(), &orgData[0], bufLen), 0);
TraceEvent("BlobCipherTest.DecryptDone").log();
TraceEvent("BlobCipherTestDecryptDone").log();
// induce encryption header corruption - headerVersion corrupted
memcpy(reinterpret_cast<uint8_t*>(&headerCopy),
@ -1200,7 +1182,7 @@ TEST_CASE("flow/BlobCipher") {
ASSERT_EQ(header.flags.encryptMode, ENCRYPT_CIPHER_MODE_AES_256_CTR);
ASSERT_EQ(header.flags.authTokenMode, ENCRYPT_HEADER_AUTH_TOKEN_MODE_SINGLE);
TraceEvent("BlobCipherTest.EncryptDone")
TraceEvent("BlobCipherTestEncryptDone")
.detail("HeaderVersion", header.flags.headerVersion)
.detail("HeaderEncryptMode", header.flags.encryptMode)
.detail("DomainId", header.cipherTextDetails.encryptDomainId)
@ -1221,7 +1203,7 @@ TEST_CASE("flow/BlobCipher") {
ASSERT_EQ(decrypted->getLogicalSize(), bufLen);
ASSERT_EQ(memcmp(decrypted->begin(), &orgData[0], bufLen), 0);
TraceEvent("BlobCipherTest.DecryptDone").log();
TraceEvent("BlobCipherTestDecryptDone").log();
// induce encryption header corruption - headerVersion corrupted
encrypted = encryptor.encrypt(&orgData[0], bufLen, &header, arena);
@ -1287,12 +1269,12 @@ TEST_CASE("flow/BlobCipher") {
}
}
TraceEvent("SingleAuthMode.Done").log();
TraceEvent("SingleAuthModeDone").log();
}
// validate basic encrypt followed by decrypt operation for AUTH_TOKEN_MODE_MULTI
{
TraceEvent("MultiAuthMode.Start").log();
TraceEvent("MultiAuthModeStart").log();
EncryptBlobCipherAes265Ctr encryptor(cipherKey,
headerCipherKey,
@ -1309,7 +1291,7 @@ TEST_CASE("flow/BlobCipher") {
ASSERT_EQ(header.flags.encryptMode, ENCRYPT_CIPHER_MODE_AES_256_CTR);
ASSERT_EQ(header.flags.authTokenMode, ENCRYPT_HEADER_AUTH_TOKEN_MODE_MULTI);
TraceEvent("BlobCipherTest.EncryptDone")
TraceEvent("BlobCipherTestEncryptDone")
.detail("HeaderVersion", header.flags.headerVersion)
.detail("HeaderEncryptMode", header.flags.encryptMode)
.detail("DomainId", header.cipherTextDetails.encryptDomainId)
@ -1331,7 +1313,7 @@ TEST_CASE("flow/BlobCipher") {
ASSERT_EQ(decrypted->getLogicalSize(), bufLen);
ASSERT_EQ(memcmp(decrypted->begin(), &orgData[0], bufLen), 0);
TraceEvent("BlobCipherTest.DecryptDone").log();
TraceEvent("BlobCipherTestDecryptDone").log();
// induce encryption header corruption - headerVersion corrupted
encrypted = encryptor.encrypt(&orgData[0], bufLen, &header, arena);
@ -1413,7 +1395,7 @@ TEST_CASE("flow/BlobCipher") {
}
}
TraceEvent("MultiAuthMode.Done").log();
TraceEvent("MultiAuthModeDone").log();
}
// Validate dropping encryptDomainId cached keys
@ -1429,6 +1411,6 @@ TEST_CASE("flow/BlobCipher") {
ASSERT(cachedKeys.empty());
}
TraceEvent("BlobCipherTest.Done").log();
TraceEvent("BlobCipherTestDone").log();
return Void();
}

View File

@ -3328,6 +3328,14 @@ bool AccumulatedMutations::matchesAnyRange(const std::vector<KeyRange>& ranges)
std::vector<MutationRef> mutations = decodeMutationLogValue(serializedMutations);
for (auto& m : mutations) {
for (auto& r : ranges) {
if (m.type == MutationRef::Encrypted) {
// TODO: In order to filter out encrypted mutations that are not relevant to the
// target range, they would have to be decrypted here in order to check relevance
// below, however the staged mutations would still need to remain encrypted for
// staging into the destination database. Without decrypting, we must assume that
// some data could match the range and return true here.
return true;
}
if (m.type == MutationRef::ClearRange) {
if (r.intersects(KeyRangeRef(m.param1, m.param2))) {
return true;

View File

@ -728,8 +728,8 @@ ACTOR Future<RangeResult> ddMetricsGetRangeActor(ReadYourWritesTransaction* ryw,
loop {
try {
auto keys = kr.removePrefix(ddStatsRange.begin);
Standalone<VectorRef<DDMetricsRef>> resultWithoutPrefix = wait(
waitDataDistributionMetricsList(ryw->getDatabase(), keys, CLIENT_KNOBS->STORAGE_METRICS_SHARD_LIMIT));
Standalone<VectorRef<DDMetricsRef>> resultWithoutPrefix =
wait(waitDataDistributionMetricsList(ryw->getDatabase(), keys, CLIENT_KNOBS->TOO_MANY));
RangeResult result;
for (const auto& ddMetricsRef : resultWithoutPrefix) {
// each begin key is the previous end key, thus we only encode the begin key in the result

View File

@ -116,7 +116,7 @@ class EncryptBuf : public ReferenceCounted<EncryptBuf>, NonCopyable {
public:
EncryptBuf(int size, Arena& arena) : allocSize(size), logicalSize(size) {
if (size > 0) {
buffer = new (arena) uint8_t[size];
buffer = new (arena) uint8_t[size]();
} else {
buffer = nullptr;
}
@ -563,7 +563,6 @@ public:
const int plaintextLen,
BlobCipherEncryptHeader* header,
Arena&);
Standalone<StringRef> encryptBlobGranuleChunk(const uint8_t* plaintext, const int plaintextLen);
private:
EVP_CIPHER_CTX* ctx;
@ -628,16 +627,17 @@ public:
HmacSha256DigestGen(const unsigned char* key, size_t len);
~HmacSha256DigestGen();
HMAC_CTX* getCtx() const { return ctx; }
StringRef digest(unsigned char const* data, size_t len, Arena&);
unsigned int digest(unsigned char const* data, size_t len, unsigned char* buf, unsigned int bufLen);
private:
HMAC_CTX* ctx;
};
StringRef computeAuthToken(const uint8_t* payload,
const int payloadLen,
const uint8_t* key,
const int keyLen,
Arena& arena);
void computeAuthToken(const uint8_t* payload,
const int payloadLen,
const uint8_t* key,
const int keyLen,
unsigned char* digestBuf,
unsigned int digestBufSz);
#endif // FDBCLIENT_BLOB_CIPHER_H

View File

@ -24,6 +24,7 @@
#include "fdbclient/BlobCipher.h"
#include "fdbclient/FDBTypes.h"
#include "fdbclient/GetEncryptCipherKeys.actor.h"
#include "fdbclient/Knobs.h"
#include "fdbclient/Tracing.h"
@ -171,6 +172,22 @@ struct MutationRef {
return encrypt(cipherKeys, SYSTEM_KEYSPACE_ENCRYPT_DOMAIN_ID, arena, usageType);
}
MutationRef decrypt(TextAndHeaderCipherKeys cipherKeys,
Arena& arena,
BlobCipherMetrics::UsageType usageType,
StringRef* buf = nullptr) const {
const BlobCipherEncryptHeader* header = encryptionHeader();
DecryptBlobCipherAes256Ctr cipher(cipherKeys.cipherTextKey, cipherKeys.cipherHeaderKey, header->iv, usageType);
StringRef plaintext = cipher.decrypt(param2.begin(), param2.size(), *header, arena)->toStringRef();
if (buf != nullptr) {
*buf = plaintext;
}
ArenaReader reader(arena, plaintext, AssumeVersion(ProtocolVersion::withEncryptionAtRest()));
MutationRef mutation;
reader >> mutation;
return mutation;
}
MutationRef decrypt(const std::unordered_map<BlobCipherDetails, Reference<BlobCipherKey>>& cipherKeys,
Arena& arena,
BlobCipherMetrics::UsageType usageType,
@ -180,15 +197,10 @@ struct MutationRef {
auto headerCipherItr = cipherKeys.find(header->cipherHeaderDetails);
ASSERT(textCipherItr != cipherKeys.end() && textCipherItr->second.isValid());
ASSERT(headerCipherItr != cipherKeys.end() && headerCipherItr->second.isValid());
DecryptBlobCipherAes256Ctr cipher(textCipherItr->second, headerCipherItr->second, header->iv, usageType);
StringRef plaintext = cipher.decrypt(param2.begin(), param2.size(), *header, arena)->toStringRef();
if (buf != nullptr) {
*buf = plaintext;
}
ArenaReader reader(arena, plaintext, AssumeVersion(ProtocolVersion::withEncryptionAtRest()));
MutationRef mutation;
reader >> mutation;
return mutation;
TextAndHeaderCipherKeys textAndHeaderKeys;
textAndHeaderKeys.cipherHeaderKey = headerCipherItr->second;
textAndHeaderKeys.cipherTextKey = textCipherItr->second;
return decrypt(textAndHeaderKeys, arena, usageType, buf);
}
// These masks define which mutation types have particular properties (they are used to implement
@ -253,6 +265,11 @@ struct CommitTransactionRef {
VectorRef<KeyRangeRef> read_conflict_ranges;
VectorRef<KeyRangeRef> write_conflict_ranges;
VectorRef<MutationRef> mutations; // metadata mutations
// encryptedMutations should be a 1-1 corespondence with mutations field above. That is either
// encryptedMutations.size() == 0 or encryptedMutations.size() == mutations.size() and encryptedMutations[i] =
// mutations[i].encrypt(). Currently this field is not serialized so clients should NOT set this field during a
// usual commit path. It is currently only used during backup mutation log restores.
VectorRef<Optional<MutationRef>> encryptedMutations;
Version read_snapshot = 0;
bool report_conflicting_keys = false;
bool lock_aware = false; // set when metadata mutations are present

View File

@ -150,6 +150,21 @@ Future<std::unordered_map<EncryptCipherDomainId, Reference<BlobCipherKey>>> getL
return cipherKeys;
}
// Get latest cipher key for given a encryption domain. It tries to get the cipher key from the local cache.
// In case of cache miss, it fetches the cipher key from EncryptKeyProxy and put the result in the local cache
// before return.
ACTOR template <class T>
Future<Reference<BlobCipherKey>> getLatestEncryptCipherKey(Reference<AsyncVar<T> const> db,
EncryptCipherDomainId domainId,
EncryptCipherDomainName domainName,
BlobCipherMetrics::UsageType usageType) {
std::unordered_map<EncryptCipherDomainId, EncryptCipherDomainName> domains({ { domainId, domainName } });
std::unordered_map<EncryptCipherDomainId, Reference<BlobCipherKey>> cipherKey =
wait(getLatestEncryptCipherKeys(db, domains, usageType));
return cipherKey.at(domainId);
}
ACTOR template <class T>
Future<EKPGetBaseCipherKeysByIdsReply> getUncachedEncryptCipherKeys(Reference<AsyncVar<T> const> db,
EKPGetBaseCipherKeysByIdsRequest request) {

View File

@ -50,6 +50,7 @@
#include "fdbserver/RatekeeperInterface.h"
#include "fdbserver/RecoveryState.h"
#include "fdbserver/RestoreUtil.h"
#include "fdbserver/ServerDBInfo.actor.h"
#include "fdbserver/WaitFailure.h"
#include "fdbserver/WorkerInterface.actor.h"
#include "flow/ActorCollection.h"
@ -61,6 +62,7 @@
#include "fdbclient/Tracing.h"
#include "flow/actorcompiler.h" // This must be the last #include.
#include "flow/network.h"
ACTOR Future<Void> broadcastTxnRequest(TxnStateRequest req, int sendAmount, bool sendReply) {
state ReplyPromise<Void> reply = req.reply;
@ -1009,10 +1011,17 @@ ACTOR Future<Void> getResolution(CommitBatchContext* self) {
ASSERT(tenantName.present());
encryptDomains[tenantId] = Standalone(tenantName.get(), tenantInfo.arena);
} else {
for (auto m : trs[t].transaction.mutations) {
std::pair<EncryptCipherDomainName, int64_t> details =
getEncryptDetailsFromMutationRef(pProxyCommitData, m);
encryptDomains[details.second] = details.first;
// Optimization: avoid enumerating mutations if cluster only serves default encryption domains
if (pProxyCommitData->tenantMap.size() > 0) {
for (auto m : trs[t].transaction.mutations) {
std::pair<EncryptCipherDomainName, int64_t> details =
getEncryptDetailsFromMutationRef(pProxyCommitData, m);
encryptDomains[details.second] = details.first;
}
} else {
// Ensure default encryption domain-ids are present.
ASSERT_EQ(encryptDomains.count(SYSTEM_KEYSPACE_ENCRYPT_DOMAIN_ID), 1);
ASSERT_EQ(encryptDomains.count(FDB_DEFAULT_ENCRYPT_DOMAIN_ID), 1);
}
}
}
@ -1240,25 +1249,54 @@ ACTOR Future<Void> applyMetadataToCommittedTransactions(CommitBatchContext* self
return Void();
}
void writeMutation(CommitBatchContext* self, int64_t tenantId, const MutationRef& mutation) {
ACTOR Future<MutationRef> writeMutation(CommitBatchContext* self,
int64_t tenantId,
const MutationRef* mutation,
Optional<MutationRef>* encryptedMutationOpt,
Arena* arena) {
static_assert(TenantInfo::INVALID_TENANT == INVALID_ENCRYPT_DOMAIN_ID);
if (self->pProxyCommitData->isEncryptionEnabled) {
EncryptCipherDomainId domainId = tenantId;
if (domainId == INVALID_ENCRYPT_DOMAIN_ID) {
std::pair<EncryptCipherDomainName, EncryptCipherDomainId> p =
getEncryptDetailsFromMutationRef(self->pProxyCommitData, mutation);
domainId = p.second;
state EncryptCipherDomainId domainId = tenantId;
state MutationRef encryptedMutation;
CODE_PROBE(true, "Raw access mutation encryption");
if (encryptedMutationOpt->present()) {
CODE_PROBE(true, "using already encrypted mutation");
encryptedMutation = encryptedMutationOpt->get();
ASSERT(encryptedMutation.isEncrypted());
// During simulation check whether the encrypted mutation matches the decrpyted mutation
if (g_network && g_network->isSimulated()) {
Reference<AsyncVar<ServerDBInfo> const> dbInfo = self->pProxyCommitData->db;
state const BlobCipherEncryptHeader* header = encryptedMutation.encryptionHeader();
TextAndHeaderCipherKeys cipherKeys =
wait(getEncryptCipherKeys(dbInfo, *header, BlobCipherMetrics::TLOG));
MutationRef decryptedMutation = encryptedMutation.decrypt(cipherKeys, *arena, BlobCipherMetrics::TLOG);
ASSERT(decryptedMutation.param1 == mutation->param1 && decryptedMutation.param2 == mutation->param2 &&
decryptedMutation.type == mutation->type);
}
} else {
if (domainId == INVALID_ENCRYPT_DOMAIN_ID) {
std::pair<EncryptCipherDomainName, EncryptCipherDomainId> p =
getEncryptDetailsFromMutationRef(self->pProxyCommitData, *mutation);
domainId = p.second;
if (self->cipherKeys.find(domainId) == self->cipherKeys.end()) {
Reference<BlobCipherKey> cipherKey = wait(getLatestEncryptCipherKey(
self->pProxyCommitData->db, domainId, p.first, BlobCipherMetrics::TLOG));
self->cipherKeys[domainId] = cipherKey;
}
CODE_PROBE(true, "Raw access mutation encryption");
}
ASSERT_NE(domainId, INVALID_ENCRYPT_DOMAIN_ID);
encryptedMutation = mutation->encrypt(self->cipherKeys, domainId, *arena, BlobCipherMetrics::TLOG);
}
ASSERT_NE(domainId, INVALID_ENCRYPT_DOMAIN_ID);
Arena arena;
self->toCommit.writeTypedMessage(mutation.encrypt(self->cipherKeys, domainId, arena, BlobCipherMetrics::TLOG));
self->toCommit.writeTypedMessage(encryptedMutation);
return encryptedMutation;
} else {
self->toCommit.writeTypedMessage(mutation);
self->toCommit.writeTypedMessage(*mutation);
return *mutation;
}
}
@ -1278,6 +1316,9 @@ ACTOR Future<Void> assignMutationsToStorageServers(CommitBatchContext* self) {
state Optional<ClientTrCommitCostEstimation>* trCost = &trs[self->transactionNum].commitCostEstimation;
state int mutationNum = 0;
state VectorRef<MutationRef>* pMutations = &trs[self->transactionNum].transaction.mutations;
state VectorRef<Optional<MutationRef>>* encryptedMutations =
&trs[self->transactionNum].transaction.encryptedMutations;
ASSERT(encryptedMutations->size() == 0 || encryptedMutations->size() == pMutations->size());
state int64_t tenantId = trs[self->transactionNum].tenantInfo.tenantId;
self->toCommit.addTransactionInfo(trs[self->transactionNum].spanContext);
@ -1292,13 +1333,17 @@ ACTOR Future<Void> assignMutationsToStorageServers(CommitBatchContext* self) {
}
}
auto& m = (*pMutations)[mutationNum];
state MutationRef m = (*pMutations)[mutationNum];
state Optional<MutationRef> encryptedMutation =
encryptedMutations->size() > 0 ? (*encryptedMutations)[mutationNum] : Optional<MutationRef>();
state Arena arena;
state MutationRef writtenMutation;
self->mutationCount++;
self->mutationBytes += m.expectedSize();
self->yieldBytes += m.expectedSize();
ASSERT(!m.isEncrypted());
// Determine the set of tags (responsible storage servers) for the mutation, splitting it
// if necessary. Serialize (splits of) the mutation into the message buffer and add the tags.
if (isSingleKeyMutation((MutationRef::Type)m.type)) {
auto& tags = pProxyCommitData->tagsForKey(m.param1);
@ -1336,7 +1381,11 @@ ACTOR Future<Void> assignMutationsToStorageServers(CommitBatchContext* self) {
if (pProxyCommitData->cacheInfo[m.param1]) {
self->toCommit.addTag(cacheTag);
}
writeMutation(self, tenantId, m);
if (encryptedMutation.present()) {
ASSERT(encryptedMutation.get().isEncrypted());
}
MutationRef tempMutation = wait(writeMutation(self, tenantId, &m, &encryptedMutation, &arena));
writtenMutation = tempMutation;
} else if (m.type == MutationRef::ClearRange) {
KeyRangeRef clearRange(KeyRangeRef(m.param1, m.param2));
auto ranges = pProxyCommitData->keyInfo.intersectingRanges(clearRange);
@ -1389,7 +1438,8 @@ ACTOR Future<Void> assignMutationsToStorageServers(CommitBatchContext* self) {
if (pProxyCommitData->needsCacheTag(clearRange)) {
self->toCommit.addTag(cacheTag);
}
writeMutation(self, tenantId, m);
MutationRef tempMutation = wait(writeMutation(self, tenantId, &m, &encryptedMutation, &arena));
writtenMutation = tempMutation;
} else {
UNREACHABLE();
}
@ -1403,7 +1453,9 @@ ACTOR Future<Void> assignMutationsToStorageServers(CommitBatchContext* self) {
if (m.type != MutationRef::Type::ClearRange) {
// Add the mutation to the relevant backup tag
for (auto backupName : pProxyCommitData->vecBackupKeys[m.param1]) {
self->logRangeMutations[backupName].push_back_deep(self->logRangeMutationsArena, m);
// If encryption is enabled make sure the mutation we are writing is also encrypted
ASSERT(!self->pProxyCommitData->isEncryptionEnabled || writtenMutation.isEncrypted());
self->logRangeMutations[backupName].push_back_deep(self->logRangeMutationsArena, writtenMutation);
}
} else {
KeyRangeRef mutationRange(m.param1, m.param2);
@ -1421,6 +1473,21 @@ ACTOR Future<Void> assignMutationsToStorageServers(CommitBatchContext* self) {
MutationRef backupMutation(
MutationRef::Type::ClearRange, intersectionRange.begin, intersectionRange.end);
// TODO (Nim): Currently clear ranges are encrypted using the default encryption key, this must be
// changed to account for clear ranges which span tenant boundaries
if (self->pProxyCommitData->isEncryptionEnabled) {
if (backupMutation.param1 == m.param1 && backupMutation.param2 == m.param2 &&
encryptedMutation.present()) {
backupMutation = encryptedMutation.get();
} else {
std::pair<EncryptCipherDomainName, EncryptCipherDomainId> p =
getEncryptDetailsFromMutationRef(self->pProxyCommitData, backupMutation);
EncryptCipherDomainId domainId = p.second;
backupMutation =
backupMutation.encrypt(self->cipherKeys, domainId, arena, BlobCipherMetrics::BACKUP);
}
}
// Add the mutation to the relevant backup tag
for (auto backupName : backupRange.value()) {
self->logRangeMutations[backupName].push_back_deep(self->logRangeMutationsArena,
@ -1514,9 +1581,9 @@ ACTOR Future<Void> postResolution(CommitBatchContext* self) {
pProxyCommitData->stats.mutations += self->mutationCount;
pProxyCommitData->stats.mutationBytes += self->mutationBytes;
// Storage servers mustn't make durable versions which are not fully committed (because then they are impossible to
// roll back) We prevent this by limiting the number of versions which are semi-committed but not fully committed to
// be less than the MVCC window
// Storage servers mustn't make durable versions which are not fully committed (because then they are impossible
// to roll back) We prevent this by limiting the number of versions which are semi-committed but not fully
// committed to be less than the MVCC window
if (pProxyCommitData->committedVersion.get() <
self->commitVersion - SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS) {
self->computeDuration += g_network->timer() - self->computeStart;
@ -1715,8 +1782,8 @@ ACTOR Future<Void> reply(CommitBatchContext* self) {
// After logging finishes, we report the commit version to master so that every other proxy can get the most
// up-to-date live committed version. We also maintain the invariant that master's committed version >=
// self->committedVersion by reporting commit version first before updating self->committedVersion. Otherwise, a
// client may get a commit version that the master is not aware of, and next GRV request may get a version less than
// self->committedVersion.
// client may get a commit version that the master is not aware of, and next GRV request may get a version less
// than self->committedVersion.
CODE_PROBE(pProxyCommitData->committedVersion.get() > self->commitVersion,
"later version was reported committed first");
@ -1775,11 +1842,11 @@ ACTOR Future<Void> reply(CommitBatchContext* self) {
for (int resolverInd : self->transactionResolverMap[t]) {
auto const& cKRs =
self->resolution[resolverInd]
.conflictingKeyRangeMap[self->nextTr[resolverInd]]; // nextTr[resolverInd] -> index of this
// trs[t] on the resolver
.conflictingKeyRangeMap[self->nextTr[resolverInd]]; // nextTr[resolverInd] -> index of
// this trs[t] on the resolver
for (auto const& rCRIndex : cKRs)
// read_conflict_range can change when sent to resolvers, mapping the index from resolver-side
// to original index in commitTransactionRef
// read_conflict_range can change when sent to resolvers, mapping the index from
// resolver-side to original index in commitTransactionRef
conflictingKRIndices.push_back(conflictingKRIndices.arena(),
self->txReadConflictRangeIndexMap[t][resolverInd][rCRIndex]);
}

View File

@ -21,9 +21,12 @@
// This file implements the functions and actors used by the RestoreLoader role.
// The RestoreLoader role starts with the restoreLoaderCore actor
#include "fdbclient/BlobCipher.h"
#include "flow/UnitTest.h"
#include "fdbclient/BackupContainer.h"
#include "fdbclient/BackupAgent.actor.h"
#include "fdbclient/GetEncryptCipherKeys.actor.h"
#include "fdbclient/DatabaseContext.h"
#include "fdbserver/RestoreLoader.actor.h"
#include "fdbserver/RestoreRoleCommon.actor.h"
#include "fdbserver/MutationTracking.h"
@ -44,17 +47,19 @@ void splitMutation(const KeyRangeMap<UID>& krMap,
VectorRef<MutationRef>& mvector,
Arena& nodeIDs_arena,
VectorRef<UID>& nodeIDs);
void _parseSerializedMutation(KeyRangeMap<Version>* pRangeVersions,
std::map<LoadingParam, VersionedMutationsMap>::iterator kvOpsIter,
SerializedMutationListMap* mutationMap,
std::map<LoadingParam, SampledMutationsVec>::iterator samplesIter,
LoaderCounters* cc,
const RestoreAsset& asset);
ACTOR Future<Void> _parseSerializedMutation(KeyRangeMap<Version>* pRangeVersions,
std::map<LoadingParam, VersionedMutationsMap>::iterator kvOpsIter,
SerializedMutationListMap* mutationMap,
std::map<LoadingParam, SampledMutationsVec>::iterator samplesIter,
LoaderCounters* cc,
RestoreAsset asset,
Database cx);
void handleRestoreSysInfoRequest(const RestoreSysInfoRequest& req, Reference<RestoreLoaderData> self);
ACTOR Future<Void> handleLoadFileRequest(RestoreLoadFileRequest req, Reference<RestoreLoaderData> self);
ACTOR Future<Void> handleLoadFileRequest(RestoreLoadFileRequest req, Reference<RestoreLoaderData> self, Database cx);
ACTOR Future<Void> handleSendMutationsRequest(RestoreSendMutationsToAppliersRequest req,
Reference<RestoreLoaderData> self);
Reference<RestoreLoaderData> self,
Database cx);
ACTOR Future<Void> sendMutationsToApplier(
std::priority_queue<RestoreLoaderSchedSendLoadParamRequest>* sendLoadParamQueue,
std::map<int, int>* inflightSendLoadParamReqs,
@ -64,7 +69,8 @@ ACTOR Future<Void> sendMutationsToApplier(
RestoreAsset asset,
bool isRangeFile,
std::map<Key, UID>* pRangeToApplier,
std::map<UID, RestoreApplierInterface>* pApplierInterfaces);
std::map<UID, RestoreApplierInterface>* pApplierInterfaces,
Database cx);
ACTOR static Future<Void> _parseLogFileToMutationsOnLoader(NotifiedVersion* pProcessedFileOffset,
SerializedMutationListMap* mutationMap,
Reference<IBackupContainer> bc,
@ -85,7 +91,7 @@ ACTOR Future<Void> handleFinishVersionBatchRequest(RestoreVersionBatchRequest re
// Dispatch requests based on node's business (i.e, cpu usage for now) and requests' priorities
// Requests for earlier version batches are preferred; which is equivalent to
// sendMuttionsRequests are preferred than loadingFileRequests
ACTOR Future<Void> dispatchRequests(Reference<RestoreLoaderData> self) {
ACTOR Future<Void> dispatchRequests(Reference<RestoreLoaderData> self, Database cx) {
try {
state int curVBInflightReqs = 0;
state int sendLoadParams = 0;
@ -139,7 +145,7 @@ ACTOR Future<Void> dispatchRequests(Reference<RestoreLoaderData> self) {
// Dispatch the request if it is the next version batch to process or if cpu usage is low
if (req.batchIndex - 1 == self->finishedSendingVB ||
self->cpuUsage < SERVER_KNOBS->FASTRESTORE_SCHED_TARGET_CPU_PERCENT) {
self->addActor.send(handleSendMutationsRequest(req, self));
self->addActor.send(handleSendMutationsRequest(req, self, cx));
self->sendingQueue.pop();
}
}
@ -204,7 +210,7 @@ ACTOR Future<Void> dispatchRequests(Reference<RestoreLoaderData> self) {
self->loadingQueue.pop();
ASSERT(false); // Check if this ever happens easily
} else {
self->addActor.send(handleLoadFileRequest(req, self));
self->addActor.send(handleLoadFileRequest(req, self, cx));
self->loadingQueue.pop();
lastLoadReqs++;
}
@ -244,7 +250,7 @@ ACTOR Future<Void> restoreLoaderCore(RestoreLoaderInterface loaderInterf,
actors.add(updateProcessMetrics(self));
actors.add(traceProcessMetrics(self, "RestoreLoader"));
self->addActor.send(dispatchRequests(self));
self->addActor.send(dispatchRequests(self, cx));
loop {
state std::string requestTypeStr = "[Init]";
@ -361,6 +367,18 @@ void handleRestoreSysInfoRequest(const RestoreSysInfoRequest& req, Reference<Res
req.reply.send(RestoreCommonReply(self->id()));
}
ACTOR static Future<MutationRef> _decryptMutation(MutationRef mutation, Database cx, Arena* arena) {
ASSERT(mutation.isEncrypted());
Reference<AsyncVar<ClientDBInfo> const> dbInfo = cx->clientInfo;
state const BlobCipherEncryptHeader* header = mutation.encryptionHeader();
std::unordered_set<BlobCipherDetails> cipherDetails;
cipherDetails.insert(header->cipherHeaderDetails);
cipherDetails.insert(header->cipherTextDetails);
std::unordered_map<BlobCipherDetails, Reference<BlobCipherKey>> getCipherKeysResult =
wait(getEncryptCipherKeys(dbInfo, cipherDetails, BlobCipherMetrics::BACKUP));
return mutation.decrypt(getCipherKeysResult, *arena, BlobCipherMetrics::BACKUP);
}
// Parse a data block in a partitioned mutation log file and store mutations
// into "kvOpsIter" and samples into "samplesIter".
ACTOR static Future<Void> _parsePartitionedLogFileOnLoader(
@ -370,7 +388,8 @@ ACTOR static Future<Void> _parsePartitionedLogFileOnLoader(
std::map<LoadingParam, SampledMutationsVec>::iterator samplesIter,
LoaderCounters* cc,
Reference<IBackupContainer> bc,
RestoreAsset asset) {
RestoreAsset asset,
Database cx) {
state Standalone<StringRef> buf = makeString(asset.len);
state Reference<IAsyncFile> file = wait(bc->readFile(asset.filename));
int rLen = wait(file->read(mutateString(buf), asset.len, asset.offset));
@ -389,21 +408,21 @@ ACTOR static Future<Void> _parsePartitionedLogFileOnLoader(
wait(processedFileOffset->whenAtLeast(asset.offset));
ASSERT(processedFileOffset->get() == asset.offset);
Arena tempArena;
StringRefReader reader(buf, restore_corrupted_data());
state Arena tempArena;
state StringRefReader reader(buf, restore_corrupted_data());
try {
// Read block header
if (reader.consume<int32_t>() != PARTITIONED_MLOG_VERSION)
throw restore_unsupported_file_version();
VersionedMutationsMap& kvOps = kvOpsIter->second;
state VersionedMutationsMap* kvOps = &kvOpsIter->second;
while (1) {
// If eof reached or first key len bytes is 0xFF then end of block was reached.
if (reader.eof() || *reader.rptr == 0xFF)
break;
// Deserialize messages written in saveMutationsToFile().
LogMessageVersion msgVersion;
state LogMessageVersion msgVersion;
msgVersion.version = reader.consumeNetworkUInt64();
msgVersion.sub = reader.consumeNetworkUInt32();
int msgSize = reader.consumeNetworkInt32();
@ -413,19 +432,20 @@ ACTOR static Future<Void> _parsePartitionedLogFileOnLoader(
if (!asset.isInVersionRange(msgVersion.version))
continue;
VersionedMutationsMap::iterator it;
state VersionedMutationsMap::iterator it;
bool inserted;
std::tie(it, inserted) = kvOps.emplace(msgVersion, MutationsVec());
std::tie(it, inserted) = kvOps->emplace(msgVersion, MutationsVec());
// A clear mutation can be split into multiple mutations with the same (version, sub).
// See saveMutationsToFile(). Current tests only use one key range per backup, thus
// only one clear mutation is generated (i.e., always inserted).
ASSERT(inserted);
ArenaReader rd(buf.arena(), StringRef(message, msgSize), AssumeVersion(g_network->protocolVersion()));
MutationRef mutation;
state MutationRef mutation;
rd >> mutation;
if (mutation.isEncrypted()) {
throw encrypt_unsupported();
MutationRef decryptedMutation = wait(_decryptMutation(mutation, cx, &tempArena));
mutation = decryptedMutation;
}
// Skip mutation whose commitVesion < range kv's version
@ -500,12 +520,13 @@ ACTOR static Future<Void> parsePartitionedLogFileOnLoader(
std::map<LoadingParam, SampledMutationsVec>::iterator samplesIter,
LoaderCounters* cc,
Reference<IBackupContainer> bc,
RestoreAsset asset) {
RestoreAsset asset,
Database cx) {
state int readFileRetries = 0;
loop {
try {
wait(_parsePartitionedLogFileOnLoader(
pRangeVersions, processedFileOffset, kvOpsIter, samplesIter, cc, bc, asset));
pRangeVersions, processedFileOffset, kvOpsIter, samplesIter, cc, bc, asset, cx));
break;
} catch (Error& e) {
if (e.code() == error_code_restore_bad_read || e.code() == error_code_restore_unsupported_file_version ||
@ -532,7 +553,8 @@ ACTOR Future<Void> _processLoadingParam(KeyRangeMap<Version>* pRangeVersions,
LoadingParam param,
Reference<LoaderBatchData> batchData,
UID loaderID,
Reference<IBackupContainer> bc) {
Reference<IBackupContainer> bc,
Database cx) {
// Temporary data structure for parsing log files into (version, <K, V, mutationType>)
// Must use StandAlone to save mutations, otherwise, the mutationref memory will be corrupted
// mutationMap: Key is the unique identifier for a batch of mutation logs at the same version
@ -572,7 +594,8 @@ ACTOR Future<Void> _processLoadingParam(KeyRangeMap<Version>* pRangeVersions,
samplesIter,
&batchData->counters,
bc,
subAsset));
subAsset,
cx));
} else {
fileParserFutures.push_back(
parseLogFileToMutationsOnLoader(&processedFileOffset, &mutationMap, bc, subAsset));
@ -582,8 +605,8 @@ ACTOR Future<Void> _processLoadingParam(KeyRangeMap<Version>* pRangeVersions,
wait(waitForAll(fileParserFutures));
if (!param.isRangeFile && !param.isPartitionedLog()) {
_parseSerializedMutation(
pRangeVersions, kvOpsPerLPIter, &mutationMap, samplesIter, &batchData->counters, param.asset);
wait(_parseSerializedMutation(
pRangeVersions, kvOpsPerLPIter, &mutationMap, samplesIter, &batchData->counters, param.asset, cx));
}
TraceEvent("FastRestoreLoaderProcessLoadingParamDone", loaderID)
@ -594,7 +617,7 @@ ACTOR Future<Void> _processLoadingParam(KeyRangeMap<Version>* pRangeVersions,
}
// A loader can process multiple RestoreLoadFileRequest in parallel.
ACTOR Future<Void> handleLoadFileRequest(RestoreLoadFileRequest req, Reference<RestoreLoaderData> self) {
ACTOR Future<Void> handleLoadFileRequest(RestoreLoadFileRequest req, Reference<RestoreLoaderData> self, Database cx) {
state Reference<LoaderBatchData> batchData = self->batch[req.batchIndex];
state bool isDuplicated = true;
state bool printTrace = false;
@ -623,7 +646,7 @@ ACTOR Future<Void> handleLoadFileRequest(RestoreLoadFileRequest req, Reference<R
.detail("ProcessLoadParam", req.param.toString());
ASSERT(batchData->sampleMutations.find(req.param) == batchData->sampleMutations.end());
batchData->processedFileParams[req.param] =
_processLoadingParam(&self->rangeVersions, req.param, batchData, self->id(), self->bc);
_processLoadingParam(&self->rangeVersions, req.param, batchData, self->id(), self->bc, cx);
self->inflightLoadingReqs++;
isDuplicated = false;
} else {
@ -682,7 +705,8 @@ ACTOR Future<Void> handleLoadFileRequest(RestoreLoadFileRequest req, Reference<R
// Send buffered mutations to appliers.
// Do not need to block on low memory usage because this actor should not increase memory usage.
ACTOR Future<Void> handleSendMutationsRequest(RestoreSendMutationsToAppliersRequest req,
Reference<RestoreLoaderData> self) {
Reference<RestoreLoaderData> self,
Database cx) {
state Reference<LoaderBatchData> batchData;
state Reference<LoaderBatchStatus> batchStatus;
state bool isDuplicated = true;
@ -759,7 +783,8 @@ ACTOR Future<Void> handleSendMutationsRequest(RestoreSendMutationsToAppliersRequ
loadParam.asset,
loadParam.isRangeFile,
&batchData->rangeToApplier,
&self->appliersInterf));
&self->appliersInterf,
cx));
}
}
wait(waitForAll(fSendMutations));
@ -812,7 +837,8 @@ ACTOR Future<Void> sendMutationsToApplier(
RestoreAsset asset,
bool isRangeFile,
std::map<Key, UID>* pRangeToApplier,
std::map<UID, RestoreApplierInterface>* pApplierInterfaces) {
std::map<UID, RestoreApplierInterface>* pApplierInterfaces,
Database cx) {
state VersionedMutationsMap& kvOps = *pkvOps;
state VersionedMutationsMap::iterator kvOp = kvOps.begin();
state int kvCount = 0;
@ -820,6 +846,7 @@ ACTOR Future<Void> sendMutationsToApplier(
state Version msgIndex = 1; // Monotonically increased index for send message, must start at 1
state std::vector<UID> applierIDs = getApplierIDs(*pRangeToApplier);
state double msgSize = 0; // size of mutations in the message
state Arena arena;
// Wait for scheduler to kick it off
Promise<Void> toSched;
@ -863,14 +890,18 @@ ACTOR Future<Void> sendMutationsToApplier(
for (auto& applierID : applierIDs) {
applierVersionedMutationsBuffer[applierID] = VersionedMutationsVec();
}
KeyRangeMap<UID> krMap;
state KeyRangeMap<UID> krMap;
buildApplierRangeMap(&krMap, pRangeToApplier);
for (kvOp = kvOps.begin(); kvOp != kvOps.end(); kvOp++) {
commitVersion = kvOp->first;
ASSERT(commitVersion.version >= asset.beginVersion);
ASSERT(commitVersion.version <= asset.endVersion); // endVersion is an empty commit to ensure progress
for (mIndex = 0; mIndex < kvOp->second.size(); mIndex++) {
MutationRef& kvm = kvOp->second[mIndex];
state MutationRef kvm = kvOp->second[mIndex];
if (kvm.isEncrypted()) {
MutationRef decryptedMutation = wait(_decryptMutation(kvm, cx, &arena));
kvm = decryptedMutation;
}
// Send the mutation to applier
if (isRangeMutation(kvm)) {
MutationsVec mvector;
@ -1082,31 +1113,35 @@ bool concatenateBackupMutationForLogFile(SerializedMutationListMap* pMutationMap
// we may not get the entire mutation list for the version encoded_list_of_mutations:
// [mutation1][mutation2]...[mutationk], where
// a mutation is encoded as [type:uint32_t][keyLength:uint32_t][valueLength:uint32_t][keyContent][valueContent]
void _parseSerializedMutation(KeyRangeMap<Version>* pRangeVersions,
std::map<LoadingParam, VersionedMutationsMap>::iterator kvOpsIter,
SerializedMutationListMap* pmutationMap,
std::map<LoadingParam, SampledMutationsVec>::iterator samplesIter,
LoaderCounters* cc,
const RestoreAsset& asset) {
VersionedMutationsMap& kvOps = kvOpsIter->second;
SampledMutationsVec& samples = samplesIter->second;
SerializedMutationListMap& mutationMap = *pmutationMap;
ACTOR Future<Void> _parseSerializedMutation(KeyRangeMap<Version>* pRangeVersions,
std::map<LoadingParam, VersionedMutationsMap>::iterator kvOpsIter,
SerializedMutationListMap* pmutationMap,
std::map<LoadingParam, SampledMutationsVec>::iterator samplesIter,
LoaderCounters* cc,
RestoreAsset asset,
Database cx) {
state VersionedMutationsMap* kvOps = &kvOpsIter->second;
state SampledMutationsVec* samples = &samplesIter->second;
state SerializedMutationListMap::iterator mutationMapIterator = pmutationMap->begin();
TraceEvent(SevFRMutationInfo, "FastRestoreLoaderParseSerializedLogMutation")
.detail("BatchIndex", asset.batchIndex)
.detail("RestoreAsset", asset.toString());
Arena tempArena;
for (auto& m : mutationMap) {
StringRef k = m.first.contents();
StringRef val = m.second.first.contents();
state Arena tempArena;
loop {
if (mutationMapIterator == pmutationMap->end()) {
break;
}
StringRef k = mutationMapIterator->first.contents();
state StringRef val = mutationMapIterator->second.first.contents();
StringRefReader kReader(k, restore_corrupted_data());
uint64_t commitVersion = kReader.consume<uint64_t>(); // Consume little Endian data
state uint64_t commitVersion = kReader.consume<uint64_t>(); // Consume little Endian data
// We have already filter the commit not in [beginVersion, endVersion) when we concatenate kv pair in log file
ASSERT_WE_THINK(asset.isInVersionRange(commitVersion));
StringRefReader vReader(val, restore_corrupted_data());
state StringRefReader vReader(val, restore_corrupted_data());
vReader.consume<uint64_t>(); // Consume the includeVersion
// TODO(xumengpanda): verify the protocol version is compatible and raise error if needed
@ -1114,72 +1149,79 @@ void _parseSerializedMutation(KeyRangeMap<Version>* pRangeVersions,
uint32_t val_length_decoded = vReader.consume<uint32_t>();
ASSERT(val_length_decoded == val.size() - sizeof(uint64_t) - sizeof(uint32_t));
int sub = 0;
while (1) {
state int sub = 0;
loop {
// stop when reach the end of the string
if (vReader.eof()) { //|| *reader.rptr == 0xFF
break;
}
uint32_t type = vReader.consume<uint32_t>();
uint32_t kLen = vReader.consume<uint32_t>();
uint32_t vLen = vReader.consume<uint32_t>();
const uint8_t* k = vReader.consume(kLen);
const uint8_t* v = vReader.consume(vLen);
state uint32_t type = vReader.consume<uint32_t>();
state uint32_t kLen = vReader.consume<uint32_t>();
state uint32_t vLen = vReader.consume<uint32_t>();
state const uint8_t* k = vReader.consume(kLen);
state const uint8_t* v = vReader.consume(vLen);
MutationRef mutation((MutationRef::Type)type, KeyRef(k, kLen), KeyRef(v, vLen));
state MutationRef mutation((MutationRef::Type)type, KeyRef(k, kLen), KeyRef(v, vLen));
if (mutation.isEncrypted()) {
MutationRef decryptedMutation = wait(_decryptMutation(mutation, cx, &tempArena));
mutation = decryptedMutation;
}
// Should this mutation be skipped?
// Skip mutation whose commitVesion < range kv's version
if (logMutationTooOld(pRangeVersions, mutation, commitVersion)) {
cc->oldLogMutations += 1;
continue;
}
if (mutation.param1 >= asset.range.end ||
(isRangeMutation(mutation) && mutation.param2 < asset.range.begin) ||
(!isRangeMutation(mutation) && mutation.param1 < asset.range.begin)) {
continue;
}
// Only apply mutation within the asset.range and apply removePrefix and addPrefix
ASSERT(asset.removePrefix.size() == 0);
if (isRangeMutation(mutation)) {
mutation.param1 = mutation.param1 >= asset.range.begin ? mutation.param1 : asset.range.begin;
mutation.param2 = mutation.param2 < asset.range.end ? mutation.param2 : asset.range.end;
// Remove prefix or add prefix if we restore data to a new key space
if (asset.hasPrefix()) { // Avoid creating new Key
mutation.param1 =
mutation.param1.removePrefix(asset.removePrefix).withPrefix(asset.addPrefix, tempArena);
mutation.param2 =
mutation.param2.removePrefix(asset.removePrefix).withPrefix(asset.addPrefix, tempArena);
}
} else {
if (asset.hasPrefix()) { // Avoid creating new Key
mutation.param1 =
mutation.param1.removePrefix(asset.removePrefix).withPrefix(asset.addPrefix, tempArena);
if (mutation.param1 >= asset.range.end ||
(isRangeMutation(mutation) && mutation.param2 < asset.range.begin) ||
(!isRangeMutation(mutation) && mutation.param1 < asset.range.begin)) {
} else {
// Only apply mutation within the asset.range and apply removePrefix and addPrefix
ASSERT(asset.removePrefix.size() == 0);
if (isRangeMutation(mutation)) {
mutation.param1 = mutation.param1 >= asset.range.begin ? mutation.param1 : asset.range.begin;
mutation.param2 = mutation.param2 < asset.range.end ? mutation.param2 : asset.range.end;
// Remove prefix or add prefix if we restore data to a new key space
if (asset.hasPrefix()) { // Avoid creating new Key
mutation.param1 =
mutation.param1.removePrefix(asset.removePrefix).withPrefix(asset.addPrefix, tempArena);
mutation.param2 =
mutation.param2.removePrefix(asset.removePrefix).withPrefix(asset.addPrefix, tempArena);
}
} else {
if (asset.hasPrefix()) { // Avoid creating new Key
mutation.param1 =
mutation.param1.removePrefix(asset.removePrefix).withPrefix(asset.addPrefix, tempArena);
}
}
cc->loadedLogBytes += mutation.totalSize();
TraceEvent(SevFRMutationInfo, "FastRestoreDecodeLogFile")
.detail("CommitVersion", commitVersion)
.detail("ParsedMutation", mutation.toString());
auto it = kvOps->insert(std::make_pair(LogMessageVersion(commitVersion, sub++), MutationsVec()));
ASSERT(it.second); // inserted is true
ASSERT(sub <
std::numeric_limits<int32_t>::max()); // range file mutation uses int32_max as subversion
it.first->second.push_back_deep(it.first->second.arena(), mutation);
// Sampling data similar to how SS sample bytes
ByteSampleInfo sampleInfo = isKeyValueInSample(KeyValueRef(mutation.param1, mutation.param2));
if (sampleInfo.inSample) {
cc->sampledLogBytes += sampleInfo.sampledSize;
samples->push_back_deep(samples->arena(),
SampledMutation(mutation.param1, sampleInfo.sampledSize));
}
ASSERT_WE_THINK(kLen >= 0 && kLen < val.size());
ASSERT_WE_THINK(vLen >= 0 && vLen < val.size());
}
}
cc->loadedLogBytes += mutation.totalSize();
TraceEvent(SevFRMutationInfo, "FastRestoreDecodeLogFile")
.detail("CommitVersion", commitVersion)
.detail("ParsedMutation", mutation.toString());
auto it = kvOps.insert(std::make_pair(LogMessageVersion(commitVersion, sub++), MutationsVec()));
ASSERT(it.second); // inserted is true
ASSERT(sub < std::numeric_limits<int32_t>::max()); // range file mutation uses int32_max as subversion
it.first->second.push_back_deep(it.first->second.arena(), mutation);
// Sampling data similar to how SS sample bytes
ByteSampleInfo sampleInfo = isKeyValueInSample(KeyValueRef(mutation.param1, mutation.param2));
if (sampleInfo.inSample) {
cc->sampledLogBytes += sampleInfo.sampledSize;
samples.push_back_deep(samples.arena(), SampledMutation(mutation.param1, sampleInfo.sampledSize));
}
ASSERT_WE_THINK(kLen >= 0 && kLen < val.size());
ASSERT_WE_THINK(vLen >= 0 && vLen < val.size());
}
mutationMapIterator++;
}
return Void();
}
// Parsing the data blocks in a range file

View File

@ -66,11 +66,14 @@ struct SimKmsConnectorContext : NonCopyable, ReferenceCounted<SimKmsConnectorCon
// Construct encryption keyStore.
// Note the keys generated must be the same after restart.
for (int i = 1; i <= maxEncryptionKeys; i++) {
Arena arena;
StringRef digest = computeAuthToken(
reinterpret_cast<const unsigned char*>(&i), sizeof(i), SHA_KEY, AES_256_KEY_LENGTH, arena);
simEncryptKeyStore[i] =
std::make_unique<SimEncryptKeyCtx>(i, reinterpret_cast<const char*>(digest.begin()));
uint8_t digest[AUTH_TOKEN_SIZE];
computeAuthToken(reinterpret_cast<const unsigned char*>(&i),
sizeof(i),
SHA_KEY,
AES_256_KEY_LENGTH,
&digest[0],
AUTH_TOKEN_SIZE);
simEncryptKeyStore[i] = std::make_unique<SimEncryptKeyCtx>(i, reinterpret_cast<const char*>(&digest[0]));
}
}
};

View File

@ -183,10 +183,15 @@ public:
void setEncryptKeyProxy(const EncryptKeyProxyInterface& interf) {
auto newInfo = serverInfo->get();
auto newClientInfo = clientInfo->get();
newClientInfo.id = deterministicRandom()->randomUniqueID();
newInfo.id = deterministicRandom()->randomUniqueID();
newInfo.infoGeneration = ++dbInfoCount;
newInfo.encryptKeyProxy = interf;
newInfo.client.encryptKeyProxy = interf;
newClientInfo.encryptKeyProxy = interf;
serverInfo->set(newInfo);
clientInfo->set(newClientInfo);
}
void setConsistencyScan(const ConsistencyScanInterface& interf) {
@ -199,7 +204,9 @@ public:
void clearInterf(ProcessClass::ClassType t) {
auto newInfo = serverInfo->get();
auto newClientInfo = clientInfo->get();
newInfo.id = deterministicRandom()->randomUniqueID();
newClientInfo.id = deterministicRandom()->randomUniqueID();
newInfo.infoGeneration = ++dbInfoCount;
if (t == ProcessClass::DataDistributorClass) {
newInfo.distributor = Optional<DataDistributorInterface>();
@ -209,10 +216,13 @@ public:
newInfo.blobManager = Optional<BlobManagerInterface>();
} else if (t == ProcessClass::EncryptKeyProxyClass) {
newInfo.encryptKeyProxy = Optional<EncryptKeyProxyInterface>();
newInfo.client.encryptKeyProxy = Optional<EncryptKeyProxyInterface>();
newClientInfo.encryptKeyProxy = Optional<EncryptKeyProxyInterface>();
} else if (t == ProcessClass::ConsistencyScanClass) {
newInfo.consistencyScan = Optional<ConsistencyScanInterface>();
}
serverInfo->set(newInfo);
clientInfo->set(newClientInfo);
}
ACTOR static Future<Void> countClients(DBInfo* self) {

View File

@ -0,0 +1,299 @@
/*
* IEncryptionKeyProvider.actor.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "fdbclient/BlobCipher.h"
#if defined(NO_INTELLISENSE) && !defined(FDBSERVER_IENCRYPTIONKEYPROVIDER_ACTOR_G_H)
#define FDBSERVER_IENCRYPTIONKEYPROVIDER_ACTOR_G_H
#include "fdbserver/IEncryptionKeyProvider.actor.g.h"
#elif !defined(FDBSERVER_IENCRYPTIONKEYPROVIDER_ACTOR_H)
#define FDBSERVER_IENCRYPTIONKEYPROVIDER_ACTOR_H
#include "fdbclient/GetEncryptCipherKeys.actor.h"
#include "fdbclient/Tenant.h"
#include "fdbserver/EncryptionOpsUtils.h"
#include "fdbserver/ServerDBInfo.h"
#include "flow/Arena.h"
#include "flow/EncryptUtils.h"
#define XXH_INLINE_ALL
#include "flow/xxhash.h"
#include "flow/actorcompiler.h" // This must be the last #include.
typedef uint64_t XOREncryptionKeyID;
// EncryptionKeyRef is somewhat multi-variant, it will contain members representing the union
// of all fields relevant to any implemented encryption scheme. They are generally of
// the form
// Page Fields - fields which come from or are stored in the Page
// Secret Fields - fields which are only known by the Key Provider
// but it is up to each encoding and provider which fields are which and which ones are used
//
// TODO(yiwu): Rename and/or refactor this struct. It doesn't sound like an encryption key should
// contain page fields like encryption header.
struct EncryptionKeyRef {
EncryptionKeyRef(){};
EncryptionKeyRef(Arena& arena, const EncryptionKeyRef& toCopy)
: cipherKeys(toCopy.cipherKeys), secret(arena, toCopy.secret), id(toCopy.id) {}
int expectedSize() const { return secret.size(); }
// Fields for AESEncryptionV1
TextAndHeaderCipherKeys cipherKeys;
Optional<BlobCipherEncryptHeader> cipherHeader;
// Fields for XOREncryption_TestOnly
StringRef secret;
Optional<XOREncryptionKeyID> id;
};
typedef Standalone<EncryptionKeyRef> EncryptionKey;
// Interface used by pager to get encryption keys reading pages from disk
// and by the BTree to get encryption keys to use for new pages
class IEncryptionKeyProvider : public ReferenceCounted<IEncryptionKeyProvider> {
public:
virtual ~IEncryptionKeyProvider() {}
// Get an EncryptionKey with Secret Fields populated based on the given Page Fields.
// It is up to the implementation which fields those are.
// The output Page Fields must match the input Page Fields.
virtual Future<EncryptionKey> getSecrets(const EncryptionKeyRef& key) = 0;
// Get encryption key that should be used for a given user Key-Value range
virtual Future<EncryptionKey> getByRange(const KeyRef& begin, const KeyRef& end) = 0;
// Setting tenant prefix to tenant name map.
virtual void setTenantPrefixIndex(Reference<TenantPrefixIndex> tenantPrefixIndex) {}
virtual bool shouldEnableEncryption() const = 0;
};
// The null key provider is useful to simplify page decoding.
// It throws an error for any key info requested.
class NullKeyProvider : public IEncryptionKeyProvider {
public:
virtual ~NullKeyProvider() {}
bool shouldEnableEncryption() const override { return true; }
Future<EncryptionKey> getSecrets(const EncryptionKeyRef& key) override { throw encryption_key_not_found(); }
Future<EncryptionKey> getByRange(const KeyRef& begin, const KeyRef& end) override {
throw encryption_key_not_found();
}
};
// Key provider for dummy XOR encryption scheme
class XOREncryptionKeyProvider_TestOnly : public IEncryptionKeyProvider {
public:
XOREncryptionKeyProvider_TestOnly(std::string filename) {
ASSERT(g_network->isSimulated());
// Choose a deterministic random filename (without path) byte for secret generation
// Remove any leading directory names
size_t lastSlash = filename.find_last_of("\\/");
if (lastSlash != filename.npos) {
filename.erase(0, lastSlash);
}
xorWith = filename.empty() ? 0x5e
: (uint8_t)filename[XXH3_64bits(filename.data(), filename.size()) % filename.size()];
}
virtual ~XOREncryptionKeyProvider_TestOnly() {}
bool shouldEnableEncryption() const override { return true; }
Future<EncryptionKey> getSecrets(const EncryptionKeyRef& key) override {
if (!key.id.present()) {
throw encryption_key_not_found();
}
EncryptionKey s = key;
uint8_t secret = ~(uint8_t)key.id.get() ^ xorWith;
s.secret = StringRef(s.arena(), &secret, 1);
return s;
}
Future<EncryptionKey> getByRange(const KeyRef& begin, const KeyRef& end) override {
EncryptionKeyRef k;
k.id = end.empty() ? 0 : *(end.end() - 1);
return getSecrets(k);
}
uint8_t xorWith;
};
// Key provider to provider cipher keys randomly from a pre-generated pool. Use for testing.
class RandomEncryptionKeyProvider : public IEncryptionKeyProvider {
public:
RandomEncryptionKeyProvider() {
for (unsigned i = 0; i < NUM_CIPHER; i++) {
BlobCipherDetails cipherDetails;
cipherDetails.encryptDomainId = i;
cipherDetails.baseCipherId = deterministicRandom()->randomUInt64();
cipherDetails.salt = deterministicRandom()->randomUInt64();
cipherKeys[i] = generateCipherKey(cipherDetails);
}
}
virtual ~RandomEncryptionKeyProvider() = default;
bool shouldEnableEncryption() const override { return true; }
Future<EncryptionKey> getSecrets(const EncryptionKeyRef& key) override {
ASSERT(key.cipherHeader.present());
EncryptionKey s = key;
s.cipherKeys.cipherTextKey = cipherKeys[key.cipherHeader.get().cipherTextDetails.encryptDomainId];
s.cipherKeys.cipherHeaderKey = cipherKeys[key.cipherHeader.get().cipherHeaderDetails.encryptDomainId];
return s;
}
Future<EncryptionKey> getByRange(const KeyRef& /*begin*/, const KeyRef& /*end*/) override {
EncryptionKey s;
s.cipherKeys.cipherTextKey = getRandomCipherKey();
s.cipherKeys.cipherHeaderKey = getRandomCipherKey();
return s;
}
private:
Reference<BlobCipherKey> generateCipherKey(const BlobCipherDetails& cipherDetails) {
static unsigned char SHA_KEY[] = "3ab9570b44b8315fdb261da6b1b6c13b";
uint8_t digest[AUTH_TOKEN_SIZE];
computeAuthToken(reinterpret_cast<const unsigned char*>(&cipherDetails.baseCipherId),
sizeof(EncryptCipherBaseKeyId),
SHA_KEY,
AES_256_KEY_LENGTH,
&digest[0],
AUTH_TOKEN_SIZE);
return makeReference<BlobCipherKey>(cipherDetails.encryptDomainId,
cipherDetails.baseCipherId,
&digest[0],
AES_256_KEY_LENGTH,
cipherDetails.salt,
std::numeric_limits<int64_t>::max() /* refreshAt */,
std::numeric_limits<int64_t>::max() /* expireAt */);
}
Reference<BlobCipherKey> getRandomCipherKey() {
return cipherKeys[deterministicRandom()->randomInt(0, NUM_CIPHER)];
}
static constexpr int NUM_CIPHER = 1000;
Reference<BlobCipherKey> cipherKeys[NUM_CIPHER];
};
// Key provider which extract tenant id from range key prefixes, and fetch tenant specific encryption keys from
// EncryptKeyProxy.
class TenantAwareEncryptionKeyProvider : public IEncryptionKeyProvider {
public:
TenantAwareEncryptionKeyProvider(Reference<AsyncVar<ServerDBInfo> const> db) : db(db) {}
virtual ~TenantAwareEncryptionKeyProvider() = default;
bool shouldEnableEncryption() const override {
return isEncryptionOpSupported(EncryptOperationType::STORAGE_SERVER_ENCRYPTION, db->get().client);
}
ACTOR static Future<EncryptionKey> getSecrets(TenantAwareEncryptionKeyProvider* self, EncryptionKeyRef key) {
if (!key.cipherHeader.present()) {
TraceEvent("TenantAwareEncryptionKeyProvider_CipherHeaderMissing");
throw encrypt_ops_error();
}
TextAndHeaderCipherKeys cipherKeys =
wait(getEncryptCipherKeys(self->db, key.cipherHeader.get(), BlobCipherMetrics::KV_REDWOOD));
EncryptionKey s = key;
s.cipherKeys = cipherKeys;
return s;
}
Future<EncryptionKey> getSecrets(const EncryptionKeyRef& key) override { return getSecrets(this, key); }
ACTOR static Future<EncryptionKey> getByRange(TenantAwareEncryptionKeyProvider* self, KeyRef begin, KeyRef end) {
EncryptCipherDomainNameRef domainName;
EncryptCipherDomainId domainId = self->getEncryptionDomainId(begin, end, &domainName);
TextAndHeaderCipherKeys cipherKeys =
wait(getLatestEncryptCipherKeysForDomain(self->db, domainId, domainName, BlobCipherMetrics::KV_REDWOOD));
EncryptionKey s;
s.cipherKeys = cipherKeys;
return s;
}
Future<EncryptionKey> getByRange(const KeyRef& begin, const KeyRef& end) override {
return getByRange(this, begin, end);
}
void setTenantPrefixIndex(Reference<TenantPrefixIndex> tenantPrefixIndex) override {
ASSERT(tenantPrefixIndex.isValid());
this->tenantPrefixIndex = tenantPrefixIndex;
}
private:
EncryptCipherDomainId getEncryptionDomainId(const KeyRef& begin,
const KeyRef& end,
EncryptCipherDomainNameRef* domainName) {
int64_t domainId = SYSTEM_KEYSPACE_ENCRYPT_DOMAIN_ID;
int64_t beginTenantId = getTenantId(begin, true /*inclusive*/);
int64_t endTenantId = getTenantId(end, false /*inclusive*/);
if (beginTenantId == endTenantId && beginTenantId != SYSTEM_KEYSPACE_ENCRYPT_DOMAIN_ID) {
ASSERT(tenantPrefixIndex.isValid());
Key tenantPrefix = TenantMapEntry::idToPrefix(beginTenantId);
auto view = tenantPrefixIndex->atLatest();
auto itr = view.find(tenantPrefix);
if (itr != view.end()) {
*domainName = *itr;
domainId = beginTenantId;
} else {
// No tenant with the same tenant id. We could be in optional or disabled tenant mode.
}
}
if (domainId == SYSTEM_KEYSPACE_ENCRYPT_DOMAIN_ID) {
*domainName = FDB_SYSTEM_KEYSPACE_ENCRYPT_DOMAIN_NAME;
}
return domainId;
}
int64_t getTenantId(const KeyRef& key, bool inclusive) {
// A valid tenant id is always a valid encrypt domain id.
static_assert(INVALID_ENCRYPT_DOMAIN_ID == -1);
if (key.size() && key >= systemKeys.begin) {
return SYSTEM_KEYSPACE_ENCRYPT_DOMAIN_ID;
}
if (key.size() < TENANT_PREFIX_SIZE) {
// Encryption domain information not available, leverage 'default encryption domain'
return FDB_DEFAULT_ENCRYPT_DOMAIN_ID;
}
StringRef prefix = key.substr(0, TENANT_PREFIX_SIZE);
int64_t tenantId = TenantMapEntry::prefixToId(prefix, EnforceValidTenantId::False);
if (tenantId == TenantInfo::INVALID_TENANT) {
// Encryption domain information not available, leverage 'default encryption domain'
return FDB_DEFAULT_ENCRYPT_DOMAIN_ID;
}
if (!inclusive && key.size() == TENANT_PREFIX_SIZE) {
tenantId = tenantId - 1;
}
ASSERT(tenantId >= 0);
return tenantId;
}
Reference<AsyncVar<ServerDBInfo> const> db;
Reference<TenantPrefixIndex> tenantPrefixIndex;
};
#include "flow/unactorcompiler.h"
#endif

View File

@ -18,6 +18,7 @@
* limitations under the License.
*/
#include "fdbclient/BlobCipher.h"
#if defined(NO_INTELLISENSE) && !defined(FDBSERVER_IPAGEENCRYPTIONKEYPROVIDER_ACTOR_G_H)
#define FDBSERVER_IPAGEENCRYPTIONKEYPROVIDER_ACTOR_G_H
#include "fdbserver/IPageEncryptionKeyProvider.actor.g.h"
@ -207,14 +208,17 @@ private:
Reference<BlobCipherKey> generateCipherKey(const BlobCipherDetails& cipherDetails) {
static unsigned char SHA_KEY[] = "3ab9570b44b8315fdb261da6b1b6c13b";
Arena arena;
StringRef digest = computeAuthToken(reinterpret_cast<const unsigned char*>(&cipherDetails.baseCipherId),
sizeof(EncryptCipherBaseKeyId),
SHA_KEY,
AES_256_KEY_LENGTH,
arena);
uint8_t digest[AUTH_TOKEN_SIZE];
computeAuthToken(reinterpret_cast<const unsigned char*>(&cipherDetails.baseCipherId),
sizeof(EncryptCipherBaseKeyId),
SHA_KEY,
AES_256_KEY_LENGTH,
&digest[0],
AUTH_TOKEN_SIZE);
ASSERT_EQ(AUTH_TOKEN_SIZE, AES_256_KEY_LENGTH);
return makeReference<BlobCipherKey>(cipherDetails.encryptDomainId,
cipherDetails.baseCipherId,
digest.begin(),
&digest[0],
AES_256_KEY_LENGTH,
cipherDetails.salt,
std::numeric_limits<int64_t>::max() /* refreshAt */,

View File

@ -75,6 +75,7 @@ struct TestWorkload : NonCopyable, WorkloadContext, ReferenceCounted<TestWorkloa
virtual ~TestWorkload(){};
virtual Future<Void> initialized() { return Void(); }
virtual std::string description() const = 0;
virtual void disableFailureInjectionWorkloads(std::set<std::string>& out) const;
virtual Future<Void> setup(Database const& cx) { return Void(); }
virtual Future<Void> start(Database const& cx) = 0;
virtual Future<bool> check(Database const& cx) = 0;

View File

@ -350,7 +350,7 @@ Future<bool> CompoundWorkload::check(Database const& cx) {
.detail("Name", workloadName)
.detail("Remaining", *wCount)
.detail("Phase", "End");
return true;
return ret;
},
workload.check(cx));
};
@ -384,19 +384,21 @@ void CompoundWorkload::addFailureInjection(WorkloadRequest& work) {
if (!work.runFailureWorkloads || !FLOW_KNOBS->ENABLE_SIMULATION_IMPROVEMENTS) {
return;
}
// Some common workloads won't work with failure injection workloads
// Some workloads won't work with some failure injection workloads
std::set<std::string> disabledWorkloads;
for (auto const& w : workloads) {
auto desc = w->description();
if (desc == "ChangeConfig") {
return;
} else if (desc == "SaveAndKill") {
return;
}
w->disableFailureInjectionWorkloads(disabledWorkloads);
}
if (disabledWorkloads.count("all") > 0) {
return;
}
auto& factories = IFailureInjectorFactory::factories();
DeterministicRandom random(sharedRandomNumber);
for (auto& factory : factories) {
auto workload = factory->create(*this);
if (disabledWorkloads.count(workload->description()) > 0) {
continue;
}
while (workload->add(random, work, *this)) {
failureInjection.push_back(workload);
workload = factory->create(*this);
@ -419,6 +421,8 @@ void CompoundWorkload::getMetrics(std::vector<PerfMetric>&) {
ASSERT(false);
}
void TestWorkload::disableFailureInjectionWorkloads(std::set<std::string>& out) const {}
FailureInjectionWorkload::FailureInjectionWorkload(WorkloadContext const& wcx) : TestWorkload(wcx) {}
bool FailureInjectionWorkload::add(DeterministicRandom& random,
@ -508,6 +512,8 @@ ACTOR Future<Reference<TestWorkload>> getWorkloadIface(WorkloadRequest work,
wcx.clientId = work.clientId;
wcx.clientCount = work.clientCount;
wcx.sharedRandomNumber = work.sharedRandomNumber;
wcx.ccr = ccr;
wcx.dbInfo = dbInfo;
// FIXME: Other stuff not filled in; why isn't this constructed here and passed down to the other
// getWorkloadIface()?
for (int i = 0; i < work.options.size(); i++) {

View File

@ -270,7 +270,7 @@ struct BackupAndRestoreCorrectnessWorkload : TestWorkload {
StringRef(backupContainer),
{},
deterministicRandom()->randomInt(0, 60),
deterministicRandom()->randomInt(0, 100),
deterministicRandom()->randomInt(0, 2000),
tag.toString(),
backupRanges,
StopWhenDone{ !stopDifferentialDelay },

View File

@ -49,6 +49,8 @@ struct ChangeConfigWorkload : TestWorkload {
std::string description() const override { return "ChangeConfig"; }
void disableFailureInjectionWorkloads(std::set<std::string>& out) const override { out.insert("all"); }
Future<Void> start(Database const& cx) override {
if (this->clientId != 0)
return Void();

View File

@ -29,7 +29,7 @@
struct DataDistributionMetricsWorkload : KVWorkload {
int numShards, readPerTx, writePerTx;
int64_t avgBytes;
int64_t avgBytes, transactionTimeLimit;
double testDuration;
std::string keyPrefix;
PerfIntCounter commits, errors;
@ -38,6 +38,8 @@ struct DataDistributionMetricsWorkload : KVWorkload {
DataDistributionMetricsWorkload(WorkloadContext const& wcx)
: KVWorkload(wcx), numShards(0), avgBytes(0), commits("Commits"), errors("Errors") {
testDuration = getOption(options, "testDuration"_sr, 10.0);
// transaction time out duration(ms)
transactionTimeLimit = getOption(options, "transactionTimeLimit"_sr, 10000);
keyPrefix = getOption(options, "keyPrefix"_sr, "DDMetrics"_sr).toString();
readPerTx = getOption(options, "readPerTransaction"_sr, 1);
writePerTx = getOption(options, "writePerTransaction"_sr, 5 * readPerTx);
@ -73,6 +75,9 @@ struct DataDistributionMetricsWorkload : KVWorkload {
ACTOR Future<Void> resultConsistencyCheckClient(Database cx, DataDistributionMetricsWorkload* self) {
state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(cx);
loop {
tr->setOption(FDBTransactionOptions::RAW_ACCESS);
tr->setOption(FDBTransactionOptions::TIMEOUT,
StringRef((uint8_t*)&self->transactionTimeLimit, sizeof(int64_t)));
try {
wait(delay(self->delayPerLoop));
int startIndex = deterministicRandom()->randomInt(0, self->nodeCount - 1);
@ -88,7 +93,7 @@ struct DataDistributionMetricsWorkload : KVWorkload {
// the range. If we didn't read through the end of the range, then the second last key
// in the result will be the last key less than endKey. (Condition #2)
state KeySelector end = KeySelectorRef(endKey.withPrefix(ddStatsRange.begin, endKey.arena()), false, 2);
RangeResult result = wait(tr->getRange(begin, end, GetRangeLimits(CLIENT_KNOBS->SHARD_COUNT_LIMIT)));
RangeResult result = wait(tr->getRange(begin, end, GetRangeLimits(CLIENT_KNOBS->TOO_MANY)));
// Condition #1 and #2 can be broken if multiple rpc calls happened in one getRange
if (result.size() > 1) {
if (result[0].key > begin.getKey() || result[1].key <= begin.getKey()) {
@ -100,13 +105,14 @@ struct DataDistributionMetricsWorkload : KVWorkload {
.detail("SecondKey", result[1].key)
.detail("BeginKeySelector", begin);
}
if (result[result.size() - 1].key < end.getKey() || result[result.size() - 2].key >= end.getKey()) {
if (!result.readThroughEnd && (result[result.size() - 1].key < end.getKey() ||
result[result.size() - 2].key >= end.getKey())) {
++self->errors;
TraceEvent(SevError, "TestFailure")
.detail("Reason", "Result mismatches the given end selector")
.detail("Size", result.size())
.detail("FirstKey", result[result.size() - 1].key)
.detail("SecondKey", result[result.size() - 2].key)
.detail("LastKey", result[result.size() - 1].key)
.detail("SecondLastKey", result[result.size() - 2].key)
.detail("EndKeySelector", end);
}
// Debugging traces
@ -123,9 +129,10 @@ struct DataDistributionMetricsWorkload : KVWorkload {
}
} catch (Error& e) {
// Ignore timed_out error and cross_module_read, the end key selector may read through the end
if (e.code() == error_code_timed_out || e.code() == error_code_special_keys_cross_module_read)
if (e.code() == error_code_timed_out || e.code() == error_code_transaction_timed_out) {
tr->reset();
continue;
TraceEvent(SevDebug, "FailedToRetrieveDDMetrics").error(e);
}
wait(tr->onError(e));
}
}
@ -139,38 +146,45 @@ struct DataDistributionMetricsWorkload : KVWorkload {
// TODO : find why this not work
// wait(quietDatabase(cx, self->dbInfo, "PopulateTPCC"));
state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(cx);
try {
state RangeResult result = wait(tr->getRange(ddStatsRange, CLIENT_KNOBS->SHARD_COUNT_LIMIT));
ASSERT(!result.more);
self->numShards = result.size();
if (self->numShards < 1)
return false;
state int64_t totalBytes = 0;
auto schema = readJSONStrictly(JSONSchemas::dataDistributionStatsSchema.toString()).get_obj();
for (int i = 0; i < result.size(); ++i) {
ASSERT(result[i].key.startsWith(ddStatsRange.begin));
std::string errorStr;
auto valueObj = readJSONStrictly(result[i].value.toString()).get_obj();
CODE_PROBE(true, "data_distribution_stats schema validation");
if (!schemaMatch(schema, valueObj, errorStr, SevError, true)) {
TraceEvent(SevError, "DataDistributionStatsSchemaValidationFailed")
.detail("ErrorStr", errorStr.c_str())
.detail("JSON", json_spirit::write_string(json_spirit::mValue(result[i].value.toString())));
return false;
state int i;
state int retries = 0;
loop {
tr->setOption(FDBTransactionOptions::RAW_ACCESS);
tr->setOption(FDBTransactionOptions::TIMEOUT,
StringRef((uint8_t*)&self->transactionTimeLimit, sizeof(int64_t)));
try {
state RangeResult result = wait(tr->getRange(ddStatsRange, CLIENT_KNOBS->TOO_MANY));
ASSERT(!result.more);
self->numShards = result.size();
// There's no guarantee that #shards <= CLIENT_KNOBS->SHARD_COUNT_LIMIT all the time
ASSERT(self->numShards >= 1);
state int64_t totalBytes = 0;
auto schema = readJSONStrictly(JSONSchemas::dataDistributionStatsSchema.toString()).get_obj();
for (i = 0; i < result.size(); ++i) {
ASSERT(result[i].key.startsWith(ddStatsRange.begin));
std::string errorStr;
auto valueObj = readJSONStrictly(result[i].value.toString()).get_obj();
CODE_PROBE(true, "data_distribution_stats schema validation");
if (!schemaMatch(schema, valueObj, errorStr, SevError, true)) {
TraceEvent(SevError, "DataDistributionStatsSchemaValidationFailed")
.detail("ErrorStr", errorStr.c_str())
.detail("JSON", json_spirit::write_string(json_spirit::mValue(result[i].value.toString())));
return false;
}
totalBytes += valueObj["shard_bytes"].get_int64();
}
totalBytes += valueObj["shard_bytes"].get_int64();
self->avgBytes = totalBytes / self->numShards;
break;
} catch (Error& e) {
if (e.code() == error_code_timed_out || e.code() == error_code_transaction_timed_out) {
tr->reset();
// The RPC call may in some corner cases get no response
if (++retries > 10)
break;
continue;
}
wait(tr->onError(e));
}
self->avgBytes = totalBytes / self->numShards;
// fetch data-distribution stats for a smaller range
ASSERT(result.size());
state int idx = deterministicRandom()->randomInt(0, result.size());
RangeResult res = wait(tr->getRange(
KeyRangeRef(result[idx].key, idx + 1 < result.size() ? result[idx + 1].key : ddStatsRange.end), 100));
ASSERT_WE_THINK(res.size() == 1 && res[0] == result[idx]); // It works good now. However, not sure in any
// case of data-distribution, the number changes
} catch (Error& e) {
TraceEvent(SevError, "FailedToRetrieveDDMetrics").detail("Error", e.what());
throw;
}
return true;
}
@ -188,7 +202,12 @@ struct DataDistributionMetricsWorkload : KVWorkload {
std::string description() const override { return "DataDistributionMetrics"; }
Future<Void> setup(Database const& cx) override { return Void(); }
Future<Void> start(Database const& cx) override { return _start(cx, this); }
Future<bool> check(Database const& cx) override { return _check(cx, this); }
Future<bool> check(Database const& cx) override {
if (clientId == 0)
return _check(cx, this);
return true;
}
void getMetrics(std::vector<PerfMetric>& m) override {
m.emplace_back("NumShards", numShards, Averaged::True);

View File

@ -46,7 +46,7 @@ struct DiskFailureInjectionWorkload : FailureInjectionWorkload {
// Verification Mode: We run the workload indefinitely in this mode.
// The idea is to keep going until we get a non-zero chaosMetric to ensure
// that we haven't lost the chaos event. testDuration is ignored in this mode
bool verificationMode;
bool verificationMode = false;
DiskFailureInjectionWorkload(WorkloadContext const& wcx, NoOptions) : FailureInjectionWorkload(wcx) {}

View File

@ -56,6 +56,7 @@ struct MoveKeysWorkload : FailureInjectionWorkload {
state Transaction tr(cx);
loop {
tr.setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
tr.setOption(FDBTransactionOptions::READ_LOCK_AWARE);
try {
RangeResult res = wait(tr.getRange(configKeys, 1000));
ASSERT(res.size() < 1000);

View File

@ -45,6 +45,7 @@ struct SaveAndKillWorkload : TestWorkload {
}
std::string description() const override { return "SaveAndKillWorkload"; }
void disableFailureInjectionWorkloads(std::set<std::string>& out) const override { out.insert("all"); }
Future<Void> setup(Database const& cx) override {
g_simulator->disableSwapsToAll();
return Void();

View File

@ -219,6 +219,7 @@ if(WITH_PYTHON)
add_fdb_test(TEST_FILES rare/CycleRollbackClogged.toml)
add_fdb_test(TEST_FILES rare/CycleWithKills.toml)
add_fdb_test(TEST_FILES rare/CycleWithDeadHall.toml)
add_fdb_test(TEST_FILES rare/DataDistributionMetrics.toml)
add_fdb_test(TEST_FILES rare/FuzzTest.toml)
add_fdb_test(TEST_FILES rare/GlobalTagThrottling.toml IGNORE)
add_fdb_test(TEST_FILES rare/HighContentionPrefixAllocator.toml)

View File

@ -0,0 +1,8 @@
[configuration]
buggify = false
[[test]]
testTitle = 'DataDistributionMetricsTest'
[[test.workload]]
testName = 'DataDistributionMetrics'