# Summary
## 进程/线程/协程
- **进程**:计算机系统分配资源的最小单位,操作系统调度
- **线程**:CPU调度的基本单位,操作系统调度
- **协程**:轻量级线程,用户程序调度
# Cues
# Notes
# 15分钟简单了解Python并发编程
*时间:20250625*
---
## 1. 前置知识
### 进程/线程/协程
- **进程**:计算机系统分配资源的最小单位,操作系统调度
- **线程**:CPU调度的基本单位,操作系统调度
- **协程**:轻量级线程,用户程序调度
### 并发 VS 并行
- **并发**:同一时间段
- **并行**:同一时间(多核CPU,多处理器系统,多机并行)
---
## 2. 多线程
### 2.1 实现方式
Python多线程实现方式主要有3种
#### 2.1.1 实例化Thread并传入目标函数
```python
import time
import threading
def worker(sleep_time: int):
print(f"线程{threading.current_thread().name}开始执行")
time.sleep(sleep_time)
print(f"线程{threading.current_thread().name}执行完成")
# 创建两个线程
t1 = threading.Thread(target=worker, args=(1,))
t2 = threading.Thread(target=worker, args=(2,))
# 启动线程
t1.start()
t2.start()
# 等待线程执行完毕
t1.join()
t2.join()
```
> **output**
> ```Java
> 线程Thread-1 (worker)开始执行
> 线程Thread-2 (worker)开始执行
> 线程Thread-1 (worker)执行完成
> 线程Thread-2 (worker)执行完成
> ```
#### 2.1.2 继承Thread并重写run方法
```python
import time
import threading
class MyThread(threading.Thread):
def __init__(self, sleep_time: int):
super().__init__()
self.sleep_time = sleep_time
def run(self):
print(f"线程{threading.current_thread().name}开始执行")
time.sleep(self.sleep_time)
print(f"线程{threading.current_thread().name}执行完成")
# 创建两个线程
t1 = MyThread(1)
t2 = MyThread(2)
# 启动线程
t1.start()
t2.start()
# 等待线程执行完毕
t1.join()
t2.join()
```
> **output**
> ```Java
> 线程Thread-1开始执行
> 线程Thread-2开始执行
> 线程Thread-1执行完成
> 线程Thread-2执行完成
> ```
#### 2.1.3 使用ThreadPoolExecutor来实现线程池
`concurrent.futures`模块中实现了线程池,`feature`对象可以方便的获取方法的返回值
```python
import concurrent.futures
import time
import threading
def worker(sleep_time: int):
print(f"线程{threading.current_thread().name}开始执行")
time.sleep(sleep_time)
print(f"线程{threading.current_thread().name}执行完成")
return sleep_time
# 创建线程池,最大线程数为3
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
# 提交任务
future_to_num = {executor.submit(worker, num): num for num in range(3)}
for future in concurrent.futures.as_completed(future_to_num):
num = future_to_num[future]
try:
result = future.result()
except Exception as e:
print(f"任务 {num} 执行失败: {e}")
else:
print(f"任务 {num} 结果: {result}")
```
> **output**
> ```Java
> 线程ThreadPoolExecutor-0_0开始执行
> 线程ThreadPoolExecutor-0_1开始执行
> 线程ThreadPoolExecutor-0_0执行完成
> 任务 0 结果: 0
> 线程ThreadPoolExecutor-0_0开始执行
> 线程ThreadPoolExecutor-0_1执行完成
> 任务 2 结果: 2
> 线程ThreadPoolExecutor-0_0执行完成
> 任务 1 结果: 1
> ```
> *注:原文输出有误,根据代码逻辑修正。*
> 和Java多线程的API类似,继承Thread,实现runable接口,ExecutorService 创建线程池
### 2.2 多核CPU下的多线程
**问题:在多核CPU上分别执行下面代码,结果如何?**
**Python多线程**
```python
import threading
import time
def count_down(n):
while n > 0:
n -= 1
# 单线程
start = time.time()
count_down(100000000)
count_down(100000000)
print(f"Python单线程: {time.time()-start:.2f}秒")
# 多线程
start = time.time()
t1 = threading.Thread(target=count_down, args=(100000000,))
t2 = threading.Thread(target=count_down, args=(100000000,))
t1.start()
t2.start()
t1.join()
t2.join()
print(f"Python多线程: {time.time()-start:.2f}秒")
```
**Java多线程**
```java
public class Test {
static void countDown(long n) {
while (n > 0) { n--; }
}
public static void main(String[] args) throws InterruptedException {
// 单线程
long start = System.currentTimeMillis();
countDown(100000000L);
countDown(100000000L);
System.out.printf("Java单线程: %.2f秒\n", (System.currentTimeMillis()-start)/1000.0);
// 多线程
start = System.currentTimeMillis();
Thread t1 = new Thread(() -> countDown(100000000L));
Thread t2 = new Thread(() -> countDown(100000000L));
t1.start(); t2.start();
t1.join(); t2.join();
System.out.printf("Java多线程: %.2f秒\n", (System.currentTimeMillis()-start)/1000.0);
}
}
```
> ```Java
> Python单线程: 4.88秒
> Python多线程: 4.85秒
>
> Java单线程: 0.4920秒
> Java多线程: 0.0010秒
> ```
**多线程几乎没有加速?**
- CPU密集型任务 vs IO密集型?
- 全局解释器锁
### 2.3 全局解释器锁
#### 2.3.1 是什么
全局解释器锁(Global Interpreter Lock,GIL)是Python 解释器的一个特性,确保同一时刻只有一个线程能执行 Python 字节码。
**线程切换时机?**
- 固定时间间隔 / 一定的数量字节码
- 遇到IO操作 / 阻塞(`open()`, `time.sleep()`等)
#### 2.3.2 为什么
1. **简化设计**:Python使用引用计数管理内存,GIL避免了复杂的锁机制
- 没有GIL时:每次增减引用计数都需要加锁,性能反而更差
2. **保护数据**:防止多线程同时修改Python对象导致数据混乱
- 比如两个线程同时修改同一个列表
- 没有GIL时,每个C扩展都需要自己实现线程安全
3. **历史原因**:Python诞生时多核CPU还不普及
#### 2.3.3 影响
1. C语言扩展
2. 多进程
3. 异步IO优化
### 2.4 优缺点
- **优点**:可以加速IO密集型任务的效率
- **缺点**:受GIL限制,实际只能单线程并发,无法利用多核CPU的全部能力
---
## 3. 多进程
`multiprocessing`模块充分利用多核 CPU的优势,绕开GIL,大幅提升CPU密集型任务的处理效率。
每个进程都有自己独立的内存空间和独立的Python解释器,因此也有自己独立的GIL。
### 3.1 实现方式
#### 3.1.1 实例化Process并传入目标函数
```python
import time
import multiprocessing
def worker(sleep_time: int):
print(f"进程{multiprocessing.current_process().name}开始执行")
time.sleep(sleep_time)
print(f"进程{multiprocessing.current_process().name}执行完成")
if __name__ == '__main__':
# 创建两个进程
p1 = multiprocessing.Process(target=worker, args=(1,))
p2 = multiprocessing.Process(target=worker, args=(2,))
# 启动进程
p1.start()
p2.start()
# 等待进程执行完毕
p1.join()
p2.join()
```
> **output**
> ```Java
> 进程Process-1开始执行
> 进程Process-2开始执行
> 进程Process-1执行完成
> 进程Process-2执行完成
> ```
#### 3.1.2 继承Process并重写run方法
```python
import time
import multiprocessing
class MyProcess(multiprocessing.Process):
def __init__(self, sleep_time: int):
super().__init__()
self.sleep_time = sleep_time
def run(self):
print(f"进程{multiprocessing.current_process().name}开始执行")
time.sleep(self.sleep_time)
print(f"进程{multiprocessing.current_process().name}执行完成")
if __name__ == '__main__':
# 创建两个线进程
p1 = MyProcess(1)
p2 = MyProcess(2)
# 启动进程
p1.start()
p2.start()
# 等待进程执行完毕
p1.join()
p2.join()
```
> **output**
> ```Java
> 进程MyProcess-1开始执行
> 进程MyProcess-2开始执行
> 进程MyProcess-1执行完成
> 进程MyProcess-2执行完成
> ```
#### 3.1.3 使用PoolExecutorlExecutor来实现进程池
`concurrent.futures`模块中实现了进程池,`feature`对象可以方便的获取方法的返回值
```python
import multiprocessing
import concurrent.futures
import time
def worker(sleep_time: int):
print(f"进程{multiprocessing.current_process().name}开始执行")
time.sleep(sleep_time)
print(f"进程{multiprocessing.current_process().name}执行完成")
return sleep_time
if __name__ == '__main__':
# 创建线程池,最大线程数为3
with concurrent.futures.ProcessPoolExecutor(max_workers=2) as executor:
# 提交任务
future_to_num = {executor.submit(worker, num): num for num in range(3)}
for future in concurrent.futures.as_completed(future_to_num):
num = future_to_num[future]
try:
result = future.result()
except Exception as e:
print(f"任务 {num} 执行失败: {e}")
else:
print(f"任务 {num} 结果: {result}")
```
> **output**
> ```Java
> 进程SpawnProcess-1开始执行
> 进程SpawnProcess-2开始执行
> 进程SpawnProcess-1执行完成
> 任务 0 结果: 0
> 进程SpawnProcess-1开始执行
> 进程SpawnProcess-2执行完成
> 任务 1 结果: 1
> 进程SpawnProcess-1执行完成
> 任务 2 结果: 2
> ```
> *注:原文输出有误,根据代码逻辑修正。*
> API与线程方式相似,`thread` -> `process`
### 3.2 优缺点
- **优点**:绕开GIL,充分利用多核CPU资源
- **缺点**:资源开销大(创建进程比线程慢,占用内存多),进程间通信(IPC)比线程间共享数据要复杂
---
## 4. 协程
- 轻量级线程,用户程序调度
协程在单个线程内执行,通过在执行代码中暂停和恢复,实现多任务的并发执行。
异步模块`asyncio`在Python 3.4 版本中被首次引入,在Python 3.5时,引入了`async`和`await`关键字。
### 核心概念
- `async def`: 定义一个协程函数。调用它不会立即执行,而是返回一个协程对象。
- `await`: 暂停当前协程的执行,将控制权交还给事件循环,去执行其他任务,直到等待的操作完成。`await`只能用在`async def`函数内部。
### 4.1 从一个简单的协程案例介绍
```python
import asyncio
import time
# 1. 定义协程函数
async def coroutine(delay: int):
print(f'开始执行协程函数, {delay}')
# 模拟耗时的操作
await asyncio.sleep(delay)
print(f'协程函数执行结束, {delay}')
return f'协程函数{delay}执行结束'
async def main():
a = await coroutine(1)
b = await coroutine(2)
print(f"a: {a}, b: {b}")
if __name__ == '__main__':
c = coroutine(1)
print(c)
start = time.time()
# 2. 运行最高层级的协程
asyncio.run(main())
print(f"执行协程函数耗时: {time.time() - start}s")
```
> **output**
> ```Java
> <coroutine object coroutine at 0x000002A0B2638040>
> 开始执行协程函数, 1
> 协程函数执行结束, 1
> 开始执行协程函数, 2
> 协程函数执行结束, 2
> a: 协程函数1执行结束, b: 协程函数2执行结束
> 执行协程函数耗时: 3.0214524269104004s
> ```
#### 4.1.1 async
- 协程函数 -> 生成器函数
- 运行协程 -> 生成器`next()`
`asyncio.run()`创建一个事件循环,运行指定的协程函数。
#### 4.1.2 事件循环
1. **检查协程**:检查可以执行的协程
2. **让出控制**:控制权转交给要执行的协程
3. **等待协程**:等待协程暂停或结束后,重新得到控制权
**为什么没有加速?**
协程`main()`未就绪,无法被事件循环调度。
#### 4.1.3 await
1. 暂停当前协程
2. 注册事件循环
3. 获取`await`后的协程结果
可等待对象:协程对象,`task`,`future`。
`asyncio.create_task()`: 将协程对象变为一个`task`,并注册到事件循环中。
```python
import asyncio
import time
# 1. 定义协程函数
async def coroutine(delay: int):
print(f'开始执行协程函数, {delay}')
# 模拟耗时的操作
await asyncio.sleep(delay)
print(f'协程函数执行结束, {delay}')
return f'协程函数{delay}执行结束'
async def main():
task_a = asyncio.create_task(coroutine(1))
task_b = asyncio.create_task(coroutine(2))
a = await task_a
b = await task_b
print(f"a: {a}, b: {b}")
if __name__ == '__main__':
c = coroutine(1)
print(c)
start = time.time()
# 2. 运行最高层级的协程
asyncio.run(main())
print(f"执行协程函数耗时: {time.time() - start}s")
```
> **output**
> ```Java
> <coroutine object coroutine at 0x00000284D5808040>
> 开始执行协程函数, 1
> 开始执行协程函数, 2
> 协程函数执行结束, 1
> 协程函数执行结束, 2
> a: 协程函数1执行结束, b: 协程函数2执行结束
> 执行协程函数耗时: 2.014808416366577s
> ```
可以通过`asyncio.gather()`将协程对象/task统一注册到事件循环中,等待每一个可等待对象都执行完成后才完成(类似Java `CompletableFuture.allOf()`)
```python
async def main():
gather = asyncio.gather(coroutine(1), coroutine(2))
a, b = await gather
print(f"a: {a}, b: {b}")
```
### 4.2 优缺点
- **优点**:单线程实现协程管理,不经过操作系统的线程切换开销,无锁,性能比`thread`好
- **缺点**:CPU密集型任务无法利用多核CPU(实际为单线程模拟并发),异步编程模型相对复杂,需要手动释放控制。需要使用专门的异步库(如 `aiohttp`),具有一定的“传染性”,即 `async` 的调用链需要一直保持。
---
## 总结
| 特性 | 进程 (Process) | 线程 (Thread) | 协程 (Coroutine) |
|:--- |:--- |:--- |:--- |
| **定义** | 操作系统资源分配的基本单位 | 进程内的执行单元,共享进程资源 | 用户级轻量级线程,由程序控制调度 |
| **并行能力** | 多核并行(受CPU核心数限制)| 受GIL限制,实际只能单线程并发 | 单线程内并发 |
| **内存隔离** | 独立内存空间(默认不共享数据)| 共享进程内存(需线程同步)| 共享线程内存 |
| **切换开销** | 高(系统内核调度,涉及内存切换)| 中(系统内核调度)| 极低(无系统调用)|
| **创建开销** | 高(需复制父进程资源)| 中(需分配线程栈)| 极低(≈函数调用)|
| **适用场景** | CPU密集型任务 | I/O密集型任务(如文件/网络操作)| 高并发I/O密集型任务(如Web服务)|
| **通信方式** | 管道/队列/共享内存(需显式IPC)| 共享变量(需加锁)| 直接共享变量(无需锁)|
| **崩溃影响** | 独立(一个进程崩溃不影响其他)| 全局(线程崩溃导致整个进程终止)| 局部(协程异常可捕获)|
| **Python模块** | `multiprocessing` | `threading` | `asyncio` |
**任务是CPU密集型吗?**
- 是 -> 使用 `multiprocessing` 来利用多核。
**任务是I/O密集型吗?**
- 是 ->
- 需要处理海量(上千/上万)的并发连接?-> 优先考虑 `asyncio` (协程)
- 并发量不大,或者需要和大量不支持异步的旧代码库交互?-> 使用 `threading`
**有了协程后,多线程就没有意义了?**
协程需要通过`await`手动释放控制权,假设有一个后台(不紧急)CPU密集型任务和一个的需要及时响应的小任务,如果在编程时没有考虑好,协程可能会陷入计算密集型但不紧急的任务中,使得需要即时响应的任务没有被及时处理。
多线程使用操作系统进行线程上下文切换,天然保证任务不会出现饥饿现象。