Add .cluster.idempotency_ids to status json

This commit is contained in:
Andrew Noyes 2022-12-14 07:32:59 -08:00
parent a0b9646a73
commit 91a2010a34
5 changed files with 84 additions and 3 deletions

View File

@ -902,6 +902,13 @@
"num_tenants" : 1, // on data cluster, local count; on management cluster, total metacluster count
"num_tenant_groups" : 10,
"tenant_group_capacity" : 20,
},
"idempotency_ids" : {
"size_bytes" : 0, // An estimate of the current number of bytes used in the database to store idempotency ids.
"expired_version" : 0, // The commit status of a transaction whose commit version could be <= expired_version can no longer be determined.
"expired_age" : 0, // The age in seconds of expired_version.
"oldest_id_version" : 0, // The version of the oldest idempotency id still stored in the database.
"oldest_id_age" : 0 // The age in seconds of the oldest_id_version.
}
},
"client":{

View File

@ -203,3 +203,64 @@ void decodeIdempotencyKey(KeyRef key, Version& commitVersion, uint8_t& highOrder
commitVersion = bigEndian64(commitVersion);
reader >> highOrderBatchIndex;
}
// Find the youngest or oldest idempotency id key in `range` (depending on `reverse`)
// Write the timestamp to `*time` and the version to `*version` when non-null.
ACTOR static Future<Optional<Key>> getBoundary(Reference<ReadYourWritesTransaction> tr,
KeyRange range,
Reverse reverse,
Version* version,
int64_t* time) {
RangeResult result = wait(tr->getRange(range, /*limit*/ 1, Snapshot::False, reverse));
if (!result.size()) {
return Optional<Key>();
}
if (version != nullptr) {
BinaryReader rd(result.front().key, Unversioned());
rd.readBytes(idempotencyIdKeys.begin.size());
rd >> *version;
*version = bigEndian64(*version);
}
if (time != nullptr) {
BinaryReader rd(result.front().value, IncludeVersion());
rd >> *time;
}
return result.front().key;
}
ACTOR Future<JsonBuilderObject> getIdmpKeyStatus(Database db) {
state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(db);
state int64_t size;
state IdempotencyIdsExpiredVersion expired;
state KeyBackedObjectProperty<IdempotencyIdsExpiredVersion, _Unversioned> expiredKey(idempotencyIdsExpiredVersion,
Unversioned());
state int64_t oldestIdVersion = 0;
state int64_t oldestIdTime = 0;
loop {
try {
tr->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::READ_LOCK_AWARE);
wait(store(size, tr->getEstimatedRangeSizeBytes(idempotencyIdKeys)) &&
store(expired, expiredKey.getD(tr)) &&
success(getBoundary(tr, idempotencyIdKeys, Reverse::False, &oldestIdVersion, &oldestIdTime)));
JsonBuilderObject result;
result["size_bytes"] = size;
if (expired.expired != 0) {
result["expired_version"] = expired.expired;
}
if (expired.expiredTime != 0) {
result["expired_age"] = int64_t(now()) - expired.expiredTime;
}
if (oldestIdVersion != 0) {
result["oldest_id_version"] = oldestIdVersion;
}
if (oldestIdTime != 0) {
result["oldest_id_age"] = int64_t(now()) - oldestIdTime;
}
return result;
} catch (Error& e) {
wait(tr->onError(e));
}
}
}

View File

@ -602,8 +602,6 @@ const KeyRef JSONSchemas::statusSchema = R"statusSchema(
"description":"abc"
}
],
)statusSchema"
R"statusSchema(
"recovery_state":{
"seconds_since_last_recovered":1,
"required_resolvers":1,
@ -976,6 +974,13 @@ const KeyRef JSONSchemas::statusSchema = R"statusSchema(
"num_tenants":0,
"num_tenant_groups":10,
"tenant_group_capacity":20
},
"idempotency_ids":{
"size_bytes": 0,
"expired_version": 0,
"expired_age": 0,
"oldest_id_version": 0,
"oldest_id_age": 0
}
},
"client":{

View File

@ -29,6 +29,7 @@
#pragma once
#include "fdbclient/FDBTypes.h"
#include "fdbclient/JsonBuilder.h"
#include "fdbclient/PImpl.h"
#include "flow/Arena.h"
#include "flow/IRandom.h"
@ -44,10 +45,11 @@ struct CommitResult {
struct IdempotencyIdsExpiredVersion {
static constexpr auto file_identifier = 3746945;
Version expired = 0;
int64_t expiredTime = 0;
template <class Archive>
void serialize(Archive& ar) {
serializer(ar, expired);
serializer(ar, expired, expiredTime);
}
};
@ -185,5 +187,7 @@ KeyRangeRef makeIdempotencySingleKeyRange(Arena& arena, Version version, uint8_t
void decodeIdempotencyKey(KeyRef key, Version& commitVersion, uint8_t& highOrderBatchIndex);
ACTOR Future<JsonBuilderObject> getIdmpKeyStatus(Database db);
#include "flow/unactorcompiler.h"
#endif

View File

@ -3124,6 +3124,8 @@ ACTOR Future<StatusReply> clusterGetStatus(
state JsonBuilderObject recoveryStateStatus = wait(
recoveryStateStatusFetcher(cx, ccWorker, mWorker, workers.size(), &status_incomplete_reasons, &statusCode));
state JsonBuilderObject idmpKeyStatus = wait(getIdmpKeyStatus(cx));
// machine metrics
state WorkerEvents mMetrics = workerEventsVec[0].present() ? workerEventsVec[0].get().first : WorkerEvents();
// process metrics
@ -3505,6 +3507,8 @@ ACTOR Future<StatusReply> clusterGetStatus(
if (!recoveryStateStatus.empty())
statusObj["recovery_state"] = recoveryStateStatus;
statusObj["idempotency_ids"] = idmpKeyStatus;
// cluster messages subsection;
JsonBuilderArray clientIssuesArr = getClientIssuesAsMessages(clientStatus);
if (clientIssuesArr.size() > 0) {