HCCL Communication Verification Demo

In aDAG, data between NPUs travels over out-of-band communication, which is much more efficient than copying the data back to the host for transfer. For GPUs, Ray uses the NCCL communication implemented in the cupy library, but no such library exists for HCCL, so it has to be implemented in C++ and exposed through a Python interface.
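For comparison, below is a rough sketch of the cupy-based NCCL pattern (get a unique id on rank 0, share it, build a communicator, then send/recv) that the HCCL binding in this note is modeled on. It is an illustrative two-process, two-GPU example, not Ray's actual channel code; the multiprocessing queue merely stands in for whatever mechanism carries the unique id.

import multiprocessing

import cupy as cp
from cupy.cuda import nccl


def worker(rank, queue):
    cp.cuda.Device(rank).use()
    if rank == 0:
        uid = nccl.get_unique_id()                  # counterpart of HcclGetRootInfo
        queue.put(uid)
    else:
        uid = queue.get()
    comm = nccl.NcclCommunicator(2, uid, rank)      # counterpart of HcclCommInitRootInfo
    stream = cp.cuda.Stream()
    buf = cp.ones(4, cp.float32) if rank == 0 else cp.zeros(4, cp.float32)
    if rank == 0:
        comm.send(buf.data.ptr, buf.size, nccl.NCCL_FLOAT32, 1, stream.ptr)
    else:
        comm.recv(buf.data.ptr, buf.size, nccl.NCCL_FLOAT32, 0, stream.ptr)
    stream.synchronize()
    if rank == 1:
        print("received:", buf)


if __name__ == '__main__':
    q = multiprocessing.Queue()
    procs = [multiprocessing.Process(target=worker, args=(r, q)) for r in (0, 1)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()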

Because resources are isolated between different Ray actors, HCCL cannot see the other actors' cards at initialization time, so we need to verify that this scenario is feasible.
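A minimal sketch of the actor-side view of that isolation, assuming a Ray setup whose Ascend support schedules cards via an "NPU" resource and sets ASCEND_RT_VISIBLE_DEVICES per actor (the same restriction the shell demo below reproduces by hand); it only prints what each actor can see.

import os

import ray


@ray.remote(resources={"NPU": 1})
class NpuActor:
    def visible_devices(self):
        # Inside the actor, only this actor's card should be listed.
        return os.environ.get("ASCEND_RT_VISIBLE_DEVICES")


if __name__ == '__main__':
    ray.init()
    actors = [NpuActor.remote() for _ in range(2)]
    # Expect something like ['0', '1']: each actor sees a single, different card.
    print(ray.get([a.visible_devices.remote() for a in actors]))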

HCCL functionality verification

The demo creates the HCCL group via RootInfo. To keep the implementation simple, a unix socket is used to transmit the RootInfo; the HCCL group is then created and data is transferred.

#include <iostream>
#include "acl/acl.h"
#include "hccl/hccl.h"
#include "hccl/hccl_types.h"
#include <sys/socket.h>
#include <sys/un.h>
#include <unistd.h>
#include <stdio.h>
#include <string.h>
#include <stdlib.h>

#define SOCKET_PATH "/tmp/my_socket"

// Send the root_info
int send_data(const char *data) {
    int sockfd;
    struct sockaddr_un addr;

    sockfd = socket(AF_UNIX, SOCK_STREAM, 0);
    if (sockfd == -1) {
        perror("socket");
        return -1;
    }

    memset(&addr, 0, sizeof(struct sockaddr_un));
    addr.sun_family = AF_UNIX;
    strncpy(addr.sun_path, SOCKET_PATH, sizeof(addr.sun_path) - 1);

    if (connect(sockfd, (struct sockaddr *)&addr, sizeof(struct sockaddr_un)) == -1) {
        perror("connect");
        close(sockfd);
        return -1;
    }

    if (send(sockfd, data, HCCL_ROOT_INFO_BYTES, 0) == -1) {
        perror("send");
        close(sockfd);
        return -1;
    }

    printf("Data sent successfully!\n");

    close(sockfd);
    return 0;
}

// Receive the root_info
int receive_data(char *buffer) {
    int server_fd, client_fd;
    struct sockaddr_un addr;

    server_fd = socket(AF_UNIX, SOCK_STREAM, 0);
    if (server_fd == -1) {
        perror("socket");
        return -1;
    }

    memset(&addr, 0, sizeof(struct sockaddr_un));
    addr.sun_family = AF_UNIX;
    strncpy(addr.sun_path, SOCKET_PATH, sizeof(addr.sun_path) - 1);

    // Remove any stale socket file left over from a previous run
    unlink(SOCKET_PATH);

    if (bind(server_fd, (struct sockaddr *)&addr, sizeof(struct sockaddr_un)) == -1) {
        perror("bind");
        close(server_fd);
        return -1;
    }

    if (listen(server_fd, 1) == -1) {
        perror("listen");
        close(server_fd);
        return -1;
    }

    printf("Waiting for connection...\n");

    client_fd = accept(server_fd, NULL, NULL);
    if (client_fd == -1) {
        perror("accept");
        close(server_fd);
        return -1;
    }

    if (recv(client_fd, buffer, HCCL_ROOT_INFO_BYTES, 0) == -1) {
        perror("recv");
        close(client_fd);
        close(server_fd);
        return -1;
    }

    printf("Data received successfully!\n");

    close(client_fd);
    close(server_fd);
    return 0;
}

#define ACLCHECK(ret) do {\
    if(ret != ACL_SUCCESS)\
    {\
        printf("acl interface return err %s:%d, retcode: %d \n", __FILE__, __LINE__, ret);\
        return ret;\
    }\
} while(0)

#define HCCLCHECK(ret) do {\
    if(ret != HCCL_SUCCESS)\
    {\
        printf("hccl interface return err %s:%d, retcode: %d \n", __FILE__, __LINE__, ret);\
        return ret;\
    }\
} while(0)

int main(int argc, char *argv[]) {
    // rank 0 is the sender, anything else is the receiver
    int rank = 0;
    if (argc < 2){
        printf("Usage: %s <0|1>\n", argv[0]);
        return -1;
    }
    if (strcmp(argv[1], "0") == 0) {
        rank = 0;
    } else {
        rank = 1;
    }

    ACLCHECK(aclInit(NULL));
    // Because of the ASCEND_RT_VISIBLE_DEVICES restriction, each process sees only one card
    ACLCHECK(aclrtSetDevice(0));
    aclrtStream stream;
    ACLCHECK(aclrtCreateStream(&stream));
    HcclRootInfo rootInfo;
    HcclComm hcclComm;
    if (rank == 0) {
        // The sender obtains the root_info and sends it to the receiver
        HCCLCHECK(HcclGetRootInfo(&rootInfo));
        send_data((const char *)&rootInfo);
    } else {
        // The receiver waits for the root_info
        receive_data((char *)&rootInfo);
    }

    // Initialize HCCL
    HCCLCHECK(HcclCommInitRootInfo(2, &rootInfo, rank, &hcclComm));
    printf("HCCL init success\n");

    void* deviceBuff;
    void* hostBuff;
    int count = 32;
    int mallocSize = count * sizeof(char);
    ACLCHECK(aclrtMallocHost((void**)&hostBuff, mallocSize));
    ACLCHECK(aclrtMalloc((void**)&deviceBuff, mallocSize, ACL_MEM_MALLOC_HUGE_FIRST));
    printf("Buffer allocated\n");

    if(rank == 0) {
        // Send data via HCCL
        memcpy(hostBuff, "Hello World\n", 13);
        ACLCHECK(aclrtMemcpy(deviceBuff, mallocSize, hostBuff, mallocSize, ACL_MEMCPY_HOST_TO_DEVICE));
        HCCLCHECK(HcclSend(deviceBuff, count, HCCL_DATA_TYPE_INT8, 1, hcclComm, stream));
        ACLCHECK(aclrtSynchronizeStream(stream));
        printf("HCCL sent\n");
    } else {
        // Receive data via HCCL
        HCCLCHECK(HcclRecv(deviceBuff, count, HCCL_DATA_TYPE_INT8, 0, hcclComm, stream));
        ACLCHECK(aclrtSynchronizeStream(stream));
        ACLCHECK(aclrtMemcpy(hostBuff, mallocSize, deviceBuff, mallocSize, ACL_MEMCPY_DEVICE_TO_HOST));
        printf("HCCL received: %s\n", (char*)hostBuff);
    }

    // Tear down
    HCCLCHECK(HcclCommDestroy(hcclComm));
    ACLCHECK(aclrtResetDevice(0));
    ACLCHECK(aclFinalize());
    return 0;
}

Compile command

g++ -o test test.cpp -I ${ASCEND_HOME_PATH}/include -L ${ASCEND_HOME_PATH}/lib64 -lascendcl -lhccl

How to run

shell 1:

ASCEND_RT_VISIBLE_DEVICES=1 ./test 1

Output:
Waiting for connection...
HCCL init success
Buffer allocated
HCCL received: Hello World

shell 2:

ASCEND_RT_VISIBLE_DEVICES=0 ./test 0

Output:
Data sent successfully!
HCCL init success
Buffer allocated
HCCL sent

Cython implementation

hccl_utils.pyx

cimport cython  # NOQA
from libc.stdint cimport intptr_t

cdef extern from "acl/acl.h":
    ctypedef void *aclrtStream

cdef extern from "hccl/hccl.h":
    ctypedef void *HcclComm
    ctypedef unsigned int __uint32_t
    ctypedef __uint32_t uint32_t
    ctypedef unsigned long int __uint64_t
    ctypedef __uint64_t uint64_t
    ctypedef enum HcclDataType:
        pass
    ctypedef enum HcclResult:
        HCCL_SUCCESS

    cdef enum:
        HCCL_ROOT_INFO_BYTES = 4108
    ctypedef struct HcclRootInfo:
        char internal[HCCL_ROOT_INFO_BYTES]

    HcclResult HcclGetRootInfo(HcclRootInfo *rootInfo) nogil
    HcclResult HcclCommInitRootInfo(uint32_t nRanks, const HcclRootInfo *rootInfo, uint32_t rank, HcclComm *comm) nogil
    HcclResult HcclSend(void* sendBuf, uint64_t count, HcclDataType dataType, uint32_t destRank, HcclComm comm, aclrtStream stream) nogil
    HcclResult HcclRecv(void* recvBuf, uint64_t count, HcclDataType dataType, uint32_t srcRank, HcclComm comm, aclrtStream stream) nogil
    HcclResult HcclCommDestroy(HcclComm comm) nogil
    const char *HcclGetErrorString(HcclResult code) nogil

cdef dict HCCL_ERR_STR = {
    0: 'HCCL_SUCCESS',
    1: 'HCCL_E_PARA',
    2: 'HCCL_E_PTR',
    3: 'HCCL_E_MEMORY',
    4: 'HCCL_E_INTERNAL',
    5: 'HCCL_E_NOT_SUPPORT',
    6: 'HCCL_E_NOT_FOUND',
    7: 'HCCL_E_UNAVAIL',
    8: 'HCCL_E_SYSCALL',
    9: 'HCCL_E_TIMEOUT',
    10: 'HCCL_E_OPEN_FILE_FAILURE',
    11: 'HCCL_E_TCP_CONNECT',
    12: 'HCCL_E_ROCE_CONNECT',
    13: 'HCCL_E_TCP_TRANSFER',
    14: 'HCCL_E_ROCE_TRANSFER',
    15: 'HCCL_E_RUNTIME',
    16: 'HCCL_E_DRV',
    17: 'HCCL_E_PROFILING',
    18: 'HCCL_E_CCE',
    19: 'HCCL_E_NETWORK',
    20: 'HCCL_E_AGAIN',
    21: 'HCCL_E_REMOTE',
    22: 'HCCL_E_SUSPENDING',
    23: 'HCCL_E_RESERVED'
}


class HcclError(RuntimeError):

    def __init__(self, int status):
        self.status = status
        cdef const char* msg
        with nogil:
            msg = HcclGetErrorString(<HcclResult>status)
        super(HcclError, self).__init__(
            '%s: %s' % (HCCL_ERR_STR[status], msg.decode()))

    def __reduce__(self):
        return (type(self), (self.status,))


@cython.profile(False)
cpdef inline check_hccl_status(HcclResult status):
    if status != HCCL_SUCCESS:
        raise HcclError(status)


def get_unique_id():
    cdef HcclRootInfo root_info
    with nogil:
        status = HcclGetRootInfo(&root_info)
    check_hccl_status(status)
    ret = tuple([root_info.internal[i]
                 for i in range(HCCL_ROOT_INFO_BYTES)])
    return ret


cdef class HCCLCommunicator:
    cdef:
        HcclComm _comm

    @property
    def comm(self):
        return <intptr_t>self._comm

    def __cinit__(self):
        self._comm = <HcclComm>0

    def __dealloc__(self):
        if self._comm:
            with nogil:
                status = HcclCommDestroy(self._comm)
            check_hccl_status(status)
            self._comm = <HcclComm>0

    def __init__(self, int ndev, tuple commId, int rank):
        cdef HcclRootInfo _root_info
        assert len(commId) == HCCL_ROOT_INFO_BYTES
        for i in range(HCCL_ROOT_INFO_BYTES):
            _root_info.internal[i] = commId[i]
        with nogil:
            status = HcclCommInitRootInfo(ndev, &_root_info, rank, &self._comm)
        check_hccl_status(status)

    def send(self, intptr_t sendbuf, size_t count, int datatype, int peer, intptr_t stream):
        with nogil:
            status = HcclSend(<void*>sendbuf, count, <HcclDataType>datatype, peer, self._comm, <aclrtStream>stream)
        check_hccl_status(status)

    def recv(self, intptr_t recvbuf, size_t count, int datatype, int peer, intptr_t stream):
        with nogil:
            status = HcclRecv(<void*>recvbuf, count, <HcclDataType>datatype, peer, self._comm, <aclrtStream>stream)
        check_hccl_status(status)


def create_hccl_communicator(world_size, hccl_root_info, rank):
    return HCCLCommunicator(world_size, hccl_root_info, rank)

Build

from distutils.core import setup, Extension
from Cython.Build import cythonize
import os

ascend_home_path = os.getenv('ASCEND_HOME_PATH', '/usr/local/Ascend/ascend-toolkit/latest')

setup(name='Hello world app',
      ext_modules=cythonize(Extension(
          'hccl_utils',
          sources=['hccl_utils.pyx'],
          language='c++',
          include_dirs=[os.path.join(ascend_home_path, 'include')],
          library_dirs=[os.path.join(ascend_home_path, 'lib64')],
          libraries=['ascendcl', 'hccl'],
          extra_compile_args=[],
          extra_link_args=[]
      )))

# python setup.py build_ext --inplace

Verification

import hccl_utils

import multiprocessing
import time
import torch, torch_npu


def sender(queue):
    torch.npu.set_device(0)
    id = hccl_utils.get_unique_id()
    queue.put(id)
    print("sender: Root info sent!")

    comm = hccl_utils.create_hccl_communicator(2, id, 0)
    print("sender: communicator created!")

    stream = torch.npu.Stream()
    t1 = torch.ones(2, 2, dtype=torch.float32).npu()
    print("sender: ready to send!")
    comm.send(t1.data_ptr(), t1.numel(), 4, 1, stream.npu_stream)
    print("sender: data sent!")


def receiver(queue):
    torch.npu.set_device(1)
    id = queue.get()
    print(f"receiver: Received Root info")

    comm = hccl_utils.create_hccl_communicator(2, id, 1)
    print("receiver: communicator created!")

    stream = torch.npu.Stream()
    t1 = torch.zeros(2, 2, dtype=torch.float32).npu()
    print("receiver: ready to recv!")
    comm.recv(t1.data_ptr(), t1.numel(), 4, 0, stream.npu_stream)
    stream.synchronize()
    print(f"receiver: data received: {t1.cpu()}")


if __name__ == '__main__':
    queue = multiprocessing.Queue()

    p1 = multiprocessing.Process(target=sender, args=(queue,))
    p2 = multiprocessing.Process(target=receiver, args=(queue,))

    p1.start()
    p2.start()

    p1.join()
    p2.join()


## (base) hua@hua-docker:~/code/hccl$ python test_hccl_utils.py
## sender: Root info sent!
## receiver: Received Root info
## sender: communicator created!
## receiver: communicator created!
## sender: ready to send!
## receiver: ready to recv!
## sender: data sent!
## receiver: data received: tensor([[1., 1.],
## [1., 1.]])

Verifying creation of multiple groups

Because of limitations in aDAG's current Ascend support, we verify whether multiple groups can be created within the same process, bound to the same card. The code above is modified slightly: first create group 1 and send data, then create group 2 with a rank different from the one used in group 1, and communicate again.

import hccl_utils

import multiprocessing
import time
import torch, torch_npu


def sender(queue):
    # create hccl group 1
    torch.npu.set_device(1)
    id = hccl_utils.get_unique_id()
    queue.put(id)
    print("sender: Root info sent!")
    comm = hccl_utils.create_hccl_communicator(2, id, 0)
    print("sender: communicator created!")
    stream = torch.npu.Stream()
    t1 = torch.ones(2, 2, dtype=torch.float32).npu()
    print("sender: ready to send!")
    comm.send(t1.data_ptr(), t1.numel(), 4, 1, stream.npu_stream)
    print("sender: data sent!")

    # create another group with a different rank id
    print("sender2: create new group")
    id2 = hccl_utils.get_unique_id()
    queue.put(id2)
    print("sender2: Root info sent!")
    comm2 = hccl_utils.create_hccl_communicator(2, id2, 1)
    print("sender2: communicator created!")
    stream2 = torch.npu.Stream()
    t2 = torch.ones(3, 3, dtype=torch.float32).npu()
    print("sender2: ready to send!")
    comm2.send(t2.data_ptr(), t2.numel(), 4, 0, stream2.npu_stream)


def receiver(queue):
    # create hccl group 1
    torch.npu.set_device(0)
    id = queue.get()
    print(f"receiver: Received Root info")
    comm = hccl_utils.create_hccl_communicator(2, id, 1)
    print("receiver: communicator created!")
    stream = torch.npu.Stream()
    t1 = torch.zeros(2, 2, dtype=torch.float32).npu()
    print("receiver: ready to recv!")
    comm.recv(t1.data_ptr(), t1.numel(), 4, 0, stream.npu_stream)
    stream.synchronize()
    print(f"receiver: data received: {t1.cpu()}")

    # create another group with a different rank id
    print(f"receiver2: create new group")
    id2 = queue.get()
    print(f"receiver2: Received Root info")
    comm2 = hccl_utils.create_hccl_communicator(2, id2, 0)
    print("receiver2: communicator created!")
    stream2 = torch.npu.Stream()
    t2 = torch.zeros(3, 3, dtype=torch.float32).npu()
    print("receiver2: ready to recv!")
    comm2.recv(t2.data_ptr(), t2.numel(), 4, 1, stream2.npu_stream)
    stream2.synchronize()
    print(f"receiver2: data received: {t2.cpu()}")


if __name__ == '__main__':
    queue = multiprocessing.Queue()

    p1 = multiprocessing.Process(target=sender, args=(queue,))
    p2 = multiprocessing.Process(target=receiver, args=(queue,))

    p1.start()
    p2.start()

    p1.join()
    p2.join()



## hua@hua-docker:~/code/hccl$ python test_hccl_utils1.py
## sender: Root info sent!
## receiver: Received Root info
## sender: communicator created!
## sender: ready to send!
## receiver: communicator created!
## receiver: ready to recv!
## sender: data sent!
## sender2: create new group
## sender2: Root info sent!
## receiver: data received: tensor([[1., 1.],
## [1., 1.]])
## receiver2: create new group
## receiver2: Received Root info
## sender2: communicator created!
## sender2: ready to send!
## receiver2: communicator created!
## receiver2: ready to recv!
## receiver2: data received: tensor([[1., 1., 1.],
## [1., 1., 1.],
## [1., 1., 1.]])
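
Putting it together with Ray

With the binding and multi-group creation verified, the remaining step is to drive hccl_utils from Ray actors, where the RootInfo can travel through Ray's object store instead of a unix socket or a multiprocessing queue. The sketch below is only an outline under the assumptions already mentioned (each actor is pinned to one card via an "NPU" resource and therefore calls set_device(0) on the only card it sees); the actor and method names are illustrative and are not part of the verified demos above.

import ray
import torch
import torch_npu
import hccl_utils


@ray.remote(resources={"NPU": 1})
class HcclActor:
    def __init__(self):
        # Each actor sees exactly one card, so index 0 is always "its" card.
        torch.npu.set_device(0)
        self.stream = torch.npu.Stream()
        self.comm = None

    def get_root_info(self):
        # Counterpart of HcclGetRootInfo; the tuple is picklable and can
        # travel through Ray's object store.
        return hccl_utils.get_unique_id()

    def init_comm(self, root_info, world_size, rank):
        self.comm = hccl_utils.create_hccl_communicator(world_size, root_info, rank)

    def send_ones(self, peer):
        t = torch.ones(2, 2, dtype=torch.float32).npu()
        # 4 = HCCL_DATA_TYPE_FP32, as in the tests above
        self.comm.send(t.data_ptr(), t.numel(), 4, peer, self.stream.npu_stream)
        self.stream.synchronize()

    def recv_from(self, peer):
        t = torch.zeros(2, 2, dtype=torch.float32).npu()
        self.comm.recv(t.data_ptr(), t.numel(), 4, peer, self.stream.npu_stream)
        self.stream.synchronize()
        return t.cpu()


if __name__ == '__main__':
    ray.init()
    a0, a1 = HcclActor.remote(), HcclActor.remote()
    root_info = ray.get(a0.get_root_info.remote())
    # Both ranks must call HcclCommInitRootInfo concurrently, so launch both
    # remote calls before waiting on either.
    ray.get([a0.init_comm.remote(root_info, 2, 0),
             a1.init_comm.remote(root_info, 2, 1)])
    send_ref = a0.send_ones.remote(1)
    recv_ref = a1.recv_from.remote(0)
    print(ray.get(recv_ref))
    ray.get(send_ref)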