Fix according to Evan's comments

Use getRateInfo's endpoint as the ID for the DataDistributorInterface.
For now, added a "rejoined" flag for ClusterControllerData and Proxy.

TODO: move DataDistributorInterface into ServerDBInfo.
This commit is contained in:
Jingyu Zhou 2019-01-17 11:32:33 -08:00 committed by Jingyu Zhou
parent c35d1bf2ef
commit 0490160714
7 changed files with 66 additions and 73 deletions

View File

@ -1019,6 +1019,7 @@ public:
Version datacenterVersionDifference;
bool versionDifferenceUpdated;
AsyncVar<DataDistributorInterface> dataDistributorInterface;
bool rejoined = false;
ClusterControllerData( ClusterControllerFullInterface const& ccInterface, LocalityData const& locality )
: id(ccInterface.id()), ac(false), outstandingRequestChecker(Void()), gotProcessClasses(false), gotFullyRecoveredConfig(false), startTime(now()), datacenterVersionDifference(0), versionDifferenceUpdated(false)
@ -2220,17 +2221,15 @@ ACTOR Future<Void> updateDatacenterVersionDifference( ClusterControllerData *sel
ACTOR Future<Void> clusterGetDistributorInterface( ClusterControllerData *self, UID reqId, ReplyPromise<GetDistributorInterfaceReply> reqReply ) {
TraceEvent("CCGetDistributorInterfaceRequest", reqId);
state Future<Void> distributorOnChange = Never();
while ( !self->dataDistributorInterface.get().isValid() ) {
while ( !self->rejoined ) {
wait( self->dataDistributorInterface.onChange() );
TraceEvent("CCGetDistributorInterfaceID", self->dataDistributorInterface.get().id)
.detail("Endpoint", self->dataDistributorInterface.get().waitFailure.getEndpoint().token);
TraceEvent("CCGetDistributorInterfaceID", self->dataDistributorInterface.get().id())
.detail("Endpoint", self->dataDistributorInterface.get().waitFailure.getEndpoint().token);
}
GetDistributorInterfaceReply reply(self->dataDistributorInterface.get());
TraceEvent("CCGetDistributorInterfaceReply", reqId)
.detail("DataDistributorId", reply.distributorInterface.id)
.detail("DataDistributorId", reply.distributorInterface.id())
.detail("Endpoint", reply.distributorInterface.waitFailure.getEndpoint().token);
reqReply.send( reply );
return Void();
@ -2270,16 +2269,17 @@ ACTOR Future<Void> waitDDRejoinOrStartDD( ClusterControllerData *self, ClusterCo
// wait for a while to see if existing data distributor will join.
loop choose {
when ( DataDistributorRejoinRequest req = waitNext( clusterInterface->dataDistributorRejoin.getFuture() ) ) {
TraceEvent("ClusterController", self->id).detail("DataDistributorRejoinID", req.dataDistributor.id);
TraceEvent("ClusterController", self->id).detail("DataDistributorRejoinID", req.dataDistributor.id());
self->dataDistributorInterface.set( req.dataDistributor );
self->rejoined = true;
distributorFailed = waitFailureClient( self->dataDistributorInterface.get().waitFailure, SERVER_KNOBS->WORKER_FAILURE_TIME );
req.reply.send(true);
req.reply.send( Void() );
break;
}
when ( wait( delay(SERVER_KNOBS->WAIT_FOR_GOOD_RECRUITMENT_DELAY) ) ) { break; }
}
if ( !self->dataDistributorInterface.get().isValid() ) { // No rejoin happened
if ( !self->rejoined ) {
newDistributor = startDataDistributor( self );
}
@ -2287,36 +2287,35 @@ ACTOR Future<Void> waitDDRejoinOrStartDD( ClusterControllerData *self, ClusterCo
loop choose {
when ( DataDistributorInterface distributorInterf = wait( newDistributor ) ) {
TraceEvent ev("ClusterController", self->id);
const UID myDdId = self->dataDistributorInterface.get().id;
if ( myDdId == UID() ) {
ev.detail("NewDataDistributorID", distributorInterf.id);
self->dataDistributorInterface.set( distributorInterf );
distributorFailed = waitFailureClient( self->dataDistributorInterface.get().waitFailure, SERVER_KNOBS->WORKER_FAILURE_TIME );
} else {
ev.detail("MyDataDistributorID", myDdId).detail("DiscardDataDistributorID", distributorInterf.id);
}
const UID myDdId = self->dataDistributorInterface.get().id();
ev.detail("NewDataDistributorID", distributorInterf.id());
self->dataDistributorInterface.set( distributorInterf );
self->rejoined = true;
distributorFailed = waitFailureClient( self->dataDistributorInterface.get().waitFailure, SERVER_KNOBS->WORKER_FAILURE_TIME );
newDistributor = Never();
}
when ( wait( distributorFailed ) ) {
distributorFailed = Never();
TraceEvent("ClusterController", self->id).detail("DataDistributorFailed", self->dataDistributorInterface.get().id)
.detail("Endpoint", self->dataDistributorInterface.get().waitFailure.getEndpoint().token);
self->dataDistributorInterface.set( DataDistributorInterface() ); // clear the ID
TraceEvent("ClusterController", self->id)
.detail("DataDistributorFailed", self->dataDistributorInterface.get().id())
.detail("Endpoint", self->dataDistributorInterface.get().waitFailure.getEndpoint().token);
self->rejoined = false;
newDistributor = startDataDistributor( self );
}
when ( DataDistributorRejoinRequest req = waitNext( clusterInterface->dataDistributorRejoin.getFuture() ) ) {
if ( !self->dataDistributorInterface.get().isValid() ) {
if ( !self->rejoined ) {
self->dataDistributorInterface.set( req.dataDistributor );
distributorFailed = waitFailureClient( self->dataDistributorInterface.get().waitFailure, SERVER_KNOBS->WORKER_FAILURE_TIME );
TraceEvent("ClusterController", self->id).detail("DataDistributorRejoined", req.dataDistributor.id);
self->rejoined = true;
TraceEvent("ClusterController", self->id).detail("DataDistributorRejoined", req.dataDistributor.id());
} else {
const UID myDdId = self->dataDistributorInterface.get().id;
const bool success = myDdId == req.dataDistributor.id;
req.reply.send(success);
const UID myDdId = self->dataDistributorInterface.get().id();
const bool success = myDdId == req.dataDistributor.id();
req.reply.send( Void() );
TraceEvent("ClusterController", self->id)
.detail("DataDistributorRejoin", success ? "OK" : "Failed")
.detail("OldDataDistributorID", myDdId)
.detail("ReqID", req.dataDistributor.id);
.detail("DataDistributorRejoin", success ? "OK" : "Failed")
.detail("OldDataDistributorID", myDdId)
.detail("ReqID", req.dataDistributor.id());
}
}
}

View File

@ -260,7 +260,7 @@ struct GetDistributorInterfaceRequest {
struct DataDistributorRejoinRequest {
DataDistributorInterface dataDistributor;
ReplyPromise<bool> reply;
ReplyPromise<Void> reply;
DataDistributorRejoinRequest() { }
explicit DataDistributorRejoinRequest(DataDistributorInterface di) : dataDistributor(di) {}

View File

@ -3294,6 +3294,7 @@ ACTOR Future<Void> configurationMonitor( Reference<DataDistributorData> self ) {
loop {
try {
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr.setOption( FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE );
Standalone<RangeResultRef> results = wait( tr.getRange( configKeys, CLIENT_KNOBS->TOO_MANY ) );
ASSERT( !results.more && results.size() < CLIENT_KNOBS->TOO_MANY );
@ -3343,22 +3344,22 @@ ACTOR Future<Void> dataDistributor(DataDistributorInterface di, Reference<AsyncV
state UID lastClusterControllerID(0,0);
state PromiseStream<Future<Void>> addActor;
state Reference<AsyncVar<DatabaseConfiguration>> configuration( new AsyncVar<DatabaseConfiguration>(DatabaseConfiguration()) );
state Reference<DataDistributorData> self( new DataDistributorData(db, configuration, di.id, addActor) );
state Reference<DataDistributorData> self( new DataDistributorData(db, configuration, di.id(), addActor) );
state Future<Void> collection = actorCollection( self->addActor.getFuture() );
state Future<Void> trigger = self->configurationTrigger.onTrigger();
state Version recoveryTransactionVersion = invalidVersion;
TraceEvent("NewDataDistributorID", di.id);
TraceEvent("NewDataDistributorID", di.id());
self->addActor.send( waitFailureServer(di.waitFailure.getFuture()) );
self->addActor.send( configurationMonitor( self ) );
loop choose {
// Get configuration from the master. Can't use configurationMonitor for it
// because the transaction read needs ratekeeper, which is not started yet.
when ( GetRecoveryInfoReply infoReply = wait( brokenPromiseToNever(self->dbInfo->get().master.getRecoveryInfo.getReply(GetRecoveryInfoRequest(di.id)) )) ) {
when ( GetRecoveryInfoReply infoReply = wait( brokenPromiseToNever(self->dbInfo->get().master.getRecoveryInfo.getReply(GetRecoveryInfoRequest(di.id())) )) ) {
configuration->set( infoReply.configuration );
recoveryTransactionVersion = infoReply.recoveryTransactionVersion;
TraceEvent("DataDistributor", di.id)
TraceEvent("DataDistributor", di.id())
.detail("RecoveryVersion", infoReply.recoveryTransactionVersion)
.detail("Configuration", configuration->get().toString());
// TODO: is remoteRecovered.getFuture() as Void() in dataDistribution() correct?
@ -3368,7 +3369,7 @@ ACTOR Future<Void> dataDistributor(DataDistributorInterface di, Reference<AsyncV
}
const std::vector<RegionInfo>& regions = self->configuration->get().regions;
TraceEvent ev("DataDistributor", di.id);
TraceEvent ev("DataDistributor", di.id());
if ( regions.size() > 0 ) {
self->primaryDcId.push_back( regions[0].dcId );
ev.detail("PrimaryDcID", regions[0].dcId.toHexString());
@ -3381,16 +3382,16 @@ ACTOR Future<Void> dataDistributor(DataDistributorInterface di, Reference<AsyncV
try {
PromiseStream< std::pair<UID, Optional<StorageServerInterface>> > ddStorageServerChanges;
state double lastLimited = 0;
TraceEvent("DataDistributor", di.id).detail("StartDD", "RK");
self->addActor.send( reportErrorsExcept( dataDistribution( self->dbInfo, di.id, self->configuration->get(), ddStorageServerChanges, recoveryTransactionVersion, self->primaryDcId, self->remoteDcIds, &lastLimited, Void() ), "DataDistribution", di.id, &normalDataDistributorErrors() ) );
self->addActor.send( reportErrorsExcept( rateKeeper( self->dbInfo, ddStorageServerChanges, di.getRateInfo.getFuture(), self->configuration->get(), &lastLimited ), "Ratekeeper", di.id, &normalRateKeeperErrors() ) );
TraceEvent("DataDistributor", di.id()).detail("StartDD", "RK");
self->addActor.send( reportErrorsExcept( dataDistribution( self->dbInfo, di.id(), self->configuration->get(), ddStorageServerChanges, recoveryTransactionVersion, self->primaryDcId, self->remoteDcIds, &lastLimited, Void() ), "DataDistribution", di.id(), &normalDataDistributorErrors() ) );
self->addActor.send( reportErrorsExcept( rateKeeper( self->dbInfo, ddStorageServerChanges, di.getRateInfo.getFuture(), self->configuration->get(), &lastLimited ), "Ratekeeper", di.id(), &normalRateKeeperErrors() ) );
state Future<bool> reply;
state Future<Void> reply;
loop {
if ( self->dbInfo->get().clusterInterface.id() != lastClusterControllerID ) {
// Rejoin the new cluster controller
DataDistributorRejoinRequest req(di);
TraceEvent("DataDistributorRejoining", di.id)
TraceEvent("DataDistributorRejoining", di.id())
.detail("OldClusterControllerID", lastClusterControllerID)
.detail("ClusterControllerID", self->dbInfo->get().clusterInterface.id());
reply = self->dbInfo->get().clusterInterface.dataDistributorRejoin.getReply(req);
@ -3398,15 +3399,10 @@ ACTOR Future<Void> dataDistributor(DataDistributorInterface di, Reference<AsyncV
reply = Never();
}
choose {
when (bool success = wait(brokenPromiseToNever(reply))) {
if (success) {
lastClusterControllerID = self->dbInfo->get().clusterInterface.id();
TraceEvent("DataDistributorRejoined", di.id)
.detail("ClusterControllerID", lastClusterControllerID);
} else {
TraceEvent("DataDistributorRejoinFailed", di.id); // Probably distributor exists.
break;
}
when (wait(brokenPromiseToNever(reply))) {
lastClusterControllerID = self->dbInfo->get().clusterInterface.id();
TraceEvent("DataDistributorRejoined", di.id())
.detail("ClusterControllerID", lastClusterControllerID);
}
when (wait(self->dbInfo->onChange())) {}
when (wait(trigger)) { break; } // TODO: maybe break here? Since configuration changed.
@ -3419,15 +3415,12 @@ ACTOR Future<Void> dataDistributor(DataDistributorInterface di, Reference<AsyncV
}
catch ( Error &err ) {
if ( normalDataDistributorErrors().count(err.code()) == 0 ) {
TraceEvent("DataDistributorError", di.id).error(err);
TraceEvent("DataDistributorError", di.id()).error(err);
throw err;
}
TraceEvent("DataDistributorTerminated", di.id).error(err);
TraceEvent("DataDistributorTerminated", di.id()).error(err);
}
while ( !self->addActor.isEmpty() ) {
self->addActor.getFuture().pop();
}
return Void();
}

View File

@ -3,7 +3,7 @@
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
* Copyright 2013-2019 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -24,24 +24,23 @@
#include "fdbrpc/fdbrpc.h"
struct DataDistributorInterface {
UID id;
RequestStream<ReplyPromise<Void>> waitFailure;
RequestStream< struct GetRateInfoRequest > getRateInfo;
RequestStream<struct GetRateInfoRequest> getRateInfo;
DataDistributorInterface() {}
UID id() const { return getRateInfo.getEndpoint().token; }
NetworkAddress address() const { return getRateInfo.getEndpoint().address; }
bool operator== (const DataDistributorInterface& r) const {
return id == r.id;
return id() == r.id();
}
bool operator!= (const DataDistributorInterface& r) const {
return !(*this == r);
}
bool isValid() const { return id != UID(); }
template <class Archive>
void serialize(Archive& ar) {
serializer(ar, id, waitFailure, getRateInfo);
serializer(ar, waitFailure, getRateInfo);
}
};

View File

@ -87,7 +87,7 @@ Future<Void> forwardValue(Promise<T> out, Future<T> in)
int getBytes(Promise<Version> const& r) { return 0; }
ACTOR Future<Void> monitorDataDistributor(UID myID, Reference<AsyncVar<ServerDBInfo>> db, Reference<AsyncVar<DataDistributorInterface>> dataDistributor) {
ACTOR Future<Void> monitorDataDistributor(UID myID, Reference<AsyncVar<ServerDBInfo>> db, Reference<AsyncVar<DataDistributorInterface>> dataDistributor, bool *rejoined) {
state Future<Void> distributorFailure = Never();
state Future<GetDistributorInterfaceReply> reply = brokenPromiseToNever( db->get().clusterInterface.getDistributorInterface.getReply( GetDistributorInterfaceRequest(myID) ) );
@ -95,8 +95,9 @@ ACTOR Future<Void> monitorDataDistributor(UID myID, Reference<AsyncVar<ServerDBI
when ( GetDistributorInterfaceReply r = wait( reply ) ) {
reply = Never();
dataDistributor->set( r.distributorInterface );
*rejoined = true;
distributorFailure = waitFailureClient( dataDistributor->get().waitFailure, SERVER_KNOBS->WORKER_FAILURE_TIME );
TraceEvent("Proxy", myID).detail("DataDistributorChangedID", dataDistributor->get().id)
TraceEvent("Proxy", myID).detail("DataDistributorChangedID", dataDistributor->get().id())
.detail("Endpoint", dataDistributor->get().waitFailure.getEndpoint().token);
}
when ( wait( db->onChange() ) ) {
@ -105,9 +106,10 @@ ACTOR Future<Void> monitorDataDistributor(UID myID, Reference<AsyncVar<ServerDBI
}
when ( wait( distributorFailure ) ) {
distributorFailure = Never();
*rejoined = false;
TraceEvent("Proxy", myID)
.detail("CC", db->get().clusterInterface.id())
.detail("DataDistributorFailed", dataDistributor->get().id)
.detail("DataDistributorFailed", dataDistributor->get().id())
.detail("Token", dataDistributor->get().waitFailure.getEndpoint().token);
wait( delay(0.001) );
reply = brokenPromiseToNever( db->get().clusterInterface.getDistributorInterface.getReply( GetDistributorInterfaceRequest(myID) ) );
@ -115,7 +117,7 @@ ACTOR Future<Void> monitorDataDistributor(UID myID, Reference<AsyncVar<ServerDBI
}
}
ACTOR Future<Void> getRate(UID myID, Reference<AsyncVar<ServerDBInfo>> db, int64_t* inTransactionCount, double* outTransactionRate, Reference<AsyncVar<DataDistributorInterface>> dataDistributor) {
ACTOR Future<Void> getRate(UID myID, Reference<AsyncVar<ServerDBInfo>> db, int64_t* inTransactionCount, double* outTransactionRate, Reference<AsyncVar<DataDistributorInterface>> dataDistributor, bool *rejoined) {
state Future<Void> nextRequestTimer = Never();
state Future<Void> leaseTimeout = Never();
state Future<GetRateInfoReply> reply = Never();
@ -123,7 +125,7 @@ ACTOR Future<Void> getRate(UID myID, Reference<AsyncVar<ServerDBInfo>> db, int64
loop choose {
when ( wait( dataDistributor->onChange() ) ) {
if ( dataDistributor->get().isValid() ) {
if ( *rejoined ) {
nextRequestTimer = Void(); // trigger GetRate request
} else {
nextRequestTimer = Never();
@ -1140,8 +1142,9 @@ ACTOR static Future<Void> transactionStarter(
otherProxies.push_back(mp);
}
state Reference<AsyncVar<DataDistributorInterface>> dataDistributor( new AsyncVar<DataDistributorInterface>(DataDistributorInterface()) );
addActor.send( getRate(proxy.id(), db, &transactionCount, &transactionRate, dataDistributor) ); // do this after correct CC is obtained.
addActor.send( monitorDataDistributor(proxy.id(), db, dataDistributor) );
bool rejoined = false;
addActor.send( getRate(proxy.id(), db, &transactionCount, &transactionRate, dataDistributor, &rejoined) ); // do this after correct CC is obtained.
addActor.send( monitorDataDistributor(proxy.id(), db, dataDistributor, &rejoined) );
ASSERT(db->get().recoveryState >= RecoveryState::ACCEPTING_COMMITS); // else potentially we could return uncommitted read versions (since self->committedVersion is only a committed version if this recovery succeeds)

View File

@ -85,14 +85,14 @@ ACTOR Future<DistributorPair> getDataDistributorWorker( Database cx, Reference<A
for( int i = 0; i < workers.size(); i++ ) {
if( workers[i].first.address() == ddInterf.address() ) {
TraceEvent("GetDataDistributorWorker").detail("Stage", "GotWorkers").detail("DataDistributorId", ddInterf.id).detail("WorkerId", workers[i].first.id());
return std::make_pair(workers[i].first, ddInterf.id);
TraceEvent("GetDataDistributorWorker").detail("Stage", "GotWorkers").detail("DataDistributorId", ddInterf.id()).detail("WorkerId", workers[i].first.id());
return std::make_pair(workers[i].first, ddInterf.id());
}
}
TraceEvent(SevWarn, "GetDataDistributorWorker")
.detail("Error", "DataDistributorWorkerNotFound")
.detail("DataDistributorId", ddInterf.id)
.detail("DataDistributorId", ddInterf.id())
.detail("DataDistributorAddress", ddInterf.address())
.detail("WorkerCount", workers.size());
}

View File

@ -715,12 +715,11 @@ ACTOR Future<Void> workerServer( Reference<ClusterConnectionFile> connFile, Refe
}
when ( InitializeDataDistributorRequest req = waitNext(interf.dataDistributor.getFuture()) ) {
DataDistributorInterface recruited;
recruited.id = req.reqId;
TraceEvent("DataDistributorReceived", req.reqId).detail("Addr", interf.address()).detail("WorkerId", interf.id());
startRole( Role::DATA_DISTRIBUTOR, req.reqId, interf.id());
TraceEvent("DataDistributorReceived", req.reqId).detail("Addr", interf.address()).detail("DataDistributorId", recruited.id());
startRole( Role::DATA_DISTRIBUTOR, recruited.id(), interf.id() );
Future<Void> dataDistributorProcess = dataDistributor( recruited, dbInfo );
errorForwarders.add( forwardError( errors, Role::DATA_DISTRIBUTOR, req.reqId, dataDistributorProcess ) );
errorForwarders.add( forwardError( errors, Role::DATA_DISTRIBUTOR, recruited.id(), dataDistributorProcess ) );
req.reply.send(recruited);
}
when( InitializeTLogRequest req = waitNext(interf.tLog.getFuture()) ) {