forked from openGauss-Ecosystem/openGauss-server
9cca11c915e458323d0e746c68203f2c11da0302 提升sum、avg的速度
This commit is contained in:
parent
529f5e505e
commit
72c07837ab
|
@ -56,6 +56,48 @@ typedef struct {
|
|||
NumericVar step;
|
||||
} generate_series_numeric_fctx;
|
||||
|
||||
/* ----------
|
||||
* Fast sum accumulator.
|
||||
*
|
||||
* NumericSumAccum is used to implement SUM(), and other standard aggregates
|
||||
* that track the sum of input values. It uses 32-bit integers to store the
|
||||
* digits, instead of the normal 16-bit integers (with NBASE=10000). This
|
||||
* way, we can safely accumulate up to NBASE - 1 values without propagating
|
||||
* carry, before risking overflow of any of the digits. 'num_uncarried'
|
||||
* tracks how many values have been accumulated without propagating carry.
|
||||
*
|
||||
* Positive and negative values are accumulated separately, in 'pos_digits'
|
||||
* and 'neg_digits'. This is simpler and faster than deciding whether to add
|
||||
* or subtract from the current value, for each new value (see sub_var() for
|
||||
* the logic we avoid by doing this). Both buffers are of same size, and
|
||||
* have the same weight and scale. In accum_sum_final(), the positive and
|
||||
* negative sums are added together to produce the final result.
|
||||
*
|
||||
* When a new value has a larger ndigits or weight than the accumulator
|
||||
* currently does, the accumulator is enlarged to accommodate the new value.
|
||||
* We normally have one zero digit reserved for carry propagation, and that
|
||||
* is indicated by the 'have_carry_space' flag. When accum_sum_carry() uses
|
||||
* up the reserved digit, it clears the 'have_carry_space' flag. The next
|
||||
* call to accum_sum_add() will enlarge the buffer, to make room for the
|
||||
* extra digit, and set the flag again.
|
||||
*
|
||||
* To initialize a new accumulator, simply reset all fields to zeros.
|
||||
*
|
||||
* The accumulator does not handle NaNs.
|
||||
* ----------
|
||||
*/
|
||||
typedef struct NumericSumAccum
|
||||
{
|
||||
int ndigits;
|
||||
int weight;
|
||||
int dscale;
|
||||
int num_uncarried;
|
||||
bool have_carry_space;
|
||||
int32 *pos_digits;
|
||||
int32 *neg_digits;
|
||||
} NumericSumAccum;
|
||||
|
||||
|
||||
/* ----------
|
||||
* Sort support.
|
||||
* ----------
|
||||
|
@ -74,8 +116,8 @@ typedef struct NumericAggState
|
|||
bool isNaN; /* true if any processed number was NaN */
|
||||
MemoryContext agg_context; /* context we're calculating in */
|
||||
int64 N; /* count of processed numbers */
|
||||
NumericVar sumX; /* sum of processed numbers */
|
||||
NumericVar sumX2; /* sum of squares of processed numbers */
|
||||
NumericSumAccum sumX; /* sum of processed numbers */
|
||||
NumericSumAccum sumX2; /* sum of squares of processed numbers */
|
||||
} NumericAggState;
|
||||
|
||||
#define NUMERIC_ABBREV_BITS (SIZEOF_DATUM * BITS_PER_BYTE)
|
||||
|
@ -218,6 +260,14 @@ static void strip_var(NumericVar* var);
|
|||
static void compute_bucket(
|
||||
Numeric operand, Numeric bound1, Numeric bound2, NumericVar* count_var, NumericVar* result_var);
|
||||
|
||||
static void accum_sum_add(NumericSumAccum *accum, NumericVar *var1);
|
||||
static void accum_sum_rescale(NumericSumAccum *accum, NumericVar *val);
|
||||
static void accum_sum_carry(NumericSumAccum *accum);
|
||||
static void accum_sum_reset(NumericSumAccum *accum);
|
||||
static void accum_sum_final(NumericSumAccum *accum, NumericVar *result);
|
||||
static void accum_sum_copy(NumericSumAccum *dst, NumericSumAccum *src);
|
||||
static void accum_sum_combine(NumericSumAccum *accum, NumericSumAccum *accum2);
|
||||
|
||||
/*
|
||||
* @Description: call corresponding big integer operator functions.
|
||||
*
|
||||
|
@ -3300,23 +3350,13 @@ do_numeric_accum(NumericAggState *state, Numeric newval)
|
|||
/* The rest of this needs to work in the aggregate context */
|
||||
old_context = MemoryContextSwitchTo(state->agg_context);
|
||||
|
||||
if (state->N++ > 0)
|
||||
{
|
||||
/* Accumulate sums */
|
||||
add_var(&X, &(state->sumX), &(state->sumX));
|
||||
|
||||
if (state->calcSumX2)
|
||||
add_var(&X2, &(state->sumX2), &(state->sumX2));
|
||||
}
|
||||
else
|
||||
{
|
||||
/* First input, so initialize sums */
|
||||
set_var_from_var(&X, &(state->sumX));
|
||||
|
||||
if (state->calcSumX2)
|
||||
set_var_from_var(&X2, &(state->sumX2));
|
||||
}
|
||||
|
||||
state->N++;
|
||||
|
||||
/* Accumulate sums */
|
||||
accum_sum_add(&(state->sumX), &X);
|
||||
|
||||
if (state->calcSumX2)
|
||||
accum_sum_add(&(state->sumX2), &X2);
|
||||
MemoryContextSwitchTo(old_context);
|
||||
}
|
||||
|
||||
|
@ -3464,6 +3504,7 @@ Datum numeric_avg(PG_FUNCTION_ARGS)
|
|||
NumericAggState *state;
|
||||
Datum N_datum;
|
||||
Datum sumX_datum;
|
||||
NumericVar sumX_var;
|
||||
state = PG_ARGISNULL(0) ? NULL : (NumericAggState *) PG_GETARG_POINTER(0);
|
||||
if (state == NULL) /* there were no non-null inputs */
|
||||
PG_RETURN_NULL();
|
||||
|
@ -3471,8 +3512,10 @@ Datum numeric_avg(PG_FUNCTION_ARGS)
|
|||
PG_RETURN_NUMERIC(make_result(&const_nan));
|
||||
|
||||
N_datum = DirectFunctionCall1(int8_numeric, Int64GetDatum(state->N));
|
||||
sumX_datum = NumericGetDatum(make_result(&state->sumX));
|
||||
|
||||
init_var(&sumX_var);
|
||||
accum_sum_final(&state->sumX, &sumX_var);
|
||||
sumX_datum = NumericGetDatum(make_result(&sumX_var));
|
||||
free_var(&sumX_var);
|
||||
PG_RETURN_DATUM(DirectFunctionCall2(numeric_div, sumX_datum, N_datum));
|
||||
}
|
||||
|
||||
|
@ -3480,15 +3523,22 @@ Datum
|
|||
numeric_sum(PG_FUNCTION_ARGS)
|
||||
{
|
||||
NumericAggState *state;
|
||||
|
||||
NumericVar sumX_var;
|
||||
Numeric result;
|
||||
|
||||
state = PG_ARGISNULL(0) ? NULL : (NumericAggState *) PG_GETARG_POINTER(0);
|
||||
if (state == NULL) /* there were no non-null inputs */
|
||||
PG_RETURN_NULL();
|
||||
|
||||
if (state->isNaN) /* there was at least one NaN input */
|
||||
PG_RETURN_NUMERIC(make_result(&const_nan));
|
||||
|
||||
PG_RETURN_NUMERIC(make_result(&(state->sumX)));
|
||||
|
||||
init_var(&sumX_var);
|
||||
accum_sum_final(&state->sumX, &sumX_var);
|
||||
result = make_result(&sumX_var);
|
||||
free_var(&sumX_var);
|
||||
|
||||
PG_RETURN_NUMERIC(result);
|
||||
}
|
||||
|
||||
/*
|
||||
|
@ -3592,8 +3642,8 @@ static Numeric numeric_stddev_internal(NumericAggState* state, bool variance, bo
|
|||
/*
|
||||
* Handle Big Integer
|
||||
*/
|
||||
set_var_from_var(&(state->sumX), &vsumX);
|
||||
set_var_from_var(&(state->sumX2), &vsumX2);
|
||||
accum_sum_final(&(state->sumX), &vsumX);
|
||||
accum_sum_final(&(state->sumX2), &vsumX2);
|
||||
|
||||
/* compute rscale for mul_var calls */
|
||||
rscale = vsumX.dscale * 2;
|
||||
|
@ -3645,8 +3695,13 @@ void stddev_create_state_4_vector(PG_FUNCTION_ARGS)
|
|||
{
|
||||
NumericAggState *state = makeNumericAggState(fcinfo, true);
|
||||
state->N = DatumGetInt64(DirectFunctionCall1(numeric_int8, PG_GETARG_DATUM(1)));
|
||||
init_var_from_num(DatumGetNumeric(PG_GETARG_DATUM(2)), &(state->sumX));
|
||||
init_var_from_num(DatumGetNumeric(PG_GETARG_DATUM(3)), &(state->sumX2));
|
||||
NumericVar* sumX = (NumericVar*)palloc0(sizeof(NumericVar));
|
||||
NumericVar* sumX2 = (NumericVar*)palloc0(sizeof(NumericVar));
|
||||
init_var_from_num(DatumGetNumeric(PG_GETARG_DATUM(2)), sumX);
|
||||
init_var_from_num(DatumGetNumeric(PG_GETARG_DATUM(3)), sumX2);
|
||||
accum_sum_add(&(state->sumX), sumX);
|
||||
accum_sum_add(&(state->sumX2), sumX2);
|
||||
|
||||
fcinfo->arg[0] = PointerGetDatum(state);
|
||||
return ;
|
||||
}
|
||||
|
@ -19358,3 +19413,321 @@ Datum bool_numeric(PG_FUNCTION_ARGS)
|
|||
|
||||
PG_RETURN_NUMERIC(res);
|
||||
}
|
||||
|
||||
/* ----------------------------------------------------------------------
|
||||
*
|
||||
* Fast sum accumulator functions
|
||||
*
|
||||
* ----------------------------------------------------------------------
|
||||
*/
|
||||
|
||||
/*
|
||||
* Reset the accumulator's value to zero. The buffers to hold the digits
|
||||
* are not free'd.
|
||||
*/
|
||||
static void
|
||||
accum_sum_reset(NumericSumAccum *accum)
|
||||
{
|
||||
int i;
|
||||
|
||||
accum->dscale = 0;
|
||||
for (i = 0; i < accum->ndigits; i++)
|
||||
{
|
||||
accum->pos_digits[i] = 0;
|
||||
accum->neg_digits[i] = 0;
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Accumulate a new value.
|
||||
*/
|
||||
static void
|
||||
accum_sum_add(NumericSumAccum *accum, NumericVar *val)
|
||||
{
|
||||
int32 *accum_digits;
|
||||
int i,
|
||||
val_i;
|
||||
int val_ndigits;
|
||||
NumericDigit *val_digits;
|
||||
|
||||
/*
|
||||
* If we have accumulated too many values since the last carry
|
||||
* propagation, do it now, to avoid overflowing. (We could allow more
|
||||
* than NBASE - 1, if we reserved two extra digits, rather than one, for
|
||||
* carry propagation. But even with NBASE - 1, this needs to be done so
|
||||
* seldom, that the performance difference is negligible.)
|
||||
*/
|
||||
if (accum->num_uncarried == NBASE - 1)
|
||||
accum_sum_carry(accum);
|
||||
|
||||
/*
|
||||
* Adjust the weight or scale of the old value, so that it can accommodate
|
||||
* the new value.
|
||||
*/
|
||||
accum_sum_rescale(accum, val);
|
||||
|
||||
/* */
|
||||
if (val->sign == NUMERIC_POS)
|
||||
accum_digits = accum->pos_digits;
|
||||
else
|
||||
accum_digits = accum->neg_digits;
|
||||
|
||||
/* copy these values into local vars for speed in loop */
|
||||
val_ndigits = val->ndigits;
|
||||
val_digits = val->digits;
|
||||
|
||||
i = accum->weight - val->weight;
|
||||
for (val_i = 0; val_i < val_ndigits; val_i++)
|
||||
{
|
||||
accum_digits[i] += (int32) val_digits[val_i];
|
||||
i++;
|
||||
}
|
||||
|
||||
accum->num_uncarried++;
|
||||
}
|
||||
|
||||
/*
|
||||
* Propagate carries.
|
||||
*/
|
||||
static void
|
||||
accum_sum_carry(NumericSumAccum *accum)
|
||||
{
|
||||
int i;
|
||||
int ndigits;
|
||||
int32 *dig;
|
||||
int32 carry;
|
||||
int32 newdig = 0;
|
||||
|
||||
/*
|
||||
* If no new values have been added since last carry propagation, nothing
|
||||
* to do.
|
||||
*/
|
||||
if (accum->num_uncarried == 0)
|
||||
return;
|
||||
|
||||
/*
|
||||
* We maintain that the weight of the accumulator is always one larger
|
||||
* than needed to hold the current value, before carrying, to make sure
|
||||
* there is enough space for the possible extra digit when carry is
|
||||
* propagated. We cannot expand the buffer here, unless we require
|
||||
* callers of accum_sum_final() to switch to the right memory context.
|
||||
*/
|
||||
Assert(accum->pos_digits[0] == 0 && accum->neg_digits[0] == 0);
|
||||
|
||||
ndigits = accum->ndigits;
|
||||
|
||||
/* Propagate carry in the positive sum */
|
||||
dig = accum->pos_digits;
|
||||
carry = 0;
|
||||
for (i = ndigits - 1; i >= 0; i--)
|
||||
{
|
||||
newdig = dig[i] + carry;
|
||||
if (newdig >= NBASE)
|
||||
{
|
||||
carry = newdig / NBASE;
|
||||
newdig -= carry * NBASE;
|
||||
}
|
||||
else
|
||||
carry = 0;
|
||||
dig[i] = newdig;
|
||||
}
|
||||
/* Did we use up the digit reserved for carry propagation? */
|
||||
if (newdig > 0)
|
||||
accum->have_carry_space = false;
|
||||
|
||||
/* And the same for the negative sum */
|
||||
dig = accum->neg_digits;
|
||||
carry = 0;
|
||||
for (i = ndigits - 1; i >= 0; i--)
|
||||
{
|
||||
newdig = dig[i] + carry;
|
||||
if (newdig >= NBASE)
|
||||
{
|
||||
carry = newdig / NBASE;
|
||||
newdig -= carry * NBASE;
|
||||
}
|
||||
else
|
||||
carry = 0;
|
||||
dig[i] = newdig;
|
||||
}
|
||||
if (newdig > 0)
|
||||
accum->have_carry_space = false;
|
||||
|
||||
accum->num_uncarried = 0;
|
||||
}
|
||||
|
||||
/*
|
||||
* Re-scale accumulator to accommodate new value.
|
||||
*
|
||||
* If the new value has more digits than the current digit buffers in the
|
||||
* accumulator, enlarge the buffers.
|
||||
*/
|
||||
static void
|
||||
accum_sum_rescale(NumericSumAccum *accum, NumericVar *val)
|
||||
{
|
||||
int old_weight = accum->weight;
|
||||
int old_ndigits = accum->ndigits;
|
||||
int accum_ndigits;
|
||||
int accum_weight;
|
||||
int accum_rscale;
|
||||
int val_rscale;
|
||||
|
||||
accum_weight = old_weight;
|
||||
accum_ndigits = old_ndigits;
|
||||
|
||||
/*
|
||||
* Does the new value have a larger weight? If so, enlarge the buffers,
|
||||
* and shift the existing value to the new weight, by adding leading
|
||||
* zeros.
|
||||
*
|
||||
* We enforce that the accumulator always has a weight one larger than
|
||||
* needed for the inputs, so that we have space for an extra digit at the
|
||||
* final carry-propagation phase, if necessary.
|
||||
*/
|
||||
if (val->weight >= accum_weight)
|
||||
{
|
||||
accum_weight = val->weight + 1;
|
||||
accum_ndigits = accum_ndigits + (accum_weight - old_weight);
|
||||
}
|
||||
|
||||
/*
|
||||
* Even though the new value is small, we might've used up the space
|
||||
* reserved for the carry digit in the last call to accum_sum_carry(). If
|
||||
* so, enlarge to make room for another one.
|
||||
*/
|
||||
else if (!accum->have_carry_space)
|
||||
{
|
||||
accum_weight++;
|
||||
accum_ndigits++;
|
||||
}
|
||||
|
||||
/* Is the new value wider on the right side? */
|
||||
accum_rscale = accum_ndigits - accum_weight - 1;
|
||||
val_rscale = val->ndigits - val->weight - 1;
|
||||
if (val_rscale > accum_rscale)
|
||||
accum_ndigits = accum_ndigits + (val_rscale - accum_rscale);
|
||||
|
||||
if (accum_ndigits != old_ndigits ||
|
||||
accum_weight != old_weight)
|
||||
{
|
||||
int32 *new_pos_digits;
|
||||
int32 *new_neg_digits;
|
||||
int weightdiff;
|
||||
|
||||
weightdiff = accum_weight - old_weight;
|
||||
|
||||
new_pos_digits = (int32*)palloc0(accum_ndigits * sizeof(int32));
|
||||
new_neg_digits = (int32*)palloc0(accum_ndigits * sizeof(int32));
|
||||
|
||||
if (accum->pos_digits)
|
||||
{
|
||||
memcpy(&new_pos_digits[weightdiff], accum->pos_digits,
|
||||
old_ndigits * sizeof(int32));
|
||||
pfree(accum->pos_digits);
|
||||
|
||||
memcpy(&new_neg_digits[weightdiff], accum->neg_digits,
|
||||
old_ndigits * sizeof(int32));
|
||||
pfree(accum->neg_digits);
|
||||
}
|
||||
|
||||
accum->pos_digits = new_pos_digits;
|
||||
accum->neg_digits = new_neg_digits;
|
||||
|
||||
accum->weight = accum_weight;
|
||||
accum->ndigits = accum_ndigits;
|
||||
|
||||
Assert(accum->pos_digits[0] == 0 && accum->neg_digits[0] == 0);
|
||||
accum->have_carry_space = true;
|
||||
}
|
||||
|
||||
if (val->dscale > accum->dscale)
|
||||
accum->dscale = val->dscale;
|
||||
}
|
||||
|
||||
/*
|
||||
* Return the current value of the accumulator. This perform final carry
|
||||
* propagation, and adds together the positive and negative sums.
|
||||
*
|
||||
* Unlike all the other routines, the caller is not required to switch to
|
||||
* the memory context that holds the accumulator.
|
||||
*/
|
||||
static void
|
||||
accum_sum_final(NumericSumAccum *accum, NumericVar *result)
|
||||
{
|
||||
int i;
|
||||
NumericVar pos_var;
|
||||
NumericVar neg_var;
|
||||
|
||||
if (accum->ndigits == 0)
|
||||
{
|
||||
set_var_from_var(&const_zero, result);
|
||||
return;
|
||||
}
|
||||
|
||||
/* Perform final carry */
|
||||
accum_sum_carry(accum);
|
||||
|
||||
/* Create NumericVars representing the positive and negative sums */
|
||||
init_var(&pos_var);
|
||||
init_var(&neg_var);
|
||||
|
||||
pos_var.ndigits = neg_var.ndigits = accum->ndigits;
|
||||
pos_var.weight = neg_var.weight = accum->weight;
|
||||
pos_var.dscale = neg_var.dscale = accum->dscale;
|
||||
pos_var.sign = NUMERIC_POS;
|
||||
neg_var.sign = NUMERIC_NEG;
|
||||
|
||||
pos_var.buf = pos_var.digits = digitbuf_alloc(accum->ndigits);
|
||||
neg_var.buf = neg_var.digits = digitbuf_alloc(accum->ndigits);
|
||||
|
||||
for (i = 0; i < accum->ndigits; i++)
|
||||
{
|
||||
Assert(accum->pos_digits[i] < NBASE);
|
||||
pos_var.digits[i] = (int16) accum->pos_digits[i];
|
||||
|
||||
Assert(accum->neg_digits[i] < NBASE);
|
||||
neg_var.digits[i] = (int16) accum->neg_digits[i];
|
||||
}
|
||||
|
||||
/* And add them together */
|
||||
add_var(&pos_var, &neg_var, result);
|
||||
|
||||
/* Remove leading/trailing zeroes */
|
||||
strip_var(result);
|
||||
}
|
||||
|
||||
/*
|
||||
* Copy an accumulator's state.
|
||||
*
|
||||
* 'dst' is assumed to be uninitialized beforehand. No attempt is made at
|
||||
* freeing old values.
|
||||
*/
|
||||
static void
|
||||
accum_sum_copy(NumericSumAccum *dst, NumericSumAccum *src)
|
||||
{
|
||||
dst->pos_digits = (int32*)palloc(src->ndigits * sizeof(int32));
|
||||
dst->neg_digits = (int32*)palloc(src->ndigits * sizeof(int32));
|
||||
|
||||
memcpy(dst->pos_digits, src->pos_digits, src->ndigits * sizeof(int32));
|
||||
memcpy(dst->neg_digits, src->neg_digits, src->ndigits * sizeof(int32));
|
||||
dst->num_uncarried = src->num_uncarried;
|
||||
dst->ndigits = src->ndigits;
|
||||
dst->weight = src->weight;
|
||||
dst->dscale = src->dscale;
|
||||
}
|
||||
|
||||
/*
|
||||
* Add the current value of 'accum2' into 'accum'.
|
||||
*/
|
||||
static void
|
||||
accum_sum_combine(NumericSumAccum *accum, NumericSumAccum *accum2)
|
||||
{
|
||||
NumericVar tmp_var;
|
||||
|
||||
init_var(&tmp_var);
|
||||
|
||||
accum_sum_final(accum2, &tmp_var);
|
||||
accum_sum_add(accum, &tmp_var);
|
||||
|
||||
free_var(&tmp_var);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue