add kms and ekp status to json

This commit is contained in:
Nim Wijetunga 2023-06-06 11:00:21 -07:00 committed by Xiaoge Su
parent d4d3e562af
commit 7e14bd3389
11 changed files with 256 additions and 10 deletions

View File

@ -984,6 +984,10 @@ const KeyRef JSONSchemas::statusSchema = R"statusSchema(
"data_cluster_id" : 12346,
"num_data_clusters":10
},
"kms_is_healthy": true,
"encryption_at_rest": {
"ekp_is_healthy": true
},
"tenants":{
"num_tenants":0,
"num_tenant_groups":10,

View File

@ -34,6 +34,36 @@
#include <limits>
#define DEBUG_ENCRYPT_KEY_PROXY false
struct KMSHealthStatus {
constexpr static FileIdentifier file_identifier = 2378149;
bool canConnectToKms;
bool canConnectToEKP;
double lastUpdatedTS;
KMSHealthStatus() : canConnectToEKP(false), canConnectToKms(false), lastUpdatedTS(-1) {}
KMSHealthStatus(bool canConnectToKms, bool canConnectToEKP, double lastUpdatedTS)
: canConnectToKms(canConnectToKms), canConnectToEKP(canConnectToEKP), lastUpdatedTS(lastUpdatedTS) {}
bool operator==(const KMSHealthStatus& other) {
return canConnectToKms == other.canConnectToKms && canConnectToEKP == other.canConnectToEKP;
}
std::string toString() const {
std::stringstream ss;
ss << "CanConnectToKms(" << canConnectToKms << ")"
<< ", CanConnectToEKP(" << canConnectToEKP << ")"
<< ", LastUpdatedTS(" << lastUpdatedTS << ")";
return ss.str();
}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, canConnectToKms, canConnectToEKP, lastUpdatedTS);
}
};
struct EncryptKeyProxyInterface {
constexpr static FileIdentifier file_identifier = 1303419;
struct LocalityData locality;
@ -43,6 +73,7 @@ struct EncryptKeyProxyInterface {
RequestStream<struct EKPGetBaseCipherKeysByIdsRequest> getBaseCipherKeysByIds;
RequestStream<struct EKPGetLatestBaseCipherKeysRequest> getLatestBaseCipherKeys;
RequestStream<struct EKPGetLatestBlobMetadataRequest> getLatestBlobMetadata;
RequestStream<struct EncryptKeyProxyHealthStatusRequest> getHealthStatus;
EncryptKeyProxyInterface() {}
explicit EncryptKeyProxyInterface(const struct LocalityData& loc, UID id) : locality(loc), myId(id) {}
@ -70,6 +101,8 @@ struct EncryptKeyProxyInterface {
waitFailure.getEndpoint().getAdjustedEndpoint(3));
getLatestBlobMetadata =
RequestStream<struct EKPGetLatestBlobMetadataRequest>(waitFailure.getEndpoint().getAdjustedEndpoint(4));
getHealthStatus = RequestStream<struct EncryptKeyProxyHealthStatusRequest>(
waitFailure.getEndpoint().getAdjustedEndpoint(5));
}
}
@ -80,6 +113,7 @@ struct EncryptKeyProxyInterface {
streams.push_back(getBaseCipherKeysByIds.getReceiver(TaskPriority::Worker));
streams.push_back(getLatestBaseCipherKeys.getReceiver(TaskPriority::Worker));
streams.push_back(getLatestBlobMetadata.getReceiver(TaskPriority::Worker));
streams.push_back(getHealthStatus.getReceiver(TaskPriority::Worker));
FlowTransport::transport().addEndpoints(streams);
}
};
@ -98,6 +132,18 @@ struct HaltEncryptKeyProxyRequest {
}
};
struct EncryptKeyProxyHealthStatusRequest {
constexpr static FileIdentifier file_identifier = 2378139;
ReplyPromise<KMSHealthStatus> reply;
EncryptKeyProxyHealthStatusRequest() {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, reply);
}
};
struct EKPBaseCipherDetails {
constexpr static FileIdentifier file_identifier = 2149615;
int64_t encryptDomainId;

View File

@ -78,6 +78,16 @@ bool canReplyWith(Error e) {
}
}
bool isKmsConnectionError(Error e) {
switch (e.code()) {
case error_code_timed_out:
case error_code_connection_failed:
return true;
default:
return false;
}
}
int64_t computeCipherRefreshTS(Optional<int64_t> refreshInterval, int64_t currTS) {
int64_t refreshAtTS = -1;
const int64_t defaultTTL = FLOW_KNOBS->ENCRYPT_CIPHER_KEY_CACHE_TTL;
@ -212,6 +222,7 @@ public:
PromiseStream<Future<Void>> addActor;
Future<Void> encryptionKeyRefresher;
Future<Void> blobMetadataRefresher;
Future<Void> healthChecker;
Future<Void> logger;
EncryptBaseDomainIdCache baseCipherDomainIdCache;
@ -220,6 +231,8 @@ public:
std::unique_ptr<KmsConnector> kmsConnector;
KMSHealthStatus kmsConnectorHealthStatus;
CounterCollection ekpCacheMetrics;
Counter baseCipherKeyIdCacheMisses;
@ -233,6 +246,8 @@ public:
Counter blobMetadataCacheMisses;
Counter blobMetadataRefreshed;
Counter numBlobMetadataRefreshErrors;
Counter numHealthCheckErrors;
Counter numHealthCheckRequests;
LatencySample kmsLookupByIdsReqLatency;
LatencySample kmsLookupByDomainIdsReqLatency;
@ -251,6 +266,8 @@ public:
blobMetadataCacheMisses("EKPBlobMetadataCacheMisses", ekpCacheMetrics),
blobMetadataRefreshed("EKPBlobMetadataRefreshed", ekpCacheMetrics),
numBlobMetadataRefreshErrors("EKPBlobMetadataRefreshErrors", ekpCacheMetrics),
numHealthCheckErrors("KMSHealthCheckErrors", ekpCacheMetrics),
numHealthCheckRequests("KMSHealthCheckRequests", ekpCacheMetrics),
kmsLookupByIdsReqLatency("EKPKmsLookupByIdsReqLatency",
id,
SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL,
@ -475,7 +492,14 @@ ACTOR Future<Void> getCipherKeysByBaseCipherKeyIds(Reference<EncryptKeyProxyData
"");
}
}
if (keysByIdsRep.cipherKeyDetails.size() > 0) {
ekpProxyData->kmsConnectorHealthStatus = KMSHealthStatus{ true, true, now() };
}
} catch (Error& e) {
if (isKmsConnectionError(e)) {
ekpProxyData->kmsConnectorHealthStatus = KMSHealthStatus{ false, true, now() };
}
if (!canReplyWith(e)) {
TraceEvent("GetCipherKeysByKeyIds", ekpProxyData->myId).error(e);
throw;
@ -616,7 +640,13 @@ ACTOR Future<Void> getLatestCipherKeys(Reference<EncryptKeyProxyData> ekpProxyDa
"");
}
}
if (keysByDomainIdRep.cipherKeyDetails.size() > 0) {
ekpProxyData->kmsConnectorHealthStatus = KMSHealthStatus{ true, true, now() };
}
} catch (Error& e) {
if (isKmsConnectionError(e)) {
ekpProxyData->kmsConnectorHealthStatus = KMSHealthStatus{ false, true, now() };
}
if (!canReplyWith(e)) {
TraceEvent("GetLatestCipherKeys", ekpProxyData->myId).error(e);
throw;
@ -654,6 +684,66 @@ bool isBlobMetadataEligibleForRefresh(const BlobMetadataDetailsRef& blobMetadata
return nextRefreshCycleTS > blobMetadata.expireAt || nextRefreshCycleTS > blobMetadata.refreshAt;
}
ACTOR Future<KMSHealthStatus> getHealthStatusImpl(Reference<EncryptKeyProxyData> ekpProxyData,
KmsConnectorInterface kmsConnectorInf) {
state UID debugId = deterministicRandom()->randomUniqueID();
if (DEBUG_ENCRYPT_KEY_PROXY) {
TraceEvent(SevDebug, "KMSHealthCheckStart", ekpProxyData->myId);
}
// Health check will try to fetch the encryption details for the system key
try {
KmsConnLookupEKsByDomainIdsReq req;
req.debugId = debugId;
req.encryptDomainIds.push_back(SYSTEM_KEYSPACE_ENCRYPT_DOMAIN_ID);
++ekpProxyData->numHealthCheckRequests;
KmsConnLookupEKsByDomainIdsRep rep = wait(timeoutError(kmsConnectorInf.ekLookupByDomainIds.getReply(req),
FLOW_KNOBS->EKP_HEALTH_CHECK_REQUEST_TIMEOUT));
if (rep.cipherKeyDetails.size() < 1) {
TraceEvent(SevWarn, "KMSHealthCheckResponseEmpty");
throw encrypt_key_not_found();
}
EncryptCipherKeyDetailsRef cipherDetails = rep.cipherKeyDetails[0];
if (cipherDetails.encryptDomainId != SYSTEM_KEYSPACE_ENCRYPT_DOMAIN_ID) {
TraceEvent(SevWarn, "KMSHealthCheckNoSystemKeyFound");
throw encrypt_key_not_found();
}
CipherKeyValidityTS validityTS =
getCipherKeyValidityTS(cipherDetails.refreshAfterSec, cipherDetails.expireAfterSec);
ekpProxyData->insertIntoBaseDomainIdCache(cipherDetails.encryptDomainId,
cipherDetails.encryptKeyId,
cipherDetails.encryptKey,
cipherDetails.encryptKCV,
validityTS.refreshAtTS,
validityTS.expAtTS);
return KMSHealthStatus{ true, true, now() };
} catch (Error& e) {
TraceEvent(SevWarn, "KMSHealthCheckError").error(e);
if (!canReplyWith(e)) {
throw;
}
++ekpProxyData->numHealthCheckErrors;
return KMSHealthStatus{ false, true, now() };
}
}
ACTOR Future<Void> updateHealthStatusImpl(Reference<EncryptKeyProxyData> ekpProxyData,
KmsConnectorInterface kmsConnectorInf) {
// If the health check status has been updated recently avoid doing another refresh
if (now() - ekpProxyData->kmsConnectorHealthStatus.lastUpdatedTS < FLOW_KNOBS->ENCRYPT_KEY_HEALTH_CHECK_INTERVAL) {
return Void();
}
KMSHealthStatus status = wait(getHealthStatusImpl(ekpProxyData, kmsConnectorInf));
if (status != ekpProxyData->kmsConnectorHealthStatus) {
TraceEvent("KmsConnectorHealthStatusChange")
.detail("OldStatus", ekpProxyData->kmsConnectorHealthStatus.toString())
.detail("NewStatus", status.toString());
}
ekpProxyData->kmsConnectorHealthStatus = status;
return Void();
}
ACTOR Future<Void> refreshEncryptionKeysImpl(Reference<EncryptKeyProxyData> ekpProxyData,
KmsConnectorInterface kmsConnectorInf) {
state UID debugId = deterministicRandom()->randomUniqueID();
@ -719,10 +809,15 @@ ACTOR Future<Void> refreshEncryptionKeysImpl(Reference<EncryptKeyProxyData> ekpP
}
ekpProxyData->baseCipherKeysRefreshed += rep.cipherKeyDetails.size();
if (rep.cipherKeyDetails.size() > 0) {
ekpProxyData->kmsConnectorHealthStatus = KMSHealthStatus{ true, true, now() };
}
t.detail("NumKeys", rep.cipherKeyDetails.size());
CODE_PROBE(!rep.cipherKeyDetails.empty(), "EKP refresh cipherKeys");
} catch (Error& e) {
if (isKmsConnectionError(e)) {
ekpProxyData->kmsConnectorHealthStatus = KMSHealthStatus{ false, true, now() };
}
if (!canReplyWith(e)) {
TraceEvent(SevWarn, "RefreshEKsError").error(e);
throw e;
@ -738,6 +833,10 @@ Future<Void> refreshEncryptionKeys(Reference<EncryptKeyProxyData> ekpProxyData,
return refreshEncryptionKeysImpl(ekpProxyData, kmsConnectorInf);
}
Future<Void> updateHealthStatus(Reference<EncryptKeyProxyData> ekpProxyData, KmsConnectorInterface kmsConnectorInf) {
return updateHealthStatusImpl(ekpProxyData, kmsConnectorInf);
}
ACTOR Future<Void> getLatestBlobMetadata(Reference<EncryptKeyProxyData> ekpProxyData,
KmsConnectorInterface kmsConnectorInf,
EKPGetLatestBlobMetadataRequest req) {
@ -805,7 +904,14 @@ ACTOR Future<Void> getLatestBlobMetadata(Reference<EncryptKeyProxyData> ekpProxy
dbgTrace.get().detail("BMI" + std::to_string(item.domainId), "");
}
}
if (kmsRep.metadataDetails.size() > 0) {
ekpProxyData->kmsConnectorHealthStatus = KMSHealthStatus{ true, true, now() };
}
} catch (Error& e) {
if (isKmsConnectionError(e)) {
ekpProxyData->kmsConnectorHealthStatus = KMSHealthStatus{ false, true, now() };
}
if (!canReplyWith(e)) {
TraceEvent("GetLatestBlobMetadataUnexpectedError", ekpProxyData->myId).error(e);
throw;
@ -822,7 +928,7 @@ ACTOR Future<Void> getLatestBlobMetadata(Reference<EncryptKeyProxyData> ekpProxy
ACTOR Future<Void> refreshBlobMetadataCore(Reference<EncryptKeyProxyData> ekpProxyData,
KmsConnectorInterface kmsConnectorInf) {
state UID debugId = deterministicRandom()->randomUniqueID();
state UID debugId = ekpProxyData->myId;
state double startTime;
state TraceEvent t("RefreshBlobMetadataStart", ekpProxyData->myId);
@ -862,9 +968,15 @@ ACTOR Future<Void> refreshBlobMetadataCore(Reference<EncryptKeyProxyData> ekpPro
}
ekpProxyData->blobMetadataRefreshed += rep.metadataDetails.size();
if (rep.metadataDetails.size() > 0) {
ekpProxyData->kmsConnectorHealthStatus = KMSHealthStatus{ true, true, now() };
}
t.detail("nKeys", rep.metadataDetails.size());
} catch (Error& e) {
if (isKmsConnectionError(e)) {
ekpProxyData->kmsConnectorHealthStatus = KMSHealthStatus{ false, true, now() };
}
if (!canReplyWith(e)) {
TraceEvent("RefreshBlobMetadataError").error(e);
throw e;
@ -929,6 +1041,13 @@ ACTOR Future<Void> encryptKeyProxyServer(EncryptKeyProxyInterface ekpInterface,
CLIENT_KNOBS->BLOB_METADATA_REFRESH_INTERVAL,
TaskPriority::Worker);
self->healthChecker = recurringAsync([&]() { return updateHealthStatus(self, kmsConnectorInf); },
FLOW_KNOBS->ENCRYPT_KEY_HEALTH_CHECK_INTERVAL,
true,
FLOW_KNOBS->ENCRYPT_KEY_HEALTH_CHECK_INTERVAL,
TaskPriority::Worker,
true);
CODE_PROBE(!encryptMode.isEncryptionEnabled() && SERVER_KNOBS->ENABLE_REST_KMS_COMMUNICATION,
"Encryption disabled and EKP Recruited");
try {
@ -951,6 +1070,10 @@ ACTOR Future<Void> encryptKeyProxyServer(EncryptKeyProxyInterface ekpInterface,
req.reply.send(Void());
break;
}
when(EncryptKeyProxyHealthStatusRequest req = waitNext(ekpInterface.getHealthStatus.getFuture())) {
ASSERT(encryptMode.isEncryptionEnabled() || SERVER_KNOBS->ENABLE_REST_KMS_COMMUNICATION);
req.reply.send(self->kmsConnectorHealthStatus);
}
when(wait(collection)) {
ASSERT(false);
throw internal_error();

View File

@ -401,7 +401,7 @@ Standalone<VectorRef<EncryptCipherKeyDetailsRef>> parseEncryptCipherResponse(Ref
}
if (FLOW_KNOBS->REST_LOG_LEVEL >= RESTLogSeverity::VERBOSE) {
TraceEvent("RESTParseEncryptCipherResponseStart", ctx->uid).detail("Response", resp->toString());
TraceEvent("RESTParseEncryptCipherResponseStart", ctx->uid);
}
rapidjson::Document doc;
@ -827,12 +827,12 @@ Future<T> kmsRequestImpl(
} catch (Error& e) {
curUrl->nFailedResponses++;
if (pass > 1 && isKmsNotReachable(e.code())) {
TraceEvent(SevDebug, "KmsRequestFailedUnreachable", ctx->uid)
TraceEvent(SevWarn, "KmsRequestFailedUnreachable", ctx->uid)
.error(e)
.detail("RequestID", requestID);
throw e;
} else {
TraceEvent(SevDebug, "KmsRequestError", ctx->uid).error(e).detail("RequestID", requestID);
TraceEvent(SevWarn, "KmsRequestError", ctx->uid).error(e).detail("RequestID", requestID);
// attempt to do request from next KmsUrl
}
}

View File

@ -20,8 +20,11 @@
#include <cinttypes>
#include "fdbclient/BlobGranuleCommon.h"
#include "fdbclient/EncryptKeyProxyInterface.h"
#include "fdbclient/json_spirit/json_spirit_value.h"
#include "fdbserver/BlobGranuleServerCommon.actor.h"
#include "fdbserver/BlobManagerInterface.h"
#include "flow/genericactors.actor.h"
#include "fmt/format.h"
#include "fdbclient/BackupAgent.actor.h"
#include "fdbclient/BlobWorkerInterface.h"
@ -2965,6 +2968,24 @@ ACTOR Future<std::pair<Optional<StorageWiggleMetrics>, Optional<StorageWiggleMet
}
}
}
ACTOR Future<KMSHealthStatus> getKMSHealthStatus(Reference<const AsyncVar<ServerDBInfo>> db) {
try {
if (!db->get().client.encryptKeyProxy.present()) {
return KMSHealthStatus{ false, false, now() };
}
KMSHealthStatus reply = wait(timeoutError(
db->get().client.encryptKeyProxy.get().getHealthStatus.getReply(EncryptKeyProxyHealthStatusRequest()),
FLOW_KNOBS->EKP_HEALTH_CHECK_REQUEST_TIMEOUT));
return reply;
} catch (Error& e) {
if (e.code() != error_code_timed_out) {
throw;
}
return KMSHealthStatus{ false, false, now() };
}
}
// read storageWigglerStats through Read-only tx, then convert it to JSON field
ACTOR Future<JsonBuilderObject> storageWigglerStatsFetcher(Optional<DataDistributorInterface> ddWorker,
DatabaseConfiguration conf,
@ -3059,6 +3080,22 @@ ACTOR Future<StatusReply> clusterGetStatus(
state WorkerDetails csWorker; // ConsistencyScan worker
try {
state JsonBuilderObject statusObj;
// Get EKP Health
if (db->get().client.encryptKeyProxy.present()) {
KMSHealthStatus status = wait(getKMSHealthStatus(db));
JsonBuilderObject _statusObj;
_statusObj["ekp_is_healthy"] = status.canConnectToEKP;
statusObj["encryption_at_rest"] = _statusObj;
statusObj["kms_is_healthy"] = status.canConnectToKms;
// TODO: In this scenario we should see if we can fetch any status fields that don't depend on encryption
if (!status.canConnectToKms || !status.canConnectToEKP) {
return StatusReply(statusObj.getJson());
}
}
// Get the master Worker interface
Optional<WorkerDetails> _mWorker = getWorker(workers, db->get().master.address());
if (_mWorker.present()) {
@ -3186,7 +3223,6 @@ ACTOR Future<StatusReply> clusterGetStatus(
state WorkerEvents programStarts =
workerEventsVec[5].present() ? workerEventsVec[5].get().first : WorkerEvents();
state JsonBuilderObject statusObj;
if (db->get().recoveryCount > 0) {
statusObj["generation"] = db->get().recoveryCount;
}

View File

@ -61,5 +61,7 @@ ACTOR Future<Optional<std::pair<WorkerEvents, std::set<std::string>>>> latestEve
std::vector<WorkerDetails> workers,
std::string eventName);
ACTOR Future<KMSHealthStatus> getKMSHealthStatus(Reference<const AsyncVar<ServerDBInfo>> db);
#include "flow/unactorcompiler.h"
#endif

View File

@ -2717,6 +2717,7 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
DUMPTOKEN(recruited.getBaseCipherKeysByIds);
DUMPTOKEN(recruited.getLatestBaseCipherKeys);
DUMPTOKEN(recruited.getLatestBlobMetadata);
DUMPTOKEN(recruited.getHealthStatus);
Future<Void> encryptKeyProxyProcess = encryptKeyProxyServer(recruited, dbInfo, req.encryptMode);
errorForwarders.add(forwardError(

View File

@ -26,6 +26,7 @@
#include "fdbserver/Knobs.h"
#include "fdbserver/ServerDBInfo.actor.h"
#include "fdbserver/Status.actor.h"
#include "fdbserver/WorkerInterface.actor.h"
#include "fdbserver/workloads/workloads.actor.h"
@ -199,6 +200,20 @@ struct EncryptKeyProxyTestWorkload : TestWorkload {
return Void();
}
ACTOR Future<Void> simHealthyKms(EncryptKeyProxyTestWorkload* self) {
TraceEvent("SimHealthyKmsStart").log();
loop {
KMSHealthStatus status = wait(getKMSHealthStatus(self->dbInfo));
if (status.canConnectToKms && status.canConnectToEKP) {
ASSERT_GE(status.lastUpdatedTS, 0);
ASSERT_GE(now(), status.lastUpdatedTS);
break;
}
wait(delay(20.0));
}
return Void();
}
// Following test cases are covered:
// 1. Simulate an empty domainIdCache.
// 2. Simulate an mixed lookup (partial cache-hit) for domainIdCache.
@ -225,6 +240,11 @@ struct EncryptKeyProxyTestWorkload : TestWorkload {
// Simulate lookup BaseCipherIds which aren't yet cached
wait(self->simLookupInvalidKeyId(self));
// Simulate getting health status for healthy KMS
wait(self->simHealthyKms(self));
// TODO: Test unhealthy kms status when we implement kms http server in simulation
return Void();
}

View File

@ -309,6 +309,9 @@ void FlowKnobs::initialize(Randomize randomize, IsSimulated isSimulated) {
if ( randomize && BUGGIFY) { ENCRYPT_CIPHER_KEY_CACHE_TTL = deterministicRandom()->randomInt(2, 10) * 60; }
init( ENCRYPT_KEY_REFRESH_INTERVAL, isSimulated ? 60 : 8 * 60 );
if ( randomize && BUGGIFY) { ENCRYPT_KEY_REFRESH_INTERVAL = deterministicRandom()->randomInt(2, 10); }
init( ENCRYPT_KEY_HEALTH_CHECK_INTERVAL, 10 );
if ( randomize && BUGGIFY) { ENCRYPT_KEY_HEALTH_CHECK_INTERVAL = deterministicRandom()->randomInt(10, 60); }
init( EKP_HEALTH_CHECK_REQUEST_TIMEOUT, 10.0);
init( ENCRYPT_KEY_CACHE_LOGGING_INTERVAL, 5.0 );
init( ENCRYPT_KEY_CACHE_LATENCY_LOGGING_INTERVAL, 60.0 );
init( ENCRYPT_KEY_CACHE_LOGGING_SKETCH_ACCURACY, 0.01 );

View File

@ -372,6 +372,8 @@ public:
// Encryption
int64_t ENCRYPT_CIPHER_KEY_CACHE_TTL;
int64_t ENCRYPT_KEY_REFRESH_INTERVAL;
int64_t ENCRYPT_KEY_HEALTH_CHECK_INTERVAL;
double EKP_HEALTH_CHECK_REQUEST_TIMEOUT;
double ENCRYPT_KEY_CACHE_LOGGING_INTERVAL;
double ENCRYPT_KEY_CACHE_LATENCY_LOGGING_INTERVAL;
double ENCRYPT_KEY_CACHE_LOGGING_SKETCH_ACCURACY;

View File

@ -1338,7 +1338,8 @@ Future<Void> recurringAsync(
// value AND/OR actor functor taking longer than expected to return, could cause actor
// functor to run with no-delay
double initialDelay, // Initial delay interval
TaskPriority taskID = TaskPriority::DefaultDelay) {
TaskPriority taskID = TaskPriority::DefaultDelay,
bool jittered = false) {
wait(delay(initialDelay));
@ -1350,12 +1351,20 @@ Future<Void> recurringAsync(
if (absoluteIntervalDelay) {
wait(val);
// Ensure subsequent actorFunc executions observe client supplied delay interval.
wait(delay(interval));
if (jittered) {
wait(delayJittered(interval));
} else {
wait(delay(interval));
}
} else {
// Guarantee at-least client supplied interval delay; two possible scenarios:
// 1. The actorFunc executions finishes before 'interval' delay
// 2. The actorFunc executions takes > 'interval' delay.
wait(val && delay(interval));
if (jittered) {
wait(val && delayJittered(interval));
} else {
wait(val && delay(interval));
}
}
}
}