rbd: simplify rbd_rq_fn()

When processing a request, rbd_rq_fn() makes clones of the bio's in
the request's bio chain and submits the results to osd's to be
satisfied.  If a request bio straddles the boundary between objects
backing the rbd image, it must be represented by two cloned bio's,
one for the first part (at the end of one object) and one for the
second (at the beginning of the next object).

This has been handled by a function bio_chain_clone(), which
includes an interface only a mother could love, and which has
been found to have other problems.

This patch defines two new fairly generic bio functions (one which
replaces bio_chain_clone()) to help out the situation, and then
revises rbd_rq_fn() to make use of them.

First, bio_clone_range() clones a portion of a single bio, starting
at a given offset within the bio and including only as many bytes
as requested.  As a convenience, a request to clone the entire bio
is passed directly to bio_clone().

Second, bio_chain_clone_range() performs a similar function,
producing a chain of cloned bio's covering a sub-range of the
source chain.  No bio_pair structures are used, and if successful
the result will represent exactly the specified range.

Using bio_chain_clone_range() makes bio_rq_fn() a little easier
to understand, because it avoids the need to pass very much
state information between consecutive calls.  By avoiding the need
to track a bio_pair structure, it also eliminates the problem
described here:  http://tracker.newdream.net/issues/2933

Note that a block request (and therefore the complete length of
a bio chain processed in rbd_rq_fn()) is an unsigned int, while
the result of rbd_segment_length() is u64.  This change makes
this range trunctation explicit, and trips a bug if the the
segment boundary is too far off.

Signed-off-by: Alex Elder <elder@inktank.com>
Reviewed-by: Josh Durgin <josh.durgin@inktank.com>
This commit is contained in:
Alex Elder 2012-10-20 22:17:27 -05:00
parent 0ed7285e00
commit f7760dad28
1 changed files with 152 additions and 79 deletions

View File

@ -826,77 +826,144 @@ static void zero_bio_chain(struct bio *chain, int start_ofs)
} }
/* /*
* bio_chain_clone - clone a chain of bios up to a certain length. * Clone a portion of a bio, starting at the given byte offset
* might return a bio_pair that will need to be released. * and continuing for the number of bytes indicated.
*/ */
static struct bio *bio_chain_clone(struct bio **old, struct bio **next, static struct bio *bio_clone_range(struct bio *bio_src,
struct bio_pair **bp, unsigned int offset,
int len, gfp_t gfpmask) unsigned int len,
gfp_t gfpmask)
{ {
struct bio *old_chain = *old; struct bio_vec *bv;
struct bio *new_chain = NULL; unsigned int resid;
struct bio *tail; unsigned short idx;
int total = 0; unsigned int voff;
unsigned short end_idx;
unsigned short vcnt;
struct bio *bio;
if (*bp) { /* Handle the easy case for the caller */
bio_pair_release(*bp);
*bp = NULL; if (!offset && len == bio_src->bi_size)
return bio_clone(bio_src, gfpmask);
if (WARN_ON_ONCE(!len))
return NULL;
if (WARN_ON_ONCE(len > bio_src->bi_size))
return NULL;
if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
return NULL;
/* Find first affected segment... */
resid = offset;
__bio_for_each_segment(bv, bio_src, idx, 0) {
if (resid < bv->bv_len)
break;
resid -= bv->bv_len;
}
voff = resid;
/* ...and the last affected segment */
resid += len;
__bio_for_each_segment(bv, bio_src, end_idx, idx) {
if (resid <= bv->bv_len)
break;
resid -= bv->bv_len;
}
vcnt = end_idx - idx + 1;
/* Build the clone */
bio = bio_alloc(gfpmask, (unsigned int) vcnt);
if (!bio)
return NULL; /* ENOMEM */
bio->bi_bdev = bio_src->bi_bdev;
bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
bio->bi_rw = bio_src->bi_rw;
bio->bi_flags |= 1 << BIO_CLONED;
/*
* Copy over our part of the bio_vec, then update the first
* and last (or only) entries.
*/
memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
vcnt * sizeof (struct bio_vec));
bio->bi_io_vec[0].bv_offset += voff;
if (vcnt > 1) {
bio->bi_io_vec[0].bv_len -= voff;
bio->bi_io_vec[vcnt - 1].bv_len = resid;
} else {
bio->bi_io_vec[0].bv_len = len;
} }
while (old_chain && (total < len)) { bio->bi_vcnt = vcnt;
struct bio *tmp; bio->bi_size = len;
bio->bi_idx = 0;
tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs); return bio;
if (!tmp) }
goto err_out;
gfpmask &= ~__GFP_WAIT; /* can't wait after the first */
if (total + old_chain->bi_size > len) { /*
struct bio_pair *bp; * Clone a portion of a bio chain, starting at the given byte offset
* into the first bio in the source chain and continuing for the
* number of bytes indicated. The result is another bio chain of
* exactly the given length, or a null pointer on error.
*
* The bio_src and offset parameters are both in-out. On entry they
* refer to the first source bio and the offset into that bio where
* the start of data to be cloned is located.
*
* On return, bio_src is updated to refer to the bio in the source
* chain that contains first un-cloned byte, and *offset will
* contain the offset of that byte within that bio.
*/
static struct bio *bio_chain_clone_range(struct bio **bio_src,
unsigned int *offset,
unsigned int len,
gfp_t gfpmask)
{
struct bio *bi = *bio_src;
unsigned int off = *offset;
struct bio *chain = NULL;
struct bio **end;
/* /* Build up a chain of clone bios up to the limit */
* this split can only happen with a single paged bio,
* split_bio will BUG_ON if this is not the case
*/
dout("bio_chain_clone split! total=%d remaining=%d"
"bi_size=%u\n",
total, len - total, old_chain->bi_size);
/* split the bio. We'll release it either in the next if (!bi || off >= bi->bi_size || !len)
call, or it will have to be released outside */ return NULL; /* Nothing to clone */
bp = bio_split(old_chain, (len - total) / SECTOR_SIZE);
if (!bp)
goto err_out;
__bio_clone(tmp, &bp->bio1); end = &chain;
while (len) {
unsigned int bi_size;
struct bio *bio;
*next = &bp->bio2; if (!bi)
} else { goto out_err; /* EINVAL; ran out of bio's */
__bio_clone(tmp, old_chain); bi_size = min_t(unsigned int, bi->bi_size - off, len);
*next = old_chain->bi_next; bio = bio_clone_range(bi, off, bi_size, gfpmask);
if (!bio)
goto out_err; /* ENOMEM */
*end = bio;
end = &bio->bi_next;
off += bi_size;
if (off == bi->bi_size) {
bi = bi->bi_next;
off = 0;
} }
len -= bi_size;
tmp->bi_bdev = NULL;
tmp->bi_next = NULL;
if (new_chain)
tail->bi_next = tmp;
else
new_chain = tmp;
tail = tmp;
old_chain = old_chain->bi_next;
total += tmp->bi_size;
} }
*bio_src = bi;
*offset = off;
rbd_assert(total == len); return chain;
out_err:
bio_chain_put(chain);
*old = old_chain;
return new_chain;
err_out:
dout("bio_chain_clone with err\n");
bio_chain_put(new_chain);
return NULL; return NULL;
} }
@ -1014,8 +1081,9 @@ static int rbd_do_request(struct request *rq,
req_data->coll_index = coll_index; req_data->coll_index = coll_index;
} }
dout("rbd_do_request object_name=%s ofs=%llu len=%llu\n", object_name, dout("rbd_do_request object_name=%s ofs=%llu len=%llu coll=%p[%d]\n",
(unsigned long long) ofs, (unsigned long long) len); object_name, (unsigned long long) ofs,
(unsigned long long) len, coll, coll_index);
osdc = &rbd_dev->rbd_client->client->osdc; osdc = &rbd_dev->rbd_client->client->osdc;
req = ceph_osdc_alloc_request(osdc, flags, snapc, ops, req = ceph_osdc_alloc_request(osdc, flags, snapc, ops,
@ -1463,18 +1531,16 @@ static void rbd_rq_fn(struct request_queue *q)
{ {
struct rbd_device *rbd_dev = q->queuedata; struct rbd_device *rbd_dev = q->queuedata;
struct request *rq; struct request *rq;
struct bio_pair *bp = NULL;
while ((rq = blk_fetch_request(q))) { while ((rq = blk_fetch_request(q))) {
struct bio *bio; struct bio *bio;
struct bio *rq_bio, *next_bio = NULL;
bool do_write; bool do_write;
unsigned int size; unsigned int size;
u64 op_size = 0;
u64 ofs; u64 ofs;
int num_segs, cur_seg = 0; int num_segs, cur_seg = 0;
struct rbd_req_coll *coll; struct rbd_req_coll *coll;
struct ceph_snap_context *snapc; struct ceph_snap_context *snapc;
unsigned int bio_offset;
dout("fetched request\n"); dout("fetched request\n");
@ -1486,10 +1552,6 @@ static void rbd_rq_fn(struct request_queue *q)
/* deduce our operation (read, write) */ /* deduce our operation (read, write) */
do_write = (rq_data_dir(rq) == WRITE); do_write = (rq_data_dir(rq) == WRITE);
size = blk_rq_bytes(rq);
ofs = blk_rq_pos(rq) * SECTOR_SIZE;
rq_bio = rq->bio;
if (do_write && rbd_dev->mapping.read_only) { if (do_write && rbd_dev->mapping.read_only) {
__blk_end_request_all(rq, -EROFS); __blk_end_request_all(rq, -EROFS);
continue; continue;
@ -1512,6 +1574,10 @@ static void rbd_rq_fn(struct request_queue *q)
up_read(&rbd_dev->header_rwsem); up_read(&rbd_dev->header_rwsem);
size = blk_rq_bytes(rq);
ofs = blk_rq_pos(rq) * SECTOR_SIZE;
bio = rq->bio;
dout("%s 0x%x bytes at 0x%llx\n", dout("%s 0x%x bytes at 0x%llx\n",
do_write ? "write" : "read", do_write ? "write" : "read",
size, (unsigned long long) blk_rq_pos(rq) * SECTOR_SIZE); size, (unsigned long long) blk_rq_pos(rq) * SECTOR_SIZE);
@ -1531,30 +1597,37 @@ static void rbd_rq_fn(struct request_queue *q)
continue; continue;
} }
bio_offset = 0;
do { do {
/* a bio clone to be passed down to OSD req */ u64 limit = rbd_segment_length(rbd_dev, ofs, size);
unsigned int chain_size;
struct bio *bio_chain;
BUG_ON(limit > (u64) UINT_MAX);
chain_size = (unsigned int) limit;
dout("rq->bio->bi_vcnt=%hu\n", rq->bio->bi_vcnt); dout("rq->bio->bi_vcnt=%hu\n", rq->bio->bi_vcnt);
op_size = rbd_segment_length(rbd_dev, ofs, size);
kref_get(&coll->kref); kref_get(&coll->kref);
bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
op_size, GFP_ATOMIC); /* Pass a cloned bio chain via an osd request */
if (bio)
bio_chain = bio_chain_clone_range(&bio,
&bio_offset, chain_size,
GFP_ATOMIC);
if (bio_chain)
(void) rbd_do_op(rq, rbd_dev, snapc, (void) rbd_do_op(rq, rbd_dev, snapc,
ofs, op_size, ofs, chain_size,
bio, coll, cur_seg); bio_chain, coll, cur_seg);
else else
rbd_coll_end_req_index(rq, coll, cur_seg, rbd_coll_end_req_index(rq, coll, cur_seg,
-ENOMEM, op_size); -ENOMEM, chain_size);
size -= op_size; size -= chain_size;
ofs += op_size; ofs += chain_size;
cur_seg++; cur_seg++;
rq_bio = next_bio;
} while (size > 0); } while (size > 0);
kref_put(&coll->kref, rbd_coll_release); kref_put(&coll->kref, rbd_coll_release);
if (bp)
bio_pair_release(bp);
spin_lock_irq(q->queue_lock); spin_lock_irq(q->queue_lock);
ceph_put_snap_context(snapc); ceph_put_snap_context(snapc);
@ -1564,7 +1637,7 @@ static void rbd_rq_fn(struct request_queue *q)
/* /*
* a queue callback. Makes sure that we don't create a bio that spans across * a queue callback. Makes sure that we don't create a bio that spans across
* multiple osd objects. One exception would be with a single page bios, * multiple osd objects. One exception would be with a single page bios,
* which we handle later at bio_chain_clone * which we handle later at bio_chain_clone_range()
*/ */
static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd, static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
struct bio_vec *bvec) struct bio_vec *bvec)