Fixed a number of problems with monitorLeaderRemotely

This commit is contained in:
Evan Tschannen 2020-05-10 14:20:50 -07:00
parent 0957b33948
commit 048201717c
9 changed files with 165 additions and 134 deletions

View File

@ -123,6 +123,14 @@ struct LeaderInfo {
return ((changeID.first() & ~mask) > (candidate.changeID.first() & ~mask) && !equalInternalId(candidate)) || ((changeID.first() & ~mask) < (candidate.changeID.first() & ~mask) && equalInternalId(candidate));
}
// Decodes the cluster-controller priority fields packed into the upper bits
// of changeID: bits 57-59 carry the process-class fitness, bit 60 the
// exclusion flag, and bits 61-63 the datacenter fitness.
ClusterControllerPriorityInfo getPriorityInfo() const {
ClusterControllerPriorityInfo info;
// 3-bit process-class fitness (bits 57-59).
info.processClassFitness = (changeID.first() >> 57) & 7;
// 1-bit excluded flag (bit 60).
info.isExcluded = (changeID.first() >> 60) & 1;
// 3-bit DC fitness (bits 61-63); values come from the DCFitness enum,
// which is documented elsewhere as limited to 7 by the leader-election mask.
info.dcFitness = (changeID.first() >> 61) & 7;
return info;
}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, changeID, serializedInfo, forward);

View File

@ -886,7 +886,7 @@ inline bool addressExcluded( std::set<AddressExclusion> const& exclusions, Netwo
}
struct ClusterControllerPriorityInfo {
enum DCFitness { FitnessPrimary, FitnessRemote, FitnessPreferred, FitnessUnknown, FitnessBad }; //cannot be larger than 7 because of leader election mask
enum DCFitness { FitnessPrimary, FitnessRemote, FitnessPreferred, FitnessUnknown, FitnessNotPreferred, FitnessBad }; //cannot be larger than 7 because of leader election mask
static DCFitness calculateDCFitness(Optional<Key> const& dcId, std::vector<Optional<Key>> const& dcPriority) {
if(!dcPriority.size()) {
@ -895,7 +895,7 @@ struct ClusterControllerPriorityInfo {
if(dcId == dcPriority[0]) {
return FitnessPreferred;
} else {
return FitnessUnknown;
return FitnessNotPreferred;
}
} else {
if(dcId == dcPriority[0]) {

View File

@ -411,60 +411,51 @@ ACTOR Future<Void> monitorNominee( Key key, ClientLeaderRegInterface coord, Asyn
// bool represents if the LeaderInfo is a majority answer or not.
// This function also masks the first 7 bits of changeId of the nominees and returns the Leader with masked changeId
Optional<std::pair<LeaderInfo, bool>> getLeader( const vector<Optional<LeaderInfo>>& nominees ) {
vector<LeaderInfo> maskedNominees;
// If any coordinator says that the quorum is forwarded, then it is
for(int i=0; i<nominees.size(); i++)
if (nominees[i].present() && nominees[i].get().forward)
return std::pair<LeaderInfo, bool>(nominees[i].get(), true);
vector<std::pair<UID,int>> maskedNominees;
maskedNominees.reserve(nominees.size());
for (auto &nominee : nominees) {
if (nominee.present()) {
maskedNominees.push_back(nominee.get());
maskedNominees.back().changeID = UID(maskedNominees.back().changeID.first() & LeaderInfo::mask, maskedNominees.back().changeID.second());
for (int i =0; i < nominees.size(); i++) {
if (nominees[i].present()) {
maskedNominees.push_back(std::make_pair(UID(nominees[i].get().changeID.first() & LeaderInfo::mask, nominees[i].get().changeID.second()), i));
}
}
// If any coordinator says that the quorum is forwarded, then it is
for(int i=0; i<maskedNominees.size(); i++)
if (maskedNominees[i].forward)
return std::pair<LeaderInfo, bool>(maskedNominees[i], true);
if(!maskedNominees.size())
return Optional<std::pair<LeaderInfo, bool>>();
std::sort(maskedNominees.begin(), maskedNominees.end(),
[](const LeaderInfo& l, const LeaderInfo& r) { return l.changeID < r.changeID; });
[](const std::pair<UID,int>& l, const std::pair<UID,int>& r) { return l.first < r.first; });
int bestCount = 0;
LeaderInfo bestNominee;
LeaderInfo currentNominee;
int curCount = 0;
for (int i = 0; i < maskedNominees.size(); i++) {
if (currentNominee == maskedNominees[i]) {
int bestIdx = 0;
int currentIdx = 0;
int curCount = 1;
for (int i = 1; i < maskedNominees.size(); i++) {
if (maskedNominees[currentIdx].first == maskedNominees[i].first) {
curCount++;
}
else {
if (curCount > bestCount) {
bestNominee = currentNominee;
bestIdx = currentIdx;
bestCount = curCount;
}
currentNominee = maskedNominees[i];
currentIdx = i;
curCount = 1;
}
}
if (curCount > bestCount) {
bestNominee = currentNominee;
bestIdx = currentIdx;
bestCount = curCount;
}
bool majority = bestCount >= nominees.size() / 2 + 1;
return std::pair<LeaderInfo, bool>(bestNominee, majority);
return std::pair<LeaderInfo, bool>(nominees[maskedNominees[bestIdx].second].get(), majority);
}
// State threaded through successive generations of leader monitoring
// (see monitorLeaderOneGeneration, which takes and returns this struct).
struct MonitorLeaderInfo {
// True once a leader connection has been established; starts false.
bool hasConnected;
// The connection file currently being used to reach the coordinators;
// may diverge from the on-disk file while a forward is being followed.
Reference<ClusterConnectionFile> intermediateConnFile;
MonitorLeaderInfo() : hasConnected(false) {}
explicit MonitorLeaderInfo( Reference<ClusterConnectionFile> intermediateConnFile ) : intermediateConnFile(intermediateConnFile), hasConnected(false) {}
};
// Leader is the process that will be elected by coordinators as the cluster controller
ACTOR Future<MonitorLeaderInfo> monitorLeaderOneGeneration( Reference<ClusterConnectionFile> connFile, Reference<AsyncVar<Value>> outSerializedLeaderInfo, MonitorLeaderInfo info ) {
state ClientCoordinators coordinators( info.intermediateConnFile );
@ -670,6 +661,7 @@ ACTOR Future<Void> monitorLeaderForProxies( Key clusterKey, vector<NetworkAddres
outInfo.id = deterministicRandom()->randomUniqueID();
outInfo.forward = leader.get().first.serializedInfo;
clientData->clientInfo->set(CachedSerialization<ClientDBInfo>(outInfo));
leaderInfo->set(leader.get().first);
TraceEvent("MonitorLeaderForProxiesForwarding").detail("NewConnStr", leader.get().first.serializedInfo.toString());
return Void();
}
@ -679,9 +671,7 @@ ACTOR Future<Void> monitorLeaderForProxies( Key clusterKey, vector<NetworkAddres
ClusterControllerClientInterface res;
reader.deserialize(res);
knownLeader->set(res);
if (leader.get().second) {
leaderInfo->set(leader.get().first);
}
leaderInfo->set(leader.get().first);
}
}
wait( nomineeChange.onTrigger() || allActors );
@ -771,10 +761,12 @@ ACTOR Future<MonitorLeaderInfo> monitorProxiesOneGeneration( Reference<ClusterCo
shrinkProxyList(ni, lastProxyUIDs, lastProxies);
clientInfo->set( ni );
successIdx = idx;
} else if(idx == successIdx) {
wait(delay(CLIENT_KNOBS->COORDINATOR_RECONNECTION_DELAY));
} else {
idx = (idx+1)%addrs.size();
if(idx == successIdx) {
wait(delay(CLIENT_KNOBS->COORDINATOR_RECONNECTION_DELAY));
}
}
idx = (idx+1)%addrs.size();
}
}

View File

@ -49,6 +49,14 @@ struct ClientData {
ClientData() : clientInfo( new AsyncVar<CachedSerialization<ClientDBInfo>>( CachedSerialization<ClientDBInfo>() ) ) {}
};
// State threaded through successive generations of leader monitoring.
struct MonitorLeaderInfo {
// True once a leader connection has been established; starts false.
bool hasConnected;
// The connection file currently in use for reaching the coordinators.
Reference<ClusterConnectionFile> intermediateConnFile;
MonitorLeaderInfo() : hasConnected(false) {}
explicit MonitorLeaderInfo( Reference<ClusterConnectionFile> intermediateConnFile ) : intermediateConnFile(intermediateConnFile), hasConnected(false) {}
};
// Monitors the given coordination group's leader election process and provides a best current guess
// of the current leader. If a leader is elected for long enough and communication with a quorum of
// coordinators is possible, eventually outKnownLeader will be that leader's interface.

View File

@ -246,7 +246,6 @@ ACTOR Future<Void> openDatabase(ClientData* db, int* clientCount, Reference<Asyn
ACTOR Future<Void> remoteMonitorLeader( int* clientCount, Reference<AsyncVar<bool>> hasConnectedClients, Reference<AsyncVar<Optional<LeaderInfo>>> currentElectedLeader, ElectionResultRequest req ) {
if (currentElectedLeader->get().present() && req.knownLeader != currentElectedLeader->get().get().changeID) {
TraceEvent("ElectionResultQuickReply").detail("RequestID", req.requestID).detail("ChangeID", !currentElectedLeader->get().present() ? UID(-1,-1) : currentElectedLeader->get().get().changeID);
req.reply.send( currentElectedLeader->get() );
return Void();
}
@ -255,15 +254,12 @@ ACTOR Future<Void> remoteMonitorLeader( int* clientCount, Reference<AsyncVar<boo
hasConnectedClients->set(true);
while (!currentElectedLeader->get().present() || req.knownLeader == currentElectedLeader->get().get().changeID) {
TraceEvent("ElectionResultWaitBefore").detail("RequestID", req.requestID).detail("ChangeID", !currentElectedLeader->get().present() ? UID(-1,-1) : currentElectedLeader->get().get().changeID);
choose {
when (wait( yieldedFuture(currentElectedLeader->onChange()) ) ) {}
when (wait( yieldedFuture(currentElectedLeader->onChange()) ) ) {}
when (wait( delayJittered( SERVER_KNOBS->CLIENT_REGISTER_INTERVAL ) )) { break; }
}
TraceEvent("ElectionResultWaitAfter").detail("RequestID", req.requestID).detail("ChangeID", !currentElectedLeader->get().present() ? UID(-1,-1) : currentElectedLeader->get().get().changeID);
}
TraceEvent("ElectionResultReply").detail("RequestID", req.requestID).detail("ChangeID", !currentElectedLeader->get().present() ? UID(-1,-1) : currentElectedLeader->get().get().changeID);
req.reply.send( currentElectedLeader->get() );
if(--(*clientCount) == 0) {
@ -301,7 +297,6 @@ ACTOR Future<Void> leaderRegister(LeaderElectionRegInterface interf, Key key) {
actors.add(openDatabase(&clientData, &clientCount, hasConnectedClients, req));
}
when ( ElectionResultRequest req = waitNext( interf.electionResult.getFuture() ) ) {
TraceEvent("ElectionResultRequestReceivedRegister").detail("RequestID", req.requestID);
if(!leaderMon.isValid()) {
leaderMon = monitorLeaderForProxies(req.key, req.coordinators, &clientData, currentElectedLeader);
}
@ -326,7 +321,6 @@ ACTOR Future<Void> leaderRegister(LeaderElectionRegInterface interf, Key key) {
if(!nextInterval.isValid()) {
nextInterval = delay(0);
}
//TraceEvent("CandidacyRequest").detail("Nominee", req.myInfo.changeID );
availableCandidates.erase( LeaderInfo(req.prevChangeID) );
availableCandidates.insert( req.myInfo );
if (currentNominee.present() && currentNominee.get().changeID != req.knownLeader) {
@ -532,7 +526,6 @@ ACTOR Future<Void> leaderServer(LeaderElectionRegInterface interf, OnDemandStore
}
}
when ( ElectionResultRequest req = waitNext( interf.electionResult.getFuture() ) ) {
TraceEvent("ElectionResultRequestReceivedServer").detail("RequestID", req.requestID);
Optional<LeaderInfo> forward = regs.getForward(req.key);
if( forward.present() ) {
req.reply.send( forward.get() );

View File

@ -142,7 +142,6 @@ struct ElectionResultRequest {
Key key;
vector<NetworkAddress> coordinators;
UID knownLeader;
UID requestID;
ReplyPromise<Optional<LeaderInfo>> reply;
ElectionResultRequest() = default;
@ -150,7 +149,7 @@ struct ElectionResultRequest {
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, key, coordinators, knownLeader, requestID, reply);
serializer(ar, key, coordinators, knownLeader, reply);
}
};

View File

@ -84,7 +84,10 @@ ACTOR Future<Void> tryBecomeLeaderInternal(ServerCoordinators coordinators, Valu
state UID prevChangeID;
if(asyncPriorityInfo->get().dcFitness == ClusterControllerPriorityInfo::FitnessBad || asyncPriorityInfo->get().dcFitness == ClusterControllerPriorityInfo::FitnessRemote || asyncPriorityInfo->get().isExcluded) {
if(asyncPriorityInfo->get().dcFitness == ClusterControllerPriorityInfo::FitnessBad ||
asyncPriorityInfo->get().dcFitness == ClusterControllerPriorityInfo::FitnessRemote ||
asyncPriorityInfo->get().dcFitness == ClusterControllerPriorityInfo::FitnessNotPreferred ||
asyncPriorityInfo->get().isExcluded) {
wait( delay(SERVER_KNOBS->WAIT_FOR_GOOD_REMOTE_RECRUITMENT_DELAY) );
} else if( asyncPriorityInfo->get().processClassFitness > ProcessClass::UnsetFit ) {
wait( delay(SERVER_KNOBS->WAIT_FOR_GOOD_RECRUITMENT_DELAY) );

View File

@ -877,7 +877,8 @@ ACTOR Future<Void> workerServer(
ProcessClass initialClass, std::string folder, int64_t memoryLimit,
std::string metricsConnFile, std::string metricsPrefix,
Promise<Void> recoveredDiskFiles, int64_t memoryProfileThreshold,
std::string _coordFolder, std::string whitelistBinPaths) {
std::string _coordFolder, std::string whitelistBinPaths,
Reference<AsyncVar<ServerDBInfo>> dbInfo) {
state PromiseStream< ErrorInfo > errors;
state Reference<AsyncVar<Optional<DataDistributorInterface>>> ddInterf( new AsyncVar<Optional<DataDistributorInterface>>() );
state Reference<AsyncVar<Optional<RatekeeperInterface>>> rkInterf( new AsyncVar<Optional<RatekeeperInterface>>() );
@ -888,7 +889,6 @@ ACTOR Future<Void> workerServer(
state ActorCollection filesClosed(true);
state Promise<Void> stopping;
state WorkerCache<InitializeStorageReply> storageCache;
state Reference<AsyncVar<ServerDBInfo>> dbInfo( new AsyncVar<ServerDBInfo>(ServerDBInfo()) );
state Future<Void> metricsLogger;
state Reference<AsyncVar<bool>> degraded = FlowTransport::transport().getDegraded();
// tLogFnForOptions() can return a function that doesn't correspond with the FDB version that the
@ -1574,52 +1574,64 @@ ACTOR Future<UID> createAndLockProcessIdFile(std::string folder) {
}
}
ACTOR Future<std::pair<ClusterConnectionString, UID>> monitorLeaderRemotelyOneGeneration( Reference<ClusterConnectionFile> connFile, UID changeID, Reference<AsyncVar<Value>> result ) {
state ClusterConnectionString ccf = connFile->getConnectionString();
ACTOR Future<MonitorLeaderInfo> monitorLeaderRemotelyOneGeneration( Reference<ClusterConnectionFile> connFile, Reference<AsyncVar<Value>> result, MonitorLeaderInfo info ) {
state ClusterConnectionString ccf = info.intermediateConnFile->getConnectionString();
state ElectionResultRequest request;
request.key = ccf.clusterKey();
request.coordinators = ccf.coordinators();
request.knownLeader = changeID;
state int index = 0;
state int successIndex = 0;
loop {
LeaderElectionRegInterface interf( request.coordinators[index] );
request.reply = ReplyPromise<Optional<LeaderInfo>>();
try {
UID requestID = deterministicRandom()->randomUniqueID();
TraceEvent("ElectionResultRequest").detail("RequestID", requestID).detail("Destination", request.coordinators[index].toString());
request.requestID = requestID;
ErrorOr<Optional<LeaderInfo>> leader = wait( interf.electionResult.tryGetReply( request ) );
if (leader.isError()) throw leader.getError();
if (!leader.get().present()) continue;
ErrorOr<Optional<LeaderInfo>> leader = wait( interf.electionResult.tryGetReply( request ) );
if (leader.present()) {
if(leader.get().present()) {
if( leader.get().get().forward ) {
info.intermediateConnFile = Reference<ClusterConnectionFile>(new ClusterConnectionFile(connFile->getFilename(), ClusterConnectionString(leader.get().get().serializedInfo.toString())));
return info;
}
if(connFile != info.intermediateConnFile) {
if(!info.hasConnected) {
TraceEvent(SevWarnAlways, "IncorrectClusterFileContentsAtConnection").detail("Filename", connFile->getFilename())
.detail("ConnectionStringFromFile", connFile->getConnectionString().toString())
.detail("CurrentConnectionString", info.intermediateConnFile->getConnectionString().toString());
}
connFile->setConnectionString(info.intermediateConnFile->getConnectionString());
info.intermediateConnFile = connFile;
}
request.knownLeader = leader.get().get().changeID;
if( leader.get().get().forward ) {
return std::make_pair(ClusterConnectionString( leader.get().get().serializedInfo.toString() ), request.knownLeader);
info.hasConnected = true;
connFile->notifyConnected();
request.knownLeader = leader.get().get().changeID;
ClusterControllerPriorityInfo info = leader.get().get().getPriorityInfo();
if( leader.get().get().serializedInfo.size() && !info.isExcluded &&
(info.dcFitness == ClusterControllerPriorityInfo::FitnessPrimary ||
info.dcFitness == ClusterControllerPriorityInfo::FitnessPreferred ||
info.dcFitness == ClusterControllerPriorityInfo::FitnessUnknown)) {
result->set(leader.get().get().serializedInfo);
} else {
result->set(Value());
}
}
if (leader.get().get().serializedInfo.size()) {
result->set(leader.get().get().serializedInfo);
successIndex = index;
} else {
index = (index+1) % request.coordinators.size();
if (index == successIndex) {
wait( delay( CLIENT_KNOBS->COORDINATOR_RECONNECTION_DELAY ) );
}
} catch (Error& e) {
TraceEvent("MonitorLeaderRemotely").error(e);
}
index = (index+1) % request.coordinators.size();
if (index == 0) {
wait( delay( CLIENT_KNOBS->COORDINATOR_RECONNECTION_DELAY ) );
}
}
}
ACTOR Future<Void> monitorLeaderRemotelyInternal( Reference<ClusterConnectionFile> connFile, Reference<AsyncVar<Value>> outSerializedInterface ) {
state UID changeID = UID(-1, -1);
state ClusterConnectionString ccs;
ACTOR Future<Void> monitorLeaderRemotelyInternal( Reference<ClusterConnectionFile> connFile, Reference<AsyncVar<Value>> outSerializedLeaderInfo ) {
state MonitorLeaderInfo info(connFile);
loop {
std::pair<ClusterConnectionString, UID> result = wait( monitorLeaderRemotelyOneGeneration( connFile, changeID, outSerializedInterface ) );
std::tie(ccs, changeID) = result;
connFile->setConnectionString(ccs);
MonitorLeaderInfo _info = wait( monitorLeaderRemotelyOneGeneration( connFile, outSerializedLeaderInfo, info ) );
info = _info;
}
}
@ -1632,25 +1644,28 @@ Future<Void> monitorLeaderRemotely(Reference<ClusterConnectionFile> const& connF
return m || deserializer( serializedInfo, outKnownLeader );
}
ACTOR Future<Void> monitorLeaderRemotelyWithDelayedCandidacy( Reference<ClusterConnectionFile> connFile, Reference<AsyncVar<Optional<ClusterControllerFullInterface>>> currentCC, Reference<AsyncVar<ClusterControllerPriorityInfo>> asyncPriorityInfo, Future<Void> recoveredDiskFiles, LocalityData locality ) {
ACTOR Future<Void> monitorLeaderRemotelyWithDelayedCandidacy( Reference<ClusterConnectionFile> connFile, Reference<AsyncVar<Optional<ClusterControllerFullInterface>>> currentCC, Reference<AsyncVar<ClusterControllerPriorityInfo>> asyncPriorityInfo, Future<Void> recoveredDiskFiles, LocalityData locality, Reference<AsyncVar<ServerDBInfo>> dbInfo ) {
state Future<Void> monitor = monitorLeaderRemotely( connFile, currentCC );
state Future<Void> timeout = delay( SERVER_KNOBS->DELAY_STORAGE_CANDIDACY_SECONDS );
state Future<Void> leader = Never();
state Future<Void> failed = Never();
state Future<Void> timeout;
loop choose {
when( wait(currentCC->onChange()) ) {
failed = IFailureMonitor::failureMonitor().onFailed( currentCC->get().get().getWorkers.getEndpoint() );
timeout = Never();
}
when ( wait(timeout) ) {
wait( clusterController( connFile, currentCC , asyncPriorityInfo, recoveredDiskFiles, locality ) );
return Void();
}
when ( wait(failed) ) {
failed = Never();
wait(recoveredDiskFiles);
loop {
if(currentCC->get().present() && dbInfo->get().clusterInterface == currentCC->get().get() && IFailureMonitor::failureMonitor().getState( currentCC->get().get().registerWorker.getEndpoint() ).isAvailable()) {
timeout = Future<Void>();
} else if(!timeout.isValid()) {
timeout = delay( SERVER_KNOBS->DELAY_STORAGE_CANDIDACY_SECONDS );
}
choose {
when( wait(currentCC->onChange()) ) {}
when( wait(dbInfo->onChange()) ) {}
when( wait(currentCC->get().present() ? IFailureMonitor::failureMonitor().onStateChanged( currentCC->get().get().registerWorker.getEndpoint() ) : Never() ) ) {}
when( wait(timeout.isValid() ? timeout : Never()) ) {
monitor.cancel();
wait( clusterController( connFile, currentCC , asyncPriorityInfo, recoveredDiskFiles, locality ) );
return Void();
}
}
}
}
@ -1693,17 +1708,18 @@ ACTOR Future<Void> fdbd(
Reference<AsyncVar<Optional<ClusterControllerFullInterface>>> cc(new AsyncVar<Optional<ClusterControllerFullInterface>>);
Reference<AsyncVar<Optional<ClusterInterface>>> ci(new AsyncVar<Optional<ClusterInterface>>);
Reference<AsyncVar<ClusterControllerPriorityInfo>> asyncPriorityInfo(new AsyncVar<ClusterControllerPriorityInfo>(getCCPriorityInfo(fitnessFilePath, processClass)));
Reference<AsyncVar<ServerDBInfo>> dbInfo( new AsyncVar<ServerDBInfo>(ServerDBInfo()) );
actors.push_back(reportErrors(monitorAndWriteCCPriorityInfo(fitnessFilePath, asyncPriorityInfo), "MonitorAndWriteCCPriorityInfo"));
if (processClass == ProcessClass::TesterClass) {
actors.push_back( reportErrors( monitorLeader( connFile, cc ), "ClusterController" ) );
//} else if (processClass == ProcessClass::StorageClass && SERVER_KNOBS->DELAY_STORAGE_CANDIDACY_SECONDS) {
// actors.push_back( reportErrors( monitorLeaderRemotelyWithDelayedCandidacy( connFile, cc, asyncPriorityInfo, recoveredDiskFiles.getFuture(), localities ), "ClusterController" ) );
} else if (processClass == ProcessClass::StorageClass && SERVER_KNOBS->DELAY_STORAGE_CANDIDACY_SECONDS) {
actors.push_back( reportErrors( monitorLeaderRemotelyWithDelayedCandidacy( connFile, cc, asyncPriorityInfo, recoveredDiskFiles.getFuture(), localities, dbInfo ), "ClusterController" ) );
} else {
actors.push_back( reportErrors( clusterController( connFile, cc , asyncPriorityInfo, recoveredDiskFiles.getFuture(), localities ), "ClusterController") );
}
actors.push_back( reportErrors(extractClusterInterface( cc, ci ), "ExtractClusterInterface") );
actors.push_back( reportErrorsExcept(workerServer(connFile, cc, localities, asyncPriorityInfo, processClass, dataFolder, memoryLimit, metricsConnFile, metricsPrefix, recoveredDiskFiles, memoryProfileThreshold, coordFolder, whitelistBinPaths), "WorkerServer", UID(), &normalWorkerErrors()) );
actors.push_back( reportErrorsExcept(workerServer(connFile, cc, localities, asyncPriorityInfo, processClass, dataFolder, memoryLimit, metricsConnFile, metricsPrefix, recoveredDiskFiles, memoryProfileThreshold, coordFolder, whitelistBinPaths, dbInfo), "WorkerServer", UID(), &normalWorkerErrors()) );
state Future<Void> firstConnect = reportErrors( printOnFirstConnected(ci), "ClusterFirstConnectedError" );
wait( quorum(actors,1) );

View File

@ -134,56 +134,68 @@ struct SidebandWorkload : TestWorkload {
}
ACTOR Future<Void> mutator( SidebandWorkload *self, Database cx ) {
state SidebandInterface checker = wait( self->fetchSideband( self, cx->clone() ) );
state double lastTime = now();
try {
state SidebandInterface checker = wait( self->fetchSideband( self, cx->clone() ) );
state double lastTime = now();
state Version commitVersion;
loop {
wait( poisson( &lastTime, 1.0 / self->operationsPerSecond ) );
state Transaction tr(cx);
state uint64_t key = deterministicRandom()->randomUniqueID().hash();
state Version commitVersion = wait( tr.getReadVersion() ); // Used if the key is already present
state Standalone<StringRef> messageKey(format( "Sideband/Message/%llx", key ));
loop {
try {
Optional<Value> val = wait( tr.get( messageKey ) );
if( val.present() ) {
++self->keysUnexpectedlyPresent;
wait( poisson( &lastTime, 1.0 / self->operationsPerSecond ) );
state Transaction tr(cx);
state uint64_t key = deterministicRandom()->randomUniqueID().hash();
state Standalone<StringRef> messageKey(format( "Sideband/Message/%llx", key ));
loop {
try {
Optional<Value> val = wait( tr.get( messageKey ) );
if( val.present() ) {
commitVersion = tr.getReadVersion().get();
++self->keysUnexpectedlyPresent;
break;
}
tr.set( messageKey, LiteralStringRef("deadbeef") );
wait( tr.commit() );
commitVersion = tr.getCommittedVersion();
break;
}
tr.set( messageKey, LiteralStringRef("deadbeef") );
wait( tr.commit() );
commitVersion = tr.getCommittedVersion();
break;
}
catch( Error& e ) {
wait( tr.onError( e ) );
catch( Error& e ) {
wait( tr.onError( e ) );
}
}
++self->messages;
checker.updates.send( SidebandMessage( key, commitVersion ) );
}
++self->messages;
checker.updates.send( SidebandMessage( key, commitVersion ) );
} catch(Error &e) {
TraceEvent(SevError, "SidebandMutatorError").error(e).backtrace();
throw;
}
}
ACTOR Future<Void> checker( SidebandWorkload *self, Database cx ) {
loop {
state SidebandMessage message = waitNext( self->interf.updates.getFuture() );
state Standalone<StringRef> messageKey(format("Sideband/Message/%llx", message.key));
state Transaction tr(cx);
try {
loop {
try {
Optional<Value> val = wait( tr.get( messageKey ) );
if( !val.present() ) {
TraceEvent( SevError, "CausalConsistencyError", self->interf.id() )
.detail("MessageKey", messageKey.toString().c_str())
.detail("RemoteCommitVersion", message.commitVersion)
.detail("LocalReadVersion", tr.getReadVersion().get()); // will assert that ReadVersion is set
++self->consistencyErrors;
state SidebandMessage message = waitNext( self->interf.updates.getFuture() );
state Standalone<StringRef> messageKey(format("Sideband/Message/%llx", message.key));
state Transaction tr(cx);
loop {
try {
Optional<Value> val = wait( tr.get( messageKey ) );
if( !val.present() ) {
TraceEvent( SevError, "CausalConsistencyError", self->interf.id() )
.detail("MessageKey", messageKey.toString().c_str())
.detail("RemoteCommitVersion", message.commitVersion)
.detail("LocalReadVersion", tr.getReadVersion().get()); // will assert that ReadVersion is set
++self->consistencyErrors;
}
break;
} catch( Error& e ) {
wait( tr.onError(e) );
}
break;
} catch( Error& e ) {
wait( tr.onError(e) );
}
}
} catch( Error &e ) {
TraceEvent(SevError, "SidebandCheckerError").error(e).backtrace();
throw;
}
}
};