fixed many more bugs associated with running without remote logs
This commit is contained in:
parent
8f58bdd1cd
commit
264dc44dfa
|
@ -468,6 +468,7 @@ struct DDTeamCollection {
|
|||
Promise<Void> serverTrackerErrorOut;
|
||||
AsyncVar<int> recruitingStream;
|
||||
Debouncer restartRecruiting;
|
||||
Promise<Void> hasTeams;
|
||||
|
||||
int healthyTeamCount;
|
||||
PromiseStream<Void> zeroHealthyTeams;
|
||||
|
@ -561,6 +562,7 @@ struct DDTeamCollection {
|
|||
// use keys, src, dest, metrics, priority, system load, etc.. to decide...
|
||||
ACTOR Future<Void> getTeam( DDTeamCollection* self, GetTeamRequest req ) {
|
||||
try {
|
||||
Void _ = wait( self->hasTeams.getFuture() );
|
||||
Void _ = wait( self->checkBuildTeams( self ) );
|
||||
|
||||
// Select the best team
|
||||
|
@ -728,6 +730,10 @@ struct DDTeamCollection {
|
|||
addTeam(t->begin(), t->end() );
|
||||
}
|
||||
|
||||
if( teams.size() && hasTeams.canBeSet() ) {
|
||||
hasTeams.send(Void());
|
||||
}
|
||||
|
||||
addSubsetOfEmergencyTeams();
|
||||
}
|
||||
|
||||
|
@ -1034,6 +1040,10 @@ struct DDTeamCollection {
|
|||
//Let all of these changes get worked out before responding to the get team request
|
||||
Void _ = wait( delay(0, TaskDataDistributionLaunch) );
|
||||
|
||||
if( self->teams.size() && self->hasTeams.canBeSet() ) {
|
||||
self->hasTeams.send(Void());
|
||||
}
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
||||
|
|
|
@ -95,7 +95,7 @@ public:
|
|||
|
||||
if(hasBest) {
|
||||
for(auto& t : tags) {
|
||||
if(t.locality == locality || t.locality == tagLocalitySpecial) {
|
||||
if(t.locality == locality || t.locality == tagLocalitySpecial || locality == tagLocalitySpecial) {
|
||||
newLocations.push_back(bestLocationFor(t));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -352,7 +352,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
|
|||
}
|
||||
int bestSet = -1;
|
||||
for(int t = 0; t < tLogs.size(); t++) {
|
||||
if(tLogs[t]->hasBest && (tLogs[t]->locality == tag.locality || tag.locality == tagLocalitySpecial)) {
|
||||
if(tLogs[t]->hasBest && (tLogs[t]->locality == tag.locality || tag.locality == tagLocalitySpecial || tLogs[t]->locality == tagLocalitySpecial)) {
|
||||
bestSet = t;
|
||||
break;
|
||||
}
|
||||
|
@ -368,7 +368,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
|
|||
for(int i = 0; i < oldLogData.size() && begin < oldLogData[i].epochEnd; i++) {
|
||||
int bestOldSet = -1;
|
||||
for(int t = 0; t < oldLogData[i].tLogs.size(); t++) {
|
||||
if(oldLogData[i].tLogs[t]->hasBest && (oldLogData[i].tLogs[t]->locality == tag.locality || tag.locality == tagLocalitySpecial)) {
|
||||
if(oldLogData[i].tLogs[t]->hasBest && (oldLogData[i].tLogs[t]->locality == tag.locality || tag.locality == tagLocalitySpecial || oldLogData[i].tLogs[t]->locality == tagLocalitySpecial)) {
|
||||
bestOldSet = t;
|
||||
break;
|
||||
}
|
||||
|
@ -394,7 +394,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
|
|||
} else {
|
||||
int bestSet = -1;
|
||||
for(int t = 0; t < tLogs.size(); t++) {
|
||||
if(tLogs[t]->hasBest && (tLogs[t]->locality == tag.locality || tag.locality == tagLocalitySpecial)) {
|
||||
if(tLogs[t]->hasBest && (tLogs[t]->locality == tag.locality || tag.locality == tagLocalitySpecial || tLogs[t]->locality == tagLocalitySpecial)) {
|
||||
bestSet = t;
|
||||
break;
|
||||
}
|
||||
|
@ -1175,8 +1175,8 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
|
|||
if(configuration.remoteTLogReplicationFactor > 0) {
|
||||
logSystem->remoteRecovery = TagPartitionedLogSystem::newRemoteEpoch(logSystem.getPtr(), oldLogSystem, fRemoteWorkers, configuration, recoveryCount, minTag, remoteLocality);
|
||||
} else {
|
||||
logSystem->remoteRecovery = Void();
|
||||
logSystem->remoteRecoveryComplete = Void();
|
||||
logSystem->remoteRecovery = logSystem->recoveryComplete;
|
||||
logSystem->remoteRecoveryComplete = logSystem->recoveryComplete;
|
||||
}
|
||||
|
||||
return logSystem;
|
||||
|
|
|
@ -289,30 +289,34 @@ ACTOR Future<Void> newResolvers( Reference<MasterData> self, RecruitFromConfigur
|
|||
}
|
||||
|
||||
ACTOR Future<Void> newTLogServers( Reference<MasterData> self, RecruitFromConfigurationReply recr, Reference<ILogSystem> oldLogSystem, vector<Standalone<CommitTransactionRef>>* initialConfChanges ) {
|
||||
state Optional<Key> primaryDcId = recr.remoteDcId == self->configuration.remoteDcId ? self->configuration.primaryDcId : self->configuration.remoteDcId;
|
||||
if( !self->dcId_locality.count(primaryDcId) ) {
|
||||
TraceEvent("UnknownPrimaryDCID", self->dbgid).detail("found", self->dcId_locality.count(primaryDcId)).detail("primaryId", printable(primaryDcId));
|
||||
int8_t loc = self->getNextLocality();
|
||||
Standalone<CommitTransactionRef> tr;
|
||||
tr.set(tr.arena(), tagLocalityListKeyFor(primaryDcId), tagLocalityListValue(loc));
|
||||
initialConfChanges->push_back(tr);
|
||||
self->dcId_locality[primaryDcId] = loc;
|
||||
if(self->configuration.remoteTLogReplicationFactor > 0) {
|
||||
state Optional<Key> primaryDcId = recr.remoteDcId == self->configuration.remoteDcId ? self->configuration.primaryDcId : self->configuration.remoteDcId;
|
||||
if( !self->dcId_locality.count(primaryDcId) ) {
|
||||
TraceEvent(SevWarnAlways, "UnknownPrimaryDCID", self->dbgid).detail("found", self->dcId_locality.count(primaryDcId)).detail("primaryId", printable(primaryDcId));
|
||||
int8_t loc = self->getNextLocality();
|
||||
Standalone<CommitTransactionRef> tr;
|
||||
tr.set(tr.arena(), tagLocalityListKeyFor(primaryDcId), tagLocalityListValue(loc));
|
||||
initialConfChanges->push_back(tr);
|
||||
self->dcId_locality[primaryDcId] = loc;
|
||||
}
|
||||
|
||||
if( !self->dcId_locality.count(recr.remoteDcId) ) {
|
||||
TraceEvent(SevWarnAlways, "UnknownRemoteDCID", self->dbgid).detail("remoteFound", self->dcId_locality.count(recr.remoteDcId)).detail("remoteId", printable(recr.remoteDcId));
|
||||
int8_t loc = self->getNextLocality();
|
||||
Standalone<CommitTransactionRef> tr;
|
||||
tr.set(tr.arena(), tagLocalityListKeyFor(recr.remoteDcId), tagLocalityListValue(loc));
|
||||
initialConfChanges->push_back(tr);
|
||||
self->dcId_locality[recr.remoteDcId] = loc;
|
||||
}
|
||||
|
||||
Future<RecruitRemoteFromConfigurationReply> fRemoteWorkers = brokenPromiseToNever( self->clusterController.recruitRemoteFromConfiguration.getReply( RecruitRemoteFromConfigurationRequest( self->configuration, recr.remoteDcId ) ) );
|
||||
|
||||
Reference<ILogSystem> newLogSystem = wait( oldLogSystem->newEpoch( recr, fRemoteWorkers, self->configuration, self->cstate.myDBState.recoveryCount + 1, self->dcId_locality[primaryDcId], self->dcId_locality[recr.remoteDcId] ) );
|
||||
self->logSystem = newLogSystem;
|
||||
} else {
|
||||
Reference<ILogSystem> newLogSystem = wait( oldLogSystem->newEpoch( recr, Never(), self->configuration, self->cstate.myDBState.recoveryCount + 1, tagLocalitySpecial, tagLocalitySpecial ) );
|
||||
self->logSystem = newLogSystem;
|
||||
}
|
||||
|
||||
if( self->configuration.remoteTLogReplicationFactor > 0 && !self->dcId_locality.count(recr.remoteDcId) ) {
|
||||
TraceEvent("UnknownRemoteDCID", self->dbgid).detail("remoteFound", self->dcId_locality.count(recr.remoteDcId)).detail("remoteId", printable(recr.remoteDcId));
|
||||
int8_t loc = self->getNextLocality();
|
||||
Standalone<CommitTransactionRef> tr;
|
||||
tr.set(tr.arena(), tagLocalityListKeyFor(recr.remoteDcId), tagLocalityListValue(loc));
|
||||
initialConfChanges->push_back(tr);
|
||||
self->dcId_locality[recr.remoteDcId] = loc;
|
||||
}
|
||||
|
||||
Future<RecruitRemoteFromConfigurationReply> fRemoteWorkers = self->configuration.remoteTLogReplicationFactor > 0 ? brokenPromiseToNever( self->clusterController.recruitRemoteFromConfiguration.getReply( RecruitRemoteFromConfigurationRequest( self->configuration, recr.remoteDcId ) ) ) : Never();
|
||||
|
||||
Reference<ILogSystem> newLogSystem = wait( oldLogSystem->newEpoch( recr, fRemoteWorkers, self->configuration, self->cstate.myDBState.recoveryCount + 1, self->dcId_locality[primaryDcId], self->dcId_locality[recr.remoteDcId] ) );
|
||||
self->logSystem = newLogSystem;
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
||||
|
@ -547,9 +551,11 @@ ACTOR Future<Void> recruitEverything( Reference<MasterData> self, vector<Storage
|
|||
RecruitFromConfigurationRequest( self->configuration, self->lastEpochEnd==0 ) ) ) );
|
||||
|
||||
self->primaryDcId.clear();
|
||||
self->primaryDcId.push_back(recruits.remoteDcId == self->configuration.remoteDcId ? self->configuration.primaryDcId : self->configuration.remoteDcId);
|
||||
self->remoteDcId.clear();
|
||||
self->remoteDcId.push_back(recruits.remoteDcId);
|
||||
if(recruits.remoteDcId.present()) {
|
||||
self->primaryDcId.push_back(recruits.remoteDcId == self->configuration.remoteDcId ? self->configuration.primaryDcId : self->configuration.remoteDcId);
|
||||
self->remoteDcId.push_back(recruits.remoteDcId);
|
||||
}
|
||||
|
||||
TraceEvent("MasterRecoveryState", self->dbgid)
|
||||
.detail("StatusCode", RecoveryStatus::initializing_transaction_servers)
|
||||
|
|
|
@ -1,3 +1,4 @@
|
|||
testTitle=RandomReadWriteTest
|
||||
testName=ConflictRange
|
||||
connectionFailuresDisableDuration=100000
|
||||
connectionFailuresDisableDuration=100000
|
||||
buggify=off
|
|
@ -1,4 +1,5 @@
|
|||
testTitle=RandomReadWriteTest
|
||||
testName=ConflictRange
|
||||
testReadYourWrites=true
|
||||
connectionFailuresDisableDuration=100000
|
||||
connectionFailuresDisableDuration=100000
|
||||
buggify=off
|
Loading…
Reference in New Issue