Async is not "faster"; it is "waiting more intelligently".


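Contents

  1. Why async IO?
  2. Core concepts
  3. asyncio basics
  4. Tasks and concurrency
  5. Async context managers and iterators
  6. asyncio synchronization primitives
  7. Queue: the producer-consumer pattern
  8. Async network IO: aiohttp
  9. Async file IO: aiofiles
  10. Async databases
  11. Calling async code from sync code
  12. Event loop: advanced topics
  13. Common pitfalls and best practices
  14. Performance comparison experiment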

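1. Why async IO?

Three concurrency models compared

Model | How it works | Best suited for | Drawbacks
Multiprocessing | Runs in parallel on multiple CPU cores | CPU-bound work (computation, compression) | High memory use, expensive process switching
Multithreading | Threads scheduled by the operating system | IO-bound work (the GIL prevents parallel CPU-bound work) | GIL, race conditions, deadlocks
Async IO | Single-threaded event loop | IO-bound work (network, files) | Unsuited to CPU-bound work

The core problem: time wasted waiting

Synchronous mode (one chef cooks one dish at a time):
[send request A] → [wait 200ms] → [process result] → [send request B] → [wait 200ms] → ...
Total time: 200ms × N

Asynchronous mode (one chef watches several dishes at once):
[send request A] ↘
[send request B]  → [waiting...] → [A returns, process] → [B returns, process]
[send request C] ↗
Total time: ≈ 200ms (the duration of the slowest single request)

Conclusion: async IO is about doing other work while waiting on IO; it is not true parallel computation.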


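2. Core concepts

Coroutines

A function defined with async def returns a coroutine object when called; its body does not run immediately.

async def say_hello():
    print("Hello")

# Calling it does not run the body! It only creates a coroutine object
coro = say_hello()
print(type(coro))  # <class 'coroutine'>
coro.close()       # discard the unused coroutine to avoid a "never awaited" warning

# A coroutine must be run by the event loop
import asyncio
asyncio.run(say_hello())  # this is what prints Hello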

Event loop

The "scheduling center" of an async program. It is responsible for:

  • Running coroutines
  • Listening for IO events
  • Scheduling callbacks

┌───────────────────────────────────────────┐
│                Event Loop                 │
│                                           │
│  Task A ──► await IO ──► suspended        │
│  Task B ◄── IO done  ◄── resumed          │
│  Task C ──► await IO ──► suspended        │
│  Task A ◄── IO done  ◄── resumed          │
└───────────────────────────────────────────┘
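
The suspend/resume cycle in the diagram can be observed directly. The following is a minimal sketch, assuming asyncio.sleep stands in for real IO; the worker function and the events list are illustrative names, not part of any library.

```python
import asyncio

events = []  # records the order in which the steps actually ran

async def worker(name, delay):
    events.append(f"{name} start")
    await asyncio.sleep(delay)   # suspends here; the loop runs other tasks
    events.append(f"{name} resume")

async def main():
    # A is started first but waits longer, so B is resumed before A
    await asyncio.gather(worker("A", 0.2), worker("B", 0.1))

asyncio.run(main())
print(events)
# ['A start', 'B start', 'B resume', 'A resume']
```

Both workers start before either one resumes, which is the interleaving the event loop provides on a single thread.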

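The await keyword

await can only be used inside an async def function. It:

  1. Suspends the current coroutine (if the result is not ready yet) and hands control back to the event loop
  2. Waits for the awaited object to complete
  3. Resumes execution once it completes and yields its return value

async def fetch():
    # Suspends here; the event loop runs other tasks in the meantime
    result = await some_async_operation()
    # Execution continues from here once the IO completes
    return result

Awaitables

Three kinds of objects can be awaited:

# 1. Coroutines
async def coro(): ...
await coro()

# 2. Tasks (created with asyncio.create_task)
task = asyncio.create_task(coro())
await task

# 3. Futures (low-level interface, rarely used directly)
future = asyncio.get_running_loop().create_future()
future.set_result(None)  # without a result, the await below would never return
await future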

3. asyncio basics

asyncio.run(): the program entry point

import asyncio

async def main():
    print("start")
    await asyncio.sleep(1)  # simulate waiting on IO
    print("done")

# Standard entry point since Python 3.7
asyncio.run(main())

asyncio.sleep()

The async counterpart of time.sleep(); it does not block the event loop while waiting.

import asyncio
import time

async def task(name, delay):
    print(f"{name} started")
    await asyncio.sleep(delay)  # ✅ asynchronous wait
    print(f"{name} finished")

async def main():
    start = time.time()
    # Sequential execution (total time = 1 + 2 = 3s)
    await task("A", 1)
    await task("B", 2)
    print(f"Elapsed: {time.time() - start:.1f}s")

asyncio.run(main())
# Elapsed: 3.0s

4. Tasks and concurrency

asyncio.create_task(): real concurrency

import asyncio
import time

async def task(name, delay):
    print(f"{name} started")
    await asyncio.sleep(delay)
    print(f"{name} finished")
    return f"result of {name}"

async def main():
    start = time.time()

    # Create Tasks: each is scheduled right away, but only starts running
    # once main() yields control at its next await
    task_a = asyncio.create_task(task("A", 1))
    task_b = asyncio.create_task(task("B", 2))
    task_c = asyncio.create_task(task("C", 1))

    # Wait for all Tasks to finish
    result_a = await task_a
    result_b = await task_b
    result_c = await task_c

    print(f"Elapsed: {time.time() - start:.1f}s")
    # Elapsed: 2.0s (run concurrently, total time = the longest single task)

asyncio.run(main())

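asyncio.gather(): concurrency in bulk

The most common way to run things concurrently: it waits for all coroutines to finish and collects their results in input order.

async def fetch(url):
    await asyncio.sleep(1)  # simulate a network request
    return f"data from {url}"

async def main():
    urls = ["url1", "url2", "url3", "url4", "url5"]

    # Run all requests concurrently
    results = await asyncio.gather(
        *[fetch(url) for url in urls]
    )

    for r in results:
        print(r)

asyncio.run(main())
# All 5 requests are issued together; total time is about 1s

Exception handling with gather:

# Default: the first exception propagates to the caller immediately
# (the other awaitables are not cancelled and keep running)
results = await asyncio.gather(coro1(), coro2(), coro3())

# return_exceptions=True: exceptions are returned as results and do not interrupt other tasks
results = await asyncio.gather(
    coro1(), coro2(), coro3(),
    return_exceptions=True
)
for r in results:
    if isinstance(r, Exception):
        print(f"Failed: {r}")
    else:
        print(f"Succeeded: {r}")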
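
Since coro1, coro2 and coro3 above are placeholders, here is a small runnable sketch of the return_exceptions=True behavior. The helpers ok and boom are hypothetical names made up for this illustration.

```python
import asyncio

async def ok(value):
    await asyncio.sleep(0.01)
    return value

async def boom():
    await asyncio.sleep(0.01)
    raise ValueError("boom")

async def main():
    # The failing coroutine does not prevent the others from finishing
    return await asyncio.gather(ok(1), boom(), ok(3), return_exceptions=True)

results = asyncio.run(main())
for r in results:
    label = "error" if isinstance(r, Exception) else "ok"
    print(label, repr(r))
```

The exception object sits at the same index as the coroutine that raised it, so results can still be matched back to their inputs.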

asyncio.wait(): finer-grained control

import asyncio

async def task(n):
    await asyncio.sleep(n)
    return n

async def main():
    tasks = [asyncio.create_task(task(i)) for i in [3, 1, 2]]

    # Continue as soon as the first one finishes
    done, pending = await asyncio.wait(
        tasks,
        return_when=asyncio.FIRST_COMPLETED
    )

    for t in done:
        print(f"Done: {t.result()}")

    # Cancel the remaining tasks
    for t in pending:
        t.cancel()

asyncio.run(main())

return_when value | Meaning
ALL_COMPLETED (default) | Return once everything has finished
FIRST_COMPLETED | Return as soon as any task finishes
FIRST_EXCEPTION | Return as soon as any task raises; if none does, behaves like ALL_COMPLETED

asyncio.timeout(): timeout control (Python 3.11+)

async def main():
    try:
        async with asyncio.timeout(5.0):
            await long_running_task()
    except TimeoutError:
        print("Timed out!")

# On Python 3.10 and earlier, use wait_for
async def main():
    try:
        result = await asyncio.wait_for(
            long_running_task(),
            timeout=5.0
        )
    except asyncio.TimeoutError:
        print("Timed out!")
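
For a version that can be run as-is, the sketch below fills in long_running_task with a hypothetical 10-second sleep and shortens the timeout so the effect shows up quickly. It uses wait_for, which works on both older and newer Python versions.

```python
import asyncio

async def long_running_task():
    await asyncio.sleep(10)   # hypothetical slow operation
    return "finished"

async def main():
    try:
        # wait_for cancels the inner task once the timeout expires
        return await asyncio.wait_for(long_running_task(), timeout=0.1)
    except asyncio.TimeoutError:
        return "timed out"

outcome = asyncio.run(main())
print(outcome)
# timed out
```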

5. Async context managers and iterators

Async context managers

Implement __aenter__ and __aexit__:

class AsyncDBConnection:
    async def __aenter__(self):
        print("Connecting to the database")
        # create_connection() stands for your driver's connect call
        self.conn = await create_connection()
        return self.conn

    async def __aexit__(self, exc_type, exc, tb):
        print("Closing the connection")
        await self.conn.close()

async def main():
    async with AsyncDBConnection() as conn:
        await conn.execute("SELECT 1")
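
The class above depends on an undefined create_connection(). As a sketch of the same protocol that runs without a database, the version below swaps in a hypothetical FakeConnection and shows that __aexit__ still runs when the body raises.

```python
import asyncio

log = []

class FakeConnection:
    """Hypothetical stand-in for a real database connection."""
    async def execute(self, sql):
        log.append(f"execute {sql}")

    async def close(self):
        log.append("close")

class AsyncDBConnection:
    async def __aenter__(self):
        log.append("connect")
        self.conn = FakeConnection()
        return self.conn

    async def __aexit__(self, exc_type, exc, tb):
        await self.conn.close()
        return False  # do not swallow exceptions from the body

async def main():
    try:
        async with AsyncDBConnection() as conn:
            await conn.execute("SELECT 1")
            raise RuntimeError("query failed")
    except RuntimeError:
        log.append("error handled")

asyncio.run(main())
print(log)
# ['connect', 'execute SELECT 1', 'close', 'error handled']
```

The connection is closed before the exception reaches the caller, which is the guarantee that makes async with preferable to manual cleanup.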

Async iterators

Implement __aiter__ and __anext__:

class AsyncRange:
    def __init__(self, n):
        self.n = n
        self.i = 0

    def __aiter__(self):
        return self

    async def __anext__(self):
        if self.i >= self.n:
            raise StopAsyncIteration
        await asyncio.sleep(0.1)  # simulate an async operation
        val = self.i
        self.i += 1
        return val

async def main():
    async for num in AsyncRange(5):
        print(num)

Async generators

A more concise way to write the same thing:

async def async_range(n):
    for i in range(n):
        await asyncio.sleep(0.1)
        yield i

async def main():
    async for num in async_range(5):
        print(num)

    # Async comprehensions work too
    results = [i async for i in async_range(5)]

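6. asyncio synchronization primitives

When several coroutines share a resource, access needs to be synchronized (analogous to locks in multithreading).

Lock: mutual exclusion

import asyncio

shared_resource = 0

async def worker(name, lock):
    global shared_resource
    async with lock:  # acquire the lock; other coroutines wait here
        val = shared_resource
        await asyncio.sleep(0.1)  # simulate work
        shared_resource = val + 1
        print(f"{name}: {shared_resource}")

async def main():
    # Create the lock inside the running loop; before Python 3.10 a lock
    # created at import time could be bound to a different event loop
    lock = asyncio.Lock()
    await asyncio.gather(*[worker(f"W{i}", lock) for i in range(5)])

asyncio.run(main())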

Event: event notification

async def waiter(event, name):
    print(f"{name} waiting for the event...")
    await event.wait()
    print(f"{name} got the event, continuing")

async def setter(event):
    await asyncio.sleep(2)
    print("Setting the event!")
    event.set()

async def main():
    event = asyncio.Event()
    await asyncio.gather(
        waiter(event, "W1"),
        waiter(event, "W2"),
        setter(event)
    )

Semaphore: limiting concurrency

The most commonly used primitive! It caps how many coroutines can be inside a section at the same time:

import asyncio
import aiohttp

async def fetch(session, url, semaphore):
    async with semaphore:  # at most 10 run this block at once
        async with session.get(url) as resp:
            return await resp.text()

async def main():
    semaphore = asyncio.Semaphore(10)  # maximum concurrency of 10
    urls = [f"https://example.com/{i}" for i in range(100)]

    async with aiohttp.ClientSession() as session:
        tasks = [fetch(session, url, semaphore) for url in urls]
        results = await asyncio.gather(*tasks)
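
To check the cap without touching the network, the following sketch replaces the HTTP request with asyncio.sleep and tracks how many coroutines are inside the guarded block at once. The stats dictionary and the limit of 3 are assumptions chosen for this illustration.

```python
import asyncio

async def fetch(i, semaphore, stats):
    async with semaphore:
        stats["running"] += 1
        stats["peak"] = max(stats["peak"], stats["running"])
        await asyncio.sleep(0.01)   # stand-in for the HTTP request
        stats["running"] -= 1
        return i

async def main():
    semaphore = asyncio.Semaphore(3)
    stats = {"running": 0, "peak": 0}
    results = await asyncio.gather(
        *[fetch(i, semaphore, stats) for i in range(10)]
    )
    return results, stats["peak"]

results, peak = asyncio.run(main())
print(results, peak)
# [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] 3
```

All ten coroutines are created up front, but the observed peak never exceeds the semaphore's value.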

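7. Queue: the producer-consumer pattern

import asyncio

async def producer(queue, n, n_consumers):
    for i in range(n):
        await asyncio.sleep(0.5)  # simulate production time
        await queue.put(i)
        print(f"Produced: {i}, queue size: {queue.qsize()}")
    # One end-of-stream sentinel per consumer; with a single None,
    # only one consumer would exit and gather() would hang forever
    for _ in range(n_consumers):
        await queue.put(None)

async def consumer(queue, name):
    while True:
        item = await queue.get()
        if item is None:
            queue.task_done()
            break
        await asyncio.sleep(1)  # simulate consumption time
        print(f"[{name}] consumed: {item}")
        queue.task_done()

async def main():
    queue = asyncio.Queue(maxsize=5)  # holds at most 5 items

    await asyncio.gather(
        producer(queue, 10, n_consumers=2),
        consumer(queue, "Consumer A"),
        consumer(queue, "Consumer B"),
    )

asyncio.run(main())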
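
The task_done() calls above only matter once something waits on queue.join(). As an alternative to sentinel values, here is a hedged sketch of that pattern: the workers loop forever, the main coroutine waits for the queue to drain, then cancels them. The worker count and item count are arbitrary choices for the example.

```python
import asyncio

async def worker(queue, processed):
    while True:
        item = await queue.get()
        try:
            await asyncio.sleep(0.01)   # simulate handling the item
            processed.append(item)
        finally:
            queue.task_done()           # always balance get() with task_done()

async def main():
    queue = asyncio.Queue()
    processed = []
    workers = [asyncio.create_task(worker(queue, processed)) for _ in range(3)]

    for i in range(10):
        queue.put_nowait(i)

    await queue.join()                  # returns once every item is marked done
    for w in workers:
        w.cancel()                      # workers are idle in queue.get() by now
    await asyncio.gather(*workers, return_exceptions=True)
    return processed

processed = asyncio.run(main())
print(sorted(processed))
# [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
```

This avoids having to know the number of consumers when signalling the end of the stream, at the cost of an explicit cancellation step.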

8. Async network IO: aiohttp

pip install aiohttp

A basic GET request

import asyncio
import aiohttp

async def fetch(session, url):
    async with session.get(url) as response:
        return await response.json()

async def main():
    async with aiohttp.ClientSession() as session:
        data = await fetch(session, "https://api.github.com/users/octocat")
        print(data["name"])

asyncio.run(main())

Fetching several URLs concurrently

import asyncio
import aiohttp
import time

async def fetch(session, url):
    try:
        async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as resp:
            return {"url": url, "status": resp.status, "data": await resp.text()}
    except Exception as e:
        return {"url": url, "error": str(e)}

async def main():
    urls = [
        "https://httpbin.org/delay/1",
        "https://httpbin.org/delay/2",
        "https://httpbin.org/delay/1",
    ]

    start = time.time()
    async with aiohttp.ClientSession() as session:
        results = await asyncio.gather(*[fetch(session, url) for url in urls])

    print(f"Elapsed: {time.time() - start:.1f}s")  # ≈ 2s rather than 4s
    for r in results:
        print(r["url"], r.get("status", r.get("error")))

asyncio.run(main())

POST requests

async def post_data(session, url, payload):
    async with session.post(url, json=payload) as resp:
        return await resp.json()

async def main():
    async with aiohttp.ClientSession() as session:
        result = await post_data(
            session,
            "https://httpbin.org/post",
            {"key": "value", "name": "asyncio"}
        )
        print(result)

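9. Async file IO: aiofiles

The standard open() is synchronous and blocks the event loop. Use aiofiles instead, which delegates file operations to a thread pool:

pip install aiofiles

import asyncio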
import aiofiles

async def read_file(path):
    async with aiofiles.open(path, "r", encoding="utf-8") as f:
        return await f.read()

async def write_file(path, content):
    async with aiofiles.open(path, "w", encoding="utf-8") as f:
        await f.write(content)

async def main():
    # Read several files concurrently
    contents = await asyncio.gather(
        read_file("file1.txt"),
        read_file("file2.txt"),
        read_file("file3.txt"),
    )
    for content in contents:
        print(content[:100])

asyncio.run(main())

10. Async databases

asyncpg (PostgreSQL)

pip install asyncpg

import asyncio
import asyncpg

async def main():
    # Open a connection
    conn = await asyncpg.connect(
        host="localhost", database="mydb",
        user="user", password="password"
    )

    # Query
    rows = await conn.fetch("SELECT id, name FROM users WHERE active = $1", True)
    for row in rows:
        print(row["id"], row["name"])

    # Insert
    await conn.execute(
        "INSERT INTO users(name, email) VALUES($1, $2)",
        "张三", "zhangsan@example.com"
    )

    await conn.close()

asyncio.run(main())

Connection pools (a must in production)

async def main():
    pool = await asyncpg.create_pool(
        host="localhost", database="mydb",
        user="user", password="password",
        min_size=5, max_size=20
    )

    async def query(user_id):
        async with pool.acquire() as conn:
            return await conn.fetchrow("SELECT * FROM users WHERE id = $1", user_id)

    # Run the queries concurrently
    results = await asyncio.gather(*[query(i) for i in range(1, 11)])

    await pool.close()

aiosqlite (SQLite)

pip install aiosqlite

import aiosqlite

async def main():
    async with aiosqlite.connect("test.db") as db:
        await db.execute("""
            CREATE TABLE IF NOT EXISTS users (
                id INTEGER PRIMARY KEY,
                name TEXT
            )
        """)
        await db.execute("INSERT INTO users(name) VALUES(?)", ("张三",))
        await db.commit()

        async with db.execute("SELECT * FROM users") as cursor:
            async for row in cursor:
                print(row)

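11. Calling async code from sync code

Sometimes you need to call an async function from synchronous code (an ordinary function, a Django view, and so on):

asyncio.run() (the simplest option)

def sync_function():
    result = asyncio.run(async_function())
    return result

Note: asyncio.run() creates a new event loop, so it cannot be called from a thread that already has a running event loop.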
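
The restriction is easy to reproduce. In the sketch below, async_function and caller are hypothetical names; sync_function is called once from plain synchronous code, where it works, and once from inside a running loop, where asyncio.run() raises RuntimeError.

```python
import asyncio

async def async_function():
    return 42

def sync_function():
    coro = async_function()
    try:
        return asyncio.run(coro)
    except RuntimeError:
        coro.close()              # avoid a "never awaited" warning
        raise

async def caller():
    try:
        sync_function()           # called while a loop is already running
    except RuntimeError as exc:
        return f"RuntimeError: {exc}"
    return "no error"

value = sync_function()           # fine: no loop is running here
message = asyncio.run(caller())
print(value)
print(message)
```

Inside a coroutine, the fix is simply to await async_function() directly instead of going through the synchronous wrapper.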

loop.run_until_complete()

def sync_function():
    loop = asyncio.new_event_loop()
    try:
        result = loop.run_until_complete(async_function())
    finally:
        loop.close()
    return result

Running synchronous blocking functions from async code

Use run_in_executor to move legacy blocking code to a thread pool and CPU-bound code to a process pool:

import asyncio
from concurrent.futures import ProcessPoolExecutor

def blocking_io():
    import time
    time.sleep(2)      # synchronous blocking call
    return "done"

def cpu_heavy():
    return sum(i * i for i in range(10**7))

async def main():
    loop = asyncio.get_running_loop()

    # Run in the default thread pool (suited to blocking IO)
    result = await loop.run_in_executor(None, blocking_io)

    # Run in a process pool (suited to CPU-bound work)
    with ProcessPoolExecutor() as pool:
        result = await loop.run_in_executor(pool, cpu_heavy)
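
On Python 3.9 and later, asyncio.to_thread() is a shorter way to reach the default thread pool. The sketch below, with an illustrative 0.2-second blocking call, runs three blocking calls concurrently; the timing comment is approximate and depends on the machine.

```python
import asyncio
import time

def blocking_io(delay):
    time.sleep(delay)      # synchronous blocking call
    return "done"

async def main():
    start = time.perf_counter()
    # Each call runs in its own worker thread, so the waits overlap
    results = await asyncio.gather(
        asyncio.to_thread(blocking_io, 0.2),
        asyncio.to_thread(blocking_io, 0.2),
        asyncio.to_thread(blocking_io, 0.2),
    )
    return results, time.perf_counter() - start

results, elapsed = asyncio.run(main())
print(results, f"{elapsed:.2f}s")   # roughly 0.2s rather than 0.6s
```

This only helps blocking IO; CPU-bound functions still contend for the GIL in threads, which is why the process pool is used for cpu_heavy above.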

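12. Event loop: advanced topics

Getting the current event loop

async def main():
    loop = asyncio.get_event_loop()       # legacy API; discouraged inside coroutines
    loop = asyncio.get_running_loop()     # only valid while a loop is running (recommended)

Scheduling callbacks

async def main():
    loop = asyncio.get_running_loop()

    # Runs on the next loop iteration (takes a plain function, not a coroutine)
    loop.call_soon(print, "runs right away")

    # Runs after a delay
    loop.call_later(2.0, print, "runs after 2 seconds")

    # Runs at a given loop timestamp (loop.time() is a monotonic clock)
    loop.call_at(loop.time() + 5.0, print, "runs after 5 seconds")

    # Keep the loop alive long enough for the 5-second callback to fire
    await asyncio.sleep(6)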
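
A quick way to see how the three scheduling calls relate is to record the order in which their callbacks fire. This is a sketch with shortened, arbitrary delays; the order list is an illustrative name.

```python
import asyncio

async def main():
    loop = asyncio.get_running_loop()
    order = []

    # Registered in "wrong" order on purpose; the loop sorts by due time
    loop.call_later(0.10, order.append, "call_later 0.10s")
    loop.call_at(loop.time() + 0.05, order.append, "call_at +0.05s")
    loop.call_soon(order.append, "call_soon")

    await asyncio.sleep(0.3)    # keep the loop alive until all callbacks fired
    return order

order = asyncio.run(main())
print(order)
# ['call_soon', 'call_at +0.05s', 'call_later 0.10s']
```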

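uvloop: a high-performance event loop

An event loop built on libuv and written in Cython; its authors report that it makes asyncio 2 to 4 times faster than the default loop:

pip install uvloop

import asyncio
import uvloop

asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
asyncio.run(main())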

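13. Common pitfalls and best practices

❌ Pitfall 1: making synchronous blocking calls inside a coroutine

import asyncio
import time
import aiohttp
import requests  # synchronous library

# ❌ Wrong: blocks the entire event loop
async def bad_fetch(url):
    time.sleep(1)              # blocks!
    return requests.get(url)   # blocks!

# ✅ Right: use async libraries
async def good_fetch(url):
    await asyncio.sleep(1)
    async with aiohttp.ClientSession() as s:
        async with s.get(url) as r:
            return await r.text()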

❌ Pitfall 2: forgetting await

# ❌ Wrong: some_async_func() only creates a coroutine object; nothing runs
async def main():
    result = some_async_func()   # await is missing!
    print(result)                # prints the coroutine object

# ✅ Right
async def main():
    result = await some_async_func()
    print(result)

Python emits the warning RuntimeWarning: coroutine 'xxx' was never awaited.

❌ Pitfall 3: awaiting sequentially instead of concurrently

# ❌ Wrong: runs sequentially, no concurrency
async def main():
    r1 = await fetch("url1")  # wait for this to finish
    r2 = await fetch("url2")  # then run this
    r3 = await fetch("url3")  # total time = the sum of all three

# ✅ Right: real concurrency
async def main():
    r1, r2, r3 = await asyncio.gather(
        fetch("url1"),
        fetch("url2"),
        fetch("url3"),
    )

❌ Pitfall 4: calling asyncio.run() inside an existing event loop

# ❌ Raises an error in Jupyter Notebook and other environments that already run an event loop
asyncio.run(main())

# ✅ In Jupyter, await directly
await main()

# Or use nest_asyncio
import nest_asyncio
nest_asyncio.apply()
asyncio.run(main())

✅ Best practices summary

# 1. Reuse the session; do not create one per request
async with aiohttp.ClientSession() as session:
    results = await asyncio.gather(*[fetch(session, url) for url in urls])

# 2. Use a Semaphore to cap concurrency so you do not overwhelm the server
sem = asyncio.Semaphore(20)
async def safe_fetch(session, url):
    async with sem:
        return await fetch(session, url)

# 3. Always handle exceptions so that Tasks do not fail silently
results = await asyncio.gather(*tasks, return_exceptions=True)

# 4. Set timeouts
await asyncio.wait_for(coro(), timeout=30)

# 5. Cancel Tasks you no longer need
task = asyncio.create_task(coro())
task.cancel()
try:
    await task
except asyncio.CancelledError:
    pass  # cancelled as intended

14. Performance comparison experiment

Experiment: fetching 10 URLs

import asyncio
import aiohttp
import requests
import time

URLS = ["https://httpbin.org/delay/1" for _ in range(10)]

# ① Synchronous version
def sync_main():
    start = time.time()
    results = [requests.get(url).status_code for url in URLS]
    print(f"Sync elapsed: {time.time() - start:.1f}s")

# ② Async version
async def async_main():
    start = time.time()
    async with aiohttp.ClientSession() as session:
        async def fetch(url):
            async with session.get(url) as r:
                return r.status
        results = await asyncio.gather(*[fetch(url) for url in URLS])
    print(f"Async elapsed: {time.time() - start:.1f}s")

# ③ Async version with limited concurrency
async def async_limited():
    sem = asyncio.Semaphore(5)
    start = time.time()
    async with aiohttp.ClientSession() as session:
        async def fetch(url):
            async with sem:
                async with session.get(url) as r:
                    return r.status
        results = await asyncio.gather(*[fetch(url) for url in URLS])
    print(f"Limited async elapsed: {time.time() - start:.1f}s")

sync_main()                  # Sync elapsed: 10.x s
asyncio.run(async_main())    # Async elapsed: 1.x s
asyncio.run(async_limited()) # Limited async elapsed: 2.x s

Approach | Elapsed (10 requests, each delayed 1s)
Sync | ≈ 10s
Async (unlimited) | ≈ 1s
Async (concurrency capped at 5) | ≈ 2s
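
The same ratios can be reproduced offline. The sketch below is a simulation under stated assumptions rather than the experiment itself: asyncio.sleep with a scaled-down DELAY of 0.1s stands in for each httpbin request, and fake_request, timed and the three scenario functions are hypothetical names.

```python
import asyncio
import time

DELAY = 0.1      # stands in for the 1-second httpbin delay
N = 10           # number of simulated requests

async def fake_request(sem=None):
    if sem is None:
        await asyncio.sleep(DELAY)
    else:
        async with sem:
            await asyncio.sleep(DELAY)

async def timed(scenario):
    start = time.perf_counter()
    await scenario()
    return time.perf_counter() - start

async def sequential():
    for _ in range(N):
        await fake_request()          # one after another: N × DELAY

async def unlimited():
    await asyncio.gather(*[fake_request() for _ in range(N)])

async def limited():
    sem = asyncio.Semaphore(5)        # two batches of 5: 2 × DELAY
    await asyncio.gather(*[fake_request(sem) for _ in range(N)])

async def main():
    return [await timed(s) for s in (sequential, unlimited, limited)]

t_seq, t_all, t_lim = asyncio.run(main())
print(f"sequential {t_seq:.2f}s, unlimited {t_all:.2f}s, limit 5 {t_lim:.2f}s")
```

With these settings the three timings should come out near 1.0s, 0.1s and 0.2s, mirroring the 10 : 1 : 2 ratio in the table.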

Appendix: cheat sheet

# Run a coroutine
asyncio.run(coro())

# Wait concurrently and collect results
await asyncio.gather(coro1(), coro2(), coro3())

# Create a Task (scheduled right away)
task = asyncio.create_task(coro())

# Timeout
await asyncio.wait_for(coro(), timeout=5)

# Wait for the first one to finish
done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)

# Limit concurrency
sem = asyncio.Semaphore(10)
async with sem: ...

# Run sync code from async code (thread pool)
await loop.run_in_executor(None, sync_func, arg1, arg2)

# Get the event loop
loop = asyncio.get_running_loop()

# Async sleep
await asyncio.sleep(1)