2017-05-26 04:48:44 +08:00
/*
* ThreadSafeTransaction . actor . cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013 - 2018 Apple Inc . and the FoundationDB project authors
2018-02-22 02:25:11 +08:00
*
2017-05-26 04:48:44 +08:00
* Licensed under the Apache License , Version 2.0 ( the " License " ) ;
* you may not use this file except in compliance with the License .
* You may obtain a copy of the License at
2018-02-22 02:25:11 +08:00
*
2017-05-26 04:48:44 +08:00
* http : //www.apache.org/licenses/LICENSE-2.0
2018-02-22 02:25:11 +08:00
*
2017-05-26 04:48:44 +08:00
* Unless required by applicable law or agreed to in writing , software
* distributed under the License is distributed on an " AS IS " BASIS ,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND , either express or implied .
* See the License for the specific language governing permissions and
* limitations under the License .
*/
2018-10-20 01:30:13 +08:00
# include "fdbclient/ThreadSafeTransaction.h"
# include "fdbclient/ReadYourWrites.h"
# include "fdbclient/DatabaseContext.h"
2019-02-09 04:24:32 +08:00
# if defined(CMAKE_BUILD) || !defined(WIN32)
2019-02-07 10:16:54 +08:00
# include "versions.h"
2019-02-09 04:24:32 +08:00
# endif
2017-05-26 04:48:44 +08:00
# include <new>
// Users of ThreadSafeTransaction might share Reference<ThreadSafe...> between different threads as long as they don't call addRef (e.g. C API follows this).
// Therefore, it is unsafe to call (explicitly or implicitly) this->addRef in any of these functions.
2018-09-22 06:58:14 +08:00
ThreadFuture < Void > ThreadSafeDatabase : : onConnected ( ) {
2019-03-15 01:26:22 +08:00
DatabaseContext * db = this - > db ;
return onMainThread ( [ db ] ( ) - > Future < Void > {
2018-09-22 06:58:14 +08:00
db - > checkDeferredError ( ) ;
return db - > onConnected ( ) ;
2017-05-26 04:48:44 +08:00
} ) ;
}
ThreadFuture < Reference < IDatabase > > ThreadSafeDatabase : : createFromExistingDatabase ( Database db ) {
return onMainThread ( [ db ] ( ) {
db - > checkDeferredError ( ) ;
2018-09-27 01:27:55 +08:00
DatabaseContext * cx = db . getPtr ( ) ;
cx - > addref ( ) ;
return Future < Reference < IDatabase > > ( Reference < IDatabase > ( new ThreadSafeDatabase ( cx ) ) ) ;
2018-09-22 06:58:14 +08:00
} ) ;
}
2017-05-26 04:48:44 +08:00
Reference < ITransaction > ThreadSafeDatabase : : createTransaction ( ) {
2019-03-26 07:11:50 +08:00
return Reference < ITransaction > ( new ThreadSafeTransaction ( db ) ) ;
2017-05-26 04:48:44 +08:00
}
void ThreadSafeDatabase : : setOption ( FDBDatabaseOptions : : Option option , Optional < StringRef > value ) {
2019-07-03 06:42:53 +08:00
auto itr = FDBDatabaseOptions : : optionInfo . find ( option ) ;
if ( itr ! = FDBDatabaseOptions : : optionInfo . end ( ) ) {
TraceEvent ( " SetDatabaseOption " ) . detail ( " Option " , itr - > second . name ) ;
}
else {
TraceEvent ( " UnknownDatabaseOption " ) . detail ( " Option " , option ) ;
throw invalid_option ( ) ;
}
2019-03-15 01:26:22 +08:00
DatabaseContext * db = this - > db ;
2017-05-26 04:48:44 +08:00
Standalone < Optional < StringRef > > passValue = value ;
2019-06-29 04:24:32 +08:00
// ThreadSafeDatabase is not allowed to do anything with options except pass them through to RYW.
2019-03-15 01:26:22 +08:00
onMainThreadVoid ( [ db , option , passValue ] ( ) {
db - > checkDeferredError ( ) ;
db - > setOption ( option , passValue . contents ( ) ) ;
} , & db - > deferredError ) ;
2018-09-27 01:27:55 +08:00
}
ThreadSafeDatabase : : ThreadSafeDatabase ( std : : string connFilename , int apiVersion ) {
2019-07-20 02:16:30 +08:00
ClusterConnectionFile * connFile = new ClusterConnectionFile ( ClusterConnectionFile : : lookupClusterFileName ( connFilename ) . first ) ;
2019-03-15 01:26:22 +08:00
// Allocate memory for the Database from this thread (so the pointer is known for subsequent method calls)
// but run its constructor on the main thread
DatabaseContext * db = this - > db = DatabaseContext : : allocateOnForeignThread ( ) ;
onMainThreadVoid ( [ db , connFile , apiVersion ] ( ) {
2018-10-03 06:28:46 +08:00
try {
2019-07-20 02:16:30 +08:00
Database : : createDatabase ( Reference < ClusterConnectionFile > ( connFile ) , apiVersion , false , LocalityData ( ) , db ) . extractPtr ( ) ;
2018-10-03 06:28:46 +08:00
}
catch ( Error & e ) {
2019-03-15 01:26:22 +08:00
new ( db ) DatabaseContext ( e ) ;
2018-10-03 06:28:46 +08:00
}
catch ( . . . ) {
2019-03-15 01:26:22 +08:00
new ( db ) DatabaseContext ( unknown_error ( ) ) ;
2018-10-03 06:28:46 +08:00
}
2018-09-27 01:27:55 +08:00
} , NULL ) ;
2017-05-26 04:48:44 +08:00
}
ThreadSafeDatabase : : ~ ThreadSafeDatabase ( ) {
2019-01-11 06:53:42 +08:00
DatabaseContext * db = this - > db ;
onMainThreadVoid ( [ db ] ( ) { db - > delref ( ) ; } , NULL ) ;
2017-05-26 04:48:44 +08:00
}
2019-03-26 07:11:50 +08:00
ThreadSafeTransaction : : ThreadSafeTransaction ( DatabaseContext * cx ) {
2017-05-26 04:48:44 +08:00
// Allocate memory for the transaction from this thread (so the pointer is known for subsequent method calls)
// but run its constructor on the main thread
// It looks strange that the DatabaseContext::addref is deferred by the onMainThreadVoid call, but it is safe
// because the reference count of the DatabaseContext is solely managed from the main thread. If cx is destructed
// immediately after this call, it will defer the DatabaseContext::delref (and onMainThread preserves the order of
// these operations).
ReadYourWritesTransaction * tr = this - > tr = ReadYourWritesTransaction : : allocateOnForeignThread ( ) ;
// No deferred error -- if the construction of the RYW transaction fails, we have no where to put it
2019-03-26 07:11:50 +08:00
onMainThreadVoid (
[ tr , cx ] ( ) {
cx - > addref ( ) ;
new ( tr ) ReadYourWritesTransaction ( Database ( cx ) ) ;
} ,
NULL ) ;
2017-05-26 04:48:44 +08:00
}
ThreadSafeTransaction : : ~ ThreadSafeTransaction ( ) {
ReadYourWritesTransaction * tr = this - > tr ;
if ( tr )
onMainThreadVoid ( [ tr ] ( ) { tr - > delref ( ) ; } , NULL ) ;
}
void ThreadSafeTransaction : : cancel ( ) {
ReadYourWritesTransaction * tr = this - > tr ;
onMainThreadVoid ( [ tr ] ( ) { tr - > cancel ( ) ; } , NULL ) ;
}
void ThreadSafeTransaction : : setVersion ( Version v ) {
ReadYourWritesTransaction * tr = this - > tr ;
2018-09-22 06:58:14 +08:00
onMainThreadVoid ( [ tr , v ] ( ) { tr - > setVersion ( v ) ; } , & tr - > deferredError ) ;
2017-05-26 04:48:44 +08:00
}
ThreadFuture < Version > ThreadSafeTransaction : : getReadVersion ( ) {
ReadYourWritesTransaction * tr = this - > tr ;
return onMainThread ( [ tr ] ( ) - > Future < Version > {
tr - > checkDeferredError ( ) ;
return tr - > getReadVersion ( ) ;
} ) ;
}
ThreadFuture < Optional < Value > > ThreadSafeTransaction : : get ( const KeyRef & key , bool snapshot ) {
Key k = key ;
ReadYourWritesTransaction * tr = this - > tr ;
return onMainThread ( [ tr , k , snapshot ] ( ) - > Future < Optional < Value > > {
tr - > checkDeferredError ( ) ;
return tr - > get ( k , snapshot ) ;
} ) ;
}
ThreadFuture < Key > ThreadSafeTransaction : : getKey ( const KeySelectorRef & key , bool snapshot ) {
KeySelector k = key ;
ReadYourWritesTransaction * tr = this - > tr ;
return onMainThread ( [ tr , k , snapshot ] ( ) - > Future < Key > {
tr - > checkDeferredError ( ) ;
return tr - > getKey ( k , snapshot ) ;
} ) ;
}
ThreadFuture < Standalone < RangeResultRef > > ThreadSafeTransaction : : getRange ( const KeySelectorRef & begin , const KeySelectorRef & end , int limit , bool snapshot , bool reverse ) {
KeySelector b = begin ;
KeySelector e = end ;
ReadYourWritesTransaction * tr = this - > tr ;
return onMainThread ( [ tr , b , e , limit , snapshot , reverse ] ( ) - > Future < Standalone < RangeResultRef > > {
tr - > checkDeferredError ( ) ;
return tr - > getRange ( b , e , limit , snapshot , reverse ) ;
} ) ;
}
ThreadFuture < Standalone < RangeResultRef > > ThreadSafeTransaction : : getRange ( const KeySelectorRef & begin , const KeySelectorRef & end , GetRangeLimits limits , bool snapshot , bool reverse ) {
KeySelector b = begin ;
KeySelector e = end ;
ReadYourWritesTransaction * tr = this - > tr ;
return onMainThread ( [ tr , b , e , limits , snapshot , reverse ] ( ) - > Future < Standalone < RangeResultRef > > {
tr - > checkDeferredError ( ) ;
return tr - > getRange ( b , e , limits , snapshot , reverse ) ;
} ) ;
}
ThreadFuture < Standalone < VectorRef < const char * > > > ThreadSafeTransaction : : getAddressesForKey ( const KeyRef & key ) {
Key k = key ;
ReadYourWritesTransaction * tr = this - > tr ;
return onMainThread ( [ tr , k ] ( ) - > Future < Standalone < VectorRef < const char * > > > {
tr - > checkDeferredError ( ) ;
return tr - > getAddressesForKey ( k ) ;
} ) ;
}
void ThreadSafeTransaction : : addReadConflictRange ( const KeyRangeRef & keys ) {
KeyRange r = keys ;
ReadYourWritesTransaction * tr = this - > tr ;
2018-09-22 06:58:14 +08:00
onMainThreadVoid ( [ tr , r ] ( ) { tr - > addReadConflictRange ( r ) ; } , & tr - > deferredError ) ;
2017-05-26 04:48:44 +08:00
}
void ThreadSafeTransaction : : makeSelfConflicting ( ) {
ReadYourWritesTransaction * tr = this - > tr ;
2018-09-22 06:58:14 +08:00
onMainThreadVoid ( [ tr ] ( ) { tr - > makeSelfConflicting ( ) ; } , & tr - > deferredError ) ;
2017-05-26 04:48:44 +08:00
}
void ThreadSafeTransaction : : atomicOp ( const KeyRef & key , const ValueRef & value , uint32_t operationType ) {
Key k = key ;
Value v = value ;
ReadYourWritesTransaction * tr = this - > tr ;
2018-09-22 06:58:14 +08:00
onMainThreadVoid ( [ tr , k , v , operationType ] ( ) { tr - > atomicOp ( k , v , operationType ) ; } , & tr - > deferredError ) ;
2017-05-26 04:48:44 +08:00
}
void ThreadSafeTransaction : : set ( const KeyRef & key , const ValueRef & value ) {
Key k = key ;
Value v = value ;
ReadYourWritesTransaction * tr = this - > tr ;
2018-09-22 06:58:14 +08:00
onMainThreadVoid ( [ tr , k , v ] ( ) { tr - > set ( k , v ) ; } , & tr - > deferredError ) ;
2017-05-26 04:48:44 +08:00
}
void ThreadSafeTransaction : : clear ( const KeyRangeRef & range ) {
KeyRange r = range ;
ReadYourWritesTransaction * tr = this - > tr ;
2018-09-22 06:58:14 +08:00
onMainThreadVoid ( [ tr , r ] ( ) { tr - > clear ( r ) ; } , & tr - > deferredError ) ;
2017-05-26 04:48:44 +08:00
}
void ThreadSafeTransaction : : clear ( const KeyRef & begin , const KeyRef & end ) {
Key b = begin ;
Key e = end ;
ReadYourWritesTransaction * tr = this - > tr ;
onMainThreadVoid ( [ tr , b , e ] ( ) {
if ( b > e )
throw inverted_range ( ) ;
tr - > clear ( KeyRangeRef ( b , e ) ) ;
2018-09-22 06:58:14 +08:00
} , & tr - > deferredError ) ;
2017-05-26 04:48:44 +08:00
}
void ThreadSafeTransaction : : clear ( const KeyRef & key ) {
Key k = key ;
ReadYourWritesTransaction * tr = this - > tr ;
2018-09-22 06:58:14 +08:00
onMainThreadVoid ( [ tr , k ] ( ) { tr - > clear ( k ) ; } , & tr - > deferredError ) ;
2017-05-26 04:48:44 +08:00
}
ThreadFuture < Void > ThreadSafeTransaction : : watch ( const KeyRef & key ) {
Key k = key ;
ReadYourWritesTransaction * tr = this - > tr ;
return onMainThread ( [ tr , k ] ( ) - > Future < Void > {
tr - > checkDeferredError ( ) ;
return tr - > watch ( k ) ;
} ) ;
}
void ThreadSafeTransaction : : addWriteConflictRange ( const KeyRangeRef & keys ) {
KeyRange r = keys ;
ReadYourWritesTransaction * tr = this - > tr ;
2018-09-22 06:58:14 +08:00
onMainThreadVoid ( [ tr , r ] ( ) { tr - > addWriteConflictRange ( r ) ; } , & tr - > deferredError ) ;
2017-05-26 04:48:44 +08:00
}
ThreadFuture < Void > ThreadSafeTransaction : : commit ( ) {
ReadYourWritesTransaction * tr = this - > tr ;
return onMainThread ( [ tr ] ( ) - > Future < Void > {
tr - > checkDeferredError ( ) ;
return tr - > commit ( ) ;
} ) ;
}
Version ThreadSafeTransaction : : getCommittedVersion ( ) {
// This should be thread safe when called legally, but it is fragile
2019-06-26 07:32:27 +08:00
return tr - > getCommittedVersion ( ) ;
}
2019-06-29 01:15:37 +08:00
ThreadFuture < int64_t > ThreadSafeTransaction : : getApproximateSize ( ) {
2019-07-06 01:09:13 +08:00
ReadYourWritesTransaction * tr = this - > tr ;
return onMainThread ( [ tr ] ( ) - > Future < int64_t > { return tr - > getApproximateSize ( ) ; } ) ;
2017-05-26 04:48:44 +08:00
}
ThreadFuture < Standalone < StringRef > > ThreadSafeTransaction : : getVersionstamp ( ) {
ReadYourWritesTransaction * tr = this - > tr ;
return onMainThread ( [ tr ] ( ) - > Future < Standalone < StringRef > > {
return tr - > getVersionstamp ( ) ;
} ) ;
}
void ThreadSafeTransaction : : setOption ( FDBTransactionOptions : : Option option , Optional < StringRef > value ) {
2019-07-11 09:48:54 +08:00
auto itr = FDBTransactionOptions : : optionInfo . find ( option ) ;
if ( itr = = FDBTransactionOptions : : optionInfo . end ( ) ) {
TraceEvent ( " UnknownTransactionOption " ) . detail ( " Option " , option ) ;
throw invalid_option ( ) ;
}
2017-05-26 04:48:44 +08:00
ReadYourWritesTransaction * tr = this - > tr ;
Standalone < Optional < StringRef > > passValue = value ;
2019-06-29 04:24:32 +08:00
// ThreadSafeTransaction is not allowed to do anything with options except pass them through to RYW.
2018-09-22 06:58:14 +08:00
onMainThreadVoid ( [ tr , option , passValue ] ( ) { tr - > setOption ( option , passValue . contents ( ) ) ; } , & tr - > deferredError ) ;
2017-05-26 04:48:44 +08:00
}
ThreadFuture < Void > ThreadSafeTransaction : : checkDeferredError ( ) {
ReadYourWritesTransaction * tr = this - > tr ;
return onMainThread ( [ tr ] ( ) {
try {
tr - > checkDeferredError ( ) ;
} catch ( Error & e ) {
2018-09-22 06:58:14 +08:00
tr - > deferredError = Error ( ) ;
2017-05-26 04:48:44 +08:00
return Future < Void > ( e ) ;
}
return Future < Void > ( Void ( ) ) ;
} ) ;
}
ThreadFuture < Void > ThreadSafeTransaction : : onError ( Error const & e ) {
ReadYourWritesTransaction * tr = this - > tr ;
return onMainThread ( [ tr , e ] ( ) { return tr - > onError ( e ) ; } ) ;
}
2019-01-26 08:49:59 +08:00
void ThreadSafeTransaction : : operator = ( ThreadSafeTransaction & & r ) BOOST_NOEXCEPT {
2017-05-26 04:48:44 +08:00
tr = r . tr ;
r . tr = NULL ;
}
2019-01-26 08:49:59 +08:00
ThreadSafeTransaction : : ThreadSafeTransaction ( ThreadSafeTransaction & & r ) BOOST_NOEXCEPT {
2017-05-26 04:48:44 +08:00
tr = r . tr ;
r . tr = NULL ;
}
void ThreadSafeTransaction : : reset ( ) {
ReadYourWritesTransaction * tr = this - > tr ;
onMainThreadVoid ( [ tr ] ( ) { tr - > reset ( ) ; } , NULL ) ;
}
2019-11-16 04:26:51 +08:00
extern const char * getSourceVersion ( ) ;
2017-05-26 04:48:44 +08:00
2019-11-16 04:26:51 +08:00
ThreadSafeApi : : ThreadSafeApi ( ) : apiVersion ( - 1 ) , clientVersion ( format ( " %s,%s,%llx " , FDB_VT_VERSION , getSourceVersion ( ) , currentProtocolVersion ) ) , transportId ( 0 ) { }
2017-05-26 04:48:44 +08:00
void ThreadSafeApi : : selectApiVersion ( int apiVersion ) {
this - > apiVersion = apiVersion ;
}
const char * ThreadSafeApi : : getClientVersion ( ) {
// There is only one copy of the ThreadSafeAPI, and it never gets deleted. Also, clientVersion is never modified.
return clientVersion . c_str ( ) ;
}
void ThreadSafeApi : : setNetworkOption ( FDBNetworkOptions : : Option option , Optional < StringRef > value ) {
if ( option = = FDBNetworkOptions : : EXTERNAL_CLIENT_TRANSPORT_ID ) {
if ( value . present ( ) ) {
transportId = std : : stoull ( value . get ( ) . toString ( ) . c_str ( ) ) ;
}
}
else {
: : setNetworkOption ( option , value ) ;
}
}
void ThreadSafeApi : : setupNetwork ( ) {
: : setupNetwork ( transportId ) ;
}
void ThreadSafeApi : : runNetwork ( ) {
2018-05-09 07:33:43 +08:00
Optional < Error > runErr ;
try {
: : runNetwork ( ) ;
}
catch ( Error & e ) {
runErr = e ;
}
for ( auto & hook : threadCompletionHooks ) {
try {
hook . first ( hook . second ) ;
}
catch ( Error & e ) {
TraceEvent ( SevError , " NetworkShutdownHookError " ) . error ( e ) ;
}
catch ( . . . ) {
TraceEvent ( SevError , " NetworkShutdownHookError " ) . error ( unknown_error ( ) ) ;
}
}
if ( runErr . present ( ) ) {
throw runErr . get ( ) ;
}
2017-05-26 04:48:44 +08:00
}
void ThreadSafeApi : : stopNetwork ( ) {
: : stopNetwork ( ) ;
}
2018-09-27 01:27:55 +08:00
Reference < IDatabase > ThreadSafeApi : : createDatabase ( const char * clusterFilePath ) {
return Reference < IDatabase > ( new ThreadSafeDatabase ( clusterFilePath , apiVersion ) ) ;
2017-05-26 04:48:44 +08:00
}
2018-05-10 03:28:51 +08:00
void ThreadSafeApi : : addNetworkThreadCompletionHook ( void ( * hook ) ( void * ) , void * hookParameter ) {
2018-05-09 07:33:43 +08:00
if ( ! g_network ) {
throw network_not_setup ( ) ;
}
MutexHolder holder ( lock ) ; // We could use the network thread to protect this action, but then we can't guarantee upon return that the hook is set.
2018-05-10 03:28:51 +08:00
threadCompletionHooks . push_back ( std : : make_pair ( hook , hookParameter ) ) ;
2018-05-09 07:33:43 +08:00
}
2017-05-26 04:48:44 +08:00
IClientApi * ThreadSafeApi : : api = new ThreadSafeApi ( ) ;