From 48d2cb77e601744def1c86c036d570c30ed0733e Mon Sep 17 00:00:00 2001 From: Evan Tschannen Date: Fri, 15 Feb 2019 14:33:01 -0800 Subject: [PATCH] fix: the only time the log router should allow a gap in versions larger than MAX_READ_TRANSACTION_LIFE_VERSIONS is when processing epoch end. Since one set of log routers is created per generation of transaction logs, the gap caused by epoch end will be within MAX_VERSIONS_IN_FLIGHT of the log routers start version --- fdbserver/LogRouter.actor.cpp | 30 +++++++++++++++++++++++++++--- 1 file changed, 27 insertions(+), 3 deletions(-) diff --git a/fdbserver/LogRouter.actor.cpp b/fdbserver/LogRouter.actor.cpp index a7f5d7e084..fae3ce0e27 100644 --- a/fdbserver/LogRouter.actor.cpp +++ b/fdbserver/LogRouter.actor.cpp @@ -84,6 +84,7 @@ struct LogRouterData { Tag routerTag; bool allowPops; LogSet logSet; + bool foundEpochEnd; std::vector> tag_data; //we only store data for the remote tag locality @@ -102,7 +103,7 @@ struct LogRouterData { return newTagData; } - LogRouterData(UID dbgid, InitializeLogRouterRequest req) : dbgid(dbgid), routerTag(req.routerTag), logSystem(new AsyncVar>()), version(req.startVersion-1), minPopped(req.startVersion-1), startVersion(req.startVersion), allowPops(false), minKnownCommittedVersion(0) { + LogRouterData(UID dbgid, InitializeLogRouterRequest req) : dbgid(dbgid), routerTag(req.routerTag), logSystem(new AsyncVar>()), version(req.startVersion-1), minPopped(req.startVersion-1), startVersion(req.startVersion), allowPops(false), minKnownCommittedVersion(0), foundEpochEnd(false) { //setup just enough of a logSet to be able to call getPushLocations logSet.logServers.resize(req.tLogLocalities.size()); logSet.tLogPolicy = req.tLogPolicy; @@ -169,6 +170,27 @@ void commitMessages( LogRouterData* self, Version version, const std::vectormessageBlocks.push_back( std::make_pair(version, block) ); } +ACTOR Future waitForVersion( LogRouterData *self, Version ver ) { + // The only time the log router should allow a gap in versions larger than MAX_READ_TRANSACTION_LIFE_VERSIONS is when processing epoch end. + // Since one set of log routers is created per generation of transaction logs, the gap caused by epoch end will be within MAX_VERSIONS_IN_FLIGHT of the log routers start version. + if(!self->foundEpochEnd) { + wait(self->minPopped.whenAtLeast(std::min(self->version.get(), ver - SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS))); + } else { + while(self->minPopped.get() + SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS < ver) { + if(self->minPopped.get() + SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS > self->version.get()) { + self->version.set( self->minPopped.get() + SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS ); + wait(yield(TaskTLogCommit)); + } else { + wait(self->minPopped.whenAtLeast((self->minPopped.get()+1))); + } + } + } + if(ver >= self->startVersion + SERVER_KNOBS->MAX_VERSIONS_IN_FLIGHT) { + self->foundEpochEnd = true; + } + return Void(); +} + ACTOR Future pullAsyncData( LogRouterData *self ) { state Future dbInfoChange = Void(); state Reference r; @@ -203,7 +225,8 @@ ACTOR Future pullAsyncData( LogRouterData *self ) { if (!foundMessage || r->version().version != ver) { ASSERT(r->version().version > lastVer); if (ver) { - wait(self->minPopped.whenAtLeast(std::min(self->version.get(), ver - SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS))); + wait( waitForVersion(self, ver) ); + commitMessages(self, ver, messages); self->version.set( ver ); wait(yield(TaskTLogCommit)); @@ -216,7 +239,8 @@ ACTOR Future pullAsyncData( LogRouterData *self ) { if (!foundMessage) { ver--; //ver is the next possible version we will get data for if(ver > self->version.get()) { - wait(self->minPopped.whenAtLeast(std::min(self->version.get(), ver - SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS))); + wait( waitForVersion(self, ver) ); + self->version.set( ver ); wait(yield(TaskTLogCommit)); }