Get sources servers by DC in DDAuditRange.
This commit is contained in:
parent
e6846e1ed5
commit
e6df139204
|
@ -73,12 +73,13 @@ struct AuditStorageRequest {
|
||||||
|
|
||||||
template <class Ar>
|
template <class Ar>
|
||||||
void serialize(Ar& ar) {
|
void serialize(Ar& ar) {
|
||||||
serializer(ar, id, range, type, reply);
|
serializer(ar, id, range, type, targetServers, reply);
|
||||||
}
|
}
|
||||||
|
|
||||||
UID id;
|
UID id;
|
||||||
KeyRange range;
|
KeyRange range;
|
||||||
uint8_t type;
|
uint8_t type;
|
||||||
|
std::vector<UID> targetServers;
|
||||||
ReplyPromise<AuditStorageState> reply;
|
ReplyPromise<AuditStorageState> reply;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
|
@ -132,7 +132,7 @@ class DDTxnProcessorImpl {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
const StorageServerInterface ssi = decodeServerListValue(serverListValue.get());
|
const StorageServerInterface ssi = decodeServerListValue(serverListValue.get());
|
||||||
current.servers.push_back(ssi);
|
current.servers[ssi.locality.describeDcId()].push_back(ssi);
|
||||||
}
|
}
|
||||||
res.push_back(current);
|
res.push_back(current);
|
||||||
}
|
}
|
||||||
|
|
|
@ -1344,7 +1344,7 @@ ACTOR Future<Void> scheduleAuditForRange(Reference<DataDistributor> self,
|
||||||
UID auditId,
|
UID auditId,
|
||||||
KeyRange range,
|
KeyRange range,
|
||||||
AuditType type) {
|
AuditType type) {
|
||||||
TraceEvent(SevDebug, "DDScheduleAuditBegin", auditId).detail("Range", range).detail("AuditType", type);
|
TraceEvent(SevDebug, "DDScheduleAuditForRangeBegin", auditId).detail("Range", range).detail("AuditType", type);
|
||||||
// TODO(heliu): Load the audit map for `range`.
|
// TODO(heliu): Load the audit map for `range`.
|
||||||
state Key begin = range.begin;
|
state Key begin = range.begin;
|
||||||
state KeyRange currentRange = range;
|
state KeyRange currentRange = range;
|
||||||
|
@ -1370,9 +1370,18 @@ ACTOR Future<Void> scheduleAuditForRange(Reference<DataDistributor> self,
|
||||||
|
|
||||||
state int i = 0;
|
state int i = 0;
|
||||||
for (i = 0; i < rangeLocations.size(); ++i) {
|
for (i = 0; i < rangeLocations.size(); ++i) {
|
||||||
const AuditStorageRequest req(auditId, rangeLocations[i].range, type);
|
AuditStorageRequest req(auditId, rangeLocations[i].range, type);
|
||||||
const int idx = deterministicRandom()->randomInt(0, rangeLocations[i].servers.size());
|
if (type == AuditType::ValidateHA && rangeLocations[i].servers.size() >= 2) {
|
||||||
actors->add(doAuditStorage(self, actors, auditMap, rangeLocations[i].servers[idx], req));
|
auto it = rangeLocations[i].servers.begin();
|
||||||
|
const int idx = deterministicRandom()->randomInt(0, it->second.size());
|
||||||
|
StorageServerInterface& targetServer = it->second[idx];
|
||||||
|
++it;
|
||||||
|
for (; it != rangeLocations[i].servers.end(); ++it) {
|
||||||
|
const int idx = deterministicRandom()->randomInt(0, it->second.size());
|
||||||
|
req.targetServers.push_back(it->second[idx].id());
|
||||||
|
}
|
||||||
|
actors->add(doAuditStorage(self, actors, auditMap, targetServer, req));
|
||||||
|
}
|
||||||
begin = rangeLocations[i].range.end;
|
begin = rangeLocations[i].range.end;
|
||||||
wait(delay(0.01));
|
wait(delay(0.01));
|
||||||
}
|
}
|
||||||
|
@ -1395,7 +1404,8 @@ ACTOR Future<Void> doAuditStorage(Reference<DataDistributor> self,
|
||||||
TraceEvent(SevDebug, "DDAuditStorageBegin", req.id)
|
TraceEvent(SevDebug, "DDAuditStorageBegin", req.id)
|
||||||
.detail("Range", req.range)
|
.detail("Range", req.range)
|
||||||
.detail("AuditType", req.type)
|
.detail("AuditType", req.type)
|
||||||
.detail("StorageServer", ssi.toString());
|
.detail("StorageServer", ssi.toString())
|
||||||
|
.detail("TargetServers", describe(req.targetServers));
|
||||||
|
|
||||||
try {
|
try {
|
||||||
auditMap->insert(req.range, AuditPhase::Running);
|
auditMap->insert(req.range, AuditPhase::Running);
|
||||||
|
|
|
@ -41,7 +41,8 @@ public:
|
||||||
StorageServersForRange() = default;
|
StorageServersForRange() = default;
|
||||||
StorageServersForRange(KeyRangeRef range) : range(range) {}
|
StorageServersForRange(KeyRangeRef range) : range(range) {}
|
||||||
|
|
||||||
std::vector<StorageServerInterface> servers;
|
// A map of dcId : list of servers
|
||||||
|
std::map<std::string, std::vector<StorageServerInterface>> servers;
|
||||||
KeyRange range;
|
KeyRange range;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue