foundationdb/fdbclient/FileBackupAgent.actor.cpp

3225 lines
130 KiB
C++
Raw Normal View History

2017-05-26 04:48:44 +08:00
/*
* FileBackupAgent.actor.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "BackupAgent.h"
#include "BackupContainer.h"
#include "DatabaseContext.h"
#include "ManagementAPI.h"
#include "Status.h"
#include "KeyBackedTypes.h"
#include <ctime>
#include <climits>
#include "fdbrpc/IAsyncFile.h"
#include "flow/genericactors.actor.h"
#include "flow/Hash3.h"
#include <numeric>
#include <boost/algorithm/string/split.hpp>
#include <boost/algorithm/string/classification.hpp>
#include <algorithm>
// Key name passed to subspace.get() by the FileBackupAgent constructor to form its lastRestorable subspace.
const Key FileBackupAgent::keyLastRestorable = LiteralStringRef("last_restorable");
2017-05-26 04:48:44 +08:00
// Shorthand so the restore state enum can be named without the FileBackupAgent:: qualifier in this file.
using ERestoreState = FileBackupAgent::ERestoreState;
// Returns the display name for a restore state.
// Any value that is not one of the states listed below yields "Unknown".
StringRef FileBackupAgent::restoreStateText(ERestoreState id) {
	if(id == ERestoreState::UNITIALIZED)
		return LiteralStringRef("unitialized");
	if(id == ERestoreState::QUEUED)
		return LiteralStringRef("queued");
	if(id == ERestoreState::STARTING)
		return LiteralStringRef("starting");
	if(id == ERestoreState::RUNNING)
		return LiteralStringRef("running");
	if(id == ERestoreState::COMPLETED)
		return LiteralStringRef("completed");
	if(id == ERestoreState::ABORTED)
		return LiteralStringRef("aborted");

	return LiteralStringRef("Unknown");
}
// Tuple codec for ERestoreState: the state is written as the only element of a Tuple and
// read back from element 0 as an integer.
template<> Tuple Codec<ERestoreState>::pack(ERestoreState const &val) {
	Tuple packed;
	packed.append(val);
	return packed;
}
template<> ERestoreState Codec<ERestoreState>::unpack(Tuple const &val) {
	return static_cast<ERestoreState>(val.getInt(0));
}
// Reads the entries of the tag map (range read with a limit of 1e6) and returns one KeyBackedTag
// per tag name found, each bound to the map's prefix.
ACTOR Future<std::vector<KeyBackedTag>> TagUidMap::getAll_impl(TagUidMap *tagsMap, Reference<ReadYourWritesTransaction> tr) {
	// tagsMap's lifetime is not tied to this actor, so copy the prefix before the first wait.
	state Key prefix = tagsMap->prefix;

	TagMap::PairsType allPairs = wait(tagsMap->getRange(tr, std::string(), {}, 1e6));

	std::vector<KeyBackedTag> tags;
	for(auto &entry : allPairs) {
		tags.push_back(KeyBackedTag(entry.first, prefix));
	}
	return tags;
}
2017-05-26 04:48:44 +08:00
// Constructs a tag bound to the property for tagName in the TagUidMap stored under tagMapPrefix,
// and remembers both the name and the prefix.
KeyBackedTag::KeyBackedTag(std::string tagName, StringRef tagMapPrefix)
	: KeyBackedProperty<UidAndAbortedFlagT>(TagUidMap(tagMapPrefix).getProperty(tagName)),
	  tagName(tagName),
	  tagMapPrefix(tagMapPrefix)
{
}
2017-08-29 02:59:04 +08:00
class RestoreConfig : public KeyBackedConfig {
public:
RestoreConfig(UID uid = UID()) : KeyBackedConfig(fileRestorePrefixRange.begin, uid) {}
RestoreConfig(Reference<Task> task) : KeyBackedConfig(fileRestorePrefixRange.begin, task) {}
2017-05-26 04:48:44 +08:00
KeyBackedProperty<ERestoreState> stateEnum() {
return configSpace.pack(LiteralStringRef(__FUNCTION__));
}
Future<StringRef> stateText(Reference<ReadYourWritesTransaction> tr) {
return map(stateEnum().getD(tr), [](ERestoreState s) -> StringRef { return FileBackupAgent::restoreStateText(s); });
}
KeyBackedProperty<Key> addPrefix() {
return configSpace.pack(LiteralStringRef(__FUNCTION__));
}
KeyBackedProperty<Key> removePrefix() {
return configSpace.pack(LiteralStringRef(__FUNCTION__));
}
KeyBackedProperty<KeyRange> restoreRange() {
return configSpace.pack(LiteralStringRef(__FUNCTION__));
}
KeyBackedProperty<Key> batchFuture() {
return configSpace.pack(LiteralStringRef(__FUNCTION__));
}
KeyBackedProperty<Version> restoreVersion() {
return configSpace.pack(LiteralStringRef(__FUNCTION__));
}
KeyBackedProperty<Reference<IBackupContainer>> sourceContainer() {
2017-05-26 04:48:44 +08:00
return configSpace.pack(LiteralStringRef(__FUNCTION__));
}
// Get the source container as a bare URL, without creating a container instance
KeyBackedProperty<Value> sourceContainerURL() {
return configSpace.pack(LiteralStringRef("sourceContainer"));
}
2017-05-26 04:48:44 +08:00
// Total bytes written by all log and range restore tasks.
KeyBackedBinaryValue<int64_t> bytesWritten() {
return configSpace.pack(LiteralStringRef(__FUNCTION__));
}
// File blocks that have had tasks created for them by the Dispatch task
KeyBackedBinaryValue<int64_t> filesBlocksDispatched() {
return configSpace.pack(LiteralStringRef(__FUNCTION__));
}
// File blocks whose tasks have finished
KeyBackedBinaryValue<int64_t> fileBlocksFinished() {
return configSpace.pack(LiteralStringRef(__FUNCTION__));
}
// Total number of files in the fileMap
KeyBackedBinaryValue<int64_t> fileCount() {
return configSpace.pack(LiteralStringRef(__FUNCTION__));
}
// Total number of file blocks in the fileMap
KeyBackedBinaryValue<int64_t> fileBlockCount() {
return configSpace.pack(LiteralStringRef(__FUNCTION__));
}
// Describes a file to load blocks from during restore. Ordered by version and then fileName to enable
// incrementally advancing through the map, saving the version and path of the next starting point.
struct RestoreFile {
Version version;
std::string fileName;
bool isRange; // false for log file
int64_t blockSize;
int64_t fileSize;
Version endVersion; // not meaningful for range files
Tuple pack() const {
return Tuple()
.append(version)
.append(StringRef(fileName))
.append(isRange)
.append(fileSize)
.append(blockSize)
.append(endVersion);
}
static RestoreFile unpack(Tuple const &t) {
RestoreFile r;
int i = 0;
r.version = t.getInt(i++);
r.fileName = t.getString(i++).toString();
r.isRange = t.getInt(i++) != 0;
r.fileSize = t.getInt(i++);
r.blockSize = t.getInt(i++);
r.endVersion = t.getInt(i++);
return r;
}
};
typedef KeyBackedSet<RestoreFile> FileSetT;
FileSetT fileSet() {
2017-05-26 04:48:44 +08:00
return configSpace.pack(LiteralStringRef(__FUNCTION__));
}
Future<bool> isRunnable(Reference<ReadYourWritesTransaction> tr) {
return map(stateEnum().getD(tr), [](ERestoreState s) -> bool { return s != ERestoreState::ABORTED
&& s != ERestoreState::COMPLETED
&& s != ERestoreState::UNITIALIZED;
});
}
Future<Void> logError(Database cx, Error e, std::string const &details, void *taskInstance = nullptr) {
2017-05-26 04:48:44 +08:00
if(!uid.isValid()) {
TraceEvent(SevError, "FileRestoreErrorNoUID").error(e).detail("Description", details);
return Void();
}
TraceEvent t(SevWarn, "FileRestoreError");
t.error(e).detail("RestoreUID", uid).detail("Description", details).detail("TaskInstance", (uint64_t)taskInstance);
// These should not happen
if(e.code() == error_code_key_not_found)
t.backtrace();
std::string msg = format("ERROR: %s (%s)", details.c_str(), e.what());
2017-09-09 07:09:18 +08:00
return lastError().set(cx, {msg, (int64_t)now()});
2017-05-26 04:48:44 +08:00
}
Key mutationLogPrefix() {
return uidPrefixKey(applyLogKeys.begin, uid);
}
Key applyMutationsMapPrefix() {
return uidPrefixKey(applyMutationsKeyVersionMapRange.begin, uid);
}
ACTOR static Future<int64_t> getApplyVersionLag_impl(Reference<ReadYourWritesTransaction> tr, UID uid) {
// Both of these are snapshot reads
state Future<Optional<Value>> beginVal = tr->get(uidPrefixKey(applyMutationsBeginRange.begin, uid), true);
state Future<Optional<Value>> endVal = tr->get(uidPrefixKey(applyMutationsEndRange.begin, uid), true);
Void _ = wait(success(beginVal) && success(endVal));
if(!beginVal.get().present() || !endVal.get().present())
return 0;
Version beginVersion = BinaryReader::fromStringRef<Version>(beginVal.get().get(), Unversioned());
Version endVersion = BinaryReader::fromStringRef<Version>(endVal.get().get(), Unversioned());
return endVersion - beginVersion;
}
Future<int64_t> getApplyVersionLag(Reference<ReadYourWritesTransaction> tr) {
return getApplyVersionLag_impl(tr, uid);
}
void initApplyMutations(Reference<ReadYourWritesTransaction> tr, Key addPrefix, Key removePrefix) {
// Set these because they have to match the applyMutations values.
this->addPrefix().set(tr, addPrefix);
this->removePrefix().set(tr, removePrefix);
clearApplyMutationsKeys(tr);
// Initialize add/remove prefix, range version map count and set the map's start key to InvalidVersion
tr->set(uidPrefixKey(applyMutationsAddPrefixRange.begin, uid), addPrefix);
tr->set(uidPrefixKey(applyMutationsRemovePrefixRange.begin, uid), removePrefix);
int64_t startCount = 0;
tr->set(uidPrefixKey(applyMutationsKeyVersionCountRange.begin, uid), StringRef((uint8_t*)&startCount, 8));
Key mapStart = uidPrefixKey(applyMutationsKeyVersionMapRange.begin, uid);
tr->set(mapStart, BinaryWriter::toValue<Version>(invalidVersion, Unversioned()));
}
void clearApplyMutationsKeys(Reference<ReadYourWritesTransaction> tr) {
// Clear add/remove prefix keys
tr->clear(uidPrefixKey(applyMutationsAddPrefixRange.begin, uid));
tr->clear(uidPrefixKey(applyMutationsRemovePrefixRange.begin, uid));
// Clear range version map and count key
tr->clear(uidPrefixKey(applyMutationsKeyVersionCountRange.begin, uid));
Key mapStart = uidPrefixKey(applyMutationsKeyVersionMapRange.begin, uid);
tr->clear(KeyRangeRef(mapStart, strinc(mapStart)));
// Clear any loaded mutations that have not yet been applied
Key mutationPrefix = mutationLogPrefix();
tr->clear(KeyRangeRef(mutationPrefix, strinc(mutationPrefix)));
// Clear end and begin versions (intentionally in this order)
tr->clear(uidPrefixKey(applyMutationsEndRange.begin, uid));
tr->clear(uidPrefixKey(applyMutationsBeginRange.begin, uid));
}
void setApplyBeginVersion(Reference<ReadYourWritesTransaction> tr, Version ver) {
tr->set(uidPrefixKey(applyMutationsBeginRange.begin, uid), BinaryWriter::toValue(ver, Unversioned()));
}
void setApplyEndVersion(Reference<ReadYourWritesTransaction> tr, Version ver) {
tr->set(uidPrefixKey(applyMutationsEndRange.begin, uid), BinaryWriter::toValue(ver, Unversioned()));
}
Future<Version> getApplyEndVersion(Reference<ReadYourWritesTransaction> tr) {
return map(tr->get(uidPrefixKey(applyMutationsEndRange.begin, uid)), [=](Optional<Value> const &value) -> Version {
return value.present() ? BinaryReader::fromStringRef<Version>(value.get(), Unversioned()) : 0;
});
}
static Future<std::string> getProgress_impl(RestoreConfig const &restore, Reference<ReadYourWritesTransaction> const &tr);
Future<std::string> getProgress(Reference<ReadYourWritesTransaction> tr) {
return getProgress_impl(*this, tr);
}
static Future<std::string> getFullStatus_impl(RestoreConfig const &restore, Reference<ReadYourWritesTransaction> const &tr);
Future<std::string> getFullStatus(Reference<ReadYourWritesTransaction> tr) {
return getFullStatus_impl(*this, tr);
}
};
// Shorthand for the restore file descriptor nested in RestoreConfig.
using RestoreFile = RestoreConfig::RestoreFile;
2017-05-26 04:48:44 +08:00
// Reads the restore's counters, state, tag, apply version lag and last error in one transaction,
// traces them as a FileRestoreProgress event, and returns them formatted as a single summary string.
// Sets ACCESS_SYSTEM_KEYS and LOCK_AWARE on tr.
ACTOR Future<std::string> RestoreConfig::getProgress_impl(RestoreConfig restore, Reference<ReadYourWritesTransaction> tr) {
	tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
	tr->setOption(FDBTransactionOptions::LOCK_AWARE);

	// Start all of the reads before waiting on any of them.
	state Future<int64_t> fileCount = restore.fileCount().getD(tr);
	state Future<int64_t> fileBlockCount = restore.fileBlockCount().getD(tr);
	state Future<int64_t> fileBlocksDispatched = restore.filesBlocksDispatched().getD(tr);
	state Future<int64_t> fileBlocksFinished = restore.fileBlocksFinished().getD(tr);
	state Future<int64_t> bytesWritten = restore.bytesWritten().getD(tr);
	state Future<StringRef> status = restore.stateText(tr);
	state Future<Version> lag = restore.getApplyVersionLag(tr);
	state Future<std::string> tag = restore.tag().getD(tr);
	state Future<std::pair<std::string, int64_t>> lastError = restore.lastError().getD(tr);

	// restore might no longer be valid after the first wait so make sure it is not needed anymore.
	state UID uid = restore.getUid();
	Void _ = wait(success(fileCount) && success(fileBlockCount) && success(fileBlocksDispatched) && success(fileBlocksFinished) && success(bytesWritten) && success(status) && success(lag) && success(tag) && success(lastError));

	// A zero timestamp means no error has been recorded.
	std::string errstr = "None";
	if(lastError.get().second != 0)
		errstr = format("'%s' %llds ago.\n", lastError.get().first.c_str(), (int64_t)now() - lastError.get().second);

	TraceEvent("FileRestoreProgress")
		.detail("UID", uid)
		.detail("Tag", tag.get())
		.detail("State", status.get().toString())
		.detail("FileCount", fileCount.get())
		.detail("FileBlocksFinished", fileBlocksFinished.get())
		.detail("FileBlocksTotal", fileBlockCount.get())
		.detail("FileBlocksInProgress", fileBlocksDispatched.get() - fileBlocksFinished.get())
		.detail("BytesWritten", bytesWritten.get())
		.detail("ApplyLag", lag.get())
		.detail("TaskInstance", (uint64_t)this);

	return format("Tag: %s UID: %s State: %s Blocks: %lld/%lld BlocksInProgress: %lld Files: %lld BytesWritten: %lld ApplyVersionLag: %lld LastError: %s",
					tag.get().c_str(),
					uid.toString().c_str(),
					status.get().toString().c_str(),
					fileBlocksFinished.get(),
					fileBlockCount.get(),
					fileBlocksDispatched.get() - fileBlocksFinished.get(),
					fileCount.get(),
					bytesWritten.get(),
					lag.get(),
					errstr.c_str()
				);
}
// Returns the getProgress() summary followed by the restore's source URL, key range, add/remove
// prefixes and target version.  Sets ACCESS_SYSTEM_KEYS and LOCK_AWARE on tr.
ACTOR Future<std::string> RestoreConfig::getFullStatus_impl(RestoreConfig restore, Reference<ReadYourWritesTransaction> tr) {
	tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
	tr->setOption(FDBTransactionOptions::LOCK_AWARE);

	// Start all of the reads before waiting on any of them.
	state Future<KeyRange> range = restore.restoreRange().getD(tr);
	state Future<Key> addPrefix = restore.addPrefix().getD(tr);
	state Future<Key> removePrefix = restore.removePrefix().getD(tr);
	state Future<Key> url = restore.sourceContainerURL().getD(tr);
	state Future<Version> restoreVersion = restore.restoreVersion().getD(tr);
	state Future<std::string> progress = restore.getProgress(tr);

	// restore might no longer be valid after the first wait so make sure it is not needed anymore.
	state UID uid = restore.getUid();
	Void _ = wait(success(range) && success(addPrefix) && success(removePrefix) && success(url) && success(restoreVersion) && success(progress));

	return format("%s URL: %s Begin: '%s' End: '%s' AddPrefix: '%s' RemovePrefix: '%s' Version: %lld",
					progress.get().c_str(),
					url.get().toString().c_str(),
					printable(range.get().begin).c_str(),
					printable(range.get().end).c_str(),
					printable(addPrefix.get()).c_str(),
					printable(removePrefix.get()).c_str(),
					restoreVersion.get()
	);
}
// Roots the agent at fileBackupPrefixRange.begin and derives the config and lastRestorable
// subspaces and the task and future buckets from that subspace.
FileBackupAgent::FileBackupAgent()
	: subspace(Subspace(fileBackupPrefixRange.begin))
	// The other subspaces have logUID -> value
	, config(subspace.get(BackupAgentBase::keyConfig))
	, lastRestorable(subspace.get(FileBackupAgent::keyLastRestorable))
	, taskBucket(new TaskBucket(subspace.get(BackupAgentBase::keyTasks), true, false, true))
	, futureBucket(new FutureBucket(subspace.get(BackupAgentBase::keyFutures), true, true))
{
}
namespace fileBackup {
// Try to save and extend task repeatedly until it fails or f is ready (or throws)
// In case the task was started or saveAndExtend'd recently, firstSaveAndExtendTimestamp can be used to indicate
// when the first saveAndExtend should be done.
// Returns once f is ready.  Throws timed_out() if a saveAndExtend reports that the task should not keep going.
ACTOR static Future<Void> saveAndExtendIncrementally(Database cx, Reference<TaskBucket> taskBucket, Reference<Task> task, Future<Void> f, double firstSaveAndExtendTimestamp = 0) {
// delaySeconds is half of the taskBucket task timeout.
state double delaySeconds = 0.5 * taskBucket->getTimeoutSeconds();
// The first saveAndExtend is scheduled for the given timestamp; later ones are delaySeconds apart.
state Future<Void> timeout = delayUntil(firstSaveAndExtendTimestamp);
loop {
choose {
// f finished, nothing more to extend
when(Void _ = wait(f)) {
break;
}
// Time to save and extend the task again
when(Void _ = wait(timeout)) {
bool keepGoing = wait(taskBucket->saveAndExtend(cx, task));
if(!keepGoing)
throw timed_out();
// Re-arm the timer for the next extension
timeout = delay(delaySeconds);
}
}
}
return Void();
}
2017-05-26 04:48:44 +08:00
// Source of 0xFF padding bytes for backup files.  The largest padded area that could ever
// have to be written is the size of two 32 bit ints plus the largest key size and largest
// value size.  CLIENT_KNOBS may not be initialized yet when this is constructed, so a
// conservative constant (128 KiB) is used instead.
std::string paddingFFs(128 * 1024, static_cast<char>(0xFF));
// File Format handlers.
// Both Range and Log formats are designed to be readable starting at any 1MB boundary
// so they can be read in parallel.
//
// Writer instances must be kept alive while any member actors are in progress.
//
// RangeFileWriter must be used as follows:
// 1 - writeKey(key) the queried key range begin
// 2 - writeKV(k, v) each kv pair to restore
// 3 - writeKey(key) the queried key range end
//
// RangeFileWriter will insert the required padding, header, and extra
// end/begin keys around the 1MB boundaries as needed.
//
// Example:
// The range a-z is queried and returns c-j, which covers 3 blocks.
// The client code writes keys in this sequence:
// a c d e f g h i j z
//
// H = header P = padding a...z = keys v = value | = block boundary
//
// Encoded file: H a cv dv ev P | H e ev fv gv hv P | H h hv iv jv z
// Decoded in blocks yields:
// Block 1: range [a, e) with kv pairs cv, dv
// Block 2: range [e, h) with kv pairs ev, fv, gv
// Block 3: range [h, z) with kv pairs hv, iv, jv
//
// NOTE: All blocks except for the final block will have one last
// value which will not be used. This isn't actually a waste since
// if the next KV pair wouldn't fit within the block after the value
// then the space after the final key to the next 1MB boundary would
// just be padding anyway.
// Writes a range file as a sequence of blockSize-byte blocks, each starting with the file version header.
// Callers write the range begin key, then each kv pair, then the range end key; the writer inserts
// padding, headers and the repeated key/value at block boundaries.
// Instances must be kept alive while any member actors are in progress.
struct RangeFileWriter {
	RangeFileWriter(Reference<IBackupFile> file = Reference<IBackupFile>(), int blockSize = 0) : file(file), blockSize(blockSize), blockEnd(0), fileVersion(1001) {}

	// Handles the first block and internal blocks.  Ends current block if needed.
	// Throws backup_bad_block_size() if bytesNeeded does not fit in the newly started block.
	ACTOR static Future<Void> newBlock(RangeFileWriter *self, int bytesNeeded) {
		// Write padding to finish current block if needed
		int bytesLeft = self->blockEnd - self->file->size();
		if(bytesLeft > 0) {
			Void _ = wait(self->file->append((uint8_t *)paddingFFs.data(), bytesLeft));
		}

		// Set new blockEnd
		self->blockEnd += self->blockSize;

		// write Header
		Void _ = wait(self->file->append((uint8_t *)&self->fileVersion, sizeof(self->fileVersion)));

		// If this is NOT the first block then write duplicate stuff needed from last block:
		// lastKey as this block's begin key, then the last key/value pair again.
		if(self->blockEnd > self->blockSize) {
			Void _ = wait(self->file->appendString(self->lastKey));
			Void _ = wait(self->file->appendString(self->lastKey));
			Void _ = wait(self->file->appendString(self->lastValue));
		}

		// There must now be room in the current block for bytesNeeded or the block size is too small
		if(self->file->size() + bytesNeeded > self->blockEnd)
			throw backup_bad_block_size();

		return Void();
	}

	// Ends the current block if necessary based on bytesNeeded.
	Future<Void> newBlockIfNeeded(int bytesNeeded) {
		if(file->size() + bytesNeeded > blockEnd)
			return newBlock(this, bytesNeeded);
		return Void();
	}

	// Start a new block if needed, then write the key and value
	ACTOR static Future<Void> writeKV_impl(RangeFileWriter *self, Key k, Value v) {
		// Each string is written with a 32 bit length in front of it
		int toWrite = sizeof(int32_t) + k.size() + sizeof(int32_t) + v.size();
		Void _ = wait(self->newBlockIfNeeded(toWrite));
		Void _ = wait(self->file->appendString(k));
		Void _ = wait(self->file->appendString(v));
		// Remember the pair so newBlock() can repeat it at the start of the next block
		self->lastKey = k;
		self->lastValue = v;
		return Void();
	}

	Future<Void> writeKV(Key k, Value v) { return writeKV_impl(this, k, v); }

	// Write begin key or end key.
	ACTOR static Future<Void> writeKey_impl(RangeFileWriter *self, Key k) {
		int toWrite = sizeof(uint32_t) + k.size();
		Void _ = wait(self->newBlockIfNeeded(toWrite));
		Void _ = wait(self->file->appendString(k));
		return Void();
	}

	Future<Void> writeKey(Key k) { return writeKey_impl(this, k); }

	Reference<IBackupFile> file;
	int blockSize;

private:
	int64_t blockEnd;      // File offset at which the current block ends; 0 before the first block is started
	uint32_t fileVersion;  // Header value written at the start of every block
	Key lastKey;           // Key of the most recent writeKV()
	Key lastValue;         // Value of the most recent writeKV()
};
// Helper class for reading restore data from a buffer and throwing the right errors.
struct StringRefReader {
StringRefReader(StringRef s = StringRef(), Error e = Error()) : rptr(s.begin()), end(s.end()), failure_error(e) {}
// Return remainder of data as a StringRef
StringRef remainder() {
return StringRef(rptr, end - rptr);
}
// Return a pointer to len bytes at the current read position and advance read pos
const uint8_t * consume(unsigned int len) {
if(rptr == end && len != 0)
throw end_of_stream();
const uint8_t *p = rptr;
rptr += len;
if(rptr > end)
throw failure_error;
return p;
}
// Return a T from the current read position and advance read pos
template<typename T> const T consume() {
return *(const T *)consume(sizeof(T));
}
// Functions for consuming big endian (network byte order) integers.
// Consumes a big endian number, swaps it to little endian, and returns it.
const int32_t consumeNetworkInt32() { return (int32_t)bigEndian32((uint32_t)consume< int32_t>());}
const uint32_t consumeNetworkUInt32() { return bigEndian32( consume<uint32_t>());}
bool eof() { return rptr == end; }
const uint8_t *rptr, *end;
Error failure_error;
};
// Reads len bytes at offset from file and decodes them as one range file block (version 1001).
// The first element of the result is the block's begin key with an empty value.  The kv pairs follow.
// If the block ends right after a key, that key is appended as the final element with an empty value.
// Throws restore_bad_read() on a short read; decode errors are traced and rethrown.
ACTOR Future<Standalone<VectorRef<KeyValueRef>>> decodeRangeFileBlock(Reference<IAsyncFile> file, int64_t offset, int len) {
	state Standalone<StringRef> buf = makeString(len);
	int rLen = wait(file->read(mutateString(buf), len, offset));
	if(rLen != len)
		throw restore_bad_read();

	// The results reference memory in buf, so they share its arena.
	Standalone<VectorRef<KeyValueRef>> results({}, buf.arena());
	state StringRefReader reader(buf, restore_corrupted_data());

	try {
		// Read header, currently only decoding version 1001
		if(reader.consume<int32_t>() != 1001)
			throw restore_unsupported_file_version();

		// Read begin key, if this fails then block was invalid.
		uint32_t kLen = reader.consumeNetworkUInt32();
		const uint8_t *k = reader.consume(kLen);
		results.push_back(results.arena(), KeyValueRef(KeyRef(k, kLen), ValueRef()));

		// Read kv pairs and end key
		while(1) {
			// Read a key.
			kLen = reader.consumeNetworkUInt32();
			k = reader.consume(kLen);

			// If eof reached or first value len byte is 0xFF then a valid block end was reached.
			if(reader.eof() || *reader.rptr == 0xFF) {
				results.push_back(results.arena(), KeyValueRef(KeyRef(k, kLen), ValueRef()));
				break;
			}

			// Read a value, which must exist or the block is invalid
			uint32_t vLen = reader.consumeNetworkUInt32();
			const uint8_t *v = reader.consume(vLen);
			results.push_back(results.arena(), KeyValueRef(KeyRef(k, kLen), ValueRef(v, vLen)));

			// If eof reached or first byte of next key len is 0xFF then a valid block end was reached.
			if(reader.eof() || *reader.rptr == 0xFF)
				break;
		}

		// Make sure any remaining bytes in the block are 0xFF
		for(auto b : reader.remainder())
			if(b != 0xFF)
				throw restore_corrupted_data_padding();

		return results;

	} catch(Error &e) {
		TraceEvent(SevWarn, "FileRestoreCorruptRangeFileBlock")
			.detail("Filename", file->getFilename())
			.detail("BlockOffset", offset)
			.detail("BlockLen", len)
			.detail("ErrorRelativeOffset", reader.rptr - buf.begin())
			.detail("ErrorAbsoluteOffset", reader.rptr - buf.begin() + offset)
			.error(e);
		throw;
	}
}
// Very simple format compared to KeyRange files.
// Header, [Key, Value]... Key len
struct LogFileWriter {
static const std::string &FFs;
LogFileWriter(Reference<IBackupFile> file = Reference<IBackupFile>(), int blockSize = 0) : file(file), blockSize(blockSize), blockEnd(0), fileVersion(2001) {}
2017-05-26 04:48:44 +08:00
// Start a new block if needed, then write the key and value
ACTOR static Future<Void> writeKV_impl(LogFileWriter *self, Key k, Value v) {
// If key and value do not fit in this block, end it and start a new one
int toWrite = sizeof(int32_t) + k.size() + sizeof(int32_t) + v.size();
if(self->file->size() + toWrite > self->blockEnd) {
2017-05-26 04:48:44 +08:00
// Write padding if needed
int bytesLeft = self->blockEnd - self->file->size();
2017-05-26 04:48:44 +08:00
if(bytesLeft > 0) {
Void _ = wait(self->file->append((uint8_t *)paddingFFs.data(), bytesLeft));
2017-05-26 04:48:44 +08:00
}
// Set new blockEnd
self->blockEnd += self->blockSize;
// write Header
Void _ = wait(self->file->append((uint8_t *)&self->fileVersion, sizeof(self->fileVersion)));
2017-05-26 04:48:44 +08:00
}
Void _ = wait(self->file->appendString(k));
Void _ = wait(self->file->appendString(v));
2017-05-26 04:48:44 +08:00
// At this point we should be in whatever the current block is or the block size is too small
if(self->file->size() > self->blockEnd)
2017-05-26 04:48:44 +08:00
throw backup_bad_block_size();
return Void();
}
Future<Void> writeKV(Key k, Value v) { return writeKV_impl(this, k, v); }
Reference<IBackupFile> file;
2017-05-26 04:48:44 +08:00
int blockSize;
private:
int64_t blockEnd;
uint32_t fileVersion;
};
// Reads len bytes at offset from file and decodes them as one log file block (version 2001),
// returning the key/value pairs it contains.
// Throws restore_bad_read() on a short read; decode errors are traced and rethrown.
ACTOR Future<Standalone<VectorRef<KeyValueRef>>> decodeLogFileBlock(Reference<IAsyncFile> file, int64_t offset, int len) {
	state Standalone<StringRef> buf = makeString(len);
	int rLen = wait(file->read(mutateString(buf), len, offset));
	if(rLen != len)
		throw restore_bad_read();

	// The results reference memory in buf, so they share its arena.
	Standalone<VectorRef<KeyValueRef>> results({}, buf.arena());
	state StringRefReader reader(buf, restore_corrupted_data());

	try {
		// Read header, currently only decoding version 2001
		if(reader.consume<int32_t>() != 2001)
			throw restore_unsupported_file_version();

		// Read k/v pairs.  Block ends either at end of last value exactly or with 0xFF as first key len byte.
		while(1) {
			// If eof reached or first key len bytes is 0xFF then end of block was reached.
			if(reader.eof() || *reader.rptr == 0xFF)
				break;

			// Read key and value.  If anything throws then there is a problem.
			uint32_t kLen = reader.consumeNetworkUInt32();
			const uint8_t *k = reader.consume(kLen);
			uint32_t vLen = reader.consumeNetworkUInt32();
			const uint8_t *v = reader.consume(vLen);

			results.push_back(results.arena(), KeyValueRef(KeyRef(k, kLen), ValueRef(v, vLen)));
		}

		// Make sure any remaining bytes in the block are 0xFF
		for(auto b : reader.remainder())
			if(b != 0xFF)
				throw restore_corrupted_data_padding();

		return results;

	} catch(Error &e) {
		TraceEvent(SevWarn, "FileRestoreCorruptLogFileBlock")
			.detail("Filename", file->getFilename())
			.detail("BlockOffset", offset)
			.detail("BlockLen", len)
			.detail("ErrorRelativeOffset", reader.rptr - buf.begin())
			.detail("ErrorAbsoluteOffset", reader.rptr - buf.begin() + offset)
			.error(e);
		throw;
	}
}
2017-09-09 07:09:18 +08:00
// Throws task_invalid_version() if the task's version is greater than the supported version.
// Before throwing, the mismatch is traced and, if the task carries a config uid, also logged
// to the task's BackupConfig.
ACTOR Future<Void> checkTaskVersion(Database cx, Reference<Task> task, StringRef name, uint32_t version) {
	uint32_t taskVersion = task->getVersion();
	if (taskVersion > version) {
		// Kept as state because it is thrown after the wait below
		state Error err = task_invalid_version();

		TraceEvent(SevWarn, "BA_BackupRangeTaskFunc_execute").detail("taskVersion", taskVersion).detail("Name", printable(name)).detail("Version", version);
		if (KeyBackedConfig::TaskParams.uid().exists(task)) {
			std::string msg = format("%s task version `%lu' is greater than supported version `%lu'", task->params[Task::reservedTaskParamKeyType].toString().c_str(), (unsigned long)taskVersion, (unsigned long)version);
			Void _ = wait(BackupConfig(task).logError(cx, err, msg));
		}

		throw err;
	}

	return Void();
}
// Task setup callback that does nothing; the default for addBackupTask()'s setupTaskFn parameter.
std::function<void(Reference<Task>)> NOP_SETUP_TASK_FN = [](Reference<Task> task) { /* NOP */ };
// Creates a task of the given name/version/priority bound to the given backup config, applies
// setupTaskFn to it, and then either adds it to taskBucket right away (no waitFor given) or
// registers it to be added once waitFor is set.
// Returns the key from taskBucket->addTask() in the first case and the literal "OnSetAddTask" in the second.
// Sets ACCESS_SYSTEM_KEYS and LOCK_AWARE on tr.
ACTOR static Future<Key> addBackupTask(
		StringRef name,
		uint32_t version,
		Reference<ReadYourWritesTransaction> tr,
		Reference<TaskBucket> taskBucket,
		TaskCompletionKey completionKey,
		BackupConfig config,
		Reference<TaskFuture> waitFor = Reference<TaskFuture>(),
		std::function<void(Reference<Task>)> setupTaskFn = NOP_SETUP_TASK_FN,
		int priority = 0)
{
	tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
	tr->setOption(FDBTransactionOptions::LOCK_AWARE);

	// Resolve the completion key the new task will use
	Key completion = wait(completionKey.get(tr, taskBucket));
	state Reference<Task> newTask(new Task(name, version, completion, priority));

	// Attach the backup config to the task, then let the caller fill in task specific params
	Void _ = wait(config.toTask(tr, newTask));
	setupTaskFn(newTask);

	// Nothing to wait for: the task goes straight into the bucket
	if (!waitFor) {
		return taskBucket->addTask(tr, newTask);
	}

	// Otherwise the task is added when waitFor is set
	Void _ = wait(waitFor->onSetAddTask(tr, taskBucket, newTask));
	return LiteralStringRef("OnSetAddTask");
}
// Backup and Restore taskFunc definitions will inherit from one of the following classes, which
// serve to catch and log to the appropriate config any error that execute/finish didn't catch and log.
// Base for restore task functions: logs task errors to the RestoreConfig bound to the task.
struct RestoreTaskFuncBase : TaskFuncBase {
	// Logs the error, together with the task's type and key, via RestoreConfig::logError().
	virtual Future<Void> handleError(Database cx, Reference<Task> task, Error const &error) {
		std::string taskType = task->params[Task::reservedTaskParamKeyType].printable();
		std::string taskKey = task->key.printable();
		std::string description = format("Task '%s' UID '%s' failed", taskType.c_str(), taskKey.c_str());

		return RestoreConfig(task).logError(cx, error, description);
	}
};
// Base for backup task functions: logs task errors to the BackupConfig bound to the task.
struct BackupTaskFuncBase : TaskFuncBase {
	// Logs the error, together with the task's type and key, via BackupConfig::logError().
	// This must use BackupConfig, not RestoreConfig: a backup task carries a backup config, and
	// logging through RestoreConfig would record the error under the restore config space.
	virtual Future<Void> handleError(Database cx, Reference<Task> task, Error const &error) {
		return BackupConfig(task).logError(cx, error, format("Task '%s' UID '%s' failed", task->params[Task::reservedTaskParamKeyType].printable().c_str(), task->key.printable().c_str()));
	}
};
struct BackupRangeTaskFunc : BackupTaskFuncBase {
2017-05-26 04:48:44 +08:00
static StringRef name;
static const uint32_t version;
2017-09-02 04:50:38 +08:00
// Accessors for this task type's parameters.  Each TaskParam is named after its accessor
// via __FUNCTION__, so renaming an accessor changes the parameter's name.
static struct {
// Begin of the key range for the task
static TaskParam<Key> beginKey() {
return LiteralStringRef(__FUNCTION__);
}
// End of the key range for the task
static TaskParam<Key> endKey() {
return LiteralStringRef(__FUNCTION__);
}
// NOTE(review): presumably tells the task whether to split its range into more range tasks — confirm against _execute
static TaskParam<bool> addBackupRangeTasks() {
return LiteralStringRef(__FUNCTION__);
}
} Params;
2017-05-26 04:48:44 +08:00
// Thin wrappers: report this task type's name and forward execute/finish to the static implementations.
StringRef getName() const {
	return name;
}
Future<Void> execute(Database cx, Reference<TaskBucket> tb, Reference<FutureBucket> fb, Reference<Task> task) {
	return _execute(cx, tb, fb, task);
}
Future<Void> finish(Reference<ReadYourWritesTransaction> tr, Reference<TaskBucket> tb, Reference<FutureBucket> fb, Reference<Task> task) {
	return _finish(tr, tb, fb, task);
}
// Finish (which flushes/syncs) the file, and then in a single transaction, make some range backup progress durable.
// This means:
//  - increment the backup config's range bytes written
//  - update the range file map
//  - update the task begin key
//  - save/extend the task with the new params
// Returns whether or not the caller should continue executing the task.
// An empty range only finishes the file; no transaction is run and true is returned.
ACTOR static Future<bool> finishRangeFile(Reference<IBackupFile> file, Database cx, Reference<Task> task, Reference<TaskBucket> taskBucket, KeyRange range, Version version) {
	Void _ = wait(file->finish());

	// Ignore empty ranges.
	if(range.empty())
		return true;

	state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(cx));
	state BackupConfig backup(task);

	// Retry loop: tr->onError() decides whether the error is retryable
	loop {
		try {
			tr->reset();
			tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
			tr->setOption(FDBTransactionOptions::LOCK_AWARE);

			// Update the start key of the task
			Params.beginKey().set(task, range.end);

			// Save and extend the task with the new begin parameter
			bool keepGoing = wait(taskBucket->saveAndExtend(tr, task));
			if(!keepGoing)
				return false;

			// Update the range bytes written in the backup config
			backup.rangeBytesWritten().atomicOp(tr, file->size(), MutationRef::AddValue);

			// See if there is already a file for this key which has an earlier begin, update the map if not.
			Optional<BackupConfig::RangeSlice> s = wait(backup.rangeFileMap().get(tr, range.end));
			if(!s.present() || s.get().begin >= range.begin)
				backup.rangeFileMap().set(tr, range.end, {range.begin, version, file->getFileName(), file->size()});

			Void _ = wait(tr->commit());
			break;
		} catch(Error &e) {
			Void _ = wait(tr->onError(e));
		}
	}

	return true;
}
// Reads up to `limit` entries of the keyServers map, starting just after `beginKey` and stopping at `endKey`
// (both translated into the keyServers keyspace), and returns their keys with the keyServers prefix removed.
ACTOR static Future<Standalone<VectorRef<KeyRef>>> getBlockOfShards(Reference<ReadYourWritesTransaction> tr, Key beginKey, Key endKey, int limit) {
	tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
	tr->setOption(FDBTransactionOptions::LOCK_AWARE);

	state Standalone<VectorRef<KeyRef>> results;

	Key scanBegin = keyAfter(beginKey.withPrefix(keyServersPrefix));
	Key scanEnd = endKey.withPrefix(keyServersPrefix);
	Standalone<RangeResultRef> values = wait(tr->getRange(KeyRangeRef(scanBegin, scanEnd), limit));

	// Copy each key (minus the prefix) into the arena owned by the result.
	for (int i = 0; i < values.size(); ++i) {
		results.push_back_deep(results.arena(), values[i].key.removePrefix(keyServersPrefix));
	}

	return results;
}
// Adds a backup range task covering the keys from `begin` to `end` for the backup that `parentTask` belongs to.
// The new task starts with addBackupRangeTasks set to false.
// Returns the key produced by addBackupTask().
ACTOR static Future<Key> addTask(Reference<ReadYourWritesTransaction> tr, Reference<TaskBucket> taskBucket, Reference<Task> parentTask, Key begin, Key end, TaskCompletionKey completionKey, Reference<TaskFuture> waitFor = Reference<TaskFuture>(), int priority = 0) {
	Key taskKey = wait(addBackupTask(
		BackupRangeTaskFunc::name,
		BackupRangeTaskFunc::version,
		tr, taskBucket, completionKey,
		BackupConfig(parentTask),
		waitFor,
		// Fills in the range parameters of the newly created task.
		[=](Reference<Task> newTask) {
			Params.beginKey().set(newTask, begin);
			Params.endKey().set(newTask, end);
			Params.addBackupRangeTasks().set(newTask, false);
		},
		priority));

	return taskKey;
}
// Execute phase of a backup range task.
//
// If the keyServers map has a shard boundary inside the task's key range, no data is read here: the task
// is flagged with addBackupRangeTasks so that _finish() splits it into smaller tasks.
//
// Otherwise the range is read with readCommitted() and written to range files in the backup container.
// A new file is started every time the read version of the incoming results changes. Each finished file
// is made durable with finishRangeFile(), which also advances the task's begin key; if that reports the
// task should not continue, execution stops early.
//
// Errors other than actor_cancelled are logged to the backup config and rethrown.
ACTOR static Future<Void> _execute(Database cx, Reference<TaskBucket> taskBucket, Reference<FutureBucket> futureBucket, Reference<Task> task) {
	state Reference<FlowLock> lock(new FlowLock(CLIENT_KNOBS->BACKUP_LOCK_BYTES));

	Void _ = wait(checkTaskVersion(cx, task, BackupRangeTaskFunc::name, BackupRangeTaskFunc::version));

	// Find out if there is a shard boundary in (beginKey, endKey).
	// The range is read through Params, the same accessors addTask() and finishRangeFile() use to write it.
	Standalone<VectorRef<KeyRef>> keys = wait(runRYWTransaction(cx, [=](Reference<ReadYourWritesTransaction> tr){ return getBlockOfShards(tr, Params.beginKey().get(task), Params.endKey().get(task), 1); }));
	if (keys.size() > 0) {
		Params.addBackupRangeTasks().set(task, true);
		return Void();
	}

	// Read everything from beginKey to endKey, write it to an output file, run the output file processor, and
	// then set on_done. If we are still writing after X seconds, end the output file and insert a new backup_range
	// task for the remainder.
	state Reference<IBackupFile> outFile;
	state Version outVersion = invalidVersion;

	state Key beginKey = Params.beginKey().get(task);
	state Key endKey = Params.endKey().get(task);
	state Key lastKey;
	state KeyRange range(KeyRangeRef(beginKey, endKey));

	// retrieve kvData
	state PromiseStream<RangeResultWithVersion> results;
	state Future<Void> rc = readCommitted(cx, results, lock, range, true, true, true);
	state RangeFileWriter rangeFile;

	// TODO: Try to run forever until the shard is done, but also update the task after each output file is sync'd
	// so in case this process dies the next worker to run this task will continue from where it left off.
	// This will require some kind of coordination with the saveAndExtendIncrementally going on in parallel with
	// this execute() function().

	state BackupConfig backup(task);
	state Reference<IBackupContainer> bc = wait(backup.backupContainer().getOrThrow(cx));
	state bool done = false;

	loop{
		try{
			state RangeResultWithVersion values;
			try {
				RangeResultWithVersion _values = waitNext(results.getFuture());
				values = _values;
				lock->release(values.first.expectedSize());
			} catch(Error &e) {
				if(e.code() == error_code_end_of_stream)
					done = true;
				else
					throw;
			}

			// If we've seen a new read version OR hit the end of the stream, then if we were writing a file finish it.
			if (values.second != outVersion || done) {
				if (outFile){
					TEST(true); // Backup range task wrote multiple versions

					// At end of stream the whole remaining range up to endKey has been read, so the file
					// covers through endKey. Otherwise it only covers through the last key written.
					state Key nextKey = done ? endKey : keyAfter(lastKey);
					Void _ = wait(rangeFile.writeKey(nextKey));

					bool keepGoing = wait(finishRangeFile(outFile, cx, task, taskBucket, KeyRangeRef(beginKey, nextKey), outVersion));
					if(!keepGoing)
						return Void();

					// The next file (if any) begins where this one ended, matching the begin key that
					// finishRangeFile() just stored on the task. Without this, a later file would claim
					// to start at the original begin key although it holds none of the earlier data.
					beginKey = nextKey;
				}

				if(done)
					return Void();

				// Start writing a new file
				outVersion = values.second;
				// block size must be at least large enough for 3 max size keys and 2 max size values + overhead so 250k conservatively.
				state int blockSize = BUGGIFY ? g_random->randomInt(250e3, 4e6) : CLIENT_KNOBS->BACKUP_RANGEFILE_BLOCK_SIZE;
				Reference<IBackupFile> f = wait(bc->writeRangeFile(outVersion, blockSize));
				outFile = f;

				// Initialize range file writer and write begin key
				rangeFile = RangeFileWriter(outFile, blockSize);
				Void _ = wait(rangeFile.writeKey(beginKey));
			}

			// write kvData to file
			state size_t i = 0;
			for (; i < values.first.size(); ++i) {
				lastKey = values.first[i].key;
				Void _ = wait(rangeFile.writeKV(lastKey, values.first[i].value));
			}
		}
		catch (Error &e) {
			if (e.code() == error_code_actor_cancelled)
				throw;

			state Error err = e;
			// outFile is still unset when the failure happened before the first file was opened
			// (for example the read itself failed), so it must not be dereferenced unconditionally.
			std::string failedFileName = outFile ? outFile->getFileName() : std::string("<no file open>");
			Void _ = wait(backup.logError(cx, err, format("Failed to write to file `%s'", failedFileName.c_str())));
			throw err;
		}
	}
}
// Splits the task's key range at the shard boundaries returned by getBlockOfShards() (at most
// BACKUP_SHARD_TASK_LIMIT of them) and adds one backup range task per piece, each joined with `onDone`.
// Whatever remains after the last boundary is covered by one more task that uses the current task's priority.
ACTOR static Future<Void> startBackupRangeInternal(Reference<ReadYourWritesTransaction> tr, Reference<TaskBucket> taskBucket, Reference<FutureBucket> futureBucket, Reference<Task> task, Reference<TaskFuture> onDone) {
	tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
	tr->setOption(FDBTransactionOptions::LOCK_AWARE);

	state Key rangeEnd = Params.endKey().get(task);
	state Key nextKey = Params.beginKey().get(task);
	state Standalone<VectorRef<KeyRef>> keys = wait(getBlockOfShards(tr, nextKey, rangeEnd, CLIENT_KNOBS->BACKUP_SHARD_TASK_LIMIT));

	// One task per non-empty piece between consecutive boundaries.
	std::vector<Future<Key>> addTaskVector;
	for (auto &boundary : keys) {
		if (nextKey != boundary) {
			addTaskVector.push_back(addTask(tr, taskBucket, task, nextKey, boundary, TaskCompletionKey::joinWith(onDone)));
		}
		nextKey = boundary;
	}

	Void _ = wait(waitForAll(addTaskVector));

	if (nextKey != rangeEnd) {
		// Add task to cover nextKey to the end, using the priority of the current task
		Key _ = wait(addTask(tr, taskBucket, task, nextKey, rangeEnd, TaskCompletionKey::joinWith(onDone), Reference<TaskFuture>(), task->getPriority()));
	}

	return Void();
}
// Finish phase of a backup range task.
// If _execute() flagged the task for splitting, the sub-tasks are added here (joined with the task's done
// future); otherwise the done future is set. In both cases the task itself is then finished.
ACTOR static Future<Void> _finish(Reference<ReadYourWritesTransaction> tr, Reference<TaskBucket> taskBucket, Reference<FutureBucket> futureBucket, Reference<Task> task) {
	state Reference<TaskFuture> taskFuture = futureBucket->unpack(task->params[Task::reservedTaskParamKeyDone]);

	if (!Params.addBackupRangeTasks().get(task)) {
		// The range was handled by this task, so signal completion directly.
		Void _ = wait(taskFuture->set(tr, taskBucket));
	}
	else {
		// The range has to be split up; the sub-tasks complete the future instead.
		Void _ = wait(startBackupRangeInternal(tr, taskBucket, futureBucket, task, taskFuture));
	}

	Void _ = wait(taskBucket->finish(tr, task));
	return Void();
}
};
// Static task name and version for BackupRangeTaskFunc (both are passed to checkTaskVersion() and
// addBackupTask() by the task function), followed by its registration via REGISTER_TASKFUNC.
StringRef BackupRangeTaskFunc::name = LiteralStringRef("file_backup_range");
const uint32_t BackupRangeTaskFunc::version = 1;
REGISTER_TASKFUNC(BackupRangeTaskFunc);
// Task function that records a stop version in the backup config.
// It does no work in the execute phase; everything happens transactionally in the finish phase.
struct FinishFullBackupTaskFunc : BackupTaskFuncBase {
	static StringRef name;
	static const uint32_t version;

	// Returns the static name of this task function.
	StringRef getName() const {
		return name;
	}

	// Nothing to do in the execute phase.
	Future<Void> execute(Database cx, Reference<TaskBucket> tb, Reference<FutureBucket> fb, Reference<Task> task) {
		return Void();
	}

	// Finish phase: forwards to the _finish() actor.
	Future<Void> finish(Reference<ReadYourWritesTransaction> tr, Reference<TaskBucket> tb, Reference<FutureBucket> fb, Reference<Task> task) {
		return _finish(tr, tb, fb, task);
	}

	// Checks the task version, then stores the transaction's read version as the backup's stop version
	// and finishes the task.
	ACTOR static Future<Void> _finish(Reference<ReadYourWritesTransaction> tr, Reference<TaskBucket> taskBucket, Reference<FutureBucket> futureBucket, Reference<Task> task) {
		Void _ = wait(checkTaskVersion(tr->getDatabase(), task, FinishFullBackupTaskFunc::name, FinishFullBackupTaskFunc::version));

		tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
		tr->setOption(FDBTransactionOptions::LOCK_AWARE);

		state BackupConfig backupConfig(task);

		// Enable the stop key
		state Version stopAt = wait(tr->getReadVersion());
		backupConfig.stopVersion().set(tr, stopAt);
		TraceEvent(SevInfo, "FBA_setStopVersion").detail("stopVersion", stopAt);

		Void _ = wait(taskBucket->finish(tr, task));
		return Void();
	}

	// Adds a task of this type for the backup that `parentTask` belongs to and returns the key
	// produced by addBackupTask().
	ACTOR static Future<Key> addTask(Reference<ReadYourWritesTransaction> tr, Reference<TaskBucket> taskBucket, Reference<Task> parentTask, TaskCompletionKey completionKey, Reference<TaskFuture> waitFor = Reference<TaskFuture>()) {
		Key taskKey = wait(addBackupTask(
			FinishFullBackupTaskFunc::name,
			FinishFullBackupTaskFunc::version,
			tr, taskBucket, completionKey,
			BackupConfig(parentTask), waitFor));

		return taskKey;
	}
};
StringRef FinishFullBackupTaskFunc::name = LiteralStringRef("file_finish_full_backup");
const uint32_t FinishFullBackupTaskFunc::version = 1;
REGISTER_TASKFUNC(FinishFullBackupTaskFunc);
struct BackupLogRangeTaskFunc : BackupTaskFuncBase {
2017-05-26 04:48:44 +08:00
static StringRef name;
static const uint32_t version;
2017-09-02 04:50:38 +08:00
static struct {
static TaskParam<Version> nextBeginVersion() {
return LiteralStringRef(__FUNCTION__);
}
static TaskParam<bool> addBackupLogRangeTasks() {
return LiteralStringRef(__FUNCTION__);
}
static TaskParam<int64_t> fileSize() {
return LiteralStringRef(__FUNCTION__);
}
static TaskParam<Version> beginVersion() {
return LiteralStringRef(__FUNCTION__);
}
static TaskParam<Version> endVersion() {
return LiteralStringRef(__FUNCTION__);
}
} Params;
2017-05-26 04:48:44 +08:00
StringRef getName() const { return name; };
Future<Void> execute(Database cx, Reference<TaskBucket> tb, Reference<FutureBucket> fb, Reference<Task> task) { return _execute(cx, tb, fb, task); };
Future<Void> finish(Reference<ReadYourWritesTransaction> tr, Reference<TaskBucket> tb, Reference<FutureBucket> fb, Reference<Task> task) { return _finish(tr, tb, fb, task); };
ACTOR static Future<Void> _execute(Database cx, Reference<TaskBucket> taskBucket, Reference<FutureBucket> futureBucket, Reference<Task> task) {
state Reference<FlowLock> lock(new FlowLock(CLIENT_KNOBS->BACKUP_LOCK_BYTES));
Void _ = wait(checkTaskVersion(cx, task, BackupLogRangeTaskFunc::name, BackupLogRangeTaskFunc::version));
2017-09-02 04:50:38 +08:00
state Version beginVersion = Params.beginVersion().get(task);
state Version endVersion = Params.endVersion().get(task);
2017-05-26 04:48:44 +08:00
state BackupConfig config(task);
state Reference<IBackupContainer> bc;
2017-05-26 04:48:44 +08:00
loop{
state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(cx));
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
// Wait for the read version to pass endVersion
try {
Reference<IBackupContainer> _bc = wait(config.backupContainer().getOrThrow(tr));
bc = _bc;
2017-05-26 04:48:44 +08:00
Version currentVersion = wait(tr->getReadVersion());
if(endVersion < currentVersion)
break;
Void _ = wait(delay(std::max(CLIENT_KNOBS->BACKUP_RANGE_MINWAIT, (double) (endVersion-currentVersion)/CLIENT_KNOBS->CORE_VERSIONSPERSECOND)));
}
catch (Error &e) {
Void _ = wait(tr->onError(e));
}
}
state Standalone<VectorRef<KeyRangeRef>> ranges = getLogRanges(beginVersion, endVersion, config.getUidAsKey());
2017-05-26 04:48:44 +08:00
if (ranges.size() > CLIENT_KNOBS->BACKUP_MAX_LOG_RANGES) {
2017-09-02 04:50:38 +08:00
Params.addBackupLogRangeTasks().set(task, true);
2017-05-26 04:48:44 +08:00
return Void();
}
// Block size must be at least large enough for 1 max size key, 1 max size value, and overhead, so conservatively 125k.
state int blockSize = BUGGIFY ? g_random->randomInt(125e3, 4e6) : CLIENT_KNOBS->BACKUP_LOGFILE_BLOCK_SIZE;
state Reference<IBackupFile> outFile = wait(bc->writeLogFile(beginVersion, endVersion, blockSize));
state LogFileWriter logFile(outFile, blockSize);
2017-05-26 04:48:44 +08:00
state size_t idx;
state PromiseStream<RangeResultWithVersion> results;
2017-05-26 04:48:44 +08:00
state std::vector<Future<Void>> rc;
for (auto &range : ranges) {
rc.push_back(readCommitted(cx, results, lock, range, false, true, true));
2017-05-26 04:48:44 +08:00
}
state Future<Void> sendEOS = map(errorOr(waitForAll(rc)), [=](ErrorOr<Void> const &result) {
if(result.isError())
results.sendError(result.getError());
else
results.sendError(end_of_stream());
return Void();
});
2017-05-26 04:48:44 +08:00
try {
loop {
state RangeResultWithVersion r = waitNext(results.getFuture());
lock->release(r.first.expectedSize());
2017-05-26 04:48:44 +08:00
state int i = 0;
for (; i < r.first.size(); ++i) {
// Remove the backupLogPrefix + UID bytes from the key
Void _ = wait(logFile.writeKV(r.first[i].key.substr(backupLogPrefixBytes + 16), r.first[i].value));
}
}
} catch (Error &e) {
if(e.code() == error_code_actor_cancelled)
throw;
2017-05-26 04:48:44 +08:00
if (e.code() != error_code_end_of_stream) {
state Error err = e;
Void _ = wait(config.logError(cx, err, format("Failed to write to file `%s'", outFile->getFileName().c_str())));
throw err;
2017-05-26 04:48:44 +08:00
}
}
Void _ = wait(outFile->finish());
Params.fileSize().set(task, outFile->size());
2017-05-26 04:48:44 +08:00
return Void();
}
ACTOR static Future<Key> addTask(Reference<ReadYourWritesTransaction> tr, Reference<TaskBucket> taskBucket, Reference<Task> parentTask, Version beginVersion, Version endVersion, TaskCompletionKey completionKey, Reference<TaskFuture> waitFor = Reference<TaskFuture>()) {
Key key = wait(addBackupTask(BackupLogRangeTaskFunc::name,
BackupLogRangeTaskFunc::version,
tr, taskBucket, completionKey,
BackupConfig(parentTask),
waitFor,
[=](Reference<Task> task) {
Params.beginVersion().set(task, beginVersion);
Params.endVersion().set(task, endVersion);
Params.addBackupLogRangeTasks().set(task, false);
}));
return key;
2017-05-26 04:48:44 +08:00
}
ACTOR static Future<Void> startBackupLogRangeInternal(Reference<ReadYourWritesTransaction> tr, Reference<TaskBucket> taskBucket, Reference<FutureBucket> futureBucket, Reference<Task> task, Reference<TaskFuture> taskFuture, Version beginVersion, Version endVersion ) {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
std::vector<Future<Key>> addTaskVector;
int tasks = 0;
for (int64_t vblock = beginVersion / CLIENT_KNOBS->LOG_RANGE_BLOCK_SIZE; vblock < (endVersion + CLIENT_KNOBS->LOG_RANGE_BLOCK_SIZE - 1) / CLIENT_KNOBS->LOG_RANGE_BLOCK_SIZE; vblock += CLIENT_KNOBS->BACKUP_MAX_LOG_RANGES) {
Version bv = std::max(beginVersion, vblock * CLIENT_KNOBS->LOG_RANGE_BLOCK_SIZE);
if( tasks >= CLIENT_KNOBS->BACKUP_SHARD_TASK_LIMIT ) {
addTaskVector.push_back(addTask(tr, taskBucket, task, bv, endVersion, TaskCompletionKey::joinWith(taskFuture)));
break;
}
Version ev = std::min(endVersion, (vblock + CLIENT_KNOBS->BACKUP_MAX_LOG_RANGES) * CLIENT_KNOBS->LOG_RANGE_BLOCK_SIZE);
addTaskVector.push_back(addTask(tr, taskBucket, task, bv, ev, TaskCompletionKey::joinWith(taskFuture)));
tasks++;
}
Void _ = wait(waitForAll(addTaskVector));
return Void();
}
ACTOR static Future<Void> _finish(Reference<ReadYourWritesTransaction> tr, Reference<TaskBucket> taskBucket, Reference<FutureBucket> futureBucket, Reference<Task> task) {
2017-09-02 04:50:38 +08:00
state Version beginVersion = Params.beginVersion().get(task);
state Version endVersion = Params.endVersion().get(task);
2017-05-26 04:48:44 +08:00
state Reference<TaskFuture> taskFuture = futureBucket->unpack(task->params[Task::reservedTaskParamKeyDone]);
2017-09-02 04:50:38 +08:00
if(Params.fileSize().exists(task)) {
BackupConfig(task).logBytesWritten().atomicOp(tr, Params.fileSize().get(task), MutationRef::AddValue);
2017-05-26 04:48:44 +08:00
}
2017-09-02 04:50:38 +08:00
if (Params.addBackupLogRangeTasks().get(task)) {
2017-05-26 04:48:44 +08:00
Void _ = wait(startBackupLogRangeInternal(tr, taskBucket, futureBucket, task, taskFuture, beginVersion, endVersion));
endVersion = beginVersion;
}
2017-09-02 04:50:38 +08:00
else if (Params.nextBeginVersion().exists(task)) {
state Version nextVersion = Params.nextBeginVersion().get(task);
Key _ = wait(addTask(tr, taskBucket, task, nextVersion, endVersion, TaskCompletionKey::joinWith(taskFuture)));
2017-05-26 04:48:44 +08:00
endVersion = nextVersion;
} else {
Void _ = wait(taskFuture->set(tr, taskBucket));
}
if(endVersion > beginVersion) {
Standalone<VectorRef<KeyRangeRef>> ranges = getLogRanges(beginVersion, endVersion, task->params[FileBackupAgent::keyConfigLogUid]);
for (auto & rng : ranges)
tr->clear(rng);
}
Void _ = wait(taskBucket->finish(tr, task));
return Void();
}
};
StringRef BackupLogRangeTaskFunc::name = LiteralStringRef("file_backup_log_range");
const uint32_t BackupLogRangeTaskFunc::version = 1;
REGISTER_TASKFUNC(BackupLogRangeTaskFunc);
struct BackupLogsTaskFunc : BackupTaskFuncBase {
2017-05-26 04:48:44 +08:00
static StringRef name;
static const uint32_t version;
2017-09-02 04:50:38 +08:00
static struct {
static TaskParam<Version> beginVersion() {
return LiteralStringRef(__FUNCTION__);
}
} Params;
2017-05-26 04:48:44 +08:00
ACTOR static Future<Void> _finish(Reference<ReadYourWritesTransaction> tr, Reference<TaskBucket> taskBucket, Reference<FutureBucket> futureBucket, Reference<Task> task) {
2017-09-09 07:09:18 +08:00
Void _ = wait(checkTaskVersion(tr->getDatabase(), task, BackupLogsTaskFunc::name, BackupLogsTaskFunc::version));
2017-05-26 04:48:44 +08:00
state Reference<TaskFuture> onDone = futureBucket->unpack(task->params[Task::reservedTaskParamKeyDone]);
state Reference<TaskFuture> allPartsDone;
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
state BackupConfig config(task);
state Version stopVersionData = wait(config.stopVersion().getD(tr, -1));
2017-05-26 04:48:44 +08:00
2017-09-02 04:50:38 +08:00
state Version beginVersion = Params.beginVersion().get(task);
2017-05-26 04:48:44 +08:00
state Version endVersion = std::max<Version>( tr->getReadVersion().get() + 1, beginVersion + (CLIENT_KNOBS->BACKUP_MAX_LOG_RANGES-1)*CLIENT_KNOBS->LOG_RANGE_BLOCK_SIZE );
if(endVersion - beginVersion > g_random->randomInt64(0, CLIENT_KNOBS->BACKUP_VERSION_DELAY)) {
TraceEvent("FBA_BackupLogs").detail("beginVersion", beginVersion).detail("endVersion", endVersion).detail("stopVersionData", stopVersionData);
}
// Only consider stopping, if the stop key is set and less than begin version
if ((stopVersionData > 0) && (stopVersionData < beginVersion)) {
allPartsDone = onDone;
// Updated the stop version
config.stopVersion().set(tr, endVersion);
2017-05-26 04:48:44 +08:00
}
else {
allPartsDone = futureBucket->future(tr);
Key _ = wait(BackupLogsTaskFunc::addTask(tr, taskBucket, task, endVersion, TaskCompletionKey::signal(onDone), allPartsDone));
}
Key _ = wait(BackupLogRangeTaskFunc::addTask(tr, taskBucket, task, beginVersion, endVersion, TaskCompletionKey::joinWith(allPartsDone)));
Void _ = wait(taskBucket->finish(tr, task));
return Void();
}
ACTOR static Future<Key> addTask(Reference<ReadYourWritesTransaction> tr, Reference<TaskBucket> taskBucket, Reference<Task> parentTask, Version beginVersion, TaskCompletionKey completionKey, Reference<TaskFuture> waitFor = Reference<TaskFuture>()) {
Key key = wait(addBackupTask(BackupLogsTaskFunc::name,
BackupLogsTaskFunc::version,
tr, taskBucket, completionKey,
BackupConfig(parentTask),
waitFor,
[=](Reference<Task> task) {
Params.beginVersion().set(task, beginVersion);
}));
return key;
2017-05-26 04:48:44 +08:00
}
StringRef getName() const { return name; };
Future<Void> execute(Database cx, Reference<TaskBucket> tb, Reference<FutureBucket> fb, Reference<Task> task) { return Void(); };
Future<Void> finish(Reference<ReadYourWritesTransaction> tr, Reference<TaskBucket> tb, Reference<FutureBucket> fb, Reference<Task> task) { return _finish(tr, tb, fb, task); };
};
StringRef BackupLogsTaskFunc::name = LiteralStringRef("file_backup_logs");
const uint32_t BackupLogsTaskFunc::version = 1;
REGISTER_TASKFUNC(BackupLogsTaskFunc);
// Task function that runs once a backup is complete: it clears the backup's log range configuration and
// backup log keys and marks the backup as STATE_COMPLETED. All work happens in the finish phase.
struct FinishedFullBackupTaskFunc : BackupTaskFuncBase {
	static StringRef name;
	static const uint32_t version;

	// Returns the static name of this task function.
	StringRef getName() const {
		return name;
	}

	// Nothing to do in the execute phase.
	Future<Void> execute(Database cx, Reference<TaskBucket> tb, Reference<FutureBucket> fb, Reference<Task> task) {
		return Void();
	}

	// Finish phase: forwards to the _finish() actor.
	Future<Void> finish(Reference<ReadYourWritesTransaction> tr, Reference<TaskBucket> tb, Reference<FutureBucket> fb, Reference<Task> task) {
		return _finish(tr, tb, fb, task);
	}

	// Clears everything under the backup's UID in the logRangesRange and backupLogKeys subspaces, sets the
	// backup state to completed and finishes the task.
	ACTOR static Future<Void> _finish(Reference<ReadYourWritesTransaction> tr, Reference<TaskBucket> taskBucket, Reference<FutureBucket> futureBucket, Reference<Task> task) {
		Void _ = wait(checkTaskVersion(tr->getDatabase(), task, FinishedFullBackupTaskFunc::name, FinishedFullBackupTaskFunc::version));

		// None of these are needed after the next wait, so plain locals are enough.
		BackupConfig backup(task);
		UID uid = backup.getUid();
		Key configPath = uidPrefixKey(logRangesRange.begin, uid);
		Key logsPath = uidPrefixKey(backupLogKeys.begin, uid);

		// strinc() yields the first key past everything that starts with the given prefix.
		tr->clear(KeyRangeRef(configPath, strinc(configPath)));
		tr->clear(KeyRangeRef(logsPath, strinc(logsPath)));

		backup.stateEnum().set(tr, EBackupState::STATE_COMPLETED);

		Void _ = wait(taskBucket->finish(tr, task));
		return Void();
	}

	// Adds a task of this type for the backup that `parentTask` belongs to and returns the key produced
	// by addBackupTask().
	ACTOR static Future<Key> addTask(Reference<ReadYourWritesTransaction> tr, Reference<TaskBucket> taskBucket, Reference<Task> parentTask, TaskCompletionKey completionKey, Reference<TaskFuture> waitFor = Reference<TaskFuture>()) {
		Key taskKey = wait(addBackupTask(
			FinishedFullBackupTaskFunc::name,
			FinishedFullBackupTaskFunc::version,
			tr, taskBucket, completionKey,
			BackupConfig(parentTask), waitFor));

		return taskKey;
	}
};
StringRef FinishedFullBackupTaskFunc::name = LiteralStringRef("file_finished_full_backup");
const uint32_t FinishedFullBackupTaskFunc::version = 1;
REGISTER_TASKFUNC(FinishedFullBackupTaskFunc);
struct BackupDiffLogsTaskFunc : BackupTaskFuncBase {
2017-05-26 04:48:44 +08:00
static StringRef name;
static const uint32_t version;
2017-09-02 04:50:38 +08:00
static struct {
static TaskParam<Version> beginVersion() {
return LiteralStringRef(__FUNCTION__);
}
} Params;
2017-05-26 04:48:44 +08:00
ACTOR static Future<Void> _finish(Reference<ReadYourWritesTransaction> tr, Reference<TaskBucket> taskBucket, Reference<FutureBucket> futureBucket, Reference<Task> task) {
2017-09-09 07:09:18 +08:00
Void _ = wait(checkTaskVersion(tr->getDatabase(), task, BackupDiffLogsTaskFunc::name, BackupDiffLogsTaskFunc::version));
2017-05-26 04:48:44 +08:00
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
state BackupConfig config(task);
state std::string tagName = wait(config.tag().getOrThrow(tr));
2017-05-26 04:48:44 +08:00
state Reference<TaskFuture> onDone = futureBucket->unpack(task->params[Task::reservedTaskParamKeyDone]);
state Reference<TaskFuture> allPartsDone;
state bool stopWhenDone = wait(config.stopWhenDone().getOrThrow(tr));
2017-05-26 04:48:44 +08:00
2017-09-02 04:50:38 +08:00
state Version beginVersion = Params.beginVersion().get(task);
2017-05-26 04:48:44 +08:00
state Version endVersion = std::max<Version>( tr->getReadVersion().get() + 1, beginVersion + (CLIENT_KNOBS->BACKUP_MAX_LOG_RANGES-1)*CLIENT_KNOBS->LOG_RANGE_BLOCK_SIZE );
FileBackupAgent().setLastRestorable(tr, KeyRef(tagName), beginVersion);
2017-05-26 04:48:44 +08:00
if(endVersion - beginVersion > g_random->randomInt64(0, CLIENT_KNOBS->BACKUP_VERSION_DELAY)) {
TraceEvent("FBA_DiffLogs").detail("beginVersion", beginVersion).detail("endVersion", endVersion).detail("stopWhenDone", stopWhenDone);
2017-05-26 04:48:44 +08:00
}
if (stopWhenDone) {
2017-05-26 04:48:44 +08:00
allPartsDone = onDone;
config.stopVersion().set(tr, endVersion);
2017-05-26 04:48:44 +08:00
}
else {
allPartsDone = futureBucket->future(tr);
Key _ = wait(BackupDiffLogsTaskFunc::addTask(tr, taskBucket, task, endVersion, TaskCompletionKey::signal(onDone), allPartsDone));
}
Key _ = wait(BackupLogRangeTaskFunc::addTask(tr, taskBucket, task, beginVersion, endVersion, TaskCompletionKey::joinWith(allPartsDone)));
Void _ = wait(taskBucket->finish(tr, task));
return Void();
}
ACTOR static Future<Key> addTask(Reference<ReadYourWritesTransaction> tr, Reference<TaskBucket> taskBucket, Reference<Task> parentTask, Version beginVersion, TaskCompletionKey completionKey, Reference<TaskFuture> waitFor = Reference<TaskFuture>()) {
Key key = wait(addBackupTask(BackupDiffLogsTaskFunc::name,
BackupDiffLogsTaskFunc::version,
tr, taskBucket, completionKey,
BackupConfig(parentTask),
waitFor,
[=](Reference<Task> task) {
Params.beginVersion().set(task, beginVersion);
}));
return key;
2017-05-26 04:48:44 +08:00
}
StringRef getName() const { return name; };
Future<Void> execute(Database cx, Reference<TaskBucket> tb, Reference<FutureBucket> fb, Reference<Task> task) { return Void(); };
Future<Void> finish(Reference<ReadYourWritesTransaction> tr, Reference<TaskBucket> tb, Reference<FutureBucket> fb, Reference<Task> task) { return _finish(tr, tb, fb, task); };
};
StringRef BackupDiffLogsTaskFunc::name = LiteralStringRef("file_backup_diff_logs");
const uint32_t BackupDiffLogsTaskFunc::version = 1;
REGISTER_TASKFUNC(BackupDiffLogsTaskFunc);
struct BackupRestorableTaskFunc : BackupTaskFuncBase {
2017-05-26 04:48:44 +08:00
static StringRef name;
static const uint32_t version;
ACTOR static Future<Void> _execute(Database cx, Reference<TaskBucket> taskBucket, Reference<FutureBucket> futureBucket, Reference<Task> task) {
state BackupConfig config(task);
state Reference<IBackupContainer> bc;
2017-05-26 04:48:44 +08:00
try {
state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(cx));
2017-05-26 04:48:44 +08:00
// Read the entire range file map into memory, then walk it backwards from its last entry to produce a list of non overlapping key range files
state std::map<Key, BackupConfig::RangeSlice> localmap;
state Key startKey;
state int batchSize = BUGGIFY ? 1 : 1000000;
loop {
try {
tr->reset();
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
Reference<IBackupContainer> _bc = wait(config.backupContainer().getOrThrow(tr));
bc = _bc;
2017-05-26 04:48:44 +08:00
bool keepGoing = wait(taskBucket->keepRunning(tr, task));
if(!keepGoing)
return Void();
2017-05-26 04:48:44 +08:00
BackupConfig::RangeFileMapT::PairsType rangeresults = wait(config.rangeFileMap().getRange(tr, startKey, {}, batchSize));
2017-05-26 04:48:44 +08:00
for(auto &p : rangeresults)
localmap.insert(p);
2017-05-26 04:48:44 +08:00
if(rangeresults.size() < batchSize)
break;
startKey = keyAfter(rangeresults.back().first);
} catch(Error &e) {
Void _ = wait(tr->onError(e));
}
2017-05-26 04:48:44 +08:00
}
std::vector<std::string> files;
state Version maxVer = 0;
state Version minVer = std::numeric_limits<Version>::max();
state int64_t totalBytes = 0;
if(!localmap.empty()) {
// Get iterator that points to greatest key, start there.
auto ri = localmap.rbegin();
auto i = (++ri).base();
while(1) {
const BackupConfig::RangeSlice &r = i->second;
// Add file to final file list
files.push_back(r.fileName);
// Update version range seen
if(r.version < minVer)
minVer = r.version;
if(r.version > maxVer)
maxVer = r.version;
// Update total bytes counted.
totalBytes += r.fileSize;
// Jump to file that either ends where this file begins or has the greatest end that is less than
// the begin of this file. In other words find the map key that is <= begin of this file. To do this
// find the first end strictly greater than begin and then back up one.
i = localmap.upper_bound(i->second.begin);
// If we get begin then we're done, there are no more ranges that end at or before the last file's begin
if(i == localmap.begin())
break;
--i;
}
2017-05-26 04:48:44 +08:00
}
Void _ = wait(bc->writeKeyspaceSnapshotFile(files, totalBytes));
2017-05-26 04:48:44 +08:00
TraceEvent(SevInfo, "FileBackupWroteKVManifestFile")
.detail("Container", bc->getURL())
.detail("BeginVersion", maxVer)
.detail("EndVersion", maxVer)
.detail("TotalBytes", totalBytes);
2017-05-26 04:48:44 +08:00
// Now that the manifest is written, the backup is restorable so set last restorable for the tag.
loop {
try {
tr->reset();
bool keepGoing = wait(taskBucket->keepRunning(tr, task));
if(!keepGoing)
return Void();
state std::string tag = wait(config.tag().getOrThrow(tr));
// stopVersion must be set or the current task would not be running
state Version ver = wait(config.stopVersion().getOrThrow(tr));
FileBackupAgent().setLastRestorable(tr, StringRef(tag), ver);
Void _ = wait(tr->commit());
TraceEvent("FileBackupRestorable")
.detail("logUid", config.getUid())
.detail("stopVersion", ver)
.detail("backupContainer", bc->getURL())
.detail("backupTag", tag);
break;
} catch(Error &e) {
Void _ = wait(tr->onError(e));
}
}
} catch(Error &e) {
if(e.code() == error_code_actor_cancelled)
throw;
state Error err = e;
Void _ = wait(config.logError(cx, err, format("Failed to write restorable metadata to `%s'", bc->getURL().c_str())));
throw err;
}
2017-05-26 04:48:44 +08:00
return Void();
}
// Finish step of the backup-restorable task. Within the caller's transaction it reads the backup's
// stop version and stopWhenDone flag and queues the follow-up work:
//   - stopWhenDone set:   queue FinishedFullBackupTaskFunc immediately.
//   - stopWhenDone unset: set the backup state to STATE_DIFFERENTIAL, queue BackupDiffLogsTaskFunc
//     (passing the stop version read above) joined with a new future, and queue
//     FinishedFullBackupTaskFunc with that same future (presumably as its wait-for future).
// Finally marks this task as finished in the task bucket.
ACTOR static Future<Void> _finish(Reference<ReadYourWritesTransaction> tr, Reference<TaskBucket> taskBucket, Reference<FutureBucket> futureBucket, Reference<Task> task) {
	Void _ = wait(checkTaskVersion(tr->getDatabase(), task, BackupRestorableTaskFunc::name, BackupRestorableTaskFunc::version));

	state BackupConfig config(task);
	tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
	tr->setOption(FDBTransactionOptions::LOCK_AWARE);

	state Reference<TaskFuture> onDone = futureBucket->unpack(task->params[Task::reservedTaskParamKeyDone]);
	// -1 is the value passed to getD() as the fallback for the stop version
	state Version restoreVersion = wait(config.stopVersion().getD(tr, -1));
	state bool stopWhenDone = wait(config.stopWhenDone().getOrThrow(tr));
	state Reference<TaskFuture> allPartsDone;

	// The differential branch below is taken only when stopWhenDone is NOT set, so the
	// "differential" detail is the negation of stopWhenDone.
	TraceEvent("FileBackupRestorable").detail("restoreVersion", restoreVersion).detail("differential", !stopWhenDone);

	if (stopWhenDone) {
		// Differential is not enabled: start the complete task right away.
		// After the Backup completes, clear the backup subspace and update the status
		Key _ = wait(FinishedFullBackupTaskFunc::addTask(tr, taskBucket, task, TaskCompletionKey::noSignal()));
	}
	else {
		// Differential is enabled: start the writing of logs
		config.stateEnum().set(tr, EBackupState::STATE_DIFFERENTIAL);

		allPartsDone = futureBucket->future(tr);
		Key _ = wait(BackupDiffLogsTaskFunc::addTask(tr, taskBucket, task, restoreVersion, TaskCompletionKey::joinWith(allPartsDone)));

		// After the Backup completes, clear the backup subspace and update the status
		Key _ = wait(FinishedFullBackupTaskFunc::addTask(tr, taskBucket, task, TaskCompletionKey::noSignal(), allPartsDone));
	}

	Void _ = wait(taskBucket->finish(tr, task));
	return Void();
}
// Queues a BackupRestorableTaskFunc task through addBackupTask(), binding it to the backup
// configuration built from parentTask. completionKey and waitFor are passed through unchanged.
// Returns the key produced by addBackupTask().
ACTOR static Future<Key> addTask(Reference<ReadYourWritesTransaction> tr, Reference<TaskBucket> taskBucket, Reference<Task> parentTask, TaskCompletionKey completionKey, Reference<TaskFuture> waitFor = Reference<TaskFuture>()) {
	Key taskKey = wait(addBackupTask(BackupRestorableTaskFunc::name, BackupRestorableTaskFunc::version, tr, taskBucket, completionKey, BackupConfig(parentTask), waitFor));
	return taskKey;
}
// Returns the static task name of this task function.
StringRef getName() const {
	return name;
}

// Forwards to the static _execute actor.
Future<Void> execute(Database cx, Reference<TaskBucket> tb, Reference<FutureBucket> fb, Reference<Task> task) {
	return _execute(cx, tb, fb, task);
}

// Forwards to the static _finish actor.
Future<Void> finish(Reference<ReadYourWritesTransaction> tr, Reference<TaskBucket> tb, Reference<FutureBucket> fb, Reference<Task> task) {
	return _finish(tr, tb, fb, task);
}
};
// Out-of-class definitions of BackupRestorableTaskFunc's static task name and version (the values
// handed to checkTaskVersion() and addBackupTask()), followed by the REGISTER_TASKFUNC invocation.
// NOTE(review): REGISTER_TASKFUNC presumably makes the task type discoverable by name - confirm against the macro definition.
StringRef BackupRestorableTaskFunc::name = LiteralStringRef("file_backup_restorable");
const uint32_t BackupRestorableTaskFunc::version = 1;
REGISTER_TASKFUNC(BackupRestorableTaskFunc);
struct StartFullBackupTaskFunc : BackupTaskFuncBase {
2017-05-26 04:48:44 +08:00
static StringRef name;
static const uint32_t version;
2017-09-02 04:50:38 +08:00
static struct {
static TaskParam<Version> beginVersion() { return LiteralStringRef(__FUNCTION__); }
} Params;
2017-05-26 04:48:44 +08:00
ACTOR static Future<Void> _execute(Database cx, Reference<TaskBucket> taskBucket, Reference<FutureBucket> futureBucket, Reference<Task> task) {
Void _ = wait(checkTaskVersion(cx, task, StartFullBackupTaskFunc::name, StartFullBackupTaskFunc::version));
loop{
state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(cx));
try {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
Version startVersion = wait(tr->getReadVersion());
2017-09-02 04:50:38 +08:00
Params.beginVersion().set(task, startVersion);
2017-05-26 04:48:44 +08:00
break;
}
catch (Error &e) {
Void _ = wait(tr->onError(e));
}
}
return Void();
}
ACTOR static Future<Void> _finish(Reference<ReadYourWritesTransaction> tr, Reference<TaskBucket> taskBucket, Reference<FutureBucket> futureBucket, Reference<Task> task) {
state BackupConfig config(task);
2017-09-02 04:50:38 +08:00
state Version beginVersion = Params.beginVersion().get(task);
2017-05-26 04:48:44 +08:00
state std::vector<KeyRange> backupRanges = wait(config.backupRanges().getOrThrow(tr));
2017-05-26 04:48:44 +08:00
// Start logging the mutations for the specified ranges of the tag
for (auto &backupRange : backupRanges) {
2017-09-06 00:42:14 +08:00
config.startMutationLogs(tr, backupRange);
2017-05-26 04:48:44 +08:00
}
config.stateEnum().set(tr, EBackupState::STATE_BACKUP);
2017-05-26 04:48:44 +08:00
state Reference<TaskFuture> kvBackupRangeComplete = futureBucket->future(tr);
state Reference<TaskFuture> kvBackupComplete = futureBucket->future(tr);
state int rangeCount = 0;
for (; rangeCount < backupRanges.size(); ++rangeCount) {
// Add the initial range task as high priority.
Key _ = wait(BackupRangeTaskFunc::addTask(tr, taskBucket, task, backupRanges[rangeCount].begin, backupRanges[rangeCount].end, TaskCompletionKey::joinWith(kvBackupRangeComplete), Reference<TaskFuture>(), 1));
}
// After the BackupRangeTask completes, set the stop key which will stop the BackupLogsTask
Key _ = wait(FinishFullBackupTaskFunc::addTask(tr, taskBucket, task, TaskCompletionKey::noSignal(), kvBackupRangeComplete));
// Backup the logs which will create BackupLogRange tasks
Key _ = wait(BackupLogsTaskFunc::addTask(tr, taskBucket, task, beginVersion, TaskCompletionKey::joinWith(kvBackupComplete)));
// After the Backup completes, clear the backup subspace and update the status
Key _ = wait(BackupRestorableTaskFunc::addTask(tr, taskBucket, task, TaskCompletionKey::noSignal(), kvBackupComplete));
Void _ = wait(taskBucket->finish(tr, task));
return Void();
}
2017-09-09 07:09:18 +08:00
ACTOR static Future<Key> addTask(Reference<ReadYourWritesTransaction> tr, Reference<TaskBucket> taskBucket, UID uid, TaskCompletionKey completionKey, Reference<TaskFuture> waitFor = Reference<TaskFuture>())
2017-05-26 04:48:44 +08:00
{
Key key = wait(addBackupTask(StartFullBackupTaskFunc::name,
StartFullBackupTaskFunc::version,
tr, taskBucket, completionKey,
BackupConfig(uid), waitFor));
return key;
2017-05-26 04:48:44 +08:00
}
StringRef getName() const { return name; };
Future<Void> execute(Database cx, Reference<TaskBucket> tb, Reference<FutureBucket> fb, Reference<Task> task) { return _execute(cx, tb, fb, task); };
Future<Void> finish(Reference<ReadYourWritesTransaction> tr, Reference<TaskBucket> tb, Reference<FutureBucket> fb, Reference<Task> task) { return _finish(tr, tb, fb, task); };
};
// Out-of-class definitions of StartFullBackupTaskFunc's static task name and version (the values
// handed to checkTaskVersion() and addBackupTask()), followed by the REGISTER_TASKFUNC invocation.
// NOTE(review): REGISTER_TASKFUNC presumably makes the task type discoverable by name - confirm against the macro definition.
StringRef StartFullBackupTaskFunc::name = LiteralStringRef("file_start_full_backup");
const uint32_t StartFullBackupTaskFunc::version = 1;
REGISTER_TASKFUNC(StartFullBackupTaskFunc);
// Last task of a restore. It has no execute work; its finish step marks the restore COMPLETED,
// clears the restore's file set and apply-mutations keys, finishes the task and unlocks the database.
struct RestoreCompleteTaskFunc : RestoreTaskFuncBase {
	// Runs inside the caller's transaction.
	ACTOR static Future<Void> _finish(Reference<ReadYourWritesTransaction> tr, Reference<TaskBucket> taskBucket, Reference<FutureBucket> futureBucket, Reference<Task> task) {
		Void _ = wait(checkTaskVersion(tr->getDatabase(), task, name, version));

		state RestoreConfig restore(task);
		restore.stateEnum().set(tr, ERestoreState::COMPLETED);

		// Clear the file map now since it could be huge.
		restore.fileSet().clear(tr);

		// TODO: Validate that the range version map has exactly the restored ranges in it. This means that for any restore operation
		// the ranges to restore must be within the backed up ranges, otherwise from the restore perspective it will appear that some
		// key ranges were missing and so the backup set is incomplete and the restore has failed.
		// This validation cannot be done currently because Restore only supports a single restore range but backups can have many ranges.

		// Clear the applyMutations stuff, including any unapplied mutations from versions beyond the restored version.
		restore.clearApplyMutationsKeys(tr);

		Void _ = wait(taskBucket->finish(tr, task));
		Void _ = wait(unlockDatabase(tr, restore.getUid()));

		return Void();
	}

	// Creates a RestoreCompleteTaskFunc task bound to the restore config of parentTask.
	// Without waitFor the task is added to the bucket directly and the key from taskBucket->addTask()
	// is returned; with waitFor the add is deferred via onSetAddTask() and the literal key
	// "OnSetAddTask" is returned.
	ACTOR static Future<Key> addTask(Reference<ReadYourWritesTransaction> tr, Reference<TaskBucket> taskBucket, Reference<Task> parentTask, TaskCompletionKey completionKey, Reference<TaskFuture> waitFor = Reference<TaskFuture>()) {
		Key doneKey = wait(completionKey.get(tr, taskBucket));
		state Reference<Task> task(new Task(RestoreCompleteTaskFunc::name, RestoreCompleteTaskFunc::version, doneKey));

		// Get restore config from parent task and bind it to new task
		Void _ = wait(RestoreConfig(parentTask).toTask(tr, task));

		if (!waitFor)
			return taskBucket->addTask(tr, task);

		Void _ = wait(waitFor->onSetAddTask(tr, taskBucket, task));
		return LiteralStringRef("OnSetAddTask");
	}

	static StringRef name;
	static const uint32_t version;

	StringRef getName() const {
		return name;
	}

	// Nothing to do outside of the finish transaction.
	Future<Void> execute(Database cx, Reference<TaskBucket> tb, Reference<FutureBucket> fb, Reference<Task> task) {
		return Void();
	}

	Future<Void> finish(Reference<ReadYourWritesTransaction> tr, Reference<TaskBucket> tb, Reference<FutureBucket> fb, Reference<Task> task) {
		return _finish(tr, tb, fb, task);
	}
};
// Out-of-class definitions of RestoreCompleteTaskFunc's static task name and version (the values
// handed to checkTaskVersion() and the Task constructor), followed by the REGISTER_TASKFUNC invocation.
// NOTE(review): REGISTER_TASKFUNC presumably makes the task type discoverable by name - confirm against the macro definition.
StringRef RestoreCompleteTaskFunc::name = LiteralStringRef("restore_complete");
const uint32_t RestoreCompleteTaskFunc::version = 1;
REGISTER_TASKFUNC(RestoreCompleteTaskFunc);
// Common base for restore tasks that process one block of one backup file. It declares the shared
// input parameters (file, read offset, read length) and an error handler that logs them.
struct RestoreFileTaskFuncBase : RestoreTaskFuncBase {
	// Task parameters shared by file-block tasks. Each accessor's key is its own function name (__FUNCTION__).
	struct InputParams {
		static TaskParam<RestoreFile> inputFile() { return LiteralStringRef(__FUNCTION__); }
		static TaskParam<int64_t> readOffset() { return LiteralStringRef(__FUNCTION__); }
		static TaskParam<int64_t> readLen() { return LiteralStringRef(__FUNCTION__); }
	} Params;

	// Builds a message naming the task, the input file and the block being read, then logs the error
	// on the task's restore config via logError() and returns that call's future.
	virtual Future<Void> handleError(Database cx, Reference<Task> task, Error const &error) {
		// %lld requires a long long argument; int64_t is not long long on every platform, so cast explicitly.
		std::string msg = format("%s error on fileName:%s readLen: %lld readOffset: %lld",
		                         getName().toString().c_str(),
		                         Params.inputFile().get(task).fileName.c_str(),
		                         static_cast<long long>(Params.readLen().get(task)),
		                         static_cast<long long>(Params.readOffset().get(task)));

		return RestoreConfig(task).logError(cx, error, msg, this);
	}
};
struct RestoreRangeTaskFunc : RestoreFileTaskFuncBase {
static struct : InputParams {
// The range of data that the (possibly empty) data represented, which is set if it intersects the target restore range
2017-05-26 04:48:44 +08:00
static TaskParam<KeyRange> originalFileRange() { return LiteralStringRef(__FUNCTION__); }
} Params;
ACTOR static Future<Void> _execute(Database cx, Reference<TaskBucket> taskBucket, Reference<FutureBucket> futureBucket, Reference<Task> task) {
state double startTime = now();
state RestoreConfig restore(task);
state RestoreFile rangeFile = Params.inputFile().get(task);
state int64_t readOffset = Params.readOffset().get(task);
state int64_t readLen = Params.readLen().get(task);
TraceEvent("FileRestoreRangeStart")
.detail("UID", restore.getUid())
.detail("FileName", rangeFile.fileName)
.detail("FileVersion", rangeFile.version)
.detail("FileSize", rangeFile.fileSize)
.detail("ReadOffset", readOffset)
.detail("ReadLen", readLen)
.detail("TaskInstance", (uint64_t)this);
state Reference<ReadYourWritesTransaction> tr( new ReadYourWritesTransaction(cx) );
state Future<Reference<IBackupContainer>> bc;
state Future<KeyRange> restoreRange;
state Future<Key> addPrefix;
state Future<Key> removePrefix;
loop {
try {
tr->reset();
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
2017-05-26 04:48:44 +08:00
bc = restore.sourceContainer().getOrThrow(tr);
restoreRange = restore.restoreRange().getD(tr);
addPrefix = restore.addPrefix().getD(tr);
removePrefix = restore.removePrefix().getD(tr);
2017-05-26 04:48:44 +08:00
Void _ = wait(success(bc) && success(restoreRange) && success(addPrefix) && success(removePrefix) && checkTaskVersion(tr->getDatabase(), task, name, version));
bool go = wait(taskBucket->keepRunning(tr, task));
if(!go)
return Void();
break;
2017-05-26 04:48:44 +08:00
} catch(Error &e) {
Void _ = wait(tr->onError(e));
2017-05-26 04:48:44 +08:00
}
}
2017-05-26 04:48:44 +08:00
state Reference<IAsyncFile> inFile = wait(bc.get()->readFile(rangeFile.fileName));
state Standalone<VectorRef<KeyValueRef>> blockData = wait(decodeRangeFileBlock(inFile, readOffset, readLen));
2017-05-26 04:48:44 +08:00
// First and last key are the range for this file
state KeyRange fileRange = KeyRangeRef(blockData.front().key, blockData.back().key);
2017-05-26 04:48:44 +08:00
// If fileRange doesn't intersect restore range then we're done.
if(!fileRange.intersects(restoreRange.get()))
return Void();
2017-05-26 04:48:44 +08:00
// We know the file range intersects the restore range but there could still be keys outside the restore range.
// Find the subvector of kv pairs that intersect the restore range. Note that the first and last keys are just the range endpoints for this file
int rangeStart = 1;
int rangeEnd = blockData.size() - 1;
// Slide start forward, stop if something in range is found
while(rangeStart < rangeEnd && !restoreRange.get().contains(blockData[rangeStart].key))
++rangeStart;
// Side end backward, stop if something in range is found
while(rangeEnd > rangeStart && !restoreRange.get().contains(blockData[rangeEnd - 1].key))
--rangeEnd;
state VectorRef<KeyValueRef> data = blockData.slice(rangeStart, rangeEnd);
// Shrink file range to be entirely within restoreRange and translate it to the new prefix
// First, use the untranslated file range to create the shrunk original file range which must be used in the kv range version map for applying mutations
state KeyRange originalFileRange = KeyRangeRef(std::max(fileRange.begin, restoreRange.get().begin), std::min(fileRange.end, restoreRange.get().end));
Params.originalFileRange().set(task, originalFileRange);
// Now shrink and translate fileRange
Key fileEnd = std::min(fileRange.end, restoreRange.get().end);
if(fileEnd == (removePrefix.get() == StringRef() ? normalKeys.end : strinc(removePrefix.get())) ) {
fileEnd = addPrefix.get() == StringRef() ? normalKeys.end : strinc(addPrefix.get());
} else {
fileEnd = fileEnd.removePrefix(removePrefix.get()).withPrefix(addPrefix.get());
}
fileRange = KeyRangeRef(std::max(fileRange.begin, restoreRange.get().begin).removePrefix(removePrefix.get()).withPrefix(addPrefix.get()),fileEnd);
2017-05-26 04:48:44 +08:00
state int start = 0;
state int end = data.size();
state int dataSizeLimit = BUGGIFY ? g_random->randomInt(256 * 1024, 10e6) : CLIENT_KNOBS->RESTORE_WRITE_TX_SIZE;
2017-05-26 04:48:44 +08:00
loop {
try {
tr->reset();
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
2017-05-26 04:48:44 +08:00
state int i = start;
state int txBytes = 0;
state int iend = start;
2017-05-26 04:48:44 +08:00
// find iend that results in the desired transaction size
for(; iend < end && txBytes < dataSizeLimit; ++iend) {
txBytes += data[iend].key.expectedSize();
txBytes += data[iend].value.expectedSize();
}
2017-05-26 04:48:44 +08:00
// Clear the range we are about to set.
// If start == 0 then use fileBegin for the start of the range, else data[start]
// If iend == end then use fileEnd for the end of the range, else data[iend]
state KeyRange trRange = KeyRangeRef((start == 0 ) ? fileRange.begin : data[start].key.removePrefix(removePrefix.get()).withPrefix(addPrefix.get())
, (iend == end) ? fileRange.end : data[iend ].key.removePrefix(removePrefix.get()).withPrefix(addPrefix.get()));
2017-05-26 04:48:44 +08:00
tr->clear(trRange);
2017-05-26 04:48:44 +08:00
for(; i < iend; ++i) {
tr->setOption(FDBTransactionOptions::NEXT_WRITE_NO_WRITE_CONFLICT_RANGE);
tr->set(data[i].key.removePrefix(removePrefix.get()).withPrefix(addPrefix.get()), data[i].value);
}
2017-05-26 04:48:44 +08:00
// Add to bytes written count
restore.bytesWritten().atomicOp(tr, txBytes, MutationRef::Type::AddValue);
2017-05-26 04:48:44 +08:00
state Future<Void> checkLock = checkDatabaseLock(tr, restore.getUid());
2017-05-26 04:48:44 +08:00
// Save and extend this task periodically in case database is slow, which checks keepRunning, or check it directly.
Future<bool> goFuture;
if(now() - startTime > 30) {
// TODO: Optionally, task params could be modified here to save progress so far.
startTime = now();
goFuture = taskBucket->saveAndExtend(tr, task);
}
else
goFuture = taskBucket->keepRunning(tr, task);
2017-05-26 04:48:44 +08:00
bool go = wait(goFuture);
if(!go)
return Void();
2017-05-26 04:48:44 +08:00
Void _ = wait( checkLock );
2017-05-26 04:48:44 +08:00
Void _ = wait(tr->commit());
2017-05-26 04:48:44 +08:00
TraceEvent("FileRestoreCommittedRange")
.detail("UID", restore.getUid())
.detail("FileName", rangeFile.fileName)
.detail("FileVersion", rangeFile.version)
.detail("FileSize", rangeFile.fileSize)
.detail("ReadOffset", readOffset)
.detail("ReadLen", readLen)
.detail("CommitVersion", tr->getCommittedVersion())
.detail("BeginRange", printable(trRange.begin))
.detail("EndRange", printable(trRange.end))
.detail("StartIndex", start)
.detail("EndIndex", i)
.detail("DataSize", data.size())
.detail("Bytes", txBytes)
.detail("OriginalFileRange", printable(originalFileRange))
.detail("TaskInstance", (uint64_t)this);
2017-05-26 04:48:44 +08:00
// Commit succeeded, so advance starting point
start = i;
2017-05-26 04:48:44 +08:00
if(start == end)
return Void();
} catch(Error &e) {
TraceEvent(SevWarn, "FileRestoreErrorRangeWrite")
.detail("UID", restore.getUid())
.detail("FileName", rangeFile.fileName)
.detail("FileVersion", rangeFile.version)
.detail("FileSize", rangeFile.fileSize)
.detail("ReadOffset", readOffset)
.detail("ReadLen", readLen)
.detail("BeginRange", printable(trRange.begin))
.detail("EndRange", printable(trRange.end))
.detail("StartIndex", start)
.detail("EndIndex", i)
.detail("DataSize", data.size())
.detail("Bytes", txBytes)
.error(e)
.detail("TaskInstance", (uint64_t)this);
2017-05-26 04:48:44 +08:00
if(e.code() == error_code_transaction_too_large)
dataSizeLimit /= 2;
else
Void _ = wait(tr->onError(e));
2017-05-26 04:48:44 +08:00
}
}
}
ACTOR static Future<Void> _finish(Reference<ReadYourWritesTransaction> tr, Reference<TaskBucket> taskBucket, Reference<FutureBucket> futureBucket, Reference<Task> task) {
state RestoreConfig restore(task);
restore.fileBlocksFinished().atomicOp(tr, 1, MutationRef::Type::AddValue);
// Update the KV range map if originalFileRange is set
Future<Void> updateMap = Void();
if(Params.originalFileRange().exists(task)) {
Value versionEncoded = BinaryWriter::toValue(Params.inputFile().get(task).version, Unversioned());
2017-05-26 04:48:44 +08:00
updateMap = krmSetRange(tr, restore.applyMutationsMapPrefix(), Params.originalFileRange().get(task), versionEncoded);
}
state Reference<TaskFuture> taskFuture = futureBucket->unpack(task->params[Task::reservedTaskParamKeyDone]);
Void _ = wait(taskFuture->set(tr, taskBucket) &&
taskBucket->finish(tr, task) && updateMap);
return Void();
}
ACTOR static Future<Key> addTask(Reference<ReadYourWritesTransaction> tr, Reference<TaskBucket> taskBucket, Reference<Task> parentTask, RestoreFile rf, int64_t offset, int64_t len, TaskCompletionKey completionKey, Reference<TaskFuture> waitFor = Reference<TaskFuture>()) {
2017-05-26 04:48:44 +08:00
Key doneKey = wait(completionKey.get(tr, taskBucket));
state Reference<Task> task(new Task(RestoreRangeTaskFunc::name, RestoreRangeTaskFunc::version, doneKey));
// Create a restore config from the current task and bind it to the new task.
Void _ = wait(RestoreConfig(parentTask).toTask(tr, task));
Params.inputFile().set(task, rf);
Params.readOffset().set(task, offset);
Params.readLen().set(task, len);
2017-05-26 04:48:44 +08:00
if (!waitFor) {
return taskBucket->addTask(tr, task);
}
Void _ = wait(waitFor->onSetAddTask(tr, taskBucket, task));
return LiteralStringRef("OnSetAddTask");
}
static StringRef name;
static const uint32_t version;
StringRef getName() const { return name; };
Future<Void> execute(Database cx, Reference<TaskBucket> tb, Reference<FutureBucket> fb, Reference<Task> task) { return _execute(cx, tb, fb, task); };
Future<Void> finish(Reference<ReadYourWritesTransaction> tr, Reference<TaskBucket> tb, Reference<FutureBucket> fb, Reference<Task> task) { return _finish(tr, tb, fb, task); };
};
// Out-of-class definitions of RestoreRangeTaskFunc's static task name and version (the values
// handed to checkTaskVersion() and the Task constructor), followed by the REGISTER_TASKFUNC invocation.
// NOTE(review): REGISTER_TASKFUNC presumably makes the task type discoverable by name - confirm against the macro definition.
StringRef RestoreRangeTaskFunc::name = LiteralStringRef("restore_range_data");
const uint32_t RestoreRangeTaskFunc::version = 1;
REGISTER_TASKFUNC(RestoreRangeTaskFunc);
// Restore task that loads one block of a mutation log file into the database.
//   execute: reads and decodes the block and writes each entry under the restore's mutation log
//            prefix in size-limited transactions.
//   finish:  counts the block as finished and signals the task's done future.
struct RestoreLogDataTaskFunc : RestoreFileTaskFuncBase {
	static StringRef name;
	static const uint32_t version;

	StringRef getName() const {
		return name;
	}

	// No parameters beyond the shared file-block inputs.
	static struct : InputParams {
	} Params;

	ACTOR static Future<Void> _execute(Database cx, Reference<TaskBucket> taskBucket, Reference<FutureBucket> futureBucket, Reference<Task> task) {
		state double startTime = now();
		state RestoreConfig restore(task);

		state RestoreFile logFile = Params.inputFile().get(task);
		state int64_t readOffset = Params.readOffset().get(task);
		state int64_t readLen = Params.readLen().get(task);

		TraceEvent("FileRestoreLogStart")
			.detail("UID", restore.getUid())
			.detail("FileName", logFile.fileName)
			.detail("FileBeginVersion", logFile.version)
			.detail("FileEndVersion", logFile.endVersion)
			.detail("FileSize", logFile.fileSize)
			.detail("ReadOffset", readOffset)
			.detail("ReadLen", readLen)
			.detail("TaskInstance", (uint64_t)this);

		state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(cx));
		state Reference<IBackupContainer> bc;

		// Load the source container, validate the task version and confirm the task should keep running.
		loop {
			try {
				tr->reset();
				tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
				tr->setOption(FDBTransactionOptions::LOCK_AWARE);

				Reference<IBackupContainer> container = wait(restore.sourceContainer().getOrThrow(tr));
				bc = container;

				Void _ = wait(checkTaskVersion(tr->getDatabase(), task, name, version));

				bool keepGoing = wait(taskBucket->keepRunning(tr, task));
				if (!keepGoing)
					return Void();

				break;
			} catch (Error &e) {
				Void _ = wait(tr->onError(e));
			}
		}

		state Key mutationLogPrefix = restore.mutationLogPrefix();
		state Reference<IAsyncFile> inFile = wait(bc->readFile(logFile.fileName));
		state Standalone<VectorRef<KeyValueRef>> data = wait(decodeLogFileBlock(inFile, readOffset, readLen));

		state int start = 0;
		state int end = data.size();
		state int dataSizeLimit = BUGGIFY ? g_random->randomInt(256 * 1024, 10e6) : CLIENT_KNOBS->RESTORE_WRITE_TX_SIZE;

		// Write data[start, end) in transactions of roughly dataSizeLimit bytes each.
		loop {
			try {
				if (start == end)
					return Void();

				tr->reset();
				tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
				tr->setOption(FDBTransactionOptions::LOCK_AWARE);

				state int idx = start;
				state int txBytes = 0;
				for (; idx < end && txBytes < dataSizeLimit; ++idx) {
					Key prefixedKey = data[idx].key.withPrefix(mutationLogPrefix);
					ValueRef value = data[idx].value;
					tr->set(prefixedKey, value);
					txBytes += prefixedKey.expectedSize();
					txBytes += value.expectedSize();
				}

				// Started here and awaited after the keep-running check below
				state Future<Void> checkLock = checkDatabaseLock(tr, restore.getUid());

				// Save and extend this task periodically in case database is slow, which checks keepRunning, or check it directly.
				Future<bool> goFuture;
				if (now() - startTime > 30) {
					// TODO: Optionally, task params could be modified here to save progress so far.
					startTime = now();
					goFuture = taskBucket->saveAndExtend(tr, task);
				}
				else {
					goFuture = taskBucket->keepRunning(tr, task);
				}

				bool keepGoing = wait(goFuture);
				if (!keepGoing)
					return Void();

				Void _ = wait(checkLock);

				// Add to bytes written count
				restore.bytesWritten().atomicOp(tr, txBytes, MutationRef::Type::AddValue);

				Void _ = wait(tr->commit());

				TraceEvent("FileRestoreCommittedLog")
					.detail("UID", restore.getUid())
					.detail("FileName", logFile.fileName)
					.detail("FileBeginVersion", logFile.version)
					.detail("FileEndVersion", logFile.endVersion)
					.detail("FileSize", logFile.fileSize)
					.detail("ReadOffset", readOffset)
					.detail("ReadLen", readLen)
					.detail("CommitVersion", tr->getCommittedVersion())
					.detail("StartIndex", start)
					.detail("EndIndex", idx)
					.detail("DataSize", data.size())
					.detail("Bytes", txBytes)
					.detail("TaskInstance", (uint64_t)this);

				// Commit succeeded, so advance starting point
				start = idx;
			} catch (Error &e) {
				TraceEvent(SevWarn, "FileRestoreErrorLogWrite")
					.detail("UID", restore.getUid())
					.detail("FileName", logFile.fileName)
					.detail("FileBeginVersion", logFile.version)
					.detail("FileEndVersion", logFile.endVersion)
					.detail("FileSize", logFile.fileSize)
					.detail("ReadOffset", readOffset)
					.detail("ReadLen", readLen)
					.detail("StartIndex", start)
					.detail("EndIndex", idx)
					.detail("DataSize", data.size())
					.detail("Bytes", txBytes)
					.error(e)
					.detail("TaskInstance", (uint64_t)this);

				// A too-large transaction is retried with half the size limit; anything else goes through onError().
				if (e.code() == error_code_transaction_too_large) {
					dataSizeLimit /= 2;
				}
				else {
					Void _ = wait(tr->onError(e));
				}
			}
		}
	}

	ACTOR static Future<Void> _finish(Reference<ReadYourWritesTransaction> tr, Reference<TaskBucket> taskBucket, Reference<FutureBucket> futureBucket, Reference<Task> task) {
		RestoreConfig(task).fileBlocksFinished().atomicOp(tr, 1, MutationRef::Type::AddValue);

		state Reference<TaskFuture> taskFuture = futureBucket->unpack(task->params[Task::reservedTaskParamKeyDone]);

		// TODO: Check to see if there is a leak in the FutureBucket since an invalid task (validation key fails) will never set its taskFuture.
		Void _ = wait(taskFuture->set(tr, taskBucket) &&
		              taskBucket->finish(tr, task));

		return Void();
	}

	// Creates a RestoreLogDataTaskFunc task for the block [offset, offset + len) of log file lf,
	// bound to the restore config of parentTask. Without waitFor the task is added directly and the
	// key from taskBucket->addTask() is returned; with waitFor the add is deferred via
	// onSetAddTask() and the literal key "OnSetAddTask" is returned.
	ACTOR static Future<Key> addTask(Reference<ReadYourWritesTransaction> tr, Reference<TaskBucket> taskBucket, Reference<Task> parentTask, RestoreFile lf, int64_t offset, int64_t len, TaskCompletionKey completionKey, Reference<TaskFuture> waitFor = Reference<TaskFuture>()) {
		Key doneKey = wait(completionKey.get(tr, taskBucket));
		state Reference<Task> task(new Task(RestoreLogDataTaskFunc::name, RestoreLogDataTaskFunc::version, doneKey));

		// Create a restore config from the current task and bind it to the new task.
		Void _ = wait(RestoreConfig(parentTask).toTask(tr, task));

		Params.inputFile().set(task, lf);
		Params.readOffset().set(task, offset);
		Params.readLen().set(task, len);

		if (!waitFor)
			return taskBucket->addTask(tr, task);

		Void _ = wait(waitFor->onSetAddTask(tr, taskBucket, task));
		return LiteralStringRef("OnSetAddTask");
	}

	Future<Void> execute(Database cx, Reference<TaskBucket> tb, Reference<FutureBucket> fb, Reference<Task> task) {
		return _execute(cx, tb, fb, task);
	}

	Future<Void> finish(Reference<ReadYourWritesTransaction> tr, Reference<TaskBucket> tb, Reference<FutureBucket> fb, Reference<Task> task) {
		return _finish(tr, tb, fb, task);
	}
};
// Out-of-class definitions of RestoreLogDataTaskFunc's static task name and version (the values
// handed to checkTaskVersion() and the Task constructor), followed by the REGISTER_TASKFUNC invocation.
// NOTE(review): REGISTER_TASKFUNC presumably makes the task type discoverable by name - confirm against the macro definition.
StringRef RestoreLogDataTaskFunc::name = LiteralStringRef("restore_log_data");
const uint32_t RestoreLogDataTaskFunc::version = 1;
REGISTER_TASKFUNC(RestoreLogDataTaskFunc);
struct RestoreDispatchTaskFunc : RestoreTaskFuncBase {
2017-05-26 04:48:44 +08:00
static StringRef name;
static const uint32_t version;
StringRef getName() const { return name; };
static struct {
static TaskParam<Version> beginVersion() { return LiteralStringRef(__FUNCTION__); }
static TaskParam<std::string> beginFile() { return LiteralStringRef(__FUNCTION__); }
2017-05-26 04:48:44 +08:00
static TaskParam<int64_t> beginBlock() { return LiteralStringRef(__FUNCTION__); }
static TaskParam<int64_t> batchSize() { return LiteralStringRef(__FUNCTION__); }
static TaskParam<int64_t> remainingInBatch() { return LiteralStringRef(__FUNCTION__); }
} Params;
ACTOR static Future<Void> _finish(Reference<ReadYourWritesTransaction> tr, Reference<TaskBucket> taskBucket, Reference<FutureBucket> futureBucket, Reference<Task> task) {
state RestoreConfig restore(task);
state Version beginVersion = Params.beginVersion().get(task);
state Reference<TaskFuture> onDone = futureBucket->unpack(task->params[Task::reservedTaskParamKeyDone]);
2017-05-26 04:48:44 +08:00
Void _ = wait(checkTaskVersion(tr->getDatabase(), task, name, version));
state int64_t remainingInBatch = Params.remainingInBatch().get(task);
state bool addingToExistingBatch = remainingInBatch > 0;
2017-05-26 04:48:44 +08:00
// If not adding to an existing batch then update the apply mutations end version so the mutations from the
// previous batch can be applied.
if(!addingToExistingBatch)
restore.setApplyEndVersion(tr, beginVersion);
2017-05-26 04:48:44 +08:00
// Get the apply version lag
state int64_t applyLag = wait(restore.getApplyVersionLag(tr));
state int64_t batchSize = Params.batchSize().get(task);
2017-05-26 04:48:44 +08:00
// If starting a new batch and the apply lag is too large then re-queue and wait
if(!addingToExistingBatch && applyLag > (BUGGIFY ? 1 : CLIENT_KNOBS->CORE_VERSIONSPERSECOND * 300)) {
// Wait a small amount of time and then re-add this same task.
Void _ = wait(delay(FLOW_KNOBS->PREVENT_FAST_SPIN_DELAY));
Key _ = wait(RestoreDispatchTaskFunc::addTask(tr, taskBucket, task, beginVersion, "", 0, batchSize, remainingInBatch));
TraceEvent("FileRestoreDispatch")
.detail("UID", restore.getUid())
.detail("BeginVersion", beginVersion)
.detail("ApplyLag", applyLag)
.detail("BatchSize", batchSize)
.detail("Decision", "too_far_behind")
.detail("TaskInstance", (uint64_t)this);
Void _ = wait(taskBucket->finish(tr, task));
return Void();
}
state std::string beginFile = Params.beginFile().getOrDefault(task);
// Get a batch of files. We're targeting batchSize blocks being dispatched so query for batchSize files (each of which is 0 or more blocks).
state RestoreConfig::FileSetT::Values files = wait(restore.fileSet().getRange(tr, {beginVersion, beginFile}, {}, CLIENT_KNOBS->RESTORE_DISPATCH_ADDTASK_SIZE));
// allPartsDone will be set once all block tasks in the current batch are finished.
state Reference<TaskFuture> allPartsDone;
// If adding to existing batch then join the new block tasks to the existing batch future
if(addingToExistingBatch) {
Key fKey = wait(restore.batchFuture().getD(tr));
allPartsDone = Reference<TaskFuture>(new TaskFuture(futureBucket, fKey));
}
else {
// Otherwise create a new future for the new batch
allPartsDone = futureBucket->future(tr);
restore.batchFuture().set(tr, allPartsDone->pack());
// Set batch quota remaining to batch size
remainingInBatch = batchSize;
}
// If there were no files to load then this batch is done and restore is almost done.
if(files.size() == 0) {
state Version restoreVersion = wait(restore.restoreVersion().getD(tr));
// If adding to existing batch then blocks could be in progress so create a new Dispatch task that waits for them to finish
if(addingToExistingBatch) {
Key _ = wait(RestoreDispatchTaskFunc::addTask(tr, taskBucket, task, restoreVersion, "", 0, batchSize, 0, TaskCompletionKey::noSignal(), allPartsDone));
2017-05-26 04:48:44 +08:00
TraceEvent("FileRestoreDispatch")
.detail("UID", restore.getUid())
.detail("BeginVersion", beginVersion)
.detail("BeginFile", Params.beginFile().get(task))
.detail("BeginBlock", Params.beginBlock().get(task))
.detail("RestoreVersion", restoreVersion)
2017-05-26 04:48:44 +08:00
.detail("ApplyLag", applyLag)
.detail("Decision", "end_of_final_batch")
2017-05-26 04:48:44 +08:00
.detail("TaskInstance", (uint64_t)this);
}
else if(beginVersion < restoreVersion) {
// If beginVersion is less than restoreVersion then do one more dispatch task to get there
Key _ = wait(RestoreDispatchTaskFunc::addTask(tr, taskBucket, task, restoreVersion, "", 0, batchSize));
2017-05-26 04:48:44 +08:00
TraceEvent("FileRestoreDispatch")
.detail("UID", restore.getUid())
.detail("BeginVersion", beginVersion)
.detail("BeginFile", Params.beginFile().get(task))
.detail("BeginBlock", Params.beginBlock().get(task))
.detail("RestoreVersion", restoreVersion)
.detail("ApplyLag", applyLag)
.detail("Decision", "apply_to_restore_version")
.detail("TaskInstance", (uint64_t)this);
2017-05-26 04:48:44 +08:00
}
else if(applyLag == 0) {
// If apply lag is 0 then we are done so create the completion task
Key _ = wait(RestoreCompleteTaskFunc::addTask(tr, taskBucket, task, TaskCompletionKey::noSignal()));
2017-05-26 04:48:44 +08:00
TraceEvent("FileRestoreDispatch")
.detail("UID", restore.getUid())
.detail("BeginVersion", beginVersion)
.detail("BeginFile", Params.beginFile().get(task))
.detail("BeginBlock", Params.beginBlock().get(task))
.detail("ApplyLag", applyLag)
.detail("Decision", "restore_complete")
.detail("TaskInstance", (uint64_t)this);
} else {
// Applying of mutations is not yet finished so wait a small amount of time and then re-add this same task.
Void _ = wait(delay(FLOW_KNOBS->PREVENT_FAST_SPIN_DELAY));
Key _ = wait(RestoreDispatchTaskFunc::addTask(tr, taskBucket, task, beginVersion, "", 0, batchSize));
2017-05-26 04:48:44 +08:00
TraceEvent("FileRestoreDispatch")
.detail("UID", restore.getUid())
.detail("BeginVersion", beginVersion)
.detail("ApplyLag", applyLag)
.detail("Decision", "apply_still_behind")
.detail("TaskInstance", (uint64_t)this);
2017-05-26 04:48:44 +08:00
}
// If adding to existing batch then task is joined with a batch future so set done future
// Note that this must be done after joining at least one task with the batch future in case all other blockers already finished.
Future<Void> setDone = addingToExistingBatch ? onDone->set(tr, taskBucket) : Void();
2017-05-26 04:48:44 +08:00
Void _ = wait(taskBucket->finish(tr, task) && setDone);
return Void();
}
2017-05-26 04:48:44 +08:00
// Start moving through the file list and queuing up blocks. Only queue up to RESTORE_DISPATCH_ADDTASK_SIZE blocks per Dispatch task
// and target batchSize total per batch but a batch must end on a complete version boundary so exceed the limit if necessary
// to reach the end of a version of files.
state std::vector<Future<Key>> addTaskFutures;
state Version endVersion = files[0].version;
state int blocksDispatched = 0;
state int64_t beginBlock = Params.beginBlock().getOrDefault(task);
state int i = 0;
for(; i < files.size(); ++i) {
RestoreConfig::RestoreFile &f = files[i];
// Here we are "between versions" (prior to adding the first block of the first file of a new version) so this is an opportunity
// to end the current dispatch batch (which must end on a version boundary) if the batch size has been reached or exceeded
if(f.version != endVersion && remainingInBatch <= 0) {
// Next start will be at the first version after endVersion at the first file first block
++endVersion;
beginFile = "";
beginBlock = 0;
break;
}
2017-05-26 04:48:44 +08:00
// Set the starting point for the next task in case we stop inside this file
endVersion = f.version;
beginFile = f.fileName;
2017-05-26 04:48:44 +08:00
state int64_t j = beginBlock * f.blockSize;
// For each block of the file
for(; j < f.fileSize; j += f.blockSize) {
2017-05-26 04:48:44 +08:00
// Stop if we've reached the addtask limit
if(blocksDispatched == CLIENT_KNOBS->RESTORE_DISPATCH_ADDTASK_SIZE)
break;
if(f.isRange) {
addTaskFutures.push_back(RestoreRangeTaskFunc::addTask(tr, taskBucket, task,
f, j, std::min<int64_t>(f.blockSize, f.fileSize - j),
TaskCompletionKey::joinWith(allPartsDone)));
}
else {
addTaskFutures.push_back(RestoreLogDataTaskFunc::addTask(tr, taskBucket, task,
f, j, std::min<int64_t>(f.blockSize, f.fileSize - j),
TaskCompletionKey::joinWith(allPartsDone)));
2017-05-26 04:48:44 +08:00
}
// Increment beginBlock for the file and total blocks dispatched for this task
++beginBlock;
++blocksDispatched;
--remainingInBatch;
2017-05-26 04:48:44 +08:00
}
// Stop if we've reached the addtask limit
if(blocksDispatched == CLIENT_KNOBS->RESTORE_DISPATCH_ADDTASK_SIZE)
break;
2017-05-26 04:48:44 +08:00
// We just completed an entire file so the next task should start at the file after this one within endVersion (or later)
// if this iteration ends up being the last for this task
beginFile = beginFile + '\x00';
beginBlock = 0;
2017-05-26 04:48:44 +08:00
//TraceEvent("FileRestoreDispatchedFile").detail("UID", restore.getUid()).detail("FileName", fi.filename).detail("TaskInstance", (uint64_t)this);
}
2017-05-26 04:48:44 +08:00
// If no blocks were dispatched then the next dispatch task should run now and be joined with the allPartsDone future
if(blocksDispatched == 0) {
std::string decision;
2017-05-26 04:48:44 +08:00
// If no files were dispatched either then the batch size wasn't large enough to catch all of the files at the next lowest non-dispatched
// version, so increase the batch size.
if(i == 0) {
batchSize *= 2;
decision = "increased_batch_size";
}
else
decision = "all_files_were_empty";
2017-05-26 04:48:44 +08:00
TraceEvent("FileRestoreDispatch")
.detail("UID", restore.getUid())
2017-05-26 04:48:44 +08:00
.detail("BeginVersion", beginVersion)
.detail("BeginFile", Params.beginFile().get(task))
2017-05-26 04:48:44 +08:00
.detail("BeginBlock", Params.beginBlock().get(task))
.detail("EndVersion", endVersion)
.detail("ApplyLag", applyLag)
.detail("BatchSize", batchSize)
.detail("Decision", decision)
2017-05-26 04:48:44 +08:00
.detail("TaskInstance", (uint64_t)this)
.detail("RemainingInBatch", remainingInBatch);
Void _ = wait(success(RestoreDispatchTaskFunc::addTask(tr, taskBucket, task, endVersion, beginFile, beginBlock, batchSize, remainingInBatch, TaskCompletionKey::joinWith((allPartsDone)))));
// If adding to existing batch then task is joined with a batch future so set done future.
// Note that this must be done after joining at least one task with the batch future in case all other blockers already finished.
Future<Void> setDone = addingToExistingBatch ? onDone->set(tr, taskBucket) : Void();
Void _ = wait(setDone && taskBucket->finish(tr, task));
return Void();
2017-05-26 04:48:44 +08:00
}
// If adding to existing batch then task is joined with a batch future so set done future.
Future<Void> setDone = addingToExistingBatch ? onDone->set(tr, taskBucket) : Void();
// Increment the number of blocks dispatched in the restore config
restore.filesBlocksDispatched().atomicOp(tr, blocksDispatched, MutationRef::Type::AddValue);
// If beginFile is not empty then we had to stop in the middle of a version (possibly within a file) so we cannot end
// the batch here because we do not know if we got all of the files and blocks from the last version queued, so
// make sure remainingInBatch is at least 1.
if(beginFile.size() != 0)
remainingInBatch = std::max<int64_t>(1, remainingInBatch);
// If more blocks need to be dispatched in this batch then add a follow-on task that is part of the allPartsDone group, which won't wait
// to run and will add more block tasks.
if(remainingInBatch > 0)
addTaskFutures.push_back(RestoreDispatchTaskFunc::addTask(tr, taskBucket, task, endVersion, beginFile, beginBlock, batchSize, remainingInBatch, TaskCompletionKey::joinWith(allPartsDone)));
else // Otherwise, add a follow-on task to continue after all previously dispatched blocks are done
addTaskFutures.push_back(RestoreDispatchTaskFunc::addTask(tr, taskBucket, task, endVersion, beginFile, beginBlock, batchSize, 0, TaskCompletionKey::noSignal(), allPartsDone));
Void _ = wait(setDone && waitForAll(addTaskFutures) && taskBucket->finish(tr, task));
TraceEvent("FileRestoreDispatch")
.detail("BeginVersion", beginVersion)
.detail("BeginFile", Params.beginFile().get(task))
.detail("BeginBlock", Params.beginBlock().get(task))
.detail("EndVersion", endVersion)
.detail("ApplyLag", applyLag)
.detail("BatchSize", batchSize)
.detail("Decision", "dispatched_files")
.detail("FilesDispatched", i)
.detail("BlocksDispatched", blocksDispatched)
.detail("TaskInstance", (uint64_t)this)
.detail("RemainingInBatch", remainingInBatch);
2017-05-26 04:48:44 +08:00
return Void();
}
// Builds a new RestoreDispatchTaskFunc task bound to the same restore config as parentTask and schedules it.
//   beginVersion, beginFile, beginBlock - starting position stored on the new task
//   batchSize, remainingInBatch         - batch parameters stored on the new task
//   completionKey                       - resolved to the done key handed to the new task
//   waitFor                             - when set, the task is added through waitFor->onSetAddTask() rather than directly
// Returns the result of taskBucket->addTask(), or the literal "OnSetAddTask" when waitFor is used.
ACTOR static Future<Key> addTask(Reference<ReadYourWritesTransaction> tr, Reference<TaskBucket> taskBucket, Reference<Task> parentTask, Version beginVersion, std::string beginFile, int64_t beginBlock, int64_t batchSize, int64_t remainingInBatch = 0, TaskCompletionKey completionKey = TaskCompletionKey::noSignal(), Reference<TaskFuture> waitFor = Reference<TaskFuture>()) {
	Key completionDoneKey = wait(completionKey.get(tr, taskBucket));

	// Dispatch tasks that still have to queue more blocks for the current batch get high priority
	unsigned int taskPriority = 0;
	if(remainingInBatch > 0) {
		taskPriority = 1;
	}

	state Reference<Task> dispatchTask(new Task(RestoreDispatchTaskFunc::name, RestoreDispatchTaskFunc::version, completionDoneKey, taskPriority));

	// Bind the parent task's restore config to the new task
	Void _ = wait(RestoreConfig(parentTask).toTask(tr, dispatchTask));

	// Record where the new task starts and how much of the batch is left
	Params.beginVersion().set(dispatchTask, beginVersion);
	Params.beginFile().set(dispatchTask, beginFile);
	Params.beginBlock().set(dispatchTask, beginBlock);
	Params.batchSize().set(dispatchTask, batchSize);
	Params.remainingInBatch().set(dispatchTask, remainingInBatch);

	if(waitFor) {
		// Defer adding the task until the waitFor future is set
		Void _ = wait(waitFor->onSetAddTask(tr, taskBucket, dispatchTask));
		return LiteralStringRef("OnSetAddTask");
	}

	return taskBucket->addTask(tr, dispatchTask);
}
// This task type does nothing in execute(); it returns an already-completed Void.
Future<Void> execute(Database cx, Reference<TaskBucket> tb, Reference<FutureBucket> fb, Reference<Task> task) {
	return Void();
}
// Forwards to the _finish() actor with the same arguments.
Future<Void> finish(Reference<ReadYourWritesTransaction> tr, Reference<TaskBucket> tb, Reference<FutureBucket> fb, Reference<Task> task) {
	return _finish(tr, tb, fb, task);
}
};
// Name given to every Task created by RestoreDispatchTaskFunc::addTask().
StringRef RestoreDispatchTaskFunc::name = LiteralStringRef("restore_dispatch");
// Version given to every Task created by RestoreDispatchTaskFunc::addTask().
const uint32_t RestoreDispatchTaskFunc::version = 1;
// NOTE(review): presumably registers this task function so tasks can be matched to it by name - confirm against the macro definition.
REGISTER_TASKFUNC(RestoreDispatchTaskFunc);
// Returns the concatenated full status text of the restore on tagName, or of every restore tag
// when tagName is empty. Each restore's text is followed by a blank line.
ACTOR Future<std::string> restoreStatus(Reference<ReadYourWritesTransaction> tr, Key tagName) {
	tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
	tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
	tr->setOption(FDBTransactionOptions::LOCK_AWARE);

	state std::vector<KeyBackedTag> tags;

	if(tagName.size() != 0) {
		// A single, explicitly named tag
		tags.push_back(makeRestoreTag(tagName.toString()));
	}
	else {
		// No tag given, so report on all of them
		std::vector<KeyBackedTag> allTags = wait(getAllRestoreTags(tr));
		tags = allTags;
	}

	state std::string result;
	state int i = 0;

	while(i < tags.size()) {
		UidAndAbortedFlagT uidAndFlag = wait(tags[i].getD(tr));
		std::string fullStatus = wait(RestoreConfig(uidAndFlag.first).getFullStatus(tr));
		result += fullStatus;
		result += "\n\n";
		++i;
	}

	return result;
}
// Aborts the restore on tagName within transaction tr.
// Returns UNITIALIZED if the tag does not exist, the restore's current state if it is not
// runnable, and ABORTED after marking a runnable restore as aborted.
ACTOR Future<ERestoreState> abortRestore(Reference<ReadYourWritesTransaction> tr, Key tagName) {
	tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
	tr->setOption(FDBTransactionOptions::LOCK_AWARE);

	state KeyBackedTag restoreTag = makeRestoreTag(tagName.toString());
	state Optional<UidAndAbortedFlagT> tagValue = wait(restoreTag.get(tr));

	if(!tagValue.present()) {
		return ERestoreState::UNITIALIZED;
	}

	state RestoreConfig config(tagValue.get().first);
	state ERestoreState currentState = wait(config.stateEnum().getD(tr));
	state bool stillRunnable = wait(config.isRunnable(tr));

	if(stillRunnable) {
		config.stateEnum().set(tr, ERestoreState::ABORTED);

		// Remove all of the ApplyMutations keys
		config.clearApplyMutationsKeys(tr);

		// Cancel the tasks on this tag, then release the database lock held under the restore's uid
		Void _ = wait(restoreTag.cancel(tr));
		Void _ = wait(unlockDatabase(tr, tagValue.get().first));

		return ERestoreState::ABORTED;
	}

	// Nothing to abort; report the state as it is
	return currentState;
}
// Task that starts a full restore. _execute() loads the list of backup files needed for the
// restore version into the restore config; _finish() then moves the restore to RUNNING and
// queues the first RestoreDispatchTaskFunc task.
struct StartFullRestoreTaskFunc : RestoreTaskFuncBase {
	static StringRef name;
	static const uint32_t version;

	static struct {
		// Set by _execute() from the first entry of the file list and read back by _finish()
		static TaskParam<Version> firstVersion() { return LiteralStringRef(__FUNCTION__); }
	} Params;

	// Moves the restore to STARTING, fetches the restorable file set for the restore version from the
	// source container, and writes the files into the restore config in transactions of roughly 1e6 bytes.
	// Throws restore_missing_data if the container has no restorable set for the restore version.
	ACTOR static Future<Void> _execute(Database cx, Reference<TaskBucket> taskBucket, Reference<FutureBucket> futureBucket, Reference<Task> task) {
		state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(cx));
		state RestoreConfig restore(task);
		state Version restoreVersion;
		state Reference<IBackupContainer> bc;

		// First transaction: validate state, reset the file bookkeeping and read the restore parameters
		loop {
			try {
				tr->reset();
				tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
				tr->setOption(FDBTransactionOptions::LOCK_AWARE);

				Void _ = wait(checkTaskVersion(tr->getDatabase(), task, name, version));
				Version _restoreVersion = wait(restore.restoreVersion().getOrThrow(tr));
				restoreVersion = _restoreVersion;

				bool go = wait(taskBucket->keepRunning(tr, task));
				if(!go)
					return Void();

				ERestoreState oldState = wait(restore.stateEnum().getD(tr));
				if(oldState != ERestoreState::QUEUED && oldState != ERestoreState::STARTING) {
					// Cast explicitly: an enumeration value should not be handed directly to a %d varargs slot
					Void _ = wait(restore.logError(cx, restore_error(), format("StartFullRestore: Encountered unexpected state(%d)", static_cast<int>(oldState)), this));
					return Void();
				}
				restore.stateEnum().set(tr, ERestoreState::STARTING);
				restore.fileSet().clear(tr);
				restore.fileBlockCount().clear(tr);
				restore.fileCount().clear(tr);
				Reference<IBackupContainer> _bc = wait(restore.sourceContainer().getOrThrow(tr));
				bc = _bc;

				Void _ = wait(tr->commit());
				break;
			} catch(Error &e) {
				Void _ = wait(tr->onError(e));
			}
		}

		Optional<RestorableFileSet> restorable = wait(bc->getRestoreSet(restoreVersion));

		if(!restorable.present())
			throw restore_missing_data();

		// Convert the two lists in restorable (logs and ranges) to a single list of RestoreFiles.
		// Order does not matter, they will be put in order when written to the restoreFileMap below.
		state std::vector<RestoreConfig::RestoreFile> files;

		for(const RangeFile &f : restorable.get().ranges) {
			files.push_back({f.version, f.fileName, true, f.blockSize, f.fileSize});
		}
		for(const LogFile &f : restorable.get().logs) {
			files.push_back({f.beginVersion, f.fileName, false, f.blockSize, f.fileSize});
		}

		state std::vector<RestoreConfig::RestoreFile>::iterator start = files.begin();
		state std::vector<RestoreConfig::RestoreFile>::iterator end = files.end();

		// Write the file list in chunks; each pass of the loop is one transaction starting at 'start'
		while(start != end) {
			try {
				tr->reset();
				tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
				tr->setOption(FDBTransactionOptions::LOCK_AWARE);

				bool go = wait(taskBucket->keepRunning(tr, task));
				if(!go)
					return Void();

				state std::vector<RestoreConfig::RestoreFile>::iterator i = start;

				state int txBytes = 0;
				state int nFileBlocks = 0;
				state int nFiles = 0;
				auto fileSet = restore.fileSet();

				for(; i != end && txBytes < 1e6; ++i) {
					// Remember the version of the very first file in the list for _finish()
					if(i == files.begin())
						Params.firstVersion().set(task, i->version);

					// Stop the whole load at the first file beyond the restore version
					if(i->version > restoreVersion) {
						end = i;
						break;
					}

					txBytes += fileSet.insert(tr, *i);
					// Number of blocks in the file, rounding a partial last block up
					nFileBlocks += (i->fileSize + i->blockSize - 1) / i->blockSize;
					++nFiles;
				}

				// Increment counts
				restore.fileCount().atomicOp(tr, nFiles, MutationRef::Type::AddValue);
				restore.fileBlockCount().atomicOp(tr, nFileBlocks, MutationRef::Type::AddValue);

				Void _ = wait(tr->commit());

				TraceEvent("FileRestoreLoadedFiles")
					.detail("UID", restore.getUid())
					.detail("FileCount", nFiles)
					.detail("FileBlockCount", nFileBlocks)
					.detail("Bytes", txBytes)
					.detail("TaskInstance", (uint64_t)this);

				// Only advance once the chunk is committed; on error the same chunk is retried
				start = i;
			} catch(Error &e) {
				Void _ = wait(tr->onError(e));
			}
		}

		return Void();
	}

	// Completes the start task inside tr. If no first version was recorded the restore is logged as
	// having no data and aborted; otherwise the restore is set to RUNNING, the apply-mutation
	// versions are initialised to firstVersion and the first dispatch task is queued.
	ACTOR static Future<Void> _finish(Reference<ReadYourWritesTransaction> tr, Reference<TaskBucket> taskBucket, Reference<FutureBucket> futureBucket, Reference<Task> task) {
		// getOrDefault() so that a task on which _execute() never set firstVersion yields Version(),
		// which is exactly the "no data" case tested below.
		state Version firstVersion = Params.firstVersion().getOrDefault(task);
		state RestoreConfig restore(task);

		if(firstVersion == Version()) {
			Void _ = wait(restore.logError(tr->getDatabase(), restore_missing_data(), "StartFullRestore: The backup had no data.", this));
			std::string tag = wait(restore.tag().getD(tr));
			ERestoreState _ = wait(abortRestore(tr, StringRef(tag)));
			return Void();
		}

		restore.stateEnum().set(tr, ERestoreState::RUNNING);

		// Set applyMutation versions
		restore.setApplyBeginVersion(tr, firstVersion);
		restore.setApplyEndVersion(tr, firstVersion);

		// Apply range data and log data in order
		Key _ = wait(RestoreDispatchTaskFunc::addTask(tr, taskBucket, task, firstVersion, "", 0, CLIENT_KNOBS->RESTORE_DISPATCH_BATCH_SIZE));

		Void _ = wait(taskBucket->finish(tr, task));
		return Void();
	}

	// Creates a StartFullRestoreTaskFunc task bound to the restore config identified by uid and schedules it.
	// When waitFor is set the task is added through waitFor->onSetAddTask() and the literal
	// "OnSetAddTask" is returned; otherwise the result of taskBucket->addTask() is returned.
	ACTOR static Future<Key> addTask(Reference<ReadYourWritesTransaction> tr, Reference<TaskBucket> taskBucket, UID uid, TaskCompletionKey completionKey, Reference<TaskFuture> waitFor = Reference<TaskFuture>())
	{
		tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
		tr->setOption(FDBTransactionOptions::LOCK_AWARE);

		Key doneKey = wait(completionKey.get(tr, taskBucket));
		state Reference<Task> task(new Task(StartFullRestoreTaskFunc::name, StartFullRestoreTaskFunc::version, doneKey));

		state RestoreConfig restore(uid);
		// Bind the restore config to the new task
		Void _ = wait(restore.toTask(tr, task));

		if (!waitFor) {
			return taskBucket->addTask(tr, task);
		}

		Void _ = wait(waitFor->onSetAddTask(tr, taskBucket, task));
		return LiteralStringRef("OnSetAddTask");
	}

	StringRef getName() const { return name; };

	Future<Void> execute(Database cx, Reference<TaskBucket> tb, Reference<FutureBucket> fb, Reference<Task> task) { return _execute(cx, tb, fb, task); };
	Future<Void> finish(Reference<ReadYourWritesTransaction> tr, Reference<TaskBucket> tb, Reference<FutureBucket> fb, Reference<Task> task) { return _finish(tr, tb, fb, task); };
};
// Name given to every Task created by StartFullRestoreTaskFunc::addTask() and checked by _execute() via checkTaskVersion().
StringRef StartFullRestoreTaskFunc::name = LiteralStringRef("restore_start");
// Version given to every Task created by StartFullRestoreTaskFunc::addTask() and checked by _execute() via checkTaskVersion().
const uint32_t StartFullRestoreTaskFunc::version = 1;
// NOTE(review): presumably registers this task function so tasks can be matched to it by name - confirm against the macro definition.
REGISTER_TASKFUNC(StartFullRestoreTaskFunc);
}
// Reference-counted record holding a file name, a file handle, a begin/end version pair and an offset.
// NOTE(review): presumably describes one backup log file and a read position within it - confirm against its users.
struct LogInfo : public ReferenceCounted<LogInfo> {
	std::string fileName;
	Reference<IAsyncFile> logFile;
	Version beginVersion;
	Version endVersion;
	int64_t offset;

	// Give every scalar member a defined value so that a freshly constructed LogInfo never
	// exposes indeterminate versions.
	LogInfo() : beginVersion(0), endVersion(0), offset(0) {}
};
class FileBackupAgentImpl {
public:
static const int MAX_RESTORABLE_FILE_METASECTION_BYTES = 1024 * 8;
// Waits on the backup identified by tagName and returns its state as soon as one of these holds:
//   - the tag does not exist (returns STATE_NEVERRAN)
//   - the backup is no longer runnable
//   - the backup is in STATE_DIFFERENTIAL and stopWhenDone is false
// Otherwise a watch is placed on the backup's state key and the check repeats when it fires.
ACTOR static Future<int> waitBackup(FileBackupAgent* backupAgent, Database cx, std::string tagName, bool stopWhenDone) {
	state KeyBackedTag tag = makeBackupTag(tagName);

	loop {
		// A fresh transaction is used for every pass of the loop
		state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(cx));
		tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
		tr->setOption(FDBTransactionOptions::LOCK_AWARE);

		try {
			state Optional<UidAndAbortedFlagT> oldUidAndAborted = wait(tag.get(tr));
			if (!oldUidAndAborted.present()) {
				return EBackupState::STATE_NEVERRAN;
			}

			state BackupConfig config(oldUidAndAborted.get().first);
			state EBackupState status = wait(config.stateEnum().getD(tr, EBackupState::STATE_NEVERRAN));

			// Break, if no longer runnable
			if (!FileBackupAgent::isRunnable(status)) {
				return status;
			}

			// Break, if in differential mode (restorable) and stopWhenDone is not enabled
			if ((!stopWhenDone) && (BackupAgentBase::STATE_DIFFERENTIAL == status)) {
				return status;
			}

			// Register the watch, commit so it takes effect, then sleep until the state key changes
			state Future<Void> watchFuture = tr->watch( config.stateEnum().key );
			Void _ = wait( tr->commit() );
			Void _ = wait( watchFuture );
		}
		catch (Error &e) {
			Void _ = wait(tr->onError(e));
		}
	}
}
// Records a new backup for tagName in transaction tr and queues its start task. tr is not committed here.
//   outContainer - container URL; for "file://" URLs a "backup-<timestamp>" component is appended
//   backupRanges - key ranges to back up; they are coalesced before being stored
//   stopWhenDone - stored in the backup config as given
// Throws backup_duplicate if the tag's existing backup is still runnable, and backup_error if the
// container cannot be created within 30 seconds or the last recorded backup timestamp is not
// earlier than the current time.
ACTOR static Future<Void> submitBackup(FileBackupAgent* backupAgent, Reference<ReadYourWritesTransaction> tr, Key outContainer, std::string tagName, Standalone<VectorRef<KeyRangeRef>> backupRanges, bool stopWhenDone) {
	tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
	tr->setOption(FDBTransactionOptions::LOCK_AWARE);

	TraceEvent(SevInfo, "FBA_submitBackup")
		.detail("tagName", tagName.c_str())
		.detail("stopWhenDone", stopWhenDone)
		.detail("outContainer", outContainer.toString());

	state KeyBackedTag tag = makeBackupTag(tagName);
	Optional<UidAndAbortedFlagT> uidAndAbortedFlag = wait(tag.get(tr));
	if (uidAndAbortedFlag.present()) {
		state BackupConfig prevConfig(uidAndAbortedFlag.get().first);
		state EBackupState prevBackupStatus = wait(prevConfig.stateEnum().getD(tr, EBackupState::STATE_NEVERRAN));
		if (FileBackupAgent::isRunnable(prevBackupStatus)) {
			throw backup_duplicate();
		}

		// Now is time to clear prev backup config space. We have no more use for it.
		prevConfig.clear(tr);
	}

	state BackupConfig config(g_random->randomUniqueID());
	state UID uid = config.getUid();

	state Standalone<StringRef> nowStr = BackupAgentBase::getCurrentTime();
	state std::string backupContainer = outContainer.toString();

	// To be consistent with directory handling behavior since FDB backup was first released, if the container string
	// describes a local directory then "/backup-<timestamp>" will be added to it.
	if(backupContainer.find("file://") == 0) {
		backupContainer = joinPath(backupContainer, std::string("backup-") + nowStr.toString());
	}

	state Reference<IBackupContainer> bc = IBackupContainer::openContainer(backupContainer);
	try {
		Void _ = wait(timeoutError(bc->create(), 30));
	} catch(Error &e) {
		// Cancellation of this actor is not a container failure: propagate it untouched instead of
		// printing a misleading error and converting it to backup_error.
		if(e.code() == error_code_actor_cancelled)
			throw;
		fprintf(stderr, "ERROR: Could not create backup container: %s\n", e.what());
		throw backup_error();
	}

	// This check ensures that the current backup's timestamp is later than that of the last backup
	Optional<Value> lastBackupTimestamp = wait(backupAgent->lastBackupTimestamp().get(tr));

	if ((lastBackupTimestamp.present()) && (lastBackupTimestamp.get() >= nowStr)) {
		fprintf(stderr, "ERROR: The last backup `%s' happened in the future.\n", printable(lastBackupTimestamp.get()).c_str());
		throw backup_error();
	}

	// Merge the requested ranges so that the stored list has no overlapping or adjacent duplicates
	KeyRangeMap<int> backupRangeSet;
	for (auto& backupRange : backupRanges) {
		backupRangeSet.insert(backupRange, 1);
	}

	backupRangeSet.coalesce(allKeys);
	state std::vector<KeyRange> normalizedRanges;

	for (auto& backupRange : backupRangeSet.ranges()) {
		// Only ranges marked with 1 above were requested
		if (backupRange.value()) {
			normalizedRanges.push_back(KeyRange(KeyRangeRef(backupRange.range().begin, backupRange.range().end)));
		}
	}

	config.clear(tr);

	// Point the tag to this new uid
	tag.set(tr, {uid, false});

	backupAgent->lastBackupTimestamp().set(tr, nowStr);

	// Set the backup keys
	config.tag().set(tr, tagName);
	config.stateEnum().set(tr, EBackupState::STATE_SUBMITTED);
	config.backupContainer().set(tr, bc);
	config.stopWhenDone().set(tr, stopWhenDone);
	config.backupRanges().set(tr, normalizedRanges);

	Key taskKey = wait(fileBackup::StartFullBackupTaskFunc::addTask(tr, backupAgent->taskBucket, uid, TaskCompletionKey::noSignal()));

	return Void();
}
// Records a new restore for tagName in transaction tr and queues its start task. tr is not committed here.
// Returns without changes if the tag already points at uid and is not flagged aborted.
// Throws restore_duplicate_uid if the tag already points at uid and is flagged aborted,
// restore_duplicate_tag if the tag's existing restore is still runnable, and
// restore_destination_not_empty if the destination range already contains a key.
ACTOR static Future<Void> submitRestore(FileBackupAgent* backupAgent, Reference<ReadYourWritesTransaction> tr, Key tagName, Key backupURL, Version restoreVersion, Key addPrefix, Key removePrefix, KeyRange restoreRange, bool lockDB, UID uid) {
	ASSERT(restoreRange.contains(removePrefix) || removePrefix.size() == 0);

	tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
	tr->setOption(FDBTransactionOptions::LOCK_AWARE);

	// Look up whatever restore the tag currently refers to
	state KeyBackedTag restoreTag = makeRestoreTag(tagName.toString());
	state Optional<UidAndAbortedFlagT> existing = wait(restoreTag.get(tr));

	if(existing.present()) {
		if(existing.get().first == uid) {
			// Same uid submitted again: nothing to do unless it was flagged aborted
			if(!existing.get().second) {
				return Void();
			}
			throw restore_duplicate_uid();
		}

		state RestoreConfig previousRestore(existing.get().first);

		// A different restore on this tag must not be runnable any more
		bool previousRunnable = wait(previousRestore.isRunnable(tr));
		if(previousRunnable) {
			throw restore_duplicate_tag();
		}

		// Drop the config of the previous restore
		previousRestore.clear(tr);
	}

	// The destination range must not contain any key yet
	KeyRange destinationRange = KeyRangeRef(restoreRange.begin, restoreRange.end).removePrefix(removePrefix).withPrefix(addPrefix);
	Standalone<RangeResultRef> destinationRows = wait(tr->getRange(destinationRange, 1));
	if(destinationRows.size() > 0) {
		throw restore_destination_not_empty();
	}

	// Create the config for the new restore and point the tag at its uid
	state RestoreConfig newRestore(uid);
	restoreTag.set(tr, {uid, false});

	Reference<IBackupContainer> container = IBackupContainer::openContainer(backupURL.toString());

	// Fill in the new restore's settings
	newRestore.tag().set(tr, tagName.toString());
	newRestore.sourceContainer().set(tr, container);
	newRestore.stateEnum().set(tr, ERestoreState::QUEUED);
	newRestore.restoreVersion().set(tr, restoreVersion);
	newRestore.restoreRange().set(tr, restoreRange);
	// this also sets restore.add/removePrefix.
	newRestore.initApplyMutations(tr, addPrefix, removePrefix);

	Key startTaskKey = wait(fileBackup::StartFullRestoreTaskFunc::addTask(tr, backupAgent->taskBucket, uid, TaskCompletionKey::noSignal()));

	// Either take the database lock under uid or verify the lock state against uid
	if(lockDB) {
		Void _ = wait(lockDatabase(tr, uid));
	}
	else {
		Void _ = wait(checkDatabaseLock(tr, uid));
	}

	return Void();
}
// Waits until the restore on tagName is no longer runnable and returns its state at that point.
// Returns UNITIALIZED immediately if the tag does not exist. When verbose is set, progress is
// printed on every pass and the state is re-checked at least once per second.
ACTOR static Future<ERestoreState> waitRestore(Database cx, Key tagName, bool verbose) {
	loop {
		try {
			state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(cx));
			tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
			tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
			tr->setOption(FDBTransactionOptions::LOCK_AWARE);

			state KeyBackedTag tag = makeRestoreTag(tagName.toString());
			Optional<UidAndAbortedFlagT> tagValue = wait(tag.get(tr));
			if(!tagValue.present()) {
				if(verbose) {
					printf("Tag: %s  State: %s\n", tagName.toString().c_str(), FileBackupAgent::restoreStateText(ERestoreState::UNITIALIZED).toString().c_str());
				}
				return ERestoreState::UNITIALIZED;
			}

			state RestoreConfig restore(tagValue.get().first);

			if(verbose) {
				state std::string details = wait(restore.getProgress(tr));
				printf("%s\n", details.c_str());
			}

			state ERestoreState status = wait(restore.stateEnum().getD(tr));
			state bool runnable = wait(restore.isRunnable(tr));

			// Once the restore is not runnable its state will not change any more
			if(!runnable) {
				return status;
			}

			// Sleep until the state key changes; in verbose mode wake up after a second regardless
			state Future<Void> stateChanged = tr->watch(restore.stateEnum().key);
			Void _ = wait(tr->commit());
			if(verbose) {
				Void _ = wait(stateChanged || delay(1));
			}
			else {
				Void _ = wait(stateChanged);
			}
		}
		catch (Error &e) {
			Void _ = wait(tr->onError(e));
		}
	}
}
// Sets stopWhenDone on the runnable backup identified by tagName, within transaction tr.
// Throws backup_unneeded if the tag does not exist or its backup is not runnable, and
// backup_duplicate if stopWhenDone is already set.
ACTOR static Future<Void> discontinueBackup(FileBackupAgent* backupAgent, Reference<ReadYourWritesTransaction> tr, Key tagName) {
	tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
	tr->setOption(FDBTransactionOptions::LOCK_AWARE);

	state KeyBackedTag backupTag = makeBackupTag(tagName.toString());
	state UidAndAbortedFlagT tagValue = wait(backupTag.getOrThrow(tr, false, backup_unneeded()));
	state BackupConfig backupConfig(tagValue.first);
	state EBackupState currentState = wait(backupConfig.stateEnum().getD(tr, EBackupState::STATE_NEVERRAN));

	// Only a runnable backup can be discontinued
	if(!FileBackupAgent::isRunnable(currentState)) {
		throw backup_unneeded();
	}

	TraceEvent(SevInfo, "FBA_discontinueBackup")
		.detail("tagName", backupTag.tagName.c_str())
		.detail("status", BackupAgentBase::getStateText(currentState));

	bool alreadyStopping = wait(backupConfig.stopWhenDone().getOrThrow(tr));

	// Discontinuing a backup twice is reported as a duplicate request
	if(alreadyStopping) {
		throw backup_duplicate();
	}

	backupConfig.stopWhenDone().set(tr, true);

	return Void();
}
// Aborts the runnable backup identified by tagName, within transaction tr: cancels its tasks
// through the tag, clears its key ranges under logRangesRange and backupLogKeys, and sets its
// state to STATE_ABORTED.
// Throws backup_unneeded if the tag does not exist or its backup is not runnable.
ACTOR static Future<Void> abortBackup(FileBackupAgent* backupAgent, Reference<ReadYourWritesTransaction> tr, std::string tagName) {
	tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
	tr->setOption(FDBTransactionOptions::LOCK_AWARE);

	state KeyBackedTag backupTag = makeBackupTag(tagName);
	state UidAndAbortedFlagT tagValue = wait(backupTag.getOrThrow(tr, false, backup_unneeded()));
	state BackupConfig backupConfig(tagValue.first);

	EBackupState currentState = wait(backupConfig.stateEnum().getD(tr, EBackupState::STATE_NEVERRAN));

	// Nothing to abort unless the backup is still runnable
	if(!backupAgent->isRunnable(static_cast<BackupAgentBase::enumState>(currentState))) {
		throw backup_unneeded();
	}

	TraceEvent(SevInfo, "FBA_abortBackup")
		.detail("tagName", tagName.c_str())
		.detail("status", BackupAgentBase::getStateText(currentState));

	// Cancel backup task through tag
	Void _ = wait(backupTag.cancel(tr));

	// Remove everything stored for this backup's uid under the two prefixes
	Key logRangesPrefix = uidPrefixKey(logRangesRange.begin, backupConfig.getUid());
	Key backupLogsPrefix = uidPrefixKey(backupLogKeys.begin, backupConfig.getUid());
	tr->clear(KeyRangeRef(logRangesPrefix, strinc(logRangesPrefix)));
	tr->clear(KeyRangeRef(backupLogsPrefix, strinc(backupLogsPrefix)));

	backupConfig.stateEnum().set(tr, EBackupState::STATE_ABORTED);

	return Void();
}
// Builds a human-readable status report for the backup on tagName.
// The report describes the backup's state and container, appends the last recorded error when
// errorLimit is positive and a backup config was found, and notes when the task bucket's
// disable key is present. The whole report is rebuilt from scratch on every transaction retry.
ACTOR static Future<std::string> getStatus(FileBackupAgent* backupAgent, Database cx, int errorLimit, std::string tagName) {
	state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(cx));
	state std::string statusText;

	loop {
		try {
			tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
			tr->setOption(FDBTransactionOptions::LOCK_AWARE);

			state KeyBackedTag tag;
			state BackupConfig config;
			state EBackupState backupState;

			statusText.clear();
			tag = makeBackupTag(tagName);
			state Optional<UidAndAbortedFlagT> tagValue = wait(tag.get(tr));

			// Start the disable-key read now; its result is consumed at the end of this pass
			state Future<Optional<Value>> disableKeyRead = tr->get(backupAgent->taskBucket->getDisableKey());

			if(tagValue.present()) {
				config = BackupConfig(tagValue.get().first);
				EBackupState fetchedState = wait(config.stateEnum().getD(tr, EBackupState::STATE_NEVERRAN));
				backupState = fetchedState;
			}

			if(tagValue.present() && backupState != EBackupState::STATE_NEVERRAN) {
				state std::string stateText(BackupAgentBase::getStateText(backupState));
				state Reference<IBackupContainer> container = wait(config.backupContainer().getOrThrow(tr));
				state Version stoppedAt = wait(config.stopVersion().getD(tr, -1));

				if(backupState == BackupAgentBase::STATE_SUBMITTED) {
					statusText += "The backup on tag `" + tagName + "' is in progress (just started) to " + container->getURL() + ".\n";
				}
				else if(backupState == BackupAgentBase::STATE_BACKUP) {
					statusText += "The backup on tag `" + tagName + "' is in progress to " + container->getURL() + ".\n";
				}
				else if(backupState == BackupAgentBase::STATE_DIFFERENTIAL) {
					statusText += "The backup on tag `" + tagName + "' is restorable but continuing to " + container->getURL() + ".\n";
				}
				else if(backupState == BackupAgentBase::STATE_COMPLETED) {
					statusText += "The previous backup on tag `" + tagName + "' at " + container->getURL() + " completed at version " + format("%lld", stoppedAt) + ".\n";
				}
				else {
					statusText += "The previous backup on tag `" + tagName + "' at " + container->getURL() + " " + stateText + ".\n";
				}
			}
			else {
				statusText += "No previous backups found.\n";
			}

			// Append the errors, if requested
			if(errorLimit > 0 && config.getUid().isValid()) {
				Optional<std::pair<std::string, int64_t>> lastErr = wait(config.lastError().get(tr));
				if(lastErr.present()) {
					statusText += "WARNING: Some backup agents have reported issues:\n";
					statusText += format("[%lld]: %s\n", lastErr.get().second, lastErr.get().first.c_str());
				}
			}

			Optional<Value> disableValue = wait(disableKeyRead);
			if(disableValue.present()) {
				statusText += format("\nAll backup agents have been disabled.\n");
			}

			return statusText;
		}
		catch (Error &e) {
			Void _ = wait(tr->onError(e));
		}
	}
}
// Reads the value stored under backupAgent->lastRestorable for tagName and decodes it as a Version.
// Returns 0 when no value is stored.
ACTOR static Future<Version> getLastRestorable(FileBackupAgent* backupAgent, Reference<ReadYourWritesTransaction> tr, Key tagName) {
	tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
	tr->setOption(FDBTransactionOptions::LOCK_AWARE);

	state Optional<Value> storedVersion = wait(tr->get(backupAgent->lastRestorable.pack(tagName)));

	if(!storedVersion.present()) {
		return Version(0);
	}

	return BinaryReader::fromStringRef<Version>(storedVersion.get(), Unversioned());
}
// Consumes the first `bytes` bytes of `data`.
//
// Returns a StringRef covering those bytes and advances `data` past them.
// Throws restore_error() if `bytes` is negative or exceeds the remaining size
// of `data`; `data` is left untouched in that case.
static StringRef read(StringRef& data, int bytes) {
	// A negative count would slip past a pure upper-bound check and reach
	// substr() with a nonsensical length, so reject it up front as well.
	if (bytes < 0 || bytes > data.size()) throw restore_error();

	StringRef r = data.substr(0, bytes);
	data = data.substr(bytes);
	return r;
}
// Submits a restore of the backup container at `url` under tag `tagName`.
//
// Parameters:
//   targetVersion   - version to restore to; when equal to invalidVersion and the
//                     backup description reports a max restorable version, that
//                     version is used instead.
//   waitForComplete - when true, blocks until waitRestore() reports a final state.
//   verbose         - when true, prints the chosen target version (and is forwarded
//                     to waitRestore()).
//   range, addPrefix, removePrefix, lockDB, randomUid - forwarded unchanged to
//                     submitRestore().
//
// Returns the target version actually used.
//
// Throws restore_invalid_version() if the container has no restore set for the
// target version, and restore_error() if waitForComplete is set and the restore
// ends in any state other than COMPLETED.
ACTOR static Future<Version> restore(FileBackupAgent* backupAgent, Database cx, Key tagName, Key url, bool waitForComplete, Version targetVersion, bool verbose, KeyRange range, Key addPrefix, Key removePrefix, bool lockDB, UID randomUid) {
	state Reference<IBackupContainer> bc = IBackupContainer::openContainer(url.toString());
	BackupDescription desc = wait(bc->describeBackup());

	// The description is printed unconditionally (not gated on `verbose`).
	printf("Backup Description\n%s", desc.toString().c_str());

	// No explicit target requested: fall back to the newest restorable version, if known.
	if(targetVersion == invalidVersion && desc.maxRestorableVersion.present())
		targetVersion = desc.maxRestorableVersion.get();

	Optional<RestorableFileSet> restoreSet = wait(bc->getRestoreSet(targetVersion));

	if(!restoreSet.present()) {
		TraceEvent(SevWarn, "FileBackupAgentRestoreNotPossible")
			.detail("BackupContainer", bc->getURL())
			.detail("TargetVersion", targetVersion);
		// Cast so the argument type matches %lld on every platform (same as the verbose print below).
		fprintf(stderr, "ERROR: Restore version %lld is not possible from %s\n", (long long) targetVersion, bc->getURL().c_str());
		throw restore_invalid_version();
	}

	if (verbose) {
		printf("Restoring backup to version: %lld\n", (long long) targetVersion);
	}

	state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(cx));
	loop {
		try {
			tr->reset();
			tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
			tr->setOption(FDBTransactionOptions::LOCK_AWARE);
			Void _ = wait(submitRestore(backupAgent, tr, tagName, url, targetVersion, addPrefix, removePrefix, range, lockDB, randomUid));
			Void _ = wait(tr->commit());
			break;
		} catch(Error &e) {
			// restore_duplicate_tag is not handed to onError(); the loop simply
			// resets the transaction and tries the submission again.
			// NOTE(review): this retries without any delay for as long as the tag
			// stays in use - confirm that waiting here (rather than rethrowing) is intended.
			if(e.code() != error_code_restore_duplicate_tag) {
				Void _ = wait(tr->onError(e));
			}
		}
	}

	if(waitForComplete) {
		ERestoreState finalState = wait(waitRestore(cx, tagName, verbose));
		if(finalState != ERestoreState::COMPLETED)
			throw restore_error();
	}

	return targetVersion;
}
// Used for correctness only: locks the database before discontinuing the backup, and
// that same lock is then used while doing the restore.
// The tag name of the backup must be the same as the restore.
//
// Steps, in order:
//   1. Look up the backup config for `tagName` and require it to be in
//      STATE_DIFFERENTIAL (otherwise backup_duplicate() is thrown).
//   2. Lock the database with a freshly generated UID.
//   3. Discontinue the backup (backup_unneeded / backup_duplicate are tolerated)
//      and wait for it via waitBackup().
//   4. Clear `range`.
//   5. Restore from the backup's own container with lockDB = true and the same UID.
//
// Returns the version returned by restore().
ACTOR static Future<Version> atomicRestore(FileBackupAgent* backupAgent, Database cx, Key tagName, KeyRange range, Key addPrefix, Key removePrefix) {
	state Reference<ReadYourWritesTransaction> ryw_tr = Reference<ReadYourWritesTransaction>(new ReadYourWritesTransaction(cx));
	state BackupConfig backupConfig;

	// Resolve the tag to its backup config and verify the backup state.
	loop {
		try {
			ryw_tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
			ryw_tr->setOption(FDBTransactionOptions::LOCK_AWARE);
			state KeyBackedTag tag = makeBackupTag(tagName.toString());
			UidAndAbortedFlagT uidFlag = wait(tag.getOrThrow(ryw_tr));
			backupConfig = BackupConfig(uidFlag.first);
			state EBackupState status = wait(backupConfig.stateEnum().getOrThrow(ryw_tr));

			if (status != BackupAgentBase::STATE_DIFFERENTIAL ) {
				throw backup_duplicate();
			}

			break;
		} catch( Error &e ) {
			Void _ = wait( ryw_tr->onError(e) );
		}
	}

	//Lock src, record commit version
	state Transaction tr(cx);
	state Version commitVersion;
	state UID randomUid = g_random->randomUniqueID();
	loop {
		try {
			Void _ = wait( lockDatabase(&tr, randomUid) );
			Void _ = wait(tr.commit());
			commitVersion = tr.getCommittedVersion();
			break;
		} catch( Error &e ) {
			Void _ = wait(tr.onError(e));
		}
	}

	// Ask the backup to stop; it is fine if it is already stopping or not needed.
	ryw_tr->reset();
	loop {
		try {
			Void _ = wait( discontinueBackup(backupAgent, ryw_tr, tagName) );
			Void _ = wait( ryw_tr->commit() );
			break;
		} catch( Error &e ) {
			if(e.code() == error_code_backup_unneeded || e.code() == error_code_backup_duplicate){
				break;
			}
			Void _ = wait( ryw_tr->onError(e) );
		}
	}

	int _ = wait( waitBackup(backupAgent, cx, tagName.toString(), true) );

	// Clear the target range before restoring into it.
	ryw_tr->reset();
	loop {
		try {
			ryw_tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
			ryw_tr->setOption(FDBTransactionOptions::LOCK_AWARE);
			ryw_tr->addReadConflictRange(range);
			ryw_tr->clear(range);
			Void _ = wait( ryw_tr->commit() );
			break;
		} catch( Error &e ) {
			Void _ = wait( ryw_tr->onError(e) );
		}
	}

	Reference<IBackupContainer> bc = wait(backupConfig.backupContainer().getOrThrow(cx));

	// invalidVersion lets restore() pick the container's max restorable version;
	// lockDB = true together with randomUid reuses the lock taken above.
	Version ver = wait( restore(backupAgent, cx, tagName, KeyRef(bc->getURL()), true, invalidVersion, true, range, addPrefix, removePrefix, true, randomUid) );
	return ver;
}
};
// Out-of-class definitions of static constants declared on the backup agent classes.
const std::string BackupAgentBase::defaultTagName = "default";
const int BackupAgentBase::logHeaderSize = 12;
const int FileBackupAgent::dataFooterSize = 20;
// Public entry point for a restore: generates a fresh random UID for this
// restore and forwards every argument to FileBackupAgentImpl::restore.
Future<Version> FileBackupAgent::restore(Database cx, Key tagName, Key url, bool waitForComplete, Version targetVersion, bool verbose, KeyRange range, Key addPrefix, Key removePrefix, bool lockDB) {
	UID restoreUid = g_random->randomUniqueID();
	return FileBackupAgentImpl::restore(this, cx, tagName, url,
	                                    waitForComplete, targetVersion, verbose,
	                                    range, addPrefix, removePrefix,
	                                    lockDB, restoreUid);
}
// Forwards to FileBackupAgentImpl::atomicRestore with this agent as the first argument.
Future<Version> FileBackupAgent::atomicRestore(Database cx, Key tagName, KeyRange range, Key addPrefix, Key removePrefix) {
	Future<Version> restoredVersion =
	    FileBackupAgentImpl::atomicRestore(this, cx, tagName, range, addPrefix, removePrefix);
	return restoredVersion;
}
// Forwards to fileBackup::abortRestore for the given transaction and tag.
Future<ERestoreState> FileBackupAgent::abortRestore(Reference<ReadYourWritesTransaction> tr, Key tagName) {
	Future<ERestoreState> resultingState = fileBackup::abortRestore(tr, tagName);
	return resultingState;
}
// Forwards to fileBackup::restoreStatus for the given transaction and tag.
Future<std::string> FileBackupAgent::restoreStatus(Reference<ReadYourWritesTransaction> tr, Key tagName) {
	Future<std::string> statusText = fileBackup::restoreStatus(tr, tagName);
	return statusText;
}
// Forwards to FileBackupAgentImpl::waitRestore; the returned future carries the
// ERestoreState that call produces.
Future<ERestoreState> FileBackupAgent::waitRestore(Database cx, Key tagName, bool verbose) {
	return FileBackupAgentImpl::waitRestore(cx, tagName, verbose);
}
// Forwards to FileBackupAgentImpl::submitBackup with this agent as the first argument.
Future<Void> FileBackupAgent::submitBackup(Reference<ReadYourWritesTransaction> tr, Key outContainer, std::string tagName, Standalone<VectorRef<KeyRangeRef>> backupRanges, bool stopWhenDone) {
	return FileBackupAgentImpl::submitBackup(this, tr, outContainer, tagName, backupRanges, stopWhenDone);
}
// Forwards to FileBackupAgentImpl::discontinueBackup with this agent as the first argument.
Future<Void> FileBackupAgent::discontinueBackup(Reference<ReadYourWritesTransaction> tr, Key tagName){
	Future<Void> done = FileBackupAgentImpl::discontinueBackup(this, tr, tagName);
	return done;
}
// Forwards to FileBackupAgentImpl::abortBackup with this agent as the first argument.
Future<Void> FileBackupAgent::abortBackup(Reference<ReadYourWritesTransaction> tr, std::string tagName){
	return FileBackupAgentImpl::abortBackup(this, tr, tagName);
}
// Forwards to FileBackupAgentImpl::getStatus with this agent as the first argument;
// the returned future carries the status text that call produces.
Future<std::string> FileBackupAgent::getStatus(Database cx, int errorLimit, std::string tagName) {
	return FileBackupAgentImpl::getStatus(this, cx, errorLimit, tagName);
}
// Forwards to FileBackupAgentImpl::getLastRestorable with this agent as the first argument.
Future<Version> FileBackupAgent::getLastRestorable(Reference<ReadYourWritesTransaction> tr, Key tagName) {
	Future<Version> lastVersion = FileBackupAgentImpl::getLastRestorable(this, tr, tagName);
	return lastVersion;
}
// Stores `version` under lastRestorable.pack(tagName) in the given transaction.
//
// The transaction is switched to ACCESS_SYSTEM_KEYS and LOCK_AWARE first. The
// value is written with BinaryWriter::toValue<Version>(..., Unversioned()).
// This only buffers the write; committing `tr` is left to the caller.
void FileBackupAgent::setLastRestorable(Reference<ReadYourWritesTransaction> tr, Key tagName, Version version) {
	tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
	tr->setOption(FDBTransactionOptions::LOCK_AWARE);
	tr->set(lastRestorable.pack(tagName), BinaryWriter::toValue<Version>(version, Unversioned()));
}
// Forwards to FileBackupAgentImpl::waitBackup with this agent as the first argument.
Future<int> FileBackupAgent::waitBackup(Database cx, std::string tagName, bool stopWhenDone) {
	return FileBackupAgentImpl::waitBackup(this, cx, tagName, stopWhenDone);
}
// Opens the backup container named by `container`, requests its description and
// yields the description's string form.
//
// If `defaultVersion` is non-null and the description carries a max restorable
// version, that version is written to *defaultVersion. The pointer is
// dereferenced inside the callback handed to map(), so it must still be valid
// whenever that callback runs.
Future<std::string> FileBackupAgent::getBackupInfo(std::string container, Version* defaultVersion) {
	// Capture only the output pointer; nothing else from the enclosing scope is used.
	return map(IBackupContainer::openContainer(container)->describeBackup(), [defaultVersion] (BackupDescription const &d) {
		if(defaultVersion != nullptr && d.maxRestorableVersion.present())
			*defaultVersion = d.maxRestorableVersion.get();
		return d.toString();
	});
}