Copy same changes to OldTLogServer_6_0.

This commit is contained in:
Alex Miller 2019-07-15 16:43:43 -07:00
parent 2cbc05fc72
commit 32af11286b
1 changed files with 38 additions and 21 deletions

View File

@ -1023,26 +1023,32 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
try {
peekId = req.sequence.get().first;
sequence = req.sequence.get().second;
if(sequence > 0) {
auto& trackerData = logData->peekTracker[peekId];
auto seqBegin = trackerData.sequence_version.begin();
while(trackerData.sequence_version.size() && seqBegin->first <= sequence - SERVER_KNOBS->PARALLEL_GET_MORE_REQUESTS) {
if(seqBegin->second.canBeSet()) {
seqBegin->second.sendError(timed_out());
}
trackerData.sequence_version.erase(seqBegin);
seqBegin = trackerData.sequence_version.begin();
}
if(trackerData.sequence_version.size() && sequence < seqBegin->first) {
throw timed_out();
}
trackerData.lastUpdate = now();
Version ver = wait(trackerData.sequence_version[sequence].getFuture());
req.begin = ver;
wait(yield());
auto& trackerData = logData->peekTracker[peekId];
if (sequence == 0) {
Promise<Version> firstRequest;
trackerData.sequence_version[0] = firstRequest;
firstRequest.send(req.begin);
}
auto seqBegin = trackerData.sequence_version.begin();
while(trackerData.sequence_version.size() && seqBegin->first <= sequence - SERVER_KNOBS->PARALLEL_GET_MORE_REQUESTS) {
if(seqBegin->second.canBeSet()) {
seqBegin->second.sendError(timed_out());
}
trackerData.sequence_version.erase(seqBegin);
seqBegin = trackerData.sequence_version.begin();
}
if (seqBegin != trackerData.sequence_version.end() && !seqBegin->second.isSet()) {
seqBegin->second.send(req.begin);
}
if(trackerData.sequence_version.size() && sequence < seqBegin->first) {
throw timed_out();
}
trackerData.lastUpdate = now();
Version ver = wait(trackerData.sequence_version[sequence].getFuture());
req.begin = ver;
wait(yield());
} catch( Error &e ) {
if(e.code() == error_code_timed_out) {
req.reply.sendError(timed_out());
@ -1055,6 +1061,13 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
if( req.returnIfBlocked && logData->version.get() < req.begin ) {
req.reply.sendError(end_of_stream());
if(req.sequence.present()) {
auto& trackerData = logData->peekTracker[peekId];
auto& sequenceData = trackerData.sequence_version[sequence+1];
if (!sequenceData.isSet()) {
sequenceData.send(req.begin);
}
}
return Void();
}
@ -1091,12 +1104,14 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
if(req.sequence.present()) {
auto& trackerData = logData->peekTracker[peekId];
auto& sequenceData = trackerData.sequence_version[sequence+1];
trackerData.lastUpdate = now();
if(trackerData.sequence_version.size() && sequence+1 < trackerData.sequence_version.begin()->first) {
req.reply.sendError(timed_out());
if (!sequenceData.isSet())
sequenceData.sendError(timed_out());
return Void();
}
auto& sequenceData = trackerData.sequence_version[sequence+1];
if(sequenceData.isSet()) {
if(sequenceData.getFuture().get() != rep.end) {
TEST(true); //tlog peek second attempt ended at a different version
@ -1166,11 +1181,13 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
if(req.sequence.present()) {
auto& trackerData = logData->peekTracker[peekId];
trackerData.lastUpdate = now();
auto& sequenceData = trackerData.sequence_version[sequence+1];
if(trackerData.sequence_version.size() && sequence+1 < trackerData.sequence_version.begin()->first) {
req.reply.sendError(timed_out());
if(!sequenceData.isSet())
sequenceData.sendError(timed_out());
return Void();
}
auto& sequenceData = trackerData.sequence_version[sequence+1];
if(sequenceData.isSet()) {
if(sequenceData.getFuture().get() != reply.end) {
TEST(true); //tlog peek second attempt ended at a different version