checkForExtraDataStores:Add coordinators into stateful process list

This commit is contained in:
Meng Xu 2020-03-10 23:38:30 -07:00
parent bd345f85db
commit e0d2eca7a8
2 changed files with 9 additions and 2 deletions

View File

@ -67,6 +67,7 @@ struct WorkerInterface {
UID id() const { return tLog.getEndpoint().token; }
NetworkAddress address() const { return tLog.getEndpoint().getPrimaryAddress(); }
Optional<NetworkAddress> secondaryAddress() const { return tLog.getEndpoint().addresses.secondaryAddress; }
WorkerInterface() {}
WorkerInterface( const LocalityData& locality ) : locality( locality ) {}

View File

@ -1172,6 +1172,7 @@ struct ConsistencyCheckWorkload : TestWorkload
state vector<StorageServerInterface> storageServers = wait( getStorageServers( cx ) );
auto& db = self->dbInfo->get();
state std::vector<TLogInterface> logs = db.logSystemConfig.allPresentLogs();
state std::vector<WorkerInterface> coordWorkers = wait(getCoordWorkers(cx, self->dbInfo));
state std::vector<WorkerDetails>::iterator itr;
state bool foundExtraDataStore = false;
@ -1191,8 +1192,13 @@ struct ConsistencyCheckWorkload : TestWorkload
statefulProcesses[log.secondaryAddress().get()].insert(log.id());
}
}
// TODO: Add coordinators into stateful processes
// Why don't we add coordinator address into statefulProcesses?
// Coordinators are also stateful processes
for (const auto& cWorker: coordWorkers) {
statefulProcesses[cWorker.address()].insert(cWorker.id());
if (cWorker.secondaryAddress().present()) {
statefulProcesses[cWorker.secondaryAddress().get()].insert(cWorker.id());
}
}
for(itr = workers.begin(); itr != workers.end(); ++itr) {
ErrorOr<Standalone<VectorRef<UID>>> stores = wait(itr->interf.diskStoreRequest.getReplyUnlessFailedFor(DiskStoreRequest(false), 2, 0));