Ray 是一个强大的分布式计算框架,它让 Python 开发者能够轻松地将单机代码扩展到分布式环境。

核心概念

  • Task(任务):通过 @ray.remote 装饰器,可以将普通 Python 函数转换为分布式任务,实现无状态的并行计算
  • Actor(角色):为分布式环境提供有状态计算的抽象,支持面向对象的并行编程模型
  • Object Store(对象存储):Ray 的分布式共享内存系统,实现高效的跨节点数据共享和传输

本教程分为三个部分内容:

  1. 通过示例讲解 Ray 的这三个核心概念
  2. 介绍 Ray 的常用操作
  3. 分享个人使用 Actor 过程中,遇到的问题或踩坑点,以及最佳实践

实战示例:Ray 核心功能

Task:无状态分布式任务

Ray 最基本的功能是将普通 Python 函数转换为分布式任务。我们通过一个加法示例,演示 Task 的基本用法:

import ray

# 初始化 Ray 集群,设置 2 个 CPU 核心
ray.init(num_cpus=2)

# 使用 @ray.remote 装饰器定义分布式任务
@ray.remote
def add(x, y):
    return x + y

# 异步提交任务,立即返回 future 对象(对象引用)
future1 = add.remote(1, 2)
future2 = add.remote(3, 4)

# 等待任务完成并获取结果
results = ray.get([future1, future2])
print(results)  # 输出: [3, 7]

# 关闭 Ray
ray.shutdown()

要点解析

  1. @ray.remote:将普通函数转换为可分布式执行的 Ray Task
  2. .remote():异步提交任务,返回对象引用(future)而非实际结果
  3. ray.get():等待任务完成并获取实际计算结果

Actor:有状态的分布式对象

与无状态的 Task 不同,Ray Actor 提供了一种面向对象的分布式编程模型,能够维护状态并封装方法。它特别适合需要保持状态的场景,如参数服务器、计数器等。

下面通过一个计数器示例来展示 Actor 的基本用法:

import ray
ray.init()

# 使用 @ray.remote 将类转换为分布式 Actor
@ray.remote
class Counter:
    def __init__(self):
        self.value = 0
    
    def increment(self):
        self.value += 1
        return self.value
    
    def get_value(self):
        return self.value

# 创建 Actor 实例(在远程工作进程中)
counter = Counter.remote()

# 异步调用 Actor 方法
future1 = counter.increment.remote()  # 第一次增加
future2 = counter.increment.remote()  # 第二次增加
future3 = counter.get_value.remote()  # 获取当前值

# 获取执行结果
print(ray.get([future1, future2]))  # 输出: [1, 2]
print(ray.get(future3))  # 输出: 2

ray.shutdown()

Actor 的特性

  1. 状态持久化:Actor 实例在其生命周期内可以维持状态
  2. 串行执行:同一个 Actor 的方法调用按顺序执行,保证状态一致性
  3. 并发调用:不同 Actor 实例之间可以并行执行
  4. 远程通信:通过 .remote() 进行异步方法调用

与 Task 不同,Actor 更适合需要维护状态的长期运行的计算任务,如模型训练、参数更新等场景。

Ray 常用操作指南

本节介绍 Ray 中最常用的操作,这些是构建分布式应用的基础。

数据操作:ray.put()ray.get()

Ray 提供了高效的数据共享机制,通过对象存储实现跨节点数据访问:

import ray
ray.init()

# ray.put(): 将对象存入共享内存
data = [1, 2, 3, 4, 5]
data_ref = ray.put(data)  # 返回对象引用

# ray.get(): 从共享内存获取对象
result = ray.get(data_ref)  # [1, 2, 3, 4, 5]

任务管理:ray.wait()

用于监控和等待异步任务的完成状态:

import time

@ray.remote
def slow_function(i):
    time.sleep(i)
    return i

# 提交多个异步任务
refs = [slow_function.remote(i) for i in range(4)]

# 等待部分任务完成
ready_refs, remaining_refs = ray.wait(refs, num_returns=2)  
print(f"完成: {len(ready_refs)}, 等待中: {len(remaining_refs)}")  # 完成: 2, 等待中: 2

# 等待所有任务完成
ready_refs, remaining_refs = ray.wait(refs, num_returns=len(refs))
print(f"完成: {len(ready_refs)}, 等待中: {len(remaining_refs)}")  # 完成: 4, 等待中: 0
print("结果:", ray.get(ready_refs))  # 结果: [0, 1, 2, 3]

Actor 管理:ray.kill()

用于强制终止 Actor 实例:

@ray.remote
class Counter:
    def __init__(self):
        self.value = 0
    def increment(self):
        self.value += 1
        return self.value

# 创建并终止 Actor
counter = Counter.remote()
ray.kill(counter)  # 立即终止 Actor

# 后续调用将抛出异常
try:
    result = ray.get(counter.increment.remote())
except ray.exceptions.RayActorError as e:
    print("Actor 已终止:", e)

资源管理

Ray 支持灵活的计算资源分配:

# 静态资源配置
@ray.remote(num_cpus=2)
def cpu_task():
    return 1

@ray.remote(num_gpus=1)
def gpu_task():
    return 1

# 动态资源配置
@ray.remote
def task():
    return 1

future = task.options(num_cpus=2, num_gpus=1).remote()

错误处理与可靠性

提供全面的异常处理和重试机制:

# 异常处理
@ray.remote
def might_fail(x):
    if x < 0:
        raise ValueError("不允许负值")
    return x

try:
    result = ray.get(might_fail.remote(-1))
except ray.exceptions.RayTaskError as e:
    print("任务执行失败:", e)

# 自动重试机制
@ray.remote(max_retries=3)
def unstable_function():
    # 可能失败的操作
    pass

注意事项

  • 使用 ray.put()时要考虑数据大小,避免内存压力
  • ray.wait() 适合处理不同执行时间的任务集合
  • 资源配置要根据实际硬件情况合理设置
  • 建议为关键任务添加适当的重试机制

Actor:进阶示例

在实际应用中,Actor 常用于处理有状态的并行计算任务。我们通过一些示例逐步解释 Actor 的某些特性。

对象序列化与反序列化

当从 Actor 获取数据时,Ray 会对数据进行序列化传输,这意味着我们获得的是对象的副本而非引用:

import ray

# 定义一个普通类作为 Actor 的属性
class Bar:
    pass

# 定义远程 Actor 类
@ray.remote
class Foo:
    def __init__(self):
        # 每个 Actor 实例都持有一个独立的 Bar 实例
        self.val = Bar()
        
    def get_val(self):
        return self.val
    
    def get_id(self):
        # 返回 Actor 内部对象的内存地址
        return id(self.val)

# 创建多个 Actor 实例
workers = [Foo.remote() for _ in range(4)]

# 并行获取所有 worker 中的 Bar 对象
futures = [w.get_val.remote() for w in workers]
bars = ray.get(futures)

# 获取所有 worker 中 Bar 对象的原始 ID
futures = [w.get_id.remote() for w in workers]
original_ids = ray.get(futures)

# 比较原始 ID 和本地副本的 ID
for original_id, bar in zip(original_ids, bars):
    print(f"Actor 中的对象 ID: {original_id}")
    print(f"本地副本的对象 ID: {id(bar)}")
    print("是否相同:", original_id == id(bar))

关键点:Actor 中的对象(self.val)在 Actor 进程中有其独特的内存地址,通过 get_val() 获取的对象是原始对象的序列化副本。

处理不可序列化对象

在 Ray 中,Actor 间的通信依赖于对象的序列化(pickle)。然而,并非所有 Python 对象都可以序列化,典型的例子包括线程锁、文件句柄、网络连接等。

具体讨论可以参看 Python 文档:what-can-be-pickled-and-unpickled

举个序列化报错的例子:

import ray
import threading

# 包含不可序列化对象(线程锁)的类
class UnserializableClass:
    def __init__(self):
        self.lock = threading.Lock()  # 线程锁不可序列化
        self.value = 0

@ray.remote
class MyActor:
    def __init__(self):
        # 这将导致序列化错误
        self.unserializable = UnserializableClass()
    
    def get_value(self):
        return self.unserializable

# 演示序列化错误
try:
    actor = MyActor.remote()
    result = ray.get(actor.get_value.remote())
except Exception as e:
    print(f"序列化错误: {e}")

解决方案:自定义序列化行为

通过实现 __getstate____setstate__ 方法,我们可以控制对象的序列化行为:

class SerializableClass:
    def __init__(self):
        self.lock = threading.Lock()
        self.value = 0
    
    def __getstate__(self):
        # 仅序列化可序列化的属性
        return {
            'value': self.value
            # 不包含 self.lock
        }
    
    def __setstate__(self, state):
        # 重建对象时重新创建锁
        self.value = state['value']
        self.lock = threading.Lock()

@ray.remote
class ImprovedActor:
    def __init__(self):
        self.obj = SerializableClass()  # 现在可以正常工作
    
    def get_value(self):
        return self.obj

# 测试改进后的版本
actor = ImprovedActor.remote()
result = ray.get(actor.get_value.remote())
print("成功获取序列化对象")

当然,不建议对某些不可序列化对象进行序列化,因为可能会导致不可预测的行为。

Actor 属性访问模式

Ray Actor 采用封装原则,不支持直接访问实例的属性。虽然可以通过 getter/setter 方法访问属性,但前边说了,拿到的都是拷贝的副本,需要考虑性能和设计。

借助 getattrsetattr 访问属性的方案:

import ray

@ray.remote
class Counter:
    def __init__(self):
        self.value1 = 0
        self.value2 = 100
    
    # 通用的属性获取方法
    def get_value(self, key):
        return getattr(self, key)
    
    # 通用的属性更新方法
    def update(self, key, value):
        setattr(self, key, value)
        
# 演示属性访问
counter = Counter.remote()

# 获取初始属性值
values = ray.get([
    counter.get_value.remote("value1"),
    counter.get_value.remote("value2")
])
print("初始值:", values)  # 输出: [0, 100]

# 更新属性值
counter.update.remote("value1", 10)
counter.update.remote("value2", 20)

# 获取更新后的值
updated_values = ray.get([
    counter.get_value.remote("value1"),
    counter.get_value.remote("value2")
])
print("更新后:", updated_values)  # 输出: [10, 20]

虽然上述方法可行,但设计上,我们应该将相关的计算逻辑直接封装在 Actor 方法中,减少不必要的属性访问,避免频繁的属性读写。

上下文和作用域管理

Ray 在分布式环境中执行任务时,需要特别注意变量作用域和上下文信息的传递。一个常见的陷阱是模块级变量的修改可能不会反映在远程任务中。

作用域问题示例

假设我们有一个配置文件 constants.py

# constants.py
num = 1

下面演示模块变量修改在远程任务中的行为:

import ray
import constants

# 修改模块变量
constants.num = 2
num = constants.num  # 本地变量赋值

@ray.remote
def get_num():
    # 直接访问模块变量
    return constants.num  # 将重新导入模块

@ray.remote
def get_num_direct():
    # 访问闭包中的变量
    return num  # 使用任务定义时捕获的值

# 测试两种方式
print("从模块获取:", ray.get(get_num.remote()))        # 输出: 1 (原始值)
print("从闭包获取:", ray.get(get_num_direct.remote())) # 输出: 2 (修改后的值)

出现这种现象的原因

  1. Ray 工作进程会重新导入所需的模块
  2. 模块级别的修改不会自动同步到工作进程
  3. 闭包变量会在任务序列化时被捕获

解决方案:避免依赖模块级变量的运行时修改,配置信息优先使用环境变量、配置文件或 Actor 状态来管理,或者在任务初始化时显式传递所需的上下文信息。


以上,除了基础的分布式计算功能,Ray 还提供了一系列强大的工具:

  • Tune 用于自动化超参数调优
  • RLlib 是基于 Tune 开发的分布式强化学习框架
  • Serve 用于模型部署和服务
  • Datasets 处理分布式数据

这些工具让 Ray 成为一个完整的机器学习基础设施解决方案。

后续如果需要,我们再针对这些工具展开详细讨论。

Logo

DAMO开发者矩阵,由阿里巴巴达摩院和中国互联网协会联合发起,致力于探讨最前沿的技术趋势与应用成果,搭建高质量的交流与分享平台,推动技术创新与产业应用链接,围绕“人工智能与新型计算”构建开放共享的开发者生态。

更多推荐