foundationdb/fdbbackup/FileDecoder.actor.cpp

600 lines
19 KiB
C++

/*
* FileDecoder.actor.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2019 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <algorithm>
#include <iostream>
#include <vector>
#include "fdbbackup/BackupTLSConfig.h"
#include "fdbclient/BackupAgent.actor.h"
#include "fdbclient/BackupContainer.h"
#include "fdbbackup/FileConverter.h"
#include "fdbclient/MutationList.h"
#include "flow/Trace.h"
#include "flow/flow.h"
#include "flow/serialize.h"
#include "fdbclient/BuildFlags.h"
#include "flow/actorcompiler.h" // has to be last include
#define SevDecodeInfo SevVerbose
extern bool g_crashOnError;
namespace file_converter {
void printDecodeUsage() {
std::cout
<< "Decoder for FoundationDB backup mutation logs.\n"
"Usage: fdbdecode [OPTIONS]\n"
" -r, --container URL\n"
" Backup container URL, e.g., file:///some/path/.\n"
" -i, --input FILE\n"
" Log file filter, only matched files are decoded.\n"
" --log Enables trace file logging for the CLI session.\n"
" --logdir PATH Specifes the output directory for trace files. If\n"
" unspecified, defaults to the current directory. Has\n"
" no effect unless --log is specified.\n"
" --loggroup LOG_GROUP\n"
" Sets the LogGroup field with the specified value for all\n"
" events in the trace output (defaults to `default').\n"
" --trace_format FORMAT\n"
" Select the format of the trace files, xml (the default) or json.\n"
" Has no effect unless --log is specified.\n"
" --crash Crash on serious error.\n"
" --blob_credentials FILE\n"
" File containing blob credentials in JSON format.\n"
" The same credential format/file fdbbackup uses.\n"
#ifndef TLS_DISABLED
TLS_HELP
#endif
" --build_flags Print build information and exit.\n"
"\n";
return;
}
void printBuildInformation() {
std::cout << jsonBuildInformation() << "\n";
}
struct DecodeParams {
std::string container_url;
std::string fileFilter; // only files match the filter will be decoded
bool log_enabled = false;
std::string log_dir, trace_format, trace_log_group;
BackupTLSConfig tlsConfig;
std::string toString() {
std::string s;
s.append("ContainerURL: ");
s.append(container_url);
s.append(", FileFilter: ");
s.append(fileFilter);
if (log_enabled) {
if (!log_dir.empty()) {
s.append(" LogDir:").append(log_dir);
}
if (!trace_format.empty()) {
s.append(" Format:").append(trace_format);
}
if (!trace_log_group.empty()) {
s.append(" LogGroup:").append(trace_log_group);
}
}
return s;
}
};
int parseDecodeCommandLine(DecodeParams* param, CSimpleOpt* args) {
while (args->Next()) {
auto lastError = args->LastError();
switch (lastError) {
case SO_SUCCESS:
break;
default:
std::cerr << "ERROR: argument given for option: " << args->OptionText() << "\n";
return FDB_EXIT_ERROR;
break;
}
int optId = args->OptionId();
switch (optId) {
case OPT_HELP:
return FDB_EXIT_ERROR;
case OPT_CONTAINER:
param->container_url = args->OptionArg();
break;
case OPT_CRASHONERROR:
g_crashOnError = true;
break;
case OPT_INPUT_FILE:
param->fileFilter = args->OptionArg();
break;
case OPT_TRACE:
param->log_enabled = true;
break;
case OPT_TRACE_DIR:
param->log_dir = args->OptionArg();
break;
case OPT_TRACE_FORMAT:
if (!validateTraceFormat(args->OptionArg())) {
std::cerr << "ERROR: Unrecognized trace format " << args->OptionArg() << "\n";
return FDB_EXIT_ERROR;
}
param->trace_format = args->OptionArg();
break;
case OPT_TRACE_LOG_GROUP:
param->trace_log_group = args->OptionArg();
break;
case OPT_BLOB_CREDENTIALS:
param->tlsConfig.blobCredentials.push_back(args->OptionArg());
break;
#ifndef TLS_DISABLED
case TLSConfig::OPT_TLS_PLUGIN:
args->OptionArg();
break;
case TLSConfig::OPT_TLS_CERTIFICATES:
param->tlsConfig.tlsCertPath = args->OptionArg();
break;
case TLSConfig::OPT_TLS_PASSWORD:
param->tlsConfig.tlsPassword = args->OptionArg();
break;
case TLSConfig::OPT_TLS_CA_FILE:
param->tlsConfig.tlsCAPath = args->OptionArg();
break;
case TLSConfig::OPT_TLS_KEY:
param->tlsConfig.tlsKeyPath = args->OptionArg();
break;
case TLSConfig::OPT_TLS_VERIFY_PEERS:
param->tlsConfig.tlsVerifyPeers = args->OptionArg();
break;
#endif
case OPT_BUILD_FLAGS:
printBuildInformation();
return FDB_EXIT_ERROR;
break;
}
}
return FDB_EXIT_SUCCESS;
}
void printLogFiles(std::string msg, const std::vector<LogFile>& files) {
std::cout << msg << " " << files.size() << " log files\n";
for (const auto& file : files) {
std::cout << file.toString() << "\n";
}
std::cout << std::endl;
}
std::vector<LogFile> getRelevantLogFiles(const std::vector<LogFile>& files, const DecodeParams& params) {
std::vector<LogFile> filtered;
for (const auto& file : files) {
if (file.fileName.find(params.fileFilter) != std::string::npos) {
filtered.push_back(file);
}
}
return filtered;
}
std::pair<Version, int32_t> decode_key(const StringRef& key) {
ASSERT(key.size() == sizeof(uint8_t) + sizeof(Version) + sizeof(int32_t));
uint8_t hash;
Version version;
int32_t part;
BinaryReader rd(key, Unversioned());
rd >> hash >> version >> part;
version = bigEndian64(version);
part = bigEndian32(part);
int32_t v = version / CLIENT_KNOBS->LOG_RANGE_BLOCK_SIZE;
ASSERT(((uint8_t)hashlittle(&v, sizeof(v), 0)) == hash);
return std::make_pair(version, part);
}
// Decodes an encoded list of mutations in the format of:
// [includeVersion:uint64_t][val_length:uint32_t][mutation_1][mutation_2]...[mutation_k],
// where a mutation is encoded as:
// [type:uint32_t][keyLength:uint32_t][valueLength:uint32_t][key][value]
std::vector<MutationRef> decode_value(const StringRef& value) {
StringRefReader reader(value, restore_corrupted_data());
reader.consume<uint64_t>(); // Consume the includeVersion
uint32_t val_length = reader.consume<uint32_t>();
if (val_length != value.size() - sizeof(uint64_t) - sizeof(uint32_t)) {
TraceEvent(SevError, "ValueError")
.detail("ValueLen", val_length)
.detail("ValueSize", value.size())
.detail("Value", printable(value));
}
std::vector<MutationRef> mutations;
while (1) {
if (reader.eof())
break;
// Deserialization of a MutationRef, which was packed by MutationListRef::push_back_deep()
uint32_t type, p1len, p2len;
type = reader.consume<uint32_t>();
p1len = reader.consume<uint32_t>();
p2len = reader.consume<uint32_t>();
const uint8_t* key = reader.consume(p1len);
const uint8_t* val = reader.consume(p2len);
mutations.emplace_back((MutationRef::Type)type, StringRef(key, p1len), StringRef(val, p2len));
}
return mutations;
}
struct VersionedMutations {
Version version;
std::vector<MutationRef> mutations;
Arena arena; // The arena that contains the mutations.
};
struct VersionedKVPart {
Arena arena;
Version version;
int32_t part;
StringRef kv;
VersionedKVPart(Arena arena, Version version, int32_t part, StringRef kv)
: arena(arena), version(version), part(part), kv(kv) {}
};
/*
* Model a decoding progress for a mutation file. Usage is:
*
* DecodeProgress progress(logfile);
* wait(progress->openFile(container));
* while (!progress->finished()) {
* VersionedMutations m = wait(progress->getNextBatch());
* ...
* }
*
* Internally, the decoding process is done block by block -- each block is
* decoded into a list of key/value pairs, which are then decoded into batches
* of mutations. Because a version's mutations can be split into many key/value
* pairs, the decoding of mutation batch needs to look ahead one more pair. So
* at any time this object might have two blocks of data in memory.
*/
class DecodeProgress {
std::vector<VersionedKVPart> keyValues;
public:
DecodeProgress() = default;
template <class U>
DecodeProgress(const LogFile& file, U&& values) : file(file), keyValues(std::forward<U>(values)) {}
// If there are no more mutations to pull from the file.
// However, we could have unfinished version in the buffer when EOF is true,
// which means we should look for data in the next file. The caller
// should call getUnfinishedBuffer() to get these left data.
bool finished() const { return (eof && keyValues.empty()) || (leftover && !keyValues.empty()); }
std::vector<VersionedKVPart>&& getUnfinishedBuffer() && { return std::move(keyValues); }
// Returns all mutations of the next version in a batch.
Future<VersionedMutations> getNextBatch() { return getNextBatchImpl(this); }
Future<Void> openFile(Reference<IBackupContainer> container) { return openFileImpl(this, container); }
// The following are private APIs:
// Returns true if value contains complete data.
static bool isValueComplete(StringRef value) {
StringRefReader reader(value, restore_corrupted_data());
reader.consume<uint64_t>(); // Consume the includeVersion
uint32_t val_length = reader.consume<uint32_t>();
return val_length == value.size() - sizeof(uint64_t) - sizeof(uint32_t);
}
// PRECONDITION: finished() must return false before calling this function.
// Returns the next batch of mutations along with the arena backing it.
// Note the returned batch can be empty when the file has unfinished
// version batch data that are in the next file.
ACTOR static Future<VersionedMutations> getNextBatchImpl(DecodeProgress* self) {
ASSERT(!self->finished());
loop {
if (self->keyValues.size() <= 1) {
// Try to decode another block when less than one left
wait(readAndDecodeFile(self));
}
const auto& kv = self->keyValues[0];
ASSERT(kv.part == 0);
// decode next versions, check if they are continuous parts
int idx = 1; // next kv pair in "keyValues"
int bufSize = kv.kv.size();
for (int lastPart = 0; idx < self->keyValues.size(); idx++, lastPart++) {
if (idx == self->keyValues.size())
break;
const auto& nextKV = self->keyValues[idx];
if (kv.version != nextKV.version) {
break;
}
if (lastPart + 1 != nextKV.part) {
TraceEvent("DecodeError").detail("Part1", lastPart).detail("Part2", nextKV.part);
throw restore_corrupted_data();
}
bufSize += nextKV.kv.size();
}
VersionedMutations m;
m.version = kv.version;
TraceEvent("Decode").detail("Version", m.version).detail("Idx", idx).detail("Q", self->keyValues.size());
StringRef value = kv.kv;
if (idx > 1) {
// Stitch parts into one and then decode one by one
Standalone<StringRef> buf = self->combineValues(idx, bufSize);
value = buf;
m.arena = buf.arena();
}
if (isValueComplete(value)) {
m.mutations = decode_value(value);
if (m.arena.getSize() == 0) {
m.arena = kv.arena;
}
self->keyValues.erase(self->keyValues.begin(), self->keyValues.begin() + idx);
return m;
} else if (!self->eof) {
// Read one more block, hopefully the missing part of the value can be found.
wait(readAndDecodeFile(self));
} else {
TraceEvent(SevWarn, "MissingValue").detail("Version", m.version);
self->leftover = true;
return m; // Empty mutations
}
}
}
// Returns a buffer which stitches first "idx" values into one.
// "len" MUST equal the summation of these values.
Standalone<StringRef> combineValues(const int idx, const int len) {
ASSERT(idx <= keyValues.size() && idx > 1);
Standalone<StringRef> buf = makeString(len);
int n = 0;
for (int i = 0; i < idx; i++) {
const auto& value = keyValues[i].kv;
memcpy(mutateString(buf) + n, value.begin(), value.size());
n += value.size();
}
ASSERT(n == len);
return buf;
}
// Decodes a block into KeyValueRef stored in "keyValues".
void decode_block(const Standalone<StringRef>& buf, int len) {
StringRef block(buf.begin(), len);
StringRefReader reader(block, restore_corrupted_data());
try {
// Read header, currently only decoding version BACKUP_AGENT_MLOG_VERSION
if (reader.consume<int32_t>() != BACKUP_AGENT_MLOG_VERSION)
throw restore_unsupported_file_version();
// Read k/v pairs. Block ends either at end of last value exactly or with 0xFF as first key len byte.
while (1) {
// If eof reached or first key len bytes is 0xFF then end of block was reached.
if (reader.eof() || *reader.rptr == 0xFF)
break;
// Read key and value. If anything throws then there is a problem.
uint32_t kLen = reader.consumeNetworkUInt32();
const uint8_t* k = reader.consume(kLen);
std::pair<Version, int32_t> version_part = decode_key(StringRef(k, kLen));
uint32_t vLen = reader.consumeNetworkUInt32();
const uint8_t* v = reader.consume(vLen);
TraceEvent(SevDecodeInfo, "Block")
.detail("KeySize", kLen)
.detail("valueSize", vLen)
.detail("Offset", reader.rptr - buf.begin())
.detail("Version", version_part.first)
.detail("Part", version_part.second);
keyValues.emplace_back(buf.arena(), version_part.first, version_part.second, StringRef(v, vLen));
}
// Make sure any remaining bytes in the block are 0xFF
for (auto b : reader.remainder()) {
if (b != 0xFF)
throw restore_corrupted_data_padding();
}
// The (version, part) in a block can be out of order, i.e., (3, 0)
// can be followed by (4, 0), and then (3, 1). So we need to sort them
// first by version, and then by part number.
std::sort(keyValues.begin(), keyValues.end(), [](const VersionedKVPart& a, const VersionedKVPart& b) {
return a.version == b.version ? a.part < b.part : a.version < b.version;
});
return;
} catch (Error& e) {
TraceEvent(SevWarn, "CorruptBlock").error(e).detail("Offset", reader.rptr - buf.begin());
throw;
}
}
ACTOR static Future<Void> openFileImpl(DecodeProgress* self, Reference<IBackupContainer> container) {
Reference<IAsyncFile> fd = wait(container->readFile(self->file.fileName));
self->fd = fd;
wait(readAndDecodeFile(self));
return Void();
}
// Reads a file block, decodes it into key/value pairs, and stores these pairs.
ACTOR static Future<Void> readAndDecodeFile(DecodeProgress* self) {
try {
state int64_t len = std::min<int64_t>(self->file.blockSize, self->file.fileSize - self->offset);
if (len == 0) {
self->eof = true;
return Void();
}
state Standalone<StringRef> buf = makeString(len);
state int rLen = wait(self->fd->read(mutateString(buf), len, self->offset));
TraceEvent("ReadFile")
.detail("Name", self->file.fileName)
.detail("Len", rLen)
.detail("Offset", self->offset);
if (rLen != len) {
throw restore_corrupted_data();
}
self->decode_block(buf, rLen);
self->offset += rLen;
return Void();
} catch (Error& e) {
TraceEvent(SevWarn, "CorruptLogFileBlock")
.error(e)
.detail("Filename", self->file.fileName)
.detail("BlockOffset", self->offset)
.detail("BlockLen", self->file.blockSize);
throw;
}
}
LogFile file;
Reference<IAsyncFile> fd;
int64_t offset = 0;
bool eof = false;
bool leftover = false; // Done but has unfinished version batch data left
};
ACTOR Future<Void> decode_logs(DecodeParams params) {
state Reference<IBackupContainer> container = IBackupContainer::openContainer(params.container_url);
state BackupFileList listing = wait(container->dumpFileList());
// remove partitioned logs
listing.logs.erase(std::remove_if(listing.logs.begin(),
listing.logs.end(),
[](const LogFile& file) {
std::string prefix("plogs/");
return file.fileName.substr(0, prefix.size()) == prefix;
}),
listing.logs.end());
std::sort(listing.logs.begin(), listing.logs.end());
TraceEvent("Container").detail("URL", params.container_url).detail("Logs", listing.logs.size());
BackupDescription desc = wait(container->describeBackup());
std::cout << "\n" << desc.toString() << "\n";
state std::vector<LogFile> logs = getRelevantLogFiles(listing.logs, params);
printLogFiles("Relevant files are: ", logs);
state int i = 0;
// Previous file's unfinished version data
state std::vector<VersionedKVPart> left;
for (; i < logs.size(); i++) {
if (logs[i].fileSize == 0)
continue;
state DecodeProgress progress(logs[i], std::move(left));
wait(progress.openFile(container));
while (!progress.finished()) {
VersionedMutations vms = wait(progress.getNextBatch());
for (const auto& m : vms.mutations) {
std::cout << vms.version << " " << m.toString() << "\n";
}
}
left = std::move(progress).getUnfinishedBuffer();
if (!left.empty()) {
TraceEvent("UnfinishedFile").detail("File", logs[i].fileName).detail("Q", left.size());
}
}
return Void();
}
} // namespace file_converter
int main(int argc, char** argv) {
try {
CSimpleOpt* args = new CSimpleOpt(argc, argv, file_converter::gConverterOptions, SO_O_EXACT);
file_converter::DecodeParams param;
int status = file_converter::parseDecodeCommandLine(&param, args);
std::cout << "Params: " << param.toString() << "\n";
if (status != FDB_EXIT_SUCCESS) {
file_converter::printDecodeUsage();
return status;
}
if (param.log_enabled) {
if (param.log_dir.empty()) {
setNetworkOption(FDBNetworkOptions::TRACE_ENABLE);
} else {
setNetworkOption(FDBNetworkOptions::TRACE_ENABLE, StringRef(param.log_dir));
}
if (!param.trace_format.empty()) {
setNetworkOption(FDBNetworkOptions::TRACE_FORMAT, StringRef(param.trace_format));
}
if (!param.trace_log_group.empty()) {
setNetworkOption(FDBNetworkOptions::TRACE_LOG_GROUP, StringRef(param.trace_log_group));
}
}
if (!param.tlsConfig.setupTLS()) {
TraceEvent(SevError, "TLSError");
throw tls_error();
}
platformInit();
Error::init();
StringRef url(param.container_url);
setupNetwork(0, true);
TraceEvent::setNetworkThread();
openTraceFile(NetworkAddress(), 10 << 20, 10 << 20, param.log_dir, "decode", param.trace_log_group);
param.tlsConfig.setupBlobCredentials();
auto f = stopAfter(decode_logs(param));
runNetwork();
return status;
} catch (Error& e) {
std::cerr << "ERROR: " << e.what() << "\n";
return FDB_EXIT_ERROR;
} catch (std::exception& e) {
TraceEvent(SevError, "MainError").error(unknown_error()).detail("RootException", e.what());
return FDB_EXIT_MAIN_EXCEPTION;
}
}