Add an ASSERT_WE_THINK that peek cursors don't get timed_out()

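This should prevent us from regressing to the bug where multi-region
recoveries hang for 10 minutes. Superseded parallel peeks now fail with
operation_obsolete() instead of timed_out(), so that a genuine timed_out()
reaching a peek cursor can be flagged by the new assertion.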
This commit is contained in:
Alex Miller 2019-10-14 18:05:06 -07:00
parent 0662f8dba0
commit 1cb311fcb8
7 changed files with 60 additions and 48 deletions

View File

@ -351,14 +351,14 @@ ACTOR Future<Void> logRouterPeekMessages( LogRouterData* self, TLogPeekRequest r
// The peek cursor and this comparison need to agree about the maximum number of in-flight requests.
while(trackerData.sequence_version.size() && seqBegin->first <= sequence - SERVER_KNOBS->PARALLEL_GET_MORE_REQUESTS) {
if(seqBegin->second.canBeSet()) {
seqBegin->second.sendError(timed_out());
seqBegin->second.sendError(operation_obsolete());
}
trackerData.sequence_version.erase(seqBegin);
seqBegin = trackerData.sequence_version.begin();
}
if(trackerData.sequence_version.size() && sequence < seqBegin->first) {
throw timed_out();
throw operation_obsolete();
}
trackerData.lastUpdate = now();
@ -367,8 +367,8 @@ ACTOR Future<Void> logRouterPeekMessages( LogRouterData* self, TLogPeekRequest r
req.onlySpilled = prevPeekData.second;
wait(yield());
} catch( Error &e ) {
if(e.code() == error_code_timed_out) {
req.reply.sendError(timed_out());
if(e.code() == error_code_timed_out || e.code() == error_code_operation_obsolete) {
req.reply.sendError(e);
return Void();
} else {
throw;
@ -428,15 +428,15 @@ ACTOR Future<Void> logRouterPeekMessages( LogRouterData* self, TLogPeekRequest r
trackerData.lastUpdate = now();
auto& sequenceData = trackerData.sequence_version[sequence+1];
if(trackerData.sequence_version.size() && sequence+1 < trackerData.sequence_version.begin()->first) {
req.reply.sendError(timed_out());
req.reply.sendError(operation_obsolete());
if(!sequenceData.isSet())
sequenceData.sendError(timed_out());
sequenceData.send(std::make_pair(req.begin, req.onlySpilled));
return Void();
}
if(sequenceData.isSet()) {
if(sequenceData.getFuture().get().first != reply.end) {
TEST(true); //tlog peek second attempt ended at a different version
req.reply.sendError(timed_out());
req.reply.sendError(operation_obsolete());
return Void();
}
} else {

View File

@ -166,7 +166,7 @@ ACTOR Future<Void> serverPeekParallelGetMore( ILogSystem::ServerPeekCursor* self
choose {
when( TLogPeekReply res = wait( self->interf->get().present() ? self->futureResults.front() : Never() ) ) {
if(res.begin.get() != expectedBegin) {
throw timed_out();
throw operation_obsolete();
}
expectedBegin = res.end;
self->futureResults.pop_front();
@ -200,8 +200,11 @@ ACTOR Future<Void> serverPeekParallelGetMore( ILogSystem::ServerPeekCursor* self
if(e.code() == error_code_end_of_stream) {
self->end.reset( self->messageVersion.version );
return Void();
} else if(e.code() == error_code_timed_out) {
TraceEvent("PeekCursorTimedOut", self->randomID);
} else if(e.code() == error_code_timed_out || e.code() == error_code_operation_obsolete) {
TraceEvent("PeekCursorTimedOut", self->randomID).error(e);
// We *should* never get timed_out(), as it means the TLog got stuck while handling a parallel peek,
// and thus we've likely just wasted 10min.
ASSERT_WE_THINK(e.code() == error_code_operation_obsolete || SERVER_KNOBS->PEEK_TRACKER_EXPIRATION_TIME < 10);
self->interfaceChanged = self->interf->onChange();
self->randomID = deterministicRandom()->randomUniqueID();
self->sequence = 0;

View File

@ -886,8 +886,8 @@ namespace oldTLog_4_6 {
wait(yield());
}
} catch( Error &e ) {
if(e.code() == error_code_timed_out) {
req.reply.sendError(timed_out());
if(e.code() == error_code_timed_out || e.code() == error_code_operation_obsolete) {
req.reply.sendError(e);
return Void();
} else {
throw;
@ -923,15 +923,15 @@ namespace oldTLog_4_6 {
auto& sequenceData = trackerData.sequence_version[sequence+1];
trackerData.lastUpdate = now();
if(trackerData.sequence_version.size() && sequence+1 < trackerData.sequence_version.begin()->first) {
req.reply.sendError(timed_out());
req.reply.sendError(operation_obsolete());
if (!sequenceData.isSet())
sequenceData.sendError(timed_out());
sequenceData.sendError(operation_obsolete());
return Void();
}
if(sequenceData.isSet()) {
if(sequenceData.getFuture().get() != rep.end) {
TEST(true); //tlog peek second attempt ended at a different version
req.reply.sendError(timed_out());
req.reply.sendError(operation_obsolete());
return Void();
}
} else {
@ -1002,7 +1002,7 @@ namespace oldTLog_4_6 {
if(sequenceData.isSet()) {
if(sequenceData.getFuture().get() != reply.end) {
TEST(true); //tlog peek second attempt ended at a different version
req.reply.sendError(timed_out());
req.reply.sendError(operation_obsolete());
return Void();
}
} else {

View File

@ -1056,14 +1056,14 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
auto seqBegin = trackerData.sequence_version.begin();
while(trackerData.sequence_version.size() && seqBegin->first <= sequence - SERVER_KNOBS->PARALLEL_GET_MORE_REQUESTS) {
if(seqBegin->second.canBeSet()) {
seqBegin->second.sendError(timed_out());
seqBegin->second.sendError(operation_obsolete());
}
trackerData.sequence_version.erase(seqBegin);
seqBegin = trackerData.sequence_version.begin();
}
if(trackerData.sequence_version.size() && sequence < seqBegin->first) {
throw timed_out();
throw operation_obsolete();
}
trackerData.lastUpdate = now();
@ -1072,8 +1072,8 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
req.onlySpilled = prevPeekData.second;
wait(yield());
} catch( Error &e ) {
if(e.code() == error_code_timed_out) {
req.reply.sendError(timed_out());
if(e.code() == error_code_timed_out || e.code() == error_code_operation_obsolete) {
req.reply.sendError(e);
return Void();
} else {
throw;
@ -1129,15 +1129,15 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
auto& sequenceData = trackerData.sequence_version[sequence+1];
trackerData.lastUpdate = now();
if(trackerData.sequence_version.size() && sequence+1 < trackerData.sequence_version.begin()->first) {
req.reply.sendError(timed_out());
req.reply.sendError(operation_obsolete());
if (!sequenceData.isSet())
sequenceData.sendError(timed_out());
sequenceData.sendError(operation_obsolete());
return Void();
}
if(sequenceData.isSet()) {
if(sequenceData.getFuture().get().first != rep.end) {
TEST(true); //tlog peek second attempt ended at a different version
req.reply.sendError(timed_out());
req.reply.sendError(operation_obsolete());
return Void();
}
} else {
@ -1205,15 +1205,15 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
trackerData.lastUpdate = now();
auto& sequenceData = trackerData.sequence_version[sequence+1];
if(trackerData.sequence_version.size() && sequence+1 < trackerData.sequence_version.begin()->first) {
req.reply.sendError(timed_out());
req.reply.sendError(operation_obsolete());
if(!sequenceData.isSet())
sequenceData.sendError(timed_out());
sequenceData.sendError(operation_obsolete());
return Void();
}
if(sequenceData.isSet()) {
if(sequenceData.getFuture().get().first != reply.end) {
TEST(true); //tlog peek second attempt ended at a different version
req.reply.sendError(timed_out());
req.reply.sendError(operation_obsolete());
return Void();
}
} else {

View File

@ -1359,14 +1359,14 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
// The peek cursor and this comparison need to agree about the maximum number of in-flight requests.
while(trackerData.sequence_version.size() && seqBegin->first <= sequence - SERVER_KNOBS->PARALLEL_GET_MORE_REQUESTS) {
if(seqBegin->second.canBeSet()) {
seqBegin->second.sendError(timed_out());
seqBegin->second.sendError(operation_obsolete());
}
trackerData.sequence_version.erase(seqBegin);
seqBegin = trackerData.sequence_version.begin();
}
if(trackerData.sequence_version.size() && sequence < seqBegin->first) {
throw timed_out();
throw operation_obsolete();
}
trackerData.lastUpdate = now();
@ -1375,8 +1375,8 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
req.onlySpilled = prevPeekData.second;
wait(yield());
} catch( Error &e ) {
if(e.code() == error_code_timed_out) {
req.reply.sendError(timed_out());
if(e.code() == error_code_timed_out || e.code() == error_code_operation_obsolete) {
req.reply.sendError(e);
return Void();
} else {
throw;
@ -1432,15 +1432,15 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
auto& sequenceData = trackerData.sequence_version[sequence+1];
trackerData.lastUpdate = now();
if(trackerData.sequence_version.size() && sequence+1 < trackerData.sequence_version.begin()->first) {
req.reply.sendError(timed_out());
req.reply.sendError(operation_obsolete());
if (!sequenceData.isSet())
sequenceData.sendError(timed_out());
sequenceData.sendError(operation_obsolete());
return Void();
}
if(sequenceData.isSet()) {
if(sequenceData.getFuture().get().first != rep.end) {
TEST(true); //tlog peek second attempt ended at a different version
req.reply.sendError(timed_out());
req.reply.sendError(operation_obsolete());
return Void();
}
} else {
@ -1596,15 +1596,15 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
trackerData.lastUpdate = now();
auto& sequenceData = trackerData.sequence_version[sequence+1];
if(trackerData.sequence_version.size() && sequence+1 < trackerData.sequence_version.begin()->first) {
req.reply.sendError(timed_out());
req.reply.sendError(operation_obsolete());
if(!sequenceData.isSet())
sequenceData.sendError(timed_out());
sequenceData.sendError(operation_obsolete());
return Void();
}
if(sequenceData.isSet()) {
if(sequenceData.getFuture().get().first != reply.end) {
TEST(true); //tlog peek second attempt ended at a different version
req.reply.sendError(timed_out());
req.reply.sendError(operation_obsolete());
return Void();
}
} else {

View File

@ -1378,24 +1378,31 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
// The peek cursor and this comparison need to agree about the maximum number of in-flight requests.
while(trackerData.sequence_version.size() && seqBegin->first <= sequence - SERVER_KNOBS->PARALLEL_GET_MORE_REQUESTS) {
if(seqBegin->second.canBeSet()) {
seqBegin->second.sendError(timed_out());
seqBegin->second.sendError(operation_obsolete());
}
trackerData.sequence_version.erase(seqBegin);
seqBegin = trackerData.sequence_version.begin();
}
if(trackerData.sequence_version.size() && sequence < seqBegin->first) {
throw timed_out();
throw operation_obsolete();
}
trackerData.lastUpdate = now();
std::pair<Version, bool> prevPeekData = wait(trackerData.sequence_version[sequence].getFuture());
req.begin = prevPeekData.first;
req.begin = std::max(prevPeekData.first, req.begin);
req.onlySpilled = prevPeekData.second;
wait(yield());
} catch( Error &e ) {
if(e.code() == error_code_timed_out) {
req.reply.sendError(timed_out());
if(e.code() == error_code_timed_out || e.code() == error_code_operation_obsolete) {
req.reply.sendError(e);
auto& trackerData = logData->peekTracker[peekId];
trackerData.lastUpdate = now();
auto& sequenceData = trackerData.sequence_version[sequence+1];
trackerData.lastUpdate = now();
if (!sequenceData.isSet()) {
sequenceData.send(std::make_pair(req.begin, req.onlySpilled));
}
return Void();
} else {
throw;
@ -1408,6 +1415,7 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
if(req.sequence.present()) {
auto& trackerData = logData->peekTracker[peekId];
auto& sequenceData = trackerData.sequence_version[sequence+1];
trackerData.lastUpdate = now();
if (!sequenceData.isSet()) {
sequenceData.send(std::make_pair(req.begin, req.onlySpilled));
}
@ -1451,15 +1459,15 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
auto& sequenceData = trackerData.sequence_version[sequence+1];
trackerData.lastUpdate = now();
if(trackerData.sequence_version.size() && sequence+1 < trackerData.sequence_version.begin()->first) {
req.reply.sendError(timed_out());
req.reply.sendError(operation_obsolete());
if (!sequenceData.isSet())
sequenceData.sendError(timed_out());
sequenceData.send(std::make_pair(rep.end, rep.onlySpilled));
return Void();
}
if(sequenceData.isSet()) {
if(sequenceData.getFuture().get().first != rep.end) {
TEST(true); //tlog peek second attempt ended at a different version
req.reply.sendError(timed_out());
req.reply.sendError(operation_obsolete());
return Void();
}
} else {
@ -1612,18 +1620,18 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
if(req.sequence.present()) {
auto& trackerData = logData->peekTracker[peekId];
trackerData.lastUpdate = now();
auto& sequenceData = trackerData.sequence_version[sequence+1];
trackerData.lastUpdate = now();
if(trackerData.sequence_version.size() && sequence+1 < trackerData.sequence_version.begin()->first) {
req.reply.sendError(timed_out());
req.reply.sendError(operation_obsolete());
if(!sequenceData.isSet())
sequenceData.sendError(timed_out());
sequenceData.send(std::make_pair(req.begin, req.onlySpilled));
return Void();
}
if(sequenceData.isSet()) {
if(sequenceData.getFuture().get().first != reply.end) {
TEST(true); //tlog peek second attempt ended at a different version
req.reply.sendError(timed_out());
req.reply.sendError(operation_obsolete());
return Void();
}
} else {

View File

@ -34,6 +34,7 @@ ERROR( success, 0, "Success" )
ERROR( end_of_stream, 1, "End of stream" )
ERROR( operation_failed, 1000, "Operation failed")
ERROR( wrong_shard_server, 1001, "Shard is not available from this server")
ERROR( operation_obsolete, 1002, "Operation result no longer necessary")
ERROR( timed_out, 1004, "Operation timed out" )
ERROR( coordinated_state_conflict, 1005, "Conflict occurred while changing coordination information" )
ERROR( all_alternatives_failed, 1006, "All alternatives failed" )