解析:分布式应用框架Ray架构源码 -4

Actor 管理

Actor 创建

Actor的创建任务是由GCS服务来进行调度的,如下图

  • 在Python代码中创建一个Actor时,负责创建的worker首先同步注册actor到GCS中,这样可以确保Actor被创建之前worker就failure的场景下,所有的worker使用Actor的reference都可以发现这个failure
  • 一旦actor创建任务的所有输入依赖都被解析完成,creator会将task specification 发送给GCS Service。 GCS Service会通过分布式调度协议(与普通task一样)调度创建actor的task,这是GCS就好像是这个创建task的owner, 由于GCS server会将所有的状态数据持久化到后端存储,一旦task specification被成功的发给了GCS Service, actor会立刻被创建出来。
  • 句柄的原始创建者可以开始在actor句柄上提交任务,甚至在GCS调度参与者之前将其传递给其他任务/参与者。创建actor后,GCS会通过Pub-sub通知任何具有actor句柄的worker。每个句柄都缓存新创建的actor的运行时元数据(例如,RPC地址和它所在的节点),在参与者句柄上提交的任何挂起任务都可以发送到actor执行。

Actor 任务执行

  • 每个actor可以有无限数量的调用者,actor句柄表示单个调用者:它包含它所引用的演员的RPC地址。呼叫工作者连接并将任务提交到此地址。

如上图,创建后,actor任务将转换为对actor进程的直接gRPC调用。一个actor可以处理许多并发呼叫,尽管在这里我们只显示一个。

  • 每个提交的任务都在调用方端分配了一个序列号,接收方使用该序列号来确保每个调用方的任务都按照提交的顺序执行,不管消息在过程中是否重新排序,不保证调用者之间的任务执行顺序。例如,调用者之间的执行顺序可能会根据消息延迟和任务依赖项可用的顺序而有所不同。

Actor Death

  • actor可以是detached,也可以是non-detached,默认actor都是non-detached,可以用于存储临时状态,当actor的父级或作业退出时,应自动收集其物理资源。

  • 对于non-detached 的actor,当actor的所有挂起任务都已完成,并且actor的所有句柄都超出范围(通过引用计数跟踪)时,actor的原始创建者通知GCS服务。然后,GCS服务向actor提交一个特殊的`ray_terate’任务,该任务将让actor优雅地退出其进程。如果GCS检测到创建者已经退出(通过心跳表发布),也会终止actor。然后,在此actor上提交的所有挂起任务和后续任务都将失败,并出现RayActorError。

如上图,Actor终止任务也通过GCS调度。

  • actor也可能在其运行时意外崩溃(例如,从segfault或调用sys.exit)。默认情况下,提交给崩溃的actor的任何任务都将失败,并出现RayActorError,就像actor正常退出一样。

  • Ray还提供了一个选项,可以自动重新启动actor,最多可以指定次数。如果启用此选项,GCS服务将尝试通过重新提交其创建任务来重新启动崩溃的actor。所有具有句柄的客户端都将将任何挂起的任务缓存,直到重新启动actor。如果actor不可重新启动或已达到最大重新启动次数,客户端将使所有挂起的任务失败。

  • 第二个选项可用于在参与者重新启动后启用失败的参与者任务的自动重试。这对于幂等任务和用户不需要自定义处理RayActorError的情况非常有用。

  • 参考代码:

  1. src/ray/core_worker/core_worker.cc
  2. src/ray/common/task/task_spec.h
  3. src/ray/core_worker/transport/direct_actor_transport.cc
  4. src/ray/gcs/gcs_server/gcs_actor_manager.cc
  5. src/ray/gcs/gcs_server/gcs_actor_scheduler.cc
  6. src/ray/protobuf/core_worker.proto

Global Control Store

全局控制存储(GCS)保存关键但访问频率较低的群集元数据,如连接的客户端和节点的地址。在Ray的早期版本(<0.8)中,GCS还保存了小对象元数据,这意味着GCS处于大多数系统操作的关键路径上,例如任务调度。在较新版本的Ray (0.8+)中,对象和元数据大部分已经上移动到worker中,允许GCS在大多数系统操作期间不处于关键路径,这导致了整体性能的提高,并降低了GCS存储要求。

存储

GCS目前在Redis中实现,我们依赖Redis进行发布。但是,正在努力删除redis依赖项,并启用可插拔持久存储(例如MySQL)。

Actor Table

Actor Table中包含了actor及其状态的列表,这个表用于在失败时重新创建演员,并管理演员生存期。

Heartbeat Table

Heartbeat Table 保存连接到Ray的客户端、worker和节点的列表。

  • 每个raylet定期向GCS发送心跳,以指示节点处于活动状态,并报告其调度程序线程当前资源使用情况和负载的信息。GCS定期聚合来自raylet的所有心跳,以减少网络带宽使用,并将聚合广播回所有节点。这用于确定群集成员资格和分布式调度,广播信息也用于自动伸缩。
  • 如果GCS在可配置的心跳间隔数内没有听到来自raylet的消息,GCS将raylet标记为已死,并将此消息广播到所有节点。这就像一个墓碑:如果raylet听说它已被标记为已死,并且它的物理资源只能通过启动新的raylet来重用,该raylet被分配了不同的唯一ID。

Job Table

  • Job Table 保存了群集中运行的作业列表,当作业被终止时,Ray将取消作业创建的正在运行的task和actor,以避免资源泄漏。

Object Table

  • Object Table保存了大型共享内存对象的节点位置,Raylet使用redis pub-sub订阅感兴趣的对象并在对象可用时获得通知,选择要从中下载对象数据的节点。当在共享内存中本地创建或删除对象时,Raylet负责更新此表。
    社区目前正在努力将对象表从GCS移动到所有权表,以提高可扩展性和分布式对象传输性能。

Profile Table

Profiling 事件存储在Profile Table中,是用LRU淘汰机制。

持久化

  • GCS目前不提供持久化,尽管正在努力通过通用SQL数据库启用可插拔存储。Redis设置了最大大小,使用LRU淘汰非关键数据。但是,对于Ray (0.8+)的最新版本,超过此最大大小是非常罕见的,因为大多数对象和任务元数据不再存储在GCS中。
  • 代码参考:
  1. src/ray/gcs/gcs_server/gcs_server.cc
  2. src/ray/protobuf/gcs.proto
  3. src/ray/protobuf/gcs_service.proto
(完)