Merge pull request #3435 from apple/release-6.3

Merge Release 6.3 to master
Meng Xu, 2020-06-30 10:08:28 -07:00 (committed via GitHub)
Commit f3302833ce
13 changed files with 123 additions and 64 deletions


@@ -138,7 +138,7 @@ std::map<std::tuple<LogEpoch, Version, int>, std::map<Tag, Version>> BackupProgr
}
// Save each tag's savedVersion for all epochs into "bStatus".
ACTOR Future<Void> getBackupProgress(Database cx, UID dbgid, Reference<BackupProgress> bStatus) {
ACTOR Future<Void> getBackupProgress(Database cx, UID dbgid, Reference<BackupProgress> bStatus, bool logging) {
state Transaction tr(cx);
loop {
@@ -156,12 +156,14 @@ ACTOR Future<Void> getBackupProgress(Database cx, UID dbgid, Reference<BackupPro
const UID workerID = decodeBackupProgressKey(it.key);
const WorkerBackupStatus status = decodeBackupProgressValue(it.value);
bStatus->addBackupStatus(status);
TraceEvent("GotBackupProgress", dbgid)
.detail("BackupWorker", workerID)
.detail("Epoch", status.epoch)
.detail("Version", status.version)
.detail("Tag", status.tag.toString())
.detail("TotalTags", status.totalTags);
if (logging) {
TraceEvent("GotBackupProgress", dbgid)
.detail("BackupWorker", workerID)
.detail("Epoch", status.epoch)
.detail("Version", status.version)
.detail("Tag", status.tag.toString())
.detail("TotalTags", status.totalTags);
}
}
return Void();
} catch (Error& e) {

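The new logging flag lets callers choose whether per-worker GotBackupProgress events are emitted. A minimal sketch of the two call patterns introduced by this change (both call sites appear later in the diff):

// Master, while recruiting backup workers: keep the progress trace.
state Future<Void> gotProgress = getBackupProgress(cx, self->dbgid, backupProgress, /*logging=*/true);

// Backup worker, polling its own progress in a loop: stay quiet to avoid trace spam.
wait(getBackupProgress(self->cx, self->myId, progress, /*logging=*/false));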

@@ -102,7 +102,7 @@ private:
Optional<Value> backupStartedValue;
};
ACTOR Future<Void> getBackupProgress(Database cx, UID dbgid, Reference<BackupProgress> bStatus);
ACTOR Future<Void> getBackupProgress(Database cx, UID dbgid, Reference<BackupProgress> bStatus, bool logging);
#include "flow/unactorcompiler.h"
#endif


@@ -561,7 +561,7 @@ ACTOR Future<Void> monitorBackupProgress(BackupData* self) {
// check all workers have started by checking their progress is larger
// than the backup's start version.
state Reference<BackupProgress> progress(new BackupProgress(self->myId, {}));
wait(getBackupProgress(self->cx, self->myId, progress));
wait(getBackupProgress(self->cx, self->myId, progress, /*logging=*/false));
state std::map<Tag, Version> tagVersions = progress->getEpochStatus(self->recruitedEpoch);
state std::map<UID, Version> savedLogVersions;
if (tagVersions.size() != self->totalTags) {
@@ -1087,7 +1087,11 @@ ACTOR Future<Void> backupWorker(BackupInterface interf, InitializeBackupRequest
if (e.code() == error_code_worker_removed) {
pull = Void(); // cancels pulling
self.stop();
wait(done);
try {
wait(done);
} catch (Error& e) {
TraceEvent("BackupWorkerShutdownError", self.myId).error(e, true);
}
}
TraceEvent("BackupWorkerTerminated", self.myId).error(err, true);
if (err.code() != error_code_actor_cancelled && err.code() != error_code_worker_removed) {

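When the worker is removed, it now drains its remaining work inside a try/catch so that an error raised while finishing up is traced instead of escaping the cleanup path. The shape of the pattern, with descriptive comments added:

pull = Void();      // stop pulling new mutations
self.stop();        // ask the worker to wind down
try {
    wait(done);     // let in-flight work finish
} catch (Error& e) {
    // Shutdown errors are logged but deliberately not rethrown; the
    // BackupWorkerTerminated event below still reports the original error.
    TraceEvent("BackupWorkerShutdownError", self.myId).error(e, true);
}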

@@ -600,18 +600,18 @@ void ServerKnobs::initialize(bool randomize, ClientKnobs* clientKnobs, bool isSi
// Fast Restore
init( FASTRESTORE_FAILURE_TIMEOUT, 3600 );
init( FASTRESTORE_HEARTBEAT_INTERVAL, 60 );
init( FASTRESTORE_SAMPLING_PERCENT, 80 ); if( randomize && BUGGIFY ) { FASTRESTORE_SAMPLING_PERCENT = deterministicRandom()->random01() * 100; }
init( FASTRESTORE_NUM_LOADERS, 3 ); if( randomize && BUGGIFY ) { FASTRESTORE_NUM_LOADERS = deterministicRandom()->random01() * 10 + 1; }
init( FASTRESTORE_SAMPLING_PERCENT, 100 ); if( randomize && BUGGIFY ) { FASTRESTORE_SAMPLING_PERCENT = deterministicRandom()->random01() * 100; }
init( FASTRESTORE_NUM_LOADERS, 2 ); if( randomize && BUGGIFY ) { FASTRESTORE_NUM_LOADERS = deterministicRandom()->random01() * 10 + 1; }
init( FASTRESTORE_NUM_APPLIERS, 3 ); if( randomize && BUGGIFY ) { FASTRESTORE_NUM_APPLIERS = deterministicRandom()->random01() * 10 + 1; }
init( FASTRESTORE_TXN_BATCH_MAX_BYTES, 512.0 ); if( randomize && BUGGIFY ) { FASTRESTORE_TXN_BATCH_MAX_BYTES = deterministicRandom()->random01() * 1024.0 * 1024.0 + 1.0; }
init( FASTRESTORE_TXN_BATCH_MAX_BYTES, 1048576.0 ); if( randomize && BUGGIFY ) { FASTRESTORE_TXN_BATCH_MAX_BYTES = deterministicRandom()->random01() * 1024.0 * 1024.0 + 1.0; }
init( FASTRESTORE_VERSIONBATCH_MAX_BYTES, 10.0 * 1024.0 * 1024.0 ); if( randomize && BUGGIFY ) { FASTRESTORE_VERSIONBATCH_MAX_BYTES = deterministicRandom()->random01() * 10.0 * 1024.0 * 1024.0 * 1024.0; }
init( FASTRESTORE_VB_PARALLELISM, 3 ); if( randomize && BUGGIFY ) { FASTRESTORE_VB_PARALLELISM = deterministicRandom()->random01() * 20 + 1; }
init( FASTRESTORE_VB_MONITOR_DELAY, 5 ); if( randomize && BUGGIFY ) { FASTRESTORE_VB_MONITOR_DELAY = deterministicRandom()->random01() * 20 + 1; }
init( FASTRESTORE_VB_PARALLELISM, 5 ); if( randomize && BUGGIFY ) { FASTRESTORE_VB_PARALLELISM = deterministicRandom()->random01() * 20 + 1; }
init( FASTRESTORE_VB_MONITOR_DELAY, 30 ); if( randomize && BUGGIFY ) { FASTRESTORE_VB_MONITOR_DELAY = deterministicRandom()->random01() * 20 + 1; }
init( FASTRESTORE_VB_LAUNCH_DELAY, 5 ); if( randomize && BUGGIFY ) { FASTRESTORE_VB_LAUNCH_DELAY = deterministicRandom()->random01() * 60 + 1; }
init( FASTRESTORE_ROLE_LOGGING_DELAY, 60 ); if( randomize && BUGGIFY ) { FASTRESTORE_ROLE_LOGGING_DELAY = deterministicRandom()->random01() * 60 + 1; }
init( FASTRESTORE_ROLE_LOGGING_DELAY, 5 ); if( randomize && BUGGIFY ) { FASTRESTORE_ROLE_LOGGING_DELAY = deterministicRandom()->random01() * 60 + 1; }
init( FASTRESTORE_UPDATE_PROCESS_STATS_INTERVAL, 5 ); if( randomize && BUGGIFY ) { FASTRESTORE_UPDATE_PROCESS_STATS_INTERVAL = deterministicRandom()->random01() * 60 + 1; }
init( FASTRESTORE_ATOMICOP_WEIGHT, 100 ); if( randomize && BUGGIFY ) { FASTRESTORE_ATOMICOP_WEIGHT = deterministicRandom()->random01() * 200 + 1; }
init( FASTRESTORE_APPLYING_PARALLELISM, 100 ); if( randomize && BUGGIFY ) { FASTRESTORE_APPLYING_PARALLELISM = deterministicRandom()->random01() * 10 + 1; }
init( FASTRESTORE_ATOMICOP_WEIGHT, 1 ); if( randomize && BUGGIFY ) { FASTRESTORE_ATOMICOP_WEIGHT = deterministicRandom()->random01() * 200 + 1; }
init( FASTRESTORE_APPLYING_PARALLELISM, 10000 ); if( randomize && BUGGIFY ) { FASTRESTORE_APPLYING_PARALLELISM = deterministicRandom()->random01() * 10 + 1; }
init( FASTRESTORE_MONITOR_LEADER_DELAY, 5 ); if( randomize && BUGGIFY ) { FASTRESTORE_MONITOR_LEADER_DELAY = deterministicRandom()->random01() * 100; }
init( FASTRESTORE_STRAGGLER_THRESHOLD_SECONDS, 60 ); if( randomize && BUGGIFY ) { FASTRESTORE_STRAGGLER_THRESHOLD_SECONDS = deterministicRandom()->random01() * 240 + 10; }
init( FASTRESTORE_TRACK_REQUEST_LATENCY, false ); if( randomize && BUGGIFY ) { FASTRESTORE_TRACK_REQUEST_LATENCY = false; }
@@ -625,8 +625,9 @@ void ServerKnobs::initialize(bool randomize, ClientKnobs* clientKnobs, bool isSi
init( FASTRESTORE_GET_RANGE_VERSIONS_EXPENSIVE, false ); if( randomize && BUGGIFY ) { FASTRESTORE_GET_RANGE_VERSIONS_EXPENSIVE = deterministicRandom()->random01() < 0.5 ? true : false; }
init( FASTRESTORE_REQBATCH_PARALLEL, 50 ); if( randomize && BUGGIFY ) { FASTRESTORE_REQBATCH_PARALLEL = deterministicRandom()->random01() * 100 + 1; }
init( FASTRESTORE_REQBATCH_LOG, false ); if( randomize && BUGGIFY ) { FASTRESTORE_REQBATCH_LOG = deterministicRandom()->random01() < 0.2 ? true : false; }
init( FASTRESTORE_TXN_CLEAR_MAX, 1000 ); if( randomize && BUGGIFY ) { FASTRESTORE_TXN_CLEAR_MAX = deterministicRandom()->random01() * 100 + 1; }
init( FASTRESTORE_TXN_CLEAR_MAX, 100 ); if( randomize && BUGGIFY ) { FASTRESTORE_TXN_CLEAR_MAX = deterministicRandom()->random01() * 100 + 1; }
init( FASTRESTORE_TXN_RETRY_MAX, 10 ); if( randomize && BUGGIFY ) { FASTRESTORE_TXN_RETRY_MAX = deterministicRandom()->random01() * 100 + 1; }
init( FASTRESTORE_TXN_EXTRA_DELAY, 0.1 ); if( randomize && BUGGIFY ) { FASTRESTORE_TXN_EXTRA_DELAY = deterministicRandom()->random01() * 1 + 0.001;}
init( REDWOOD_DEFAULT_PAGE_SIZE, 4096 );
init( REDWOOD_KVSTORE_CONCURRENT_READS, 64 );

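Every knob above follows the same initialization pattern: a production default, optionally replaced by a random value when simulation enables BUGGIFY, so restore is exercised across a wide parameter range. A rough paraphrase of what one of these lines does (the init macro also registers the knob by name; that part is omitted):

FASTRESTORE_TXN_EXTRA_DELAY = 0.1;   // production default: stagger restore transactions by 100 ms
if (randomize && BUGGIFY) {          // simulation only
    FASTRESTORE_TXN_EXTRA_DELAY = deterministicRandom()->random01() * 1 + 0.001;   // roughly 1 ms .. 1 s
}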

@@ -559,6 +559,7 @@ public:
bool FASTRESTORE_REQBATCH_LOG; // verbose log information for getReplyBatches
int FASTRESTORE_TXN_CLEAR_MAX; // threshold to start tracking each clear op in a txn
int FASTRESTORE_TXN_RETRY_MAX; // threshold to start output error on too many retries
double FASTRESTORE_TXN_EXTRA_DELAY; // extra delay to avoid overwhelming fdb
int REDWOOD_DEFAULT_PAGE_SIZE; // Page size for new Redwood files
int REDWOOD_KVSTORE_CONCURRENT_READS; // Max number of simultaneous point or range reads in progress.


@@ -45,8 +45,8 @@ ACTOR Future<Void> restoreApplierCore(RestoreApplierInterface applierInterf, int
Reference<RestoreApplierData>(new RestoreApplierData(applierInterf.id(), nodeIndex));
state ActorCollection actors(false);
state Future<Void> exitRole = Never();
state Future<Void> updateProcessStatsTimer = delay(SERVER_KNOBS->FASTRESTORE_UPDATE_PROCESS_STATS_INTERVAL);
actors.add(updateProcessMetrics(self));
actors.add(traceProcessMetrics(self, "RestoreApplier"));
actors.add(traceRoleVersionBatchProgress(self, "RestoreApplier"));
@@ -66,7 +66,9 @@ ACTOR Future<Void> restoreApplierCore(RestoreApplierInterface applierInterf, int
}
when(RestoreVersionBatchRequest req = waitNext(applierInterf.applyToDB.getFuture())) {
requestTypeStr = "applyToDB";
actors.add(handleApplyToDBRequest(req, self, cx));
actors.add(handleApplyToDBRequest(
req, self, cx)); // TODO: Check how FDB uses TaskPriority for ACTORS. We may need to add
// priority here to avoid requests at later VB block requests at earlier VBs
}
when(RestoreVersionBatchRequest req = waitNext(applierInterf.initVersionBatch.getFuture())) {
requestTypeStr = "initVersionBatch";
@@ -79,10 +81,6 @@ ACTOR Future<Void> restoreApplierCore(RestoreApplierInterface applierInterf, int
exitRole = Void();
}
}
when(wait(updateProcessStatsTimer)) {
updateProcessStats(self);
updateProcessStatsTimer = delay(SERVER_KNOBS->FASTRESTORE_UPDATE_PROCESS_STATS_INTERVAL);
}
when(wait(actors.getResult())) {}
when(wait(exitRole)) {
TraceEvent("RestoreApplierCoreExitRole", self->id());
@@ -113,14 +111,24 @@ ACTOR static Future<Void> handleSendMutationVectorRequest(RestoreSendVersionedMu
// Assume: processedFileState[req.asset] will not be erased while the actor is active.
// Note: Insert new items into processedFileState will not invalidate the reference.
state NotifiedVersion& curMsgIndex = batchData->processedFileState[req.asset];
state bool printTrace = false;
TraceEvent(SevFRDebugInfo, "FastRestoreApplierPhaseReceiveMutations", self->id())
// wait(delay(0.0, TaskPriority::RestoreApplierReceiveMutations)); // This hurts performance from 100MB/s to 60MB/s
// on circus
batchData->receiveMutationReqs += 1;
// Trace when the receive phase starts at a VB and when it finishes.
// This can help check if receiveMutations block applyMutation phase.
// If so, we need more sophisticated scheduler to ensure priority execution
printTrace = (batchData->receiveMutationReqs % 100 == 1);
TraceEvent(printTrace ? SevInfo : SevFRDebugInfo, "FastRestoreApplierPhaseReceiveMutations", self->id())
.detail("BatchIndex", req.batchIndex)
.detail("RestoreAsset", req.asset.toString())
.detail("RestoreAssetMesssageIndex", curMsgIndex.get())
.detail("Request", req.toString())
.detail("CurrentMemory", getSystemStatistics().processMemory)
.detail("PreviousVersionBatchState", batchData->vbState.get());
.detail("PreviousVersionBatchState", batchData->vbState.get())
.detail("ReceiveMutationRequests", batchData->receiveMutationReqs);
wait(isSchedulable(self, req.batchIndex, __FUNCTION__));
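To keep these hot paths observable without flooding the logs, both restore roles now sample their per-request trace events: most requests stay at the debug-only SevFRDebugInfo level, and a small fraction is promoted to SevInfo. The sampling is a simple modulus on the per-batch request counter:

// Applier: roughly 1 in 100 receive-mutation requests traced at SevInfo.
printTrace = (batchData->receiveMutationReqs % 100 == 1);
// Loader (later in this diff): roughly 1 in 10 load-file requests.
printTrace = (batchData->loadFileReqs % 10 == 1);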
@@ -159,7 +167,7 @@ ACTOR static Future<Void> handleSendMutationVectorRequest(RestoreSendVersionedMu
}
req.reply.send(RestoreCommonReply(self->id(), isDuplicated));
TraceEvent(SevFRDebugInfo, "FastRestoreApplierPhaseReceiveMutationsDone", self->id())
TraceEvent(printTrace ? SevInfo : SevFRDebugInfo, "FastRestoreApplierPhaseReceiveMutationsDone", self->id())
.detail("BatchIndex", req.batchIndex)
.detail("RestoreAsset", req.asset.toString())
.detail("ProcessedMessageIndex", curMsgIndex.get())
@@ -169,7 +177,8 @@ ACTOR static Future<Void> handleSendMutationVectorRequest(RestoreSendVersionedMu
// Clear all ranges in input ranges
ACTOR static Future<Void> applyClearRangeMutations(Standalone<VectorRef<KeyRangeRef>> ranges, double delayTime,
Database cx, UID applierID, int batchIndex) {
Database cx, UID applierID, int batchIndex,
ApplierBatchData::Counters* cc) {
state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(cx));
state int retries = 0;
state double numOps = 0;
@@ -186,6 +195,7 @@ ACTOR static Future<Void> applyClearRangeMutations(Standalone<VectorRef<KeyRange
debugFRMutation("FastRestoreApplierApplyClearRangeMutation", 0,
MutationRef(MutationRef::ClearRange, range.begin, range.end));
tr->clear(range);
cc->clearOps += 1;
++numOps;
if (numOps >= SERVER_KNOBS->FASTRESTORE_TXN_CLEAR_MAX) {
TraceEvent(SevWarn, "FastRestoreApplierClearRangeMutationsTooManyClearsInTxn")
@@ -196,6 +206,7 @@ ACTOR static Future<Void> applyClearRangeMutations(Standalone<VectorRef<KeyRange
}
}
wait(tr->commit());
cc->clearTxns += 1;
break;
} catch (Error& e) {
retries++;
@@ -214,7 +225,7 @@ ACTOR static Future<Void> applyClearRangeMutations(Standalone<VectorRef<KeyRange
// Get keys in incompleteStagingKeys and precompute the stagingKey which is stored in batchData->stagingKeys
ACTOR static Future<Void> getAndComputeStagingKeys(
std::map<Key, std::map<Key, StagingKey>::iterator> incompleteStagingKeys, double delayTime, Database cx,
UID applierID, int batchIndex) {
UID applierID, int batchIndex, ApplierBatchData::Counters* cc) {
state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(cx));
state std::vector<Future<Optional<Value>>> fValues(incompleteStagingKeys.size(), Never());
state int retries = 0;
@@ -234,10 +245,13 @@ ACTOR static Future<Void> getAndComputeStagingKeys(
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
for (auto& key : incompleteStagingKeys) {
fValues[i++] = tr->get(key.first);
cc->fetchKeys += 1;
}
wait(waitForAll(fValues));
cc->fetchTxns += 1;
break;
} catch (Error& e) {
cc->fetchTxnRetries += 1;
if (retries++ > incompleteStagingKeys.size()) {
TraceEvent(SevWarnAlways, "GetAndComputeStagingKeys", applierID)
.suppressFor(1.0)
@@ -304,14 +318,16 @@ ACTOR static Future<Void> precomputeMutationsResult(Reference<ApplierBatchData>
clearRanges.push_back_deep(clearRanges.arena(), range);
curTxnSize += range.expectedSize();
if (curTxnSize >= SERVER_KNOBS->FASTRESTORE_TXN_BATCH_MAX_BYTES) {
fClearRanges.push_back(applyClearRangeMutations(clearRanges, delayTime, cx, applierID, batchIndex));
delayTime += 0.1;
fClearRanges.push_back(
applyClearRangeMutations(clearRanges, delayTime, cx, applierID, batchIndex, &batchData->counters));
delayTime += SERVER_KNOBS->FASTRESTORE_TXN_EXTRA_DELAY;
clearRanges = Standalone<VectorRef<KeyRangeRef>>();
curTxnSize = 0;
}
}
if (curTxnSize > 0) {
fClearRanges.push_back(applyClearRangeMutations(clearRanges, delayTime, cx, applierID, batchIndex));
fClearRanges.push_back(
applyClearRangeMutations(clearRanges, delayTime, cx, applierID, batchIndex, &batchData->counters));
}
// Apply range mutations (i.e., clearRange) to stagingKeyRanges
@@ -354,24 +370,23 @@ ACTOR static Future<Void> precomputeMutationsResult(Reference<ApplierBatchData>
std::map<Key, std::map<Key, StagingKey>::iterator> incompleteStagingKeys;
std::map<Key, StagingKey>::iterator stagingKeyIter = batchData->stagingKeys.begin();
int numKeysInBatch = 0;
double delayTime = 0; // Start transactions at different time to avoid overwelming FDB.
double delayTime = 0; // Start transactions at different time to avoid overwhelming FDB.
for (; stagingKeyIter != batchData->stagingKeys.end(); stagingKeyIter++) {
if (!stagingKeyIter->second.hasBaseValue()) {
incompleteStagingKeys.emplace(stagingKeyIter->first, stagingKeyIter);
batchData->counters.fetchKeys += 1;
numKeysInBatch++;
}
if (numKeysInBatch == SERVER_KNOBS->FASTRESTORE_APPLIER_FETCH_KEYS_SIZE) {
fGetAndComputeKeys.push_back(
getAndComputeStagingKeys(incompleteStagingKeys, delayTime, cx, applierID, batchIndex));
delayTime += 0.1;
fGetAndComputeKeys.push_back(getAndComputeStagingKeys(incompleteStagingKeys, delayTime, cx, applierID,
batchIndex, &batchData->counters));
delayTime += SERVER_KNOBS->FASTRESTORE_TXN_EXTRA_DELAY;
numKeysInBatch = 0;
incompleteStagingKeys.clear();
}
}
if (numKeysInBatch > 0) {
fGetAndComputeKeys.push_back(
getAndComputeStagingKeys(incompleteStagingKeys, delayTime, cx, applierID, batchIndex));
fGetAndComputeKeys.push_back(getAndComputeStagingKeys(incompleteStagingKeys, delayTime, cx, applierID,
batchIndex, &batchData->counters));
}
TraceEvent("FastRestoreApplerPhasePrecomputeMutationsResult", applierID)
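Both loops above share the same throttling idea: work is chopped into transactions bounded by FASTRESTORE_TXN_BATCH_MAX_BYTES (clear ranges) or FASTRESTORE_APPLIER_FETCH_KEYS_SIZE (base-value fetches), and each successive transaction is handed a start delay that grows by FASTRESTORE_TXN_EXTRA_DELAY, so the destination cluster sees a ramp rather than a burst. A condensed sketch of that shape (submitBatch is a hypothetical stand-in for applyClearRangeMutations / getAndComputeStagingKeys, which are assumed to wait for delayTime before starting their transaction):

double delayTime = 0;                                         // first batch starts immediately
std::vector<Future<Void>> futures;
for (auto& batch : batches) {                                 // batches sized by the knobs above
    futures.push_back(submitBatch(batch, delayTime));         // hypothetical helper
    delayTime += SERVER_KNOBS->FASTRESTORE_TXN_EXTRA_DELAY;   // 0.1 s apart by default
}
wait(waitForAll(futures));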
@@ -400,8 +415,9 @@ ACTOR static Future<Void> precomputeMutationsResult(Reference<ApplierBatchData>
// Apply mutations in batchData->stagingKeys [begin, end).
ACTOR static Future<Void> applyStagingKeysBatch(std::map<Key, StagingKey>::iterator begin,
std::map<Key, StagingKey>::iterator end, Database cx,
FlowLock* applyStagingKeysBatchLock, UID applierID) {
wait(applyStagingKeysBatchLock->take(TaskPriority::RestoreApplierWriteDB));
FlowLock* applyStagingKeysBatchLock, UID applierID,
ApplierBatchData::Counters* cc) {
wait(applyStagingKeysBatchLock->take(TaskPriority::RestoreApplierWriteDB)); // Q: Do we really need the lock?
state FlowLock::Releaser releaser(*applyStagingKeysBatchLock);
state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(cx));
state int sets = 0;
@@ -416,6 +432,7 @@ ACTOR static Future<Void> applyStagingKeysBatch(std::map<Key, StagingKey>::itera
while (iter != end) {
if (iter->second.type == MutationRef::SetValue) {
tr->set(iter->second.key, iter->second.val);
cc->appliedMutations += 1;
TraceEvent(SevFRMutationInfo, "FastRestoreApplierPhaseApplyStagingKeysBatch", applierID)
.detail("SetKey", iter->second.key);
sets++;
@@ -428,6 +445,7 @@ ACTOR static Future<Void> applyStagingKeysBatch(std::map<Key, StagingKey>::itera
.detail("SubVersion", iter->second.version.sub);
}
tr->clear(singleKeyRange(iter->second.key));
cc->appliedMutations += 1;
TraceEvent(SevFRMutationInfo, "FastRestoreApplierPhaseApplyStagingKeysBatch", applierID)
.detail("ClearKey", iter->second.key);
clears++;
@@ -449,8 +467,10 @@ ACTOR static Future<Void> applyStagingKeysBatch(std::map<Key, StagingKey>::itera
.detail("Sets", sets)
.detail("Clears", clears);
wait(tr->commit());
cc->appliedTxns += 1;
break;
} catch (Error& e) {
cc->appliedTxnRetries += 1;
wait(tr->onError(e));
}
}
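applyStagingKeysBatch keeps the usual FDB write-retry shape, now instrumented on both the success and retry paths. Roughly:

wait(applyStagingKeysBatchLock->take(TaskPriority::RestoreApplierWriteDB));   // bound concurrent write transactions
state FlowLock::Releaser releaser(*applyStagingKeysBatchLock);                // released when the actor returns
loop {
    try {
        // ... tr->set() / tr->clear() for keys in [begin, end), each bumping cc->appliedMutations ...
        wait(tr->commit());
        cc->appliedTxns += 1;        // counted once per committed batch
        break;
    } catch (Error& e) {
        cc->appliedTxnRetries += 1;  // counted on every retry
        wait(tr->onError(e));        // reset and retry on retryable errors
    }
}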
@@ -462,6 +482,7 @@ ACTOR static Future<Void> applyStagingKeys(Reference<ApplierBatchData> batchData
Database cx) {
std::map<Key, StagingKey>::iterator begin = batchData->stagingKeys.begin();
std::map<Key, StagingKey>::iterator cur = begin;
state int txnBatches = 0;
double txnSize = 0;
std::vector<Future<Void>> fBatches;
TraceEvent("FastRestoreApplerPhaseApplyStagingKeysStart", applierID)
@@ -470,21 +491,28 @@ ACTOR static Future<Void> applyStagingKeys(Reference<ApplierBatchData> batchData
while (cur != batchData->stagingKeys.end()) {
txnSize += cur->second.expectedMutationSize();
if (txnSize > SERVER_KNOBS->FASTRESTORE_TXN_BATCH_MAX_BYTES) {
fBatches.push_back(applyStagingKeysBatch(begin, cur, cx, &batchData->applyStagingKeysBatchLock, applierID));
fBatches.push_back(applyStagingKeysBatch(begin, cur, cx, &batchData->applyStagingKeysBatchLock, applierID,
&batchData->counters));
batchData->counters.appliedBytes += txnSize;
begin = cur;
txnSize = 0;
txnBatches++;
}
cur++;
}
if (begin != batchData->stagingKeys.end()) {
fBatches.push_back(applyStagingKeysBatch(begin, cur, cx, &batchData->applyStagingKeysBatchLock, applierID));
fBatches.push_back(applyStagingKeysBatch(begin, cur, cx, &batchData->applyStagingKeysBatchLock, applierID,
&batchData->counters));
batchData->counters.appliedBytes += txnSize;
txnBatches++;
}
wait(waitForAll(fBatches));
TraceEvent("FastRestoreApplerPhaseApplyStagingKeysDone", applierID)
.detail("BatchIndex", batchIndex)
.detail("StagingKeys", batchData->stagingKeys.size());
.detail("StagingKeys", batchData->stagingKeys.size())
.detail("TransactionBatches", txnBatches);
return Void();
}


@@ -250,20 +250,26 @@ struct ApplierBatchData : public ReferenceCounted<ApplierBatchData> {
RoleVersionBatchState vbState;
long receiveMutationReqs;
// Status counters
struct Counters {
CounterCollection cc;
Counter receivedBytes, receivedWeightedBytes, receivedMutations, receivedAtomicOps;
Counter appliedWeightedBytes, appliedMutations, appliedAtomicOps;
Counter appliedTxns;
Counter fetchKeys; // number of keys to fetch from dest. FDB cluster.
Counter appliedBytes, appliedWeightedBytes, appliedMutations, appliedAtomicOps;
Counter appliedTxns, appliedTxnRetries;
Counter fetchKeys, fetchTxns, fetchTxnRetries; // number of keys to fetch from dest. FDB cluster.
Counter clearOps, clearTxns;
Counters(ApplierBatchData* self, UID applierInterfID, int batchIndex)
: cc("ApplierBatch", applierInterfID.toString() + ":" + std::to_string(batchIndex)),
receivedBytes("ReceivedBytes", cc), receivedMutations("ReceivedMutations", cc),
receivedAtomicOps("ReceivedAtomicOps", cc), receivedWeightedBytes("ReceivedWeightedMutations", cc),
appliedWeightedBytes("AppliedWeightedBytes", cc), appliedMutations("AppliedMutations", cc),
appliedAtomicOps("AppliedAtomicOps", cc), appliedTxns("AppliedTxns", cc), fetchKeys("FetchKeys", cc) {}
appliedBytes("AppliedBytes", cc), appliedWeightedBytes("AppliedWeightedBytes", cc),
appliedMutations("AppliedMutations", cc), appliedAtomicOps("AppliedAtomicOps", cc),
appliedTxns("AppliedTxns", cc), appliedTxnRetries("AppliedTxnRetries", cc), fetchKeys("FetchKeys", cc),
fetchTxns("FetchTxns", cc), fetchTxnRetries("FetchTxnRetries", cc), clearOps("ClearOps", cc),
clearTxns("ClearTxns", cc) {}
} counters;
void addref() { return ReferenceCounted<ApplierBatchData>::addref(); }

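Each new counter is an ordinary Counter chained to the per-batch "ApplierBatch" CounterCollection in the constructor initializer list, so the call sites in this diff only have to increment them:

cc->clearOps += 1;                              // per tr->clear(range) while replaying range files
cc->clearTxns += 1;                             // per committed clear-range transaction
cc->fetchKeys += 1;                             // per base value requested from the destination cluster
cc->fetchTxns += 1;                             // per committed fetch transaction
cc->fetchTxnRetries += 1;                       // per fetch retry
batchData->counters.appliedBytes += txnSize;    // bytes handed to applyStagingKeysBatch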

@@ -66,8 +66,8 @@ ACTOR Future<Void> restoreLoaderCore(RestoreLoaderInterface loaderInterf, int no
Reference<RestoreLoaderData>(new RestoreLoaderData(loaderInterf.id(), nodeIndex));
state ActorCollection actors(false);
state Future<Void> exitRole = Never();
state Future<Void> updateProcessStatsTimer = delay(SERVER_KNOBS->FASTRESTORE_UPDATE_PROCESS_STATS_INTERVAL);
actors.add(updateProcessMetrics(self));
actors.add(traceProcessMetrics(self, "RestoreLoader"));
loop {
@@ -107,10 +107,6 @@ ACTOR Future<Void> restoreLoaderCore(RestoreLoaderInterface loaderInterf, int no
exitRole = Void();
}
}
when(wait(updateProcessStatsTimer)) {
updateProcessStats(self);
updateProcessStatsTimer = delay(SERVER_KNOBS->FASTRESTORE_UPDATE_PROCESS_STATS_INTERVAL);
}
when(wait(actors.getResult())) {}
when(wait(exitRole)) {
TraceEvent("FastRestoreLoaderCoreExitRole", self->id());
@@ -349,11 +345,15 @@ ACTOR Future<Void> _processLoadingParam(KeyRangeMap<Version>* pRangeVersions, Lo
ACTOR Future<Void> handleLoadFileRequest(RestoreLoadFileRequest req, Reference<RestoreLoaderData> self) {
state Reference<LoaderBatchData> batchData = self->batch[req.batchIndex];
state bool isDuplicated = true;
state bool printTrace = false;
ASSERT(batchData.isValid());
bool paramExist = batchData->processedFileParams.find(req.param) != batchData->processedFileParams.end();
bool isReady = paramExist ? batchData->processedFileParams[req.param].isReady() : false;
TraceEvent(SevFRDebugInfo, "FastRestoreLoaderPhaseLoadFile", self->id())
batchData->loadFileReqs += 1;
printTrace = (batchData->loadFileReqs % 10 == 1);
// TODO: Make the actor priority lower than sendMutation priority. (Unsure it will help performance though)
TraceEvent(printTrace ? SevInfo : SevFRDebugInfo, "FastRestoreLoaderPhaseLoadFile", self->id())
.detail("BatchIndex", req.batchIndex)
.detail("ProcessLoadParam", req.param.toString())
.detail("NotProcessed", !paramExist)
@@ -382,10 +382,10 @@ ACTOR Future<Void> handleLoadFileRequest(RestoreLoadFileRequest req, Reference<R
wait(it->second); // wait on the processing of the req.param.
req.reply.send(RestoreLoadFileReply(req.param, batchData->sampleMutations[req.param], isDuplicated));
TraceEvent(SevFRDebugInfo, "FastRestoreLoaderPhaseLoadFileDone", self->id())
TraceEvent(printTrace ? SevInfo : SevFRDebugInfo, "FastRestoreLoaderPhaseLoadFileDone", self->id())
.detail("BatchIndex", req.batchIndex)
.detail("ProcessLoadParam", req.param.toString());
// TODO: clear self->sampleMutations[req.param] memory to save memory on loader
return Void();
}
@@ -458,6 +458,11 @@ ACTOR Future<Void> handleSendMutationsRequest(RestoreSendMutationsToAppliersRequ
} else {
batchStatus->sendAllLogs = Void();
}
if ((batchStatus->sendAllRanges.present() && batchStatus->sendAllRanges.get().isReady()) &&
(batchStatus->sendAllLogs.present() && batchStatus->sendAllLogs.get().isReady())) {
// Both log and range files have been sent.
batchData->kvOpsPerLP.clear();
}
}
TraceEvent("FastRestoreLoaderPhaseSendMutationsDone", self->id())

View File

@@ -77,6 +77,8 @@ struct LoaderBatchData : public ReferenceCounted<LoaderBatchData> {
LoaderVersionBatchState vbState;
long loadFileReqs;
// Status counters
struct Counters {
CounterCollection cc;


@@ -81,6 +81,7 @@ ACTOR Future<Void> startRestoreMaster(Reference<RestoreWorkerData> masterWorker,
actors.add(updateHeartbeatTime(self));
actors.add(checkRolesLiveness(self));
actors.add(updateProcessMetrics(self));
actors.add(traceProcessMetrics(self, "RestoreMaster"));
wait(startProcessRestoreRequests(self, cx));
@@ -511,7 +512,7 @@ ACTOR static Future<Void> distributeWorkloadPerVersionBatch(Reference<RestoreMas
state Reference<MasterBatchStatus> batchStatus = self->batchStatus[batchIndex];
state double startTime = now();
TraceEvent("FastRestoreMasterDispatchVersionBatchesStart")
TraceEvent("FastRestoreMasterDispatchVersionBatchesStart", self->id())
.detail("BatchIndex", batchIndex)
.detail("BatchSize", versionBatch.size)
.detail("RunningVersionBatches", self->runningVersionBatches.get());
@@ -562,7 +563,7 @@ ACTOR static Future<Void> distributeWorkloadPerVersionBatch(Reference<RestoreMas
self->checkMemory.trigger();
}
TraceEvent("FastRestoreMasterDispatchVersionBatchesDone")
TraceEvent("FastRestoreMasterDispatchVersionBatchesDone", self->id())
.detail("BatchIndex", batchIndex)
.detail("BatchSize", versionBatch.size)
.detail("RunningVersionBatches", self->runningVersionBatches.get())


@@ -137,11 +137,20 @@ ACTOR Future<Void> isSchedulable(Reference<RestoreRoleData> self, int actorBatch
return Void();
}
// Updated process metrics will be used by scheduler for throttling as well
ACTOR Future<Void> updateProcessMetrics(Reference<RestoreRoleData> self) {
loop {
updateProcessStats(self);
wait(delay(SERVER_KNOBS->FASTRESTORE_UPDATE_PROCESS_STATS_INTERVAL));
}
}
ACTOR Future<Void> traceProcessMetrics(Reference<RestoreRoleData> self, std::string role) {
loop {
TraceEvent("FastRestoreTraceProcessMetrics")
TraceEvent("FastRestoreTraceProcessMetrics", self->nodeID)
.detail("Role", role)
.detail("Node", self->nodeID)
.detail("PipelinedMaxVersionBatchIndex", self->versionBatchId.get())
.detail("FinishedVersionBatchIndex", self->finishedBatch.get())
.detail("CpuUsage", self->cpuUsage)
.detail("UsedMemory", self->memory)
.detail("ResidentMemory", self->residentMemory);

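updateProcessMetrics replaces the hand-rolled updateProcessStatsTimer loops removed from the applier and loader above: each role now simply registers it, together with traceProcessMetrics, in its actor collection, e.g.:

actors.add(updateProcessMetrics(self));                   // refresh cpuUsage/memory every FASTRESTORE_UPDATE_PROCESS_STATS_INTERVAL
actors.add(traceProcessMetrics(self, "RestoreApplier"));  // periodic FastRestoreTraceProcessMetrics events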

@@ -123,7 +123,7 @@ public:
virtual std::string describeNode() = 0;
};
void updateProcessStats(Reference<RestoreRoleData> self);
ACTOR Future<Void> updateProcessMetrics(Reference<RestoreRoleData> self);
ACTOR Future<Void> traceProcessMetrics(Reference<RestoreRoleData> self, std::string role);
ACTOR Future<Void> traceRoleVersionBatchProgress(Reference<RestoreRoleData> self, std::string role);


@@ -1324,7 +1324,7 @@ ACTOR static Future<Void> recruitBackupWorkers(Reference<MasterData> self, Datab
state LogEpoch epoch = self->cstate.myDBState.recoveryCount;
state Reference<BackupProgress> backupProgress(
new BackupProgress(self->dbgid, self->logSystem->getOldEpochTagsVersionsInfo()));
state Future<Void> gotProgress = getBackupProgress(cx, self->dbgid, backupProgress);
state Future<Void> gotProgress = getBackupProgress(cx, self->dbgid, backupProgress, /*logging=*/true);
state std::vector<Future<InitializeBackupReply>> initializationReplies;
state std::vector<std::pair<UID, Tag>> idsTags; // worker IDs and tags for current epoch