广告
返回顶部
首页 > 资讯 > 数据库 >PostgreSQL中插入数据时与WAL相关的处理逻辑是什么
  • 247
分享到

PostgreSQL中插入数据时与WAL相关的处理逻辑是什么

2024-04-02 19:04:59 247人浏览 八月长安
摘要

本篇内容主要讲解“postgresql中插入数据时与WAL相关的处理逻辑是什么”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“Postgresql中插入数据时与W

本篇内容主要讲解“postgresql中插入数据时与WAL相关的处理逻辑是什么”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“Postgresql中插入数据时与WAL相关的处理逻辑是什么”吧!

一、数据结构

静态变量
进程中全局共享

static int  num_rdatas;         
//已分配的空间大小
static int  max_rdatas;         
//是否调用XLogBeginInsert函数
static bool begininsert_called = false;

static XLoGCtlData *XLogCtl = NULL;


static uint8 curinsert_flags = 0;


XLogRecPtr  ProcLastRecPtr = InvalidXLogRecPtr;
XLogRecPtr  XactLastRecEnd = InvalidXLogRecPtr;
XLogRecPtr XactLastCommitEnd = InvalidXLogRecPtr;


//用于WALInsertLockAcquire/Release函数
static int  MyLockNo = 0;
static bool holdingAllLocks = false;

宏定义

typedef char* Pointer;//指针
typedef Pointer Page;//Page

#define XLOG_HEAP_INSERT   0x00


typedef uint64 XLogRecPtr;



#define PageGetLSN(page) \
    PageXLogRecPtrGet(((PageHeader) (page))->pd_lsn)
#define PageSetLSN(page, lsn) \
    PageXLogRecPtrSet(((PageHeader) (page))->pd_lsn, lsn)


//存储压缩会后的块镜像所需要的缓存空间大小
#define PGLZ_MAX_BLCKSZ PGLZ_MAX_OUTPUT(BLCKSZ)

//-------------------------------------------------- 相关

typedef int slock_t;

typedef uint32 pg_crc32c;

#define SpinLockInit(lock)  S_INIT_LOCK(lock)

#define SpinLockAcquire(lock) S_LOCK(lock)

#define SpinLockRelease(lock) S_UNLOCK(lock)

#define SpinLockFree(lock)  S_LOCK_FREE(lock)

#define XLogSegmentOffset(xlogptr, wal_segsz_bytes) \
    ((xlogptr) & ((wal_segsz_bytes) - 1))

#define LW_FLAG_HAS_WaiTERS         ((uint32) 1 << 30)
#define LW_FLAG_RELEASE_OK          ((uint32) 1 << 29)
#define LW_FLAG_LOCKED              ((uint32) 1 << 28)

#define LW_VAL_EXCLUSIVE            ((uint32) 1 << 24)
#define LW_VAL_SHARED               1

#define LW_LOCK_MASK                ((uint32) ((1 << 25)-1))

#define LW_SHARED_MASK              ((uint32) ((1 << 24)-1))

LWLock
lwlock.c外的代码不应直接操作这个结构的内容,但我们必须声明该结构体以便将LWLocks合并到其他数据结构中。


typedef struct LWLock
{
    uint16      tranche;        
    //独占/非独占locker的状态
    pg_atomic_uint32 state;     
    //正在等待的PGPROCs链表
    proclist_head waiters;      
#ifdef LOCK_DEBUG//用于DEBUG
    //waiters的数量
    pg_atomic_uint32 nwaiters;  
    //锁的最后独占者
    struct PGPROC *owner;       
#endif
} LWLock;

二、源码解读

heap_insert
主要实现逻辑是插入元组到堆中,其中存在对WAL(XLog)进行处理的部分.
参见PostgreSQL 源码解读(104)- WAL#1(Insert & WAL-heap_insert函数#1)

XLogInsert/XLogInsertRecord
插入一个具有指定的RMID和info字节的XLOG记录,该记录的主体是先前通过XLogReGISter*调用注册的数据和缓冲区引用。
参见PostgreSQL 源码解读(106)- WAL#3(Insert & WAL-heap_insert函数#3)

WALInsertLockXXX
包括WALInsertLockAcquireExclusive、WALInsertLockAcquire和WALInsertLockRelease等

//----------------------------------------------------------- WALInsertLockAcquireExclusive

static void
WALInsertLockAcquireExclusive(void)
{
    int         i;

    
    for (i = 0; i < NUM_XLOGINSERT_LOCKS - 1; i++)//NUM_XLOGINSERT_LOCKS
    {
        LWLockAcquire(&WALInsertLocks[i].l.lock, LW_EXCLUSIVE);
        LWLockUpdateVar(&WALInsertLocks[i].l.lock,
                        &WALInsertLocks[i].l.insertingAt,
                        PG_UINT64_MAX);
    }
    
    //在释放时,变量值重置为0
    LWLockAcquire(&WALInsertLocks[i].l.lock, LW_EXCLUSIVE);
    //设置标记
    holdingAllLocks = true;
}



bool
LWLockAcquire(LWLock *lock, LWLockMode mode)
{
    PGPROC     *proc = MyProc;//PGPROC数据结构
    bool        result = true;
    int         extraWaits = 0;
#ifdef LWLOCK_STATS
    lwlock_stats *lwstats;

    lwstats = get_lwlock_stats_entry(lock);//获得锁的统计入口
#endif
    //模式验证
    AssertArg(mode == LW_SHARED || mode == LW_EXCLUSIVE);

    PRINT_LWDEBUG("LWLockAcquire", lock, mode);

#ifdef LWLOCK_STATS
    
    if (mode == LW_EXCLUSIVE)
        lwstats->ex_acquire_count++;
    else
        lwstats->sh_acquire_count++;
#endif                          

    
    Assert(!(proc == NULL && IsUnderPostmaster));

    
    //确保我们有足够的地方存储锁
    if (num_held_lwlocks >= MAX_SIMUL_LWLOCKS)
        elog(ERROR, "too many LWLocks taken");

    
    HOLD_INTERRUPTS();

    
    for (;;)
    {
        bool        mustwait;

        
        mustwait = LWLockAttemptLock(lock, mode);

        if (!mustwait)
        {
            LOG_LWDEBUG("LWLockAcquire", lock, "immediately acquired lock");
            break;              
        }

        

        
        //添加到队列中
        LWLockQueueSelf(lock, mode);

        
        //在需要的时候,确保可以被唤醒
        mustwait = LWLockAttemptLock(lock, mode);

        
        //第二次尝试获取锁,需要取消排队
        if (!mustwait)
        {
            LOG_LWDEBUG("LWLockAcquire", lock, "acquired, undoing queue");

            LWLockDequeueSelf(lock);//出列
            break;
        }

        
        LOG_LWDEBUG("LWLockAcquire", lock, "waiting");

#ifdef LWLOCK_STATS
        lwstats->block_count++;//统计
#endif

        LWLockReportWaitStart(lock);//报告等待
        TRACE_POSTGRESQL_LWLOCK_WAIT_START(T_NAME(lock), mode);

        for (;;)
        {
            PGSemaphoreLock(proc->sem);
            if (!proc->lwWaiting)//如果不是LWLock等待,跳出循环
                break;
            extraWaits++;//额外的等待
        }

        
        //重试,允许LWLockRelease再次释放waiters
        pg_atomic_fetch_or_u32(&lock->state, LW_FLAG_RELEASE_OK);

#ifdef LOCK_DEBUG
        {
            
            //无需等待
            uint32      nwaiters PG_USED_FOR_ASSERTS_ONLY = pg_atomic_fetch_sub_u32(&lock->nwaiters, 1);

            Assert(nwaiters < MAX_BACKENDS);
        }
#endif

        TRACE_POSTGRESQL_LWLOCK_WAIT_DONE(T_NAME(lock), mode);
        LWLockReportWaitEnd();

        LOG_LWDEBUG("LWLockAcquire", lock, "awakened");
        //再次循环以再次请求锁
        
        result = false;
    }

    TRACE_POSTGRESQL_LWLOCK_ACQUIRE(T_NAME(lock), mode);
    //获取成功!
    
    //在该后台进程持有的锁链表中添加锁
    held_lwlocks[num_held_lwlocks].lock = lock;
    held_lwlocks[num_held_lwlocks++].mode = mode;

    
    while (extraWaits-- > 0)
        PGSemaphoreUnlock(proc->sem);

    return result;
}
 


static bool
LWLockAttemptLock(LWLock *lock, LWLockMode mode)
{
    uint32      old_state;

    AssertArg(mode == LW_EXCLUSIVE || mode == LW_SHARED);

    
    old_state = pg_atomic_read_u32(&lock->state);

    
    //循环指针我们确定是否可以获得锁位置
    while (true)
    {
        uint32      desired_state;
        bool        lock_free;

        desired_state = old_state;

        if (mode == LW_EXCLUSIVE)//独占
        {
            lock_free = (old_state & LW_LOCK_MASK) == 0;
            if (lock_free)
                desired_state += LW_VAL_EXCLUSIVE;
        }
        else
        {
            //非独占
            lock_free = (old_state & LW_VAL_EXCLUSIVE) == 0;
            if (lock_free)
                desired_state += LW_VAL_SHARED;
        }

        
        if (pg_atomic_compare_exchange_u32(&lock->state,
                                           &old_state, desired_state))
        {
            if (lock_free)
            {
                
                //很好,获取锁!
#ifdef LOCK_DEBUG
                if (mode == LW_EXCLUSIVE)
                    lock->owner = MyProc;
#endif
                return false;
            }
            else
                return true;    
        }
    }
    pg_unreachable();//正常来说,程序逻辑不应到这里
}
 

//----------------------------------------------------------- WALInsertLockAcquire

static void
WALInsertLockAcquire(void)
{
    bool        immed;

    
    static int  lockToTry = -1;

    if (lockToTry == -1)
        lockToTry = MyProc->pgprocno % NUM_XLOGINSERT_LOCKS;
    MyLockNo = lockToTry;

    
    immed = LWLockAcquire(&WALInsertLocks[MyLockNo].l.lock, LW_EXCLUSIVE);
    if (!immed)
    {
        
        lockToTry = (lockToTry + 1) % NUM_XLOGINSERT_LOCKS;
    }
}

//----------------------------------------------------------- WALInsertLockRelease

static void
WALInsertLockRelease(void)
{
    if (holdingAllLocks)//如持有所有锁
    {
        int         i;

        for (i = 0; i < NUM_XLOGINSERT_LOCKS; i++)
            LWLockReleaseClearVar(&WALInsertLocks[i].l.lock,
                                  &WALInsertLocks[i].l.insertingAt,
                                  0);

        holdingAllLocks = false;
    }
    else
    {
        LWLockReleaseClearVar(&WALInsertLocks[MyLockNo].l.lock,
                              &WALInsertLocks[MyLockNo].l.insertingAt,
                              0);
    }
}
 


void
LWLockReleaseClearVar(LWLock *lock, uint64 *valptr, uint64 val)
{
    LWLockWaitListLock(lock);

    
    *valptr = val;
    LWLockWaitListUnlock(lock);

    LWLockRelease(lock);
}



static void
LWLockWaitListLock(LWLock *lock)
{
    uint32      old_state;
#ifdef LWLOCK_STATS
    lwlock_stats *lwstats;
    uint32      delays = 0;

    lwstats = get_lwlock_stats_entry(lock);
#endif

    while (true)
    {
        
        //首次尝试直接获取锁
        old_state = pg_atomic_fetch_or_u32(&lock->state, LW_FLAG_LOCKED);
        if (!(old_state & LW_FLAG_LOCKED))
            break;              

        
        //然后在没有原子操作的情况下spin,直到锁释放
        {
            SpinDelayStatus delayStatus;//SpinDelay状态

            init_local_spin_delay(&delayStatus);//初始化

            while (old_state & LW_FLAG_LOCKED)//获取Lock
            {
                perfORM_spin_delay(&delayStatus);
                old_state = pg_atomic_read_u32(&lock->state);
            }
#ifdef LWLOCK_STATS
            delays += delayStatus.delays;
#endif
            finish_spin_delay(&delayStatus);
        }

        
    }

#ifdef LWLOCK_STATS
    lwstats->spin_delay_count += delays;//延迟计数
#endif
}



 
static void
LWLockWaitListUnlock(LWLock *lock)
{
    uint32      old_state PG_USED_FOR_ASSERTS_ONLY;

    old_state = pg_atomic_fetch_and_u32(&lock->state, ~LW_FLAG_LOCKED);

    Assert(old_state & LW_FLAG_LOCKED);
}



void
LWLockRelease(LWLock *lock)
{
    LWLockMode  mode;
    uint32      oldstate;
    bool        check_waiters;
    int         i;

    
    for (i = num_held_lwlocks; --i >= 0;)
        if (lock == held_lwlocks[i].lock)
            break;

    if (i < 0)
        elog(ERROR, "lock %s is not held", T_NAME(lock));

    mode = held_lwlocks[i].mode;//模式

    num_held_lwlocks--;//减一
    for (; i < num_held_lwlocks; i++)
        held_lwlocks[i] = held_lwlocks[i + 1];

    PRINT_LWDEBUG("LWLockRelease", lock, mode);

    
    if (mode == LW_EXCLUSIVE)
        oldstate = pg_atomic_sub_fetch_u32(&lock->state, LW_VAL_EXCLUSIVE);
    else
        oldstate = pg_atomic_sub_fetch_u32(&lock->state, LW_VAL_SHARED);

    
    //舍我其谁!
    Assert(!(oldstate & LW_VAL_EXCLUSIVE));


    
    if ((oldstate & (LW_FLAG_HAS_WAITERS | LW_FLAG_RELEASE_OK)) ==
        (LW_FLAG_HAS_WAITERS | LW_FLAG_RELEASE_OK) &&
        (oldstate & LW_LOCK_MASK) == 0)
        check_waiters = true;
    else
        check_waiters = false;

    
    if (check_waiters)
    {
        
        //XXX: 在commit前清除?
        LOG_LWDEBUG("LWLockRelease", lock, "releasing waiters");
        LWLockWakeup(lock);
    }

    TRACE_POSTGRESQL_LWLOCK_RELEASE(T_NAME(lock));

    
    RESUME_INTERRUPTS();
}

到此,相信大家对“PostgreSQL中插入数据时与WAL相关的处理逻辑是什么”有了更深的了解,不妨来实际操作一番吧!这里是编程网网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!

您可能感兴趣的文档:

--结束END--

本文标题: PostgreSQL中插入数据时与WAL相关的处理逻辑是什么

本文链接: https://www.lsjlt.com/news/64717.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

本篇文章演示代码以及资料文档资料下载

下载Word文档到电脑,方便收藏和打印~

下载Word文档
猜你喜欢
  • PostgreSQL中插入数据时与WAL相关的处理逻辑是什么
    本篇内容主要讲解“PostgreSQL中插入数据时与WAL相关的处理逻辑是什么”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“PostgreSQL中插入数据时与W...
    99+
    2022-10-18
  • PostgreSQL在执行逻辑优化中相关的数据结构是什么
    这篇文章主要介绍“PostgreSQL在执行逻辑优化中相关的数据结构是什么”,在日常操作中,相信很多人在PostgreSQL在执行逻辑优化中相关的数据结构是什么问题上存在疑惑,小编查阅了各式资料,整理出简单...
    99+
    2022-10-18
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作