libceph: support the RADOS copy-from operation

Add support for performing remote object copies using the 'copy-from'
operation.

[ Add COPY_FROM to get_num_data_items(). ]

Signed-off-by: Luis Henriques <lhenriques@suse.com>
Reviewed-by: Ilya Dryomov <idryomov@gmail.com>
Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
This commit is contained in:
Luis Henriques 2018-10-15 16:45:58 +01:00 committed by Ilya Dryomov
parent 2ee9dd958d
commit 23ddf9bea9
3 changed files with 135 additions and 0 deletions

View File

@ -136,6 +136,13 @@ struct ceph_osd_req_op {
u64 expected_object_size; u64 expected_object_size;
u64 expected_write_size; u64 expected_write_size;
} alloc_hint; } alloc_hint;
/*
 * In-memory (host-endian) parameters of a CEPH_OSD_OP_COPY_FROM op.
 * osd_req_encode_op() converts the scalar fields to their
 * little-endian wire representation.
 */
struct {
u64 snapid;		/* filled from the caller's src_snapid */
u64 src_version;	/* filled from the caller's src_version */
u8 flags;		/* CEPH_OSD_COPY_FROM_FLAG_* */
u32 src_fadvise_flags;	/* CEPH_OSD_OP_FLAG_FADVISE_* for the source object */
/* request data: encoded source object name followed by its locator */
struct ceph_osd_data osd_data;
} copy_from;
}; };
}; };
@ -510,6 +517,16 @@ extern int ceph_osdc_writepages(struct ceph_osd_client *osdc,
struct timespec64 *mtime, struct timespec64 *mtime,
struct page **pages, int nr_pages); struct page **pages, int nr_pages);
/*
 * Submit a single-op CEPH_OSD_OP_COPY_FROM write request that targets
 * dst_oid/dst_oloc and names src_oid/src_oloc (at src_snapid and
 * src_version) as the object to copy from, then wait for the request
 * to complete.
 *
 * src_fadvise_flags and dst_fadvise_flags take CEPH_OSD_OP_FLAG_FADVISE_*
 * values; copy_from_flags takes CEPH_OSD_COPY_FROM_FLAG_* values.
 *
 * Returns -ENOMEM if the request cannot be allocated; otherwise the
 * first non-zero result of setting up the op or allocating messages,
 * or else the result of waiting on the request.
 */
int ceph_osdc_copy_from(struct ceph_osd_client *osdc,
u64 src_snapid, u64 src_version,
struct ceph_object_id *src_oid,
struct ceph_object_locator *src_oloc,
u32 src_fadvise_flags,
struct ceph_object_id *dst_oid,
struct ceph_object_locator *dst_oloc,
u32 dst_fadvise_flags,
u8 copy_from_flags);
/* watch/notify */ /* watch/notify */
struct ceph_osd_linger_request * struct ceph_osd_linger_request *
ceph_osdc_watch(struct ceph_osd_client *osdc, ceph_osdc_watch(struct ceph_osd_client *osdc,

View File

@ -410,6 +410,14 @@ enum {
enum { enum {
CEPH_OSD_OP_FLAG_EXCL = 1, /* EXCL object create */ CEPH_OSD_OP_FLAG_EXCL = 1, /* EXCL object create */
CEPH_OSD_OP_FLAG_FAILOK = 2, /* continue despite failure */ CEPH_OSD_OP_FLAG_FAILOK = 2, /* continue despite failure */
CEPH_OSD_OP_FLAG_FADVISE_RANDOM = 0x4, /* the op is random */
CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL = 0x8, /* the op is sequential */
CEPH_OSD_OP_FLAG_FADVISE_WILLNEED = 0x10,/* data will be accessed in
the near future */
CEPH_OSD_OP_FLAG_FADVISE_DONTNEED = 0x20,/* data will not be accessed
in the near future */
CEPH_OSD_OP_FLAG_FADVISE_NOCACHE = 0x40,/* data will be accessed only
once by this client */
}; };
#define EOLDSNAPC ERESTART /* ORDERSNAP flag set; writer has old snapc*/ #define EOLDSNAPC ERESTART /* ORDERSNAP flag set; writer has old snapc*/
@ -431,6 +439,15 @@ enum {
CEPH_OSD_CMPXATTR_MODE_U64 = 2 CEPH_OSD_CMPXATTR_MODE_U64 = 2
}; };
/*
 * Bit flags for the copy-from operation; stored in the one-byte
 * ceph_osd_op::copy_from.flags field.
 */
enum {
	/* part of a flush operation */
	CEPH_OSD_COPY_FROM_FLAG_FLUSH		= 0x01,
	/* ignore pool overlay */
	CEPH_OSD_COPY_FROM_FLAG_IGNORE_OVERLAY	= 0x02,
	/* ignore osd cache logic */
	CEPH_OSD_COPY_FROM_FLAG_IGNORE_CACHE	= 0x04,
	/* map snap direct to cloneid */
	CEPH_OSD_COPY_FROM_FLAG_MAP_SNAP_CLONE	= 0x08,
	/* order with write */
	CEPH_OSD_COPY_FROM_FLAG_RWORDERED	= 0x10,
};
enum { enum {
CEPH_OSD_WATCH_OP_UNWATCH = 0, CEPH_OSD_WATCH_OP_UNWATCH = 0,
CEPH_OSD_WATCH_OP_LEGACY_WATCH = 1, CEPH_OSD_WATCH_OP_LEGACY_WATCH = 1,
@ -497,6 +514,17 @@ struct ceph_osd_op {
__le64 expected_object_size; __le64 expected_object_size;
__le64 expected_write_size; __le64 expected_write_size;
} __attribute__ ((packed)) alloc_hint; } __attribute__ ((packed)) alloc_hint;
struct {
__le64 snapid;
__le64 src_version;
__u8 flags; /* CEPH_OSD_COPY_FROM_FLAG_* */
/*
* CEPH_OSD_OP_FLAG_FADVISE_*: fadvise flags
* for src object, flags for dest object are in
* ceph_osd_op::flags.
*/
__le32 src_fadvise_flags;
} __attribute__ ((packed)) copy_from;
}; };
__le32 payload_len; __le32 payload_len;
} __attribute__ ((packed)); } __attribute__ ((packed));

View File

@ -410,6 +410,9 @@ static void osd_req_op_data_release(struct ceph_osd_request *osd_req,
case CEPH_OSD_OP_LIST_WATCHERS: case CEPH_OSD_OP_LIST_WATCHERS:
ceph_osd_data_release(&op->list_watchers.response_data); ceph_osd_data_release(&op->list_watchers.response_data);
break; break;
case CEPH_OSD_OP_COPY_FROM:
ceph_osd_data_release(&op->copy_from.osd_data);
break;
default: default:
break; break;
} }
@ -702,6 +705,7 @@ static void get_num_data_items(struct ceph_osd_request *req,
case CEPH_OSD_OP_SETXATTR: case CEPH_OSD_OP_SETXATTR:
case CEPH_OSD_OP_CMPXATTR: case CEPH_OSD_OP_CMPXATTR:
case CEPH_OSD_OP_NOTIFY_ACK: case CEPH_OSD_OP_NOTIFY_ACK:
case CEPH_OSD_OP_COPY_FROM:
*num_request_data_items += 1; *num_request_data_items += 1;
break; break;
@ -1016,6 +1020,14 @@ static u32 osd_req_encode_op(struct ceph_osd_op *dst,
case CEPH_OSD_OP_CREATE: case CEPH_OSD_OP_CREATE:
case CEPH_OSD_OP_DELETE: case CEPH_OSD_OP_DELETE:
break; break;
case CEPH_OSD_OP_COPY_FROM:
dst->copy_from.snapid = cpu_to_le64(src->copy_from.snapid);
dst->copy_from.src_version =
cpu_to_le64(src->copy_from.src_version);
dst->copy_from.flags = src->copy_from.flags;
dst->copy_from.src_fadvise_flags =
cpu_to_le32(src->copy_from.src_fadvise_flags);
break;
default: default:
pr_err("unsupported osd opcode %s\n", pr_err("unsupported osd opcode %s\n",
ceph_osd_op_name(src->op)); ceph_osd_op_name(src->op));
@ -1947,6 +1959,10 @@ static void setup_request_data(struct ceph_osd_request *req)
ceph_osdc_msg_data_add(request_msg, ceph_osdc_msg_data_add(request_msg,
&op->notify_ack.request_data); &op->notify_ack.request_data);
break; break;
case CEPH_OSD_OP_COPY_FROM:
ceph_osdc_msg_data_add(request_msg,
&op->copy_from.osd_data);
break;
/* reply */ /* reply */
case CEPH_OSD_OP_STAT: case CEPH_OSD_OP_STAT:
@ -5255,6 +5271,80 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
} }
EXPORT_SYMBOL(ceph_osdc_writepages); EXPORT_SYMBOL(ceph_osdc_writepages);
/*
 * Initialise the op selected by index argument 0 of @req as a
 * CEPH_OSD_OP_COPY_FROM operation.
 *
 * The scalar parameters are stored in op->copy_from; the source object
 * name and locator are encoded into a freshly allocated single page
 * that is attached to the op as its data.
 *
 * Returns 0 on success, or the error reported by
 * ceph_alloc_page_vector() if the page cannot be allocated.
 */
static int osd_req_op_copy_from_init(struct ceph_osd_request *req,
				     u64 src_snapid, u64 src_version,
				     struct ceph_object_id *src_oid,
				     struct ceph_object_locator *src_oloc,
				     u32 src_fadvise_flags,
				     u32 dst_fadvise_flags,
				     u8 copy_from_flags)
{
	struct page **page_vec = ceph_alloc_page_vector(1, GFP_KERNEL);
	struct ceph_osd_req_op *copy_op;
	void *start, *cursor, *limit;

	if (IS_ERR(page_vec))
		return PTR_ERR(page_vec);

	/* dst_fadvise_flags travel as the generic per-op flags */
	copy_op = _osd_req_op_init(req, 0, CEPH_OSD_OP_COPY_FROM,
				   dst_fadvise_flags);
	copy_op->copy_from.snapid = src_snapid;
	copy_op->copy_from.src_version = src_version;
	copy_op->copy_from.flags = copy_from_flags;
	copy_op->copy_from.src_fadvise_flags = src_fadvise_flags;

	/* encode "source name, source locator" into the page */
	start = page_address(page_vec[0]);
	cursor = start;
	limit = start + PAGE_SIZE;
	ceph_encode_string(&cursor, limit, src_oid->name, src_oid->name_len);
	encode_oloc(&cursor, limit, src_oloc);

	/* bytes consumed by the encoders == PAGE_SIZE - (limit - cursor) */
	copy_op->indata_len = cursor - start;

	/*
	 * NOTE(review): the trailing "false, true" are presumably
	 * pages_from_pool / own_pages -- confirm against
	 * ceph_osd_data_pages_init().
	 */
	ceph_osd_data_pages_init(&copy_op->copy_from.osd_data, page_vec,
				 copy_op->indata_len, 0, false, true);
	return 0;
}
/*
 * Build a one-op CEPH_OSD_OP_COPY_FROM write request aimed at
 * @dst_oid/@dst_oloc, start it and wait for it to finish.
 *
 * Returns -ENOMEM if the request cannot be allocated; otherwise the
 * first non-zero result of osd_req_op_copy_from_init() or
 * ceph_osdc_alloc_messages(), or else whatever
 * ceph_osdc_wait_request() returns.  The request reference is dropped
 * on every path after a successful allocation.
 */
int ceph_osdc_copy_from(struct ceph_osd_client *osdc,
			u64 src_snapid, u64 src_version,
			struct ceph_object_id *src_oid,
			struct ceph_object_locator *src_oloc,
			u32 src_fadvise_flags,
			struct ceph_object_id *dst_oid,
			struct ceph_object_locator *dst_oloc,
			u32 dst_fadvise_flags,
			u8 copy_from_flags)
{
	struct ceph_osd_request *copy_req;
	int err;

	copy_req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_KERNEL);
	if (!copy_req)
		return -ENOMEM;

	/* the destination object is the request's target */
	copy_req->r_flags = CEPH_OSD_FLAG_WRITE;
	ceph_oloc_copy(&copy_req->r_t.base_oloc, dst_oloc);
	ceph_oid_copy(&copy_req->r_t.base_oid, dst_oid);

	/* each step runs only if every earlier one succeeded */
	err = osd_req_op_copy_from_init(copy_req, src_snapid, src_version,
					src_oid, src_oloc, src_fadvise_flags,
					dst_fadvise_flags, copy_from_flags);
	if (!err)
		err = ceph_osdc_alloc_messages(copy_req, GFP_KERNEL);
	if (!err) {
		ceph_osdc_start_request(osdc, copy_req, false);
		err = ceph_osdc_wait_request(osdc, copy_req);
	}

	ceph_osdc_put_request(copy_req);
	return err;
}
EXPORT_SYMBOL(ceph_osdc_copy_from);
int __init ceph_osdc_setup(void) int __init ceph_osdc_setup(void)
{ {
size_t size = sizeof(struct ceph_osd_request) + size_t size = sizeof(struct ceph_osd_request) +