iis服务器助手广告广告
返回顶部
首页 > 资讯 > 后端开发 > Python >Netty分布式pipeline传播inbound事件源码分析
  • 853
分享到

Netty分布式pipeline传播inbound事件源码分析

2024-04-02 19:04:59 853人浏览 薄情痞子

Python 官方文档:入门教程 => 点击学习

摘要

目录传播inbound事件这里给大家看两种写法我们先以写法2为例, 将这种写法进行剖析我们跟进invokeChannelRead方法:我们跟到invokeChannelRead方法中

前一小结回顾:pipeline管道Handler删除

传播inbound事件

有关于inbound事件, 在概述中做过简单的介绍, 就是以自己为基准, 流向自己的事件, 比如最常见的channelRead事件, 就是对方发来数据流的所触发的事件, 己方要对这些数据进行处理, 这一小节, 以激活channelRead为例讲解有关inbound事件的处理流程

在业务代码中, 我们自己的handler往往会通过重写channelRead方法来处理对方发来的数据, 那么对方发来的数据是如何走到channelRead方法中了呢, 也是我们这一小节要剖析的内容

在业务代码中, 传递channelRead事件方式是通过fireChannelRead方法进行传播的

这里给大家看两种写法

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    //写法1:
    ctx.fireChannelRead(msg);
    //写法2
    ctx.pipeline().fireChannelRead(msg);
}

这里重写了channelRead方法, 并且方法体内继续通过fireChannelRead方法进行传播channelRead事件, 那么这两种写法有什么异同?

我们先以写法2为例, 将这种写法进行剖析

这里首先获取当前context的pipeline对象, 然后通过pipeline对象调用自身的fireChannelRead方法进行传播, 因为默认创建的DefaultChannelpipeline

我们跟到DefaultChannelpipeline的fireChannelRead方法中:

public final ChannelPipeline fireChannelRead(Object msg) {
    AbstractChannelHandlerContext.invokeChannelRead(head, msg);
    return this;
}

这里首先调用的是AbstractChannelHandlerContext类的静态方法invokeChannelRead, 参数传入head节点和事件的消息

我们跟进invokeChannelRead方法:

static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
    final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeChannelRead(m);
    } else {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                next.invokeChannelRead(m);
            }
        });
    }
}

这里的Object m m通常就是我们传入的msg, 而next, 目前是head节点, 然后再判断是否为当前eventLoop线程, 如果不是则将方法包装成task交给eventLoop线程处理

我们跟到invokeChannelRead方法中

private void invokeChannelRead(Object msg) {
    if (invokeHandler()) {
        try {
            ((ChannelInboundHandler) handler()).channelRead(this, msg);
        } catch (Throwable t) {
            notifyHandlerException(t);
        }
    } else {
        fireChannelRead(msg);
    }
}

首先通过invokeHandler()判断当前handler是否已添加, 如果添加, 则执行当前handler的chanelRead方法, 其实这里我们基本上就明白了, 通过fireChannelRead方法传递事件的过程中, 其实就是找到相关handler执行其channelRead方法, 由于我们在这里的handler就是head节点, 所以我们跟到HeadContext的channelRead方法中:

HeadContext的channelRead方法:

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    //向下传递channelRead事件
    ctx.fireChannelRead(msg);
}

在这里我们看到, 这里通过fireChannelRead方法继续往下传递channelRead事件, 而这种调用方式, 就是我们刚才分析用户代码的第一种调用方式:

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    //写法1:
    ctx.fireChannelRead(msg);
    //写法2
    ctx.pipeline().fireChannelRead(msg);
}

这里直接通过context对象调用fireChannelRead方法, 那么和使用pipeline调用有什么区别的, 我会回到HeadConetx的channelRead方法, 我们来剖析ctx.fireChannelRead(msg)这句, 大家就会对这个问题有答案了, 跟到ctx的fireChannelRead方法中, 这里会走到AbstractChannelHandlerContext类中的fireChannelRead方法中

跟到AbstractChannelHandlerContext类中的fireChannelRead方法:

public ChannelHandlerContext fireChannelRead(final Object msg) {
    invokeChannelRead(findContextInbound(), msg);
    return this;
}

这里我们看到, invokeChannelRead方法中传入了一个findContextInbound()参数, 而这findContextInbound方法其实就是找到当前Context的下一个节点

跟到findContextInbound方法

private AbstractChannelHandlerContext findContextInbound() {
    AbstractChannelHandlerContext ctx = this;
    do {
        ctx = ctx.next;
    } while (!ctx.inbound);
    return ctx;
}

这里的逻辑也比较简单, 是通过一个doWhile循环, 找到当前handlerContext的下一个节点, 这里要注意循环的终止条件, while (!ctx.inbound)表示下一个context标志的事件不是inbound的事件, 则循环继续往下找, 言外之意就是要找到下一个标注inbound事件的节点

有关事件的标注, 之前的小节已经剖析过了, 如果是用户定义的handler, 是通过handler继承的接口而定的, 如果tail或者head, 那么是在初始化的时候就已经定义好, 这里不再赘述

回到AbstractChannelHandlerContext类的fireChannelRead方法中:

public ChannelHandlerContext fireChannelRead(final Object msg) {
    invokeChannelRead(findContextInbound(), msg);
    return this;
}

找到下一个节点后, 继续调用invokeChannelRead方法, 传入下一个和消息对象:

static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
    final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
    //第一次执行next其实就是head
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeChannelRead(m);
    } else {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                next.invokeChannelRead(m);
            }
        });
    }
}

这里的逻辑我们又不陌生了, 因为我们传入的是当前context的下一个节点, 所以这里会调用下一个节点invokeChannelRead方法, 因我们刚才剖析的是head节点, 所以下一个节点有可能是用户添加的handler的包装类HandlerConext的对象

这里我们跟进invokeChannelRead方法中去:

private void invokeChannelRead(Object msg) {
    if (invokeHandler()) {
        try { 
            ((ChannelInboundHandler) handler()).channelRead(this, msg);
        } catch (Throwable t) {
            //发生异常的时候在这里捕获异常
            notifyHandlerException(t);
        }
    } else {
        fireChannelRead(msg);
    }
}

又是我们熟悉的逻辑, 调用了自身handler的channelRead方法, 如果是用户自定义的handler, 则会走到用户定义的channelRead()方法中去, 所以这里就解释了为什么通过传递channelRead事件, 最终会走到用户重写的channelRead方法中去

同样, 也解释了该小节最初提到过的两种写法的区别:

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    //写法1:
    ctx.fireChannelRead(msg);
    //写法2
    ctx.pipeline().fireChannelRead(msg);
}

写法1是通过当前节点往下传播事件

写法2是通过头节点往下传递事件

所以, 在handler中如果如果要在channelRead方法中传递channelRead事件, 一定要采用写法2的方式向下传递, 或者交给其父类处理, 如果采用1的写法则每次事件传输到这里都会继续从head节点传输, 从而陷入死循环或者发生异常

这里有一点需要注意, 如果用户代码中channelRead方法, 如果没有显示的调用ctx.fireChannelRead(msg)那么事件则不会再往下传播, 则事件会在这里终止, 所以如果我们写业务代码的时候要考虑有关资源释放的相关操作

如果ctx.fireChannelRead(msg)则事件会继续往下传播, 如果每一个handler都向下传播事件, 当然, 根据我们之前的分析channelRead事件只会在标识为inbound事件的HandlerConetext中传播, 传播到最后, 则最终会调用到tail节点的channelRead方法

我们跟到tailConext的channelRead方法中

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    onUnhandledInboundMessage(msg);
}

我们跟进到onUnhandledInboundMessage方法中:

protected void onUnhandledInboundMessage(Object msg) {
    try {
        logger.debug(
                "Discarded inbound message {} that reached at the tail of the pipeline. " +
                        "Please check your pipeline configuration.", msg);
    } finally {
        //释放资源
        ReferenceCountUtil.release(msg);
    }
}

这里做了释放资源的相关的操作

至此, channelRead事件传输相关罗辑剖析完整, 其实对于inbound事件的传输流程都会遵循这一逻辑, 小伙伴们可以自行剖析其他inbound事件的传输流程,更多关于Netty分布式pipeline传播inbound事件的资料请关注编程网其它相关文章!

--结束END--

本文标题: Netty分布式pipeline传播inbound事件源码分析

本文链接: https://www.lsjlt.com/news/144106.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

本篇文章演示代码以及资料文档资料下载

下载Word文档到电脑,方便收藏和打印~

下载Word文档
猜你喜欢
  • Netty分布式pipeline传播inbound事件源码分析
    目录传播inbound事件这里给大家看两种写法我们先以写法2为例, 将这种写法进行剖析我们跟进invokeChannelRead方法:我们跟到invokeChannelRead方法中...
    99+
    2024-04-02
  • netty pipeline中的inbound和outbound事件传播分析
    目录传播inbound事件两种写法DefaultChannelPipeline.fireChannelRead(msg)AbstractChannelHandlerContext.i...
    99+
    2023-05-17
    netty pipeline事件传播 inbound outbound
  • Netty分布式pipeline管道传播outBound事件源码解析
    目录outbound事件传输流程这里我们同样给出两种写法跟到其write方法中:跟到findContextOutbound中回到write方法:继续跟invokeWrite0我们跟到...
    99+
    2024-04-02
  • Netty分布式pipeline管道异常传播事件源码解析
    目录传播异常事件简单的异常处理的场景我们跟到invokeChannelRead这个方法我还是通过两种写法来进行剖析跟进invokeExceptionCaught方法跟到invokeE...
    99+
    2024-04-02
  • Netty分布式pipeline管道传播outBound事件的示例分析
    这篇文章将为大家详细讲解有关Netty分布式pipeline管道传播outBound事件的示例分析,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。outbound事件传输流程在我们业务代码中, 有可能使用w...
    99+
    2023-06-29
  • Netty分布式pipeline管道异常传播事件的示例分析
    这篇文章主要介绍了Netty分布式pipeline管道异常传播事件的示例分析,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让小编带着大家一起了解一下。传播异常事件简单的异常处理的场景@Overridep...
    99+
    2023-06-29
  • Netty分布式pipeline管道传播事件的逻辑总结分析
    目录问题分析首先看第一个问题我们看一下ChannelInitializer这个类的继承关系回到callHandlerCallbackLater方法中紧接着我们看第二个问题章节总结我们...
    99+
    2024-04-02
  • Netty分布式源码分析监听读事件
    前文传送门:NioSocketChannel注册到selector 我们回到AbstractUnsafe的register0()方法: private void register0(...
    99+
    2024-04-02
  • 分布式Netty源码分析
    这篇文章主要介绍了分布式Netty源码分析的相关知识,内容详细易懂,操作简单快捷,具有一定借鉴价值,相信大家阅读完这篇分布式Netty源码分析文章都会有所收获,下面我们一起来看看吧。服务器端demo看下一个简单的Netty服务器端的例子pu...
    99+
    2023-06-29
  • 分布式Netty源码EventLoopGroup分析
    这篇文章主要介绍“分布式Netty源码EventLoopGroup分析”的相关知识,小编通过实际案例向大家展示操作过程,操作方法简单快捷,实用性强,希望这篇“分布式Netty源码EventLoopGroup分析”文章能帮助大家解决问题。Ev...
    99+
    2023-06-29
  • 分布式Netty源码分析概览
    目录服务器端demoEventLoopGroup介绍功能1:先来看看注册Channel功能2:执行一些Runnable任务ChannelPipeline介绍bind过程sync介绍误...
    99+
    2024-04-02
  • netty中pipeline异常事件分析
    目录异常处理的场景AbstractChannelHandlerContext.invokeChannelReadAbstractChannelHandlerContext.notif...
    99+
    2023-05-17
    netty pipeline异常事件 netty pipeline
  • 分布式Netty源码分析EventLoopGroup及介绍
    目录EventLoopGroup介绍功能1:先来看看注册Channel功能2:执行一些Runnable任务EventLoop介绍NioEventLoop介绍EpollEventLoo...
    99+
    2024-04-02
  • Netty分布式ByteBuf的分类方式源码解析
    目录ByteBuf根据不同的分类方式 会有不同的分类结果1.Pooled和Unpooled2.基于直接内存的ByteBuf和基于堆内存的ByteBuf3.safe和unsafe上一小...
    99+
    2024-04-02
  • Netty分布式客户端处理接入事件handle源码解析
    目录处理接入事件创建handle我们看其RecvByteBufAllocator接口跟进newHandle()方法中继续回到read()方法我们跟进reset中前文传送门 :客户端接...
    99+
    2024-04-02
  • Netty分布式NioEventLoop优化selector源码解析
    目录优化selectorselector的创建过程代码剖析这里一步创建了这个优化后的数据结构最后返回优化后的selector优化selector selector的创建过程...
    99+
    2024-04-02
  • Netty分布式监听读事件的示例分析
    小编给大家分享一下Netty分布式监听读事件的示例分析,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!我们回到AbstractUnsafe的register0()方...
    99+
    2023-06-29
  • Netty分布式NioEventLoop任务队列执行源码分析
    目录执行任务队列跟进runAllTasks方法:我们跟进fetchFromScheduledTaskQueue()方法回到runAllTasks(long timeoutNanos)...
    99+
    2024-04-02
  • Netty分布式ByteBuf缓冲区分配器源码解析
    目录缓冲区分配器以其中的分配ByteBuf的方法为例, 对其做简单的介绍跟到directBuffer()方法中我们回到缓冲区分配的方法然后通过validate方法进行参数验...
    99+
    2024-04-02
  • Netty分布式行解码器逻辑源码解析
    目录行解码器LineBasedFrameDecoder首先看其参数我们跟到重载的decode方法中我们看findEndOfLine(buffer)方法前文传送门:Netty分布式固定...
    99+
    2024-04-02
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作