Consistency Check Urgent (Cherrypick from Release-7.1) (#11217)

* cherry-pick-distributed-consistency-checker

* code cleanup

* refactor code, decouple consistencyCheckerUrgent and consistency checker

* fix workload for consistencycheckurgent

* add new consistencycheckurgent role type

* fix CI

* address comments
Zhe Wang 2024-02-28 14:22:47 -08:00 committed by GitHub
parent 1f7c7f6588
commit 308ff77e91
9 changed files with 1271 additions and 13 deletions


@@ -268,6 +268,16 @@ void ClientKnobs::initialize(Randomize randomize) {
init( CONSISTENCY_CHECK_RATE_LIMIT_MAX, 50e6 ); // Limit in bytes per sec
init( CONSISTENCY_CHECK_ONE_ROUND_TARGET_COMPLETION_TIME, 7 * 24 * 60 * 60 ); // 7 days
init( CONSISTENCY_CHECK_URGENT_BATCH_SHARD_COUNT, 10 ); if( randomize && BUGGIFY ) CONSISTENCY_CHECK_URGENT_BATCH_SHARD_COUNT = 2;
init( CONSISTENCY_CHECK_URGENT_RETRY_DEPTH_MAX, 10 ); if( randomize && BUGGIFY ) CONSISTENCY_CHECK_URGENT_RETRY_DEPTH_MAX = 1;
init( CONSISTENCY_CHECK_URGENT_RANGE_BEGIN_0, "" ); if( randomize && BUGGIFY ) CONSISTENCY_CHECK_URGENT_RANGE_BEGIN_0 = "";
init( CONSISTENCY_CHECK_URGENT_RANGE_END_0, "\\xff\\xff" ); if( randomize && BUGGIFY ) CONSISTENCY_CHECK_URGENT_RANGE_END_0 = "\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x33\\x66\\x63\\x36";
init( CONSISTENCY_CHECK_URGENT_RANGE_BEGIN_1, "" );
init( CONSISTENCY_CHECK_URGENT_RANGE_END_1, "" );
init( CONSISTENCY_CHECK_URGENT_RANGE_BEGIN_2, "" );
init( CONSISTENCY_CHECK_URGENT_RANGE_END_2, "" );
init( CONSISTENCY_CHECK_URGENT_RANGE_BEGIN_3, "" ); if( randomize && BUGGIFY ) CONSISTENCY_CHECK_URGENT_RANGE_BEGIN_3 = "\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x33\\x66\\x65\\x34\\x63\\x62";
init( CONSISTENCY_CHECK_URGENT_RANGE_END_3, "" ); if( randomize && BUGGIFY ) CONSISTENCY_CHECK_URGENT_RANGE_END_3 = "\\xff\\xff";
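// A hedged reading of the urgent-range knobs above (decoded by getKeyFromString,
// added in tester.actor.cpp below): each value spells a key as four-character
// "\xNN" hex escapes, so "\\xff\\xff" decodes to the two-byte key { 0xff, 0xff },
// while an empty begin/end pair leaves that range slot unused; pairs 0 through 3
// together define up to four key ranges for the urgent check.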
//fdbcli
init( CLI_CONNECT_PARALLELISM, 400 );


@@ -254,8 +254,18 @@ public:
int BLOBSTORE_MAX_DELAY_RETRYABLE_ERROR;
int BLOBSTORE_MAX_DELAY_CONNECTION_FAILED;
int CONSISTENCY_CHECK_RATE_LIMIT_MAX;
int CONSISTENCY_CHECK_ONE_ROUND_TARGET_COMPLETION_TIME;
int CONSISTENCY_CHECK_RATE_LIMIT_MAX; // Available in both normal and urgent mode
int CONSISTENCY_CHECK_ONE_ROUND_TARGET_COMPLETION_TIME; // Available in normal mode
int CONSISTENCY_CHECK_URGENT_BATCH_SHARD_COUNT; // Available in urgent mode
int CONSISTENCY_CHECK_URGENT_RETRY_DEPTH_MAX; // Available in urgent mode
std::string CONSISTENCY_CHECK_URGENT_RANGE_BEGIN_0; // Available in urgent mode
std::string CONSISTENCY_CHECK_URGENT_RANGE_END_0; // Available in urgent mode
std::string CONSISTENCY_CHECK_URGENT_RANGE_BEGIN_1; // Available in urgent mode
std::string CONSISTENCY_CHECK_URGENT_RANGE_END_1; // Available in urgent mode
std::string CONSISTENCY_CHECK_URGENT_RANGE_BEGIN_2;
std::string CONSISTENCY_CHECK_URGENT_RANGE_END_2;
std::string CONSISTENCY_CHECK_URGENT_RANGE_BEGIN_3;
std::string CONSISTENCY_CHECK_URGENT_RANGE_END_3;
// fdbcli
int CLI_CONNECT_PARALLELISM;


@@ -871,10 +871,9 @@ ACTOR Future<Void> rebootAndCheck(ClusterControllerData* cluster, Optional<Stand
ACTOR Future<Void> workerAvailabilityWatch(WorkerInterface worker,
ProcessClass startingClass,
ClusterControllerData* cluster) {
state Future<Void> failed =
(worker.address() == g_network->getLocalAddress() || startingClass.classType() == ProcessClass::TesterClass)
? Never()
: waitFailureClient(worker.waitFailure, SERVER_KNOBS->WORKER_FAILURE_TIME);
state Future<Void> failed = (worker.address() == g_network->getLocalAddress())
? Never()
: waitFailureClient(worker.waitFailure, SERVER_KNOBS->WORKER_FAILURE_TIME);
cluster->updateWorkerList.set(worker.locality.processId(),
ProcessData(worker.locality, startingClass, worker.stableAddress()));
// This switching avoids a race where the worker can be added to id_worker map after the workerAvailabilityWatch


@@ -708,7 +708,8 @@ static void printUsage(const char* name, bool devhelp) {
printOptionUsage("-r ROLE, --role ROLE",
" Server role (valid options are fdbd, test, multitest,"
" simulation, networktestclient, networktestserver, restore"
" consistencycheck, kvfileintegritycheck, kvfilegeneratesums, kvfiledump, unittests)."
" consistencycheck, consistencycheckurgent, kvfileintegritycheck, kvfilegeneratesums, "
"kvfiledump, unittests)."
" The default is `fdbd'.");
#ifdef _WIN32
printOptionUsage("-n, --newconsole", " Create a new console.");
@@ -1035,6 +1036,7 @@ namespace {
enum class ServerRole {
ChangeClusterKey,
ConsistencyCheck,
ConsistencyCheckUrgent,
CreateTemplateDatabase,
DSLTest,
FDBD,
@@ -1132,7 +1134,7 @@ struct CLIOptions {
flushAndExit(FDB_EXIT_ERROR);
}
if (role == ServerRole::ConsistencyCheck) {
if (role == ServerRole::ConsistencyCheck || role == ServerRole::ConsistencyCheckUrgent) {
if (!publicAddressStrs.empty()) {
fprintf(stderr, "ERROR: Public address cannot be specified for consistency check processes\n");
printHelpTeaser(name);
@@ -1320,6 +1322,8 @@ private:
role = ServerRole::KVFileDump;
else if (!strcmp(sRole, "consistencycheck"))
role = ServerRole::ConsistencyCheck;
else if (!strcmp(sRole, "consistencycheckurgent"))
role = ServerRole::ConsistencyCheckUrgent;
else if (!strcmp(sRole, "unittests"))
role = ServerRole::UnitTests;
else if (!strcmp(sRole, "flowprocess"))
@@ -2424,6 +2428,18 @@ int main(int argc, char* argv[]) {
StringRef(),
opts.localities));
g_network->run();
} else if (role == ServerRole::ConsistencyCheckUrgent) {
setupRunLoopProfiler();
auto m =
startSystemMonitor(opts.dataFolder, opts.dcId, opts.zoneId, opts.zoneId, opts.localities.dataHallId());
f = stopAfter(runTests(opts.connectionFile,
TEST_TYPE_CONSISTENCY_CHECK_URGENT,
TEST_ON_TESTERS,
opts.minTesterCount,
opts.testFile,
StringRef(),
opts.localities));
g_network->run();
} else if (role == ServerRole::UnitTests) {
setupRunLoopProfiler();
auto m =
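A hedged invocation sketch for the new role (the cluster-file path is a placeholder; as with the existing consistencycheck role, no public address may be specified):

fdbserver -r consistencycheckurgent -C /path/to/fdb.cluster

The ranges to check come from the CONSISTENCY_CHECK_URGENT_RANGE_* knobs, which can be passed as command-line overrides in the --knob_consistency_check_urgent_range_begin_0=... style.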


@@ -82,6 +82,8 @@ struct WorkloadRequest {
VectorRef<VectorRef<KeyValueRef>> options;
std::vector<KeyRange> rangesToCheck; // For consistency checker urgent
int clientId; // the "id" of the client receiving the request (0 indexed)
int clientCount; // the total number of test clients participating in the workload
ReplyPromise<struct WorkloadInterface> reply;
@@ -102,6 +104,7 @@ struct WorkloadRequest {
defaultTenant,
runFailureWorkloads,
disabledFailureInjectionWorkloads,
rangesToCheck,
arena);
}
};
@@ -124,7 +127,12 @@ ACTOR Future<Void> testerServerCore(TesterInterface interf,
LocalityData locality);
enum test_location_t { TEST_HERE, TEST_ON_SERVERS, TEST_ON_TESTERS };
enum test_type_t { TEST_TYPE_FROM_FILE, TEST_TYPE_CONSISTENCY_CHECK, TEST_TYPE_UNIT_TESTS };
enum test_type_t {
TEST_TYPE_FROM_FILE,
TEST_TYPE_CONSISTENCY_CHECK,
TEST_TYPE_UNIT_TESTS,
TEST_TYPE_CONSISTENCY_CHECK_URGENT
};
ACTOR Future<Void> runTests(
Reference<IClusterConnectionRecord> connRecord,


@@ -59,6 +59,7 @@ struct WorkloadContext {
Reference<AsyncVar<struct ServerDBInfo> const> dbInfo;
Reference<IClusterConnectionRecord> ccr;
Optional<TenantName> defaultTenant;
std::vector<KeyRange> rangesToCheck; // for urgent consistency checker
WorkloadContext();
WorkloadContext(const WorkloadContext&);


@@ -64,7 +64,7 @@ WorkloadContext::WorkloadContext() {}
WorkloadContext::WorkloadContext(const WorkloadContext& r)
: options(r.options), clientId(r.clientId), clientCount(r.clientCount), sharedRandomNumber(r.sharedRandomNumber),
dbInfo(r.dbInfo), ccr(r.ccr), defaultTenant(r.defaultTenant) {}
dbInfo(r.dbInfo), ccr(r.ccr), defaultTenant(r.defaultTenant), rangesToCheck(r.rangesToCheck) {}
WorkloadContext::~WorkloadContext() {}
@@ -512,6 +512,7 @@ ACTOR Future<Reference<TestWorkload>> getWorkloadIface(WorkloadRequest work,
wcx.options = options;
wcx.sharedRandomNumber = work.sharedRandomNumber;
wcx.defaultTenant = work.defaultTenant.castTo<TenantName>();
wcx.rangesToCheck = work.rangesToCheck;
workload = IWorkloadFactory::create(testName.toString(), wcx);
if (workload) {
@@ -558,6 +559,7 @@ ACTOR Future<Reference<TestWorkload>> getWorkloadIface(WorkloadRequest work,
wcx.ccr = ccr;
wcx.dbInfo = dbInfo;
wcx.defaultTenant = work.defaultTenant.castTo<TenantName>();
wcx.rangesToCheck = work.rangesToCheck;
// FIXME: Other stuff not filled in; why isn't this constructed here and passed down to the other
// getWorkloadIface()?
for (int i = 0; i < work.options.size(); i++) {
@@ -691,6 +693,78 @@ void sendResult(ReplyPromise<T>& reply, Optional<ErrorOr<T>> const& result) {
reply.send(res.get());
}
ACTOR Future<Reference<TestWorkload>> getConsistencyCheckUrgentWorkloadIface(
WorkloadRequest work,
Reference<IClusterConnectionRecord> ccr,
Reference<AsyncVar<ServerDBInfo> const> dbInfo) {
state WorkloadContext wcx;
wcx.clientId = work.clientId;
wcx.clientCount = work.clientCount;
wcx.sharedRandomNumber = work.sharedRandomNumber;
wcx.ccr = ccr;
wcx.dbInfo = dbInfo;
wcx.defaultTenant = work.defaultTenant.castTo<TenantName>();
wcx.rangesToCheck = work.rangesToCheck;
Reference<TestWorkload> iface = wait(getWorkloadIface(work, ccr, work.options[0], dbInfo));
return iface;
}
ACTOR Future<Void> runConsistencyCheckUrgentWorkloadAsync(Database cx,
WorkloadInterface workIface,
Reference<TestWorkload> workload) {
state ReplyPromise<Void> jobReq;
loop choose {
when(ReplyPromise<Void> req = waitNext(workIface.start.getFuture())) {
jobReq = req;
try {
TraceEvent(SevInfo, "ConsistencyCheckUrgent_TesterWorkloadReceived", workIface.id())
.detail("WorkloadName", workload->description())
.detail("ClientCount", workload->clientCount)
.detail("ClientId", workload->clientId);
wait(workload->start(cx));
jobReq.send(Void());
} catch (Error& e) {
if (e.code() == error_code_actor_cancelled) {
throw e;
}
TraceEvent(SevInfo, "ConsistencyCheckUrgent_TesterWorkloadError", workIface.id())
.errorUnsuppressed(e)
.detail("WorkloadName", workload->description())
.detail("ClientCount", workload->clientCount)
.detail("ClientId", workload->clientId);
jobReq.sendError(consistency_check_urgent_task_failed());
}
break;
}
}
return Void();
}
ACTOR Future<Void> testerServerConsistencyCheckerUrgentWorkload(WorkloadRequest work,
Reference<IClusterConnectionRecord> ccr,
Reference<AsyncVar<struct ServerDBInfo> const> dbInfo) {
state WorkloadInterface workIface;
state bool replied = false;
try {
state Database cx = openDBOnServer(dbInfo);
wait(delay(1.0));
Reference<TestWorkload> workload = wait(getConsistencyCheckUrgentWorkloadIface(work, ccr, dbInfo));
Future<Void> test = runConsistencyCheckUrgentWorkloadAsync(cx, workIface, workload);
work.reply.send(workIface);
replied = true;
wait(test);
} catch (Error& e) {
if (e.code() == error_code_actor_cancelled) {
throw e;
}
TraceEvent(SevWarn, "ConsistencyCheckUrgent_TesterRunWorkloadFailed").errorUnsuppressed(e);
if (!replied) {
work.reply.sendError(e);
}
}
return Void();
}
ACTOR Future<Void> runWorkloadAsync(Database cx,
WorkloadInterface workIface,
Reference<TestWorkload> workload,
@@ -865,11 +939,44 @@ ACTOR Future<Void> testerServerCore(TesterInterface interf,
state PromiseStream<Future<Void>> addWorkload;
state Future<Void> workerFatalError = actorCollection(addWorkload.getFuture());
// Dedicated to consistencyCheckerUrgent
// At any time, we only allow at most 1 consistency checker workload on a server
state std::pair<int64_t, Future<Void>> consistencyCheckerUrgentTester = std::make_pair(0, Future<Void>());
TraceEvent("StartingTesterServerCore", interf.id()).log();
loop choose {
when(wait(workerFatalError)) {}
when(wait(consistencyCheckerUrgentTester.second.isValid() ? consistencyCheckerUrgentTester.second : Never())) {
ASSERT(consistencyCheckerUrgentTester.first != 0);
TraceEvent(SevInfo, "ConsistencyCheckUrgent_TesterWorkloadEnd", interf.id())
.detail("ConsistencyCheckerId", consistencyCheckerUrgentTester.first);
consistencyCheckerUrgentTester = std::make_pair(0, Future<Void>()); // reset
}
when(WorkloadRequest work = waitNext(interf.recruitments.getFuture())) {
addWorkload.send(testerServerWorkload(work, ccr, dbInfo, locality));
if (work.title == "ConsistencyCheckUrgent") {
// The workload is a consistency checker urgent workload
if (work.sharedRandomNumber == consistencyCheckerUrgentTester.first) {
TraceEvent(SevInfo, "ConsistencyCheckUrgent_TesterDuplicatedRequest", interf.id())
.detail("ConsistencyCheckerId", work.sharedRandomNumber)
.detail("ClientId", work.clientId)
.detail("ClientCount", work.clientCount);
} else if (consistencyCheckerUrgentTester.second.isValid() &&
!consistencyCheckerUrgentTester.second.isReady()) {
TraceEvent(SevWarnAlways, "ConsistencyCheckUrgent_TesterWorkloadConflict", interf.id())
.detail("ExistingConsistencyCheckerId", consistencyCheckerUrgentTester.first)
.detail("ArrivingConsistencyCheckerId", work.sharedRandomNumber)
.detail("ClientId", work.clientId)
.detail("ClientCount", work.clientCount);
}
consistencyCheckerUrgentTester = std::make_pair(
work.sharedRandomNumber, testerServerConsistencyCheckerUrgentWorkload(work, ccr, dbInfo));
TraceEvent(SevInfo, "ConsistencyCheckUrgent_TesterWorkloadInitialized", interf.id())
.detail("ConsistencyCheckerId", consistencyCheckerUrgentTester.first)
.detail("ClientId", work.clientId)
.detail("ClientCount", work.clientCount);
} else {
addWorkload.send(testerServerWorkload(work, ccr, dbInfo, locality));
}
}
}
}
@@ -1331,6 +1438,478 @@ ACTOR Future<Void> checkConsistency(Database cx,
}
}
ACTOR Future<std::unordered_set<int>> runUrgentConsistencyCheckWorkload(
Database cx,
std::vector<TesterInterface> testers,
int64_t consistencyCheckerId,
std::unordered_map<int, std::vector<KeyRange>> assignment) {
TraceEvent(SevInfo, "ConsistencyCheckUrgent_DispatchWorkloads")
.detail("TesterCount", testers.size())
.detail("ConsistencyCheckerId", consistencyCheckerId);
// Step 1: Get interfaces for running workloads
state std::vector<Future<ErrorOr<WorkloadInterface>>> workRequests;
Standalone<VectorRef<KeyValueRef>> option;
option.push_back_deep(option.arena(), KeyValueRef("testName"_sr, "ConsistencyCheckUrgent"_sr));
Standalone<VectorRef<VectorRef<KeyValueRef>>> options;
options.push_back_deep(options.arena(), option);
for (int i = 0; i < testers.size(); i++) {
WorkloadRequest req;
req.title = "ConsistencyCheckUrgent"_sr;
req.useDatabase = true;
req.timeout = 0.0; // disable workload timeout
req.databasePingDelay = 0.0; // disable database ping check
req.options = options;
req.clientId = i;
req.clientCount = testers.size();
req.sharedRandomNumber = consistencyCheckerId;
req.rangesToCheck = assignment[i];
workRequests.push_back(testers[i].recruitments.getReplyUnlessFailedFor(req, 10, 0));
// workRequests follows the clientId order of the assignment
}
wait(waitForAll(workRequests));
// Step 2: Run workloads via the interfaces
TraceEvent(SevInfo, "ConsistencyCheckUrgent_TriggerWorkloads")
.detail("TesterCount", testers.size())
.detail("ConsistencyCheckerId", consistencyCheckerId);
state std::unordered_set<int> completeClientIds;
state std::vector<int> clientIds; // record the clientId for jobs
state std::vector<Future<ErrorOr<Void>>> jobs;
for (int i = 0; i < workRequests.size(); i++) {
ASSERT(workRequests[i].isReady());
if (workRequests[i].get().isError()) {
TraceEvent(SevInfo, "ConsistencyCheckUrgent_FailedToContactTester")
.error(workRequests[i].get().getError())
.detail("TesterCount", testers.size())
.detail("TesterId", i)
.detail("ConsistencyCheckerId", consistencyCheckerId);
} else {
jobs.push_back(workRequests[i].get().get().start.template getReplyUnlessFailedFor<Void>(10, 0));
clientIds.push_back(i);
}
}
wait(waitForAll(jobs));
for (int i = 0; i < jobs.size(); i++) {
if (jobs[i].isError()) {
TraceEvent(SevInfo, "ConsistencyCheckUrgent_RunWorkloadError1")
.errorUnsuppressed(jobs[i].getError())
.detail("ClientId", clientIds[i])
.detail("ClientCount", testers.size())
.detail("ConsistencyCheckerId", consistencyCheckerId);
} else if (jobs[i].get().isError()) {
TraceEvent(SevInfo, "ConsistencyCheckUrgent_RunWorkloadError2")
.errorUnsuppressed(jobs[i].get().getError())
.detail("ClientId", clientIds[i])
.detail("ClientCount", testers.size())
.detail("ConsistencyCheckerId", consistencyCheckerId);
} else {
TraceEvent(SevInfo, "ConsistencyCheckUrgent_RunWorkloadComplete")
.detail("ClientId", clientIds[i])
.detail("ClientCount", testers.size())
.detail("ConsistencyCheckerId", consistencyCheckerId);
completeClientIds.insert(clientIds[i]); // Add complete clients
}
}
TraceEvent(SevInfo, "ConsistencyCheckUrgent_DispatchWorkloadEnd")
.detail("TesterCount", testers.size())
.detail("ConsistencyCheckerId", consistencyCheckerId);
return completeClientIds;
}
ACTOR Future<std::vector<KeyRange>> getConsistencyCheckShards(Database cx, std::vector<KeyRange> ranges) {
// Get the scope of the input list of ranges
state Key beginKeyToReadKeyServer;
state Key endKeyToReadKeyServer;
for (int i = 0; i < ranges.size(); i++) {
if (i == 0 || ranges[i].begin < beginKeyToReadKeyServer) {
beginKeyToReadKeyServer = ranges[i].begin;
}
if (i == 0 || ranges[i].end > endKeyToReadKeyServer) {
endKeyToReadKeyServer = ranges[i].end;
}
}
TraceEvent(SevInfo, "ConsistencyCheckUrgent_GetConsistencyCheckShards")
.detail("RangeBegin", beginKeyToReadKeyServer)
.detail("RangeEnd", endKeyToReadKeyServer);
// Read KeyServer space within the scope and add shards intersecting with the input ranges
state std::vector<KeyRange> res;
state Transaction tr(cx);
loop {
try {
tr.setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
KeyRange rangeToRead = Standalone(KeyRangeRef(beginKeyToReadKeyServer, endKeyToReadKeyServer));
RangeResult readResult = wait(krmGetRanges(
&tr, keyServersPrefix, rangeToRead, CLIENT_KNOBS->TOO_MANY, GetRangeLimits::BYTE_LIMIT_UNLIMITED));
for (int i = 0; i < readResult.size() - 1; ++i) {
KeyRange rangeToCheck = Standalone(KeyRangeRef(readResult[i].key, readResult[i + 1].key));
Value valueToCheck = Standalone(readResult[i].value);
bool toAdd = false;
for (const auto& range : ranges) {
if (rangeToCheck.intersects(range) == true) {
toAdd = true;
break;
}
}
if (toAdd == true) {
res.push_back(rangeToCheck);
}
beginKeyToReadKeyServer = readResult[i + 1].key;
}
if (beginKeyToReadKeyServer >= endKeyToReadKeyServer) {
break;
}
} catch (Error& e) {
TraceEvent(SevInfo, "ConsistencyCheckUrgent_GetConsistencyCheckShardsRetry").error(e);
wait(tr.onError(e));
}
}
return res;
}
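// For example (an illustration, not part of the patch): if the keyServers map
// has shard boundaries a, c, f and the input ranges are { [b, d) }, the function
// above returns the whole shards [a, c) and [c, f) --- a shard is the unit of
// task assignment, so any shard intersecting an input range is checked in full.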
ACTOR Future<std::vector<TesterInterface>> getTesters(Reference<AsyncVar<Optional<ClusterControllerFullInterface>>> cc,
int minTestersExpected) {
// Recruit workers
state int flags = GetWorkersRequest::TESTER_CLASS_ONLY | GetWorkersRequest::NON_EXCLUDED_PROCESSES_ONLY;
state Future<Void> testerTimeout = delay(600.0); // wait 600 sec for testers to show up
state std::vector<WorkerDetails> workers;
loop {
choose {
when(std::vector<WorkerDetails> w =
wait(cc->get().present()
? brokenPromiseToNever(cc->get().get().getWorkers.getReply(GetWorkersRequest(flags)))
: Never())) {
if (w.size() >= minTestersExpected) {
workers = w;
break;
}
wait(delay(SERVER_KNOBS->WORKER_POLL_DELAY));
}
when(wait(cc->onChange())) {}
when(wait(testerTimeout)) {
TraceEvent(SevError, "TesterRecruitmentTimeout").log();
throw timed_out();
}
}
}
state std::vector<TesterInterface> ts;
ts.reserve(workers.size());
for (int i = 0; i < workers.size(); i++)
ts.push_back(workers[i].interf.testerInterface);
deterministicRandom()->randomShuffle(ts);
return ts;
}
const std::unordered_map<char, uint8_t> parseCharMap{
{ '0', 0 }, { '1', 1 }, { '2', 2 }, { '3', 3 }, { '4', 4 }, { '5', 5 }, { '6', 6 }, { '7', 7 },
{ '8', 8 }, { '9', 9 }, { 'a', 10 }, { 'b', 11 }, { 'c', 12 }, { 'd', 13 }, { 'e', 14 }, { 'f', 15 },
{ 'A', 10 }, { 'B', 11 }, { 'C', 12 }, { 'D', 13 }, { 'E', 14 }, { 'F', 15 },
};
Optional<Key> getKeyFromString(const std::string& str) {
Key emptyKey;
if (str.size() == 0) {
return emptyKey;
}
if (str.size() % 4 != 0) {
TraceEvent(g_network->isSimulated() ? SevError : SevWarnAlways, "ConsistencyCheckUrgent_GetKeyFromStringError")
.setMaxEventLength(-1)
.setMaxFieldLength(-1)
.detail("Reason", "WrongLength")
.detail("InputStr", str);
return Optional<Key>();
}
std::vector<uint8_t> byteList;
for (int i = 0; i < str.size(); i += 4) {
if (str.at(i + 0) != '\\' || str.at(i + 1) != 'x') {
TraceEvent(g_network->isSimulated() ? SevError : SevWarnAlways,
"ConsistencyCheckUrgent_GetKeyFromStringError")
.setMaxEventLength(-1)
.setMaxFieldLength(-1)
.detail("Reason", "WrongBytePrefix")
.detail("InputStr", str);
return Optional<Key>();
}
const char first = str.at(i + 2);
const char second = str.at(i + 3);
if (parseCharMap.count(first) == 0 || parseCharMap.count(second) == 0) {
TraceEvent(g_network->isSimulated() ? SevError : SevWarnAlways,
"ConsistencyCheckUrgent_GetKeyFromStringError")
.setMaxEventLength(-1)
.setMaxFieldLength(-1)
.detail("Reason", "WrongByteContent")
.detail("InputStr", str);
return Optional<Key>();
}
uint8_t parsedValue = parseCharMap.at(first) * 16 + parseCharMap.at(second);
byteList.push_back(parsedValue);
}
return Standalone(StringRef(byteList.data(), byteList.size()));
}
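// A minimal sketch (illustrative only, not part of the patch) exercising the
// format accepted above: each byte must be spelled as a four-character "\xNN"
// hex escape.
void getKeyFromStringExamples() {
ASSERT(getKeyFromString("").get() == Key()); // empty string parses to the empty key
uint8_t expected[2] = { 0x00, 0x02 };
ASSERT(getKeyFromString("\\x00\\x02").get() == StringRef(expected, 2)); // two-byte key
ASSERT(!getKeyFromString("0002").present()); // missing "\x" prefix: rejected
ASSERT(!getKeyFromString("\\x0").present()); // length not a multiple of 4: rejected
}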
Optional<std::vector<KeyRange>> loadRangesToCheckFromKnob() {
// Load string from knob
std::vector<std::string> beginKeyStrs = {
CLIENT_KNOBS->CONSISTENCY_CHECK_URGENT_RANGE_BEGIN_0,
CLIENT_KNOBS->CONSISTENCY_CHECK_URGENT_RANGE_BEGIN_1,
CLIENT_KNOBS->CONSISTENCY_CHECK_URGENT_RANGE_BEGIN_2,
CLIENT_KNOBS->CONSISTENCY_CHECK_URGENT_RANGE_BEGIN_3,
};
std::vector<std::string> endKeyStrs = {
CLIENT_KNOBS->CONSISTENCY_CHECK_URGENT_RANGE_END_0,
CLIENT_KNOBS->CONSISTENCY_CHECK_URGENT_RANGE_END_1,
CLIENT_KNOBS->CONSISTENCY_CHECK_URGENT_RANGE_END_2,
CLIENT_KNOBS->CONSISTENCY_CHECK_URGENT_RANGE_END_3,
};
// Get keys from strings
std::vector<Key> beginKeys;
for (const auto& beginKeyStr : beginKeyStrs) {
Optional<Key> key = getKeyFromString(beginKeyStr);
if (key.present()) {
beginKeys.push_back(key.get());
} else {
return Optional<std::vector<KeyRange>>();
}
}
std::vector<Key> endKeys;
for (const auto& endKeyStr : endKeyStrs) {
Optional<Key> key = getKeyFromString(endKeyStr);
if (key.present()) {
endKeys.push_back(key.get());
} else {
return Optional<std::vector<KeyRange>>();
}
}
if (beginKeys.size() != endKeys.size()) {
TraceEvent(g_network->isSimulated() ? SevError : SevWarnAlways, "ConsistencyCheckUrgent_GetKeyFromStringError")
.detail("Reason", "MismatchBeginKeysAndEndKeys");
return Optional<std::vector<KeyRange>>();
}
// Get ranges
KeyRangeMap<bool> rangeToCheckMap;
for (int i = 0; i < beginKeys.size(); i++) {
Key rangeBegin = beginKeys[i];
Key rangeEnd = endKeys[i];
if (rangeBegin.empty() && rangeEnd.empty()) {
continue;
}
if (rangeBegin > allKeys.end) {
rangeBegin = allKeys.end;
}
if (rangeEnd > allKeys.end) {
TraceEvent(SevInfo, "ConsistencyCheckUrgent_ReverseInputRange")
.setMaxEventLength(-1)
.setMaxFieldLength(-1)
.detail("Index", i)
.detail("RangeBegin", rangeBegin)
.detail("RangeEnd", rangeEnd);
rangeEnd = allKeys.end;
}
KeyRange rangeToCheck;
if (rangeBegin < rangeEnd) {
rangeToCheck = Standalone(KeyRangeRef(rangeBegin, rangeEnd));
} else if (rangeBegin > rangeEnd) {
rangeToCheck = Standalone(KeyRangeRef(rangeEnd, rangeBegin));
} else {
TraceEvent(SevInfo, "ConsistencyCheckUrgent_EmptyInputRange")
.setMaxEventLength(-1)
.setMaxFieldLength(-1)
.detail("Index", i)
.detail("RangeBegin", rangeBegin)
.detail("RangeEnd", rangeEnd);
continue;
}
rangeToCheckMap.insert(rangeToCheck, true);
}
rangeToCheckMap.coalesce(allKeys);
std::vector<KeyRange> res;
for (auto rangeToCheck : rangeToCheckMap.ranges()) {
if (rangeToCheck.value() == true) {
res.push_back(rangeToCheck.range());
}
}
TraceEvent e(SevInfo, "ConsistencyCheckUrgent_LoadedInputRange");
e.setMaxEventLength(-1);
e.setMaxFieldLength(-1);
for (int i = 0; i < res.size(); i++) {
e.detail("RangeBegin" + std::to_string(i), res[i].begin);
e.detail("RangeEnd" + std::to_string(i), res[i].end);
}
return res;
}
std::unordered_map<int, std::vector<KeyRange>> makeTaskAssignment(Database cx,
int64_t consistencyCheckerId,
std::vector<KeyRange> shardsToCheck,
int testersCount,
int round) {
std::unordered_map<int, std::vector<KeyRange>> assignment;
int batchSize = CLIENT_KNOBS->CONSISTENCY_CHECK_URGENT_BATCH_SHARD_COUNT;
int startingPoint = 0;
if (shardsToCheck.size() > batchSize * testersCount) {
startingPoint = deterministicRandom()->randomInt(0, shardsToCheck.size() - batchSize * testersCount);
// We randomly pick a set of successive shards:
// (1) We retry with different shards to avoid repeated failures on the same shards
// (2) We check successive shards to avoid the inefficiency incurred by fragmented ranges
}
assignment.clear();
for (int i = startingPoint; i < shardsToCheck.size(); i++) {
int testerIdx = (i - startingPoint) / batchSize;
if (testerIdx > testersCount - 1) {
break; // Have filled up all testers
}
assignment[testerIdx].push_back(shardsToCheck[i]);
}
std::unordered_map<int, std::vector<KeyRange>>::iterator assignIt;
for (assignIt = assignment.begin(); assignIt != assignment.end(); assignIt++) {
TraceEvent(SevInfo, "ConsistencyCheckUrgent_AssignTaskToTesters")
.detail("ConsistencyCheckerId", consistencyCheckerId)
.detail("Round", round)
.detail("ClientId", assignIt->first)
.detail("ShardsCount", assignIt->second.size());
}
return assignment;
}
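// A worked example of the assignment above (illustrative): with 25 shards,
// CONSISTENCY_CHECK_URGENT_BATCH_SHARD_COUNT = 10, and 2 testers, 25 > 10 * 2, so
// startingPoint is drawn from [0, 5); if it is 3, tester 0 receives shards 3..12,
// tester 1 receives shards 13..22, and shards 0..2 and 23..24 are left for a
// later round.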
ACTOR Future<Void> runConsistencyCheckerUrgentCore(Reference<AsyncVar<Optional<ClusterControllerFullInterface>>> cc,
Database cx,
Optional<std::vector<TesterInterface>> testers,
int minTestersExpected) {
state KeyRangeMap<bool> globalProgressMap; // used to keep track of progress
state std::unordered_map<int, std::vector<KeyRange>> assignment; // used to keep track of assignment of tasks
state std::vector<TesterInterface> ts; // used to store testers interface
state std::vector<KeyRange> rangesToCheck; // get from globalProgressMap
state std::vector<KeyRange> shardsToCheck; // get from keyServer metadata
// Initialize globalProgressMap
Optional<std::vector<KeyRange>> rangesToCheck_ = loadRangesToCheckFromKnob();
if (rangesToCheck_.present()) {
globalProgressMap.insert(allKeys, true);
for (const auto& rangeToCheck : rangesToCheck_.get()) {
// Mark rangesToCheck as incomplete
// Those ranges will be checked
globalProgressMap.insert(rangeToCheck, false);
}
globalProgressMap.coalesce(allKeys);
} else {
TraceEvent(SevInfo, "ConsistencyCheckUrgent_FailedToLoadRangeFromKnob");
globalProgressMap.insert(allKeys, false);
}
state int64_t consistencyCheckerId = deterministicRandom()->randomInt64(0, 10000000);
state int retryTimes = 0;
state int round = 0;
// Main loop
loop {
try {
// Step 1: Load ranges to check, if nothing to run, exit
TraceEvent(SevInfo, "ConsistencyCheckUrgent_RoundBegin")
.detail("ConsistencyCheckerId", consistencyCheckerId)
.detail("RetryTimes", retryTimes)
.detail("TesterCount", ts.size())
.detail("Round", round);
rangesToCheck.clear();
for (auto& range : globalProgressMap.ranges()) {
if (!range.value()) { // range that is not finished
rangesToCheck.push_back(range.range());
}
}
if (rangesToCheck.size() == 0) {
TraceEvent(SevInfo, "ConsistencyCheckUrgent_Complete")
.detail("ConsistencyCheckerId", consistencyCheckerId)
.detail("RetryTimes", retryTimes)
.detail("Round", round);
return Void();
}
// Step 2: Get testers
ts.clear();
if (!testers.present()) { // In real clusters
wait(store(ts, getTesters(cc, minTestersExpected)));
if (g_network->isSimulated() && deterministicRandom()->random01() < 0.05) {
throw operation_failed(); // Introduce random failure
}
} else { // In simulation
ts = testers.get();
}
TraceEvent(SevInfo, "ConsistencyCheckUrgent_GoTTesters")
.detail("ConsistencyCheckerId", consistencyCheckerId)
.detail("Round", round)
.detail("RetryTimes", retryTimes)
.detail("TesterCount", ts.size());
// Step 3: Load shards to check from keyserver space
// Shard is the unit for the task assignment
shardsToCheck.clear();
wait(store(shardsToCheck, getConsistencyCheckShards(cx, rangesToCheck)));
TraceEvent(SevInfo, "ConsistencyCheckUrgent_GotShardsToCheck")
.detail("ConsistencyCheckerId", consistencyCheckerId)
.detail("Round", round)
.detail("RetryTimes", retryTimes)
.detail("TesterCount", ts.size())
.detail("ShardCount", shardsToCheck.size());
// Step 4: Assign tasks to clients
assignment.clear();
assignment = makeTaskAssignment(cx, consistencyCheckerId, shardsToCheck, ts.size(), round);
// Step 5: Run checking on testers
std::unordered_set<int> completeClients =
wait(runUrgentConsistencyCheckWorkload(cx, ts, consistencyCheckerId, assignment));
if (g_network->isSimulated() && deterministicRandom()->random01() < 0.05) {
throw operation_failed(); // Introduce random failure
}
// We use the set of complete clients to decide which ranges are complete
for (const auto& clientId : completeClients) {
for (const auto& range : assignment[clientId]) {
globalProgressMap.insert(range, true); // Mark the ranges as complete
}
}
TraceEvent(SevInfo, "ConsistencyCheckUrgent_RoundEnd")
.detail("ConsistencyCheckerId", consistencyCheckerId)
.detail("RetryTimes", retryTimes)
.detail("SucceedTesterCount", completeClients.size())
.detail("SucceedTesters", describe(completeClients))
.detail("TesterCount", ts.size())
.detail("Round", round);
round++;
} catch (Error& e) {
if (e.code() == error_code_actor_cancelled) {
throw e;
} else {
TraceEvent(SevInfo, "ConsistencyCheckUrgent_CoreWithRetriableFailure")
.errorUnsuppressed(e)
.detail("ConsistencyCheckerId", consistencyCheckerId)
.detail("RetryTimes", retryTimes)
.detail("Round", round);
wait(delay(10.0));
retryTimes++;
}
}
wait(delay(10.0)); // Backoff 10 seconds for the next round
// Decide and enforce the consistencyCheckerId for the next round
consistencyCheckerId = deterministicRandom()->randomInt64(0, 10000000);
}
}
Future<Void> checkConsistencyUrgentSim(Database cx, std::vector<TesterInterface> testers) {
return runConsistencyCheckerUrgentCore(
Reference<AsyncVar<Optional<ClusterControllerFullInterface>>>(), cx, testers, 1);
}
ACTOR Future<bool> runTest(Database cx,
std::vector<TesterInterface> testers,
TestSpec spec,
@@ -1392,6 +1971,8 @@ ACTOR Future<bool> runTest(Database cx,
if (spec.runConsistencyCheck) {
state bool quiescent = g_network->isSimulated() ? !BUGGIFY : spec.waitForQuiescenceEnd;
try {
// For testing urgent consistency check
wait(timeoutError(checkConsistencyUrgentSim(cx, testers), 20000.0));
wait(timeoutError(checkConsistency(cx,
testers,
quiescent,
@@ -2365,7 +2946,9 @@ ACTOR Future<Void> runTests(Reference<IClusterConnectionRecord> connRecord,
actors.push_back(reportErrors(extractClusterInterface(cc, ci), "ExtractClusterInterface"));
}
if (whatToRun == TEST_TYPE_CONSISTENCY_CHECK) {
if (whatToRun == TEST_TYPE_CONSISTENCY_CHECK_URGENT) {
// No need to set the spec here; the spec is set when the workload is triggered
} else if (whatToRun == TEST_TYPE_CONSISTENCY_CHECK) {
TestSpec spec;
Standalone<VectorRef<KeyValueRef>> options;
spec.title = "ConsistencyCheck"_sr;
@@ -2427,7 +3010,15 @@ ACTOR Future<Void> runTests(Reference<IClusterConnectionRecord> connRecord,
knobProtectiveGroup = std::make_unique<KnobProtectiveGroup>(testSet.overrideKnobs);
Future<Void> tests;
if (at == TEST_HERE) {
if (whatToRun == TEST_TYPE_CONSISTENCY_CHECK_URGENT) {
state Database cx;
state Reference<AsyncVar<ServerDBInfo>> dbInfo(new AsyncVar<ServerDBInfo>);
state Future<Void> ccMonitor = monitorServerDBInfo(cc, LocalityData(), dbInfo); // FIXME: locality
cx = openDBOnServer(dbInfo);
tests = reportErrors(
runConsistencyCheckerUrgentCore(cc, cx, Optional<std::vector<TesterInterface>>(), minTestersExpected),
"runConsistencyCheckerUrgentCore");
} else if (at == TEST_HERE) {
auto db = makeReference<AsyncVar<ServerDBInfo>>();
std::vector<TesterInterface> iTesters(1);
actors.push_back(


@@ -0,0 +1,622 @@
/*
* ConsistencyCheckUrgent.actor.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <math.h>
#include "boost/lexical_cast.hpp"
#include "flow/IRandom.h"
#include "flow/ProcessEvents.h"
#include "fdbclient/NativeAPI.actor.h"
#include "fdbclient/FDBTypes.h"
#include "fdbserver/TesterInterface.actor.h"
#include "fdbserver/workloads/workloads.actor.h"
#include "flow/IRateControl.h"
#include "fdbrpc/simulator.h"
#include "fdbserver/Knobs.h"
#include "flow/DeterministicRandom.h"
#include "fdbclient/ManagementAPI.actor.h"
#include "fdbclient/StorageServerInterface.h"
#include "flow/network.h"
#include "fdbrpc/SimulatorProcessInfo.h"
#include "flow/actorcompiler.h" // This must be the last #include.
// The ConsistencyCheckUrgent workload supports the consistency check urgent feature:
// a distributed version of the consistency check that emphasizes both the
// completeness of checking and its distributed execution.
// The workload emphasizes completeness of the data consistency check --- if any
// shard fails to be checked, this information is propagated to the user.
// To support distributed execution, the workload takes rangesToCheck and
// consistencyCheckerId as input and checks the data consistency of the input ranges.
// By contrast, the ConsistencyCheck workload implements the single-threaded
// consistency check and includes many checks beyond data consistency, such as
// evaluating the shard size estimation. However, the ConsistencyCheck workload can
// guarantee neither a complete check nor user-specified input ranges, so it cannot
// meet the requirements of the consistency check urgent feature.
struct ConsistencyCheckUrgentWorkload : TestWorkload {
static constexpr auto NAME = "ConsistencyCheckUrgent";
int64_t consistencyCheckerId;
ConsistencyCheckUrgentWorkload(WorkloadContext const& wcx) : TestWorkload(wcx) {
consistencyCheckerId = sharedRandomNumber;
}
ACTOR Future<std::vector<std::pair<KeyRange, Value>>>
getKeyLocationsForRangeList(Database cx, std::vector<KeyRange> ranges, ConsistencyCheckUrgentWorkload* self) {
// Get the scope of the input list of ranges
state Key beginKeyToReadKeyServer;
state Key endKeyToReadKeyServer;
for (int i = 0; i < ranges.size(); i++) {
if (i == 0 || ranges[i].begin < beginKeyToReadKeyServer) {
beginKeyToReadKeyServer = ranges[i].begin;
}
if (i == 0 || ranges[i].end > endKeyToReadKeyServer) {
endKeyToReadKeyServer = ranges[i].end;
}
}
TraceEvent(SevInfo, "ConsistencyCheckUrgent_TesterGetKeyLocationsForRangeList")
.detail("RangeBegin", beginKeyToReadKeyServer)
.detail("RangeEnd", endKeyToReadKeyServer)
.detail("ClientCount", self->clientCount)
.detail("ClientId", self->clientId);
// Read KeyServer space within the scope and add shards intersecting with the input ranges
state std::vector<std::pair<KeyRange, Value>> res;
state Transaction tr(cx);
loop {
try {
tr.setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
KeyRange rangeToRead = Standalone(KeyRangeRef(beginKeyToReadKeyServer, endKeyToReadKeyServer));
RangeResult readResult = wait(krmGetRanges(
&tr, keyServersPrefix, rangeToRead, CLIENT_KNOBS->TOO_MANY, GetRangeLimits::BYTE_LIMIT_UNLIMITED));
for (int i = 0; i < readResult.size() - 1; ++i) {
KeyRange rangeToCheck = Standalone(KeyRangeRef(readResult[i].key, readResult[i + 1].key));
Value valueToCheck = Standalone(readResult[i].value);
bool toAdd = false;
for (const auto& range : ranges) {
if (rangeToCheck.intersects(range) == true) {
toAdd = true;
break;
}
}
if (toAdd == true) {
res.push_back(std::make_pair(rangeToCheck, valueToCheck));
}
beginKeyToReadKeyServer = readResult[i + 1].key;
}
if (beginKeyToReadKeyServer >= endKeyToReadKeyServer) {
break;
}
} catch (Error& e) {
TraceEvent(SevInfo, "ConsistencyCheckUrgent_TesterRetryGetKeyLocationsForRangeList")
.error(e)
.detail("ClientCount", self->clientCount)
.detail("ClientId", self->clientId);
wait(tr.onError(e));
}
}
return res;
}
ACTOR Future<Version> getVersion(Database cx) {
loop {
state Transaction tr(cx);
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
try {
Version version = wait(tr.getReadVersion());
return version;
} catch (Error& e) {
wait(tr.onError(e));
}
}
}
ACTOR Future<Void> checkDataConsistencyUrgent(Database cx,
std::vector<KeyRange> rangesToCheck,
ConsistencyCheckUrgentWorkload* self,
int consistencyCheckEpoch) {
// Get shard locations of the input rangesToCheck
state std::vector<std::pair<KeyRange, Value>> shardLocationPairList;
state int retryCount = 0;
loop {
try {
shardLocationPairList.clear();
wait(store(shardLocationPairList, self->getKeyLocationsForRangeList(cx, rangesToCheck, self)));
break;
} catch (Error& e) {
if (e.code() == error_code_actor_cancelled) {
throw e;
}
TraceEvent(SevWarn, "ConsistencyCheckUrgent_TesterGetKeyLocationListFailed")
.error(e)
.detail("RetryCount", retryCount)
.detail("ConsistencyCheckerId", self->consistencyCheckerId)
.detail("ClientId", self->clientId)
.detail("ClientCount", self->clientCount)
.detail("ConsistencyCheckEpoch", consistencyCheckEpoch);
wait(delay(5.0));
retryCount++;
if (retryCount > 50) {
throw timed_out();
}
}
}
TraceEvent(SevInfo, "ConsistencyCheckUrgent_TesterStartTask")
.detail("ConsistencyCheckerId", self->consistencyCheckerId)
.detail("RangeToCheck", rangesToCheck.size())
.detail("ShardToCheck", shardLocationPairList.size())
.detail("ClientCount", self->clientCount)
.detail("ClientId", self->clientId)
.detail("ConsistencyCheckEpoch", consistencyCheckEpoch);
// Do consistency check shard by shard
state Reference<IRateControl> rateLimiter =
Reference<IRateControl>(new SpeedLimit(CLIENT_KNOBS->CONSISTENCY_CHECK_RATE_LIMIT_MAX, 1));
state KeyRangeMap<bool> failedRanges; // Used to collect failed ranges in the current checkDataConsistency
failedRanges.insert(allKeys, false); // Initialized to false; any failed range is set to true later,
// which is used to start the next consistencyCheckEpoch of checkDataConsistency
state int64_t numShardThisClient = shardLocationPairList.size();
state int64_t numShardToCheck = -1;
state int64_t numCompleteShards = 0;
state int64_t numFailedShards = 0;
state int shardIdx = 0;
for (; shardIdx < shardLocationPairList.size(); ++shardIdx) {
numShardToCheck = numShardThisClient - shardIdx;
state KeyRangeRef range = shardLocationPairList[shardIdx].first;
// Step 1: Get source server id of the shard
state std::vector<UID> sourceStorageServers;
state std::vector<UID> destStorageServers;
retryCount = 0;
loop {
try {
Transaction tr(cx);
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
tr.setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
RangeResult UIDtoTagMap = wait(tr.getRange(serverTagKeys, CLIENT_KNOBS->TOO_MANY));
ASSERT(!UIDtoTagMap.more && UIDtoTagMap.size() < CLIENT_KNOBS->TOO_MANY);
decodeKeyServersValue(UIDtoTagMap,
shardLocationPairList[shardIdx].second,
sourceStorageServers,
destStorageServers,
false);
break;
} catch (Error& e) {
if (e.code() == error_code_actor_cancelled) {
throw e;
}
TraceEvent(SevWarn, "ConsistencyCheckUrgent_TesterGetUIDtoTagMapFailed")
.error(e)
.detail("RetryCount", retryCount)
.detail("ConsistencyCheckerId", self->consistencyCheckerId)
.detail("ClientId", self->clientId)
.detail("ClientCount", self->clientCount)
.detail("ShardBegin", range.begin)
.detail("ShardEnd", range.end)
.detail("ConsistencyCheckEpoch", consistencyCheckEpoch);
wait(delay(5.0));
retryCount++;
if (retryCount > 50) {
throw timed_out();
}
}
}
if (sourceStorageServers.size() == 0) {
TraceEvent(SevWarnAlways, "ConsistencyCheckUrgent_TesterEmptySourceServers")
.detail("ConsistencyCheckerId", self->consistencyCheckerId)
.detail("ClientId", self->clientId)
.detail("ClientCount", self->clientCount)
.detail("ShardBegin", range.begin)
.detail("ShardEnd", range.end)
.detail("ConsistencyCheckEpoch", consistencyCheckEpoch);
numCompleteShards++;
continue; // Skip to the next shard
} else if (sourceStorageServers.size() == 1) {
TraceEvent(SevWarn, "ConsistencyCheckUrgent_TesterSingleReplica")
.detail("ConsistencyCheckerId", self->consistencyCheckerId)
.detail("ClientId", self->clientId)
.detail("ClientCount", self->clientCount)
.detail("ShardBegin", range.begin)
.detail("ShardEnd", range.end)
.detail("ConsistencyCheckEpoch", consistencyCheckEpoch);
numCompleteShards++;
continue; // Skip to the next shard
}
// Step 2: Get server interfaces
state std::vector<UID> storageServers = sourceStorageServers; // We check source server
state std::vector<StorageServerInterface> storageServerInterfaces;
retryCount = 0;
loop {
try {
Transaction tr(cx);
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
tr.setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
std::vector<Future<Optional<Value>>> serverListEntries;
for (int s = 0; s < storageServers.size(); s++)
serverListEntries.push_back(tr.get(serverListKeyFor(storageServers[s])));
state std::vector<Optional<Value>> serverListValues = wait(getAll(serverListEntries));
for (int s = 0; s < serverListValues.size(); s++) {
if (!serverListValues[s].present()) {
TraceEvent(SevWarn, "ConsistencyCheckUrgent_TesterGetServerInterfaceFailed")
.detail("SSID", storageServers[s])
.detail("ConsistencyCheckerId", self->consistencyCheckerId)
.detail("ClientId", self->clientId)
.detail("ClientCount", self->clientCount)
.detail("ShardBegin", range.begin)
.detail("ShardEnd", range.end)
.detail("RetryCount", retryCount)
.detail("ConsistencyCheckEpoch", consistencyCheckEpoch);
throw operation_failed();
}
storageServerInterfaces.push_back(decodeServerListValue(serverListValues[s].get()));
}
break;
} catch (Error& e) {
if (e.code() == error_code_actor_cancelled) {
throw e;
}
wait(delay(5.0));
retryCount++;
if (retryCount > 50) {
throw timed_out();
}
}
}
// Step 3: Read a limited number of entries at a time, repeating until all keys in the shard have been read
state int64_t totalReadAmount = 0;
state int64_t shardReadAmount = 0;
state int64_t shardKeyCompared = 0;
state bool valueAvailableToCheck = true;
state KeySelector begin = firstGreaterOrEqual(range.begin);
loop {
try {
// Get the min version of the storage servers
Version version = wait(self->getVersion(cx));
state GetKeyValuesRequest req;
req.begin = begin;
req.end = firstGreaterOrEqual(range.end);
req.limit = 1e4;
req.limitBytes = CLIENT_KNOBS->REPLY_BYTE_LIMIT;
req.version = version;
req.tags = TagSet();
req.options = ReadOptions(debugRandom()->randomUniqueID());
// Try getting the entries in the specified range
state std::vector<Future<ErrorOr<GetKeyValuesReply>>> keyValueFutures;
state int j = 0;
for (j = 0; j < storageServerInterfaces.size(); j++) {
resetReply(req);
keyValueFutures.push_back(
storageServerInterfaces[j].getKeyValues.getReplyUnlessFailedFor(req, 2, 0));
}
wait(waitForAll(keyValueFutures));
for (j = 0; j < keyValueFutures.size(); j++) {
ErrorOr<GetKeyValuesReply> rangeResult = keyValueFutures[j].get();
if (!rangeResult.present() || rangeResult.get().error.present()) {
valueAvailableToCheck = false;
TraceEvent e(SevInfo, "ConsistencyCheckUrgent_TesterGetRangeError");
e.detail("ResultPresent", rangeResult.present());
if (rangeResult.present()) {
e.detail("ErrorPresent", rangeResult.get().error.present());
if (rangeResult.get().error.present()) {
e.detail("Error", rangeResult.get().error.get().what());
}
} else {
e.detail("ResultNotPresentWithError", rangeResult.getError().what());
}
break;
}
}
if (!valueAvailableToCheck) {
failedRanges.insert(range, true);
TraceEvent(SevInfo, "ConsistencyCheckUrgent_TesterShardAddedToRetry")
.setMaxEventLength(-1)
.setMaxFieldLength(-1)
.detail("ConsistencyCheckerId", self->consistencyCheckerId)
.detail("ClientId", self->clientId)
.detail("ClientCount", self->clientCount)
.detail("ShardBegin", range.begin)
.detail("ShardEnd", range.end)
.detail("ConsistencyCheckEpoch", consistencyCheckEpoch);
break;
}
state int firstValidServer = -1;
totalReadAmount = 0;
for (j = 0; j < keyValueFutures.size(); j++) {
ErrorOr<GetKeyValuesReply> rangeResult = keyValueFutures[j].get();
ASSERT(rangeResult.present() && !rangeResult.get().error.present());
state GetKeyValuesReply current = rangeResult.get();
totalReadAmount += current.data.expectedSize();
// If we haven't encountered a valid storage server yet, then mark this as the baseline
// to compare against
if (firstValidServer == -1) {
firstValidServer = j;
GetKeyValuesReply reference = keyValueFutures[firstValidServer].get().get();
shardKeyCompared += current.data.size();
} else {
// Compare this shard against the first
GetKeyValuesReply reference = keyValueFutures[firstValidServer].get().get();
if (current.data != reference.data || current.more != reference.more) {
// Data for trace event
// The number of keys unique to the current shard
int currentUniques = 0;
// The number of keys unique to the reference shard
int referenceUniques = 0;
// The number of keys in both shards with conflicting values
int valueMismatches = 0;
// The number of keys in both shards with matching values
int matchingKVPairs = 0;
// Last unique key on the current shard
KeyRef currentUniqueKey;
// Last unique key on the reference shard
KeyRef referenceUniqueKey;
// Last value mismatch
KeyRef valueMismatchKey;
// Loop indices
int currentI = 0;
int referenceI = 0;
while (currentI < current.data.size() || referenceI < reference.data.size()) {
if (currentI >= current.data.size()) {
referenceUniqueKey = reference.data[referenceI].key;
referenceUniques++;
referenceI++;
} else if (referenceI >= reference.data.size()) {
currentUniqueKey = current.data[currentI].key;
currentUniques++;
currentI++;
} else {
KeyValueRef currentKV = current.data[currentI];
KeyValueRef referenceKV = reference.data[referenceI];
if (currentKV.key == referenceKV.key) {
if (currentKV.value == referenceKV.value)
matchingKVPairs++;
else {
valueMismatchKey = currentKV.key;
valueMismatches++;
}
currentI++;
referenceI++;
} else if (currentKV.key < referenceKV.key) {
currentUniqueKey = currentKV.key;
currentUniques++;
currentI++;
} else {
referenceUniqueKey = referenceKV.key;
referenceUniques++;
referenceI++;
}
}
}
TraceEvent(SevError, "ConsistencyCheck_DataInconsistent")
.setMaxEventLength(-1)
.setMaxFieldLength(-1)
.detail(format("StorageServer%d", j).c_str(), storageServers[j].toString())
.detail(format("StorageServer%d", firstValidServer).c_str(),
storageServers[firstValidServer].toString())
.detail("RangeBegin", req.begin.getKey())
.detail("RangeEnd", req.end.getKey())
.detail("VersionNumber", req.version)
.detail(format("Server%dUniques", j).c_str(), currentUniques)
.detail(format("Server%dUniqueKey", j).c_str(), currentUniqueKey)
.detail(format("Server%dUniques", firstValidServer).c_str(), referenceUniques)
.detail(format("Server%dUniqueKey", firstValidServer).c_str(), referenceUniqueKey)
.detail("ValueMismatches", valueMismatches)
.detail("ValueMismatchKey", valueMismatchKey)
.detail("MatchingKVPairs", matchingKVPairs)
.detail("IsTSS",
storageServerInterfaces[j].isTss() ||
storageServerInterfaces[firstValidServer].isTss()
? "True"
: "False")
.detail("ShardBegin", range.begin)
.detail("ShardEnd", range.end);
}
}
}
// RateKeeping
wait(rateLimiter->getAllowance(totalReadAmount));
shardReadAmount += totalReadAmount;
// Advance to the next set of entries
ASSERT(firstValidServer != -1);
if (keyValueFutures[firstValidServer].get().get().more) {
VectorRef<KeyValueRef> result = keyValueFutures[firstValidServer].get().get().data;
ASSERT(result.size() > 0);
begin = firstGreaterThan(result[result.size() - 1].key);
ASSERT(begin.getKey() != allKeys.end);
} else {
break;
}
} catch (Error& e) {
if (e.code() == error_code_actor_cancelled) {
throw e;
}
TraceEvent(SevInfo, "ConsistencyCheckUrgent_TesterRetryDataConsistency").error(e);
wait(delay(5.0));
}
}
// Step 4: If the shard failed to check, add it to the retry queue
// Otherwise, record the progress
if (!valueAvailableToCheck) {
// Any shard for this case has been added to failedRanges which will be retried
// by the next consistencyCheckEpoch
numFailedShards++;
TraceEvent(SevInfo, "ConsistencyCheckUrgent_TesterShardFailed")
.setMaxEventLength(-1)
.setMaxFieldLength(-1)
.detail("ConsistencyCheckerId", self->consistencyCheckerId)
.detail("ClientId", self->clientId)
.detail("ClientCount", self->clientCount)
.detail("NumCompletedShards", numCompleteShards)
.detail("NumFailedShards", numFailedShards)
.detail("NumShardThisClient", numShardThisClient)
.detail("NumShardToCheckThisEpoch", numShardToCheck - 1)
.detail("ShardBegin", range.begin)
.detail("ShardEnd", range.end)
.detail("ReplicaCount", storageServers.size())
.detail("ConsistencyCheckEpoch", consistencyCheckEpoch)
.detail("ShardBytesRead", shardReadAmount)
.detail("ShardKeysCompared", shardKeyCompared);
} else {
numCompleteShards++;
TraceEvent(SevInfo, "ConsistencyCheckUrgent_TesterShardComplete")
.setMaxEventLength(-1)
.setMaxFieldLength(-1)
.detail("ConsistencyCheckerId", self->consistencyCheckerId)
.detail("ClientId", self->clientId)
.detail("ClientCount", self->clientCount)
.detail("ShardBegin", range.begin)
.detail("ShardEnd", range.end)
.detail("ReplicaCount", storageServers.size())
.detail("NumCompletedShards", numCompleteShards)
.detail("NumFailedShards", numFailedShards)
.detail("NumShardThisClient", numShardThisClient)
.detail("NumShardToCheckThisEpoch", numShardToCheck - 1)
.detail("ConsistencyCheckEpoch", consistencyCheckEpoch)
.detail("ShardBytesRead", shardReadAmount)
.detail("ShardKeysCompared", shardKeyCompared);
}
}
// For the failed ranges, trigger the next epoch of checkDataConsistencyUrgent
failedRanges.coalesce(allKeys);
state std::vector<KeyRange> failedRangesToCheck;
state KeyRangeMap<bool>::Ranges failedRangesList = failedRanges.ranges();
state KeyRangeMap<bool>::iterator failedRangesIter = failedRangesList.begin();
for (; failedRangesIter != failedRangesList.end(); ++failedRangesIter) {
if (failedRangesIter->value()) {
failedRangesToCheck.push_back(failedRangesIter->range());
}
}
if (failedRangesToCheck.size() > 0) { // Retry for any failed shard
if (consistencyCheckEpoch < CLIENT_KNOBS->CONSISTENCY_CHECK_URGENT_RETRY_DEPTH_MAX) {
wait(delay(60.0)); // Backoff 1 min
TraceEvent(SevInfo, "ConsistencyCheckUrgent_TesterRetryFailedRanges")
.detail("ConsistencyCheckerId", self->consistencyCheckerId)
.detail("FailedCollectedRangeCount", failedRangesToCheck.size())
.detail("ConsistencyCheckEpoch", consistencyCheckEpoch)
.detail("ClientId", self->clientId)
.detail("ClientCount", self->clientCount);
wait(self->checkDataConsistencyUrgent(cx, failedRangesToCheck, self, consistencyCheckEpoch + 1));
} else {
TraceEvent(SevWarn, "ConsistencyCheckUrgent_TesterRetryDepthMax")
.detail("ConsistencyCheckerId", self->consistencyCheckerId)
.detail("FailedCollectedRangeCount", failedRangesToCheck.size())
.detail("ConsistencyCheckEpoch", consistencyCheckEpoch)
.detail("ClientId", self->clientId)
.detail("ClientCount", self->clientCount);
throw consistency_check_urgent_task_failed(); // Notify the checker that this tester is failed
}
// Since progress is tracked across rounds, we simply give up after retrying too many times;
// the failed ranges will be picked up by the next round of the urgent consistency checker
}
if (consistencyCheckEpoch == 0) {
TraceEvent(SevInfo, "ConsistencyCheckUrgent_TesterEndTask")
.detail("ConsistencyCheckerId", self->consistencyCheckerId)
.detail("ShardCount", shardLocationPairList.size())
.detail("ClientId", self->clientId)
.detail("ClientCount", self->clientCount);
}
return Void();
}
ACTOR Future<Void> _start(Database cx, ConsistencyCheckUrgentWorkload* self) {
try {
TraceEvent(SevInfo, "ConsistencyCheckUrgent_TesterStart")
.detail("ConsistencyCheckerId", self->consistencyCheckerId)
.detail("ClientCount", self->clientCount)
.detail("ClientId", self->clientId);
if (self->rangesToCheck.size() == 0) {
TraceEvent(SevInfo, "ConsistencyCheckUrgent_TesterExit")
.detail("Reason", "AssignedEmptyRangeToCheck")
.detail("ConsistencyCheckerId", self->consistencyCheckerId)
.detail("ClientCount", self->clientCount)
.detail("ClientId", self->clientId);
return Void();
}
if (g_network->isSimulated() && deterministicRandom()->coinflip()) {
TraceEvent(SevInfo, "ConsistencyCheckUrgent_TesterMimicFailure")
.detail("ClientCount", self->clientCount)
.detail("ClientId", self->clientId);
throw operation_failed(); // mimic tester failure
}
wait(self->checkDataConsistencyUrgent(cx,
self->rangesToCheck,
self,
/*consistencyCheckEpoch=*/0));
TraceEvent(SevInfo, "ConsistencyCheckUrgent_TesterExit")
.detail("Reason", "CompleteCheck")
.detail("ConsistencyCheckerId", self->consistencyCheckerId)
.detail("ClientCount", self->clientCount)
.detail("ClientId", self->clientId);
} catch (Error& e) {
std::string reason;
Severity sev = SevInfo;
if (e.code() == error_code_actor_cancelled) {
throw e;
} else if (e.code() == error_code_timed_out) {
reason = "Operation retried too many times";
} else if (e.code() == error_code_consistency_check_urgent_task_failed) {
reason = "Retry failed ranges for too many times";
} else {
reason = "Unexpected failure";
sev = SevWarnAlways;
}
TraceEvent(sev, "ConsistencyCheckUrgent_TesterExit")
.error(e)
.detail("Reason", reason)
.detail("ConsistencyCheckerId", self->consistencyCheckerId)
.detail("ClientCount", self->clientCount)
.detail("ClientId", self->clientId);
throw consistency_check_urgent_task_failed();
}
return Void();
}
Future<Void> setup(Database const& cx) override { return Void(); }
Future<Void> start(Database const& cx) override {
TraceEvent("ConsistencyCheckUrgent_EnterWorkload").detail("ConsistencyCheckerId", consistencyCheckerId);
return _start(cx, this);
}
Future<bool> check(Database const& cx) override { return true; }
void getMetrics(std::vector<PerfMetric>& m) override {}
};
WorkloadFactory<ConsistencyCheckUrgentWorkload> ConsistencyCheckUrgentWorkloadFactory;
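A wiring note: the "testName" option that tester.actor.cpp sets to ConsistencyCheckUrgent resolves to this workload through IWorkloadFactory::create, matching the NAME constant registered by this factory.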


@@ -106,6 +106,7 @@ ERROR( storage_engine_not_initialized, 1081, "Storage engine was never successfu
ERROR( unknown_storage_engine, 1082, "Storage engine type is not recognized." )
ERROR( duplicate_snapshot_request, 1083, "A duplicate snapshot request has been sent, the old request is discarded.")
ERROR( dd_config_changed, 1084, "DataDistribution configuration changed." )
ERROR( consistency_check_urgent_task_failed, 1085, "Consistency check urgent task failed")
ERROR( broken_promise, 1100, "Broken promise" )
ERROR( operation_cancelled, 1101, "Asynchronous operation cancelled" )