Add used worker IDs to cluster controller

This "usedIds" is updated when receiving a master registration message, so that
when recruiting new data distributor, existing assignment is known.
This commit is contained in:
Jingyu Zhou 2019-01-28 09:25:15 -08:00 committed by Jingyu Zhou
parent 6a655143e8
commit 39e4a59154
3 changed files with 59 additions and 12 deletions

View File

@ -27,6 +27,7 @@
#include "fdbserver/MoveKeys.h"
#include "fdbserver/WorkerInterface.h"
#include "fdbserver/LeaderElection.h"
#include "fdbserver/LogSystemConfig.h"
#include "fdbserver/WaitFailure.h"
#include "fdbserver/ClusterRecruitmentInterface.h"
#include "fdbserver/ServerDBInfo.h"
@ -999,8 +1000,48 @@ public:
return false;
}
void updateUsedIds(RegisterMasterRequest const& req) {
auto dbInfo = db.serverInfo->get();
std::map<Optional<Standalone<StringRef>>, int> idUsed;
idUsed[clusterControllerProcessId]++;
idUsed[masterProcessId]++;
const auto& distributorInterf = dbInfo.distributor;
if (distributorInterf.isValid()) {
idUsed[distributorInterf.locality.processId()]++;
}
for (const auto& tlog : req.logSystemConfig.tLogs) {
for (const auto& locality: tlog.tLogLocalities) {
if (locality.processId().present()) {
idUsed[locality.processId()]++;
} else {
TraceEvent("UsedID").detail("Tlog", locality.toString());
}
}
}
for (const MasterProxyInterface& interf : req.proxies) {
ASSERT(interf.locality.processId().present());
idUsed[interf.locality.processId()]++;
}
for (const ResolverInterface& interf: req.resolvers) {
ASSERT(interf.locality.processId().present());
idUsed[interf.locality.processId()]++;
}
usedIds.set( idUsed );
}
void traceUsedIds() {
auto idUsed = usedIds.get();
for (const auto& it : idUsed) {
TraceEvent ev("UsedID");
if (it.first.present()) ev.detail("Key", it.first.get().contents().toString());
ev.detail("Value", idUsed[it.first]);
ev.detail("Address", id_worker[it.first].interf.address().toString());
}
}
std::map< Optional<Standalone<StringRef>>, WorkerInfo > id_worker;
std::map< Optional<Standalone<StringRef>>, ProcessClass > id_class; //contains the mapping from process id to process class from the database
std::map< Optional<Standalone<StringRef>>, ProcessClass > id_class; //contains the mapping from process id to process class from the database
AsyncVar<std::map< Optional<Standalone<StringRef>>, int>> usedIds; // current used process IDs reported by master
Standalone<RangeResultRef> lastProcessClasses;
bool gotProcessClasses;
bool gotFullyRecoveredConfig;
@ -1658,6 +1699,9 @@ void clusterRegisterMaster( ClusterControllerData* self, RegisterMasterRequest c
self->db.serverInfo->set( dbInfo );
}
self->updateUsedIds(req);
self->traceUsedIds();
checkOutstandingRequests(self);
}
@ -2232,20 +2276,22 @@ ACTOR Future<DataDistributorInterface> startDataDistributor( ClusterControllerDa
}
while (true) {
std::map<Optional<Standalone<StringRef>>, int> id_used;
id_used[self->clusterControllerProcessId]++;
id_used[self->masterProcessId]++;
if ( self->usedIds.get().size() == 0 ) {
wait( self->usedIds.onChange() );
}
self->traceUsedIds();
std::map<Optional<Standalone<StringRef>>, int> id_used = self->usedIds.get();
state WorkerFitnessInfo data_distributor = self->getWorkerForRoleInDatacenter(dcId, ProcessClass::DataDistributor, ProcessClass::NeverAssign, self->db.config, id_used);
state InitializeDataDistributorRequest req;
req.reqId = g_random->randomUniqueID();
TraceEvent("DataDistributor", req.reqId).detail("Recruit", data_distributor.worker.first.address());
TraceEvent("DataDistributorReqID", req.reqId).detail("Recruit", data_distributor.worker.first.address());
ErrorOr<DataDistributorInterface> distributor = wait( data_distributor.worker.first.dataDistributor.getReplyUnlessFailedFor(req, 1, 0) );
if (distributor.present()) {
TraceEvent("DataDistributor", req.reqId).detail("Recruited", data_distributor.worker.first.address());
TraceEvent("DataDistributorReqID", req.reqId).detail("Recruited", data_distributor.worker.first.address());
return distributor.get();
}
TraceEvent("DataDistributor", req.reqId)
TraceEvent("DataDistributorReqID", req.reqId)
.detail("RecruitFailed", data_distributor.worker.first.address())
.error(distributor.getError());
}

View File

@ -22,14 +22,16 @@
#define FOUNDATIONDB_DATADISTRIBUTORINTERFACE_H
#include "fdbrpc/fdbrpc.h"
#include "fdbrpc/Locality.h"
struct DataDistributorInterface {
RequestStream<ReplyPromise<Void>> waitFailure;
RequestStream<struct GetRateInfoRequest> getRateInfo;
struct LocalityData locality;
bool valid;
DataDistributorInterface() : valid(false) {}
explicit DataDistributorInterface(bool v) : valid(v) {}
explicit DataDistributorInterface(struct LocalityData l) : locality(l), valid(true) {}
bool isValid() const { return valid; }
UID id() const { return getRateInfo.getEndpoint().token; }
@ -43,7 +45,7 @@ struct DataDistributorInterface {
template <class Archive>
void serialize(Archive& ar) {
serializer(ar, waitFailure, getRateInfo, valid);
serializer(ar, waitFailure, getRateInfo, locality, valid);
}
};

View File

@ -715,9 +715,8 @@ ACTOR Future<Void> workerServer( Reference<ClusterConnectionFile> connFile, Refe
req.reply.send(recruited);
}
when ( InitializeDataDistributorRequest req = waitNext(interf.dataDistributor.getFuture()) ) {
DataDistributorInterface recruited(true);
TraceEvent("DataDistributorReceived", req.reqId).detail("Addr", interf.address())
.detail("DataDistributorId", recruited.id());
DataDistributorInterface recruited(locality);
TraceEvent("DataDistributorReceived", req.reqId).detail("DataDistributorId", recruited.id());
startRole( Role::DATA_DISTRIBUTOR, recruited.id(), interf.id() );
Future<Void> dataDistributorProcess = dataDistributor( recruited, dbInfo );