Address comments

This commit is contained in:
Zhe Wu 2022-02-16 13:48:02 -08:00
parent 9da735c38e
commit e07ae6fdb9
2 changed files with 8 additions and 6 deletions

View File

@ -544,20 +544,21 @@ Future<Void> logRouterPeekMessages(PromiseType replyPromise,
endVersion = self->version.get() + 1;
peekMessagesFromMemory(self, reqTag, reqBegin, messages, endVersion);
// Reply the peek request when
// - Have data return to the caller, or
// - Batching empty peek is disabled, or
// - Batching empty peek interval has been reached.
if (messages.getLength() > 0 || !SERVER_KNOBS->PEEK_BATCHING_EMPTY_MSG ||
now() - startTime > SERVER_KNOBS->PEEK_BATCHING_EMPTY_MSG_INTERVAL) {
break;
}
// Reply the peek request when
// - Have data return to the caller, or
// - Batching empty peek is disabled, or
// - Batching empty peek interval has been reached.
state Version waitUntilVersion = self->version.get() + 1;
// Currently, from `reqBegin` to self->version are all empty peeks. Wait for more version, or the empty batching
// interval has expired.
wait(self->version.whenAtLeast(waitUntilVersion) || delay(SERVER_KNOBS->PEEK_BATCHING_EMPTY_MSG_INTERVAL));
wait(self->version.whenAtLeast(waitUntilVersion) ||
delay(SERVER_KNOBS->PEEK_BATCHING_EMPTY_MSG_INTERVAL - (now() - startTime)));
if (self->version.get() < waitUntilVersion) {
break; // We know that from `reqBegin` to self->version are all empty messages. Skip re-executing the peek
// logic.

View File

@ -1896,7 +1896,8 @@ Future<Void> tLogPeekMessages(PromiseType replyPromise,
// Currently, from `reqBegin` to logData->version are all empty peeks. Wait for more versions, or the empty
// batching interval has expired.
wait(logData->version.whenAtLeast(waitUntilVersion) || delay(SERVER_KNOBS->PEEK_BATCHING_EMPTY_MSG_INTERVAL));
wait(logData->version.whenAtLeast(waitUntilVersion) ||
delay(SERVER_KNOBS->PEEK_BATCHING_EMPTY_MSG_INTERVAL - (now() - blockStart)));
if (logData->version.get() < waitUntilVersion) {
break; // We know that from `reqBegin` to logData->version are all empty messages. Skip re-executing the
// peek logic.