2019-11-26 13:00:13 +08:00
|
|
|
/*
|
|
|
|
* FileDecoder.actor.cpp
|
|
|
|
*
|
|
|
|
* This source file is part of the FoundationDB open source project
|
|
|
|
*
|
2022-03-22 04:36:23 +08:00
|
|
|
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
|
2019-11-26 13:00:13 +08:00
|
|
|
*
|
|
|
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
|
|
* you may not use this file except in compliance with the License.
|
|
|
|
* You may obtain a copy of the License at
|
|
|
|
*
|
|
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
*
|
|
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
|
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
|
* See the License for the specific language governing permissions and
|
|
|
|
* limitations under the License.
|
|
|
|
*/
|
|
|
|
|
|
|
|
#include <algorithm>
|
2021-07-31 06:58:22 +08:00
|
|
|
#include <cstdlib>
|
2019-11-26 13:00:13 +08:00
|
|
|
#include <iostream>
|
2021-07-31 06:58:22 +08:00
|
|
|
#include <limits>
|
2022-04-06 07:16:59 +08:00
|
|
|
#include <memory>
|
2021-07-31 06:58:22 +08:00
|
|
|
#include <string>
|
2019-11-26 13:00:13 +08:00
|
|
|
#include <vector>
|
2022-04-06 20:43:59 +08:00
|
|
|
#include <fcntl.h>
|
|
|
|
#ifdef _WIN32
|
|
|
|
#include <io.h>
|
|
|
|
#endif
|
2019-11-26 13:00:13 +08:00
|
|
|
|
2021-04-06 02:47:23 +08:00
|
|
|
#include "fdbbackup/BackupTLSConfig.h"
|
2022-03-24 02:23:10 +08:00
|
|
|
#include "fdbclient/BuildFlags.h"
|
|
|
|
#include "fdbbackup/FileConverter.h"
|
2019-11-26 13:00:13 +08:00
|
|
|
#include "fdbclient/BackupAgent.actor.h"
|
|
|
|
#include "fdbclient/BackupContainer.h"
|
2021-07-31 07:31:53 +08:00
|
|
|
#include "fdbclient/CommitTransaction.h"
|
|
|
|
#include "fdbclient/FDBTypes.h"
|
2022-03-24 02:23:10 +08:00
|
|
|
#include "fdbclient/IKnobCollection.h"
|
|
|
|
#include "fdbclient/Knobs.h"
|
2019-11-26 13:00:13 +08:00
|
|
|
#include "fdbclient/MutationList.h"
|
2022-03-24 02:23:10 +08:00
|
|
|
#include "flow/ArgParseUtil.h"
|
2021-08-01 03:45:47 +08:00
|
|
|
#include "flow/IRandom.h"
|
2022-04-06 07:16:59 +08:00
|
|
|
#include "flow/Platform.h"
|
2021-04-03 09:22:06 +08:00
|
|
|
#include "flow/Trace.h"
|
2019-11-26 13:00:13 +08:00
|
|
|
#include "flow/flow.h"
|
|
|
|
#include "flow/serialize.h"
|
2022-03-24 02:23:10 +08:00
|
|
|
|
2020-02-04 02:42:05 +08:00
|
|
|
#include "flow/actorcompiler.h" // has to be last include
|
2019-11-26 13:00:13 +08:00
|
|
|
|
2020-03-25 01:54:12 +08:00
|
|
|
#define SevDecodeInfo SevVerbose
|
|
|
|
|
Decode out of order mutations in old mutation logs
In the old mutation logs, a version's mutations are serialized as a buffer.
Then the buffer is split into smaller chunks, e.g., 10000 bytes each. When
writing chunks to the final mutation log file, these chunks can be flushed
out of order. For instance, the (version, chunk_part) can be in the order of
(3, 0), (4, 0), (3, 1). As a result, the decoder must read forward to find all
chunks of data for a version.
Another complication is that the files are organized into blocks, where (3, 1)
can be in a subsequent block. This change checks the value size for each
version; if the size is smaller than the right size, the decoder will look
for the missing chunks in the next block.
2020-03-11 06:45:57 +08:00
|
|
|
extern bool g_crashOnError;
|
|
|
|
|
2019-11-26 13:00:13 +08:00
|
|
|
namespace file_converter {
|
|
|
|
|
|
|
|
void printDecodeUsage() {
|
2021-08-28 08:07:47 +08:00
|
|
|
std::cout << "Decoder for FoundationDB backup mutation logs.\n"
|
|
|
|
"Usage: fdbdecode [OPTIONS]\n"
|
|
|
|
" -r, --container URL\n"
|
|
|
|
" Backup container URL, e.g., file:///some/path/.\n"
|
|
|
|
" -i, --input FILE\n"
|
|
|
|
" Log file filter, only matched files are decoded.\n"
|
|
|
|
" --log Enables trace file logging for the CLI session.\n"
|
|
|
|
" --logdir PATH Specifes the output directory for trace files. If\n"
|
|
|
|
" unspecified, defaults to the current directory. Has\n"
|
|
|
|
" no effect unless --log is specified.\n"
|
|
|
|
" --loggroup LOG_GROUP\n"
|
|
|
|
" Sets the LogGroup field with the specified value for all\n"
|
|
|
|
" events in the trace output (defaults to `default').\n"
|
2021-12-15 01:59:14 +08:00
|
|
|
" --trace-format FORMAT\n"
|
2021-08-28 08:07:47 +08:00
|
|
|
" Select the format of the trace files, xml (the default) or json.\n"
|
|
|
|
" Has no effect unless --log is specified.\n"
|
|
|
|
" --crash Crash on serious error.\n"
|
2021-12-15 01:59:14 +08:00
|
|
|
" --blob-credentials FILE\n"
|
2021-08-28 08:07:47 +08:00
|
|
|
" File containing blob credentials in JSON format.\n"
|
2022-05-03 13:15:27 +08:00
|
|
|
" The same credential format/file fdbbackup uses.\n" TLS_HELP
|
2021-12-15 01:59:14 +08:00
|
|
|
" --build-flags Print build information and exit.\n"
|
|
|
|
" --list-only Print file list and exit.\n"
|
2021-08-28 08:07:47 +08:00
|
|
|
" -k KEY_PREFIX Use the prefix for filtering mutations\n"
|
2021-12-15 01:59:14 +08:00
|
|
|
" --hex-prefix HEX_PREFIX\n"
|
2022-03-24 02:23:10 +08:00
|
|
|
" The prefix specified in HEX format, e.g., \"\\\\x05\\\\x01\".\n"
|
2021-12-15 01:59:14 +08:00
|
|
|
" --begin-version-filter BEGIN_VERSION\n"
|
2021-08-28 08:07:47 +08:00
|
|
|
" The version range's begin version (inclusive) for filtering.\n"
|
2021-12-15 01:59:14 +08:00
|
|
|
" --end-version-filter END_VERSION\n"
|
2021-08-28 08:07:47 +08:00
|
|
|
" The version range's end version (exclusive) for filtering.\n"
|
2022-03-24 02:23:10 +08:00
|
|
|
" --knob-KNOBNAME KNOBVALUE\n"
|
2022-04-06 07:16:59 +08:00
|
|
|
" Changes a knob value. KNOBNAME should be lowercase.\n"
|
|
|
|
" -s, --save Save a copy of downloaded files (default: not saving).\n"
|
2021-08-28 08:07:47 +08:00
|
|
|
"\n";
|
2019-11-26 13:00:13 +08:00
|
|
|
return;
|
|
|
|
}
|
|
|
|
|
2020-09-11 08:06:16 +08:00
|
|
|
void printBuildInformation() {
|
2021-04-03 09:22:06 +08:00
|
|
|
std::cout << jsonBuildInformation() << "\n";
|
2020-09-11 08:06:16 +08:00
|
|
|
}
|
|
|
|
|
2019-11-26 13:00:13 +08:00
|
|
|
struct DecodeParams {
|
|
|
|
std::string container_url;
|
2022-03-29 08:10:49 +08:00
|
|
|
Optional<std::string> proxy;
|
2021-04-03 00:27:49 +08:00
|
|
|
std::string fileFilter; // only files match the filter will be decoded
|
2021-08-01 06:30:30 +08:00
|
|
|
bool log_enabled = true;
|
2019-11-26 13:00:13 +08:00
|
|
|
std::string log_dir, trace_format, trace_log_group;
|
2021-04-06 02:47:23 +08:00
|
|
|
BackupTLSConfig tlsConfig;
|
2021-07-31 06:58:22 +08:00
|
|
|
bool list_only = false;
|
2022-04-06 07:16:59 +08:00
|
|
|
bool save_file_locally = false;
|
2021-07-31 06:58:22 +08:00
|
|
|
std::string prefix; // Key prefix for filtering
|
|
|
|
Version beginVersionFilter = 0;
|
|
|
|
Version endVersionFilter = std::numeric_limits<Version>::max();
|
|
|
|
|
2022-03-24 02:23:10 +08:00
|
|
|
std::vector<std::pair<std::string, std::string>> knobs;
|
|
|
|
|
2021-07-31 06:58:22 +08:00
|
|
|
// Returns if [begin, end) overlap with the filter range
|
|
|
|
bool overlap(Version begin, Version end) const {
|
|
|
|
// Filter [100, 200), [50,75) [200, 300)
|
|
|
|
return !(begin >= endVersionFilter || end <= beginVersionFilter);
|
|
|
|
}
|
2019-11-26 13:00:13 +08:00
|
|
|
|
|
|
|
std::string toString() {
|
|
|
|
std::string s;
|
|
|
|
s.append("ContainerURL: ");
|
|
|
|
s.append(container_url);
|
2022-03-29 08:10:49 +08:00
|
|
|
if (proxy.present()) {
|
|
|
|
s.append(", Proxy: ");
|
|
|
|
s.append(proxy.get());
|
|
|
|
}
|
2021-04-03 00:27:49 +08:00
|
|
|
s.append(", FileFilter: ");
|
|
|
|
s.append(fileFilter);
|
2019-11-26 13:00:13 +08:00
|
|
|
if (log_enabled) {
|
|
|
|
if (!log_dir.empty()) {
|
|
|
|
s.append(" LogDir:").append(log_dir);
|
|
|
|
}
|
|
|
|
if (!trace_format.empty()) {
|
|
|
|
s.append(" Format:").append(trace_format);
|
|
|
|
}
|
|
|
|
if (!trace_log_group.empty()) {
|
|
|
|
s.append(" LogGroup:").append(trace_log_group);
|
|
|
|
}
|
|
|
|
}
|
2021-07-31 06:58:22 +08:00
|
|
|
s.append(", list_only: ").append(list_only ? "true" : "false");
|
|
|
|
if (beginVersionFilter != 0) {
|
|
|
|
s.append(", beginVersionFilter: ").append(std::to_string(beginVersionFilter));
|
|
|
|
}
|
|
|
|
if (endVersionFilter < std::numeric_limits<Version>::max()) {
|
|
|
|
s.append(", endVersionFilter: ").append(std::to_string(endVersionFilter));
|
|
|
|
}
|
2021-08-01 03:52:39 +08:00
|
|
|
if (!prefix.empty()) {
|
2021-08-01 09:50:33 +08:00
|
|
|
s.append(", KeyPrefix: ").append(printable(KeyRef(prefix)));
|
2021-08-01 03:52:39 +08:00
|
|
|
}
|
2022-03-24 02:23:10 +08:00
|
|
|
for (const auto& [knob, value] : knobs) {
|
|
|
|
s.append(", KNOB-").append(knob).append(" = ").append(value);
|
|
|
|
}
|
2022-04-06 07:16:59 +08:00
|
|
|
s.append(", SaveFile: ").append(save_file_locally ? "true" : "false");
|
2019-11-26 13:00:13 +08:00
|
|
|
return s;
|
|
|
|
}
|
2022-03-24 02:23:10 +08:00
|
|
|
|
|
|
|
void updateKnobs() {
|
2022-03-25 06:17:02 +08:00
|
|
|
IKnobCollection::setupKnobs(knobs);
|
2022-03-24 02:23:10 +08:00
|
|
|
|
|
|
|
// Reinitialize knobs in order to update knobs that are dependent on explicitly set knobs
|
2022-03-25 06:17:02 +08:00
|
|
|
IKnobCollection::getMutableGlobalKnobCollection().initialize(Randomize::False, IsSimulated::False);
|
2022-03-24 02:23:10 +08:00
|
|
|
}
|
2021-08-01 12:00:07 +08:00
|
|
|
};
|
2021-04-03 01:18:26 +08:00
|
|
|
|
2021-08-01 12:00:07 +08:00
|
|
|
// Decode an ASCII string, e.g., "\x15\x1b\x19\x04\xaf\x0c\x28\x0a",
// into the binary string.
//
// Recognized escapes:
//   \"  \\  \<space>  \;   -> the character following the backslash
//   \xHH                   -> the byte with hex value HH (exactly two hex digits)
// Every other character is copied unchanged.
//
// On malformed input (trailing backslash, unknown escape, truncated or
// non-hex \x escape) "Invalid hex string at: N" is written to stderr, where N
// is the number of bytes decoded before the offending escape, and an empty
// string is returned.
std::string decode_hex_string(std::string line) {
    // Numeric value of a hex digit, or -1 when c is not one. strtoul() is
    // deliberately not used: it skips leading whitespace and accepts a sign,
    // which would let escapes like "\x-1" or "\x 1" through.
    const auto hexValue = [](char c) -> int {
        if (c >= '0' && c <= '9')
            return c - '0';
        if (c >= 'a' && c <= 'f')
            return c - 'a' + 10;
        if (c >= 'A' && c <= 'F')
            return c - 'A' + 10;
        return -1;
    };

    std::string decoded;
    decoded.reserve(line.size());

    const auto fail = [&decoded]() {
        std::cerr << "Invalid hex string at: " << decoded.size() << "\n";
        return std::string();
    };

    for (size_t pos = 0; pos < line.size();) {
        const char c = line[pos];
        if (c != '\\') {
            decoded.push_back(c);
            ++pos;
            continue;
        }

        // A backslash must be followed by at least one character.
        if (pos + 1 >= line.size()) {
            return fail();
        }

        const char escaped = line[pos + 1];
        switch (escaped) {
        case '"':
        case '\\':
        case ' ':
        case ';':
            decoded.push_back(escaped);
            pos += 2;
            break;
        case 'x': {
            // "\xHH" occupies four characters.
            if (pos + 3 >= line.size()) {
                return fail();
            }
            const int hi = hexValue(line[pos + 2]);
            const int lo = hexValue(line[pos + 3]);
            if (hi < 0 || lo < 0) {
                return fail();
            }
            decoded.push_back(static_cast<char>(hi * 16 + lo));
            pos += 4;
            break;
        }
        default:
            return fail();
        }
    }

    return decoded;
}
|
2019-11-26 13:00:13 +08:00
|
|
|
|
|
|
|
// Walks all options in `args` and stores their values in `param`.
//
// Returns FDB_EXIT_SUCCESS once every option has been consumed. Returns
// FDB_EXIT_ERROR as soon as the option parser reports an error, a trace
// format is not recognized, a knob option cannot be parsed, or OPT_HELP /
// OPT_BUILD_FLAGS is seen (the latter after printing the build information).
int parseDecodeCommandLine(DecodeParams* param, CSimpleOpt* args) {
    while (args->Next()) {
        // Anything other than SO_SUCCESS aborts parsing.
        if (args->LastError() != SO_SUCCESS) {
            std::cerr << "ERROR: argument given for option: " << args->OptionText() << "\n";
            return FDB_EXIT_ERROR;
        }

        const int optId = args->OptionId();
        switch (optId) {
        case OPT_HELP:
            return FDB_EXIT_ERROR;

        // --- input selection ---
        case OPT_CONTAINER:
            param->container_url = args->OptionArg();
            break;

        case OPT_INPUT_FILE:
            param->fileFilter = args->OptionArg();
            break;

        case OPT_LIST_ONLY:
            param->list_only = true;
            break;

        case OPT_SAVE_FILE:
            param->save_file_locally = true;
            break;

        // --- mutation filters ---
        case OPT_KEY_PREFIX:
            param->prefix = args->OptionArg();
            break;

        case OPT_HEX_KEY_PREFIX:
            param->prefix = decode_hex_string(args->OptionArg());
            break;

        case OPT_BEGIN_VERSION_FILTER:
            param->beginVersionFilter = std::atoll(args->OptionArg());
            break;

        case OPT_END_VERSION_FILTER:
            param->endVersionFilter = std::atoll(args->OptionArg());
            break;

        // --- tracing ---
        case OPT_CRASHONERROR:
            g_crashOnError = true;
            break;

        case OPT_TRACE:
            param->log_enabled = true;
            break;

        case OPT_TRACE_DIR:
            param->log_dir = args->OptionArg();
            break;

        case OPT_TRACE_FORMAT: {
            const auto traceFormat = args->OptionArg();
            if (!selectTraceFormatter(traceFormat)) {
                std::cerr << "ERROR: Unrecognized trace format " << traceFormat << "\n";
                return FDB_EXIT_ERROR;
            }
            param->trace_format = traceFormat;
            break;
        }

        case OPT_TRACE_LOG_GROUP:
            param->trace_log_group = args->OptionArg();
            break;

        // --- knobs ---
        case OPT_KNOB: {
            Optional<std::string> knobName = extractPrefixedArgument("--knob", args->OptionSyntax());
            if (!knobName.present()) {
                std::cerr << "ERROR: unable to parse knob option '" << args->OptionSyntax() << "'\n";
                return FDB_EXIT_ERROR;
            }
            param->knobs.emplace_back(knobName.get(), args->OptionArg());
            break;
        }

        // --- credentials and TLS ---
        case OPT_BLOB_CREDENTIALS:
            param->tlsConfig.blobCredentials.push_back(args->OptionArg());
            break;

        case TLSConfig::OPT_TLS_PLUGIN:
            // The argument is fetched but not stored anywhere.
            args->OptionArg();
            break;

        case TLSConfig::OPT_TLS_CERTIFICATES:
            param->tlsConfig.tlsCertPath = args->OptionArg();
            break;

        case TLSConfig::OPT_TLS_PASSWORD:
            param->tlsConfig.tlsPassword = args->OptionArg();
            break;

        case TLSConfig::OPT_TLS_CA_FILE:
            param->tlsConfig.tlsCAPath = args->OptionArg();
            break;

        case TLSConfig::OPT_TLS_KEY:
            param->tlsConfig.tlsKeyPath = args->OptionArg();
            break;

        case TLSConfig::OPT_TLS_VERIFY_PEERS:
            param->tlsConfig.tlsVerifyPeers = args->OptionArg();
            break;

        case OPT_BUILD_FLAGS:
            printBuildInformation();
            return FDB_EXIT_ERROR;
        }
    }
    return FDB_EXIT_SUCCESS;
}
|
|
|
|
|
|
|
|
void printLogFiles(std::string msg, const std::vector<LogFile>& files) {
|
|
|
|
std::cout << msg << " " << files.size() << " log files\n";
|
|
|
|
for (const auto& file : files) {
|
|
|
|
std::cout << file.toString() << "\n";
|
|
|
|
}
|
|
|
|
std::cout << std::endl;
|
|
|
|
}
|
|
|
|
|
|
|
|
std::vector<LogFile> getRelevantLogFiles(const std::vector<LogFile>& files, const DecodeParams& params) {
|
|
|
|
std::vector<LogFile> filtered;
|
|
|
|
for (const auto& file : files) {
|
2021-07-31 06:58:22 +08:00
|
|
|
if (file.fileName.find(params.fileFilter) != std::string::npos &&
|
|
|
|
params.overlap(file.beginVersion, file.endVersion + 1)) {
|
2019-11-26 13:00:13 +08:00
|
|
|
filtered.push_back(file);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return filtered;
|
|
|
|
}
|
|
|
|
|
2019-11-27 09:37:33 +08:00
|
|
|
struct VersionedMutations {
|
|
|
|
Version version;
|
|
|
|
std::vector<MutationRef> mutations;
|
2021-08-02 05:08:54 +08:00
|
|
|
std::string serializedMutations; // buffer that contains mutations
|
2020-12-26 02:55:42 +08:00
|
|
|
};
|
|
|
|
|
2019-11-27 09:37:33 +08:00
|
|
|
/*
|
|
|
|
* Model a decoding progress for a mutation file. Usage is:
|
|
|
|
*
|
|
|
|
* DecodeProgress progress(logfile);
|
|
|
|
* wait(progress->openFile(container));
|
2022-04-06 07:16:59 +08:00
|
|
|
* while (1) {
|
|
|
|
* Optional<VersionedMutations> batch = wait(progress->getNextBatch());
|
|
|
|
* if (!batch.present()) break;
|
|
|
|
* ... // process the batch mutations
|
2019-11-27 09:37:33 +08:00
|
|
|
* }
|
2019-12-03 02:27:48 +08:00
|
|
|
*
|
|
|
|
* Internally, the decoding process is done block by block -- each block is
|
|
|
|
* decoded into a list of key/value pairs, which are then decoded into batches
|
|
|
|
* of mutations. Because a version's mutations can be split into many key/value
|
2022-04-06 07:16:59 +08:00
|
|
|
* pairs, the decoding of mutation needs to look ahead to find all batches that
|
|
|
|
* belong to the same version.
|
2019-11-27 09:37:33 +08:00
|
|
|
*/
|
2020-12-26 02:55:42 +08:00
|
|
|
class DecodeProgress {
|
2021-08-03 00:21:33 +08:00
|
|
|
std::vector<Standalone<VectorRef<KeyValueRef>>> blocks;
|
2021-08-02 13:39:28 +08:00
|
|
|
std::unordered_map<Version, fileBackup::AccumulatedMutations> mutationBlocksByVersion;
|
2020-12-26 02:55:42 +08:00
|
|
|
|
|
|
|
public:
|
2019-11-26 13:00:13 +08:00
|
|
|
DecodeProgress() = default;
|
2022-04-06 07:16:59 +08:00
|
|
|
DecodeProgress(const LogFile& file, bool save) : file(file), save(save) {}
|
2019-11-27 09:37:33 +08:00
|
|
|
|
2022-04-06 07:16:59 +08:00
|
|
|
~DecodeProgress() {
|
|
|
|
if (lfd != -1) {
|
|
|
|
close(lfd);
|
|
|
|
}
|
|
|
|
}
|
2019-11-27 09:37:33 +08:00
|
|
|
|
2021-08-02 05:08:54 +08:00
|
|
|
// Open and loads file into memory
|
2019-11-27 09:37:33 +08:00
|
|
|
Future<Void> openFile(Reference<IBackupContainer> container) { return openFileImpl(this, container); }
|
|
|
|
|
|
|
|
// The following are private APIs:
|
|
|
|
|
2019-12-03 02:27:48 +08:00
|
|
|
// Returns the next batch of mutations along with the arena backing it.
|
2020-03-31 02:34:51 +08:00
|
|
|
// Note the returned batch can be empty when the file has unfinished
|
|
|
|
// version batch data that are in the next file.
|
2022-04-06 07:16:59 +08:00
|
|
|
Optional<VersionedMutations> getNextBatch() {
|
2021-08-02 05:08:54 +08:00
|
|
|
for (auto& [version, m] : mutationBlocksByVersion) {
|
|
|
|
if (m.isComplete()) {
|
2022-04-06 07:16:59 +08:00
|
|
|
VersionedMutations vms;
|
2021-08-02 05:08:54 +08:00
|
|
|
vms.version = version;
|
|
|
|
vms.serializedMutations = m.serializedMutations;
|
2022-04-06 07:16:59 +08:00
|
|
|
vms.mutations = fileBackup::decodeMutationLogValue(vms.serializedMutations);
|
|
|
|
TraceEvent("Decode").detail("Version", vms.version).detail("N", vms.mutations.size());
|
2021-08-02 05:08:54 +08:00
|
|
|
mutationBlocksByVersion.erase(version);
|
|
|
|
return vms;
|
2019-11-27 09:37:33 +08:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-08-02 05:08:54 +08:00
|
|
|
// No complete versions
|
2021-08-03 00:26:42 +08:00
|
|
|
if (!mutationBlocksByVersion.empty()) {
|
|
|
|
TraceEvent(SevWarn, "UnfishedBlocks").detail("NumberOfVersions", mutationBlocksByVersion.size());
|
2019-11-26 13:00:13 +08:00
|
|
|
}
|
2022-04-06 07:16:59 +08:00
|
|
|
return Optional<VersionedMutations>();
|
2019-11-26 13:00:13 +08:00
|
|
|
}
|
|
|
|
|
2019-11-27 09:37:33 +08:00
|
|
|
ACTOR static Future<Void> openFileImpl(DecodeProgress* self, Reference<IBackupContainer> container) {
|
|
|
|
Reference<IAsyncFile> fd = wait(container->readFile(self->file.fileName));
|
|
|
|
self->fd = fd;
|
2022-04-06 07:16:59 +08:00
|
|
|
if (self->save) {
|
|
|
|
std::string dir = self->file.fileName;
|
|
|
|
std::size_t found = self->file.fileName.find_last_of('/');
|
|
|
|
if (found != std::string::npos) {
|
|
|
|
std::string path = self->file.fileName.substr(0, found);
|
|
|
|
if (!directoryExists(path)) {
|
|
|
|
platform::createDirectory(path);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
self->lfd = open(self->file.fileName.c_str(), O_WRONLY | O_CREAT | O_TRUNC);
|
|
|
|
if (self->lfd == -1) {
|
|
|
|
TraceEvent(SevError, "OpenLocalFileFailed").detail("File", self->file.fileName);
|
|
|
|
throw platform_error();
|
|
|
|
}
|
|
|
|
}
|
2021-08-02 05:08:54 +08:00
|
|
|
while (!self->eof) {
|
|
|
|
wait(readAndDecodeFile(self));
|
2019-11-27 09:37:33 +08:00
|
|
|
}
|
|
|
|
return Void();
|
|
|
|
}
|
2019-11-26 13:00:13 +08:00
|
|
|
|
2021-08-03 00:21:33 +08:00
|
|
|
// Add chunks to mutationBlocksByVersion
|
|
|
|
void addBlockKVPairs(VectorRef<KeyValueRef> chunks) {
|
|
|
|
for (auto& kv : chunks) {
|
2021-08-02 13:39:28 +08:00
|
|
|
auto versionAndChunkNumber = fileBackup::decodeMutationLogKey(kv.key);
|
2021-08-02 05:08:54 +08:00
|
|
|
mutationBlocksByVersion[versionAndChunkNumber.first].addChunk(versionAndChunkNumber.second, kv);
|
2019-11-27 09:37:33 +08:00
|
|
|
}
|
|
|
|
}
|
2019-11-26 13:00:13 +08:00
|
|
|
|
2019-12-03 02:27:48 +08:00
|
|
|
// Reads a file block, decodes it into key/value pairs, and stores these pairs.
|
2019-11-27 09:37:33 +08:00
|
|
|
ACTOR static Future<Void> readAndDecodeFile(DecodeProgress* self) {
|
2019-11-26 13:00:13 +08:00
|
|
|
try {
|
2019-11-27 09:37:33 +08:00
|
|
|
state int64_t len = std::min<int64_t>(self->file.blockSize, self->file.fileSize - self->offset);
|
|
|
|
if (len == 0) {
|
|
|
|
self->eof = true;
|
|
|
|
return Void();
|
2019-11-26 13:00:13 +08:00
|
|
|
}
|
2019-11-27 09:37:33 +08:00
|
|
|
|
2021-08-03 00:21:33 +08:00
|
|
|
// Decode a file block into log_key and log_value chunks
|
2022-04-06 07:16:59 +08:00
|
|
|
state Standalone<VectorRef<KeyValueRef>> chunks =
|
2021-08-02 13:39:28 +08:00
|
|
|
wait(fileBackup::decodeMutationLogFileBlock(self->fd, self->offset, len));
|
2021-08-03 00:21:33 +08:00
|
|
|
self->blocks.push_back(chunks);
|
2021-08-02 05:08:54 +08:00
|
|
|
|
2022-04-06 07:16:59 +08:00
|
|
|
if (self->save) {
|
|
|
|
ASSERT(self->lfd != -1);
|
|
|
|
|
|
|
|
// Read the chunck one more time
|
|
|
|
state Standalone<StringRef> buf = makeString(len);
|
|
|
|
int rLen = wait(self->fd->read(mutateString(buf), len, self->offset));
|
|
|
|
if (rLen != len)
|
|
|
|
throw restore_bad_read();
|
|
|
|
|
|
|
|
int wlen = write(self->lfd, buf.begin(), len);
|
|
|
|
if (wlen != len) {
|
|
|
|
TraceEvent(SevError, "WriteLocalFileFailed")
|
|
|
|
.detail("File", self->file.fileName)
|
|
|
|
.detail("Offset", self->offset)
|
|
|
|
.detail("Len", len)
|
|
|
|
.detail("Wrote", wlen);
|
|
|
|
throw platform_error();
|
|
|
|
}
|
|
|
|
TraceEvent("WriteLocalFile")
|
|
|
|
.detail("Name", self->file.fileName)
|
|
|
|
.detail("Len", len)
|
|
|
|
.detail("Offset", self->offset);
|
|
|
|
}
|
|
|
|
|
2019-11-27 09:37:33 +08:00
|
|
|
TraceEvent("ReadFile")
|
|
|
|
.detail("Name", self->file.fileName)
|
2021-08-02 05:08:54 +08:00
|
|
|
.detail("Len", len)
|
2019-11-27 09:37:33 +08:00
|
|
|
.detail("Offset", self->offset);
|
2021-08-03 00:21:33 +08:00
|
|
|
self->addBlockKVPairs(chunks);
|
2021-08-02 05:08:54 +08:00
|
|
|
self->offset += len;
|
|
|
|
|
2019-11-26 13:00:13 +08:00
|
|
|
return Void();
|
|
|
|
} catch (Error& e) {
|
|
|
|
TraceEvent(SevWarn, "CorruptLogFileBlock")
|
|
|
|
.error(e)
|
|
|
|
.detail("Filename", self->file.fileName)
|
|
|
|
.detail("BlockOffset", self->offset)
|
|
|
|
.detail("BlockLen", self->file.blockSize);
|
|
|
|
throw;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
LogFile file;
|
|
|
|
Reference<IAsyncFile> fd;
|
|
|
|
int64_t offset = 0;
|
|
|
|
bool eof = false;
|
2022-04-06 07:16:59 +08:00
|
|
|
bool save = false;
|
|
|
|
int lfd = -1; // local file descriptor
|
2019-11-26 13:00:13 +08:00
|
|
|
};
|
|
|
|
|
2021-08-02 05:08:54 +08:00
|
|
|
// Decodes one log file and reports its mutations.
//
// Empty files are skipped. Versions outside
// [params.beginVersionFilter, params.endVersionFilter) are skipped. When
// params.prefix is set, only single-key mutations whose key starts with the
// prefix and ClearRange mutations whose range intersects the prefix range are
// reported. Every reported mutation is traced and written to stdout as
// "<version> <mutation>".
ACTOR Future<Void> process_file(Reference<IBackupContainer> container, LogFile file, UID uid, DecodeParams params) {
    if (file.fileSize == 0) {
        TraceEvent("SkipEmptyFile", uid).detail("Name", file.fileName);
        return Void();
    }

    state DecodeProgress progress(file, params.save_file_locally);
    wait(progress.openFile(container));
    while (true) {
        auto nextBatch = progress.getNextBatch();
        if (!nextBatch.present())
            break;

        const VersionedMutations& vms = nextBatch.get();
        const bool versionSelected =
            vms.version >= params.beginVersionFilter && vms.version < params.endVersionFilter;
        if (!versionSelected) {
            TraceEvent("SkipVersion").detail("Version", vms.version);
            continue;
        }

        int sub = 0; // sub sequence number, the first mutation gets 1
        for (const auto& m : vms.mutations) {
            sub++;

            bool selected = false;
            if (params.prefix.empty()) {
                // no filtering
                selected = true;
            } else if (isSingleKeyMutation(static_cast<MutationRef::Type>(m.type))) {
                selected = m.param1.startsWith(StringRef(params.prefix));
            } else if (m.type == MutationRef::ClearRange) {
                KeyRange cleared(KeyRangeRef(m.param1, m.param2));
                KeyRange prefixed = prefixRange(StringRef(params.prefix));
                selected = cleared.intersects(prefixed);
            } else {
                // Only single-key and ClearRange mutations are expected here.
                ASSERT(false);
            }

            if (!selected) {
                continue;
            }

            TraceEvent(format("Mutation_%llu_%d", vms.version, sub).c_str(), uid)
                .detail("Version", vms.version)
                .setMaxFieldLength(10000)
                .detail("M", m.toString());
            std::cout << vms.version << " " << m.toString() << "\n";
        }
    }
    TraceEvent("ProcessFileDone", uid).detail("File", file.fileName);
    return Void();
}
|
|
|
|
|
2019-11-26 13:00:13 +08:00
|
|
|
// Top-level decode routine: opens the backup container, lists its log files
// (files under "plogs/" are dropped), prints the backup description and the
// files selected by `params`, and -- unless params.list_only is set --
// processes the selected files one after another in sorted order.
ACTOR Future<Void> decode_logs(DecodeParams params) {
    state Reference<IBackupContainer> container =
        IBackupContainer::openContainer(params.container_url, params.proxy, {});
    state UID uid = deterministicRandom()->randomUniqueID();
    state BackupFileList listing = wait(container->dumpFileList());

    // remove partitioned logs
    listing.logs.erase(std::remove_if(listing.logs.begin(),
                                      listing.logs.end(),
                                      [](const LogFile& file) {
                                          const std::string partitionedDir("plogs/");
                                          return file.fileName.compare(0, partitionedDir.size(), partitionedDir) == 0;
                                      }),
                       listing.logs.end());
    std::sort(listing.logs.begin(), listing.logs.end());

    TraceEvent("Container", uid).detail("URL", params.container_url).detail("Logs", listing.logs.size());
    TraceEvent("DecodeParam", uid).setMaxFieldLength(100000).detail("Value", params.toString());

    BackupDescription desc = wait(container->describeBackup());
    std::cout << "\n" << desc.toString() << "\n";

    state std::vector<LogFile> logs = getRelevantLogFiles(listing.logs, params);
    printLogFiles("Relevant files are: ", logs);

    if (params.list_only) {
        return Void();
    }

    // Files are decoded strictly one at a time.
    state int idx = 0;
    for (; idx < logs.size(); idx++) {
        TraceEvent("ProcessFile").detail("Name", logs[idx].fileName).detail("I", idx);
        wait(process_file(container, logs[idx], uid, params));
    }
    TraceEvent("DecodeDone", uid).log();
    return Void();
}
|
|
|
|
|
|
|
|
} // namespace file_converter
|
|
|
|
|
|
|
|
int main(int argc, char** argv) {
|
|
|
|
try {
|
2022-04-06 07:16:59 +08:00
|
|
|
std::unique_ptr<CSimpleOpt> args(
|
|
|
|
new CSimpleOpt(argc, argv, file_converter::gConverterOptions, SO_O_EXACT | SO_O_HYPHEN_TO_UNDERSCORE));
|
2019-11-26 13:00:13 +08:00
|
|
|
file_converter::DecodeParams param;
|
2022-04-06 07:16:59 +08:00
|
|
|
int status = file_converter::parseDecodeCommandLine(¶m, args.get());
|
2019-11-26 13:00:13 +08:00
|
|
|
std::cout << "Params: " << param.toString() << "\n";
|
|
|
|
if (status != FDB_EXIT_SUCCESS) {
|
|
|
|
file_converter::printDecodeUsage();
|
|
|
|
return status;
|
|
|
|
}
|
|
|
|
|
|
|
|
if (param.log_enabled) {
|
|
|
|
if (param.log_dir.empty()) {
|
|
|
|
setNetworkOption(FDBNetworkOptions::TRACE_ENABLE);
|
|
|
|
} else {
|
|
|
|
setNetworkOption(FDBNetworkOptions::TRACE_ENABLE, StringRef(param.log_dir));
|
|
|
|
}
|
|
|
|
if (!param.trace_format.empty()) {
|
|
|
|
setNetworkOption(FDBNetworkOptions::TRACE_FORMAT, StringRef(param.trace_format));
|
2021-08-01 06:30:30 +08:00
|
|
|
} else {
|
|
|
|
setNetworkOption(FDBNetworkOptions::TRACE_FORMAT, "json"_sr);
|
2019-11-26 13:00:13 +08:00
|
|
|
}
|
|
|
|
if (!param.trace_log_group.empty()) {
|
|
|
|
setNetworkOption(FDBNetworkOptions::TRACE_LOG_GROUP, StringRef(param.trace_log_group));
|
|
|
|
}
|
|
|
|
}
|
2021-04-03 09:22:06 +08:00
|
|
|
|
2021-04-06 02:47:23 +08:00
|
|
|
if (!param.tlsConfig.setupTLS()) {
|
2021-07-27 10:55:10 +08:00
|
|
|
TraceEvent(SevError, "TLSError").log();
|
2021-04-03 09:22:06 +08:00
|
|
|
throw tls_error();
|
|
|
|
}
|
2019-11-26 13:00:13 +08:00
|
|
|
|
|
|
|
platformInit();
|
|
|
|
Error::init();
|
|
|
|
|
|
|
|
StringRef url(param.container_url);
|
2021-07-17 15:11:40 +08:00
|
|
|
setupNetwork(0, UseMetrics::True);
|
2019-11-26 13:00:13 +08:00
|
|
|
|
2022-03-24 02:23:10 +08:00
|
|
|
// Must be called after setupNetwork() to be effective
|
|
|
|
param.updateKnobs();
|
|
|
|
|
2019-11-26 13:00:13 +08:00
|
|
|
TraceEvent::setNetworkThread();
|
2021-08-01 03:24:59 +08:00
|
|
|
openTraceFile(NetworkAddress(), 10 << 20, 500 << 20, param.log_dir, "decode", param.trace_log_group);
|
2021-04-06 02:47:23 +08:00
|
|
|
param.tlsConfig.setupBlobCredentials();
|
2019-11-26 13:00:13 +08:00
|
|
|
|
|
|
|
auto f = stopAfter(decode_logs(param));
|
|
|
|
|
|
|
|
runNetwork();
|
2021-08-01 02:28:13 +08:00
|
|
|
|
|
|
|
flushTraceFileVoid();
|
|
|
|
fflush(stdout);
|
|
|
|
closeTraceFile();
|
|
|
|
|
2019-11-26 13:00:13 +08:00
|
|
|
return status;
|
|
|
|
} catch (Error& e) {
|
2021-04-03 01:18:26 +08:00
|
|
|
std::cerr << "ERROR: " << e.what() << "\n";
|
2019-11-26 13:00:13 +08:00
|
|
|
return FDB_EXIT_ERROR;
|
|
|
|
} catch (std::exception& e) {
|
|
|
|
TraceEvent(SevError, "MainError").error(unknown_error()).detail("RootException", e.what());
|
|
|
|
return FDB_EXIT_MAIN_EXCEPTION;
|
|
|
|
}
|
2020-02-04 02:42:05 +08:00
|
|
|
}
|