added add_shutdown_hook to fdb_c api and used it to detach java threads where appropriate

This commit is contained in:
Alec Grieser 2017-07-25 15:57:26 -07:00
parent 5a33d102a9
commit 83bf2ee312
6 changed files with 80 additions and 8 deletions

View File

@ -124,6 +124,12 @@ fdb_error_t fdb_stop_network() {
CATCH_AND_RETURN( API->stopNetwork(); );
}
extern "C" DLLEXPORT
fdb_error_t fdb_add_shutdown_hook(void (*hook)(void)) {
CATCH_AND_RETURN( API->addShutdownHook(hook); );
}
extern "C" DLLEXPORT
FDBFuture* fdb_cluster_configure_database( FDBCluster* c, int config_type,
int config_mode, uint8_t const* db_name,

View File

@ -89,6 +89,8 @@ extern "C" {
DLLEXPORT WARN_UNUSED_RESULT fdb_error_t fdb_stop_network();
DLLEXPORT WARN_UNUSED_RESULT fdb_error_t fdb_add_shutdown_hook(void (*hook)());
#pragma pack(push, 4)
typedef struct keyvalue {
const void* key;

View File

@ -41,6 +41,16 @@
static JavaVM* g_jvm = 0;
static thread_local JNIEnv* g_thread_jenv = 0; // Defined for the network thread once it is running, and for any thread that has called registerCallback
static thread_local jmethodID g_IFutureCallback_call_methodID = 0;
static thread_local bool is_external = false;
void detachIfExternalThread() {
if(is_external && g_thread_jenv != 0) {
JNIEnv *local_jenv;
g_thread_jenv = 0;
g_IFutureCallback_call_methodID = 0;
g_jvm->DetachCurrentThread();
}
}
void throwOutOfMem(JNIEnv *jenv) {
const char *className = "java/lang/OutOfMemoryError";
@ -121,15 +131,15 @@ static bool findCallbackMethods(JNIEnv *jenv) {
static void callCallback( FDBFuture* f, void* data ) {
if (g_thread_jenv == 0) {
// We are on an external thread and must attach to the JVM.
// Best practice is to detach at some point, but we make one
// of these per external client and cache it, so we *should*
// be fine.
// The shutdown hook will later detach this thread.
is_external = true;
if( g_jvm != 0 && g_jvm->AttachCurrentThreadAsDaemon((void **) &g_thread_jenv, JNI_NULL) == JNI_OK ) {
if( !findCallbackMethods( g_thread_jenv ) ) {
g_thread_jenv->FatalError("FDB: Could not find callback method.\n");
}
} else {
// Can't call FatalError, because we don't have a pointer to the jenv...
// There will be a segmentation fault from the attempt to call the callback method.
fprintf(stderr, "FDB: Could not attach external client thread to the JVM as daemon.\n");
}
}
@ -1062,6 +1072,11 @@ JNIEXPORT void JNICALL Java_com_apple_cie_foundationdb_FDB_Network_1run(JNIEnv *
return;
}
fdb_error_t hookErr = fdb_add_shutdown_hook( &detachIfExternalThread );
if( hookErr ) {
safeThrow( jenv, getThrowable( jenv, hookErr ) );
}
fdb_error_t err = fdb_run_network();
if( err ) {
safeThrow( jenv, getThrowable( jenv, err ) );

View File

@ -106,7 +106,7 @@ public class TesterArgs {
for (j = i + 1; j < args.length && args[j].charAt(0) != '-'; j++) {
testsToRun.add(args[j]);
}
i = j;
i = j - 1;
} else {
System.out.println("No tests specified with argument " + arg + "\n");
printUsage();

View File

@ -1134,12 +1134,22 @@ void MultiVersionApi::setupNetwork() {
THREAD_FUNC_RETURN runNetworkThread(void *param) {
try {
((IClientApi*)param)->runNetwork();
((ClientInfo*)param)->api->runNetwork();
}
catch(Error &e) {
TraceEvent(SevError, "RunNetworkError").error(e);
}
std::vector<void (*)()> &hooks = ((ClientInfo*)param)->shutdownHooks;
for(auto hook : hooks) {
try {
hook();
}
catch(Error &e) {
TraceEvent(SevError, "NetworkShutdownHookError").error(e);
}
}
THREAD_RETURN;
}
@ -1156,12 +1166,31 @@ void MultiVersionApi::runNetwork() {
if(!bypassMultiClientApi) {
runOnExternalClients([&handles](Reference<ClientInfo> client) {
if(client->external) {
handles.push_back(g_network->startThread(&runNetworkThread, client->api));
handles.push_back(g_network->startThread(&runNetworkThread, client.getPtr()));
}
});
}
Error *runErr = NULL;
try {
localClient->api->runNetwork();
}
catch(Error &e) {
runErr = &e;
}
for(auto &hook : localClient->shutdownHooks) {
try {
hook();
}
catch(Error &e) {
TraceEvent(SevError, "NetworkShutdownHookError").error(e);
}
}
if(runErr != NULL) {
throw *runErr;
}
for(auto h : handles) {
waitThread(h);
@ -1185,6 +1214,23 @@ void MultiVersionApi::stopNetwork() {
}
}
void MultiVersionApi::addShutdownHook(void (*hook)(void)) {
lock.enter();
if(!networkSetup) {
lock.leave();
throw network_not_setup();
}
lock.leave();
shutdownHooks.push_back(hook);
if(!bypassMultiClientApi) {
for( auto it : externalClients ) {
it.second->shutdownHooks.push_back(hook);
}
}
}
ThreadFuture<Reference<ICluster>> MultiVersionApi::createCluster(const char *clusterFilePath) {
lock.enter();
if(!networkSetup) {

View File

@ -320,6 +320,7 @@ struct ClientInfo : ThreadSafeReferenceCounted<ClientInfo> {
std::string libPath;
bool external;
bool failed;
vector<void (*)()> shutdownHooks;
ClientInfo() : protocolVersion(0), api(NULL), external(false), failed(true) {}
ClientInfo(IClientApi *api) : protocolVersion(0), api(api), libPath("internal"), external(false), failed(false) {}
@ -402,6 +403,7 @@ public:
void setupNetwork();
void runNetwork();
void stopNetwork();
void addShutdownHook(void (*hook)(void));
ThreadFuture<Reference<ICluster>> createCluster(const char *clusterFilePath);
static MultiVersionApi* api;
@ -441,6 +443,7 @@ private:
std::vector<std::pair<FDBNetworkOptions::Option, Optional<Standalone<StringRef>>>> options;
std::map<FDBNetworkOptions::Option, std::set<Standalone<StringRef>>> setEnvOptions;
volatile bool envOptionsLoaded;
std::vector<void (*)()> shutdownHooks;
};