广告
返回顶部
首页 > 资讯 > 数据库 >PostgreSQL 源码解读(9)- 插入数据#8(ExecutorRun和standard...
  • 113
分享到

PostgreSQL 源码解读(9)- 插入数据#8(ExecutorRun和standard...

2024-04-02 19:04:59 113人浏览 八月长安
摘要

本文简单介绍了PG插入数据部分的源码,主要内容包括ExecutorRun函数和standard_ExecutorRun函数的实现逻辑,这两个函数均位于execMain.c文件中。

本文简单介绍了PG插入数据部分的源码,主要内容包括ExecutorRun函数和standard_ExecutorRun函数的实现逻辑,这两个函数均位于execMain.c文件中。
值得一提的是:
1、解读方式:采用自底向上的方式,也就是从调用栈(调用栈请参加第一篇文章)的底层往上逐层解读,建议按此顺序阅读;
2、问题处理:上面几篇解读并不深入,或者说只是浮于表面,但随着调用栈的逐步解读,信息会慢慢浮现,需要耐心和坚持

一、基础信息

ExecutorRun、standard_ExecutorRun函数使用的数据结构、宏定义以及依赖的函数等。
数据结构/宏定义
1、QueryDesc

//查询结构体
//结构体中包含了执行查询所需要的所有信息
 
 typedef struct QueryDesc
 {
     
     CmdType     operation;      
     PlannedStmt *plannedstmt;   
     const char *sourceText;     
     Snapshot    snapshot;       
     Snapshot    crosscheck_snapshot;    
     DestReceiver *dest;         
     ParamListInfo params;       
     QueryEnvironment *queryEnv; 
     int         instrument_options; 
 
     
     TupleDesc   tupDesc;        
     EState     *estate;         
     PlanState  *planstate;      
 
     
     bool        already_executed;   
 
     
     struct Instrumentation *totaltime;  
 } QueryDesc;
 
//快照指针
typedef struct SnapshotData *Snapshot;
 
 #define InvalidSnapshot     ((Snapshot) NULL)
 
 
 typedef bool (*SnapshotSatisfiesFunc) (HeapTuple htup,
                                        Snapshot snapshot, Buffer buffer);
 
 
 typedef struct SnapshotData
 {
     SnapshotSatisfiesFunc satisfies;    
 
     
     TransactionId xmin;         
     TransactionId xmax;         
 
     
     TransactionId *xip;
     uint32      xcnt;           
 
     
     TransactionId *subxip;
     int32       subxcnt;        
     bool        suboverflowed;  
 
     bool        takenDuringRecovery;    
     bool        copied;         
 
     CommandId   curcid;         
 
     
     uint32      speculativeToken;
 
     
     uint32      active_count;   
     uint32      regd_count;     
     pairingheap_node ph_node;   
 
     TimestampTz whenTaken;      
     XLogRecPtr  lsn;            
 } SnapshotData;//存储快照的数据结构


 
//已Planned的Statement
//也就是说已生成了执行计划的语句
 typedef struct PlannedStmt
 {
     NodeTag     type;
 
     CmdType     commandType;    
 
     uint64      queryId;        
 
     bool        hasReturning;   
 
     bool        hasModifyinGCTE;    
 
     bool        canSetTag;      
 
     bool        transientPlan;  
 
     bool        dependsOnRole;  
 
     bool        parallelModeNeeded; 
 
     int         jitFlags;       
 
     struct Plan *planTree;      
 
     List       *rtable;         
 
     
     List       *resultRelations;    
 
     
     List       *nonleafResultRelations;
 
     
     List       *rootResultRelations;
 
     List       *subplans;       
 
     Bitmapset  *rewindPlanIDs;  
 
     List       *rowMarks;       
 
     List       *relationOids;   
 
     List       *invalItems;     
 
     List       *paramExecTypes; 
 
     Node       *utilityStmt;    
 
     
     int         stmt_location;  
     int         stmt_len;       
 } PlannedStmt;
 
 
//参数列表信息
 typedef struct ParamListInfoData
 {
     ParamFetchHook paramFetch;  
     void       *paramFetchArg;
     ParamCompileHook paramCompile;  
     void       *paramCompileArg;
     ParserSetupHook parserSetup;    
     void       *parserSetupArg;
     int         numParams;      
 
     
     ParamExternData params[FLEXIBLE_ARRAY_MEMBER];
 }           ParamListInfoData;
 
typedef struct ParamListInfoData *ParamListInfo;

//查询环境,使用List存储相关信息

 struct QueryEnvironment
 {
     List       *namedRelList;
 };
 
 
//TODO
 typedef struct Instrumentation
 {
     
     bool        need_timer;     
     bool        need_bufusage;  
     
     bool        running;        
     instr_time  starttime;      
     instr_time  counter;        
     double      firsttuple;     
     double      tuplecount;     
     BufferUsage bufusage_start; 
     
     double      startup;        
     double      total;          
     double      ntuples;        
     double      ntuples2;       
     double      nloops;         
     double      nfiltered1;     
     double      nfiltered2;     
     BufferUsage bufusage;       
 } Instrumentation;
 

依赖的函数
1、InstrStartNode

 
 void
 InstrStartNode(Instrumentation *instr)
 {
     if (instr->need_timer)
     {
         if (INSTR_TIME_IS_ZERO(instr->starttime))
             INSTR_TIME_SET_CURRENT(instr->starttime);
         else
             elog(ERROR, "InstrStartNode called twice in a row");
     }
 
     
     if (instr->need_bufusage)
         instr->bufusage_start = pgBufferUsage;
 }
 

2、ScanDirectionIsNoMovement

//简单判断
 
 #define ScanDirectionIsNoMovement(direction) \
     ((bool) ((direction) == NoMovementScanDirection))

3、ExecutePlan

//上一节已解读

4、InstrStopNode
//TODO Instrumentation 的理解

 
 void
 InstrStopNode(Instrumentation *instr, double nTuples)
 {
     instr_time  endtime;
 
     
     instr->tuplecount += nTuples;
 
     
     if (instr->need_timer)
     {
         if (INSTR_TIME_IS_ZERO(instr->starttime))
             elog(ERROR, "InstrStopNode called without start");
 
         INSTR_TIME_SET_CURRENT(endtime);
         INSTR_TIME_ACCUM_DIFF(instr->counter, endtime, instr->starttime);
 
         INSTR_TIME_SET_ZERO(instr->starttime);
     }
 
     
     if (instr->need_bufusage)
         BufferUsageAccumDiff(&instr->bufusage,
                              &pgBufferUsage, &instr->bufusage_start);
 
     
     if (!instr->running)
     {
         instr->running = true;
         instr->firsttuple = INSTR_TIME_GET_DOUBLE(instr->counter);
     }
 }

5、MemoryContextSwitchTo

 
 #ifndef FRONTEND
 static inline MemoryContext
 MemoryContextSwitchTo(MemoryContext context)
 {
     MemoryContext old = CurrentMemoryContext;
 
     CurrentMemoryContext = context;
     return old;
 }
 #endif                          

二、源码解读



void
ExecutorRun(QueryDesc *queryDesc,
            ScanDirection direction, uint64 count,
            bool execute_once)
{
    if (ExecutorRun_hook)//如果有钩子函数,则执行钩子函数
        (*ExecutorRun_hook) (queryDesc, direction, count, execute_once);
    else//否则执行标准函数
        standard_ExecutorRun(queryDesc, direction, count, execute_once);
}

//标准函数

void
standard_ExecutorRun(QueryDesc *queryDesc,
                     ScanDirection direction, uint64 count, bool execute_once)
{
    EState     *estate;//执行器状态信息
    CmdType     operation;//命令类型,这里是INSERT
    DestReceiver *dest;//目标接收器
    bool        sendTuples;//是否需要传输Tuples
    MemoryContext oldcontext;//原内存上下文(PG自己的内存管理器)

    
    Assert(queryDesc != NULL);

    estate = queryDesc->estate;//获取执行器状态

    Assert(estate != NULL);
    Assert(!(estate->es_top_eflags & EXEC_FLAG_EXPLAIN_ONLY));

    
    oldcontext = MemoryContextSwitchTo(estate->es_query_cxt);//切换至当前查询上下文,切换前保存原上下文

    
    if (queryDesc->totaltime)//需要计时?如oraclesqlplus中设置set timing on的计时
        InstrStartNode(queryDesc->totaltime);//

    
    operation = queryDesc->operation;//操作类型
    dest = queryDesc->dest;//目标端

    
    estate->es_processed = 0;//进度
    estate->es_lastoid = InvalidOid;//最后一个Oid

    sendTuples = (operation == CMD_SELECT ||
                  queryDesc->plannedstmt->hasReturning);//查询语句或者需要返回值的才需要传输Tuples

    if (sendTuples)
        dest->rStartup(dest, operation, queryDesc->tupDesc);//启动目标端的接收器

    
    if (!ScanDirectionIsNoMovement(direction))//需要扫描
    {
        if (execute_once && queryDesc->already_executed)
            elog(ERROR, "can't re-execute query flagged for single execution");
        queryDesc->already_executed = true;

        ExecutePlan(estate,
                    queryDesc->planstate,
                    queryDesc->plannedstmt->parallelModeNeeded,
                    operation,
                    sendTuples,
                    count,
                    direction,
                    dest,
                    execute_once);//执行
    }

    
    if (sendTuples)
        dest->rShutdown(dest);//关闭目标端的接收器

    if (queryDesc->totaltime)
        InstrStopNode(queryDesc->totaltime, estate->es_processed);//完成计时

    MemoryContextSwitchTo(oldcontext);//执行完毕,切换回原内存上下文
}

三、跟踪分析

插入测试数据:

testdb=# -- #8 ExecutorRun&standard_ExecutorRun
testdb=# -- 获取pid
testdb=# select pg_backend_pid();
 pg_backend_pid 
----------------
           1529
(1 row)
testdb=# -- 插入1行
testdb=# insert into t_insert values(16,'ExecutorRun/standard_ExecutorRun','ExecutorRun/standard_ExecutorRun','ExecutorRun/standard_ExecutorRun');
(挂起)

启动gdb,跟踪调试:

[root@localhost ~]# gdb -p 3294
GNU gdb (GDB) Red Hat Enterprise linux 7.6.1-100.el7
Copyright (C) 2013 Free Software Foundation, Inc.
...
(gdb) b standard_ExecutorRun
Breakpoint 1 at 0x690d09: file execMain.c, line 322.
(gdb) c
Continuing.

Breakpoint 1, standard_ExecutorRun (queryDesc=0x2c2d4e0, direction=ForwardScanDirection, count=0, execute_once=true) at execMain.c:322
322     estate = queryDesc->estate;
#查看参数
#1、queryDesc
(gdb) p *queryDesc
$1 = {operation = CMD_INSERT, plannedstmt = 0x2cc1488, 
  sourceText = 0x2c09ef0 "insert into t_insert values(16,'ExecutorRun/standard_ExecutorRun','ExecutorRun/standard_ExecutorRun','ExecutorRun/standard_ExecutorRun');", snapshot = 0x2c866e0, 
  crosscheck_snapshot = 0x0, dest = 0x2cc15e8, params = 0x0, queryEnv = 0x0, instrument_options = 0, tupDesc = 0x2c309d0, estate = 0x2c2f900, planstate = 0x2c2fc50, already_executed = false, 
  totaltime = 0x0}
(gdb) p *(queryDesc->plannedstmt)
$2 = {type = T_PlannedStmt, commandType = CMD_INSERT, queryId = 0, hasReturning = false, hasModifyingCTE = false, canSetTag = true, transientPlan = false, dependsOnRole = false, 
  parallelModeNeeded = false, jitFlags = 0, planTree = 0x2cc10f8, rtable = 0x2cc13b8, resultRelations = 0x2cc1458, nonleafResultRelations = 0x0, rootResultRelations = 0x0, subplans = 0x0, 
  rewindPlanIDs = 0x0, rowMarks = 0x0, relationOids = 0x2cc1408, invalItems = 0x0, paramExecTypes = 0x2c2f590, utilityStmt = 0x0, stmt_location = 0, stmt_len = 136}
(gdb) p *(queryDesc->snapshot)
$3 = {satisfies = 0x9f73fc <HeapTupleSatisfiesmvcC>, xmin = 1612874, xmax = 1612874, xip = 0x0, xcnt = 0, subxip = 0x0, subxcnt = 0, suboverflowed = false, takenDuringRecovery = false, copied = true, 
  curcid = 0, speculativeToken = 0, active_count = 1, regd_count = 2, ph_node = {first_child = 0x0, next_sibling = 0x0, prev_or_parent = 0x0}, whenTaken = 0, lsn = 0}
(gdb) p *(queryDesc->dest)
$4 = {receiveSlot = 0x4857ad <printtup>, rStartup = 0x485196 <printtup_startup>, rShutdown = 0x485bad <printtup_shutdown>, rDestroy = 0x485c21 <printtup_destroy>, mydest = DestRemote}
(gdb) p *(queryDesc->tupDesc)
$5 = {natts = 0, tdtypeid = 2249, tdtypmod = -1, tdhasoid = false, tdrefcount = -1, constr = 0x0, attrs = 0x2c309f0}
(gdb) p *(queryDesc->estate)
$6 = {type = T_EState, es_direction = ForwardScanDirection, es_snapshot = 0x2c866e0, es_crosscheck_snapshot = 0x0, es_range_table = 0x2cc13b8, es_plannedstmt = 0x2cc1488, 
  es_sourceText = 0x2c09ef0 "insert into t_insert values(16,'ExecutorRun/standard_ExecutorRun','ExecutorRun/standard_ExecutorRun','ExecutorRun/standard_ExecutorRun');", es_junkFilter = 0x0, 
  es_output_cid = 0, es_result_relations = 0x2c2fb40, es_num_result_relations = 1, es_result_relation_info = 0x0, es_root_result_relations = 0x0, es_num_root_result_relations = 0, 
  es_tuple_routing_result_relations = 0x0, es_trig_target_relations = 0x0, es_trig_tuple_slot = 0x2c30ab0, es_trig_oldtup_slot = 0x0, es_trig_newtup_slot = 0x0, es_param_list_info = 0x0, 
  es_param_exec_vals = 0x2c2fb10, es_queryEnv = 0x0, es_query_cxt = 0x2c2f7f0, es_tupleTable = 0x2c30500, es_rowMarks = 0x0, es_processed = 0, es_lastoid = 0, es_top_eflags = 0, es_instrument = 0, 
  es_finished = false, es_exprcontexts = 0x2c2feb0, es_subplanstates = 0x0, es_auxmodifytables = 0x0, es_per_tuple_exprcontext = 0x0, es_epQtuple = 0x0, es_epqTupleSet = 0x0, es_epqScanDone = 0x0, 
  es_use_parallel_mode = false, es_query_dsa = 0x0, es_jit_flags = 0, es_jit = 0x0}
(gdb) p *(queryDesc->planstate)
$7 = {type = T_ModifyTableState, plan = 0x2cc10f8, state = 0x2c2f900, ExecProcNode = 0x69a78b <ExecProcNodeFirst>, ExecProcNodeReal = 0x6c2485 <ExecModifyTable>, instrument = 0x0, 
  worker_instrument = 0x0, qual = 0x0, lefttree = 0x0, righttree = 0x0, initPlan = 0x0, subPlan = 0x0, chgParam = 0x0, ps_ResultTupleSlot = 0x2c30a00, ps_ExprContext = 0x0, ps_ProjInfo = 0x0, 
  scandesc = 0x0}
#2、direction
(gdb) p direction
$8 = ForwardScanDirection
#3、count
(gdb) p count
$9 = 0
#4、execute_once
(gdb) p execute_once
$10 = true
#单步调试执行
(gdb) next
330     oldcontext = MemoryContextSwitchTo(estate->es_query_cxt);
(gdb) 
333     if (queryDesc->totaltime)
#MemoryContext是PG中很重要的内存管理数据结构,需深入理解
(gdb) p *oldcontext
$11 = {type = T_AllocSetContext, isReset = false, allowInCritSection = false, methods = 0xb8c720 <AllocSetMethods>, parent = 0x2c6f380, firstchild = 0x2c2f7f0, prevchild = 0x0, nextchild = 0x0, 
  name = 0xb8d2f1 "PortalContext", ident = 0x2c72e98 "", reset_cbs = 0x0}
(gdb) p *(estate->es_query_cxt)
$12 = {type = T_AllocSetContext, isReset = false, allowInCritSection = false, methods = 0xb8c720 <AllocSetMethods>, parent = 0x2c2d3D0, firstchild = 0x2cbce60, prevchild = 0x0, nextchild = 0x0, 
  name = 0xb1a840 "ExecutorState", ident = 0x0, reset_cbs = 0x0}
(gdb) next
339     operation = queryDesc->operation;
(gdb) 
340     dest = queryDesc->dest;
(gdb) 
345     estate->es_processed = 0;
(gdb) 
346     estate->es_lastoid = InvalidOid;
(gdb) 
348     sendTuples = (operation == CMD_SELECT ||
(gdb) 
349                   queryDesc->plannedstmt->hasReturning);
(gdb) 
348     sendTuples = (operation == CMD_SELECT ||
(gdb) 
351     if (sendTuples)
(gdb) 
357     if (!ScanDirectionIsNoMovement(direction))
(gdb) 
359         if (execute_once && queryDesc->already_executed)
(gdb) 
361         queryDesc->already_executed = true;
(gdb) 
363         ExecutePlan(estate,
(gdb) 
365                     queryDesc->plannedstmt->parallelModeNeeded,
(gdb) 
363         ExecutePlan(estate,
(gdb) 
377     if (sendTuples)
(gdb) 
380     if (queryDesc->totaltime)
(gdb) 
383     MemoryContextSwitchTo(oldcontext);
(gdb) 
384 }
(gdb) 
ExecutorRun (queryDesc=0x2c2d4e0, direction=ForwardScanDirection, count=0, execute_once=true) at execMain.c:307
307 }
(gdb) 
#DONE!

四、小结

1、PG的扩展性:PG提供了钩子函数,可以对ExecutorRun进行Hack;
2、重要的数据结构:MemoryContext,内存上下文,需深入理解。

您可能感兴趣的文档:

--结束END--

本文标题: PostgreSQL 源码解读(9)- 插入数据#8(ExecutorRun和standard...

本文链接: https://www.lsjlt.com/news/45370.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

本篇文章演示代码以及资料文档资料下载

下载Word文档到电脑,方便收藏和打印~

下载Word文档
猜你喜欢
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作