diff --git a/fdbserver/ClusterController.actor.cpp b/fdbserver/ClusterController.actor.cpp index d95b9c1385..be5bfc53e4 100644 --- a/fdbserver/ClusterController.actor.cpp +++ b/fdbserver/ClusterController.actor.cpp @@ -3,7 +3,7 @@ * * This source file is part of the FoundationDB open source project * - * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors + * Copyright 2013-2019 Apple Inc. and the FoundationDB project authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -1769,6 +1769,11 @@ void registerWorker( RegisterWorkerRequest req, ClusterControllerData *self ) { TraceEvent("ClusterController_RegisterDataDistributor", self->id).detail("DDID", di.id()); self->db.setDistributor(di); } + if ( req.ratekeeperInterf.present() && !self->db.serverInfo->get().ratekeeper.present() ) { + const RatekeeperInterface& rki = req.ratekeeperInterf.get(); + TraceEvent("ClusterController_RegisterRatekeeper", self->id).detail("RKID", rki.id()); + self->db.setRatekeeper(rki); + } if( info == self->id_worker.end() ) { self->id_worker[w.locality.processId()] = WorkerInfo( workerAvailabilityWatch( w, newProcessClass, self ), req.reply, req.generation, w, req.initialClass, newProcessClass, newPriorityInfo ); checkOutstandingRequests( self ); @@ -2398,7 +2403,7 @@ ACTOR Future startRatekeeper(ClusterControllerData *self) { req.reqId = g_random->randomUniqueID(); TraceEvent("ClusterController_RecruitRatekeeper", req.reqId).detail("Addr", rkWorker.worker.first.address()); - ErrorOr interf = wait( rkWorker.worker.first.ratekeeper.getReplyUnlessFailedFor(req, SERVER_KNOBS->WAIT_FOR_DISTRIBUTOR_JOIN_DELAY, 0) ); + ErrorOr interf = wait( rkWorker.worker.first.ratekeeper.getReplyUnlessFailedFor(req, SERVER_KNOBS->WAIT_FOR_RATEKEEPER_JOIN_DELAY, 0) ); if (interf.present()) { TraceEvent("ClusterController_RatekeeperRecruited", req.reqId).detail("Addr", rkWorker.worker.first.address()); return interf.get(); @@ -2414,7 +2419,7 @@ ACTOR Future startRatekeeper(ClusterControllerData *self) { } } -ACTOR Future waitRKRejoinOrStartRK(ClusterControllerData *self) { +ACTOR Future monitorRatekeeper(ClusterControllerData *self) { state Future initialDelay = delay(SERVER_KNOBS->WAIT_FOR_RATEKEEPER_JOIN_DELAY); // wait for a while to see if an existing ratekeeper will join. @@ -2432,8 +2437,8 @@ ACTOR Future waitRKRejoinOrStartRK(ClusterControllerData *self) { loop { if ( self->db.serverInfo->get().ratekeeper.present() ) { wait( waitFailureClient( self->db.serverInfo->get().ratekeeper.get().waitFailure, SERVER_KNOBS->RATEKEEPER_FAILURE_TIME ) ); - TraceEvent("ClusterController", self->id) - .detail("RatekeeperDied", self->db.serverInfo->get().ratekeeper.get().id()); + TraceEvent("ClusterController_RateKeeperDied", self->id) + .detail("RKID", self->db.serverInfo->get().ratekeeper.get().id()); self->db.clearInterf(ProcessClass::RateKeeperClass); } else { RatekeeperInterface rkInterf = wait( startRatekeeper(self) ); @@ -2460,7 +2465,7 @@ ACTOR Future clusterControllerCore( ClusterControllerFullInterface interf, self.addActor.send( updateDatacenterVersionDifference(&self) ); self.addActor.send( handleForcedRecoveries(&self, interf) ); self.addActor.send( waitDDRejoinOrStartDD(&self) ); - self.addActor.send( waitRKRejoinOrStartRK(&self) ); + self.addActor.send( monitorRatekeeper(&self) ); //printf("%s: I am the cluster controller\n", g_network->getLocalAddress().toString().c_str()); loop choose { diff --git a/fdbserver/DataDistribution.actor.cpp b/fdbserver/DataDistribution.actor.cpp index fe52e5e42a..e1b80d2314 100644 --- a/fdbserver/DataDistribution.actor.cpp +++ b/fdbserver/DataDistribution.actor.cpp @@ -3454,6 +3454,39 @@ ACTOR Future dataDistribution(Reference self, state DatabaseConfiguration configuration = self->configuration->get(); cx->locationCacheSize = SERVER_KNOBS->DD_LOCATION_CACHE_SIZE; +<<<<<<< HEAD +======= + state Transaction tr(cx); + loop { + try { + tr.setOption( FDBTransactionOptions::ACCESS_SYSTEM_KEYS ); + tr.setOption( FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE ); + + Standalone replicaKeys = wait(tr.getRange(datacenterReplicasKeys, CLIENT_KNOBS->TOO_MANY)); + + for(auto& kv : replicaKeys) { + auto dcId = decodeDatacenterReplicasKey(kv.key); + auto replicas = decodeDatacenterReplicasValue(kv.value); + if ((self->primaryDcId.size() && self->primaryDcId[0] == dcId) || + (self->remoteDcIds.size() && self->remoteDcIds[0] == dcId && configuration.usableRegions > 1)) { + if(replicas > configuration.storageTeamSize) { + tr.set(kv.key, datacenterReplicasValue(configuration.storageTeamSize)); + } + } else { + tr.clear(kv.key); + } + } + + wait(tr.commit()); + break; + } + catch(Error &e) { + wait(tr.onError(e)); + } + } + + +>>>>>>> Minor fix on ratekeeper work registration. //cx->setOption( FDBDatabaseOptions::LOCATION_CACHE_SIZE, StringRef((uint8_t*) &SERVER_KNOBS->DD_LOCATION_CACHE_SIZE, 8) ); //ASSERT( cx->locationCacheSize == SERVER_KNOBS->DD_LOCATION_CACHE_SIZE ); diff --git a/fdbserver/Ratekeeper.actor.cpp b/fdbserver/Ratekeeper.actor.cpp index 83a8778411..f872a55566 100644 --- a/fdbserver/Ratekeeper.actor.cpp +++ b/fdbserver/Ratekeeper.actor.cpp @@ -3,7 +3,7 @@ * * This source file is part of the FoundationDB open source project * - * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors + * Copyright 2013-2019 Apple Inc. and the FoundationDB project authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -655,7 +655,7 @@ ACTOR Future rateKeeper(RatekeeperInterface rkInterf, Reference collection = actorCollection( self.addActor.getFuture() ); // TODOs: - double lastLimited; + double lastLimited = 0; self.lastLimited = &lastLimited; TraceEvent("Ratekeeper_Starting", rkInterf.id()); diff --git a/fdbserver/RatekeeperInterface.h b/fdbserver/RatekeeperInterface.h index cb5049a595..539aeb8d7f 100644 --- a/fdbserver/RatekeeperInterface.h +++ b/fdbserver/RatekeeperInterface.h @@ -29,7 +29,6 @@ struct RatekeeperInterface { RequestStream> waitFailure; RequestStream getRateInfo; - RequestStream changeStorage; struct LocalityData locality; RatekeeperInterface() {} @@ -47,7 +46,7 @@ struct RatekeeperInterface { template void serialize(Archive& ar) { - serializer(ar, waitFailure, getRateInfo, changeStorage, locality); + serializer(ar, waitFailure, getRateInfo, locality); } }; @@ -80,14 +79,4 @@ struct GetRateInfoReply { } }; -struct StorageChangeRequest { - UID ssID; - Optional ssInterf; - - template - void serialize(Ar& ar) { - serializer(ar, ssID, ssInterf); - } -}; - #endif //FDBSERVER_RATEKEEPERINTERFACE_H