减少agg算子中的投影次数

This commit is contained in:
cc_db_dev 2022-07-07 16:57:50 +08:00
parent 53541c813b
commit e0870fb47a
3 changed files with 103 additions and 37 deletions

View File

@ -131,6 +131,7 @@
#include "executor/executor.h"
#include "executor/node/nodeAgg.h"
#include "miscadmin.h"
#include "nodes/makefuncs.h"
#include "nodes/nodeFuncs.h"
#include "optimizer/clauses.h"
#include "optimizer/tlist.h"
@ -292,14 +293,14 @@ static void initialize_aggregate(AggState* aggstate, AggStatePerAgg peraggstate,
*/
if (peraggstate->numInputs == 1) {
peraggstate->sortstates[aggstate->current_set] =
tuplesort_begin_datum(peraggstate->evaldesc->attrs[0]->atttypid,
tuplesort_begin_datum(peraggstate->sortdesc->attrs[0]->atttypid,
peraggstate->sortOperators[0],
peraggstate->sortCollations[0],
peraggstate->sortNullsFirst[0],
local_work_mem,
false);
} else {
peraggstate->sortstates[aggstate->current_set] = tuplesort_begin_heap(peraggstate->evaldesc,
peraggstate->sortstates[aggstate->current_set] = tuplesort_begin_heap(peraggstate->sortdesc,
peraggstate->numSortCols,
peraggstate->sortColIdx,
peraggstate->sortOperators,
@ -617,19 +618,21 @@ static void advance_aggregates(AggState* aggstate, AggStatePerGroup pergroup)
int setno = 0;
int numGroupingSets = Max(aggstate->phase->numsets, 1);
int numAggs = aggstate->numaggs;
TupleTableSlot *slot = aggstate->evalslot;
/* compute input for all aggregates */
if (aggstate->evalproj)
aggstate->evalslot = ExecProject(aggstate->evalproj, NULL);
for (aggno = 0; aggno < aggstate->numaggs; aggno++) {
AggStatePerAgg peraggstate = &aggstate->peragg[aggno];
int numTransInputs = peraggstate->numTransInputs;
int i;
TupleTableSlot* slot = NULL;
/* Evaluate the current input expressions for this aggregate */
slot = ExecProject(peraggstate->evalproj, NULL);
int inputoff = peraggstate->inputoff;
if (peraggstate->numSortCols > 0) {
/* DISTINCT and/or ORDER BY case */
Assert(slot->tts_nvalid == peraggstate->numInputs);
Assert(slot->tts_nvalid >= (peraggstate->numInputs + inputoff));
/*
* If the transfn is strict, we want to check for nullity before
@ -640,7 +643,7 @@ static void advance_aggregates(AggState* aggstate, AggStatePerGroup pergroup)
*/
if (peraggstate->transfn.fn_strict) {
for (i = 0; i < numTransInputs; i++) {
if (slot->tts_isnull[i])
if (slot->tts_isnull[i + inputoff])
break;
}
if (i < numTransInputs)
@ -651,9 +654,25 @@ static void advance_aggregates(AggState* aggstate, AggStatePerGroup pergroup)
/* OK, put the tuple into the tuplesort object */
if (peraggstate->numInputs == 1)
tuplesort_putdatum(peraggstate->sortstates[setno], slot->tts_values[0], slot->tts_isnull[0]);
else
tuplesort_puttupleslot(peraggstate->sortstates[setno], slot);
tuplesort_putdatum(peraggstate->sortstates[setno], slot->tts_values[inputoff],
slot->tts_isnull[inputoff]);
else {
errno_t errorno = EOK;
/*
* Copy slot contents, starting from inputoff, into sort
* slot.
*/
ExecClearTuple(peraggstate->sortslot);
errorno = memcpy_s(peraggstate->sortslot->tts_values, peraggstate->numInputs * sizeof(Datum),
&slot->tts_values[inputoff], peraggstate->numInputs * sizeof(Datum));
securec_check(errorno, "\0", "\0");
errorno = memcpy_s(peraggstate->sortslot->tts_isnull, peraggstate->numInputs * sizeof(bool),
&slot->tts_isnull[inputoff], peraggstate->numInputs * sizeof(bool));
securec_check(errorno, "\0", "\0");
peraggstate->sortslot->tts_nvalid = peraggstate->numInputs;
ExecStoreVirtualTuple(peraggstate->sortslot);
tuplesort_puttupleslot(peraggstate->sortstates[setno], peraggstate->sortslot);
}
}
} else {
/* We can apply the transition function immediately */
@ -663,8 +682,8 @@ static void advance_aggregates(AggState* aggstate, AggStatePerGroup pergroup)
/* Start from 1, since the 0th arg will be the transition value */
Assert(slot->tts_nvalid >= numTransInputs);
for (i = 0; i < numTransInputs; i++) {
fcinfo->arg[i + 1] = slot->tts_values[i];
fcinfo->argnull[i + 1] = slot->tts_isnull[i];
fcinfo->arg[i + 1] = slot->tts_values[i + inputoff];
fcinfo->argnull[i + 1] = slot->tts_isnull[i + inputoff];
fcinfo->argTypes[i + 1] = InvalidOid;
}
for (setno = 0; setno < numGroupingSets; setno++) {
@ -797,7 +816,7 @@ static void process_ordered_aggregate_multi(
{
MemoryContext workcontext = aggstate->tmpcontext->ecxt_per_tuple_memory;
FunctionCallInfo fcinfo = &peraggstate->transfn_fcinfo;
TupleTableSlot* slot1 = peraggstate->evalslot;
TupleTableSlot* slot1 = peraggstate->sortslot;
TupleTableSlot* slot2 = peraggstate->uniqslot;
int numTransInputs = peraggstate->numTransInputs;
int numDistinctCols = peraggstate->numDistinctCols;
@ -1951,10 +1970,12 @@ AggState* ExecInitAgg(Agg* node, EState* estate, int eflags)
ExprContext* econtext = NULL;
int numaggs, aggno;
int phase;
List *combined_inputeval;
ListCell* l = NULL;
Bitmapset* all_grouped_cols = NULL;
int numGroupingSets = 1;
int numPhases;
int column_offset;
int currentsortno = 0;
int i = 0;
int j = 0;
@ -2496,20 +2517,6 @@ AggState* ExecInitAgg(Agg* node, EState* estate, int eflags)
"aggregate %u needs to have compatible input type and transition type", aggref->aggfnoid)));
}
/*
* Get a tupledesc corresponding to the inputs (including sort
* expressions) of the agg.
*/
peraggstate->evaldesc = ExecTypeFromTL(aggref->args, false);
/* Create slot we're going to do argument evaluation in */
peraggstate->evalslot = ExecInitExtraTupleSlot(estate);
ExecSetSlotDescriptor(peraggstate->evalslot, peraggstate->evaldesc);
/* Set up projection info for evaluation */
peraggstate->evalproj =
ExecBuildProjectionInfo(aggrefstate->args, aggstate->tmpcontext, peraggstate->evalslot, NULL);
/*
* If we're doing either DISTINCT or ORDER BY, then we have a list of
* SortGroupClause nodes; fish out the data in them and stick them
@ -2534,6 +2541,13 @@ AggState* ExecInitAgg(Agg* node, EState* estate, int eflags)
peraggstate->numDistinctCols = numDistinctCols;
if (numSortCols > 0) {
/*
* Get a tupledesc and slot corresponding to the aggregated inputs
* (including sort expressions) of the agg.
*/
peraggstate->sortdesc = ExecTypeFromTL(aggref->args, false);
peraggstate->sortslot = ExecInitExtraTupleSlot(estate);
ExecSetSlotDescriptor(peraggstate->sortslot, peraggstate->sortdesc);
/*
* We don't implement DISTINCT or ORDER BY aggs in the HASHED case
* (yet)
@ -2546,7 +2560,7 @@ AggState* ExecInitAgg(Agg* node, EState* estate, int eflags)
} else if (numDistinctCols > 0) {
/* we will need an extra slot to store prior values */
peraggstate->uniqslot = ExecInitExtraTupleSlot(estate);
ExecSetSlotDescriptor(peraggstate->uniqslot, peraggstate->evaldesc);
ExecSetSlotDescriptor(peraggstate->uniqslot, peraggstate->sortdesc);
}
/* Extract the sort information for use later */
@ -2596,6 +2610,47 @@ AggState* ExecInitAgg(Agg* node, EState* estate, int eflags)
/* Update numaggs to match number of unique aggregates found */
aggstate->numaggs = aggno + 1;
/*
* Build a single projection computing the aggregate arguments for all
* aggregates at once; that's considerably faster than doing it separately
* for each.
*
* First create a targetlist combining the targetlist of all the
* transitions.
*/
combined_inputeval = NIL;
column_offset = 0;
for (int transno = 0; transno < aggstate->numaggs; transno++) {
AggStatePerAggData* pertrans = &peragg[transno];
ListCell *arg;
pertrans->inputoff = column_offset;
/*
* Adjust resno in the copied target entries so that they point into the
* combined slot.
*/
foreach(arg, pertrans->aggref->args)
{
TargetEntry *source_tle = (TargetEntry *) lfirst(arg);
TargetEntry *tle;
Assert(IsA(source_tle, TargetEntry));
tle = flatCopyTargetEntry(source_tle);
tle->resno += column_offset;
combined_inputeval = lappend(combined_inputeval, tle);
}
column_offset += list_length(pertrans->aggref->args);
}
/* and then create a projection for that targetlist */
aggstate->evaldesc = ExecTypeFromTL(combined_inputeval, false);
aggstate->evalslot = ExecInitExtraTupleSlot(estate);
combined_inputeval = (List *) ExecInitExpr((Expr *)combined_inputeval, (PlanState *)aggstate);
aggstate->evalproj = ExecBuildProjectionInfo(combined_inputeval, aggstate->tmpcontext, aggstate->evalslot, NULL);
ExecSetSlotDescriptor(aggstate->evalslot, aggstate->evaldesc);
AggWriteFileControl* TempFilePara = (AggWriteFileControl*)palloc(sizeof(AggWriteFileControl));
TempFilePara->strategy = MEMORY_HASHAGG;

View File

@ -79,6 +79,9 @@ typedef struct AggStatePerAggData {
/* number of inputs including ORDER BY expressions */
int numInputs;
/* offset of input columns in AggState->evalslot */
int inputoff;
bool is_avg;
@ -151,19 +154,19 @@ typedef struct AggStatePerAggData {
bool inputtypeByVal, resulttypeByVal, transtypeByVal;
/*
* Stuff for evaluation of inputs. We used to just use ExecEvalExpr, but
* with the addition of ORDER BY we now need at least a slot for passing
* data to the sort object, which requires a tupledesc, so we might as
* well go whole hog and use ExecProject too.
* Stuff for evaluation of aggregate inputs in cases where the aggregate
* requires sorted input. The arguments themselves will be evaluated via
* AggState->evalslot/evalproj for all aggregates at once, but we only
* want to sort the relevant columns for individual aggregates.
*/
TupleDesc evaldesc; /* descriptor of input tuples */
ProjectionInfo* evalproj; /* projection machinery */
TupleDesc sortdesc; /* descriptor of input tuples */
/*
* Slots for holding the evaluated input arguments. These are set up
* during ExecInitAgg() and then used for each input row.
* during ExecInitAgg() and then used for each input row requiring
* processing besides what's done in AggState->evalproj.
*/
TupleTableSlot* evalslot; /* current input tuple */
TupleTableSlot *sortslot; /* current input tuple */
TupleTableSlot* uniqslot; /* used for multi-column DISTINCT */
/*
@ -188,6 +191,10 @@ typedef struct AggStatePerAggData {
* worth the extra space consumption. cached for transfn and collectfn now.
*/
FunctionCallInfoData transfn_fcinfo;
/* XXX: used by the vector engine for now; better to remove later */
TupleDesc evaldesc; /* descriptor of input tuples */
ProjectionInfo *evalproj; /* projection machinery */
TupleTableSlot *evalslot; /* current input tuple */
} AggStatePerAggData;
/*

View File

@ -2334,6 +2334,10 @@ typedef struct AggState {
#endif /* PGXC */
void* aggTempFileControl;
FmgrInfo* eqfunctions; /* per-grouping-field equality fns */
/* support for evaluation of agg inputs */
TupleTableSlot *evalslot; /* slot for agg inputs */
ProjectionInfo *evalproj; /* projection machinery */
TupleDesc evaldesc; /* descriptor of input tuples */
} AggState;
/* ----------------