MPI全称叫做消息传递接口(Message
passing interface)
安装mpi4py
Python可以使用mpi4py库,使用pip安装的话需要提前安装好openmpi库:
1
| sudo apt-get install libopenmpi-dev
|
由于Anaconda存在的缘故,pip安装编译会报找不到一些so库。解决方法见issue
不过既然Anaconda存在,可以直接使用conda安装:
1
| conda install -c conda-forge mpi4py openmpi
|
名词介绍
- world: 一个进程组,聚集通信在这个进程组中进行
- rank: 当前进程(自己)的编号,0是master进程
- world_size: 进程组中进程的个数
点对点通信
举例
- send(isend 非阻塞)
- recv(irecv 非阻塞)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| from mpi4py import MPI import numpy as np
comm = MPI.COMM_WORLD rank = comm.Get_rank() size = comm.Get_size()
recv_data = None
if rank == 0: matrix = np.random.randint(-9, 9, size=(10, 10)) comm.send(matrix, 1) print("process {} send matrix {} to other processes".format(rank, matrix)) else: matrix_recv = comm.recv() print("process {} recv matrix {} from master".format(rank, matrix_recv))
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| mpiexec -np 2 python mpi_test.py
process 1 recv matrix [[ 0 1 -7 3 -7 -1 -2 2 -3 7] [ 2 -6 7 4 -5 -1 0 -4 2 4] [ 0 1 3 5 8 -3 -8 -4 -3 -7] [ 1 -9 -8 5 -9 -7 -7 -6 -6 -2] [-2 3 -3 -4 3 7 0 2 7 8] [-3 -6 -4 -5 -5 -1 2 -3 -9 3] [-6 6 5 4 -5 8 -2 -2 3 6] [ 1 7 -6 -7 0 -2 8 4 7 -6] [-2 -7 -5 -4 -9 0 -3 8 -2 7] [-3 2 2 6 0 -8 -5 0 -4 5]] from master process 0 send matrix [[ 0 1 -7 3 -7 -1 -2 2 -3 7] [ 2 -6 7 4 -5 -1 0 -4 2 4] [ 0 1 3 5 8 -3 -8 -4 -3 -7] [ 1 -9 -8 5 -9 -7 -7 -6 -6 -2] [-2 3 -3 -4 3 7 0 2 7 8] [-3 -6 -4 -5 -5 -1 2 -3 -9 3] [-6 6 5 4 -5 8 -2 -2 3 6] [ 1 7 -6 -7 0 -2 8 4 7 -6] [-2 -7 -5 -4 -9 0 -3 8 -2 7] [-3 2 2 6 0 -8 -5 0 -4 5]] to other processes
|
聚集通信
举例
bcast
广播消息到所有进程组内进程,调用者如果是master,则是发送;其他进程是接收
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| from mpi4py import MPI import numpy as np
comm = MPI.COMM_WORLD rank = comm.Get_rank() size = comm.Get_size()
recv_data = None
if rank == 0: matrix = np.random.randint(-9, 9, size=(10, 10)) print("process {} prepare random matrix {}".format(rank, matrix)) else: matrix = np.zeros((10, 10), dtype=int) print("process {} use empty matrix {}".format(rank, matrix)) matrix = comm.bcast(matrix) print("process {} bcast matrix {}".format(rank, matrix))
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
| process 1 use empty matrix [[0 0 0 0 0 0 0 0 0 0] [0 0 0 0 0 0 0 0 0 0] [0 0 0 0 0 0 0 0 0 0] [0 0 0 0 0 0 0 0 0 0] [0 0 0 0 0 0 0 0 0 0] [0 0 0 0 0 0 0 0 0 0] [0 0 0 0 0 0 0 0 0 0] [0 0 0 0 0 0 0 0 0 0] [0 0 0 0 0 0 0 0 0 0] [0 0 0 0 0 0 0 0 0 0]] process 0 prepare random matrix [[ 5 -8 8 4 1 -7 -8 -6 1 3] [-6 -5 6 -1 -8 5 7 2 3 7] [-2 -9 -9 7 0 8 -4 2 -8 -1] [ 3 -1 3 3 -7 7 -5 -5 -9 -7] [ 0 -4 -7 -8 -8 -8 7 2 2 1] [-2 0 1 7 6 3 8 -1 1 8] [ 6 -4 7 -7 6 -1 -4 6 -6 -9] [-3 2 8 -7 8 -8 8 3 -1 -3] [ 6 7 2 -2 1 -5 1 2 1 -2] [ 2 -3 -2 -3 -7 6 7 -8 -9 4]] process 0 bcast matrix [[ 5 -8 8 4 1 -7 -8 -6 1 3] [-6 -5 6 -1 -8 5 7 2 3 7] [-2 -9 -9 7 0 8 -4 2 -8 -1] [ 3 -1 3 3 -7 7 -5 -5 -9 -7] [ 0 -4 -7 -8 -8 -8 7 2 2 1] [-2 0 1 7 6 3 8 -1 1 8] [ 6 -4 7 -7 6 -1 -4 6 -6 -9] [-3 2 8 -7 8 -8 8 3 -1 -3] [ 6 7 2 -2 1 -5 1 2 1 -2] [ 2 -3 -2 -3 -7 6 7 -8 -9 4]] process 1 bcast matrix [[ 5 -8 8 4 1 -7 -8 -6 1 3] [-6 -5 6 -1 -8 5 7 2 3 7] [-2 -9 -9 7 0 8 -4 2 -8 -1] [ 3 -1 3 3 -7 7 -5 -5 -9 -7] [ 0 -4 -7 -8 -8 -8 7 2 2 1] [-2 0 1 7 6 3 8 -1 1 8] [ 6 -4 7 -7 6 -1 -4 6 -6 -9] [-3 2 8 -7 8 -8 8 3 -1 -3] [ 6 7 2 -2 1 -5 1 2 1 -2] [ 2 -3 -2 -3 -7 6 7 -8 -9 4]]
|
scatter会将发送的数据拆分,然后分给进程组中的进程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| from mpi4py import MPI import numpy as np
comm = MPI.COMM_WORLD rank = comm.Get_rank() size = comm.Get_size()
recv_data = None
if rank == 0: matrix = np.random.randint(-9, 9, size=(10, 10)) line = np.array_split(matrix, 10) print("process {} prepare random matrix {}".format(rank, matrix)) else: line = None one_piece = comm.scatter(line) print("process {} bcast matrix {}".format(rank, one_piece))
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| mpiexec --oversubscribe -np 10 python mpi_test.py # 数据的分组数量和进程数要对应。 默认mpi最大进程数量和cpu相同,可以添加--oversubscribe解除限制
process 0 prepare random matrix [[-9 -2 -6 4 1 7 5 5 -1 -3] [-4 6 3 5 -9 -6 7 3 0 -1] [-9 -6 2 5 -1 2 -2 -5 -8 -7] [ 3 -4 -1 -9 -2 5 4 -8 0 1] [ 3 1 -2 6 -4 0 -2 -3 6 8] [ 6 5 -8 -7 -3 -7 -5 -8 5 8] [-3 -9 2 -2 0 -7 6 -5 5 -2] [-6 -4 8 5 -6 6 2 -3 -2 4] [-5 7 3 -2 4 -8 3 -5 7 -5] [ 0 4 -9 -1 -5 -2 -1 -5 7 7]] process 0 bcast matrix [[-9 -2 -6 4 1 7 5 5 -1 -3]] process 2 bcast matrix [[-9 -6 2 5 -1 2 -2 -5 -8 -7]] process 4 bcast matrix [[ 3 1 -2 6 -4 0 -2 -3 6 8]] process 6 bcast matrix [[-3 -9 2 -2 0 -7 6 -5 5 -2]] process 1 bcast matrix [[-4 6 3 5 -9 -6 7 3 0 -1]] process 3 bcast matrix [[ 3 -4 -1 -9 -2 5 4 -8 0 1]] process 5 bcast matrix [[ 6 5 -8 -7 -3 -7 -5 -8 5 8]] process 8 bcast matrix [[-5 7 3 -2 4 -8 3 -5 7 -5]] process 9 bcast matrix [[ 0 4 -9 -1 -5 -2 -1 -5 7 7]] process 7 bcast matrix [[-6 -4 8 5 -6 6 2 -3 -2 4]]
|
gather收集所有分发后的数据,比如,我们需要对scatter分发的所有数据+1,然后再收集回来
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| from mpi4py import MPI import numpy as np
comm = MPI.COMM_WORLD rank = comm.Get_rank() size = comm.Get_size()
recv_data = None
if rank == 0: matrix = np.random.randint(-9, 9, size=(10, 10)) line = np.array_split(matrix, 10) print("process {} prepare random matrix {}".format(rank, matrix)) else: line = None line = comm.scatter(line)
for item in line: item += 1
recv_result = comm.gather(line)
if rank == 0: matrix = np.vstack(recv_result) print("process {} get calculated matrix {}".format(rank, matrix))
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| process 0 prepare random matrix [[-2 4 0 4 -2 -1 7 3 -1 4] [-4 -8 -1 -9 8 7 8 -3 -6 1] [ 1 -8 0 6 -6 -8 -7 2 -9 -1] [ 7 -7 0 -4 -4 0 -2 4 -7 7] [-7 3 0 -8 5 -8 -2 0 7 2] [-6 -9 -3 -8 0 0 0 -7 -4 -5] [ 2 -6 3 -8 8 -8 -2 0 -2 -9] [ 8 8 -8 -8 7 8 4 -3 -9 3] [ 8 -4 4 2 8 -2 2 7 0 -6] [ 3 1 -3 2 -8 -6 -4 -5 3 0]] process 0 get calculated matrix [[-1 5 1 5 -1 0 8 4 0 5] [-3 -7 0 -8 9 8 9 -2 -5 2] [ 2 -7 1 7 -5 -7 -6 3 -8 0] [ 8 -6 1 -3 -3 1 -1 5 -6 8] [-6 4 1 -7 6 -7 -1 1 8 3] [-5 -8 -2 -7 1 1 1 -6 -3 -4] [ 3 -5 4 -7 9 -7 -1 1 -1 -8] [ 9 9 -7 -7 8 9 5 -2 -8 4] [ 9 -3 5 3 9 -1 3 8 1 -5] [ 4 2 -2 3 -7 -5 -3 -4 4 1]]
|
reduce 将收集到的数据进行规约,例如SUM,MAX等
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| from mpi4py import MPI import numpy as np
comm = MPI.COMM_WORLD rank = comm.Get_rank() size = comm.Get_size()
recv_data = None
if rank == 0: matrix = np.random.randint(-9, 9, size=(10, 10)) line = np.array_split(matrix, 10) print("process {} prepare random matrix {}".format(rank, matrix)) else: line = None line = comm.scatter(line) sum_result = comm.reduce(line, op=MPI.SUM)
if rank == 0: print("process {} get calculated sum result {}".format(rank, sum_result))
|
1 2 3 4 5 6 7 8 9 10 11 12 13
| process 0 prepare random matrix [[ 2 -9 -9 -8 -9 -8 -7 8 -6 5] [ 0 6 6 -2 -6 4 -5 -7 3 -4] [-9 2 6 -5 -1 -6 5 -2 3 4] [ 7 4 -6 -5 1 4 -5 5 -1 1] [ 1 2 -8 -7 5 2 -6 -4 5 -8] [-8 0 7 1 0 -8 3 -8 -2 3] [ 6 5 2 -7 -6 -3 -7 4 2 -1] [ 2 -8 8 -1 -3 -9 0 4 -9 -9] [-4 -7 -1 -8 4 7 1 6 4 5] [-2 6 3 3 7 7 -4 1 -7 -3]] process 0 get calculated sum result [[ -5 1 8 -39 -8 -10 -25 7 -8 -7]]
|