From e3629ef356b523fc949c4033392e9f7f9ca29950 Mon Sep 17 00:00:00 2001 From: Jingyu Zhou Date: Sun, 1 Aug 2021 14:08:54 -0700 Subject: [PATCH] Refactor decoder --- fdbbackup/FileDecoder.actor.cpp | 491 +++++++++++++++++++------------- 1 file changed, 291 insertions(+), 200 deletions(-) diff --git a/fdbbackup/FileDecoder.actor.cpp b/fdbbackup/FileDecoder.actor.cpp index 2ccb01a61f..7a93db7857 100644 --- a/fdbbackup/FileDecoder.actor.cpp +++ b/fdbbackup/FileDecoder.actor.cpp @@ -363,20 +363,190 @@ std::vector decode_value(const StringRef& value) { return mutations; } + +// Decodes a mutation log key, which contains (hash, commitVersion, chunkNumber) and +// returns (commitVersion, chunkNumber) +std::pair decodeLogKey(const StringRef& key) { + ASSERT(key.size() == sizeof(uint8_t) + sizeof(Version) + sizeof(int32_t)); + + uint8_t hash; + Version version; + int32_t part; + BinaryReader rd(key, Unversioned()); + rd >> hash >> version >> part; + version = bigEndian64(version); + part = bigEndian32(part); + + int32_t v = version / CLIENT_KNOBS->LOG_RANGE_BLOCK_SIZE; + ASSERT(((uint8_t)hashlittle(&v, sizeof(v), 0)) == hash); + + return std::make_pair(version, part); +} + +// Decodes an encoded list of mutations in the format of: +// [includeVersion:uint64_t][val_length:uint32_t][mutation_1][mutation_2]...[mutation_k], +// where a mutation is encoded as: +// [type:uint32_t][keyLength:uint32_t][valueLength:uint32_t][param1][param2] +std::vector decodeLogValue(const StringRef& value) { + StringRefReader reader(value, restore_corrupted_data()); + + Version protocolVersion = reader.consume(); + if (protocolVersion <= 0x0FDB00A200090001) { + throw incompatible_protocol_version(); + } + + uint32_t val_length = reader.consume(); + if (val_length != value.size() - sizeof(uint64_t) - sizeof(uint32_t)) { + TraceEvent(SevError, "FileRestoreLogValueError") + .detail("ValueLen", val_length) + .detail("ValueSize", value.size()) + .detail("Value", printable(value)); + } + + std::vector mutations; + while (1) { + if (reader.eof()) + break; + + // Deserialization of a MutationRef, which was packed by MutationListRef::push_back_deep() + uint32_t type, p1len, p2len; + type = reader.consume(); + p1len = reader.consume(); + p2len = reader.consume(); + + const uint8_t* key = reader.consume(p1len); + const uint8_t* val = reader.consume(p2len); + + mutations.emplace_back((MutationRef::Type)type, StringRef(key, p1len), StringRef(val, p2len)); + } + return mutations; +} + +// Accumulates mutation log value chunks, as both a vector of chunks and as a combined chunk, +// in chunk order, and can check the chunk set for completion or intersection with a set +// of ranges. +struct AccumulatedMutations { + AccumulatedMutations() : lastChunkNumber(-1) {} + + // Add a KV pair for this mutation chunk set + // It will be accumulated onto serializedMutations if the chunk number is + // the next expected value. + void addChunk(int chunkNumber, const KeyValueRef& kv) { + if (chunkNumber == lastChunkNumber + 1) { + lastChunkNumber = chunkNumber; + serializedMutations += kv.value.toString(); + } else { + lastChunkNumber = -2; + serializedMutations.clear(); + } + kvs.push_back(kv); + } + + // Returns true if both + // - 1 or more chunks were added to this set + // - The header of the first chunk contains a valid protocol version and a length + // that matches the bytes after the header in the combined value in serializedMutations + bool isComplete() const { + if (lastChunkNumber >= 0) { + StringRefReader reader(serializedMutations, restore_corrupted_data()); + + Version protocolVersion = reader.consume(); + if (protocolVersion <= 0x0FDB00A200090001) { + throw incompatible_protocol_version(); + } + + uint32_t vLen = reader.consume(); + return vLen == reader.remainder().size(); + } + + return false; + } + + // Returns true if a complete chunk contains any MutationRefs which intersect with any + // range in ranges. + // It is undefined behavior to run this if isComplete() does not return true. + bool matchesAnyRange(const std::vector& ranges) const { + std::vector mutations = decodeLogValue(serializedMutations); + for (auto& m : mutations) { + for (auto& r : ranges) { + if (m.type == MutationRef::ClearRange) { + if (r.intersects(KeyRangeRef(m.param1, m.param2))) { + return true; + } + } else { + if (r.contains(m.param1)) { + return true; + } + } + } + } + + return false; + } + + std::vector kvs; + std::string serializedMutations; + int lastChunkNumber; +}; + + + struct VersionedMutations { Version version; std::vector mutations; Arena arena; // The arena that contains the mutations. + std::string serializedMutations; // buffer that contains mutations }; -struct VersionedKVPart { - Arena arena; - Version version; - int32_t part; - StringRef kv; - VersionedKVPart(Arena arena, Version version, int32_t part, StringRef kv) - : arena(arena), version(version), part(part), kv(kv) {} -}; +ACTOR Future>> decodeLogFileBlock(Reference file, + int64_t offset, + int len) { + state Standalone buf = makeString(len); + int rLen = wait(file->read(mutateString(buf), len, offset)); + if (rLen != len) + throw restore_bad_read(); + + Standalone> results({}, buf.arena()); + state StringRefReader reader(buf, restore_corrupted_data()); + + try { + // Read header, currently only decoding version BACKUP_AGENT_MLOG_VERSION + if (reader.consume() != BACKUP_AGENT_MLOG_VERSION) + throw restore_unsupported_file_version(); + + // Read k/v pairs. Block ends either at end of last value exactly or with 0xFF as first key len byte. + while (1) { + // If eof reached or first key len bytes is 0xFF then end of block was reached. + if (reader.eof() || *reader.rptr == 0xFF) + break; + + // Read key and value. If anything throws then there is a problem. + uint32_t kLen = reader.consumeNetworkUInt32(); + const uint8_t* k = reader.consume(kLen); + uint32_t vLen = reader.consumeNetworkUInt32(); + const uint8_t* v = reader.consume(vLen); + + results.push_back(results.arena(), KeyValueRef(KeyRef(k, kLen), ValueRef(v, vLen))); + } + + // Make sure any remaining bytes in the block are 0xFF + for (auto b : reader.remainder()) + if (b != 0xFF) + throw restore_corrupted_data_padding(); + + return results; + + } catch (Error& e) { + TraceEvent(SevWarn, "FileRestoreCorruptLogFileBlock") + .error(e) + .detail("Filename", file->getFilename()) + .detail("BlockOffset", offset) + .detail("BlockLen", len) + .detail("ErrorRelativeOffset", reader.rptr - buf.begin()) + .detail("ErrorAbsoluteOffset", reader.rptr - buf.begin() + offset); + throw; + } +} /* * Model a decoding progress for a mutation file. Usage is: @@ -395,174 +565,78 @@ struct VersionedKVPart { * at any time this object might have two blocks of data in memory. */ class DecodeProgress { - std::vector keyValues; + Standalone> blocks; + std::unordered_map mutationBlocksByVersion; public: DecodeProgress() = default; - template - DecodeProgress(const LogFile& file, U&& values) : keyValues(std::forward(values)), file(file) {} + DecodeProgress(const LogFile& file) : file(file) {} // If there are no more mutations to pull from the file. - // However, we could have unfinished version in the buffer when EOF is true, - // which means we should look for data in the next file. The caller - // should call getUnfinishedBuffer() to get these left data. - bool finished() const { return (eof && keyValues.empty()) || (leftover && !keyValues.empty()); } - - std::vector&& getUnfinishedBuffer() && { return std::move(keyValues); } - - // Returns all mutations of the next version in a batch. - Future getNextBatch() { return getNextBatchImpl(this); } + bool finished() const { return done; } + // Open and loads file into memory Future openFile(Reference container) { return openFileImpl(this, container); } // The following are private APIs: - // Returns true if value contains complete data. - static bool isValueComplete(StringRef value) { - StringRefReader reader(value, restore_corrupted_data()); - - reader.consume(); // Consume the includeVersion - uint32_t val_length = reader.consume(); - return val_length == value.size() - sizeof(uint64_t) - sizeof(uint32_t); - } - // PRECONDITION: finished() must return false before calling this function. // Returns the next batch of mutations along with the arena backing it. // Note the returned batch can be empty when the file has unfinished // version batch data that are in the next file. - ACTOR static Future getNextBatchImpl(DecodeProgress* self) { - ASSERT(!self->finished()); + VersionedMutations getNextBatch() { + ASSERT(!finished()); - loop { - if (self->keyValues.size() <= 1) { - // Try to decode another block when less than one left - wait(readAndDecodeFile(self)); - } - - const auto& kv = self->keyValues[0]; - ASSERT(kv.part == 0); - - // decode next versions, check if they are continuous parts - int idx = 1; // next kv pair in "keyValues" - int bufSize = kv.kv.size(); - for (int lastPart = 0; idx < self->keyValues.size(); idx++, lastPart++) { - if (idx == self->keyValues.size()) - break; - - const auto& nextKV = self->keyValues[idx]; - if (kv.version != nextKV.version) { - break; - } - - if (lastPart + 1 != nextKV.part) { - TraceEvent("DecodeError").detail("Part1", lastPart).detail("Part2", nextKV.part); - throw restore_corrupted_data(); - } - bufSize += nextKV.kv.size(); - } - - VersionedMutations m; - m.version = kv.version; - TraceEvent("Decode").detail("Version", m.version).detail("Idx", idx).detail("Q", self->keyValues.size()); - StringRef value = kv.kv; - if (idx > 1) { - // Stitch parts into one and then decode one by one - Standalone buf = self->combineValues(idx, bufSize); - value = buf; - m.arena = buf.arena(); - } - if (isValueComplete(value)) { - m.mutations = decode_value(value); - if (m.arena.getSize() == 0) { - m.arena = kv.arena; - } - self->keyValues.erase(self->keyValues.begin(), self->keyValues.begin() + idx); - return m; - } else if (!self->eof) { - // Read one more block, hopefully the missing part of the value can be found. - wait(readAndDecodeFile(self)); - } else { - TraceEvent(SevWarn, "MissingValue").detail("Version", m.version); - self->leftover = true; - return m; // Empty mutations + VersionedMutations vms; + for (auto& [version, m] : mutationBlocksByVersion) { + if (m.isComplete()) { + vms.version = version; + std::vector mutations = decodeLogValue(m.serializedMutations); + TraceEvent("Decode").detail("version", vms.version).detail("N", mutations.size()); + vms.mutations.insert(vms.mutations.end(), mutations.begin(), mutations.end()); + vms.arena = blocks.arena(); + vms.serializedMutations = m.serializedMutations; + mutationBlocksByVersion.erase(version); + return vms; } } - } - // Returns a buffer which stitches first "idx" values into one. - // "len" MUST equal the summation of these values. - Standalone combineValues(const int idx, const int len) { - ASSERT(idx <= keyValues.size() && idx > 1); - - Standalone buf = makeString(len); - int n = 0; - for (int i = 0; i < idx; i++) { - const auto& value = keyValues[i].kv; - memcpy(mutateString(buf) + n, value.begin(), value.size()); - n += value.size(); - } - - ASSERT(n == len); - return buf; - } - - // Decodes a block into KeyValueRef stored in "keyValues". - void decode_block(const Standalone& buf, int len) { - StringRef block(buf.begin(), len); - StringRefReader reader(block, restore_corrupted_data()); - - try { - // Read header, currently only decoding version BACKUP_AGENT_MLOG_VERSION - if (reader.consume() != BACKUP_AGENT_MLOG_VERSION) - throw restore_unsupported_file_version(); - - // Read k/v pairs. Block ends either at end of last value exactly or with 0xFF as first key len byte. - while (1) { - // If eof reached or first key len bytes is 0xFF then end of block was reached. - if (reader.eof() || *reader.rptr == 0xFF) - break; - - // Read key and value. If anything throws then there is a problem. - uint32_t kLen = reader.consumeNetworkUInt32(); - const uint8_t* k = reader.consume(kLen); - std::pair version_part = decode_key(StringRef(k, kLen)); - uint32_t vLen = reader.consumeNetworkUInt32(); - const uint8_t* v = reader.consume(vLen); - TraceEvent(SevDecodeInfo, "Block") - .detail("KeySize", kLen) - .detail("valueSize", vLen) - .detail("Offset", reader.rptr - buf.begin()) - .detail("Version", version_part.first) - .detail("Part", version_part.second); - keyValues.emplace_back(buf.arena(), version_part.first, version_part.second, StringRef(v, vLen)); - } - - // Make sure any remaining bytes in the block are 0xFF - for (auto b : reader.remainder()) { - if (b != 0xFF) - throw restore_corrupted_data_padding(); - } - - // The (version, part) in a block can be out of order, i.e., (3, 0) - // can be followed by (4, 0), and then (3, 1). So we need to sort them - // first by version, and then by part number. - std::sort(keyValues.begin(), keyValues.end(), [](const VersionedKVPart& a, const VersionedKVPart& b) { - return a.version == b.version ? a.part < b.part : a.version < b.version; - }); - return; - } catch (Error& e) { - TraceEvent(SevWarn, "CorruptBlock").error(e).detail("Offset", reader.rptr - buf.begin()); - throw; - } + // No complete versions + TraceEvent(SevWarn, "UnfishedBlocks").detail("NumberOfVersions", mutationBlocksByVersion.size()); + done = true; + return vms; } ACTOR static Future openFileImpl(DecodeProgress* self, Reference container) { Reference fd = wait(container->readFile(self->file.fileName)); self->fd = fd; - wait(readAndDecodeFile(self)); + while (!self->eof) { + wait(readAndDecodeFile(self)); + } return Void(); } + // Add blocks to mutationBlocksByVersion + void filterLogMutationKVPairs(VectorRef blocks) { + for (auto& kv : blocks) { + auto versionAndChunkNumber = decodeLogKey(kv.key); + mutationBlocksByVersion[versionAndChunkNumber.first].addChunk(versionAndChunkNumber.second, kv); + } +/* + std::vector output; + + for (auto& vb : mutationBlocksByVersion) { + AccumulatedMutations& m = vb.second; + + // If the mutations are incomplete or match one of the ranges, include in results. + if (!m.isComplete() || m.matchesAnyRange(ranges)) { + output.insert(output.end(), m.kvs.begin(), m.kvs.end()); + } + } + + return output;*/ + } + // Reads a file block, decodes it into key/value pairs, and stores these pairs. ACTOR static Future readAndDecodeFile(DecodeProgress* self) { try { @@ -572,17 +646,21 @@ public: return Void(); } - state Standalone buf = makeString(len); - state int rLen = wait(self->fd->read(mutateString(buf), len, self->offset)); + // Decode a file block into log_key and log_value pairs + Standalone> blocks = wait(decodeLogFileBlock(self->fd, self->offset, len)); + // This is memory inefficient, but we don't know if blocks are complete version data + self->blocks.reserve(self->blocks.arena(), self->blocks.size() + blocks.size()); + for (int i = 0; i < blocks.size(); i++) { + self->blocks.push_back_deep(self->blocks.arena(), blocks[i]); + } + TraceEvent("ReadFile") .detail("Name", self->file.fileName) - .detail("Len", rLen) + .detail("Len", len) .detail("Offset", self->offset); - if (rLen != len) { - throw restore_corrupted_data(); - } - self->decode_block(buf, rLen); - self->offset += rLen; + self->filterLogMutationKVPairs(blocks); + self->offset += len; + return Void(); } catch (Error& e) { TraceEvent(SevWarn, "CorruptLogFileBlock") @@ -598,9 +676,55 @@ public: Reference fd; int64_t offset = 0; bool eof = false; - bool leftover = false; // Done but has unfinished version batch data left + bool done = false; }; +ACTOR Future process_file(Reference container, LogFile file, UID uid, DecodeParams params) { + TraceEvent("ProcessFile").detail("Name", file.fileName); + std::cout << "ProcessFile " << file.fileName << "\n"; + if (file.fileSize == 0) { + TraceEvent("SkipEmptyFile").detail("Name", file.fileName); + return Void(); + } + + state DecodeProgress progress(file); + wait(progress.openFile(container)); + while (!progress.finished()) { + VersionedMutations vms = progress.getNextBatch(); + if (vms.version < params.beginVersionFilter || vms.version >= params.endVersionFilter) { + TraceEvent("SkipVersion").detail("Version", vms.version); + continue; + } + + int sub = 0; + for (const auto& m : vms.mutations) { + sub++; // sub sequence number starts at 1 + bool print = params.prefix.empty(); // no filtering + + if (!print) { + if (isSingleKeyMutation((MutationRef::Type)m.type)) { + print = m.param1.startsWith(StringRef(params.prefix)); + } else if (m.type == MutationRef::ClearRange) { + KeyRange range(KeyRangeRef(m.param1, m.param2)); + print = range.contains(StringRef(params.prefix)); + } else { + ASSERT(false); + } + } + if (print) { + TraceEvent(format("Mutation_%d_%d", vms.version, sub).c_str(), uid) + .detail("Version", vms.version) + .setMaxFieldLength(10000) + .detail("M", m.toString()); + std::cout << vms.version << " " << m.toString() << "\n"; + } + } + } + TraceEvent("ProcessFileDone").detail("File", file.fileName); + std::cout << "ProcessFileDone " << file.fileName << "\n"; + return Void(); +} + ACTOR Future decode_logs(DecodeParams params) { state Reference container = IBackupContainer::openContainer(params.container_url); state UID uid = deterministicRandom()->randomUniqueID(); @@ -625,49 +749,16 @@ ACTOR Future decode_logs(DecodeParams params) { if (params.list_only) return Void(); - state int i = 0; - // Previous file's unfinished version data - state std::vector left; - for (; i < logs.size(); i++) { - if (logs[i].fileSize == 0) - continue; + state int idx = 0; + while (idx < logs.size()) { + TraceEvent("ProcessFileI").detail("Name", logs[idx].fileName).detail("I", idx); + std::cout << "ProcessFileI " << logs[idx].fileName << " " << idx << "\n"; - state DecodeProgress progress(logs[i], std::move(left)); - wait(progress.openFile(container)); - while (!progress.finished()) { - VersionedMutations vms = wait(progress.getNextBatch()); - if (vms.version < params.beginVersionFilter || vms.version >= params.endVersionFilter) { - continue; - } + wait(process_file(container, logs[idx], uid, params)); - int sub = 0; - for (const auto& m : vms.mutations) { - sub++; // sub sequence number starts at 1 - bool print = params.prefix.empty(); // no filtering - - if (!print) { - if (isSingleKeyMutation((MutationRef::Type)m.type)) { - print = m.param1.startsWith(StringRef(params.prefix)); - } else if (m.type == MutationRef::ClearRange) { - KeyRange range(KeyRangeRef(m.param1, m.param2)); - print = range.contains(StringRef(params.prefix)); - } else { - ASSERT(false); - } - } - if (print) { - TraceEvent(format("Mutation_%d_%d", vms.version, sub).c_str(), uid) - .detail("Version", vms.version) - .setMaxFieldLength(10000) - .detail("M", m.toString()); - std::cout << vms.version << " " << m.toString() << "\n"; - } - } - } - left = std::move(progress).getUnfinishedBuffer(); - if (!left.empty()) { - TraceEvent("UnfinishedFile").detail("File", logs[i].fileName).detail("Q", left.size()); - } + TraceEvent("ProcessFileIDone").detail("I", idx); + std::cout << "ProcessFileIDone " << idx << "\n"; + idx++; } TraceEvent("DecodeDone", uid); return Void();