Improve distributed consistency checker (#11346)

* ConsistencyCheckerUrgent repeated run

* address comments

* avoid tracing SevError for TesterRecruitmentTimeout unless recruitment keeps failing for over 1 day

* address comments

* address comments
This commit is contained in:
Zhe Wang 2024-04-30 14:45:32 -07:00 committed by GitHub
parent 5e9a57bd1f
commit bf53218556
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 38 additions and 7 deletions

View File

@ -269,6 +269,7 @@ void ClientKnobs::initialize(Randomize randomize) {
init( CONSISTENCY_CHECK_RATE_LIMIT_MAX, 50e6 ); // Limit in per sec
init( CONSISTENCY_CHECK_ONE_ROUND_TARGET_COMPLETION_TIME, 7 * 24 * 60 * 60 ); // 7 days
init( CONSISTENCY_CHECK_URGENT_NEXT_WAIT_TIME, 600 );
init( CONSISTENCY_CHECK_URGENT_BATCH_SHARD_COUNT, 10 ); if( randomize && BUGGIFY ) CONSISTENCY_CHECK_URGENT_BATCH_SHARD_COUNT = 2;
init( CONSISTENCY_CHECK_URGENT_RETRY_DEPTH_MAX, 10 ); if( randomize && BUGGIFY ) CONSISTENCY_CHECK_URGENT_RETRY_DEPTH_MAX = 1;
init( CONSISTENCY_CHECK_URGENT_RANGE_BEGIN_0, "" ); if( randomize && BUGGIFY ) CONSISTENCY_CHECK_URGENT_RANGE_BEGIN_0 = "";

View File

@ -257,6 +257,7 @@ public:
int CONSISTENCY_CHECK_RATE_LIMIT_MAX; // Available in both normal and urgent mode
int CONSISTENCY_CHECK_ONE_ROUND_TARGET_COMPLETION_TIME; // Available in normal mode
int CONSISTENCY_CHECK_URGENT_NEXT_WAIT_TIME; // Available in urgent mode
int CONSISTENCY_CHECK_URGENT_BATCH_SHARD_COUNT; // Available in urgent mode
int CONSISTENCY_CHECK_URGENT_RETRY_DEPTH_MAX; // Available in urgent mode
std::string CONSISTENCY_CHECK_URGENT_RANGE_BEGIN_0; // Available in urgent mode

View File

@ -1594,7 +1594,7 @@ ACTOR Future<std::vector<TesterInterface>> getTesters(Reference<AsyncVar<Optiona
}
when(wait(cc->onChange())) {}
when(wait(testerTimeout)) {
TraceEvent(SevError, "TesterRecruitmentTimeout").log();
TraceEvent(SevWarnAlways, "TesterRecruitmentTimeout");
throw timed_out();
}
}
@ -1792,6 +1792,7 @@ ACTOR Future<Void> runConsistencyCheckerUrgentCore(Reference<AsyncVar<Optional<C
state std::vector<TesterInterface> ts; // used to store testers interface
state std::vector<KeyRange> rangesToCheck; // get from globalProgressMap
state std::vector<KeyRange> shardsToCheck; // get from keyServer metadata
state Optional<double> whenFailedToGetTesterStart;
// Initialize globalProgressMap
Optional<std::vector<KeyRange>> rangesToCheck_ = loadRangesToCheckFromKnob();
@ -1838,7 +1839,19 @@ ACTOR Future<Void> runConsistencyCheckerUrgentCore(Reference<AsyncVar<Optional<C
// Step 2: Get testers
ts.clear();
if (!testers.present()) { // In real clusters
try {
wait(store(ts, getTesters(cc, minTestersExpected)));
whenFailedToGetTesterStart.reset();
} catch (Error& e) {
if (e.code() == error_code_timed_out) {
if (!whenFailedToGetTesterStart.present()) {
whenFailedToGetTesterStart = now();
} else if (now() - whenFailedToGetTesterStart.get() > 3600 * 24) { // 1 day
TraceEvent(SevError, "TesterRecruitmentTimeout");
}
}
throw e;
}
if (g_network->isSimulated() && deterministicRandom()->random01() < 0.05) {
throw operation_failed(); // Introduce random failure
}
@ -1908,9 +1921,24 @@ ACTOR Future<Void> runConsistencyCheckerUrgentCore(Reference<AsyncVar<Optional<C
}
}
// Wrapper around runConsistencyCheckerUrgentCore that optionally re-runs it.
//
// Each iteration waits for one full run of the core actor with the given
// cc / cx / testers / minTestersExpected. When repeatRun is false the actor
// completes after that single run. When repeatRun is true it waits for
// CLIENT_KNOBS->CONSISTENCY_CHECK_URGENT_NEXT_WAIT_TIME and then starts the
// next run, so it never completes normally.
//
// There is no try/catch here: any error thrown by the core run (or the delay)
// propagates to the caller and ends the loop.
ACTOR Future<Void> runConsistencyCheckerUrgentHolder(Reference<AsyncVar<Optional<ClusterControllerFullInterface>>> cc,
                                                     Database cx,
                                                     Optional<std::vector<TesterInterface>> testers,
                                                     int minTestersExpected,
                                                     bool repeatRun) {
	loop {
		wait(runConsistencyCheckerUrgentCore(cc, cx, testers, minTestersExpected));
		if (repeatRun) {
			// Pause before kicking off the next round.
			wait(delay(CLIENT_KNOBS->CONSISTENCY_CHECK_URGENT_NEXT_WAIT_TIME));
		} else {
			// One-shot mode: a single completed run is all that was requested.
			break;
		}
	}
	return Void();
}
// Runs the urgent consistency checker exactly once against the supplied testers.
//
// Delegates to runConsistencyCheckerUrgentHolder with a default-constructed
// cluster-controller reference, minTestersExpected = 1, and repeatRun = false,
// so the returned future completes after a single core run.
// NOTE(review): the "Sim" suffix suggests this is the simulation-only entry
// point -- confirm against callers.
//
// cx:      database handle forwarded to the checker.
// testers: tester interfaces to use; passed through as the optional tester list.
Future<Void> checkConsistencyUrgentSim(Database cx, std::vector<TesterInterface> testers) {
	return runConsistencyCheckerUrgentHolder(
	    Reference<AsyncVar<Optional<ClusterControllerFullInterface>>>(), cx, testers, 1, /*repeatRun=*/false);
}
ACTOR Future<bool> runTest(Database cx,
@ -3018,9 +3046,10 @@ ACTOR Future<Void> runTests(Reference<IClusterConnectionRecord> connRecord,
state Reference<AsyncVar<ServerDBInfo>> dbInfo(new AsyncVar<ServerDBInfo>);
state Future<Void> ccMonitor = monitorServerDBInfo(cc, LocalityData(), dbInfo); // FIXME: locality
cx = openDBOnServer(dbInfo);
tests = reportErrors(
runConsistencyCheckerUrgentCore(cc, cx, Optional<std::vector<TesterInterface>>(), minTestersExpected),
"runConsistencyCheckerUrgentCore");
tests =
reportErrors(runConsistencyCheckerUrgentHolder(
cc, cx, Optional<std::vector<TesterInterface>>(), minTestersExpected, /*repeatRun=*/true),
"runConsistencyCheckerUrgentHolder");
} else if (at == TEST_HERE) {
auto db = makeReference<AsyncVar<ServerDBInfo>>();
std::vector<TesterInterface> iTesters(1);