本篇内容介绍了“怎么使用PostgreSQL ExecAgg函数”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!

一、数据结构

AggState
聚合函数执行时状态结构体,内含AggStatePerAgg等结构体

/*---------------------*AggStateinformation**ss.ss_ScanTupleSlotreferstooutputofunderlyingplan.*ss.ss_ScanTupleSlot指的是基础计划的输出.*(ss=ScanState,ps=PlanState)**Note:ss.ps.ps_ExprContextcontainsecxt_aggvaluesand*ecxt_aggnullsarrays,whichholdthecomputedaggvaluesforthecurrent*inputgroupduringevaluationofanAggnode'soutputtuple(s).We*createasecondExprContext,tmpcontext,inwhichtoevaluateinput*expressionsandruntheaggregatetransitionfunctions.*注意:ss.ps.ps_ExprContext包含了ecxt_aggvalues和ecxt_aggnulls数组,*这两个数组保存了在计算agg节点的输出元组时当前输入组已计算的agg值.*---------------------*//*thesestructsareprivateinnodeAgg.c:*///在nodeAgg.c中私有的结构体typedefstructAggStatePerAggData*AggStatePerAgg;typedefstructAggStatePerTransData*AggStatePerTrans;typedefstructAggStatePerGroupData*AggStatePerGroup;typedefstructAggStatePerPhaseData*AggStatePerPhase;typedefstructAggStatePerHashData*AggStatePerHash;typedefstructAggState{//第一个字段是NodeTag(继承自ScanState)ScanStatess;/*itsfirstfieldisNodeTag*///targetlist和quals中所有的AggrefList*aggs;/*allAggrefnodesintargetlist&quals*///链表的大小(可以为0)intnumaggs;/*lengthoflist(couldbezero!)*///pertrans条目大小intnumtrans;/*numberofpertransitems*///Agg策略模式AggStrategyaggstrategy;/*strategymode*///agg-splitting模式,参见nodes.hAggSplitaggsplit;/*agg-splittingmode,seenodes.h*///指向当前步骤数据的指针AggStatePerPhasephase;/*pointertocurrentphasedata*///步骤数(包括0)intnumphases;/*numberofphases(includingphase0)*///当前步骤intcurrent_phase;/*currentphasenumber*///per-Aggref信息AggStatePerAggperagg;/*per-Aggrefinformation*///per-Trans状态信息AggStatePerTranspertrans;/*per-Transstateinformation*///长生命周期数据的ExprContexts(hashtable)ExprContext*hashcontext;/*econtextsforlong-liveddata(hashtable)*/////长生命周期数据的ExprContexts(每一个GS使用)ExprContext**aggcontexts;/*econtextsforlong-liveddata(perGS)*///输入表达式的ExprContextExprContext*tmpcontext;/*econtextforinputexpressions*/#defineFIELDNO_AGGSTATE_CURAGGCONTEXT14//当前活跃的aggcontextExprContext*curaggcontext;/*currentlyactiveaggcontext*///当前活跃的aggregate(如存在)AggStatePerAggcurperagg;/*currentlyactiveaggregate,ifany*/#defineFIELDNO_AGGSTATE_CURPERTRANS16//当前活跃的transstateAggStatePerTranscurpertrans;/*currentlyactivetransstate,ifany*///输入结束?boolinput_done;/*indicatesendofinput*///Agg扫描结束?boolagg_done;/*indicatescompletionofAggscan*///最后一个groupingsetintprojected_set;/*Thelastprojectedgroupingset*/#defineFIELDNO_AGGSTATE_CURRENT_SET20//将要解析的当前groupingsetintcurrent_set;/*Thecurrentgroupingsetbeingevaluated*///当前投影操作的分组列Bitmapset*grouped_cols;/*groupedcolsincurrentprojection*///倒序的分组列链表List*all_grouped_cols;/*listofallgroupedcolsinDESCorder*//*Thesefieldsareforgroupingsetphasedata*///--------下面的列用于groupingset步骤数据//所有步骤中最大的sets大小intmaxsets;/*Themaxnumberofsetsinanyphase*///所有步骤的数组AggStatePerPhasephases;/*arrayofallphases*///对于phases>1,已排序的输入信息Tuplesortstate*sort_in;/*sortedinputtophases>1*///对于下一个步骤,输入已拷贝Tuplesortstate*sort_out;/*inputiscopiedherefornextphase*///排序结果的slotTupleTableSlot*sort_slot;/*slotforsortresults*//*thesefieldsareusedinAGG_PLAINandAGG_SORTEDmodes:*///-------下面的列用于AGG_PLAIN和AGG_SORTED模式://per-group指针的groupingset编号数组AggStatePerGroup*pergroups;/*groupingsetindexedarrayofper-group*pointers*///当前组的第一个元组拷贝HeapTuplegrp_firstTuple;/*copyoffirsttupleofcurrentgroup*//*thesefieldsareusedinAGG_HASHEDandAGG_MIXEDmodes:*///---------下面的列用于AGG_HASHED和AGG_MIXED模式://是否已填充hash表?booltable_filled;/*hashtablefilledyet?*///hash桶数?intnum_hashes;//相应的哈希表数据数组AggStatePerHashperhash;/*arrayofper-hashtabledata*///per-group指针的groupingset编号数组AggStatePerGroup*hash_pergroup;/*groupingsetindexedarrayof*per-grouppointers*//*supportforevaluationofagginputexpressions:*///----------agg输入表达式解析支持#defineFIELDNO_AGGSTATE_ALL_PERGROUPS34//首先是->pergroups,然后是hash_pergroupAggStatePerGroup*all_pergroups;/*arrayoffirst->pergroups,than*->hash_pergroup*///投影实现机制ProjectionInfo*combinedproj;/*projectionmachinery*/}AggState;/*PrimitiveoptionssupportedbynodeAgg.c:*///nodeag.c支持的基本选项#defineAGGSPLITOP_COMBINE0x01/*substitutecombinefnfortransfn*/#defineAGGSPLITOP_SKIPFINAL0x02/*skipfinalfn,returnstateas-is*/#defineAGGSPLITOP_SERIALIZE0x04/*applyserializefntooutput*/#defineAGGSPLITOP_DESERIALIZE0x08/*applydeserializefntoinput*//*Supportedoperatingmodes(i.e.,usefulcombinationsoftheseoptions):*///支持的操作模式typedefenumAggSplit{/*Basic,non-splitaggregation:*///基本:非split聚合AGGSPLIT_SIMPLE=0,/*Initialphaseofpartialaggregation,withserialization:*///部分聚合的初始步骤,序列化AGGSPLIT_INITIAL_SERIAL=AGGSPLITOP_SKIPFINAL|AGGSPLITOP_SERIALIZE,/*Finalphaseofpartialaggregation,withdeserialization:*///部分聚合的最终步骤,反序列化AGGSPLIT_FINAL_DESERIAL=AGGSPLITOP_COMBINE|AGGSPLITOP_DESERIALIZE}AggSplit;/*TestwhetheranAggSplitvalueselectseachprimitiveoption:*///测试AggSplit选择了哪些基本选项#defineDO_AGGSPLIT_COMBINE(as)(((as)&AGGSPLITOP_COMBINE)!=0)#defineDO_AGGSPLIT_SKIPFINAL(as)(((as)&AGGSPLITOP_SKIPFINAL)!=0)#defineDO_AGGSPLIT_SERIALIZE(as)(((as)&AGGSPLITOP_SERIALIZE)!=0)#defineDO_AGGSPLIT_DESERIALIZE(as)(((as)&AGGSPLITOP_DESERIALIZE)!=0)二、源码解读

ExecAgg函数,首先获取AggState运行状态,然后根据各个阶段(aggstate->phase)的策略(aggstrategy)执行相应的逻辑.如使用Hash聚合,则只有一个节点,但有两个策略,首先是AGG_HASHED,该策略对输入元组按照分组列值进行Hash,同时执行转换函数计算中间结果值,缓存到哈希表中;然后执行AGG_MIXED策略,从Hash表中获取结果元组并返回结果元组(每一result为一个结果行).

/**ExecAgg-**ExecAggreceivestuplesfromitsoutersubplanandaggregatesover*theappropriateattributeforeachaggregatefunctionuse(Aggref*node)appearinginthetargetlistorqualofthenode.Thenumber*oftuplestoaggregateoverdependsonwhethergroupedorplain*aggregationisselected.Ingroupedaggregation,weproducearesult*rowforeachgroup;inplainaggregationthere'sasingleresultrow*forthewholequery.Ineithercase,thevalueofeachaggregateis*storedintheexpressioncontexttobeusedwhenExecProjectevaluates*theresulttuple.*ExecAgg接收从outer子计划返回的元组合适的属性上为每一个聚合函数(出现在投影列或节点表达式)执行聚合.*需要聚合的元组数量依赖于是否已分组或者选择普通聚合.*在已分组的聚合操作宏,为每一个组产生结果行;普通聚合,整个查询只有一个结果行.*不管哪种情况,每一个聚合结果值都会存储在表达式上下文中(ExecProject会解析结果元组)*/staticTupleTableSlot*ExecAgg(PlanState*pstate){AggState*node=castNode(AggState,pstate);TupleTableSlot*result=NULL;CHECK_FOR_INTERRUPTS();if(!node->agg_done){/*Dispatchbasedonstrategy*///基于策略进行分发switch(node->phase->aggstrategy){caseAGG_HASHED:if(!node->table_filled)agg_fill_hash_table(node);/*FALLTHROUGH*///填充后,执行MIXEDcaseAGG_MIXED:result=agg_retrieve_hash_table(node);break;caseAGG_PLAIN:caseAGG_SORTED:result=agg_retrieve_direct(node);break;}if(!TupIsNull(result))returnresult;}returnNULL;}

agg_fill_hash_table
读取输入并构建哈希表.
lookup_hash_entries函数根据输入元组构建分组列哈希表(搜索或新建条目),advance_aggregates调用转换函数计算中间结果并缓存.

/**ExecAggforhashedcase:readinputandbuildhashtable*读取输入并构建哈希表*/staticvoidagg_fill_hash_table(AggState*aggstate){TupleTableSlot*outerslot;ExprContext*tmpcontext=aggstate->tmpcontext;/**Processeachouter-plantuple,andthenfetchthenextone,untilwe*exhausttheouterplan.*处理每一个outer-plan返回的元组,然后继续提取下一个,直至完成所有元组的处理.*/for(;;){//---------循环直至完成所有元组的处理//提取输入的元组outerslot=fetch_input_tuple(aggstate);if(TupIsNull(outerslot))break;//已完成处理,退出循环/*setupforlookup_hash_entriesandadvance_aggregates*///配置lookup_hash_entries和advance_aggregates函数//把元组放在临时内存上下文中tmpcontext->ecxt_outertuple=outerslot;/*Findorbuildhashtableentries*///检索或构建哈希表条目lookup_hash_entries(aggstate);/*Advancetheaggregates(orcombinefunctions)*///推动聚合(或组合函数)advance_aggregates(aggstate);/**Resetper-input-tuplecontextaftereachtuple,butnotethatthe*hashlookupsdothistoo*重置per-input-tuple内存上下文,但需要注意hash检索也会做这个事情*/ResetExprContext(aggstate->tmpcontext);}aggstate->table_filled=true;/*Initializetowalkthefirsthashtable*///初始化用于遍历第一个哈希表select_current_set(aggstate,0,true);ResetTupleHashIterator(aggstate->perhash[0].hashtable,&aggstate->perhash[0].hashiter);}

agg_retrieve_hash_table
agg_retrieve_hash_table函数在hash表中检索结果,执行投影等相关操作.

/**ExecAggforhashedcase:retrievinggroupsfromhashtable*ExecAgg(Hash实现版本):在hash表中检索组*/staticTupleTableSlot*agg_retrieve_hash_table(AggState*aggstate){ExprContext*econtext;AggStatePerAggperagg;AggStatePerGrouppergroup;TupleHashEntryData*entry;TupleTableSlot*firstSlot;TupleTableSlot*result;AggStatePerHashperhash;/**getstateinfofromnode.*从node节点中获取状态信息.**econtextistheper-output-tupleexpressioncontext.*econtext是per-output-tuple表达式上下文.*/econtext=aggstate->ss.ps.ps_ExprContext;peragg=aggstate->peragg;firstSlot=aggstate->ss.ss_ScanTupleSlot;/**Notethatperhash(andthereforeanythingaccessedthroughit)can*changeinsidetheloop,aswechangebetweengroupingsets.*注意,在分组之间切换时,perhash在循环中可能会改变*/perhash=&aggstate->perhash[aggstate->current_set];/**Weloopretrievinggroupsuntilwefindonesatisfying*aggstate->ss.ps.qual*循环检索groups,直至检索到一个符合aggstate->ss.ps.qual条件的组.*/while(!aggstate->agg_done){//-------------选好//获取SlotTupleTableSlot*hashslot=perhash->hashslot;inti;//检查中断CHECK_FOR_INTERRUPTS();/**Findthenextentryinthehashtable*检索hash表的下一个条目*/entry=ScanTupleHashTable(perhash->hashtable,&perhash->hashiter);if(entry==NULL){//条目为NULL,切换到下一个setintnextset=aggstate->current_set+1;if(nextset<aggstate->num_hashes){/**Switchtonextgroupingset,reinitialize,andrestartthe*loop.*切换至下一个groupingset,重新初始化并重启循环*/select_current_set(aggstate,nextset,true);perhash=&aggstate->perhash[aggstate->current_set];ResetTupleHashIterator(perhash->hashtable,&perhash->hashiter);continue;}else{/*Nomorehashtables,sodone*///已完成检索,设置标记,退出aggstate->agg_done=true;returnNULL;}}/**Cleartheper-output-tuplecontextforeachgroup*为每一个group清除per-output-tuple上下文**Weintentionallydon'tuseReScanExprContexthere;ifanyaggshave*registeredshutdowncallbacks,theymustn'tbecalledyet,sincewe*mightnotbedonewiththatagg.*在这里不会用到ReScanExprContext,如果存在aggs注册了shutdown回调,*那应该还没有调用,因为我们可能还没有完成该agg的处理.*/ResetExprContext(econtext);/**Transformrepresentativetuplebackintoonewiththeright*columns.*将典型元组转回具有正确列的元组.*/ExecStoreMinimalTuple(entry->firstTuple,hashslot,false);slot_getallattrs(hashslot);//清理元组//重置firstSlotExecClearTuple(firstSlot);memset(firstSlot->tts_isnull,true,firstSlot->tts_tupleDescriptor->natts*sizeof(bool));for(i=0;i<perhash->numhashGrpCols;i++){//重置firstSlotintvarNumber=perhash->hashGrpColIdxInput[i]-1;firstSlot->tts_values[varNumber]=hashslot->tts_values[i];firstSlot->tts_isnull[varNumber]=hashslot->tts_isnull[i];}ExecStoreVirtualTuple(firstSlot);pergroup=(AggStatePerGroup)entry->additional;/**Usetherepresentativeinputtupleforanyreferencesto*non-aggregatedinputcolumnsinthequalandtlist.*为qual和tlist中的非聚合输入列依赖使用典型输入元组*/econtext->ecxt_outertuple=firstSlot;//准备投影slotprepare_projection_slot(aggstate,econtext->ecxt_outertuple,aggstate->current_set);//最终的聚合操作finalize_aggregates(aggstate,peragg,pergroup);//投影result=project_aggregates(aggstate);if(result)returnresult;}/*Nomoregroups*///没有更多的groups了,返回NULLreturnNULL;}

“怎么使用PostgreSQL ExecAgg函数”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注亿速云网站,小编将为大家输出更多高质量的实用文章!