getWorkers from IDDTxnProcessor

This commit is contained in:
Xiaoxi Wang 2022-10-04 14:57:04 -07:00
parent 28e170ca69
commit 21b2e11bc4
3 changed files with 18 additions and 5 deletions

View File

@ -60,7 +60,7 @@ class DDTeamCollectionImpl {
// Because worker's processId can be changed when its locality is changed, we cannot watch on the old
// processId; This actor is inactive most time, so iterating all workers incurs little performance
// overhead.
state std::vector<ProcessData> workers = wait(getWorkers(self->cx));
state std::vector<ProcessData> workers = wait(self->db->getWorkers());
state std::set<AddressExclusion> existingAddrs;
for (int i = 0; i < workers.size(); i++) {
const ProcessData& workerData = workers[i];
@ -1608,9 +1608,7 @@ public:
return Void(); // Don't ignore failures
}
ACTOR static Future<Void> waitForAllDataRemoved(DDTeamCollection const* teams,
UID serverID,
Version addedVersion) {
ACTOR static Future<Void> waitForAllDataRemoved(DDTeamCollection const* teams, UID serverID, Version addedVersion) {
state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(teams->cx);
loop {
try {
@ -1844,7 +1842,7 @@ public:
state Future<RangeResult> flocalitiesExclude =
tr.getRange(excludedLocalityKeys, CLIENT_KNOBS->TOO_MANY);
state Future<RangeResult> flocalitiesFailed = tr.getRange(failedLocalityKeys, CLIENT_KNOBS->TOO_MANY);
state Future<std::vector<ProcessData>> fworkers = getWorkers(self->cx);
state Future<std::vector<ProcessData>> fworkers = self->db->getWorkers();
wait(success(fresultsExclude) && success(fresultsFailed) && success(flocalitiesExclude) &&
success(flocalitiesFailed));

View File

@ -616,6 +616,10 @@ Future<Void> DDTxnProcessor::waitDDTeamInfoPrintSignal() const {
return DDTxnProcessorImpl::waitDDTeamInfoPrintSignal(cx);
}
Future<std::vector<ProcessData>> DDTxnProcessor::getWorkers() const {
return ::getWorkers(cx);
}
Future<ServerWorkerInfos> DDMockTxnProcessor::getServerListAndProcessClasses() {
ServerWorkerInfos res;
for (auto& [_, mss] : mgs->allServers) {
@ -781,3 +785,8 @@ Future<std::pair<Optional<StorageMetrics>, int>> DDMockTxnProcessor::waitStorage
int expectedShardCount) const {
return Future<std::pair<Optional<StorageMetrics>, int>>();
}
// FIXME: finish implementation
Future<std::vector<ProcessData>> DDMockTxnProcessor::getWorkers() const {
return Future<std::vector<ProcessData>>();
}

View File

@ -122,6 +122,8 @@ public:
virtual Future<UID> getClusterId() const { return {}; }
virtual Future<Void> waitDDTeamInfoPrintSignal() const { return Never(); }
virtual Future<std::vector<ProcessData>> getWorkers() const = 0;
};
class DDTxnProcessorImpl;
@ -202,6 +204,8 @@ public:
Future<UID> getClusterId() const override;
Future<Void> waitDDTeamInfoPrintSignal() const override;
Future<std::vector<ProcessData>> getWorkers() const override;
};
// A mock transaction implementation for test usage.
@ -259,6 +263,8 @@ public:
}
Future<HealthMetrics> getHealthMetrics(bool detailed = false) const override;
Future<std::vector<ProcessData>> getWorkers() const override;
};
#endif // FOUNDATIONDB_DDTXNPROCESSOR_H