With netfs helper library and fscache rework delayed, just a few cap

handling improvements to avoid grabbing mmap_lock in some code paths
 and deal with capsnaps better and a mount option cleanup.
 -----BEGIN PGP SIGNATURE-----
 
 iQFHBAABCAAxFiEEydHwtzie9C7TfviiSn/eOAIR84sFAmAzuGwTHGlkcnlvbW92
 QGdtYWlsLmNvbQAKCRBKf944AhHzixPqB/9kQxU8IkCF0wOm+dm0tBW3PjYxBFuz
 HryHU6WJHDbX9/enH6PgMj6ZpRwxgzDq8xUpmRKVeaPflej9PnfQyH/On+vQWRUX
 WyWyBx0QqbrKYvYK0cCjHzVC5kbtBA8C/1OSSs5EkJIh518RBMkeru9pYL7+TI5x
 zeQVXzOJB2Bz7y8Odd2RjlkAkix/J1m0LIggRaoWrTygz93PKXfjzhDpa4KC4WZj
 W6LjnYPpYjo34poKx/3N3ZSgGP+Y3F7ZDeNfSnPB2WKs7vzcYUCpWXBSHnHTz+lK
 H2O5GdmxQ6BFp4SZvYtf5e78igH/m/QmzAYGW2EmmKttOcyrb2282snb
 =8MQu
 -----END PGP SIGNATURE-----

Merge tag 'ceph-for-5.12-rc1' of git://github.com/ceph/ceph-client

Pull ceph updates from Ilya Dryomov:
 "With netfs helper library and fscache rework delayed, just a few cap
  handling improvements to avoid grabbing mmap_lock in some code paths
  and deal with capsnaps better and a mount option cleanup"

* tag 'ceph-for-5.12-rc1' of git://github.com/ceph/ceph-client:
  ceph: defer flushing the capsnap if the Fb is used
  libceph: remove osdtimeout option entirely
  libceph: deprecate [no]cephx_require_signatures options
  ceph: allow queueing cap/snap handling after putting cap references
  ceph: clean up inode work queueing
  ceph: fix flush_snap logic after putting caps
This commit is contained in:
Linus Torvalds 2021-02-22 13:27:51 -08:00
commit 20bf195e93
7 changed files with 115 additions and 92 deletions

View File

@ -1662,7 +1662,7 @@ static vm_fault_t ceph_page_mkwrite(struct vm_fault *vmf)
dout("page_mkwrite %p %llu~%zd dropping cap refs on %s ret %x\n",
inode, off, len, ceph_cap_string(got), ret);
ceph_put_cap_refs(ci, got);
ceph_put_cap_refs_async(ci, got);
out_free:
ceph_restore_sigs(&oldset);
sb_end_pagefault(inode->i_sb);

View File

@ -3027,6 +3027,12 @@ static int ceph_try_drop_cap_snap(struct ceph_inode_info *ci,
return 0;
}
enum put_cap_refs_mode {
PUT_CAP_REFS_SYNC = 0,
PUT_CAP_REFS_NO_CHECK,
PUT_CAP_REFS_ASYNC,
};
/*
* Release cap refs.
*
@ -3037,10 +3043,11 @@ static int ceph_try_drop_cap_snap(struct ceph_inode_info *ci,
* cap_snap, and wake up any waiters.
*/
static void __ceph_put_cap_refs(struct ceph_inode_info *ci, int had,
bool skip_checking_caps)
enum put_cap_refs_mode mode)
{
struct inode *inode = &ci->vfs_inode;
int last = 0, put = 0, flushsnaps = 0, wake = 0;
bool check_flushsnaps = false;
spin_lock(&ci->i_ceph_lock);
if (had & CEPH_CAP_PIN)
@ -3057,26 +3064,17 @@ static void __ceph_put_cap_refs(struct ceph_inode_info *ci, int had,
if (had & CEPH_CAP_FILE_BUFFER) {
if (--ci->i_wb_ref == 0) {
last++;
/* put the ref held by ceph_take_cap_refs() */
put++;
check_flushsnaps = true;
}
dout("put_cap_refs %p wb %d -> %d (?)\n",
inode, ci->i_wb_ref+1, ci->i_wb_ref);
}
if (had & CEPH_CAP_FILE_WR)
if (had & CEPH_CAP_FILE_WR) {
if (--ci->i_wr_ref == 0) {
last++;
if (__ceph_have_pending_cap_snap(ci)) {
struct ceph_cap_snap *capsnap =
list_last_entry(&ci->i_cap_snaps,
struct ceph_cap_snap,
ci_item);
capsnap->writing = 0;
if (ceph_try_drop_cap_snap(ci, capsnap))
put++;
else if (__ceph_finish_cap_snap(ci, capsnap))
flushsnaps = 1;
wake = 1;
}
check_flushsnaps = true;
if (ci->i_wrbuffer_ref_head == 0 &&
ci->i_dirty_caps == 0 &&
ci->i_flushing_caps == 0) {
@ -3088,15 +3086,42 @@ static void __ceph_put_cap_refs(struct ceph_inode_info *ci, int had,
if (!__ceph_is_any_real_caps(ci) && ci->i_snap_realm)
drop_inode_snap_realm(ci);
}
}
if (check_flushsnaps && __ceph_have_pending_cap_snap(ci)) {
struct ceph_cap_snap *capsnap =
list_last_entry(&ci->i_cap_snaps,
struct ceph_cap_snap,
ci_item);
capsnap->writing = 0;
if (ceph_try_drop_cap_snap(ci, capsnap))
/* put the ref held by ceph_queue_cap_snap() */
put++;
else if (__ceph_finish_cap_snap(ci, capsnap))
flushsnaps = 1;
wake = 1;
}
spin_unlock(&ci->i_ceph_lock);
dout("put_cap_refs %p had %s%s%s\n", inode, ceph_cap_string(had),
last ? " last" : "", put ? " put" : "");
if (last && !skip_checking_caps)
ceph_check_caps(ci, 0, NULL);
else if (flushsnaps)
ceph_flush_snaps(ci, NULL);
switch (mode) {
case PUT_CAP_REFS_SYNC:
if (last)
ceph_check_caps(ci, 0, NULL);
else if (flushsnaps)
ceph_flush_snaps(ci, NULL);
break;
case PUT_CAP_REFS_ASYNC:
if (last)
ceph_queue_check_caps(inode);
else if (flushsnaps)
ceph_queue_flush_snaps(inode);
break;
default:
break;
}
if (wake)
wake_up_all(&ci->i_cap_wq);
while (put-- > 0)
@ -3105,12 +3130,17 @@ static void __ceph_put_cap_refs(struct ceph_inode_info *ci, int had,
void ceph_put_cap_refs(struct ceph_inode_info *ci, int had)
{
__ceph_put_cap_refs(ci, had, false);
__ceph_put_cap_refs(ci, had, PUT_CAP_REFS_SYNC);
}
void ceph_put_cap_refs_async(struct ceph_inode_info *ci, int had)
{
__ceph_put_cap_refs(ci, had, PUT_CAP_REFS_ASYNC);
}
void ceph_put_cap_refs_no_check_caps(struct ceph_inode_info *ci, int had)
{
__ceph_put_cap_refs(ci, had, true);
__ceph_put_cap_refs(ci, had, PUT_CAP_REFS_NO_CHECK);
}
/*

View File

@ -1816,60 +1816,17 @@ void ceph_async_iput(struct inode *inode)
}
}
/*
* Write back inode data in a worker thread. (This can't be done
* in the message handler context.)
*/
void ceph_queue_writeback(struct inode *inode)
void ceph_queue_inode_work(struct inode *inode, int work_bit)
{
struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
struct ceph_inode_info *ci = ceph_inode(inode);
set_bit(CEPH_I_WORK_WRITEBACK, &ci->i_work_mask);
set_bit(work_bit, &ci->i_work_mask);
ihold(inode);
if (queue_work(ceph_inode_to_client(inode)->inode_wq,
&ci->i_work)) {
dout("ceph_queue_writeback %p\n", inode);
if (queue_work(fsc->inode_wq, &ci->i_work)) {
dout("queue_inode_work %p, mask=%lx\n", inode, ci->i_work_mask);
} else {
dout("ceph_queue_writeback %p already queued, mask=%lx\n",
inode, ci->i_work_mask);
iput(inode);
}
}
/*
* queue an async invalidation
*/
void ceph_queue_invalidate(struct inode *inode)
{
struct ceph_inode_info *ci = ceph_inode(inode);
set_bit(CEPH_I_WORK_INVALIDATE_PAGES, &ci->i_work_mask);
ihold(inode);
if (queue_work(ceph_inode_to_client(inode)->inode_wq,
&ceph_inode(inode)->i_work)) {
dout("ceph_queue_invalidate %p\n", inode);
} else {
dout("ceph_queue_invalidate %p already queued, mask=%lx\n",
inode, ci->i_work_mask);
iput(inode);
}
}
/*
* Queue an async vmtruncate. If we fail to queue work, we will handle
* the truncation the next time we call __ceph_do_pending_vmtruncate.
*/
void ceph_queue_vmtruncate(struct inode *inode)
{
struct ceph_inode_info *ci = ceph_inode(inode);
set_bit(CEPH_I_WORK_VMTRUNCATE, &ci->i_work_mask);
ihold(inode);
if (queue_work(ceph_inode_to_client(inode)->inode_wq,
&ci->i_work)) {
dout("ceph_queue_vmtruncate %p\n", inode);
} else {
dout("ceph_queue_vmtruncate %p already queued, mask=%lx\n",
dout("queue_inode_work %p already queued, mask=%lx\n",
inode, ci->i_work_mask);
iput(inode);
}
@ -2008,6 +1965,12 @@ static void ceph_inode_work(struct work_struct *work)
if (test_and_clear_bit(CEPH_I_WORK_VMTRUNCATE, &ci->i_work_mask))
__ceph_do_pending_vmtruncate(inode);
if (test_and_clear_bit(CEPH_I_WORK_CHECK_CAPS, &ci->i_work_mask))
ceph_check_caps(ci, 0, NULL);
if (test_and_clear_bit(CEPH_I_WORK_FLUSH_SNAPS, &ci->i_work_mask))
ceph_flush_snaps(ci, NULL);
iput(inode);
}

View File

@ -623,6 +623,16 @@ int __ceph_finish_cap_snap(struct ceph_inode_info *ci,
return 0;
}
/* Fb cap still in use, delay it */
if (ci->i_wb_ref) {
dout("finish_cap_snap %p cap_snap %p snapc %p %llu %s s=%llu "
"used WRBUFFER, delaying\n", inode, capsnap,
capsnap->context, capsnap->context->seq,
ceph_cap_string(capsnap->dirty), capsnap->size);
capsnap->writing = 1;
return 0;
}
ci->i_ceph_flags |= CEPH_I_FLUSH_SNAPS;
dout("finish_cap_snap %p cap_snap %p snapc %p %llu %s s=%llu\n",
inode, capsnap, capsnap->context,

View File

@ -562,9 +562,11 @@ static inline struct inode *ceph_find_inode(struct super_block *sb,
/*
* Masks of ceph inode work.
*/
#define CEPH_I_WORK_WRITEBACK 0 /* writeback */
#define CEPH_I_WORK_INVALIDATE_PAGES 1 /* invalidate pages */
#define CEPH_I_WORK_VMTRUNCATE 2 /* vmtruncate */
#define CEPH_I_WORK_WRITEBACK 0
#define CEPH_I_WORK_INVALIDATE_PAGES 1
#define CEPH_I_WORK_VMTRUNCATE 2
#define CEPH_I_WORK_CHECK_CAPS 3
#define CEPH_I_WORK_FLUSH_SNAPS 4
/*
* We set the ERROR_WRITE bit when we start seeing write errors on an inode
@ -962,11 +964,36 @@ extern int ceph_inode_holds_cap(struct inode *inode, int mask);
extern bool ceph_inode_set_size(struct inode *inode, loff_t size);
extern void __ceph_do_pending_vmtruncate(struct inode *inode);
extern void ceph_queue_vmtruncate(struct inode *inode);
extern void ceph_queue_invalidate(struct inode *inode);
extern void ceph_queue_writeback(struct inode *inode);
extern void ceph_async_iput(struct inode *inode);
void ceph_queue_inode_work(struct inode *inode, int work_bit);
static inline void ceph_queue_vmtruncate(struct inode *inode)
{
ceph_queue_inode_work(inode, CEPH_I_WORK_VMTRUNCATE);
}
static inline void ceph_queue_invalidate(struct inode *inode)
{
ceph_queue_inode_work(inode, CEPH_I_WORK_INVALIDATE_PAGES);
}
static inline void ceph_queue_writeback(struct inode *inode)
{
ceph_queue_inode_work(inode, CEPH_I_WORK_WRITEBACK);
}
static inline void ceph_queue_check_caps(struct inode *inode)
{
ceph_queue_inode_work(inode, CEPH_I_WORK_CHECK_CAPS);
}
static inline void ceph_queue_flush_snaps(struct inode *inode)
{
ceph_queue_inode_work(inode, CEPH_I_WORK_FLUSH_SNAPS);
}
extern int __ceph_do_getattr(struct inode *inode, struct page *locked_page,
int mask, bool force);
static inline int ceph_do_getattr(struct inode *inode, int mask, bool force)
@ -1105,6 +1132,7 @@ extern void ceph_take_cap_refs(struct ceph_inode_info *ci, int caps,
bool snap_rwsem_locked);
extern void ceph_get_cap_refs(struct ceph_inode_info *ci, int caps);
extern void ceph_put_cap_refs(struct ceph_inode_info *ci, int had);
extern void ceph_put_cap_refs_async(struct ceph_inode_info *ci, int had);
extern void ceph_put_cap_refs_no_check_caps(struct ceph_inode_info *ci,
int had);
extern void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr,

View File

@ -32,10 +32,9 @@
#define CEPH_OPT_NOSHARE (1<<1) /* don't share client with other sbs */
#define CEPH_OPT_MYIP (1<<2) /* specified my ip */
#define CEPH_OPT_NOCRC (1<<3) /* no data crc on writes (msgr1) */
#define CEPH_OPT_NOMSGAUTH (1<<4) /* don't require msg signing feat */
#define CEPH_OPT_TCP_NODELAY (1<<5) /* TCP_NODELAY on TCP sockets */
#define CEPH_OPT_NOMSGSIGN (1<<6) /* don't sign msgs (msgr1) */
#define CEPH_OPT_ABORT_ON_FULL (1<<7) /* abort w/ ENOSPC when full */
#define CEPH_OPT_TCP_NODELAY (1<<4) /* TCP_NODELAY on TCP sockets */
#define CEPH_OPT_NOMSGSIGN (1<<5) /* don't sign msgs (msgr1) */
#define CEPH_OPT_ABORT_ON_FULL (1<<6) /* abort w/ ENOSPC when full */
#define CEPH_OPT_DEFAULT (CEPH_OPT_TCP_NODELAY)

View File

@ -252,7 +252,6 @@ static int parse_fsid(const char *str, struct ceph_fsid *fsid)
* ceph options
*/
enum {
Opt_osdtimeout,
Opt_osdkeepalivetimeout,
Opt_mount_timeout,
Opt_osd_idle_ttl,
@ -307,7 +306,8 @@ static const struct constant_table ceph_param_ms_mode[] = {
static const struct fs_parameter_spec ceph_parameters[] = {
fsparam_flag ("abort_on_full", Opt_abort_on_full),
fsparam_flag_no ("cephx_require_signatures", Opt_cephx_require_signatures),
__fsparam (NULL, "cephx_require_signatures", Opt_cephx_require_signatures,
fs_param_neg_with_no|fs_param_deprecated, NULL),
fsparam_flag_no ("cephx_sign_messages", Opt_cephx_sign_messages),
fsparam_flag_no ("crc", Opt_crc),
fsparam_string ("crush_location", Opt_crush_location),
@ -319,8 +319,6 @@ static const struct fs_parameter_spec ceph_parameters[] = {
fsparam_u32 ("osd_idle_ttl", Opt_osd_idle_ttl),
fsparam_u32 ("osd_request_timeout", Opt_osd_request_timeout),
fsparam_u32 ("osdkeepalive", Opt_osdkeepalivetimeout),
__fsparam (fs_param_is_s32, "osdtimeout", Opt_osdtimeout,
fs_param_deprecated, NULL),
fsparam_enum ("read_from_replica", Opt_read_from_replica,
ceph_param_read_from_replica),
fsparam_enum ("ms_mode", Opt_ms_mode,
@ -552,9 +550,6 @@ int ceph_parse_param(struct fs_parameter *param, struct ceph_options *opt,
}
break;
case Opt_osdtimeout:
warn_plog(&log, "Ignoring osdtimeout");
break;
case Opt_osdkeepalivetimeout:
/* 0 isn't well defined right now, reject it */
if (result.uint_32 < 1 || result.uint_32 > INT_MAX / 1000)
@ -596,9 +591,9 @@ int ceph_parse_param(struct fs_parameter *param, struct ceph_options *opt,
break;
case Opt_cephx_require_signatures:
if (!result.negated)
opt->flags &= ~CEPH_OPT_NOMSGAUTH;
warn_plog(&log, "Ignoring cephx_require_signatures");
else
opt->flags |= CEPH_OPT_NOMSGAUTH;
warn_plog(&log, "Ignoring nocephx_require_signatures, use nocephx_sign_messages");
break;
case Opt_cephx_sign_messages:
if (!result.negated)
@ -686,8 +681,6 @@ int ceph_print_client_options(struct seq_file *m, struct ceph_client *client,
seq_puts(m, "noshare,");
if (opt->flags & CEPH_OPT_NOCRC)
seq_puts(m, "nocrc,");
if (opt->flags & CEPH_OPT_NOMSGAUTH)
seq_puts(m, "nocephx_require_signatures,");
if (opt->flags & CEPH_OPT_NOMSGSIGN)
seq_puts(m, "nocephx_sign_messages,");
if ((opt->flags & CEPH_OPT_TCP_NODELAY) == 0)
@ -756,7 +749,7 @@ struct ceph_client *ceph_create_client(struct ceph_options *opt, void *private)
client->supported_features = CEPH_FEATURES_SUPPORTED_DEFAULT;
client->required_features = CEPH_FEATURES_REQUIRED_DEFAULT;
if (!ceph_test_opt(client, NOMSGAUTH))
if (!ceph_test_opt(client, NOMSGSIGN))
client->required_features |= CEPH_FEATURE_MSG_AUTH;
/* msgr */