FastRestore:Applier:Add metrics counter and proc counter
This commit is contained in:
parent
1fc793d6a7
commit
dbce1e9974
|
@ -551,6 +551,7 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs, bool isSimula
|
|||
init( FASTRESTORE_VB_MONITOR_DELAY, 5 ); if( randomize ) { FASTRESTORE_VB_MONITOR_DELAY = deterministicRandom()->random01() * 20 + 1; }
|
||||
init( FASTRESTORE_VB_LAUNCH_DELAY, 5 ); if( randomize ) { FASTRESTORE_VB_LAUNCH_DELAY = deterministicRandom()->random01() * 60 + 1; }
|
||||
init( FASTRESTORE_ROLE_LOGGING_DELAY, 5 ); if( randomize ) { FASTRESTORE_ROLE_LOGGING_DELAY = deterministicRandom()->random01() * 60 + 1; }
|
||||
init( FASTRESTORE_UPDATE_PROCESS_STATS_INTERVAL, 5 ); if( randomize ) { FASTRESTORE_UPDATE_PROCESS_STATS_INTERVAL = deterministicRandom()->random01() * 60 + 1; }
|
||||
|
||||
// clang-format on
|
||||
|
||||
|
|
|
@ -493,6 +493,7 @@ public:
|
|||
int64_t FASTRESTORE_VB_MONITOR_DELAY; // How quickly monitor finished version batch
|
||||
int64_t FASTRESTORE_VB_LAUNCH_DELAY;
|
||||
int64_t FASTRESTORE_ROLE_LOGGING_DELAY;
|
||||
int64_t FASTRESTORE_UPDATE_PROCESS_STATS_INTERVAL; // How quickly to update process metrics for restore
|
||||
|
||||
ServerKnobs(bool randomize = false, ClientKnobs* clientKnobs = NULL, bool isSimulated = false);
|
||||
};
|
||||
|
|
|
@ -42,9 +42,13 @@ ACTOR static Future<Void> handleApplyToDBRequest(RestoreVersionBatchRequest req,
|
|||
ACTOR Future<Void> restoreApplierCore(RestoreApplierInterface applierInterf, int nodeIndex, Database cx) {
|
||||
state Reference<RestoreApplierData> self =
|
||||
Reference<RestoreApplierData>(new RestoreApplierData(applierInterf.id(), nodeIndex));
|
||||
|
||||
state ActorCollection actors(false);
|
||||
state Future<Void> exitRole = Never();
|
||||
state double updateProcessStatsDelay = SERVER_KNOBS->FASTRESTORE_UPDATE_PROCESS_STATS_INTERVAL;
|
||||
state Future<Void> updateProcessStatsTimer = delay(updateProcessStatsDelay);
|
||||
|
||||
actors.add(traceProcessMetrics(self, "Applier"));
|
||||
|
||||
loop {
|
||||
state std::string requestTypeStr = "[Init]";
|
||||
|
||||
|
@ -74,6 +78,10 @@ ACTOR Future<Void> restoreApplierCore(RestoreApplierInterface applierInterf, int
|
|||
exitRole = Void();
|
||||
}
|
||||
}
|
||||
when(wait(updateProcessStatsTimer)) {
|
||||
updateProcessStats(self);
|
||||
updateProcessStatsTimer = delay(updateProcessStatsDelay);
|
||||
}
|
||||
when(wait(exitRole)) {
|
||||
TraceEvent("FastRestore").detail("RestoreApplierCore", "ExitRole").detail("NodeID", self->id());
|
||||
break;
|
||||
|
@ -132,6 +140,8 @@ ACTOR static Future<Void> handleSendMutationVectorRequest(RestoreSendVersionedMu
|
|||
.detail("Version", commitVersion)
|
||||
.detail("Index", mIndex)
|
||||
.detail("MutationReceived", mutation.toString());
|
||||
batchData->counters.receivedBytes += mutation.totalSize();
|
||||
batchData->counters.receivedMutations += 1;
|
||||
// Sanity check
|
||||
if (g_network->isSimulated()) {
|
||||
if (isRangeMutation(mutation)) {
|
||||
|
@ -174,8 +184,9 @@ struct DBApplyProgress {
|
|||
|
||||
// Decide when to commit a transaction. We buffer enough mutations in a txn before committing the txn
|
||||
bool startNextVersion; // The next txn will include mutations in next version
|
||||
int numAtomicOps;
|
||||
double transactionSize;
|
||||
int numAtomicOps; // Status counter
|
||||
double txnBytes; // Decide when to commit a txn
|
||||
double txnMutations; // Status counter
|
||||
|
||||
Reference<ApplierBatchData> batchData;
|
||||
UID applierId;
|
||||
|
@ -183,7 +194,8 @@ struct DBApplyProgress {
|
|||
DBApplyProgress() = default;
|
||||
explicit DBApplyProgress(UID applierId, Reference<ApplierBatchData> batchData)
|
||||
: applierId(applierId), batchData(batchData), curIndexInCurTxn(0), startIndexInUncommittedTxn(0), curTxnId(0),
|
||||
uncommittedTxnId(0), lastTxnHasError(false), startNextVersion(false), numAtomicOps(0), transactionSize(0) {
|
||||
uncommittedTxnId(0), lastTxnHasError(false), startNextVersion(false), numAtomicOps(0), txnBytes(0),
|
||||
txnMutations(0) {
|
||||
curItInCurTxn = batchData->kvOps.begin();
|
||||
while (curItInCurTxn != batchData->kvOps.end() && curItInCurTxn->second.empty()) {
|
||||
curItInCurTxn++;
|
||||
|
@ -206,7 +218,8 @@ struct DBApplyProgress {
|
|||
|
||||
// Setup for the next transaction; This should be done after nextMutation()
|
||||
void nextTxn() {
|
||||
transactionSize = 0;
|
||||
txnBytes = 0;
|
||||
txnMutations = 0;
|
||||
numAtomicOps = 0;
|
||||
lastTxnHasError = false;
|
||||
startNextVersion = false;
|
||||
|
@ -235,15 +248,15 @@ struct DBApplyProgress {
|
|||
curTxnId = uncommittedTxnId;
|
||||
|
||||
numAtomicOps = 0;
|
||||
transactionSize = 0;
|
||||
txnBytes = 0;
|
||||
txnMutations = 0;
|
||||
startNextVersion = false;
|
||||
lastTxnHasError = false;
|
||||
}
|
||||
|
||||
bool shouldCommit() {
|
||||
return (!lastTxnHasError &&
|
||||
(startNextVersion || transactionSize >= SERVER_KNOBS->FASTRESTORE_TXN_BATCH_MAX_BYTES ||
|
||||
curItInCurTxn == batchData->kvOps.end()));
|
||||
return (!lastTxnHasError && (startNextVersion || txnBytes >= SERVER_KNOBS->FASTRESTORE_TXN_BATCH_MAX_BYTES ||
|
||||
curItInCurTxn == batchData->kvOps.end()));
|
||||
}
|
||||
|
||||
bool hasError() { return lastTxnHasError; }
|
||||
|
@ -370,8 +383,8 @@ ACTOR Future<Void> applyToDB(UID applierID, int64_t batchIndex, Reference<Applie
|
|||
.detail("Version", progress.curItInCurTxn->first)
|
||||
.detail("Index", progress.curIndexInCurTxn)
|
||||
.detail("Mutation", m.toString())
|
||||
.detail("MutationSize", m.expectedSize())
|
||||
.detail("TxnSize", progress.transactionSize);
|
||||
.detail("MutationSize", m.totalSize())
|
||||
.detail("TxnSize", progress.txnBytes);
|
||||
if (m.type == MutationRef::SetValue) {
|
||||
tr->set(m.param1, m.param2);
|
||||
} else if (m.type == MutationRef::ClearRange) {
|
||||
|
@ -386,7 +399,8 @@ ACTOR Future<Void> applyToDB(UID applierID, int64_t batchIndex, Reference<Applie
|
|||
.detail("TypeName", typeStr);
|
||||
}
|
||||
|
||||
progress.transactionSize += m.expectedSize();
|
||||
progress.txnBytes += m.totalSize(); // Changed expectedSize to totalSize
|
||||
progress.txnMutations += 1;
|
||||
|
||||
progress.nextMutation(); // Prepare for the next mutation
|
||||
// commit per FASTRESTORE_TXN_BATCH_MAX_BYTES bytes; and commit does not cross version boundary
|
||||
|
@ -399,6 +413,10 @@ ACTOR Future<Void> applyToDB(UID applierID, int64_t batchIndex, Reference<Applie
|
|||
// Commit the txn and prepare the starting point for next txn
|
||||
if (progress.shouldCommit()) {
	wait(tr->commit());
	// Counters are updated only after the commit has been waited on, so they reflect
	// work that has been submitted as a complete transaction.
	// Update status counters: appliedBytes, appliedMutations, appliedTxns, appliedAtomicOps
	batchData->counters.appliedBytes += progress.txnBytes;
	// txnMutations is the number of mutations buffered in this txn, so it belongs in
	// appliedMutations (previously it was added to appliedTxns, leaving appliedMutations at 0).
	batchData->counters.appliedMutations += progress.txnMutations;
	batchData->counters.appliedTxns += 1; // one transaction was committed above
	batchData->counters.appliedAtomicOps += progress.numAtomicOps;
}
|
||||
|
||||
if (progress.isDone()) { // Are all mutations processed?
|
||||
|
@ -414,9 +432,6 @@ ACTOR Future<Void> applyToDB(UID applierID, int64_t batchIndex, Reference<Applie
|
|||
.detail("Version", progress.curItInCurTxn->first)
|
||||
.error(e, true);
|
||||
progress.lastTxnHasError = true;
|
||||
// if (e.code() == commit_unknown_result) {
|
||||
// lastTxnHasError = true;
|
||||
// }
|
||||
wait(tr->onError(e));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -46,10 +46,30 @@ struct ApplierBatchData : public ReferenceCounted<ApplierBatchData> {
|
|||
Optional<Future<Void>> dbApplier;
|
||||
VersionedMutationsMap kvOps; // Mutations at each version
|
||||
|
||||
Future<Void> pollMetrics;
|
||||
|
||||
// Status counters
|
||||
// Status counters for one applier version batch. Every Counter below is constructed
// against the CounterCollection `cc`, which is the collection handed to traceCounters()
// by the enclosing ApplierBatchData constructor.
struct Counters {
|
||||
CounterCollection cc;
|
||||
// Incremented as mutations arrive (bytes via mutation.totalSize(), count by 1 per mutation).
Counter receivedBytes, receivedMutations;
|
||||
// Updated by applyToDB after a transaction commit.
Counter appliedBytes, appliedMutations, appliedAtomicOps;
|
||||
Counter appliedTxns;
|
||||

||||
// `cc` is constructed with the label "ApplierBatch" and the string "<applierInterfID>:<batchIndex>".
// NOTE(review): `self` is not used by this constructor.
Counters(ApplierBatchData* self, UID applierInterfID, int batchIndex)
|
||||
: cc("ApplierBatch", applierInterfID.toString() + ":" + std::to_string(batchIndex)),
|
||||
receivedBytes("ReceivedBytes", cc), receivedMutations("ReceivedMutations", cc),
|
||||
appliedBytes("AppliedBytes", cc), appliedMutations("AppliedMutations", cc),
|
||||
appliedAtomicOps("AppliedAtomicOps", cc), appliedTxns("AppliedTxns", cc) {}
|
||||
} counters;
|
||||
|
||||
void addref() { return ReferenceCounted<ApplierBatchData>::addref(); }
|
||||
void delref() { return ReferenceCounted<ApplierBatchData>::delref(); }
|
||||
|
||||
ApplierBatchData() = default;
|
||||
// Builds the per-batch counters for (nodeID, batchIndex) and starts traceCounters() over
// counters.cc, using FASTRESTORE_ROLE_LOGGING_DELAY as the interval argument. The returned
// future is kept in pollMetrics.
// NOTE(review): presumably holding pollMetrics keeps the trace loop alive for the lifetime
// of this object and dropping it cancels the loop — confirm against flow/Stats.h.
explicit ApplierBatchData(UID nodeID, int batchIndex) : counters(this, nodeID, batchIndex) {
|
||||
pollMetrics =
|
||||
traceCounters("RestoreApplierMetrics", nodeID, SERVER_KNOBS->FASTRESTORE_ROLE_LOGGING_DELAY, &counters.cc,
|
||||
nodeID.toString() + "/RestoreApplierMetrics/" + std::to_string(batchIndex));
|
||||
}
|
||||
~ApplierBatchData() = default;
|
||||
|
||||
void reset() {
|
||||
|
@ -116,7 +136,7 @@ struct RestoreApplierData : RestoreRoleData, public ReferenceCounted<RestoreAppl
|
|||
|
||||
void initVersionBatch(int batchIndex) {
|
||||
TraceEvent("FastRestoreApplierInitVersionBatch", id()).detail("BatchIndex", batchIndex);
|
||||
batch[batchIndex] = Reference<ApplierBatchData>(new ApplierBatchData());
|
||||
batch[batchIndex] = Reference<ApplierBatchData>(new ApplierBatchData(nodeID, batchIndex));
|
||||
}
|
||||
|
||||
void resetPerRestoreRequest() {
|
||||
|
|
|
@ -59,11 +59,12 @@ ACTOR static Future<Void> _parseRangeFileToMutationsOnLoader(
|
|||
ACTOR Future<Void> restoreLoaderCore(RestoreLoaderInterface loaderInterf, int nodeIndex, Database cx) {
|
||||
state Reference<RestoreLoaderData> self =
|
||||
Reference<RestoreLoaderData>(new RestoreLoaderData(loaderInterf.id(), nodeIndex));
|
||||
|
||||
state ActorCollection actors(false);
|
||||
state Future<Void> exitRole = Never();
|
||||
state double updateProcessStatsDelay = SERVER_KNOBS->FASTRESTORE_UPDATE_PROCESS_STATS_INTERVAL;
|
||||
state Future<Void> updateProcessStatsTimer = delay(updateProcessStatsDelay);
|
||||
|
||||
actors.add(traceCounters("RestoreLoaderMetrics", self->id(), SERVER_KNOBS->STORAGE_LOGGING_DELAY, &self->counters.cc, self->nodeId.toString() + "/RestoreLoaderMetrics"));
|
||||
actors.add(traceProcessMetrics(self, "Loader"));
|
||||
|
||||
loop {
|
||||
state std::string requestTypeStr = "[Init]";
|
||||
|
@ -98,6 +99,10 @@ ACTOR Future<Void> restoreLoaderCore(RestoreLoaderInterface loaderInterf, int no
|
|||
exitRole = Void();
|
||||
}
|
||||
}
|
||||
when(wait(updateProcessStatsTimer)) {
|
||||
updateProcessStats(self);
|
||||
updateProcessStatsTimer = delay(updateProcessStatsDelay);
|
||||
}
|
||||
when(wait(exitRole)) {
|
||||
TraceEvent("FastRestore").detail("RestoreLoaderCore", "ExitRole").detail("NodeID", self->id());
|
||||
break;
|
||||
|
|
|
@ -54,6 +54,8 @@ struct LoaderBatchData : public ReferenceCounted<LoaderBatchData> {
|
|||
std::map<LoadingParam, MutationsVec> sampleMutations;
|
||||
int numSampledMutations; // The total number of mutations received from sampled data.
|
||||
|
||||
Future<Void> pollMetrics;
|
||||
|
||||
// Status counters
|
||||
struct Counters {
|
||||
CounterCollection cc;
|
||||
|
@ -66,7 +68,11 @@ struct LoaderBatchData : public ReferenceCounted<LoaderBatchData> {
|
|||
sampledBytes("SampledBytes", cc) {}
|
||||
} counters;
|
||||
|
||||
explicit LoaderBatchData(UID loaderInterfID, int batchIndex) : counters(this, loaderInterfID, batchIndex) {}
|
||||
explicit LoaderBatchData(UID nodeID, int batchIndex) : counters(this, nodeID, batchIndex) {
|
||||
pollMetrics =
|
||||
traceCounters("RestoreLoaderMetrics", nodeID, SERVER_KNOBS->FASTRESTORE_ROLE_LOGGING_DELAY, &counters.cc,
|
||||
nodeID.toString() + "/RestoreLoaderMetrics/" + std::to_string(batchIndex));
|
||||
}
|
||||
|
||||
void reset() {
|
||||
processedFileParams.clear();
|
||||
|
|
|
@ -73,6 +73,36 @@ ACTOR Future<Void> handleInitVersionBatchRequest(RestoreVersionBatchRequest req,
|
|||
return Void();
|
||||
}
|
||||
|
||||
// Refresh the process statistics cached on a restore role (cpuUsage, memory, residentMemory).
// In simulation the three fields are pinned to fixed values; otherwise they are taken from
// getSystemStatistics(), and are left untouched while those statistics are not initialized.
void updateProcessStats(Reference<RestoreRoleData> self) {
	if (!g_network->isSimulated()) {
		SystemStatistics latest = getSystemStatistics();
		if (!latest.initialized) {
			return; // nothing usable yet; keep whatever was stored before
		}
		// CPU usage is 100 * process CPU seconds / elapsed, i.e. a percentage of the interval.
		self->cpuUsage = 100 * latest.processCPUSeconds / latest.elapsed;
		self->memory = latest.processMemory;
		self->residentMemory = latest.processResidentMemory;
		return;
	}

	// memUsage and cpuUsage are not relevant in the simulator,
	// and relying on the actual values could break seed determinism
	self->residentMemory = 100.0;
	self->memory = 100.0;
	self->cpuUsage = 100.0;
}
|
||||
|
||||
// Flow actor that repeatedly logs the process statistics cached on a restore role.
// Each iteration emits a "FastRestoreTraceProcessMetrics" trace event carrying `role`,
// self->nodeID, and the cpuUsage / memory / residentMemory fields, then waits
// FASTRESTORE_ROLE_LOGGING_DELAY before the next iteration.
// This actor only reads those fields; updateProcessStats() is what writes them.
// The loop has no exit path of its own.
// NOTE(review): presumably it ends only when the returned future is cancelled — confirm.
ACTOR Future<Void> traceProcessMetrics(Reference<RestoreRoleData> self, std::string role) {
|
||||
loop {
|
||||
TraceEvent("FastRestoreTraceProcessMetrics")
|
||||
.detail("Role", role)
|
||||
.detail("Node", self->nodeID)
|
||||
.detail("CpuUsage", self->cpuUsage)
|
||||
.detail("UsedMemory", self->memory)
|
||||
.detail("ResidentMemory", self->residentMemory);
|
||||
// The knob is re-read on every iteration.
wait(delay(SERVER_KNOBS->FASTRESTORE_ROLE_LOGGING_DELAY));
|
||||
}
|
||||
}
|
||||
|
||||
//-------Helper functions
|
||||
std::string getHexString(StringRef input) {
|
||||
std::stringstream ss;
|
||||
|
|
|
@ -30,6 +30,7 @@
|
|||
|
||||
#include <sstream>
|
||||
#include "flow/Stats.h"
|
||||
#include "flow/SystemMonitor.h"
|
||||
#include "fdbclient/FDBTypes.h"
|
||||
#include "fdbclient/CommitTransaction.h"
|
||||
#include "fdbclient/Notified.h"
|
||||
|
@ -109,6 +110,10 @@ public:
|
|||
UID nodeID;
|
||||
int nodeIndex;
|
||||
|
||||
double cpuUsage;
|
||||
double memory;
|
||||
double residentMemory;
|
||||
|
||||
std::map<UID, RestoreLoaderInterface> loadersInterf; // UID: loaderInterf's id
|
||||
std::map<UID, RestoreApplierInterface> appliersInterf; // UID: applierInterf's id
|
||||
|
||||
|
@ -116,7 +121,7 @@ public:
|
|||
|
||||
bool versionBatchStart = false;
|
||||
|
||||
RestoreRoleData() : role(RestoreRole::Invalid){};
|
||||
RestoreRoleData() : role(RestoreRole::Invalid), cpuUsage(0.0), memory(0.0), residentMemory(0.0){};
|
||||
|
||||
virtual ~RestoreRoleData() {}
|
||||
|
||||
|
@ -134,5 +139,8 @@ public:
|
|||
virtual std::string describeNode() = 0;
|
||||
};
|
||||
|
||||
void updateProcessStats(Reference<RestoreRoleData> self);
|
||||
ACTOR Future<Void> traceProcessMetrics(Reference<RestoreRoleData> self, std::string role);
|
||||
|
||||
#include "flow/unactorcompiler.h"
|
||||
#endif
|
||||
|
|
Loading…
Reference in New Issue