Python 多进程(multiprocessing)

Python多进程编程入门,基本使用,queue的使用,pool使用和共享变量的使用

完整代码点击这里

一、基本使用

Python有多进程(multiprocessing)与多线程(multithreading)两种并行的方式,在python中导入库

1
2
import multiprocessing as mp
import threading as td

我们编写一个代码来比较multiprocessing、multithreading和serial三种执行的效率

要使用并行,首先要先准备一个被调用的函数

1
2
3
4
5
6
7
def job(q, a):
res = 0
#两个进程/线程分别负责前半部分和后半部分
for i in range(50000*a, 50000*(a+1)):
res += i**3 + i**2 + i
#结果放入queue中
q.put(res)

多进程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
def multiprocess():
# 使用queue
q = mp.Queue()
#传入参数
p1 = mp.Process(target=job, args=(q, 0))
p2 = mp.Process(target=job, args=(q, 1))
p1.start()
p2.start()
p1.join()
p2.join()

res1 = q.get()
res2 = q.get()

print('multiprocess:',res1 + res2)

多线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
def multithread():
# 使用queue
q = mp.Queue()
t1 = td.Thread(target=job, args=(q, 0))
t2 = td.Thread(target=job, args=(q, 1))
t1.start()
t2.start()
t1.join()
t2.join()

res1 = q.get()
res2 = q.get()

print('multithread:', res1 + res2)

串行执行

1
2
3
4
5
def serial():
res = 0
for i in range(100000):
res += i**3 + i**2 + i
print('serial:', res)

调用

1
2
3
4
5
6
7
8
9
10
11
12
if __name__ == '__main__':

import time
st = time.time()
serial()
st1 = time.time()
print('serial time:', st1 - st, "\n")
multithread()
st2 = time.time()
print('multithread time:', st2 - st1, "\n")
multiprocess()
print('multiprocess time:', time.time() - st2, "\n")

输出

1
2
3
4
5
6
7
8
serial: 24999833335833300000
serial time: 0.07318806648254395

multithread: 24999833335833300000
multithread time: 0.16390419006347656

multiprocess: 24999833335833300000
multiprocess time: 0.057524681091308594

可以看出,multiprocessing比普通串行的执行要快一些,但是multithreading比串行要慢一些。multithreading要体现性能的提升必须在任务有大量的IO时,在计算密集型的程序内,multithreading无法体现优越性。事实上,当处理的事情越少时,调度多进程的代价大于多进程带来的性能提升,两种并行都要比串行要慢

二、使用pool来调用python多进程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import multiprocessing as mp


def job(x):
return x**3

def multiprocess():
#mp.Pool()进程数processes默认为cpu核数
pool = mp.Pool(processes=2)
res = pool.map(job, range(10))
print(res)

#使用apply_async()只能传一组参数
multi_res = [pool.apply_async(job, (i,)) for i in range(10)]
print([res.get() for res in multi_res])

if __name__ == "__main__":
multiprocess()

输出

1
2
[0, 1, 8, 27, 64, 125, 216, 343, 512, 729]
[0, 1, 8, 27, 64, 125, 216, 343, 512, 729]

三、共享变量与互斥锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import multiprocessing as mp
import time

def job(v, num, l):
#请求锁
l.acquire()
for _ in range(10):
time.sleep(0.1)
v.value = v.value + num
print(v.value)
#释放锁
l.release()

def multiprocess():

#共享变量
v = mp.Value('i', 0)
#互斥锁
l = mp.Lock()

p1 = mp.Process(target=job, args=(v, -1, l))
p2 = mp.Process(target=job, args=(v, 1, l))
p1.start()
p2.start()
p1.join()
p2.join()
print('final:', v.value)


if __name__ == "__main__":

multiprocess()

输出(不使用互斥锁)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
-1
0
-1
0
-1
0
-1
0
-1
1
1
1
2
1
0
2
3
3
2
4
final: 4

答案出现了错误,并且两个进程的执行顺序很随机,因为可能出现A进程读取变量到寄存器进行计算后时间片被用完挂起,B进程开始进行了读取之后挂起,当A将计算完的结果放回到内存挂起后B进程又将计算完的结果放回内存,A的计算就被覆盖,产生了错误。

输出(使用互斥锁)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
-1
-2
-3
-4
-5
-6
-7
-8
-9
-10
-9
-8
-7
-6
-5
-4
-3
-2
-1
0
final: 0

可以看到进程1先抢到了互斥锁开始了自己的循环,直到执行完释放后进程2才开始执行。