/*
 * RestoreLoader.actor.cpp
 *
 * This source file is part of the FoundationDB open source project
 *
 * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

// This file implements the functions and actors used by the RestoreLoader role.
// The RestoreLoader role starts with the restoreLoaderCore actor.
# include "fdbclient/BackupContainer.h"
# include "fdbserver/RestoreLoader.actor.h"
# include "flow/actorcompiler.h" // This must be the last #include.
2019-06-01 02:09:31 +08:00
typedef std : : map < Standalone < StringRef > , Standalone < StringRef > > SerializedMutationListMap ; // Key is the signature/version of the mutation list, Value is the mutation list (or part of the mutation list)
bool isRangeMutation ( MutationRef m ) ;
void splitMutation ( Reference < RestoreLoaderData > self , MutationRef m , Arena & mvector_arena , VectorRef < MutationRef > & mvector , Arena & nodeIDs_arena , VectorRef < UID > & nodeIDs ) ;
void _parseSerializedMutation ( VersionedMutationsMap * kvOps , SerializedMutationListMap * mutationMap , bool isSampling = false ) ;
2019-05-10 11:55:44 +08:00
2019-05-28 09:39:30 +08:00
ACTOR Future < Void > handleSetApplierKeyRangeVectorRequest ( RestoreSetApplierKeyRangeVectorRequest req , Reference < RestoreLoaderData > self ) ;
ACTOR Future < Void > handleLoadFileRequest ( RestoreLoadFileRequest req , Reference < RestoreLoaderData > self , bool isSampling = false ) ;
2019-06-01 02:09:31 +08:00
ACTOR Future < Void > registerMutationsToApplier ( Reference < RestoreLoaderData > self , VersionedMutationsMap * kvOps , bool isRangeFile , Version startVersion , Version endVersion ) ;
ACTOR static Future < Void > _parseLogFileToMutationsOnLoader ( SerializedMutationListMap * mutationMap ,
2019-05-28 09:39:30 +08:00
std : : map < Standalone < StringRef > , uint32_t > * mutationPartMap ,
2019-05-10 11:55:44 +08:00
Reference < IBackupContainer > bc , Version version ,
std : : string fileName , int64_t readOffset , int64_t readLen ,
KeyRange restoreRange , Key addPrefix , Key removePrefix ,
2019-05-28 09:39:30 +08:00
Key mutationLogPrefix ) ;
2019-06-01 02:09:31 +08:00
ACTOR static Future < Void > _parseRangeFileToMutationsOnLoader ( VersionedMutationsMap * kvOps ,
2019-05-10 11:55:44 +08:00
Reference < IBackupContainer > bc , Version version ,
2019-05-31 02:18:24 +08:00
std : : string fileName , int64_t readOffset_input , int64_t readLen_input , KeyRange restoreRange ) ;
2019-05-10 11:55:44 +08:00
ACTOR Future < Void > restoreLoaderCore ( Reference < RestoreLoaderData > self , RestoreLoaderInterface loaderInterf , Database cx ) {
state ActorCollection actors ( false ) ;
2019-05-23 04:30:33 +08:00
state Future < Void > exitRole = Never ( ) ;
2019-05-10 11:55:44 +08:00
state double lastLoopTopTime ;
loop {
double loopTopTime = now ( ) ;
double elapsedTime = loopTopTime - lastLoopTopTime ;
if ( elapsedTime > 0.050 ) {
if ( g_random - > random01 ( ) < 0.01 )
TraceEvent ( SevWarn , " SlowRestoreLoaderLoopx100 " ) . detail ( " NodeDesc " , self - > describeNode ( ) ) . detail ( " Elapsed " , elapsedTime ) ;
}
lastLoopTopTime = loopTopTime ;
state std : : string requestTypeStr = " [Init] " ;
try {
choose {
when ( RestoreSimpleRequest req = waitNext ( loaderInterf . heartbeat . getFuture ( ) ) ) {
requestTypeStr = " heartbeat " ;
2019-05-23 04:30:33 +08:00
actors . add ( handleHeartbeat ( req , loaderInterf . id ( ) ) ) ;
2019-05-10 11:55:44 +08:00
}
when ( RestoreSetApplierKeyRangeVectorRequest req = waitNext ( loaderInterf . setApplierKeyRangeVectorRequest . getFuture ( ) ) ) {
requestTypeStr = " setApplierKeyRangeVectorRequest " ;
2019-05-23 04:30:33 +08:00
actors . add ( handleSetApplierKeyRangeVectorRequest ( req , self ) ) ;
2019-05-10 11:55:44 +08:00
}
2019-05-28 09:39:30 +08:00
when ( RestoreLoadFileRequest req = waitNext ( loaderInterf . loadFile . getFuture ( ) ) ) {
requestTypeStr = " loadFile " ;
2019-05-10 11:55:44 +08:00
self - > initBackupContainer ( req . param . url ) ;
2019-05-28 09:39:30 +08:00
actors . add ( handleLoadFileRequest ( req , self , false ) ) ;
2019-05-10 11:55:44 +08:00
}
when ( RestoreVersionBatchRequest req = waitNext ( loaderInterf . initVersionBatch . getFuture ( ) ) ) {
requestTypeStr = " initVersionBatch " ;
2019-05-23 04:30:33 +08:00
actors . add ( handleInitVersionBatchRequest ( req , self ) ) ;
2019-05-10 11:55:44 +08:00
}
2019-06-05 02:40:23 +08:00
when ( RestoreVersionBatchRequest req = waitNext ( loaderInterf . finishRestore . getFuture ( ) ) ) {
2019-05-11 07:48:01 +08:00
requestTypeStr = " finishRestore " ;
2019-06-05 13:17:08 +08:00
exitRole = handleFinishRestoreRequest ( req , self ) ;
2019-05-11 07:48:01 +08:00
}
2019-05-23 04:30:33 +08:00
when ( wait ( exitRole ) ) {
2019-06-05 02:40:23 +08:00
TraceEvent ( " FastRestore " ) . detail ( " RestoreLoaderCore " , " ExitRole " ) . detail ( " NodeID " , self - > id ( ) ) ;
2019-05-23 04:30:33 +08:00
break ;
2019-05-10 11:55:44 +08:00
}
}
} catch ( Error & e ) {
2019-06-01 02:09:31 +08:00
TraceEvent ( SevWarn , " FastRestore " ) . detail ( " RestoreLoaderError " , e . what ( ) ) . detail ( " RequestType " , requestTypeStr ) ;
break ;
2019-05-10 11:55:44 +08:00
}
}
2019-06-01 02:09:31 +08:00
2019-05-10 11:55:44 +08:00
return Void ( ) ;
}
// Install the key-range-to-applier routing map sent by the master.
// Idempotent: only the first request populates range2Applier, so re-delivered
// duplicates of the same command are safely ignored. Always acks the request.
ACTOR Future<Void> handleSetApplierKeyRangeVectorRequest(RestoreSetApplierKeyRangeVectorRequest req, Reference<RestoreLoaderData> self) {
	bool alreadySet = !self->range2Applier.empty();
	if (!alreadySet) {
		self->range2Applier = req.range2Applier;
	}
	req.reply.send(RestoreCommonReply(self->id()));
	return Void();
}
// Parse one backup file (range file or log file) described by param into versioned
// mutations and send them to the appliers. One block parser is launched per
// blockSize-aligned chunk of the file; all parsers run in parallel.
ACTOR Future<Void> _processLoadingParam(LoadingParam param, Reference<RestoreLoaderData> self) {
	// Q: How to record the param's fields inside LoadingParam Refer to storageMetrics
	TraceEvent("FastRestore").detail("Loader", self->id()).detail("StartProcessLoadParam", param.toString());
	ASSERT(param.blockSize > 0);
	ASSERT(param.offset % param.blockSize == 0); // Parse file must be at block boundary.

	// Temporary data structure for parsing range and log files into (version, <K, V, mutationType>)
	// Must use StandAlone to save mutations, otherwise, the mutationref memory will be corrupted
	state VersionedMutationsMap kvOps;
	state SerializedMutationListMap mutationMap; // Key is the unique identifier for a batch of mutation logs at the same version
	state std::map<Standalone<StringRef>, uint32_t> mutationPartMap; // Sanity check the data parsing is correct
	state std::vector<Future<Void>> fileParserFutures;

	state int64_t j;
	state int64_t readOffset;
	state int64_t readLen;
	for (j = param.offset; j < param.length; j += param.blockSize) {
		readOffset = j;
		readLen = std::min<int64_t>(param.blockSize, param.length - j); // last block may be shorter
		if (param.isRangeFile) {
			fileParserFutures.push_back(_parseRangeFileToMutationsOnLoader(&kvOps, self->bc, param.version, param.filename, readOffset, readLen, param.restoreRange));
		} else {
			fileParserFutures.push_back(_parseLogFileToMutationsOnLoader(&mutationMap, &mutationPartMap, self->bc, param.version, param.filename, readOffset, readLen, param.restoreRange, param.addPrefix, param.removePrefix, param.mutationLogPrefix));
		}
	}
	wait(waitForAll(fileParserFutures)); // wait until every block of this file is parsed

	// Log-file blocks only fill mutationMap with serialized mutation lists;
	// decode them into kvOps here. Range-file blocks write kvOps directly.
	if (!param.isRangeFile) {
		_parseSerializedMutation(&kvOps, &mutationMap);
	}

	wait(registerMutationsToApplier(self, &kvOps, param.isRangeFile, param.prevVersion, param.endVersion)); // Send the parsed mutation to applier who will apply the mutation to DB

	TraceEvent("FastRestore").detail("Loader", self->id()).detail("FinishLoadingFile", param.filename);

	return Void();
}
// Handle a (possibly re-delivered) request to load one backup file.
// The processing actor for each LoadingParam is memoized in self->processedFileParams,
// so a duplicate request waits on the same in-flight work instead of parsing the file twice.
ACTOR Future<Void> handleLoadFileRequest(RestoreLoadFileRequest req, Reference<RestoreLoaderData> self, bool isSampling) {
	if (self->processedFileParams.find(req.param) == self->processedFileParams.end()) {
		TraceEvent("FastRestore").detail("Loader", self->id()).detail("ProcessLoadParam", req.param.toString());
		// NOTE(review): the Never() placeholder is immediately overwritten; presumably it
		// ensures the map entry exists before _processLoadingParam() starts running -- confirm.
		self->processedFileParams[req.param] = Never();
		self->processedFileParams[req.param] = _processLoadingParam(req.param, self);
	}
	ASSERT(self->processedFileParams.find(req.param) != self->processedFileParams.end());
	wait(self->processedFileParams[req.param]); // wait on the processing of the req.param.

	req.reply.send(RestoreCommonReply(self->id()));
	return Void();
}
// TODO: This function can be revised better
// Route every parsed mutation in *pkvOps to the applier(s) responsible for its key,
// batching one RestoreSendMutationVectorVersionedRequest per applier per commit version.
// Range mutations are split along applier boundaries via splitMutation(); single-key
// mutations are routed by a range2Applier lookup.
ACTOR Future<Void> registerMutationsToApplier(Reference<RestoreLoaderData> self,
                                              VersionedMutationsMap* pkvOps,
                                              bool isRangeFile, Version startVersion, Version endVersion) {
	state VersionedMutationsMap& kvOps = *pkvOps;
	state int kvCount = 0; // total mutations routed, reported in the summary trace event
	state int splitMutationIndex = 0;

	TraceEvent("FastRestore").detail("RegisterMutationToApplier", self->id()).detail("IsRangeFile", isRangeFile)
	    .detail("StartVersion", startVersion).detail("EndVersion", endVersion);

	// Ensure there is a mutation request sent at endVersion, so that applier can advance its notifiedVersion
	if (kvOps.find(endVersion) == kvOps.end()) {
		kvOps[endVersion] = VectorRef<MutationRef>(); // Empty mutation vector will be handled by applier
	}

	state std::map<UID, Standalone<VectorRef<MutationRef>>> applierMutationsBuffer; // The mutation vector to be sent to each applier
	state std::map<UID, double> applierMutationsSize; // buffered mutation vector size for each applier
	state Standalone<VectorRef<MutationRef>> mvector; // scratch: pieces of a split range mutation
	state Standalone<VectorRef<UID>> nodeIDs;         // scratch: applier for each piece, parallel to mvector
	// Initialize the above two maps
	state std::vector<UID> applierIDs = self->getWorkingApplierIDs();
	state std::vector<std::pair<UID, RestoreSendMutationVectorVersionedRequest>> requests;
	state Version prevVersion = startVersion;

	splitMutationIndex = 0;
	kvCount = 0;
	state VersionedMutationsMap::iterator kvOp;
	for (kvOp = kvOps.begin(); kvOp != kvOps.end(); kvOp++) {
		// Reset the per-applier buffers for this version
		applierMutationsBuffer.clear();
		applierMutationsSize.clear();
		for (auto& applierID : applierIDs) {
			applierMutationsBuffer[applierID] = Standalone<VectorRef<MutationRef>>(VectorRef<MutationRef>());
			applierMutationsSize[applierID] = 0.0;
		}
		state Version commitVersion = kvOp->first;
		state int mIndex;
		state MutationRef kvm;
		for (mIndex = 0; mIndex < kvOp->second.size(); mIndex++) {
			kvm = kvOp->second[mIndex];
			// Send the mutation to applier
			if (isRangeMutation(kvm)) {
				// Because using a vector of mutations causes overhead, and the range mutation should happen rarely;
				// We handle the range mutation and key mutation differently for the benefit of avoiding memory copy
				mvector.pop_front(mvector.size());
				nodeIDs.pop_front(nodeIDs.size());
				// WARNING: The splitMutation() may have bugs
				splitMutation(self, kvm, mvector.arena(), mvector.contents(), nodeIDs.arena(), nodeIDs.contents());
				ASSERT(mvector.size() == nodeIDs.size());
				for (splitMutationIndex = 0; splitMutationIndex < mvector.size(); splitMutationIndex++) {
					MutationRef mutation = mvector[splitMutationIndex];
					UID applierID = nodeIDs[splitMutationIndex];
					// push_back_deep copies the mutation into the buffer's arena, because the
					// source memory is owned by kvOps / mvector and may be reused.
					applierMutationsBuffer[applierID].push_back_deep(applierMutationsBuffer[applierID].arena(), mutation);
					applierMutationsSize[applierID] += mutation.expectedSize();
					kvCount++;
				}
			} else { // mutation operates on a particular key
				std::map<Standalone<KeyRef>, UID>::iterator itlow = self->range2Applier.upper_bound(kvm.param1); // upper_bound returns the first boundary key > m.param1
				--itlow; // make sure itlow->first <= m.param1
				// NOTE(review): if upper_bound() returned range2Applier.begin(), the decrement above
				// is undefined behavior; presumably the first boundary key is <= every routed key -- confirm.
				ASSERT(itlow->first <= kvm.param1);
				MutationRef mutation = kvm;
				UID applierID = itlow->second;
				kvCount++;
				applierMutationsBuffer[applierID].push_back_deep(applierMutationsBuffer[applierID].arena(), mutation); // deep copy into the buffer's arena
				applierMutationsSize[applierID] += mutation.expectedSize();
			}
		} // Mutations at the same version

		// Register the mutations to appliers for each version
		for (auto& applierID : applierIDs) {
			requests.push_back(std::make_pair(applierID, RestoreSendMutationVectorVersionedRequest(prevVersion, commitVersion, isRangeFile, applierMutationsBuffer[applierID])));
			applierMutationsBuffer[applierID].pop_front(applierMutationsBuffer[applierID].size());
			applierMutationsSize[applierID] = 0;
		}
		wait(sendBatchRequests(&RestoreApplierInterface::sendMutationVector, self->appliersInterf, requests));
		requests.clear();
		ASSERT(prevVersion < commitVersion);
		prevVersion = commitVersion;
	} // all versions of mutations

	TraceEvent("FastRestore").detail("LoaderRegisterMutationOnAppliers", kvCount);
	return Void();
}
2019-05-13 13:05:49 +08:00
// TODO: Add a unit test for this function
2019-05-14 08:24:57 +08:00
void splitMutation ( Reference < RestoreLoaderData > self , MutationRef m , Arena & mvector_arena , VectorRef < MutationRef > & mvector , Arena & nodeIDs_arena , VectorRef < UID > & nodeIDs ) {
2019-05-10 11:55:44 +08:00
// mvector[i] should be mapped to nodeID[i]
ASSERT ( mvector . empty ( ) ) ;
ASSERT ( nodeIDs . empty ( ) ) ;
// key range [m->param1, m->param2)
std : : map < Standalone < KeyRef > , UID > : : iterator itlow , itup ; //we will return [itlow, itup)
itlow = self - > range2Applier . lower_bound ( m . param1 ) ; // lower_bound returns the iterator that is >= m.param1
2019-05-15 08:00:58 +08:00
if ( itlow - > first > m . param1 ) {
if ( itlow ! = self - > range2Applier . begin ( ) ) {
- - itlow ;
2019-05-10 11:55:44 +08:00
}
}
itup = self - > range2Applier . upper_bound ( m . param2 ) ; // upper_bound returns the iterator that is > m.param2; return rmap::end if no keys are considered to go after m.param2.
2019-06-01 02:09:31 +08:00
ASSERT ( itup = = self - > range2Applier . end ( ) | | itup - > first > m . param2 ) ;
2019-05-14 08:24:57 +08:00
2019-05-15 08:39:44 +08:00
std : : map < Standalone < KeyRef > , UID > : : iterator itApplier ;
2019-05-14 08:24:57 +08:00
while ( itlow ! = itup ) {
Standalone < MutationRef > curm ; //current mutation
2019-05-10 11:55:44 +08:00
curm . type = m . type ;
2019-06-01 02:09:31 +08:00
// The first split mutation should starts with m.first. The later ones should start with the range2Applier boundary
2019-05-15 08:00:58 +08:00
if ( m . param1 > itlow - > first ) {
curm . param1 = m . param1 ;
} else {
curm . param1 = itlow - > first ;
}
2019-05-15 08:39:44 +08:00
itApplier = itlow ;
2019-05-10 11:55:44 +08:00
itlow + + ;
2019-05-14 08:24:57 +08:00
if ( itlow = = itup ) {
ASSERT ( m . param2 < = normalKeys . end ) ;
curm . param2 = m . param2 ;
} else if ( m . param2 < itlow - > first ) {
2019-06-01 02:09:31 +08:00
UNREACHABLE ( ) ;
2019-05-14 08:24:57 +08:00
curm . param2 = m . param2 ;
2019-05-10 11:55:44 +08:00
} else {
curm . param2 = itlow - > first ;
}
2019-05-14 08:24:57 +08:00
ASSERT ( curm . param1 < = curm . param2 ) ;
mvector . push_back_deep ( mvector_arena , curm ) ;
2019-05-15 08:39:44 +08:00
nodeIDs . push_back ( nodeIDs_arena , itApplier - > second ) ;
2019-05-10 11:55:44 +08:00
}
return ;
}
2019-05-31 02:18:24 +08:00
// key_input format: [logRangeMutation.first][hash_value_of_commit_version:1B][bigEndian64(commitVersion)][bigEndian32(part)]
// value_input: serialized binary of mutations at the same version
2019-05-28 09:39:30 +08:00
bool concatenateBackupMutationForLogFile ( std : : map < Standalone < StringRef > , Standalone < StringRef > > * pMutationMap ,
std : : map < Standalone < StringRef > , uint32_t > * pMutationPartMap ,
2019-05-31 02:18:24 +08:00
Standalone < StringRef > key_input , Standalone < StringRef > val_input ) {
2019-06-01 02:09:31 +08:00
SerializedMutationListMap & mutationMap = * pMutationMap ;
2019-05-28 09:39:30 +08:00
std : : map < Standalone < StringRef > , uint32_t > & mutationPartMap = * pMutationPartMap ;
2019-05-10 11:55:44 +08:00
std : : string prefix = " || \t " ;
std : : stringstream ss ;
StringRef val = val_input . contents ( ) ;
2019-05-31 02:18:24 +08:00
2019-05-10 11:55:44 +08:00
StringRefReaderMX reader ( val , restore_corrupted_data ( ) ) ;
StringRefReaderMX readerKey ( key_input , restore_corrupted_data ( ) ) ; //read key_input!
int logRangeMutationFirstLength = key_input . size ( ) - 1 - 8 - 4 ;
bool concatenated = false ;
2019-05-31 02:18:24 +08:00
ASSERT_WE_THINK ( key_input . size ( ) > = 1 + 8 + 4 ) ;
2019-05-10 11:55:44 +08:00
if ( logRangeMutationFirstLength > 0 ) {
2019-05-31 02:18:24 +08:00
readerKey . consume ( logRangeMutationFirstLength ) ; // Strip out the [logRangeMutation.first]; otherwise, the following readerKey.consume will produce wrong value
2019-05-10 11:55:44 +08:00
}
uint8_t hashValue = readerKey . consume < uint8_t > ( ) ;
2019-05-31 02:18:24 +08:00
uint64_t commitVersion = readerKey . consumeNetworkUInt64 ( ) ; // Convert big Endian value encoded in log file into a littleEndian uint64_t value, i.e., commitVersion
2019-05-10 11:55:44 +08:00
uint32_t part = readerKey . consumeNetworkUInt32 ( ) ; //Consume big Endian value encoded in log file
//Use commitVersion as id
Standalone < StringRef > id = StringRef ( ( uint8_t * ) & commitVersion , 8 ) ;
2019-05-28 09:39:30 +08:00
if ( mutationMap . find ( id ) = = mutationMap . end ( ) ) {
mutationMap . insert ( std : : make_pair ( id , val_input ) ) ;
2019-05-31 02:18:24 +08:00
if ( part ! = 0 ) {
fprintf ( stderr , " [ERROR]!!! part:%d != 0 for key_input:%s \n " , part , getHexString ( key_input ) . c_str ( ) ) ;
2019-05-10 11:55:44 +08:00
}
2019-05-31 02:18:24 +08:00
mutationPartMap . insert ( std : : make_pair ( id , part ) ) ;
} else { // concatenate the val string with the same commitVersion
2019-05-28 09:39:30 +08:00
mutationMap [ id ] = mutationMap [ id ] . contents ( ) . withSuffix ( val_input . contents ( ) ) ; //Assign the new Areana to the map's value
2019-05-31 02:18:24 +08:00
if ( part ! = ( mutationPartMap [ id ] + 1 ) ) {
// Check if the same range or log file has been processed more than once!
fprintf ( stderr , " [ERROR]!!! current part id:%d new part_direct:%d is not the next integer of key_input:%s \n " , mutationPartMap [ id ] , part , getHexString ( key_input ) . c_str ( ) ) ;
2019-05-10 11:55:44 +08:00
printf ( " [HINT] Check if the same range or log file has been processed more than once! \n " ) ;
}
2019-05-31 02:18:24 +08:00
mutationPartMap [ id ] = part ;
2019-05-10 11:55:44 +08:00
concatenated = true ;
}
return concatenated ;
}
// Returns true iff m operates on a key range (ClearRange) rather than a single key.
// Asserts that any non-range mutation we route is a SetValue or an atomic op.
bool isRangeMutation(MutationRef m) {
	if (m.type == MutationRef::Type::ClearRange) {
		ASSERT(m.type != MutationRef::Type::DebugKeyRange);
		return true;
	}
	ASSERT(m.type == MutationRef::Type::SetValue || isAtomicOp((MutationRef::Type)m.type));
	return false;
}
// Parse the kv pair (version, serialized_mutation), which are the results parsed from log file, into (version, <K, V, mutationType>) pair
// Put the parsed versioned mutations into *pkvOps
// Input key: [commitVersion_of_the_mutation_batch:uint64_t]
// Input value: [includeVersion:uint64_t][val_length:uint32_t][encoded_list_of_mutations], where
// includeVersion is the serialized version in the batch commit. It is not the commitVersion in Input key.
// val_length is always equal to (val.size() - 12); otherwise, we may not get the entire mutation list for the version
// encoded_list_of_mutations: [mutation1][mutation2]...[mutationk], where
// a mutation is encoded as [type:uint32_t][keyLength:uint32_t][valueLength:uint32_t][keyContent][valueContent]
2019-06-01 02:09:31 +08:00
void _parseSerializedMutation ( VersionedMutationsMap * pkvOps , SerializedMutationListMap * pmutationMap , bool isSampling ) {
2019-05-28 09:39:30 +08:00
VersionedMutationsMap & kvOps = * pkvOps ;
2019-06-01 02:09:31 +08:00
SerializedMutationListMap & mutationMap = * pmutationMap ;
2019-05-28 09:39:30 +08:00
for ( auto & m : mutationMap ) {
2019-05-10 11:55:44 +08:00
StringRef k = m . first . contents ( ) ;
StringRef val = m . second . contents ( ) ;
2019-05-31 02:18:24 +08:00
StringRefReaderMX kReader ( k , restore_corrupted_data ( ) ) ;
uint64_t commitVersion = kReader . consume < uint64_t > ( ) ; // Consume little Endian data
2019-05-28 09:39:30 +08:00
kvOps . insert ( std : : make_pair ( commitVersion , VectorRef < MutationRef > ( ) ) ) ;
2019-05-10 11:55:44 +08:00
2019-05-31 02:18:24 +08:00
StringRefReaderMX vReader ( val , restore_corrupted_data ( ) ) ;
vReader . consume < uint64_t > ( ) ; // Consume the includeVersion
uint32_t val_length_decoded = vReader . consume < uint32_t > ( ) ; // Parse little endian value, confirmed it is correct!
ASSERT ( val_length_decoded = = val . size ( ) - 12 ) ; // 12 is the length of [includeVersion:uint64_t][val_length:uint32_t]
2019-05-10 11:55:44 +08:00
while ( 1 ) {
// stop when reach the end of the string
2019-05-31 02:18:24 +08:00
if ( vReader . eof ( ) ) { //|| *reader.rptr == 0xFF
2019-05-10 11:55:44 +08:00
break ;
}
2019-05-31 02:18:24 +08:00
uint32_t type = vReader . consume < uint32_t > ( ) ;
uint32_t kLen = vReader . consume < uint32_t > ( ) ;
uint32_t vLen = vReader . consume < uint32_t > ( ) ;
const uint8_t * k = vReader . consume ( kLen ) ;
const uint8_t * v = vReader . consume ( vLen ) ;
2019-05-10 11:55:44 +08:00
MutationRef mutation ( ( MutationRef : : Type ) type , KeyRef ( k , kLen ) , KeyRef ( v , vLen ) ) ;
2019-05-28 09:39:30 +08:00
kvOps [ commitVersion ] . push_back_deep ( kvOps [ commitVersion ] . arena ( ) , mutation ) ;
2019-05-31 02:18:24 +08:00
ASSERT_WE_THINK ( kLen > = 0 & & kLen < val . size ( ) ) ;
ASSERT_WE_THINK ( vLen > = 0 & & vLen < val . size ( ) ) ;
2019-05-10 11:55:44 +08:00
}
}
}
2019-05-31 02:18:24 +08:00
// Parsing the data blocks in a range file
2019-05-28 09:39:30 +08:00
ACTOR static Future < Void > _parseRangeFileToMutationsOnLoader ( VersionedMutationsMap * pkvOps ,
2019-05-10 11:55:44 +08:00
Reference < IBackupContainer > bc , Version version ,
2019-05-31 02:18:24 +08:00
std : : string fileName , int64_t readOffset , int64_t readLen ,
KeyRange restoreRange ) {
2019-05-28 09:39:30 +08:00
state VersionedMutationsMap & kvOps = * pkvOps ;
2019-05-10 11:55:44 +08:00
// The set of key value version is rangeFile.version. the key-value set in the same range file has the same version
Reference < IAsyncFile > inFile = wait ( bc - > readFile ( fileName ) ) ;
state Standalone < VectorRef < KeyValueRef > > blockData = wait ( parallelFileRestore : : decodeRangeFileBlock ( inFile , readOffset , readLen ) ) ;
2019-06-05 02:40:23 +08:00
TraceEvent ( " FastRestore " ) . detail ( " DecodedRangeFile " , fileName ) . detail ( " DataSize " , blockData . contents ( ) . size ( ) ) ;
2019-05-10 11:55:44 +08:00
// First and last key are the range for this file
state KeyRange fileRange = KeyRangeRef ( blockData . front ( ) . key , blockData . back ( ) . key ) ;
// If fileRange doesn't intersect restore range then we're done.
if ( ! fileRange . intersects ( restoreRange ) ) {
return Void ( ) ;
}
// We know the file range intersects the restore range but there could still be keys outside the restore range.
2019-05-31 02:18:24 +08:00
// Find the subvector of kv pairs that intersect the restore range.
// Note that the first and last keys are just the range endpoints for this file. They are metadata, not the real data
int rangeStart = 1 ;
int rangeEnd = blockData . size ( ) - 1 ; // The rangeStart and rangeEnd is [,)
2019-05-10 11:55:44 +08:00
2019-05-28 09:39:30 +08:00
// Slide start from begining, stop if something in range is found
2019-05-10 11:55:44 +08:00
// Move rangeStart and rangeEnd until they is within restoreRange
while ( rangeStart < rangeEnd & & ! restoreRange . contains ( blockData [ rangeStart ] . key ) ) {
+ + rangeStart ;
2019-05-31 02:18:24 +08:00
}
// Side end backwaself, stop if something at (rangeEnd-1) is found in range
2019-05-10 11:55:44 +08:00
while ( rangeEnd > rangeStart & & ! restoreRange . contains ( blockData [ rangeEnd - 1 ] . key ) ) {
- - rangeEnd ;
2019-05-31 02:18:24 +08:00
}
2019-05-10 11:55:44 +08:00
2019-05-31 02:18:24 +08:00
// Now data only contains the kv mutation within restoreRange
2019-05-10 11:55:44 +08:00
state VectorRef < KeyValueRef > data = blockData . slice ( rangeStart , rangeEnd ) ;
state int start = 0 ;
state int end = data . size ( ) ;
2019-05-31 02:18:24 +08:00
// Convert KV in data into mutations in kvOps
for ( int i = start ; i < end ; + + i ) {
// NOTE: The KV pairs in range files are the real KV pairs in original DB.
// Should NOT removePrefix and addPrefix for the backup data!
// In other words, the following operation is wrong: data[i].key.removePrefix(removePrefix).withPrefix(addPrefix)
MutationRef m ( MutationRef : : Type : : SetValue , data [ i ] . key , data [ i ] . value ) ; //ASSUME: all operation in range file is set.
2019-05-10 11:55:44 +08:00
2019-05-31 02:18:24 +08:00
// We cache all kv operations into kvOps, and apply all kv operations later in one place
kvOps . insert ( std : : make_pair ( version , VectorRef < MutationRef > ( ) ) ) ;
2019-05-10 11:55:44 +08:00
2019-05-31 02:18:24 +08:00
ASSERT_WE_THINK ( kvOps . find ( version ) ! = kvOps . end ( ) ) ;
kvOps [ version ] . push_back_deep ( kvOps [ version ] . arena ( ) , m ) ;
}
2019-05-10 11:55:44 +08:00
2019-05-31 02:18:24 +08:00
return Void ( ) ;
2019-05-10 11:55:44 +08:00
}
2019-05-31 02:18:24 +08:00
// Parse data blocks in a log file into a vector of <string, string> pairs. Each pair.second contains the mutations at a version encoded in pair.first
// Step 1: decodeLogFileBlock into <string, string> pairs
// Step 2: Concatenate the pair.second of pairs with the same pair.first.
2019-05-28 09:39:30 +08:00
ACTOR static Future < Void > _parseLogFileToMutationsOnLoader ( std : : map < Standalone < StringRef > , Standalone < StringRef > > * pMutationMap ,
std : : map < Standalone < StringRef > , uint32_t > * pMutationPartMap ,
2019-05-10 11:55:44 +08:00
Reference < IBackupContainer > bc , Version version ,
std : : string fileName , int64_t readOffset , int64_t readLen ,
KeyRange restoreRange , Key addPrefix , Key removePrefix ,
Key mutationLogPrefix ) {
state Reference < IAsyncFile > inFile = wait ( bc - > readFile ( fileName ) ) ;
2019-05-31 02:18:24 +08:00
// decodeLogFileBlock() must read block by block!
2019-05-10 11:55:44 +08:00
state Standalone < VectorRef < KeyValueRef > > data = wait ( parallelFileRestore : : decodeLogFileBlock ( inFile , readOffset , readLen ) ) ;
2019-06-05 02:40:23 +08:00
TraceEvent ( " FastRestore " ) . detail ( " DecodedLogFile " , fileName ) . detail ( " DataSize " , data . contents ( ) . size ( ) ) ;
2019-05-10 11:55:44 +08:00
state int start = 0 ;
state int end = data . size ( ) ;
state int numConcatenated = 0 ;
2019-05-31 02:18:24 +08:00
for ( int i = start ; i < end ; + + i ) {
Key k = data [ i ] . key . withPrefix ( mutationLogPrefix ) ;
ValueRef v = data [ i ] . value ;
// Concatenate the backuped param1 and param2 (KV) at the same version.
bool concatenated = concatenateBackupMutationForLogFile ( pMutationMap , pMutationPartMap , data [ i ] . key , data [ i ] . value ) ;
numConcatenated + = ( concatenated ? 1 : 0 ) ;
}
2019-05-10 11:55:44 +08:00
return Void ( ) ;
}