Batch empty peek reply

This commit is contained in:
Zhe Wu 2022-02-15 14:08:45 -08:00
parent 3da99fa55a
commit 9da735c38e
4 changed files with 174 additions and 117 deletions

View File

@ -103,6 +103,8 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( TLOG_POP_BATCH_SIZE, 1000 ); if ( randomize && BUGGIFY ) TLOG_POP_BATCH_SIZE = 10;
init( TLOG_POPPED_VER_LAG_THRESHOLD_FOR_TLOGPOP_TRACE, 250e6 );
init( ENABLE_DETAILED_TLOG_POP_TRACE, false ); if ( randomize && BUGGIFY ) ENABLE_DETAILED_TLOG_POP_TRACE = true;
init( PEEK_BATCHING_EMPTY_MSG, false ); if ( randomize && BUGGIFY ) PEEK_BATCHING_EMPTY_MSG = true;
init( PEEK_BATCHING_EMPTY_MSG_INTERVAL, 0.001 ); if ( randomize && BUGGIFY ) PEEK_BATCHING_EMPTY_MSG_INTERVAL = 0.01;
// disk snapshot max timeout, to be put in TLog, storage and coordinator nodes
init( MAX_FORKED_PROCESS_OUTPUT, 1024 );

View File

@ -106,6 +106,8 @@ public:
double PUSH_STATS_SLOW_AMOUNT;
double PUSH_STATS_SLOW_RATIO;
int TLOG_POP_BATCH_SIZE;
bool PEEK_BATCHING_EMPTY_MSG;
double PEEK_BATCHING_EMPTY_MSG_INTERVAL;
// Data distribution queue
double HEALTH_POLL_TIME;

View File

@ -515,6 +515,8 @@ Future<Void> logRouterPeekMessages(PromiseType replyPromise,
wait(delay(SERVER_KNOBS->TLOG_PEEK_DELAY, g_network->getCurrentTask()));
}
state double startTime = now();
Version poppedVer = poppedVersion(self, reqTag);
if (poppedVer > reqBegin || reqBegin < self->startVersion) {
@ -535,8 +537,32 @@ Future<Void> logRouterPeekMessages(PromiseType replyPromise,
return Void();
}
Version endVersion = self->version.get() + 1;
peekMessagesFromMemory(self, reqTag, reqBegin, messages, endVersion);
state Version endVersion;
// Run the peek logic in a loop to account for the case where there is no data to return to the caller, and we may
// want to wait a little bit instead of just sending back an empty message. This feature is controlled by a knob.
loop {
endVersion = self->version.get() + 1;
peekMessagesFromMemory(self, reqTag, reqBegin, messages, endVersion);
if (messages.getLength() > 0 || !SERVER_KNOBS->PEEK_BATCHING_EMPTY_MSG ||
now() - startTime > SERVER_KNOBS->PEEK_BATCHING_EMPTY_MSG_INTERVAL) {
break;
}
// Reply the peek request when
// - Have data return to the caller, or
// - Batching empty peek is disabled, or
// - Batching empty peek interval has been reached.
state Version waitUntilVersion = self->version.get() + 1;
// Currently, from `reqBegin` to self->version are all empty peeks. Wait for more version, or the empty batching
// interval has expired.
wait(self->version.whenAtLeast(waitUntilVersion) || delay(SERVER_KNOBS->PEEK_BATCHING_EMPTY_MSG_INTERVAL));
if (self->version.get() < waitUntilVersion) {
break; // We know that from `reqBegin` to self->version are all empty messages. Skip re-executing the peek
// logic.
}
}
TLogPeekReply reply;
reply.maxKnownVersion = self->version.get();

View File

@ -1740,140 +1740,167 @@ Future<Void> tLogPeekMessages(PromiseType replyPromise,
return Void();
}
state Version endVersion = logData->version.get() + 1;
state bool onlySpilled = false;
state Version endVersion;
state bool onlySpilled;
// grab messages from disk
//TraceEvent("TLogPeekMessages", self->dbgid).detail("ReqBeginEpoch", reqBegin.epoch).detail("ReqBeginSeq", reqBegin.sequence).detail("Epoch", self->epoch()).detail("PersistentDataSeq", self->persistentDataSequence).detail("Tag1", reqTag1).detail("Tag2", reqTag2);
if (reqBegin <= logData->persistentDataDurableVersion) {
// Just in case the durable version changes while we are waiting for the read, we grab this data from memory. We
// may or may not actually send it depending on whether we get enough data from disk. SOMEDAY: Only do this if
// an initial attempt to read from disk results in insufficient data and the required data is no longer in
// memory SOMEDAY: Should we only send part of the messages we collected, to actually limit the size of the
// result?
// Run the peek logic in a loop to account for the case where there is no data to return to the caller, and we may
// want to wait a little bit instead of just sending back an empty message. This feature is controlled by a knob.
loop {
endVersion = logData->version.get() + 1;
onlySpilled = false;
if (reqOnlySpilled) {
endVersion = logData->persistentDataDurableVersion + 1;
} else {
peekMessagesFromMemory(logData, reqTag, reqBegin, messages2, endVersion);
}
// grab messages from disk
//TraceEvent("TLogPeekMessages", self->dbgid).detail("ReqBeginEpoch", reqBegin.epoch).detail("ReqBeginSeq", reqBegin.sequence).detail("Epoch", self->epoch()).detail("PersistentDataSeq", self->persistentDataSequence).detail("Tag1", reqTag1).detail("Tag2", reqTag2);
if (reqBegin <= logData->persistentDataDurableVersion) {
// Just in case the durable version changes while we are waiting for the read, we grab this data from
// memory. We may or may not actually send it depending on whether we get enough data from disk. SOMEDAY:
// Only do this if an initial attempt to read from disk results in insufficient data and the required data
// is no longer in memory SOMEDAY: Should we only send part of the messages we collected, to actually limit
// the size of the result?
if (logData->shouldSpillByValue(reqTag)) {
RangeResult kvs = wait(self->persistentData->readRange(
KeyRangeRef(persistTagMessagesKey(logData->logId, reqTag, reqBegin),
persistTagMessagesKey(logData->logId, reqTag, logData->persistentDataDurableVersion + 1)),
SERVER_KNOBS->DESIRED_TOTAL_BYTES,
SERVER_KNOBS->DESIRED_TOTAL_BYTES));
for (auto& kv : kvs) {
auto ver = decodeTagMessagesKey(kv.key);
messages << VERSION_HEADER << ver;
messages.serializeBytes(kv.value);
}
if (kvs.expectedSize() >= SERVER_KNOBS->DESIRED_TOTAL_BYTES) {
endVersion = decodeTagMessagesKey(kvs.end()[-1].key) + 1;
onlySpilled = true;
if (reqOnlySpilled) {
endVersion = logData->persistentDataDurableVersion + 1;
} else {
messages.serializeBytes(messages2.toValue());
peekMessagesFromMemory(logData, reqTag, reqBegin, messages2, endVersion);
}
} else {
// FIXME: Limit to approximately DESIRED_TOTATL_BYTES somehow.
RangeResult kvrefs = wait(self->persistentData->readRange(
KeyRangeRef(
persistTagMessageRefsKey(logData->logId, reqTag, reqBegin),
persistTagMessageRefsKey(logData->logId, reqTag, logData->persistentDataDurableVersion + 1)),
SERVER_KNOBS->TLOG_SPILL_REFERENCE_MAX_BATCHES_PER_PEEK + 1));
//TraceEvent("TLogPeekResults", self->dbgid).detail("ForAddress", replyPromise.getEndpoint().getPrimaryAddress()).detail("Tag1Results", s1).detail("Tag2Results", s2).detail("Tag1ResultsLim", kv1.size()).detail("Tag2ResultsLim", kv2.size()).detail("Tag1ResultsLast", kv1.size() ? kv1[0].key : "").detail("Tag2ResultsLast", kv2.size() ? kv2[0].key : "").detail("Limited", limited).detail("NextEpoch", next_pos.epoch).detail("NextSeq", next_pos.sequence).detail("NowEpoch", self->epoch()).detail("NowSeq", self->sequence.getNextSequence());
if (logData->shouldSpillByValue(reqTag)) {
RangeResult kvs = wait(self->persistentData->readRange(
KeyRangeRef(
persistTagMessagesKey(logData->logId, reqTag, reqBegin),
persistTagMessagesKey(logData->logId, reqTag, logData->persistentDataDurableVersion + 1)),
SERVER_KNOBS->DESIRED_TOTAL_BYTES,
SERVER_KNOBS->DESIRED_TOTAL_BYTES));
state std::vector<std::pair<IDiskQueue::location, IDiskQueue::location>> commitLocations;
state bool earlyEnd = false;
uint32_t mutationBytes = 0;
state uint64_t commitBytes = 0;
state Version firstVersion = std::numeric_limits<Version>::max();
for (int i = 0; i < kvrefs.size() && i < SERVER_KNOBS->TLOG_SPILL_REFERENCE_MAX_BATCHES_PER_PEEK; i++) {
auto& kv = kvrefs[i];
VectorRef<SpilledData> spilledData;
BinaryReader r(kv.value, AssumeVersion(logData->protocolVersion));
r >> spilledData;
for (const SpilledData& sd : spilledData) {
if (mutationBytes >= SERVER_KNOBS->DESIRED_TOTAL_BYTES) {
earlyEnd = true;
for (auto& kv : kvs) {
auto ver = decodeTagMessagesKey(kv.key);
messages << VERSION_HEADER << ver;
messages.serializeBytes(kv.value);
}
if (kvs.expectedSize() >= SERVER_KNOBS->DESIRED_TOTAL_BYTES) {
endVersion = decodeTagMessagesKey(kvs.end()[-1].key) + 1;
onlySpilled = true;
} else {
messages.serializeBytes(messages2.toValue());
}
} else {
// FIXME: Limit to approximately DESIRED_TOTATL_BYTES somehow.
RangeResult kvrefs = wait(self->persistentData->readRange(
KeyRangeRef(
persistTagMessageRefsKey(logData->logId, reqTag, reqBegin),
persistTagMessageRefsKey(logData->logId, reqTag, logData->persistentDataDurableVersion + 1)),
SERVER_KNOBS->TLOG_SPILL_REFERENCE_MAX_BATCHES_PER_PEEK + 1));
//TraceEvent("TLogPeekResults", self->dbgid).detail("ForAddress", replyPromise.getEndpoint().getPrimaryAddress()).detail("Tag1Results", s1).detail("Tag2Results", s2).detail("Tag1ResultsLim", kv1.size()).detail("Tag2ResultsLim", kv2.size()).detail("Tag1ResultsLast", kv1.size() ? kv1[0].key : "").detail("Tag2ResultsLast", kv2.size() ? kv2[0].key : "").detail("Limited", limited).detail("NextEpoch", next_pos.epoch).detail("NextSeq", next_pos.sequence).detail("NowEpoch", self->epoch()).detail("NowSeq", self->sequence.getNextSequence());
state std::vector<std::pair<IDiskQueue::location, IDiskQueue::location>> commitLocations;
state bool earlyEnd = false;
uint32_t mutationBytes = 0;
state uint64_t commitBytes = 0;
state Version firstVersion = std::numeric_limits<Version>::max();
for (int i = 0; i < kvrefs.size() && i < SERVER_KNOBS->TLOG_SPILL_REFERENCE_MAX_BATCHES_PER_PEEK; i++) {
auto& kv = kvrefs[i];
VectorRef<SpilledData> spilledData;
BinaryReader r(kv.value, AssumeVersion(logData->protocolVersion));
r >> spilledData;
for (const SpilledData& sd : spilledData) {
if (mutationBytes >= SERVER_KNOBS->DESIRED_TOTAL_BYTES) {
earlyEnd = true;
break;
}
if (sd.version >= reqBegin) {
firstVersion = std::min(firstVersion, sd.version);
const IDiskQueue::location end = sd.start.lo + sd.length;
commitLocations.emplace_back(sd.start, end);
// This isn't perfect, because we aren't accounting for page boundaries, but should be
// close enough.
commitBytes += sd.length;
mutationBytes += sd.mutationBytes;
}
}
if (earlyEnd)
break;
}
if (sd.version >= reqBegin) {
firstVersion = std::min(firstVersion, sd.version);
const IDiskQueue::location end = sd.start.lo + sd.length;
commitLocations.emplace_back(sd.start, end);
// This isn't perfect, because we aren't accounting for page boundaries, but should be
// close enough.
commitBytes += sd.length;
mutationBytes += sd.mutationBytes;
}
}
if (earlyEnd)
break;
}
earlyEnd = earlyEnd || (kvrefs.size() >= SERVER_KNOBS->TLOG_SPILL_REFERENCE_MAX_BATCHES_PER_PEEK + 1);
wait(self->peekMemoryLimiter.take(TaskPriority::TLogSpilledPeekReply, commitBytes));
state FlowLock::Releaser memoryReservation(self->peekMemoryLimiter, commitBytes);
state std::vector<Future<Standalone<StringRef>>> messageReads;
messageReads.reserve(commitLocations.size());
for (const auto& pair : commitLocations) {
messageReads.push_back(self->rawPersistentQueue->read(pair.first, pair.second, CheckHashes::True));
}
commitLocations.clear();
wait(waitForAll(messageReads));
earlyEnd = earlyEnd || (kvrefs.size() >= SERVER_KNOBS->TLOG_SPILL_REFERENCE_MAX_BATCHES_PER_PEEK + 1);
wait(self->peekMemoryLimiter.take(TaskPriority::TLogSpilledPeekReply, commitBytes));
state FlowLock::Releaser memoryReservation(self->peekMemoryLimiter, commitBytes);
state std::vector<Future<Standalone<StringRef>>> messageReads;
messageReads.reserve(commitLocations.size());
for (const auto& pair : commitLocations) {
messageReads.push_back(self->rawPersistentQueue->read(pair.first, pair.second, CheckHashes::True));
}
commitLocations.clear();
wait(waitForAll(messageReads));
state Version lastRefMessageVersion = 0;
state int index = 0;
loop {
if (index >= messageReads.size())
break;
Standalone<StringRef> queueEntryData = messageReads[index].get();
uint8_t valid;
const uint32_t length = *(uint32_t*)queueEntryData.begin();
queueEntryData = queueEntryData.substr(4, queueEntryData.size() - 4);
BinaryReader rd(queueEntryData, IncludeVersion());
state TLogQueueEntry entry;
rd >> entry >> valid;
ASSERT(valid == 0x01);
ASSERT(length + sizeof(valid) == queueEntryData.size());
state Version lastRefMessageVersion = 0;
state int index = 0;
loop {
if (index >= messageReads.size())
break;
Standalone<StringRef> queueEntryData = messageReads[index].get();
uint8_t valid;
const uint32_t length = *(uint32_t*)queueEntryData.begin();
queueEntryData = queueEntryData.substr(4, queueEntryData.size() - 4);
BinaryReader rd(queueEntryData, IncludeVersion());
state TLogQueueEntry entry;
rd >> entry >> valid;
ASSERT(valid == 0x01);
ASSERT(length + sizeof(valid) == queueEntryData.size());
messages << VERSION_HEADER << entry.version;
messages << VERSION_HEADER << entry.version;
std::vector<StringRef> rawMessages =
wait(parseMessagesForTag(entry.messages, reqTag, logData->logRouterTags));
for (const StringRef& msg : rawMessages) {
messages.serializeBytes(msg);
DEBUG_TAGS_AND_MESSAGE("TLogPeekFromDisk", entry.version, msg, logData->logId)
.detail("DebugID", self->dbgid)
.detail("PeekTag", reqTag);
std::vector<StringRef> rawMessages =
wait(parseMessagesForTag(entry.messages, reqTag, logData->logRouterTags));
for (const StringRef& msg : rawMessages) {
messages.serializeBytes(msg);
DEBUG_TAGS_AND_MESSAGE("TLogPeekFromDisk", entry.version, msg, logData->logId)
.detail("DebugID", self->dbgid)
.detail("PeekTag", reqTag);
}
lastRefMessageVersion = entry.version;
index++;
}
lastRefMessageVersion = entry.version;
index++;
}
messageReads.clear();
memoryReservation.release();
messageReads.clear();
memoryReservation.release();
if (earlyEnd) {
endVersion = lastRefMessageVersion + 1;
onlySpilled = true;
} else {
messages.serializeBytes(messages2.toValue());
if (earlyEnd) {
endVersion = lastRefMessageVersion + 1;
onlySpilled = true;
} else {
messages.serializeBytes(messages2.toValue());
}
}
}
} else {
if (reqOnlySpilled) {
endVersion = logData->persistentDataDurableVersion + 1;
} else {
peekMessagesFromMemory(logData, reqTag, reqBegin, messages, endVersion);
if (reqOnlySpilled) {
endVersion = logData->persistentDataDurableVersion + 1;
} else {
peekMessagesFromMemory(logData, reqTag, reqBegin, messages, endVersion);
}
//TraceEvent("TLogPeekResults", self->dbgid).detail("ForAddress", replyPromise.getEndpoint().getPrimaryAddress()).detail("MessageBytes", messages.getLength()).detail("NextEpoch", next_pos.epoch).detail("NextSeq", next_pos.sequence).detail("NowSeq", self->sequence.getNextSequence());
}
//TraceEvent("TLogPeekResults", self->dbgid).detail("ForAddress", replyPromise.getEndpoint().getPrimaryAddress()).detail("MessageBytes", messages.getLength()).detail("NextEpoch", next_pos.epoch).detail("NextSeq", next_pos.sequence).detail("NowSeq", self->sequence.getNextSequence());
// Reply the peek request when
// - Have data return to the caller, or
// - Batching empty peek is disabled, or
// - Batching empty peek interval has been reached.
if (messages.getLength() > 0 || !SERVER_KNOBS->PEEK_BATCHING_EMPTY_MSG ||
(now() - blockStart > SERVER_KNOBS->PEEK_BATCHING_EMPTY_MSG_INTERVAL)) {
break;
}
state Version waitUntilVersion = logData->version.get() + 1;
// Currently, from `reqBegin` to logData->version are all empty peeks. Wait for more versions, or the empty
// batching interval has expired.
wait(logData->version.whenAtLeast(waitUntilVersion) || delay(SERVER_KNOBS->PEEK_BATCHING_EMPTY_MSG_INTERVAL));
if (logData->version.get() < waitUntilVersion) {
break; // We know that from `reqBegin` to logData->version are all empty messages. Skip re-executing the
// peek logic.
}
}
TLogPeekReply reply;