Use trigger to start resolutionBalancing
Trigger when there are more than one resolvers. This also avoids the problem of receiving multiple UpdateRecoveryDataRequests.
This commit is contained in:
parent
6e8d16538d
commit
437e7d27c6
|
@ -20,31 +20,15 @@
|
|||
|
||||
#include <iterator>
|
||||
|
||||
#include "fdbclient/NativeAPI.actor.h"
|
||||
#include "fdbclient/Notified.h"
|
||||
#include "fdbclient/SystemData.h"
|
||||
#include "fdbrpc/FailureMonitor.h"
|
||||
#include "fdbrpc/PerfMetric.h"
|
||||
#include "fdbrpc/sim_validation.h"
|
||||
#include "fdbrpc/simulator.h"
|
||||
#include "fdbserver/ApplyMetadataMutation.h"
|
||||
#include "fdbserver/BackupProgress.actor.h"
|
||||
#include "fdbserver/ConflictSet.h"
|
||||
#include "fdbserver/CoordinatedState.h"
|
||||
#include "fdbserver/CoordinationInterface.h" // copy constructors for ServerCoordinators class
|
||||
#include "fdbserver/DBCoreState.h"
|
||||
#include "fdbserver/DataDistribution.actor.h"
|
||||
#include "fdbserver/IKeyValueStore.h"
|
||||
#include "fdbserver/Knobs.h"
|
||||
#include "fdbserver/LogSystemDiskQueueAdapter.h"
|
||||
#include "fdbserver/MasterInterface.h"
|
||||
#include "fdbserver/ProxyCommitData.actor.h"
|
||||
#include "fdbserver/RecoveryState.h"
|
||||
#include "fdbserver/ServerDBInfo.h"
|
||||
#include "fdbserver/WaitFailure.h"
|
||||
#include "fdbserver/WorkerInterface.actor.h"
|
||||
#include "flow/ActorCollection.h"
|
||||
#include "flow/Trace.h"
|
||||
#include "flow/genericactors.actor.h"
|
||||
|
||||
#include "flow/actorcompiler.h" // This must be the last #include.
|
||||
|
||||
|
@ -68,13 +52,12 @@ struct MasterData : NonCopyable, ReferenceCounted<MasterData> {
|
|||
std::map<UID, CommitProxyVersionReplies> lastCommitProxyVersionReplies;
|
||||
std::vector<ResolverInterface> resolvers;
|
||||
|
||||
PromiseStream<Future<Void>> addActor;
|
||||
|
||||
MasterInterface myInterface;
|
||||
|
||||
AsyncVar<Standalone<VectorRef<ResolverMoveRef>>> resolverChanges;
|
||||
Version resolverChangesVersion;
|
||||
std::set<UID> resolverNeedingChanges;
|
||||
AsyncTrigger triggerResolution;
|
||||
|
||||
bool forceRecovery;
|
||||
|
||||
|
@ -90,13 +73,12 @@ struct MasterData : NonCopyable, ReferenceCounted<MasterData> {
|
|||
ServerCoordinators const& coordinators,
|
||||
ClusterControllerFullInterface const& clusterController,
|
||||
Standalone<StringRef> const& dbId,
|
||||
PromiseStream<Future<Void>> const& addActor,
|
||||
bool forceRecovery)
|
||||
|
||||
: dbgid(myInterface.id()), lastEpochEnd(invalidVersion), recoveryTransactionVersion(invalidVersion),
|
||||
liveCommittedVersion(invalidVersion), databaseLocked(false), minKnownCommittedVersion(invalidVersion),
|
||||
coordinators(coordinators), version(invalidVersion), lastVersionTime(0), addActor(addActor),
|
||||
myInterface(myInterface), forceRecovery(forceRecovery), cc("Master", dbgid.toString()),
|
||||
coordinators(coordinators), version(invalidVersion), lastVersionTime(0), myInterface(myInterface),
|
||||
forceRecovery(forceRecovery), cc("Master", dbgid.toString()),
|
||||
getCommitVersionRequests("GetCommitVersionRequests", cc),
|
||||
getLiveCommittedVersionRequests("GetLiveCommittedVersionRequests", cc),
|
||||
reportLiveCommittedVersionRequests("ReportLiveCommittedVersionRequests", cc) {
|
||||
|
@ -177,6 +159,8 @@ static std::pair<KeyRangeRef, bool> findRange(CoalescedKeyRangeMap<int>& key_res
|
|||
|
||||
// Balance key ranges among resolvers so that their load are evenly distributed.
|
||||
ACTOR Future<Void> resolutionBalancing(Reference<MasterData> self) {
|
||||
wait(self->triggerResolution.onTrigger());
|
||||
|
||||
state CoalescedKeyRangeMap<int> key_resolver;
|
||||
key_resolver.insert(allKeys, 0);
|
||||
loop {
|
||||
|
@ -407,7 +391,7 @@ ACTOR Future<Void> updateRecoveryData(Reference<MasterData> self) {
|
|||
|
||||
self->resolvers = req.resolvers;
|
||||
if (req.resolvers.size() > 1)
|
||||
self->addActor.send(resolutionBalancing(self));
|
||||
self->triggerResolution.trigger();
|
||||
|
||||
req.reply.send(Void());
|
||||
}
|
||||
|
@ -454,14 +438,15 @@ ACTOR Future<Void> masterServer(MasterInterface mi,
|
|||
|
||||
state Future<Void> onDBChange = Void();
|
||||
state PromiseStream<Future<Void>> addActor;
|
||||
state Reference<MasterData> self(new MasterData(
|
||||
db, mi, coordinators, db->get().clusterInterface, LiteralStringRef(""), addActor, forceRecovery));
|
||||
state Reference<MasterData> self(
|
||||
new MasterData(db, mi, coordinators, db->get().clusterInterface, LiteralStringRef(""), forceRecovery));
|
||||
state Future<Void> collection = actorCollection(addActor.getFuture());
|
||||
|
||||
addActor.send(traceRole(Role::MASTER, mi.id()));
|
||||
addActor.send(provideVersions(self));
|
||||
addActor.send(serveLiveCommittedVersion(self));
|
||||
addActor.send(updateRecoveryData(self));
|
||||
addActor.send(resolutionBalancing(self));
|
||||
|
||||
TEST(!lifetime.isStillValid(db->get().masterLifetime, mi.id() == db->get().master.id())); // Master born doomed
|
||||
TraceEvent("MasterLifetime", self->dbgid).detail("LifetimeToken", lifetime.toString());
|
||||
|
|
Loading…
Reference in New Issue