def write(self, value: Any, timeout: Optional[float] = None):
    # TODO: better organize the imports
    from ray.experimental.channel import ChannelContext

    if self._inner_channel is not None:
        # If there is an inner channel, write directly to it.
        self._inner_channel.write(value, timeout)
        return

    # Otherwise no need to check timeout as the operation is non-blocking.

    # Because both the reader and writer are in the same worker process,
    # we can directly store the data in the context instead of storing
    # it in the channel object. This removes the serialization overhead of `value`.
    ctx = ChannelContext.get_current().serialization_context
    ctx.set_data(self._channel_id, value, self._num_reads)
def read(self, timeout: Optional[float] = None) -> Any:
    # TODO: better organize the imports
    from ray.experimental.channel import ChannelContext

    ctx = ChannelContext.get_current().serialization_context
    if ctx.has_data(self._channel_id):
        # No need to check timeout as the operation is non-blocking.
        # If the cache already holds the data, read it from the cache.
        return ctx.get_data(self._channel_id)
    assert (
        self._inner_channel is not None
    ), "Cannot read from the serialization context while inner channel is None."
    # Otherwise, read from the inner channel and write the value into the
    # cache so that subsequent readers can read it from there.
    value = self._inner_channel.read(timeout)
    ctx.set_data(self._channel_id, value, self._num_reads)
    return ctx.get_data(self._channel_id)
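The set_data / get_data calls above carry a read count so the cached value can be dropped once every expected reader has consumed it. Below is a minimal, self-contained sketch of that bookkeeping; it is a toy stand-in for illustration only, not Ray's actual SerializationContext.

from typing import Any, Dict, Tuple


class _ToyChannelCache:
    """Toy stand-in for the per-process serialization-context cache."""

    def __init__(self) -> None:
        # channel_id -> (value, remaining reads before eviction)
        self._data: Dict[str, Tuple[Any, int]] = {}

    def set_data(self, channel_id: str, value: Any, num_reads: int) -> None:
        self._data[channel_id] = (value, num_reads)

    def has_data(self, channel_id: str) -> bool:
        return channel_id in self._data

    def get_data(self, channel_id: str) -> Any:
        value, remaining = self._data[channel_id]
        remaining -= 1
        if remaining == 0:
            # Last reader: drop the cached value so memory is reclaimed.
            del self._data[channel_id]
        else:
            self._data[channel_id] = (value, remaining)
        return value


cache = _ToyChannelCache()
cache.set_data("chan-1", {"x": 1}, num_reads=2)
assert cache.get_data("chan-1") == {"x": 1}  # first reader
assert cache.get_data("chan-1") == {"x": 1}  # second reader, entry evicted
assert not cache.has_data("chan-1")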
def write(self, value: Any, timeout: Optional[float] = None):
    # No need to check timeout as the operation is non-blocking.

    # Because both the reader and writer are in the same worker process,
    # we can directly store the data in the context instead of storing
    # it in the channel object. This removes the serialization overhead of `value`.
    ctx = ChannelContext.get_current().serialization_context
    ctx.set_data(self._channel_id, value, self._num_readers)
def read(self, timeout: Optional[float] = None, deserialize: bool = True) -> Any:
    assert deserialize, "Data passed from the actor to itself is never serialized"
    # No need to check timeout as the operation is non-blocking.
    ctx = ChannelContext.get_current().serialization_context
    return ctx.get_data(self._channel_id)
try:
    object_ref = worker.put_object(
        value, owner_address=None, _is_experimental_channel=True
    )
except ray.exceptions.ObjectStoreFullError:
    logger.info(
        "Put failed since the value was either too large or the "
        "store was full of pinned objects."
    )
    raise
return object_ref
# Find 1 reader in a remote node to create a reference that's
# shared by all readers. When a new value is written to a reference,
# it is sent to this reference.
reader = readers[0]
fn = reader.__ray_call__
self._node_id_to_reader_ref_info[node_id] = ReaderRefInfo(
    reader_ref=ray.get(
        fn.remote(_create_channel_ref, buffer_size_bytes)
    ),
    ref_owner_actor_id=reader._actor_id,
    num_reader_actors=len(readers),
)
if not isinstance(value, SerializedObject):
    try:
        serialized_value = self._worker.get_serialization_context().serialize(
            value
        )
    except TypeError as e:
        sio = io.StringIO()
        ray.util.inspect_serializability(value, print_file=sio)
        msg = (
            "Could not serialize the put value "
            f"{repr(value)}:\n"
            f"{sio.getvalue()}"
        )
        raise TypeError(msg) from e
else:
    serialized_value = value
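The inspect_serializability helper used in the error path above is part of Ray's public ray.util API, so it can also be called directly when debugging a value that refuses to pickle. A small usage sketch (the exact report format varies across Ray versions):

import io
import threading

from ray.util import inspect_serializability

# A lock is a classic example of an unpicklable member hiding in a value.
value = {"config": {"name": "demo"}, "lock": threading.Lock()}

sio = io.StringIO()
inspect_serializability(value, print_file=sio)
print(sio.getvalue())  # the report points at the offending `lock` field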
def write(self, value: Any, timeout: Optional[float] = None) -> None:
    """Write a value to a channel.

    If the next buffer is available, it returns immediately. If the next
    buffer is not read by downstream consumers, it blocks until a buffer is
    available to write. If a buffer is not available within timeout, it
    raises RayChannelTimeoutError.
    """
    # A single channel is not supposed to read and write at the same time.
    assert self._next_read_index == 0
    self._buffers[self._next_write_index].write(value, timeout)
    self._next_write_index += 1
    self._next_write_index %= self._num_shm_buffers
def read(self, timeout: Optional[float] = None) -> Any:
    """Read a value from a channel.

    If the next buffer is available, it returns immediately. If the next
    buffer is not written by an upstream producer, it blocks until a buffer
    is available to read. If a buffer is not available within timeout, it
    raises RayChannelTimeoutError.
    """
    # A single channel is not supposed to read and write at the same time.
    assert self._next_write_index == 0
    output = self._buffers[self._next_read_index].read(timeout)
    self._next_read_index += 1
    self._next_read_index %= self._num_shm_buffers
    return output
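The two methods above are the two ends of a small ring: writer and reader each keep their own index and wrap it modulo num_shm_buffers, so with more than one buffer a write can complete while the previous value is still unread. Here is a toy model of just that index arithmetic, using single-slot queues in place of shared-memory buffers; only the wrap-around bookkeeping mirrors the real code.

import queue
from typing import Any, List, Optional


class _ToyBufferedChannel:
    """Toy model of the buffered channel's ring-index bookkeeping."""

    def __init__(self, num_buffers: int) -> None:
        # Each slot is a blocking, single-entry queue standing in for one
        # shared-memory buffer.
        self._buffers: List[queue.Queue] = [
            queue.Queue(maxsize=1) for _ in range(num_buffers)
        ]
        self._num_buffers = num_buffers
        self._next_write_index = 0
        self._next_read_index = 0

    def write(self, value: Any, timeout: Optional[float] = None) -> None:
        # Blocks if the target slot has not been read yet.
        self._buffers[self._next_write_index].put(value, timeout=timeout)
        self._next_write_index = (self._next_write_index + 1) % self._num_buffers

    def read(self, timeout: Optional[float] = None) -> Any:
        # Blocks if the target slot has not been written yet.
        value = self._buffers[self._next_read_index].get(timeout=timeout)
        self._next_read_index = (self._next_read_index + 1) % self._num_buffers
        return value


chan = _ToyBufferedChannel(num_buffers=2)
chan.write("a")
chan.write("b")          # lands in the second slot, no blocking
assert chan.read() == "a"
assert chan.read() == "b"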
(
    remote_reader_and_node_list,
    local_reader_and_node_list,
) = utils.split_readers_by_locality(self._writer, self._reader_and_node_list)
# There are some local readers which are the same worker process as the writer.
# Create a local channel for the writer and the local readers.
num_local_readers = len(local_reader_and_node_list)
if num_local_readers > 0:
    # Use num_readers = 1 when creating the local channel,
    # because we have channel cache to support reading
    # from the same channel multiple times.
    local_channel = IntraProcessChannel(num_readers=1)
    self._channels.add(local_channel)
    actor_id = self._get_actor_id(self._writer)
    self._channel_dict[actor_id] = local_channel
# There are some remote readers which are not the same Ray actor as the writer.
# Create a shared memory channel for the writer and the remote readers.
if len(remote_reader_and_node_list) != 0:
    remote_channel = BufferedSharedMemoryChannel(
        self._writer, remote_reader_and_node_list, num_shm_buffers
    )
    self._channels.add(remote_channel)

    for reader, _ in remote_reader_and_node_list:
        actor_id = self._get_actor_id(reader)
        self._channel_dict[actor_id] = remote_channel
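Taken together, the composite channel boils down to a dispatch table: the writer writes once to each distinct underlying channel, and each reader resolves its own channel by actor ID. The sketch below shows only that dispatch with an in-memory stand-in channel; the class and method names are hypothetical and this is not Ray's CompositeChannel API.

from typing import Any, Dict, Set


class _ToyLocalChannel:
    """In-memory stand-in for IntraProcessChannel / shared-memory channels."""

    def __init__(self) -> None:
        self._value: Any = None

    def write(self, value: Any) -> None:
        self._value = value

    def read(self) -> Any:
        return self._value


class _ToyCompositeChannel:
    def __init__(self) -> None:
        self._channels: Set[_ToyLocalChannel] = set()          # distinct channels
        self._channel_dict: Dict[str, _ToyLocalChannel] = {}   # actor_id -> channel

    def register(self, actor_id: str, channel: _ToyLocalChannel) -> None:
        self._channels.add(channel)
        self._channel_dict[actor_id] = channel

    def write(self, value: Any) -> None:
        # Each distinct underlying channel is written exactly once,
        # even if several readers share it.
        for channel in self._channels:
            channel.write(value)

    def read(self, actor_id: str) -> Any:
        # A reader only touches the channel registered for its actor ID.
        return self._channel_dict[actor_id].read()


composite = _ToyCompositeChannel()
local, remote = _ToyLocalChannel(), _ToyLocalChannel()
composite.register("writer-actor", local)       # writer reads its own output locally
composite.register("reader-actor-1", remote)    # remote readers share one channel
composite.register("reader-actor-2", remote)
composite.write(42)
assert composite.read("reader-actor-2") == 42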
if self._meta_channel is None and self._writer_registered:
    # We are the writer. Therefore, we also need to allocate a metadata
    # channel that will be used to send the shape and dtype of the
    # tensor to the receiver(s).
    metadata_type = SharedMemoryType()
    self._meta_channel = metadata_type.create_channel(
        self._writer,
        self._reader_and_node_list,
        None,
    )
for tensor in tensors:
    assert isinstance(
        tensor, torch.Tensor
    ), f"{tensor} must be instance of torch.Tensor"

# Send the metadata first.
metadata = self._get_send_tensors_metadata(tensors)
if metadata is not None:
    self._meta_channel.write(metadata)

for tensor in tensors:
    # TODO: If there are multiple readers, can replace with a
    # broadcast.
    for rank in self._reader_ranks:
        # Send the tensor data.
        self._nccl_group.send(tensor, rank)
bufs: List["torch.Tensor"] = []
for meta in meta_list:
    # Receive each tensor described by the metadata.
    buf = self._nccl_group.recv(
        meta.shape, meta.dtype, self._writer_rank, _torch_zeros_allocator
    )
    bufs.append(buf)
# TODO: Sync CUDA stream after receiving all tensors, instead of after
# each tensor.
return bufs
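The send/recv pair above is a two-phase protocol: shape/dtype metadata travels over the CPU metadata channel first, and only then are raw tensor bytes pushed over NCCL into a buffer the receiver pre-allocates from that metadata. The following single-process sketch mimics the same idea with a plain dict standing in for the NCCL transport; _TensorMetadata, send, and recv here are hypothetical helpers, and the example only needs torch on CPU.

from dataclasses import dataclass
from typing import List, Tuple

import torch


@dataclass
class _TensorMetadata:
    shape: Tuple[int, ...]
    dtype: torch.dtype


def send(tensors: List[torch.Tensor], transport: dict) -> None:
    # Phase 1: publish metadata so the receiver knows what to allocate.
    transport["meta"] = [_TensorMetadata(tuple(t.shape), t.dtype) for t in tensors]
    # Phase 2: push the raw tensor data (the NCCL send in the real code).
    transport["data"] = [t.clone() for t in tensors]


def recv(transport: dict) -> List[torch.Tensor]:
    bufs = []
    for meta, payload in zip(transport["meta"], transport["data"]):
        # Allocate the receive buffer from metadata (the zeros allocator in
        # the real code), then fill it with the incoming data.
        buf = torch.zeros(meta.shape, dtype=meta.dtype)
        buf.copy_(payload)
        bufs.append(buf)
    return bufs


transport = {}
send([torch.arange(4, dtype=torch.float32)], transport)
print(recv(transport))  # [tensor([0., 1., 2., 3.])]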
try:
    # Serialize the data. All tensors that match our current device
    # will be extracted into the serialization context and replaced
    # with a placeholder.
    cpu_data = self._worker.get_serialization_context().serialize(value)
except TypeError as e:
    sio = io.StringIO()
    ray.util.inspect_serializability(value, print_file=sio)
    msg = (
        "Could not serialize the put value "
        f"{repr(value)}:\n"
        f"{sio.getvalue()}"
    )
    raise TypeError(msg) from e
finally:
    # Pop the tensors that were found during serialization of `value`.
    gpu_tensors, _ = self.serialization_ctx.reset_out_of_band_tensors([])
    # Reset the serialization method to now serialize torch.Tensors
    # normally.
    self.serialization_ctx.set_use_external_transport(False)
# First send the extracted tensors through a GPU-specific channel.
self._gpu_data_channel.write(gpu_tensors)

# Next send the non-tensor data through a CPU-specific channel. The
# data contains placeholders for the extracted tensors.
self._cpu_data_channel.write(cpu_data)
# Next, read and deserialize the non-tensor data. The registered custom
# deserializer will replace the found tensor placeholders with
# `tensors`.
data = self._cpu_data_channel.read(
    timeout=timeout,
)
# Check that all placeholders had a corresponding tensor.
(
    _,
    deserialized_tensor_placeholders,
) = self.serialization_ctx.reset_out_of_band_tensors([])
assert deserialized_tensor_placeholders == set(range(len(tensors)))
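Putting the write and read paths together: during serialization every torch.Tensor is swapped for an integer placeholder and collected out of band, and on the read side the tensors received over NCCL are substituted back by placeholder index. Below is a minimal sketch of that placeholder mechanism; extract_tensors and restore_tensors are hypothetical helpers, not Ray's serializer hooks, and the example runs on CPU.

from typing import Any, List

import torch


def extract_tensors(value: Any, out: List[torch.Tensor]) -> Any:
    """Replace every tensor in a nested structure with a placeholder index."""
    if isinstance(value, torch.Tensor):
        out.append(value)
        return ("__placeholder__", len(out) - 1)
    if isinstance(value, dict):
        return {k: extract_tensors(v, out) for k, v in value.items()}
    if isinstance(value, (list, tuple)):
        return type(value)(extract_tensors(v, out) for v in value)
    return value


def restore_tensors(value: Any, tensors: List[torch.Tensor]) -> Any:
    """Substitute received tensors back in place of the placeholders."""
    if isinstance(value, tuple) and len(value) == 2 and value[0] == "__placeholder__":
        return tensors[value[1]]
    if isinstance(value, dict):
        return {k: restore_tensors(v, tensors) for k, v in value.items()}
    if isinstance(value, (list, tuple)):
        return type(value)(restore_tensors(v, tensors) for v in value)
    return value


gpu_tensors: List[torch.Tensor] = []
cpu_data = extract_tensors({"logits": torch.ones(2), "step": 3}, gpu_tensors)
# cpu_data travels over the CPU channel, gpu_tensors over the GPU channel.
result = restore_tensors(cpu_data, gpu_tensors)
assert torch.equal(result["logits"], torch.ones(2)) and result["step"] == 3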