From cec744cebf242a1aa9913a9d41a30f561b785a99 Mon Sep 17 00:00:00 2001 From: Sreenath Bodagala Date: Tue, 10 Aug 2021 19:47:18 +0000 Subject: [PATCH] - Address the following issues: - Sequencer should update the version vector once for a given commit version (irrespective of the number of times that it receives and processes the ReportRawCommittedVersionRequest message for that commit version). Issue found by simulation tests. - Storage server should take both its latest commit version and the read version into account while processing a read request. This is to address transaction_too_old error that we saw while running tests with mako (and also in YCSB tests). - Do not enable the tlog blocking-peek logic if ENABLE_VERSION_VECTOR flag is set to false. --- fdbserver/TLogServer.actor.cpp | 39 ++++++++++--------- fdbserver/masterserver.actor.cpp | 8 ++-- fdbserver/storageserver.actor.cpp | 64 ++++++++++++++++++++++++------- 3 files changed, 75 insertions(+), 36 deletions(-) diff --git a/fdbserver/TLogServer.actor.cpp b/fdbserver/TLogServer.actor.cpp index fdd9c637d4..369d962eea 100644 --- a/fdbserver/TLogServer.actor.cpp +++ b/fdbserver/TLogServer.actor.cpp @@ -633,13 +633,12 @@ struct LogData : NonCopyable, public ReferenceCounted { durableKnownCommittedVersion(0), minKnownCommittedVersion(0), queuePoppedVersion(0), minPoppedTagVersion(0), minPoppedTag(invalidTag), unpoppedRecoveredTags(0), cc("TLog", interf.id().toString()), bytesInput("BytesInput", cc), bytesDurable("BytesDurable", cc), blockingPeeks("BlockingPeeks", cc), - blockingPeekTimeouts("BlockingPeekTimeouts", cc), logId(interf.id()), - protocolVersion(protocolVersion), newPersistentDataVersion(invalidVersion), tLogData(tLogData), - unrecoveredBefore(1), recoveredAt(1), logSystem(new AsyncVar>()), remoteTag(remoteTag), - isPrimary(isPrimary), logRouterTags(logRouterTags), logRouterPoppedVersion(0), logRouterPopToVersion(0), - locality(tagLocalityInvalid), recruitmentID(recruitmentID), logSpillType(logSpillType), - allTags(tags.begin(), tags.end()), terminated(tLogData->terminated.getFuture()), execOpCommitInProgress(false), - txsTags(txsTags) { + blockingPeekTimeouts("BlockingPeekTimeouts", cc), logId(interf.id()), protocolVersion(protocolVersion), + newPersistentDataVersion(invalidVersion), tLogData(tLogData), unrecoveredBefore(1), recoveredAt(1), + logSystem(new AsyncVar>()), remoteTag(remoteTag), isPrimary(isPrimary), + logRouterTags(logRouterTags), logRouterPoppedVersion(0), logRouterPopToVersion(0), locality(tagLocalityInvalid), + recruitmentID(recruitmentID), logSpillType(logSpillType), allTags(tags.begin(), tags.end()), + terminated(tLogData->terminated.getFuture()), execOpCommitInProgress(false), txsTags(txsTags) { startRole(Role::TRANSACTION_LOG, interf.id(), tLogData->workerID, @@ -1170,17 +1169,19 @@ ACTOR Future tLogPopCore(TLogData* self, Tag inputTag, Version to, Referen } uint64_t PoppedVersionLag = logData->persistentDataDurableVersion - logData->queuePoppedVersion; - if ( SERVER_KNOBS->ENABLE_DETAILED_TLOG_POP_TRACE && - (logData->queuePoppedVersion > 0) && //avoid generating massive events at beginning - (tagData->unpoppedRecovered || PoppedVersionLag >= SERVER_KNOBS->TLOG_POPPED_VER_LAG_THRESHOLD_FOR_TLOGPOP_TRACE)) { //when recovery or long lag + if (SERVER_KNOBS->ENABLE_DETAILED_TLOG_POP_TRACE && + (logData->queuePoppedVersion > 0) && // avoid generating massive events at beginning + (tagData->unpoppedRecovered || + PoppedVersionLag >= + SERVER_KNOBS->TLOG_POPPED_VER_LAG_THRESHOLD_FOR_TLOGPOP_TRACE)) { // when recovery or long lag TraceEvent("TLogPopDetails", logData->logId) - .detail("Tag", tagData->tag.toString()) - .detail("UpTo", upTo) - .detail("PoppedVersionLag", PoppedVersionLag) - .detail("MinPoppedTag", logData->minPoppedTag.toString()) - .detail("QueuePoppedVersion", logData->queuePoppedVersion) - .detail("UnpoppedRecovered", tagData->unpoppedRecovered ? "True" : "False") - .detail("NothingPersistent", tagData->nothingPersistent ? "True" : "False"); + .detail("Tag", tagData->tag.toString()) + .detail("UpTo", upTo) + .detail("PoppedVersionLag", PoppedVersionLag) + .detail("MinPoppedTag", logData->minPoppedTag.toString()) + .detail("QueuePoppedVersion", logData->queuePoppedVersion) + .detail("UnpoppedRecovered", tagData->unpoppedRecovered ? "True" : "False") + .detail("NothingPersistent", tagData->nothingPersistent ? "True" : "False"); } if (upTo > logData->persistentDataDurableVersion) wait(tagData->eraseMessagesBefore(upTo, self, logData, TaskPriority::TLogPop)); @@ -1743,8 +1744,8 @@ ACTOR Future tLogPeekMessages(TLogData* self, TLogPeekRequest req, Referen return Void(); } - if (req.begin > logData->persistentDataDurableVersion && !req.onlySpilled && req.tag.locality >= 0 && - !req.returnIfBlocked) { + if (SERVER_KNOBS->ENABLE_VERSION_VECTOR && req.begin > logData->persistentDataDurableVersion && !req.onlySpilled && + req.tag.locality >= 0 && !req.returnIfBlocked) { wait(waitForMessagesForTag(logData, &req, SERVER_KNOBS->BLOCKING_PEEK_TIMEOUT)); } state Version endVersion = logData->version.get() + 1; diff --git a/fdbserver/masterserver.actor.cpp b/fdbserver/masterserver.actor.cpp index fa94d9eeb0..d74e43d01c 100644 --- a/fdbserver/masterserver.actor.cpp +++ b/fdbserver/masterserver.actor.cpp @@ -1229,11 +1229,11 @@ ACTOR Future provideVersions(Reference self) { void updateLiveCommittedVersion(Reference self, ReportRawCommittedVersionRequest req) { self->minKnownCommittedVersion = std::max(self->minKnownCommittedVersion, req.minKnownCommittedVersion); - if (SERVER_KNOBS->ENABLE_VERSION_VECTOR && req.writtenTags.present()) { - // TraceEvent("Received ReportRawCommittedVersionRequest").detail("Version",req.version); - self->ssVersionVector.setVersion(req.writtenTags.get(), req.version); - } if (req.version > self->liveCommittedVersion.get()) { + if (SERVER_KNOBS->ENABLE_VERSION_VECTOR && req.writtenTags.present()) { + // TraceEvent("Received ReportRawCommittedVersionRequest").detail("Version",req.version); + self->ssVersionVector.setVersion(req.writtenTags.get(), req.version); + } self->databaseLocked = req.locked; self->proxyMetadataVersion = req.metadataVersion; // Note the set call switches context to any waiters on liveCommittedVersion before continuing. diff --git a/fdbserver/storageserver.actor.cpp b/fdbserver/storageserver.actor.cpp index 2269e4f425..bfad080913 100644 --- a/fdbserver/storageserver.actor.cpp +++ b/fdbserver/storageserver.actor.cpp @@ -1176,13 +1176,20 @@ ACTOR Future waitForVersionActor(StorageServer* data, Version version, // If the latest commit version that mutated the shard(s) being served by the specified storage // server is below the client specified read version then do a read at the latest commit version // of the storage server. -Version getRealReadVersion(VersionVector& ssLatestCommitVersions, Tag& tag, Version specifiedReadVersion) -{ - Version realReadVersion = ssLatestCommitVersions.hasVersion(tag) ? ssLatestCommitVersions.getVersion(tag) : specifiedReadVersion; +Version getRealReadVersion(VersionVector& ssLatestCommitVersions, Tag& tag, Version specifiedReadVersion) { + Version realReadVersion = + ssLatestCommitVersions.hasVersion(tag) ? ssLatestCommitVersions.getVersion(tag) : specifiedReadVersion; ASSERT(realReadVersion <= specifiedReadVersion); return realReadVersion; } +// Find the latest commit version of the given tag. +Version getLatestCommitVersion(VersionVector& ssLatestCommitVersions, Tag& tag) { + Version commitVersion = + ssLatestCommitVersions.hasVersion(tag) ? ssLatestCommitVersions.getVersion(tag) : invalidVersion; + return commitVersion; +} + Future waitForVersion(StorageServer* data, Version version, SpanID spanContext) { if (version == latestVersion) { version = std::max(Version(1), data->version.get()); @@ -1204,6 +1211,37 @@ Future waitForVersion(StorageServer* data, Version version, SpanID span return waitForVersionActor(data, version, spanContext); } +Future waitForVersion(StorageServer* data, Version commitVersion, Version readVersion, SpanID spanContext) { + ASSERT(commitVersion == invalidVersion || commitVersion < readVersion); + + if (commitVersion == invalidVersion) { + return waitForVersion(data, readVersion, spanContext); + } + + if (readVersion == latestVersion) { + readVersion = std::max(Version(1), data->version.get()); + } + + if (readVersion < data->oldestVersion.get() || readVersion <= 0) { + return transaction_too_old(); + } else { + if (commitVersion < data->oldestVersion.get()) { + return data->oldestVersion.get(); + } else if (commitVersion <= data->version.get()) { + return commitVersion; + } + } + + if ((data->behind || data->versionBehind) && commitVersion > data->version.get()) { + return process_behind(); + } + + if (deterministicRandom()->random01() < 0.001) { + TraceEvent("WaitForVersion1000x"); + } + return waitForVersionActor(data, std::max(commitVersion, data->oldestVersion.get()), spanContext); +} + ACTOR Future waitForVersionNoTooOld(StorageServer* data, Version version) { // This could become an Actor transparently, but for now it just does the lookup if (version == latestVersion) @@ -1245,10 +1283,8 @@ ACTOR Future getValueQ(StorageServer* data, GetValueRequest req) { "getValueQ.DoRead"); //.detail("TaskID", g_network->getCurrentTask()); state Optional v; - // If the client specified the latest commit version (that mutated the shard(s) being served - // by this storage server) then return the value that corresponds to that version. - Version readVersion = getRealReadVersion(req.ssLatestCommitVersions, data->tag, req.version); - state Version version = wait(waitForVersion(data, readVersion, req.spanContext)); + Version commitVersion = getLatestCommitVersion(req.ssLatestCommitVersions, data->tag); + state Version version = wait(waitForVersion(data, commitVersion, req.version, req.spanContext)); if (req.debugID.present()) g_traceBatch.addEvent("GetValueDebug", req.debugID.get().first(), @@ -1947,8 +1983,9 @@ ACTOR Future getKeyValuesQ(StorageServer* data, GetKeyValuesRequest req) try { if (req.debugID.present()) g_traceBatch.addEvent("TransactionDebug", req.debugID.get().first(), "storageserver.getKeyValues.Before"); - Version readVersion = getRealReadVersion(req.ssLatestCommitVersions, data->tag, req.version); - state Version version = wait(waitForVersion(data, readVersion, span.context)); + + Version commitVersion = getLatestCommitVersion(req.ssLatestCommitVersions, data->tag); + state Version version = wait(waitForVersion(data, commitVersion, req.version, span.context)); state uint64_t changeCounter = data->shardChangeCounter; // try { @@ -2114,8 +2151,9 @@ ACTOR Future getKeyValuesStreamQ(StorageServer* data, GetKeyValuesStreamRe if (req.debugID.present()) g_traceBatch.addEvent( "TransactionDebug", req.debugID.get().first(), "storageserver.getKeyValuesStream.Before"); - Version readVersion = getRealReadVersion(req.ssLatestCommitVersions, data->tag, req.version); - state Version version = wait(waitForVersion(data, readVersion, span.context)); + + Version commitVersion = getLatestCommitVersion(req.ssLatestCommitVersions, data->tag); + state Version version = wait(waitForVersion(data, commitVersion, req.version, span.context)); state uint64_t changeCounter = data->shardChangeCounter; // try { @@ -2284,8 +2322,8 @@ ACTOR Future getKeyQ(StorageServer* data, GetKeyRequest req) { wait(data->getQueryDelay()); try { - Version readVersion = getRealReadVersion(req.ssLatestCommitVersions, data->tag, req.version); - state Version version = wait(waitForVersion(data, readVersion, req.spanContext)); + Version commitVersion = getLatestCommitVersion(req.ssLatestCommitVersions, data->tag); + state Version version = wait(waitForVersion(data, commitVersion, req.version, req.spanContext)); state uint64_t changeCounter = data->shardChangeCounter; state KeyRange shard = getShardKeyRange(data, req.sel);