fix: the shared tlog could fail to update a stopped tlog’s queueCommitVersion to version if a second tlog registered before it could issue the first commit for the tlog

This commit is contained in:
Evan Tschannen 2019-03-29 20:11:30 -07:00
parent 5dd6396eed
commit a44ffd851e
1 changed files with 11 additions and 2 deletions

View File

@ -1466,7 +1466,7 @@ ACTOR Future<Void> watchDegraded(TLogData* self) {
return Void();
}
ACTOR Future<Void> doQueueCommit( TLogData* self, Reference<LogData> logData ) {
ACTOR Future<Void> doQueueCommit( TLogData* self, Reference<LogData> logData, std::vector<Reference<LogData>> missingFinalCommit ) {
state Version ver = logData->version.get();
state Version commitNumber = self->queueCommitBegin+1;
state Version knownCommittedVersion = logData->knownCommittedVersion;
@ -1507,6 +1507,11 @@ ACTOR Future<Void> doQueueCommit( TLogData* self, Reference<LogData> logData ) {
logData->queueCommittedVersion.set(ver);
self->queueCommitEnd.set(commitNumber);
for(auto& it : missingFinalCommit) {
TraceEvent("TLogCommitMissingFinalCommit", self->dbgid).detail("LogId", logData->logId).detail("Version", it->version.get()).detail("QueueVer", it->queueCommittedVersion.get());
TEST(true); //A TLog was replaced before having a change to commit its queue
it->queueCommittedVersion.set(it->version.get());
}
return Void();
}
@ -1515,10 +1520,13 @@ ACTOR Future<Void> commitQueue( TLogData* self ) {
loop {
int foundCount = 0;
state std::vector<Reference<LogData>> missingFinalCommit;
for(auto it : self->id_data) {
if(!it.second->stopped) {
logData = it.second;
foundCount++;
} else if(it.second->version.get() > std::max(it.second->queueCommittingVersion, it.second->queueCommittedVersion.get())) {
missingFinalCommit.push_back(it.second);
}
}
@ -1544,7 +1552,8 @@ ACTOR Future<Void> commitQueue( TLogData* self ) {
while( self->queueCommitBegin != self->queueCommitEnd.get() && !self->largeDiskQueueCommitBytes.get() ) {
wait( self->queueCommitEnd.whenAtLeast(self->queueCommitBegin) || self->largeDiskQueueCommitBytes.onChange() );
}
self->sharedActors.send(doQueueCommit(self, logData));
self->sharedActors.send(doQueueCommit(self, logData, missingFinalCommit));
missingFinalCommit.clear();
}
when(wait(self->newLogData.onTrigger())) {}
}