ceph: handle frag mismatch between readdir request and reply

If the client has outdated directory fragment information, it may issue a
readdir request for a non-existent directory fragment. In this case, the MDS
finds an approximate directory fragment and sends its contents back to the
client. When it receives a reply whose fragment differs from the requested
one, the client needs to reset the 'readdir offset'.

Signed-off-by: Yan, Zheng <zheng.z.yan@intel.com>
Reviewed-by: Sage Weil <sage@inktank.com>
This commit is contained in:
Yan, Zheng 2013-09-18 09:44:13 +08:00 committed by Sage Weil
parent 53e879a485
commit 81c6aea527
3 changed files with 25 additions and 5 deletions

View File

@ -352,8 +352,18 @@ more:
} }
/* note next offset and last dentry name */ /* note next offset and last dentry name */
rinfo = &req->r_reply_info;
if (le32_to_cpu(rinfo->dir_dir->frag) != frag) {
frag = le32_to_cpu(rinfo->dir_dir->frag);
if (ceph_frag_is_leftmost(frag))
fi->next_offset = 2;
else
fi->next_offset = 0;
off = fi->next_offset;
}
fi->offset = fi->next_offset; fi->offset = fi->next_offset;
fi->last_readdir = req; fi->last_readdir = req;
fi->frag = frag;
if (req->r_reply_info.dir_end) { if (req->r_reply_info.dir_end) {
kfree(fi->last_name); kfree(fi->last_name);
@ -363,7 +373,6 @@ more:
else else
fi->next_offset = 0; fi->next_offset = 0;
} else { } else {
rinfo = &req->r_reply_info;
err = note_last_dentry(fi, err = note_last_dentry(fi,
rinfo->dir_dname[rinfo->dir_nr-1], rinfo->dir_dname[rinfo->dir_nr-1],
rinfo->dir_dname_len[rinfo->dir_nr-1]); rinfo->dir_dname_len[rinfo->dir_nr-1]);

View File

@ -1275,8 +1275,20 @@ int ceph_readdir_prepopulate(struct ceph_mds_request *req,
int err = 0, i; int err = 0, i;
struct inode *snapdir = NULL; struct inode *snapdir = NULL;
struct ceph_mds_request_head *rhead = req->r_request->front.iov_base; struct ceph_mds_request_head *rhead = req->r_request->front.iov_base;
u64 frag = le32_to_cpu(rhead->args.readdir.frag);
struct ceph_dentry_info *di; struct ceph_dentry_info *di;
u64 r_readdir_offset = req->r_readdir_offset;
u32 frag = le32_to_cpu(rhead->args.readdir.frag);
if (rinfo->dir_dir &&
le32_to_cpu(rinfo->dir_dir->frag) != frag) {
dout("readdir_prepopulate got new frag %x -> %x\n",
frag, le32_to_cpu(rinfo->dir_dir->frag));
frag = le32_to_cpu(rinfo->dir_dir->frag);
if (ceph_frag_is_leftmost(frag))
r_readdir_offset = 2;
else
r_readdir_offset = 0;
}
if (req->r_aborted) if (req->r_aborted)
return readdir_prepopulate_inodes_only(req, session); return readdir_prepopulate_inodes_only(req, session);
@ -1340,7 +1352,7 @@ retry_lookup:
} }
di = dn->d_fsdata; di = dn->d_fsdata;
di->offset = ceph_make_fpos(frag, i + req->r_readdir_offset); di->offset = ceph_make_fpos(frag, i + r_readdir_offset);
/* inode */ /* inode */
if (dn->d_inode) { if (dn->d_inode) {

View File

@ -2238,8 +2238,7 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
err = ceph_fill_trace(mdsc->fsc->sb, req, req->r_session); err = ceph_fill_trace(mdsc->fsc->sb, req, req->r_session);
if (err == 0) { if (err == 0) {
if (result == 0 && (req->r_op == CEPH_MDS_OP_READDIR || if (result == 0 && (req->r_op == CEPH_MDS_OP_READDIR ||
req->r_op == CEPH_MDS_OP_LSSNAP) && req->r_op == CEPH_MDS_OP_LSSNAP))
rinfo->dir_nr)
ceph_readdir_prepopulate(req, req->r_session); ceph_readdir_prepopulate(req, req->r_session);
ceph_unreserve_caps(mdsc, &req->r_caps_reservation); ceph_unreserve_caps(mdsc, &req->r_caps_reservation);
} }