Code refactoring

The BackupWorker currently produces files that are not organized in blocks; this should be fixed.
This commit is contained in:
Jingyu Zhou 2019-11-15 20:30:43 -08:00
parent 84a49cf389
commit c1748c0460
1 changed files with 77 additions and 67 deletions

View File

@ -35,23 +35,23 @@
namespace file_converter {
// Prints the command-line usage text (one entry per supported option) to standard output.
// NOTE(review): this view contains both a printf-based and a std::cout-based copy of the
// same help text; they look like the before/after sides of a change - confirm that only
// one of them is present in the real file, otherwise the help is printed twice.
// NOTE(review): the --logdir description spells "Specifies" as "Specifes" in both copies.
void printConvertUsage() {
printf("\n");
printf(" -r, --container Container URL.\n");
printf(" -b, --begin BEGIN\n"
" Begin version.\n");
printf(" -e, --end END End version.\n");
printf(" --log Enables trace file logging for the CLI session.\n"
" --logdir PATH Specifes the output directory for trace files. If\n"
" unspecified, defaults to the current directory. Has\n"
" no effect unless --log is specified.\n");
printf(" --loggroup LOG_GROUP\n"
" Sets the LogGroup field with the specified value for all\n"
" events in the trace output (defaults to `default').\n");
printf(" --trace_format FORMAT\n"
" Select the format of the trace files. xml (the default) and json are supported.\n"
" Has no effect unless --log is specified.\n");
printf(" -h, --help Display this help and exit.\n");
printf("\n");
// Same help text again, emitted as a single chained stream insertion.
std::cout << "\n"
<< " -r, --container Container URL.\n"
<< " -b, --begin BEGIN\n"
<< " Begin version.\n"
<< " -e, --end END End version.\n"
<< " --log Enables trace file logging for the CLI session.\n"
<< " --logdir PATH Specifes the output directory for trace files. If\n"
<< " unspecified, defaults to the current directory. Has\n"
<< " no effect unless --log is specified.\n"
<< " --loggroup LOG_GROUP\n"
<< " Sets the LogGroup field with the specified value for all\n"
<< " events in the trace output (defaults to `default').\n"
<< " --trace_format FORMAT\n"
<< " Select the format of the trace files. xml (the default) and json are supported.\n"
<< " Has no effect unless --log is specified.\n"
<< " -h, --help Display this help and exit.\n"
<< "\n";
return;
}
@ -120,7 +120,7 @@ struct MutationFilesReadProgress : public ReferenceCounted<MutationFilesReadProg
: files(logs), beginVersion(begin), endVersion(end) {}
struct FileProgress : public ReferenceCounted<FileProgress> {
FileProgress(int idx, Reference<IAsyncFile> f) : index(idx), fd(f), offset(0), eof(false) {}
FileProgress(Reference<IAsyncFile> f) : fd(f), offset(0), eof(false) {}
bool operator<(const FileProgress& rhs) const {
if (rhs.mutations.empty()) return true;
@ -129,12 +129,59 @@ struct MutationFilesReadProgress : public ReferenceCounted<MutationFilesReadProg
}
bool empty() { return eof && mutations.empty(); }
const int index; // index in "files" vector
// Parses one block of a log file and buffers the mutations found in it.
//
//   buf        - buffer holding the block; its arena is passed along with every buffered mutation.
//   len        - number of bytes of "buf" to parse; "offset" advances by this amount on success.
//   minVersion - mutations whose version is below this value are decoded but not buffered.
//
// Returns true if at least one mutation was appended to "mutations".
// If decoding fails, a "CorruptLogFileBlock" warning is traced and the error is rethrown.
bool decodeBlock(Standalone<StringRef> buf, int len, Version minVersion) {
	StringRef block(buf.begin(), len);
	StringRefReader reader(block, restore_corrupted_data());

	try {
		int decoded = 0;
		int kept = 0;

		// The block ends when the reader hits eof or the first key len byte is 0xFF.
		while (!reader.eof() && *reader.rptr != 0xFF) {
			// Each message was written by saveMutationsToFile() as:
			// version, sub-version, payload size, payload bytes.
			Version msgVersion = reader.consume<Version>();
			uint32_t sub = reader.consume<uint32_t>();
			int msgSize = reader.consume<int>();
			const uint8_t* payload = reader.consume(msgSize);

			BinaryReader rd(payload, msgSize, AssumeVersion(currentProtocolVersion));
			MutationRef m;
			rd >> m;
			++decoded;

			// Mutations older than minVersion are counted but not kept.
			if (msgVersion < minVersion) continue;

			mutations.emplace_back(LogMessageVersion(msgVersion, sub), m, buf.arena());
			++kept;
			// std::cout << msgVersion << ":" << sub << " m = " << m.toString() << "\n";
		}

		// The whole block has been consumed; the next read starts right after it.
		offset += len;

		TraceEvent("Decoded")
		    .detail("Name", fd->getFilename())
		    .detail("Count", decoded)
		    .detail("Insert", kept)
		    .detail("Total", mutations.size())
		    .detail("NewOffset", offset);

		return kept > 0;
	} catch (Error& e) {
		// "offset" has not been advanced here, so it still refers to the start of this block.
		TraceEvent(SevWarn, "CorruptLogFileBlock")
		    .error(e)
		    .detail("Filename", fd->getFilename())
		    .detail("BlockOffset", offset)
		    .detail("BlockLen", len)
		    .detail("ErrorRelativeOffset", reader.rptr - buf.begin())
		    .detail("ErrorAbsoluteOffset", reader.rptr - buf.begin() + offset);
		throw;
	}
}
Reference<IAsyncFile> fd;
int64_t offset; // offset of the file to be read
std::vector<VersionedData> mutations; // Buffered mutations read so far
bool eof; // If EOF is seen so far or endVersion is encountered. If true, the file can't be read further.
std::vector<VersionedData> mutations; // Buffered mutations read so far
};
bool hasMutations() {
@ -202,15 +249,10 @@ struct MutationFilesReadProgress : public ReferenceCounted<MutationFilesReadProg
}
wait(waitForAll(asyncFiles)); // open all files
state std::vector<Reference<IAsyncFile>> results;
for (const auto& file : asyncFiles) {
results.push_back(file.get());
}
// Attempt decode the first few blocks of log files until beginVersion is consumed
std::vector<Future<Void>> fileDecodes;
for (int idx = 0; idx < results.size(); idx++) {
Reference<FileProgress> fp(new FileProgress(idx, results[idx]));
for (auto& asyncfile : asyncFiles) {
Reference<FileProgress> fp(new FileProgress(asyncfile.get()));
progress->fileProgress.push_back(fp);
fileDecodes.push_back(decodeToVersion(fp, progress->beginVersion));
}
@ -222,17 +264,17 @@ struct MutationFilesReadProgress : public ReferenceCounted<MutationFilesReadProg
return Void();
}
// Decode the file until EOF or an mutation >= version
ACTOR static Future<Void> decodeToVersion(Reference<FileProgress> fileProgress, Version version) {
// Decodes the file until EOF or a mutation >= minVersion and saves these mutations.
ACTOR static Future<Void> decodeToVersion(Reference<FileProgress> fileProgress, Version minVersion) {
if (fileProgress->empty()) return Void();
if (!fileProgress->mutations.empty() && fileProgress->mutations.back().version.version >= version)
if (!fileProgress->mutations.empty() && fileProgress->mutations.back().version.version >= minVersion)
return Void();
state int len = (1 << 20);
try {
while (1) {
// Read block by block until we see the version
loop {
// Read block by block until we see the minVersion
state Standalone<StringRef> buf = makeString(len);
state int rLen = wait(fileProgress->fd->read(mutateString(buf), len, fileProgress->offset));
TraceEvent("ReadFile")
@ -243,47 +285,15 @@ struct MutationFilesReadProgress : public ReferenceCounted<MutationFilesReadProg
fileProgress->eof = true;
return Void();
}
StringRef block(buf.begin(), rLen);
state StringRefReader reader(block, restore_corrupted_data());
state bool found = false;
int count = 0, inserted = 0;
while (1) {
// If eof reached or first key len bytes is 0xFF then end of block was reached.
if (reader.eof() || *reader.rptr == 0xFF) break;
// Deserialize messages written in saveMutationsToFile().
Version msgVersion = reader.consume<Version>();
uint32_t sub = reader.consume<uint32_t>();
int msgSize = reader.consume<int>();
const uint8_t* message = reader.consume(msgSize);
BinaryReader rd(message, msgSize, AssumeVersion(currentProtocolVersion));
MutationRef m;
rd >> m;
count++;
if (msgVersion >= version) {
fileProgress->mutations.emplace_back(LogMessageVersion(msgVersion, sub), m, buf.arena());
found = true;
inserted++;
// std::cout << msgVersion << ":" << sub << " m = " << m.toString() << "\n";
}
}
std::cout << "Decoded " << count << " mutations in " << fileProgress->fd->getFilename() << ", inserted "
<< inserted << ", total " << fileProgress->mutations.size() << "\n";
if (found) break;
if (fileProgress->decodeBlock(buf, rLen, minVersion)) break;
}
fileProgress->offset += rLen;
return Void();
} catch (Error& e) {
TraceEvent(SevWarn, "FileConvertCorruptLogFileBlock")
.error(e)
.detail("Filename", fileProgress->fd->getFilename())
.detail("BlockOffset", fileProgress->offset)
.detail("BlockLen", len)
.detail("ErrorRelativeOffset", reader.rptr - buf.begin())
.detail("ErrorAbsoluteOffset", reader.rptr - buf.begin() + fileProgress->offset);
.detail("BlockLen", len);
throw;
}
}