Refactor decoder
This commit is contained in:
parent
d7ac5830e6
commit
e3629ef356
|
@ -363,20 +363,190 @@ std::vector<MutationRef> decode_value(const StringRef& value) {
|
|||
return mutations;
|
||||
}
|
||||
|
||||
|
||||
// Decodes a mutation log key, which contains (hash, commitVersion, chunkNumber) and
|
||||
// returns (commitVersion, chunkNumber)
|
||||
std::pair<Version, int32_t> decodeLogKey(const StringRef& key) {
|
||||
ASSERT(key.size() == sizeof(uint8_t) + sizeof(Version) + sizeof(int32_t));
|
||||
|
||||
uint8_t hash;
|
||||
Version version;
|
||||
int32_t part;
|
||||
BinaryReader rd(key, Unversioned());
|
||||
rd >> hash >> version >> part;
|
||||
version = bigEndian64(version);
|
||||
part = bigEndian32(part);
|
||||
|
||||
int32_t v = version / CLIENT_KNOBS->LOG_RANGE_BLOCK_SIZE;
|
||||
ASSERT(((uint8_t)hashlittle(&v, sizeof(v), 0)) == hash);
|
||||
|
||||
return std::make_pair(version, part);
|
||||
}
|
||||
|
||||
// Decodes an encoded list of mutations in the format of:
|
||||
// [includeVersion:uint64_t][val_length:uint32_t][mutation_1][mutation_2]...[mutation_k],
|
||||
// where a mutation is encoded as:
|
||||
// [type:uint32_t][keyLength:uint32_t][valueLength:uint32_t][param1][param2]
|
||||
std::vector<MutationRef> decodeLogValue(const StringRef& value) {
|
||||
StringRefReader reader(value, restore_corrupted_data());
|
||||
|
||||
Version protocolVersion = reader.consume<uint64_t>();
|
||||
if (protocolVersion <= 0x0FDB00A200090001) {
|
||||
throw incompatible_protocol_version();
|
||||
}
|
||||
|
||||
uint32_t val_length = reader.consume<uint32_t>();
|
||||
if (val_length != value.size() - sizeof(uint64_t) - sizeof(uint32_t)) {
|
||||
TraceEvent(SevError, "FileRestoreLogValueError")
|
||||
.detail("ValueLen", val_length)
|
||||
.detail("ValueSize", value.size())
|
||||
.detail("Value", printable(value));
|
||||
}
|
||||
|
||||
std::vector<MutationRef> mutations;
|
||||
while (1) {
|
||||
if (reader.eof())
|
||||
break;
|
||||
|
||||
// Deserialization of a MutationRef, which was packed by MutationListRef::push_back_deep()
|
||||
uint32_t type, p1len, p2len;
|
||||
type = reader.consume<uint32_t>();
|
||||
p1len = reader.consume<uint32_t>();
|
||||
p2len = reader.consume<uint32_t>();
|
||||
|
||||
const uint8_t* key = reader.consume(p1len);
|
||||
const uint8_t* val = reader.consume(p2len);
|
||||
|
||||
mutations.emplace_back((MutationRef::Type)type, StringRef(key, p1len), StringRef(val, p2len));
|
||||
}
|
||||
return mutations;
|
||||
}
|
||||
|
||||
// Accumulates mutation log value chunks, as both a vector of chunks and as a combined chunk,
|
||||
// in chunk order, and can check the chunk set for completion or intersection with a set
|
||||
// of ranges.
|
||||
struct AccumulatedMutations {
|
||||
AccumulatedMutations() : lastChunkNumber(-1) {}
|
||||
|
||||
// Add a KV pair for this mutation chunk set
|
||||
// It will be accumulated onto serializedMutations if the chunk number is
|
||||
// the next expected value.
|
||||
void addChunk(int chunkNumber, const KeyValueRef& kv) {
|
||||
if (chunkNumber == lastChunkNumber + 1) {
|
||||
lastChunkNumber = chunkNumber;
|
||||
serializedMutations += kv.value.toString();
|
||||
} else {
|
||||
lastChunkNumber = -2;
|
||||
serializedMutations.clear();
|
||||
}
|
||||
kvs.push_back(kv);
|
||||
}
|
||||
|
||||
// Returns true if both
|
||||
// - 1 or more chunks were added to this set
|
||||
// - The header of the first chunk contains a valid protocol version and a length
|
||||
// that matches the bytes after the header in the combined value in serializedMutations
|
||||
bool isComplete() const {
|
||||
if (lastChunkNumber >= 0) {
|
||||
StringRefReader reader(serializedMutations, restore_corrupted_data());
|
||||
|
||||
Version protocolVersion = reader.consume<uint64_t>();
|
||||
if (protocolVersion <= 0x0FDB00A200090001) {
|
||||
throw incompatible_protocol_version();
|
||||
}
|
||||
|
||||
uint32_t vLen = reader.consume<uint32_t>();
|
||||
return vLen == reader.remainder().size();
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
// Returns true if a complete chunk contains any MutationRefs which intersect with any
|
||||
// range in ranges.
|
||||
// It is undefined behavior to run this if isComplete() does not return true.
|
||||
bool matchesAnyRange(const std::vector<KeyRange>& ranges) const {
|
||||
std::vector<MutationRef> mutations = decodeLogValue(serializedMutations);
|
||||
for (auto& m : mutations) {
|
||||
for (auto& r : ranges) {
|
||||
if (m.type == MutationRef::ClearRange) {
|
||||
if (r.intersects(KeyRangeRef(m.param1, m.param2))) {
|
||||
return true;
|
||||
}
|
||||
} else {
|
||||
if (r.contains(m.param1)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
std::vector<KeyValueRef> kvs;
|
||||
std::string serializedMutations;
|
||||
int lastChunkNumber;
|
||||
};
|
||||
|
||||
|
||||
|
||||
// A batch of decoded mutations that all committed at the same version,
// together with the storage that backs their StringRef parameters.
struct VersionedMutations {
	Version version; // Commit version shared by every mutation in "mutations".
	std::vector<MutationRef> mutations; // Decoded mutations; non-owning refs into the buffers below.
	Arena arena; // The arena that contains the mutations.
	std::string serializedMutations; // buffer that contains mutations
};
|
||||
|
||||
// One chunk ("part") of a version's serialized mutation value, paired with
// the arena that keeps the bytes referenced by "kv" alive.
struct VersionedKVPart {
	Arena arena; // Owns the memory "kv" points into.
	Version version; // Commit version this chunk belongs to.
	int32_t part; // Zero-based chunk number within the version.
	StringRef kv; // Raw chunk bytes (non-owning; backed by "arena").
	VersionedKVPart(Arena arena, Version version, int32_t part, StringRef kv)
	  : arena(arena), version(version), part(part), kv(kv) {}
};
|
||||
// Reads the [offset, offset + len) block of a mutation log file and decodes it
// into raw key/value pairs. The expected block layout is:
//   [BACKUP_AGENT_MLOG_VERSION:int32_t] ([keyLen][key][valueLen][value])* [0xFF padding...]
// Throws restore_bad_read on a short read, restore_unsupported_file_version on a
// bad header, and restore_corrupted_data / restore_corrupted_data_padding if the
// block is malformed. The returned vector shares the read buffer's arena, so the
// KeyValueRefs stay valid as long as the result is held.
ACTOR Future<Standalone<VectorRef<KeyValueRef>>> decodeLogFileBlock(Reference<IAsyncFile> file,
                                                                    int64_t offset,
                                                                    int len) {
	state Standalone<StringRef> buf = makeString(len);
	int rLen = wait(file->read(mutateString(buf), len, offset));
	if (rLen != len)
		throw restore_bad_read();

	// Results share buf's arena so the k/v StringRefs below remain valid.
	Standalone<VectorRef<KeyValueRef>> results({}, buf.arena());
	state StringRefReader reader(buf, restore_corrupted_data());

	try {
		// Read header, currently only decoding version BACKUP_AGENT_MLOG_VERSION
		if (reader.consume<int32_t>() != BACKUP_AGENT_MLOG_VERSION)
			throw restore_unsupported_file_version();

		// Read k/v pairs. Block ends either at end of last value exactly or with 0xFF as first key len byte.
		while (1) {
			// If eof reached or first key len bytes is 0xFF then end of block was reached.
			if (reader.eof() || *reader.rptr == 0xFF)
				break;

			// Read key and value. If anything throws then there is a problem.
			uint32_t kLen = reader.consumeNetworkUInt32();
			const uint8_t* k = reader.consume(kLen);
			uint32_t vLen = reader.consumeNetworkUInt32();
			const uint8_t* v = reader.consume(vLen);

			results.push_back(results.arena(), KeyValueRef(KeyRef(k, kLen), ValueRef(v, vLen)));
		}

		// Make sure any remaining bytes in the block are 0xFF
		for (auto b : reader.remainder())
			if (b != 0xFF)
				throw restore_corrupted_data_padding();

		return results;

	} catch (Error& e) {
		// Log where in the block decoding failed, then rethrow for the caller.
		TraceEvent(SevWarn, "FileRestoreCorruptLogFileBlock")
		    .error(e)
		    .detail("Filename", file->getFilename())
		    .detail("BlockOffset", offset)
		    .detail("BlockLen", len)
		    .detail("ErrorRelativeOffset", reader.rptr - buf.begin())
		    .detail("ErrorAbsoluteOffset", reader.rptr - buf.begin() + offset);
		throw;
	}
}
|
||||
|
||||
/*
|
||||
* Model a decoding progress for a mutation file. Usage is:
|
||||
|
@ -395,174 +565,78 @@ struct VersionedKVPart {
|
|||
* at any time this object might have two blocks of data in memory.
|
||||
*/
|
||||
class DecodeProgress {
|
||||
std::vector<VersionedKVPart> keyValues;
|
||||
Standalone<VectorRef<KeyValueRef>> blocks;
|
||||
std::unordered_map<Version, AccumulatedMutations> mutationBlocksByVersion;
|
||||
|
||||
public:
|
||||
DecodeProgress() = default;
|
||||
template <class U>
|
||||
DecodeProgress(const LogFile& file, U&& values) : keyValues(std::forward<U>(values)), file(file) {}
|
||||
DecodeProgress(const LogFile& file) : file(file) {}
|
||||
|
||||
// If there are no more mutations to pull from the file.
|
||||
// However, we could have unfinished version in the buffer when EOF is true,
|
||||
// which means we should look for data in the next file. The caller
|
||||
// should call getUnfinishedBuffer() to get these left data.
|
||||
bool finished() const { return (eof && keyValues.empty()) || (leftover && !keyValues.empty()); }
|
||||
|
||||
std::vector<VersionedKVPart>&& getUnfinishedBuffer() && { return std::move(keyValues); }
|
||||
|
||||
// Returns all mutations of the next version in a batch.
|
||||
Future<VersionedMutations> getNextBatch() { return getNextBatchImpl(this); }
|
||||
bool finished() const { return done; }
|
||||
|
||||
// Open and loads file into memory
|
||||
Future<Void> openFile(Reference<IBackupContainer> container) { return openFileImpl(this, container); }
|
||||
|
||||
// The following are private APIs:
|
||||
|
||||
// Returns true if value contains complete data.
|
||||
static bool isValueComplete(StringRef value) {
|
||||
StringRefReader reader(value, restore_corrupted_data());
|
||||
|
||||
reader.consume<uint64_t>(); // Consume the includeVersion
|
||||
uint32_t val_length = reader.consume<uint32_t>();
|
||||
return val_length == value.size() - sizeof(uint64_t) - sizeof(uint32_t);
|
||||
}
|
||||
|
||||
// PRECONDITION: finished() must return false before calling this function.
|
||||
// Returns the next batch of mutations along with the arena backing it.
|
||||
// Note the returned batch can be empty when the file has unfinished
|
||||
// version batch data that are in the next file.
|
||||
ACTOR static Future<VersionedMutations> getNextBatchImpl(DecodeProgress* self) {
|
||||
ASSERT(!self->finished());
|
||||
VersionedMutations getNextBatch() {
|
||||
ASSERT(!finished());
|
||||
|
||||
loop {
|
||||
if (self->keyValues.size() <= 1) {
|
||||
// Try to decode another block when less than one left
|
||||
wait(readAndDecodeFile(self));
|
||||
}
|
||||
|
||||
const auto& kv = self->keyValues[0];
|
||||
ASSERT(kv.part == 0);
|
||||
|
||||
// decode next versions, check if they are continuous parts
|
||||
int idx = 1; // next kv pair in "keyValues"
|
||||
int bufSize = kv.kv.size();
|
||||
for (int lastPart = 0; idx < self->keyValues.size(); idx++, lastPart++) {
|
||||
if (idx == self->keyValues.size())
|
||||
break;
|
||||
|
||||
const auto& nextKV = self->keyValues[idx];
|
||||
if (kv.version != nextKV.version) {
|
||||
break;
|
||||
}
|
||||
|
||||
if (lastPart + 1 != nextKV.part) {
|
||||
TraceEvent("DecodeError").detail("Part1", lastPart).detail("Part2", nextKV.part);
|
||||
throw restore_corrupted_data();
|
||||
}
|
||||
bufSize += nextKV.kv.size();
|
||||
}
|
||||
|
||||
VersionedMutations m;
|
||||
m.version = kv.version;
|
||||
TraceEvent("Decode").detail("Version", m.version).detail("Idx", idx).detail("Q", self->keyValues.size());
|
||||
StringRef value = kv.kv;
|
||||
if (idx > 1) {
|
||||
// Stitch parts into one and then decode one by one
|
||||
Standalone<StringRef> buf = self->combineValues(idx, bufSize);
|
||||
value = buf;
|
||||
m.arena = buf.arena();
|
||||
}
|
||||
if (isValueComplete(value)) {
|
||||
m.mutations = decode_value(value);
|
||||
if (m.arena.getSize() == 0) {
|
||||
m.arena = kv.arena;
|
||||
}
|
||||
self->keyValues.erase(self->keyValues.begin(), self->keyValues.begin() + idx);
|
||||
return m;
|
||||
} else if (!self->eof) {
|
||||
// Read one more block, hopefully the missing part of the value can be found.
|
||||
wait(readAndDecodeFile(self));
|
||||
} else {
|
||||
TraceEvent(SevWarn, "MissingValue").detail("Version", m.version);
|
||||
self->leftover = true;
|
||||
return m; // Empty mutations
|
||||
VersionedMutations vms;
|
||||
for (auto& [version, m] : mutationBlocksByVersion) {
|
||||
if (m.isComplete()) {
|
||||
vms.version = version;
|
||||
std::vector<MutationRef> mutations = decodeLogValue(m.serializedMutations);
|
||||
TraceEvent("Decode").detail("version", vms.version).detail("N", mutations.size());
|
||||
vms.mutations.insert(vms.mutations.end(), mutations.begin(), mutations.end());
|
||||
vms.arena = blocks.arena();
|
||||
vms.serializedMutations = m.serializedMutations;
|
||||
mutationBlocksByVersion.erase(version);
|
||||
return vms;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Returns a buffer which stitches first "idx" values into one.
|
||||
// "len" MUST equal the summation of these values.
|
||||
Standalone<StringRef> combineValues(const int idx, const int len) {
|
||||
ASSERT(idx <= keyValues.size() && idx > 1);
|
||||
|
||||
Standalone<StringRef> buf = makeString(len);
|
||||
int n = 0;
|
||||
for (int i = 0; i < idx; i++) {
|
||||
const auto& value = keyValues[i].kv;
|
||||
memcpy(mutateString(buf) + n, value.begin(), value.size());
|
||||
n += value.size();
|
||||
}
|
||||
|
||||
ASSERT(n == len);
|
||||
return buf;
|
||||
}
|
||||
|
||||
// Decodes a block into KeyValueRef stored in "keyValues".
|
||||
void decode_block(const Standalone<StringRef>& buf, int len) {
|
||||
StringRef block(buf.begin(), len);
|
||||
StringRefReader reader(block, restore_corrupted_data());
|
||||
|
||||
try {
|
||||
// Read header, currently only decoding version BACKUP_AGENT_MLOG_VERSION
|
||||
if (reader.consume<int32_t>() != BACKUP_AGENT_MLOG_VERSION)
|
||||
throw restore_unsupported_file_version();
|
||||
|
||||
// Read k/v pairs. Block ends either at end of last value exactly or with 0xFF as first key len byte.
|
||||
while (1) {
|
||||
// If eof reached or first key len bytes is 0xFF then end of block was reached.
|
||||
if (reader.eof() || *reader.rptr == 0xFF)
|
||||
break;
|
||||
|
||||
// Read key and value. If anything throws then there is a problem.
|
||||
uint32_t kLen = reader.consumeNetworkUInt32();
|
||||
const uint8_t* k = reader.consume(kLen);
|
||||
std::pair<Version, int32_t> version_part = decode_key(StringRef(k, kLen));
|
||||
uint32_t vLen = reader.consumeNetworkUInt32();
|
||||
const uint8_t* v = reader.consume(vLen);
|
||||
TraceEvent(SevDecodeInfo, "Block")
|
||||
.detail("KeySize", kLen)
|
||||
.detail("valueSize", vLen)
|
||||
.detail("Offset", reader.rptr - buf.begin())
|
||||
.detail("Version", version_part.first)
|
||||
.detail("Part", version_part.second);
|
||||
keyValues.emplace_back(buf.arena(), version_part.first, version_part.second, StringRef(v, vLen));
|
||||
}
|
||||
|
||||
// Make sure any remaining bytes in the block are 0xFF
|
||||
for (auto b : reader.remainder()) {
|
||||
if (b != 0xFF)
|
||||
throw restore_corrupted_data_padding();
|
||||
}
|
||||
|
||||
// The (version, part) in a block can be out of order, i.e., (3, 0)
|
||||
// can be followed by (4, 0), and then (3, 1). So we need to sort them
|
||||
// first by version, and then by part number.
|
||||
std::sort(keyValues.begin(), keyValues.end(), [](const VersionedKVPart& a, const VersionedKVPart& b) {
|
||||
return a.version == b.version ? a.part < b.part : a.version < b.version;
|
||||
});
|
||||
return;
|
||||
} catch (Error& e) {
|
||||
TraceEvent(SevWarn, "CorruptBlock").error(e).detail("Offset", reader.rptr - buf.begin());
|
||||
throw;
|
||||
}
|
||||
// No complete versions
|
||||
TraceEvent(SevWarn, "UnfishedBlocks").detail("NumberOfVersions", mutationBlocksByVersion.size());
|
||||
done = true;
|
||||
return vms;
|
||||
}
|
||||
|
||||
ACTOR static Future<Void> openFileImpl(DecodeProgress* self, Reference<IBackupContainer> container) {
|
||||
Reference<IAsyncFile> fd = wait(container->readFile(self->file.fileName));
|
||||
self->fd = fd;
|
||||
wait(readAndDecodeFile(self));
|
||||
while (!self->eof) {
|
||||
wait(readAndDecodeFile(self));
|
||||
}
|
||||
return Void();
|
||||
}
|
||||
|
||||
// Add blocks to mutationBlocksByVersion
|
||||
void filterLogMutationKVPairs(VectorRef<KeyValueRef> blocks) {
|
||||
for (auto& kv : blocks) {
|
||||
auto versionAndChunkNumber = decodeLogKey(kv.key);
|
||||
mutationBlocksByVersion[versionAndChunkNumber.first].addChunk(versionAndChunkNumber.second, kv);
|
||||
}
|
||||
/*
|
||||
std::vector<KeyValueRef> output;
|
||||
|
||||
for (auto& vb : mutationBlocksByVersion) {
|
||||
AccumulatedMutations& m = vb.second;
|
||||
|
||||
// If the mutations are incomplete or match one of the ranges, include in results.
|
||||
if (!m.isComplete() || m.matchesAnyRange(ranges)) {
|
||||
output.insert(output.end(), m.kvs.begin(), m.kvs.end());
|
||||
}
|
||||
}
|
||||
|
||||
return output;*/
|
||||
}
|
||||
|
||||
// Reads a file block, decodes it into key/value pairs, and stores these pairs.
|
||||
ACTOR static Future<Void> readAndDecodeFile(DecodeProgress* self) {
|
||||
try {
|
||||
|
@ -572,17 +646,21 @@ public:
|
|||
return Void();
|
||||
}
|
||||
|
||||
state Standalone<StringRef> buf = makeString(len);
|
||||
state int rLen = wait(self->fd->read(mutateString(buf), len, self->offset));
|
||||
// Decode a file block into log_key and log_value pairs
|
||||
Standalone<VectorRef<KeyValueRef>> blocks = wait(decodeLogFileBlock(self->fd, self->offset, len));
|
||||
// This is memory inefficient, but we don't know if blocks are complete version data
|
||||
self->blocks.reserve(self->blocks.arena(), self->blocks.size() + blocks.size());
|
||||
for (int i = 0; i < blocks.size(); i++) {
|
||||
self->blocks.push_back_deep(self->blocks.arena(), blocks[i]);
|
||||
}
|
||||
|
||||
TraceEvent("ReadFile")
|
||||
.detail("Name", self->file.fileName)
|
||||
.detail("Len", rLen)
|
||||
.detail("Len", len)
|
||||
.detail("Offset", self->offset);
|
||||
if (rLen != len) {
|
||||
throw restore_corrupted_data();
|
||||
}
|
||||
self->decode_block(buf, rLen);
|
||||
self->offset += rLen;
|
||||
self->filterLogMutationKVPairs(blocks);
|
||||
self->offset += len;
|
||||
|
||||
return Void();
|
||||
} catch (Error& e) {
|
||||
TraceEvent(SevWarn, "CorruptLogFileBlock")
|
||||
|
@ -598,9 +676,55 @@ public:
|
|||
Reference<IAsyncFile> fd;
|
||||
int64_t offset = 0;
|
||||
bool eof = false;
|
||||
bool leftover = false; // Done but has unfinished version batch data left
|
||||
bool done = false;
|
||||
};
|
||||
|
||||
// Decodes one backup mutation-log file and prints its mutations, applying the
// version-range filter and optional key-prefix filter from "params".
// Each surviving mutation is emitted both as a TraceEvent (named
// "Mutation_<version>_<sub>") and on stdout. Empty files are skipped.
ACTOR Future<Void> process_file(Reference<IBackupContainer> container, LogFile file, UID uid, DecodeParams params) {
	TraceEvent("ProcessFile").detail("Name", file.fileName);
	std::cout << "ProcessFile " << file.fileName << "\n";
	if (file.fileSize == 0) {
		TraceEvent("SkipEmptyFile").detail("Name", file.fileName);
		return Void();
	}

	state DecodeProgress progress(file);
	wait(progress.openFile(container));
	while (!progress.finished()) {
		VersionedMutations vms = progress.getNextBatch();
		if (vms.version < params.beginVersionFilter || vms.version >= params.endVersionFilter) {
			TraceEvent("SkipVersion").detail("Version", vms.version);
			continue;
		}

		int sub = 0;
		for (const auto& m : vms.mutations) {
			sub++; // sub sequence number starts at 1
			bool print = params.prefix.empty(); // no filtering

			if (!print) {
				if (isSingleKeyMutation((MutationRef::Type)m.type)) {
					print = m.param1.startsWith(StringRef(params.prefix));
				} else if (m.type == MutationRef::ClearRange) {
					KeyRange range(KeyRangeRef(m.param1, m.param2));
					print = range.contains(StringRef(params.prefix));
				} else {
					ASSERT(false);
				}
			}
			if (print) {
				// Version is 64 bits, so it must be formatted with %lld, not %d;
				// %d truncates (and is undefined behavior for a 64-bit argument).
				TraceEvent(format("Mutation_%lld_%d", (long long)vms.version, sub).c_str(), uid)
				    .detail("Version", vms.version)
				    .setMaxFieldLength(10000)
				    .detail("M", m.toString());
				std::cout << vms.version << " " << m.toString() << "\n";
			}
		}
	}
	TraceEvent("ProcessFileDone").detail("File", file.fileName);
	std::cout << "ProcessFileDone " << file.fileName << "\n";
	return Void();
}
|
||||
|
||||
ACTOR Future<Void> decode_logs(DecodeParams params) {
|
||||
state Reference<IBackupContainer> container = IBackupContainer::openContainer(params.container_url);
|
||||
state UID uid = deterministicRandom()->randomUniqueID();
|
||||
|
@ -625,49 +749,16 @@ ACTOR Future<Void> decode_logs(DecodeParams params) {
|
|||
|
||||
if (params.list_only) return Void();
|
||||
|
||||
state int i = 0;
|
||||
// Previous file's unfinished version data
|
||||
state std::vector<VersionedKVPart> left;
|
||||
for (; i < logs.size(); i++) {
|
||||
if (logs[i].fileSize == 0)
|
||||
continue;
|
||||
state int idx = 0;
|
||||
while (idx < logs.size()) {
|
||||
TraceEvent("ProcessFileI").detail("Name", logs[idx].fileName).detail("I", idx);
|
||||
std::cout << "ProcessFileI " << logs[idx].fileName << " " << idx << "\n";
|
||||
|
||||
state DecodeProgress progress(logs[i], std::move(left));
|
||||
wait(progress.openFile(container));
|
||||
while (!progress.finished()) {
|
||||
VersionedMutations vms = wait(progress.getNextBatch());
|
||||
if (vms.version < params.beginVersionFilter || vms.version >= params.endVersionFilter) {
|
||||
continue;
|
||||
}
|
||||
wait(process_file(container, logs[idx], uid, params));
|
||||
|
||||
int sub = 0;
|
||||
for (const auto& m : vms.mutations) {
|
||||
sub++; // sub sequence number starts at 1
|
||||
bool print = params.prefix.empty(); // no filtering
|
||||
|
||||
if (!print) {
|
||||
if (isSingleKeyMutation((MutationRef::Type)m.type)) {
|
||||
print = m.param1.startsWith(StringRef(params.prefix));
|
||||
} else if (m.type == MutationRef::ClearRange) {
|
||||
KeyRange range(KeyRangeRef(m.param1, m.param2));
|
||||
print = range.contains(StringRef(params.prefix));
|
||||
} else {
|
||||
ASSERT(false);
|
||||
}
|
||||
}
|
||||
if (print) {
|
||||
TraceEvent(format("Mutation_%d_%d", vms.version, sub).c_str(), uid)
|
||||
.detail("Version", vms.version)
|
||||
.setMaxFieldLength(10000)
|
||||
.detail("M", m.toString());
|
||||
std::cout << vms.version << " " << m.toString() << "\n";
|
||||
}
|
||||
}
|
||||
}
|
||||
left = std::move(progress).getUnfinishedBuffer();
|
||||
if (!left.empty()) {
|
||||
TraceEvent("UnfinishedFile").detail("File", logs[i].fileName).detail("Q", left.size());
|
||||
}
|
||||
TraceEvent("ProcessFileIDone").detail("I", idx);
|
||||
std::cout << "ProcessFileIDone " << idx << "\n";
|
||||
idx++;
|
||||
}
|
||||
TraceEvent("DecodeDone", uid);
|
||||
return Void();
|
||||
|
|
Loading…
Reference in New Issue