ceph: convert inline data to normal data before data write
Before any data write, convert inline data to normal data and set i_inline_version to CEPH_INLINE_NONE. The OSD request that saves inline data to object contains 3 operations (CMPXATTR, WRITE and SETXATTR). It compares a xattr named 'inline_version' to prevent old data overwrites newer data. Signed-off-by: Yan, Zheng <zyan@redhat.com>
This commit is contained in:
parent
83701246ae
commit
28127bdd2f
148
fs/ceph/addr.c
148
fs/ceph/addr.c
|
@ -1313,6 +1313,19 @@ static int ceph_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf)
|
|||
size_t len;
|
||||
int want, got, ret;
|
||||
|
||||
if (ci->i_inline_version != CEPH_INLINE_NONE) {
|
||||
struct page *locked_page = NULL;
|
||||
if (off == 0) {
|
||||
lock_page(page);
|
||||
locked_page = page;
|
||||
}
|
||||
ret = ceph_uninline_data(vma->vm_file, locked_page);
|
||||
if (locked_page)
|
||||
unlock_page(locked_page);
|
||||
if (ret < 0)
|
||||
return VM_FAULT_SIGBUS;
|
||||
}
|
||||
|
||||
if (off + PAGE_CACHE_SIZE <= size)
|
||||
len = PAGE_CACHE_SIZE;
|
||||
else
|
||||
|
@ -1361,11 +1374,13 @@ static int ceph_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf)
|
|||
ret = VM_FAULT_SIGBUS;
|
||||
}
|
||||
out:
|
||||
if (ret != VM_FAULT_LOCKED) {
|
||||
if (ret != VM_FAULT_LOCKED)
|
||||
unlock_page(page);
|
||||
} else {
|
||||
if (ret == VM_FAULT_LOCKED ||
|
||||
ci->i_inline_version != CEPH_INLINE_NONE) {
|
||||
int dirty;
|
||||
spin_lock(&ci->i_ceph_lock);
|
||||
ci->i_inline_version = CEPH_INLINE_NONE;
|
||||
dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR);
|
||||
spin_unlock(&ci->i_ceph_lock);
|
||||
if (dirty)
|
||||
|
@ -1422,6 +1437,135 @@ void ceph_fill_inline_data(struct inode *inode, struct page *locked_page,
|
|||
}
|
||||
}
|
||||
|
||||
int ceph_uninline_data(struct file *filp, struct page *locked_page)
|
||||
{
|
||||
struct inode *inode = file_inode(filp);
|
||||
struct ceph_inode_info *ci = ceph_inode(inode);
|
||||
struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
|
||||
struct ceph_osd_request *req;
|
||||
struct page *page = NULL;
|
||||
u64 len, inline_version;
|
||||
int err = 0;
|
||||
bool from_pagecache = false;
|
||||
|
||||
spin_lock(&ci->i_ceph_lock);
|
||||
inline_version = ci->i_inline_version;
|
||||
spin_unlock(&ci->i_ceph_lock);
|
||||
|
||||
dout("uninline_data %p %llx.%llx inline_version %llu\n",
|
||||
inode, ceph_vinop(inode), inline_version);
|
||||
|
||||
if (inline_version == 1 || /* initial version, no data */
|
||||
inline_version == CEPH_INLINE_NONE)
|
||||
goto out;
|
||||
|
||||
if (locked_page) {
|
||||
page = locked_page;
|
||||
WARN_ON(!PageUptodate(page));
|
||||
} else if (ceph_caps_issued(ci) &
|
||||
(CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) {
|
||||
page = find_get_page(inode->i_mapping, 0);
|
||||
if (page) {
|
||||
if (PageUptodate(page)) {
|
||||
from_pagecache = true;
|
||||
lock_page(page);
|
||||
} else {
|
||||
page_cache_release(page);
|
||||
page = NULL;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (page) {
|
||||
len = i_size_read(inode);
|
||||
if (len > PAGE_CACHE_SIZE)
|
||||
len = PAGE_CACHE_SIZE;
|
||||
} else {
|
||||
page = __page_cache_alloc(GFP_NOFS);
|
||||
if (!page) {
|
||||
err = -ENOMEM;
|
||||
goto out;
|
||||
}
|
||||
err = __ceph_do_getattr(inode, page,
|
||||
CEPH_STAT_CAP_INLINE_DATA, true);
|
||||
if (err < 0) {
|
||||
/* no inline data */
|
||||
if (err == -ENODATA)
|
||||
err = 0;
|
||||
goto out;
|
||||
}
|
||||
len = err;
|
||||
}
|
||||
|
||||
req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
|
||||
ceph_vino(inode), 0, &len, 0, 1,
|
||||
CEPH_OSD_OP_CREATE,
|
||||
CEPH_OSD_FLAG_ONDISK | CEPH_OSD_FLAG_WRITE,
|
||||
ci->i_snap_realm->cached_context,
|
||||
0, 0, false);
|
||||
if (IS_ERR(req)) {
|
||||
err = PTR_ERR(req);
|
||||
goto out;
|
||||
}
|
||||
|
||||
ceph_osdc_build_request(req, 0, NULL, CEPH_NOSNAP, &inode->i_mtime);
|
||||
err = ceph_osdc_start_request(&fsc->client->osdc, req, false);
|
||||
if (!err)
|
||||
err = ceph_osdc_wait_request(&fsc->client->osdc, req);
|
||||
ceph_osdc_put_request(req);
|
||||
if (err < 0)
|
||||
goto out;
|
||||
|
||||
req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
|
||||
ceph_vino(inode), 0, &len, 1, 3,
|
||||
CEPH_OSD_OP_WRITE,
|
||||
CEPH_OSD_FLAG_ONDISK | CEPH_OSD_FLAG_WRITE,
|
||||
ci->i_snap_realm->cached_context,
|
||||
ci->i_truncate_seq, ci->i_truncate_size,
|
||||
false);
|
||||
if (IS_ERR(req)) {
|
||||
err = PTR_ERR(req);
|
||||
goto out;
|
||||
}
|
||||
|
||||
osd_req_op_extent_osd_data_pages(req, 1, &page, len, 0, false, false);
|
||||
|
||||
err = osd_req_op_xattr_init(req, 0, CEPH_OSD_OP_CMPXATTR,
|
||||
"inline_version", &inline_version,
|
||||
sizeof(inline_version),
|
||||
CEPH_OSD_CMPXATTR_OP_GT,
|
||||
CEPH_OSD_CMPXATTR_MODE_U64);
|
||||
if (err)
|
||||
goto out_put;
|
||||
|
||||
err = osd_req_op_xattr_init(req, 2, CEPH_OSD_OP_SETXATTR,
|
||||
"inline_version", &inline_version,
|
||||
sizeof(inline_version), 0, 0);
|
||||
if (err)
|
||||
goto out_put;
|
||||
|
||||
ceph_osdc_build_request(req, 0, NULL, CEPH_NOSNAP, &inode->i_mtime);
|
||||
err = ceph_osdc_start_request(&fsc->client->osdc, req, false);
|
||||
if (!err)
|
||||
err = ceph_osdc_wait_request(&fsc->client->osdc, req);
|
||||
out_put:
|
||||
ceph_osdc_put_request(req);
|
||||
if (err == -ECANCELED)
|
||||
err = 0;
|
||||
out:
|
||||
if (page && page != locked_page) {
|
||||
if (from_pagecache) {
|
||||
unlock_page(page);
|
||||
page_cache_release(page);
|
||||
} else
|
||||
__free_pages(page, 0);
|
||||
}
|
||||
|
||||
dout("uninline_data %p %llx.%llx inline_version %llu = %d\n",
|
||||
inode, ceph_vinop(inode), inline_version, err);
|
||||
return err;
|
||||
}
|
||||
|
||||
static struct vm_operations_struct ceph_vmops = {
|
||||
.fault = ceph_filemap_fault,
|
||||
.page_mkwrite = ceph_page_mkwrite,
|
||||
|
|
|
@ -963,6 +963,12 @@ static ssize_t ceph_write_iter(struct kiocb *iocb, struct iov_iter *from)
|
|||
if (err)
|
||||
goto out;
|
||||
|
||||
if (ci->i_inline_version != CEPH_INLINE_NONE) {
|
||||
err = ceph_uninline_data(file, NULL);
|
||||
if (err < 0)
|
||||
goto out;
|
||||
}
|
||||
|
||||
retry_snap:
|
||||
if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL)) {
|
||||
err = -ENOSPC;
|
||||
|
@ -1024,6 +1030,7 @@ retry_snap:
|
|||
if (written >= 0) {
|
||||
int dirty;
|
||||
spin_lock(&ci->i_ceph_lock);
|
||||
ci->i_inline_version = CEPH_INLINE_NONE;
|
||||
dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR);
|
||||
spin_unlock(&ci->i_ceph_lock);
|
||||
if (dirty)
|
||||
|
@ -1269,6 +1276,12 @@ static long ceph_fallocate(struct file *file, int mode,
|
|||
goto unlock;
|
||||
}
|
||||
|
||||
if (ci->i_inline_version != CEPH_INLINE_NONE) {
|
||||
ret = ceph_uninline_data(file, NULL);
|
||||
if (ret < 0)
|
||||
goto unlock;
|
||||
}
|
||||
|
||||
size = i_size_read(inode);
|
||||
if (!(mode & FALLOC_FL_KEEP_SIZE))
|
||||
endoff = offset + length;
|
||||
|
@ -1295,6 +1308,7 @@ static long ceph_fallocate(struct file *file, int mode,
|
|||
|
||||
if (!ret) {
|
||||
spin_lock(&ci->i_ceph_lock);
|
||||
ci->i_inline_version = CEPH_INLINE_NONE;
|
||||
dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR);
|
||||
spin_unlock(&ci->i_ceph_lock);
|
||||
if (dirty)
|
||||
|
|
|
@ -888,7 +888,7 @@ extern int ceph_atomic_open(struct inode *dir, struct dentry *dentry,
|
|||
extern int ceph_release(struct inode *inode, struct file *filp);
|
||||
extern void ceph_fill_inline_data(struct inode *inode, struct page *locked_page,
|
||||
char *data, size_t len);
|
||||
|
||||
int ceph_uninline_data(struct file *filp, struct page *locked_page);
|
||||
/* dir.c */
|
||||
extern const struct file_operations ceph_dir_fops;
|
||||
extern const struct inode_operations ceph_dir_iops;
|
||||
|
|
Loading…
Reference in New Issue