fix: we were recruiting one too few oldLogRouters

code cleanup
This commit is contained in:
Evan Tschannen 2020-01-02 15:05:44 -08:00
parent 3c30215662
commit 3eae401886
1 changed files with 30 additions and 82 deletions

View File

@ -112,6 +112,12 @@ public:
{
}
void addRequiredAddresses(const std::vector<WorkerInterface>& interfaces) {
for(auto& it : interfaces) {
requiredAddresses.insert(it.address());
}
}
void setDistributor(const DataDistributorInterface& interf) {
CachedSerialization<ServerDBInfo> newInfoCache = serverInfo->get();
auto& newInfo = newInfoCache.mutate();
@ -712,22 +718,14 @@ public:
result.proxies.push_back(proxies[i].interf);
if(req.maxOldLogRouters > 0) {
auto oldLogRouters = tlogs;
if(oldLogRouters.size() == 1) {
result.oldLogRouters.push_back(oldLogRouters[0].interf);
if(tlogs.size() == 1) {
result.oldLogRouters.push_back(tlogs[0].interf);
} else {
bool foundCC = false;
for(int i = 0; i < oldLogRouters.size() - 1; i++) {
if(oldLogRouters[i].interf.locality.processId() != clusterControllerProcessId) {
result.oldLogRouters.push_back(oldLogRouters[i].interf);
} else {
foundCC = true;
for(int i = 0; i < tlogs.size(); i++) {
if(tlogs[i].interf.locality.processId() != clusterControllerProcessId) {
result.oldLogRouters.push_back(tlogs[i].interf);
}
}
if(foundCC) {
result.oldLogRouters.push_back(oldLogRouters.back().interf);
}
}
}
@ -811,22 +809,14 @@ public:
}
if(req.maxOldLogRouters > 0) {
auto oldLogRouters = tlogs;
if(oldLogRouters.size() == 1) {
result.oldLogRouters.push_back(oldLogRouters[0].interf);
if(tlogs.size() == 1) {
result.oldLogRouters.push_back(tlogs[0].interf);
} else {
bool foundCC = false;
for(int i = 0; i < oldLogRouters.size() - 1; i++) {
if(oldLogRouters[i].interf.locality.processId() != clusterControllerProcessId) {
result.oldLogRouters.push_back(oldLogRouters[i].interf);
} else {
foundCC = true;
for(int i = 0; i < tlogs.size(); i++) {
if(tlogs[i].interf.locality.processId() != clusterControllerProcessId) {
result.oldLogRouters.push_back(tlogs[i].interf);
}
}
if(foundCC) {
result.oldLogRouters.push_back(oldLogRouters.back().interf);
}
}
}
@ -1454,26 +1444,11 @@ void checkOutstandingRecruitmentRequests( ClusterControllerData* self ) {
RecruitFromConfigurationRequest& req = self->outstandingRecruitmentRequests[i];
try {
RecruitFromConfigurationReply rep = self->findWorkersForConfiguration( req );
for(auto& it : rep.oldLogRouters) {
self->db.requiredAddresses.insert(it.address());
if( it.tLog.getEndpoint().addresses.secondaryAddress.present() ) self->db.requiredAddresses.insert(it.tLog.getEndpoint().addresses.secondaryAddress.get());
}
for(auto& it : rep.proxies) {
self->db.requiredAddresses.insert(it.address());
if( it.tLog.getEndpoint().addresses.secondaryAddress.present() ) self->db.requiredAddresses.insert(it.tLog.getEndpoint().addresses.secondaryAddress.get());
}
for(auto& it : rep.resolvers) {
self->db.requiredAddresses.insert(it.address());
if( it.tLog.getEndpoint().addresses.secondaryAddress.present() ) self->db.requiredAddresses.insert(it.tLog.getEndpoint().addresses.secondaryAddress.get());
}
for(auto& it : rep.satelliteTLogs) {
self->db.requiredAddresses.insert(it.address());
if( it.tLog.getEndpoint().addresses.secondaryAddress.present() ) self->db.requiredAddresses.insert(it.tLog.getEndpoint().addresses.secondaryAddress.get());
}
for(auto& it : rep.tLogs) {
self->db.requiredAddresses.insert(it.address());
if( it.tLog.getEndpoint().addresses.secondaryAddress.present() ) self->db.requiredAddresses.insert(it.tLog.getEndpoint().addresses.secondaryAddress.get());
}
self->db.addRequiredAddresses(rep.oldLogRouters);
self->db.addRequiredAddresses(rep.proxies);
self->db.addRequiredAddresses(rep.resolvers);
self->db.addRequiredAddresses(rep.satelliteTLogs);
self->db.addRequiredAddresses(rep.tLogs);
self->db.serverInfo->trigger();
req.reply.send( rep );
swapAndPop( &self->outstandingRecruitmentRequests, i-- );
@ -1493,14 +1468,8 @@ void checkOutstandingRemoteRecruitmentRequests( ClusterControllerData* self ) {
RecruitRemoteFromConfigurationRequest& req = self->outstandingRemoteRecruitmentRequests[i];
try {
RecruitRemoteFromConfigurationReply rep = self->findRemoteWorkersForConfiguration( req );
for(auto& it : rep.remoteTLogs) {
self->db.requiredAddresses.insert(it.address());
if( it.tLog.getEndpoint().addresses.secondaryAddress.present() ) self->db.requiredAddresses.insert(it.tLog.getEndpoint().addresses.secondaryAddress.get());
}
for(auto& it : rep.logRouters) {
self->db.requiredAddresses.insert(it.address());
if( it.tLog.getEndpoint().addresses.secondaryAddress.present() ) self->db.requiredAddresses.insert(it.tLog.getEndpoint().addresses.secondaryAddress.get());
}
self->db.addRequiredAddresses(rep.remoteTLogs);
self->db.addRequiredAddresses(rep.logRouters);
self->db.serverInfo->trigger();
req.reply.send( rep );
swapAndPop( &self->outstandingRemoteRecruitmentRequests, i-- );
@ -1889,26 +1858,11 @@ ACTOR Future<Void> clusterRecruitFromConfiguration( ClusterControllerData* self,
loop {
try {
auto rep = self->findWorkersForConfiguration( req );
for(auto& it : rep.oldLogRouters) {
self->db.requiredAddresses.insert(it.address());
if( it.tLog.getEndpoint().addresses.secondaryAddress.present() ) self->db.requiredAddresses.insert(it.tLog.getEndpoint().addresses.secondaryAddress.get());
}
for(auto& it : rep.proxies) {
self->db.requiredAddresses.insert(it.address());
if( it.tLog.getEndpoint().addresses.secondaryAddress.present() ) self->db.requiredAddresses.insert(it.tLog.getEndpoint().addresses.secondaryAddress.get());
}
for(auto& it : rep.resolvers) {
self->db.requiredAddresses.insert(it.address());
if( it.tLog.getEndpoint().addresses.secondaryAddress.present() ) self->db.requiredAddresses.insert(it.tLog.getEndpoint().addresses.secondaryAddress.get());
}
for(auto& it : rep.satelliteTLogs) {
self->db.requiredAddresses.insert(it.address());
if( it.tLog.getEndpoint().addresses.secondaryAddress.present() ) self->db.requiredAddresses.insert(it.tLog.getEndpoint().addresses.secondaryAddress.get());
}
for(auto& it : rep.tLogs) {
self->db.requiredAddresses.insert(it.address());
if( it.tLog.getEndpoint().addresses.secondaryAddress.present() ) self->db.requiredAddresses.insert(it.tLog.getEndpoint().addresses.secondaryAddress.get());
}
self->db.addRequiredAddresses(rep.oldLogRouters);
self->db.addRequiredAddresses(rep.proxies);
self->db.addRequiredAddresses(rep.resolvers);
self->db.addRequiredAddresses(rep.satelliteTLogs);
self->db.addRequiredAddresses(rep.tLogs);
self->db.serverInfo->trigger();
req.reply.send( rep );
return Void();
@ -1935,14 +1889,8 @@ ACTOR Future<Void> clusterRecruitRemoteFromConfiguration( ClusterControllerData*
loop {
try {
RecruitRemoteFromConfigurationReply rep = self->findRemoteWorkersForConfiguration( req );
for(auto& it : rep.remoteTLogs) {
self->db.requiredAddresses.insert(it.address());
if( it.tLog.getEndpoint().addresses.secondaryAddress.present() ) self->db.requiredAddresses.insert(it.tLog.getEndpoint().addresses.secondaryAddress.get());
}
for(auto& it : rep.logRouters) {
self->db.requiredAddresses.insert(it.address());
if( it.tLog.getEndpoint().addresses.secondaryAddress.present() ) self->db.requiredAddresses.insert(it.tLog.getEndpoint().addresses.secondaryAddress.get());
}
self->db.addRequiredAddresses(rep.remoteTLogs);
self->db.addRequiredAddresses(rep.logRouters);
self->db.serverInfo->trigger();
req.reply.send( rep );
return Void();