staging: lustre: lu_object: move retry logic inside htable_lookup

The current retry logic, to wait when a 'dying' object is found,
spans multiple functions.  The process is attached to a waitqueue
and set TASK_UNINTERRUPTIBLE in htable_lookup, and this status
is passed back through lu_object_find_try() to lu_object_find_at()
where schedule() is called and the process is removed from the queue.

This can be simplified by moving all the logic (including
hashtable locking) inside htable_lookup(), which now never returns
EAGAIN.

Note that htable_lookup() is called with the hash bucket lock
held, and will drop and retake it if it needs to schedule.

I made this a 'goto' loop rather than a 'while(1)' loop as the
diff is easier to read.

Signed-off-by: NeilBrown <neilb@suse.com>
Signed-off-by: Greg Kroah-Hartman <gregkh@linuxfoundation.org>
This commit is contained in:
NeilBrown 2018-05-07 10:54:48 +10:00 committed by Greg Kroah-Hartman
parent e167b37036
commit bab8b4cc39
1 changed files with 27 additions and 43 deletions

View File

@ -584,16 +584,24 @@ void lu_object_print(const struct lu_env *env, void *cookie,
}
EXPORT_SYMBOL(lu_object_print);
/*
* NOTE: htable_lookup() is called with the relevant
* hash bucket locked, but might drop and re-acquire the lock.
*/
static struct lu_object *htable_lookup(struct lu_site *s,
struct cfs_hash_bd *bd,
const struct lu_fid *f,
wait_queue_entry_t *waiter,
__u64 *version)
{
struct cfs_hash *hs = s->ls_obj_hash;
struct lu_site_bkt_data *bkt;
struct lu_object_header *h;
struct hlist_node *hnode;
__u64 ver = cfs_hash_bd_version_get(bd);
__u64 ver;
wait_queue_entry_t waiter;
retry:
ver = cfs_hash_bd_version_get(bd);
if (*version == ver)
return ERR_PTR(-ENOENT);
@ -626,11 +634,15 @@ static struct lu_object *htable_lookup(struct lu_site *s,
* drained), and moreover, lookup has to wait until object is freed.
*/
init_waitqueue_entry(waiter, current);
add_wait_queue(&bkt->lsb_marche_funebre, waiter);
init_waitqueue_entry(&waiter, current);
add_wait_queue(&bkt->lsb_marche_funebre, &waiter);
set_current_state(TASK_UNINTERRUPTIBLE);
lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_DEATH_RACE);
return ERR_PTR(-EAGAIN);
cfs_hash_bd_unlock(hs, bd, 1);
schedule();
remove_wait_queue(&bkt->lsb_marche_funebre, &waiter);
cfs_hash_bd_lock(hs, bd, 1);
goto retry;
}
/**
@ -694,13 +706,14 @@ static struct lu_object *lu_object_new(const struct lu_env *env,
}
/**
* Core logic of lu_object_find*() functions.
* Much like lu_object_find(), but top level device of object is specifically
* \a dev rather than top level device of the site. This interface allows
* objects of different "stacking" to be created within the same site.
*/
static struct lu_object *lu_object_find_try(const struct lu_env *env,
struct lu_device *dev,
const struct lu_fid *f,
const struct lu_object_conf *conf,
wait_queue_entry_t *waiter)
struct lu_object *lu_object_find_at(const struct lu_env *env,
struct lu_device *dev,
const struct lu_fid *f,
const struct lu_object_conf *conf)
{
struct lu_object *o;
struct lu_object *shadow;
@ -726,8 +739,6 @@ static struct lu_object *lu_object_find_try(const struct lu_env *env,
* It is unnecessary to perform lookup-alloc-lookup-insert, instead,
* just alloc and insert directly.
*
* If dying object is found during index search, add @waiter to the
* site wait-queue and return ERR_PTR(-EAGAIN).
*/
if (conf && conf->loc_flags & LOC_F_NEW)
return lu_object_new(env, dev, f, conf);
@ -735,8 +746,9 @@ static struct lu_object *lu_object_find_try(const struct lu_env *env,
s = dev->ld_site;
hs = s->ls_obj_hash;
cfs_hash_bd_get_and_lock(hs, (void *)f, &bd, 1);
o = htable_lookup(s, &bd, f, waiter, &version);
o = htable_lookup(s, &bd, f, &version);
cfs_hash_bd_unlock(hs, &bd, 1);
if (!IS_ERR(o) || PTR_ERR(o) != -ENOENT)
return o;
@ -752,7 +764,7 @@ static struct lu_object *lu_object_find_try(const struct lu_env *env,
cfs_hash_bd_lock(hs, &bd, 1);
shadow = htable_lookup(s, &bd, f, waiter, &version);
shadow = htable_lookup(s, &bd, f, &version);
if (likely(PTR_ERR(shadow) == -ENOENT)) {
cfs_hash_bd_add_locked(hs, &bd, &o->lo_header->loh_hash);
cfs_hash_bd_unlock(hs, &bd, 1);
@ -767,34 +779,6 @@ static struct lu_object *lu_object_find_try(const struct lu_env *env,
lu_object_free(env, o);
return shadow;
}
/**
* Much like lu_object_find(), but top level device of object is specifically
* \a dev rather than top level device of the site. This interface allows
* objects of different "stacking" to be created within the same site.
*/
struct lu_object *lu_object_find_at(const struct lu_env *env,
struct lu_device *dev,
const struct lu_fid *f,
const struct lu_object_conf *conf)
{
wait_queue_head_t *wq;
struct lu_object *obj;
wait_queue_entry_t wait;
while (1) {
obj = lu_object_find_try(env, dev, f, conf, &wait);
if (obj != ERR_PTR(-EAGAIN))
return obj;
/*
* lu_object_find_try() already added waiter into the
* wait queue.
*/
schedule();
wq = lu_site_wq_from_fid(dev->ld_site, (void *)f);
remove_wait_queue(wq, &wait);
}
}
EXPORT_SYMBOL(lu_object_find_at);
/**