Ray Accelerated DAG: A Source Code Walkthrough

Communication between nodes

Communicator is the mechanism for moving data between different actors; two kinds are currently supported, a CPU communicator and a GPU communicator.

(PlantUML diagram)

A Communicator mainly records metadata about the distributed computation, such as world_size and rank, and implements the all_reduce interface. The CPU variant communicates through shared memory (how does that work across nodes?), while the GPU variant uses NCCL for out-of-band communication between devices.

The core of a Communicator is the communication primitives send, recv, and allreduce. Because the CPU path uses shared memory, it does not need to implement send and recv. Its all_reduce relies on a CPUCommBarrier to wait until all actors have finished their computation and then applies the reduction op: the barrier is a remote object that every participating actor calls with its data, and once the reduction is done each actor receives the reduced result. The GPU path implements these operations with the NCCL library.
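
To make the barrier idea concrete, here is a minimal, self-contained sketch of an all-reduce built around a barrier actor. This is not Ray's actual CPUCommBarrier; the Barrier actor and its all_reduce method are hypothetical, and only the plain ray.remote / ray.get API is used.

import asyncio

import ray

@ray.remote
class Barrier:
    """Hypothetical barrier actor: collect one value per rank, reduce, fan out."""

    def __init__(self, world_size: int):
        self.world_size = world_size
        self.values = {}
        self.result = None

    async def all_reduce(self, rank: int, value: float) -> float:
        self.values[rank] = value
        if len(self.values) == self.world_size:
            # The last participant to arrive performs the reduction (a sum here).
            self.result = sum(self.values.values())
        # Wait cooperatively until the reduction is available, so that every
        # caller gets the reduced value back.
        while self.result is None:
            await asyncio.sleep(0.01)
        return self.result

ray.init()
world_size = 4
barrier = Barrier.remote(world_size)
# In the real design each participating actor calls the barrier from inside its
# task; here the calls are issued from the driver for brevity.
refs = [barrier.all_reduce.remote(rank, float(rank + 1)) for rank in range(world_size)]
print(ray.get(refs))  # [10.0, 10.0, 10.0, 10.0]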

Data transfer channels

A Channel is the data transfer pipe between actors: each channel has exactly one writer and one or more readers.

(PlantUML diagram)

write puts a value into the channel and read takes a value out. Both are synchronous: if a write finds data that has not been read yet, or a read finds no data available, the call blocks; the blocking time can be bounded with a timeout.
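
The blocking semantics are essentially those of a bounded, single-slot queue. The toy class below only illustrates the write/read/timeout contract with a capacity-1 queue.Queue; it is not Ray's shared-memory implementation, which also supports multiple readers per value.

import queue
from typing import Any, Optional

class ToyChannel:
    """Single-writer/single-reader illustration of the blocking contract."""

    def __init__(self) -> None:
        # Capacity 1: a write blocks while the previous value is still unread.
        self._slot: "queue.Queue[Any]" = queue.Queue(maxsize=1)

    def write(self, value: Any, timeout: Optional[float] = None) -> None:
        # Blocks until the slot is free; raises queue.Full on timeout
        # (the real channel raises RayChannelTimeoutError instead).
        self._slot.put(value, timeout=timeout)

    def read(self, timeout: Optional[float] = None) -> Any:
        # Blocks until a value is available; raises queue.Empty on timeout.
        return self._slot.get(timeout=timeout)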

CachedChannel

A CachedChannel is a channel whose single write needs to be read multiple times. It can wrap an inner ChannelInterface, in which case it acts as a wrapper that caches the value so readers can consume it repeatedly. Without an inner ChannelInterface, the written value is stored in the ChannelContext singleton and served from there for the repeated reads.

The serialization_context inside ChannelContext stores the written values in a dictionary; once a value has been read the configured number of times, it is removed from the dictionary.
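
A minimal sketch of that bookkeeping, assuming hypothetical set_data / get_data / has_data methods modeled on the behavior described above (the real SerializationContext does more, e.g. tensor handling):

from typing import Any, Dict, Tuple

class ReadCountedCache:
    """Store each value together with the number of reads it must serve."""

    def __init__(self) -> None:
        self._data: Dict[str, Tuple[Any, int]] = {}

    def set_data(self, channel_id: str, value: Any, num_reads: int) -> None:
        self._data[channel_id] = (value, num_reads)

    def has_data(self, channel_id: str) -> bool:
        return channel_id in self._data

    def get_data(self, channel_id: str) -> Any:
        value, remaining = self._data[channel_id]
        remaining -= 1
        if remaining == 0:
            # Last expected reader: evict the entry so the memory is reclaimed.
            del self._data[channel_id]
        else:
            self._data[channel_id] = (value, remaining)
        return value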

def write(self, value: Any, timeout: Optional[float] = None):
    # TODO: better organize the imports
    from ray.experimental.channel import ChannelContext

    if self._inner_channel is not None:
        # If there is an inner channel, write straight through to it.
        self._inner_channel.write(value, timeout)
        return

    # Otherwise no need to check timeout as the operation is non-blocking.

    # Because both the reader and writer are in the same worker process,
    # we can directly store the data in the context instead of storing
    # it in the channel object. This removes the serialization overhead of `value`.
    ctx = ChannelContext.get_current().serialization_context
    ctx.set_data(self._channel_id, value, self._num_reads)

def read(self, timeout: Optional[float] = None) -> Any:
    # TODO: better organize the imports
    from ray.experimental.channel import ChannelContext

    ctx = ChannelContext.get_current().serialization_context
    if ctx.has_data(self._channel_id):
        # No need to check timeout as the operation is non-blocking.
        # If the cache already holds the value, serve it from the cache.
        return ctx.get_data(self._channel_id)

    assert (
        self._inner_channel is not None
    ), "Cannot read from the serialization context while inner channel is None."
    # Cache miss: read from the inner channel, then cache the value so the
    # remaining readers can read it again.
    value = self._inner_channel.read(timeout)
    ctx.set_data(self._channel_id, value, self._num_reads)
    return ctx.get_data(self._channel_id)

IntraProcessChannel

If two tasks execute in the same worker process (can one worker run several tasks at once, or are they executed sequentially?), the data is passed directly through the serialization_context (is locking unnecessary because this is not multi-threaded and the tasks run one after another?).

def write(self, value: Any, timeout: Optional[float] = None):
    # No need to check timeout as the operation is non-blocking.

    # Because both the reader and writer are in the same worker process,
    # we can directly store the data in the context instead of storing
    # it in the channel object. This removes the serialization overhead of `value`.
    ctx = ChannelContext.get_current().serialization_context
    ctx.set_data(self._channel_id, value, self._num_readers)

def read(self, timeout: Optional[float] = None, deserialize: bool = True) -> Any:
    assert deserialize, "Data passed from the actor to itself is never serialized"
    # No need to check timeout as the operation is non-blocking.
    ctx = ChannelContext.get_current().serialization_context
    return ctx.get_data(self._channel_id)

Channel

A Channel supports data transfer within a node as well as across nodes. It is a wrapper around a ray.ObjectRef and moves data through the Plasma shared-memory object store.

  • When the channel is created, the actor acting as the writer uses the global worker to put_object an all-zero buffer;
worker = ray._private.worker.global_worker
worker.check_connected()

value = b"0" * buffer_size_bytes

try:
    object_ref = worker.put_object(
        value, owner_address=None, _is_experimental_channel=True
    )
except ray.exceptions.ObjectStoreFullError:
    logger.info(
        "Put failed since the value was either too large or the "
        "store was full of pinned objects."
    )
    raise
return object_ref
  • Then, on every node that hosts readers but is not the writer's node, one reference object is created. One per node is enough, because all readers on that node can fetch the data from it (why create a reference on each reader node, into which the data has to be copied, rather than just passing the writer_ref over?).
# Find 1 reader in a remote node to create a reference that's
# shared by all readers. When a new value is written to a reference,
# it is sent to this reference.
reader = readers[0]
fn = reader.__ray_call__
self._node_id_to_reader_ref_info[node_id] = ReaderRefInfo(
    reader_ref=ray.get(
        fn.remote(_create_channel_ref, buffer_size_bytes)
    ),
    ref_owner_actor_id=reader._actor_id,
    num_reader_actors=len(readers),
)
  • When writing to the channel, the value is serialized first and then written directly into the channel; this part calls into the core_worker C++ code. The written payload may also be a resize-buffer (_ResizeChannel) message.
if not isinstance(value, SerializedObject):
    try:
        serialized_value = self._worker.get_serialization_context().serialize(
            value
        )
    except TypeError as e:
        sio = io.StringIO()
        ray.util.inspect_serializability(value, print_file=sio)
        msg = (
            "Could not serialize the put value "
            f"{repr(value)}:\n"
            f"{sio.getvalue()}"
        )
        raise TypeError(msg) from e
else:
    serialized_value = value

start_time = time.monotonic()
self._resize_channel_if_needed(serialized_value, timeout_ms)
if timeout is not None:
    timeout_ms -= int((time.monotonic() - start_time) * 1000)
    timeout_ms = max(timeout_ms, 0)

self._worker.core_worker.experimental_channel_put_serialized(
    serialized_value,
    self._writer_ref,
    self._num_local_readers,
    timeout_ms,
)
  • When reading, if the value turns out to be a resize-buffer message, the reader re-registers itself against the new reference and then reads again.
ret = self._worker.get_objects(
    [self._local_reader_ref], timeout=timeout, return_exceptions=True
)[0][0]

if isinstance(ret, _ResizeChannel):
    self._node_id_to_reader_ref_info = ret._node_id_to_reader_ref_info
    self._local_reader_ref = self._get_local_reader_ref(
        self._node_id_to_reader_ref_info
    )
    # We need to register the new reader_ref.
    self._reader_registered = False
    self.ensure_registered_as_reader()
    if timeout is not None:
        timeout -= time.monotonic() - start_time
        timeout = max(timeout, 0)
    ret = self._worker.get_objects(
        [self._local_reader_ref], timeout=timeout, return_exceptions=True
    )[0][0]

BufferedSharedMemoryChannel

BufferedSharedMemoryChannel is a wrapper around the Channel above: it creates several buffers and writes to and reads from them in round-robin fashion.

def write(self, value: Any, timeout: Optional[float] = None) -> None:
    """Write a value to a channel.

    If the next buffer is available, it returns immediately. If the next
    buffer is not read by downstream consumers, it blocks until a buffer is
    available to write. If a buffer is not available within timeout, it raises
    RayChannelTimeoutError.
    """
    # A single channel is not supposed to read and write at the same time.
    assert self._next_read_index == 0
    self._buffers[self._next_write_index].write(value, timeout)
    self._next_write_index += 1
    self._next_write_index %= self._num_shm_buffers

def read(self, timeout: Optional[float] = None) -> Any:
    """Read a value from a channel.

    If the next buffer is available, it returns immediately. If the next
    buffer is not written by an upstream producer, it blocks until a buffer is
    available to read. If a buffer is not available within timeout, it raises
    RayChannelTimeoutError.
    """
    # A single channel is not supposed to read and write at the same time.
    assert self._next_write_index == 0
    output = self._buffers[self._next_read_index].read(timeout)
    self._next_read_index += 1
    self._next_read_index %= self._num_shm_buffers
    return output

CompositeChannel

A CompositeChannel allows one writer to write its data into several underlying channels: for readers that are the same actor as the writer it creates an IntraProcessChannel, and for the remaining readers it creates a BufferedSharedMemoryChannel.

(
    remote_reader_and_node_list,
    local_reader_and_node_list,
) = utils.split_readers_by_locality(self._writer, self._reader_and_node_list)
# There are some local readers which are the same worker process as the writer.
# Create a local channel for the writer and the local readers.
num_local_readers = len(local_reader_and_node_list)
if num_local_readers > 0:
    # Use num_readers = 1 when creating the local channel,
    # because we have channel cache to support reading
    # from the same channel multiple times.
    local_channel = IntraProcessChannel(num_readers=1)
    self._channels.add(local_channel)
    actor_id = self._get_actor_id(self._writer)
    self._channel_dict[actor_id] = local_channel
# There are some remote readers which are not the same Ray actor as the writer.
# Create a shared memory channel for the writer and the remote readers.
if len(remote_reader_and_node_list) != 0:
    remote_channel = BufferedSharedMemoryChannel(
        self._writer, remote_reader_and_node_list, num_shm_buffers
    )
    self._channels.add(remote_channel)

    for reader, _ in remote_reader_and_node_list:
        actor_id = self._get_actor_id(reader)
        self._channel_dict[actor_id] = remote_channel

On write, the value is written to every underlying channel in a loop; on read, only the channel keyed by the current reader's actor ID is read.

(It looks like a single channel object is held by both the writer and the readers?)

def write(self, value: Any, timeout: Optional[float] = None) -> None:
    self.ensure_registered_as_writer()
    for channel in self._channels:
        channel.write(value, timeout)

def read(self, timeout: Optional[float] = None) -> Any:
    self.ensure_registered_as_reader()
    return self._channel_dict[self._resolve_actor_id()].read(timeout)

_TorchTensorNcclChannel

This channel transfers torch.Tensor data only and cannot carry any other data that would go over the CPU. Before it can be used, _init_communicator must be called to initialize the nccl_group, and the ranks of the writer and readers within the nccl_group are then resolved from the writer and reader actors.

self._writer_rank = self._nccl_group.get_rank(self._writer)
self._reader_ranks = [
    self._nccl_group.get_rank(reader)
    for reader, _ in self._reader_and_node_list
]

if (
    self._writer_rank is not None
    and self._writer_rank == self._nccl_group.get_self_rank()
):
    self._writer_registered = True

if (
    self._reader_ranks
    and self._nccl_group.get_self_rank() in self._reader_ranks
):
    self._reader_registered = True

If no channel for the metadata (shape and dtype) is specified, a meta_channel is additionally created to transfer the metadata over the CPU.

if self._meta_channel is None and self._writer_registered:
    # We are the writer. Therefore, we also need to allocate a metadata
    # channel that will be used to send the shape and dtype of the
    # tensor to the receiver(s).
    metadata_type = SharedMemoryType()
    self._meta_channel = metadata_type.create_channel(
        self._writer,
        self._reader_and_node_list,
        None,
    )

When sending, the metadata is handled first: all tensors are traversed and their shapes and dtypes are collected into a metadata list. If a static tensor type is configured, the metadata is also validated, and a mismatch raises an error.

When receiving, the meta_channel is read first to obtain the metadata, and then the tensor data is read.

def write(
    self,
    tensors: List["torch.Tensor"],
    timeout: Optional[float] = None,
):
    import torch

    for tensor in tensors:
        assert isinstance(
            tensor, torch.Tensor
        ), f"{tensor} must be instance of torch.Tensor"

    # Send the metadata first.
    metadata = self._get_send_tensors_metadata(tensors)
    if metadata is not None:
        self._meta_channel.write(metadata)

    for tensor in tensors:
        # TODO: If there are multiple readers, can replace with a
        # broadcast.
        for rank in self._reader_ranks:
            # Then send the tensor data over NCCL.
            self._nccl_group.send(tensor, rank)
def read(
    self,
    timeout: Optional[float] = None,
) -> Union["torch.Tensor", List["torch.Tensor"]]:
    # Receive all of the metadata first.
    meta_list: List[_TorchTensorMetadata] = self._get_recv_tensors_metadata(timeout)

    bufs: List["torch.Tensor"] = []
    for meta in meta_list:
        # Then receive each tensor.
        buf = self._nccl_group.recv(
            meta.shape, meta.dtype, self._writer_rank, _torch_zeros_allocator
        )
        bufs.append(buf)
    # TODO: Sync CUDA stream after receiving all tensors, instead of after
    # each tensor.
    return bufs

TorchTensorNcclChannel

This channel can send mixed data that contains torch.Tensor objects. It holds a cpu_data_channel and a gpu_data_channel (a _TorchTensorNcclChannel). Before sending, the value is serialized; serialization records each torch.Tensor it encounters, leaves a placeholder for it in the value, and serializes the rest of the data. The tensor data itself is never serialized and is sent directly over NCCL. If the type hint is marked _direct_return, the payload consists solely of torch.Tensor data, so the serialization of value can be skipped for better performance.

try:
    # Serialize the data. All tensors that match our current device
    # will be extracted into the serialization context and replaced
    # with a placeholder.
    cpu_data = self._worker.get_serialization_context().serialize(value)
except TypeError as e:
    sio = io.StringIO()
    ray.util.inspect_serializability(value, print_file=sio)
    msg = (
        "Could not serialize the put value "
        f"{repr(value)}:\n"
        f"{sio.getvalue()}"
    )
    raise TypeError(msg) from e
finally:
    # Pop the tensors that were found during serialization of `value`.
    gpu_tensors, _ = self.serialization_ctx.reset_out_of_band_tensors([])
    # Reset the serialization method to now serialize torch.Tensors
    # normally.
    self.serialization_ctx.set_use_external_transport(False)

# First send the extracted tensors through a GPU-specific channel.
self._gpu_data_channel.write(gpu_tensors)
# Next send the non-tensor data through a CPU-specific channel. The
# data contains placeholders for the extracted tensors.
self._cpu_data_channel.write(cpu_data)

On read, the tensor data is read first and then the CPU data; during this step the placeholders are swapped back for the tensors, and the reconstructed value is returned.

# `tensors` has already been read from the GPU data channel earlier in read().
self.serialization_ctx.reset_out_of_band_tensors(tensors)

# Next, read and deserialize the non-tensor data. The registered custom
# deserializer will replace the found tensor placeholders with
# `tensors`.
data = self._cpu_data_channel.read(
    timeout=timeout,
)
# Check that all placeholders had a corresponding tensor.
(
    _,
    deserialized_tensor_placeholders,
) = self.serialization_ctx.reset_out_of_band_tensors([])
assert deserialized_tensor_placeholders == set(range(len(tensors)))

return data

ChannelOutputType

Channels are usually created from a DAGNode's output type: a ChannelOutputType object is constructed first, and its create_channel method is then called to create the channel.

This type describes the channel's output type. There are currently two subclasses, SharedMemoryType and TorchTensorType. SharedMemoryType is the simpler one: it only provides a create_channel method, which creates a CompositeChannel.

TorchTensorType additionally registers custom serialization and deserialization functions. When NCCL is used, torch.Tensor objects are sent directly over NCCL and a placeholder is recorded in the serialized form of the enclosing value. Without NCCL, tensors are converted to numpy arrays, serialized, and sent over the CPU path, which adds extra overhead.

def register_custom_serializer(self) -> None:
    super().register_custom_serializer()

    import torch

    def serialize(t):
        ctx = ChannelContext.get_current()
        return ctx.serialization_context.serialize_tensor(t)

    def deserialize(b):
        ctx = ChannelContext.get_current()
        return ctx.serialization_context.deserialize_tensor(b)

    ray.util.serialization.register_serializer(
        torch.Tensor,
        serializer=serialize,
        deserializer=deserialize,
    )

Reader/WriterInterface

The Reader/WriterInterface classes aggregate the relevant channels from an actor's point of view: an actor may read several inputs and write its output to several channels. Each interface has two subclasses, a synchronous one and an asynchronous one. The synchronous interface is straightforward: it simply reads every value from its channel list. The asynchronous interface waits for data using asyncio.
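
As a rough sketch of the synchronous side (the class and method names mirror the description above, not the exact Ray classes):

from typing import Any, List, Optional

class SyncReaderInterface:
    """Read one value from each of an actor's input channels, in order."""

    def __init__(self, input_channels: List[Any]) -> None:
        self._input_channels = input_channels

    def read(self, timeout: Optional[float] = None) -> List[Any]:
        # Block on each channel in turn; the timeout bounds each individual read.
        return [channel.read(timeout) for channel in self._input_channels]

class SyncWriterInterface:
    """Write an actor's output value to each of its output channels."""

    def __init__(self, output_channels: List[Any]) -> None:
        self._output_channels = output_channels

    def write(self, value: Any, timeout: Optional[float] = None) -> None:
        for channel in self._output_channels:
            channel.write(value, timeout)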

Generating the execution schedule

The DAGOperationGraph is a finer-grained task graph; building it makes the dependencies between tasks explicit and makes it straightforward to generate a schedule for each actor.

DAGOperationGraphNode

A DAGOperationGraphNode represents one of three operation types: READ, COMPUTE, or WRITE. After the DAG is compiled, a DAGOperationGraph is generated, and from this graph a schedule is derived for every actor. When the compiled graph executes, each actor runs through the tasks in its schedule in a loop until the channels are closed.

What DAGOperationGraphNode mainly implements is a priority comparison used to order the task priority queue. (What is the difference between exec_task_id and task_id? Is the compare function here total?)

Internally it implements a compare function whose logic is: if the exec_task_ids differ, compare the operations' exec_task_ids; otherwise compare their task_ids.

  1. If the two task nodes belong to the same actor, order them with compare;
  2. If one of the two nodes requires NCCL and the other does not, the one that does not need NCCL has higher priority;
  3. If both depend on NCCL, fall back to compare as well (sketched in code below).
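
A condensed sketch of that ordering, assuming nodes expose actor_id, requires_nccl, exec_task_idx, and task_idx attributes (the names are approximated from the description, not Ray's exact fields):

from dataclasses import dataclass

@dataclass
class OpNode:
    actor_id: str
    requires_nccl: bool
    exec_task_idx: int
    task_idx: int

def _compare(a: OpNode, b: OpNode) -> bool:
    # Tie-break by exec_task_idx first, then by task_idx.
    if a.exec_task_idx != b.exec_task_idx:
        return a.exec_task_idx < b.exec_task_idx
    return a.task_idx < b.task_idx

def less_than(a: OpNode, b: OpNode) -> bool:
    """Priority rules: same actor -> compare; non-NCCL beats NCCL; else compare."""
    if a.actor_id == b.actor_id:
        return _compare(a, b)
    if a.requires_nccl != b.requires_nccl:
        # The node that does not need NCCL gets higher priority.
        return not a.requires_nccl
    return _compare(a, b)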

_add_edge && _build_dag_node_operation_graph

This function establishes the dependencies between all graph nodes in preparation for the topological sort. While _build_dag_node_operation_graph builds the graph, it calls this function to add every edge between nodes. Each edge is recorded in the in_edges and out_edges of the nodes at its two ends.

At compile time, every node of the DAG computation graph produces three DAGOperationGraphNodes: READ, COMPUTE, and WRITE. All nodes are traversed and connected according to the following rules:

  1. Within the same task, connect the READ node to the COMPUTE node, and the COMPUTE node to the WRITE node.
  2. For COMPUTE nodes with different bind_index on the same actor, add edges in bind order (this appears to guarantee the actor's execution order).
  3. For each task's downstream nodes, connect the task's WRITE node to the downstream READ node.

Note that a ClassMethodNode of the class_method_output kind has its upstream node connected directly to its downstream nodes, because a class_method_output node only holds data and does not take part in the dependency relation itself.
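
The edge bookkeeping itself is simple; the sketch below shows its shape with a minimal node representation (the identifiers are illustrative, not Ray's exact ones):

from collections import defaultdict
from typing import Dict, Set, Tuple

# A graph node is identified by (task_idx, op_type), where op_type is
# "READ", "COMPUTE", or "WRITE".
NodeId = Tuple[int, str]

in_edges: Dict[NodeId, Set[NodeId]] = defaultdict(set)
out_edges: Dict[NodeId, Set[NodeId]] = defaultdict(set)

def add_edge(src: NodeId, dst: NodeId) -> None:
    # Record the edge on both endpoints so in-degrees can be tracked later.
    out_edges[src].add(dst)
    in_edges[dst].add(src)

# Rule 1: READ -> COMPUTE -> WRITE within one task.
task_idx = 0
add_edge((task_idx, "READ"), (task_idx, "COMPUTE"))
add_edge((task_idx, "COMPUTE"), (task_idx, "WRITE"))

# Rule 3: the task's WRITE feeds the READ of a downstream task.
downstream_idx = 1
add_edge((task_idx, "WRITE"), (downstream_idx, "READ"))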

_generate_actor_to_execution_schedule

This builds each actor's schedule from the operation graph.

  1. First find the nodes with in-degree 0; these can execute right away;
  2. Then look at the successors of those nodes;
  3. For each successor, check whether its in-degree has dropped to 0 (or whether all of its ready_collective_idxs are ready); if so, it can execute.

This repeats until every node has been visited, which yields the execution order of the nodes on each actor. If a node is a COMPUTE node that requires NCCL, a collective node follows it; once the current node becomes ready, it is recorded in the collective node's ready_collective_idxs so that the collective node can easily check whether it is ready itself.
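
Conceptually this is a topological sort driven by a queue of ready nodes. A simplified sketch that ignores the NCCL collective bookkeeping (function and argument names are illustrative):

import heapq
from collections import defaultdict
from typing import Dict, List, Set, Tuple

NodeId = Tuple[int, str]  # (task_idx, op_type)

def generate_schedules(
    in_edges: Dict[NodeId, Set[NodeId]],
    out_edges: Dict[NodeId, Set[NodeId]],
    node_to_actor: Dict[NodeId, str],
) -> Dict[str, List[NodeId]]:
    """Return, for each actor, the order in which its operations should run."""
    indegree = {node: len(srcs) for node, srcs in in_edges.items()}
    for node in node_to_actor:
        # Nodes that never appear as an edge destination start at in-degree 0.
        indegree.setdefault(node, 0)

    ready = [node for node, deg in indegree.items() if deg == 0]
    heapq.heapify(ready)  # the real code orders ready nodes by the priority above

    schedules: Dict[str, List[NodeId]] = defaultdict(list)
    while ready:
        node = heapq.heappop(ready)
        schedules[node_to_actor[node]].append(node)
        for succ in out_edges.get(node, set()):
            indegree[succ] -= 1
            if indegree[succ] == 0:
                heapq.heappush(ready, succ)
    return dict(schedules)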

_generate_overlapped_execution_schedule

This method reorders each actor's already-built schedule so that computation and communication can overlap; currently only NCCL READ operations are handled. The approach is to scan backwards from the NCCL READ until a COMPUTE operation is found and insert the READ before that COMPUTE; because the read is asynchronous, the read and the COMPUTE can then run in parallel.
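
A list-reordering sketch of that transformation; operations are simplified to strings, whereas the real code manipulates graph nodes:

from typing import List

def overlap_nccl_reads(schedule: List[str]) -> List[str]:
    """Move each 'NCCL_READ' in front of the nearest COMPUTE that precedes it."""
    result = list(schedule)
    i = 0
    while i < len(result):
        if result[i] == "NCCL_READ":
            # Scan backwards for the closest preceding COMPUTE.
            for j in range(i - 1, -1, -1):
                if result[j] == "COMPUTE":
                    result.insert(j, result.pop(i))
                    break
        i += 1
    return result

print(overlap_nccl_reads(["READ", "COMPUTE", "WRITE", "NCCL_READ", "COMPUTE", "WRITE"]))
# ['READ', 'NCCL_READ', 'COMPUTE', 'WRITE', 'COMPUTE', 'WRITE']
# The NCCL read is now posted before the preceding COMPUTE, so the asynchronous
# receive overlaps with that computation.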

Compiling the computation graph

Roughly speaking, compiling the computation graph means creating channels for the data dependencies between the DAG's nodes and then starting all of the tasks; each task is split into three operations, read, compute, and write, which are executed in a loop.

This avoids the scheduling overhead of creating resources on the fly, and it also supports out-of-band communication such as NCCL, avoiding the cost of host copies and transfers.
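
From the user's point of view all of this machinery sits behind experimental_compile. A minimal end-to-end example based on the public Compiled Graphs API (details may vary across Ray versions):

import ray
from ray.dag import InputNode

@ray.remote
class Worker:
    def double(self, x):
        return 2 * x

ray.init()
worker = Worker.remote()

# Build the DAG lazily, then compile it: channels are created and each actor
# starts its read/compute/write loop.
with InputNode() as inp:
    dag = worker.double.bind(inp)
compiled_dag = dag.experimental_compile()

# Each execute() call pushes a value through the pre-built channels.
for i in range(3):
    print(ray.get(compiled_dag.execute(i)))  # 0, 2, 4

compiled_dag.teardown()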

ExecutableTask

An ExecutableTask roughly corresponds to a DAGNode but carries extra metadata, such as its input and output channels. The task wraps all of its inputs into a ReaderInterface and all of its outputs into a WriterInterface.

The task exposes an exec_operation method, which decides from the given op_type whether to perform the read, the compute, or the write.

def exec_operation(
    self,
    class_handle,
    op_type: _DAGNodeOperationType,
    overlap_gpu_communication: bool = False,
) -> bool:
    if op_type == _DAGNodeOperationType.READ:
        with _device_context_manager():
            with self._recv_stream:
                return self._read(overlap_gpu_communication)
    elif op_type == _DAGNodeOperationType.COMPUTE:
        return self._compute(overlap_gpu_communication, class_handle)
    elif op_type == _DAGNodeOperationType.WRITE:
        with _device_context_manager():
            with self._send_stream:
                return self._write()

To support overlapping GPU computation with communication, this also involves AwaitableBackgroundReader and AwaitableBackgroundWriter as well as GPUFuture; in other words, the READ and COMPUTE operations can return (or operate on) a future object.
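
The overlap pattern itself is the familiar future-based one. A CPU-only sketch with concurrent.futures (the real code uses CUDA streams and a GPUFuture wrapper rather than threads):

import time
from concurrent.futures import ThreadPoolExecutor

def nccl_read():
    """Stand-in for an asynchronous NCCL receive."""
    time.sleep(0.5)
    return "tensor"

def compute(x):
    """Stand-in for the COMPUTE operation that precedes the read in the schedule."""
    time.sleep(0.5)
    return f"computed({x})"

with ThreadPoolExecutor(max_workers=1) as pool:
    # Post the read first (this is what moving the NCCL READ earlier achieves)...
    read_future = pool.submit(nccl_read)
    # ...then run the compute while the read is in flight.
    result = compute("previous input")
    # Only block on the read when its value is actually needed.
    tensor = read_future.result()
print(result, tensor)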

CompiledDAG

The compilation flow is fairly long; the two key functions are preprocess and get_or_compile.

preprocess

  1. Find the InputNode and OutputNode, and check that every task is backed by an actor; plain (non-actor) tasks are not supported yet;
  2. Configure the out-of-band communication information and collect the collective_ops;
  3. Process upstream nodes, including setting type_hints and recording upstream actors;
  4. Initialize the communicator based on the out-of-band communication information.

get_or_compile

  1. Traverse the DAG breadth-first from the input_task and create all channels;
  2. Run deadlock detection (the logic is not implemented yet);
  3. For each task, build a map from every arg to its consumers;
  4. If an arg has multiple consumers, create a CachedChannel so it can be read several times;
  5. Create an ExecutableTask for every task;
  6. Build every task's execution schedule via build_execution_schedule:
    1. Create the DAGOperationGraph nodes, one READ, COMPUTE, and WRITE per task;
    2. Build the DAGOperationGraph from the dependencies;
    3. Generate each actor's execution schedule from a topological sort of the graph;
    4. If overlapped GPU execution (overlapped_execution) is enabled, adjust the actors' schedules;
  7. Hand the schedules to exec_task_func (do_exec_tasks) for execution; this function is an infinite loop that keeps executing the operations in the schedule (see the sketch after this list);
  8. Start a Monitor thread that periodically checks how all tasks are running and terminates execution if an exception occurs.
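
The driving loop described in step 7 is conceptually just the following (a sketch; the real do_exec_tasks also handles setup, teardown signals, and exceptions):

from typing import Any, List, Tuple

def do_exec_tasks_sketch(tasks: List[Any], schedule: List[Tuple[int, Any]]) -> None:
    """Keep executing the scheduled operations until a task reports it is done.

    `schedule` is assumed to hold (task_idx, op_type) pairs, and each task to
    expose exec_operation() as shown for ExecutableTask above.
    """
    done = False
    while not done:
        for task_idx, op_type in schedule:
            # exec_operation returns True once the channels are closed and
            # the loop should exit.
            done = tasks[task_idx].exec_operation(None, op_type)
            if done:
                break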

Other points worth noting

  1. Throughout the code, accelerator communication is named xxx_nccl, but only NVIDIA's collective library is actually called NCCL; this should be renamed to avoid ambiguity. Likewise, gpu_xxx should become device_xxx, and so on.
  2. TorchTensorType's transport argument accepts a Communicator object, but the validation only checks whether the name is one of AUTO, NCCL, or CPU; this looks like a bug, and the name of the passed-in Communicator should be accepted as well. Also, AUTO currently has no real effect and simply falls back to CPU communication; AUTO could instead detect the hardware automatically and initialize the corresponding Communicator.
  3. The only custom Communicator implemented so far is CPUCommunicator; to make debugging easier, documentation and examples should be added to guide users in writing their own Communicator.