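Python Async IO Study Notes
Async is not "faster"; it is "smarter waiting".
Table of Contents
- Why do we need async IO?
- Core concepts
- asyncio basics
- Tasks and concurrency
- Async context managers and iterators
- asyncio synchronization primitives
- Queue: the producer-consumer pattern
- Async network IO: aiohttp
- Async file IO: aiofiles
- Async databases
- Calling async code from sync code
- Advanced event loop topics
- Common pitfalls and best practices
- Performance comparison experiment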
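1. Why Do We Need Async IO?
Comparing three concurrency models
| Model | How it works | Best for | Drawbacks |
|---|---|---|---|
| Multiprocessing | Separate processes run in parallel on multiple CPU cores | CPU-bound work (computation, compression) | High memory use; process startup and inter-process communication are costly |
| Multithreading | The OS schedules threads preemptively | IO-bound work (the GIL is released while a thread waits on IO, but Python code never runs in parallel) | GIL, race conditions, deadlocks |
| Async IO | A single-threaded event loop switches tasks at each await | IO-bound work (network, files) | Unsuitable for CPU-bound work; one blocking call stalls every task |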
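The core problem: wasted waiting
Synchronous mode (one cook finishes one dish before starting the next):
```text
[send request A] → [wait 200ms] → [process result] → [send request B] → [wait 200ms] → ...
```
Total time: 200ms × N
Asynchronous mode (one cook tends several dishes at once):
```text
[send request A] ↘
[send request B] → [waiting...] → [A returns, process] → [B returns, process]
[send request C] ↗
```
Total time: ≈ 200ms (roughly the longest single request)
Conclusion: the essence of async IO is doing other work while waiting for IO. It is not true parallel computation.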
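2. Core Concepts
Coroutines
A function defined with async def returns a coroutine object when called. Its body does not run immediately:
```python
import asyncio

async def say_hello():
    print("Hello")

# Calling it does NOT run the body; it only creates a coroutine object
coro = say_hello()
print(type(coro))  # <class 'coroutine'>
coro.close()  # discard it explicitly to avoid the "never awaited" RuntimeWarning

# It must be run by an event loop
asyncio.run(say_hello())  # only now is "Hello" printed
```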
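You can drive a coroutine by hand to see what the event loop does with one. The sketch below is illustrative only; real code should always use asyncio.run() or await. The names add and drive are made up. send(None) runs the body until it finishes or suspends. A finished coroutine reports its return value through StopIteration.

```python
async def add(a, b):
    # No await inside, so the body runs to completion on the first send()
    return a + b

def drive(coro):
    """Illustrative driver: only handles coroutines that never suspend."""
    try:
        coro.send(None)  # start (or resume) the coroutine
    except StopIteration as stop:
        return stop.value  # the coroutine's return value
    raise RuntimeError("coroutine suspended; a real event loop would resume it later")

result = drive(add(1, 2))
print(result)  # 3
```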
Event Loop
The "control center" of an async program. It is responsible for:
- running coroutines
- monitoring IO events
- scheduling callbacks
```text
┌─────────────────────────────────────┐
│             Event Loop              │
│                                     │
│  Task A ──► await IO ──► suspended  │
│  Task B ◄── IO done  ◄── resumed    │
│  Task C ──► await IO ──► suspended  │
│  Task A ◄── IO done  ◄── resumed    │
└─────────────────────────────────────┘
```
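The await keyword
await can only be used inside an async def function. It:
- suspends the current coroutine and hands control back to the event loop
- waits for the awaited object to complete
- resumes execution once it completes and returns its result
```python
import asyncio

async def fetch():
    # Suspends here; the event loop runs other tasks meanwhile.
    # asyncio.sleep(..., result=...) stands in for a real async IO operation
    result = await asyncio.sleep(1, result="data")
    # Execution continues here once the "IO" completes
    return result

print(asyncio.run(fetch()))  # data
```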
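Awaitables
There are three main kinds of awaitable objects:
```python
import asyncio

async def coro():
    return 42

async def main():
    # 1. Coroutine
    print(await coro())
    # 2. Task (created with asyncio.create_task)
    task = asyncio.create_task(coro())
    print(await task)
    # 3. Future (low-level API, rarely used directly)
    loop = asyncio.get_running_loop()
    future = loop.create_future()
    # Something must set the result, otherwise "await future" waits forever
    loop.call_soon(future.set_result, "done")
    print(await future)

asyncio.run(main())
```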
3. asyncio Basics
asyncio.run(): the program entry point
```python
import asyncio

async def main():
    print("Start")
    await asyncio.sleep(1)  # simulated IO wait
    print("End")

# Standard entry point since Python 3.7
asyncio.run(main())
```
asyncio.sleep()
The async counterpart of time.sleep(). Waiting does not block the event loop:
```python
import asyncio
import time

async def task(name, delay):
    print(f"{name} started")
    await asyncio.sleep(delay)  # ✅ async wait
    print(f"{name} finished")

async def main():
    start = time.time()
    # Sequential execution (total = 1 + 2 = 3s)
    await task("A", 1)
    await task("B", 2)
    print(f"Elapsed: {time.time() - start:.1f}s")

asyncio.run(main())
# Elapsed: 3.0s
```
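The difference between asyncio.sleep() and time.sleep() only shows up when several coroutines run at once. The sketch below starts three copies of each concurrently with asyncio.gather() (covered in the next section); polite, rude and timed are illustrative names. The asyncio.sleep version should take about one delay. The time.sleep version blocks the loop and takes about three.

```python
import asyncio
import time

async def polite(delay):
    await asyncio.sleep(delay)  # yields to the event loop while waiting

async def rude(delay):
    time.sleep(delay)  # blocks the whole event loop; nothing else can run

async def timed(worker, delay, n):
    start = time.perf_counter()
    # Start n copies concurrently and wait for all of them
    await asyncio.gather(*[worker(delay) for _ in range(n)])
    return time.perf_counter() - start

polite_time = asyncio.run(timed(polite, 0.2, 3))
rude_time = asyncio.run(timed(rude, 0.2, 3))
print(f"asyncio.sleep: {polite_time:.1f}s, time.sleep: {rude_time:.1f}s")
```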
4. Tasks and Concurrency
asyncio.create_task(): true concurrency
```python
import asyncio
import time

async def task(name, delay):
    print(f"{name} started")
    await asyncio.sleep(delay)
    print(f"{name} finished")
    return f"result of {name}"

async def main():
    start = time.time()
    # Creating a Task schedules it right away; it starts running
    # as soon as main() yields control to the event loop
    task_a = asyncio.create_task(task("A", 1))
    task_b = asyncio.create_task(task("B", 2))
    task_c = asyncio.create_task(task("C", 1))
    # Wait for all Tasks to finish
    result_a = await task_a
    result_b = await task_b
    result_c = await task_c
    print(result_a, result_b, result_c)
    print(f"Elapsed: {time.time() - start:.1f}s")
    # Elapsed: 2.0s (concurrent; total = the longest single task)

asyncio.run(main())
```
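asyncio.gather(): batch concurrency
The most common way to run coroutines concurrently. It waits for all of them to finish and collects their results:
```python
import asyncio

async def fetch(url):
    await asyncio.sleep(1)  # simulated network request
    return f"data from {url}"

async def main():
    urls = ["url1", "url2", "url3", "url4", "url5"]
    # Run all requests concurrently
    results = await asyncio.gather(
        *[fetch(url) for url in urls]
    )
    for r in results:
        print(r)

asyncio.run(main())
# All 5 requests are in flight at once; total time ≈ 1s
```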
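gather() returns results in the order the awaitables were passed in, not the order they finish. A small sketch with made-up names and delays:

```python
import asyncio

async def fetch(name, delay):
    await asyncio.sleep(delay)
    return name

async def main():
    # Different delays, so the coroutines finish in the order b, c, a
    return await asyncio.gather(
        fetch("a", 0.3),
        fetch("b", 0.1),
        fetch("c", 0.2),
    )

results = asyncio.run(main())
print(results)  # ['a', 'b', 'c']
```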
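Exception handling in gather:
```python
import asyncio

async def ok(n):
    await asyncio.sleep(0.1)
    return n

async def fail():
    raise ValueError("boom")

async def main():
    # Default: the first exception propagates to whoever awaits gather();
    # the other awaitables are NOT cancelled and keep running
    try:
        await asyncio.gather(ok(1), fail(), ok(3))
    except ValueError as e:
        print(f"Caught: {e}")

    # return_exceptions=True: exceptions are returned as results; nothing is interrupted
    results = await asyncio.gather(ok(1), fail(), ok(3), return_exceptions=True)
    for r in results:
        if isinstance(r, Exception):
            print(f"Error: {r}")
        else:
            print(f"OK: {r}")

asyncio.run(main())
```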
asyncio.wait(): finer-grained control
```python
import asyncio

async def task(n):
    await asyncio.sleep(n)
    return n

async def main():
    tasks = [asyncio.create_task(task(i)) for i in [3, 1, 2]]
    # Return as soon as the first task finishes
    done, pending = await asyncio.wait(
        tasks,
        return_when=asyncio.FIRST_COMPLETED,
    )
    for t in done:
        print(f"Finished: {t.result()}")
    # Cancel the remaining tasks
    for t in pending:
        t.cancel()

asyncio.run(main())
```
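| return_when value | Meaning |
|---|---|
| ALL_COMPLETED (default) | Return when all tasks have finished |
| FIRST_COMPLETED | Return as soon as any task finishes |
| FIRST_EXCEPTION | Return as soon as any task raises (same as ALL_COMPLETED if none raises) |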
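Sometimes you want to handle every result as soon as it is ready, not just the first one. asyncio.as_completed() is a convenient alternative to calling wait() in a loop. A minimal sketch reusing the task() idea above, with delays scaled down to tenths of a second:

```python
import asyncio

async def task(n):
    await asyncio.sleep(n / 10)
    return n

async def main():
    tasks = [asyncio.create_task(task(i)) for i in [3, 1, 2]]
    finished = []
    # as_completed yields awaitables in completion order
    for next_done in asyncio.as_completed(tasks):
        finished.append(await next_done)
    return finished

order = asyncio.run(main())
print(order)  # [1, 2, 3]
```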
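asyncio.timeout(): timeout control (Python 3.11+)
```python
import asyncio

async def long_running_task():
    await asyncio.sleep(10)  # stand-in for slow work

async def main():
    try:
        async with asyncio.timeout(5.0):
            await long_running_task()
    except TimeoutError:  # since 3.11, asyncio.TimeoutError is an alias of the built-in
        print("Timed out!")

# Python 3.10 and earlier: use wait_for
async def main_legacy():
    try:
        result = await asyncio.wait_for(
            long_running_task(),
            timeout=5.0,
        )
    except asyncio.TimeoutError:
        print("Timed out!")
```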
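Both approaches do more than stop waiting. When the deadline passes, the inner coroutine is cancelled, so its finally blocks run. A small sketch using wait_for, which works on all versions covered here; slow_job and cleanup_ran are illustrative names:

```python
import asyncio

cleanup_ran = False

async def slow_job():
    global cleanup_ran
    try:
        await asyncio.sleep(10)
    finally:
        # Runs because wait_for cancels the job when the timeout expires
        cleanup_ran = True

async def main():
    try:
        await asyncio.wait_for(slow_job(), timeout=0.1)
    except asyncio.TimeoutError:
        return "timed out"

outcome = asyncio.run(main())
print(outcome, cleanup_ran)
```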
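5. Async Context Managers and Iterators
Async context managers
Implement __aenter__ and __aexit__:
```python
class AsyncDBConnection:
    async def __aenter__(self):
        print("Connecting to the database")
        # create_connection() is a placeholder for your driver's async connect call
        self.conn = await create_connection()
        return self.conn

    async def __aexit__(self, exc_type, exc, tb):
        print("Closing the connection")
        await self.conn.close()
        # Returning None (falsy) lets exceptions raised in the block propagate

async def main():
    async with AsyncDBConnection() as conn:
        await conn.execute("SELECT 1")
```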
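For simple cases, contextlib.asynccontextmanager turns an async generator into an async context manager, so you don't have to write the class by hand. The sketch below uses a hypothetical fake_connection that only records what happens, with asyncio.sleep(0) standing in for real connect and close calls:

```python
import asyncio
from contextlib import asynccontextmanager

events = []

@asynccontextmanager
async def fake_connection(name):
    # Code before yield plays the role of __aenter__
    await asyncio.sleep(0)  # stand-in for an async connect call
    events.append(f"open {name}")
    try:
        yield name  # the value bound by "async with ... as"
    finally:
        # Code after yield plays the role of __aexit__; runs even on errors
        await asyncio.sleep(0)  # stand-in for an async close call
        events.append(f"close {name}")

async def main():
    async with fake_connection("db") as conn:
        events.append(f"use {conn}")

asyncio.run(main())
print(events)  # ['open db', 'use db', 'close db']
```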
Async iterators
Implement __aiter__ and __anext__:
```python
import asyncio

class AsyncRange:
    def __init__(self, n):
        self.n = n
        self.i = 0

    def __aiter__(self):
        return self

    async def __anext__(self):
        if self.i >= self.n:
            raise StopAsyncIteration
        await asyncio.sleep(0.1)  # simulated async operation
        val = self.i
        self.i += 1
        return val

async def main():
    async for num in AsyncRange(5):
        print(num)

asyncio.run(main())
```
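Async generators
A more concise way to write the same thing:
```python
import asyncio

async def async_range(n):
    for i in range(n):
        await asyncio.sleep(0.1)
        yield i

async def main():
    async for num in async_range(5):
        print(num)
    # Async comprehensions also work (only inside an async function)
    results = [i async for i in async_range(5)]
    print(results)

asyncio.run(main())
```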
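6. asyncio Synchronization Primitives
When several coroutines share a resource, you need synchronization (analogous to locks in multithreaded code). Coroutines only switch at await points, so a race can only happen when an await sits between reading and updating shared state.
Lock: mutual exclusion
```python
import asyncio

shared_resource = 0

async def worker(name, lock):
    global shared_resource
    async with lock:  # acquire the lock; other coroutines wait here
        val = shared_resource
        await asyncio.sleep(0.1)  # simulated work
        shared_resource = val + 1
        print(f"{name}: {shared_resource}")

async def main():
    # Create the lock inside the running loop (required before Python 3.10)
    lock = asyncio.Lock()
    await asyncio.gather(*[worker(f"W{i}", lock) for i in range(5)])

asyncio.run(main())
```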
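The lock is needed even with a single thread, because any await between a read and a write lets other coroutines run in between. Removing the lock from the example above shows the lost updates; unsafe_worker and counter are illustrative names:

```python
import asyncio

counter = 0

async def unsafe_worker():
    global counter
    val = counter             # read
    await asyncio.sleep(0.1)  # yield: every other worker reads the same value meanwhile
    counter = val + 1         # write back a stale value

async def main():
    await asyncio.gather(*[unsafe_worker() for _ in range(5)])

asyncio.run(main())
print(counter)  # 1, not 5: four increments were lost
```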
Event: event notification
```python
import asyncio

async def waiter(event, name):
    print(f"{name} waiting for the event...")
    await event.wait()
    print(f"{name} received the event, continuing")

async def setter(event):
    await asyncio.sleep(2)
    print("Setting the event!")
    event.set()  # wakes up every waiter at once

async def main():
    event = asyncio.Event()
    await asyncio.gather(
        waiter(event, "W1"),
        waiter(event, "W2"),
        setter(event),
    )

asyncio.run(main())
```
Semaphore: limiting concurrency
The one you will use most! It caps how many coroutines can be inside a block at the same time:
```python
import asyncio
import aiohttp

async def fetch(session, url, semaphore):
    async with semaphore:  # at most 10 run this block at once
        async with session.get(url) as resp:
            return await resp.text()

async def main():
    semaphore = asyncio.Semaphore(10)  # max concurrency of 10
    urls = [f"https://example.com/{i}" for i in range(100)]
    async with aiohttp.ClientSession() as session:
        tasks = [fetch(session, url, semaphore) for url in urls]
        results = await asyncio.gather(*tasks)
```
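You can check that a Semaphore really caps concurrency without hitting the network by counting how many coroutines are inside the guarded block at once. In this sketch asyncio.sleep() stands in for the HTTP request, and active and peak are illustrative counters:

```python
import asyncio

active = 0
peak = 0

async def limited_job(sem):
    global active, peak
    async with sem:
        active += 1
        peak = max(peak, active)   # record the highest concurrency observed
        await asyncio.sleep(0.05)  # stand-in for a network request
        active -= 1

async def main():
    sem = asyncio.Semaphore(3)
    await asyncio.gather(*[limited_job(sem) for _ in range(10)])

asyncio.run(main())
print(peak)  # 3
```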
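7. Queue: The Producer-Consumer Pattern
```python
import asyncio

async def producer(queue, n, num_consumers):
    for i in range(n):
        await asyncio.sleep(0.5)  # simulated production time
        await queue.put(i)  # waits while the queue is full
        print(f"Produced: {i}, queue size: {queue.qsize()}")
    # One end-of-stream sentinel per consumer; with a single None,
    # the second consumer would wait forever and gather() would never return
    for _ in range(num_consumers):
        await queue.put(None)

async def consumer(queue, name):
    while True:
        item = await queue.get()
        if item is None:
            queue.task_done()
            break
        await asyncio.sleep(1)  # simulated consumption time
        print(f"[{name}] consumed: {item}")
        queue.task_done()

async def main():
    queue = asyncio.Queue(maxsize=5)  # capacity of 5
    await asyncio.gather(
        producer(queue, 10, num_consumers=2),
        consumer(queue, "Consumer A"),
        consumer(queue, "Consumer B"),
    )

asyncio.run(main())
```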
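An alternative to sentinel values, used in the asyncio Queue documentation example, works like this: fill the queue, wait on queue.join() until every item has been marked with task_done(), then cancel the workers. A sketch with illustrative names and a short simulated processing time:

```python
import asyncio

async def worker(queue, processed):
    while True:
        item = await queue.get()
        try:
            await asyncio.sleep(0.01)  # simulated processing
            processed.append(item)
        finally:
            queue.task_done()  # always mark the item done, even if processing fails

async def main():
    queue = asyncio.Queue()
    processed = []
    for i in range(10):
        queue.put_nowait(i)

    workers = [asyncio.create_task(worker(queue, processed)) for _ in range(3)]
    await queue.join()  # returns once task_done() was called for every item

    # The workers loop forever, so cancel them once the queue is drained
    for w in workers:
        w.cancel()
    await asyncio.gather(*workers, return_exceptions=True)
    return processed

processed = asyncio.run(main())
print(sorted(processed))
```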
8. Async Network IO: aiohttp
```bash
pip install aiohttp
```
Basic GET request
```python
import asyncio
import aiohttp

async def fetch(session, url):
    async with session.get(url) as response:
        return await response.json()

async def main():
    async with aiohttp.ClientSession() as session:
        data = await fetch(session, "https://api.github.com/users/octocat")
        print(data["name"])

asyncio.run(main())
```
Fetching multiple URLs concurrently
```python
import asyncio
import time
import aiohttp

async def fetch(session, url):
    try:
        async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as resp:
            return {"url": url, "status": resp.status, "data": await resp.text()}
    except Exception as e:
        # Return the error instead of raising, so one failure doesn't abort gather()
        return {"url": url, "error": str(e)}

async def main():
    urls = [
        "https://httpbin.org/delay/1",
        "https://httpbin.org/delay/2",
        "https://httpbin.org/delay/1",
    ]
    start = time.time()
    async with aiohttp.ClientSession() as session:
        results = await asyncio.gather(*[fetch(session, url) for url in urls])
    print(f"Elapsed: {time.time() - start:.1f}s")  # ≈ 2s (the slowest request), not 4s (the sum)
    for r in results:
        print(r["url"], r.get("status", r.get("error")))

asyncio.run(main())
```
POST request
```python
import asyncio
import aiohttp

async def post_data(session, url, payload):
    async with session.post(url, json=payload) as resp:  # json= serializes the payload
        return await resp.json()

async def main():
    async with aiohttp.ClientSession() as session:
        result = await post_data(
            session,
            "https://httpbin.org/post",
            {"key": "value", "name": "asyncio"},
        )
        print(result)

asyncio.run(main())
```
9. Async File IO: aiofiles
The built-in open() is synchronous and blocks the event loop. Use aiofiles instead; it runs file operations in a thread pool behind an async interface:
```bash
pip install aiofiles
```
```python
import asyncio
import aiofiles

async def read_file(path):
    async with aiofiles.open(path, "r", encoding="utf-8") as f:
        return await f.read()

async def write_file(path, content):
    async with aiofiles.open(path, "w", encoding="utf-8") as f:
        await f.write(content)

async def main():
    # Read several files concurrently
    contents = await asyncio.gather(
        read_file("file1.txt"),
        read_file("file2.txt"),
        read_file("file3.txt"),
    )
    for content in contents:
        print(content[:100])

asyncio.run(main())
```
10. Async Databases
asyncpg (PostgreSQL)
```bash
pip install asyncpg
```
```python
import asyncio
import asyncpg

async def main():
    # Open a connection
    conn = await asyncpg.connect(
        host="localhost", database="mydb",
        user="user", password="password",
    )
    try:
        # Query ($1 is a PostgreSQL positional parameter)
        rows = await conn.fetch("SELECT id, name FROM users WHERE active = $1", True)
        for row in rows:
            print(row["id"], row["name"])
        # Insert
        await conn.execute(
            "INSERT INTO users(name, email) VALUES($1, $2)",
            "Zhang San", "zhangsan@example.com",
        )
    finally:
        await conn.close()

asyncio.run(main())
```
Connection pool (a must in production)
```python
import asyncio
import asyncpg

async def main():
    pool = await asyncpg.create_pool(
        host="localhost", database="mydb",
        user="user", password="password",
        min_size=5, max_size=20,
    )

    async def query(user_id):
        # Borrow a connection; it goes back to the pool when the block exits
        async with pool.acquire() as conn:
            return await conn.fetchrow("SELECT * FROM users WHERE id = $1", user_id)

    try:
        # Concurrent queries
        results = await asyncio.gather(*[query(i) for i in range(1, 11)])
    finally:
        await pool.close()

asyncio.run(main())
```
aiosqlite (SQLite)
```bash
pip install aiosqlite
```
```python
import asyncio
import aiosqlite

async def main():
    async with aiosqlite.connect("test.db") as db:
        await db.execute("""
            CREATE TABLE IF NOT EXISTS users (
                id INTEGER PRIMARY KEY,
                name TEXT
            )
        """)
        await db.execute("INSERT INTO users(name) VALUES(?)", ("Zhang San",))
        await db.commit()
        async with db.execute("SELECT * FROM users") as cursor:
            async for row in cursor:
                print(row)

asyncio.run(main())
```
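11. Calling Async Code from Sync Code
Sometimes you need to call an async function from synchronous code (a plain function, a Django view, and so on):
asyncio.run() (the simplest option)
```python
import asyncio

async def async_function():
    await asyncio.sleep(0.1)
    return 42

def sync_function():
    result = asyncio.run(async_function())
    return result
```
Note: asyncio.run() creates a new event loop (and closes it afterwards), so it cannot be called while an event loop is already running in the same thread.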
loop.run_until_complete()
```python
import asyncio

def sync_function():
    # Manage the loop by hand (async_function as defined above)
    loop = asyncio.new_event_loop()
    try:
        result = loop.run_until_complete(async_function())
    finally:
        loop.close()
    return result
```
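Both approaches above start a fresh loop. Sometimes a loop is already running in another thread, for example a background service thread. Synchronous code can then hand a coroutine to that loop with asyncio.run_coroutine_threadsafe() and block on the returned concurrent.futures.Future. A minimal sketch; async_function is a stand-in and the thread setup is illustrative:

```python
import asyncio
import threading

async def async_function():
    await asyncio.sleep(0.1)
    return 42

# An event loop running forever in a background thread
loop = asyncio.new_event_loop()
thread = threading.Thread(target=loop.run_forever, daemon=True)
thread.start()

def sync_function():
    # Submit the coroutine to the loop in the other thread
    future = asyncio.run_coroutine_threadsafe(async_function(), loop)
    return future.result(timeout=5)  # block this thread until the result is ready

result = sync_function()
print(result)  # 42

# Stop the loop from this thread, then clean up
loop.call_soon_threadsafe(loop.stop)
thread.join()
loop.close()
```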
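Running blocking sync functions from async code
Move legacy blocking code to a thread pool with run_in_executor. CPU-bound work should go to a process pool instead, because threads in one process still share the GIL:
```python
import asyncio
import time
from concurrent.futures import ProcessPoolExecutor

def blocking_io():
    time.sleep(2)  # synchronous, blocking operation
    return "done"

def cpu_heavy():
    return sum(i * i for i in range(10**7))

async def main():
    loop = asyncio.get_running_loop()
    # None = the loop's default thread pool (suitable for blocking IO)
    result = await loop.run_in_executor(None, blocking_io)
    # A process pool (suitable for CPU-bound work; sidesteps the GIL)
    with ProcessPoolExecutor() as pool:
        result = await loop.run_in_executor(pool, cpu_heavy)

# The __main__ guard is required when worker processes are spawned (Windows, macOS)
if __name__ == "__main__":
    asyncio.run(main())
```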
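On Python 3.9+, asyncio.to_thread() is a shorter way to push a blocking function into the default thread pool. It forwards positional and keyword arguments for you. A sketch (blocking_io here takes an illustrative delay argument) showing that two blocking calls then overlap:

```python
import asyncio
import time

def blocking_io(delay):
    time.sleep(delay)  # synchronous, blocking call
    return delay

async def main():
    start = time.perf_counter()
    # Each call runs in a worker thread, so the two sleeps overlap
    results = await asyncio.gather(
        asyncio.to_thread(blocking_io, 0.2),
        asyncio.to_thread(blocking_io, 0.2),
    )
    return results, time.perf_counter() - start

results, elapsed = asyncio.run(main())
print(results, f"{elapsed:.1f}s")
```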
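12. Advanced Event Loop Topics
Getting the current event loop
```python
import asyncio

async def main():
    loop = asyncio.get_running_loop()  # recommended; raises RuntimeError if no loop is running
    loop = asyncio.get_event_loop()    # legacy; calling it outside a running loop is deprecated in recent versions
```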
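Scheduling callbacks
```python
import asyncio

async def main():
    loop = asyncio.get_running_loop()
    # Runs on the next loop iteration (takes a plain function, not a coroutine)
    loop.call_soon(print, "runs right away")
    # Runs after a delay
    loop.call_later(2.0, print, "runs after 2 seconds")
    # Runs at an absolute time on the loop's monotonic clock
    loop.call_at(loop.time() + 5.0, print, "runs after 5 seconds")
    # Keep the loop alive past the last callback: once main() returns,
    # asyncio.run() closes the loop and pending callbacks never fire
    await asyncio.sleep(6)

asyncio.run(main())
```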
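uvloop: a high-performance event loop
A drop-in event loop written in Cython on top of libuv. The uvloop project reports that it makes asyncio 2 to 4 times faster:
```bash
pip install uvloop
```
```python
import asyncio
import uvloop  # not available on Windows

async def main():
    await asyncio.sleep(0.1)

asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
asyncio.run(main())
# uvloop 0.18+ also provides the shortcut uvloop.run(main())
```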
13. 常见陷阱与最佳实践
❌ 陷阱 1:在协程中使用同步阻塞调用
import time
import requests # 同步库
# ❌ 错误:阻塞整个事件循环
async def bad_fetch(url):
time.sleep(1) # 阻塞!
return requests.get(url) # 阻塞!
# ✅ 正确:使用异步库
async def good_fetch(url):
await asyncio.sleep(1)
async with aiohttp.ClientSession() as s:
async with s.get(url) as r:
return await r.text()
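asyncio's debug mode helps catch this pitfall. With asyncio.run(..., debug=True) or PYTHONASYNCIODEBUG=1, the loop logs a warning whenever a single step takes longer than loop.slow_callback_duration (0.1s by default). The sketch below captures those log records in a list; ListHandler and messages are illustrative helpers:

```python
import asyncio
import logging
import time

messages = []

class ListHandler(logging.Handler):
    def emit(self, record):
        messages.append(record.getMessage())

logging.getLogger("asyncio").addHandler(ListHandler())

async def bad_fetch():
    time.sleep(0.2)  # blocking call inside a coroutine

# debug=True turns on slow-step detection (among other checks)
asyncio.run(bad_fetch(), debug=True)
print(messages)  # e.g. a message like "Executing <Task ...> took 0.2xx seconds"
```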
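❌ Pitfall 2: forgetting await
```python
# ❌ Wrong: some_async_func() only creates a coroutine object; it never runs
async def main():
    result = some_async_func()  # missing await!
    print(result)  # prints the coroutine object

# ✅ Right
async def main():
    result = await some_async_func()
    print(result)
```
Python emits a "RuntimeWarning: coroutine 'xxx' was never awaited" warning when such a coroutine object is garbage-collected.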
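❌ Pitfall 3: awaiting one by one instead of concurrently
```python
# ❌ Wrong: runs one after another, no concurrency
async def main():
    r1 = await fetch("url1")  # wait for this to finish
    r2 = await fetch("url2")  # then start this one
    r3 = await fetch("url3")  # total = sum of all three

# ✅ Right: truly concurrent
async def main():
    r1, r2, r3 = await asyncio.gather(
        fetch("url1"),
        fetch("url2"),
        fetch("url3"),
    )
```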
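❌ Pitfall 4: calling asyncio.run() inside a running event loop
```python
# ❌ Raises RuntimeError in Jupyter Notebook or any environment that already runs a loop
asyncio.run(main())

# ✅ In Jupyter, await directly at the top level
await main()

# Or use the third-party nest_asyncio package, which patches asyncio to allow nested loops
import nest_asyncio
nest_asyncio.apply()
asyncio.run(main())
```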
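✅ Best practices summary
```python
# These snippets run inside an async function; fetch, urls, tasks and coro are placeholders

# 1. Reuse one session instead of creating one per request
async with aiohttp.ClientSession() as session:
    results = await asyncio.gather(*[fetch(session, url) for url in urls])

# 2. Cap concurrency with a Semaphore so you don't overwhelm the server
sem = asyncio.Semaphore(20)

async def safe_fetch(session, url):
    async with sem:
        return await fetch(session, url)

# 3. Always handle exceptions so Tasks don't fail silently
results = await asyncio.gather(*tasks, return_exceptions=True)
errors = [r for r in results if isinstance(r, Exception)]

# 4. Set timeouts (or use async with asyncio.timeout(30) on Python 3.11+)
await asyncio.wait_for(coro(), timeout=30)

# 5. Cancel Tasks you no longer need
task = asyncio.create_task(coro())
task.cancel()
try:
    await task
except asyncio.CancelledError:
    pass  # cancelled as expected
```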
14. Performance Comparison Experiment
Experiment: fetching 10 URLs
```python
import asyncio
import time
import aiohttp
import requests

URLS = ["https://httpbin.org/delay/1" for _ in range(10)]  # each response is delayed by 1s

# ① Synchronous version
def sync_main():
    start = time.time()
    results = [requests.get(url).status_code for url in URLS]
    print(f"Sync elapsed: {time.time() - start:.1f}s")

# ② Async version
async def async_main():
    start = time.time()
    async with aiohttp.ClientSession() as session:
        async def fetch(url):
            async with session.get(url) as r:
                return r.status
        results = await asyncio.gather(*[fetch(url) for url in URLS])
    print(f"Async elapsed: {time.time() - start:.1f}s")

# ③ Async version with limited concurrency
async def async_limited():
    sem = asyncio.Semaphore(5)
    start = time.time()
    async with aiohttp.ClientSession() as session:
        async def fetch(url):
            async with sem:
                async with session.get(url) as r:
                    return r.status
        results = await asyncio.gather(*[fetch(url) for url in URLS])
    print(f"Rate-limited async elapsed: {time.time() - start:.1f}s")

sync_main()                   # Sync elapsed: 10.x s
asyncio.run(async_main())     # Async elapsed: 1.x s
asyncio.run(async_limited())  # Rate-limited async elapsed: 2.x s
```
| Approach | Time (10 requests, each delayed 1s) |
|---|---|
| Sync | ≈ 10s |
| Async (unlimited) | ≈ 1s |
| Async (concurrency limit 5) | ≈ 2s |
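The numbers in the table follow a simple rule. With a concurrency limit k, N requests of latency d take about ceil(N / k) × d; here that is 10 / 5 × 1s = 2s, versus N × d = 10s sequentially. The offline sketch below reproduces the ratios with asyncio.sleep() standing in for the network. DELAY and N are made-up values, and real requests add connection and processing overhead.

```python
import asyncio
import time

DELAY = 0.1  # simulated latency per request (assumed, stands in for httpbin's 1s)
N = 10       # number of requests

async def fake_request(sem=None):
    if sem is None:
        await asyncio.sleep(DELAY)
    else:
        async with sem:
            await asyncio.sleep(DELAY)

async def run(limit=None):
    sem = asyncio.Semaphore(limit) if limit else None
    start = time.perf_counter()
    await asyncio.gather(*[fake_request(sem) for _ in range(N)])
    return time.perf_counter() - start

unlimited = asyncio.run(run())         # ≈ 1 × DELAY
limited = asyncio.run(run(limit=5))    # ≈ ceil(10 / 5) × DELAY
print(f"unlimited: {unlimited:.2f}s, limit 5: {limited:.2f}s")
```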
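Appendix: Cheat Sheet
```python
# Run a coroutine (program entry point)
asyncio.run(coro())
# Wait concurrently and collect results
await asyncio.gather(coro1(), coro2(), coro3())
# Create a Task (scheduled immediately)
task = asyncio.create_task(coro())
# Timeout
await asyncio.wait_for(coro(), timeout=5)
# Wait for the first one to finish
done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
# Rate limiting
sem = asyncio.Semaphore(10)
async with sem: ...
# Run sync code without blocking the loop (thread pool)
loop = asyncio.get_running_loop()
await loop.run_in_executor(None, sync_func, arg1, arg2)
# Get the event loop
loop = asyncio.get_running_loop()
# Async sleep
await asyncio.sleep(1)
```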