Merge branch 'apple:main' into dbcorever

Bharadwaj V.R 2022-04-20 06:16:27 -07:00 committed by GitHub
commit a711c55061
29 changed files with 468 additions and 246 deletions

View File

@ -253,25 +253,43 @@ endif()
${CMAKE_SOURCE_DIR}/bindings/c/test/apitester/tests
)
if(NOT USE_SANITIZER)
add_test(NAME fdb_c_upgrade_single_threaded
if(CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT USE_SANITIZER)
add_test(NAME fdb_c_upgrade_single_threaded_630api
COMMAND ${CMAKE_SOURCE_DIR}/tests/TestRunner/upgrade_test.py
--build-dir ${CMAKE_BINARY_DIR}
--disable-log-dump
--test-file ${CMAKE_SOURCE_DIR}/bindings/c/test/apitester/tests/upgrade/MixedApiWorkloadSingleThr.toml
--upgrade-path "6.3.23" "7.0.0" "6.3.23"
--upgrade-path "6.3.23" "7.0.0" "7.2.0"
--process-number 1
)
)
add_test(NAME fdb_c_upgrade_multi_threaded
COMMAND ${CMAKE_SOURCE_DIR}/tests/TestRunner/upgrade_test.py
--build-dir ${CMAKE_BINARY_DIR}
--disable-log-dump
--test-file ${CMAKE_SOURCE_DIR}/bindings/c/test/apitester/tests/upgrade/MixedApiWorkloadMultiThr.toml
--upgrade-path "6.3.23" "7.0.0" "6.3.23"
--process-number 3
)
endif()
add_test(NAME fdb_c_upgrade_single_threaded_700api
COMMAND ${CMAKE_SOURCE_DIR}/tests/TestRunner/upgrade_test.py
--build-dir ${CMAKE_BINARY_DIR}
--disable-log-dump
--test-file ${CMAKE_SOURCE_DIR}/bindings/c/test/apitester/tests/upgrade/MixedApiWorkloadSingleThr.toml
--upgrade-path "7.0.0" "7.2.0"
--process-number 1
)
add_test(NAME fdb_c_upgrade_multi_threaded_630api
COMMAND ${CMAKE_SOURCE_DIR}/tests/TestRunner/upgrade_test.py
--build-dir ${CMAKE_BINARY_DIR}
--disable-log-dump
--test-file ${CMAKE_SOURCE_DIR}/bindings/c/test/apitester/tests/upgrade/MixedApiWorkloadMultiThr.toml
--upgrade-path "6.3.23" "7.0.0" "7.2.0"
--process-number 3
)
add_test(NAME fdb_c_upgrade_multi_threaded_700api
COMMAND ${CMAKE_SOURCE_DIR}/tests/TestRunner/upgrade_test.py
--build-dir ${CMAKE_BINARY_DIR}
--disable-log-dump
--test-file ${CMAKE_SOURCE_DIR}/bindings/c/test/apitester/tests/upgrade/MixedApiWorkloadMultiThr.toml
--upgrade-path "7.0.0" "7.2.0"
--process-number 3
)
endif()
endif()
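
Each add_test call above registers with CTest, so a single upgrade path can be exercised on its own from a configured build directory (illustrative invocation; the test name comes from the add_test calls above):

ctest -R fdb_c_upgrade_multi_threaded_700api --output-on-failure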

View File

@ -113,8 +113,8 @@ Implementing a C++ Workload
===========================
In order to implement a workload, one has to build a shared library that links against the fdb client library. This library has to
exppse a function (with C linkage) called workloadFactory which needs to return a pointer to an object of type ``FDBWorkloadFactory``.
This mechanism allows the other to implement as many workloads within one library as she wants. To do this the pure virtual classes
expose a function (with C linkage) called workloadFactory which needs to return a pointer to an object of type ``FDBWorkloadFactory``.
This mechanism allows the author to implement as many workloads within one library as she wants. To do this the pure virtual classes
``FDBWorkloadFactory`` and ``FDBWorkload`` have to be implemented.
.. function:: FDBWorkloadFactory* workloadFactory(FDBLogger*)
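
For example, the entry point of such a library could look like the following minimal sketch, where ``MyWorkloadFactory`` is a hypothetical class implementing the pure virtual ``FDBWorkloadFactory`` interface:

extern "C" FDBWorkloadFactory* workloadFactory(FDBLogger* logger) {
	// A process-lifetime singleton; the host calls this once and then uses
	// the factory to instantiate the workloads implemented in this library.
	static MyWorkloadFactory factory;
	return &factory;
}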

View File

@ -1055,7 +1055,7 @@ ACTOR Future<MonitorLeaderInfo> monitorProxiesOneGeneration(
auto& ni = rep.get().mutate();
shrinkProxyList(ni, lastCommitProxyUIDs, lastCommitProxies, lastGrvProxyUIDs, lastGrvProxies);
clientInfo->set(ni);
clientInfo->setUnconditional(ni);
successIndex = index;
} else {
TEST(rep.getError().code() == error_code_failed_to_progress); // Coordinator can't talk to cluster controller

View File

@ -581,8 +581,10 @@ ACTOR Future<GetReadVersionReply> getLiveCommittedVersion(SpanID parentSpan,
GetRawCommittedVersionReply repFromMaster = wait(replyFromMasterFuture);
grvProxyData->minKnownCommittedVersion =
std::max(grvProxyData->minKnownCommittedVersion, repFromMaster.minKnownCommittedVersion);
// TODO add to "status json"
grvProxyData->ssVersionVectorCache.applyDelta(repFromMaster.ssVersionVectorDelta);
if (SERVER_KNOBS->ENABLE_VERSION_VECTOR) {
// TODO add to "status json"
grvProxyData->ssVersionVectorCache.applyDelta(repFromMaster.ssVersionVectorDelta);
}
grvProxyData->stats.grvGetCommittedVersionRpcDist->sampleSeconds(now() - grvConfirmEpochLive);
GetReadVersionReply rep;
rep.version = repFromMaster.version;
@ -646,8 +648,10 @@ ACTOR Future<Void> sendGrvReplies(Future<GetReadVersionReply> replyFuture,
}
reply.midShardSize = midShardSize;
reply.tagThrottleInfo.clear();
grvProxyData->ssVersionVectorCache.getDelta(request.maxVersion, reply.ssVersionVectorDelta);
grvProxyData->versionVectorSizeOnGRVReply.addMeasurement(reply.ssVersionVectorDelta.size());
if (SERVER_KNOBS->ENABLE_VERSION_VECTOR) {
grvProxyData->ssVersionVectorCache.getDelta(request.maxVersion, reply.ssVersionVectorDelta);
grvProxyData->versionVectorSizeOnGRVReply.addMeasurement(reply.ssVersionVectorDelta.size());
}
reply.proxyId = grvProxyData->dbgid;
if (!request.tags.empty()) {

View File

@ -291,8 +291,6 @@ struct TLogData : NonCopyable {
// the set and for callers that unset will
// be able to match it up
std::string dataFolder; // folder where data is stored
std::map<Tag, Version> toBePopped; // map of Tag->Version for all the pops
// that came when ignorePopRequest was set
Reference<AsyncVar<bool>> degraded;
std::vector<TagsAndMessage> tempTagMessages;
@ -514,6 +512,9 @@ struct LogData : NonCopyable, public ReferenceCounted<LogData> {
bool execOpCommitInProgress;
int txsTags;
std::map<Tag, Version> toBePopped; // map of Tag->Version for all the pops
// that came when ignorePopRequest was set
explicit LogData(TLogData* tLogData,
TLogInterface interf,
Tag remoteTag,
@ -819,8 +820,8 @@ ACTOR Future<Void> tLogPopCore(TLogData* self, Tag inputTag, Version to, Referen
if (self->ignorePopRequest) {
TraceEvent(SevDebug, "IgnoringPopRequest").detail("IgnorePopDeadline", self->ignorePopDeadline);
if (self->toBePopped.find(inputTag) == self->toBePopped.end() || to > self->toBePopped[inputTag]) {
self->toBePopped[inputTag] = to;
if (logData->toBePopped.find(inputTag) == logData->toBePopped.end() || to > logData->toBePopped[inputTag]) {
logData->toBePopped[inputTag] = to;
}
// add the pop to the toBePopped map
TraceEvent(SevDebug, "IgnoringPopRequest")
@ -882,11 +883,11 @@ ACTOR Future<Void> tLogPop(TLogData* self, TLogPopRequest req, Reference<LogData
self->ignorePopRequest = false;
self->ignorePopUid = "";
self->ignorePopDeadline = 0.0;
for (it = self->toBePopped.begin(); it != self->toBePopped.end(); it++) {
for (it = logData->toBePopped.begin(); it != logData->toBePopped.end(); it++) {
TraceEvent("PlayIgnoredPop").detail("Tag", it->first.toString()).detail("Version", it->second);
ignoredPops.push_back(tLogPopCore(self, it->first, it->second, logData));
}
self->toBePopped.clear();
logData->toBePopped.clear();
wait(waitForAll(ignoredPops));
TraceEvent("ResetIgnorePopRequest")
.detail("Now", g_network->now())
@ -1937,7 +1938,7 @@ ACTOR Future<Void> tLogEnablePopReq(TLogEnablePopRequest enablePopReq, TLogData*
self->ignorePopRequest = false;
self->ignorePopDeadline = 0.0;
self->ignorePopUid = "";
for (it = self->toBePopped.begin(); it != self->toBePopped.end(); it++) {
for (it = logData->toBePopped.begin(); it != logData->toBePopped.end(); it++) {
TraceEvent("PlayIgnoredPop").detail("Tag", it->first.toString()).detail("Version", it->second);
ignoredPops.push_back(tLogPopCore(self, it->first, it->second, logData));
}
@ -1951,7 +1952,7 @@ ACTOR Future<Void> tLogEnablePopReq(TLogEnablePopRequest enablePopReq, TLogData*
.detail("QueueCommittedVersion", logData->queueCommittedVersion.get())
.detail("Version", logData->version.get());
wait(waitForAll(ignoredPops));
self->toBePopped.clear();
logData->toBePopped.clear();
enablePopReq.reply.send(Void());
return Void();
}

View File

@ -355,8 +355,6 @@ struct TLogData : NonCopyable {
// the set and for callers that unset will
// be able to match it up
std::string dataFolder; // folder where data is stored
std::map<Tag, Version> toBePopped; // map of Tag->Version for all the pops
// that came when ignorePopRequest was set
Reference<AsyncVar<bool>> degraded;
TLogData(UID dbgid,
@ -596,6 +594,9 @@ struct LogData : NonCopyable, public ReferenceCounted<LogData> {
bool execOpCommitInProgress;
int txsTags;
std::map<Tag, Version> toBePopped; // map of Tag->Version for all the pops
// that came when ignorePopRequest was set
explicit LogData(TLogData* tLogData,
TLogInterface interf,
Tag remoteTag,
@ -1400,8 +1401,8 @@ ACTOR Future<Void> tLogPopCore(TLogData* self, Tag inputTag, Version to, Referen
if (self->ignorePopRequest) {
TraceEvent(SevDebug, "IgnoringPopRequest").detail("IgnorePopDeadline", self->ignorePopDeadline);
if (self->toBePopped.find(inputTag) == self->toBePopped.end() || to > self->toBePopped[inputTag]) {
self->toBePopped[inputTag] = to;
if (logData->toBePopped.find(inputTag) == logData->toBePopped.end() || to > logData->toBePopped[inputTag]) {
logData->toBePopped[inputTag] = to;
}
// add the pop to the toBePopped map
TraceEvent(SevDebug, "IgnoringPopRequest")
@ -1478,11 +1479,11 @@ ACTOR Future<Void> tLogPop(TLogData* self, TLogPopRequest req, Reference<LogData
self->ignorePopRequest = false;
self->ignorePopUid = "";
self->ignorePopDeadline = 0.0;
for (it = self->toBePopped.begin(); it != self->toBePopped.end(); it++) {
for (it = logData->toBePopped.begin(); it != logData->toBePopped.end(); it++) {
TraceEvent("PlayIgnoredPop").detail("Tag", it->first.toString()).detail("Version", it->second);
ignoredPops.push_back(tLogPopCore(self, it->first, it->second, logData));
}
self->toBePopped.clear();
logData->toBePopped.clear();
wait(waitForAll(ignoredPops));
TraceEvent("ResetIgnorePopRequest")
.detail("Now", g_network->now())
@ -2382,7 +2383,7 @@ ACTOR Future<Void> tLogEnablePopReq(TLogEnablePopRequest enablePopReq, TLogData*
self->ignorePopRequest = false;
self->ignorePopDeadline = 0.0;
self->ignorePopUid = "";
for (it = self->toBePopped.begin(); it != self->toBePopped.end(); it++) {
for (it = logData->toBePopped.begin(); it != logData->toBePopped.end(); it++) {
TraceEvent("PlayIgnoredPop").detail("Tag", it->first.toString()).detail("Version", it->second);
ignoredPops.push_back(tLogPopCore(self, it->first, it->second, logData));
}
@ -2396,7 +2397,7 @@ ACTOR Future<Void> tLogEnablePopReq(TLogEnablePopRequest enablePopReq, TLogData*
.detail("QueueCommittedVersion", logData->queueCommittedVersion.get())
.detail("Version", logData->version.get());
wait(waitForAll(ignoredPops));
self->toBePopped.clear();
logData->toBePopped.clear();
enablePopReq.reply.send(Void());
return Void();
}

View File

@ -358,7 +358,6 @@ struct TLogData : NonCopyable {
FlowLock persistentDataCommitLock;
// Beginning of fields used by snapshot based backup and restore
bool ignorePopRequest; // ignore pop request from storage servers
double ignorePopDeadline; // time until which the ignorePopRequest will be
// honored
std::string ignorePopUid; // callers that set ignorePopRequest will set this
@ -366,8 +365,6 @@ struct TLogData : NonCopyable {
// the set and for callers that unset will
// be able to match it up
std::string dataFolder; // folder where data is stored
std::map<Tag, Version> toBePopped; // map of Tag->Version for all the pops
// that came when ignorePopRequest was set
Reference<AsyncVar<bool>> degraded;
// End of fields used by snapshot based backup and restore
@ -388,11 +385,10 @@ struct TLogData : NonCopyable {
instanceID(deterministicRandom()->randomUniqueID().first()), bytesInput(0), bytesDurable(0),
targetVolatileBytes(SERVER_KNOBS->TLOG_SPILL_THRESHOLD), overheadBytesInput(0), overheadBytesDurable(0),
peekMemoryLimiter(SERVER_KNOBS->TLOG_SPILL_REFERENCE_MAX_PEEK_MEMORY_BYTES),
concurrentLogRouterReads(SERVER_KNOBS->CONCURRENT_LOG_ROUTER_READS), ignorePopRequest(false),
dataFolder(folder), degraded(degraded),
commitLatencyDist(Histogram::getHistogram(LiteralStringRef("tLog"),
LiteralStringRef("commit"),
Histogram::Unit::microseconds)) {
concurrentLogRouterReads(SERVER_KNOBS->CONCURRENT_LOG_ROUTER_READS), ignorePopDeadline(0), dataFolder(folder),
degraded(degraded), commitLatencyDist(Histogram::getHistogram(LiteralStringRef("tLog"),
LiteralStringRef("commit"),
Histogram::Unit::microseconds)) {
cx = openDBOnServer(dbInfo, TaskPriority::DefaultEndpoint, LockAware::True);
}
};
@ -629,6 +625,9 @@ struct LogData : NonCopyable, public ReferenceCounted<LogData> {
bool execOpCommitInProgress;
int txsTags;
std::map<Tag, Version> toBePopped; // map of Tag->Version for all the pops
// that came when ignorePopRequest was set
explicit LogData(TLogData* tLogData,
TLogInterface interf,
Tag remoteTag,
@ -1234,14 +1233,17 @@ ACTOR Future<Void> processPopRequests(TLogData* self, Reference<LogData> logData
state std::map<Tag, Version>::const_iterator it;
state int ignoredPopsPlayed = 0;
state std::map<Tag, Version> toBePopped;
toBePopped = std::move(self->toBePopped);
self->toBePopped.clear();
self->ignorePopRequest = false;
self->ignorePopDeadline = 0.0;
while (now() < self->ignorePopDeadline) {
wait(delayUntil(self->ignorePopDeadline + 0.0001));
}
toBePopped = std::move(logData->toBePopped);
logData->toBePopped.clear();
self->ignorePopUid = "";
for (it = toBePopped.cbegin(); it != toBePopped.cend(); ++it) {
const auto& [tag, version] = *it;
TraceEvent("PlayIgnoredPop").detail("Tag", tag.toString()).detail("Version", version);
TraceEvent("PlayIgnoredPop", logData->logId).detail("Tag", tag.toString()).detail("Version", version);
ignoredPops.push_back(tLogPopCore(self, tag, version, logData));
if (++ignoredPopsPlayed % SERVER_KNOBS->TLOG_POP_BATCH_SIZE == 0) {
TEST(true); // Yielding while processing pop requests
@ -1249,20 +1251,22 @@ ACTOR Future<Void> processPopRequests(TLogData* self, Reference<LogData> logData
}
}
wait(waitForAll(ignoredPops));
TraceEvent("ResetIgnorePopRequest")
.detail("IgnorePopRequest", self->ignorePopRequest)
.detail("IgnorePopDeadline", self->ignorePopDeadline);
TraceEvent("ResetIgnorePopRequest", logData->logId).detail("IgnorePopDeadline", self->ignorePopDeadline);
return Void();
}
ACTOR Future<Void> tLogPop(TLogData* self, TLogPopRequest req, Reference<LogData> logData) {
if (self->ignorePopRequest) {
TraceEvent(SevDebug, "IgnoringPopRequest").detail("IgnorePopDeadline", self->ignorePopDeadline);
if (now() < self->ignorePopDeadline) {
TraceEvent(SevDebug, "IgnoringPopRequest", logData->logId).detail("IgnorePopDeadline", self->ignorePopDeadline);
auto& v = self->toBePopped[req.tag];
if (logData->toBePopped.empty()) {
logData->addActor.send(processPopRequests(self, logData));
}
auto& v = logData->toBePopped[req.tag];
v = std::max(v, req.to);
TraceEvent(SevDebug, "IgnoringPopRequest")
TraceEvent(SevDebug, "IgnoringPopRequest", logData->logId)
.detail("IgnorePopDeadline", self->ignorePopDeadline)
.detail("Tag", req.tag.toString())
.detail("Version", req.to);
@ -2571,15 +2575,15 @@ ACTOR Future<Void> tLogEnablePopReq(TLogEnablePopRequest enablePopReq, TLogData*
enablePopReq.reply.sendError(operation_failed());
return Void();
}
TraceEvent("EnableTLogPlayAllIgnoredPops2")
TraceEvent("EnableTLogPlayAllIgnoredPops2", logData->logId)
.detail("UidStr", enablePopReq.snapUID.toString())
.detail("IgnorePopUid", self->ignorePopUid)
.detail("IgnorePopRequest", self->ignorePopRequest)
.detail("IgnorePopDeadline", self->ignorePopDeadline)
.detail("PersistentDataVersion", logData->persistentDataVersion)
.detail("PersistentDataDurableVersion", logData->persistentDataDurableVersion)
.detail("QueueCommittedVersion", logData->queueCommittedVersion.get())
.detail("Version", logData->version.get());
self->ignorePopDeadline = 0;
wait(processPopRequests(self, logData));
enablePopReq.reply.send(Void());
return Void();
@ -2681,9 +2685,8 @@ ACTOR Future<Void> serveTLogInterface(TLogData* self,
req.reply.sendError(operation_failed());
} else {
// FIXME: As part of reverting snapshot V1, make ignorePopUid a UID instead of string
self->ignorePopRequest = true;
self->ignorePopUid = req.snapUID.toString();
self->ignorePopDeadline = g_network->now() + SERVER_KNOBS->TLOG_IGNORE_POP_AUTO_ENABLE_DELAY;
self->ignorePopDeadline = now() + SERVER_KNOBS->TLOG_IGNORE_POP_AUTO_ENABLE_DELAY;
req.reply.send(Void());
}
}
@ -2693,11 +2696,6 @@ ACTOR Future<Void> serveTLogInterface(TLogData* self,
when(TLogSnapRequest snapReq = waitNext(tli.snapRequest.getFuture())) {
logData->addActor.send(tLogSnapCreate(snapReq, self, logData));
}
when(wait(self->ignorePopRequest ? delayUntil(self->ignorePopDeadline) : Never())) {
TEST(true); // Hit ignorePopDeadline
TraceEvent("EnableTLogPlayAllIgnoredPops").detail("IgnoredPopDeadline", self->ignorePopDeadline);
logData->addActor.send(processPopRequests(self, logData));
}
}
}

View File

@ -275,8 +275,10 @@ ACTOR Future<Void> serveLiveCommittedVersion(Reference<MasterData> self) {
reply.locked = self->databaseLocked;
reply.metadataVersion = self->proxyMetadataVersion;
reply.minKnownCommittedVersion = self->minKnownCommittedVersion;
self->ssVersionVector.getDelta(req.maxVersion, reply.ssVersionVectorDelta);
self->versionVectorSizeOnCVReply.addMeasurement(reply.ssVersionVectorDelta.size());
if (SERVER_KNOBS->ENABLE_VERSION_VECTOR) {
self->ssVersionVector.getDelta(req.maxVersion, reply.ssVersionVectorDelta);
self->versionVectorSizeOnCVReply.addMeasurement(reply.ssVersionVectorDelta.size());
}
req.reply.send(reply);
}
when(ReportRawCommittedVersionRequest req =

View File

@ -806,7 +806,7 @@ public:
Promise<UID> clusterId;
// The version the cluster starts on. This value is not persisted and may
// not be valid after a recovery.
Version initialClusterVersion = invalidVersion;
Version initialClusterVersion = 1;
UID thisServerID;
Optional<UID> tssPairID; // if this server is a tss, this is the id of its (ss) pair
Optional<UID> ssPairID; // if this server is an ss, this is the id of its (tss) pair

View File

@ -747,6 +747,41 @@ ACTOR Future<Void> clearData(Database cx) {
wait(tr.onError(e));
}
}
tr = Transaction(cx);
loop {
try {
tr.setOption(FDBTransactionOptions::RAW_ACCESS);
state RangeResult rangeResult = wait(tr.getRange(normalKeys, 1));
state Optional<Key> tenantPrefix;
// If the result is non-empty, it is possible that there is some bad interaction between the test
// and the optional simulated default tenant:
//
// 1. If the test is creating/deleting tenants itself, then it should disable the default tenant.
// 2. If the test is opening Database objects itself, then it needs to propagate the default tenant
// value from the existing Database.
// 3. If the test is using raw access or system key access and writing to the normal key-space, then
// it should disable the default tenant.
if (!rangeResult.empty()) {
if (cx->defaultTenant.present()) {
TenantMapEntry entry = wait(ManagementAPI::getTenant(cx.getReference(), cx->defaultTenant.get()));
tenantPrefix = entry.prefix;
}
TraceEvent(SevError, "TesterClearFailure")
.detail("DefaultTenant", cx->defaultTenant)
.detail("TenantPrefix", tenantPrefix)
.detail("FirstKey", rangeResult[0].key);
ASSERT(false);
}
break;
} catch (Error& e) {
TraceEvent(SevWarn, "TesterCheckDatabaseClearedError").error(e);
wait(tr.onError(e));
}
}
return Void();
}
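
The three rules in the comment above correspond to the test configuration changes later in this commit: a simulation test that uses raw access or manages tenants itself opts out of the simulated default tenant in its TOML file, for example:

[configuration]
allowDefaultTenant = false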

View File

@ -143,7 +143,7 @@ ACTOR static Future<Void> extractClientInfo(Reference<AsyncVar<ServerDBInfo> con
loop {
ClientDBInfo ni = db->get().client;
shrinkProxyList(ni, lastCommitProxyUIDs, lastCommitProxies, lastGrvProxyUIDs, lastGrvProxies);
info->set(ni);
info->setUnconditional(ni);
wait(db->onChange());
}
}

View File

@ -106,7 +106,6 @@ struct DataLossRecoveryWorkload : TestWorkload {
loop {
try {
tr.setOption(FDBTransactionOptions::RAW_ACCESS);
state Optional<Value> res = wait(timeoutError(tr.get(key), 30.0));
const bool equal = !expectedValue.isError() && res == expectedValue.get();
if (!equal) {
@ -128,7 +127,6 @@ struct DataLossRecoveryWorkload : TestWorkload {
state Transaction tr(cx);
loop {
try {
tr.setOption(FDBTransactionOptions::RAW_ACCESS);
if (value.present()) {
tr.set(key, value.get());
} else {
@ -232,7 +230,6 @@ struct DataLossRecoveryWorkload : TestWorkload {
state Transaction validateTr(cx);
loop {
try {
validateTr.setOption(FDBTransactionOptions::RAW_ACCESS);
Standalone<VectorRef<const char*>> addresses = wait(validateTr.getAddressesForKey(keys.begin));
// The move function is not what we are testing here, crash the test if the move fails.
ASSERT(addresses.size() == 1);

View File

@ -121,6 +121,7 @@ struct EncryptionOpsWorkload : TestWorkload {
EncryptCipherDomainId maxDomainId;
EncryptCipherBaseKeyId minBaseCipherId;
EncryptCipherBaseKeyId headerBaseCipherId;
EncryptCipherRandomSalt headerRandomSalt;
EncryptionOpsWorkload(WorkloadContext const& wcx) : TestWorkload(wcx) {
mode = getOption(options, LiteralStringRef("fixedSize"), 1);
@ -134,6 +135,7 @@ struct EncryptionOpsWorkload : TestWorkload {
maxDomainId = deterministicRandom()->randomInt(minDomainId, minDomainId + 10) + 5;
minBaseCipherId = 100;
headerBaseCipherId = wcx.clientId * 100 + 1;
headerRandomSalt = wcx.clientId * 100 + 1;
metrics = std::make_unique<WorkloadMetrics>();
@ -183,7 +185,8 @@ struct EncryptionOpsWorkload : TestWorkload {
// insert the Encrypt Header cipherKey
generateRandomBaseCipher(AES_256_KEY_LENGTH, &buff[0], &cipherLen);
cipherKeyCache->insertCipherKey(ENCRYPT_HEADER_DOMAIN_ID, headerBaseCipherId, buff, cipherLen);
cipherKeyCache->insertCipherKey(
ENCRYPT_HEADER_DOMAIN_ID, headerBaseCipherId, buff, cipherLen, headerRandomSalt);
TraceEvent("SetupCipherEssentials_Done").detail("MinDomainId", minDomainId).detail("MaxDomainId", maxDomainId);
}
@ -209,6 +212,29 @@ struct EncryptionOpsWorkload : TestWorkload {
TraceEvent("UpdateBaseCipher").detail("DomainId", encryptDomainId).detail("BaseCipherId", *nextBaseCipherId);
}
Reference<BlobCipherKey> getEncryptionKey(const EncryptCipherDomainId& domainId,
const EncryptCipherBaseKeyId& baseCipherId,
const EncryptCipherRandomSalt& salt) {
const bool simCacheMiss = deterministicRandom()->randomInt(1, 100) < 15;
Reference<BlobCipherKeyCache> cipherKeyCache = BlobCipherKeyCache::getInstance();
Reference<BlobCipherKey> cipherKey = cipherKeyCache->getCipherKey(domainId, baseCipherId, salt);
if (simCacheMiss) {
TraceEvent("SimKeyCacheMiss").detail("EncyrptDomainId", domainId).detail("BaseCipherId", baseCipherId);
// simulate KeyCache miss that may happen during decryption; insert a CipherKey with known 'salt'
cipherKeyCache->insertCipherKey(domainId,
baseCipherId,
cipherKey->rawBaseCipher(),
cipherKey->getBaseCipherLen(),
cipherKey->getSalt());
// Ensure the update was a NOP
Reference<BlobCipherKey> cKey = cipherKeyCache->getCipherKey(domainId, baseCipherId, salt);
ASSERT(cKey->isEqual(cipherKey));
}
return cipherKey;
}
Reference<EncryptBuf> doEncryption(Reference<BlobCipherKey> textCipherKey,
Reference<BlobCipherKey> headerCipherKey,
uint8_t* payload,
@ -240,11 +266,12 @@ struct EncryptionOpsWorkload : TestWorkload {
ASSERT_EQ(header.flags.headerVersion, EncryptBlobCipherAes265Ctr::ENCRYPT_HEADER_VERSION);
ASSERT_EQ(header.flags.encryptMode, ENCRYPT_CIPHER_MODE_AES_256_CTR);
Reference<BlobCipherKeyCache> cipherKeyCache = BlobCipherKeyCache::getInstance();
Reference<BlobCipherKey> cipherKey = cipherKeyCache->getCipherKey(header.cipherTextDetails.encryptDomainId,
header.cipherTextDetails.baseCipherId);
Reference<BlobCipherKey> headerCipherKey = cipherKeyCache->getCipherKey(
header.cipherHeaderDetails.encryptDomainId, header.cipherHeaderDetails.baseCipherId);
Reference<BlobCipherKey> cipherKey = getEncryptionKey(header.cipherTextDetails.encryptDomainId,
header.cipherTextDetails.baseCipherId,
header.cipherTextDetails.salt);
Reference<BlobCipherKey> headerCipherKey = getEncryptionKey(header.cipherHeaderDetails.encryptDomainId,
header.cipherHeaderDetails.baseCipherId,
header.cipherHeaderDetails.salt);
ASSERT(cipherKey.isValid());
ASSERT(cipherKey->isEqual(orgCipherKey));
@ -297,7 +324,7 @@ struct EncryptionOpsWorkload : TestWorkload {
Reference<BlobCipherKey> cipherKey = cipherKeyCache->getLatestCipherKey(encryptDomainId);
// Each client working with their own version of encryptHeaderCipherKey, avoid using getLatest()
Reference<BlobCipherKey> headerCipherKey =
cipherKeyCache->getCipherKey(ENCRYPT_HEADER_DOMAIN_ID, headerBaseCipherId);
cipherKeyCache->getCipherKey(ENCRYPT_HEADER_DOMAIN_ID, headerBaseCipherId, headerRandomSalt);
auto end = std::chrono::high_resolution_clock::now();
metrics->updateKeyDerivationTime(std::chrono::duration<double, std::nano>(end - start).count());

View File

@ -160,7 +160,6 @@ struct SSCheckpointWorkload : TestWorkload {
loop {
try {
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
tr.setOption(FDBTransactionOptions::RAW_ACCESS);
state RangeResult res = wait(tr.getRange(KeyRangeRef(key, endKey), CLIENT_KNOBS->TOO_MANY));
break;
} catch (Error& e) {
@ -186,7 +185,6 @@ struct SSCheckpointWorkload : TestWorkload {
loop {
try {
tr.setOption(FDBTransactionOptions::RAW_ACCESS);
state Optional<Value> res = wait(timeoutError(tr.get(key), 30.0));
const bool equal = !expectedValue.isError() && res == expectedValue.get();
if (!equal) {
@ -209,7 +207,6 @@ struct SSCheckpointWorkload : TestWorkload {
state Version version;
loop {
try {
tr.setOption(FDBTransactionOptions::RAW_ACCESS);
if (value.present()) {
tr.set(key, value.get());
} else {

View File

@ -90,17 +90,19 @@ struct PrivateEndpoints : TestWorkload {
}
explicit PrivateEndpoints(WorkloadContext const& wcx) : TestWorkload(wcx) {
// The commented out request streams below can't be default initialized properly
// as they won't initialize all of their memory which causes valgrind to complain.
startAfter = getOption(options, "startAfter"_sr, 10.0);
runFor = getOption(options, "runFor"_sr, 10.0);
addTestFor(&GrvProxyInterface::waitFailure);
addTestFor(&GrvProxyInterface::getHealthMetrics);
addTestFor(&CommitProxyInterface::getStorageServerRejoinInfo);
// addTestFor(&CommitProxyInterface::getStorageServerRejoinInfo);
addTestFor(&CommitProxyInterface::waitFailure);
addTestFor(&CommitProxyInterface::txnState);
addTestFor(&CommitProxyInterface::getHealthMetrics);
addTestFor(&CommitProxyInterface::proxySnapReq);
// addTestFor(&CommitProxyInterface::txnState);
// addTestFor(&CommitProxyInterface::getHealthMetrics);
// addTestFor(&CommitProxyInterface::proxySnapReq);
addTestFor(&CommitProxyInterface::exclusionSafetyCheckReq);
addTestFor(&CommitProxyInterface::getDDMetrics);
// addTestFor(&CommitProxyInterface::getDDMetrics);
}
std::string description() const override { return WorkloadName; }
Future<Void> start(Database const& cx) override { return _start(this, cx); }

View File

@ -1217,92 +1217,6 @@ struct SpecialKeySpaceCorrectnessWorkload : TestWorkload {
wait(tx->onError(e));
}
}
{
state double r_sample_rate = deterministicRandom()->random01();
state int64_t r_size_limit = deterministicRandom()->randomInt64(1e3, 1e6);
// update the sample rate and size limit
loop {
try {
tx->setOption(FDBTransactionOptions::RAW_ACCESS);
tx->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
tx->set(LiteralStringRef("client_txn_sample_rate")
.withPrefix(SpecialKeySpace::getManagementApiCommandPrefix("profile")),
Value(boost::lexical_cast<std::string>(r_sample_rate)));
tx->set(LiteralStringRef("client_txn_size_limit")
.withPrefix(SpecialKeySpace::getManagementApiCommandPrefix("profile")),
Value(boost::lexical_cast<std::string>(r_size_limit)));
wait(tx->commit());
tx->reset();
break;
} catch (Error& e) {
wait(tx->onError(e));
}
}
// commit successfully, verify the system key changed
loop {
try {
tx->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
Optional<Value> sampleRate = wait(tx->get(fdbClientInfoTxnSampleRate));
ASSERT(sampleRate.present());
ASSERT(r_sample_rate == BinaryReader::fromStringRef<double>(sampleRate.get(), Unversioned()));
Optional<Value> sizeLimit = wait(tx->get(fdbClientInfoTxnSizeLimit));
ASSERT(sizeLimit.present());
ASSERT(r_size_limit == BinaryReader::fromStringRef<int64_t>(sizeLimit.get(), Unversioned()));
tx->reset();
break;
} catch (Error& e) {
wait(tx->onError(e));
}
}
// Change back to default
loop {
try {
tx->setOption(FDBTransactionOptions::RAW_ACCESS);
tx->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
tx->set(LiteralStringRef("client_txn_sample_rate")
.withPrefix(SpecialKeySpace::getManagementApiCommandPrefix("profile")),
LiteralStringRef("default"));
tx->set(LiteralStringRef("client_txn_size_limit")
.withPrefix(SpecialKeySpace::getManagementApiCommandPrefix("profile")),
LiteralStringRef("default"));
wait(tx->commit());
tx->reset();
break;
} catch (Error& e) {
wait(tx->onError(e));
}
}
// Test invalid values
loop {
try {
tx->setOption(FDBTransactionOptions::RAW_ACCESS);
tx->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
tx->set((deterministicRandom()->coinflip() ? LiteralStringRef("client_txn_sample_rate")
: LiteralStringRef("client_txn_size_limit"))
.withPrefix(SpecialKeySpace::getManagementApiCommandPrefix("profile")),
LiteralStringRef("invalid_value"));
wait(tx->commit());
ASSERT(false);
} catch (Error& e) {
if (e.code() == error_code_special_keys_api_failure) {
Optional<Value> errorMsg =
wait(tx->get(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::ERRORMSG).begin));
ASSERT(errorMsg.present());
std::string errorStr;
auto valueObj = readJSONStrictly(errorMsg.get().toString()).get_obj();
auto schema = readJSONStrictly(JSONSchemas::managementApiErrorSchema.toString()).get_obj();
// special_key_space_management_api_error_msg schema validation
ASSERT(schemaMatch(schema, valueObj, errorStr, SevError, true));
ASSERT(valueObj["command"].get_str() == "profile" && !valueObj["retriable"].get_bool());
tx->reset();
break;
} else {
wait(tx->onError(e));
}
wait(delay(FLOW_KNOBS->PREVENT_FAST_SPIN_DELAY));
}
}
}
// data_distribution & maintenance get
loop {
try {

View File

@ -19,6 +19,7 @@
*/
#include "fdbclient/ClusterConnectionMemoryRecord.h"
#include "fdbclient/ManagementAPI.actor.h"
#include "fdbclient/NativeAPI.actor.h"
#include "fdbserver/TesterInterface.actor.h"
#include "fdbclient/ReadYourWrites.h"
@ -113,7 +114,17 @@ struct WriteDuringReadWorkload : TestWorkload {
std::string description() const override { return "WriteDuringRead"; }
Future<Void> setup(Database const& cx) override { return Void(); }
ACTOR Future<Void> setupImpl(WriteDuringReadWorkload* self, Database cx) {
// If we are operating in the default tenant but raw access is enabled, we should only write keys
// in the tenant's key-space.
if (self->useSystemKeys && cx->defaultTenant.present() && self->keyPrefix < systemKeys.begin) {
TenantMapEntry entry = wait(ManagementAPI::getTenant(cx.getReference(), cx->defaultTenant.get()));
self->keyPrefix = entry.prefix.withSuffix(self->keyPrefix).toString();
}
return Void();
}
Future<Void> setup(Database const& cx) override { return setupImpl(this, cx); }
Future<Void> start(Database const& cx) override {
if (clientId == 0)
@ -694,7 +705,7 @@ struct WriteDuringReadWorkload : TestWorkload {
Key getKeyForIndex(int idx) {
idx += minNode;
if (adjacentKeys) {
return Key(idx ? keyPrefix + std::string(idx, '\x00') : "");
return Key(keyPrefix + (idx ? std::string(idx, '\x00') : ""));
} else {
return Key(keyPrefix + format("%010d", idx));
}

View File

@ -19,6 +19,7 @@
*/
#include "flow/BlobCipher.h"
#include "flow/EncryptUtils.h"
#include "flow/Knobs.h"
#include "flow/Error.h"
@ -32,6 +33,7 @@
#include <cstring>
#include <memory>
#include <string>
#include <utility>
#if ENCRYPTION_ENABLED
@ -54,12 +56,14 @@ BlobCipherKey::BlobCipherKey(const EncryptCipherDomainId& domainId,
salt = nondeterministicRandom()->randomUInt64();
}
initKey(domainId, baseCiph, baseCiphLen, baseCiphId, salt);
/*TraceEvent("BlobCipherKey")
.detail("DomainId", domainId)
.detail("BaseCipherId", baseCipherId)
.detail("BaseCipherLen", baseCipherLen)
.detail("RandomSalt", randomSalt)
.detail("CreationTime", creationTime);*/
}
BlobCipherKey::BlobCipherKey(const EncryptCipherDomainId& domainId,
const EncryptCipherBaseKeyId& baseCiphId,
const uint8_t* baseCiph,
int baseCiphLen,
const EncryptCipherRandomSalt& salt) {
initKey(domainId, baseCiph, baseCiphLen, baseCiphId, salt);
}
void BlobCipherKey::initKey(const EncryptCipherDomainId& domainId,
@ -82,6 +86,13 @@ void BlobCipherKey::initKey(const EncryptCipherDomainId& domainId,
applyHmacSha256Derivation();
// update the key creation time
creationTime = now();
TraceEvent("BlobCipherKey")
.detail("DomainId", domainId)
.detail("BaseCipherId", baseCipherId)
.detail("BaseCipherLen", baseCipherLen)
.detail("RandomSalt", randomSalt)
.detail("CreationTime", creationTime);
}
void BlobCipherKey::applyHmacSha256Derivation() {
@ -112,25 +123,77 @@ BlobCipherKeyIdCache::BlobCipherKeyIdCache(EncryptCipherDomainId dId)
TraceEvent("Init_BlobCipherKeyIdCache").detail("DomainId", domainId);
}
Reference<BlobCipherKey> BlobCipherKeyIdCache::getLatestCipherKey() {
return getCipherByBaseCipherId(latestBaseCipherKeyId);
BlobCipherKeyIdCacheKey BlobCipherKeyIdCache::getCacheKey(const EncryptCipherBaseKeyId& baseCipherKeyId,
const EncryptCipherRandomSalt& salt) {
return std::make_pair(baseCipherKeyId, salt);
}
Reference<BlobCipherKey> BlobCipherKeyIdCache::getCipherByBaseCipherId(EncryptCipherBaseKeyId baseCipherKeyId) {
BlobCipherKeyIdCacheMapCItr itr = keyIdCache.find(baseCipherKeyId);
Reference<BlobCipherKey> BlobCipherKeyIdCache::getLatestCipherKey() {
return getCipherByBaseCipherId(latestBaseCipherKeyId, latestRandomSalt);
}
Reference<BlobCipherKey> BlobCipherKeyIdCache::getCipherByBaseCipherId(const EncryptCipherBaseKeyId& baseCipherKeyId,
const EncryptCipherRandomSalt& salt) {
BlobCipherKeyIdCacheMapCItr itr = keyIdCache.find(getCacheKey(baseCipherKeyId, salt));
if (itr == keyIdCache.end()) {
TraceEvent("CipherByBaseCipherId_KeyMissing")
.detail("DomainId", domainId)
.detail("BaseCipherId", baseCipherKeyId)
.detail("Salt", salt);
throw encrypt_key_not_found();
}
return itr->second;
}
void BlobCipherKeyIdCache::insertBaseCipherKey(EncryptCipherBaseKeyId baseCipherId,
void BlobCipherKeyIdCache::insertBaseCipherKey(const EncryptCipherBaseKeyId& baseCipherId,
const uint8_t* baseCipher,
int baseCipherLen) {
ASSERT_GT(baseCipherId, ENCRYPT_INVALID_CIPHER_KEY_ID);
// BaseCipherKeys are immutable, given the routine invocation updates 'latestCipher',
// ensure no key-tampering is done
try {
Reference<BlobCipherKey> cipherKey = getLatestCipherKey();
if (cipherKey->getBaseCipherId() == baseCipherId) {
if (memcmp(cipherKey->rawBaseCipher(), baseCipher, baseCipherLen) == 0) {
TraceEvent("InsertBaseCipherKey_AlreadyPresent")
.detail("BaseCipherKeyId", baseCipherId)
.detail("DomainId", domainId);
// Key is already present; nothing more to do.
return;
} else {
TraceEvent("InsertBaseCipherKey_UpdateCipher")
.detail("BaseCipherKeyId", baseCipherId)
.detail("DomainId", domainId);
throw encrypt_update_cipher();
}
}
} catch (Error& e) {
if (e.code() != error_code_encrypt_key_not_found) {
throw e;
}
}
Reference<BlobCipherKey> cipherKey =
makeReference<BlobCipherKey>(domainId, baseCipherId, baseCipher, baseCipherLen);
BlobCipherKeyIdCacheKey cacheKey = getCacheKey(cipherKey->getBaseCipherId(), cipherKey->getSalt());
keyIdCache.emplace(cacheKey, cipherKey);
// Update the latest BaseCipherKeyId for the given encryption domain
latestBaseCipherKeyId = baseCipherId;
latestRandomSalt = cipherKey->getSalt();
}
void BlobCipherKeyIdCache::insertBaseCipherKey(const EncryptCipherBaseKeyId& baseCipherId,
const uint8_t* baseCipher,
int baseCipherLen,
const EncryptCipherRandomSalt& salt) {
ASSERT_GT(baseCipherId, ENCRYPT_INVALID_CIPHER_KEY_ID);
BlobCipherKeyIdCacheKey cacheKey = getCacheKey(baseCipherId, salt);
// BaseCipherKeys are immutable, ensure that cached value doesn't get updated.
BlobCipherKeyIdCacheMapCItr itr = keyIdCache.find(baseCipherId);
BlobCipherKeyIdCacheMapCItr itr = keyIdCache.find(cacheKey);
if (itr != keyIdCache.end()) {
if (memcmp(itr->second->rawBaseCipher(), baseCipher, baseCipherLen) == 0) {
TraceEvent("InsertBaseCipherKey_AlreadyPresent")
@ -146,9 +209,9 @@ void BlobCipherKeyIdCache::insertBaseCipherKey(EncryptCipherBaseKeyId baseCipher
}
}
keyIdCache.emplace(baseCipherId, makeReference<BlobCipherKey>(domainId, baseCipherId, baseCipher, baseCipherLen));
// Update the latest BaseCipherKeyId for the given encryption domain
latestBaseCipherKeyId = baseCipherId;
Reference<BlobCipherKey> cipherKey =
makeReference<BlobCipherKey>(domainId, baseCipherId, baseCipher, baseCipherLen, salt);
keyIdCache.emplace(cacheKey, cipherKey);
}
void BlobCipherKeyIdCache::cleanup() {
@ -197,6 +260,41 @@ void BlobCipherKeyCache::insertCipherKey(const EncryptCipherDomainId& domainId,
}
}
void BlobCipherKeyCache::insertCipherKey(const EncryptCipherDomainId& domainId,
const EncryptCipherBaseKeyId& baseCipherId,
const uint8_t* baseCipher,
int baseCipherLen,
const EncryptCipherRandomSalt& salt) {
if (domainId == ENCRYPT_INVALID_DOMAIN_ID || baseCipherId == ENCRYPT_INVALID_CIPHER_KEY_ID) {
throw encrypt_invalid_id();
}
try {
auto domainItr = domainCacheMap.find(domainId);
if (domainItr == domainCacheMap.end()) {
// Add mapping to track new encryption domain
Reference<BlobCipherKeyIdCache> keyIdCache = makeReference<BlobCipherKeyIdCache>(domainId);
keyIdCache->insertBaseCipherKey(baseCipherId, baseCipher, baseCipherLen, salt);
domainCacheMap.emplace(domainId, keyIdCache);
} else {
// Track new baseCipher keys
Reference<BlobCipherKeyIdCache> keyIdCache = domainItr->second;
keyIdCache->insertBaseCipherKey(baseCipherId, baseCipher, baseCipherLen, salt);
}
TraceEvent("InsertCipherKey")
.detail("DomainId", domainId)
.detail("BaseCipherKeyId", baseCipherId)
.detail("Salt", salt);
} catch (Error& e) {
TraceEvent("InsertCipherKey_Failed")
.detail("BaseCipherKeyId", baseCipherId)
.detail("DomainId", domainId)
.detail("Salt", salt);
throw;
}
}
Reference<BlobCipherKey> BlobCipherKeyCache::getLatestCipherKey(const EncryptCipherDomainId& domainId) {
auto domainItr = domainCacheMap.find(domainId);
if (domainItr == domainCacheMap.end()) {
@ -217,17 +315,19 @@ Reference<BlobCipherKey> BlobCipherKeyCache::getLatestCipherKey(const EncryptCip
}
Reference<BlobCipherKey> BlobCipherKeyCache::getCipherKey(const EncryptCipherDomainId& domainId,
const EncryptCipherBaseKeyId& baseCipherId) {
const EncryptCipherBaseKeyId& baseCipherId,
const EncryptCipherRandomSalt& salt) {
auto domainItr = domainCacheMap.find(domainId);
if (domainItr == domainCacheMap.end()) {
TraceEvent("GetCipherKey_MissingDomainId").detail("DomainId", domainId);
throw encrypt_key_not_found();
}
Reference<BlobCipherKeyIdCache> keyIdCache = domainItr->second;
return keyIdCache->getCipherByBaseCipherId(baseCipherId);
return keyIdCache->getCipherByBaseCipherId(baseCipherId, salt);
}
void BlobCipherKeyCache::resetEncyrptDomainId(const EncryptCipherDomainId domainId) {
void BlobCipherKeyCache::resetEncryptDomainId(const EncryptCipherDomainId domainId) {
auto domainItr = domainCacheMap.find(domainId);
if (domainItr == domainCacheMap.end()) {
throw encrypt_key_not_found();
@ -291,8 +391,8 @@ Reference<EncryptBuf> EncryptBlobCipherAes265Ctr::encrypt(const uint8_t* plainte
memset(reinterpret_cast<uint8_t*>(header), 0, sizeof(BlobCipherEncryptHeader));
// Alloc buffer computation accounts for 'header authentication' generation scheme. If single-auth-token needs to be
// generated, allocate buffer sufficient to append header to the cipherText optimizing memcpy cost.
// Alloc buffer computation accounts for 'header authentication' generation scheme. If single-auth-token needs
// to be generated, allocate buffer sufficient to append header to the cipherText optimizing memcpy cost.
const int allocSize = authTokenMode == ENCRYPT_HEADER_AUTH_TOKEN_MODE_SINGLE
? plaintextLen + AES_BLOCK_SIZE + sizeof(BlobCipherEncryptHeader)
@ -340,6 +440,7 @@ Reference<EncryptBuf> EncryptBlobCipherAes265Ctr::encrypt(const uint8_t* plainte
// Populate header encryption-key details
header->cipherHeaderDetails.encryptDomainId = headerCipherKey->getDomainId();
header->cipherHeaderDetails.baseCipherId = headerCipherKey->getBaseCipherId();
header->cipherHeaderDetails.salt = headerCipherKey->getSalt();
// Populate header authToken details
if (header->flags.authTokenMode == ENCRYPT_HEADER_AUTH_TOKEN_MODE_SINGLE) {
@ -624,8 +725,8 @@ void forceLinkBlobCipherTests() {}
// 3. Inserting of 'identical' cipherKey (already cached) more than once works as desired.
// 4. Inserting of 'non-identical' cipherKey (already cached) more than once works as desired.
// 5. Validation encryption ops (correctness):
// 5.1. Encyrpt a buffer followed by decryption of the buffer, validate the contents.
// 5.2. Simulate anomalies such as: EncyrptionHeader corruption, authToken mismatch / encryptionMode mismatch etc.
// 5.1. Encrypt a buffer followed by decryption of the buffer, validate the contents.
// 5.2. Simulate anomalies such as: EncryptionHeader corruption, authToken mismatch / encryptionMode mismatch etc.
// 6. Cache cleanup
// 6.1 cleanup cipherKeys by given encryptDomainId
// 6.2. Cleanup all cached cipherKeys
@ -639,6 +740,7 @@ TEST_CASE("flow/BlobCipher") {
int len;
EncryptCipherBaseKeyId keyId;
std::unique_ptr<uint8_t[]> key;
EncryptCipherRandomSalt generatedSalt;
BaseCipher(const EncryptCipherDomainId& dId, const EncryptCipherBaseKeyId& kId)
: domainId(dId), len(deterministicRandom()->randomInt(AES_256_KEY_LENGTH / 2, AES_256_KEY_LENGTH + 1)),
@ -671,6 +773,8 @@ TEST_CASE("flow/BlobCipher") {
cipherKeyCache->insertCipherKey(
baseCipher->domainId, baseCipher->keyId, baseCipher->key.get(), baseCipher->len);
Reference<BlobCipherKey> fetchedKey = cipherKeyCache->getLatestCipherKey(baseCipher->domainId);
baseCipher->generatedSalt = fetchedKey->getSalt();
}
}
// insert EncryptHeader BlobCipher key
@ -684,7 +788,8 @@ TEST_CASE("flow/BlobCipher") {
for (auto& domainItr : domainKeyMap) {
for (auto& baseKeyItr : domainItr.second) {
Reference<BaseCipher> baseCipher = baseKeyItr.second;
Reference<BlobCipherKey> cipherKey = cipherKeyCache->getCipherKey(baseCipher->domainId, baseCipher->keyId);
Reference<BlobCipherKey> cipherKey =
cipherKeyCache->getCipherKey(baseCipher->domainId, baseCipher->keyId, baseCipher->generatedSalt);
ASSERT(cipherKey.isValid());
// validate common cipher properties - domainId, baseCipherId, baseCipherLen, rawBaseCipher
ASSERT_EQ(cipherKey->getBaseCipherId(), baseCipher->keyId);
@ -759,7 +864,8 @@ TEST_CASE("flow/BlobCipher") {
.detail("BaseCipherId", header.cipherTextDetails.baseCipherId);
Reference<BlobCipherKey> tCipherKeyKey = cipherKeyCache->getCipherKey(header.cipherTextDetails.encryptDomainId,
header.cipherTextDetails.baseCipherId);
header.cipherTextDetails.baseCipherId,
header.cipherTextDetails.salt);
ASSERT(tCipherKeyKey->isEqual(cipherKey));
DecryptBlobCipherAes256Ctr decryptor(
tCipherKeyKey, Reference<BlobCipherKey>(), &header.cipherTextDetails.iv[0]);
@ -846,9 +952,11 @@ TEST_CASE("flow/BlobCipher") {
StringRef(arena, &header.singleAuthToken.authToken[0], AUTH_TOKEN_SIZE).toString());
Reference<BlobCipherKey> tCipherKeyKey = cipherKeyCache->getCipherKey(header.cipherTextDetails.encryptDomainId,
header.cipherTextDetails.baseCipherId);
header.cipherTextDetails.baseCipherId,
header.cipherTextDetails.salt);
Reference<BlobCipherKey> hCipherKey = cipherKeyCache->getCipherKey(header.cipherHeaderDetails.encryptDomainId,
header.cipherHeaderDetails.baseCipherId);
header.cipherHeaderDetails.baseCipherId,
header.cipherHeaderDetails.salt);
ASSERT(tCipherKeyKey->isEqual(cipherKey));
DecryptBlobCipherAes256Ctr decryptor(tCipherKeyKey, hCipherKey, &header.cipherTextDetails.iv[0]);
Reference<EncryptBuf> decrypted = decryptor.decrypt(encrypted->begin(), bufLen, header, arena);
@ -949,9 +1057,11 @@ TEST_CASE("flow/BlobCipher") {
StringRef(arena, &header.singleAuthToken.authToken[0], AUTH_TOKEN_SIZE).toString());
Reference<BlobCipherKey> tCipherKey = cipherKeyCache->getCipherKey(header.cipherTextDetails.encryptDomainId,
header.cipherTextDetails.baseCipherId);
header.cipherTextDetails.baseCipherId,
header.cipherTextDetails.salt);
Reference<BlobCipherKey> hCipherKey = cipherKeyCache->getCipherKey(header.cipherHeaderDetails.encryptDomainId,
header.cipherHeaderDetails.baseCipherId);
header.cipherHeaderDetails.baseCipherId,
header.cipherHeaderDetails.salt);
ASSERT(tCipherKey->isEqual(cipherKey));
DecryptBlobCipherAes256Ctr decryptor(tCipherKey, hCipherKey, &header.cipherTextDetails.iv[0]);
@ -1047,7 +1157,7 @@ TEST_CASE("flow/BlobCipher") {
// Validate dropping encryptDomainId cached keys
const EncryptCipherDomainId candidate = deterministicRandom()->randomInt(minDomainId, maxDomainId);
cipherKeyCache->resetEncyrptDomainId(candidate);
cipherKeyCache->resetEncryptDomainId(candidate);
std::vector<Reference<BlobCipherKey>> cachedKeys = cipherKeyCache->getAllCiphers(candidate);
ASSERT(cachedKeys.empty());

View File

@ -82,11 +82,11 @@ private:
// This header is persisted along with the encrypted buffer; it contains information necessary
// to assist decrypting the buffers to serve read requests.
//
// The total space overhead is 96 bytes.
// The total space overhead is 104 bytes.
#pragma pack(push, 1) // exact fit - no padding
typedef struct BlobCipherEncryptHeader {
static constexpr int headerSize = 96;
static constexpr int headerSize = 104;
union {
struct {
uint8_t size; // reading first byte is sufficient to determine header
@ -101,7 +101,7 @@ typedef struct BlobCipherEncryptHeader {
// Cipher text encryption information
struct {
// Encyrption domain boundary identifier.
// Encryption domain boundary identifier.
EncryptCipherDomainId encryptDomainId{};
// BaseCipher encryption key identifier
EncryptCipherBaseKeyId baseCipherId{};
@ -116,6 +116,8 @@ typedef struct BlobCipherEncryptHeader {
EncryptCipherDomainId encryptDomainId{};
// BaseCipher encryption key identifier.
EncryptCipherBaseKeyId baseCipherId{};
// Random salt
EncryptCipherRandomSalt salt{};
} cipherHeaderDetails;
// Encryption header is stored as plaintext on a persistent storage to assist reconstruction of cipher-key(s) for
@ -164,6 +166,11 @@ public:
const EncryptCipherBaseKeyId& baseCiphId,
const uint8_t* baseCiph,
int baseCiphLen);
BlobCipherKey(const EncryptCipherDomainId& domainId,
const EncryptCipherBaseKeyId& baseCiphId,
const uint8_t* baseCiph,
int baseCiphLen,
const EncryptCipherRandomSalt& salt);
uint8_t* data() const { return cipher.get(); }
uint64_t getCreationTime() const { return creationTime; }
@ -206,7 +213,7 @@ private:
// This interface allows FDB processes participating in encryption to store and
// index recently used encryption cipher keys. FDB encryption has two dimensions:
// 1. Mapping on cipher encryption keys per "encryption domains"
// 2. Per encryption domain, the cipher keys are indexed using "baseCipherKeyId".
// 2. Per encryption domain, the cipher keys are indexed using {baseCipherKeyId, salt} tuple.
//
// The design supports the NIST recommendation of limiting the lifetime of an
// encryption key. For details refer to:
@ -214,10 +221,10 @@ private:
//
// Below gives a pictorial representation of the in-memory data structure implemented
// to index encryption keys:
// { encryptionDomain -> { baseCipherId -> cipherKey } }
// { encryptionDomain -> { {baseCipherId, salt} -> cipherKey } }
//
// Supported cache lookup schemes:
// 1. Lookup cipher based on { encryptionDomainId, baseCipherKeyId } tuple.
// 1. Lookup cipher based on { encryptionDomainId, baseCipherKeyId, salt } triplet.
// 2. Lookup latest cipher key for a given encryptionDomainId.
//
// Client is responsible to handle cache-miss usecase, the corrective operation
@ -226,15 +233,29 @@ private:
// required encryption key, however, CPs/SSs cache-miss would result in RPC to
// EncryptKeyServer to refresh the desired encryption key.
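// A minimal usage sketch of this miss-handling contract (illustrative only;
// 'domainId', 'baseCipherId' and 'salt' stand for values recovered from a
// persisted BlobCipherEncryptHeader, and 'baseCipher'/'baseCipherLen' for key
// material re-fetched from the key-management solution):
//
//   Reference<BlobCipherKeyCache> cache = BlobCipherKeyCache::getInstance();
//   try {
//       Reference<BlobCipherKey> key = cache->getCipherKey(domainId, baseCipherId, salt);
//   } catch (Error& e) {
//       if (e.code() != error_code_encrypt_key_not_found) {
//           throw;
//       }
//       cache->insertCipherKey(domainId, baseCipherId, baseCipher, baseCipherLen, salt);
//   }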
using BlobCipherKeyIdCacheMap = std::unordered_map<EncryptCipherBaseKeyId, Reference<BlobCipherKey>>;
struct pair_hash {
template <class T1, class T2>
std::size_t operator()(const std::pair<T1, T2>& pair) const {
auto hash1 = std::hash<T1>{}(pair.first);
auto hash2 = std::hash<T2>{}(pair.second);
// Equal hashes XOR would be ZERO.
return hash1 == hash2 ? hash1 : hash1 ^ hash2;
}
};
using BlobCipherKeyIdCacheKey = std::pair<EncryptCipherBaseKeyId, EncryptCipherRandomSalt>;
using BlobCipherKeyIdCacheMap = std::unordered_map<BlobCipherKeyIdCacheKey, Reference<BlobCipherKey>, pair_hash>;
using BlobCipherKeyIdCacheMapCItr =
std::unordered_map<EncryptCipherBaseKeyId, Reference<BlobCipherKey>>::const_iterator;
std::unordered_map<BlobCipherKeyIdCacheKey, Reference<BlobCipherKey>, pair_hash>::const_iterator;
struct BlobCipherKeyIdCache : ReferenceCounted<BlobCipherKeyIdCache> {
public:
BlobCipherKeyIdCache();
explicit BlobCipherKeyIdCache(EncryptCipherDomainId dId);
BlobCipherKeyIdCacheKey getCacheKey(const EncryptCipherBaseKeyId& baseCipherId,
const EncryptCipherRandomSalt& salt);
// API returns the last inserted cipherKey.
// If none exists, 'encrypt_key_not_found' is thrown.
@ -243,14 +264,33 @@ public:
// API returns cipherKey corresponding to input 'baseCipherKeyId'.
// If none exists, 'encrypt_key_not_found' is thrown.
Reference<BlobCipherKey> getCipherByBaseCipherId(EncryptCipherBaseKeyId baseCipherKeyId);
Reference<BlobCipherKey> getCipherByBaseCipherId(const EncryptCipherBaseKeyId& baseCipherKeyId,
const EncryptCipherRandomSalt& salt);
// API enables inserting base encryption cipher details to the BlobCipherKeyIdCache.
// Given cipherKeys are immutable, attempting to re-insert same 'identical' cipherKey
// is treated as a NOP (success), however, an attempt to update cipherKey would throw
// 'encrypt_update_cipher' exception.
//
// API NOTE: The recommended usecase is when the encryption cipher-key is updated by the external
// keyManagementSolution to limit an encryption key's lifetime.
void insertBaseCipherKey(EncryptCipherBaseKeyId baseCipherId, const uint8_t* baseCipher, int baseCipherLen);
void insertBaseCipherKey(const EncryptCipherBaseKeyId& baseCipherId, const uint8_t* baseCipher, int baseCipherLen);
// API enables inserting base encryption cipher details to the BlobCipherKeyIdCache
// Given cipherKeys are immutable, attempting to re-insert same 'identical' cipherKey
// is treated as a NOP (success), however, an attempt to update cipherKey would throw
// 'encrypt_update_cipher' exception.
//
// API NOTE: The recommended usecase is encryption cipher-key regeneration while performing
// decryption. The encryption header would contain the relevant details including: 'encryptDomainId',
// 'baseCipherId' & 'salt'. The caller needs to fetch the 'baseCipherKey' details and re-populate the KeyCache.
// Also, the invocation will NOT update the latest cipher-key details.
void insertBaseCipherKey(const EncryptCipherBaseKeyId& baseCipherId,
const uint8_t* baseCipher,
int baseCipherLen,
const EncryptCipherRandomSalt& salt);
// API cleanup the cache by dropping all cached cipherKeys
void cleanup();
@ -262,6 +302,7 @@ private:
EncryptCipherDomainId domainId;
BlobCipherKeyIdCacheMap keyIdCache;
EncryptCipherBaseKeyId latestBaseCipherKeyId;
EncryptCipherRandomSalt latestRandomSalt;
};
using BlobCipherDomainCacheMap = std::unordered_map<EncryptCipherDomainId, Reference<BlobCipherKeyIdCache>>;
@ -277,12 +318,32 @@ public:
// The cipherKeys are indexed using 'baseCipherId', given cipherKeys are immutable,
// attempting to re-insert same 'identical' cipherKey is treated as a NOP (success),
// however, an attempt to update cipherKey would throw 'encrypt_update_cipher' exception.
//
// API NOTE: The recommended usecase is when the encryption cipher-key is updated by the external
// keyManagementSolution to limit an encryption key's lifetime.
void insertCipherKey(const EncryptCipherDomainId& domainId,
const EncryptCipherBaseKeyId& baseCipherId,
const uint8_t* baseCipher,
int baseCipherLen);
// API returns the last insert cipherKey for a given encyryption domain Id.
// Enable clients to insert base encryption cipher details to the BlobCipherKeyCache.
// The cipherKeys are indexed using 'baseCipherId', given cipherKeys are immutable,
// attempting to re-insert same 'identical' cipherKey is treated as a NOP (success),
// however, an attempt to update cipherKey would throw 'encrypt_update_cipher' exception.
//
// API NOTE: The recommended usecase is encryption cipher-key regeneration while performing
// decryption. The encryption header would contain the relevant details including: 'encryptDomainId',
// 'baseCipherId' & 'salt'. The caller needs to fetch the 'baseCipherKey' details and re-populate the KeyCache.
// Also, the invocation will NOT update the latest cipher-key details.
void insertCipherKey(const EncryptCipherDomainId& domainId,
const EncryptCipherBaseKeyId& baseCipherId,
const uint8_t* baseCipher,
int baseCipherLen,
const EncryptCipherRandomSalt& salt);
// API returns the last inserted cipherKey for a given encryption domain Id.
// If none exists, it would throw 'encrypt_key_not_found' exception.
Reference<BlobCipherKey> getLatestCipherKey(const EncryptCipherDomainId& domainId);
@ -291,14 +352,16 @@ public:
// If none exists, it would throw 'encrypt_key_not_found' exception.
Reference<BlobCipherKey> getCipherKey(const EncryptCipherDomainId& domainId,
const EncryptCipherBaseKeyId& baseCipherId);
const EncryptCipherBaseKeyId& baseCipherId,
const EncryptCipherRandomSalt& salt);
// API returns point in time list of all 'cached' cipherKeys for a given encryption domainId.
std::vector<Reference<BlobCipherKey>> getAllCiphers(const EncryptCipherDomainId& domainId);
// API enables dropping all 'cached' cipherKeys for a given encryption domain Id.
// Useful to cleanup cache if an encryption domain gets removed/destroyed etc.
void resetEncyrptDomainId(const EncryptCipherDomainId domainId);
void resetEncryptDomainId(const EncryptCipherDomainId domainId);
static Reference<BlobCipherKeyCache> getInstance() {
if (g_network->isSimulated()) {
@ -364,7 +427,7 @@ public:
const BlobCipherEncryptHeader& header,
Arena&);
// Enable caller to validate encryption header auth-token (if available) without needing to read the full encyrpted
// Enable caller to validate encryption header auth-token (if available) without needing to read the full encrypted
// payload. The call is NOP unless header.flags.authTokenMode == ENCRYPT_HEADER_AUTH_TOKEN_MODE_MULTI.
void verifyHeaderAuthToken(const BlobCipherEncryptHeader& header, Arena& arena);

View File

@ -519,7 +519,7 @@ void FastAllocator<Size>::getMagazine() {
--g_allocation_tracing_disabled;
}
#endif
block = (void**)::allocate(magazine_size * Size, false);
block = (void**)::allocate(magazine_size * Size, /*allowLargePages*/ false, /*includeGuardPages*/ true);
#endif
// void** block = new void*[ magazine_size * PSize ];

View File

@ -2037,7 +2037,22 @@ static void enableLargePages() {
#endif
}
static void* allocateInternal(size_t length, bool largePages) {
#ifndef _WIN32
static void* mmapInternal(size_t length, int flags, bool guardPages) {
if (guardPages) {
constexpr size_t pageSize = 4096;
length += 2 * pageSize; // Map enough for the guard pages
void* resultWithGuardPages = mmap(nullptr, length, PROT_READ | PROT_WRITE, flags, -1, 0);
mprotect(resultWithGuardPages, pageSize, PROT_NONE); // left guard page
mprotect((void*)(uintptr_t(resultWithGuardPages) + length - pageSize), pageSize, PROT_NONE); // right guard page
return (void*)(uintptr_t(resultWithGuardPages) + pageSize);
} else {
return mmap(nullptr, length, PROT_READ | PROT_WRITE, flags, -1, 0);
}
}
#endif
static void* allocateInternal(size_t length, bool largePages, bool guardPages) {
#ifdef _WIN32
DWORD allocType = MEM_COMMIT | MEM_RESERVE;
@ -2052,31 +2067,31 @@ static void* allocateInternal(size_t length, bool largePages) {
if (largePages)
flags |= MAP_HUGETLB;
return mmap(nullptr, length, PROT_READ | PROT_WRITE, flags, -1, 0);
return mmapInternal(length, flags, guardPages);
#elif defined(__APPLE__) || defined(__FreeBSD__)
int flags = MAP_PRIVATE | MAP_ANON;
return mmap(nullptr, length, PROT_READ | PROT_WRITE, flags, -1, 0);
return mmapInternal(length, flags, guardPages);
#else
#error Port me!
#endif
}
static bool largeBlockFail = false;
void* allocate(size_t length, bool allowLargePages) {
void* allocate(size_t length, bool allowLargePages, bool includeGuardPages) {
if (allowLargePages)
enableLargePages();
void* block = ALLOC_FAIL;
if (allowLargePages && !largeBlockFail) {
block = allocateInternal(length, true);
block = allocateInternal(length, true, includeGuardPages);
if (block == ALLOC_FAIL)
largeBlockFail = true;
}
if (block == ALLOC_FAIL)
block = allocateInternal(length, false);
block = allocateInternal(length, false, includeGuardPages);
// FIXME: SevWarnAlways trace if "close" to out of memory
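// Illustrative effect of the guard pages (assuming the requested length is a
// multiple of the 4096-byte page size used by mmapInternal above): the byte
// immediately past the usable region lands on a PROT_NONE guard page and
// faults instead of silently corrupting a neighboring allocation.
//
//   char* p = (char*)allocate(4096, /*allowLargePages*/ false, /*includeGuardPages*/ true);
//   p[4096] = 1; // hits the right guard page -> SIGSEGV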

View File

@ -284,7 +284,7 @@ std::string epochsToGMTString(double epochs);
void setMemoryQuota(size_t limit);
void* allocate(size_t length, bool allowLargePages);
void* allocate(size_t length, bool allowLargePages, bool includeGuardPages);
void setAffinity(int proc);

View File

@ -1,23 +1,16 @@
#!/usr/bin/env bash
set -Eeuo pipefail
set -Eeuxo pipefail
namespace=$(cat /var/run/secrets/kubernetes.io/serviceaccount/namespace)
POD_NUM=$(echo $POD_NAME | cut -d - -f3)
KEY="ycsb_load_${POD_NUM}_of_${NUM_PODS}_complete"
CLI=$(ls /var/dynamic-conf/bin/*/fdbcli | head -n1)
if [ ${MODE} != "load" ]; then
echo "WAITING FOR ALL PODS TO COME UP"
while [[ $(kubectl get pods -n ${namespace} -l name=ycsb,run=${RUN_ID} --field-selector=status.phase=Running | grep -cv NAME) -lt ${NUM_PODS} ]]; do
sleep 0.1
done
echo "ALL PODS ARE UP"
else
if ${CLI} --exec "get ${KEY}" | grep is ;
then
# load already completed
exit 0
fi
fi;
echo "WAITING FOR ALL PODS TO COME UP"
while [[ $(kubectl get pods -n ${namespace} -l name=ycsb,run=${RUN_ID} --field-selector=status.phase=Running | grep -cv NAME) -lt ${NUM_PODS} ]]; do
sleep 1
done
echo "ALL PODS ARE UP"
echo "RUNNING YCSB"
./bin/ycsb.sh ${MODE} foundationdb -s -P workloads/${WORKLOAD} ${YCSB_ARGS}
@ -27,7 +20,3 @@ echo "COPYING HISTOGRAMS TO S3"
aws s3 sync --sse aws:kms --exclude "*" --include "histogram.*" /tmp s3://${BUCKET}/ycsb_histograms/${namespace}/${POD_NAME}
echo "COPYING HISTOGRAMS TO S3 FINISHED"
if [ ${MODE} == "load" ]; then
${CLI} --exec "writemode on; set ${KEY} 1"
echo "WROTE LOAD COMPLETION KEY"
fi

View File

@ -19,7 +19,7 @@ from local_cluster import LocalCluster, random_secret_string
SUPPORTED_PLATFORMS = ["x86_64"]
SUPPORTED_VERSIONS = ["7.2.0", "7.1.0", "7.0.0", "6.3.24", "6.3.23",
SUPPORTED_VERSIONS = ["7.2.0", "7.1.1", "7.1.0", "7.0.0", "6.3.24", "6.3.23",
"6.3.22", "6.3.18", "6.3.17", "6.3.16", "6.3.15", "6.3.13", "6.3.12", "6.3.9", "6.2.30",
"6.2.29", "6.2.28", "6.2.27", "6.2.26", "6.2.25", "6.2.24", "6.2.23", "6.2.22", "6.2.21",
"6.2.20", "6.2.19", "6.2.18", "6.2.17", "6.2.16", "6.2.15", "6.2.10", "6.1.13", "6.1.12",
@ -353,6 +353,17 @@ class UpgradeTest:
test_retcode = self.tester_retcode
return test_retcode
def grep_logs_for_events(self, severity):
return (
subprocess.getoutput(
"grep -r 'Severity=\"{}\"' {}".format(
severity,
self.cluster.log.as_posix())
)
.rstrip()
.splitlines()
)
# Check the cluster log for errors
def check_cluster_logs(self, error_limit=100):
sev40s = (
@ -380,9 +391,28 @@ class UpgradeTest:
print(
">>>>>>>>>>>>>>>>>>>> Found {} severity 40 events - the test fails", err_cnt)
else:
print("No error found in logs")
print("No errors found in logs")
return err_cnt == 0
# Check the server and client logs for warnings and dump them
def dump_warnings_in_logs(self, limit=100):
sev30s = (
subprocess.getoutput(
"grep -r 'Severity=\"30\"' {}".format(
self.cluster.log.as_posix())
)
.rstrip()
.splitlines()
)
if (len(sev30s) == 0):
print("No warnings found in logs")
else:
print(">>>>>>>>>>>>>>>>>>>> Found {} severity 30 events (warnings):".format(
len(sev30s)))
for line in sev30s[:limit]:
print(line)
# Dump the last cluster configuration and cluster logs
def dump_cluster_logs(self):
for etc_file in glob.glob(os.path.join(self.cluster.etc, "*")):
@ -457,6 +487,7 @@ if __name__ == "__main__":
errcode = test.exec_test(args)
if not test.check_cluster_logs():
errcode = 1 if errcode == 0 else errcode
test.dump_warnings_in_logs()
if errcode != 0 and not args.disable_log_dump:
test.dump_cluster_logs()

View File

@ -5,6 +5,7 @@ storageEngineType = 0
processesPerMachine = 2
coordinators = 3
machineCount = 45
allowDefaultTenant = false
[[test]]
testTitle = 'DataLossRecovery'

View File

@ -4,6 +4,7 @@ storageEngineType = 4
processesPerMachine = 1
coordinators = 3
machineCount = 15
allowDefaultTenant = false
[[test]]
testTitle = 'PhysicalShardMove'

View File

@ -1,3 +1,6 @@
[configuration]
allowDefaultTenant = false
[[test]]
testTitle = 'PreLoad'

View File

@ -1,4 +1,5 @@
[configuration]
allowDefaultTenant = false
allowDisablingTenants = false
[[test]]

View File

@ -1,4 +1,5 @@
[configuration]
allowDefaultTenant = false
allowDisablingTenants = false
[[test]]