Fix a race condition between batched peek and pop, where the server removal pop may be lost

This commit is contained in:
Zhe Wu 2022-11-01 16:02:42 -07:00
parent a5a5df715b
commit 32bc9b6ebb
5 changed files with 95 additions and 90 deletions

View File

@ -115,6 +115,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( ENABLE_DETAILED_TLOG_POP_TRACE, false ); if ( randomize && BUGGIFY ) ENABLE_DETAILED_TLOG_POP_TRACE = true;
init( PEEK_BATCHING_EMPTY_MSG, false ); if ( randomize && BUGGIFY ) PEEK_BATCHING_EMPTY_MSG = true;
init( PEEK_BATCHING_EMPTY_MSG_INTERVAL, 0.001 ); if ( randomize && BUGGIFY ) PEEK_BATCHING_EMPTY_MSG_INTERVAL = 0.01;
init( POP_FROM_LOG_DELAY, 1 ); if ( randomize && BUGGIFY ) POP_FROM_LOG_DELAY = 0;
// disk snapshot max timeout, to be put in TLog, storage and coordinator nodes
init( MAX_FORKED_PROCESS_OUTPUT, 1024 );

View File

@ -110,6 +110,7 @@ public:
double BLOCKING_PEEK_TIMEOUT;
bool PEEK_BATCHING_EMPTY_MSG;
double PEEK_BATCHING_EMPTY_MSG_INTERVAL;
double POP_FROM_LOG_DELAY;
// Data distribution queue
double HEALTH_POLL_TIME;

View File

@ -514,35 +514,38 @@ Future<Void> logRouterPeekMessages(PromiseType replyPromise,
}
state double startTime = now();
Version poppedVer = poppedVersion(self, reqTag);
if (poppedVer > reqBegin || reqBegin < self->startVersion) {
// This should only happen if a packet is sent multiple times and the reply is not needed.
// Since we are using popped differently, do not send a reply.
TraceEvent(SevWarnAlways, "LogRouterPeekPopped", self->dbgid)
.detail("Begin", reqBegin)
.detail("Popped", poppedVer)
.detail("Start", self->startVersion);
if (std::is_same<PromiseType, Promise<TLogPeekReply>>::value) {
// kills logRouterPeekStream actor, otherwise that actor becomes stuck
throw operation_obsolete();
}
replyPromise.send(Never());
if (reqSequence.present()) {
auto& trackerData = self->peekTracker[peekId];
auto& sequenceData = trackerData.sequence_version[sequence + 1];
if (!sequenceData.isSet()) {
sequenceData.send(std::make_pair(reqBegin, reqOnlySpilled));
}
}
return Void();
}
state Version poppedVer;
state Version endVersion;
// Run the peek logic in a loop to account for the case where there is no data to return to the caller, and we may
// want to wait a little bit instead of just sending back an empty message. This feature is controlled by a knob.
loop {
poppedVer = poppedVersion(self, reqTag);
if (poppedVer > reqBegin || reqBegin < self->startVersion) {
// This should only happen if a packet is sent multiple times and the reply is not needed.
// Since we are using popped differently, do not send a reply.
TraceEvent(SevWarnAlways, "LogRouterPeekPopped", self->dbgid)
.detail("Begin", reqBegin)
.detail("Popped", poppedVer)
.detail("Start", self->startVersion);
if (std::is_same<PromiseType, Promise<TLogPeekReply>>::value) {
// kills logRouterPeekStream actor, otherwise that actor becomes stuck
throw operation_obsolete();
}
replyPromise.send(Never());
if (reqSequence.present()) {
auto& trackerData = self->peekTracker[peekId];
auto& sequenceData = trackerData.sequence_version[sequence + 1];
if (!sequenceData.isSet()) {
sequenceData.send(std::make_pair(reqBegin, reqOnlySpilled));
}
}
return Void();
}
ASSERT_WE_THINK(reqBegin >= poppedVersion(self, reqTag) && reqBegin >= self->startVersion);
endVersion = self->version.get() + 1;
peekMessagesFromMemory(self, reqTag, reqBegin, messages, endVersion);

View File

@ -1802,75 +1802,76 @@ Future<Void> tLogPeekMessages(PromiseType replyPromise,
}
state double workStart = now();
state Version poppedVer = poppedVersion(logData, reqTag);
auto tagData = logData->getTagData(reqTag);
bool tagRecovered = tagData && !tagData->unpoppedRecovered;
if (SERVER_KNOBS->ENABLE_VERSION_VECTOR && poppedVer <= reqBegin &&
reqBegin > logData->persistentDataDurableVersion && !reqOnlySpilled && reqTag.locality >= 0 &&
!reqReturnIfBlocked && tagRecovered) {
state double startTime = now();
// TODO (version vector) check if this should be included in "status details" json
// TODO (version vector) all tags may be too many, instead, standard deviation?
wait(waitForMessagesForTag(logData, reqTag, reqBegin, SERVER_KNOBS->BLOCKING_PEEK_TIMEOUT));
double latency = now() - startTime;
if (logData->blockingPeekLatencies.find(reqTag) == logData->blockingPeekLatencies.end()) {
UID ssID = nondeterministicRandom()->randomUniqueID();
std::string s = "BlockingPeekLatencies-" + reqTag.toString();
logData->blockingPeekLatencies.try_emplace(
reqTag, s, ssID, SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL, SERVER_KNOBS->LATENCY_SAMPLE_SIZE);
}
LatencySample& sample = logData->blockingPeekLatencies.at(reqTag);
sample.addMeasurement(latency);
poppedVer = poppedVersion(logData, reqTag);
}
DebugLogTraceEvent("TLogPeekMessages2", self->dbgid)
.detail("LogId", logData->logId)
.detail("Tag", reqTag.toString())
.detail("ReqBegin", reqBegin)
.detail("PoppedVer", poppedVer);
if (poppedVer > reqBegin) {
TLogPeekReply rep;
rep.maxKnownVersion = logData->version.get();
rep.minKnownCommittedVersion = logData->minKnownCommittedVersion;
rep.popped = poppedVer;
rep.end = poppedVer;
rep.onlySpilled = false;
if (reqSequence.present()) {
auto& trackerData = logData->peekTracker[peekId];
auto& sequenceData = trackerData.sequence_version[sequence + 1];
trackerData.lastUpdate = now();
if (trackerData.sequence_version.size() && sequence + 1 < trackerData.sequence_version.begin()->first) {
replyPromise.sendError(operation_obsolete());
if (!sequenceData.isSet())
sequenceData.sendError(operation_obsolete());
return Void();
}
if (sequenceData.isSet()) {
if (sequenceData.getFuture().get().first != rep.end) {
CODE_PROBE(true, "tlog peek second attempt ended at a different version");
replyPromise.sendError(operation_obsolete());
return Void();
}
} else {
sequenceData.send(std::make_pair(rep.end, rep.onlySpilled));
}
rep.begin = reqBegin;
}
replyPromise.send(rep);
return Void();
}
state Version poppedVer;
state Version endVersion;
state bool onlySpilled;
// Run the peek logic in a loop to account for the case where there is no data to return to the caller, and we may
// want to wait a little bit instead of just sending back an empty message. This feature is controlled by a knob.
loop {
poppedVer = poppedVersion(logData, reqTag);
if (SERVER_KNOBS->ENABLE_VERSION_VECTOR && poppedVer <= reqBegin &&
reqBegin > logData->persistentDataDurableVersion && !reqOnlySpilled && reqTag.locality >= 0 &&
!reqReturnIfBlocked) {
state double startTime = now();
// TODO (version vector) check if this should be included in "status details" json
// TODO (version vector) all tags may be too many, instead, standard deviation?
wait(waitForMessagesForTag(logData, reqTag, reqBegin, SERVER_KNOBS->BLOCKING_PEEK_TIMEOUT));
double latency = now() - startTime;
if (logData->blockingPeekLatencies.find(reqTag) == logData->blockingPeekLatencies.end()) {
UID ssID = nondeterministicRandom()->randomUniqueID();
std::string s = "BlockingPeekLatencies-" + reqTag.toString();
logData->blockingPeekLatencies.try_emplace(
reqTag, s, ssID, SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL, SERVER_KNOBS->LATENCY_SAMPLE_SIZE);
}
LatencySample& sample = logData->blockingPeekLatencies.at(reqTag);
sample.addMeasurement(latency);
poppedVer = poppedVersion(logData, reqTag);
}
DisabledTraceEvent("TLogPeekMessages1", self->dbgid)
.detail("LogId", logData->logId)
.detail("Tag", reqTag.toString())
.detail("ReqBegin", reqBegin)
.detail("PoppedVer", poppedVer);
if (poppedVer > reqBegin) {
TLogPeekReply rep;
rep.maxKnownVersion = logData->version.get();
rep.minKnownCommittedVersion = logData->minKnownCommittedVersion;
rep.popped = poppedVer;
rep.end = poppedVer;
rep.onlySpilled = false;
if (reqSequence.present()) {
auto& trackerData = logData->peekTracker[peekId];
auto& sequenceData = trackerData.sequence_version[sequence + 1];
trackerData.lastUpdate = now();
if (trackerData.sequence_version.size() && sequence + 1 < trackerData.sequence_version.begin()->first) {
replyPromise.sendError(operation_obsolete());
if (!sequenceData.isSet())
sequenceData.sendError(operation_obsolete());
return Void();
}
if (sequenceData.isSet()) {
if (sequenceData.getFuture().get().first != rep.end) {
TEST(true); // tlog peek second attempt ended at a different version
replyPromise.sendError(operation_obsolete());
return Void();
}
} else {
sequenceData.send(std::make_pair(rep.end, rep.onlySpilled));
}
rep.begin = reqBegin;
}
replyPromise.send(rep);
return Void();
}
ASSERT_WE_THINK(reqBegin >= poppedVersion(logData, reqTag));
endVersion = logData->version.get() + 1;
onlySpilled = false;

View File

@ -1449,8 +1449,7 @@ void TagPartitionedLogSystem::pop(Version upTo, Tag tag, Version durableKnownCom
}
if (prev == 0) {
// pop tag from log upto version defined in outstandingPops[].first
popActors.add(
popFromLog(this, log, tag, /*delayBeforePop*/ 1.0, /*popLogRouter=*/false)); //< FIXME: knob
popActors.add(popFromLog(this, log, tag, SERVER_KNOBS->POP_FROM_LOG_DELAY, /*popLogRouter=*/false));
}
}
}