Merge pull request #10831 from sfc-gh-yiwu/ear_timeout

EaR: Handle KMS timeout in storage server and commit proxy
This commit is contained in:
Yi Wu 2023-08-28 20:59:22 -07:00 committed by GitHub
commit 8d7f2e84ed
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 399 additions and 130 deletions

View File

@ -317,6 +317,7 @@ void ClientKnobs::initialize(Randomize randomize) {
init( ENCRYPT_HEADER_AES_CTR_NO_AUTH_VERSION, 1 );
init( ENCRYPT_HEADER_AES_CTR_AES_CMAC_AUTH_VERSION, 1 );
init( ENCRYPT_HEADER_AES_CTR_HMAC_SHA_AUTH_VERSION, 1 );
init( ENCRYPT_GET_CIPHER_KEY_LONG_REQUEST_THRESHOLD, 6.0);
init( REST_KMS_ALLOW_NOT_SECURE_CONNECTION, false ); if ( randomize && BUGGIFY ) REST_KMS_ALLOW_NOT_SECURE_CONNECTION = !REST_KMS_ALLOW_NOT_SECURE_CONNECTION;
init( SIM_KMS_VAULT_MAX_KEYS, 4096 );

View File

@ -575,6 +575,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( TENANT_ID_REQUEST_MAX_QUEUE_SIZE, 1e6 );
init( BLOB_GRANULE_LOCATION_MAX_QUEUE_SIZE, 1e5 ); if ( randomize && BUGGIFY ) BLOB_GRANULE_LOCATION_MAX_QUEUE_SIZE = 100;
init( COMMIT_PROXY_LIVENESS_TIMEOUT, 20.0 );
init( COMMIT_PROXY_MAX_LIVENESS_TIMEOUT, 600.0 ); if ( randomize && BUGGIFY ) COMMIT_PROXY_MAX_LIVENESS_TIMEOUT = 20.0;
init( COMMIT_TRANSACTION_BATCH_INTERVAL_FROM_IDLE, 0.0005 ); if( randomize && BUGGIFY ) COMMIT_TRANSACTION_BATCH_INTERVAL_FROM_IDLE = 0.005;
init( COMMIT_TRANSACTION_BATCH_INTERVAL_MIN, 0.001 ); if( randomize && BUGGIFY ) COMMIT_TRANSACTION_BATCH_INTERVAL_MIN = 0.1;

View File

@ -316,6 +316,7 @@ public:
int ENCRYPT_HEADER_AES_CTR_NO_AUTH_VERSION;
int ENCRYPT_HEADER_AES_CTR_AES_CMAC_AUTH_VERSION;
int ENCRYPT_HEADER_AES_CTR_HMAC_SHA_AUTH_VERSION;
double ENCRYPT_GET_CIPHER_KEY_LONG_REQUEST_THRESHOLD;
// REST KMS configurations
bool REST_KMS_ALLOW_NOT_SECURE_CONNECTION;

View File

@ -43,6 +43,36 @@ struct TextAndHeaderCipherKeys {
Reference<BlobCipherKey> cipherHeaderKey;
};
class GetEncryptCipherKeysMonitor : public ReferenceCounted<GetEncryptCipherKeysMonitor> {
public:
GetEncryptCipherKeysMonitor() = default;
Reference<AsyncVar<bool>> degraded() const { return degradedVar; }
template <class T>
Future<T> monitor(Future<T> actor);
ActiveCounter<int>::Releaser handleLongRunningRequest() {
// Check if we are transiting into degraded state before we bump longRunningRequests counter.
if (longRunningRequests.getValue() == 0) {
ASSERT(degradedVar->get() == false);
degradedVar->set(true);
}
return longRunningRequests.take(1, [this]() {
// Releaser callback fires after we decrease the longRunningRequests counter.
// Check if we are transiting away from degraded state.
if (longRunningRequests.getValue() == 0) {
ASSERT(degradedVar->get() == true);
degradedVar->set(false);
}
});
}
private:
Reference<AsyncVar<bool>> degradedVar = makeReference<AsyncVar<bool>>(false);
ActiveCounter<int> longRunningRequests = { 0 };
};
template <class T>
class GetEncryptCipherKeys {
public:
@ -52,14 +82,17 @@ public:
static Future<std::unordered_map<EncryptCipherDomainId, Reference<BlobCipherKey>>> getLatestEncryptCipherKeys(
Reference<AsyncVar<T> const> db,
std::unordered_set<EncryptCipherDomainId> domainIds,
BlobCipherMetrics::UsageType usageType);
BlobCipherMetrics::UsageType usageType,
Reference<GetEncryptCipherKeysMonitor> monitor = {});
// Get latest cipher key for given a encryption domain. It tries to get the cipher key from the local cache.
// In case of cache miss, it fetches the cipher key from EncryptKeyProxy and put the result in the local cache
// before return.
static Future<Reference<BlobCipherKey>> getLatestEncryptCipherKey(Reference<AsyncVar<T> const> db,
EncryptCipherDomainId domainId,
BlobCipherMetrics::UsageType usageType);
static Future<Reference<BlobCipherKey>> getLatestEncryptCipherKey(
Reference<AsyncVar<T> const> db,
EncryptCipherDomainId domainId,
BlobCipherMetrics::UsageType usageType,
Reference<GetEncryptCipherKeysMonitor> monitor = {});
// Get cipher keys specified by the list of cipher details. It tries to get the cipher keys from local cache.
// In case of cache miss, it fetches the cipher keys from EncryptKeyProxy and put the result in the local cache
@ -67,22 +100,24 @@ public:
static Future<std::unordered_map<BlobCipherDetails, Reference<BlobCipherKey>>> getEncryptCipherKeys(
Reference<AsyncVar<T> const> db,
std::unordered_set<BlobCipherDetails> cipherDetails,
BlobCipherMetrics::UsageType usageType);
BlobCipherMetrics::UsageType usageType,
Reference<GetEncryptCipherKeysMonitor> monitor = {});
static Future<TextAndHeaderCipherKeys> getLatestEncryptCipherKeysForDomain(Reference<AsyncVar<T> const> db,
EncryptCipherDomainId domainId,
BlobCipherMetrics::UsageType usageType);
static Future<TextAndHeaderCipherKeys> getLatestEncryptCipherKeysForDomain(
Reference<AsyncVar<T> const> db,
EncryptCipherDomainId domainId,
BlobCipherMetrics::UsageType usageType,
Reference<GetEncryptCipherKeysMonitor> monitor = {});
static Future<TextAndHeaderCipherKeys> getLatestSystemEncryptCipherKeys(const Reference<AsyncVar<T> const>& db,
BlobCipherMetrics::UsageType usageType);
static Future<TextAndHeaderCipherKeys> getEncryptCipherKeys(Reference<AsyncVar<T> const> db,
BlobCipherEncryptHeader header,
BlobCipherMetrics::UsageType usageType);
static Future<TextAndHeaderCipherKeys> getLatestSystemEncryptCipherKeys(
const Reference<AsyncVar<T> const>& db,
BlobCipherMetrics::UsageType usageType,
Reference<GetEncryptCipherKeysMonitor> monitor = {});
static Future<TextAndHeaderCipherKeys> getEncryptCipherKeys(Reference<AsyncVar<T> const> db,
BlobCipherEncryptHeaderRef header,
BlobCipherMetrics::UsageType usageType);
BlobCipherMetrics::UsageType usageType,
Reference<GetEncryptCipherKeysMonitor> monitor = {});
};
#endif

View File

@ -39,6 +39,29 @@
#define DEBUG_GET_CIPHER false
namespace {
ACTOR template <class T>
Future<T> monitorGetEncryptCipherKeys(GetEncryptCipherKeysMonitor* self, Future<T> actor) {
Future<Void> timer = delay(CLIENT_KNOBS->ENCRYPT_GET_CIPHER_KEY_LONG_REQUEST_THRESHOLD, TaskPriority::DefaultDelay);
choose {
when(T t = wait(actor)) {
return t;
}
when(wait(timer)) {
// Bookkeeping number of long running actors using RAII and set degraded state accordingly.
state ActiveCounter<int>::Releaser r = self->handleLongRunningRequest();
T t = wait(actor);
return t;
}
}
}
} // anonymous namespace
template <class T>
Future<T> GetEncryptCipherKeysMonitor::monitor(Future<T> actor) {
return monitorGetEncryptCipherKeys(this, actor);
}
template <class T>
Optional<EncryptKeyProxyInterface> _getEncryptKeyProxyInterface(const Reference<AsyncVar<T> const>& db) {
if constexpr (std::is_same_v<T, ClientDBInfo>) {
@ -106,7 +129,7 @@ Future<EKPGetLatestBaseCipherKeysReply> _getUncachedLatestEncryptCipherKeys(Refe
}
ACTOR template <class T>
Future<std::unordered_map<EncryptCipherDomainId, Reference<BlobCipherKey>>> _getLatestEncryptCipherKeys(
Future<std::unordered_map<EncryptCipherDomainId, Reference<BlobCipherKey>>> _getLatestEncryptCipherKeysImpl(
Reference<AsyncVar<T> const> db,
std::unordered_set<EncryptCipherDomainId> domainIds,
BlobCipherMetrics::UsageType usageType) {
@ -168,13 +191,28 @@ Future<std::unordered_map<EncryptCipherDomainId, Reference<BlobCipherKey>>> _get
return cipherKeys;
}
template <class T>
Future<std::unordered_map<EncryptCipherDomainId, Reference<BlobCipherKey>>> _getLatestEncryptCipherKeys(
Reference<AsyncVar<T> const> db,
std::unordered_set<EncryptCipherDomainId> domainIds,
BlobCipherMetrics::UsageType usageType,
Reference<GetEncryptCipherKeysMonitor> monitor) {
Future<std::unordered_map<EncryptCipherDomainId, Reference<BlobCipherKey>>> actor =
_getLatestEncryptCipherKeysImpl(db, domainIds, usageType);
if (monitor.isValid()) {
actor = monitor->monitor(actor);
}
return actor;
}
ACTOR template <class T>
Future<Reference<BlobCipherKey>> _getLatestEncryptCipherKey(Reference<AsyncVar<T> const> db,
EncryptCipherDomainId domainId,
BlobCipherMetrics::UsageType usageType) {
BlobCipherMetrics::UsageType usageType,
Reference<GetEncryptCipherKeysMonitor> monitor) {
std::unordered_set<EncryptCipherDomainId> domainIds{ domainId };
std::unordered_map<EncryptCipherDomainId, Reference<BlobCipherKey>> cipherKey =
wait(_getLatestEncryptCipherKeys(db, domainIds, usageType));
wait(_getLatestEncryptCipherKeys(db, domainIds, usageType, monitor));
return cipherKey.at(domainId);
}
@ -232,7 +270,7 @@ Future<EKPGetBaseCipherKeysByIdsReply> _getUncachedEncryptCipherKeys(Reference<A
// In case of cache miss, it fetches the cipher keys from EncryptKeyProxy and put the result in the local cache
// before return.
ACTOR template <class T>
Future<std::unordered_map<BlobCipherDetails, Reference<BlobCipherKey>>> _getEncryptCipherKeys(
Future<std::unordered_map<BlobCipherDetails, Reference<BlobCipherKey>>> _getEncryptCipherKeysImpl(
Reference<AsyncVar<T> const> db,
std::unordered_set<BlobCipherDetails> cipherDetails,
BlobCipherMetrics::UsageType usageType) {
@ -305,14 +343,29 @@ Future<std::unordered_map<BlobCipherDetails, Reference<BlobCipherKey>>> _getEncr
return cipherKeys;
}
template <class T>
Future<std::unordered_map<BlobCipherDetails, Reference<BlobCipherKey>>> _getEncryptCipherKeys(
Reference<AsyncVar<T> const> db,
std::unordered_set<BlobCipherDetails> cipherDetails,
BlobCipherMetrics::UsageType usageType,
Reference<GetEncryptCipherKeysMonitor> monitor) {
Future<std::unordered_map<BlobCipherDetails, Reference<BlobCipherKey>>> actor =
_getEncryptCipherKeysImpl(db, cipherDetails, usageType);
if (monitor.isValid()) {
actor = monitor->monitor(actor);
}
return actor;
}
ACTOR template <class T>
Future<TextAndHeaderCipherKeys> _getLatestEncryptCipherKeysForDomain(Reference<AsyncVar<T> const> db,
EncryptCipherDomainId domainId,
BlobCipherMetrics::UsageType usageType) {
BlobCipherMetrics::UsageType usageType,
Reference<GetEncryptCipherKeysMonitor> monitor) {
// TODO: Do not fetch header cipher key if authentication is diabled.
std::unordered_set<EncryptCipherDomainId> domainIds = { domainId, ENCRYPT_HEADER_DOMAIN_ID };
std::unordered_map<EncryptCipherDomainId, Reference<BlobCipherKey>> cipherKeys =
wait(_getLatestEncryptCipherKeys(db, domainIds, usageType));
wait(_getLatestEncryptCipherKeys(db, domainIds, usageType, monitor));
ASSERT(cipherKeys.count(domainId) > 0);
ASSERT(cipherKeys.count(ENCRYPT_HEADER_DOMAIN_ID) > 0);
TextAndHeaderCipherKeys result{ cipherKeys.at(domainId), cipherKeys.at(ENCRYPT_HEADER_DOMAIN_ID) };
@ -323,48 +376,16 @@ Future<TextAndHeaderCipherKeys> _getLatestEncryptCipherKeysForDomain(Reference<A
template <class T>
Future<TextAndHeaderCipherKeys> _getLatestSystemEncryptCipherKeys(const Reference<AsyncVar<T> const>& db,
BlobCipherMetrics::UsageType usageType) {
return _getLatestEncryptCipherKeysForDomain(db, SYSTEM_KEYSPACE_ENCRYPT_DOMAIN_ID, usageType);
}
ACTOR template <class T>
Future<TextAndHeaderCipherKeys> _getEncryptCipherKeys(Reference<AsyncVar<T> const> db,
BlobCipherEncryptHeader header,
BlobCipherMetrics::UsageType usageType) {
state bool authenticatedEncryption = header.flags.authTokenMode != ENCRYPT_HEADER_AUTH_TOKEN_MODE_NONE;
ASSERT(header.cipherTextDetails.isValid());
ASSERT(!authenticatedEncryption || header.cipherHeaderDetails.isValid());
std::unordered_set<BlobCipherDetails> cipherDetails{ header.cipherTextDetails };
if (authenticatedEncryption) {
cipherDetails.insert(header.cipherHeaderDetails);
}
std::unordered_map<BlobCipherDetails, Reference<BlobCipherKey>> cipherKeys =
wait(_getEncryptCipherKeys(db, cipherDetails, usageType));
TextAndHeaderCipherKeys result;
auto setCipherKey = [&](const BlobCipherDetails& details, TextAndHeaderCipherKeys& result) {
ASSERT(details.isValid());
auto iter = cipherKeys.find(details);
ASSERT(iter != cipherKeys.end() && iter->second.isValid());
isEncryptHeaderDomain(details.encryptDomainId) ? result.cipherHeaderKey = iter->second
: result.cipherTextKey = iter->second;
};
setCipherKey(header.cipherTextDetails, result);
if (authenticatedEncryption) {
setCipherKey(header.cipherHeaderDetails, result);
}
ASSERT(result.cipherTextKey.isValid() && (!authenticatedEncryption || result.cipherHeaderKey.isValid()));
return result;
BlobCipherMetrics::UsageType usageType,
Reference<GetEncryptCipherKeysMonitor> monitor) {
return _getLatestEncryptCipherKeysForDomain(db, SYSTEM_KEYSPACE_ENCRYPT_DOMAIN_ID, usageType, monitor);
}
ACTOR template <class T>
Future<TextAndHeaderCipherKeys> _getEncryptCipherKeys(Reference<AsyncVar<T> const> db,
BlobCipherEncryptHeaderRef header,
BlobCipherMetrics::UsageType usageType) {
BlobCipherMetrics::UsageType usageType,
Reference<GetEncryptCipherKeysMonitor> monitor) {
state bool authenticatedEncryption = header.getAuthTokenMode() != ENCRYPT_HEADER_AUTH_TOKEN_MODE_NONE;
state EncryptHeaderCipherDetails details = header.getCipherDetails();
@ -378,7 +399,7 @@ Future<TextAndHeaderCipherKeys> _getEncryptCipherKeys(Reference<AsyncVar<T> cons
}
std::unordered_map<BlobCipherDetails, Reference<BlobCipherKey>> cipherKeys =
wait(_getEncryptCipherKeys(db, cipherDetails, usageType));
wait(_getEncryptCipherKeys(db, cipherDetails, usageType, monitor));
TextAndHeaderCipherKeys result;
auto setCipherKey = [&](const BlobCipherDetails& details, TextAndHeaderCipherKeys& result) {
@ -401,53 +422,53 @@ template <class T>
Future<std::unordered_map<EncryptCipherDomainId, Reference<BlobCipherKey>>>
GetEncryptCipherKeys<T>::getLatestEncryptCipherKeys(Reference<AsyncVar<T> const> db,
std::unordered_set<EncryptCipherDomainId> domainIds,
BlobCipherMetrics::UsageType usageType) {
return _getLatestEncryptCipherKeys(db, domainIds, usageType);
BlobCipherMetrics::UsageType usageType,
Reference<GetEncryptCipherKeysMonitor> monitor) {
return _getLatestEncryptCipherKeys(db, domainIds, usageType, monitor);
}
template <class T>
Future<Reference<BlobCipherKey>> GetEncryptCipherKeys<T>::getLatestEncryptCipherKey(
Reference<AsyncVar<T> const> db,
EncryptCipherDomainId domainId,
BlobCipherMetrics::UsageType usageType) {
return _getLatestEncryptCipherKey(db, domainId, usageType);
BlobCipherMetrics::UsageType usageType,
Reference<GetEncryptCipherKeysMonitor> monitor) {
return _getLatestEncryptCipherKey(db, domainId, usageType, monitor);
}
template <class T>
Future<std::unordered_map<BlobCipherDetails, Reference<BlobCipherKey>>> GetEncryptCipherKeys<T>::getEncryptCipherKeys(
Reference<AsyncVar<T> const> db,
std::unordered_set<BlobCipherDetails> cipherDetails,
BlobCipherMetrics::UsageType usageType) {
return _getEncryptCipherKeys(db, cipherDetails, usageType);
BlobCipherMetrics::UsageType usageType,
Reference<GetEncryptCipherKeysMonitor> monitor) {
return _getEncryptCipherKeys(db, cipherDetails, usageType, monitor);
}
template <class T>
Future<TextAndHeaderCipherKeys> GetEncryptCipherKeys<T>::getLatestEncryptCipherKeysForDomain(
Reference<AsyncVar<T> const> db,
EncryptCipherDomainId domainId,
BlobCipherMetrics::UsageType usageType) {
return _getLatestEncryptCipherKeysForDomain(db, domainId, usageType);
BlobCipherMetrics::UsageType usageType,
Reference<GetEncryptCipherKeysMonitor> monitor) {
return _getLatestEncryptCipherKeysForDomain(db, domainId, usageType, monitor);
}
template <class T>
Future<TextAndHeaderCipherKeys> GetEncryptCipherKeys<T>::getLatestSystemEncryptCipherKeys(
const Reference<AsyncVar<T> const>& db,
BlobCipherMetrics::UsageType usageType) {
return _getLatestSystemEncryptCipherKeys(db, usageType);
BlobCipherMetrics::UsageType usageType,
Reference<GetEncryptCipherKeysMonitor> monitor) {
return _getLatestSystemEncryptCipherKeys(db, usageType, monitor);
}
template <class T>
Future<TextAndHeaderCipherKeys> GetEncryptCipherKeys<T>::getEncryptCipherKeys(Reference<AsyncVar<T> const> db,
BlobCipherEncryptHeader header,
BlobCipherMetrics::UsageType usageType) {
return _getEncryptCipherKeys(db, header, usageType);
}
template <class T>
Future<TextAndHeaderCipherKeys> GetEncryptCipherKeys<T>::getEncryptCipherKeys(Reference<AsyncVar<T> const> db,
BlobCipherEncryptHeaderRef header,
BlobCipherMetrics::UsageType usageType) {
return _getEncryptCipherKeys(db, header, usageType);
Future<TextAndHeaderCipherKeys> GetEncryptCipherKeys<T>::getEncryptCipherKeys(
Reference<AsyncVar<T> const> db,
BlobCipherEncryptHeaderRef header,
BlobCipherMetrics::UsageType usageType,
Reference<GetEncryptCipherKeysMonitor> monitor) {
return _getEncryptCipherKeys(db, header, usageType, monitor);
}
#include "flow/unactorcompiler.h"

View File

@ -522,6 +522,7 @@ public:
int TENANT_ID_REQUEST_MAX_QUEUE_SIZE;
int BLOB_GRANULE_LOCATION_MAX_QUEUE_SIZE;
double COMMIT_PROXY_LIVENESS_TIMEOUT;
double COMMIT_PROXY_MAX_LIVENESS_TIMEOUT;
double COMMIT_TRANSACTION_BATCH_INTERVAL_FROM_IDLE;
double COMMIT_TRANSACTION_BATCH_INTERVAL_MIN;

View File

@ -1031,7 +1031,7 @@ ACTOR Future<Void> getResolution(CommitBatchContext* self) {
}
}
getCipherKeys = GetEncryptCipherKeys<ServerDBInfo>::getLatestEncryptCipherKeys(
pProxyCommitData->db, encryptDomainIds, BlobCipherMetrics::TLOG);
pProxyCommitData->db, encryptDomainIds, BlobCipherMetrics::TLOG, pProxyCommitData->encryptionMonitor);
}
self->releaseFuture = releaseResolvingAfter(pProxyCommitData, self->releaseDelay, self->localBatchNumber);
@ -1725,8 +1725,8 @@ ACTOR Future<WriteMutationRefVar> writeMutationEncryptedMutation(CommitBatchCont
ASSERT(encryptedMutation.isEncrypted());
Reference<AsyncVar<ServerDBInfo> const> dbInfo = self->pProxyCommitData->db;
headerRef = encryptedMutation.configurableEncryptionHeader();
TextAndHeaderCipherKeys cipherKeys =
wait(GetEncryptCipherKeys<ServerDBInfo>::getEncryptCipherKeys(dbInfo, headerRef, BlobCipherMetrics::TLOG));
TextAndHeaderCipherKeys cipherKeys = wait(GetEncryptCipherKeys<ServerDBInfo>::getEncryptCipherKeys(
dbInfo, headerRef, BlobCipherMetrics::TLOG, self->pProxyCommitData->encryptionMonitor));
decryptedMutation = encryptedMutation.decrypt(cipherKeys, *arena, BlobCipherMetrics::TLOG);
ASSERT(decryptedMutation.type == mutation->type);
@ -3446,9 +3446,11 @@ ACTOR Future<Void> processCompleteTransactionStateRequest(TransactionStateResolv
state std::unordered_map<EncryptCipherDomainId, Reference<BlobCipherKey>> systemCipherKeys;
if (pContext->pCommitData->encryptMode.isEncryptionEnabled()) {
std::unordered_map<EncryptCipherDomainId, Reference<BlobCipherKey>> cks =
wait(GetEncryptCipherKeys<ServerDBInfo>::getLatestEncryptCipherKeys(
pContext->pCommitData->db, ENCRYPT_CIPHER_SYSTEM_DOMAINS, BlobCipherMetrics::TLOG));
std::unordered_map<EncryptCipherDomainId, Reference<BlobCipherKey>> cks = wait(
GetEncryptCipherKeys<ServerDBInfo>::getLatestEncryptCipherKeys(pContext->pCommitData->db,
ENCRYPT_CIPHER_SYSTEM_DOMAINS,
BlobCipherMetrics::TLOG,
pContext->pCommitData->encryptionMonitor));
systemCipherKeys = cks;
}
@ -3791,12 +3793,20 @@ ACTOR Future<Void> commitProxyServerCore(CommitProxyInterface proxy,
masterLifetime.isEqual(commitData.db->get().masterLifetime))) {
if (trs.size() || lastCommitComplete.isReady()) {
// When encryption is enabled, cipher key fetching issue (e.g KMS outage) is detected by the
// encryption monitor. In that case, commit timeout is expected and timeout error is suppressed. But
// we still want to trigger recovery occassionally (with the COMMIT_PROXY_MAX_LIVENESS_TIMEOUT), in
// the hope that the cipher key fetching issue could be resolve by recovery (e.g, if one CP have
// networking issue connecting to EKP, and recovery may exclude the CP).
lastCommitComplete = transformError(
timeoutError(
commitBatch(&commitData,
const_cast<std::vector<CommitTransactionRequest>*>(&batchedRequests.first),
batchBytes),
SERVER_KNOBS->COMMIT_PROXY_LIVENESS_TIMEOUT),
timeoutErrorIfCleared(
commitBatch(&commitData,
const_cast<std::vector<CommitTransactionRequest>*>(&batchedRequests.first),
batchBytes),
commitData.encryptionMonitor->degraded(),
SERVER_KNOBS->COMMIT_PROXY_LIVENESS_TIMEOUT),
SERVER_KNOBS->COMMIT_PROXY_MAX_LIVENESS_TIMEOUT),
timed_out(),
failed_to_progress());
addActor.send(lastCommitComplete);

View File

@ -18,6 +18,7 @@
* limitations under the License.
*/
#include "fdbclient/GetEncryptCipherKeys.h"
#include "fdbserver/ServerDBInfo.actor.h"
#include "fdbserver/IKeyValueStore.h"
#include "flow/flow.h"
@ -32,7 +33,8 @@ IKeyValueStore* openKVStore(KeyValueStoreType storeType,
bool openRemotely,
Reference<AsyncVar<ServerDBInfo> const> db,
Optional<EncryptionAtRestMode> encryptionMode,
int64_t pageCacheBytes) {
int64_t pageCacheBytes,
Reference<GetEncryptCipherKeysMonitor> encryptionMonitor) {
// Only Redwood support encryption currently.
if (encryptionMode.present() && encryptionMode.get().isEncryptionEnabled() &&
storeType != KeyValueStoreType::SSD_REDWOOD_V1) {
@ -52,7 +54,7 @@ IKeyValueStore* openKVStore(KeyValueStoreType storeType,
case KeyValueStoreType::MEMORY:
return keyValueStoreMemory(filename, logID, memoryLimit);
case KeyValueStoreType::SSD_REDWOOD_V1:
return keyValueStoreRedwoodV1(filename, logID, db, encryptionMode, pageCacheBytes);
return keyValueStoreRedwoodV1(filename, logID, db, encryptionMode, pageCacheBytes, encryptionMonitor);
case KeyValueStoreType::SSD_ROCKSDB_V1:
return keyValueStoreRocksDB(filename, logID, storeType);
case KeyValueStoreType::SSD_SHARDED_ROCKSDB:

View File

@ -20,6 +20,7 @@
#include "fdbclient/CommitTransaction.h"
#include "fdbclient/FDBTypes.h"
#include "fdbclient/GetEncryptCipherKeys.h"
#include "fdbclient/RandomKeyValueUtils.h"
#include "fdbclient/Tuple.h"
#include "fdbrpc/DDSketch.h"
@ -5043,10 +5044,12 @@ public:
Reference<AsyncVar<ServerDBInfo> const> db,
Optional<EncryptionAtRestMode> expectedEncryptionMode,
EncodingType encodingType = EncodingType::MAX_ENCODING_TYPE,
Reference<IPageEncryptionKeyProvider> keyProvider = {})
Reference<IPageEncryptionKeyProvider> keyProvider = {},
Reference<GetEncryptCipherKeysMonitor> encryptionMonitor = {})
: m_pager(pager), m_db(db), m_expectedEncryptionMode(expectedEncryptionMode), m_encodingType(encodingType),
m_enforceEncodingType(false), m_keyProvider(keyProvider), m_pBuffer(nullptr), m_mutationCount(0), m_name(name),
m_logID(logID), m_pBoundaryVerifier(DecodeBoundaryVerifier::getVerifier(name)) {
m_enforceEncodingType(false), m_keyProvider(keyProvider), m_encryptionMonitor(encryptionMonitor),
m_pBuffer(nullptr), m_mutationCount(0), m_name(name), m_logID(logID),
m_pBoundaryVerifier(DecodeBoundaryVerifier::getVerifier(name)) {
m_pDecodeCacheMemory = m_pager->getPageCachePenaltySource();
m_lazyClearActor = 0;
m_init = init_impl(this);
@ -5221,14 +5224,14 @@ public:
case EncodingType::AESEncryption:
ASSERT(m_expectedEncryptionMode.present());
ASSERT(m_db.isValid());
m_keyProvider =
makeReference<AESEncryptionKeyProvider<AESEncryption>>(m_db, m_expectedEncryptionMode.get());
m_keyProvider = makeReference<AESEncryptionKeyProvider<AESEncryption>>(
m_db, m_expectedEncryptionMode.get(), m_encryptionMonitor);
break;
case EncodingType::AESEncryptionWithAuth:
ASSERT(m_expectedEncryptionMode.present());
ASSERT(m_db.isValid());
m_keyProvider = makeReference<AESEncryptionKeyProvider<AESEncryptionWithAuth>>(
m_db, m_expectedEncryptionMode.get());
m_db, m_expectedEncryptionMode.get(), m_encryptionMonitor);
break;
default:
ASSERT(false);
@ -5658,6 +5661,7 @@ private:
EncodingType m_encodingType;
bool m_enforceEncodingType;
Reference<IPageEncryptionKeyProvider> m_keyProvider;
Reference<GetEncryptCipherKeysMonitor> m_encryptionMonitor;
// Counter to update with DecodeCache memory usage
int64_t* m_pDecodeCacheMemory = nullptr;
@ -8018,7 +8022,8 @@ public:
Optional<EncryptionAtRestMode> encryptionMode,
EncodingType encodingType = EncodingType::MAX_ENCODING_TYPE,
Reference<IPageEncryptionKeyProvider> keyProvider = {},
int64_t pageCacheBytes = 0)
int64_t pageCacheBytes = 0,
Reference<GetEncryptCipherKeysMonitor> encryptionMonitor = {})
: m_filename(filename), prefetch(SERVER_KNOBS->REDWOOD_KVSTORE_RANGE_PREFETCH) {
if (!encryptionMode.present() || encryptionMode.get().isEncryptionEnabled()) {
ASSERT(keyProvider.isValid() || db.isValid());
@ -8050,7 +8055,8 @@ public:
remapCleanupWindowBytes,
SERVER_KNOBS->REDWOOD_EXTENT_CONCURRENT_READS,
false);
m_tree = new VersionedBTree(pager, filename, logID, db, encryptionMode, encodingType, keyProvider);
m_tree = new VersionedBTree(
pager, filename, logID, db, encryptionMode, encodingType, keyProvider, encryptionMonitor);
m_init = catchError(init_impl(this));
}
@ -8345,14 +8351,16 @@ IKeyValueStore* keyValueStoreRedwoodV1(std::string const& filename,
UID logID,
Reference<AsyncVar<ServerDBInfo> const> db,
Optional<EncryptionAtRestMode> encryptionMode,
int64_t pageCacheBytes) {
int64_t pageCacheBytes,
Reference<GetEncryptCipherKeysMonitor> encryptionMonitor) {
return new KeyValueStoreRedwood(filename,
logID,
db,
encryptionMode,
EncodingType::MAX_ENCODING_TYPE,
Reference<IPageEncryptionKeyProvider>(),
pageCacheBytes);
pageCacheBytes,
encryptionMonitor);
}
int randomSize(int max) {

View File

@ -20,6 +20,7 @@
#ifndef FDBSERVER_IKEYVALUESTORE_H
#define FDBSERVER_IKEYVALUESTORE_H
#include "fdbclient/GetEncryptCipherKeys.h"
#pragma once
#include "fdbclient/FDBTypes.h"
@ -35,7 +36,8 @@ extern IKeyValueStore* keyValueStoreRedwoodV1(std::string const& filename,
UID logID,
Reference<AsyncVar<struct ServerDBInfo> const> db = {},
Optional<EncryptionAtRestMode> encryptionMode = {},
int64_t pageCacheBytes = 0);
int64_t pageCacheBytes = 0,
Reference<GetEncryptCipherKeysMonitor> encryptionMonitor = {});
extern IKeyValueStore* keyValueStoreRocksDB(std::string const& path,
UID logID,
KeyValueStoreType storeType,
@ -76,7 +78,8 @@ IKeyValueStore* openKVStore(KeyValueStoreType storeType,
bool openRemotely = false,
Reference<AsyncVar<struct ServerDBInfo> const> db = {},
Optional<EncryptionAtRestMode> encryptionMode = {},
int64_t pageCacheBytes = 0);
int64_t pageCacheBytes = 0,
Reference<GetEncryptCipherKeysMonitor> encryptionMonitor = {});
void GenerateIOLogChecksumFile(std::string filename);
Future<Void> KVFileCheck(std::string const& filename, bool const& integrity);

View File

@ -304,8 +304,10 @@ public:
const StringRef systemKeysPrefix = systemKeys.begin;
AESEncryptionKeyProvider(Reference<AsyncVar<ServerDBInfo> const> db, EncryptionAtRestMode encryptionMode)
: db(db), encryptionMode(encryptionMode) {
AESEncryptionKeyProvider(Reference<AsyncVar<ServerDBInfo> const> db,
EncryptionAtRestMode encryptionMode,
Reference<GetEncryptCipherKeysMonitor> monitor)
: db(db), encryptionMode(encryptionMode), monitor(monitor) {
ASSERT(encryptionMode != EncryptionAtRestMode::DISABLED);
ASSERT(db.isValid());
}
@ -324,7 +326,7 @@ public:
state TextAndHeaderCipherKeys cipherKeys;
BlobCipherEncryptHeaderRef headerRef = Encoder::getEncryptionHeaderRef(encodingHeader);
TextAndHeaderCipherKeys cks = wait(GetEncryptCipherKeys<ServerDBInfo>::getEncryptCipherKeys(
self->db, headerRef, BlobCipherMetrics::KV_REDWOOD));
self->db, headerRef, BlobCipherMetrics::KV_REDWOOD, self->monitor));
cipherKeys = cks;
EncryptionKey encryptionKey;
encryptionKey.aesKey = cipherKeys;
@ -343,7 +345,7 @@ public:
ASSERT(self->encryptionMode == EncryptionAtRestMode::DOMAIN_AWARE || domainId < 0);
TextAndHeaderCipherKeys cipherKeys =
wait(GetEncryptCipherKeys<ServerDBInfo>::getLatestEncryptCipherKeysForDomain(
self->db, domainId, BlobCipherMetrics::KV_REDWOOD));
self->db, domainId, BlobCipherMetrics::KV_REDWOOD, self->monitor));
EncryptionKey encryptionKey;
encryptionKey.aesKey = cipherKeys;
return encryptionKey;
@ -382,6 +384,7 @@ public:
private:
Reference<AsyncVar<ServerDBInfo> const> db;
EncryptionAtRestMode encryptionMode;
Reference<GetEncryptCipherKeysMonitor> monitor;
};
#include "flow/unactorcompiler.h"

View File

@ -26,6 +26,7 @@
#define FDBSERVER_PROXYCOMMITDATA_ACTOR_H
#include "fdbclient/FDBTypes.h"
#include "fdbclient/GetEncryptCipherKeys.h"
#include "fdbclient/Tenant.h"
#include "fdbrpc/Stats.h"
#include "fdbserver/Knobs.h"
@ -261,6 +262,7 @@ struct ProxyCommitData {
int localTLogCount = -1;
EncryptionAtRestMode encryptMode;
Reference<GetEncryptCipherKeysMonitor> encryptionMonitor;
PromiseStream<ExpectedIdempotencyIdCountForKey> expectedIdempotencyIdCountForKey;
Standalone<VectorRef<MutationRef>> idempotencyClears;
@ -337,8 +339,9 @@ struct ProxyCommitData {
stats(dbgid, &version, &committedVersion, &commitBatchesMemBytesCount, &tenantMap), master(master),
logAdapter(nullptr), txnStateStore(nullptr), committedVersion(recoveryTransactionVersion),
minKnownCommittedVersion(0), version(0), lastVersionTime(0), commitVersionRequestNumber(1),
mostRecentProcessedRequestNumber(0), firstProxy(firstProxy), encryptMode(encryptMode), provisional(provisional),
lastCoalesceTime(0), locked(false), commitBatchInterval(SERVER_KNOBS->COMMIT_TRANSACTION_BATCH_INTERVAL_MIN),
mostRecentProcessedRequestNumber(0), firstProxy(firstProxy), encryptMode(encryptMode),
encryptionMonitor(makeReference<GetEncryptCipherKeysMonitor>()), provisional(provisional), lastCoalesceTime(0),
locked(false), commitBatchInterval(SERVER_KNOBS->COMMIT_TRANSACTION_BATCH_INTERVAL_MIN),
localCommitBatchesStarted(0), getConsistentReadVersion(getConsistentReadVersion), commit(commit),
cx(openDBOnServer(db, TaskPriority::DefaultEndpoint, LockAware::True)), db(db),
singleKeyMutationEvent("SingleKeyMutation"_sr), lastTxsPop(0), popRemoteTxs(false), lastStartCommit(0),

View File

@ -1225,15 +1225,16 @@ ACTOR Future<Void> storageServer(IKeyValueStore* persistentData,
Version tssSeedVersion,
ReplyPromise<InitializeStorageReply> recruitReply,
Reference<AsyncVar<ServerDBInfo> const> db,
std::string folder);
std::string folder,
Reference<GetEncryptCipherKeysMonitor> encryptionMonitor);
ACTOR Future<Void> storageServer(
IKeyValueStore* persistentData,
StorageServerInterface ssi,
Reference<AsyncVar<ServerDBInfo> const> db,
std::string folder,
Promise<Void> recovered,
Reference<IClusterConnectionRecord>
connRecord); // changes pssi->id() to be the recovered ID); // changes pssi->id() to be the recovered ID
Reference<IClusterConnectionRecord> connRecord,
Reference<GetEncryptCipherKeysMonitor> encryptionMonitor); // changes pssi->id() to be the recovered ID
ACTOR Future<Void> masterServer(MasterInterface mi,
Reference<AsyncVar<ServerDBInfo> const> db,
Reference<AsyncVar<Optional<ClusterControllerFullInterface>> const> ccInterface,
@ -1365,6 +1366,37 @@ Future<T> ioTimeoutError(Future<T> what, double time, const char* context = null
}
}
ACTOR template <class T>
Future<T> ioTimeoutErrorIfCleared(Future<T> what,
double time,
Reference<AsyncVar<bool>> condition,
const char* context = nullptr) {
// Before simulation is sped up, IO operations can take a very long time so limit timeouts
// to not end until at least time after simulation is sped up.
if (g_network->isSimulated() && !g_simulator->speedUpSimulation) {
time += std::max(0.0, FLOW_KNOBS->SIM_SPEEDUP_AFTER_SECONDS - now());
}
Future<Void> end = lowPriorityDelayAfterCleared(condition, time);
choose {
when(T t = wait(what)) {
return t;
}
when(wait(end)) {
Error err = io_timeout();
if (isSimulatorProcessUnreliable()) {
err = err.asInjectedFault();
}
TraceEvent e(SevError, "IoTimeoutError");
e.error(err);
if (context != nullptr) {
e.detail("Context", context);
}
e.log();
throw err;
}
}
}
ACTOR template <class T>
Future<T> ioDegradedOrTimeoutError(Future<T> what,
double errTime,

View File

@ -1389,6 +1389,7 @@ public:
Optional<LatencyBandConfig> latencyBandConfig;
Optional<EncryptionAtRestMode> encryptionMode;
Reference<GetEncryptCipherKeysMonitor> getEncryptCipherKeysMonitor;
struct Counters : CommonStorageCounters {
@ -1611,7 +1612,8 @@ public:
StorageServer(IKeyValueStore* storage,
Reference<AsyncVar<ServerDBInfo> const> const& db,
StorageServerInterface const& ssi)
StorageServerInterface const& ssi,
Reference<GetEncryptCipherKeysMonitor> encryptionMonitor)
: shardAware(false), tlogCursorReadsLatencyHistogram(Histogram::getHistogram(STORAGESERVER_HISTOGRAM_GROUP,
TLOG_CURSOR_READS_LATENCY_HISTOGRAM,
Histogram::Unit::milliseconds)),
@ -1667,7 +1669,7 @@ public:
/*maxTagsTracked=*/SERVER_KNOBS->SS_THROTTLE_TAGS_TRACKED,
/*minRateTracked=*/SERVER_KNOBS->MIN_TAG_READ_PAGES_RATE *
CLIENT_KNOBS->TAG_THROTTLING_PAGE_SIZE),
busiestWriteTagContext(ssi.id()), counters(this),
busiestWriteTagContext(ssi.id()), getEncryptCipherKeysMonitor(encryptionMonitor), counters(this),
storageServerSourceTLogIDEventHolder(
makeReference<EventCacheHolder>(ssi.id().toString() + "/StorageServerSourceTLogID")),
tenantData(db) {
@ -11833,7 +11835,10 @@ ACTOR Future<Void> updateStorage(StorageServer* data) {
recentCommitStats.back().whenCommit = now();
try {
wait(ioTimeoutError(durable, SERVER_KNOBS->MAX_STORAGE_COMMIT_TIME, "StorageCommit"));
wait(ioTimeoutErrorIfCleared(durable,
SERVER_KNOBS->MAX_STORAGE_COMMIT_TIME,
data->getEncryptCipherKeysMonitor->degraded(),
"StorageCommit"));
} catch (Error& e) {
if (e.code() == error_code_io_timeout) {
if (SERVER_KNOBS->LOGGING_STORAGE_COMMIT_WHEN_IO_TIMEOUT) {
@ -13764,8 +13769,9 @@ ACTOR Future<Void> storageServer(IKeyValueStore* persistentData,
Version tssSeedVersion,
ReplyPromise<InitializeStorageReply> recruitReply,
Reference<AsyncVar<ServerDBInfo> const> db,
std::string folder) {
state StorageServer self(persistentData, db, ssi);
std::string folder,
Reference<GetEncryptCipherKeysMonitor> encryptionMonitor) {
state StorageServer self(persistentData, db, ssi, encryptionMonitor);
self.shardAware = persistentData->shardAware();
state Future<Void> ssCore;
self.initialClusterVersion = startVersion;
@ -13890,8 +13896,9 @@ ACTOR Future<Void> storageServer(IKeyValueStore* persistentData,
Reference<AsyncVar<ServerDBInfo> const> db,
std::string folder,
Promise<Void> recovered,
Reference<IClusterConnectionRecord> connRecord) {
state StorageServer self(persistentData, db, ssi);
Reference<IClusterConnectionRecord> connRecord,
Reference<GetEncryptCipherKeysMonitor> encryptionMonitor) {
state StorageServer self(persistentData, db, ssi, encryptionMonitor);
state Future<Void> ssCore;
self.folder = folder;
self.checkpointFolder = joinPath(self.folder, serverCheckpointFolder);

View File

@ -1606,7 +1606,8 @@ ACTOR Future<Void> storageServerRollbackRebooter(std::set<std::pair<UID, KeyValu
int64_t memoryLimit,
IKeyValueStore* store,
bool validateDataFiles,
Promise<Void>* rebootKVStore) {
Promise<Void>* rebootKVStore,
Reference<GetEncryptCipherKeysMonitor> encryptionMonitor) {
state TrackRunningStorage _(id, storeType, locality, filename, runningStorages, storageCleaners);
loop {
ErrorOr<Void> e = wait(errorOr(prevStorageServer));
@ -1675,8 +1676,13 @@ ACTOR Future<Void> storageServerRollbackRebooter(std::set<std::pair<UID, KeyValu
DUMPTOKEN(recruited.changeFeedVersionUpdate);
Future<ErrorOr<Void>> storeError = errorOr(store->getError());
prevStorageServer =
storageServer(store, recruited, db, folder, Promise<Void>(), Reference<IClusterConnectionRecord>(nullptr));
prevStorageServer = storageServer(store,
recruited,
db,
folder,
Promise<Void>(),
Reference<IClusterConnectionRecord>(nullptr),
encryptionMonitor);
prevStorageServer = handleIOErrors(prevStorageServer, storeError, id, store->onClosed());
}
}
@ -2225,6 +2231,7 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
LocalLineage _;
getCurrentLineage()->modify(&RoleLineage::role) = ProcessClass::ClusterRole::Storage;
Reference<GetEncryptCipherKeysMonitor> encryptionMonitor = makeReference<GetEncryptCipherKeysMonitor>();
IKeyValueStore* kv = openKVStore(
s.storeType,
s.filename,
@ -2238,7 +2245,10 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
s.storeType != KeyValueStoreType::SSD_SHARDED_ROCKSDB &&
deterministicRandom()->coinflip())
: true),
dbInfo);
dbInfo,
Optional<EncryptionAtRestMode>(),
0,
encryptionMonitor);
Future<Void> kvClosed =
kv->onClosed() ||
rebootKVSPromise.getFuture() /* clear the onClosed() Future in actorCollection when rebooting */;
@ -2287,7 +2297,7 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
Future<ErrorOr<Void>> storeError = errorOr(kv->getError());
Promise<Void> recovery;
Future<Void> f = storageServer(kv, recruited, dbInfo, folder, recovery, connRecord);
Future<Void> f = storageServer(kv, recruited, dbInfo, folder, recovery, connRecord, encryptionMonitor);
recoveries.push_back(recovery.getFuture());
f = handleIOErrors(f, storeError, s.storeID, kvClosed);
@ -2305,7 +2315,8 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
memoryLimit,
kv,
validateDataFiles,
&rebootKVSPromise);
&rebootKVSPromise,
encryptionMonitor);
errorForwarders.add(forwardError(errors, ssRole, recruited.id(), f));
} else if (s.storedComponent == DiskStore::TLogData) {
LocalLineage _;
@ -3007,6 +3018,8 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
folder,
isTss ? testingStoragePrefix.toString() : fileStoragePrefix.toString(),
recruited.id());
Reference<GetEncryptCipherKeysMonitor> encryptionMonitor =
makeReference<GetEncryptCipherKeysMonitor>();
IKeyValueStore* data = openKVStore(
req.storeType,
filename,
@ -3021,7 +3034,9 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
deterministicRandom()->coinflip())
: true),
dbInfo,
req.encryptMode);
req.encryptMode,
0,
encryptionMonitor);
TraceEvent("StorageServerInitProgress", recruited.id())
.detail("ReqID", req.reqId)
.detail("StorageType", req.storeType.toString())
@ -3043,7 +3058,8 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
isTss ? req.tssPairIDAndVersion.get().second : 0,
storageReady,
dbInfo,
folder);
folder,
encryptionMonitor);
s = handleIOErrors(s, storeError, recruited.id(), kvClosed);
s = storageCache.removeOnReady(req.reqId, s);
s = storageServerRollbackRebooter(&runningStorages,
@ -3060,7 +3076,8 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
memoryLimit,
data,
false,
&rebootKVSPromise2);
&rebootKVSPromise2,
encryptionMonitor);
errorForwarders.add(forwardError(errors, ssRole, recruited.id(), s));
} else if (storageCache.exists(req.reqId)) {
forwardPromise(req.reply, storageCache.get(req.reqId));

View File

@ -164,6 +164,41 @@ ACTOR Future<Void> lowPriorityDelay(double waitTime) {
return Void();
}
ACTOR Future<Void> delayAfterCleared(Reference<AsyncVar<bool>> condition, double time, TaskPriority taskID) {
state Future<Void> timer = condition->get() ? Never() : delay(time, taskID);
state bool previousState = condition->get();
loop choose {
when(wait(timer)) {
return Void();
}
when(wait(condition->onChange())) {
bool currentState = condition->get();
if (currentState != previousState) {
timer = currentState ? Never() : delay(time, taskID);
previousState = currentState;
}
}
}
}
// Same as delayAfterCleared, but use lowPriorityDelay.
ACTOR Future<Void> lowPriorityDelayAfterCleared(Reference<AsyncVar<bool>> condition, double time) {
state Future<Void> timer = condition->get() ? Never() : lowPriorityDelay(time);
state bool previousState = condition->get();
loop choose {
when(wait(timer)) {
return Void();
}
when(wait(condition->onChange())) {
bool currentState = condition->get();
if (currentState != previousState) {
timer = currentState ? Never() : lowPriorityDelay(time);
previousState = currentState;
}
}
}
}
namespace {
struct DummyState {

View File

@ -22,7 +22,9 @@
// When actually compiled (NO_INTELLISENSE), include the generated version of this file. In intellisense use the source
// version.
#include "flow/Error.h"
#include "flow/FastRef.h"
#include "flow/TaskPriority.h"
#include "flow/network.h"
#include "flow/swift_support.h"
#include <utility>
@ -942,6 +944,35 @@ Future<Void> setWhenDoneOrError(Future<Void> condition, Reference<AsyncVar<T>> v
return Void();
}
ACTOR Future<Void> lowPriorityDelay(double waitTime);
// Delay after condition is cleared (i.e. equal to false).
// If during delay, condition changes to true, wait till condition become false again, and repeat.
ACTOR Future<Void> delayAfterCleared(Reference<AsyncVar<bool>> condition,
double time,
TaskPriority taskID = TaskPriority::DefaultDelay);
// Same as delayAfterCleared, but use lowPriorityDelay.
ACTOR Future<Void> lowPriorityDelayAfterCleared(Reference<AsyncVar<bool>> condition, double time);
// Similar to timeoutError, but does not throw timed_out if condition is true.
// Once condition becomes false again, reset the timer (e.g. if time is 10s, wait for 10s again before throwing
// timed_out, if condition remains to be false).
ACTOR template <class T>
Future<T> timeoutErrorIfCleared(Future<T> what,
Reference<AsyncVar<bool>> condition,
double time,
TaskPriority taskID = TaskPriority::DefaultDelay) {
choose {
when(T t = wait(what)) {
return t;
}
when(wait(delayAfterCleared(condition, time, taskID))) {
throw timed_out();
}
}
}
Future<bool> allTrue(const std::vector<Future<bool>>& all);
Future<Void> anyTrue(std::vector<Reference<AsyncVar<bool>>> const& input, Reference<AsyncVar<bool>> const& output);
Future<Void> cancelOnly(std::vector<Future<Void>> const& futures);
@ -950,7 +981,6 @@ Future<Void> timeoutWarningCollector(FutureStream<Void> const& input,
const char* const& context,
UID const& id);
ACTOR Future<bool> quorumEqualsTrue(std::vector<Future<bool>> futures, int required);
Future<Void> lowPriorityDelay(double const& waitTime);
ACTOR template <class T>
Future<Void> streamHelper(PromiseStream<T> output, PromiseStream<Error> errors, Future<T> input) {
@ -1454,6 +1484,65 @@ Future<T> waitOrError(Future<T> f, Future<Void> errorSignal) {
}
}
// A simple counter designed to track an ongoing count of something, such as how many actors are in a critical section,
// how many bytes are currently being processed, etc... Can be explicitly released idempotently, or will automatically
// release when destructed to handle actor ending or errors.
// Can be used for any type T so long as it has += and -= operators.
// Usage Example: tracking number of actors in code section
// ActiveCounter<int> counter;
//
// state ActiveCounter::Releaser tracker = counter.take(1);
// wait(perform my operation);
// tracker.release();
template <class T>
struct ActiveCounter {
struct Releaser : NonCopyable {
ActiveCounter<T>* parent;
T delta;
std::function<void()> releaseCallback;
Releaser() : parent(nullptr) {}
Releaser(ActiveCounter<T>* parent, T delta, std::function<void()> releaseCallback)
: parent(parent), delta(delta), releaseCallback(releaseCallback) {
parent->counter += delta;
}
Releaser(Releaser&& r) noexcept : parent(r.parent), delta(r.delta), releaseCallback(r.releaseCallback) {
r.parent = nullptr;
}
void operator=(Releaser&& r) {
release();
parent = r.parent;
delta = r.delta;
releaseCallback = r.releaseCallback;
r.parent = nullptr;
}
void release() {
if (parent) {
parent->counter -= delta;
parent = nullptr;
if (releaseCallback) {
releaseCallback();
}
}
}
~Releaser() { release(); }
};
T counter;
ActiveCounter(T initialValue) : counter(initialValue) {}
T getValue() { return counter; }
Releaser take(T delta, std::function<void()> releaseCallback = {}) {
return Releaser(this, delta, releaseCallback);
}
};
// A low-overhead FIFO mutex made with no internal queue structure (no list, deque, vector, etc)
// The lock is implemented as a Promise<Void>, which is returned to callers in a convenient wrapper
// called Lock.