Moving from task->params to Params

This commit is contained in:
Bhaskar Muppana 2017-09-01 13:50:38 -07:00
parent c564aaae68
commit d834ab9d4d
2 changed files with 91 additions and 67 deletions

View File

@ -216,23 +216,19 @@ public:
subspace( std::move(r.subspace) ),
config( std::move(r.config) ),
errors( std::move(r.errors) ),
ranges( std::move(r.ranges) ),
tagNames( std::move(r.tagNames) ),
lastRestorable( std::move(r.lastRestorable) ),
taskBucket( std::move(r.taskBucket) ),
futureBucket( std::move(r.futureBucket) ),
sourceStates( std::move(r.sourceStates) ) {}
futureBucket( std::move(r.futureBucket) ) {}
void operator=( FileBackupAgent&& r ) noexcept(true) {
subspace = std::move(r.subspace);
config = std::move(r.config);
errors = std::move(r.errors);
ranges = std::move(r.ranges);
tagNames = std::move(r.tagNames);
lastRestorable = std::move(r.lastRestorable),
taskBucket = std::move(r.taskBucket);
futureBucket = std::move(r.futureBucket);
sourceStates = std::move(r.sourceStates);
}
KeyBackedProperty<Key> lastBackupTimestamp() {
@ -332,9 +328,6 @@ public:
static std::string getDataFilename(Version version, int64_t size, int blockSize);
static std::string getLogFilename(Version beginVer, Version endVer, int64_t size, int blockSize);
static const Key keyBackupContainer;
static const Key keyLastRestorable;
Future<int64_t> getTaskCount(Reference<ReadYourWritesTransaction> tr) { return taskBucket->getTaskCount(tr); }
Future<int64_t> getTaskCount(Database cx) { return taskBucket->getTaskCount(cx); }
Future<Void> watchTaskCount(Reference<ReadYourWritesTransaction> tr) { return taskBucket->watchTaskCount(tr); }
@ -347,10 +340,8 @@ public:
Subspace subspace;
Subspace config;
Subspace errors;
Subspace ranges;
Subspace tagNames;
Subspace lastRestorable;
Subspace sourceStates;
Reference<TaskBucket> taskBucket;
Reference<FutureBucket> futureBucket;

View File

@ -35,8 +35,6 @@
#include <boost/algorithm/string/classification.hpp>
#include <algorithm>
const Key FileBackupAgent::keyLastRestorable = LiteralStringRef("last_restorable");
// For convenience
typedef FileBackupAgent::ERestoreState ERestoreState;
@ -475,13 +473,10 @@ FileBackupAgent::FileBackupAgent()
// tagNames has tagName => logUID
, tagNames(subspace.get(BackupAgentBase::keyTagName))
// The other subspaces have logUID -> value
, lastRestorable(subspace.get(FileBackupAgent::keyLastRestorable))
, config(subspace.get(BackupAgentBase::keyConfig))
, errors(subspace.get(BackupAgentBase::keyErrors))
, ranges(subspace.get(BackupAgentBase::keyRanges))
, taskBucket(new TaskBucket(subspace.get(BackupAgentBase::keyTasks), true, false, true))
, futureBucket(new FutureBucket(subspace.get(BackupAgentBase::keyFutures), true, true))
, sourceStates(subspace.get(BackupAgentBase::keySourceStates))
{
}
@ -982,9 +977,23 @@ namespace fileBackup {
static StringRef name;
static const uint32_t version;
static const Key keyAddBackupRangeTasks;
static const Key keyBackupRangeBeginKey;
static const Key keyFileSize;
static struct {
static TaskParam<Key> beginKey() {
return LiteralStringRef(__FUNCTION__);
}
static TaskParam<Key> endKey() {
return LiteralStringRef(__FUNCTION__);
}
static TaskParam<Key> backupRangeBeginKey() {
return LiteralStringRef(__FUNCTION__);
}
static TaskParam<bool> addBackupRangeTasks() {
return LiteralStringRef(__FUNCTION__);
}
static TaskParam<int64_t> fileSize() {
return LiteralStringRef(__FUNCTION__);
}
} Params;
StringRef getName() const { return name; };
@ -1052,8 +1061,9 @@ namespace fileBackup {
copyDefaultParameters(parentTask, task);
task->params[BackupAgentBase::keyBeginKey] = begin;
task->params[BackupAgentBase::keyEndKey] = end;
Params.beginKey().set(task, begin);
Params.endKey().set(task, end);
Params.addBackupRangeTasks().set(task, false);
if (!waitFor) {
return taskBucket->addTask(tr, task);
@ -1089,7 +1099,7 @@ namespace fileBackup {
// Find out if there is a shard boundary in(beginKey, endKey)
Standalone<VectorRef<KeyRef>> keys = wait(runRYWTransaction(cx, [=](Reference<ReadYourWritesTransaction> tr){ return getBlockOfShards(tr, task->params[FileBackupAgent::keyBeginKey], task->params[FileBackupAgent::keyEndKey], 1); }));
if (keys.size() > 0) {
task->params[BackupRangeTaskFunc::keyAddBackupRangeTasks] = StringRef();
Params.addBackupRangeTasks().set(task, true);
return Void();
}
@ -1101,8 +1111,8 @@ namespace fileBackup {
state Version outVersion = -1;
state std::string outFileName;
state Key beginKey = task->params[FileBackupAgent::keyBeginKey];
state Key endKey = task->params[FileBackupAgent::keyEndKey];
state Key beginKey = Params.beginKey().get(task);
state Key endKey = Params.endKey().get(task);
state Key lastKey;
state KeyRange range(KeyRangeRef(beginKey, endKey));
@ -1161,8 +1171,8 @@ namespace fileBackup {
if (now() >= timeout) {
TEST(true); // Backup range task did not finish before timeout
task->params[BackupRangeTaskFunc::keyBackupRangeBeginKey] = beginKey;
task->params[BackupRangeTaskFunc::keyFileSize] = BinaryWriter::toValue<int64_t>(fileSize, Unversioned());
Params.backupRangeBeginKey().set(task, beginKey);
Params.fileSize().set(task, fileSize);
return Void();
}
@ -1216,7 +1226,7 @@ namespace fileBackup {
throw e2;
}
}
task->params[BackupRangeTaskFunc::keyFileSize] = BinaryWriter::toValue<int64_t>(fileSize, Unversioned());
Params.fileSize().set(task, fileSize);
return Void();
}
@ -1230,8 +1240,8 @@ namespace fileBackup {
ACTOR static Future<Void> startBackupRangeInternal(Reference<ReadYourWritesTransaction> tr, Reference<TaskBucket> taskBucket, Reference<FutureBucket> futureBucket, Reference<Task> task, Reference<TaskFuture> onDone) {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
state Key nextKey = task->params[BackupAgentBase::keyBeginKey];
state Standalone<VectorRef<KeyRef>> keys = wait(getBlockOfShards(tr, nextKey, task->params[FileBackupAgent::keyEndKey], CLIENT_KNOBS->BACKUP_SHARD_TASK_LIMIT));
state Key nextKey = Params.beginKey().get(task);
state Standalone<VectorRef<KeyRef>> keys = wait(getBlockOfShards(tr, nextKey, Params.endKey().get(task), CLIENT_KNOBS->BACKUP_SHARD_TASK_LIMIT));
std::vector<Future<Key>> addTaskVector;
for (int idx = 0; idx < keys.size(); ++idx) {
@ -1243,9 +1253,9 @@ namespace fileBackup {
Void _ = wait(waitForAll(addTaskVector));
if (nextKey != task->params[BackupAgentBase::keyEndKey]) {
if (nextKey != Params.endKey().get(task)) {
// Add task to cover nextKey to the end, using the priority of the current task
Key _ = wait(addTask(tr, taskBucket, task, nextKey, task->params[BackupAgentBase::keyEndKey], TaskCompletionKey::joinWith(onDone), Reference<TaskFuture>(), task->getPriority()));
Key _ = wait(addTask(tr, taskBucket, task, nextKey, Params.endKey().get(task), TaskCompletionKey::joinWith(onDone), Reference<TaskFuture>(), task->getPriority()));
}
return Void();
@ -1254,17 +1264,16 @@ namespace fileBackup {
ACTOR static Future<Void> _finish(Reference<ReadYourWritesTransaction> tr, Reference<TaskBucket> taskBucket, Reference<FutureBucket> futureBucket, Reference<Task> task) {
state Reference<TaskFuture> taskFuture = futureBucket->unpack(task->params[Task::reservedTaskParamKeyDone]);
if(task->params.find(BackupRangeTaskFunc::keyFileSize) != task->params.end()) {
int64_t fileSize = BinaryReader::fromStringRef<int64_t>(task->params[BackupRangeTaskFunc::keyFileSize], Unversioned());
BackupConfig(task).rangeBytesWritten().atomicOp(tr, fileSize, MutationRef::AddValue);
if(Params.fileSize().exists(task)) {
BackupConfig(task).rangeBytesWritten().atomicOp(tr, Params.fileSize().get(task), MutationRef::AddValue);
}
if (task->params.find(BackupRangeTaskFunc::keyAddBackupRangeTasks) != task->params.end()) {
if (Params.addBackupRangeTasks().get(task)) {
Void _ = wait(startBackupRangeInternal(tr, taskBucket, futureBucket, task, taskFuture));
}
else if (task->params.find(BackupRangeTaskFunc::keyBackupRangeBeginKey) != task->params.end() && task->params[BackupRangeTaskFunc::keyBackupRangeBeginKey] < task->params[BackupAgentBase::keyEndKey]) {
else if (Params.backupRangeBeginKey().exists(task) && Params.backupRangeBeginKey().get(task) < Params.endKey().get(task)) {
ASSERT(taskFuture->key.size() > 0);
Key _ = wait(BackupRangeTaskFunc::addTask(tr, taskBucket, task, task->params[BackupRangeTaskFunc::keyBackupRangeBeginKey], task->params[BackupAgentBase::keyEndKey], TaskCompletionKey::signal(taskFuture->key)));
Key _ = wait(BackupRangeTaskFunc::addTask(tr, taskBucket, task, Params.backupRangeBeginKey().get(task), Params.endKey().get(task), TaskCompletionKey::signal(taskFuture->key)));
}
else {
Void _ = wait(taskFuture->set(tr, taskBucket));
@ -1277,9 +1286,6 @@ namespace fileBackup {
};
StringRef BackupRangeTaskFunc::name = LiteralStringRef("file_backup_range");
const uint32_t BackupRangeTaskFunc::version = 1;
const Key BackupRangeTaskFunc::keyAddBackupRangeTasks = LiteralStringRef("addBackupRangeTasks");
const Key BackupRangeTaskFunc::keyBackupRangeBeginKey = LiteralStringRef("backupRangeBeginKey");
const Key BackupRangeTaskFunc::keyFileSize = LiteralStringRef("fileSize");
REGISTER_TASKFUNC(BackupRangeTaskFunc);
struct FinishFullBackupTaskFunc : TaskFuncBase {
@ -1335,9 +1341,23 @@ namespace fileBackup {
static StringRef name;
static const uint32_t version;
static const Key keyNextBeginVersion;
static const Key keyAddBackupLogRangeTasks;
static const Key keyFileSize;
static struct {
static TaskParam<Version> nextBeginVersion() {
return LiteralStringRef(__FUNCTION__);
}
static TaskParam<bool> addBackupLogRangeTasks() {
return LiteralStringRef(__FUNCTION__);
}
static TaskParam<int64_t> fileSize() {
return LiteralStringRef(__FUNCTION__);
}
static TaskParam<Version> beginVersion() {
return LiteralStringRef(__FUNCTION__);
}
static TaskParam<Version> endVersion() {
return LiteralStringRef(__FUNCTION__);
}
} Params;
StringRef getName() const { return name; };
@ -1386,8 +1406,8 @@ namespace fileBackup {
Void _ = wait(checkTaskVersion(cx, task, BackupLogRangeTaskFunc::name, BackupLogRangeTaskFunc::version));
state double timeout = now() + CLIENT_KNOBS->BACKUP_RANGE_TIMEOUT;
state Version beginVersion = BinaryReader::fromStringRef<Version>(task->params[FileBackupAgent::keyBeginVersion], Unversioned());
state Version endVersion = BinaryReader::fromStringRef<Version>(task->params[FileBackupAgent::keyEndVersion], Unversioned());
state Version beginVersion = Params.beginVersion().get(task);
state Version endVersion = Params.endVersion().get(task);
state BackupConfig config(task);
state std::string backupContainer;
@ -1412,13 +1432,13 @@ namespace fileBackup {
}
if (now() >= timeout) {
task->params[BackupLogRangeTaskFunc::keyNextBeginVersion] = task->params[FileBackupAgent::keyBeginVersion];
Params.nextBeginVersion().set(task, beginVersion);
return Void();
}
state Standalone<VectorRef<KeyRangeRef>> ranges = getLogRanges(beginVersion, endVersion, config.getUidAsKey());
if (ranges.size() > CLIENT_KNOBS->BACKUP_MAX_LOG_RANGES) {
task->params[BackupLogRangeTaskFunc::keyAddBackupLogRangeTasks] = StringRef();
Params.addBackupLogRangeTasks().set(task, true);
return Void();
}
@ -1449,12 +1469,12 @@ namespace fileBackup {
else
endVersion = std::min<Version>(endVersion, ((beginVersion / CLIENT_KNOBS->LOG_RANGE_BLOCK_SIZE) + idx + 1) * CLIENT_KNOBS->LOG_RANGE_BLOCK_SIZE);
task->params[BackupLogRangeTaskFunc::keyNextBeginVersion] = BinaryWriter::toValue(endVersion, Unversioned());
Params.nextBeginVersion().set(task, endVersion);
break;
}
}
task->params[BackupLogRangeTaskFunc::keyFileSize] = BinaryWriter::toValue<int64_t>(logFile.offset, Unversioned());
Params.fileSize().set(task, logFile.offset);
std::string logFileName = FileBackupAgent::getLogFilename(beginVersion, endVersion, logFile.offset, logFile.blockSize);
Void _ = wait(endLogFile(cx, taskBucket, futureBucket, task, outFile, tempFileName, logFileName, logFile.offset, backupContainer));
@ -1470,8 +1490,9 @@ namespace fileBackup {
copyDefaultParameters(parentTask, task);
task->params[FileBackupAgent::keyBeginVersion] = BinaryWriter::toValue(beginVersion, Unversioned());
task->params[FileBackupAgent::keyEndVersion] = BinaryWriter::toValue(endVersion, Unversioned());
Params.beginVersion().set(task, beginVersion);
Params.endVersion().set(task, endVersion);
Params.addBackupLogRangeTasks().set(task, false);
if (!waitFor) {
return taskBucket->addTask(tr, task);
@ -1506,21 +1527,20 @@ namespace fileBackup {
}
ACTOR static Future<Void> _finish(Reference<ReadYourWritesTransaction> tr, Reference<TaskBucket> taskBucket, Reference<FutureBucket> futureBucket, Reference<Task> task) {
state Version beginVersion = BinaryReader::fromStringRef<Version>(task->params[FileBackupAgent::keyBeginVersion], Unversioned());
state Version endVersion = BinaryReader::fromStringRef<Version>(task->params[FileBackupAgent::keyEndVersion], Unversioned());
state Version beginVersion = Params.beginVersion().get(task);
state Version endVersion = Params.endVersion().get(task);
state Reference<TaskFuture> taskFuture = futureBucket->unpack(task->params[Task::reservedTaskParamKeyDone]);
if(task->params.find(BackupLogRangeTaskFunc::keyFileSize) != task->params.end()) {
int64_t fileSize = BinaryReader::fromStringRef<int64_t>(task->params[BackupLogRangeTaskFunc::keyFileSize], Unversioned());
BackupConfig(task).logBytesWritten().atomicOp(tr, fileSize, MutationRef::AddValue);
if(Params.fileSize().exists(task)) {
BackupConfig(task).logBytesWritten().atomicOp(tr, Params.fileSize().get(task), MutationRef::AddValue);
}
if (task->params.find(BackupLogRangeTaskFunc::keyAddBackupLogRangeTasks) != task->params.end()) {
if (Params.addBackupLogRangeTasks().get(task)) {
Void _ = wait(startBackupLogRangeInternal(tr, taskBucket, futureBucket, task, taskFuture, beginVersion, endVersion));
endVersion = beginVersion;
}
else if (task->params.find(BackupLogRangeTaskFunc::keyNextBeginVersion) != task->params.end()) {
state Version nextVersion = BinaryReader::fromStringRef<Version>(task->params[BackupLogRangeTaskFunc::keyNextBeginVersion], Unversioned());
else if (Params.nextBeginVersion().exists(task)) {
state Version nextVersion = Params.nextBeginVersion().get(task);
Key _ = wait(BackupLogRangeTaskFunc::addTask(tr, taskBucket, task, nextVersion, endVersion, TaskCompletionKey::joinWith(taskFuture)));
endVersion = nextVersion;
} else {
@ -1561,15 +1581,18 @@ namespace fileBackup {
StringRef BackupLogRangeTaskFunc::name = LiteralStringRef("file_backup_log_range");
const uint32_t BackupLogRangeTaskFunc::version = 1;
const Key BackupLogRangeTaskFunc::keyNextBeginVersion = LiteralStringRef("nextBeginVersion");
const Key BackupLogRangeTaskFunc::keyAddBackupLogRangeTasks = LiteralStringRef("addBackupLogRangeTasks");
const Key BackupLogRangeTaskFunc::keyFileSize = LiteralStringRef("fileSize");
REGISTER_TASKFUNC(BackupLogRangeTaskFunc);
struct BackupLogsTaskFunc : TaskFuncBase {
static StringRef name;
static const uint32_t version;
static struct {
static TaskParam<Version> beginVersion() {
return LiteralStringRef(__FUNCTION__);
}
} Params;
ACTOR static Future<Void> _finish(Reference<ReadYourWritesTransaction> tr, Reference<TaskBucket> taskBucket, Reference<FutureBucket> futureBucket, Reference<Task> task) {
Void _ = wait(checkTaskVersion(tr, task, BackupLogsTaskFunc::name, BackupLogsTaskFunc::version));
@ -1582,7 +1605,7 @@ namespace fileBackup {
state BackupConfig config(task);
state Version stopVersionData = wait(config.stopVersion().getD(tr, -1));
state Version beginVersion = BinaryReader::fromStringRef<Version>(task->params[FileBackupAgent::keyBeginVersion], Unversioned());
state Version beginVersion = Params.beginVersion().get(task);
state Version endVersion = std::max<Version>( tr->getReadVersion().get() + 1, beginVersion + (CLIENT_KNOBS->BACKUP_MAX_LOG_RANGES-1)*CLIENT_KNOBS->LOG_RANGE_BLOCK_SIZE );
if(endVersion - beginVersion > g_random->randomInt64(0, CLIENT_KNOBS->BACKUP_VERSION_DELAY)) {
@ -1616,7 +1639,7 @@ namespace fileBackup {
copyDefaultParameters(parentTask, task);
task->params[BackupAgentBase::keyBeginVersion] = BinaryWriter::toValue(beginVersion, Unversioned());
Params.beginVersion().set(task, beginVersion);
if (!waitFor) {
return taskBucket->addTask(tr, task);
@ -1687,6 +1710,12 @@ namespace fileBackup {
static StringRef name;
static const uint32_t version;
static struct {
static TaskParam<Version> beginVersion() {
return LiteralStringRef(__FUNCTION__);
}
} Params;
ACTOR static Future<Void> _finish(Reference<ReadYourWritesTransaction> tr, Reference<TaskBucket> taskBucket, Reference<FutureBucket> futureBucket, Reference<Task> task) {
Void _ = wait(checkTaskVersion(tr, task, BackupDiffLogsTaskFunc::name, BackupDiffLogsTaskFunc::version));
@ -1702,7 +1731,7 @@ namespace fileBackup {
state bool stopWhenDone = wait(config.stopWhenDone().getOrThrow(tr));
state Version beginVersion = BinaryReader::fromStringRef<Version>(task->params[FileBackupAgent::keyBeginVersion], Unversioned());
state Version beginVersion = Params.beginVersion().get(task);
state Version endVersion = std::max<Version>( tr->getReadVersion().get() + 1, beginVersion + (CLIENT_KNOBS->BACKUP_MAX_LOG_RANGES-1)*CLIENT_KNOBS->LOG_RANGE_BLOCK_SIZE );
tr->set(FileBackupAgent().lastRestorable.get(tagBytes).pack(), BinaryWriter::toValue(beginVersion, Unversioned()));
@ -1735,7 +1764,7 @@ namespace fileBackup {
copyDefaultParameters(parentTask, task);
task->params[FileBackupAgent::keyBeginVersion] = BinaryWriter::toValue(beginVersion, Unversioned());
Params.beginVersion().set(task, beginVersion);
if (!waitFor) {
return taskBucket->addTask(tr, task);
@ -1999,6 +2028,10 @@ namespace fileBackup {
static StringRef name;
static const uint32_t version;
static struct {
static TaskParam<Version> beginVersion() { return LiteralStringRef(__FUNCTION__); }
} Params;
ACTOR static Future<Void> _execute(Database cx, Reference<TaskBucket> taskBucket, Reference<FutureBucket> futureBucket, Reference<Task> task) {
Void _ = wait(checkTaskVersion(cx, task, StartFullBackupTaskFunc::name, StartFullBackupTaskFunc::version));
@ -2009,7 +2042,7 @@ namespace fileBackup {
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
Version startVersion = wait(tr->getReadVersion());
task->params[FileBackupAgent::keyBeginVersion] = BinaryWriter::toValue(startVersion, Unversioned());
Params.beginVersion().set(task, startVersion);
break;
}
catch (Error &e) {
@ -2024,7 +2057,7 @@ namespace fileBackup {
state BackupConfig config(task);
state UID logUid = config.getUid();
state Key logUidDest = uidPrefixKey(backupLogKeys.begin, logUid);
state Version beginVersion = BinaryReader::fromStringRef<Version>(task->params[BackupAgentBase::keyBeginVersion], Unversioned());
state Version beginVersion = Params.beginVersion().get(task);
ASSERT(config.getUid() == logUid);