fix: the only time the log router should allow a gap in versions larger than MAX_READ_TRANSACTION_LIFE_VERSIONS is when processing epoch end. Since one set of log routers is created per generation of transaction logs, the gap caused by epoch end will be within MAX_VERSIONS_IN_FLIGHT of the log routers start version

This commit is contained in:
Evan Tschannen 2019-02-15 14:33:01 -08:00
parent 6d7d40bcb3
commit 48d2cb77e6
1 changed files with 27 additions and 3 deletions

View File

@ -84,6 +84,7 @@ struct LogRouterData {
Tag routerTag;
bool allowPops;
LogSet logSet;
bool foundEpochEnd;
std::vector<Reference<TagData>> tag_data; //we only store data for the remote tag locality
@ -102,7 +103,7 @@ struct LogRouterData {
return newTagData;
}
LogRouterData(UID dbgid, InitializeLogRouterRequest req) : dbgid(dbgid), routerTag(req.routerTag), logSystem(new AsyncVar<Reference<ILogSystem>>()), version(req.startVersion-1), minPopped(req.startVersion-1), startVersion(req.startVersion), allowPops(false), minKnownCommittedVersion(0) {
LogRouterData(UID dbgid, InitializeLogRouterRequest req) : dbgid(dbgid), routerTag(req.routerTag), logSystem(new AsyncVar<Reference<ILogSystem>>()), version(req.startVersion-1), minPopped(req.startVersion-1), startVersion(req.startVersion), allowPops(false), minKnownCommittedVersion(0), foundEpochEnd(false) {
//setup just enough of a logSet to be able to call getPushLocations
logSet.logServers.resize(req.tLogLocalities.size());
logSet.tLogPolicy = req.tLogPolicy;
@ -169,6 +170,27 @@ void commitMessages( LogRouterData* self, Version version, const std::vector<Tag
self->messageBlocks.push_back( std::make_pair(version, block) );
}
ACTOR Future<Void> waitForVersion( LogRouterData *self, Version ver ) {
// The only time the log router should allow a gap in versions larger than MAX_READ_TRANSACTION_LIFE_VERSIONS is when processing epoch end.
// Since one set of log routers is created per generation of transaction logs, the gap caused by epoch end will be within MAX_VERSIONS_IN_FLIGHT of the log routers start version.
if(!self->foundEpochEnd) {
wait(self->minPopped.whenAtLeast(std::min(self->version.get(), ver - SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS)));
} else {
while(self->minPopped.get() + SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS < ver) {
if(self->minPopped.get() + SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS > self->version.get()) {
self->version.set( self->minPopped.get() + SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS );
wait(yield(TaskTLogCommit));
} else {
wait(self->minPopped.whenAtLeast((self->minPopped.get()+1)));
}
}
}
if(ver >= self->startVersion + SERVER_KNOBS->MAX_VERSIONS_IN_FLIGHT) {
self->foundEpochEnd = true;
}
return Void();
}
ACTOR Future<Void> pullAsyncData( LogRouterData *self ) {
state Future<Void> dbInfoChange = Void();
state Reference<ILogSystem::IPeekCursor> r;
@ -203,7 +225,8 @@ ACTOR Future<Void> pullAsyncData( LogRouterData *self ) {
if (!foundMessage || r->version().version != ver) {
ASSERT(r->version().version > lastVer);
if (ver) {
wait(self->minPopped.whenAtLeast(std::min(self->version.get(), ver - SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS)));
wait( waitForVersion(self, ver) );
commitMessages(self, ver, messages);
self->version.set( ver );
wait(yield(TaskTLogCommit));
@ -216,7 +239,8 @@ ACTOR Future<Void> pullAsyncData( LogRouterData *self ) {
if (!foundMessage) {
ver--; //ver is the next possible version we will get data for
if(ver > self->version.get()) {
wait(self->minPopped.whenAtLeast(std::min(self->version.get(), ver - SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS)));
wait( waitForVersion(self, ver) );
self->version.set( ver );
wait(yield(TaskTLogCommit));
}