ceph: pre-allocate ceph_cap struct for ceph_add_cap()

So that ceph_add_cap() can be used while i_ceph_lock is locked.
This simplifies the code that handle cap import/export.

Signed-off-by: Yan, Zheng <zheng.z.yan@intel.com>
This commit is contained in:
Yan, Zheng 2014-04-18 09:57:11 +08:00
parent f98a128a55
commit d9df278350
3 changed files with 85 additions and 79 deletions

View File

@ -221,7 +221,7 @@ int ceph_unreserve_caps(struct ceph_mds_client *mdsc,
return 0; return 0;
} }
static struct ceph_cap *get_cap(struct ceph_mds_client *mdsc, struct ceph_cap *ceph_get_cap(struct ceph_mds_client *mdsc,
struct ceph_cap_reservation *ctx) struct ceph_cap_reservation *ctx)
{ {
struct ceph_cap *cap = NULL; struct ceph_cap *cap = NULL;
@ -508,15 +508,14 @@ static void __check_cap_issue(struct ceph_inode_info *ci, struct ceph_cap *cap,
* it is < 0. (This is so we can atomically add the cap and add an * it is < 0. (This is so we can atomically add the cap and add an
* open file reference to it.) * open file reference to it.)
*/ */
int ceph_add_cap(struct inode *inode, void ceph_add_cap(struct inode *inode,
struct ceph_mds_session *session, u64 cap_id, struct ceph_mds_session *session, u64 cap_id,
int fmode, unsigned issued, unsigned wanted, int fmode, unsigned issued, unsigned wanted,
unsigned seq, unsigned mseq, u64 realmino, int flags, unsigned seq, unsigned mseq, u64 realmino, int flags,
struct ceph_cap_reservation *caps_reservation) struct ceph_cap **new_cap)
{ {
struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc; struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc;
struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_inode_info *ci = ceph_inode(inode);
struct ceph_cap *new_cap = NULL;
struct ceph_cap *cap; struct ceph_cap *cap;
int mds = session->s_mds; int mds = session->s_mds;
int actual_wanted; int actual_wanted;
@ -531,20 +530,10 @@ int ceph_add_cap(struct inode *inode,
if (fmode >= 0) if (fmode >= 0)
wanted |= ceph_caps_for_mode(fmode); wanted |= ceph_caps_for_mode(fmode);
retry:
spin_lock(&ci->i_ceph_lock);
cap = __get_cap_for_mds(ci, mds); cap = __get_cap_for_mds(ci, mds);
if (!cap) { if (!cap) {
if (new_cap) { cap = *new_cap;
cap = new_cap; *new_cap = NULL;
new_cap = NULL;
} else {
spin_unlock(&ci->i_ceph_lock);
new_cap = get_cap(mdsc, caps_reservation);
if (new_cap == NULL)
return -ENOMEM;
goto retry;
}
cap->issued = 0; cap->issued = 0;
cap->implemented = 0; cap->implemented = 0;
@ -562,9 +551,6 @@ retry:
session->s_nr_caps++; session->s_nr_caps++;
spin_unlock(&session->s_cap_lock); spin_unlock(&session->s_cap_lock);
} else { } else {
if (new_cap)
ceph_put_cap(mdsc, new_cap);
/* /*
* auth mds of the inode changed. we received the cap export * auth mds of the inode changed. we received the cap export
* message, but still haven't received the cap import message. * message, but still haven't received the cap import message.
@ -626,7 +612,6 @@ retry:
ci->i_auth_cap = cap; ci->i_auth_cap = cap;
cap->mds_wanted = wanted; cap->mds_wanted = wanted;
} }
ci->i_cap_exporting_issued = 0;
} else { } else {
WARN_ON(ci->i_auth_cap == cap); WARN_ON(ci->i_auth_cap == cap);
} }
@ -648,9 +633,6 @@ retry:
if (fmode >= 0) if (fmode >= 0)
__ceph_get_fmode(ci, fmode); __ceph_get_fmode(ci, fmode);
spin_unlock(&ci->i_ceph_lock);
wake_up_all(&ci->i_cap_wq);
return 0;
} }
/* /*
@ -685,7 +667,7 @@ static int __cap_is_valid(struct ceph_cap *cap)
*/ */
int __ceph_caps_issued(struct ceph_inode_info *ci, int *implemented) int __ceph_caps_issued(struct ceph_inode_info *ci, int *implemented)
{ {
int have = ci->i_snap_caps | ci->i_cap_exporting_issued; int have = ci->i_snap_caps;
struct ceph_cap *cap; struct ceph_cap *cap;
struct rb_node *p; struct rb_node *p;
@ -900,7 +882,7 @@ int __ceph_caps_mds_wanted(struct ceph_inode_info *ci)
*/ */
static int __ceph_is_any_caps(struct ceph_inode_info *ci) static int __ceph_is_any_caps(struct ceph_inode_info *ci)
{ {
return !RB_EMPTY_ROOT(&ci->i_caps) || ci->i_cap_exporting_issued; return !RB_EMPTY_ROOT(&ci->i_caps);
} }
int ceph_is_any_caps(struct inode *inode) int ceph_is_any_caps(struct inode *inode)
@ -2796,7 +2778,7 @@ static void handle_cap_export(struct inode *inode, struct ceph_mds_caps *ex,
{ {
struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc; struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc;
struct ceph_mds_session *tsession = NULL; struct ceph_mds_session *tsession = NULL;
struct ceph_cap *cap, *tcap; struct ceph_cap *cap, *tcap, *new_cap = NULL;
struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_inode_info *ci = ceph_inode(inode);
u64 t_cap_id; u64 t_cap_id;
unsigned mseq = le32_to_cpu(ex->migrate_seq); unsigned mseq = le32_to_cpu(ex->migrate_seq);
@ -2858,15 +2840,14 @@ retry:
} }
__ceph_remove_cap(cap, false); __ceph_remove_cap(cap, false);
goto out_unlock; goto out_unlock;
} } else if (tsession) {
if (tsession) {
int flag = (cap == ci->i_auth_cap) ? CEPH_CAP_FLAG_AUTH : 0;
spin_unlock(&ci->i_ceph_lock);
/* add placeholder for the export tagert */ /* add placeholder for the export tagert */
int flag = (cap == ci->i_auth_cap) ? CEPH_CAP_FLAG_AUTH : 0;
ceph_add_cap(inode, tsession, t_cap_id, -1, issued, 0, ceph_add_cap(inode, tsession, t_cap_id, -1, issued, 0,
t_seq - 1, t_mseq, (u64)-1, flag, NULL); t_seq - 1, t_mseq, (u64)-1, flag, &new_cap);
goto retry;
__ceph_remove_cap(cap, false);
goto out_unlock;
} }
spin_unlock(&ci->i_ceph_lock); spin_unlock(&ci->i_ceph_lock);
@ -2885,6 +2866,7 @@ retry:
SINGLE_DEPTH_NESTING); SINGLE_DEPTH_NESTING);
} }
ceph_add_cap_releases(mdsc, tsession); ceph_add_cap_releases(mdsc, tsession);
new_cap = ceph_get_cap(mdsc, NULL);
} else { } else {
WARN_ON(1); WARN_ON(1);
tsession = NULL; tsession = NULL;
@ -2899,6 +2881,8 @@ out_unlock:
mutex_unlock(&tsession->s_mutex); mutex_unlock(&tsession->s_mutex);
ceph_put_mds_session(tsession); ceph_put_mds_session(tsession);
} }
if (new_cap)
ceph_put_cap(mdsc, new_cap);
} }
/* /*
@ -2914,7 +2898,7 @@ static void handle_cap_import(struct ceph_mds_client *mdsc,
void *snaptrace, int snaptrace_len) void *snaptrace, int snaptrace_len)
{ {
struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_inode_info *ci = ceph_inode(inode);
struct ceph_cap *cap; struct ceph_cap *cap, *new_cap = NULL;
int mds = session->s_mds; int mds = session->s_mds;
unsigned issued = le32_to_cpu(im->caps); unsigned issued = le32_to_cpu(im->caps);
unsigned wanted = le32_to_cpu(im->wanted); unsigned wanted = le32_to_cpu(im->wanted);
@ -2936,7 +2920,20 @@ static void handle_cap_import(struct ceph_mds_client *mdsc,
dout("handle_cap_import inode %p ci %p mds%d mseq %d peer %d\n", dout("handle_cap_import inode %p ci %p mds%d mseq %d peer %d\n",
inode, ci, mds, mseq, peer); inode, ci, mds, mseq, peer);
retry:
spin_lock(&ci->i_ceph_lock); spin_lock(&ci->i_ceph_lock);
cap = __get_cap_for_mds(ci, mds);
if (!cap) {
if (!new_cap) {
spin_unlock(&ci->i_ceph_lock);
new_cap = ceph_get_cap(mdsc, NULL);
goto retry;
}
}
ceph_add_cap(inode, session, cap_id, -1, issued, wanted, seq, mseq,
realmino, CEPH_CAP_FLAG_AUTH, &new_cap);
cap = peer >= 0 ? __get_cap_for_mds(ci, peer) : NULL; cap = peer >= 0 ? __get_cap_for_mds(ci, peer) : NULL;
if (cap && cap->cap_id == p_cap_id) { if (cap && cap->cap_id == p_cap_id) {
dout(" remove export cap %p mds%d flags %d\n", dout(" remove export cap %p mds%d flags %d\n",
@ -2951,7 +2948,6 @@ static void handle_cap_import(struct ceph_mds_client *mdsc,
cap->mseq, mds, le32_to_cpu(ph->seq), cap->mseq, mds, le32_to_cpu(ph->seq),
le32_to_cpu(ph->mseq)); le32_to_cpu(ph->mseq));
} }
ci->i_cap_exporting_issued = cap->issued;
__ceph_remove_cap(cap, (ph->flags & CEPH_CAP_FLAG_RELEASE)); __ceph_remove_cap(cap, (ph->flags & CEPH_CAP_FLAG_RELEASE));
} }
@ -2960,16 +2956,17 @@ static void handle_cap_import(struct ceph_mds_client *mdsc,
ci->i_requested_max_size = 0; ci->i_requested_max_size = 0;
spin_unlock(&ci->i_ceph_lock); spin_unlock(&ci->i_ceph_lock);
wake_up_all(&ci->i_cap_wq);
down_write(&mdsc->snap_rwsem); down_write(&mdsc->snap_rwsem);
ceph_update_snap_trace(mdsc, snaptrace, snaptrace+snaptrace_len, ceph_update_snap_trace(mdsc, snaptrace, snaptrace+snaptrace_len,
false); false);
downgrade_write(&mdsc->snap_rwsem); downgrade_write(&mdsc->snap_rwsem);
ceph_add_cap(inode, session, cap_id, -1,
issued, wanted, seq, mseq, realmino, CEPH_CAP_FLAG_AUTH,
NULL /* no caps context */);
kick_flushing_inode_caps(mdsc, session, inode); kick_flushing_inode_caps(mdsc, session, inode);
up_read(&mdsc->snap_rwsem); up_read(&mdsc->snap_rwsem);
if (new_cap)
ceph_put_cap(mdsc, new_cap);
} }
/* /*

View File

@ -341,7 +341,6 @@ struct inode *ceph_alloc_inode(struct super_block *sb)
INIT_LIST_HEAD(&ci->i_cap_snaps); INIT_LIST_HEAD(&ci->i_cap_snaps);
ci->i_head_snapc = NULL; ci->i_head_snapc = NULL;
ci->i_snap_caps = 0; ci->i_snap_caps = 0;
ci->i_cap_exporting_issued = 0;
for (i = 0; i < CEPH_FILE_MODE_NUM; i++) for (i = 0; i < CEPH_FILE_MODE_NUM; i++)
ci->i_nr_by_mode[i] = 0; ci->i_nr_by_mode[i] = 0;
@ -407,7 +406,7 @@ void ceph_destroy_inode(struct inode *inode)
/* /*
* we may still have a snap_realm reference if there are stray * we may still have a snap_realm reference if there are stray
* caps in i_cap_exporting_issued or i_snap_caps. * caps in i_snap_caps.
*/ */
if (ci->i_snap_realm) { if (ci->i_snap_realm) {
struct ceph_mds_client *mdsc = struct ceph_mds_client *mdsc =
@ -582,6 +581,7 @@ static int fill_inode(struct inode *inode,
unsigned long ttl_from, int cap_fmode, unsigned long ttl_from, int cap_fmode,
struct ceph_cap_reservation *caps_reservation) struct ceph_cap_reservation *caps_reservation)
{ {
struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc;
struct ceph_mds_reply_inode *info = iinfo->in; struct ceph_mds_reply_inode *info = iinfo->in;
struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_inode_info *ci = ceph_inode(inode);
int i; int i;
@ -591,7 +591,9 @@ static int fill_inode(struct inode *inode,
struct ceph_inode_frag *frag; struct ceph_inode_frag *frag;
struct rb_node *rb_node; struct rb_node *rb_node;
struct ceph_buffer *xattr_blob = NULL; struct ceph_buffer *xattr_blob = NULL;
struct ceph_cap *new_cap = NULL;
int err = 0; int err = 0;
bool wake = false;
bool queue_trunc = false; bool queue_trunc = false;
bool new_version = false; bool new_version = false;
@ -599,6 +601,10 @@ static int fill_inode(struct inode *inode,
inode, ceph_vinop(inode), le64_to_cpu(info->version), inode, ceph_vinop(inode), le64_to_cpu(info->version),
ci->i_version); ci->i_version);
/* prealloc new cap struct */
if (info->cap.caps && ceph_snap(inode) == CEPH_NOSNAP)
new_cap = ceph_get_cap(mdsc, caps_reservation);
/* /*
* prealloc xattr data, if it looks like we'll need it. only * prealloc xattr data, if it looks like we'll need it. only
* if len > 4 (meaning there are actually xattrs; the first 4 * if len > 4 (meaning there are actually xattrs; the first 4
@ -762,8 +768,37 @@ static int fill_inode(struct inode *inode,
dout(" marking %p complete (empty)\n", inode); dout(" marking %p complete (empty)\n", inode);
__ceph_dir_set_complete(ci, atomic_read(&ci->i_release_count)); __ceph_dir_set_complete(ci, atomic_read(&ci->i_release_count));
} }
/* were we issued a capability? */
if (info->cap.caps) {
if (ceph_snap(inode) == CEPH_NOSNAP) {
ceph_add_cap(inode, session,
le64_to_cpu(info->cap.cap_id),
cap_fmode,
le32_to_cpu(info->cap.caps),
le32_to_cpu(info->cap.wanted),
le32_to_cpu(info->cap.seq),
le32_to_cpu(info->cap.mseq),
le64_to_cpu(info->cap.realm),
info->cap.flags, &new_cap);
wake = true;
} else {
dout(" %p got snap_caps %s\n", inode,
ceph_cap_string(le32_to_cpu(info->cap.caps)));
ci->i_snap_caps |= le32_to_cpu(info->cap.caps);
if (cap_fmode >= 0)
__ceph_get_fmode(ci, cap_fmode);
}
} else if (cap_fmode >= 0) {
pr_warning("mds issued no caps on %llx.%llx\n",
ceph_vinop(inode));
__ceph_get_fmode(ci, cap_fmode);
}
spin_unlock(&ci->i_ceph_lock); spin_unlock(&ci->i_ceph_lock);
if (wake)
wake_up_all(&ci->i_cap_wq);
/* queue truncate if we saw i_size decrease */ /* queue truncate if we saw i_size decrease */
if (queue_trunc) if (queue_trunc)
ceph_queue_vmtruncate(inode); ceph_queue_vmtruncate(inode);
@ -806,41 +841,14 @@ static int fill_inode(struct inode *inode,
} }
mutex_unlock(&ci->i_fragtree_mutex); mutex_unlock(&ci->i_fragtree_mutex);
/* were we issued a capability? */
if (info->cap.caps) {
if (ceph_snap(inode) == CEPH_NOSNAP) {
ceph_add_cap(inode, session,
le64_to_cpu(info->cap.cap_id),
cap_fmode,
le32_to_cpu(info->cap.caps),
le32_to_cpu(info->cap.wanted),
le32_to_cpu(info->cap.seq),
le32_to_cpu(info->cap.mseq),
le64_to_cpu(info->cap.realm),
info->cap.flags,
caps_reservation);
} else {
spin_lock(&ci->i_ceph_lock);
dout(" %p got snap_caps %s\n", inode,
ceph_cap_string(le32_to_cpu(info->cap.caps)));
ci->i_snap_caps |= le32_to_cpu(info->cap.caps);
if (cap_fmode >= 0)
__ceph_get_fmode(ci, cap_fmode);
spin_unlock(&ci->i_ceph_lock);
}
} else if (cap_fmode >= 0) {
pr_warning("mds issued no caps on %llx.%llx\n",
ceph_vinop(inode));
__ceph_get_fmode(ci, cap_fmode);
}
/* update delegation info? */ /* update delegation info? */
if (dirinfo) if (dirinfo)
ceph_fill_dirfrag(inode, dirinfo); ceph_fill_dirfrag(inode, dirinfo);
err = 0; err = 0;
out: out:
if (new_cap)
ceph_put_cap(mdsc, new_cap);
if (xattr_blob) if (xattr_blob)
ceph_buffer_put(xattr_blob); ceph_buffer_put(xattr_blob);
return err; return err;

View File

@ -292,7 +292,6 @@ struct ceph_inode_info {
struct ceph_snap_context *i_head_snapc; /* set if wr_buffer_head > 0 or struct ceph_snap_context *i_head_snapc; /* set if wr_buffer_head > 0 or
dirty|flushing caps */ dirty|flushing caps */
unsigned i_snap_caps; /* cap bits for snapped files */ unsigned i_snap_caps; /* cap bits for snapped files */
unsigned i_cap_exporting_issued;
int i_nr_by_mode[CEPH_FILE_MODE_NUM]; /* open file counts */ int i_nr_by_mode[CEPH_FILE_MODE_NUM]; /* open file counts */
@ -775,11 +774,13 @@ static inline void ceph_forget_all_cached_acls(struct inode *inode)
extern const char *ceph_cap_string(int c); extern const char *ceph_cap_string(int c);
extern void ceph_handle_caps(struct ceph_mds_session *session, extern void ceph_handle_caps(struct ceph_mds_session *session,
struct ceph_msg *msg); struct ceph_msg *msg);
extern int ceph_add_cap(struct inode *inode, extern struct ceph_cap *ceph_get_cap(struct ceph_mds_client *mdsc,
struct ceph_cap_reservation *ctx);
extern void ceph_add_cap(struct inode *inode,
struct ceph_mds_session *session, u64 cap_id, struct ceph_mds_session *session, u64 cap_id,
int fmode, unsigned issued, unsigned wanted, int fmode, unsigned issued, unsigned wanted,
unsigned cap, unsigned seq, u64 realmino, int flags, unsigned cap, unsigned seq, u64 realmino, int flags,
struct ceph_cap_reservation *caps_reservation); struct ceph_cap **new_cap);
extern void __ceph_remove_cap(struct ceph_cap *cap, bool queue_release); extern void __ceph_remove_cap(struct ceph_cap *cap, bool queue_release);
extern void ceph_put_cap(struct ceph_mds_client *mdsc, extern void ceph_put_cap(struct ceph_mds_client *mdsc,
struct ceph_cap *cap); struct ceph_cap *cap);