FastRestore:LeaderElection:Log election progress
This commit is contained in:
parent
ff92401ed5
commit
8ef56e5cae
|
@ -258,12 +258,15 @@ ACTOR Future<Void> monitorleader(Reference<AsyncVar<RestoreWorkerInterface>> lea
|
||||||
state ReadYourWritesTransaction tr(cx);
|
state ReadYourWritesTransaction tr(cx);
|
||||||
// state Future<Void> leaderWatch;
|
// state Future<Void> leaderWatch;
|
||||||
state RestoreWorkerInterface leaderInterf;
|
state RestoreWorkerInterface leaderInterf;
|
||||||
|
state int count = 0;
|
||||||
loop {
|
loop {
|
||||||
try {
|
try {
|
||||||
|
count++;
|
||||||
tr.reset();
|
tr.reset();
|
||||||
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||||
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
|
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||||
Optional<Value> leaderValue = wait(tr.get(restoreLeaderKey));
|
Optional<Value> leaderValue = wait(tr.get(restoreLeaderKey));
|
||||||
|
TraceEvent(SevDebug, "FastRestoreLeaderElection").detail("Round", count).detail("LeaderExisted", leaderValue.present());
|
||||||
if (leaderValue.present()) {
|
if (leaderValue.present()) {
|
||||||
leaderInterf = BinaryReader::fromStringRef<RestoreWorkerInterface>(leaderValue.get(), IncludeVersion());
|
leaderInterf = BinaryReader::fromStringRef<RestoreWorkerInterface>(leaderValue.get(), IncludeVersion());
|
||||||
// Register my interface as an worker if I am not the leader
|
// Register my interface as an worker if I am not the leader
|
||||||
|
@ -322,7 +325,12 @@ ACTOR Future<Void> _restoreWorker(Database cx, LocalityData locality) {
|
||||||
}
|
}
|
||||||
|
|
||||||
ACTOR Future<Void> restoreWorker(Reference<ClusterConnectionFile> ccf, LocalityData locality) {
|
ACTOR Future<Void> restoreWorker(Reference<ClusterConnectionFile> ccf, LocalityData locality) {
|
||||||
|
try {
|
||||||
Database cx = Database::createDatabase(ccf->getFilename(), Database::API_VERSION_LATEST, true, locality);
|
Database cx = Database::createDatabase(ccf->getFilename(), Database::API_VERSION_LATEST, true, locality);
|
||||||
wait(_restoreWorker(cx, locality));
|
wait(_restoreWorker(cx, locality));
|
||||||
|
} catch (Error& e) {
|
||||||
|
TraceEvent("FastRestoreRestoreWorker").detail("Error", e.what());
|
||||||
|
}
|
||||||
|
|
||||||
return Void();
|
return Void();
|
||||||
}
|
}
|
||||||
|
|
|
@ -1872,6 +1872,9 @@ int main(int argc, char* argv[]) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
f = stopAfter(restoreWorker(opts.connectionFile, opts.localities));
|
f = stopAfter(restoreWorker(opts.connectionFile, opts.localities));
|
||||||
|
printf("Fast restore worker exits\n");
|
||||||
|
g_network->run();
|
||||||
|
printf("g_network->run() done\n");
|
||||||
} else { // Call fdbd roles in conventional way
|
} else { // Call fdbd roles in conventional way
|
||||||
ASSERT(opts.connectionFile);
|
ASSERT(opts.connectionFile);
|
||||||
|
|
||||||
|
@ -1888,9 +1891,8 @@ int main(int argc, char* argv[]) {
|
||||||
// actors.push_back( recurring( []{}, .001 ) ); // for ASIO latency measurement
|
// actors.push_back( recurring( []{}, .001 ) ); // for ASIO latency measurement
|
||||||
|
|
||||||
f = stopAfter(waitForAll(actors));
|
f = stopAfter(waitForAll(actors));
|
||||||
}
|
|
||||||
|
|
||||||
g_network->run();
|
g_network->run();
|
||||||
|
}
|
||||||
} else if (role == MultiTester) {
|
} else if (role == MultiTester) {
|
||||||
f = stopAfter(runTests(opts.connectionFile, TEST_TYPE_FROM_FILE,
|
f = stopAfter(runTests(opts.connectionFile, TEST_TYPE_FROM_FILE,
|
||||||
opts.testOnServers ? TEST_ON_SERVERS : TEST_ON_TESTERS, opts.minTesterCount,
|
opts.testOnServers ? TEST_ON_SERVERS : TEST_ON_TESTERS, opts.minTesterCount,
|
||||||
|
|
Loading…
Reference in New Issue