Add cleanupPeekTrackers to LogRouter

This commit is contained in:
Alex Miller 2019-09-12 16:27:39 -07:00
parent 324289039a
commit be1f370457
1 changed files with 24 additions and 0 deletions

View File

@ -446,6 +446,29 @@ ACTOR Future<Void> logRouterPeekMessages( LogRouterData* self, TLogPeekRequest r
return Void();
}
ACTOR Future<Void> cleanupPeekTrackers( LogRouterData* self ) {
loop {
double minTimeUntilExpiration = SERVER_KNOBS->PEEK_TRACKER_EXPIRATION_TIME;
auto it = self->peekTracker.begin();
while(it != self->peekTracker.end()) {
double timeUntilExpiration = it->second.lastUpdate + SERVER_KNOBS->PEEK_TRACKER_EXPIRATION_TIME - now();
if(timeUntilExpiration < 1.0e-6) {
for(auto seq : it->second.sequence_version) {
if(!seq.second.isSet()) {
seq.second.sendError(timed_out());
}
}
it = self->peekTracker.erase(it);
} else {
minTimeUntilExpiration = std::min(minTimeUntilExpiration, timeUntilExpiration);
++it;
}
}
wait( delay(minTimeUntilExpiration) );
}
}
ACTOR Future<Void> logRouterPop( LogRouterData* self, TLogPopRequest req ) {
auto tagData = self->getTagData(req.tag);
if (!tagData) {
@ -491,6 +514,7 @@ ACTOR Future<Void> logRouterCore(
state Future<Void> dbInfoChange = Void();
addActor.send( pullAsyncData(&logRouterData) );
addActor.send( cleanupPeekTrackers(&logRouterData) );
loop choose {
when( wait( dbInfoChange ) ) {