From 1758c926837f7b7ed0d64d7e361c29d7854da1b1 Mon Sep 17 00:00:00 2001 From: Sreenath Bodagala Date: Fri, 6 Aug 2021 14:42:35 +0000 Subject: [PATCH] - Pull changes related to tlog-peeks from the version indexer branch Pull commits 5e37bc37a07ea54136865d1933758d9769ef8a94 and 95e85aaffba7e7ad8dad85774ac7370f23a1e769 from the version indexer branch. --- fdbclient/ServerKnobs.cpp | 1 + fdbclient/ServerKnobs.h | 1 + fdbserver/TLogServer.actor.cpp | 28 ++++++++++++++++++---------- 3 files changed, 20 insertions(+), 10 deletions(-) diff --git a/fdbclient/ServerKnobs.cpp b/fdbclient/ServerKnobs.cpp index 5e2c20120f..29c992fefd 100644 --- a/fdbclient/ServerKnobs.cpp +++ b/fdbclient/ServerKnobs.cpp @@ -103,6 +103,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi init( TLOG_POP_BATCH_SIZE, 1000 ); if ( randomize && BUGGIFY ) TLOG_POP_BATCH_SIZE = 10; init( TLOG_POPPED_VER_LAG_THRESHOLD_FOR_TLOGPOP_TRACE, 250e6 ); init( ENABLE_DETAILED_TLOG_POP_TRACE, true ); + init( BLOCKING_PEEK_TIMEOUT, 1.0 ); // disk snapshot max timeout, to be put in TLog, storage and coordinator nodes init( MAX_FORKED_PROCESS_OUTPUT, 1024 ); diff --git a/fdbclient/ServerKnobs.h b/fdbclient/ServerKnobs.h index 5205b881b2..1be8867913 100644 --- a/fdbclient/ServerKnobs.h +++ b/fdbclient/ServerKnobs.h @@ -106,6 +106,7 @@ public: double PUSH_STATS_SLOW_AMOUNT; double PUSH_STATS_SLOW_RATIO; int TLOG_POP_BATCH_SIZE; + double BLOCKING_PEEK_TIMEOUT; // Data distribution queue double HEALTH_POLL_TIME; diff --git a/fdbserver/TLogServer.actor.cpp b/fdbserver/TLogServer.actor.cpp index 90a2374142..fdd9c637d4 100644 --- a/fdbserver/TLogServer.actor.cpp +++ b/fdbserver/TLogServer.actor.cpp @@ -549,6 +549,8 @@ struct LogData : NonCopyable, public ReferenceCounted { CounterCollection cc; Counter bytesInput; Counter bytesDurable; + Counter blockingPeeks; + Counter blockingPeekTimeouts; UID logId; ProtocolVersion protocolVersion; @@ -630,7 +632,8 @@ struct LogData : NonCopyable, public ReferenceCounted { : stopped(false), initialized(false), queueCommittingVersion(0), knownCommittedVersion(0), durableKnownCommittedVersion(0), minKnownCommittedVersion(0), queuePoppedVersion(0), minPoppedTagVersion(0), minPoppedTag(invalidTag), unpoppedRecoveredTags(0), cc("TLog", interf.id().toString()), - bytesInput("BytesInput", cc), bytesDurable("BytesDurable", cc), logId(interf.id()), + bytesInput("BytesInput", cc), bytesDurable("BytesDurable", cc), blockingPeeks("BlockingPeeks", cc), + blockingPeekTimeouts("BlockingPeekTimeouts", cc), logId(interf.id()), protocolVersion(protocolVersion), newPersistentDataVersion(invalidVersion), tLogData(tLogData), unrecoveredBefore(1), recoveredAt(1), logSystem(new AsyncVar>()), remoteTag(remoteTag), isPrimary(isPrimary), logRouterTags(logRouterTags), logRouterPoppedVersion(0), logRouterPopToVersion(0), @@ -1526,14 +1529,19 @@ std::deque>& getVersionMessages(Refe return tagData->versionMessages; }; -ACTOR Future waitForMessagesForTag(Reference self, TLogPeekRequest* req) { +ACTOR Future waitForMessagesForTag(Reference self, TLogPeekRequest* req, double timeout) { + self->blockingPeeks += 1; auto tagData = self->getTagData(req->tag); - if (tagData.isValid()) { + if (tagData.isValid() && !tagData->versionMessages.empty() && tagData->versionMessages.back().first > req->begin) { return Void(); } - wait(self->waitingTags[req->tag].getFuture()); - // we want the caller to finish first, otherwise the data structure it is building might not be complete - wait(delay(0.0)); + choose { + when(wait(self->waitingTags[req->tag].getFuture())) { + // we want the caller to finish first, otherwise the data structure it is building might not be complete + wait(delay(0.0)); + } + when(wait(delay(timeout))) { self->blockingPeekTimeouts += 1; } + } return Void(); } @@ -1735,6 +1743,10 @@ ACTOR Future tLogPeekMessages(TLogData* self, TLogPeekRequest req, Referen return Void(); } + if (req.begin > logData->persistentDataDurableVersion && !req.onlySpilled && req.tag.locality >= 0 && + !req.returnIfBlocked) { + wait(waitForMessagesForTag(logData, &req, SERVER_KNOBS->BLOCKING_PEEK_TIMEOUT)); + } state Version endVersion = logData->version.get() + 1; state bool onlySpilled = false; @@ -1866,10 +1878,6 @@ ACTOR Future tLogPeekMessages(TLogData* self, TLogPeekRequest req, Referen if (req.onlySpilled) { endVersion = logData->persistentDataDurableVersion + 1; } else { - if (req.tag.locality >= 0 && !req.returnIfBlocked) { - // wait for at most 1 second - wait(waitForMessagesForTag(logData, &req) || delay(1.0)); - } peekMessagesFromMemory(logData, req, messages, endVersion); }