rpm/db/rep/rep_record.c

1024 lines
28 KiB
C

/*-
* See the file LICENSE for redistribution information.
*
* Copyright (c) 2001
* Sleepycat Software. All rights reserved.
*/
#include "db_config.h"
#ifndef lint
static const char revid[] = "Id: rep_record.c,v 1.64 2001/11/16 16:29:10 bostic Exp ";
#endif /* not lint */
#ifndef NO_SYSTEM_INCLUDES
#include <string.h>
#endif
#include "db_int.h"
#include "log.h"
#include "txn.h"
#include "rep.h"
#include "db_page.h"
#include "db_am.h"
#include "db_shash.h"
#include "lock.h"
static int __rep_apply __P((DB_ENV *, REP_CONTROL *, DBT *));
static int __rep_bt_cmp __P((DB *, const DBT *, const DBT *));
static int __rep_process_txn __P((DB_ENV *, DBT *));
/*
 * IS_SIMPLE --
 *	True for log record types that only need to be written to the log.
 *	Transaction records (DB_txn_regop), checkpoints (DB_txn_ckp) and
 *	DB_log_register records are not simple: __rep_apply must act on them
 *	as soon as they have been written.
 */
#define IS_SIMPLE(R) \
((R) != DB_txn_regop && (R) != DB_txn_ckp && (R) != DB_log_register)
/*
 * CHANGE_FILES --
 *	Move lp->ready_lsn across a log-file boundary when __rep_apply applies
 *	a REP_NEWFILE message.  The macro is not self-contained: it expects
 *	`lp' (the LOG region) and `rectype' to be in scope, and its callers
 *	invoke it with the log region locked (R_LOCK).  It sets rectype to 0
 *	so the caller treats the NEWFILE as a simple record.
 *
 * This is a bit of a hack.  If we set the offset to be the size of the
 * persistent log header plus a log record header (struct __log_persist
 * + struct __hdr), then we'll match the correct LSN on the next log write.
 *
 * If lp->ready_lsn is [1][0], we need to "change" to the first log
 * file (we currently have none).  However, in this situation, we
 * don't want to wind up at LSN [2][whatever], we want to wind up at
 * LSN [1][whatever], so don't set LOG_NEWFILE.  The guts of the log
 * system will take care of actually writing the persistent header,
 * since we're doing a log_put to an empty log.
 *
 * If lp->ready_lsn is [m-1][n] for some m > 1, n > 0, we really do need to
 * change to the next log file.  Not only do we need to jump to lsn
 * [m][0], we need to write out a persistent header there, so set
 * LOG_NEWFILE so the right stuff happens in the bowels of log_put.
 * Note that we could dispense with LOG_NEWFILE by simply relying upon
 * the log system to decide to switch files at the same time the
 * master did--lg_max should be the same in both places--but this is
 * scary.
 */
#define CHANGE_FILES do { \
if (!(lp->ready_lsn.file == 1 && lp->ready_lsn.offset == 0)) { \
lp->ready_lsn.file++; \
F_SET(lp, LOG_NEWFILE); \
} \
lp->ready_lsn.offset = sizeof(struct __log_persist) + \
sizeof(struct __hdr); \
/* Make this evaluate to a simple rectype. */ \
rectype = 0; \
} while (0)
/*
* __rep_process_message --
*
* This routine takes an incoming message and processes it.
*
* control: contains the control fields from the record
* rec: contains the actual record
* eidp: contains the machine id of the sender of the message;
* in the case of a DB_NEWMASTER message, returns the eid
* of the new master.
*
* PUBLIC: int __rep_process_message __P((DB_ENV *, DBT *, DBT *, int *));
*/
int
__rep_process_message(dbenv, control, rec, eidp)
DB_ENV *dbenv;
DBT *control, *rec;
int *eidp;
{
DBT *d, data_dbt, lsndbt, mylog;
DB_LOG *dblp;
DB_LOGC *logc;
DB_LSN lsn, newfilelsn, oldfilelsn;
DB_REP *db_rep;
LOG *lp;
REP *rep;
REP_CONTROL *rp;
REP_VOTE_INFO *vi;
u_int32_t gen, type;
int done, i, master, old, recovering, ret, t_ret, *tally;
PANIC_CHECK(dbenv);
/* Control argument must be non-Null. */
if (control == NULL || control->size == 0) {
__db_err(dbenv,
"DB_ENV->rep_process_message: control argument must be specified");
return (EINVAL);
}
ret = 0;
db_rep = dbenv->rep_handle;
rep = db_rep->region;
dblp = dbenv->lg_handle;
lp = dblp->reginfo.primary;
MUTEX_LOCK(dbenv, db_rep->mutexp, dbenv->lockfhp);
gen = rep->gen;
recovering = F_ISSET(rep, REP_F_RECOVER);
MUTEX_UNLOCK(dbenv, db_rep->mutexp);
/*
* dbenv->rep_db is the handle for the repository used for applying log
* records.
*/
rp = (REP_CONTROL *)control->data;
/* Complain if we see an improper version number. */
if (rp->rep_version != DB_REPVERSION) {
__db_err(dbenv,
"unexpected replication message version %d, expected %d",
rp->rep_version, DB_REPVERSION);
return (EINVAL);
}
if (rp->log_version != DB_LOGVERSION) {
__db_err(dbenv,
"unexpected log record version %d, expected %d",
rp->log_version, DB_LOGVERSION);
return (EINVAL);
}
/*
* Check for generation number matching. Ignore any old messages
* except requests for ALIVE since the sender needs those to
* sync back up. If the message is newer, then we are out of
* sync and need to catch up with the rest of the system.
*/
if (rp->gen < gen && rp->rectype != REP_ALIVE_REQ &&
rp->rectype != REP_NEWCLIENT)
return (0);
if (rp->gen > gen && rp->rectype != REP_ALIVE &&
rp->rectype != REP_NEWMASTER)
return (__rep_send_message(dbenv,
DB_EID_BROADCAST, REP_MASTER_REQ, NULL, NULL, 0));
/*
* We need to check if we're in recovery and if we are
* then we need to ignore any messages except VERIFY, VOTE,
* ELECT (the master might fail while we are recovering), and
* ALIVE_REQ.
*/
if (recovering)
switch(rp->rectype) {
case REP_ALIVE:
case REP_ALIVE_REQ:
case REP_ELECT:
case REP_NEWCLIENT:
case REP_NEWMASTER:
case REP_NEWSITE:
case REP_VERIFY:
case REP_VOTE1:
case REP_VOTE2:
break;
default:
return (0);
}
switch(rp->rectype) {
case REP_ALIVE:
ANYSITE(dbenv);
if (rp->gen > gen && rp->flags)
return (__rep_new_master(dbenv, rp, *eidp));
break;
case REP_ALIVE_REQ:
ANYSITE(dbenv);
dblp = dbenv->lg_handle;
R_LOCK(dbenv, &dblp->reginfo);
lsn = ((LOG *)dblp->reginfo.primary)->lsn;
R_UNLOCK(dbenv, &dblp->reginfo);
return (__rep_send_message(dbenv,
*eidp, REP_ALIVE, &lsn, NULL,
F_ISSET(dbenv, DB_ENV_REP_MASTER) ? 1 : 0));
case REP_ALL_REQ:
MASTER_ONLY(dbenv);
if ((ret = dbenv->log_cursor(dbenv, &logc, 0)) != 0)
goto err;
memset(&data_dbt, 0, sizeof(data_dbt));
oldfilelsn = lsn = rp->lsn;
for (ret = logc->get(logc, &rp->lsn, &data_dbt, DB_SET);
ret == 0;
ret = logc->get(logc, &lsn, &data_dbt, DB_NEXT)) {
/*
* lsn.offset will only be 0 if this is the
* beginning of the log; DB_SET, but not DB_NEXT,
* can set the log cursor to [n][0].
*/
if (lsn.offset == 0)
ret = __rep_send_message(dbenv, *eidp,
REP_NEWFILE, &lsn, NULL, 0);
else {
/*
* DB_NEXT will never run into offsets
* of 0; thus, when a log file changes,
* we'll have a real log record with
* some lsn [n][m], and we'll also want to send
* a NEWFILE message with lsn [n][0].
* So that the client can detect gaps,
* send in the rec parameter the
* last LSN in the old file.
*/
if (lsn.file != oldfilelsn.file) {
newfilelsn.file = lsn.file;
newfilelsn.offset = 0;
memset(&lsndbt, 0, sizeof(DBT));
lsndbt.size = sizeof(DB_LSN);
lsndbt.data = &oldfilelsn;
if ((ret = __rep_send_message(dbenv,
*eidp, REP_NEWFILE, &newfilelsn,
&lsndbt, 0)) != 0)
break;
}
ret = __rep_send_message(dbenv, *eidp,
REP_LOG, &lsn, &data_dbt, 0);
}
/*
* In case we're about to change files and need it
* for a NEWFILE message, save the current LSN.
*/
oldfilelsn = lsn;
}
if (ret == DB_NOTFOUND)
ret = 0;
if ((t_ret = logc->close(logc, 0)) != 0 && ret == 0)
ret = t_ret;
return (ret);
case REP_ELECT:
if (F_ISSET(dbenv, DB_ENV_REP_MASTER)) {
R_LOCK(dbenv, &dblp->reginfo);
lsn = lp->lsn;
R_UNLOCK(dbenv, &dblp->reginfo);
return (__rep_send_message(dbenv,
*eidp, REP_NEWMASTER, &lsn, NULL, 0));
}
MUTEX_LOCK(dbenv, db_rep->mutexp, dbenv->lockfhp);
ret = IN_ELECTION(rep) ? 0 : DB_REP_HOLDELECTION;
MUTEX_UNLOCK(dbenv, db_rep->mutexp);
return (ret);
#ifdef NOTYET
case REP_FILE: /* TODO */
CLIENT_ONLY(dbenv);
break;
case REP_FILE_REQ:
MASTER_ONLY(dbenv);
return (__rep_send_file(dbenv, rec, *eidp));
break;
#endif
case REP_LOG:
CLIENT_ONLY(dbenv);
return (__rep_apply(dbenv, rp, rec));
case REP_LOG_REQ:
MASTER_ONLY(dbenv);
if ((ret = dbenv->log_cursor(dbenv, &logc, 0)) != 0)
goto err;
memset(&data_dbt, 0, sizeof(data_dbt));
lsn = rp->lsn;
if ((ret = logc->get(logc, &rp->lsn, &data_dbt, DB_SET)) == 0) {
/*
* If the log file has changed, we may get back a
* log record with a later LSN than we requested.
* This most likely means that the log file
* changed, so we need to send a NEWFILE message.
*/
if (log_compare(&lsn, &rp->lsn) < 0 &&
rp->lsn.offset == 0)
ret = __rep_send_message(dbenv, *eidp,
REP_NEWFILE, &lsn, NULL, 0);
else
ret = __rep_send_message(dbenv, *eidp,
REP_LOG, &rp->lsn, &data_dbt, 0);
}
if ((t_ret = logc->close(logc, 0)) != 0 && ret == 0)
ret = t_ret;
return (ret);
case REP_NEWSITE:
/* This is a rebroadcast; simply tell the application. */
if (F_ISSET(dbenv, DB_ENV_REP_MASTER)) {
dblp = dbenv->lg_handle;
lp = dblp->reginfo.primary;
R_LOCK(dbenv, &dblp->reginfo);
lsn = lp->lsn;
R_UNLOCK(dbenv, &dblp->reginfo);
(void)__rep_send_message(dbenv,
*eidp, REP_NEWMASTER, &lsn, NULL, 0);
}
return (DB_REP_NEWSITE);
case REP_NEWCLIENT:
/*
* This message was received and should have resulted in the
* application entering the machine ID in its machine table.
* We respond to this with an ALIVE to send relevant information
* to the new client. But first, broadcast the new client's
* record to all the clients.
*/
if ((ret = __rep_send_message(dbenv,
DB_EID_BROADCAST, REP_NEWSITE, &rp->lsn, rec, 0)) != 0)
goto err;
if (F_ISSET(dbenv, DB_ENV_REP_CLIENT))
return (0);
/* FALLTHROUGH */
case REP_MASTER_REQ:
MASTER_ONLY(dbenv);
dblp = dbenv->lg_handle;
lp = dblp->reginfo.primary;
R_LOCK(dbenv, &dblp->reginfo);
lsn = lp->lsn;
R_UNLOCK(dbenv, &dblp->reginfo);
return (__rep_send_message(dbenv,
*eidp, REP_NEWMASTER, &lsn, NULL, 0));
case REP_NEWFILE:
CLIENT_ONLY(dbenv);
return (__rep_apply(dbenv, rp, rec));
case REP_NEWMASTER:
ANYSITE(dbenv);
if (F_ISSET(dbenv, DB_ENV_REP_MASTER) &&
*eidp != dbenv->rep_eid)
return (DB_REP_DUPMASTER);
return (__rep_new_master(dbenv, rp, *eidp));
case REP_PAGE: /* TODO */
CLIENT_ONLY(dbenv);
break;
case REP_PAGE_REQ: /* TODO */
MASTER_ONLY(dbenv);
break;
case REP_PLIST: /* TODO */
CLIENT_ONLY(dbenv);
break;
case REP_PLIST_REQ: /* TODO */
MASTER_ONLY(dbenv);
break;
case REP_VERIFY:
CLIENT_ONLY(dbenv);
if ((ret = dbenv->log_cursor(dbenv, &logc, 0)) != 0)
goto err;
memset(&mylog, 0, sizeof(mylog));
if ((ret = logc->get(logc, &rp->lsn, &mylog, DB_SET)) != 0)
goto rep_verify_err;
db_rep = dbenv->rep_handle;
rep = db_rep->region;
if (mylog.size == rec->size &&
memcmp(mylog.data, rec->data, rec->size) == 0) {
ret = __db_apprec(dbenv, &rp->lsn, 0);
MUTEX_LOCK(dbenv, db_rep->mutexp, dbenv->lockfhp);
F_CLR(rep, REP_F_RECOVER);
MUTEX_UNLOCK(dbenv, db_rep->mutexp);
ret = __rep_send_message(dbenv, rep->master_id,
REP_ALL_REQ, &rp->lsn, NULL, 0);
} else if ((ret = logc->get(logc, &lsn, &mylog, DB_PREV)) == 0)
ret = __rep_send_message(dbenv,
*eidp, REP_VERIFY_REQ, &lsn, NULL, 0);
rep_verify_err: if ((t_ret = logc->close(logc, 0)) != 0 && ret == 0)
ret = t_ret;
goto err;
case REP_VERIFY_FAIL:
return (DB_REP_OUTDATED);
case REP_VERIFY_REQ:
MASTER_ONLY(dbenv);
type = REP_VERIFY;
if ((ret = dbenv->log_cursor(dbenv, &logc, 0)) != 0)
goto err;
d = &data_dbt;
memset(d, 0, sizeof(data_dbt));
ret = logc->get(logc, &rp->lsn, d, DB_SET);
/*
* If the LSN was invalid, then we might get a not
* found, we might get an EIO, we could get anything.
* If we get a DB_NOTFOUND, then there is a chance that
* the LSN comes before the first file present in which
* case we need to return a fail so that the client can return
* a DB_OUTDATED.
*/
if (ret == DB_NOTFOUND &&
__log_is_outdated(dbenv, rp->lsn.file, &old) == 0 &&
old != 0)
type = REP_VERIFY_FAIL;
if (ret != 0)
d = NULL;
ret = __rep_send_message(dbenv, *eidp, type, &rp->lsn, d, 0);
if ((t_ret = logc->close(logc, 0)) != 0 && ret == 0)
ret = t_ret;
goto err;
case REP_VOTE1:
if (F_ISSET(dbenv, DB_ENV_REP_MASTER)) {
#ifdef DIAGNOSTIC
if (FLD_ISSET(dbenv->verbose, DB_VERB_REPLICATION))
__db_err(dbenv, "Master received vote");
#endif
R_LOCK(dbenv, &dblp->reginfo);
lsn = lp->lsn;
R_UNLOCK(dbenv, &dblp->reginfo);
return (__rep_send_message(dbenv,
*eidp, REP_NEWMASTER, &lsn, NULL, 0));
}
vi = (REP_VOTE_INFO *)rec->data;
MUTEX_LOCK(dbenv, db_rep->mutexp, dbenv->lockfhp);
/*
* If you get a vote and you're not in an election, simply
* return an indicator to hold an election which will trigger
* this site to send its vote again.
*/
if (!IN_ELECTION(rep)) {
#ifdef DIAGNOSTIC
if (FLD_ISSET(dbenv->verbose, DB_VERB_REPLICATION))
__db_err(dbenv,
"Not in election, but received vote1");
#endif
ret = DB_REP_HOLDELECTION;
goto unlock;
}
if (F_ISSET(rep, REP_F_EPHASE2))
goto unlock;
/* Check if this site knows about more sites than we do. */
if (vi->nsites > rep->nsites)
rep->nsites = vi->nsites;
/* Check if we've heard from this site already. */
tally = R_ADDR((REGINFO *)dbenv->reginfo, rep->tally_off);
for (i = 0; i < rep->sites; i++) {
if (tally[i] == *eidp)
/* Duplicate vote. */
goto unlock;
}
/*
* We are keeping vote, let's see if that changes our count of
* the number of sites.
*/
if (rep->sites + 1 > rep->nsites)
rep->nsites = rep->sites + 1;
if (rep->nsites > rep->asites &&
(ret = __rep_grow_sites(dbenv, rep->nsites)) != 0)
goto unlock;
tally[rep->sites] = *eidp;
rep->sites++;
/*
* Change winners if the incoming record has a higher
* priority, or an equal priority but a larger LSN.
*/
#ifdef DIAGNOSTIC
if (FLD_ISSET(dbenv->verbose, DB_VERB_REPLICATION)) {
__db_err(dbenv,
"Existing vote: (eid)%d (pri)%d (gen)%d [%d,%d]",
rep->winner, rep->w_priority, rep->w_gen,
rep->w_lsn.file, rep->w_lsn.offset);
__db_err(dbenv,
"Incoming vote: (eid)%d (pri)%d (gen)%d [%d,%d]",
*eidp, vi->priority, rp->gen, rp->lsn.file,
rp->lsn.offset);
}
#endif
if (vi->priority > rep->w_priority ||
(vi->priority != 0 && vi->priority == rep->w_priority &&
log_compare(&rp->lsn, &rep->w_lsn) > 0)) {
#ifdef DIABNOSTIC
if (FLD_ISSET(dbenv->verbose, DB_VERB_REPLICATION))
__db_err(dbenv, "Accepting new vote");
#endif
rep->winner = *eidp;
rep->w_priority = vi->priority;
rep->w_lsn = rp->lsn;
rep->w_gen = rp->gen;
}
master = rep->winner;
lsn = rep->w_lsn;
done = rep->sites == rep->nsites && rep->w_priority != 0;
if (done) {
#ifdef DIAGNOSTIC
if (FLD_ISSET(dbenv->verbose, DB_VERB_REPLICATION)) {
__db_err(dbenv, "Phase1 election done");
__db_err(dbenv, "Voting for %d%s",
master, master == rep->eid ? "(self)" : "");
}
#endif
F_CLR(rep, REP_F_EPHASE1);
F_SET(rep, REP_F_EPHASE2);
}
if (done && master == rep->eid) {
rep->votes++;
MUTEX_UNLOCK(dbenv, db_rep->mutexp);
return (0);
}
MUTEX_UNLOCK(dbenv, db_rep->mutexp);
/* Vote for someone else. */
if (done)
return (__rep_send_message(dbenv,
master, REP_VOTE2, NULL, NULL, 0));
/* Election is still going on. */
break;
case REP_VOTE2:
#ifdef DIAGNOSTIC
if (FLD_ISSET(dbenv->verbose, DB_VERB_REPLICATION))
__db_err(dbenv, "We received a vote%s",
F_ISSET(dbenv, DB_ENV_REP_MASTER) ?
" (master)" : "");
#endif
if (F_ISSET(dbenv, DB_ENV_REP_MASTER)) {
R_LOCK(dbenv, &dblp->reginfo);
lsn = lp->lsn;
R_UNLOCK(dbenv, &dblp->reginfo);
return (__rep_send_message(dbenv,
*eidp, REP_NEWMASTER, &lsn, NULL, 0));
}
MUTEX_LOCK(dbenv, db_rep->mutexp, dbenv->lockfhp);
/* If we have priority 0, we should never get a vote. */
DB_ASSERT(rep->priority != 0);
if (!IN_ELECTION(rep) && rep->master_id != DB_EID_INVALID) {
#ifdef DIAGNOSTIC
if (FLD_ISSET(dbenv->verbose, DB_VERB_REPLICATION))
__db_err(dbenv, "Not in election, got vote");
#endif
MUTEX_UNLOCK(dbenv, db_rep->mutexp);
return (DB_REP_HOLDELECTION);
}
/* avoid counting duplicates. */
rep->votes++;
done = rep->votes > rep->nsites / 2;
if (done) {
rep->master_id = rep->eid;
rep->gen = rep->w_gen + 1;
ELECTION_DONE(rep);
F_CLR(rep, REP_F_UPGRADE);
F_SET(rep, REP_F_MASTER);
*eidp = rep->master_id;
#ifdef DIAGNOSTIC
if (FLD_ISSET(dbenv->verbose, DB_VERB_REPLICATION))
__db_err(dbenv,
"Got enough votes to win; election done; winner is %d",
rep->master_id);
#endif
}
MUTEX_UNLOCK(dbenv, db_rep->mutexp);
if (done) {
R_LOCK(dbenv, &dblp->reginfo);
lsn = lp->lsn;
R_UNLOCK(dbenv, &dblp->reginfo);
/* Declare me the winner. */
#ifdef DIAGNOSTIC
if (FLD_ISSET(dbenv->verbose, DB_VERB_REPLICATION))
__db_err(dbenv, "I won, sending NEWMASTER");
#endif
if ((ret = __rep_send_message(dbenv, DB_EID_BROADCAST,
REP_NEWMASTER, &lsn, NULL, 0)) != 0)
break;
return (DB_REP_NEWMASTER);
}
break;
default:
__db_err(dbenv,
"DB_ENV->rep_process_message: unknown replication message: type %lu",
(u_long)rp->rectype);
return (EINVAL);
}
return (0);
unlock: MUTEX_UNLOCK(dbenv, db_rep->mutexp);
err: return (ret);
}
/*
* __rep_apply --
*
* Handle incoming log records on a client, applying when possible and
* entering into the bookkeeping table otherwise. This is the guts of
* the routine that handles the state machine that describes how we
* process and manage incoming log records.
*/
static int
__rep_apply(dbenv, rp, rec)
DB_ENV *dbenv;
REP_CONTROL *rp;
DBT *rec;
{
__txn_ckp_args ckp_args;
DB_REP *db_rep;
DBT data_dbt, key_dbt;
DB *dbp;
DBC *dbc;
DB_LOG *dblp;
DB_LSN ckp_lsn, lsn, next_lsn;
LOG *lp;
int cmp, eid, ret, retry_count, t_ret;
u_int32_t rectype;
db_rep = dbenv->rep_handle;
dbp = db_rep->rep_db;
dbc = NULL;
ret = 0;
retry_count = 0;
memset(&key_dbt, 0, sizeof(key_dbt));
memset(&data_dbt, 0, sizeof(data_dbt));
/*
* If this is a log record and it's the next one in line, simply
* write it to the log. If it's a "normal" log record, i.e., not
* a COMMIT or CHECKPOINT or something that needs immediate processing,
* just return. If it's a COMMIT, CHECKPOINT or LOG_REGISTER (i.e.,
* not SIMPLE), handle it now. If it's a NEWFILE record, then we
* have to be prepared to deal with a logfile change.
*/
dblp = dbenv->lg_handle;
R_LOCK(dbenv, &dblp->reginfo);
lp = dblp->reginfo.primary;
cmp = log_compare(&rp->lsn, &lp->ready_lsn);
/*
* This is written to assume that you don't end up with a lot of
* records after a hole. That is, it optimizes for the case where
* there is only a record or two after a hole. If you have a lot
* of records after a hole, what you'd really want to do is write
* all of them and then process all the commits, checkpoints, etc.
* together. That is more complicated processing that we can add
* later if necessary.
*
* That said, I really don't want to do db operations holding the
* log mutex, so the synchronization here is tricky.
*/
if (cmp == 0) {
if (rp->rectype == REP_NEWFILE) {
newfile: CHANGE_FILES;
} else {
ret = __log_put_int(dbenv, &rp->lsn, rec, rp->flags);
lp->ready_lsn = lp->lsn;
memcpy(&rectype, rec->data, sizeof(rectype));
}
while (ret == 0 && IS_SIMPLE(rectype) &&
log_compare(&lp->ready_lsn, &lp->waiting_lsn) == 0) {
/*
* We just filled in a gap in the log record stream.
* Write subsequent records to the log.
*/
gap_check: R_UNLOCK(dbenv, &dblp->reginfo);
if ((ret = dbp->cursor(dbp, NULL, &dbc, 0)) != 0)
goto err;
if ((ret = dbc->c_get(dbc,
&key_dbt, &data_dbt, DB_RMW | DB_FIRST)) != 0)
goto err;
rp = (REP_CONTROL *)key_dbt.data;
rec = &data_dbt;
memcpy(&rectype, rec->data, sizeof(rectype));
R_LOCK(dbenv, &dblp->reginfo);
/*
* We need to check again, because it's possible that
* some other thread of control changed the waiting_lsn
* or removed that record from the database.
*/
if (log_compare(&lp->ready_lsn, &rp->lsn) == 0) {
if (rp->rectype != REP_NEWFILE) {
ret = __log_put_int(dbenv,
&rp->lsn, &data_dbt, rp->flags);
lp->ready_lsn = lp->lsn;
} else
CHANGE_FILES;
R_UNLOCK(dbenv, &dblp->reginfo);
if ((ret = dbc->c_del(dbc, 0)) != 0)
goto err;
/*
* If the current rectype is simple, we're
* ready for another record; otherwise,
* don't get one, because we need to
* process the current one now.
*/
if (IS_SIMPLE(rectype)) {
ret = dbc->c_get(dbc,
&key_dbt, &data_dbt, DB_NEXT);
if (ret != DB_NOTFOUND && ret != 0)
goto err;
lsn =
((REP_CONTROL *)key_dbt.data)->lsn;
if ((ret = dbc->c_close(dbc)) != 0)
goto err;
R_LOCK(dbenv, &dblp->reginfo);
if (ret == DB_NOTFOUND) {
ZERO_LSN(lp->waiting_lsn);
break;
} else
lp->waiting_lsn = lsn;
} else {
R_LOCK(dbenv, &dblp->reginfo);
lp->waiting_lsn = lp->ready_lsn;
break;
}
}
}
} else if (cmp > 0) {
/*
* The LSN is higher than the one we were waiting for.
* If it is a NEWFILE message, this may not mean that
* there's a gap; in some cases, NEWFILE messages contain
* the LSN of the beginning of the new file instead
* of the end of the old.
*
* In these cases, the rec DBT will contain the last LSN
* of the old file, so we can tell whether there's a gap.
*/
if (rp->rectype == REP_NEWFILE &&
rp->lsn.file == lp->ready_lsn.file + 1 &&
rp->lsn.offset == 0) {
DB_ASSERT(rec != NULL && rec->data != NULL &&
rec->size == sizeof(DB_LSN));
memcpy(&lsn, rec->data, sizeof(DB_LSN));
if (log_compare(&lp->ready_lsn, &lsn) > 0)
/*
* The last LSN in the old file is smaller
* than the one we're expecting, so there's
* no gap--the one we're expecting just
* doesn't exist.
*/
goto newfile;
}
/*
* This record isn't in sequence; add it to the table and
* update waiting_lsn if necessary.
*/
key_dbt.data = rp;
key_dbt.size = sizeof(*rp);
next_lsn = lp->lsn;
R_UNLOCK(dbenv, &dblp->reginfo);
ret = dbp->put(dbp, NULL, &key_dbt, rec, 0);
/* Request the LSN we are still waiting for. */
MUTEX_LOCK(dbenv, db_rep->mutexp, dbenv->lockfhp);
eid = db_rep->region->master_id;
MUTEX_UNLOCK(dbenv, db_rep->mutexp);
ret = __rep_send_message(dbenv, eid, REP_LOG_REQ,
&next_lsn, NULL, 0);
R_LOCK(dbenv, &dblp->reginfo);
if (ret == 0)
if (IS_ZERO_LSN(lp->waiting_lsn) ||
log_compare(&rp->lsn, &lp->waiting_lsn) < 0)
lp->waiting_lsn = rp->lsn;
R_UNLOCK(dbenv, &dblp->reginfo);
return (ret);
}
R_UNLOCK(dbenv, &dblp->reginfo);
if (ret != 0 || cmp < 0 || (cmp == 0 && IS_SIMPLE(rectype)))
return (ret);
/*
* If we got here, then we've got a log record in rp and rec that
* we need to process.
*/
switch(rectype) {
case DB_txn_ckp:
/* Sync the memory pool and write the log record. */
memcpy(&ckp_lsn, (u_int8_t *)rec->data +
((u_int8_t *)&ckp_args.ckp_lsn - (u_int8_t *)&ckp_args),
sizeof(DB_LSN));
retry: if (!F_ISSET(dbenv, DB_ENV_REP_LOGSONLY)) {
ret = dbenv->memp_sync(dbenv, &ckp_lsn);
if (ret == DB_INCOMPLETE && retry_count < 4) {
(void)__os_sleep(dbenv, 1 << retry_count, 0);
retry_count++;
goto retry;
}
}
if (ret == 0) {
ret = dbenv->log_put(dbenv, &lsn, rec, rp->flags);
}
break;
case DB_log_register:
/* Simply redo the operation. */
if (!F_ISSET(dbenv, DB_ENV_REP_LOGSONLY))
ret = __db_dispatch(dbenv,
NULL, rec, &rp->lsn, DB_TXN_APPLY, NULL);
break;
case DB_txn_regop:
if (!F_ISSET(dbenv, DB_ENV_REP_LOGSONLY))
ret = __rep_process_txn(dbenv, rec);
break;
default:
goto err;
}
/* Check if we need to go back into the table. */
if (ret == 0) {
R_LOCK(dbenv, &dblp->reginfo);
if (log_compare(&lp->ready_lsn, &lp->waiting_lsn) == 0)
goto gap_check;
R_UNLOCK(dbenv, &dblp->reginfo);
}
err: if (dbc != NULL && (t_ret = dbc->c_close(dbc)) != 0 && ret == 0)
ret = t_ret;
return (ret);
}
/*
 * __rep_process_txn --
 *
 * This is the routine that actually gets a transaction ready for
 * processing.
 *
 * commit_rec: a txn_regop log record.  If its opcode is not TXN_COMMIT
 * (e.g. an abort), nothing is applied and 0 is returned.  Otherwise the
 * pages the transaction touched are locked through __rep_lockpages, each
 * recorded log record is re-dispatched with DB_TXN_APPLY, and the locks
 * and locker id are released.
 *
 * Returns 0 or the first error encountered.
 */
static int
__rep_process_txn(dbenv, commit_rec)
	DB_ENV *dbenv;
	DBT *commit_rec;
{
	DBT data_dbt;
	DB_LOCKREQ req, *lvp;
	DB_LOGC *logc;
	DB_LSN prev_lsn;
	LSN_PAGE *ap;
	TXN_RECS recs;
	__txn_regop_args *txn_args;
	u_int32_t op, txnid;
	int i, ret, t_ret;
	int (**dtab)__P((DB_ENV *, DBT *, DB_LSN *, db_recops, void *));
	size_t dtabsize;

	/*
	 * There are three phases: First, we have to traverse
	 * backwards through the log records gathering the list
	 * of all the pages accessed.  Once we have this information
	 * we can acquire all the locks we need.  Finally, we apply
	 * all the records in the transaction and release the locks.
	 *
	 * Everything the err path inspects is initialized up front so that
	 * an early failure never touches an unset cursor or DBT.
	 */
	dtab = NULL;
	logc = NULL;
	memset(&data_dbt, 0, sizeof(data_dbt));

	/* Make sure this is really a commit and not an abort! */
	if ((ret = __txn_regop_read(dbenv, commit_rec->data, &txn_args)) != 0)
		return (ret);
	op = txn_args->opcode;
	prev_lsn = txn_args->prev_lsn;
	/* Copy the txnid out before txn_args is freed. */
	txnid = txn_args->txnid->txnid;
	__os_free(dbenv, txn_args, 0);
	if (op != TXN_COMMIT)
		return (0);

	memset(&recs, 0, sizeof(recs));
	recs.txnid = txnid;
	if ((ret = dbenv->lock_id(dbenv, &recs.lockid)) != 0)
		return (ret);

	/* Initialize the getpgno dispatch table. */
	if ((ret = __rep_lockpgno_init(dbenv, &dtab, &dtabsize)) != 0)
		goto err;

	/*
	 * Phases 1 and 2: __rep_lockpages walks the transaction starting at
	 * prev_lsn and locks its pages under recs.lockid, filling recs
	 * (presumably one LSN_PAGE per record -- see __rep_lockpages).
	 */
	if ((ret = __rep_lockpages(dbenv,
	    dtab, NULL, &prev_lsn, &recs, recs.lockid)) != 0)
		goto err;
	if (recs.nalloc == 0)
		goto err;

	/* Phase 3: Apply updates and release locks. */
	if ((ret = dbenv->log_cursor(dbenv, &logc, 0)) != 0)
		goto err;
	for (ap = &recs.array[0], i = 0; i < recs.npages; i++, ap++) {
		if ((ret = logc->get(logc, &ap->lsn, &data_dbt, DB_SET)) != 0)
			goto err;
		if ((ret = __db_dispatch(dbenv, NULL,
		    &data_dbt, &ap->lsn, DB_TXN_APPLY, NULL)) != 0)
			goto err;
	}

err:	if (recs.nalloc != 0) {
		req.op = DB_LOCK_PUT_ALL;
		if ((t_ret = dbenv->lock_vec(dbenv, recs.lockid,
		    DB_LOCK_FREE_LOCKER, &req, 1, &lvp)) != 0 && ret == 0)
			ret = t_ret;
		__os_free(dbenv, recs.array, recs.nalloc * sizeof(LSN_PAGE));
	}

	if ((t_ret =
	    dbenv->lock_id_free(dbenv, recs.lockid)) != 0 && ret == 0)
		ret = t_ret;

	if (logc != NULL && (t_ret = logc->close(logc, 0)) != 0 && ret == 0)
		ret = t_ret;

	if (F_ISSET(&data_dbt, DB_DBT_REALLOC) && data_dbt.data != NULL)
		__os_free(dbenv, data_dbt.data, 0);

	if (dtab != NULL)
		__os_free(dbenv, dtab, 0);

	return (ret);
}
/*
 * __rep_client_dbinit --
 *
 * Initialize the LSN database on the client side.  This is called from the
 * client initialization code.  The startup flag value indicates if
 * this is the first thread/process starting up and therefore should create
 * the LSN database.  This routine must be called once by each process acting
 * as a client.
 *
 * On startup any existing table is removed first, and the database is
 * then opened with DB_CREATE.  The table uses __rep_bt_cmp so records
 * are ordered by LSN.  On success the handle is stored in
 * db_rep->rep_db; on failure db_rep->rep_db is left NULL.
 *
 * PUBLIC: int __rep_client_dbinit __P((DB_ENV *, int));
 */
int
__rep_client_dbinit(dbenv, startup)
	DB_ENV *dbenv;
	int startup;
{
	DB_REP *db_rep;
	DB *rep_db;
	int ret, t_ret;
	u_int32_t flags;

	PANIC_CHECK(dbenv);
	db_rep = dbenv->rep_handle;
	rep_db = NULL;

#define	REPDBNAME	"__db.rep.db"

	/* Check if this has already been called on this environment. */
	if (db_rep->rep_db != NULL)
		return (0);

	if (startup) {
		if ((ret = db_create(&rep_db, dbenv, 0)) != 0)
			goto err;
		/*
		 * Ignore errors, because if the file doesn't exist, this
		 * is perfectly OK.
		 */
		(void)rep_db->remove(rep_db, REPDBNAME, NULL, 0);
		/*
		 * DB->remove discards the handle whatever it returns; forget
		 * it so a later failure doesn't close a destroyed handle.
		 */
		rep_db = NULL;
	}

	if ((ret = db_create(&rep_db, dbenv, 0)) != 0)
		goto err;
	if ((ret = rep_db->set_bt_compare(rep_db, __rep_bt_cmp)) != 0)
		goto err;

	flags = (F_ISSET(dbenv, DB_ENV_THREAD) ? DB_THREAD : 0) |
	    (startup ? DB_CREATE : 0);
	if ((ret = rep_db->open(rep_db,
	    REPDBNAME, NULL, DB_BTREE, flags, 0)) != 0)
		goto err;

	/* Allow writes to this database on a client. */
	F_SET(rep_db, DB_CL_WRITER);

	db_rep->rep_db = rep_db;

	return (0);
err:
	if (rep_db != NULL &&
	    (t_ret = rep_db->close(rep_db, DB_NOSYNC)) != 0 && ret == 0)
		ret = t_ret;
	db_rep->rep_db = NULL;

	return (ret);
}
/*
 * __rep_bt_cmp --
 *
 * Btree comparison function for the LSN table.  Keys are whole
 * REP_CONTROL structures, which keeps the control fields together with the
 * stored record.  Only the embedded LSN takes part in the ordering:
 * first by file number, then by offset.  The LSNs are copied out with
 * __ua_memcpy because the key data may not be suitably aligned.
 *
 * Returns <0, 0 or >0 in the usual comparator sense.
 */
static int
__rep_bt_cmp(dbp, dbt1, dbt2)
	DB *dbp;
	const DBT *dbt1, *dbt2;
{
	REP_CONTROL *left_ctl, *right_ctl;
	DB_LSN left, right;

	COMPQUIET(dbp, NULL);

	left_ctl = dbt1->data;
	right_ctl = dbt2->data;
	__ua_memcpy(&left, &left_ctl->lsn, sizeof(DB_LSN));
	__ua_memcpy(&right, &right_ctl->lsn, sizeof(DB_LSN));

	/* The file number decides unless both LSNs live in the same file. */
	if (left.file != right.file)
		return (left.file < right.file ? -1 : 1);
	if (left.offset != right.offset)
		return (left.offset < right.offset ? -1 : 1);
	return (0);
}