diff --git a/bindings/c/fdb_c.cpp b/bindings/c/fdb_c.cpp index 8e420bbd0b..2fe17c4a8c 100644 --- a/bindings/c/fdb_c.cpp +++ b/bindings/c/fdb_c.cpp @@ -124,6 +124,12 @@ fdb_error_t fdb_stop_network() { CATCH_AND_RETURN( API->stopNetwork(); ); } +extern "C" DLLEXPORT +fdb_error_t fdb_add_network_thread_completion_hook(void (*hook)(void*), void *hook_parameter) { + CATCH_AND_RETURN( API->addNetworkThreadCompletionHook(hook, hook_parameter); ); +} + + extern "C" DLLEXPORT FDBFuture* fdb_cluster_configure_database( FDBCluster* c, int config_type, int config_mode, uint8_t const* db_name, diff --git a/bindings/c/foundationdb/fdb_c.h b/bindings/c/foundationdb/fdb_c.h index 9a9a60578b..563cea5a2e 100644 --- a/bindings/c/foundationdb/fdb_c.h +++ b/bindings/c/foundationdb/fdb_c.h @@ -89,6 +89,8 @@ extern "C" { DLLEXPORT WARN_UNUSED_RESULT fdb_error_t fdb_stop_network(); + DLLEXPORT WARN_UNUSED_RESULT fdb_error_t fdb_add_network_thread_completion_hook(void (*hook)(void*), void *hook_parameter); + #pragma pack(push, 4) typedef struct keyvalue { const void* key; diff --git a/bindings/java/fdbJNI.cpp b/bindings/java/fdbJNI.cpp index 99bdfc3703..6056950ce7 100644 --- a/bindings/java/fdbJNI.cpp +++ b/bindings/java/fdbJNI.cpp @@ -38,8 +38,18 @@ #error Missing thread local storage #endif +static JavaVM* g_jvm = 0; static thread_local JNIEnv* g_thread_jenv = 0; // Defined for the network thread once it is running, and for any thread that has called registerCallback static thread_local jmethodID g_IFutureCallback_call_methodID = 0; +static thread_local bool is_external = false; + +void detachIfExternalThread(void *ignore) { + if(is_external && g_thread_jenv != 0) { + g_thread_jenv = 0; + g_IFutureCallback_call_methodID = 0; + g_jvm->DetachCurrentThread(); + } +} void throwOutOfMem(JNIEnv *jenv) { const char *className = "java/lang/OutOfMemoryError"; @@ -103,12 +113,6 @@ void throwParamNotNull(JNIEnv *jenv) { extern "C" { #endif -static void callCallback( FDBFuture* f, void* data ) { - jobject callback = (jobject)data; - g_thread_jenv->CallVoidMethod( callback, g_IFutureCallback_call_methodID ); - g_thread_jenv->DeleteGlobalRef(callback); -} - // If the methods are not found, exceptions are thrown and this will return false. // Returns TRUE on success, false otherwise. static bool findCallbackMethods(JNIEnv *jenv) { @@ -123,6 +127,27 @@ static bool findCallbackMethods(JNIEnv *jenv) { return true; } +static void callCallback( FDBFuture* f, void* data ) { + if (g_thread_jenv == 0) { + // We are on an external thread and must attach to the JVM. + // The shutdown hook will later detach this thread. + is_external = true; + if( g_jvm != 0 && g_jvm->AttachCurrentThreadAsDaemon((void **) &g_thread_jenv, JNI_NULL) == JNI_OK ) { + if( !findCallbackMethods( g_thread_jenv ) ) { + g_thread_jenv->FatalError("FDB: Could not find callback method.\n"); + } + } else { + // Can't call FatalError, because we don't have a pointer to the jenv... + // There will be a segmentation fault from the attempt to call the callback method. + fprintf(stderr, "FDB: Could not attach external client thread to the JVM as daemon.\n"); + } + } + + jobject callback = (jobject)data; + g_thread_jenv->CallVoidMethod( callback, g_IFutureCallback_call_methodID ); + g_thread_jenv->DeleteGlobalRef(callback); +} + // Attempts to throw 't', attempts to shut down the JVM if this fails. void safeThrow( JNIEnv *jenv, jthrowable t ) { if( jenv->Throw( t ) != 0 ) { @@ -1035,6 +1060,11 @@ JNIEXPORT void JNICALL Java_com_apple_cie_foundationdb_FDB_Network_1run(JNIEnv * return; } + fdb_error_t hookErr = fdb_add_network_thread_completion_hook( &detachIfExternalThread, NULL ); + if( hookErr ) { + safeThrow( jenv, getThrowable( jenv, hookErr ) ); + } + fdb_error_t err = fdb_run_network(); if( err ) { safeThrow( jenv, getThrowable( jenv, err ) ); @@ -1048,6 +1078,11 @@ JNIEXPORT void JNICALL Java_com_apple_cie_foundationdb_FDB_Network_1stop(JNIEnv } } +jint JNI_OnLoad(JavaVM *vm, void *reserved) { + g_jvm = vm; + return JNI_VERSION_1_1; +} + #ifdef __cplusplus } #endif diff --git a/bindings/java/src-completable/test/com/apple/cie/foundationdb/test/TesterArgs.java b/bindings/java/src-completable/test/com/apple/cie/foundationdb/test/TesterArgs.java index a2b27457b2..c0c004c32d 100644 --- a/bindings/java/src-completable/test/com/apple/cie/foundationdb/test/TesterArgs.java +++ b/bindings/java/src-completable/test/com/apple/cie/foundationdb/test/TesterArgs.java @@ -106,7 +106,7 @@ public class TesterArgs { for (j = i + 1; j < args.length && args[j].charAt(0) != '-'; j++) { testsToRun.add(args[j]); } - i = j; + i = j - 1; } else { System.out.println("No tests specified with argument " + arg + "\n"); printUsage(); diff --git a/fdbclient/MultiVersionTransaction.actor.cpp b/fdbclient/MultiVersionTransaction.actor.cpp index 0700b6ce4b..6103e22003 100644 --- a/fdbclient/MultiVersionTransaction.actor.cpp +++ b/fdbclient/MultiVersionTransaction.actor.cpp @@ -1134,12 +1134,25 @@ void MultiVersionApi::setupNetwork() { THREAD_FUNC_RETURN runNetworkThread(void *param) { try { - ((IClientApi*)param)->runNetwork(); + ((ClientInfo*)param)->api->runNetwork(); } catch(Error &e) { TraceEvent(SevError, "RunNetworkError").error(e); } + std::vector> &hooks = ((ClientInfo*)param)->threadCompletionHooks; + for(auto &hook : hooks) { + try { + hook.first(hook.second); + } + catch(Error &e) { + TraceEvent(SevError, "NetworkShutdownHookError").error(e); + } + catch(...) { + TraceEvent(SevError, "NetworkShutdownHookError").error(unknown_error()); + } + } + THREAD_RETURN; } @@ -1156,12 +1169,34 @@ void MultiVersionApi::runNetwork() { if(!bypassMultiClientApi) { runOnExternalClients([&handles](Reference client) { if(client->external) { - handles.push_back(g_network->startThread(&runNetworkThread, client->api)); + handles.push_back(g_network->startThread(&runNetworkThread, client.getPtr())); } }); } - localClient->api->runNetwork(); + Error *runErr = NULL; + try { + localClient->api->runNetwork(); + } + catch(Error &e) { + runErr = &e; + } + + for(auto &hook : localClient->threadCompletionHooks) { + try { + hook.first(hook.second); + } + catch(Error &e) { + TraceEvent(SevError, "NetworkShutdownHookError").error(e); + } + catch(...) { + TraceEvent(SevError, "NetworkShutdownHookError").error(unknown_error()); + } + } + + if(runErr != NULL) { + throw *runErr; + } for(auto h : handles) { waitThread(h); @@ -1185,6 +1220,24 @@ void MultiVersionApi::stopNetwork() { } } +void MultiVersionApi::addNetworkThreadCompletionHook(void (*hook)(void*), void *hook_parameter) { + lock.enter(); + if(!networkSetup) { + lock.leave(); + throw network_not_setup(); + } + lock.leave(); + + auto hookPair = std::pair(hook, hook_parameter); + threadCompletionHooks.push_back(hookPair); + + if(!bypassMultiClientApi) { + for( auto it : externalClients ) { + it.second->threadCompletionHooks.push_back(hookPair); + } + } +} + ThreadFuture> MultiVersionApi::createCluster(const char *clusterFilePath) { lock.enter(); if(!networkSetup) { diff --git a/fdbclient/MultiVersionTransaction.h b/fdbclient/MultiVersionTransaction.h index 7ca1379189..a8ba31b387 100644 --- a/fdbclient/MultiVersionTransaction.h +++ b/fdbclient/MultiVersionTransaction.h @@ -320,6 +320,7 @@ struct ClientInfo : ThreadSafeReferenceCounted { std::string libPath; bool external; bool failed; + std::vector> threadCompletionHooks; ClientInfo() : protocolVersion(0), api(NULL), external(false), failed(true) {} ClientInfo(IClientApi *api) : protocolVersion(0), api(api), libPath("internal"), external(false), failed(false) {} @@ -402,6 +403,7 @@ public: void setupNetwork(); void runNetwork(); void stopNetwork(); + void addNetworkThreadCompletionHook(void (*hook)(void*), void *hook_parameter); ThreadFuture> createCluster(const char *clusterFilePath); static MultiVersionApi* api; @@ -441,7 +443,8 @@ private: std::vector>>> options; std::map>> setEnvOptions; volatile bool envOptionsLoaded; + std::vector> threadCompletionHooks; }; -#endif \ No newline at end of file +#endif