Use block size encoded in file names

The log files have their block size encoded in their names, and the converter
should use those sizes when reading them.
This commit is contained in:
Jingyu Zhou 2019-11-21 15:21:01 -08:00
parent 1123157ae0
commit 114e153bc8
1 changed file with 44 additions and 32 deletions

View File

@ -138,7 +138,7 @@ struct MutationFilesReadProgress : public ReferenceCounted<MutationFilesReadProg
: files(logs), beginVersion(begin), endVersion(end) {}
struct FileProgress : public ReferenceCounted<FileProgress> {
FileProgress(Reference<IAsyncFile> f) : fd(f), offset(0), eof(false) {}
FileProgress(Reference<IAsyncFile> f, int index) : fd(f), idx(index), offset(0), eof(false) {}
bool operator<(const FileProgress& rhs) const {
if (rhs.mutations.empty()) return true;
@ -157,15 +157,16 @@ struct MutationFilesReadProgress : public ReferenceCounted<MutationFilesReadProg
bool decodeBlock(const Standalone<StringRef>& buf, int len, Version minVersion, Version maxVersion) {
StringRef block(buf.begin(), len);
StringRefReader reader(block, restore_corrupted_data());
int count = 0, inserted = 0;
Version msgVersion = invalidVersion;
try {
int count = 0, inserted = 0;
while (1) {
// If eof reached or first key len bytes is 0xFF then end of block was reached.
if (reader.eof() || *reader.rptr == 0xFF) break;
// Deserialize messages written in saveMutationsToFile().
Version msgVersion = reader.consume<Version>();
msgVersion = reader.consume<Version>();
uint32_t sub = reader.consume<uint32_t>();
int msgSize = reader.consume<int>();
const uint8_t* message = reader.consume(msgSize);
@ -174,6 +175,8 @@ struct MutationFilesReadProgress : public ReferenceCounted<MutationFilesReadProg
MutationRef m;
rd >> m;
count++;
// std::cout << msgVersion << "." << sub << " m = " << m.toString() << " size=" << msgSize << " " <<
// fd->getFilename() << "\n";
if (msgVersion >= maxVersion) {
TraceEvent("FileDecodeEnd")
.detail("MaxV", maxVersion)
@ -186,8 +189,6 @@ struct MutationFilesReadProgress : public ReferenceCounted<MutationFilesReadProg
mutations.emplace_back(LogMessageVersion(msgVersion, sub), StringRef(message, msgSize),
buf.arena());
inserted++;
// std::cout << msgVersion << "." << sub << " m = " << m.toString() << " size=" << msgSize << "
// " << fd->getFilename() << "\n";
}
}
offset += len;
@ -196,7 +197,10 @@ struct MutationFilesReadProgress : public ReferenceCounted<MutationFilesReadProg
.detail("Name", fd->getFilename())
.detail("Count", count)
.detail("Insert", inserted)
.detail("BlockOffset", reader.rptr - buf.begin())
.detail("Total", mutations.size())
.detail("EOF", eof)
.detail("Version", msgVersion)
.detail("NewOffset", offset);
return inserted > 0;
} catch (Error& e) {
@ -212,6 +216,7 @@ struct MutationFilesReadProgress : public ReferenceCounted<MutationFilesReadProg
}
Reference<IAsyncFile> fd;
int idx; // index in the MutationFilesReadProgress::files vector
int64_t offset; // offset of the file to be read
bool eof; // If EOF is seen so far or endVersion is encountered. If true, the file can't be read further.
std::vector<VersionedData> mutations; // Buffered mutations read so far
@ -225,12 +230,14 @@ struct MutationFilesReadProgress : public ReferenceCounted<MutationFilesReadProg
}
// Writes a human-readable summary of every entry in fileProgress to std::cout.
// msg: text printed before the per-file lines.
void dumpProgress(std::string msg) {
// NOTE(review): as this text stands, msg is written by both of the next two statements. They look like the
// before/after forms of a single changed line in a diff view -- confirm which one is meant to remain.
std::cout << msg;
std::cout << msg << "\n ";
for (const auto fp : fileProgress) {
// Per file: its name and the number of mutations currently buffered for it.
std::cout << fp->fd->getFilename() << " " << fp->mutations.size() << " mutations";
if (fp->mutations.size() > 0) {
// Version of the first and of the last buffered mutation.
std::cout << ", range " << fp->mutations[0].version.toString() << " "
<< fp->mutations.back().version.toString() << "\n";
} else {
// Nothing buffered for this file: just terminate the line and leave a blank one.
std::cout << "\n\n";
}
}
}
@ -256,7 +263,7 @@ struct MutationFilesReadProgress : public ReferenceCounted<MutationFilesReadProg
fp->mutations.erase(fp->mutations.begin());
if (fp->mutations.empty()) {
// decode one more block
wait(decodeToVersion(fp, /*version=*/0, self->endVersion));
wait(decodeToVersion(fp, /*version=*/0, self->endVersion, self->getLogFile(fp->idx)));
}
if (fp->empty()) {
@ -273,6 +280,8 @@ struct MutationFilesReadProgress : public ReferenceCounted<MutationFilesReadProg
return data;
}
// Returns a reference to the element of `files` at position `index`.
// The index is used as-is; no bounds check is performed here.
LogFile& getLogFile(int index) {
	return files[index];
}
// Forwards to openLogFilesImpl, passing this object and `container`, and returns the resulting future.
Future<Void> openLogFiles(Reference<IBackupContainer> container) {
	return openLogFilesImpl(this, container);
}
// Opens log files in the progress and starts decoding until the beginVersion is seen.
@ -286,10 +295,11 @@ struct MutationFilesReadProgress : public ReferenceCounted<MutationFilesReadProg
// Attempt to decode the first few blocks of log files until beginVersion is consumed
std::vector<Future<Void>> fileDecodes;
for (auto& asyncfile : asyncFiles) {
Reference<FileProgress> fp(new FileProgress(asyncfile.get()));
for (int i = 0; i < asyncFiles.size(); i++) {
Reference<FileProgress> fp(new FileProgress(asyncFiles[i].get(), i));
progress->fileProgress.push_back(fp);
fileDecodes.push_back(decodeToVersion(fp, progress->beginVersion, progress->endVersion));
fileDecodes.push_back(
decodeToVersion(fp, progress->beginVersion, progress->endVersion, progress->getLogFile(i)));
}
wait(waitForAll(fileDecodes));
@ -301,35 +311,37 @@ struct MutationFilesReadProgress : public ReferenceCounted<MutationFilesReadProg
// Decodes the file until EOF or a mutation >= minVersion and saves these mutations.
// Skip mutations >= maxVersion.
ACTOR static Future<Void> decodeToVersion(Reference<FileProgress> fileProgress, Version minVersion,
Version maxVersion) {
if (fileProgress->empty()) return Void();
ACTOR static Future<Void> decodeToVersion(Reference<FileProgress> fp, Version minVersion, Version maxVersion,
LogFile file) {
if (fp->empty()) return Void();
if (!fileProgress->mutations.empty() && fileProgress->mutations.back().version.version >= minVersion)
return Void();
if (!fp->mutations.empty() && fp->mutations.back().version.version >= minVersion) return Void();
state int len = (1 << 20);
try {
// Read block by block until we see the minVersion
loop {
// Read block by block until we see the minVersion
state Standalone<StringRef> buf = makeString(len);
state int rLen = wait(fileProgress->fd->read(mutateString(buf), len, fileProgress->offset));
TraceEvent("ReadFile")
.detail("Name", fileProgress->fd->getFilename())
.detail("Len", rLen)
.detail("Offset", fileProgress->offset);
if (rLen == 0) {
fileProgress->eof = true;
state int64_t len = std::min<int64_t>(file.blockSize, file.fileSize - fp->offset);
if (len == 0) {
fp->eof = true;
return Void();
}
if (fileProgress->decodeBlock(buf, rLen, minVersion, maxVersion)) break;
state Standalone<StringRef> buf = makeString(len);
int rLen = wait(fp->fd->read(mutateString(buf), len, fp->offset));
if (len != rLen) throw restore_bad_read();
TraceEvent("ReadFile")
.detail("Name", fp->fd->getFilename())
.detail("Length", rLen)
.detail("Offset", fp->offset);
if (fp->decodeBlock(buf, rLen, minVersion, maxVersion)) break;
}
return Void();
} catch (Error& e) {
TraceEvent(SevWarn, "FileConvertCorruptLogFileBlock")
TraceEvent(SevWarn, "CorruptedLogFileBlock")
.error(e)
.detail("Filename", fileProgress->fd->getFilename())
.detail("BlockOffset", fileProgress->offset)
.detail("Filename", fp->fd->getFilename())
.detail("BlockOffset", fp->offset)
.detail("BlockLen", len);
throw;
}
@ -428,7 +440,7 @@ private:
const uint32_t fileVersion = 2001;
};
ACTOR Future<Void> test_container(ConvertParams params) {
ACTOR Future<Void> convert(ConvertParams params) {
state Reference<IBackupContainer> container = IBackupContainer::openContainer(params.container_url);
state BackupFileList listing = wait(container->dumpFileList());
std::sort(listing.logs.begin(), listing.logs.end());
@ -487,7 +499,7 @@ int parseCommandLine(ConvertParams* param, CSimpleOpt* args) {
break;
default:
fprintf(stderr, "ERROR: argument given for option `%s'\n", args->OptionText());
std::cerr << "ERROR: argument given for option: " << args->OptionText() << "\n";
return FDB_EXIT_ERROR;
break;
}
@ -579,7 +591,7 @@ int main(int argc, char** argv) {
TraceEvent::setNetworkThread();
openTraceFile(NetworkAddress(), 10 << 20, 10 << 20, param.log_dir, "convert", param.trace_log_group);
auto f = stopAfter(test_container(param));
auto f = stopAfter(convert(param));
runNetwork();
return status;