广告
返回顶部
首页 > 资讯 > 后端开发 > Python >asyncio之Coroutines,T
  • 315
分享到

asyncio之Coroutines,T

asyncioCoroutines 2023-01-30 23:01:18 315人浏览 八月长安

Python 官方文档:入门教程 => 点击学习

摘要

Coroutines and Tasks属于High-level apis,也就是高级层的api。 本节概述用于协程和任务的高级异步api。 Coroutines Coroutines翻译过来意思是协程,使用async/await语

Coroutines and Tasks属于High-level apis,也就是高级层的api。

本节概述用于协程和任务的高级异步api。

Coroutines

Coroutines翻译过来意思是协程,
使用async/await语法声明的协程是编写asyncio应用程序的首选方法。

import asyncio


async def main():
    print("hello")
    await asyncio.sleep(1)
    print("world")


if __name__ == '__main__':
    # asyncio.run(main())  # 3.7的用法
    # 阻塞直到hello world()协程结束时返回
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

第一个异步函数是通过创建loop循环去调用,其他异步函数之间通过await进行调用。
像下面的一个例子

import asyncio
import time


async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(what)


async def main():
    print(f"started at {time.strftime('%X')}")

    await say_after(1, 'hello')
    await say_after(2, 'world')

    print(f"finished at {time.strftime('%X')}")


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    # 阻塞直到hello world()协程结束时返回
    loop.run_until_complete(main())
    loop.close()

或者我们可以通过asyncio.create_task()将协程say_after封装任务去调用就像下面这样。

async def main():
    task1 = asyncio.create_task(
        say_after(1, 'hello'))

    task2 = asyncio.create_task(
        say_after(2, 'world'))

    print(f"started at {time.strftime('%X')}")

    # 等待两个子任务完成
    await task1
    await task2
    print(f"finished at {time.strftime('%X')}")

如果报错async没有create_task,可以用
ensure_future代替
 

Awaitables

我们说,如果一个对象可以用在await表达式中,那么它就是Awaitables的对象。
可等待对象主要有三种类型:coroutines, Tasks, and Futures.

Coroutines

 前面的代码中演示了协程的运作方式,这里主要强调两点。

  • 协程函数:asyc def定义的函数;
  • 协程对象:通过调用协程函数返回的对象。

    Tasks

    任务对协程进一步封装,其中包含任务的各种状态。
    协程对象不能直接运行,在注册事件循环的时候,其实是run_until_complete方法将协程包装成为了一个任务(task)对象。
import asyncio


async def nested():
    await asyncio.sleep(2)
    print("等待2s")


async def main():
    # 将协程包装成任务含有状态
    # task = asyncio.create_task(nested())
    task = asyncio.ensure_future(nested())
    print(task)
    # "task" can now be used to cancel "nested()", or
    # can simply be awaited to wait until it is complete:
    await task
    print(task)
    print(task.done())


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(main())
    except KeyboardInterrupt as e:
        for task in asyncio.Task.all_tasks():
            print(task)
            task.cancel()
            print(task)
        loop.run_forever()  # restart loop
    finally:
        loop.close()

可以看到

<Task pending coro=<nested() running at /Users/chennan/pythonproject/asyncproject/asyncio-cn/1-2-1.py:9>>
等待2s
<Task finished coro=<nested() done, defined at /Users/chennan/Pythonproject/asyncproject/asyncio-cn/1-2-1.py:9> result=None>
True

创建task后,task在加入事件循环之前是pending状态然后调用nested函数等待2s之后打印task为finished状态。asyncio.ensure_future(coroutine) 和 loop.create_task(coroutine)都可以创建一个task,python3.7增加了asyncio.create_task(coro)。其中task是Future的一个子类

Future

future:代表将来执行或没有执行的任务的结果。它和task上没有本质的区别
通常不需要在应用程序级别代码中创建Future对象。
future对象有几个状态:

  • Pending
  • Running
  • Done
  • Cancelled

通过上面的代码可以知道创建future的时候,task为pending,事件循环调用执行的时候是running,调用完毕自然就是done于是调用task.done()打印了true。

如果在命令行中运行上述代码,ctrl+c后会发现
输出以下内容

<Task pending coro=<nested() running at 1-2-1.py:9>>
^C<Task pending coro=<main() running at 1-2-1.py:21> wait_for=<Task pending coro=<nested() running at 1-2-1.py:10> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x10d342978>()]> cb=[<TaskWakeupMethWrapper object at 0x10d342918>()]>>
<Task pending coro=<main() running at 1-2-1.py:21> wait_for=<Task pending coro=<nested() running at 1-2-1.py:10> wait_for=<Future cancelled> cb=[<TaskWakeupMethWrapper object at 0x10d342918>()]>>
<Task pending coro=<nested() running at 1-2-1.py:10> wait_for=<Future cancelled> cb=[<TaskWakeupMethWrapper object at 0x10d342918>()]>
<Task cancelling coro=<nested() running at 1-2-1.py:10> wait_for=<Future cancelled> cb=[<TaskWakeupMethWrapper object at 0x10d342918>()]>

因为我们调用了task.cancel() 所以可以看到此时的任务状态为取消状态。

并发的执行任务

通过使用await+asyncio.gather可以完成并发的操作。
asyncio.gather用法如下。
**asyncio.gather(*aws, loop=None, return_exceptions=False)
aws是一系列协程,协程都成功完成,就返回值一个结果列表。结果值的顺序与aws中添加协程的顺序相对应。
return_exceptions=False,其实就是如果有一个任务失败了,就直接抛出异常。如果等于True就把错误信息作为结果返回回来。
首先来一个正常情况不出错的例子:

import asyncio


async def factorial(name, number):
    f = 1
    for i in range(2, number + 1):
        print(f"Task {name}: Compute factorial({i})...")
        if number == 2:
            1 / 0
        await asyncio.sleep(1)
        f *= i
    print(f"Task {name}: factorial({number}) = {f}")


async def main():
    # Schedule three calls *concurrently*:
    res = await asyncio.gather(
        *[factorial("A", 2),
          factorial("B", 3),
          factorial("C", 4)]
        , return_exceptions=True)
    for item in res:
        print(item)


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(main())
    except KeyboardInterrupt as e:
        for task in asyncio.Task.all_tasks():
            print(task)
            task.cancel()
            print(task)
        loop.run_forever()  # restart loop
    finally:
        loop.close()

输入以下内容:

Task A: Compute factorial(2)...
Task B: Compute factorial(2)...
Task C: Compute factorial(2)...
Task B: Compute factorial(3)...
Task C: Compute factorial(3)...
Task B: factorial(3) = 6
Task C: Compute factorial(4)...
Task C: factorial(4) = 24
division by zero
None
None

可以发现async.gather最后会返回一系列的结果,如果出现了错误就把错误信息作为返回结果,这里我当数字为2时人为加了异常操作1/0,于是返回了结果division by zero,对于其他的任务因为没有返回值所以是None。这里return_exceptions=True来保证了如果其中一个任务出现异常,其他任务不会受其影响会执行到结束。

asyncio.wait

coroutine asyncio.wait(aws, *, loop=None, timeout=None, return_when=ALL_COMPLETED)

asyncio.wait和async.gather用法差不多只是async.wait接收的是个列表。
第三个参数和async.gather有点区别.

参数名 含义
FIRST_COMPLETED 任何一个future完成或取消时返回
FIRST_EXCEPTION 任何一个future出现错误将返回,如果出现异常等价于ALL_COMPLETED
ALL_COMPLETED 当所有任务完成或者被取消时返回结果,默认值。

Timeouts

通过使用asyncio.wait_for来完成一个超时函数回调操作,如果函数规定时间内未完成则报错。
**asyncio.wait_for(aw, timeout, *, loop=None)**
aw代表一个协程,timeout单位秒。

async def eternity():
    # Sleep for one hour
    await asyncio.sleep(3600)
    print('yay!')

async def main():
    # Wait for at most 1 second
    try:
        await asyncio.wait_for(eternity(), timeout=1.0)
    except asyncio.TimeoutError:
        print('timeout!')

asyncio.run(main())

# Expected output:
#
#     timeout!

1秒内eternity没有完成就报错了。
python3.7中发生更改:当aw由于超时而被取消时,不再显示异常而是等待aw被取消。
说到timeout的,如果仅仅是对一个代码块做timeout操作而不是等待某个协程此时推荐第三方模块async_timeout

async_timeout

安装

pip installa async_timeout

使用方法很简单如下

async with async_timeout.timeout(1.5) as cm:
    await inner()
print(cm.expired)

如果1.5s可以运行完打印true,否则打印false,表示超时。

asyncio.as_completed

**asyncio.as_completed(aws, *, loop=None, timeout=None)**
使用as_completed会返回一个可以迭代的future对象,同样可以获取协程的运行结果,使用方法如下:

async def main():
    coroutine1 = do_some_work(1)
    coroutine2 = do_some_work(2)
    coroutine3 = do_some_work(4)

    tasks = [
        asyncio.ensure_future(coroutine1),
        asyncio.ensure_future(coroutine2),
        asyncio.ensure_future(coroutine3)
    ]
    for task in asyncio.as_completed(tasks):
        result = await task
        print('Task ret: {}'.fORMat(result))

start = now()

loop = asyncio.get_event_loop()
done = loop.run_until_complete(main())
print('TIME: ', now() - start)

协程嵌套

使用async可以定义协程,协程用于耗时的io操作,我们也可以封装更多的io操作过程,这样就实现了嵌套的协程,即一个协程中await了另外一个协程,如此连接起来
官网实例:

图解:

 1、run_until_complete运行,会注册task(协程:print_sum)并开启事件循环 →

 2、print_sum协程中嵌套了子协程,此时print_sum协程暂停(类似委托生成器),转到子协程(协程:compute)中运行代码,期间子协程需sleep1秒钟,直接将结果反馈到event loop中,即将控制权转回调用方,而中间的print_sum暂停不操作 →

 3、1秒后,调用方将控制权给到子协程(调用方与子协程直接通信),子协程执行接下来的代码,直到再遇到wait(此实例没有)→

 4、 最后执行到return语句,子协程向上级协程(print_sum抛出异常:StopIteration),同时将return返回的值返回给上级协程(print_sum中的result接收值),print_sum继续执行暂时时后续的代码,直到遇到return语句 →

 5、向 event loop 抛出StopIteration异常,此时协程任务都已经执行完毕,事件循环执行完成(event loop :the loop is stopped),close事件循环。

调度线程

asyncio.run_coroutine_threadsafe(coro, loop)
等待其他线程返回一个concurrent.futures.Future对象,这是一个线程安全的方法。
这个函数应该从不同的OS线程调用,而不是从事件循环所在的线程调用。

def start_loop(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()

async def do_some_work(x):
    print('Waiting {}'.format(x))
    await asyncio.sleep(x)
    print('Done after {}s'.format(x))

def more_work(x):
    print('More work {}'.format(x))
    time.sleep(x)
    print('Finished more work {}'.format(x))

start = now()
new_loop = asyncio.new_event_loop()
t = Thread(target=start_loop, args=(new_loop,))
t.start()
print('TIME: {}'.format(time.time() - start))

asyncio.run_coroutine_threadsafe(do_some_work(6), new_loop)
asyncio.run_coroutine_threadsafe(do_some_work(4), new_loop)

上述的例子,主线程中创建一个new_loop,然后在另外的子线程中开启一个无限事件循环。主线程通过run_coroutine_threadsafe新注册协程对象。这样就能在子线程中进行事件循环的并发操作,同时主线程又不会被block。一共执行的时间大概在6s左右。
run_in_executor

import time
import asyncio


async def main():
    print(f'{time.ctime()} Hello')
    await asyncio.sleep(1.0)
    print(f'{time.ctime()} Goodbye')
    loop.stop()


def blocking():  # 1
    time.sleep(0.5)  # 2
    print(f'{time.ctime()} Hello from a thread!')


loop = asyncio.get_event_loop()
loop.create_task(main())
loop.run_in_executor(None, blocking)  # 3

loop.run_forever()
pending = asyncio.Task.all_tasks(loop=loop)  # 4
group = asyncio.gather(*pending)
loop.run_until_complete(group)
loop.close()

输出

Fri Jan  4 15:32:03 2019 Hello
Fri Jan  4 15:32:04 2019 Hello from a thread!
Fri Jan  4 15:32:04 2019 Goodbye

下面对上面的函数的序号进行讲解:

1 这个函数调用了常规的sleep(),这会阻塞主线程并阻止loop运行,我们不能使这个函数变成协程,更糟糕的是不能在主线程运行loop时调用它,解决办法是用一个executor来运行它;
2 注意一点,这个sleep运行时间比协程中的sleep运行时间要短,后文再讨论如果长的话会发生什么;
3 该方法帮助我们在事件loop里用额外的线程或进程执行函数,这个方法的返回值是一个Future对象,意味着可以用await来切换它;
4 挂起的task中不包含前面的阻塞函数,并且这个方法只返回task对象,绝对不会返回Future对象。

绑定回调

绑定回调,在task执行完毕的时候可以获取执行的结果,回调的最后一个参数是future对象,通过该对象可以获取协程返回值。如果回调需要多个参数,可以通过偏函数导入

import time
import asyncio
 
now = lambda : time.time()
 
async def do_some_work(x):
    print('Waiting: ', x)
    return 'Done after {}s'.format(x)
 
def callback(future):  # 回调函数
    print('Callback: ', future.result())
 
start = now()
 
coroutine = do_some_work(2)
loop = asyncio.get_event_loop()
get_future = asyncio.ensure_future(coroutine)
task.add_done_callback(callback)  # 添加回调函数
loop.run_until_complete(get_future)
 
print('TIME: ', now() - start)

回调函数需要多个参数时,future参数要放最后。执行完成,我们可以通过参数future获取协程的执行结果:future.result()

import functools   # functools.partial:偏函数,能将带参数的函数包装成一个新的函数
def callback(t, future): # 回调函数 ,future放最后
    print('Callback:', t, future.result())
 
task.add_done_callback(functools.partial(callback, 2)

asyncio.iscoroutine(obj)

Return True if obj is a coroutine object.
判断是否为coroutine对象,如果是返回True

asyncio.iscoroutinefunction(func)

判断是否为coroutine函数,如果是返回True

参考资料

https://docs.python.org/3.7/library/asyncio-task.html
Https://www.jianshu.com/p/b5e347b3a17c

微信公众号:python学习开发 加微信italocxa 入群。

原文地址:https://www.cnblogs.com/c-x-a/p/10220398.html

--结束END--

本文标题: asyncio之Coroutines,T

本文链接: https://www.lsjlt.com/news/181102.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

本篇文章演示代码以及资料文档资料下载

下载Word文档到电脑,方便收藏和打印~

下载Word文档
猜你喜欢
  • asyncio之Coroutines,T
    Coroutines and Tasks属于High-level APIs,也就是高级层的api。 本节概述用于协程和任务的高级异步api。 Coroutines Coroutines翻译过来意思是协程,使用async/await语...
    99+
    2023-01-30
    asyncio Coroutines
  • Python Asyncio中Coroutines,Tasks,Future可等待对象的关系及作用
    目录前记1.Asyncio的入口2.两种Coroutine调用方法的区别3.Task与Future3.1.Future3.2.Task4.总结前记 上一遍文章《Python中Asyn...
    99+
    2022-11-11
  • 异步编程之asyncio简单介绍
    引言: python由于GIL(全局锁)的存在,不能发挥多核的优势,其性能一直饱受诟病。然而在IO密集型的网络编程里,异步处理比同步处理能提升成百上千倍的效率,弥补了python性能方面的短板. asyncio是做什么的 异步网络操作 ...
    99+
    2023-01-30
    简单 asyncio
  • SQLServer之T-SQL增、删、改、查
    增-插入语句:insert into 表名 (列名,列名,列名)values (数据,数据,数据)如-添加3班的张三成绩为90到成绩表中:insert into 成绩表 (姓名,成绩,班级)values (...
    99+
    2022-10-18
  • Python Asyncio库之asyncio.task常用函数详解
    目录前记0.基础1.休眠--asyncio.sleep2.屏蔽取消--asyncio.shield3.超时--asyncio.wait_for4.简单的等待--wait5.迭代可等待...
    99+
    2023-03-01
    Python Asyncio asyncio.task Python Asyncio Python asyncio.task
  • asyncio异步编程之Task对象详解
    目录1.Task对象的作用2.如何创建task对象3.示例一(目前不推荐这种写法)4.示例25.示例3(算是以上示例2的简化版)总结1.Task对象的作用 可以将多个任务添加到事件循...
    99+
    2022-11-13
  • SQL 和 T-SQL 之间的区别
    在这篇文章中,我们将了解 SQL 和 T-SQL 之间的区别。SQL它是非过程语言。关系数据库使用 SQL。它代表结构化查询语言。它使用查询来查看和操作数据。使用DML和DDL操作-数据操作语言和数据定义语言。 它被认为是一种开源语言。它有...
    99+
    2023-10-22
  • Python Asyncio库之asyncio.task常用函数有哪些
    0.基础在《Python Asyncio调度原理》中介绍了Asyncio的两种调度基本单位,Handler和TimeHandler,他们只能被loop.call_xx函数调用,开发者从表面上不知道他们的存在,他们和loop.call_xx属...
    99+
    2023-05-14
    Python asyncio
  • Python Asyncio 库之同步原语常用函数详解
    目录前记0.基础1.Lock2.Event4.Condition5.Semaphore前记 Asyncio的同步原语可以简化我们编写资源竞争的代码和规避资源竞争导致的Bug的出现。...
    99+
    2023-03-01
    Python Asyncio 库同步原语 Python Asyncio 函数
  • Python Asyncio库之同步原语常用函数有哪些
    这篇文章主要讲解了“Python Asyncio库之同步原语常用函数有哪些”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“Python Asyncio库之同步原语常用函数有...
    99+
    2023-07-05
  • Python异步之在Asyncio中怎么运行阻塞任务
    今天小编给大家分享一下Python异步之在Asyncio中怎么运行阻塞任务的相关知识点,内容详细,逻辑清晰,相信大部分人都还太了解这方面的知识,所以分享这篇文章给大家参考一下,希望大家阅读完这篇文章后有所收获,下面我们一起来了解一下吧。正文...
    99+
    2023-07-05
  • JDK5.0垃圾收集优化之--Don't Pause
    原本想把题目更简单的定为--《不要停》的,但还是自己YY一下就算了。 Java开发Server最大的障碍,就是JDK1.4版之前的的串行垃圾收集机制会引起长时间的服务暂停,明白原理后,想想那些用JDK1.3写Server的先辈,不...
    99+
    2023-06-03
  • SAP MM 特殊库存之T库存初探
    SAP MM 特殊库存之T库存初探笔者所在的A项目里,销售业务广泛启用了POD功能。VL02N对交货单做了发货过账后物权并没有转移,而是将自有E库存转为一个叫做在途库存的特殊库存里。等到货物到了客户那里,客户确认实际收货数量后,才去系统上执...
    99+
    2023-06-05
  • Python 异步之在 Asyncio中如何运行阻塞任务详解
    目录正文1. 阻塞任务2. 如何运行阻塞任务3. 实例正文 阻塞任务是阻止当前线程继续进行的任务。 如果在 asyncio 程序中执行阻塞任务,它会停止整个事件循环,从而阻止任何其...
    99+
    2023-03-22
    Python 异步Asyncio阻塞运行 Python 异步
  • GaussDB T 1.0.2单机环境TPC-C之BenchmarkSQL性能测试
    原文链接: https://www.modb.pro/db/2251303=   摘要:GaussDB T 1.0.2单机环境TPC-C之BenchmarkSQL性能测试...
    99+
    2022-10-18
  • select count(?) from t之间的性能有什么差别
    这篇文章主要介绍“select count() from t之间的性能有什么差别”,在日常操作中,相信很多人在select count() from t之间的性能有什么差别问题上存在疑惑...
    99+
    2022-10-19
  • SQL学习之T-SQL编程之标识符、变量、批处理与运算符
    1、标识符:(1)定义:就像每个人都要有个名字一样,在SQL Server中,每一项对象也都要有一个作为标识用的名称,这就是标识符。例如数据库名称、数据表名称、字段名称等等,这些名称统称为标识符。(2)命名...
    99+
    2022-10-18
  • SQL Server使用T-SQL进阶之公用表表达式(CTE)
    在编写T-SQL代码时,往往需要临时存储某些结果集。前面我们已经广泛使用和介绍了两种临时存储结果集的方法:临时表和表变量。除此之外,还可以使用公用表表达式的方法。 公用表表达式(Co...
    99+
    2022-11-13
  • SQL(结构化查询语言)和T-SQL(Transact-SQL)之间的区别。
    SQLSQL,结构化查询语言是一种非过程语言,数据库引擎使用它来解释 SQL 查询以创建/修改/访问数据库T-SQLT-SQL(Transact-SQL)是 SQL 的过程扩展,由 SQL Server 使用。与Oracle的PL/SQL类...
    99+
    2023-10-22
  • Python字节单位转换之将字节转换为K M G T的示例
    这篇文章将为大家详细讲解有关Python字节单位转换之将字节转换为K M G T的示例,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。def bytes_to_human(n): &nb...
    99+
    2023-06-06
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作