本篇内容介绍了“怎么使用postgresql ExecAgg函数”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所
本篇内容介绍了“怎么使用postgresql ExecAgg函数”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!
AggState
聚合函数执行时状态结构体,内含AggStatePerAgg等结构体
//在nodeAgg.c中私有的结构体
typedef struct AggStatePerAggData *AggStatePerAgg;
typedef struct AggStatePerTransData *AggStatePerTrans;
typedef struct AggStatePerGroupData *AggStatePerGroup;
typedef struct AggStatePerPhaseData *AggStatePerPhase;
typedef struct AggStatePerHashData *AggStatePerHash;
typedef struct AggState
{
//第一个字段是NodeTag(继承自ScanState)
ScanState ss;
//targetlist和quals中所有的Aggref
List *aggs;
//链表的大小(可以为0)
int numaggs;
//pertrans条目大小
int numtrans;
//Agg策略模式
AggStrategy aggstrategy;
//agg-splitting模式,参见nodes.h
AggSplit aggsplit;
//指向当前步骤数据的指针
AggStatePerPhase phase;
//步骤数(包括0)
int numphases;
//当前步骤
int current_phase;
//per-Aggref信息
AggStatePerAgg peragg;
//per-Trans状态信息
AggStatePerTrans pertrans;
//长生命周期数据的ExprContexts(hashtable)
ExprContext *hashcontext;
////长生命周期数据的ExprContexts(每一个GS使用)
ExprContext **agGContexts;
//输入表达式的ExprContext
ExprContext *tmpcontext;
#define FIELDNO_AGGSTATE_CURAGGCONTEXT 14
//当前活跃的aggcontext
ExprContext *curaggcontext;
//当前活跃的aggregate(如存在)
AggStatePerAgg curperagg;
#define FIELDNO_AGGSTATE_CURPERTRANS 16
//当前活跃的trans state
AggStatePerTrans curpertrans;
//输入结束?
bool input_done;
//Agg扫描结束?
bool agg_done;
//最后一个grouping set
int projected_set;
#define FIELDNO_AGGSTATE_CURRENT_SET 20
//将要解析的当前grouping set
int current_set;
//当前投影操作的分组列
Bitmapset *grouped_cols;
//倒序的分组列链表
List *all_grouped_cols;
//-------- 下面的列用于grouping set步骤数据
//所有步骤中最大的sets大小
int maxsets;
//所有步骤的数组
AggStatePerPhase phases;
//对于phases > 1,已排序的输入信息
Tuplesortstate *sort_in;
//对于下一个步骤,输入已拷贝
Tuplesortstate *sort_out;
//排序结果的slot
TupleTableSlot *sort_slot;
//------- 下面的列用于AGG_PLaiN和AGG_SORTED模式:
//per-group指针的grouping set编号数组
AggStatePerGroup *pergroups;
//当前组的第一个元组拷贝
HeapTuple grp_firstTuple;
//--------- 下面的列用于AGG_HASHED和AGG_MIXED模式:
//是否已填充hash表?
bool table_filled;
//hash桶数?
int num_hashes;
//相应的哈希表数据数组
AggStatePerHash perhash;
//per-group指针的grouping set编号数组
AggStatePerGroup *hash_pergroup;
//---------- agg输入表达式解析支持
#define FIELDNO_AGGSTATE_ALL_PERGROUPS 34
//首先是->pergroups,然后是hash_pergroup
AggStatePerGroup *all_pergroups;
//投影实现机制
ProjectionInfo *combinedproj;
} AggState;
//nodeag .c支持的基本选项
#define AGGSPLITOP_COMBINE 0x01
#define AGGSPLITOP_SKIPFINAL 0x02
#define AGGSPLITOP_SERIALIZE 0x04
#define AGGSPLITOP_DESERIALIZE 0x08
//支持的操作模式
typedef enum AggSplit
{
//基本 : 非split聚合
AGGSPLIT_SIMPLE = 0,
//部分聚合的初始步骤,序列化
AGGSPLIT_INITIAL_SERIAL = AGGSPLITOP_SKIPFINAL | AGGSPLITOP_SERIALIZE,
//部分聚合的最终步骤,反序列化
AGGSPLIT_FINAL_DESERIAL = AGGSPLITOP_COMBINE | AGGSPLITOP_DESERIALIZE
} AggSplit;
//测试AggSplit选择了哪些基本选项
#define DO_AGGSPLIT_COMBINE(as) (((as) & AGGSPLITOP_COMBINE) != 0)
#define DO_AGGSPLIT_SKIPFINAL(as) (((as) & AGGSPLITOP_SKIPFINAL) != 0)
#define DO_AGGSPLIT_SERIALIZE(as) (((as) & AGGSPLITOP_SERIALIZE) != 0)
#define DO_AGGSPLIT_DESERIALIZE(as) (((as) & AGGSPLITOP_DESERIALIZE) != 0)
ExecAgg函数,首先获取AggState运行状态,然后根据各个阶段(aggstate->phase)的策略(aggstrategy)执行相应的逻辑.如使用Hash聚合,则只有一个节点,但有两个策略,首先是AGG_HASHED,该策略对输入元组按照分组列值进行Hash,同时执行转换函数计算中间结果值,缓存到哈希表中;然后执行AGG_MIXED策略,从Hash表中获取结果元组并返回结果元组(每一result为一个结果行).
static TupleTableSlot *
ExecAgg(PlanState *pstate)
{
AggState *node = castNode(AggState, pstate);
TupleTableSlot *result = NULL;
CHECK_FOR_INTERRUPTS();
if (!node->agg_done)
{
//基于策略进行分发
switch (node->phase->aggstrategy)
{
case AGG_HASHED:
if (!node->table_filled)
agg_fill_hash_table(node);
//填充后,执行MIXED
case AGG_MIXED:
result = agg_retrieve_hash_table(node);
break;
case AGG_PLAIN:
case AGG_SORTED:
result = agg_retrieve_direct(node);
break;
}
if (!TupIsNull(result))
return result;
}
return NULL;
}
agg_fill_hash_table
读取输入并构建哈希表.
lookup_hash_entries函数根据输入元组构建分组列哈希表(搜索或新建条目),advance_aggregates调用转换函数计算中间结果并缓存.
static void
agg_fill_hash_table(AggState *aggstate)
{
TupleTableSlot *outerslot;
ExprContext *tmpcontext = aggstate->tmpcontext;
for (;;)
{
//--------- 循环直至完成所有元组的处理
//提取输入的元组
outerslot = fetch_input_tuple(aggstate);
if (TupIsNull(outerslot))
break;//已完成处理,退出循环
//配置lookup_hash_entries和advance_aggregates函数
//把元组放在临时内存上下文中
tmpcontext->ecxt_outertuple = outerslot;
//检索或构建哈希表条目
lookup_hash_entries(aggstate);
//推动聚合(或组合函数)
advance_aggregates(aggstate);
ResetExprContext(aggstate->tmpcontext);
}
aggstate->table_filled = true;
//初始化用于遍历第一个哈希表
select_current_set(aggstate, 0, true);
ResetTupleHashIterator(aggstate->perhash[0].hashtable,
&aggstate->perhash[0].hashiter);
}
agg_retrieve_hash_table
agg_retrieve_hash_table函数在hash表中检索结果,执行投影等相关操作.
static TupleTableSlot *
agg_retrieve_hash_table(AggState *aggstate)
{
ExprContext *econtext;
AggStatePerAgg peragg;
AggStatePerGroup pergroup;
TupleHashEntryData *entry;
TupleTableSlot *firstSlot;
TupleTableSlot *result;
AggStatePerHash perhash;
econtext = aggstate->ss.ps.ps_ExprContext;
peragg = aggstate->peragg;
firstSlot = aggstate->ss.ss_ScanTupleSlot;
perhash = &aggstate->perhash[aggstate->current_set];
while (!aggstate->agg_done)
{
//------------- 选好
//获取Slot
TupleTableSlot *hashslot = perhash->hashslot;
int i;
//检查中断
CHECK_FOR_INTERRUPTS();
entry = ScanTupleHashTable(perhash->hashtable, &perhash->hashiter);
if (entry == NULL)
{
//条目为NULL,切换到下一个set
int nextset = aggstate->current_set + 1;
if (nextset < aggstate->num_hashes)
{
select_current_set(aggstate, nextset, true);
perhash = &aggstate->perhash[aggstate->current_set];
ResetTupleHashIterator(perhash->hashtable, &perhash->hashiter);
continue;
}
else
{
//已完成检索,设置标记,退出
aggstate->agg_done = true;
return NULL;
}
}
ResetExprContext(econtext);
ExecStoreMinimalTuple(entry->firstTuple, hashslot, false);
slot_getallattrs(hashslot);
//清理元组
//重置firstSlot
ExecClearTuple(firstSlot);
memset(firstSlot->tts_isnull, true,
firstSlot->tts_tupleDescriptor->natts * sizeof(bool));
for (i = 0; i < perhash->numhashgrpcols; i++)
{
//重置firstSlot
int varNumber = perhash->hashGrpcolIdxInput[i] - 1;
firstSlot->tts_values[varNumber] = hashslot->tts_values[i];
firstSlot->tts_isnull[varNumber] = hashslot->tts_isnull[i];
}
ExecStoreVirtualTuple(firstSlot);
pergroup = (AggStatePerGroup) entry->additional;
econtext->ecxt_outertuple = firstSlot;
//准备投影slot
prepare_projection_slot(aggstate,
econtext->ecxt_outertuple,
aggstate->current_set);
//最终的聚合操作
finalize_aggregates(aggstate, peragg, pergroup);
//投影
result = project_aggregates(aggstate);
if (result)
return result;
}
//没有更多的groups了,返回NULL
return NULL;
}
“怎么使用Postgresql ExecAgg函数”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注编程网网站,小编将为大家输出更多高质量的实用文章!
--结束END--
本文标题: 怎么使用PostgreSQL ExecAgg函数
本文链接: https://www.lsjlt.com/news/63988.html(转载时请注明来源链接)
有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341
下载Word文档到电脑,方便收藏和打印~
2024-05-14
2024-05-14
2024-05-14
2024-05-14
2024-05-14
2024-05-13
2024-05-13
2024-05-13
2024-05-13
2024-05-12
回答
回答
回答
回答
回答
回答
回答
回答
回答
回答
0