fix: for configurations with anti-quorums to work, the push actors need to be put in the proxy’s actor collection
This commit is contained in:
@ -547,8 +547,8 @@ struct ILogSystem {
virtual Future<Void> endEpoch() = 0;
// Ends the current epoch without starting a new one
static Reference<ILogSystem> fromServerDBInfo( UID const& dbgid, struct ServerDBInfo const& db, bool usePreviousEpochEnd = false );
static Reference<ILogSystem> fromLogSystemConfig( UID const& dbgid, struct LocalityData const&, struct LogSystemConfig const&, bool excludeRemote = false, bool usePreviousEpochEnd = false );
static Reference<ILogSystem> fromServerDBInfo( UID const& dbgid, struct ServerDBInfo const& db, bool usePreviousEpochEnd = false, Optional<PromiseStream<Future<Void>>> addActor = Optional<PromiseStream<Future<Void>>>() );
static Reference<ILogSystem> fromLogSystemConfig( UID const& dbgid, struct LocalityData const&, struct LogSystemConfig const&, bool excludeRemote = false, bool usePreviousEpochEnd = false, Optional<PromiseStream<Future<Void>>> addActor = Optional<PromiseStream<Future<Void>>>() );
// Constructs a new ILogSystem implementation from the given ServerDBInfo/LogSystemConfig. Might return a null reference if there isn't a fully recovered log system available.
// The caller can peek() the returned log system and can push() if it has version numbers reserved for it and prevVersions
@ -1196,7 +1196,7 @@ ACTOR Future<Void> masterProxyServerCore(
for(auto r = rs.begin(); r != rs.end(); ++r)
commitData.logSystem = ILogSystem::fromServerDBInfo(, db->get());
commitData.logSystem = ILogSystem::fromServerDBInfo(, db->get(), false, addActor);
commitData.logAdapter = new LogSystemDiskQueueAdapter(commitData.logSystem, txsTag, false);
commitData.txnStateStore = keyValueStoreLogSystem(commitData.logAdapter,, 2e9, true, true);
@ -1220,7 +1220,7 @@ ACTOR Future<Void> masterProxyServerCore(
when( Void _ = wait( dbInfoChange ) ) {
dbInfoChange = db->onChange();
if(db->get() == && db->get().recoveryState >= RecoveryState::RECOVERY_TRANSACTION) {
commitData.logSystem = ILogSystem::fromServerDBInfo(, db->get());
commitData.logSystem = ILogSystem::fromServerDBInfo(, db->get(), false, addActor);
for(auto it : commitData.tag_popped) {
commitData.logSystem->pop(it.second, it.first);
@ -84,11 +84,12 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
Version knownCommittedVersion;
LocalityData locality;
std::map< std::pair<UID, Tag>, std::pair<Version, Version> > outstandingPops; // For each currently running popFromLog actor, (log server #, tag)->popped version
ActorCollection actors;
Optional<PromiseStream<Future<Void>>> addActor;
ActorCollection popActors;
std::vector<OldLogData> oldLogData;
AsyncTrigger logSystemConfigChanged;
TagPartitionedLogSystem( UID dbgid, LocalityData locality ) : dbgid(dbgid), locality(locality), actors(false), recoveryCompleteWrittenToCoreState(false), remoteLogsWrittenToCoreState(false), logSystemType(0), logRouterTags(0), expectedLogSets(0), hasRemoteServers(false), stopped(false) {}
TagPartitionedLogSystem( UID dbgid, LocalityData locality, Optional<PromiseStream<Future<Void>>> addActor = Optional<PromiseStream<Future<Void>>>() ) : dbgid(dbgid), locality(locality), addActor(addActor), popActors(false), recoveryCompleteWrittenToCoreState(false), remoteLogsWrittenToCoreState(false), logSystemType(0), logRouterTags(0), expectedLogSets(0), hasRemoteServers(false), stopped(false) {}
virtual void stopRejoins() {
rejoins = Future<Void>();
@ -121,10 +122,10 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
return epochEnd( outLogSystem, dbgid, oldState, rejoins, locality );
static Reference<ILogSystem> fromLogSystemConfig( UID const& dbgid, LocalityData const& locality, LogSystemConfig const& lsConf, bool excludeRemote, bool usePreviousEpochEnd ) {
static Reference<ILogSystem> fromLogSystemConfig( UID const& dbgid, LocalityData const& locality, LogSystemConfig const& lsConf, bool excludeRemote, bool usePreviousEpochEnd, Optional<PromiseStream<Future<Void>>> addActor ) {
ASSERT( lsConf.logSystemType == 2 || (lsConf.logSystemType == 0 && !lsConf.tLogs.size()) );
//ASSERT(lsConf.epoch == epoch); //< FIXME
Reference<TagPartitionedLogSystem> logSystem( new TagPartitionedLogSystem(dbgid, locality) );
Reference<TagPartitionedLogSystem> logSystem( new TagPartitionedLogSystem(dbgid, locality, addActor) );
logSystem->expectedLogSets = lsConf.expectedLogSets;
@ -386,7 +387,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
ASSERT( failed.size() >= 1 );
Void _ = wait( quorum(changes, 1) || tagError<Void>( quorum( failed, 1 ), master_tlog_failed() ) || self->actors.getResult() );
Void _ = wait( quorum(changes, 1) || tagError<Void>( quorum( failed, 1 ), master_tlog_failed() ) );
@ -402,7 +403,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
TLogCommitRequest( data.getArena(), prevVersion, version, knownCommittedVersion, data.getMessages(location), debugID ), TaskTLogCommitReply ),
@ -773,8 +774,9 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
Version prev = outstandingPops[std::make_pair(log->get().id(),tag)].first;
if (prev < upTo)
outstandingPops[std::make_pair(log->get().id(),tag)] = std::make_pair(upTo, knownCommittedVersion);
if (prev == 0)
actors.add( popFromLog( this, log, tag, 0.0 ) ); //Fast pop time because log routers can only hold 5 seconds of data.
if (prev == 0) {
popActors.add( popFromLog( this, log, tag, 0.0 ) ); //Fast pop time because log routers can only hold 5 seconds of data.
@ -787,7 +789,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
if (prev < upTo)
outstandingPops[std::make_pair(log->get().id(),tag)] = std::make_pair(upTo, knownCommittedVersion);
if (prev == 0)
actors.add( popFromLog( this, log, tag, 0.0 ) );
popActors.add( popFromLog( this, log, tag, 0.0 ) );
@ -808,7 +810,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
if (prev < upTo)
outstandingPops[std::make_pair(log->get().id(),tag)] = std::make_pair(upTo, knownCommittedVersion);
if (prev == 0)
actors.add( popFromLog( this, log, tag, 1.0 ) ); //< FIXME: knob
popActors.add( popFromLog( this, log, tag, 1.0 ) ); //< FIXME: knob
@ -2043,11 +2045,11 @@ Future<Void> ILogSystem::recoverAndEndEpoch(Reference<AsyncVar<Reference<ILogSys
return TagPartitionedLogSystem::recoverAndEndEpoch( outLogSystem, dbgid, oldState, rejoins, locality );
Reference<ILogSystem> ILogSystem::fromLogSystemConfig( UID const& dbgid, struct LocalityData const& locality, struct LogSystemConfig const& conf, bool excludeRemote, bool usePreviousEpochEnd ) {
Reference<ILogSystem> ILogSystem::fromLogSystemConfig( UID const& dbgid, struct LocalityData const& locality, struct LogSystemConfig const& conf, bool excludeRemote, bool usePreviousEpochEnd, Optional<PromiseStream<Future<Void>>> addActor ) {
if (conf.logSystemType == 0)
return Reference<ILogSystem>();
else if (conf.logSystemType == 2)
return TagPartitionedLogSystem::fromLogSystemConfig( dbgid, locality, conf, excludeRemote, usePreviousEpochEnd );
return TagPartitionedLogSystem::fromLogSystemConfig( dbgid, locality, conf, excludeRemote, usePreviousEpochEnd, addActor );
throw internal_error();
@ -2061,6 +2063,6 @@ Reference<ILogSystem> ILogSystem::fromOldLogSystemConfig( UID const& dbgid, stru
throw internal_error();
Reference<ILogSystem> ILogSystem::fromServerDBInfo( UID const& dbgid, ServerDBInfo const& dbInfo, bool usePreviousEpochEnd ) {
return fromLogSystemConfig( dbgid, dbInfo.myLocality, dbInfo.logSystemConfig, false, usePreviousEpochEnd );
Reference<ILogSystem> ILogSystem::fromServerDBInfo( UID const& dbgid, ServerDBInfo const& dbInfo, bool usePreviousEpochEnd, Optional<PromiseStream<Future<Void>>> addActor ) {
return fromLogSystemConfig( dbgid, dbInfo.myLocality, dbInfo.logSystemConfig, false, usePreviousEpochEnd, addActor );
Reference in New Issue