Merge pull request #6057 from sfc-gh-etschannen/fix-queue-commit

Fix a bug which prevented the tlog from spilling data
This commit is contained in:
sfc-gh-ngoyal 2021-11-29 09:56:57 -08:00 committed by GitHub
commit 90db1eb202
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 42 additions and 0 deletions

View File

@ -955,6 +955,9 @@ ACTOR Future<Void> updateStorage(TLogData* self) {
}
wait(logData->queueCommittedVersion.whenAtLeast(nextVersion));
if (logData->queueCommittedVersion.get() == std::numeric_limits<Version>::max()) {
return Void();
}
wait(delay(0, TaskPriority::UpdateStorage));
//TraceEvent("TlogUpdatePersist", self->dbgid).detail("LogId", logData->logId).detail("NextVersion", nextVersion).detail("Version", logData->version.get()).detail("PersistentDataDurableVer", logData->persistentDataDurableVersion).detail("QueueCommitVer", logData->queueCommittedVersion.get()).detail("PersistDataVer", logData->persistentDataVersion);
@ -1002,6 +1005,9 @@ ACTOR Future<Void> updateStorage(TLogData* self) {
//TraceEvent("UpdateStorageVer", logData->logId).detail("NextVersion", nextVersion).detail("PersistentDataVersion", logData->persistentDataVersion).detail("TotalSize", totalSize);
wait(logData->queueCommittedVersion.whenAtLeast(nextVersion));
if (logData->queueCommittedVersion.get() == std::numeric_limits<Version>::max()) {
return Void();
}
wait(delay(0, TaskPriority::UpdateStorage));
if (nextVersion > logData->persistentDataVersion) {
@ -1591,6 +1597,9 @@ ACTOR Future<Void> commitQueue(TLogData* self) {
wait(self->queueCommitEnd.whenAtLeast(self->queueCommitBegin) ||
self->largeDiskQueueCommitBytes.onChange());
}
if (logData->queueCommittedVersion.get() == std::numeric_limits<Version>::max()) {
break;
}
self->sharedActors.send(doQueueCommit(self, logData, missingFinalCommit));
missingFinalCommit.clear();
}
@ -2062,6 +2071,11 @@ void removeLog(TLogData* self, Reference<LogData> logData) {
} else {
throw worker_removed();
}
if (logData->queueCommittingVersion == 0) {
// If the removed tlog never attempted a queue commit, the update storage loop could become stuck waiting for
// queueCommittedVersion to advance.
logData->queueCommittedVersion.set(std::numeric_limits<Version>::max());
}
}
ACTOR Future<Void> pullAsyncData(TLogData* self,

View File

@ -1166,6 +1166,9 @@ ACTOR Future<Void> updateStorage(TLogData* self) {
}
wait(logData->queueCommittedVersion.whenAtLeast(nextVersion));
if (logData->queueCommittedVersion.get() == std::numeric_limits<Version>::max()) {
return Void();
}
wait(delay(0, TaskPriority::UpdateStorage));
//TraceEvent("TlogUpdatePersist", self->dbgid).detail("LogId", logData->logId).detail("NextVersion", nextVersion).detail("Version", logData->version.get()).detail("PersistentDataDurableVer", logData->persistentDataDurableVersion).detail("QueueCommitVer", logData->queueCommittedVersion.get()).detail("PersistDataVer", logData->persistentDataVersion);
@ -1218,6 +1221,9 @@ ACTOR Future<Void> updateStorage(TLogData* self) {
//TraceEvent("UpdateStorageVer", logData->logId).detail("NextVersion", nextVersion).detail("PersistentDataVersion", logData->persistentDataVersion).detail("TotalSize", totalSize);
wait(logData->queueCommittedVersion.whenAtLeast(nextVersion));
if (logData->queueCommittedVersion.get() == std::numeric_limits<Version>::max()) {
return Void();
}
wait(delay(0, TaskPriority::UpdateStorage));
if (nextVersion > logData->persistentDataVersion) {
@ -2033,6 +2039,9 @@ ACTOR Future<Void> commitQueue(TLogData* self) {
wait(self->queueCommitEnd.whenAtLeast(self->queueCommitBegin) ||
self->largeDiskQueueCommitBytes.onChange());
}
if (logData->queueCommittedVersion.get() == std::numeric_limits<Version>::max()) {
break;
}
self->sharedActors.send(doQueueCommit(self, logData, missingFinalCommit));
missingFinalCommit.clear();
}
@ -2512,6 +2521,11 @@ void removeLog(TLogData* self, Reference<LogData> logData) {
} else {
throw worker_removed();
}
if (logData->queueCommittingVersion == 0) {
// If the removed tlog never attempted a queue commit, the update storage loop could become stuck waiting for
// queueCommittedVersion to advance.
logData->queueCommittedVersion.set(std::numeric_limits<Version>::max());
}
}
// copy data from old gene to new gene without desiarlzing

View File

@ -1299,6 +1299,9 @@ ACTOR Future<Void> updateStorage(TLogData* self) {
}
wait(logData->queueCommittedVersion.whenAtLeast(nextVersion));
if (logData->queueCommittedVersion.get() == std::numeric_limits<Version>::max()) {
return Void();
}
wait(delay(0, TaskPriority::UpdateStorage));
//TraceEvent("TlogUpdatePersist", self->dbgid).detail("LogId", logData->logId).detail("NextVersion", nextVersion).detail("Version", logData->version.get()).detail("PersistentDataDurableVer", logData->persistentDataDurableVersion).detail("QueueCommitVer", logData->queueCommittedVersion.get()).detail("PersistDataVer", logData->persistentDataVersion);
@ -1353,6 +1356,9 @@ ACTOR Future<Void> updateStorage(TLogData* self) {
//TraceEvent("UpdateStorageVer", logData->logId).detail("NextVersion", nextVersion).detail("PersistentDataVersion", logData->persistentDataVersion).detail("TotalSize", totalSize);
wait(logData->queueCommittedVersion.whenAtLeast(nextVersion));
if (logData->queueCommittedVersion.get() == std::numeric_limits<Version>::max()) {
return Void();
}
wait(delay(0, TaskPriority::UpdateStorage));
if (nextVersion > logData->persistentDataVersion) {
@ -2081,6 +2087,9 @@ ACTOR Future<Void> commitQueue(TLogData* self) {
wait(self->queueCommitEnd.whenAtLeast(self->queueCommitBegin) ||
self->largeDiskQueueCommitBytes.onChange());
}
if (logData->queueCommittedVersion.get() == std::numeric_limits<Version>::max()) {
break;
}
self->sharedActors.send(doQueueCommit(self, logData, missingFinalCommit));
missingFinalCommit.clear();
}
@ -2598,6 +2607,11 @@ void removeLog(TLogData* self, Reference<LogData> logData) {
if (self->id_data.size() == 0) {
throw worker_removed();
}
if (logData->queueCommittingVersion == 0) {
// If the removed tlog never attempted a queue commit, the update storage loop could become stuck waiting for
// queueCommittedVersion to advance.
logData->queueCommittedVersion.set(std::numeric_limits<Version>::max());
}
}
// remote tLog pull data from log routers