libceph: async MON client generic requests
For map check, we are going to need to send CEPH_MSG_MON_GET_VERSION messages asynchronously and get a callback on completion. Refactor MON client to allow firing off generic requests asynchronously and add an async variant of ceph_monc_get_version(). ceph_monc_do_statfs() is switched over and remains sync. Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
This commit is contained in:
parent
b07d3c4bd7
commit
d0b19705e9
|
@ -4896,7 +4896,7 @@ static int rbd_add_get_pool_id(struct rbd_client *rbdc, const char *pool_name)
|
|||
again:
|
||||
ret = ceph_pg_poolid_by_name(rbdc->client->osdc.osdmap, pool_name);
|
||||
if (ret == -ENOENT && tries++ < 1) {
|
||||
ret = ceph_monc_do_get_version(&rbdc->client->monc, "osdmap",
|
||||
ret = ceph_monc_get_version(&rbdc->client->monc, "osdmap",
|
||||
&newest_epoch);
|
||||
if (ret < 0)
|
||||
return ret;
|
||||
|
|
|
@ -39,20 +39,31 @@ struct ceph_mon_request {
|
|||
ceph_monc_request_func_t do_request;
|
||||
};
|
||||
|
||||
typedef void (*ceph_monc_callback_t)(struct ceph_mon_generic_request *);
|
||||
|
||||
/*
|
||||
* ceph_mon_generic_request is being used for the statfs and
|
||||
* mon_get_version requests which are being done a bit differently
|
||||
* because we need to get data back to the caller
|
||||
*/
|
||||
struct ceph_mon_generic_request {
|
||||
struct ceph_mon_client *monc;
|
||||
struct kref kref;
|
||||
u64 tid;
|
||||
struct rb_node node;
|
||||
int result;
|
||||
void *buf;
|
||||
|
||||
struct completion completion;
|
||||
ceph_monc_callback_t complete_cb;
|
||||
u64 private_data; /* r_tid/linger_id */
|
||||
|
||||
struct ceph_msg *request; /* original request */
|
||||
struct ceph_msg *reply; /* and reply */
|
||||
|
||||
union {
|
||||
struct ceph_statfs *st;
|
||||
u64 newest;
|
||||
} u;
|
||||
};
|
||||
|
||||
struct ceph_mon_client {
|
||||
|
@ -124,8 +135,10 @@ extern int ceph_monc_wait_osdmap(struct ceph_mon_client *monc, u32 epoch,
|
|||
extern int ceph_monc_do_statfs(struct ceph_mon_client *monc,
|
||||
struct ceph_statfs *buf);
|
||||
|
||||
extern int ceph_monc_do_get_version(struct ceph_mon_client *monc,
|
||||
const char *what, u64 *newest);
|
||||
int ceph_monc_get_version(struct ceph_mon_client *monc, const char *what,
|
||||
u64 *newest);
|
||||
int ceph_monc_get_version_async(struct ceph_mon_client *monc, const char *what,
|
||||
ceph_monc_callback_t cb, u64 private_data);
|
||||
|
||||
extern int ceph_monc_open_session(struct ceph_mon_client *monc);
|
||||
|
||||
|
|
|
@ -493,6 +493,10 @@ static void release_generic_request(struct kref *kref)
|
|||
struct ceph_mon_generic_request *req =
|
||||
container_of(kref, struct ceph_mon_generic_request, kref);
|
||||
|
||||
dout("%s greq %p request %p reply %p\n", __func__, req, req->request,
|
||||
req->reply);
|
||||
WARN_ON(!RB_EMPTY_NODE(&req->node));
|
||||
|
||||
if (req->reply)
|
||||
ceph_msg_put(req->reply);
|
||||
if (req->request)
|
||||
|
@ -503,6 +507,7 @@ static void release_generic_request(struct kref *kref)
|
|||
|
||||
static void put_generic_request(struct ceph_mon_generic_request *req)
|
||||
{
|
||||
if (req)
|
||||
kref_put(&req->kref, release_generic_request);
|
||||
}
|
||||
|
||||
|
@ -511,6 +516,103 @@ static void get_generic_request(struct ceph_mon_generic_request *req)
|
|||
kref_get(&req->kref);
|
||||
}
|
||||
|
||||
static struct ceph_mon_generic_request *
|
||||
alloc_generic_request(struct ceph_mon_client *monc, gfp_t gfp)
|
||||
{
|
||||
struct ceph_mon_generic_request *req;
|
||||
|
||||
req = kzalloc(sizeof(*req), gfp);
|
||||
if (!req)
|
||||
return NULL;
|
||||
|
||||
req->monc = monc;
|
||||
kref_init(&req->kref);
|
||||
RB_CLEAR_NODE(&req->node);
|
||||
init_completion(&req->completion);
|
||||
|
||||
dout("%s greq %p\n", __func__, req);
|
||||
return req;
|
||||
}
|
||||
|
||||
static void register_generic_request(struct ceph_mon_generic_request *req)
|
||||
{
|
||||
struct ceph_mon_client *monc = req->monc;
|
||||
|
||||
WARN_ON(req->tid);
|
||||
|
||||
get_generic_request(req);
|
||||
req->tid = ++monc->last_tid;
|
||||
insert_generic_request(&monc->generic_request_tree, req);
|
||||
}
|
||||
|
||||
static void send_generic_request(struct ceph_mon_client *monc,
|
||||
struct ceph_mon_generic_request *req)
|
||||
{
|
||||
WARN_ON(!req->tid);
|
||||
|
||||
dout("%s greq %p tid %llu\n", __func__, req, req->tid);
|
||||
req->request->hdr.tid = cpu_to_le64(req->tid);
|
||||
ceph_con_send(&monc->con, ceph_msg_get(req->request));
|
||||
}
|
||||
|
||||
static void __finish_generic_request(struct ceph_mon_generic_request *req)
|
||||
{
|
||||
struct ceph_mon_client *monc = req->monc;
|
||||
|
||||
dout("%s greq %p tid %llu\n", __func__, req, req->tid);
|
||||
erase_generic_request(&monc->generic_request_tree, req);
|
||||
|
||||
ceph_msg_revoke(req->request);
|
||||
ceph_msg_revoke_incoming(req->reply);
|
||||
}
|
||||
|
||||
static void finish_generic_request(struct ceph_mon_generic_request *req)
|
||||
{
|
||||
__finish_generic_request(req);
|
||||
put_generic_request(req);
|
||||
}
|
||||
|
||||
static void complete_generic_request(struct ceph_mon_generic_request *req)
|
||||
{
|
||||
if (req->complete_cb)
|
||||
req->complete_cb(req);
|
||||
else
|
||||
complete_all(&req->completion);
|
||||
put_generic_request(req);
|
||||
}
|
||||
|
||||
void cancel_generic_request(struct ceph_mon_generic_request *req)
|
||||
{
|
||||
struct ceph_mon_client *monc = req->monc;
|
||||
struct ceph_mon_generic_request *lookup_req;
|
||||
|
||||
dout("%s greq %p tid %llu\n", __func__, req, req->tid);
|
||||
|
||||
mutex_lock(&monc->mutex);
|
||||
lookup_req = lookup_generic_request(&monc->generic_request_tree,
|
||||
req->tid);
|
||||
if (lookup_req) {
|
||||
WARN_ON(lookup_req != req);
|
||||
finish_generic_request(req);
|
||||
}
|
||||
|
||||
mutex_unlock(&monc->mutex);
|
||||
}
|
||||
|
||||
static int wait_generic_request(struct ceph_mon_generic_request *req)
|
||||
{
|
||||
int ret;
|
||||
|
||||
dout("%s greq %p tid %llu\n", __func__, req, req->tid);
|
||||
ret = wait_for_completion_interruptible(&req->completion);
|
||||
if (ret)
|
||||
cancel_generic_request(req);
|
||||
else
|
||||
ret = req->result; /* completed */
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
static struct ceph_msg *get_generic_reply(struct ceph_connection *con,
|
||||
struct ceph_msg_header *hdr,
|
||||
int *skip)
|
||||
|
@ -540,40 +642,6 @@ static struct ceph_msg *get_generic_reply(struct ceph_connection *con,
|
|||
return m;
|
||||
}
|
||||
|
||||
static int __do_generic_request(struct ceph_mon_client *monc, u64 tid,
|
||||
struct ceph_mon_generic_request *req)
|
||||
{
|
||||
int err;
|
||||
|
||||
/* register request */
|
||||
req->tid = tid != 0 ? tid : ++monc->last_tid;
|
||||
req->request->hdr.tid = cpu_to_le64(req->tid);
|
||||
insert_generic_request(&monc->generic_request_tree, req);
|
||||
ceph_con_send(&monc->con, ceph_msg_get(req->request));
|
||||
mutex_unlock(&monc->mutex);
|
||||
|
||||
err = wait_for_completion_interruptible(&req->completion);
|
||||
|
||||
mutex_lock(&monc->mutex);
|
||||
erase_generic_request(&monc->generic_request_tree, req);
|
||||
|
||||
if (!err)
|
||||
err = req->result;
|
||||
return err;
|
||||
}
|
||||
|
||||
static int do_generic_request(struct ceph_mon_client *monc,
|
||||
struct ceph_mon_generic_request *req)
|
||||
{
|
||||
int err;
|
||||
|
||||
mutex_lock(&monc->mutex);
|
||||
err = __do_generic_request(monc, 0, req);
|
||||
mutex_unlock(&monc->mutex);
|
||||
|
||||
return err;
|
||||
}
|
||||
|
||||
/*
|
||||
* statfs
|
||||
*/
|
||||
|
@ -584,22 +652,24 @@ static void handle_statfs_reply(struct ceph_mon_client *monc,
|
|||
struct ceph_mon_statfs_reply *reply = msg->front.iov_base;
|
||||
u64 tid = le64_to_cpu(msg->hdr.tid);
|
||||
|
||||
dout("%s msg %p tid %llu\n", __func__, msg, tid);
|
||||
|
||||
if (msg->front.iov_len != sizeof(*reply))
|
||||
goto bad;
|
||||
dout("handle_statfs_reply %p tid %llu\n", msg, tid);
|
||||
|
||||
mutex_lock(&monc->mutex);
|
||||
req = lookup_generic_request(&monc->generic_request_tree, tid);
|
||||
if (req) {
|
||||
*(struct ceph_statfs *)req->buf = reply->st;
|
||||
req->result = 0;
|
||||
get_generic_request(req);
|
||||
}
|
||||
if (!req) {
|
||||
mutex_unlock(&monc->mutex);
|
||||
if (req) {
|
||||
complete_all(&req->completion);
|
||||
put_generic_request(req);
|
||||
return;
|
||||
}
|
||||
|
||||
req->result = 0;
|
||||
*req->u.st = reply->st; /* struct */
|
||||
__finish_generic_request(req);
|
||||
mutex_unlock(&monc->mutex);
|
||||
|
||||
complete_generic_request(req);
|
||||
return;
|
||||
|
||||
bad:
|
||||
|
@ -614,39 +684,38 @@ int ceph_monc_do_statfs(struct ceph_mon_client *monc, struct ceph_statfs *buf)
|
|||
{
|
||||
struct ceph_mon_generic_request *req;
|
||||
struct ceph_mon_statfs *h;
|
||||
int err;
|
||||
int ret = -ENOMEM;
|
||||
|
||||
req = kzalloc(sizeof(*req), GFP_NOFS);
|
||||
req = alloc_generic_request(monc, GFP_NOFS);
|
||||
if (!req)
|
||||
return -ENOMEM;
|
||||
goto out;
|
||||
|
||||
kref_init(&req->kref);
|
||||
RB_CLEAR_NODE(&req->node);
|
||||
req->buf = buf;
|
||||
init_completion(&req->completion);
|
||||
|
||||
err = -ENOMEM;
|
||||
req->request = ceph_msg_new(CEPH_MSG_STATFS, sizeof(*h), GFP_NOFS,
|
||||
true);
|
||||
if (!req->request)
|
||||
goto out;
|
||||
req->reply = ceph_msg_new(CEPH_MSG_STATFS_REPLY, 1024, GFP_NOFS,
|
||||
true);
|
||||
|
||||
req->reply = ceph_msg_new(CEPH_MSG_STATFS_REPLY, 64, GFP_NOFS, true);
|
||||
if (!req->reply)
|
||||
goto out;
|
||||
|
||||
req->u.st = buf;
|
||||
|
||||
mutex_lock(&monc->mutex);
|
||||
register_generic_request(req);
|
||||
/* fill out request */
|
||||
h = req->request->front.iov_base;
|
||||
h->monhdr.have_version = 0;
|
||||
h->monhdr.session_mon = cpu_to_le16(-1);
|
||||
h->monhdr.session_mon_tid = 0;
|
||||
h->fsid = monc->monmap->fsid;
|
||||
send_generic_request(monc, req);
|
||||
mutex_unlock(&monc->mutex);
|
||||
|
||||
err = do_generic_request(monc, req);
|
||||
|
||||
ret = wait_generic_request(req);
|
||||
out:
|
||||
put_generic_request(req);
|
||||
return err;
|
||||
return ret;
|
||||
}
|
||||
EXPORT_SYMBOL(ceph_monc_do_statfs);
|
||||
|
||||
|
@ -659,7 +728,7 @@ static void handle_get_version_reply(struct ceph_mon_client *monc,
|
|||
void *end = p + msg->front_alloc_len;
|
||||
u64 handle;
|
||||
|
||||
dout("%s %p tid %llu\n", __func__, msg, tid);
|
||||
dout("%s msg %p tid %llu\n", __func__, msg, tid);
|
||||
|
||||
ceph_decode_need(&p, end, 2*sizeof(u64), bad);
|
||||
handle = ceph_decode_64(&p);
|
||||
|
@ -668,77 +737,110 @@ static void handle_get_version_reply(struct ceph_mon_client *monc,
|
|||
|
||||
mutex_lock(&monc->mutex);
|
||||
req = lookup_generic_request(&monc->generic_request_tree, handle);
|
||||
if (req) {
|
||||
*(u64 *)req->buf = ceph_decode_64(&p);
|
||||
req->result = 0;
|
||||
get_generic_request(req);
|
||||
}
|
||||
if (!req) {
|
||||
mutex_unlock(&monc->mutex);
|
||||
if (req) {
|
||||
complete_all(&req->completion);
|
||||
put_generic_request(req);
|
||||
return;
|
||||
}
|
||||
|
||||
req->result = 0;
|
||||
req->u.newest = ceph_decode_64(&p);
|
||||
__finish_generic_request(req);
|
||||
mutex_unlock(&monc->mutex);
|
||||
|
||||
complete_generic_request(req);
|
||||
return;
|
||||
|
||||
bad:
|
||||
pr_err("corrupt mon_get_version reply, tid %llu\n", tid);
|
||||
ceph_msg_dump(msg);
|
||||
}
|
||||
|
||||
static struct ceph_mon_generic_request *
|
||||
__ceph_monc_get_version(struct ceph_mon_client *monc, const char *what,
|
||||
ceph_monc_callback_t cb, u64 private_data)
|
||||
{
|
||||
struct ceph_mon_generic_request *req;
|
||||
|
||||
req = alloc_generic_request(monc, GFP_NOIO);
|
||||
if (!req)
|
||||
goto err_put_req;
|
||||
|
||||
req->request = ceph_msg_new(CEPH_MSG_MON_GET_VERSION,
|
||||
sizeof(u64) + sizeof(u32) + strlen(what),
|
||||
GFP_NOIO, true);
|
||||
if (!req->request)
|
||||
goto err_put_req;
|
||||
|
||||
req->reply = ceph_msg_new(CEPH_MSG_MON_GET_VERSION_REPLY, 32, GFP_NOIO,
|
||||
true);
|
||||
if (!req->reply)
|
||||
goto err_put_req;
|
||||
|
||||
req->complete_cb = cb;
|
||||
req->private_data = private_data;
|
||||
|
||||
mutex_lock(&monc->mutex);
|
||||
register_generic_request(req);
|
||||
{
|
||||
void *p = req->request->front.iov_base;
|
||||
void *const end = p + req->request->front_alloc_len;
|
||||
|
||||
ceph_encode_64(&p, req->tid); /* handle */
|
||||
ceph_encode_string(&p, end, what, strlen(what));
|
||||
WARN_ON(p != end);
|
||||
}
|
||||
send_generic_request(monc, req);
|
||||
mutex_unlock(&monc->mutex);
|
||||
|
||||
return req;
|
||||
|
||||
err_put_req:
|
||||
put_generic_request(req);
|
||||
return ERR_PTR(-ENOMEM);
|
||||
}
|
||||
|
||||
/*
|
||||
* Send MMonGetVersion and wait for the reply.
|
||||
*
|
||||
* @what: one of "mdsmap", "osdmap" or "monmap"
|
||||
*/
|
||||
int ceph_monc_do_get_version(struct ceph_mon_client *monc, const char *what,
|
||||
int ceph_monc_get_version(struct ceph_mon_client *monc, const char *what,
|
||||
u64 *newest)
|
||||
{
|
||||
struct ceph_mon_generic_request *req;
|
||||
void *p, *end;
|
||||
u64 tid;
|
||||
int err;
|
||||
int ret;
|
||||
|
||||
req = kzalloc(sizeof(*req), GFP_NOFS);
|
||||
if (!req)
|
||||
return -ENOMEM;
|
||||
req = __ceph_monc_get_version(monc, what, NULL, 0);
|
||||
if (IS_ERR(req))
|
||||
return PTR_ERR(req);
|
||||
|
||||
kref_init(&req->kref);
|
||||
RB_CLEAR_NODE(&req->node);
|
||||
req->buf = newest;
|
||||
init_completion(&req->completion);
|
||||
ret = wait_generic_request(req);
|
||||
if (!ret)
|
||||
*newest = req->u.newest;
|
||||
|
||||
req->request = ceph_msg_new(CEPH_MSG_MON_GET_VERSION,
|
||||
sizeof(u64) + sizeof(u32) + strlen(what),
|
||||
GFP_NOFS, true);
|
||||
if (!req->request) {
|
||||
err = -ENOMEM;
|
||||
goto out;
|
||||
}
|
||||
|
||||
req->reply = ceph_msg_new(CEPH_MSG_MON_GET_VERSION_REPLY, 1024,
|
||||
GFP_NOFS, true);
|
||||
if (!req->reply) {
|
||||
err = -ENOMEM;
|
||||
goto out;
|
||||
}
|
||||
|
||||
p = req->request->front.iov_base;
|
||||
end = p + req->request->front_alloc_len;
|
||||
|
||||
/* fill out request */
|
||||
mutex_lock(&monc->mutex);
|
||||
tid = ++monc->last_tid;
|
||||
ceph_encode_64(&p, tid); /* handle */
|
||||
ceph_encode_string(&p, end, what, strlen(what));
|
||||
|
||||
err = __do_generic_request(monc, tid, req);
|
||||
|
||||
mutex_unlock(&monc->mutex);
|
||||
out:
|
||||
put_generic_request(req);
|
||||
return err;
|
||||
return ret;
|
||||
}
|
||||
EXPORT_SYMBOL(ceph_monc_do_get_version);
|
||||
EXPORT_SYMBOL(ceph_monc_get_version);
|
||||
|
||||
/*
|
||||
* Send MMonGetVersion,
|
||||
*
|
||||
* @what: one of "mdsmap", "osdmap" or "monmap"
|
||||
*/
|
||||
int ceph_monc_get_version_async(struct ceph_mon_client *monc, const char *what,
|
||||
ceph_monc_callback_t cb, u64 private_data)
|
||||
{
|
||||
struct ceph_mon_generic_request *req;
|
||||
|
||||
req = __ceph_monc_get_version(monc, what, cb, private_data);
|
||||
if (IS_ERR(req))
|
||||
return PTR_ERR(req);
|
||||
|
||||
put_generic_request(req);
|
||||
return 0;
|
||||
}
|
||||
EXPORT_SYMBOL(ceph_monc_get_version_async);
|
||||
|
||||
/*
|
||||
* Resend pending generic requests.
|
||||
|
@ -923,6 +1025,8 @@ void ceph_monc_stop(struct ceph_mon_client *monc)
|
|||
|
||||
ceph_auth_destroy(monc->auth);
|
||||
|
||||
WARN_ON(!RB_EMPTY_ROOT(&monc->generic_request_tree));
|
||||
|
||||
ceph_msg_put(monc->m_auth);
|
||||
ceph_msg_put(monc->m_auth_reply);
|
||||
ceph_msg_put(monc->m_subscribe);
|
||||
|
|
Loading…
Reference in New Issue