- Address the following issues:

- Sequencer should update the version vector once for a given commit
version (irrespective of the number of times that it receives and
processes the ReportRawCommittedVersionRequest message for that commit
version). Issue found by simulation tests.

- Storage server should take both its latest commit version and the
read version into account while processing a read request. This is to
address transaction_too_old error that we saw while running tests with
mako (and also in YCSB tests).

- Do not enable the tlog blocking-peek logic if ENABLE_VERSION_VECTOR
flag is set to false.
This commit is contained in:
Sreenath Bodagala 2021-08-10 19:47:18 +00:00
parent 1758c92683
commit cec744cebf
3 changed files with 75 additions and 36 deletions

View File

@ -633,13 +633,12 @@ struct LogData : NonCopyable, public ReferenceCounted<LogData> {
durableKnownCommittedVersion(0), minKnownCommittedVersion(0), queuePoppedVersion(0), minPoppedTagVersion(0),
minPoppedTag(invalidTag), unpoppedRecoveredTags(0), cc("TLog", interf.id().toString()),
bytesInput("BytesInput", cc), bytesDurable("BytesDurable", cc), blockingPeeks("BlockingPeeks", cc),
blockingPeekTimeouts("BlockingPeekTimeouts", cc), logId(interf.id()),
protocolVersion(protocolVersion), newPersistentDataVersion(invalidVersion), tLogData(tLogData),
unrecoveredBefore(1), recoveredAt(1), logSystem(new AsyncVar<Reference<ILogSystem>>()), remoteTag(remoteTag),
isPrimary(isPrimary), logRouterTags(logRouterTags), logRouterPoppedVersion(0), logRouterPopToVersion(0),
locality(tagLocalityInvalid), recruitmentID(recruitmentID), logSpillType(logSpillType),
allTags(tags.begin(), tags.end()), terminated(tLogData->terminated.getFuture()), execOpCommitInProgress(false),
txsTags(txsTags) {
blockingPeekTimeouts("BlockingPeekTimeouts", cc), logId(interf.id()), protocolVersion(protocolVersion),
newPersistentDataVersion(invalidVersion), tLogData(tLogData), unrecoveredBefore(1), recoveredAt(1),
logSystem(new AsyncVar<Reference<ILogSystem>>()), remoteTag(remoteTag), isPrimary(isPrimary),
logRouterTags(logRouterTags), logRouterPoppedVersion(0), logRouterPopToVersion(0), locality(tagLocalityInvalid),
recruitmentID(recruitmentID), logSpillType(logSpillType), allTags(tags.begin(), tags.end()),
terminated(tLogData->terminated.getFuture()), execOpCommitInProgress(false), txsTags(txsTags) {
startRole(Role::TRANSACTION_LOG,
interf.id(),
tLogData->workerID,
@ -1170,17 +1169,19 @@ ACTOR Future<Void> tLogPopCore(TLogData* self, Tag inputTag, Version to, Referen
}
uint64_t PoppedVersionLag = logData->persistentDataDurableVersion - logData->queuePoppedVersion;
if ( SERVER_KNOBS->ENABLE_DETAILED_TLOG_POP_TRACE &&
(logData->queuePoppedVersion > 0) && //avoid generating massive events at beginning
(tagData->unpoppedRecovered || PoppedVersionLag >= SERVER_KNOBS->TLOG_POPPED_VER_LAG_THRESHOLD_FOR_TLOGPOP_TRACE)) { //when recovery or long lag
if (SERVER_KNOBS->ENABLE_DETAILED_TLOG_POP_TRACE &&
(logData->queuePoppedVersion > 0) && // avoid generating massive events at beginning
(tagData->unpoppedRecovered ||
PoppedVersionLag >=
SERVER_KNOBS->TLOG_POPPED_VER_LAG_THRESHOLD_FOR_TLOGPOP_TRACE)) { // when recovery or long lag
TraceEvent("TLogPopDetails", logData->logId)
.detail("Tag", tagData->tag.toString())
.detail("UpTo", upTo)
.detail("PoppedVersionLag", PoppedVersionLag)
.detail("MinPoppedTag", logData->minPoppedTag.toString())
.detail("QueuePoppedVersion", logData->queuePoppedVersion)
.detail("UnpoppedRecovered", tagData->unpoppedRecovered ? "True" : "False")
.detail("NothingPersistent", tagData->nothingPersistent ? "True" : "False");
.detail("Tag", tagData->tag.toString())
.detail("UpTo", upTo)
.detail("PoppedVersionLag", PoppedVersionLag)
.detail("MinPoppedTag", logData->minPoppedTag.toString())
.detail("QueuePoppedVersion", logData->queuePoppedVersion)
.detail("UnpoppedRecovered", tagData->unpoppedRecovered ? "True" : "False")
.detail("NothingPersistent", tagData->nothingPersistent ? "True" : "False");
}
if (upTo > logData->persistentDataDurableVersion)
wait(tagData->eraseMessagesBefore(upTo, self, logData, TaskPriority::TLogPop));
@ -1743,8 +1744,8 @@ ACTOR Future<Void> tLogPeekMessages(TLogData* self, TLogPeekRequest req, Referen
return Void();
}
if (req.begin > logData->persistentDataDurableVersion && !req.onlySpilled && req.tag.locality >= 0 &&
!req.returnIfBlocked) {
if (SERVER_KNOBS->ENABLE_VERSION_VECTOR && req.begin > logData->persistentDataDurableVersion && !req.onlySpilled &&
req.tag.locality >= 0 && !req.returnIfBlocked) {
wait(waitForMessagesForTag(logData, &req, SERVER_KNOBS->BLOCKING_PEEK_TIMEOUT));
}
state Version endVersion = logData->version.get() + 1;

View File

@ -1229,11 +1229,11 @@ ACTOR Future<Void> provideVersions(Reference<MasterData> self) {
void updateLiveCommittedVersion(Reference<MasterData> self, ReportRawCommittedVersionRequest req) {
self->minKnownCommittedVersion = std::max(self->minKnownCommittedVersion, req.minKnownCommittedVersion);
if (SERVER_KNOBS->ENABLE_VERSION_VECTOR && req.writtenTags.present()) {
// TraceEvent("Received ReportRawCommittedVersionRequest").detail("Version",req.version);
self->ssVersionVector.setVersion(req.writtenTags.get(), req.version);
}
if (req.version > self->liveCommittedVersion.get()) {
if (SERVER_KNOBS->ENABLE_VERSION_VECTOR && req.writtenTags.present()) {
// TraceEvent("Received ReportRawCommittedVersionRequest").detail("Version",req.version);
self->ssVersionVector.setVersion(req.writtenTags.get(), req.version);
}
self->databaseLocked = req.locked;
self->proxyMetadataVersion = req.metadataVersion;
// Note the set call switches context to any waiters on liveCommittedVersion before continuing.

View File

@ -1176,13 +1176,20 @@ ACTOR Future<Version> waitForVersionActor(StorageServer* data, Version version,
// If the latest commit version that mutated the shard(s) being served by the specified storage
// server is below the client specified read version then do a read at the latest commit version
// of the storage server.
Version getRealReadVersion(VersionVector& ssLatestCommitVersions, Tag& tag, Version specifiedReadVersion)
{
Version realReadVersion = ssLatestCommitVersions.hasVersion(tag) ? ssLatestCommitVersions.getVersion(tag) : specifiedReadVersion;
Version getRealReadVersion(VersionVector& ssLatestCommitVersions, Tag& tag, Version specifiedReadVersion) {
Version realReadVersion =
ssLatestCommitVersions.hasVersion(tag) ? ssLatestCommitVersions.getVersion(tag) : specifiedReadVersion;
ASSERT(realReadVersion <= specifiedReadVersion);
return realReadVersion;
}
// Find the latest commit version of the given tag.
Version getLatestCommitVersion(VersionVector& ssLatestCommitVersions, Tag& tag) {
Version commitVersion =
ssLatestCommitVersions.hasVersion(tag) ? ssLatestCommitVersions.getVersion(tag) : invalidVersion;
return commitVersion;
}
Future<Version> waitForVersion(StorageServer* data, Version version, SpanID spanContext) {
if (version == latestVersion) {
version = std::max(Version(1), data->version.get());
@ -1204,6 +1211,37 @@ Future<Version> waitForVersion(StorageServer* data, Version version, SpanID span
return waitForVersionActor(data, version, spanContext);
}
Future<Version> waitForVersion(StorageServer* data, Version commitVersion, Version readVersion, SpanID spanContext) {
ASSERT(commitVersion == invalidVersion || commitVersion < readVersion);
if (commitVersion == invalidVersion) {
return waitForVersion(data, readVersion, spanContext);
}
if (readVersion == latestVersion) {
readVersion = std::max(Version(1), data->version.get());
}
if (readVersion < data->oldestVersion.get() || readVersion <= 0) {
return transaction_too_old();
} else {
if (commitVersion < data->oldestVersion.get()) {
return data->oldestVersion.get();
} else if (commitVersion <= data->version.get()) {
return commitVersion;
}
}
if ((data->behind || data->versionBehind) && commitVersion > data->version.get()) {
return process_behind();
}
if (deterministicRandom()->random01() < 0.001) {
TraceEvent("WaitForVersion1000x");
}
return waitForVersionActor(data, std::max(commitVersion, data->oldestVersion.get()), spanContext);
}
ACTOR Future<Version> waitForVersionNoTooOld(StorageServer* data, Version version) {
// This could become an Actor transparently, but for now it just does the lookup
if (version == latestVersion)
@ -1245,10 +1283,8 @@ ACTOR Future<Void> getValueQ(StorageServer* data, GetValueRequest req) {
"getValueQ.DoRead"); //.detail("TaskID", g_network->getCurrentTask());
state Optional<Value> v;
// If the client specified the latest commit version (that mutated the shard(s) being served
// by this storage server) then return the value that corresponds to that version.
Version readVersion = getRealReadVersion(req.ssLatestCommitVersions, data->tag, req.version);
state Version version = wait(waitForVersion(data, readVersion, req.spanContext));
Version commitVersion = getLatestCommitVersion(req.ssLatestCommitVersions, data->tag);
state Version version = wait(waitForVersion(data, commitVersion, req.version, req.spanContext));
if (req.debugID.present())
g_traceBatch.addEvent("GetValueDebug",
req.debugID.get().first(),
@ -1947,8 +1983,9 @@ ACTOR Future<Void> getKeyValuesQ(StorageServer* data, GetKeyValuesRequest req)
try {
if (req.debugID.present())
g_traceBatch.addEvent("TransactionDebug", req.debugID.get().first(), "storageserver.getKeyValues.Before");
Version readVersion = getRealReadVersion(req.ssLatestCommitVersions, data->tag, req.version);
state Version version = wait(waitForVersion(data, readVersion, span.context));
Version commitVersion = getLatestCommitVersion(req.ssLatestCommitVersions, data->tag);
state Version version = wait(waitForVersion(data, commitVersion, req.version, span.context));
state uint64_t changeCounter = data->shardChangeCounter;
// try {
@ -2114,8 +2151,9 @@ ACTOR Future<Void> getKeyValuesStreamQ(StorageServer* data, GetKeyValuesStreamRe
if (req.debugID.present())
g_traceBatch.addEvent(
"TransactionDebug", req.debugID.get().first(), "storageserver.getKeyValuesStream.Before");
Version readVersion = getRealReadVersion(req.ssLatestCommitVersions, data->tag, req.version);
state Version version = wait(waitForVersion(data, readVersion, span.context));
Version commitVersion = getLatestCommitVersion(req.ssLatestCommitVersions, data->tag);
state Version version = wait(waitForVersion(data, commitVersion, req.version, span.context));
state uint64_t changeCounter = data->shardChangeCounter;
// try {
@ -2284,8 +2322,8 @@ ACTOR Future<Void> getKeyQ(StorageServer* data, GetKeyRequest req) {
wait(data->getQueryDelay());
try {
Version readVersion = getRealReadVersion(req.ssLatestCommitVersions, data->tag, req.version);
state Version version = wait(waitForVersion(data, readVersion, req.spanContext));
Version commitVersion = getLatestCommitVersion(req.ssLatestCommitVersions, data->tag);
state Version version = wait(waitForVersion(data, commitVersion, req.version, req.spanContext));
state uint64_t changeCounter = data->shardChangeCounter;
state KeyRange shard = getShardKeyRange(data, req.sel);