merge upstream/main

Xiaoxi Wang 2022-08-15 13:30:38 -07:00
commit c9f94264f0
33 changed files with 729 additions and 311 deletions


@ -302,6 +302,7 @@ namespace SummarizeTest
uniqueFileSet.Add(file.Substring(0, file.LastIndexOf("-"))); // all restarting tests end with -1.txt or -2.txt
}
uniqueFiles = uniqueFileSet.ToArray();
Array.Sort(uniqueFiles);
testFile = random.Choice(uniqueFiles);
// The on-disk format changed in 4.0.0, and 5.x can't load files from 3.x.
string oldBinaryVersionLowerBound = "4.0.0";
@ -334,8 +335,9 @@ namespace SummarizeTest
// thus, by definition, if "until_" appears, we do not want to run with the current binary version
oldBinaries = oldBinaries.Concat(currentBinary);
}
List<string> oldBinariesList = oldBinaries.ToList<string>();
if (oldBinariesList.Count == 0) {
string[] oldBinariesList = oldBinaries.ToArray<string>();
Array.Sort(oldBinariesList);
if (oldBinariesList.Count() == 0) {
// In theory, restarting tests are named such that at least one old binary version is available to run
// But if none of the provided old binaries fall in the range, we just skip the test
Console.WriteLine("No available old binary version from {0} to {1}", oldBinaryVersionLowerBound, oldBinaryVersionUpperBound);
@ -347,6 +349,7 @@ namespace SummarizeTest
else
{
uniqueFiles = Directory.GetFiles(testDir);
Array.Sort(uniqueFiles);
testFile = random.Choice(uniqueFiles);
}
}
@ -718,7 +721,7 @@ namespace SummarizeTest
process.Refresh();
if (process.HasExited)
return;
long mem = process.PrivateMemorySize64;
long mem = process.PagedMemorySize64;
MaxMem = Math.Max(MaxMem, mem);
//Console.WriteLine(string.Format("Process used {0} bytes", MaxMem));
Thread.Sleep(1000);


@ -22,6 +22,8 @@ Each special key that existed before api version 630 is its own module. These ar
#. ``\xff\xff/cluster_file_path`` - See :ref:`cluster file client access <cluster-file-client-access>`
#. ``\xff\xff/status/json`` - See :doc:`Machine-readable status <mr-status>`
#. ``\xff\xff/worker_interfaces`` - key as the worker's network address and value as the serialized ClientWorkerInterface, not transactional
Prior to api version 630, it was also possible to read a range starting at ``\xff\xff/worker_interfaces``. This is mostly an implementation detail of fdbcli,
but it's available in api version 630 as a module with prefix ``\xff\xff/worker_interfaces/``.
@ -210,6 +212,7 @@ that process, and wait for necessary data to be moved away.
#. ``\xff\xff/management/options/failed_locality/force`` Read/write. Setting this key disables safety checks for writes to ``\xff\xff/management/failed_locality/<locality>``. Setting this key only has an effect in the current transaction and is not persisted on commit.
#. ``\xff\xff/management/tenant/map/<tenant>`` Read/write. Setting a key in this range to any value will result in a tenant being created with name ``<tenant>``. Clearing a key in this range will delete the tenant with name ``<tenant>``. Reading all or a portion of this range will return the list of tenants currently present in the cluster, excluding any changes in this transaction. Values read in this range will be JSON objects containing the metadata for the associated tenants.
#. ``\xff\xff/management/tenant/rename/<tenant>`` Read/write. Setting a key in this range to an unused tenant name will result in the tenant with the name ``<tenant>`` to be renamed to the value provided. If the rename operation is a transaction retried in a loop, it is possible for the rename to be applied twice, in which case ``tenant_not_found`` or ``tenant_already_exists`` errors may be returned. This can be avoided by checking for the tenant's existence first.
#. ``\xff\xff/management/options/worker_interfaces/verify`` Read/write. Setting this key adds a verification phase when reading ``\xff\xff/worker_interfaces``. Setting this key only has an effect in the current transaction and is not persisted on commit. With the option set, the client tries to establish a connection to every worker in the list returned by the Cluster Controller and only returns the workers it can connect to. This option is currently only used by the fdbcli commands ``kill``, ``suspend`` and ``expensive_data_check`` to populate the worker list.
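As a rough illustration only (a sketch using the C++ client-internal calls that appear later in this commit; the transaction ``tr`` and the surrounding actor context are assumed), the option is set and consumed like this::

   // Ask the special key space to verify connectivity before returning workers.
   tr->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
   tr->set("\xff\xff/management/options/worker_interfaces/verify"_sr, ValueRef());
   // Read the worker interfaces module; with the option set, only reachable workers are returned.
   state ThreadFuture<RangeResult> kvsFuture = tr->getRange(
       KeyRangeRef("\xff\xff/worker_interfaces/"_sr, "\xff\xff/worker_interfaces0"_sr), CLIENT_KNOBS->TOO_MANY);
   RangeResult workers = wait(safeThreadFutureToFuture(kvsFuture));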
An exclusion is syntactically either an ip address (e.g. ``127.0.0.1``), or
an ip address and port (e.g. ``127.0.0.1:4500``) or any locality (e.g ``locality_dcid:primary-satellite`` or


@ -46,7 +46,7 @@ ACTOR Future<bool> expensiveDataCheckCommandActor(
if (tokens.size() == 1) {
// initialize worker interfaces
address_interface->clear();
wait(getWorkerInterfaces(tr, address_interface));
wait(getWorkerInterfaces(tr, address_interface, true));
}
if (tokens.size() == 1 || tokencmp(tokens[1], "list")) {
if (address_interface->size() == 0) {


@ -44,7 +44,7 @@ ACTOR Future<bool> killCommandActor(Reference<IDatabase> db,
if (tokens.size() == 1) {
// initialize worker interfaces
address_interface->clear();
wait(getWorkerInterfaces(tr, address_interface));
wait(getWorkerInterfaces(tr, address_interface, true));
}
if (tokens.size() == 1 || tokencmp(tokens[1], "list")) {
if (address_interface->size() == 0) {


@ -43,7 +43,7 @@ ACTOR Future<bool> suspendCommandActor(Reference<IDatabase> db,
if (tokens.size() == 1) {
// initialize worker interfaces
address_interface->clear();
wait(getWorkerInterfaces(tr, address_interface));
wait(getWorkerInterfaces(tr, address_interface, true));
if (address_interface->size() == 0) {
printf("\nNo addresses can be suspended.\n");
} else if (address_interface->size() == 1) {


@ -62,56 +62,52 @@ ACTOR Future<std::string> getSpecialKeysFailureErrorMessage(Reference<ITransacti
return valueObj["message"].get_str();
}
ACTOR Future<Void> verifyAndAddInterface(std::map<Key, std::pair<Value, ClientLeaderRegInterface>>* address_interface,
Reference<FlowLock> connectLock,
KeyValue kv) {
wait(connectLock->take());
state FlowLock::Releaser releaser(*connectLock);
state ClientWorkerInterface workerInterf;
try {
// the interface is backward compatible; if parsing fails, the CLI version is too old and needs to be upgraded
workerInterf = BinaryReader::fromStringRef<ClientWorkerInterface>(kv.value, IncludeVersion());
} catch (Error& e) {
fprintf(stderr, "Error: %s; CLI version is too old, please update to use a newer version\n", e.what());
return Void();
}
state ClientLeaderRegInterface leaderInterf(workerInterf.address());
choose {
when(Optional<LeaderInfo> rep =
wait(brokenPromiseToNever(leaderInterf.getLeader.getReply(GetLeaderRequest())))) {
StringRef ip_port =
(kv.key.endsWith(LiteralStringRef(":tls")) ? kv.key.removeSuffix(LiteralStringRef(":tls")) : kv.key)
.removePrefix(LiteralStringRef("\xff\xff/worker_interfaces/"));
(*address_interface)[ip_port] = std::make_pair(kv.value, leaderInterf);
if (workerInterf.reboot.getEndpoint().addresses.secondaryAddress.present()) {
Key full_ip_port2 =
StringRef(workerInterf.reboot.getEndpoint().addresses.secondaryAddress.get().toString());
StringRef ip_port2 = full_ip_port2.endsWith(LiteralStringRef(":tls"))
? full_ip_port2.removeSuffix(LiteralStringRef(":tls"))
: full_ip_port2;
(*address_interface)[ip_port2] = std::make_pair(kv.value, leaderInterf);
}
void addInterfacesFromKVs(RangeResult& kvs,
std::map<Key, std::pair<Value, ClientLeaderRegInterface>>* address_interface) {
for (const auto& kv : kvs) {
ClientWorkerInterface workerInterf;
try {
// the interface is backward compatible; if parsing fails, the CLI version is too old and needs to be upgraded
workerInterf = BinaryReader::fromStringRef<ClientWorkerInterface>(kv.value, IncludeVersion());
} catch (Error& e) {
fprintf(stderr, "Error: %s; CLI version is too old, please update to use a newer version\n", e.what());
return;
}
ClientLeaderRegInterface leaderInterf(workerInterf.address());
StringRef ip_port =
(kv.key.endsWith(LiteralStringRef(":tls")) ? kv.key.removeSuffix(LiteralStringRef(":tls")) : kv.key)
.removePrefix(LiteralStringRef("\xff\xff/worker_interfaces/"));
(*address_interface)[ip_port] = std::make_pair(kv.value, leaderInterf);
if (workerInterf.reboot.getEndpoint().addresses.secondaryAddress.present()) {
Key full_ip_port2 =
StringRef(workerInterf.reboot.getEndpoint().addresses.secondaryAddress.get().toString());
StringRef ip_port2 = full_ip_port2.endsWith(LiteralStringRef(":tls"))
? full_ip_port2.removeSuffix(LiteralStringRef(":tls"))
: full_ip_port2;
(*address_interface)[ip_port2] = std::make_pair(kv.value, leaderInterf);
}
when(wait(delay(CLIENT_KNOBS->CLI_CONNECT_TIMEOUT))) {}
}
return Void();
}
ACTOR Future<Void> getWorkerInterfaces(Reference<ITransaction> tr,
std::map<Key, std::pair<Value, ClientLeaderRegInterface>>* address_interface) {
std::map<Key, std::pair<Value, ClientLeaderRegInterface>>* address_interface,
bool verify) {
if (verify) {
tr->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
tr->set(workerInterfacesVerifyOptionSpecialKey, ValueRef());
}
// Hold the reference to the standalone's memory
state ThreadFuture<RangeResult> kvsFuture = tr->getRange(
KeyRangeRef(LiteralStringRef("\xff\xff/worker_interfaces/"), LiteralStringRef("\xff\xff/worker_interfaces0")),
CLIENT_KNOBS->TOO_MANY);
RangeResult kvs = wait(safeThreadFutureToFuture(kvsFuture));
state RangeResult kvs = wait(safeThreadFutureToFuture(kvsFuture));
ASSERT(!kvs.more);
auto connectLock = makeReference<FlowLock>(CLIENT_KNOBS->CLI_CONNECT_PARALLELISM);
std::vector<Future<Void>> addInterfs;
for (auto it : kvs) {
addInterfs.push_back(verifyAndAddInterface(address_interface, connectLock, it));
if (verify) {
// remove the option if set
tr->clear(workerInterfacesVerifyOptionSpecialKey);
}
wait(waitForAll(addInterfs));
addInterfacesFromKVs(kvs, address_interface);
return Void();
}


@ -1050,36 +1050,6 @@ Future<T> stopNetworkAfter(Future<T> what) {
}
}
ACTOR Future<Void> addInterface(std::map<Key, std::pair<Value, ClientLeaderRegInterface>>* address_interface,
Reference<FlowLock> connectLock,
KeyValue kv) {
wait(connectLock->take());
state FlowLock::Releaser releaser(*connectLock);
state ClientWorkerInterface workerInterf =
BinaryReader::fromStringRef<ClientWorkerInterface>(kv.value, IncludeVersion());
state ClientLeaderRegInterface leaderInterf(workerInterf.address());
choose {
when(Optional<LeaderInfo> rep =
wait(brokenPromiseToNever(leaderInterf.getLeader.getReply(GetLeaderRequest())))) {
StringRef ip_port =
(kv.key.endsWith(LiteralStringRef(":tls")) ? kv.key.removeSuffix(LiteralStringRef(":tls")) : kv.key)
.removePrefix(LiteralStringRef("\xff\xff/worker_interfaces/"));
(*address_interface)[ip_port] = std::make_pair(kv.value, leaderInterf);
if (workerInterf.reboot.getEndpoint().addresses.secondaryAddress.present()) {
Key full_ip_port2 =
StringRef(workerInterf.reboot.getEndpoint().addresses.secondaryAddress.get().toString());
StringRef ip_port2 = full_ip_port2.endsWith(LiteralStringRef(":tls"))
? full_ip_port2.removeSuffix(LiteralStringRef(":tls"))
: full_ip_port2;
(*address_interface)[ip_port2] = std::make_pair(kv.value, leaderInterf);
}
}
when(wait(delay(CLIENT_KNOBS->CLI_CONNECT_TIMEOUT))) {}
}
return Void();
}
ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
state LineNoise& linenoise = *plinenoise;
state bool intrans = false;


@ -120,6 +120,7 @@ extern const KeyRangeRef processClassSourceSpecialKeyRange;
extern const KeyRangeRef processClassTypeSpecialKeyRange;
// Other special keys
inline const KeyRef errorMsgSpecialKey = LiteralStringRef("\xff\xff/error_message");
inline const KeyRef workerInterfacesVerifyOptionSpecialKey = "\xff\xff/management/options/worker_interfaces/verify"_sr;
// help functions (Copied from fdbcli.actor.cpp)
// get all workers' info
@ -132,13 +133,14 @@ void printUsage(StringRef command);
// Pre: tr failed with special_keys_api_failure error
// Read the error message special key and return the message
ACTOR Future<std::string> getSpecialKeysFailureErrorMessage(Reference<ITransaction> tr);
// Using \xff\xff/worker_interfaces/ special key, get all worker interfaces
// Using \xff\xff/worker_interfaces/ special key, get all worker interfaces.
// A worker list will be returned from CC.
// If verify is set, we will try to establish connections to all workers returned.
// In particular, it will deserialize \xff\xff/worker_interfaces/<address>:=<ClientInterface> kv pairs and issue RPC
// calls, then only return interfaces (kv pairs) the client can talk to
ACTOR Future<Void> getWorkerInterfaces(Reference<ITransaction> tr,
std::map<Key, std::pair<Value, ClientLeaderRegInterface>>* address_interface);
// Deserialize \xff\xff/worker_interfaces/<address>:=<ClientInterface> k-v pair and verify by a RPC call
ACTOR Future<Void> verifyAndAddInterface(std::map<Key, std::pair<Value, ClientLeaderRegInterface>>* address_interface,
Reference<FlowLock> connectLock,
KeyValue kv);
std::map<Key, std::pair<Value, ClientLeaderRegInterface>>* address_interface,
bool verify = false);
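For reference, the command actors changed earlier in this commit (kill, suspend, expensive_data_check) call this helper with verification enabled. A minimal sketch of that call pattern, mirroring those call sites rather than defining a new interface:

// Populate the cached worker list, keeping only the workers the client can actually reach.
if (tokens.size() == 1) {
    address_interface->clear();
    wait(getWorkerInterfaces(tr, address_interface, true)); // verify = true
}
// Callers that only need to deserialize the interfaces can rely on the default verify = false.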
// print cluster status info
void printStatus(StatusObjectReader statusObj,
StatusClient::StatusLevel level,


@ -40,6 +40,7 @@
#include <cstring>
#include <fstream> // for perf microbenchmark
#include <limits>
#include <vector>
#define BG_READ_DEBUG false
@ -209,16 +210,21 @@ namespace {
BlobGranuleFileEncryptionKeys getEncryptBlobCipherKey(const BlobGranuleCipherKeysCtx cipherKeysCtx) {
BlobGranuleFileEncryptionKeys eKeys;
// The reconstructed cipher key is 'never' inserted into the BlobCipherKey cache, hence choose 'neverExpire'
eKeys.textCipherKey = makeReference<BlobCipherKey>(cipherKeysCtx.textCipherKey.encryptDomainId,
cipherKeysCtx.textCipherKey.baseCipherId,
cipherKeysCtx.textCipherKey.baseCipher.begin(),
cipherKeysCtx.textCipherKey.baseCipher.size(),
cipherKeysCtx.textCipherKey.salt);
cipherKeysCtx.textCipherKey.salt,
std::numeric_limits<int64_t>::max(),
std::numeric_limits<int64_t>::max());
eKeys.headerCipherKey = makeReference<BlobCipherKey>(cipherKeysCtx.headerCipherKey.encryptDomainId,
cipherKeysCtx.headerCipherKey.baseCipherId,
cipherKeysCtx.headerCipherKey.baseCipher.begin(),
cipherKeysCtx.headerCipherKey.baseCipher.size(),
cipherKeysCtx.headerCipherKey.salt);
cipherKeysCtx.headerCipherKey.salt,
std::numeric_limits<int64_t>::max(),
std::numeric_limits<int64_t>::max());
return eKeys;
}


@ -60,6 +60,7 @@ void ClientKnobs::initialize(Randomize randomize) {
init( WRONG_SHARD_SERVER_DELAY, .01 ); if( randomize && BUGGIFY ) WRONG_SHARD_SERVER_DELAY = deterministicRandom()->random01(); // FLOW_KNOBS->PREVENT_FAST_SPIN_DELAY; // SOMEDAY: This delay can limit performance of retrieving data when the cache is mostly wrong (e.g. dumping the database after a test)
init( FUTURE_VERSION_RETRY_DELAY, .01 ); if( randomize && BUGGIFY ) FUTURE_VERSION_RETRY_DELAY = deterministicRandom()->random01();// FLOW_KNOBS->PREVENT_FAST_SPIN_DELAY;
init( GRV_ERROR_RETRY_DELAY, 5.0 ); if( randomize && BUGGIFY ) GRV_ERROR_RETRY_DELAY = 0.01 + 5 * deterministicRandom()->random01();
init( UNKNOWN_TENANT_RETRY_DELAY, 0.0 ); if( randomize && BUGGIFY ) UNKNOWN_TENANT_RETRY_DELAY = deterministicRandom()->random01();
init( REPLY_BYTE_LIMIT, 80000 );
init( DEFAULT_BACKOFF, .01 ); if( randomize && BUGGIFY ) DEFAULT_BACKOFF = deterministicRandom()->random01();


@ -1279,32 +1279,6 @@ void DatabaseContext::registerSpecialKeysImpl(SpecialKeySpace::MODULE module,
ACTOR Future<RangeResult> getWorkerInterfaces(Reference<IClusterConnectionRecord> clusterRecord);
ACTOR Future<Optional<Value>> getJSON(Database db);
struct WorkerInterfacesSpecialKeyImpl : SpecialKeyRangeReadImpl {
Future<RangeResult> getRange(ReadYourWritesTransaction* ryw,
KeyRangeRef kr,
GetRangeLimits limitsHint) const override {
if (ryw->getDatabase().getPtr() && ryw->getDatabase()->getConnectionRecord()) {
Key prefix = Key(getKeyRange().begin);
return map(getWorkerInterfaces(ryw->getDatabase()->getConnectionRecord()),
[prefix = prefix, kr = KeyRange(kr)](const RangeResult& in) {
RangeResult result;
for (const auto& [k_, v] : in) {
auto k = k_.withPrefix(prefix);
if (kr.contains(k))
result.push_back_deep(result.arena(), KeyValueRef(k, v));
}
std::sort(result.begin(), result.end(), KeyValueRef::OrderByKey{});
return result;
});
} else {
return RangeResult();
}
}
explicit WorkerInterfacesSpecialKeyImpl(KeyRangeRef kr) : SpecialKeyRangeReadImpl(kr) {}
};
struct SingleSpecialKeyImpl : SpecialKeyRangeReadImpl {
Future<RangeResult> getRange(ReadYourWritesTransaction* ryw,
KeyRangeRef kr,
@ -3535,8 +3509,8 @@ ACTOR Future<Key> getKey(Reference<TransactionState> trState,
ACTOR Future<Version> waitForCommittedVersion(Database cx, Version version, SpanContext spanContext) {
state Span span("NAPI:waitForCommittedVersion"_loc, spanContext);
try {
loop {
loop {
try {
choose {
when(wait(cx->onProxiesChanged())) {}
when(GetReadVersionReply v = wait(basicLoadBalance(
@ -3562,10 +3536,16 @@ ACTOR Future<Version> waitForCommittedVersion(Database cx, Version version, Span
wait(delay(CLIENT_KNOBS->FUTURE_VERSION_RETRY_DELAY, cx->taskID));
}
}
} catch (Error& e) {
if (e.code() == error_code_batch_transaction_throttled ||
e.code() == error_code_grv_proxy_memory_limit_exceeded) {
// GRV Proxy returns an error
wait(delayJittered(CLIENT_KNOBS->GRV_ERROR_RETRY_DELAY));
} else {
TraceEvent(SevError, "WaitForCommittedVersionError").error(e);
throw;
}
}
} catch (Error& e) {
TraceEvent(SevError, "WaitForCommittedVersionError").error(e);
throw;
}
}
@ -6774,9 +6754,12 @@ ACTOR Future<GetReadVersionReply> getConsistentReadVersion(SpanContext parentSpa
}
}
} catch (Error& e) {
if (e.code() != error_code_broken_promise && e.code() != error_code_batch_transaction_throttled)
if (e.code() != error_code_broken_promise && e.code() != error_code_batch_transaction_throttled &&
e.code() != error_code_grv_proxy_memory_limit_exceeded)
TraceEvent(SevError, "GetConsistentReadVersionError").error(e);
if (e.code() == error_code_batch_transaction_throttled && !cx->apiVersionAtLeast(630)) {
if ((e.code() == error_code_batch_transaction_throttled ||
e.code() == error_code_grv_proxy_memory_limit_exceeded) &&
!cx->apiVersionAtLeast(630)) {
wait(delayJittered(5.0));
} else {
throw;


@ -133,7 +133,8 @@ std::unordered_map<std::string, KeyRange> SpecialKeySpace::actorLineageApiComman
std::set<std::string> SpecialKeySpace::options = { "excluded/force",
"failed/force",
"excluded_locality/force",
"failed_locality/force" };
"failed_locality/force",
"worker_interfaces/verify" };
std::set<std::string> SpecialKeySpace::tracingOptions = { kTracingTransactionIdKey, kTracingTokenKey };
@ -2754,6 +2755,64 @@ Future<Optional<std::string>> FailedLocalitiesRangeImpl::commit(ReadYourWritesTr
return excludeLocalityCommitActor(ryw, true);
}
// Defined in ReadYourWrites.actor.cpp
ACTOR Future<RangeResult> getWorkerInterfaces(Reference<IClusterConnectionRecord> clusterRecord);
// Defined in NativeAPI.actor.cpp
ACTOR Future<bool> verifyInterfaceActor(Reference<FlowLock> connectLock, ClientWorkerInterface workerInterf);
ACTOR static Future<RangeResult> workerInterfacesImplGetRangeActor(ReadYourWritesTransaction* ryw,
KeyRef prefix,
KeyRangeRef kr) {
if (!ryw->getDatabase().getPtr() || !ryw->getDatabase()->getConnectionRecord())
return RangeResult();
state RangeResult interfs = wait(getWorkerInterfaces(ryw->getDatabase()->getConnectionRecord()));
// for options' special keys, the boolean flag indicates if it's a SET operation
auto [verify, _] = ryw->getSpecialKeySpaceWriteMap()[SpecialKeySpace::getManagementApiCommandOptionSpecialKey(
"worker_interfaces", "verify")];
state RangeResult result;
if (verify) {
// if the verify option is set, we try to talk to every worker and only return those we can talk to
Reference<FlowLock> connectLock(new FlowLock(CLIENT_KNOBS->CLI_CONNECT_PARALLELISM));
state std::vector<Future<bool>> verifyInterfs;
for (const auto& [k_, value] : interfs) {
auto k = k_.withPrefix(prefix);
if (kr.contains(k)) {
ClientWorkerInterface workerInterf =
BinaryReader::fromStringRef<ClientWorkerInterface>(value, IncludeVersion());
verifyInterfs.push_back(verifyInterfaceActor(connectLock, workerInterf));
} else {
verifyInterfs.push_back(false);
}
}
wait(waitForAll(verifyInterfs));
// state int index;
for (int index = 0; index < interfs.size(); index++) {
if (verifyInterfs[index].get()) {
// if we can establish a connection, add the kv pair into the result
result.push_back_deep(result.arena(),
KeyValueRef(interfs[index].key.withPrefix(prefix), interfs[index].value));
}
}
} else {
for (const auto& [k_, v] : interfs) {
auto k = k_.withPrefix(prefix);
if (kr.contains(k))
result.push_back_deep(result.arena(), KeyValueRef(k, v));
}
}
std::sort(result.begin(), result.end(), KeyValueRef::OrderByKey{});
return result;
}
WorkerInterfacesSpecialKeyImpl::WorkerInterfacesSpecialKeyImpl(KeyRangeRef kr) : SpecialKeyRangeReadImpl(kr) {}
Future<RangeResult> WorkerInterfacesSpecialKeyImpl::getRange(ReadYourWritesTransaction* ryw,
KeyRangeRef kr,
GetRangeLimits limitsHint) const {
return workerInterfacesImplGetRangeActor(ryw, getKeyRange().begin, kr);
}
ACTOR Future<Void> validateSpecialSubrangeRead(ReadYourWritesTransaction* ryw,
KeySelector begin,
KeySelector end,


@ -57,6 +57,7 @@ public:
double WRONG_SHARD_SERVER_DELAY; // SOMEDAY: This delay can limit performance of retrieving data when the cache is
// mostly wrong (e.g. dumping the database after a test)
double FUTURE_VERSION_RETRY_DELAY;
double GRV_ERROR_RETRY_DELAY;
double UNKNOWN_TENANT_RETRY_DELAY;
int REPLY_BYTE_LIMIT;
double DEFAULT_BACKOFF;


@ -548,6 +548,15 @@ public:
Future<Optional<std::string>> commit(ReadYourWritesTransaction* ryw) override;
};
class WorkerInterfacesSpecialKeyImpl : public SpecialKeyRangeReadImpl {
public:
explicit WorkerInterfacesSpecialKeyImpl(KeyRangeRef kr);
Future<RangeResult> getRange(ReadYourWritesTransaction* ryw,
KeyRangeRef kr,
GetRangeLimits limitsHint) const override;
};
// If the underlying set of key-value pairs of a key space is not changing, then we expect repeating a read to give the
// same result. Additionally, we can generate the expected result of any read if that read is reading a subrange. This
// actor performs a read of an arbitrary subrange of [begin, end) and validates the results.


@ -453,20 +453,30 @@ struct BackupData {
ACTOR static Future<Version> _getMinKnownCommittedVersion(BackupData* self) {
state Span span("BA:GetMinCommittedVersion"_loc);
loop {
GetReadVersionRequest request(span.context,
0,
TransactionPriority::DEFAULT,
invalidVersion,
GetReadVersionRequest::FLAG_USE_MIN_KNOWN_COMMITTED_VERSION);
choose {
when(wait(self->cx->onProxiesChanged())) {}
when(GetReadVersionReply reply =
wait(basicLoadBalance(self->cx->getGrvProxies(UseProvisionalProxies::False),
&GrvProxyInterface::getConsistentReadVersion,
request,
self->cx->taskID))) {
self->cx->ssVersionVectorCache.applyDelta(reply.ssVersionVectorDelta);
return reply.version;
try {
GetReadVersionRequest request(span.context,
0,
TransactionPriority::DEFAULT,
invalidVersion,
GetReadVersionRequest::FLAG_USE_MIN_KNOWN_COMMITTED_VERSION);
choose {
when(wait(self->cx->onProxiesChanged())) {}
when(GetReadVersionReply reply =
wait(basicLoadBalance(self->cx->getGrvProxies(UseProvisionalProxies::False),
&GrvProxyInterface::getConsistentReadVersion,
request,
self->cx->taskID))) {
self->cx->ssVersionVectorCache.applyDelta(reply.ssVersionVectorDelta);
return reply.version;
}
}
} catch (Error& e) {
if (e.code() == error_code_batch_transaction_throttled ||
e.code() == error_code_grv_proxy_memory_limit_exceeded) {
// GRV Proxy returns an error
wait(delayJittered(CLIENT_KNOBS->GRV_ERROR_RETRY_DELAY));
} else {
throw;
}
}
}


@ -395,6 +395,21 @@ class DDTxnProcessorImpl {
}
}
}
ACTOR static Future<Void> pollMoveKeysLock(Database cx, MoveKeysLock lock, const DDEnabledState* ddEnabledState) {
loop {
wait(delay(SERVER_KNOBS->MOVEKEYS_LOCK_POLLING_DELAY));
state Transaction tr(cx);
loop {
try {
wait(checkMoveKeysLockReadOnly(&tr, lock, ddEnabledState));
break;
} catch (Error& e) {
wait(tr.onError(e));
}
}
}
}
};
Future<IDDTxnProcessor::SourceServers> DDTxnProcessor::getSourceServersForRange(const KeyRangeRef range) {
@ -432,6 +447,10 @@ Future<Void> DDTxnProcessor::waitForDataDistributionEnabled(const DDEnabledState
return DDTxnProcessorImpl::waitForDataDistributionEnabled(cx, ddEnabledState);
}
Future<Void> DDTxnProcessor::pollMoveKeysLock(MoveKeysLock lock, const DDEnabledState* ddEnabledState) const {
return DDTxnProcessorImpl::pollMoveKeysLock(cx, lock, ddEnabledState);
}
Future<std::vector<std::pair<StorageServerInterface, ProcessClass>>>
DDMockTxnProcessor::getServerListAndProcessClasses() {
std::vector<std::pair<StorageServerInterface, ProcessClass>> res;


@ -284,21 +284,6 @@ static std::set<int> const& normalDDQueueErrors() {
return s;
}
ACTOR Future<Void> pollMoveKeysLock(Database cx, MoveKeysLock lock, const DDEnabledState* ddEnabledState) {
loop {
wait(delay(SERVER_KNOBS->MOVEKEYS_LOCK_POLLING_DELAY));
state Transaction tr(cx);
loop {
try {
wait(checkMoveKeysLockReadOnly(&tr, lock, ddEnabledState));
break;
} catch (Error& e) {
wait(tr.onError(e));
}
}
}
}
struct DataDistributor : NonCopyable, ReferenceCounted<DataDistributor> {
public:
Reference<AsyncVar<ServerDBInfo> const> dbInfo;
@ -543,6 +528,10 @@ public:
Future<Void> shardsReady = resumeFromShards(Reference<DataDistributor>::addRef(this), g_network->isSimulated());
return resumeFromDataMoves(Reference<DataDistributor>::addRef(this), shardsReady);
}
Future<Void> pollMoveKeysLock(const DDEnabledState* ddEnabledState) {
return txnProcessor->pollMoveKeysLock(lock, ddEnabledState);
}
};
// Runs the data distribution algorithm for FDB, including the DD Queue, DD tracker, and DD team collection
@ -618,7 +607,7 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributor> self,
ddTenantCache->monitorTenantMap(), "DDTenantCacheMonitor", self->ddId, &normalDDQueueErrors()));
}
actors.push_back(pollMoveKeysLock(cx, self->lock, ddEnabledState));
actors.push_back(self->pollMoveKeysLock(ddEnabledState));
actors.push_back(reportErrorsExcept(dataDistributionTracker(self->initData,
cx,
self->relocationProducer,


@ -18,7 +18,9 @@
* limitations under the License.
*/
#include "fdbserver/EncryptKeyProxyInterface.h"
#include "fdbserver/GetEncryptCipherKeys.h"
#include "flow/IRandom.h"
#include <boost/functional/hash.hpp>
@ -105,8 +107,12 @@ ACTOR Future<std::unordered_map<EncryptCipherDomainId, Reference<BlobCipherKey>>
for (const EKPBaseCipherDetails& details : reply.baseCipherDetails) {
EncryptCipherDomainId domainId = details.encryptDomainId;
if (domains.count(domainId) > 0 && cipherKeys.count(domainId) == 0) {
Reference<BlobCipherKey> cipherKey = cipherKeyCache->insertCipherKey(
domainId, details.baseCipherId, details.baseCipherKey.begin(), details.baseCipherKey.size());
Reference<BlobCipherKey> cipherKey = cipherKeyCache->insertCipherKey(domainId,
details.baseCipherId,
details.baseCipherKey.begin(),
details.baseCipherKey.size(),
details.refreshAt,
details.expireAt);
ASSERT(cipherKey.isValid());
cipherKeys[domainId] = cipherKey;
}
@ -191,10 +197,10 @@ ACTOR Future<std::unordered_map<BlobCipherDetails, Reference<BlobCipherKey>>> ge
// Fetch any uncached cipher keys.
loop choose {
when(EKPGetBaseCipherKeysByIdsReply reply = wait(getUncachedEncryptCipherKeys(db, request))) {
std::unordered_map<BaseCipherIndex, StringRef, boost::hash<BaseCipherIndex>> baseCipherKeys;
std::unordered_map<BaseCipherIndex, EKPBaseCipherDetails, boost::hash<BaseCipherIndex>> baseCipherKeys;
for (const EKPBaseCipherDetails& baseDetails : reply.baseCipherDetails) {
BaseCipherIndex baseIdx = std::make_pair(baseDetails.encryptDomainId, baseDetails.baseCipherId);
baseCipherKeys[baseIdx] = baseDetails.baseCipherKey;
baseCipherKeys[baseIdx] = baseDetails;
}
// Insert base cipher keys into cache and construct result.
for (const BlobCipherDetails& details : cipherDetails) {
@ -211,9 +217,11 @@ ACTOR Future<std::unordered_map<BlobCipherDetails, Reference<BlobCipherKey>>> ge
}
Reference<BlobCipherKey> cipherKey = cipherKeyCache->insertCipherKey(details.encryptDomainId,
details.baseCipherId,
itr->second.begin(),
itr->second.size(),
details.salt);
itr->second.baseCipherKey.begin(),
itr->second.baseCipherKey.size(),
details.salt,
itr->second.refreshAt,
itr->second.expireAt);
ASSERT(cipherKey.isValid());
cipherKeys[details] = cipherKey;
}


@ -18,6 +18,7 @@
* limitations under the License.
*/
#include "fdbclient/ClientKnobs.h"
#include "fdbclient/DatabaseContext.h"
#include "fdbclient/Knobs.h"
#include "fdbclient/Notified.h"
@ -31,6 +32,7 @@
#include "fdbserver/WaitFailure.h"
#include "fdbserver/WorkerInterface.actor.h"
#include "fdbrpc/sim_validation.h"
#include "flow/IRandom.h"
#include "flow/flow.h"
#include "flow/actorcompiler.h" // This must be the last #include.
@ -560,7 +562,9 @@ ACTOR Future<Void> queueGetReadVersionRequests(
// WARNING: this code is run at a high priority, so it needs to do as little work as possible
bool canBeQueued = true;
if (stats->txnRequestIn.getValue() - stats->txnRequestOut.getValue() >
SERVER_KNOBS->START_TRANSACTION_MAX_QUEUE_SIZE) {
SERVER_KNOBS->START_TRANSACTION_MAX_QUEUE_SIZE ||
(g_network->isSimulated() && !g_simulator.speedUpSimulation &&
deterministicRandom()->random01() < 0.01)) {
// When the limit is hit, try to drop requests from the lower priority queues.
if (req.priority == TransactionPriority::BATCH) {
canBeQueued = false;


@ -76,18 +76,20 @@ struct SimKmsConnectorContext : NonCopyable, ReferenceCounted<SimKmsConnectorCon
};
namespace {
Optional<int64_t> getRefreshInterval(int64_t now, int64_t defaultTtl) {
Optional<int64_t> getRefreshInterval(const int64_t now, const int64_t defaultTtl) {
if (BUGGIFY) {
return Optional<int64_t>(now + defaultTtl);
return Optional<int64_t>(now);
}
return Optional<int64_t>();
return Optional<int64_t>(now + defaultTtl);
}
Optional<int64_t> getExpireInterval(Optional<int64_t> refTS) {
Optional<int64_t> getExpireInterval(Optional<int64_t> refTS, const int64_t defaultTtl) {
ASSERT(refTS.present());
if (BUGGIFY) {
return Optional<int64_t>(-1);
}
return refTS;
return (refTS.get() + defaultTtl);
}
} // namespace
@ -105,11 +107,17 @@ ACTOR Future<Void> ekLookupByIds(Reference<SimKmsConnectorContext> ctx,
}
// Lookup corresponding EncryptKeyCtx for input keyId
const int64_t currTS = (int64_t)now();
// Fetch the default TTL once so BUGGIFY can't give a different value per invocation, which would cause refTS > expTS
const int64_t defaultTtl = FLOW_KNOBS->ENCRYPT_CIPHER_KEY_CACHE_TTL;
Optional<int64_t> refAtTS = getRefreshInterval(currTS, defaultTtl);
Optional<int64_t> expAtTS = getExpireInterval(refAtTS, defaultTtl);
TraceEvent("SimKms.EKLookupById").detail("RefreshAt", refAtTS).detail("ExpireAt", expAtTS);
for (const auto& item : req.encryptKeyInfos) {
const auto& itr = ctx->simEncryptKeyStore.find(item.baseCipherId);
if (itr != ctx->simEncryptKeyStore.end()) {
rep.cipherKeyDetails.emplace_back_deep(
rep.arena, item.domainId, itr->first, StringRef(itr->second.get()->key));
rep.arena, item.domainId, itr->first, StringRef(itr->second.get()->key), refAtTS, expAtTS);
if (dbgKIdTrace.present()) {
// {encryptDomainId, baseCipherId} forms a unique tuple across encryption domains
@ -145,11 +153,12 @@ ACTOR Future<Void> ekLookupByDomainIds(Reference<SimKmsConnectorContext> ctx,
// Map encryptionDomainId to corresponding EncryptKeyCtx element using a modulo operation. This
// would mean multiple domains get mapped to the same encryption key, which is fine; the
// EncryptKeyStore guarantees that keyId -> plaintext encryptKey mapping is idempotent.
int64_t currTS = (int64_t)now();
const int64_t currTS = (int64_t)now();
// Fetch the default TTL once so BUGGIFY can't give a different value per invocation, which would cause refTS > expTS
int64_t defaultTtl = FLOW_KNOBS->ENCRYPT_CIPHER_KEY_CACHE_TTL;
const int64_t defaultTtl = FLOW_KNOBS->ENCRYPT_CIPHER_KEY_CACHE_TTL;
Optional<int64_t> refAtTS = getRefreshInterval(currTS, defaultTtl);
Optional<int64_t> expAtTS = getExpireInterval(refAtTS);
Optional<int64_t> expAtTS = getExpireInterval(refAtTS, defaultTtl);
TraceEvent("SimKms.EKLookupByDomainId").detail("RefreshAt", refAtTS).detail("ExpireAt", expAtTS);
for (const auto& info : req.encryptDomainInfos) {
EncryptCipherBaseKeyId keyId = 1 + abs(info.domainId) % SERVER_KNOBS->SIM_KMS_MAX_KEYS;
const auto& itr = ctx->simEncryptKeyStore.find(keyId);


@ -63,6 +63,8 @@ public:
}
virtual Future<Void> waitForDataDistributionEnabled(const DDEnabledState* ddEnabledState) const { return Void(); };
virtual Future<Void> pollMoveKeysLock(MoveKeysLock lock, const DDEnabledState* ddEnabledState) const = 0;
};
class DDTxnProcessorImpl;
@ -96,6 +98,8 @@ public:
const DatabaseConfiguration& configuration) const override;
Future<Void> waitForDataDistributionEnabled(const DDEnabledState* ddEnabledState) const override;
Future<Void> pollMoveKeysLock(MoveKeysLock lock, const DDEnabledState* ddEnabledState) const override;
};
// A mock transaction implementation for test usage.


@ -181,15 +181,30 @@ ACTOR Future<std::vector<std::pair<uint64_t, double>>> trackInsertionCount(Datab
ACTOR template <class T>
Future<Void> waitForLowInFlight(Database cx, T* workload) {
state Future<Void> timeout = delay(600.0);
loop {
int64_t inFlight = wait(getDataInFlight(cx, workload->dbInfo));
TraceEvent("DynamicWarming").detail("InFlight", inFlight);
if (inFlight > 1e6) { // Wait for just 1 MB to be in flight
wait(delay(1.0));
} else {
wait(delay(1.0));
TraceEvent("DynamicWarmingDone").log();
break;
try {
if (timeout.isReady()) {
throw timed_out();
}
int64_t inFlight = wait(getDataInFlight(cx, workload->dbInfo));
TraceEvent("DynamicWarming").detail("InFlight", inFlight);
if (inFlight > 1e6) { // Wait for just 1 MB to be in flight
wait(delay(1.0));
} else {
wait(delay(1.0));
TraceEvent("DynamicWarmingDone").log();
break;
}
} catch (Error& e) {
if (e.code() == error_code_attribute_not_found) {
// DD may not be initialized yet and attribute "DataInFlight" can be missing
wait(delay(1.0));
} else {
TraceEvent(SevWarn, "WaitForLowInFlightError").error(e);
throw;
}
}
}
return Void();


@ -21,14 +21,17 @@
#include "fdbclient/DatabaseContext.h"
#include "fdbclient/NativeAPI.actor.h"
#include "flow/EncryptUtils.h"
#include "flow/Error.h"
#include "flow/IRandom.h"
#include "flow/BlobCipher.h"
#include "fdbserver/workloads/workloads.actor.h"
#include "flow/flow.h"
#include "flow/ITrace.h"
#include "flow/Trace.h"
#include <chrono>
#include <cstring>
#include <limits>
#include <memory>
#include <random>
@ -111,6 +114,7 @@ struct EncryptionOpsWorkload : TestWorkload {
int pageSize;
int maxBufSize;
std::unique_ptr<uint8_t[]> buff;
int enableTTLTest;
Arena arena;
std::unique_ptr<WorkloadMetrics> metrics;
@ -121,7 +125,7 @@ struct EncryptionOpsWorkload : TestWorkload {
EncryptCipherBaseKeyId headerBaseCipherId;
EncryptCipherRandomSalt headerRandomSalt;
EncryptionOpsWorkload(WorkloadContext const& wcx) : TestWorkload(wcx) {
EncryptionOpsWorkload(WorkloadContext const& wcx) : TestWorkload(wcx), enableTTLTest(false) {
mode = getOption(options, LiteralStringRef("fixedSize"), 1);
numIterations = getOption(options, LiteralStringRef("numIterations"), 10);
pageSize = getOption(options, LiteralStringRef("pageSize"), 4096);
@ -136,13 +140,18 @@ struct EncryptionOpsWorkload : TestWorkload {
metrics = std::make_unique<WorkloadMetrics>();
if (wcx.clientId == 0 && mode == 1) {
enableTTLTest = true;
}
TraceEvent("EncryptionOpsWorkload")
.detail("Mode", getModeStr())
.detail("MinDomainId", minDomainId)
.detail("MaxDomainId", maxDomainId);
.detail("MaxDomainId", maxDomainId)
.detail("EnableTTL", enableTTLTest);
}
~EncryptionOpsWorkload() { TraceEvent("EncryptionOpsWorkload_Done").log(); }
~EncryptionOpsWorkload() { TraceEvent("EncryptionOpsWorkload.Done").log(); }
bool isFixedSizePayload() { return mode == 1; }
@ -165,14 +174,19 @@ struct EncryptionOpsWorkload : TestWorkload {
void setupCipherEssentials() {
Reference<BlobCipherKeyCache> cipherKeyCache = BlobCipherKeyCache::getInstance();
TraceEvent("SetupCipherEssentials_Start").detail("MinDomainId", minDomainId).detail("MaxDomainId", maxDomainId);
TraceEvent("SetupCipherEssentials.Start").detail("MinDomainId", minDomainId).detail("MaxDomainId", maxDomainId);
uint8_t buff[AES_256_KEY_LENGTH];
std::vector<Reference<BlobCipherKey>> cipherKeys;
int cipherLen = 0;
for (EncryptCipherDomainId id = minDomainId; id <= maxDomainId; id++) {
generateRandomBaseCipher(AES_256_KEY_LENGTH, &buff[0], &cipherLen);
cipherKeyCache->insertCipherKey(id, minBaseCipherId, buff, cipherLen);
cipherKeyCache->insertCipherKey(id,
minBaseCipherId,
buff,
cipherLen,
std::numeric_limits<int64_t>::max(),
std::numeric_limits<int64_t>::max());
ASSERT(cipherLen > 0 && cipherLen <= AES_256_KEY_LENGTH);
@ -183,13 +197,18 @@ struct EncryptionOpsWorkload : TestWorkload {
// insert the Encrypt Header cipherKey; record cipherDetails as getLatestCipher() may not work with multiple
// test clients
generateRandomBaseCipher(AES_256_KEY_LENGTH, &buff[0], &cipherLen);
cipherKeyCache->insertCipherKey(ENCRYPT_HEADER_DOMAIN_ID, headerBaseCipherId, buff, cipherLen);
cipherKeyCache->insertCipherKey(ENCRYPT_HEADER_DOMAIN_ID,
headerBaseCipherId,
buff,
cipherLen,
std::numeric_limits<int64_t>::max(),
std::numeric_limits<int64_t>::max());
Reference<BlobCipherKey> latestCipher = cipherKeyCache->getLatestCipherKey(ENCRYPT_HEADER_DOMAIN_ID);
ASSERT_EQ(latestCipher->getBaseCipherId(), headerBaseCipherId);
ASSERT_EQ(memcmp(latestCipher->rawBaseCipher(), buff, cipherLen), 0);
headerRandomSalt = latestCipher->getSalt();
TraceEvent("SetupCipherEssentials_Done")
TraceEvent("SetupCipherEssentials.Done")
.detail("MinDomainId", minDomainId)
.detail("MaxDomainId", maxDomainId)
.detail("HeaderBaseCipherId", headerBaseCipherId)
@ -198,9 +217,14 @@ struct EncryptionOpsWorkload : TestWorkload {
void resetCipherEssentials() {
Reference<BlobCipherKeyCache> cipherKeyCache = BlobCipherKeyCache::getInstance();
cipherKeyCache->cleanup();
for (EncryptCipherDomainId id = minDomainId; id <= maxDomainId; id++) {
cipherKeyCache->resetEncryptDomainId(id);
}
TraceEvent("ResetCipherEssentials_Done").log();
cipherKeyCache->resetEncryptDomainId(SYSTEM_KEYSPACE_ENCRYPT_DOMAIN_ID);
cipherKeyCache->resetEncryptDomainId(ENCRYPT_HEADER_DOMAIN_ID);
TraceEvent("ResetCipherEssentials.Done").log();
}
void updateLatestBaseCipher(const EncryptCipherDomainId encryptDomainId,
@ -232,7 +256,9 @@ struct EncryptionOpsWorkload : TestWorkload {
baseCipherId,
cipherKey->rawBaseCipher(),
cipherKey->getBaseCipherLen(),
cipherKey->getSalt());
cipherKey->getSalt(),
std::numeric_limits<int64_t>::max(),
std::numeric_limits<int64_t>::max());
// Ensure the update was a NOP
Reference<BlobCipherKey> cKey = cipherKeyCache->getCipherKey(domainId, baseCipherId, salt);
ASSERT(cKey->isEqual(cipherKey));
@ -297,11 +323,7 @@ struct EncryptionOpsWorkload : TestWorkload {
metrics->updateDecryptionTime(std::chrono::duration<double, std::nano>(end - start).count());
}
Future<Void> setup(Database const& ctx) override { return Void(); }
std::string description() const override { return "EncryptionOps"; }
Future<Void> start(Database const& cx) override {
void testBlobCipherKeyCacheOps() {
uint8_t baseCipher[AES_256_KEY_LENGTH];
int baseCipherLen = 0;
EncryptCipherBaseKeyId nextBaseCipherId;
@ -322,7 +344,12 @@ struct EncryptionOpsWorkload : TestWorkload {
if (updateBaseCipher) {
// simulate baseCipherId getting refreshed/updated
updateLatestBaseCipher(encryptDomainId, &baseCipher[0], &baseCipherLen, &nextBaseCipherId);
cipherKeyCache->insertCipherKey(encryptDomainId, nextBaseCipherId, &baseCipher[0], baseCipherLen);
cipherKeyCache->insertCipherKey(encryptDomainId,
nextBaseCipherId,
&baseCipher[0],
baseCipherLen,
std::numeric_limits<int64_t>::max(),
std::numeric_limits<int64_t>::max());
}
auto start = std::chrono::high_resolution_clock::now();
@ -368,6 +395,103 @@ struct EncryptionOpsWorkload : TestWorkload {
// Cleanup cipherKeys
resetCipherEssentials();
}
static void compareCipherDetails(Reference<BlobCipherKey> cipherKey,
const EncryptCipherDomainId domId,
const EncryptCipherBaseKeyId baseCipherId,
const uint8_t* baseCipher,
const int baseCipherLen,
const int64_t refreshAt,
const int64_t expAt) {
ASSERT(cipherKey.isValid());
ASSERT_EQ(cipherKey->getDomainId(), domId);
ASSERT_EQ(cipherKey->getBaseCipherId(), baseCipherId);
ASSERT_EQ(memcmp(cipherKey->rawBaseCipher(), baseCipher, baseCipherLen), 0);
ASSERT_EQ(cipherKey->getRefreshAtTS(), refreshAt);
ASSERT_EQ(cipherKey->getExpireAtTS(), expAt);
}
ACTOR Future<Void> testBlobCipherKeyCacheTTL(EncryptionOpsWorkload* self) {
state Reference<BlobCipherKeyCache> cipherKeyCache = BlobCipherKeyCache::getInstance();
state EncryptCipherDomainId domId = deterministicRandom()->randomInt(120000, 150000);
state EncryptCipherBaseKeyId baseCipherId = deterministicRandom()->randomInt(786, 1024);
state std::unique_ptr<uint8_t[]> baseCipher = std::make_unique<uint8_t[]>(AES_256_KEY_LENGTH);
state Reference<BlobCipherKey> cipherKey;
state EncryptCipherRandomSalt salt;
state int64_t refreshAt;
state int64_t expAt;
TraceEvent("TestBlobCipherCacheTTL.Start").detail("DomId", domId);
deterministicRandom()->randomBytes(baseCipher.get(), AES_256_KEY_LENGTH);
// Validate 'non-revocable' cipher with no expiration
refreshAt = std::numeric_limits<int64_t>::max();
expAt = std::numeric_limits<int64_t>::max();
cipherKeyCache->insertCipherKey(domId, baseCipherId, baseCipher.get(), AES_256_KEY_LENGTH, refreshAt, expAt);
cipherKey = cipherKeyCache->getLatestCipherKey(domId);
compareCipherDetails(cipherKey, domId, baseCipherId, baseCipher.get(), AES_256_KEY_LENGTH, refreshAt, expAt);
TraceEvent("TestBlobCipherCacheTTL.NonRevocableNoExpiry").detail("DomId", domId);
// Validate 'non-revocable' cipher with expiration
state EncryptCipherBaseKeyId baseCipherId_1 = baseCipherId + 1;
refreshAt = now() + 5;
cipherKeyCache->insertCipherKey(domId, baseCipherId_1, baseCipher.get(), AES_256_KEY_LENGTH, refreshAt, expAt);
cipherKey = cipherKeyCache->getLatestCipherKey(domId);
ASSERT(cipherKey.isValid());
compareCipherDetails(cipherKey, domId, baseCipherId_1, baseCipher.get(), AES_256_KEY_LENGTH, refreshAt, expAt);
salt = cipherKey->getSalt();
wait(delayUntil(refreshAt));
// Ensure that the latest cipherKey needs refresh; however, cipher lookup still works (non-revocable)
cipherKey = cipherKeyCache->getLatestCipherKey(domId);
ASSERT(!cipherKey.isValid());
cipherKey = cipherKeyCache->getCipherKey(domId, baseCipherId_1, salt);
ASSERT(cipherKey.isValid());
compareCipherDetails(cipherKey, domId, baseCipherId_1, baseCipher.get(), AES_256_KEY_LENGTH, refreshAt, expAt);
TraceEvent("TestBlobCipherCacheTTL.NonRevocableWithExpiry").detail("DomId", domId);
// Validate 'revocable' cipher with expiration
state EncryptCipherBaseKeyId baseCipherId_2 = baseCipherId + 2;
refreshAt = now() + 5;
expAt = refreshAt + 5;
cipherKeyCache->insertCipherKey(domId, baseCipherId_2, baseCipher.get(), AES_256_KEY_LENGTH, refreshAt, expAt);
cipherKey = cipherKeyCache->getLatestCipherKey(domId);
ASSERT(cipherKey.isValid());
compareCipherDetails(cipherKey, domId, baseCipherId_2, baseCipher.get(), AES_256_KEY_LENGTH, refreshAt, expAt);
salt = cipherKey->getSalt();
wait(delayUntil(refreshAt));
// Ensure that the latest cipherKey needs refresh; however, cipher lookup still works (revocable, not yet expired)
cipherKey = cipherKeyCache->getLatestCipherKey(domId);
ASSERT(!cipherKey.isValid());
cipherKey = cipherKeyCache->getCipherKey(domId, baseCipherId_2, salt);
ASSERT(cipherKey.isValid());
compareCipherDetails(cipherKey, domId, baseCipherId_2, baseCipher.get(), AES_256_KEY_LENGTH, refreshAt, expAt);
wait(delayUntil(expAt));
// Ensure that cipherKey lookup doesn't work after expiry
cipherKey = cipherKeyCache->getLatestCipherKey(domId);
ASSERT(!cipherKey.isValid());
cipherKey = cipherKeyCache->getCipherKey(domId, baseCipherId_2, salt);
ASSERT(!cipherKey.isValid());
TraceEvent("TestBlobCipherCacheTTL.End").detail("DomId", domId);
return Void();
}
Future<Void> setup(Database const& ctx) override { return Void(); }
std::string description() const override { return "EncryptionOps"; }
Future<Void> start(Database const& cx) override { return _start(cx, this); }
ACTOR Future<Void> _start(Database cx, EncryptionOpsWorkload* self) {
self->testBlobCipherKeyCacheOps();
if (self->enableTTLTest) {
wait(self->testBlobCipherKeyCacheTTL(self));
}
return Void();
}
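To summarize the cache semantics the TTL test above exercises, here is a condensed sketch (illustrative only: domId, baseCipherId, baseCipher and salt are placeholders, and the calls mirror the test rather than the BlobCipherKeyCache internals):

Reference<BlobCipherKeyCache> cache = BlobCipherKeyCache::getInstance();
// 'Non-revocable' key: never needs a refresh and never expires.
cache->insertCipherKey(domId, baseCipherId, baseCipher, AES_256_KEY_LENGTH,
                       std::numeric_limits<int64_t>::max(), std::numeric_limits<int64_t>::max());
// 'Revocable' key: stale after refreshAt, unusable after expireAt.
int64_t refreshAt = now() + 5;
int64_t expireAt = refreshAt + 5;
cache->insertCipherKey(domId, baseCipherId + 1, baseCipher, AES_256_KEY_LENGTH, refreshAt, expireAt);
// Once now() >= refreshAt: getLatestCipherKey(domId) returns an invalid reference,
// but getCipherKey(domId, baseCipherId + 1, salt) still succeeds until expireAt.
// Once now() >= expireAt: both lookups return invalid references.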


@ -63,7 +63,7 @@ struct ExceptionContract {
e.code() == error_code_future_version || e.code() == error_code_transaction_cancelled ||
e.code() == error_code_key_too_large || e.code() == error_code_value_too_large ||
e.code() == error_code_process_behind || e.code() == error_code_batch_transaction_throttled ||
e.code() == error_code_tag_throttled) {
e.code() == error_code_tag_throttled || e.code() == error_code_grv_proxy_memory_limit_exceeded) {
return;
}


@ -18,6 +18,7 @@
* limitations under the License.
*/
#include "fdbclient/FDBTypes.h"
#include "fdbserver/workloads/workloads.actor.h"
#include <fdbserver/Knobs.h>
#include <flow/actorcompiler.h>
@ -82,7 +83,16 @@ struct LocalRatekeeperWorkload : TestWorkload {
.detail("Actual", metrics.localRateLimit);
}
tr.reset();
Version readVersion = wait(tr.getReadVersion());
state Version readVersion = invalidVersion;
loop {
try {
Version v = wait(tr.getReadVersion());
readVersion = v;
break;
} catch (Error& e) {
wait(tr.onError(e));
}
}
requests.clear();
// we send 100 requests to this storage node and count how many of those get rejected
for (int i = 0; i < 100; ++i) {


@ -873,31 +873,34 @@ struct SpecialKeySpaceCorrectnessWorkload : TestWorkload {
}
TraceEvent(SevDebug, "DatabaseLocked").log();
// if database locked, fdb read should get database_locked error
try {
tx->reset();
tx->setOption(FDBTransactionOptions::RAW_ACCESS);
RangeResult res = wait(tx->getRange(normalKeys, 1));
} catch (Error& e) {
if (e.code() == error_code_actor_cancelled)
throw;
ASSERT(e.code() == error_code_database_locked);
tx->reset();
loop {
try {
tx->setOption(FDBTransactionOptions::RAW_ACCESS);
RangeResult res = wait(tx->getRange(normalKeys, 1));
} catch (Error& e) {
if (e.code() == error_code_actor_cancelled)
throw;
if (e.code() == error_code_grv_proxy_memory_limit_exceeded ||
e.code() == error_code_batch_transaction_throttled) {
wait(tx->onError(e));
} else {
ASSERT(e.code() == error_code_database_locked);
break;
}
}
}
// make sure we unlock the database
// unlock is idempotent, thus we can commit many times until successful
tx->reset();
loop {
try {
tx->reset();
tx->setOption(FDBTransactionOptions::RAW_ACCESS);
tx->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
// unlock the database
tx->clear(SpecialKeySpace::getManagementApiCommandPrefix("lock"));
wait(tx->commit());
TraceEvent(SevDebug, "DatabaseUnlocked").log();
tx->reset();
// read should be successful
tx->setOption(FDBTransactionOptions::RAW_ACCESS);
RangeResult res = wait(tx->getRange(normalKeys, 1));
tx->reset();
break;
} catch (Error& e) {
TraceEvent(SevDebug, "DatabaseUnlockFailure").error(e);
@ -905,9 +908,23 @@ struct SpecialKeySpaceCorrectnessWorkload : TestWorkload {
wait(tx->onError(e));
}
}
tx->reset();
loop {
try {
// read should be successful
tx->setOption(FDBTransactionOptions::RAW_ACCESS);
RangeResult res = wait(tx->getRange(normalKeys, 1));
break;
} catch (Error& e) {
wait(tx->onError(e));
}
}
// test consistencycheck, which is only used by the ConsistencyCheck workload
// Note: we have exclusive ownership of fdbShouldConsistencyCheckBeSuspended,
// no existing workloads can modify the key
tx->reset();
{
try {
tx->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);

View File

@ -606,6 +606,23 @@ struct TenantManagementWorkload : TestWorkload {
return Void();
}
// Returns GRV and eats GRV errors
ACTOR static Future<Version> getReadVersion(Reference<ReadYourWritesTransaction> tr) {
loop {
try {
Version version = wait(tr->getReadVersion());
return version;
} catch (Error& e) {
if (e.code() == error_code_grv_proxy_memory_limit_exceeded ||
e.code() == error_code_batch_transaction_throttled) {
wait(tr->onError(e));
} else {
throw;
}
}
}
}
ACTOR static Future<Void> deleteTenant(TenantManagementWorkload* self) {
state TenantName beginTenant = self->chooseTenantName(true);
state OperationType operationType = self->randomOperationType();
@ -695,7 +712,7 @@ struct TenantManagementWorkload : TestWorkload {
state bool retried = false;
loop {
try {
state Version beforeVersion = wait(tr->getReadVersion());
state Version beforeVersion = wait(self->getReadVersion(tr));
Optional<Void> result =
wait(timeout(deleteImpl(tr, beginTenant, endTenant, tenants, operationType, self),
deterministicRandom()->randomInt(1, 30)));
@ -704,7 +721,7 @@ struct TenantManagementWorkload : TestWorkload {
if (anyExists) {
if (self->oldestDeletionVersion == 0 && !tenants.empty()) {
tr->reset();
Version afterVersion = wait(tr->getReadVersion());
Version afterVersion = wait(self->getReadVersion(tr));
self->oldestDeletionVersion = afterVersion;
}
self->newestDeletionVersion = beforeVersion;
@ -727,6 +744,11 @@ struct TenantManagementWorkload : TestWorkload {
operationType == OperationType::MANAGEMENT_DATABASE);
ASSERT(retried);
break;
} else if (e.code() == error_code_grv_proxy_memory_limit_exceeded ||
e.code() == error_code_batch_transaction_throttled) {
// GRV proxy returns an error
wait(tr->onError(e));
continue;
} else {
throw;
}


@ -125,6 +125,10 @@ struct UnitTestWorkload : TestWorkload {
}
}
std::sort(tests.begin(), tests.end(), [](auto lhs, auto rhs) {
return std::string_view(lhs->name) < std::string_view(rhs->name);
});
fprintf(stdout, "Found %zu tests\n", tests.size());
if (tests.size() == 0) {


@ -20,21 +20,29 @@
#include "flow/BlobCipher.h"
#include "flow/Arena.h"
#include "flow/EncryptUtils.h"
#include "flow/Knobs.h"
#include "flow/Error.h"
#include "flow/FastRef.h"
#include "flow/IRandom.h"
#include "flow/ITrace.h"
#include "flow/flow.h"
#include "flow/network.h"
#include "flow/Trace.h"
#include "flow/UnitTest.h"
#include <chrono>
#include <cstring>
#include <limits>
#include <memory>
#include <string>
#include <thread>
#include <unistd.h>
#include <utility>
#define BLOB_CIPHER_DEBUG false
namespace {
bool isEncryptHeaderAuthTokenModeValid(const EncryptAuthTokenMode mode) {
return mode >= ENCRYPT_HEADER_AUTH_TOKEN_MODE_NONE && mode < ENCRYPT_HEADER_AUTH_TOKEN_LAST;
@ -46,29 +54,43 @@ bool isEncryptHeaderAuthTokenModeValid(const EncryptAuthTokenMode mode) {
BlobCipherKey::BlobCipherKey(const EncryptCipherDomainId& domainId,
const EncryptCipherBaseKeyId& baseCiphId,
const uint8_t* baseCiph,
int baseCiphLen) {
int baseCiphLen,
const int64_t refreshAt,
const int64_t expireAt) {
// Salt generated is used while applying HMAC Key derivation, hence, not using crypto-secure hash algorithm is ok.
// Further, 'deterministic' salt generation is used to preserve simulation determinism properties.
EncryptCipherRandomSalt salt;
if (g_network->isSimulated()) {
salt = deterministicRandom()->randomUInt64();
} else {
salt = nondeterministicRandom()->randomUInt64();
}
initKey(domainId, baseCiph, baseCiphLen, baseCiphId, salt);
// Support two types of CipherKeys: 'revocable' & 'non-revocable' ciphers.
// In all cases, either the cipherKey never expires (i.e. refreshAt == infinite), or refreshAt must be <= the
// expireAt timestamp.
ASSERT(refreshAt == std::numeric_limits<int64_t>::max() || (refreshAt <= expireAt));
initKey(domainId, baseCiph, baseCiphLen, baseCiphId, salt, refreshAt, expireAt);
}
BlobCipherKey::BlobCipherKey(const EncryptCipherDomainId& domainId,
const EncryptCipherBaseKeyId& baseCiphId,
const uint8_t* baseCiph,
int baseCiphLen,
const EncryptCipherRandomSalt& salt) {
initKey(domainId, baseCiph, baseCiphLen, baseCiphId, salt);
const EncryptCipherRandomSalt& salt,
const int64_t refreshAt,
const int64_t expireAt) {
initKey(domainId, baseCiph, baseCiphLen, baseCiphId, salt, refreshAt, expireAt);
}
void BlobCipherKey::initKey(const EncryptCipherDomainId& domainId,
const uint8_t* baseCiph,
int baseCiphLen,
const EncryptCipherBaseKeyId& baseCiphId,
const EncryptCipherRandomSalt& salt) {
const EncryptCipherRandomSalt& salt,
const int64_t refreshAt,
const int64_t expireAt) {
// Set the base encryption key properties
baseCipher = std::make_unique<uint8_t[]>(AES_256_KEY_LENGTH);
memset(baseCipher.get(), 0, AES_256_KEY_LENGTH);
@ -82,15 +104,19 @@ void BlobCipherKey::initKey(const EncryptCipherDomainId& domainId,
cipher = std::make_unique<uint8_t[]>(AES_256_KEY_LENGTH);
memset(cipher.get(), 0, AES_256_KEY_LENGTH);
applyHmacSha256Derivation();
// update the key creation time
creationTime = now();
// update cipher 'refresh' and 'expire' TS
refreshAtTS = refreshAt;
expireAtTS = expireAt;
TraceEvent("BlobCipherKey")
#if BLOB_CIPHER_DEBUG
TraceEvent(SevDebug, "BlobCipher.KeyInit")
.detail("DomainId", domainId)
.detail("BaseCipherId", baseCipherId)
.detail("BaseCipherLen", baseCipherLen)
.detail("RandomSalt", randomSalt)
.detail("CreationTime", creationTime);
.detail("RefreshAt", refreshAtTS)
.detail("ExpireAtTS", expireAtTS);
#endif
}
void BlobCipherKey::applyHmacSha256Derivation() {
@ -118,7 +144,7 @@ BlobCipherKeyIdCache::BlobCipherKeyIdCache()
BlobCipherKeyIdCache::BlobCipherKeyIdCache(EncryptCipherDomainId dId)
: domainId(dId), latestBaseCipherKeyId(), latestRandomSalt() {
TraceEvent("Init_BlobCipherKeyIdCache").detail("DomainId", domainId);
TraceEvent(SevInfo, "BlobCipher.KeyIdCacheInit").detail("DomainId", domainId);
}
BlobCipherKeyIdCacheKey BlobCipherKeyIdCache::getCacheKey(const EncryptCipherBaseKeyId& baseCipherKeyId,
@ -151,7 +177,9 @@ Reference<BlobCipherKey> BlobCipherKeyIdCache::getCipherByBaseCipherId(const Enc
Reference<BlobCipherKey> BlobCipherKeyIdCache::insertBaseCipherKey(const EncryptCipherBaseKeyId& baseCipherId,
const uint8_t* baseCipher,
int baseCipherLen) {
int baseCipherLen,
const int64_t refreshAt,
const int64_t expireAt) {
ASSERT_GT(baseCipherId, ENCRYPT_INVALID_CIPHER_KEY_ID);
// BaseCipherKeys are immutable, given the routine invocation updates 'latestCipher',
@ -159,21 +187,30 @@ Reference<BlobCipherKey> BlobCipherKeyIdCache::insertBaseCipherKey(const Encrypt
Reference<BlobCipherKey> latestCipherKey = getLatestCipherKey();
if (latestCipherKey.isValid() && latestCipherKey->getBaseCipherId() == baseCipherId) {
if (memcmp(latestCipherKey->rawBaseCipher(), baseCipher, baseCipherLen) == 0) {
TraceEvent("InsertBaseCipherKey_AlreadyPresent")
#if BLOB_CIPHER_DEBUG
TraceEvent(SevDebug, "InsertBaseCipherKey_AlreadyPresent")
.detail("BaseCipherKeyId", baseCipherId)
.detail("DomainId", domainId);
#endif
// Key is already present; nothing more to do.
return latestCipherKey;
} else {
TraceEvent("InsertBaseCipherKey_UpdateCipher")
TraceEvent(SevInfo, "BlobCipher.UpdatetBaseCipherKey")
.detail("BaseCipherKeyId", baseCipherId)
.detail("DomainId", domainId);
throw encrypt_update_cipher();
}
}
TraceEvent(SevInfo, "BlobCipherKey.InsertBaseCipherKeyLatest")
.detail("DomainId", domainId)
.detail("BaseCipherId", baseCipherId)
.detail("RefreshAt", refreshAt)
.detail("ExpireAt", expireAt);
Reference<BlobCipherKey> cipherKey =
makeReference<BlobCipherKey>(domainId, baseCipherId, baseCipher, baseCipherLen);
makeReference<BlobCipherKey>(domainId, baseCipherId, baseCipher, baseCipherLen, refreshAt, expireAt);
BlobCipherKeyIdCacheKey cacheKey = getCacheKey(cipherKey->getBaseCipherId(), cipherKey->getSalt());
keyIdCache.emplace(cacheKey, cipherKey);
@ -187,7 +224,9 @@ Reference<BlobCipherKey> BlobCipherKeyIdCache::insertBaseCipherKey(const Encrypt
Reference<BlobCipherKey> BlobCipherKeyIdCache::insertBaseCipherKey(const EncryptCipherBaseKeyId& baseCipherId,
const uint8_t* baseCipher,
int baseCipherLen,
const EncryptCipherRandomSalt& salt) {
const EncryptCipherRandomSalt& salt,
const int64_t refreshAt,
const int64_t expireAt) {
ASSERT_NE(baseCipherId, ENCRYPT_INVALID_CIPHER_KEY_ID);
ASSERT_NE(salt, ENCRYPT_INVALID_RANDOM_SALT);
@ -197,21 +236,31 @@ Reference<BlobCipherKey> BlobCipherKeyIdCache::insertBaseCipherKey(const Encrypt
BlobCipherKeyIdCacheMapCItr itr = keyIdCache.find(cacheKey);
if (itr != keyIdCache.end()) {
if (memcmp(itr->second->rawBaseCipher(), baseCipher, baseCipherLen) == 0) {
TraceEvent("InsertBaseCipherKey_AlreadyPresent")
#if BLOB_CIPHER_DEBUG
TraceEvent(SevDebug, "InsertBaseCipherKey_AlreadyPresent")
.detail("BaseCipherKeyId", baseCipherId)
.detail("DomainId", domainId);
#endif
// Key is already present; nothing more to do.
return itr->second;
} else {
TraceEvent("InsertBaseCipherKey_UpdateCipher")
TraceEvent(SevInfo, "BlobCipher.UpdateBaseCipherKey")
.detail("BaseCipherKeyId", baseCipherId)
.detail("DomainId", domainId);
throw encrypt_update_cipher();
}
}
TraceEvent(SevInfo, "BlobCipherKey.InsertBaseCipherKey")
.detail("DomainId", domainId)
.detail("BaseCipherId", baseCipherId)
.detail("Salt", salt)
.detail("RefreshAt", refreshAt)
.detail("ExpireAt", expireAt);
Reference<BlobCipherKey> cipherKey =
makeReference<BlobCipherKey>(domainId, baseCipherId, baseCipher, baseCipherLen, salt);
makeReference<BlobCipherKey>(domainId, baseCipherId, baseCipher, baseCipherLen, salt, refreshAt, expireAt);
keyIdCache.emplace(cacheKey, cipherKey);
return cipherKey;
}
@ -237,7 +286,9 @@ std::vector<Reference<BlobCipherKey>> BlobCipherKeyIdCache::getAllCipherKeys() {
Reference<BlobCipherKey> BlobCipherKeyCache::insertCipherKey(const EncryptCipherDomainId& domainId,
const EncryptCipherBaseKeyId& baseCipherId,
const uint8_t* baseCipher,
int baseCipherLen) {
int baseCipherLen,
const int64_t refreshAt,
const int64_t expireAt) {
if (domainId == ENCRYPT_INVALID_DOMAIN_ID || baseCipherId == ENCRYPT_INVALID_CIPHER_KEY_ID) {
throw encrypt_invalid_id();
}
@ -248,18 +299,18 @@ Reference<BlobCipherKey> BlobCipherKeyCache::insertCipherKey(const EncryptCipher
// Add mapping to track new encryption domain
Reference<BlobCipherKeyIdCache> keyIdCache = makeReference<BlobCipherKeyIdCache>(domainId);
Reference<BlobCipherKey> cipherKey =
keyIdCache->insertBaseCipherKey(baseCipherId, baseCipher, baseCipherLen);
keyIdCache->insertBaseCipherKey(baseCipherId, baseCipher, baseCipherLen, refreshAt, expireAt);
domainCacheMap.emplace(domainId, keyIdCache);
return cipherKey;
} else {
// Track new baseCipher keys
Reference<BlobCipherKeyIdCache> keyIdCache = domainItr->second;
return keyIdCache->insertBaseCipherKey(baseCipherId, baseCipher, baseCipherLen);
return keyIdCache->insertBaseCipherKey(baseCipherId, baseCipher, baseCipherLen, refreshAt, expireAt);
}
TraceEvent("InsertCipherKey").detail("DomainId", domainId).detail("BaseCipherKeyId", baseCipherId);
} catch (Error& e) {
TraceEvent("InsertCipherKey_Failed").detail("BaseCipherKeyId", baseCipherId).detail("DomainId", domainId);
TraceEvent(SevWarn, "BlobCipher.InsertCipherKeyFailed")
.detail("BaseCipherKeyId", baseCipherId)
.detail("DomainId", domainId);
throw;
}
}
@ -268,7 +319,9 @@ Reference<BlobCipherKey> BlobCipherKeyCache::insertCipherKey(const EncryptCipher
const EncryptCipherBaseKeyId& baseCipherId,
const uint8_t* baseCipher,
int baseCipherLen,
const EncryptCipherRandomSalt& salt) {
const EncryptCipherRandomSalt& salt,
const int64_t refreshAt,
const int64_t expireAt) {
if (domainId == ENCRYPT_INVALID_DOMAIN_ID || baseCipherId == ENCRYPT_INVALID_CIPHER_KEY_ID ||
salt == ENCRYPT_INVALID_RANDOM_SALT) {
throw encrypt_invalid_id();
@ -280,20 +333,17 @@ Reference<BlobCipherKey> BlobCipherKeyCache::insertCipherKey(const EncryptCipher
if (domainItr == domainCacheMap.end()) {
// Add mapping to track new encryption domain
Reference<BlobCipherKeyIdCache> keyIdCache = makeReference<BlobCipherKeyIdCache>(domainId);
cipherKey = keyIdCache->insertBaseCipherKey(baseCipherId, baseCipher, baseCipherLen, salt);
cipherKey =
keyIdCache->insertBaseCipherKey(baseCipherId, baseCipher, baseCipherLen, salt, refreshAt, expireAt);
domainCacheMap.emplace(domainId, keyIdCache);
} else {
// Track new baseCipher keys
Reference<BlobCipherKeyIdCache> keyIdCache = domainItr->second;
cipherKey = keyIdCache->insertBaseCipherKey(baseCipherId, baseCipher, baseCipherLen, salt);
cipherKey =
keyIdCache->insertBaseCipherKey(baseCipherId, baseCipher, baseCipherLen, salt, refreshAt, expireAt);
}
TraceEvent("InsertCipherKey")
.detail("DomainId", domainId)
.detail("BaseCipherKeyId", baseCipherId)
.detail("Salt", salt);
} catch (Error& e) {
TraceEvent("InsertCipherKey_Failed")
TraceEvent(SevWarn, "BlobCipher.InsertCipherKey_Failed")
.detail("BaseCipherKeyId", baseCipherId)
.detail("DomainId", domainId)
.detail("Salt", salt);
@ -305,21 +355,27 @@ Reference<BlobCipherKey> BlobCipherKeyCache::insertCipherKey(const EncryptCipher
Reference<BlobCipherKey> BlobCipherKeyCache::getLatestCipherKey(const EncryptCipherDomainId& domainId) {
if (domainId == ENCRYPT_INVALID_DOMAIN_ID) {
TraceEvent("GetLatestCipherKey_InvalidID").detail("DomainId", domainId);
TraceEvent(SevWarn, "BlobCipher.GetLatestCipherKeyInvalidID").detail("DomainId", domainId);
throw encrypt_invalid_id();
}
auto domainItr = domainCacheMap.find(domainId);
if (domainItr == domainCacheMap.end()) {
TraceEvent("GetLatestCipherKey_DomainNotFound").detail("DomainId", domainId);
TraceEvent(SevInfo, "BlobCipher.GetLatestCipherKeyDomainNotFound").detail("DomainId", domainId);
return Reference<BlobCipherKey>();
}
Reference<BlobCipherKeyIdCache> keyIdCache = domainItr->second;
Reference<BlobCipherKey> cipherKey = keyIdCache->getLatestCipherKey();
if (cipherKey.isValid() && (now() - cipherKey->getCreationTime()) > FLOW_KNOBS->ENCRYPT_CIPHER_KEY_CACHE_TTL) {
TraceEvent("GetLatestCipherKey_ExpiredTTL")
// Ensure 'freshness' guarantees for the latestCipher
if (cipherKey.isValid() && cipherKey->needsRefresh()) {
#if BLOB_CIPHER_DEBUG
TraceEvent("SevDebug, BlobCipher.GetLatestNeedsRefresh")
.detail("DomainId", domainId)
.detail("BaseCipherId", cipherKey->getBaseCipherId());
.detail("Now", now())
.detail("RefreshAt", cipherKey->getRefreshAtTS());
#endif
return Reference<BlobCipherKey>();
}
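    // Note: the key is not evicted here; an invalid Reference simply tells the caller that the cached
    // 'latest' cipher is past its refresh point and newer cipher material should be obtained (for
    // example, from the external key manager) before encrypting anything new.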
@ -335,7 +391,22 @@ Reference<BlobCipherKey> BlobCipherKeyCache::getCipherKey(const EncryptCipherDom
}
Reference<BlobCipherKeyIdCache> keyIdCache = domainItr->second;
return keyIdCache->getCipherByBaseCipherId(baseCipherId, salt);
Reference<BlobCipherKey> cipherKey = keyIdCache->getCipherByBaseCipherId(baseCipherId, salt);
// Ensure 'liveness' guarantees for the cipher
if (cipherKey.isValid() && cipherKey->isExpired()) {
#if BLOB_CIPHER_DEBUG
TraceEvent(SevDebug, "BlobCipher.GetCipherExpired")
.detail("DomainId", domainId)
.detail("BaseCipherId", baseCipherId)
.detail("Now", now())
.detail("ExpireAt", cipherKey->getExpireAtTS());
#endif
return Reference<BlobCipherKey>();
}
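    // An expired cipher behaves like a cache miss: the caller is expected to re-obtain valid key
    // material rather than keep using a key past its 'expireAt' timestamp.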
return cipherKey;
}
void BlobCipherKeyCache::resetEncryptDomainId(const EncryptCipherDomainId domainId) {
@ -346,15 +417,18 @@ void BlobCipherKeyCache::resetEncryptDomainId(const EncryptCipherDomainId domain
Reference<BlobCipherKeyIdCache> keyIdCache = domainItr->second;
keyIdCache->cleanup();
TraceEvent("ResetEncryptDomainId").detail("DomainId", domainId);
TraceEvent(SevInfo, "BlobCipher.ResetEncryptDomainId").detail("DomainId", domainId);
}
void BlobCipherKeyCache::cleanup() noexcept {
Reference<BlobCipherKeyCache> instance = BlobCipherKeyCache::getInstance();
TraceEvent(SevInfo, "BlobCipherKeyCache.Cleanup").log();
for (auto& domainItr : instance->domainCacheMap) {
Reference<BlobCipherKeyIdCache> keyIdCache = domainItr.second;
keyIdCache->cleanup();
TraceEvent("BlobCipherKeyCache_Cleanup").detail("DomainId", domainItr.first);
TraceEvent(SevInfo, "BlobCipher.KeyCacheCleanup").detail("DomainId", domainItr.first);
}
instance->domainCacheMap.clear();
@ -423,7 +497,7 @@ Reference<EncryptBuf> EncryptBlobCipherAes265Ctr::encrypt(const uint8_t* plainte
uint8_t* ciphertext = encryptBuf->begin();
int bytes{ 0 };
if (EVP_EncryptUpdate(ctx, ciphertext, &bytes, plaintext, plaintextLen) != 1) {
TraceEvent("Encrypt_UpdateFailed")
TraceEvent(SevWarn, "BlobCipher.EncryptUpdateFailed")
.detail("BaseCipherId", textCipherKey->getBaseCipherId())
.detail("EncryptDomainId", textCipherKey->getDomainId());
throw encrypt_ops_error();
@ -431,14 +505,14 @@ Reference<EncryptBuf> EncryptBlobCipherAes265Ctr::encrypt(const uint8_t* plainte
int finalBytes{ 0 };
if (EVP_EncryptFinal_ex(ctx, ciphertext + bytes, &finalBytes) != 1) {
TraceEvent("Encrypt_FinalFailed")
TraceEvent(SevWarn, "BlobCipher.EncryptFinalFailed")
.detail("BaseCipherId", textCipherKey->getBaseCipherId())
.detail("EncryptDomainId", textCipherKey->getDomainId());
throw encrypt_ops_error();
}
if ((bytes + finalBytes) != plaintextLen) {
TraceEvent("Encrypt_UnexpectedCipherLen")
TraceEvent(SevWarn, "BlobCipher.EncryptUnexpectedCipherLen")
.detail("PlaintextLen", plaintextLen)
.detail("EncryptedBufLen", bytes + finalBytes);
throw encrypt_ops_error();
@ -508,20 +582,20 @@ Standalone<StringRef> EncryptBlobCipherAes265Ctr::encryptBlobGranuleChunk(const
int bytes{ 0 };
if (EVP_EncryptUpdate(ctx, ciphertext, &bytes, plaintext, plaintextLen) != 1) {
TraceEvent("Encrypt_UpdateFailed")
TraceEvent(SevWarn, "BlobCipher.EncryptUpdateFailed")
.detail("BaseCipherId", textCipherKey->getBaseCipherId())
.detail("EncryptDomainId", textCipherKey->getDomainId());
throw encrypt_ops_error();
}
int finalBytes{ 0 };
if (EVP_EncryptFinal_ex(ctx, ciphertext + bytes, &finalBytes) != 1) {
TraceEvent("Encrypt_FinalFailed")
TraceEvent(SevWarn, "BlobCipher.EncryptFinalFailed")
.detail("BaseCipherId", textCipherKey->getBaseCipherId())
.detail("EncryptDomainId", textCipherKey->getDomainId());
throw encrypt_ops_error();
}
if ((bytes + finalBytes) != plaintextLen) {
TraceEvent("Encrypt_UnexpectedCipherLen")
TraceEvent(SevWarn, "BlobCipher.EncryptUnexpectedCipherLen")
.detail("PlaintextLen", plaintextLen)
.detail("EncryptedBufLen", bytes + finalBytes);
throw encrypt_ops_error();
@ -573,7 +647,7 @@ void DecryptBlobCipherAes256Ctr::verifyHeaderAuthToken(const BlobCipherEncryptHe
AES_256_KEY_LENGTH,
arena);
if (memcmp(&header.multiAuthTokens.headerAuthToken[0], computedHeaderAuthToken.begin(), AUTH_TOKEN_SIZE) != 0) {
TraceEvent("VerifyEncryptBlobHeader_AuthTokenMismatch")
TraceEvent(SevWarn, "BlobCipher.VerifyEncryptBlobHeaderAuthTokenMismatch")
.detail("HeaderVersion", header.flags.headerVersion)
.detail("HeaderMode", header.flags.encryptMode)
.detail("MultiAuthHeaderAuthToken",
@ -603,7 +677,7 @@ void DecryptBlobCipherAes256Ctr::verifyHeaderSingleAuthToken(const uint8_t* ciph
StringRef computed = computeAuthToken(
buff, ciphertextLen + sizeof(BlobCipherEncryptHeader), headerCipherKey->rawCipher(), AES_256_KEY_LENGTH, arena);
if (memcmp(&header.singleAuthToken.authToken[0], computed.begin(), AUTH_TOKEN_SIZE) != 0) {
TraceEvent("VerifyEncryptBlobHeader_AuthTokenMismatch")
TraceEvent(SevWarn, "BlobCipher.VerifyEncryptBlobHeaderAuthTokenMismatch")
.detail("HeaderVersion", header.flags.headerVersion)
.detail("HeaderMode", header.flags.encryptMode)
.detail("SingleAuthToken",
@ -629,7 +703,7 @@ void DecryptBlobCipherAes256Ctr::verifyHeaderMultiAuthToken(const uint8_t* ciphe
arena);
if (memcmp(&header.multiAuthTokens.cipherTextAuthToken[0], computedCipherTextAuthToken.begin(), AUTH_TOKEN_SIZE) !=
0) {
TraceEvent("VerifyEncryptBlobHeader_AuthTokenMismatch")
TraceEvent(SevWarn, "BlobCipher.VerifyEncryptBlobHeaderAuthTokenMismatch")
.detail("HeaderVersion", header.flags.headerVersion)
.detail("HeaderMode", header.flags.encryptMode)
.detail("MultiAuthCipherTextAuthToken",
@ -659,7 +733,7 @@ void DecryptBlobCipherAes256Ctr::verifyEncryptHeaderMetadata(const BlobCipherEnc
if (header.flags.headerVersion != EncryptBlobCipherAes265Ctr::ENCRYPT_HEADER_VERSION ||
header.flags.encryptMode != ENCRYPT_CIPHER_MODE_AES_256_CTR ||
!isEncryptHeaderAuthTokenModeValid((EncryptAuthTokenMode)header.flags.authTokenMode)) {
TraceEvent("VerifyEncryptBlobHeader")
TraceEvent(SevWarn, "BlobCipher.VerifyEncryptBlobHeader")
.detail("HeaderVersion", header.flags.headerVersion)
.detail("ExpectedVersion", EncryptBlobCipherAes265Ctr::ENCRYPT_HEADER_VERSION)
.detail("EncryptCipherMode", header.flags.encryptMode)
@ -678,7 +752,8 @@ Reference<EncryptBuf> DecryptBlobCipherAes256Ctr::decrypt(const uint8_t* ciphert
verifyEncryptHeaderMetadata(header);
if (header.flags.authTokenMode != ENCRYPT_HEADER_AUTH_TOKEN_MODE_NONE && !headerCipherKey.isValid()) {
TraceEvent("Decrypt_InvalidHeaderCipherKey").detail("AuthTokenMode", header.flags.authTokenMode);
TraceEvent(SevWarn, "BlobCipher.DecryptInvalidHeaderCipherKey")
.detail("AuthTokenMode", header.flags.authTokenMode);
throw encrypt_ops_error();
}
@ -695,7 +770,7 @@ Reference<EncryptBuf> DecryptBlobCipherAes256Ctr::decrypt(const uint8_t* ciphert
uint8_t* plaintext = decrypted->begin();
int bytesDecrypted{ 0 };
if (!EVP_DecryptUpdate(ctx, plaintext, &bytesDecrypted, ciphertext, ciphertextLen)) {
TraceEvent("Decrypt_UpdateFailed")
TraceEvent(SevWarn, "BlobCipher.DecryptUpdateFailed")
.detail("BaseCipherId", header.cipherTextDetails.baseCipherId)
.detail("EncryptDomainId", header.cipherTextDetails.encryptDomainId);
throw encrypt_ops_error();
@ -703,14 +778,14 @@ Reference<EncryptBuf> DecryptBlobCipherAes256Ctr::decrypt(const uint8_t* ciphert
int finalBlobBytes{ 0 };
if (EVP_DecryptFinal_ex(ctx, plaintext + bytesDecrypted, &finalBlobBytes) <= 0) {
TraceEvent("Decrypt_FinalFailed")
TraceEvent(SevWarn, "BlobCipher.DecryptFinalFailed")
.detail("BaseCipherId", header.cipherTextDetails.baseCipherId)
.detail("EncryptDomainId", header.cipherTextDetails.encryptDomainId);
throw encrypt_ops_error();
}
if ((bytesDecrypted + finalBlobBytes) != ciphertextLen) {
TraceEvent("Encrypt_UnexpectedPlaintextLen")
TraceEvent(SevWarn, "BlobCipher.EncryptUnexpectedPlaintextLen")
.detail("CiphertextLen", ciphertextLen)
.detail("DecryptedBufLen", bytesDecrypted + finalBlobBytes);
throw encrypt_ops_error();
@ -760,6 +835,7 @@ StringRef computeAuthToken(const uint8_t* payload,
const uint8_t* key,
const int keyLen,
Arena& arena) {
CODE_PROBE(true, "Auth token generation");
HmacSha256DigestGen hmacGenerator(key, keyLen);
StringRef digest = hmacGenerator.digest(payload, payloadLen, arena);
@ -782,7 +858,7 @@ void forceLinkBlobCipherTests() {}
// 6.1 cleanup cipherKeys by given encryptDomainId
// 6.2. Cleanup all cached cipherKeys
TEST_CASE("flow/BlobCipher") {
TraceEvent("BlobCipherTest_Start").log();
TraceEvent("BlobCipherTest.Start").log();
// Construct a dummy External Key Manager representation and populate with some keys
class BaseCipher : public ReferenceCounted<BaseCipher>, NonCopyable {
@ -791,11 +867,16 @@ TEST_CASE("flow/BlobCipher") {
int len;
EncryptCipherBaseKeyId keyId;
std::unique_ptr<uint8_t[]> key;
int64_t refreshAt;
int64_t expireAt;
EncryptCipherRandomSalt generatedSalt;
BaseCipher(const EncryptCipherDomainId& dId, const EncryptCipherBaseKeyId& kId)
BaseCipher(const EncryptCipherDomainId& dId,
const EncryptCipherBaseKeyId& kId,
const int64_t rAt,
const int64_t eAt)
: domainId(dId), len(deterministicRandom()->randomInt(AES_256_KEY_LENGTH / 2, AES_256_KEY_LENGTH + 1)),
keyId(kId), key(std::make_unique<uint8_t[]>(len)) {
keyId(kId), key(std::make_unique<uint8_t[]>(len)), refreshAt(rAt), expireAt(eAt) {
deterministicRandom()->randomBytes(key.get(), len);
}
};
@ -810,7 +891,10 @@ TEST_CASE("flow/BlobCipher") {
deterministicRandom()->randomInt(minBaseCipherKeyId, minBaseCipherKeyId + 50) + 15;
for (int dId = minDomainId; dId <= maxDomainId; dId++) {
for (int kId = minBaseCipherKeyId; kId <= maxBaseCipherKeyId; kId++) {
domainKeyMap[dId].emplace(kId, makeReference<BaseCipher>(dId, kId));
domainKeyMap[dId].emplace(
kId,
makeReference<BaseCipher>(
dId, kId, std::numeric_limits<int64_t>::max(), std::numeric_limits<int64_t>::max()));
}
}
ASSERT_EQ(domainKeyMap.size(), maxDomainId);
@ -818,7 +902,7 @@ TEST_CASE("flow/BlobCipher") {
Reference<BlobCipherKeyCache> cipherKeyCache = BlobCipherKeyCache::getInstance();
// validate getLatestCipherKey return empty when there's no cipher key
TraceEvent("BlobCipherTest_LatestKeyNotExists").log();
TraceEvent("BlobCipherTest.LatestKeyNotExists").log();
Reference<BlobCipherKey> latestKeyNonexists =
cipherKeyCache->getLatestCipherKey(deterministicRandom()->randomInt(minDomainId, maxDomainId));
ASSERT(!latestKeyNonexists.isValid());
@ -835,18 +919,27 @@ TEST_CASE("flow/BlobCipher") {
for (auto& baseKeyItr : domainItr.second) {
Reference<BaseCipher> baseCipher = baseKeyItr.second;
cipherKeyCache->insertCipherKey(
baseCipher->domainId, baseCipher->keyId, baseCipher->key.get(), baseCipher->len);
cipherKeyCache->insertCipherKey(baseCipher->domainId,
baseCipher->keyId,
baseCipher->key.get(),
baseCipher->len,
baseCipher->refreshAt,
baseCipher->expireAt);
Reference<BlobCipherKey> fetchedKey = cipherKeyCache->getLatestCipherKey(baseCipher->domainId);
baseCipher->generatedSalt = fetchedKey->getSalt();
}
}
// insert EncryptHeader BlobCipher key
Reference<BaseCipher> headerBaseCipher = makeReference<BaseCipher>(ENCRYPT_HEADER_DOMAIN_ID, 1);
cipherKeyCache->insertCipherKey(
headerBaseCipher->domainId, headerBaseCipher->keyId, headerBaseCipher->key.get(), headerBaseCipher->len);
Reference<BaseCipher> headerBaseCipher = makeReference<BaseCipher>(
ENCRYPT_HEADER_DOMAIN_ID, 1, std::numeric_limits<int64_t>::max(), std::numeric_limits<int64_t>::max());
cipherKeyCache->insertCipherKey(headerBaseCipher->domainId,
headerBaseCipher->keyId,
headerBaseCipher->key.get(),
headerBaseCipher->len,
headerBaseCipher->refreshAt,
headerBaseCipher->expireAt);
TraceEvent("BlobCipherTest_InsertKeysDone").log();
TraceEvent("BlobCipherTest.InsertKeysDone").log();
// validate the cipherKey lookups work as desired
for (auto& domainItr : domainKeyMap) {
@ -865,17 +958,21 @@ TEST_CASE("flow/BlobCipher") {
ASSERT_NE(std::memcmp(cipherKey->rawCipher(), baseCipher->key.get(), cipherKey->getBaseCipherLen()), 0);
}
}
TraceEvent("BlobCipherTest_LooksupDone").log();
TraceEvent("BlobCipherTest.LooksupDone").log();
    // Ensure attempting to insert an existing cipherKey (identical) more than once is treated as a NOP
try {
Reference<BaseCipher> baseCipher = domainKeyMap[minDomainId][minBaseCipherKeyId];
cipherKeyCache->insertCipherKey(
baseCipher->domainId, baseCipher->keyId, baseCipher->key.get(), baseCipher->len);
cipherKeyCache->insertCipherKey(baseCipher->domainId,
baseCipher->keyId,
baseCipher->key.get(),
baseCipher->len,
std::numeric_limits<int64_t>::max(),
std::numeric_limits<int64_t>::max());
} catch (Error& e) {
throw;
}
TraceEvent("BlobCipherTest_ReinsertIdempotentKeyDone").log();
TraceEvent("BlobCipherTest.ReinsertIdempotentKeyDone").log();
    // Ensure attempting to insert an existing cipherKey (modified) fails with appropriate error
try {
@ -886,13 +983,18 @@ TEST_CASE("flow/BlobCipher") {
for (int i = 2; i < 5; i++) {
rawCipher[i]++;
}
cipherKeyCache->insertCipherKey(baseCipher->domainId, baseCipher->keyId, &rawCipher[0], baseCipher->len);
cipherKeyCache->insertCipherKey(baseCipher->domainId,
baseCipher->keyId,
&rawCipher[0],
baseCipher->len,
std::numeric_limits<int64_t>::max(),
std::numeric_limits<int64_t>::max());
} catch (Error& e) {
if (e.code() != error_code_encrypt_update_cipher) {
throw;
}
}
TraceEvent("BlobCipherTest_ReinsertNonIdempotentKeyDone").log();
TraceEvent("BlobCipherTest.ReinsertNonIdempotentKeyDone").log();
// Validate Encryption ops
Reference<BlobCipherKey> cipherKey = cipherKeyCache->getLatestCipherKey(minDomainId);
@ -908,7 +1010,7 @@ TEST_CASE("flow/BlobCipher") {
BlobCipherEncryptHeader headerCopy;
// validate basic encrypt followed by decrypt operation for AUTH_MODE_NONE
{
TraceEvent("NoneAuthMode_Start").log();
TraceEvent("NoneAuthMode.Start").log();
EncryptBlobCipherAes265Ctr encryptor(
cipherKey, Reference<BlobCipherKey>(), iv, AES_256_IV_LENGTH, ENCRYPT_HEADER_AUTH_TOKEN_MODE_NONE);
@ -921,7 +1023,7 @@ TEST_CASE("flow/BlobCipher") {
ASSERT_EQ(header.flags.encryptMode, ENCRYPT_CIPHER_MODE_AES_256_CTR);
ASSERT_EQ(header.flags.authTokenMode, ENCRYPT_HEADER_AUTH_TOKEN_MODE_NONE);
TraceEvent("BlobCipherTest_EncryptDone")
TraceEvent("BlobCipherTest.EncryptDone")
.detail("HeaderVersion", header.flags.headerVersion)
.detail("HeaderEncryptMode", header.flags.encryptMode)
.detail("DomainId", header.cipherTextDetails.encryptDomainId)
@ -937,7 +1039,7 @@ TEST_CASE("flow/BlobCipher") {
ASSERT_EQ(decrypted->getLogicalSize(), bufLen);
ASSERT_EQ(memcmp(decrypted->begin(), &orgData[0], bufLen), 0);
TraceEvent("BlobCipherTest_DecryptDone").log();
TraceEvent("BlobCipherTest.DecryptDone").log();
// induce encryption header corruption - headerVersion corrupted
memcpy(reinterpret_cast<uint8_t*>(&headerCopy),
@ -985,12 +1087,12 @@ TEST_CASE("flow/BlobCipher") {
ASSERT(false);
}
TraceEvent("NoneAuthMode_Done").log();
TraceEvent("NoneAuthMode.Done").log();
}
// validate basic encrypt followed by decrypt operation for AUTH_TOKEN_MODE_SINGLE
{
TraceEvent("SingleAuthMode_Start").log();
TraceEvent("SingleAuthMode.Start").log();
EncryptBlobCipherAes265Ctr encryptor(
cipherKey, headerCipherKey, iv, AES_256_IV_LENGTH, ENCRYPT_HEADER_AUTH_TOKEN_MODE_SINGLE);
@ -1003,7 +1105,7 @@ TEST_CASE("flow/BlobCipher") {
ASSERT_EQ(header.flags.encryptMode, ENCRYPT_CIPHER_MODE_AES_256_CTR);
ASSERT_EQ(header.flags.authTokenMode, ENCRYPT_HEADER_AUTH_TOKEN_MODE_SINGLE);
TraceEvent("BlobCipherTest_EncryptDone")
TraceEvent("BlobCipherTest.EncryptDone")
.detail("HeaderVersion", header.flags.headerVersion)
.detail("HeaderEncryptMode", header.flags.encryptMode)
.detail("DomainId", header.cipherTextDetails.encryptDomainId)
@ -1024,7 +1126,7 @@ TEST_CASE("flow/BlobCipher") {
ASSERT_EQ(decrypted->getLogicalSize(), bufLen);
ASSERT_EQ(memcmp(decrypted->begin(), &orgData[0], bufLen), 0);
TraceEvent("BlobCipherTest_DecryptDone").log();
TraceEvent("BlobCipherTest.DecryptDone").log();
// induce encryption header corruption - headerVersion corrupted
encrypted = encryptor.encrypt(&orgData[0], bufLen, &header, arena);
@ -1090,12 +1192,12 @@ TEST_CASE("flow/BlobCipher") {
}
}
TraceEvent("SingleAuthMode_Done").log();
TraceEvent("SingleAuthMode.Done").log();
}
// validate basic encrypt followed by decrypt operation for AUTH_TOKEN_MODE_MULTI
{
TraceEvent("MultiAuthMode_Start").log();
TraceEvent("MultiAuthMode.Start").log();
EncryptBlobCipherAes265Ctr encryptor(
cipherKey, headerCipherKey, iv, AES_256_IV_LENGTH, ENCRYPT_HEADER_AUTH_TOKEN_MODE_MULTI);
@ -1108,7 +1210,7 @@ TEST_CASE("flow/BlobCipher") {
ASSERT_EQ(header.flags.encryptMode, ENCRYPT_CIPHER_MODE_AES_256_CTR);
ASSERT_EQ(header.flags.authTokenMode, ENCRYPT_HEADER_AUTH_TOKEN_MODE_MULTI);
TraceEvent("BlobCipherTest_EncryptDone")
TraceEvent("BlobCipherTest.EncryptDone")
.detail("HeaderVersion", header.flags.headerVersion)
.detail("HeaderEncryptMode", header.flags.encryptMode)
.detail("DomainId", header.cipherTextDetails.encryptDomainId)
@ -1130,7 +1232,7 @@ TEST_CASE("flow/BlobCipher") {
ASSERT_EQ(decrypted->getLogicalSize(), bufLen);
ASSERT_EQ(memcmp(decrypted->begin(), &orgData[0], bufLen), 0);
TraceEvent("BlobCipherTest_DecryptDone").log();
TraceEvent("BlobCipherTest.DecryptDone").log();
// induce encryption header corruption - headerVersion corrupted
encrypted = encryptor.encrypt(&orgData[0], bufLen, &header, arena);
@ -1212,7 +1314,7 @@ TEST_CASE("flow/BlobCipher") {
}
}
TraceEvent("MultiAuthMode_Done").log();
TraceEvent("MultiAuthMode.Done").log();
}
// Validate dropping encryptDomainId cached keys
@ -1228,6 +1330,6 @@ TEST_CASE("flow/BlobCipher") {
ASSERT(cachedKeys.empty());
}
TraceEvent("BlobCipherTest_Done").log();
TraceEvent("BlobCipherTest.Done").log();
return Void();
}
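
// A minimal usage sketch of the cache API exercised by the test above; 'kmsBaseCipher',
// 'kmsBaseCipherLen', 'exampleEncryptWithCachedKey' and the domain/key ids are hypothetical
// stand-ins for base cipher material a caller would obtain from its external key manager.
static Reference<EncryptBuf> exampleEncryptWithCachedKey(const uint8_t* kmsBaseCipher,
                                                         int kmsBaseCipherLen,
                                                         const uint8_t* plaintext,
                                                         int plaintextLen,
                                                         Arena& arena) {
    Reference<BlobCipherKeyCache> cipherKeyCache = BlobCipherKeyCache::getInstance();
    // Register the base cipher for domain 1; max() timestamps mean "never refresh / never expire".
    cipherKeyCache->insertCipherKey(1 /*domainId*/,
                                    100 /*baseCipherId*/,
                                    kmsBaseCipher,
                                    kmsBaseCipherLen,
                                    std::numeric_limits<int64_t>::max(),
                                    std::numeric_limits<int64_t>::max());
    // getLatestCipherKey() returns an invalid Reference once the key's 'refreshAt' has passed.
    Reference<BlobCipherKey> textCipherKey = cipherKeyCache->getLatestCipherKey(1);
    ASSERT(textCipherKey.isValid());
    uint8_t iv[AES_256_IV_LENGTH];
    deterministicRandom()->randomBytes(&iv[0], AES_256_IV_LENGTH);
    // No header cipher key and AUTH_TOKEN_MODE_NONE, mirroring the first block of the test above.
    EncryptBlobCipherAes265Ctr encryptor(
        textCipherKey, Reference<BlobCipherKey>(), &iv[0], AES_256_IV_LENGTH, ENCRYPT_HEADER_AUTH_TOKEN_MODE_NONE);
    BlobCipherEncryptHeader header;
    return encryptor.encrypt(plaintext, plaintextLen, &header, arena);
}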

View File

@ -283,8 +283,8 @@ void FlowKnobs::initialize(Randomize randomize, IsSimulated isSimulated) {
init( HEALTH_MONITOR_CONNECTION_MAX_CLOSED, 5 );
// Encryption
init( ENCRYPT_CIPHER_KEY_CACHE_TTL, isSimulated ? 120 : 10 * 60 );
if ( randomize && BUGGIFY) { ENCRYPT_CIPHER_KEY_CACHE_TTL = deterministicRandom()->randomInt(50, 100); }
init( ENCRYPT_CIPHER_KEY_CACHE_TTL, isSimulated ? 5 * 60 : 10 * 60 );
if ( randomize && BUGGIFY) { ENCRYPT_CIPHER_KEY_CACHE_TTL = deterministicRandom()->randomInt(2, 10) * 60; }
init( ENCRYPT_KEY_REFRESH_INTERVAL, isSimulated ? 60 : 8 * 60 );
if ( randomize && BUGGIFY) { ENCRYPT_KEY_REFRESH_INTERVAL = deterministicRandom()->randomInt(2, 10); }
init( TOKEN_CACHE_SIZE, 100 );

View File

@ -2128,7 +2128,7 @@ struct TestGVR {
};
template <class F>
void startThreadF(F&& func) {
THREAD_HANDLE startThreadF(F&& func) {
struct Thing {
F f;
Thing(F&& f) : f(std::move(f)) {}
@ -2140,7 +2140,7 @@ void startThreadF(F&& func) {
}
};
Thing* t = new Thing(std::move(func));
g_network->startThread(Thing::start, t);
return g_network->startThread(Thing::start, t);
}
TEST_CASE("/flow/Net2/ThreadSafeQueue/Interface") {
@ -2168,6 +2168,7 @@ TEST_CASE("/flow/Net2/ThreadSafeQueue/Interface") {
struct QueueTestThreadState {
QueueTestThreadState(int threadId, int toProduce) : threadId(threadId), toProduce(toProduce) {}
int threadId;
THREAD_HANDLE handle;
int toProduce;
int produced = 0;
Promise<Void> doneProducing;
@ -2186,6 +2187,8 @@ struct QueueTestThreadState {
TEST_CASE("/flow/Net2/ThreadSafeQueue/Threaded") {
// Uses ThreadSafeQueue from multiple threads. Verifies that all pushed elements are popped, maintaining the
// ordering within a thread.
noUnseed = true; // multi-threading inherently non-deterministic
ThreadSafeQueue<int> queue;
state std::vector<QueueTestThreadState> perThread = { QueueTestThreadState(0, 1000000),
QueueTestThreadState(1, 100000),
@ -2197,7 +2200,7 @@ TEST_CASE("/flow/Net2/ThreadSafeQueue/Threaded") {
auto& s = perThread[t];
doneProducing.push_back(s.doneProducing.getFuture());
total += s.toProduce;
startThreadF([&queue, &s]() {
s.handle = startThreadF([&queue, &s]() {
printf("Thread%d\n", s.threadId);
int nextYield = 0;
while (s.produced < s.toProduce) {
@ -2228,7 +2231,14 @@ TEST_CASE("/flow/Net2/ThreadSafeQueue/Threaded") {
wait(waitForAll(doneProducing));
for (int t = 0; t < std::size(perThread); ++t) {
// Make sure we continue on the main thread.
Promise<Void> signal;
state Future<Void> doneConsuming = signal.getFuture();
g_network->onMainThread(std::move(signal), TaskPriority::DefaultOnMainThread);
wait(doneConsuming);
for (int t = 0; t < perThread.size(); ++t) {
waitThread(perThread[t].handle);
perThread[t].checkDone();
}
return Void();
@ -2238,6 +2248,7 @@ TEST_CASE("/flow/Net2/ThreadSafeQueue/Threaded") {
// satisfy this requirement yet.
TEST_CASE("noSim/flow/Net2/onMainThreadFIFO") {
// Verifies that signals processed by onMainThread() are executed in order.
noUnseed = true; // multi-threading inherently non-deterministic
state std::vector<QueueTestThreadState> perThread = { QueueTestThreadState(0, 1000000),
QueueTestThreadState(1, 100000),
@ -2246,7 +2257,7 @@ TEST_CASE("noSim/flow/Net2/onMainThreadFIFO") {
for (int t = 0; t < perThread.size(); ++t) {
auto& s = perThread[t];
doneProducing.push_back(s.doneProducing.getFuture());
startThreadF([&s]() {
s.handle = startThreadF([&s]() {
int nextYield = 0;
while (s.produced < s.toProduce) {
if (nextYield-- == 0) {
@ -2267,7 +2278,8 @@ TEST_CASE("noSim/flow/Net2/onMainThreadFIFO") {
g_network->onMainThread(std::move(signal), TaskPriority::DefaultOnMainThread);
wait(doneConsuming);
for (int t = 0; t < std::size(perThread); ++t) {
for (int t = 0; t < perThread.size(); ++t) {
waitThread(perThread[t].handle);
perThread[t].checkDone();
}
return Void();
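	// With startThreadF() now returning THREAD_HANDLE, the pattern used by both tests above is, roughly
	// ('work' is a hypothetical callable, not taken from this code):
	//   THREAD_HANDLE h = startThreadF([]() { /* work */ });
	//   ...signal and wait for completion on the main thread...
	//   waitThread(h); // join the thread before tearing down state it may still reference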

View File

@ -19,8 +19,6 @@
*/
#ifndef FLOW_BLOB_CIPHER_H
#define FLOW_BLOB_CIPHER_H
#include "flow/ProtocolVersion.h"
#include "flow/serialize.h"
#pragma once
#include "flow/Arena.h"
@ -28,10 +26,14 @@
#include "flow/FastRef.h"
#include "flow/flow.h"
#include "flow/genericactors.actor.h"
#include "flow/Knobs.h"
#include "flow/network.h"
#include "flow/ProtocolVersion.h"
#include "flow/serialize.h"
#include <boost/functional/hash.hpp>
#include <cinttypes>
#include <limits>
#include <memory>
#include <openssl/aes.h>
#include <openssl/engine.h>
@ -216,15 +218,20 @@ public:
BlobCipherKey(const EncryptCipherDomainId& domainId,
const EncryptCipherBaseKeyId& baseCiphId,
const uint8_t* baseCiph,
int baseCiphLen);
int baseCiphLen,
const int64_t refreshAt,
                  const int64_t expireAt);
BlobCipherKey(const EncryptCipherDomainId& domainId,
const EncryptCipherBaseKeyId& baseCiphId,
const uint8_t* baseCiph,
int baseCiphLen,
const EncryptCipherRandomSalt& salt);
const EncryptCipherRandomSalt& salt,
const int64_t refreshAt,
const int64_t expireAt);
uint8_t* data() const { return cipher.get(); }
uint64_t getCreationTime() const { return creationTime; }
    int64_t getRefreshAtTS() const { return refreshAtTS; }
    int64_t getExpireAtTS() const { return expireAtTS; }
EncryptCipherDomainId getDomainId() const { return encryptDomainId; }
EncryptCipherRandomSalt getSalt() const { return randomSalt; }
EncryptCipherBaseKeyId getBaseCipherId() const { return baseCipherId; }
@ -243,6 +250,20 @@ public:
randomSalt == details.salt;
}
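    // 'Freshness' vs 'liveness': needsRefresh() gates getLatestCipherKey() (the key handed out for new
    // encryptions), while isExpired() additionally gates getCipherKey() lookups; a timestamp of
    // std::numeric_limits<int64_t>::max() disables the corresponding check.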
inline bool needsRefresh() {
if (refreshAtTS == std::numeric_limits<int64_t>::max()) {
return false;
}
        return now() >= refreshAtTS;
}
inline bool isExpired() {
if (expireAtTS == std::numeric_limits<int64_t>::max()) {
return false;
}
        return now() >= expireAtTS;
}
void reset();
private:
@ -254,16 +275,20 @@ private:
EncryptCipherBaseKeyId baseCipherId;
// Random salt used for encryption cipher key derivation
EncryptCipherRandomSalt randomSalt;
// Creation timestamp for the derived encryption cipher key
uint64_t creationTime;
// Derived encryption cipher key
std::unique_ptr<uint8_t[]> cipher;
// CipherKey needs refreshAt
int64_t refreshAtTS;
// CipherKey is valid until
int64_t expireAtTS;
void initKey(const EncryptCipherDomainId& domainId,
const uint8_t* baseCiph,
int baseCiphLen,
const EncryptCipherBaseKeyId& baseCiphId,
const EncryptCipherRandomSalt& salt);
const EncryptCipherRandomSalt& salt,
const int64_t refreshAt,
const int64_t expireAt);
void applyHmacSha256Derivation();
};
@ -326,7 +351,9 @@ public:
Reference<BlobCipherKey> insertBaseCipherKey(const EncryptCipherBaseKeyId& baseCipherId,
const uint8_t* baseCipher,
int baseCipherLen);
int baseCipherLen,
const int64_t refreshAt,
const int64_t expireAt);
// API enables inserting base encryption cipher details to the BlobCipherKeyIdCache
// Given cipherKeys are immutable, attempting to re-insert same 'identical' cipherKey
@ -341,7 +368,9 @@ public:
Reference<BlobCipherKey> insertBaseCipherKey(const EncryptCipherBaseKeyId& baseCipherId,
const uint8_t* baseCipher,
int baseCipherLen,
const EncryptCipherRandomSalt& salt);
const EncryptCipherRandomSalt& salt,
const int64_t refreshAt,
const int64_t expireAt);
// API cleanup the cache by dropping all cached cipherKeys
void cleanup();
@ -377,7 +406,9 @@ public:
Reference<BlobCipherKey> insertCipherKey(const EncryptCipherDomainId& domainId,
const EncryptCipherBaseKeyId& baseCipherId,
const uint8_t* baseCipher,
int baseCipherLen);
int baseCipherLen,
const int64_t refreshAt,
const int64_t expireAt);
// Enable clients to insert base encryption cipher details to the BlobCipherKeyCache.
// The cipherKeys are indexed using 'baseCipherId', given cipherKeys are immutable,
@ -394,7 +425,9 @@ public:
const EncryptCipherBaseKeyId& baseCipherId,
const uint8_t* baseCipher,
int baseCipherLen,
const EncryptCipherRandomSalt& salt);
const EncryptCipherRandomSalt& salt,
const int64_t refreshAt,
const int64_t expireAt);
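
    // Usage sketch (the 'cache' handle and ids below are hypothetical): the salt-free overload above is
    // the typical path when freshly fetched base cipher material is registered, e.g.
    //   cache->insertCipherKey(domainId, baseCipherId, baseCipher, baseCipherLen,
    //                          refreshAt /*stop using for new encryptions*/,
    //                          expireAt  /*stop using entirely*/);
    // while this salt-qualified overload is used when re-registering a key whose salt is already known,
    // such as one reconstructed from an encryption header during decryption.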
// API returns the last insert cipherKey for a given encryption domain Id.
// If domain Id is invalid, it would throw 'encrypt_invalid_id' exception,

View File

@ -1,3 +1,6 @@
[[knobs]]
enable_encryption = false
[[test]]
testTitle = 'EncryptDecrypt'