2019-05-07 07:56:49 +08:00
/*
* RestoreApplier . actor . cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013 - 2018 Apple Inc . and the FoundationDB project authors
*
* Licensed under the Apache License , Version 2.0 ( the " License " ) ;
* you may not use this file except in compliance with the License .
* You may obtain a copy of the License at
*
* http : //www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing , software
* distributed under the License is distributed on an " AS IS " BASIS ,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND , either express or implied .
* See the License for the specific language governing permissions and
* limitations under the License .
*/
2019-05-10 11:55:44 +08:00
2019-05-13 12:53:09 +08:00
// This file defines the functions used by the RestoreApplier role.
// The RestoreApplier role starts at the restoreApplierCore actor.
2019-05-10 11:55:44 +08:00
# include "fdbclient/NativeAPI.actor.h"
# include "fdbclient/SystemData.h"
# include "fdbclient/BackupAgent.actor.h"
# include "fdbclient/ManagementAPI.actor.h"
# include "fdbclient/MutationList.h"
# include "fdbclient/BackupContainer.h"
# include "fdbserver/RestoreCommon.actor.h"
# include "fdbserver/RestoreUtil.h"
# include "fdbserver/RestoreRoleCommon.actor.h"
# include "fdbserver/RestoreApplier.actor.h"
# include "flow/actorcompiler.h" // This must be the last #include.
2019-06-05 13:17:08 +08:00
// Forward declarations of the file-local request handlers that are defined further down in this file
// and dispatched from restoreApplierCore.
ACTOR static Future < Void > handleSendMutationVectorRequest ( RestoreSendMutationVectorVersionedRequest req , Reference < RestoreApplierData > self ) ;
ACTOR static Future < Void > handleApplyToDBRequest ( RestoreVersionBatchRequest req , Reference < RestoreApplierData > self , Database cx ) ;
2019-05-10 11:55:44 +08:00
2019-07-25 07:59:05 +08:00
// Entry point of the RestoreApplier role.
//
// Creates the per-role RestoreApplierData and then loops, dispatching each request that arrives on
// applierInterf to its handler actor:
//   heartbeat          -> handleHeartbeat
//   sendMutationVector -> handleSendMutationVectorRequest
//   applyToDB          -> handleApplyToDBRequest
//   initVersionBatch   -> handleInitVersionBatchRequest
//   finishRestore      -> handleFinishRestoreRequest (its future becomes exitRole)
//
// applierInterf: interface whose request streams are served; its id() identifies this applier.
// nodeIndex:     passed through to the RestoreApplierData constructor.
// cx:            database handle handed to handleApplyToDBRequest.
//
// Returns Void() once exitRole becomes ready, or after any Error is caught in the loop (the error is
// traced with the type of the last dispatched request and the loop is left).
ACTOR Future<Void> restoreApplierCore(RestoreApplierInterface applierInterf, int nodeIndex, Database cx) {
	state Reference<RestoreApplierData> self =
	    Reference<RestoreApplierData>(new RestoreApplierData(applierInterf.id(), nodeIndex));

	// NOTE(review): nothing in this loop waits on `actors`, so an error thrown by a handler actor does not
	// appear to be observed here -- confirm whether that is intended.
	state ActorCollection actors(false);
	state Future<Void> exitRole = Never();

	// Seed with the current time so the first iteration measures a real interval; previously this was read
	// uninitialized on the first pass through the loop.
	state double lastLoopTopTime = now();

	loop {
		double loopTopTime = now();
		double elapsedTime = loopTopTime - lastLoopTopTime;
		if (elapsedTime > 0.050) {
			// Sample the warning (about 1 in 100) so that a persistently slow loop does not flood the trace log.
			if (deterministicRandom()->random01() < 0.01)
				TraceEvent(SevWarn, " SlowRestoreApplierLoopx100 ")
				    .detail(" NodeDesc ", self->describeNode())
				    .detail(" Elapsed ", elapsedTime);
		}
		lastLoopTopTime = loopTopTime;

		// Remembers which request was being dispatched, so the catch block below can report it.
		state std::string requestTypeStr = " [Init] ";

		try {
			choose {
				when(RestoreSimpleRequest req = waitNext(applierInterf.heartbeat.getFuture())) {
					requestTypeStr = " heartbeat ";
					actors.add(handleHeartbeat(req, applierInterf.id()));
				}
				when(RestoreSendMutationVectorVersionedRequest req =
				         waitNext(applierInterf.sendMutationVector.getFuture())) {
					requestTypeStr = " sendMutationVector ";
					actors.add(handleSendMutationVectorRequest(req, self));
				}
				when(RestoreVersionBatchRequest req = waitNext(applierInterf.applyToDB.getFuture())) {
					requestTypeStr = " applyToDB ";
					actors.add(handleApplyToDBRequest(req, self, cx));
				}
				when(RestoreVersionBatchRequest req = waitNext(applierInterf.initVersionBatch.getFuture())) {
					requestTypeStr = " initVersionBatch ";
					actors.add(handleInitVersionBatchRequest(req, self));
				}
				when(RestoreVersionBatchRequest req = waitNext(applierInterf.finishRestore.getFuture())) {
					requestTypeStr = " finishRestore ";
					// Not added to `actors`: the loop itself waits on this future to know when to exit.
					exitRole = handleFinishRestoreRequest(req, self);
				}
				when(wait(exitRole)) {
					TraceEvent(" FastRestore ").detail(" RestoreApplierCore ", " ExitRole ").detail(" NodeID ", self->id());
					break;
				}
			}
		} catch (Error& e) {
			// NOTE(review): the detail key says "RestoreLoaderError" although this is the applier role; it is
			// kept unchanged here so existing log searches keep working -- consider renaming.
			TraceEvent(SevWarn, " FastRestore ")
			    .detail(" RestoreLoaderError ", e.what())
			    .detail(" RequestType ", requestTypeStr);
			break;
		}
	}

	return Void();
}
2019-05-31 02:18:24 +08:00
// The actor may be invoked multiple times and executed asynchronously.
// There is no race condition as long as we do not wait or yield while operating on the shared data,
// because all actors run on one thread.
2019-06-05 13:17:08 +08:00
// Buffers the mutations carried by one request into self->kvOps, keyed by req.version.
//
// Requests are ordered by version: the actor first waits until the applier's range version (for range
// files) or log version (for log files) has reached req.prevVersion. If that version is exactly
// req.prevVersion, the request is the next one in sequence and its mutations are deep-copied into the
// buffer; otherwise the version has already moved past it, the request is treated as a duplicate, and
// nothing is buffered. In both cases a RestoreCommonReply carrying this applier's id is sent.
//
// req:  the request; reads isRangeFile, prevVersion, version, mutations and reply.
// self: applier state; kvOps and rangeVersion/logVersion are updated.
ACTOR static Future<Void> handleSendMutationVectorRequest(RestoreSendMutationVectorVersionedRequest req,
                                                          Reference<RestoreApplierData> self) {
	TraceEvent(" FastRestore ").detail(" ApplierNode ", self->id())
	    .detail(" LogVersion ", self->logVersion.get()).detail(" RangeVersion ", self->rangeVersion.get())
	    .detail(" Request ", req.toString());

	if (req.isRangeFile) {
		wait(self->rangeVersion.whenAtLeast(req.prevVersion));
	} else {
		wait(self->logVersion.whenAtLeast(req.prevVersion));
	}

	// Not a duplicate. This check relies on there being no wait between here and the version set() below.
	if ((req.isRangeFile && self->rangeVersion.get() == req.prevVersion) ||
	    (!req.isRangeFile && self->logVersion.get() == req.prevVersion)) {
		// The applier caches the mutations at each version; they are applied to the DB later, once all
		// mutations have been received.
		Version commitVersion = req.version;
		VectorRef<MutationRef> mutations(req.mutations);

		// A single lookup: operator[] creates the empty per-version vector when the version is new.
		Standalone<VectorRef<MutationRef>>& versionOps = self->kvOps[commitVersion];
		for (int mIndex = 0; mIndex < mutations.size(); mIndex++) {
			// Deep copy into the buffer's own arena.
			versionOps.push_back_deep(versionOps.arena(), mutations[mIndex]);
		}

		// Advance the version, which unblocks the request waiting on this version as its prevVersion.
		if (req.isRangeFile) {
			self->rangeVersion.set(req.version);
		} else {
			self->logVersion.set(req.version);
		}
	}

	req.reply.send(RestoreCommonReply(self->id()));
	return Void();
}
2019-05-30 06:09:28 +08:00
// Applies every mutation buffered in self->kvOps to the database, in version order, then clears the buffer.
//
// Mutations are written through a ReadYourWritesTransaction with ACCESS_SYSTEM_KEYS and LOCK_AWARE set.
// A commit is issued
//   * whenever the accumulated expectedSize() of the mutations in the current transaction reaches
//     opConfig.transactionBatchSizeThreshold, and
//   * at the end of every version that still has uncommitted mutations, so that one transaction never
//     spans two versions.
// After each successful commit the position (prevIt, prevIndex) of the next mutation to apply is
// recorded. When an Error is thrown, tr->onError(e) is awaited and the loop resumes from that position.
//
// self: applier state; kvOps is read and finally cleared.
// cx:   database the mutations are written to.
//
// Returns Void() immediately if kvOps is empty, otherwise after all mutations have been committed.
ACTOR Future<Void> applyToDB(Reference<RestoreApplierData> self, Database cx) {
	state std::string typeStr = " ";

	// Assume the process will not crash while it applies mutations to the DB. The reply message can be
	// lost, though.
	if (self->kvOps.empty()) {
		TraceEvent(" FastRestore ").detail(" ApplierApplyToDBEmpty ", self->id());
		return Void();
	}

	std::map<Version, Standalone<VectorRef<MutationRef>>>::iterator begin = self->kvOps.begin();
	std::map<Version, Standalone<VectorRef<MutationRef>>>::iterator end = self->kvOps.end();
	end--; // last buffered version; kvOps is known to be non-empty here
	ASSERT_WE_THINK(end != self->kvOps.end());
	TraceEvent(" FastRestore ")
	    .detail(" ApplierApplyToDB ", self->id())
	    .detail(" FromVersion ", begin->first)
	    .detail(" EndVersion ", end->first);

	self->sanityCheckMutationOps();

	// (it, index) is the current position; (prevIt, prevIndex) is the position to resume from on retry.
	state std::map<Version, Standalone<VectorRef<MutationRef>>>::iterator it = self->kvOps.begin();
	state std::map<Version, Standalone<VectorRef<MutationRef>>>::iterator prevIt = it;
	state int index = 0;
	state int prevIndex = index;
	state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(cx));
	state double transactionSize = 0;

	loop {
		try {
			tr->reset();
			tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
			tr->setOption(FDBTransactionOptions::LOCK_AWARE);
			transactionSize = 0;

			for (; it != self->kvOps.end(); ++it) {
				state MutationRef m;
				for (; index < it->second.size(); ++index) {
					m = it->second[index];
					if (m.type >= MutationRef::Type::SetValue && m.type <= MutationRef::Type::MAX_ATOMIC_OP)
						typeStr = typeString[m.type];
					else {
						TraceEvent(SevError, " FastRestore ").detail(" InvalidMutationType ", m.type);
					}

					if (m.type == MutationRef::SetValue) {
						tr->set(m.param1, m.param2);
					} else if (m.type == MutationRef::ClearRange) {
						KeyRangeRef mutationRange(m.param1, m.param2);
						tr->clear(mutationRange);
					} else if (isAtomicOp((MutationRef::Type)m.type)) {
						tr->atomicOp(m.param1, m.param2, m.type);
					} else {
						TraceEvent(SevError, " FastRestore ")
						    .detail(" UnhandledMutationType ", m.type)
						    .detail(" TypeName ", typeStr);
					}

					transactionSize += m.expectedSize();

					// Commit once the accumulated expected size reaches the configured threshold.
					if (transactionSize >= opConfig.transactionBatchSizeThreshold) {
						wait(tr->commit());
						tr->reset();
						tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
						tr->setOption(FDBTransactionOptions::LOCK_AWARE);
						prevIt = it;
						// Mutation `index` has just been committed, so a retry must resume at the NEXT
						// mutation. Recording `index` itself (as before) re-applied the committed mutation
						// on retry, which applies an atomic operation a second time.
						prevIndex = index + 1;
						transactionSize = 0;
					}
				}

				// The commit batch must not span versions: flush what is left of this version.
				if (transactionSize > 0) {
					wait(tr->commit());
					tr->reset();
					tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
					tr->setOption(FDBTransactionOptions::LOCK_AWARE);
					prevIt = it;
					prevIndex = index; // == it->second.size(): nothing of this version is left to redo
					transactionSize = 0;
				}
				index = 0;
			}

			// Last transaction
			if (transactionSize > 0) {
				wait(tr->commit());
			}
			break;
		} catch (Error& e) {
			wait(tr->onError(e));
			// Rewind to the position recorded after the last successful commit.
			it = prevIt;
			index = prevIndex;
			transactionSize = 0;
		}
	}

	self->kvOps.clear();
	return Void();
}
2019-06-05 13:17:08 +08:00
// Handles an applyToDB request.
//
// If self->dbApplier does not hold a value yet, applyToDB(self, cx) is started and stored there; a
// request that finds it already present waits on that stored future instead of starting another one.
// Once the future is ready, a RestoreCommonReply carrying this applier's id is sent on req.reply.
//
// req:  the request; only its reply is used here.
// self: applier state holding dbApplier.
// cx:   database handle forwarded to applyToDB.
ACTOR static Future<Void> handleApplyToDBRequest(RestoreVersionBatchRequest req, Reference<RestoreApplierData> self,
                                                 Database cx) {
	const bool applierAlreadyStarted = self->dbApplier.present();
	TraceEvent(" FastRestore ")
	    .detail(" ApplierApplyToDB ", self->id())
	    .detail(" DBApplierPresent ", applierAlreadyStarted);

	if (!applierAlreadyStarted) {
		self->dbApplier = applyToDB(self, cx);
	}
	ASSERT(self->dbApplier.present());

	wait(self->dbApplier.get());

	req.reply.send(RestoreCommonReply(self->id()));
	return Void();
}