*** dbinc/repmgr.h 2007-10-31 10:23:52.000000000 -0700
|
|
--- dbinc/repmgr.h 2007-10-31 10:23:53.000000000 -0700
|
|
***************
|
|
*** 36,41 ****
|
|
--- 36,55 ----
|
|
#endif
|
|
|
|
/*
|
|
+ * The (arbitrary) maximum number of outgoing messages we're willing to hold, on
|
|
+ * a queue per connection, waiting for TCP buffer space to become available in
|
|
+ * the kernel. Rather than exceeding this limit, we simply discard additional
|
|
+ * messages (since this is always allowed by the replication protocol).
|
|
+ * As a special dispensation, if a message is destined for a specific remote
|
|
+ * site (i.e., it's not a broadcast), then we first try blocking the sending
|
|
+ * thread, waiting for space to become available (though we only wait a limited
|
|
+ * time). This is so that we can handle the immediate flood of (a
|
|
+ * potentially large number of) outgoing messages that replication generates, in
|
|
+ * a tight loop, when handling PAGE_REQ, LOG_REQ and ALL_REQ requests.
|
|
+ */
|
|
+ #define OUT_QUEUE_LIMIT 10
|
|
+
|
|
+ /*
|
|
* The system value is available from sysconf(_SC_HOST_NAME_MAX).
|
|
* Historically, the maximum host name was 256.
|
|
*/
|
|
***************
|
|
*** 47,52 ****
|
|
--- 61,71 ----
|
|
#define MAX_SITE_LOC_STRING (MAXHOSTNAMELEN+20)
|
|
typedef char SITE_STRING_BUFFER[MAX_SITE_LOC_STRING+1];
|
|
|
|
+ /* Default timeout values, in seconds. */
|
|
+ #define DB_REPMGR_DEFAULT_ACK_TIMEOUT (1 * US_PER_SEC)
|
|
+ #define DB_REPMGR_DEFAULT_CONNECTION_RETRY (30 * US_PER_SEC)
|
|
+ #define DB_REPMGR_DEFAULT_ELECTION_RETRY (10 * US_PER_SEC)
|
|
+
|
|
struct __repmgr_connection;
|
|
typedef struct __repmgr_connection REPMGR_CONNECTION;
|
|
struct __repmgr_queue; typedef struct __repmgr_queue REPMGR_QUEUE;
|
|
***************
|
|
*** 171,178 ****
|
|
#ifdef DB_WIN32
|
|
WSAEVENT event_object;
|
|
#endif
|
|
! #define CONN_CONNECTING 0x01 /* nonblocking connect in progress */
|
|
! #define CONN_DEFUNCT 0x02 /* socket close pending */
|
|
u_int32_t flags;
|
|
|
|
/*
|
|
--- 190,198 ----
|
|
#ifdef DB_WIN32
|
|
WSAEVENT event_object;
|
|
#endif
|
|
! #define CONN_CONGESTED 0x01 /* msg thread wait has exceeded timeout */
|
|
! #define CONN_CONNECTING 0x02 /* nonblocking connect in progress */
|
|
! #define CONN_DEFUNCT 0x04 /* socket close pending */
|
|
u_int32_t flags;
|
|
|
|
/*
|
|
***************
|
|
*** 180,189 ****
|
|
* send() function's thread. But if TCP doesn't have enough network
|
|
* buffer space for us when we first try it, we instead allocate some
|
|
* memory, and copy the message, and then send it as space becomes
|
|
! * available in our main select() thread.
|
|
*/
|
|
OUT_Q_HEADER outbound_queue;
|
|
int out_queue_length;
|
|
|
|
/*
|
|
* Input: while we're reading a message, we keep track of what phase
|
|
--- 200,215 ----
|
|
* send() function's thread. But if TCP doesn't have enough network
|
|
* buffer space for us when we first try it, we instead allocate some
|
|
* memory, and copy the message, and then send it as space becomes
|
|
! * available in our main select() thread. In some cases, if the queue
|
|
! * gets too long we wait until it's drained, and then append to it.
|
|
! * This condition variable's associated mutex is the normal per-repmgr
|
|
! * db_rep->mutex, because that mutex is always held anyway whenever the
|
|
! * output queue is consulted.
|
|
*/
|
|
OUT_Q_HEADER outbound_queue;
|
|
int out_queue_length;
|
|
+ cond_var_t drained;
|
|
+ int blockers; /* ref count of msg threads waiting on us */
|
|
|
|
/*
|
|
* Input: while we're reading a message, we keep track of what phase
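
For illustration only (this note and the C sketch are editorial additions, not part of the patch): a minimal standalone rendering of the queueing policy the new repmgr.h comment describes, assuming POSIX threads. The names out_queue, reserve_slot, and the deadline parameter are invented for the example; the patch's real logic lives in __repmgr_send_internal and __repmgr_await_drain further below.

    #include <pthread.h>
    #include <stdbool.h>
    #include <time.h>

    #define OUT_QUEUE_LIMIT 10

    struct out_queue {
        pthread_mutex_t mutex;
        pthread_cond_t drained;     /* signaled when space frees up */
        int length;
    };

    /*
     * Returns true if the caller may enqueue one message, false if the
     * message should be dropped (which the replication protocol allows).
     * Unicast senders pass blockable == true and a deadline; broadcasts
     * pass blockable == false and never wait.
     */
    static bool
    reserve_slot(struct out_queue *q, bool blockable,
        const struct timespec *deadline)
    {
        pthread_mutex_lock(&q->mutex);
        if (blockable)
            while (q->length >= OUT_QUEUE_LIMIT &&
                pthread_cond_timedwait(&q->drained,
                &q->mutex, deadline) == 0)
                continue;
        if (q->length >= OUT_QUEUE_LIMIT) {
            /* Still full: drop rather than exceed the limit. */
            pthread_mutex_unlock(&q->mutex);
            return (false);
        }
        q->length++;
        pthread_mutex_unlock(&q->mutex);
        return (true);
    }
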
*** dbinc_auto/int_def.in 2007-10-31 10:23:52.000000000 -0700
|
|
--- dbinc_auto/int_def.in 2007-10-31 10:23:52.000000000 -0700
|
|
***************
|
|
*** 1420,1425 ****
|
|
--- 1420,1428 ----
|
|
#define __repmgr_wake_waiting_senders __repmgr_wake_waiting_senders@DB_VERSION_UNIQUE_NAME@
|
|
#define __repmgr_await_ack __repmgr_await_ack@DB_VERSION_UNIQUE_NAME@
|
|
#define __repmgr_compute_wait_deadline __repmgr_compute_wait_deadline@DB_VERSION_UNIQUE_NAME@
|
|
+ #define __repmgr_await_drain __repmgr_await_drain@DB_VERSION_UNIQUE_NAME@
|
|
+ #define __repmgr_alloc_cond __repmgr_alloc_cond@DB_VERSION_UNIQUE_NAME@
|
|
+ #define __repmgr_free_cond __repmgr_free_cond@DB_VERSION_UNIQUE_NAME@
|
|
#define __repmgr_init_sync __repmgr_init_sync@DB_VERSION_UNIQUE_NAME@
|
|
#define __repmgr_close_sync __repmgr_close_sync@DB_VERSION_UNIQUE_NAME@
|
|
#define __repmgr_net_init __repmgr_net_init@DB_VERSION_UNIQUE_NAME@
|
|
*** dbinc_auto/repmgr_ext.h 2007-10-31 10:23:52.000000000 -0700
|
|
--- dbinc_auto/repmgr_ext.h 2007-10-31 10:23:52.000000000 -0700
|
|
***************
|
|
*** 21,30 ****
|
|
int __repmgr_handle_event __P((DB_ENV *, u_int32_t, void *));
|
|
void __repmgr_stash_generation __P((DB_ENV *));
|
|
int __repmgr_send __P((DB_ENV *, const DBT *, const DBT *, const DB_LSN *, int, u_int32_t));
|
|
! int __repmgr_send_one __P((DB_ENV *, REPMGR_CONNECTION *, u_int, const DBT *, const DBT *));
|
|
int __repmgr_is_permanent __P((DB_ENV *, const DB_LSN *));
|
|
! int __repmgr_bust_connection __P((DB_ENV *, REPMGR_CONNECTION *, int));
|
|
! void __repmgr_cleanup_connection __P((DB_ENV *, REPMGR_CONNECTION *));
|
|
int __repmgr_find_site __P((DB_ENV *, const char *, u_int));
|
|
int __repmgr_pack_netaddr __P((DB_ENV *, const char *, u_int, ADDRINFO *, repmgr_netaddr_t *));
|
|
int __repmgr_getaddr __P((DB_ENV *, const char *, u_int, int, ADDRINFO **));
|
|
--- 21,30 ----
|
|
int __repmgr_handle_event __P((DB_ENV *, u_int32_t, void *));
|
|
void __repmgr_stash_generation __P((DB_ENV *));
|
|
int __repmgr_send __P((DB_ENV *, const DBT *, const DBT *, const DB_LSN *, int, u_int32_t));
|
|
! int __repmgr_send_one __P((DB_ENV *, REPMGR_CONNECTION *, u_int, const DBT *, const DBT *, int));
|
|
int __repmgr_is_permanent __P((DB_ENV *, const DB_LSN *));
|
|
! int __repmgr_bust_connection __P((DB_ENV *, REPMGR_CONNECTION *));
|
|
! int __repmgr_cleanup_connection __P((DB_ENV *, REPMGR_CONNECTION *));
|
|
int __repmgr_find_site __P((DB_ENV *, const char *, u_int));
|
|
int __repmgr_pack_netaddr __P((DB_ENV *, const char *, u_int, ADDRINFO *, repmgr_netaddr_t *));
|
|
int __repmgr_getaddr __P((DB_ENV *, const char *, u_int, int, ADDRINFO **));
|
|
***************
|
|
*** 39,44 ****
|
|
--- 39,47 ----
|
|
int __repmgr_wake_waiting_senders __P((DB_ENV *));
|
|
int __repmgr_await_ack __P((DB_ENV *, const DB_LSN *));
|
|
void __repmgr_compute_wait_deadline __P((DB_ENV*, struct timespec *, db_timeout_t));
|
|
+ int __repmgr_await_drain __P((DB_ENV *, REPMGR_CONNECTION *, db_timeout_t));
|
|
+ int __repmgr_alloc_cond __P((cond_var_t *));
|
|
+ int __repmgr_free_cond __P((cond_var_t *));
|
|
int __repmgr_init_sync __P((DB_ENV *, DB_REP *));
|
|
int __repmgr_close_sync __P((DB_ENV *));
|
|
int __repmgr_net_init __P((DB_ENV *, DB_REP *));
|
|
*** repmgr/repmgr_method.c 2007-10-31 10:23:52.000000000 -0700
|
|
--- repmgr/repmgr_method.c 2007-10-31 10:23:53.000000000 -0700
|
|
***************
|
|
*** 196,204 ****
|
|
int ret;
|
|
|
|
/* Set some default values. */
|
|
! db_rep->ack_timeout = 1 * US_PER_SEC; /* 1 second */
|
|
! db_rep->connection_retry_wait = 30 * US_PER_SEC; /* 30 seconds */
|
|
! db_rep->election_retry_wait = 10 * US_PER_SEC; /* 10 seconds */
|
|
db_rep->config_nsites = 0;
|
|
db_rep->peer = DB_EID_INVALID;
|
|
db_rep->perm_policy = DB_REPMGR_ACKS_QUORUM;
|
|
--- 196,204 ----
|
|
int ret;
|
|
|
|
/* Set some default values. */
|
|
! db_rep->ack_timeout = DB_REPMGR_DEFAULT_ACK_TIMEOUT;
|
|
! db_rep->connection_retry_wait = DB_REPMGR_DEFAULT_CONNECTION_RETRY;
|
|
! db_rep->election_retry_wait = DB_REPMGR_DEFAULT_ELECTION_RETRY;
|
|
db_rep->config_nsites = 0;
|
|
db_rep->peer = DB_EID_INVALID;
|
|
db_rep->perm_policy = DB_REPMGR_ACKS_QUORUM;
|
|
***************
|
|
*** 238,243 ****
|
|
--- 238,244 ----
|
|
DB_ENV *dbenv;
|
|
{
|
|
DB_REP *db_rep;
|
|
+ REPMGR_CONNECTION *conn;
|
|
int ret;
|
|
|
|
db_rep = dbenv->rep_handle;
|
|
***************
|
|
*** 254,259 ****
|
|
--- 255,266 ----
|
|
|
|
if ((ret = __repmgr_signal(&db_rep->queue_nonempty)) != 0)
|
|
goto unlock;
|
|
+
|
|
+ TAILQ_FOREACH(conn, &db_rep->connections, entries) {
|
|
+ if (conn->blockers > 0 &&
|
|
+ ((ret = __repmgr_signal(&conn->drained)) != 0))
|
|
+ goto unlock;
|
|
+ }
|
|
UNLOCK_MUTEX(db_rep->mutex);
|
|
|
|
return (__repmgr_wake_main_thread(dbenv));
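
An editorial sketch (not part of the patch) of the shutdown wake-up added in the repmgr_method.c hunk above: once "finished" has been set, every connection with waiters on its "drained" condition gets a broadcast so blocked message threads can observe the flag and return. Here pthread_cond_broadcast stands in for __repmgr_signal, and the structure and field names are invented for the example.

    #include <sys/queue.h>
    #include <pthread.h>

    struct conn_sketch {
        TAILQ_ENTRY(conn_sketch) entries;
        pthread_cond_t drained;
        int blockers;               /* # of msg threads waiting on us */
    };
    TAILQ_HEAD(conn_list, conn_sketch);

    /* Called with the repmgr mutex held, after "finished" has been set. */
    static int
    wake_blocked_senders(struct conn_list *connections)
    {
        struct conn_sketch *c;
        int ret;

        TAILQ_FOREACH(c, connections, entries)
            if (c->blockers > 0 &&
                (ret = pthread_cond_broadcast(&c->drained)) != 0)
                return (ret);
        return (0);
    }
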
*** repmgr/repmgr_msg.c 2007-10-31 10:23:52.000000000 -0700
|
|
--- repmgr/repmgr_msg.c 2007-10-31 10:23:53.000000000 -0700
|
|
***************
|
|
*** 183,192 ****
|
|
|
|
/*
|
|
* Acknowledges a message.
|
|
- *
|
|
- * !!!
|
|
- * Note that this cannot be called from the select() thread, in case we call
|
|
- * __repmgr_bust_connection(..., FALSE).
|
|
*/
|
|
static int
|
|
ack_message(dbenv, generation, lsn)
|
|
--- 183,188 ----
|
|
***************
|
|
*** 227,235 ****
|
|
rec2.size = 0;
|
|
|
|
conn = site->ref.conn;
|
|
if ((ret = __repmgr_send_one(dbenv, conn, REPMGR_ACK,
|
|
! &control2, &rec2)) == DB_REP_UNAVAIL)
|
|
! ret = __repmgr_bust_connection(dbenv, conn, FALSE);
|
|
}
|
|
|
|
UNLOCK_MUTEX(db_rep->mutex);
|
|
--- 223,236 ----
|
|
rec2.size = 0;
|
|
|
|
conn = site->ref.conn;
|
|
+ /*
|
|
+ * It's hard to imagine anyone would care about a lost ack if
+ * the path to the master is so congested as to need blocking;
+ * so pass the "blockable" argument as FALSE.
|
|
+ */
|
|
if ((ret = __repmgr_send_one(dbenv, conn, REPMGR_ACK,
|
|
! &control2, &rec2, FALSE)) == DB_REP_UNAVAIL)
|
|
! ret = __repmgr_bust_connection(dbenv, conn);
|
|
}
|
|
|
|
UNLOCK_MUTEX(db_rep->mutex);
|
|
*** repmgr/repmgr_net.c 2007-10-31 10:23:52.000000000 -0700
|
|
--- repmgr/repmgr_net.c 2007-10-31 10:23:53.000000000 -0700
|
|
***************
|
|
*** 63,69 ****
|
|
static void setup_sending_msg
|
|
__P((struct sending_msg *, u_int, const DBT *, const DBT *));
|
|
static int __repmgr_send_internal
|
|
! __P((DB_ENV *, REPMGR_CONNECTION *, struct sending_msg *));
|
|
static int enqueue_msg
|
|
__P((DB_ENV *, REPMGR_CONNECTION *, struct sending_msg *, size_t));
|
|
static int flatten __P((DB_ENV *, struct sending_msg *));
|
|
--- 63,69 ----
|
|
static void setup_sending_msg
|
|
__P((struct sending_msg *, u_int, const DBT *, const DBT *));
|
|
static int __repmgr_send_internal
|
|
! __P((DB_ENV *, REPMGR_CONNECTION *, struct sending_msg *, int));
|
|
static int enqueue_msg
|
|
__P((DB_ENV *, REPMGR_CONNECTION *, struct sending_msg *, size_t));
|
|
static int flatten __P((DB_ENV *, struct sending_msg *));
|
|
***************
|
|
*** 73,85 ****
|
|
* __repmgr_send --
|
|
* The send function for DB_ENV->rep_set_transport.
|
|
*
|
|
- * !!!
|
|
- * This is only ever called as the replication transport call-back, which means
|
|
- * it's either on one of our message processing threads or an application
|
|
- * thread. It mustn't be called from the select() thread, because we might call
|
|
- * __repmgr_bust_connection(..., FALSE) here, and that's not allowed in the
|
|
- * select() thread.
|
|
- *
|
|
* PUBLIC: int __repmgr_send __P((DB_ENV *, const DBT *, const DBT *,
|
|
* PUBLIC: const DB_LSN *, int, u_int32_t));
|
|
*/
|
|
--- 73,78 ----
|
|
***************
|
|
*** 126,134 ****
|
|
}
|
|
|
|
conn = site->ref.conn;
|
|
if ((ret = __repmgr_send_one(dbenv, conn, REPMGR_REP_MESSAGE,
|
|
! control, rec)) == DB_REP_UNAVAIL &&
|
|
! (t_ret = __repmgr_bust_connection(dbenv, conn, FALSE)) != 0)
|
|
ret = t_ret;
|
|
if (ret != 0)
|
|
goto out;
|
|
--- 119,128 ----
|
|
}
|
|
|
|
conn = site->ref.conn;
|
|
+ /* Pass the "blockable" argument as TRUE. */
|
|
if ((ret = __repmgr_send_one(dbenv, conn, REPMGR_REP_MESSAGE,
|
|
! control, rec, TRUE)) == DB_REP_UNAVAIL &&
|
|
! (t_ret = __repmgr_bust_connection(dbenv, conn)) != 0)
|
|
ret = t_ret;
|
|
if (ret != 0)
|
|
goto out;
|
|
***************
|
|
*** 222,228 ****
|
|
if (site->state != SITE_CONNECTED)
|
|
return (NULL);
|
|
|
|
! if (F_ISSET(site->ref.conn, CONN_CONNECTING))
|
|
return (NULL);
|
|
return (site);
|
|
}
|
|
--- 216,222 ----
|
|
if (site->state != SITE_CONNECTED)
|
|
return (NULL);
|
|
|
|
! if (F_ISSET(site->ref.conn, CONN_CONNECTING|CONN_DEFUNCT))
|
|
return (NULL);
|
|
return (site);
|
|
}
|
|
***************
|
|
*** 235,244 ****
|
|
*
|
|
* !!!
|
|
* Caller must hold dbenv->mutex.
|
|
- *
|
|
- * !!!
|
|
- * Note that this cannot be called from the select() thread, in case we call
|
|
- * __repmgr_bust_connection(..., FALSE).
|
|
*/
|
|
static int
|
|
__repmgr_send_broadcast(dbenv, control, rec, nsitesp, npeersp)
|
|
--- 229,234 ----
|
|
***************
|
|
*** 268,281 ****
|
|
!IS_VALID_EID(conn->eid))
|
|
continue;
|
|
|
|
! if ((ret = __repmgr_send_internal(dbenv, conn, &msg)) == 0) {
|
|
site = SITE_FROM_EID(conn->eid);
|
|
nsites++;
|
|
if (site->priority > 0)
|
|
npeers++;
|
|
} else if (ret == DB_REP_UNAVAIL) {
|
|
! if ((ret = __repmgr_bust_connection(
|
|
! dbenv, conn, FALSE)) != 0)
|
|
return (ret);
|
|
} else
|
|
return (ret);
|
|
--- 258,277 ----
|
|
!IS_VALID_EID(conn->eid))
|
|
continue;
|
|
|
|
! /*
|
|
! * Broadcast messages either come from application threads
! * committing transactions, or are replication status messages
! * that we can afford to lose. So don't allow blocking for them
! * (pass the "blockable" argument as FALSE).
|
|
! */
|
|
! if ((ret = __repmgr_send_internal(dbenv,
|
|
! conn, &msg, FALSE)) == 0) {
|
|
site = SITE_FROM_EID(conn->eid);
|
|
nsites++;
|
|
if (site->priority > 0)
|
|
npeers++;
|
|
} else if (ret == DB_REP_UNAVAIL) {
|
|
! if ((ret = __repmgr_bust_connection(dbenv, conn)) != 0)
|
|
return (ret);
|
|
} else
|
|
return (ret);
|
|
***************
|
|
*** 301,339 ****
|
|
* intersperse writes that are part of two single messages.
|
|
*
|
|
* PUBLIC: int __repmgr_send_one __P((DB_ENV *, REPMGR_CONNECTION *,
|
|
! * PUBLIC: u_int, const DBT *, const DBT *));
|
|
*/
|
|
int
|
|
! __repmgr_send_one(dbenv, conn, msg_type, control, rec)
|
|
DB_ENV *dbenv;
|
|
REPMGR_CONNECTION *conn;
|
|
u_int msg_type;
|
|
const DBT *control, *rec;
|
|
{
|
|
struct sending_msg msg;
|
|
|
|
setup_sending_msg(&msg, msg_type, control, rec);
|
|
! return (__repmgr_send_internal(dbenv, conn, &msg));
|
|
}
|
|
|
|
/*
|
|
* Attempts a "best effort" to send a message on the given site. If there is an
|
|
! * excessive backlog of message already queued on the connection, we simply drop
|
|
! * this message, and still return 0 even in this case.
|
|
*/
|
|
static int
|
|
! __repmgr_send_internal(dbenv, conn, msg)
|
|
DB_ENV *dbenv;
|
|
REPMGR_CONNECTION *conn;
|
|
struct sending_msg *msg;
|
|
{
|
|
! #define OUT_QUEUE_LIMIT 10 /* arbitrary, for now */
|
|
REPMGR_IOVECS iovecs;
|
|
SITE_STRING_BUFFER buffer;
|
|
int ret;
|
|
size_t nw;
|
|
size_t total_written;
|
|
|
|
DB_ASSERT(dbenv, !F_ISSET(conn, CONN_CONNECTING));
|
|
if (!STAILQ_EMPTY(&conn->outbound_queue)) {
|
|
/*
|
|
--- 297,355 ----
|
|
* intersperse writes that are part of two single messages.
|
|
*
|
|
* PUBLIC: int __repmgr_send_one __P((DB_ENV *, REPMGR_CONNECTION *,
|
|
! * PUBLIC: u_int, const DBT *, const DBT *, int));
|
|
*/
|
|
int
|
|
! __repmgr_send_one(dbenv, conn, msg_type, control, rec, blockable)
|
|
DB_ENV *dbenv;
|
|
REPMGR_CONNECTION *conn;
|
|
u_int msg_type;
|
|
const DBT *control, *rec;
|
|
+ int blockable;
|
|
{
|
|
struct sending_msg msg;
|
|
|
|
setup_sending_msg(&msg, msg_type, control, rec);
|
|
! return (__repmgr_send_internal(dbenv, conn, &msg, blockable));
|
|
}
|
|
|
|
/*
|
|
* Attempts a "best effort" to send a message on the given site. If there is an
|
|
! * excessive backlog of messages already queued on the connection, what shall we
|
|
! * do? If the caller doesn't mind blocking, we'll wait (a limited amount of
|
|
! * time) for the queue to drain. Otherwise we'll simply drop the message. This
|
|
! * is always allowed by the replication protocol. But in the case of a
|
|
! * multi-message response to a request like PAGE_REQ, LOG_REQ or ALL_REQ we
|
|
! * almost always get a flood of messages that instantly fills our queue, so
|
|
! * blocking improves performance (by avoiding the need for the client to
|
|
! * re-request).
|
|
! *
|
|
! * How long shall we wait? We could of course create a new timeout
|
|
! * configuration type, so that the application could set it directly. But that
|
|
! * would start to overwhelm the user with too many choices to think about. We
|
|
! * already have an ACK timeout, which is the user's estimate of how long it
|
|
! * should take to send a message to the client, have it be processed, and return
|
|
! * a message back to us. We multiply that by the queue size, because that's how
|
|
! * many messages have to be swallowed up by the client before we're able to
|
|
! * start sending again (at least to a rough approximation).
|
|
*/
|
|
static int
|
|
! __repmgr_send_internal(dbenv, conn, msg, blockable)
|
|
DB_ENV *dbenv;
|
|
REPMGR_CONNECTION *conn;
|
|
struct sending_msg *msg;
|
|
+ int blockable;
|
|
{
|
|
! DB_REP *db_rep;
|
|
REPMGR_IOVECS iovecs;
|
|
SITE_STRING_BUFFER buffer;
|
|
+ db_timeout_t drain_to;
|
|
int ret;
|
|
size_t nw;
|
|
size_t total_written;
|
|
|
|
+ db_rep = dbenv->rep_handle;
|
|
+
|
|
DB_ASSERT(dbenv, !F_ISSET(conn, CONN_CONNECTING));
|
|
if (!STAILQ_EMPTY(&conn->outbound_queue)) {
|
|
/*
|
|
***************
|
|
*** 344,358 ****
|
|
RPRINT(dbenv, (dbenv, "msg to %s to be queued",
|
|
__repmgr_format_eid_loc(dbenv->rep_handle,
|
|
conn->eid, buffer)));
|
|
if (conn->out_queue_length < OUT_QUEUE_LIMIT)
|
|
return (enqueue_msg(dbenv, conn, msg, 0));
|
|
else {
|
|
RPRINT(dbenv, (dbenv, "queue limit exceeded"));
|
|
STAT(dbenv->rep_handle->
|
|
region->mstat.st_msgs_dropped++);
|
|
! return (0);
|
|
}
|
|
}
|
|
|
|
/*
|
|
* Send as much data to the site as we can, without blocking. Keep
|
|
--- 360,393 ----
|
|
RPRINT(dbenv, (dbenv, "msg to %s to be queued",
|
|
__repmgr_format_eid_loc(dbenv->rep_handle,
|
|
conn->eid, buffer)));
|
|
+ if (conn->out_queue_length >= OUT_QUEUE_LIMIT &&
|
|
+ blockable && !F_ISSET(conn, CONN_CONGESTED)) {
|
|
+ RPRINT(dbenv, (dbenv,
|
|
+ "block msg thread, await queue space"));
|
|
+
|
|
+ if ((drain_to = db_rep->ack_timeout) == 0)
|
|
+ drain_to = DB_REPMGR_DEFAULT_ACK_TIMEOUT;
|
|
+ conn->blockers++;
|
|
+ ret = __repmgr_await_drain(dbenv,
|
|
+ conn, drain_to * OUT_QUEUE_LIMIT);
|
|
+ conn->blockers--;
|
|
+ if (db_rep->finished)
|
|
+ return (DB_TIMEOUT);
|
|
+ if (ret != 0)
|
|
+ return (ret);
|
|
+ if (STAILQ_EMPTY(&conn->outbound_queue))
|
|
+ goto empty;
|
|
+ }
|
|
if (conn->out_queue_length < OUT_QUEUE_LIMIT)
|
|
return (enqueue_msg(dbenv, conn, msg, 0));
|
|
else {
|
|
RPRINT(dbenv, (dbenv, "queue limit exceeded"));
|
|
STAT(dbenv->rep_handle->
|
|
region->mstat.st_msgs_dropped++);
|
|
! return (blockable ? DB_TIMEOUT : 0);
|
|
}
|
|
}
|
|
+ empty:
|
|
|
|
/*
|
|
* Send as much data to the site as we can, without blocking. Keep
|
|
***************
|
|
*** 498,521 ****
|
|
|
|
/*
|
|
* Abandons a connection, to recover from an error. Upon entry the conn struct
|
|
! * must be on the connections list.
|
|
! *
|
|
! * If the 'do_close' flag is true, we do the whole job; the clean-up includes
|
|
! * removing the struct from the list and freeing all its memory, so upon return
|
|
! * the caller must not refer to it any further. Otherwise, we merely mark the
|
|
! * connection for clean-up later by the main thread.
|
|
*
|
|
* PUBLIC: int __repmgr_bust_connection __P((DB_ENV *,
|
|
! * PUBLIC: REPMGR_CONNECTION *, int));
|
|
*
|
|
* !!!
|
|
* Caller holds mutex.
|
|
*/
|
|
int
|
|
! __repmgr_bust_connection(dbenv, conn, do_close)
|
|
DB_ENV *dbenv;
|
|
REPMGR_CONNECTION *conn;
|
|
- int do_close;
|
|
{
|
|
DB_REP *db_rep;
|
|
int connecting, ret, eid;
|
|
--- 533,553 ----
|
|
|
|
/*
|
|
* Abandons a connection, to recover from an error. Upon entry the conn struct
|
|
! * must be on the connections list. For now, just mark it as unusable; it will
|
|
! * be fully cleaned up in the top-level select thread, as soon as possible.
|
|
*
|
|
* PUBLIC: int __repmgr_bust_connection __P((DB_ENV *,
|
|
! * PUBLIC: REPMGR_CONNECTION *));
|
|
*
|
|
* !!!
|
|
* Caller holds mutex.
|
|
+ *
|
|
+ * Must be idempotent.
|
|
*/
|
|
int
|
|
! __repmgr_bust_connection(dbenv, conn)
|
|
DB_ENV *dbenv;
|
|
REPMGR_CONNECTION *conn;
|
|
{
|
|
DB_REP *db_rep;
|
|
int connecting, ret, eid;
|
|
***************
|
|
*** 526,537 ****
|
|
DB_ASSERT(dbenv, !TAILQ_EMPTY(&db_rep->connections));
|
|
eid = conn->eid;
|
|
connecting = F_ISSET(conn, CONN_CONNECTING);
|
|
! if (do_close)
|
|
! __repmgr_cleanup_connection(dbenv, conn);
|
|
! else {
|
|
! F_SET(conn, CONN_DEFUNCT);
|
|
! conn->eid = -1;
|
|
! }
|
|
|
|
/*
|
|
* When we first accepted the incoming connection, we set conn->eid to
|
|
--- 558,566 ----
|
|
DB_ASSERT(dbenv, !TAILQ_EMPTY(&db_rep->connections));
|
|
eid = conn->eid;
|
|
connecting = F_ISSET(conn, CONN_CONNECTING);
|
|
!
|
|
! F_SET(conn, CONN_DEFUNCT);
|
|
! conn->eid = -1;
|
|
|
|
/*
|
|
* When we first accepted the incoming connection, we set conn->eid to
|
|
***************
|
|
*** 557,563 ****
|
|
dbenv, ELECT_FAILURE_ELECTION)) != 0)
|
|
return (ret);
|
|
}
|
|
! } else if (!do_close) {
|
|
/*
|
|
* One way or another, make sure the main thread is poked, so
|
|
* that we do the deferred clean-up.
|
|
--- 586,592 ----
|
|
dbenv, ELECT_FAILURE_ELECTION)) != 0)
|
|
return (ret);
|
|
}
|
|
! } else {
|
|
/*
|
|
* One way or another, make sure the main thread is poked, so
|
|
* that we do the deferred clean-up.
|
|
***************
|
|
*** 568,577 ****
|
|
}
|
|
|
|
/*
|
|
! * PUBLIC: void __repmgr_cleanup_connection
|
|
* PUBLIC: __P((DB_ENV *, REPMGR_CONNECTION *));
|
|
*/
|
|
! void
|
|
__repmgr_cleanup_connection(dbenv, conn)
|
|
DB_ENV *dbenv;
|
|
REPMGR_CONNECTION *conn;
|
|
--- 597,610 ----
|
|
}
|
|
|
|
/*
|
|
! * PUBLIC: int __repmgr_cleanup_connection
|
|
* PUBLIC: __P((DB_ENV *, REPMGR_CONNECTION *));
|
|
+ *
|
|
+ * !!!
|
|
+ * Idempotent. This can be called repeatedly as blocking message threads (of
+ * which there could be several) wake up in case of error on the connection.
|
|
*/
|
|
! int
|
|
__repmgr_cleanup_connection(dbenv, conn)
|
|
DB_ENV *dbenv;
|
|
REPMGR_CONNECTION *conn;
|
|
***************
|
|
*** 580,596 ****
|
|
QUEUED_OUTPUT *out;
|
|
REPMGR_FLAT *msg;
|
|
DBT *dbt;
|
|
|
|
db_rep = dbenv->rep_handle;
|
|
|
|
! TAILQ_REMOVE(&db_rep->connections, conn, entries);
|
|
if (conn->fd != INVALID_SOCKET) {
|
|
! (void)closesocket(conn->fd);
|
|
#ifdef DB_WIN32
|
|
! (void)WSACloseEvent(conn->event_object);
|
|
#endif
|
|
}
|
|
|
|
/*
|
|
* Deallocate any input and output buffers we may have.
|
|
*/
|
|
--- 613,643 ----
|
|
QUEUED_OUTPUT *out;
|
|
REPMGR_FLAT *msg;
|
|
DBT *dbt;
|
|
+ int ret;
|
|
|
|
db_rep = dbenv->rep_handle;
|
|
|
|
! DB_ASSERT(dbenv, F_ISSET(conn, CONN_DEFUNCT) || db_rep->finished);
|
|
!
|
|
if (conn->fd != INVALID_SOCKET) {
|
|
! ret = closesocket(conn->fd);
|
|
! conn->fd = INVALID_SOCKET;
|
|
! if (ret == SOCKET_ERROR) {
|
|
! ret = net_errno;
|
|
! __db_err(dbenv, ret, "closing socket");
|
|
! }
|
|
#ifdef DB_WIN32
|
|
! if (!WSACloseEvent(conn->event_object) && ret != 0)
|
|
! ret = net_errno;
|
|
#endif
|
|
+ if (ret != 0)
|
|
+ return (ret);
|
|
}
|
|
|
|
+ if (conn->blockers > 0)
|
|
+ return (__repmgr_signal(&conn->drained));
|
|
+
|
|
+ TAILQ_REMOVE(&db_rep->connections, conn, entries);
|
|
/*
|
|
* Deallocate any input and output buffers we may have.
|
|
*/
|
|
***************
|
|
*** 614,620 ****
|
|
--- 661,669 ----
|
|
__os_free(dbenv, out);
|
|
}
|
|
|
|
+ ret = __repmgr_free_cond(&conn->drained);
|
|
__os_free(dbenv, conn);
|
|
+ return (ret);
|
|
}
|
|
|
|
static int
|
|
***************
|
|
*** 1063,1069 ****
|
|
|
|
while (!TAILQ_EMPTY(&db_rep->connections)) {
|
|
conn = TAILQ_FIRST(&db_rep->connections);
|
|
! __repmgr_cleanup_connection(dbenv, conn);
|
|
}
|
|
|
|
for (i = 0; i < db_rep->site_cnt; i++) {
|
|
--- 1112,1118 ----
|
|
|
|
while (!TAILQ_EMPTY(&db_rep->connections)) {
|
|
conn = TAILQ_FIRST(&db_rep->connections);
|
|
! (void)__repmgr_cleanup_connection(dbenv, conn);
|
|
}
|
|
|
|
for (i = 0; i < db_rep->site_cnt; i++) {
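
An editorial sketch (not part of the patch) of the wait-time rule described in the comment above __repmgr_send_internal: the ACK timeout (or its default, when unset) serves as a rough per-message estimate and is scaled by the queue depth. The helper name drain_timeout and the local typedef are invented stand-ins for the example.

    #define US_PER_SEC                      1000000 /* microseconds per second */
    #define DB_REPMGR_DEFAULT_ACK_TIMEOUT   (1 * US_PER_SEC)
    #define OUT_QUEUE_LIMIT                 10

    typedef unsigned int db_timeout_t;      /* stand-in for the DB typedef */

    static db_timeout_t
    drain_timeout(db_timeout_t ack_timeout)
    {
        db_timeout_t per_msg;

        per_msg = ack_timeout == 0 ?
            DB_REPMGR_DEFAULT_ACK_TIMEOUT : ack_timeout;
        /* E.g., the 1-second default ACK timeout yields a 10-second wait. */
        return (per_msg * OUT_QUEUE_LIMIT);
    }
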
*** repmgr/repmgr_posix.c 2007-10-31 10:23:52.000000000 -0700
|
|
--- repmgr/repmgr_posix.c 2007-10-31 10:23:53.000000000 -0700
|
|
***************
|
|
*** 21,26 ****
|
|
--- 21,28 ----
|
|
size_t __repmgr_guesstimated_max = (128 * 1024);
|
|
#endif
|
|
|
|
+ static int __repmgr_conn_work __P((DB_ENV *,
|
|
+ REPMGR_CONNECTION *, fd_set *, fd_set *, int));
|
|
static int finish_connecting __P((DB_ENV *, REPMGR_CONNECTION *));
|
|
|
|
/*
|
|
***************
|
|
*** 189,194 ****
|
|
--- 191,284 ----
|
|
}
|
|
|
|
/*
|
|
+ * PUBLIC: int __repmgr_await_drain __P((DB_ENV *,
|
|
+ * PUBLIC: REPMGR_CONNECTION *, db_timeout_t));
|
|
+ *
|
|
+ * Waits for space to become available on the connection's output queue.
|
|
+ * Various ways we can exit:
|
|
+ *
|
|
+ * 1. queue becomes non-full
|
|
+ * 2. exceed time limit
|
|
+ * 3. connection becomes defunct (due to error in another thread)
|
|
+ * 4. repmgr is shutting down
|
|
+ * 5. any unexpected system resource failure
|
|
+ *
|
|
+ * In cases #3 and #5 we return an error code. Caller is responsible for
|
|
+ * distinguishing the remaining cases if desired.
|
|
+ *
|
|
+ * !!!
|
|
+ * Caller must hold repmgr->mutex.
|
|
+ */
|
|
+ int
|
|
+ __repmgr_await_drain(dbenv, conn, timeout)
|
|
+ DB_ENV *dbenv;
|
|
+ REPMGR_CONNECTION *conn;
|
|
+ db_timeout_t timeout;
|
|
+ {
|
|
+ DB_REP *db_rep;
|
|
+ struct timespec deadline;
|
|
+ int ret;
|
|
+
|
|
+ db_rep = dbenv->rep_handle;
|
|
+
|
|
+ __repmgr_compute_wait_deadline(dbenv, &deadline, timeout);
|
|
+
|
|
+ ret = 0;
|
|
+ while (conn->out_queue_length >= OUT_QUEUE_LIMIT) {
|
|
+ ret = pthread_cond_timedwait(&conn->drained,
|
|
+ &db_rep->mutex, &deadline);
|
|
+ switch (ret) {
|
|
+ case 0:
|
|
+ if (db_rep->finished)
|
|
+ goto out; /* #4. */
|
|
+ /*
|
|
+ * Another thread could have stumbled into an error on
|
|
+ * the socket while we were waiting.
|
|
+ */
|
|
+ if (F_ISSET(conn, CONN_DEFUNCT)) {
|
|
+ ret = DB_REP_UNAVAIL; /* #3. */
|
|
+ goto out;
|
|
+ }
|
|
+ break;
|
|
+ case ETIMEDOUT:
|
|
+ F_SET(conn, CONN_CONGESTED);
|
|
+ ret = 0;
|
|
+ goto out; /* #2. */
|
|
+ default:
|
|
+ goto out; /* #5. */
|
|
+ }
|
|
+ }
|
|
+ /* #1. */
|
|
+
|
|
+ out:
|
|
+ return (ret);
|
|
+ }
|
|
+
|
|
+ /*
|
|
+ * PUBLIC: int __repmgr_alloc_cond __P((cond_var_t *));
|
|
+ *
|
|
+ * Initialize a condition variable (in allocated space).
|
|
+ */
|
|
+ int
|
|
+ __repmgr_alloc_cond(c)
|
|
+ cond_var_t *c;
|
|
+ {
|
|
+ return (pthread_cond_init(c, NULL));
|
|
+ }
|
|
+
|
|
+ /*
|
|
+ * PUBLIC: int __repmgr_free_cond __P((cond_var_t *));
|
|
+ *
|
|
+ * Clean up a previously initialized condition variable.
|
|
+ */
|
|
+ int
|
|
+ __repmgr_free_cond(c)
|
|
+ cond_var_t *c;
|
|
+ {
|
|
+ return (pthread_cond_destroy(c));
|
|
+ }
|
|
+
|
|
+ /*
|
|
* PUBLIC: int __repmgr_init_sync __P((DB_ENV *, DB_REP *));
|
|
*
|
|
* Allocate/initialize all data necessary for thread synchronization. This
|
|
***************
|
|
*** 443,449 ****
|
|
REPMGR_RETRY *retry;
|
|
db_timespec timeout;
|
|
fd_set reads, writes;
|
|
! int ret, flow_control, maxfd, nready;
|
|
u_int8_t buf[10]; /* arbitrary size */
|
|
|
|
flow_control = FALSE;
|
|
--- 533,539 ----
|
|
REPMGR_RETRY *retry;
|
|
db_timespec timeout;
|
|
fd_set reads, writes;
|
|
! int ret, flow_control, maxfd;
|
|
u_int8_t buf[10]; /* arbitrary size */
|
|
|
|
flow_control = FALSE;
|
|
***************
|
|
*** 477,482 ****
|
|
--- 567,575 ----
|
|
* each one.
|
|
*/
|
|
TAILQ_FOREACH(conn, &db_rep->connections, entries) {
|
|
+ if (F_ISSET(conn, CONN_DEFUNCT))
|
|
+ continue;
|
|
+
|
|
if (F_ISSET(conn, CONN_CONNECTING)) {
|
|
FD_SET((u_int)conn->fd, &reads);
|
|
FD_SET((u_int)conn->fd, &writes);
|
|
***************
|
|
*** 533,616 ****
|
|
return (ret);
|
|
}
|
|
}
|
|
- nready = ret;
|
|
-
|
|
LOCK_MUTEX(db_rep->mutex);
|
|
|
|
- /*
|
|
- * The first priority thing we must do is to clean up any
|
|
- * pending defunct connections. Otherwise, if they have any
|
|
- * lingering pending input, we get very confused if we try to
|
|
- * process it.
|
|
- *
|
|
- * The TAILQ_FOREACH macro would be suitable here, except that
|
|
- * it doesn't allow unlinking the current element, which is
|
|
- * needed for cleanup_connection.
|
|
- */
|
|
- for (conn = TAILQ_FIRST(&db_rep->connections);
|
|
- conn != NULL;
|
|
- conn = next) {
|
|
- next = TAILQ_NEXT(conn, entries);
|
|
- if (F_ISSET(conn, CONN_DEFUNCT))
|
|
- __repmgr_cleanup_connection(dbenv, conn);
|
|
- }
|
|
-
|
|
if ((ret = __repmgr_retry_connections(dbenv)) != 0)
|
|
goto out;
|
|
- if (nready == 0)
|
|
- continue;
|
|
|
|
/*
|
|
! * Traverse the linked list. (Again, like TAILQ_FOREACH, except
|
|
! * that we need the ability to unlink an element along the way.)
|
|
*/
|
|
for (conn = TAILQ_FIRST(&db_rep->connections);
|
|
conn != NULL;
|
|
conn = next) {
|
|
next = TAILQ_NEXT(conn, entries);
|
|
! if (F_ISSET(conn, CONN_CONNECTING)) {
|
|
! if (FD_ISSET((u_int)conn->fd, &reads) ||
|
|
! FD_ISSET((u_int)conn->fd, &writes)) {
|
|
! if ((ret = finish_connecting(dbenv,
|
|
! conn)) == DB_REP_UNAVAIL) {
|
|
! if ((ret =
|
|
! __repmgr_bust_connection(
|
|
! dbenv, conn, TRUE)) != 0)
|
|
! goto out;
|
|
! } else if (ret != 0)
|
|
! goto out;
|
|
! }
|
|
! continue;
|
|
! }
|
|
!
|
|
! /*
|
|
! * Here, the site is connected, and the FD_SET's are
|
|
! * valid.
|
|
! */
|
|
! if (FD_ISSET((u_int)conn->fd, &writes)) {
|
|
! if ((ret = __repmgr_write_some(
|
|
! dbenv, conn)) == DB_REP_UNAVAIL) {
|
|
! if ((ret =
|
|
! __repmgr_bust_connection(dbenv,
|
|
! conn, TRUE)) != 0)
|
|
! goto out;
|
|
! continue;
|
|
! } else if (ret != 0)
|
|
! goto out;
|
|
! }
|
|
!
|
|
! if (!flow_control &&
|
|
! FD_ISSET((u_int)conn->fd, &reads)) {
|
|
! if ((ret = __repmgr_read_from_site(dbenv, conn))
|
|
! == DB_REP_UNAVAIL) {
|
|
! if ((ret =
|
|
! __repmgr_bust_connection(dbenv,
|
|
! conn, TRUE)) != 0)
|
|
! goto out;
|
|
! continue;
|
|
! } else if (ret != 0)
|
|
! goto out;
|
|
! }
|
|
}
|
|
|
|
/*
|
|
--- 626,650 ----
|
|
return (ret);
|
|
}
|
|
}
|
|
LOCK_MUTEX(db_rep->mutex);
|
|
|
|
if ((ret = __repmgr_retry_connections(dbenv)) != 0)
|
|
goto out;
|
|
|
|
/*
|
|
! * Examine each connection, to see what work needs to be done.
|
|
! *
|
|
! * The TAILQ_FOREACH macro would be suitable here, except that
|
|
! * it doesn't allow unlinking the current element, which is
|
|
! * needed for cleanup_connection.
|
|
*/
|
|
for (conn = TAILQ_FIRST(&db_rep->connections);
|
|
conn != NULL;
|
|
conn = next) {
|
|
next = TAILQ_NEXT(conn, entries);
|
|
! if ((ret = __repmgr_conn_work(dbenv,
|
|
! conn, &reads, &writes, flow_control)) != 0)
|
|
! goto out;
|
|
}
|
|
|
|
/*
|
|
***************
|
|
*** 637,642 ****
|
|
--- 671,719 ----
|
|
}
|
|
|
|
static int
|
|
+ __repmgr_conn_work(dbenv, conn, reads, writes, flow_control)
|
|
+ DB_ENV *dbenv;
|
|
+ REPMGR_CONNECTION *conn;
|
|
+ fd_set *reads, *writes;
|
|
+ int flow_control;
|
|
+ {
|
|
+ int ret;
|
|
+ u_int fd;
|
|
+
|
|
+ if (F_ISSET(conn, CONN_DEFUNCT)) {
|
|
+ /*
|
|
+ * Deferred clean-up, from an error that happened in another
|
|
+ * thread, while we were sleeping in select().
|
|
+ */
|
|
+ return (__repmgr_cleanup_connection(dbenv, conn));
|
|
+ }
|
|
+
|
|
+ ret = 0;
|
|
+ fd = (u_int)conn->fd;
|
|
+
|
|
+ if (F_ISSET(conn, CONN_CONNECTING)) {
|
|
+ if (FD_ISSET(fd, reads) || FD_ISSET(fd, writes))
|
|
+ ret = finish_connecting(dbenv, conn);
|
|
+ } else {
|
|
+ /*
|
|
+ * Here, the site is connected, and the FD_SET's are valid.
|
|
+ */
|
|
+ if (FD_ISSET(fd, writes))
|
|
+ ret = __repmgr_write_some(dbenv, conn);
|
|
+
|
|
+ if (ret == 0 && !flow_control && FD_ISSET(fd, reads))
|
|
+ ret = __repmgr_read_from_site(dbenv, conn);
|
|
+ }
|
|
+
|
|
+ if (ret == DB_REP_UNAVAIL) {
|
|
+ if ((ret = __repmgr_bust_connection(dbenv, conn)) != 0)
|
|
+ return (ret);
|
|
+ ret = __repmgr_cleanup_connection(dbenv, conn);
|
|
+ }
|
|
+ return (ret);
|
|
+ }
|
|
+
|
|
+ static int
|
|
finish_connecting(dbenv, conn)
|
|
DB_ENV *dbenv;
|
|
REPMGR_CONNECTION *conn;
|
|
***************
|
|
*** 657,662 ****
|
|
--- 734,740 ----
|
|
goto err_rpt;
|
|
}
|
|
|
|
+ DB_ASSERT(dbenv, F_ISSET(conn, CONN_CONNECTING));
|
|
F_CLR(conn, CONN_CONNECTING);
|
|
return (__repmgr_send_handshake(dbenv, conn));
|
|
|
|
***************
|
|
*** 671,690 ****
|
|
"connecting to %s", __repmgr_format_site_loc(site, buffer));
|
|
|
|
/* If we've exhausted the list of possible addresses, give up. */
|
|
! if (ADDR_LIST_NEXT(&site->net_addr) == NULL)
|
|
return (DB_REP_UNAVAIL);
|
|
|
|
/*
|
|
* This is just like a little mini-"bust_connection", except that we
|
|
* don't reschedule for later, 'cuz we're just about to try again right
|
|
! * now.
|
|
*
|
|
* !!!
|
|
* Which means this must only be called on the select() thread, since
|
|
* only there are we allowed to actually close a connection.
|
|
*/
|
|
DB_ASSERT(dbenv, !TAILQ_EMPTY(&db_rep->connections));
|
|
! __repmgr_cleanup_connection(dbenv, conn);
|
|
ret = __repmgr_connect_site(dbenv, eid);
|
|
DB_ASSERT(dbenv, ret != DB_REP_UNAVAIL);
|
|
return (ret);
|
|
--- 749,773 ----
|
|
"connecting to %s", __repmgr_format_site_loc(site, buffer));
|
|
|
|
/* If we've exhausted the list of possible addresses, give up. */
|
|
! if (ADDR_LIST_NEXT(&site->net_addr) == NULL) {
|
|
! STAT(db_rep->region->mstat.st_connect_fail++);
|
|
return (DB_REP_UNAVAIL);
|
|
+ }
|
|
|
|
/*
|
|
* This is just like a little mini-"bust_connection", except that we
|
|
* don't reschedule for later, 'cuz we're just about to try again right
|
|
! * now. (Note that we don't have to worry about message threads
|
|
! * blocking on a full output queue: that can't happen when we're only
|
|
! * just connecting.)
|
|
*
|
|
* !!!
|
|
* Which means this must only be called on the select() thread, since
|
|
* only there are we allowed to actually close a connection.
|
|
*/
|
|
DB_ASSERT(dbenv, !TAILQ_EMPTY(&db_rep->connections));
|
|
! if ((ret = __repmgr_cleanup_connection(dbenv, conn)) != 0)
|
|
! return (ret);
|
|
ret = __repmgr_connect_site(dbenv, eid);
|
|
DB_ASSERT(dbenv, ret != DB_REP_UNAVAIL);
|
|
return (ret);
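
An editorial sketch (not part of the patch) of the absolute deadline that __repmgr_compute_wait_deadline hands to pthread_cond_timedwait() in the repmgr_posix.c code above: the current time plus a relative timeout given in microseconds. The real helper also honors the clock the condition variable was configured with; this simplified version assumes CLOCK_REALTIME.

    #include <time.h>

    #define US_PER_SEC      1000000
    #define NS_PER_US       1000
    #define NS_PER_SEC      1000000000L

    static void
    compute_deadline(struct timespec *deadline, unsigned int timeout_us)
    {
        (void)clock_gettime(CLOCK_REALTIME, deadline);
        deadline->tv_sec += timeout_us / US_PER_SEC;
        deadline->tv_nsec += (long)(timeout_us % US_PER_SEC) * NS_PER_US;
        if (deadline->tv_nsec >= NS_PER_SEC) {
            deadline->tv_sec++;
            deadline->tv_nsec -= NS_PER_SEC;
        }
    }
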
*** repmgr/repmgr_sel.c 2007-10-31 10:23:52.000000000 -0700
|
|
--- repmgr/repmgr_sel.c 2007-10-31 10:23:53.000000000 -0700
|
|
***************
|
|
*** 36,45 ****
|
|
|
|
/*
|
|
* PUBLIC: int __repmgr_accept __P((DB_ENV *));
|
|
- *
|
|
- * !!!
|
|
- * Only ever called in the select() thread, since we may call
|
|
- * __repmgr_bust_connection(..., TRUE).
|
|
*/
|
|
int
|
|
__repmgr_accept(dbenv)
|
|
--- 36,41 ----
|
|
***************
|
|
*** 133,139 ****
|
|
case 0:
|
|
return (0);
|
|
case DB_REP_UNAVAIL:
|
|
! return (__repmgr_bust_connection(dbenv, conn, TRUE));
|
|
default:
|
|
return (ret);
|
|
}
|
|
--- 129,135 ----
|
|
case 0:
|
|
return (0);
|
|
case DB_REP_UNAVAIL:
|
|
! return (__repmgr_bust_connection(dbenv, conn));
|
|
default:
|
|
return (ret);
|
|
}
|
|
***************
|
|
*** 254,263 ****
|
|
* starting with the "current" element of its address list and trying as many
|
|
* addresses as necessary until the list is exhausted.
|
|
*
|
|
- * !!!
|
|
- * Only ever called in the select() thread, since we may call
|
|
- * __repmgr_bust_connection(..., TRUE).
|
|
- *
|
|
* PUBLIC: int __repmgr_connect_site __P((DB_ENV *, u_int eid));
|
|
*/
|
|
int
|
|
--- 250,255 ----
|
|
***************
|
|
*** 332,338 ****
|
|
case 0:
|
|
break;
|
|
case DB_REP_UNAVAIL:
|
|
! return (__repmgr_bust_connection(dbenv, con, TRUE));
|
|
default:
|
|
return (ret);
|
|
}
|
|
--- 324,330 ----
|
|
case 0:
|
|
break;
|
|
case DB_REP_UNAVAIL:
|
|
! return (__repmgr_bust_connection(dbenv, con));
|
|
default:
|
|
return (ret);
|
|
}
|
|
***************
|
|
*** 437,443 ****
|
|
|
|
DB_SET_DBT(rec, my_addr->host, strlen(my_addr->host) + 1);
|
|
|
|
! return (__repmgr_send_one(dbenv, conn, REPMGR_HANDSHAKE, &cntrl, &rec));
|
|
}
|
|
|
|
/*
|
|
--- 429,443 ----
|
|
|
|
DB_SET_DBT(rec, my_addr->host, strlen(my_addr->host) + 1);
|
|
|
|
! /*
|
|
! * It would of course be disastrous to block the select() thread, so
|
|
! * pass the "blockable" argument as FALSE. Fortunately blocking should
|
|
! * never be necessary here, because the handshake is always the first
! * thing we send. That is just as well, because it would be almost as
! * disastrous if we allowed ourselves to drop a handshake.
|
|
! */
|
|
! return (__repmgr_send_one(dbenv,
|
|
! conn, REPMGR_HANDSHAKE, &cntrl, &rec, FALSE));
|
|
}
|
|
|
|
/*
|
|
***************
|
|
*** 854,859 ****
|
|
--- 854,872 ----
|
|
conn->out_queue_length--;
|
|
if (--msg->ref_count <= 0)
|
|
__os_free(dbenv, msg);
|
|
+
|
|
+ /*
|
|
+ * We've achieved enough movement to free up at least
|
|
+ * one space in the outgoing queue. Wake any message
|
|
+ * threads that may be waiting for space. Clear the
|
|
+ * CONGESTED status so that when the queue reaches the
|
|
+ * high-water mark again, the filling thread will be
|
|
+ * allowed to try waiting again.
|
|
+ */
|
|
+ F_CLR(conn, CONN_CONGESTED);
|
|
+ if (conn->blockers > 0 &&
|
|
+ (ret = __repmgr_signal(&conn->drained)) != 0)
|
|
+ return (ret);
|
|
}
|
|
}
|
|
|
|
*** repmgr/repmgr_util.c 2007-10-31 10:23:52.000000000 -0700
|
|
--- repmgr/repmgr_util.c 2007-10-31 10:23:53.000000000 -0700
|
|
***************
|
|
*** 103,108 ****
|
|
--- 103,113 ----
|
|
db_rep = dbenv->rep_handle;
|
|
if ((ret = __os_malloc(dbenv, sizeof(REPMGR_CONNECTION), &c)) != 0)
|
|
return (ret);
|
|
+ if ((ret = __repmgr_alloc_cond(&c->drained)) != 0) {
|
|
+ __os_free(dbenv, c);
|
|
+ return (ret);
|
|
+ }
|
|
+ c->blockers = 0;
|
|
|
|
c->fd = s;
|
|
c->flags = flags;
|
|
*** repmgr/repmgr_windows.c 2007-10-31 10:23:52.000000000 -0700
|
|
--- repmgr/repmgr_windows.c 2007-10-31 10:23:53.000000000 -0700
|
|
***************
|
|
*** 11,16 ****
|
|
--- 11,19 ----
|
|
#define __INCLUDE_NETWORKING 1
|
|
#include "db_int.h"
|
|
|
|
+ /* Convert time-out from microseconds to milliseconds, rounding up. */
|
|
+ #define DB_TIMEOUT_TO_WINDOWS_TIMEOUT(t) (((t) + (US_PER_MS - 1)) / US_PER_MS)
|
|
+
|
|
typedef struct __ack_waiter {
|
|
HANDLE event;
|
|
const DB_LSN *lsnp;
|
|
***************
|
|
*** 120,136 ****
|
|
{
|
|
DB_REP *db_rep;
|
|
ACK_WAITER *me;
|
|
! DWORD ret;
|
|
! DWORD timeout;
|
|
|
|
db_rep = dbenv->rep_handle;
|
|
|
|
if ((ret = allocate_wait_slot(dbenv, &me)) != 0)
|
|
goto err;
|
|
|
|
- /* convert time-out from microseconds to milliseconds, rounding up */
|
|
timeout = db_rep->ack_timeout > 0 ?
|
|
! ((db_rep->ack_timeout + (US_PER_MS - 1)) / US_PER_MS) : INFINITE;
|
|
me->lsnp = lsnp;
|
|
if ((ret = SignalObjectAndWait(db_rep->mutex, me->event, timeout,
|
|
FALSE)) == WAIT_FAILED) {
|
|
--- 123,137 ----
|
|
{
|
|
DB_REP *db_rep;
|
|
ACK_WAITER *me;
|
|
! DWORD ret, timeout;
|
|
|
|
db_rep = dbenv->rep_handle;
|
|
|
|
if ((ret = allocate_wait_slot(dbenv, &me)) != 0)
|
|
goto err;
|
|
|
|
timeout = db_rep->ack_timeout > 0 ?
|
|
! DB_TIMEOUT_TO_WINDOWS_TIMEOUT(db_rep->ack_timeout) : INFINITE;
|
|
me->lsnp = lsnp;
|
|
if ((ret = SignalObjectAndWait(db_rep->mutex, me->event, timeout,
|
|
FALSE)) == WAIT_FAILED) {
|
|
***************
|
|
*** 211,216 ****
|
|
--- 212,296 ----
|
|
db_rep->waiters->first_free = slot;
|
|
}
|
|
|
|
+ /* (See requirements described in repmgr_posix.c.) */
|
|
+ int
|
|
+ __repmgr_await_drain(dbenv, conn, timeout)
|
|
+ DB_ENV *dbenv;
|
|
+ REPMGR_CONNECTION *conn;
|
|
+ db_timeout_t timeout;
|
|
+ {
|
|
+ DB_REP *db_rep;
|
|
+ db_timespec deadline, delta, now;
|
|
+ db_timeout_t t;
|
|
+ DWORD duration, ret;
|
|
+ int round_up;
|
|
+
|
|
+ db_rep = dbenv->rep_handle;
|
|
+
|
|
+ __os_gettime(dbenv, &deadline);
|
|
+ DB_TIMEOUT_TO_TIMESPEC(timeout, &delta);
|
|
+ timespecadd(&deadline, &delta);
|
|
+
|
|
+ while (conn->out_queue_length >= OUT_QUEUE_LIMIT) {
|
|
+ if (!ResetEvent(conn->drained))
|
|
+ return (GetLastError());
|
|
+
|
|
+ /* How long until the deadline? */
|
|
+ __os_gettime(dbenv, &now);
|
|
+ if (timespeccmp(&now, &deadline, >=)) {
|
|
+ F_SET(conn, CONN_CONGESTED);
|
|
+ return (0);
|
|
+ }
|
|
+ delta = deadline;
|
|
+ timespecsub(&delta, &now);
|
|
+ round_up = TRUE;
|
|
+ DB_TIMESPEC_TO_TIMEOUT(t, &delta, round_up);
|
|
+ duration = DB_TIMEOUT_TO_WINDOWS_TIMEOUT(t);
|
|
+
|
|
+ ret = SignalObjectAndWait(db_rep->mutex,
|
|
+ conn->drained, duration, FALSE);
|
|
+ LOCK_MUTEX(db_rep->mutex);
|
|
+ if (ret == WAIT_FAILED)
|
|
+ return (GetLastError());
|
|
+ else if (ret == WAIT_TIMEOUT) {
|
|
+ F_SET(conn, CONN_CONGESTED);
|
|
+ return (0);
|
|
+ } else
|
|
+ DB_ASSERT(dbenv, ret == WAIT_OBJECT_0);
|
|
+
|
|
+ if (db_rep->finished)
|
|
+ return (0);
|
|
+ if (F_ISSET(conn, CONN_DEFUNCT))
|
|
+ return (DB_REP_UNAVAIL);
|
|
+ }
|
|
+ return (0);
|
|
+ }
|
|
+
|
|
+ /*
|
|
+ * Creates a manual reset event, which is usually our best choice when we may
|
|
+ * have multiple threads waiting on a single event.
|
|
+ */
|
|
+ int
|
|
+ __repmgr_alloc_cond(c)
|
|
+ cond_var_t *c;
|
|
+ {
|
|
+ HANDLE event;
|
|
+
|
|
+ if ((event = CreateEvent(NULL, TRUE, FALSE, NULL)) == NULL)
|
|
+ return (GetLastError());
|
|
+ *c = event;
|
|
+ return (0);
|
|
+ }
|
|
+
|
|
+ int
|
|
+ __repmgr_free_cond(c)
|
|
+ cond_var_t *c;
|
|
+ {
|
|
+ if (CloseHandle(*c))
|
|
+ return (0);
|
|
+ return (GetLastError());
|
|
+ }
|
|
+
|
|
/*
|
|
* Make resource allocation an all-or-nothing affair, outside of this and the
|
|
* close_sync function. db_rep->waiters should be non-NULL iff all of these
|
|
***************
|
|
*** 488,493 ****
|
|
--- 568,576 ----
|
|
* don't hurt anything flow-control-wise.
|
|
*/
|
|
TAILQ_FOREACH(conn, &db_rep->connections, entries) {
|
|
+ if (F_ISSET(conn, CONN_DEFUNCT))
|
|
+ continue;
|
|
+
|
|
if (F_ISSET(conn, CONN_CONNECTING) ||
|
|
!STAILQ_EMPTY(&conn->outbound_queue) ||
|
|
(!flow_control || !IS_VALID_EID(conn->eid))) {
|
|
***************
|
|
*** 534,541 ****
|
|
conn != NULL;
|
|
conn = next) {
|
|
next = TAILQ_NEXT(conn, entries);
|
|
! if (F_ISSET(conn, CONN_DEFUNCT))
|
|
! __repmgr_cleanup_connection(dbenv, conn);
|
|
}
|
|
|
|
/*
|
|
--- 617,626 ----
|
|
conn != NULL;
|
|
conn = next) {
|
|
next = TAILQ_NEXT(conn, entries);
|
|
! if (F_ISSET(conn, CONN_DEFUNCT) &&
|
|
! (ret = __repmgr_cleanup_connection(dbenv,
|
|
! conn)) != 0)
|
|
! goto unlock;
|
|
}
|
|
|
|
/*
|
|
***************
|
|
*** 587,597 ****
|
|
return (ret);
|
|
}
|
|
|
|
- /*
|
|
- * !!!
|
|
- * Only ever called on the select() thread, since we may call
|
|
- * __repmgr_bust_connection(..., TRUE).
|
|
- */
|
|
static int
|
|
handle_completion(dbenv, conn)
|
|
DB_ENV *dbenv;
|
|
--- 672,677 ----
|
|
***************
|
|
*** 651,660 ****
|
|
}
|
|
}
|
|
|
|
! return (0);
|
|
!
|
|
! err: if (ret == DB_REP_UNAVAIL)
|
|
! return (__repmgr_bust_connection(dbenv, conn, TRUE));
|
|
return (ret);
|
|
}
|
|
|
|
--- 731,742 ----
|
|
}
|
|
}
|
|
|
|
! err:
|
|
! if (ret == DB_REP_UNAVAIL) {
|
|
! if ((ret = __repmgr_bust_connection(dbenv, conn)) != 0)
|
|
! return (ret);
|
|
! ret = __repmgr_cleanup_connection(dbenv, conn);
|
|
! }
|
|
return (ret);
|
|
}
|
|
|
|
***************
|
|
*** 708,714 ****
|
|
}
|
|
|
|
DB_ASSERT(dbenv, !TAILQ_EMPTY(&db_rep->connections));
|
|
! __repmgr_cleanup_connection(dbenv, conn);
|
|
ret = __repmgr_connect_site(dbenv, eid);
|
|
DB_ASSERT(dbenv, ret != DB_REP_UNAVAIL);
|
|
return (ret);
|
|
--- 790,797 ----
|
|
}
|
|
|
|
DB_ASSERT(dbenv, !TAILQ_EMPTY(&db_rep->connections));
|
|
! if ((ret = __repmgr_cleanup_connection(dbenv, conn)) != 0)
|
|
! return (ret);
|
|
ret = __repmgr_connect_site(dbenv, eid);
|
|
DB_ASSERT(dbenv, ret != DB_REP_UNAVAIL);
|
|
return (ret);
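
An editorial sketch (not part of the patch) of why the Windows code above creates the "drained" event as manual-reset: SetEvent() on a manual-reset event releases every current waiter, giving the same broadcast behavior as the POSIX condition variable, and the event stays signaled until explicitly reset. The function names below are invented; note that the real __repmgr_await_drain avoids the reset/wait race by re-checking the queue length under the repmgr mutex and releasing that mutex atomically via SignalObjectAndWait.

    #include <windows.h>

    static HANDLE
    make_drained_event(void)
    {
        /* bManualReset = TRUE, bInitialState = FALSE (non-signaled). */
        return (CreateEvent(NULL, TRUE, FALSE, NULL));
    }

    static void
    wake_all_blocked_senders(HANDLE drained)
    {
        (void)SetEvent(drained);    /* releases all threads waiting on it */
    }

    static DWORD
    wait_for_drain(HANDLE drained, DWORD timeout_ms)
    {
        (void)ResetEvent(drained);  /* re-arm before waiting */
        return (WaitForSingleObject(drained, timeout_ms));
    }
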