SUNRPC: Fix priority queue fairness

Fix up the priority queue to not batch by owner, but by queue, so that
we allow '1 << priority' elements to be dequeued before switching to
the next priority queue.
The owner field is still used to wake up requests in round robin order
by owner to avoid single processes hogging the RPC layer by loading the
queues.

Signed-off-by: Trond Myklebust <trond.myklebust@hammerspace.com>
This commit is contained in:
Trond Myklebust 2018-09-08 22:09:48 -04:00
parent 95f7691daa
commit f42f7c2830
2 changed files with 56 additions and 59 deletions

View File

@ -189,7 +189,6 @@ struct rpc_timer {
struct rpc_wait_queue {
spinlock_t lock;
struct list_head tasks[RPC_NR_PRIORITY]; /* task queue for each priority level */
pid_t owner; /* process id of last task serviced */
unsigned char maxpriority; /* maximum priority (0 if queue is not a priority queue) */
unsigned char priority; /* current priority */
unsigned char nr; /* # tasks remaining for cookie */
@ -205,7 +204,6 @@ struct rpc_wait_queue {
* from a single cookie. The aim is to improve
* performance of NFS operations such as read/write.
*/
#define RPC_BATCH_COUNT 16
#define RPC_IS_PRIORITY(q) ((q)->maxpriority > 0)
/*

View File

@ -99,37 +99,64 @@ __rpc_add_timer(struct rpc_wait_queue *queue, struct rpc_task *task)
list_add(&task->u.tk_wait.timer_list, &queue->timer_list.list);
}
static void rpc_rotate_queue_owner(struct rpc_wait_queue *queue)
{
struct list_head *q = &queue->tasks[queue->priority];
struct rpc_task *task;
if (!list_empty(q)) {
task = list_first_entry(q, struct rpc_task, u.tk_wait.list);
if (task->tk_owner == queue->owner)
list_move_tail(&task->u.tk_wait.list, q);
}
}
static void rpc_set_waitqueue_priority(struct rpc_wait_queue *queue, int priority)
{
if (queue->priority != priority) {
/* Fairness: rotate the list when changing priority */
rpc_rotate_queue_owner(queue);
queue->priority = priority;
queue->nr = 1U << priority;
}
}
static void rpc_set_waitqueue_owner(struct rpc_wait_queue *queue, pid_t pid)
{
queue->owner = pid;
queue->nr = RPC_BATCH_COUNT;
}
static void rpc_reset_waitqueue_priority(struct rpc_wait_queue *queue)
{
rpc_set_waitqueue_priority(queue, queue->maxpriority);
rpc_set_waitqueue_owner(queue, 0);
}
/*
* Add a request to a queue list
*/
static void
__rpc_list_enqueue_task(struct list_head *q, struct rpc_task *task)
{
struct rpc_task *t;
list_for_each_entry(t, q, u.tk_wait.list) {
if (t->tk_owner == task->tk_owner) {
list_add_tail(&task->u.tk_wait.links,
&t->u.tk_wait.links);
/* Cache the queue head in task->u.tk_wait.list */
task->u.tk_wait.list.next = q;
task->u.tk_wait.list.prev = NULL;
return;
}
}
INIT_LIST_HEAD(&task->u.tk_wait.links);
list_add_tail(&task->u.tk_wait.list, q);
}
/*
* Remove request from a queue list
*/
static void
__rpc_list_dequeue_task(struct rpc_task *task)
{
struct list_head *q;
struct rpc_task *t;
if (task->u.tk_wait.list.prev == NULL) {
list_del(&task->u.tk_wait.links);
return;
}
if (!list_empty(&task->u.tk_wait.links)) {
t = list_first_entry(&task->u.tk_wait.links,
struct rpc_task,
u.tk_wait.links);
/* Assume __rpc_list_enqueue_task() cached the queue head */
q = t->u.tk_wait.list.next;
list_add_tail(&t->u.tk_wait.list, q);
list_del(&task->u.tk_wait.links);
}
list_del(&task->u.tk_wait.list);
}
/*
@ -139,22 +166,9 @@ static void __rpc_add_wait_queue_priority(struct rpc_wait_queue *queue,
struct rpc_task *task,
unsigned char queue_priority)
{
struct list_head *q;
struct rpc_task *t;
INIT_LIST_HEAD(&task->u.tk_wait.links);
if (unlikely(queue_priority > queue->maxpriority))
queue_priority = queue->maxpriority;
if (queue_priority > queue->priority)
rpc_set_waitqueue_priority(queue, queue_priority);
q = &queue->tasks[queue_priority];
list_for_each_entry(t, q, u.tk_wait.list) {
if (t->tk_owner == task->tk_owner) {
list_add_tail(&task->u.tk_wait.list, &t->u.tk_wait.links);
return;
}
}
list_add_tail(&task->u.tk_wait.list, q);
__rpc_list_enqueue_task(&queue->tasks[queue_priority], task);
}
/*
@ -194,13 +208,7 @@ static void __rpc_add_wait_queue(struct rpc_wait_queue *queue,
*/
static void __rpc_remove_wait_queue_priority(struct rpc_task *task)
{
struct rpc_task *t;
if (!list_empty(&task->u.tk_wait.links)) {
t = list_entry(task->u.tk_wait.links.next, struct rpc_task, u.tk_wait.list);
list_move(&t->u.tk_wait.list, &task->u.tk_wait.list);
list_splice_init(&task->u.tk_wait.links, &t->u.tk_wait.links);
}
__rpc_list_dequeue_task(task);
}
/*
@ -212,7 +220,8 @@ static void __rpc_remove_wait_queue(struct rpc_wait_queue *queue, struct rpc_tas
__rpc_disable_timer(queue, task);
if (RPC_IS_PRIORITY(queue))
__rpc_remove_wait_queue_priority(task);
list_del(&task->u.tk_wait.list);
else
list_del(&task->u.tk_wait.list);
queue->qlen--;
dprintk("RPC: %5u removed from queue %p \"%s\"\n",
task->tk_pid, queue, rpc_qname(queue));
@ -545,17 +554,9 @@ static struct rpc_task *__rpc_find_next_queued_priority(struct rpc_wait_queue *q
* Service a batch of tasks from a single owner.
*/
q = &queue->tasks[queue->priority];
if (!list_empty(q)) {
task = list_entry(q->next, struct rpc_task, u.tk_wait.list);
if (queue->owner == task->tk_owner) {
if (--queue->nr)
goto out;
list_move_tail(&task->u.tk_wait.list, q);
}
/*
* Check if we need to switch queues.
*/
goto new_owner;
if (!list_empty(q) && --queue->nr) {
task = list_first_entry(q, struct rpc_task, u.tk_wait.list);
goto out;
}
/*
@ -567,7 +568,7 @@ static struct rpc_task *__rpc_find_next_queued_priority(struct rpc_wait_queue *q
else
q = q - 1;
if (!list_empty(q)) {
task = list_entry(q->next, struct rpc_task, u.tk_wait.list);
task = list_first_entry(q, struct rpc_task, u.tk_wait.list);
goto new_queue;
}
} while (q != &queue->tasks[queue->priority]);
@ -577,8 +578,6 @@ static struct rpc_task *__rpc_find_next_queued_priority(struct rpc_wait_queue *q
new_queue:
rpc_set_waitqueue_priority(queue, (unsigned int)(q - &queue->tasks[0]));
new_owner:
rpc_set_waitqueue_owner(queue, task->tk_owner);
out:
return task;
}