Merge pull request #3036 from jzhou77/backup-cmd
Several bug fixes for new backups
This commit is contained in:
commit
3f510d0653
|
@ -373,17 +373,6 @@ struct LogFileWriter {
|
|||
return wr.toValue();
|
||||
}
|
||||
|
||||
// Return a block of contiguous padding bytes, growing if needed.
|
||||
static Value makePadding(int size) {
|
||||
static Value pad;
|
||||
if (pad.size() < size) {
|
||||
pad = makeString(size);
|
||||
memset(mutateString(pad), '\xff', pad.size());
|
||||
}
|
||||
|
||||
return pad.substr(0, size);
|
||||
}
|
||||
|
||||
// Start a new block if needed, then write the key and value
|
||||
ACTOR static Future<Void> writeKV_impl(LogFileWriter* self, Key k, Value v) {
|
||||
// If key and value do not fit in this block, end it and start a new one
|
||||
|
@ -392,7 +381,7 @@ struct LogFileWriter {
|
|||
// Write padding if needed
|
||||
int bytesLeft = self->blockEnd - self->file->size();
|
||||
if (bytesLeft > 0) {
|
||||
state Value paddingFFs = makePadding(bytesLeft);
|
||||
state Value paddingFFs = fileBackup::makePadding(bytesLeft);
|
||||
wait(self->file->append(paddingFFs.begin(), bytesLeft));
|
||||
}
|
||||
|
||||
|
|
|
@ -278,7 +278,7 @@ public:
|
|||
// parallel restore
|
||||
Future<Void> parallelRestoreFinish(Database cx, UID randomUID);
|
||||
Future<Void> submitParallelRestore(Database cx, Key backupTag, Standalone<VectorRef<KeyRangeRef>> backupRanges,
|
||||
KeyRef bcUrl, Version targetVersion, bool lockDB, UID randomUID);
|
||||
Key bcUrl, Version targetVersion, bool lockDB, UID randomUID);
|
||||
Future<Void> atomicParallelRestore(Database cx, Key tagName, Standalone<VectorRef<KeyRangeRef>> ranges,
|
||||
Key addPrefix, Key removePrefix);
|
||||
|
||||
|
@ -933,6 +933,9 @@ struct StringRefReader {
|
|||
namespace fileBackup {
|
||||
ACTOR Future<Standalone<VectorRef<KeyValueRef>>> decodeRangeFileBlock(Reference<IAsyncFile> file, int64_t offset,
|
||||
int len);
|
||||
|
||||
// Return a block of contiguous padding bytes "\0xff" for backup files, growing if needed.
|
||||
Value makePadding(int size);
|
||||
}
|
||||
|
||||
#include "flow/unactorcompiler.h"
|
||||
|
|
|
@ -1373,7 +1373,7 @@ public:
|
|||
wait(bc->readKeyspaceSnapshot(snapshot.get()));
|
||||
restorable.ranges = std::move(results.first);
|
||||
restorable.keyRanges = std::move(results.second);
|
||||
if (false && g_network->isSimulated()) { // TODO: Reenable sanity check
|
||||
if (g_network->isSimulated()) {
|
||||
// Sanity check key ranges
|
||||
state std::map<std::string, KeyRange>::iterator rit;
|
||||
for (rit = restorable.keyRanges.begin(); rit != restorable.keyRanges.end(); rit++) {
|
||||
|
@ -2097,6 +2097,8 @@ ACTOR Future<Optional<int64_t>> timeKeeperEpochsFromVersion(Version v, Reference
|
|||
return found.first + (v - found.second) / CLIENT_KNOBS->CORE_VERSIONSPERSECOND;
|
||||
}
|
||||
|
||||
namespace backup_test {
|
||||
|
||||
int chooseFileSize(std::vector<int> &sizes) {
|
||||
int size = 1000;
|
||||
if(!sizes.empty()) {
|
||||
|
@ -2134,7 +2136,30 @@ Version nextVersion(Version v) {
|
|||
return v + increment;
|
||||
}
|
||||
|
||||
ACTOR Future<Void> testBackupContainer(std::string url) {
|
||||
// Write a snapshot file with only begin & end key
|
||||
ACTOR static Future<Void> testWriteSnapshotFile(Reference<IBackupFile> file, Key begin, Key end, uint32_t blockSize) {
|
||||
ASSERT(blockSize > 3 * sizeof(uint32_t) + begin.size() + end.size());
|
||||
|
||||
uint32_t fileVersion = BACKUP_AGENT_SNAPSHOT_FILE_VERSION;
|
||||
// write Header
|
||||
wait(file->append((uint8_t*)&fileVersion, sizeof(fileVersion)));
|
||||
|
||||
// write begin key length and key
|
||||
wait(file->appendStringRefWithLen(begin));
|
||||
|
||||
// write end key length and key
|
||||
wait(file->appendStringRefWithLen(end));
|
||||
|
||||
int bytesLeft = blockSize - file->size();
|
||||
if (bytesLeft > 0) {
|
||||
Value paddings = fileBackup::makePadding(bytesLeft);
|
||||
wait(file->append(paddings.begin(), bytesLeft));
|
||||
}
|
||||
wait(file->finish());
|
||||
return Void();
|
||||
}
|
||||
|
||||
ACTOR static Future<Void> testBackupContainer(std::string url) {
|
||||
printf("BackupContainerTest URL %s\n", url.c_str());
|
||||
|
||||
state Reference<IBackupContainer> c = IBackupContainer::openContainer(url);
|
||||
|
@ -2163,6 +2188,9 @@ ACTOR Future<Void> testBackupContainer(std::string url) {
|
|||
loop {
|
||||
state Version logStart = v;
|
||||
state int kvfiles = deterministicRandom()->randomInt(0, 3);
|
||||
state Key begin = LiteralStringRef("");
|
||||
state Key end = LiteralStringRef("");
|
||||
state int blockSize = 3 * sizeof(uint32_t) + begin.size() + end.size() + 8;
|
||||
|
||||
while(kvfiles > 0) {
|
||||
if(snapshots.empty()) {
|
||||
|
@ -2173,15 +2201,17 @@ ACTOR Future<Void> testBackupContainer(std::string url) {
|
|||
v = nextVersion(v);
|
||||
}
|
||||
}
|
||||
Reference<IBackupFile> range = wait(c->writeRangeFile(snapshots.rbegin()->first, 0, v, 10));
|
||||
Reference<IBackupFile> range = wait(c->writeRangeFile(snapshots.rbegin()->first, 0, v, blockSize));
|
||||
++nRangeFiles;
|
||||
v = nextVersion(v);
|
||||
snapshots.rbegin()->second.push_back(range->getFileName());
|
||||
snapshotBeginEndKeys.rbegin()->second.emplace_back(LiteralStringRef(""), LiteralStringRef(""));
|
||||
snapshotBeginEndKeys.rbegin()->second.emplace_back(begin, end);
|
||||
|
||||
int size = chooseFileSize(fileSizes);
|
||||
snapshotSizes.rbegin()->second += size;
|
||||
writes.push_back(writeAndVerifyFile(c, range, size));
|
||||
// Write in actual range file format, instead of random data.
|
||||
// writes.push_back(writeAndVerifyFile(c, range, size));
|
||||
wait(testWriteSnapshotFile(range, begin, end, blockSize));
|
||||
|
||||
if(deterministicRandom()->random01() < .2) {
|
||||
writes.push_back(c->writeKeyspaceSnapshotFile(
|
||||
|
@ -2377,4 +2407,6 @@ TEST_CASE("/backup/continuous") {
|
|||
ASSERT(BackupContainerFileSystem::getPartitionedLogsContinuousEndVersion(files, 250) == 399);
|
||||
|
||||
return Void();
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace backup_test
|
||||
|
|
|
@ -68,6 +68,9 @@ static const uint32_t BACKUP_AGENT_MLOG_VERSION = 2001;
|
|||
// Mutation log version written by BackupWorker
|
||||
static const uint32_t PARTITIONED_MLOG_VERSION = 4110;
|
||||
|
||||
// Snapshot file version written by FileBackupAgent
|
||||
static const uint32_t BACKUP_AGENT_SNAPSHOT_FILE_VERSION = 1001;
|
||||
|
||||
struct LogFile {
|
||||
Version beginVersion;
|
||||
Version endVersion;
|
||||
|
@ -108,12 +111,6 @@ struct RangeFile {
|
|||
std::string fileName;
|
||||
int64_t fileSize;
|
||||
|
||||
RangeFile() {}
|
||||
RangeFile(Version v, uint32_t bSize, std::string name, int64_t size)
|
||||
: version(v), blockSize(bSize), fileName(name), fileSize(size) {}
|
||||
RangeFile(const RangeFile& f)
|
||||
: version(f.version), blockSize(f.blockSize), fileName(f.fileName), fileSize(f.fileSize) {}
|
||||
|
||||
// Order by version, break ties with name
|
||||
bool operator< (const RangeFile &rhs) const {
|
||||
return version == rhs.version ? fileName < rhs.fileName : version < rhs.version;
|
||||
|
|
|
@ -461,7 +461,8 @@ namespace fileBackup {
|
|||
// then the space after the final key to the next 1MB boundary would
|
||||
// just be padding anyway.
|
||||
struct RangeFileWriter {
|
||||
RangeFileWriter(Reference<IBackupFile> file = Reference<IBackupFile>(), int blockSize = 0) : file(file), blockSize(blockSize), blockEnd(0), fileVersion(1001) {}
|
||||
RangeFileWriter(Reference<IBackupFile> file = Reference<IBackupFile>(), int blockSize = 0)
|
||||
: file(file), blockSize(blockSize), blockEnd(0), fileVersion(BACKUP_AGENT_SNAPSHOT_FILE_VERSION) {}
|
||||
|
||||
// Handles the first block and internal blocks. Ends current block if needed.
|
||||
// The final flag is used in simulation to pad the file's final block to a whole block size
|
||||
|
@ -557,8 +558,8 @@ namespace fileBackup {
|
|||
state StringRefReader reader(buf, restore_corrupted_data());
|
||||
|
||||
try {
|
||||
// Read header, currently only decoding version 1001
|
||||
if(reader.consume<int32_t>() != 1001)
|
||||
// Read header, currently only decoding BACKUP_AGENT_SNAPSHOT_FILE_VERSION
|
||||
if(reader.consume<int32_t>() != BACKUP_AGENT_SNAPSHOT_FILE_VERSION)
|
||||
throw restore_unsupported_file_version();
|
||||
|
||||
// Read begin key, if this fails then block was invalid.
|
||||
|
@ -2406,6 +2407,7 @@ namespace fileBackup {
|
|||
state bool backupWorkerEnabled = dbConfig.backupWorkerEnabled;
|
||||
if (!backupWorkerEnabled) {
|
||||
wait(success(changeConfig(cx, "backup_worker_enabled:=1", true)));
|
||||
backupWorkerEnabled = true;
|
||||
}
|
||||
|
||||
// Set the "backupStartedKey" and wait for all backup worker started
|
||||
|
@ -3626,7 +3628,7 @@ public:
|
|||
}
|
||||
|
||||
ACTOR static Future<Void> submitParallelRestore(Database cx, Key backupTag,
|
||||
Standalone<VectorRef<KeyRangeRef>> backupRanges, KeyRef bcUrl,
|
||||
Standalone<VectorRef<KeyRangeRef>> backupRanges, Key bcUrl,
|
||||
Version targetVersion, bool lockDB, UID randomUID) {
|
||||
// Sanity check backup is valid
|
||||
state Reference<IBackupContainer> bc = IBackupContainer::openContainer(bcUrl.toString());
|
||||
|
@ -4629,7 +4631,7 @@ Future<Void> FileBackupAgent::parallelRestoreFinish(Database cx, UID randomUID)
|
|||
}
|
||||
|
||||
Future<Void> FileBackupAgent::submitParallelRestore(Database cx, Key backupTag,
|
||||
Standalone<VectorRef<KeyRangeRef>> backupRanges, KeyRef bcUrl,
|
||||
Standalone<VectorRef<KeyRangeRef>> backupRanges, Key bcUrl,
|
||||
Version targetVersion, bool lockDB, UID randomUID) {
|
||||
return FileBackupAgentImpl::submitParallelRestore(cx, backupTag, backupRanges, bcUrl, targetVersion, lockDB,
|
||||
randomUID);
|
||||
|
|
|
@ -571,17 +571,6 @@ ACTOR Future<Void> saveProgress(BackupData* self, Version backupVersion) {
|
|||
}
|
||||
}
|
||||
|
||||
// Return a block of contiguous padding bytes, growing if needed.
|
||||
static Value makePadding(int size) {
|
||||
static Value pad;
|
||||
if (pad.size() < size) {
|
||||
pad = makeString(size);
|
||||
memset(mutateString(pad), '\xff', pad.size());
|
||||
}
|
||||
|
||||
return pad.substr(0, size);
|
||||
}
|
||||
|
||||
// Write a mutation to a log file. Note the mutation can be different from
|
||||
// message.message for clear mutations.
|
||||
ACTOR Future<Void> addMutation(Reference<IBackupFile> logFile, VersionedMessage message, StringRef mutation,
|
||||
|
@ -602,7 +591,7 @@ ACTOR Future<Void> addMutation(Reference<IBackupFile> logFile, VersionedMessage
|
|||
// Write padding if needed
|
||||
const int bytesLeft = *blockEnd - logFile->size();
|
||||
if (bytesLeft > 0) {
|
||||
state Value paddingFFs = makePadding(bytesLeft);
|
||||
state Value paddingFFs = fileBackup::makePadding(bytesLeft);
|
||||
wait(logFile->append(paddingFFs.begin(), bytesLeft));
|
||||
}
|
||||
|
||||
|
|
|
@ -817,7 +817,7 @@ ACTOR static Future<Void> _parseRangeFileToMutationsOnLoader(
|
|||
|
||||
// The set of key value version is rangeFile.version. the key-value set in the same range file has the same version
|
||||
Reference<IAsyncFile> inFile = wait(bc->readFile(asset.filename));
|
||||
state VectorRef<KeyValueRef> blockData;
|
||||
state Standalone<VectorRef<KeyValueRef>> blockData;
|
||||
try {
|
||||
Standalone<VectorRef<KeyValueRef>> kvs =
|
||||
wait(fileBackup::decodeRangeFileBlock(inFile, asset.offset, asset.len));
|
||||
|
|
|
@ -717,7 +717,7 @@ ACTOR static Future<Version> collectBackupFiles(Reference<IBackupContainer> bc,
|
|||
ACTOR static Future<Void> insertRangeVersion(KeyRangeMap<Version>* pRangeVersions, RestoreFileFR* file,
|
||||
Reference<IBackupContainer> bc) {
|
||||
TraceEvent("FastRestoreMasterDecodeRangeVersion").detail("File", file->toString());
|
||||
RangeFile rangeFile(file->version, file->blockSize, file->fileName, file->fileSize);
|
||||
RangeFile rangeFile = { file->version, (uint32_t)file->blockSize, file->fileName, file->fileSize };
|
||||
|
||||
// First and last key are the range for this file: endKey is exclusive
|
||||
KeyRange fileRange = wait(bc->getSnapshotFileKeyRange(rangeFile));
|
||||
|
|
Loading…
Reference in New Issue