iis服务器助手广告广告
返回顶部
首页 > 资讯 > 后端开发 > Python >详解Python的Twisted框架中reactor事件管理器的用法
  • 244
分享到

详解Python的Twisted框架中reactor事件管理器的用法

管理器详解框架 2022-06-04 19:06:42 244人浏览 独家记忆

Python 官方文档:入门教程 => 点击学习

摘要

铺垫 在大量的实践中,似乎我们总是通过类似的方式来使用异步编程: 监听事件 事件发生执行对应的回调函数 回调完成(可能产生新的事件添加进监听队列) 回到1,监听事件 因此我们将这样的异步

铺垫
在大量的实践中,似乎我们总是通过类似的方式来使用异步编程

监听事件 事件发生执行对应的回调函数 回调完成(可能产生新的事件添加进监听队列) 回到1,监听事件

因此我们将这样的异步模式称为Reactor模式,例如在iOS开发中的Run Loop概念,实际上非常类似于Reactor loop,主线程的Run Loop监听屏幕UI事件,一旦发生UI事件则执行对应的事件处理代码,还可以通过GCD等方式产生事件至主线程执行。

查看图片

上图是boost对Reactor模式的描绘,Twisted的设计就是基于这样的Reactor模式,Twisted程序就是在等待事件、处理事件的过程中不断循环。


from twisted.internet import reactor
reactor.run()

reactor是Twisted程序中的单例对象。

reactor
reactor是事件管理器,用于注册、注销事件,运行事件循环,当事件发生时调用回调函数处理。关于reactor有下面几个结论:

Twisted的reactor只有通过调用reactor.run()来启动。 reactor循环是在其开始的进程中运行,也就是运行在主进程中。 一旦启动,就会一直运行下去。reactor就会在程序的控制下(或者具体在一个启动它的线程的控制下)。 reactor循环并不会消耗任何CPU的资源。 并不需要显式的创建reactor,只需要引入就OK了。

最后一条需要解释清楚。在Twisted中,reactor是Singleton(也就是单例模式),即在一个程序中只能有一个reactor,并且只要你引入它就相应地创建一个。上面引入的方式这是twisted默认使用的方法,当然了,twisted还有其它可以引入reactor的方法。例如,可以使用twisted.internet.pollreactor中的系统调用来poll来代替select方法。

若使用其它的reactor,需要在引入twisted.internet.reactor前安装它。下面是安装pollreactor的方法:


from twisted.internet import pollreactor
pollreactor.install()

如果你没有安装其它特殊的reactor而引入了twisted.internet.reactor,那么Twisted会根据操作系统安装默认的reactor。正因为如此,习惯性做法不要在最顶层的模块内引入reactor以避免安装默认reactor,而是在你要使用reactor的区域内安装。
下面是使用 pollreactor重写上上面的程序:


from twited.internet import pollreactor
pollreactor.install()
from twisted.internet import reactor
reactor.run()

那么reactor是如何实现单例的?来看一下from twisted.internet import reactor做了哪些事情就并明白了。

下面是twisted/internet/reactor.py的部分代码:


# twisted/internet/reactor.py
import sys
del sys.modules['twisted.internet.reactor']
from twisted.internet import default
default.install()

注:python中所有加载到内存的模块都放在sys.modules,它是一个全局字典。当import一个模块时首先会在这个列表中查找是否已经加载了此模块,如果加载了则只是将模块的名字加入到正在调用import的模块的命名空间中。如果没有加载则从sys.path目录中按照模块名称查找模块文件,找到后将模块载入内存,并加入到sys.modules中,并将名称导入到当前的命名空间中。

假如我们是第一次运行from twisted.internet import reactor,因为sys.modules中还没有twisted.internet.reactor,所以会运行reactory.py中的代码,安装默认的reactor。之后,如果导入的话,因为sys.modules中已存在该模块,所以会直接将sys.modules中的twisted.internet.reactor导入到当前命名空间。

default中的install:


# twisted/internet/default.py
def _getInstallFunction(platfORM):
  """
  Return a function to install the reactor most suited for the given platform.

  @param platform: The platform for which to select a reactor.
  @type platform: L{twisted.Python.runtime.Platform}

  @return: A zero-argument callable which will install the selected
    reactor.
  """
  try:
    if platform.islinux():
      try:
        from twisted.internet.epollreactor import install
      except ImportError:
        from twisted.internet.pollreactor import install
    elif platform.getType() == 'posix' and not platform.isMacOSX():
      from twisted.internet.pollreactor import install
    else:
      from twisted.internet.selectreactor import install
  except ImportError:
    from twisted.internet.selectreactor import install
  return install


install = _getInstallFunction(platform)

很明显,default中会根据平台获取相应的install。Linux下会首先使用epollreactor,如果内核还不支持,就只能使用pollreactor。Mac平台使用pollreactor,windows使用selectreactor。每种install的实现差不多,这里我们抽取selectreactor中的install来看看。


# twisted/internet/selectreactor.py:
def install():
  """Configure the twisted mainloop to be run using the select() reactor.
  """
  # 单例
  reactor = SelectReactor()
  from twisted.internet.main import installReactor
  installReactor(reactor)

# twisted/internet/main.py:
def installReactor(reactor):
  """
  Install reactor C{reactor}.

  @param reactor: An object that provides one or more IReactor* interfaces.
  """
  # this stuff should be common to all reactors.
  import twisted.internet
  import sys
  if 'twisted.internet.reactor' in sys.modules:
    raise error.ReactorAlreadyInstalledError("reactor already installed")
  twisted.internet.reactor = reactor
  sys.modules['twisted.internet.reactor'] = reactor

在installReactor中,向sys.modules添加twisted.internet.reactor键,值就是再install中创建的单例reactor。以后要使用reactor,就会导入这个单例了。


SelectReactor
# twisted/internet/selectreactor.py
@implementer(IReactorFDSet)
class SelectReactor(posixbase.PosixReactorBase, _extraBase)

implementer表示SelectReactor实现了IReactorFDSet接口的方法,这里用到了zope.interface,它是python中的接口实现,有兴趣的同学可以去看下。

IReactorFDSet接口主要对描述符的获取、添加、删除等操作的方法。这些方法看名字就能知道意思,所以我就没有加注释。


# twisted/internet/interfaces.py
class IReactorFDSet(Interface):

  def addReader(reader):

  def addWriter(writer):

  def removeReader(reader):

  def removeWriter(writer):

  def removeAll():

  def getReaders():

  def getWriters():
reactor.listentcp()

示例中的reactor.listenTCP()注册了一个监听事件,它是父类PosixReactorBase中方法。


# twisted/internet/posixbase.py
@implementer(IReactorTCP, IReactorUDP, IReactorMulticast)
class PosixReactorBase(_SignalReactorMixin, _DisconnectSelectableMixin,
            ReactorBase):

  def listenTCP(self, port, factory, backlog=50, interface=''):
    p = tcp.Port(port, factory, backlog, interface, self)
    p.startListening()
    return p

# twisted/internet/tcp.py
@implementer(interfaces.IListeningPort)
class Port(base.BasePort, _SocketCloser):
  def __init__(self, port, factory, backlog=50, interface='', reactor=None):
    """Initialize with a numeric port to listen on.
    """
    base.BasePort.__init__(self, reactor=reactor)
    self.port = port
    self.factory = factory
    self.backlog = backlog
    if abstract.isIPv6Address(interface):
      self.addressFamily = socket.AF_INET6
      self._addressType = address.IPv6Address
    self.interface = interface
  ...

  def startListening(self):
    """Create and bind my socket, and begin listening on it.
     创建并绑定套接字,开始监听。

    This is called on unserialization, and must be called after creating a
    server to begin listening on the specified port.
    """
    if self._preexistingSocket is None:
      # Create a new socket and make it listen
      try:
        # 创建套接字
        skt = self.createInternetSocket()
        if self.addressFamily == socket.AF_INET6:
          addr = _resolveIPv6(self.interface, self.port)
        else:
          addr = (self.interface, self.port)
        # 绑定
        skt.bind(addr)
      except socket.error as le:
        raise CannotListenError(self.interface, self.port, le)
      # 监听
      skt.listen(self.backlog)
    else:
      # Re-use the externally specified socket
      skt = self._preexistingSocket
      self._preexistingSocket = None
      # Avoid shutting it down at the end.
      self._shouldShutdown = False

    # Make sure that if we listened on port 0, we update that to
    # reflect what the OS actually assigned us.
    self._realPortNumber = skt.getsockname()[1]

    log.msg("%s starting on %s" % (
        self._getLogPrefix(self.factory), self._realPortNumber))

    # The order of the next 5 lines is kind of bizarre. If no one
    # can explain it, perhaps we should re-arrange them.
    self.factory.doStart()
    self.connected = True
    self.socket = skt
    self.fileno = self.socket.fileno
    self.numberAccepts = 100

    # startReading调用reactor的addReader方法将Port加入读集合
    self.startReading()

整个逻辑很简单,和正常的server端一样,创建套接字、绑定、监听。不同的是将套接字的描述符添加到了reactor的读集合。那么假如有了client连接过来的话,reactor会监控到,然后触发事件处理程序。

reacotr.run()事件主循环


# twisted/internet/posixbase.py
@implementer(IReactorTCP, IReactorUDP, IReactorMulticast)
class PosixReactorBase(_SignalReactorMixin, _DisconnectSelectableMixin,
            ReactorBase)

# twisted/internet/base.py
class _SignalReactorMixin(object):

  def startRunning(self, installSignalHandlers=True):
    """
    PosixReactorBase的父类_SignalReactorMixin和ReactorBase都有该函数,但是
    _SignalReactorMixin在前,安装mro顺序的话,会先调用_SignalReactorMixin中的。
    """
    self._installSignalHandlers = installSignalHandlers
    ReactorBase.startRunning(self)

  def run(self, installSignalHandlers=True):
    self.startRunning(installSignalHandlers=installSignalHandlers)
    self.mainLoop()

  def mainLoop(self):
    while self._started:
      try:
        while self._started:
          # Advance simulation time in delayed event
          # processors.
          self.runUntilCurrent()
          t2 = self.timeout()
          t = self.running and t2
          # doIteration是关键,select,poll,epool实现各有不同
          self.doIteration(t)
      except:
        log.msg("Unexpected error in main loop.")
        log.err()
      else:
        log.msg('Main loop terminated.')

mianLoop就是最终的主循环了,在循环中,调用doIteration方法监控读写描述符的集合,一旦发现有描述符准备好读写,就会调用相应的事件处理程序。


# twisted/internet/selectreactor.py
@implementer(IReactorFDSet)
class SelectReactor(posixbase.PosixReactorBase, _extraBase):

  def __init__(self):
    """
    Initialize file descriptor tracking dictionaries and the base class.
    """
    self._reads = set()
    self._writes = set()
    posixbase.PosixReactorBase.__init__(self)

  def doSelect(self, timeout):
    """
    Run one iteration of the I/O monitor loop.

    This will run all selectables who had input or output readiness
    waiting for them.
    """
    try:
      # 调用select方法监控读写集合,返回准备好读写的描述符
      r, w, ignored = _select(self._reads,
                  self._writes,
                  [], timeout)
    except ValueError:
      # Possibly a file descriptor has Gone negative?
      self._preenDescriptors()
      return
    except TypeError:
      # Something *totally* invalid (object w/o fileno, non-integral
      # result) was passed
      log.err()
      self._preenDescriptors()
      return
    except (select.error, socket.error, IOError) as se:
      # select(2) encountered an error, perhaps while calling the fileno()
      # method of a socket. (Python 2.6 socket.error is an IOError
      # subclass, but on Python 2.5 and earlier it is not.)
      if se.args[0] in (0, 2):
        # windows does this if it got an empty list
        if (not self._reads) and (not self._writes):
          return
        else:
          raise
      elif se.args[0] == EINTR:
        return
      elif se.args[0] == EBADF:
        self._preenDescriptors()
        return
      else:
        # OK, I really don't know what's going on. Blow up.
        raise

    _drdw = self._doReadOrWrite
    _logrun = log.callWithLogger
    for selectables, method, fdset in ((r, "doRead", self._reads),
                      (w,"doWrite", self._writes)):
      for selectable in selectables:
        # if this was disconnected in another thread, kill it.
        # ^^^^ --- what the !@#*? serious! -exarkun
        if selectable not in fdset:
          continue
        # This for pausing input when we're not ready for more.

        # 调用_doReadOrWrite方法
        _logrun(selectable, _drdw, selectable, method)

  doIteration = doSelect

  def _doReadOrWrite(self, selectable, method):
    try:
      # 调用method,doRead或者是doWrite,
      # 这里的selectable可能是我们监听的tcp.Port
      why = getattr(selectable, method)()
    except:
      why = sys.exc_info()[1]
      log.err()
    if why:
      self._disconnectSelectable(selectable, why, method=="doRead")

那么假如客户端有连接请求了,就会调用读集合中tcp.Port的doRead方法。


# twisted/internet/tcp.py

@implementer(interfaces.IListeningPort)
class Port(base.BasePort, _SocketCloser):

  def doRead(self):
    """Called when my socket is ready for reading.
    当套接字准备好读的时候调用

    This accepts a connection and calls self.protocol() to handle the
    wire-level protocol.
    """
    try:
      if platformType == "posix":
        numAccepts = self.numberAccepts
      else:
        numAccepts = 1
      for i in range(numAccepts):
        if self.disconnecting:
          return
        try:
          # 调用accept
          skt, addr = self.socket.accept()
        except socket.error as e:
          if e.args[0] in (EWOULDBLOCK, EAGAIN):
            self.numberAccepts = i
            break
          elif e.args[0] == EPERM:
            continue
          elif e.args[0] in (EMFILE, ENOBUFS, ENFILE, ENOMEM, ECONNABORTED):
            log.msg("Could not accept new connection (%s)" % (
              errorcode[e.args[0]],))
            break
          raise

        fdesc._setCloseOnExec(skt.fileno())
        protocol = self.factory.buildProtocol(self._buildAddr(addr))
        if protocol is None:
          skt.close()
          continue
        s = self.sessionno
        self.sessionno = s+1
        # transport初始化的过程中,会将自身假如到reactor的读集合中,那么当它准备
        # 好读的时候,就可以调用它的doRead方法读取客户端发过来的数据了
        transport = self.transport(skt, protocol, addr, self, s, self.reactor)
        protocol.makeConnection(transport)
      else:
        self.numberAccepts = self.numberAccepts+20
    except:
      log.deferr()

doRead方法中,调用accept产生了用于接收客户端数据的套接字,将套接字与transport绑定,然后把transport加入到reactor的读集合。当客户端有数据到来时,就会调用transport的doRead方法进行数据读取了。

Connection是Server(transport实例的类)的父类,它实现了doRead方法。


# twisted/internet/tcp.py
@implementer(interfaces.ITCPTransport, interfaces.ISystemHandle)
class Connection(_TLSConnectionMixin, abstract.FileDescriptor, _SocketCloser,
         _AbortingMixin):

  def doRead(self):
    try:
      # 接收数据
      data = self.socket.recv(self.bufferSize)
    except socket.error as se:
      if se.args[0] == EWOULDBLOCK:
        return
      else:
        return main.CONNECTION_LOST

    return self._dataReceived(data)

  def _dataReceived(self, data):
    if not data:
      return main.CONNECTION_DONE
    # 调用我们自定义protocol的dataReceived方法处理数据
    rval = self.protocol.dataReceived(data)
    if rval is not None:
      offender = self.protocol.dataReceived
      warningFormat = (
        'Returning a value other than None from %(fqpn)s is '
        'deprecated since %(version)s.')
      warningString = deprecate.getDeprecationWarningString(
        offender, versions.Version('Twisted', 11, 0, 0),
        format=warningFormat)
      deprecate.warnAboutFunction(offender, warningString)
    return rval

_dataReceived中调用了示例中我们自定义的EchoProtocol的dataReceived方法处理数据。

至此,一个简单的流程,从创建监听事件,到接收客户端数据就此结束了。

--结束END--

本文标题: 详解Python的Twisted框架中reactor事件管理器的用法

本文链接: https://www.lsjlt.com/news/14850.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

本篇文章演示代码以及资料文档资料下载

下载Word文档到电脑,方便收藏和打印~

下载Word文档
猜你喜欢
  • 详解Reactor中Context的用法
    目录一、使用介绍二、源码解读三、如何桥接现有的ThreadLocal系统四、总结在响应式编程中,多线程异步性成为天然的内在,多线程之间的切换也成为原生的,在处理一个数据流Flux/M...
    99+
    2023-02-19
    Reactor Context用法 Reactor Context
  • Python Django框架中表单的用法详解
    目录文件上传Form表单表单字段字段参数widget参数表单的验证表单模型文件上传例子模型表单AJAXDjango保证表单的正确显示需要添加CSRF(防止网站跨站请求伪造而默认开启的...
    99+
    2024-04-02
  • Python中的flask框架详解
    Flask是一个Python编写的Web 微框架,让我们可以使用Python语言快速实现一个网站或Web服务。本文参考自Flask官方文档,大部分代码引用自官方文档。 安装flask...
    99+
    2024-04-02
  • Spring框架对于Bean的管理详解
    目录什么是Bean管理Bean管理操作的两种方式基于注解的方式实现Bean管理和注入属性(常用)1.什么是注解2.Spring针对Bean管理中创建对象提供的注解3.用注解的方式创建...
    99+
    2024-04-02
  • Python中的Unittest框架的用法
    本篇内容主要讲解“Python中的Unittest框架的用法”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“Python中的Unittest框架的用法”吧!  Unittest  1.环境  Un...
    99+
    2023-06-02
  • Pytest框架conftest.py文件的使用详解
    目录conftest.py文件特点1、conftest.py文件介绍2、conftest.py的注意事项3、conftest.py的使用4、不同位置conftest.py文件的优先级...
    99+
    2024-04-02
  • Gin框架使用panic处理中间件问题详解
    目录背景实现总结背景 在 Gin 框架中,错误处理和 panic 处理是非常重要的功能。当处理 HTTP 请求时,可能会出现各种各样的错误,例如数据库连接错误、网络错误、权限问题等等...
    99+
    2023-05-15
    Gin panic处理中间件 Gin panic中间件 Gin处理中间件
  • 详解Python中的上下文管理器原理
    目录with语句上下文管理器原理自定义上下文管理器contextmanager 装饰器with语句 在我们日常使用场景中,经常会操作一些资源,比如文件对象、数据库连接、Socket连...
    99+
    2023-03-11
    Python上下文管理器原理 Python上下文管理器使用 Python上下文管理器
  • JavaScript中BOM,DOM和事件的用法详解
    目录BOM概念对象组成Window:窗口对象Location:地址栏对象History:历史记录对象DOM概念W3C DOM 标准被分为 3 个不同的部分核心DOM模型HTML DO...
    99+
    2024-04-02
  • 无UI 组件Headless框架逻辑原理用法示例详解
    目录概述精读总结概述 Headless 组件即无 UI 组件,框架仅提供逻辑,UI 交给业务实现。这样带来的好处是业务有极大的 UI 自定义空间,而对框架来说,只考虑逻辑可以让自己更...
    99+
    2022-11-13
    无UI组件Headless框架逻辑  Headless组件原理
  • 如何使用Python打包文件框架轻松管理你的项目?
    Python是一门非常强大的编程语言,因为它可以在不同的应用场景下使用,如机器学习、数据科学、Web应用程序等。因此,许多开发人员使用Python来开发各种项目。但是,如果你不使用任何工具来管理你的项目,可能会变得混乱无序。所以,今天我们将...
    99+
    2023-06-22
    打包 文件 框架
  • PHP 框架中的事件处理和消息传递机制
    非常抱歉,由于您没有提供文章标题,我无法为您生成一篇高质量的文章。请您提供文章标题,我将尽快为您生成一篇优质的文章。...
    99+
    2024-05-23
  • Winform开发框架中的通用附件管理模块是什么
    这篇文章将为大家详细讲解有关Winform开发框架中的通用附件管理模块是什么,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。在做Winform项目的时候,一直有一个梦想,就是希望把所有的组件模...
    99+
    2023-06-17
  • JavaScript事件的委托(代理)的用法示例详解
    目录简介示例:事件委托写法1:事件委托写法2:每个子元素都绑定事件示例:新增元素写法1:事件委托写法2:每个子元素都绑定事件简介 说明 本文用示例介绍JavaScript中的事件(E...
    99+
    2024-04-02
  • python中SQLAlchemy框架的用法是什么
    SQLAlchemy是一个Python的ORM(对象关系映射)工具,它提供了一种在Python中操作关系型数据库的方式,并且支持多种...
    99+
    2024-04-09
    python SQLAlchemy
  • Python 分布式框架在文件处理中的应用,你了解吗?
    在现代大数据时代,数据量已经不再是一个局限。当我们需要进行大规模数据处理时,单机的计算资源已经无法满足我们的需求。这时候,分布式系统就成为了我们的选择。Python 作为一门高效的编程语言,也提供了很多分布式框架,比如 PySpark、D...
    99+
    2023-10-14
    分布式 框架 文件
  • ECharts鼠标事件的处理方法详解
    事件是用户或浏览器自身执行的某种动作,如click、mouseover、页面加载完毕后触发load事件,都属于事件。 为了记录用户的操作和行为路径,需要完成鼠标事件处理和组件交互的行...
    99+
    2024-04-02
  • Node.js中Express框架的使用教程详解
    目录Express简介Express生成器1. 什么是Express生成器2. 安装Express生成器创建Express项目安装项目依赖运行Express项目Express目录结构...
    99+
    2024-04-02
  • 如何使用 Python 解析 Spring 框架中的配置文件?
    Spring 框架是一个广泛使用的 Java 应用程序框架,它的主要特点是将业务逻辑与底层代码分离。在 Spring 中,应用程序的配置文件扮演着至关重要的角色,它们描述了应用程序的各种设置和配置,如数据库连接、日志设置、Web 应用程序上...
    99+
    2023-09-04
    文件 关键字 spring
  • Spring框架中一个有用的小组件之Spring Retry组件详解
    1、概述 Spring Retry 是Spring框架中的一个组件, 它提供了自动重新调用失败操作的能力。这在错误可能是暂时发生的(如瞬时网络故障)的情况下很有帮助。 在本文中,我们...
    99+
    2024-04-02
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作