2017-05-26 04:48:44 +08:00
/*
* worker . actor . cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013 - 2018 Apple Inc . and the FoundationDB project authors
2018-02-22 02:25:11 +08:00
*
2017-05-26 04:48:44 +08:00
* Licensed under the Apache License , Version 2.0 ( the " License " ) ;
* you may not use this file except in compliance with the License .
* You may obtain a copy of the License at
2018-02-22 02:25:11 +08:00
*
2017-05-26 04:48:44 +08:00
* http : //www.apache.org/licenses/LICENSE-2.0
2018-02-22 02:25:11 +08:00
*
2017-05-26 04:48:44 +08:00
* Unless required by applicable law or agreed to in writing , software
* distributed under the License is distributed on an " AS IS " BASIS ,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND , either express or implied .
* See the License for the specific language governing permissions and
* limitations under the License .
*/
# include "flow/actorcompiler.h"
# include "flow/ActorCollection.h"
# include "flow/SystemMonitor.h"
# include "flow/TDMetric.actor.h"
# include "fdbrpc/simulator.h"
# include "fdbclient/NativeAPI.h"
2017-07-25 04:13:06 +08:00
# include "fdbclient/MetricLogger.h"
2017-05-26 04:48:44 +08:00
# include "WorkerInterface.h"
# include "IKeyValueStore.h"
# include "WaitFailure.h"
# include "TesterInterface.h" // for poisson()
# include "IDiskQueue.h"
# include "fdbclient/DatabaseContext.h"
# include "ClusterRecruitmentInterface.h"
# include "ServerDBInfo.h"
# include "fdbserver/CoordinationInterface.h"
# include "fdbclient/FailureMonitorClient.h"
# include "fdbclient/MonitorLeader.h"
2017-10-12 05:13:16 +08:00
# include "fdbclient/ClientWorkerInterface.h"
# include "flow/Profiler.h"
2017-05-26 04:48:44 +08:00
# ifdef __linux__
# ifdef USE_GPERFTOOLS
# include "gperftools/profiler.h"
# endif
# include <unistd.h>
# include <thread>
# include <execinfo.h>
# endif
# if CENABLED(0, NOT_IN_CLEAN)
extern IKeyValueStore * keyValueStoreCompressTestData ( IKeyValueStore * store ) ;
# define KV_STORE(filename,uid) keyValueStoreCompressTestData(keyValueStoreSQLite(filename,uid))
# elif CENABLED(0, NOT_IN_CLEAN)
# define KV_STORE(filename,uid) keyValueStoreSQLite(filename,uid)
# else
# define KV_STORE(filename,uid) keyValueStoreMemory(filename,uid)
# endif
ACTOR static Future < Void > extractClientInfo ( Reference < AsyncVar < ServerDBInfo > > db , Reference < AsyncVar < ClientDBInfo > > info ) {
loop {
info - > set ( db - > get ( ) . client ) ;
Void _ = wait ( db - > onChange ( ) ) ;
}
}
Database openDBOnServer ( Reference < AsyncVar < ServerDBInfo > > const & db , int taskID , bool enableLocalityLoadBalance , bool lockAware ) {
Reference < AsyncVar < ClientDBInfo > > info ( new AsyncVar < ClientDBInfo > ) ;
return DatabaseContext : : create ( info , extractClientInfo ( db , info ) , enableLocalityLoadBalance ? db - > get ( ) . myLocality : LocalityData ( ) , enableLocalityLoadBalance , taskID , lockAware ) ;
}
struct ErrorInfo {
Error error ;
const char * context ;
UID id ;
ErrorInfo ( ) { }
ErrorInfo ( Error e , const char * context , UID id ) : error ( e ) , context ( context ) , id ( id ) { }
template < class Ar > void serialize ( Ar & ) { ASSERT ( false ) ; }
} ;
2017-05-27 08:43:28 +08:00
Error checkIOTimeout ( Error const & e ) {
2017-06-01 08:03:15 +08:00
// Convert all_errors to io_timeout if global timeout bool was set
2017-06-16 08:40:19 +08:00
bool timeoutOccurred = ( bool ) g_network - > global ( INetwork : : enASIOTimedOut ) ;
// In simulation, have to check global timed out flag for both this process and the machine process on which IO is done
if ( g_network - > isSimulated ( ) & & ! timeoutOccurred )
timeoutOccurred = g_pSimulator - > getCurrentProcess ( ) - > machine - > machineProcess - > global ( INetwork : : enASIOTimedOut ) ;
if ( timeoutOccurred ) {
TEST ( true ) ; // Timeout occurred
2017-05-27 08:43:28 +08:00
Error timeout = io_timeout ( ) ;
2017-06-16 08:40:19 +08:00
// Preserve injectedness of error
if ( e . isInjectedFault ( ) )
2017-05-27 08:43:28 +08:00
timeout = timeout . asInjectedFault ( ) ;
return timeout ;
}
return e ;
}
2017-05-26 04:48:44 +08:00
ACTOR Future < Void > forwardError ( PromiseStream < ErrorInfo > errors ,
const char * context , UID id ,
Future < Void > process )
{
try {
Void _ = wait ( process ) ;
errors . send ( ErrorInfo ( success ( ) , context , id ) ) ;
return Void ( ) ;
} catch ( Error & e ) {
errors . send ( ErrorInfo ( e , context , id ) ) ;
return Void ( ) ;
}
}
ACTOR Future < Void > handleIOErrors ( Future < Void > actor , IClosable * store , UID id , Future < Void > onClosed = Void ( ) ) {
choose {
when ( state ErrorOr < Void > e = wait ( errorOr ( actor ) ) ) {
Void _ = wait ( onClosed ) ;
if ( e . isError ( ) ) throw e . getError ( ) ; else return e . get ( ) ;
}
when ( ErrorOr < Void > e = wait ( actor . isReady ( ) ? Never ( ) : errorOr ( store - > getError ( ) ) ) ) {
TraceEvent ( " WorkerTerminatingByIOError " , id ) . error ( e . getError ( ) , true ) ;
actor . cancel ( ) ;
// file_not_found can occur due to attempting to open a partially deleted DiskQueue, which should not be reported SevError.
if ( e . getError ( ) . code ( ) = = error_code_file_not_found ) {
TEST ( true ) ; // Worker terminated with file_not_found error
return Void ( ) ;
}
throw e . getError ( ) ;
}
}
}
2017-05-27 08:43:28 +08:00
ACTOR Future < Void > workerHandleErrors ( FutureStream < ErrorInfo > errors ) {
2017-05-26 04:48:44 +08:00
loop choose {
2017-05-27 08:43:28 +08:00
when ( ErrorInfo _err = waitNext ( errors ) ) {
ErrorInfo err = _err ;
2017-05-26 04:48:44 +08:00
bool ok =
err . error . code ( ) = = error_code_success | |
err . error . code ( ) = = error_code_please_reboot | |
err . error . code ( ) = = error_code_actor_cancelled | |
err . error . code ( ) = = error_code_coordinators_changed ; // The worker server was cancelled
2017-06-16 08:40:19 +08:00
if ( ! ok )
err . error = checkIOTimeout ( err . error ) ; // Possibly convert error to io_timeout
2017-05-26 04:48:44 +08:00
endRole ( err . id , err . context , " Error " , ok , err . error ) ;
if ( err . error . code ( ) = = error_code_please_reboot | | err . error . code ( ) = = error_code_io_timeout ) throw err . error ;
}
}
}
// Improve simulation code coverage by sometimes deferring the destruction of workerInterface (and therefore "endpoint not found" responses to clients
// for an extra second, so that clients are more likely to see broken_promise errors
ACTOR template < class T > Future < Void > zombie ( T workerInterface , Future < Void > worker ) {
try {
Void _ = wait ( worker ) ;
if ( BUGGIFY )
Void _ = wait ( delay ( 1.0 ) ) ;
return Void ( ) ;
} catch ( Error & e ) {
throw ;
}
}
ACTOR Future < Void > loadedPonger ( FutureStream < LoadedPingRequest > pings ) {
state Standalone < StringRef > payloadBack = std : : string ( 20480 , ' . ' ) ;
loop {
LoadedPingRequest pong = waitNext ( pings ) ;
LoadedReply rep ;
rep . payload = ( pong . loadReply ? payloadBack : LiteralStringRef ( " " ) ) ;
rep . id = pong . id ;
pong . reply . send ( rep ) ;
}
}
StringRef fileStoragePrefix = LiteralStringRef ( " storage- " ) ;
StringRef fileLogDataPrefix = LiteralStringRef ( " log- " ) ;
StringRef fileLogQueuePrefix = LiteralStringRef ( " logqueue- " ) ;
std : : pair < KeyValueStoreType , std : : string > bTreeV1Suffix = std : : make_pair ( KeyValueStoreType : : SSD_BTREE_V1 , " .fdb " ) ;
std : : pair < KeyValueStoreType , std : : string > bTreeV2Suffix = std : : make_pair ( KeyValueStoreType : : SSD_BTREE_V2 , " .sqlite " ) ;
std : : pair < KeyValueStoreType , std : : string > memorySuffix = std : : make_pair ( KeyValueStoreType : : MEMORY , " -0.fdq " ) ;
std : : string validationFilename = " _validate " ;
std : : string filenameFromSample ( KeyValueStoreType storeType , std : : string folder , std : : string sample_filename ) {
if ( storeType = = KeyValueStoreType : : SSD_BTREE_V1 )
return joinPath ( folder , sample_filename ) ;
else if ( storeType = = KeyValueStoreType : : SSD_BTREE_V2 )
2017-08-29 02:25:37 +08:00
return joinPath ( folder , sample_filename ) ;
2017-05-26 04:48:44 +08:00
else if ( storeType = = KeyValueStoreType : : MEMORY )
return joinPath ( folder , sample_filename . substr ( 0 , sample_filename . size ( ) - 5 ) ) ;
UNREACHABLE ( ) ;
}
std : : string filenameFromId ( KeyValueStoreType storeType , std : : string folder , std : : string prefix , UID id ) {
if ( storeType = = KeyValueStoreType : : SSD_BTREE_V1 )
return joinPath ( folder , prefix + id . toString ( ) + " .fdb " ) ;
else if ( storeType = = KeyValueStoreType : : SSD_BTREE_V2 )
2017-08-29 02:25:37 +08:00
return joinPath ( folder , prefix + id . toString ( ) + " .sqlite " ) ;
2017-05-26 04:48:44 +08:00
else if ( storeType = = KeyValueStoreType : : MEMORY )
return joinPath ( folder , prefix + id . toString ( ) + " - " ) ;
UNREACHABLE ( ) ;
}
struct DiskStore {
enum COMPONENT { TLogData , Storage } ;
UID storeID ;
std : : string filename ; // For KVStoreMemory just the base filename to be passed to IDiskQueue
COMPONENT storedComponent ;
KeyValueStoreType storeType ;
} ;
std : : vector < DiskStore > getDiskStores ( std : : string folder , std : : string suffix , KeyValueStoreType type ) {
std : : vector < DiskStore > result ;
vector < std : : string > files = platform : : listFiles ( folder , suffix ) ;
for ( int idx = 0 ; idx < files . size ( ) ; idx + + ) {
DiskStore store ;
store . storeType = type ;
StringRef prefix ;
if ( StringRef ( files [ idx ] ) . startsWith ( fileStoragePrefix ) ) {
store . storedComponent = DiskStore : : Storage ;
prefix = fileStoragePrefix ;
}
else if ( StringRef ( files [ idx ] ) . startsWith ( fileLogDataPrefix ) ) {
store . storedComponent = DiskStore : : TLogData ;
prefix = fileLogDataPrefix ;
}
else
continue ;
store . storeID = UID : : fromString ( files [ idx ] . substr ( prefix . size ( ) , 32 ) ) ;
store . filename = filenameFromSample ( type , folder , files [ idx ] ) ;
result . push_back ( store ) ;
}
return result ;
}
std : : vector < DiskStore > getDiskStores ( std : : string folder ) {
auto result = getDiskStores ( folder , bTreeV1Suffix . second , bTreeV1Suffix . first ) ;
auto result1 = getDiskStores ( folder , bTreeV2Suffix . second , bTreeV2Suffix . first ) ;
result . insert ( result . end ( ) , result1 . begin ( ) , result1 . end ( ) ) ;
auto result2 = getDiskStores ( folder , memorySuffix . second , memorySuffix . first ) ;
result . insert ( result . end ( ) , result2 . begin ( ) , result2 . end ( ) ) ;
return result ;
}
2017-11-15 05:57:37 +08:00
ACTOR Future < Void > registrationClient ( Reference < AsyncVar < Optional < ClusterControllerFullInterface > > > ccInterface , WorkerInterface interf , Reference < AsyncVar < ProcessClass > > asyncProcessClass , ProcessClass initialClass , Reference < AsyncVar < bool > > asyncIsExcluded ) {
2017-05-26 04:48:44 +08:00
// Keeps the cluster controller (as it may be re-elected) informed that this worker exists
// The cluster controller uses waitFailureClient to find out if we die, and returns from registrationReply (requiring us to re-register)
state Generation requestGeneration = 0 ;
2017-10-05 08:11:12 +08:00
2017-05-26 04:48:44 +08:00
loop {
2017-11-15 05:57:37 +08:00
Future < RegisterWorkerReply > registrationReply = ccInterface - > get ( ) . present ( ) ? brokenPromiseToNever ( ccInterface - > get ( ) . get ( ) . registerWorker . getReply ( RegisterWorkerRequest ( interf , initialClass , asyncProcessClass - > get ( ) , asyncIsExcluded - > get ( ) , requestGeneration + + ) ) ) : Never ( ) ;
2017-09-26 01:36:03 +08:00
choose {
2017-11-15 05:57:37 +08:00
when ( RegisterWorkerReply reply = wait ( registrationReply ) ) {
asyncProcessClass - > set ( reply . processClass ) ;
asyncIsExcluded - > set ( reply . isExcluded ) ;
2017-09-26 01:36:03 +08:00
}
when ( Void _ = wait ( ccInterface - > onChange ( ) ) ) { }
}
2017-05-26 04:48:44 +08:00
}
}
# if defined(__linux__) && defined(USE_GPERFTOOLS)
//A set of threads that should be profiled
std : : set < std : : thread : : id > profiledThreads ;
//Returns whether or not a given thread should be profiled
int filter_in_thread ( void * arg ) {
return profiledThreads . count ( std : : this_thread : : get_id ( ) ) > 0 ? 1 : 0 ;
}
# endif
//Enables the calling thread to be profiled
void registerThreadForProfiling ( ) {
# if defined(__linux__) && defined(USE_GPERFTOOLS)
//Not sure if this is actually needed, but a call to backtrace was advised here:
//http://groups.google.com/group/google-perftools/browse_thread/thread/0dfd74532e038eb8/2686d9f24ac4365f?pli=1
profiledThreads . insert ( std : : this_thread : : get_id ( ) ) ;
const int num_levels = 100 ;
void * pc [ num_levels ] ;
backtrace ( pc , num_levels ) ;
# endif
}
//Starts or stops the CPU profiler
void updateCpuProfiler ( ProfilerRequest req ) {
2017-10-12 05:13:16 +08:00
switch ( req . type ) {
case ProfilerRequest : : Type : : GPROF :
# if defined(__linux__) && defined(USE_GPERFTOOLS) && !defined(VALGRIND)
switch ( req . action ) {
case ProfilerRequest : : Action : : ENABLE : {
2017-10-17 07:04:09 +08:00
const char * path = ( const char * ) req . outputFile . begin ( ) ;
2017-10-12 05:13:16 +08:00
ProfilerOptions * options = new ProfilerOptions ( ) ;
options - > filter_in_thread = & filter_in_thread ;
options - > filter_in_thread_arg = NULL ;
ProfilerStartWithOptions ( path , options ) ;
free ( workingDir ) ;
break ;
}
case ProfilerRequest : : Action : : DISABLE :
ProfilerStop ( ) ;
break ;
case ProfilerRequest : : Action : : RUN :
ASSERT ( false ) ; // User should have called runProfiler.
break ;
}
2017-05-26 04:48:44 +08:00
# endif
2017-10-12 05:13:16 +08:00
break ;
case ProfilerRequest : : Type : : FLOW :
switch ( req . action ) {
case ProfilerRequest : : Action : : ENABLE :
startProfiling ( g_network , { } , req . outputFile ) ;
break ;
case ProfilerRequest : : Action : : DISABLE :
stopProfiling ( ) ;
break ;
case ProfilerRequest : : Action : : RUN :
ASSERT ( false ) ; // User should have called runProfiler.
break ;
}
break ;
}
}
ACTOR Future < Void > runProfiler ( ProfilerRequest req ) {
2017-11-08 05:54:17 +08:00
if ( req . action = = ProfilerRequest : : Action : : RUN ) {
req . action = ProfilerRequest : : Action : : ENABLE ;
updateCpuProfiler ( req ) ;
Void _ = wait ( delay ( req . duration ) ) ;
req . action = ProfilerRequest : : Action : : DISABLE ;
updateCpuProfiler ( req ) ;
return Void ( ) ;
} else {
updateCpuProfiler ( req ) ;
return Void ( ) ;
}
2017-05-26 04:48:44 +08:00
}
ACTOR Future < Void > storageServerRollbackRebooter ( Future < Void > prevStorageServer , KeyValueStoreType storeType , std : : string filename , StorageServerInterface ssi , Reference < AsyncVar < ServerDBInfo > > db , std : : string folder , ActorCollection * filesClosed , int64_t memoryLimit ) {
loop {
ErrorOr < Void > e = wait ( errorOr ( prevStorageServer ) ) ;
if ( ! e . isError ( ) ) return Void ( ) ;
else if ( e . getError ( ) . code ( ) ! = error_code_please_reboot ) throw e . getError ( ) ;
TraceEvent ( " StorageServerRequestedReboot " , ssi . id ( ) ) ;
//if (BUGGIFY) Void _ = wait(delay(1.0)); // This does the same thing as zombie()
// We need a new interface, since the new storageServer will do replaceInterface(). And we need to destroy
// the old one so the failure detector will know it is gone.
StorageServerInterface ssi_new ;
ssi_new . uniqueID = ssi . uniqueID ;
ssi_new . locality = ssi . locality ;
ssi = ssi_new ;
ssi . initEndpoints ( ) ;
auto * kv = openKVStore ( storeType , filename , ssi . uniqueID , memoryLimit ) ;
Future < Void > kvClosed = kv - > onClosed ( ) ;
filesClosed - > add ( kvClosed ) ;
2017-09-16 01:57:58 +08:00
prevStorageServer = storageServer ( kv , ssi , db , folder , Promise < Void > ( ) ) ;
2017-05-26 04:48:44 +08:00
prevStorageServer = handleIOErrors ( prevStorageServer , kv , ssi . id ( ) , kvClosed ) ;
}
}
// FIXME: This will not work correctly in simulation as all workers would share the same roles map
std : : set < std : : pair < std : : string , std : : string > > g_roles ;
Standalone < StringRef > roleString ( std : : set < std : : pair < std : : string , std : : string > > roles , bool with_ids ) {
std : : string result ;
for ( auto & r : roles ) {
if ( ! result . empty ( ) )
result . append ( " , " ) ;
result . append ( r . first ) ;
if ( with_ids ) {
result . append ( " : " ) ;
result . append ( r . second ) ;
}
}
return StringRef ( result ) ;
}
void startRole ( UID roleId , UID workerId , std : : string as , std : : map < std : : string , std : : string > details , std : : string origination ) {
TraceEvent ev ( " Role " , roleId ) ;
ev . detail ( " As " , as )
. detail ( " Transition " , " Begin " )
. detail ( " Origination " , origination )
. detail ( " OnWorker " , workerId ) ;
for ( auto it = details . begin ( ) ; it ! = details . end ( ) ; it + + )
ev . detail ( it - > first . c_str ( ) , it - > second ) ;
ev . trackLatest ( ( roleId . shortString ( ) + " .Role " ) . c_str ( ) ) ;
// Update roles map, log Roles metrics
g_roles . insert ( { as , roleId . shortString ( ) } ) ;
StringMetricHandle ( LiteralStringRef ( " Roles " ) ) = roleString ( g_roles , false ) ;
StringMetricHandle ( LiteralStringRef ( " RolesWithIDs " ) ) = roleString ( g_roles , true ) ;
2017-08-29 02:25:37 +08:00
if ( g_network - > isSimulated ( ) ) g_simulator . addRole ( g_network - > getLocalAddress ( ) , as ) ;
2017-05-26 04:48:44 +08:00
}
void endRole ( UID id , std : : string as , std : : string reason , bool ok , Error e ) {
{
TraceEvent ev ( " Role " , id ) ;
ev . detail ( " Transition " , " End " )
. detail ( " As " , as )
. detail ( " Reason " , reason ) ;
if ( e . code ( ) ! = invalid_error_code )
ev . error ( e , true ) ;
ev . trackLatest ( ( id . shortString ( ) + " .Role " ) . c_str ( ) ) ;
}
if ( ! ok ) {
std : : string type = as + " Failed " ;
TraceEvent err ( SevError , type . c_str ( ) , id ) ;
err . detail ( " Reason " , reason ) ;
if ( e . code ( ) ! = invalid_error_code ) {
err . error ( e , true ) ;
}
}
latestEventCache . clear ( id . shortString ( ) ) ;
// Update roles map, log Roles metrics
g_roles . erase ( { as , id . shortString ( ) } ) ;
StringMetricHandle ( LiteralStringRef ( " Roles " ) ) = roleString ( g_roles , false ) ;
StringMetricHandle ( LiteralStringRef ( " RolesWithIDs " ) ) = roleString ( g_roles , true ) ;
2017-08-29 02:25:37 +08:00
if ( g_network - > isSimulated ( ) ) g_simulator . removeRole ( g_network - > getLocalAddress ( ) , as ) ;
2017-05-26 04:48:44 +08:00
}
ACTOR Future < Void > monitorServerDBInfo ( Reference < AsyncVar < Optional < ClusterControllerFullInterface > > > ccInterface , Reference < ClusterConnectionFile > connFile , LocalityData locality , Reference < AsyncVar < ServerDBInfo > > dbInfo ) {
// Initially most of the serverDBInfo is not known, but we know our locality right away
ServerDBInfo localInfo ;
localInfo . myLocality = locality ;
dbInfo - > set ( localInfo ) ;
loop {
GetServerDBInfoRequest req ;
req . knownServerInfoID = dbInfo - > get ( ) . id ;
ClusterConnectionString fileConnectionString ;
if ( connFile & & ! connFile - > fileContentsUpToDate ( fileConnectionString ) ) {
2018-01-06 02:29:47 +08:00
req . issues = LiteralStringRef ( " incorrect_cluster_file_contents " ) ;
2017-11-11 06:12:45 +08:00
if ( connFile - > canGetFilename ( ) ) {
2017-05-26 04:48:44 +08:00
TraceEvent ( SevWarnAlways , " IncorrectClusterFileContents " ) . detail ( " Filename " , connFile - > getFilename ( ) )
. detail ( " ConnectionStringFromFile " , fileConnectionString . toString ( ) )
. detail ( " CurrentConnectionString " , connFile - > getConnectionString ( ) . toString ( ) ) ;
}
}
auto peers = FlowTransport : : transport ( ) . getIncompatiblePeers ( ) ;
for ( auto it = peers - > begin ( ) ; it ! = peers - > end ( ) ; ) {
if ( now ( ) - it - > second . second > SERVER_KNOBS - > INCOMPATIBLE_PEER_DELAY_BEFORE_LOGGING ) {
req . incompatiblePeers . push_back ( it - > first ) ;
it = peers - > erase ( it ) ;
} else {
it + + ;
}
}
choose {
when ( ServerDBInfo ni = wait ( ccInterface - > get ( ) . present ( ) ? brokenPromiseToNever ( ccInterface - > get ( ) . get ( ) . getServerDBInfo . getReply ( req ) ) : Never ( ) ) ) {
TraceEvent ( " GotServerDBInfoChange " ) . detail ( " ChangeID " , ni . id ) . detail ( " MasterID " , ni . master . id ( ) ) ;
ServerDBInfo localInfo = ni ;
localInfo . myLocality = locality ;
dbInfo - > set ( localInfo ) ;
}
when ( Void _ = wait ( ccInterface - > onChange ( ) ) ) {
if ( ccInterface - > get ( ) . present ( ) )
TraceEvent ( " GotCCInterfaceChange " ) . detail ( " CCID " , ccInterface - > get ( ) . get ( ) . id ( ) ) . detail ( " CCMachine " , ccInterface - > get ( ) . get ( ) . getWorkers . getEndpoint ( ) . address ) ;
}
}
}
}
ACTOR Future < Void > workerServer ( Reference < ClusterConnectionFile > connFile , Reference < AsyncVar < Optional < ClusterControllerFullInterface > > > ccInterface , LocalityData localities ,
2017-11-15 05:57:37 +08:00
Reference < AsyncVar < ProcessClass > > asyncProcessClass , ProcessClass initialClass , Reference < AsyncVar < bool > > asyncIsExcluded , std : : string folder , int64_t memoryLimit , std : : string metricsConnFile , std : : string metricsPrefix ) {
2017-05-26 04:48:44 +08:00
state PromiseStream < ErrorInfo > errors ;
2017-05-27 08:43:28 +08:00
state Future < Void > handleErrors = workerHandleErrors ( errors . getFuture ( ) ) ; // Needs to be stopped last
2017-05-26 04:48:44 +08:00
state ActorCollection errorForwarders ( false ) ;
state Future < Void > loggingTrigger = Void ( ) ;
state double loggingDelay = SERVER_KNOBS - > WORKER_LOGGING_INTERVAL ;
state ActorCollection filesClosed ( true ) ;
state Promise < Void > stopping ;
state WorkerCache < StorageServerInterface > storageCache ;
state Reference < AsyncVar < ServerDBInfo > > dbInfo ( new AsyncVar < ServerDBInfo > ( ServerDBInfo ( LiteralStringRef ( " DB " ) ) ) ) ;
state Future < Void > metricsLogger ;
state UID processIDUid ;
state PromiseStream < InitializeTLogRequest > tlogRequests ;
state Future < Void > tlog = Void ( ) ;
platform : : createDirectory ( folder ) ;
try {
state std : : string lockFilePath = joinPath ( folder , " processId " ) ;
state ErrorOr < Reference < IAsyncFile > > lockFile = wait ( errorOr ( IAsyncFileSystem : : filesystem ( g_network ) - > open ( lockFilePath , IAsyncFile : : OPEN_READWRITE | IAsyncFile : : OPEN_LOCK , 0600 ) ) ) ;
if ( lockFile . isError ( ) & & lockFile . getError ( ) . code ( ) = = error_code_file_not_found & & ! fileExists ( lockFilePath ) ) {
Reference < IAsyncFile > _lockFile = wait ( IAsyncFileSystem : : filesystem ( ) - > open ( lockFilePath , IAsyncFile : : OPEN_ATOMIC_WRITE_AND_CREATE | IAsyncFile : : OPEN_CREATE | IAsyncFile : : OPEN_LOCK | IAsyncFile : : OPEN_READWRITE , 0600 ) ) ;
lockFile = _lockFile ;
processIDUid = g_random - > randomUniqueID ( ) ;
BinaryWriter wr ( IncludeVersion ( ) ) ;
wr < < processIDUid ;
Void _ = wait ( lockFile . get ( ) - > write ( wr . getData ( ) , wr . getLength ( ) , 0 ) ) ;
Void _ = wait ( lockFile . get ( ) - > sync ( ) ) ;
} else {
if ( lockFile . isError ( ) ) throw lockFile . getError ( ) ; // If we've failed to open the file, throw an exception
int64_t fileSize = wait ( lockFile . get ( ) - > size ( ) ) ;
state Key fileData = makeString ( fileSize ) ;
int length = wait ( lockFile . get ( ) - > read ( mutateString ( fileData ) , fileSize , 0 ) ) ;
processIDUid = BinaryReader : : fromStringRef < UID > ( fileData , IncludeVersion ( ) ) ;
}
} catch ( Error & e ) {
if ( e . code ( ) ! = error_code_actor_cancelled ) {
if ( ! e . isInjectedFault ( ) )
fprintf ( stderr , " ERROR: error creating or opening process id file `%s'. \n " , joinPath ( folder , " processId " ) . c_str ( ) ) ;
TraceEvent ( SevError , " OpenProcessIdError " ) . error ( e ) ;
}
throw ;
}
state Standalone < StringRef > processID = processIDUid . toString ( ) ;
state LocalityData locality = localities ;
locality . set ( LocalityData : : keyProcessId , processID ) ;
state WorkerInterface interf ( locality ) ;
if ( metricsPrefix . size ( ) > 0 ) {
if ( metricsConnFile . size ( ) > 0 ) {
try {
state Reference < Cluster > cluster = Cluster : : createCluster ( metricsConnFile , Cluster : : API_VERSION_LATEST ) ;
2017-09-02 03:53:01 +08:00
metricsLogger = runMetrics ( cluster - > createDatabase ( LiteralStringRef ( " DB " ) , locality ) , KeyRef ( metricsPrefix ) ) ;
2017-05-26 04:48:44 +08:00
} catch ( Error & e ) {
TraceEvent ( SevWarnAlways , " TDMetricsBadClusterFile " ) . error ( e ) . detail ( " ConnFile " , metricsConnFile ) ;
}
} else {
bool lockAware = metricsPrefix . size ( ) & & metricsPrefix [ 0 ] = = ' \xff ' ;
metricsLogger = runMetrics ( openDBOnServer ( dbInfo , TaskDefaultEndpoint , true , lockAware ) , KeyRef ( metricsPrefix ) ) ;
}
}
errorForwarders . add ( loadedPonger ( interf . debugPing . getFuture ( ) ) ) ;
errorForwarders . add ( waitFailureServer ( interf . waitFailure . getFuture ( ) ) ) ;
errorForwarders . add ( monitorServerDBInfo ( ccInterface , connFile , locality , dbInfo ) ) ;
2017-09-02 03:53:01 +08:00
errorForwarders . add ( testerServerCore ( interf . testerInterface , connFile , dbInfo , locality ) ) ;
2017-05-26 04:48:44 +08:00
filesClosed . add ( stopping . getFuture ( ) ) ;
initializeSystemMonitorMachineState ( SystemMonitorMachineState ( folder , locality . zoneId ( ) , locality . machineId ( ) , g_network - > getLocalAddress ( ) . ip ) ) ;
{
auto recruited = interf ; //ghetto! don't we all love a good #define
DUMPTOKEN ( recruited . clientInterface . reboot ) ;
2017-10-12 05:13:16 +08:00
DUMPTOKEN ( recruited . clientInterface . profiler ) ;
2017-05-26 04:48:44 +08:00
DUMPTOKEN ( recruited . tLog ) ;
DUMPTOKEN ( recruited . master ) ;
DUMPTOKEN ( recruited . masterProxy ) ;
DUMPTOKEN ( recruited . resolver ) ;
DUMPTOKEN ( recruited . storage ) ;
DUMPTOKEN ( recruited . debugQuery ) ;
DUMPTOKEN ( recruited . debugPing ) ;
DUMPTOKEN ( recruited . coordinationPing ) ;
DUMPTOKEN ( recruited . waitFailure ) ;
DUMPTOKEN ( recruited . setMetricsRate ) ;
DUMPTOKEN ( recruited . eventLogRequest ) ;
DUMPTOKEN ( recruited . traceBatchDumpRequest ) ;
}
try {
std : : vector < DiskStore > stores = getDiskStores ( folder ) ;
bool validateDataFiles = deleteFile ( joinPath ( folder , validationFilename ) ) ;
2017-09-16 01:57:58 +08:00
std : : vector < Future < Void > > recoveries ;
2017-05-26 04:48:44 +08:00
for ( int f = 0 ; f < stores . size ( ) ; f + + ) {
DiskStore s = stores [ f ] ;
// FIXME: Error handling
if ( s . storedComponent = = DiskStore : : Storage ) {
2017-05-27 08:43:28 +08:00
IKeyValueStore * kv = openKVStore ( s . storeType , s . filename , s . storeID , memoryLimit , false , validateDataFiles ) ;
2017-05-26 04:48:44 +08:00
Future < Void > kvClosed = kv - > onClosed ( ) ;
filesClosed . add ( kvClosed ) ;
StorageServerInterface recruited ;
recruited . uniqueID = s . storeID ;
recruited . locality = locality ;
recruited . initEndpoints ( ) ;
std : : map < std : : string , std : : string > details ;
details [ " StorageEngine " ] = s . storeType . toString ( ) ;
startRole ( recruited . id ( ) , interf . id ( ) , " StorageServer " , details , " Restored " ) ;
DUMPTOKEN ( recruited . getVersion ) ;
DUMPTOKEN ( recruited . getValue ) ;
DUMPTOKEN ( recruited . getKey ) ;
DUMPTOKEN ( recruited . getKeyValues ) ;
DUMPTOKEN ( recruited . getShardState ) ;
DUMPTOKEN ( recruited . waitMetrics ) ;
DUMPTOKEN ( recruited . splitMetrics ) ;
DUMPTOKEN ( recruited . getPhysicalMetrics ) ;
DUMPTOKEN ( recruited . waitFailure ) ;
DUMPTOKEN ( recruited . getQueuingMetrics ) ;
DUMPTOKEN ( recruited . getKeyValueStoreType ) ;
DUMPTOKEN ( recruited . watchValue ) ;
2017-09-16 01:57:58 +08:00
Promise < Void > recovery ;
Future < Void > f = storageServer ( kv , recruited , dbInfo , folder , recovery ) ;
recoveries . push_back ( recovery . getFuture ( ) ) ;
2017-05-26 04:48:44 +08:00
f = handleIOErrors ( f , kv , s . storeID , kvClosed ) ;
f = storageServerRollbackRebooter ( f , s . storeType , s . filename , recruited , dbInfo , folder , & filesClosed , memoryLimit ) ;
errorForwarders . add ( forwardError ( errors , " StorageServer " , recruited . id ( ) , f ) ) ;
} else if ( s . storedComponent = = DiskStore : : TLogData ) {
IKeyValueStore * kv = openKVStore ( s . storeType , s . filename , s . storeID , memoryLimit , validateDataFiles ) ;
IDiskQueue * queue = openDiskQueue (
2017-12-02 07:05:17 +08:00
joinPath ( folder , fileLogQueuePrefix . toString ( ) + s . storeID . toString ( ) + " - " ) , s . storeID , 10 * SERVER_KNOBS - > TARGET_BYTES_PER_TLOG ) ;
2017-05-26 04:48:44 +08:00
filesClosed . add ( kv - > onClosed ( ) ) ;
filesClosed . add ( queue - > onClosed ( ) ) ;
std : : map < std : : string , std : : string > details ;
details [ " StorageEngine " ] = s . storeType . toString ( ) ;
2017-06-23 08:21:42 +08:00
startRole ( s . storeID , interf . id ( ) , " SharedTLog " , details , " Restored " ) ;
2017-05-26 04:48:44 +08:00
Promise < Void > oldLog ;
2017-09-16 01:57:58 +08:00
Promise < Void > recovery ;
Future < Void > tl = tLog ( kv , queue , dbInfo , locality , tlog . isReady ( ) ? tlogRequests : PromiseStream < InitializeTLogRequest > ( ) , s . storeID , true , oldLog , recovery ) ;
recoveries . push_back ( recovery . getFuture ( ) ) ;
2017-05-26 04:48:44 +08:00
tl = handleIOErrors ( tl , kv , s . storeID ) ;
tl = handleIOErrors ( tl , queue , s . storeID ) ;
if ( tlog . isReady ( ) ) {
tlog = oldLog . getFuture ( ) | | tl ;
}
2017-06-23 08:21:42 +08:00
errorForwarders . add ( forwardError ( errors , " SharedTLog " , s . storeID , tl ) ) ;
2017-05-26 04:48:44 +08:00
}
}
std : : map < std : : string , std : : string > details ;
details [ " Locality " ] = locality . toString ( ) ;
details [ " DataFolder " ] = folder ;
details [ " StoresPresent " ] = format ( " %d " , stores . size ( ) ) ;
startRole ( interf . id ( ) , interf . id ( ) , " Worker " , details ) ;
2017-09-16 01:57:58 +08:00
Void _ = wait ( waitForAll ( recoveries ) ) ;
2017-11-15 05:57:37 +08:00
errorForwarders . add ( registrationClient ( ccInterface , interf , asyncProcessClass , initialClass , asyncIsExcluded ) ) ;
2017-09-16 01:57:58 +08:00
TraceEvent ( " RecoveriesComplete " , interf . id ( ) ) ;
2017-05-26 04:48:44 +08:00
loop choose {
when ( RebootRequest req = waitNext ( interf . clientInterface . reboot . getFuture ( ) ) ) {
state RebootRequest rebootReq = req ;
if ( rebootReq . checkData ) {
Reference < IAsyncFile > checkFile = wait ( IAsyncFileSystem : : filesystem ( ) - > open ( joinPath ( folder , validationFilename ) , IAsyncFile : : OPEN_CREATE | IAsyncFile : : OPEN_READWRITE , 0600 ) ) ;
Void _ = wait ( checkFile - > sync ( ) ) ;
}
2017-08-29 02:25:37 +08:00
2017-05-26 04:48:44 +08:00
if ( g_network - > isSimulated ( ) ) {
TraceEvent ( " SimulatedReboot " ) . detail ( " Deletion " , rebootReq . deleteData ) ;
if ( rebootReq . deleteData ) {
throw please_reboot_delete ( ) ;
}
throw please_reboot ( ) ;
}
else {
TraceEvent ( " ProcessReboot " ) ;
ASSERT ( ! rebootReq . deleteData ) ;
flushAndExit ( 0 ) ;
}
}
2017-10-12 05:13:16 +08:00
when ( ProfilerRequest req = waitNext ( interf . clientInterface . profiler . getFuture ( ) ) ) {
2017-10-17 07:04:09 +08:00
state ProfilerRequest profilerReq = req ;
// There really isn't a great "filepath sanitizer" or "filepath escape" function available,
// thus we instead enforce a different requirement. One can only write to a file that's
// beneath the working directory, and we remove the ability to do any symlink or ../..
// tricks by resolving all paths through `abspath` first.
try {
2017-10-13 08:49:41 +08:00
std : : string realLogDir = abspath ( SERVER_KNOBS - > LOG_DIRECTORY ) ;
std : : string realOutPath = abspath ( realLogDir + " / " + profilerReq . outputFile . toString ( ) ) ;
if ( realLogDir . size ( ) < realOutPath . size ( ) & &
strncmp ( realLogDir . c_str ( ) , realOutPath . c_str ( ) , realLogDir . size ( ) ) = = 0 ) {
profilerReq . outputFile = realOutPath ;
uncancellable ( runProfiler ( profilerReq ) ) ;
2017-10-17 07:04:09 +08:00
profilerReq . reply . send ( Void ( ) ) ;
} else {
profilerReq . reply . sendError ( client_invalid_operation ( ) ) ;
}
} catch ( Error & e ) {
profilerReq . reply . sendError ( e ) ;
}
2017-07-13 15:27:56 +08:00
}
2017-05-26 04:48:44 +08:00
when ( RecruitMasterRequest req = waitNext ( interf . master . getFuture ( ) ) ) {
MasterInterface recruited ;
recruited . locality = locality ;
recruited . initEndpoints ( ) ;
startRole ( recruited . id ( ) , interf . id ( ) , " MasterServer " ) ;
DUMPTOKEN ( recruited . waitFailure ) ;
DUMPTOKEN ( recruited . getRateInfo ) ;
DUMPTOKEN ( recruited . tlogRejoin ) ;
DUMPTOKEN ( recruited . changeCoordinators ) ;
DUMPTOKEN ( recruited . getCommitVersion ) ;
//printf("Recruited as masterServer\n");
Future < Void > masterProcess = masterServer ( recruited , dbInfo , ServerCoordinators ( connFile ) , req . lifetime ) ;
errorForwarders . add ( zombie ( recruited , forwardError ( errors , " MasterServer " , recruited . id ( ) , masterProcess ) ) ) ;
req . reply . send ( recruited ) ;
}
when ( InitializeTLogRequest req = waitNext ( interf . tLog . getFuture ( ) ) ) {
tlogRequests . send ( req ) ;
if ( tlog . isReady ( ) ) {
UID logId = g_random - > randomUniqueID ( ) ;
std : : map < std : : string , std : : string > details ;
details [ " ForMaster " ] = req . recruitmentID . shortString ( ) ;
details [ " StorageEngine " ] = req . storeType . toString ( ) ;
2017-08-29 02:25:37 +08:00
2017-05-26 04:48:44 +08:00
//FIXME: start role for every tlog instance, rather that just for the shared actor, also use a different role type for the shared actor
2017-06-23 08:21:42 +08:00
startRole ( logId , interf . id ( ) , " SharedTLog " , details ) ;
2017-05-26 04:48:44 +08:00
std : : string filename = filenameFromId ( req . storeType , folder , fileLogDataPrefix . toString ( ) , logId ) ;
IKeyValueStore * data = openKVStore ( req . storeType , filename , logId , memoryLimit ) ;
IDiskQueue * queue = openDiskQueue ( joinPath ( folder , fileLogQueuePrefix . toString ( ) + logId . toString ( ) + " - " ) , logId ) ;
filesClosed . add ( data - > onClosed ( ) ) ;
filesClosed . add ( queue - > onClosed ( ) ) ;
2017-09-16 01:57:58 +08:00
tlog = tLog ( data , queue , dbInfo , locality , tlogRequests , logId , false , Promise < Void > ( ) , Promise < Void > ( ) ) ;
2017-05-26 04:48:44 +08:00
tlog = handleIOErrors ( tlog , data , logId ) ;
tlog = handleIOErrors ( tlog , queue , logId ) ;
2017-06-23 08:21:42 +08:00
errorForwarders . add ( forwardError ( errors , " SharedTLog " , logId , tlog ) ) ;
2017-05-26 04:48:44 +08:00
}
}
when ( InitializeStorageRequest req = waitNext ( interf . storage . getFuture ( ) ) ) {
if ( ! storageCache . exists ( req . reqId ) ) {
StorageServerInterface recruited ( req . interfaceId ) ;
recruited . locality = locality ;
recruited . initEndpoints ( ) ;
std : : map < std : : string , std : : string > details ;
details [ " StorageEngine " ] = req . storeType . toString ( ) ;
startRole ( recruited . id ( ) , interf . id ( ) , " StorageServer " , details ) ;
DUMPTOKEN ( recruited . getVersion ) ;
DUMPTOKEN ( recruited . getValue ) ;
DUMPTOKEN ( recruited . getKey ) ;
DUMPTOKEN ( recruited . getKeyValues ) ;
DUMPTOKEN ( recruited . getShardState ) ;
DUMPTOKEN ( recruited . waitMetrics ) ;
DUMPTOKEN ( recruited . splitMetrics ) ;
DUMPTOKEN ( recruited . getPhysicalMetrics ) ;
DUMPTOKEN ( recruited . waitFailure ) ;
DUMPTOKEN ( recruited . getQueuingMetrics ) ;
DUMPTOKEN ( recruited . getKeyValueStoreType ) ;
DUMPTOKEN ( recruited . watchValue ) ;
//printf("Recruited as storageServer\n");
std : : string filename = filenameFromId ( req . storeType , folder , fileStoragePrefix . toString ( ) , recruited . id ( ) ) ;
IKeyValueStore * data = openKVStore ( req . storeType , filename , recruited . id ( ) , memoryLimit ) ;
Future < Void > kvClosed = data - > onClosed ( ) ;
filesClosed . add ( kvClosed ) ;
ReplyPromise < StorageServerInterface > storageReady = req . reply ;
storageCache . set ( req . reqId , storageReady . getFuture ( ) ) ;
Future < Void > s = storageServer ( data , recruited , req . seedTag , storageReady , dbInfo , folder ) ;
s = handleIOErrors ( s , data , recruited . id ( ) , kvClosed ) ;
s = storageCache . removeOnReady ( req . reqId , s ) ;
s = storageServerRollbackRebooter ( s , req . storeType , filename , recruited , dbInfo , folder , & filesClosed , memoryLimit ) ;
errorForwarders . add ( forwardError ( errors , " StorageServer " , recruited . id ( ) , s ) ) ;
} else
forwardPromise ( req . reply , storageCache . get ( req . reqId ) ) ;
}
when ( InitializeMasterProxyRequest req = waitNext ( interf . masterProxy . getFuture ( ) ) ) {
MasterProxyInterface recruited ;
recruited . locality = locality ;
recruited . initEndpoints ( ) ;
std : : map < std : : string , std : : string > details ;
details [ " ForMaster " ] = req . master . id ( ) . shortString ( ) ;
startRole ( recruited . id ( ) , interf . id ( ) , " MasterProxyServer " , details ) ;
DUMPTOKEN ( recruited . commit ) ;
DUMPTOKEN ( recruited . getConsistentReadVersion ) ;
DUMPTOKEN ( recruited . getKeyServersLocations ) ;
DUMPTOKEN ( recruited . getStorageServerRejoinInfo ) ;
DUMPTOKEN ( recruited . waitFailure ) ;
DUMPTOKEN ( recruited . getRawCommittedVersion ) ;
DUMPTOKEN ( recruited . txnState ) ;
//printf("Recruited as masterProxyServer\n");
errorForwarders . add ( zombie ( recruited , forwardError ( errors , " MasterProxyServer " , recruited . id ( ) ,
masterProxyServer ( recruited , req , dbInfo ) ) ) ) ;
req . reply . send ( recruited ) ;
}
when ( InitializeResolverRequest req = waitNext ( interf . resolver . getFuture ( ) ) ) {
ResolverInterface recruited ;
recruited . locality = locality ;
recruited . initEndpoints ( ) ;
std : : map < std : : string , std : : string > details ;
startRole ( recruited . id ( ) , interf . id ( ) , " Resolver " , details ) ;
DUMPTOKEN ( recruited . resolve ) ;
DUMPTOKEN ( recruited . metrics ) ;
DUMPTOKEN ( recruited . split ) ;
DUMPTOKEN ( recruited . waitFailure ) ;
errorForwarders . add ( zombie ( recruited , forwardError ( errors , " Resolver " , recruited . id ( ) ,
resolver ( recruited , req , dbInfo ) ) ) ) ;
req . reply . send ( recruited ) ;
}
when ( DebugQueryRequest req = waitNext ( interf . debugQuery . getFuture ( ) ) ) {
errorForwarders . add ( forwardError ( errors , " DebugQuery " , UID ( ) , debugQueryServer ( req ) ) ) ;
}
when ( CoordinationPingMessage m = waitNext ( interf . coordinationPing . getFuture ( ) ) ) {
TraceEvent ( " CoordinationPing " , interf . id ( ) ) . detail ( " CCID " , m . clusterControllerId ) . detail ( " TimeStep " , m . timeStep ) ;
}
when ( SetMetricsLogRateRequest req = waitNext ( interf . setMetricsRate . getFuture ( ) ) ) {
TraceEvent ( " LoggingRateChange " , interf . id ( ) ) . detail ( " OldDelay " , loggingDelay ) . detail ( " NewLogPS " , req . metricsLogsPerSecond ) ;
if ( req . metricsLogsPerSecond ! = 0 ) {
loggingDelay = 1.0 / req . metricsLogsPerSecond ;
loggingTrigger = Void ( ) ;
}
}
when ( EventLogRequest req = waitNext ( interf . eventLogRequest . getFuture ( ) ) ) {
Standalone < StringRef > e ;
if ( req . getLastError )
e = StringRef ( latestEventCache . getLatestError ( ) ) ;
else
e = StringRef ( latestEventCache . get ( req . eventName . toString ( ) ) ) ;
req . reply . send ( e ) ;
}
when ( TraceBatchDumpRequest req = waitNext ( interf . traceBatchDumpRequest . getFuture ( ) ) ) {
g_traceBatch . dump ( ) ;
req . reply . send ( Void ( ) ) ;
}
when ( DiskStoreRequest req = waitNext ( interf . diskStoreRequest . getFuture ( ) ) ) {
Standalone < VectorRef < UID > > ids ;
for ( DiskStore d : getDiskStores ( folder ) ) {
bool included = true ;
if ( ! req . includePartialStores ) {
if ( d . storeType = = KeyValueStoreType : : SSD_BTREE_V1 ) {
included = fileExists ( d . filename + " .fdb-wal " ) ;
}
else if ( d . storeType = = KeyValueStoreType : : SSD_BTREE_V2 ) {
included = fileExists ( d . filename + " .sqlite-wal " ) ;
}
else {
ASSERT ( d . storeType = = KeyValueStoreType : : MEMORY ) ;
included = fileExists ( d . filename + " 1.fdq " ) ;
}
if ( d . storedComponent = = DiskStore : : COMPONENT : : TLogData & & included ) {
std : : string basename = fileLogQueuePrefix . toString ( ) + d . filename . substr ( fileLogDataPrefix . size ( ) ) ;
included = fileExists ( basename + " 0.fdq " ) & & fileExists ( basename + " 1.fdq " ) ;
}
}
if ( included ) {
ids . push_back ( ids . arena ( ) , d . storeID ) ;
}
}
req . reply . send ( ids ) ;
}
when ( Void _ = wait ( loggingTrigger ) ) {
systemMonitor ( ) ;
loggingTrigger = delay ( loggingDelay , TaskFlushTrace ) ;
}
when ( Void _ = wait ( errorForwarders . getResult ( ) ) ) { }
2017-05-27 08:43:28 +08:00
when ( Void _ = wait ( handleErrors ) ) { }
2017-05-26 04:48:44 +08:00
}
} catch ( Error & err ) {
state Error e = err ;
bool ok = e . code ( ) = = error_code_please_reboot | | e . code ( ) = = error_code_actor_cancelled | | e . code ( ) = = error_code_please_reboot_delete ;
endRole ( interf . id ( ) , " Worker " , " WorkerError " , ok , e ) ;
errorForwarders . clear ( false ) ;
2017-06-03 08:56:47 +08:00
tlog = Void ( ) ;
2017-05-26 04:48:44 +08:00
if ( e . code ( ) ! = error_code_actor_cancelled ) { // We get cancelled e.g. when an entire simulation times out, but in that case we won't be restarted and don't need to wait for shutdown
stopping . send ( Void ( ) ) ;
Void _ = wait ( filesClosed . getResult ( ) ) ; // Wait for complete shutdown of KV stores
Void _ = wait ( delay ( 0.0 ) ) ; //Unwind the callstack to make sure that IAsyncFile references are all gone
TraceEvent ( SevInfo , " WorkerShutdownComplete " , interf . id ( ) ) ;
}
throw e ;
}
}
ACTOR Future < Void > extractClusterInterface ( Reference < AsyncVar < Optional < ClusterControllerFullInterface > > > a , Reference < AsyncVar < Optional < ClusterInterface > > > b ) {
loop {
if ( a - > get ( ) . present ( ) )
b - > set ( a - > get ( ) . get ( ) . clientInterface ) ;
else
b - > set ( Optional < ClusterInterface > ( ) ) ;
Void _ = wait ( a - > onChange ( ) ) ;
}
}
static std : : set < int > const & normalWorkerErrors ( ) {
static std : : set < int > s ;
if ( s . empty ( ) ) {
s . insert ( error_code_please_reboot ) ;
s . insert ( error_code_please_reboot_delete ) ;
}
return s ;
}
ACTOR Future < Void > fileNotFoundToNever ( Future < Void > f ) {
try {
Void _ = wait ( f ) ;
return Void ( ) ;
}
catch ( Error & e ) {
if ( e . code ( ) = = error_code_file_not_found ) {
TraceEvent ( SevWarn , " ClusterCoordinatorFailed " ) . error ( e ) ;
return Never ( ) ;
}
throw ;
}
}
ACTOR Future < Void > printTimeout ( ) {
Void _ = wait ( delay ( 5 ) ) ;
if ( ! g_network - > isSimulated ( ) ) {
fprintf ( stderr , " Warning: FDBD has not joined the cluster after 5 seconds. \n " ) ;
fprintf ( stderr , " Check configuration and availability using the 'status' command with the fdbcli \n " ) ;
}
return Void ( ) ;
}
ACTOR Future < Void > printOnFirstConnected ( Reference < AsyncVar < Optional < ClusterInterface > > > ci ) {
state Future < Void > timeoutFuture = printTimeout ( ) ;
loop {
choose {
when ( Void _ = wait ( ci - > get ( ) . present ( ) ? IFailureMonitor : : failureMonitor ( ) . onStateEqual ( ci - > get ( ) . get ( ) . openDatabase . getEndpoint ( ) , FailureStatus ( false ) ) : Never ( ) ) ) {
printf ( " FDBD joined cluster. \n " ) ;
TraceEvent ( " FDBDConnected " ) ;
return Void ( ) ;
}
when ( Void _ = wait ( ci - > onChange ( ) ) ) { }
}
}
}
ACTOR Future < Void > fdbd (
Reference < ClusterConnectionFile > connFile ,
LocalityData localities ,
ProcessClass processClass ,
std : : string dataFolder ,
std : : string coordFolder ,
int64_t memoryLimit ,
std : : string metricsConnFile ,
std : : string metricsPrefix )
{
2017-05-27 08:43:28 +08:00
try {
ServerCoordinators coordinators ( connFile ) ;
TraceEvent ( " StartingFDBD " ) . detailext ( " ZoneID " , localities . zoneId ( ) ) . detailext ( " machineId " , localities . machineId ( ) ) . detail ( " DiskPath " , dataFolder ) . detail ( " CoordPath " , coordFolder ) ;
// SOMEDAY: start the services on the machine in a staggered fashion in simulation?
Reference < AsyncVar < Optional < ClusterControllerFullInterface > > > cc ( new AsyncVar < Optional < ClusterControllerFullInterface > > ) ;
Reference < AsyncVar < Optional < ClusterInterface > > > ci ( new AsyncVar < Optional < ClusterInterface > > ) ;
2017-08-29 05:41:04 +08:00
Reference < AsyncVar < ProcessClass > > asyncProcessClass ( new AsyncVar < ProcessClass > ( ProcessClass ( processClass . classType ( ) , ProcessClass : : CommandLineSource ) ) ) ;
2017-11-15 05:57:37 +08:00
Reference < AsyncVar < bool > > asyncIsExcluded ( new AsyncVar < bool > ( false ) ) ;
2017-05-27 08:43:28 +08:00
vector < Future < Void > > v ;
if ( coordFolder . size ( ) )
v . push_back ( fileNotFoundToNever ( coordinationServer ( coordFolder ) ) ) ; //SOMEDAY: remove the fileNotFound wrapper and make DiskQueue construction safe from errors setting up their files
2017-11-15 05:57:37 +08:00
v . push_back ( reportErrors ( processClass = = ProcessClass : : TesterClass ? monitorLeader ( connFile , cc ) : clusterController ( connFile , cc , asyncProcessClass , asyncIsExcluded ) , " clusterController " ) ) ;
2017-05-27 08:43:28 +08:00
v . push_back ( reportErrors ( extractClusterInterface ( cc , ci ) , " extractClusterInterface " ) ) ;
v . push_back ( reportErrors ( failureMonitorClient ( ci , true ) , " failureMonitorClient " ) ) ;
2017-11-15 05:57:37 +08:00
v . push_back ( reportErrorsExcept ( workerServer ( connFile , cc , localities , asyncProcessClass , processClass , asyncIsExcluded , dataFolder , memoryLimit , metricsConnFile , metricsPrefix ) , " workerServer " , UID ( ) , & normalWorkerErrors ( ) ) ) ;
2017-05-27 08:43:28 +08:00
state Future < Void > firstConnect = reportErrors ( printOnFirstConnected ( ci ) , " ClusterFirstConnectedError " ) ;
Void _ = wait ( quorum ( v , 1 ) ) ;
ASSERT ( false ) ; // None of these actors should terminate normally
throw internal_error ( ) ;
} catch ( Error & e ) {
Error err = checkIOTimeout ( e ) ;
throw err ;
}
2017-05-26 04:48:44 +08:00
}