Add logging to LogRouter's waitForVersion function (#11359)
* Add logging to waitForVersion * Respond to review comments. --------- Co-authored-by: Dan Lambright <hlambright@apple.com>
This commit is contained in:
parent
3f8fcf5d0f
commit
5d6333fab9
|
@ -256,6 +256,7 @@ ACTOR Future<Void> waitForVersion(LogRouterData* self, Version ver) {
|
||||||
// The only time the log router should allow a gap in versions larger than MAX_READ_TRANSACTION_LIFE_VERSIONS is
|
// The only time the log router should allow a gap in versions larger than MAX_READ_TRANSACTION_LIFE_VERSIONS is
|
||||||
// when processing epoch end. Since one set of log routers is created per generation of transaction logs, the gap
|
// when processing epoch end. Since one set of log routers is created per generation of transaction logs, the gap
|
||||||
// caused by epoch end will be within MAX_VERSIONS_IN_FLIGHT of the log routers start version.
|
// caused by epoch end will be within MAX_VERSIONS_IN_FLIGHT of the log routers start version.
|
||||||
|
|
||||||
state double startTime = now();
|
state double startTime = now();
|
||||||
if (self->version.get() < self->startVersion) {
|
if (self->version.get() < self->startVersion) {
|
||||||
// Log router needs to wait for remote tLogs to process data, whose version is less than self->startVersion,
|
// Log router needs to wait for remote tLogs to process data, whose version is less than self->startVersion,
|
||||||
|
@ -297,6 +298,26 @@ ACTOR Future<Void> waitForVersion(LogRouterData* self, Version ver) {
|
||||||
return Void();
|
return Void();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ACTOR Future<Void> waitForVersionAndLog(LogRouterData* self, Version ver) {
|
||||||
|
state Future<Void> f = waitForVersion(self, ver);
|
||||||
|
state double emitInterval = 60.0;
|
||||||
|
loop {
|
||||||
|
choose {
|
||||||
|
when(wait(f)) {
|
||||||
|
return Void();
|
||||||
|
}
|
||||||
|
when(wait(delay(emitInterval))) {
|
||||||
|
TraceEvent("LogRouterWaitForVersionLongDelay", self->dbgid)
|
||||||
|
.detail("WaitForVersion", ver)
|
||||||
|
.detail("StartVersion", self->startVersion)
|
||||||
|
.detail("Version", self->version.get())
|
||||||
|
.detail("MinPopped", self->minPopped.get())
|
||||||
|
.detail("FoundEpochEnd", self->foundEpochEnd);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
ACTOR Future<Reference<ILogSystem::IPeekCursor>> getPeekCursorData(LogRouterData* self,
|
ACTOR Future<Reference<ILogSystem::IPeekCursor>> getPeekCursorData(LogRouterData* self,
|
||||||
Reference<ILogSystem::IPeekCursor> r,
|
Reference<ILogSystem::IPeekCursor> r,
|
||||||
Version startVersion) {
|
Version startVersion) {
|
||||||
|
@ -379,7 +400,7 @@ ACTOR Future<Void> pullAsyncData(LogRouterData* self) {
|
||||||
if (!foundMessage || r->version().version != ver) {
|
if (!foundMessage || r->version().version != ver) {
|
||||||
ASSERT(r->version().version > lastVer);
|
ASSERT(r->version().version > lastVer);
|
||||||
if (ver) {
|
if (ver) {
|
||||||
wait(waitForVersion(self, ver));
|
wait(waitForVersionAndLog(self, ver));
|
||||||
|
|
||||||
commitMessages(self, ver, messages);
|
commitMessages(self, ver, messages);
|
||||||
self->version.set(ver);
|
self->version.set(ver);
|
||||||
|
@ -394,7 +415,7 @@ ACTOR Future<Void> pullAsyncData(LogRouterData* self) {
|
||||||
if (!foundMessage) {
|
if (!foundMessage) {
|
||||||
ver--; // ver is the next possible version we will get data for
|
ver--; // ver is the next possible version we will get data for
|
||||||
if (ver > self->version.get() && ver >= r->popped()) {
|
if (ver > self->version.get() && ver >= r->popped()) {
|
||||||
wait(waitForVersion(self, ver));
|
wait(waitForVersionAndLog(self, ver));
|
||||||
|
|
||||||
self->version.set(ver);
|
self->version.set(ver);
|
||||||
wait(yield(TaskPriority::TLogCommit));
|
wait(yield(TaskPriority::TLogCommit));
|
||||||
|
|
Loading…
Reference in New Issue