diff --git a/drivers/block/drbd/drbd_int.h b/drivers/block/drbd/drbd_int.h index 1bc86ddac38b..4b97f30bb7c6 100644 --- a/drivers/block/drbd/drbd_int.h +++ b/drivers/block/drbd/drbd_int.h @@ -740,18 +740,6 @@ enum epoch_event { EV_CLEANUP = 32, /* used as flag */ }; -struct drbd_epoch_entry { - struct drbd_work w; - struct drbd_conf *mdev; - struct bio *private_bio; - struct hlist_node colision; - sector_t sector; - unsigned int size; - unsigned int flags; - struct drbd_epoch *epoch; - u64 block_id; -}; - struct drbd_wq_barrier { struct drbd_work w; struct completion done; @@ -762,17 +750,49 @@ struct digest_info { void *digest; }; -/* ee flag bits */ +struct drbd_epoch_entry { + struct drbd_work w; + struct hlist_node colision; + struct drbd_epoch *epoch; + struct drbd_conf *mdev; + struct page *pages; + atomic_t pending_bios; + unsigned int size; + /* see comments on ee flag bits below */ + unsigned long flags; + sector_t sector; + u64 block_id; +}; + +/* ee flag bits. + * While corresponding bios are in flight, the only modification will be + * set_bit WAS_ERROR, which has to be atomic. + * If no bios are in flight yet, or all have been completed, + * non-atomic modification to ee->flags is ok. + */ enum { __EE_CALL_AL_COMPLETE_IO, - __EE_CONFLICT_PENDING, __EE_MAY_SET_IN_SYNC, + + /* This epoch entry closes an epoch using a barrier. + * On sucessful completion, the epoch is released, + * and the P_BARRIER_ACK send. */ __EE_IS_BARRIER, + + /* In case a barrier failed, + * we need to resubmit without the barrier flag. */ + __EE_RESUBMITTED, + + /* we may have several bios per epoch entry. + * if any of those fail, we set this flag atomically + * from the endio callback */ + __EE_WAS_ERROR, }; #define EE_CALL_AL_COMPLETE_IO (1<<__EE_CALL_AL_COMPLETE_IO) -#define EE_CONFLICT_PENDING (1<<__EE_CONFLICT_PENDING) #define EE_MAY_SET_IN_SYNC (1<<__EE_MAY_SET_IN_SYNC) #define EE_IS_BARRIER (1<<__EE_IS_BARRIER) +#define EE_RESUBMITTED (1<<__EE_RESUBMITTED) +#define EE_WAS_ERROR (1<<__EE_WAS_ERROR) /* global flag bits */ enum { @@ -1441,7 +1461,8 @@ static inline void ov_oos_print(struct drbd_conf *mdev) } -extern void drbd_csum(struct drbd_conf *, struct crypto_hash *, struct bio *, void *); +extern void drbd_csum_bio(struct drbd_conf *, struct crypto_hash *, struct bio *, void *); +extern void drbd_csum_ee(struct drbd_conf *, struct crypto_hash *, struct drbd_epoch_entry *, void *); /* worker callbacks */ extern int w_req_cancel_conflict(struct drbd_conf *, struct drbd_work *, int); extern int w_read_retry_remote(struct drbd_conf *, struct drbd_work *, int); @@ -1465,6 +1486,8 @@ extern int w_e_reissue(struct drbd_conf *, struct drbd_work *, int); extern void resync_timer_fn(unsigned long data); /* drbd_receiver.c */ +extern int drbd_submit_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e, + const unsigned rw, const int fault_type); extern int drbd_release_ee(struct drbd_conf *mdev, struct list_head *list); extern struct drbd_epoch_entry *drbd_alloc_ee(struct drbd_conf *mdev, u64 id, @@ -1620,6 +1643,41 @@ void drbd_bcast_ee(struct drbd_conf *mdev, * inline helper functions *************************/ +/* see also page_chain_add and friends in drbd_receiver.c */ +static inline struct page *page_chain_next(struct page *page) +{ + return (struct page *)page_private(page); +} +#define page_chain_for_each(page) \ + for (; page && ({ prefetch(page_chain_next(page)); 1; }); \ + page = page_chain_next(page)) +#define page_chain_for_each_safe(page, n) \ + for (; page && ({ n = page_chain_next(page); 1; }); page = n) + +static inline int drbd_bio_has_active_page(struct bio *bio) +{ + struct bio_vec *bvec; + int i; + + __bio_for_each_segment(bvec, bio, i, 0) { + if (page_count(bvec->bv_page) > 1) + return 1; + } + + return 0; +} + +static inline int drbd_ee_has_active_page(struct drbd_epoch_entry *e) +{ + struct page *page = e->pages; + page_chain_for_each(page) { + if (page_count(page) > 1) + return 1; + } + return 0; +} + + static inline void drbd_state_lock(struct drbd_conf *mdev) { wait_event(mdev->misc_wait, diff --git a/drivers/block/drbd/drbd_main.c b/drivers/block/drbd/drbd_main.c index 3aa0add1c230..d0fabace1452 100644 --- a/drivers/block/drbd/drbd_main.c +++ b/drivers/block/drbd/drbd_main.c @@ -2354,6 +2354,19 @@ static int _drbd_send_zc_bio(struct drbd_conf *mdev, struct bio *bio) return 1; } +static int _drbd_send_zc_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e) +{ + struct page *page = e->pages; + unsigned len = e->size; + page_chain_for_each(page) { + unsigned l = min_t(unsigned, len, PAGE_SIZE); + if (!_drbd_send_page(mdev, page, 0, l)) + return 0; + len -= l; + } + return 1; +} + static void consider_delay_probes(struct drbd_conf *mdev) { if (mdev->state.conn != C_SYNC_SOURCE || mdev->agreed_pro_version < 93) @@ -2430,7 +2443,7 @@ int drbd_send_dblock(struct drbd_conf *mdev, struct drbd_request *req) drbd_send(mdev, mdev->data.socket, &p, sizeof(p), MSG_MORE)); if (ok && dgs) { dgb = mdev->int_dig_out; - drbd_csum(mdev, mdev->integrity_w_tfm, req->master_bio, dgb); + drbd_csum_bio(mdev, mdev->integrity_w_tfm, req->master_bio, dgb); ok = drbd_send(mdev, mdev->data.socket, dgb, dgs, MSG_MORE); } if (ok) { @@ -2483,11 +2496,11 @@ int drbd_send_block(struct drbd_conf *mdev, enum drbd_packets cmd, sizeof(p), MSG_MORE); if (ok && dgs) { dgb = mdev->int_dig_out; - drbd_csum(mdev, mdev->integrity_w_tfm, e->private_bio, dgb); + drbd_csum_ee(mdev, mdev->integrity_w_tfm, e, dgb); ok = drbd_send(mdev, mdev->data.socket, dgb, dgs, MSG_MORE); } if (ok) - ok = _drbd_send_zc_bio(mdev, e->private_bio); + ok = _drbd_send_zc_ee(mdev, e); drbd_put_data_sock(mdev); diff --git a/drivers/block/drbd/drbd_nl.c b/drivers/block/drbd/drbd_nl.c index 93d150661f4b..28ef76bd5230 100644 --- a/drivers/block/drbd/drbd_nl.c +++ b/drivers/block/drbd/drbd_nl.c @@ -2215,9 +2215,9 @@ void drbd_bcast_ee(struct drbd_conf *mdev, { struct cn_msg *cn_reply; struct drbd_nl_cfg_reply *reply; - struct bio_vec *bvec; unsigned short *tl; - int i; + struct page *page; + unsigned len; if (!e) return; @@ -2255,11 +2255,15 @@ void drbd_bcast_ee(struct drbd_conf *mdev, put_unaligned(T_ee_data, tl++); put_unaligned(e->size, tl++); - __bio_for_each_segment(bvec, e->private_bio, i, 0) { - void *d = kmap(bvec->bv_page); - memcpy(tl, d + bvec->bv_offset, bvec->bv_len); - kunmap(bvec->bv_page); - tl=(unsigned short*)((char*)tl + bvec->bv_len); + len = e->size; + page = e->pages; + page_chain_for_each(page) { + void *d = kmap_atomic(page, KM_USER0); + unsigned l = min_t(unsigned, len, PAGE_SIZE); + memcpy(tl, d, l); + kunmap_atomic(d, KM_USER0); + tl = (unsigned short*)((char*)tl + l); + len -= l; } put_unaligned(TT_END, tl++); /* Close the tag list */ diff --git a/drivers/block/drbd/drbd_receiver.c b/drivers/block/drbd/drbd_receiver.c index fee0d249adf7..388a3e8bb0d0 100644 --- a/drivers/block/drbd/drbd_receiver.c +++ b/drivers/block/drbd/drbd_receiver.c @@ -80,30 +80,124 @@ static struct drbd_epoch *previous_epoch(struct drbd_conf *mdev, struct drbd_epo #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN) -static struct page *drbd_pp_first_page_or_try_alloc(struct drbd_conf *mdev) +/* + * some helper functions to deal with single linked page lists, + * page->private being our "next" pointer. + */ + +/* If at least n pages are linked at head, get n pages off. + * Otherwise, don't modify head, and return NULL. + * Locking is the responsibility of the caller. + */ +static struct page *page_chain_del(struct page **head, int n) +{ + struct page *page; + struct page *tmp; + + BUG_ON(!n); + BUG_ON(!head); + + page = *head; + while (page) { + tmp = page_chain_next(page); + if (--n == 0) + break; /* found sufficient pages */ + if (tmp == NULL) + /* insufficient pages, don't use any of them. */ + return NULL; + page = tmp; + } + + /* add end of list marker for the returned list */ + set_page_private(page, 0); + /* actual return value, and adjustment of head */ + page = *head; + *head = tmp; + return page; +} + +/* may be used outside of locks to find the tail of a (usually short) + * "private" page chain, before adding it back to a global chain head + * with page_chain_add() under a spinlock. */ +static struct page *page_chain_tail(struct page *page, int *len) +{ + struct page *tmp; + int i = 1; + while ((tmp = page_chain_next(page))) + ++i, page = tmp; + if (len) + *len = i; + return page; +} + +static int page_chain_free(struct page *page) +{ + struct page *tmp; + int i = 0; + page_chain_for_each_safe(page, tmp) { + put_page(page); + ++i; + } + return i; +} + +static void page_chain_add(struct page **head, + struct page *chain_first, struct page *chain_last) +{ +#if 1 + struct page *tmp; + tmp = page_chain_tail(chain_first, NULL); + BUG_ON(tmp != chain_last); +#endif + + /* add chain to head */ + set_page_private(chain_last, (unsigned long)*head); + *head = chain_first; +} + +static struct page *drbd_pp_first_pages_or_try_alloc(struct drbd_conf *mdev, int number) { struct page *page = NULL; + struct page *tmp = NULL; + int i = 0; /* Yes, testing drbd_pp_vacant outside the lock is racy. * So what. It saves a spin_lock. */ - if (drbd_pp_vacant > 0) { + if (drbd_pp_vacant >= number) { spin_lock(&drbd_pp_lock); - page = drbd_pp_pool; - if (page) { - drbd_pp_pool = (struct page *)page_private(page); - set_page_private(page, 0); /* just to be polite */ - drbd_pp_vacant--; - } + page = page_chain_del(&drbd_pp_pool, number); + if (page) + drbd_pp_vacant -= number; spin_unlock(&drbd_pp_lock); + if (page) + return page; } + /* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD * "criss-cross" setup, that might cause write-out on some other DRBD, * which in turn might block on the other node at this very place. */ - if (!page) - page = alloc_page(GFP_TRY); - if (page) - atomic_inc(&mdev->pp_in_use); - return page; + for (i = 0; i < number; i++) { + tmp = alloc_page(GFP_TRY); + if (!tmp) + break; + set_page_private(tmp, (unsigned long)page); + page = tmp; + } + + if (i == number) + return page; + + /* Not enough pages immediately available this time. + * No need to jump around here, drbd_pp_alloc will retry this + * function "soon". */ + if (page) { + tmp = page_chain_tail(page, NULL); + spin_lock(&drbd_pp_lock); + page_chain_add(&drbd_pp_pool, page, tmp); + drbd_pp_vacant += i; + spin_unlock(&drbd_pp_lock); + } + return NULL; } /* kick lower level device, if we have more than (arbitrary number) @@ -127,7 +221,7 @@ static void reclaim_net_ee(struct drbd_conf *mdev, struct list_head *to_be_freed list_for_each_safe(le, tle, &mdev->net_ee) { e = list_entry(le, struct drbd_epoch_entry, w.list); - if (drbd_bio_has_active_page(e->private_bio)) + if (drbd_ee_has_active_page(e)) break; list_move(le, to_be_freed); } @@ -148,32 +242,34 @@ static void drbd_kick_lo_and_reclaim_net(struct drbd_conf *mdev) } /** - * drbd_pp_alloc() - Returns a page, fails only if a signal comes in + * drbd_pp_alloc() - Returns @number pages, retries forever (or until signalled) * @mdev: DRBD device. - * @retry: whether or not to retry allocation forever (or until signalled) + * @number: number of pages requested + * @retry: whether to retry, if not enough pages are available right now * - * Tries to allocate a page, first from our own page pool, then from the - * kernel, unless this allocation would exceed the max_buffers setting. - * If @retry is non-zero, retry until DRBD frees a page somewhere else. + * Tries to allocate number pages, first from our own page pool, then from + * the kernel, unless this allocation would exceed the max_buffers setting. + * Possibly retry until DRBD frees sufficient pages somewhere else. + * + * Returns a page chain linked via page->private. */ -static struct page *drbd_pp_alloc(struct drbd_conf *mdev, int retry) +static struct page *drbd_pp_alloc(struct drbd_conf *mdev, unsigned number, bool retry) { struct page *page = NULL; DEFINE_WAIT(wait); - if (atomic_read(&mdev->pp_in_use) < mdev->net_conf->max_buffers) { - page = drbd_pp_first_page_or_try_alloc(mdev); - if (page) - return page; - } + /* Yes, we may run up to @number over max_buffers. If we + * follow it strictly, the admin will get it wrong anyways. */ + if (atomic_read(&mdev->pp_in_use) < mdev->net_conf->max_buffers) + page = drbd_pp_first_pages_or_try_alloc(mdev, number); - for (;;) { + while (page == NULL) { prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE); drbd_kick_lo_and_reclaim_net(mdev); if (atomic_read(&mdev->pp_in_use) < mdev->net_conf->max_buffers) { - page = drbd_pp_first_page_or_try_alloc(mdev); + page = drbd_pp_first_pages_or_try_alloc(mdev, number); if (page) break; } @@ -190,62 +286,32 @@ static struct page *drbd_pp_alloc(struct drbd_conf *mdev, int retry) } finish_wait(&drbd_pp_wait, &wait); + if (page) + atomic_add(number, &mdev->pp_in_use); return page; } /* Must not be used from irq, as that may deadlock: see drbd_pp_alloc. - * Is also used from inside an other spin_lock_irq(&mdev->req_lock) */ + * Is also used from inside an other spin_lock_irq(&mdev->req_lock); + * Either links the page chain back to the global pool, + * or returns all pages to the system. */ static void drbd_pp_free(struct drbd_conf *mdev, struct page *page) { - int free_it; - - spin_lock(&drbd_pp_lock); - if (drbd_pp_vacant > (DRBD_MAX_SEGMENT_SIZE/PAGE_SIZE)*minor_count) { - free_it = 1; - } else { - set_page_private(page, (unsigned long)drbd_pp_pool); - drbd_pp_pool = page; - drbd_pp_vacant++; - free_it = 0; - } - spin_unlock(&drbd_pp_lock); - - atomic_dec(&mdev->pp_in_use); - - if (free_it) - __free_page(page); - - wake_up(&drbd_pp_wait); -} - -static void drbd_pp_free_bio_pages(struct drbd_conf *mdev, struct bio *bio) -{ - struct page *p_to_be_freed = NULL; - struct page *page; - struct bio_vec *bvec; int i; - - spin_lock(&drbd_pp_lock); - __bio_for_each_segment(bvec, bio, i, 0) { - if (drbd_pp_vacant > (DRBD_MAX_SEGMENT_SIZE/PAGE_SIZE)*minor_count) { - set_page_private(bvec->bv_page, (unsigned long)p_to_be_freed); - p_to_be_freed = bvec->bv_page; - } else { - set_page_private(bvec->bv_page, (unsigned long)drbd_pp_pool); - drbd_pp_pool = bvec->bv_page; - drbd_pp_vacant++; - } + if (drbd_pp_vacant > (DRBD_MAX_SEGMENT_SIZE/PAGE_SIZE)*minor_count) + i = page_chain_free(page); + else { + struct page *tmp; + tmp = page_chain_tail(page, &i); + spin_lock(&drbd_pp_lock); + page_chain_add(&drbd_pp_pool, page, tmp); + drbd_pp_vacant += i; + spin_unlock(&drbd_pp_lock); } - spin_unlock(&drbd_pp_lock); - atomic_sub(bio->bi_vcnt, &mdev->pp_in_use); - - while (p_to_be_freed) { - page = p_to_be_freed; - p_to_be_freed = (struct page *)page_private(page); - set_page_private(page, 0); /* just to be polite */ - put_page(page); - } - + atomic_sub(i, &mdev->pp_in_use); + i = atomic_read(&mdev->pp_in_use); + if (i < 0) + dev_warn(DEV, "ASSERTION FAILED: pp_in_use: %d < 0\n", i); wake_up(&drbd_pp_wait); } @@ -270,11 +336,9 @@ struct drbd_epoch_entry *drbd_alloc_ee(struct drbd_conf *mdev, unsigned int data_size, gfp_t gfp_mask) __must_hold(local) { - struct request_queue *q; struct drbd_epoch_entry *e; struct page *page; - struct bio *bio; - unsigned int ds; + unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT; if (FAULT_ACTIVE(mdev, DRBD_FAULT_AL_EE)) return NULL; @@ -286,84 +350,32 @@ struct drbd_epoch_entry *drbd_alloc_ee(struct drbd_conf *mdev, return NULL; } - bio = bio_alloc(gfp_mask & ~__GFP_HIGHMEM, div_ceil(data_size, PAGE_SIZE)); - if (!bio) { - if (!(gfp_mask & __GFP_NOWARN)) - dev_err(DEV, "alloc_ee: Allocation of a bio failed\n"); - goto fail1; - } + page = drbd_pp_alloc(mdev, nr_pages, (gfp_mask & __GFP_WAIT)); + if (!page) + goto fail; - bio->bi_bdev = mdev->ldev->backing_bdev; - bio->bi_sector = sector; - - ds = data_size; - while (ds) { - page = drbd_pp_alloc(mdev, (gfp_mask & __GFP_WAIT)); - if (!page) { - if (!(gfp_mask & __GFP_NOWARN)) - dev_err(DEV, "alloc_ee: Allocation of a page failed\n"); - goto fail2; - } - if (!bio_add_page(bio, page, min_t(int, ds, PAGE_SIZE), 0)) { - drbd_pp_free(mdev, page); - dev_err(DEV, "alloc_ee: bio_add_page(s=%llu," - "data_size=%u,ds=%u) failed\n", - (unsigned long long)sector, data_size, ds); - - q = bdev_get_queue(bio->bi_bdev); - if (q->merge_bvec_fn) { - struct bvec_merge_data bvm = { - .bi_bdev = bio->bi_bdev, - .bi_sector = bio->bi_sector, - .bi_size = bio->bi_size, - .bi_rw = bio->bi_rw, - }; - int l = q->merge_bvec_fn(q, &bvm, - &bio->bi_io_vec[bio->bi_vcnt]); - dev_err(DEV, "merge_bvec_fn() = %d\n", l); - } - - /* dump more of the bio. */ - dev_err(DEV, "bio->bi_max_vecs = %d\n", bio->bi_max_vecs); - dev_err(DEV, "bio->bi_vcnt = %d\n", bio->bi_vcnt); - dev_err(DEV, "bio->bi_size = %d\n", bio->bi_size); - dev_err(DEV, "bio->bi_phys_segments = %d\n", bio->bi_phys_segments); - - goto fail2; - break; - } - ds -= min_t(int, ds, PAGE_SIZE); - } - - D_ASSERT(data_size == bio->bi_size); - - bio->bi_private = e; - e->mdev = mdev; - e->sector = sector; - e->size = bio->bi_size; - - e->private_bio = bio; - e->block_id = id; INIT_HLIST_NODE(&e->colision); e->epoch = NULL; + e->mdev = mdev; + e->pages = page; + atomic_set(&e->pending_bios, 0); + e->size = data_size; e->flags = 0; + e->sector = sector; + e->sector = sector; + e->block_id = id; return e; - fail2: - drbd_pp_free_bio_pages(mdev, bio); - bio_put(bio); - fail1: + fail: mempool_free(e, drbd_ee_mempool); - return NULL; } void drbd_free_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e) { - struct bio *bio = e->private_bio; - drbd_pp_free_bio_pages(mdev, bio); - bio_put(bio); + drbd_pp_free(mdev, e->pages); + D_ASSERT(atomic_read(&e->pending_bios) == 0); D_ASSERT(hlist_unhashed(&e->colision)); mempool_free(e, drbd_ee_mempool); } @@ -1120,6 +1132,90 @@ void drbd_bump_write_ordering(struct drbd_conf *mdev, enum write_ordering_e wo) dev_info(DEV, "Method to ensure write ordering: %s\n", write_ordering_str[mdev->write_ordering]); } +/** + * drbd_submit_ee() + * @mdev: DRBD device. + * @e: epoch entry + * @rw: flag field, see bio->bi_rw + */ +/* TODO allocate from our own bio_set. */ +int drbd_submit_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e, + const unsigned rw, const int fault_type) +{ + struct bio *bios = NULL; + struct bio *bio; + struct page *page = e->pages; + sector_t sector = e->sector; + unsigned ds = e->size; + unsigned n_bios = 0; + unsigned nr_pages = (ds + PAGE_SIZE -1) >> PAGE_SHIFT; + + /* In most cases, we will only need one bio. But in case the lower + * level restrictions happen to be different at this offset on this + * side than those of the sending peer, we may need to submit the + * request in more than one bio. */ +next_bio: + bio = bio_alloc(GFP_NOIO, nr_pages); + if (!bio) { + dev_err(DEV, "submit_ee: Allocation of a bio failed\n"); + goto fail; + } + /* > e->sector, unless this is the first bio */ + bio->bi_sector = sector; + bio->bi_bdev = mdev->ldev->backing_bdev; + /* we special case some flags in the multi-bio case, see below + * (BIO_RW_UNPLUG, BIO_RW_BARRIER) */ + bio->bi_rw = rw; + bio->bi_private = e; + bio->bi_end_io = drbd_endio_sec; + + bio->bi_next = bios; + bios = bio; + ++n_bios; + + page_chain_for_each(page) { + unsigned len = min_t(unsigned, ds, PAGE_SIZE); + if (!bio_add_page(bio, page, len, 0)) { + /* a single page must always be possible! */ + BUG_ON(bio->bi_vcnt == 0); + goto next_bio; + } + ds -= len; + sector += len >> 9; + --nr_pages; + } + D_ASSERT(page == NULL); + D_ASSERT(ds == 0); + + atomic_set(&e->pending_bios, n_bios); + do { + bio = bios; + bios = bios->bi_next; + bio->bi_next = NULL; + + /* strip off BIO_RW_UNPLUG unless it is the last bio */ + if (bios) + bio->bi_rw &= ~(1<bi_next) + bios->bi_rw &= ~(1<bi_next; + bio_put(bio); + } + return -ENOMEM; +} + /** * w_e_reissue() - Worker callback; Resubmit a bio, without BIO_RW_BARRIER set * @mdev: DRBD device. @@ -1129,8 +1225,6 @@ void drbd_bump_write_ordering(struct drbd_conf *mdev, enum write_ordering_e wo) int w_e_reissue(struct drbd_conf *mdev, struct drbd_work *w, int cancel) __releases(local) { struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w; - struct bio *bio = e->private_bio; - /* We leave DE_CONTAINS_A_BARRIER and EE_IS_BARRIER in place, (and DE_BARRIER_IN_NEXT_EPOCH_ISSUED in the previous Epoch) so that we can finish that epoch in drbd_may_finish_epoch(). @@ -1144,33 +1238,17 @@ int w_e_reissue(struct drbd_conf *mdev, struct drbd_work *w, int cancel) __relea if (previous_epoch(mdev, e->epoch)) dev_warn(DEV, "Write ordering was not enforced (one time event)\n"); - /* prepare bio for re-submit, - * re-init volatile members */ /* we still have a local reference, * get_ldev was done in receive_Data. */ - bio->bi_bdev = mdev->ldev->backing_bdev; - bio->bi_sector = e->sector; - bio->bi_size = e->size; - bio->bi_idx = 0; - - bio->bi_flags &= ~(BIO_POOL_MASK - 1); - bio->bi_flags |= 1 << BIO_UPTODATE; - - /* don't know whether this is necessary: */ - bio->bi_phys_segments = 0; - bio->bi_next = NULL; - - /* these should be unchanged: */ - /* bio->bi_end_io = drbd_endio_write_sec; */ - /* bio->bi_vcnt = whatever; */ e->w.cb = e_end_block; - - /* This is no longer a barrier request. */ - bio->bi_rw &= ~(1UL << BIO_RW_BARRIER); - - drbd_generic_make_request(mdev, DRBD_FAULT_DT_WR, bio); - + if (drbd_submit_ee(mdev, e, WRITE, DRBD_FAULT_DT_WR) != 0) { + /* drbd_submit_ee fails for one reason only: + * if was not able to allocate sufficient bios. + * requeue, try again later. */ + e->w.cb = w_e_reissue; + drbd_queue_work(&mdev->data.work, &e->w); + } return 1; } @@ -1264,10 +1342,8 @@ read_in_block(struct drbd_conf *mdev, u64 id, sector_t sector, int data_size) __ { const sector_t capacity = drbd_get_capacity(mdev->this_bdev); struct drbd_epoch_entry *e; - struct bio_vec *bvec; struct page *page; - struct bio *bio; - int dgs, ds, i, rr; + int dgs, ds, rr; void *dig_in = mdev->int_dig_in; void *dig_vv = mdev->int_dig_vv; unsigned long *data; @@ -1304,28 +1380,29 @@ read_in_block(struct drbd_conf *mdev, u64 id, sector_t sector, int data_size) __ e = drbd_alloc_ee(mdev, id, sector, data_size, GFP_NOIO); if (!e) return NULL; - bio = e->private_bio; + ds = data_size; - bio_for_each_segment(bvec, bio, i) { - page = bvec->bv_page; + page = e->pages; + page_chain_for_each(page) { + unsigned len = min_t(int, ds, PAGE_SIZE); data = kmap(page); - rr = drbd_recv(mdev, data, min_t(int, ds, PAGE_SIZE)); + rr = drbd_recv(mdev, data, len); if (FAULT_ACTIVE(mdev, DRBD_FAULT_RECEIVE)) { dev_err(DEV, "Fault injection: Corrupting data on receive\n"); data[0] = data[0] ^ (unsigned long)-1; } kunmap(page); - if (rr != min_t(int, ds, PAGE_SIZE)) { + if (rr != len) { drbd_free_ee(mdev, e); dev_warn(DEV, "short read receiving data: read %d expected %d\n", - rr, min_t(int, ds, PAGE_SIZE)); + rr, len); return NULL; } ds -= rr; } if (dgs) { - drbd_csum(mdev, mdev->integrity_r_tfm, bio, dig_vv); + drbd_csum_ee(mdev, mdev->integrity_r_tfm, e, dig_vv); if (memcmp(dig_in, dig_vv, dgs)) { dev_err(DEV, "Digest integrity check FAILED.\n"); drbd_bcast_ee(mdev, "digest failed", @@ -1350,7 +1427,7 @@ static int drbd_drain_block(struct drbd_conf *mdev, int data_size) if (!data_size) return TRUE; - page = drbd_pp_alloc(mdev, 1); + page = drbd_pp_alloc(mdev, 1, 1); data = kmap(page); while (data_size) { @@ -1414,7 +1491,7 @@ static int recv_dless_read(struct drbd_conf *mdev, struct drbd_request *req, } if (dgs) { - drbd_csum(mdev, mdev->integrity_r_tfm, bio, dig_vv); + drbd_csum_bio(mdev, mdev->integrity_r_tfm, bio, dig_vv); if (memcmp(dig_in, dig_vv, dgs)) { dev_err(DEV, "Digest integrity check FAILED. Broken NICs?\n"); return 0; @@ -1435,7 +1512,7 @@ static int e_end_resync_block(struct drbd_conf *mdev, struct drbd_work *w, int u D_ASSERT(hlist_unhashed(&e->colision)); - if (likely(drbd_bio_uptodate(e->private_bio))) { + if (likely((e->flags & EE_WAS_ERROR) == 0)) { drbd_set_in_sync(mdev, sector, e->size); ok = drbd_send_ack(mdev, P_RS_WRITE_ACK, e); } else { @@ -1454,30 +1531,28 @@ static int recv_resync_read(struct drbd_conf *mdev, sector_t sector, int data_si struct drbd_epoch_entry *e; e = read_in_block(mdev, ID_SYNCER, sector, data_size); - if (!e) { - put_ldev(mdev); - return FALSE; - } + if (!e) + goto fail; dec_rs_pending(mdev); - e->private_bio->bi_end_io = drbd_endio_write_sec; - e->private_bio->bi_rw = WRITE; - e->w.cb = e_end_resync_block; - inc_unacked(mdev); /* corresponding dec_unacked() in e_end_resync_block() * respective _drbd_clear_done_ee */ + e->w.cb = e_end_resync_block; + spin_lock_irq(&mdev->req_lock); list_add(&e->w.list, &mdev->sync_ee); spin_unlock_irq(&mdev->req_lock); - drbd_generic_make_request(mdev, DRBD_FAULT_RS_WR, e->private_bio); - /* accounting done in endio */ + if (drbd_submit_ee(mdev, e, WRITE, DRBD_FAULT_RS_WR) == 0) + return TRUE; - maybe_kick_lo(mdev); - return TRUE; + drbd_free_ee(mdev, e); +fail: + put_ldev(mdev); + return FALSE; } static int receive_DataReply(struct drbd_conf *mdev, struct p_header *h) @@ -1572,7 +1647,7 @@ static int e_end_block(struct drbd_conf *mdev, struct drbd_work *w, int cancel) } if (mdev->net_conf->wire_protocol == DRBD_PROT_C) { - if (likely(drbd_bio_uptodate(e->private_bio))) { + if (likely((e->flags & EE_WAS_ERROR) == 0)) { pcmd = (mdev->state.conn >= C_SYNC_SOURCE && mdev->state.conn <= C_PAUSED_SYNC_T && e->flags & EE_MAY_SET_IN_SYNC) ? @@ -1718,7 +1793,6 @@ static int receive_Data(struct drbd_conf *mdev, struct p_header *h) return FALSE; } - e->private_bio->bi_end_io = drbd_endio_write_sec; e->w.cb = e_end_block; spin_lock(&mdev->epoch_lock); @@ -1914,12 +1988,8 @@ static int receive_Data(struct drbd_conf *mdev, struct p_header *h) drbd_al_begin_io(mdev, e->sector); } - e->private_bio->bi_rw = rw; - drbd_generic_make_request(mdev, DRBD_FAULT_DT_WR, e->private_bio); - /* accounting done in endio */ - - maybe_kick_lo(mdev); - return TRUE; + if (drbd_submit_ee(mdev, e, rw, DRBD_FAULT_DT_WR) == 0) + return TRUE; out_interrupted: /* yes, the epoch_size now is imbalanced. @@ -1977,9 +2047,6 @@ static int receive_DataRequest(struct drbd_conf *mdev, struct p_header *h) return FALSE; } - e->private_bio->bi_rw = READ; - e->private_bio->bi_end_io = drbd_endio_read_sec; - switch (h->command) { case P_DATA_REQUEST: e->w.cb = w_e_end_data_req; @@ -2073,10 +2140,8 @@ static int receive_DataRequest(struct drbd_conf *mdev, struct p_header *h) inc_unacked(mdev); - drbd_generic_make_request(mdev, fault_type, e->private_bio); - maybe_kick_lo(mdev); - - return TRUE; + if (drbd_submit_ee(mdev, e, READ, fault_type) == 0) + return TRUE; out_free_e: kfree(di); @@ -3837,7 +3902,7 @@ static void drbd_disconnect(struct drbd_conf *mdev) dev_info(DEV, "net_ee not empty, killed %u entries\n", i); i = atomic_read(&mdev->pp_in_use); if (i) - dev_info(DEV, "pp_in_use = %u, expected 0\n", i); + dev_info(DEV, "pp_in_use = %d, expected 0\n", i); D_ASSERT(list_empty(&mdev->read_ee)); D_ASSERT(list_empty(&mdev->active_ee)); diff --git a/drivers/block/drbd/drbd_worker.c b/drivers/block/drbd/drbd_worker.c index 0bbecf45b485..d771b1e0424b 100644 --- a/drivers/block/drbd/drbd_worker.c +++ b/drivers/block/drbd/drbd_worker.c @@ -47,8 +47,7 @@ static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int ca /* defined here: drbd_md_io_complete - drbd_endio_write_sec - drbd_endio_read_sec + drbd_endio_sec drbd_endio_pri * more endio handlers: @@ -85,27 +84,10 @@ void drbd_md_io_complete(struct bio *bio, int error) /* reads on behalf of the partner, * "submitted" by the receiver */ -void drbd_endio_read_sec(struct bio *bio, int error) __releases(local) +void drbd_endio_read_sec_final(struct drbd_epoch_entry *e) __releases(local) { unsigned long flags = 0; - struct drbd_epoch_entry *e = NULL; - struct drbd_conf *mdev; - int uptodate = bio_flagged(bio, BIO_UPTODATE); - - e = bio->bi_private; - mdev = e->mdev; - - if (error) - dev_warn(DEV, "read: error=%d s=%llus\n", error, - (unsigned long long)e->sector); - if (!error && !uptodate) { - dev_warn(DEV, "read: setting error to -EIO s=%llus\n", - (unsigned long long)e->sector); - /* strange behavior of some lower level drivers... - * fail the request by clearing the uptodate flag, - * but do not return any error?! */ - error = -EIO; - } + struct drbd_conf *mdev = e->mdev; D_ASSERT(e->block_id != ID_VACANT); @@ -114,49 +96,38 @@ void drbd_endio_read_sec(struct bio *bio, int error) __releases(local) list_del(&e->w.list); if (list_empty(&mdev->read_ee)) wake_up(&mdev->ee_wait); + if (test_bit(__EE_WAS_ERROR, &e->flags)) + __drbd_chk_io_error(mdev, FALSE); spin_unlock_irqrestore(&mdev->req_lock, flags); - drbd_chk_io_error(mdev, error, FALSE); drbd_queue_work(&mdev->data.work, &e->w); put_ldev(mdev); } +static int is_failed_barrier(int ee_flags) +{ + return (ee_flags & (EE_IS_BARRIER|EE_WAS_ERROR|EE_RESUBMITTED)) + == (EE_IS_BARRIER|EE_WAS_ERROR); +} + /* writes on behalf of the partner, or resync writes, - * "submitted" by the receiver. - */ -void drbd_endio_write_sec(struct bio *bio, int error) __releases(local) + * "submitted" by the receiver, final stage. */ +static void drbd_endio_write_sec_final(struct drbd_epoch_entry *e) __releases(local) { unsigned long flags = 0; - struct drbd_epoch_entry *e = NULL; - struct drbd_conf *mdev; + struct drbd_conf *mdev = e->mdev; sector_t e_sector; int do_wake; int is_syncer_req; int do_al_complete_io; - int uptodate = bio_flagged(bio, BIO_UPTODATE); - int is_barrier = bio_rw_flagged(bio, BIO_RW_BARRIER); - e = bio->bi_private; - mdev = e->mdev; - - if (error) - dev_warn(DEV, "write: error=%d s=%llus\n", error, - (unsigned long long)e->sector); - if (!error && !uptodate) { - dev_warn(DEV, "write: setting error to -EIO s=%llus\n", - (unsigned long long)e->sector); - /* strange behavior of some lower level drivers... - * fail the request by clearing the uptodate flag, - * but do not return any error?! */ - error = -EIO; - } - - /* error == -ENOTSUPP would be a better test, - * alas it is not reliable */ - if (error && is_barrier && e->flags & EE_IS_BARRIER) { + /* if this is a failed barrier request, disable use of barriers, + * and schedule for resubmission */ + if (is_failed_barrier(e->flags)) { drbd_bump_write_ordering(mdev, WO_bdev_flush); spin_lock_irqsave(&mdev->req_lock, flags); list_del(&e->w.list); + e->flags |= EE_RESUBMITTED; e->w.cb = w_e_reissue; /* put_ldev actually happens below, once we come here again. */ __release(local); @@ -167,17 +138,16 @@ void drbd_endio_write_sec(struct bio *bio, int error) __releases(local) D_ASSERT(e->block_id != ID_VACANT); - spin_lock_irqsave(&mdev->req_lock, flags); - mdev->writ_cnt += e->size >> 9; - is_syncer_req = is_syncer_block_id(e->block_id); - /* after we moved e to done_ee, * we may no longer access it, * it may be freed/reused already! * (as soon as we release the req_lock) */ e_sector = e->sector; do_al_complete_io = e->flags & EE_CALL_AL_COMPLETE_IO; + is_syncer_req = is_syncer_block_id(e->block_id); + spin_lock_irqsave(&mdev->req_lock, flags); + mdev->writ_cnt += e->size >> 9; list_del(&e->w.list); /* has been on active_ee or sync_ee */ list_add_tail(&e->w.list, &mdev->done_ee); @@ -190,7 +160,7 @@ void drbd_endio_write_sec(struct bio *bio, int error) __releases(local) ? list_empty(&mdev->sync_ee) : list_empty(&mdev->active_ee); - if (error) + if (test_bit(__EE_WAS_ERROR, &e->flags)) __drbd_chk_io_error(mdev, FALSE); spin_unlock_irqrestore(&mdev->req_lock, flags); @@ -205,7 +175,42 @@ void drbd_endio_write_sec(struct bio *bio, int error) __releases(local) wake_asender(mdev); put_ldev(mdev); +} +/* writes on behalf of the partner, or resync writes, + * "submitted" by the receiver. + */ +void drbd_endio_sec(struct bio *bio, int error) +{ + struct drbd_epoch_entry *e = bio->bi_private; + struct drbd_conf *mdev = e->mdev; + int uptodate = bio_flagged(bio, BIO_UPTODATE); + int is_write = bio_data_dir(bio) == WRITE; + + if (error) + dev_warn(DEV, "%s: error=%d s=%llus\n", + is_write ? "write" : "read", error, + (unsigned long long)e->sector); + if (!error && !uptodate) { + dev_warn(DEV, "%s: setting error to -EIO s=%llus\n", + is_write ? "write" : "read", + (unsigned long long)e->sector); + /* strange behavior of some lower level drivers... + * fail the request by clearing the uptodate flag, + * but do not return any error?! */ + error = -EIO; + } + + if (error) + set_bit(__EE_WAS_ERROR, &e->flags); + + bio_put(bio); /* no need for the bio anymore */ + if (atomic_dec_and_test(&e->pending_bios)) { + if (is_write) + drbd_endio_write_sec_final(e); + else + drbd_endio_read_sec_final(e); + } } /* read, readA or write requests on R_PRIMARY coming from drbd_make_request @@ -295,7 +300,34 @@ int w_resync_inactive(struct drbd_conf *mdev, struct drbd_work *w, int cancel) return 1; /* Simply ignore this! */ } -void drbd_csum(struct drbd_conf *mdev, struct crypto_hash *tfm, struct bio *bio, void *digest) +void drbd_csum_ee(struct drbd_conf *mdev, struct crypto_hash *tfm, struct drbd_epoch_entry *e, void *digest) +{ + struct hash_desc desc; + struct scatterlist sg; + struct page *page = e->pages; + struct page *tmp; + unsigned len; + + desc.tfm = tfm; + desc.flags = 0; + + sg_init_table(&sg, 1); + crypto_hash_init(&desc); + + while ((tmp = page_chain_next(page))) { + /* all but the last page will be fully used */ + sg_set_page(&sg, page, PAGE_SIZE, 0); + crypto_hash_update(&desc, &sg, sg.length); + page = tmp; + } + /* and now the last, possibly only partially used page */ + len = e->size & (PAGE_SIZE - 1); + sg_set_page(&sg, page, len ?: PAGE_SIZE, 0); + crypto_hash_update(&desc, &sg, sg.length); + crypto_hash_final(&desc, digest); +} + +void drbd_csum_bio(struct drbd_conf *mdev, struct crypto_hash *tfm, struct bio *bio, void *digest) { struct hash_desc desc; struct scatterlist sg; @@ -329,11 +361,11 @@ static int w_e_send_csum(struct drbd_conf *mdev, struct drbd_work *w, int cancel return 1; } - if (likely(drbd_bio_uptodate(e->private_bio))) { + if (likely((e->flags & EE_WAS_ERROR) == 0)) { digest_size = crypto_hash_digestsize(mdev->csums_tfm); digest = kmalloc(digest_size, GFP_NOIO); if (digest) { - drbd_csum(mdev, mdev->csums_tfm, e->private_bio, digest); + drbd_csum_ee(mdev, mdev->csums_tfm, e, digest); inc_rs_pending(mdev); ok = drbd_send_drequest_csum(mdev, @@ -369,23 +401,21 @@ static int read_for_csum(struct drbd_conf *mdev, sector_t sector, int size) /* GFP_TRY, because if there is no memory available right now, this may * be rescheduled for later. It is "only" background resync, after all. */ e = drbd_alloc_ee(mdev, DRBD_MAGIC+0xbeef, sector, size, GFP_TRY); - if (!e) { - put_ldev(mdev); - return 2; - } + if (!e) + goto fail; spin_lock_irq(&mdev->req_lock); list_add(&e->w.list, &mdev->read_ee); spin_unlock_irq(&mdev->req_lock); - e->private_bio->bi_end_io = drbd_endio_read_sec; - e->private_bio->bi_rw = READ; e->w.cb = w_e_send_csum; + if (drbd_submit_ee(mdev, e, READ, DRBD_FAULT_RS_RD) == 0) + return 1; - mdev->read_cnt += size >> 9; - drbd_generic_make_request(mdev, DRBD_FAULT_RS_RD, e->private_bio); - - return 1; + drbd_free_ee(mdev, e); +fail: + put_ldev(mdev); + return 2; } void resync_timer_fn(unsigned long data) @@ -819,7 +849,7 @@ out: /* helper */ static void move_to_net_ee_or_free(struct drbd_conf *mdev, struct drbd_epoch_entry *e) { - if (drbd_bio_has_active_page(e->private_bio)) { + if (drbd_ee_has_active_page(e)) { /* This might happen if sendpage() has not finished */ spin_lock_irq(&mdev->req_lock); list_add_tail(&e->w.list, &mdev->net_ee); @@ -845,7 +875,7 @@ int w_e_end_data_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel) return 1; } - if (likely(drbd_bio_uptodate(e->private_bio))) { + if (likely((e->flags & EE_WAS_ERROR) == 0)) { ok = drbd_send_block(mdev, P_DATA_REPLY, e); } else { if (__ratelimit(&drbd_ratelimit_state)) @@ -886,7 +916,7 @@ int w_e_end_rsdata_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel) put_ldev(mdev); } - if (likely(drbd_bio_uptodate(e->private_bio))) { + if (likely((e->flags & EE_WAS_ERROR) == 0)) { if (likely(mdev->state.pdsk >= D_INCONSISTENT)) { inc_rs_pending(mdev); ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e); @@ -934,7 +964,7 @@ int w_e_end_csum_rs_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel) di = (struct digest_info *)(unsigned long)e->block_id; - if (likely(drbd_bio_uptodate(e->private_bio))) { + if (likely((e->flags & EE_WAS_ERROR) == 0)) { /* quick hack to try to avoid a race against reconfiguration. * a real fix would be much more involved, * introducing more locking mechanisms */ @@ -944,7 +974,7 @@ int w_e_end_csum_rs_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel) digest = kmalloc(digest_size, GFP_NOIO); } if (digest) { - drbd_csum(mdev, mdev->csums_tfm, e->private_bio, digest); + drbd_csum_ee(mdev, mdev->csums_tfm, e, digest); eq = !memcmp(digest, di->digest, digest_size); kfree(digest); } @@ -986,14 +1016,14 @@ int w_e_end_ov_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel) if (unlikely(cancel)) goto out; - if (unlikely(!drbd_bio_uptodate(e->private_bio))) + if (unlikely((e->flags & EE_WAS_ERROR) != 0)) goto out; digest_size = crypto_hash_digestsize(mdev->verify_tfm); /* FIXME if this allocation fails, online verify will not terminate! */ digest = kmalloc(digest_size, GFP_NOIO); if (digest) { - drbd_csum(mdev, mdev->verify_tfm, e->private_bio, digest); + drbd_csum_ee(mdev, mdev->verify_tfm, e, digest); inc_rs_pending(mdev); ok = drbd_send_drequest_csum(mdev, e->sector, e->size, digest, digest_size, P_OV_REPLY); @@ -1042,11 +1072,11 @@ int w_e_end_ov_reply(struct drbd_conf *mdev, struct drbd_work *w, int cancel) di = (struct digest_info *)(unsigned long)e->block_id; - if (likely(drbd_bio_uptodate(e->private_bio))) { + if (likely((e->flags & EE_WAS_ERROR) == 0)) { digest_size = crypto_hash_digestsize(mdev->verify_tfm); digest = kmalloc(digest_size, GFP_NOIO); if (digest) { - drbd_csum(mdev, mdev->verify_tfm, e->private_bio, digest); + drbd_csum_ee(mdev, mdev->verify_tfm, e, digest); D_ASSERT(digest_size == di->digest_size); eq = !memcmp(digest, di->digest, digest_size); diff --git a/drivers/block/drbd/drbd_wrappers.h b/drivers/block/drbd/drbd_wrappers.h index f93fa111ce50..defdb5013ea3 100644 --- a/drivers/block/drbd/drbd_wrappers.h +++ b/drivers/block/drbd/drbd_wrappers.h @@ -18,23 +18,9 @@ static inline void drbd_set_my_capacity(struct drbd_conf *mdev, #define drbd_bio_uptodate(bio) bio_flagged(bio, BIO_UPTODATE) -static inline int drbd_bio_has_active_page(struct bio *bio) -{ - struct bio_vec *bvec; - int i; - - __bio_for_each_segment(bvec, bio, i, 0) { - if (page_count(bvec->bv_page) > 1) - return 1; - } - - return 0; -} - /* bi_end_io handlers */ extern void drbd_md_io_complete(struct bio *bio, int error); -extern void drbd_endio_read_sec(struct bio *bio, int error); -extern void drbd_endio_write_sec(struct bio *bio, int error); +extern void drbd_endio_sec(struct bio *bio, int error); extern void drbd_endio_pri(struct bio *bio, int error); /*