Fix review comment for PR 1176

Jingyu Zhou 2019-03-12 11:34:16 -07:00
parent 8504bd6c9f
commit 2b0139670e
5 changed files with 60 additions and 74 deletions

View File

@@ -2335,12 +2335,10 @@ ACTOR Future<Void> handleForcedRecoveries( ClusterControllerData *self, ClusterC
 }
 
 ACTOR Future<DataDistributorInterface> startDataDistributor( ClusterControllerData *self ) {
-    state Optional<Key> dcId = self->clusterControllerDcId;
     while ( !self->clusterControllerProcessId.present() || !self->masterProcessId.present() ) {
         wait( delay(SERVER_KNOBS->WAIT_FOR_GOOD_RECRUITMENT_DELAY) );
     }
 
-    state UID reqId;
     loop {
         try {
             while ( self->db.serverInfo->get().recoveryState < RecoveryState::ACCEPTING_COMMITS ) {
@@ -2348,19 +2346,18 @@ ACTOR Future<DataDistributorInterface> startDataDistributor( ClusterControllerDa
             }
             std::map<Optional<Standalone<StringRef>>, int> id_used = self->getUsedIds();
-            state WorkerFitnessInfo data_distributor = self->getWorkerForRoleInDatacenter(dcId, ProcessClass::DataDistributor, ProcessClass::NeverAssign, self->db.config, id_used);
-            reqId = g_random->randomUniqueID();
-            state InitializeDataDistributorRequest req(reqId);
-            TraceEvent("ClusterController_DataDistributorRecruit", req.reqId).detail("Addr", data_distributor.worker.first.address());
+            state WorkerFitnessInfo data_distributor = self->getWorkerForRoleInDatacenter(self->clusterControllerDcId, ProcessClass::DataDistributor, ProcessClass::NeverAssign, self->db.config, id_used);
+            state InitializeDataDistributorRequest req(g_random->randomUniqueID());
+            TraceEvent("ClusterController_DataDistributorRecruit", self->id).detail("Addr", data_distributor.worker.first.address());
             ErrorOr<DataDistributorInterface> distributor = wait( data_distributor.worker.first.dataDistributor.getReplyUnlessFailedFor(req, SERVER_KNOBS->WAIT_FOR_DISTRIBUTOR_JOIN_DELAY, 0) );
             if (distributor.present()) {
-                TraceEvent("ClusterController_DataDistributorRecruited", req.reqId).detail("Addr", data_distributor.worker.first.address());
+                TraceEvent("ClusterController_DataDistributorRecruited", self->id).detail("Addr", data_distributor.worker.first.address());
                 return distributor.get();
             }
         }
         catch (Error& e) {
-            TraceEvent("ClusterController_DataDistributorRecruitError", reqId).error(e);
+            TraceEvent("ClusterController_DataDistributorRecruitError", self->id).error(e);
             if ( e.code() != error_code_no_more_servers ) {
                 throw;
             }
@@ -2398,7 +2395,6 @@ ACTOR Future<Void> monitorDataDistributor(ClusterControllerData *self) {
 }
 
 ACTOR Future<RatekeeperInterface> startRatekeeper(ClusterControllerData *self) {
-    state UID reqId;
     loop {
         try {
             while ( self->db.serverInfo->get().recoveryState < RecoveryState::ACCEPTING_COMMITS ) {
@@ -2406,20 +2402,18 @@ ACTOR Future<RatekeeperInterface> startRatekeeper(ClusterControllerData *self) {
             }
             std::map<Optional<Standalone<StringRef>>, int> id_used = self->getUsedIds();
-            Optional<Key> dcId = self->clusterControllerDcId;
-            state WorkerFitnessInfo rkWorker = self->getWorkerForRoleInDatacenter(dcId, ProcessClass::RateKeeper, ProcessClass::NeverAssign, self->db.config, id_used);
-            reqId = g_random->randomUniqueID();
-            state InitializeRatekeeperRequest req(reqId);
-            TraceEvent("ClusterController_RecruitRatekeeper", req.reqId).detail("Addr", rkWorker.worker.first.address());
+            state WorkerFitnessInfo rkWorker = self->getWorkerForRoleInDatacenter(self->clusterControllerDcId, ProcessClass::RateKeeper, ProcessClass::NeverAssign, self->db.config, id_used);
+            state InitializeRatekeeperRequest req(g_random->randomUniqueID());
+            TraceEvent("ClusterController_RecruitRatekeeper", self->id).detail("Addr", rkWorker.worker.first.address());
             ErrorOr<RatekeeperInterface> interf = wait( rkWorker.worker.first.ratekeeper.getReplyUnlessFailedFor(req, SERVER_KNOBS->WAIT_FOR_RATEKEEPER_JOIN_DELAY, 0) );
             if (interf.present()) {
-                TraceEvent("ClusterController_RatekeeperRecruited", req.reqId).detail("Addr", rkWorker.worker.first.address());
+                TraceEvent("ClusterController_RatekeeperRecruited", self->id).detail("Addr", rkWorker.worker.first.address());
                 return interf.get();
             }
         }
         catch (Error& e) {
-            TraceEvent("ClusterController_RatekeeperRecruitError", reqId).error(e);
+            TraceEvent("ClusterController_RatekeeperRecruitError", self->id).error(e);
             if ( e.code() != error_code_no_more_servers ) {
                 throw;
             }

View File

@@ -3455,8 +3455,30 @@ struct DataDistributorData : NonCopyable, ReferenceCounted<DataDistributorData>
     DataDistributorData(Reference<AsyncVar<ServerDBInfo>> const& db, UID id) : dbInfo(db), ddId(id) {}
 };
 
-ACTOR Future<Void> dataDistribution(Reference<DataDistributorData> self, double* lastLimited)
+ACTOR Future<Void> monitorBatchLimitedTime(Reference<AsyncVar<ServerDBInfo>> db, double* lastLimited) {
+    loop {
+        wait( delay(SERVER_KNOBS->METRIC_UPDATE_RATE) );
+
+        state Reference<ProxyInfo> proxies(new ProxyInfo(db->get().client.proxies, db->get().myLocality));
+
+        choose {
+            when (wait(db->onChange())) {}
+            when (GetHealthMetricsReply reply = wait(proxies->size() ?
+                    loadBalance(proxies, &MasterProxyInterface::getHealthMetrics, GetHealthMetricsRequest(false))
+                    : Never())) {
+                if (reply.healthMetrics.batchLimited) {
+                    *lastLimited = now();
+                }
+            }
+        }
+    }
+}
+
+ACTOR Future<Void> dataDistribution(Reference<DataDistributorData> self)
 {
+    state double lastLimited = 0;
+    self->addActor.send( monitorBatchLimitedTime(self->dbInfo, &lastLimited) );
+
     state Database cx = openDBOnServer(self->dbInfo, TaskDataDistributionLaunch, true, true);
     cx->locationCacheSize = SERVER_KNOBS->DD_LOCATION_CACHE_SIZE;
@@ -3612,7 +3634,7 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributorData> self, double*
             actors.push_back( pollMoveKeysLock(cx, lock) );
             actors.push_back( reportErrorsExcept( dataDistributionTracker( initData, cx, output, shardsAffectedByTeamFailure, getShardMetrics, getAverageShardBytes.getFuture(), readyToStart, anyZeroHealthyTeams, self->ddId ), "DDTracker", self->ddId, &normalDDQueueErrors() ) );
-            actors.push_back( reportErrorsExcept( dataDistributionQueue( cx, output, input.getFuture(), getShardMetrics, processingUnhealthy, tcis, shardsAffectedByTeamFailure, lock, getAverageShardBytes, self->ddId, storageTeamSize, lastLimited ), "DDQueue", self->ddId, &normalDDQueueErrors() ) );
+            actors.push_back( reportErrorsExcept( dataDistributionQueue( cx, output, input.getFuture(), getShardMetrics, processingUnhealthy, tcis, shardsAffectedByTeamFailure, lock, getAverageShardBytes, self->ddId, storageTeamSize, &lastLimited ), "DDQueue", self->ddId, &normalDDQueueErrors() ) );
 
             vector<DDTeamCollection*> teamCollectionsPtrs;
             Reference<DDTeamCollection> primaryTeamCollection( new DDTeamCollection(cx, self->ddId, lock, output, shardsAffectedByTeamFailure, configuration, primaryDcId, configuration.usableRegions > 1 ? remoteDcIds : std::vector<Optional<Key>>(), readyToStart.getFuture(), zeroHealthyTeams[0], true, processingUnhealthy) );
@@ -3654,36 +3676,14 @@ static std::set<int> const& normalDataDistributorErrors() {
     return s;
 }
 
-ACTOR Future<Void> monitorBatchLimitedTime(Reference<AsyncVar<ServerDBInfo>> db, double* lastLimited) {
-    loop {
-        wait( delay(SERVER_KNOBS->METRIC_UPDATE_RATE) );
-        while (db->get().client.proxies.size() == 0) {
-            wait(db->onChange());
-        }
-        state int idx = g_random->randomInt(0, db->get().client.proxies.size());
-        choose {
-            when (wait(db->onChange())) {}
-            when (ErrorOr<GetHealthMetricsReply> reply = wait(
-                    db->get().client.proxies[idx].getHealthMetrics.getReplyUnlessFailedFor(GetHealthMetricsRequest(false), 1.0, 0))) {
-                if (reply.present() && reply.get().healthMetrics.batchLimited) {
-                    *lastLimited = now();
-                }
-            }
-        }
-    }
-}
-
 ACTOR Future<Void> dataDistributor(DataDistributorInterface di, Reference<AsyncVar<struct ServerDBInfo>> db ) {
     state Reference<DataDistributorData> self( new DataDistributorData(db, di.id()) );
     state Future<Void> collection = actorCollection( self->addActor.getFuture() );
-    state double lastLimited = 0;
 
     try {
         TraceEvent("DataDistributor_Running", di.id());
         self->addActor.send( waitFailureServer(di.waitFailure.getFuture()) );
-        self->addActor.send( monitorBatchLimitedTime(db, &lastLimited) );
-        state Future<Void> distributor = reportErrorsExcept( dataDistribution(self, &lastLimited), "DataDistribution", di.id(), &normalDataDistributorErrors() );
+        state Future<Void> distributor = reportErrorsExcept( dataDistribution(self), "DataDistribution", di.id(), &normalDataDistributorErrors() );
 
         wait( distributor || collection );
     }

View File

@@ -294,50 +294,38 @@ ACTOR Future<Void> monitorServerListChange(
         Reference<AsyncVar<ServerDBInfo>> dbInfo,
         PromiseStream< std::pair<UID, Optional<StorageServerInterface>> > serverChanges) {
     state Database db = openDBOnServer(dbInfo, TaskRateKeeper, true, true);
-    state Future<Void> checkSignal = delay(SERVER_KNOBS->SERVER_LIST_DELAY);
-    state Future<vector<std::pair<StorageServerInterface, ProcessClass>>> serverListAndProcessClasses = Never();
     state std::map<UID, StorageServerInterface> oldServers;
     state Transaction tr(db);
 
     loop {
         try {
-            choose {
-                when ( wait( checkSignal ) ) {
-                    checkSignal = Never();
-                    serverListAndProcessClasses = getServerListAndProcessClasses(&tr);
-                }
-                when ( vector<std::pair<StorageServerInterface, ProcessClass>> results = wait( serverListAndProcessClasses ) ) {
-                    serverListAndProcessClasses = Never();
+            vector<std::pair<StorageServerInterface, ProcessClass>> results = wait(getServerListAndProcessClasses(&tr));
 
-                    std::map<UID, StorageServerInterface> newServers;
-                    for (int i = 0; i < results.size(); i++) {
-                        const StorageServerInterface& ssi = results[i].first;
-                        const UID serverId = ssi.id();
-                        newServers[serverId] = ssi;
+            std::map<UID, StorageServerInterface> newServers;
+            for (int i = 0; i < results.size(); i++) {
+                const StorageServerInterface& ssi = results[i].first;
+                const UID serverId = ssi.id();
+                newServers[serverId] = ssi;
 
-                        if (oldServers.count(serverId)) {
-                            if (ssi.getValue.getEndpoint() != oldServers[serverId].getValue.getEndpoint()) {
-                                serverChanges.send( std::make_pair(serverId, Optional<StorageServerInterface>(ssi)) );
-                            }
-                            oldServers.erase(serverId);
-                        } else {
-                            serverChanges.send( std::make_pair(serverId, Optional<StorageServerInterface>(ssi)) );
-                        }
-                    }
+                if (oldServers.count(serverId)) {
+                    if (ssi.getValue.getEndpoint() != oldServers[serverId].getValue.getEndpoint()) {
+                        serverChanges.send( std::make_pair(serverId, Optional<StorageServerInterface>(ssi)) );
+                    }
+                    oldServers.erase(serverId);
+                } else {
+                    serverChanges.send( std::make_pair(serverId, Optional<StorageServerInterface>(ssi)) );
+                }
+            }
 
-                    for (const auto& it : oldServers) {
-                        serverChanges.send( std::make_pair(it.first, Optional<StorageServerInterface>()) );
-                    }
+            for (const auto& it : oldServers) {
+                serverChanges.send( std::make_pair(it.first, Optional<StorageServerInterface>()) );
+            }
 
-                    oldServers.swap(newServers);
-                    tr = Transaction(db);
-                    checkSignal = delay(SERVER_KNOBS->SERVER_LIST_DELAY);
-                }
-            }
+            oldServers.swap(newServers);
+            tr = Transaction(db);
+            wait(delay(SERVER_KNOBS->SERVER_LIST_DELAY));
         } catch(Error& e) {
             wait( tr.onError(e) );
-            serverListAndProcessClasses = Never();
-            checkSignal = Void();
         }
     }
 }

View File

@@ -1,5 +1,5 @@
 /*
- * DataDistributorInterface.h
+ * RatekeeperInterface.h
  *
  * This source file is part of the FoundationDB open source project
 *

View File

@@ -832,6 +832,7 @@ ACTOR Future<Void> workerServer( Reference<ClusterConnectionFile> connFile, Refe
                 TEST(true); // Recruited while already a data distributor.
             } else {
                 startRole( Role::DATA_DISTRIBUTOR, recruited.id(), interf.id() );
+                DUMPTOKEN( recruited.waitFailure );
 
                 Future<Void> dataDistributorProcess = dataDistributor( recruited, dbInfo );
                 errorForwarders.add( forwardError( errors, Role::DATA_DISTRIBUTOR, recruited.id(), setWhenDoneOrError( dataDistributorProcess, ddInterf, Optional<DataDistributorInterface>() ) ) );
@@ -849,6 +850,9 @@ ACTOR Future<Void> workerServer( Reference<ClusterConnectionFile> connFile, Refe
                 TEST(true); // Recruited while already a ratekeeper.
             } else {
                 startRole(Role::RATE_KEEPER, recruited.id(), interf.id());
+                DUMPTOKEN( recruited.waitFailure );
+                DUMPTOKEN( recruited.getRateInfo );
+
                 Future<Void> ratekeeper = rateKeeper( recruited, dbInfo );
                 errorForwarders.add( forwardError( errors, Role::RATE_KEEPER, recruited.id(), setWhenDoneOrError( ratekeeper, rkInterf, Optional<RatekeeperInterface>() ) ) );
                 rkInterf->set(Optional<RatekeeperInterface>(recruited));