fix: peek cursors were being timed out every 10 minutes, instead of 10 minutes after the last use
fix: if an interface is changed while we are not waiting in getMore, we will not reset the sequence to 0.
This commit is contained in:
parent
29a38d1a51
commit
2335fc73f2
|
@ -115,6 +115,7 @@ struct ILogSystem {
|
|||
bool parallelGetMore;
|
||||
int sequence;
|
||||
Deque<Future<TLogPeekReply>> futureResults;
|
||||
Future<Void> interfaceChanged;
|
||||
|
||||
ServerPeekCursor( Reference<AsyncVar<OptionalInterface<TLogInterface>>> const& interf, Tag tag, Version begin, Version end, bool returnIfBlocked, bool parallelGetMore );
|
||||
|
||||
|
|
|
@ -119,6 +119,10 @@ ACTOR Future<Void> serverPeekParallelGetMore( ILogSystem::ServerPeekCursor* self
|
|||
throw internal_error();
|
||||
}
|
||||
|
||||
if(!self->interfaceChanged.isValid()) {
|
||||
self->interfaceChanged = self->interf->onChange();
|
||||
}
|
||||
|
||||
loop {
|
||||
try {
|
||||
while(self->futureResults.size() < SERVER_KNOBS->PARALLEL_GET_MORE_REQUESTS && self->interf->get().present()) {
|
||||
|
@ -139,7 +143,9 @@ ACTOR Future<Void> serverPeekParallelGetMore( ILogSystem::ServerPeekCursor* self
|
|||
//TraceEvent("SPC_getMoreB", self->randomID).detail("has", self->hasMessage()).detail("end", res.end).detail("popped", res.popped.present() ? res.popped.get() : 0);
|
||||
return Void();
|
||||
}
|
||||
when( Void _ = wait( self->interf->onChange() ) ) {
|
||||
when( Void _ = wait( self->interfaceChanged ) ) {
|
||||
self->interfaceChanged = self->interf->onChange();
|
||||
self->randomID = g_random->randomUniqueID();
|
||||
self->sequence = 0;
|
||||
self->futureResults.clear();
|
||||
}
|
||||
|
@ -150,6 +156,7 @@ ACTOR Future<Void> serverPeekParallelGetMore( ILogSystem::ServerPeekCursor* self
|
|||
return Void();
|
||||
} else if(e.code() == error_code_timed_out) {
|
||||
TraceEvent("PeekCursorTimedOut", self->randomID);
|
||||
self->interfaceChanged = self->interf->onChange();
|
||||
self->randomID = g_random->randomUniqueID();
|
||||
self->sequence = 0;
|
||||
self->futureResults.clear();
|
||||
|
|
|
@ -1127,11 +1127,11 @@ ACTOR Future<Void> respondToRecovered( TLogInterface tli, Future<Void> recovery
|
|||
|
||||
ACTOR Future<Void> cleanupPeekTrackers( TLogData* self ) {
|
||||
loop {
|
||||
double minExpireTime = SERVER_KNOBS->PEEK_TRACKER_EXPIRATION_TIME;
|
||||
double minTimeUntilExpiration = SERVER_KNOBS->PEEK_TRACKER_EXPIRATION_TIME;
|
||||
auto it = self->peekTracker.begin();
|
||||
while(it != self->peekTracker.end()) {
|
||||
double expireTime = SERVER_KNOBS->PEEK_TRACKER_EXPIRATION_TIME - now()-it->second.lastUpdate;
|
||||
if(expireTime < 1.0e-6) {
|
||||
double timeUntilExpiration = it->second.lastUpdate + SERVER_KNOBS->PEEK_TRACKER_EXPIRATION_TIME - now();
|
||||
if(timeUntilExpiration < 1.0e-6) {
|
||||
for(auto seq : it->second.sequence_version) {
|
||||
if(!seq.second.isSet()) {
|
||||
seq.second.sendError(timed_out());
|
||||
|
@ -1139,12 +1139,12 @@ ACTOR Future<Void> cleanupPeekTrackers( TLogData* self ) {
|
|||
}
|
||||
it = self->peekTracker.erase(it);
|
||||
} else {
|
||||
minExpireTime = std::min(minExpireTime, expireTime);
|
||||
minTimeUntilExpiration = std::min(minTimeUntilExpiration, timeUntilExpiration);
|
||||
++it;
|
||||
}
|
||||
}
|
||||
|
||||
Void _ = wait( delay(minExpireTime) );
|
||||
Void _ = wait( delay(minTimeUntilExpiration) );
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue