
Title: A Hands-On Guide to Agent Development: DB-GPT Agent Architecture, Source Code Walkthrough, and Practical Development

Author: 链载Ai    Posted: yesterday, 11:50


Background
In May last year, the DB-GPT project was officially open-sourced. As one of the founding developers of DB-GPT, I was mainly responsible for making DB-GPT compatible with AutoGPT plugins, for the natural-language interaction capabilities with databases, and for the visualization pipeline after Text2SQL.
Starting in July last year, the DB-GPT core team began rethinking how LLMs should be applied and evolved in the data domain, and redesigned the entire DB-GPT architecture from scratch. We regard the Agent as one of the core, unavoidable capabilities of any AI application architecture.
We did extensive research, comparing MetaGPT, AutoGPT, X-Agent, AutoGen, and other Agent frameworks that were popular at the time. Although the concepts and logic behind Agents largely trace back to the same line of papers, direct interoperability across different scenarios and frameworks is hard to achieve. We initially considered adopting the lightweight AutoGen, but found it difficult to integrate with DB-GPT's existing base modules and capabilities. In the end we had to implement DB-GPT's own Agent architecture, although the low-level APIs and several design ideas still borrow heavily from AutoGen.
After the first version came out around October last year, we built some simple demo applications and explored combining it with AWEL workflows. Since then we have kept refactoring the Agent architecture, optimizing it along several dimensions: memory, resources, messages, and user interaction.

Design Ideas and Approach
1. Some Basics of LLM Agents
Following the book Thinking, Fast and Slow, human cognition can be divided into two systems:

System 1: fast, automatic, effortless reasoning.
System 2: slow, deliberate, effortful reasoning.


In her blog post "LLM Powered Autonomous Agents", Lilian Weng gives a systematic survey of LLM-based AI Agents.


2. The Agent Design DB-GPT Adopted in Practice





Source Code Walkthrough
1. Agent API

@abstractmethod
async def send(
    self,
    message: AgentMessage,
    recipient: Agent,
    reviewer: Optional[Agent] = None,
    request_reply: Optional[bool] = True,
    is_recovery: Optional[bool] = False,
    silent: Optional[bool] = False,
    is_retry_chat: bool = False,
    last_speaker_name: Optional[str] = None,
) -> None:
    """Send a message to recipient agent.

    Args:
        message(AgentMessage): the message to be sent.
        recipient(Agent): the recipient agent.
        reviewer(Agent): the reviewer agent.
        request_reply(bool): whether to request a reply.
        is_recovery(bool): whether the message is a recovery message.

    Returns:
        None
    """

@abstractmethod
async def receive(
    self,
    message: AgentMessage,
    sender: Agent,
    reviewer: Optional[Agent] = None,
    request_reply: Optional[bool] = None,
    silent: Optional[bool] = False,
    is_recovery: Optional[bool] = False,
    is_retry_chat: bool = False,
    last_speaker_name: Optional[str] = None,
) -> None:
    """Receive a message from another agent.

    Args:
        message(AgentMessage): the received message.
        sender(Agent): the sender agent.
        reviewer(Agent): the reviewer agent.
        request_reply(bool): whether to request a reply.
        silent(bool): whether to be silent.
        is_recovery(bool): whether the message is a recovery message.

    Returns:
        None
    """

@abstractmethod
async def generate_reply(
    self,
    received_message: AgentMessage,
    sender: Agent,
    reviewer: Optional[Agent] = None,
    rely_messages: Optional[List[AgentMessage]] = None,
    is_retry_chat: bool = False,
    last_speaker_name: Optional[str] = None,
    **kwargs,
) -> AgentMessage:
    """Generate a reply based on the received messages.

    Args:
        received_message(AgentMessage): the received message.
        sender: sender of an Agent instance.
        reviewer: reviewer of an Agent instance.
        rely_messages: a list of messages received.

    Returns:
        AgentMessage: the generated reply. If None, no reply is generated.
    """
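The send/receive/generate_reply contract above is easiest to see in a tiny, framework-free sketch. `MiniAgent` and its fields are hypothetical names invented for illustration, not DB-GPT classes; only the send → receive → generate_reply flow mirrors the API:

```python
import asyncio
from dataclasses import dataclass
from typing import List


@dataclass
class AgentMessage:
    content: str
    rounds: int = 0


class MiniAgent:
    """A toy agent illustrating the send -> receive -> generate_reply loop."""

    def __init__(self, name: str):
        self.name = name
        self.inbox: List[AgentMessage] = []

    async def send(self, message: AgentMessage, recipient: "MiniAgent",
                   request_reply: bool = True) -> None:
        # Delivering a message is just invoking the recipient's receive hook.
        await recipient.receive(message, sender=self, request_reply=request_reply)

    async def receive(self, message: AgentMessage, sender: "MiniAgent",
                      request_reply: bool = True) -> None:
        self.inbox.append(message)
        if request_reply:
            reply = await self.generate_reply(message, sender)
            # Reply without requesting another reply, so the exchange terminates.
            await self.send(reply, sender, request_reply=False)

    async def generate_reply(self, received: AgentMessage,
                             sender: "MiniAgent") -> AgentMessage:
        return AgentMessage(content=f"{self.name} got: {received.content}",
                            rounds=received.rounds + 1)


async def demo() -> List[str]:
    a, b = MiniAgent("A"), MiniAgent("B")
    await a.send(AgentMessage("hello"), b)
    return [m.content for m in a.inbox]

# asyncio.run(demo()) -> ["B got: hello"]
```

The real API adds reviewer, silence, recovery, and retry parameters on top of this basic loop.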

2. Agent Core Classes

class Agent(ABC)
class Role(ABC, BaseModel)
class ConversableAgent(Role, Agent)
class AgentManager(BaseComponent)
class Team(BaseModel)
class AgentMemoryFragment(MemoryFragment)
class Action(ABC, Generic[T])
class Resource(ABC, Generic[P])


(A first draft, provided only to aid understanding; it will be updated step by step.)

3. Agent Registration Mechanism

agent_manage = get_agent_manager(system_app)
agent_manage.register_agent(ApiDisplayAssistantAgent)
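To give a rough idea of what registration buys you, here is a minimal role-keyed registry sketch. All names except `ApiDisplayAssistantAgent` are invented for illustration; DB-GPT's real `AgentManager` is a system component with more responsibilities:

```python
from typing import Dict, Type


class Agent:
    """Placeholder base class for this sketch."""
    role: str = "base"


class MiniAgentManager:
    """Minimal sketch of a role-keyed agent registry."""

    def __init__(self) -> None:
        self._agents: Dict[str, Type[Agent]] = {}

    def register_agent(self, cls: Type[Agent]) -> str:
        # Key the class by its declared role so apps can look it up by name.
        if cls.role in self._agents:
            raise ValueError(f"Agent role '{cls.role}' already registered")
        self._agents[cls.role] = cls
        return cls.role

    def get_by_name(self, role: str) -> Type[Agent]:
        if role not in self._agents:
            raise ValueError(f"Agent '{role}' not registered")
        return self._agents[role]


class ApiDisplayAssistantAgent(Agent):
    role = "ApiDisplay"


manage = MiniAgentManager()
manage.register_agent(ApiDisplayAssistantAgent)
```

Once registered, an application (or a Flow node) can instantiate the agent purely from its role name.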

4. Agent Resource Binding Mechanism

# Base resource class
class Resource(ABC, Generic[P]):
    """Resource for the agent."""

# Database resource
class DBResource(Resource[P], Generic[P]):

# Knowledge resource (binds a retriever as a resource)
class RetrieverResource(Resource[ResourceParameters]):
# Knowledge-space resource (binds a DB-GPT knowledge space as a resource)
class KnowledgeSpaceRetrieverResource(RetrieverResource):

# Resource pack (bundles multiple resources so they can be bound and referenced as one)
class ResourcePack(Resource[PackResourceParameters]):

# Built-in tool resources
class ToolPack(ResourcePack):
# Plugin tool packs; can load AutoGPT plugins
class PluginToolPack(ToolPack):
class AutoGPTPluginToolPack(ToolPack):

# Built-in tool definition and usage
@tool(description="List the supported models in DB-GPT project.")
def list_dbgpt_support_models(
    model_type: Annotated[
        str, Doc("The model type, LLM(Large Language Model) and EMBEDDING).")
    ] = "LLM",
) -> str:
    ...

@tool(description="Get current host CPU status.")
def get_current_host_cpu_status() -> str:
    ...

@tool(
    description="Baidu search and return the results as a markdown string. Please set "
    "number of results not less than 8 for rich search results.",
)
def baidu_search(
    query: Annotated[str, Doc("The search query.")],
    num_results: Annotated[int, Doc("The number of search results to return.")] = 8,
) -> str:
    ...
Resource definition:

llm_client = OpenAILLMClient(model_alias="gpt-3.5-turbo")
context: AgentContext = AgentContext(conv_id="test456")
agent_memory = AgentMemory()
tools = ToolPack([simple_calculator, count_directory_files])
prompt_template: PromptTemplate = prompt_service.get_template(
    prompt_code=record.prompt_template
)
await (
    ToolAssistantAgent()
    .bind(context)  # agent runtime context: conversation id, app name, inference params, etc.
    .bind(LLMConfig(llm_client=llm_client))  # model service used by this agent
    .bind(agent_memory)  # memory object bound to this agent
    .bind(prompt_template)  # prompt overriding the role definition (currently tied to the Prompt module; to be made API-oriented later)
    .bind(tools)  # resources this agent will use
    .build()  # pre-flight checks and preloading for the agent
)
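The fluent `bind(...)...build()` style works because `bind` dispatches on the argument's type and returns `self`. A minimal sketch of that pattern, with all names hypothetical rather than the real DB-GPT implementation:

```python
from dataclasses import dataclass
from typing import Any, List, Optional


@dataclass
class AgentContext:
    conv_id: str


@dataclass
class LLMConfig:
    llm_client: Any = None


class MiniConversableAgent:
    """Sketch of bind()/build(): bind() dispatches on the argument's type
    and returns self so calls can be chained."""

    def __init__(self) -> None:
        self.context: Optional[AgentContext] = None
        self.llm_config: Optional[LLMConfig] = None
        self.resources: List[Any] = []

    def bind(self, target: Any) -> "MiniConversableAgent":
        if isinstance(target, AgentContext):
            self.context = target
        elif isinstance(target, LLMConfig):
            self.llm_config = target
        else:
            self.resources.append(target)
        return self  # returning self enables the fluent chain

    def build(self) -> "MiniConversableAgent":
        # Pre-flight check: refuse to run without a context or model config.
        if self.context is None or self.llm_config is None:
            raise ValueError("context and llm_config must be bound before build()")
        return self


agent = (
    MiniConversableAgent()
    .bind(AgentContext(conv_id="test456"))
    .bind(LLMConfig())
    .bind(["tool_a", "tool_b"])
    .build()
)
```

One nice property of this design is that binding order does not matter, and `build()` is a single place to validate the final configuration.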

5. Agent Memory and Message Caching

# Default short-term memory: ShortTermMemory(buffer_size=5), an in-memory queue
agent_memory = AgentMemory(gpts_memory=self.memory)

# Short-term memory
class ShortTermMemory(Memory, Generic[T])
# Long-term memory
class LongTermMemory(Memory, Generic[T])

embedding_factory = EmbeddingFactory.get_instance(CFG.SYSTEM_APP)
embedding_fn = embedding_factory.create(
    model_name=EMBEDDING_MODEL_CONFIG[CFG.EMBEDDING_MODEL]
)
vstore_name = f"_chroma_agent_memory_{dbgpts_name}_{conv_id}"
# Just use chroma store now
vector_store_connector = VectorStoreConnector(
    vector_store_type=CFG.VECTOR_STORE_TYPE,
    vector_store_config=VectorStoreConfig(name=vstore_name, embedding_fn=embedding_fn),
)
memory = HybridMemory[AgentMemoryFragment].from_chroma(
    vstore_name=vstore_name,
    embeddings=embedding_fn,
)

# Sensory memory
class SensoryMemory(Memory, Generic[T])
# Hybrid memory
class HybridMemory(Memory, Generic[T])
# Enhanced short-term memory
class EnhancedShortTermMemory(ShortTermMemory[T])

self.memory.init(conv_id)
try:
    # An agent conversation starts here
    await user_proxy.initiate_chat(
        recipient=tool_engineer,
        reviewer=user_proxy,
        message="Calculate the product of 10 and 99",
    )
finally:
    await self.memory.clear(conv_id)

# Externally, the agent's conversation messages are fetched through the
# collective memory channel; streaming output is supported.
async def chat_messages(
    self, conv_id: str, user_code: str = None, system_app: str = None,
):
    while True:
        queue = self.memory.queue(conv_id)
        if not queue:
            break
        item = await queue.get()
        if item == "[DONE]":
            queue.task_done()
            break
        else:
            yield item
            await asyncio.sleep(0.005)
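The message channel above is essentially a per-conversation `asyncio.Queue` drained until a `[DONE]` sentinel arrives. A self-contained sketch of that idea (class and method names simplified from the source):

```python
import asyncio
from typing import AsyncIterator, Dict, List


class MiniGptsMemory:
    """Sketch of the conversation message channel: per-conversation asyncio
    queues that a consumer drains until a [DONE] sentinel arrives."""

    def __init__(self) -> None:
        self._queues: Dict[str, asyncio.Queue] = {}

    def init(self, conv_id: str) -> None:
        self._queues[conv_id] = asyncio.Queue()

    def queue(self, conv_id: str):
        return self._queues.get(conv_id)

    async def push(self, conv_id: str, item: str) -> None:
        await self._queues[conv_id].put(item)

    async def chat_messages(self, conv_id: str) -> AsyncIterator[str]:
        while True:
            queue = self.queue(conv_id)
            if not queue:
                break
            item = await queue.get()
            if item == "[DONE]":
                queue.task_done()
                break
            yield item


async def demo() -> List[str]:
    memory = MiniGptsMemory()
    memory.init("conv1")
    for chunk in ("thinking...", "answer", "[DONE]"):
        await memory.push("conv1", chunk)
    # Drain the channel the same way an HTTP streaming handler would.
    return [item async for item in memory.chat_messages("conv1")]

# asyncio.run(demo()) -> ["thinking...", "answer"]
```

In DB-GPT the producer side is the agents writing their dialogue into the collective memory, and the consumer side is the web layer streaming it to the user.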

6. Agent Intent Recognition and App Linking

In real deployments, a complex scenario is usually split into multiple application scenarios, or into multiple Flow branches. When there are multiple application scenarios, a node in one Flow can call up another application (see the reference implementation below).

# Reference this Action
class StartAppAction(Action[LinkAppInput]):
    async def run(
        self,
        ai_message: str,
        resource: Optional[AgentResource] = None,
        rely_action_out: Optional[ActionOutput] = None,
        need_vis_render: bool = True,
        **kwargs,
    ) -> ActionOutput:
        conv_id = kwargs.get("conv_id")
        user_input = kwargs.get("user_input")
        paren_agent = kwargs.get("paren_agent")
        init_message_rounds = kwargs.get("init_message_rounds")

        # TODO: logic to run before the app starts goes here
        from dbgpt.serve.agent.agents.controller import multi_agents

        await multi_agents.agent_team_chat_new(
            new_user_input if new_user_input else user_input,
            conv_id,
            gpts_app,
            paren_agent.memory,
            False,
            link_sender=paren_agent,
            app_link_start=True,
            init_message_rounds=init_message_rounds,
        )
        return ActionOutput(is_exe_success=True, content="", view=None, have_retry=False)

# Reference this Action
class LinkAppAction(Action[LinkAppInput]):
    async def run(
        self,
        ai_message: str,
        resource: Optional[AgentResource] = None,
        rely_action_out: Optional[ActionOutput] = None,
        need_vis_render: bool = True,
        **kwargs,
    ) -> ActionOutput:
        # TODO: parse the next agent role name from the model output here
        role = "xxxx"
        # When the current agent returns, designate the next speaker
        return ActionOutput(
            is_exe_success=True,
            content=json.dumps(app_link_param, ensure_ascii=False),
            view=await self.render_protocal.display(content=app_link_param),
            next_speakers=[role],
        )
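Stripped down, the intent-recognition step is parsing the model's output and mapping it to a known app or agent role, with a fallback for malformed output. A hypothetical sketch (the app codes and the JSON shape are invented for illustration, not DB-GPT's actual format):

```python
import json
from typing import List

# Hypothetical registry of linkable apps.
KNOWN_APPS: List[str] = ["sql_analysis", "report_gen", "chitchat"]


def parse_next_speaker(ai_message: str, default: str = "chitchat") -> str:
    """Extract the next app/agent role from the model's JSON output,
    falling back to a default when the output is malformed or unknown."""
    try:
        payload = json.loads(ai_message)
    except json.JSONDecodeError:
        return default
    if not isinstance(payload, dict):
        return default
    role = payload.get("app_code")
    return role if role in KNOWN_APPS else default
```

Whatever the concrete format, validating the parsed role against a registry before routing is what keeps a hallucinated app name from crashing the Flow.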

7. Agent Message Output and Display

self._render_protocol = VisChart()
view = await self.render_protocol.display(
    chart=json.loads(model_to_json(param)), data_df=data_df
)

8. Agent Identity Definition and Other Attributes



9. Agent Model Selection Strategy

The model an agent uses for inference is decided by a model selection strategy, and in a multi-agent setup each agent can have its own strategy.
# Base class and interface
class LLMStrategy:
    # By default, use the default model of the current model service
    async def next_llm(self, excluded_models: Optional[List[str]] = None):

# Priority-based model selection strategy
class LLMStrategyPriority(LLMStrategy):
    # Select and retry according to the configured priority order
    async def next_llm(self, excluded_models: Optional[List[str]] = None) -> str:
        """Return next available llm model name."""
        try:
            if not excluded_models:
                excluded_models = []
            all_models = await self._llm_client.models()
            if not self._context:
                raise ValueError("No context provided for priority strategy!")
            priority: List[str] = json.loads(self._context)
            can_uses = self._excluded_models(all_models, excluded_models, priority)
            if can_uses and len(can_uses) > 0:
                return can_uses[0].model
            else:
                raise ValueError("No model service available!")
        except Exception as e:
            logger.error(f"{self.type} get next llm failed!{str(e)}")
            raise ValueError(f"Failed to allocate model service,{str(e)}!")
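Stripped of the client plumbing, the priority strategy is a first-match walk over the configured priority list, skipping models that are not deployed or were excluded (for example because a previous attempt with them failed). A self-contained sketch with an invented function name and signature:

```python
from typing import List, Optional


def next_llm_by_priority(
    all_models: List[str],
    priority: List[str],
    excluded_models: Optional[List[str]] = None,
) -> str:
    """Return the first model in the priority list that is both deployed
    (present in all_models) and not excluded."""
    excluded = set(excluded_models or [])
    for name in priority:
        if name in all_models and name not in excluded:
            return name
    raise ValueError("No model service available!")
```

A retry loop then calls this repeatedly, adding each failed model to `excluded_models`, which is exactly what the `excluded_models` parameter in the real interface is for.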

Agent Extension Development
Having covered the basics above, we can try to build our own Agent. When developing one, you mainly need to pay attention to the following aspects.

1. Role

Define the identity of the current Agent, as follows:
class DataScientistAgent(ConversableAgent):
    """Data Scientist Agent."""

    profile: ProfileConfig = ProfileConfig(
        name=DynConfig(
            "Edgar",
            category="agent",
            key="dbgpt_agent_expand_dashboard_assistant_agent_profile_name",
        ),
        role=DynConfig(
            "DataScientist",
            category="agent",
            key="dbgpt_agent_expand_dashboard_assistant_agent_profile_role",
        ),
        goal=DynConfig(
            "Use correct {{dialect}} SQL to analyze and resolve user "
            "input targets based on the data structure information of the "
            "database given in the resource.",
            category="agent",
            key="dbgpt_agent_expand_dashboard_assistant_agent_profile_goal",
        ),
        constraints=DynConfig(
            [
                "Please ensure that the output is in the required format. "
                "Please ensure that each analysis only outputs one analysis "
                "result SQL, including as much analysis target content as possible.",
                "If there is a recent message record, pay attention to refer to "
                "the answers and execution results inside when analyzing, "
                "and do not generate the same wrong answer.Please check carefully "
                "to make sure the correct SQL is generated. Please strictly adhere "
                "to the data structure definition given. The use of non-existing "
                "fields is prohibited. Be careful not to confuse fields from "
                "different tables, and you can perform multi-table related queries.",
                "If the data and fields that need to be analyzed in the target are in "
                "different tables, it is recommended to use multi-table correlation "
                "queries first, and pay attention to the correlation between multiple "
                "table structures.",
                "It is prohibited to construct data yourself as query conditions. "
                "Only the data values given by the famous songs in the input can "
                "be used as query conditions.",
                "Please select an appropriate one from the supported display methods "
                "for data display. If no suitable display type is found, "
                "use 'response_table' as default value. Supported display types: \n"
                "{{ display_type }}",
            ],
            category="agent",
            key="dbgpt_agent_expand_dashboard_assistant_agent_profile_constraints",
        ),
        desc=DynConfig(
            "Use database resources to conduct data analysis, analyze SQL, and provide "
            "recommended rendering methods.",
            category="agent",
            key="dbgpt_agent_expand_dashboard_assistant_agent_profile_desc",
        ),
    )

2. Reasoning

By default an Agent's reasoning is implemented uniformly by the base class, so nothing needs to be done. If the reasoning requires special logic, override the following method:
async def thinking(
    self,
    messages: List[AgentMessage],
    sender: Optional[Agent] = None,
    prompt: Optional[str] = None,
) -> Tuple[Optional[str], Optional[str]]:

3. Memory

def get_or_build_agent_memory(self, conv_id: str, dbgpts_name: str) -> AgentMemory:
    from dbgpt.agent.core.memory.hybrid import HybridMemory
    from dbgpt.configs.model_config import EMBEDDING_MODEL_CONFIG
    from dbgpt.rag.embedding.embedding_factory import EmbeddingFactory

    memory_key = f"{dbgpts_name}_{conv_id}"
    if memory_key in self.agent_memory_map:
        return self.agent_memory_map[memory_key]

    # embedding_factory = EmbeddingFactory.get_instance(CFG.SYSTEM_APP)
    # embedding_fn = embedding_factory.create(
    #     model_name=EMBEDDING_MODEL_CONFIG[CFG.EMBEDDING_MODEL]
    # )
    # vstore_name = f"_chroma_agent_memory_{dbgpts_name}_{conv_id}"
    # Just use chroma store now
    # vector_store_connector = VectorStoreConnector(
    #     vector_store_type=CFG.VECTOR_STORE_TYPE,
    #     vector_store_config=VectorStoreConfig(
    #         name=vstore_name, embedding_fn=embedding_fn
    #     ),
    # )
    # memory = HybridMemory[AgentMemoryFragment].from_chroma(
    #     vstore_name=vstore_name,
    #     embeddings=embedding_fn,
    # )

    agent_memory = AgentMemory(gpts_memory=self.memory)
    self.agent_memory_map[memory_key] = agent_memory
    return agent_memory

4. Action

class SqlInput(BaseModel):
    """SQL input model."""

    display_type: str = Field(
        ...,
        description="The chart rendering method selected for SQL. If you don't know "
        "what to output, just output 'response_table' uniformly.",
    )
    sql: str = Field(
        ..., description="Executable sql generated for the current target/problem"
    )
    thought: str = Field(..., description="Summary of thoughts to the user")


class ChartAction(Action[SqlInput]):
    """Chart action class."""

    def __init__(self, **kwargs):
        """Chart action init."""
        super().__init__(**kwargs)
        self._render_protocol = VisChart()

    @property
    def out_model_type(self):
        """Return the output model type."""
        return SqlInput

    async def run(
        self,
        ai_message: str,
        resource: Optional[AgentResource] = None,
        rely_action_out: Optional[ActionOutput] = None,
        need_vis_render: bool = True,
        **kwargs,
    ) -> ActionOutput:
        """Perform the action."""
        try:
            param: SqlInput = self._input_convert(ai_message, SqlInput)
        except Exception as e:
            logger.exception(f"{str(e)}! \n {ai_message}")
            return ActionOutput(
                is_exe_success=False,
                content="Error:The answer is not output in the required format.",
            )
        try:
            if not self.resource_need:
                raise ValueError("The resource type is not found!")
            if not self.render_protocol:
                raise ValueError("The rendering protocol is not initialized!")
            db_resources: List[DBResource] = DBResource.from_resource(self.resource)
            if not db_resources:
                raise ValueError("The database resource is not found!")
            db = db_resources[0]
            data_df = await db.query_to_df(param.sql)
            view = await self.render_protocol.display(
                chart=json.loads(model_to_json(param)), data_df=data_df
            )
            return ActionOutput(
                is_exe_success=True,
                content=model_to_json(param),
                view=view,
                resource_type=self.resource_need.value,
                resource_value=db._db_name,
            )
        except Exception as e:
            logger.exception("Check your answers, the sql run failed!")
            return ActionOutput(
                is_exe_success=False,
                content=f"Error:Check your answers, the sql run failed!Reason:{str(e)}",
            )
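The critical step in any Action is the input conversion: turning the LLM's free-form answer into the typed input model, and failing gracefully when the format is wrong. A simplified, dependency-free sketch of that step, using a plain dataclass instead of the real pydantic model:

```python
import json
from dataclasses import dataclass


@dataclass
class MiniSqlInput:
    display_type: str
    sql: str
    thought: str


@dataclass
class MiniActionOutput:
    is_exe_success: bool
    content: str


def input_convert(ai_message: str) -> MiniActionOutput:
    """Parse the model's JSON answer into the action's input model,
    returning a failed ActionOutput when the required format is violated."""
    try:
        data = json.loads(ai_message)
        param = MiniSqlInput(
            display_type=data.get("display_type", "response_table"),
            sql=data["sql"],  # sql is mandatory; missing key -> format error
            thought=data.get("thought", ""),
        )
    except (json.JSONDecodeError, KeyError, TypeError, AttributeError):
        return MiniActionOutput(
            is_exe_success=False,
            content="Error:The answer is not output in the required format.",
        )
    return MiniActionOutput(is_exe_success=True, content=param.sql)
```

Returning a failed `ActionOutput` instead of raising is what lets the framework feed the error back to the LLM for a retry.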

class XXXAgent(ConversableAgent):
    ...
    # Extra runtime parameters prepared for the Action
    def prepare_act_param(
        self,
        received_message: Optional[AgentMessage],
        sender: Agent,
        rely_messages: Optional[List[AgentMessage]] = None,
        **kwargs,
    ) -> Dict[str, Any]:
        historical_dialogues = kwargs.get("historical_dialogues", None)
        return {
            "user_input": received_message.content,
            "conv_id": self.agent_context.conv_id,
            "paren_agent": self,
            "rely_messages": rely_messages,
            "historical_dialogues": historical_dialogues,
        }

5. Resources

# Resource loading method. By default this converts the bound resource pack,
# via the resource class's methods, into resource input for the LLM.
async def load_resource(self, question: str, is_retry_chat: bool = False, **kwargs):
    logger.info(f"DomainApi load_resource: {question}")

6. User Interaction and Cross-Topic Multi-Turn Dialogue

class XXXAction(Action[xxInput]):
    async def run(
        self,
        ai_message: str,
        resource: Optional[AgentResource] = None,
        rely_action_out: Optional[ActionOutput] = None,
        need_vis_render: bool = True,
        **kwargs,
    ) -> ActionOutput:
        ...
        return ActionOutput(
            is_exe_success=False,  # mark the current agent's progress as not successful
            content=json.dumps(intent.to_dict(), ensure_ascii=False),  # the question content
            view=intent.ask_user if intent.ask_user else ai_message,  # how the question is displayed (with GptVis you can send the user a dynamic-form style message)
            have_retry=False,
            ask_user=True,  # and proactively ask the user a question
        )

Multi-Agent Collaboration
For multi-agent collaboration, DB-GPT currently implements only automatic plan decomposition and Flow orchestration; ReAct-style dynamic planning will be considered later. Based on the Team base class mentioned in the core classes above, manager classes for the various collaboration modes are built, and a manager Agent then hires multiple Agents to cooperate on answering the task:
manager = AutoPlanChatManager()
manager = await manager.bind(context).bind(agent_memory).bind(llm_config).build()
manager.hire(employees)
user_proxy: UserProxyAgent = await UserProxyAgent().bind(context).bind(agent_memory).build()
await user_proxy.initiate_chat(
    recipient=manager,
    message=user_query,
    is_retry_chat=is_retry_chat,
    last_speaker_name=last_speaker_name,
    message_rounds=init_message_rounds,
    **ext_info,
)
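Conceptually the manager is just a Team that hires employees and decides who speaks next. A toy sketch of that structure (round-robin speaker selection for illustration only; `AutoPlanChatManager` instead plans the order with an LLM, and all names here are invented):

```python
from typing import List


class MiniManager:
    """Sketch of a Team-style manager: hire() collects employees and the
    manager picks the next speaker (round-robin in this toy version)."""

    def __init__(self) -> None:
        self.employees: List[str] = []
        self._turn = 0

    def hire(self, agents: List[str]) -> None:
        self.employees.extend(agents)

    def next_speaker(self) -> str:
        if not self.employees:
            raise ValueError("No employees hired")
        speaker = self.employees[self._turn % len(self.employees)]
        self._turn += 1
        return speaker


manager = MiniManager()
manager.hire(["planner", "coder", "reporter"])
order = [manager.next_speaker() for _ in range(4)]
# order == ["planner", "coder", "reporter", "planner"]
```

Swapping the speaker-selection policy (round-robin, LLM-planned, branch-driven) is exactly what distinguishes the different collaboration-mode managers.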


Agents for the AutoPlan collaboration mode

class AutoPlanChatManager(ManagerAgent):
    """A chat manager agent that can manage a team chat of multiple agents."""

class PlannerAgent(ConversableAgent):
    """Planner Agent."""


Agents for the AWEL collaboration mode

class AWELBaseManager(ManagerAgent, ABC):
    """AWEL base manager."""
The integration of Agents with AWEL Flow is still somewhat rigid. Early on we tried making the Agent a base operator class, but in practice the Agent's resource-binding design conflicts with the dynamic component initialization of operators. As a compromise we built a set of Agent container operators: a Flow's Agent container operator binds a concrete Agent, and resources are bound inside the container at runtime. The Agent-related container operators and resource nodes are introduced below.

## Agent-related operators
### Agent Flow trigger with no real logic; by design a Flow must start from a trigger
class AgentDummyTrigger(Trigger):

### Agent container operator with uniform input and output, so Agent Flows can be freely composed
class AWELAgentOperator(
    MixinLLMOperator, MapOperator[AgentGenerateContext, AgentGenerateContext]
):

## Agent Flow feature operators
### Operator implementing Agent Flow branches
class AgentBranchOperator(BranchOperator[AgentGenerateContext, AgentGenerateContext]):
### Operator implementing Agent Flow branch merging
class AgentBranchJoinOperator(BranchJoinOperator[AgentGenerateContext]):

# The node an Agent actually binds to in a Flow (the Agent is treated as a resource of the Agent container operator)
class AWELAgent(BaseModel):

# Resources bound to the Agent, exposed as resource nodes
### Agent resource
class AWELAgentResource(AgentResource):
    """AWEL Agent Resource."""
### Agent knowledge-base resource
class AWELAgentKnowledgeResource(AgentResource):
### Agent prompt resource
class AgentPrompt(BaseModel):
### Agent model-config resource
class AWELAgentConfig(LLMConfig):





