你是不是也看到过那种视频:一个人打开手机,让ChatGPT通过摄像头识别当前拍摄的物体,让它来高速你这个东西是什么,以及具体用法;甚至是给ChatGPT拍摄一道习题,让它来给你具体的解题思路。我们今天也来做一个类似的可视频识别物体与你进行语音对话的聊天机器人。

具体来讲,在本教程中,你将了解如何使用 OpenAI、LiveKit 和 Deepgram 部署在 DigitalOcean GPU Droplets服务器上,构建一个具有计算机视觉和语音对话功能的实时AI聊天机器人。这个聊天机器人能够与用户进行实时对话,分析从你的摄像头捕捉到的图像,并提供准确及时的响应。

构建实时AI聊天机器人的4项技术

在本教程中,你将利用三项强大的技术来构建你的实时AI聊天机器人,每项技术都服务于特定目的,以增强聊天机器人的能力,同时利用DigitalOcean的GPU Droplets作为低成本高性能的基础设施:

  • OpenAI API如果你关注AI行业,这个你应该不陌生。OpenAI API将根据用户输入生成类似人类的文本响应。通过使用如GPT-4o这样的高级模型,我们的聊天机器人将能够理解上下文,进行流畅的对话,并对用户的提问提供准确的答案。
  • LiveKitLiveKit将促进用户与聊天机器人之间的实时音频和视频通信。它允许我们创建无缝的交互体验,使用户能够与聊天机器人对话并接收语音回应。这对于构建能够自然地吸引用户的语音启用聊天机器人至关重要,使得交互感觉更加个性化和直观。(当然,如果你熟悉其他的音视频API也可以使用,比如声网、WebRTC、腾讯等)
  • DeepgramDeepgram将用于语音识别,将口语转换为文本。这使聊天机器人能够有效地处理用户的语音输入。通过整合Deepgram的功能,你可以确保聊天机器人准确理解用户的命令和查询,从而提高整体交互质量。在需要快速准确响应以维持用户参与度的实时环境中,这一点尤为重要。
  • DigitalOcean GPU Droplet服务器利用DigitalOcean的GPU Droplets对于此类项目特别有利,因为它们提供了必要的计算和GPU基础设施,以支持这些AI模型和实时通信所需的密集处理。首先它的H100 GPU针对运行AI/ML工作负载进行了优化,显著加快了模型推理和视频处理任务的速度。这确保了即使在高负载下,聊天机器人也能快速高效地提供响应,改善用户体验和参与度。同时,它的套餐内还提供大量的出站流量,以及2Gbps- 10Gbps的峰值带宽,可以稳定地支撑实时音视频交互所需的带宽与流量,而且就算流量超出也仅需0.01美元/GB,相对于大多数云平台都要便宜。更重要的是,DigitalOcean的H100 GPU Droplet仍然处于优惠中,相比其他平台都要实惠。

准备工作

开始之前,请确保你已具备以下条件:

  • DigitalOcean云平台账号,可在Digitalocean.com进行注册,该平台支持绑定支付宝或信用卡,新注册用户会有200美元免费使用额度。
  • 已部署并运行的GPU Droplet,具体方法非常简单,只需要点选即可,后面我们会简要介绍,详细步骤可参考我们往期的教程
  • 基础的Python编程知识。
  • 设置好用于GPT-4o模型的OpenAI API密钥。
  • 在你的GPU Droplet上运行的LiveKit服务器,这一点我们在后面会讲。
  • Deepgram API密钥。

步骤1 - 设置GPU Droplet

1、创建新项目:你需要从云后台创建一个新项目(project),并将其绑定到一个GPU Droplet。

2、创建GPU Droplet:登录你的DigitalOcean账户,创建一个新的GPU Droplet,并选择AI/ML Ready作为操作系统。该OS镜像安装了所有必需的NVIDIA GPU驱动程序。

3、添加SSH密钥进行身份验证:需要SSH密钥来认证GPU Droplet,通过添加SSH密钥,你可以从终端登录到GPU Droplet。

4、最终确定并创建GPU Droplet:完成上述所有步骤后,最终确定并创建新的GPU Droplet。

步骤2 - 设置LiveKit账户并在GPU Droplet上安装CLI

首先,你需要创建一个账户或登录到你的LiveKit Cloud账户,并创建一个LiveKit项目。请记下项目设置页面上的LIVEKIT_URL、LIVEKIT_API_KEY和LIVEKIT_API_SECRET环境变量,因为在本教程后面部分会用到它们。

安装LiveKit CLI

以下命令将在你的GPU Droplet上安装LiveKit CLI。

curl -sSL https://get.livekit.io/cli | bash

对于LiveKit Cloud用户,你可以通过CLI对你的Cloud项目进行身份验证以创建API密钥和秘密。这让你可以不用每次都手动提供凭据的情况下使用CLI。

lk cloud auth

然后,按照指示并通过浏览器登录。

你将被要求添加设备并授权访问你在此步骤中创建的LiveKit项目。

步骤3 - 从现有的 LiveKit 模板启动一个代理

该模板提供了一个构建语音助手的基础。模板包括:

  • 基本语音交互
  • 仅音频轨道订阅
  • 语音活动检测(VAD)
  • 语音转文字(STT)
  • 语言模型(LLM)
  • 文字转语音(TTS)

克隆一个简单的 Python 语音代理的起始模板:

lk app create

这将为你提供多个现有的 LiveKit 模板,你可以使用它们来部署应用程序。你会得到这样一个输出:


voice-assistant-frontend                                                                                                                                        
transcription-frontend                                                                                                                                        
token-server                                                                                                                                               
multimodal-agent-python                                                                                                                                                 
multimodal-agent-node                                                                                                  
voice-pipeline-agent-python                                                                                                                       
voice-pipeline-agent-node                                                                                                                                                     
android-voice-assistant                                                                                                                                                         
voice-assistant-swift                                                                                                                                                                
outbound-caller-python

你会使用 voice-pipeline-agent-python 模板。

lk app create --template voice-pipeline-agent-python

根据提示输入你的应用程序名称、OpenAI API密钥和Deepgram API密钥。如果你不使用Deepgram和OpenAI,可以查看其他支持的插件。

输出示例:

Cloning template...
Instantiating environment...
Cleaning up...
To setup and run the agent:

       cd /root/do-voice-vision-bot
       python3 -m venv venv
       source venv/bin/activate
       pip install -r requirements.txt
       python3 agent.py dev

步骤4 - 安装依赖并创建虚拟环境

首先,切换到上一步创建的应用程序目录:

cd <app_name>

列出从模板创建的文件:

ls

输出示例:

LICENSE  README.md  agent.py  requirements.txt

agent.py 是包含AI聊天机器人逻辑和源代码的主要应用程序文件。

现在,你将使用以下命令创建并激活一个Python虚拟环境:

apt install python3.10-venv
python3 -m venv venv

添加以下API密钥到你的环境中:

export LIVEKIT_URL=<>
export LIVEKIT_API_KEY=<>
export LIVEKIT_API_SECRET=<>
export DEEPGRAM_API_KEY=<>
export OPENAI_API_KEY=<>

你可以在 LiveKit 项目设置页面找到 LIVEKIT_URLLIVEKIT_API_KEYLIVEKIT_API_SECRET

激活虚拟环境:

source venv/bin/activate

注意:在Debian/Ubuntu系统上,你需要使用以下命令安装python3-venv包:

apt install python3.10-venv

现在,让我们安装应用程序运行所需的依赖项:

python3 -m pip install -r requirements.txt

步骤5 - 向AI代理添加视觉功能

向代理添加视觉功能,你需要在agent.py文件中添加以下导入语句和函数。

要为你的代理添加视觉功能,你需要修改 agent.py 文件,添加以下导入语句和函数。

首先,让我们先添加这些导入语句,将其与现有的导入语句放在一起。使用文本编辑器(如 vi 或 nano)打开你的 agent.py 文件。

打开agent.py文件使用文本编辑器如vi或nano:

vi agent.py

复制以下导入语句到现有导入语句旁边:

from livekit import rtc
from livekit.agents.llm import ChatMessage, ChatImage

这些新的导入包括:

  • rtc: 访问LiveKit的视频功能。
  • ChatMessageChatImage: 用于将图像发送到LLM的类。

启用视频功能:

在agent.py中找到ctx.connect()行,在入口点函数中将其更改为AutoSubscribe.SUBSCRIBE_ALL

await ctx.connect(auto_subscribe=AutoSubscribe.SUBSCRIBE_ALL)

注意:如果在GPU Droplet上使用vi或nano文本编辑器编辑和修改agent.py文件比较困难,你可以将agent.py文件内容复制到本地系统,并在VSCode等代码编辑器中进行必要的编辑,然后将更新后的代码复制粘贴回去。

刚刚修改的那一行代码将使助手能够接收视频以及音频。

添加视频帧处理:

在导入语句之后在prewarm函数之前,添加这两个辅助函数:

async def get_video_track(room: rtc.Room):
   """Find and return the first available remote video track in the room."""
   for participant_id, participant in room.remote_participants.items():
       for track_id, track_publication in participant.track_publications.items():
           if track_publication.track and isinstance(
               track_publication.track, rtc.RemoteVideoTrack
           ):
               logger.info(
                   f"Found video track {track_publication.track.sid} "
                   f"from participant {participant_id}"
               )
               return track_publication.track
   raise ValueError("No remote video track found in the room")

这个函数会搜索所有参与者以找到可用的视频流。它主要是用于定位要处理的视频源。

接下来,你将添加帧捕获函数。

async def get_latest_image(room: rtc.Room):
   """Capture and return a single frame from the video track."""
   video_stream = None
   try:
       video_track = await get_video_track(room)
       video_stream = rtc.VideoStream(video_track)
       async for event in video_stream:
           logger.debug("Captured latest video frame")
           return event.frame
   except Exception as e:
       logger.error(f"Failed to get latest image: {e}")
       return None
   finally:
       if video_stream:
           await video_stream.aclose()

这些函数的作用是捕获视频流中的单个帧,并确保清理资源。使用aclose()释放系统资源如内存缓冲区和视频解码实例,有助于防止内存泄漏。

添加LLM回调

现在,在entrypoint函数内部添加以下回调函数,该函数将在LLM生成响应之前注入最新的视频帧。在agent.py文件中搜索entrypoint函数:

async def before_llm_cb(assistant: VoicePipelineAgent, chat_ctx: llm.ChatContext):
    """
    Callback that runs right before the LLM generates a response.
    Captures the current video frame and adds it to the conversation context.
    """
    try:
        if not hasattr(assistant, '_room'):
            logger.warning("Room not available in assistant")
            return
        latest_image = await get_latest_image(assistant._room)
        if latest_image:
            image_content = [ChatImage(image=latest_image)]
            chat_ctx.messages.append(ChatMessage(role="user", content=image_content))
            logger.debug("Added latest frame to conversation context")
        else:
            logger.warning("No image captured from video stream")
    except Exception as e:
        logger.error(f"Error in before_llm_cb: {e}")

这个回调函数是高效管理上下文的关键——它仅在助手即将做出回应时添加视觉信息。如果在每条消息中都添加视觉信息,会迅速填满大型语言模型的上下文窗口,这将让程序极其低效,且会增加不必要的成本。

更新系统提示

entrypoint函数中找到initial_ctx,并更新它以包含视觉功能:

initial_ctx = llm.ChatContext().append(
    role="system",
    text=(
        "You are a voice assistant created by LiveKit that can both see and hear. "
        "You should use short and concise responses, avoiding unpronounceable punctuation. "
        "When you see an image in our conversation, naturally incorporate what you see "
        "into your response. Keep visual descriptions brief but informative."
    ),
)

更新助理配置

entrypoint函数中找到VoicePipelineAgent,并添加回调函数:

assistant = VoicePipelineAgent(
    vad=ctx.proc.userdata["vad"],
    stt=openai.STT(),
    llm=openai.LLM(),
    tts=openai.TTS(),
    chat_ctx=initial_ctx,
    before_llm_cb=before_llm_cb  # 添加回调函数
)

这里的主要更新是before_llm_cb参数,它使用之前创建的回调函数将最新的视频帧注入到对话上下文中。

最终的agent.py文件(带有语音和视觉功能)

这是在添加所有必要的函数和导入语句后agent.py文件的样子:

from asyncio.log import logger
from livekit import rtc
from livekit.agents.llm import ChatMessage, ChatImage
import logging
from dotenv import load_dotenv
from livekit.agents import (
    AutoSubscribe,
    JobContext,
    JobProcess,
    WorkerOptions,
    cli,
    llm,
)
from livekit.agents.pipeline import VoicePipelineAgent
from livekit.plugins import openai, deepgram, silero

async def get_video_track(room: rtc.Room):
    """Find and return the first available remote video track in the room."""
    for participant_id, participant in room.remote_participants.items():
        for track_id, track_publication in participant.track_publications.items():
            if track_publication.track and isinstance(track_publication.track, rtc.RemoteVideoTrack):
                logger.info(f"Found video track {track_publication.track.sid} from participant {participant_id}")
                return track_publication.track
    raise ValueError("No remote video track found in the room")

async def get_latest_image(room: rtc.Room):
    """Capture and return a single frame from the video track."""
    video_stream = None
    try:
        video_track = await get_video_track(room)
        video_stream = rtc.VideoStream(video_track)
        async for event in video_stream:
            logger.debug("Captured latest video frame")
            return event.frame
    except Exception as e:
        logger.error(f"Failed to get latest image: {e}")
        return None
    finally:
        if video_stream:
            await video_stream.aclose()

def prewarm(proc: JobProcess):
    proc.userdata["vad"] = silero.VAD.load()

async def entrypoint(ctx: JobContext):

    async def before_llm_cb(assistant: VoicePipelineAgent, chat_ctx: llm.ChatContext):
        """
        Callback that runs right before the LLM generates a response.
        Captures the current video frame and adds it to the conversation context.
        """
        try:
            if not hasattr(assistant, '_room'):
                logger.warning("Room not available in assistant")
                return
            latest_image = await get_latest_image(assistant._room)
            if latest_image:
                image_content = [ChatImage(image=latest_image)]
                chat_ctx.messages.append(ChatMessage(role="user", content=image_content))
                logger.debug("Added latest frame to conversation context")
            else:
                logger.warning("No image captured from video stream")
        except Exception as e:
            logger.error(f"Error in before_llm_cb: {e}")

    initial_ctx = llm.ChatContext().append(
        role="system",
        text=(
            "You are a voice assistant created by LiveKit that can both see and hear. "
            "You should use short and concise responses, avoiding unpronounceable punctuation. "
            "When you see an image in our conversation, naturally incorporate what you see "
            "into your response. Keep visual descriptions brief but informative."
        ),
    )

    logger.info(f"connecting to room {ctx.room.name}")
    await ctx.connect(auto_subscribe=AutoSubscribe.SUBSCRIBE_ALL)

    # Wait for the first participant to connect
    participant = await ctx.wait_for_participant()
    logger.info(f"starting voice assistant for participant {participant.identity}")

    # Configure the voice pipeline agent
    agent = VoicePipelineAgent(
        vad=ctx.proc.userdata["vad"],
        stt=deepgram.STT(),
        llm=openai.LLM(),
        tts=openai.TTS(),
        chat_ctx=initial_ctx,
        before_llm_cb=before_llm_cb  # Add the callback here
    )

    agent.start(ctx.room, participant)

    # Greet the user when the agent starts
    await agent.say("Hey, how can I help you today?", allow_interruptions=True)

if __name__ == "__main__":
    cli.run_app(
        WorkerOptions(
            entrypoint_fnc=entrypoint,
            prewarm_fnc=prewarm,
        ),
    )

测试AI聊天机器人

启动你的AI聊天机器人并测试以下功能:

python3 agent.py dev

  1. 测试语音交互:对着麦克风说话,看看聊天机器人如何回应。
  2. 测试视觉功能:让聊天机器人通过视频识别物体。

你将在控制台中看到如下日志输出:

2024-12-30 08:32:56,167 - DEBUG asyncio - Using selector: EpollSelector
2024-12-30 08:32:56,168 - DEV  livekit.agents - Watching /root/do-voice-vision-bot
2024-12-30 08:32:56,774 - DEBUG asyncio - Using selector: EpollSelector
2024-12-30 08:32:56,778 - INFO livekit.agents - starting worker {"version": "0.12.5", "rtc-version": "0.18.3"}
2024-12-30 08:32:56,819 - INFO livekit.agents - registered worker {"id": "AW_cjS8QXCEnFxy", "region": "US East", "protocol": 15, "node_id": "NC_OASHBURN1A_BvkfVkdYVEWo"}

现在,你需要使用一个可同时发音频和视频的客户端将应用程序连接到 LiveKit 房间。最简单的方法是使用托管的LiveKit的Agent Playground

由于这个AI聊天机器人需要一个前端应用程序来进行通信。你可以使用我们提供的 livekit-examples 中的一个示例前端,按照其中一个客户端快速入门指南创建自己的前端,或者立即针对托管的沙盒前端进行测试。

在这个示例中,你将使用一个现有的托管的Agent Playground。只需在系统的浏览器中打开这个 LiveKit Agents Playground 并连接你的 LiveKit 项目。它应该会自动填充你的项目信息。

工作原理

通过上述对代理的更改,你的AI聊天机器人现在可以:

  1. 连接音频和视频流:助手能够同时接收音频和视频数据。
  2. 监听用户语音:就像之前一样,助手会监听用户的语音输入。
  3. 生成响应前的动作
    1. 捕获当前视频帧:在生成每个响应之前,助手会捕获当前的视频帧。
    2. 将其添加到对话上下文中:助手会将捕获的视频帧添加到对话上下文中。
    3. 利用其来生成响应:根据视频内容,助手会在回应中自然地融入视觉信息。
  4. 通过仅在需要时添加帧来保持上下文的整洁

小结

你已经成功使用OpenAI、LiveKit和Deepgram在DigitalOcean GPU Droplets上构建了一个具有视觉和语音功能的实时AI聊天机器人。这种强大的组合使得应用程序能够实现高效、可扩展且实时的交互。

如果你也希望构建自己的AI应用,或者训练大语言模型,欢迎尝试DigitalOcean的GPU Droplet服务器。如过需要H100 GPU服务器裸机,可直接联系DigitalOcean中国区独家战略合作伙伴卓普云,他们会根据用量提供优惠报价。

Logo

DAMO开发者矩阵,由阿里巴巴达摩院和中国互联网协会联合发起,致力于探讨最前沿的技术趋势与应用成果,搭建高质量的交流与分享平台,推动技术创新与产业应用链接,围绕“人工智能与新型计算”构建开放共享的开发者生态。

更多推荐