Merge pull request #10632 from sfc-gh-yiwu/ear_status
EaR: update status json and reduce KMS request timeout
This commit is contained in:
commit
7718e1fef0
|
@ -82,17 +82,15 @@ RESTClientKnobs::RESTClientKnobs() {
|
|||
}
|
||||
|
||||
void RESTClientKnobs::set(const std::unordered_map<std::string, int>& knobSettings) {
|
||||
TraceEvent trace = TraceEvent("RESTClientSetKnobs");
|
||||
|
||||
for (const auto& itr : knobSettings) {
|
||||
const auto& kItr = RESTClientKnobs::knobMap.find(itr.first);
|
||||
if (kItr == RESTClientKnobs::knobMap.end()) {
|
||||
trace.detail("RESTClientInvalidKnobName", itr.first);
|
||||
TraceEvent("RESTClientInvalidKnobName").detail("KnobName", itr.first);
|
||||
throw rest_invalid_rest_client_knob();
|
||||
}
|
||||
ASSERT_EQ(itr.first.compare(kItr->first), 0);
|
||||
*(kItr->second) = itr.second;
|
||||
trace.detail(itr.first.c_str(), itr.second);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -1024,10 +1024,17 @@ const KeyRef JSONSchemas::statusSchema = R"statusSchema(
|
|||
"data_cluster_id" : 12346,
|
||||
"num_data_clusters":10
|
||||
},
|
||||
"kms_is_healthy": true,
|
||||
"encryption_at_rest": {
|
||||
"ekp_is_healthy": true
|
||||
},
|
||||
"kms" : {
|
||||
"kms_connector_type": "RESTKmsConnector",
|
||||
"kms_is_healthy": true,
|
||||
"kms_stable": true,
|
||||
"kms_urls":[
|
||||
"https://127.0.0.1:1234"
|
||||
]
|
||||
},
|
||||
"tenants":{
|
||||
"num_tenants":0,
|
||||
"num_tenant_groups":10,
|
||||
|
|
|
@ -1213,6 +1213,8 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
|
|||
init( REST_KMS_MAX_BLOB_METADATA_REQUEST_VERSION, 1);
|
||||
init( REST_KMS_CURRENT_CIPHER_REQUEST_VERSION, 1);
|
||||
init( REST_KMS_MAX_CIPHER_REQUEST_VERSION, 1);
|
||||
init( REST_SIM_KMS_VAULT_DIR, "");
|
||||
init( REST_KMS_STABILITY_CHECK_INTERVAL, 5.0);
|
||||
|
||||
init( CONSISTENCY_SCAN_ACTIVE_THROTTLE_RATIO, 0.5 ); if( randomize && BUGGIFY ) CONSISTENCY_SCAN_ACTIVE_THROTTLE_RATIO = deterministicRandom()->random01();
|
||||
|
||||
|
|
|
@ -41,26 +41,43 @@ struct KMSHealthStatus {
|
|||
bool canConnectToKms;
|
||||
bool canConnectToEKP;
|
||||
double lastUpdatedTS;
|
||||
std::string kmsConnectorType;
|
||||
std::vector<std::string> restKMSUrls;
|
||||
bool kmsStable = true;
|
||||
|
||||
KMSHealthStatus() : canConnectToEKP(false), canConnectToKms(false), lastUpdatedTS(-1) {}
|
||||
KMSHealthStatus(bool canConnectToKms, bool canConnectToEKP, double lastUpdatedTS)
|
||||
: canConnectToKms(canConnectToKms), canConnectToEKP(canConnectToEKP), lastUpdatedTS(lastUpdatedTS) {}
|
||||
KMSHealthStatus() : canConnectToKms(false), canConnectToEKP(false), lastUpdatedTS(-1), kmsStable(true) {}
|
||||
|
||||
bool operator==(const KMSHealthStatus& other) {
|
||||
return canConnectToKms == other.canConnectToKms && canConnectToEKP == other.canConnectToEKP;
|
||||
}
|
||||
|
||||
bool healthnessChanged(const KMSHealthStatus& other) {
|
||||
return canConnectToKms != other.canConnectToKms || canConnectToEKP != other.canConnectToEKP;
|
||||
}
|
||||
|
||||
std::string toString() const {
|
||||
std::stringstream ss;
|
||||
ss << "CanConnectToKms(" << canConnectToKms << ")"
|
||||
<< ", CanConnectToEKP(" << canConnectToEKP << ")"
|
||||
<< ", LastUpdatedTS(" << lastUpdatedTS << ")";
|
||||
<< ", LastUpdatedTS(" << lastUpdatedTS << ")"
|
||||
<< ", KMSConnectorType(" << kmsConnectorType << ")"
|
||||
<< ", RESTKmsUrls(";
|
||||
bool firstUrl = true;
|
||||
for (const auto& url : restKMSUrls) {
|
||||
if (!firstUrl) {
|
||||
ss << ", ";
|
||||
}
|
||||
ss << url;
|
||||
firstUrl = false;
|
||||
}
|
||||
|
||||
ss << "), KmsStable(" << kmsStable << ")";
|
||||
return ss.str();
|
||||
}
|
||||
|
||||
template <class Ar>
|
||||
void serialize(Ar& ar) {
|
||||
serializer(ar, canConnectToKms, canConnectToEKP, lastUpdatedTS);
|
||||
serializer(ar, canConnectToKms, canConnectToEKP, lastUpdatedTS, kmsConnectorType, restKMSUrls, kmsStable);
|
||||
}
|
||||
};
|
||||
|
||||
|
|
|
@ -87,7 +87,12 @@ struct RESTConnectionType {
|
|||
|
||||
// Util interface facilitating management and update for RESTClient knob parameters
|
||||
struct RESTClientKnobs {
|
||||
int connection_pool_size, connect_timeout, connect_tries, max_connection_life, request_tries, request_timeout_secs;
|
||||
int connection_pool_size;
|
||||
int connect_timeout;
|
||||
int connect_tries;
|
||||
int max_connection_life; // Note: this knob is not implemented yet in RESTClient
|
||||
int request_tries;
|
||||
int request_timeout_secs;
|
||||
|
||||
RESTClientKnobs();
|
||||
|
||||
|
|
|
@ -1222,6 +1222,8 @@ public:
|
|||
int REST_KMS_MAX_BLOB_METADATA_REQUEST_VERSION;
|
||||
int REST_KMS_CURRENT_CIPHER_REQUEST_VERSION;
|
||||
int REST_KMS_MAX_CIPHER_REQUEST_VERSION;
|
||||
std::string REST_SIM_KMS_VAULT_DIR;
|
||||
double REST_KMS_STABILITY_CHECK_INTERVAL;
|
||||
|
||||
double CONSISTENCY_SCAN_ACTIVE_THROTTLE_RATIO;
|
||||
|
||||
|
|
|
@ -231,7 +231,8 @@ public:
|
|||
|
||||
std::unique_ptr<KmsConnector> kmsConnector;
|
||||
|
||||
KMSHealthStatus kmsConnectorHealthStatus;
|
||||
bool canConnectToKms = true;
|
||||
double canConnectToKmsLastUpdatedTS = 0;
|
||||
|
||||
CounterCollection ekpCacheMetrics;
|
||||
|
||||
|
@ -284,6 +285,11 @@ public:
|
|||
"EncryptKeyProxyMetrics", id, SERVER_KNOBS->ENCRYPTION_LOGGING_INTERVAL, "EncryptKeyProxyMetrics");
|
||||
}
|
||||
|
||||
void setKMSHealthiness(bool canConnect) {
|
||||
canConnectToKms = canConnect;
|
||||
canConnectToKmsLastUpdatedTS = now();
|
||||
}
|
||||
|
||||
static EncryptBaseCipherDomainIdKeyIdCacheKey getBaseCipherDomainIdKeyIdCacheKey(
|
||||
const EncryptCipherDomainId domainId,
|
||||
const EncryptCipherBaseKeyId baseCipherId) {
|
||||
|
@ -493,11 +499,11 @@ ACTOR Future<Void> getCipherKeysByBaseCipherKeyIds(Reference<EncryptKeyProxyData
|
|||
}
|
||||
}
|
||||
if (keysByIdsRep.cipherKeyDetails.size() > 0) {
|
||||
ekpProxyData->kmsConnectorHealthStatus = KMSHealthStatus{ true, true, now() };
|
||||
ekpProxyData->setKMSHealthiness(true);
|
||||
}
|
||||
} catch (Error& e) {
|
||||
if (isKmsConnectionError(e)) {
|
||||
ekpProxyData->kmsConnectorHealthStatus = KMSHealthStatus{ false, true, now() };
|
||||
ekpProxyData->setKMSHealthiness(false);
|
||||
}
|
||||
|
||||
if (!canReplyWith(e)) {
|
||||
|
@ -641,11 +647,11 @@ ACTOR Future<Void> getLatestCipherKeys(Reference<EncryptKeyProxyData> ekpProxyDa
|
|||
}
|
||||
}
|
||||
if (keysByDomainIdRep.cipherKeyDetails.size() > 0) {
|
||||
ekpProxyData->kmsConnectorHealthStatus = KMSHealthStatus{ true, true, now() };
|
||||
ekpProxyData->setKMSHealthiness(true);
|
||||
}
|
||||
} catch (Error& e) {
|
||||
if (isKmsConnectionError(e)) {
|
||||
ekpProxyData->kmsConnectorHealthStatus = KMSHealthStatus{ false, true, now() };
|
||||
ekpProxyData->setKMSHealthiness(false);
|
||||
}
|
||||
if (!canReplyWith(e)) {
|
||||
TraceEvent("GetLatestCipherKeys", ekpProxyData->myId).error(e);
|
||||
|
@ -684,8 +690,8 @@ bool isBlobMetadataEligibleForRefresh(const BlobMetadataDetailsRef& blobMetadata
|
|||
return nextRefreshCycleTS > blobMetadata.expireAt || nextRefreshCycleTS > blobMetadata.refreshAt;
|
||||
}
|
||||
|
||||
ACTOR Future<KMSHealthStatus> getHealthStatusImpl(Reference<EncryptKeyProxyData> ekpProxyData,
|
||||
KmsConnectorInterface kmsConnectorInf) {
|
||||
ACTOR Future<bool> getHealthStatusImpl(Reference<EncryptKeyProxyData> ekpProxyData,
|
||||
KmsConnectorInterface kmsConnectorInf) {
|
||||
state UID debugId = deterministicRandom()->randomUniqueID();
|
||||
if (DEBUG_ENCRYPT_KEY_PROXY) {
|
||||
TraceEvent(SevDebug, "KMSHealthCheckStart", ekpProxyData->myId);
|
||||
|
@ -716,31 +722,56 @@ ACTOR Future<KMSHealthStatus> getHealthStatusImpl(Reference<EncryptKeyProxyData>
|
|||
cipherDetails.encryptKCV,
|
||||
validityTS.refreshAtTS,
|
||||
validityTS.expAtTS);
|
||||
return KMSHealthStatus{ true, true, now() };
|
||||
return true;
|
||||
} catch (Error& e) {
|
||||
TraceEvent(SevWarn, "KMSHealthCheckError").error(e);
|
||||
if (!canReplyWith(e)) {
|
||||
throw;
|
||||
}
|
||||
++ekpProxyData->numHealthCheckErrors;
|
||||
return KMSHealthStatus{ false, true, now() };
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
ACTOR Future<Void> updateHealthStatusImpl(Reference<EncryptKeyProxyData> ekpProxyData,
|
||||
KmsConnectorInterface kmsConnectorInf) {
|
||||
// If the health check status has been updated recently avoid doing another refresh
|
||||
if (now() - ekpProxyData->kmsConnectorHealthStatus.lastUpdatedTS < FLOW_KNOBS->ENCRYPT_KEY_HEALTH_CHECK_INTERVAL) {
|
||||
if (now() - ekpProxyData->canConnectToKmsLastUpdatedTS < FLOW_KNOBS->ENCRYPT_KEY_HEALTH_CHECK_INTERVAL) {
|
||||
return Void();
|
||||
}
|
||||
|
||||
KMSHealthStatus status = wait(getHealthStatusImpl(ekpProxyData, kmsConnectorInf));
|
||||
if (status != ekpProxyData->kmsConnectorHealthStatus) {
|
||||
bool canConnectToKms = wait(getHealthStatusImpl(ekpProxyData, kmsConnectorInf));
|
||||
if (canConnectToKms != ekpProxyData->canConnectToKms) {
|
||||
TraceEvent("KmsConnectorHealthStatusChange")
|
||||
.detail("OldStatus", ekpProxyData->kmsConnectorHealthStatus.toString())
|
||||
.detail("NewStatus", status.toString());
|
||||
.detail("OldStatus", ekpProxyData->canConnectToKms)
|
||||
.detail("NewStatus", canConnectToKms);
|
||||
}
|
||||
ekpProxyData->kmsConnectorHealthStatus = status;
|
||||
ekpProxyData->setKMSHealthiness(canConnectToKms);
|
||||
return Void();
|
||||
}
|
||||
|
||||
ACTOR Future<Void> getEKPStatus(Reference<EncryptKeyProxyData> ekpProxyData,
|
||||
KmsConnectorInterface kmsConnectorInf,
|
||||
EncryptKeyProxyHealthStatusRequest req) {
|
||||
state KMSHealthStatus status;
|
||||
status.canConnectToEKP = true;
|
||||
status.canConnectToKms = ekpProxyData->canConnectToKms;
|
||||
status.lastUpdatedTS = ekpProxyData->canConnectToKmsLastUpdatedTS;
|
||||
status.kmsConnectorType = ekpProxyData->kmsConnector->getConnectorStr();
|
||||
|
||||
KmsConnGetKMSStateReq getKMSStateReq;
|
||||
try {
|
||||
KmsConnGetKMSStateRep getKMSStateRep = wait(kmsConnectorInf.getKMSStateReq.getReply(getKMSStateReq));
|
||||
for (const auto& url : getKMSStateRep.restKMSUrls) {
|
||||
status.restKMSUrls.push_back(url.toString());
|
||||
}
|
||||
status.kmsStable = getKMSStateRep.kmsStable;
|
||||
req.reply.send(status);
|
||||
} catch (Error& e) {
|
||||
TraceEvent("EKPGetKMSStateFailed", ekpProxyData->myId).error(e);
|
||||
throw e;
|
||||
}
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
||||
|
@ -810,13 +841,13 @@ ACTOR Future<Void> refreshEncryptionKeysImpl(Reference<EncryptKeyProxyData> ekpP
|
|||
|
||||
ekpProxyData->baseCipherKeysRefreshed += rep.cipherKeyDetails.size();
|
||||
if (rep.cipherKeyDetails.size() > 0) {
|
||||
ekpProxyData->kmsConnectorHealthStatus = KMSHealthStatus{ true, true, now() };
|
||||
ekpProxyData->setKMSHealthiness(true);
|
||||
}
|
||||
t.detail("NumKeys", rep.cipherKeyDetails.size());
|
||||
CODE_PROBE(!rep.cipherKeyDetails.empty(), "EKP refresh cipherKeys");
|
||||
} catch (Error& e) {
|
||||
if (isKmsConnectionError(e)) {
|
||||
ekpProxyData->kmsConnectorHealthStatus = KMSHealthStatus{ false, true, now() };
|
||||
ekpProxyData->setKMSHealthiness(false);
|
||||
}
|
||||
if (!canReplyWith(e)) {
|
||||
TraceEvent(SevWarn, "RefreshEKsError").error(e);
|
||||
|
@ -905,11 +936,11 @@ ACTOR Future<Void> getLatestBlobMetadata(Reference<EncryptKeyProxyData> ekpProxy
|
|||
}
|
||||
}
|
||||
if (kmsRep.metadataDetails.size() > 0) {
|
||||
ekpProxyData->kmsConnectorHealthStatus = KMSHealthStatus{ true, true, now() };
|
||||
ekpProxyData->setKMSHealthiness(true);
|
||||
}
|
||||
} catch (Error& e) {
|
||||
if (isKmsConnectionError(e)) {
|
||||
ekpProxyData->kmsConnectorHealthStatus = KMSHealthStatus{ false, true, now() };
|
||||
ekpProxyData->setKMSHealthiness(false);
|
||||
}
|
||||
|
||||
if (!canReplyWith(e)) {
|
||||
|
@ -969,12 +1000,12 @@ ACTOR Future<Void> refreshBlobMetadataCore(Reference<EncryptKeyProxyData> ekpPro
|
|||
|
||||
ekpProxyData->blobMetadataRefreshed += rep.metadataDetails.size();
|
||||
if (rep.metadataDetails.size() > 0) {
|
||||
ekpProxyData->kmsConnectorHealthStatus = KMSHealthStatus{ true, true, now() };
|
||||
ekpProxyData->setKMSHealthiness(true);
|
||||
}
|
||||
t.detail("nKeys", rep.metadataDetails.size());
|
||||
} catch (Error& e) {
|
||||
if (isKmsConnectionError(e)) {
|
||||
ekpProxyData->kmsConnectorHealthStatus = KMSHealthStatus{ false, true, now() };
|
||||
ekpProxyData->setKMSHealthiness(false);
|
||||
}
|
||||
|
||||
if (!canReplyWith(e)) {
|
||||
|
@ -1072,7 +1103,7 @@ ACTOR Future<Void> encryptKeyProxyServer(EncryptKeyProxyInterface ekpInterface,
|
|||
}
|
||||
when(EncryptKeyProxyHealthStatusRequest req = waitNext(ekpInterface.getHealthStatus.getFuture())) {
|
||||
ASSERT(encryptMode.isEncryptionEnabled() || SERVER_KNOBS->ENABLE_REST_KMS_COMMUNICATION);
|
||||
req.reply.send(self->kmsConnectorHealthStatus);
|
||||
self->addActor.send(getEKPStatus(self, kmsConnectorInf, req));
|
||||
}
|
||||
when(wait(collection)) {
|
||||
ASSERT(false);
|
||||
|
|
|
@ -222,10 +222,14 @@ struct RESTKmsConnectorCtx : public ReferenceCounted<RESTKmsConnectorCtx> {
|
|||
RESTClient restClient;
|
||||
ValidationTokenMap validationTokenMap;
|
||||
PromiseStream<Future<Void>> addActor;
|
||||
bool kmsStable;
|
||||
Future<Void> kmsStabilityChecker;
|
||||
|
||||
RESTKmsConnectorCtx()
|
||||
: uid(deterministicRandom()->randomUniqueID()), lastKmsUrlsRefreshTs(0), lastKmsUrlDiscoverTS(0.0) {}
|
||||
explicit RESTKmsConnectorCtx(const UID& id) : uid(id), lastKmsUrlsRefreshTs(0), lastKmsUrlDiscoverTS(0.0) {}
|
||||
: uid(deterministicRandom()->randomUniqueID()), lastKmsUrlsRefreshTs(0), lastKmsUrlDiscoverTS(0.0),
|
||||
kmsStable(true) {}
|
||||
explicit RESTKmsConnectorCtx(const UID& id)
|
||||
: uid(id), lastKmsUrlsRefreshTs(0), lastKmsUrlDiscoverTS(0.0), kmsStable(true) {}
|
||||
};
|
||||
|
||||
std::string getFullRequestUrl(Reference<RESTKmsConnectorCtx> ctx, const std::string& url, const std::string& suffix) {
|
||||
|
@ -1117,12 +1121,47 @@ ACTOR Future<Void> procureValidationTokens(Reference<RESTKmsConnectorCtx> ctx) {
|
|||
return Void();
|
||||
}
|
||||
|
||||
// Check if KMS is table by checking request failure count from RESTClient metrics.
|
||||
// Will clear RESTClient metrics afterward, assuming it is the only user of the metrics.
|
||||
//
|
||||
// TODO(yiwu): make RESTClient periodically report and clear the stats.
|
||||
void updateKMSStability(Reference<RESTKmsConnectorCtx> self) {
|
||||
bool stable = true;
|
||||
for (auto& s : self->restClient.statsMap) {
|
||||
if (s.second->requests_failed > 0) {
|
||||
stable = false;
|
||||
}
|
||||
s.second->clear();
|
||||
}
|
||||
self->kmsStable = stable;
|
||||
}
|
||||
|
||||
Future<Void> getKMSState(Reference<RESTKmsConnectorCtx> self, KmsConnGetKMSStateReq req) {
|
||||
KmsConnGetKMSStateRep reply;
|
||||
reply.kmsStable = self->kmsStable;
|
||||
|
||||
try {
|
||||
reply.restKMSUrls.reserve(reply.arena, self->kmsUrlStore.kmsUrls.size());
|
||||
for (const auto& url : self->kmsUrlStore.kmsUrls) {
|
||||
reply.restKMSUrls.emplace_back(reply.arena, url.toString());
|
||||
}
|
||||
req.reply.send(reply);
|
||||
} catch (Error& e) {
|
||||
TraceEvent(SevWarn, "RestKMSGetKMSStateFailed", self->uid).error(e);
|
||||
throw e;
|
||||
}
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
||||
ACTOR Future<Void> restConnectorCoreImpl(KmsConnectorInterface interf) {
|
||||
state Reference<RESTKmsConnectorCtx> self = makeReference<RESTKmsConnectorCtx>(interf.id());
|
||||
state Future<Void> collection = actorCollection(self->addActor.getFuture());
|
||||
|
||||
TraceEvent("RESTKmsConnectorInit", self->uid).log();
|
||||
|
||||
self->kmsStabilityChecker =
|
||||
recurring([self = self]() { updateKMSStability(self); }, SERVER_KNOBS->REST_KMS_STABILITY_CHECK_INTERVAL);
|
||||
wait(discoverKmsUrls(self, RefreshPersistedUrls::False));
|
||||
wait(procureValidationTokens(self));
|
||||
|
||||
|
@ -1137,6 +1176,9 @@ ACTOR Future<Void> restConnectorCoreImpl(KmsConnectorInterface interf) {
|
|||
when(KmsConnBlobMetadataReq req = waitNext(interf.blobMetadataReq.getFuture())) {
|
||||
self->addActor.send(fetchBlobMetadata(self, req));
|
||||
}
|
||||
when(KmsConnGetKMSStateReq req = waitNext(interf.getKMSStateReq.getFuture())) {
|
||||
self->addActor.send(getKMSState(self, req));
|
||||
}
|
||||
when(wait(collection)) {
|
||||
// this should throw an error, not complete
|
||||
ASSERT(false);
|
||||
|
|
|
@ -247,6 +247,9 @@ ACTOR Future<Void> simconnectorCoreImpl(KmsConnectorInterface interf) {
|
|||
when(KmsConnBlobMetadataReq req = waitNext(interf.blobMetadataReq.getFuture())) {
|
||||
addActor.send(blobMetadataLookup(interf, req));
|
||||
}
|
||||
when(KmsConnGetKMSStateReq req = waitNext(interf.getKMSStateReq.getFuture())) {
|
||||
req.reply.send(KmsConnGetKMSStateRep());
|
||||
}
|
||||
when(wait(collection)) {
|
||||
// this should throw an error, not complete
|
||||
ASSERT(false);
|
||||
|
|
|
@ -2988,9 +2988,13 @@ ACTOR Future<std::pair<Optional<StorageWiggleMetrics>, Optional<StorageWiggleMet
|
|||
}
|
||||
|
||||
ACTOR Future<KMSHealthStatus> getKMSHealthStatus(Reference<const AsyncVar<ServerDBInfo>> db) {
|
||||
state KMSHealthStatus unhealthy;
|
||||
unhealthy.canConnectToEKP = false;
|
||||
unhealthy.canConnectToKms = false;
|
||||
unhealthy.lastUpdatedTS = now();
|
||||
try {
|
||||
if (!db->get().client.encryptKeyProxy.present()) {
|
||||
return KMSHealthStatus{ false, false, now() };
|
||||
return unhealthy;
|
||||
}
|
||||
KMSHealthStatus reply = wait(timeoutError(
|
||||
db->get().client.encryptKeyProxy.get().getHealthStatus.getReply(EncryptKeyProxyHealthStatusRequest()),
|
||||
|
@ -3000,7 +3004,7 @@ ACTOR Future<KMSHealthStatus> getKMSHealthStatus(Reference<const AsyncVar<Server
|
|||
if (e.code() != error_code_timed_out) {
|
||||
throw;
|
||||
}
|
||||
return KMSHealthStatus{ false, false, now() };
|
||||
return unhealthy;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -3085,10 +3089,25 @@ ACTOR Future<StatusReply> clusterGetStatus(
|
|||
// Get EKP Health
|
||||
if (db->get().client.encryptKeyProxy.present()) {
|
||||
KMSHealthStatus status = wait(getKMSHealthStatus(db));
|
||||
JsonBuilderObject _statusObj;
|
||||
_statusObj["ekp_is_healthy"] = status.canConnectToEKP;
|
||||
statusObj["encryption_at_rest"] = _statusObj;
|
||||
statusObj["kms_is_healthy"] = status.canConnectToKms;
|
||||
|
||||
// encryption-at-rest status
|
||||
JsonBuilderObject earStatusObj;
|
||||
earStatusObj["ekp_is_healthy"] = status.canConnectToEKP;
|
||||
statusObj["encryption_at_rest"] = earStatusObj;
|
||||
|
||||
JsonBuilderObject kmsStatusObj;
|
||||
kmsStatusObj["kms_is_healthy"] = status.canConnectToKms;
|
||||
if (status.canConnectToEKP) {
|
||||
kmsStatusObj["kms_connector_type"] = status.kmsConnectorType;
|
||||
kmsStatusObj["kms_stable"] = status.kmsStable;
|
||||
JsonBuilderArray kmsUrlsArr;
|
||||
for (const auto& url : status.restKMSUrls) {
|
||||
kmsUrlsArr.push_back(url);
|
||||
}
|
||||
kmsStatusObj["kms_urls"] = kmsUrlsArr;
|
||||
}
|
||||
statusObj["kms"] = kmsStatusObj;
|
||||
|
||||
// TODO: In this scenario we should see if we can fetch any status fields that don't depend on encryption
|
||||
if (!status.canConnectToKms || !status.canConnectToEKP) {
|
||||
return StatusReply(statusObj.getJson());
|
||||
|
|
|
@ -37,6 +37,7 @@ struct KmsConnectorInterface {
|
|||
RequestStream<struct KmsConnLookupEKsByKeyIdsReq> ekLookupByIds;
|
||||
RequestStream<struct KmsConnLookupEKsByDomainIdsReq> ekLookupByDomainIds;
|
||||
RequestStream<struct KmsConnBlobMetadataReq> blobMetadataReq;
|
||||
RequestStream<struct KmsConnGetKMSStateReq> getKMSStateReq;
|
||||
|
||||
KmsConnectorInterface() {}
|
||||
|
||||
|
@ -54,6 +55,8 @@ struct KmsConnectorInterface {
|
|||
RequestStream<struct KmsConnLookupEKsByDomainIdsReq>(waitFailure.getEndpoint().getAdjustedEndpoint(2));
|
||||
blobMetadataReq =
|
||||
RequestStream<struct KmsConnBlobMetadataReq>(waitFailure.getEndpoint().getAdjustedEndpoint(3));
|
||||
getKMSStateReq =
|
||||
RequestStream<struct KmsConnGetKMSStateReq>(waitFailure.getEndpoint().getAdjustedEndpoint(4));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -63,6 +66,7 @@ struct KmsConnectorInterface {
|
|||
streams.push_back(ekLookupByIds.getReceiver(TaskPriority::Worker));
|
||||
streams.push_back(ekLookupByDomainIds.getReceiver(TaskPriority::Worker));
|
||||
streams.push_back(blobMetadataReq.getReceiver(TaskPriority::Worker));
|
||||
streams.push_back(getKMSStateReq.getReceiver(TaskPriority::Worker));
|
||||
FlowTransport::transport().addEndpoints(streams);
|
||||
}
|
||||
};
|
||||
|
@ -221,4 +225,30 @@ struct KmsConnBlobMetadataReq {
|
|||
}
|
||||
};
|
||||
|
||||
struct KmsConnGetKMSStateRep {
|
||||
constexpr static FileIdentifier file_identifier = 111862;
|
||||
Arena arena;
|
||||
VectorRef<StringRef> restKMSUrls;
|
||||
bool kmsStable;
|
||||
|
||||
KmsConnGetKMSStateRep() = default;
|
||||
|
||||
template <class Ar>
|
||||
void serialize(Ar& ar) {
|
||||
serializer(ar, restKMSUrls, kmsStable, arena);
|
||||
}
|
||||
};
|
||||
|
||||
struct KmsConnGetKMSStateReq {
|
||||
constexpr static FileIdentifier file_identifier = 2349929;
|
||||
ReplyPromise<KmsConnGetKMSStateRep> reply;
|
||||
|
||||
KmsConnGetKMSStateReq() = default;
|
||||
|
||||
template <class Ar>
|
||||
void serialize(Ar& ar) {
|
||||
serializer(ar, reply);
|
||||
}
|
||||
};
|
||||
|
||||
#endif
|
||||
|
|
|
@ -323,10 +323,10 @@ void FlowKnobs::initialize(Randomize randomize, IsSimulated isSimulated) {
|
|||
// REST Client
|
||||
init( RESTCLIENT_MAX_CONNECTIONPOOL_SIZE, 10 );
|
||||
init( RESTCLIENT_CONNECT_TRIES, 10 );
|
||||
init( RESTCLIENT_CONNECT_TIMEOUT, 10 );
|
||||
init( RESTCLIENT_CONNECT_TIMEOUT, 1 );
|
||||
init( RESTCLIENT_MAX_CONNECTION_LIFE, 120 );
|
||||
init( RESTCLIENT_REQUEST_TRIES, 10 );
|
||||
init( RESTCLIENT_REQUEST_TIMEOUT_SEC, 120 );
|
||||
init( RESTCLIENT_REQUEST_TIMEOUT_SEC, 6 );
|
||||
init( REST_LOG_LEVEL, 3 );
|
||||
}
|
||||
// clang-format on
|
||||
|
|
|
@ -385,7 +385,7 @@ public:
|
|||
int RESTCLIENT_MAX_CONNECTIONPOOL_SIZE;
|
||||
int RESTCLIENT_CONNECT_TRIES;
|
||||
int RESTCLIENT_CONNECT_TIMEOUT;
|
||||
int RESTCLIENT_MAX_CONNECTION_LIFE;
|
||||
int RESTCLIENT_MAX_CONNECTION_LIFE; // Not implemented yet.
|
||||
int RESTCLIENT_REQUEST_TRIES;
|
||||
int RESTCLIENT_REQUEST_TIMEOUT_SEC;
|
||||
int REST_LOG_LEVEL;
|
||||
|
|
Loading…
Reference in New Issue