在aDAG中,NPU之间的数据走带外通信,相比于数据拷贝会Host在传输的方式,效率会高很多。Ray中GPU使用cupy的库中实现的NCCL通信,但是HCCL没有这样的库,需要用C++实现,并提供Python接口。

由于Ray的不同Actor之间的资源是隔离的,所以HCCL初始化的时候,并不能看到其他Actor的卡,所以需要验证这种场景下的可行性。

HCCL功能验证

Demo使用RootInfo的方式创建HCCL group,为了实现简单,这里使用unix socket来传输RootInfo,然后创建HCCL group,再传输数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
#include <iostream>
#include "acl/acl.h"
#include "hccl/hccl.h"
#include "hccl/hccl_types.h"
#include <sys/socket.h>
#include <sys/un.h>
#include <unistd.h>
#include <stdio.h>
#include <string.h>
#include <stdlib.h>

#define SOCKET_PATH "/tmp/my_socket"

// 发送root_info
int send_data(const char *data) {
int sockfd;
struct sockaddr_un addr;

sockfd = socket(AF_UNIX, SOCK_STREAM, 0);
if (sockfd == -1) {
perror("socket");
return -1;
}

memset(&addr, 0, sizeof(struct sockaddr_un));
addr.sun_family = AF_UNIX;
strncpy(addr.sun_path, SOCKET_PATH, sizeof(addr.sun_path) - 1);

if (connect(sockfd, (struct sockaddr *)&addr, sizeof(struct sockaddr_un)) == -1) {
perror("connect");
close(sockfd);
return -1;
}

if (send(sockfd, data, HCCL_ROOT_INFO_BYTES, 0) == -1) {
perror("send");
close(sockfd);
return -1;
}

printf("Data sent successfully!\n");

close(sockfd);
return 0;
}

// 接受root_info
int receive_data(char *buffer) {
int server_fd, client_fd;
struct sockaddr_un addr;

server_fd = socket(AF_UNIX, SOCK_STREAM, 0);
if (server_fd == -1) {
perror("socket");
return -1;
}

memset(&addr, 0, sizeof(struct sockaddr_un));
addr.sun_family = AF_UNIX;
strncpy(addr.sun_path, SOCKET_PATH, sizeof(addr.sun_path) - 1);

if (bind(server_fd, (struct sockaddr *)&addr, sizeof(struct sockaddr_un)) == -1) {
perror("bind");
close(server_fd);
return -1;
}

if (listen(server_fd, 1) == -1) {
perror("listen");
close(server_fd);
return -1;
}

printf("Waiting for connection...\n");

client_fd = accept(server_fd, NULL, NULL);
if (client_fd == -1) {
perror("accept");
close(server_fd);
return -1;
}

if (recv(client_fd, buffer, HCCL_ROOT_INFO_BYTES, 0) == -1) {
perror("recv");
close(client_fd);
close(server_fd);
return -1;
}

printf("Data received successfully!\n");

close(client_fd);
close(server_fd);
return 0;
}

#define ACLCHECK(ret) do {\
if(ret != ACL_SUCCESS)\
{\
printf("acl interface return err %s:%d, retcode: %d \n", __FILE__, __LINE__, ret);\
return ret;\
}\
} while(0)

#define HCCLCHECK(ret) do {\
if(ret != HCCL_SUCCESS)\
{\
printf("hccl interface return errreturn err %s:%d, retcode: %d \n", __FILE__, __LINE__, ret);\
return ret;\
}\
} while(0)

int main(int argc, char *argv[]) {
// rank=0是发送者,否则是接收者
int rank = 0;
if (argc < 2){
printf("Usage: %s <0|1>\n", argv[0]);
return -1;
}
if (strcmp(argv[1], "0") == 0) {
rank = 0;
} else {
rank = 1;
}

ACLCHECK(aclInit(NULL));
// 由于ASCEND_RT_VISIBLE_DEVICES的限制,每个进程只能看到一张卡
ACLCHECK(aclrtSetDevice(0));
aclrtStream stream;
ACLCHECK(aclrtCreateStream(&stream));
HcclRootInfo rootInfo;
HcclComm hcclComm;
if (rank == 0) {
// 发送者获取root_info,并发送给接收者
HCCLCHECK(HcclGetRootInfo(&rootInfo));
send_data((const char *)&rootInfo);
} else {
// 接收者接受root_info
receive_data((char *)&rootInfo);
}

// 初始化HCCL
HCCLCHECK(HcclCommInitRootInfo(2, &rootInfo, rank, &hcclComm));
printf("HCCL init success\n");

void* deviceBuff;
void* hostBuff;
int count = 32;
int mallocSize = count * sizeof(char);
ACLCHECK(aclrtMallocHost((void**)&hostBuff, mallocSize));
ACLCHECK(aclrtMalloc((void**)&deviceBuff, mallocSize, ACL_MEM_MALLOC_HUGE_FIRST));
printf("Buffer alloced\n");

if(rank == 0) {
// 使用HCCL发送数据
memcpy(hostBuff, "Hello World\n", 13);
ACLCHECK(aclrtMemcpy(deviceBuff, mallocSize, hostBuff, mallocSize, ACL_MEMCPY_HOST_TO_DEVICE));
HCCLCHECK(HcclSend(deviceBuff, count, HCCL_DATA_TYPE_INT8, 1, hcclComm, stream));
ACLCHECK(aclrtSynchronizeStream(stream));
printf("HCCL sent\n");
} else {
// 使用HCCL接收数据
HCCLCHECK(HcclRecv(deviceBuff, count, HCCL_DATA_TYPE_INT8, 0, hcclComm, stream));
ACLCHECK(aclrtSynchronizeStream(stream));
ACLCHECK(aclrtMemcpy(hostBuff, mallocSize, deviceBuff, mallocSize, ACL_MEMCPY_DEVICE_TO_HOST));
printf("HCCL recived: %s\n", (char*)hostBuff);
}

// 去初始化
HCCLCHECK(HcclCommDestroy(hcclComm));
ACLCHECK(aclrtResetDevice(0));
ACLCHECK(aclFinalize());
return 0;
}

编译命令

1
g++ -o test test.cpp -I ${ASCEND_HOME_PATH}/include -L ${ASCEND_HOME_PATH}/lib64 -lascendcl -lhccl

执行方法

shell 1:

1
2
3
4
5
6
7
ASCEND_RT_VISIBLE_DEVICES=1 ./test 1

输出:
Waiting for connection...
HCCL init success
Buffer alloced
HCCL recived: Hello World

shell 2:

1
2
3
4
5
6
7
ASCEND_RT_VISIBLE_DEVICES=0 ./test 0

输出:
Data sent successfully!
HCCL init success
Buffer alloced
HCCL sent

cython实现

hccl_utils.pyx

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
cimport cython  # NOQA
from libc.stdint cimport intptr_t

cdef extern from "acl/acl.h":
ctypedef void *aclrtStream

cdef extern from "hccl/hccl.h":
ctypedef void *HcclComm
ctypedef unsigned int __uint32_t
ctypedef __uint32_t uint32_t
ctypedef unsigned long int __uint64_t
ctypedef __uint64_t uint64_t
ctypedef enum HcclDataType:
pass
ctypedef enum HcclResult:
HCCL_SUCCESS

cdef enum:
HCCL_ROOT_INFO_BYTES = 4108
ctypedef struct HcclRootInfo:
char internal[HCCL_ROOT_INFO_BYTES]

HcclResult HcclGetRootInfo(HcclRootInfo *rootInfo) nogil
HcclResult HcclCommInitRootInfo(uint32_t nRanks, const HcclRootInfo *rootInfo, uint32_t rank, HcclComm *comm) nogil
HcclResult HcclSend(void* sendBuf, uint64_t count, HcclDataType dataType, uint32_t destRank, HcclComm comm, aclrtStream stream) nogil
HcclResult HcclRecv(void* recvBuf, uint64_t count, HcclDataType dataType, uint32_t srcRank, HcclComm comm, aclrtStream stream) nogil
HcclResult HcclCommDestroy(HcclComm comm) nogil
const char *HcclGetErrorString(HcclResult code) nogil

cdef dict HCCL_ERR_STR = {
0 : 'HCCL_SUCCESS',
1 : 'HCCL_E_PARA',
2 : 'HCCL_E_PTR',
3 : 'HCCL_E_MEMORY',
4 : 'HCCL_E_INTERNAL',
5 : 'HCCL_E_NOT_SUPPORT',
6 : 'HCCL_E_NOT_FOUND',
7 : 'HCCL_E_UNAVAIL',
8 : 'HCCL_E_SYSCALL',
9 : 'HCCL_E_TIMEOUT',
10 : 'HCCL_E_OPEN_FILE_FAILURE',
11 : 'HCCL_E_TCP_CONNECT',
12 : 'HCCL_E_ROCE_CONNECT',
13 : 'HCCL_E_TCP_TRANSFER',
14 : 'HCCL_E_ROCE_TRANSFER',
15 : 'HCCL_E_RUNTIME',
16 : 'HCCL_E_DRV',
17 : 'HCCL_E_PROFILING',
18 : 'HCCL_E_CCE',
19 : 'HCCL_E_NETWORK',
20 : 'HCCL_E_AGAIN',
21 : 'HCCL_E_REMOTE',
22 : 'HCCL_E_SUSPENDING',
23 : 'HCCL_E_RESERVED'
}

class HcclError(RuntimeError):
def __init__(self, int status):
self.status = status
cdef const char* msg
with nogil:
msg = HcclGetErrorString(<HcclResult>status)
super(HcclError, self).__init__(
'%s: %s' % (HCCL_ERR_STR[status], msg.decode()))

def __reduce__(self):
return (type(self), (self.status,))


@cython.profile(False)
cpdef inline check_hccl_status(HcclResult status):
if status != HCCL_SUCCESS:
raise HcclError(status)


def get_unique_id():
cdef HcclRootInfo root_info
with nogil:
status = HcclGetRootInfo(&root_info)
check_hccl_status(status)
ret = tuple([root_info.internal[i]
for i in range(HCCL_ROOT_INFO_BYTES)])
return ret


cdef class HCCLCommunicator:
cdef:
HcclComm _comm

@property
def comm(self):
return <intptr_t>self._comm

def __cinit__(self):
self._comm = <HcclComm>0

def __dealloc__(self):
if self._comm:
with nogil:
status = HcclCommDestroy(self._comm)
check_hccl_status(status)
self._comm = <HcclComm>0

def __init__(self, int ndev, tuple commId, int rank):
cdef HcclRootInfo _root_info
assert len(commId) == HCCL_ROOT_INFO_BYTES
for i in range(HCCL_ROOT_INFO_BYTES):
_root_info.internal[i] = commId[i]
with nogil:
status = HcclCommInitRootInfo(ndev, &_root_info, rank, &self._comm)
check_hccl_status(status)

def send(self, intptr_t sendbuf, size_t count, int datatype, int peer, intptr_t stream):
with nogil:
status = HcclSend(<void*>sendbuf, count, <HcclDataType>datatype, peer, self._comm, <aclrtStream>stream)
check_hccl_status(status)

def recv(self, intptr_t recvbuf, size_t count, int datatype, int peer, intptr_t stream):
with nogil:
status = HcclRecv(<void*>recvbuf, count, <HcclDataType>datatype, peer, self._comm, <aclrtStream>stream)
check_hccl_status(status)



def create_hccl_communicator(world_size, hccl_root_info, rank):
return HCCLCommunicator(world_size, hccl_root_info, rank)

编译

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
from distutils.core import setup, Extension
from Cython.Build import cythonize
import os

ascend_home_path = os.getenv('ASCEND_HOME_PATH', '/usr/local/Ascend/ascend-toolkit/latest')

setup(name='Hello world app',
ext_modules=cythonize(Extension(
'hccl_utils',
sources = ['hccl_utils.pyx'],
language='c++',
include_dirs=[os.path.join(ascend_home_path, 'include')],
library_dirs=[os.path.join(ascend_home_path, 'lib64')],
libraries=['ascendcl','hccl'],
extra_compile_args=[],
extra_link_args=[]
)))

# python setup.py build_ext --inplace

验证

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
import hccl_utils

import multiprocessing
import time
import torch, torch_npu

def sender(queue):
torch.npu.set_device(0)
id = hccl_utils.get_unique_id()
queue.put(id)
print("sender: Root info sent!")

comm = hccl_utils.create_hccl_communicator(2, id, 0)
print("sender: communicator created!")

stream = torch.npu.Stream()
t1 = torch.ones(2, 2, dtype=torch.float32).npu()
print("sender: ready to send!")
comm.send(t1.data_ptr(), t1.numel(), 4, 1, stream.npu_stream)
print("sender: data sent!")


def receiver(queue):
torch.npu.set_device(1)
id = queue.get()
print(f"receiver: Received Root info")

comm = hccl_utils.create_hccl_communicator(2, id, 1)
print("receiver: communicator created!")

stream = torch.npu.Stream()
t1 = torch.zeros(2, 2, dtype=torch.float32).npu()
print("receiver: ready to recv!")
comm.recv(t1.data_ptr(), t1.numel(), 4, 0, stream.npu_stream)
stream.synchronize()
print(f"receiver: data received: {t1.cpu()}")



if __name__ == '__main__':
queue = multiprocessing.Queue()

p1 = multiprocessing.Process(target=sender, args=(queue,))
p2 = multiprocessing.Process(target=receiver, args=(queue,))

p1.start()
p2.start()

p1.join()
p2.join()


## (base) hua@hua-docker:~/code/hccl$ python test_hccl_utils.py
## sender: Root info sent!
## receiver: Received Root info
## sender: communicator created!
## receiver: communicator created!
## sender: ready to send!
## receiver: ready to recv!
## sender: data sent!
## receiver: data received: tensor([[1., 1.],
## [1., 1.]])

验证创建多个group

由于aDAG当前昇腾支持的问题,验证在同一个进程,绑定的同一张卡上创建多个group的情况是否可行,将上面的代码简单修改,先创建group1,然后发送数据,接着再创建group2,但是使用与group1不同的rank,再次通信。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
import hccl_utils

import multiprocessing
import time
import torch, torch_npu

def sender(queue):
# create hccl group 1
torch.npu.set_device(1)
id = hccl_utils.get_unique_id()
queue.put(id)
print("sender: Root info sent!")
comm = hccl_utils.create_hccl_communicator(2, id, 0)
print("sender: communicator created!")
stream = torch.npu.Stream()
t1 = torch.ones(2, 2, dtype=torch.float32).npu()
print("sender: ready to send!")
comm.send(t1.data_ptr(), t1.numel(), 4, 1, stream.npu_stream)
print("sender: data sent!")

# create another group with different rank id
print("sender2: create new group")
id2 = hccl_utils.get_unique_id()
queue.put(id2)
print("sender2: Root info sent!")
comm2 = hccl_utils.create_hccl_communicator(2, id2, 1)
print("sender2: communicator created!")
stream2 = torch.npu.Stream()
t2 = torch.ones(3, 3, dtype=torch.float32).npu()
print("sender2: ready to send!")
comm2.send(t2.data_ptr(), t2.numel(), 4, 0, stream2.npu_stream)



def receiver(queue):
# create hccl group 1
torch.npu.set_device(0)
id = queue.get()
print(f"receiver: Received Root info")
comm = hccl_utils.create_hccl_communicator(2, id, 1)
print("receiver: communicator created!")
stream = torch.npu.Stream()
t1 = torch.zeros(2, 2, dtype=torch.float32).npu()
print("receiver: ready to recv!")
comm.recv(t1.data_ptr(), t1.numel(), 4, 0, stream.npu_stream)
stream.synchronize()
print(f"receiver: data received: {t1.cpu()}")

# create another group with different rank id
print(f"receiver2: create new group")
id2 = queue.get()
print(f"receiver2: Received Root info")
comm2 = hccl_utils.create_hccl_communicator(2, id2, 0)
print("receiver2: communicator created!")
stream2 = torch.npu.Stream()
t2 = torch.zeros(3, 3, dtype=torch.float32).npu()
print("receiver2: ready to recv!")
comm2.recv(t2.data_ptr(), t2.numel(), 4, 1, stream2.npu_stream)
stream2.synchronize()
print(f"receiver2: data received: {t2.cpu()}")



if __name__ == '__main__':
queue = multiprocessing.Queue()

p1 = multiprocessing.Process(target=sender, args=(queue,))
p2 = multiprocessing.Process(target=receiver, args=(queue,))

p1.start()
p2.start()

p1.join()
p2.join()



## hua@hua-docker:~/code/hccl$ python test_hccl_utils1.py
## sender: Root info sent!
## receiver: Received Root info
## sender: communicator created!
## sender: ready to send!
## receiver: communicator created!
## receiver: ready to recv!
## sender: data sent!
## sender2: create new group
## sender2: Root info sent!
## receiver: data received: tensor([[1., 1.],
## [1., 1.]])
## receiver2: create new group
## receiver2: Received Root info
## sender2: communicator created!
## sender2: ready to send!
## receiver2: communicator created!
## receiver2: ready to recv!
## receiver2: data received: tensor([[1., 1., 1.],
## [1., 1., 1.],
## [1., 1., 1.]])

节点间通信

Communucator是不同actor之间数据传输的工具,目前支持CPU和GPU两种Communucator。

PlantUML diagram

Communicator主要记录了分布式计算的相关信息,例如,world_sizerank等,并实现了all_reduce接口。CPU通过共享内存进行通信(跨节点怎么通信?),而GPU使用nccl进行卡间带外通信。

Communucator核心功能提供了sendrecvallreduce等通信操作。CPU通信中由于使用的是共享内存,所以不需要实现sendrecvall_reduce使用了CPUCommBarrier来等待所有actor完成计算并执行op,这里使用了barrier远程对象,该对象会被多个actor调用传递数据,并且计算完成后,每个actor都能拿到规约后的数据。GPU使用nccl库完成上述操作。

数据传输通道

Channel是多个actor之间的数据传输通道,每个channel有一个writer,以及多个reader。

PlantUML diagram

write表示将数据写入到channel中,read表示从channel中读取数据,这是同步操作的,如果写入时还有数据未读,或者读取时没有数据可读,这两个方法会block等待,可以通过timeout控制block时间。

CachedChannel

CachedChannel表示一次写入需要被读取多次的Channel,它可以接受一个内部的ChannelInterface,它作为一个warpper来缓存数据供reader多次读取。如果没有内部ChannelInterface,它会将write的数据写入到ChannelContext(单例)中,然后供reader多次读取。

ChannelContext的serialization_context使用了一个字典存储写入的数据,当读取次数达到设定后,会将数据从字典中清除。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
def write(self, value: Any, timeout: Optional[float] = None):
# TODO: better organize the imports
from ray.experimental.channel import ChannelContext

if self._inner_channel is not None:
self._inner_channel.write(value, timeout) # 如果有内部channel,直接写到内部channel中。
return

# Otherwise no need to check timeout as the operation is non-blocking.

# Because both the reader and writer are in the same worker process,
# we can directly store the data in the context instead of storing
# it in the channel object. This removes the serialization overhead of `value`.
ctx = ChannelContext.get_current().serialization_context
ctx.set_data(self._channel_id, value, self._num_reads)

def read(self, timeout: Optional[float] = None) -> Any:
# TODO: better organize the imports
from ray.experimental.channel import ChannelContext

ctx = ChannelContext.get_current().serialization_context
if ctx.has_data(self._channel_id):
# No need to check timeout as the operation is non-blocking.
return ctx.get_data(self._channel_id) # 如果缓存里有数据,从缓存读。

assert (
self._inner_channel is not None
), "Cannot read from the serialization context while inner channel is None."
value = self._inner_channel.read(timeout) # 如果缓存里没有,就从内部channel中读,然后写到缓存供后续reader读取。
ctx.set_data(self._channel_id, value, self._num_reads)
return ctx.get_data(self._channel_id)

IntraProcessChannel

如果两个task在同一个worker进程中执行(一个worker可以同时执行多个task么?顺序执行么?),那么直接使用serialization_context来进行数据的传递(不需要加锁么?不是多线程,先后执行的么?)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
def write(self, value: Any, timeout: Optional[float] = None):
# No need to check timeout as the operation is non-blocking.

# Because both the reader and writer are in the same worker process,
# we can directly store the data in the context instead of storing
# it in the channel object. This removes the serialization overhead of `value`.
ctx = ChannelContext.get_current().serialization_context
ctx.set_data(self._channel_id, value, self._num_readers)

def read(self, timeout: Optional[float] = None, deserialize: bool = True) -> Any:
assert deserialize, "Data passed from the actor to itself is never serialized"
# No need to check timeout as the operation is non-blocking.
ctx = ChannelContext.get_current().serialization_context
return ctx.get_data(self._channel_id)

Channel

Channel可以提供节点内或者节点之间的数据传输。Channel是对ray.ObjectRef的一个封装,使用plasma共享存储进行数据的传输。

  • 在channel创建时,write角色的actor会使用global worker执行put_object写入一个全0的buffer;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
worker = ray._private.worker.global_worker
worker.check_connected()

value = b"0" * buffer_size_bytes

try:
object_ref = worker.put_object(
value, owner_address=None, _is_experimental_channel=True
)
except ray.exceptions.ObjectStoreFullError:
logger.info(
"Put failed since the value was either too large or the "
"store was full of pinned objects."
)
raise
return object_ref
  • 然后对所有reader中与writer不是同一节点的节点上创建一个对象的引用,每个节点只需要创建一个即可,这样当前节点上的所有reader都可以获取到这个数据(为什么不直接把writer_ref传过去,而是每个reader node上创建一个引用,并且这个引用需要拷贝?)。
1
2
3
4
5
6
7
8
9
10
11
12
# Find 1 reader in a remote node to create a reference that's
# shared by all readers. When a new value is written to a reference,
# it is sent to this reference.
reader = readers[0]
fn = reader.__ray_call__
self._node_id_to_reader_ref_info[node_id] = ReaderRefInfo(
reader_ref=ray.get(
fn.remote(_create_channel_ref, buffer_size_bytes)
),
ref_owner_actor_id=reader._actor_id,
num_reader_actors=len(readers),
)
  • 向channel写数据时,先把数据序列化,然后直接写入channel,这部分调用的是core_woker中的c++代码。写入的数据还有可能是resize_buffer的消息。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
if not isinstance(value, SerializedObject):
try:
serialized_value = self._worker.get_serialization_context().serialize(
value
)
except TypeError as e:
sio = io.StringIO()
ray.util.inspect_serializability(value, print_file=sio)
msg = (
"Could not serialize the put value "
f"{repr(value)}:\n"
f"{sio.getvalue()}"
)
raise TypeError(msg) from e
else:
serialized_value = value

start_time = time.monotonic()
self._resize_channel_if_needed(serialized_value, timeout_ms)
if timeout is not None:
timeout_ms -= int((time.monotonic() - start_time) * 1000)
timeout_ms = max(timeout_ms, 0)

self._worker.core_worker.experimental_channel_put_serialized(
serialized_value,
self._writer_ref,
self._num_local_readers,
timeout_ms,
)
  • 读取信息时,如果是resize_buffer消息,则会重新注册reader,然后再读取信息。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
ret = self._worker.get_objects(
[self._local_reader_ref], timeout=timeout, return_exceptions=True
)[0][0]

if isinstance(ret, _ResizeChannel):
self._node_id_to_reader_ref_info = ret._node_id_to_reader_ref_info
self._local_reader_ref = self._get_local_reader_ref(
self._node_id_to_reader_ref_info
)
# We need to register the new reader_ref.
self._reader_registered = False
self.ensure_registered_as_reader()
if timeout is not None:
timeout -= time.monotonic() - start_time
timeout = max(timeout, 0)
ret = self._worker.get_objects(
[self._local_reader_ref], timeout=timeout, return_exceptions=True
)[0][0]

BufferedSharedMemoryChannel

这是对上面Channel的一个封装,BufferedSharedMemoryChannel允许创建多个buffer,循环写入和读取。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
def write(self, value: Any, timeout: Optional[float] = None) -> None:
"""Write a value to a channel.

If the next buffer is available, it returns immediately. If the next
buffer is not read by downstream consumers, it blocks until a buffer is
available to write. If a buffer is not available within timeout, it raises
RayChannelTimeoutError.
"""
# A single channel is not supposed to read and write at the same time.
assert self._next_read_index == 0
self._buffers[self._next_write_index].write(value, timeout)
self._next_write_index += 1
self._next_write_index %= self._num_shm_buffers

def read(self, timeout: Optional[float] = None) -> Any:
"""Read a value from a channel.

If the next buffer is available, it returns immediately. If the next
buffer is not written by an upstream producer, it blocks until a buffer is
available to read. If a buffer is not available within timeout, it raises
RayChannelTimeoutError.
"""
# A single channel is not supposed to read and write at the same time.
assert self._next_write_index == 0
output = self._buffers[self._next_read_index].read(timeout)
self._next_read_index += 1
self._next_read_index %= self._num_shm_buffers
return output

CompositeChannel

CompositeChannel允许一个writer将数据写入多个channel,当reader和writer是同一个actor时,创建一个IntraProcessChannel,如果不是,则创建一个BufferedSharedMemoryChannel。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
(
remote_reader_and_node_list,
local_reader_and_node_list,
) = utils.split_readers_by_locality(self._writer, self._reader_and_node_list)
# There are some local readers which are the same worker process as the writer.
# Create a local channel for the writer and the local readers.
num_local_readers = len(local_reader_and_node_list)
if num_local_readers > 0:
# Use num_readers = 1 when creating the local channel,
# because we have channel cache to support reading
# from the same channel multiple times.
local_channel = IntraProcessChannel(num_readers=1)
self._channels.add(local_channel)
actor_id = self._get_actor_id(self._writer)
self._channel_dict[actor_id] = local_channel
# There are some remote readers which are not the same Ray actor as the writer.
# Create a shared memory channel for the writer and the remote readers.
if len(remote_reader_and_node_list) != 0:
remote_channel = BufferedSharedMemoryChannel(
self._writer, remote_reader_and_node_list, num_shm_buffers
)
self._channels.add(remote_channel)

for reader, _ in remote_reader_and_node_list:
actor_id = self._get_actor_id(reader)
self._channel_dict[actor_id] = remote_channel

写入数据时,循环将数据写入所有的channel,读取时,仅需要读取当前reader actorid对应的channel。

(一个channel对象好像被write和reader一起持有?)

1
2
3
4
5
6
7
8
def write(self, value: Any, timeout: Optional[float] = None) -> None:
self.ensure_registered_as_writer()
for channel in self._channels:
channel.write(value, timeout)

def read(self, timeout: Optional[float] = None) -> Any:
self.ensure_registered_as_reader()
return self._channel_dict[self._resolve_actor_id()].read(timeout)

_TorchTensorNcclChannel

用户传输torch.tensor类型的数据,不能包含其他CPU传输的数据。使用该Channel之前,首先需要调用_init_communicator初始化nccl_group,然后根据writer和reader设置nccl_group的writer和reader对应的rank。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
self._writer_rank = self._nccl_group.get_rank(self._writer)
self._reader_ranks = [
self._nccl_group.get_rank(reader)
for reader, _ in self._reader_and_node_list
]

if (
self._writer_rank is not None
and self._writer_rank == self._nccl_group.get_self_rank()
):
self._writer_registered = True

if (
self._reader_ranks
and self._nccl_group.get_self_rank() in self._reader_ranks
):
self._reader_registered = True

如果没有指定元数据的Channel(shape和dtype),那么还会初始化一个meta_channel来通过CPU传输元数据。

1
2
3
4
5
6
7
8
9
10
if self._meta_channel is None and self._writer_registered:
# We are the writer. Therefore, we also need to allocate a metadata
# channel that will be used to send the shape and dtype of the
# tensor to the receiver(s).
metadata_type = SharedMemoryType()
self._meta_channel = metadata_type.create_channel(
self._writer,
self._reader_and_node_list,
None,
)

在发送数据时,需要先处理元数据,遍历所有tensor,获取他们的shape和dtype,写入到metadata数组中。如果设置了static_tensor,还会做元数据校验,如果出现了元数据不匹配会报错。

接受数据时,会先读取meta_channel,先获取到元数据信息,然后读取tensor数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
def write(
self,
tensors: List["torch.Tensor"],
timeout: Optional[float] = None,
):

import torch

for tensor in tensors:
assert isinstance(
tensor, torch.Tensor
), f"{tensor} must be instance of torch.Tensor"

metadata = self._get_send_tensors_metadata(tensors) # 先发送元数据
if metadata is not None:
self._meta_channel.write(metadata)

for tensor in tensors:
# TODO: If there are multiple readers, can replace with a
# broadcast.
for rank in self._reader_ranks:
self._nccl_group.send(tensor, rank) # 发送tensor数据
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
def read(
self,
timeout: Optional[float] = None,
) -> Union["torch.Tensor", List["torch.Tensor"]]:
meta_list: List[_TorchTensorMetadata] = self._get_recv_tensors_metadata(timeout) # 先接受所有元数据

bufs: List["torch.Tensor"] = []
for meta in meta_list:
buf = self._nccl_group.recv(
meta.shape, meta.dtype, self._writer_rank, _torch_zeros_allocator # 在接收每个tensor
)
bufs.append(buf)
# TODO: Sync CUDA stream after receiving all tensors, instead of after
# each tensor.
return bufs

TorchTensorNcclChannel

该Channel可以发送包含torch.tensor的混合数据,其包含cpu_data_channel,gpu_data_channel:(_TorchTensorNcclChannel),发送前会先对value进行序列化,序列化会将torch.tensor对象记录下来,在value中记录一个pleaceholder,然后将其他数据完成序列化。Tensor数据不会进行序列化,直接通过nccl发送。如果typ标记为_direct_return,那么说明发送的数据仅有torch.tensor,可以不调用value的序列化以提高执行效率。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
try:
# Serialize the data. All tensors that match our current device
# will be extracted into the serialization context and replaced
# with a placeholder.
cpu_data = self._worker.get_serialization_context().serialize(value)
except TypeError as e:
sio = io.StringIO()
ray.util.inspect_serializability(value, print_file=sio)
msg = (
"Could not serialize the put value "
f"{repr(value)}:\n"
f"{sio.getvalue()}"
)
raise TypeError(msg) from e
finally:
# Pop the tensors that were found during serialization of `value`.
gpu_tensors, _ = self.serialization_ctx.reset_out_of_band_tensors([])
# Reset the serialization method to now serialize torch.Tensors
# normally.
self.serialization_ctx.set_use_external_transport(False)

# First send the extracted tensors through a GPU-specific channel.
self._gpu_data_channel.write(gpu_tensors)
# Next send the non-tensor data through a CPU-specific channel. The
# data contains placeholders for the extracted tensors.
self._cpu_data_channel.write(cpu_data)

Channel读取时,先读取tensor数据,然后读取cpu数据,这个过程会将placeholder替换回Tensor,然后返回数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
 self.serialization_ctx.reset_out_of_band_tensors(tensors)

# Next, read and deserialize the non-tensor data. The registered custom
# deserializer will replace the found tensor placeholders with
# `tensors`.
data = self._cpu_data_channel.read(
timeout=timeout,
)
# Check that all placeholders had a corresponding tensor.
(
_,
deserialized_tensor_placeholders,
) = self.serialization_ctx.reset_out_of_band_tensors([])
assert deserialized_tensor_placeholders == set(range(len(tensors)))

return data

ChannelOutputType

Channel的创建,一般是根据DAGNode的输出类型来创建,此时一般创建一个ChannelOutputType的结构,然后调用其create_channel方法来创建channel。

该类型表示当前Channel的输出类型,目前有两个子类SharedMemoryTypeTorchTensorType。其中SharedMemoryType比较简单,仅提供一个create_channel方法,该方法会创建一个CompositeChannel类型的channel。

TorchTensorType还需要注册自定义序列化和反序列化函数,如果使用nccl通信,torch.tensor类型的对象会直接通过nccl发送,并且在原数据的序列化中记录一个placeholder。如果没有nccl,那么会转成numpy对象,然后进行序列化通过CPU发送,这将带来额外的开销。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
def register_custom_serializer(self) -> None:
super().register_custom_serializer()

import torch

def serialize(t):
ctx = ChannelContext.get_current()
return ctx.serialization_context.serialize_tensor(t)

def deserialize(b):
ctx = ChannelContext.get_current()
return ctx.serialization_context.deserialize_tensor(b)

ray.util.serialization.register_serializer(
torch.Tensor,
serializer=serialize,
deserializer=deserialize,
)

Reader/WriterInterface

Reader/WriterInterface是从actor的角度整合相关Channel,也就是某个actor可能会读取多个输入,并且将输出写入到多个channel中。这两个Interface分别有两个子类,同步和异步的Interface。同步Interface较简单,从channel list中读取所有的数据即可。异步的Interface会使用异步(asyncio)来等待数据。

生成执行计划

DAGOperationGraph是更细粒度的任务图,构建图能够理清任务之间的依赖关系,便于生成每个actor的schedule。

DAGOperationGraphNode

DAGNodeOperationNode一共有三种类型,分别是READ,COMPUTEWRITE。DAG计算图经过编译后,会生成DAGOperationGraph,并根据这个图,生成每个actor执行的schedule,编译图执行时,会循环执行schedule中的任务,直到channel关闭。

DAGOperationGraphNode主要实现的是一个优先级比较函数,用于任务优先级队列的排序。(exec_task_id和task_id的区别?这里的compare函数是不是完备的)

其内部实现了compare函数,比较逻辑是:如果exec_task_id不同,则比较operation的exec_task_id,否则比较task_id

  1. 如果任务节点来自同一个actor,使用compare比较;
  2. 如果参与比较的两个任务一个是需要nccl参与,另外一个不需要,那么不需要参与的优先级高;
  3. 如果都依赖nccl,那么也用compare比较。

_add_edge && _build_dag_node_operation_graph

该函数建立所有GraphNode之间的依赖关系,为拓扑排序做准备。在_build_dag_node_operation_graph建图过程中,会使用该函数添加所有节点之间的边。每条边要记录两端节点的in_edgesout_edges

每个DAG计算图中的节点会在编译时创建三个DAGOperationGraphNode,分别是READ,COMPUTEWRITE,遍历所有的Node,按一下规则进行连接:

  1. 相同task中,从READ节点连接到COMPUTE节点,然后从COMPUTE节点连接到WRITE节点。
  2. 同一个actor中的不同bind_index的COMPUTE节点,按bind的顺序添加边。(看起来是为了保证actor执行顺序的)。
  3. 遍历所有的task的下游节点,task的WRITE连接到下游的READ节点。

注意,如果是class_method_output类型的ClassMethodNode,需要连接其上游节点和其下游节点,因为class_method_output类型的Node,仅用于保存数据,不涉及节点依赖关系。

_generate_actor_to_execution_schedule

根据OperationGraph构建actor schedule。

  1. 先找出入度为0的节点,这些节点可以直接执行;
  2. 然后找出这些节点的后续节点;
  3. 在检查next_node的后续节点,如果入度已经为0(或者所有read_connective_Idxs全部ready),则可以执行。

直到所有的节点都访问过。这样就获得了每个actor的node的执行顺序。如果node是一个计算节点,并且需要使用nccl,说明其后是一个collective_node,当前节点is_ready后,将当前节点信息记录到collective_node的read_connective_Idxs中,方便该节点检查是否ready。

_generate_overlapped_execution_schedule

该方法对已经构建的每个actor的schedule进行顺序调整,让其支持计算和通信并行。目前支持nccl read操作。方法是,根据这个READ操作向前找,直到找到一个COMPUTE操作,然后将该READ操作插入到COMPUTE操作之前,由于异步读取的缘故,读取过程和COMPUTE过程可以并行执行。

编译计算图

简单的理解,计算图的编译就是将DAG涉及的节点之间的数据传递关系创建好通道(channel),然后将所有的Task运行起来,每个Task分为三个操作,分别是读取,计算和发送,循环执行。

这样避免了临时的资源创建的调度开销,并且,还支持NCCL这种带外通信,避免了Host拷贝和传输的开销。

ExecutableTask

ExecutableTask近似于代表一个DAGNode,但是多了一些元数据信息。例如输出输出的Channel等,Task会将所有的输入封装成ReaderInterface,所有的输出封装成WriterInterface

该Task对外提供exec_operation方法,此方法根据传入的op_type来决定执行读,计算还是发送。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
def exec_operation(
self,
class_handle,
op_type: _DAGNodeOperationType,
overlap_gpu_communication: bool = False,
) -> bool:
if op_type == _DAGNodeOperationType.READ:
with _device_context_manager():
with self._recv_stream:
return self._read(overlap_gpu_communication)
elif op_type == _DAGNodeOperationType.COMPUTE:
return self._compute(overlap_gpu_communication, class_handle)
elif op_type == _DAGNodeOperationType.WRITE:
with _device_context_manager():
with self._send_stream:
return self._write()

其中为了支持gpu的计算通信并行,还涉及到AwaitableBackgroundReaderAwaitableBackgroundWriter,以及GPUFuture。也就是说Read和Compute操作可以返回(或操作)一个future对象。

CompiledDAG

编译流程比较长,重点的两个函数是preprocessget_or_compile

preprocess

  1. 找到InputNode和OutputNode,检查所有的Task是不是都是Actor,task暂时不支持;
  2. 配置带外通信相关信息,并且收集collective_ops;
  3. 处理上游节点,包括配置type_hint,记录上游actor等;
  4. 根据带外通信信息,初始化communicator;

get_or_compile

  1. 从input_task广度优先遍历DAG,创建所有的Channel;
  2. 做死锁检测,目前尚未实现逻辑;
  3. 针对每一个task,构建所有arg和其消费者的map;
  4. 如果有多个消费者,创建Cached Channel供多次读取;
  5. 针对每个Task创建Executable_task;
  6. 创建每个task的执行计划build_execution_schedule:
    1. 创建DAGOperationGraph节点,每个Task对应读,计算,写;
    2. 根据依赖关系,创建DAGOperationGraph;
    3. 根据图的拓扑排序,生成每个actor的执行计划;
    4. 如果配置了gpu并行计算(overlapped_execution),将actor的执行计划做调整;
  7. 然后将执行计划交给exec_task_func(do_exec_tasks)执行,这个函数是一个死循环,会循环执行执行计划中的Operation;
  8. 开启Monitor线程,定期检测所有Task的运行情况,如果有Exception,终止执行。

其他关注的点

  1. 全篇Accelerators的通信全部交xxx_nccl,但是只有Nvidia的ccl库才叫nccl,应该修改来避免歧义;同理也有gpu_xxx,应该改为device_xxx等;
  2. TorchTensorType的transform允许传入一个Comminicator对象,但是会校验名称是否在AUTO,NCCL或者CPU中,这里存在bug,应当加入传入的Communicator的名称。同时,AUTO其实没有作用,默认使用CPU通信,这个AUTO实际上可以通过自动检测硬件的方式来初始化对应的Communicator。
  3. 自定义Communicator目前只实现了CPUCommunicator,为了方便调试,需要补充文档和样例,来指导使用自定义Communicator。

什么是DAG

Ray DAG(Directed Acyclic Graph)是 Ray 计算框架中用于表示任务依赖关系的有向无环图结构。它定义了一组计算任务(Tasks 或 Actors)及其依赖关系,以 DAG 形式组织任务的执行,简称计算图。

DAG的作用有:

  1. 延迟计算,与remote不同,bind方法仅会构建计算图,而不是立即执行。可以打包复杂的调用关系,然后直接执行图,DAG可以重复执行;
  2. 是实现workflow的核心组件;
  3. 是实现Accelerate DAG的核心组件。

DAG的使用方法

使用方法参考官方手册,DAG节点可以包含Class,ClassMethod,Function,Input, Output等类型的节点。

更直观的方式是将DAG的可视化,当前支持生成图片或者生成ascii格式的DAG示意图,如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
import ray
import time
import random
from ray.dag.input_node import InputNode
from ray.dag.output_node import MultiOutputNode
ray.init(address = "10.218.163.33:6274")
import os
@ray.remote
class Actor:
def __init__(self, init_value, fail_after=None, sys_exit=False):
self.i = init_value
self.fail_after = fail_after
self.sys_exit = sys_exit

self.count = 0

def echo(self, x):
self.count += 1
return x

def sleep(self, x):
time.sleep(x)
return x

@ray.method(num_returns=2)
def return_two(self, x):
return x, x + 1

def read_input(self, x):
return x


a = Actor.remote(0)
b = Actor.remote(0)
single_fetch = True
with InputNode() as i:
o1, o2 = a.return_two.bind(i)
o3 = b.echo.bind(o1)
o4 = b.echo.bind(o2)
dag = MultiOutputNode([o3, o4])

compiled_dag = dag.experimental_compile()
for _ in range(3):
refs = compiled_dag.execute(1)
if single_fetch:
for i, ref in enumerate(refs):
res = ray.get(ref)
assert res == i + 1
else:
res = ray.get(refs)
assert res == [1, 2]
compiled_dag.visualize()
compiled_dag.teardown()
image-20250210140344597

DAGNode

DAG是由DAGNode组成,每个DAGNode代表一个Task或者Actor,图的边代表数据的依赖关系,由被依赖者指向依赖者。整个图从上到下分别是InputNode,各个任务Node以及OutputNode。

PlantUML diagram

DAGNode

DAGNode是对操作和其参数的一个封装,其包含实际执行的逻辑,例如,针对FunctionNode而言,就是函数对象,以及该函数的所有入参。DAGNode还会记录当前节点的上下游信息,并且能够按依赖顺序执行每一个Node。

PyObjScanner

DAGNode中有多个函数均使用到了PyObjScanner,这是利用pickle库来扫描给定对象中所有的DAGNodeBase类型的对象。比如,在获取节点的上游节点(所有依赖的节点)collect_upstream_nodes以及扫描所有children节点get_all_child_nodes时,就是通过扫描所有入参,来确定在执行当前Node之前,有哪些Node需要先完成计算。

apply_recursive

DAGNode执行的核心是这个递归执行函数,该函数从叶子节点开始,根据依赖关系递归的执行。apply_recursive提供了一个Cachingfn的内部类,该类能够缓存当前节点执行结果(也有可能是一个future对象)而避免节点被重复计算。当Cachingfn对象创建后,会将原来的执行函数替换成自己。

  • 如果apply_recursive不是第一次被调用,则直接返回cache中缓存的结果。

  • 如果是第一次被调用,首先根据传入的执行函数fn来生成Cachingfn对象,然后将fn函数替换成Cachingfn对象,该对象提供__call__方法,该方法是一个warpper,调用原始回调函数fn,并且记录缓存,并返回结果。

以FunctionNode为例,apply_recursive的回调函数是将FunctionNode中记录的函数构造成remote对象,然后直接调用remote方法。所以该回调函数的参数需要是一个数据对象,例如:标准的数据类型,或者是一个object_ref。但是DAGNode的arg是另外一个DAGNode,并不能直接被回调函数所执行,所以,在执行当前节点之前,需要对入参进行替换。

apply_and_replace_all_child_nodes

该函数就提供了替换DAGNode入参类型的功能。该函数接受一个回调函数作为参数,该回调函数即对为Node的apply_recursive替换也就是将当前的DAGNode,通过apply_recursive替换成当前节点执行完成后的结果。而这个apply_recursive函数会递归的处理子节点的所有子节点。

最终,会递归到InputNode,获取到InputNode的值,然后交给InputNode的后续节点计算,然后再将计算结果交个在后续的节点,直到执行完毕,执行完成后会返回一个object_ref,可以使用get方法获取实际的计算结果。

PR点:get_toplevel_child_nodes没有引用了,可以删除

FunctionNode

FunctionNode是DAGNode的子类,代表一个函数节点,该节点通过remote函数的.bind方法生成,除了基类的属性之外,FunctionNode还需要保存函数体。

execute_impl即调用该函数的remote方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
def _execute_impl(self, *args, **kwargs):
"""Executor of FunctionNode by ray.remote().

Args and kwargs are to match base class signature, but not in the
implementation. All args and kwargs should be resolved and replaced
with value in bound_args and bound_kwargs via bottom-up recursion when
current node is executed.
"""
return (
ray.remote(self._body)
.options(**self._bound_options)
.remote(*self._bound_args, **self._bound_kwargs)
)

ClassNode和ClassMethodNode

ClassNode是DAGNode的子类,代表一个类,该节点通过remote类的.bind方法生成,该Node中保存了Class本身。ClassNode的execute_impl函数仅执行了remote方法,将对象在集群中实例化。

当使用该Node对应类的函数时,需要对这些函数在进行一次bind。ClassNode通过__getattr__获得其中Class的函数,该函数也提供了.bind方法,调用可以生成ClassMethodNode对象。在对ClassMethod进行bind是,还会记录actor的句柄。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
class ClassNode:
def __getattr__(self, method_name: str):
# User trying to call .bind() without a bind class method
if method_name == "bind" and "bind" not in dir(self._body):
raise AttributeError(f".bind() cannot be used again on {type(self)} ")
# Raise an error if the method is invalid.
getattr(self._body, method_name)
call_node = _UnboundClassMethodNode(self, method_name, {})
return call_node

class _UnboundClassMethodNode:
def bind(self, *args, **kwargs):
other_args_to_resolve = {
PARENT_CLASS_NODE_KEY: self._actor,
PREV_CLASS_METHOD_CALL_KEY: self._actor._last_call,
}

node = ClassMethodNode(
self._method_name,
args,
kwargs,
self._options,
other_args_to_resolve=other_args_to_resolve,
)
self._actor._last_call = node
return node

ClassMethodNode也是DAGNode的子类,代表一个类的成员函数,分为class_method_call以及class_method_output两种类型,class_method_output类型是一个特殊的Node,该node仅作为存储ClassMethodNode的结果使用。ClassMethodNode的bind函数返回的就是一个class_method_output类型的tuple。class_method_output类型的Node执行即为返回其中实际存储的CLassMethod的返回结果。简单的说,就是将一个函数返回的tuple做了一个封装。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
def _execute_impl(self, *args, **kwargs):
"""Executor of ClassMethodNode by ray.remote()

Args and kwargs are to match base class signature, but not in the
implementation. All args and kwargs should be resolved and replaced
with value in bound_args and bound_kwargs via bottom-up recursion when
current node is executed.
"""
if self.is_class_method_call:
method_body = getattr(self._parent_class_node, self._method_name)
# Execute with bound args.
return method_body.options(**self._bound_options).remote(
*self._bound_args,
**self._bound_kwargs,
)
else:
assert self._class_method_output is not None
return self._bound_args[0][self._class_method_output.output_idx]

PR点:其实这个output类型的Node仅作为数据的存储,和ClassMethodNode混在一起比较难理解,不如搞一个新的Node类型

InputNode和InputAttributeNode

InputNode是DAGNode的子类,代表一组输入,InputAttributeNode也是DAGNode的子类,代表一组输入中的一个元素,可以通过下标或者.操作符来获取。

1
2
with ray.dag.InputNode as inp:
x = fun.bind(inp)

上述代码是使用整个input作为fun函数的入参,这个参数就是InputNode;

1
2
3
with ray.dag.InputNode as inp:
x = fun1.bind(inp[0])
y = fun2.bind(inp.x)

这段代码中,fun1和fun2使用了input中的部分数据,这里的参数就是InputAttributeNode。

InputNode的execute_impl比较简单,就是返回args,InputAttributeNode内部有InputNode的引用,通过__get_attr__以及__get_item__函数来获取InputNode中的arg。

MultiOutputNode

MultiOutputNode是DAGNode的子类,可以储存多个输出。比如,我们想执行一个计算图,但是希望获取超过1个的结果,这时就需要构建一个MultiOutputNode。

MultiOutputNode的execute_impl就是将入参转成一个list返回。

CollectiveOutputNode

CollectiveOutputNode是ClassMethodNode的子类,这个Node不能被执行,所以仅用于计算图编译中。该类使用other_args_to_resolve传入一个_CollectiveOperation。

_CollectiveOperation目前仅支持allReduce,该类提供了nccl_group的初始化功能,将all_reduce的节点放到一个group中。其execute方法就是执行group的all_reduce方法

1
2
3
4
5
6
7
8
9
type_hint = self._type_hint
if type_hint.communicator_id is not None:
return type_hint.communicator_id
if communicator_id is None:
communicator_id = _init_communicator(
self._actor_handles, type_hint.get_custom_communicator()
)
type_hint.set_communicator_id(communicator_id)
return communicator_id
1
2
3
4
5
6
7
8
import torch

if not isinstance(send_buf, torch.Tensor):
raise ValueError("Expected a torch tensor")
communicator = self.get_communicator()
recv_buf = torch.empty_like(send_buf)
communicator.allreduce(send_buf, recv_buf, self._op)
return recv_buf

背景知识

Ray是一个使用Bazel构建的,基于gRPC上层打造的开源分布式计算框架,旨在简化分布式应用的开发和运行。它支持无缝地将 Python 代码扩展到多核、多节点环境,适合构建高性能的分布式系统。Ray 提供灵活的任务调度和状态管理,支持多种编程模型,包括任务并行和 actor 模式,并通过自动化的资源管理和容错机制简化复杂分布式工作的部署。它还拥有丰富的生态系统,包含机器学习库(如 Ray Tune、Ray Serve 和 RLlib),适用于模型训练、超参数调优、在线服务等场景,是云原生应用和大规模计算的理想选择。

社区主页

Bazel

Bazel是一种高效、可扩展的构建工具,最初由Google开发,专为管理大型代码库和复杂项目而设计。它支持多语言和多平台构建,包括C++, Java, Python, Go等,并能够跨操作系统(如Linux、macOS和Windows)执行构建任务。Bazel通过声明式的构建规则(BUILD文件)和依赖管理,实现了高性能的增量构建,避免了不必要的重复编译。其特点包括分布式构建、沙盒化执行和强大的缓存机制,可以加快构建速度并提高构建的稳定性。此外,Bazel还提供高度可配置的扩展机制,方便开发者为特定需求编写自定义规则,适合从小型项目到超大规模工程的使用场景。

社区主页

gRPC

gRPC 是由 Google 开发的高性能开源 RPC 框架,基于 HTTP/2 协议,支持多语言和跨平台通信。它使用 Protocol Buffers 定义接口和序列化数据,简化了服务间的集成开发。gRPC 提供高效的请求-响应模型、流式传输、负载均衡和内置 TLS 安全特性,非常适合云原生应用、微服务架构和实时通信场景,广泛应用于分布式系统和高性能应用开发中。

社区主页

cython

Cython 是一种优化的 Python 扩展语言,结合了 Python 的易用性和 C 的高性能,旨在提升 Python 代码的运行速度。通过将 Python 代码转译为 C 或 C++ 并进行编译,Cython 可以显著减少运行时的性能开销,同时支持调用 C/C++ 库,从而实现与底层代码的高效交互。Cython 保留了大部分 Python 的语法,同时允许使用 C 类型声明进行性能优化,非常适合计算密集型任务或对性能要求较高的场景,如科学计算、机器学习和数据处理。

社区主页

角色

Ray集群的整体架构如下图所示

Ray架构图

一个Ray集群包括多个Node节点,其中每个Node节点包含Actor,Worker,共享内存,本地调度器。其中Head Node还有GCS服务,包含各类元数据存储,WebUI,全局调度等功能。

Node

Ray Nodes

Node中包含一个Raylet进程,负责本地调度以及共享内存。Raylet会根据任务情况启动一个或者多个worker或者Actor。Head node是一个特殊的Node,除了普通Node的功能之外,还有一些外的进程,包括gcs_server服务,dashboard等。并且每个Node节点还会启动monitor进程,log_monitor进程,agent进程等。

Raylet和gcs_server是非Python进程,其他辅助进程,包括Driver,worker以及Actor均是python进程(针对Python语言而言)。

gcs_server

Ray 的 GCS Server 是 Ray 框架的核心组件,负责元数据存储、任务调度、资源管理和集群状态维护。它通过存储模块(Redis 或 内存)管理节点和任务的生命周期,使用 Pub-Sub 系统进行状态广播,并通过高效的调度协调与其他组件(如 Scheduler, Worker, Actor 或 Object)协作,确保系统的高性能、高可用性和灵活扩展性。gcs_server是一个非Python进程,二进制文件路径在 ray/python/ray/core/src/ray/gcs/gcs_server

raylet

Raylet 是 Ray 集群中每个节点的核心运行时组件,负责任务执行、资源管理和数据依赖协调。它接收 GCS Server 分配的任务,管理本地资源,启动 Worker 进程执行任务,并通过 Plasma Store 处理数据存储与传输。同时,Raylet 定期向 GCS Server 汇报节点状态,协作实现任务调度、资源分配和故障恢复,是 Ray 分布式运行的关键执行单元。raylet是一个非Python进程,二进制文件路径在 ray/python/ray/core/src/ray/raylet/raylet

worker

Worker 是 Ray 中的核心计算单元,由 Raylet 启动,负责具体任务的执行和 Actor 的运行。它与 Plasma Store 协作进行数据存取,并通过与 Raylet 的通信完成任务调度和资源管理。Worker 支持多语言运行环境(如 Python、Java),能够高效并行处理任务,是 Ray 框架实现分布式计算的基础组件。

actor

Actor 是 Ray 框架中的一种状态管理单元,允许用户在分布式系统中创建带有持久状态的计算对象。每个 Actor 由一个独立的 Worker 进程运行,支持并行调用方法并维护自身状态。Actor 可以通过远程调用接口与其他组件交互,实现任务分解和动态扩展,是 Ray 中用于构建有状态应用和分布式服务的重要抽象。

driver

Driver就是用户程序(例如,用@ray.remote修饰的用户python代码),Driver负责Task的定义和提交,需要运行在Ray的Head或者Node节点上。

编译

安装Ray有多种方法,包括wheel包,pip,conda,容器镜像等。这些内容可以参考社区手册。这里介绍从源码安装

使用Conda环境

官方推荐conda或者venv两种虚拟环境安装ray,建议选择conda。无虚拟环境将无法编译ray,并且在实测中发现了venv的未知错误。

1
2
conda create -c conda-forge python=3.9 -n myenv
conda activate myenv

安装依赖

1
2
sudo apt-get update
sudo apt-get install -y build-essential curl clang pkg-config psmisc unzip

安装bazel

1
ci/env/install-bazel.sh

安装npm

用于dashboard

1
2
3
$(curl -o- https://raw.githubusercontent.com/nvm-sh/nvm/v0.39.0/install.sh)
nvm install 14
nvm use 14

构建

构建dashboard

1
2
3
cd ray/python/ray/dashboard/client
npm ci
npm run build

构建ray

1
2
3
4
5
6
7
8
cd ../../..
pip install -r requirements.txt

##如果构建机器的内存小于32G,需要限制内存使用,避免oom
export BAZEL_ARGS="--local_ram_resources=8"
##debug编译,保留符号表供调试
export RAY_DEBUG_BUILD=debug
pip install -e . --verbose

注意:有可能构建环境会错误的选择到gcc和lld,会导致一些奇怪的编译错误(例如,你的环境变量中存在lld,被识别,但是构建过程中并不会使用非系统路径下的lld等)。这时可以通过指定LD来解决:

在 ~/.bazelrc 中加入:build --linkopt=-fuse-ld=gold

可选的编译环境变量

  • RAY_INSTALL_JAVA: If set and equal to 1, extra build steps will be executed to build java portions of the codebase
  • RAY_INSTALL_CPP: If set and equal to 1, ray-cpp will be installed
  • RAY_DISABLE_EXTRA_CPP: If set and equal to 1, a regular (non - cpp) build will not provide some cpp interfaces
  • SKIP_BAZEL_BUILD: If set and equal to 1, no Bazel build steps will be executed
  • SKIP_THIRDPARTY_INSTALL: If set will skip installation of third-party python packages
  • RAY_DEBUG_BUILD: Can be set to debug, asan, or tsan. Any other value will be ignored
  • BAZEL_ARGS: If set, pass a space-separated set of arguments to Bazel. This can be useful for restricting resource usage during builds, for example. See https://bazel.build/docs/user-manual for more information about valid arguments.
  • IS_AUTOMATED_BUILD: Used in CI to tweak the build for the CI machines
  • SRC_DIR: Can be set to the root of the source checkout, defaults to None which is cwd()
  • BAZEL_SH: used on Windows to find a bash.exe, see below
  • BAZEL_PATH: used on Windows to find bazel.exe, see below
  • MINGW_DIR: used on Windows to find bazel.exe if not found in BAZEL_PATH

启动一个Ray集群

接下来使用一个简单的例子来使用Ray,这是一个使用概率计算圆周率π的程序(蒙特卡洛法)。蒙特卡洛方法在计算圆周率时设一个正方形内部相切一个圆,这时圆和正方形的面积之比是π/4。在这个正方形内部,随机产生n个点(这些点服从均匀分布),计算它们与中心点的距离是否大于圆的半径,以此判断是否落在圆的内部。统计圆内的点数,与n的比值乘以4,就是π的值。理论上,n越大,计算的π值越精确。

蒙特卡洛法
蒙特卡洛法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
import ray
import random

# 初始化 Ray
ray.init()

# 单个任务:生成点并统计圆内点数
@ray.remote
def count_points_in_circle(num_samples: int) -> int:
count = 0
for _ in range(num_samples):
x, y = random.uniform(0, 1), random.uniform(0, 1)
if x**2 + y**2 <= 1:
count += 1
return count

def calculate_pi(num_samples: int, num_workers: int) -> float:
# 每个 worker 分配的样本数
samples_per_worker = num_samples // num_workers

# 创建并运行任务
futures = [
count_points_in_circle.remote(samples_per_worker) for _ in range(num_workers)
]

# 收集结果
total_in_circle = sum(ray.get(futures))

# 计算圆周率
pi_estimate = 4 * total_in_circle / num_samples
return pi_estimate

if __name__ == "__main__":
# 总样本数和并行任务数
total_samples = 100_000_000
num_workers = 10

# 计算 π
pi = calculate_pi(total_samples, num_workers)
print(f"Estimated π: {pi}")

# 关闭 Ray
ray.shutdown()

临时启动

如果不启动Ray集群,直接执行该Python程序,那会在当前节点上默认拉起一个ray集群供计算。

1
2
3
4
python pi.py

##2025-01-21 09:35:24,180 INFO worker.py:1832 -- Started a local Ray instance. View the dashboard at 127.0.0.1:8265
##Estimated π: 3.12

启动集群

启动Head节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
ray start --head

##Local node IP: 192.168.64.8
##
##--------------------
##Ray runtime started.
##--------------------
##
##Next steps
## To add another node to this Ray cluster, run
## ray start --address='192.168.64.8:6379'
##
## To connect to this Ray cluster:
## import ray
## ray.init()
##
## To submit a Ray job using the Ray Jobs CLI:
## RAY_ADDRESS='http://127.0.0.1:8265' ray job submit --working-dir . -- python my_script.py
##
## See https://docs.ray.io/en/latest/cluster/running-applications/job-submission/index.html
## for more information on submitting Ray jobs to the Ray cluster.
##
## To terminate the Ray runtime, run
## ray stop
##
## To view the status of the cluster, use
## ray status
##
## To monitor and debug Ray, view the dashboard at
## 127.0.0.1:8265
##
## If connection to the dashboard fails, check your firewall settings and network configuration.

可以在http://127.0.0.1:8265查看Ray控制台。

启动Node节点

1
2
3
4
5
6
7
8
9
10
11
ray start --address='192.168.64.8:6379'

##Local node IP: 192.168.64.9
##[2025-01-21 10:49:34,903 W 1882 1882] global_state_accessor.cc:463: Retrying to get node with node ID ba8aafea9f23f6f29ff6cd174e31aaac37cddb0e832c4e3170ddcf63
##
##--------------------
##Ray runtime started.
##--------------------
##
##To terminate the Ray runtime, run
## ray stop

集群状态

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
======== Autoscaler status: 2025-01-21 10:54:25.170247 ========
Node status
---------------------------------------------------------------
Active:
1 node_948847515a43d4fba13c1bdb6a5e5611c2580ecb60f60183ef033771
1 node_ba8aafea9f23f6f29ff6cd174e31aaac37cddb0e832c4e3170ddcf63
Pending:
(no pending nodes)
Recent failures:
(no failures)

Resources
---------------------------------------------------------------
Usage:
0.0/16.0 CPU
0B/8.40GiB memory
0B/3.93GiB object_store_memory

Demands:
(no resource demands)

启用监控

Prometheus

ray提供了一个命令来下载和部署普罗米修斯,ray提供了数据采集接口,可以让普罗米修斯通过这个接口来收集集群数据,注意,简易命令拉起的普罗米修斯不能用于生产环境,完整部署可参考官方手册

1
ray metrics launch-prometheus

可以在这个地址上查看普罗米修斯状态:http://localhost:9090,可查看其采集的信息ray_dashboard_api_requests_count_requests_total

普罗米修斯dashboard

grafana

普罗米修斯采集的数据,通过grafana的方式进行可视化显示,并且ray dashboard中的metric页面的信息也是来自于grafana。可以通过启动新的grafana服务来完成配置。

1
2
3
4
cd /usr/share/grafana

##启动grafana需要创建data目录,需要sudo执行
sudo ./bin/grafana-server --config /tmp/ray/session_latest/metrics/grafana/grafana.ini web

将grafana dashboard加入到已有的grafana server可以参考官方手册

可以在这个地址上查看grafana的dashboard:http://localhost:3000

grafana dashboard

Ray dashboard

完成上述两个步骤后,Ray dashboard中的metric即可正常显示,如果不是本机部署,你可能需要配置允许所有ip访问:

1
RAY_GRAFANA_HOST=http://192.168.64.8:3000 ray start --head --dashboard-host=0.0.0.0

RAY_GRAFANA_HOST的作用是让ray的dashboard能够访问到grafana服务;

--dashboard-host=0.0.0.0允许所有ip访问。

Ray dashboard
Ray dashboard

提交一个任务

1
2
3
4
5
python pi.py

##2025-01-21 11:17:11,760 INFO worker.py:1654 -- Connecting to existing Ray cluster at address: 192.168.64.8:6379...
##2025-01-21 11:17:11,775 INFO worker.py:1832 -- Connected to Ray cluster. View the dashboard at 192.168.64.8:8265
##Estimated π: 3.36
Ray 任务列表
Ray 任务详情
Ray 任务详情

部署一个服务

Ray除了提供基础的分布式计算能力之外,还提供了一系列的AI libs,其中可以在其上部署服务,Ray自动提供proxy和负载均衡能力。这里使用一个翻译的服务举例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
from starlette.requests import Request

import ray
from ray import serve

from transformers import pipeline


@serve.deployment(num_replicas=2, ray_actor_options={"num_cpus": 0.2, "num_gpus": 0})
class Translator:
def __init__(self):
# Load model
self.model = pipeline("translation_en_to_fr", model="t5-small")

def translate(self, text: str) -> str:
# Run inference
model_output = self.model(text)

# Post-process output to return only the translation text
translation = model_output[0]["translation_text"]

return translation

async def __call__(self, http_request: Request) -> str:
english_text: str = await http_request.json()
return self.translate(english_text)


translator_app = Translator.bind()

if __name__ == "__main__":
ray.init()
serve.start(http_options={"host": "0.0.0.0"}) # 设置监听地址为 0.0.0.0
serve.run(translator_app)

具体修改方法,可以参考官方手册

直接运行这个python程序即可完成服务的部署:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
python translate.py

##2025-01-21 11:23:45,794 INFO worker.py:1654 -- Connecting to existing Ray cluster at address: 192.168.64.8:6379...
##2025-01-21 11:23:45,810 INFO worker.py:1832 -- Connected to Ray cluster. View the dashboard at 192.168.64.8:8265
##INFO 2025-01-21 11:23:46,673 serve 8302 -- Started Serve in namespace "serve".
##INFO 2025-01-21 11:23:46,675 serve 8302 -- Connecting to existing Serve app in namespace "serve". New http options will not be applied.
##WARNING 2025-01-21 11:23:46,675 serve 8302 -- The new client HTTP config differs from the existing one in the following fields: ['host', 'location']. The new HTTP config is ignored.
##(ServeController pid=6931) INFO 2025-01-21 11:23:46,687 controller 6931 -- Deploying new version of Deployment(name='Translator', app='default') (initial target replicas: 2).
##(ProxyActor pid=8045) INFO 2025-01-21 11:23:46,644 proxy 192.168.64.8 -- Proxy starting on node 8e8707766c1fc9b7d838c24446c99440be5881c04ea44b6e4e83a7aa (HTTP port: 8000).
##(ProxyActor pid=8045) INFO 2025-01-21 11:23:46,660 proxy 192.168.64.8 -- Got updated endpoints: {}.
##(ProxyActor pid=8045) INFO 2025-01-21 11:23:46,690 proxy 192.168.64.8 -- Got updated endpoints: {Deployment(name='Translator', app='default'): EndpointInfo(route='/', app_is_cross_language=False)}.
##(ServeController pid=6931) INFO 2025-01-21 11:23:46,792 controller 6931 -- Adding 2 replicas to Deployment(name='Translator', app='default').
##(ServeReplica:default:Translator pid=8044) Device set to use cpu
##INFO 2025-01-21 11:23:50,711 serve 8302 -- Application 'default' is ready at http://0.0.0.0:8000/.
##INFO 2025-01-21 11:23:50,711 serve 8302 -- Deployed app 'default' successfully.
##(ServeReplica:default:Translator pid=2371, ip=192.168.64.9) Device set to use cpu

在dashboard上可以看到服务的详情:

Ray Serve详情
Ray Serve metrics

通过curl命令可以验证服务运行情况:

1
2
3
curl -X POST http://127.0.0.1:8000/ -H "Content-Type: application/json" -d '"Hello world!"'

##Bonjour monde!

调试

Ray是一个多进程,Python和C++混合调用的程序(以Python语言为例),调试上需要掌握一定的技巧。调试Python,Driver,以及自动拉起的gcs_server,raylet以及worker,actor的方法都不同。下面以VsCode为例。

调试python

Python调试与普通程序调试相同,直接点debug python文件,或者配置launch.json即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
{
"version": "0.2.0",
"configurations": [
{
"name": "Python: pi.py",
"type": "debugpy",
"request": "launch",
"program": "${workspaceFolder}/pi.py",
"console": "integratedTerminal",
"justMyCode": true,
"args": [],
"cwd": "${workspaceFolder}"
}
]
}

调试Driver

Driver就是用户python程序,调试Driver的Python部分参考上一节,如果调试Driver的C++部分,需要调试python进程,前提是Ray是debug编译的,否则没有符号表无法调试。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
{
"version": "0.2.0",
"configurations": [
{
"name": "Ray C++",
"type": "cppdbg",
"request": "launch",
"program": "/home/hua/miniconda3/envs/myenv/bin/python3.9",
"args": [
"${workspaceFolder}/pi.py"
],
"stopAtEntry": false,
"cwd": "${workspaceFolder}",
"environment": [],
"externalConsole": false,
"MIMode": "gdb",
"setupCommands": [
{
"description": "为 gdb 启用整齐打印",
"text": "-enable-pretty-printing",
"ignoreFailures": true
},
{
"description": "将反汇编风格设置为 Intel",
"text": "-gdb-set disassembly-flavor intel",
"ignoreFailures": true
}
]
}
]
}

调试Worker或者其他进程

gcs_server,raylet,worker以及actor都是自动拉起的进程,调试的时候需要attach到这些进程上进行调试。

注意,需要接触gdb attach的限制,永久接触方法如下:

sudo vi /etc/sysctl.d/10-ptrace.conf

kernel.yama.ptrace_scope = 0

sudo sysctl --system

调试上述pi.py,在一个worker的情况下大概需要8G内存,否则会导致Ray kill掉worker或者gdb异常退出。在调试worker过程中,为了方便,可以限制仅启动一个worker,在本地拉起的情况下,配置ray.init(num_cpus=1)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
{
"version": "0.2.0",
"configurations": [
{
"name": "Attach to worker",
"type": "cppdbg",
"request": "attach",
"processId": "${command:pickProcess}",
"program": "/home/hua/miniconda3/envs/myenv/bin/python3.9",
"sourceFileMap": {
"/proc/self/cwd": "/home/hua/code/ray"
},
"setupCommands": [
{
"description": "为 gdb 启用整齐打印",
"text": "-enable-pretty-printing",
"ignoreFailures": true
},
{
"description": "将反汇编风格设置为 Intel",
"text": "-gdb-set disassembly-flavor intel",
"ignoreFailures": true
}
]
}
]
}

worker和actor是python进程,gcs_server和reylet是非python进程,二进制在 ray/python/ray/core/src/ray/下。

gRPC流程

gRPC是什么

简单来说,RPC框架就是像调用本地函数一样调用远程函数。gRPC使用protobuf来定义服务和传输的对象,在客户端中,有一个存根(Stub),与服务有相同的函数签名,通过调用这个存根,即可完成一次RPC调用。

gRPC原理

Ray是基于gRPC构建的分布式计算系统,有关gRPC的代码存放在 ray/src/ray/rpc目录下,下面,我们通过worker进程的gRPC服务来分析。

gRPC client

涉及到gRPC client的几个文件:grpc_client.h, client_call.h

gRPC client类图

ClientCall是对RPC调用的一个封装,主要包括需要调用的stub函数指针,以及相关的状态和结果获取,当gRPC调用返回时,需要回调ClientCall中注册的回调函数。ClientCallManager是对gRPC调用发起的管理,包括结果队列,监听线程等, GrpcClient保存的是gRPC的连接句柄,可以通过该对象发起一个gRPC请求。

对于CoreWorker来说,在此之上还有一层封装(worker/core_worker_client.h, worker/core_worker_client_pool.h/cc),CoreWorkerClient,该类封装了CoreWorkerService可用的所用调用,直接调用提供的函数接口即可完成RPC调用。与之匹配的还有一个CoreWorkerClientPool,用于CoreWorkerClient的缓存。

CoreWorkerClient 类图

CoreWorkerClientPool维护一个map<WorkerId, CoreWorkerClient>,当已经存在对应的CoreWorkerClient时直接取出使用。如果不存在,则调用CoreWorkerClientFactoryFn工厂方法创建一个gRPC的client连接。该工厂方法在CoreWorker的构造函数中定义,通过一个rpc::Address创建对应的CoreWorkerClient对象。

每个CoreWorkerClient对象构造过程中,会创建gRPC连接,并且通过ClientCallManager来发起RPC请求,并通过监听CompletionQueue来响应RPC的处理结果。

gRPC server

涉及到gRPC server的几个文件:grpc_server.h/cc, client_server.h/cc

gPRC Server类图

GrpcServer是gRPC的服务端,其中定义了初始化,关闭,注册服务,运行等操作。它会根据其中注册的Service来向gRPC服务中注册服务和对应的处理方法。GrpcService是一个虚拟类,其本身没有实现,需要不同的组件来继承实现,例如,CoreWorker就会用CoreWorkerGrpcService来实现一个Worker对应的Service。Service中需要提供一组ServiceCallFactory,这些Factory记录了服务,gRPC的stub,回调函数,本地异步IO组件等信息,供GrpcServer来注册对应的服务。ServiceCall即服务端服务的本身,包括一系列回调函数处理对应的事件,这个call对象会以Tag的方式放入gRPC请求中,处理时取出call对象对相应的处理。

对于CoreWorker来说,需要基于GrpcService实现GrpcCoreWorkerGrpcServicework/core_worker_server.h)。实际上的工作就是将CoreWorkerService中的服务全部注册到ServiceCallFactory中。

CoreWorkerService 类图

CoreWorkerServiceHandler是一组Handle方法的集合,包含CoreWorkerService中的所有服务的处理方法,CoreWorkerGrpcService中会通过一组宏来构造protobuf中的注册,响应等必要信息的对象集合(ServiceCallFactory)。

注册完成后,GrpcServer在运行之前,会将所有的事件和响应注册到队列中,这样,队列中进入事件时,就可以调用对应的处理函数进行处理。

本地异步调用

Ray使用了大量的异步处理,例如gRPC框架中的请求和响应,以及本地的异步处理框架。Ray的Worker等进程中,除了gRPC的异步框架之外,还有一个boost::asio::io_context框架,所有gRPC的响应并不是在pull_threads中处理,而是把事件转交给本地的异步处理框架,然后在该异步处理框架中处理。并且该框架中还内置了一个EventTracker,来记录所有时间的处理信息。

结果的处理交给本地异步处理来运行,猜测是为了加快gRPC队列中的数据消费。

Driver提交流程

以无状态任务为例,描述任务的提交流程。

Python部分

@ray.remote

@ray.remote装饰的函数会被ray分布式处理。该装饰器会将函数(或者对象,后续的描述均为函数的装饰)封装成RemoteFunction对象。该对象保存了被装饰函数的function对象,并且提供remote方法。

remote方法被调用时,会将python函数包装成PythonFunctionDescriptor,记录了module/function/class name,以及分配的uuid。随后使用pickle_dump将函数序列化,交给worker处理。worker会将序列化后的函数存储到gcs服务的function table中,并记录该函数的uuid,以便于通过函数描述找到函数体。

以上工作完成后,remote方法会调用worker的submit_task方法提交任务,该任务即可通过gRPC发送到集群中处理。submit_task返回一个object_ref。

1
2
3
4
5
6
7
8
invocation (/home/hua/code/ray/python/ray/remote_function.py:485)
_remote (/home/hua/code/ray/python/ray/remote_function.py:504)
_invocation_remote_span (/home/hua/code/ray/python/ray/util/tracing/tracing_helper.py:310)
auto_init_wrapper (/home/hua/code/ray/python/ray/_private/auto_init_hook.py:21)
_remote_proxy (/home/hua/code/ray/python/ray/remote_function.py:156)
<listcomp> (/home/hua/code/ray/pi.py:23)
calculate_pi (/home/hua/code/ray/pi.py:22)
<module> (/home/hua/code/ray/pi.py:39)

Driver部分

任务提交到本地

Driver的python代码调用submit_task后,会通过cython调用到C++ extention中。对应的函数是CoreWorker::SubmitTask,这里会将相关的任务信息打包成TaskSpec,然后提交到本地异步IO中。

1
2
3
4
_raylet.so!ray::core::CoreWorker::SubmitTask() (/home/hua/code/ray/src/ray/core_worker/core_worker.cc:2467)

cython ...
Python ...

解决依赖

从异步IO调度到该任务后(NormalTaskSubmitter::SubmitTask),会先等待依赖的资源处理结束,这里使用了回调的方式异步等待依赖的任务结束。

1
2
3
4
5
6
7
8
9
_raylet.so!ray::core::NormalTaskSubmitter::SubmitTask() (/home/hua/code/ray/src/ray/core_worker/transport/normal_task_submitter.cc:23)

_raylet.so!operator()(const struct {...} * const __closure) (/home/hua/code/ray/src/ray/core_worker/core_worker.cc:2469)

_raylet.so!EventTracker::RecordExecution() (/home/hua/code/ray/src/ray/common/event_stats.cc:113)

_raylet.so!std::_Function_handler<void(), instrumented_io_context::post() (/home/hua/code/ray/src/ray/common/asio/instrumented_io_context.cc:97)

从异步IO调度

请求资源

依赖的任务执行结束后,准备执行当前任务,但是对当前SchedulingKey来说目前没有空闲的Worker,需要先向reylet请求Worker,NormalTaskSubmitter::RequestNewWorkerIfNeeded

1
2
3
4
5
6
7
_raylet.so!ray::core::NormalTaskSubmitter::RequestNewWorkerIfNeeded() (/home/hua/code/ray/src/ray/core_worker/transport/normal_task_submitter.cc:347)

_raylet.so!operator()(struct {...} * const __closure, ray::Status status) (/home/hua/code/ray/src/ray/core_worker/transport/normal_task_submitter.cc:80)

_raylet.so!ray::core::LocalDependencyResolver::ResolveDependencies() (/home/hua/code/ray/src/ray/core_worker/transport/dependency_resolver.cc:84)

异步回调

任务提交到集群

Worker资源异步请求会返回空闲Worker的Address,然后可以通过RPC将任务直接提交(PushTask)给这个Worker。

1
2
3
4
5
6
7
8
9
_raylet.so!ray::rpc::CoreWorkerClient::PushNormalTask() (/home/hua/code/ray/src/ray/rpc/worker/core_worker_client.h:399)

_raylet.so!ray::core::NormalTaskSubmitter::PushNormalTask() (/home/hua/code/ray/src/ray/core_worker/transport/normal_task_submitter.cc:561)

_raylet.so!ray::core::NormalTaskSubmitter::OnWorkerIdle() (/home/hua/code/ray/src/ray/core_worker/transport/normal_task_submitter.cc:181)

_raylet.so!operator()() (/home/hua/code/ray/src/ray/core_worker/transport/normal_task_submitter.cc:436)

RPC回调

Worker执行流程

调试技巧

  1. 首先启动Driver,在INVOKE_RPC_CALL执行之前打断点,阻塞任务提交到集群。
  2. attach到Worker进程上,并且在CoreWorker::HandlePushTask打断点,这里是处理RPC请求的入口。
  3. 让Driver继续执行,Worker就会命中断点,可以继续调试Worker。

gRPC将任务提交到本地

GrpcServer::PollEventsFromCompletionQueue会等待gRPC请求,当收到请求后,就会调用从Tag中取出ServerCall对象,该对象中保存着该请求的所有处理的必要信息。然后将该任务提交给异步IO。

1
2
3
4
5
_raylet.so!ray::rpc::ServerCallImpl<ray::rpc::CoreWorkerServiceHandler, ray::rpc::GetCoreWorkerStatsRequest, ray::rpc::GetCoreWorkerStatsReply, (ray::rpc::AuthType)0>::HandleRequest() (/home/hua/code/ray/bazel-out/aarch64-dbg/bin/_virtual_includes/grpc_common_lib/ray/rpc/server_call.h:237)

_raylet.so!ray::rpc::GrpcServer::PollEventsFromCompletionQueue() (/home/hua/code/ray/src/ray/rpc/grpc_server.cc:199)

gRPC pulling thread

调用gRPC注册的Handler方法

异步IO会回调注册的方法CoreWorker::HandlePushTask,配置定义send_reply_callback回调函数,最后将任务通过异步IO提交给task_execution_service(就是另外一个异步IO队列)。

执行函数

当执行调度到PushTask任务时,就会回调上一步配置的send_reply_callback回调,远程函数的执行就在这个回调中运行

1
2
3
4
5
6
7
8
9
10
_raylet.so!operator()(const struct {...} * const __closure, const ray::TaskSpecification & task_spec, ray::rpc::SendReplyCallback send_reply_callback) (/home/hua/code/ray/src/ray/core_worker/transport/task_receiver.cc:100)

_raylet.so!ray::core::InboundRequest::Accept(ray::core::InboundRequest * const this) (/home/hua/code/ray/src/ray/core_worker/transport/actor_scheduling_util.cc:36)

_raylet.so!ray::core::NormalSchedulingQueue::ScheduleRequests(ray::core::NormalSchedulingQueue * const this) (/home/hua/code/ray/src/ray/core_worker/transport/normal_scheduling_queue.cc:87)

_raylet.so!ray::core::TaskReceiver::RunNormalTasksFromQueue(ray::core::TaskReceiver * const this) (/home/hua/code/ray/src/ray/core_worker/transport/task_receiver.cc:294)

_raylet.so!operator()(const struct {...} * const __closure) (/home/hua/code/ray/src/ray/core_worker/core_worker.cc:3777)

回调中的task_handler_ 就是注册进去的CoreWorker::ExecuteTask,将这个对象封装成了一个lamda函数:

1
2
3
4
5
6
7
8
9
10
auto execute_task = std::bind(&CoreWorker::ExecuteTask,
this,
std::placeholders::_1,
std::placeholders::_2,
std::placeholders::_3,
std::placeholders::_4,
std::placeholders::_5,
std::placeholders::_6,
std::placeholders::_7,
std::placeholders::_8);

最终调用到了options_.task_execution_callback,这个callback会根据语言的不同而不同,以Python为例,这个callback调用的是注册进来的一个Python方法,将Python的远程函数交还给Python解释器来执行。这部分代码在ray/python/ray/_raylet.pyx中

1
2
3
4
cdef void execute_task: return function(actor, *arguments, **kwarguments)
cdef execute_task_with_cancellation_handler: execute_task
cdef CRayStatus task_execution_handler: execute_task_with_cancellation_handler
CoreWorker::__cinit__: options.task_execution_callback = task_execution_handler

OpenCV (Open Source Computer Vision Library) 是一个开源计算机视觉和机器学习软件库,由Intel在1999年发布。OpenCV提供了丰富的图像和视频处理功能,广泛应用于各种计算机视觉任务,如面部识别、物体检测、运动跟踪、图像增强等。它支持多种编程语言(如C++、Python、Java等)和操作系统(如Windows、Linux、macOS等),并且可以与其他深度学习框架(如TensorFlow、PyTorch)无缝集成。

OpenCV-contrib 是一个附加模块的集合,为OpenCV核心库提供扩展功能。由于OpenCV核心库为了保持稳定性,通常只包含相对成熟和通用的模块,而OpenCV-contrib则提供了实验性和前沿的功能。这些模块包括一些新的特征检测算法、图像处理技术、深度学习工具、3D重建等。OpenCV-contrib项目中的模块也可能包含特定领域的工具包,例如面部识别、人脸标志检测、目标跟踪等。

在计算机视觉和深度学习任务中,硬件加速器(如GPU、TPU、NPU等)被广泛用于加速计算。OpenCV的性能也可以通过这些硬件加速器来提高,这通常通过使用专用的后端硬件加速库来实现。

1. 需求分析

华为Ascend系列AI处理器,凭借其强大的计算能力和高能效,在各种AI应用场景中得到了广泛应用。为了充分发挥Ascend硬件的优势,基于Ascend的OpenCV后端加速项目旨在利用Ascend的计算能力,加速OpenCV库中的部分核心算法。

1.1 目标

开发一个基于华为Ascend AI处理器的OpenCV硬件加速后端,优化并加速OpenCV中的特定算法。该后端将通过集成Ascend Computing Library (ACL) 来实现对OpenCV算法的硬件加速,从而提升计算性能,减少延迟,并提高能源效率。

1.2 需求概述

支持的算法类型

  • 识别出OpenCV中最常用的传统图像算法,并优先为这些算法实现Ascend后端加速支持。
  • 常见的候选算法包括算数运算,图像变换,色域转换等等。
  • 根据Ascend硬件特性,可能需要修改或重新设计部分算法,以适应硬件架构,进一步优化性能。

实现Ascend Runtime

  • 实现Ascend设备控制、Device-Host内存复制、流与事件管理等方面的功能。

异步算子支持

  • 接口调用使用异步任务提交,实现异步计算结果获取,提高设备利用率,系统的响应速度与吞吐量。
  • 支持ACL算子调用能力,以及AscendC自定义算子。
  • 支持OpenCV矩阵结构向ACL矩阵结构转换能力。

兼容性与集成

  • 确保该后端能够无缝集成到现有的OpenCV框架中,用户无需进行大量修改即可使用加速功能。
  • 保证与OpenCV其他后端的兼容性,用户可以根据具体硬件环境选择最优的加速方案。

性能评估与优化

  • 针对不同算法,设计性能评估测试用例,并基于测试结果持续优化算法与后端实现。
  • 与基线(CPU)性能进行对比,明确性能优势与改进方向。

文档与用户指南

  • 编写详细的开发文档与用户指南,帮助开发者理解如何使用该后端进行算法加速。
  • 提供API描述、使用示例、常见问题解答、性能优化建议等。

2. 架构设计

2.1 OpenCV项目架构

OpenCV项目架构

OpenCV内部模块较多,如上图所示,这是一个简化的OpenCV整体架构。按模块类型可以分为以下几类(含内部模块举例):

核心模块 (Core Modules)

  • 核心模块 (Core) 基础数据结构和算法(如矩阵运算、线性代数)。
  • 图像处理 (Imgproc) 图像滤波、形态学变换、边缘检测等。
  • 视频处理 (Video) 视频捕捉、帧处理、运动检测等。
  • 相机校正 (Calib3d) 相机标定、3D重建、立体匹配等。
  • 特征检测 (Features2d) 特征点检测与描述子计算(如SIFT、ORB)。

算法库 (Algorithm Libraries)

  • 机器学习 (ML)

    支持分类、回归、聚类、神经网络等机器学习算法。

  • 对象检测 (Objdetect)

    人脸检测、目标跟踪等高级检测算法。

  • 图像分割 (Imgsegm)

    超像素分割、图像聚类等。

硬件加速 (Hardware Acceleration)

  • CUDA 支持

    基于NVIDIA GPU的CUDA加速模块。

  • OpenCL 支持

    基于OpenCL的跨平台硬件加速支持。

  • Vulkan 支持

    基于Vulkan的图像处理加速。

第三方集成 (Third-Party Integrations)

  • Python 接口

    提供对Python的API绑定。

  • Java 接口

    提供对Java的API绑定。

  • Android/IOS 支持

    移动设备上的OpenCV应用开发支持。

应用层 (Application Layer)

  • 图像和视频处理应用
  • 增强现实 (AR)
  • 机器人视觉
  • 自动驾驶

2.2 OpenCV CANN硬件加速模块架构

OpenCV的昇腾原生支持将在硬件加速(Hardware Acceleration)中添加对Ascend NPU的支持。

OpenCV昇腾支持示意图

针对该项目的需求,需要实现以下模块:

Ascend Runtime

  • 设备控制:负责管理与Ascend NPU硬件的通信和控制。

  • 设备-主机内存复制:处理数据在Ascend NPU设备和主机之间的内存传输。

  • 流管理:管理计算任务流的调度和执行。

  • 事件管理:处理计算过程中的事件和同步问题。

AscendC内核

  • AscendC构建框架:提供内核构建的工具和框架。

  • 内核实现:实现具体的计算内核,提供加速计算功能。

  • 内核调用管理:管理内核的调用过程和参数。

  • 内核结果获取:从内核执行中获取计算结果。

ACL算子

  • OpenCV到ACL结构转换:将OpenCV数据结构转换为ACL支持的格式。

  • 算子编译和调用:编译并调用ACL算子来执行计算任务。

  • 异步结果获取:支持异步获取ACL算子的计算结果。

CANN模块

  • cann_module:作为核心模块,定义各类数据结构,Allocator,Ascend Runtime接口封装等。

  • element_operator:处理基本的元素级操作,例如加法、乘法等。

  • core:提供核心图像变换功能,例如,merge,flip等。

  • cvtcolor:专门处理颜色空间转换操作,如RGB到灰度转换等。

接口和绑定

  • C++接口:为C++应用程序提供接口,允许直接调用CANN模块的功能。

  • Python绑定:为Python应用程序提供接口,方便用户在Python环境中使用CANN模块。

其他

  • 错误处理:负责管理和记录各类级别日志,方便问题排查定位。
  • 功能测试:确保模块各项功能的正确性和稳定性。
  • 性能测试:对模块进行性能测试,验证加速效果和计算效率。
  • 样例:提供示例代码,帮助用户了解如何使用CANN模块。
  • 教程:提供详细的用户指南和教程,帮助用户快速上手和使用CANN模块。

CANN各个模块的依赖和调用关系如下图所示:

OpenCV算法调用时序图

3. Cann_Module

CANN模块中定义了OpenCV中的关键结构体,AscendMat,AscendStream和AscendEvent。其中AscendMat结构与Mat结构类似,需要有与InputArray(各类Mat的通用结构)相互转换的能力。与其他后端的Mat,Stream和Event类型类似,需要实现以下接口。AscendMat中存储着矩阵的shape和数据,并且有Device-Host内存拷贝能力,以及类型转换等能力。

Cann Module类图

3.1 类和组件

AscendMat 类

AscendMat 是一个封装了 Ascend 设备内存的矩阵类,类似于 OpenCV 的 Mat 类,但专为 Ascend 硬件设计。它支持各种矩阵操作,并通过内部的 Allocator 进行内存管理。

主要属性:

  • Allocator* allocator:用于内存分配的分配器。

  • int flags:包括魔术签名、连续性标志、深度和通道数等信息的位字段。

  • int rows, cols:矩阵的行数和列数。

  • size_t step:每行的字节数。

  • std::shared_ptr data:指向矩阵数据的智能指针。

  • uchar* datastart, const uchar* dataend:辅助字段用于 ROI 定位和调整。

主要方法:

  • 构造函数和拷贝构造函数,用于初始化和复制矩阵。

  • setTo:设置矩阵中的所有元素。

  • create:分配新的矩阵数据。

  • upload 和 download:将数据上传到设备或从设备下载。

  • convertTo:将矩阵转换为其他数据类型。

  • isContinuous, elemSize, size 等方法用于获取矩阵的属性和信息。

  • defaultAllocator():获取默认分配器。

  • setDefaultAllocator(Allocator* allocator):设置默认分配器。

Allocator 类

Allocator 是 AscendMat 的内部类,用于处理内存分配。

主要方法:

  • allocate(size_t size):分配指定大小的内存。

  • allocate(AscendMat* mat, int rows, int cols, size_t elemSize):为矩阵分配内存并初始化相关字段。

DefaultAllocator 类

DefaultAllocator 继承自 Allocator,实现了具体的内存分配和释放方法。

主要方法:

  • allocate(size_t size):使用 aclrtMalloc 分配内存。

  • allocate(AscendMat* mat, int rows, int cols, size_t elemSize):为 AscendMat 分配内存并设置步幅。

AscendStream 类

AscendStream 管理 Ascend 设备上的任务流,支持任务的异步执行和同步。

主要方法:

  • waitForCompletion():阻塞当前线程直到流中的所有操作完成。

  • waitAscendEvent(const AscendEvent& event):阻塞当前线程直到事件触发。

  • Null():返回默认的空流对象。

  • addTensorHolder(const std::shared_ptr& holder):向流中添加张量持有者。

AscendEvent 类

AscendEvent 用于流之间的同步。

主要方法:

  • record(AscendStream& stream):记录事件。
  • waitForComplete():等待事件完成。

4. ACL封装

AscendTensor结构与AscendMat相对应,用于将AscendMat转换成Ascend亲和的格式。其中矩阵数据使用智能指针,用户任务异步执行。

OperatorRunner是算子执行的类,用于设置算子执行所需的算子名称,属性,输入和输出矩阵,该类的成员函数均返回自身指针,方便设置多个属性。

除此之外,ACL封装也为了隔离ACL相关的库,整个工程中仅在此文件中会依赖ACL相关符号,避免OpenCV和ACL库的过度耦合,除了下述类图之外,其他的设备管理均做了封装,例如初始化,去初始化,内存管理和拷贝等。

ACL 模块类图

ACL算子允许异步提交,来提高硬件利用率,提高数据处理吞吐。所以当某个算子任务提交后,无法直接判断其执行进度,所以需要对矩阵数据进行保存,避免计算完成前数据被释放。

算子提交后,其智能指针会保存到AscendStream的tensorHolder中,即使超出某个AscendTensor的生命周期,该部分数据仍会保存,直到Stream Sync后,这些tensor才会真正释放。

为了避免内存浪费,可以在AscendStream中插入AscendEvent,通过判断Event是否完成,来判断有那些tensor已经计算完成,可以尽快释放。

为了进一步提高内存的分配效率,后续可以添加内存池,由应用来进行Device上的内存管理,避免频繁调用Device的内存申请释放。

5. 支持的算法

5.1 算数计算

本节详细描述针对 Ascend 硬件加速器的 OpenCV 算术操作实现。包括各种算术操作的计算公式及其在代码中的实现细节。

加法(Add)

  • 描述:计算两个输入图像对应像素值的和。支持输入图像的类型为 AscendMat 和 Scalar。

  • 计算公式dst(i,j)=alpha×src1(i,j)+beta×src2(i,j)

  • 说明:alpha 和 beta 参数允许对输入图像进行加权和调整。

减法 (subtract)

  • 描述:计算两个输入图像对应像素值的差。支持输入图像的类型为 AscendMat 和 Scalar。

  • 计算公式dst(i,j)=src1(i,j)src2(i,j)

乘法 (multiply)

  • 描述:计算两个输入图像对应像素值的乘积。支持输入图像的类型为 AscendMat 和 Scalar,还支持缩放因子的应用。

  • 计算公式dst(i,j)=scale×(src1(i,j)×src2(i,j))

  • 说明:scale 参数允许对结果进行缩放,以调整输出图像的亮度或对比度。

除法 (divide)

  • 描述:计算两个输入图像对应像素值的商。支持输入图像的类型为 AscendMat 和 Scalar,还支持缩放因子的应用。

  • 计算公式dst(i,j)=scale×src1(i,j)src2(i,j)

  • 说明:scale 参数用于调整除法结果的缩放。

按位与 (bitwise_and)

  • 描述:计算两个输入图像对应像素值的按位与操作。支持输入图像的类型为 AscendMat 和 Scalar。

  • 计算公式dst(i,j)=src1(i,j)&src2(i,j)

  • 说明:用于图像的遮罩和掩盖操作。

按位或 (bitwise_or)

  • 描述:计算两个输入图像对应像素值的按位或操作。支持输入图像的类型为 AscendMat 和 Scalar。

  • 计算公式dst(i,j)=src1(i,j)src2(i,j)

  • 说明:用于图像合成和区域提取。

按位异或 (bitwise_xor)

  • 描述:计算两个输入图像对应像素值的按位异或操作。支持输入图像的类型为 AscendMat 和 Scalar。

  • 计算公式dst(i,j)=src1(i,j)src2(i,j)

  • 说明:用于图像的特殊编码和数据加密。

按位取反 (bitwise_not)

  • 描述:计算图像像素值的按位取反操作。支持输入图像的类型为 AscendMat。

  • 计算公式dst(i,j)=∼src(i,j)

  • 说明:用于反转图像中的每个像素值。

加权和 (addWeighted)

  • 描述:计算两个输入图像的加权和。支持输入图像的类型为 AscendMat 和 Scalar,并且可以指定加权系数和加法常数。

  • 计算公式dst(i,j)=alpha×src1(i,j)+beta×src2(i,j)+gamma

  • 说明:alpha 和 beta 用于加权输入图像,gamma 用于加法常数。

Threshold

  • 描述:对输入图像中的像素值进行阈值操作,根据指定的阈值和类型,将像素值调整为新的值。

  • 计算公式Binary:dst(i,j)={maxValif src(i,j)>thresh0otherwiseBinaryInverted:dst(i,j)={0if src(i,j)>threshmaxValotherwiseTruncate:dst(i,j)={threshif src(i,j)>threshsrc(i,j)otherwiseToZero:dst(i,j)={src(i,j)if src(i,j)>thresh0otherwiseToZeroInverted:dst(i,j)={0if src(i,j)>threshsrc(i,j)otherwise

  • 说明:Threshold 操作广泛用于图像分割和预处理阶段。不同的阈值类型允许对图像中的不同区域进行区分和处理。

上述算法接口较为类似,为了避免重复代码,需要将此类函数调用使用模板的方式进行抽象。

接口分为外部接口和内部接口,外部接口是对内部接口的封装,避免代码重复和额外的数据类型转换。

5.2 图像核心算法

这段代码是 OpenCV 项目中的一部分,专门为 Ascend 硬件加速器提供了图像处理操作的实现。这些操作包括数据格式转换、图像合并与分割、转置、翻转、旋转、裁剪和缩放。以下是对每个操作的简要介绍:

数据转换 (transData)

  • 描述:将输入数据从一种格式转换为另一种格式,例如从 NCHW 转换为 NHWC。

  • 说明:此函数用于将输入图像或矩阵的存储格式在不同的维度顺序之间进行转换,以适应不同的深度学习模型或计算需求。

图像合并 (merge)

  • 描述:将多个输入矩阵按通道维度合并为一个矩阵。输入矩阵的数量和类型必须相同。

  • 计算公式

dst(x,y)=[B(x,y) G(x,y) R(x,y)]

  • 说明:此函数在图像处理和深度学习中,用于将多通道图像合并为单一矩阵,以适应后续的处理或模型输入要求。

图像分割 (split)

  • 描述:将一个多通道矩阵按通道维度分割为多个单通道矩阵。
  • 计算公式

B(x,y)=dst(x,y)[0]G(x,y)=dst(x,y)[1]R(x,y)=dst(x,y)[2]

  • 说明:此函数用于将多通道图像分割成独立的单通道图像,通常用于图像分析和预处理。

转置 (transpose)

  • 描述:对输入矩阵执行转置操作,交换指定的维度。
  • 计算公式

dst(i,j)=src(j,i)

  • 说明:转置操作通常用于调整矩阵的维度顺序,以适应特定的算法或网络层要求。

翻转 (flip)

  • 描述:根据指定的轴对图像进行翻转操作。
  • 计算公式

dst(x,y)=src(x,H1y)dst(x,y)=src(H1x,y)

  • 说明:图像翻转常用于数据增强,帮助模型学习不同的视角和方向。

旋转 (rotate)

  • 描述:根据指定的模式对图像进行旋转操作,支持 90 度顺时针、180 度和 90 度逆时针旋转。
  • 计算公式

90dst(x,y)=src(Hy1,x)180dst(x,y)=src(Hx1,Wy1)90dst(x,y)=src(y,Wx1)

  • 说明:旋转操作在图像处理和数据增强中常用,用于产生不同角度的视图。

裁剪 (crop)

  • 描述:从输入图像中裁剪指定矩形区域。
  • 计算公式

dst(x,y)=src(x+xoffset,y+yoffset)xoffsetyoffset

  • 说明:裁剪操作用于提取图像中的特定区域,以进行更细致的分析或处理。

调整大小 (resize)

  • 描述:将输入图像缩放到指定的大小,支持不同的插值方法,如双三次插值和区域插值。
  • 计算公式

线(Bilinear):dst(x,y)=(y2y)[(x2x)src(x1,y1)+(xx1)src(x2,y1)]+(yy1)[(x2x)src(x1,y2)+(xx1)src(x2,y2)](Cubic):dst(x,y)=m=12n=12w(m)w(n)src(x+m,y+n)(Area):dst(x,y)=1area(x,y)regionsrc(x,y)

  • 说明:调整大小操作在图像预处理阶段非常重要,用于将图像缩放到模型要求的输入尺寸。

裁剪+调整大小 (crop_resize)

  • 描述:从一张大图中扣出一张或多张子图,并缩放到指定尺寸。

  • 计算公式为crop和resize算子依次执行,此处不做赘述。

  • 说明

    • 若crop的宽高与resize之后的宽高一致,则不进行缩放;resize宽高必须与输出宽高一致。

    • 输入图片分辨率在[10×6, 4096×4096]范围内,支持图片格式、宽高对齐、内存约束处说明的输入图片格式。

原地边框填充 (copyMakeBorder)

  • 描述:该函数在使用指定的外推边界模式时,计算并返回与指定外推像素相对应的供体像素的坐标。

  • 计算公式常数填充:src(x,y)=value,如果 (x,y)是边界元素复制边缘:src(x,y)=src(x,y),其中 (x,y)为最近的图像内的像素坐标

  • 说明

    • scalar_value仅在填充类型为HI_BORDER_CONSTANT的时候有效,指定填充的像素值。
    • 输入图片分辨率在[10×6, 4096×4096]范围内,支持图片格式、宽高对齐、内存约束处说明的输入图片格式。

裁剪+调整大小+边框填充 (cropResizeMakeBorder)

  • 描述:按指定区域从一张输入图片中抠出一个或多个子图,对子图缩放后,再将每个子图按指定类型填充,作为一张或多张目标图片输出,主要用于等比例缩放场景。

  • 计算公式为crop、resize和copyMakeBorder算子依次执行,此处不做赘述。

  • 说明

    • 若crop的宽高与resize之后的宽高一致,则不进行缩放。
    • scalar_value仅在填充类型为HI_BORDER_CONSTANT的时候有效,指定填充的像素值。
    • 输入图片分辨率在[10×6, 4096×4096]范围内,支持图片格式、宽高对齐、内存约束处说明的输入图片格式。

接口分为外部接口和内部接口,外部接口是对内部接口的封装,避免代码重复和额外的数据类型转换。

5.3 色域转换

本节是用于在华为Ascend硬件上加速OpenCV图像处理操作的。以下是对主要函数的解释:

cvtBGRtoBGR

  • 描述: 将 BGR 图像转换为指定的通道数(DCN)并根据选项交换蓝色通道的位置。
  • 说明 将 BGR 图像的三个通道分离,并根据是否需要交换蓝色通道进行处理。然后将处理后的通道合并为指定的通道数(3 或 4)。

cvtBGRtoGray

  • 描述:将 BGR 图像转换为灰度图像。
  • 计算公式

Gray=0.299×Red+0.587×Green+0.114×Blue

  • 说明:将 BGR 图像转换为灰度图像,通过应用加权系数来计算每个像素的灰度值。

cvtGraytoBGR

  • 描述: 将灰度图像转换为 BGR 图像。
  • 计算公式

BGR=[Gray Gray Gray]

  • 说明:将灰度图像的单通道复制为三个通道(或四个通道),然后合并为 BGR(或 BGRA)图像。

cvtBGRtoXYZ

  • 描述:将 BGR 图像转换为 XYZ 颜色空间。
  • 计算公式
$$ [XYZ] [0.4124530.3575800.1804230.2126710.7151600.0721690.0193340.1191930.950227][BlueGreenRed]

$$

说明:将 BGR 图像通过矩阵乘法转换为 XYZ 颜色空间。

cvtXYZtoBGR

  • 描述:将 XYZ 颜色空间图像转换为 BGR 图像。
  • 计算公式
$$ [BlueGreenRed] [3.2404791.537150.4985350.9692561.8759910.0415560.0556480.2040431.057311][XYZ]

$$

  • 说明:将 XYZ 图像通过矩阵乘法转换为 BGR 图像,并根据需要添加 alpha 通道。

cvtBGRtoYCrCb

  • 描述:将 BGR 图像转换为 YCrCb 颜色空间。
  • 计算公式

Y=0.299×Red+0.587×Green+0.114×BlueCr=0.713×(RedY)+128Cb=0.564×(BlueY)+128

  • 说明:将 BGR 图像转换为 YCrCb 颜色空间,并根据需要调整通道顺序。

cvtYCrCbtoBGR

  • 描述:将 YCrCb 颜色空间图像转换为 BGR 图像。
  • 计算公式

Red=Y+1.402×(Cr128)Blue=Y+1.772×(Cb128)Green=Y0.344136×(Cr128)0.714136×(Cb128)

  • 说明:将 YCrCb 图像转换为 BGR 图像,通过矩阵计算恢复到 BGR 颜色空间,并根据需要添加 alpha 通道。

RGB月YUV相互转换的计算方法与YCrCb计算方法类似,以及RGB,BGR, RGBA,BGRA与其他色域转换实现类似,使用不用参数的方式复用上述色域转换代码。

6. Python绑定

OpenCV的python绑定通过一个Python脚本对C++的接口函数解析实现,重载的函数会在python绑定中生成多个绑定,根据参数类型试探的方式选择正确的重载函数。

OpenCV对输入和输出有一个通用结构,分别是InputArray和OutputArray,这两个结构会在python绑定中自动与Mat(UMat,GpuMat)进行转换,在python调用中,这些算法接口可以传入numpy结构或者任意一种Mat结构。由于后端加速器包在OpenCV-Contrib中,为了避免修改OpenCV主仓库,所以InputArray和OutputArray无法识别到AscendMat,也就无法做自动转换。

为了提供一致的使用体验,在提供CANN模块接口时,需要提供InputArray/OutputArray的接口,也需要提供AscendMat的接口,在CANN模块内部做InputArray/OutputArray与AscendMat的类型转换。

该接口既提供C++接口,也提供python绑定接口。该接口是使用CANN模块的入口,根据OpenCV的项目要求,需要提供详细的Doxygen描述,以生成标准的doc手册。

7. AscendC支持

AscendC是自定义算子的编程语言,为了提高算子的执行效率,最好的方式使用AscendC来编写合适的融合算子。在OpenCV昇腾支持中,AscendC支持是一个实验性质的特性。

整体AscendC应该当做昇腾支持的一个子模块,于OpenCV一同链接到二进制中。需要实现AscendC编译链接框架,并且实现一个简单的算子(Threshold),当做自定义算子的样例。

1
2
3
opencv
└── opencv_cann.so
└── ascendc_kernels.so

8. DVPP支持

DVPP是昇腾AI处理器内置的图像处理单元,专门用于图像和视频的处理和加速,可以通过AscendCL的媒体数据处理接口进行调用,提供了强大的媒体处理硬加速能力。在OpenCV昇腾支持中,DVPP支持主要提供高性能算子特性。

DVPP支持应作为昇腾支持的子模块,由于其数据对齐及读取格式等约束不同于执行于AI core和AI cpu的Aclop算子,需为其单独设计运行时管理、内存管理及数据处理流程的类及方法,初步设计如下:

DVPP类图

其中,DvppOperatorDesc管理DVPP初始化、重置,及创建通道、输入、输出、获取结果等算子运行流程管理,WrapperFunctions完成具体的参数配置、算子调用等。

9. 测试

9.1 功能测试

功能测试使用OpenCV的功能测试框架,验证所有的接口以及不同入参组合,将Ascend执行结果与CPU结果做比对,要求误差在允许范围内。

9.1.1 AscendMat验证

AscendMat 的构造函数测试 测试目的:

  • 验证 AscendMat 的默认构造函数和自定义构造函数的正确性。
  • 验证 AscendMat 构造时传递的大小、类型、值是否正确分配。

测试步骤:

  1. 使用默认构造函数创建 AscendMat 对象,并检查默认分配器是否正确。
  2. 设置和获取自定义分配器,确保分配器设置和获取功能正常。
  3. 创建指定大小和类型的 AscendMat 对象,并检查行数、列数、深度和通道数是否与预期一致。
  4. 使用特定值填充 AscendMat 对象,并检查填充值是否正确。
  5. 从主机内存创建 AscendMat 对象,并检查数据的正确性。

预期结果:

  • 默认构造函数应使用默认分配器。
  • 自定义分配器应成功设置和获取。
  • 创建的 AscendMat 对象的大小、类型和填充值应与预期匹配。
  • 从主机内存构造的 AscendMat 数据应与输入数据一致。

AscendMat 的赋值操作测试 测试目的:

  • 验证 AscendMat 对象间的赋值操作是否能正确复制数据和元数据。

测试步骤:

  1. 创建两个 AscendMat 对象,一个使用自定义分配器分配内存。
  2. 将一个 AscendMat 对象赋值给另一个,并检查行数、列数、深度、通道数及数据指针是否一致。

预期结果:

  • 赋值操作应正确复制行数、列数、深度、通道数及数据指针。

**

AscendMat 的 setTo 方法测试 测试目的:

  • 验证 AscendMat 的 setTo 方法能否正确将矩阵的所有元素设置为指定值。

测试步骤:

  1. 创建一个 AscendMat 对象,并使用 setTo 方法将其所有元素设置为随机生成的标量值。
  2. 下载数据到主机并与手动创建的矩阵数据进行比较。

预期结果:

  • AscendMat 对象应成功设置所有元素,并且与主机上的期望数据一致。

AscendMat 的 convertTo 方法测试 测试目的:

  • 验证 AscendMat 的 convertTo 方法能否正确转换矩阵数据类型。

测试步骤:

  1. 创建一个 AscendMat 对象,并使用随机生成的标量值初始化。
  2. 使用 convertTo 方法将矩阵数据类型转换为另一个类型。
  3. 下载数据到主机并与手动创建的转换后矩阵进行比较。

预期结果:

  • AscendMat 对象应成功转换数据类型,且转换后的数据与主机上的期望数据一致。

合并测试(MERGE) 测试目的:

  • 验证merge函数在Ascend后端的实现是否与CPU一致。

测试步骤:

  1. 创建三个单通道的矩阵m1、m2和m3。
  2. 使用merge函数在CPU上合并这三个矩阵为一个多通道矩阵。
  3. 在Ascend后端上分别通过数组和向量的方式合并矩阵,并将结果下载到主机端。
  4. 比较CPU和Ascend后端的结果,确保它们一致。

预期结果:

  • Ascend后端的合并结果应与CPU的结果一致。

9.1.2 算术算法验证

算术算法需要验证一下几种类型:

基础矩阵运算操作:

  • 测试包括矩阵加法、减法、乘法、除法、按位与、按位或、按位异或等操作。
  • 验证以上操作在 Ascend 设备与 CPU 上的结果一致性。

带掩码的矩阵运算:

  • 验证矩阵运算操作在引入掩码后的正确性,包括掩码的生成与使用。

带缩放参数的矩阵运算:

  • 验证矩阵乘法和除法操作在引入缩放参数后的正确性。

流管理:

  • 测试 Ascend Stream 的创建、运算调度与结果同步功能,确保异步处理与同步处理的结果一致。

测试方法如下:

测试目的:

  • 验证加法、减法、乘法、除法、按位与、按位或、按位异或,加权和,阈值计算等操作方法计算结果是否正确。

测试步骤:

  1. 创建一个矩阵对象,并使用随机数据进行填充。
  2. 使用需要验证的算法进行计算,分别使用默认流,以及显式创建的流进行计算。
  3. 比较CPU和Ascend后端的结果,确保他们的精度一致。

预期结果:

  • Ascend后端的合并结果应与CPU的结果一致。

9.1.3 图像核心算法验证

拆分测试(SPLIT) 测试目的:

  • 验证split函数在Ascend后端的实现是否与CPU一致。

测试步骤:

  1. 创建一个多通道的矩阵m,并使用split函数在CPU上将其拆分为三个单通道矩阵。
  2. 使用Ascend后端进行同样的拆分操作,并将结果下载到主机端。
  3. 比较CPU和Ascend后端的结果,确保它们一致。

预期结果:

  • Ascend后端的拆分结果应与CPU的结果一致。

转置测试(TRANSPOSE) 测试目的:

  • 验证transpose函数在Ascend后端的实现是否与CPU一致。

测试步骤:

  1. 创建一个随机矩阵cpuMat。
  2. 使用transpose函数在CPU上转置矩阵,并存储结果。
  3. 在Ascend后端上进行相同的转置操作,并将结果下载到主机端。
  4. 比较CPU和Ascend后端的结果,确保它们一致。

预期结果:

  • Ascend后端的转置结果应与CPU的结果一致。

翻转测试(FLIP) 测试目的:

  • 验证flip函数在Ascend后端的实现是否与CPU一致。

测试步骤:

  1. 创建一个随机矩阵cpuMat。
  2. 使用flip函数在CPU上分别以不同的翻转模式(水平、垂直、同时翻转)进行翻转,并存储结果。
  3. 在Ascend后端上执行相同的翻转操作,并将结果下载到主机端。
  4. 比较CPU和Ascend后端的结果,确保它们一致。

预期结果:

  • Ascend后端的翻转结果应与CPU的结果一致。

旋转测试(ROTATE) 测试目的:

  • 验证rotate函数在Ascend后端的实现是否与CPU一致。

测试步骤:

  1. 创建一个随机矩阵cpuMat。
  2. 使用rotate函数在CPU上以不同的旋转模式进行旋转,并存储结果。
  3. 在Ascend后端上进行相同的旋转操作,并将结果下载到主机端。
  4. 比较CPU和Ascend后端的结果,确保它们一致。

预期结果:

  • Ascend后端的旋转结果应与CPU的结果一致。

裁剪测试(CROP) 测试目的:

  • 验证裁剪操作在Ascend后端的实现是否与CPU一致。

测试步骤:

  1. 创建一个矩阵cpuMat,并定义一个裁剪区域Rect b。
  2. 使用Mat对象在CPU上执行裁剪操作,并存储结果。
  3. 在Ascend后端上执行相同的裁剪操作,并将结果下载到主机端。
  4. 比较CPU和Ascend后端的结果,确保它们一致。

预期结果:

  • Ascend后端的裁剪结果应与CPU的结果一致。

调整大小测试(RESIZE) 测试目的:

  • 验证resize函数在Ascend后端的实现是否与CPU一致。

测试步骤:

  1. 创建一个随机矩阵cpuMat。
  2. 使用resize函数在CPU上对矩阵进行调整大小操作,使用不同的插值方法,并存储结果。
  3. 在Ascend后端上进行相同的调整大小操作,并将结果下载到主机端。
  4. 比较CPU和Ascend后端的结果,确保它们一致。

预期结果:

  • Ascend后端的调整大小结果应与CPU的结果一致。

裁剪及调整大小测试(CROP_RESIZE) 测试目的:

  • 验证cropResize函数在Ascend后端的实现是否与CPU一致。

测试步骤:

  1. 创建一个随机矩阵cpuMat,并定义裁剪区域Rect b,目的矩阵大小Size dsize。
  2. 依次使用crop和resize函数在CPU上对矩阵进行裁剪和调整大小操作,使用不同的插值方法,并存储结果。
  3. 在Ascend后端上调用cropResize函数执行相同操作,并将结果下载到主机端。
  4. 比较CPU和Ascend后端的结果,确保它们一致。

预期结果:

  • Ascend后端的调整大小结果应与CPU的结果一致。

裁剪及调整大小测试(COPY_MAKE_BORDER) 测试目的:

  • 验证copyMakeborder函数在Ascend后端的实现是否与CPU一致。

测试步骤:

  1. 创建一个随机矩阵cpuMat,并定义top、bottom、left、right四个方向的border宽度。
  2. 使用copyMakeBorder函数在CPU上对矩阵进行填充边框操作,使用不同的边框填充插值方法,并存储结果。
  3. 在Ascend后端上调用copyMakeborder函数执行相同操作,并将结果下载到主机端。
  4. 比较CPU和Ascend后端的结果,确保它们一致。

预期结果:

  • Ascend后端的调整大小结果应与CPU的结果一致。

裁剪及调整大小测试(CROP_RESIZE_MAKE_BORDER) 测试目的:

  • 验证cropResizeMakeborder函数在Ascend后端的实现是否与CPU一致。

测试步骤:

  1. 创建一个随机矩阵cpuMat,并定义裁剪区域Rect b,目的矩阵大小Size dsize,top、bottom、left、right四个方向的border宽度。
  2. 依次使用crop、resize和copyMakeBorder函数在CPU上对矩阵进行裁剪、调整大小和填充边框操作,使用不同的边框填充插值方法,并存储结果。
  3. 在Ascend后端上调用cropResizeMakeborder函数执行相同操作,并将结果下载到主机端。
  4. 比较CPU和Ascend后端的结果,确保它们一致。

预期结果:

  • Ascend后端的调整大小结果应与CPU的结果一致。

9.1.4 色域转换算法验证

色域转换算法验证需要覆盖以下转换操作:

  • BGR到BGRA
  • BGRA到BGR
  • BGR到RGBA
  • RGBA到BGR
  • BGR到RGB
  • BGRA到RGBA
  • BGR到灰度图
  • RGB到灰度图
  • 灰度图到BGR
  • 灰度图到BGRA
  • BGRA到灰度图
  • RGBA到灰度图
  • RGB到XYZ
  • BGR到XYZ
  • XYZ到BGR
  • XYZ到RGB
  • BGR到YCrCb
  • RGB到YCrCb
  • YCrCb到BGR
  • YCrCb到RGB
  • BGR到YUV
  • RGB到YUV
  • YUV到BGR
  • YUV到RGB

测试目的:

  • 验证色域转换操作在Ascend后端的实现是否与CPU一致。

测试步骤:

  1. 生成随机图像矩阵。
  2. 执行颜色空间转换。
  3. 比较CPU和NPU计算结果,确认在允许误差范围内。

预期结果:

  • Ascend后端的计算结果应与CPU的结果一致。

9.1.5 其他说明

所有功能测试需要同步测试C++接口以及Python接口,保证二者可用性及准确性。

9.2 性能测试

性能测试使用OpenCV性能测试框架,使用不同图像大小,不同图像数据类型,将数据在Ascend NPU上执行多次,计算每次运行的平均时间。

初始化:

  1. 生成给定尺寸的测试矩阵。
  2. 构造随机输入矩阵,并完成算子预热。

Ascend NPU 测试:

  1. 设置 Ascend 设备 (cv::cann::setDevice),并上传矩阵至 AscendMat。
  2. 在 TEST_CYCLE() 内执行操作(如 merge、split 等)。
  3. 复位 Ascend 设备 (cv::cann::resetDevice)。

CPU 测试:

  • 使用 OpenCV 自带的方法执行相同操作。

验证:

  • 每个测试用例执行完后,通过 SANITY_CHECK_NOTHING() 检查无异常。

性能验证结果如下

算子/平均运算耗时*(ms) CPU GPU NPU 相对CPU性能提升 相对GPU性能提升
16 Intel(R) Xeon(R) Gold 6151 Nvidia V100 Ascend 310P GPU NPU NPU
merge 83.50 48.00 30.50 42.51% 63.47% 36.46%
split 142.00 59.00 81.50 58.45% 42.61% -38.14%
flip 221.25 77.75 1069.00 64.86% -383.16% -1274.92%
crop 49.25 61.50 75.75 -24.87% -53.81% -23.17%
transpose 446.00 84.50 277.50 81.05% 37.78% -228.40%
resize 136.50 188.75 229.75 -38.28% -68.32% -21.72%
threshold 172.75 174.00 251.50 -0.72% -45.59% -44.54%
rotate 584.50 76.50 1067.50 86.91% -82.63% -1295.42%
add 529.63 324.50 268.75 38.73% 49.26% 17.18%
addWeighted 553.88 326.38 422.88 41.07% 23.65% -29.57%
subtract 528.25 355.63 269.75 32.68% 48.94% 24.15%
multiply 534.63 354.25 265.88 33.74% 50.27% 24.95%
divide 542.50 355.63 266.75 34.45% 50.83% 24.99%
bitwise_add 529.25 355.13 266.00 32.90% 49.74% 25.10%
bitwise_or 529.50 354.63 266.00 33.03% 49.76% 24.99%
bitwise_xor 529.88 354.50 268.00 33.10% 49.42% 24.40%
bitwise_not 324.38 177.38 448.38 45.32% -38.23% -152.78%
cvtColor 57.31 66.96 118.76 -16.83% -107.21% -77.35%

9.3. CI

OpenCV社区没有昇腾测试设备,后续昇腾相关特性提交需要经过昇腾设备的CI验证。

  1. 向社区贡献昇腾机器用于CI验证。
  2. 使用PR label的方式,仅针对昇腾相关特性运行CI。
  3. 使用容器的方式运行CI,将Dockerfile合入OpenCV的基础设施仓库,并归档容器镜像。
  4. 配置github workflow,配置昇腾CI相关逻辑。

10. 教程

10.1 样例

为了方便用户使用该模块,需要提供调用样例,选择一个常见的图像预处理逻辑,实现C++版本以及Python版本的样例代码。

10.2 使用文档

按照社区的文档规范,编写模块使用指南,该使用指南将会构建到OpenCV doc中,作为教程供用户参考。

OpenCV昇腾开源使用手册

1. 项目背景

llama.cpp 是一个开源项目,旨在将大模型高效地部署在低资源环境中,例如个人电脑或移动设备。这个项目由 Georgi Gerganov 创建,目标是通过优化和精简,使得 LLaMA 模型能够在不依赖 GPU 的情况下高效运行。llama.cpp 支持多平台和多后端,且兼容大部分 Transformer 模型和部分 CLIP 模型,便于在各种环境中部署。其模块化设计包括模型分片、KV 缓存、推理引擎和输出处理,适合边缘计算、隐私保护和低成本推理场景,帮助用户在普通设备上实现大模型推理。

1.1 目标

开发基于昇腾的llama.cpp后端,实现昇腾runtime和核心算子。后端使用CANN和昇腾算子库的能力来加速大模型的推理。使得常见的模型能够在llama.cpp中使用昇腾推理,加速推理效率。

1.2 项目概述

昇腾后端和Runtime接入

在 llama.cpp 中,为Ascend加速器提供接口适配层,使 llama.cpp 的模型推理请求能通过接口层传递至 Ascend Runtime。

涉及:

  • 设备接入,支持多卡接入;
  • 内存管理和Tensor管理;
  • Stream,Event管理;

昇腾算子

为了支持大部分的模型推理,需要实现43个算子。这些算子可以利用aclnn的算子能力构建,如果aclnn的算子不足以满足llama.cpp的算子,

则:

  • 优先使用aclnn算子组合的方式实现功能;
  • 使用AscendC编写自定义算子。

对算子的需求,可用性大于性能,为了减少开发工作量并快速完成支持,不考虑acl op算子。能使用aclnn组合的算子,优先使用算子组合实现。

精度和性能

  • 实现的算子需要通过llama.cpp的精度对比测试,以及内存越界检查,确保实现的算子实现正确。

  • 910B算子性能需要超过Intel CPU水平(以Intel(R) Xeon(R) Gold 6348 CPU @ 2.60GHz为例)。

  • 910B模型推理(llama3 8B)性能延迟不高于100ms,吞吐率不低于300token/s。

多芯片支持

  • 首先支持910B系列芯片,包括主要的模型端到端推理,q4_0,q8_0量化格式;
  • 然后支持310P(910A)系列芯片,除了q4_0外(310P不支持4bit量化),其他功能应当与910B芯片能力持平;
  • 最后尝试支持310B系列芯片,310B的支持程度以aclnn和AscendC库的支持情况而定。

文档和用户指南

  • 用户指南,介绍文档结构和使用说明,帮助用户理解如何在 llama.cpp 中配置和使用 Ascend 后端;

  • 安装配置步骤,详细说明 Ascend 后端的安装流程,包括环境依赖、编译步骤及配置方法,以确保用户可以顺利完成安装;

  • 常见问题和解决方法,总结用户在使用 Ascend 后端时可能遇到的问题,并提供解决方案,如内存溢出、兼容性问题和性能调优建议等。

2. 设计思路

2.1 llama.cpp项目架构

llama.cpp架构图

llama.cpp的核心功能主要涉及以上几个部分:

模型管理

llama.cpp不仅支持llama,而且支持多种大语言模型和一些clip模型。llama.cpp使用模型管理模块来搭建模型结构,包括算子,量化等并且加载gguf模型的信息和模型权重。由于llama.cpp支持模型拆分的功能,以便于支持多卡推理和GPU/CPU混合推理,所以模型结构会进行合适的拆分,并且管理子图之间的数据拷贝。

kv-cache

kv-cache有助于加速attention的计算速度,将历史的kv信息做缓存。kv-cache会直接当做算子融合到模型中,kv-cache模块本身负责cache的管理,包括cache写入,更新和替换。

server和api接口

llama.cpp提供了一个简单的服务端,提供api接口。server支持并发推理。

推理引擎

llama.cpp对推理引擎进行了抽象,以便于支持不同的后端。推理引擎负责管理设备的内存,流,事件,多卡以及GPU/CPU数据拷贝。并且计算由模型管理模块构建的模型图。

2.2 昇腾后端接入方法

llama.cpp提供了一系列抽象接口来接入后端加速器:

  1. ggml_backend_cann_device_interface:用于描述设备接口的模块,定义了设备的基本功能。
  2. ggml_backend_cann_interface:用于管理后端通用接口的模块,包含常见的张量异步处理和图计算功能。
  3. ggml_backend_cann_buffer_type_host:负责分配主机缓冲区,确保与后端设备内存的接口兼容。
  4. ggml_cann_compute_forward:主计算模块,负责分派和执行各个算子操作。

ggml_cann_compute_forward 中,所有的算子都作为 case 分支进行注册,表示算子名称对应具体操作,例如 GGML_OP_ADDGGML_OP_MUL 等等。

昇腾接入需要实现llama.cpp的runtime接口,并且实现推理所必须的算子。

3. 实现原理

3.1 运行时

runtime提供了多个抽象接口,第一阶段主要目标是基本功能支持,所以仅需要支持必要的接口。其中split tensor功能和图推理功能暂时不实现。llama.cpp的后端接入主要是通过注册三组接口实现,分别是设备访问接口,资源管理接口,内存管理接口。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
static const ggml_backend_device_i ggml_backend_cann_device_interface = {
/* .get_name = */ ggml_backend_cann_device_get_name,
/* .get_description = */ ggml_backend_cann_device_get_description,
/* .get_memory = */ ggml_backend_cann_device_get_memory,
/* .get_type = */ ggml_backend_cann_device_get_type,
/* .get_props = */ ggml_backend_cann_device_get_props,
/* .init_backend = */ ggml_backend_cann_device_init,
/* .get_buffer_type = */ ggml_backend_cann_device_get_buffer_type,
/* .get_host_buffer_type = */ ggml_backend_cann_device_get_host_buffer_type,
/* .buffer_from_host_ptr = */ NULL,
/* .supports_op = */ ggml_backend_cann_supports_op,
/* .supports_buft = */ ggml_backend_cann_supports_buft,
/* .offload_op = */ ggml_backend_cann_offload_op,
/* .event_new = */ ggml_backend_cann_device_event_new,
/* .event_free = */ ggml_backend_cann_device_event_free,
/* .event_synchronize = */ ggml_backend_cann_device_event_synchronize,
};

此接口 ggml_backend_cann_device_interface 为 CANN 后端在 llama.cpp 中提供了一个通用的设备访问与操作抽象层,便于整合并统一管理 CANN 设备资源。通过实现接口中的各个函数,用户可以控制 CANN 设备的初始化、资源分配、操作支持检测等关键功能,从而确保 llama.cpp 中的模型计算能够顺利利用 CANN 的加速能力。

以下是接口中各函数的功能描述:

  • ggml_backend_cann_device_get_name:返回设备的名称,用于识别不同的设备类型。例如可以返回 "CANN 设备" 或者具体的设备型号。

  • ggml_backend_cann_device_get_description:返回设备的详细描述信息,通常包含设备的硬件特性以及版本信息等,帮助用户理解设备特性。

  • ggml_backend_cann_device_get_memory:获取设备的内存信息,包括总内存大小和当前可用内存,以便 llama.cpp 优化内存分配策略。

  • ggml_backend_cann_device_get_type:返回设备类型,用于区分不同种类的设备(如 CPU、GPU、NPU 等),便于进行不同类型设备的适配。

  • ggml_backend_cann_device_get_props:获取设备的属性信息,包括计算能力、内存带宽等。这些属性信息可用于优化计算分配和选择适合的算子。

  • ggml_backend_cann_device_init:初始化后端设备,确保设备的资源和状态准备就绪。这一步通常在加载模型或开始计算之前调用。

  • ggml_backend_cann_device_get_buffer_type:返回设备内存缓冲区的类型信息,帮助 llama.cpp 决定如何在设备端管理数据缓冲。

  • ggml_backend_cann_device_get_host_buffer_type:返回主机端缓冲区类型,用于在主机和设备之间进行高效的数据交换。

  • buffer_from_host_ptr:该接口可用于将主机端内存直接映射或转换为设备端缓冲区,若未来需求可扩展。

  • ggml_backend_cann_supports_op: 检查 CANN 设备是否支持指定的操作(op),确保模型中的特定操作能够得到设备的加速支持。

  • ggml_backend_cann_supports_buft:检查设备是否支持指定的缓冲区类型,确保数据在缓冲区类型上的一致性和兼容性。

  • ggml_backend_cann_offload_op:将计算操作卸载到设备端执行,提升操作效率和加速模型推理过程。

  • ggml_backend_cann_device_event_new:创建新的事件对象,用于异步操作的状态跟踪,如操作完成的通知。

  • ggml_backend_cann_device_event_free:释放事件对象,清理事件资源,确保内存不被泄漏。

  • ggml_backend_cann_device_event_synchronize:同步事件,确保指定的异步操作完成。这通常用于确保操作的执行顺序。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
static const ggml_backend_i ggml_backend_cann_interface = {
/* .get_name = */ ggml_backend_cann_name,
/* .free = */ ggml_backend_cann_free,
/* .set_tensor_async = */ ggml_backend_cann_set_tensor_async,
/* .get_tensor_async = */ ggml_backend_cann_get_tensor_async,
/* .cpy_tensor_async = */ ggml_backend_cann_cpy_tensor_async,
/* .synchronize = */ ggml_backend_cann_synchronize,
/* .graph_plan_create = */ NULL,
/* .graph_plan_free = */ NULL,
/* .graph_plan_update = */ NULL,
/* .graph_plan_compute = */ NULL,
/* .graph_compute = */ ggml_backend_cann_graph_compute,
/* .event_record = */ ggml_backend_cann_event_record,
/* .event_wait = */ ggml_backend_cann_event_wait,
};

ggml_backend_cann_interface 接口提供了 CANN 后端在 llama.cpp 中的资源管理、异步数据传输、计算图执行等功能接口,实现了与 CANN 后端的深度集成。通过该接口,llama.cpp 可以高效地管理张量的异步操作、事件记录、同步及图计算,确保计算任务能够顺畅运行在 CANN 设备上。

以下是接口中各函数的功能描述:

  • ggml_backend_cann_name: 返回后端名称,通常用于标识该后端为 CANN 后端。

  • ggml_backend_cann_free:释放后端资源,确保内存和其他资源在后端不再使用时被正确回收。

  • ggml_backend_cann_set_tensor_async:异步设置张量数据到设备端,为后续计算提供数据准备。异步设置可提高数据传输的效率。

  • ggml_backend_cann_get_tensor_async :异步获取张量数据,方便在计算完成后从设备端提取数据,避免阻塞主线程。

  • ggml_backend_cann_cpy_tensor_async:异步复制张量数据,支持设备端和主机端之间的数据交互或设备内部的数据拷贝,以便于多任务并行处理。

  • ggml_backend_cann_synchronize :同步操作,确保所有异步任务完成,通常用于确保张量操作和事件顺序执行。

  • graph_plan_create:该接口目前未实现。将来可用于创建计算图执行计划,优化计算图的操作顺序和资源分配。

  • graph_plan_free:该接口目前未实现。可以释放计算图计划的资源,确保内存使用的高效管理。

  • graph_plan_update:该接口目前未实现。可用于在图执行过程中动态更新计算计划,以适应运行时的资源情况。

  • graph_plan_compute:该接口目前未实现。未来可能用于执行图计划中的所有操作,便于更复杂的任务调度。

  • ggml_backend_cann_graph_compute:执行计算图中的所有节点操作,是核心计算接口之一。该函数负责协调图中的计算任务,使之并行或顺序执行。

  • ggml_backend_cann_event_record:记录事件,用于标记特定操作的时间点,便于在异步计算中追踪进度和执行状态。

  • ggml_backend_cann_event_wait:等待特定事件完成,通常用于确保在后续操作开始前当前任务已完成,以保持计算图的执行正确性。

1
2
3
4
5
6
7
8
static const ggml_backend_buffer_type_i ggml_backend_cann_buffer_type_interface = {
/* .get_name = */ ggml_backend_cann_buffer_type_name,
/* .alloc_buffer = */ ggml_backend_cann_buffer_type_alloc_buffer,
/* .get_alignment = */ ggml_backend_cann_buffer_type_get_alignment,
/* .get_max_size = */ NULL, // defaults to SIZE_MAX
/* .get_alloc_size = */ ggml_backend_cann_buffer_type_get_alloc_size,
/* .is_host = */ ggml_backend_cann_buffer_type_is_host,
};

ggml_backend_cann_buffer_type_interface 结构体定义了 CANN 后端缓冲区类型的接口,它提供了一组操作缓冲区属性和行为的函数接口。这个接口使得 CANN 后端的缓冲区能够在 llama.cpp 中被正确管理和使用,确保内存分配、对齐、大小等操作的一致性和高效性。

以下是 ggml_backend_cann_buffer_type_interface 结构体中各字段的功能描述:

  • ggml_backend_cann_buffer_type_name:返回缓冲区类型的名称。该函数用于标识当前缓冲区类型,主要用于调试和日志记录。
  • ggml_backend_cann_buffer_type_alloc_buffer:用于分配缓冲区的内存。通过该函数,llama.cpp 可以请求 CANN 后端分配指定大小的内存块,用于存储数据和张量。
  • ggml_backend_cann_buffer_type_get_alignment:返回缓冲区的对齐方式。内存对齐对于性能至关重要,因为不适当的对齐可能导致 CPU 或 GPU 在访问数据时的效率降低。该函数可以确保数据在内存中的对齐符合硬件的要求。
  • get_max_size :该字段指示缓冲区的最大尺寸,若设置为 NULL,则默认最大值为 SIZE_MAX,即没有固定的尺寸限制。此函数适用于不希望为缓冲区大小设定上限的场景。
  • ggml_backend_cann_buffer_type_get_alloc_size :获取缓冲区实际分配的内存大小。该函数确保返回正确的分配大小,便于用户跟踪内存使用情况。
  • ggml_backend_cann_buffer_type_is_host:判断缓冲区是否为主机缓冲区。该函数用于区分主机内存和设备内存,以便进行适当的内存管理和数据传输。

Host buffer

Host buffer是一种特殊的buffer type,用于在CPU上申请内存,用于一些中间数据的临时存储,为后端设备提供了以快速访问的内存区域。

Pin memory,又称“锁页内存”或“固定内存”,是指将主机内存中的一部分内存固定在物理内存上,以便快速传输至计算设备(如GPU或NPU)。通常情况下,操作系统会将不经常使用的内存页移至虚拟内存中,这可能导致数据传输时出现额外的内存访问延迟。而使用Pin memory则可以避免这种情况,因为锁页内存不会被系统交换出物理内存,从而大大加速数据传输过程。

Host buffer使用Pin memory实现,用户加速Host和Device之间的内存拷贝速度。Host buffer与buffer_type的结构相同,以接口注册的方式提供Host buffer的能力。

Split Tensor

Split Tensor用于在做复杂计算的时候充分利用多卡能力,llama.cpp中,对矩阵乘法,使用到了Split Tensor,计算时会相乘的矩阵其中一个进行拆分,使用多卡进行并行计算,计算完成后做结果的合并。

Split Tensor实现复杂,并且无法利用已有的aclnn算子,在本次设计中不考虑,待后续性能提升中考虑实现。

3.2 算子

llama.cpp主要的推理是单算子推理功能,图推理功能在本次设计中暂不考虑实现。昇腾的单算子支持aclop以及aclnn两种调用方式。经过简单的demo进行性能对比,aclop编译执行的方式执行效率较低,主要算子均通过aclnn实现,aclnn不支持的算子使用aclnn基本算子组合的方式实现,后续需要使用AscendC将组合算子进行融合以提高性能。

3.2.1 Tensor转换

llama.cpp和昇腾算子对Tensor的定义有一定的差异,为了能够使用昇腾算子,需要在调用的时候对Tensor结构做转换。

结构差异

两者的Tensor基本上都是数据和dims,nelements,nstride,dtype的属性集合,但是有一些差异:

  • llama.cpp的ne和nb的顺序是从内到外,也就是与传统意义的维度顺序相反,序号小的是最内的维度。
  • llama.cpp的stride的单位是字节,而aclnn的stride单位是元素。

广义broadcast

当两个计算的tensor维度不同时,会尝试做broadcast,aclnn接口支持的是传统broadcast方式,而llama.cpp支持的是广义的broadcast:

  • aclnn接口的broadcast仅会在Tensor的某个维度不同,但是其中一个Tensor的维度为1的时候发生;
  • llama.cpp的broadcast会在Tensor的某个维度不同,但一个Tensor的维度大小是另外一个的整数倍的时候发生。

为了减少显示broadcast带来的性能和内存的开销,需要进行维度的调整,以便于利用算子的broadcast特性:

例如,Tensor A(9,5,2,7), Tensor B(9,10,2,7),这两个Tensor对aclnn接口来说不可自动broadcast,但是对llama.cpp来说允许自动broadcast。当数据内容连续时,可以通过添加一个维度来兼容aclnn的broadcast规则。 通过将 A(9,5,2,7)转换成A'(9,5,1,2,7),B(9,10,2,7)转换成B'(9,5,2,2,7)。仅通过调整dims信息,即可利用aclnn算子的自动broadcast能力。

3.2.1 aclnn算子

算子名称 描述 计算公式
Elementwise Add 对两个张量进行逐元素加法,并将结果存储在目标张量中。 dst(i)=src0(i)+src1(i)
Leaky ReLU 对输入张量应用 Leaky ReLU 激活函数,并将结果存储在目标张量中。 dst(i)={src(i)src(i)0negative-slope×src(i)src(i)<0
Arange 创建一个从 start 开始,到 stop 结束,每次增长 step 的 Tensor。 out i+1=out i+step
Clamp 将 input 张量的每个元素夹紧到区间 [min, max] 中,并将结果返回到新的张量中。 dst(i)={minsrc(i)<minsrc(i)minsrc(i)minmaxsrc(i)>max
Scale 使用 scale 缩放一个 Tensor 的所有元素,将结果返回到新的张量中。 dst(i)=src(i)×scale
Argsort 将输入 Tensor 中的元素根据某个维度进行升序 / 降序排序,返回对应的 index 值。 -
Layer Norm 对指定层进行均值为 0、标准差为 1 的归一化计算,并将结果写入到新的张量中。 out=xE[x]Var[x]+ϵ×w+b
Group Norm 计算输入的组归一化结果返回到新的张量中。 $ out &= + $
Acc 将 src 张量的数据累加到 dst 中。 dst(i)=src(i)+dst(i)
Sum Rows 返回给定维度中输入张量每行的和。 -
Upsample Nearest2d 对由多个输入通道组成的输入信号应用最近邻插值算法进行上采样。 -
Pad 将 Tensor 填充到与目标 Tensor 相同的尺寸。 -
avg pool2d 对输入 Tensor 进行窗口为 kH×kW、步长为 sH×sW 的二维平均池化操作。 out(Ni,Ci,h,w)=1kHkWm=0kH1n=0kW1input(Ni,Ci,stride[0]×h+m,stride[1]×w+n)
max pooling 对于 dim=3 或 4 维的输入张量,进行最大池化操作。 out(Ni,Ci,h,w)=maxm=0kH1maxn=0kW1input(Ni,Ci,stride[0]×h+m,stride[1]×w+n)
rms norm 计算给定 Tensor 的均方根归一化函数,并将结果写入到输出 Tensor 中。 RmsNorm(xi)=xiRms(x)gi,<br/> where Rms(x)=1ni=1nxi2+eps
diag mask 将 Tensor 进行三角形掩码运算,将下三角部分保留,上三角部分置 1。 -
img2col 用于将二维 Tensor 数据转换成矩阵形式,以便于高效地进行卷积运算。 -
timestep_embedding 用于生成时间步嵌入。 dst(t)=[sin(t100002i/d),cos(t100002i/d)]
softmax 将输入的张量转化为概率分布,其值范围在 [0, 1] 之间,总和为 1。 (xi)=
matmul 计算两个 Tensor 的矩阵乘法,结果返回到新的 Tensor 中。 $ C_{ij} = {k=1}^{n} A{ik} B_{kj}$
Rope 算子是一种位置编码方法,通过旋转操作为输入序列引入位置信息,增强模型对位置关系的感知能力。 ROPE(q,k)=[qevencos(θ)qoddsin(θ),;qoddcos(θ)+qevensin(θ)]
repeat 对输入张量的元素沿特定维度重复,扩展原始数据的维度或增加相同数据的次数。 repeat(x)=[x,x,,x](repeated along specified dimension)
concat 将两个或多个张量在指定维度上拼接。 concat(x1,x2,,xn)=[x1,x2,,xn](along specifieddimension)
Cast 将张量的数据类型从一种类型转换为另一种类型。 -
permute 重新排列张量的维度顺序。 -
exp 对 Tensor 的每个元素执行 exp 指数运算。 dsti=esrci
Elementwise Mul 对两个张量对应元素进行乘法运算。 z=x×y
Cos 对张量的每个元素计算余弦值。 y=cos(x)
Sin 对张量的每个元素计算正弦值。 y=sin(x)
fill scalar 将张量的所有元素填充为指定的标量值。 x[:]=scalar
pow tensor 将一个张量的每个元素提升到对应的指数幂。 y=xpower
Alibi 一种相对位置嵌入策略,在注意力分数中加入线性偏置,帮助捕获相对位置信息。 $(i, j) = -m
repeat interleave 对张量的每个元素按指定次数重复,以在张量中插入更多的副本。 dst(x,repeats)=[x1,x1,,x1,x2,x2,,x2,]
roll 将张量元素沿指定维度循环移动,即滚动。 roll(x,shift)=xshifted along axis
index fill tensor 在张量的特定索引位置填充指定值。 dst[index]=src

3.2.2 AscendC算子

以下算子没有aclnn接口可调用,也无法使用基础算子组合,需要通过AscendC编程语言实现。为了简化算子的调用流程,采用kernel call的方式进行调用。AscendC算子独立编译,以.a的方式链接到llama.cpp中。

dup

dup和copy语义相同,均为Tensor之间的拷贝,需要支持:

  • 量化Tensor和非量化Tensor之间的拷贝,拷贝过程中涉及到量化和反量化的计算过程。需要支持Q4_0和Q8_0两种量化格式。非量化格式需要支持fp32和fp16两种格式。
  • 连续Tensor和非连续Tensor之间的拷贝(量化格式Tensor不涉及非连续场景)。

get rows

从Tensor中按照index获取每行内容。

需要支持多种数据格式,包括fp32,fp16,Q4_0和Q8_0。获取后的数据均为fp32格式。

AscendC算子通过kernel launch的方式调用,调用时需要判断AI core的数量,来配置合适的数量以提升执行效率。

为了兼容多种芯片,CMake时需要检测或根据提供的芯片类型进行编译和链接。

3.3 内存管理

aclnn执行时,有些需要申请临时的NPU上内存做临时数据存储,频繁的内存分配和释放效率很低,需要内存池来提高内存分配性能。

在llama.cpp中,需要实现2中内存池:

legacy pool

使用N(256)个buffer做内存缓存,所有的内存释放必须放回内存池(防止异步执行访问到已释放内存),内存申请首先选择内存池中大小最合适的缓存,内存池为空则去申请内存

legacy pool流程图

会占用额外的内存,并且存在内存块查找的开销,并且,如果free的内存块超过N(256),则会出现assert失败问题。

vmm pool

使用虚拟内存,业务代码看到的是一段连续的内存,方便使用。实际上申请的物理内存是非连续的,当内存不足时申请一段物理内存映射到虚拟内存中。避免内存碎片和占用额外内存。

vmm pool示意图

在虚拟内存中,申请的数据紧密排列,申请和销毁的顺序是相反的。比如,buffer1早于buffer2申请,那么buffer2必须要早于buffer1释放。在内存管理中,仅维护一个free指针,指示下一个buffer申请的起始地址。

vmm pool示意图

异步计算的内存延迟释放

由于所有的算子计算都是异步的,但是内存的申请和释放并不是异步的,所以,需要保证在异步计算完成之前,申请的内存是有效的。

vmm pool示意图

如图所示,当算子提交完成后,buffer3就会释放,free指针指向buffer3的起始地址。接着,下个算子开始执行,会从free指针开始申请内存,此时buffer3和buffer4是重叠的,但是由于stream中的算子计算有序,所以buffer3内的数据在完成计算之前,是不会被buffer4修改的。

3.4 量化格式

以4bit量化为例:

量化分组格式

1
2
3
4
5
#define QK4_0 32 // 每组32个f32数据
typedef struct {
ggml_half d; // 公共系数
uint8_t qs[QK4_0 / 2]; // 4bit存储的数据
} block_q4_0;

量化算法描述

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
void quantize_row_q4_0_reference(const float * restrict x, block_q4_0 * restrict y, int64_t k) {
static const int qk = QK4_0;

assert(k % qk == 0);

const int nb = k / qk;

// 1. 找到绝对值最大的数的值
for (int i = 0; i < nb; i++) {
float amax = 0.0f; // absolute max
float max = 0.0f;

for (int j = 0; j < qk; j++) {
const float v = x[i*qk + j];
if (amax < fabsf(v)) {
amax = fabsf(v);
max = v;
}
}

// 2. 公共系数是第一步的值除以 -8
const float d = max / -8;
const float id = d ? 1.0f/d : 0.0f;

y[i].d = GGML_FP32_TO_FP16(d);

// 3. 对组内的所有数据,除以公共系数,然后按以下顺序存储
// 量化前: 1,2,3,4,5,6,7,8 ...... 30,31
// 量化后: 1,17,2,18,3,19 ...... 16,32
// 也就是数据按顺序先填充量化后组的低4位,然后再填充高4位。
for (int j = 0; j < qk/2; ++j) {
const float x0 = x[i*qk + 0 + j]*id;
const float x1 = x[i*qk + qk/2 + j]*id;
// 量化的值+8.5,向上去整并转无符号数。
const uint8_t xi0 = MIN(15, (int8_t)(x0 + 8.5f));
const uint8_t xi1 = MIN(15, (int8_t)(x1 + 8.5f));

y[i].qs[j] = xi0;
y[i].qs[j] |= xi1 << 4;
}
}
}

NPU善于做向量和矩阵计算,按字节的计算,以及位计算性能不佳。 所以需要调整数据存储格式。

在set tensor过程中,使用cpu做tensor的内存调整,让后续的计算能够充分利用NPU能力。

量化内存格式调整示意图

如上图所示,输入的Tensor是按组存放的,每组存放了该组的公共系数,以及32个数据的量化后的值,int4类型量化值是先填充高4位,再填充低4位。为了昇腾的计算效率,在做这类伪量化算法时,将原始Tensor拆解成2个Tensor,一个按顺序记录所有的值,另外一个记录每一组的公共系数,并且值和公共系数按32:1的方式对应。然后昇腾算子按照weight和group scale的方式输入进算子,能够提高量化后Tensor的执行效率。

所有的内存布局修改的时机是set tensor和get tensor过程中,对于整个程序来说,对内存布局的修改是不感知的,拷贝到NPU上时进行布局修改,从NPU下载时再进行布局复原,这样,及时设计CPU,NPU混合运算,也不会影响数据的正确性。

对于更加复杂的量化方式,例如q5_0,需要设计到位运算,此类量化由于性能问题尚未支持。

3.5 代码风格和注释

llama.cpp社区对代码风格没有详细的要求,社区仅要求”清除所有尾随空格,使用 4 个空格缩进,括号在同一行void * ptr,int & a`”。并且,对注释也没有明确的要求。为了能够保持一致的风格,以及方便社区开发者了解昇腾后端的业务逻辑,需要在编写代码时遵循一致的编码规范和详尽的注释。

  • 除了社区要求的内容之外,其他代码规范需要遵循google编码规范;
  • 注释需要包含函数和变量的介绍,参数和返回值说明,算子相关代码需要注释算法的数学公式。其他的复杂逻辑按需求添加注释;
  • 注释需要符合doxygen风格,以便于生成方便阅读的手册。

4. 测试和验证

本设计文档主要是昇腾的后端支持,llama.cpp已经做了后端抽象,所有测试用例可以复用社区的内容。针对社区用例没有看护到的部分,添加必要的用例来看护。

4.1 Runtime测试

Runtime测试主要是验证设备注册,内存分配,stream和event管理相关功能。

设备注册和卸载(单卡,多卡)

测试目的

  • 验证昇腾设备可以正常注册到llama.cpp中,支持单卡和多卡注册。

测试步骤

  1. 调用设备注册接口,注册单卡以及多卡;
  2. 查看设备信息是否正常获取。

预期结果

无报错信息,并根据ASCEND_VISIBLE_DEVICES的设置情况,能够正常获取到对应的设备信息。

buffer和Tensor创建

测试目的

  • 验证昇腾后端可以正常的创建buffer以及llama.cpp的Tensor结构;

测试步骤

  • 构造若干个Tensor结构,并给这些Tensor分配内存buffer;

预期结果

  • 内存完成分配,无错误信息。

Tensor的上传和下载(同步,异步)

测试目的

  • 验证数据可以正确的上传和下载。

测试步骤

  1. 构造随机数据;
  2. 将数据拷贝到创建好的Tensor中;
  3. 将Tensor中的数据下载;
  4. 与原始随机数据进行对比。
  5. 分别使用同步拷贝和异步拷贝,重复以上过程。

预期结果

  • 数据比对与原始数据相同。

Tensor卡间拷贝(包括event同步)

测试目的

  • 验证卡间拷贝以及事件同步的正确性。

测试步骤

  1. 构造随机数据;
  2. 将数据拷贝到卡1的Tensor中;
  3. 开启卡1和卡2的卡间拷贝开关;
  4. 在卡1的stream提交卡1Tensor向卡2Tensor拷贝的任务;
  5. 在拷贝流中插入卡2的event事件;
  6. 在卡2的流中等待event事件;
  7. 从卡2中下载Tensor数据;
  8. 与原始数据做比对。

预期结果

  1. 数据比对与原始数据相同;
  2. 卡2event同步正确,在卡1stream中构造耗时操作,确保event能够等待拷贝动作结束。

量化拷贝验证

测试目的

  • 量化Tensor拷贝需要调整内存布局,验证量化Tensor的拷贝结果正确。

测试步骤

  1. 构造随机的量化Tensor;
  2. 将量化Tensor上传到设备上;
  3. 使用aclrtmemcpy直接拷贝数据;
  4. 从设备上将Tensor下载下来;
  5. 与原始数据作对比。

预期结果

  1. 步骤3的memcpy的结果与原始数据不同,因为上传过程做了内存布局调整;
  2. 步骤5数据对比与原始数据相同。

4.2 算子单元测试

单元测试复用社区的单元测试用例(test-backend-ops),包含1500多个用例。其覆盖的场景有:

  • 算子多shape多dtype验证,保证该算子所有的输出输出的shape和dtype类型都能够覆盖;
  • 算子的精度验证,用例会构造随机数据,分别在设备上和CPU上运行,最后对比精度,两个Tensor的归一化方差需要小于 1e-6。
  • 计算结果越界检查,由于推理过程中,Tensor是紧密排列,所以每个tensor的计算结果不能越界,否则会损坏其他tensor的数据,用例会在每个输入和输出tensor前后分别放置一个随机tensor,通过对比随机tensor的计算前后的结果,来检查是否存在越界行为。

单元测试用例会判断后端的算子支持情况,理论上,所有支持的算子(包括shape和dtype)都需要通过该测试用例集的验证。

4.3 性能测试

算子的性能测试用例与单元测试用例相同,区别是性能测试用例不会验证精度,也不会创建随机tensor用作越界检查。性能测试会构造一个特殊的图,包含最多8192个计算节点,然后交给后端进行推理,并计算平均每次的执行时间,以及数据吞吐率。

  • 910B对于简单算子(包括直接调用aclnn接口的,或者做了简单的参数调整的)性能要超过Intel主流CPU的性能。
  • 对于复杂算子(包括构造多个临时tensor,以及需要多个算子组合的)暂不做算子的性能要求。
  • 非910B芯片,不做性能要求。

910B模型推理(llama3 8B)整体性能,token延迟需要小于100ms(人类的阅读速度大致是10个token/s,延迟小于100ms,可以满足人类的阅读需求),吞吐需要超过300token/s(0.6 * A100 vllm llama3 8B的推理性能)。

以下为 Qwen 2.5 全系列模型在昇腾 910B 上的推理性能表现汇总数据,包括 Qwen2.5 0.5B、1.5B、3B 的 Q8_0 和Q4_0 量化的推理性能数据作为对比参考:

Model Tokens / Second NPU Util NPU Mem NPU Card(64G/Card)
Qwen2.5 0.5B FP16 42 tokens/second Util 6~7% Mem 7% 单卡
Qwen2.5 1.5B FP16 35 tokens/second Util 11~13% Mem 10% 单卡
Qwen2.5 3B FP16 29 tokens/second Util 15~16% Mem 15% 单卡
Qwen2.5 7B FP16 32 tokens/second Util 16~21% Mem 16% 单卡
Qwen2.5 14B FP16 19 tokens/second Util 19~22% Mem 28% 单卡
Qwen2.5 32B FP16 10.5 tokens/second Util 10~45% Mem 54% 双卡
Qwen2.5 72B FP16 6 tokens/second Util 10~60% Mem 78% 三卡
Qwen2.5 0.5B Q8_0 6.5 tokens/second Util 2~5% Mem 6% 单卡
Qwen2.5 0.5B Q4_0 6 tokens/second Util 4~5% Mem 6% 单卡
Qwen2.5 1.5B Q8_0 3.5 tokens/second Util 4~11% Mem 8% 单卡
Qwen2.5 1.5B Q4_0 17~18 tokens/second Util 9~12% Mem 7% 单卡
Qwen2.5 3B Q8_0 3.2 tokens/second Util 10~15% Mem 10% 单卡
Qwen2.5 3B Q4_0 14.5 tokens/second Util 8~15% Mem 8% 单卡

对其中的 Qwen 2.5 0.5B FP16 模型进行并发测试的性能表现如下:

Concurrency Tokens / Second Throughput NPU Util NPU Mem
1 39 tokens/second 39 Util 6~7% Mem 7%
2 38 tokens/second 76 Util 6~7% Mem 7%
3 37.66 tokens/second 113 Util 6~7% Mem 7%
4 34.25 tokens/second 137 Util 6~7% Mem 7%
5 31 tokens/second 155 Util 6~7% Mem 7%
6 28.16 tokens/second 169 Util 6~7% Mem 7%
7 27.57 tokens/second 193 Util 6~7% Mem 7%
8 26.87 tokens/second 215 Util 6~7% Mem 7%
9 26 tokens/second 234 Util 6~7% Mem 7%
10 26.9 tokens/second 269 Util 6~7% Mem 7%
20 20.3 tokens/second 406 Util 6~7% Mem 8%
50 10.34 tokens/second 517 Util 3~5% Mem 8%
100 4.17 tokens/second 417 Util 2~5% Mem 9%

4.4 模型精度验证

除了算子的精度验证以外,对模型需要做整体的精度验证,以避免在数据加载拷贝,kv_cache操作等过程中出现错误。

eval-callback

llama.cpp社区提供了一个精度对比工具:eval-callback,这个工具会执行一次推理过程,并将推理过程中所有涉及的算子的计算结果进行打印。通过对比相同seed情况下的NPU和CPU的推理结果,判断整个推理过程是否存在异常。

需要注意的是,tensor的内容在会存在微小的差异,这不属于精度异常。

CPU推理对比

使用llama3模型,使用相同的seed,分别在NPU和CPU上进行相同的推理内容,理论上前数百token应该完全一致。由于存在精度的微小差异,推理累计的过程中,在长回复的后段,可能会出现细微差异。

4.5 模型支持验证

目前,llama.cpp支持以下模型以及多种量化格式,我们仅关注fp16,Q8_0和Q4_0三种dtype。

模型支持的原则是不存在不支持的算子,检查方式是查看切图的情况,如果出现了大量子图(超过100),说明存在算子不支持,已经fallback到CPU进行推理,此类模型虽然能够完成推理,但是推理性能较低。

模型 FP16 Q8_0 Q4_0
AquilaChat2-7B
Baichuan-7b
Baichuan2-7B-Chat
bitnet_b1_58-large
bloom-560m x
bloomz-alpaca-560m x
c4ai-command-r-35B-v01 x x x
chatglm3-6B x x x
chinese-alpaca-2-1.3b
CodeShell-7B
deepseek-ai_deepseek-coder-1.3B-base x x x
deepseek-ai_DeepSeek-V2-Lite x x x
deepseek-coder-6.7B-instruct x x x
DeepSeek-V2-Lite-64x1.5B x x x
falcon-7b-instruct
flan-t5-large
gemma-2-9b-it
glm-4-9B x x x
gpt2
Gpt2-163M
granite-3B-code-instruct
GritLM-7B
internlm2_5-7b-chat
koala-7B-HF
Llama-2-7b-chat-hf
Llama-3-Smaug-8B
Llama2-Chinese-7b-Chat
Llama3-8B
Llama3-8b-chinese
mamba-130m-hf
Mistral-7B-Instruct-v0.2
Mixtral-8x7B-Instruct-v0.1 X
mpt-7B
OLMo-1B-hf
OpenELM-3B-Instruct
Orion-14b-base
phi1 x x x
phi2 x x x
Phi-3-mini-4k-instruct
plamo-13b
pythia-70M x x x
Qwen-7B
Qwen2-1.5B-Instruct x
Refact-1_6B-fim
SmolLM-135M
stablelm-zephyr x x x
stablelm-2-zephyr-1_6b x x x
starcoderbase-1b
starcoder2-3b
vigogne-7b-chat
xverse-7b-chat
Yi-6b-Chat

4.6 社区CI

目前由于资源限制,暂时无法向社区提供开发机和CI机器,但是需要保证编译通过,防止社区的重构导致的昇腾后端被破坏的问题。编译不需要昇腾硬件,可以使用社区的CI机器。

  • 提供昇腾构建的容器镜像,避免配置复杂的环境。
  • 提供github workflow的job,添加昇腾的CI验证,并作为门禁。

5. Ollama支持

Ollama 是一个旨在提升本地大型语言模型(LLM)运行效率和灵活性的开源平台,快速在本地部署启动大模型的应用。Ollama 的设计初衷是通过优化硬件加速和支持更高效的推理计算,帮助开发者和研究人员更方便地在本地部署和运行 LLM,从而不依赖云计算资源或其他昂贵的基础设施。,Ollama使用llama.cpp作为推理引擎。一条命令可以完成安装和模型拉起。

安装

1
curl -fsSL https://ollama.com/install.sh | sh

运行

1
ollama run llama3

除此之外,Ollama还有有一个模型的仓库,保存有海量的gguf模型,其兼容openAI API,有着众多的前端应用。

为了能够充分的利用llama.cpp的昇腾后端能力,简化昇腾使用门槛,同时需要完成Ollama的昇腾适配。简单来说,OIlama需要适配一下几个关键部分:

构建

Ollama会构建llama.cpp工程,并将二进制打包到ollama的二进制文件中,在构建ollama的过程中,需要完成llama.cpp的昇腾版本的构建。

NPU检测

Ollama在运行时会检测NPU硬件,显存容量等,来判断模型是否能够运行,以及合理的模型拆分方式,所以需要在ollama中实现必要的昇腾硬件检测接口。

拉起

Ollama运行模型时,会拉起对应后端的llama.cpp服务器,这里需要根据硬件检测的结果来拉起NPU版本的llama.cpp服务器。

这里仅做Ollama兼容昇腾后端的简单洞察,不做详细设计,社区方案已完成,PR提交中。

6. 社区跟进

llama.cpp是一个非常活跃的社区,平均每天有十几个提交的合入,包括大量的重构和大粒度特性的合入。昇腾后端需要紧跟社区的发展路线,根据社区的重构和特性进行适配。

同时,在社区也存在对昇腾后端的需求,以及问题反馈,需要及时完成解决。

社区没有要求SLA,原则上,简单问题修复和重构适配应当在5个工作日内完成,特性需求根据实际情况灵活处理。

7. 文档和说明

为了帮助llama.cpp的昇腾用户,需要编写详尽的文档,包括环境搭建,构建,运行,模型和数据类型支持情况以及贡献指导等。

7.1 社区doc

  • 在社区README添加Ascend的支持描述,并且提供跳转链接。
  • 提供环境搭建步骤,包括操作系统版本,昇腾驱动和CANN的版本要求和安装方法。
  • 提供Dockerfile,包含llama.cpp所需的环境配置,能够避免复杂的环境部署。
  • 提供构建,运行的命令。
  • 提供模型和数据类型支持情况。
  • 提供issue和PR提交规范。

7.2 昇腾开源手册

为了方便中文用户,以及昇腾社区入口的用户,还需要在昇腾开源文档中提供中文版的step by step构建和推理手册。

其他参与review的PR和issue见链接

llama.cpp昇腾开源使用手册

8. 项目引用

ollama

Ollama是一款专注于在本地运行大型语言模型的工具,旨在简化模型的部署和使用,提供高性能且无需云端依赖的AI推理体验,使用llama.cpp作为推理引擎,以git submodule的方式引用llama.cpp代码。目前已与2012同事一同完成设计并提交PR

llama edge

Llama Edge是一个为边缘设备优化的轻量级大语言模型框架,旨在支持本地化、高效的推理,以满足低延迟和有限资源的计算需求,使用llama.cpp作为其推理后端。llama edge官方发表了一篇知乎的回复以及一篇官方文档

llamabox&gpu stack

Llamabox是一个便捷的平台,提供开箱即用的大语言模型部署方案,使用户能够轻松运行和管理AI模型;而gpustack是一项云服务,专为高性能计算和AI模型训练优化,提供灵活的GPU资源共享和管理功能,其使用了llama.cpp作为其推理后端之一。有一篇使用gpustack使用昇腾推理的实践文章

为什么需要编译

相比于Eager模式,编译有以下优势:

  • 优化性能:通过全局优化,减少算子之间的数据拷贝来提升执行速度。

  • 降低延迟:提前优化代码逻辑,减少运行时解释开销,适用于实时应用。

  • 复杂优化策略:实现操作融合、并行执行等高级优化。

  • 更快的算法支持:不需要实现对应的各种融合算法,完成lower和pass即可适配。

  • 静态分析与错误检查:提前捕捉潜在错误,提高代码质量。

所以编译模式是一定要支持的技术路线。

为什么PyTorch编译需要Triton

PyTorch 2.0

为什么选择Inductor

PyTorch有多个后端编译器,都可以从FX Graph进行模型编译。Intree的有cudagraph,onnxrt,openxla,tvm以及inductor,out of tree的有nvFuser和AITemplate等。Inductor相较于其他编译器,有以下优势,并逐渐成为PyTorch主力发展的编译器。

  • 动态编译:能够根据输入数据的特性在运行时生成优化代码,实现最佳性能。

  • 实时优化:允许在推理过程中对计算图进行修改和优化,无需重新编译整个模型。

  • 支持复杂操作:处理多种复杂操作和动态控制流,增强模型的表达能力。

  • 集成现有 API:与 PyTorch API 无缝集成,降低学习曲线,方便开发者使用。

  • 硬件适配:根据不同硬件生成特定优化代码,充分利用计算能力,目前支持CPU和GPU。

  • 配置选项:提供多种参数和选项,允许用户自定义优化策略,增强灵活性。

  • 性能调优:在运行时监控性能,根据实际情况自动选择最优执行路径。

Inductor会做很多通用优化,使得使用Inductor IR的后端均可以享受到这些优化的能力。

  • Integrating a new backend at the AtenIR/PrimsIR level is straightforward and provides more freedom to the new backend to optimize the captured graph. But it might be suboptimal performance if the backend lacks the DL compiler capability because the decomposed operations for a single operation might need more memory and worse data locality compared with the single operation if the decomposed operations can't be fused.
  • Integrating at the Inductor loop IR level can significantly simplify the complexity of design and implementation by leveraging the Inductor's fusion capability and other optimizations directly. The new backend just needs to focus on how to generate optimal code for a particular device.

为什么选择Triton

Inductor目前支持三个后端:OpenMP,Halide以及Trition

Triton 是一种用于并行编程的语言和编译器。它的目标是提供一个基于 Python 的编程环境,以便高效地编写自定义的深度神经网络(DNN)计算核心,这些核心能够在现代 GPU 硬件上以最大吞吐量运行。

AI算子编写2个最关键的点:

  1. 编写的复杂程度;
  2. 算子执行的效率。

Triton的目标是使用简单的实现方法(类似Python语言),让绝大部分的开发者能够开发出媲美CUDA专家编写出的算子执行效率。

01 vector add
02 fused softmax
03 matrix multiplication
05 layer norm
08 grouped gemm

上图可以看出,使用简单的算子实现可以与cuBLAS以及Torch aten算子相同甚至更高的执行效率。

除此之外,FlagGems算子库,unsloth微调框架也基于Triton编写算子,并且字节等互联网公司也在使用Triton来高效的编写后端无关的高性能算子。

Triton架构

img
1
2
3
4
5
6
Nvidia GPU: TritonDSL -------------> ttir -> ttgir -> ptx -> cubin
ast.parse ^ \/ ^ \/
| pass| | pass |
+-----+ +--=---+
| |
|-------MLIR-----|

Triton的工作原理就是将TritonDSL一步步转换成适合于硬件执行的二进制的过程。

  • Triton利用llvm的MLIR框架,注册TritonIR以及TritonGpuIR,以及相应的pass和convert;
  • 首先Triton会将代码用ast.parse解析,将ast语法书转换成ttir;
  • 使用注册的pass和convert函数,对ttir进行优化并转换成ttgir;
  • 继续用pass和convert函数,对ttgir转换成llvm ir;
  • 使用nvidia提供的工具将llvm ir转换成ptx文件;
  • 使用nvidia提供的工具将ptx文件编译成二进制cubin。

洞察启示

  1. Triton是昇腾支持Torch.compile的必经之路;
  2. Triton的支持对昇腾的三方库原生支持有很大的帮助;
  3. 借助triton-share的能力,可以进一步将ttir转换成linalg IR,后者对昇腾亲和,减少适配工作量;

安装conda

python最低要求 3.8版本及以上。

1
2
3
4
5
6
mkdir -p ~/miniconda3
wget https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh -O ~/miniconda3/miniconda.sh
bash ~/miniconda3/miniconda.sh -b -u -p ~/miniconda3
rm -rf ~/miniconda3/miniconda.sh

~/miniconda3/bin/conda init bash

下载源码

pytorch社区下载源码,并下载git的submodule。

1
2
3
4
5
git clone --recursive https://github.com/pytorch/pytorch
cd pytorch
# if you are updating an existing checkout
git submodule sync
git submodule update --init --recursive

拉代码特别是拉三方的库的时候,会有访问不到的情况,可以重试,或者配置http代理以及ssh代理:

http代理:

1
2
export http_proxy=host:ip
export https_proxy=host:ip

ssh代理:

1
2
3
4
5
cat ~/.ssh/config

Host github.com
User git
ProxyCommand nc -v -x localhost:7890 %h %p

安装依赖

pip最好配置下清华源,不然会很慢。

1
2
python -m pip install --upgrade pip
pip config set global.index-url https://pypi.tuna.tsinghua.edu.cn/simple

安装pytorch依赖,gcc需要支持C++ 17,需要使用gcc 9.4.0及以上。

1
2
3
conda install cmake ninja
# Run this command from the PyTorch directory after cloning the source code using the “Get the PyTorch Source“ section below
pip install -r requirements.txt

编译

1
2
export CMAKE_PREFIX_PATH=${CONDA_PREFIX:-"$(dirname $(which conda))/../"}
python setup.py develop

一些有用的编译时的环境变量:

  • DEBUG=1 will enable debug builds (-g -O0)
  • REL_WITH_DEB_INFO=1 will enable debug symbols with optimizations (-g -O3)
  • USE_DISTRIBUTED=0 will disable distributed (c10d, gloo, mpi, etc.) build.
  • USE_MKLDNN=0 will disable using MKL-DNN.
  • USE_CUDA=0 will disable compiling CUDA (in case you are developing on something not CUDA related), to save compile time.
  • BUILD_TEST=0 will disable building C++ test binaries.
  • USE_FBGEMM=0 will disable using FBGEMM (quantized 8-bit server operators).
  • USE_NNPACK=0 will disable compiling with NNPACK.
  • USE_QNNPACK=0 will disable QNNPACK build (quantized 8-bit operators).
  • USE_XNNPACK=0 will disable compiling with XNNPACK.
  • USE_FLASH_ATTENTION=0 and USE_MEM_EFF_ATTENTION=0 will disable compiling flash attention and memory efficient kernels respectively

验证

1
2
3
4
5
import torch

a = torch.randn(2,3)
b = torch.randn(2,3)
a + b

参考:

[1] PyTorch installation from source.

[2] PyTorch contributing doc.

推理场景

  • 计算机视觉:图像分类,目标检测,人脸识别,图像生成。

  • 自然语言处理(NLP):翻译,语音识别,文本分类,内容审核。

  • 自动驾驶

  • 生物学:医学影响分析,基因分析,蛋白质预测。

  • 金融:信用,风控。

  • 制造业:质量控制,成本控制。

  • 零售业:推荐广告系统,库存管理。

  • 能源:智能电网。

  • 大模型推理:LLM,多模态模型推理。

推理的一般流程

以目标检测为例:

  1. 预训练模型获取:从 Huggingface或者其他预训练模型平台上下载模型;

  2. 模型转换:根据使用的推理框架,将下载的模型转换成对应的格式;

  3. 数据预处理:视频解码,并从流中抓取图像,将图像进行裁剪,旋转,翻转,调整色彩空间,标准化等操作;

  4. 模型推理:将处理完成的图像数据输入模型进行推理,得到推理结果;

  5. 后处理:将模型推理结果进行处理,获取目标框,标签等信息;

  6. 结果展示:将原始图像和推理结果进行融合,给检测的目标加上框和目标类型标签。

推理技术栈

Nvidia

image-20231116101903233

昇腾

image-20231116101849666

推理技术栈自下而上一般为:

  • 硬件:参与推理的硬件,例如CPU,GPU和NPU等,Nvidia目前常用的GPU为Volta->Turing->Ampere->Hopper等架构,Ascend为310系列和910系列芯片, 采用Davinci架构。
  • 驱动和基础软件:此类软件包括加速卡的驱动程序,异构计算运行时(CUDA RT, CANN RT),kernel开发调试工具等。除此之外,Ascend还提供了常用算子库。
  • 推理引擎:推理引擎一般提供模型转换,模型优化,以及模型推理功能,并且提供运行的性能指标供性能分析和自动负载均衡。 大部分推理引擎都原生支持CUDA,对昇腾的原生支持较弱。
  • 推理服务化:推理服务化工具一般提供restful和rpc接口,模型服务化部署。另外可以配合容器技术,调度技术和负载均衡等实现自动扩缩容,提高推理速度,提高资源利用率。基本上大多深度学习框架均提供了服务化部署能力,其中Triton支持多种后端,并提供了友好的接入接口。
  • 行业应用:针对特定行业的预处理,pipeline或者相关的SDK用于简化行业应用的开发复杂度,甚至通过配置可以直接在行业内应用。
  • 其他配套:其他配套例如预训练模型的仓库,模型调优工具,算法加速库以及边缘计算平台等。

推理流程中涉及的软件

image-20231116102732237
  • 模型预处理hub:hub不仅能够保存预训练模型,并且能够通过代码api的方式直接下载并加载模型。昇腾的Model Zoo需要手动下载模型,Huggingface的预训练模型可以通过python api下载和使用。
  • 模型转换:ONNX是一种开发的模型格式,可以与常见的深度学习框架进行转换。除此之外,其他的框架一般提供有限的模型转换能力,大多是同一个框架内的不同模型格式的转换。
  • 预处理:计算机视觉中,Nvidia自研了部分适配GPU的加速库,并且常见的OpenCV,torchvision也原生支持GPU。目前对昇腾的支持较弱,目前仅有mxBase和OpenCV实现了少量的常用接口。昇腾推理的CV预处理,还需要依赖CPU处理。自然语言处理库由于算法的特殊性,无法充分利用并行计算能力,上述库基本上都仅在CPU上运行。Nvidia提供了推荐系统大量数据并能处理能力的库NVTabular,昇腾在此方面可以使用mxRec提供的加速能力。
  • 模型分析优化:推理框架一般分为优化和运行两部分,其中优化部分对传入的模型根据底层架构进行优化。并且在执行过程中可以通过组件监控模型的运行情况,以用来模型调优,或者提供弹性扩缩容能力。
  • 推理框架:推理框架是推理业务的重点,不同的推理框架能力各有优劣:
    • TensorRT (TensorRT by NVIDIA):
      • 优点:
        • 面向 NVIDIA GPU 的深度学习推理优化库。
        • 针对高性能、低延迟的推理任务进行了优化。
      • 缺点:
        • 仅适用于 NVIDIA GPU,不具备跨平台性。
    • ONNX Runtime:
      • 优点:
        • 开放的模型表示格式,允许在不同框架之间共享和部署模型。
        • 支持多种深度学习框架,如TensorFlow、PyTorch、Caffe等。
      • 缺点:
        • 部分框架的支持可能不如原生框架的性能优越。
    • OpenVINO
      • 优点:
        • 多平台支持,支持多深度学习框架。
        • 针对各设备硬件进行优化,能在多种设备上高性能推理。
      • 缺点:
        • 大量优化针对Intel硬件,对其他硬件厂商的优化有限。
        • 开源版本功能限制,有些特性需要商业版支持。
    • ncnn, TNN, MNN,ARMNN
      • 优点:
        • 面向移动端和嵌入式CPU或GPU,轻量级,弱依赖。
        • 支持多种模型类型,有模型转换能力。
      • 缺点:
        • 非嵌入式平台(ARMNN在非ARM平台)支持较弱。
  • 推理服务化:主流的深度学习框架基本上都提供了服务化能力,可以通过restful或者rpc接口进行模型推理。其中Triton设计更为灵活,能够方便的集成不同的后端,目前已经支持主流深度学习框架的推理服务。目前还没有支持昇腾推理框架,但是可以通过pytorch插件或者ONNX runtime进行推理。
  • 后处理:后处理将图例结果进行加工处理,并展示推理结果,所需软件与预处理大致相同。
image-20231116102541095

除了推理流程中的软件之外,还有行业应用,边缘计算,以及算法加速库。Nvidia和昇腾在这些领域均有涉及。

总结

  1. 昇腾在主流深度学习框架,推理框架以及推理服务化软件中,原生支持较弱,大部分框架在设计之初均考虑GPU支持,目前已经支持昇腾的框架多为后期开发。如果框架在后端支持上设计不够友好,接入难度较高。
  2. 计算机视觉预处理能力与GPU能力差距较大,包括OpenCV,torchvision等开源CV软件均原生支持GPU,并且Nvidia还有自研的图像预处理库,昇腾仅支持少量高频使用的接口,并且性能还存在差距。
  3. Nvidia和开源在框架,应用软件和加速库的使用上较为容易,社区活跃,文档完整规范,学习成本低。昇腾相关软件使用门槛较高,使用上相较而言较为繁琐。
  4. 建议在推理全流程中选择一个技术路线,做昇腾支持,在功能,性能上追平或超过友商,然后再考虑自研更适合昇腾场景的自研软件。

LOGO

在当今数字化时代,图像处理不再是一个陌生的领域,而是推动科学、技术和创新的引擎。OpenCV(开源计算机视觉库)凭借其丰富、强大的功能和灵活性在图像处理领域占据引领者地位。为了进一步加速OpenCV图像处理,即将推出的OpenCV 4.9.0版本提供了图像处理相关的高频使用接口的昇腾(Ascend)支持。这意味着在计算机视觉领域的数据预处理和后处理流程中,可以充分发挥昇腾软硬件的强大算力和计算效率,加速图像处理操作。

除此之外,2023年10月17日,OpenCV社区正式接纳昇腾+openEuler作为持续集成(Continuous Integration,CI)系统的操作系统之一,这意味着未来OpenCV的代码修改均会在昇腾+openEuler环境中进行自动化构建及测试,保障了OpenCV在openEuler上支持的稳定性。

图像处理昇腾接口使用

本次提供的以昇腾为后端的图像处理接口封装在OpenCV扩展包(contrib)的cannops模块中,包括图像矩阵的算术运算,通道拆分合并,图片裁剪,翻转,调整大小,转置等图像处理的Python和C++接口,处理精度与CPU后端的计算结果相同。

那么如何调用昇腾后端实现图像处理呢?除了对昇腾必要的初始化、去初始化之外,用户无需学习CANN API,仅需要将原来的接口添加cann包名(C++接口为使用cann命名空间),即可使用昇腾算力:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import cv2

# cann初始化
cv2.cann.initAcl()
# 选择执行运算的设备编号
cv2.cann.setDevice(0)

# 图像处理(以图像旋转为例)
img = cv2.imread('/path/to/image')
# 添加cann包名调用昇腾接口
ret = cv2.cann.rotate(img, 0)
cv2.imwrite('/path/to/result', ret)

# cann去初始化
cv2.cann.finalizeAcl()
昇腾接口使用方法

详细示例代码接口列表请参考OpenCV官方文档

OpenCV官方支持昇腾CI

OpenCV社区正式接纳昇腾+openEuler作为持续集成(Continuous Integration,CI)系统的操作系统之一,对昇腾和openEuler提供上游原生支持,给广大OpenCV及昇腾用户提供了稳定性保障。让我们跟随下面两个问题,了解昇腾CI是如何维护稳定性的。

持续集成做了什么

CI提供了一个高效、自动化的开发环境,在开发者提交代码后,会触发一系列自动构建及测试,以确保软件系统的稳定性和质量。持续集成系统不仅缩短了开发周期,也为开发者提供了更快速、可靠的反馈机制,从而推动了软件开发的创新和进步。

OpenCV支持昇腾CI意味着什么?

未来OpenCV社区的代码提交将会在昇腾CI环境下自动化构建和测试,有助于确保OpenCV在昇腾上的稳定性和性能。同时为后续OpenCV的昇腾相关贡献代码的提交提供了稳定可靠的测试平台,有助于推动OpenCV+昇腾的蓬勃发展。感谢OpenCV和华为计算团队的共同努力。

OpenCV社区CI的昇腾支持
OpenCV社区CI的昇腾支持

OpenCV简介

OpenCV是一个开源的跨平台计算机视觉库,提供了图像处理,机器学习,视频分析等功能,支持多种开发语言,多平台和多类型后端。 其中图像处理模块提供了矩阵的算术和逻辑运算,图像处理,视频编解码等,被广泛运用于计算机视觉领域的数据预处理和后处理。

除了图像处理模块,OpenCV DNN模块已经在2022年12月支持了昇腾作为推理后端,支持读取包括ONNX,TensorFLow,Caffeine,Darknet在内的多种主流格式的深度网络模型,并支持在多平台,多设备中运行,目前已经达到了主流神经网络推理框架的性能。在昇腾算力的加持下,推理性能进一步提升,3毫秒即可完成ResNet50推理

OpenCV架构示意图
OpenCV架构示意图

CANN简介

CANN是华为推出的异构计算架构,是上层应用调用底层昇腾处理器的桥梁,在本次OpenCV昇腾支持中是至关重要的一层。同时,昇腾+CANN支持昇思MindSpore,OpenCV DNN、PyTorch、TensorFlow、飞桨、ONNX等多种深度学习框架,以极致性能、极简开发、开放生态为目标,助力昇腾构建全场景人工智能平台。

CANN AI异构计算框架
CANN AI异构计算框架

结语

未来OpenCV会进一步完善能使用昇腾后端的接口,并持续优化接口性能,进一步发挥昇腾算力性能,提升用户体验。

目前,用户需要自行编译OpenCV最新代码才能体验到接口的昇腾支持,编译方法可以参考OpenCV wiki中提供的详细指导,由于cannops模块在OpenCV扩展包中,编译还需要下载扩展包源码以及配置扩展包的保存路径:

1
2
3
4
$ git clone https://github.com/opencv/opencv_contrib.git
$ cd <opencv_build_directory>
$ cmake -DOPENCV_EXTRA_MODULES_PATH=<opencv_contrib>/modules <opencv_source_directory>
$ make -j5

如果大家在使用上遇到任何问题,欢迎反馈到OpenCV扩展包GitHub仓库的Issues页面,我们会及时为大家解决问题。

0%