# Summary ## 进程/线程/协程 - **进程**:计算机系统分配资源的最小单位,操作系统调度 - **线程**:CPU调度的基本单位,操作系统调度 - **协程**:轻量级线程,用户程序调度 # Cues # Notes # 15分钟简单了解Python并发编程 *时间:20250625* --- ## 1. 前置知识 ### 进程/线程/协程 - **进程**:计算机系统分配资源的最小单位,操作系统调度 - **线程**:CPU调度的基本单位,操作系统调度 - **协程**:轻量级线程,用户程序调度 ### 并发 VS 并行 - **并发**:同一时间段 - **并行**:同一时间(多核CPU,多处理器系统,多机并行) --- ## 2. 多线程 ### 2.1 实现方式 Python多线程实现方式主要有3种 #### 2.1.1 实例化Thread并传入目标函数 ```python import time import threading def worker(sleep_time: int): print(f"线程{threading.current_thread().name}开始执行") time.sleep(sleep_time) print(f"线程{threading.current_thread().name}执行完成") # 创建两个线程 t1 = threading.Thread(target=worker, args=(1,)) t2 = threading.Thread(target=worker, args=(2,)) # 启动线程 t1.start() t2.start() # 等待线程执行完毕 t1.join() t2.join() ``` > **output** > ```Java > 线程Thread-1 (worker)开始执行 > 线程Thread-2 (worker)开始执行 > 线程Thread-1 (worker)执行完成 > 线程Thread-2 (worker)执行完成 > ``` #### 2.1.2 继承Thread并重写run方法 ```python import time import threading class MyThread(threading.Thread): def __init__(self, sleep_time: int): super().__init__() self.sleep_time = sleep_time def run(self): print(f"线程{threading.current_thread().name}开始执行") time.sleep(self.sleep_time) print(f"线程{threading.current_thread().name}执行完成") # 创建两个线程 t1 = MyThread(1) t2 = MyThread(2) # 启动线程 t1.start() t2.start() # 等待线程执行完毕 t1.join() t2.join() ``` > **output** > ```Java > 线程Thread-1开始执行 > 线程Thread-2开始执行 > 线程Thread-1执行完成 > 线程Thread-2执行完成 > ``` #### 2.1.3 使用ThreadPoolExecutor来实现线程池 `concurrent.futures`模块中实现了线程池,`feature`对象可以方便的获取方法的返回值 ```python import concurrent.futures import time import threading def worker(sleep_time: int): print(f"线程{threading.current_thread().name}开始执行") time.sleep(sleep_time) print(f"线程{threading.current_thread().name}执行完成") return sleep_time # 创建线程池,最大线程数为3 with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor: # 提交任务 future_to_num = {executor.submit(worker, num): num for num in range(3)} for future in concurrent.futures.as_completed(future_to_num): num = future_to_num[future] try: result = future.result() except Exception as e: print(f"任务 {num} 执行失败: {e}") else: print(f"任务 {num} 结果: {result}") ``` > **output** > ```Java > 线程ThreadPoolExecutor-0_0开始执行 > 线程ThreadPoolExecutor-0_1开始执行 > 线程ThreadPoolExecutor-0_0执行完成 > 任务 0 结果: 0 > 线程ThreadPoolExecutor-0_0开始执行 > 线程ThreadPoolExecutor-0_1执行完成 > 任务 2 结果: 2 > 线程ThreadPoolExecutor-0_0执行完成 > 任务 1 结果: 1 > ``` > *注:原文输出有误,根据代码逻辑修正。* > 和Java多线程的API类似,继承Thread,实现runable接口,ExecutorService 创建线程池 ### 2.2 多核CPU下的多线程 **问题:在多核CPU上分别执行下面代码,结果如何?** **Python多线程** ```python import threading import time def count_down(n): while n > 0: n -= 1 # 单线程 start = time.time() count_down(100000000) count_down(100000000) print(f"Python单线程: {time.time()-start:.2f}秒") # 多线程 start = time.time() t1 = threading.Thread(target=count_down, args=(100000000,)) t2 = threading.Thread(target=count_down, args=(100000000,)) t1.start() t2.start() t1.join() t2.join() print(f"Python多线程: {time.time()-start:.2f}秒") ``` **Java多线程** ```java public class Test { static void countDown(long n) { while (n > 0) { n--; } } public static void main(String[] args) throws InterruptedException { // 单线程 long start = System.currentTimeMillis(); countDown(100000000L); countDown(100000000L); System.out.printf("Java单线程: %.2f秒\n", (System.currentTimeMillis()-start)/1000.0); // 多线程 start = System.currentTimeMillis(); Thread t1 = new Thread(() -> countDown(100000000L)); Thread t2 = new Thread(() -> countDown(100000000L)); t1.start(); t2.start(); t1.join(); t2.join(); System.out.printf("Java多线程: %.2f秒\n", (System.currentTimeMillis()-start)/1000.0); } } ``` > ```Java > Python单线程: 4.88秒 > Python多线程: 4.85秒 > > Java单线程: 0.4920秒 > Java多线程: 0.0010秒 > ``` **多线程几乎没有加速?** - CPU密集型任务 vs IO密集型? - 全局解释器锁 ### 2.3 全局解释器锁 #### 2.3.1 是什么 全局解释器锁(Global Interpreter Lock,GIL)是Python 解释器的一个特性,确保同一时刻只有一个线程能执行 Python 字节码。 **线程切换时机?** - 固定时间间隔 / 一定的数量字节码 - 遇到IO操作 / 阻塞(`open()`, `time.sleep()`等) #### 2.3.2 为什么 1. **简化设计**:Python使用引用计数管理内存,GIL避免了复杂的锁机制 - 没有GIL时:每次增减引用计数都需要加锁,性能反而更差 2. **保护数据**:防止多线程同时修改Python对象导致数据混乱 - 比如两个线程同时修改同一个列表 - 没有GIL时,每个C扩展都需要自己实现线程安全 3. **历史原因**:Python诞生时多核CPU还不普及 #### 2.3.3 影响 1. C语言扩展 2. 多进程 3. 异步IO优化 ### 2.4 优缺点 - **优点**:可以加速IO密集型任务的效率 - **缺点**:受GIL限制,实际只能单线程并发,无法利用多核CPU的全部能力 --- ## 3. 多进程 `multiprocessing`模块充分利用多核 CPU的优势,绕开GIL,大幅提升CPU密集型任务的处理效率。 每个进程都有自己独立的内存空间和独立的Python解释器,因此也有自己独立的GIL。 ### 3.1 实现方式 #### 3.1.1 实例化Process并传入目标函数 ```python import time import multiprocessing def worker(sleep_time: int): print(f"进程{multiprocessing.current_process().name}开始执行") time.sleep(sleep_time) print(f"进程{multiprocessing.current_process().name}执行完成") if __name__ == '__main__': # 创建两个进程 p1 = multiprocessing.Process(target=worker, args=(1,)) p2 = multiprocessing.Process(target=worker, args=(2,)) # 启动进程 p1.start() p2.start() # 等待进程执行完毕 p1.join() p2.join() ``` > **output** > ```Java > 进程Process-1开始执行 > 进程Process-2开始执行 > 进程Process-1执行完成 > 进程Process-2执行完成 > ``` #### 3.1.2 继承Process并重写run方法 ```python import time import multiprocessing class MyProcess(multiprocessing.Process): def __init__(self, sleep_time: int): super().__init__() self.sleep_time = sleep_time def run(self): print(f"进程{multiprocessing.current_process().name}开始执行") time.sleep(self.sleep_time) print(f"进程{multiprocessing.current_process().name}执行完成") if __name__ == '__main__': # 创建两个线进程 p1 = MyProcess(1) p2 = MyProcess(2) # 启动进程 p1.start() p2.start() # 等待进程执行完毕 p1.join() p2.join() ``` > **output** > ```Java > 进程MyProcess-1开始执行 > 进程MyProcess-2开始执行 > 进程MyProcess-1执行完成 > 进程MyProcess-2执行完成 > ``` #### 3.1.3 使用PoolExecutorlExecutor来实现进程池 `concurrent.futures`模块中实现了进程池,`feature`对象可以方便的获取方法的返回值 ```python import multiprocessing import concurrent.futures import time def worker(sleep_time: int): print(f"进程{multiprocessing.current_process().name}开始执行") time.sleep(sleep_time) print(f"进程{multiprocessing.current_process().name}执行完成") return sleep_time if __name__ == '__main__': # 创建线程池,最大线程数为3 with concurrent.futures.ProcessPoolExecutor(max_workers=2) as executor: # 提交任务 future_to_num = {executor.submit(worker, num): num for num in range(3)} for future in concurrent.futures.as_completed(future_to_num): num = future_to_num[future] try: result = future.result() except Exception as e: print(f"任务 {num} 执行失败: {e}") else: print(f"任务 {num} 结果: {result}") ``` > **output** > ```Java > 进程SpawnProcess-1开始执行 > 进程SpawnProcess-2开始执行 > 进程SpawnProcess-1执行完成 > 任务 0 结果: 0 > 进程SpawnProcess-1开始执行 > 进程SpawnProcess-2执行完成 > 任务 1 结果: 1 > 进程SpawnProcess-1执行完成 > 任务 2 结果: 2 > ``` > *注:原文输出有误,根据代码逻辑修正。* > API与线程方式相似,`thread` -> `process` ### 3.2 优缺点 - **优点**:绕开GIL,充分利用多核CPU资源 - **缺点**:资源开销大(创建进程比线程慢,占用内存多),进程间通信(IPC)比线程间共享数据要复杂 --- ## 4. 协程 - 轻量级线程,用户程序调度 协程在单个线程内执行,通过在执行代码中暂停和恢复,实现多任务的并发执行。 异步模块`asyncio`在Python 3.4 版本中被首次引入,在Python 3.5时,引入了`async`和`await`关键字。 ### 核心概念 - `async def`: 定义一个协程函数。调用它不会立即执行,而是返回一个协程对象。 - `await`: 暂停当前协程的执行,将控制权交还给事件循环,去执行其他任务,直到等待的操作完成。`await`只能用在`async def`函数内部。 ### 4.1 从一个简单的协程案例介绍 ```python import asyncio import time # 1. 定义协程函数 async def coroutine(delay: int): print(f'开始执行协程函数, {delay}') # 模拟耗时的操作 await asyncio.sleep(delay) print(f'协程函数执行结束, {delay}') return f'协程函数{delay}执行结束' async def main(): a = await coroutine(1) b = await coroutine(2) print(f"a: {a}, b: {b}") if __name__ == '__main__': c = coroutine(1) print(c) start = time.time() # 2. 运行最高层级的协程 asyncio.run(main()) print(f"执行协程函数耗时: {time.time() - start}s") ``` > **output** > ```Java > <coroutine object coroutine at 0x000002A0B2638040> > 开始执行协程函数, 1 > 协程函数执行结束, 1 > 开始执行协程函数, 2 > 协程函数执行结束, 2 > a: 协程函数1执行结束, b: 协程函数2执行结束 > 执行协程函数耗时: 3.0214524269104004s > ``` #### 4.1.1 async - 协程函数 -> 生成器函数 - 运行协程 -> 生成器`next()` `asyncio.run()`创建一个事件循环,运行指定的协程函数。 #### 4.1.2 事件循环 1. **检查协程**:检查可以执行的协程 2. **让出控制**:控制权转交给要执行的协程 3. **等待协程**:等待协程暂停或结束后,重新得到控制权 **为什么没有加速?** 协程`main()`未就绪,无法被事件循环调度。 #### 4.1.3 await 1. 暂停当前协程 2. 注册事件循环 3. 获取`await`后的协程结果 可等待对象:协程对象,`task`,`future`。 `asyncio.create_task()`: 将协程对象变为一个`task`,并注册到事件循环中。 ```python import asyncio import time # 1. 定义协程函数 async def coroutine(delay: int): print(f'开始执行协程函数, {delay}') # 模拟耗时的操作 await asyncio.sleep(delay) print(f'协程函数执行结束, {delay}') return f'协程函数{delay}执行结束' async def main(): task_a = asyncio.create_task(coroutine(1)) task_b = asyncio.create_task(coroutine(2)) a = await task_a b = await task_b print(f"a: {a}, b: {b}") if __name__ == '__main__': c = coroutine(1) print(c) start = time.time() # 2. 运行最高层级的协程 asyncio.run(main()) print(f"执行协程函数耗时: {time.time() - start}s") ``` > **output** > ```Java > <coroutine object coroutine at 0x00000284D5808040> > 开始执行协程函数, 1 > 开始执行协程函数, 2 > 协程函数执行结束, 1 > 协程函数执行结束, 2 > a: 协程函数1执行结束, b: 协程函数2执行结束 > 执行协程函数耗时: 2.014808416366577s > ``` 可以通过`asyncio.gather()`将协程对象/task统一注册到事件循环中,等待每一个可等待对象都执行完成后才完成(类似Java `CompletableFuture.allOf()`) ```python async def main(): gather = asyncio.gather(coroutine(1), coroutine(2)) a, b = await gather print(f"a: {a}, b: {b}") ``` ### 4.2 优缺点 - **优点**:单线程实现协程管理,不经过操作系统的线程切换开销,无锁,性能比`thread`好 - **缺点**:CPU密集型任务无法利用多核CPU(实际为单线程模拟并发),异步编程模型相对复杂,需要手动释放控制。需要使用专门的异步库(如 `aiohttp`),具有一定的“传染性”,即 `async` 的调用链需要一直保持。 --- ## 总结 | 特性 | 进程 (Process) | 线程 (Thread) | 协程 (Coroutine) | |:--- |:--- |:--- |:--- | | **定义** | 操作系统资源分配的基本单位 | 进程内的执行单元,共享进程资源 | 用户级轻量级线程,由程序控制调度 | | **并行能力** | 多核并行(受CPU核心数限制)| 受GIL限制,实际只能单线程并发 | 单线程内并发 | | **内存隔离** | 独立内存空间(默认不共享数据)| 共享进程内存(需线程同步)| 共享线程内存 | | **切换开销** | 高(系统内核调度,涉及内存切换)| 中(系统内核调度)| 极低(无系统调用)| | **创建开销** | 高(需复制父进程资源)| 中(需分配线程栈)| 极低(≈函数调用)| | **适用场景** | CPU密集型任务 | I/O密集型任务(如文件/网络操作)| 高并发I/O密集型任务(如Web服务)| | **通信方式** | 管道/队列/共享内存(需显式IPC)| 共享变量(需加锁)| 直接共享变量(无需锁)| | **崩溃影响** | 独立(一个进程崩溃不影响其他)| 全局(线程崩溃导致整个进程终止)| 局部(协程异常可捕获)| | **Python模块** | `multiprocessing` | `threading` | `asyncio` | **任务是CPU密集型吗?** - 是 -> 使用 `multiprocessing` 来利用多核。 **任务是I/O密集型吗?** - 是 -> - 需要处理海量(上千/上万)的并发连接?-> 优先考虑 `asyncio` (协程) - 并发量不大,或者需要和大量不支持异步的旧代码库交互?-> 使用 `threading` **有了协程后,多线程就没有意义了?** 协程需要通过`await`手动释放控制权,假设有一个后台(不紧急)CPU密集型任务和一个的需要及时响应的小任务,如果在编程时没有考虑好,协程可能会陷入计算密集型但不紧急的任务中,使得需要即时响应的任务没有被及时处理。 多线程使用操作系统进行线程上下文切换,天然保证任务不会出现饥饿现象。