2017-05-26 04:48:44 +08:00
/*
* worker . actor . cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013 - 2018 Apple Inc . and the FoundationDB project authors
2018-02-22 02:25:11 +08:00
*
2017-05-26 04:48:44 +08:00
* Licensed under the Apache License , Version 2.0 ( the " License " ) ;
* you may not use this file except in compliance with the License .
* You may obtain a copy of the License at
2018-02-22 02:25:11 +08:00
*
2017-05-26 04:48:44 +08:00
* http : //www.apache.org/licenses/LICENSE-2.0
2018-02-22 02:25:11 +08:00
*
2017-05-26 04:48:44 +08:00
* Unless required by applicable law or agreed to in writing , software
* distributed under the License is distributed on an " AS IS " BASIS ,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND , either express or implied .
* See the License for the specific language governing permissions and
* limitations under the License .
*/
# include "flow/ActorCollection.h"
# include "flow/SystemMonitor.h"
# include "flow/TDMetric.actor.h"
# include "fdbrpc/simulator.h"
2019-02-18 07:41:16 +08:00
# include "fdbclient/NativeAPI.actor.h"
2017-07-25 04:13:06 +08:00
# include "fdbclient/MetricLogger.h"
2019-02-18 11:13:26 +08:00
# include "fdbserver/WorkerInterface.actor.h"
2018-10-20 01:30:13 +08:00
# include "fdbserver/IKeyValueStore.h"
# include "fdbserver/WaitFailure.h"
# include "fdbserver/TesterInterface.h" // for poisson()
# include "fdbserver/IDiskQueue.h"
2017-05-26 04:48:44 +08:00
# include "fdbclient/DatabaseContext.h"
2018-10-20 01:30:13 +08:00
# include "fdbserver/ClusterRecruitmentInterface.h"
2018-12-14 05:31:37 +08:00
# include "fdbserver/DataDistributorInterface.h"
2018-10-20 01:30:13 +08:00
# include "fdbserver/ServerDBInfo.h"
2017-05-26 04:48:44 +08:00
# include "fdbserver/CoordinationInterface.h"
# include "fdbclient/FailureMonitorClient.h"
# include "fdbclient/MonitorLeader.h"
2017-10-12 05:13:16 +08:00
# include "fdbclient/ClientWorkerInterface.h"
# include "flow/Profiler.h"
2017-05-26 04:48:44 +08:00
# ifdef __linux__
# ifdef USE_GPERFTOOLS
# include "gperftools/profiler.h"
# endif
# include <unistd.h>
# include <thread>
# include <execinfo.h>
# endif
2018-08-11 06:18:24 +08:00
# include "flow/actorcompiler.h" // This must be the last #include.
2017-05-26 04:48:44 +08:00
// Selects which key-value store factory the KV_STORE(filename, uid) macro expands to.
// NOTE(review): the #if and the #elif test the same expression, CENABLED(0, NOT_IN_CLEAN).
// Presumably the leading 0 disables both so that the in-memory store in the #else branch is
// what is normally selected — confirm against the definition of CENABLED.
# if CENABLED(0, NOT_IN_CLEAN)
// Test-only wrapper declared here because it is only needed by this branch.
extern IKeyValueStore * keyValueStoreCompressTestData ( IKeyValueStore * store ) ;
# define KV_STORE(filename,uid) keyValueStoreCompressTestData(keyValueStoreSQLite(filename,uid))
# elif CENABLED(0, NOT_IN_CLEAN)
# define KV_STORE(filename,uid) keyValueStoreSQLite(filename,uid)
# else
# define KV_STORE(filename,uid) keyValueStoreMemory(filename,uid)
# endif
// Keeps `info` equal to the `client` member of the ServerDBInfo held by `db`:
// the value is published once immediately and again after every change of `db`.
// The loop has no exit path of its own.
ACTOR static Future<Void> extractClientInfo(Reference<AsyncVar<ServerDBInfo>> db, Reference<AsyncVar<ClientDBInfo>> info) {
	loop {
		// Publish first, then sleep until the next change, so the initial value is never skipped.
		info->set(db->get().client);
		wait(db->onChange());
	}
}
// Builds a Database for use inside a server process.
//   db                         - server-side DB info; its `client` member feeds the new DatabaseContext
//   taskID                     - forwarded unchanged to DatabaseContext::create
//   enableLocalityLoadBalance  - when true, db's current myLocality is passed along; otherwise an
//                                empty LocalityData is used
//   lockAware                  - forwarded unchanged to DatabaseContext::create
// The client info handed to the context is kept current by extractClientInfo().
Database openDBOnServer(Reference<AsyncVar<ServerDBInfo>> const& db, int taskID, bool enableLocalityLoadBalance, bool lockAware) {
	Reference<AsyncVar<ClientDBInfo>> clientInfo(new AsyncVar<ClientDBInfo>);
	Future<Void> clientInfoUpdater = extractClientInfo(db, clientInfo);

	LocalityData clientLocality;
	if (enableLocalityLoadBalance) {
		clientLocality = db->get().myLocality;
	}

	return DatabaseContext::create(clientInfo, clientInfoUpdater, clientLocality, enableLocalityLoadBalance, taskID, lockAware);
}
struct ErrorInfo {
Error error ;
2018-09-06 06:06:14 +08:00
const Role & role ;
2017-05-26 04:48:44 +08:00
UID id ;
2018-09-06 06:06:14 +08:00
ErrorInfo ( Error e , const Role & role , UID id ) : error ( e ) , role ( role ) , id ( id ) { }
2017-05-26 04:48:44 +08:00
template < class Ar > void serialize ( Ar & ) { ASSERT ( false ) ; }
} ;
2017-05-27 08:43:28 +08:00
Error checkIOTimeout ( Error const & e ) {
2017-06-01 08:03:15 +08:00
// Convert all_errors to io_timeout if global timeout bool was set
2017-06-16 08:40:19 +08:00
bool timeoutOccurred = ( bool ) g_network - > global ( INetwork : : enASIOTimedOut ) ;
// In simulation, have to check global timed out flag for both this process and the machine process on which IO is done
if ( g_network - > isSimulated ( ) & & ! timeoutOccurred )
timeoutOccurred = g_pSimulator - > getCurrentProcess ( ) - > machine - > machineProcess - > global ( INetwork : : enASIOTimedOut ) ;
if ( timeoutOccurred ) {
TEST ( true ) ; // Timeout occurred
2017-05-27 08:43:28 +08:00
Error timeout = io_timeout ( ) ;
2017-06-16 08:40:19 +08:00
// Preserve injectedness of error
if ( e . isInjectedFault ( ) )
2017-05-27 08:43:28 +08:00
timeout = timeout . asInjectedFault ( ) ;
return timeout ;
}
return e ;
}
2017-05-26 04:48:44 +08:00
// Waits for `process` and reports how it ended on `errors`: an ErrorInfo carrying success() if it
// finished normally, or the caught Error otherwise. This actor itself always returns Void().
// The try/catch is around the wait on purpose: any Error thrown at the wait point is caught
// and reported rather than propagated.
// NOTE(review): ErrorInfo keeps `role` by reference and `role` here is this actor's own by-value
// copy, so the ErrorInfo should not be used after this actor is gone — confirm consumers read it
// immediately.
ACTOR Future<Void> forwardError(PromiseStream<ErrorInfo> errors, Role role, UID id, Future<Void> process) {
	try {
		wait(process);
		errors.send(ErrorInfo(success(), role, id));
	} catch (Error& e) {
		errors.send(ErrorInfo(e, role, id));
	}
	return Void();
}
// Runs `actor` while also watching `store` for errors.
//  - If `actor` finishes first: unless it failed with please_reboot, wait for `onClosed`; then
//    rethrow the store's error if one has arrived in the meantime, otherwise propagate the
//    actor's own result or error.
//  - If the store reports first: log it, cancel `actor`, and rethrow — except file_not_found,
//    which ends this actor normally.
// If `actor` is already ready on entry the store is not watched at all (Never() is used).
ACTOR Future<Void> handleIOErrors(Future<Void> actor, IClosable* store, UID id, Future<Void> onClosed = Void()) {
	state Future<ErrorOr<Void>> storeError = actor.isReady() ? Never() : errorOr(store->getError());
	choose {
		when(state ErrorOr<Void> actorResult = wait(errorOr(actor))) {
			// A please_reboot failure skips the wait for the store to close; everything else waits.
			if (!(actorResult.isError() && actorResult.getError().code() == error_code_please_reboot)) {
				wait(onClosed);
			}
			// A store error that showed up while waiting takes precedence over the actor's result.
			if (storeError.isReady()) {
				throw storeError.get().getError();
			}
			if (actorResult.isError()) {
				throw actorResult.getError();
			}
			return actorResult.get();
		}
		when(ErrorOr<Void> ioResult = wait(storeError)) {
			TraceEvent(" WorkerTerminatingByIOError ", id).error(ioResult.getError(), true);
			actor.cancel();
			// file_not_found can occur due to attempting to open a partially deleted DiskQueue, which should not be reported SevError.
			if (ioResult.getError().code() != error_code_file_not_found) {
				throw ioResult.getError();
			}
			TEST(true); // Worker terminated with file_not_found error
			return Void();
		}
	}
}
2017-05-27 08:43:28 +08:00
// Consumes role-termination reports from `errors` forever. For each report it ends the role via
// endRole() and, if the (possibly converted) error is please_reboot or io_timeout, rethrows it.
// Errors outside the "ok" list are first passed through checkIOTimeout().
ACTOR Future<Void> workerHandleErrors(FutureStream<ErrorInfo> errors) {
	loop choose {
		when(ErrorInfo _err = waitNext(errors)) {
			// Work on a copy because the error may be replaced below.
			ErrorInfo err = _err;

			const auto reportedCode = err.error.code();
			const bool ok = reportedCode == error_code_success ||
			                reportedCode == error_code_please_reboot ||
			                reportedCode == error_code_actor_cancelled || // The worker server was cancelled
			                reportedCode == error_code_coordinators_changed ||
			                reportedCode == error_code_shutdown_in_progress;

			if (!ok) {
				err.error = checkIOTimeout(err.error); // Possibly convert error to io_timeout
			}

			endRole(err.role, err.id, " Error ", ok, err.error);

			// Re-read the code: it may have been converted above.
			const auto finalCode = err.error.code();
			if (finalCode == error_code_please_reboot || finalCode == error_code_io_timeout) {
				throw err.error;
			}
		}
	}
}
// Improve simulation code coverage by sometimes deferring the destruction of workerInterface (and
// therefore "endpoint not found" responses to clients) for an extra second, so that clients are
// more likely to see broken_promise errors.
// `workerInterface` is not touched; it is only held by this actor while it is alive.
ACTOR template <class T>
Future<Void> zombie(T workerInterface, Future<Void> worker) {
	try {
		wait(worker);
		if (BUGGIFY) {
			wait(delay(1.0));
		}
		return Void();
	} catch (Error& e) {
		// Errors are passed through unchanged.
		throw;
	}
}
// Answers every LoadedPingRequest arriving on `pings`. The reply echoes the request's id and
// carries the pre-built 20480-character payload when the request sets loadReply, otherwise the
// short literal below.
ACTOR Future<Void> loadedPonger(FutureStream<LoadedPingRequest> pings) {
	// Built once and reused for every reply.
	state Standalone<StringRef> payloadBack(std::string(20480, ' . '));

	loop {
		LoadedPingRequest ping = waitNext(pings);

		LoadedReply response;
		response.id = ping.id;
		response.payload = (ping.loadReply ? payloadBack : LiteralStringRef(" "));
		ping.reply.send(response);
	}
}
// Filename prefixes. getDiskStores() classifies a file as a storage store or a TLog data store
// according to which of the first two prefixes its name starts with.
StringRef fileStoragePrefix = LiteralStringRef ( " storage- " ) ;
StringRef fileLogDataPrefix = LiteralStringRef ( " log- " ) ;
// Prefix used when building the path of a TLog's disk queue.
StringRef fileLogQueuePrefix = LiteralStringRef ( " logqueue- " ) ;
2017-09-22 14:51:55 +08:00
// Extension passed to openDiskQueue() for TLog disk queues.
StringRef tlogQueueExtension = LiteralStringRef ( " fdq " ) ;
2017-05-26 04:48:44 +08:00
// One (store type, filename suffix) pair per storage engine; getDiskStores(folder) lists the
// files matching each suffix and tags them with the paired store type.
std : : pair < KeyValueStoreType , std : : string > bTreeV1Suffix = std : : make_pair ( KeyValueStoreType : : SSD_BTREE_V1 , " .fdb " ) ;
std : : pair < KeyValueStoreType , std : : string > bTreeV2Suffix = std : : make_pair ( KeyValueStoreType : : SSD_BTREE_V2 , " .sqlite " ) ;
std : : pair < KeyValueStoreType , std : : string > memorySuffix = std : : make_pair ( KeyValueStoreType : : MEMORY , " -0.fdq " ) ;
2017-09-22 14:51:55 +08:00
std : : pair < KeyValueStoreType , std : : string > redwoodSuffix = std : : make_pair ( KeyValueStoreType : : SSD_REDWOOD_V1 , " .redwood " ) ;
2017-05-26 04:48:44 +08:00
// Name of a marker file inside the data folder. workerServer deletes it at startup and treats
// "it existed" as the request to validate data files.
std : : string validationFilename = " _validate " ;
std : : string filenameFromSample ( KeyValueStoreType storeType , std : : string folder , std : : string sample_filename ) {
if ( storeType = = KeyValueStoreType : : SSD_BTREE_V1 )
return joinPath ( folder , sample_filename ) ;
else if ( storeType = = KeyValueStoreType : : SSD_BTREE_V2 )
2017-08-29 02:25:37 +08:00
return joinPath ( folder , sample_filename ) ;
2017-05-26 04:48:44 +08:00
else if ( storeType = = KeyValueStoreType : : MEMORY )
return joinPath ( folder , sample_filename . substr ( 0 , sample_filename . size ( ) - 5 ) ) ;
2017-09-22 14:51:55 +08:00
else if ( storeType = = KeyValueStoreType : : SSD_REDWOOD_V1 )
return joinPath ( folder , sample_filename ) ;
2017-05-26 04:48:44 +08:00
UNREACHABLE ( ) ;
}
std : : string filenameFromId ( KeyValueStoreType storeType , std : : string folder , std : : string prefix , UID id ) {
if ( storeType = = KeyValueStoreType : : SSD_BTREE_V1 )
return joinPath ( folder , prefix + id . toString ( ) + " .fdb " ) ;
else if ( storeType = = KeyValueStoreType : : SSD_BTREE_V2 )
2017-08-29 02:25:37 +08:00
return joinPath ( folder , prefix + id . toString ( ) + " .sqlite " ) ;
2017-05-26 04:48:44 +08:00
else if ( storeType = = KeyValueStoreType : : MEMORY )
return joinPath ( folder , prefix + id . toString ( ) + " - " ) ;
2017-09-22 14:51:55 +08:00
else if ( storeType = = KeyValueStoreType : : SSD_REDWOOD_V1 )
return joinPath ( folder , prefix + id . toString ( ) + " .redwood " ) ;
2017-05-26 04:48:44 +08:00
UNREACHABLE ( ) ;
}
// Description of one store discovered on disk by getDiskStores().
struct DiskStore {
// Which kind of role the store's files belong to.
enum COMPONENT { TLogData , Storage } ;
// ID parsed from the filename (the 32 characters following the prefix).
UID storeID ;
std : : string filename ; // For KVStoreMemory just the base filename to be passed to IDiskQueue
COMPONENT storedComponent ;
// Storage engine the files were matched against.
KeyValueStoreType storeType ;
} ;
std : : vector < DiskStore > getDiskStores ( std : : string folder , std : : string suffix , KeyValueStoreType type ) {
std : : vector < DiskStore > result ;
vector < std : : string > files = platform : : listFiles ( folder , suffix ) ;
for ( int idx = 0 ; idx < files . size ( ) ; idx + + ) {
DiskStore store ;
store . storeType = type ;
StringRef prefix ;
if ( StringRef ( files [ idx ] ) . startsWith ( fileStoragePrefix ) ) {
store . storedComponent = DiskStore : : Storage ;
prefix = fileStoragePrefix ;
}
else if ( StringRef ( files [ idx ] ) . startsWith ( fileLogDataPrefix ) ) {
store . storedComponent = DiskStore : : TLogData ;
prefix = fileLogDataPrefix ;
}
else
continue ;
store . storeID = UID : : fromString ( files [ idx ] . substr ( prefix . size ( ) , 32 ) ) ;
store . filename = filenameFromSample ( type , folder , files [ idx ] ) ;
result . push_back ( store ) ;
}
return result ;
}
std : : vector < DiskStore > getDiskStores ( std : : string folder ) {
auto result = getDiskStores ( folder , bTreeV1Suffix . second , bTreeV1Suffix . first ) ;
auto result1 = getDiskStores ( folder , bTreeV2Suffix . second , bTreeV2Suffix . first ) ;
result . insert ( result . end ( ) , result1 . begin ( ) , result1 . end ( ) ) ;
auto result2 = getDiskStores ( folder , memorySuffix . second , memorySuffix . first ) ;
result . insert ( result . end ( ) , result2 . begin ( ) , result2 . end ( ) ) ;
2017-09-22 14:51:55 +08:00
auto result3 = getDiskStores ( folder , redwoodSuffix . second , redwoodSuffix . first ) ;
result . insert ( result . end ( ) , result3 . begin ( ) , result3 . end ( ) ) ;
2017-05-26 04:48:44 +08:00
return result ;
}
2019-01-29 03:29:39 +08:00
// Keeps the cluster controller (as it may be re-elected) informed that this worker exists
// The cluster controller uses waitFailureClient to find out if we die, and returns from registrationReply (requiring us to re-register)
// The registration request piggybacks optional distributor interface if it exists.
//
// Each loop iteration sends a fresh request with the next generation number, then waits for
// whichever comes first: the reply, a change of cluster controller, or a change of `ddInterf`.
// A reply updates the locally remembered process class and publishes the priority info.
ACTOR Future<Void> registrationClient(
		Reference<AsyncVar<Optional<ClusterControllerFullInterface>>> ccInterface,
		WorkerInterface interf,
		Reference<AsyncVar<ClusterControllerPriorityInfo>> asyncPriorityInfo,
		ProcessClass initialClass,
		Reference<AsyncVar<Optional<DataDistributorInterface>>> ddInterf) {
	state Generation requestGeneration = 0;
	state ProcessClass processClass = initialClass;

	loop {
		RegisterWorkerRequest request(interf, initialClass, processClass, asyncPriorityInfo->get(), requestGeneration++, ddInterf->get());

		// With no known cluster controller there is nobody to ask, so the reply never arrives.
		Future<RegisterWorkerReply> registrationReply = Never();
		if (ccInterface->get().present()) {
			registrationReply = brokenPromiseToNever(ccInterface->get().get().registerWorker.getReply(request));
		}

		choose {
			when(RegisterWorkerReply reply = wait(registrationReply)) {
				processClass = reply.processClass;
				asyncPriorityInfo->set(reply.priorityInfo);
			}
			// Either change simply restarts the loop so a new request is sent.
			when(wait(ccInterface->onChange())) {}
			when(wait(ddInterf->onChange())) {}
		}
	}
}
#if defined(__linux__) && defined(USE_GPERFTOOLS)
//A set of threads that should be profiled
std::set<std::thread::id> profiledThreads;

//Returns whether or not a given thread should be profiled
// Yields 1 when the calling thread's id is in profiledThreads and 0 otherwise; `arg` is unused.
int filter_in_thread(void* arg) {
	const bool registered = profiledThreads.find(std::this_thread::get_id()) != profiledThreads.end();
	return registered ? 1 : 0;
}
#endif
//Enables the calling thread to be profiled
// Compiles to a no-op unless building on Linux with USE_GPERFTOOLS defined.
void registerThreadForProfiling() {
#if defined(__linux__) && defined(USE_GPERFTOOLS)
	profiledThreads.insert(std::this_thread::get_id());

	//Not sure if this is actually needed, but a call to backtrace was advised here:
	//http://groups.google.com/group/google-perftools/browse_thread/thread/0dfd74532e038eb8/2686d9f24ac4365f?pli=1
	const int maxFrames = 100;
	void* frames[maxFrames];
	backtrace(frames, maxFrames);
#endif
}
//Starts or stops the CPU profiler
// Dispatches on req.type (GPROF or FLOW) and then on req.action:
//   ENABLE  - start profiling, writing to req.outputFile
//   DISABLE - stop profiling
//   RUN     - not handled here (asserts); callers are expected to go through runProfiler()
// The GPROF branch is compiled only on Linux with USE_GPERFTOOLS defined and VALGRIND not defined;
// in other builds a GPROF request does nothing.
void updateCpuProfiler(ProfilerRequest req) {
	switch (req.type) {
	case ProfilerRequest::Type::GPROF:
#if defined(__linux__) && defined(USE_GPERFTOOLS) && !defined(VALGRIND)
		switch (req.action) {
		case ProfilerRequest::Action::ENABLE: {
			// Copy the path into a std::string so a NUL-terminated buffer is handed to gperftools
			// (the raw bytes behind outputFile are not known to be terminated).
			const std::string path = req.outputFile.toString();
			// The options live on the stack: the previous heap allocation was never released, and a
			// stray free() of an undeclared `workingDir` made this branch uncompilable.
			// NOTE(review): relies on ProfilerStartWithOptions copying what it needs from *options
			// before returning — confirm against the gperftools version in use.
			ProfilerOptions options = ProfilerOptions();
			options.filter_in_thread = &filter_in_thread;
			options.filter_in_thread_arg = NULL;
			ProfilerStartWithOptions(path.c_str(), &options);
			break;
		}
		case ProfilerRequest::Action::DISABLE:
			ProfilerStop();
			break;
		case ProfilerRequest::Action::RUN:
			ASSERT(false); // User should have called runProfiler.
			break;
		}
#endif
		break;
	case ProfilerRequest::Type::FLOW:
		switch (req.action) {
		case ProfilerRequest::Action::ENABLE:
			startProfiling(g_network, {}, req.outputFile);
			break;
		case ProfilerRequest::Action::DISABLE:
			stopProfiling();
			break;
		case ProfilerRequest::Action::RUN:
			ASSERT(false); // User should have called runProfiler.
			break;
		}
		break;
	}
}
// Executes a profiler request. A RUN request is expanded into ENABLE, a delay of req.duration,
// then DISABLE; any other action is applied once, immediately.
ACTOR Future<Void> runProfiler(ProfilerRequest req) {
	if (req.action != ProfilerRequest::Action::RUN) {
		updateCpuProfiler(req);
		return Void();
	}

	// RUN: reuse the same request object, switching only its action for each phase.
	req.action = ProfilerRequest::Action::ENABLE;
	updateCpuProfiler(req);

	wait(delay(req.duration));

	req.action = ProfilerRequest::Action::DISABLE;
	updateCpuProfiler(req);
	return Void();
}
2018-10-09 08:26:10 +08:00
// Supervises a storage server: whenever it ends with please_reboot, a new storage server is
// started on the same `store` with the same id and locality (wrapped in handleIOErrors) and
// supervision continues. A normal finish ends this actor; any other error is rethrown.
// storeType, filename, filesClosed and memoryLimit are accepted but not used in this body.
ACTOR Future<Void> storageServerRollbackRebooter(Future<Void> prevStorageServer, KeyValueStoreType storeType, std::string filename, UID id, LocalityData locality, Reference<AsyncVar<ServerDBInfo>> db, std::string folder, ActorCollection* filesClosed, int64_t memoryLimit, IKeyValueStore* store) {
	loop {
		ErrorOr<Void> outcome = wait(errorOr(prevStorageServer));
		if (!outcome.isError()) {
			return Void();
		}
		if (outcome.getError().code() != error_code_please_reboot) {
			throw outcome.getError();
		}

		TraceEvent(" StorageServerRequestedReboot ", id);

		// Fresh interface for the restarted server, reusing the previous identity.
		StorageServerInterface recruited;
		recruited.uniqueID = id;
		recruited.locality = locality;
		recruited.initEndpoints();

		DUMPTOKEN(recruited.getVersion);
		DUMPTOKEN(recruited.getValue);
		DUMPTOKEN(recruited.getKey);
		DUMPTOKEN(recruited.getKeyValues);
		DUMPTOKEN(recruited.getShardState);
		DUMPTOKEN(recruited.waitMetrics);
		DUMPTOKEN(recruited.splitMetrics);
		DUMPTOKEN(recruited.getPhysicalMetrics);
		DUMPTOKEN(recruited.waitFailure);
		DUMPTOKEN(recruited.getQueuingMetrics);
		DUMPTOKEN(recruited.getKeyValueStoreType);
		DUMPTOKEN(recruited.watchValue);

		// Start the replacement first, then wrap it so store errors are handled as before.
		prevStorageServer = storageServer(store, recruited, db, folder, Promise<Void>());
		prevStorageServer = handleIOErrors(prevStorageServer, store, id, store->onClosed());
	}
}
// FIXME: This will not work correctly in simulation as all workers would share the same roles map
// Set of (role name, short role ID) pairs: startRole() inserts an entry and endRole() erases it.
std : : set < std : : pair < std : : string , std : : string > > g_roles ;
// Joins the role names in `roles` into one string, in set order. When `with_ids` is true each
// name is followed by a separator and the role's ID. A separator between entries is added only
// once the accumulated text is non-empty.
Standalone<StringRef> roleString(std::set<std::pair<std::string, std::string>> roles, bool with_ids) {
	std::string joined;
	for (auto it = roles.begin(); it != roles.end(); ++it) {
		if (!joined.empty()) {
			joined += " , ";
		}
		joined += it->first;
		if (with_ids) {
			joined += " : ";
			joined += it->second;
		}
	}
	return StringRef(joined);
}
2018-09-06 06:06:14 +08:00
void startRole ( const Role & role , UID roleId , UID workerId , std : : map < std : : string , std : : string > details , std : : string origination ) {
2018-09-06 06:53:12 +08:00
if ( role . includeInTraceRoles ) {
2018-09-06 06:06:14 +08:00
addTraceRole ( role . abbreviation ) ;
}
2017-05-26 04:48:44 +08:00
TraceEvent ev ( " Role " , roleId ) ;
2018-09-06 06:06:14 +08:00
ev . detail ( " As " , role . roleName )
2017-05-26 04:48:44 +08:00
. detail ( " Transition " , " Begin " )
. detail ( " Origination " , origination )
. detail ( " OnWorker " , workerId ) ;
for ( auto it = details . begin ( ) ; it ! = details . end ( ) ; it + + )
ev . detail ( it - > first . c_str ( ) , it - > second ) ;
ev . trackLatest ( ( roleId . shortString ( ) + " .Role " ) . c_str ( ) ) ;
// Update roles map, log Roles metrics
2018-09-06 06:06:14 +08:00
g_roles . insert ( { role . roleName , roleId . shortString ( ) } ) ;
2017-05-26 04:48:44 +08:00
StringMetricHandle ( LiteralStringRef ( " Roles " ) ) = roleString ( g_roles , false ) ;
StringMetricHandle ( LiteralStringRef ( " RolesWithIDs " ) ) = roleString ( g_roles , true ) ;
2018-09-06 06:06:14 +08:00
if ( g_network - > isSimulated ( ) ) g_simulator . addRole ( g_network - > getLocalAddress ( ) , role . roleName ) ;
2017-05-26 04:48:44 +08:00
}
2018-09-06 06:06:14 +08:00
void endRole ( const Role & role , UID id , std : : string reason , bool ok , Error e ) {
2017-05-26 04:48:44 +08:00
{
TraceEvent ev ( " Role " , id ) ;
if ( e . code ( ) ! = invalid_error_code )
ev . error ( e , true ) ;
ev . detail ( " Transition " , " End " )
2018-09-06 06:06:14 +08:00
. detail ( " As " , role . roleName )
2017-05-26 04:48:44 +08:00
. detail ( " Reason " , reason ) ;
ev . trackLatest ( ( id . shortString ( ) + " .Role " ) . c_str ( ) ) ;
}
if ( ! ok ) {
2018-09-06 06:06:14 +08:00
std : : string type = role . roleName + " Failed " ;
2017-05-26 04:48:44 +08:00
TraceEvent err ( SevError , type . c_str ( ) , id ) ;
if ( e . code ( ) ! = invalid_error_code ) {
err . error ( e , true ) ;
}
2018-08-02 05:30:57 +08:00
err . detail ( " Reason " , reason ) ;
2017-05-26 04:48:44 +08:00
}
latestEventCache . clear ( id . shortString ( ) ) ;
// Update roles map, log Roles metrics
2018-09-06 06:06:14 +08:00
g_roles . erase ( { role . roleName , id . shortString ( ) } ) ;
2017-05-26 04:48:44 +08:00
StringMetricHandle ( LiteralStringRef ( " Roles " ) ) = roleString ( g_roles , false ) ;
StringMetricHandle ( LiteralStringRef ( " RolesWithIDs " ) ) = roleString ( g_roles , true ) ;
2018-09-06 06:06:14 +08:00
if ( g_network - > isSimulated ( ) ) g_simulator . removeRole ( g_network - > getLocalAddress ( ) , role . roleName ) ;
2018-09-06 06:53:12 +08:00
if ( role . includeInTraceRoles ) {
2018-09-06 06:06:14 +08:00
removeTraceRole ( role . abbreviation ) ;
}
2017-05-26 04:48:44 +08:00
}
// Keeps `dbInfo` up to date with the ServerDBInfo served by the current cluster controller.
// Each loop iteration builds a request that carries:
//   - the ID of the info already known,
//   - an issue string when the cluster file's contents are out of date,
//   - incompatible peers that have been known for longer than the logging delay (these are removed
//     from the transport's list as they are reported),
// and then waits for either a reply or a change of cluster controller. Every value published to
// `dbInfo` has myLocality overwritten with `locality`.
ACTOR Future<Void> monitorServerDBInfo(Reference<AsyncVar<Optional<ClusterControllerFullInterface>>> ccInterface, Reference<ClusterConnectionFile> connFile, LocalityData locality, Reference<AsyncVar<ServerDBInfo>> dbInfo) {
	// Initially most of the serverDBInfo is not known, but we know our locality right away
	ServerDBInfo localInfo;
	localInfo.myLocality = locality;
	dbInfo->set(localInfo);

	// Connection string seen the last time the cluster file was found to be out of date, if any.
	state Optional<std::string> incorrectConnectionString;

	loop {
		GetServerDBInfoRequest req;
		req.knownServerInfoID = dbInfo->get().id;

		ClusterConnectionString fileConnectionString;
		if (connFile && !connFile->fileContentsUpToDate(fileConnectionString)) {
			req.issues = LiteralStringRef(" incorrect_cluster_file_contents ");
			std::string connectionString = connFile->getConnectionString().toString();
			if (connFile->canGetFilename()) {
				// Don't log a SevWarnAlways the first time to account for transient issues (e.g. someone else changing the file right before us)
				const bool seenBefore = incorrectConnectionString.present() && incorrectConnectionString.get() == connectionString;
				TraceEvent(seenBefore ? SevWarnAlways : SevWarn, " IncorrectClusterFileContents ")
					.detail(" Filename ", connFile->getFilename())
					.detail(" ConnectionStringFromFile ", fileConnectionString.toString())
					.detail(" CurrentConnectionString ", connectionString);
			}
			incorrectConnectionString = connectionString;
		} else {
			incorrectConnectionString = Optional<std::string>();
		}

		// Report (and forget) peers that have been incompatible for long enough.
		auto peers = FlowTransport::transport().getIncompatiblePeers();
		for (auto it = peers->begin(); it != peers->end();) {
			if (now() - it->second.second > SERVER_KNOBS->INCOMPATIBLE_PEER_DELAY_BEFORE_LOGGING) {
				req.incompatiblePeers.push_back(it->first);
				it = peers->erase(it);
			} else {
				++it;
			}
		}

		choose {
			when(ServerDBInfo ni = wait(ccInterface->get().present() ? brokenPromiseToNever(ccInterface->get().get().getServerDBInfo.getReply(req)) : Never())) {
				TraceEvent(" GotServerDBInfoChange ")
					.detail(" ChangeID ", ni.id)
					.detail(" MasterID ", ni.master.id())
					.detail(" DataDistributorID ", ni.distributor.present() ? ni.distributor.get().id() : UID());
				ServerDBInfo updated = ni;
				updated.myLocality = locality;
				dbInfo->set(updated);
			}
			when(wait(ccInterface->onChange())) {
				if (ccInterface->get().present()) {
					TraceEvent(" GotCCInterfaceChange ")
						.detail(" CCID ", ccInterface->get().get().id())
						.detail(" CCMachine ", ccInterface->get().get().getWorkers.getEndpoint().getPrimaryAddress());
				}
			}
		}
	}
}
2018-09-29 03:12:06 +08:00
ACTOR Future < Void > workerServer ( Reference < ClusterConnectionFile > connFile , Reference < AsyncVar < Optional < ClusterControllerFullInterface > > > ccInterface , LocalityData locality ,
Reference < AsyncVar < ClusterControllerPriorityInfo > > asyncPriorityInfo , ProcessClass initialClass , std : : string folder , int64_t memoryLimit , std : : string metricsConnFile , std : : string metricsPrefix , Promise < Void > recoveredDiskFiles ) {
2017-05-26 04:48:44 +08:00
state PromiseStream < ErrorInfo > errors ;
2019-01-29 03:29:39 +08:00
state Reference < AsyncVar < Optional < DataDistributorInterface > > > ddInterf ( new AsyncVar < Optional < DataDistributorInterface > > ( ) ) ;
2017-05-27 08:43:28 +08:00
state Future < Void > handleErrors = workerHandleErrors ( errors . getFuture ( ) ) ; // Needs to be stopped last
2017-05-26 04:48:44 +08:00
state ActorCollection errorForwarders ( false ) ;
state Future < Void > loggingTrigger = Void ( ) ;
state double loggingDelay = SERVER_KNOBS - > WORKER_LOGGING_INTERVAL ;
state ActorCollection filesClosed ( true ) ;
state Promise < Void > stopping ;
2018-05-06 09:16:28 +08:00
state WorkerCache < InitializeStorageReply > storageCache ;
2018-08-17 01:24:12 +08:00
state Reference < AsyncVar < ServerDBInfo > > dbInfo ( new AsyncVar < ServerDBInfo > ( ServerDBInfo ( ) ) ) ;
2017-05-26 04:48:44 +08:00
state Future < Void > metricsLogger ;
2018-11-08 13:09:51 +08:00
state std : : map < KeyValueStoreType : : StoreType , std : : pair < Future < Void > , PromiseStream < InitializeTLogRequest > > > sharedLogs ;
2017-05-26 04:48:44 +08:00
state WorkerInterface interf ( locality ) ;
if ( metricsPrefix . size ( ) > 0 ) {
if ( metricsConnFile . size ( ) > 0 ) {
try {
2018-09-22 06:58:14 +08:00
state Database db = Database : : createDatabase ( metricsConnFile , Database : : API_VERSION_LATEST , locality ) ;
metricsLogger = runMetrics ( db , KeyRef ( metricsPrefix ) ) ;
2017-05-26 04:48:44 +08:00
} catch ( Error & e ) {
TraceEvent ( SevWarnAlways , " TDMetricsBadClusterFile " ) . error ( e ) . detail ( " ConnFile " , metricsConnFile ) ;
}
} else {
bool lockAware = metricsPrefix . size ( ) & & metricsPrefix [ 0 ] = = ' \xff ' ;
metricsLogger = runMetrics ( openDBOnServer ( dbInfo , TaskDefaultEndpoint , true , lockAware ) , KeyRef ( metricsPrefix ) ) ;
}
}
errorForwarders . add ( loadedPonger ( interf . debugPing . getFuture ( ) ) ) ;
errorForwarders . add ( waitFailureServer ( interf . waitFailure . getFuture ( ) ) ) ;
errorForwarders . add ( monitorServerDBInfo ( ccInterface , connFile , locality , dbInfo ) ) ;
2017-09-02 03:53:01 +08:00
errorForwarders . add ( testerServerCore ( interf . testerInterface , connFile , dbInfo , locality ) ) ;
2017-05-26 04:48:44 +08:00
filesClosed . add ( stopping . getFuture ( ) ) ;
initializeSystemMonitorMachineState ( SystemMonitorMachineState ( folder , locality . zoneId ( ) , locality . machineId ( ) , g_network - > getLocalAddress ( ) . ip ) ) ;
{
auto recruited = interf ; //ghetto! don't we all love a good #define
DUMPTOKEN ( recruited . clientInterface . reboot ) ;
2017-10-12 05:13:16 +08:00
DUMPTOKEN ( recruited . clientInterface . profiler ) ;
2017-05-26 04:48:44 +08:00
DUMPTOKEN ( recruited . tLog ) ;
DUMPTOKEN ( recruited . master ) ;
DUMPTOKEN ( recruited . masterProxy ) ;
DUMPTOKEN ( recruited . resolver ) ;
DUMPTOKEN ( recruited . storage ) ;
DUMPTOKEN ( recruited . debugPing ) ;
DUMPTOKEN ( recruited . coordinationPing ) ;
DUMPTOKEN ( recruited . waitFailure ) ;
DUMPTOKEN ( recruited . setMetricsRate ) ;
DUMPTOKEN ( recruited . eventLogRequest ) ;
DUMPTOKEN ( recruited . traceBatchDumpRequest ) ;
}
try {
std : : vector < DiskStore > stores = getDiskStores ( folder ) ;
bool validateDataFiles = deleteFile ( joinPath ( folder , validationFilename ) ) ;
2017-09-16 01:57:58 +08:00
std : : vector < Future < Void > > recoveries ;
2017-05-26 04:48:44 +08:00
for ( int f = 0 ; f < stores . size ( ) ; f + + ) {
DiskStore s = stores [ f ] ;
// FIXME: Error handling
if ( s . storedComponent = = DiskStore : : Storage ) {
2017-05-27 08:43:28 +08:00
IKeyValueStore * kv = openKVStore ( s . storeType , s . filename , s . storeID , memoryLimit , false , validateDataFiles ) ;
2017-05-26 04:48:44 +08:00
Future < Void > kvClosed = kv - > onClosed ( ) ;
filesClosed . add ( kvClosed ) ;
StorageServerInterface recruited ;
recruited . uniqueID = s . storeID ;
recruited . locality = locality ;
recruited . initEndpoints ( ) ;
std : : map < std : : string , std : : string > details ;
details [ " StorageEngine " ] = s . storeType . toString ( ) ;
2018-09-06 06:06:14 +08:00
startRole ( Role : : STORAGE_SERVER , recruited . id ( ) , interf . id ( ) , details , " Restored " ) ;
2017-05-26 04:48:44 +08:00
DUMPTOKEN ( recruited . getVersion ) ;
DUMPTOKEN ( recruited . getValue ) ;
DUMPTOKEN ( recruited . getKey ) ;
DUMPTOKEN ( recruited . getKeyValues ) ;
DUMPTOKEN ( recruited . getShardState ) ;
DUMPTOKEN ( recruited . waitMetrics ) ;
DUMPTOKEN ( recruited . splitMetrics ) ;
DUMPTOKEN ( recruited . getPhysicalMetrics ) ;
DUMPTOKEN ( recruited . waitFailure ) ;
DUMPTOKEN ( recruited . getQueuingMetrics ) ;
DUMPTOKEN ( recruited . getKeyValueStoreType ) ;
DUMPTOKEN ( recruited . watchValue ) ;
2017-09-16 01:57:58 +08:00
Promise < Void > recovery ;
Future < Void > f = storageServer ( kv , recruited , dbInfo , folder , recovery ) ;
recoveries . push_back ( recovery . getFuture ( ) ) ;
2018-10-21 11:48:46 +08:00
f = handleIOErrors ( f , kv , s . storeID , kvClosed ) ;
2018-10-09 08:26:10 +08:00
f = storageServerRollbackRebooter ( f , s . storeType , s . filename , recruited . id ( ) , recruited . locality , dbInfo , folder , & filesClosed , memoryLimit , kv ) ;
2018-09-06 06:06:14 +08:00
errorForwarders . add ( forwardError ( errors , Role : : STORAGE_SERVER , recruited . id ( ) , f ) ) ;
2017-05-26 04:48:44 +08:00
} else if ( s . storedComponent = = DiskStore : : TLogData ) {
IKeyValueStore * kv = openKVStore ( s . storeType , s . filename , s . storeID , memoryLimit , validateDataFiles ) ;
IDiskQueue * queue = openDiskQueue (
2018-02-12 17:30:02 +08:00
joinPath ( folder , fileLogQueuePrefix . toString ( ) + s . storeID . toString ( ) + " - " ) , tlogQueueExtension . toString ( ) , s . storeID , 10 * SERVER_KNOBS - > TARGET_BYTES_PER_TLOG ) ;
2017-05-26 04:48:44 +08:00
filesClosed . add ( kv - > onClosed ( ) ) ;
filesClosed . add ( queue - > onClosed ( ) ) ;
std : : map < std : : string , std : : string > details ;
details [ " StorageEngine " ] = s . storeType . toString ( ) ;
2018-09-06 06:06:14 +08:00
startRole ( Role : : SHARED_TRANSACTION_LOG , s . storeID , interf . id ( ) , details , " Restored " ) ;
2017-05-26 04:48:44 +08:00
Promise < Void > oldLog ;
2017-09-16 01:57:58 +08:00
Promise < Void > recovery ;
2018-11-08 13:09:51 +08:00
auto & logData = sharedLogs [ s . storeType ] ;
Future < Void > tl = tLog ( kv , queue , dbInfo , locality , ! logData . first . isValid ( ) | | logData . first . isReady ( ) ? logData . second : PromiseStream < InitializeTLogRequest > ( ) , s . storeID , true , oldLog , recovery ) ;
2017-09-16 01:57:58 +08:00
recoveries . push_back ( recovery . getFuture ( ) ) ;
2017-10-06 08:09:44 +08:00
2017-05-26 04:48:44 +08:00
tl = handleIOErrors ( tl , kv , s . storeID ) ;
tl = handleIOErrors ( tl , queue , s . storeID ) ;
2018-11-08 13:09:51 +08:00
if ( ! logData . first . isValid ( ) | | logData . first . isReady ( ) ) {
logData . first = oldLog . getFuture ( ) | | tl ;
2017-05-26 04:48:44 +08:00
}
2018-09-06 06:06:14 +08:00
errorForwarders . add ( forwardError ( errors , Role : : SHARED_TRANSACTION_LOG , s . storeID , tl ) ) ;
2017-05-26 04:48:44 +08:00
}
}
std : : map < std : : string , std : : string > details ;
details [ " Locality " ] = locality . toString ( ) ;
details [ " DataFolder " ] = folder ;
details [ " StoresPresent " ] = format ( " %d " , stores . size ( ) ) ;
2018-09-06 06:06:14 +08:00
startRole ( Role : : WORKER , interf . id ( ) , interf . id ( ) , details ) ;
2017-05-26 04:48:44 +08:00
2018-08-11 04:57:10 +08:00
wait ( waitForAll ( recoveries ) ) ;
2018-05-02 03:05:43 +08:00
recoveredDiskFiles . send ( Void ( ) ) ;
2019-01-29 03:29:39 +08:00
errorForwarders . add ( registrationClient ( ccInterface , interf , asyncPriorityInfo , initialClass , ddInterf ) ) ;
2017-09-16 01:57:58 +08:00
TraceEvent ( " RecoveriesComplete " , interf . id ( ) ) ;
2017-05-26 04:48:44 +08:00
loop choose {
when ( RebootRequest req = waitNext ( interf . clientInterface . reboot . getFuture ( ) ) ) {
state RebootRequest rebootReq = req ;
if ( rebootReq . checkData ) {
Reference < IAsyncFile > checkFile = wait ( IAsyncFileSystem : : filesystem ( ) - > open ( joinPath ( folder , validationFilename ) , IAsyncFile : : OPEN_CREATE | IAsyncFile : : OPEN_READWRITE , 0600 ) ) ;
2018-08-11 04:57:10 +08:00
wait ( checkFile - > sync ( ) ) ;
2017-05-26 04:48:44 +08:00
}
2017-08-29 02:25:37 +08:00
2017-05-26 04:48:44 +08:00
if ( g_network - > isSimulated ( ) ) {
TraceEvent ( " SimulatedReboot " ) . detail ( " Deletion " , rebootReq . deleteData ) ;
if ( rebootReq . deleteData ) {
throw please_reboot_delete ( ) ;
}
throw please_reboot ( ) ;
}
else {
TraceEvent ( " ProcessReboot " ) ;
ASSERT ( ! rebootReq . deleteData ) ;
flushAndExit ( 0 ) ;
}
}
2017-10-12 05:13:16 +08:00
when ( ProfilerRequest req = waitNext ( interf . clientInterface . profiler . getFuture ( ) ) ) {
2017-10-17 07:04:09 +08:00
state ProfilerRequest profilerReq = req ;
// There really isn't a great "filepath sanitizer" or "filepath escape" function available,
// thus we instead enforce a different requirement. One can only write to a file that's
// beneath the working directory, and we remove the ability to do any symlink or ../..
// tricks by resolving all paths through `abspath` first.
try {
2017-10-13 08:49:41 +08:00
std : : string realLogDir = abspath ( SERVER_KNOBS - > LOG_DIRECTORY ) ;
std : : string realOutPath = abspath ( realLogDir + " / " + profilerReq . outputFile . toString ( ) ) ;
if ( realLogDir . size ( ) < realOutPath . size ( ) & &
strncmp ( realLogDir . c_str ( ) , realOutPath . c_str ( ) , realLogDir . size ( ) ) = = 0 ) {
profilerReq . outputFile = realOutPath ;
uncancellable ( runProfiler ( profilerReq ) ) ;
2017-10-17 07:04:09 +08:00
profilerReq . reply . send ( Void ( ) ) ;
} else {
profilerReq . reply . sendError ( client_invalid_operation ( ) ) ;
}
} catch ( Error & e ) {
profilerReq . reply . sendError ( e ) ;
}
2017-07-13 15:27:56 +08:00
}
2017-05-26 04:48:44 +08:00
when ( RecruitMasterRequest req = waitNext ( interf . master . getFuture ( ) ) ) {
MasterInterface recruited ;
recruited . locality = locality ;
recruited . initEndpoints ( ) ;
2018-09-06 06:06:14 +08:00
startRole ( Role : : MASTER , recruited . id ( ) , interf . id ( ) ) ;
2017-05-26 04:48:44 +08:00
DUMPTOKEN ( recruited . waitFailure ) ;
DUMPTOKEN ( recruited . tlogRejoin ) ;
DUMPTOKEN ( recruited . changeCoordinators ) ;
DUMPTOKEN ( recruited . getCommitVersion ) ;
//printf("Recruited as masterServer\n");
2018-07-01 21:39:04 +08:00
Future < Void > masterProcess = masterServer ( recruited , dbInfo , ServerCoordinators ( connFile ) , req . lifetime , req . forceRecovery ) ;
2018-09-06 06:06:14 +08:00
errorForwarders . add ( zombie ( recruited , forwardError ( errors , Role : : MASTER , recruited . id ( ) , masterProcess ) ) ) ;
2017-05-26 04:48:44 +08:00
req . reply . send ( recruited ) ;
}
2018-12-14 05:31:37 +08:00
when ( InitializeDataDistributorRequest req = waitNext ( interf . dataDistributor . getFuture ( ) ) ) {
2019-01-29 01:25:15 +08:00
DataDistributorInterface recruited ( locality ) ;
2019-02-01 02:10:41 +08:00
recruited . initEndpoints ( ) ;
2019-01-29 03:29:39 +08:00
if ( ddInterf - > get ( ) . present ( ) ) {
recruited = ddInterf - > get ( ) . get ( ) ;
2019-02-13 07:50:44 +08:00
TEST ( true ) ; // Recruited while already a data distributor.
2019-01-29 03:29:39 +08:00
} else {
startRole ( Role : : DATA_DISTRIBUTOR , recruited . id ( ) , interf . id ( ) ) ;
Future < Void > dataDistributorProcess = dataDistributor ( recruited , dbInfo ) ;
2019-02-01 02:10:41 +08:00
errorForwarders . add ( forwardError ( errors , Role : : DATA_DISTRIBUTOR , recruited . id ( ) , setWhenDoneOrError ( dataDistributorProcess , ddInterf , Optional < DataDistributorInterface > ( ) ) ) ) ;
2019-01-29 03:29:39 +08:00
ddInterf - > set ( Optional < DataDistributorInterface > ( recruited ) ) ;
}
2019-01-29 01:25:15 +08:00
TraceEvent ( " DataDistributorReceived " , req . reqId ) . detail ( " DataDistributorId " , recruited . id ( ) ) ;
2018-12-14 05:31:37 +08:00
req . reply . send ( recruited ) ;
}
2017-05-26 04:48:44 +08:00
when ( InitializeTLogRequest req = waitNext ( interf . tLog . getFuture ( ) ) ) {
2018-11-08 13:09:51 +08:00
auto & logData = sharedLogs [ req . storeType ] ;
logData . second . send ( req ) ;
if ( ! logData . first . isValid ( ) | | logData . first . isReady ( ) ) {
2017-05-26 04:48:44 +08:00
UID logId = g_random - > randomUniqueID ( ) ;
std : : map < std : : string , std : : string > details ;
details [ " ForMaster " ] = req . recruitmentID . shortString ( ) ;
details [ " StorageEngine " ] = req . storeType . toString ( ) ;
2017-08-29 02:25:37 +08:00
2017-05-26 04:48:44 +08:00
//FIXME: start role for every tlog instance, rather than just for the shared actor, also use a different role type for the shared actor
2018-09-06 06:06:14 +08:00
startRole ( Role : : SHARED_TRANSACTION_LOG , logId , interf . id ( ) , details ) ;
2017-05-26 04:48:44 +08:00
std : : string filename = filenameFromId ( req . storeType , folder , fileLogDataPrefix . toString ( ) , logId ) ;
IKeyValueStore * data = openKVStore ( req . storeType , filename , logId , memoryLimit ) ;
2017-09-22 14:51:55 +08:00
IDiskQueue * queue = openDiskQueue ( joinPath ( folder , fileLogQueuePrefix . toString ( ) + logId . toString ( ) + " - " ) , tlogQueueExtension . toString ( ) , logId ) ;
2017-05-26 04:48:44 +08:00
filesClosed . add ( data - > onClosed ( ) ) ;
filesClosed . add ( queue - > onClosed ( ) ) ;
2017-10-06 08:09:44 +08:00
2018-11-08 13:09:51 +08:00
logData . first = tLog ( data , queue , dbInfo , locality , logData . second , logId , false , Promise < Void > ( ) , Promise < Void > ( ) ) ;
logData . first = handleIOErrors ( logData . first , data , logId ) ;
logData . first = handleIOErrors ( logData . first , queue , logId ) ;
errorForwarders . add ( forwardError ( errors , Role : : SHARED_TRANSACTION_LOG , logId , logData . first ) ) ;
2017-05-26 04:48:44 +08:00
}
}
when ( InitializeStorageRequest req = waitNext ( interf . storage . getFuture ( ) ) ) {
if ( ! storageCache . exists ( req . reqId ) ) {
StorageServerInterface recruited ( req . interfaceId ) ;
recruited . locality = locality ;
recruited . initEndpoints ( ) ;
std : : map < std : : string , std : : string > details ;
details [ " StorageEngine " ] = req . storeType . toString ( ) ;
2018-09-06 06:06:14 +08:00
startRole ( Role : : STORAGE_SERVER , recruited . id ( ) , interf . id ( ) , details ) ;
2017-05-26 04:48:44 +08:00
DUMPTOKEN ( recruited . getVersion ) ;
DUMPTOKEN ( recruited . getValue ) ;
DUMPTOKEN ( recruited . getKey ) ;
DUMPTOKEN ( recruited . getKeyValues ) ;
DUMPTOKEN ( recruited . getShardState ) ;
DUMPTOKEN ( recruited . waitMetrics ) ;
DUMPTOKEN ( recruited . splitMetrics ) ;
DUMPTOKEN ( recruited . getPhysicalMetrics ) ;
DUMPTOKEN ( recruited . waitFailure ) ;
DUMPTOKEN ( recruited . getQueuingMetrics ) ;
DUMPTOKEN ( recruited . getKeyValueStoreType ) ;
DUMPTOKEN ( recruited . watchValue ) ;
//printf("Recruited as storageServer\n");
std : : string filename = filenameFromId ( req . storeType , folder , fileStoragePrefix . toString ( ) , recruited . id ( ) ) ;
IKeyValueStore * data = openKVStore ( req . storeType , filename , recruited . id ( ) , memoryLimit ) ;
Future < Void > kvClosed = data - > onClosed ( ) ;
filesClosed . add ( kvClosed ) ;
2018-05-06 09:16:28 +08:00
ReplyPromise < InitializeStorageReply > storageReady = req . reply ;
2017-05-26 04:48:44 +08:00
storageCache . set ( req . reqId , storageReady . getFuture ( ) ) ;
Future < Void > s = storageServer ( data , recruited , req . seedTag , storageReady , dbInfo , folder ) ;
s = handleIOErrors ( s , data , recruited . id ( ) , kvClosed ) ;
s = storageCache . removeOnReady ( req . reqId , s ) ;
2018-10-09 08:26:10 +08:00
s = storageServerRollbackRebooter ( s , req . storeType , filename , recruited . id ( ) , recruited . locality , dbInfo , folder , & filesClosed , memoryLimit , data ) ;
2018-09-06 06:06:14 +08:00
errorForwarders . add ( forwardError ( errors , Role : : STORAGE_SERVER , recruited . id ( ) , s ) ) ;
2017-05-26 04:48:44 +08:00
} else
forwardPromise ( req . reply , storageCache . get ( req . reqId ) ) ;
}
when ( InitializeMasterProxyRequest req = waitNext ( interf . masterProxy . getFuture ( ) ) ) {
MasterProxyInterface recruited ;
recruited . locality = locality ;
recruited . initEndpoints ( ) ;
std : : map < std : : string , std : : string > details ;
details [ " ForMaster " ] = req . master . id ( ) . shortString ( ) ;
2018-09-06 06:06:14 +08:00
startRole ( Role : : MASTER_PROXY , recruited . id ( ) , interf . id ( ) , details ) ;
2017-05-26 04:48:44 +08:00
DUMPTOKEN ( recruited . commit ) ;
DUMPTOKEN ( recruited . getConsistentReadVersion ) ;
DUMPTOKEN ( recruited . getKeyServersLocations ) ;
DUMPTOKEN ( recruited . getStorageServerRejoinInfo ) ;
DUMPTOKEN ( recruited . waitFailure ) ;
DUMPTOKEN ( recruited . getRawCommittedVersion ) ;
DUMPTOKEN ( recruited . txnState ) ;
//printf("Recruited as masterProxyServer\n");
2018-09-06 06:06:14 +08:00
errorForwarders . add ( zombie ( recruited , forwardError ( errors , Role : : MASTER_PROXY , recruited . id ( ) ,
2017-05-26 04:48:44 +08:00
masterProxyServer ( recruited , req , dbInfo ) ) ) ) ;
req . reply . send ( recruited ) ;
}
when ( InitializeResolverRequest req = waitNext ( interf . resolver . getFuture ( ) ) ) {
ResolverInterface recruited ;
recruited . locality = locality ;
recruited . initEndpoints ( ) ;
std : : map < std : : string , std : : string > details ;
2018-09-06 06:06:14 +08:00
startRole ( Role : : RESOLVER , recruited . id ( ) , interf . id ( ) , details ) ;
2017-05-26 04:48:44 +08:00
DUMPTOKEN ( recruited . resolve ) ;
DUMPTOKEN ( recruited . metrics ) ;
DUMPTOKEN ( recruited . split ) ;
DUMPTOKEN ( recruited . waitFailure ) ;
2018-09-06 06:06:14 +08:00
errorForwarders . add ( zombie ( recruited , forwardError ( errors , Role : : RESOLVER , recruited . id ( ) ,
2017-05-26 04:48:44 +08:00
resolver ( recruited , req , dbInfo ) ) ) ) ;
req . reply . send ( recruited ) ;
}
2017-06-30 06:50:19 +08:00
when ( InitializeLogRouterRequest req = waitNext ( interf . logRouter . getFuture ( ) ) ) {
2018-03-11 01:52:09 +08:00
TLogInterface recruited ( locality ) ;
2017-06-30 06:50:19 +08:00
recruited . initEndpoints ( ) ;
std : : map < std : : string , std : : string > details ;
2018-09-06 06:06:14 +08:00
startRole ( Role : : LOG_ROUTER , recruited . id ( ) , interf . id ( ) , details ) ;
2017-06-30 06:50:19 +08:00
DUMPTOKEN ( recruited . peekMessages ) ;
DUMPTOKEN ( recruited . popMessages ) ;
DUMPTOKEN ( recruited . commit ) ;
DUMPTOKEN ( recruited . lock ) ;
DUMPTOKEN ( recruited . getQueuingMetrics ) ;
DUMPTOKEN ( recruited . confirmRunning ) ;
2018-09-06 06:06:14 +08:00
errorForwarders . add ( zombie ( recruited , forwardError ( errors , Role : : LOG_ROUTER , recruited . id ( ) ,
2017-06-30 06:50:19 +08:00
logRouter ( recruited , req , dbInfo ) ) ) ) ;
req . reply . send ( recruited ) ;
}
2017-05-26 04:48:44 +08:00
when ( CoordinationPingMessage m = waitNext ( interf . coordinationPing . getFuture ( ) ) ) {
TraceEvent ( " CoordinationPing " , interf . id ( ) ) . detail ( " CCID " , m . clusterControllerId ) . detail ( " TimeStep " , m . timeStep ) ;
}
when ( SetMetricsLogRateRequest req = waitNext ( interf . setMetricsRate . getFuture ( ) ) ) {
TraceEvent ( " LoggingRateChange " , interf . id ( ) ) . detail ( " OldDelay " , loggingDelay ) . detail ( " NewLogPS " , req . metricsLogsPerSecond ) ;
if ( req . metricsLogsPerSecond ! = 0 ) {
loggingDelay = 1.0 / req . metricsLogsPerSecond ;
loggingTrigger = Void ( ) ;
}
}
when ( EventLogRequest req = waitNext ( interf . eventLogRequest . getFuture ( ) ) ) {
2018-05-03 01:44:38 +08:00
TraceEventFields e ;
2017-05-26 04:48:44 +08:00
if ( req . getLastError )
2018-05-03 01:44:38 +08:00
e = latestEventCache . getLatestError ( ) ;
2017-05-26 04:48:44 +08:00
else
2018-05-03 01:44:38 +08:00
e = latestEventCache . get ( req . eventName . toString ( ) ) ;
2017-05-26 04:48:44 +08:00
req . reply . send ( e ) ;
}
when ( TraceBatchDumpRequest req = waitNext ( interf . traceBatchDumpRequest . getFuture ( ) ) ) {
g_traceBatch . dump ( ) ;
req . reply . send ( Void ( ) ) ;
}
when ( DiskStoreRequest req = waitNext ( interf . diskStoreRequest . getFuture ( ) ) ) {
Standalone < VectorRef < UID > > ids ;
for ( DiskStore d : getDiskStores ( folder ) ) {
bool included = true ;
if ( ! req . includePartialStores ) {
if ( d . storeType = = KeyValueStoreType : : SSD_BTREE_V1 ) {
included = fileExists ( d . filename + " .fdb-wal " ) ;
}
else if ( d . storeType = = KeyValueStoreType : : SSD_BTREE_V2 ) {
included = fileExists ( d . filename + " .sqlite-wal " ) ;
}
2017-09-22 14:51:55 +08:00
else if ( d . storeType = = KeyValueStoreType : : SSD_REDWOOD_V1 ) {
included = fileExists ( d . filename + " 0.pagerlog " ) & & fileExists ( d . filename + " 1.pagerlog " ) ;
}
2017-05-26 04:48:44 +08:00
else {
ASSERT ( d . storeType = = KeyValueStoreType : : MEMORY ) ;
included = fileExists ( d . filename + " 1.fdq " ) ;
}
if ( d . storedComponent = = DiskStore : : COMPONENT : : TLogData & & included ) {
std : : string basename = fileLogQueuePrefix . toString ( ) + d . filename . substr ( fileLogDataPrefix . size ( ) ) ;
included = fileExists ( basename + " 0.fdq " ) & & fileExists ( basename + " 1.fdq " ) ;
}
}
if ( included ) {
ids . push_back ( ids . arena ( ) , d . storeID ) ;
}
}
req . reply . send ( ids ) ;
}
2018-08-11 04:57:10 +08:00
when ( wait ( loggingTrigger ) ) {
2017-05-26 04:48:44 +08:00
systemMonitor ( ) ;
loggingTrigger = delay ( loggingDelay , TaskFlushTrace ) ;
}
2018-08-11 04:57:10 +08:00
when ( wait ( errorForwarders . getResult ( ) ) ) { }
when ( wait ( handleErrors ) ) { }
2017-05-26 04:48:44 +08:00
}
} catch ( Error & err ) {
state Error e = err ;
bool ok = e . code ( ) = = error_code_please_reboot | | e . code ( ) = = error_code_actor_cancelled | | e . code ( ) = = error_code_please_reboot_delete ;
2018-09-06 06:06:14 +08:00
endRole ( Role : : WORKER , interf . id ( ) , " WorkerError " , ok , e ) ;
2017-05-26 04:48:44 +08:00
errorForwarders . clear ( false ) ;
2018-11-08 13:09:51 +08:00
sharedLogs . clear ( ) ;
2017-05-26 04:48:44 +08:00
if ( e . code ( ) ! = error_code_actor_cancelled ) { // We get cancelled e.g. when an entire simulation times out, but in that case we won't be restarted and don't need to wait for shutdown
stopping . send ( Void ( ) ) ;
2018-08-11 04:57:10 +08:00
wait ( filesClosed . getResult ( ) ) ; // Wait for complete shutdown of KV stores
wait ( delay ( 0.0 ) ) ; //Unwind the callstack to make sure that IAsyncFile references are all gone
2017-05-26 04:48:44 +08:00
TraceEvent ( SevInfo , " WorkerShutdownComplete " , interf . id ( ) ) ;
}
throw e ;
}
}
// Keeps `b` in sync with `a`: each time `a` changes, `b` is set to the clientInterface
// member of the value held by `a`, or to an empty Optional when `a` holds no value.
// The loop has no exit, so this actor only ends by being cancelled or by an error.
ACTOR Future<Void> extractClusterInterface(Reference<AsyncVar<Optional<ClusterControllerFullInterface>>> a, Reference<AsyncVar<Optional<ClusterInterface>>> b) {
    loop {
        // Build the value to publish first, then publish it with a single set() call.
        Optional<ClusterInterface> clientView;
        if (a->get().present()) {
            clientView = a->get().get().clientInterface;
        }
        b->set(clientView);

        wait(a->onChange());
    }
}
static std : : set < int > const & normalWorkerErrors ( ) {
static std : : set < int > s ;
if ( s . empty ( ) ) {
s . insert ( error_code_please_reboot ) ;
s . insert ( error_code_please_reboot_delete ) ;
}
return s ;
}
// Wraps `f` so that a file_not_found error turns into a future that never becomes ready.
//  - `f` completes normally: returns Void().
//  - `f` fails with file_not_found: logs a SevWarn trace event and returns Never().
//  - `f` fails with anything else: the error is rethrown unchanged.
ACTOR Future<Void> fileNotFoundToNever(Future<Void> f) {
    try {
        wait(f);
        return Void();
    } catch (Error& e) {
        // Everything except file_not_found is passed straight through to the caller.
        if (e.code() != error_code_file_not_found) {
            throw;
        }
        TraceEvent(SevWarn, " ClusterCoordinatorFailed ").error(e);
        return Never();
    }
}
// Waits five seconds and then, unless running under simulation, prints a two-line
// warning to stderr saying the process has not joined the cluster yet.
ACTOR Future<Void> printTimeout() {
    wait(delay(5));

    // Nothing is printed in simulated runs.
    if (g_network->isSimulated()) {
        return Void();
    }

    fprintf(stderr, " Warning: FDBD has not joined the cluster after 5 seconds. \n ");
    fprintf(stderr, " Check configuration and availability using the 'status' command with the fdbcli \n ");
    return Void();
}
// Prints " FDBD joined cluster. " (and logs a trace event) once, then returns.
// The message fires when `ci` holds an interface and the failure monitor reports
// FailureStatus(false) for that interface's openDatabase endpoint.
// While `ci` is empty the first branch waits on Never(), so only a change of `ci`
// can make progress; every change re-evaluates the condition.
ACTOR Future<Void> printOnFirstConnected(Reference<AsyncVar<Optional<ClusterInterface>>> ci) {
    // Started alongside this actor and held in a state variable so it survives the waits below.
    // NOTE(review): presumably the warning is cancelled when this actor returns and drops it - confirm.
    state Future<Void> pendingTimeoutWarning = printTimeout();

    loop choose {
        when(wait(ci->get().present() ? IFailureMonitor::failureMonitor().onStateEqual(ci->get().get().openDatabase.getEndpoint(), FailureStatus(false)) : Never())) {
            printf(" FDBD joined cluster. \n ");
            TraceEvent(" FDBDConnected ");
            return Void();
        }
        when(wait(ci->onChange())) {}
    }
}
2018-06-09 02:51:51 +08:00
// Loads the cluster controller priority info stored in the file at `filePath`.
//  - File missing: returns an info object derived from `processClass` (see fallbackInfo below).
//  - File present and fully consumed by deserialization: returns the deserialized value.
//  - File present with bytes left over after deserialization: ASSERT(false) under simulation;
//    otherwise logs a SevWarnAlways trace event and returns the same fallback as for a missing file.
ClusterControllerPriorityInfo getCCPriorityInfo(std::string filePath, ProcessClass processClass) {
    // Value used when the file cannot be used: the ClusterController fitness of the process class
    // re-tagged with CommandLineSource, `false`, and FitnessUnknown.
    auto fallbackInfo = [&processClass]() {
        return ClusterControllerPriorityInfo(
            ProcessClass(processClass.classType(), ProcessClass::CommandLineSource).machineClassFitness(ProcessClass::ClusterController),
            false,
            ClusterControllerPriorityInfo::FitnessUnknown);
    };

    if (!fileExists(filePath)) {
        return fallbackInfo();
    }

    // NOTE(review): 1000 is presumably an upper bound on the bytes read - confirm against readFileBytes.
    std::string contents(readFileBytes(filePath, 1000));
    BinaryReader br(StringRef(contents), IncludeVersion());
    ClusterControllerPriorityInfo priorityInfo(ProcessClass::UnsetFit, false, ClusterControllerPriorityInfo::FitnessUnknown);
    br >> priorityInfo;

    // The reader having nothing left means the whole file was consumed.
    if (br.empty()) {
        return priorityInfo;
    }

    // Leftover bytes: the file does not hold exactly one serialized value.
    if (g_network->isSimulated()) {
        ASSERT(false);
        return priorityInfo;
    }

    TraceEvent(SevWarnAlways, " FitnessFileCorrupted ").detail(" filePath ", filePath);
    return fallbackInfo();
}
2018-06-09 02:51:51 +08:00
// Persists `asyncPriorityInfo` to `filePath` every time it changes.
// The value is serialized with IncludeVersion() (the same option getCCPriorityInfo reads with)
// and written through atomicReplace(). The loop has no exit.
ACTOR Future<Void> monitorAndWriteCCPriorityInfo(std::string filePath, Reference<AsyncVar<ClusterControllerPriorityInfo>> asyncPriorityInfo) {
    loop {
        wait(asyncPriorityInfo->onChange());

        // Serialize the new value, then swap it into place on disk.
        auto serialized = BinaryWriter::toValue(asyncPriorityInfo->get(), IncludeVersion());
        atomicReplace(filePath, serialized.toString(), false);
    }
}
// Creates `folder`, opens the process id file inside it with OPEN_LOCK, and returns
// the UID associated with that file.
//  - If the file does not exist, it is created, a random UID is generated, serialized with
//    IncludeVersion(), written at offset 0 and synced.
//  - Otherwise the whole file is read and the UID is deserialized from its contents.
// Any error other than actor_cancelled is logged as a SevError trace event (and reported on
// stderr unless it is an injected fault); every error is rethrown to the caller.
ACTOR Future < UID > createAndLockProcessIdFile ( std : : string folder ) {
state UID processIDUid ;
platform : : createDirectory ( folder ) ;
try {
state std : : string lockFilePath = joinPath ( folder , " processId " ) ;
// First attempt: open an existing file. errorOr() captures a failure as a value instead of throwing.
state ErrorOr < Reference < IAsyncFile > > lockFile = wait ( errorOr ( IAsyncFileSystem : : filesystem ( g_network ) - > open ( lockFilePath , IAsyncFile : : OPEN_READWRITE | IAsyncFile : : OPEN_LOCK , 0600 ) ) ) ;
// Take the creation path only when the open failed with file_not_found AND the file is really absent.
if ( lockFile . isError ( ) & & lockFile . getError ( ) . code ( ) = = error_code_file_not_found & & ! fileExists ( lockFilePath ) ) {
Reference < IAsyncFile > _lockFile = wait ( IAsyncFileSystem : : filesystem ( ) - > open ( lockFilePath , IAsyncFile : : OPEN_ATOMIC_WRITE_AND_CREATE | IAsyncFile : : OPEN_CREATE | IAsyncFile : : OPEN_LOCK | IAsyncFile : : OPEN_READWRITE , 0600 ) ) ;
lockFile = _lockFile ;
processIDUid = g_random - > randomUniqueID ( ) ;
BinaryWriter wr ( IncludeVersion ( ) ) ;
wr < < processIDUid ;
2018-08-11 04:57:10 +08:00
wait ( lockFile . get ( ) - > write ( wr . getData ( ) , wr . getLength ( ) , 0 ) ) ;
wait ( lockFile . get ( ) - > sync ( ) ) ;
2018-06-08 04:07:19 +08:00
}
else {
if ( lockFile . isError ( ) ) throw lockFile . getError ( ) ; // If we've failed to open the file, throw an exception
int64_t fileSize = wait ( lockFile . get ( ) - > size ( ) ) ;
state Key fileData = makeString ( fileSize ) ;
// NOTE(review): `length` is never used, so a read returning fewer than fileSize bytes would go unnoticed - confirm whether that can happen here.
int length = wait ( lockFile . get ( ) - > read ( mutateString ( fileData ) , fileSize , 0 ) ) ;
processIDUid = BinaryReader : : fromStringRef < UID > ( fileData , IncludeVersion ( ) ) ;
}
}
catch ( Error & e ) {
// Cancellation is rethrown silently; everything else is reported before being rethrown.
if ( e . code ( ) ! = error_code_actor_cancelled ) {
if ( ! e . isInjectedFault ( ) )
fprintf ( stderr , " ERROR: error creating or opening process id file `%s'. \n " , joinPath ( folder , " processId " ) . c_str ( ) ) ;
TraceEvent ( SevError , " OpenProcessIdError " ) . error ( e ) ;
}
throw ;
}
return processIDUid ;
}
2017-05-26 04:48:44 +08:00
// Top-level actor of an fdbd process: starts the per-process actors and waits on them.
//
// In order, it:
//   1. starts the coordination server when `coordFolder` is non-empty,
//   2. creates/locks the process id file in `dataFolder` and records the id in `localities`,
//   3. starts the priority-info writer, the cluster controller (or only monitorLeader for
//      tester-class processes), the cluster interface extractor, the failure monitor client
//      and the worker server,
//   4. waits until any one of them finishes.
// None of the started actors is expected to finish normally, so reaching the end of the wait
// is treated as an internal error. Every error leaving this actor goes through checkIOTimeout().
ACTOR Future<Void> fdbd(
    Reference<ClusterConnectionFile> connFile,
    LocalityData localities,
    ProcessClass processClass,
    std::string dataFolder,
    std::string coordFolder,
    int64_t memoryLimit,
    std::string metricsConnFile,
    std::string metricsPrefix)
{
    try {
        ServerCoordinators coordinators(connFile);
        TraceEvent(" StartingFDBD ")
            .detailext(" ZoneID ", localities.zoneId())
            .detailext(" MachineId ", localities.machineId())
            .detail(" DiskPath ", dataFolder)
            .detail(" CoordPath ", coordFolder);

        // SOMEDAY: start the services on the machine in a staggered fashion in simulation?
        state vector<Future<Void>> actors;

        // Endpoints must be registered before any process tries to connect to them, so the
        // coordinationServer actor has to be the first one started.
        if (!coordFolder.empty()) {
            // SOMEDAY: remove the fileNotFound wrapper and make DiskQueue construction safe from errors setting up their files
            actors.push_back(fileNotFoundToNever(coordinationServer(coordFolder)));
        }

        state UID processIDUid = wait(createAndLockProcessIdFile(dataFolder));
        localities.set(LocalityData::keyProcessId, processIDUid.toString());
        // Only one process can execute on a dataFolder from this point onwards

        std::string fitnessFilePath = joinPath(dataFolder, " fitness ");
        Reference<AsyncVar<Optional<ClusterControllerFullInterface>>> cc(new AsyncVar<Optional<ClusterControllerFullInterface>>);
        Reference<AsyncVar<Optional<ClusterInterface>>> ci(new AsyncVar<Optional<ClusterInterface>>);
        Reference<AsyncVar<ClusterControllerPriorityInfo>> asyncPriorityInfo(
            new AsyncVar<ClusterControllerPriorityInfo>(getCCPriorityInfo(fitnessFilePath, processClass)));
        Promise<Void> recoveredDiskFiles;

        actors.push_back(reportErrors(monitorAndWriteCCPriorityInfo(fitnessFilePath, asyncPriorityInfo), " MonitorAndWriteCCPriorityInfo "));

        // Tester-class processes only monitor the leader; every other class runs clusterController().
        Future<Void> leaderActor;
        if (processClass == ProcessClass::TesterClass) {
            leaderActor = monitorLeader(connFile, cc);
        } else {
            leaderActor = clusterController(connFile, cc, asyncPriorityInfo, recoveredDiskFiles.getFuture(), localities);
        }
        actors.push_back(reportErrors(leaderActor, " ClusterController "));

        actors.push_back(reportErrors(extractClusterInterface(cc, ci), " ExtractClusterInterface "));
        actors.push_back(reportErrors(failureMonitorClient(ci, true), " FailureMonitorClient "));
        actors.push_back(reportErrorsExcept(
            workerServer(connFile, cc, localities, asyncPriorityInfo, processClass, dataFolder, memoryLimit, metricsConnFile, metricsPrefix, recoveredDiskFiles),
            " WorkerServer ", UID(), &normalWorkerErrors()));

        // Held in a state variable so it stays alive across the wait below.
        state Future<Void> firstConnect = reportErrors(printOnFirstConnected(ci), " ClusterFirstConnectedError ");

        wait(quorum(actors, 1));
        ASSERT(false); // None of these actors should terminate normally
        throw internal_error();
    } catch (Error& e) {
        // NOTE(review): checkIOTimeout() presumably substitutes an I/O timeout error where applicable - confirm.
        throw checkIOTimeout(e);
    }
}
2018-09-06 06:06:14 +08:00
2018-09-06 06:53:12 +08:00
// Definitions of the static Role constants.
// Each one is constructed from two string literals; presumably the first is the role's name
// and the second a short abbreviation - confirm against the Role declaration, which is not in this file section.
// WORKER and SHARED_TRANSACTION_LOG additionally pass `false` as a third argument; all other
// roles rely on that parameter's default (its meaning is defined with Role's constructor).
const Role Role : : WORKER ( " Worker " , " WK " , false ) ;
2018-09-06 06:06:14 +08:00
const Role Role : : STORAGE_SERVER ( " StorageServer " , " SS " ) ;
const Role Role : : TRANSACTION_LOG ( " TLog " , " TL " ) ;
2018-09-06 06:53:12 +08:00
const Role Role : : SHARED_TRANSACTION_LOG ( " SharedTLog " , " SL " , false ) ;
2018-09-06 06:06:14 +08:00
const Role Role : : MASTER_PROXY ( " MasterProxyServer " , " MP " ) ;
const Role Role : : MASTER ( " MasterServer " , " MS " ) ;
const Role Role : : RESOLVER ( " Resolver " , " RV " ) ;
const Role Role : : CLUSTER_CONTROLLER ( " ClusterController " , " CC " ) ;
const Role Role : : TESTER ( " Tester " , " TS " ) ;
const Role Role : : LOG_ROUTER ( " LogRouter " , " LR " ) ;
2018-12-14 05:31:37 +08:00
const Role Role : : DATA_DISTRIBUTOR ( " DataDistributor " , " DD " ) ;