Minor fix on ratekeeper work registration.

This commit is contained in:
Jingyu Zhou 2019-02-15 17:29:52 -08:00
parent 3c86643822
commit e6ac3f7fe8
4 changed files with 47 additions and 20 deletions

View File

@ -3,7 +3,7 @@
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
* Copyright 2013-2019 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -1769,6 +1769,11 @@ void registerWorker( RegisterWorkerRequest req, ClusterControllerData *self ) {
TraceEvent("ClusterController_RegisterDataDistributor", self->id).detail("DDID", di.id());
self->db.setDistributor(di);
}
if ( req.ratekeeperInterf.present() && !self->db.serverInfo->get().ratekeeper.present() ) {
const RatekeeperInterface& rki = req.ratekeeperInterf.get();
TraceEvent("ClusterController_RegisterRatekeeper", self->id).detail("RKID", rki.id());
self->db.setRatekeeper(rki);
}
if( info == self->id_worker.end() ) {
self->id_worker[w.locality.processId()] = WorkerInfo( workerAvailabilityWatch( w, newProcessClass, self ), req.reply, req.generation, w, req.initialClass, newProcessClass, newPriorityInfo );
checkOutstandingRequests( self );
@ -2398,7 +2403,7 @@ ACTOR Future<RatekeeperInterface> startRatekeeper(ClusterControllerData *self) {
req.reqId = g_random->randomUniqueID();
TraceEvent("ClusterController_RecruitRatekeeper", req.reqId).detail("Addr", rkWorker.worker.first.address());
ErrorOr<RatekeeperInterface> interf = wait( rkWorker.worker.first.ratekeeper.getReplyUnlessFailedFor(req, SERVER_KNOBS->WAIT_FOR_DISTRIBUTOR_JOIN_DELAY, 0) );
ErrorOr<RatekeeperInterface> interf = wait( rkWorker.worker.first.ratekeeper.getReplyUnlessFailedFor(req, SERVER_KNOBS->WAIT_FOR_RATEKEEPER_JOIN_DELAY, 0) );
if (interf.present()) {
TraceEvent("ClusterController_RatekeeperRecruited", req.reqId).detail("Addr", rkWorker.worker.first.address());
return interf.get();
@ -2414,7 +2419,7 @@ ACTOR Future<RatekeeperInterface> startRatekeeper(ClusterControllerData *self) {
}
}
ACTOR Future<Void> waitRKRejoinOrStartRK(ClusterControllerData *self) {
ACTOR Future<Void> monitorRatekeeper(ClusterControllerData *self) {
state Future<Void> initialDelay = delay(SERVER_KNOBS->WAIT_FOR_RATEKEEPER_JOIN_DELAY);
// wait for a while to see if an existing ratekeeper will join.
@ -2432,8 +2437,8 @@ ACTOR Future<Void> waitRKRejoinOrStartRK(ClusterControllerData *self) {
loop {
if ( self->db.serverInfo->get().ratekeeper.present() ) {
wait( waitFailureClient( self->db.serverInfo->get().ratekeeper.get().waitFailure, SERVER_KNOBS->RATEKEEPER_FAILURE_TIME ) );
TraceEvent("ClusterController", self->id)
.detail("RatekeeperDied", self->db.serverInfo->get().ratekeeper.get().id());
TraceEvent("ClusterController_RateKeeperDied", self->id)
.detail("RKID", self->db.serverInfo->get().ratekeeper.get().id());
self->db.clearInterf(ProcessClass::RateKeeperClass);
} else {
RatekeeperInterface rkInterf = wait( startRatekeeper(self) );
@ -2460,7 +2465,7 @@ ACTOR Future<Void> clusterControllerCore( ClusterControllerFullInterface interf,
self.addActor.send( updateDatacenterVersionDifference(&self) );
self.addActor.send( handleForcedRecoveries(&self, interf) );
self.addActor.send( waitDDRejoinOrStartDD(&self) );
self.addActor.send( waitRKRejoinOrStartRK(&self) );
self.addActor.send( monitorRatekeeper(&self) );
//printf("%s: I am the cluster controller\n", g_network->getLocalAddress().toString().c_str());
loop choose {

View File

@ -3454,6 +3454,39 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributorData> self,
state DatabaseConfiguration configuration = self->configuration->get();
cx->locationCacheSize = SERVER_KNOBS->DD_LOCATION_CACHE_SIZE;
<<<<<<< HEAD
=======
state Transaction tr(cx);
loop {
try {
tr.setOption( FDBTransactionOptions::ACCESS_SYSTEM_KEYS );
tr.setOption( FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE );
Standalone<RangeResultRef> replicaKeys = wait(tr.getRange(datacenterReplicasKeys, CLIENT_KNOBS->TOO_MANY));
for(auto& kv : replicaKeys) {
auto dcId = decodeDatacenterReplicasKey(kv.key);
auto replicas = decodeDatacenterReplicasValue(kv.value);
if ((self->primaryDcId.size() && self->primaryDcId[0] == dcId) ||
(self->remoteDcIds.size() && self->remoteDcIds[0] == dcId && configuration.usableRegions > 1)) {
if(replicas > configuration.storageTeamSize) {
tr.set(kv.key, datacenterReplicasValue(configuration.storageTeamSize));
}
} else {
tr.clear(kv.key);
}
}
wait(tr.commit());
break;
}
catch(Error &e) {
wait(tr.onError(e));
}
}
>>>>>>> Minor fix on ratekeeper work registration.
//cx->setOption( FDBDatabaseOptions::LOCATION_CACHE_SIZE, StringRef((uint8_t*) &SERVER_KNOBS->DD_LOCATION_CACHE_SIZE, 8) );
//ASSERT( cx->locationCacheSize == SERVER_KNOBS->DD_LOCATION_CACHE_SIZE );

View File

@ -3,7 +3,7 @@
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
* Copyright 2013-2019 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -655,7 +655,7 @@ ACTOR Future<Void> rateKeeper(RatekeeperInterface rkInterf, Reference<AsyncVar<S
state Future<Void> collection = actorCollection( self.addActor.getFuture() );
// TODOs:
double lastLimited;
double lastLimited = 0;
self.lastLimited = &lastLimited;
TraceEvent("Ratekeeper_Starting", rkInterf.id());

View File

@ -29,7 +29,6 @@
struct RatekeeperInterface {
RequestStream<ReplyPromise<Void>> waitFailure;
RequestStream<struct GetRateInfoRequest> getRateInfo;
RequestStream<struct StorageChangeRequest> changeStorage;
struct LocalityData locality;
RatekeeperInterface() {}
@ -47,7 +46,7 @@ struct RatekeeperInterface {
template <class Archive>
void serialize(Archive& ar) {
serializer(ar, waitFailure, getRateInfo, changeStorage, locality);
serializer(ar, waitFailure, getRateInfo, locality);
}
};
@ -80,14 +79,4 @@ struct GetRateInfoReply {
}
};
struct StorageChangeRequest {
UID ssID;
Optional<StorageServerInterface> ssInterf;
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, ssID, ssInterf);
}
};
#endif //FDBSERVER_RATEKEEPERINTERFACE_H