From fdb509c99e288e4b25118ddac2ee0a408ad44fff Mon Sep 17 00:00:00 2001 From: Josh Slocum Date: Mon, 29 Aug 2022 15:25:29 -0500 Subject: [PATCH] 2 TLog stopped bug fixes - one setting stop from dbinfo, the other handling a race between peek and stopping (#8001) --- fdbserver/TLogServer.actor.cpp | 93 +++++++++++++++++++++------------- 1 file changed, 57 insertions(+), 36 deletions(-) diff --git a/fdbserver/TLogServer.actor.cpp b/fdbserver/TLogServer.actor.cpp index 1cee836298..fadb717ff1 100644 --- a/fdbserver/TLogServer.actor.cpp +++ b/fdbserver/TLogServer.actor.cpp @@ -497,7 +497,8 @@ struct LogData : NonCopyable, public ReferenceCounted { */ AsyncTrigger stopCommit; - bool stopped, initialized; + bool initialized; + Promise stoppedPromise; DBRecoveryCount recoveryCount; // If persistentDataVersion != persistentDurableDataVersion, @@ -641,10 +642,10 @@ struct LogData : NonCopyable, public ReferenceCounted { TLogSpillType logSpillType, std::vector tags, std::string context) - : stopped(false), initialized(false), queueCommittingVersion(0), knownCommittedVersion(0), - durableKnownCommittedVersion(0), minKnownCommittedVersion(0), queuePoppedVersion(0), minPoppedTagVersion(0), - minPoppedTag(invalidTag), unpoppedRecoveredTagCount(0), cc("TLog", interf.id().toString()), - bytesInput("BytesInput", cc), bytesDurable("BytesDurable", cc), blockingPeeks("BlockingPeeks", cc), + : initialized(false), queueCommittingVersion(0), knownCommittedVersion(0), durableKnownCommittedVersion(0), + minKnownCommittedVersion(0), queuePoppedVersion(0), minPoppedTagVersion(0), minPoppedTag(invalidTag), + unpoppedRecoveredTagCount(0), cc("TLog", interf.id().toString()), bytesInput("BytesInput", cc), + bytesDurable("BytesDurable", cc), blockingPeeks("BlockingPeeks", cc), blockingPeekTimeouts("BlockingPeekTimeouts", cc), emptyPeeks("EmptyPeeks", cc), nonEmptyPeeks("NonEmptyPeeks", cc), logId(interf.id()), protocolVersion(protocolVersion), newPersistentDataVersion(invalidVersion), tLogData(tLogData), unrecoveredBefore(1), recoveredAt(1), @@ -755,6 +756,15 @@ struct LogData : NonCopyable, public ReferenceCounted { waitingTags.clear(); } } + + bool stopped() const { return stoppedPromise.isSet(); } + + void stop() { + if (stoppedPromise.canBeSet()) { + TraceEvent(SevDebug, "StoppingTLog", tLogData->dbgid).detail("LogId", logId); + stoppedPromise.send(Void()); + } + } }; template @@ -807,15 +817,15 @@ ACTOR Future tLogLock(TLogData* self, ReplyPromise reply, state Version stopVersion = logData->version.get(); CODE_PROBE(true, "TLog stopped by recovering cluster-controller"); - CODE_PROBE(logData->stopped, "logData already stopped"); - CODE_PROBE(!logData->stopped, "logData not yet stopped"); + CODE_PROBE(logData->stopped(), "logData already stopped"); + CODE_PROBE(!logData->stopped(), "logData not yet stopped"); TraceEvent("TLogStop", logData->logId) .detail("Ver", stopVersion) - .detail("IsStopped", logData->stopped) + .detail("IsStopped", logData->stopped()) .detail("QueueCommitted", logData->queueCommittedVersion.get()); - logData->stopped = true; + logData->stop(); logData->unblockWaitingPeeks(); if (!logData->recoveryComplete.isSet()) { logData->recoveryComplete.sendError(end_of_stream()); @@ -835,7 +845,7 @@ ACTOR Future tLogLock(TLogData* self, ReplyPromise reply, TraceEvent("TLogStop2", self->dbgid) .detail("LogId", logData->logId) .detail("Ver", stopVersion) - .detail("IsStopped", logData->stopped) + .detail("IsStopped", logData->stopped()) .detail("QueueCommitted", logData->queueCommittedVersion.get()) .detail("KnownCommitted", result.knownCommittedVersion); @@ -1318,7 +1328,7 @@ ACTOR Future updateStorage(TLogData* self) { // tag; which is not intended to ever happen. Optional cachePopVersion; for (auto& it : self->id_data) { - if (!it.second->stopped) { + if (!it.second->stopped()) { if (it.second->version.get() - it.second->unrecoveredBefore > SERVER_KNOBS->MAX_VERSIONS_IN_FLIGHT + SERVER_KNOBS->MAX_CACHE_VERSIONS) { cachePopVersion = it.second->version.get() - SERVER_KNOBS->MAX_CACHE_VERSIONS; @@ -1335,7 +1345,7 @@ ACTOR Future updateStorage(TLogData* self) { wait(waitForAll(cachePopFutures)); } - if (logData->stopped) { + if (logData->stopped()) { if (self->bytesInput - self->bytesDurable >= self->targetVolatileBytes) { while (logData->persistentDataDurableVersion != logData->version.get()) { totalSize = 0; @@ -1766,19 +1776,20 @@ Future tLogPeekMessages(PromiseType replyPromise, wait(logData->version.whenAtLeast(reqBegin)); wait(delay(SERVER_KNOBS->TLOG_PEEK_DELAY, g_network->getCurrentTask())); } - if (!logData->stopped && reqTag.locality != tagLocalityTxs && reqTag != txsTag && + if (!logData->stopped() && reqTag.locality != tagLocalityTxs && reqTag != txsTag && logData->version.get() < logData->recoveryTxnVersion) { DebugLogTraceEvent("TLogPeekMessages1", self->dbgid) .detail("LogId", logData->logId) .detail("Tag", reqTag.toString()) .detail("ReqBegin", reqBegin) .detail("Version", logData->version.get()) + .detail("RecoveryTxnVersion", logData->recoveryTxnVersion) .detail("RecoveredAt", logData->recoveredAt); // Make sure the peek reply has the recovery txn for the current TLog. // Older generation TLog has been stopped and doesn't wait here. // Similarly during recovery, reading transaction state store // doesn't wait here. - wait(logData->version.whenAtLeast(logData->recoveryTxnVersion)); + wait(logData->version.whenAtLeast(logData->recoveryTxnVersion) || logData->stoppedPromise.getFuture()); } if (logData->locality != tagLocalitySatellite && reqTag.locality == tagLocalityLogRouter) { @@ -2200,7 +2211,7 @@ ACTOR Future commitQueue(TLogData* self) { loop { int foundCount = 0; for (auto it : self->id_data) { - if (!it.second->stopped) { + if (!it.second->stopped()) { logData = it.second; foundCount++; } else if (it.second->version.get() > @@ -2225,8 +2236,8 @@ ACTOR Future commitQueue(TLogData* self) { } loop { - if (logData->stopped && logData->version.get() == std::max(logData->queueCommittingVersion, - logData->queueCommittedVersion.get())) { + if (logData->stopped() && logData->version.get() == std::max(logData->queueCommittingVersion, + logData->queueCommittedVersion.get())) { wait(logData->queueCommittedVersion.whenAtLeast(logData->version.get())); break; } @@ -2277,7 +2288,7 @@ ACTOR Future tLogCommit(TLogData* self, } state double waitStartT = 0; - while (self->bytesInput - self->bytesDurable >= SERVER_KNOBS->TLOG_HARD_LIMIT_BYTES && !logData->stopped) { + while (self->bytesInput - self->bytesDurable >= SERVER_KNOBS->TLOG_HARD_LIMIT_BYTES && !logData->stopped()) { if (now() - waitStartT >= 1) { TraceEvent(SevWarn, "TLogUpdateLag", logData->logId) .detail("Version", logData->version.get()) @@ -2288,7 +2299,7 @@ ACTOR Future tLogCommit(TLogData* self, wait(delayJittered(.005, TaskPriority::TLogCommit)); } - if (logData->stopped) { + if (logData->stopped()) { req.reply.sendError(tlog_stopped()); return Void(); } @@ -2337,7 +2348,7 @@ ACTOR Future tLogCommit(TLogData* self, timeoutWarning(logData->queueCommittedVersion.whenAtLeast(req.version) || stopped, 0.1, warningCollectorInput)); if (stopped.isReady()) { - ASSERT(logData->stopped); + ASSERT(logData->stopped()); req.reply.sendError(tlog_stopped()); return Void(); } @@ -2413,9 +2424,11 @@ ACTOR Future getClusterId(TLogData* self) { } } +// send stopped promise instead of LogData* to avoid reference cycles ACTOR Future rejoinClusterController(TLogData* self, TLogInterface tli, DBRecoveryCount recoveryCount, + Promise stoppedPromise, Future registerWithCC, bool isPrimary) { state LifetimeToken lastMasterLifetime; @@ -2456,6 +2469,13 @@ ACTOR Future rejoinClusterController(TLogData* self, if (BUGGIFY) wait(delay(SERVER_KNOBS->BUGGIFY_WORKER_REMOVED_MAX_LAG * deterministicRandom()->random01())); throw worker_removed(); + } else if (inf.recoveryCount > recoveryCount && stoppedPromise.canBeSet()) { + CODE_PROBE(true, "Stopping tlog because new dbinfo has a higher recovery count"); + TraceEvent("StoppingTLog", self->dbgid) + .detail("LogId", tli.id()) + .detail("NewRecoveryCount", inf.recoveryCount) + .detail("MyRecoveryCount", recoveryCount); + stoppedPromise.send(Void()); } if (registerWithCC.isReady()) { @@ -2631,7 +2651,7 @@ ACTOR Future serveTLogInterface(TLogData* self, logData->locality); } - if (!logData->isPrimary && logData->stopped) { + if (!logData->isPrimary && logData->stopped()) { TraceEvent("TLogAlreadyStopped", self->dbgid).detail("LogId", logData->logId); logData->removed = logData->removed && logData->logSystem->get()->endEpoch(); } @@ -2665,8 +2685,8 @@ ACTOR Future serveTLogInterface(TLogData* self, when(TLogCommitRequest req = waitNext(tli.commit.getFuture())) { //TraceEvent("TLogCommitReq", logData->logId).detail("Ver", req.version).detail("PrevVer", req.prevVersion).detail("LogVer", logData->version.get()); ASSERT(logData->isPrimary); - CODE_PROBE(logData->stopped, "TLogCommitRequest while stopped"); - if (!logData->stopped) + CODE_PROBE(logData->stopped(), "TLogCommitRequest while stopped"); + if (!logData->stopped()) logData->addActor.send(tLogCommit(self, req, logData, warningCollectorInput)); else req.reply.sendError(tlog_stopped()); @@ -2683,7 +2703,7 @@ ACTOR Future serveTLogInterface(TLogData* self, g_traceBatch.addAttach("TransactionAttachID", req.debugID.get().first(), tlogDebugID.first()); g_traceBatch.addEvent("TransactionDebug", tlogDebugID.first(), "TLogServer.TLogConfirmRunningRequest"); } - if (!logData->stopped) + if (!logData->stopped()) req.reply.send(Void()); else req.reply.sendError(tlog_stopped()); @@ -2716,7 +2736,7 @@ void removeLog(TLogData* self, Reference logData) { .detail("LogId", logData->logId) .detail("Input", logData->bytesInput.getValue()) .detail("Durable", logData->bytesDurable.getValue()); - logData->stopped = true; + logData->stop(); logData->unblockWaitingPeeks(); if (!logData->recoveryComplete.isSet()) { logData->recoveryComplete.sendError(end_of_stream()); @@ -2776,7 +2796,7 @@ ACTOR Future pullAsyncData(TLogData* self, } state double waitStartT = 0; - while (self->bytesInput - self->bytesDurable >= SERVER_KNOBS->TLOG_HARD_LIMIT_BYTES && !logData->stopped) { + while (self->bytesInput - self->bytesDurable >= SERVER_KNOBS->TLOG_HARD_LIMIT_BYTES && !logData->stopped()) { if (now() - waitStartT >= 1) { TraceEvent(SevWarn, "TLogUpdateLag", logData->logId) .detail("Version", logData->version.get()) @@ -2795,7 +2815,7 @@ ACTOR Future pullAsyncData(TLogData* self, if (!foundMessage || r->version().version != ver) { ASSERT(r->version().version > lastVer); if (ver) { - if (logData->stopped || (endVersion.present() && ver > endVersion.get())) { + if (logData->stopped() || (endVersion.present() && ver > endVersion.get())) { return Void(); } @@ -2836,7 +2856,7 @@ ACTOR Future pullAsyncData(TLogData* self, if (!foundMessage) { ver--; if (ver > logData->version.get()) { - if (logData->stopped || (endVersion.present() && ver > endVersion.get())) { + if (logData->stopped() || (endVersion.present() && ver > endVersion.get())) { return Void(); } @@ -3129,7 +3149,7 @@ ACTOR Future restorePersistentState(TLogData* self, std::vector(), "Restored"); logData->locality = id_locality[id1]; - logData->stopped = true; + logData->stop(); logData->unblockWaitingPeeks(); self->id_data[id1] = logData; id_interf[id1] = recruited; @@ -3141,8 +3161,8 @@ ACTOR Future restorePersistentState(TLogData* self, logData->version.set(ver); logData->recoveryCount = BinaryReader::fromStringRef(fRecoverCounts.get()[idx].value, Unversioned()); - logData->removed = - rejoinClusterController(self, recruited, logData->recoveryCount, registerWithCC.getFuture(), false); + logData->removed = rejoinClusterController( + self, recruited, logData->recoveryCount, logData->stoppedPromise, registerWithCC.getFuture(), false); removed.push_back(errorOr(logData->removed)); logsByVersion.emplace_back(ver, id1); @@ -3335,7 +3355,7 @@ ACTOR Future updateLogSystem(TLogData* self, void stopAllTLogs(TLogData* self, UID newLogId) { for (auto it : self->id_data) { - if (!it.second->stopped) { + if (!it.second->stopped()) { TraceEvent("TLogStoppedByNewRecruitment", self->dbgid) .detail("LogId", it.second->logId) .detail("StoppedId", it.first.toString()) @@ -3348,7 +3368,7 @@ void stopAllTLogs(TLogData* self, UID newLogId) { it.second->committingQueue.sendError(worker_removed()); } } - it.second->stopped = true; + it.second->stop(); it.second->unblockWaitingPeeks(); if (!it.second->recoveryComplete.isSet()) { it.second->recoveryComplete.sendError(end_of_stream()); @@ -3393,7 +3413,8 @@ ACTOR Future tLogStart(TLogData* self, InitializeTLogRequest req, Locality logData->locality = req.locality; logData->recoveryCount = req.epoch; logData->recoveryTxnVersion = req.recoveryTransactionVersion; - logData->removed = rejoinClusterController(self, recruited, req.epoch, Future(Void()), req.isPrimary); + logData->removed = rejoinClusterController( + self, recruited, req.epoch, logData->stoppedPromise, Future(Void()), req.isPrimary); self->popOrder.push_back(recruited.id()); self->spillOrder.push_back(recruited.id()); @@ -3441,7 +3462,7 @@ ACTOR Future tLogStart(TLogData* self, InitializeTLogRequest req, Locality logData->initialized = true; self->newLogData.trigger(); - if ((req.isPrimary || req.recoverFrom.logRouterTags == 0) && !logData->stopped && + if ((req.isPrimary || req.recoverFrom.logRouterTags == 0) && !logData->stopped() && logData->unrecoveredBefore <= recoverAt) { if (req.recoverFrom.logRouterTags > 0 && req.locality != tagLocalitySatellite) { logData->logRouterPopToVersion = recoverAt; @@ -3462,7 +3483,7 @@ ACTOR Future tLogStart(TLogData* self, InitializeTLogRequest req, Locality state Version lastVersionPrevEpoch = req.recoverAt; if ((req.isPrimary || req.recoverFrom.logRouterTags == 0) && - logData->version.get() < lastVersionPrevEpoch && !logData->stopped) { + logData->version.get() < lastVersionPrevEpoch && !logData->stopped()) { // Log the changes to the persistent queue, to be committed by commitQueue() TLogQueueEntryRef qe; qe.version = lastVersionPrevEpoch;