2017-05-26 04:48:44 +08:00
|
|
|
/*
 * BackupAgent.h
 *
 * This source file is part of the FoundationDB open source project
 *
 * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
|
|
|
|
|
|
|
|
#ifndef FDBCLIENT_BACKUP_AGENT_H
|
|
|
|
#define FDBCLIENT_BACKUP_AGENT_H
|
|
|
|
#pragma once
|
|
|
|
|
|
|
|
#include "flow/flow.h"
|
|
|
|
#include "NativeAPI.h"
|
|
|
|
#include "TaskBucket.h"
|
2017-07-15 06:49:30 +08:00
|
|
|
#include "Notified.h"
|
2017-05-26 04:48:44 +08:00
|
|
|
#include <fdbrpc/IAsyncFile.h>
|
|
|
|
#include "KeyBackedTypes.h"
|
|
|
|
#include <ctime>
|
|
|
|
#include <climits>
|
|
|
|
|
|
|
|
class BackupAgentBase : NonCopyable {
|
|
|
|
public:
|
|
|
|
// Type of program being executed
|
|
|
|
enum enumActionResult {
|
|
|
|
RESULT_SUCCESSFUL = 0, RESULT_ERRORED = 1, RESULT_DUPLICATE = 2, RESULT_UNNEEDED = 3
|
|
|
|
};
|
|
|
|
|
|
|
|
enum enumState {
|
|
|
|
STATE_ERRORED = 0, STATE_SUBMITTED = 1, STATE_BACKUP = 2, STATE_DIFFERENTIAL = 3, STATE_COMPLETED = 4, STATE_NEVERRAN = 5, STATE_ABORTED = 6, STATE_PARTIALLY_ABORTED = 7
|
|
|
|
};
|
|
|
|
|
|
|
|
static const Key keyFolderId;
|
|
|
|
static const Key keyBeginVersion;
|
|
|
|
static const Key keyEndVersion;
|
|
|
|
static const Key keyConfigBackupTag;
|
|
|
|
static const Key keyConfigLogUid;
|
|
|
|
static const Key keyConfigBackupRanges;
|
|
|
|
static const Key keyConfigStopWhenDoneKey;
|
|
|
|
static const Key keyStateStatus;
|
|
|
|
static const Key keyStateStop;
|
|
|
|
static const Key keyLastUid;
|
|
|
|
static const Key keyBeginKey;
|
|
|
|
static const Key keyEndKey;
|
|
|
|
|
|
|
|
static const Key keyTagName;
|
|
|
|
static const Key keyStates;
|
|
|
|
static const Key keyConfig;
|
|
|
|
static const Key keyErrors;
|
|
|
|
static const Key keyRanges;
|
|
|
|
static const Key keyTasks;
|
|
|
|
static const Key keyFutures;
|
|
|
|
static const Key keySourceStates;
|
|
|
|
static const Key keySourceTagName;
|
|
|
|
|
|
|
|
static const int logHeaderSize;
|
|
|
|
|
|
|
|
// Convert the status text to an enumerated value
|
|
|
|
static enumState getState(std::string stateText)
|
|
|
|
{
|
|
|
|
enumState enState = STATE_ERRORED;
|
|
|
|
|
|
|
|
if (stateText.empty()) {
|
|
|
|
enState = STATE_NEVERRAN;
|
|
|
|
}
|
|
|
|
|
|
|
|
else if (!stateText.compare("has been submitted")) {
|
|
|
|
enState = STATE_SUBMITTED;
|
|
|
|
}
|
|
|
|
|
|
|
|
else if (!stateText.compare("has been started")) {
|
|
|
|
enState = STATE_BACKUP;
|
|
|
|
}
|
|
|
|
|
|
|
|
else if (!stateText.compare("is differential")) {
|
|
|
|
enState = STATE_DIFFERENTIAL;
|
|
|
|
}
|
|
|
|
|
|
|
|
else if (!stateText.compare("has been completed")) {
|
|
|
|
enState = STATE_COMPLETED;
|
|
|
|
}
|
|
|
|
|
|
|
|
else if (!stateText.compare("has been aborted")) {
|
|
|
|
enState = STATE_ABORTED;
|
|
|
|
}
|
|
|
|
|
|
|
|
else if (!stateText.compare("has been partially aborted")) {
|
|
|
|
enState = STATE_PARTIALLY_ABORTED;
|
|
|
|
}
|
|
|
|
|
|
|
|
return enState;
|
|
|
|
}
|
|
|
|
|
|
|
|
// Convert the status text to an enumerated value
|
|
|
|
static const char* getStateText(enumState enState)
|
|
|
|
{
|
|
|
|
const char* stateText;
|
|
|
|
|
|
|
|
switch (enState)
|
|
|
|
{
|
|
|
|
case STATE_ERRORED:
|
|
|
|
stateText = "has errored";
|
|
|
|
break;
|
|
|
|
case STATE_NEVERRAN:
|
|
|
|
stateText = "has never been started";
|
|
|
|
break;
|
|
|
|
case STATE_SUBMITTED:
|
|
|
|
stateText = "has been submitted";
|
|
|
|
break;
|
|
|
|
case STATE_BACKUP:
|
|
|
|
stateText = "has been started";
|
|
|
|
break;
|
|
|
|
case STATE_DIFFERENTIAL:
|
|
|
|
stateText = "is differential";
|
|
|
|
break;
|
|
|
|
case STATE_COMPLETED:
|
|
|
|
stateText = "has been completed";
|
|
|
|
break;
|
|
|
|
case STATE_ABORTED:
|
|
|
|
stateText = "has been aborted";
|
|
|
|
break;
|
|
|
|
case STATE_PARTIALLY_ABORTED:
|
|
|
|
stateText = "has been partially aborted";
|
|
|
|
break;
|
|
|
|
default:
|
|
|
|
stateText = "<undefined>";
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
|
|
|
|
return stateText;
|
|
|
|
}
|
|
|
|
|
|
|
|
// Determine if the specified state is runnable
|
|
|
|
static bool isRunnable(enumState enState)
|
|
|
|
{
|
|
|
|
bool isRunnable = false;
|
|
|
|
|
|
|
|
switch (enState)
|
|
|
|
{
|
|
|
|
case STATE_SUBMITTED:
|
|
|
|
case STATE_BACKUP:
|
|
|
|
case STATE_DIFFERENTIAL:
|
|
|
|
case STATE_PARTIALLY_ABORTED:
|
|
|
|
isRunnable = true;
|
|
|
|
break;
|
|
|
|
default:
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
|
|
|
|
return isRunnable;
|
|
|
|
}
|
|
|
|
|
2017-10-13 07:12:42 +08:00
|
|
|
static const KeyRef getDefaultTag() {
|
2017-09-06 02:38:40 +08:00
|
|
|
return StringRef(defaultTagName);
|
|
|
|
}
|
|
|
|
|
|
|
|
static const std::string getDefaultTagName() {
|
|
|
|
return defaultTagName;
|
2017-05-26 04:48:44 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
static Standalone<StringRef> getCurrentTime() {
|
|
|
|
double t = now();
|
|
|
|
time_t curTime = t;
|
|
|
|
char buffer[128];
|
|
|
|
struct tm* timeinfo;
|
|
|
|
timeinfo = localtime(&curTime);
|
|
|
|
strftime(buffer, 128, "%Y-%m-%d-%H-%M-%S", timeinfo);
|
|
|
|
|
|
|
|
std::string time(buffer);
|
|
|
|
return StringRef(time + format(".%06d", (int)(1e6*(t - curTime))));
|
|
|
|
}
|
|
|
|
|
|
|
|
protected:
|
2017-09-06 02:38:40 +08:00
|
|
|
static const std::string defaultTagName;
|
2017-05-26 04:48:44 +08:00
|
|
|
};
|
|
|
|
|
|
|
|
class FileBackupAgent : public BackupAgentBase {
|
|
|
|
public:
|
|
|
|
FileBackupAgent();
|
|
|
|
|
|
|
|
FileBackupAgent( FileBackupAgent&& r ) noexcept(true) :
|
|
|
|
subspace( std::move(r.subspace) ),
|
|
|
|
config( std::move(r.config) ),
|
|
|
|
lastRestorable( std::move(r.lastRestorable) ),
|
|
|
|
taskBucket( std::move(r.taskBucket) ),
|
2017-09-02 04:50:38 +08:00
|
|
|
futureBucket( std::move(r.futureBucket) ) {}
|
2017-05-26 04:48:44 +08:00
|
|
|
|
|
|
|
void operator=( FileBackupAgent&& r ) noexcept(true) {
|
|
|
|
subspace = std::move(r.subspace);
|
|
|
|
config = std::move(r.config);
|
|
|
|
lastRestorable = std::move(r.lastRestorable),
|
|
|
|
taskBucket = std::move(r.taskBucket);
|
|
|
|
futureBucket = std::move(r.futureBucket);
|
|
|
|
}
|
|
|
|
|
2017-08-31 09:05:50 +08:00
|
|
|
KeyBackedProperty<Key> lastBackupTimestamp() {
|
|
|
|
return config.pack(LiteralStringRef(__FUNCTION__));
|
|
|
|
}
|
|
|
|
|
2017-05-26 04:48:44 +08:00
|
|
|
Future<Void> run(Database cx, double *pollDelay, int maxConcurrentTasks) {
|
|
|
|
return taskBucket->run(cx, futureBucket, pollDelay, maxConcurrentTasks);
|
|
|
|
}
|
|
|
|
|
|
|
|
/** RESTORE **/
|
|
|
|
|
|
|
|
enum ERestoreState { UNITIALIZED = 0, QUEUED = 1, STARTING = 2, RUNNING = 3, COMPLETED = 4, ABORTED = 5 };
|
|
|
|
static StringRef restoreStateText(ERestoreState id);
|
|
|
|
|
|
|
|
// restore() will
|
|
|
|
// - make sure that url is readable and appears to be a complete backup
|
|
|
|
// - make sure the requested TargetVersion is valid
|
|
|
|
// - submit a restore on the given tagName
|
|
|
|
// - Optionally wait for the restore's completion. Will restore_error if restore fails or is aborted.
|
|
|
|
// restore() will return the targetVersion which will be either the valid version passed in or the max restorable version for the given url.
|
|
|
|
Future<Version> restore(Database cx, Key tagName, Key url, bool waitForComplete = true, Version targetVersion = -1, bool verbose = true, KeyRange range = normalKeys, Key addPrefix = Key(), Key removePrefix = Key(), bool lockDB = true);
|
|
|
|
Future<Version> atomicRestore(Database cx, Key tagName, KeyRange range = normalKeys, Key addPrefix = Key(), Key removePrefix = Key());
|
|
|
|
|
|
|
|
// Tries to abort the restore for a tag. Returns the final (stable) state of the tag.
|
|
|
|
Future<ERestoreState> abortRestore(Reference<ReadYourWritesTransaction> tr, Key tagName);
|
|
|
|
Future<ERestoreState> abortRestore(Database cx, Key tagName) {
|
|
|
|
return runRYWTransaction(cx, [=](Reference<ReadYourWritesTransaction> tr){ return abortRestore(tr, tagName); });
|
|
|
|
}
|
|
|
|
|
|
|
|
// Waits for a restore tag to reach a final (stable) state.
|
|
|
|
Future<ERestoreState> waitRestore(Database cx, Key tagName, bool verbose);
|
|
|
|
|
|
|
|
// Get a string describing the status of a tag
|
|
|
|
Future<std::string> restoreStatus(Reference<ReadYourWritesTransaction> tr, Key tagName);
|
|
|
|
Future<std::string> restoreStatus(Database cx, Key tagName) {
|
|
|
|
return runRYWTransaction(cx, [=](Reference<ReadYourWritesTransaction> tr){ return restoreStatus(tr, tagName); });
|
|
|
|
}
|
|
|
|
|
|
|
|
/** BACKUP METHODS **/
|
|
|
|
|
2017-09-06 02:38:40 +08:00
|
|
|
Future<Void> submitBackup(Reference<ReadYourWritesTransaction> tr, Key outContainer, std::string tagName, Standalone<VectorRef<KeyRangeRef>> backupRanges, bool stopWhenDone = true);
|
|
|
|
Future<Void> submitBackup(Database cx, Key outContainer, std::string tagName, Standalone<VectorRef<KeyRangeRef>> backupRanges, bool stopWhenDone = true) {
|
2017-05-26 04:48:44 +08:00
|
|
|
return runRYWTransactionFailIfLocked(cx, [=](Reference<ReadYourWritesTransaction> tr){ return submitBackup(tr, outContainer, tagName, backupRanges, stopWhenDone); });
|
|
|
|
}
|
|
|
|
|
|
|
|
Future<Void> discontinueBackup(Reference<ReadYourWritesTransaction> tr, Key tagName);
|
|
|
|
Future<Void> discontinueBackup(Database cx, Key tagName) {
|
|
|
|
return runRYWTransaction(cx, [=](Reference<ReadYourWritesTransaction> tr){ return discontinueBackup(tr, tagName); });
|
|
|
|
}
|
|
|
|
|
2017-09-06 02:38:40 +08:00
|
|
|
Future<Void> abortBackup(Reference<ReadYourWritesTransaction> tr, std::string tagName);
|
|
|
|
Future<Void> abortBackup(Database cx, std::string tagName) {
|
2017-05-26 04:48:44 +08:00
|
|
|
return runRYWTransaction(cx, [=](Reference<ReadYourWritesTransaction> tr){ return abortBackup(tr, tagName); });
|
|
|
|
}
|
|
|
|
|
2017-09-06 02:38:40 +08:00
|
|
|
Future<std::string> getStatus(Database cx, int errorLimit, std::string tagName);
|
2017-05-26 04:48:44 +08:00
|
|
|
|
|
|
|
Future<Version> getLastRestorable(Reference<ReadYourWritesTransaction> tr, Key tagName);
|
|
|
|
|
|
|
|
// stopWhenDone will return when the backup is stopped, if enabled. Otherwise, it
|
|
|
|
// will return when the backup directory is restorable.
|
2017-09-06 02:38:40 +08:00
|
|
|
Future<int> waitBackup(Database cx, std::string tagName, bool stopWhenDone = true);
|
2017-05-26 04:48:44 +08:00
|
|
|
|
|
|
|
static Future<std::string> getBackupInfo(std::string backupContainer, Version* defaultVersion = NULL);
|
|
|
|
|
|
|
|
static std::string getTempFilename();
|
|
|
|
// Data(key ranges) and Log files will have their file size in the name because it is not at all convenient
|
|
|
|
// to fetch filesizes from either of the current BackupContainer implementations. LocalDirectory requires
|
|
|
|
// querying each file separately, and Blob Store doesn't support renames so the apparent log and data files
|
|
|
|
// are actually a kind of symbolic link so to get the size of the final file it would have to be read.
|
|
|
|
static std::string getDataFilename(Version version, int64_t size, int blockSize);
|
|
|
|
static std::string getLogFilename(Version beginVer, Version endVer, int64_t size, int blockSize);
|
|
|
|
|
2017-10-28 05:06:15 +08:00
|
|
|
static const Key keyLastRestorable;
|
|
|
|
|
2017-05-26 04:48:44 +08:00
|
|
|
Future<int64_t> getTaskCount(Reference<ReadYourWritesTransaction> tr) { return taskBucket->getTaskCount(tr); }
|
|
|
|
Future<int64_t> getTaskCount(Database cx) { return taskBucket->getTaskCount(cx); }
|
|
|
|
Future<Void> watchTaskCount(Reference<ReadYourWritesTransaction> tr) { return taskBucket->watchTaskCount(tr); }
|
|
|
|
|
|
|
|
Future<bool> checkActive(Database cx) { return taskBucket->checkActive(cx); }
|
|
|
|
|
|
|
|
friend class FileBackupAgentImpl;
|
|
|
|
static const int dataFooterSize;
|
|
|
|
|
|
|
|
Subspace subspace;
|
|
|
|
Subspace config;
|
|
|
|
Subspace lastRestorable;
|
|
|
|
|
|
|
|
Reference<TaskBucket> taskBucket;
|
|
|
|
Reference<FutureBucket> futureBucket;
|
|
|
|
};
|
|
|
|
|
|
|
|
class DatabaseBackupAgent : public BackupAgentBase {
|
|
|
|
public:
|
|
|
|
DatabaseBackupAgent();
|
|
|
|
explicit DatabaseBackupAgent(Database src);
|
|
|
|
|
|
|
|
DatabaseBackupAgent( DatabaseBackupAgent&& r ) noexcept(true) :
|
|
|
|
subspace( std::move(r.subspace) ),
|
|
|
|
states( std::move(r.states) ),
|
|
|
|
config( std::move(r.config) ),
|
|
|
|
errors( std::move(r.errors) ),
|
|
|
|
ranges( std::move(r.ranges) ),
|
|
|
|
tagNames( std::move(r.tagNames) ),
|
|
|
|
taskBucket( std::move(r.taskBucket) ),
|
|
|
|
futureBucket( std::move(r.futureBucket) ),
|
|
|
|
sourceStates( std::move(r.sourceStates) ),
|
|
|
|
sourceTagNames( std::move(r.sourceTagNames) ) {}
|
|
|
|
|
|
|
|
void operator=( DatabaseBackupAgent&& r ) noexcept(true) {
|
|
|
|
subspace = std::move(r.subspace);
|
|
|
|
states = std::move(r.states);
|
|
|
|
config = std::move(r.config);
|
|
|
|
errors = std::move(r.errors);
|
|
|
|
ranges = std::move(r.ranges);
|
|
|
|
tagNames = std::move(r.tagNames);
|
|
|
|
taskBucket = std::move(r.taskBucket);
|
|
|
|
futureBucket = std::move(r.futureBucket);
|
|
|
|
sourceStates = std::move(r.sourceStates);
|
|
|
|
sourceTagNames = std::move(r.sourceTagNames);
|
|
|
|
}
|
|
|
|
|
|
|
|
Future<Void> run(Database cx, double *pollDelay, int maxConcurrentTasks) {
|
|
|
|
return taskBucket->run(cx, futureBucket, pollDelay, maxConcurrentTasks);
|
|
|
|
}
|
|
|
|
|
|
|
|
Future<Void> atomicSwitchover(Database dest, Key tagName, Standalone<VectorRef<KeyRangeRef>> backupRanges, Key addPrefix, Key removePrefix);
|
|
|
|
|
|
|
|
Future<Void> unlockBackup(Reference<ReadYourWritesTransaction> tr, Key tagName);
|
|
|
|
Future<Void> unlockBackup(Database cx, Key tagName) {
|
|
|
|
return runRYWTransaction(cx, [=](Reference<ReadYourWritesTransaction> tr){ return unlockBackup(tr, tagName); });
|
|
|
|
}
|
|
|
|
|
|
|
|
Future<Void> submitBackup(Reference<ReadYourWritesTransaction> tr, Key tagName, Standalone<VectorRef<KeyRangeRef>> backupRanges, bool stopWhenDone = true, Key addPrefix = StringRef(), Key removePrefix = StringRef(), bool lockDatabase = false, bool databasesInSync=false);
|
|
|
|
Future<Void> submitBackup(Database cx, Key tagName, Standalone<VectorRef<KeyRangeRef>> backupRanges, bool stopWhenDone = true, Key addPrefix = StringRef(), Key removePrefix = StringRef(), bool lockDatabase = false, bool databasesInSync=false) {
|
|
|
|
return runRYWTransaction(cx, [=](Reference<ReadYourWritesTransaction> tr){ return submitBackup(tr, tagName, backupRanges, stopWhenDone, addPrefix, removePrefix, lockDatabase, databasesInSync); });
|
|
|
|
}
|
|
|
|
|
|
|
|
Future<Void> discontinueBackup(Reference<ReadYourWritesTransaction> tr, Key tagName);
|
|
|
|
Future<Void> discontinueBackup(Database cx, Key tagName) {
|
|
|
|
return runRYWTransaction(cx, [=](Reference<ReadYourWritesTransaction> tr){ return discontinueBackup(tr, tagName); });
|
|
|
|
}
|
|
|
|
|
|
|
|
Future<Void> abortBackup(Database cx, Key tagName, bool partial = false);
|
|
|
|
|
|
|
|
Future<std::string> getStatus(Database cx, int errorLimit, Key tagName);
|
|
|
|
|
|
|
|
Future<int> getStateValue(Reference<ReadYourWritesTransaction> tr, UID logUid);
|
|
|
|
Future<int> getStateValue(Database cx, UID logUid) {
|
|
|
|
return runRYWTransaction(cx, [=](Reference<ReadYourWritesTransaction> tr){ return getStateValue(tr, logUid); });
|
|
|
|
}
|
|
|
|
|
|
|
|
Future<UID> getLogUid(Reference<ReadYourWritesTransaction> tr, Key tagName);
|
|
|
|
Future<UID> getLogUid(Database cx, Key tagName) {
|
|
|
|
return runRYWTransaction(cx, [=](Reference<ReadYourWritesTransaction> tr){ return getLogUid(tr, tagName); });
|
|
|
|
}
|
|
|
|
|
|
|
|
Future<int64_t> getRangeBytesWritten(Reference<ReadYourWritesTransaction> tr, UID logUid);
|
|
|
|
Future<int64_t> getLogBytesWritten(Reference<ReadYourWritesTransaction> tr, UID logUid);
|
|
|
|
|
|
|
|
// stopWhenDone will return when the backup is stopped, if enabled. Otherwise, it
|
|
|
|
// will return when the backup directory is restorable.
|
|
|
|
Future<int> waitBackup(Database cx, Key tagName, bool stopWhenDone = true);
|
|
|
|
Future<int> waitSubmitted(Database cx, Key tagName);
|
|
|
|
|
|
|
|
static const Key keyAddPrefix;
|
|
|
|
static const Key keyRemovePrefix;
|
|
|
|
static const Key keyRangeVersions;
|
|
|
|
static const Key keyCopyStop;
|
|
|
|
static const Key keyDatabasesInSync;
|
|
|
|
|
|
|
|
Future<int64_t> getTaskCount(Reference<ReadYourWritesTransaction> tr) { return taskBucket->getTaskCount(tr); }
|
|
|
|
Future<int64_t> getTaskCount(Database cx) { return taskBucket->getTaskCount(cx); }
|
|
|
|
Future<Void> watchTaskCount(Reference<ReadYourWritesTransaction> tr) { return taskBucket->watchTaskCount(tr); }
|
|
|
|
|
|
|
|
Future<bool> checkActive(Database cx) { return taskBucket->checkActive(cx); }
|
|
|
|
|
|
|
|
friend class DatabaseBackupAgentImpl;
|
|
|
|
|
|
|
|
Subspace subspace;
|
|
|
|
Subspace states;
|
|
|
|
Subspace config;
|
|
|
|
Subspace errors;
|
|
|
|
Subspace ranges;
|
|
|
|
Subspace tagNames;
|
|
|
|
Subspace sourceStates;
|
|
|
|
Subspace sourceTagNames;
|
|
|
|
|
|
|
|
Reference<TaskBucket> taskBucket;
|
|
|
|
Reference<FutureBucket> futureBucket;
|
|
|
|
};
|
|
|
|
|
|
|
|
// A range read result paired with a Version.
using RangeResultWithVersion = std::pair<Standalone<RangeResultRef>, Version>;
|
|
|
|
|
|
|
|
struct RCGroup {
|
|
|
|
Standalone<RangeResultRef> items;
|
|
|
|
Version version;
|
|
|
|
uint64_t groupKey;
|
|
|
|
|
|
|
|
RCGroup() : version(-1), groupKey(ULLONG_MAX) {};
|
|
|
|
|
|
|
|
template <class Ar>
|
|
|
|
void serialize(Ar& ar) {
|
|
|
|
ar & items & version & groupKey;
|
|
|
|
}
|
|
|
|
};
|
|
|
|
|
|
|
|
bool copyParameter(Reference<Task> source, Reference<Task> dest, Key key);
|
|
|
|
Version getVersionFromString(std::string const& value);
|
|
|
|
Standalone<VectorRef<KeyRangeRef>> getLogRanges(Version beginVersion, Version endVersion, Key backupUid, int blockSize = CLIENT_KNOBS->LOG_RANGE_BLOCK_SIZE);
|
|
|
|
Standalone<VectorRef<KeyRangeRef>> getApplyRanges(Version beginVersion, Version endVersion, Key backupUid);
|
|
|
|
Key getApplyKey( Version version, Key backupUid );
|
|
|
|
std::pair<uint64_t, uint32_t> decodeBKMutationLogKey(Key key);
|
|
|
|
Standalone<VectorRef<MutationRef>> decodeBackupLogValue(StringRef value);
|
|
|
|
void decodeBackupLogValue(Arena& arena, VectorRef<MutationRef>& result, int64_t& mutationSize, StringRef value, StringRef addPrefix = StringRef(), StringRef removePrefix = StringRef());
|
|
|
|
Future<Void> logErrorWorker(Reference<ReadYourWritesTransaction> const& tr, Key const& keyErrors, std::string const& message);
|
|
|
|
Future<Void> logError(Database cx, Key keyErrors, const std::string& message);
|
|
|
|
Future<Void> logError(Reference<ReadYourWritesTransaction> tr, Key keyErrors, const std::string& message);
|
|
|
|
Future<Void> checkVersion(Reference<ReadYourWritesTransaction> const& tr);
|
|
|
|
Future<Void> readCommitted(Database const& cx, PromiseStream<RangeResultWithVersion> const& results, Reference<FlowLock> const& lock, KeyRangeRef const& range, bool const& terminator = false, bool const& systemAccess = false, bool const& lockAware = false);
|
|
|
|
Future<Void> readCommitted(Database const& cx, PromiseStream<RCGroup> const& results, Future<Void> const& active, Reference<FlowLock> const& lock, KeyRangeRef const& range, std::function< std::pair<uint64_t, uint32_t>(Key key) > const& groupBy, bool const& terminator = false, bool const& systemAccess = false, bool const& lockAware = false, std::function< Future<Void>(Reference<ReadYourWritesTransaction> tr) > const& withEachFunction = nullptr);
|
|
|
|
Future<Void> applyMutations(Database const& cx, Key const& uid, Key const& addPrefix, Key const& removePrefix, Version const& beginVersion, Version* const& endVersion, RequestStream<CommitTransactionRequest> const& commit, NotifiedVersion* const& committedVersion, Reference<KeyRangeMap<Version>> const& keyVersion);
|
|
|
|
|
|
|
|
template <typename T>
|
|
|
|
class TaskParam {
|
|
|
|
public:
|
|
|
|
TaskParam(StringRef key) : key(key) {}
|
|
|
|
T get(Reference<Task> task) const {
|
|
|
|
return Codec<T>::unpack(Tuple::unpack(task->params[key]));
|
|
|
|
}
|
|
|
|
void set(Reference<Task> task, T const &val) const {
|
|
|
|
task->params[key] = Codec<T>::pack(val).pack();
|
|
|
|
}
|
|
|
|
bool exists(Reference<Task> task) const {
|
|
|
|
return task->params.find(key) != task->params.end();
|
|
|
|
}
|
|
|
|
T getOrDefault(Reference<Task> task, const T defaultValue = T()) const {
|
|
|
|
if(!exists(task))
|
|
|
|
return defaultValue;
|
|
|
|
return get(task);
|
|
|
|
}
|
|
|
|
StringRef key;
|
|
|
|
};
|
|
|
|
|
2017-09-07 00:46:27 +08:00
|
|
|
typedef BackupAgentBase::enumState EBackupState;
|
|
|
|
template<> inline Tuple Codec<EBackupState>::pack(EBackupState const &val) { return Tuple().append(val); }
|
|
|
|
template<> inline EBackupState Codec<EBackupState>::unpack(Tuple const &val) { return (EBackupState)val.getInt(0); }
|
|
|
|
|
|
|
|
// Key backed tags are a single-key slice of the TagUidMap, defined below.
|
|
|
|
// The Value type of the key is a UidAndAbortedFlagT which is a pair of {UID, aborted_flag}
|
|
|
|
// All tasks on the UID will have a validation key/value that requires aborted_flag to be
|
|
|
|
// false, so changing that value, such as changing the UID or setting aborted_flag to true,
|
|
|
|
// will kill all of the active tasks on that backup/restore UID.
|
|
|
|
typedef std::pair<UID, bool> UidAndAbortedFlagT;
|
|
|
|
// A key-backed property holding the {UID, aborted_flag} pair for one tag name
// under a given tag-map prefix.
class KeyBackedTag : public KeyBackedProperty<UidAndAbortedFlagT> {
public:
	KeyBackedTag() : KeyBackedProperty(StringRef()) {}
	KeyBackedTag(std::string tagName, StringRef tagMapPrefix);

	// Read the tag's current value and, if one is present, write it back with
	// the aborted flag set to true. Does nothing when the tag has no value.
	Future<Void> cancel(Reference<ReadYourWritesTransaction> tr) {
		// Copy what the callback needs instead of capturing 'this'.
		std::string nameCopy = tagName;
		Key prefixCopy = tagMapPrefix;

		return map(get(tr), [nameCopy, prefixCopy, tr](Optional<UidAndAbortedFlagT> current) -> Void {
			if (!current.present()) {
				return Void();
			}

			// Set aborted flag to true
			current.get().second = true;
			KeyBackedTag(nameCopy, prefixCopy).set(tr, current.get());
			return Void();
		});
	}

	std::string tagName;
	Key tagMapPrefix;
};
|
|
|
|
|
|
|
|
typedef KeyBackedMap<std::string, UidAndAbortedFlagT> TagMap;
|
|
|
|
// Map of tagName to {UID, aborted_flag} located in the fileRestorePrefixRange keyspace.
|
|
|
|
class TagUidMap : public KeyBackedMap<std::string, UidAndAbortedFlagT> {
|
|
|
|
public:
|
|
|
|
TagUidMap(const StringRef & prefix) : TagMap(LiteralStringRef("tag->uid/").withPrefix(prefix)), prefix(prefix) {}
|
|
|
|
|
|
|
|
static Future<std::vector<KeyBackedTag>> getAll_impl(TagUidMap * const & tagsMap, Reference<ReadYourWritesTransaction> const & tr);
|
|
|
|
|
|
|
|
Future<std::vector<KeyBackedTag>> getAll(Reference<ReadYourWritesTransaction> tr) {
|
|
|
|
return getAll_impl(this, tr);
|
|
|
|
}
|
|
|
|
|
|
|
|
Key prefix;
|
|
|
|
};
|
|
|
|
|
|
|
|
// Build the KeyBackedTag for tagName under the file restore prefix
// (fileRestorePrefixRange.begin).
static inline KeyBackedTag makeRestoreTag(std::string tagName) {
	// tagName is already a private copy; hand it over instead of copying again.
	return KeyBackedTag(std::move(tagName), fileRestorePrefixRange.begin);
}
|
|
|
|
|
|
|
|
// Build the KeyBackedTag for tagName under the file backup prefix
// (fileBackupPrefixRange.begin).
static inline KeyBackedTag makeBackupTag(std::string tagName) {
	// tagName is already a private copy; hand it over instead of copying again.
	return KeyBackedTag(std::move(tagName), fileBackupPrefixRange.begin);
}
|
|
|
|
|
|
|
|
// Fetch every tag in the tag map under the file restore prefix.
static inline Future<std::vector<KeyBackedTag>> getAllRestoreTags(Reference<ReadYourWritesTransaction> tr) {
	TagUidMap restoreTags(fileRestorePrefixRange.begin);
	return restoreTags.getAll(tr);
}
|
|
|
|
|
|
|
|
// Fetch every tag in the tag map under the file backup prefix.
static inline Future<std::vector<KeyBackedTag>> getAllBackupTags(Reference<ReadYourWritesTransaction> tr) {
	TagUidMap backupTags(fileBackupPrefixRange.begin);
	return backupTags.getAll(tr);
}
|
|
|
|
|
|
|
|
class KeyBackedConfig {
|
|
|
|
public:
|
|
|
|
static struct {
|
|
|
|
static TaskParam<UID> uid() {return LiteralStringRef(__FUNCTION__); }
|
|
|
|
} TaskParams;
|
|
|
|
|
|
|
|
KeyBackedConfig(StringRef prefix, UID uid = UID()) :
|
|
|
|
uid(uid),
|
|
|
|
prefix(prefix),
|
|
|
|
configSpace(uidPrefixKey(LiteralStringRef("uid->config/").withPrefix(prefix), uid)) {}
|
|
|
|
|
|
|
|
KeyBackedConfig(StringRef prefix, Reference<Task> task) : KeyBackedConfig(prefix, TaskParams.uid().get(task)) {}
|
|
|
|
|
|
|
|
Future<Void> toTask(Reference<ReadYourWritesTransaction> tr, Reference<Task> task) {
|
|
|
|
// Set the uid task parameter
|
|
|
|
TaskParams.uid().set(task, uid);
|
|
|
|
// Set the validation condition for the task which is that the restore uid's tag's uid is the same as the restore uid.
|
|
|
|
// Get this uid's tag, then get the KEY for the tag's uid but don't read it. That becomes the validation key
|
|
|
|
// which TaskBucket will check, and its value must be this restore config's uid.
|
|
|
|
UID u = uid; // 'this' could be invalid in lambda
|
|
|
|
Key p = prefix;
|
|
|
|
return map(tag().get(tr), [u,p,task](Optional<std::string> const &tag) -> Void {
|
|
|
|
if(!tag.present())
|
|
|
|
throw restore_error();
|
|
|
|
// Validation contition is that the uidPair key must be exactly {u, false}
|
|
|
|
TaskBucket::setValidationCondition(task, KeyBackedTag(tag.get(), p).key, Codec<UidAndAbortedFlagT>::pack({u, false}).pack());
|
|
|
|
return Void();
|
|
|
|
});
|
|
|
|
}
|
|
|
|
|
|
|
|
KeyBackedProperty<std::string> tag() {
|
|
|
|
return configSpace.pack(LiteralStringRef(__FUNCTION__));
|
|
|
|
}
|
|
|
|
|
|
|
|
UID getUid() { return uid; }
|
|
|
|
|
|
|
|
Key getUidAsKey() { return BinaryWriter::toValue(uid, Unversioned()); }
|
|
|
|
|
|
|
|
void clear(Reference<ReadYourWritesTransaction> tr) {
|
|
|
|
tr->clear(configSpace.range());
|
|
|
|
}
|
|
|
|
|
2017-09-09 07:09:18 +08:00
|
|
|
// lastError is a pair of error message and timestamp expressed as an int64_t
|
|
|
|
KeyBackedProperty<std::pair<std::string, int64_t>> lastError() {
|
|
|
|
return configSpace.pack(LiteralStringRef(__FUNCTION__));
|
|
|
|
}
|
|
|
|
|
2017-09-07 00:46:27 +08:00
|
|
|
protected:
|
|
|
|
UID uid;
|
|
|
|
Key prefix;
|
|
|
|
Subspace configSpace;
|
|
|
|
};
|
|
|
|
|
|
|
|
class BackupConfig : public KeyBackedConfig {
|
|
|
|
public:
|
|
|
|
BackupConfig(UID uid = UID()) : KeyBackedConfig(fileBackupPrefixRange.begin, uid) {}
|
|
|
|
BackupConfig(Reference<Task> task) : KeyBackedConfig(fileBackupPrefixRange.begin, task) {}
|
|
|
|
|
|
|
|
// rangeFileMap maps a keyrange file's End to its Begin and Filename
|
|
|
|
typedef std::pair<Key, Key> KeyAndFilenameT;
|
|
|
|
typedef KeyBackedMap<Key, KeyAndFilenameT> RangeFileMapT;
|
|
|
|
RangeFileMapT rangeFileMap() {
|
|
|
|
return configSpace.pack(LiteralStringRef(__FUNCTION__));
|
|
|
|
}
|
|
|
|
|
|
|
|
KeyBackedBinaryValue<int64_t> rangeBytesWritten() {
|
|
|
|
return configSpace.pack(LiteralStringRef(__FUNCTION__));
|
|
|
|
}
|
|
|
|
|
|
|
|
KeyBackedBinaryValue<int64_t> logBytesWritten() {
|
|
|
|
return configSpace.pack(LiteralStringRef(__FUNCTION__));
|
|
|
|
}
|
|
|
|
|
|
|
|
KeyBackedProperty<EBackupState> stateEnum() {
|
|
|
|
return configSpace.pack(LiteralStringRef(__FUNCTION__));
|
|
|
|
}
|
|
|
|
|
|
|
|
KeyBackedProperty<std::string> backupContainer() {
|
|
|
|
return configSpace.pack(LiteralStringRef(__FUNCTION__));
|
|
|
|
}
|
|
|
|
|
|
|
|
// Stop differntial logging if already started or don't start after completing KV ranges
|
|
|
|
KeyBackedProperty<bool> stopWhenDone() {
|
|
|
|
return configSpace.pack(LiteralStringRef(__FUNCTION__));
|
|
|
|
}
|
|
|
|
|
|
|
|
KeyBackedProperty<Version> stopVersion() {
|
|
|
|
return configSpace.pack(LiteralStringRef(__FUNCTION__));
|
|
|
|
}
|
|
|
|
|
|
|
|
KeyBackedProperty<std::vector<KeyRange>> backupRanges() {
|
|
|
|
return configSpace.pack(LiteralStringRef(__FUNCTION__));
|
|
|
|
}
|
|
|
|
|
|
|
|
void startMutationLogs(Reference<ReadYourWritesTransaction> tr, KeyRangeRef backupRange) {
|
|
|
|
Key mutationLogsDestKey = uidPrefixKey(backupLogKeys.begin, getUid());
|
|
|
|
tr->set(logRangesEncodeKey(backupRange.begin, getUid()), logRangesEncodeValue(backupRange.end, mutationLogsDestKey));
|
|
|
|
}
|
2017-09-09 07:09:18 +08:00
|
|
|
|
|
|
|
Future<Void> logError(Database cx, Error e, std::string details, void *taskInstance = nullptr) {
|
|
|
|
if(!uid.isValid()) {
|
|
|
|
TraceEvent(SevError, "FileBackupErrorNoUID").error(e).detail("Description", details);
|
|
|
|
return Void();
|
|
|
|
}
|
|
|
|
TraceEvent(SevWarn, "FileBackupError").error(e).detail("BackupUID", uid).detail("Description", details).detail("TaskInstance", (uint64_t)taskInstance);
|
|
|
|
std::string msg = format("ERROR: %s %s", e.what(), details.c_str());
|
|
|
|
return lastError().set(cx, {msg, (int64_t)now()});
|
|
|
|
}
|
2017-09-07 00:46:27 +08:00
|
|
|
};
|
2017-05-26 04:48:44 +08:00
|
|
|
#endif
|