组团学

asyncio模块

阅读 (257181)

一、概述

  • asyncio模块

    是python3.4版本引入的标准库,直接内置了对异步IO的操作

  • 编程模式

    是一个消息循环,我们从asyncio模块中直接获取一个EventLoop的引用,然后把需要执行的协程扔到EventLoop中执行,就实现了异步IO

  • 说明

    到目前为止实现协程的不仅仅只有asyncio,tornado和gevent都实现了类似功能

  • 关键字的说明

    关键字 说明
    event_loop 消息循环,程序开启一个无限循环,把一些函数注册到事件循环上,当满足事件发生的时候,调用相应的协程函数
    coroutine 协程对象,指一个使用async关键字定义的函数,它的调用不会立即执行函数,而是会返回一个协程对象。协程对象需要注册到事件循环,由事件循环调用
    task 任务,一个协程对象就是一个原生可以挂起的函数,任务则是对协程进一步封装,其中包含了任务的各种状态
    future 代表将来执行或没有执行的任务的结果,它和task上没有本质上的区别
    async/await python3.5用于定义协程的关键字,async定义一个协程,await用于挂起阻塞的异步调用接口

二、asyncio基本使用

  • 定义一个协程

    import asyncio import time # 通过async关键字定义了一个协程,协程是不能直接运行的,需要将协程放到消息循环中 async def run(x): print("waiting:%d"%x) time.sleep(x) print("结束run") #得到一个协程对象 coroutine = run(2) # 创建一个消息循环 # 注意:真实是在asyncio模块中获取一个引用 loop = asyncio.get_event_loop() #将协程对象加入到消息循环 loop.run_until_complete(coroutine)
  • 创建一个任务

    import asyncio import time async def run(x): print("waiting:%d"%x) time.sleep(x) print("结束run") coroutine = run(2) #创建任务 task = asyncio.ensure_future(coroutine) loop = asyncio.get_event_loop() # 协程对象加入到消息循环中,协程对象不能直接运行的,在注册消息循环时run_until_complete方法将加入的协程对象包装成一个任务,task对象时Future类的子类对象,保存协程运行后的状态,用于未来获取协程的结果 # loop.run_until_complete(coroutine) # 将任务加入到消息循环 loop.run_until_complete(task)
  • 绑定回调

    回调:不需要手动调用,触发某种条件才会调用

    import time import asyncio async def run(url): print("开始向'%s'要数据……"%(url)) # 向百度要数据,网络IO asyncio.sleep(5) data = "'%s'的数据"%(url) print("给你数据") return data # 定义一个回调函数(不需要手动调用,触发某种条件才会调用) def call_back(future): print("call_back:", future.result()) coroutine = run("百度") # 创建一个任务对象 task = asyncio.ensure_future(coroutine) # 给任务添加回调,在任务结束后调用回调函数 task.add_done_callback(call_back) loop = asyncio.get_event_loop() loop.run_until_complete(task) print("-------main------") while 1: time.sleep(2)

    注意:asyncio.sleep(5)会报RuntimeWarning

截屏2020011317.22.14.png

  • 阻塞和await

    async可以定义协程,使用await可以针对耗时操作进行挂起,就与生成器的yield一样,函数交出控制权。协程遇到await,消息循环会挂起该协程,执行别的协程,直到其他协程也会挂起或者执行完毕,在进行下一次执行

    import time import asyncio async def run(url): print("开始向'%s'要数据……"%(url)) # 向百度要数据,网络IO await asyncio.sleep(5) data = "'%s'的数据"%(url) print("给你数据") return data # 定义一个回调函数(不需要手动调用,触发某种条件才会调用) def call_back(future): print("call_back:", future.result()) coroutine = run("百度") task = asyncio.ensure_future(coroutine) task.add_done_callback(call_back) loop = asyncio.get_event_loop() loop.run_until_complete(task) print("-------main------")

三、多任务

  • 同步

    同时请求"百度", “阿里”, “腾讯”, "新浪"四个网站,假设响应时长均为2秒

    import time def run(url): print("开始向'%s'要数据……"%(url)) # 向百度要数据,网络IO time.sleep(2) data = "'%s'的数据"%(url) return data if __name__ == "__main__": t1 = time.time() for url in ["百度", "阿里", "腾讯", "新浪"]: print(run(url)) t2 = time.time() print("总耗时:%.2f"%(t2-t1))
  • 异步

    同时请求"百度", “阿里”, “腾讯”, "新浪"四个网站,假设响应时长均为2秒

    import time import asyncio async def run(url): print("开始向'%s'要数据……"%(url)) await asyncio.sleep(2) data = "'%s'的数据"%(url) return data def call_back(future): print("call_back:", future.result()) if __name__ == "__main__": loop = asyncio.get_event_loop() tasks = [] t1 = time.time() for url in ["百度", "阿里", "腾讯", "新浪"]: coroutine = run(url) task = asyncio.ensure_future(coroutine) task.add_done_callback(call_back) tasks.append(task) #同时添加4个异步任务 loop.run_until_complete(asyncio.gather(*tasks)) t2 = time.time() print("总耗时:%.2f" % (t2 - t1))

四、协程嵌套

使用async可以定义协程,协程用于耗时的io操作,我们也可以封装更多的io操作过程,这样就实现了嵌套的协程,即一个协程中await了另外一个协程,如此连接起来

截屏2020011317_34_29.png

import time import asyncio async def run(url): print("开始向'%s'要数据……"%(url)) await asyncio.sleep(2) data = "'%s'的数据"%(url) return data def call_back(future): print("call_back:", future.result()) async def main(): tasks = [] for url in ["百度", "阿里", "腾讯", "新浪"]: coroutine = run(url) task = asyncio.ensure_future(coroutine) # task.add_done_callback(call_back) tasks.append(task) # #1、可以没有回调函数 # dones, pendings = await asyncio.wait(tasks) # #处理数据,类似回调,建议使用回调 # for t in dones: # print("数据:%s"%(t.result())) # #2、可以没有回调函数 # results = await asyncio.gather(*tasks) # # 处理数据,类似回调,建议使用回调 # for result in results: # print("数据:%s"%(result)) # 3、 # return await asyncio.wait(tasks) # 4、 # return await asyncio.gather(*tasks) # 5、 # for t in asyncio.as_completed(tasks): # await t # 6、 for t in asyncio.as_completed(tasks): #可以没有回调 result = await t print("数据:%s"%(result)) if __name__ == "__main__": loop = asyncio.get_event_loop() t1 = time.time() #1、 # loop.run_until_complete(main()) #2、 # loop.run_until_complete(main()) # # 3、 # dones, pendings = loop.run_until_complete(main()) # #处理数据,类似回调,建议使用回调 # for t in dones: # print("数据:%s"%(t.result())) # 4、 # results = loop.run_until_complete(main()) # for result in results: # print("数据:%s"%(result)) # 5、 # loop.run_until_complete(main()) # 6、 loop.run_until_complete(main()) t2 = time.time() print("总耗时:%.2f" % (t2 - t1))

整理协程嵌套

import time import asyncio async def run(url): print("开始向'%s'要数据……"%(url)) await asyncio.sleep(2) data = "'%s'的数据"%(url) return data def call_back(future): print("call_back:", future.result()) async def main(): tasks = [] for url in ["百度", "阿里", "腾讯", "新浪"]: coroutine = run(url) task = asyncio.ensure_future(coroutine) task.add_done_callback(call_back) tasks.append(task) await asyncio.wait(tasks) if __name__ == "__main__": loop = asyncio.get_event_loop() t1 = time.time() loop.run_until_complete(main()) t2 = time.time() print("总耗时:%.2f" % (t2 - t1))

五、消息循环在另一个线程中启动

很多时候,我们的事件循环用于注册协程,而有的协程需要动态的添加到事件循环中。一个简单的方式就是使用多线程。当前线程创建一个事件循环,然后在新建一个线程,在新线程中启动事件循环。当前线程不会被block

import asyncio import threading import time def run(url): print("开始向'%s'要数据……"%(url)) time.sleep(2) data = "'%s'的数据"%(url) print("结束请求") return data def start_loop(loop): # 启动消息循环 asyncio.set_event_loop(loop) loop.run_forever() if __name__ == "__main__": # 创建消息循环(类似死循环) # 注意:此时消息循环没有启动 loop = asyncio.get_event_loop() threading.Thread(target=start_loop, args=(loop,)).start() t1 = time.time() # 给消息循环添加任务 # loop.run_until_complete(create_tasks()) loop.call_soon_threadsafe(run, "百度") loop.call_soon_threadsafe(run, "腾讯") t2 = time.time() print("总耗时:%.2f" % (t2 - t1))

六、asyncio终极使用

使用到协程嵌套与消息循环在另一个线程中启动相关联

import asyncio import threading async def run(url): print("开始向'%s'要数据……"%(url)) await asyncio.sleep(2) data = "'%s'的数据"%(url) print("结束请求") return data def call_back(future): print("call_back:", future.result()) async def create_tasks(): tasks = [] for url in ["百度", "阿里", "腾讯", "新浪"]: coroutine = run(url) task = asyncio.ensure_future(coroutine) task.add_done_callback(call_back) tasks.append(task) await asyncio.wait(tasks) def start_loop(loop): # 启动消息循环 asyncio.set_event_loop(loop) loop.run_forever() if __name__ == "__main__": # 创建消息循环(类似死循环) # 注意:此时消息循环没有启动 loop = asyncio.get_event_loop() threading.Thread(target=start_loop, args=(loop,)).start() #给消息循环添加任务 asyncio.run_coroutine_threadsafe(create_tasks(), loop) # asyncio.run_coroutine_threadsafe(run("百度"), loop) # asyncio.run_coroutine_threadsafe(run("腾讯"), loop) # asyncio.run_coroutine_threadsafe(run("阿里"), loop) # asyncio.run_coroutine_threadsafe(run("新浪"), loop)

七、获取网页信息

import asyncio import threading async def run(url): print("开始加载'%s'页面……" % (url)) # 发起链接,耗时IO connet = asyncio.open_connection(url, 80) reader, writer = await connet # 链接成功 # 发起请求,耗时IO header = "GET / HTTP/1.0\r\nHost: %s\r\n\r\n"%(url) writer.write(header.encode("utf-8")) await writer.drain() #接收数据 with open(url + ".html", "wb") as fp: while True: line = await reader.readline() if line == b"\r\n": break else: fp.write(line) fp.flush() async def create_tasks(): tasks = [] for url in ["www.baidu.com", "www.zutuanxue.com", "www.sina.com.cn"]: coroutine = run(url) task = asyncio.ensure_future(coroutine) tasks.append(task) await asyncio.wait(tasks) def start_loop(loop): asyncio.set_event_loop(loop) loop.run_forever() if __name__ == "__main__": loop = asyncio.get_event_loop() threading.Thread(target=start_loop, args=(loop,)).start() asyncio.run_coroutine_threadsafe(create_tasks(), loop)
需要 登录 才可以提问哦