added yields to the log router and tlogs after processing a version

This commit is contained in:
Evan Tschannen 2018-09-04 17:16:44 -07:00
parent 21f5cf9ce9
commit 1022e0a5c6
2 changed files with 11 additions and 7 deletions

View File

@ -206,6 +206,7 @@ ACTOR Future<Void> pullAsyncData( LogRouterData *self ) {
Void _ = wait(self->minPopped.whenAtLeast(std::min(self->version.get(), ver - SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS)));
commitMessages(self, ver, messages);
self->version.set( ver );
Void _ = wait(yield(TaskTLogCommit));
//TraceEvent("LogRouterVersion").detail("Ver",ver);
}
lastVer = ver;
@ -217,6 +218,7 @@ ACTOR Future<Void> pullAsyncData( LogRouterData *self ) {
if(ver > self->version.get()) {
Void _ = wait(self->minPopped.whenAtLeast(std::min(self->version.get(), ver - SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS)));
self->version.set( ver );
Void _ = wait(yield(TaskTLogCommit));
}
break;
}

View File

@ -1477,14 +1477,14 @@ ACTOR Future<Void> pullAsyncData( TLogData* self, Reference<LogData> logData, st
Void _ = wait( delayJittered(.005, TaskTLogCommit) );
}
if(logData->stopped) {
return Void();
}
state Version ver = 0;
state std::vector<TagsAndMessage> messages;
loop {
if(logData->stopped) {
return Void();
}
Version ver = 0;
std::vector<TagsAndMessage> messages;
while (true) {
bool foundMessage = r->hasMessage();
state bool foundMessage = r->hasMessage();
if (!foundMessage || r->version().version != ver) {
ASSERT(r->version().version > lastVer);
if (ver) {
@ -1519,6 +1519,7 @@ ACTOR Future<Void> pullAsyncData( TLogData* self, Reference<LogData> logData, st
//FIXME: could we just use the ver and lastVer variables, or replace them with this?
self->prevVersion = logData->version.get();
logData->version.set( ver );
Void _ = wait( yield(TaskTLogCommit) );
}
lastVer = ver;
ver = r->version().version;
@ -1556,6 +1557,7 @@ ACTOR Future<Void> pullAsyncData( TLogData* self, Reference<LogData> logData, st
//FIXME: could we just use the ver and lastVer variables, or replace them with this?
self->prevVersion = logData->version.get();
logData->version.set( ver );
Void _ = wait( yield(TaskTLogCommit) );
}
break;
}