Adding throttling of audit storage tasks and tracing progress of tasks (#10233)

* when trigger doAuditOnStorageServer, check remainingBudgetForAuditTasks

* add trace event of audit progress

* address comments

* code clean up

* make dispatch and schedule audit be more clear

* make dispatch and schedule audit be more clear 2

* make dispatch and schedule audit be more clear 3

* address comments
This commit is contained in:
Zhe Wang 2023-05-15 16:19:41 -07:00 committed by GitHub
parent be1e37518a
commit 852e012eb2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 159 additions and 148 deletions

View File

@ -869,6 +869,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( PERSIST_FINISH_AUDIT_COUNT, 10 ); if ( isSimulated ) PERSIST_FINISH_AUDIT_COUNT = 1;
init( AUDIT_RETRY_COUNT_MAX, 100 ); if ( isSimulated ) AUDIT_RETRY_COUNT_MAX = 10;
init( SS_AUDIT_AUTO_PROCEED_COUNT_MAX, 5 );
init( CONCURRENT_AUDIT_TASK_COUNT_MAX, 50 ); if ( isSimulated ) CONCURRENT_AUDIT_TASK_COUNT_MAX = deterministicRandom()->randomInt(1, CONCURRENT_AUDIT_TASK_COUNT_MAX+1);
init( BUGGIFY_BLOCK_BYTES, 10000 );
init( STORAGE_RECOVERY_VERSION_LAG_LIMIT, 2 * MAX_READ_TRANSACTION_LIFE_VERSIONS );
init( STORAGE_COMMIT_BYTES, 10000000 ); if( randomize && BUGGIFY ) STORAGE_COMMIT_BYTES = 2000000;

View File

@ -855,6 +855,7 @@ public:
int PERSIST_FINISH_AUDIT_COUNT; // Num of persist complete/failed audits for each type
int AUDIT_RETRY_COUNT_MAX;
int SS_AUDIT_AUTO_PROCEED_COUNT_MAX;
int CONCURRENT_AUDIT_TASK_COUNT_MAX;
int BUGGIFY_BLOCK_BYTES;
int64_t STORAGE_RECOVERY_VERSION_LAG_LIMIT;
double STORAGE_DURABILITY_LAG_REJECT_THRESHOLD;

View File

@ -84,7 +84,7 @@ ShardSizeBounds ShardSizeBounds::shardSizeBoundsBeforeTrack() {
struct DDAudit {
DDAudit(AuditStorageState coreState)
: coreState(coreState), actors(true), foundError(false), anyChildAuditFailed(false), retryCount(0),
cancelled(false) {}
cancelled(false), issueDoAuditCount(0), completeDoAuditCount(0) {}
AuditStorageState coreState;
ActorCollection actors;
@ -93,6 +93,8 @@ struct DDAudit {
int retryCount;
bool anyChildAuditFailed;
bool cancelled; // use to cancel any actor beyond auditActor
int64_t issueDoAuditCount;
int64_t completeDoAuditCount;
void setAuditRunActor(Future<Void> actor) { auditActor = actor; }
Future<Void> getAuditRunActor() { return auditActor; }
@ -335,6 +337,7 @@ public:
std::unordered_map<AuditType, std::unordered_map<UID, std::shared_ptr<DDAudit>>> audits;
Promise<Void> auditInitialized;
std::unordered_map<AuditType, bool> anyAuditStorageLaunching;
std::unordered_map<AuditType, AsyncVar<int>> remainingBudgetForAuditTasks;
Optional<Reference<TenantCache>> ddTenantCache;
@ -741,23 +744,13 @@ ACTOR Future<Void> auditStorageCore(Reference<DataDistributor> self,
ACTOR Future<UID> launchAudit(Reference<DataDistributor> self, KeyRange auditRange, AuditType auditType);
ACTOR Future<Void> auditStorage(Reference<DataDistributor> self, TriggerAuditRequest req);
void loadAndDispatchAudit(Reference<DataDistributor> self, std::shared_ptr<DDAudit> audit, KeyRange range);
ACTOR Future<Void> runAuditJobOnOneRandomServer(Reference<DataDistributor> self,
std::shared_ptr<DDAudit> audit,
KeyRange range);
ACTOR Future<Void> auditInputRangeOnAllStorageServers(Reference<DataDistributor> self,
std::shared_ptr<DDAudit> audit,
KeyRange range);
ACTOR Future<Void> partitionAuditJobByKeyServerSpace(Reference<DataDistributor> self,
ACTOR Future<Void> dispatchAuditStorageServerShard(Reference<DataDistributor> self, std::shared_ptr<DDAudit> audit);
ACTOR Future<Void> scheduleAuditStorageShardOnServer(Reference<DataDistributor> self,
std::shared_ptr<DDAudit> audit,
KeyRange range);
ACTOR Future<Void> makeAuditProgressOnServer(Reference<DataDistributor> self,
std::shared_ptr<DDAudit> audit,
KeyRange range,
StorageServerInterface ssi,
bool makeProgressbyServer);
ACTOR Future<Void> makeAuditProgressOnRange(Reference<DataDistributor> self,
std::shared_ptr<DDAudit> audit,
KeyRange range);
StorageServerInterface ssi);
ACTOR Future<Void> dispatchAuditStorage(Reference<DataDistributor> self,
std::shared_ptr<DDAudit> audit,
KeyRange range);
ACTOR Future<Void> scheduleAuditOnRange(Reference<DataDistributor> self,
std::shared_ptr<DDAudit> audit,
KeyRange range);
@ -1769,6 +1762,18 @@ ACTOR Future<Void> auditStorageCore(Reference<DataDistributor> self,
.detail("RetryCount", currentRetryCount)
.detail("IsReady", self->auditInitialized.getFuture().isReady());
wait(audit->actors.getResult()); // goto exception handler if any actor is failed
TraceEvent(SevInfo, "DDAuditStorageCoreResult", self->ddId)
.detail("AuditID", audit->coreState.id)
.detail("Range", audit->coreState.range)
.detail("AuditType", audit->coreState.getType())
.detail("RetryCount", currentRetryCount)
.detail("DDDoAuditTasksIssued", audit->issueDoAuditCount)
.detail("DDDoAuditTasksComplete", audit->completeDoAuditCount);
ASSERT(audit->issueDoAuditCount >= audit->completeDoAuditCount);
ASSERT(audit->issueDoAuditCount == audit->completeDoAuditCount || audit->anyChildAuditFailed);
// reset issueDoAuditCount and completeDoAuditCount for future use
audit->issueDoAuditCount = 0;
audit->completeDoAuditCount = 0;
if (audit->foundError) {
audit->coreState.setPhase(AuditPhase::Error);
} else if (audit->anyChildAuditFailed) {
@ -2106,137 +2111,96 @@ void loadAndDispatchAudit(Reference<DataDistributor> self, std::shared_ptr<DDAud
.detail("AuditID", audit->coreState.id)
.detail("AuditType", audit->coreState.getType());
if (audit->coreState.getType() == AuditType::ValidateStorageServerShard) {
audit->actors.add(auditInputRangeOnAllStorageServers(self, audit, allKeys));
audit->actors.add(dispatchAuditStorageServerShard(self, audit));
} else if (audit->coreState.getType() == AuditType::ValidateLocationMetadata) {
audit->actors.add(makeAuditProgressOnRange(self, audit, allKeys));
// audit->actors.add(runAuditJobOnOneRandomServer(self, audit, allKeys));
audit->actors.add(dispatchAuditStorage(self, audit, allKeys));
} else if (audit->coreState.getType() == AuditType::ValidateHA ||
audit->coreState.getType() == AuditType::ValidateReplica) {
audit->actors.add(makeAuditProgressOnRange(self, audit, range));
audit->actors.add(dispatchAuditStorage(self, audit, range));
} else {
UNREACHABLE();
}
return;
}
// Randomly pick a server to run an audit on the input range
ACTOR Future<Void> runAuditJobOnOneRandomServer(Reference<DataDistributor> self,
std::shared_ptr<DDAudit> audit,
KeyRange range) {
ASSERT(audit->coreState.getType() == AuditType::ValidateLocationMetadata);
TraceEvent(SevInfo, "DDRunAuditJobBySingleServerBegin", self->ddId)
// This function dedicates to audit ssshard
// For each of storage servers, audits allKeys
ACTOR Future<Void> dispatchAuditStorageServerShard(Reference<DataDistributor> self, std::shared_ptr<DDAudit> audit) {
state const AuditType auditType = audit->coreState.getType();
ASSERT(auditType == AuditType::ValidateStorageServerShard);
TraceEvent(SevInfo, "DDDispatchAuditStorageServerShardBegin", self->ddId)
.detail("AuditID", audit->coreState.id)
.detail("AuditType", audit->coreState.getType());
try {
ServerWorkerInfos serverWorkers = wait(self->txnProcessor->getServerListAndProcessClasses());
int selected = deterministicRandom()->randomInt(0, serverWorkers.servers.size());
audit->actors.add(makeAuditProgressOnServer(
self, audit, range, serverWorkers.servers[selected].first, /*makeProgressByServer=*/false));
TraceEvent(SevInfo, "DDRunAuditJobBySingleServerEnd", self->ddId)
.detail("AuditID", audit->coreState.id)
.detail("AuditType", audit->coreState.getType());
} catch (Error& e) {
TraceEvent(SevWarn, "DDRunAuditJobBySingleServerError", self->ddId)
.errorUnsuppressed(e)
.detail("AuditID", audit->coreState.id)
.detail("AuditType", audit->coreState.getType());
audit->anyChildAuditFailed = true;
}
return Void();
}
// For each of storage servers, run an audit on the input range
ACTOR Future<Void> auditInputRangeOnAllStorageServers(Reference<DataDistributor> self,
std::shared_ptr<DDAudit> audit,
KeyRange range) {
ASSERT(audit->coreState.getType() == AuditType::ValidateStorageServerShard);
TraceEvent(SevInfo, "DDAuditInputRangeOnAllStorageServersBegin", self->ddId)
.detail("AuditID", audit->coreState.id)
.detail("AuditType", audit->coreState.getType());
.detail("AuditType", auditType);
try {
state ServerWorkerInfos serverWorkers = wait(self->txnProcessor->getServerListAndProcessClasses());
state int i = 0;
for (; i < serverWorkers.servers.size(); ++i) {
StorageServerInterface targetServer = serverWorkers.servers[i].first;
state StorageServerInterface targetServer = serverWorkers.servers[i].first;
// Currently, Tss server may not follow the auit consistency rule
// Thus, skip if the server is tss
if (targetServer.isTss()) {
continue;
}
audit->actors.add(
makeAuditProgressOnServer(self, audit, range, targetServer, /*makeProgressByServer=*/true));
if (self->remainingBudgetForAuditTasks.contains(auditType)) {
ASSERT(self->remainingBudgetForAuditTasks[auditType].get() >= 0);
while (self->remainingBudgetForAuditTasks[auditType].get() == 0) {
wait(self->remainingBudgetForAuditTasks[auditType].onChange());
ASSERT(self->remainingBudgetForAuditTasks[auditType].get() >= 0);
}
}
audit->actors.add(scheduleAuditStorageShardOnServer(self, audit, targetServer));
wait(delay(0.1));
}
TraceEvent(SevInfo, "DDAuditInputRangeOnAllStorageServersEnd", self->ddId)
TraceEvent(SevInfo, "DDDispatchAuditStorageServerShardEnd", self->ddId)
.detail("AuditID", audit->coreState.id)
.detail("AuditType", audit->coreState.getType());
.detail("AuditType", auditType);
} catch (Error& e) {
TraceEvent(SevWarn, "DDAuditInputRangeOnAllStorageServersError", self->ddId)
TraceEvent(SevWarn, "DDDispatchAuditStorageServerShardError", self->ddId)
.errorUnsuppressed(e)
.detail("AuditID", audit->coreState.id)
.detail("AuditType", audit->coreState.getType());
.detail("AuditType", auditType);
audit->anyChildAuditFailed = true;
}
return Void();
}
// Schedule audit task on the input storage server (ssi)
// Option makeProgressbyServer:
// If we store the progress of complete range for each individual server,
// we should set makeProgressbyServer == true. Then, we load the progress on each server
// If we store the progress of complete range without distinguishing servers,
// we should set makeProgressbyServer == false. Then, we load the progress globally
ACTOR Future<Void> makeAuditProgressOnServer(Reference<DataDistributor> self,
std::shared_ptr<DDAudit> audit,
KeyRange range,
StorageServerInterface ssi,
bool makeProgressbyServer) {
ASSERT(audit->coreState.getType() == AuditType::ValidateLocationMetadata ||
audit->coreState.getType() == AuditType::ValidateStorageServerShard);
// Schedule audit ssshard task on the input storage server (ssi)
// Do audit on allKeys
ACTOR Future<Void> scheduleAuditStorageShardOnServer(Reference<DataDistributor> self,
std::shared_ptr<DDAudit> audit,
StorageServerInterface ssi) {
state UID serverId = ssi.uniqueID;
TraceEvent(SevInfo, "DDMakeAuditProgressOnServerBegin", self->ddId)
state const AuditType auditType = audit->coreState.getType();
ASSERT(auditType == AuditType::ValidateStorageServerShard);
TraceEvent(SevInfo, "DDScheduleAuditStorageShardOnServerBegin", self->ddId)
.detail("ServerID", serverId)
.detail("AuditID", audit->coreState.id)
.detail("Range", range)
.detail("AuditType", audit->coreState.getType());
state Key begin = range.begin;
state KeyRange currentRange = range;
.detail("AuditType", auditType);
state Key begin = allKeys.begin;
state KeyRange currentRange = allKeys;
state int64_t completedCount = 0;
state int64_t totalCount = 0;
state std::vector<AuditStorageState> auditStates;
try {
while (begin < range.end) {
currentRange = KeyRangeRef(begin, range.end);
if (makeProgressbyServer) {
ASSERT(audit->coreState.getType() == AuditType::ValidateStorageServerShard);
wait(store(auditStates,
getAuditStateByServer(self->txnProcessor->context(),
audit->coreState.getType(),
audit->coreState.id,
serverId,
currentRange)));
} else {
ASSERT(audit->coreState.getType() == AuditType::ValidateLocationMetadata);
wait(store(
auditStates,
getAuditStateByRange(
self->txnProcessor->context(), audit->coreState.getType(), audit->coreState.id, currentRange)));
}
while (begin < allKeys.end) {
currentRange = KeyRangeRef(begin, allKeys.end);
wait(store(auditStates,
getAuditStateByServer(
self->txnProcessor->context(), auditType, audit->coreState.id, serverId, currentRange)));
ASSERT(!auditStates.empty());
begin = auditStates.back().range.end;
TraceEvent(SevInfo, "DDMakeAuditProgressOnServerDispatch", self->ddId)
TraceEvent(SevInfo, "DDScheduleAuditStorageShardOnServerDispatch", self->ddId)
.detail("ServerID", serverId)
.detail("AuditID", audit->coreState.id)
.detail("CurrentRange", currentRange)
.detail("AuditType", audit->coreState.getType())
.detail("AuditType", auditType)
.detail("NextBegin", begin)
.detail("RangeEnd", range.end);
for (const auto& auditState : auditStates) {
const AuditPhase phase = auditState.getPhase();
.detail("RangeEnd", allKeys.end);
state int i = 0;
for (; i < auditStates.size(); i++) {
state AuditPhase phase = auditStates[i].getPhase();
ASSERT(phase != AuditPhase::Running && phase != AuditPhase::Failed);
totalCount++;
if (phase == AuditPhase::Complete) {
@ -2246,43 +2210,57 @@ ACTOR Future<Void> makeAuditProgressOnServer(Reference<DataDistributor> self,
audit->foundError = true;
} else {
ASSERT(phase == AuditPhase::Invalid);
AuditStorageRequest req(audit->coreState.id, auditState.range, audit->coreState.getType());
// Set doAuditOnStorageServer
if (self->remainingBudgetForAuditTasks.contains(auditType)) {
ASSERT(self->remainingBudgetForAuditTasks[auditType].get() >= 0);
while (self->remainingBudgetForAuditTasks[auditType].get() == 0) {
wait(self->remainingBudgetForAuditTasks[auditType].onChange());
ASSERT(self->remainingBudgetForAuditTasks[auditType].get() >= 0);
}
self->remainingBudgetForAuditTasks[auditType].set(
self->remainingBudgetForAuditTasks[auditType].get() - 1);
ASSERT(self->remainingBudgetForAuditTasks[auditType].get() >= 0);
} else {
self->remainingBudgetForAuditTasks[auditType].set(
SERVER_KNOBS->CONCURRENT_AUDIT_TASK_COUNT_MAX - 1);
}
AuditStorageRequest req(audit->coreState.id, auditStates[i].range, auditType);
audit->actors.add(doAuditOnStorageServer(self, audit, ssi, req));
}
}
wait(delay(0.1));
}
TraceEvent(SevInfo, "DDMakeAuditProgressOnServerEnd", self->ddId)
TraceEvent(SevInfo, "DDScheduleAuditStorageShardOnServerEnd", self->ddId)
.detail("ServerID", serverId)
.detail("AuditID", audit->coreState.id)
.detail("Range", range)
.detail("AuditType", audit->coreState.getType())
.detail("AuditType", auditType)
.detail("TotalRanges", totalCount)
.detail("TotalComplete", completedCount)
.detail("CompleteRatio", completedCount * 1.0 / totalCount);
} catch (Error& e) {
TraceEvent(SevWarn, "DDMakeAuditProgressOnServerError", self->ddId)
TraceEvent(SevWarn, "DDScheduleAuditStorageShardOnServerError", self->ddId)
.errorUnsuppressed(e)
.detail("AuditID", audit->coreState.id)
.detail("AuditType", audit->coreState.getType());
.detail("AuditType", auditType);
audit->anyChildAuditFailed = true;
}
return Void();
}
// This function is for ha/replica/locationmetadata audits
// Schedule audit task on the input range
ACTOR Future<Void> makeAuditProgressOnRange(Reference<DataDistributor> self,
std::shared_ptr<DDAudit> audit,
KeyRange range) {
ASSERT(audit->coreState.getType() == AuditType::ValidateHA ||
audit->coreState.getType() == AuditType::ValidateReplica ||
audit->coreState.getType() == AuditType::ValidateLocationMetadata);
TraceEvent(SevInfo, "DDMakeAuditProgressOnRangeBegin", self->ddId)
ACTOR Future<Void> dispatchAuditStorage(Reference<DataDistributor> self,
std::shared_ptr<DDAudit> audit,
KeyRange range) {
state const AuditType auditType = audit->coreState.getType();
ASSERT(auditType == AuditType::ValidateHA || auditType == AuditType::ValidateReplica ||
auditType == AuditType::ValidateLocationMetadata);
TraceEvent(SevInfo, "DDDispatchAuditStorageBegin", self->ddId)
.detail("AuditID", audit->coreState.id)
.detail("Range", range)
.detail("AuditType", audit->coreState.getType());
.detail("AuditType", auditType);
state Key begin = range.begin;
state KeyRange currentRange = range;
state int64_t completedCount = 0;
@ -2290,18 +2268,19 @@ ACTOR Future<Void> makeAuditProgressOnRange(Reference<DataDistributor> self,
try {
while (begin < range.end) {
currentRange = KeyRangeRef(begin, range.end);
std::vector<AuditStorageState> auditStates = wait(getAuditStateByRange(
self->txnProcessor->context(), audit->coreState.getType(), audit->coreState.id, currentRange));
state std::vector<AuditStorageState> auditStates =
wait(getAuditStateByRange(self->txnProcessor->context(), auditType, audit->coreState.id, currentRange));
ASSERT(!auditStates.empty());
begin = auditStates.back().range.end;
TraceEvent(SevInfo, "DDMakeAuditProgressOnRangeDispatch", self->ddId)
TraceEvent(SevInfo, "DDDispatchAuditStorageDispatch", self->ddId)
.detail("AuditID", audit->coreState.id)
.detail("CurrentRange", currentRange)
.detail("AuditType", audit->coreState.getType())
.detail("AuditType", auditType)
.detail("NextBegin", begin)
.detail("RangeEnd", range.end);
for (const auto& auditState : auditStates) {
const AuditPhase phase = auditState.getPhase();
state int i = 0;
for (; i < auditStates.size(); i++) {
state AuditPhase phase = auditStates[i].getPhase();
ASSERT(phase != AuditPhase::Running && phase != AuditPhase::Failed);
totalCount++;
if (phase == AuditPhase::Complete) {
@ -2311,24 +2290,31 @@ ACTOR Future<Void> makeAuditProgressOnRange(Reference<DataDistributor> self,
audit->foundError = true;
} else {
ASSERT(phase == AuditPhase::Invalid);
audit->actors.add(scheduleAuditOnRange(self, audit, auditState.range));
if (self->remainingBudgetForAuditTasks.contains(auditType)) {
ASSERT(self->remainingBudgetForAuditTasks[auditType].get() >= 0);
while (self->remainingBudgetForAuditTasks[auditType].get() == 0) {
wait(self->remainingBudgetForAuditTasks[auditType].onChange());
ASSERT(self->remainingBudgetForAuditTasks[auditType].get() >= 0);
}
}
audit->actors.add(scheduleAuditOnRange(self, audit, auditStates[i].range));
}
}
wait(delay(0.1));
}
TraceEvent(SevInfo, "DDMakeAuditProgressOnRangeEnd", self->ddId)
TraceEvent(SevInfo, "DDDispatchAuditStorageEnd", self->ddId)
.detail("AuditID", audit->coreState.id)
.detail("Range", range)
.detail("AuditType", audit->coreState.getType())
.detail("AuditType", auditType)
.detail("TotalRanges", totalCount)
.detail("TotalComplete", completedCount)
.detail("CompleteRatio", completedCount * 1.0 / totalCount);
} catch (Error& e) {
TraceEvent(SevWarn, "DDMakeAuditProgressOnRangeError", self->ddId)
TraceEvent(SevWarn, "DDDispatchAuditStorageError", self->ddId)
.errorUnsuppressed(e)
.detail("AuditID", audit->coreState.id)
.detail("AuditType", audit->coreState.getType());
.detail("AuditType", auditType);
audit->anyChildAuditFailed = true;
}
@ -2336,17 +2322,17 @@ ACTOR Future<Void> makeAuditProgressOnRange(Reference<DataDistributor> self,
}
// Partition the input range into multiple subranges according to the range ownership, and
// schedule audit tasks of each subrange on the server which owns the subrange
// schedule ha/replica/locationmetadata audit tasks of each subrange on the server which owns the subrange
ACTOR Future<Void> scheduleAuditOnRange(Reference<DataDistributor> self,
std::shared_ptr<DDAudit> audit,
KeyRange range) {
state const AuditType auditType = audit->coreState.getType();
TraceEvent(SevInfo, "DDScheduleAuditOnRangeBegin", self->ddId)
.detail("AuditID", audit->coreState.id)
.detail("Range", range)
.detail("AuditType", audit->coreState.getType());
.detail("AuditType", auditType);
state Key begin = range.begin;
state KeyRange currentRange;
state int64_t issueDoAuditCount = 0;
try {
while (begin < range.end) {
@ -2354,24 +2340,24 @@ ACTOR Future<Void> scheduleAuditOnRange(Reference<DataDistributor> self,
TraceEvent(SevInfo, "DDScheduleAuditOnCurrentRange", self->ddId)
.detail("AuditID", audit->coreState.id)
.detail("CurrentRange", currentRange)
.detail("AuditType", audit->coreState.getType());
.detail("AuditType", auditType);
state std::vector<IDDTxnProcessor::DDRangeLocations> rangeLocations =
wait(self->txnProcessor->getSourceServerInterfacesForRange(currentRange));
state int i = 0;
for (i = 0; i < rangeLocations.size(); ++i) {
AuditStorageRequest req(audit->coreState.id, rangeLocations[i].range, audit->coreState.getType());
StorageServerInterface targetServer;
state AuditStorageRequest req(audit->coreState.id, rangeLocations[i].range, auditType);
state StorageServerInterface targetServer;
// Set req.targetServers and targetServer, which will be
// used to doAuditOnStorageServer
// Different audit types have different settings
if (audit->coreState.getType() == AuditType::ValidateHA) {
if (auditType == AuditType::ValidateHA) {
if (rangeLocations[i].servers.size() < 2) {
TraceEvent(SevInfo, "DDScheduleAuditOnRangeEnd", self->ddId)
.detail("Reason", "Single replica, ignore")
.detail("AuditID", audit->coreState.id)
.detail("Range", range)
.detail("AuditType", audit->coreState.getType());
.detail("AuditType", auditType);
return Void();
}
// pick a server from primary DC
@ -2384,14 +2370,14 @@ ACTOR Future<Void> scheduleAuditOnRange(Reference<DataDistributor> self,
const int idx = deterministicRandom()->randomInt(0, it->second.size());
req.targetServers.push_back(it->second[idx].id());
}
} else if (audit->coreState.getType() == AuditType::ValidateReplica) {
} else if (auditType == AuditType::ValidateReplica) {
auto it = rangeLocations[i].servers.begin(); // always compare primary DC
if (it->second.size() == 1) {
TraceEvent(SevInfo, "DDScheduleAuditOnRangeEnd", self->ddId)
.detail("Reason", "Single replica, ignore")
.detail("AuditID", audit->coreState.id)
.detail("Range", range)
.detail("AuditType", audit->coreState.getType());
.detail("AuditType", auditType);
return Void();
}
ASSERT(it->second.size() >= 2);
@ -2411,7 +2397,19 @@ ACTOR Future<Void> scheduleAuditOnRange(Reference<DataDistributor> self,
UNREACHABLE();
}
// Set doAuditOnStorageServer
issueDoAuditCount++;
if (self->remainingBudgetForAuditTasks.contains(auditType)) {
ASSERT(self->remainingBudgetForAuditTasks[auditType].get() >= 0);
while (self->remainingBudgetForAuditTasks[auditType].get() == 0) {
wait(self->remainingBudgetForAuditTasks[auditType].onChange());
ASSERT(self->remainingBudgetForAuditTasks[auditType].get() >= 0);
}
self->remainingBudgetForAuditTasks[auditType].set(
self->remainingBudgetForAuditTasks[auditType].get() - 1);
ASSERT(self->remainingBudgetForAuditTasks[auditType].get() >= 0);
} else {
self->remainingBudgetForAuditTasks[auditType].set(SERVER_KNOBS->CONCURRENT_AUDIT_TASK_COUNT_MAX -
1);
}
audit->actors.add(doAuditOnStorageServer(self, audit, targetServer, req));
// Proceed to the next range if getSourceServerInterfacesForRange is partially read
begin = rangeLocations[i].range.end;
@ -2422,15 +2420,15 @@ ACTOR Future<Void> scheduleAuditOnRange(Reference<DataDistributor> self,
.detail("Reason", "End")
.detail("AuditID", audit->coreState.id)
.detail("Range", range)
.detail("AuditType", audit->coreState.getType())
.detail("DoAuditCount", issueDoAuditCount);
.detail("AuditType", auditType)
.detail("IssuedDoAuditCount", audit->issueDoAuditCount);
} catch (Error& e) {
TraceEvent(SevWarn, "DDScheduleAuditOnRangeError", self->ddId)
.errorUnsuppressed(e)
.detail("AuditID", audit->coreState.id)
.detail("Range", range)
.detail("AuditType", audit->coreState.getType());
.detail("AuditType", auditType);
audit->anyChildAuditFailed = true;
}
@ -2447,32 +2445,41 @@ ACTOR Future<Void> doAuditOnStorageServer(Reference<DataDistributor> self,
TraceEvent(SevDebug, "DDDoAuditOnStorageServerBegin", self->ddId)
.detail("AuditID", req.id)
.detail("Range", req.range)
.detail("AuditType", req.type)
.detail("AuditType", req.getType())
.detail("StorageServer", ssi.toString())
.detail("TargetServers", describe(req.targetServers));
try {
audit->issueDoAuditCount++;
ErrorOr<AuditStorageState> vResult = wait(ssi.auditStorage.getReplyUnlessFailedFor(
req, /*sustainedFailureDuration=*/2.0, /*sustainedFailureSlope=*/0));
if (vResult.isError()) {
throw vResult.getError();
}
TraceEvent(SevDebug, "DDDoAuditOnStorageServerEnd", self->ddId)
audit->completeDoAuditCount++;
TraceEvent(SevInfo, "DDDoAuditOnStorageServerResult", self->ddId)
.detail("AuditID", req.id)
.detail("Range", req.range)
.detail("AuditType", req.type)
.detail("AuditType", req.getType())
.detail("StorageServer", ssi.toString())
.detail("TargetServers", describe(req.targetServers));
.detail("TargetServers", describe(req.targetServers))
.detail("DDDoAuditTaskIssue", audit->issueDoAuditCount)
.detail("DDDoAuditTaskComplete", audit->completeDoAuditCount);
} catch (Error& e) {
TraceEvent(SevInfo, "DDDoAuditOnStorageServerError", req.id)
.errorUnsuppressed(e)
.detail("AuditID", req.id)
.detail("Range", req.range)
.detail("AuditType", req.type)
.detail("AuditType", req.getType())
.detail("StorageServer", ssi.toString())
.detail("TargetServers", describe(req.targetServers));
if (e.code() == error_code_actor_cancelled) {
ASSERT(self->remainingBudgetForAuditTasks.contains(req.getType()));
self->remainingBudgetForAuditTasks[req.getType()].set(
self->remainingBudgetForAuditTasks[req.getType()].get() + 1);
ASSERT(self->remainingBudgetForAuditTasks[req.getType()].get() <=
SERVER_KNOBS->CONCURRENT_AUDIT_TASK_COUNT_MAX);
throw e;
} else if (e.code() == error_code_audit_storage_error) {
audit->foundError = true;
@ -2483,7 +2490,9 @@ ACTOR Future<Void> doAuditOnStorageServer(Reference<DataDistributor> self,
audit->anyChildAuditFailed = true;
}
}
ASSERT(self->remainingBudgetForAuditTasks.contains(req.getType()));
self->remainingBudgetForAuditTasks[req.getType()].set(self->remainingBudgetForAuditTasks[req.getType()].get() + 1);
ASSERT(self->remainingBudgetForAuditTasks[req.getType()].get() <= SERVER_KNOBS->CONCURRENT_AUDIT_TASK_COUNT_MAX);
return Void();
}