iis服务器助手广告广告
返回顶部
首页 > 资讯 > 数据库 >PostgreSQL中PortalRun->PortalRunSelect函数的实现逻辑是什么
  • 754
分享到

PostgreSQL中PortalRun->PortalRunSelect函数的实现逻辑是什么

2024-04-02 19:04:59 754人浏览 薄情痞子
摘要

本篇内容介绍了“postgresql中PortalRun->PortalRunSelect函数的实现逻辑是什么”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带

本篇内容介绍了“postgresql中PortalRun->PortalRunSelect函数的实现逻辑是什么”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!

一、数据结构

Portal
对于Portals(客户端请求),有几种执行策略,具体取决于要执行什么查询。
(注意:无论什么情况下,一个Portal只执行一个source-sql查询,因此从用户的角度来看只产生一个结果。


typedef enum PortalStrategy
{
    PORTAL_ONE_SELECT,
    PORTAL_ONE_RETURNING,
    PORTAL_ONE_MOD_WITH,
    PORTAL_UTIL_SELECT,
    PORTAL_MULTI_QUERY
} PortalStrategy;


typedef enum PortalStatus
{
    PORTAL_NEW,                 
    PORTAL_DEFINED,             
    PORTAL_READY,               
    PORTAL_ACTIVE,              
    PORTAL_DONE,                
    PORTAL_FaiLED               
} PortalStatus;

typedef struct PortalData *Portal;//结构体指针

typedef struct PortalData
{
    
    const char *name;           
    const char *prepStmtName;   
    MemoryContext portalContext;    
    ResourceOwner resowner;     
    void        (*cleanup) (Portal portal); 

    
    SubTransactionId createSubid;   
    SubTransactionId activeSubid;   

    
    //portal将会执行的查询
    const char *sourceText;     
    const char *commandTag;     
    List       *stmts;          
    CachedPlan *cplan;          

    ParamListInfo portalParams; 
    QueryEnvironment *queryEnv; 

    
    PortalStrategy strategy;    
    int         cursorOptions;  
    bool        run_once;       

    
    PortalStatus status;        
    bool        portalPinned;   
    bool        autoHeld;       

    
    //如不为NULL,执行器处于活动状态
    QueryDesc  *queryDesc;      

    
    //如Portal需要返回元组,这是元组的描述
    TupleDesc   tupDesc;        
    
    //列信息的格式码
    int16      *fORMats;        

    
    Tuplestorestate *holdStore; 
    MemoryContext holdContext;  

    
    Snapshot    holdSnapshot;   

    
    bool        atStart;//处于开始位置?
    bool        atEnd;//处于结束位置?
    uint64      portalPos;//实际行号

    
    //用于表示的数据,主要由pg_cursors系统视图使用
    TimestampTz creation_time;  
    bool        visible;        
}           PortalData;


#define PortalIsValid(p) PointerIsValid(p)

QueryDesc
QueryDesc封装了执行器执行查询所需的所有内容。


typedef struct QueryDesc
{
    
    //以下变量由CreateQueryDesc函数设置
    CmdType     operation;      
    PlannedStmt *plannedstmt;   
    const char *sourceText;     
    Snapshot    snapshot;       
    Snapshot    crosscheck_snapshot;    
    DestReceiver *dest;         
    ParamListInfo params;       
    QueryEnvironment *queryEnv; 
    int         instrument_options; 

    
    //以下变量由ExecutorStart函数设置
    TupleDesc   tupDesc;        
    EState     *estate;         
    PlanState  *planstate;      

    
    //以下变量由ExecutorRun设置
    bool        already_executed;   

    
    //内核设置为NULL,可由插件修改
    struct Instrumentation *totaltime;  
} QueryDesc;

二、源码解读

PortalRun->PortalRunSelect函数执行以PORTAL_ONE_SELECT模式运行的SQL.


static uint64
PortalRunSelect(Portal portal,
                bool forward,
                long count,
                DestReceiver *dest)
{
    QueryDesc  *queryDesc;
    ScanDirection direction;
    uint64      nprocessed;

    
    queryDesc = portal->queryDesc;

    
    //确保queryDescbuweiNULL或者持有提取的数据
    Assert(queryDesc || portal->holdStore);

    
    if (queryDesc)
        queryDesc->dest = dest;//设置dest

    
    if (forward)//前向
    {
        if (portal->atEnd || count <= 0)
        {
            //已到末尾或者行计数小于等于0
            direction = NoMovementScanDirection;
            count = 0;          
        }
        else
            direction = ForwardScanDirection;//前向扫描

        
        //在executor中,count=0意味着提取所有行
        if (count == FETCH_ALL)
            count = 0;

        if (portal->holdStore)
            //持有提取后的数据游标
            nprocessed = RunFromStore(portal, direction, (uint64) count, dest);
        else
        {
            //没有持有游标(数据)
            PushActiveSnapshot(queryDesc->snapshot);//快照入栈
            ExecutorRun(queryDesc, direction, (uint64) count,
                        portal->run_once);//开始执行
            nprocessed = queryDesc->estate->es_processed;//结果行数
            PopActiveSnapshot();//快照出栈
        }

        if (!ScanDirectionIsNoMovement(direction))//扫描方向可移动
        {
            if (nprocessed > 0)//扫描行数>0
                portal->atStart = false;    
            if (count == 0 || nprocessed < (uint64) count)
                //count为0或者行数小于传入的计数器
                portal->atEnd = true;   
            portal->portalPos += nprocessed;//位置移动(+处理行数)
        }
    }
    else//非前向(后向)
    {
        if (portal->cursorOptions & CURSOR_OPT_NO_SCROLL)//如游标不可移动,报错
            ereport(ERROR,
                    (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                     errmsg("cursor can only scan forward"),
                     errhint("Declare it with SCROLL option to enable backward scan.")));

        if (portal->atStart || count <= 0)
        {
            //处于开始或者count小于等于0
            direction = NoMovementScanDirection;
            count = 0;          
        }
        else
            //往后扫描
            direction = BackwardScanDirection;

        
        //参见forward=T的注释
        if (count == FETCH_ALL)
            count = 0;

        if (portal->holdStore)
            nprocessed = RunFromStore(portal, direction, (uint64) count, dest);
        else
        {
            PushActiveSnapshot(queryDesc->snapshot);
            ExecutorRun(queryDesc, direction, (uint64) count,
                        portal->run_once);
            nprocessed = queryDesc->estate->es_processed;
            PopActiveSnapshot();
        }

        if (!ScanDirectionIsNoMovement(direction))
        {
            if (nprocessed > 0 && portal->atEnd)
            {
                portal->atEnd = false;  
                portal->portalPos++;    
            }
            if (count == 0 || nprocessed < (uint64) count)
            {
                portal->atStart = true; 
                portal->portalPos = 0;
            }
            else
            {
                portal->portalPos -= nprocessed;
            }
        }
    }

    return nprocessed;
}



static uint64
RunFromStore(Portal portal, ScanDirection direction, uint64 count,
             DestReceiver *dest)
{
    uint64      current_tuple_count = 0;
    TupleTableSlot *slot;//元组表slot

    slot = MakeSingleTupleTableSlot(portal->tupDesc);

    dest->rStartup(dest, CMD_SELECT, portal->tupDesc);//目标启动

    if (ScanDirectionIsNoMovement(direction))//无法移动
    {
        
        //不需要做任何事情
    }
    else
    {
        bool        forward = ScanDirectionIsForward(direction);//是否前向扫描

        for (;;)//循环
        {
            MemoryContext oldcontext;//内存上下文
            bool        ok;

            oldcontext = MemoryContextSwitchTo(portal->holdContext);//切换至相应的内存上下文

            ok = tuplestore_gettupleslot(portal->holdStore, forward, false,
                                         slot);//获取元组

            MemoryContextSwitchTo(oldcontext);//切换回原上下文

            if (!ok)
                break;//如出错,则跳出循环

            
            if (!dest->receiveSlot(slot, dest))
                break;

            ExecClearTuple(slot);//执行清理

            
            current_tuple_count++;
            if (count && count == current_tuple_count)
                break;
        }
    }

    dest->rShutdown(dest);//关闭目标端

    ExecDropSingleTupleTableSlot(slot);//清除slot

    return current_tuple_count;//返回行数
}
 


void
ExecutorRun(QueryDesc *queryDesc,
            ScanDirection direction, uint64 count,
            bool execute_once)
{
    if (ExecutorRun_hook)
        (*ExecutorRun_hook) (queryDesc, direction, count, execute_once);//钩子函数
    else
        standard_ExecutorRun(queryDesc, direction, count, execute_once);//标准函数
}

void
standard_ExecutorRun(QueryDesc *queryDesc,
                     ScanDirection direction, uint64 count, bool execute_once)
{
    EState     *estate;//全局执行状态
    CmdType     operation;//命令类型
    DestReceiver *dest;//接收器
    bool        sendTuples;//是否需要传输元组
    MemoryContext oldcontext;//内存上下文

    
    Assert(queryDesc != NULL);//校验queryDesc不能为NULL

    estate = queryDesc->estate;//获取执行状态

    Assert(estate != NULL);//执行状态不能为NULL
    Assert(!(estate->es_top_eflags & EXEC_FLAG_EXPLAIN_ONLY));//eflags标记不能为EXEC_FLAG_EXPLAIN_ONLY

    
    oldcontext = MemoryContextSwitchTo(estate->es_query_cxt);

    
    //允许全程instrumentation
    if (queryDesc->totaltime)
        InstrStartnode(queryDesc->totaltime);

    
    operation = queryDesc->operation;
    dest = queryDesc->dest;

    
    estate->es_processed = 0;
    estate->es_lastoid = InvalidOid;

    sendTuples = (operation == CMD_SELECT ||
                  queryDesc->plannedstmt->hasReturning);

    if (sendTuples)//如需发送元组
        dest->rStartup(dest, operation, queryDesc->tupDesc);

    
    if (!ScanDirectionIsNoMovement(direction))//如非ScanDirectionIsNoMovement
    {
        if (execute_once && queryDesc->already_executed)//校验
            elog(ERROR, "can't re-execute query flagged for single execution");
        queryDesc->already_executed = true;//修改标记

        ExecutePlan(estate,
                    queryDesc->planstate,
                    queryDesc->plannedstmt->parallelModeNeeded,
                    operation,
                    sendTuples,
                    count,
                    direction,
                    dest,
                    execute_once);//执行Plan
    }

    
    if (sendTuples)
        dest->rShutdown(dest);

    if (queryDesc->totaltime)//收集时间
        InstrStopNode(queryDesc->totaltime, estate->es_processed);

    MemoryContextSwitchTo(oldcontext);//切换内存上下文
}

三、跟踪分析

测试脚本如下

testdb=# explain select dw.*,grjf.grbh,grjf.xm,grjf.ny,grjf.je 
testdb-# from t_dwxx dw,lateral (select gr.grbh,gr.xm,jf.ny,jf.je 
testdb(#                         from t_grxx gr inner join t_jfxx jf 
testdb(#                                        on gr.dwbh = dw.dwbh 
testdb(#                                           and gr.grbh = jf.grbh) grjf
testdb-# order by dw.dwbh;
                                        QUERY PLAN                                        
------------------------------------------------------------------------------------------
 Sort  (cost=20070.93..20320.93 rows=100000 width=47)
   Sort Key: dw.dwbh
   ->  Hash Join  (cost=3754.00..8689.61 rows=100000 width=47)
         Hash Cond: ((gr.dwbh)::text = (dw.dwbh)::text)
         ->  Hash Join  (cost=3465.00..8138.00 rows=100000 width=31)
               Hash Cond: ((jf.grbh)::text = (gr.grbh)::text)
               ->  Seq Scan on t_jfxx jf  (cost=0.00..1637.00 rows=100000 width=20)
               ->  Hash  (cost=1726.00..1726.00 rows=100000 width=16)
                     ->  Seq Scan on t_grxx gr  (cost=0.00..1726.00 rows=100000 width=16)
         ->  Hash  (cost=164.00..164.00 rows=10000 width=20)
               ->  Seq Scan on t_dwxx dw  (cost=0.00..164.00 rows=10000 width=20)
(11 rows)

启动gdb,设置断点,进入PortalRunSelect

(gdb) b PortalRunSelect
Breakpoint 1 at 0x8cc0e8: file pquery.c, line 888.
(gdb) c
Continuing.

Breakpoint 1, PortalRunSelect (portal=0x1af2468, forward=true, count=9223372036854775807, dest=0x1b74668) at pquery.c:888
warning: Source file is more recent than executable.
888     queryDesc = portal->queryDesc;
(gdb)

查看输入参数portal&dest,forward为T表示前向扫描
portal:未命名的Portal,holdStore为NULL,atStart = true, atEnd = false, portalPos = 0
dest:接收器slot为printtup

(gdb) p *portal
$1 = {name = 0x1af5e90 "", prepStmtName = 0x0, portalContext = 0x1b795d0, resowner = 0x1abde80, 
  cleanup = 0x6711b6 <PortalCleanup>, createSubid = 1, activeSubid = 1, 
  sourceText = 0x1a8ceb8 "select dw.*,grjf.grbh,grjf.xm,grjf.ny,grjf.je \nfrom t_dwxx dw,lateral (select gr.grbh,gr.xm,jf.ny,jf.je \n", ' ' <repeats 24 times>, "from t_grxx gr inner join t_jfxx jf \n", ' ' <repeats 34 times>..., 
  commandTag = 0xc5eed5 "SELECT", stmts = 0x1b74630, cplan = 0x0, portalParams = 0x0, queryEnv = 0x0, 
  strategy = PORTAL_ONE_SELECT, cursorOptions = 4, run_once = true, status = PORTAL_ACTIVE, portalPinned = false, 
  autoHeld = false, queryDesc = 0x1b796e8, tupDesc = 0x1b867d8, formats = 0x1b79780, holdStore = 0x0, holdContext = 0x0, 
  holdSnapshot = 0x0, atStart = true, atEnd = false, portalPos = 0, creation_time = 595566906253867, visible = false}
(gdb) p *dest
$2 = {receiveSlot = 0x48cc00 <printtup>, rStartup = 0x48c5c1 <printtup_startup>, rShutdown = 0x48d02e <printtup_shutdown>, 
  rDestroy = 0x48d0a7 <printtup_destroy>, mydest = DestRemote}

校验并设置dest

(gdb) n
891     Assert(queryDesc || portal->holdStore);
(gdb) 
899     if (queryDesc)
(gdb) 
900         queryDesc->dest = dest;

前向扫描

(gdb) n
913     if (forward)
(gdb) 
915         if (portal->atEnd || count <= 0)

进入ExecutorRun

...
(gdb) 
932             ExecutorRun(queryDesc, direction, (uint64) count,
(gdb) step
ExecutorRun (queryDesc=0x1b796e8, direction=ForwardScanDirection, count=0, execute_once=true) at execMain.c:304
warning: Source file is more recent than executable.
304     if (ExecutorRun_hook)

进入standard_ExecutorRun

(gdb) n
307         standard_ExecutorRun(queryDesc, direction, count, execute_once);
(gdb) step
standard_ExecutorRun (queryDesc=0x1b796e8, direction=ForwardScanDirection, count=0, execute_once=true) at execMain.c:321
321     Assert(queryDesc != NULL);

standard_ExecutorRun->校验并切换上下文

321     Assert(queryDesc != NULL);
(gdb) n
323     estate = queryDesc->estate;
(gdb) 
325     Assert(estate != NULL);
(gdb) 
326     Assert(!(estate->es_top_eflags & EXEC_FLAG_EXPLAIN_ONLY));
(gdb) 
331     oldcontext = MemoryContextSwitchTo(estate->es_query_cxt);
(gdb)

standard_ExecutorRun->变量赋值,判断是否需要传输元组

(gdb) 
334     if (queryDesc->totaltime)
(gdb) n
340     operation = queryDesc->operation;
(gdb) 
341     dest = queryDesc->dest;
(gdb) p operation
$3 = CMD_SELECT
(gdb) n
346     estate->es_processed = 0;
(gdb) 
347     estate->es_lastoid = InvalidOid;
(gdb) 
349     sendTuples = (operation == CMD_SELECT ||
(gdb) 
352     if (sendTuples)
(gdb) 
353         dest->rStartup(dest, operation, queryDesc->tupDesc);
(gdb) p sendTuples
$4 = true
(gdb)

standard_ExecutorRun->执行计划(ExecutePlan函数下节介绍)

(gdb) n
358     if (!ScanDirectionIsNoMovement(direction))
(gdb) 
360         if (execute_once && queryDesc->already_executed)
(gdb) 
362         queryDesc->already_executed = true;
(gdb) 
364         ExecutePlan(estate,
(gdb)

standard_ExecutorRun->关闭资源并切换上下文

(gdb) 
378     if (sendTuples)
(gdb) n
379         dest->rShutdown(dest);
(gdb) 
381     if (queryDesc->totaltime)
(gdb) 
384     MemoryContextSwitchTo(oldcontext);
(gdb) 
385 }
(gdb)

standard_ExecutorRun->回到PortalRunSelect

(gdb) n
ExecutorRun (queryDesc=0x1b796e8, direction=ForwardScanDirection, count=0, execute_once=true) at execMain.c:308
308 }
(gdb) 
PortalRunSelect (portal=0x1af2468, forward=true, count=0, dest=0x1b74668) at pquery.c:934
934             nprocessed = queryDesc->estate->es_processed;

快照出栈,修改状态atStart/atEnd等

(gdb) n
935             PopActiveSnapshot();
(gdb) 
938         if (!ScanDirectionIsNoMovement(direction))
(gdb) 
940             if (nprocessed > 0)
(gdb) p nprocessed
$6 = 99991
(gdb) n
941                 portal->atStart = false;    
(gdb) 
942             if (count == 0 || nprocessed < (uint64) count)
(gdb)

完成调用

(gdb) n
943                 portal->atEnd = true;   
(gdb) p count
$7 = 0
(gdb) n
944             portal->portalPos += nprocessed;
(gdb) 
997     return nprocessed;
(gdb) 
998 }
(gdb) n
PortalRun (portal=0x1af2468, count=9223372036854775807, isTopLevel=true, run_once=true, dest=0x1b74668, altdest=0x1b74668, 
    completionTag=0x7ffc5ff58740 "") at pquery.c:780
780                 if (completionTag && portal->commandTag)
(gdb) p nprocessed
$8 = 99991

“PostgreSQL中PortalRun->PortalRunSelect函数的实现逻辑是什么”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注编程网网站,小编将为大家输出更多高质量的实用文章!

您可能感兴趣的文档:

--结束END--

本文标题: PostgreSQL中PortalRun->PortalRunSelect函数的实现逻辑是什么

本文链接: https://www.lsjlt.com/news/64781.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

本篇文章演示代码以及资料文档资料下载

下载Word文档到电脑,方便收藏和打印~

下载Word文档
猜你喜欢
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作