diff --git a/fdbserver/CommitProxyServer.actor.cpp b/fdbserver/CommitProxyServer.actor.cpp index 7077ee7c53..953a889497 100644 --- a/fdbserver/CommitProxyServer.actor.cpp +++ b/fdbserver/CommitProxyServer.actor.cpp @@ -879,8 +879,9 @@ ACTOR Future applyMetadataToCommittedTransactions(CommitBatchContext* self // Message the sequencer to obtain the previous commit version for each storage server's tag ACTOR Future getTPCV(CommitBatchContext* self) { state ProxyCommitData* const pProxyCommitData = self->pProxyCommitData; - GetTLogPrevCommitVersionReply rep = wait(brokenPromiseToNever( - pProxyCommitData->master.getTLogPrevCommitVersion.getReply(GetTLogPrevCommitVersionRequest(self->writtenTLogs)))); + GetTLogPrevCommitVersionReply rep = + wait(brokenPromiseToNever(pProxyCommitData->master.getTLogPrevCommitVersion.getReply( + GetTLogPrevCommitVersionRequest(self->writtenTLogs)))); // TraceEvent("GetTLogPrevCommitVersionRequest"); return Void(); } @@ -1189,9 +1190,12 @@ ACTOR Future postResolution(CommitBatchContext* self) { if (self->prevVersion && self->commitVersion - self->prevVersion < SERVER_KNOBS->MAX_VERSIONS_IN_FLIGHT / 2) debug_advanceMaxCommittedVersion(UID(), self->commitVersion); //< Is this valid? - //TraceEvent("ProxyPush", pProxyCommitData->dbgid).detail("PrevVersion", prevVersion).detail("Version", commitVersion) - // .detail("TransactionsSubmitted", trs.size()).detail("TransactionsCommitted", commitCount).detail("TxsPopTo", - // msg.popTo); + // TraceEvent("ProxyPush", pProxyCommitData->dbgid) + // .detail("PrevVersion", self->prevVersion) + // .detail("Version", self->commitVersion) + // .detail("TransactionsSubmitted", trs.size()) + // .detail("TransactionsCommitted", self->commitCount) + // .detail("TxsPopTo", self->msg.popTo); if (self->prevVersion && self->commitVersion - self->prevVersion < SERVER_KNOBS->MAX_VERSIONS_IN_FLIGHT / 2) debug_advanceMaxCommittedVersion(UID(), self->commitVersion); @@ -1281,7 +1285,9 @@ ACTOR Future reply(CommitBatchContext* self) { if (self->prevVersion && self->commitVersion - self->prevVersion < SERVER_KNOBS->MAX_VERSIONS_IN_FLIGHT / 2) debug_advanceMinCommittedVersion(UID(), self->commitVersion); - //TraceEvent("ProxyPushed", pProxyCommitData->dbgid).detail("PrevVersion", prevVersion).detail("Version", commitVersion); + // TraceEvent("ProxyPushed", pProxyCommitData->dbgid) + // .detail("PrevVersion", self->prevVersion) + // .detail("Version", self->commitVersion); if (debugID.present()) g_traceBatch.addEvent("CommitDebug", debugID.get().first(), "CommitProxyServer.commitBatch.AfterLogPush"); @@ -1307,6 +1313,7 @@ ACTOR Future reply(CommitBatchContext* self) { self->lockedAfter, self->metadataVersionAfter, pProxyCommitData->minKnownCommittedVersion, + self->prevVersion, writtenTags), TaskPriority::ProxyMasterVersionReply)); } diff --git a/fdbserver/MasterInterface.h b/fdbserver/MasterInterface.h index 45c11158a3..3078af1df4 100644 --- a/fdbserver/MasterInterface.h +++ b/fdbserver/MasterInterface.h @@ -217,6 +217,7 @@ struct ReportRawCommittedVersionRequest { bool locked; Optional metadataVersion; Version minKnownCommittedVersion; + Optional prevVersion; // if present, wait for prevVersion to be committed before replying Optional> writtenTags; ReplyPromise reply; @@ -225,13 +226,14 @@ struct ReportRawCommittedVersionRequest { bool locked, Optional metadataVersion, Version minKnownCommittedVersion, + Optional prevVersion, Optional> writtenTags = Optional>()) : version(version), locked(locked), metadataVersion(metadataVersion), - minKnownCommittedVersion(minKnownCommittedVersion), writtenTags(writtenTags) {} + minKnownCommittedVersion(minKnownCommittedVersion), writtenTags(writtenTags), prevVersion(prevVersion) {} template void serialize(Ar& ar) { - serializer(ar, version, locked, metadataVersion, minKnownCommittedVersion, writtenTags, reply); + serializer(ar, version, locked, metadataVersion, minKnownCommittedVersion, prevVersion, writtenTags, reply); } }; diff --git a/fdbserver/masterserver.actor.cpp b/fdbserver/masterserver.actor.cpp index 97d2d6fe22..f395717fcd 100644 --- a/fdbserver/masterserver.actor.cpp +++ b/fdbserver/masterserver.actor.cpp @@ -183,7 +183,7 @@ struct MasterData : NonCopyable, ReferenceCounted { recoveryTransactionVersion; // The first version in this epoch double lastCommitTime; - Version liveCommittedVersion; // The largest live committed version reported by commit proxies. + NotifiedVersion liveCommittedVersion; // The largest live committed version reported by commit proxies. bool databaseLocked; Optional proxyMetadataVersion; Version minKnownCommittedVersion; @@ -1221,6 +1221,31 @@ ACTOR Future provideVersions(Reference self) { } } +void updateLiveCommittedVersion(Reference self, ReportRawCommittedVersionRequest req) { + self->minKnownCommittedVersion = std::max(self->minKnownCommittedVersion, req.minKnownCommittedVersion); + if (req.version > self->liveCommittedVersion.get()) { + self->databaseLocked = req.locked; + self->proxyMetadataVersion = req.metadataVersion; + // Note the set call switches context to any waiters on liveCommittedVersion before continuing. + self->liveCommittedVersion.set(req.version); + } + ++self->reportLiveCommittedVersionRequests; + if (SERVER_KNOBS->ENABLE_VERSION_VECTOR && req.writtenTags.present()) { + // NB: this if-condition is not needed after wait-for-prev is ported to this branch + if (req.version > self->ssVersionVector.maxVersion) { + // TraceEvent("Received ReportRawCommittedVersionRequest").detail("Version",req.version); + self->ssVersionVector.setVersions(req.writtenTags.get(), req.version); + } + } +} + +ACTOR Future waitForPrev(Reference self, ReportRawCommittedVersionRequest req) { + wait(self->liveCommittedVersion.whenAtLeast(req.prevVersion.get())); + updateLiveCommittedVersion(self, req); + req.reply.send(Void()); + return Void(); +} + ACTOR Future serveLiveCommittedVersion(Reference self) { loop { choose { @@ -1230,12 +1255,12 @@ ACTOR Future serveLiveCommittedVersion(Reference self) { req.debugID.get().first(), "MasterServer.serveLiveCommittedVersion.GetRawCommittedVersion"); - if (self->liveCommittedVersion == invalidVersion) { - self->liveCommittedVersion = self->recoveryTransactionVersion; + if (self->liveCommittedVersion.get() == invalidVersion) { + self->liveCommittedVersion.set(self->recoveryTransactionVersion); } ++self->getLiveCommittedVersionRequests; GetRawCommittedVersionReply reply; - reply.version = self->liveCommittedVersion; + reply.version = self->liveCommittedVersion.get(); reply.locked = self->databaseLocked; reply.metadataVersion = self->proxyMetadataVersion; reply.minKnownCommittedVersion = self->minKnownCommittedVersion; @@ -1244,21 +1269,14 @@ ACTOR Future serveLiveCommittedVersion(Reference self) { } when(ReportRawCommittedVersionRequest req = waitNext(self->myInterface.reportLiveCommittedVersion.getFuture())) { - self->minKnownCommittedVersion = std::max(self->minKnownCommittedVersion, req.minKnownCommittedVersion); - if (SERVER_KNOBS->ENABLE_VERSION_VECTOR && req.writtenTags.present()) { - // NB: this if-condition is not needed after wait-for-prev is ported to this branch - if (req.version > self->ssVersionVector.maxVersion) { - // TraceEvent("Received ReportRawCommittedVersionRequest").detail("Version",req.version); - self->ssVersionVector.setVersions(req.writtenTags.get(), req.version); - } + if (SERVER_KNOBS->ENABLE_VERSION_VECTOR && req.prevVersion.present() && + (self->liveCommittedVersion.get() != invalidVersion) && + (self->liveCommittedVersion.get() < req.prevVersion.get())) { + self->addActor.send(waitForPrev(self, req)); + } else { + updateLiveCommittedVersion(self, req); + req.reply.send(Void()); } - if (req.version > self->liveCommittedVersion) { - self->liveCommittedVersion = req.version; - self->databaseLocked = req.locked; - self->proxyMetadataVersion = req.metadataVersion; - } - ++self->reportLiveCommittedVersionRequests; - req.reply.send(Void()); } when(GetTLogPrevCommitVersionRequest req = waitNext(self->myInterface.getTLogPrevCommitVersion.getFuture())) {