libceph: make message data be a pointer

Begin the transition from a single message data item to a list of
them by replacing the "data" structure in a message with a pointer
to a ceph_msg_data structure.

A null pointer will indicate the message has no data; replace the
use of ceph_msg_has_data() with a simple check for a null pointer.

Create functions ceph_msg_data_create() and ceph_msg_data_destroy()
to dynamically allocate and free a data item structure of a given type.

When a message has its data item "set," allocate one of these to
hold the data description, and free it when the last reference to
the message is dropped.

This partially resolves:
    http://tracker.ceph.com/issues/4429

Signed-off-by: Alex Elder <elder@inktank.com>
Reviewed-by: Josh Durgin <josh.durgin@inktank.com>
This commit is contained in:
Alex Elder 2013-03-11 23:34:24 -05:00 committed by Sage Weil
parent 8ea299bcbc
commit 6644ed7b7e
2 changed files with 63 additions and 38 deletions

View File

@ -64,8 +64,6 @@ struct ceph_messenger {
u32 required_features; u32 required_features;
}; };
#define ceph_msg_has_data(m) ((m)->data.type != CEPH_MSG_DATA_NONE)
enum ceph_msg_data_type { enum ceph_msg_data_type {
CEPH_MSG_DATA_NONE, /* message contains no data payload */ CEPH_MSG_DATA_NONE, /* message contains no data payload */
CEPH_MSG_DATA_PAGES, /* data source/destination is a page array */ CEPH_MSG_DATA_PAGES, /* data source/destination is a page array */
@ -141,8 +139,7 @@ struct ceph_msg {
struct kvec front; /* unaligned blobs of message */ struct kvec front; /* unaligned blobs of message */
struct ceph_buffer *middle; struct ceph_buffer *middle;
/* data payload */ struct ceph_msg_data *data; /* data payload */
struct ceph_msg_data data;
struct ceph_connection *con; struct ceph_connection *con;
struct list_head list_head; /* links for connection lists */ struct list_head list_head; /* links for connection lists */

View File

@ -1086,7 +1086,7 @@ static void prepare_message_data(struct ceph_msg *msg)
/* Initialize data cursor */ /* Initialize data cursor */
ceph_msg_data_cursor_init(&msg->data, data_len); ceph_msg_data_cursor_init(msg->data, data_len);
} }
/* /*
@ -1406,13 +1406,13 @@ static u32 ceph_crc32c_page(u32 crc, struct page *page,
static int write_partial_message_data(struct ceph_connection *con) static int write_partial_message_data(struct ceph_connection *con)
{ {
struct ceph_msg *msg = con->out_msg; struct ceph_msg *msg = con->out_msg;
struct ceph_msg_data_cursor *cursor = &msg->data.cursor; struct ceph_msg_data_cursor *cursor = &msg->data->cursor;
bool do_datacrc = !con->msgr->nocrc; bool do_datacrc = !con->msgr->nocrc;
u32 crc; u32 crc;
dout("%s %p msg %p\n", __func__, con, msg); dout("%s %p msg %p\n", __func__, con, msg);
if (WARN_ON(!ceph_msg_has_data(msg))) if (WARN_ON(!msg->data))
return -EINVAL; return -EINVAL;
/* /*
@ -1432,7 +1432,7 @@ static int write_partial_message_data(struct ceph_connection *con)
bool need_crc; bool need_crc;
int ret; int ret;
page = ceph_msg_data_next(&msg->data, &page_offset, &length, page = ceph_msg_data_next(msg->data, &page_offset, &length,
&last_piece); &last_piece);
ret = ceph_tcp_sendpage(con->sock, page, page_offset, ret = ceph_tcp_sendpage(con->sock, page, page_offset,
length, last_piece); length, last_piece);
@ -1444,7 +1444,7 @@ static int write_partial_message_data(struct ceph_connection *con)
} }
if (do_datacrc && cursor->need_crc) if (do_datacrc && cursor->need_crc)
crc = ceph_crc32c_page(crc, page, page_offset, length); crc = ceph_crc32c_page(crc, page, page_offset, length);
need_crc = ceph_msg_data_advance(&msg->data, (size_t) ret); need_crc = ceph_msg_data_advance(msg->data, (size_t)ret);
} }
dout("%s %p msg %p done\n", __func__, con, msg); dout("%s %p msg %p done\n", __func__, con, msg);
@ -2104,7 +2104,7 @@ static int read_partial_message_section(struct ceph_connection *con,
static int read_partial_msg_data(struct ceph_connection *con) static int read_partial_msg_data(struct ceph_connection *con)
{ {
struct ceph_msg *msg = con->in_msg; struct ceph_msg *msg = con->in_msg;
struct ceph_msg_data_cursor *cursor = &msg->data.cursor; struct ceph_msg_data_cursor *cursor = &msg->data->cursor;
const bool do_datacrc = !con->msgr->nocrc; const bool do_datacrc = !con->msgr->nocrc;
struct page *page; struct page *page;
size_t page_offset; size_t page_offset;
@ -2113,13 +2113,13 @@ static int read_partial_msg_data(struct ceph_connection *con)
int ret; int ret;
BUG_ON(!msg); BUG_ON(!msg);
if (WARN_ON(!ceph_msg_has_data(msg))) if (!msg->data)
return -EIO; return -EIO;
if (do_datacrc) if (do_datacrc)
crc = con->in_data_crc; crc = con->in_data_crc;
while (cursor->resid) { while (cursor->resid) {
page = ceph_msg_data_next(&msg->data, &page_offset, &length, page = ceph_msg_data_next(msg->data, &page_offset, &length,
NULL); NULL);
ret = ceph_tcp_recvpage(con->sock, page, page_offset, length); ret = ceph_tcp_recvpage(con->sock, page, page_offset, length);
if (ret <= 0) { if (ret <= 0) {
@ -2131,7 +2131,7 @@ static int read_partial_msg_data(struct ceph_connection *con)
if (do_datacrc) if (do_datacrc)
crc = ceph_crc32c_page(crc, page, page_offset, ret); crc = ceph_crc32c_page(crc, page, page_offset, ret);
(void) ceph_msg_data_advance(&msg->data, (size_t) ret); (void) ceph_msg_data_advance(msg->data, (size_t)ret);
} }
if (do_datacrc) if (do_datacrc)
con->in_data_crc = crc; con->in_data_crc = crc;
@ -2947,44 +2947,80 @@ void ceph_con_keepalive(struct ceph_connection *con)
} }
EXPORT_SYMBOL(ceph_con_keepalive); EXPORT_SYMBOL(ceph_con_keepalive);
static void ceph_msg_data_init(struct ceph_msg_data *data) static struct ceph_msg_data *ceph_msg_data_create(enum ceph_msg_data_type type)
{ {
data->type = CEPH_MSG_DATA_NONE; struct ceph_msg_data *data;
if (WARN_ON(!ceph_msg_data_type_valid(type)))
return NULL;
data = kzalloc(sizeof (*data), GFP_NOFS);
if (data)
data->type = type;
return data;
}
static void ceph_msg_data_destroy(struct ceph_msg_data *data)
{
if (!data)
return;
if (data->type == CEPH_MSG_DATA_PAGELIST) {
ceph_pagelist_release(data->pagelist);
kfree(data->pagelist);
}
kfree(data);
} }
void ceph_msg_data_set_pages(struct ceph_msg *msg, struct page **pages, void ceph_msg_data_set_pages(struct ceph_msg *msg, struct page **pages,
size_t length, size_t alignment) size_t length, size_t alignment)
{ {
struct ceph_msg_data *data;
BUG_ON(!pages); BUG_ON(!pages);
BUG_ON(!length); BUG_ON(!length);
BUG_ON(msg->data.type != CEPH_MSG_DATA_NONE); BUG_ON(msg->data != NULL);
msg->data.type = CEPH_MSG_DATA_PAGES; data = ceph_msg_data_create(CEPH_MSG_DATA_PAGES);
msg->data.pages = pages; BUG_ON(!data);
msg->data.length = length; data->pages = pages;
msg->data.alignment = alignment & ~PAGE_MASK; data->length = length;
data->alignment = alignment & ~PAGE_MASK;
msg->data = data;
} }
EXPORT_SYMBOL(ceph_msg_data_set_pages); EXPORT_SYMBOL(ceph_msg_data_set_pages);
void ceph_msg_data_set_pagelist(struct ceph_msg *msg, void ceph_msg_data_set_pagelist(struct ceph_msg *msg,
struct ceph_pagelist *pagelist) struct ceph_pagelist *pagelist)
{ {
struct ceph_msg_data *data;
BUG_ON(!pagelist); BUG_ON(!pagelist);
BUG_ON(!pagelist->length); BUG_ON(!pagelist->length);
BUG_ON(msg->data.type != CEPH_MSG_DATA_NONE); BUG_ON(msg->data != NULL);
msg->data.type = CEPH_MSG_DATA_PAGELIST; data = ceph_msg_data_create(CEPH_MSG_DATA_PAGELIST);
msg->data.pagelist = pagelist; BUG_ON(!data);
data->pagelist = pagelist;
msg->data = data;
} }
EXPORT_SYMBOL(ceph_msg_data_set_pagelist); EXPORT_SYMBOL(ceph_msg_data_set_pagelist);
void ceph_msg_data_set_bio(struct ceph_msg *msg, struct bio *bio) void ceph_msg_data_set_bio(struct ceph_msg *msg, struct bio *bio)
{ {
BUG_ON(!bio); struct ceph_msg_data *data;
BUG_ON(msg->data.type != CEPH_MSG_DATA_NONE);
msg->data.type = CEPH_MSG_DATA_BIO; BUG_ON(!bio);
msg->data.bio = bio; BUG_ON(msg->data != NULL);
data = ceph_msg_data_create(CEPH_MSG_DATA_BIO);
BUG_ON(!data);
data->bio = bio;
msg->data = data;
} }
EXPORT_SYMBOL(ceph_msg_data_set_bio); EXPORT_SYMBOL(ceph_msg_data_set_bio);
@ -3008,8 +3044,6 @@ struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags,
INIT_LIST_HEAD(&m->list_head); INIT_LIST_HEAD(&m->list_head);
kref_init(&m->kref); kref_init(&m->kref);
ceph_msg_data_init(&m->data);
/* front */ /* front */
m->front_max = front_len; m->front_max = front_len;
if (front_len) { if (front_len) {
@ -3163,14 +3197,8 @@ void ceph_msg_last_put(struct kref *kref)
ceph_buffer_put(m->middle); ceph_buffer_put(m->middle);
m->middle = NULL; m->middle = NULL;
} }
if (ceph_msg_has_data(m)) { ceph_msg_data_destroy(m->data);
if (m->data.type == CEPH_MSG_DATA_PAGELIST) { m->data = NULL;
ceph_pagelist_release(m->data.pagelist);
kfree(m->data.pagelist);
}
memset(&m->data, 0, sizeof m->data);
ceph_msg_data_init(&m->data);
}
if (m->pool) if (m->pool)
ceph_msgpool_put(m->pool, m); ceph_msgpool_put(m->pool, m);
@ -3182,7 +3210,7 @@ EXPORT_SYMBOL(ceph_msg_last_put);
void ceph_msg_dump(struct ceph_msg *msg) void ceph_msg_dump(struct ceph_msg *msg)
{ {
pr_debug("msg_dump %p (front_max %d length %zd)\n", msg, pr_debug("msg_dump %p (front_max %d length %zd)\n", msg,
msg->front_max, msg->data.length); msg->front_max, msg->data->length);
print_hex_dump(KERN_DEBUG, "header: ", print_hex_dump(KERN_DEBUG, "header: ",
DUMP_PREFIX_OFFSET, 16, 1, DUMP_PREFIX_OFFSET, 16, 1,
&msg->hdr, sizeof(msg->hdr), true); &msg->hdr, sizeof(msg->hdr), true);