From 1d7c484eeb167fc374294e38ae402de4097c8611 Mon Sep 17 00:00:00 2001 From: David Teigland Date: Tue, 15 May 2012 16:07:49 -0500 Subject: [PATCH] dlm: use idr instead of list for recovered rsbs When a large number of resources are being recovered, a linear search of the recover_list takes a long time. Use an idr in place of a list. Signed-off-by: David Teigland --- fs/dlm/dlm_internal.h | 3 ++ fs/dlm/lockspace.c | 3 ++ fs/dlm/rcom.c | 2 +- fs/dlm/recover.c | 116 ++++++++++++++++++++++++++++++++++-------- 4 files changed, 101 insertions(+), 23 deletions(-) diff --git a/fs/dlm/dlm_internal.h b/fs/dlm/dlm_internal.h index 3093207a7684..a5f82d5b3946 100644 --- a/fs/dlm/dlm_internal.h +++ b/fs/dlm/dlm_internal.h @@ -288,6 +288,7 @@ struct dlm_rsb { int res_nodeid; int res_master_nodeid; int res_dir_nodeid; + int res_id; /* for ls_recover_idr */ uint32_t res_lvbseq; uint32_t res_hash; uint32_t res_bucket; /* rsbtbl */ @@ -587,6 +588,8 @@ struct dlm_ls { struct list_head ls_recover_list; spinlock_t ls_recover_list_lock; int ls_recover_list_count; + struct idr ls_recover_idr; + spinlock_t ls_recover_idr_lock; wait_queue_head_t ls_wait_general; struct mutex ls_clear_proc_locks; diff --git a/fs/dlm/lockspace.c b/fs/dlm/lockspace.c index 065bb75ed609..d4d3b3165c6c 100644 --- a/fs/dlm/lockspace.c +++ b/fs/dlm/lockspace.c @@ -565,6 +565,8 @@ static int new_lockspace(const char *name, const char *cluster, INIT_LIST_HEAD(&ls->ls_recover_list); spin_lock_init(&ls->ls_recover_list_lock); + idr_init(&ls->ls_recover_idr); + spin_lock_init(&ls->ls_recover_idr_lock); ls->ls_recover_list_count = 0; ls->ls_local_handle = ls; init_waitqueue_head(&ls->ls_wait_general); @@ -636,6 +638,7 @@ static int new_lockspace(const char *name, const char *cluster, spin_lock(&lslist_lock); list_del(&ls->ls_list); spin_unlock(&lslist_lock); + idr_destroy(&ls->ls_recover_idr); kfree(ls->ls_recover_buf); out_lkbfree: idr_destroy(&ls->ls_lkbidr); diff --git a/fs/dlm/rcom.c b/fs/dlm/rcom.c index c8c298d81463..87f1a56eab32 100644 --- a/fs/dlm/rcom.c +++ b/fs/dlm/rcom.c @@ -325,7 +325,7 @@ int dlm_send_rcom_lookup(struct dlm_rsb *r, int dir_nodeid) if (error) goto out; memcpy(rc->rc_buf, r->res_name, r->res_length); - rc->rc_id = (unsigned long) r; + rc->rc_id = (unsigned long) r->res_id; send_rcom(ls, mh, rc); out: diff --git a/fs/dlm/recover.c b/fs/dlm/recover.c index 3c025fe49ad3..ff6f27629a0c 100644 --- a/fs/dlm/recover.c +++ b/fs/dlm/recover.c @@ -277,22 +277,6 @@ static void recover_list_del(struct dlm_rsb *r) dlm_put_rsb(r); } -static struct dlm_rsb *recover_list_find(struct dlm_ls *ls, uint64_t id) -{ - struct dlm_rsb *r = NULL; - - spin_lock(&ls->ls_recover_list_lock); - - list_for_each_entry(r, &ls->ls_recover_list, res_recover_list) { - if (id == (unsigned long) r) - goto out; - } - r = NULL; - out: - spin_unlock(&ls->ls_recover_list_lock); - return r; -} - static void recover_list_clear(struct dlm_ls *ls) { struct dlm_rsb *r, *s; @@ -313,6 +297,94 @@ static void recover_list_clear(struct dlm_ls *ls) spin_unlock(&ls->ls_recover_list_lock); } +static int recover_idr_empty(struct dlm_ls *ls) +{ + int empty = 1; + + spin_lock(&ls->ls_recover_idr_lock); + if (ls->ls_recover_list_count) + empty = 0; + spin_unlock(&ls->ls_recover_idr_lock); + + return empty; +} + +static int recover_idr_add(struct dlm_rsb *r) +{ + struct dlm_ls *ls = r->res_ls; + int rv, id; + + rv = idr_pre_get(&ls->ls_recover_idr, GFP_NOFS); + if (!rv) + return -ENOMEM; + + spin_lock(&ls->ls_recover_idr_lock); + if (r->res_id) { + spin_unlock(&ls->ls_recover_idr_lock); + return -1; + } + rv = idr_get_new_above(&ls->ls_recover_idr, r, 1, &id); + if (rv) { + spin_unlock(&ls->ls_recover_idr_lock); + return rv; + } + r->res_id = id; + ls->ls_recover_list_count++; + dlm_hold_rsb(r); + spin_unlock(&ls->ls_recover_idr_lock); + return 0; +} + +static void recover_idr_del(struct dlm_rsb *r) +{ + struct dlm_ls *ls = r->res_ls; + + spin_lock(&ls->ls_recover_idr_lock); + idr_remove(&ls->ls_recover_idr, r->res_id); + r->res_id = 0; + ls->ls_recover_list_count--; + spin_unlock(&ls->ls_recover_idr_lock); + + dlm_put_rsb(r); +} + +static struct dlm_rsb *recover_idr_find(struct dlm_ls *ls, uint64_t id) +{ + struct dlm_rsb *r; + + spin_lock(&ls->ls_recover_idr_lock); + r = idr_find(&ls->ls_recover_idr, (int)id); + spin_unlock(&ls->ls_recover_idr_lock); + return r; +} + +static int recover_idr_clear_rsb(int id, void *p, void *data) +{ + struct dlm_ls *ls = data; + struct dlm_rsb *r = p; + + r->res_id = 0; + r->res_recover_locks_count = 0; + ls->ls_recover_list_count--; + + dlm_put_rsb(r); + return 0; +} + +static void recover_idr_clear(struct dlm_ls *ls) +{ + spin_lock(&ls->ls_recover_idr_lock); + idr_for_each(&ls->ls_recover_idr, recover_idr_clear_rsb, ls); + idr_remove_all(&ls->ls_recover_idr); + + if (ls->ls_recover_list_count != 0) { + log_error(ls, "warning: recover_list_count %d", + ls->ls_recover_list_count); + ls->ls_recover_list_count = 0; + } + spin_unlock(&ls->ls_recover_idr_lock); +} + /* Master recovery: find new master node for rsb's that were mastered on nodes that have been removed. @@ -408,7 +480,7 @@ static int recover_master(struct dlm_rsb *r, unsigned int *count) set_new_master(r); error = 0; } else { - recover_list_add(r); + recover_idr_add(r); error = dlm_send_rcom_lookup(r, dir_nodeid); } @@ -493,10 +565,10 @@ int dlm_recover_masters(struct dlm_ls *ls) log_debug(ls, "dlm_recover_masters %u of %u", count, total); - error = dlm_wait_function(ls, &recover_list_empty); + error = dlm_wait_function(ls, &recover_idr_empty); out: if (error) - recover_list_clear(ls); + recover_idr_clear(ls); return error; } @@ -505,7 +577,7 @@ int dlm_recover_master_reply(struct dlm_ls *ls, struct dlm_rcom *rc) struct dlm_rsb *r; int ret_nodeid, new_master; - r = recover_list_find(ls, rc->rc_id); + r = recover_idr_find(ls, rc->rc_id); if (!r) { log_error(ls, "dlm_recover_master_reply no id %llx", (unsigned long long)rc->rc_id); @@ -524,9 +596,9 @@ int dlm_recover_master_reply(struct dlm_ls *ls, struct dlm_rcom *rc) r->res_nodeid = new_master; set_new_master(r); unlock_rsb(r); - recover_list_del(r); + recover_idr_del(r); - if (recover_list_empty(ls)) + if (recover_idr_empty(ls)) wake_up(&ls->ls_wait_general); out: return 0;