parallel decoding fix memory leakage bug

Offering: openGaussDev

More detail: parallel decoding fix initialization bug

Match-id-4b9ee91c71686dcaba8bbd3021d2363206294dba
This commit is contained in:
openGaussDev 2022-03-08 15:13:27 +08:00 committed by yanghao
parent 5547cd8c86
commit 88e3ad9fb4
2 changed files with 54 additions and 5 deletions

View File

@ -80,12 +80,14 @@ static void ParallelReorderBufferRestoreChange(ParallelReorderBuffer *rb, Parall
/* Parallel decoding batch sending unit length is set to 1MB. */
static const int g_batch_unit_length = 1 * 1024 * 1024;
/* Parallel decoding caches at most 512 transactions. */
static const Size g_max_cached_txns = 512;
void ParallelReorderBufferQueueChange(ParallelReorderBuffer *rb, logicalLog *change, int slotId)
{
ParallelReorderBufferTXN *txn = NULL;
txn = ParallelReorderBufferTXNByXid(rb, change->xid, true, NULL, change->lsn, true);
Assert(InvalidXLogRecPtr != change->lsn);
dlist_push_tail(&txn->changes, &change->node);
txn->nentries++;
txn->nentries_mem++;
@ -238,7 +240,13 @@ ParallelReorderBufferTXN *ParallelReorderBufferGetTXN(ParallelReorderBuffer *rb)
ParallelReorderBufferTXN *txn = NULL;
int rc = 0;
/* check the slab cache */
txn = (ParallelReorderBufferTXN *)palloc(sizeof(ParallelReorderBufferTXN));
if (rb->nr_cached_transactions > 0) {
rb->nr_cached_transactions--;
txn = (ParallelReorderBufferTXN *)dlist_container(ParallelReorderBufferTXN, node,
dlist_pop_head_node(&rb->cached_transactions));
} else {
txn = (ParallelReorderBufferTXN *)palloc(sizeof(ParallelReorderBufferTXN));
}
rc = memset_s(txn, sizeof(ParallelReorderBufferTXN), 0, sizeof(ParallelReorderBufferTXN));
securec_check(rc, "", "");
@ -249,6 +257,7 @@ ParallelReorderBufferTXN *ParallelReorderBufferGetTXN(ParallelReorderBuffer *rb)
return txn;
}
void ParallelReorderBufferReturnTXN(ParallelReorderBuffer *rb, ParallelReorderBufferTXN *txn)
{
/* clean the lookup cache if we were cached (quite likely) */
@ -270,6 +279,13 @@ void ParallelReorderBufferReturnTXN(ParallelReorderBuffer *rb, ParallelReorderBu
txn->nentries = 0;
txn->nentries_mem = 0;
/* check whether to put into the slab cache */
if (rb->nr_cached_transactions < g_max_cached_txns) {
rb->nr_cached_transactions++;
dlist_push_head(&rb->cached_transactions, &txn->node);
} else {
pfree(txn);
}
}
/* ---------------------------------------
* toast reassembly support
@ -1163,6 +1179,9 @@ static logicalLog *ParallelReorderBufferIterTXNNext(ParallelReorderBuffer *rb,
entry = &state->entries[off];
if (!dlist_is_empty(&state->old_change)) {
change = dlist_container(logicalLog, node, dlist_pop_head_node(&state->old_change));
FreeLogicalLog(change, slotId);
/* We should find atmost one old_change here */
Assert(dlist_is_empty(&state->old_change));
}
@ -1219,6 +1238,29 @@ static logicalLog *ParallelReorderBufferIterTXNNext(ParallelReorderBuffer *rb,
return change;
}
/*
* Deallocate the iterator
*/
static void ParallelReorderBufferIterTXNFinish(ParallelReorderBufferIterTXNState *state, int slotId)
{
for (Size off = 0; off < state->nr_txns; off++) {
if (state->entries[off].fd != -1) {
(void)CloseTransientFile(state->entries[off].fd);
}
}
/* free memory we might have "leaked" in the last *Next call */
if (!dlist_is_empty(&state->old_change)) {
logicalLog *change = NULL;
change = dlist_container(logicalLog, node, dlist_pop_head_node(&state->old_change));
FreeLogicalLog(change, slotId);
Assert(dlist_is_empty(&state->old_change));
}
binaryheap_free(state->heap);
pfree(state);
}
/*
* Forget the contents of a transaction if we aren't interested in it's
* contents. Needs to be first called for subtransactions and then for the
@ -1237,6 +1279,7 @@ void ParallelReorderBufferForget(ParallelReorderBuffer *rb, int slotId, Parallel
FreeLogicalLog(logChange, slotId);
}
ParallelReorderBufferCleanupTXN(rb, txn);
ParallelReorderBufferIterTXNFinish(iterstate, slotId);
}
/*
@ -1440,6 +1483,7 @@ void ParallelReorderBufferCommit(ParallelReorderBuffer *rb, logicalLog *change,
g_Logicaldispatcher[slotId].pOptions.sending_batch > 0);
}
ParallelReorderBufferCleanupTXN(rb, txn);
ParallelReorderBufferIterTXNFinish(iterstate, slotId);
ParallelCheckBatch(ctx->out, pdata, slotId, &oldCtx,
(pdata->pOptions.skip_empty_xacts && !pdata->pOptions.xact_wrote_changes));
@ -1545,12 +1589,14 @@ ParallelReorderBuffer *ParallelReorderBufferAllocate(int slotId)
hash_ctl.hcxt = buffer->context;
const long reorderBufferNelem = 1000;
buffer->by_txn = hash_create("ReorderBufferByXid", reorderBufferNelem,
buffer->by_txn = hash_create("ParallelReorderBufferByXid", reorderBufferNelem,
&hash_ctl, HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT);
buffer->by_txn_last_xid = InvalidTransactionId;
buffer->by_txn_last_txn = NULL;
buffer->nr_cached_transactions = 0;
buffer->nr_cached_tuplebufs = 0;
buffer->nr_cached_changes = 0;
buffer->current_restart_decoding_lsn = InvalidXLogRecPtr;

View File

@ -340,12 +340,15 @@ struct ParallelReorderBuffer {
* ontop of reorderbuffer.c
*/
/* cached ParallelReorderBufferTXNs */
dlist_head cached_transactions;
Size nr_cached_transactions;
/* cached ReorderBufferChanges */
/* cached ParallelReorderBufferChanges */
dlist_head cached_changes;
Size nr_cached_changes;
/* cached ReorderBufferTupleBufs */
/* cached ParallelReorderBufferTupleBufs */
slist_head cached_tuplebufs;
Size nr_cached_tuplebufs;
TransactionId lastRunningXactOldestXmin;