Python/python 使用 concurrent.futures 多进程注意事项

Python/python 使用 concurrent.futures 多进程注意事项

线程池和进程池

concurrent.futures 是 Python3.2 加入标准库的一个模块,它提供异步执行回调高层接口,是对线程池和进程池的进一步封装,让开发者可以使用统一的接口非常容易的使用线程池和进程池。

这个模块中有两个核心的类:ThreadPoolExecutor (线程池)和ProcessPoolExecutor(进程池)。

  1. 执行重 I/O 操作的任务 (IO 密集型) 选择 ThreadPoolExecutor,例如请求网页数据,文件读写等涉及网络、磁盘 I/O 相关的内容。
  2. 执行重 CPU 的任务 (CPU 密集型) 选择 ProcessPoolExecutor,例如大量消耗 CPU 的数学与逻辑运算、视频编解码等内容。并且 ProcessPoolExecutor可以避开 GIL 的问题,实现真正的并行处理。

submit 和 map 的区别

ProcessPoolExecutorThreadPoolExecutor类中最重要的 2 个方法如下:

  • submit: 提交一个任务执行,并返回 Future 对象。通过 Future 对象的result()方法可以获得返回结果。如果没有返回结果,则为 None。
  • map: 提交多个任务执行,返回结果列表。

代码示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import concurrent.futures
# 定义函数
def plus_1(n):
return n+1


# if __name__ == "__main__":
with concurrent.futures.ThreadPoolExecutor() as executor:
# 使用 submit 提交一个任务到进程运行,第二个参数是传给 plus_1() 的参数
future1 = executor.submit(plus_1,1)
# 使用 submit 提交一个任务到进程运行
future2 = executor.submit(plus_1,2)
# 使用 submit 提交一个任务到进程运行
future3 = executor.submit(plus_1,3)
print(future1.result())
print(future2.result())
print(future3.result())

结果为:

1
2
3
2
3
4

上面定义了一个方法,功能是加 1。然后使用线程池的submit()方法提交这个函数执行,并传入对应的参数,通过future.result()方法获得运行结果。

如果使用map()来实现,代码如下:

1
2
3
4
5
6
7
8
9
10
import concurrent.futures
# 定义函数
def plus_1(n):
return n+1

with concurrent.futures.ThreadPoolExecutor() as executor:
# 多进程运行 plus_1() 函数,第二个此参数是参数列表,每次取一个传给 plus_1
results=executor.map(plus_1, [1, 2, 3])
for result in results:
print(result)

输出结果和上面的一致。

多进程和多进程的异常注意事项

需要注意的是,如果在 plus_1() 方法中出现了异常,默认情况下我们不会看到相关的异常信息。除非我们使用future.result()来打印结果,才会看到异常信息。

代码示例:

1
2
3
4
5
6
7
8
9
10
import concurrent.futures
# 定义函数
def plus_1(n):
# 在这里直接抛出异常,在多线程或者多进程下不会看到这个异常
raise Exception("My Error")
return n+1

with concurrent.futures.ThreadPoolExecutor() as executor:
# 多进程运行 plus_1() 函数,第二个此参数是参数列表,每次取一个传给 plus_1
results=executor.map(plus_1, [1, 2, 3])

上面的例子在 plus_1() 方法中抛出了异常,但是我们运行这段代码,发现什么异常没有,程序正常运行。此使我们需要添加打印results的代码,才可以看到异常。

1
2
3
4
5
6
7
8
9
10
11
12
13
import concurrent.futures
# 定义函数
def plus_1(n):
# 在这里直接抛出异常,在多线程或者多进程下不会看到这个异常
raise Exception("My Error")
return n+1


with concurrent.futures.ThreadPoolExecutor() as executor:
# 多进程运行 plus_1() 函数,第二个此参数是参数列表,每次取一个传给 plus_1
results=executor.map(plus_1, [1, 2, 3])
for result in results:
print(result)

最后两行添加了打印results的代码,在 IDE 上可以看到异常。使用submit也同理,需要调用future.result()方法打印。

上面说的异常相关的注意事项对进程池和线程池同样适用。

map 在进程池需要注意的事项

1
2
3
4
5
6
7
8
9
10
import concurrent.futures
# 定义函数
def plus_1(n):
return n+1

with concurrent.futures.ProcessPoolExecutor() as executor:
# 多进程运行 plus_1() 函数,第二个此参数是参数列表,每次取一个传给 plus_1
results=executor.map(plus_1, [1, 2, 3])
for result in results:
print(result)

上面这段代码把 ThreadPoolExecutor 换成了 ProcessPoolExecutor,并且使用了 map() 方法来启动线程池,这种情况会报错concurrent.futures.process.BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending.。因为会造成多进程循环调用。添加if __name__ == "__main__":即可,这样在创建子进程时就不会运行本身这个代码块。如下:

1
2
3
4
5
6
7
8
9
10
11
import concurrent.futures
# 定义函数
def plus_1(n):
return n+1

if __name__ == "__main__":
with concurrent.futures.ProcessPoolExecutor() as executor:
# 多进程运行 plus_1() 函数,第二个此参数是参数列表,每次取一个传给 plus_1
results=executor.map(plus_1, [1, 2, 3])
for result in results:
print(result)

max_workers 的 tips

ProcessPoolExecutorThreadPoolExecutor都接受max_workers参数,表示用来执行任务的进程 / 线程数量。ProcessPoolExecutor 的默认值是 CPU 的个数 (通过 os.cpu_count () 获得),而 ThreadPoolExecutor 的默认值是 CPU 的个数的 5 倍。一般情况下不用修改这个参数。



参考:

python 多种并行提高运行效率

使用concurrent.futures的一些经验

评论