Getting Started with MPI

MPI stands for Message Passing Interface.

Installing mpi4py

In Python you can use the mpi4py library. If you install it with pip, you first need to install the Open MPI development library:

sudo apt-get install libopenmpi-dev

If Anaconda is installed, building mpi4py with pip may fail with errors about missing .so libraries. See the issue for a workaround.

Since Anaconda is already there, however, you can simply install everything with conda:

conda install -c conda-forge mpi4py openmpi

Terminology

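  • world: a process group; collective communication takes place within this group
  • rank: the ID of the current process (yourself); rank 0 is the master process
  • world_size: the number of processes in the group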

Point-to-point communication

Examples

  • send (isend is the non-blocking variant)
  • recv (irecv is the non-blocking variant)
from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

if rank == 0:
    matrix = np.random.randint(-9, 9, size=(10, 10))
    # Blocking send of a (pickled) Python object to rank 1
    comm.send(matrix, dest=1)
    print("process {} send matrix {} to other processes".format(rank, matrix))
elif rank == 1:
    # Only rank 1 receives; with a plain else, extra ranks would block forever in recv()
    matrix_recv = comm.recv(source=0)
    print("process {} recv matrix {} from master".format(rank, matrix_recv))

mpiexec -np 2 python mpi_test.py 

process 1 recv matrix
[[ 0 1 -7 3 -7 -1 -2 2 -3 7]
[ 2 -6 7 4 -5 -1 0 -4 2 4]
[ 0 1 3 5 8 -3 -8 -4 -3 -7]
[ 1 -9 -8 5 -9 -7 -7 -6 -6 -2]
[-2 3 -3 -4 3 7 0 2 7 8]
[-3 -6 -4 -5 -5 -1 2 -3 -9 3]
[-6 6 5 4 -5 8 -2 -2 3 6]
[ 1 7 -6 -7 0 -2 8 4 7 -6]
[-2 -7 -5 -4 -9 0 -3 8 -2 7]
[-3 2 2 6 0 -8 -5 0 -4 5]] from master

process 0 send matrix
[[ 0 1 -7 3 -7 -1 -2 2 -3 7]
[ 2 -6 7 4 -5 -1 0 -4 2 4]
[ 0 1 3 5 8 -3 -8 -4 -3 -7]
[ 1 -9 -8 5 -9 -7 -7 -6 -6 -2]
[-2 3 -3 -4 3 7 0 2 7 8]
[-3 -6 -4 -5 -5 -1 2 -3 -9 3]
[-6 6 5 4 -5 8 -2 -2 3 6]
[ 1 7 -6 -7 0 -2 8 4 7 -6]
[-2 -7 -5 -4 -9 0 -3 8 -2 7]
[-3 2 2 6 0 -8 -5 0 -4 5]] to other processes

Collective communication

Examples

  • bcast

  • scatter

  • gather (with allgather, every process receives the gathered result)

  • reduce (with allreduce, every process receives the reduced result)

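bcast broadcasts a message to every process in the group. On the root (the master, rank 0 here) the call sends; on every other process the call receives.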

from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

if rank == 0:
    matrix = np.random.randint(-9, 9, size=(10, 10))
    print("process {} prepare random matrix {}".format(rank, matrix))
else:
    matrix = np.zeros((10, 10), dtype=int)
    print("process {} use empty matrix {}".format(rank, matrix))
# Rank 0 sends its matrix; every other rank receives it as the return value
matrix = comm.bcast(matrix, root=0)
print("process {} bcast matrix {}".format(rank, matrix))
process 1 use empty matrix [[0 0 0 0 0 0 0 0 0 0]
[0 0 0 0 0 0 0 0 0 0]
[0 0 0 0 0 0 0 0 0 0]
[0 0 0 0 0 0 0 0 0 0]
[0 0 0 0 0 0 0 0 0 0]
[0 0 0 0 0 0 0 0 0 0]
[0 0 0 0 0 0 0 0 0 0]
[0 0 0 0 0 0 0 0 0 0]
[0 0 0 0 0 0 0 0 0 0]
[0 0 0 0 0 0 0 0 0 0]]

process 0 prepare random matrix [[ 5 -8 8 4 1 -7 -8 -6 1 3]
[-6 -5 6 -1 -8 5 7 2 3 7]
[-2 -9 -9 7 0 8 -4 2 -8 -1]
[ 3 -1 3 3 -7 7 -5 -5 -9 -7]
[ 0 -4 -7 -8 -8 -8 7 2 2 1]
[-2 0 1 7 6 3 8 -1 1 8]
[ 6 -4 7 -7 6 -1 -4 6 -6 -9]
[-3 2 8 -7 8 -8 8 3 -1 -3]
[ 6 7 2 -2 1 -5 1 2 1 -2]
[ 2 -3 -2 -3 -7 6 7 -8 -9 4]]

process 0 bcast matrix [[ 5 -8 8 4 1 -7 -8 -6 1 3]
[-6 -5 6 -1 -8 5 7 2 3 7]
[-2 -9 -9 7 0 8 -4 2 -8 -1]
[ 3 -1 3 3 -7 7 -5 -5 -9 -7]
[ 0 -4 -7 -8 -8 -8 7 2 2 1]
[-2 0 1 7 6 3 8 -1 1 8]
[ 6 -4 7 -7 6 -1 -4 6 -6 -9]
[-3 2 8 -7 8 -8 8 3 -1 -3]
[ 6 7 2 -2 1 -5 1 2 1 -2]
[ 2 -3 -2 -3 -7 6 7 -8 -9 4]]

process 1 bcast matrix [[ 5 -8 8 4 1 -7 -8 -6 1 3]
[-6 -5 6 -1 -8 5 7 2 3 7]
[-2 -9 -9 7 0 8 -4 2 -8 -1]
[ 3 -1 3 3 -7 7 -5 -5 -9 -7]
[ 0 -4 -7 -8 -8 -8 7 2 2 1]
[-2 0 1 7 6 3 8 -1 1 8]
[ 6 -4 7 -7 6 -1 -4 6 -6 -9]
[-3 2 8 -7 8 -8 8 3 -1 -3]
[ 6 7 2 -2 1 -5 1 2 1 -2]
[ 2 -3 -2 -3 -7 6 7 -8 -9 4]]

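scatter distributes data from the root across the process group, one piece per process. With mpi4py's lowercase scatter, the root passes a sequence that is already split into exactly one item per process (here via np.array_split), and rank i receives item i.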

from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

if rank == 0:
    matrix = np.random.randint(-9, 9, size=(10, 10))
    # One piece per process: scatter requires exactly `size` items
    line = np.array_split(matrix, size)
    print("process {} prepare random matrix {}".format(rank, matrix))
else:
    line = None
# Rank i receives line[i]
one_piece = comm.scatter(line, root=0)
print("process {} scatter piece {}".format(rank, one_piece))

mpiexec --oversubscribe -np 10 python mpi_test.py
# scatter needs exactly one piece per process (guaranteed here by splitting into `size` pieces). By default Open MPI starts at most as many processes as there are CPU cores; add --oversubscribe to lift this limit.

process 0 prepare random matrix [[-9 -2 -6 4 1 7 5 5 -1 -3]
[-4 6 3 5 -9 -6 7 3 0 -1]
[-9 -6 2 5 -1 2 -2 -5 -8 -7]
[ 3 -4 -1 -9 -2 5 4 -8 0 1]
[ 3 1 -2 6 -4 0 -2 -3 6 8]
[ 6 5 -8 -7 -3 -7 -5 -8 5 8]
[-3 -9 2 -2 0 -7 6 -5 5 -2]
[-6 -4 8 5 -6 6 2 -3 -2 4]
[-5 7 3 -2 4 -8 3 -5 7 -5]
[ 0 4 -9 -1 -5 -2 -1 -5 7 7]]
process 0 scatter piece [[-9 -2 -6 4 1 7 5 5 -1 -3]]
process 2 scatter piece [[-9 -6 2 5 -1 2 -2 -5 -8 -7]]
process 4 scatter piece [[ 3 1 -2 6 -4 0 -2 -3 6 8]]
process 6 scatter piece [[-3 -9 2 -2 0 -7 6 -5 5 -2]]
process 1 scatter piece [[-4 6 3 5 -9 -6 7 3 0 -1]]
process 3 scatter piece [[ 3 -4 -1 -9 -2 5 4 -8 0 1]]
process 5 scatter piece [[ 6 5 -8 -7 -3 -7 -5 -8 5 8]]
process 8 scatter piece [[-5 7 3 -2 4 -8 3 -5 7 -5]]
process 9 scatter piece [[ 0 4 -9 -1 -5 -2 -1 -5 7 7]]
process 7 scatter piece [[-6 -4 8 5 -6 6 2 -3 -2 4]]

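gather collects the data from every process back onto the root. For example, suppose we want to add 1 to every piece that scatter distributed and then collect the results: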

from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

if rank == 0:
    matrix = np.random.randint(-9, 9, size=(10, 10))
    line = np.array_split(matrix, size)
    print("process {} prepare random matrix {}".format(rank, matrix))
else:
    line = None
line = comm.scatter(line, root=0)

# Add 1 to every element of this process's piece (in place)
line += 1

# Rank 0 receives a list of all pieces, ordered by rank; other ranks get None
recv_result = comm.gather(line, root=0)

if rank == 0:
    matrix = np.vstack(recv_result)
    print("process {} get calculated matrix {}".format(rank, matrix))
process 0 prepare random matrix 
[[-2 4 0 4 -2 -1 7 3 -1 4]
[-4 -8 -1 -9 8 7 8 -3 -6 1]
[ 1 -8 0 6 -6 -8 -7 2 -9 -1]
[ 7 -7 0 -4 -4 0 -2 4 -7 7]
[-7 3 0 -8 5 -8 -2 0 7 2]
[-6 -9 -3 -8 0 0 0 -7 -4 -5]
[ 2 -6 3 -8 8 -8 -2 0 -2 -9]
[ 8 8 -8 -8 7 8 4 -3 -9 3]
[ 8 -4 4 2 8 -2 2 7 0 -6]
[ 3 1 -3 2 -8 -6 -4 -5 3 0]]

process 0 get calculated matrix
[[-1 5 1 5 -1 0 8 4 0 5]
[-3 -7 0 -8 9 8 9 -2 -5 2]
[ 2 -7 1 7 -5 -7 -6 3 -8 0]
[ 8 -6 1 -3 -3 1 -1 5 -6 8]
[-6 4 1 -7 6 -7 -1 1 8 3]
[-5 -8 -2 -7 1 1 1 -6 -3 -4]
[ 3 -5 4 -7 9 -7 -1 1 -1 -8]
[ 9 9 -7 -7 8 9 5 -2 -8 4]
[ 9 -3 5 3 9 -1 3 8 1 -5]
[ 4 2 -2 3 -7 -5 -3 -4 4 1]]

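reduce combines the data from all processes with a reduction operation, such as MPI.SUM or MPI.MAX, and returns the result on the root: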

from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

if rank == 0:
    matrix = np.random.randint(-9, 9, size=(10, 10))
    # 10 pieces of shape (1, 10): run with exactly 10 processes (mpiexec -np 10)
    line = np.array_split(matrix, 10)
    print("process {} prepare random matrix {}".format(rank, matrix))
else:
    line = None
line = comm.scatter(line, root=0)
# Element-wise sum of all pieces (i.e. the column sums); the result is returned on rank 0 only
sum_result = comm.reduce(line, op=MPI.SUM, root=0)

if rank == 0:
    print("process {} get calculated sum result {}".format(rank, sum_result))

process 0 prepare random matrix 
[[ 2 -9 -9 -8 -9 -8 -7 8 -6 5]
[ 0 6 6 -2 -6 4 -5 -7 3 -4]
[-9 2 6 -5 -1 -6 5 -2 3 4]
[ 7 4 -6 -5 1 4 -5 5 -1 1]
[ 1 2 -8 -7 5 2 -6 -4 5 -8]
[-8 0 7 1 0 -8 3 -8 -2 3]
[ 6 5 2 -7 -6 -3 -7 4 2 -1]
[ 2 -8 8 -1 -3 -9 0 4 -9 -9]
[-4 -7 -1 -8 4 7 1 6 4 5]
[-2 6 3 3 7 7 -4 1 -7 -3]]

process 0 get calculated sum result [[ -5 1 8 -39 -8 -10 -25 7 -8 -7]]