Resolve throttling events
This commit is contained in:
parent
c5c6906a3d
commit
97e49f2f70
|
@ -2068,7 +2068,7 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
|
|||
for (auto& server : serverTeam) {
|
||||
score += server_info[server]->teams.size();
|
||||
}
|
||||
TraceEvent("BuildServerTeams")
|
||||
TraceEvent(SevDebug, "BuildServerTeams")
|
||||
.detail("Score", score)
|
||||
.detail("BestScore", bestScore)
|
||||
.detail("TeamSize", serverTeam.size())
|
||||
|
|
|
@ -300,8 +300,12 @@ ACTOR Future<Void> readHotDetector(DataDistributionTracker* self) {
|
|||
loop {
|
||||
try {
|
||||
Standalone<VectorRef<KeyRangeRef>> readHotRanges = wait(tr.getReadHotRanges(keys));
|
||||
TraceEvent("ReadHotRangeLog").detail("Ranges", readHotRanges.size());
|
||||
int i = 0;
|
||||
for (auto& keyRange : readHotRanges) {
|
||||
TraceEvent("ReadHotRangeLog")
|
||||
.suppressFor(1.0)
|
||||
.detail("RangeIndex", i)
|
||||
.detail("KeyRangeBegin", keyRange.begin)
|
||||
.detail("KeyRangeEnd", keyRange.end);
|
||||
}
|
||||
|
|
|
@ -185,7 +185,7 @@ ACTOR static Future<Void> applyClearRangeMutations(Standalone<VectorRef<KeyRange
|
|||
state int retries = 0;
|
||||
state double numOps = 0;
|
||||
wait(delay(delayTime + deterministicRandom()->random01() * delayTime));
|
||||
TraceEvent(delayTime > 5 ? SevWarnAlways : SevInfo, "FastRestoreApplierClearRangeMutationsStart", applierID)
|
||||
TraceEvent(delayTime > 5 ? SevWarnAlways : SevDebug, "FastRestoreApplierClearRangeMutationsStart", applierID)
|
||||
.detail("BatchIndex", batchIndex)
|
||||
.detail("Ranges", ranges.size())
|
||||
.detail("DelayTime", delayTime);
|
||||
|
@ -296,7 +296,7 @@ ACTOR static Future<Void> getAndComputeStagingKeys(
|
|||
for (auto& key : incompleteStagingKeys) {
|
||||
if (!fValues[i].get().present()) { // Key not exist in DB
|
||||
// if condition: fValues[i].Valid() && fValues[i].isReady() && !fValues[i].isError() &&
|
||||
TraceEvent(SevWarn, "FastRestoreApplierGetAndComputeStagingKeysNoBaseValueInDB", applierID)
|
||||
TraceEvent(SevDebug, "FastRestoreApplierGetAndComputeStagingKeysNoBaseValueInDB", applierID)
|
||||
.suppressFor(5.0)
|
||||
.detail("BatchIndex", batchIndex)
|
||||
.detail("Key", key.first)
|
||||
|
@ -304,7 +304,7 @@ ACTOR static Future<Void> getAndComputeStagingKeys(
|
|||
.detail("PendingMutations", key.second->second.pendingMutations.size())
|
||||
.detail("StagingKeyType", getTypeString(key.second->second.type));
|
||||
for (auto& vm : key.second->second.pendingMutations) {
|
||||
TraceEvent(SevWarn, "FastRestoreApplierGetAndComputeStagingKeysNoBaseValueInDB")
|
||||
TraceEvent(SevDebug, "FastRestoreApplierGetAndComputeStagingKeysNoBaseValueInDB")
|
||||
.detail("PendingMutationVersion", vm.first.toString())
|
||||
.detail("PendingMutation", vm.second.toString());
|
||||
}
|
||||
|
|
|
@ -300,7 +300,7 @@ ACTOR static Future<Version> processRestoreRequest(Reference<RestoreControllerDa
|
|||
state std::vector<RestoreFileFR> logFiles;
|
||||
state std::vector<RestoreFileFR> allFiles;
|
||||
state Version minRangeVersion = MAX_VERSION;
|
||||
state ActorCollection actors(false);
|
||||
state Future<Void> error = actorCollection(self->addActor.getFuture());
|
||||
|
||||
self->initBackupContainer(request.url);
|
||||
|
||||
|
@ -356,7 +356,7 @@ ACTOR static Future<Version> processRestoreRequest(Reference<RestoreControllerDa
|
|||
}
|
||||
}
|
||||
|
||||
actors.add(monitorFinishedVersion(self, request));
|
||||
self->addActor.send(monitorFinishedVersion(self, request));
|
||||
state std::vector<VersionBatch>::iterator versionBatch = versionBatches.begin();
|
||||
for (; versionBatch != versionBatches.end(); versionBatch++) {
|
||||
while (self->runningVersionBatches.get() >= SERVER_KNOBS->FASTRESTORE_VB_PARALLELISM && !releaseVBOutOfOrder) {
|
||||
|
@ -378,7 +378,11 @@ ACTOR static Future<Version> processRestoreRequest(Reference<RestoreControllerDa
|
|||
wait(delay(SERVER_KNOBS->FASTRESTORE_VB_LAUNCH_DELAY));
|
||||
}
|
||||
|
||||
wait(waitForAll(fBatches));
|
||||
try {
|
||||
wait(waitForAll(fBatches) || error);
|
||||
} catch (Error& e) {
|
||||
TraceEvent(SevError, "FastRestoreControllerDispatchVersionBatchesUnexpectedError").error(e);
|
||||
}
|
||||
|
||||
TraceEvent("FastRestoreController").detail("RestoreToVersion", request.targetVersion);
|
||||
return request.targetVersion;
|
||||
|
|
|
@ -149,6 +149,10 @@ struct RestoreControllerData : RestoreRoleData, public ReferenceCounted<RestoreC
|
|||
|
||||
std::map<UID, double> rolesHeartBeatTime; // Key: role id; Value: most recent time controller receives heart beat
|
||||
|
||||
// addActor: add to actorCollection so that when an actor has error, the ActorCollection can catch the error.
|
||||
// addActor is used to create the actorCollection when the RestoreController is created
|
||||
PromiseStream<Future<Void>> addActor;
|
||||
|
||||
void addref() { return ReferenceCounted<RestoreControllerData>::addref(); }
|
||||
void delref() { return ReferenceCounted<RestoreControllerData>::delref(); }
|
||||
|
||||
|
|
Loading…
Reference in New Issue