Merge pull request #1812 from alexmiller-apple/improve-only-spilled

Improve the behavior of parallelPeekMore+onlySpilled.
This commit is contained in:
Vishesh Yadav 2019-07-10 17:15:19 -07:00 committed by GitHub
commit 2606794df6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 16 additions and 14 deletions

View File

@ -265,7 +265,7 @@ struct TLogData : NonCopyable {
int64_t overheadBytesDurable; int64_t overheadBytesDurable;
struct PeekTrackerData { struct PeekTrackerData {
std::map<int, Promise<Version>> sequence_version; std::map<int, Promise<std::pair<Version, bool>>> sequence_version;
double lastUpdate; double lastUpdate;
}; };
@ -1030,8 +1030,9 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
} }
trackerData.lastUpdate = now(); trackerData.lastUpdate = now();
Version ver = wait(trackerData.sequence_version[sequence].getFuture()); std::pair<Version, bool> prevPeekData = wait(trackerData.sequence_version[sequence].getFuture());
req.begin = ver; req.begin = prevPeekData.first;
req.onlySpilled = prevPeekData.second;
wait(yield()); wait(yield());
} }
} catch( Error &e ) { } catch( Error &e ) {
@ -1089,13 +1090,13 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
} }
auto& sequenceData = trackerData.sequence_version[sequence+1]; auto& sequenceData = trackerData.sequence_version[sequence+1];
if(sequenceData.isSet()) { if(sequenceData.isSet()) {
if(sequenceData.getFuture().get() != rep.end) { if(sequenceData.getFuture().get().first != rep.end) {
TEST(true); //tlog peek second attempt ended at a different version TEST(true); //tlog peek second attempt ended at a different version
req.reply.sendError(timed_out()); req.reply.sendError(timed_out());
return Void(); return Void();
} }
} else { } else {
sequenceData.send(rep.end); sequenceData.send(std::make_pair(rep.end, rep.onlySpilled));
} }
rep.begin = req.begin; rep.begin = req.begin;
} }
@ -1163,13 +1164,13 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
} }
auto& sequenceData = trackerData.sequence_version[sequence+1]; auto& sequenceData = trackerData.sequence_version[sequence+1];
if(sequenceData.isSet()) { if(sequenceData.isSet()) {
if(sequenceData.getFuture().get() != reply.end) { if(sequenceData.getFuture().get().first != reply.end) {
TEST(true); //tlog peek second attempt ended at a different version TEST(true); //tlog peek second attempt ended at a different version
req.reply.sendError(timed_out()); req.reply.sendError(timed_out());
return Void(); return Void();
} }
} else { } else {
sequenceData.send(reply.end); sequenceData.send(std::make_pair(reply.end, reply.onlySpilled));
} }
reply.begin = req.begin; reply.begin = req.begin;
} }

View File

@ -315,7 +315,7 @@ struct TLogData : NonCopyable {
int64_t overheadBytesDurable; int64_t overheadBytesDurable;
struct PeekTrackerData { struct PeekTrackerData {
std::map<int, Promise<Version>> sequence_version; std::map<int, Promise<std::pair<Version, bool>>> sequence_version;
double lastUpdate; double lastUpdate;
}; };
@ -1317,8 +1317,9 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
} }
trackerData.lastUpdate = now(); trackerData.lastUpdate = now();
Version ver = wait(trackerData.sequence_version[sequence].getFuture()); std::pair<Version, bool> prevPeekData = wait(trackerData.sequence_version[sequence].getFuture());
req.begin = ver; req.begin = prevPeekData.first;
req.onlySpilled = prevPeekData.second;
wait(yield()); wait(yield());
} }
} catch( Error &e ) { } catch( Error &e ) {
@ -1376,13 +1377,13 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
} }
auto& sequenceData = trackerData.sequence_version[sequence+1]; auto& sequenceData = trackerData.sequence_version[sequence+1];
if(sequenceData.isSet()) { if(sequenceData.isSet()) {
if(sequenceData.getFuture().get() != rep.end) { if(sequenceData.getFuture().get().first != rep.end) {
TEST(true); //tlog peek second attempt ended at a different version TEST(true); //tlog peek second attempt ended at a different version
req.reply.sendError(timed_out()); req.reply.sendError(timed_out());
return Void(); return Void();
} }
} else { } else {
sequenceData.send(rep.end); sequenceData.send(std::make_pair(rep.end, rep.onlySpilled));
} }
rep.begin = req.begin; rep.begin = req.begin;
} }
@ -1537,13 +1538,13 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
} }
auto& sequenceData = trackerData.sequence_version[sequence+1]; auto& sequenceData = trackerData.sequence_version[sequence+1];
if(sequenceData.isSet()) { if(sequenceData.isSet()) {
if(sequenceData.getFuture().get() != reply.end) { if(sequenceData.getFuture().get().first != reply.end) {
TEST(true); //tlog peek second attempt ended at a different version TEST(true); //tlog peek second attempt ended at a different version
req.reply.sendError(timed_out()); req.reply.sendError(timed_out());
return Void(); return Void();
} }
} else { } else {
sequenceData.send(reply.end); sequenceData.send(std::make_pair(reply.end, reply.onlySpilled));
} }
reply.begin = req.begin; reply.begin = req.begin;
} }