Merge pull request #131 from cie/33300740-with-shutdown-hooks

<rdar://problem/33300740> Java: support callbacks from external multi-version client threads
This commit is contained in:
A.J. Beamon 2017-10-04 09:17:25 -07:00 committed by GitHub Enterprise
commit d886b95628
6 changed files with 110 additions and 11 deletions

View File

@ -124,6 +124,12 @@ fdb_error_t fdb_stop_network() {
CATCH_AND_RETURN( API->stopNetwork(); );
}
extern "C" DLLEXPORT
fdb_error_t fdb_add_network_thread_completion_hook(void (*hook)(void*), void *hook_parameter) {
CATCH_AND_RETURN( API->addNetworkThreadCompletionHook(hook, hook_parameter); );
}
extern "C" DLLEXPORT
FDBFuture* fdb_cluster_configure_database( FDBCluster* c, int config_type,
int config_mode, uint8_t const* db_name,

View File

@ -89,6 +89,8 @@ extern "C" {
DLLEXPORT WARN_UNUSED_RESULT fdb_error_t fdb_stop_network();
DLLEXPORT WARN_UNUSED_RESULT fdb_error_t fdb_add_network_thread_completion_hook(void (*hook)(void*), void *hook_parameter);
#pragma pack(push, 4)
typedef struct keyvalue {
const void* key;

View File

@ -38,8 +38,18 @@
#error Missing thread local storage
#endif
static JavaVM* g_jvm = 0;
static thread_local JNIEnv* g_thread_jenv = 0; // Defined for the network thread once it is running, and for any thread that has called registerCallback
static thread_local jmethodID g_IFutureCallback_call_methodID = 0;
static thread_local bool is_external = false;
void detachIfExternalThread(void *ignore) {
if(is_external && g_thread_jenv != 0) {
g_thread_jenv = 0;
g_IFutureCallback_call_methodID = 0;
g_jvm->DetachCurrentThread();
}
}
void throwOutOfMem(JNIEnv *jenv) {
const char *className = "java/lang/OutOfMemoryError";
@ -103,12 +113,6 @@ void throwParamNotNull(JNIEnv *jenv) {
extern "C" {
#endif
static void callCallback( FDBFuture* f, void* data ) {
jobject callback = (jobject)data;
g_thread_jenv->CallVoidMethod( callback, g_IFutureCallback_call_methodID );
g_thread_jenv->DeleteGlobalRef(callback);
}
// If the methods are not found, exceptions are thrown and this will return false.
// Returns TRUE on success, false otherwise.
static bool findCallbackMethods(JNIEnv *jenv) {
@ -123,6 +127,27 @@ static bool findCallbackMethods(JNIEnv *jenv) {
return true;
}
static void callCallback( FDBFuture* f, void* data ) {
if (g_thread_jenv == 0) {
// We are on an external thread and must attach to the JVM.
// The shutdown hook will later detach this thread.
is_external = true;
if( g_jvm != 0 && g_jvm->AttachCurrentThreadAsDaemon((void **) &g_thread_jenv, JNI_NULL) == JNI_OK ) {
if( !findCallbackMethods( g_thread_jenv ) ) {
g_thread_jenv->FatalError("FDB: Could not find callback method.\n");
}
} else {
// Can't call FatalError, because we don't have a pointer to the jenv...
// There will be a segmentation fault from the attempt to call the callback method.
fprintf(stderr, "FDB: Could not attach external client thread to the JVM as daemon.\n");
}
}
jobject callback = (jobject)data;
g_thread_jenv->CallVoidMethod( callback, g_IFutureCallback_call_methodID );
g_thread_jenv->DeleteGlobalRef(callback);
}
// Attempts to throw 't', attempts to shut down the JVM if this fails.
void safeThrow( JNIEnv *jenv, jthrowable t ) {
if( jenv->Throw( t ) != 0 ) {
@ -1035,6 +1060,11 @@ JNIEXPORT void JNICALL Java_com_apple_cie_foundationdb_FDB_Network_1run(JNIEnv *
return;
}
fdb_error_t hookErr = fdb_add_network_thread_completion_hook( &detachIfExternalThread, NULL );
if( hookErr ) {
safeThrow( jenv, getThrowable( jenv, hookErr ) );
}
fdb_error_t err = fdb_run_network();
if( err ) {
safeThrow( jenv, getThrowable( jenv, err ) );
@ -1048,6 +1078,11 @@ JNIEXPORT void JNICALL Java_com_apple_cie_foundationdb_FDB_Network_1stop(JNIEnv
}
}
jint JNI_OnLoad(JavaVM *vm, void *reserved) {
g_jvm = vm;
return JNI_VERSION_1_1;
}
#ifdef __cplusplus
}
#endif

View File

@ -106,7 +106,7 @@ public class TesterArgs {
for (j = i + 1; j < args.length && args[j].charAt(0) != '-'; j++) {
testsToRun.add(args[j]);
}
i = j;
i = j - 1;
} else {
System.out.println("No tests specified with argument " + arg + "\n");
printUsage();

View File

@ -1134,12 +1134,25 @@ void MultiVersionApi::setupNetwork() {
THREAD_FUNC_RETURN runNetworkThread(void *param) {
try {
((IClientApi*)param)->runNetwork();
((ClientInfo*)param)->api->runNetwork();
}
catch(Error &e) {
TraceEvent(SevError, "RunNetworkError").error(e);
}
std::vector<std::pair<void (*)(void*), void*>> &hooks = ((ClientInfo*)param)->threadCompletionHooks;
for(auto &hook : hooks) {
try {
hook.first(hook.second);
}
catch(Error &e) {
TraceEvent(SevError, "NetworkShutdownHookError").error(e);
}
catch(...) {
TraceEvent(SevError, "NetworkShutdownHookError").error(unknown_error());
}
}
THREAD_RETURN;
}
@ -1156,12 +1169,34 @@ void MultiVersionApi::runNetwork() {
if(!bypassMultiClientApi) {
runOnExternalClients([&handles](Reference<ClientInfo> client) {
if(client->external) {
handles.push_back(g_network->startThread(&runNetworkThread, client->api));
handles.push_back(g_network->startThread(&runNetworkThread, client.getPtr()));
}
});
}
localClient->api->runNetwork();
Error *runErr = NULL;
try {
localClient->api->runNetwork();
}
catch(Error &e) {
runErr = &e;
}
for(auto &hook : localClient->threadCompletionHooks) {
try {
hook.first(hook.second);
}
catch(Error &e) {
TraceEvent(SevError, "NetworkShutdownHookError").error(e);
}
catch(...) {
TraceEvent(SevError, "NetworkShutdownHookError").error(unknown_error());
}
}
if(runErr != NULL) {
throw *runErr;
}
for(auto h : handles) {
waitThread(h);
@ -1185,6 +1220,24 @@ void MultiVersionApi::stopNetwork() {
}
}
void MultiVersionApi::addNetworkThreadCompletionHook(void (*hook)(void*), void *hook_parameter) {
lock.enter();
if(!networkSetup) {
lock.leave();
throw network_not_setup();
}
lock.leave();
auto hookPair = std::pair<void (*)(void*), void*>(hook, hook_parameter);
threadCompletionHooks.push_back(hookPair);
if(!bypassMultiClientApi) {
for( auto it : externalClients ) {
it.second->threadCompletionHooks.push_back(hookPair);
}
}
}
ThreadFuture<Reference<ICluster>> MultiVersionApi::createCluster(const char *clusterFilePath) {
lock.enter();
if(!networkSetup) {

View File

@ -320,6 +320,7 @@ struct ClientInfo : ThreadSafeReferenceCounted<ClientInfo> {
std::string libPath;
bool external;
bool failed;
std::vector<std::pair<void (*)(void*), void*>> threadCompletionHooks;
ClientInfo() : protocolVersion(0), api(NULL), external(false), failed(true) {}
ClientInfo(IClientApi *api) : protocolVersion(0), api(api), libPath("internal"), external(false), failed(false) {}
@ -402,6 +403,7 @@ public:
void setupNetwork();
void runNetwork();
void stopNetwork();
void addNetworkThreadCompletionHook(void (*hook)(void*), void *hook_parameter);
ThreadFuture<Reference<ICluster>> createCluster(const char *clusterFilePath);
static MultiVersionApi* api;
@ -441,7 +443,8 @@ private:
std::vector<std::pair<FDBNetworkOptions::Option, Optional<Standalone<StringRef>>>> options;
std::map<FDBNetworkOptions::Option, std::set<Standalone<StringRef>>> setEnvOptions;
volatile bool envOptionsLoaded;
std::vector<std::pair<void (*)(void*), void*>> threadCompletionHooks;
};
#endif
#endif