Python 如何同时处理很多事:进程、线程、协程

作者:袖梨 2026-06-09

从全局角度理解“程序如何同时做很多事”。并发与并行、同步与异步、进程、线程、协程、GIL、锁、队列、管道、进程池、线程池这些知识点,会放到各自真正适合的位置讲。

你可以先记住一句话:

进程解决“资源隔离”和“多核计算”。
线程解决“同一个进程里并发等待 I/O”。
协程解决“用更轻量的方式组织大量 I/O 等待”。

一、先建立全局地图

img_6a275c233dd9230.webp

当你执行:

python main.py

本质上是启动了一个 Python 解释器进程。这个进程里至少有一个主线程,主线程从上到下执行你的 Python 代码。

看这张图时,重点抓住三层:

  • 进程由操作系统创建和管理。
  • 线程也由操作系统调度。
  • 协程通常由程序里的事件循环调度。

所以不能简单说“协程就是更小的线程”。它们的管理者不一样。

进程更像一个资源容器,里面有自己的内存、文件句柄、网络连接和环境变量。线程是这个容器里的执行单位。协程通常运行在线程内部,由事件循环决定什么时候暂停、什么时候恢复。

img_6a275c233dd9831.webp

并发和并行

并发不等于并行。

单核 CPU 可以并发,因为它能快速切换任务;但单核 CPU 不能真正并行执行多个任务。

多核 CPU 可以并行,但你的程序是否能利用多核,还取决于你使用的是进程、线程、协程,以及语言运行时的限制。

同步和异步

同步 / 异步描述的不是 CPU 有几个核心,而是任务之间怎么等待。

这四个词要分开:

并发 / 并行:任务如何被 CPU 执行。
同步 / 异步:任务之间如何组织等待关系。

这里要特别注意:CPU 核心数和执行速度,不会改变任务之间的逻辑依赖关系。如果任务 B 必须等任务 A 的结果,那么 CPU 再多,也不能让 B 在 A 没有结果时正确完成。

二、什么时候该用什么

img_6a275c233dd9b32.webp

先学会判断任务类型。你要问的第一件事不是“用哪个库”,而是“这个任务主要在算、在等,还是在传数据”。

如果任务主要在算,比如图片处理、视频转码、压缩、加密、大量数学计算,优先考虑多进程或进程池。因为这类任务真正消耗 CPU,需要尽量利用多核。

如果任务主要在等,比如等网络接口、等数据库、等文件读写,优先考虑线程池或协程。因为等待期间 CPU 没有真正忙起来,关键是不要让当前执行流傻等。

如果任务之间要传数据,比如一个任务生产订单,另一个任务消费订单,就要考虑 Queue、Pipe、Redis、消息队列这类通信工具。

更具体一点:

场景更常见选择原因
CPU 密集型多进程 / ProcessPoolExecutor多个进程可以更好利用多核
同步阻塞 I/O多线程 / ThreadPoolExecutor等待 I/O 时可以切换到其他线程
异步 I/O协程 / asyncio单线程内用事件循环管理大量等待任务
任务需要隔离多进程一个进程崩溃,不一定影响其他进程
任务需要共享内存多线程同一进程内线程共享内存,但要注意锁
后台任务要可靠重试Celery / RQ / 消息队列比本机线程/进程更适合生产环境任务系统

用一个更贴近底层原因的例子理解:

假设有 4 个任务,分别要做 4 道很难的数学题。

如果只安排 1 个人做,他要一题一题算:

人 A
  -> 算第 1 题
  -> 算第 2 题
  -> 算第 3 题
  -> 算第 4 题

这类任务的问题是:人一直在动脑,没有空闲时间。对应到程序里,就是 CPU 一直在计算。

如果你把这 4 道题交给 4 个线程,在 Python 里不一定能同时变快。因为同一个 Python 进程里,多个线程执行纯 Python 代码时,会受到 GIL 影响,很多时候不能真正同时跑多段 Python 计算。

更像这样:

同一个房间里有 4 个人
  -> 但只有 1 支笔
  -> 谁拿到笔谁算
  -> 其他人只能等这支笔

这支“笔”可以粗略理解成 GIL。线程多了,但真正执行 Python 计算的机会仍然被限制住了。所以 CPU 密集型任务更常见的选择是多进程:

4 个独立房间
  -> 每个房间都有自己的笔
  -> 每个人可以同时算自己的题

对应到程序里:

多个进程
  -> 各自有独立的 Python 解释器
  -> 各自有自己的 GIL
  -> 更容易利用多个 CPU 核心同时计算

这就是“主要在算,优先考虑多进程”的原因。

再看“主要在等”的任务。

假设有 4 个任务,分别是等 4 个接口返回结果:

任务 1:请求接口 A,等 3 秒
任务 2:请求接口 B,等 2 秒
任务 3:请求接口 C,等 5 秒
任务 4:请求接口 D,等 1 秒

这时人不是一直在动脑,大部分时间是在等别人回复。对应到程序里,就是线程发出网络请求后,大部分时间在等待 I/O。

如果只用一个线程,它会这样傻等:

线程 1
  -> 请求接口 A,等 3 秒
  -> 请求接口 B,等 2 秒
  -> 请求接口 C,等 5 秒
  -> 请求接口 D,等 1 秒

总时间大概就是 3 + 2 + 5 + 1 秒。

如果用多个线程,就像几个人分别去窗口排队:

线程 1 -> 等接口 A
线程 2 -> 等接口 B
线程 3 -> 等接口 C
线程 4 -> 等接口 D

这些线程大部分时间都没有占着 CPU 算东西,只是在等网络、数据库、磁盘返回结果。一个线程在等 I/O 时,操作系统可以切换到另一个线程继续执行。这样总时间更接近最慢的那个等待时间,而不是所有等待时间相加。

所以“等”适合线程,不是因为线程计算更快,而是因为:

等待 I/O 时,CPU 是空的
  -> 当前线程可以先让出去
  -> 其他线程可以继续推进别的任务

因此可以简单记:

算:
  CPU 一直被占着
  -> 多线程很难绕开 GIL
  -> 多进程更容易用上多核等:
  CPU 大部分时间没事做
  -> 线程等待 I/O 时可以切换
  -> 多线程能减少整体等待时间

理解了吗?

三、进程

进程是操作系统分配资源的基本单位。

一个正在运行的程序,背后通常对应一个或多个进程。比如浏览器是多进程架构,打开多个标签页时,浏览器可能创建多个渲染进程。Python 脚本运行时,也会有一个 Python 解释器进程。

进程的核心特点

  • 进程有独立内存空间。
  • 进程之间默认不共享普通变量。
  • 进程之间通信成本比线程高。
  • 一个进程崩溃,通常不会直接破坏另一个独立进程的内存。
  • 多进程更适合 CPU 密集型任务。

查看当前进程

import osprint(f"当前进程 pid: {os.getpid()}")
print(f"父进程 pid: {os.getppid()}")
print(f"CPU 核心数: {os.cpu_count()}")

pid 是当前进程编号,ppid 是父进程编号。os.getpid()os.getppid() 常用于确认代码到底运行在哪个进程里。

创建子进程

import os
import time
from multiprocessing import Process
def speak():
    for index in range(5):
        print(f"说话 {index}, pid={os.getpid()}")
        time.sleep(1)
def study():
    for index in range(5):
        print(f"学习 {index}, pid={os.getpid()}")
        time.sleep(1)
if __name__ == "__main__":
    print(f"主进程 pid={os.getpid()}")    # 创建 Process 对象时,只是描述“未来要创建一个子进程”。
    # 此时操作系统还没有真正启动子进程。
    p1 = Process(target=speak)
    p2 = Process(target=study)    # start() 才是真正向操作系统申请启动子进程。
    p1.start()
    p2.start()    print("主进程继续执行")

这里的重点:

  • Process(target=speak):指定子进程启动后执行哪个函数。
  • start():真正启动子进程。
  • 子进程启动后,操作系统会调度它运行。

Process 常见参数可以这样看:

Process(
    group=None,          # 保留参数,基本固定写 None,日常不用管
    target=handle_order, # 子进程启动后要执行的函数
    name="order-worker", # 进程名称,方便日志和调试;不传时 Python 自动生成
    args=(1001,),        # 传给 target 的位置参数,必须是元组或类似序列
    kwargs={"debug": True}, # 传给 target 的关键字参数,字典
    daemon=False,        # 是否为守护进程;必须在 start() 前设置
)

新手最常用的是 targetargs

p = Process(target=handle_order, args=(1001,))

如果参数很多,或者想让调用更清晰,可以用 kwargs

p = Process(
    target=handle_order,
    kwargs={
        "order_id": 1001,
        "user_name": "copyer",
    },
)

需要注意:多进程要把参数传给另一个 Python 进程,所以 target 和传入参数通常要能被 Python 序列化。普通函数、字符串、数字、列表、字典通常没问题;临时写在局部作用域里的函数、复杂连接对象、打开中的文件对象就容易出问题。

为什么必须写入口保护

多进程代码里经常看到:

if __name__ == "__main__":
    ...

这不是形式主义。

在 Windows 和 macOS 的某些启动方式下,创建子进程时,Python 会启动新的解释器进程,并重新导入当前 .py 文件。如果没有入口保护,子进程导入文件时又执行创建子进程的代码,就可能无限创建子进程。

所以多进程代码要养成习惯:

if __name__ == "__main__":
    p = Process(target=speak)
    p.start()

传参数

from multiprocessing import Process
def handle_order(order_id, user_name):
    print(f"处理订单: {order_id}, 用户: {user_name}")
if __name__ == "__main__":
    p = Process(
        target=handle_order,
        args=(1001, "copyer"),
    )    p.start()
    p.join()

args 必须是元组。只有一个参数时,要写成:

args=(1001,)

join:让当前进程等待子进程

import time
from multiprocessing import Process
def work(name, seconds):
    print(f"{name} 开始")
    time.sleep(seconds)
    print(f"{name} 结束")
if __name__ == "__main__":
    p1 = Process(target=work, args=("任务 A", 2))
    p2 = Process(target=work, args=("任务 B", 3))    p1.start()
    p2.start()    # join 不是让 p1 等。
    # join 是让执行这行代码的主进程等待 p1。
    p1.join()
    p2.join()    print("两个子进程都结束了")

这里有一个很容易说错的点:p.join() 不是让进程 p 等,而是让“执行 join 这行代码的进程”等。

如果你这样写:

p1.start()
p1.join()
p2.start()
p2.join()

效果就是 p1 完成后才启动 p2,并发效果就没了。

更常见写法是:

p1.start()
p2.start()p1.join()
p2.join()

terminate:强制终止子进程

import time
from multiprocessing import Process
def long_task():
    while True:
        print("任务执行中")
        time.sleep(1)
if __name__ == "__main__":
    p = Process(target=long_task)
    p.start()    time.sleep(3)    # 强制终止。
    # 生产环境要谨慎,因为可能导致文件、锁、数据库连接没有正常释放。
    p.terminate()
    p.join()    print("子进程已经终止")

terminate() 是暴力中止,不是优雅退出。实际开发里,如果任务涉及写文件、写数据库、释放锁,要优先设计“退出信号”,让任务自己收尾。

守护进程

守护进程依附于主进程。主进程结束时,守护进程会跟着结束。

import time
from multiprocessing import Process
def monitor():
    while True:
        print("后台监控中")
        time.sleep(1)
if __name__ == "__main__":
    p = Process(target=monitor)    # 必须在 start() 之前设置。
    p.daemon = True
    p.start()    time.sleep(3)
    print("主进程结束")

守护进程适合后台陪跑任务,例如简单监控、日志采样。不适合必须完整执行的核心业务,因为主进程一结束,它可能来不及收尾。

进程之间默认不共享变量

from multiprocessing import Processcount = 0
def add():
    global count
    count += 1
    print(f"子进程里的 count: {count}")
if __name__ == "__main__":
    p1 = Process(target=add)
    p2 = Process(target=add)    p1.start()
    p2.start()
    p1.join()
    p2.join()    print(f"主进程里的 count: {count}")

你可能以为主进程最后打印 2,但它通常还是 0

原因是:

主进程有自己的 count。
子进程 p1 有自己的 count 副本。
子进程 p2 有自己的 count 副本。

它们不是同一块内存。

如果进程之间要交换数据,要使用通信机制。

四、进程通信

进程之间不共享普通变量,所以需要专门的通信方式。

Queue

Queue 是队列,特点是先进先出。

putgetemptyfullput_nowaitget_nowait 都属于队列操作。

import time
from multiprocessing import Process, Queue
def producer(queue):
    for order_id in range(1, 6):
        print(f"生产订单: {order_id}")        # put:把数据放进队列。
        # 如果队列满了,默认会等待。
        queue.put(order_id)        time.sleep(0.5)    # 用 None 作为结束标记。
    queue.put(None)
def consumer(queue):
    while True:
        # get:从队列取数据。
        # 如果队列为空,默认会等待。
        order_id = queue.get()        if order_id is None:
            print("消费者收到结束标记")
            break        print(f"消费订单: {order_id}")
        time.sleep(1)
if __name__ == "__main__":
    queue = Queue()    p1 = Process(target=producer, args=(queue,))
    p2 = Process(target=consumer, args=(queue,))    p1.start()
    p2.start()    p1.join()
    p2.join()

队列常见等待行为:

from multiprocessing import Queuequeue = Queue(3)queue.put(1)
queue.put(2)
queue.put(3)# 队列满了,继续 put 会等待。
# queue.put(4)# 最多等 2 秒。
# queue.put(4, timeout=2)# 不等待,放不进去就抛异常。
# queue.put_nowait(4)

读取也是类似:

# 队列为空时,默认等待。
# value = queue.get()# 最多等 2 秒。
# value = queue.get(timeout=2)# 不等待,没有数据就抛异常。
# value = queue.get_nowait()

实际开发中,不建议用 empty()full() 做严肃业务判断,因为多进程环境下,刚判断完状态就可能被其他进程改变。

Pipe

Pipe 是管道,适合两个进程之间点对点通信。

import time
from multiprocessing import Pipe, Process
def receiver(conn):
    print("接收方等待数据")    # recv 会阻塞,直到另一端 send 数据。
    data = conn.recv()    print(f"接收方收到: {data}")
    conn.close()
def sender(conn):
    time.sleep(2)    # send 把数据发送到管道另一端。
    conn.send({"type": "ORDER_DONE", "order_id": 1001})    print("发送方发送完成")
    conn.close()
if __name__ == "__main__":
	# conn1 和 conn2 既可以是发送方,也可以是接收方,所以是双向的
    conn1, conn2 = Pipe()    p1 = Process(target=receiver, args=(conn1,))
    p2 = Process(target=sender, args=(conn2,))    p1.start()
    p2.start()    p1.join()
    p2.join()

默认 Pipe() 是双向的。如果只需要单向通信,可以写:

recv_conn, send_conn = Pipe(duplex=False)

Lock

进程之间虽然不共享普通变量,但可能同时操作同一个外部资源,比如同一个文件、同一个终端输出、同一条数据库记录。

这时需要锁。

这里就是需要使用 lock, 进入关键代码前手动 acquire(),执行完后手动 release()

import time
from multiprocessing import Lock, Process
def write_log(lock, name):
    for index in range(3):
        # acquire 表示“我要开始独占这段代码了”。
        # 如果另一个进程已经拿到锁,这里会等待。
        lock.acquire()        print(f"{name}: 开始写第 {index} 行", end=" ")
        time.sleep(0.2)
        print("写入中", end=" ")
        time.sleep(0.2)
        print("写完")        # release 表示“我用完了,其他进程可以进来了”。
        lock.release()
if __name__ == "__main__":
    lock = Lock()    p1 = Process(target=write_log, args=(lock, "进程 A"))
    p2 = Process(target=write_log, args=(lock, "进程 B"))    p1.start()
    p2.start()    p1.join()
    p2.join()

这段代码能工作,但它有一个明显缺点:acquire()release() 必须成对出现。中间代码一旦报错,release() 可能执行不到,锁就一直不释放,其他进程会一直等。

比如这种写法就有风险:

lock.acquire()
do_something()
lock.release()

如果 do_something() 抛异常,lock.release() 就可能不会执行。

所以更推荐写成 with lock

import time
from multiprocessing import Lock, Processdef write_log(lock, name):
    for index in range(3):
        # with lock 等价于:
        # 进入代码块时自动 acquire。
        # 离开代码块时自动 release。
        # 即使中途抛异常,也会尽量自动释放锁。
        with lock:
            print(f"{name}: 开始写第 {index} 行", end=" ")
            time.sleep(0.2)
            print("写入中", end=" ")
            time.sleep(0.2)
            print("写完")

五、进程池

如果有 100 个任务,不应该手动创建 100 个进程。

创建进程有成本,进程太多也会增加操作系统调度压力。进程池就是提前准备固定数量的 worker 进程,然后把任务分配给它们。

先看整体流程:

创建进程池
  -> 提交任务
  -> 进程池把任务分给 worker 进程
  -> 取回结果
  -> 关闭进程池

可以把进程池的 API 可以分成三层:

第一层:进程池生命周期  with ProcessPoolExecutor(max_workers=3) as executor:
      ...  # ProcessPoolExecutor:创建进程池
  # max_workers:最多同时有几个 worker 进程干活
  # with:代码块结束时自动 shutdown(wait=True)
  # shutdown:不用 with 时才需要手动写
第二层:提交任务的两条路线  路线 A:executor.map(func, iterable)
    -> 一次提交一批任务
    -> 适合“同一个函数处理一批数据”
    -> 返回结果顺序和输入顺序一致
    -> 更像并发版本的 for 循环  路线 B:future = executor.submit(func, *args)
    -> 一次提交一个任务
    -> 立刻返回 Future
    -> Future 是“未来会有结果的任务凭证”
第三层:处理 Future 的几种方式  future.result()
    -> 主动等待这个任务完成
    -> 拿这个任务的返回值  future.add_done_callback(fn)
    -> 不主动等
    -> 任务完成后自动调用 fn(future)  as_completed(futures)
    -> 传入一组 Future
    -> 谁先完成,就先处理谁

map 路线:批量提交,按输入顺序取结果

map 适合这种场景:一个函数,要处理一批数据。它的返回结果顺序和输入顺序一致。

就比如一个偏计算的例子:统计几个大数字分别有多少个因子。这个过程一直在循环取余,CPU 会持续忙着算。

import os
from concurrent.futures import ProcessPoolExecutor
def count_factors(number):
    count = 0    # 这段循环没有在等网络、等数据库、等文件。
    # 它一直在做取余计算,所以属于偏 CPU 密集型。
    for value in range(1, number + 1):
        if number % value == 0:
            count += 1    return {
        "number": number,
        "factor_count": count,
        "pid": os.getpid(),
    }
if __name__ == "__main__":
    numbers = [120_000, 120_001, 120_002, 120_003, 120_004, 120_005]    with ProcessPoolExecutor(max_workers=3) as executor:
        # 最多同时使用 3 个 worker 进程。
        # 每个 worker 进程都有自己的 pid。
        # 对 CPU 计算来说,多进程更容易用上多个 CPU 核心。
        results = executor.map(count_factors, numbers)        print(list(results))

max_workers=3 表示最多同时有 3 个 worker 进程执行任务。

submit + Future

submitmap 不一样。

map
  -> 一次提交一批任务
  -> 更像并发版本的 for 循环submit
  -> 一次提交一个任务
  -> 立刻返回 Future

Future 可以先理解成“未来会有结果的任务凭证”:

executor.submit(...)
  -> 任务已经交给进程池
  -> 现在先拿到 Future
  -> 之后用 future.result() 取结果
from concurrent.futures import ProcessPoolExecutor
def square(n):
    return n * n
if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=3) as executor:
        # submit 只负责提交任务,不会马上给你最终结果。
        future = executor.submit(square, 10)        # result 会等待任务完成,并拿到返回值。
        result = future.result()        print(result)

add_done_callback:任务完成后自动回调

如果你不想只靠 future.result() 主动等待,也可以给 Future 添加完成回调。

from concurrent.futures import ProcessPoolExecutor
def square(n):
    return n * n
def handle_done(future):
    # 回调函数会收到 future 自己。
    # 任务正常完成时,可以用 result() 拿返回值。
    print(f"任务完成,结果是: {future.result()}")
if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=3) as executor:
        future = executor.submit(square, 10)        # add_done_callback 不会立刻执行 handle_done。
        # 它只是登记一个回调:等 future 完成后再调用。
        future.add_done_callback(handle_done)

回调函数必须能接收一个 future 参数。如果任务本身抛异常,在回调里调用 future.result() 时也会重新抛出这个异常。

as_completed:按完成顺序取结果

as_completed 适合这种场景:你提交了多个任务,不关心输入顺序,只想谁先完成就先处理谁。

它和 submit 是一组常见搭配:

submit 多个任务
  -> 得到多个 Future
  -> as_completed(futures)
  -> 谁先完成,先返回谁

下面这段代码只演示 as_completed 的取结果顺序。

import time
from concurrent.futures import ProcessPoolExecutor, as_completed
def work(seconds):
    time.sleep(seconds)
    return f"睡了 {seconds} 秒"
if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=3) as executor:
        futures = [
            executor.submit(work, 3),
            executor.submit(work, 1),
            executor.submit(work, 2),
        ]        # 谁先完成,先处理谁。
        for future in as_completed(futures):
            print(future.result())

map 注重输入顺序,as_completed 注重完成顺序。

shutdown:关闭进程池

进程池用完后要关闭,否则 worker 进程和相关资源可能不能及时释放。

日常最推荐用 with,因为它会自动调用 shutdown(wait=True)

from concurrent.futures import ProcessPoolExecutor
def work(item):
    return item * 2
if __name__ == "__main__":
    # 进入 with:创建进程池。
    with ProcessPoolExecutor(max_workers=3) as executor:
        results = executor.map(work, [1, 2, 3, 4])
        print(list(results))    # 离开 with:自动 shutdown(wait=True)。

如果不用 with,就要自己调用 shutdown()

from concurrent.futures import ProcessPoolExecutor
def work(item):
    return item * 2
if __name__ == "__main__":
    executor = ProcessPoolExecutor(max_workers=3)    try:
        results = executor.map(work, [1, 2, 3, 4])
        print(list(results))
    finally:
        # 手动关闭进程池。
        # wait=True 表示等待已经提交的任务执行完再真正退出。
        executor.shutdown(wait=True)

进程池关闭后,不能再继续 submitmap 新任务。

六、线程

线程是进程内部的执行单元。一个进程至少有一个主线程,也可以创建多个子线程。

线程和进程最大的区别之一是:同一进程内的线程共享内存。

进程 A
├── 线程 1
├── 线程 2
└── 线程 3这三个线程可以访问进程 A 的同一批对象和变量。

这带来两个结果:

  • 共享数据方便。
  • 多线程同时改同一份数据时,需要考虑锁。

新手可以先把线程理解成“同一个办公室里的多个办事员”。他们共用一张桌子上的资料,所以传数据方便;但如果同时改同一张表,就容易把数据改乱。

在语法层面上,基本跟进程差不多。

创建线程

import os
import time
from threading import Thread, get_native_id
def speak():
    for index in range(5):
        print(
            f"说话 {index}, "
            f"pid={os.getpid()}, "
            f"thread={get_native_id()}"
        )
        time.sleep(1)
def study():
    for index in range(5):
        print(
            f"学习 {index}, "
            f"pid={os.getpid()}, "
            f"thread={get_native_id()}"
        )
        time.sleep(1)
if __name__ == "__main__":
    print(f"主线程 pid={os.getpid()}, thread={get_native_id()}")    t1 = Thread(target=speak)
    t2 = Thread(target=study)    t1.start()
    t2.start()    t1.join()
    t2.join()    print("两个线程执行完")

运行后你会发现:

pid 通常相同。
thread id 不同。

这说明它们属于同一个进程里的不同线程。

Thread 常见参数和 Process 很像:

Thread(
    group=None,          # 保留参数,基本固定写 None,日常不用管
    target=fetch_user,   # 子线程启动后要执行的函数
    name="fetch-worker", # 线程名称,方便日志和调试;不传时 Python 自动生成
    args=(1001,),        # 传给 target 的位置参数,元组
    kwargs={"debug": True}, # 传给 target 的关键字参数,字典
    daemon=False,        # 是否为守护线程;必须在 start() 前设置
)

新手最常用的还是 targetargs

t = Thread(target=fetch_user, args=(1001,))

如果想在日志里区分线程,可以设置 name

t = Thread(
    target=fetch_user,
    args=(1001,),
    name="fetch-user-1001",
)

daemon=True 表示守护线程。主程序结束时,守护线程不会阻止程序退出。它适合一些后台辅助任务,但不适合必须完整保存数据、写文件、提交数据库的任务。

线程锁

因为线程共享内存,所以多个线程可能同时修改同一个变量。

from threading import Lock, Threadcount = 0
lock = Lock()
def add_many_times():
    global count    for _ in range(100_000):
        # 同一时刻只允许一个线程进入这个代码块。
        with lock:
            count += 1
if __name__ == "__main__":
    t1 = Thread(target=add_many_times)
    t2 = Thread(target=add_many_times)    t1.start()
    t2.start()    t1.join()
    t2.join()    print(count)

锁保护的是临界区,也就是不能被多个线程同时执行的那段关键代码。

LockRLock 的区别:

Lock
  -> 普通锁
  -> 同一个线程拿到锁后,不能再次拿同一把锁
  -> 如果重复 acquire,自己也会把自己卡住RLock
  -> 可重入锁
  -> 同一个线程拿到锁后,可以再次拿同一把锁
  -> acquire 几次,就要 release 几次

先看 Lock 容易卡住的情况:

from threading import Locklock = Lock()
def inner():
    # outer 已经拿着 lock。
    # 这里再次获取同一把普通 Lock,会一直等自己释放自己。
    with lock:
        print("inner")
def outer():
    with lock:
        print("outer")
        inner()
outer()

这段代码的问题不是有两个线程抢锁,而是同一个线程重复进入了同一把普通锁。

这种情况下可以用 RLock

from threading import RLocklock = RLock()
def inner():
    # RLock 允许同一个线程重复拿同一把锁。
    with lock:
        print("inner")
def outer():
    with lock:
        print("outer")
        inner()
outer()

但不要一上来就把所有 Lock 都换成 RLock。大多数场景用 Lock 更直接;只有当“同一个线程可能在函数嵌套调用里重复进入同一把锁”时,才考虑 RLock

multiprocessing 里也有 LockRLock,核心区别类似:Lock 不可重入,RLock 可重入。

线程池

和进程池的用法基本一致。

import time
from concurrent.futures import ThreadPoolExecutor, as_completed
def fetch_user(user_id):
    print(f"开始请求用户 {user_id}")    # 模拟网络 I/O 等待。
    # 真实项目里这里可能是 requests.get(...)、数据库查询、读文件等。
    # 等待期间 CPU 没有一直忙着计算,所以适合用线程池并发等待。
    time.sleep(1)    return {"id": user_id, "name": f"user-{user_id}"}
if __name__ == "__main__":
    user_ids = [1, 2, 3, 4, 5]    with ThreadPoolExecutor(max_workers=3) as executor:
        # 把 5 个用户查询任务交给线程池。
        # max_workers=3 表示同一时间最多 3 个线程在处理。
        futures = [
            executor.submit(fetch_user, user_id)
            for user_id in user_ids
        ]        # as_completed 会按“谁先完成”返回结果。
        # 这适合接口请求:哪个接口先回来,就先处理哪个结果。
        for future in as_completed(futures):
            print(future.result())

这个场景是 I/O 密集型:线程大部分时间在等接口返回,不是在持续占用 CPU。

七、GIL

先来看两个案例:

先看“下载文件”。

假设有 3 个图片要下载:

图片 A:等服务器返回 3 秒
图片 B:等服务器返回 2 秒
图片 C:等服务器返回 1 秒

如果只有 1 个线程,它大概会这样:

线程 1
  -> 下载 A,等 3 秒
  -> 下载 B,等 2 秒
  -> 下载 C,等 1 秒总时间接近 3 + 2 + 1 秒

如果用 3 个线程,它大概会这样:

线程 1 -> 下载 A,等待服务器
线程 2 -> 下载 B,等待服务器
线程 3 -> 下载 C,等待服务器总时间更接近最慢的那个任务,也就是 3 秒

这里多线程有用,是因为线程大部分时间都在“等别人”。等待网络、数据库、磁盘时,CPU 没有一直忙着执行 Python 代码。一个线程在等,程序就可以切到另一个线程继续推进。

再看“做计算”。

假设有 3 个很重的计算任务:

任务 A:一直循环计算
任务 B:一直循环计算
任务 C:一直循环计算

这些任务不是在等别人,而是一直需要 CPU 执行 Python 代码。如果你把它们放到同一个 Python 进程的多个线程里,大概是这种感觉:

一个 Python 进程
  线程 1:想执行 Python 计算
  线程 2:想执行 Python 计算
  线程 3:想执行 Python 计算但同一时刻通常只有一个线程能真正执行 Python 字节码。

这就是 GIL 要解释的现象。

GIL 是 Global Interpreter Lock,可以先理解成“同一个 Python 进程里执行 Python 代码的通行证”。一个线程想执行 Python 字节码,通常要先拿到这张通行证。

同一个 Python 进程里:线程 1 拿到 GIL
  -> 线程 1 执行 Python 代码线程 2 没拿到 GIL
  -> 线程 2 先等线程 3 没拿到 GIL
  -> 线程 3 先等

所以对于纯 Python CPU 计算,多线程经常不是“3 个线程同时在 3 个 CPU 核心上疯狂计算”。更常见的是:多个线程轮流拿到执行 Python 代码的机会。

这就是为什么前面一直说:

任务主要在等
  -> 多线程有意义
  -> 因为等待期间可以切到别的线程任务主要在算
  -> 多线程不一定能加速
  -> 因为同一个进程里会受到 GIL 限制

那为什么“算”更常用多进程?

因为多进程不是在同一个 Python 进程里抢同一把 GIL。每个进程都有自己的 Python 解释器,也有自己的 GIL。

进程 A
  -> 自己的 Python 解释器
  -> 自己的 GIL
  -> 可以在一个 CPU 核心上算进程 B
  -> 自己的 Python 解释器
  -> 自己的 GIL
  -> 可以在另一个 CPU 核心上算

所以对新手来说,可以先这样记:

下载、请求接口、查数据库、读写文件
  -> 大量时间在等
  -> 线程有用循环计算、图片压缩、视频处理、加密压缩
  -> 大量时间在算
  -> 多进程更常见

这里说的是普通 CPython,也就是最常用的 Python 解释器实现。以后你可能会看到“free-threaded Python”“某些 C 扩展释放 GIL”这类说法,那是更进阶的情况。刚开始先按上面的判断就够了。

最后注意:GIL 和你自己写的 Lock / RLock 不是一回事。

GIL
  -> Python 解释器内部的锁
  -> 主要影响同一进程里多个线程执行 Python 字节码
  -> 程序员通常不手动控制Lock / RLock
  -> 你自己创建的业务锁
  -> 用来保护库存、余额、文件写入、共享变量这类业务数据
  -> 由你决定锁哪里、什么时候释放

所以不要因为有 GIL 就不加业务锁。GIL 不会替你保证库存、余额、文件写入这些业务数据一定正确。

八、协程

协程这一块最容易误解成“和线程差不多,都是等的时候切换”。这个感觉没有错,但还不够准确。

线程和协程都能在等待时推进别的任务,区别在于谁负责切换:

线程:
  -> 由操作系统调度
  -> 每个线程都有自己的执行栈
  -> 线程多了,创建、切换、内存成本都会上来协程:
  -> 由程序里的事件循环调度
  -> 通常在一个线程里运行很多个任务
  -> 只有遇到 await,当前任务才主动让出执行权

所以协程的实际意义不是“发明了一种新的等待”,而是:

用更少的线程,管理大量正在等待的 I/O 任务。

把它想成一个窗口工作人员。

普通同步代码像这样:

工作人员只处理订单 A
  -> A 要等接口返回
  -> 工作人员站着等 A
  -> A 回来后,才处理订单 B
  -> B 回来后,才处理订单 C

协程像这样:

工作人员处理订单 A
  -> A 要等接口返回
  -> A 在 await 这里暂停
  -> 工作人员先去处理订单 B订单 B 也要等
  -> B 在 await 这里暂停
  -> 工作人员再去处理订单 C哪个订单的接口先回来
  -> 工作人员再回到那个订单继续处理

这里的“工作人员”就是事件循环,await 就是任务主动说:“我现在要等外部结果,你先去处理别的任务。”

这和线程的区别是:

线程模型:
  多个工作人员
  每个人等一个窗口协程模型:
  一个工作人员
  同时跟进很多张等待中的单子
  只在 await 这些明确的等待点切换

所以协程适合这种任务:

请求很多接口
查很多数据库
维护很多连接
等待很多文件或网络结果

它不适合用来加速纯 CPU 计算。因为 CPU 计算没有在等外部结果,也就没有多少机会通过 await 把执行权让出去。

什么时候用线程,什么时候用协程

先不要只看“任务多不多”,更关键的是看你调用的库是什么类型。

用线程:
  -> 你调用的是同步阻塞库
  -> 例如 requests、普通文件读写、同步数据库驱动
  -> 这些库没有 await 写法
  -> 把它们放到线程里等,主流程就不用一直卡着用协程:
  -> 你调用的是异步库
  -> 例如 aiohttp、httpx.AsyncClient、异步数据库驱动
  -> 这些库本身支持 await
  -> 一个事件循环就能管理大量等待任务

看两个最常见的写法对比:

# 同步库:没有 await,只能阻塞等待。
# 这种任务如果要并发处理,通常交给线程池。
def fetch_by_thread(url):
    response = requests.get(url)
    return response.text
# 异步库:请求时可以 await。
# 当前协程等待网络结果时,事件循环可以去调度其他协程。
async def fetch_by_coroutine(client, url):
    response = await client.get(url)
    return response.text

不要为了用协程而用协程。如果项目里全是同步阻塞库,把代码硬改成 async def,但里面没有真正的 await 等待点,事件循环还是会被卡住。

先认识四个核心动作

async def
  -> 定义协程函数
  -> 它描述“这个任务可以暂停和恢复”协程函数()
  -> 只会得到一个协程对象
  -> 不会立刻执行函数体asyncio.run(main())
  -> 启动事件循环
  -> 让 asyncio 开始调度协程await
  -> 当前协程遇到等待点
  -> 暂停当前协程
  -> 把执行权交回事件循环
  -> 等待完成后,再从暂停的位置继续

最小例子:先跑通一个协程

下面这个例子只能帮你认识语法。它只有一个任务,所以还体现不出协程的价值。

import asyncio
async def work():
    print("work 开始")    # asyncio.sleep 是异步等待。
    # 当前协程会暂停 1 秒,但事件循环没有被卡死。
    await asyncio.sleep(1)    print("work 结束")
    return "工作结果"
async def main():
    # await work() 会执行 work。
    # 当 work 内部 await 等待时,main 也会暂停等待结果。
    result = await work()
    print(result)
# asyncio.run 是入口。
# 没有它,协程不会真正跑起来。
asyncio.run(main())

简单理解:

async def work()
  -> 定义协程函数work()
  -> 得到协程对象,不是立刻执行await work()
  -> 执行并等待这个协程完成asyncio.run(main())
  -> 启动事件循环,运行入口协程

对比:顺序等待和协程并发等待

先看顺序等待。三个接口一个个请求:

import asyncio
async def request_api(name, seconds):
    print(f"{name} 开始请求")    # 用 sleep 模拟接口等待。
    # 真实项目里这里通常是异步 HTTP 请求、异步数据库查询等。
    await asyncio.sleep(seconds)    print(f"{name} 请求完成")
    return f"{name} 的响应"
async def main():
    # 这里是顺序等待。
    # 接口 A 完成后,才会开始接口 B。
    # 接口 B 完成后,才会开始接口 C。
    result1 = await request_api("接口 A", 2)
    result2 = await request_api("接口 B", 1)
    result3 = await request_api("接口 C", 3)    print(result1, result2, result3)
asyncio.run(main())

这段代码大概要等:

接口 A 2 秒
  + 接口 B 1 秒
  + 接口 C 3 秒
  = 接近 6 秒

现在改成协程并发等待:

import asyncio
async def request_api(name, seconds):
    print(f"{name} 开始请求")
    await asyncio.sleep(seconds)
    print(f"{name} 请求完成")
    return f"{name} 的响应"
async def main():
    # create_task 会把协程包装成 Task,并交给事件循环调度。
    # 这三行执行后,A、B、C 都开始等待。
    task1 = asyncio.create_task(request_api("接口 A", 2))
    task2 = asyncio.create_task(request_api("接口 B", 1))
    task3 = asyncio.create_task(request_api("接口 C", 3))    # await task 不是重新执行任务。
    # 它只是等待这个已经启动的任务完成,并拿到结果。
    result1 = await task1
    result2 = await task2
    result3 = await task3    print(result1, result2, result3)
asyncio.run(main())

这段代码大概要等:

接口 A、B、C 同时开始等待
  -> B 大约 1 秒后回来
  -> A 大约 2 秒后回来
  -> C 大约 3 秒后回来总时间更接近最慢的 C,也就是 3 秒

这就是协程的意义:不是把某一个接口变快,而是在一个接口等待时,先去推进其他接口。

gather:同时等一组任务,按传入顺序拿结果

如果你的需求是“这几个任务都跑起来,最后一起拿结果”,gather 更简洁。

import asyncio
async def request_api(name, seconds):
    await asyncio.sleep(seconds)
    return f"{name} 完成"
async def main():
    results = await asyncio.gather(
        request_api("接口 A", 2),
        request_api("接口 B", 1),
        request_api("接口 C", 3),
    )    # gather 返回结果的顺序,和传入顺序一致。
    print(results)
asyncio.run(main())

create_taskgather 可以这样区分:

create_task
  -> 先把一个协程启动成任务
  -> 你后面可以单独 await 它gather
  -> 同时等待一组协程或任务
  -> 最后一次性拿到结果列表

as_completed:谁先完成,先处理谁

如果你不关心传入顺序,而是想接口谁先回来就先处理谁,用 as_completed

import asyncio
async def request_api(name, seconds):
    await asyncio.sleep(seconds)
    return f"{name} 完成"
async def main():
    tasks = [
        asyncio.create_task(request_api("接口 A", 2)),
        asyncio.create_task(request_api("接口 B", 1)),
        asyncio.create_task(request_api("接口 C", 3)),
    ]    # 谁先完成,as_completed 就先给谁。
    for task in asyncio.as_completed(tasks):
        result = await task
        print(result)
asyncio.run(main())

简单对比:

gather
  -> 等全部完成
  -> 按传入顺序返回结果as_completed
  -> 谁先完成先处理谁
  -> 适合“结果一回来就要处理”的场景

不要在协程里写阻塞代码

协程能切换的前提是:是在 async def 代码中写的是异步等待,而不是普通的阻塞代码

错误示范:

import time
async def bad_work():
    # time.sleep 是同步阻塞等待。
    # 它会卡住整个事件循环。
    time.sleep(3)

应该写:

import asyncio
async def good_work():
    # asyncio.sleep 是异步等待。
    # 当前协程暂停时,事件循环还能调度其他协程。
    await asyncio.sleep(3)

网络请求也是一样。requests 是同步阻塞库,不适合直接放在 async def 里大量调用。异步 HTTP 通常使用 httpx.AsyncClientaiohttp

限制并发数量

协程适合大量 I/O,但不能无限制地同时发起任务。比如一次性发起 5000 个请求,可能压垮自己,也可能压垮对方服务。

这时用 Semaphore 控制同一时间最多有多少个协程进入关键区域。

import asyncio
async def fetch(index):
    await asyncio.sleep(1)
    return f"第 {index} 个请求完成"
async def limited_fetch(semaphore, index):
    # async with semaphore 表示:
    # 同一时刻最多允许固定数量的协程进入这里。
    async with semaphore:
        return await fetch(index)
async def main():
    # 最多同时执行 5 个请求。
    semaphore = asyncio.Semaphore(5)    tasks = [
        limited_fetch(semaphore, index)
        for index in range(20)
    ]    results = await asyncio.gather(*tasks)
    print(results)
asyncio.run(main())

Queue:协程之间传递任务

如果一个协程负责生产任务,另一个协程负责处理任务,可以用 asyncio.Queue

producer
  -> 把订单放进 queueconsumer
  -> 从 queue 里取订单处理

示例:

import asyncio
async def producer(queue):
    for order_id in range(1, 6):
        print(f"生产订单: {order_id}")        # put 是异步的。
        # 如果队列满了,producer 会在这里等待。
        await queue.put(order_id)
        await asyncio.sleep(0.2)    # 用 None 当结束信号。
    await queue.put(None)
async def consumer(queue):
    while True:
        # get 是异步的。
        # 如果队列为空,consumer 会在这里等待。
        order_id = await queue.get()        if order_id is None:
            print("消费者结束")
            queue.task_done()
            break        print(f"处理订单: {order_id}")
        await asyncio.sleep(0.5)        # 告诉队列:刚才 get 出来的任务处理完了。
        queue.task_done()
async def main():
    queue = asyncio.Queue()    await asyncio.gather(
        producer(queue),
        consumer(queue),
    )
asyncio.run(main())

协程里的 asyncio.Queue 和多进程的 multiprocessing.Queue 不是同一个东西,但它们都可以表达“生产者消费者”模型。

九、常见坑

把并发当成并行

协程可以高并发,但不是多核并行计算。纯 CPU 计算放进协程里,不会神奇变快。

在 async 函数里写阻塞代码

async def bad():
    time.sleep(3)

这会卡住事件循环。

多进程忘记入口保护

if __name__ == "__main__":
    ...

多进程程序尤其要写。

以为进程之间共享普通变量

普通变量不会自动跨进程共享。要通信就用 Queue、Pipe、数据库、Redis、消息队列等。

锁拿了不释放

lock.acquire()
do_something()
lock.release()

如果 do_something() 抛异常,release() 可能执行不到。更推荐:

with lock:
    do_something()

线程越多越快

线程太多会增加调度成本、内存压力和上下文切换。线程池、进程池都要设置合理的 max_workers

没有限制协程并发

一次性发起几千个请求,可能压垮自己或对方服务。用 Semaphore、连接池、限流策略控制压力。

十、最后怎么记

进程:
  操作系统分配资源的基本单位。
  内存隔离,通信成本高。
  适合 CPU 密集型、隔离要求高的任务。线程:
  操作系统调度 CPU 的基本单位。
  同一进程内线程共享内存。
  适合同步阻塞 I/O,但共享数据要加锁。协程:
  用户态的可暂停、可恢复执行单元。
  由事件循环调度,创建和切换成本低。
  适合高并发 I/O,前提是使用异步库。并发:
  一段时间内多个任务都在推进。并行:
  同一时刻多个任务真的在不同 CPU 核心执行。同步:
  发起任务后,等它完成再继续。异步:
  发起任务后,不必一直等在原地,完成后再取结果。

真正写代码时,优先按这个顺序思考:

任务主要是在算,还是在等?
任务之间有没有依赖?
任务之间要不要共享数据?
任务数量会不会很多?
失败后要不要重试?
是否需要跨机器执行?

想清楚这些,再选择进程、线程、协程,才不会被 API 牵着走。

相关文章

精彩推荐