Fix decoder for unfinished version batch in a log

A mutation log's version batch data can be split into multiple blocks, and some
of the blocks can be spread across two mutation logs. Thus, the decoder needs
copy unfinished version batch data from previous file progress to the next one.
This commit is contained in:
Jingyu Zhou 2020-03-30 11:34:51 -07:00
parent 411b4c28ac
commit 3c32835cce
1 changed files with 21 additions and 6 deletions

View File

@ -219,10 +219,16 @@ struct VersionedMutations {
*/
struct DecodeProgress {
DecodeProgress() = default;
DecodeProgress(const LogFile& file) : file(file) {}
DecodeProgress(const LogFile& file, const std::vector<std::tuple<Arena, Version, int32_t, StringRef>>& values)
: file(file), keyValues(values) {}
// If there are no more mutations to pull.
bool finished() { return eof && keyValues.empty(); }
// If there are no more mutations to pull from the file.
// However, we could have unfinished version in the buffer when EOF is true,
// which means we should look for data in the next file. The
// call should call getUnfinishedBuffer() to get these left data.
bool finished() { return eof; }
std::vector<std::tuple<Arena, Version, int32_t, StringRef>> getUnfinishedBuffer() { return keyValues; }
// Returns all mutations of the next version in a batch.
Future<VersionedMutations> getNextBatch() { return getNextBatchImpl(this); }
@ -242,6 +248,8 @@ struct DecodeProgress {
// PRECONDITION: finished() must return false before calling this function.
// Returns the next batch of mutations along with the arena backing it.
// Note the returned batch can be empty when the file has unfinished
// version batch data that are in the next file.
ACTOR static Future<VersionedMutations> getNextBatchImpl(DecodeProgress* self) {
ASSERT(!self->finished());
@ -292,8 +300,9 @@ struct DecodeProgress {
// Read one more block, hopefully the missing part of the value can be found.
wait(readAndDecodeFile(self));
} else {
TraceEvent(SevError, "MissingValue").detail("Version", m.version);
throw restore_corrupted_data();
TraceEvent(SevWarn, "MissingValue").detail("Version", m.version);
m.arena = Arena();
return m; // Empty mutations
}
}
}
@ -432,8 +441,10 @@ ACTOR Future<Void> decode_logs(DecodeParams params) {
printLogFiles("Relevant files are: ", logs);
state int i = 0;
// Previous file's unfinished version data
state std::vector<std::tuple<Arena, Version, int32_t, StringRef>> left;
for (; i < logs.size(); i++) {
state DecodeProgress progress(logs[i]);
state DecodeProgress progress(logs[i], left);
wait(progress.openFile(container));
while (!progress.finished()) {
VersionedMutations vms = wait(progress.getNextBatch());
@ -441,6 +452,10 @@ ACTOR Future<Void> decode_logs(DecodeParams params) {
std::cout << vms.version << " " << m.toString() << "\n";
}
}
left = progress.getUnfinishedBuffer();
if (!left.empty()) {
TraceEvent("UnfinishedFile").detail("File", logs[i].fileName).detail("Q", left.size());
}
}
return Void();
}