Merge branch 'main' of https://github.com/apple/foundationdb into features/ppw-pause-state

This commit is contained in:
Xiaoxi Wang 2022-06-29 21:03:47 -07:00
commit 1b0780398f
30 changed files with 424 additions and 110 deletions

View File

@ -4,6 +4,7 @@ target_include_directories(rapidjson INTERFACE ${CMAKE_CURRENT_SOURCE_DIR}/rapid
add_subdirectory(crc32)
add_subdirectory(stacktrace)
add_subdirectory(folly_memcpy)
add_subdirectory(rapidxml)
add_subdirectory(sqlite)
add_subdirectory(SimpleOpt)
add_subdirectory(fmt-8.1.1)

View File

@ -0,0 +1,2 @@
add_library(rapidxml INTERFACE)
target_include_directories(rapidxml INTERFACE "${CMAKE_CURRENT_SOURCE_DIR}/include")

View File

@ -80,7 +80,7 @@ add_flow_target(STATIC_LIBRARY NAME fdbclient SRCS ${FDBCLIENT_SRCS} ADDL_SRCS $
target_include_directories(fdbclient PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/include" "${CMAKE_CURRENT_BINARY_DIR}/include")
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/versions.h.cmake ${CMAKE_CURRENT_BINARY_DIR}/include/fdbclient/versions.h)
add_dependencies(fdbclient fdboptions)
target_link_libraries(fdbclient PUBLIC fdbrpc msgpack)
target_link_libraries(fdbclient PUBLIC fdbrpc msgpack PRIVATE rapidxml)
# Create a separate fdbclient library with sampling enabled. This lets
# fdbserver retain sampling functionality in client code while disabling
@ -88,7 +88,7 @@ target_link_libraries(fdbclient PUBLIC fdbrpc msgpack)
add_flow_target(STATIC_LIBRARY NAME fdbclient_sampling SRCS ${FDBCLIENT_SRCS} ADDL_SRCS ${options_srcs})
target_include_directories(fdbclient_sampling PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/include" "${CMAKE_CURRENT_BINARY_DIR}/include")
add_dependencies(fdbclient_sampling fdboptions)
target_link_libraries(fdbclient_sampling PUBLIC fdbrpc_sampling msgpack)
target_link_libraries(fdbclient_sampling PUBLIC fdbrpc_sampling msgpack PRIVATE rapidxml)
target_compile_definitions(fdbclient_sampling PRIVATE -DENABLE_SAMPLING)
if(WIN32)
add_dependencies(fdbclient_sampling_actors fdbclient_actors)

View File

@ -40,7 +40,7 @@
#include "flow/IAsyncFile.h"
#include "flow/Hostname.h"
#include "flow/UnitTest.h"
#include "fdbclient/rapidxml/rapidxml.hpp"
#include "rapidxml/rapidxml.hpp"
#ifdef BUILD_AWS_BACKUP
#include "fdbclient/FDBAWSCredentialsProvider.h"
#endif

View File

@ -79,6 +79,7 @@ struct MutationRef {
CompareAndClear,
Reserved_For_SpanContextMessage /* See fdbserver/SpanContextMessage.h */,
Reserved_For_OTELSpanContextMessage,
Reserved_For_EncryptedMutationMessage /* See fdbserver/EncryptedMutationMessage.actor.h */,
MAX_ATOMIC_OP
};
// This is stored this way for serialization purposes.

View File

@ -24,6 +24,7 @@
#include "fdbclient/Notified.h"
#include "fdbclient/SystemData.h"
#include "fdbserver/ApplyMetadataMutation.h"
#include "fdbserver/EncryptedMutationMessage.h"
#include "fdbserver/IKeyValueStore.h"
#include "fdbserver/LogProtocolMessage.h"
#include "fdbserver/LogSystem.h"
@ -67,13 +68,14 @@ public:
ProxyCommitData& proxyCommitData_,
Reference<ILogSystem> logSystem_,
LogPushData* toCommit_,
const std::unordered_map<EncryptCipherDomainId, Reference<BlobCipherKey>>* cipherKeys_,
bool& confChange_,
Version version,
Version popVersion_,
bool initialCommit_)
: spanContext(spanContext_), dbgid(proxyCommitData_.dbgid), arena(arena_), mutations(mutations_),
txnStateStore(proxyCommitData_.txnStateStore), toCommit(toCommit_), confChange(confChange_),
logSystem(logSystem_), version(version), popVersion(popVersion_),
txnStateStore(proxyCommitData_.txnStateStore), toCommit(toCommit_), cipherKeys(cipherKeys_),
confChange(confChange_), logSystem(logSystem_), version(version), popVersion(popVersion_),
vecBackupKeys(&proxyCommitData_.vecBackupKeys), keyInfo(&proxyCommitData_.keyInfo),
cacheInfo(&proxyCommitData_.cacheInfo),
uid_applyMutationsData(proxyCommitData_.firstProxy ? &proxyCommitData_.uid_applyMutationsData : nullptr),
@ -108,6 +110,9 @@ private:
// non-null if these mutations were part of a new commit handled by this commit proxy
LogPushData* toCommit = nullptr;
// Cipher keys used to encrypt to be committed mutations
const std::unordered_map<EncryptCipherDomainId, Reference<BlobCipherKey>>* cipherKeys = nullptr;
// Flag indicates if the configure is changed
bool& confChange;
@ -152,6 +157,16 @@ private:
bool dummyConfChange = false;
private:
void writeMutation(const MutationRef& m) {
if (forResolver || !SERVER_KNOBS->ENABLE_TLOG_ENCRYPTION) {
toCommit->writeTypedMessage(m);
} else {
ASSERT(cipherKeys != nullptr);
Arena arena;
toCommit->writeTypedMessage(EncryptedMutationMessage::encryptMetadata(arena, *cipherKeys, m));
}
}
void checkSetKeyServersPrefix(MutationRef m) {
if (!m.param1.startsWith(keyServersPrefix)) {
return;
@ -221,7 +236,7 @@ private:
.detail("Tag", tag.toString());
toCommit->addTag(tag);
toCommit->writeTypedMessage(privatized);
writeMutation(privatized);
}
}
@ -243,7 +258,7 @@ private:
toCommit->writeTypedMessage(LogProtocolMessage(), true);
TraceEvent(SevDebug, "SendingPrivatized_ServerTag", dbgid).detail("M", privatized);
toCommit->addTag(tag);
toCommit->writeTypedMessage(privatized);
writeMutation(privatized);
}
if (!initialCommit) {
txnStateStore->set(KeyValueRef(m.param1, m.param2));
@ -303,7 +318,7 @@ private:
privatized.param1 = m.param1.withPrefix(systemKeys.begin, arena);
TraceEvent(SevDebug, "SendingPrivatized_CacheTag", dbgid).detail("M", privatized);
toCommit->addTag(cacheTag);
toCommit->writeTypedMessage(privatized);
writeMutation(privatized);
}
void checkSetConfigKeys(MutationRef m) {
@ -354,7 +369,7 @@ private:
toCommit->addTags(allSources);
}
TraceEvent(SevDebug, "SendingPrivatized_ChangeFeed", dbgid).detail("M", privatized);
toCommit->writeTypedMessage(privatized);
writeMutation(privatized);
}
}
@ -408,7 +423,7 @@ private:
if (tagV.present()) {
TraceEvent(SevDebug, "SendingPrivatized_TSSID", dbgid).detail("M", privatized);
toCommit->addTag(decodeServerTagValue(tagV.get()));
toCommit->writeTypedMessage(privatized);
writeMutation(privatized);
}
}
}
@ -437,7 +452,7 @@ private:
privatized.param1 = m.param1.withPrefix(systemKeys.begin, arena);
TraceEvent(SevDebug, "SendingPrivatized_TSSQuarantine", dbgid).detail("M", privatized);
toCommit->addTag(decodeServerTagValue(tagV.get()));
toCommit->writeTypedMessage(privatized);
writeMutation(privatized);
}
}
@ -560,7 +575,7 @@ private:
privatized.param1 = m.param1.withPrefix(systemKeys.begin, arena);
TraceEvent(SevDebug, "SendingPrivatized_GlobalKeys", dbgid).detail("M", privatized);
toCommit->addTags(allTags);
toCommit->writeTypedMessage(privatized);
writeMutation(privatized);
}
// Generates private mutations for the target storage server, instructing it to create a checkpoint.
@ -582,7 +597,7 @@ private:
.detail("Checkpoint", checkpoint.toString());
toCommit->addTag(tag);
toCommit->writeTypedMessage(privatized);
writeMutation(privatized);
}
}
@ -662,7 +677,7 @@ private:
MutationRef privatized = m;
privatized.param1 = m.param1.withPrefix(systemKeys.begin, arena);
toCommit->writeTypedMessage(privatized);
writeMutation(privatized);
}
TEST(true); // Tenant added to map
@ -760,7 +775,7 @@ private:
TraceEvent(SevDebug, "SendingPrivatized_ClearServerTag", dbgid).detail("M", privatized);
toCommit->addTag(decodeServerTagValue(kv.value));
toCommit->writeTypedMessage(privatized);
writeMutation(privatized);
}
}
// Might be a tss removal, which doesn't store a tag there.
@ -784,7 +799,7 @@ private:
TraceEvent(SevDebug, "SendingPrivatized_TSSClearServerTag", dbgid)
.detail("M", privatized);
toCommit->addTag(decodeServerTagValue(tagV.get()));
toCommit->writeTypedMessage(privatized);
writeMutation(privatized);
}
}
}
@ -969,7 +984,7 @@ private:
privatized.param2 = m.param2.withPrefix(systemKeys.begin, arena);
TraceEvent(SevDebug, "SendingPrivatized_ClearTSSMapping", dbgid).detail("M", privatized);
toCommit->addTag(decodeServerTagValue(tagV.get()));
toCommit->writeTypedMessage(privatized);
writeMutation(privatized);
}
}
@ -996,7 +1011,7 @@ private:
privatized.param2 = m.param2.withPrefix(systemKeys.begin, arena);
TraceEvent(SevDebug, "SendingPrivatized_ClearTSSQuarantine", dbgid).detail("M", privatized);
toCommit->addTag(decodeServerTagValue(tagV.get()));
toCommit->writeTypedMessage(privatized);
writeMutation(privatized);
}
}
}
@ -1050,7 +1065,7 @@ private:
privatized.type = MutationRef::ClearRange;
privatized.param1 = range.begin.withPrefix(systemKeys.begin, arena);
privatized.param2 = range.end.withPrefix(systemKeys.begin, arena);
toCommit->writeTypedMessage(privatized);
writeMutation(privatized);
}
TEST(true); // Tenant cleared from map
@ -1146,9 +1161,9 @@ private:
.detail("MBegin", mutationBegin)
.detail("MEnd", mutationEnd);
toCommit->addTags(allTags);
toCommit->writeTypedMessage(mutationBegin);
writeMutation(mutationBegin);
toCommit->addTags(allTags);
toCommit->writeTypedMessage(mutationEnd);
writeMutation(mutationEnd);
}
}
@ -1223,6 +1238,7 @@ void applyMetadataMutations(SpanContext const& spanContext,
Reference<ILogSystem> logSystem,
const VectorRef<MutationRef>& mutations,
LogPushData* toCommit,
const std::unordered_map<EncryptCipherDomainId, Reference<BlobCipherKey>>* pCipherKeys,
bool& confChange,
Version version,
Version popVersion,
@ -1234,6 +1250,7 @@ void applyMetadataMutations(SpanContext const& spanContext,
proxyCommitData,
logSystem,
toCommit,
pCipherKeys,
confChange,
version,
popVersion,

View File

@ -25,6 +25,8 @@
#include "fdbclient/SystemData.h"
#include "fdbserver/BackupInterface.h"
#include "fdbserver/BackupProgress.actor.h"
#include "fdbserver/EncryptedMutationMessage.h"
#include "fdbserver/GetEncryptCipherKeys.h"
#include "fdbserver/Knobs.h"
#include "fdbserver/LogProtocolMessage.h"
#include "fdbserver/LogSystem.h"
@ -44,6 +46,7 @@ struct VersionedMessage {
StringRef message;
VectorRef<Tag> tags;
Arena arena; // Keep a reference to the memory containing the message
Arena decryptArena; // Arena used for decrypt buffer.
size_t bytes; // arena's size when inserted, which can grow afterwards
VersionedMessage(LogMessageVersion v, StringRef m, const VectorRef<Tag>& t, const Arena& a)
@ -53,7 +56,8 @@ struct VersionedMessage {
// Returns true if the message is a mutation that should be backuped, i.e.,
// either key is not in system key space or is not a metadataVersionKey.
bool isBackupMessage(MutationRef* m) const {
bool isBackupMessage(MutationRef* m,
const std::unordered_map<BlobCipherDetails, Reference<BlobCipherKey>>& cipherKeys) {
for (Tag tag : tags) {
if (tag.locality == tagLocalitySpecial || tag.locality == tagLocalityTxs) {
return false; // skip Txs mutations
@ -71,10 +75,26 @@ struct VersionedMessage {
TEST(true); // Returning false for OTELSpanContextMessage
return false;
}
reader >> *m;
if (EncryptedMutationMessage::isNextIn(reader)) {
// In case the mutation is encrypted, get the decrypted mutation and also update message to point to
// the decrypted mutation.
// We use dedicated arena for decrypt buffer, as the other arena is used to count towards backup lock bytes.
*m = EncryptedMutationMessage::decrypt(reader, decryptArena, cipherKeys, &message);
} else {
reader >> *m;
}
return normalKeys.contains(m->param1) || m->param1 == metadataVersionKey;
}
void collectCipherDetailIfEncrypted(std::unordered_set<BlobCipherDetails>& cipherDetails) {
ArenaReader reader(arena, message, AssumeVersion(g_network->protocolVersion()));
if (EncryptedMutationMessage::isNextIn(reader)) {
EncryptedMutationMessage emm;
reader >> emm;
cipherDetails.insert(emm.header.cipherTextDetails);
cipherDetails.insert(emm.header.cipherHeaderDetails);
}
}
};
struct BackupData {
@ -89,6 +109,7 @@ struct BackupData {
Version minKnownCommittedVersion;
Version savedVersion; // Largest version saved to blob storage
Version popVersion; // Largest version popped in NOOP mode, can be larger than savedVersion.
Reference<AsyncVar<ServerDBInfo> const> db;
AsyncVar<Reference<ILogSystem>> logSystem;
Database cx;
std::vector<VersionedMessage> messages;
@ -245,7 +266,7 @@ struct BackupData {
: myId(id), tag(req.routerTag), totalTags(req.totalTags), startVersion(req.startVersion),
endVersion(req.endVersion), recruitedEpoch(req.recruitedEpoch), backupEpoch(req.backupEpoch),
minKnownCommittedVersion(invalidVersion), savedVersion(req.startVersion - 1), popVersion(req.startVersion - 1),
pulledVersion(0), paused(false), lock(new FlowLock(SERVER_KNOBS->BACKUP_LOCK_BYTES)),
db(db), pulledVersion(0), paused(false), lock(new FlowLock(SERVER_KNOBS->BACKUP_LOCK_BYTES)),
cc("BackupWorker", myId.toString()) {
cx = openDBOnServer(db, TaskPriority::DefaultEndpoint, LockAware::True);
@ -682,7 +703,10 @@ ACTOR static Future<Void> updateLogBytesWritten(BackupData* self,
// Saves messages in the range of [0, numMsg) to a file and then remove these
// messages. The file content format is a sequence of (Version, sub#, msgSize, message).
// Note only ready backups are saved.
ACTOR Future<Void> saveMutationsToFile(BackupData* self, Version popVersion, int numMsg) {
ACTOR Future<Void> saveMutationsToFile(BackupData* self,
Version popVersion,
int numMsg,
std::unordered_set<BlobCipherDetails> cipherDetails) {
state int blockSize = SERVER_KNOBS->BACKUP_FILE_BLOCK_BYTES;
state std::vector<Future<Reference<IBackupFile>>> logFileFutures;
state std::vector<Reference<IBackupFile>> logFiles;
@ -691,6 +715,7 @@ ACTOR Future<Void> saveMutationsToFile(BackupData* self, Version popVersion, int
state std::vector<Version> beginVersions; // logFiles' begin versions
state KeyRangeMap<std::set<int>> keyRangeMap; // range to index in logFileFutures, logFiles, & blockEnds
state std::vector<Standalone<StringRef>> mutations;
state std::unordered_map<BlobCipherDetails, Reference<BlobCipherKey>> cipherKeys;
state int idx;
// Make sure all backups are ready, otherwise mutations will be lost.
@ -742,11 +767,18 @@ ACTOR Future<Void> saveMutationsToFile(BackupData* self, Version popVersion, int
.detail("File", logFiles[i]->getFileName());
}
// Fetch cipher keys if any of the messages are encrypted.
if (!cipherDetails.empty()) {
std::unordered_map<BlobCipherDetails, Reference<BlobCipherKey>> getCipherKeysResult =
wait(getEncryptCipherKeys(self->db, cipherDetails));
cipherKeys = getCipherKeysResult;
}
blockEnds = std::vector<int64_t>(logFiles.size(), 0);
for (idx = 0; idx < numMsg; idx++) {
const auto& message = self->messages[idx];
auto& message = self->messages[idx];
MutationRef m;
if (!message.isBackupMessage(&m))
if (!message.isBackupMessage(&m, cipherKeys))
continue;
DEBUG_MUTATION("addMutation", message.version.version, m)
@ -815,6 +847,7 @@ ACTOR Future<Void> uploadData(BackupData* self) {
state Future<Void> uploadDelay = delay(SERVER_KNOBS->BACKUP_UPLOAD_DELAY);
state int numMsg = 0;
state std::unordered_set<BlobCipherDetails> cipherDetails;
Version lastPopVersion = popVersion;
// index of last version's end position in self->messages
int lastVersionIndex = 0;
@ -826,7 +859,7 @@ ACTOR Future<Void> uploadData(BackupData* self) {
popVersion = std::max(popVersion, self->minKnownCommittedVersion);
}
} else {
for (const auto& message : self->messages) {
for (auto& message : self->messages) {
// message may be prefetched in peek; uncommitted message should not be uploaded.
const Version version = message.getVersion();
if (version > self->maxPopVersion())
@ -836,6 +869,7 @@ ACTOR Future<Void> uploadData(BackupData* self) {
lastVersion = popVersion;
popVersion = version;
}
message.collectCipherDetailIfEncrypted(cipherDetails);
numMsg++;
}
}
@ -859,7 +893,7 @@ ACTOR Future<Void> uploadData(BackupData* self) {
.detail("NumMsg", numMsg)
.detail("MsgQ", self->messages.size());
// save an empty file for old epochs so that log file versions are continuous
wait(saveMutationsToFile(self, popVersion, numMsg));
wait(saveMutationsToFile(self, popVersion, numMsg, cipherDetails));
self->eraseMessages(numMsg);
}

View File

@ -34,7 +34,9 @@
#include "fdbserver/ApplyMetadataMutation.h"
#include "fdbserver/ConflictSet.h"
#include "fdbserver/DataDistributorInterface.h"
#include "fdbserver/EncryptedMutationMessage.h"
#include "fdbserver/FDBExecHelper.actor.h"
#include "fdbserver/GetEncryptCipherKeys.h"
#include "fdbserver/IKeyValueStore.h"
#include "fdbserver/Knobs.h"
#include "fdbserver/LogSystem.h"
@ -48,6 +50,7 @@
#include "fdbserver/WaitFailure.h"
#include "fdbserver/WorkerInterface.actor.h"
#include "flow/ActorCollection.h"
#include "flow/BlobCipher.h"
#include "flow/Error.h"
#include "flow/IRandom.h"
#include "flow/Knobs.h"
@ -641,6 +644,9 @@ struct CommitBatchContext {
std::set<Tag> writtenTags; // final set tags written to in the batch
std::set<Tag> writtenTagsPreResolution; // tags written to in the batch not including any changes from the resolver.
// Cipher keys to be used to encrypt mutations
std::unordered_map<EncryptCipherDomainId, Reference<BlobCipherKey>> cipherKeys;
CommitBatchContext(ProxyCommitData*, const std::vector<CommitTransactionRequest>*, const int);
void setupTraceBatch();
@ -897,6 +903,27 @@ ACTOR Future<Void> getResolution(CommitBatchContext* self) {
self->transactionResolverMap.swap(requests.transactionResolverMap);
// Used to report conflicting keys
self->txReadConflictRangeIndexMap.swap(requests.txReadConflictRangeIndexMap);
// Fetch cipher keys if needed.
state Future<std::unordered_map<EncryptCipherDomainId, Reference<BlobCipherKey>>> getCipherKeys;
if (SERVER_KNOBS->ENABLE_TLOG_ENCRYPTION) {
static std::unordered_map<EncryptCipherDomainId, EncryptCipherDomainName> defaultDomains = {
{ SYSTEM_KEYSPACE_ENCRYPT_DOMAIN_ID, FDB_DEFAULT_ENCRYPT_DOMAIN_NAME },
{ ENCRYPT_HEADER_DOMAIN_ID, FDB_DEFAULT_ENCRYPT_DOMAIN_NAME }
};
std::unordered_map<EncryptCipherDomainId, EncryptCipherDomainName> encryptDomains = defaultDomains;
for (int t = 0; t < trs.size(); t++) {
int64_t tenantId = trs[t].tenantInfo.tenantId;
Optional<TenantName> tenantName = trs[t].tenantInfo.name;
// TODO(yiwu): In raw access mode, use tenant prefix to figure out tenant id for user data
if (tenantId != TenantInfo::INVALID_TENANT) {
ASSERT(tenantName.present());
encryptDomains[tenantId] = tenantName.get();
}
}
getCipherKeys = getLatestEncryptCipherKeys(pProxyCommitData->db, encryptDomains);
}
self->releaseFuture = releaseResolvingAfter(pProxyCommitData, self->releaseDelay, self->localBatchNumber);
if (self->localBatchNumber - self->pProxyCommitData->latestLocalCommitBatchLogging.get() >
@ -922,6 +949,11 @@ ACTOR Future<Void> getResolution(CommitBatchContext* self) {
"CommitDebug", self->debugID.get().first(), "CommitProxyServer.commitBatch.AfterResolution");
}
if (SERVER_KNOBS->ENABLE_TLOG_ENCRYPTION) {
std::unordered_map<EncryptCipherDomainId, Reference<BlobCipherKey>> cipherKeys = wait(getCipherKeys);
self->cipherKeys = cipherKeys;
}
return Void();
}
@ -961,6 +993,7 @@ void applyMetadataEffect(CommitBatchContext* self) {
self->pProxyCommitData->logSystem,
self->resolution[0].stateMutations[versionIndex][transactionIndex].mutations,
/* pToCommit= */ nullptr,
/* pCipherKeys= */ nullptr,
self->forceRecovery,
/* version= */ self->commitVersion,
/* popVersion= */ 0,
@ -1060,6 +1093,7 @@ ACTOR Future<Void> applyMetadataToCommittedTransactions(CommitBatchContext* self
pProxyCommitData->logSystem,
trs[t].transaction.mutations,
SERVER_KNOBS->PROXY_USE_RESOLVER_PRIVATE_MUTATIONS ? nullptr : &self->toCommit,
SERVER_KNOBS->ENABLE_TLOG_ENCRYPTION ? &self->cipherKeys : nullptr,
self->forceRecovery,
self->commitVersion,
self->commitVersion + 1,
@ -1111,6 +1145,22 @@ ACTOR Future<Void> applyMetadataToCommittedTransactions(CommitBatchContext* self
return Void();
}
void writeMutation(CommitBatchContext* self, int64_t tenantId, const MutationRef& mutation) {
static_assert(TenantInfo::INVALID_TENANT == ENCRYPT_INVALID_DOMAIN_ID);
if (!SERVER_KNOBS->ENABLE_TLOG_ENCRYPTION || tenantId == TenantInfo::INVALID_TENANT) {
// TODO(yiwu): In raw access mode, use tenant prefix to figure out tenant id for user data
bool isRawAccess = tenantId == TenantInfo::INVALID_TENANT && !isSystemKey(mutation.param1) &&
!(mutation.type == MutationRef::ClearRange && isSystemKey(mutation.param2)) &&
self->pProxyCommitData->db->get().client.tenantMode == TenantMode::REQUIRED;
TEST(isRawAccess); // Raw access to tenant key space
self->toCommit.writeTypedMessage(mutation);
} else {
Arena arena;
self->toCommit.writeTypedMessage(
EncryptedMutationMessage::encrypt(arena, self->cipherKeys, tenantId /*domainId*/, mutation));
}
}
/// This second pass through committed transactions assigns the actual mutations to the appropriate storage servers'
/// tags
ACTOR Future<Void> assignMutationsToStorageServers(CommitBatchContext* self) {
@ -1127,6 +1177,7 @@ ACTOR Future<Void> assignMutationsToStorageServers(CommitBatchContext* self) {
state Optional<ClientTrCommitCostEstimation>* trCost = &trs[self->transactionNum].commitCostEstimation;
state int mutationNum = 0;
state VectorRef<MutationRef>* pMutations = &trs[self->transactionNum].transaction.mutations;
state int64_t tenantId = trs[self->transactionNum].tenantInfo.tenantId;
self->toCommit.addTransactionInfo(trs[self->transactionNum].spanContext);
@ -1184,7 +1235,7 @@ ACTOR Future<Void> assignMutationsToStorageServers(CommitBatchContext* self) {
if (pProxyCommitData->cacheInfo[m.param1]) {
self->toCommit.addTag(cacheTag);
}
self->toCommit.writeTypedMessage(m);
writeMutation(self, tenantId, m);
} else if (m.type == MutationRef::ClearRange) {
KeyRangeRef clearRange(KeyRangeRef(m.param1, m.param2));
auto ranges = pProxyCommitData->keyInfo.intersectingRanges(clearRange);
@ -1237,7 +1288,7 @@ ACTOR Future<Void> assignMutationsToStorageServers(CommitBatchContext* self) {
if (pProxyCommitData->needsCacheTag(clearRange)) {
self->toCommit.addTag(cacheTag);
}
self->toCommit.writeTypedMessage(m);
writeMutation(self, tenantId, m);
} else {
UNREACHABLE();
}
@ -2308,6 +2359,7 @@ ACTOR Future<Void> processCompleteTransactionStateRequest(TransactionStateResolv
Reference<ILogSystem>(),
mutations,
/* pToCommit= */ nullptr,
/* pCipherKeys= */ nullptr,
confChanges,
/* version= */ 0,
/* popVersion= */ 0,
@ -2399,7 +2451,8 @@ ACTOR Future<Void> commitProxyServerCore(CommitProxyInterface proxy,
// Wait until we can load the "real" logsystem, since we don't support switching them currently
while (!(masterLifetime.isEqual(commitData.db->get().masterLifetime) &&
commitData.db->get().recoveryState >= RecoveryState::RECOVERY_TRANSACTION)) {
commitData.db->get().recoveryState >= RecoveryState::RECOVERY_TRANSACTION &&
(!SERVER_KNOBS->ENABLE_TLOG_ENCRYPTION || commitData.db->get().encryptKeyProxy.present()))) {
//TraceEvent("ProxyInit2", proxy.id()).detail("LSEpoch", db->get().logSystemConfig.epoch).detail("Need", epoch);
wait(commitData.db->onChange());
}

View File

@ -42,6 +42,7 @@
#include "flow/genericactors.actor.h"
#include "flow/network.h"
#include <boost/functional/hash.hpp>
#include <boost/mpl/not.hpp>
#include <limits>
#include <string>

View File

@ -1,5 +1,5 @@
/*
* GetCipherKeys.actor.cpp
* GetEncryptCipherKeys.actor.cpp
*
* This source file is part of the FoundationDB open source project
*
@ -38,7 +38,7 @@ ACTOR Future<Void> onEncryptKeyProxyChange(Reference<AsyncVar<ServerDBInfo> cons
break;
}
}
TraceEvent("GetCipherKeys_EncryptKeyProxyChanged")
TraceEvent("GetEncryptCipherKeys_EncryptKeyProxyChanged")
.detail("PreviousProxyId", previousProxyId.orDefault(UID()))
.detail("CurrentProxyId", currentProxyId.orDefault(UID()));
return Void();
@ -50,19 +50,19 @@ ACTOR Future<EKPGetLatestBaseCipherKeysReply> getUncachedLatestEncryptCipherKeys
Optional<EncryptKeyProxyInterface> proxy = db->get().encryptKeyProxy;
if (!proxy.present()) {
// Wait for onEncryptKeyProxyChange.
TraceEvent("GetLatestCipherKeys_EncryptKeyProxyNotPresent");
TraceEvent("GetLatestEncryptCipherKeys_EncryptKeyProxyNotPresent");
return Never();
}
request.reply.reset();
try {
EKPGetLatestBaseCipherKeysReply reply = wait(proxy.get().getLatestBaseCipherKeys.getReply(request));
if (reply.error.present()) {
TraceEvent(SevWarn, "GetLatestCipherKeys_RequestFailed").error(reply.error.get());
TraceEvent(SevWarn, "GetLatestEncryptCipherKeys_RequestFailed").error(reply.error.get());
throw encrypt_keys_fetch_failed();
}
return reply;
} catch (Error& e) {
TraceEvent("GetLatestCipherKeys_CaughtError").error(e);
TraceEvent("GetLatestEncryptCipherKeys_CaughtError").error(e);
if (e.code() == error_code_broken_promise) {
// Wait for onEncryptKeyProxyChange.
return Never();
@ -81,7 +81,7 @@ ACTOR Future<std::unordered_map<EncryptCipherDomainId, Reference<BlobCipherKey>>
state EKPGetLatestBaseCipherKeysRequest request;
if (!db.isValid()) {
TraceEvent(SevError, "GetLatestCipherKeys_ServerDBInfoNotAvailable");
TraceEvent(SevError, "GetLatestEncryptCipherKeys_ServerDBInfoNotAvailable");
throw encrypt_ops_error();
}
@ -114,7 +114,7 @@ ACTOR Future<std::unordered_map<EncryptCipherDomainId, Reference<BlobCipherKey>>
// Check for any missing cipher keys.
for (auto& domain : request.encryptDomainInfos) {
if (cipherKeys.count(domain.domainId) == 0) {
TraceEvent(SevWarn, "GetLatestCipherKeys_KeyMissing").detail("DomainId", domain.domainId);
TraceEvent(SevWarn, "GetLatestEncryptCipherKeys_KeyMissing").detail("DomainId", domain.domainId);
throw encrypt_key_not_found();
}
}
@ -133,19 +133,19 @@ ACTOR Future<EKPGetBaseCipherKeysByIdsReply> getUncachedEncryptCipherKeys(Refere
Optional<EncryptKeyProxyInterface> proxy = db->get().encryptKeyProxy;
if (!proxy.present()) {
// Wait for onEncryptKeyProxyChange.
TraceEvent("GetCipherKeys_EncryptKeyProxyNotPresent");
TraceEvent("GetEncryptCipherKeys_EncryptKeyProxyNotPresent");
return Never();
}
request.reply.reset();
try {
EKPGetBaseCipherKeysByIdsReply reply = wait(proxy.get().getBaseCipherKeysByIds.getReply(request));
if (reply.error.present()) {
TraceEvent(SevWarn, "GetCipherKeys_RequestFailed").error(reply.error.get());
TraceEvent(SevWarn, "GetEncryptCipherKeys_RequestFailed").error(reply.error.get());
throw encrypt_keys_fetch_failed();
}
return reply;
} catch (Error& e) {
TraceEvent("GetCipherKeys_CaughtError").error(e);
TraceEvent("GetEncryptCipherKeys_CaughtError").error(e);
if (e.code() == error_code_broken_promise) {
// Wait for onEncryptKeyProxyChange.
return Never();
@ -167,7 +167,7 @@ ACTOR Future<std::unordered_map<BlobCipherDetails, Reference<BlobCipherKey>>> ge
state EKPGetBaseCipherKeysByIdsRequest request;
if (!db.isValid()) {
TraceEvent(SevError, "GetCipherKeys_ServerDBInfoNotAvailable");
TraceEvent(SevError, "GetEncryptCipherKeys_ServerDBInfoNotAvailable");
throw encrypt_ops_error();
}
@ -204,7 +204,7 @@ ACTOR Future<std::unordered_map<BlobCipherDetails, Reference<BlobCipherKey>>> ge
BaseCipherIndex baseIdx = std::make_pair(details.encryptDomainId, details.baseCipherId);
const auto& itr = baseCipherKeys.find(baseIdx);
if (itr == baseCipherKeys.end()) {
TraceEvent(SevError, "GetCipherKeys_KeyMissing")
TraceEvent(SevError, "GetEncryptCipherKeys_KeyMissing")
.detail("DomainId", details.encryptDomainId)
.detail("BaseCipherId", details.baseCipherId);
throw encrypt_key_not_found();

View File

@ -21,6 +21,7 @@
#include <algorithm>
#include <vector>
#include "fdbclient/FDBTypes.h"
#include "fdbserver/EncryptedMutationMessage.h"
#include "fdbserver/MutationTracking.h"
#include "fdbserver/LogProtocolMessage.h"
#include "fdbserver/SpanContextMessage.h"
@ -102,6 +103,8 @@ TraceEvent debugTagsAndMessageEnabled(const char* context, Version version, Stri
BinaryReader br(mutationData, AssumeVersion(rdr.protocolVersion()));
OTELSpanContextMessage scm;
br >> scm;
} else if (EncryptedMutationMessage::startsEncryptedMutationMessage(mutationType)) {
throw encrypt_unsupported();
} else {
MutationRef m;
BinaryReader br(mutationData, AssumeVersion(rdr.protocolVersion()));

View File

@ -24,6 +24,7 @@
#include "flow/UnitTest.h"
#include "fdbclient/BackupContainer.h"
#include "fdbclient/BackupAgent.actor.h"
#include "fdbserver/EncryptedMutationMessage.h"
#include "fdbserver/RestoreLoader.actor.h"
#include "fdbserver/RestoreRoleCommon.actor.h"
#include "fdbserver/MutationTracking.h"
@ -422,6 +423,9 @@ ACTOR static Future<Void> _parsePartitionedLogFileOnLoader(
ASSERT(inserted);
ArenaReader rd(buf.arena(), StringRef(message, msgSize), AssumeVersion(g_network->protocolVersion()));
if (EncryptedMutationMessage::isNextIn(rd)) {
throw encrypt_unsupported();
}
MutationRef mutation;
rd >> mutation;

View File

@ -139,7 +139,7 @@ ACTOR Future<Void> ekLookupByDomainIds(Reference<SimKmsConnectorContext> ctx,
req.debugId.present() ? TraceEvent("SimKmsGetsByDomIds", interf.id()) : Optional<TraceEvent>();
if (dbgDIdTrace.present()) {
dbgDIdTrace.get().detail("DbgId", req.debugId.get());
dbgDIdTrace.get().setMaxEventLength(16384).detail("DbgId", req.debugId.get());
}
// Map encryptionDomainId to corresponding EncryptKeyCtx element using a modulo operation. This

View File

@ -23,6 +23,8 @@
#include "fdbclient/FDBOptions.g.h"
#include "fdbclient/NativeAPI.actor.h"
#include "fdbclient/SystemData.h"
#include "fdbserver/EncryptedMutationMessage.h"
#include "fdbserver/GetEncryptCipherKeys.h"
#include "fdbserver/Knobs.h"
#include "fdbserver/ServerDBInfo.h"
#include "fdbclient/StorageServerInterface.h"
@ -1874,6 +1876,9 @@ ACTOR Future<Void> pullAsyncData(StorageCacheData* data) {
state FetchInjectionInfo fii;
state Reference<ILogSystem::IPeekCursor> cloneCursor2;
state Optional<std::unordered_map<BlobCipherDetails, Reference<BlobCipherKey>>> cipherKeys;
state bool collectingCipherKeys = false;
// If encrypted mutation is encountered, we collect cipher details and fetch cipher keys, then start over.
loop {
state uint64_t changeCounter = data->cacheRangeChangeCounter;
bool epochEnd = false;
@ -1881,6 +1886,8 @@ ACTOR Future<Void> pullAsyncData(StorageCacheData* data) {
bool firstMutation = true;
bool dbgLastMessageWasProtocol = false;
std::unordered_set<BlobCipherDetails> cipherDetails;
Reference<ILogSystem::IPeekCursor> cloneCursor1 = cursor->cloneNoMore();
cloneCursor2 = cursor->cloneNoMore();
@ -1904,36 +1911,60 @@ ACTOR Future<Void> pullAsyncData(StorageCacheData* data) {
OTELSpanContextMessage::isNextIn(cloneReader)) {
OTELSpanContextMessage scm;
cloneReader >> scm;
} else if (cloneReader.protocolVersion().hasEncryptionAtRest() &&
EncryptedMutationMessage::isNextIn(cloneReader) && !cipherKeys.present()) {
// Encrypted mutation found, but cipher keys haven't been fetch.
// Collect cipher details to fetch cipher keys in one batch.
EncryptedMutationMessage emm;
cloneReader >> emm;
cipherDetails.insert(emm.header.cipherTextDetails);
cipherDetails.insert(emm.header.cipherHeaderDetails);
collectingCipherKeys = true;
} else {
MutationRef msg;
cloneReader >> msg;
if (firstMutation && msg.param1.startsWith(systemKeys.end))
hasPrivateData = true;
firstMutation = false;
if (msg.param1 == lastEpochEndPrivateKey) {
epochEnd = true;
// ASSERT(firstMutation);
ASSERT(dbgLastMessageWasProtocol);
if (cloneReader.protocolVersion().hasEncryptionAtRest() &&
EncryptedMutationMessage::isNextIn(cloneReader)) {
assert(cipherKeys.present());
msg = EncryptedMutationMessage::decrypt(cloneReader, cloneReader.arena(), cipherKeys.get());
} else {
cloneReader >> msg;
}
dbgLastMessageWasProtocol = false;
if (!collectingCipherKeys) {
if (firstMutation && msg.param1.startsWith(systemKeys.end))
hasPrivateData = true;
firstMutation = false;
if (msg.param1 == lastEpochEndPrivateKey) {
epochEnd = true;
// ASSERT(firstMutation);
ASSERT(dbgLastMessageWasProtocol);
}
dbgLastMessageWasProtocol = false;
}
}
}
// Any fetchKeys which are ready to transition their cacheRanges to the adding,transferred state do so
// now. If there is an epoch end we skip this step, to increase testability and to prevent inserting a
// version in the middle of a rolled back version range.
while (!hasPrivateData && !epochEnd && !data->readyFetchKeys.empty()) {
auto fk = data->readyFetchKeys.back();
data->readyFetchKeys.pop_back();
fk.send(&fii);
if (collectingCipherKeys) {
std::unordered_map<BlobCipherDetails, Reference<BlobCipherKey>> result =
wait(getEncryptCipherKeys(data->db, cipherDetails));
cipherKeys = result;
collectingCipherKeys = false;
} else {
// Any fetchKeys which are ready to transition their cacheRanges to the adding,transferred state do
// so now. If there is an epoch end we skip this step, to increase testability and to prevent
// inserting a version in the middle of a rolled back version range.
while (!hasPrivateData && !epochEnd && !data->readyFetchKeys.empty()) {
auto fk = data->readyFetchKeys.back();
data->readyFetchKeys.pop_back();
fk.send(&fii);
}
if (data->cacheRangeChangeCounter == changeCounter)
break;
// TEST(true); // A fetchKeys completed while we were doing this, so eager might be outdated. Read
// it again.
}
if (data->cacheRangeChangeCounter == changeCounter)
break;
// TEST(true); // A fetchKeys completed while we were doing this, so eager might be outdated. Read it
// again.
}
data->debug_inApplyUpdate = true;
@ -1988,7 +2019,11 @@ ACTOR Future<Void> pullAsyncData(StorageCacheData* data) {
reader >> oscm;
} else {
MutationRef msg;
reader >> msg;
if (reader.protocolVersion().hasEncryptionAtRest() && EncryptedMutationMessage::isNextIn(reader)) {
msg = EncryptedMutationMessage::decrypt(reader, reader.arena(), cipherKeys.get());
} else {
reader >> msg;
}
if (ver != invalidVersion) // This change belongs to a version < minVersion
{

View File

@ -28,7 +28,6 @@
#include "fdbclient/FDBTypes.h"
#include "fdbclient/ManagementAPI.actor.h"
#include "fdbserver/WorkerInterface.actor.h"
#include "fdbserver/LogProtocolMessage.h"
#include "fdbserver/SpanContextMessage.h"
#include "fdbserver/TLogInterface.h"
#include "fdbserver/Knobs.h"

View File

@ -33,6 +33,7 @@
#include "fdbserver/LogProtocolMessage.h"
#include "fdbserver/LogSystem.h"
#include "fdbserver/ProxyCommitData.actor.h"
#include "flow/BlobCipher.h"
#include "flow/FastRef.h"
// Resolver's data for applyMetadataMutations() calls.
@ -93,6 +94,7 @@ void applyMetadataMutations(SpanContext const& spanContext,
Reference<ILogSystem> logSystem,
const VectorRef<MutationRef>& mutations,
LogPushData* pToCommit,
const std::unordered_map<EncryptCipherDomainId, Reference<BlobCipherKey>>* pCipherKeys,
bool& confChange,
Version version,
Version popVersion,

View File

@ -0,0 +1,117 @@
/*
* EncryptedMutationMessage.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef FDBSERVER_ENCRYPTEDMUTATIONMESSAGE_H
#define FDBSERVER_ENCRYPTEDMUTATIONMESSAGE_H
#pragma once
#include "fdbclient/CommitTransaction.h"
#include "flow/BlobCipher.h"
struct EncryptedMutationMessage {
BlobCipherEncryptHeader header;
StringRef encrypted;
EncryptedMutationMessage() {}
std::string toString() const {
return format("code: %d, encryption info: %s",
MutationRef::Reserved_For_EncryptedMutationMessage,
header.toString().c_str());
}
template <class Ar>
void serialize(Ar& ar) {
uint8_t poly = MutationRef::Reserved_For_EncryptedMutationMessage;
serializer(ar, poly, header, encrypted);
}
static bool startsEncryptedMutationMessage(uint8_t byte) {
return byte == MutationRef::Reserved_For_EncryptedMutationMessage;
}
template <class Ar>
static bool isNextIn(Ar& ar) {
return startsEncryptedMutationMessage(*(const uint8_t*)ar.peekBytes(1));
}
// Encrypt given mutation and return an EncryptedMutationMessage.
static EncryptedMutationMessage encrypt(
Arena& arena,
const std::unordered_map<EncryptCipherDomainId, Reference<BlobCipherKey>>& cipherKeys,
const EncryptCipherDomainId& domainId,
const MutationRef& mutation) {
ASSERT_NE(domainId, ENCRYPT_INVALID_DOMAIN_ID);
auto textCipherItr = cipherKeys.find(domainId);
auto headerCipherItr = cipherKeys.find(ENCRYPT_HEADER_DOMAIN_ID);
ASSERT(textCipherItr != cipherKeys.end() && textCipherItr->second.isValid());
ASSERT(headerCipherItr != cipherKeys.end() && headerCipherItr->second.isValid());
uint8_t iv[AES_256_IV_LENGTH];
generateRandomData(iv, AES_256_IV_LENGTH);
BinaryWriter bw(AssumeVersion(g_network->protocolVersion()));
bw << mutation;
EncryptedMutationMessage encrypted_mutation;
EncryptBlobCipherAes265Ctr cipher(textCipherItr->second,
headerCipherItr->second,
iv,
AES_256_IV_LENGTH,
ENCRYPT_HEADER_AUTH_TOKEN_MODE_SINGLE);
encrypted_mutation.encrypted =
cipher
.encrypt(static_cast<const uint8_t*>(bw.getData()), bw.getLength(), &encrypted_mutation.header, arena)
->toStringRef();
return encrypted_mutation;
}
// Encrypt system key space mutation and return an EncryptedMutationMessage.
static EncryptedMutationMessage encryptMetadata(
Arena& arena,
const std::unordered_map<EncryptCipherDomainId, Reference<BlobCipherKey>>& cipherKeys,
const MutationRef& mutation) {
return encrypt(arena, cipherKeys, SYSTEM_KEYSPACE_ENCRYPT_DOMAIN_ID, mutation);
}
// Read an EncryptedMutationMessage from given reader, decrypt and return the encrypted mutation.
// Also return decrypt buffer through buf, if it is specified.
template <class Ar>
static MutationRef decrypt(Ar& ar,
Arena& arena,
const std::unordered_map<BlobCipherDetails, Reference<BlobCipherKey>>& cipherKeys,
StringRef* buf = nullptr) {
EncryptedMutationMessage msg;
ar >> msg;
auto textCipherItr = cipherKeys.find(msg.header.cipherTextDetails);
auto headerCipherItr = cipherKeys.find(msg.header.cipherHeaderDetails);
ASSERT(textCipherItr != cipherKeys.end() && textCipherItr->second.isValid());
ASSERT(headerCipherItr != cipherKeys.end() && headerCipherItr->second.isValid());
DecryptBlobCipherAes256Ctr cipher(textCipherItr->second, headerCipherItr->second, msg.header.iv);
StringRef plaintext =
cipher.decrypt(msg.encrypted.begin(), msg.encrypted.size(), msg.header, arena)->toStringRef();
if (buf != nullptr) {
*buf = plaintext;
}
ArenaReader reader(arena, plaintext, AssumeVersion(g_network->protocolVersion()));
MutationRef mutation;
reader >> mutation;
return mutation;
}
};
#endif

View File

@ -1,5 +1,5 @@
/*
* GetCipherKeys.h
* GetEncryptCipherKeys.h
*
* This source file is part of the FoundationDB open source project
*

View File

@ -32,7 +32,6 @@
#include "fdbrpc/simulator.h"
#include "fdbserver/DBCoreState.h"
#include "fdbserver/Knobs.h"
#include "fdbserver/LogProtocolMessage.h"
#include "fdbserver/LogSystem.h"
#include "fdbserver/RecoveryState.h"
#include "fdbserver/ServerDBInfo.h"

View File

@ -52,7 +52,9 @@
#include "fdbclient/SystemData.h"
#include "fdbclient/TransactionLineage.h"
#include "fdbclient/VersionedMap.h"
#include "fdbserver/EncryptedMutationMessage.h"
#include "fdbserver/FDBExecHelper.actor.h"
#include "fdbserver/GetEncryptCipherKeys.h"
#include "fdbserver/IKeyValueStore.h"
#include "fdbserver/Knobs.h"
#include "fdbserver/LatencyBandConfig.h"
@ -7092,7 +7094,11 @@ ACTOR Future<Void> update(StorageServer* data, bool* pReceivedUpdate) {
state UpdateEagerReadInfo eager;
state FetchInjectionInfo fii;
state Reference<ILogSystem::IPeekCursor> cloneCursor2;
state Optional<std::unordered_map<BlobCipherDetails, Reference<BlobCipherKey>>> cipherKeys;
state bool collectingCipherKeys = false;
// Collect eager read keys.
// If encrypted mutation is encountered, we collect cipher details and fetch cipher keys, then start over.
loop {
state uint64_t changeCounter = data->shardChangeCounter;
bool epochEnd = false;
@ -7100,6 +7106,8 @@ ACTOR Future<Void> update(StorageServer* data, bool* pReceivedUpdate) {
bool firstMutation = true;
bool dbgLastMessageWasProtocol = false;
std::unordered_set<BlobCipherDetails> cipherDetails;
Reference<ILogSystem::IPeekCursor> cloneCursor1 = cursor->cloneNoMore();
cloneCursor2 = cursor->cloneNoMore();
@ -7122,47 +7130,72 @@ ACTOR Future<Void> update(StorageServer* data, bool* pReceivedUpdate) {
OTELSpanContextMessage::isNextIn(cloneReader)) {
OTELSpanContextMessage scm;
cloneReader >> scm;
} else if (cloneReader.protocolVersion().hasEncryptionAtRest() &&
EncryptedMutationMessage::isNextIn(cloneReader) && !cipherKeys.present()) {
// Encrypted mutation found, but cipher keys haven't been fetch.
// Collect cipher details to fetch cipher keys in one batch.
EncryptedMutationMessage emm;
cloneReader >> emm;
cipherDetails.insert(emm.header.cipherTextDetails);
cipherDetails.insert(emm.header.cipherHeaderDetails);
collectingCipherKeys = true;
} else {
MutationRef msg;
cloneReader >> msg;
if (cloneReader.protocolVersion().hasEncryptionAtRest() &&
EncryptedMutationMessage::isNextIn(cloneReader)) {
assert(cipherKeys.present());
msg = EncryptedMutationMessage::decrypt(cloneReader, eager.arena, cipherKeys.get());
} else {
cloneReader >> msg;
}
// TraceEvent(SevDebug, "SSReadingLog", data->thisServerID).detail("Mutation", msg);
if (firstMutation && msg.param1.startsWith(systemKeys.end))
hasPrivateData = true;
firstMutation = false;
if (!collectingCipherKeys) {
if (firstMutation && msg.param1.startsWith(systemKeys.end))
hasPrivateData = true;
firstMutation = false;
if (msg.param1 == lastEpochEndPrivateKey) {
epochEnd = true;
ASSERT(dbgLastMessageWasProtocol);
if (msg.param1 == lastEpochEndPrivateKey) {
epochEnd = true;
ASSERT(dbgLastMessageWasProtocol);
}
eager.addMutation(msg);
dbgLastMessageWasProtocol = false;
}
eager.addMutation(msg);
dbgLastMessageWasProtocol = false;
}
}
// Any fetchKeys which are ready to transition their shards to the adding,transferred state do so now.
// If there is an epoch end we skip this step, to increase testability and to prevent inserting a
// version in the middle of a rolled back version range.
while (!hasPrivateData && !epochEnd && !data->readyFetchKeys.empty()) {
auto fk = data->readyFetchKeys.back();
data->readyFetchKeys.pop_back();
fk.send(&fii);
// fetchKeys() would put the data it fetched into the fii. The thread will not return back to this
// actor until it was completed.
if (collectingCipherKeys) {
std::unordered_map<BlobCipherDetails, Reference<BlobCipherKey>> getCipherKeysResult =
wait(getEncryptCipherKeys(data->db, cipherDetails));
cipherKeys = getCipherKeysResult;
collectingCipherKeys = false;
eager = UpdateEagerReadInfo();
} else {
// Any fetchKeys which are ready to transition their shards to the adding,transferred state do so now.
// If there is an epoch end we skip this step, to increase testability and to prevent inserting a
// version in the middle of a rolled back version range.
while (!hasPrivateData && !epochEnd && !data->readyFetchKeys.empty()) {
auto fk = data->readyFetchKeys.back();
data->readyFetchKeys.pop_back();
fk.send(&fii);
// fetchKeys() would put the data it fetched into the fii. The thread will not return back to this
// actor until it was completed.
}
for (auto& c : fii.changes)
eager.addMutations(c.mutations);
wait(doEagerReads(data, &eager));
if (data->shardChangeCounter == changeCounter)
break;
TEST(true); // A fetchKeys completed while we were doing this, so eager might be outdated. Read it
// again.
// SOMEDAY: Theoretically we could check the change counters of individual shards and retry the reads
// only selectively
eager = UpdateEagerReadInfo();
}
for (auto& c : fii.changes)
eager.addMutations(c.mutations);
wait(doEagerReads(data, &eager));
if (data->shardChangeCounter == changeCounter)
break;
TEST(true); // A fetchKeys completed while we were doing this, so eager might be outdated. Read it
// again.
// SOMEDAY: Theoretically we could check the change counters of individual shards and retry the reads
// only selectively
eager = UpdateEagerReadInfo();
}
data->eagerReadsLatencyHistogram->sampleSeconds(now() - start);
@ -7255,7 +7288,12 @@ ACTOR Future<Void> update(StorageServer* data, bool* pReceivedUpdate) {
spanContext = scm.spanContext;
} else {
MutationRef msg;
rd >> msg;
if (rd.protocolVersion().hasEncryptionAtRest() && EncryptedMutationMessage::isNextIn(rd)) {
ASSERT(cipherKeys.present());
msg = EncryptedMutationMessage::decrypt(rd, rd.arena(), cipherKeys.get());
} else {
rd >> msg;
}
Span span("SS:update"_loc, spanContext);
span.addAttribute("key"_sr, msg.param1);
@ -7435,7 +7473,9 @@ ACTOR Future<Void> update(StorageServer* data, bool* pReceivedUpdate) {
return Void(); // update will get called again ASAP
} catch (Error& err) {
state Error e = err;
if (e.code() != error_code_worker_removed && e.code() != error_code_please_reboot) {
if (e.code() == error_code_encrypt_keys_fetch_failed) {
TraceEvent(SevWarn, "SSUpdateError", data->thisServerID).error(e).backtrace();
} else if (e.code() != error_code_worker_removed && e.code() != error_code_please_reboot) {
TraceEvent(SevError, "SSUpdateError", data->thisServerID).error(e).backtrace();
} else if (e.code() == error_code_please_reboot) {
wait(data->durableInProgress);

View File

@ -21,6 +21,7 @@
#include "flow/EncryptUtils.h"
#include "flow/Trace.h"
#include <boost/algorithm/string.hpp>
#include <boost/format.hpp>
std::string getEncryptDbgTraceKey(std::string_view prefix,
@ -29,12 +30,15 @@ std::string getEncryptDbgTraceKey(std::string_view prefix,
Optional<EncryptCipherBaseKeyId> baseCipherId) {
// Construct the TraceEvent field key ensuring its uniqueness and compliance to TraceEvent field validator and log
// parsing tools
std::string dName = domainName.toString();
// Underscores are invalid in trace event detail name.
boost::replace_all(dName, "_", "-");
if (baseCipherId.present()) {
boost::format fmter("%s.%lld.%s.%llu");
return boost::str(boost::format(fmter % prefix % domainId % domainName.toString() % baseCipherId.get()));
return boost::str(boost::format(fmter % prefix % domainId % dName % baseCipherId.get()));
} else {
boost::format fmter("%s.%lld.%s");
return boost::str(boost::format(fmter % prefix % domainId % domainName.toString()));
return boost::str(boost::format(fmter % prefix % domainId % dName));
}
}

View File

@ -29,14 +29,14 @@
#include <string>
#include <string_view>
#define ENCRYPT_INVALID_DOMAIN_ID 0
#define ENCRYPT_INVALID_DOMAIN_ID -1
#define ENCRYPT_INVALID_CIPHER_KEY_ID 0
#define ENCRYPT_INVALID_RANDOM_SALT 0
#define AUTH_TOKEN_SIZE 16
#define SYSTEM_KEYSPACE_ENCRYPT_DOMAIN_ID -1
#define ENCRYPT_HEADER_DOMAIN_ID -2
#define SYSTEM_KEYSPACE_ENCRYPT_DOMAIN_ID -2
#define ENCRYPT_HEADER_DOMAIN_ID -3
const std::string FDB_DEFAULT_ENCRYPT_DOMAIN_NAME = "FdbDefaultEncryptDomain";

View File

@ -172,6 +172,7 @@ public: // introduced features
PROTOCOL_VERSION_FEATURE(0x0FDB00B071010000LL, ResolverPrivateMutations);
PROTOCOL_VERSION_FEATURE(0x0FDB00B072000000LL, OTELSpanContext);
PROTOCOL_VERSION_FEATURE(0x0FDB00B072000000LL, SWVersionTracking);
PROTOCOL_VERSION_FEATURE(0x0FDB00B072000000LL, EncryptionAtRest);
};
template <>

View File

@ -315,6 +315,7 @@ ERROR( encrypt_update_cipher, 2705, "Attempt to update encryption cipher key")
ERROR( encrypt_invalid_id, 2706, "Invalid encryption cipher details")
ERROR( encrypt_keys_fetch_failed, 2707, "Encryption keys fetch from external KMS failed")
ERROR( encrypt_invalid_kms_config, 2708, "Invalid encryption/kms configuration: discovery-url, validation-token, endpoint etc.")
ERROR( encrypt_unsupported, 2709, "Encryption not supported")
// 4xxx Internal errors (those that should be generated only by bugs) are decimal 4xxx
ERROR( unknown_error, 4000, "An unknown error occurred" ) // C++ exception not of type Error