Move addNetworkThreadCompletionHook to IClientApi.
This commit is contained in:
parent
e6f9fc160b
commit
01a3360982
|
@ -42,7 +42,8 @@ int g_api_version = 0;
|
|||
#define CLUSTER(c) ((ICluster*)c)
|
||||
#define TXN(t) ((ITransaction*)t)
|
||||
|
||||
#define API (MultiVersionApi::api)
|
||||
#define API ((IClientApi*)MultiVersionApi::api)
|
||||
|
||||
|
||||
/* This must be true so that we can return the data pointer of a
|
||||
Standalone<RangeResultRef> as an array of FDBKeyValue. */
|
||||
|
|
|
@ -104,6 +104,8 @@ public:
|
|||
virtual void stopNetwork() = 0;
|
||||
|
||||
virtual ThreadFuture<Reference<ICluster>> createCluster(const char *clusterFilePath) = 0;
|
||||
|
||||
virtual void addNetworkThreadCompletionHook(void (*hook)(void*), void *hook_parameter) = 0;
|
||||
};
|
||||
|
||||
#endif
|
||||
|
|
|
@ -355,6 +355,11 @@ ThreadFuture<Reference<ICluster>> DLApi::createCluster(const char *clusterFilePa
|
|||
});
|
||||
}
|
||||
|
||||
void DLApi::addNetworkThreadCompletionHook(void (*hook)(void*), void *hook_parameter) {
|
||||
// All completion hooks are handled on the local client
|
||||
throw unsupported_operation();
|
||||
}
|
||||
|
||||
// MultiVersionTransaction
|
||||
MultiVersionTransaction::MultiVersionTransaction(Reference<MultiVersionDatabase> db) : db(db) {
|
||||
updateTransaction();
|
||||
|
@ -1174,29 +1179,7 @@ void MultiVersionApi::runNetwork() {
|
|||
});
|
||||
}
|
||||
|
||||
Error *runErr = NULL;
|
||||
try {
|
||||
localClient->api->runNetwork();
|
||||
}
|
||||
catch(Error &e) {
|
||||
runErr = &e;
|
||||
}
|
||||
|
||||
for(auto &hook : localClient->threadCompletionHooks) {
|
||||
try {
|
||||
hook.first(hook.second);
|
||||
}
|
||||
catch(Error &e) {
|
||||
TraceEvent(SevError, "NetworkShutdownHookError").error(e);
|
||||
}
|
||||
catch(...) {
|
||||
TraceEvent(SevError, "NetworkShutdownHookError").error(unknown_error());
|
||||
}
|
||||
}
|
||||
|
||||
if(runErr != NULL) {
|
||||
throw *runErr;
|
||||
}
|
||||
localClient->api->runNetwork();
|
||||
|
||||
for(auto h : handles) {
|
||||
waitThread(h);
|
||||
|
@ -1228,11 +1211,12 @@ void MultiVersionApi::addNetworkThreadCompletionHook(void (*hook)(void*), void *
|
|||
}
|
||||
lock.leave();
|
||||
|
||||
auto hookPair = std::pair<void (*)(void*), void*>(hook, hook_parameter);
|
||||
threadCompletionHooks.push_back(hookPair);
|
||||
localClient->api->addNetworkThreadCompletionHook(hook, hook_parameter);
|
||||
|
||||
if(!bypassMultiClientApi) {
|
||||
auto hookPair = std::pair<void (*)(void*), void*>(hook, hook_parameter);
|
||||
for( auto it : externalClients ) {
|
||||
MutexHolder holder(lock);
|
||||
it.second->threadCompletionHooks.push_back(hookPair);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -203,6 +203,8 @@ public:
|
|||
|
||||
ThreadFuture<Reference<ICluster>> createCluster(const char *clusterFilePath);
|
||||
|
||||
void addNetworkThreadCompletionHook(void (*hook)(void*), void *hook_parameter);
|
||||
|
||||
private:
|
||||
const std::string fdbCPath;
|
||||
const Reference<FdbCApi> api;
|
||||
|
|
|
@ -357,7 +357,30 @@ void ThreadSafeApi::setupNetwork() {
|
|||
}
|
||||
|
||||
void ThreadSafeApi::runNetwork() {
|
||||
::runNetwork();
|
||||
Optional<Error> runErr;
|
||||
try {
|
||||
::runNetwork();
|
||||
}
|
||||
catch(Error &e) {
|
||||
runErr = e;
|
||||
}
|
||||
|
||||
for(auto &hook : threadCompletionHooks) {
|
||||
try {
|
||||
hook.first(hook.second);
|
||||
}
|
||||
catch(Error &e) {
|
||||
TraceEvent(SevError, "NetworkShutdownHookError").error(e);
|
||||
}
|
||||
catch(...) {
|
||||
TraceEvent(SevError, "NetworkShutdownHookError").error(unknown_error());
|
||||
}
|
||||
}
|
||||
|
||||
if(runErr.present()) {
|
||||
throw runErr.get();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
void ThreadSafeApi::stopNetwork() {
|
||||
|
@ -368,4 +391,16 @@ ThreadFuture<Reference<ICluster>> ThreadSafeApi::createCluster(const char *clust
|
|||
return ThreadSafeCluster::create(clusterFilePath, apiVersion);
|
||||
}
|
||||
|
||||
void ThreadSafeApi::addNetworkThreadCompletionHook(void (*hook)(void*), void *hook_parameter) {
|
||||
if (!g_network) {
|
||||
throw network_not_setup();
|
||||
}
|
||||
|
||||
auto hookPair = std::pair<void (*)(void*), void*>(hook, hook_parameter);
|
||||
|
||||
MutexHolder holder(lock); // We could use the network thread to protect this action, but then we can't guarantee upon return that the hook is set.
|
||||
threadCompletionHooks.push_back(hookPair);
|
||||
}
|
||||
|
||||
|
||||
IClientApi* ThreadSafeApi::api = new ThreadSafeApi();
|
||||
|
|
|
@ -140,6 +140,8 @@ public:
|
|||
|
||||
ThreadFuture<Reference<ICluster>> createCluster(const char *clusterFilePath);
|
||||
|
||||
void addNetworkThreadCompletionHook(void (*hook)(void*), void *hook_parameter);
|
||||
|
||||
static IClientApi* api;
|
||||
|
||||
private:
|
||||
|
@ -148,6 +150,9 @@ private:
|
|||
int apiVersion;
|
||||
const std::string clientVersion;
|
||||
uint64_t transportId;
|
||||
|
||||
Mutex lock;
|
||||
std::vector<std::pair<void (*)(void*), void*>> threadCompletionHooks;
|
||||
};
|
||||
|
||||
#endif
|
||||
|
|
Loading…
Reference in New Issue