Two TLog "stopped" bug fixes: one sets the stopped state from dbinfo, the other handles a race between peek and stopping (#8001)

This commit is contained in:
Josh Slocum 2022-08-29 15:25:29 -05:00 committed by GitHub
parent cd06f8b0f1
commit fdb509c99e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 57 additions and 36 deletions

View File

@ -497,7 +497,8 @@ struct LogData : NonCopyable, public ReferenceCounted<LogData> {
*/
AsyncTrigger stopCommit;
bool stopped, initialized;
bool initialized;
Promise<Void> stoppedPromise;
DBRecoveryCount recoveryCount;
// If persistentDataVersion != persistentDurableDataVersion,
@ -641,10 +642,10 @@ struct LogData : NonCopyable, public ReferenceCounted<LogData> {
TLogSpillType logSpillType,
std::vector<Tag> tags,
std::string context)
: stopped(false), initialized(false), queueCommittingVersion(0), knownCommittedVersion(0),
durableKnownCommittedVersion(0), minKnownCommittedVersion(0), queuePoppedVersion(0), minPoppedTagVersion(0),
minPoppedTag(invalidTag), unpoppedRecoveredTagCount(0), cc("TLog", interf.id().toString()),
bytesInput("BytesInput", cc), bytesDurable("BytesDurable", cc), blockingPeeks("BlockingPeeks", cc),
: initialized(false), queueCommittingVersion(0), knownCommittedVersion(0), durableKnownCommittedVersion(0),
minKnownCommittedVersion(0), queuePoppedVersion(0), minPoppedTagVersion(0), minPoppedTag(invalidTag),
unpoppedRecoveredTagCount(0), cc("TLog", interf.id().toString()), bytesInput("BytesInput", cc),
bytesDurable("BytesDurable", cc), blockingPeeks("BlockingPeeks", cc),
blockingPeekTimeouts("BlockingPeekTimeouts", cc), emptyPeeks("EmptyPeeks", cc),
nonEmptyPeeks("NonEmptyPeeks", cc), logId(interf.id()), protocolVersion(protocolVersion),
newPersistentDataVersion(invalidVersion), tLogData(tLogData), unrecoveredBefore(1), recoveredAt(1),
@ -755,6 +756,15 @@ struct LogData : NonCopyable, public ReferenceCounted<LogData> {
waitingTags.clear();
}
}
// Reports whether this log has been stopped, i.e. whether stoppedPromise has already been set.
bool stopped() const {
	const bool alreadyStopped = stoppedPromise.isSet();
	return alreadyStopped;
}
// Marks this log as stopped by fulfilling stoppedPromise, so that waiters on
// stoppedPromise.getFuture() are released. If the promise can no longer be set
// this is a no-op and nothing is traced.
void stop() {
	if (!stoppedPromise.canBeSet()) {
		return;
	}
	TraceEvent(SevDebug, "StoppingTLog", tLogData->dbgid).detail("LogId", logId);
	stoppedPromise.send(Void());
}
};
template <class T>
@ -807,15 +817,15 @@ ACTOR Future<Void> tLogLock(TLogData* self, ReplyPromise<TLogLockResult> reply,
state Version stopVersion = logData->version.get();
CODE_PROBE(true, "TLog stopped by recovering cluster-controller");
CODE_PROBE(logData->stopped, "logData already stopped");
CODE_PROBE(!logData->stopped, "logData not yet stopped");
CODE_PROBE(logData->stopped(), "logData already stopped");
CODE_PROBE(!logData->stopped(), "logData not yet stopped");
TraceEvent("TLogStop", logData->logId)
.detail("Ver", stopVersion)
.detail("IsStopped", logData->stopped)
.detail("IsStopped", logData->stopped())
.detail("QueueCommitted", logData->queueCommittedVersion.get());
logData->stopped = true;
logData->stop();
logData->unblockWaitingPeeks();
if (!logData->recoveryComplete.isSet()) {
logData->recoveryComplete.sendError(end_of_stream());
@ -835,7 +845,7 @@ ACTOR Future<Void> tLogLock(TLogData* self, ReplyPromise<TLogLockResult> reply,
TraceEvent("TLogStop2", self->dbgid)
.detail("LogId", logData->logId)
.detail("Ver", stopVersion)
.detail("IsStopped", logData->stopped)
.detail("IsStopped", logData->stopped())
.detail("QueueCommitted", logData->queueCommittedVersion.get())
.detail("KnownCommitted", result.knownCommittedVersion);
@ -1318,7 +1328,7 @@ ACTOR Future<Void> updateStorage(TLogData* self) {
// tag; which is not intended to ever happen.
Optional<Version> cachePopVersion;
for (auto& it : self->id_data) {
if (!it.second->stopped) {
if (!it.second->stopped()) {
if (it.second->version.get() - it.second->unrecoveredBefore >
SERVER_KNOBS->MAX_VERSIONS_IN_FLIGHT + SERVER_KNOBS->MAX_CACHE_VERSIONS) {
cachePopVersion = it.second->version.get() - SERVER_KNOBS->MAX_CACHE_VERSIONS;
@ -1335,7 +1345,7 @@ ACTOR Future<Void> updateStorage(TLogData* self) {
wait(waitForAll(cachePopFutures));
}
if (logData->stopped) {
if (logData->stopped()) {
if (self->bytesInput - self->bytesDurable >= self->targetVolatileBytes) {
while (logData->persistentDataDurableVersion != logData->version.get()) {
totalSize = 0;
@ -1766,19 +1776,20 @@ Future<Void> tLogPeekMessages(PromiseType replyPromise,
wait(logData->version.whenAtLeast(reqBegin));
wait(delay(SERVER_KNOBS->TLOG_PEEK_DELAY, g_network->getCurrentTask()));
}
if (!logData->stopped && reqTag.locality != tagLocalityTxs && reqTag != txsTag &&
if (!logData->stopped() && reqTag.locality != tagLocalityTxs && reqTag != txsTag &&
logData->version.get() < logData->recoveryTxnVersion) {
DebugLogTraceEvent("TLogPeekMessages1", self->dbgid)
.detail("LogId", logData->logId)
.detail("Tag", reqTag.toString())
.detail("ReqBegin", reqBegin)
.detail("Version", logData->version.get())
.detail("RecoveryTxnVersion", logData->recoveryTxnVersion)
.detail("RecoveredAt", logData->recoveredAt);
// Make sure the peek reply has the recovery txn for the current TLog.
// Older generation TLog has been stopped and doesn't wait here.
// Similarly during recovery, reading transaction state store
// doesn't wait here.
wait(logData->version.whenAtLeast(logData->recoveryTxnVersion));
wait(logData->version.whenAtLeast(logData->recoveryTxnVersion) || logData->stoppedPromise.getFuture());
}
if (logData->locality != tagLocalitySatellite && reqTag.locality == tagLocalityLogRouter) {
@ -2200,7 +2211,7 @@ ACTOR Future<Void> commitQueue(TLogData* self) {
loop {
int foundCount = 0;
for (auto it : self->id_data) {
if (!it.second->stopped) {
if (!it.second->stopped()) {
logData = it.second;
foundCount++;
} else if (it.second->version.get() >
@ -2225,8 +2236,8 @@ ACTOR Future<Void> commitQueue(TLogData* self) {
}
loop {
if (logData->stopped && logData->version.get() == std::max(logData->queueCommittingVersion,
logData->queueCommittedVersion.get())) {
if (logData->stopped() && logData->version.get() == std::max(logData->queueCommittingVersion,
logData->queueCommittedVersion.get())) {
wait(logData->queueCommittedVersion.whenAtLeast(logData->version.get()));
break;
}
@ -2277,7 +2288,7 @@ ACTOR Future<Void> tLogCommit(TLogData* self,
}
state double waitStartT = 0;
while (self->bytesInput - self->bytesDurable >= SERVER_KNOBS->TLOG_HARD_LIMIT_BYTES && !logData->stopped) {
while (self->bytesInput - self->bytesDurable >= SERVER_KNOBS->TLOG_HARD_LIMIT_BYTES && !logData->stopped()) {
if (now() - waitStartT >= 1) {
TraceEvent(SevWarn, "TLogUpdateLag", logData->logId)
.detail("Version", logData->version.get())
@ -2288,7 +2299,7 @@ ACTOR Future<Void> tLogCommit(TLogData* self,
wait(delayJittered(.005, TaskPriority::TLogCommit));
}
if (logData->stopped) {
if (logData->stopped()) {
req.reply.sendError(tlog_stopped());
return Void();
}
@ -2337,7 +2348,7 @@ ACTOR Future<Void> tLogCommit(TLogData* self,
timeoutWarning(logData->queueCommittedVersion.whenAtLeast(req.version) || stopped, 0.1, warningCollectorInput));
if (stopped.isReady()) {
ASSERT(logData->stopped);
ASSERT(logData->stopped());
req.reply.sendError(tlog_stopped());
return Void();
}
@ -2413,9 +2424,11 @@ ACTOR Future<UID> getClusterId(TLogData* self) {
}
}
// Takes the log's stopped promise rather than a LogData* to avoid a reference cycle:
// the future returned by this actor is stored in LogData::removed.
ACTOR Future<Void> rejoinClusterController(TLogData* self,
TLogInterface tli,
DBRecoveryCount recoveryCount,
Promise<Void> stoppedPromise,
Future<Void> registerWithCC,
bool isPrimary) {
state LifetimeToken lastMasterLifetime;
@ -2456,6 +2469,13 @@ ACTOR Future<Void> rejoinClusterController(TLogData* self,
if (BUGGIFY)
wait(delay(SERVER_KNOBS->BUGGIFY_WORKER_REMOVED_MAX_LAG * deterministicRandom()->random01()));
throw worker_removed();
} else if (inf.recoveryCount > recoveryCount && stoppedPromise.canBeSet()) {
CODE_PROBE(true, "Stopping tlog because new dbinfo has a higher recovery count");
TraceEvent("StoppingTLog", self->dbgid)
.detail("LogId", tli.id())
.detail("NewRecoveryCount", inf.recoveryCount)
.detail("MyRecoveryCount", recoveryCount);
stoppedPromise.send(Void());
}
if (registerWithCC.isReady()) {
@ -2631,7 +2651,7 @@ ACTOR Future<Void> serveTLogInterface(TLogData* self,
logData->locality);
}
if (!logData->isPrimary && logData->stopped) {
if (!logData->isPrimary && logData->stopped()) {
TraceEvent("TLogAlreadyStopped", self->dbgid).detail("LogId", logData->logId);
logData->removed = logData->removed && logData->logSystem->get()->endEpoch();
}
@ -2665,8 +2685,8 @@ ACTOR Future<Void> serveTLogInterface(TLogData* self,
when(TLogCommitRequest req = waitNext(tli.commit.getFuture())) {
//TraceEvent("TLogCommitReq", logData->logId).detail("Ver", req.version).detail("PrevVer", req.prevVersion).detail("LogVer", logData->version.get());
ASSERT(logData->isPrimary);
CODE_PROBE(logData->stopped, "TLogCommitRequest while stopped");
if (!logData->stopped)
CODE_PROBE(logData->stopped(), "TLogCommitRequest while stopped");
if (!logData->stopped())
logData->addActor.send(tLogCommit(self, req, logData, warningCollectorInput));
else
req.reply.sendError(tlog_stopped());
@ -2683,7 +2703,7 @@ ACTOR Future<Void> serveTLogInterface(TLogData* self,
g_traceBatch.addAttach("TransactionAttachID", req.debugID.get().first(), tlogDebugID.first());
g_traceBatch.addEvent("TransactionDebug", tlogDebugID.first(), "TLogServer.TLogConfirmRunningRequest");
}
if (!logData->stopped)
if (!logData->stopped())
req.reply.send(Void());
else
req.reply.sendError(tlog_stopped());
@ -2716,7 +2736,7 @@ void removeLog(TLogData* self, Reference<LogData> logData) {
.detail("LogId", logData->logId)
.detail("Input", logData->bytesInput.getValue())
.detail("Durable", logData->bytesDurable.getValue());
logData->stopped = true;
logData->stop();
logData->unblockWaitingPeeks();
if (!logData->recoveryComplete.isSet()) {
logData->recoveryComplete.sendError(end_of_stream());
@ -2776,7 +2796,7 @@ ACTOR Future<Void> pullAsyncData(TLogData* self,
}
state double waitStartT = 0;
while (self->bytesInput - self->bytesDurable >= SERVER_KNOBS->TLOG_HARD_LIMIT_BYTES && !logData->stopped) {
while (self->bytesInput - self->bytesDurable >= SERVER_KNOBS->TLOG_HARD_LIMIT_BYTES && !logData->stopped()) {
if (now() - waitStartT >= 1) {
TraceEvent(SevWarn, "TLogUpdateLag", logData->logId)
.detail("Version", logData->version.get())
@ -2795,7 +2815,7 @@ ACTOR Future<Void> pullAsyncData(TLogData* self,
if (!foundMessage || r->version().version != ver) {
ASSERT(r->version().version > lastVer);
if (ver) {
if (logData->stopped || (endVersion.present() && ver > endVersion.get())) {
if (logData->stopped() || (endVersion.present() && ver > endVersion.get())) {
return Void();
}
@ -2836,7 +2856,7 @@ ACTOR Future<Void> pullAsyncData(TLogData* self,
if (!foundMessage) {
ver--;
if (ver > logData->version.get()) {
if (logData->stopped || (endVersion.present() && ver > endVersion.get())) {
if (logData->stopped() || (endVersion.present() && ver > endVersion.get())) {
return Void();
}
@ -3129,7 +3149,7 @@ ACTOR Future<Void> restorePersistentState(TLogData* self,
std::vector<Tag>(),
"Restored");
logData->locality = id_locality[id1];
logData->stopped = true;
logData->stop();
logData->unblockWaitingPeeks();
self->id_data[id1] = logData;
id_interf[id1] = recruited;
@ -3141,8 +3161,8 @@ ACTOR Future<Void> restorePersistentState(TLogData* self,
logData->version.set(ver);
logData->recoveryCount =
BinaryReader::fromStringRef<DBRecoveryCount>(fRecoverCounts.get()[idx].value, Unversioned());
logData->removed =
rejoinClusterController(self, recruited, logData->recoveryCount, registerWithCC.getFuture(), false);
logData->removed = rejoinClusterController(
self, recruited, logData->recoveryCount, logData->stoppedPromise, registerWithCC.getFuture(), false);
removed.push_back(errorOr(logData->removed));
logsByVersion.emplace_back(ver, id1);
@ -3335,7 +3355,7 @@ ACTOR Future<Void> updateLogSystem(TLogData* self,
void stopAllTLogs(TLogData* self, UID newLogId) {
for (auto it : self->id_data) {
if (!it.second->stopped) {
if (!it.second->stopped()) {
TraceEvent("TLogStoppedByNewRecruitment", self->dbgid)
.detail("LogId", it.second->logId)
.detail("StoppedId", it.first.toString())
@ -3348,7 +3368,7 @@ void stopAllTLogs(TLogData* self, UID newLogId) {
it.second->committingQueue.sendError(worker_removed());
}
}
it.second->stopped = true;
it.second->stop();
it.second->unblockWaitingPeeks();
if (!it.second->recoveryComplete.isSet()) {
it.second->recoveryComplete.sendError(end_of_stream());
@ -3393,7 +3413,8 @@ ACTOR Future<Void> tLogStart(TLogData* self, InitializeTLogRequest req, Locality
logData->locality = req.locality;
logData->recoveryCount = req.epoch;
logData->recoveryTxnVersion = req.recoveryTransactionVersion;
logData->removed = rejoinClusterController(self, recruited, req.epoch, Future<Void>(Void()), req.isPrimary);
logData->removed = rejoinClusterController(
self, recruited, req.epoch, logData->stoppedPromise, Future<Void>(Void()), req.isPrimary);
self->popOrder.push_back(recruited.id());
self->spillOrder.push_back(recruited.id());
@ -3441,7 +3462,7 @@ ACTOR Future<Void> tLogStart(TLogData* self, InitializeTLogRequest req, Locality
logData->initialized = true;
self->newLogData.trigger();
if ((req.isPrimary || req.recoverFrom.logRouterTags == 0) && !logData->stopped &&
if ((req.isPrimary || req.recoverFrom.logRouterTags == 0) && !logData->stopped() &&
logData->unrecoveredBefore <= recoverAt) {
if (req.recoverFrom.logRouterTags > 0 && req.locality != tagLocalitySatellite) {
logData->logRouterPopToVersion = recoverAt;
@ -3462,7 +3483,7 @@ ACTOR Future<Void> tLogStart(TLogData* self, InitializeTLogRequest req, Locality
state Version lastVersionPrevEpoch = req.recoverAt;
if ((req.isPrimary || req.recoverFrom.logRouterTags == 0) &&
logData->version.get() < lastVersionPrevEpoch && !logData->stopped) {
logData->version.get() < lastVersionPrevEpoch && !logData->stopped()) {
// Log the changes to the persistent queue, to be committed by commitQueue()
TLogQueueEntryRef qe;
qe.version = lastVersionPrevEpoch;