asyncio并发
GIL: global interpreter lock
event loop
asyncio:
同步:
- 代码顺序运行。如果一行代码运行很慢怎么办?该行代码之后的代码就要等待该行代码完成。
- 解决方案:协程,允许同时处理多个任务。
异步编程:
- 调用函数的位置不是执行函数的位置,执行函数的位置可以拖后。
- 耗时的任务从主应用程序中分离出来并在后台运行,不是阻塞其他代码等待该耗时任务完成,其他代码必须是不依赖耗时任务的代码。
- 协程是一种方法,该方法可以被暂停当有耗时任务时,然后恢复运行当耗时任务完成后。
- 使用async和await语法使异步代码看上去像同步代码运行一样。
- asyncio库以异步方式(使用协程模型:单线程事件循环)执行协程。
- asyncio能够通过与多线程和多进程的操作处理其他类型操作。
- asyncio可以用于I/O型操作也能用于CPU型操作。
I/O型和CPU型操作对比:
区别:阻碍代码更快运行的限制因素是I/O操作还是CPU操作。
并发,并行和多任务
并发和并行如何调度、执行各任务、方法、例行?
并发:虽然并发表示多任务在同时发生(被同时执行),但不意味着这些任务可以同时运行。CPU可以只有一个核。系统使用抢断式多任务模式实现任务间切换。
并行:不仅要求多任务同时发生(被同时执行),且要求多任务同时运行。CPU至少要有两个核。
简单用推球过程做比喻:
- 代码操作比做一个被推的球,越耗时球越重。
- 推球人相当于进程/线程。
- 非并发有且只有一个赛道可以让你推球,其他球装在口袋里(这是多啦A梦的魔法口袋),一个球推到目的地后再掏出下一个球推。
- 并发是可以存在多个赛道,每个赛道同时放置一个球,但是只有你一个人推,你要在不同赛道推球,来回换赛道的过程可以当作开销。
- 并行是可以有多个赛道,每个赛道同时放置一个球和至多一个人来推球。
多任务模式
- 抢断式多任务:操作系统通过时间切片切换任务
- 协作式多任务:在编程中显示地标明任务切换点。
asyncio使用协作多任务模式实现并发,在代码执行到切换点时(在该点需要等待执行结果返回)我们显示地在代码中标注他,使本行代码等待结果时其他代码继续执行。当我们标注的代码任务完成后,再继续执行后续代码任务。 - 对比抢断模式,协作模式的优势:
- 低资源强度:无需进程/线程上下文切换
- 切换时机更好:无需依赖进程/线程切换调度算法,我们显示地在正确的位置进行切换。
多进程、多线程
- 进程:一个应用运行,拥有独立存储空间。
- 线程:是轻量化的进程(无独立存储空间),操作系统管理的最小结构单元。
一个进程至少有一个线程称为主线程。
thtreading库在编程语言中是通过多线程实现并发的普遍方式,但是在python中会有难度,受制于全局解释锁GIL,多线程只对I/O工作有效。
multiprocessing库是通过多进程实现并发工作,很适合高强度CPU代码,也受限于python全局解释锁。
全局解释锁GIL:
GIL阻止一个python进程中的线程执行另一个python的字节码指令。
每个python进程都有自己的GIL,因此多进程可以跑多个python的字节码指令
GIL的存在是因为CPython对内存的管理方式(引用计数)。引用计数用于追踪谁当前访问了python对象(字典、列表、整数等)。当对象的引用计数达到0时该对象从内存中删除。
CPython是线程不安全的,即多个线程修改共享变量时,该变量的结果是不确定的状态,取决于修改次序(又称为竞争状况)。
CPU型python代码
下面两端代码一个单线程顺序执行,一个是多线程并发并调用并行执行,但是时间是相似的,这是由于GIL的存在有且只允许一个线程运行python代码。另一个线程处于等待状态,所以对于CPU型python代码没有价值。
I/O型python代码
对于I/O操作全局解释锁会被解开,因此可以使用多线程实现并发工作(多线程交替运行,一次只能运行一个)。
为什么GIL只对I/O型代码释放不对CPU型代码释放
这取决于后台生成的系统调用,I/O情况下系统调用不在python运行时中,不会和python对象有直接交互,GIL被允许释放。GIL允许我们并发执行(交替运行)而不是并行运行,不像java和C++。
asyncio库中使用coroutine时对象,他被当作执行轻量线程,就像多线程运行一样。注意的是asyncio没有规避掉GIL。如果是CPU任务我们依然需要使用多进程并发执行,只不过这个过程由asyncio来完成。
阻塞和非阻塞模式
以套接字为例,套接字可以视为一个接受和发送字节流的邮箱
- 在阻塞模式下,套接字发送字节后,我们要等待服务器返回数据,这期间我们暂停应用或阻塞应用直到套接字接收到服务器的返回数据、有错误发生或者请求超时。
- 在非阻塞模式下,套接字发送字节后,应用就可以执行其他任务,操作系统会通知我们接收到了服务器的返回数据,到那时我们再处理返回数据。
- 不同操作系统有不同的通知系统:
- FreeBSD and MacOS: kqueue
- Linux: epoll
- Windows: IOCP(I/O completion port)
- asyncio利用通知系统实现并发,GIL要求每时每刻有且只有一条线程在运行。
非阻塞并发执行流程图:
- 代码执行到I/O操作。
- 将I/O操作提交给事件通知系统,让他给我们追踪I/O操作。
- 完成上述切换后,线程运行python其他代码或者添加更多的I/O操作让通知系统追踪。
- I/O完成后,唤醒需要等待I/O结果的任务,然后继续运行出现在I/O后的python其他代码。
如何追踪等待I/O结果的任务
事件循环用于追踪等待I/O返回的任务。
事件循环工作方式:
- 建立队列放置事件或消息。
- 永远循环遍历队列。
- 非空队列时一次只处理一个事件或消息。
asyncio事件循环工作方式:
事件循环保存着一个队列的任务,初始的事件循环会创建一个空队列。
任务被封装到协程中。
协程走到I/O代码处时协程会被暂停执行。
事件循环运行其他封装了任务的且没有等待I/O操作的协程。
同时让操作系统监听I/O操作是否完成,完成后操作系统唤醒本任务的协程以便事件循环运行该协程。
如下伪代码为例:
1
2
3
4
5
6
7def make_request():
cpu_bound_setup()
io_bound_web_request()
cpu_bound_postprocess()
task_one = make_request()
task_two = make_request()
task_three = make_request()单个线程并发运行3个任务, 3个任务中的cpu代码任何时刻至多有一个运行在线程中,而3个I/O代码可以同时运行。
asyncio基础
背景:ayncio实现单线程并发
协程:协程就像常规的函数但是协程可以停止自己的执行,当碰到花时间的操作时;当费时的操作完成后协程又可以唤醒自身继续执行剩余代码。该协程等待时其他协程就会并发执行自身代码。
创建和停止协程
创建协程
async定义一个协程,await暂停协程运行当遇到耗时代码时(碰到awai关键字协程就停止继续运行await之后的代码)1
2async def my_coroutine()->None
print('Hello world!')
上述函数没有运行耗时代码,当该协程放入时间循环中时会立刻被执行,不会被停止执行。
对比协程函数和普通函数:
调用普通函数我们得到了期望的被执行的结果值,而调用协程时我们得到的是没有执行代码的协程对象。
当直接调用协程时,协程不会被执行(因为没有放入事件循环中);定义的协程需要在事件循环中被显式的运行。
- 简单的方法是可以使用asyncio.run创建一个新的事件循环分支并运行传入的协程。
- asyncio.run工作流程:
- 创建一个新的事件循环分支。
- 运行传入的协程直到运行完成返回结果。
- 主协程完成后asyncio.run清理还在运行的东西。
- 关闭事件循环。
- asyncio.run是我们创建的asyncio应用的主入口。
- asyncio.run只能执行一个协程,该协程需要再启动应用的其他协程,其他协程可以用并发工作模式。
暂停协程
asyncio的优点就是停止协程运行让事件循环运行其他任务。
- await关键字后跟要调用的协程,await后是要跟awaitable类型对象,而协程不一定是awaitable类型对象。
- await让跟在他后面的协程运行起来返回运行结果(等待的协程),而不像直接调用协程返回一个协程对象。
- await同时也会暂停包含它的协程运行,直到我们等待的协程运行完成和返回结果。
- 当我们等待的协程运行完毕,包含await的协程被唤醒继续执行。
看流程图asyncio的协程运行方式和正常顺序执行方式相同。
可以使用asyncio.sleep模拟网页请求、数据库查询等耗时代码。
上述对asyncio的使用也没有实现协程的并发执行,当等待hello_world()时add_one()没有运行,main()也没有运行。要实现这个想法需要引入tasks。
当前我们运行协程的工具只有await和asyncio.run(),这种方式只能写异步代码但是无法实现并发。
tasks
tasks是对协程的封装,它会安排协程运行在事件循环中。tasks封装的协程是非阻塞的,创建完tasks后一旦tasks运行起来协程会立即运行其他代码。而await是会阻塞协程运行的,包含它的协程得等await执行完成才能运行。
创建tasks
asyncio.create_task()
- create_task()会自动调度传入的协程运行,注:如果不强引创建的任务,python回收机制会清理弱引的任务。
- 输入:协程。
- 输出:task对象而不是协程对象,task对象之间可以并发执行。
- 得到task对象后再放到await表达式中,task对象就会等待,直到得到运行的返回结果。
- coroutine对象实现的是代码异步执行(阻塞式执行),而task对象不但可以异步执行而且是并发执行(非阻塞式执行)
await + task 对比 await + coroutine:
task被创建的类型不同于coroutine被创建的类型。
await task用于等待task运行结果,await coroutine是等待coroutine运行结果,两者都会阻塞包含await表达式的协程运行,区别是:
- create_task()就已经开始调度任务到事件循环中运行协程了,不需要await处发执行,而coroutine必须得在await coroutine处才能运行。
- 多个task之间是非阻塞并发执行以及task和await coroutine之间也是并发执行,await coroutine是阻塞式运行,所以是顺序执行。
RUN delay(3)代表delay函数中的print语句,他们是CPU型代码不会并发执行。
输出的结果
task运行时间过程怎么办
当发生网络变慢、服务器崩了无法请求时,我们不能无限制的等待这会导致应用被挂起。
取消任务
- done():判断task是否完成。
- cancel():取消任务,并抛出CancelledError异常,注:CancelledError只能在await语句中被抛出。
- 在并发位置(await语句)之前不会出发取消任务操作,只是在布置、安排、放置任务/协程点位。
设置超时
输出:
- asyncio.wait_for: 允许设置超时时间,不用再编写检测是否超时的代码,超时后自动取消任务。
- 输入:协程或任务对象,超时时间。
- 输出:可以使用await的协程。
- 当超时时wait_for抛出TimeoutError异常。
- cancelled():判断是否任务取消。
超时任务继续运行
某些情况可能即便超时我们也想让任务继续执行。
输出:
- asyncio.shield: 对任务的封装,阻止任务被取消执行。
- 输入:协程或任务。
- 输出:任务
task, coroutine, future, awaitable
future:是python对象,表示一个将来会得到其值但是现在还没得到其值的对象。
建立future对象时它不包含任何结果值(future状态是未完成)。
当以后计算出结果了再将结果设成future的值(future状态是已完成)。
- 之后可以从future里取值。
- Future()构造建立future对象,此时没有值。
- Future().done()判断future是否完成。
- Future().set_result()给future设置值。
- Future().set_exception()给future设置异常。
- Future().resutlt()获取值。
- future对象可以用于await表达,输出结果。
- 无引用的asyncio.create_task()对象会被python垃圾回收机制回收,会引发错误,要强引用task=asyncio.create_task()才不会被回收。
- value = await future会暂停main协程,等待future设置完值才能继续运行。
- main()和make_request()是同步非阻塞执行,set_future_value()同make_request()和main()是异步并发执行。
- future表示了一个一段时间获取不到值的对象。
- task认为是一个coroutine和future的结合体。创建task就是创建一个空值的future同时运行着coroutine,当coroutine完成就会产生结果或者运行错误,这两个结果就可以设置成future的值也就是task的值。
- future和coroutine都是可等待对象awaiable,都可以用于await表达式中。
测量协程运行时间
封装一个await表达的计时器,不用总写一堆同样的计时代码。
输出: