diff --git a/fs/btrfs/Makefile b/fs/btrfs/Makefile index e5fc3cfea0ab..9dcfc2fe3332 100644 --- a/fs/btrfs/Makefile +++ b/fs/btrfs/Makefile @@ -6,7 +6,7 @@ btrfs-y := super.o ctree.o extent-tree.o print-tree.o root-tree.o dir-item.o \ hash.o file-item.o inode-item.o inode-map.o disk-io.o \ transaction.o bit-radix.o inode.o file.o tree-defrag.o \ extent_map.o sysfs.o struct-funcs.o xattr.o ordered-data.o \ - extent_io.o volumes.o + extent_io.o volumes.o async-thread.o btrfs-$(CONFIG_FS_POSIX_ACL) += acl.o else diff --git a/fs/btrfs/async-thread.c b/fs/btrfs/async-thread.c new file mode 100644 index 000000000000..2911b67bd6f7 --- /dev/null +++ b/fs/btrfs/async-thread.c @@ -0,0 +1,288 @@ +/* + * Copyright (C) 2007 Oracle. All rights reserved. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public + * License v2 as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public + * License along with this program; if not, write to the + * Free Software Foundation, Inc., 59 Temple Place - Suite 330, + * Boston, MA 021110-1307, USA. + */ + +#include +#include +#include +#include +#include "async-thread.h" + +/* + * container for the kthread task pointer and the list of pending work + * One of these is allocated per thread. + */ +struct btrfs_worker_thread { + /* list of struct btrfs_work that are waiting for service */ + struct list_head pending; + + /* list of worker threads from struct btrfs_workers */ + struct list_head worker_list; + + /* kthread */ + struct task_struct *task; + + /* number of things on the pending list */ + atomic_t num_pending; + + /* protects the pending list. */ + spinlock_t lock; + + /* set to non-zero when this thread is already awake and kicking */ + int working; +}; + +/* + * main loop for servicing work items + */ +static int worker_loop(void *arg) +{ + struct btrfs_worker_thread *worker = arg; + struct list_head *cur; + struct btrfs_work *work; + do { + spin_lock_irq(&worker->lock); + while(!list_empty(&worker->pending)) { + cur = worker->pending.next; + work = list_entry(cur, struct btrfs_work, list); + list_del(&work->list); + clear_bit(0, &work->flags); + + work->worker = worker; + spin_unlock_irq(&worker->lock); + + work->func(work); + + atomic_dec(&worker->num_pending); + spin_lock_irq(&worker->lock); + } + worker->working = 0; + if (freezing(current)) { + refrigerator(); + } else { + set_current_state(TASK_INTERRUPTIBLE); + spin_unlock_irq(&worker->lock); + schedule(); + __set_current_state(TASK_RUNNING); + } + } while (!kthread_should_stop()); + return 0; +} + +/* + * this will wait for all the worker threads to shutdown + */ +int btrfs_stop_workers(struct btrfs_workers *workers) +{ + struct list_head *cur; + struct btrfs_worker_thread *worker; + + while(!list_empty(&workers->worker_list)) { + cur = workers->worker_list.next; + worker = list_entry(cur, struct btrfs_worker_thread, + worker_list); + kthread_stop(worker->task); + list_del(&worker->worker_list); + kfree(worker); + } + return 0; +} + +/* + * simple init on struct btrfs_workers + */ +void btrfs_init_workers(struct btrfs_workers *workers, int max) +{ + workers->num_workers = 0; + INIT_LIST_HEAD(&workers->worker_list); + workers->last = NULL; + spin_lock_init(&workers->lock); + workers->max_workers = max; +} + +/* + * starts new worker threads. This does not enforce the max worker + * count in case you need to temporarily go past it. + */ +int btrfs_start_workers(struct btrfs_workers *workers, int num_workers) +{ + struct btrfs_worker_thread *worker; + int ret = 0; + int i; + + for (i = 0; i < num_workers; i++) { + worker = kzalloc(sizeof(*worker), GFP_NOFS); + if (!worker) { + ret = -ENOMEM; + goto fail; + } + + INIT_LIST_HEAD(&worker->pending); + INIT_LIST_HEAD(&worker->worker_list); + spin_lock_init(&worker->lock); + atomic_set(&worker->num_pending, 0); + worker->task = kthread_run(worker_loop, worker, "btrfs"); + if (IS_ERR(worker->task)) { + ret = PTR_ERR(worker->task); + goto fail; + } + + spin_lock_irq(&workers->lock); + list_add_tail(&worker->worker_list, &workers->worker_list); + workers->last = worker; + workers->num_workers++; + spin_unlock_irq(&workers->lock); + } + return 0; +fail: + btrfs_stop_workers(workers); + return ret; +} + +/* + * run through the list and find a worker thread that doesn't have a lot + * to do right now. This can return null if we aren't yet at the thread + * count limit and all of the threads are busy. + */ +static struct btrfs_worker_thread *next_worker(struct btrfs_workers *workers) +{ + struct btrfs_worker_thread *worker; + struct list_head *next; + struct list_head *start; + int enforce_min = workers->num_workers < workers->max_workers; + + /* start with the last thread if it isn't busy */ + worker = workers->last; + if (atomic_read(&worker->num_pending) < 64) + goto done; + + next = worker->worker_list.next; + start = &worker->worker_list; + + /* + * check all the workers for someone that is bored. FIXME, do + * something smart here + */ + while(next != start) { + if (next == &workers->worker_list) { + next = workers->worker_list.next; + continue; + } + worker = list_entry(next, struct btrfs_worker_thread, + worker_list); + if (atomic_read(&worker->num_pending) < 64 || !enforce_min) + goto done; + next = next->next; + } + /* + * nobody was bored, if we're already at the max thread count, + * use the last thread + */ + if (!enforce_min || atomic_read(&workers->last->num_pending) < 64) { + return workers->last; + } + return NULL; +done: + workers->last = worker; + return worker; +} + +static struct btrfs_worker_thread *find_worker(struct btrfs_workers *workers) +{ + struct btrfs_worker_thread *worker; + unsigned long flags; + +again: + spin_lock_irqsave(&workers->lock, flags); + worker = next_worker(workers); + spin_unlock_irqrestore(&workers->lock, flags); + + if (!worker) { + spin_lock_irqsave(&workers->lock, flags); + if (workers->num_workers >= workers->max_workers) { + /* + * we have failed to find any workers, just + * return the force one + */ + worker = list_entry(workers->worker_list.next, + struct btrfs_worker_thread, worker_list); + spin_unlock_irqrestore(&workers->lock, flags); + } else { + spin_unlock_irqrestore(&workers->lock, flags); + /* we're below the limit, start another worker */ + btrfs_start_workers(workers, 1); + goto again; + } + } + return worker; +} + +/* + * btrfs_requeue_work just puts the work item back on the tail of the list + * it was taken from. It is intended for use with long running work functions + * that make some progress and want to give the cpu up for others. + */ +int btrfs_requeue_work(struct btrfs_work *work) +{ + struct btrfs_worker_thread *worker = work->worker; + unsigned long flags; + + if (test_and_set_bit(0, &work->flags)) + goto out; + + spin_lock_irqsave(&worker->lock, flags); + atomic_inc(&worker->num_pending); + list_add_tail(&work->list, &worker->pending); + spin_unlock_irqrestore(&worker->lock, flags); +out: + return 0; +} + +/* + * places a struct btrfs_work into the pending queue of one of the kthreads + */ +int btrfs_queue_worker(struct btrfs_workers *workers, struct btrfs_work *work) +{ + struct btrfs_worker_thread *worker; + unsigned long flags; + int wake = 0; + + /* don't requeue something already on a list */ + if (test_and_set_bit(0, &work->flags)) + goto out; + + worker = find_worker(workers); + + spin_lock_irqsave(&worker->lock, flags); + atomic_inc(&worker->num_pending); + list_add_tail(&work->list, &worker->pending); + + /* + * avoid calling into wake_up_process if this thread has already + * been kicked + */ + if (!worker->working) + wake = 1; + worker->working = 1; + + spin_unlock_irqrestore(&worker->lock, flags); + + if (wake) + wake_up_process(worker->task); +out: + return 0; +} diff --git a/fs/btrfs/async-thread.h b/fs/btrfs/async-thread.h new file mode 100644 index 000000000000..52fc9da0f9e7 --- /dev/null +++ b/fs/btrfs/async-thread.h @@ -0,0 +1,78 @@ +/* + * Copyright (C) 2007 Oracle. All rights reserved. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public + * License v2 as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public + * License along with this program; if not, write to the + * Free Software Foundation, Inc., 59 Temple Place - Suite 330, + * Boston, MA 021110-1307, USA. + */ + +#ifndef __BTRFS_ASYNC_THREAD_ +#define __BTRFS_ASYNC_THREAD_ + +struct btrfs_worker_thread; + +/* + * This is similar to a workqueue, but it is meant to spread the operations + * across all available cpus instead of just the CPU that was used to + * queue the work. There is also some batching introduced to try and + * cut down on context switches. + * + * By default threads are added on demand up to 2 * the number of cpus. + * Changing struct btrfs_workers->max_workers is one way to prevent + * demand creation of kthreads. + * + * the basic model of these worker threads is to embed a btrfs_work + * structure in your own data struct, and use container_of in a + * work function to get back to your data struct. + */ +struct btrfs_work { + /* + * only func should be set to the function you want called + * your work struct is passed as the only arg + */ + void (*func)(struct btrfs_work *work); + + /* + * flags should be set to zero. It is used to make sure the + * struct is only inserted once into the list. + */ + unsigned long flags; + + /* don't touch these */ + struct btrfs_worker_thread *worker; + struct list_head list; +}; + +struct btrfs_workers { + /* current number of running workers */ + int num_workers; + + /* max number of workers allowed. changed by btrfs_start_workers */ + int max_workers; + + /* list with all the work threads */ + struct list_head worker_list; + + /* the last worker thread to have something queued */ + struct btrfs_worker_thread *last; + + /* lock for finding the next worker thread to queue on */ + spinlock_t lock; +}; + +int btrfs_queue_worker(struct btrfs_workers *workers, struct btrfs_work *work); +int btrfs_start_workers(struct btrfs_workers *workers, int num_workers); +int btrfs_stop_workers(struct btrfs_workers *workers); +void btrfs_init_workers(struct btrfs_workers *workers, int max); +int btrfs_requeue_work(struct btrfs_work *work); +#endif diff --git a/fs/btrfs/ctree.h b/fs/btrfs/ctree.h index 49cbc62b42f9..6c91a510c965 100644 --- a/fs/btrfs/ctree.h +++ b/fs/btrfs/ctree.h @@ -30,6 +30,7 @@ #include "bit-radix.h" #include "extent_io.h" #include "extent_map.h" +#include "async-thread.h" struct btrfs_trans_handle; struct btrfs_transaction; @@ -518,13 +519,20 @@ struct btrfs_fs_info { struct list_head hashers; struct list_head dead_roots; struct list_head end_io_work_list; - struct list_head async_submit_work_list; struct work_struct end_io_work; - struct work_struct async_submit_work; spinlock_t end_io_work_lock; - spinlock_t async_submit_work_lock; atomic_t nr_async_submits; + /* + * there is a pool of worker threads for checksumming during writes + * and a pool for checksumming after reads. This is because readers + * can run with FS locks held, and the writers may be waiting for + * those locks. We don't want ordering in the pending list to cause + * deadlocks, and so the two are serviced separately. + */ + struct btrfs_workers workers; + struct btrfs_workers endio_workers; + #if LINUX_VERSION_CODE <= KERNEL_VERSION(2,6,18) struct work_struct trans_work; #else diff --git a/fs/btrfs/disk-io.c b/fs/btrfs/disk-io.c index b9a53646ceb2..98ff4fbcb386 100644 --- a/fs/btrfs/disk-io.c +++ b/fs/btrfs/disk-io.c @@ -31,6 +31,7 @@ #include "btrfs_inode.h" #include "volumes.h" #include "print-tree.h" +#include "async-thread.h" #if 0 static int check_tree_block(struct btrfs_root *root, struct extent_buffer *buf) @@ -46,8 +47,7 @@ static int check_tree_block(struct btrfs_root *root, struct extent_buffer *buf) #endif static struct extent_io_ops btree_extent_io_ops; -static struct workqueue_struct *end_io_workqueue; -static struct workqueue_struct *async_submit_workqueue; +static void end_workqueue_fn(struct btrfs_work *work); struct end_io_wq { struct bio *bio; @@ -57,6 +57,7 @@ struct end_io_wq { int error; int metadata; struct list_head list; + struct btrfs_work work; }; struct async_submit_bio { @@ -66,6 +67,7 @@ struct async_submit_bio { extent_submit_bio_hook_t *submit_bio_hook; int rw; int mirror_num; + struct btrfs_work work; }; struct extent_map *btree_get_extent(struct inode *inode, struct page *page, @@ -389,7 +391,6 @@ static int end_workqueue_bio(struct bio *bio, { struct end_io_wq *end_io_wq = bio->bi_private; struct btrfs_fs_info *fs_info; - unsigned long flags; #if LINUX_VERSION_CODE <= KERNEL_VERSION(2,6,23) if (bio->bi_size) @@ -397,11 +398,10 @@ static int end_workqueue_bio(struct bio *bio, #endif fs_info = end_io_wq->info; - spin_lock_irqsave(&fs_info->end_io_work_lock, flags); end_io_wq->error = err; - list_add_tail(&end_io_wq->list, &fs_info->end_io_work_list); - spin_unlock_irqrestore(&fs_info->end_io_work_lock, flags); - queue_work(end_io_workqueue, &fs_info->end_io_work); + end_io_wq->work.func = end_workqueue_fn; + end_io_wq->work.flags = 0; + btrfs_queue_worker(&fs_info->endio_workers, &end_io_wq->work); #if LINUX_VERSION_CODE <= KERNEL_VERSION(2,6,23) return 0; @@ -428,6 +428,19 @@ int btrfs_bio_wq_end_io(struct btrfs_fs_info *info, struct bio *bio, return 0; } +static void run_one_async_submit(struct btrfs_work *work) +{ + struct btrfs_fs_info *fs_info; + struct async_submit_bio *async; + + async = container_of(work, struct async_submit_bio, work); + fs_info = BTRFS_I(async->inode)->root->fs_info; + atomic_dec(&fs_info->nr_async_submits); + async->submit_bio_hook(async->inode, async->rw, async->bio, + async->mirror_num); + kfree(async); +} + int btrfs_wq_submit_bio(struct btrfs_fs_info *fs_info, struct inode *inode, int rw, struct bio *bio, int mirror_num, extent_submit_bio_hook_t *submit_bio_hook) @@ -443,13 +456,10 @@ int btrfs_wq_submit_bio(struct btrfs_fs_info *fs_info, struct inode *inode, async->bio = bio; async->mirror_num = mirror_num; async->submit_bio_hook = submit_bio_hook; - - spin_lock(&fs_info->async_submit_work_lock); - list_add_tail(&async->list, &fs_info->async_submit_work_list); + async->work.func = run_one_async_submit; + async->work.flags = 0; atomic_inc(&fs_info->nr_async_submits); - spin_unlock(&fs_info->async_submit_work_lock); - - queue_work(async_submit_workqueue, &fs_info->async_submit_work); + btrfs_queue_worker(&fs_info->workers, &async->work); return 0; } @@ -462,19 +472,32 @@ static int __btree_submit_bio_hook(struct inode *inode, int rw, struct bio *bio, offset = bio->bi_sector << 9; + /* + * when we're called for a write, we're already in the async + * submission context. Just jump ingo btrfs_map_bio + */ if (rw & (1 << BIO_RW)) { - return btrfs_map_bio(BTRFS_I(inode)->root, rw, bio, mirror_num); + return btrfs_map_bio(BTRFS_I(inode)->root, rw, bio, + mirror_num, 0); } + /* + * called for a read, do the setup so that checksum validation + * can happen in the async kernel threads + */ ret = btrfs_bio_wq_end_io(root->fs_info, bio, 1); BUG_ON(ret); - return btrfs_map_bio(BTRFS_I(inode)->root, rw, bio, mirror_num); + return btrfs_map_bio(BTRFS_I(inode)->root, rw, bio, mirror_num, 1); } static int btree_submit_bio_hook(struct inode *inode, int rw, struct bio *bio, int mirror_num) { + /* + * kthread helpers are used to submit writes so that checksumming + * can happen in parallel across all CPUs + */ if (!(rw & (1 << BIO_RW))) { return __btree_submit_bio_hook(inode, rw, bio, mirror_num); } @@ -1036,95 +1059,40 @@ static int bio_ready_for_csum(struct bio *bio) return ret; } -#if LINUX_VERSION_CODE <= KERNEL_VERSION(2,6,18) -static void btrfs_end_io_csum(void *p) -#else -static void btrfs_end_io_csum(struct work_struct *work) -#endif +/* + * called by the kthread helper functions to finally call the bio end_io + * functions. This is where read checksum verification actually happens + */ +static void end_workqueue_fn(struct btrfs_work *work) { -#if LINUX_VERSION_CODE <= KERNEL_VERSION(2,6,18) - struct btrfs_fs_info *fs_info = p; -#else - struct btrfs_fs_info *fs_info = container_of(work, - struct btrfs_fs_info, - end_io_work); -#endif - unsigned long flags; - struct end_io_wq *end_io_wq; struct bio *bio; - struct list_head *next; + struct end_io_wq *end_io_wq; + struct btrfs_fs_info *fs_info; int error; - int was_empty; - while(1) { - spin_lock_irqsave(&fs_info->end_io_work_lock, flags); - if (list_empty(&fs_info->end_io_work_list)) { - spin_unlock_irqrestore(&fs_info->end_io_work_lock, - flags); - return; - } - next = fs_info->end_io_work_list.next; - list_del(next); - spin_unlock_irqrestore(&fs_info->end_io_work_lock, flags); + end_io_wq = container_of(work, struct end_io_wq, work); + bio = end_io_wq->bio; + fs_info = end_io_wq->info; - end_io_wq = list_entry(next, struct end_io_wq, list); - - bio = end_io_wq->bio; - if (end_io_wq->metadata && !bio_ready_for_csum(bio)) { - spin_lock_irqsave(&fs_info->end_io_work_lock, flags); - was_empty = list_empty(&fs_info->end_io_work_list); - list_add_tail(&end_io_wq->list, - &fs_info->end_io_work_list); - spin_unlock_irqrestore(&fs_info->end_io_work_lock, - flags); - if (was_empty) - return; - continue; - } - error = end_io_wq->error; - bio->bi_private = end_io_wq->private; - bio->bi_end_io = end_io_wq->end_io; - kfree(end_io_wq); + /* metadata bios are special because the whole tree block must + * be checksummed at once. This makes sure the entire block is in + * ram and up to date before trying to verify things. For + * blocksize <= pagesize, it is basically a noop + */ + if (end_io_wq->metadata && !bio_ready_for_csum(bio)) { + btrfs_queue_worker(&fs_info->endio_workers, + &end_io_wq->work); + return; + } + error = end_io_wq->error; + bio->bi_private = end_io_wq->private; + bio->bi_end_io = end_io_wq->end_io; + kfree(end_io_wq); #if LINUX_VERSION_CODE <= KERNEL_VERSION(2,6,23) - bio_endio(bio, bio->bi_size, error); + bio_endio(bio, bio->bi_size, error); #else - bio_endio(bio, error); + bio_endio(bio, error); #endif - } -} - -#if LINUX_VERSION_CODE <= KERNEL_VERSION(2,6,18) -static void btrfs_async_submit_work(void *p) -#else -static void btrfs_async_submit_work(struct work_struct *work) -#endif -{ -#if LINUX_VERSION_CODE <= KERNEL_VERSION(2,6,18) - struct btrfs_fs_info *fs_info = p; -#else - struct btrfs_fs_info *fs_info = container_of(work, - struct btrfs_fs_info, - async_submit_work); -#endif - struct async_submit_bio *async; - struct list_head *next; - - while(1) { - spin_lock(&fs_info->async_submit_work_lock); - if (list_empty(&fs_info->async_submit_work_list)) { - spin_unlock(&fs_info->async_submit_work_lock); - return; - } - next = fs_info->async_submit_work_list.next; - list_del(next); - atomic_dec(&fs_info->nr_async_submits); - spin_unlock(&fs_info->async_submit_work_lock); - - async = list_entry(next, struct async_submit_bio, list); - async->submit_bio_hook(async->inode, async->rw, async->bio, - async->mirror_num); - kfree(async); - } } struct btrfs_root *open_ctree(struct super_block *sb, @@ -1155,19 +1123,11 @@ struct btrfs_root *open_ctree(struct super_block *sb, err = -ENOMEM; goto fail; } - end_io_workqueue = create_workqueue("btrfs-end-io"); - BUG_ON(!end_io_workqueue); - async_submit_workqueue = create_workqueue("btrfs-async-submit"); - INIT_RADIX_TREE(&fs_info->fs_roots_radix, GFP_NOFS); INIT_LIST_HEAD(&fs_info->trans_list); INIT_LIST_HEAD(&fs_info->dead_roots); INIT_LIST_HEAD(&fs_info->hashers); - INIT_LIST_HEAD(&fs_info->end_io_work_list); - INIT_LIST_HEAD(&fs_info->async_submit_work_list); spin_lock_init(&fs_info->hash_lock); - spin_lock_init(&fs_info->end_io_work_lock); - spin_lock_init(&fs_info->async_submit_work_lock); spin_lock_init(&fs_info->delalloc_lock); spin_lock_init(&fs_info->new_trans_lock); @@ -1222,13 +1182,8 @@ struct btrfs_root *open_ctree(struct super_block *sb, fs_info->do_barriers = 1; #if LINUX_VERSION_CODE <= KERNEL_VERSION(2,6,18) - INIT_WORK(&fs_info->end_io_work, btrfs_end_io_csum, fs_info); - INIT_WORK(&fs_info->async_submit_work, btrfs_async_submit_work, - fs_info); INIT_WORK(&fs_info->trans_work, btrfs_transaction_cleaner, fs_info); #else - INIT_WORK(&fs_info->end_io_work, btrfs_end_io_csum); - INIT_WORK(&fs_info->async_submit_work, btrfs_async_submit_work); INIT_DELAYED_WORK(&fs_info->trans_work, btrfs_transaction_cleaner); #endif BTRFS_I(fs_info->btree_inode)->root = tree_root; @@ -1240,6 +1195,19 @@ struct btrfs_root *open_ctree(struct super_block *sb, mutex_init(&fs_info->trans_mutex); mutex_init(&fs_info->fs_mutex); + /* we need to start all the end_io workers up front because the + * queue work function gets called at interrupt time. The endio + * workers don't normally start IO, so some number of them <= the + * number of cpus is fine. They handle checksumming after a read. + * + * The other worker threads do start IO, so the max is larger than + * the number of CPUs. FIXME, tune this for huge machines + */ + btrfs_init_workers(&fs_info->workers, num_online_cpus() * 2); + btrfs_init_workers(&fs_info->endio_workers, num_online_cpus()); + btrfs_start_workers(&fs_info->workers, 1); + btrfs_start_workers(&fs_info->endio_workers, num_online_cpus()); + #if 0 ret = add_hasher(fs_info, "crc32c"); if (ret) { @@ -1375,6 +1343,8 @@ fail_sb_buffer: extent_io_tree_empty_lru(&BTRFS_I(fs_info->btree_inode)->io_tree); fail_iput: iput(fs_info->btree_inode); + btrfs_stop_workers(&fs_info->workers); + btrfs_stop_workers(&fs_info->endio_workers); fail: btrfs_close_devices(fs_info->fs_devices); btrfs_mapping_tree_free(&fs_info->mapping_tree); @@ -1623,16 +1593,10 @@ int close_ctree(struct btrfs_root *root) extent_io_tree_empty_lru(&fs_info->extent_ins); extent_io_tree_empty_lru(&BTRFS_I(fs_info->btree_inode)->io_tree); - flush_workqueue(async_submit_workqueue); - flush_workqueue(end_io_workqueue); - truncate_inode_pages(fs_info->btree_inode->i_mapping, 0); - flush_workqueue(async_submit_workqueue); - destroy_workqueue(async_submit_workqueue); - - flush_workqueue(end_io_workqueue); - destroy_workqueue(end_io_workqueue); + btrfs_stop_workers(&fs_info->workers); + btrfs_stop_workers(&fs_info->endio_workers); iput(fs_info->btree_inode); #if 0 diff --git a/fs/btrfs/inode.c b/fs/btrfs/inode.c index 0f14697becef..7daef8d37006 100644 --- a/fs/btrfs/inode.c +++ b/fs/btrfs/inode.c @@ -359,7 +359,7 @@ int __btrfs_submit_bio_hook(struct inode *inode, int rw, struct bio *bio, kfree(sums); - return btrfs_map_bio(root, rw, bio, mirror_num); + return btrfs_map_bio(root, rw, bio, mirror_num, 1); } int btrfs_submit_bio_hook(struct inode *inode, int rw, struct bio *bio, @@ -383,7 +383,7 @@ int btrfs_submit_bio_hook(struct inode *inode, int rw, struct bio *bio, inode, rw, bio, mirror_num, __btrfs_submit_bio_hook); mapit: - return btrfs_map_bio(root, rw, bio, mirror_num); + return btrfs_map_bio(root, rw, bio, mirror_num, 0); } int btrfs_readpage_io_hook(struct page *page, u64 start, u64 end) diff --git a/fs/btrfs/volumes.c b/fs/btrfs/volumes.c index 722eb4550154..c57458ce6339 100644 --- a/fs/btrfs/volumes.c +++ b/fs/btrfs/volumes.c @@ -27,6 +27,7 @@ #include "transaction.h" #include "print-tree.h" #include "volumes.h" +#include "async-thread.h" struct map_lookup { u64 type; @@ -110,6 +111,101 @@ static struct btrfs_fs_devices *find_fsid(u8 *fsid) return NULL; } +/* + * we try to collect pending bios for a device so we don't get a large + * number of procs sending bios down to the same device. This greatly + * improves the schedulers ability to collect and merge the bios. + * + * But, it also turns into a long list of bios to process and that is sure + * to eventually make the worker thread block. The solution here is to + * make some progress and then put this work struct back at the end of + * the list if the block device is congested. This way, multiple devices + * can make progress from a single worker thread. + */ +int run_scheduled_bios(struct btrfs_device *device) +{ + struct bio *pending; + struct backing_dev_info *bdi; + struct bio *tail; + struct bio *cur; + int again = 0; + unsigned long num_run = 0; + + bdi = device->bdev->bd_inode->i_mapping->backing_dev_info; +loop: + spin_lock(&device->io_lock); + + /* take all the bios off the list at once and process them + * later on (without the lock held). But, remember the + * tail and other pointers so the bios can be properly reinserted + * into the list if we hit congestion + */ + pending = device->pending_bios; + tail = device->pending_bio_tail; + WARN_ON(pending && !tail); + device->pending_bios = NULL; + device->pending_bio_tail = NULL; + + /* + * if pending was null this time around, no bios need processing + * at all and we can stop. Otherwise it'll loop back up again + * and do an additional check so no bios are missed. + * + * device->running_pending is used to synchronize with the + * schedule_bio code. + */ + if (pending) { + again = 1; + device->running_pending = 1; + } else { + again = 0; + device->running_pending = 0; + } + spin_unlock(&device->io_lock); + + while(pending) { + cur = pending; + pending = pending->bi_next; + cur->bi_next = NULL; + atomic_dec(&device->dev_root->fs_info->nr_async_submits); + submit_bio(cur->bi_rw, cur); + num_run++; + + /* + * we made progress, there is more work to do and the bdi + * is now congested. Back off and let other work structs + * run instead + */ + if (pending && num_run && bdi_write_congested(bdi)) { + struct bio *old_head; + + spin_lock(&device->io_lock); + old_head = device->pending_bios; + device->pending_bios = pending; + if (device->pending_bio_tail) + tail->bi_next = old_head; + else + device->pending_bio_tail = tail; + + spin_unlock(&device->io_lock); + btrfs_requeue_work(&device->work); + goto done; + } + } + if (again) + goto loop; +done: + return 0; +} + +void pending_bios_fn(struct btrfs_work *work) +{ + struct btrfs_device *device; + + device = container_of(work, struct btrfs_device, work); + run_scheduled_bios(device); +} + static int device_list_add(const char *path, struct btrfs_super_block *disk_super, u64 devid, struct btrfs_fs_devices **fs_devices_ret) @@ -141,6 +237,7 @@ static int device_list_add(const char *path, return -ENOMEM; } device->devid = devid; + device->work.func = pending_bios_fn; memcpy(device->uuid, disk_super->dev_item.uuid, BTRFS_UUID_SIZE); device->barriers = 1; @@ -925,6 +1022,7 @@ int btrfs_init_new_device(struct btrfs_root *root, char *device_path) } device->barriers = 1; + device->work.func = pending_bios_fn; generate_random_uuid(device->uuid); spin_lock_init(&device->io_lock); device->name = kstrdup(device_path, GFP_NOFS); @@ -1965,8 +2063,61 @@ static int end_bio_multi_stripe(struct bio *bio, #endif } +struct async_sched { + struct bio *bio; + int rw; + struct btrfs_fs_info *info; + struct btrfs_work work; +}; + +/* + * see run_scheduled_bios for a description of why bios are collected for + * async submit. + * + * This will add one bio to the pending list for a device and make sure + * the work struct is scheduled. + */ +int schedule_bio(struct btrfs_root *root, struct btrfs_device *device, + int rw, struct bio *bio) +{ + int should_queue = 1; + + /* don't bother with additional async steps for reads, right now */ + if (!(rw & (1 << BIO_RW))) { + submit_bio(rw, bio); + return 0; + } + + /* + * nr_async_sumbits allows us to reliably return congestion to the + * higher layers. Otherwise, the async bio makes it appear we have + * made progress against dirty pages when we've really just put it + * on a queue for later + */ + atomic_inc(&root->fs_info->nr_async_submits); + bio->bi_next = NULL; + bio->bi_rw |= rw; + + spin_lock(&device->io_lock); + + if (device->pending_bio_tail) + device->pending_bio_tail->bi_next = bio; + + device->pending_bio_tail = bio; + if (!device->pending_bios) + device->pending_bios = bio; + if (device->running_pending) + should_queue = 0; + + spin_unlock(&device->io_lock); + + if (should_queue) + btrfs_queue_worker(&root->fs_info->workers, &device->work); + return 0; +} + int btrfs_map_bio(struct btrfs_root *root, int rw, struct bio *bio, - int mirror_num) + int mirror_num, int async_submit) { struct btrfs_mapping_tree *map_tree; struct btrfs_device *dev; @@ -2012,10 +2163,10 @@ int btrfs_map_bio(struct btrfs_root *root, int rw, struct bio *bio, dev = multi->stripes[dev_nr].dev; if (dev && dev->bdev) { bio->bi_bdev = dev->bdev; - spin_lock(&dev->io_lock); - dev->total_ios++; - spin_unlock(&dev->io_lock); - submit_bio(rw, bio); + if (async_submit) + schedule_bio(root, dev, rw, bio); + else + submit_bio(rw, bio); } else { bio->bi_bdev = root->fs_info->fs_devices->latest_bdev; bio->bi_sector = logical >> 9; @@ -2054,6 +2205,7 @@ static struct btrfs_device *add_missing_dev(struct btrfs_root *root, device->barriers = 1; device->dev_root = root->fs_info->dev_root; device->devid = devid; + device->work.func = pending_bios_fn; fs_devices->num_devices++; spin_lock_init(&device->io_lock); memcpy(device->uuid, dev_uuid, BTRFS_UUID_SIZE); diff --git a/fs/btrfs/volumes.h b/fs/btrfs/volumes.h index 4df6b1608f91..48a44f7a9385 100644 --- a/fs/btrfs/volumes.h +++ b/fs/btrfs/volumes.h @@ -20,6 +20,7 @@ #define __BTRFS_VOLUMES_ #include +#include "async-thread.h" struct buffer_head; struct btrfs_device { @@ -27,6 +28,9 @@ struct btrfs_device { struct list_head dev_alloc_list; struct btrfs_root *dev_root; struct buffer_head *pending_io; + struct bio *pending_bios; + struct bio *pending_bio_tail; + int running_pending; u64 generation; int barriers; @@ -36,8 +40,6 @@ struct btrfs_device { struct block_device *bdev; - u64 total_ios; - char *name; /* the internal btrfs device id */ @@ -63,6 +65,8 @@ struct btrfs_device { /* physical drive uuid (or lvm uuid) */ u8 uuid[BTRFS_UUID_SIZE]; + + struct btrfs_work work; }; struct btrfs_fs_devices { @@ -117,7 +121,7 @@ int btrfs_alloc_chunk(struct btrfs_trans_handle *trans, void btrfs_mapping_init(struct btrfs_mapping_tree *tree); void btrfs_mapping_tree_free(struct btrfs_mapping_tree *tree); int btrfs_map_bio(struct btrfs_root *root, int rw, struct bio *bio, - int mirror_num); + int mirror_num, int async_submit); int btrfs_read_super_device(struct btrfs_root *root, struct extent_buffer *buf); int btrfs_open_devices(struct btrfs_fs_devices *fs_devices, int flags, void *holder);