svcrdma: Use parsed chunk lists to encode Reply transport headers

Refactor: Instead of re-parsing the ingress RPC Call transport
header when constructing the egress RPC Reply transport header, use
the new parsed Write list and Reply chunk, which are version-
agnostic and already XDR decoded.

Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
This commit is contained in:
Chuck Lever 2020-06-17 11:50:34 -04:00
parent 7a1cbfa180
commit 6911f3e10c
2 changed files with 80 additions and 62 deletions

View File

@ -1447,9 +1447,44 @@ DECLARE_EVENT_CLASS(svcrdma_segment_event,
TP_ARGS(handle, length, offset)) TP_ARGS(handle, length, offset))
DEFINE_SEGMENT_EVENT(send_rseg); DEFINE_SEGMENT_EVENT(send_rseg);
DEFINE_SEGMENT_EVENT(encode_wseg);
DEFINE_SEGMENT_EVENT(send_wseg); DEFINE_SEGMENT_EVENT(send_wseg);
TRACE_EVENT(svcrdma_encode_wseg,
TP_PROTO(
const struct svc_rdma_send_ctxt *ctxt,
u32 segno,
u32 handle,
u32 length,
u64 offset
),
TP_ARGS(ctxt, segno, handle, length, offset),
TP_STRUCT__entry(
__field(u32, cq_id)
__field(int, completion_id)
__field(u32, segno)
__field(u32, handle)
__field(u32, length)
__field(u64, offset)
),
TP_fast_assign(
__entry->cq_id = ctxt->sc_cid.ci_queue_id;
__entry->completion_id = ctxt->sc_cid.ci_completion_id;
__entry->segno = segno;
__entry->handle = handle;
__entry->length = length;
__entry->offset = offset;
),
TP_printk("cq_id=%u cid=%d segno=%u %u@0x%016llx:0x%08x",
__entry->cq_id, __entry->completion_id,
__entry->segno, __entry->length,
(unsigned long long)__entry->offset, __entry->handle
)
);
TRACE_EVENT(svcrdma_decode_rseg, TRACE_EVENT(svcrdma_decode_rseg,
TP_PROTO( TP_PROTO(
const struct rpc_rdma_cid *cid, const struct rpc_rdma_cid *cid,

View File

@ -358,49 +358,42 @@ static ssize_t svc_rdma_encode_read_list(struct svc_rdma_send_ctxt *sctxt)
/** /**
* svc_rdma_encode_write_segment - Encode one Write segment * svc_rdma_encode_write_segment - Encode one Write segment
* @src: matching Write chunk in the RPC Call header
* @sctxt: Send context for the RPC Reply * @sctxt: Send context for the RPC Reply
* @chunk: Write chunk to push
* @remaining: remaining bytes of the payload left in the Write chunk * @remaining: remaining bytes of the payload left in the Write chunk
* @segno: which segment in the chunk
* *
* Return values: * Return values:
* On success, returns length in bytes of the Reply XDR buffer * On success, returns length in bytes of the Reply XDR buffer
* that was consumed by the Write segment * that was consumed by the Write segment, and updates @remaining
* %-EMSGSIZE on XDR buffer overflow * %-EMSGSIZE on XDR buffer overflow
*/ */
static ssize_t svc_rdma_encode_write_segment(__be32 *src, static ssize_t svc_rdma_encode_write_segment(struct svc_rdma_send_ctxt *sctxt,
struct svc_rdma_send_ctxt *sctxt, const struct svc_rdma_chunk *chunk,
unsigned int *remaining) u32 *remaining, unsigned int segno)
{ {
const struct svc_rdma_segment *segment = &chunk->ch_segments[segno];
const size_t len = rpcrdma_segment_maxsz * sizeof(__be32);
u32 length;
__be32 *p; __be32 *p;
const size_t len = rpcrdma_segment_maxsz * sizeof(*p);
u32 handle, length;
u64 offset;
p = xdr_reserve_space(&sctxt->sc_stream, len); p = xdr_reserve_space(&sctxt->sc_stream, len);
if (!p) if (!p)
return -EMSGSIZE; return -EMSGSIZE;
xdr_decode_rdma_segment(src, &handle, &length, &offset); length = min_t(u32, *remaining, segment->rs_length);
*remaining -= length;
if (*remaining < length) { xdr_encode_rdma_segment(p, segment->rs_handle, length,
/* segment only partly filled */ segment->rs_offset);
length = *remaining; trace_svcrdma_encode_wseg(sctxt, segno, segment->rs_handle, length,
*remaining = 0; segment->rs_offset);
} else {
/* entire segment was consumed */
*remaining -= length;
}
xdr_encode_rdma_segment(p, handle, length, offset);
trace_svcrdma_encode_wseg(handle, length, offset);
return len; return len;
} }
/** /**
* svc_rdma_encode_write_chunk - Encode one Write chunk * svc_rdma_encode_write_chunk - Encode one Write chunk
* @src: matching Write chunk in the RPC Call header
* @sctxt: Send context for the RPC Reply * @sctxt: Send context for the RPC Reply
* @remaining: size in bytes of the payload in the Write chunk * @chunk: Write chunk to push
* *
* Copy a Write chunk from the Call transport header to the * Copy a Write chunk from the Call transport header to the
* Reply transport header. Update each segment's length field * Reply transport header. Update each segment's length field
@ -411,33 +404,30 @@ static ssize_t svc_rdma_encode_write_segment(__be32 *src,
* that was consumed by the Write chunk * that was consumed by the Write chunk
* %-EMSGSIZE on XDR buffer overflow * %-EMSGSIZE on XDR buffer overflow
*/ */
static ssize_t svc_rdma_encode_write_chunk(__be32 *src, static ssize_t svc_rdma_encode_write_chunk(struct svc_rdma_send_ctxt *sctxt,
struct svc_rdma_send_ctxt *sctxt, const struct svc_rdma_chunk *chunk)
unsigned int remaining)
{ {
unsigned int i, nsegs; u32 remaining = chunk->ch_payload_length;
unsigned int segno;
ssize_t len, ret; ssize_t len, ret;
len = 0;
trace_svcrdma_encode_write_chunk(remaining); trace_svcrdma_encode_write_chunk(remaining);
src++; len = 0;
ret = xdr_stream_encode_item_present(&sctxt->sc_stream); ret = xdr_stream_encode_item_present(&sctxt->sc_stream);
if (ret < 0) if (ret < 0)
return -EMSGSIZE; return ret;
len += ret; len += ret;
nsegs = be32_to_cpup(src++); ret = xdr_stream_encode_u32(&sctxt->sc_stream, chunk->ch_segcount);
ret = xdr_stream_encode_u32(&sctxt->sc_stream, nsegs);
if (ret < 0) if (ret < 0)
return -EMSGSIZE; return ret;
len += ret; len += ret;
for (i = nsegs; i; i--) { for (segno = 0; segno < chunk->ch_segcount; segno++) {
ret = svc_rdma_encode_write_segment(src, sctxt, &remaining); ret = svc_rdma_encode_write_segment(sctxt, chunk, &remaining, segno);
if (ret < 0) if (ret < 0)
return -EMSGSIZE; return ret;
src += rpcrdma_segment_maxsz;
len += ret; len += ret;
} }
@ -449,34 +439,23 @@ static ssize_t svc_rdma_encode_write_chunk(__be32 *src,
* @rctxt: Reply context with information about the RPC Call * @rctxt: Reply context with information about the RPC Call
* @sctxt: Send context for the RPC Reply * @sctxt: Send context for the RPC Reply
* *
* The client provides a Write chunk list in the Call message. Fill
* in the segments in the first Write chunk in the Reply's transport
* header with the number of bytes consumed in each segment.
* Remaining chunks are returned unused.
*
* Assumptions:
* - Client has provided only one Write chunk
*
* Return values: * Return values:
* On success, returns length in bytes of the Reply XDR buffer * On success, returns length in bytes of the Reply XDR buffer
* that was consumed by the Reply's Write list * that was consumed by the Reply's Write list
* %-EMSGSIZE on XDR buffer overflow * %-EMSGSIZE on XDR buffer overflow
*/ */
static ssize_t static ssize_t svc_rdma_encode_write_list(struct svc_rdma_recv_ctxt *rctxt,
svc_rdma_encode_write_list(const struct svc_rdma_recv_ctxt *rctxt, struct svc_rdma_send_ctxt *sctxt)
struct svc_rdma_send_ctxt *sctxt)
{ {
struct svc_rdma_chunk *chunk; struct svc_rdma_chunk *chunk;
ssize_t len, ret; ssize_t len, ret;
len = 0; len = 0;
if (rctxt->rc_write_list) { pcl_for_each_chunk(chunk, &rctxt->rc_write_pcl) {
chunk = pcl_first_chunk(&rctxt->rc_write_pcl); ret = svc_rdma_encode_write_chunk(sctxt, chunk);
ret = svc_rdma_encode_write_chunk(rctxt->rc_write_list, sctxt,
chunk->ch_payload_length);
if (ret < 0) if (ret < 0)
return ret; return ret;
len = ret; len += ret;
} }
/* Terminate the Write list */ /* Terminate the Write list */
@ -493,24 +472,28 @@ svc_rdma_encode_write_list(const struct svc_rdma_recv_ctxt *rctxt,
* @sctxt: Send context for the RPC Reply * @sctxt: Send context for the RPC Reply
* @length: size in bytes of the payload in the Reply chunk * @length: size in bytes of the payload in the Reply chunk
* *
* Assumptions:
* - Reply can always fit in the client-provided Reply chunk
*
* Return values: * Return values:
* On success, returns length in bytes of the Reply XDR buffer * On success, returns length in bytes of the Reply XDR buffer
* that was consumed by the Reply's Reply chunk * that was consumed by the Reply's Reply chunk
* %-EMSGSIZE on XDR buffer overflow * %-EMSGSIZE on XDR buffer overflow
* %-E2BIG if the RPC message is larger than the Reply chunk
*/ */
static ssize_t static ssize_t
svc_rdma_encode_reply_chunk(const struct svc_rdma_recv_ctxt *rctxt, svc_rdma_encode_reply_chunk(struct svc_rdma_recv_ctxt *rctxt,
struct svc_rdma_send_ctxt *sctxt, struct svc_rdma_send_ctxt *sctxt,
unsigned int length) unsigned int length)
{ {
if (!rctxt->rc_reply_chunk) struct svc_rdma_chunk *chunk;
if (pcl_is_empty(&rctxt->rc_reply_pcl))
return xdr_stream_encode_item_absent(&sctxt->sc_stream); return xdr_stream_encode_item_absent(&sctxt->sc_stream);
return svc_rdma_encode_write_chunk(rctxt->rc_reply_chunk, sctxt, chunk = pcl_first_chunk(&rctxt->rc_reply_pcl);
length); if (length > chunk->ch_length)
return -E2BIG;
chunk->ch_payload_length = length;
return svc_rdma_encode_write_chunk(sctxt, chunk);
} }
static int svc_rdma_dma_map_page(struct svcxprt_rdma *rdma, static int svc_rdma_dma_map_page(struct svcxprt_rdma *rdma,
@ -928,7 +911,7 @@ int svc_rdma_sendto(struct svc_rqst *rqstp)
*p++ = *rdma_argp; *p++ = *rdma_argp;
*p++ = *(rdma_argp + 1); *p++ = *(rdma_argp + 1);
*p++ = rdma->sc_fc_credits; *p++ = rdma->sc_fc_credits;
*p = rctxt->rc_reply_chunk ? rdma_nomsg : rdma_msg; *p = pcl_is_empty(&rctxt->rc_reply_pcl) ? rdma_msg : rdma_nomsg;
if (svc_rdma_encode_read_list(sctxt) < 0) if (svc_rdma_encode_read_list(sctxt) < 0)
goto err0; goto err0;