OpenCloudOS-Kernel/include/linux/ceph/messenger.h

303 lines
8.9 KiB
C
Raw Normal View History

#ifndef __FS_CEPH_MESSENGER_H
#define __FS_CEPH_MESSENGER_H
#include <linux/blk_types.h>
#include <linux/kref.h>
#include <linux/mutex.h>
#include <linux/net.h>
#include <linux/radix-tree.h>
#include <linux/uio.h>
#include <linux/workqueue.h>
#include <net/net_namespace.h>
#include <linux/ceph/types.h>
#include <linux/ceph/buffer.h>
struct ceph_msg;
struct ceph_connection;
/*
* Ceph defines these callbacks for handling connection events.
*/
struct ceph_connection_operations {
struct ceph_connection *(*get)(struct ceph_connection *);
void (*put)(struct ceph_connection *);
/* handle an incoming message. */
void (*dispatch) (struct ceph_connection *con, struct ceph_msg *m);
/* authorize an outgoing connection */
struct ceph_auth_handshake *(*get_authorizer) (
struct ceph_connection *con,
int *proto, int force_new);
int (*verify_authorizer_reply) (struct ceph_connection *con, int len);
int (*invalidate_authorizer)(struct ceph_connection *con);
/* there was some error on the socket (disconnect, whatever) */
void (*fault) (struct ceph_connection *con);
/* a remote host as terminated a message exchange session, and messages
* we sent (or they tried to send us) may be lost. */
void (*peer_reset) (struct ceph_connection *con);
struct ceph_msg * (*alloc_msg) (struct ceph_connection *con,
struct ceph_msg_header *hdr,
int *skip);
int (*sign_message) (struct ceph_msg *msg);
int (*check_message_signature) (struct ceph_msg *msg);
};
/* use format string %s%d */
#define ENTITY_NAME(n) ceph_entity_type_name((n).type), le64_to_cpu((n).num)
struct ceph_messenger {
struct ceph_entity_inst inst; /* my name+address */
struct ceph_entity_addr my_enc_addr;
atomic_t stopping;
possible_net_t net;
/*
* the global_seq counts connections i (attempt to) initiate
* in order to disambiguate certain connect race conditions.
*/
u32 global_seq;
spinlock_t global_seq_lock;
};
enum ceph_msg_data_type {
CEPH_MSG_DATA_NONE, /* message contains no data payload */
CEPH_MSG_DATA_PAGES, /* data source/destination is a page array */
CEPH_MSG_DATA_PAGELIST, /* data source/destination is a pagelist */
#ifdef CONFIG_BLOCK
CEPH_MSG_DATA_BIO, /* data source/destination is a bio list */
#endif /* CONFIG_BLOCK */
};
static __inline__ bool ceph_msg_data_type_valid(enum ceph_msg_data_type type)
{
switch (type) {
case CEPH_MSG_DATA_NONE:
case CEPH_MSG_DATA_PAGES:
case CEPH_MSG_DATA_PAGELIST:
#ifdef CONFIG_BLOCK
case CEPH_MSG_DATA_BIO:
#endif /* CONFIG_BLOCK */
return true;
default:
return false;
}
}
struct ceph_msg_data {
struct list_head links; /* ceph_msg->data */
enum ceph_msg_data_type type;
union {
#ifdef CONFIG_BLOCK
struct {
struct bio *bio;
size_t bio_length;
};
#endif /* CONFIG_BLOCK */
struct {
struct page **pages; /* NOT OWNER. */
size_t length; /* total # bytes */
unsigned int alignment; /* first page */
};
struct ceph_pagelist *pagelist;
};
};
libceph: start defining message data cursor This patch lays out the foundation for using generic routines to manage processing items of message data. For simplicity, we'll start with just the trail portion of a message, because it stands alone and is only present for outgoing data. First some basic concepts. We'll use the term "data item" to represent one of the ceph_msg_data structures associated with a message. There are currently four of those, with single-letter field names p, l, b, and t. A data item is further broken into "pieces" which always lie in a single page. A data item will include a "cursor" that will track state as the memory defined by the item is consumed by sending data from or receiving data into it. We define three routines to manipulate a data item's cursor: the "init" routine; the "next" routine; and the "advance" routine. The "init" routine initializes the cursor so it points at the beginning of the first piece in the item. The "next" routine returns the page, page offset, and length (limited by both the page and item size) of the next unconsumed piece in the item. It also indicates to the caller whether the piece being returned is the last one in the data item. The "advance" routine consumes the requested number of bytes in the item (advancing the cursor). This is used to record the number of bytes from the current piece that were actually sent or received by the network code. It returns an indication of whether the result means the current piece has been fully consumed. This is used by the message send code to determine whether it should calculate the CRC for the next piece processed. The trail of a message is implemented as a ceph pagelist. The routines defined for it will be usable for non-trail pagelist data as well. Signed-off-by: Alex Elder <elder@inktank.com> Reviewed-by: Josh Durgin <josh.durgin@inktank.com>
2013-03-07 13:39:39 +08:00
struct ceph_msg_data_cursor {
libceph: implement multiple data items in a message This patch adds support to the messenger for more than one data item in its data list. A message data cursor has two more fields to support this: - a count of the number of bytes left to be consumed across all data items in the list, "total_resid" - a pointer to the head of the list (for validation only) The cursor initialization routine has been split into two parts: the outer one, which initializes the cursor for traversing the entire list of data items; and the inner one, which initializes the cursor to start processing a single data item. When a message cursor is first initialized, the outer initialization routine sets total_resid to the length provided. The data pointer is initialized to the first data item on the list. From there, the inner initialization routine finishes by setting up to process the data item the cursor points to. Advancing the cursor consumes bytes in total_resid. If the resid field reaches zero, it means the current data item is fully consumed. If total_resid indicates there is more data, the cursor is advanced to point to the next data item, and then the inner initialization routine prepares for using that. (A check is made at this point to make sure we don't wrap around the front of the list.) The type-specific init routines are modified so they can be given a length that's larger than what the data item can support. The resid field is initialized to the smaller of the provided length and the length of the entire data item. When total_resid reaches zero, we're done. This resolves: http://tracker.ceph.com/issues/3761 Signed-off-by: Alex Elder <elder@inktank.com> Reviewed-by: Josh Durgin <josh.durgin@inktank.com>
2013-04-06 03:46:01 +08:00
size_t total_resid; /* across all data items */
struct list_head *data_head; /* = &ceph_msg->data */
struct ceph_msg_data *data; /* current data item */
size_t resid; /* bytes not yet consumed */
bool last_piece; /* current is last piece */
bool need_crc; /* crc update needed */
union {
#ifdef CONFIG_BLOCK
struct { /* bio */
struct bio *bio; /* bio from list */
struct bvec_iter bvec_iter;
};
#endif /* CONFIG_BLOCK */
struct { /* pages */
unsigned int page_offset; /* offset in page */
unsigned short page_index; /* index in array */
unsigned short page_count; /* pages in array */
};
struct { /* pagelist */
struct page *page; /* page from list */
size_t offset; /* bytes from list */
};
};
libceph: start defining message data cursor This patch lays out the foundation for using generic routines to manage processing items of message data. For simplicity, we'll start with just the trail portion of a message, because it stands alone and is only present for outgoing data. First some basic concepts. We'll use the term "data item" to represent one of the ceph_msg_data structures associated with a message. There are currently four of those, with single-letter field names p, l, b, and t. A data item is further broken into "pieces" which always lie in a single page. A data item will include a "cursor" that will track state as the memory defined by the item is consumed by sending data from or receiving data into it. We define three routines to manipulate a data item's cursor: the "init" routine; the "next" routine; and the "advance" routine. The "init" routine initializes the cursor so it points at the beginning of the first piece in the item. The "next" routine returns the page, page offset, and length (limited by both the page and item size) of the next unconsumed piece in the item. It also indicates to the caller whether the piece being returned is the last one in the data item. The "advance" routine consumes the requested number of bytes in the item (advancing the cursor). This is used to record the number of bytes from the current piece that were actually sent or received by the network code. It returns an indication of whether the result means the current piece has been fully consumed. This is used by the message send code to determine whether it should calculate the CRC for the next piece processed. The trail of a message is implemented as a ceph pagelist. The routines defined for it will be usable for non-trail pagelist data as well. Signed-off-by: Alex Elder <elder@inktank.com> Reviewed-by: Josh Durgin <josh.durgin@inktank.com>
2013-03-07 13:39:39 +08:00
};
/*
* a single message. it contains a header (src, dest, message type, etc.),
* footer (crc values, mainly), a "front" message body, and possibly a
* data payload (stored in some number of pages).
*/
struct ceph_msg {
struct ceph_msg_header hdr; /* header */
union {
struct ceph_msg_footer footer; /* footer */
struct ceph_msg_footer_old old_footer; /* old format footer */
};
struct kvec front; /* unaligned blobs of message */
struct ceph_buffer *middle;
size_t data_length;
struct list_head data;
struct ceph_msg_data_cursor cursor;
struct ceph_connection *con;
struct list_head list_head; /* links for connection lists */
struct kref kref;
bool more_to_follow;
bool needs_out_seq;
int front_alloc_len;
unsigned long ack_stamp; /* tx: when we were acked */
struct ceph_msgpool *pool;
};
/* ceph connection fault delay defaults, for exponential backoff */
#define BASE_DELAY_INTERVAL (HZ/2)
#define MAX_DELAY_INTERVAL (5 * 60 * HZ)
/*
* A single connection with another host.
*
* We maintain a queue of outgoing messages, and some session state to
* ensure that we can preserve the lossless, ordered delivery of
* messages in the case of a TCP disconnect.
*/
struct ceph_connection {
void *private;
const struct ceph_connection_operations *ops;
struct ceph_messenger *msgr;
atomic_t sock_state;
struct socket *sock;
struct ceph_entity_addr peer_addr; /* peer address */
struct ceph_entity_addr peer_addr_for_me;
unsigned long flags;
unsigned long state;
const char *error_msg; /* error message, if any */
struct ceph_entity_name peer_name; /* peer name */
u64 peer_features;
u32 connect_seq; /* identify the most recent connection
attempt for this connection, client */
u32 peer_global_seq; /* peer's global seq for this connection */
int auth_retry; /* true if we need a newer authorizer */
void *auth_reply_buf; /* where to put the authorizer reply */
int auth_reply_buf_len;
struct mutex mutex;
/* out queue */
struct list_head out_queue;
struct list_head out_sent; /* sending or sent but unacked */
u64 out_seq; /* last message queued for send */
u64 in_seq, in_seq_acked; /* last message received, acked */
/* connection negotiation temps */
char in_banner[CEPH_BANNER_MAX_LEN];
struct ceph_msg_connect out_connect;
struct ceph_msg_connect_reply in_reply;
struct ceph_entity_addr actual_peer_addr;
/* message out temps */
libceph: fix ceph_msg_revoke() There are a number of problems with revoking a "was sending" message: (1) We never make any attempt to revoke data - only kvecs contibute to con->out_skip. However, once the header (envelope) is written to the socket, our peer learns data_len and sets itself to expect at least data_len bytes to follow front or front+middle. If ceph_msg_revoke() is called while the messenger is sending message's data portion, anything we send after that call is counted by the OSD towards the now revoked message's data portion. The effects vary, the most common one is the eventual hang - higher layers get stuck waiting for the reply to the message that was sent out after ceph_msg_revoke() returned and treated by the OSD as a bunch of data bytes. This is what Matt ran into. (2) Flat out zeroing con->out_kvec_bytes worth of bytes to handle kvecs is wrong. If ceph_msg_revoke() is called before the tag is sent out or while the messenger is sending the header, we will get a connection reset, either due to a bad tag (0 is not a valid tag) or a bad header CRC, which kind of defeats the purpose of revoke. Currently the kernel client refuses to work with header CRCs disabled, but that will likely change in the future, making this even worse. (3) con->out_skip is not reset on connection reset, leading to one or more spurious connection resets if we happen to get a real one between con->out_skip is set in ceph_msg_revoke() and before it's cleared in write_partial_skip(). Fixing (1) and (3) is trivial. The idea behind fixing (2) is to never zero the tag or the header, i.e. send out tag+header regardless of when ceph_msg_revoke() is called. That way the header is always correct, no unnecessary resets are induced and revoke stands ready for disabled CRCs. Since ceph_msg_revoke() rips out con->out_msg, introduce a new "message out temp" and copy the header into it before sending. Cc: stable@vger.kernel.org # 4.0+ Reported-by: Matt Conner <matt.conner@keepertech.com> Signed-off-by: Ilya Dryomov <idryomov@gmail.com> Tested-by: Matt Conner <matt.conner@keepertech.com> Reviewed-by: Sage Weil <sage@redhat.com>
2015-12-28 18:18:34 +08:00
struct ceph_msg_header out_hdr;
struct ceph_msg *out_msg; /* sending message (== tail of
out_sent) */
bool out_msg_done;
struct kvec out_kvec[8], /* sending header/footer data */
*out_kvec_cur;
int out_kvec_left; /* kvec's left in out_kvec */
int out_skip; /* skip this many bytes */
int out_kvec_bytes; /* total bytes left */
int out_more; /* there is more data after the kvecs */
__le64 out_temp_ack; /* for writing an ack */
struct ceph_timespec out_temp_keepalive2; /* for writing keepalive2
stamp */
/* message in temps */
struct ceph_msg_header in_hdr;
struct ceph_msg *in_msg;
u32 in_front_crc, in_middle_crc, in_data_crc; /* calculated crc */
char in_tag; /* protocol control byte */
int in_base_pos; /* bytes read */
__le64 in_temp_ack; /* for reading an ack */
struct timespec last_keepalive_ack; /* keepalive2 ack stamp */
struct delayed_work work; /* send|recv work */
unsigned long delay; /* current delay interval */
};
extern const char *ceph_pr_addr(const struct sockaddr_storage *ss);
extern int ceph_parse_ips(const char *c, const char *end,
struct ceph_entity_addr *addr,
int max_count, int *count);
extern int ceph_msgr_init(void);
extern void ceph_msgr_exit(void);
extern void ceph_msgr_flush(void);
extern void ceph_messenger_init(struct ceph_messenger *msgr,
struct ceph_entity_addr *myaddr);
extern void ceph_messenger_fini(struct ceph_messenger *msgr);
extern void ceph_con_init(struct ceph_connection *con, void *private,
const struct ceph_connection_operations *ops,
struct ceph_messenger *msgr);
extern void ceph_con_open(struct ceph_connection *con,
__u8 entity_type, __u64 entity_num,
struct ceph_entity_addr *addr);
extern bool ceph_con_opened(struct ceph_connection *con);
extern void ceph_con_close(struct ceph_connection *con);
extern void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg);
extern void ceph_msg_revoke(struct ceph_msg *msg);
extern void ceph_msg_revoke_incoming(struct ceph_msg *msg);
extern void ceph_con_keepalive(struct ceph_connection *con);
extern bool ceph_con_keepalive_expired(struct ceph_connection *con,
unsigned long interval);
extern void ceph_msg_data_add_pages(struct ceph_msg *msg, struct page **pages,
size_t length, size_t alignment);
extern void ceph_msg_data_add_pagelist(struct ceph_msg *msg,
struct ceph_pagelist *pagelist);
#ifdef CONFIG_BLOCK
extern void ceph_msg_data_add_bio(struct ceph_msg *msg, struct bio *bio,
size_t length);
#endif /* CONFIG_BLOCK */
extern struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags,
bool can_fail);
extern struct ceph_msg *ceph_msg_get(struct ceph_msg *msg);
extern void ceph_msg_put(struct ceph_msg *msg);
extern void ceph_msg_dump(struct ceph_msg *msg);
#endif