[async] impl add name flags and support multi compile multi impl

This commit is contained in:
xiehuc 2015-04-20 23:06:53 +08:00
parent e38f5adeca
commit 15392fd646
10 changed files with 135 additions and 59 deletions

View File

@ -45,5 +45,14 @@
#define LWQQ_RETRY_VALUE 3
#define LWQQ_CACHE_DIR "/tmp/lwqq"
typedef enum {
LWQQ_WITH_LIBEV = 1 << 0,
LWQQ_WITH_LIBUV = 1 << 1,
LWQQ_WITH_SQLITE = 1 << 2,
LWQQ_WITH_MOZJS = 1 << 3,
LWQQ_WITH_SSL = 1 << 4,
LWQQ_WITH_ASYNCHDNS = 1 << 5
} LwqqFeatures;
#endif /* __CONFIG_H__ */

View File

@ -52,6 +52,43 @@ typedef struct LwqqAsyncEvent_ {
LwqqAsyncEvent* chained;
} LwqqAsyncEvent_;
LwqqAsyncImplList lwqq__async_impl_list_ = LIST_HEAD_INITIALIZER();
LwqqAsyncImpl* lwqq__async_impl_ = NULL;
LWQQ_EXPORT
void lwqq_async_implement(LwqqAsyncImpl* i) {
lwqq__async_impl_ = i;
// if we doesn't need async,
// we don't check default settings
#ifndef WITHOUT_ASYNC
// check async_impl
assert(LWQQ__ASYNC_IMPL(loop_create) && "need implement loop_create");
assert(LWQQ__ASYNC_IMPL(io_new) && "need implement io_new");
assert(LWQQ__ASYNC_IMPL(timer_new) && "need implement timer_new");
#endif
}
void lwqq_async_global_init()
{
static int initialized=0;
if(initialized) return;
#ifdef WITH_LIBUV
LIST_INSERT_HEAD(&lwqq__async_impl_list_, &impl_libuv, entries);
#endif
#ifdef WITH_LIBEV
LIST_INSERT_HEAD(&lwqq__async_impl_list_, &impl_libev, entries);
#endif
initialized = 1;
}
void lwqq_async_init(LwqqClient* lc)
{
lwqq_async_global_init();
lc->dispatch = lwqq_async_dispatch_delay;
if(lwqq__async_impl_ == NULL)
lwqq__async_impl_ = LIST_FIRST(&lwqq__async_impl_list_);
}
static void dispatch_wrap(LwqqAsyncTimerHandle timer, void* p)
{
async_dispatch_data* data = (async_dispatch_data*)p;
@ -79,25 +116,6 @@ void lwqq_async_dispatch_delay(LwqqCommand cmd, unsigned long timeout)
#endif
}
void lwqq_async_init(LwqqClient* lc)
{
lc->dispatch = lwqq_async_dispatch_delay;
#ifdef WITH_LIBEV
LWQQ_ASYNC_IMPLEMENT(impl_libev);
#endif
#ifdef WITH_LIBUV
LWQQ_ASYNC_IMPLEMENT(impl_libuv);
#endif
// if we doesn't need async,
// we don't check default settings
#ifndef WITHOUT_ASYNC
// check async_impl
assert(LWQQ__ASYNC_IMPL(loop_create));
assert(LWQQ__ASYNC_IMPL(io_new));
assert(LWQQ__ASYNC_IMPL(timer_new));
#endif
}
LwqqAsyncEvent* lwqq_async_event_new(void* req)
{
LwqqAsyncEvent* event = s_malloc0(sizeof(LwqqAsyncEvent_));
@ -409,14 +427,9 @@ void lwqq_async_timer_repeat(LwqqAsyncTimerHandle timer)
{
LWQQ__ASYNC_IMPL(timer_again)(timer);
}
LwqqAsyncImpl lwqq__async_impl_ = { 0 };
LWQQ_EXPORT
void lwqq_async_implement(LwqqAsyncImpl* i) { lwqq__async_impl_ = *i; }
void lwqq_free_extension(LwqqClient* lc, LwqqExtension* ext)
{
ext->remove(lc, ext);
s_free(ext);
}

View File

@ -169,6 +169,10 @@ void lwqq_async_dispatch(LwqqCommand cmd);
* @param timeout: delay timeout ms, default 0 means a small delay
*/
void lwqq_async_dispatch_delay(LwqqCommand cmd, unsigned long timeout);
// initialize global internal async_impl
// you call this before you want to select another impl
// then, you can handel impl list
void lwqq_async_global_init();
// initialize lwqq client with default dispatch function
void lwqq_async_init(LwqqClient* lc);
/**

View File

@ -3,7 +3,7 @@
#include "async.h"
#define LWQQ__ASYNC_IMPL(impl) lwqq__async_impl_.impl
#define LWQQ__ASYNC_IMPL(impl) lwqq__async_impl_->impl
struct LwqqAsyncTimer {
LwqqAsyncTimerCallback func;
void* data;
@ -15,7 +15,13 @@ struct LwqqAsyncIo {
int fd;
int action;
};
typedef enum {
USE_THREAD = 1<<0,
NO_THREAD = 0
}LwqqAsyncImplFlags;
typedef struct LwqqAsyncImpl {
const char* name;
const LwqqAsyncImplFlags flags;
void (*loop_create)();
void (*loop_run)();
void (*loop_stop)();
@ -31,9 +37,14 @@ typedef struct LwqqAsyncImpl {
void (*timer_start)(void* h, unsigned int ms);
void (*timer_stop)(void* h);
void (*timer_again)(void* h);
LIST_ENTRY(LwqqAsyncImpl) entries;
} LwqqAsyncImpl;
extern LwqqAsyncImpl lwqq__async_impl_;
typedef LIST_HEAD(, LwqqAsyncImpl) LwqqAsyncImplList;
extern LwqqAsyncImplList lwqq__async_impl_list_;
extern LwqqAsyncImpl* lwqq__async_impl_;
void lwqq_async_implement(LwqqAsyncImpl*);
#define LWQQ_ASYNC_IMPLEMENT(impl) lwqq_async_implement(&impl);

View File

@ -102,6 +102,8 @@ static void(timer_again)(void* t)
}
static LwqqAsyncImpl impl_libev = {
.name = "libev",
.flags = USE_THREAD,
.loop_create = loop_create,
.loop_run = loop_run,
.loop_stop = loop_stop,

View File

@ -96,6 +96,8 @@ static void(timer_again)(void* t)
}
static LwqqAsyncImpl impl_libuv = {
.name = "libuv",
.flags = USE_THREAD,
.loop_create = loop_create,
.loop_run = loop_run,
.loop_stop = loop_stop,

View File

@ -22,6 +22,7 @@
#include "logger.h"
#include "queue.h"
#include "utility.h"
#include "async_impl.h"
#include "internal.h"
//#define LWQQ_HTTP_USER_AGENT "Mozilla/5.0 (X11; Linux x86_64; rv:10.0)
@ -115,6 +116,7 @@ static TABLE_BEGIN(proxy_map, long, 0) TR(LWQQ_HTTP_PROXY_HTTP, CURLPROXY_HTTP)
static GLOBAL global = { 0 };
static pthread_cond_t async_cond = PTHREAD_COND_INITIALIZER;
static pthread_cond_t ev_block_cond = PTHREAD_COND_INITIALIZER;
static pthread_mutex_t async_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t add_lock = PTHREAD_MUTEX_INITIALIZER;
@ -152,6 +154,14 @@ int lwqq_gdb_whats_running()
}
#endif
LwqqFeatures lwqq__http_check_feature()
{
return (curl_version_info(CURLVERSION_NOW)->features
& CURL_VERSION_ASYNCHDNS)
? LWQQ_WITH_ASYNCHDNS
: 0;
}
#ifndef HAVE_OPEN_MEMSTREAM
static size_t write_content(const char* ptr, size_t size, size_t nmemb,
void* userdata)
@ -1060,6 +1070,7 @@ void lwqq_http_global_init()
lwqq_async_io_watch(global.add_listener, global.pipe_fd[0],
LWQQ_ASYNC_READ, delay_add_handle_cb, NULL);
#endif
#endif
}
}
@ -1076,22 +1087,32 @@ static void safe_remove_link(LwqqClient* lc)
curl_easy_pause(easy, CURLPAUSE_ALL);
curl_multi_remove_handle(global.multi, easy);
}
pthread_cond_signal(&async_cond);
if(LWQQ__ASYNC_IMPL(flags) & USE_THREAD){
pthread_mutex_lock(&async_lock);
// notify main thread have done jobs
pthread_cond_signal(&async_cond);
// wait and block sub thread to prevent do curl event
// because this time main thread do curl clean job
pthread_mutex_unlock(&async_lock);
pthread_mutex_lock(&async_lock);
pthread_cond_wait(&ev_block_cond, &async_lock);
pthread_mutex_unlock(&async_lock);
}
}
LWQQ_EXPORT
void lwqq_http_global_free(LwqqCleanUp cleanup)
{
struct timespec wait_time;
if (global.multi) {
if (!TAILQ_EMPTY(&global.conn_link)) {
lwqq_async_dispatch(_C_(p, safe_remove_link, NULL));
if(LWQQ__ASYNC_IMPL(flags) & USE_THREAD){
pthread_mutex_lock(&async_lock);
wait_time.tv_sec = 60;
wait_time.tv_nsec = 0;
pthread_cond_timedwait(&async_cond, &async_lock, &wait_time);
lwqq_async_dispatch(_C_(p, safe_remove_link, NULL));
// wait sub thread remove all curl handle
pthread_cond_wait(&async_cond, &async_lock);
pthread_mutex_unlock(&async_lock);
}
}else
lwqq_async_dispatch(_C_(p, safe_remove_link, NULL));
D_ITEM* item, *tvar;
TAILQ_FOREACH_SAFE(item, &global.conn_link, entries, tvar)
@ -1117,26 +1138,31 @@ void lwqq_http_global_free(LwqqCleanUp cleanup)
lwqq_async_timer_free(global.timer_event);
curl_global_cleanup();
global.conn_length = 0;
if(LWQQ__ASYNC_IMPL(flags) & USE_THREAD){
pthread_mutex_lock(&async_lock);
// notify sub thread we have already done curl clean job
pthread_cond_signal(&ev_block_cond);
pthread_mutex_unlock(&async_lock);
}
}
}
LWQQ_EXPORT
void lwqq_http_cleanup(LwqqClient* lc, LwqqCleanUp cleanup)
{
struct timespec wait_time = { 0 };
if (lc && global.multi) {
/**must dispatch safe_remove_link first
* then vp_do(item->cmd) because vp_do might release memory
*/
if (!TAILQ_EMPTY(&global.conn_link)) {
lwqq_async_dispatch(_C_(p, safe_remove_link, lc));
if(LWQQ__ASYNC_IMPL(flags) & USE_THREAD){
pthread_mutex_lock(&async_lock);
wait_time.tv_sec = 60;
wait_time.tv_nsec = 0;
// must use cond wait because timedcond might not trigger dispatch
pthread_cond_timedwait(&async_cond, &async_lock, &wait_time);
lwqq_async_dispatch(_C_(p, safe_remove_link, lc));
// wait sub thread remove all curl handles
pthread_cond_wait(&async_cond, &async_lock);
pthread_mutex_unlock(&async_lock);
}
}else
lwqq_async_dispatch(_C_(p, safe_remove_link, lc));
D_ITEM* item, *tvar;
TAILQ_FOREACH_SAFE(item, &global.conn_link, entries, tvar)
@ -1153,6 +1179,12 @@ void lwqq_http_cleanup(LwqqClient* lc, LwqqCleanUp cleanup)
}
s_free(item);
}
if(LWQQ__ASYNC_IMPL(flags) & USE_THREAD){
pthread_mutex_lock(&async_lock);
// notify sub thread have done all curl clean job
pthread_cond_signal(&ev_block_cond);
pthread_mutex_unlock(&async_lock);
}
}
}

View File

@ -154,5 +154,8 @@ json_t* json_find_first_label_all(const json_t* json, const char* text_label);
char* json_parse_simple_value(json_t* json, const char* key);
char* json_unescape_s(char* str);
// =================== http.h ===============================
LwqqFeatures lwqq__http_check_feature();
#endif

View File

@ -23,23 +23,32 @@
#include "utility.h"
LWQQ_EXPORT
const LwqqFeatures lwqq_features = 0
const LwqqFeatures lwqq_features()
{
static int initialized = 0;
static LwqqFeatures feature = 0;
if(initialized) return feature;
feature = feature
#ifdef WITH_LIBEV
| LWQQ_WITH_LIBEV
| LWQQ_WITH_LIBEV
#endif
#ifdef WITH_LIBUV
| LWQQ_WITH_LIBUV
| LWQQ_WITH_LIBUV
#endif
#ifdef WITH_SQLITE
| LWQQ_WITH_SQLITE
| LWQQ_WITH_SQLITE
#endif
#ifdef WITH_MOZJS
| LWQQ_WITH_MOZJS
| LWQQ_WITH_MOZJS
#endif
#ifdef SSL
| LWQQ_WITH_SSL
| LWQQ_WITH_SSL
#endif
;
;
feature |= lwqq__http_check_feature();
return feature;
}
LWQQ_EXPORT
const char* lwqq_version = LWQQ_VERSION;
@ -673,4 +682,3 @@ const LwqqHashEntry* lwqq_hash_get_last(LwqqClient* lc)
LwqqClient_* lc_ = (LwqqClient_*)lc;
return lc_->hash_idx;
}

View File

@ -49,15 +49,7 @@ typedef int (*LwqqProgressFunc)(void* data, size_t now, size_t total);
* there are some old LWQQ head that should be WEBQQ
*/
typedef enum {
LWQQ_WITH_LIBEV = 1 << 0,
LWQQ_WITH_LIBUV = 1 << 1,
LWQQ_WITH_SQLITE = 1 << 2,
LWQQ_WITH_MOZJS = 1 << 3,
LWQQ_WITH_SSL = 1 << 4
} LwqqFeatures;
extern const LwqqFeatures lwqq_features;
extern const LwqqFeatures lwqq_features();
extern const char* lwqq_version;
typedef enum {