rpm/db/repmgr/repmgr_util.c

427 lines
10 KiB
C

/*-
* See the file LICENSE for redistribution information.
*
* Copyright (c) 2005,2007 Oracle. All rights reserved.
*
* $Id: repmgr_util.c,v 1.34 2007/06/11 18:29:34 alanb Exp $
*/
#include "db_config.h"
#define __INCLUDE_NETWORKING 1
#include "db_int.h"
/*
 * __repmgr_schedule_connection_attempt --
 *	Queue a future attempt to (re-)establish a connection with the site
 *	identified by "eid".
 *
 * Normally the attempt is stamped with the current time plus the configured
 * connection_retry_wait and appended to the tail of the retry queue.  When
 * "immediate" is TRUE the attempt is stamped with the current time (a wait of
 * 0) and placed at the _head_ of the queue instead.  Between them, these two
 * rules keep the queue in time order without ever having to sort it.
 *
 * The site is put in the SITE_IDLE state and made to refer to the new retry
 * entry.
 *
 * Returns the allocation error if the retry entry cannot be allocated,
 * otherwise whatever __repmgr_wake_main_thread returns.
 *
 * PUBLIC: int __repmgr_schedule_connection_attempt __P((DB_ENV *, u_int, int));
 *
 * !!!
 * Caller should hold mutex.
 *
 * Unless an error occurs, we always attempt to wake the main thread;
 * __repmgr_bust_connection relies on this behavior.
 */
int
__repmgr_schedule_connection_attempt(dbenv, eid, immediate)
	DB_ENV *dbenv;
	u_int eid;
	int immediate;
{
	DB_REP *db_rep;
	REPMGR_RETRY *retry;
	REPMGR_SITE *site;
	db_timespec t, v;
	int ret;

	db_rep = dbenv->rep_handle;

	ret = __os_malloc(dbenv, sizeof(*retry), &retry);
	if (ret != 0)
		return (ret);

	/* "t" starts as "now"; a deferred attempt pushes it out by the wait. */
	__os_gettime(dbenv, &t);
	if (!immediate) {
		DB_TIMEOUT_TO_TIMESPEC(db_rep->connection_retry_wait, &v);
		timespecadd(&t, &v);
		TAILQ_INSERT_TAIL(&db_rep->retries, retry, entries);
	} else {
		/* Due right away, so it belongs ahead of everything queued. */
		TAILQ_INSERT_HEAD(&db_rep->retries, retry, entries);
	}
	retry->time = t;
	retry->eid = eid;

	/* Point the site at its pending retry. */
	site = SITE_FROM_EID(eid);
	site->ref.retry = retry;
	site->state = SITE_IDLE;

	return (__repmgr_wake_main_thread(dbenv));
}
/*
 * __repmgr_reset_for_reading --
 *	Set up a connection's control structures to begin reading a new input
 *	message.
 *
 * The connection's iovec set is emptied and then loaded with three buffers,
 * in this order: the message type, the control size buffer and the rec size
 * buffer.  The connection is placed in SIZES_PHASE.
 *
 * PUBLIC: void __repmgr_reset_for_reading __P((REPMGR_CONNECTION *));
 */
void
__repmgr_reset_for_reading(con)
	REPMGR_CONNECTION *con;
{
	/* Discard whatever was queued for the previous message. */
	__repmgr_iovec_init(&con->iovecs);

	/* The order of these three buffers is significant; keep it. */
	__repmgr_add_buffer(&con->iovecs,
	    &con->msg_type, sizeof(con->msg_type));
	__repmgr_add_buffer(&con->iovecs,
	    &con->control_size_buf, sizeof(con->control_size_buf));
	__repmgr_add_buffer(&con->iovecs,
	    &con->rec_size_buf, sizeof(con->rec_size_buf));

	con->reading_phase = SIZES_PHASE;
}
/*
 * __repmgr_new_connection --
 *	Allocate a REPMGR_CONNECTION for socket "s", prepare it for reading,
 *	and append it to the environment's main list of connections.
 *
 * Only the socket, the flags, the (empty) outbound queue and the read state
 * are set up here.  The eid is deliberately left uninitialized, since it
 * isn't needed and/or immediately known in all cases.
 *
 * On success the new connection is returned through "connp" and 0 is
 * returned; if the allocation fails its error code is returned and "connp"
 * is not written.
 *
 * PUBLIC: int __repmgr_new_connection __P((DB_ENV *, REPMGR_CONNECTION **,
 * PUBLIC:				   socket_t, u_int32_t));
 */
int
__repmgr_new_connection(dbenv, connp, s, flags)
	DB_ENV *dbenv;
	REPMGR_CONNECTION **connp;
	socket_t s;
	u_int32_t flags;
{
	DB_REP *db_rep;
	REPMGR_CONNECTION *c;
	int ret;

	ret = __os_malloc(dbenv, sizeof(*c), &c);
	if (ret != 0)
		return (ret);

	c->fd = s;
	c->flags = flags;

	/* Nothing is waiting to be sent yet. */
	c->out_queue_length = 0;
	STAILQ_INIT(&c->outbound_queue);

	__repmgr_reset_for_reading(c);

	db_rep = dbenv->rep_handle;
	TAILQ_INSERT_TAIL(&db_rep->connections, c, entries);

	*connp = c;
	return (0);
}
/*
 * __repmgr_new_site --
 *	Add an entry for a new site to the environment's site table, growing
 *	the table first if it is full.
 *
 * The entry's network address is a byte-wise copy of "*addr", its max_ack
 * LSN is zeroed, its priority is set to the out-of-band value -1 and its
 * state is set to "state".  The new site's EID is its index in the table.
 *
 * On success the new entry is returned through "sitep" and 0 is returned; if
 * the table cannot be grown, the error from __os_realloc is returned and the
 * table is left as it was.
 *
 * NOTE(review): if __os_realloc behaves like realloc(3), growing the table
 * may move it, invalidating REPMGR_SITE pointers obtained earlier -- confirm
 * against callers.
 *
 * PUBLIC: int __repmgr_new_site __P((DB_ENV *, REPMGR_SITE**,
 * PUBLIC:     const repmgr_netaddr_t *, int));
 *
 * !!!
 * Caller must hold mutex.
 */
int
__repmgr_new_site(dbenv, sitep, addr, state)
	DB_ENV *dbenv;
	REPMGR_SITE **sitep;
	const repmgr_netaddr_t *addr;
	int state;
{
	DB_REP *db_rep;
	REPMGR_SITE *site;
	SITE_STRING_BUFFER buffer;
	u_int eid, new_site_max;
	int ret;

	db_rep = dbenv->rep_handle;

	/* Table full (or not yet allocated): double it, or start it off. */
	if (db_rep->site_cnt >= db_rep->site_max) {
#define	INITIAL_SITES_ALLOCATION	10		/* Arbitrary guess. */
		if (db_rep->site_max == 0)
			new_site_max = INITIAL_SITES_ALLOCATION;
		else
			new_site_max = db_rep->site_max * 2;

		ret = __os_realloc(dbenv,
		    sizeof(REPMGR_SITE) * new_site_max, &db_rep->sites);
		if (ret != 0)
			return (ret);
		db_rep->site_max = new_site_max;
	}

	/* Claim the next free slot; its index is the site's EID. */
	eid = db_rep->site_cnt;
	db_rep->site_cnt++;
	site = &db_rep->sites[eid];

	memcpy(&site->net_addr, addr, sizeof(*addr));
	ZERO_LSN(site->max_ack);
	site->priority = -1;	/* OOB value indicates we don't yet know. */
	site->state = state;

	RPRINT(dbenv, (dbenv, "EID %u is assigned for %s", eid,
	    __repmgr_format_site_loc(site, buffer)));

	*sitep = site;
	return (0);
}
/*
 * __repmgr_cleanup_netaddr --
 *	Destructor for a repmgr_netaddr_t: releases any allocated memory that
 *	the address points to (the host name and the address list) and resets
 *	the corresponding pointers to NULL.
 *
 * The repmgr_netaddr_t structure itself is not freed.  Members that are
 * already NULL are left alone, so calling this again on an address that has
 * been cleaned up releases nothing further.
 *
 * PUBLIC: void __repmgr_cleanup_netaddr __P((DB_ENV *, repmgr_netaddr_t *));
 */
void
__repmgr_cleanup_netaddr(dbenv, addr)
	DB_ENV *dbenv;
	repmgr_netaddr_t *addr;
{
	if (addr->host != NULL) {
		__os_free(dbenv, addr->host);
		addr->host = NULL;
	}

	if (addr->address_list != NULL) {
		__db_freeaddrinfo(dbenv, addr->address_list);
		/* "current" is cleared together with the list. */
		addr->current = NULL;
		addr->address_list = NULL;
	}
}
/*
 * __repmgr_iovec_init --
 *	Reset a set of iovecs to the empty state: no vectors in use, the
 *	starting offset at the first vector, and a total of zero bytes.
 *
 * PUBLIC: void __repmgr_iovec_init __P((REPMGR_IOVECS *));
 */
void
__repmgr_iovec_init(v)
	REPMGR_IOVECS *v;
{
	v->total_bytes = 0;
	v->count = 0;
	v->offset = 0;
}
/*
 * __repmgr_add_buffer --
 *	Append a buffer of "length" bytes starting at "address" to the set of
 *	iovecs, and add its length to the running byte total.
 *
 * PUBLIC: void __repmgr_add_buffer __P((REPMGR_IOVECS *, void *, size_t));
 *
 * !!!
 * There is no checking for overflow of the vectors[5] array.
 */
void
__repmgr_add_buffer(v, address, length)
	REPMGR_IOVECS *v;
	void *address;
	size_t length;
{
	v->total_bytes += length;

	/* Fill in the next unused vector, then count it as in use. */
	v->vectors[v->count].iov_len = length;
	v->vectors[v->count].iov_base = address;
	v->count++;
}
/*
 * __repmgr_add_dbt --
 *	Append the data described by a DBT (its "data" pointer and "size") to
 *	the set of iovecs, and add its size to the running byte total.
 *
 * As with __repmgr_add_buffer, no check is made here that an unused vector
 * is still available.
 *
 * PUBLIC: void __repmgr_add_dbt __P((REPMGR_IOVECS *, const DBT *));
 */
void
__repmgr_add_dbt(v, dbt)
	REPMGR_IOVECS *v;
	const DBT *dbt;
{
	v->total_bytes += dbt->size;

	/* Fill in the next unused vector, then count it as in use. */
	v->vectors[v->count].iov_len = dbt->size;
	v->vectors[v->count].iov_base = dbt->data;
	v->count++;
}
/*
 * __repmgr_update_consumed --
 *	Update a set of iovecs to reflect the number of bytes transferred in
 *	an I/O operation, so that the iovecs can be used to continue
 *	transferring where we left off.
 *
 * Starting at the current offset, whole vectors are skipped for as long as
 * "byte_count" exceeds their length.  The vector in which the transfer ended
 * then has its length reduced and, if anything is left in it, its base
 * address advanced.  The offset is left pointing at the first vector that
 * still has data to transfer.
 *
 * Returns TRUE if the set of buffers is now fully consumed, FALSE if more
 * remains.
 *
 * PUBLIC: int __repmgr_update_consumed __P((REPMGR_IOVECS *, size_t));
 */
int
__repmgr_update_consumed(v, byte_count)
	REPMGR_IOVECS *v;
	size_t byte_count;
{
	db_iovec_t *iov;
	int i;

	/* Step over every vector that the transfer covered (more than) fully. */
	i = v->offset;
	for (;;) {
		DB_ASSERT(NULL, i < v->count && byte_count > 0);
		iov = &v->vectors[i];
		if (byte_count <= iov->iov_len)
			break;
		byte_count -= iov->iov_len;
		i++;
	}

	/* The transfer ended inside, or exactly at the end of, this vector. */
	iov->iov_len -= byte_count;
	if (iov->iov_len > 0) {
		/*
		 * Part of the vector remains: move its base past the bytes
		 * already transferred and resume here next time.
		 */
		iov->iov_base = (void *)((u_int8_t *)iov->iov_base + byte_count);
		v->offset = i;
	} else {
		/* Ended exactly on a vector boundary: resume at the next. */
		v->offset = i + 1;
	}

	/* Once the offset has reached the count, nothing is left. */
	return (v->offset >= v->count);
}
/*
 * __repmgr_prepare_my_addr --
 *	Build a buffer containing our network address information, suitable
 *	for publishing as cdata via a call to rep_start, and set up the given
 *	DBT to point to it.
 *
 * The buffer is dynamically allocated memory, and the caller must assume
 * responsibility for it.  Returns 0 on success, or the allocation error, in
 * which case the DBT is not touched.
 *
 * PUBLIC: int __repmgr_prepare_my_addr __P((DB_ENV *, DBT *));
 */
int
__repmgr_prepare_my_addr(dbenv, dbt)
	DB_ENV *dbenv;
	DBT *dbt;
{
	DB_REP *db_rep;
	size_t hlen, size;
	u_int16_t port_buffer;
	u_int8_t *ptr;
	int ret;

	db_rep = dbenv->rep_handle;

	/*
	 * Layout of the cdata message: the 2-byte port number, in network
	 * byte order, followed by the host name string including its
	 * terminating null byte.
	 */
	port_buffer = htons(db_rep->my_addr.port);
	hlen = strlen(db_rep->my_addr.host) + 1;
	size = sizeof(port_buffer) + hlen;

	ret = __os_malloc(dbenv, size, &ptr);
	if (ret != 0)
		return (ret);
	DB_INIT_DBT(*dbt, ptr, size);

	memcpy(ptr, &port_buffer, sizeof(port_buffer));
	memcpy(ptr + sizeof(port_buffer), db_rep->my_addr.host, hlen);

	return (0);
}
/*
* Provide the appropriate value for nsites, the number of sites in the
* replication group. If the application has specified a value, use that.
* Otherwise, just use the number of sites we know of.
*
* PUBLIC: u_int __repmgr_get_nsites __P((DB_REP *));
*/
u_int
__repmgr_get_nsites(db_rep)
DB_REP *db_rep;
{
if (db_rep->config_nsites > 0)
return ((u_int)db_rep->config_nsites);
/*
* The number of other sites in our table, plus 1 to count ourself.
*/
return (db_rep->site_cnt + 1);
}
/*
* __repmgr_thread_failure --
* React to a failure described by the error code "why": first ask the
* repmgr threads to stop, then hand "why" to __db_panic.
*
* The return values of both calls are deliberately discarded.
*
* PUBLIC: void __repmgr_thread_failure __P((DB_ENV *, int));
*/
void
__repmgr_thread_failure(dbenv, why)
DB_ENV *dbenv;
int why;
{
/* Stop the threads before raising the panic; the order is kept as is. */
(void)__repmgr_stop_threads(dbenv);
(void)__db_panic(dbenv, why);
}
/*
 * __repmgr_format_eid_loc --
 *	Format a printable representation of the location of the site with
 *	the given EID, suitable for inclusion in an error message.
 *
 * For a valid EID the site's location is formatted by
 * __repmgr_format_site_loc; for any other EID the text
 * "(unidentified site)" is produced.  The buffer must be at least as big as
 * MAX_SITE_LOC_STRING.  Returns "buffer".
 *
 * PUBLIC: char *__repmgr_format_eid_loc __P((DB_REP *, int, char *));
 */
char *
__repmgr_format_eid_loc(db_rep, eid, buffer)
	DB_REP *db_rep;
	int eid;
	char *buffer;
{
	char *loc;

	if (IS_VALID_EID(eid))
		loc = __repmgr_format_site_loc(SITE_FROM_EID(eid), buffer);
	else {
		snprintf(buffer, MAX_SITE_LOC_STRING, "(unidentified site)");
		loc = buffer;
	}
	return (loc);
}
/*
* PUBLIC: char *__repmgr_format_site_loc __P((REPMGR_SITE *, char *));
*/
char *
__repmgr_format_site_loc(site, buffer)
REPMGR_SITE *site;
char *buffer;
{
snprintf(buffer, MAX_SITE_LOC_STRING, "site %s:%lu",
site->net_addr.host, (u_long)site->net_addr.port);
return (buffer);
}
/*
 * __repmgr_timespec_diff_now --
 *	Calculate the time duration from now til "when".
 *
 * The duration is stored in "result".  If "when" is not in the future (the
 * current time is at or past it), the result is cleared rather than being
 * made negative.
 *
 * PUBLIC: void __repmgr_timespec_diff_now
 * PUBLIC:     __P((DB_ENV *, db_timespec *, db_timespec *));
 */
void
__repmgr_timespec_diff_now(dbenv, when, result)
	DB_ENV *dbenv;
	db_timespec *when, *result;
{
	db_timespec now;

	__os_gettime(dbenv, &now);

	/* Already due: there is no time left to wait. */
	if (timespeccmp(&now, when, >=)) {
		timespecclear(result);
		return;
	}

	/* result = when - now */
	*result = *when;
	timespecsub(result, &now);
}
/*
 * __repmgr_repstart --
 *	Call __rep_start with the given flags, passing a freshly built copy
 *	of our own network address (see __repmgr_prepare_my_addr) as the DBT
 *	argument.
 *
 * The address buffer is freed as soon as __rep_start returns, whether or not
 * it succeeded.  A failure of __rep_start is reported through __db_err.
 * Returns 0 on success, otherwise the error from building the address or
 * from __rep_start.
 *
 * PUBLIC: int __repmgr_repstart __P((DB_ENV *, u_int32_t));
 */
int
__repmgr_repstart(dbenv, flags)
	DB_ENV *dbenv;
	u_int32_t flags;
{
	DBT my_addr;
	int ret;

	ret = __repmgr_prepare_my_addr(dbenv, &my_addr);
	if (ret != 0)
		return (ret);

	ret = __rep_start(dbenv, &my_addr, flags);

	/* We own the address buffer; it is no longer needed after the call. */
	__os_free(dbenv, my_addr.data);

	if (ret == 0)
		return (0);

	__db_err(dbenv, ret, "rep_start");
	return (ret);
}