2017-05-26 04:48:44 +08:00
/*
* DatabaseBackupAgent . actor . cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013 - 2018 Apple Inc . and the FoundationDB project authors
*
* Licensed under the Apache License , Version 2.0 ( the " License " ) ;
* you may not use this file except in compliance with the License .
* You may obtain a copy of the License at
*
* http : //www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing , software
* distributed under the License is distributed on an " AS IS " BASIS ,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND , either express or implied .
* See the License for the specific language governing permissions and
* limitations under the License .
*/
# include "BackupAgent.h"
# include <ctime>
# include <climits>
# include "fdbrpc/IAsyncFile.h"
# include "flow/genericactors.actor.h"
# include "flow/Hash3.h"
# include <numeric>
# include "ManagementAPI.h"
# include "KeyBackedTypes.h"
const Key DatabaseBackupAgent : : keyAddPrefix = LiteralStringRef ( " add_prefix " ) ;
const Key DatabaseBackupAgent : : keyRemovePrefix = LiteralStringRef ( " remove_prefix " ) ;
const Key DatabaseBackupAgent : : keyRangeVersions = LiteralStringRef ( " range_versions " ) ;
const Key DatabaseBackupAgent : : keyCopyStop = LiteralStringRef ( " copy_stop " ) ;
const Key DatabaseBackupAgent : : keyDatabasesInSync = LiteralStringRef ( " databases_in_sync " ) ;
DatabaseBackupAgent : : DatabaseBackupAgent ( )
: subspace ( Subspace ( databaseBackupPrefixRange . begin ) )
, tagNames ( subspace . get ( BackupAgentBase : : keyTagName ) )
, states ( subspace . get ( BackupAgentBase : : keyStates ) )
, config ( subspace . get ( BackupAgentBase : : keyConfig ) )
, errors ( subspace . get ( BackupAgentBase : : keyErrors ) )
, ranges ( subspace . get ( BackupAgentBase : : keyRanges ) )
, taskBucket ( new TaskBucket ( subspace . get ( BackupAgentBase : : keyTasks ) , true , false , true ) )
, futureBucket ( new FutureBucket ( subspace . get ( BackupAgentBase : : keyFutures ) , true , true ) )
, sourceStates ( subspace . get ( BackupAgentBase : : keySourceStates ) )
, sourceTagNames ( subspace . get ( BackupAgentBase : : keyTagName ) )
{ }
DatabaseBackupAgent : : DatabaseBackupAgent ( Database src )
: subspace ( Subspace ( databaseBackupPrefixRange . begin ) )
, tagNames ( subspace . get ( BackupAgentBase : : keyTagName ) )
, states ( subspace . get ( BackupAgentBase : : keyStates ) )
, config ( subspace . get ( BackupAgentBase : : keyConfig ) )
, errors ( subspace . get ( BackupAgentBase : : keyErrors ) )
, ranges ( subspace . get ( BackupAgentBase : : keyRanges ) )
, taskBucket ( new TaskBucket ( subspace . get ( BackupAgentBase : : keyTasks ) , true , false , true ) )
, futureBucket ( new FutureBucket ( subspace . get ( BackupAgentBase : : keyFutures ) , true , true ) )
, sourceStates ( subspace . get ( BackupAgentBase : : keySourceStates ) )
, sourceTagNames ( subspace . get ( BackupAgentBase : : keyTagName ) )
{
taskBucket - > src = src ;
}
// Any new per-DR properties should go here.
class DRConfig {
public :
DRConfig ( UID uid = UID ( ) ) : uid ( uid ) , configSpace ( uidPrefixKey ( LiteralStringRef ( " uid->config/ " ) . withPrefix ( databaseBackupPrefixRange . begin ) , uid ) ) { }
DRConfig ( Reference < Task > task ) : DRConfig ( BinaryReader : : fromStringRef < UID > ( task - > params [ BackupAgentBase : : keyConfigLogUid ] , Unversioned ( ) ) ) { }
KeyBackedBinaryValue < int64_t > rangeBytesWritten ( ) {
return configSpace . pack ( LiteralStringRef ( __FUNCTION__ ) ) ;
}
KeyBackedBinaryValue < int64_t > logBytesWritten ( ) {
return configSpace . pack ( LiteralStringRef ( __FUNCTION__ ) ) ;
}
void clear ( Reference < ReadYourWritesTransaction > tr ) {
tr - > clear ( configSpace . range ( ) ) ;
}
UID getUid ( ) { return uid ; }
private :
UID uid ;
Subspace configSpace ;
} ;
namespace dbBackup {
bool copyDefaultParameters ( Reference < Task > source , Reference < Task > dest ) {
if ( source ) {
copyParameter ( source , dest , BackupAgentBase : : keyFolderId ) ;
copyParameter ( source , dest , BackupAgentBase : : keyConfigLogUid ) ;
copyParameter ( source , dest , DatabaseBackupAgent : : keyAddPrefix ) ;
copyParameter ( source , dest , DatabaseBackupAgent : : keyRemovePrefix ) ;
return true ;
}
return false ;
}
ACTOR template < class Tr >
Future < Void > checkTaskVersion ( Tr tr , Reference < Task > task , StringRef name , uint32_t version ) {
uint32_t taskVersion = task - > getVersion ( ) ;
if ( taskVersion > version ) {
TraceEvent ( SevError , " BA_BackupRangeTaskFunc_execute " ) . detail ( " taskVersion " , taskVersion ) . detail ( " Name " , printable ( name ) ) . detail ( " Version " , version ) ;
Void _ = wait ( logError ( tr , Subspace ( databaseBackupPrefixRange . begin ) . get ( BackupAgentBase : : keyErrors ) . pack ( task - > params [ BackupAgentBase : : keyConfigLogUid ] ) ,
format ( " ERROR: %s task version `%lu' is greater than supported version `%lu' " , task - > params [ Task : : reservedTaskParamKeyType ] . toString ( ) . c_str ( ) , ( unsigned long ) taskVersion , ( unsigned long ) version ) ) ) ;
throw task_invalid_version ( ) ;
}
return Void ( ) ;
}
struct BackupRangeTaskFunc : TaskFuncBase {
static StringRef name ;
static const uint32_t version ;
static struct {
static TaskParam < int64_t > bytesWritten ( ) { return LiteralStringRef ( __FUNCTION__ ) ; }
} Params ;
static const Key keyAddBackupRangeTasks ;
static const Key keyBackupRangeBeginKey ;
StringRef getName ( ) const { return name ; } ;
Future < Void > execute ( Database cx , Reference < TaskBucket > tb , Reference < FutureBucket > fb , Reference < Task > task ) { return _execute ( cx , tb , fb , task ) ; } ;
Future < Void > finish ( Reference < ReadYourWritesTransaction > tr , Reference < TaskBucket > tb , Reference < FutureBucket > fb , Reference < Task > task ) { return _finish ( tr , tb , fb , task ) ; } ;
ACTOR static Future < Standalone < VectorRef < KeyRef > > > getBlockOfShards ( Reference < ReadYourWritesTransaction > tr , Key beginKey , Key endKey , int limit ) {
tr - > setOption ( FDBTransactionOptions : : ACCESS_SYSTEM_KEYS ) ;
tr - > setOption ( FDBTransactionOptions : : LOCK_AWARE ) ;
state Standalone < VectorRef < KeyRef > > results ;
Standalone < RangeResultRef > values = wait ( tr - > getRange ( KeyRangeRef ( keyAfter ( beginKey . withPrefix ( keyServersPrefix ) ) , endKey . withPrefix ( keyServersPrefix ) ) , limit ) ) ;
for ( auto & s : values ) {
KeyRef k = s . key . removePrefix ( keyServersPrefix ) ;
results . push_back_deep ( results . arena ( ) , k ) ;
}
return results ;
}
ACTOR static Future < Key > addTask ( Reference < ReadYourWritesTransaction > tr , Reference < TaskBucket > taskBucket , Reference < Task > parentTask , Key begin , Key end , TaskCompletionKey completionKey , Reference < TaskFuture > waitFor = Reference < TaskFuture > ( ) ) {
Key doneKey = wait ( completionKey . get ( tr , taskBucket ) ) ;
Reference < Task > task ( new Task ( BackupRangeTaskFunc : : name , BackupRangeTaskFunc : : version , doneKey ) ) ;
copyDefaultParameters ( parentTask , task ) ;
task - > params [ BackupAgentBase : : keyBeginKey ] = begin ;
task - > params [ BackupAgentBase : : keyEndKey ] = end ;
if ( ! waitFor ) {
return taskBucket - > addTask ( tr , task , parentTask - > params [ Task : : reservedTaskParamValidKey ] , task - > params [ BackupAgentBase : : keyFolderId ] ) ;
}
Void _ = wait ( waitFor - > onSetAddTask ( tr , taskBucket , task , parentTask - > params [ Task : : reservedTaskParamValidKey ] , task - > params [ BackupAgentBase : : keyFolderId ] ) ) ;
return LiteralStringRef ( " OnSetAddTask " ) ;
}
ACTOR static Future < Void > _execute ( Database cx , Reference < TaskBucket > taskBucket , Reference < FutureBucket > futureBucket , Reference < Task > task ) {
state Reference < FlowLock > lock ( new FlowLock ( CLIENT_KNOBS - > BACKUP_LOCK_BYTES ) ) ;
state Subspace conf = Subspace ( databaseBackupPrefixRange . begin ) . get ( BackupAgentBase : : keyConfig ) . get ( task - > params [ BackupAgentBase : : keyConfigLogUid ] ) ;
Void _ = wait ( checkTaskVersion ( cx , task , BackupRangeTaskFunc : : name , BackupRangeTaskFunc : : version ) ) ;
// Find out if there is a shard boundary in(beginKey, endKey)
Standalone < VectorRef < KeyRef > > keys = wait ( runRYWTransaction ( taskBucket - > src , [ = ] ( Reference < ReadYourWritesTransaction > tr ) { return getBlockOfShards ( tr , task - > params [ DatabaseBackupAgent : : keyBeginKey ] , task - > params [ DatabaseBackupAgent : : keyEndKey ] , CLIENT_KNOBS - > BACKUP_SHARD_TASK_LIMIT ) ; } ) ) ;
if ( keys . size ( ) > 0 ) {
task - > params [ BackupRangeTaskFunc : : keyAddBackupRangeTasks ] = BinaryWriter : : toValue ( keys , IncludeVersion ( ) ) ;
return Void ( ) ;
}
// Read everything from beginKey to endKey, write it to an output file, run the output file processor, and
// then set on_done.If we are still writing after X seconds, end the output file and insert a new backup_range
// task for the remainder.
state double timeout = now ( ) + CLIENT_KNOBS - > BACKUP_RANGE_TIMEOUT ;
state Key addPrefix = task - > params [ DatabaseBackupAgent : : keyAddPrefix ] ;
state Key removePrefix = task - > params [ DatabaseBackupAgent : : keyRemovePrefix ] ;
state KeyRange range ( KeyRangeRef ( task - > params [ BackupAgentBase : : keyBeginKey ] , task - > params [ BackupAgentBase : : keyEndKey ] ) ) ;
// retrieve kvData
state PromiseStream < RangeResultWithVersion > results ;
state Future < Void > rc = readCommitted ( taskBucket - > src , results , lock , range , true , true , true ) ;
state Key rangeBegin = range . begin ;
state Key rangeEnd ;
state bool endOfStream = false ;
state RangeResultWithVersion nextValues ;
state int64_t nextValuesSize = 0 ;
nextValues . second = invalidVersion ;
loop {
if ( endOfStream & & nextValues . second = = invalidVersion ) {
return Void ( ) ;
}
state RangeResultWithVersion values = std : : move ( nextValues ) ;
state int64_t valuesSize = nextValuesSize ;
nextValues = RangeResultWithVersion ( ) ;
nextValues . second = invalidVersion ;
nextValuesSize = 0 ;
if ( ! endOfStream ) {
loop {
try {
RangeResultWithVersion v = waitNext ( results . getFuture ( ) ) ;
int64_t resultSize = v . first . expectedSize ( ) ;
lock - > release ( resultSize ) ;
if ( values . second = = invalidVersion ) {
values = v ;
} else if ( ( values . second ! = v . second ) | | ( valuesSize > 0 & & resultSize > 0 & & valuesSize + resultSize > CLIENT_KNOBS - > BACKUP_LOG_WRITE_BATCH_MAX_SIZE ) ) {
nextValues = v ;
nextValuesSize = resultSize ;
break ;
} else {
values . first . append_deep ( values . first . arena ( ) , v . first . begin ( ) , v . first . size ( ) ) ;
values . first . more = v . first . more ;
}
valuesSize + = resultSize ;
}
catch ( Error & e ) {
state Error err = e ;
if ( err . code ( ) = = error_code_actor_cancelled )
throw err ;
if ( err . code ( ) = = error_code_end_of_stream ) {
endOfStream = true ;
if ( values . second ! = invalidVersion )
break ;
return Void ( ) ;
}
Void _ = wait ( logError ( cx , Subspace ( databaseBackupPrefixRange . begin ) . get ( BackupAgentBase : : keyErrors ) . pack ( task - > params [ BackupAgentBase : : keyConfigLogUid ] ) , format ( " ERROR: %s " , err . what ( ) ) ) ) ;
throw err ;
}
}
}
if ( now ( ) > = timeout ) {
task - > params [ BackupRangeTaskFunc : : keyBackupRangeBeginKey ] = rangeBegin ;
return Void ( ) ;
}
rangeEnd = values . first . more ? keyAfter ( values . first . end ( ) [ - 1 ] . key ) : range . end ;
state int valueLoc = 0 ;
state int committedValueLoc = 0 ;
state Reference < ReadYourWritesTransaction > tr = Reference < ReadYourWritesTransaction > ( new ReadYourWritesTransaction ( cx ) ) ;
loop {
try {
tr - > setOption ( FDBTransactionOptions : : ACCESS_SYSTEM_KEYS ) ;
tr - > setOption ( FDBTransactionOptions : : LOCK_AWARE ) ;
state Key prefix = task - > params [ BackupAgentBase : : keyConfigLogUid ] . withPrefix ( applyMutationsKeyVersionMapRange . begin ) ;
state Key rangeCountKey = task - > params [ BackupAgentBase : : keyConfigLogUid ] . withPrefix ( applyMutationsKeyVersionCountRange . begin ) ;
state Future < Standalone < RangeResultRef > > backupVersions = krmGetRanges ( tr , prefix , KeyRangeRef ( rangeBegin , rangeEnd ) , BUGGIFY ? 2 : 2000 , 1e5 ) ;
state Future < Optional < Value > > logVersionValue = tr - > get ( task - > params [ BackupAgentBase : : keyConfigLogUid ] . withPrefix ( applyMutationsEndRange . begin ) , true ) ;
state Future < Optional < Value > > rangeCountValue = tr - > get ( rangeCountKey , true ) ;
state Future < Standalone < RangeResultRef > > prevRange = tr - > getRange ( firstGreaterOrEqual ( prefix ) , lastLessOrEqual ( rangeBegin . withPrefix ( prefix ) ) , 1 , true , true ) ;
state Future < Standalone < RangeResultRef > > nextRange = tr - > getRange ( firstGreaterOrEqual ( rangeEnd . withPrefix ( prefix ) ) , firstGreaterOrEqual ( strinc ( prefix ) ) , 1 , true , false ) ;
state Future < bool > verified = taskBucket - > keepRunning ( tr , task ) ;
Void _ = wait ( checkDatabaseLock ( tr , BinaryReader : : fromStringRef < UID > ( task - > params [ BackupAgentBase : : keyConfigLogUid ] , Unversioned ( ) ) ) ) ;
Void _ = wait ( success ( backupVersions ) & & success ( logVersionValue ) & & success ( rangeCountValue ) & & success ( prevRange ) & & success ( nextRange ) & & success ( verified ) ) ;
if ( ! verified . get ( ) ) {
return Void ( ) ;
}
int64_t rangeCount = 0 ;
if ( rangeCountValue . get ( ) . present ( ) ) {
ASSERT ( rangeCountValue . get ( ) . get ( ) . size ( ) = = sizeof ( int64_t ) ) ;
memcpy ( & rangeCount , rangeCountValue . get ( ) . get ( ) . begin ( ) , rangeCountValue . get ( ) . get ( ) . size ( ) ) ;
}
bool prevAdjacent = prevRange . get ( ) . size ( ) & & prevRange . get ( ) [ 0 ] . value . size ( ) & & BinaryReader : : fromStringRef < Version > ( prevRange . get ( ) [ 0 ] . value , Unversioned ( ) ) ! = invalidVersion ;
bool nextAdjacent = nextRange . get ( ) . size ( ) & & nextRange . get ( ) [ 0 ] . value . size ( ) & & BinaryReader : : fromStringRef < Version > ( nextRange . get ( ) [ 0 ] . value , Unversioned ( ) ) ! = invalidVersion ;
if ( ( ! prevAdjacent | | ! nextAdjacent ) & & rangeCount > ( ( prevAdjacent | | nextAdjacent ) ? CLIENT_KNOBS - > BACKUP_MAP_KEY_UPPER_LIMIT : CLIENT_KNOBS - > BACKUP_MAP_KEY_LOWER_LIMIT ) ) {
TEST ( true ) ; //range insert delayed because too versionMap is too large
if ( rangeCount > CLIENT_KNOBS - > BACKUP_MAP_KEY_UPPER_LIMIT )
TraceEvent ( SevWarnAlways , " DBA_KeyRangeMapTooLarge " ) ;
Void _ = wait ( delay ( 1 ) ) ;
task - > params [ BackupRangeTaskFunc : : keyBackupRangeBeginKey ] = rangeBegin ;
return Void ( ) ;
}
Version logVersion = logVersionValue . get ( ) . present ( ) ? BinaryReader : : fromStringRef < Version > ( logVersionValue . get ( ) . get ( ) , Unversioned ( ) ) : - 1 ;
if ( logVersion > = values . second ) {
task - > params [ BackupRangeTaskFunc : : keyBackupRangeBeginKey ] = rangeBegin ;
return Void ( ) ;
}
//TraceEvent("DBA_Range").detail("range", printable(KeyRangeRef(rangeBegin, rangeEnd))).detail("version", values.second).detail("size", values.first.size()).detail("LogUID", printable(task->params[BackupAgentBase::keyConfigLogUid])).detail("addPrefix", printable(addPrefix)).detail("removePrefix", printable(removePrefix));
Subspace krv ( conf . get ( DatabaseBackupAgent : : keyRangeVersions ) ) ;
state KeyRange versionRange = singleKeyRange ( krv . pack ( values . second ) ) ;
tr - > addReadConflictRange ( versionRange ) ;
tr - > addWriteConflictRange ( versionRange ) ;
int versionLoc = 0 ;
std : : vector < Future < Void > > setRanges ;
state int64_t bytesSet = 0 ;
loop {
while ( versionLoc < backupVersions . get ( ) . size ( ) - 1 & & ( backupVersions . get ( ) [ versionLoc ] . value . size ( ) < sizeof ( Version ) | | BinaryReader : : fromStringRef < Version > ( backupVersions . get ( ) [ versionLoc ] . value , Unversioned ( ) ) ! = invalidVersion ) ) {
versionLoc + + ;
}
if ( versionLoc = = backupVersions . get ( ) . size ( ) - 1 )
break ;
if ( backupVersions . get ( ) [ versionLoc + 1 ] . key = = ( removePrefix = = StringRef ( ) ? normalKeys . end : strinc ( removePrefix ) ) ) {
tr - > clear ( KeyRangeRef ( backupVersions . get ( ) [ versionLoc ] . key . removePrefix ( removePrefix ) . withPrefix ( addPrefix ) , addPrefix = = StringRef ( ) ? normalKeys . end : strinc ( addPrefix ) ) ) ;
}
else {
tr - > clear ( KeyRangeRef ( backupVersions . get ( ) [ versionLoc ] . key , backupVersions . get ( ) [ versionLoc + 1 ] . key ) . removePrefix ( removePrefix ) . withPrefix ( addPrefix ) ) ;
}
setRanges . push_back ( krmSetRange ( tr , prefix , KeyRangeRef ( backupVersions . get ( ) [ versionLoc ] . key , backupVersions . get ( ) [ versionLoc + 1 ] . key ) , BinaryWriter : : toValue ( values . second , Unversioned ( ) ) ) ) ;
int64_t added = 1 ;
tr - > atomicOp ( rangeCountKey , StringRef ( ( uint8_t * ) & added , 8 ) , MutationRef : : AddValue ) ;
for ( ; valueLoc < values . first . size ( ) ; + + valueLoc ) {
if ( values . first [ valueLoc ] . key > = backupVersions . get ( ) [ versionLoc + 1 ] . key )
break ;
if ( values . first [ valueLoc ] . key > = backupVersions . get ( ) [ versionLoc ] . key ) {
//TraceEvent("DBA_set", debugID).detail("key", printable(values.first[valueLoc].key)).detail("value", printable(values.first[valueLoc].value));
tr - > set ( values . first [ valueLoc ] . key . removePrefix ( removePrefix ) . withPrefix ( addPrefix ) , values . first [ valueLoc ] . value ) ;
bytesSet + = values . first [ valueLoc ] . expectedSize ( ) - removePrefix . expectedSize ( ) + addPrefix . expectedSize ( ) ;
}
}
versionLoc + + ;
}
Void _ = wait ( waitForAll ( setRanges ) ) ;
Void _ = wait ( tr - > commit ( ) ) ;
Params . bytesWritten ( ) . set ( task , Params . bytesWritten ( ) . getOrDefault ( task ) + bytesSet ) ;
//TraceEvent("DBA_setComplete", debugID).detail("ver", values.second).detail("logVersion", logVersion).detail("readVersion", readVer).detail("commitVer", tr.getCommittedVersion()).detail("range", printable(versionRange));
if ( backupVersions . get ( ) . more ) {
tr - > reset ( ) ;
committedValueLoc = valueLoc ;
rangeBegin = backupVersions . get ( ) . end ( ) [ - 1 ] . key ;
} else {
break ;
}
}
catch ( Error & e ) {
Void _ = wait ( tr - > onError ( e ) ) ;
valueLoc = committedValueLoc ;
}
}
rangeBegin = rangeEnd ;
}
}
ACTOR static Future < Void > startBackupRangeInternal ( Reference < ReadYourWritesTransaction > tr , Standalone < VectorRef < KeyRef > > keys , Reference < TaskBucket > taskBucket , Reference < FutureBucket > futureBucket , Reference < Task > task , Reference < TaskFuture > onDone ) {
tr - > setOption ( FDBTransactionOptions : : ACCESS_SYSTEM_KEYS ) ;
tr - > setOption ( FDBTransactionOptions : : LOCK_AWARE ) ;
state Key nextKey = task - > params [ BackupAgentBase : : keyBeginKey ] ;
std : : vector < Future < Key > > addTaskVector ;
for ( int idx = 0 ; idx < keys . size ( ) ; + + idx ) {
if ( nextKey ! = keys [ idx ] ) {
addTaskVector . push_back ( addTask ( tr , taskBucket , task , nextKey , keys [ idx ] , TaskCompletionKey : : joinWith ( onDone ) ) ) ;
}
nextKey = keys [ idx ] ;
}
if ( nextKey ! = task - > params [ BackupAgentBase : : keyEndKey ] ) {
addTaskVector . push_back ( addTask ( tr , taskBucket , task , nextKey , task - > params [ BackupAgentBase : : keyEndKey ] , TaskCompletionKey : : joinWith ( onDone ) ) ) ;
}
Void _ = wait ( waitForAll ( addTaskVector ) ) ;
return Void ( ) ;
}
ACTOR static Future < Void > _finish ( Reference < ReadYourWritesTransaction > tr , Reference < TaskBucket > taskBucket , Reference < FutureBucket > futureBucket , Reference < Task > task ) {
state Reference < TaskFuture > taskFuture = futureBucket - > unpack ( task - > params [ Task : : reservedTaskParamKeyDone ] ) ;
// Get the bytesWritten parameter from task and atomically add it to the rangeBytesWritten() property of the DR config.
DRConfig config ( task ) ;
int64_t bytesWritten = Params . bytesWritten ( ) . getOrDefault ( task ) ;
config . rangeBytesWritten ( ) . atomicOp ( tr , bytesWritten , MutationRef : : AddValue ) ;
if ( task - > params . find ( BackupRangeTaskFunc : : keyAddBackupRangeTasks ) ! = task - > params . end ( ) ) {
Void _ = wait ( startBackupRangeInternal ( tr , BinaryReader : : fromStringRef < Standalone < VectorRef < KeyRef > > > ( task - > params [ BackupRangeTaskFunc : : keyAddBackupRangeTasks ] , IncludeVersion ( ) ) , taskBucket , futureBucket , task , taskFuture ) & & taskBucket - > finish ( tr , task ) ) ;
}
else if ( task - > params . find ( BackupRangeTaskFunc : : keyBackupRangeBeginKey ) ! = task - > params . end ( ) & & task - > params [ BackupRangeTaskFunc : : keyBackupRangeBeginKey ] < task - > params [ BackupAgentBase : : keyEndKey ] ) {
ASSERT ( taskFuture - > key . size ( ) > 0 ) ;
Void _ = wait ( success ( BackupRangeTaskFunc : : addTask ( tr , taskBucket , task , task - > params [ BackupRangeTaskFunc : : keyBackupRangeBeginKey ] , task - > params [ BackupAgentBase : : keyEndKey ] , TaskCompletionKey : : signal ( taskFuture - > key ) ) ) & & taskBucket - > finish ( tr , task ) ) ;
}
else {
Void _ = wait ( taskFuture - > set ( tr , taskBucket ) & & taskBucket - > finish ( tr , task ) ) ;
}
return Void ( ) ;
}
} ;
StringRef BackupRangeTaskFunc : : name = LiteralStringRef ( " db_backup_range " ) ;
const uint32_t BackupRangeTaskFunc : : version = 1 ;
const Key BackupRangeTaskFunc : : keyAddBackupRangeTasks = LiteralStringRef ( " addBackupRangeTasks " ) ;
const Key BackupRangeTaskFunc : : keyBackupRangeBeginKey = LiteralStringRef ( " backupRangeBeginKey " ) ;
REGISTER_TASKFUNC ( BackupRangeTaskFunc ) ;
struct FinishFullBackupTaskFunc : TaskFuncBase {
static StringRef name ;
static const uint32_t version ;
ACTOR static Future < Void > _finish ( Reference < ReadYourWritesTransaction > tr , Reference < TaskBucket > taskBucket , Reference < FutureBucket > futureBucket , Reference < Task > task ) {
state Subspace states = Subspace ( databaseBackupPrefixRange . begin ) . get ( BackupAgentBase : : keyStates ) . get ( task - > params [ BackupAgentBase : : keyConfigLogUid ] ) ;
Void _ = wait ( checkTaskVersion ( tr , task , FinishFullBackupTaskFunc : : name , FinishFullBackupTaskFunc : : version ) ) ;
// Enable the stop key
Transaction srcTr ( taskBucket - > src ) ;
srcTr . setOption ( FDBTransactionOptions : : LOCK_AWARE ) ;
Version readVersion = wait ( srcTr . getReadVersion ( ) ) ;
tr - > set ( states . pack ( DatabaseBackupAgent : : keyCopyStop ) , BinaryWriter : : toValue ( readVersion , Unversioned ( ) ) ) ;
TraceEvent ( " DBA_FinishFullBackup " ) . detail ( " copyStop " , readVersion ) ;
Void _ = wait ( taskBucket - > finish ( tr , task ) ) ;
return Void ( ) ;
}
ACTOR static Future < Key > addTask ( Reference < ReadYourWritesTransaction > tr , Reference < TaskBucket > taskBucket , Reference < Task > parentTask , TaskCompletionKey completionKey , Reference < TaskFuture > waitFor = Reference < TaskFuture > ( ) ) {
// After the BackupRangeTask completes, set the stop key which will stop the BackupLogsTask
Key doneKey = wait ( completionKey . get ( tr , taskBucket ) ) ;
Reference < Task > task ( new Task ( FinishFullBackupTaskFunc : : name , FinishFullBackupTaskFunc : : version , doneKey ) ) ;
copyDefaultParameters ( parentTask , task ) ;
if ( ! waitFor ) {
return taskBucket - > addTask ( tr , task , parentTask - > params [ Task : : reservedTaskParamValidKey ] , task - > params [ BackupAgentBase : : keyFolderId ] ) ;
}
Void _ = wait ( waitFor - > onSetAddTask ( tr , taskBucket , task , parentTask - > params [ Task : : reservedTaskParamValidKey ] , task - > params [ BackupAgentBase : : keyFolderId ] ) ) ;
return LiteralStringRef ( " OnSetAddTask " ) ;
}
StringRef getName ( ) const { return name ; } ;
Future < Void > execute ( Database cx , Reference < TaskBucket > tb , Reference < FutureBucket > fb , Reference < Task > task ) { return Void ( ) ; } ;
Future < Void > finish ( Reference < ReadYourWritesTransaction > tr , Reference < TaskBucket > tb , Reference < FutureBucket > fb , Reference < Task > task ) { return _finish ( tr , tb , fb , task ) ; } ;
} ;
StringRef FinishFullBackupTaskFunc : : name = LiteralStringRef ( " db_finish_full_backup " ) ;
const uint32_t FinishFullBackupTaskFunc : : version = 1 ;
REGISTER_TASKFUNC ( FinishFullBackupTaskFunc ) ;
struct EraseLogRangeTaskFunc : TaskFuncBase {
static StringRef name ;
static const uint32_t version ;
StringRef getName ( ) const { return name ; } ;
Future < Void > execute ( Database cx , Reference < TaskBucket > tb , Reference < FutureBucket > fb , Reference < Task > task ) { return _execute ( cx , tb , fb , task ) ; } ;
Future < Void > finish ( Reference < ReadYourWritesTransaction > tr , Reference < TaskBucket > tb , Reference < FutureBucket > fb , Reference < Task > task ) { return _finish ( tr , tb , fb , task ) ; } ;
ACTOR static Future < Void > eraseLogData ( Database cx , Reference < Task > task , Version beginVersion , Version endVersion ) {
if ( endVersion < = beginVersion )
return Void ( ) ;
state Transaction tr ( cx ) ;
loop {
try {
tr . setOption ( FDBTransactionOptions : : LOCK_AWARE ) ;
Standalone < VectorRef < KeyRangeRef > > ranges = getLogRanges ( beginVersion , endVersion , task - > params [ BackupAgentBase : : keyConfigLogUid ] ) ;
for ( auto & rng : ranges )
tr . clear ( rng ) ;
Void _ = wait ( tr . commit ( ) ) ;
return Void ( ) ;
}
catch ( Error & e ) {
Void _ = wait ( tr . onError ( e ) ) ;
}
}
}
ACTOR static Future < Void > _execute ( Database cx , Reference < TaskBucket > taskBucket , Reference < FutureBucket > futureBucket , Reference < Task > task ) {
state FlowLock lock ( CLIENT_KNOBS - > BACKUP_LOCK_BYTES ) ;
Void _ = wait ( checkTaskVersion ( cx , task , EraseLogRangeTaskFunc : : name , EraseLogRangeTaskFunc : : version ) ) ;
Version beginVersion = BinaryReader : : fromStringRef < Version > ( task - > params [ DatabaseBackupAgent : : keyBeginVersion ] , Unversioned ( ) ) ;
Version endVersion = BinaryReader : : fromStringRef < Version > ( task - > params [ DatabaseBackupAgent : : keyEndVersion ] , Unversioned ( ) ) ;
Void _ = wait ( eraseLogData ( taskBucket - > src , task , beginVersion , endVersion ) ) ;
return Void ( ) ;
}
ACTOR static Future < Key > addTask ( Reference < ReadYourWritesTransaction > tr , Reference < TaskBucket > taskBucket , Reference < Task > parentTask , Version beginVersion , Version endVersion , TaskCompletionKey completionKey , Reference < TaskFuture > waitFor = Reference < TaskFuture > ( ) ) {
Key doneKey = wait ( completionKey . get ( tr , taskBucket ) ) ;
Reference < Task > task ( new Task ( EraseLogRangeTaskFunc : : name , EraseLogRangeTaskFunc : : version , doneKey , 1 ) ) ;
copyDefaultParameters ( parentTask , task ) ;
task - > params [ DatabaseBackupAgent : : keyBeginVersion ] = BinaryWriter : : toValue ( beginVersion , Unversioned ( ) ) ;
task - > params [ DatabaseBackupAgent : : keyEndVersion ] = BinaryWriter : : toValue ( endVersion , Unversioned ( ) ) ;
if ( ! waitFor ) {
return taskBucket - > addTask ( tr , task , parentTask - > params [ Task : : reservedTaskParamValidKey ] , task - > params [ BackupAgentBase : : keyFolderId ] ) ;
}
Void _ = wait ( waitFor - > onSetAddTask ( tr , taskBucket , task , parentTask - > params [ Task : : reservedTaskParamValidKey ] , task - > params [ BackupAgentBase : : keyFolderId ] ) ) ;
return LiteralStringRef ( " OnSetAddTask " ) ;
}
ACTOR static Future < Void > _finish ( Reference < ReadYourWritesTransaction > tr , Reference < TaskBucket > taskBucket , Reference < FutureBucket > futureBucket , Reference < Task > task ) {
state Version beginVersion = BinaryReader : : fromStringRef < Version > ( task - > params [ DatabaseBackupAgent : : keyBeginVersion ] , Unversioned ( ) ) ;
state Version endVersion = BinaryReader : : fromStringRef < Version > ( task - > params [ DatabaseBackupAgent : : keyEndVersion ] , Unversioned ( ) ) ;
state Reference < TaskFuture > taskFuture = futureBucket - > unpack ( task - > params [ Task : : reservedTaskParamKeyDone ] ) ;
Void _ = wait ( taskFuture - > set ( tr , taskBucket ) & & taskBucket - > finish ( tr , task ) ) ;
return Void ( ) ;
}
} ;
StringRef EraseLogRangeTaskFunc : : name = LiteralStringRef ( " db_erase_log_range " ) ;
const uint32_t EraseLogRangeTaskFunc : : version = 1 ;
REGISTER_TASKFUNC ( EraseLogRangeTaskFunc ) ;
struct CopyLogRangeTaskFunc : TaskFuncBase {
static StringRef name ;
static const uint32_t version ;
static struct {
static TaskParam < int64_t > bytesWritten ( ) { return LiteralStringRef ( __FUNCTION__ ) ; }
} Params ;
static const Key keyNextBeginVersion ;
StringRef getName ( ) const { return name ; } ;
Future < Void > execute ( Database cx , Reference < TaskBucket > tb , Reference < FutureBucket > fb , Reference < Task > task ) { return _execute ( cx , tb , fb , task ) ; } ;
Future < Void > finish ( Reference < ReadYourWritesTransaction > tr , Reference < TaskBucket > tb , Reference < FutureBucket > fb , Reference < Task > task ) { return _finish ( tr , tb , fb , task ) ; } ;
ACTOR static Future < Void > dumpData ( Database cx , Reference < Task > task , PromiseStream < RCGroup > results , FlowLock * lock , Reference < TaskBucket > tb ) {
state bool endOfStream = false ;
state Subspace conf = Subspace ( databaseBackupPrefixRange . begin ) . get ( BackupAgentBase : : keyConfig ) . get ( task - > params [ BackupAgentBase : : keyConfigLogUid ] ) ;
state std : : vector < Standalone < RangeResultRef > > nextMutations ;
state int64_t nextMutationSize = 0 ;
loop {
try {
if ( endOfStream & & ! nextMutationSize ) {
return Void ( ) ;
}
state std : : vector < Standalone < RangeResultRef > > mutations = std : : move ( nextMutations ) ;
state int64_t mutationSize = nextMutationSize ;
nextMutations = std : : vector < Standalone < RangeResultRef > > ( ) ;
nextMutationSize = 0 ;
if ( ! endOfStream ) {
loop {
try {
RCGroup group = waitNext ( results . getFuture ( ) ) ;
lock - > release ( group . items . expectedSize ( ) ) ;
int vecSize = group . items . expectedSize ( ) ;
if ( mutationSize + vecSize > = CLIENT_KNOBS - > BACKUP_LOG_WRITE_BATCH_MAX_SIZE ) {
nextMutations . push_back ( group . items ) ;
nextMutationSize = vecSize ;
break ;
}
mutations . push_back ( group . items ) ;
mutationSize + = vecSize ;
}
catch ( Error & e ) {
state Error error = e ;
if ( e . code ( ) = = error_code_end_of_stream ) {
endOfStream = true ;
break ;
}
throw error ;
}
}
}
state Transaction tr ( cx ) ;
loop {
try {
tr . setOption ( FDBTransactionOptions : : LOCK_AWARE ) ;
tr . options . customTransactionSizeLimit = 2 * CLIENT_KNOBS - > TRANSACTION_SIZE_LIMIT ;
Void _ = wait ( checkDatabaseLock ( & tr , BinaryReader : : fromStringRef < UID > ( task - > params [ BackupAgentBase : : keyConfigLogUid ] , Unversioned ( ) ) ) ) ;
state int64_t bytesSet = 0 ;
bool first = true ;
for ( auto m : mutations ) {
for ( auto kv : m ) {
if ( first ) {
tr . addReadConflictRange ( singleKeyRange ( kv . key ) ) ;
first = false ;
}
tr . set ( kv . key . removePrefix ( backupLogKeys . begin ) . withPrefix ( applyLogKeys . begin ) , kv . value ) ;
bytesSet + = kv . expectedSize ( ) - backupLogKeys . begin . expectedSize ( ) + applyLogKeys . begin . expectedSize ( ) ;
}
}
Void _ = wait ( tr . commit ( ) ) ;
Params . bytesWritten ( ) . set ( task , Params . bytesWritten ( ) . getOrDefault ( task ) + bytesSet ) ;
break ;
}
catch ( Error & e ) {
Void _ = wait ( tr . onError ( e ) ) ;
}
}
}
catch ( Error & e ) {
if ( e . code ( ) = = error_code_actor_cancelled | | e . code ( ) = = error_code_backup_error )
throw e ;
state Error err = e ;
Void _ = wait ( logError ( cx , Subspace ( databaseBackupPrefixRange . begin ) . get ( BackupAgentBase : : keyErrors ) . pack ( task - > params [ BackupAgentBase : : keyConfigLogUid ] ) , format ( " ERROR: Failed to dump mutations because of error %s " , err . what ( ) ) ) ) ;
throw err ;
}
}
}
ACTOR static Future < Void > _execute ( Database cx , Reference < TaskBucket > taskBucket , Reference < FutureBucket > futureBucket , Reference < Task > task ) {
state Reference < FlowLock > lock ( new FlowLock ( CLIENT_KNOBS - > BACKUP_LOCK_BYTES ) ) ;
Void _ = wait ( checkTaskVersion ( cx , task , CopyLogRangeTaskFunc : : name , CopyLogRangeTaskFunc : : version ) ) ;
state Version beginVersion = BinaryReader : : fromStringRef < Version > ( task - > params [ DatabaseBackupAgent : : keyBeginVersion ] , Unversioned ( ) ) ;
state Version endVersion = BinaryReader : : fromStringRef < Version > ( task - > params [ DatabaseBackupAgent : : keyEndVersion ] , Unversioned ( ) ) ;
state Version newEndVersion = std : : min ( endVersion , ( ( ( beginVersion - 1 ) / CLIENT_KNOBS - > BACKUP_BLOCK_SIZE ) + 2 + ( g_network - > isSimulated ( ) ? CLIENT_KNOBS - > BACKUP_SIM_COPY_LOG_RANGES : 0 ) ) * CLIENT_KNOBS - > BACKUP_BLOCK_SIZE ) ;
state Standalone < VectorRef < KeyRangeRef > > ranges = getLogRanges ( beginVersion , newEndVersion , task - > params [ BackupAgentBase : : keyConfigLogUid ] , CLIENT_KNOBS - > BACKUP_BLOCK_SIZE ) ;
state std : : vector < PromiseStream < RCGroup > > results ;
state std : : vector < Future < Void > > rc ;
state std : : vector < Future < Void > > dump ;
for ( int i = 0 ; i < ranges . size ( ) ; + + i ) {
results . push_back ( PromiseStream < RCGroup > ( ) ) ;
rc . push_back ( readCommitted ( taskBucket - > src , results [ i ] , Future < Void > ( Void ( ) ) , lock , ranges [ i ] , decodeBKMutationLogKey , false , true , true , nullptr ) ) ;
dump . push_back ( dumpData ( cx , task , results [ i ] , lock . getPtr ( ) , taskBucket ) ) ;
}
state Future < Void > dumpComplete = waitForAll ( dump ) ;
try {
loop {
choose {
when ( Void _ = wait ( dumpComplete ) ) { break ; }
when ( Void _ = wait ( delay ( ( CLIENT_KNOBS - > TASKBUCKET_TIMEOUT_VERSIONS / 2 ) / CLIENT_KNOBS - > CORE_VERSIONSPERSECOND ) ) ) {
bool saveResult = wait ( taskBucket - > saveAndExtend ( cx , task ) ) ;
if ( ! saveResult ) {
return Void ( ) ;
}
}
}
}
}
catch ( Error & e ) {
if ( e . code ( ) = = error_code_backup_error )
return Void ( ) ;
throw ;
}
if ( newEndVersion < endVersion ) {
task - > params [ CopyLogRangeTaskFunc : : keyNextBeginVersion ] = BinaryWriter : : toValue ( newEndVersion , Unversioned ( ) ) ;
}
return Void ( ) ;
}
ACTOR static Future < Key > addTask ( Reference < ReadYourWritesTransaction > tr , Reference < TaskBucket > taskBucket , Reference < Task > parentTask , Version beginVersion , Version endVersion , TaskCompletionKey completionKey , Reference < TaskFuture > waitFor = Reference < TaskFuture > ( ) ) {
Key doneKey = wait ( completionKey . get ( tr , taskBucket ) ) ;
Reference < Task > task ( new Task ( CopyLogRangeTaskFunc : : name , CopyLogRangeTaskFunc : : version , doneKey , 1 ) ) ;
copyDefaultParameters ( parentTask , task ) ;
task - > params [ DatabaseBackupAgent : : keyBeginVersion ] = BinaryWriter : : toValue ( beginVersion , Unversioned ( ) ) ;
task - > params [ DatabaseBackupAgent : : keyEndVersion ] = BinaryWriter : : toValue ( endVersion , Unversioned ( ) ) ;
if ( ! waitFor ) {
return taskBucket - > addTask ( tr , task , parentTask - > params [ Task : : reservedTaskParamValidKey ] , task - > params [ BackupAgentBase : : keyFolderId ] ) ;
}
Void _ = wait ( waitFor - > onSetAddTask ( tr , taskBucket , task , parentTask - > params [ Task : : reservedTaskParamValidKey ] , task - > params [ BackupAgentBase : : keyFolderId ] ) ) ;
return LiteralStringRef ( " OnSetAddTask " ) ;
}
ACTOR static Future < Void > _finish ( Reference < ReadYourWritesTransaction > tr , Reference < TaskBucket > taskBucket , Reference < FutureBucket > futureBucket , Reference < Task > task ) {
state Version beginVersion = BinaryReader : : fromStringRef < Version > ( task - > params [ DatabaseBackupAgent : : keyBeginVersion ] , Unversioned ( ) ) ;
state Version endVersion = BinaryReader : : fromStringRef < Version > ( task - > params [ DatabaseBackupAgent : : keyEndVersion ] , Unversioned ( ) ) ;
state Reference < TaskFuture > taskFuture = futureBucket - > unpack ( task - > params [ Task : : reservedTaskParamKeyDone ] ) ;
// Get the bytesWritten parameter from task and atomically add it to the logBytesWritten() property of the DR config.
DRConfig config ( task ) ;
int64_t bytesWritten = Params . bytesWritten ( ) . getOrDefault ( task ) ;
config . logBytesWritten ( ) . atomicOp ( tr , bytesWritten , MutationRef : : AddValue ) ;
if ( task - > params . find ( CopyLogRangeTaskFunc : : keyNextBeginVersion ) ! = task - > params . end ( ) ) {
state Version nextVersion = BinaryReader : : fromStringRef < Version > ( task - > params [ CopyLogRangeTaskFunc : : keyNextBeginVersion ] , Unversioned ( ) ) ;
Void _ = wait ( success ( CopyLogRangeTaskFunc : : addTask ( tr , taskBucket , task , nextVersion , endVersion , TaskCompletionKey : : signal ( taskFuture - > key ) ) ) & &
success ( EraseLogRangeTaskFunc : : addTask ( tr , taskBucket , task , beginVersion , nextVersion , TaskCompletionKey : : noSignal ( ) ) ) & &
taskBucket - > finish ( tr , task ) ) ;
}
else {
Void _ = wait ( success ( EraseLogRangeTaskFunc : : addTask ( tr , taskBucket , task , beginVersion , endVersion , TaskCompletionKey : : noSignal ( ) ) ) & &
taskFuture - > set ( tr , taskBucket ) & &
taskBucket - > finish ( tr , task ) ) ;
}
return Void ( ) ;
}
} ;
StringRef CopyLogRangeTaskFunc : : name = LiteralStringRef ( " db_copy_log_range " ) ;
const uint32_t CopyLogRangeTaskFunc : : version = 1 ;
const Key CopyLogRangeTaskFunc : : keyNextBeginVersion = LiteralStringRef ( " nextBeginVersion " ) ;
REGISTER_TASKFUNC ( CopyLogRangeTaskFunc ) ;
struct CopyLogsTaskFunc : TaskFuncBase {
static StringRef name ;
static const uint32_t version ;
ACTOR static Future < Void > _finish ( Reference < ReadYourWritesTransaction > tr , Reference < TaskBucket > taskBucket , Reference < FutureBucket > futureBucket , Reference < Task > task ) {
state Subspace conf = Subspace ( databaseBackupPrefixRange . begin ) . get ( BackupAgentBase : : keyConfig ) . get ( task - > params [ BackupAgentBase : : keyConfigLogUid ] ) ;
state Subspace states = Subspace ( databaseBackupPrefixRange . begin ) . get ( BackupAgentBase : : keyStates ) . get ( task - > params [ BackupAgentBase : : keyConfigLogUid ] ) ;
Void _ = wait ( checkTaskVersion ( tr , task , CopyLogsTaskFunc : : name , CopyLogsTaskFunc : : version ) ) ;
state Version beginVersion = BinaryReader : : fromStringRef < Version > ( task - > params [ DatabaseBackupAgent : : keyBeginVersion ] , Unversioned ( ) ) ;
state Future < Optional < Value > > fStopValue = tr - > get ( states . pack ( DatabaseBackupAgent : : keyCopyStop ) ) ;
state Future < Optional < Value > > fAppliedValue = tr - > get ( task - > params [ BackupAgentBase : : keyConfigLogUid ] . withPrefix ( applyMutationsBeginRange . begin ) ) ;
Transaction srcTr ( taskBucket - > src ) ;
srcTr . setOption ( FDBTransactionOptions : : LOCK_AWARE ) ;
state Version endVersion = wait ( srcTr . getReadVersion ( ) ) ;
state Reference < TaskFuture > onDone = futureBucket - > unpack ( task - > params [ Task : : reservedTaskParamKeyDone ] ) ;
if ( endVersion < = beginVersion ) {
Void _ = wait ( delay ( FLOW_KNOBS - > PREVENT_FAST_SPIN_DELAY ) ) ;
Key _ = wait ( CopyLogsTaskFunc : : addTask ( tr , taskBucket , task , beginVersion , TaskCompletionKey : : signal ( onDone ) ) ) ;
Void _ = wait ( taskBucket - > finish ( tr , task ) ) ;
return Void ( ) ;
}
Optional < Value > appliedValue = wait ( fAppliedValue ) ;
state Version appliedVersion = appliedValue . present ( ) ? BinaryReader : : fromStringRef < Version > ( appliedValue . get ( ) , Unversioned ( ) ) : 100 ;
state Version applyVersion = std : : max < Version > ( appliedVersion , beginVersion - CLIENT_KNOBS - > BACKUP_VERSION_DELAY ) ;
Subspace krv = conf . get ( DatabaseBackupAgent : : keyRangeVersions ) ;
KeyRange versionRange = KeyRangeRef ( krv . pack ( 0 ) , krv . pack ( applyVersion + 1 ) ) ;
tr - > addReadConflictRange ( versionRange ) ;
tr - > addWriteConflictRange ( versionRange ) ;
tr - > set ( task - > params [ BackupAgentBase : : keyConfigLogUid ] . withPrefix ( applyMutationsEndRange . begin ) , BinaryWriter : : toValue ( applyVersion , Unversioned ( ) ) ) ;
Optional < Value > stopValue = wait ( fStopValue ) ;
state Version stopVersionData = stopValue . present ( ) ? BinaryReader : : fromStringRef < Version > ( stopValue . get ( ) , Unversioned ( ) ) : - 1 ;
if ( endVersion - beginVersion > g_random - > randomInt64 ( 0 , CLIENT_KNOBS - > BACKUP_VERSION_DELAY ) ) {
TraceEvent ( " DBA_CopyLogs " ) . detail ( " beginVersion " , beginVersion ) . detail ( " applyVersion " , applyVersion ) . detail ( " endVersion " , endVersion ) . detail ( " stopVersionData " , stopVersionData ) . detail ( " LogUID " , printable ( task - > params [ BackupAgentBase : : keyConfigLogUid ] ) ) ;
}
if ( ( stopVersionData = = - 1 ) | | ( stopVersionData > = applyVersion ) ) {
state Reference < TaskFuture > allPartsDone = futureBucket - > future ( tr ) ;
std : : vector < Future < Key > > addTaskVector ;
addTaskVector . push_back ( CopyLogsTaskFunc : : addTask ( tr , taskBucket , task , endVersion , TaskCompletionKey : : signal ( onDone ) , allPartsDone ) ) ;
int blockSize = std : : max < int > ( 1 , ( ( endVersion - beginVersion ) / CLIENT_KNOBS - > BACKUP_COPY_TASKS ) / CLIENT_KNOBS - > BACKUP_BLOCK_SIZE ) ;
for ( int64_t vblock = beginVersion / CLIENT_KNOBS - > BACKUP_BLOCK_SIZE ; vblock < ( endVersion + CLIENT_KNOBS - > BACKUP_BLOCK_SIZE - 1 ) / CLIENT_KNOBS - > BACKUP_BLOCK_SIZE ; vblock + = blockSize ) {
addTaskVector . push_back ( CopyLogRangeTaskFunc : : addTask ( tr , taskBucket , task ,
std : : max ( beginVersion , vblock * CLIENT_KNOBS - > BACKUP_BLOCK_SIZE ) ,
std : : min ( endVersion , ( vblock + blockSize ) * CLIENT_KNOBS - > BACKUP_BLOCK_SIZE ) ,
TaskCompletionKey : : joinWith ( allPartsDone ) ) ) ;
}
Void _ = wait ( waitForAll ( addTaskVector ) & & taskBucket - > finish ( tr , task ) ) ;
} else {
if ( appliedVersion < = stopVersionData ) {
Void _ = wait ( delay ( FLOW_KNOBS - > PREVENT_FAST_SPIN_DELAY ) ) ;
Key _ = wait ( CopyLogsTaskFunc : : addTask ( tr , taskBucket , task , beginVersion , TaskCompletionKey : : signal ( onDone ) ) ) ;
Void _ = wait ( taskBucket - > finish ( tr , task ) ) ;
return Void ( ) ;
}
Void _ = wait ( onDone - > set ( tr , taskBucket ) & & taskBucket - > finish ( tr , task ) ) ;
tr - > set ( states . pack ( DatabaseBackupAgent : : keyStateStop ) , BinaryWriter : : toValue ( beginVersion , Unversioned ( ) ) ) ;
}
return Void ( ) ;
}
ACTOR static Future < Key > addTask ( Reference < ReadYourWritesTransaction > tr , Reference < TaskBucket > taskBucket , Reference < Task > parentTask , Version beginVersion , TaskCompletionKey completionKey , Reference < TaskFuture > waitFor = Reference < TaskFuture > ( ) ) {
Key doneKey = wait ( completionKey . get ( tr , taskBucket ) ) ;
Reference < Task > task ( new Task ( CopyLogsTaskFunc : : name , CopyLogsTaskFunc : : version , doneKey , 1 ) ) ;
copyDefaultParameters ( parentTask , task ) ;
task - > params [ BackupAgentBase : : keyBeginVersion ] = BinaryWriter : : toValue ( beginVersion , Unversioned ( ) ) ;
if ( ! waitFor ) {
return taskBucket - > addTask ( tr , task , parentTask - > params [ Task : : reservedTaskParamValidKey ] , task - > params [ BackupAgentBase : : keyFolderId ] ) ;
}
Void _ = wait ( waitFor - > onSetAddTask ( tr , taskBucket , task , parentTask - > params [ Task : : reservedTaskParamValidKey ] , task - > params [ BackupAgentBase : : keyFolderId ] ) ) ;
return LiteralStringRef ( " OnSetAddTask " ) ;
}
StringRef getName ( ) const { return name ; } ;
Future < Void > execute ( Database cx , Reference < TaskBucket > tb , Reference < FutureBucket > fb , Reference < Task > task ) { return Void ( ) ; } ;
Future < Void > finish ( Reference < ReadYourWritesTransaction > tr , Reference < TaskBucket > tb , Reference < FutureBucket > fb , Reference < Task > task ) { return _finish ( tr , tb , fb , task ) ; } ;
} ;
StringRef CopyLogsTaskFunc : : name = LiteralStringRef ( " db_copy_logs " ) ;
const uint32_t CopyLogsTaskFunc : : version = 1 ;
REGISTER_TASKFUNC ( CopyLogsTaskFunc ) ;
struct FinishedFullBackupTaskFunc : TaskFuncBase {
static StringRef name ;
static const uint32_t version ;
static const Key keyInsertTask ;
StringRef getName ( ) const { return name ; } ;
ACTOR static Future < Void > _execute ( Database cx , Reference < TaskBucket > taskBucket , Reference < FutureBucket > futureBucket , Reference < Task > task ) {
state Subspace sourceStates = Subspace ( databaseBackupPrefixRange . begin ) . get ( BackupAgentBase : : keySourceStates ) . get ( task - > params [ BackupAgentBase : : keyConfigLogUid ] ) ;
Void _ = wait ( checkTaskVersion ( cx , task , FinishedFullBackupTaskFunc : : name , FinishedFullBackupTaskFunc : : version ) ) ;
state Transaction tr2 ( cx ) ;
loop {
try {
tr2 . setOption ( FDBTransactionOptions : : LOCK_AWARE ) ;
Optional < Value > beginValue = wait ( tr2 . get ( task - > params [ BackupAgentBase : : keyConfigLogUid ] . withPrefix ( applyMutationsBeginRange . begin ) ) ) ;
state Version appliedVersion = beginValue . present ( ) ? BinaryReader : : fromStringRef < Version > ( beginValue . get ( ) , Unversioned ( ) ) : - 1 ;
Optional < Value > endValue = wait ( tr2 . get ( task - > params [ BackupAgentBase : : keyConfigLogUid ] . withPrefix ( applyMutationsEndRange . begin ) ) ) ;
Version endVersion = endValue . present ( ) ? BinaryReader : : fromStringRef < Version > ( endValue . get ( ) , Unversioned ( ) ) : - 1 ;
//TraceEvent("DBA_FinishedFullBackup").detail("applied", appliedVersion).detail("endVer", endVersion);
if ( appliedVersion < endVersion ) {
Void _ = wait ( delay ( FLOW_KNOBS - > PREVENT_FAST_SPIN_DELAY ) ) ;
task - > params [ FinishedFullBackupTaskFunc : : keyInsertTask ] = StringRef ( ) ;
return Void ( ) ;
}
break ;
} catch ( Error & e ) {
Void _ = wait ( tr2 . onError ( e ) ) ;
}
}
state Transaction tr ( taskBucket - > src ) ;
loop {
try {
tr . setOption ( FDBTransactionOptions : : LOCK_AWARE ) ;
Optional < Value > v = wait ( tr . get ( sourceStates . pack ( DatabaseBackupAgent : : keyFolderId ) ) ) ;
if ( v . present ( ) & & BinaryReader : : fromStringRef < Version > ( v . get ( ) , Unversioned ( ) ) > BinaryReader : : fromStringRef < Version > ( task - > params [ DatabaseBackupAgent : : keyFolderId ] , Unversioned ( ) ) )
return Void ( ) ;
UID logUid = BinaryReader : : fromStringRef < UID > ( task - > params [ DatabaseBackupAgent : : keyConfigLogUid ] , Unversioned ( ) ) ;
Key configPath = uidPrefixKey ( logRangesRange . begin , logUid ) ;
Key logsPath = uidPrefixKey ( backupLogKeys . begin , logUid ) ;
tr . set ( sourceStates . pack ( DatabaseBackupAgent : : keyStateStatus ) , StringRef ( BackupAgentBase : : getStateText ( BackupAgentBase : : STATE_COMPLETED ) ) ) ;
tr . clear ( KeyRangeRef ( configPath , strinc ( configPath ) ) ) ;
tr . clear ( KeyRangeRef ( logsPath , strinc ( logsPath ) ) ) ;
Void _ = wait ( tr . commit ( ) ) ;
break ;
} catch ( Error & e ) {
Void _ = wait ( tr . onError ( e ) ) ;
}
}
return Void ( ) ;
}
ACTOR static Future < Key > addTask ( Reference < ReadYourWritesTransaction > tr , Reference < TaskBucket > taskBucket , Reference < Task > parentTask , TaskCompletionKey completionKey , Reference < TaskFuture > waitFor = Reference < TaskFuture > ( ) ) {
Key doneKey = wait ( completionKey . get ( tr , taskBucket ) ) ;
Reference < Task > task ( new Task ( FinishedFullBackupTaskFunc : : name , FinishedFullBackupTaskFunc : : version , doneKey ) ) ;
copyDefaultParameters ( parentTask , task ) ;
if ( ! waitFor ) {
return taskBucket - > addTask ( tr , task , parentTask - > params [ Task : : reservedTaskParamValidKey ] , task - > params [ BackupAgentBase : : keyFolderId ] ) ;
}
Void _ = wait ( waitFor - > onSetAddTask ( tr , taskBucket , task , parentTask - > params [ Task : : reservedTaskParamValidKey ] , task - > params [ BackupAgentBase : : keyFolderId ] ) ) ;
return LiteralStringRef ( " OnSetAddTask " ) ;
}
ACTOR static Future < Void > _finish ( Reference < ReadYourWritesTransaction > tr , Reference < TaskBucket > taskBucket , Reference < FutureBucket > futureBucket , Reference < Task > task ) {
state Subspace conf = Subspace ( databaseBackupPrefixRange . begin ) . get ( BackupAgentBase : : keyConfig ) . get ( task - > params [ BackupAgentBase : : keyConfigLogUid ] ) ;
state Subspace states = Subspace ( databaseBackupPrefixRange . begin ) . get ( BackupAgentBase : : keyStates ) . get ( task - > params [ BackupAgentBase : : keyConfigLogUid ] ) ;
if ( task - > params . find ( FinishedFullBackupTaskFunc : : keyInsertTask ) ! = task - > params . end ( ) ) {
state Reference < TaskFuture > onDone = futureBucket - > unpack ( task - > params [ Task : : reservedTaskParamKeyDone ] ) ;
Key _ = wait ( FinishedFullBackupTaskFunc : : addTask ( tr , taskBucket , task , TaskCompletionKey : : signal ( onDone ) ) ) ;
Void _ = wait ( taskBucket - > finish ( tr , task ) ) ;
return Void ( ) ;
}
UID logUid = BinaryReader : : fromStringRef < UID > ( task - > params [ DatabaseBackupAgent : : keyConfigLogUid ] , Unversioned ( ) ) ;
Key logsPath = uidPrefixKey ( applyLogKeys . begin , logUid ) ;
tr - > clear ( KeyRangeRef ( logsPath , strinc ( logsPath ) ) ) ;
tr - > clear ( conf . range ( ) ) ;
tr - > set ( states . pack ( DatabaseBackupAgent : : keyStateStatus ) , StringRef ( BackupAgentBase : : getStateText ( BackupAgentBase : : STATE_COMPLETED ) ) ) ;
Void _ = wait ( taskBucket - > finish ( tr , task ) ) ;
return Void ( ) ;
}
Future < Void > execute ( Database cx , Reference < TaskBucket > tb , Reference < FutureBucket > fb , Reference < Task > task ) { return _execute ( cx , tb , fb , task ) ; } ;
Future < Void > finish ( Reference < ReadYourWritesTransaction > tr , Reference < TaskBucket > tb , Reference < FutureBucket > fb , Reference < Task > task ) { return _finish ( tr , tb , fb , task ) ; } ;
} ;
StringRef FinishedFullBackupTaskFunc : : name = LiteralStringRef ( " db_finished_full_backup " ) ;
const uint32_t FinishedFullBackupTaskFunc : : version = 1 ;
const Key FinishedFullBackupTaskFunc : : keyInsertTask = LiteralStringRef ( " insertTask " ) ;
REGISTER_TASKFUNC ( FinishedFullBackupTaskFunc ) ;
struct CopyDiffLogsTaskFunc : TaskFuncBase {
static StringRef name ;
static const uint32_t version ;
ACTOR static Future < Void > _finish ( Reference < ReadYourWritesTransaction > tr , Reference < TaskBucket > taskBucket , Reference < FutureBucket > futureBucket , Reference < Task > task ) {
state Subspace conf = Subspace ( databaseBackupPrefixRange . begin ) . get ( BackupAgentBase : : keyConfig ) . get ( task - > params [ BackupAgentBase : : keyConfigLogUid ] ) ;
state Subspace states = Subspace ( databaseBackupPrefixRange . begin ) . get ( BackupAgentBase : : keyStates ) . get ( task - > params [ BackupAgentBase : : keyConfigLogUid ] ) ;
Void _ = wait ( checkTaskVersion ( tr , task , CopyDiffLogsTaskFunc : : name , CopyDiffLogsTaskFunc : : version ) ) ;
state Version beginVersion = BinaryReader : : fromStringRef < Version > ( task - > params [ DatabaseBackupAgent : : keyBeginVersion ] , Unversioned ( ) ) ;
state Future < Optional < Value > > fStopWhenDone = tr - > get ( conf . pack ( DatabaseBackupAgent : : keyConfigStopWhenDoneKey ) ) ;
Transaction srcTr ( taskBucket - > src ) ;
srcTr . setOption ( FDBTransactionOptions : : LOCK_AWARE ) ;
state Version endVersion = wait ( srcTr . getReadVersion ( ) ) ;
state Reference < TaskFuture > onDone = futureBucket - > unpack ( task - > params [ Task : : reservedTaskParamKeyDone ] ) ;
if ( endVersion < = beginVersion ) {
Void _ = wait ( delay ( FLOW_KNOBS - > PREVENT_FAST_SPIN_DELAY ) ) ;
Key _ = wait ( CopyDiffLogsTaskFunc : : addTask ( tr , taskBucket , task , beginVersion , TaskCompletionKey : : signal ( onDone ) ) ) ;
Void _ = wait ( taskBucket - > finish ( tr , task ) ) ;
return Void ( ) ;
}
tr - > set ( task - > params [ BackupAgentBase : : keyConfigLogUid ] . withPrefix ( applyMutationsEndRange . begin ) , BinaryWriter : : toValue ( beginVersion , Unversioned ( ) ) ) ;
Optional < Value > stopWhenDone = wait ( fStopWhenDone ) ;
if ( endVersion - beginVersion > g_random - > randomInt64 ( 0 , CLIENT_KNOBS - > BACKUP_VERSION_DELAY ) ) {
TraceEvent ( " DBA_CopyDiffLogs " ) . detail ( " beginVersion " , beginVersion ) . detail ( " endVersion " , endVersion ) . detail ( " LogUID " , printable ( task - > params [ BackupAgentBase : : keyConfigLogUid ] ) ) ;
}
if ( ! stopWhenDone . present ( ) ) {
state Reference < TaskFuture > allPartsDone = futureBucket - > future ( tr ) ;
std : : vector < Future < Key > > addTaskVector ;
addTaskVector . push_back ( CopyDiffLogsTaskFunc : : addTask ( tr , taskBucket , task , endVersion , TaskCompletionKey : : signal ( onDone ) , allPartsDone ) ) ;
int blockSize = std : : max < int > ( 1 , ( ( endVersion - beginVersion ) / CLIENT_KNOBS - > BACKUP_COPY_TASKS ) / CLIENT_KNOBS - > BACKUP_BLOCK_SIZE ) ;
for ( int64_t vblock = beginVersion / CLIENT_KNOBS - > BACKUP_BLOCK_SIZE ; vblock < ( endVersion + CLIENT_KNOBS - > BACKUP_BLOCK_SIZE - 1 ) / CLIENT_KNOBS - > BACKUP_BLOCK_SIZE ; vblock + = blockSize ) {
addTaskVector . push_back ( CopyLogRangeTaskFunc : : addTask ( tr , taskBucket , task ,
std : : max ( beginVersion , vblock * CLIENT_KNOBS - > BACKUP_BLOCK_SIZE ) ,
std : : min ( endVersion , ( vblock + blockSize ) * CLIENT_KNOBS - > BACKUP_BLOCK_SIZE ) ,
TaskCompletionKey : : joinWith ( allPartsDone ) ) ) ;
}
Void _ = wait ( waitForAll ( addTaskVector ) & & taskBucket - > finish ( tr , task ) ) ;
} else {
Void _ = wait ( onDone - > set ( tr , taskBucket ) & & taskBucket - > finish ( tr , task ) ) ;
}
return Void ( ) ;
}
ACTOR static Future < Key > addTask ( Reference < ReadYourWritesTransaction > tr , Reference < TaskBucket > taskBucket , Reference < Task > parentTask , Version beginVersion , TaskCompletionKey completionKey , Reference < TaskFuture > waitFor = Reference < TaskFuture > ( ) ) {
Key doneKey = wait ( completionKey . get ( tr , taskBucket ) ) ;
Reference < Task > task ( new Task ( CopyDiffLogsTaskFunc : : name , CopyDiffLogsTaskFunc : : version , doneKey , 1 ) ) ;
copyDefaultParameters ( parentTask , task ) ;
task - > params [ DatabaseBackupAgent : : keyBeginVersion ] = BinaryWriter : : toValue ( beginVersion , Unversioned ( ) ) ;
if ( ! waitFor ) {
return taskBucket - > addTask ( tr , task , parentTask - > params [ Task : : reservedTaskParamValidKey ] , task - > params [ BackupAgentBase : : keyFolderId ] ) ;
}
Void _ = wait ( waitFor - > onSetAddTask ( tr , taskBucket , task , parentTask - > params [ Task : : reservedTaskParamValidKey ] , task - > params [ BackupAgentBase : : keyFolderId ] ) ) ;
return LiteralStringRef ( " OnSetAddTask " ) ;
}
StringRef getName ( ) const { return name ; } ;
Future < Void > execute ( Database cx , Reference < TaskBucket > tb , Reference < FutureBucket > fb , Reference < Task > task ) { return Void ( ) ; } ;
Future < Void > finish ( Reference < ReadYourWritesTransaction > tr , Reference < TaskBucket > tb , Reference < FutureBucket > fb , Reference < Task > task ) { return _finish ( tr , tb , fb , task ) ; } ;
} ;
StringRef CopyDiffLogsTaskFunc : : name = LiteralStringRef ( " db_copy_diff_logs " ) ;
const uint32_t CopyDiffLogsTaskFunc : : version = 1 ;
REGISTER_TASKFUNC ( CopyDiffLogsTaskFunc ) ;
struct BackupRestorableTaskFunc : TaskFuncBase {
static StringRef name ;
static const uint32_t version ;
ACTOR static Future < Void > _execute ( Database cx , Reference < TaskBucket > taskBucket , Reference < FutureBucket > futureBucket , Reference < Task > task ) {
state Subspace sourceStates = Subspace ( databaseBackupPrefixRange . begin ) . get ( BackupAgentBase : : keySourceStates ) . get ( task - > params [ BackupAgentBase : : keyConfigLogUid ] ) ;
Void _ = wait ( checkTaskVersion ( cx , task , BackupRestorableTaskFunc : : name , BackupRestorableTaskFunc : : version ) ) ;
state Transaction tr ( taskBucket - > src ) ;
loop {
try {
tr . setOption ( FDBTransactionOptions : : LOCK_AWARE ) ;
tr . addReadConflictRange ( singleKeyRange ( sourceStates . pack ( DatabaseBackupAgent : : keyStateStatus ) ) ) ;
tr . set ( sourceStates . pack ( DatabaseBackupAgent : : keyStateStatus ) , StringRef ( BackupAgentBase : : getStateText ( BackupAgentBase : : STATE_DIFFERENTIAL ) ) ) ;
Void _ = wait ( tr . commit ( ) ) ;
return Void ( ) ;
}
catch ( Error & e ) {
Void _ = wait ( tr . onError ( e ) ) ;
}
}
}
ACTOR static Future < Void > _finish ( Reference < ReadYourWritesTransaction > tr , Reference < TaskBucket > taskBucket , Reference < FutureBucket > futureBucket , Reference < Task > task ) {
state Subspace conf = Subspace ( databaseBackupPrefixRange . begin ) . get ( BackupAgentBase : : keyConfig ) . get ( task - > params [ BackupAgentBase : : keyConfigLogUid ] ) ;
state Subspace states = Subspace ( databaseBackupPrefixRange . begin ) . get ( BackupAgentBase : : keyStates ) . get ( task - > params [ BackupAgentBase : : keyConfigLogUid ] ) ;
Void _ = wait ( checkTaskVersion ( tr , task , BackupRestorableTaskFunc : : name , BackupRestorableTaskFunc : : version ) ) ;
state Reference < TaskFuture > onDone = futureBucket - > unpack ( task - > params [ Task : : reservedTaskParamKeyDone ] ) ;
state Optional < Value > stopValue = wait ( tr - > get ( states . pack ( DatabaseBackupAgent : : keyStateStop ) ) ) ;
state Version restoreVersion = stopValue . present ( ) ? BinaryReader : : fromStringRef < Version > ( stopValue . get ( ) , Unversioned ( ) ) : - 1 ;
state Optional < Value > stopWhenDone = wait ( tr - > get ( conf . pack ( DatabaseBackupAgent : : keyConfigStopWhenDoneKey ) ) ) ;
state Reference < TaskFuture > allPartsDone ;
state UID logUid = BinaryReader : : fromStringRef < UID > ( task - > params [ DatabaseBackupAgent : : keyConfigLogUid ] , Unversioned ( ) ) ;
TraceEvent ( " DBA_Complete " ) . detail ( " restoreVersion " , restoreVersion ) . detail ( " differential " , stopWhenDone . present ( ) ) . detail ( " LogUID " , printable ( task - > params [ BackupAgentBase : : keyConfigLogUid ] ) ) ;
// Start the complete task, if differential is not enabled
if ( stopWhenDone . present ( ) ) {
// After the Backup completes, clear the backup subspace and update the status
Key _ = wait ( FinishedFullBackupTaskFunc : : addTask ( tr , taskBucket , task , TaskCompletionKey : : noSignal ( ) ) ) ;
}
else { // Start the writing of logs, if differential
tr - > set ( states . pack ( DatabaseBackupAgent : : keyStateStatus ) , StringRef ( BackupAgentBase : : getStateText ( BackupAgentBase : : STATE_DIFFERENTIAL ) ) ) ;
allPartsDone = futureBucket - > future ( tr ) ;
Key _ = wait ( CopyDiffLogsTaskFunc : : addTask ( tr , taskBucket , task , restoreVersion , TaskCompletionKey : : joinWith ( allPartsDone ) ) ) ;
// After the Backup completes, clear the backup subspace and update the status
Key _ = wait ( FinishedFullBackupTaskFunc : : addTask ( tr , taskBucket , task , TaskCompletionKey : : noSignal ( ) , allPartsDone ) ) ;
}
Void _ = wait ( taskBucket - > finish ( tr , task ) ) ;
return Void ( ) ;
}
ACTOR static Future < Key > addTask ( Reference < ReadYourWritesTransaction > tr , Reference < TaskBucket > taskBucket , Reference < Task > parentTask , TaskCompletionKey completionKey , Reference < TaskFuture > waitFor = Reference < TaskFuture > ( ) ) {
Key doneKey = wait ( completionKey . get ( tr , taskBucket ) ) ;
Reference < Task > task ( new Task ( BackupRestorableTaskFunc : : name , BackupRestorableTaskFunc : : version , doneKey ) ) ;
copyDefaultParameters ( parentTask , task ) ;
if ( ! waitFor ) {
return taskBucket - > addTask ( tr , task , parentTask - > params [ Task : : reservedTaskParamValidKey ] , task - > params [ BackupAgentBase : : keyFolderId ] ) ;
}
Void _ = wait ( waitFor - > onSetAddTask ( tr , taskBucket , task , parentTask - > params [ Task : : reservedTaskParamValidKey ] , task - > params [ BackupAgentBase : : keyFolderId ] ) ) ;
return LiteralStringRef ( " OnSetAddTask " ) ;
}
StringRef getName ( ) const { return name ; } ;
Future < Void > execute ( Database cx , Reference < TaskBucket > tb , Reference < FutureBucket > fb , Reference < Task > task ) { return _execute ( cx , tb , fb , task ) ; } ;
Future < Void > finish ( Reference < ReadYourWritesTransaction > tr , Reference < TaskBucket > tb , Reference < FutureBucket > fb , Reference < Task > task ) { return _finish ( tr , tb , fb , task ) ; } ;
} ;
StringRef BackupRestorableTaskFunc : : name = LiteralStringRef ( " db_backup_restorable " ) ;
const uint32_t BackupRestorableTaskFunc : : version = 1 ;
REGISTER_TASKFUNC ( BackupRestorableTaskFunc ) ;
struct StartFullBackupTaskFunc : TaskFuncBase {
static StringRef name ;
static const uint32_t version ;
ACTOR static Future < Void > _execute ( Database cx , Reference < TaskBucket > taskBucket , Reference < FutureBucket > futureBucket , Reference < Task > task ) {
state Subspace sourceStates = Subspace ( databaseBackupPrefixRange . begin ) . get ( BackupAgentBase : : keySourceStates ) . get ( task - > params [ BackupAgentBase : : keyConfigLogUid ] ) ;
Void _ = wait ( checkTaskVersion ( cx , task , StartFullBackupTaskFunc : : name , StartFullBackupTaskFunc : : version ) ) ;
state UID logUid = BinaryReader : : fromStringRef < UID > ( task - > params [ DatabaseBackupAgent : : keyConfigLogUid ] , Unversioned ( ) ) ;
state Key logUidDest = uidPrefixKey ( backupLogKeys . begin , logUid ) ;
state Standalone < VectorRef < KeyRangeRef > > backupRanges = BinaryReader : : fromStringRef < Standalone < VectorRef < KeyRangeRef > > > ( task - > params [ DatabaseBackupAgent : : keyConfigBackupRanges ] , IncludeVersion ( ) ) ;
state Transaction tr ( taskBucket - > src ) ;
loop {
try {
tr . setOption ( FDBTransactionOptions : : LOCK_AWARE ) ;
Optional < Value > v = wait ( tr . get ( sourceStates . pack ( DatabaseBackupAgent : : keyFolderId ) ) ) ;
task - > params [ DatabaseBackupAgent : : keyBeginVersion ] = BinaryWriter : : toValue ( tr . getReadVersion ( ) . get ( ) , Unversioned ( ) ) ;
if ( v . present ( ) & & BinaryReader : : fromStringRef < Version > ( v . get ( ) , Unversioned ( ) ) > = BinaryReader : : fromStringRef < Version > ( task - > params [ DatabaseBackupAgent : : keyFolderId ] , Unversioned ( ) ) )
return Void ( ) ;
tr . set ( Subspace ( databaseBackupPrefixRange . begin ) . get ( BackupAgentBase : : keySourceTagName ) . pack ( task - > params [ BackupAgentBase : : keyTagName ] ) , task - > params [ BackupAgentBase : : keyConfigLogUid ] ) ;
tr . set ( sourceStates . pack ( DatabaseBackupAgent : : keyFolderId ) , task - > params [ DatabaseBackupAgent : : keyFolderId ] ) ;
tr . set ( sourceStates . pack ( DatabaseBackupAgent : : keyStateStatus ) , StringRef ( BackupAgentBase : : getStateText ( BackupAgentBase : : STATE_BACKUP ) ) ) ;
// Start logging the mutations for the specified ranges of the tag
for ( auto & backupRange : backupRanges ) {
tr . set ( logRangesEncodeKey ( backupRange . begin , logUid ) , logRangesEncodeValue ( backupRange . end , logUidDest ) ) ;
}
Void _ = wait ( tr . commit ( ) ) ;
return Void ( ) ;
} catch ( Error & e ) {
Void _ = wait ( tr . onError ( e ) ) ;
}
}
}
ACTOR static Future < Void > _finish ( Reference < ReadYourWritesTransaction > tr , Reference < TaskBucket > taskBucket , Reference < FutureBucket > futureBucket , Reference < Task > task ) {
state Subspace states = Subspace ( databaseBackupPrefixRange . begin ) . get ( BackupAgentBase : : keyStates ) . get ( task - > params [ BackupAgentBase : : keyConfigLogUid ] ) ;
state Version beginVersion = BinaryReader : : fromStringRef < Version > ( task - > params [ BackupAgentBase : : keyBeginVersion ] , Unversioned ( ) ) ;
state Standalone < VectorRef < KeyRangeRef > > backupRanges = BinaryReader : : fromStringRef < Standalone < VectorRef < KeyRangeRef > > > ( task - > params [ DatabaseBackupAgent : : keyConfigBackupRanges ] , IncludeVersion ( ) ) ;
TraceEvent ( " DBA_StartFullBackup " ) . detail ( " beginVer " , beginVersion ) ;
tr - > set ( task - > params [ BackupAgentBase : : keyConfigLogUid ] . withPrefix ( applyMutationsBeginRange . begin ) , BinaryWriter : : toValue ( beginVersion , Unversioned ( ) ) ) ;
tr - > set ( task - > params [ BackupAgentBase : : keyConfigLogUid ] . withPrefix ( applyMutationsEndRange . begin ) , BinaryWriter : : toValue ( beginVersion , Unversioned ( ) ) ) ;
tr - > set ( states . pack ( DatabaseBackupAgent : : keyStateStatus ) , StringRef ( BackupAgentBase : : getStateText ( BackupAgentBase : : STATE_BACKUP ) ) ) ;
state Reference < TaskFuture > kvBackupRangeComplete = futureBucket - > future ( tr ) ;
state Reference < TaskFuture > kvBackupComplete = futureBucket - > future ( tr ) ;
state int rangeCount = 0 ;
if ( task - > params [ DatabaseBackupAgent : : keyDatabasesInSync ] ! = std : : string ( " t " ) ) {
for ( ; rangeCount < backupRanges . size ( ) ; + + rangeCount ) {
Key _ = wait ( BackupRangeTaskFunc : : addTask ( tr , taskBucket , task , backupRanges [ rangeCount ] . begin , backupRanges [ rangeCount ] . end , TaskCompletionKey : : joinWith ( kvBackupRangeComplete ) ) ) ;
}
}
else {
kvBackupRangeComplete - > set ( tr , taskBucket ) ;
}
// After the BackupRangeTask completes, set the stop key which will stop the BackupLogsTask
Key _ = wait ( FinishFullBackupTaskFunc : : addTask ( tr , taskBucket , task , TaskCompletionKey : : noSignal ( ) , kvBackupRangeComplete ) ) ;
// Backup the logs which will create BackupLogRange tasks
Key _ = wait ( CopyLogsTaskFunc : : addTask ( tr , taskBucket , task , beginVersion , TaskCompletionKey : : joinWith ( kvBackupComplete ) ) ) ;
// After the Backup completes, clear the backup subspace and update the status
Key _ = wait ( BackupRestorableTaskFunc : : addTask ( tr , taskBucket , task , TaskCompletionKey : : noSignal ( ) , kvBackupComplete ) ) ;
Void _ = wait ( taskBucket - > finish ( tr , task ) ) ;
return Void ( ) ;
}
ACTOR static Future < Key > addTask ( Reference < ReadYourWritesTransaction > tr , Reference < TaskBucket > taskBucket , Key logUid , Key backupUid , Key keyAddPrefix , Key keyRemovePrefix , Key keyConfigBackupRanges , Key tagName , TaskCompletionKey completionKey , Reference < TaskFuture > waitFor = Reference < TaskFuture > ( ) , bool databasesInSync = false )
{
Key doneKey = wait ( completionKey . get ( tr , taskBucket ) ) ;
Reference < Task > task ( new Task ( StartFullBackupTaskFunc : : name , StartFullBackupTaskFunc : : version , doneKey ) ) ;
task - > params [ BackupAgentBase : : keyFolderId ] = backupUid ;
task - > params [ BackupAgentBase : : keyConfigLogUid ] = logUid ;
task - > params [ DatabaseBackupAgent : : keyAddPrefix ] = keyAddPrefix ;
task - > params [ DatabaseBackupAgent : : keyRemovePrefix ] = keyRemovePrefix ;
task - > params [ BackupAgentBase : : keyConfigBackupRanges ] = keyConfigBackupRanges ;
task - > params [ BackupAgentBase : : keyTagName ] = tagName ;
task - > params [ DatabaseBackupAgent : : keyDatabasesInSync ] = databasesInSync ? LiteralStringRef ( " t " ) : LiteralStringRef ( " f " ) ;
if ( ! waitFor ) {
return taskBucket - > addTask ( tr , task , Subspace ( databaseBackupPrefixRange . begin ) . get ( BackupAgentBase : : keyConfig ) . get ( logUid ) . pack ( BackupAgentBase : : keyFolderId ) , task - > params [ BackupAgentBase : : keyFolderId ] ) ;
}
Void _ = wait ( waitFor - > onSetAddTask ( tr , taskBucket , task , Subspace ( databaseBackupPrefixRange . begin ) . get ( BackupAgentBase : : keyConfig ) . get ( logUid ) . pack ( BackupAgentBase : : keyFolderId ) , task - > params [ BackupAgentBase : : keyFolderId ] ) ) ;
return LiteralStringRef ( " OnSetAddTask " ) ;
}
StringRef getName ( ) const { return name ; } ;
Future < Void > execute ( Database cx , Reference < TaskBucket > tb , Reference < FutureBucket > fb , Reference < Task > task ) { return _execute ( cx , tb , fb , task ) ; } ;
Future < Void > finish ( Reference < ReadYourWritesTransaction > tr , Reference < TaskBucket > tb , Reference < FutureBucket > fb , Reference < Task > task ) { return _finish ( tr , tb , fb , task ) ; } ;
} ;
StringRef StartFullBackupTaskFunc : : name = LiteralStringRef ( " db_start_full_backup " ) ;
const uint32_t StartFullBackupTaskFunc : : version = 1 ;
REGISTER_TASKFUNC ( StartFullBackupTaskFunc ) ;
}
class DatabaseBackupAgentImpl {
public :
static const int MAX_RESTORABLE_FILE_METASECTION_BYTES = 1024 * 8 ;
// This method will return the final status of the backup
ACTOR static Future < int > waitBackup ( DatabaseBackupAgent * backupAgent , Database cx , Key tagName , bool stopWhenDone ) {
state std : : string backTrace ;
state UID logUid = wait ( backupAgent - > getLogUid ( cx , tagName ) ) ;
state Key statusKey = backupAgent - > states . get ( BinaryWriter : : toValue ( logUid , Unversioned ( ) ) ) . pack ( DatabaseBackupAgent : : keyStateStatus ) ;
loop {
state Reference < ReadYourWritesTransaction > tr ( new ReadYourWritesTransaction ( cx ) ) ;
tr - > setOption ( FDBTransactionOptions : : ACCESS_SYSTEM_KEYS ) ;
tr - > setOption ( FDBTransactionOptions : : LOCK_AWARE ) ;
try {
state int status = wait ( backupAgent - > getStateValue ( tr , logUid ) ) ;
// Break, if no longer runnable
if ( ! DatabaseBackupAgent : : isRunnable ( ( BackupAgentBase : : enumState ) status ) | | BackupAgentBase : : STATE_PARTIALLY_ABORTED = = status ) {
return status ;
}
// Break, if in differential mode (restorable) and stopWhenDone is not enabled
if ( ( ! stopWhenDone ) & & ( BackupAgentBase : : STATE_DIFFERENTIAL = = status ) ) {
return status ;
}
state Future < Void > watchFuture = tr - > watch ( statusKey ) ;
Void _ = wait ( tr - > commit ( ) ) ;
Void _ = wait ( watchFuture ) ;
}
catch ( Error & e ) {
Void _ = wait ( tr - > onError ( e ) ) ;
}
}
}
// This method will return the final status of the backup
ACTOR static Future < int > waitSubmitted ( DatabaseBackupAgent * backupAgent , Database cx , Key tagName ) {
state UID logUid = wait ( backupAgent - > getLogUid ( cx , tagName ) ) ;
state Key statusKey = backupAgent - > states . get ( BinaryWriter : : toValue ( logUid , Unversioned ( ) ) ) . pack ( DatabaseBackupAgent : : keyStateStatus ) ;
loop {
state Reference < ReadYourWritesTransaction > tr ( new ReadYourWritesTransaction ( cx ) ) ;
tr - > setOption ( FDBTransactionOptions : : ACCESS_SYSTEM_KEYS ) ;
tr - > setOption ( FDBTransactionOptions : : LOCK_AWARE ) ;
try {
state int status = wait ( backupAgent - > getStateValue ( tr , logUid ) ) ;
// Break, if no longer runnable
if ( BackupAgentBase : : STATE_SUBMITTED ! = status ) {
return status ;
}
state Future < Void > watchFuture = tr - > watch ( statusKey ) ;
Void _ = wait ( tr - > commit ( ) ) ;
Void _ = wait ( watchFuture ) ;
}
catch ( Error & e ) {
Void _ = wait ( tr - > onError ( e ) ) ;
}
}
}
ACTOR static Future < Void > submitBackup ( DatabaseBackupAgent * backupAgent , Reference < ReadYourWritesTransaction > tr , Key tagName , Standalone < VectorRef < KeyRangeRef > > backupRanges , bool stopWhenDone , Key addPrefix , Key removePrefix , bool lockDB , bool databasesInSync ) {
state UID logUid = g_random - > randomUniqueID ( ) ;
state Key logUidValue = BinaryWriter : : toValue ( logUid , Unversioned ( ) ) ;
state UID logUidCurrent = wait ( backupAgent - > getLogUid ( tr , tagName ) ) ;
tr - > setOption ( FDBTransactionOptions : : ACCESS_SYSTEM_KEYS ) ;
tr - > setOption ( FDBTransactionOptions : : LOCK_AWARE ) ;
// We will use the global status for now to ensure that multiple backups do not start place with different tags
state int status = wait ( backupAgent - > getStateValue ( tr , logUidCurrent ) ) ;
if ( DatabaseBackupAgent : : isRunnable ( ( BackupAgentBase : : enumState ) status ) ) {
throw backup_duplicate ( ) ;
}
if ( logUidCurrent . isValid ( ) ) {
logUid = logUidCurrent ;
logUidValue = BinaryWriter : : toValue ( logUid , Unversioned ( ) ) ;
}
Optional < Key > v = wait ( tr - > get ( backupAgent - > states . get ( logUidValue ) . pack ( DatabaseBackupAgent : : keyFolderId ) ) ) ;
Version uidVersion = 0 ;
if ( v . present ( ) )
uidVersion = BinaryReader : : fromStringRef < Version > ( v . get ( ) , Unversioned ( ) ) + 1 ;
state Standalone < StringRef > backupUid = BinaryWriter : : toValue ( uidVersion , Unversioned ( ) ) ;
KeyRangeMap < int > backupRangeSet ;
for ( auto & backupRange : backupRanges ) {
backupRangeSet . insert ( backupRange , 1 ) ;
}
backupRangeSet . coalesce ( allKeys ) ;
backupRanges = Standalone < VectorRef < KeyRangeRef > > ( ) ;
for ( auto & backupRange : backupRangeSet . ranges ( ) ) {
if ( backupRange . value ( ) ) {
backupRanges . push_back_deep ( backupRanges . arena ( ) , backupRange . range ( ) ) ;
}
}
if ( ! databasesInSync ) {
// Make sure all of the ranges are empty before we backup into them.
state std : : vector < Future < Standalone < RangeResultRef > > > backupIntoResults ;
for ( auto & backupRange : backupRanges ) {
backupIntoResults . push_back ( tr - > getRange ( backupRange . removePrefix ( removePrefix ) . withPrefix ( addPrefix ) , 1 ) ) ;
}
Void _ = wait ( waitForAll ( backupIntoResults ) ) ;
for ( auto result : backupIntoResults ) {
if ( result . get ( ) . size ( ) > 0 ) {
// One of the ranges we will be backing up into has pre-existing data.
throw restore_destination_not_empty ( ) ;
}
}
}
// Clear the backup ranges for the tag
tr - > clear ( backupAgent - > config . get ( logUidValue ) . range ( ) ) ;
tr - > clear ( backupAgent - > states . get ( logUidValue ) . range ( ) ) ;
tr - > clear ( backupAgent - > errors . range ( ) ) ;
tr - > set ( backupAgent - > tagNames . pack ( tagName ) , logUidValue ) ;
// Clear DRConfig for this UID, which unfortunately only contains some newer vars and not the stuff below.
DRConfig ( logUid ) . clear ( tr ) ;
tr - > set ( backupAgent - > config . get ( logUidValue ) . pack ( DatabaseBackupAgent : : keyAddPrefix ) , addPrefix ) ;
tr - > set ( backupAgent - > config . get ( logUidValue ) . pack ( DatabaseBackupAgent : : keyRemovePrefix ) , removePrefix ) ;
tr - > set ( backupAgent - > states . get ( logUidValue ) . pack ( DatabaseBackupAgent : : keyConfigBackupTag ) , tagName ) ;
tr - > set ( backupAgent - > config . get ( logUidValue ) . pack ( DatabaseBackupAgent : : keyConfigLogUid ) , logUidValue ) ;
tr - > set ( backupAgent - > config . get ( logUidValue ) . pack ( DatabaseBackupAgent : : keyFolderId ) , backupUid ) ;
tr - > set ( backupAgent - > states . get ( logUidValue ) . pack ( DatabaseBackupAgent : : keyFolderId ) , backupUid ) ; // written to config and states because it's also used by abort
tr - > set ( backupAgent - > config . get ( logUidValue ) . pack ( DatabaseBackupAgent : : keyConfigBackupRanges ) , BinaryWriter : : toValue ( backupRanges , IncludeVersion ( ) ) ) ;
tr - > set ( backupAgent - > states . get ( logUidValue ) . pack ( DatabaseBackupAgent : : keyStateStatus ) , StringRef ( BackupAgentBase : : getStateText ( BackupAgentBase : : STATE_SUBMITTED ) ) ) ;
if ( stopWhenDone ) {
tr - > set ( backupAgent - > config . get ( logUidValue ) . pack ( DatabaseBackupAgent : : keyConfigStopWhenDoneKey ) , StringRef ( ) ) ;
}
int64_t startCount = 0 ;
state Key mapPrefix = logUidValue . withPrefix ( applyMutationsKeyVersionMapRange . begin ) ;
Key mapEnd = normalKeys . end . withPrefix ( mapPrefix ) ;
tr - > set ( logUidValue . withPrefix ( applyMutationsAddPrefixRange . begin ) , addPrefix ) ;
tr - > set ( logUidValue . withPrefix ( applyMutationsRemovePrefixRange . begin ) , removePrefix ) ;
tr - > set ( logUidValue . withPrefix ( applyMutationsKeyVersionCountRange . begin ) , StringRef ( ( uint8_t * ) & startCount , 8 ) ) ;
tr - > clear ( KeyRangeRef ( mapPrefix , mapEnd ) ) ;
state Version readVersion = invalidVersion ;
if ( databasesInSync ) {
Transaction readTransaction ( backupAgent - > taskBucket - > src ) ;
readTransaction . setOption ( FDBTransactionOptions : : LOCK_AWARE ) ;
Version _ = wait ( readTransaction . getReadVersion ( ) ) ;
readVersion = _ ;
}
tr - > set ( mapPrefix , BinaryWriter : : toValue < Version > ( readVersion , Unversioned ( ) ) ) ;
Key taskKey = wait ( dbBackup : : StartFullBackupTaskFunc : : addTask ( tr , backupAgent - > taskBucket , logUidValue , backupUid ,
addPrefix , removePrefix , BinaryWriter : : toValue ( backupRanges , IncludeVersion ( ) ) , tagName , TaskCompletionKey : : noSignal ( ) , Reference < TaskFuture > ( ) , databasesInSync ) ) ;
if ( lockDB )
Void _ = wait ( lockDatabase ( tr , logUid ) ) ;
else
Void _ = wait ( checkDatabaseLock ( tr , logUid ) ) ;
TraceEvent ( " DBA_Submit " ) . detail ( " logUid " , logUid ) . detail ( " lock " , lockDB ) . detail ( " LogUID " , printable ( logUidValue ) ) . detail ( " tag " , printable ( tagName ) )
. detail ( " key " , printable ( backupAgent - > states . get ( logUidValue ) . pack ( DatabaseBackupAgent : : keyFolderId ) ) ) . detail ( " mapPrefix " , printable ( mapPrefix ) ) ;
return Void ( ) ;
}
ACTOR static Future < Void > unlockBackup ( DatabaseBackupAgent * backupAgent , Reference < ReadYourWritesTransaction > tr , Key tagName ) {
UID logUid = wait ( backupAgent - > getLogUid ( tr , tagName ) ) ;
Void _ = wait ( unlockDatabase ( tr , logUid ) ) ;
TraceEvent ( " DBA_unlock " ) . detail ( " tag " , printable ( tagName ) ) ;
return Void ( ) ;
}
ACTOR static Future < Void > atomicSwitchover ( DatabaseBackupAgent * backupAgent , Database dest , Key tagName , Standalone < VectorRef < KeyRangeRef > > backupRanges , Key addPrefix , Key removePrefix ) {
2017-06-01 07:23:37 +08:00
state DatabaseBackupAgent drAgent ( dest ) ;
2017-05-26 04:48:44 +08:00
state UID destlogUid = wait ( backupAgent - > getLogUid ( dest , tagName ) ) ;
state int status = wait ( backupAgent - > getStateValue ( dest , destlogUid ) ) ;
TraceEvent ( " DBA_switchover_start " ) . detail ( " status " , status ) ;
if ( status ! = BackupAgentBase : : STATE_DIFFERENTIAL & & status ! = BackupAgentBase : : STATE_COMPLETED ) {
throw backup_duplicate ( ) ;
}
state UID logUid = g_random - > randomUniqueID ( ) ;
state Key logUidValue = BinaryWriter : : toValue ( logUid , Unversioned ( ) ) ;
2017-06-01 07:23:37 +08:00
state UID logUidCurrent = wait ( drAgent . getLogUid ( backupAgent - > taskBucket - > src , tagName ) ) ;
2017-05-26 04:48:44 +08:00
if ( logUidCurrent . isValid ( ) ) {
logUid = logUidCurrent ;
logUidValue = BinaryWriter : : toValue ( logUid , Unversioned ( ) ) ;
}
//Lock src, record commit version
state Transaction tr ( backupAgent - > taskBucket - > src ) ;
state Version commitVersion ;
loop {
try {
Void _ = wait ( lockDatabase ( & tr , logUid ) ) ;
tr . set ( backupAgent - > tagNames . pack ( tagName ) , logUidValue ) ;
Void _ = wait ( tr . commit ( ) ) ;
commitVersion = tr . getCommittedVersion ( ) ;
break ;
} catch ( Error & e ) {
Void _ = wait ( tr . onError ( e ) ) ;
}
}
TraceEvent ( " DBA_switchover_locked " ) . detail ( " version " , commitVersion ) ;
2017-09-08 08:41:20 +08:00
// Wait for the destination to apply mutations up to the lock commit before switching over.
2017-05-26 04:48:44 +08:00
state ReadYourWritesTransaction tr2 ( dest ) ;
loop {
try {
tr2 . setOption ( FDBTransactionOptions : : ACCESS_SYSTEM_KEYS ) ;
tr2 . setOption ( FDBTransactionOptions : : LOCK_AWARE ) ;
state Optional < Value > backupUid = wait ( tr2 . get ( backupAgent - > states . get ( BinaryWriter : : toValue ( destlogUid , Unversioned ( ) ) ) . pack ( DatabaseBackupAgent : : keyFolderId ) ) ) ;
TraceEvent ( " DBA_switchover_backupUID " ) . detail ( " uid " , printable ( backupUid ) ) . detail ( " key " , printable ( backupAgent - > states . get ( BinaryWriter : : toValue ( destlogUid , Unversioned ( ) ) ) . pack ( DatabaseBackupAgent : : keyFolderId ) ) ) ;
if ( ! backupUid . present ( ) )
throw backup_duplicate ( ) ;
Optional < Value > v = wait ( tr2 . get ( BinaryWriter : : toValue ( destlogUid , Unversioned ( ) ) . withPrefix ( applyMutationsBeginRange . begin ) ) ) ;
TraceEvent ( " DBA_switchover_version " ) . detail ( " version " , v . present ( ) ? BinaryReader : : fromStringRef < Version > ( v . get ( ) , Unversioned ( ) ) : 0 ) ;
if ( v . present ( ) & & BinaryReader : : fromStringRef < Version > ( v . get ( ) , Unversioned ( ) ) > = commitVersion )
break ;
state Future < Void > versionWatch = tr2 . watch ( BinaryWriter : : toValue ( destlogUid , Unversioned ( ) ) . withPrefix ( applyMutationsBeginRange . begin ) ) ;
Void _ = wait ( tr2 . commit ( ) ) ;
Void _ = wait ( versionWatch ) ;
tr2 . reset ( ) ;
} catch ( Error & e ) {
Void _ = wait ( tr2 . onError ( e ) ) ;
}
}
TraceEvent ( " DBA_switchover_ready " ) ;
try {
Void _ = wait ( backupAgent - > discontinueBackup ( dest , tagName ) ) ;
} catch ( Error & e ) {
if ( e . code ( ) ! = error_code_backup_duplicate & & e . code ( ) ! = error_code_backup_unneeded )
throw ;
}
int _ = wait ( backupAgent - > waitBackup ( dest , tagName , true ) ) ;
TraceEvent ( " DBA_switchover_stopped " ) ;
try {
2017-06-01 07:23:37 +08:00
Void _ = wait ( drAgent . submitBackup ( backupAgent - > taskBucket - > src , tagName , backupRanges , false , addPrefix , removePrefix , true , true ) ) ;
2017-05-26 04:48:44 +08:00
} catch ( Error & e ) {
if ( e . code ( ) ! = error_code_backup_duplicate )
throw ;
}
TraceEvent ( " DBA_switchover_submitted " ) ;
2017-06-01 07:23:37 +08:00
int _ = wait ( drAgent . waitSubmitted ( backupAgent - > taskBucket - > src , tagName ) ) ;
2017-05-26 04:48:44 +08:00
TraceEvent ( " DBA_switchover_started " ) ;
2017-09-28 11:34:30 +08:00
state ReadYourWritesTransaction tr3 ( dest ) ;
2017-09-08 08:41:20 +08:00
loop {
try {
2017-09-28 11:34:30 +08:00
tr3 . setOption ( FDBTransactionOptions : : ACCESS_SYSTEM_KEYS ) ;
tr3 . setOption ( FDBTransactionOptions : : LOCK_AWARE ) ;
Version destVersion = wait ( tr3 . getReadVersion ( ) ) ;
2017-09-08 08:41:20 +08:00
if ( destVersion < = commitVersion ) {
TraceEvent ( " DBA_switchover_version_upgrade " ) . detail ( " src " , commitVersion ) . detail ( " dest " , destVersion ) ;
TEST ( true ) ; // Forcing dest backup cluster to higher version
2017-09-28 11:34:30 +08:00
tr3 . set ( minRequiredCommitVersionKey , BinaryWriter : : toValue ( commitVersion + 1 , Unversioned ( ) ) ) ;
Void _ = wait ( tr3 . commit ( ) ) ;
2017-09-08 08:41:20 +08:00
} else {
break ;
}
} catch ( Error & e ) {
2017-09-28 11:34:30 +08:00
Void _ = wait ( tr3 . onError ( e ) ) ;
2017-09-08 08:41:20 +08:00
}
}
TraceEvent ( " DBA_switchover_version_upgraded " ) ;
2017-05-26 04:48:44 +08:00
Void _ = wait ( backupAgent - > unlockBackup ( dest , tagName ) ) ;
TraceEvent ( " DBA_switchover_unlocked " ) ;
return Void ( ) ;
}
ACTOR static Future < Void > discontinueBackup ( DatabaseBackupAgent * backupAgent , Reference < ReadYourWritesTransaction > tr , Key tagName ) {
tr - > setOption ( FDBTransactionOptions : : LOCK_AWARE ) ;
state UID logUid = wait ( backupAgent - > getLogUid ( tr , tagName ) ) ;
state int status = wait ( backupAgent - > getStateValue ( tr , logUid ) ) ;
TraceEvent ( " DBA_discontinue " ) . detail ( " status " , status ) ;
if ( ! DatabaseBackupAgent : : isRunnable ( ( BackupAgentBase : : enumState ) status ) ) {
throw backup_unneeded ( ) ;
}
state Optional < Value > stopWhenDoneValue = wait ( tr - > get ( backupAgent - > config . get ( BinaryWriter : : toValue ( logUid , Unversioned ( ) ) ) . pack ( DatabaseBackupAgent : : keyConfigStopWhenDoneKey ) ) ) ;
if ( stopWhenDoneValue . present ( ) ) {
throw backup_duplicate ( ) ;
}
tr - > setOption ( FDBTransactionOptions : : ACCESS_SYSTEM_KEYS ) ;
tr - > setOption ( FDBTransactionOptions : : LOCK_AWARE ) ;
tr - > set ( backupAgent - > config . get ( BinaryWriter : : toValue ( logUid , Unversioned ( ) ) ) . pack ( BackupAgentBase : : keyConfigStopWhenDoneKey ) , StringRef ( ) ) ;
return Void ( ) ;
}
ACTOR static Future < Void > abortBackup ( DatabaseBackupAgent * backupAgent , Database cx , Key tagName , bool partial ) {
state Reference < ReadYourWritesTransaction > tr ( new ReadYourWritesTransaction ( cx ) ) ;
2017-09-28 11:34:30 +08:00
tr - > setOption ( FDBTransactionOptions : : ACCESS_SYSTEM_KEYS ) ;
2017-05-26 04:48:44 +08:00
tr - > setOption ( FDBTransactionOptions : : LOCK_AWARE ) ;
state UID logUid ;
state Value backupUid ;
loop {
try {
UID _logUid = wait ( backupAgent - > getLogUid ( tr , tagName ) ) ;
logUid = _logUid ;
int status = wait ( backupAgent - > getStateValue ( tr , logUid ) ) ;
if ( ! backupAgent - > isRunnable ( ( BackupAgentBase : : enumState ) status ) ) {
throw backup_unneeded ( ) ;
}
2017-09-19 07:56:34 +08:00
state Version current = wait ( tr - > getReadVersion ( ) ) ;
2017-05-26 04:48:44 +08:00
Optional < Value > _backupUid = wait ( tr - > get ( backupAgent - > states . get ( BinaryWriter : : toValue ( logUid , Unversioned ( ) ) ) . pack ( DatabaseBackupAgent : : keyFolderId ) ) ) ;
backupUid = _backupUid . get ( ) ;
// Clearing the folder id will prevent future tasks from executing
tr - > clear ( backupAgent - > config . get ( BinaryWriter : : toValue ( logUid , Unversioned ( ) ) ) . range ( ) ) ;
// Clearing the end version of apply mutation cancels ongoing apply work
2017-09-19 07:56:34 +08:00
const auto & log_uid = BinaryWriter : : toValue ( logUid , Unversioned ( ) ) ;
tr - > clear ( log_uid . withPrefix ( applyMutationsEndRange . begin ) ) ;
// Ensure that we're at a version higher than the data that we've written.
Optional < Value > lastApplied = wait ( tr - > get ( log_uid . withPrefix ( applyMutationsBeginRange . begin ) ) ) ;
if ( lastApplied . present ( ) ) {
Version applied = BinaryReader : : fromStringRef < Version > ( lastApplied . get ( ) , Unversioned ( ) ) ;
2017-09-28 11:34:30 +08:00
if ( current < = applied ) {
2017-09-19 07:56:34 +08:00
TraceEvent ( " DBA_abort_version_upgrade " ) . detail ( " src " , applied ) . detail ( " dest " , current ) ;
2017-09-28 11:34:30 +08:00
TEST ( true ) ; // Upgrading version of local database.
2017-09-19 07:56:34 +08:00
tr - > set ( minRequiredCommitVersionKey , BinaryWriter : : toValue ( applied + 1 , Unversioned ( ) ) ) ;
}
}
2017-05-26 04:48:44 +08:00
Key logsPath = uidPrefixKey ( applyLogKeys . begin , logUid ) ;
tr - > clear ( KeyRangeRef ( logsPath , strinc ( logsPath ) ) ) ;
tr - > set ( StringRef ( backupAgent - > states . get ( BinaryWriter : : toValue ( logUid , Unversioned ( ) ) ) . pack ( DatabaseBackupAgent : : keyStateStatus ) ) , StringRef ( DatabaseBackupAgent : : getStateText ( BackupAgentBase : : STATE_PARTIALLY_ABORTED ) ) ) ;
Void _ = wait ( tr - > commit ( ) ) ;
TraceEvent ( " DBA_Abort " ) . detail ( " commitVersion " , tr - > getCommittedVersion ( ) ) ;
break ;
}
catch ( Error & e ) {
Void _ = wait ( tr - > onError ( e ) ) ;
}
}
if ( partial )
return Void ( ) ;
state Reference < ReadYourWritesTransaction > srcTr ( new ReadYourWritesTransaction ( backupAgent - > taskBucket - > src ) ) ;
loop {
try {
srcTr - > setOption ( FDBTransactionOptions : : ACCESS_SYSTEM_KEYS ) ;
srcTr - > setOption ( FDBTransactionOptions : : LOCK_AWARE ) ;
Optional < Value > v = wait ( srcTr - > get ( backupAgent - > sourceStates . get ( BinaryWriter : : toValue ( logUid , Unversioned ( ) ) ) . pack ( DatabaseBackupAgent : : keyFolderId ) ) ) ;
if ( v . present ( ) & & BinaryReader : : fromStringRef < Version > ( v . get ( ) , Unversioned ( ) ) > BinaryReader : : fromStringRef < Version > ( backupUid , Unversioned ( ) ) )
break ;
srcTr - > set ( backupAgent - > sourceStates . pack ( DatabaseBackupAgent : : keyStateStatus ) , StringRef ( BackupAgentBase : : getStateText ( BackupAgentBase : : STATE_ABORTED ) ) ) ;
srcTr - > set ( backupAgent - > sourceStates . get ( BinaryWriter : : toValue ( logUid , Unversioned ( ) ) ) . pack ( DatabaseBackupAgent : : keyFolderId ) , backupUid ) ;
Key logsPath = uidPrefixKey ( backupLogKeys . begin , logUid ) ;
Key configPath = uidPrefixKey ( logRangesRange . begin , logUid ) ;
srcTr - > clear ( KeyRangeRef ( logsPath , strinc ( logsPath ) ) ) ;
srcTr - > clear ( KeyRangeRef ( configPath , strinc ( configPath ) ) ) ;
Void _ = wait ( srcTr - > commit ( ) ) ;
break ;
}
catch ( Error & e ) {
Void _ = wait ( srcTr - > onError ( e ) ) ;
}
}
tr = Reference < ReadYourWritesTransaction > ( new ReadYourWritesTransaction ( cx ) ) ;
loop {
try {
tr - > setOption ( FDBTransactionOptions : : ACCESS_SYSTEM_KEYS ) ;
tr - > setOption ( FDBTransactionOptions : : LOCK_AWARE ) ;
Optional < Value > v = wait ( tr - > get ( StringRef ( backupAgent - > config . get ( BinaryWriter : : toValue ( logUid , Unversioned ( ) ) ) . pack ( DatabaseBackupAgent : : keyFolderId ) ) ) ) ;
if ( v . present ( ) ) {
return Void ( ) ;
}
tr - > set ( StringRef ( backupAgent - > states . get ( BinaryWriter : : toValue ( logUid , Unversioned ( ) ) ) . pack ( DatabaseBackupAgent : : keyStateStatus ) ) , StringRef ( DatabaseBackupAgent : : getStateText ( BackupAgentBase : : STATE_ABORTED ) ) ) ;
Void _ = wait ( tr - > commit ( ) ) ;
return Void ( ) ;
}
catch ( Error & e ) {
Void _ = wait ( tr - > onError ( e ) ) ;
}
}
}
ACTOR static Future < std : : string > getStatus ( DatabaseBackupAgent * backupAgent , Database cx , int errorLimit , Key tagName ) {
state Reference < ReadYourWritesTransaction > tr ( new ReadYourWritesTransaction ( cx ) ) ;
tr - > setOption ( FDBTransactionOptions : : LOCK_AWARE ) ;
state std : : string statusText ;
loop {
try {
Version _ = wait ( tr - > getReadVersion ( ) ) ; //get the read version before getting a version from the source database to prevent the time differential from going negative
state Transaction scrTr ( backupAgent - > taskBucket - > src ) ;
scrTr . setOption ( FDBTransactionOptions : : LOCK_AWARE ) ;
state Future < Version > srcReadVersion = scrTr . getReadVersion ( ) ;
statusText = " " ;
state UID logUid = wait ( backupAgent - > getLogUid ( tr , tagName ) ) ;
tr - > setOption ( FDBTransactionOptions : : ACCESS_SYSTEM_KEYS ) ;
tr - > setOption ( FDBTransactionOptions : : LOCK_AWARE ) ;
2017-10-31 04:11:54 +08:00
state Future < Optional < Value > > fDisabled = tr - > get ( backupAgent - > taskBucket - > getDisableKey ( ) ) ;
2017-05-26 04:48:44 +08:00
int backupStateInt = wait ( backupAgent - > getStateValue ( tr , logUid ) ) ;
state BackupAgentBase : : enumState backupState = ( BackupAgentBase : : enumState ) backupStateInt ;
if ( backupState = = DatabaseBackupAgent : : STATE_NEVERRAN ) {
statusText + = " No previous backups found. \n " ;
}
else {
state std : : string tagNameDisplay ;
Optional < Key > tagName = wait ( tr - > get ( backupAgent - > states . get ( BinaryWriter : : toValue ( logUid , Unversioned ( ) ) ) . pack ( BackupAgentBase : : keyConfigBackupTag ) ) ) ;
// Define the display tag name
if ( tagName . present ( ) ) {
tagNameDisplay = tagName . get ( ) . toString ( ) ;
}
state Optional < Value > uid = wait ( tr - > get ( backupAgent - > config . get ( BinaryWriter : : toValue ( logUid , Unversioned ( ) ) ) . pack ( BackupAgentBase : : keyFolderId ) ) ) ;
state Optional < Value > stopVersionKey = wait ( tr - > get ( backupAgent - > states . get ( BinaryWriter : : toValue ( logUid , Unversioned ( ) ) ) . pack ( BackupAgentBase : : keyStateStop ) ) ) ;
state Standalone < VectorRef < KeyRangeRef > > backupRanges ;
Optional < Key > backupKeysPacked = wait ( tr - > get ( backupAgent - > config . get ( BinaryWriter : : toValue ( logUid , Unversioned ( ) ) ) . pack ( BackupAgentBase : : keyConfigBackupRanges ) ) ) ;
if ( backupKeysPacked . present ( ) ) {
BinaryReader br ( backupKeysPacked . get ( ) , IncludeVersion ( ) ) ;
br > > backupRanges ;
}
switch ( backupState ) {
case BackupAgentBase : : STATE_SUBMITTED :
statusText + = " The DR on tag ` " + tagNameDisplay + " ' is NOT a complete copy of the primary database (just started). \n " ;
break ;
case BackupAgentBase : : STATE_BACKUP :
statusText + = " The DR on tag ` " + tagNameDisplay + " ' is NOT a complete copy of the primary database. \n " ;
break ;
case BackupAgentBase : : STATE_DIFFERENTIAL :
statusText + = " The DR on tag ` " + tagNameDisplay + " ' is a complete copy of the primary database. \n " ;
break ;
case BackupAgentBase : : STATE_COMPLETED :
{
Version stopVersion = stopVersionKey . present ( ) ? BinaryReader : : fromStringRef < Version > ( stopVersionKey . get ( ) , Unversioned ( ) ) : - 1 ;
statusText + = " The previous DR on tag ` " + tagNameDisplay + " ' completed at version " + format ( " %lld " , stopVersion ) + " . \n " ;
}
break ;
case BackupAgentBase : : STATE_PARTIALLY_ABORTED :
{
statusText + = " The previous DR on tag ` " + tagNameDisplay + " ' " + BackupAgentBase : : getStateText ( backupState ) + " . \n " ;
statusText + = " Abort the DR with --cleanup before starting a new DR. \n " ;
break ;
}
default :
statusText + = " The previous DR on tag ` " + tagNameDisplay + " ' " + BackupAgentBase : : getStateText ( backupState ) + " . \n " ;
break ;
}
}
// Append the errors, if requested
if ( errorLimit > 0 ) {
Standalone < RangeResultRef > values = wait ( tr - > getRange ( backupAgent - > errors . get ( BinaryWriter : : toValue ( logUid , Unversioned ( ) ) ) . range ( ) , errorLimit , false , true ) ) ;
// Display the errors, if any
if ( values . size ( ) > 0 ) {
// Inform the user that the list of errors is complete or partial
statusText + = ( values . size ( ) < errorLimit )
? " WARNING: Some DR agents have reported issues: \n "
: " WARNING: Some DR agents have reported issues (printing " + std : : to_string ( errorLimit ) + " ): \n " ;
for ( auto & s : values ) {
statusText + = " " + printable ( s . value ) + " \n " ;
}
}
}
//calculate time differential
state Optional < Value > backupUid = wait ( tr - > get ( backupAgent - > states . get ( BinaryWriter : : toValue ( logUid , Unversioned ( ) ) ) . pack ( DatabaseBackupAgent : : keyFolderId ) ) ) ;
if ( backupUid . present ( ) ) {
Optional < Value > v = wait ( tr - > get ( BinaryWriter : : toValue ( logUid , Unversioned ( ) ) . withPrefix ( applyMutationsBeginRange . begin ) ) ) ;
if ( v . present ( ) ) {
state Version destApplyBegin = BinaryReader : : fromStringRef < Version > ( v . get ( ) , Unversioned ( ) ) ;
Version sourceVersion = wait ( srcReadVersion ) ;
double secondsBehind = ( ( double ) ( sourceVersion - destApplyBegin ) ) / CLIENT_KNOBS - > CORE_VERSIONSPERSECOND ;
statusText + = format ( " \n The DR is %.6f seconds behind. \n " , secondsBehind ) ;
}
}
2017-10-31 03:35:00 +08:00
Optional < Value > disabled = wait ( fDisabled ) ;
if ( disabled . present ( ) ) {
statusText + = format ( " \n All DR agents have been disabled. \n " ) ;
}
2017-05-26 04:48:44 +08:00
break ;
}
catch ( Error & e ) {
Void _ = wait ( tr - > onError ( e ) ) ;
}
}
return statusText ;
}
ACTOR static Future < int > getStateValue ( DatabaseBackupAgent * backupAgent , Reference < ReadYourWritesTransaction > tr , UID logUid ) {
tr - > setOption ( FDBTransactionOptions : : ACCESS_SYSTEM_KEYS ) ;
tr - > setOption ( FDBTransactionOptions : : LOCK_AWARE ) ;
state Key statusKey = backupAgent - > states . get ( BinaryWriter : : toValue ( logUid , Unversioned ( ) ) ) . pack ( DatabaseBackupAgent : : keyStateStatus ) ;
Optional < Value > status = wait ( tr - > get ( statusKey ) ) ;
return ( ! status . present ( ) ) ? DatabaseBackupAgent : : STATE_NEVERRAN : BackupAgentBase : : getState ( status . get ( ) . toString ( ) ) ;
}
ACTOR static Future < UID > getLogUid ( DatabaseBackupAgent * backupAgent , Reference < ReadYourWritesTransaction > tr , Key tagName ) {
tr - > setOption ( FDBTransactionOptions : : ACCESS_SYSTEM_KEYS ) ;
tr - > setOption ( FDBTransactionOptions : : LOCK_AWARE ) ;
state Optional < Value > logUid = wait ( tr - > get ( backupAgent - > tagNames . pack ( tagName ) ) ) ;
return ( logUid . present ( ) ) ? BinaryReader : : fromStringRef < UID > ( logUid . get ( ) , Unversioned ( ) ) : UID ( ) ;
}
} ;
Future < Void > DatabaseBackupAgent : : unlockBackup ( Reference < ReadYourWritesTransaction > tr , Key tagName ) {
return DatabaseBackupAgentImpl : : unlockBackup ( this , tr , tagName ) ;
}
Future < Void > DatabaseBackupAgent : : atomicSwitchover ( Database dest , Key tagName , Standalone < VectorRef < KeyRangeRef > > backupRanges , Key addPrefix , Key removePrefix ) {
return DatabaseBackupAgentImpl : : atomicSwitchover ( this , dest , tagName , backupRanges , addPrefix , removePrefix ) ;
}
Future < Void > DatabaseBackupAgent : : submitBackup ( Reference < ReadYourWritesTransaction > tr , Key tagName , Standalone < VectorRef < KeyRangeRef > > backupRanges , bool stopWhenDone , Key addPrefix , Key removePrefix , bool lockDatabase , bool databasesInSync ) {
return DatabaseBackupAgentImpl : : submitBackup ( this , tr , tagName , backupRanges , stopWhenDone , addPrefix , removePrefix , lockDatabase , databasesInSync ) ;
}
Future < Void > DatabaseBackupAgent : : discontinueBackup ( Reference < ReadYourWritesTransaction > tr , Key tagName ) {
return DatabaseBackupAgentImpl : : discontinueBackup ( this , tr , tagName ) ;
}
Future < Void > DatabaseBackupAgent : : abortBackup ( Database cx , Key tagName , bool partial ) {
return DatabaseBackupAgentImpl : : abortBackup ( this , cx , tagName , partial ) ;
}
Future < std : : string > DatabaseBackupAgent : : getStatus ( Database cx , int errorLimit , Key tagName ) {
return DatabaseBackupAgentImpl : : getStatus ( this , cx , errorLimit , tagName ) ;
}
Future < int > DatabaseBackupAgent : : getStateValue ( Reference < ReadYourWritesTransaction > tr , UID logUid ) {
return DatabaseBackupAgentImpl : : getStateValue ( this , tr , logUid ) ;
}
Future < UID > DatabaseBackupAgent : : getLogUid ( Reference < ReadYourWritesTransaction > tr , Key tagName ) {
return DatabaseBackupAgentImpl : : getLogUid ( this , tr , tagName ) ;
}
Future < int > DatabaseBackupAgent : : waitBackup ( Database cx , Key tagName , bool stopWhenDone ) {
return DatabaseBackupAgentImpl : : waitBackup ( this , cx , tagName , stopWhenDone ) ;
}
Future < int > DatabaseBackupAgent : : waitSubmitted ( Database cx , Key tagName ) {
return DatabaseBackupAgentImpl : : waitSubmitted ( this , cx , tagName ) ;
}
Future < int64_t > DatabaseBackupAgent : : getRangeBytesWritten ( Reference < ReadYourWritesTransaction > tr , UID logUid ) {
return DRConfig ( logUid ) . rangeBytesWritten ( ) . getD ( tr ) ;
}
Future < int64_t > DatabaseBackupAgent : : getLogBytesWritten ( Reference < ReadYourWritesTransaction > tr , UID logUid ) {
return DRConfig ( logUid ) . logBytesWritten ( ) . getD ( tr ) ;
}