本节继续介绍聚合函数的实现,主要介绍了agg_retrieve_hash_table函数中与投影相关的实现逻辑,包括函数prepare_projection_slot/finalize_aggregates/project_aggregates.

一、数据结构

AggState
聚合函数执行时状态结构体,内含AggStatePerAgg等结构体

/* ---------------------
 *	AggState information
 *
 *	ss.ss_ScanTupleSlot refers to output of underlying plan.
 *	(ss = ScanState, ps = PlanState)
 *
 *	Note: ss.ps.ps_ExprContext contains ecxt_aggvalues and
 *	ecxt_aggnulls arrays, which hold the computed agg values for the current
 *	input group during evaluation of an Agg node's output tuple(s).  We
 *	create a second ExprContext, tmpcontext, in which to evaluate input
 *	expressions and run the aggregate transition functions.
 *
 *	NOTE(review): each FIELDNO_AGGSTATE_* macro below equals the zero-based
 *	position of the field that follows it (ss = 0, aggs = 1, ...).  Keep them
 *	in sync if fields are added, removed or reordered; presumably they are
 *	used by code that addresses fields by index - confirm before editing.
 * ---------------------
 */
/* these structs are private in nodeAgg.c: */
typedef struct AggStatePerAggData *AggStatePerAgg;
typedef struct AggStatePerTransData *AggStatePerTrans;
typedef struct AggStatePerGroupData *AggStatePerGroup;
typedef struct AggStatePerPhaseData *AggStatePerPhase;
typedef struct AggStatePerHashData *AggStatePerHash;

typedef struct AggState
{
	ScanState	ss;				/* its first field is NodeTag (AggState
								 * extends ScanState) */
	List	   *aggs;			/* all Aggref nodes in targetlist & quals */
	int			numaggs;		/* length of list (could be zero!) */
	int			numtrans;		/* number of pertrans items */
	AggStrategy aggstrategy;	/* strategy mode */
	AggSplit	aggsplit;		/* agg-splitting mode, see nodes.h */
	AggStatePerPhase phase;		/* pointer to current phase data */
	int			numphases;		/* number of phases (including phase 0) */
	int			current_phase;	/* current phase number */
	AggStatePerAgg peragg;		/* per-Aggref information */
	AggStatePerTrans pertrans;	/* per-Trans state information */
	ExprContext *hashcontext;	/* econtexts for long-lived data (hashtable) */
	ExprContext **aggcontexts;	/* econtexts for long-lived data (per
								 * grouping set) */
	ExprContext *tmpcontext;	/* econtext for input expressions */
#define FIELDNO_AGGSTATE_CURAGGCONTEXT 14
	ExprContext *curaggcontext; /* currently active aggcontext */
	AggStatePerAgg curperagg;	/* currently active aggregate, if any */
#define FIELDNO_AGGSTATE_CURPERTRANS 16
	AggStatePerTrans curpertrans;	/* currently active trans state, if any */
	bool		input_done;		/* indicates end of input */
	bool		agg_done;		/* indicates completion of Agg scan */
	int			projected_set;	/* The last projected grouping set */
#define FIELDNO_AGGSTATE_CURRENT_SET 20
	int			current_set;	/* The current grouping set being evaluated */
	Bitmapset  *grouped_cols;	/* grouped cols in current projection */
	List	   *all_grouped_cols;	/* list of all grouped cols in DESC order */
	/* These fields are for grouping set phase data */
	int			maxsets;		/* The max number of sets in any phase */
	AggStatePerPhase phases;	/* array of all phases */
	Tuplesortstate *sort_in;	/* sorted input to phases > 1 */
	Tuplesortstate *sort_out;	/* input is copied here for next phase */
	TupleTableSlot *sort_slot;	/* slot for sort results */
	/* these fields are used in AGG_PLAIN and AGG_SORTED modes: */
	AggStatePerGroup *pergroups;	/* grouping set indexed array of per-group
									 * pointers */
	HeapTuple	grp_firstTuple; /* copy of first tuple of current group */
	/* these fields are used in AGG_HASHED and AGG_MIXED modes: */
	bool		table_filled;	/* hash table filled yet? */
	int			num_hashes;		/* number of hash tables, presumably the
								 * length of the perhash array - confirm
								 * (not a bucket count) */
	AggStatePerHash perhash;	/* array of per-hashtable data */
	AggStatePerGroup *hash_pergroup;	/* grouping set indexed array of
										 * per-group pointers */
	/* support for evaluation of agg input expressions: */
#define FIELDNO_AGGSTATE_ALL_PERGROUPS 34
	AggStatePerGroup *all_pergroups;	/* array of first ->pergroups, then
										 * ->hash_pergroup */
	ProjectionInfo *combinedproj;	/* projection machinery */
} AggState;

/* Primitive options supported by nodeAgg.c (bit flags, combined below): */
#define AGGSPLITOP_COMBINE		0x01	/* substitute combinefn for transfn */
#define AGGSPLITOP_SKIPFINAL	0x02	/* skip finalfn, return state as-is */
#define AGGSPLITOP_SERIALIZE	0x04	/* apply serializefn to output */
#define AGGSPLITOP_DESERIALIZE	0x08	/* apply deserializefn to input */

/* Supported operating modes (i.e., useful combinations of these options): */
typedef enum AggSplit
{
	/* Basic, non-split aggregation: */
	AGGSPLIT_SIMPLE = 0,
	/* Initial phase of partial aggregation, with serialization: */
	AGGSPLIT_INITIAL_SERIAL = AGGSPLITOP_SKIPFINAL | AGGSPLITOP_SERIALIZE,
	/* Final phase of partial aggregation, with deserialization: */
	AGGSPLIT_FINAL_DESERIAL = AGGSPLITOP_COMBINE | AGGSPLITOP_DESERIALIZE
} AggSplit;

/* Test whether an AggSplit value selects each primitive option: */
#define DO_AGGSPLIT_COMBINE(as)		(((as) & AGGSPLITOP_COMBINE) != 0)
#define DO_AGGSPLIT_SKIPFINAL(as)	(((as) & AGGSPLITOP_SKIPFINAL) != 0)
#define DO_AGGSPLIT_SERIALIZE(as)	(((as) & AGGSPLITOP_SERIALIZE) != 0)
#define DO_AGGSPLIT_DESERIALIZE(as) (((as) & AGGSPLITOP_DESERIALIZE) != 0)

二、源码解读

prepare_projection_slot
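prepare_projection_slot函数基于指定的代表性元组(representative tuple)slot和grouping set,为后续的finalize和project做准备.
具体包括(仅当当前phase存在grouped_cols时):记录当前grouping set的分组列位图(供GroupingExpr使用);若slot为空,则存储一个全NULL元组;否则将不属于当前grouping set的分组列在isnull数组中置为true(即读作NULL).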

/*
 * prepare_projection_slot
 *
 * Get ready to finalize and project one group, given the representative
 * input tuple held in "slot" and the grouping set number "currentSet".
 *
 * Only when the current phase carries per-set grouped-column bitmaps:
 *   - publish the bitmap for currentSet in aggstate->grouped_cols, where
 *     GroupingExpr evaluation can get at it;
 *   - if the slot is empty (an empty grouping set for which no input rows
 *     were supplied), store an all-NULL tuple in it;
 *   - otherwise force to NULL every column of all_grouped_cols that does not
 *     belong to currentSet.
 *
 * Overwriting tts_isnull in place is acceptable only because:
 *   1) nothing ever extracts the whole tuple from this slot; evaluations
 *      only access individual attributes;
 *   2) no system column ever needs nulling (a system column referenced in a
 *      GROUP clause is actually projected in the outer plan's tlist);
 *   3) within one phase, an attribute that has been nulled never needs its
 *      value recovered.
 * Poking into the slot this way is a bit ugly, but the alternative was worse.
 */
static void
prepare_projection_slot(AggState *aggstate, TupleTableSlot *slot, int currentSet)
{
	Bitmapset  *set_cols;
	ListCell   *cell;

	/* Phase has no grouped-column bitmaps: nothing to prepare. */
	if (!aggstate->phase->grouped_cols)
		return;

	set_cols = aggstate->phase->grouped_cols[currentSet];
	aggstate->grouped_cols = set_cols;

	if (slot->tts_isempty)
	{
		/* Empty input tuple: every value reads as NULL. */
		ExecStoreAllNullTuple(slot);
		return;
	}

	if (!aggstate->all_grouped_cols)
		return;

	/*
	 * all_grouped_cols is sorted in descending order, so its head is the
	 * highest grouped attribute number; fetch attributes up to that one.
	 */
	slot_getsomeattrs(slot, linitial_int(aggstate->all_grouped_cols));

	foreach(cell, aggstate->all_grouped_cols)
	{
		int			attno = lfirst_int(cell);

		/* Grouped elsewhere but not in this set: read it as NULL. */
		if (!bms_is_member(attno, set_cols))
			slot->tts_isnull[attno - 1] = true;
	}
}

finalize_aggregates
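finalize_aggregates函数计算某一组所有聚合的最终值:先对带DISTINCT/ORDER BY的聚合排序其输入并执行转换函数,然后对每个聚合调用finalize_aggregate(若aggsplit要求跳过最终函数,即部分聚合,则调用finalize_partialaggregate).finalize_aggregate的实现下节再行介绍.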

/*
 * Compute the final value of all aggregates for one group.
 *
 * This function handles only one grouping set at a time, which the caller
 * must have selected.  It's also the caller's responsibility to adjust the
 * supplied pergroup parameter to point to the current set's transvalues.
 *
 * The work is done in two passes:
 *	1. For each transition state that has sort columns (DISTINCT and/or
 *	   ORDER BY aggregates), sort its input and run the transition function,
 *	   via process_ordered_aggregate_single() when there is exactly one input
 *	   column, otherwise process_ordered_aggregate_multi().
 *	2. For each aggregate, compute its output value: finalize_partialaggregate()
 *	   when the split mode skips the final function (partial aggregation),
 *	   otherwise finalize_aggregate().
 *
 * peraggs:  per-aggregate info array with aggstate->numaggs entries.
 * pergroup: transition states of the current grouping set, indexed by
 *           transition number.
 *
 * Results are stored in the output econtext aggvalues/aggnulls, indexed by
 * aggregate number.
 */
static void
finalize_aggregates(AggState *aggstate,
					AggStatePerAgg peraggs,
					AggStatePerGroup pergroup)
{
	ExprContext *econtext = aggstate->ss.ps.ps_ExprContext;
	Datum	   *aggvalues = econtext->ecxt_aggvalues;
	bool	   *aggnulls = econtext->ecxt_aggnulls;
	int			aggno;
	int			transno;

	/*
	 * If there were any DISTINCT and/or ORDER BY aggregates, sort their
	 * inputs and run the transition functions.
	 */
	for (transno = 0; transno < aggstate->numtrans; transno++)
	{
		AggStatePerTrans pertrans = &aggstate->pertrans[transno];
		AggStatePerGroup pergroupstate = &pergroup[transno];

		if (pertrans->numSortCols > 0)
		{
			/* hashed strategies are never expected to have sort columns */
			Assert(aggstate->aggstrategy != AGG_HASHED &&
				   aggstate->aggstrategy != AGG_MIXED);

			if (pertrans->numInputs == 1)
				process_ordered_aggregate_single(aggstate,
												 pertrans,
												 pergroupstate);
			else
				process_ordered_aggregate_multi(aggstate,
												pertrans,
												pergroupstate);
		}
	}

	/*
	 * Run the final functions.
	 */
	for (aggno = 0; aggno < aggstate->numaggs; aggno++)
	{
		AggStatePerAgg peragg = &peraggs[aggno];

		/*
		 * Reach this aggregate's transition state through peragg->transno
		 * directly; a block-local "transno" here would shadow the counter
		 * declared at function scope.
		 */
		AggStatePerGroup pergroupstate = &pergroup[peragg->transno];

		if (DO_AGGSPLIT_SKIPFINAL(aggstate->aggsplit))
			finalize_partialaggregate(aggstate, peragg, pergroupstate,
									  &aggvalues[aggno], &aggnulls[aggno]);
		else
			finalize_aggregate(aggstate, peragg, pergroupstate,
							   &aggvalues[aggno], &aggnulls[aggno]);
	}
}

project_aggregates
project_aggregates函数投影某一组的结果(该组结果已通过finalize_aggregates函数计算得到).

/*
 * InstrCountFiltered1
 *		Add "delta" to the node's instrument->nfiltered1 counter, but only if
 *		the node has instrumentation attached (instrument != NULL).
 *
 * A function-like macro is expanded only at call sites that follow its
 * definition, so it must precede project_aggregates(), which uses it.
 */
#define InstrCountFiltered1(node, delta) \
	do { \
		if (((PlanState *)(node))->instrument) \
			((PlanState *)(node))->instrument->nfiltered1 += (delta); \
	} while(0)

/*
 * Project the result of a group (whose aggs have already been calculated by
 * finalize_aggregates).  Returns the result slot, or NULL if no row is
 * projected (suppressed by qual).
 *
 * The qual evaluated here is the HAVING clause.  A group rejected by it is
 * counted in the nfiltered1 instrumentation counter (when instrumentation is
 * enabled) and produces no output row.
 */
static TupleTableSlot *
project_aggregates(AggState *aggstate)
{
	ExprContext *econtext = aggstate->ss.ps.ps_ExprContext;

	/*
	 * Check the qual (HAVING clause); if the group does not match, count it
	 * as filtered and ignore it.
	 */
	if (!ExecQual(aggstate->ss.ps.qual, econtext))
	{
		InstrCountFiltered1(aggstate, 1);
		return NULL;
	}

	/*
	 * Form and return projection tuple using the aggregate results and the
	 * representative input tuple.
	 */
	return ExecProject(aggstate->ss.ps.ps_ProjInfo);
}

ExecProject
ExecProject函数基于投影信息投影元组并把元组存储在传递给ExecBuildProjectInfo()的slot参数中.

/*
 * ExecProject
 *
 * Projects a tuple based on projection info and stores it in the slot passed
 * to ExecBuildProjectInfo().
 *
 * Note: the result is always a virtual tuple; therefore it may reference
 * the contents of the exprContext's scan tuples and/or temporary results
 * constructed in the exprContext.  If the caller wishes the result to be
 * valid longer than that data will be valid, he must call ExecMaterializeSlot
 * on the result slot.
 *
 * The statement order below matters: the slot is cleared first so its
 * Datum/isnull arrays may serve as workspace for the projection expression,
 * and only after evaluation is the slot marked as holding a tuple.
 *
 * Only compiled when FRONTEND is not defined.
 */
#ifndef FRONTEND
static inline TupleTableSlot *
ExecProject(ProjectionInfo *projInfo)
{
	ExprContext *econtext = projInfo->pi_exprContext;
	ExprState  *state = &projInfo->pi_state;
	/* the destination slot, i.e. the one given to ExecBuildProjectInfo() */
	TupleTableSlot *slot = state->resultslot;
	/* receives the null flag of the discarded scalar result; unused */
	bool		isnull;

	/*
	 * Clear any former contents of the result slot.  This makes it safe for
	 * us to use the slot's Datum/isnull arrays as workspace.
	 */
	ExecClearTuple(slot);

	/* Run the expression, discarding scalar result from the last column. */
	(void) ExecEvalExprSwitchContext(state, econtext, &isnull);

	/*
	 * Successfully formed a result row.  Mark the result slot as containing a
	 * valid virtual tuple (inlined version of ExecStoreVirtualTuple()):
	 * non-empty, with all natts attributes of its descriptor valid.
	 */
	slot->tts_isempty = false;
	slot->tts_nvalid = slot->tts_tupleDescriptor->natts;

	return slot;
}
#endif

三、跟踪分析

N/A

四、参考资料

PostgreSQL 源码解读(178)- 查询#95(聚合函数)#1相关数据结构