返回顶部
热门问答 更多热门问答
技术文章 更多技术文章

AI Agent

[复制链接]
链载Ai 显示全部楼层 发表于 1 小时前 |阅读模式 打印 上一主题 下一主题

写在前面

👀
在Trajectory的实现中,参照了一些开源的实现,主要是字节的trae Agent和langgraph的源码。这里在看字节的Trae agent的过程中,也顺带改了一个小bug,下一篇会讲下trae agent的源码解析,主要聚焦在几个方面:
  • prompt是怎么写的?
  • benchmark是如何做的?
  • 有一个奇怪的多轮对话的报错,是为什么?
另外下一步会看下PromptX的实现,据说是提出一个挑选工具的协议,看看PromptX这个项目是怎么实现的。
感兴趣的话,先关注吧,说实话,网上也很少有博主像我写的这么详细的。

背景

📌
Trajectory 是什么?它的核心价值是什么?
在 Agent 的语境下,Trajectory(轨迹)指的是 Agent 在处理一个任务或一次对话交互过程中的完整执行记录(轨迹)指的是 Agent 在处理一个任务或一次对话交互过程中的完整执行记录。它就像飞机的“黑匣子”,详细记录了从接收用户输入开始,到最终给出响应的每一步思考、决策和行动。

一个典型的 Trajectory 包含以下关键信息:
  • 用户输入 (User Message):对话的起点。
  • AI 的思考过程 (Thought):Agent 的中间推理步骤。
  • 工具调用 (Tool Call):Agent 决定使用哪个工具,以及传入的参数。
  • 工具执行结果 (Tool Output):工具返回给 Agent 的信息。
  • AI 的最终响应 (AI Message):Agent 回复给用户的内容。

其核心价值主要体现在以下几个方面:
  • 调试与可观测性 (Debugging & Observability):当 Agent 行为不符合预期时,Trajectory 是定位问题的最直接、最有效的工具。开发者可以清晰地看到每一步的输入输出,快速诊断是模型幻觉、工具错误还是逻辑流问题。
  • 审计与归档 (Audit & Archive):在金融、法务、客服等需要合规和追溯的场景下,Trajectory 提供了一份不可篡改的、详细的交互历史。这既可以作为审计凭证,也可以作为历史案例进行归档,计与归档 (Audit &Archive):在金融、法务、客服等需要合规和追溯的场景下,Trajectory 提供了一份不可篡改的、详细的交互历史。这既可以作为审计凭证,也可以作为历史案例进行归档,用于后续的分析和复盘。
  • 评估与优化 (Evaluation & Optimization):通过分析大量的 Trajectory 数据,我们可以评估 Agent 在不同任务上的表现,发现其能力的边界和常见的失败模式,为后续的模型微调(Fine-tuning)或 Prompt Engineering 提供数据支持。
因此想基于 LangGraph 的 Agent 设计并实现一个健壮、可扩展的 Trajectory(轨迹)记录系统。该系统不仅能满足基本的调试、审计和归-档需求,还引入了分布式追踪理念,为 Agent 的行为提供了深度可观测性,最终实现了类似 LangSmith 的轨迹分组与追踪效果。

实现目标

核心目标是捕获 Agent在执行任务过程中的每一步关键信息,并将其结构化地记录下来。希望做到的功能:
  • 调试与审计:开发者可以清晰地回溯 Agent 的思考链、工具调用和模型响应,快速定位问题。
  • 归档与分析:将 Agent 的完整交互历史永久化存储,用于后续的行为分析和模型优化。
  • 可观测性与分布式追踪:借鉴 OpenTelemetry 等分布式追踪系统的理念,为每一次用户交互(Trace)和其中的每一个步骤(Span)分配唯一 ID,实现跨组件、跨服务的行为链路追踪。
  • 多轮对话分组:能准确地将一次完整的端到端对话(从用户输入到最终回复)划分为一个独立的 Trace,便于分组查看和分析。
  • 高可扩展性:系统设计应与具体存储后端解耦,支持从本地文件轻松扩展到 Kafka、数据库或专业日志系统。
  • 灵活集成:既能无缝集成到 LangGraph 的ReAct Agent 中,也能作为一个独立的节点(Node)在任何 LangGraph 图中即插即用。

设计与实现

设计

在实现 Trajectory 记录功能的过程中,我评估了多种技术方案,最终选择了一种基于“扫描 state['messages']”的模式Trajectory 记录功能的过程中,我评估了多种技术方案,最终选择了一种基于“扫描 state['messages']”的模式。这个决策是权衡了多种方案的利弊后做出的。

曾经考虑过的方案:

  1. 节点装饰器 (Node Decorator):最初,想为 LangGraph 中的每一个 Node(节点)都包裹一个装饰器。这个装饰器会在节点执行前后自动记录日志。
  • 代码类似
  • 优点:逻辑和业务分离,看起来很优雅。
  • 缺点:实现起来非常复杂。LangGraph 的节点功能各异,有的调用 LLM,有的执行工具,有的只是简单的逻辑判断。为这些异构的节点设计一个通用的、能提取所有关键信息的装饰器,成本很高,且容易与 LangGraph 的内部机制产生冲突。
    class TrajectoryHook:  """Hook for automatically recording LangGraph execution."""
    def __init__(self, recorder: TrajectoryRecorder): self.recorder = recorder self._session_id: Optional[str] = None
    def wrap_node(self, node_name: str, node_func: Callable) -> Callable: """Wrap a node function to record its execution.""" @wraps(node_func) async def wrapped_node(state: Dict[str, Any]) -> Any: if not self._session_id: return await node_func(state)
    # Record node start await self.recorder.record_event( self._session_id, node_name=node_name, event_type="node_start", data={"state_keys": list(state.keys()) if isinstance(state, dict) else None} )
    try: # Execute node if asyncio.iscoroutinefunction(node_func): result = await node_func(state) else: result = node_func(state)
    # Record node end await self.recorder.record_event( self._session_id, node_name=node_name, event_type="node_end", data={"has_result": result is not None} )
    # Record messages if present if isinstance(result, dict) and "messages" in result: messages = result["messages"] if isinstance(messages, list): for msg in messages: if hasattr(msg, "content"): # 确保是消息对象 await self.recorder.record_message(self._session_id, msg)
    # Record node output await self.recorder.record_node_output( self._session_id, node_name, result )
    return result
    except Exception as e: # Record error await self.recorder.record_error( self._session_id, error_type=type(e).__name__, error_message=str(e), node_name=node_name ) raise
    return wrapped_node
    async def __aenter__(self): """Start recording session.""" self._session_id = await self.recorder.start_session() return self
    async def __aexit__(self, exc_type, exc_val, exc_tb): """End recording session.""" if self._session_id: success = exc_type is None await self.recorder.end_session(self._session_id, success=success) self._session_id = None

  • 重写检查点 (RecordingCheckpoint):另一个思路是包装 LangGraph 的 Checkpoint(检查点)机制。因为 Checkpoint 本身就是用来持久化 Agent 状态的,我们可以在它保存状态的同时,将轨迹信息也一并记录下来。
    • 优点:这是一个非常可行的方案。Checkpoint 是状态变更的关键枢纽,在这里做文章能确保捕捉到所有重要的变化。
    • 缺点:它将轨迹记录与状态持久化逻辑强耦合。如果我们未来想更换 Checkpoint 的后端(比如从内存换到 Redis),或者在某些场景下禁用 Checkpoint,可能会影响到轨迹记录功能。当然也可以做一个组合的checkpoint,也是一个可行的方案!

    最终选择的方案:扫描state['messages']
    我最终实现的方案,其核心依据是:
    👀
    无论 Agent 内部的逻辑图(Graph)多么复杂,其所有关键的外部交互(用户输入、AI回复、工具调用与结果)最终都会以消息(Message)的形式被追加到 state['messages'] 列表中。

    基于这个洞察,我的实现变得非常纯粹和解耦:
    • 我们设计了一个MessageProcessor,它的唯一职责就是对比上一次的状态和当前状态,找出messages列表中的**新消息**。
    • 一旦发现新消息,就将其标准化为一条或多条轨迹事件(Event),并赋予它们正确的 Trace ID 和 Span ID。
    • 这种方式不关心这些消息是哪个 Node 产生的,也不关心 Checkpoint 是否启用。它只关心最终的结果,从而实现了与 LangGraph 内部执行逻辑的解耦。

    实现

    为实现上述目标,系统被设计为由多个松散耦合的组件构成,每个组件承担独立的职责。
    关键组件
    1. TraceContext: 追踪上下文,一个轻量级的数据容器,负责在整个 LangGraph 的执行流程中传递追踪状态,主要包含 trace_id、span_id和 parent_sntext: 追踪上下文,一个轻量级的数据容器,负责在整个 LangGraph 的执行流程中传递追踪状态,主要包含 trace_id、span_id 和 parent_span_id。
    2. TrajectoryRecorder: 轨迹记录器,是系统的核心协调者。它本身是无状态的,接收处理过的事件数据,并通过一个可插拔的 TrajectoryBackend 将数据写入到指定的存ecorder: 轨迹记录器,是系统的核心协调者。它本身是无状态的,接收处理过的事件数据,并通过一个可插拔的 TrajectoryBackend 将数据写入到指定的存储中。
    3. TrajectoryBackend: 存储后端,定义了数据写入的接口。LocalFileBackendend: 存储后端,定义了数据写入的接口。LocalFileBackend 是其默认实现,将轨迹数据以 JSONL 格式写入本地文件,每一行代表一个事件。这种设计使得更换后端(如 Kafka、Redis、PostgreSQL)变得非常简单。
    4. MessageProcessor: 消息处理器,负责将 LangGraphstate['messages']中的原始消息转换为结构化的、可记录的 Trajectory 事件。它能识别消息类型(human,ai, tool)并生成相应的事件 payload。
    5. ReactTrajectoryHook**TrajectoryNode**: 集成层,负责将轨迹记录功能接入 LangGraph。
    • ReactTrajectoryHook作为 ReAct Agent 的post_model_hook,在每次模型调用后触发,自动管理 Trace 的生命周期(开始与结束)。
    • TrajectoryNode作为一个独立的 LangGraph 节点,可以在图的任意位置被调用,手动记录轨迹。
  • TrajectoryViewer: 一个简单的命令行工具,用于读取本地存储的轨迹文件,并按照trace_id进行分组和格式化展示,模拟了 LangSmith 的可视化效果。

  • 分布式追踪机制

    为了实现类似 LangSmith 那样的可视化效果,我们需要将一次完整的“请求-响应”流程中的所有事件(多次工具调用、AI 思考等)聚合到同一个 Trace 下。这就引出了一个关键问题:如何判断一轮对话的开始和结束?
    • 当前实现:我们的策略相对直接——**通过新的用户输入来开启一个新的 Trace**。当一个HumanMessage(用户消息)在上一轮对话结束后出现时,系统会生成一个新的trace_id。本轮中所有的后续事件(AIMessage、ToolCall、ToolMessage)都会沿用这个trace_id,但会拥有各自独立的span_id,并通过parent_span_id建立父子关系。
    通过这三个 ID,我们可以清晰地重构出 Agent 的完整执行树。

    • langgraph_hook.py
      • 提供了TrajectoryNode,一个可以被添加到任何 LangGraph 图中的独立节点。它提供了与 ReactTrajectoryHook 类似的功能,但给予开发者更大的控制权,可以在图的任意位置手动记录状态。

    • trajectory_viewer.py
      • TrajectoryViewer是一个离线分析工具。它读取 JSONL 文件,使用itertools.groupby**按 ****trace_id**用 itertools.groupby **按 ****trace_id**** 对所有事件进行分组**,然后格式化输出每一次完整的交互,清晰地展示了思考链和工具调用过程。

    调用过程

    效果

    1. 实现了完整的轨迹记录与追踪:成功构建了一个从数据捕获、处理、存储到展示的端到端轨迹系统。
    2. 高度解耦和可扩展:通过TrajectoryBackend抽象,系统可以轻松适配不同的生产环境存储方案。
    3. 优雅的 Trace 划分机制:通过监控用户输入来判定新 Trace 的开始,符合直觉且实现简洁,准确地将多轮对话划分为独立追踪单元。
    4. 双重集成模式:提供了Hook和Node两种集成方式,兼顾了自动化和灵活性,适用于不同类型的 LangGraph 应用。
    5. 简化的实现逻辑:从最初复杂的节点装饰器和状态快照方案,演进到最终“扫描消息增量”的模式,代码更简洁、鲁棒性更强。
    6. 实用的可视化工具:TrajectoryViewer提供了类似 LangSmith 的分组展示功能,极大地提升了调试和分析的效率。

    未来展望


    虽然当前系统已功能完备,但距离真实的线上系统仍有许多可以扩展和优化的方向,
    • 丰富后端支持:实现更多TrajectoryBackend,如 Kafka(用于实时数据流)、Redis(用于快速缓存)或 ClickHouse/PostgreSQL(用于持久化存储和复杂查询)。
    • 增强可视化界面:将TrajectoryViewer从命令行工具升级为交互式Web 应用,提供更丰富的过滤、搜索和可视化功能。
    • 与日志/数据管道集成:在生产环境中,将TrajectoryBackend对接到公司统一的日志系统和数据管道(如 Flink、Spark),实现企业级的监控和分析。
    • 优化 Trace 结束判定:探索更复杂的 Trace 结束逻辑,例如基于特定事件或超时机制,以适应更复杂的业务场景。
    👀
    在实际的大模型工程的工作中,每一个小组件都经过多轮的评审和打磨,拿这个数据举例,可以做的事情还非常多,比如:
    • 要满足数据合规的要求,有的不可以存明文,可能就需要大模型改写信息来脱敏
    • 关联用户的点赞,不点赞的行为,分析上下文,来改写模型
    • 将数据进行冷热分离,持久归档冷数据到便宜的存储,省钱
    等等,但这些很多都是传统的后端和大数据的技术栈。有句话叫:所有的产品都值得被大模型重做一遍。同样的:所有的技术都会在大模型上找到影子。

回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

链载AI是专业的生成式人工智能教程平台。提供Stable Diffusion、Midjourney AI绘画教程,Suno AI音乐生成指南,以及Runway、Pika等AI视频制作与动画生成实战案例。从提示词编写到参数调整,手把手助您从入门到精通。
  • 官方手机版

  • 微信公众号

  • 商务合作

  • Powered by Discuz! X3.5 | Copyright © 2025-2025. | 链载Ai
  • 桂ICP备2024021734号 | 营业执照 | |广西笔趣文化传媒有限公司|| QQ