FastRestore:Add transaction rate info tracer
This commit is contained in:
parent
002b1bec4c
commit
b4254473d7
|
@ -651,6 +651,25 @@ void handleUpdateRateRequest(RestoreUpdateRateRequest req, Reference<RestoreAppl
|
|||
return;
|
||||
}
|
||||
|
||||
ACTOR static Future<Void> traceRate(Reference<ApplierBatchData> batchData, int batchIndex, UID nodeID) {
|
||||
ASSERT(batchData.isValid());
|
||||
loop {
|
||||
if (!batchData.isValid()) {
|
||||
break;
|
||||
}
|
||||
TraceEvent("FastRestoreApplierTransactionRateControl", nodeID)
|
||||
.detail("BatchIndex", batchIndex)
|
||||
.detail("TotalDataToWriteMB", batchData->dataMbToWrite)
|
||||
.detail("ApplierBytesMB", batchData->appliedBytes / 1024 / 1024)
|
||||
.detail("TargetBytesMB", batchData->targetWriteRateMB)
|
||||
.detail("InflightBytesMB", batchData->applyingDataBytes)
|
||||
.detail("ReceivedBytes", batchData->receivedBytes);
|
||||
wait(delay(5.0));
|
||||
}
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
||||
ACTOR static Future<Void> handleApplyToDBRequest(RestoreVersionBatchRequest req, Reference<RestoreApplierData> self,
|
||||
Database cx) {
|
||||
TraceEvent("FastRestoreApplierPhaseHandleApplyToDBStart", self->id())
|
||||
|
@ -679,6 +698,7 @@ ACTOR static Future<Void> handleApplyToDBRequest(RestoreVersionBatchRequest req,
|
|||
batchData->dbApplier = Never();
|
||||
batchData->dbApplier = writeMutationsToDB(self->id(), req.batchIndex, batchData, cx);
|
||||
batchData->vbState = ApplierVersionBatchState::WRITE_TO_DB;
|
||||
batchData->rateTracer = traceRate(batchData, req.batchIndex, self->id());
|
||||
}
|
||||
|
||||
ASSERT(batchData->dbApplier.present());
|
||||
|
|
|
@ -248,11 +248,6 @@ struct ApplierBatchData : public ReferenceCounted<ApplierBatchData> {
|
|||
std::map<Key, StagingKey> stagingKeys;
|
||||
std::set<StagingKeyRange> stagingKeyRanges;
|
||||
FlowLock applyStagingKeysBatchLock;
|
||||
double targetWriteRateMB; // target amount of data outstanding for DB;
|
||||
double currentWriteMB; // current write BW;
|
||||
double dataMbToWrite; // total amount of data in MB to write
|
||||
double applyingDataBytes; // amount of data in flight of committing
|
||||
AsyncTrigger releaseTxnTrigger; // trigger to release more txns
|
||||
|
||||
Future<Void> pollMetrics;
|
||||
|
||||
|
@ -263,6 +258,11 @@ struct ApplierBatchData : public ReferenceCounted<ApplierBatchData> {
|
|||
// Stats
|
||||
double receivedBytes; // received mutation size
|
||||
double appliedBytes; // after coalesce, how many bytes to write to DB
|
||||
double targetWriteRateMB; // target amount of data outstanding for DB;
|
||||
double dataMbToWrite; // total amount of data in MB to write
|
||||
double applyingDataBytes; // amount of data in flight of committing
|
||||
AsyncTrigger releaseTxnTrigger; // trigger to release more txns
|
||||
Future<Void> rateTracer; // trace transaction rate control info
|
||||
|
||||
// Status counters
|
||||
struct Counters {
|
||||
|
@ -290,8 +290,8 @@ struct ApplierBatchData : public ReferenceCounted<ApplierBatchData> {
|
|||
explicit ApplierBatchData(UID nodeID, int batchIndex)
|
||||
: counters(this, nodeID, batchIndex), applyStagingKeysBatchLock(SERVER_KNOBS->FASTRESTORE_APPLYING_PARALLELISM),
|
||||
targetWriteRateMB(SERVER_KNOBS->FASTRESTORE_WRITE_BW_MB / SERVER_KNOBS->FASTRESTORE_NUM_APPLIERS),
|
||||
currentWriteMB(0), dataMbToWrite(-1), applyingDataBytes(0), vbState(ApplierVersionBatchState::NOT_INIT),
|
||||
receiveMutationReqs(0), receivedBytes(0), appliedBytes(0) {
|
||||
dataMbToWrite(-1), applyingDataBytes(0), vbState(ApplierVersionBatchState::NOT_INIT), receiveMutationReqs(0),
|
||||
receivedBytes(0), appliedBytes(0) {
|
||||
pollMetrics = traceCounters(format("FastRestoreApplierMetrics%d", batchIndex), nodeID,
|
||||
SERVER_KNOBS->FASTRESTORE_ROLE_LOGGING_DELAY, &counters.cc,
|
||||
nodeID.toString() + "/RestoreApplierMetrics/" + std::to_string(batchIndex));
|
||||
|
|
Loading…
Reference in New Issue