Decode out of order mutations in old mutation logs

In the old mutation logs, a version's mutations are serialized as a buffer.
Then the buffer is split into smaller chunks, e.g., 10000 bytes each. When
writting chunks to the final mutation log file, these chunks can be flushed
out of order. For instance, the (version, chunck_part) can be in the order of
(3, 0), (4, 0), (3, 1). As a result, the decoder must read forward to find all
chunks of data for a version.

Another complication is that the files are organized into blocks, where (3, 1)
can be in a subsequent block. This change checks the value size for each
version, if the size is smaller than the right size, the decoder will look
for the missing chucks in the next block.
This commit is contained in:
Jingyu Zhou 2020-03-10 15:45:57 -07:00
parent 7d1538a9fc
commit 937d8bcb8e
2 changed files with 100 additions and 42 deletions

View File

@ -31,6 +31,7 @@ namespace file_converter {
enum {
OPT_CONTAINER,
OPT_BEGIN_VERSION,
OPT_CRASHONERROR,
OPT_END_VERSION,
OPT_TRACE,
OPT_TRACE_DIR,
@ -44,6 +45,7 @@ CSimpleOpt::SOption gConverterOptions[] = { { OPT_CONTAINER, "-r", SO_REQ_SEP },
{ OPT_CONTAINER, "--container", SO_REQ_SEP },
{ OPT_BEGIN_VERSION, "-b", SO_REQ_SEP },
{ OPT_BEGIN_VERSION, "--begin", SO_REQ_SEP },
{ OPT_CRASHONERROR, "--crash", SO_NONE },
{ OPT_END_VERSION, "-e", SO_REQ_SEP },
{ OPT_END_VERSION, "--end", SO_REQ_SEP },
{ OPT_TRACE, "--log", SO_NONE },

View File

@ -30,12 +30,15 @@
#include "flow/serialize.h"
#include "flow/actorcompiler.h" // has to be last include
extern bool g_crashOnError;
namespace file_converter {
void printDecodeUsage() {
std::cout << "\n"
" -r, --container Container URL.\n"
" -i, --input FILE Log file to be decoded.\n"
" --crash Crash on serious error.\n"
"\n";
return;
}
@ -89,6 +92,10 @@ int parseDecodeCommandLine(DecodeParams* param, CSimpleOpt* args) {
param->container_url = args->OptionArg();
break;
case OPT_CRASHONERROR:
g_crashOnError = true;
break;
case OPT_INPUT_FILE:
param->file = args->OptionArg();
break;
@ -161,7 +168,13 @@ std::vector<MutationRef> decode_value(const StringRef& value) {
reader.consume<uint64_t>(); // Consume the includeVersion
uint32_t val_length = reader.consume<uint32_t>();
ASSERT(val_length == value.size() - sizeof(uint64_t) - sizeof(uint32_t));
if (val_length != value.size() - sizeof(uint64_t) - sizeof(uint32_t)) {
TraceEvent("ValueError")
.detail("ValueLen", val_length)
.detail("ValueSize", value.size())
.detail("Value", printable(value));
ASSERT(false);
}
std::vector<MutationRef> mutations;
while (1) {
@ -217,54 +230,74 @@ struct DecodeProgress {
// The following are private APIs:
// Returns true if value contains complete data.
bool isValueComplete(StringRef value) {
StringRefReader reader(value, restore_corrupted_data());
reader.consume<uint64_t>(); // Consume the includeVersion
uint32_t val_length = reader.consume<uint32_t>();
return val_length == value.size() - sizeof(uint64_t) - sizeof(uint32_t);
}
// PRECONDITION: finished() must return false before calling this function.
// Returns the next batch of mutations along with the arena backing it.
ACTOR static Future<VersionedMutations> getNextBatchImpl(DecodeProgress* self) {
ASSERT(!self->finished());
state std::pair<Arena, KeyValueRef> arena_kv = self->keyValues[0];
// decode this batch's version
state std::pair<Version, int32_t> version_part = decode_key(arena_kv.second.key);
ASSERT(version_part.second == 0); // first part number must be 0.
// decode next versions, check if they are continuous parts
state int idx = 1; // next kv pair in "keyValues"
state int bufSize = arena_kv.second.value.size();
state int lastPart = 0;
loop {
// Try to decode another block if needed
if (idx == self->keyValues.size()) {
wait(readAndDecodeFile(self));
state std::tuple<Arena, Version, int32_t, StringRef> tuple = self->keyValues[0];
ASSERT(std::get<2>(tuple) == 0); // first part number must be 0.
// decode next versions, check if they are continuous parts
state int idx = 1; // next kv pair in "keyValues"
state int bufSize = std::get<3>(tuple).size();
state int lastPart = 0;
loop {
// Try to decode another block if needed
if (idx == self->keyValues.size()) {
wait(readAndDecodeFile(self));
}
if (idx == self->keyValues.size()) break;
auto next_tuple = self->keyValues[idx];
if (std::get<1>(tuple) != std::get<1>(next_tuple)) {
break;
}
if (lastPart + 1 != std::get<2>(next_tuple)) {
TraceEvent("DecodeError").detail("Part1", lastPart).detail("Part2", std::get<2>(next_tuple));
throw restore_corrupted_data();
}
bufSize += std::get<3>(next_tuple).size();
idx++;
lastPart++;
}
if (idx == self->keyValues.size()) break;
std::pair<Version, int32_t> next_version_part = decode_key(self->keyValues[idx].second.key);
if (version_part.first != next_version_part.first) break;
if (lastPart + 1 != next_version_part.second) {
TraceEvent("DecodeError").detail("Part1", lastPart).detail("Part2", next_version_part.second);
VersionedMutations m;
m.version = std::get<1>(tuple);
TraceEvent("Decode").detail("Version", m.version).detail("Idx", idx).detail("Q", self->keyValues.size());
StringRef value = std::get<3>(tuple);
if (idx > 1) {
// Stitch parts into one and then decode one by one
Standalone<StringRef> buf = self->combineValues(idx, bufSize);
value = buf;
m.arena = buf.arena();
} else {
m.arena = std::get<0>(tuple);
}
if (self->isValueComplete(value)) {
m.mutations = decode_value(value);
self->keyValues.erase(self->keyValues.begin(), self->keyValues.begin() + idx);
return m;
} else if (!self->eof) {
// Read one more block, hopefully the missing part of the value can be found.
wait(readAndDecodeFile(self));
} else {
TraceEvent(SevError, "MissingValue").detail("Version", m.version);
throw restore_corrupted_data();
}
bufSize += self->keyValues[idx].second.value.size();
idx++;
lastPart++;
}
VersionedMutations m;
m.version = version_part.first;
if (idx > 1) {
// Stitch parts into one and then decode one by one
Standalone<StringRef> buf = self->combineValues(idx, bufSize);
m.mutations = decode_value(buf);
m.arena = buf.arena();
} else {
m.mutations = decode_value(arena_kv.second.value);
m.arena = arena_kv.first;
}
self->keyValues.erase(self->keyValues.begin(), self->keyValues.begin() + idx);
return m;
}
// Returns a buffer which stitches first "idx" values into one.
@ -275,7 +308,7 @@ struct DecodeProgress {
Standalone<StringRef> buf = makeString(len);
int n = 0;
for (int i = 0; i < idx; i++) {
const auto& value = keyValues[i].second.value;
const auto& value = std::get<3>(keyValues[i]);
memcpy(mutateString(buf) + n, value.begin(), value.size());
n += value.size();
}
@ -301,9 +334,16 @@ struct DecodeProgress {
// Read key and value. If anything throws then there is a problem.
uint32_t kLen = reader.consumeNetworkUInt32();
const uint8_t* k = reader.consume(kLen);
std::pair<Version, int32_t> version_part = decode_key(StringRef(k, kLen));
uint32_t vLen = reader.consumeNetworkUInt32();
const uint8_t* v = reader.consume(vLen);
keyValues.emplace_back(buf.arena(), KeyValueRef(StringRef(k, kLen), StringRef(v, vLen)));
TraceEvent("Block")
.detail("KeySize", kLen)
.detail("valueSize", vLen)
.detail("Offset", reader.rptr - buf.begin())
.detail("Version", version_part.first)
.detail("Part", version_part.second);
keyValues.emplace_back(buf.arena(), version_part.first, version_part.second, StringRef(v, vLen));
}
// Make sure any remaining bytes in the block are 0xFF
@ -311,6 +351,15 @@ struct DecodeProgress {
if (b != 0xFF) throw restore_corrupted_data_padding();
}
// The (version, part) in a block can be out of order, i.e., (3, 0)
// can be followed by (4, 0), and then (3, 1). So we need to sort them
// first by version, and then by part number.
std::sort(keyValues.begin(), keyValues.end(),
[](const std::tuple<Arena, Version, int32_t, StringRef>& a,
const std::tuple<Arena, Version, int32_t, StringRef>& b) {
return std::get<1>(a) == std::get<1>(b) ? std::get<2>(a) < std::get<2>(b)
: std::get<1>(a) < std::get<1>(b);
});
return;
} catch (Error& e) {
TraceEvent(SevWarn, "CorruptBlock").error(e).detail("Offset", reader.rptr - buf.begin());
@ -360,14 +409,21 @@ struct DecodeProgress {
Reference<IAsyncFile> fd;
int64_t offset = 0;
bool eof = false;
// Key value pairs and their memory arenas.
std::vector<std::pair<Arena, KeyValueRef>> keyValues;
// A (version, part_number)'s mutations and memory arena.
std::vector<std::tuple<Arena, Version, int32_t, StringRef>> keyValues;
};
ACTOR Future<Void> decode_logs(DecodeParams params) {
state Reference<IBackupContainer> container = IBackupContainer::openContainer(params.container_url);
state BackupFileList listing = wait(container->dumpFileList());
// remove partitioned logs
listing.logs.erase(std::remove_if(listing.logs.begin(), listing.logs.end(),
[](const LogFile& file) {
std::string prefix("plogs/");
return file.fileName.substr(0, prefix.size()) == prefix;
}),
listing.logs.end());
std::sort(listing.logs.begin(), listing.logs.end());
TraceEvent("Container").detail("URL", params.container_url).detail("Logs", listing.logs.size());