fs: dlm: remove send repeat remove handling

This patch removes the send repeat remove handling. This handling is
there to repeatingly DLM_MSG_REMOVE messages in cases the dlm stack
thinks it was not received at the first time. In cases of message drops
this functionality is necessary, but since the DLM midcomms layer
guarantees there are no messages drops between cluster nodes this
feature became not strict necessary anymore. Due message
delays/processing it could be that two send_repeat_remove() are sent out
while the other should be still on it's way. We remove the repeat remove
handling because we are sure that the message cannot be dropped due
communication errors.

Signed-off-by: Alexander Aring <aahringo@redhat.com>
Signed-off-by: David Teigland <teigland@redhat.com>
This commit is contained in:
Alexander Aring 2022-10-27 16:45:13 -04:00 committed by David Teigland
parent f0f4bb431b
commit 57a5724ef0
1 changed files with 0 additions and 74 deletions

View File

@ -4044,66 +4044,6 @@ out:
return error; return error;
} }
static void send_repeat_remove(struct dlm_ls *ls, char *ms_name, int len)
{
char name[DLM_RESNAME_MAXLEN + 1];
struct dlm_message *ms;
struct dlm_mhandle *mh;
struct dlm_rsb *r;
uint32_t hash, b;
int rv, dir_nodeid;
memset(name, 0, sizeof(name));
memcpy(name, ms_name, len);
hash = jhash(name, len, 0);
b = hash & (ls->ls_rsbtbl_size - 1);
dir_nodeid = dlm_hash2nodeid(ls, hash);
log_error(ls, "send_repeat_remove dir %d %s", dir_nodeid, name);
spin_lock(&ls->ls_rsbtbl[b].lock);
rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
if (!rv) {
spin_unlock(&ls->ls_rsbtbl[b].lock);
log_error(ls, "repeat_remove on keep %s", name);
return;
}
rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
if (!rv) {
spin_unlock(&ls->ls_rsbtbl[b].lock);
log_error(ls, "repeat_remove on toss %s", name);
return;
}
/* use ls->remove_name2 to avoid conflict with shrink? */
spin_lock(&ls->ls_remove_spin);
ls->ls_remove_len = len;
memcpy(ls->ls_remove_name, name, DLM_RESNAME_MAXLEN);
spin_unlock(&ls->ls_remove_spin);
spin_unlock(&ls->ls_rsbtbl[b].lock);
rv = _create_message(ls, sizeof(struct dlm_message) + len,
dir_nodeid, DLM_MSG_REMOVE, &ms, &mh);
if (rv)
goto out;
memcpy(ms->m_extra, name, len);
ms->m_hash = cpu_to_le32(hash);
send_message(mh, ms);
out:
spin_lock(&ls->ls_remove_spin);
ls->ls_remove_len = 0;
memset(ls->ls_remove_name, 0, DLM_RESNAME_MAXLEN);
spin_unlock(&ls->ls_remove_spin);
wake_up(&ls->ls_remove_wait);
}
static int receive_request(struct dlm_ls *ls, struct dlm_message *ms) static int receive_request(struct dlm_ls *ls, struct dlm_message *ms)
{ {
struct dlm_lkb *lkb; struct dlm_lkb *lkb;
@ -4173,25 +4113,11 @@ static int receive_request(struct dlm_ls *ls, struct dlm_message *ms)
ENOTBLK request failures when the lookup reply designating us ENOTBLK request failures when the lookup reply designating us
as master is delayed. */ as master is delayed. */
/* We could repeatedly return -EBADR here if our send_remove() is
delayed in being sent/arriving/being processed on the dir node.
Another node would repeatedly lookup up the master, and the dir
node would continue returning our nodeid until our send_remove
took effect.
We send another remove message in case our previous send_remove
was lost/ignored/missed somehow. */
if (error != -ENOTBLK) { if (error != -ENOTBLK) {
log_limit(ls, "receive_request %x from %d %d", log_limit(ls, "receive_request %x from %d %d",
le32_to_cpu(ms->m_lkid), from_nodeid, error); le32_to_cpu(ms->m_lkid), from_nodeid, error);
} }
if (namelen && error == -EBADR) {
send_repeat_remove(ls, ms->m_extra, namelen);
msleep(1000);
}
setup_stub_lkb(ls, ms); setup_stub_lkb(ls, ms);
send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error); send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
return error; return error;