2017-05-26 04:48:44 +08:00
/*
* BackupAgentBase . actor . cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013 - 2018 Apple Inc . and the FoundationDB project authors
2018-02-22 02:25:11 +08:00
*
2017-05-26 04:48:44 +08:00
* Licensed under the Apache License , Version 2.0 ( the " License " ) ;
* you may not use this file except in compliance with the License .
* You may obtain a copy of the License at
2018-02-22 02:25:11 +08:00
*
2017-05-26 04:48:44 +08:00
* http : //www.apache.org/licenses/LICENSE-2.0
2018-02-22 02:25:11 +08:00
*
2017-05-26 04:48:44 +08:00
* Unless required by applicable law or agreed to in writing , software
* distributed under the License is distributed on an " AS IS " BASIS ,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND , either express or implied .
* See the License for the specific language governing permissions and
* limitations under the License .
*/
2019-03-20 16:36:25 +08:00
# include <iomanip>
2019-03-20 17:51:40 +08:00
# include <time.h>
2019-03-20 16:36:25 +08:00
2019-02-18 07:19:05 +08:00
# include "fdbclient/BackupAgent.actor.h"
2017-05-26 04:48:44 +08:00
# include "fdbrpc/simulator.h"
# include "flow/ActorCollection.h"
2019-02-18 06:55:47 +08:00
# include "flow/actorcompiler.h" // has to be last include
2017-05-26 04:48:44 +08:00
2019-03-20 16:18:37 +08:00
std : : string BackupAgentBase : : formatTime ( int64_t epochs ) {
time_t curTime = ( time_t ) epochs ;
char buffer [ 30 ] ;
struct tm timeinfo ;
getLocalTime ( & curTime , & timeinfo ) ;
strftime ( buffer , 30 , " %Y/%m/%d.%H:%M:%S%z " , & timeinfo ) ;
return buffer ;
}
int64_t BackupAgentBase : : parseTime ( std : : string timestamp ) {
2019-03-20 17:39:49 +08:00
struct tm out ;
# ifdef _WIN32
2019-03-20 16:18:37 +08:00
// Windows does not support strptime, so we will use std::get_time
// Unfortunately, std::get_time() does not support %z (as strptime does) so we must read
// the date/time part separately from the timezone and then adjust tm.
std : : istringstream s ( timestamp . substr ( 0 , 19 ) ) ;
s . imbue ( std : : locale ( setlocale ( LC_TIME , nullptr ) ) ) ;
s > > std : : get_time ( & out , " %Y/%m/%d.%H:%M:%S " ) ;
if ( s . fail ( ) ) {
return - 1 ;
}
// Read timezone offset in +/-HHMM format then convert to seconds
int tzHH ;
int tzMM ;
if ( sscanf ( timestamp . substr ( 19 , 5 ) . c_str ( ) , " %3d%2d " , & tzHH , & tzMM ) ! = 2 ) {
return - 1 ;
}
if ( tzHH < 0 ) {
tzMM = - tzMM ;
}
int tzOffset = tzHH * 60 * 60 + tzMM * 60 ;
2019-03-20 17:51:40 +08:00
// timestamp was meant to be read in timezone tzOffset, but instead (for reasons stated above) was read without a timezone.
// mktime() will return epoch seconds and update out.tm_gmtoff to the local timezone (if it exists)
2019-03-20 16:18:37 +08:00
int64_t ts = mktime ( & out ) ;
// Add back the difference between the default timezone offset and the intended one.
2019-03-20 17:51:40 +08:00
ts + = ( ( - _timezone ) - tzOffset ) ; // Would like to use out.tm_gmtoff here but on Windows the negative of _timezone must be used instead.
2019-03-20 16:18:37 +08:00
return ts ;
2019-03-20 17:39:49 +08:00
# else
if ( strptime ( timestamp . c_str ( ) , " %Y/%m/%d.%H:%M:%S%z " , & out ) = = nullptr ) {
return - 1 ;
}
return ( int64_t ) mktime ( & out ) ;
# endif
2019-03-20 16:18:37 +08:00
}
2017-05-26 04:48:44 +08:00
const Key BackupAgentBase : : keyFolderId = LiteralStringRef ( " config_folderid " ) ;
const Key BackupAgentBase : : keyBeginVersion = LiteralStringRef ( " beginVersion " ) ;
const Key BackupAgentBase : : keyEndVersion = LiteralStringRef ( " endVersion " ) ;
2018-02-21 05:22:31 +08:00
const Key BackupAgentBase : : keyPrevBeginVersion = LiteralStringRef ( " prevBeginVersion " ) ;
2017-05-26 04:48:44 +08:00
const Key BackupAgentBase : : keyConfigBackupTag = LiteralStringRef ( " config_backup_tag " ) ;
const Key BackupAgentBase : : keyConfigLogUid = LiteralStringRef ( " config_log_uid " ) ;
const Key BackupAgentBase : : keyConfigBackupRanges = LiteralStringRef ( " config_backup_ranges " ) ;
const Key BackupAgentBase : : keyConfigStopWhenDoneKey = LiteralStringRef ( " config_stop_when_done " ) ;
const Key BackupAgentBase : : keyStateStop = LiteralStringRef ( " state_stop " ) ;
const Key BackupAgentBase : : keyStateStatus = LiteralStringRef ( " state_status " ) ;
const Key BackupAgentBase : : keyLastUid = LiteralStringRef ( " last_uid " ) ;
const Key BackupAgentBase : : keyBeginKey = LiteralStringRef ( " beginKey " ) ;
const Key BackupAgentBase : : keyEndKey = LiteralStringRef ( " endKey " ) ;
2018-04-27 08:24:40 +08:00
const Key BackupAgentBase : : keyDrVersion = LiteralStringRef ( " drVersion " ) ;
2018-02-21 05:22:31 +08:00
const Key BackupAgentBase : : destUid = LiteralStringRef ( " destUid " ) ;
2018-03-14 02:21:24 +08:00
const Key BackupAgentBase : : backupStartVersion = LiteralStringRef ( " backupStartVersion " ) ;
2017-05-26 04:48:44 +08:00
const Key BackupAgentBase : : keyTagName = LiteralStringRef ( " tagname " ) ;
const Key BackupAgentBase : : keyStates = LiteralStringRef ( " state " ) ;
const Key BackupAgentBase : : keyConfig = LiteralStringRef ( " config " ) ;
const Key BackupAgentBase : : keyErrors = LiteralStringRef ( " errors " ) ;
const Key BackupAgentBase : : keyRanges = LiteralStringRef ( " ranges " ) ;
const Key BackupAgentBase : : keyTasks = LiteralStringRef ( " tasks " ) ;
const Key BackupAgentBase : : keyFutures = LiteralStringRef ( " futures " ) ;
const Key BackupAgentBase : : keySourceStates = LiteralStringRef ( " source_states " ) ;
const Key BackupAgentBase : : keySourceTagName = LiteralStringRef ( " source_tagname " ) ;
bool copyParameter ( Reference < Task > source , Reference < Task > dest , Key key ) {
if ( source ) {
dest - > params [ key ] = source - > params [ key ] ;
return true ;
}
return false ;
}
Version getVersionFromString ( std : : string const & value ) {
Version version ( - 1 ) ;
int n = 0 ;
if ( sscanf ( value . c_str ( ) , " %lld%n " , ( long long * ) & version , & n ) ! = 1 | | n ! = value . size ( ) ) {
2018-06-09 04:57:00 +08:00
TraceEvent ( SevWarnAlways , " GetVersionFromString " ) . detail ( " InvalidVersion " , value ) ;
2017-05-26 04:48:44 +08:00
throw restore_invalid_version ( ) ;
}
return version ;
}
// Transaction log data is stored by the FoundationDB core in the
// \xff / bklog / keyspace in a funny order for performance reasons.
// Return the ranges of keys that contain the data for the given range
// of versions.
2018-02-21 05:22:31 +08:00
Standalone < VectorRef < KeyRangeRef > > getLogRanges ( Version beginVersion , Version endVersion , Key destUidValue , int blockSize ) {
2017-05-26 04:48:44 +08:00
Standalone < VectorRef < KeyRangeRef > > ret ;
2018-02-21 05:22:31 +08:00
Key baLogRangePrefix = destUidValue . withPrefix ( backupLogKeys . begin ) ;
2017-05-26 04:48:44 +08:00
2018-06-09 02:11:08 +08:00
//TraceEvent("GetLogRanges").detail("DestUidValue", destUidValue).detail("Prefix", printable(StringRef(baLogRangePrefix)));
2017-05-26 04:48:44 +08:00
for ( int64_t vblock = beginVersion / blockSize ; vblock < ( endVersion + blockSize - 1 ) / blockSize ; + + vblock ) {
int64_t tb = vblock * blockSize / CLIENT_KNOBS - > LOG_RANGE_BLOCK_SIZE ;
uint64_t bv = bigEndian64 ( std : : max ( beginVersion , vblock * blockSize ) ) ;
uint64_t ev = bigEndian64 ( std : : min ( endVersion , ( vblock + 1 ) * blockSize ) ) ;
uint32_t data = tb & 0xffffffff ;
uint8_t hash = ( uint8_t ) hashlittle ( & data , sizeof ( uint32_t ) , 0 ) ;
Key vblockPrefix = StringRef ( & hash , sizeof ( uint8_t ) ) . withPrefix ( baLogRangePrefix ) ;
ret . push_back_deep ( ret . arena ( ) , KeyRangeRef ( StringRef ( ( uint8_t * ) & bv , sizeof ( uint64_t ) ) . withPrefix ( vblockPrefix ) ,
StringRef ( ( uint8_t * ) & ev , sizeof ( uint64_t ) ) . withPrefix ( vblockPrefix ) ) ) ;
}
return ret ;
}
Standalone < VectorRef < KeyRangeRef > > getApplyRanges ( Version beginVersion , Version endVersion , Key backupUid ) {
Standalone < VectorRef < KeyRangeRef > > ret ;
Key baLogRangePrefix = backupUid . withPrefix ( applyLogKeys . begin ) ;
2018-06-09 02:11:08 +08:00
//TraceEvent("GetLogRanges").detail("BackupUid", backupUid).detail("Prefix", printable(StringRef(baLogRangePrefix)));
2017-05-26 04:48:44 +08:00
for ( int64_t vblock = beginVersion / CLIENT_KNOBS - > APPLY_BLOCK_SIZE ; vblock < ( endVersion + CLIENT_KNOBS - > APPLY_BLOCK_SIZE - 1 ) / CLIENT_KNOBS - > APPLY_BLOCK_SIZE ; + + vblock ) {
int64_t tb = vblock * CLIENT_KNOBS - > APPLY_BLOCK_SIZE / CLIENT_KNOBS - > LOG_RANGE_BLOCK_SIZE ;
uint64_t bv = bigEndian64 ( std : : max ( beginVersion , vblock * CLIENT_KNOBS - > APPLY_BLOCK_SIZE ) ) ;
uint64_t ev = bigEndian64 ( std : : min ( endVersion , ( vblock + 1 ) * CLIENT_KNOBS - > APPLY_BLOCK_SIZE ) ) ;
uint32_t data = tb & 0xffffffff ;
uint8_t hash = ( uint8_t ) hashlittle ( & data , sizeof ( uint32_t ) , 0 ) ;
Key vblockPrefix = StringRef ( & hash , sizeof ( uint8_t ) ) . withPrefix ( baLogRangePrefix ) ;
ret . push_back_deep ( ret . arena ( ) , KeyRangeRef ( StringRef ( ( uint8_t * ) & bv , sizeof ( uint64_t ) ) . withPrefix ( vblockPrefix ) ,
StringRef ( ( uint8_t * ) & ev , sizeof ( uint64_t ) ) . withPrefix ( vblockPrefix ) ) ) ;
}
return ret ;
}
Key getApplyKey ( Version version , Key backupUid ) {
int64_t vblock = ( version - 1 ) / CLIENT_KNOBS - > LOG_RANGE_BLOCK_SIZE ;
uint64_t v = bigEndian64 ( version ) ;
uint32_t data = vblock & 0xffffffff ;
uint8_t hash = ( uint8_t ) hashlittle ( & data , sizeof ( uint32_t ) , 0 ) ;
Key k1 = StringRef ( ( uint8_t * ) & v , sizeof ( uint64_t ) ) . withPrefix ( StringRef ( & hash , sizeof ( uint8_t ) ) ) ;
Key k2 = k1 . withPrefix ( backupUid ) ;
return k2 . withPrefix ( applyLogKeys . begin ) ;
}
//Given a key from one of the ranges returned by get_log_ranges,
//returns(version, part) where version is the database version number of
//the transaction log data in the value, and part is 0 for the first such
//data for a given version, 1 for the second block of data, etc.
std : : pair < uint64_t , uint32_t > decodeBKMutationLogKey ( Key key ) {
return std : : make_pair ( bigEndian64 ( * ( int64_t * ) ( key . begin ( ) + backupLogPrefixBytes + sizeof ( UID ) + sizeof ( uint8_t ) ) ) ,
bigEndian32 ( * ( int32_t * ) ( key . begin ( ) + backupLogPrefixBytes + sizeof ( UID ) + sizeof ( uint8_t ) + sizeof ( int64_t ) ) ) ) ;
}
// value is an iterable representing all of the transaction log data for
// a given version.Returns an iterable(generator) yielding a tuple for
// each mutation in the log.At present, all mutations are represented as
// (type, param1, param2) where type is an integer and param1 and param2 are byte strings
Standalone < VectorRef < MutationRef > > decodeBackupLogValue ( StringRef value ) {
try {
uint64_t offset ( 0 ) ;
uint64_t protocolVersion = 0 ;
memcpy ( & protocolVersion , value . begin ( ) , sizeof ( uint64_t ) ) ;
offset + = sizeof ( uint64_t ) ;
if ( protocolVersion < = 0x0FDB00A200090001 ) {
2018-06-09 02:11:08 +08:00
TraceEvent ( SevError , " DecodeBackupLogValue " ) . detail ( " IncompatibleProtocolVersion " , protocolVersion )
. detail ( " ValueSize " , value . size ( ) ) . detail ( " Value " , printable ( value ) ) ;
2017-05-26 04:48:44 +08:00
throw incompatible_protocol_version ( ) ;
}
Standalone < VectorRef < MutationRef > > result ;
uint32_t totalBytes = 0 ;
memcpy ( & totalBytes , value . begin ( ) + offset , sizeof ( uint32_t ) ) ;
offset + = sizeof ( uint32_t ) ;
uint32_t consumed = 0 ;
if ( totalBytes + offset > value . size ( ) )
throw restore_missing_data ( ) ;
int originalOffset = offset ;
while ( consumed < totalBytes ) {
uint32_t type = 0 ;
memcpy ( & type , value . begin ( ) + offset , sizeof ( uint32_t ) ) ;
offset + = sizeof ( uint32_t ) ;
uint32_t len1 = 0 ;
memcpy ( & len1 , value . begin ( ) + offset , sizeof ( uint32_t ) ) ;
offset + = sizeof ( uint32_t ) ;
uint32_t len2 = 0 ;
memcpy ( & len2 , value . begin ( ) + offset , sizeof ( uint32_t ) ) ;
offset + = sizeof ( uint32_t ) ;
MutationRef logValue ;
logValue . type = type ;
logValue . param1 = value . substr ( offset , len1 ) ;
offset + = len1 ;
logValue . param2 = value . substr ( offset , len2 ) ;
offset + = len2 ;
result . push_back_deep ( result . arena ( ) , logValue ) ;
consumed + = BackupAgentBase : : logHeaderSize + len1 + len2 ;
}
ASSERT ( consumed = = totalBytes ) ;
if ( value . size ( ) ! = offset ) {
2018-06-09 02:11:08 +08:00
TraceEvent ( SevError , " BA_DecodeBackupLogValue " ) . detail ( " UnexpectedExtraDataSize " , value . size ( ) ) . detail ( " Offset " , offset ) . detail ( " TotalBytes " , totalBytes ) . detail ( " Consumed " , consumed ) . detail ( " OriginalOffset " , originalOffset ) ;
2017-05-26 04:48:44 +08:00
throw restore_corrupted_data ( ) ;
}
return result ;
}
catch ( Error & e ) {
2018-06-09 02:11:08 +08:00
TraceEvent ( e . code ( ) = = error_code_restore_missing_data ? SevWarn : SevError , " BA_DecodeBackupLogValue " ) . error ( e ) . GetLastError ( ) . detail ( " ValueSize " , value . size ( ) ) . detail ( " Value " , printable ( value ) ) ;
2017-05-26 04:48:44 +08:00
throw ;
}
}
void decodeBackupLogValue ( Arena & arena , VectorRef < MutationRef > & result , int & mutationSize , StringRef value , StringRef addPrefix , StringRef removePrefix , Version version , Reference < KeyRangeMap < Version > > key_version ) {
try {
uint64_t offset ( 0 ) ;
uint64_t protocolVersion = 0 ;
memcpy ( & protocolVersion , value . begin ( ) , sizeof ( uint64_t ) ) ;
offset + = sizeof ( uint64_t ) ;
if ( protocolVersion < = 0x0FDB00A200090001 ) {
2018-06-09 02:11:08 +08:00
TraceEvent ( SevError , " DecodeBackupLogValue " ) . detail ( " IncompatibleProtocolVersion " , protocolVersion )
. detail ( " ValueSize " , value . size ( ) ) . detail ( " Value " , printable ( value ) ) ;
2017-05-26 04:48:44 +08:00
throw incompatible_protocol_version ( ) ;
}
uint32_t totalBytes = 0 ;
memcpy ( & totalBytes , value . begin ( ) + offset , sizeof ( uint32_t ) ) ;
offset + = sizeof ( uint32_t ) ;
uint32_t consumed = 0 ;
if ( totalBytes + offset > value . size ( ) )
throw restore_missing_data ( ) ;
int originalOffset = offset ;
while ( consumed < totalBytes ) {
uint32_t type = 0 ;
memcpy ( & type , value . begin ( ) + offset , sizeof ( uint32_t ) ) ;
offset + = sizeof ( uint32_t ) ;
uint32_t len1 = 0 ;
memcpy ( & len1 , value . begin ( ) + offset , sizeof ( uint32_t ) ) ;
offset + = sizeof ( uint32_t ) ;
uint32_t len2 = 0 ;
memcpy ( & len2 , value . begin ( ) + offset , sizeof ( uint32_t ) ) ;
offset + = sizeof ( uint32_t ) ;
ASSERT ( offset + len1 + len2 < = value . size ( ) & & isValidMutationType ( type ) ) ;
MutationRef logValue ;
Arena tempArena ;
logValue . type = type ;
logValue . param1 = value . substr ( offset , len1 ) ;
offset + = len1 ;
logValue . param2 = value . substr ( offset , len2 ) ;
offset + = len2 ;
if ( logValue . type = = MutationRef : : ClearRange ) {
KeyRangeRef range ( logValue . param1 , logValue . param2 ) ;
auto ranges = key_version - > intersectingRanges ( range ) ;
for ( auto r : ranges ) {
if ( version > r . value ( ) & & r . value ( ) ! = invalidVersion ) {
KeyRef minKey = std : : min ( r . range ( ) . end , range . end ) ;
if ( minKey = = ( removePrefix = = StringRef ( ) ? normalKeys . end : strinc ( removePrefix ) ) ) {
logValue . param1 = std : : max ( r . range ( ) . begin , range . begin ) ;
if ( removePrefix . size ( ) ) {
logValue . param1 = logValue . param1 . removePrefix ( removePrefix ) ;
}
if ( addPrefix . size ( ) ) {
logValue . param1 = logValue . param1 . withPrefix ( addPrefix , tempArena ) ;
}
logValue . param2 = addPrefix = = StringRef ( ) ? normalKeys . end : strinc ( addPrefix , tempArena ) ;
result . push_back_deep ( arena , logValue ) ;
mutationSize + = logValue . expectedSize ( ) ;
}
else {
logValue . param1 = std : : max ( r . range ( ) . begin , range . begin ) ;
logValue . param2 = minKey ;
if ( removePrefix . size ( ) ) {
logValue . param1 = logValue . param1 . removePrefix ( removePrefix ) ;
logValue . param2 = logValue . param2 . removePrefix ( removePrefix ) ;
}
if ( addPrefix . size ( ) ) {
logValue . param1 = logValue . param1 . withPrefix ( addPrefix , tempArena ) ;
logValue . param2 = logValue . param2 . withPrefix ( addPrefix , tempArena ) ;
}
result . push_back_deep ( arena , logValue ) ;
mutationSize + = logValue . expectedSize ( ) ;
}
}
}
}
else {
Version ver = key_version - > rangeContaining ( logValue . param1 ) . value ( ) ;
2018-06-09 02:11:08 +08:00
//TraceEvent("ApplyMutation").detail("LogValue", logValue.toString()).detail("Version", version).detail("Ver", ver).detail("Apply", version > ver && ver != invalidVersion);
2017-05-26 04:48:44 +08:00
if ( version > ver & & ver ! = invalidVersion ) {
if ( removePrefix . size ( ) ) {
logValue . param1 = logValue . param1 . removePrefix ( removePrefix ) ;
}
if ( addPrefix . size ( ) ) {
logValue . param1 = logValue . param1 . withPrefix ( addPrefix , tempArena ) ;
}
result . push_back_deep ( arena , logValue ) ;
mutationSize + = logValue . expectedSize ( ) ;
}
}
consumed + = BackupAgentBase : : logHeaderSize + len1 + len2 ;
}
ASSERT ( consumed = = totalBytes ) ;
if ( value . size ( ) ! = offset ) {
2018-06-09 02:11:08 +08:00
TraceEvent ( SevError , " BA_DecodeBackupLogValue " ) . detail ( " UnexpectedExtraDataSize " , value . size ( ) ) . detail ( " Offset " , offset ) . detail ( " TotalBytes " , totalBytes ) . detail ( " Consumed " , consumed ) . detail ( " OriginalOffset " , originalOffset ) ;
2017-05-26 04:48:44 +08:00
throw restore_corrupted_data ( ) ;
}
}
catch ( Error & e ) {
2018-06-09 02:11:08 +08:00
TraceEvent ( e . code ( ) = = error_code_restore_missing_data ? SevWarn : SevError , " BA_DecodeBackupLogValue " ) . error ( e ) . GetLastError ( ) . detail ( " ValueSize " , value . size ( ) ) . detail ( " Value " , printable ( value ) ) ;
2017-05-26 04:48:44 +08:00
throw ;
}
}
static double lastErrorTime = 0 ;
2018-09-11 01:51:41 +08:00
void logErrorWorker ( Reference < ReadYourWritesTransaction > tr , Key keyErrors , std : : string message ) {
2017-05-26 04:48:44 +08:00
tr - > setOption ( FDBTransactionOptions : : ACCESS_SYSTEM_KEYS ) ;
tr - > setOption ( FDBTransactionOptions : : LOCK_AWARE ) ;
if ( now ( ) - lastErrorTime > CLIENT_KNOBS - > BACKUP_ERROR_DELAY ) {
2018-06-09 02:11:08 +08:00
TraceEvent ( " BA_LogError " ) . detail ( " Key " , printable ( keyErrors ) ) . detail ( " Message " , message ) ;
2017-05-26 04:48:44 +08:00
lastErrorTime = now ( ) ;
}
tr - > set ( keyErrors , message ) ;
}
Future < Void > logError ( Database cx , Key keyErrors , const std : : string & message ) {
2018-09-11 01:51:41 +08:00
return runRYWTransaction ( cx , [ = ] ( Reference < ReadYourWritesTransaction > tr ) {
logErrorWorker ( tr , keyErrors , message ) ;
return Future < Void > ( Void ( ) ) ;
} ) ;
2017-05-26 04:48:44 +08:00
}
Future < Void > logError ( Reference < ReadYourWritesTransaction > tr , Key keyErrors , const std : : string & message ) {
return logError ( tr - > getDatabase ( ) , keyErrors , message ) ;
}
ACTOR Future < Void > readCommitted ( Database cx , PromiseStream < RangeResultWithVersion > results , Reference < FlowLock > lock ,
KeyRangeRef range , bool terminator , bool systemAccess , bool lockAware ) {
state KeySelector begin = firstGreaterOrEqual ( range . begin ) ;
state KeySelector end = firstGreaterOrEqual ( range . end ) ;
2018-03-08 05:56:34 +08:00
state Transaction tr ( cx ) ;
2017-05-26 04:48:44 +08:00
state FlowLock : : Releaser releaser ;
loop {
try {
2017-12-22 09:21:05 +08:00
state GetRangeLimits limits ( CLIENT_KNOBS - > ROW_LIMIT_UNLIMITED , ( g_network - > isSimulated ( ) & & ! g_simulator . speedUpSimulation ) ? CLIENT_KNOBS - > BACKUP_SIMULATED_LIMIT_BYTES : CLIENT_KNOBS - > BACKUP_GET_RANGE_LIMIT_BYTES ) ;
2017-05-26 04:48:44 +08:00
if ( systemAccess )
2018-03-08 05:56:34 +08:00
tr . setOption ( FDBTransactionOptions : : ACCESS_SYSTEM_KEYS ) ;
2017-05-26 04:48:44 +08:00
if ( lockAware )
2018-03-08 05:56:34 +08:00
tr . setOption ( FDBTransactionOptions : : LOCK_AWARE ) ;
2017-05-26 04:48:44 +08:00
//add lock
releaser . release ( ) ;
2018-08-11 04:57:10 +08:00
wait ( lock - > take ( TaskDefaultYield , limits . bytes + CLIENT_KNOBS - > VALUE_SIZE_LIMIT + CLIENT_KNOBS - > SYSTEM_KEY_SIZE_LIMIT ) ) ;
2017-05-26 04:48:44 +08:00
releaser = FlowLock : : Releaser ( * lock , limits . bytes + CLIENT_KNOBS - > VALUE_SIZE_LIMIT + CLIENT_KNOBS - > SYSTEM_KEY_SIZE_LIMIT ) ;
2018-03-08 05:56:34 +08:00
state Standalone < RangeResultRef > values = wait ( tr . getRange ( begin , end , limits ) ) ;
2017-05-26 04:48:44 +08:00
// When this buggify line is enabled, if there are more than 1 result then use half of the results
if ( values . size ( ) > 1 & & BUGGIFY ) {
values . resize ( values . arena ( ) , values . size ( ) / 2 ) ;
values . more = true ;
// Half of the time wait for this tr to expire so that the next read is at a different version
if ( g_random - > random01 ( ) < 0.5 )
2018-08-11 04:57:10 +08:00
wait ( delay ( 6.0 ) ) ;
2017-05-26 04:48:44 +08:00
}
releaser . remaining - = values . expectedSize ( ) ; //its the responsibility of the caller to release after this point
ASSERT ( releaser . remaining > = 0 ) ;
2018-03-08 05:56:34 +08:00
results . send ( RangeResultWithVersion ( values , tr . getReadVersion ( ) . get ( ) ) ) ;
2017-05-26 04:48:44 +08:00
if ( values . size ( ) > 0 )
begin = firstGreaterThan ( values . end ( ) [ - 1 ] . key ) ;
if ( ! values . more & & ! limits . isReached ( ) ) {
2017-11-15 15:33:17 +08:00
if ( terminator )
results . sendError ( end_of_stream ( ) ) ;
2017-05-26 04:48:44 +08:00
return Void ( ) ;
}
}
catch ( Error & e ) {
2017-09-29 07:35:08 +08:00
if ( e . code ( ) ! = error_code_transaction_too_old & & e . code ( ) ! = error_code_future_version )
2017-05-26 04:48:44 +08:00
throw ;
2018-03-08 05:56:34 +08:00
tr = Transaction ( cx ) ;
2017-05-26 04:48:44 +08:00
}
}
}
ACTOR Future < Void > readCommitted ( Database cx , PromiseStream < RCGroup > results , Future < Void > active , Reference < FlowLock > lock ,
KeyRangeRef range , std : : function < std : : pair < uint64_t , uint32_t > ( Key key ) > groupBy ,
2018-03-08 05:56:34 +08:00
bool terminator , bool systemAccess , bool lockAware )
2017-05-26 04:48:44 +08:00
{
state KeySelector nextKey = firstGreaterOrEqual ( range . begin ) ;
state KeySelector end = firstGreaterOrEqual ( range . end ) ;
state RCGroup rcGroup = RCGroup ( ) ;
state uint64_t skipGroup ( ULLONG_MAX ) ;
2018-03-08 05:56:34 +08:00
state Transaction tr ( cx ) ;
2017-05-26 04:48:44 +08:00
state FlowLock : : Releaser releaser ;
loop {
try {
2017-12-22 09:21:05 +08:00
state GetRangeLimits limits ( CLIENT_KNOBS - > ROW_LIMIT_UNLIMITED , ( g_network - > isSimulated ( ) & & ! g_simulator . speedUpSimulation ) ? CLIENT_KNOBS - > BACKUP_SIMULATED_LIMIT_BYTES : CLIENT_KNOBS - > BACKUP_GET_RANGE_LIMIT_BYTES ) ;
2017-05-26 04:48:44 +08:00
if ( systemAccess )
2018-03-08 05:56:34 +08:00
tr . setOption ( FDBTransactionOptions : : ACCESS_SYSTEM_KEYS ) ;
2017-05-26 04:48:44 +08:00
if ( lockAware )
2018-03-08 05:56:34 +08:00
tr . setOption ( FDBTransactionOptions : : LOCK_AWARE ) ;
2017-05-26 04:48:44 +08:00
2018-03-08 05:56:34 +08:00
state Standalone < RangeResultRef > rangevalue = wait ( tr . getRange ( nextKey , end , limits ) ) ;
2017-05-26 04:48:44 +08:00
// When this buggify line is enabled, if there are more than 1 result then use half of the results
if ( rangevalue . size ( ) > 1 & & BUGGIFY ) {
rangevalue . resize ( rangevalue . arena ( ) , rangevalue . size ( ) / 2 ) ;
rangevalue . more = true ;
// Half of the time wait for this tr to expire so that the next read is at a different version
if ( g_random - > random01 ( ) < 0.5 )
2018-08-11 04:57:10 +08:00
wait ( delay ( 6.0 ) ) ;
2017-05-26 04:48:44 +08:00
}
//add lock
2018-08-11 04:57:10 +08:00
wait ( active ) ;
2017-05-26 04:48:44 +08:00
releaser . release ( ) ;
2018-08-11 04:57:10 +08:00
wait ( lock - > take ( TaskDefaultYield , rangevalue . expectedSize ( ) + rcGroup . items . expectedSize ( ) ) ) ;
2017-05-26 04:48:44 +08:00
releaser = FlowLock : : Releaser ( * lock , rangevalue . expectedSize ( ) + rcGroup . items . expectedSize ( ) ) ;
int index ( 0 ) ;
for ( auto & s : rangevalue ) {
uint64_t groupKey = groupBy ( s . key ) . first ;
2018-06-09 02:20:06 +08:00
//TraceEvent("Log_ReadCommitted").detail("GroupKey", groupKey).detail("SkipGroup", skipGroup).detail("NextKey", printable(nextKey.key)).detail("End", printable(end.key)).detail("Valuesize", value.size()).detail("Index",index++).detail("Size",s.value.size());
2017-05-26 04:48:44 +08:00
if ( groupKey ! = skipGroup ) {
if ( rcGroup . version = = - 1 ) {
2018-03-08 05:56:34 +08:00
rcGroup . version = tr . getReadVersion ( ) . get ( ) ;
2017-05-26 04:48:44 +08:00
rcGroup . groupKey = groupKey ;
}
else if ( rcGroup . groupKey ! = groupKey ) {
2018-06-09 04:57:00 +08:00
//TraceEvent("Log_ReadCommitted").detail("SendGroup0", rcGroup.groupKey).detail("ItemSize", rcGroup.items.size()).detail("DataLength",rcGroup.items[0].value.size());
2017-05-26 04:48:44 +08:00
//state uint32_t len(0);
//for (size_t j = 0; j < rcGroup.items.size(); ++j) {
// len += rcGroup.items[j].value.size();
//}
2018-06-09 02:11:08 +08:00
//TraceEvent("SendGroup").detail("GroupKey", rcGroup.groupKey).detail("Version", rcGroup.version).detail("Length", len).detail("Releaser.remaining", releaser.remaining);
2017-05-26 04:48:44 +08:00
releaser . remaining - = rcGroup . items . expectedSize ( ) ; //its the responsibility of the caller to release after this point
ASSERT ( releaser . remaining > = 0 ) ;
results . send ( rcGroup ) ;
nextKey = firstGreaterThan ( rcGroup . items . end ( ) [ - 1 ] . key ) ;
skipGroup = rcGroup . groupKey ;
rcGroup = RCGroup ( ) ;
2018-03-08 05:56:34 +08:00
rcGroup . version = tr . getReadVersion ( ) . get ( ) ;
2017-05-26 04:48:44 +08:00
rcGroup . groupKey = groupKey ;
}
rcGroup . items . push_back_deep ( rcGroup . items . arena ( ) , s ) ;
}
}
if ( ! rangevalue . more ) {
if ( rcGroup . version ! = - 1 ) {
releaser . remaining - = rcGroup . items . expectedSize ( ) ; //its the responsibility of the caller to release after this point
ASSERT ( releaser . remaining > = 0 ) ;
2018-06-09 02:20:06 +08:00
//TraceEvent("Log_ReadCommitted").detail("SendGroup1", rcGroup.groupKey).detail("ItemSize", rcGroup.items.size()).detail("DataLength", rcGroup.items[0].value.size());
2017-05-26 04:48:44 +08:00
results . send ( rcGroup ) ;
}
2017-11-15 15:33:17 +08:00
if ( terminator )
results . sendError ( end_of_stream ( ) ) ;
2017-05-26 04:48:44 +08:00
return Void ( ) ;
}
nextKey = firstGreaterThan ( rangevalue . end ( ) [ - 1 ] . key ) ;
}
catch ( Error & e ) {
2017-09-29 07:35:08 +08:00
if ( e . code ( ) ! = error_code_transaction_too_old & & e . code ( ) ! = error_code_future_version )
2017-05-26 04:48:44 +08:00
throw ;
2018-08-11 04:57:10 +08:00
wait ( tr . onError ( e ) ) ;
2017-05-26 04:48:44 +08:00
}
}
}
2018-03-08 05:56:34 +08:00
Future < Void > readCommitted ( Database cx , PromiseStream < RCGroup > results , Reference < FlowLock > lock , KeyRangeRef range , std : : function < std : : pair < uint64_t , uint32_t > ( Key key ) > groupBy ) {
return readCommitted ( cx , results , Void ( ) , lock , range , groupBy , true , true , true ) ;
2017-05-26 04:48:44 +08:00
}
2017-09-21 06:50:20 +08:00
ACTOR Future < int > dumpData ( Database cx , PromiseStream < RCGroup > results , Reference < FlowLock > lock , Key uid , Key addPrefix , Key removePrefix , RequestStream < CommitTransactionRequest > commit ,
2017-05-26 04:48:44 +08:00
NotifiedVersion * committedVersion , Optional < Version > endVersion , Key rangeBegin , PromiseStream < Future < Void > > addActor , FlowLock * commitLock , Reference < KeyRangeMap < Version > > keyVersion ) {
state Version lastVersion = invalidVersion ;
state bool endOfStream = false ;
state int totalBytes = 0 ;
loop {
state CommitTransactionRequest req ;
state Version newBeginVersion = invalidVersion ;
state int mutationSize = 0 ;
loop {
try {
RCGroup group = waitNext ( results . getFuture ( ) ) ;
lock - > release ( group . items . expectedSize ( ) ) ;
BinaryWriter bw ( Unversioned ( ) ) ;
for ( int i = 0 ; i < group . items . size ( ) ; + + i ) {
bw . serializeBytes ( group . items [ i ] . value ) ;
}
decodeBackupLogValue ( req . arena , req . transaction . mutations , mutationSize , bw . toStringRef ( ) , addPrefix , removePrefix , group . groupKey , keyVersion ) ;
newBeginVersion = group . groupKey + 1 ;
if ( mutationSize > = CLIENT_KNOBS - > BACKUP_LOG_WRITE_BATCH_MAX_SIZE ) {
break ;
}
}
catch ( Error & e ) {
if ( e . code ( ) = = error_code_end_of_stream ) {
if ( endVersion . present ( ) & & endVersion . get ( ) > lastVersion & & endVersion . get ( ) > newBeginVersion ) {
newBeginVersion = endVersion . get ( ) ;
}
if ( newBeginVersion = = invalidVersion )
return totalBytes ;
endOfStream = true ;
break ;
}
throw ;
}
}
Key applyBegin = uid . withPrefix ( applyMutationsBeginRange . begin ) ;
Key versionKey = BinaryWriter : : toValue ( newBeginVersion , Unversioned ( ) ) ;
Key rangeEnd = getApplyKey ( newBeginVersion , uid ) ;
2017-09-21 06:50:20 +08:00
2017-05-26 04:48:44 +08:00
req . transaction . mutations . push_back_deep ( req . arena , MutationRef ( MutationRef : : SetValue , applyBegin , versionKey ) ) ;
2017-09-21 06:50:20 +08:00
req . transaction . write_conflict_ranges . push_back_deep ( req . arena , singleKeyRange ( applyBegin ) ) ;
2017-05-26 04:48:44 +08:00
req . transaction . mutations . push_back_deep ( req . arena , MutationRef ( MutationRef : : ClearRange , rangeBegin , rangeEnd ) ) ;
2017-09-21 06:50:20 +08:00
req . transaction . write_conflict_ranges . push_back_deep ( req . arena , singleKeyRange ( rangeBegin ) ) ;
2017-05-26 04:48:44 +08:00
2017-12-21 08:54:57 +08:00
// The commit request contains no read conflict ranges, so regardless of what read version we
// choose, it's impossible for us to get a transaction_too_old error back, and it's impossible
// for our transaction to be aborted due to conflicts.
2017-05-26 04:48:44 +08:00
req . transaction . read_snapshot = committedVersion - > get ( ) ;
2018-02-10 10:21:29 +08:00
req . flags = req . flags | CommitTransactionRequest : : FLAG_IS_LOCK_AWARE ;
2017-05-26 04:48:44 +08:00
totalBytes + = mutationSize ;
2018-08-11 04:57:10 +08:00
wait ( commitLock - > take ( TaskDefaultYield , mutationSize ) ) ;
2017-05-26 04:48:44 +08:00
addActor . send ( commitLock - > releaseWhen ( success ( commit . getReply ( req ) ) , mutationSize ) ) ;
if ( endOfStream ) {
return totalBytes ;
}
}
}
ACTOR Future < Void > coalesceKeyVersionCache ( Key uid , Version endVersion , Reference < KeyRangeMap < Version > > keyVersion , RequestStream < CommitTransactionRequest > commit , NotifiedVersion * committedVersion , PromiseStream < Future < Void > > addActor , FlowLock * commitLock ) {
Version lastVersion = - 1000 ;
int64_t removed = 0 ;
state CommitTransactionRequest req ;
state int64_t mutationSize = 0 ;
Key mapPrefix = uid . withPrefix ( applyMutationsKeyVersionMapRange . begin ) ;
for ( auto it : keyVersion - > ranges ( ) ) {
if ( lastVersion = = - 1000 ) {
lastVersion = it . value ( ) ;
} else {
Version ver = it . value ( ) ;
if ( ver < endVersion & & lastVersion < endVersion & & ver ! = invalidVersion & & lastVersion ! = invalidVersion ) {
Key removeKey = it . range ( ) . begin . withPrefix ( mapPrefix ) ;
Key removeEnd = keyAfter ( removeKey ) ;
req . transaction . mutations . push_back_deep ( req . arena , MutationRef ( MutationRef : : ClearRange , removeKey , removeEnd ) ) ;
mutationSize + = removeKey . size ( ) + removeEnd . size ( ) ;
removed - - ;
} else {
lastVersion = ver ;
}
}
}
if ( removed ! = 0 ) {
Key countKey = uid . withPrefix ( applyMutationsKeyVersionCountRange . begin ) ;
req . transaction . write_conflict_ranges . push_back_deep ( req . arena , singleKeyRange ( countKey ) ) ;
req . transaction . mutations . push_back_deep ( req . arena , MutationRef ( MutationRef : : AddValue , countKey , StringRef ( ( uint8_t * ) & removed , 8 ) ) ) ;
req . transaction . read_snapshot = committedVersion - > get ( ) ;
2018-02-10 10:21:29 +08:00
req . flags = req . flags | CommitTransactionRequest : : FLAG_IS_LOCK_AWARE ;
2017-05-26 04:48:44 +08:00
2018-08-11 04:57:10 +08:00
wait ( commitLock - > take ( TaskDefaultYield , mutationSize ) ) ;
2017-05-26 04:48:44 +08:00
addActor . send ( commitLock - > releaseWhen ( success ( commit . getReply ( req ) ) , mutationSize ) ) ;
}
return Void ( ) ;
}
ACTOR Future < Void > applyMutations ( Database cx , Key uid , Key addPrefix , Key removePrefix , Version beginVersion , Version * endVersion , RequestStream < CommitTransactionRequest > commit , NotifiedVersion * committedVersion , Reference < KeyRangeMap < Version > > keyVersion ) {
state FlowLock commitLock ( CLIENT_KNOBS - > BACKUP_LOCK_BYTES ) ;
state PromiseStream < Future < Void > > addActor ;
state Future < Void > error = actorCollection ( addActor . getFuture ( ) ) ;
state int maxBytes = CLIENT_KNOBS - > APPLY_MIN_LOCK_BYTES ;
2019-03-01 09:45:00 +08:00
keyVersion - > insert ( metadataVersionKey , 0 ) ;
2017-05-26 04:48:44 +08:00
try {
loop {
if ( beginVersion > = * endVersion ) {
2018-08-11 04:57:10 +08:00
wait ( commitLock . take ( TaskDefaultYield , CLIENT_KNOBS - > BACKUP_LOCK_BYTES ) ) ;
2017-05-26 04:48:44 +08:00
commitLock . release ( CLIENT_KNOBS - > BACKUP_LOCK_BYTES ) ;
if ( beginVersion > = * endVersion ) {
return Void ( ) ;
}
}
int rangeCount = std : : max ( 1 , CLIENT_KNOBS - > APPLY_MAX_LOCK_BYTES / maxBytes ) ;
state Version newEndVersion = std : : min ( * endVersion , ( ( beginVersion / CLIENT_KNOBS - > APPLY_BLOCK_SIZE ) + rangeCount ) * CLIENT_KNOBS - > APPLY_BLOCK_SIZE ) ;
state Standalone < VectorRef < KeyRangeRef > > ranges = getApplyRanges ( beginVersion , newEndVersion , uid ) ;
state size_t idx ;
state std : : vector < PromiseStream < RCGroup > > results ;
state std : : vector < Future < Void > > rc ;
state std : : vector < Reference < FlowLock > > locks ;
for ( int i = 0 ; i < ranges . size ( ) ; + + i ) {
results . push_back ( PromiseStream < RCGroup > ( ) ) ;
locks . push_back ( Reference < FlowLock > ( new FlowLock ( std : : max ( CLIENT_KNOBS - > APPLY_MAX_LOCK_BYTES / ranges . size ( ) , CLIENT_KNOBS - > APPLY_MIN_LOCK_BYTES ) ) ) ) ;
rc . push_back ( readCommitted ( cx , results [ i ] , locks [ i ] , ranges [ i ] , decodeBKMutationLogKey ) ) ;
}
maxBytes = std : : max < int > ( maxBytes * CLIENT_KNOBS - > APPLY_MAX_DECAY_RATE , CLIENT_KNOBS - > APPLY_MIN_LOCK_BYTES ) ;
for ( idx = 0 ; idx < ranges . size ( ) ; + + idx ) {
int bytes = wait ( dumpData ( cx , results [ idx ] , locks [ idx ] , uid , addPrefix , removePrefix , commit , committedVersion , idx = = ranges . size ( ) - 1 ? newEndVersion : Optional < Version > ( ) , ranges [ idx ] . begin , addActor , & commitLock , keyVersion ) ) ;
maxBytes = std : : max < int > ( CLIENT_KNOBS - > APPLY_MAX_INCREASE_FACTOR * bytes , maxBytes ) ;
if ( error . isError ( ) ) throw error . getError ( ) ;
}
2018-08-11 04:57:10 +08:00
wait ( coalesceKeyVersionCache ( uid , newEndVersion , keyVersion , commit , committedVersion , addActor , & commitLock ) ) ;
2017-05-26 04:48:44 +08:00
beginVersion = newEndVersion ;
}
} catch ( Error & e ) {
2018-06-09 02:11:08 +08:00
TraceEvent ( e . code ( ) = = error_code_restore_missing_data ? SevWarnAlways : SevError , " ApplyMutationsError " ) . error ( e ) ;
2017-05-26 04:48:44 +08:00
throw ;
}
2017-09-21 06:50:20 +08:00
}
2018-02-21 05:22:31 +08:00
2018-06-07 04:05:53 +08:00
ACTOR static Future < Void > _eraseLogData ( Database cx , Key logUidValue , Key destUidValue , Optional < Version > endVersion , bool checkBackupUid , Version backupUid ) {
2018-02-21 05:22:31 +08:00
state Key backupLatestVersionsPath = destUidValue . withPrefix ( backupLatestVersionsPrefix ) ;
state Key backupLatestVersionsKey = logUidValue . withPrefix ( backupLatestVersionsPath ) ;
2018-03-14 02:21:24 +08:00
2018-06-07 04:05:53 +08:00
if ( ! destUidValue . size ( ) ) {
2018-02-21 05:22:31 +08:00
return Void ( ) ;
}
2018-06-07 04:05:53 +08:00
state Reference < ReadYourWritesTransaction > tr ( new ReadYourWritesTransaction ( cx ) ) ;
loop {
try {
tr - > setOption ( FDBTransactionOptions : : LOCK_AWARE ) ;
tr - > setOption ( FDBTransactionOptions : : ACCESS_SYSTEM_KEYS ) ;
2018-03-14 02:21:24 +08:00
2018-06-07 04:05:53 +08:00
if ( checkBackupUid ) {
Subspace sourceStates = Subspace ( databaseBackupPrefixRange . begin ) . get ( BackupAgentBase : : keySourceStates ) . get ( logUidValue ) ;
Optional < Value > v = wait ( tr - > get ( sourceStates . pack ( DatabaseBackupAgent : : keyFolderId ) ) ) ;
if ( v . present ( ) & & BinaryReader : : fromStringRef < Version > ( v . get ( ) , Unversioned ( ) ) > backupUid )
return Void ( ) ;
2018-02-21 05:22:31 +08:00
}
2018-03-14 02:21:24 +08:00
2018-06-07 04:05:53 +08:00
state Standalone < RangeResultRef > backupVersions = wait ( tr - > getRange ( KeyRangeRef ( backupLatestVersionsPath , strinc ( backupLatestVersionsPath ) ) , CLIENT_KNOBS - > TOO_MANY ) ) ;
2018-03-14 02:21:24 +08:00
2018-06-07 04:05:53 +08:00
// Make sure version history key does exist and lower the beginVersion if needed
state Version currBeginVersion = invalidVersion ;
for ( auto backupVersion : backupVersions ) {
Key currLogUidValue = backupVersion . key . removePrefix ( backupLatestVersionsPrefix ) . removePrefix ( destUidValue ) ;
2018-02-21 05:22:31 +08:00
2018-06-07 04:05:53 +08:00
if ( currLogUidValue = = logUidValue ) {
currBeginVersion = BinaryReader : : fromStringRef < Version > ( backupVersion . value , Unversioned ( ) ) ;
break ;
}
2018-02-21 05:22:31 +08:00
}
2018-06-07 04:05:53 +08:00
// Do not clear anything if version history key cannot be found
if ( currBeginVersion = = invalidVersion ) {
return Void ( ) ;
}
2018-03-14 02:21:24 +08:00
2018-06-07 04:05:53 +08:00
state Version currEndVersion = currBeginVersion + CLIENT_KNOBS - > CLEAR_LOG_RANGE_COUNT * CLIENT_KNOBS - > LOG_RANGE_BLOCK_SIZE ;
if ( endVersion . present ( ) ) {
currEndVersion = std : : min ( currEndVersion , endVersion . get ( ) ) ;
}
2018-03-14 02:21:24 +08:00
2018-06-07 04:05:53 +08:00
state Version nextSmallestVersion = currEndVersion ;
bool clearLogRangesRequired = true ;
// More than one backup/DR with the same range
if ( backupVersions . size ( ) > 1 ) {
for ( auto backupVersion : backupVersions ) {
Key currLogUidValue = backupVersion . key . removePrefix ( backupLatestVersionsPrefix ) . removePrefix ( destUidValue ) ;
Version currVersion = BinaryReader : : fromStringRef < Version > ( backupVersion . value , Unversioned ( ) ) ;
if ( currLogUidValue = = logUidValue ) {
continue ;
} else if ( currVersion > currBeginVersion ) {
nextSmallestVersion = std : : min ( currVersion , nextSmallestVersion ) ;
} else {
// If we can find a version less than or equal to beginVersion, clearing log ranges is not required
clearLogRangesRequired = false ;
break ;
}
}
}
2018-03-14 02:21:24 +08:00
2018-06-07 04:05:53 +08:00
if ( ! endVersion . present ( ) & & backupVersions . size ( ) = = 1 ) {
// Clear version history
tr - > clear ( prefixRange ( backupLatestVersionsPath ) ) ;
2018-03-17 06:40:59 +08:00
2018-06-07 04:05:53 +08:00
// Clear everything under blog/[destUid]
tr - > clear ( prefixRange ( destUidValue . withPrefix ( backupLogKeys . begin ) ) ) ;
2018-03-14 02:21:24 +08:00
2018-06-07 04:05:53 +08:00
// Disable committing mutations into blog
tr - > clear ( prefixRange ( destUidValue . withPrefix ( logRangesRange . begin ) ) ) ;
} else {
if ( ! endVersion . present ( ) & & currEndVersion > = nextSmallestVersion ) {
// Clear current backup version history
tr - > clear ( backupLatestVersionsKey ) ;
} else {
// Update current backup latest version
tr - > set ( backupLatestVersionsKey , BinaryWriter : : toValue < Version > ( currEndVersion , Unversioned ( ) ) ) ;
2018-03-14 02:21:24 +08:00
}
2018-06-07 04:05:53 +08:00
// Clear log ranges if needed
if ( clearLogRangesRequired ) {
Standalone < VectorRef < KeyRangeRef > > ranges = getLogRanges ( currBeginVersion , nextSmallestVersion , destUidValue ) ;
for ( auto & range : ranges ) {
tr - > clear ( range ) ;
}
2018-03-17 06:40:59 +08:00
}
2018-06-07 04:05:53 +08:00
}
2018-08-11 04:57:10 +08:00
wait ( tr - > commit ( ) ) ;
2018-03-17 06:40:59 +08:00
2018-06-07 04:05:53 +08:00
if ( ! endVersion . present ( ) & & ( backupVersions . size ( ) = = 1 | | currEndVersion > = nextSmallestVersion ) ) {
return Void ( ) ;
2018-03-14 02:21:24 +08:00
}
2018-06-07 04:05:53 +08:00
if ( endVersion . present ( ) & & currEndVersion = = endVersion . get ( ) ) {
return Void ( ) ;
}
tr - > reset ( ) ;
} catch ( Error & e ) {
2018-08-11 04:57:10 +08:00
wait ( tr - > onError ( e ) ) ;
2018-03-14 02:21:24 +08:00
}
}
}
2018-06-07 04:05:53 +08:00
Future < Void > eraseLogData ( Database cx , Key logUidValue , Key destUidValue , Optional < Version > endVersion , bool checkBackupUid , Version backupUid ) {
return _eraseLogData ( cx , logUidValue , destUidValue , endVersion , checkBackupUid , backupUid ) ;
}