并发与并行编程
基本概念
并发和并行是提升程序性能的两种核心思想,但它们的实现方式和适用场景存在本质差异:
并发指的是程序在逻辑上同时处理多个任务的能力,这些任务可能在单个 CPU 核心上通过快速切换来实现;
并行则是物理上同时执行多个任务,需要多个 CPU 核心的支持。
在 Python 的执行模型中, 我们需要理解三个关键维度:
第一个维度是同步与异步。同步意味着代码按顺序执行,每个操作必须等待前一个操作完成;异步则允许在等待某个操作完成时去执行其他任务;
第二个维度是单线程与多线程。单线程在一个执行流中运行代码,多线程创建多个执行流;
第三个维度是单进程与多进程。进程是操作系统资源分配的基本单位,每个进程拥有独立的内存空间。
Python 程序的性能瓶颈取决于任务类型,主要有两种:
IO 密集型操作包括网络请求、文件读写、数据库查询等。这些操作的特点是 CPU 大部分时间在等待外部资源响应;
CPU 密集型操作则是大量计算任务,如图像处理、科学计算、加密解密等。这类操作会持续占用 CPU 资源。
全局解释器锁 GIL 是 Python 并发编程中的核心概念。CPython 解释器使用 GIL 来保证同一时刻只有一个线程执行 Python 字节码,这意味着即使在多核 CPU 上,多线程程序也无法真正并行执行 Python 代码。GIL 的存在使得多线程对 CPU 密集型任务效果有限,但对 IO 密集型任务依然有效,因为线程在等待 IO 时会释放 GIL。
单线程同步编程
单线程同步是最直观的编程方式, 代码严格按照编写顺序执行。我们通过一个实际案例来观察其特点——下载多个网页内容并统计总字节数。
import urllib.request
import time
def download_page ( url ):
"""下载单个页面并返回内容长度"""
try :
with urllib . request . urlopen ( url , timeout = 10 ) as response :
content = response . read ()
return len ( content )
except Exception as e :
print ( f "下载失败 { url } : { e } " )
return 0
def sync_download ():
"""同步下载多个页面"""
urls = [
'http://www.example.com' ,
'http://www.python.org' ,
'http://www.github.com' ,
'http://www.baidu.com'
]
start_time = time . time ()
total_bytes = 0
for url in urls :
print ( f "开始下载: { url } " )
bytes_downloaded = download_page ( url )
total_bytes += bytes_downloaded
print ( f "完成下载: { url } , 大小: { bytes_downloaded } 字节" )
elapsed = time . time () - start_time
print ( f " \n 总下载: { total_bytes } 字节" )
print ( f "总耗时: { elapsed : .2f } 秒" )
sync_download ()
"""output
开始下载: http://www.example.com
完成下载: http://www.example.com, 大小: 513 字节
开始下载: http://www.python.org
完成下载: http://www.python.org, 大小: 11650 字节
开始下载: http://www.github.com
完成下载: http://www.github.com, 大小: 557606 字节
开始下载: http://www.baidu.com
完成下载: http://www.baidu.com, 大小: 29506 字节
总下载: 599275 字节
总耗时: 4.28 秒
"""
这段代码展示了同步编程的核心特征:
优点:代码逻辑清晰,易于理解和调试。错误处理也很直观,每个操作的结果都是确定的;
缺点:每次网络请求都会阻塞程序执行,直到服务器响应并完成数据传输。如果每个请求需要 2 秒, 那么 5 个请求就需要 10 秒。程序在等待网络响应的时间里完全处于空闲状态,CPU 资源没有得到充分利用。
单线程异步编程
异步编程通过事件循环 (Event Loop) 机制,让单线程能够在等待 IO 时切换到其他任务,从而提高程序的并发能力。Python 3.4 引入的 asyncio 模块提供了完整的异步编程支持。
异步编程的核心是协程 (Coroutine),它是可以暂停和恢复的函数。当协程遇到 IO 操作时,会主动让出控制权给事件循环,事件循环再调度其他就绪的协程执行。这个过程是协作式的,不需要操作系统的线程切换开销。
import asyncio
import time
from urllib.request import Request , urlopen
async def async_download_page ( url ):
"""异步下载单个页面"""
try :
# 使用asyncio的run_in_executor在线程池中执行同步IO
loop = asyncio . get_event_loop ()
request = Request ( url , headers = { 'User-Agent' : 'Mozilla/5.0' })
def sync_download ():
with urlopen ( request , timeout = 10 ) as response :
return response . read ()
content = await loop . run_in_executor ( None , sync_download )
return len ( content )
except Exception as e :
print ( f "下载失败 { url } : { e } " )
return 0
async def async_download_all ():
"""异步下载所有页面"""
urls = [
'http://www.example.com' ,
'http://www.python.org' ,
'http://www.github.com' ,
'http://www.baidu.com'
]
start_time = time . time ()
# 创建所有下载任务
tasks = []
for url in urls :
print ( f "创建任务: { url } " )
task = async_download_page ( url )
tasks . append ( task )
# 并发执行所有任务
results = await asyncio . gather ( * tasks , return_exceptions = True )
total_bytes = sum ( r for r in results if isinstance ( r , int ))
elapsed = time . time () - start_time
print ( f " \n 总下载: { total_bytes } 字节" )
print ( f "总耗时: { elapsed : .2f } 秒" )
asyncio . run ( async_download_all ())
"""output
创建任务: http://www.example.com
创建任务: http://www.python.org
创建任务: http://www.github.com
创建任务: http://www.baidu.com
总下载: 599275 字节
总耗时: 2.81 秒
"""
在这个异步实现中,所有下载任务几乎同时启动。当某个任务在等待网络响应时,事件循环会切换到其他任务。5 个请求的总时间接近单个最慢请求的时间,而不是所有请求时间的总和,这就是异步编程带来的效率提升。
Note
asyncio 本身只提供异步框架,标准库中的 urllib 是同步的,因此我们使用 run_in_executor 在后台线程池中执行同步 IO。真正的异步 IO 需要使用支持异步的库,如 aiohttp。但这个例子很好地展示了如何将同步操作集成到异步框架中。
多线程编程
多线程允许程序创建多个执行流,每个线程可以独立运行。
threading
Python 的 threading 模块提供了完整的线程管理功能。虽然 GIL 限制了多线程在 CPU 密集型任务上的性能,但对于 IO 密集型任务,多线程依然是有效的提速手段。
线程的基本使用涉及三个部分:创建线程 \(\to\) 启动线程 \(\to\) 等待线程结束。
import time
from queue import Queue
from threading import Thread
from urllib.request import urlopen
def download_page ( url ):
"""下载单个页面并返回内容长度"""
try :
with urlopen ( url , timeout = 10 ) as response :
content = response . read ()
return len ( content )
except Exception as e :
print ( f "下载失败 { url } : { e } " )
return 0
def multithread_download ():
"""使用多线程下载"""
urls = [
'http://www.example.com' ,
'http://www.python.org' ,
'http://www.github.com' ,
'http://www.baidu.com'
]
start_time = time . time ()
threads = []
results = Queue ()
for url in urls :
# 创建线程(用了一点小技巧,针对需要处理返回值的目标函数而言)
thread = Thread ( target = lambda u = url : results . put ( download_page ( u )))
# 启动线程
thread . start ()
threads . append ( thread )
print ( f "启动线程下载: { url } " )
# 等待所有线程结束
for thread in threads :
thread . join ()
# 收集结果
total_bytes = 0
while not results . empty ():
total_bytes += results . get ()
elapsed = time . time () - start_time
print ( f " \n 总下载: { total_bytes } 字节" )
print ( f "总耗时: { elapsed : .2f } 秒" )
multithread_download ()
"""output
启动线程下载: http://www.example.com
启动线程下载: http://www.python.org
启动线程下载: http://www.github.com
启动线程下载: http://www.baidu.com
总下载: 637895 字节
总耗时: 2.77 秒
"""
这个实现创建了与 URL 数量相同的线程,每个线程独立完成一次下载。线程之间通过 Queue 进行结果传递,这是线程安全的数据结构。主线程使用 join 方法等待所有工作线程完成,然后汇总结果。
当多个线程访问共享资源时,需要使用同步机制防止数据竞争。Lock 是最基本的同步原语,它确保同一时刻只有一个线程能够执行临界区代码。
import threading
import time
class BankAccount :
"""线程安全的银行账户类"""
def __init__ ( self , initial_balance ):
self . balance = initial_balance
self . lock = threading . Lock ()
self . transaction_count = 0
def deposit ( self , amount ):
"""存款操作"""
with self . lock :
current = self . balance
time . sleep ( 0.0001 ) # 模拟处理时间
self . balance = current + amount
self . transaction_count += 1
def withdraw ( self , amount ):
"""取款操作"""
with self . lock :
current = self . balance
time . sleep ( 0.0001 ) # 模拟处理时间
if current >= amount :
self . balance = current - amount
self . transaction_count += 1
return True
return False
def perform_transactions ( account , operations ):
"""执行一系列交易操作"""
for op_type , amount in operations :
if op_type == 'deposit' :
account . deposit ( amount )
else :
account . withdraw ( amount )
def test_thread_safety ():
"""测试线程安全性"""
account = BankAccount ( 1000 )
# 创建多个线程执行交易
threads = []
operations = [( 'deposit' , 100 ), ( 'withdraw' , 50 )] * 50
for i in range ( 4 ):
thread = threading . Thread ( target = perform_transactions ,
args = ( account , operations ))
threads . append ( thread )
thread . start ()
for thread in threads :
thread . join ()
print ( f "最终余额: { account . balance } " )
print ( f "交易次数: { account . transaction_count } " )
print ( f "预期余额: { 1000 + ( 100 - 50 ) * 50 * 4 } " )
if __name__ == '__main__' :
test_thread_safety ()
这个例子展示了 Lock 的使用。without 锁保护, 多个线程同时修改 balance 会导致数据不一致。使用 with self.lock 语句, 可以确保临界区代码的原子性。Lock 支持上下文管理器协议, 能够自动处理锁的获取和释放,即使发生异常也能正确释放锁。
除了 Lock,threading 模块还提供了 RLock 可重入锁、Semaphore 信号量、Event 事件、Condition 条件变量等同步工具。选择合适的同步机制需要根据具体场景。过度使用锁会降低并发性能,应该尽量缩小临界区范围,避免在持有锁时执行耗时操作。
ThreadPoolExecutor
对于大量任务,频繁创建和销毁线程会带来显著开销,线程池 concurrent.futures.ThreadPoolExecutor 提供了更优雅的解决方案,它维护一个固定大小的线程池,自动管理任务分配和线程重用。
from concurrent.futures import ThreadPoolExecutor , as_completed
import time
from urllib.request import Request , urlopen
def download_with_pool ( url ):
"""使用线程池下载单个URL"""
try :
request = Request ( url , headers = { 'User-Agent' : 'Mozilla/5.0' })
with urlopen ( request , timeout = 10 ) as response :
content = response . read ()
return { 'url' : url , 'size' : len ( content ), 'status' : 'success' }
except Exception as e :
return { 'url' : url , 'error' : str ( e ), 'status' : 'failed' }
def threadpool_download ():
"""使用线程池下载多个URL"""
urls = [
'http://www.example.com' ,
'http://www.python.org' ,
'http://www.github.com' ,
'http://www.stackoverflow.com' ,
'http://www.wikipedia.org'
] * 3 # 增加任务数量以展示线程池效果
start_time = time . time ()
# 创建线程池,最多5个工作线程
with ThreadPoolExecutor ( max_workers = 5 ) as executor :
# 提交所有任务
future_to_url = { executor . submit ( download_with_pool , url ): url
for url in urls }
total_bytes = 0
completed = 0
# 处理完成的任务
for future in as_completed ( future_to_url ):
completed += 1
result = future . result ()
if result [ 'status' ] == 'success' :
total_bytes += result [ 'size' ]
print ( f "[ { completed } / { len ( urls ) } ] 完成: { result [ 'url' ] } " )
elapsed = time . time () - start_time
print ( f " \n 总计下载: { total_bytes } 字节" )
print ( f "总耗时: { elapsed : .2f } 秒" )
print ( f "吞吐量: { len ( urls ) / elapsed : .2f } 个请求/秒" )
if __name__ == '__main__' :
threadpool_download ()
ThreadPoolExecutor 的优势在于简化了线程管理。我们只需要定义工作函数, 然后提交任务即可。线程池会自动调度任务到空闲线程, 并提供 Future 对象来跟踪任务状态和获取结果。使用 as_completed 可以在任务完成时立即处理结果, 而不必等待所有任务完成。
GIL
全局解释器锁对多线程的影响在 CPU 密集型任务中尤为明显。我们通过一个计算密集型的例子来观察:
import threading
import time
from concurrent.futures import ThreadPoolExecutor
def cpu_intensive_task ( n ):
"""CPU密集型任务:计算前n个数的平方和"""
total = 0
for i in range ( n ):
total += i * i
return total
def measure_threading_performance ():
"""测试多线程在CPU密集型任务上的性能"""
task_size = 10000000
num_tasks = 4
# 单线程执行
start = time . time ()
results_single = [ cpu_intensive_task ( task_size ) for _ in range ( num_tasks )]
single_time = time . time () - start
print ( f "单线程执行时间: { single_time : .2f } 秒" )
# 多线程执行
start = time . time ()
with ThreadPoolExecutor ( max_workers = 4 ) as executor :
results_multi = list ( executor . map ( cpu_intensive_task ,
[ task_size ] * num_tasks ))
multi_time = time . time () - start
print ( f "多线程执行时间: { multi_time : .2f } 秒" )
print ( f "加速比: { single_time / multi_time : .2f } x" )
if __name__ == '__main__' :
measure_threading_performance ()
运行这段代码会发现,多线程的执行时间与单线程相差无几,甚至可能更慢。这是因为 GIL 确保同一时刻只有一个线程执行 Python 字节码,线程切换还带来了额外开销。对于 CPU 密集型任务,多线程不仅无法利用多核,反而因为频繁的锁竞争和上下文切换降低了性能。
GIL 的设计是为了简化 CPython 解释器的实现,保护内部数据结构的线程安全。虽然 GIL 限制了多线程的性能,但对于 IO 密集型任务,线程在等待 IO 时会释放 GIL,其他线程就能获得执行机会,因此多线程依然有效。
no-GIL
Python 3.13 版本首次引入了实验性的 free-threading 构建选项,这是一个可以禁用 GIL 的独立构建版本。到了 Python 3.14,通过 PEP 779 的批准,free-threading 获得了正式官方支持。虽然仍需要单独安装特殊构建版本,但这标志着 no-GIL Python 已经准备好供生产环境使用。官方的 macOS 和 Windows 安装程序从 3.13 开始就支持安装 free-threaded 版本。
在 free-threading 模式下,多线程可以真正并行执行 Python 代码,不再受 GIL 限制。这对 CPU 密集型任务带来显著性能提升,在多核系统上可以获得接近线性的加速比。我们可以通过一个实际例子来对比有 GIL 和无 GIL 版本的性能差异:
import threading
import time
import sys
def cpu_intensive_work ( n , results , index ):
"""CPU密集型计算任务"""
total = 0
for i in range ( n ):
total += i * i
results [ index ] = total
def benchmark_threading ( num_threads = 4 , work_size = 10000000 ):
"""基准测试:对比单线程和多线程性能"""
# 检测是否启用了GIL
try :
gil_enabled = sys . _is_gil_enabled ()
except AttributeError :
gil_enabled = True # Python 3.13之前的版本
print ( f "GIL状态: { '启用' if gil_enabled else '禁用' } " )
print ( f "Python版本: { sys . version } " )
print ( "-" * 60 )
# 单线程基准测试
print ( f " \n 执行 { num_threads } 个任务:" )
results = [ 0 ] * num_threads
start = time . time ()
for i in range ( num_threads ):
cpu_intensive_work ( work_size , results , i )
single_time = time . time () - start
print ( f "单线程顺序执行: { single_time : .3f } 秒" )
# 多线程基准测试
results = [ 0 ] * num_threads
threads = []
start = time . time ()
for i in range ( num_threads ):
t = threading . Thread ( target = cpu_intensive_work ,
args = ( work_size , results , i ))
threads . append ( t )
t . start ()
for t in threads :
t . join ()
multi_time = time . time () - start
print ( f "多线程并发执行: { multi_time : .3f } 秒" )
speedup = single_time / multi_time
print ( f "加速比: { speedup : .2f } x" )
if gil_enabled :
print ( " \n 说明: GIL已启用,多线程无法真正并行执行CPU任务" )
else :
print ( " \n 说明: GIL已禁用,多线程可以充分利用多核CPU" )
if __name__ == '__main__' :
benchmark_threading ()
在传统的有 GIL 的 Python 版本中运行这段代码,多线程版本的耗时与单线程相近甚至更慢。而在 Python 3.13t 或 3.14t 的 free-threaded 版本中,四线程可以获得接近 3-4 倍的加速。这个差异清晰地展示了 GIL 对多线程性能的限制以及 free-threading 带来的改进。
如果需要在 free-threaded 版本中重新启用 GIL, 可以使用命令行选项-X gil = 1 或设置环境变量 PYTHON_GIL = 1。
no-GIL Python 的特征:
性能方面:no-GIL 带来了额外的运行开销。在 Python 3.13 中,单线程性能大约下降 40%,主要是因为自适应解释器被禁用了。但 Python 3.14 通过以线程安全的方式重新启用自适应解释器,将单线程性能损失降低到 5-10%;
内存方面:no-GIL Python 目前比 GIL Python 高出 15-20%。这是为了实现高效的线程安全所付出的代价。内置类型如 dict、list、set 都使用了内部锁来保护并发修改,这些锁机制增加了内存开销。
no-GIL Python 的线程安全:
no-GIL Python 致力于在 Python 层面提供与 GIL 版本类似的线程安全行为。内置类型使用内部锁来保护并发修改;
但开发者不应依赖这些内部实现细节,而应该使用显式的同步机制。
一个实际的线程安全示例, 展示在 no-GIL 环境下如何正确处理共享状态:
import threading
import time
from collections import defaultdict
class ThreadSafeCounter :
"""线程安全的计数器类"""
def __init__ ( self ):
self . _counts = defaultdict ( int )
self . _lock = threading . Lock ()
def increment ( self , key ):
"""增加计数"""
with self . _lock :
self . _counts [ key ] += 1
def get_counts ( self ):
"""获取所有计数"""
with self . _lock :
return dict ( self . _counts )
def worker ( counter , worker_id , iterations ):
"""工作线程函数"""
for i in range ( iterations ):
counter . increment ( f "worker_ { worker_id } " )
# 模拟一些计算工作
_ = sum ( j * j for j in range ( 100 ))
def test_thread_safety ():
"""测试线程安全性"""
counter = ThreadSafeCounter ()
num_workers = 8
iterations = 10000
threads = []
start = time . time ()
for i in range ( num_workers ):
t = threading . Thread ( target = worker ,
args = ( counter , i , iterations ))
threads . append ( t )
t . start ()
for t in threads :
t . join ()
elapsed = time . time () - start
counts = counter . get_counts ()
print ( f "执行时间: { elapsed : .3f } 秒" )
print ( f "总计数: { sum ( counts . values ()) } " )
print ( f "预期计数: { num_workers * iterations } " )
# 验证正确性
for i in range ( num_workers ):
key = f "worker_ { i } "
assert counts [ key ] == iterations , f " { key } 计数错误"
print ( "线程安全测试通过!" )
if __name__ == '__main__' :
test_thread_safety ()
这个例子展示了即使在 no-GIL 环境下,也需要使用 Lock 来保护共享状态。虽然内置类型有内部保护,但复杂的操作序列仍需要显式同步。正确的同步不仅保证了数据一致性,也确保了程序在不同 Python 版本间的可移植性。
多进程编程
多进程通过创建独立的进程来实现真正的并行。每个进程拥有独立的内存空间和 Python 解释器实例,不受 GIL 限制。这使得多进程成为 CPU 密集型任务的首选方案。
multiprocessing 模块提供了与 threading 类似的接口,降低了从多线程迁移到多进程的难度。但进程间通信比线程间通信更复杂,因为进程之间不共享内存空间。
import multiprocessing
import time
import os
def cpu_bound_task ( n ):
"""CPU密集型任务"""
pid = os . getpid ()
print ( f "进程 { pid } 开始计算" )
total = 0
for i in range ( n ):
total += i * i
print ( f "进程 { pid } 完成计算" )
return total
def compare_single_vs_multi ():
"""比较单进程与多进程性能"""
task_size = 10000000
num_tasks = 4
# 单进程执行
print ( "=== 单进程执行 ===" )
start = time . time ()
results_single = [ cpu_bound_task ( task_size ) for _ in range ( num_tasks )]
single_time = time . time () - start
print ( f "单进程总耗时: { single_time : .2f } 秒 \n " )
# 多进程执行
print ( "=== 多进程执行 ===" )
start = time . time ()
with multiprocessing . Pool ( processes = 4 ) as pool :
results_multi = pool . map ( cpu_bound_task , [ task_size ] * num_tasks )
multi_time = time . time () - start
print ( f "多进程总耗时: { multi_time : .2f } 秒" )
print ( f "加速比: { single_time / multi_time : .2f } x" )
if __name__ == '__main__' :
compare_single_vs_multi ()
在这个例子中,多进程版本在多核 CPU 上能够获得接近线性的加速比。四个进程可以真正并行执行,充分利用 CPU 资源。每个进程都在独立的 Python 解释器中运行,不受 GIL 约束。
Python 提供了两种进程池实现:multiprocessing.Pool 和 concurrent.futures.ProcessPoolExecutor。两者功能相似但接口和设计理念有所不同。multiprocessing.Pool 是较早的实现,提供了 map、apply 等传统接口。ProcessPoolExecutor 则是更现代的设计,与 ThreadPoolExecutor 共享统一的 API,使得在线程和进程之间切换更加容易。
multiprocessing
multiprocessing.Pool 提供了丰富的方法来处理不同场景:
map 方法适合批量处理同构任务,它会阻塞直到所有任务完成并按顺序返回结果;
apply_async 提供异步接口,可以提交单个任务并立即返回,通过回调函数处理结果;
imap 和 imap_unordered 则允许迭代处理结果,适合处理大量任务时减少内存占用。
下面这个例子展示了 Pool 的三种主要使用方式。map 方法最简单,适合需要所有结果的场景。imap 允许边处理边获取结果,内存效率更高。apply_async 提供了最大的灵活性,可以为不同任务设置不同参数,并通过 get 方法控制获取结果的时机。
import multiprocessing
import time
import os
def process_item ( item ):
"""处理单个数据项"""
pid = os . getpid ()
result = item * item
time . sleep ( 0.1 ) # 模拟处理时间
return { 'input' : item , 'output' : result , 'pid' : pid }
def demo_pool_methods ():
"""演示multiprocessing.Pool的不同方法"""
data = list ( range ( 20 ))
print ( "=== 使用 Pool.map ===" )
start = time . time ()
with multiprocessing . Pool ( processes = 4 ) as pool :
results = pool . map ( process_item , data )
print ( f "处理 { len ( data ) } 项,耗时: { time . time () - start : .2f } 秒" )
print ( f "使用的进程ID: { set ( r [ 'pid' ] for r in results ) } " )
print ( f "前3个结果: { results [: 3 ] } \n " )
print ( "=== 使用 Pool.imap (迭代处理) ===" )
start = time . time ()
with multiprocessing . Pool ( processes = 4 ) as pool :
for i , result in enumerate ( pool . imap ( process_item , data )):
if i < 3 : # 只打印前3个
print ( f " 接收到结果 { i } : { result } " )
print ( f "迭代处理完成,耗时: { time . time () - start : .2f } 秒 \n " )
print ( "=== 使用 Pool.apply_async (异步提交) ===" )
start = time . time ()
with multiprocessing . Pool ( processes = 4 ) as pool :
async_results = []
for item in data [: 8 ]: # 只处理部分数据演示
async_result = pool . apply_async ( process_item , ( item ,))
async_results . append ( async_result )
# 等待所有任务完成
results = [ ar . get () for ar in async_results ]
print ( f "异步处理 { len ( results ) } 项,耗时: { time . time () - start : .2f } 秒" )
print ( f "结果: { results [: 3 ] } \n " )
if __name__ == '__main__' :
demo_pool_methods ()
由于进程拥有独立内存空间,数据传递需要通过进程间通信 IPC 机制。multiprocessing 提供了 Queue、Pipe 等工具,以及共享内存对象。
import multiprocessing
import time
import random
def producer ( queue , producer_id , num_items ):
"""生产者进程:生成数据放入队列"""
for i in range ( num_items ):
item = f "Producer- { producer_id } -Item- { i } "
queue . put ( item )
print ( f "生产者 { producer_id } 生产: { item } " )
time . sleep ( random . uniform ( 0.1 , 0.3 ))
queue . put ( None ) # 发送结束信号
def consumer ( queue , consumer_id ):
"""消费者进程:从队列取数据处理"""
while True :
item = queue . get ()
if item is None :
queue . put ( None ) # 传递结束信号给其他消费者
break
print ( f "消费者 { consumer_id } 处理: { item } " )
time . sleep ( random . uniform ( 0.1 , 0.2 ))
def producer_consumer_demo ():
"""生产者-消费者模式演示"""
queue = multiprocessing . Queue ( maxsize = 10 )
# 创建2个生产者
producers = []
for i in range ( 2 ):
p = multiprocessing . Process ( target = producer ,
args = ( queue , i , 5 ))
producers . append ( p )
p . start ()
# 创建3个消费者
consumers = []
for i in range ( 3 ):
c = multiprocessing . Process ( target = consumer ,
args = ( queue , i ))
consumers . append ( c )
c . start ()
# 等待所有进程完成
for p in producers :
p . join ()
for c in consumers :
c . join ()
print ( "所有任务完成" )
if __name__ == '__main__' :
producer_consumer_demo ()
这个生产者-消费者模式展示了 Queue 在进程间的使用。Queue 是进程安全的,多个进程可以同时读写而不会发生数据竞争。生产者将数据放入队列,消费者从队列取出数据处理,Queue 自动处理同步和阻塞。
对于需要高效共享大量数据的场景,可以使用 shared memory 共享内存。Array 和 Value 允许多个进程直接访问同一块内存,避免了数据复制的开销。
import multiprocessing
import time
def parallel_sum_worker ( shared_array , start , end , result , index ):
"""计算数组片段的和"""
partial_sum = sum ( shared_array [ start : end ])
result [ index ] = partial_sum
def parallel_array_sum ():
"""使用多进程并行计算数组和"""
size = 10000000
num_processes = 4
# 创建共享内存数组
shared_array = multiprocessing . Array ( 'i' , range ( size ))
result = multiprocessing . Array ( 'i' , num_processes )
start_time = time . time ()
# 划分任务
chunk_size = size // num_processes
processes = []
for i in range ( num_processes ):
start = i * chunk_size
end = start + chunk_size if i < num_processes - 1 else size
p = multiprocessing . Process ( target = parallel_sum_worker ,
args = ( shared_array , start , end , result , i ))
processes . append ( p )
p . start ()
for p in processes :
p . join ()
total = sum ( result )
elapsed = time . time () - start_time
print ( f "并行计算结果: { total } " )
print ( f "耗时: { elapsed : .2f } 秒" )
if __name__ == '__main__' :
parallel_array_sum ()
共享内存适合读多写少的场景。虽然避免了数据复制,但访问共享内存时仍需要考虑同步问题。multiprocessing 提供了 Lock 等同步原语来保护共享内存的并发访问。
对于需要在进程间共享复杂对象的场景,可以考虑使用 Manager。Manager 提供了一个服务进程来管理共享对象,支持 list、dict 等复杂数据结构,但性能不如直接使用共享内存。
import multiprocessing
import time
def worker_with_manager ( shared_dict , worker_id ):
"""使用Manager共享字典的工作进程"""
for i in range ( 5 ):
key = f "worker_ { worker_id } _item_ { i } "
shared_dict [ key ] = f "Data from worker { worker_id } "
time . sleep ( 0.1 )
def manager_demo ():
"""Manager使用演示"""
with multiprocessing . Manager () as manager :
shared_dict = manager . dict ()
processes = []
for i in range ( 3 ):
p = multiprocessing . Process ( target = worker_with_manager ,
args = ( shared_dict , i ))
processes . append ( p )
p . start ()
for p in processes :
p . join ()
print ( "共享字典内容:" )
for key , value in shared_dict . items ():
print ( f " { key } : { value } " )
if __name__ == '__main__' :
manager_demo ()
Manager 提供了进程安全的共享对象,但每次访问都需要通过进程间通信,性能开销较大。对于性能敏感的场景,应该尽量减少对共享对象的访问频率,或者选择更高效的通信方式。
ProcessPoolExecutor
concurrent.futures.ProcessPoolExecutor 采用了 Future 模式,这是 concurrent.futures 模块的核心设计。Future 对象代表一个异步执行的操作,可以查询状态、等待完成、获取结果或异常。这种设计更加面向对象,与现代异步编程范式更契合。
from concurrent.futures import ProcessPoolExecutor , as_completed , wait , FIRST_COMPLETED
import time
import os
def process_item_with_delay ( item , delay = 0.1 ):
"""处理数据项,可配置延迟"""
pid = os . getpid ()
time . sleep ( delay )
if item == 15 : # 模拟某个任务失败
raise ValueError ( f "处理项 { item } 时发生错误" )
return { 'input' : item , 'output' : item * item , 'pid' : pid }
def demo_executor_basic ():
"""ProcessPoolExecutor基本用法"""
data = list ( range ( 20 ))
print ( "=== 使用 submit 提交任务 ===" )
start = time . time ()
with ProcessPoolExecutor ( max_workers = 4 ) as executor :
# 提交所有任务,返回Future对象
futures = { executor . submit ( process_item_with_delay , item ): item
for item in data }
# 使用as_completed按完成顺序处理结果
completed_count = 0
for future in as_completed ( futures ):
completed_count += 1
item = futures [ future ]
try :
result = future . result ()
if completed_count <= 3 :
print ( f " 任务 { item } 完成: { result } " )
except Exception as e :
print ( f " 任务 { item } 失败: { e } " )
print ( f "所有任务完成,耗时: { time . time () - start : .2f } 秒 \n " )
def demo_executor_map ():
"""ProcessPoolExecutor的map方法"""
data = list ( range ( 20 ))
print ( "=== 使用 map 方法 ===" )
start = time . time ()
with ProcessPoolExecutor ( max_workers = 4 ) as executor :
# map会按顺序返回结果
results = executor . map ( process_item_with_delay , data )
# 可以迭代处理结果
for i , result in enumerate ( results ):
if i < 3 :
print ( f " 结果 { i } : { result } " )
print ( f "Map处理完成,耗时: { time . time () - start : .2f } 秒 \n " )
def demo_executor_wait ():
"""使用wait控制任务完成"""
data = list ( range ( 12 ))
print ( "=== 使用 wait 等待部分任务 ===" )
with ProcessPoolExecutor ( max_workers = 4 ) as executor :
futures = [ executor . submit ( process_item_with_delay , item , 0.2 )
for item in data ]
# 等待至少一个任务完成
done , pending = wait ( futures , return_when = FIRST_COMPLETED )
print ( f "第一批完成: { len ( done ) } 个任务" )
print ( f "还在执行: { len ( pending ) } 个任务" )
# 等待所有任务完成
done , pending = wait ( futures )
print ( f "全部完成: { len ( done ) } 个任务 \n " )
if __name__ == '__main__' :
demo_executor_basic ()
demo_executor_map ()
demo_executor_wait ()
ProcessPoolExecutor 的 submit 方法返回 Future 对象,可以灵活地控制任务执行和结果获取。as_completed 函数允许按完成顺序处理结果,而不必等待所有任务完成。wait 函数提供了更细粒度的控制,可以等待第一个完成、全部完成、或任意数量完成。
两种进程池的对比可以通过以下维度来理解:
在接口设计上。Pool 提供了传统的函数式接口,如 map、apply 等,这些方法名称直观但功能相对固定。ProcessPoolExecutor 采用统一的 submit 接口,返回 Future 对象,提供了更大的灵活性和一致性;
在错误处理方面。Pool 的 map 会在任何任务失败时抛出异常,中断整个批处理。而 ProcessPoolExecutor 可以单独处理每个 Future 的异常,不影响其他任务继续执行。这在处理可能部分失败的大批量任务时很有价值;
在结果获取上。Pool 的 map 会阻塞直到所有结果就绪,imap 虽然可以迭代但仍按提交顺序返回。ProcessPoolExecutor 的 as_completed 允许按完成顺序立即处理结果,充分利用了并行执行的优势,对于任务耗时不均匀的场景特别有用。
运行下面这个对比示例可以清楚地看到差异。Pool 的 imap 必须等待第一个任务完成才能返回第一个结果,即使后面的任务已经完成。ProcessPoolExecutor 的 as_completed 则立即返回任何已完成的任务结果,显著改善了响应性和资源利用率。
import time
from multiprocessing import Pool
from concurrent.futures import ProcessPoolExecutor , as_completed
def variable_time_task ( n ):
"""耗时不均匀的任务"""
time . sleep ( n * 0.1 )
return n * n
def compare_result_handling ():
"""对比两种进程池的结果处理方式"""
tasks = [ 5 , 1 , 8 , 2 , 6 , 3 , 7 , 4 ]
print ( "=== multiprocessing.Pool (按顺序返回) ===" )
start = time . time ()
with Pool ( processes = 4 ) as pool :
for i , result in enumerate ( pool . imap ( variable_time_task , tasks )):
elapsed = time . time () - start
print ( f " { elapsed : .1f } 秒 - 获得第 { i } 个结果: { result } " )
print ( f "总耗时: { time . time () - start : .2f } 秒 \n " )
print ( "=== ProcessPoolExecutor (按完成顺序返回) ===" )
start = time . time ()
with ProcessPoolExecutor ( max_workers = 4 ) as executor :
futures = { executor . submit ( variable_time_task , n ): n for n in tasks }
for future in as_completed ( futures ):
elapsed = time . time () - start
n = futures [ future ]
result = future . result ()
print ( f " { elapsed : .1f } 秒 - 任务 { n } 完成,结果: { result } " )
print ( f "总耗时: { time . time () - start : .2f } 秒 \n " )
if __name__ == '__main__' :
compare_result_handling ()
两者在性能上没有显著差异,在选择使用哪种进程池时,需要考虑具体需求。对于简单的批量处理场景,Pool 的 map 方法可能更直接简洁。否则,建议使用 ProcessPoolExecutor 这种更现代的方法。
进程池还支持 map 方法, 它会自动将可迭代对象分块分配给工作进程。对于简单的并行任务,map 提供了最简洁的接口。
from concurrent.futures import ProcessPoolExecutor
import time
def process_data ( data_chunk ):
"""处理数据块"""
return sum ( x * x for x in data_chunk )
def batch_process_with_map ():
"""使用map方法批量处理数据"""
# 生成大量数据
data = list ( range ( 10000000 ))
chunk_size = 1000000
chunks = [ data [ i : i + chunk_size ] for i in range ( 0 , len ( data ), chunk_size )]
start = time . time ()
with ProcessPoolExecutor ( max_workers = 4 ) as executor :
results = executor . map ( process_data , chunks )
total = sum ( results )
elapsed = time . time () - start
print ( f "处理结果: { total } " )
print ( f "耗时: { elapsed : .2f } 秒" )
if __name__ == '__main__' :
batch_process_with_map ()
map 方法会按照输入顺序返回结果,这在需要保持顺序的场景下很有用。它的语法也更接近函数式编程风格,代码更简洁。
多进程的注意事项
使用多进程需要注意几个关键点:
进程启动开销。创建新进程比创建线程耗时更多,因此多进程更适合长时间运行的计算任务;
数据传递成本。进程间通信需要序列化和反序列化数据,大对象的传递会带来显著开销;
内存占用开销。每个进程都有独立的内存空间,多进程应用的总内存消耗会更高。
在 Windows 平台上,multiprocessing 的行为与 Unix 系统有所不同。Windows 不支持 fork,必须使用 spawn 方式启动进程,这要求主模块必须可以被安全导入。因此代码必须放在 if __name__ == '__main__' 保护块中, 否则会导致递归创建进程。
FAQ
GPU 加速与 CUDA
对于特定类型的计算密集型任务,GPU 可以提供远超 CPU 的并行计算能力。虽然 Python 标准库不直接支持 CUDA 编程,但了解这个方向对于高性能计算很重要。GPU 包含数千个计算核心,适合处理大规模并行的数值计算,如矩阵运算、图像处理、深度学习等。
使用 GPU 加速需要借助第三方库。CUDA 工具链需要 NVIDIA GPU 和相应的驱动程序。常用的 Python 库包括 CuPy 用于数组计算、Numba 用于即时编译、以及各种深度学习框架内置的 GPU 支持。这些工具能够将计算任务分配到 GPU 的众多核心上并行执行,在适合的场景下可以获得数十倍甚至上百倍的加速。
GPU 加速的适用场景具有明确特征: 任务可以分解为大量独立的小计算单元,数据访问模式规整,计算密度高。典型应用包括科学计算中的矩阵操作、信号处理、图像视频编解码、机器学习模型训练等。但 GPU 加速也有局限性,数据在 CPU 和 GPU 之间传输需要时间,对于小规模计算或者逻辑复杂的任务,GPU 的优势无法体现。
在设计高性能应用时,通常采用混合方案: 使用多进程充分利用 CPU 多核,将适合的计算任务 offload 到 GPU,通过异步 IO 处理网络和磁盘操作。这种架构能够最大化硬件利用率,适应不同类型的工作负载。
性能对比与选择策略
不同并发策略的性能特征可以通过表格清晰呈现:
策略
适用任务类型
并发数量
开销
GIL 限制
内存占用
单线程同步
简单任务
1
最低
无影响
最低
单线程异步
IO 密集型
数千+
低
无影响
低
多线程
IO 密集型
数十到数百
中等
受限
中等
多进程
CPU 密集型
CPU 核心数
高
无限制
高
选择合适的并发策略需要分析任务特征:
对于网络爬虫、API 调用、数据库查询等 IO 密集型任务,首选 asyncio 异步编程,其次是多线程。这类任务的瓶颈在等待外部响应,而不是 CPU 计算,因此单线程异步就能获得很好的并发效果,同时保持代码的简洁性;
对于图像处理、科学计算、加密解密等 CPU 密集型任务,必须使用多进程才能突破 GIL 限制。进程数量通常设置为 CPU 核心数或略多一些,过多的进程会因为上下文切换降低性能。如果任务规模巨大且适合并行,考虑使用 GPU 加速。
混合任务需要组合使用多种策略。例如一个 Web 服务需要处理大量并发连接,每个连接可能涉及 IO 操作和计算任务。可以使用 asyncio 处理网络 IO,用进程池处理 CPU 密集型计算,通过 asyncio 的 run_in_executor 将计算任务分配到进程池。
下面这个例子展示了如何结合异步 IO 和多进程的优势:
import asyncio
from concurrent.futures import ProcessPoolExecutor
import time
def heavy_computation ( n ):
"""CPU密集型计算"""
result = sum ( i * i for i in range ( n ))
return result
async def handle_request ( request_id , executor ):
"""处理单个请求"""
print ( f "请求 { request_id } 开始处理" )
# 模拟IO操作
await asyncio . sleep ( 0.1 )
# 将CPU密集型任务委托给进程池
loop = asyncio . get_event_loop ()
result = await loop . run_in_executor ( executor , heavy_computation , 1000000 )
# 模拟更多IO操作
await asyncio . sleep ( 0.1 )
print ( f "请求 { request_id } 完成,结果: { result } " )
return result
async def hybrid_server ():
"""混合使用异步和多进程的服务器"""
executor = ProcessPoolExecutor ( max_workers = 4 )
start = time . time ()
# 并发处理多个请求
tasks = [ handle_request ( i , executor ) for i in range ( 10 )]
results = await asyncio . gather ( * tasks )
elapsed = time . time () - start
print ( f " \n 处理 { len ( tasks ) } 个请求,耗时: { elapsed : .2f } 秒" )
executor . shutdown ()
if __name__ == '__main__' :
asyncio . run ( hybrid_server ())
asyncio 处理大量并发连接和 IO 操作,进程池处理 CPU 密集型计算。两者通过 run_in_executor 无缝集成,既保持了高并发能力,又能充分利用多核 CPU。
实际项目中还需要考虑错误处理、资源限制、监控调试等工程问题。异步代码的调试比同步代码困难,需要使用专门的工具。多进程应用需要处理进程崩溃和重启。合理设置超时、重试、熔断等机制,能够提高系统的健壮性。
实践建议与最佳实践
编写高质量的并发代码需要遵循一些原则。首先是避免过早优化,在性能问题真正出现之前,保持代码简单清晰比追求极致性能更重要。通过性能分析工具定位真正的瓶颈,然后针对性地优化热点代码。
并发代码的测试和调试具有特殊挑战。竞态条件和死锁等问题具有不确定性,难以重现和定位。编写并发代码时应该遵循 "共享尽可能少" 的原则,减少线程或进程间的共享状态。优先使用不可变数据结构,使用队列等线程安全的通信机制,而不是直接访问共享变量。
资源管理是并发编程的另一个重点。线程池和进程池应该合理配置大小,过大会浪费资源,过小会限制并发度。使用上下文管理器确保资源正确释放,避免连接泄漏、文件句柄耗尽等问题。对于长时间运行的服务,需要实现优雅关闭机制,确保未完成的任务能够正常结束。
监控和日志对于诊断并发问题至关重要。记录关键操作的时间戳,可以帮助分析性能瓶颈。在多进程应用中,每个进程应该标识自己的 ID,便于追踪日志来源。使用结构化日志和分布式追踪工具,能够更好地理解复杂的并发行为。
在选择并发策略时,还需要考虑团队的技术栈和维护成本。异步编程虽然性能出色,但学习曲线陡峭,可能增加新成员的上手难度。多进程虽然能够突破 GIL,但调试和部署更复杂。有时候一个简单的多线程方案,虽然不是最优的,但如果能够满足需求且易于维护,就是好的选择。
总结与展望
Python 提供了丰富的并发编程工具,从简单的多线程到复杂的异步编程,从多进程到 GPU 加速,每种技术都有其适用场景。理解任务的特性是选择正确策略的关键: IO 密集型任务适合异步或多线程,CPU 密集型任务需要多进程,特定的数值计算可以利用 GPU。
GIL 是 Python 并发模型的核心特征,它简化了解释器的实现,但也限制了多线程在 CPU 密集型任务上的性能。Python 3.13 引入的 free-threading 是一个重要的尝试,虽然目前还是实验性的,但预示着 Python 并发能力的未来演进方向。在 GIL 被彻底移除之前,多进程仍然是突破这一限制的主要手段。
并发编程如同一把双刃剑。它能够显著提升程序性能,让应用更好地利用硬件资源,但也引入了复杂性和潜在的 bug。数据竞争、死锁、资源泄漏等问题需要开发者更加谨慎地设计和测试。好的并发程序不仅仅是快速的,更应该是正确的、可维护的、易于理解的。
在实际开发中,很少有任务纯粹属于某一类型。大多数应用都是混合工作负载,需要组合使用多种技术。掌握各种并发工具的特性和适用场景,能够让开发者根据具体需求做出明智的选择,设计出既高效又可靠的系统。正如调音师需要理解每种乐器的音色来编排和谐的乐章,优秀的开发者需要理解每种并发技术的特点来构建高性能的应用。
并发与并行只是手段,性能和可维护性才是目的。