Final cleanup per review comments

Make the distributor interface optional in ServerDBInfo, along with many
other small changes.
This commit is contained in:
Jingyu Zhou 2019-02-12 15:50:44 -08:00
parent bf6da81bf9
commit 5e6577cc82
8 changed files with 85 additions and 129 deletions

View File

@ -116,6 +116,13 @@ public:
newInfo.distributor = distributorInterf; newInfo.distributor = distributorInterf;
serverInfo->set( newInfo ); serverInfo->set( newInfo );
} }
void clearDistributor() {
ServerDBInfo newInfo = serverInfo->get();
newInfo.id = g_random->randomUniqueID();
newInfo.distributor = Optional<DataDistributorInterface>();
serverInfo->set( newInfo );
}
}; };
struct UpdateWorkerList { struct UpdateWorkerList {
@ -511,15 +518,19 @@ public:
return result; return result;
} }
// Increments the use count in *id_used for the process ids this object already
// knows about: the master, the cluster controller, and the data distributor
// when one is present in the current ServerDBInfo.
void updateKnownIds(std::map< Optional<Standalone<StringRef>>, int>* id_used) {
	auto& counts = *id_used;
	counts[masterProcessId]++;
	counts[clusterControllerProcessId]++;

	const auto& info = db.serverInfo->get();
	if (!info.distributor.present()) {
		return;
	}
	counts[info.distributor.get().locality.processId()]++;
}
RecruitRemoteFromConfigurationReply findRemoteWorkersForConfiguration( RecruitRemoteFromConfigurationRequest const& req ) { RecruitRemoteFromConfigurationReply findRemoteWorkersForConfiguration( RecruitRemoteFromConfigurationRequest const& req ) {
RecruitRemoteFromConfigurationReply result; RecruitRemoteFromConfigurationReply result;
std::map< Optional<Standalone<StringRef>>, int> id_used; std::map< Optional<Standalone<StringRef>>, int> id_used;
id_used[masterProcessId]++; updateKnownIds(&id_used);
id_used[clusterControllerProcessId]++;
if (db.serverInfo->get().distributor.isValid()) {
id_used[db.serverInfo->get().distributor.locality.processId()]++;
}
std::set<Optional<Key>> remoteDC; std::set<Optional<Key>> remoteDC;
remoteDC.insert(req.dcId); remoteDC.insert(req.dcId);
@ -557,11 +568,7 @@ public:
ErrorOr<RecruitFromConfigurationReply> findWorkersForConfiguration( RecruitFromConfigurationRequest const& req, Optional<Key> dcId ) { ErrorOr<RecruitFromConfigurationReply> findWorkersForConfiguration( RecruitFromConfigurationRequest const& req, Optional<Key> dcId ) {
RecruitFromConfigurationReply result; RecruitFromConfigurationReply result;
std::map< Optional<Standalone<StringRef>>, int> id_used; std::map< Optional<Standalone<StringRef>>, int> id_used;
id_used[masterProcessId]++; updateKnownIds(&id_used);
id_used[clusterControllerProcessId]++;
if (db.serverInfo->get().distributor.isValid()) {
id_used[db.serverInfo->get().distributor.locality.processId()]++;
}
ASSERT(dcId.present()); ASSERT(dcId.present());
@ -689,11 +696,7 @@ public:
} else { } else {
RecruitFromConfigurationReply result; RecruitFromConfigurationReply result;
std::map< Optional<Standalone<StringRef>>, int> id_used; std::map< Optional<Standalone<StringRef>>, int> id_used;
id_used[masterProcessId]++; updateKnownIds(&id_used);
id_used[clusterControllerProcessId]++;
if (db.serverInfo->get().distributor.isValid()) {
id_used[db.serverInfo->get().distributor.locality.processId()]++;
}
auto tlogs = getWorkersForTlogs( req.configuration, req.configuration.tLogReplicationFactor, req.configuration.getDesiredLogs(), req.configuration.tLogPolicy, id_used ); auto tlogs = getWorkersForTlogs( req.configuration, req.configuration.tLogReplicationFactor, req.configuration.getDesiredLogs(), req.configuration.tLogPolicy, id_used );
for(int i = 0; i < tlogs.size(); i++) { for(int i = 0; i < tlogs.size(); i++) {
@ -916,8 +919,8 @@ public:
std::map< Optional<Standalone<StringRef>>, int> id_used; std::map< Optional<Standalone<StringRef>>, int> id_used;
id_used[clusterControllerProcessId]++; id_used[clusterControllerProcessId]++;
if (db.serverInfo->get().distributor.isValid()) { if (db.serverInfo->get().distributor.present()) {
id_used[db.serverInfo->get().distributor.locality.processId()]++; id_used[db.serverInfo->get().distributor.get().locality.processId()]++;
} }
WorkerFitnessInfo mworker = getWorkerForRoleInDatacenter(clusterControllerDcId, ProcessClass::Master, ProcessClass::NeverAssign, db.config, id_used, true); WorkerFitnessInfo mworker = getWorkerForRoleInDatacenter(clusterControllerDcId, ProcessClass::Master, ProcessClass::NeverAssign, db.config, id_used, true);
@ -1012,49 +1015,31 @@ public:
return false; return false;
} }
void updateUsedIds(RegisterMasterRequest const& req) { std::map< Optional<Standalone<StringRef>>, int> getUsedIds() {
auto dbInfo = db.serverInfo->get();
std::map<Optional<Standalone<StringRef>>, int> idUsed; std::map<Optional<Standalone<StringRef>>, int> idUsed;
idUsed[clusterControllerProcessId]++; updateKnownIds(&idUsed);
idUsed[masterProcessId]++;
if (dbInfo.distributor.isValid()) { auto dbInfo = db.serverInfo->get();
idUsed[dbInfo.distributor.locality.processId()]++; for (const auto& tlogset : dbInfo.logSystemConfig.tLogs) {
}
for (const auto& tlogset : req.logSystemConfig.tLogs) {
for (const auto& tlog: tlogset.tLogs) { for (const auto& tlog: tlogset.tLogs) {
if (tlog.present()) { if (tlog.present()) {
idUsed[tlog.interf().locality.processId()]++; idUsed[tlog.interf().locality.processId()]++;
} }
} }
} }
for (const MasterProxyInterface& interf : req.proxies) { for (const MasterProxyInterface& interf : dbInfo.client.proxies) {
ASSERT(interf.locality.processId().present()); ASSERT(interf.locality.processId().present());
idUsed[interf.locality.processId()]++; idUsed[interf.locality.processId()]++;
} }
for (const ResolverInterface& interf: req.resolvers) { for (const ResolverInterface& interf: dbInfo.resolvers) {
ASSERT(interf.locality.processId().present()); ASSERT(interf.locality.processId().present());
idUsed[interf.locality.processId()]++; idUsed[interf.locality.processId()]++;
} }
usedIds.swap( idUsed ); return idUsed;
}
void traceUsedIds() {
for (const auto& it : usedIds) {
TraceEvent ev("UsedID");
if (it.first.present()) ev.detail("Key", it.first.get().contents().toString());
ev.detail("Value", usedIds[it.first]);
if (id_worker.find(it.first) != id_worker.end()) {
ev.detail("Locality", id_worker[it.first].interf.locality.toString());
ev.detail("Addr", id_worker[it.first].interf.address().toString());
} else {
ev.detail("Locality", "Not found!");
}
}
} }
std::map< Optional<Standalone<StringRef>>, WorkerInfo > id_worker; std::map< Optional<Standalone<StringRef>>, WorkerInfo > id_worker;
std::map< Optional<Standalone<StringRef>>, ProcessClass > id_class; //contains the mapping from process id to process class from the database std::map< Optional<Standalone<StringRef>>, ProcessClass > id_class; //contains the mapping from process id to process class from the database
std::map< Optional<Standalone<StringRef>>, int> usedIds; // current used process IDs reported by master
Standalone<RangeResultRef> lastProcessClasses; Standalone<RangeResultRef> lastProcessClasses;
bool gotProcessClasses; bool gotProcessClasses;
bool gotFullyRecoveredConfig; bool gotFullyRecoveredConfig;
@ -1119,8 +1104,8 @@ ACTOR Future<Void> clusterWatchDatabase( ClusterControllerData* cluster, Cluster
//This should always be possible, because we can recruit the master on the same process as the cluster controller. //This should always be possible, because we can recruit the master on the same process as the cluster controller.
std::map< Optional<Standalone<StringRef>>, int> id_used; std::map< Optional<Standalone<StringRef>>, int> id_used;
id_used[cluster->clusterControllerProcessId]++; id_used[cluster->clusterControllerProcessId]++;
if (cluster->db.serverInfo->get().distributor.isValid()) { if (cluster->db.serverInfo->get().distributor.present()) {
id_used[cluster->db.serverInfo->get().distributor.locality.processId()]++; id_used[cluster->db.serverInfo->get().distributor.get().locality.processId()]++;
} }
state WorkerFitnessInfo masterWorker = cluster->getWorkerForRoleInDatacenter(cluster->clusterControllerDcId, ProcessClass::Master, ProcessClass::NeverAssign, db->config, id_used); state WorkerFitnessInfo masterWorker = cluster->getWorkerForRoleInDatacenter(cluster->clusterControllerDcId, ProcessClass::Master, ProcessClass::NeverAssign, db->config, id_used);
if( ( masterWorker.worker.second.machineClassFitness( ProcessClass::Master ) > SERVER_KNOBS->EXPECTED_MASTER_FITNESS || masterWorker.worker.first.locality.processId() == cluster->clusterControllerProcessId ) if( ( masterWorker.worker.second.machineClassFitness( ProcessClass::Master ) > SERVER_KNOBS->EXPECTED_MASTER_FITNESS || masterWorker.worker.first.locality.processId() == cluster->clusterControllerProcessId )
@ -1396,7 +1381,6 @@ ACTOR Future<Void> workerAvailabilityWatch( WorkerInterface worker, ProcessClass
failedWorkerInfo.reply.send( RegisterWorkerReply(failedWorkerInfo.processClass, failedWorkerInfo.priorityInfo) ); failedWorkerInfo.reply.send( RegisterWorkerReply(failedWorkerInfo.processClass, failedWorkerInfo.priorityInfo) );
} }
cluster->id_worker.erase( worker.locality.processId() ); cluster->id_worker.erase( worker.locality.processId() );
cluster->usedIds.erase( worker.locality.processId() );
cluster->updateWorkerList.set( worker.locality.processId(), Optional<ProcessData>() ); cluster->updateWorkerList.set( worker.locality.processId(), Optional<ProcessData>() );
return Void(); return Void();
} }
@ -1703,11 +1687,6 @@ void clusterRegisterMaster( ClusterControllerData* self, RegisterMasterRequest c
dbInfo.recoveryCount = req.recoveryCount; dbInfo.recoveryCount = req.recoveryCount;
} }
// The master may tell us processes that we are not aware of. Thus, when
// using usedIds, proceed with caution as id_workers may not have the process.
self->updateUsedIds(req);
self->traceUsedIds();
if( isChanged ) { if( isChanged ) {
dbInfo.id = g_random->randomUniqueID(); dbInfo.id = g_random->randomUniqueID();
self->db.serverInfo->set( dbInfo ); self->db.serverInfo->set( dbInfo );
@ -1770,9 +1749,9 @@ void registerWorker( RegisterWorkerRequest req, ClusterControllerData *self ) {
} }
} }
if ( req.distributorInterf.present() && !self->db.serverInfo->get().distributor.isValid() ) { if ( req.distributorInterf.present() && !self->db.serverInfo->get().distributor.present() ) {
const DataDistributorInterface& di = req.distributorInterf.get(); const DataDistributorInterface& di = req.distributorInterf.get();
TraceEvent("ClusterController").detail("RegisterDataDistributor", di.id()).detail("Valid", di.isValid()); TraceEvent("ClusterController_RegisterDataDistributor", self->id).detail("DDID", di.id());
self->db.setDistributor( di ); self->db.setDistributor( di );
} }
if( info == self->id_worker.end() ) { if( info == self->id_worker.end() ) {
@ -2297,20 +2276,20 @@ ACTOR Future<DataDistributorInterface> startDataDistributor( ClusterControllerDa
wait( self->db.serverInfo->onChange() ); wait( self->db.serverInfo->onChange() );
} }
std::map<Optional<Standalone<StringRef>>, int> id_used = self->usedIds; std::map<Optional<Standalone<StringRef>>, int> id_used = self->getUsedIds();
state WorkerFitnessInfo data_distributor = self->getWorkerForRoleInDatacenter(dcId, ProcessClass::DataDistributor, ProcessClass::NeverAssign, self->db.config, id_used); state WorkerFitnessInfo data_distributor = self->getWorkerForRoleInDatacenter(dcId, ProcessClass::DataDistributor, ProcessClass::NeverAssign, self->db.config, id_used);
state InitializeDataDistributorRequest req; state InitializeDataDistributorRequest req;
req.reqId = g_random->randomUniqueID(); req.reqId = g_random->randomUniqueID();
TraceEvent("ClusterController_DataDistributorReqID", req.reqId).detail("Recruit", data_distributor.worker.first.address()); TraceEvent("ClusterController_DataDistributorRecruit", req.reqId).detail("Addr", data_distributor.worker.first.address());
ErrorOr<DataDistributorInterface> distributor = wait( data_distributor.worker.first.dataDistributor.getReplyUnlessFailedFor(req, 1, 0) ); ErrorOr<DataDistributorInterface> distributor = wait( data_distributor.worker.first.dataDistributor.getReplyUnlessFailedFor(req, SERVER_KNOBS->WAIT_FOR_DISTRIBUTOR_JOIN_DELAY, 0) );
if (distributor.present()) { if (distributor.present()) {
TraceEvent("ClusterController_DataDistributorReqID", req.reqId).detail("Recruited", data_distributor.worker.first.address()); TraceEvent("ClusterController_DataDistributorRecruited", req.reqId).detail("Addr", data_distributor.worker.first.address());
return distributor.get(); return distributor.get();
} }
} }
catch (Error& e) { catch (Error& e) {
TraceEvent("ClusterController_DataDistributorReqID", req.reqId).error(e); TraceEvent("ClusterController_DataDistributorRecruitError", req.reqId).error(e);
if ( e.code() != error_code_no_more_servers ) { if ( e.code() != error_code_no_more_servers ) {
throw; throw;
} }
@ -2320,44 +2299,29 @@ ACTOR Future<DataDistributorInterface> startDataDistributor( ClusterControllerDa
} }
ACTOR Future<Void> waitDDRejoinOrStartDD( ClusterControllerData *self, ClusterControllerFullInterface *clusterInterface ) { ACTOR Future<Void> waitDDRejoinOrStartDD( ClusterControllerData *self, ClusterControllerFullInterface *clusterInterface ) {
state Future<DataDistributorInterface> newDistributor = Never(); state Future<Void> initialDelay = delay(SERVER_KNOBS->WAIT_FOR_DISTRIBUTOR_JOIN_DELAY);
// wait for a while to see if existing data distributor will join. // wait for a while to see if existing data distributor will join.
loop choose { loop choose {
when ( wait( delay(SERVER_KNOBS->WAIT_FOR_DISTRIBUTOR_JOIN_DELAY) ) ) { break; } when ( wait(initialDelay) ) { break; }
when ( wait(self->db.serverInfo->onChange()) ) { // Rejoins via worker registration when ( wait(self->db.serverInfo->onChange()) ) { // Rejoins via worker registration
if ( self->db.serverInfo->get().distributor.isValid() ) { if ( self->db.serverInfo->get().distributor.present() ) {
TraceEvent("ClusterController_InfoChange", self->id).detail("DataDistributorID", self->db.serverInfo->get().distributor.id()); TraceEvent("ClusterController_InfoChange", self->id)
.detail("DataDistributorID", self->db.serverInfo->get().distributor.get().id());
break; break;
} }
} }
} }
if ( !self->db.serverInfo->get().distributor.isValid() ) {
newDistributor = startDataDistributor( self );
}
// Wait on failures and restart it.
loop { loop {
state Future<Void> distributorFailed = Never(); if ( self->db.serverInfo->get().distributor.present() ) {
if ( self->db.serverInfo->get().distributor.isValid() ) { wait( waitFailureClient( self->db.serverInfo->get().distributor.get().waitFailure, SERVER_KNOBS->DD_FAILURE_TIME ) );
distributorFailed = waitFailureClient( self->db.serverInfo->get().distributor.waitFailure, SERVER_KNOBS->DD_FAILURE_TIME );
}
choose {
when ( DataDistributorInterface distributorInterf = wait( newDistributor ) ) {
TraceEvent("ClusterController", self->id) TraceEvent("ClusterController", self->id)
.detail("DataDistributorID", distributorInterf.id()) .detail("DataDistributorDied", self->db.serverInfo->get().distributor.get().id());
.detail("Valid", distributorInterf.isValid()); self->db.clearDistributor();
} else {
DataDistributorInterface distributorInterf = wait( startDataDistributor(self) );
self->db.setDistributor( distributorInterf ); self->db.setDistributor( distributorInterf );
newDistributor = Never();
}
when ( wait( distributorFailed ) ) {
TraceEvent("ClusterController", self->id)
.detail("DataDistributorDied", self->db.serverInfo->get().distributor.id());
self->db.setDistributor( DataDistributorInterface() );
newDistributor = startDataDistributor( self );
}
} }
} }
} }

View File

@ -2422,7 +2422,6 @@ ACTOR Future<Void> storageServerFailureTracker(
Version addedVersion ) Version addedVersion )
{ {
state StorageServerInterface interf = server->lastKnownInterface; state StorageServerInterface interf = server->lastKnownInterface;
state bool doBuildTeam = false;
loop { loop {
if( self->server_status.get(interf.id()).initialized ) { if( self->server_status.get(interf.id()).initialized ) {
bool unhealthy = self->server_status.get(interf.id()).isUnhealthy(); bool unhealthy = self->server_status.get(interf.id()).isUnhealthy();
@ -2437,10 +2436,6 @@ ACTOR Future<Void> storageServerFailureTracker(
} }
self->server_status.set( interf.id(), *status ); self->server_status.set( interf.id(), *status );
if (doBuildTeam) {
doBuildTeam = false;
self->doBuildTeams = true;
}
if( status->isFailed ) if( status->isFailed )
self->restartRecruiting.trigger(); self->restartRecruiting.trigger();
@ -2455,8 +2450,8 @@ ACTOR Future<Void> storageServerFailureTracker(
wait(delay(SERVER_KNOBS->DATA_DISTRIBUTION_FAILURE_REACTION_TIME - elapsed)); wait(delay(SERVER_KNOBS->DATA_DISTRIBUTION_FAILURE_REACTION_TIME - elapsed));
} }
status->isFailed = !status->isFailed; status->isFailed = !status->isFailed;
if( !status->isFailed && (!server->teams.size() || self->zeroHealthyTeams->get()) ) { if(!status->isFailed && !server->teams.size()) {
doBuildTeam = true; self->doBuildTeams = true;
} }
TraceEvent("StatusMapChange", self->distributorId).detail("ServerID", interf.id()).detail("Status", status->toString()) TraceEvent("StatusMapChange", self->distributorId).detail("ServerID", interf.id()).detail("Status", status->toString())
@ -2572,9 +2567,7 @@ ACTOR Future<Void> storageServerTracker(
if(hasWrongStoreTypeOrDC) if(hasWrongStoreTypeOrDC)
self->restartRecruiting.trigger(); self->restartRecruiting.trigger();
TraceEvent("StatusMapChange", self->distributorId).detail("Status", status.toString()) if ( lastIsUnhealthy && !status.isUnhealthy() && !server->teams.size() ) {
.detail("Server", server->id).detail("LastIsUnhealthy", lastIsUnhealthy);
if ( lastIsUnhealthy && !status.isUnhealthy() && (!server->teams.size() || self->zeroHealthyTeams->get()) ) {
self->doBuildTeams = true; self->doBuildTeams = true;
} }
lastIsUnhealthy = status.isUnhealthy(); lastIsUnhealthy = status.isUnhealthy();
@ -2921,14 +2914,12 @@ ACTOR Future<Void> remoteRecovered( Reference<AsyncVar<struct ServerDBInfo>> db
} }
ACTOR Future<Void> monitorHealthyTeams( DDTeamCollection* self ) { ACTOR Future<Void> monitorHealthyTeams( DDTeamCollection* self ) {
state Future<Void> checkHealth;
loop choose { loop choose {
when ( wait( delay( SERVER_KNOBS->DD_ZERO_HEALTHY_TEAM_DELAY ) ) ) { when ( wait(self->zeroHealthyTeams->get() ? delay(SERVER_KNOBS->DD_ZERO_HEALTHY_TEAM_DELAY) : Never()) ) {
if ( self->healthyTeamCount == 0 ) {
self->doBuildTeams = true; self->doBuildTeams = true;
checkHealth = DDTeamCollection::checkBuildTeams(self); wait( DDTeamCollection::checkBuildTeams(self) );
}
} }
when ( wait(self->zeroHealthyTeams->onChange()) ) {}
} }
} }
@ -3289,7 +3280,6 @@ struct DataDistributorData : NonCopyable, ReferenceCounted<DataDistributorData>
Reference<AsyncVar<DatabaseConfiguration>> configuration; Reference<AsyncVar<DatabaseConfiguration>> configuration;
std::vector<Optional<Key>> primaryDcId; std::vector<Optional<Key>> primaryDcId;
std::vector<Optional<Key>> remoteDcIds; std::vector<Optional<Key>> remoteDcIds;
AsyncTrigger configurationTrigger;
UID ddId; UID ddId;
PromiseStream< std::pair<UID, Optional<StorageServerInterface>> > ddStorageServerChanges; PromiseStream< std::pair<UID, Optional<StorageServerInterface>> > ddStorageServerChanges;
PromiseStream<Future<Void>> addActor; PromiseStream<Future<Void>> addActor;
@ -3332,7 +3322,6 @@ ACTOR Future<Void> configurationMonitor( Reference<DataDistributorData> self ) {
if ( conf != self->configuration->get() ) { if ( conf != self->configuration->get() ) {
TraceEvent("DataDistributor_UpdateConfiguration", self->ddId).detail("Config", conf.toString()); TraceEvent("DataDistributor_UpdateConfiguration", self->ddId).detail("Config", conf.toString());
self->configuration->set( conf ); self->configuration->set( conf );
self->configurationTrigger.trigger();
} }
state Future<Void> watchFuture = tr.watch(configVersionKey); state Future<Void> watchFuture = tr.watch(configVersionKey);
@ -3380,7 +3369,7 @@ ACTOR Future<Void> dataDistributor(DataDistributorInterface di, Reference<AsyncV
self->addActor.send( configurationMonitor( self ) ); self->addActor.send( configurationMonitor( self ) );
loop choose { loop choose {
when ( wait( self->configuration.onChange() ) ) { when ( wait( self->configuration->onChange() ) ) {
self->refreshDcIds(); self->refreshDcIds();
break; break;
} }
@ -3392,22 +3381,21 @@ ACTOR Future<Void> dataDistributor(DataDistributorInterface di, Reference<AsyncV
state PromiseStream< std::pair<UID, Optional<StorageServerInterface>> > ddStorageServerChanges; state PromiseStream< std::pair<UID, Optional<StorageServerInterface>> > ddStorageServerChanges;
state double lastLimited = 0; state double lastLimited = 0;
state Future<Void> distributor = reportErrorsExcept( dataDistribution( self->dbInfo, di.id(), self->configuration->get(), ddStorageServerChanges, self->primaryDcId, self->remoteDcIds, &lastLimited ), "DataDistribution", di.id(), &normalDataDistributorErrors() ); state Future<Void> distributor = reportErrorsExcept( dataDistribution( self->dbInfo, di.id(), self->configuration->get(), ddStorageServerChanges, self->primaryDcId, self->remoteDcIds, &lastLimited ), "DataDistribution", di.id(), &normalDataDistributorErrors() );
self->addActor.send( distributor );
self->addActor.send( reportErrorsExcept( rateKeeper( self->dbInfo, ddStorageServerChanges, di.getRateInfo.getFuture(), self->configuration->get(), &lastLimited ), "Ratekeeper", di.id(), &normalRateKeeperErrors() ) ); self->addActor.send( reportErrorsExcept( rateKeeper( self->dbInfo, ddStorageServerChanges, di.getRateInfo.getFuture(), self->configuration->get(), &lastLimited ), "Ratekeeper", di.id(), &normalRateKeeperErrors() ) );
loop choose { loop choose {
when ( wait( self->configurationTrigger.onTrigger() ) ) { when ( wait( self->configuration->onChange() ) ) {
TraceEvent("DataDistributor_Restart", di.id()) TraceEvent("DataDistributor_Restart", di.id())
.detail("ClusterControllerID", lastClusterControllerID) .detail("ClusterControllerID", lastClusterControllerID)
.detail("Configuration", self->configuration->get().toString()); .detail("Configuration", self->configuration->get().toString());
self->refreshDcIds(); self->refreshDcIds();
distributor = reportErrorsExcept( dataDistribution( self->dbInfo, di.id(), self->configuration->get(), ddStorageServerChanges, self->primaryDcId, self->remoteDcIds, &lastLimited ), "DataDistribution", di.id(), &normalDataDistributorErrors() ); distributor = reportErrorsExcept( dataDistribution( self->dbInfo, di.id(), self->configuration->get(), ddStorageServerChanges, self->primaryDcId, self->remoteDcIds, &lastLimited ), "DataDistribution", di.id(), &normalDataDistributorErrors() );
self->addActor.send( distributor );
} }
when ( wait( collection ) ) { when ( wait( collection ) ) {
ASSERT(false); ASSERT(false);
throw internal_error(); throw internal_error();
} }
when ( wait( distributor ) ) {}
} }
} }
catch ( Error &err ) { catch ( Error &err ) {

View File

@ -28,13 +28,11 @@ struct DataDistributorInterface {
RequestStream<ReplyPromise<Void>> waitFailure; RequestStream<ReplyPromise<Void>> waitFailure;
RequestStream<struct GetRateInfoRequest> getRateInfo; RequestStream<struct GetRateInfoRequest> getRateInfo;
struct LocalityData locality; struct LocalityData locality;
bool valid;
DataDistributorInterface() : valid(false) {} DataDistributorInterface() {}
explicit DataDistributorInterface(const struct LocalityData& l) : locality(l), valid(true) {} explicit DataDistributorInterface(const struct LocalityData& l) : locality(l) {}
void initEndpoints() {} void initEndpoints() {}
bool isValid() const { return valid; }
UID id() const { return getRateInfo.getEndpoint().token; } UID id() const { return getRateInfo.getEndpoint().token; }
NetworkAddress address() const { return getRateInfo.getEndpoint().address; } NetworkAddress address() const { return getRateInfo.getEndpoint().address; }
bool operator== (const DataDistributorInterface& r) const { bool operator== (const DataDistributorInterface& r) const {
@ -46,7 +44,7 @@ struct DataDistributorInterface {
template <class Archive> template <class Archive>
void serialize(Archive& ar) { void serialize(Archive& ar) {
serializer(ar, waitFailure, getRateInfo, locality, valid); serializer(ar, waitFailure, getRateInfo, locality);
} }
}; };

View File

@ -93,22 +93,22 @@ ACTOR Future<Void> getRate(UID myID, Reference<AsyncVar<ServerDBInfo>> db, int64
state Future<GetRateInfoReply> reply = Never(); state Future<GetRateInfoReply> reply = Never();
state int64_t lastTC = 0; state int64_t lastTC = 0;
if (db->get().distributor.isValid()) nextRequestTimer = Void(); if (db->get().distributor.present()) nextRequestTimer = Void();
loop choose { loop choose {
when ( wait( db->onChange() ) ) { when ( wait( db->onChange() ) ) {
if ( db->get().distributor.isValid() ) { if ( db->get().distributor.present() ) {
TraceEvent("Proxy", myID) TraceEvent("Proxy_DataDistributorChanged", myID)
.detail("DataDistributorChangedID", db->get().distributor.id()); .detail("DDID", db->get().distributor.get().id());
nextRequestTimer = Void(); // trigger GetRate request nextRequestTimer = Void(); // trigger GetRate request
} else { } else {
TraceEvent("Proxy", myID) TraceEvent("Proxy_DataDistributorDied", myID);
.detail("DataDistributorDied", db->get().distributor.id());
nextRequestTimer = Never(); nextRequestTimer = Never();
reply = Never();
} }
} }
when ( wait( nextRequestTimer ) ) { when ( wait( nextRequestTimer ) ) {
nextRequestTimer = Never(); nextRequestTimer = Never();
reply = brokenPromiseToNever(db->get().distributor.getRateInfo.getReply(GetRateInfoRequest(myID, *inTransactionCount))); reply = brokenPromiseToNever(db->get().distributor.get().getRateInfo.getReply(GetRateInfoRequest(myID, *inTransactionCount)));
} }
when ( GetRateInfoReply rep = wait(reply) ) { when ( GetRateInfoReply rep = wait(reply) ) {
reply = Never(); reply = Never();

View File

@ -70,18 +70,21 @@ ACTOR Future<WorkerInterface> getDataDistributorWorker( Database cx, Reference<A
loop { loop {
state vector<std::pair<WorkerInterface, ProcessClass>> workers = wait( getWorkers( dbInfo ) ); state vector<std::pair<WorkerInterface, ProcessClass>> workers = wait( getWorkers( dbInfo ) );
if (!dbInfo->get().distributor.present()) continue;
for( int i = 0; i < workers.size(); i++ ) { for( int i = 0; i < workers.size(); i++ ) {
if( workers[i].first.address() == dbInfo->get().distributor.address() ) { if( workers[i].first.address() == dbInfo->get().distributor.get().address() ) {
TraceEvent("GetDataDistributorWorker").detail("Stage", "GotWorkers").detail("DataDistributorId", dbInfo->get().distributor.id()).detail("WorkerId", workers[i].first.id()); TraceEvent("GetDataDistributorWorker").detail("Stage", "GotWorkers")
.detail("DataDistributorId", dbInfo->get().distributor.get().id())
.detail("WorkerId", workers[i].first.id());
return workers[i].first; return workers[i].first;
} }
} }
TraceEvent(SevWarn, "GetDataDistributorWorker") TraceEvent(SevWarn, "GetDataDistributorWorker")
.detail("Error", "DataDistributorWorkerNotFound") .detail("Error", "DataDistributorWorkerNotFound")
.detail("DataDistributorId", dbInfo->get().distributor.id()) .detail("DataDistributorId", dbInfo->get().distributor.get().id())
.detail("DataDistributorAddress", dbInfo->get().distributor.address()) .detail("DataDistributorAddress", dbInfo->get().distributor.get().address())
.detail("WorkerCount", workers.size()); .detail("WorkerCount", workers.size());
} }
} }
@ -334,7 +337,7 @@ ACTOR Future<Void> waitForQuietDatabase( Database cx, Reference<AsyncVar<ServerD
try { try {
TraceEvent("QuietDatabaseWaitingOnDataDistributor"); TraceEvent("QuietDatabaseWaitingOnDataDistributor");
WorkerInterface distributorWorker = wait( getDataDistributorWorker( cx, dbInfo ) ); WorkerInterface distributorWorker = wait( getDataDistributorWorker( cx, dbInfo ) );
UID distributorUID = dbInfo->get().distributor.id(); UID distributorUID = dbInfo->get().distributor.get().id();
TraceEvent("QuietDatabaseGotDataDistributor", distributorUID).detail("Locality", distributorWorker.locality.toString()); TraceEvent("QuietDatabaseGotDataDistributor", distributorUID).detail("Locality", distributorWorker.locality.toString());
state Future<int64_t> dataInFlight = getDataInFlight( cx, distributorWorker); state Future<int64_t> dataInFlight = getDataInFlight( cx, distributorWorker);

View File

@ -37,7 +37,7 @@ struct ServerDBInfo {
UID id; // Changes each time any other member changes UID id; // Changes each time any other member changes
ClusterControllerFullInterface clusterInterface; ClusterControllerFullInterface clusterInterface;
ClientDBInfo client; // After a successful recovery, eventually proxies that communicate with it ClientDBInfo client; // After a successful recovery, eventually proxies that communicate with it
DataDistributorInterface distributor; // The best guess of current data distributor, which might be unknown. Optional<DataDistributorInterface> distributor; // The best guess of current data distributor.
MasterInterface master; // The best guess as to the most recent master, which might still be recovering MasterInterface master; // The best guess as to the most recent master, which might still be recovering
vector<ResolverInterface> resolvers; vector<ResolverInterface> resolvers;
DBRecoveryCount recoveryCount; // A recovery count from DBCoreState. A successful master recovery increments it twice; unsuccessful recoveries may increment it once. Depending on where the current master is in its recovery process, this might not have been written by the current master. DBRecoveryCount recoveryCount; // A recovery count from DBCoreState. A successful master recovery increments it twice; unsuccessful recoveries may increment it once. Depending on where the current master is in its recovery process, this might not have been written by the current master.

View File

@ -514,7 +514,7 @@ ACTOR Future<Void> monitorServerDBInfo( Reference<AsyncVar<Optional<ClusterContr
choose { choose {
when( ServerDBInfo ni = wait( ccInterface->get().present() ? brokenPromiseToNever( ccInterface->get().get().getServerDBInfo.getReply( req ) ) : Never() ) ) { when( ServerDBInfo ni = wait( ccInterface->get().present() ? brokenPromiseToNever( ccInterface->get().get().getServerDBInfo.getReply( req ) ) : Never() ) ) {
TraceEvent("GotServerDBInfoChange").detail("ChangeID", ni.id).detail("MasterID", ni.master.id()) TraceEvent("GotServerDBInfoChange").detail("ChangeID", ni.id).detail("MasterID", ni.master.id())
.detail("DataDistributorID", ni.distributor.id()); .detail("DataDistributorID", ni.distributor.present() ? ni.distributor.get().id() : UID());
ServerDBInfo localInfo = ni; ServerDBInfo localInfo = ni;
localInfo.myLocality = locality; localInfo.myLocality = locality;
dbInfo->set(localInfo); dbInfo->set(localInfo);
@ -729,6 +729,7 @@ ACTOR Future<Void> workerServer( Reference<ClusterConnectionFile> connFile, Refe
if ( ddInterf->get().present() ) { if ( ddInterf->get().present() ) {
recruited = ddInterf->get().get(); recruited = ddInterf->get().get();
TEST(true); // Recruited while already a data distributor.
} else { } else {
startRole( Role::DATA_DISTRIBUTOR, recruited.id(), interf.id() ); startRole( Role::DATA_DISTRIBUTOR, recruited.id(), interf.id() );

View File

@ -780,7 +780,9 @@ Future<Void> setWhenDoneOrError( Future<Void> condition, Reference<AsyncVar<T>>
try { try {
wait( condition ); wait( condition );
} }
catch ( Error& e ) {} catch ( Error& e ) {
if (e.code() == error_code_actor_cancelled) throw;
}
var->set( val ); var->set( val );
return Void(); return Void();
} }