Small code refactoring
This commit is contained in:
parent
de8d953865
commit
442738b6db
|
@ -2152,9 +2152,7 @@ void removeLog( TLogData* self, Reference<LogData> logData ) {
|
|||
self->popOrder.pop_front();
|
||||
}
|
||||
|
||||
if(self->id_data.size()) {
|
||||
return;
|
||||
} else {
|
||||
if (self->id_data.size() == 0) {
|
||||
throw worker_removed();
|
||||
}
|
||||
}
|
||||
|
@ -2605,27 +2603,25 @@ bool tlogTerminated( TLogData* self, IKeyValueStore* persistentData, TLogQueue*
|
|||
|
||||
ACTOR Future<Void> updateLogSystem(TLogData* self, Reference<LogData> logData, LogSystemConfig recoverFrom, Reference<AsyncVar<Reference<ILogSystem>>> logSystem) {
|
||||
loop {
|
||||
bool found = false;
|
||||
if(self->dbInfo->get().logSystemConfig.recruitmentID == logData->recruitmentID) {
|
||||
if( self->dbInfo->get().logSystemConfig.isNextGenerationOf(recoverFrom) ) {
|
||||
bool found = self->dbInfo->get().logSystemConfig.recruitmentID == logData->recruitmentID;
|
||||
if (found) {
|
||||
if (self->dbInfo->get().logSystemConfig.isNextGenerationOf(recoverFrom)) {
|
||||
logSystem->set(ILogSystem::fromOldLogSystemConfig( logData->logId, self->dbInfo->get().myLocality, self->dbInfo->get().logSystemConfig ));
|
||||
found = true;
|
||||
} else if( self->dbInfo->get().logSystemConfig.isEqualIds(recoverFrom) ) {
|
||||
} else if (self->dbInfo->get().logSystemConfig.isEqualIds(recoverFrom)) {
|
||||
logSystem->set(ILogSystem::fromLogSystemConfig( logData->logId, self->dbInfo->get().myLocality, self->dbInfo->get().logSystemConfig, false, true ));
|
||||
found = true;
|
||||
}
|
||||
else if( self->dbInfo->get().recoveryState >= RecoveryState::ACCEPTING_COMMITS ) {
|
||||
} else if (self->dbInfo->get().recoveryState >= RecoveryState::ACCEPTING_COMMITS) {
|
||||
logSystem->set(ILogSystem::fromLogSystemConfig( logData->logId, self->dbInfo->get().myLocality, self->dbInfo->get().logSystemConfig, true ));
|
||||
found = true;
|
||||
} else {
|
||||
found = false;
|
||||
}
|
||||
}
|
||||
if( !found ) {
|
||||
if (!found) {
|
||||
logSystem->set(Reference<ILogSystem>());
|
||||
} else {
|
||||
logData->logSystem->get()->pop(logData->logRouterPoppedVersion, logData->remoteTag, logData->durableKnownCommittedVersion, logData->locality);
|
||||
}
|
||||
TraceEvent("TLogUpdate", self->dbgid).detail("LogId", logData->logId).detail("RecruitmentID", logData->recruitmentID).detail("DbRecruitmentID", self->dbInfo->get().logSystemConfig.recruitmentID).detail("RecoverFrom", recoverFrom.toString()).detail("DbInfo", self->dbInfo->get().logSystemConfig.toString()).detail("Found", found).detail("LogSystem", (bool) logSystem->get() ).detail("RecoveryState", (int)self->dbInfo->get().recoveryState);
|
||||
for(auto it : self->dbInfo->get().logSystemConfig.oldTLogs) {
|
||||
for (const auto& it : self->dbInfo->get().logSystemConfig.oldTLogs) {
|
||||
TraceEvent("TLogUpdateOld", self->dbgid).detail("LogId", logData->logId).detail("DbInfo", it.toString());
|
||||
}
|
||||
wait( self->dbInfo->onChange() );
|
||||
|
|
|
@ -1880,11 +1880,14 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
|
|||
}
|
||||
|
||||
state vector<Future<TLogInterface>> logRouterInitializationReplies;
|
||||
for( int i = 0; i < self->logRouterTags; i++) {
|
||||
const Version startVersion = oldLogSystem->logRouterTags == 0
|
||||
? oldLogSystem->recoverAt.get() + 1
|
||||
: std::max(self->tLogs[0]->startVersion, logSet->startVersion);
|
||||
for (int i = 0; i < self->logRouterTags; i++) {
|
||||
InitializeLogRouterRequest req;
|
||||
req.recoveryCount = recoveryCount;
|
||||
req.routerTag = Tag(tagLocalityLogRouter, i);
|
||||
req.startVersion = oldLogSystem->logRouterTags == 0 ? oldLogSystem->recoverAt.get() + 1 : std::max(self->tLogs[0]->startVersion, logSet->startVersion);
|
||||
req.startVersion = startVersion;
|
||||
req.tLogLocalities = localities;
|
||||
req.tLogPolicy = logSet->tLogPolicy;
|
||||
req.locality = remoteLocality;
|
||||
|
|
Loading…
Reference in New Issue