Merge a PG external sort optimization

pujr 2022-09-19 10:33:07 +08:00
parent 55a8dc30ed
commit 6f7eef3db6
1 changed file with 538 additions and 38 deletions


@ -235,11 +235,13 @@ struct Tuplesortstate {
* tuples to return? */
bool boundUsed; /* true if we made use of a bounded heap */
int bound; /* if bounded, the maximum number of tuples */
bool tuples; /* Can SortTuple.tuple ever be set? */
int64 availMem; /* remaining memory available, in bytes */
int64 allowedMem; /* total memory allowed, in bytes */
int maxTapes; /* number of tapes (Knuth's T) */
int tapeRange; /* maxTapes-1 (Knuth's P) */
MemoryContext sortcontext; /* memory context holding all sort data */
MemoryContext tuplecontext; /* memory context holding tuple data */
LogicalTapeSet* tapeset; /* logtape.c object for tapes in a temp file */
#ifdef PGXC
Oid current_xcnode; /* node from which we got the last tuple */
@ -281,6 +283,14 @@ struct Tuplesortstate {
*/
void (*readtup)(Tuplesortstate* state, SortTuple* stup, int tapenum, unsigned int len);
/*
* Function to move a caller tuple. This is usually implemented as a
* memmove() shim, but function may also perform additional fix-up of
* caller tuple where needed. Batch memory support requires the
* movement of caller tuples from one location in memory to another.
*/
void (*movetup) (void *dest, void *src, unsigned int len);
/*
* Function to reverse the sort direction from its current state. (We
* could dispense with this if we wanted to enforce that all variants
@ -303,6 +313,15 @@ struct Tuplesortstate {
int memtupsize; /* allocated length of memtuples array */
bool growmemtuples; /* memtuples' growth still underway? */
/*
* Memory for tuples is sometimes allocated in batch, rather than
* incrementally. This implies that incremental memory accounting has been
* abandoned. Currently, this only happens for the final on-the-fly merge
* step. Large batch allocations can store tuples (e.g. IndexTuples)
* without palloc() fragmentation and other overhead.
*/
bool batchUsed;
/*
* While building initial runs, this is the current output run number
* (starting at 0). Afterwards, it is the number of initial runs we made.
@ -337,6 +356,21 @@ struct Tuplesortstate {
int mergefreelist; /* head of freelist of recycled slots */
int mergefirstfree; /* first slot never used in this merge */
/*
* Per-tape batch state, when final on-the-fly merge consumes memory from
* just a few large allocations.
*
* Aside from the general benefits of performing fewer individual retail
* palloc() calls, this also helps make merging more cache efficient, since
* each tape's tuples must naturally be accessed sequentially (in sorted
* order).
*/
int64 spacePerTape; /* Space (memory) for tuples (not slots) */
char **mergetuples; /* Each tape's memory allocation */
char **mergecurrent; /* Current offset into each tape's memory */
char **mergetail; /* Last item's start point for each tape */
char **mergeoverflow; /* Retail palloc() "overflow" for each tape */
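The four arrays above, together with spacePerTape, amount to one small record of bookkeeping per active tape. A minimal illustrative sketch of that per-tape view, using hypothetical names rather than the actual fields:

    #include <stddef.h>

    /* Hypothetical per-tape view of the batch bookkeeping above. */
    typedef struct TapeBatch {
        char   *buffer;    /* mergetuples[i]: start of this tape's slice        */
        char   *current;   /* mergecurrent[i]: bump pointer, next free byte     */
        char   *tail;      /* mergetail[i]: start of the last tuple written,
                            * kept so it survives when the slice is recycled    */
        char   *overflow;  /* mergeoverflow[i]: retail allocation made after
                            * the slice filled up, NULL otherwise               */
        size_t  space;     /* spacePerTape: size of the slice in bytes          */
    } TapeBatch;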
/*
* Variables for Algorithm D. Note that destTape is a "logical" tape
* number, ie, an index into the tp_xxx[] arrays. Be careful to keep
@ -435,7 +469,6 @@ struct Tuplesortstate {
Oid datumType;
/* we need typelen and byval in order to know how to copy the Datums. */
int datumTypeLen;
bool datumTypeByVal;
// merge sort in remotequery
RemoteQueryState* combiner; /* tuple source, alternate to tapeset */
@ -470,6 +503,7 @@ struct Tuplesortstate {
#define COPYTUP(state, stup, tup) ((*(state)->copytup)(state, stup, tup))
#define WRITETUP(state, tape, stup) ((*(state)->writetup)(state, tape, stup))
#define READTUP(state, stup, tape, len) ((*(state)->readtup)(state, stup, tape, len))
#define MOVETUP(dest,src,len) ((*(state)->movetup) (dest, src, len))
#ifdef PGXC
#define GETLEN(state, tape, eofOK) ((*(state)->getlen)(state, tape, eofOK))
#endif
@ -485,7 +519,8 @@ static bool LACKMEM(Tuplesortstate* state)
{
int64 usedMem = state->allowedMem - state->availMem;
if (state->availMem < 0 || gs_sysmemory_busy(usedMem * state->dop, true))
if ((state->availMem < 0 && !state->batchUsed) ||
gs_sysmemory_busy(usedMem * state->dop, true))
return true;
return false;
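In other words, once batchUsed is set, a negative availMem no longer signals memory pressure: the per-tape buffers were charged up front and overruns are tracked per tape as overflow allocations instead. A reduced sketch of the predicate, leaving out the gs_sysmemory_busy() part:

    #include <stdbool.h>
    #include <stdint.h>

    /* Sketch of the spill-pressure check with the batch-memory exemption. */
    bool lack_mem(int64_t availMem, bool batchUsed)
    {
        /* A negative balance only matters while tuples are still accounted
         * one palloc() at a time; batch mode manages memory per tape. */
        return availMem < 0 && !batchUsed;
    }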
@ -584,7 +619,13 @@ static bool AutoSpreadMem(Tuplesortstate* state, double* growRatio)
* rather than the originally-requested size. This is important since
* palloc can add substantial overhead. It's not a complete answer since
* we won't count any wasted space in palloc allocation blocks, but it's
* a lot better than what we were doing before 7.3.
* a lot better than what we were doing before 7.3. As of 9.6, a
* separate memory context is used for caller passed tuples. Resetting
* it at certain key increments significantly ameliorates fragmentation.
* Note that this places a responsibility on readtup and copytup routines
* to use the right memory context for these tuples (and to not use the
* reset context for anything whose lifetime needs to span multiple
* external sort runs).
*/
/* When using this macro, beware of double evaluation of len */
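The reset-at-run-boundaries idea described in the comment above can be shown without any PostgreSQL API: keep caller tuples in their own arena and throw the whole arena away between runs instead of freeing tuples one by one. A toy sketch under that assumption, not the MemoryContext implementation:

    #include <stddef.h>

    /* Toy arena standing in for the "Caller tuples" child context. */
    typedef struct TupleArena {
        char   *mem;        /* backing storage         */
        size_t  used;       /* bytes handed out so far */
        size_t  size;       /* capacity                */
    } TupleArena;

    void *arena_alloc(TupleArena *a, size_t len)
    {
        if (a->used + len > a->size)
            return NULL;                /* real code would grow or spill */
        void *p = a->mem + a->used;
        a->used += len;
        return p;
    }

    /* At a run boundary: one cheap reset instead of many pfree() calls,
     * which is what keeps fragmentation from building up across runs. */
    void arena_reset(TupleArena *a)
    {
        a->used = 0;
    }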
@ -603,7 +644,14 @@ static void inittapestate(Tuplesortstate *state, int maxTapes);
static void selectnewtape(Tuplesortstate* state);
static void mergeruns(Tuplesortstate* state);
static void mergeonerun(Tuplesortstate* state);
static void beginmerge(Tuplesortstate* state);
static void beginmerge(Tuplesortstate *state, bool finalMergeBatch);
static void batchmemtuples(Tuplesortstate *state);
static void mergebatch(Tuplesortstate *state, int64 spacePerTape);
static void mergebatchone(Tuplesortstate *state, int srcTape,
SortTuple *stup, bool *should_free);
static void mergebatchfreetape(Tuplesortstate *state, int srcTape,
SortTuple *rtup, bool *should_free);
static void *mergebatchalloc(Tuplesortstate *state, int tapenum, Size tuplen);
static void mergepreread(Tuplesortstate* state);
static void mergeprereadone(Tuplesortstate* state, int srcTape);
static void dumptuples(Tuplesortstate* state, bool alltuples);
@ -614,20 +662,24 @@ static void tuplesort_heap_replace_top(Tuplesortstate *state, SortTuple *tuple);
static void tuplesort_heap_delete_top(Tuplesortstate* state);
static unsigned int getlen(Tuplesortstate* state, int tapenum, bool eofOK);
static void markrunend(Tuplesortstate* state, int tapenum);
static void *readtup_alloc(Tuplesortstate *state, int tapenum, Size tuplen);
static int comparetup_heap(const SortTuple* a, const SortTuple* b, Tuplesortstate* state);
static void copytup_heap(Tuplesortstate* state, SortTuple* stup, void* tup);
static void writetup_heap(Tuplesortstate* state, int tapenum, SortTuple* stup);
static void readtup_heap(Tuplesortstate* state, SortTuple* stup, int tapenum, unsigned int len);
static void movetup_heap(void *dest, void *src, unsigned int len);
static void reversedirection_heap(Tuplesortstate* state);
static int comparetup_cluster(const SortTuple* a, const SortTuple* b, Tuplesortstate* state);
static void copytup_cluster(Tuplesortstate* state, SortTuple* stup, void* tup);
static void writetup_cluster(Tuplesortstate* state, int tapenum, SortTuple* stup);
static void readtup_cluster(Tuplesortstate* state, SortTuple* stup, int tapenum, unsigned int len);
static void movetup_cluster(void *dest, void *src, unsigned int len);
static int comparetup_index_btree(const SortTuple* a, const SortTuple* b, Tuplesortstate* state);
static int comparetup_index_hash(const SortTuple* a, const SortTuple* b, Tuplesortstate* state);
static void copytup_index(Tuplesortstate* state, SortTuple* stup, void* tup);
static void writetup_index(Tuplesortstate* state, int tapenum, SortTuple* stup);
static void readtup_index(Tuplesortstate* state, SortTuple* stup, int tapenum, unsigned int len);
static void movetup_index(void *dest, void *src, unsigned int len);
static int worker_get_identifier(const Tuplesortstate *state);
static void worker_freeze_result_tape(Tuplesortstate *state);
static void worker_nomergeruns(Tuplesortstate *state);
@ -638,6 +690,7 @@ static int comparetup_datum(const SortTuple* a, const SortTuple* b, Tuplesortsta
static void copytup_datum(Tuplesortstate* state, SortTuple* stup, void* tup);
static void writetup_datum(Tuplesortstate* state, int tapenum, SortTuple* stup);
static void readtup_datum(Tuplesortstate* state, SortTuple* stup, int tapenum, unsigned int len);
static void movetup_datum(void *dest, void *src, unsigned int len);
static void reversedirection_datum(Tuplesortstate* state);
static void free_sort_tuple(Tuplesortstate* state, SortTuple* stup);
static void dumpbatch(Tuplesortstate *state, bool alltuples);
@ -696,6 +749,7 @@ static Tuplesortstate* tuplesort_begin_common(int64 workMem, bool randomAccess,
{
Tuplesortstate* state = NULL;
MemoryContext sortcontext;
MemoryContext tuplecontext;
MemoryContext oldcontext;
/* See leader_takeover_tapes() remarks on randomAccess support */
@ -706,8 +760,16 @@ static Tuplesortstate* tuplesort_begin_common(int64 workMem, bool randomAccess,
* Create a working memory context for this sort operation. All data
* needed by the sort will live inside this context.
*/
sortcontext = AllocSetContextCreate(CurrentMemoryContext, "TupleSort", ALLOCSET_DEFAULT_MINSIZE,
sortcontext = AllocSetContextCreate(CurrentMemoryContext, "TupleSort main", ALLOCSET_DEFAULT_MINSIZE,
ALLOCSET_DEFAULT_INITSIZE, ALLOCSET_DEFAULT_MAXSIZE, STANDARD_CONTEXT, workMem * 1024L);
tuplecontext = AllocSetContextCreate(sortcontext,
"Caller tuples",
ALLOCSET_DEFAULT_MINSIZE,
ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE,
STANDARD_CONTEXT,
workMem * 1024L);
/*
* Make the Tuplesortstate within the per-sort context. This way, we
@ -725,15 +787,18 @@ static Tuplesortstate* tuplesort_begin_common(int64 workMem, bool randomAccess,
state->status = TSS_INITIAL;
state->randomAccess = randomAccess;
state->bounded = false;
state->tuples = true;
state->boundUsed = false;
state->allowedMem = Max(workMem, 64) * (int64) 1024;
state->availMem = state->allowedMem;
state->sortcontext = sortcontext;
state->tuplecontext = tuplecontext;
state->tapeset = NULL;
state->memtupcount = 0;
state->memtupsize = 1024; /* initial guess */
state->growmemtuples = true;
state->batchUsed = false;
state->memtuples = (SortTuple*)palloc(state->memtupsize * sizeof(SortTuple));
USEMEM(state, GetMemoryChunkSpace(state->memtuples));
@ -814,6 +879,7 @@ Tuplesortstate* tuplesort_begin_heap(TupleDesc tupDesc, int nkeys, AttrNumber* a
state->copytup = copytup_heap;
state->writetup = writetup_heap;
state->readtup = readtup_heap;
state->movetup = movetup_heap;
#ifdef PGXC
state->getlen = getlen;
#endif
@ -900,6 +966,7 @@ Tuplesortstate* tuplesort_begin_cluster(
state->copytup = copytup_cluster;
state->writetup = writetup_cluster;
state->readtup = readtup_cluster;
state->movetup = movetup_cluster;
#ifdef PGXC
state->getlen = getlen;
#endif
@ -958,6 +1025,7 @@ Tuplesortstate* tuplesort_begin_index_btree(
state->copytup = copytup_index;
state->writetup = writetup_index;
state->readtup = readtup_index;
state->movetup = movetup_index;
#ifdef PGXC
state->getlen = getlen;
#endif
@ -1000,6 +1068,7 @@ Tuplesortstate* tuplesort_begin_index_hash(
state->copytup = copytup_index;
state->writetup = writetup_index;
state->readtup = readtup_index;
state->movetup = movetup_index;
#ifdef PGXC
state->getlen = getlen;
#endif
@ -1046,6 +1115,7 @@ Tuplesortstate* tuplesort_begin_datum(
state->copytup = copytup_datum;
state->writetup = writetup_datum;
state->readtup = readtup_datum;
state->movetup = movetup_datum;
#ifdef PGXC
state->getlen = getlen;
#endif
@ -1065,7 +1135,7 @@ Tuplesortstate* tuplesort_begin_datum(
/* lookup necessary attributes of the datum type */
get_typlenbyval(datumType, &typlen, &typbyval);
state->datumTypeLen = typlen;
state->datumTypeByVal = typbyval;
state->tuples = !typbyval;
(void)MemoryContextSwitchTo(oldcontext);
@ -1447,7 +1517,7 @@ void TuplesortPutheaptuple(Tuplesortstate* state, HeapTuple tup)
void tuplesort_putindextuplevalues(
Tuplesortstate* state, Relation rel, ItemPointer self, Datum* values, const bool* isnull)
{
MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
MemoryContext oldcontext = MemoryContextSwitchTo(state->tuplecontext);
SortTuple stup;
stup.tupindex = 0;
stup.tuple = index_form_tuple(RelationGetDescr(rel), values, isnull);
@ -1456,6 +1526,8 @@ void tuplesort_putindextuplevalues(
USEMEM(state, GetMemoryChunkSpace(stup.tuple));
/* set up first-column key value */
stup.datum1 = index_getattr((IndexTuple)stup.tuple, 1, RelationGetDescr(state->indexRel), &stup.isnull1);
MemoryContextSwitchTo(state->sortcontext);
puttuple_common(state, &stup);
(void)MemoryContextSwitchTo(oldcontext);
@ -1468,7 +1540,7 @@ void tuplesort_putindextuplevalues(
*/
void tuplesort_putdatum(Tuplesortstate* state, Datum val, bool isNull)
{
MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
MemoryContext oldcontext = MemoryContextSwitchTo(state->tuplecontext);
SortTuple stup;
stup.tupindex = 0;
@ -1476,15 +1548,17 @@ void tuplesort_putdatum(Tuplesortstate* state, Datum val, bool isNull)
* If it's a pass-by-reference value, copy it into memory we control, and
* decrease availMem. Then call the common code.
*/
if (isNull || state->datumTypeByVal) {
if (isNull || !state->tuples) {
stup.datum1 = val;
stup.isnull1 = isNull;
stup.tuple = NULL; /* no separate storage */
MemoryContextSwitchTo(state->sortcontext);
} else {
stup.datum1 = datumCopy(val, false, state->datumTypeLen);
stup.isnull1 = false;
stup.tuple = DatumGetPointer(stup.datum1);
USEMEM(state, GetMemoryChunkSpace(stup.tuple));
MemoryContextSwitchTo(state->sortcontext);
}
puttuple_common(state, &stup);
@ -1802,6 +1876,7 @@ void tuplesort_performsort(Tuplesortstate* state)
* Internal routine to fetch the next tuple in either forward or back
* direction into *stup. Returns FALSE if no more tuples.
* If *should_free is set, the caller must pfree stup.tuple when done with it.
* Otherwise, the caller must not use the tuple after the next call here, since its memory may be recycled.
*/
static bool tuplesort_gettuple_common(Tuplesortstate* state, bool forward, SortTuple* stup, bool* should_free)
{
@ -1812,6 +1887,7 @@ static bool tuplesort_gettuple_common(Tuplesortstate* state, bool forward, SortT
switch (state->status) {
case TSS_SORTEDINMEM:
Assert(forward || state->randomAccess);
Assert(!state->batchUsed);
*should_free = false;
if (forward) {
if (state->current < state->memtupcount) {
@ -1929,7 +2005,8 @@ static bool tuplesort_gettuple_common(Tuplesortstate* state, bool forward, SortT
case TSS_FINALMERGE:
Assert(forward);
*should_free = true;
Assert(state->batchUsed || !state->tuples);
*should_free = false;
/*
* This code should match the inner loop of mergeonerun().
@ -1940,20 +2017,27 @@ static bool tuplesort_gettuple_common(Tuplesortstate* state, bool forward, SortT
int tupIndex;
SortTuple* newtup = NULL;
/*
* Returned tuple is still counted in our memory space most
* of the time. See mergebatchone() for discussion of why
* caller may occasionally be required to free returned
* tuple, and how preread memory is managed with regard to
* edge cases more generally.
*/
*stup = state->memtuples[0];
/* returned tuple is no longer counted in our memory space */
if (stup->tuple != NULL) {
tuplength = GetMemoryChunkSpace(stup->tuple);
state->availMem += tuplength;
state->mergeavailmem[srcTape] += tuplength;
}
if ((tupIndex = state->mergenext[srcTape]) == 0) {
/*
* out of preloaded data on this tape, try to read more
*
* Unlike mergeonerun(), we only preload from the single
* tape that's run dry. See mergepreread() comments.
*/
* out of preloaded data on this tape, try to read more
*
* Unlike mergeonerun(), we only preload from the single
* tape that's run dry, though not before preparing its
* batch memory for a new round of sequential consumption.
* See mergepreread() comments.
*/
if (state->batchUsed)
mergebatchone(state, srcTape, stup, should_free);
mergeprereadone(state, srcTape);
/*
@ -1961,6 +2045,8 @@ static bool tuplesort_gettuple_common(Tuplesortstate* state, bool forward, SortT
*/
if ((tupIndex = state->mergenext[srcTape]) == 0) {
/* Remove the top node from the heap */
if (state->batchUsed)
mergebatchfreetape(state, srcTape, stup, should_free);
tuplesort_heap_delete_top(state);
return true;
}
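With batch memory, the usual case in TSS_FINALMERGE is now *should_free = false: the returned tuple points into a tape's batch buffer and is only guaranteed to survive until the next fetch, so a caller that wants to keep it must copy it first. A toy C illustration of that lifetime rule; nothing below is the tuplesort API, a static buffer merely stands in for the recycled batch memory:

    #include <stdio.h>
    #include <string.h>

    /* Toy "sort": every fetch reuses the same buffer, so a previously
     * returned pointer is only valid until the next fetch. */
    static char batchbuf[16];

    static const char *fetch(int i)
    {
        snprintf(batchbuf, sizeof(batchbuf), "tuple-%d", i);
        return batchbuf;                 /* the *should_free == false case */
    }

    int main(void)
    {
        char keep[16];

        const char *t0 = fetch(0);
        strncpy(keep, t0, sizeof(keep) - 1);   /* copy before fetching again */
        keep[sizeof(keep) - 1] = '\0';

        const char *t1 = fetch(1);
        /* t0 and t1 alias the same storage now; only the copy is intact. */
        printf("%s / %s / saved: %s\n", t0, t1, keep);
        return 0;
    }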
@ -2121,7 +2207,7 @@ bool tuplesort_getdatum(Tuplesortstate* state, bool forward, Datum* val, bool* i
return false;
}
if (stup.isnull1 || state->datumTypeByVal) {
if (stup.isnull1 || !state->tuples) {
*val = stup.datum1;
*isNull = stup.isnull1;
} else {
@ -2313,6 +2399,10 @@ static void inittapestate(Tuplesortstate *state, int maxTapes)
state->mergelast = (int*)palloc0(maxTapes * sizeof(int));
state->mergeavailslots = (int*)palloc0(maxTapes * sizeof(int));
state->mergeavailmem = (long*)palloc0(maxTapes * sizeof(long));
state->mergetuples = (char **) palloc0(maxTapes * sizeof(char *));
state->mergecurrent = (char **) palloc0(maxTapes * sizeof(char *));
state->mergetail = (char **) palloc0(maxTapes * sizeof(char *));
state->mergeoverflow = (char **) palloc0(maxTapes * sizeof(char *));
state->tp_fib = (int *)palloc0(maxTapes * sizeof(int));
state->tp_runs = (int *)palloc0(maxTapes * sizeof(int));
state->tp_dummy = (int *)palloc0(maxTapes * sizeof(int));
@ -2432,7 +2522,7 @@ static void mergeruns(Tuplesortstate* state)
/* Tell logtape.c we won't be writing anymore */
LogicalTapeSetForgetFreeSpace(state->tapeset);
/* Initialize for the final merge pass */
beginmerge(state);
beginmerge(state, state->tuples);
state->status = TSS_FINALMERGE;
return;
}
@ -2515,7 +2605,7 @@ static void mergeonerun(Tuplesortstate* state)
* Start the merge by loading one tuple from each active source tape into
* the heap. We can also decrease the input run/dummy run counts.
*/
beginmerge(state);
beginmerge(state, false);
/*
* Execute merge by repeatedly extracting lowest tuple in heap, writing it
@ -2557,6 +2647,12 @@ static void mergeonerun(Tuplesortstate* state)
state->mergeavailslots[srcTape]++;
}
/*
* Reset tuple memory, now that no caller tuples are needed in memory.
* This prevents fragmentation.
*/
MemoryContextReset(state->tuplecontext);
/*
* When the heap empties, we're done. Write an end-of-run marker on the
* output tape, and increment its count of real runs.
@ -2576,11 +2672,15 @@ static void mergeonerun(Tuplesortstate* state)
* beginmerge - initialize for a merge pass
*
* We decrease the counts of real and dummy runs for each tape, and mark
* which tapes contain active input runs in mergeactive[]. Then, load
* which tapes contain active input runs in mergeactive[]. Then, load
* as many tuples as we can from each active input tape, and finally
* fill the merge heap with the first tuple from each active tape.
*
* finalMergeBatch indicates if this is the beginning of a final on-the-fly
* merge where a batched allocation of tuple memory is required.
*/
static void beginmerge(Tuplesortstate* state)
static void
beginmerge(Tuplesortstate *state, bool finalMergeBatch)
{
int activeTapes;
int tapenum;
@ -2622,6 +2722,18 @@ static void beginmerge(Tuplesortstate* state)
state->mergefreelist = 0; /* nothing in the freelist */
state->mergefirstfree = activeTapes; /* 1st slot avail for preread */
if (finalMergeBatch)
{
/* Free outright buffers for tape never actually allocated */
FREEMEM(state, (state->maxTapes - activeTapes) * TAPE_BUFFER_OVERHEAD);
/*
* Grow memtuples one last time, since the palloc() overhead no longer
* incurred can make a big difference
*/
batchmemtuples(state);
}
/*
* Initialize space allocation to let each active input tape have an equal
* share of preread space. For cluster environment, the memtupsize initial
@ -2660,7 +2772,7 @@ static void beginmerge(Tuplesortstate* state)
}
Assert(slotsPerTape > 0);
spacePerTape = state->availMem / activeTapes;
spacePerTape = MAXALIGN_DOWN(state->availMem / activeTapes);
for (srcTape = 0; srcTape < state->maxTapes; srcTape++) {
if (state->mergeactive[srcTape]) {
state->mergeavailslots[srcTape] = slotsPerTape;
@ -2681,6 +2793,15 @@ static void beginmerge(Tuplesortstate* state)
}
#endif
/*
* Preallocate tuple batch memory for each tape. This is the memory used
* for tuples themselves (not SortTuples), so it's never used by
* pass-by-value datum sorts. Memory allocation is performed here at most
* once per sort, just in advance of the final on-the-fly merge step.
*/
if (finalMergeBatch)
mergebatch(state, spacePerTape);
/*
* Preread as many tuples as possible (and at least one) from each active
* tape
@ -2702,10 +2823,318 @@ static void beginmerge(Tuplesortstate* state)
tup->tupindex = state->mergefreelist;
state->mergefreelist = tupIndex;
state->mergeavailslots[srcTape]++;
#ifdef TRACE_SORT
if (u_sess->attr.attr_common.trace_sort && finalMergeBatch)
{
int64 perTapeKB = (spacePerTape + 1023) / 1024;
int64 usedSpaceKB;
int usedSlots;
/*
* Report how effective batchmemtuples() was in balancing
* the number of slots against the need for memory for the
* underlying tuples (e.g. IndexTuples). The big preread of
* all tapes when switching to FINALMERGE state should be
* fairly representative of memory utilization during the
* final merge step, and in any case is the only point at
* which all tapes are guaranteed to have depleted either
* their batch memory allowance or slot allowance. Ideally,
* both will be completely depleted for every tape by now.
*/
usedSpaceKB = (state->mergecurrent[srcTape] -
state->mergetuples[srcTape] + 1023) / 1024;
usedSlots = slotsPerTape - state->mergeavailslots[srcTape];
elog(LOG, "tape %d initially used " INT64_FORMAT " KB of "
INT64_FORMAT " KB batch (%2.3f) and %d out of %d slots "
"(%2.3f)", srcTape,
usedSpaceKB, perTapeKB,
(double) usedSpaceKB / (double) perTapeKB,
usedSlots, slotsPerTape,
(double) usedSlots / (double) slotsPerTape);
}
#endif
}
}
}
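The per-tape budget computed in beginmerge() is just an equal split of the remaining sort memory, rounded down so each slice is a whole number of MAXALIGN quanta and the MAXALIGN'd reservations made later can fill it exactly. A small arithmetic sketch; the 8-byte alignment is an assumption for illustration, in the server it comes from the build:

    #include <stdint.h>
    #include <stdio.h>

    #define MAXALIGN_OF      8          /* assumption for the sketch */
    #define MAXALIGN_DOWN(x) ((x) & ~((int64_t) (MAXALIGN_OF - 1)))

    int main(void)
    {
        int64_t availMem    = 4 * 1024 * 1024 + 123;   /* leftover workMem */
        int     activeTapes = 7;

        int64_t spacePerTape = MAXALIGN_DOWN(availMem / activeTapes);
        printf("%lld bytes per tape\n", (long long) spacePerTape);
        return 0;
    }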
/*
* batchmemtuples - grow memtuples without palloc overhead
*
* When called, availMem should be approximately the amount of memory we'd
* require to allocate memtupsize - memtupcount tuples (not SortTuples/slots)
* that were allocated with palloc() overhead, and in doing so use up all
* allocated slots. However, though slots and tuple memory is in balance
* following the last grow_memtuples() call, that's predicated on the observed
* average tuple size for the "final" grow_memtuples() call, which includes
* palloc overhead.
*
* This will perform an actual final grow_memtuples() call without any palloc()
* overhead, rebalancing the use of memory between slots and tuples.
*/
static void
batchmemtuples(Tuplesortstate *state)
{
int64 refund;
int64 availMemLessRefund;
int memtupsize = state->memtupsize;
/* For simplicity, assume no memtuples are actually currently counted */
Assert(state->memtupcount == 0);
/*
* Refund STANDARDCHUNKHEADERSIZE per tuple.
*
* This sometimes fails to make memory use perfectly balanced, but it
* should never make the situation worse. Note that Assert-enabled builds
* get a larger refund, due to a varying STANDARDCHUNKHEADERSIZE.
*/
refund = memtupsize * STANDARDCHUNKHEADERSIZE;
availMemLessRefund = state->availMem - refund;
/*
* To establish balanced memory use after refunding palloc overhead,
* temporarily have our accounting indicate that we've allocated all
* memory we're allowed to, less that refund, and call grow_memtuples()
* to have it increase the number of slots.
*/
state->growmemtuples = true;
USEMEM(state, availMemLessRefund);
(void) grow_memtuples(state);
/* Should not matter, but be tidy */
FREEMEM(state, availMemLessRefund);
state->growmemtuples = false;
#ifdef TRACE_SORT
if (u_sess->attr.attr_common.trace_sort)
{
Size OldKb = (memtupsize * sizeof(SortTuple) + 1023) / 1024;
Size NewKb = (state->memtupsize * sizeof(SortTuple) + 1023) / 1024;
elog(LOG, "grew memtuples %1.2fx from %d (%zu KB) to %d (%zu KB) for final merge",
(double) NewKb / (double) OldKb,
memtupsize, OldKb,
state->memtupsize, NewKb);
}
#endif
}
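The refund step above boils down to: give back one chunk header per planned tuple (those headers will no longer be paid once tuples live in batch buffers), pretend the rest of the memory is already spent, and let grow_memtuples() turn exactly that headroom into extra SortTuple slots. A worked sketch of the arithmetic only; the 16-byte STANDARDCHUNKHEADERSIZE is an assumption for illustration:

    #include <stdint.h>
    #include <stdio.h>

    int main(void)
    {
        const int64_t chunk_header = 16;        /* assumed header size       */
        int64_t availMem   = 8 * 1024 * 1024;   /* memory not yet charged    */
        int     memtupsize = 100000;            /* slots sized for tuples
                                                 * that carried headers      */

        int64_t refund             = (int64_t) memtupsize * chunk_header;
        int64_t availMemLessRefund = availMem - refund;

        /* Charging availMemLessRefund temporarily leaves exactly `refund`
         * bytes of headroom, which grow_memtuples() may convert into
         * additional memtuples slots without exceeding workMem. */
        printf("refund = %lld bytes, charged for rebalance = %lld bytes\n",
               (long long) refund, (long long) availMemLessRefund);
        return 0;
    }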
/*
* mergebatch - initialize tuple memory in batch
*
* This allows sequential access to sorted tuples buffered in memory from
* tapes/runs on disk during a final on-the-fly merge step. Note that the
* memory is not used for SortTuples, but for the underlying tuples (e.g.
* MinimalTuples).
*
* Note that when batch memory is used, there is a simple division of space
* into large buffers (one per active tape). The conventional incremental
* memory accounting (calling USEMEM() and FREEMEM()) is abandoned. Instead,
* when each tape's memory budget is exceeded, a retail palloc() "overflow" is
* performed, which is then immediately detected in a way that is analogous to
* LACKMEM(). This keeps each tape's use of memory fair, which is always a
* goal.
*/
static void
mergebatch(Tuplesortstate *state, int64 spacePerTape)
{
int srcTape;
Assert(state->activeTapes > 0);
Assert(state->tuples);
/*
* For the purposes of tuplesort's memory accounting, the batch allocation
* is special, and regular memory accounting through USEMEM() calls is
* abandoned (see mergeprereadone()).
*/
for (srcTape = 0; srcTape < state->maxTapes; srcTape++)
{
char *mergetuples;
if (!state->mergeactive[srcTape])
continue;
/* Allocate buffer for each active tape */
mergetuples = (char *) palloc_huge(state->tuplecontext, spacePerTape);
/* Initialize state for tape */
state->mergetuples[srcTape] = mergetuples;
state->mergecurrent[srcTape] = mergetuples;
state->mergetail[srcTape] = mergetuples;
state->mergeoverflow[srcTape] = NULL;
}
state->batchUsed = true;
state->spacePerTape = spacePerTape;
}
/*
* mergebatchone - prepare batch memory for one merge input tape
*
* This is called following the exhaustion of preread tuples for one input
* tape. All that actually occurs is that the state for the source tape is
* reset to indicate that all memory may be reused.
*
* This routine must deal with fixing up the tuple that is about to be returned
* to the client, due to "overflow" allocations.
*/
static void
mergebatchone(Tuplesortstate *state, int srcTape, SortTuple *rtup,
bool *should_free)
{
Assert(state->batchUsed);
/*
* Tuple about to be returned to caller ("stup") is final preread tuple
* from tape, just removed from the top of the heap. Special steps around
* memory management must be performed for that tuple, to make sure it
* isn't overwritten early.
*/
if (!state->mergeoverflow[srcTape])
{
Size tupLen;
/*
* Mark tuple buffer range for reuse, but be careful to move final,
* tail tuple to start of space for next run so that it's available
* to caller when stup is returned, and remains available at least
* until the next tuple is requested.
*/
tupLen = state->mergecurrent[srcTape] - state->mergetail[srcTape];
state->mergecurrent[srcTape] = state->mergetuples[srcTape];
MOVETUP(state->mergecurrent[srcTape], state->mergetail[srcTape],
tupLen);
/* Make SortTuple at top of the merge heap point to new tuple */
rtup->tuple = (void *) state->mergecurrent[srcTape];
state->mergetail[srcTape] = state->mergecurrent[srcTape];
state->mergecurrent[srcTape] += tupLen;
}
else
{
/*
* Handle an "overflow" retail palloc.
*
* This is needed when we run out of tuple memory for the tape.
*/
state->mergecurrent[srcTape] = state->mergetuples[srcTape];
state->mergetail[srcTape] = state->mergetuples[srcTape];
if (rtup->tuple)
{
Assert(rtup->tuple == (void *) state->mergeoverflow[srcTape]);
/* Caller should free palloc'd tuple */
*should_free = true;
}
state->mergeoverflow[srcTape] = NULL;
}
}
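The subtle part of the non-overflow branch is that the tuple being returned is the last one read from this tape and sits at the tail of the very buffer that is about to be reused, so it is slid to the front before the next round of preloading can overwrite it. A standalone sketch of just that move, with strings standing in for tuples:

    #include <stdio.h>
    #include <string.h>

    int main(void)
    {
        char   buffer[64] = "aaaaaaaaaaaaLAST-TUPLE";  /* toy tape buffer     */
        char  *tail       = buffer + 12;               /* start of last tuple */
        char  *current    = tail + strlen(tail) + 1;   /* bump pointer        */

        /* Recycle the buffer, but keep the final tuple alive at the front. */
        size_t tuplen = (size_t) (current - tail);
        memmove(buffer, tail, tuplen);                 /* the MOVETUP() step  */
        tail    = buffer;
        current = buffer + tuplen;

        printf("preserved: %s, free space starts at offset %td\n",
               buffer, current - buffer);
        return 0;
    }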
/*
* mergebatchfreetape - handle final clean-up for batch memory once tape is
* about to become exhausted
*
* All tuples are returned from tape, but a single final tuple, *rtup, is to be
* passed back to caller. Free tape's batch allocation buffer while ensuring
* that the final tuple is managed appropriately.
*/
static void
mergebatchfreetape(Tuplesortstate *state, int srcTape, SortTuple *rtup,
bool *should_free)
{
Assert(state->batchUsed);
Assert(state->status == TSS_FINALMERGE);
/*
* Tuple may or may not already be an overflow allocation from
* mergebatchone()
*/
if (!*should_free && rtup->tuple)
{
/*
* Final tuple still in tape's batch allocation.
*
* Return palloc()'d copy to caller, and have it freed in a similar
* manner to overflow allocation. Otherwise, we'd free batch memory
* and pass back a pointer to garbage. Note that we deliberately
* allocate this in the parent tuplesort context, to be on the safe
* side.
*/
Size tuplen;
void *oldTuple = rtup->tuple;
tuplen = state->mergecurrent[srcTape] - state->mergetail[srcTape];
rtup->tuple = (char *) MemoryContextAlloc(state->sortcontext, tuplen);
MOVETUP(rtup->tuple, oldTuple, tuplen);
*should_free = true;
}
/* Free spacePerTape-sized buffer */
pfree(state->mergetuples[srcTape]);
}
/*
* mergebatchalloc - allocate memory for one tuple using a batch memory
* "logical allocation".
*
* This is used for the final on-the-fly merge phase only. READTUP() routines
* receive memory from here in place of palloc() and USEMEM() calls.
*
* Tuple tapenum is passed, ensuring each tape's tuples are stored in sorted,
* contiguous order (while allowing safe reuse of memory made available to
* each tape). This maximizes locality of access as tuples are returned by
* final merge.
*
* Caller must not subsequently attempt to free memory returned here. In
* general, only mergebatch* functions know about how memory returned from
* here should be freed, and this function's caller must ensure that batch
* memory management code will definitely have the opportunity to do the right
* thing during the final on-the-fly merge.
*/
static void *
mergebatchalloc(Tuplesortstate *state, int tapenum, Size tuplen)
{
Size reserve_tuplen = MAXALIGN(tuplen);
char *ret;
/* Should overflow at most once before mergebatchone() call: */
Assert(state->mergeoverflow[tapenum] == NULL);
Assert(state->batchUsed);
/* It should be possible to use precisely spacePerTape memory at once */
if (state->mergecurrent[tapenum] + reserve_tuplen <=
state->mergetuples[tapenum] + state->spacePerTape)
{
/*
* Usual case -- caller is returned pointer into its tape's buffer, and
* an offset from that point is recorded as where tape has consumed up
* to for current round of preloading.
*/
ret = state->mergetail[tapenum] = state->mergecurrent[tapenum];
state->mergecurrent[tapenum] += reserve_tuplen;
}
else
{
/*
* Allocate memory, and record as tape's overflow allocation. This
* will be detected quickly, in a similar fashion to a LACKMEM()
* condition, and should not happen again before a new round of
* preloading for caller's tape. Note that we deliberately allocate
* this in the parent tuplesort context, to be on the safe side.
*
* Sometimes, this does not happen because merging runs out of slots
* before running out of memory.
*/
ret = state->mergeoverflow[tapenum] =
(char *) MemoryContextAlloc(state->sortcontext, tuplen);
}
return ret;
}
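Stripped of the tuplesort specifics, mergebatchalloc() is a per-tape bump allocator with a one-shot heap fallback: hand out MAXALIGN'd offsets from the tape's slice until it is full, then make a single ordinary allocation that the next preload round is expected to notice and retire. A minimal sketch under those assumptions, with hypothetical names and malloc() in place of MemoryContextAlloc():

    #include <stdlib.h>
    #include <stddef.h>

    #define ALIGN_UP(n) (((n) + 7u) & ~(size_t) 7u)   /* MAXALIGN stand-in */

    typedef struct TapeSlice {
        char   *buffer;     /* start of this tape's batch slice      */
        char   *current;    /* bump pointer                          */
        char   *overflow;   /* at most one retail allocation pending */
        size_t  space;      /* slice size in bytes                   */
    } TapeSlice;

    void *slice_alloc(TapeSlice *t, size_t tuplen)
    {
        size_t reserve = ALIGN_UP(tuplen);

        if (t->current + reserve <= t->buffer + t->space) {
            void *ret = t->current;     /* usual case: bump within the slice */
            t->current += reserve;
            return ret;
        }

        /* Slice exhausted: one ordinary allocation, remembered so it can be
         * treated as a LACKMEM()-style signal to preload this tape again. */
        t->overflow = (char *) malloc(tuplen);
        return t->overflow;
    }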
/*
* mergepreread - load tuples from merge input tapes
*
@ -2757,7 +3186,9 @@ static void mergeprereadone(Tuplesortstate* state, int srcTape)
}
priorAvail = state->availMem;
state->availMem = state->mergeavailmem[srcTape];
while (state->mergeavailslots[srcTape] > 0 || state->mergenext[srcTape] == 0) {
while ((state->mergeavailslots[srcTape] > 0 &&
state->mergeoverflow[srcTape] == NULL && !LACKMEM(state)) ||
state->mergenext[srcTape] == 0) {
/* read next tuple, if any */
#ifdef PGXC
if ((tuplen = GETLEN(state, srcTape, true)) == 0)
@ -3268,6 +3699,42 @@ static void markrunend(Tuplesortstate* state, int tapenum)
pgstat_increase_session_spill_size(sizeof(len));
}
/*
* Get memory for tuple from within READTUP() routine. Allocate
* memory and account for that, or consume from tape's batch
* allocation.
*
* Memory returned here in the final on-the-fly merge case is recycled
* from tape's batch allocation. Otherwise, callers must pfree() or
* reset tuple child memory context, and account for that with a
* FREEMEM(). Currently, this only ever needs to happen in WRITETUP()
* routines.
*/
static void *
readtup_alloc(Tuplesortstate *state, int tapenum, Size tuplen)
{
if (state->batchUsed)
{
/*
* No USEMEM() call, because during final on-the-fly merge
* accounting is based on tape-private state. ("Overflow"
* allocations are detected as an indication that a new round
* of preloading is required. Preloading marks existing
* contents of tape's batch buffer for reuse.)
*/
return mergebatchalloc(state, tapenum, tuplen);
}
else
{
char *ret;
/* Batch allocation yet to be performed */
ret = (char *) MemoryContextAlloc(state->tuplecontext, tuplen);
USEMEM(state, GetMemoryChunkSpace(ret));
return ret;
}
}
/*
* Inline-able copy of FunctionCall2Coll() to save some cycles in sorting.
*/
@ -3398,6 +3865,7 @@ static void copytup_heap(Tuplesortstate* state, SortTuple* stup, void* tup)
Datum original;
MinimalTuple tuple;
HeapTupleData htup;
MemoryContext oldcontext = MemoryContextSwitchTo(state->tuplecontext);
/* copy the tuple into sort storage */
tuple = ExecCopySlotMinimalTuple(slot);
@ -3411,6 +3879,8 @@ static void copytup_heap(Tuplesortstate* state, SortTuple* stup, void* tup)
htup.t_data = (HeapTupleHeader)((char*)tuple - MINIMAL_TUPLE_OFFSET);
original = tableam_tops_tuple_getattr(&htup, state->sortKeys[0].ssup_attno, state->tupDesc, &stup->isnull1);
MemoryContextSwitchTo(oldcontext);
if (!state->sortKeys->abbrev_converter || stup->isnull1) {
/*
* Store ordinary Datum representation, or NULL value. If there is a
@ -3481,11 +3951,10 @@ static void readtup_heap(Tuplesortstate* state, SortTuple* stup, int tapenum, un
{
unsigned int tupbodylen = len - sizeof(int);
unsigned int tuplen = tupbodylen + MINIMAL_TUPLE_DATA_OFFSET;
MinimalTuple tuple = (MinimalTuple)palloc(tuplen);
MinimalTuple tuple = (MinimalTuple) readtup_alloc(state, tapenum, tuplen);
char* tupbody = (char*)tuple + MINIMAL_TUPLE_DATA_OFFSET;
HeapTupleData htup;
USEMEM(state, GetMemoryChunkSpace(tuple));
/* read in the tuple proper */
tuple->t_len = tuplen;
LogicalTapeReadExact(state->tapeset, tapenum, tupbody, tupbodylen);
@ -3511,6 +3980,12 @@ static void reversedirection_heap(Tuplesortstate* state)
}
}
static void
movetup_heap(void *dest, void *src, unsigned int len)
{
memmove(dest, src, len);
}
/*
* Routines specialized for the CLUSTER case (HeapTuple data, with
* comparisons per a btree index definition)
@ -3610,9 +4085,11 @@ static int comparetup_cluster(const SortTuple* a, const SortTuple* b, Tuplesorts
static void copytup_cluster(Tuplesortstate* state, SortTuple* stup, Tuple tup)
{
MemoryContext oldcontext = MemoryContextSwitchTo(state->tuplecontext);
Tuple tuple = tableam_tops_copy_tuple(tup);
stup->tuple = tuple;
USEMEM(state, GetMemoryChunkSpace(tuple));
MemoryContextSwitchTo(oldcontext);
/* set up first-column key value, if it's a simple column */
if (state->indexInfo->ii_KeyAttrNumbers[0] != 0) {
stup->datum1 = tableam_tops_tuple_getattr(tuple,
@ -3650,7 +4127,8 @@ static void writetup_cluster(Tuplesortstate* state, int tapenum, SortTuple* stup
static void readtup_cluster(Tuplesortstate* state, SortTuple* stup, int tapenum, unsigned int tuplen)
{
unsigned int t_len = tuplen - sizeof(ItemPointerData) - sizeof(int) - sizeof(TransactionId) * 2;
HeapTuple tuple = (HeapTuple)heaptup_alloc(t_len + HEAPTUPLESIZE);
HeapTuple tuple = (HeapTuple) readtup_alloc(state, tapenum, t_len + HEAPTUPLESIZE);
tuple->tupTableType = HEAP_TUPLE;
USEMEM(state, GetMemoryChunkSpace(tuple));
/* Reconstruct the HeapTupleData header */
@ -3678,6 +4156,18 @@ static void readtup_cluster(Tuplesortstate* state, SortTuple* stup, int tapenum,
}
}
static void
movetup_cluster(void *dest, void *src, unsigned int len)
{
HeapTuple tuple;
memmove(dest, src, len);
/* Repoint the HeapTupleData header */
tuple = (HeapTuple) dest;
tuple->t_data = (HeapTupleHeader) ((char *) tuple + HEAPTUPLESIZE);
}
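movetup_cluster() needs more than a memmove() because a HeapTuple carries an absolute pointer (t_data) to its own body; after the bytes move, that pointer still aims at the old location and has to be recomputed from the new address. A toy illustration with a self-referential struct, not the real HeapTupleData layout:

    #include <stdio.h>
    #include <string.h>

    /* Toy stand-in: a header that points at its own trailing payload. */
    typedef struct SelfRef {
        char *data;                      /* plays the role of t_data */
        char  payload[24];
    } SelfRef;

    int main(void)
    {
        SelfRef src, dst;

        src.data = src.payload;
        strcpy(src.payload, "row contents");

        memmove(&dst, &src, sizeof(SelfRef));
        /* dst.data still points into src.payload here. */
        dst.data = dst.payload;          /* the movetup_* fix-up step */

        printf("%s\n", dst.data);
        return 0;
    }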
/*
* Routines specialized for IndexTuple case
*
@ -3878,7 +4368,7 @@ static void copytup_index(Tuplesortstate* state, SortTuple* stup, void* tup)
IndexTuple newtuple;
/* copy the tuple into sort storage */
newtuple = (IndexTuple)palloc(tuplen);
newtuple = (IndexTuple) MemoryContextAlloc(state->tuplecontext, tuplen);
errno_t rc = memcpy_s(newtuple, tuplen, tuple, tuplen);
securec_check(rc, "\0", "\0");
@ -3914,9 +4404,8 @@ static void writetup_index(Tuplesortstate* state, int tapenum, SortTuple* stup)
static void readtup_index(Tuplesortstate* state, SortTuple* stup, int tapenum, unsigned int len)
{
unsigned int tuplen = len - sizeof(unsigned int);
IndexTuple tuple = (IndexTuple)palloc(tuplen);
IndexTuple tuple = (IndexTuple) readtup_alloc(state, tapenum, tuplen);
USEMEM(state, GetMemoryChunkSpace(tuple));
LogicalTapeReadExact(state->tapeset, tapenum, tuple, tuplen);
if (state->randomAccess) {
/* need trailing length word? */
@ -3945,6 +4434,12 @@ static void reversedirection_index_hash(Tuplesortstate* state)
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("reversedirection_index_hash is not implemented"))));
}
static void
movetup_index(void *dest, void *src, unsigned int len)
{
memmove(dest, src, len);
}
/*
* Routines specialized for DatumTuple case
*/
@ -3971,7 +4466,7 @@ static void writetup_datum(Tuplesortstate* state, int tapenum, SortTuple* stup)
if (stup->isnull1) {
waddr = NULL;
tuplen = 0;
} else if (state->datumTypeByVal) {
} else if (!state->tuples) {
waddr = &stup->datum1;
tuplen = sizeof(Datum);
} else {
@ -4011,19 +4506,18 @@ static void readtup_datum(Tuplesortstate* state, SortTuple* stup, int tapenum, u
stup->datum1 = (Datum)0;
stup->isnull1 = true;
stup->tuple = NULL;
} else if (state->datumTypeByVal) {
} else if (!state->tuples) {
Assert(tuplen == sizeof(Datum));
LogicalTapeReadExact(state->tapeset, tapenum, &stup->datum1, tuplen);
stup->isnull1 = false;
stup->tuple = NULL;
} else {
void* raddr = palloc(tuplen);
void* raddr = readtup_alloc(state, tapenum, tuplen);
LogicalTapeReadExact(state->tapeset, tapenum, raddr, tuplen);
stup->datum1 = PointerGetDatum(raddr);
stup->isnull1 = false;
stup->tuple = raddr;
USEMEM(state, GetMemoryChunkSpace(raddr));
}
if (state->randomAccess) {
@ -4248,6 +4742,12 @@ static void leader_takeover_tapes(Tuplesortstate *state)
state->status = TSS_BUILDRUNS;
}
static void
movetup_datum(void *dest, void *src, unsigned int len)
{
memmove(dest, src, len);
}
/*
* Convenience routine to free a tuple previously loaded into sort memory
*/
@ -4651,7 +5151,7 @@ Tuplesortstate* tuplesort_begin_merge(TupleDesc tupDesc, int nkeys, AttrNumber*
state->tp_runs[i] = 1;
state->tp_tapenum[i] = i;
}
beginmerge(state);
beginmerge(state, state->tuples);
state->status = TSS_FINALMERGE;
(void)MemoryContextSwitchTo(oldcontext);