
Title: A Deep Dive into the A2A Protocol: a Python Example

Author: 链载Ai    Posted: 1 hour ago


Introduction

In this tutorial, you'll build a simple echo A2A server in Python. This bare-bones implementation will show you everything A2A has to offer. After this tutorial, you'll be able to add agent capabilities using Ollama or Google's Agent Development Kit.

You will learn:

Setting up the environment

What you'll need

Python environment

We'll use uv as our package manager and to set up our project.

The A2A library we'll use requires python >= 3.12; if you don't already have a matching version, uv can install one. We'll use python 3.12.

Check

Run the following command to make sure you're ready for the next step:

echo 'import sys; print(sys.version)' | uv run -

If you see something like the following, you're ready to go!

3.12.3 (main, Feb 4 2025, 14:48:35) [GCC 13.3.0]

My environment

Creating the project

First, create a project with uv. We'll add the --package flag so you can add tests or publish the project later:

uv init --package my-project
cd my-project

Using a virtual environment

We'll create a virtual environment for this project. This only needs to be done once:

uv venv .venv

For this and any future terminal windows you open, you'll need to activate this virtual environment:

source .venv/bin/activate

If you're using a code editor such as VS Code, you'll want to set the Python interpreter to get code completion. In VS Code, press Ctrl-Shift-P and select Python: Select Interpreter. Then select your project my-project, followed by the correct python interpreter: Python 3.12.3 ('.venv': venv) ./.venv/bin/python

Your source tree should now look something like this:

# my-project
tree
.
|____pyproject.toml
|____README.md
|____.venv
||____bin
|||____activate.bat
|||____activate.ps1
|||____python3
|||____python
|||____activate.fish
|||____pydoc.bat
|||____activate_this.py
|||____activate
|||____activate.nu
|||____deactivate.bat
|||____python3.13
|||____activate.csh
||____pyvenv.cfg
||____CACHEDIR.TAG
||____.gitignore
||____lib
|||____python3.13
||||____site-packages
|||||_____virtualenv.py
|||||_____virtualenv.pth
|____.python-version
|____src
||____my_project
|||______init__.py

Adding the Google-A2A Python library

Next, we'll add the sample A2A python library from Google:

uv add git+https://github.com/google/A2A#subdirectory=samples/python

pyproject.toml:

[project]
name = "my-project"
version = "0.1.0"
description = "Add your description here"
readme = "README.md"
authors = [
    { name = "zhangcheng", email = "zh.milo@gmail.com" }
]
requires-python = ">=3.13"
dependencies = [
    "a2a-samples",
]

[project.scripts]
my-project = "my_project:main"

[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

[tool.uv.sources]
a2a-samples = { git = "https://github.com/google/A2A", subdirectory = "samples/python" }

Setting up the project structure

Now create some files we'll use later:

touch src/my_project/agent.py
touch src/my_project/task_manager.py

Test run

If everything is set up correctly, you should now be able to run your application:

uv run my-project

The output should look something like this:

Hello from my-project!

Agent skills

An agent skill is a set of capabilities the agent can perform. Here's an example for our echo agent:

{
  id: "my-project-echo-skill",
  name: "EchoTool",
  description: "Echos the input given",
  tags: ["echo", "repeater"],
  examples: ["I will see this echoed back to me"],
  inputModes: ["text"],
  outputModes: ["text"]
}

This conforms to the skills section of the agent card:

{
  id: string; // unique identifier for the agent's skill
  name: string; // human-readable name of the skill
  // skill description - will be used by clients or humans as a hint
  // to understand what the skill does.
  description: string;
  // a set of tag words describing the capability categories of this
  // specific skill (e.g. "cooking", "customer support", "billing")
  tags: string[];
  // a set of example scenarios the skill can perform.
  // will be used by clients as a hint for how to use the skill.
  // (e.g. "I need a recipe for bread")
  examples?: string[]; // example task prompts
  // the set of interaction modes the skill supports
  // (if different from the defaults)
  inputModes?: string[]; // supported input mime types
  outputModes?: string[]; // supported output mime types
}

Implementation

Let's create this agent skill in code. Open src/my_project/__init__.py and replace its contents with the following code:

import google_a2a
from google_a2a.common.types import AgentSkill

def main():
  skill = AgentSkill(
    id="my-project-echo-skill",
    name="EchoTool",
    description="Echos the input given",
    tags=["echo", "repeater"],
    examples=["I will see this echoed back to me"],
    inputModes=["text"],
    outputModes=["text"],
  )
  print(skill)

if __name__ == "__main__":
  main()

If you run into a module error, try this instead:

from common.types import AgentSkill

# same code as above

Test run

Let's run it:

uv run my-project

The output should look something like this:

id='my-project-echo-skill' name='EchoTool' description='Echos the input given' tags=['echo', 'repeater'] examples=['I will see this echoed back to me'] inputModes=['text'] outputModes=['text']

Agent card

Now that we have a skill defined, we can create the agent card.

A remote agent is required to publish an agent card in JSON format describing the agent's capabilities and skills, along with its authentication mechanism. In other words, this lets the world know about your agent and how to interact with it.
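As a concrete illustration of what a client does with this card, here's a minimal sketch that parses an agent card and picks out the skills it can use. The JSON is inlined below in place of the HTTP GET a real client would make against the agent's URL, and it matches the card we build later in this tutorial:

```python
import json

# Agent card JSON as a client would receive it (inlined here in place of
# an HTTP GET against the agent's base URL).
card_json = """
{
  "name": "EchoAgent",
  "description": "This agent echos the input given",
  "url": "http://localhost:10002/",
  "version": "0.1.0",
  "capabilities": {"streaming": false},
  "defaultInputModes": ["text"],
  "defaultOutputModes": ["text"],
  "skills": [
    {
      "id": "my-project-echo-skill",
      "name": "EchoTool",
      "description": "Echos the input given",
      "tags": ["echo", "repeater"],
      "inputModes": ["text"],
      "outputModes": ["text"]
    }
  ]
}
"""

card = json.loads(card_json)
# A client can now decide whether this agent fits its needs,
# e.g. collect the ids of all skills that accept text input:
text_skills = [s["id"] for s in card["skills"] if "text" in s.get("inputModes", [])]
print(text_skills)  # ['my-project-echo-skill']
```

This is the whole point of the card: a client never needs to know how the agent is implemented, only what the card advertises.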

Implementation

First, add some helpers for parsing command-line arguments, which will come in handy later for starting the server:

uv add click

And update our code:

import logging

import click
import google_a2a
from google_a2a.common.types import AgentSkill, AgentCapabilities, AgentCard

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@click.command()
@click.option("--host", default="localhost")
@click.option("--port", default=10002)
def main(host, port):
  skill = AgentSkill(
    id="my-project-echo-skill",
    name="EchoTool",
    description="Echos the input given",
    tags=["echo", "repeater"],
    examples=["I will see this echoed back to me"],
    inputModes=["text"],
    outputModes=["text"],
  )
  logging.info(skill)

if __name__ == "__main__":
  main()

Next we add the agent card:

# ...
def main(host, port):
  # ...
  capabilities = AgentCapabilities()
  agent_card = AgentCard(
    name="EchoAgent",
    description="This agent echos the input given",
    url=f"http://{host}:{port}/",
    version="0.1.0",
    defaultInputModes=["text"],
    defaultOutputModes=["text"],
    capabilities=capabilities,
    skills=[skill]
  )
  logging.info(agent_card)

if __name__ == "__main__":
  main()

Test run

Let's run it:

uv run my-project

The output should look something like this:

INFO:root:id='my-project-echo-skill' name='EchoTool' description='Echos the input given' tags=['echo', 'repeater'] examples=['I will see this echoed back to me'] inputModes=['text'] outputModes=['text']
INFO:root:name='EchoAgent' description='This agent echos the input given' url='http://localhost:10002/' provider=None version='0.1.0' documentationUrl=None capabilities=AgentCapabilities(streaming=False, pushNotifications=False, stateTransitionHistory=False) authentication=None defaultInputModes=['text'] defaultOutputModes=['text'] skills=[AgentSkill(id='my-project-echo-skill', name='EchoTool', description='Echos the input given', tags=['echo', 'repeater'], examples=['I will see this echoed back to me'], inputModes=['text'], outputModes=['text'])]

The A2A server

We're almost ready to start the server! We'll use the A2AServer class from Google-A2A, which under the hood spins up a uvicorn server.

Task manager

Before creating the server, we need a task manager to handle incoming requests.

We'll implement the InMemoryTaskManager interface, which requires us to implement two methods:

async def on_send_task(
  self,
  request: SendTaskRequest
) -> SendTaskResponse:
  """
  This method queries or creates a task for the agent.
  The caller will receive exactly one response.
  """
  pass

async def on_send_task_subscribe(
  self,
  request: SendTaskStreamingRequest
) -> AsyncIterable[SendTaskStreamingResponse] | JSONRPCResponse:
  """
  This method lets the caller subscribe to future updates about a task.
  The caller will receive one response, and additionally receive
  subscription updates over a session established between the client and server.
  """
  pass

Open src/my_project/task_manager.py and add the following code. We'll simply return a direct echo response and immediately mark the task as complete, without using any sessions or subscriptions:

from typing import AsyncIterable

import google_a2a
from google_a2a.common.server.task_manager import InMemoryTaskManager
from google_a2a.common.types import (
  Artifact,
  JSONRPCResponse,
  Message,
  SendTaskRequest,
  SendTaskResponse,
  SendTaskStreamingRequest,
  SendTaskStreamingResponse,
  Task,
  TaskState,
  TaskStatus,
  TaskStatusUpdateEvent,
)

class MyAgentTaskManager(InMemoryTaskManager):
  def __init__(self):
    super().__init__()

  async def on_send_task(self, request: SendTaskRequest) -> SendTaskResponse:
    # Upsert the task stored by InMemoryTaskManager
    await self.upsert_task(request.params)

    task_id = request.params.id
    # Our custom logic that simply marks the task as complete
    # and returns the echo text
    received_text = request.params.message.parts[0].text
    task = await self._update_task(
      task_id=task_id,
      task_state=TaskState.COMPLETED,
      response_text=f"on_send_task received: {received_text}"
    )

    # Send the response
    return SendTaskResponse(id=request.id, result=task)

  async def on_send_task_subscribe(
    self,
    request: SendTaskStreamingRequest
  ) -> AsyncIterable[SendTaskStreamingResponse] | JSONRPCResponse:
    pass

  async def _update_task(
    self,
    task_id: str,
    task_state: TaskState,
    response_text: str,
  ) -> Task:
    task = self.tasks[task_id]
    agent_response_parts = [
      {
        "type": "text",
        "text": response_text,
      }
    ]
    task.status = TaskStatus(
      state=task_state,
      message=Message(
        role="agent",
        parts=agent_response_parts,
      )
    )
    task.artifacts = [
      Artifact(
        parts=agent_response_parts,
      )
    ]
    return task

The A2A server

With the task manager complete, we can now create the server.

Open src/my_project/__init__.py and add the following code:

# ...
from google_a2a.common.server import A2AServer
from my_project.task_manager import MyAgentTaskManager
# ...
def main(host, port):
  # ...

  task_manager = MyAgentTaskManager()
  server = A2AServer(
    agent_card=agent_card,
    task_manager=task_manager,
    host=host,
    port=port,
  )
  server.start()

Test run

Let's run it:

uv run my-project

The output should look something like this:

INFO:root:id='my-project-echo-skill' name='EchoTool' description='Echos the input given' tags=['echo', 'repeater'] examples=['I will see this echoed back to me'] inputModes=['text'] outputModes=['text']
INFO:root:name='EchoAgent' description='This agent echos the input given' url='http://localhost:10002/' provider=None version='0.1.0' documentationUrl=None capabilities=AgentCapabilities(streaming=False, pushNotifications=False, stateTransitionHistory=False) authentication=None defaultInputModes=['text'] defaultOutputModes=['text'] skills=[AgentSkill(id='my-project-echo-skill', name='EchoTool', description='Echos the input given', tags=['echo', 'repeater'], examples=['I will see this echoed back to me'], inputModes=['text'], outputModes=['text'])]
INFO:     Started server process [582]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://localhost:10002 (Press CTRL+C to quit)

Congratulations! Your A2A server is now running!

Interacting with the A2A server

First, we'll use Google-A2A's command-line tool to send requests to our A2A server. After trying it out, we'll write our own basic client to see how this works under the hood.

Using Google-A2A's command-line tool

Keep your A2A server from the previous step running:

# This should already be running in your terminal
$ uv run my-project
INFO:     Started server process [20538]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://localhost:10002 (Press CTRL+C to quit)

Open a new terminal in the same directory:

source .venv/bin/activate
uv run google-a2a-cli --agent http://localhost:10002

# If that errors, try this (make sure there is a hosts directory in .venv/lib/python3.13/site-packages):
uv run python -m hosts.cli --agent http://localhost:10002

Note: this only works if you installed google-a2a from this pull request, since the cli was not exposed before.

Otherwise, you'll have to check out the Google/A2A repository directly, navigate to the samples/python directory, and run the cli from there.

You can also run the bare-bones cli like this:

git clone git@github.com:google/A2A.git
cd A2A/samples/python/hosts/cli
uv run . --agent [url-of-your-a2a-server]

You can then send a message to the server by typing it in and pressing Enter:

========= starting a new task ========

What do you want to send to the agent? (:q or quit to exit): Hello!

If everything is working, you'll see it in the response:

$ uv run python -m hosts.cli --agent http://localhost:10002
======= Agent Card ========
{"name":"EchoAgent","description":"This agent echos the input given","url":"http://localhost:10002/","version":"0.1.0","capabilities":{"streaming":false,"pushNotifications":false,"stateTransitionHistory":false},"defaultInputModes":["text"],"defaultOutputModes":["text"],"skills":[{"id":"my-project-echo-skill","name":"EchoTool","description":"Echos the input given","tags":["echo","repeater"],"examples":["I will see this echoed back to me"],"inputModes":["text"],"outputModes":["text"]}]}
========= starting a new task ========

What do you want to send to the agent? (:q or quit to exit): hello
Select a file path to attach? (press enter to skip):

{"jsonrpc":"2.0","id":"5b3b74b7ea80495daff4047ee48a6c48","result":{"id":"740f1e21465b4ee2af4af7b8c6cacad5","sessionId":"7fbd065264cb4d6c91ed96909589fc35","status":{"state":"completed","message":{"role":"agent","parts":[{"type":"text","text":"on_send_task received: hello"}]},"timestamp":"2025-05-03T22:18:41.649600"},"artifacts":[{"parts":[{"type":"text","text":"on_send_task received: hello"}],"index":0}],"history":[{"role":"user","parts":[{"type":"text","text":"hello"}]}]}}
========= starting a new task ========

What do you want to send to the agent? (:q or quit to exit):

To exit, type :q and press Enter.
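Before moving on, it's worth seeing the shape of what the cli just sent. Under the hood, each prompt becomes a JSON-RPC 2.0 request POSTed to the agent's URL. The sketch below builds such a payload; the tasks/send method name and field layout are inferred from the responses shown above, and the actual HTTP POST is omitted:

```python
import json
import uuid

def build_send_task_request(text: str) -> dict:
    """Build a JSON-RPC 2.0 tasks/send payload like the one the cli sends."""
    return {
        "jsonrpc": "2.0",
        "id": uuid.uuid4().hex,       # request id, echoed back in the response
        "method": "tasks/send",
        "params": {
            "id": uuid.uuid4().hex,   # task id
            "sessionId": uuid.uuid4().hex,
            "message": {
                "role": "user",
                "parts": [{"type": "text", "text": text}],
            },
        },
    }

payload = build_send_task_request("hello")
print(json.dumps(payload, indent=2))
```

A real client would POST this body to http://localhost:10002/ with Content-Type: application/json and read back the Task result we saw in the transcript.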

Adding agent capabilities

Now that we have a basic A2A server running, let's add some more capabilities. We'll explore how A2A can work asynchronously and stream responses.

Streaming

Streaming allows the client to subscribe to the server and receive multiple updates instead of a single response. This can be useful for long-running agent tasks, or for cases where multiple Artifacts may be streamed back to the client.
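The pattern we're about to build (a worker coroutine enqueues task updates while a response handler drains them out to the client) can be sketched with a plain asyncio.Queue. This is an illustrative stand-in for what the setup_sse_consumer / enqueue_events_for_sse / dequeue_events_for_sse helpers do internally, not the actual library API:

```python
import asyncio

async def worker(queue: asyncio.Queue) -> None:
    # Simulates the agent pushing status updates for one task;
    # the last event is marked final, like TaskStatusUpdateEvent(final=True).
    for state in ["working", "working", "completed"]:
        await queue.put({"state": state, "final": state == "completed"})

async def stream_events(queue: asyncio.Queue):
    # Simulates the SSE dequeue side: drain events until a final one arrives.
    while True:
        event = await queue.get()
        yield event
        if event["final"]:
            break

async def main() -> list:
    queue: asyncio.Queue = asyncio.Queue()
    asyncio.create_task(worker(queue))          # worker runs concurrently
    return [e async for e in stream_events(queue)]

events = asyncio.run(main())
print([e["state"] for e in events])  # ['working', 'working', 'completed']
```

The key property is that the producer and consumer are decoupled: the request handler can return its event stream immediately while the work happens in the background.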

First, we'll declare our agent as ready for streaming. Open src/my_project/__init__.py and update AgentCapabilities:

# ...
def main(host, port):
  # ...
  capabilities = AgentCapabilities(
    streaming=True
  )
  # ...

Now in src/my_project/task_manager.py, we have to implement on_send_task_subscribe:

import asyncio
# ...
class MyAgentTaskManager(InMemoryTaskManager):
  # ...
  async def _stream_3_messages(self, request: SendTaskStreamingRequest):
    task_id = request.params.id
    received_text = request.params.message.parts[0].text

    text_messages = ["one", "two", "three"]
    for text in text_messages:
      parts = [
        {
          "type": "text",
          "text": f"{received_text}: {text}",
        }
      ]
      message = Message(role="agent", parts=parts)
      is_last = text == text_messages[-1]
      task_state = TaskState.COMPLETED if is_last else TaskState.WORKING
      task_status = TaskStatus(
        state=task_state,
        message=message
      )
      task_update_event = TaskStatusUpdateEvent(
        id=request.params.id,
        status=task_status,
        final=is_last,
      )
      await self.enqueue_events_for_sse(
        request.params.id,
        task_update_event
      )

  async def on_send_task_subscribe(
    self,
    request: SendTaskStreamingRequest
  ) -> AsyncIterable[SendTaskStreamingResponse] | JSONRPCResponse:
    # Upsert the task stored by InMemoryTaskManager
    await self.upsert_task(request.params)

    task_id = request.params.id
    # Create a work queue for this task
    sse_event_queue = await self.setup_sse_consumer(task_id=task_id)

    # Kick off asynchronous work for this task
    asyncio.create_task(self._stream_3_messages(request))

    # Tell the client to expect future streaming responses
    return self.dequeue_events_for_sse(
      request_id=request.id,
      task_id=task_id,
      sse_event_queue=sse_event_queue,
    )

Restart your A2A server to pick up the new changes, then re-run the cli:

$ uv run python -m hosts.cli --agent http://localhost:10002
======= Agent Card ========
{"name":"EchoAgent","description":"This agent echos the input given","url":"http://localhost:10002/","version":"0.1.0","capabilities":{"streaming":true,"pushNotifications":false,"stateTransitionHistory":false},"defaultInputModes":["text"],"defaultOutputModes":["text"],"skills":[{"id":"my-project-echo-skill","name":"EchoTool","description":"Echos the input given","tags":["echo","repeater"],"examples":["I will see this echoed back to me"],"inputModes":["text"],"outputModes":["text"]}]}
========= starting a new task ========

What do you want to send to the agent? (:q or quit to exit): Streaming?
Select a file path to attach? (press enter to skip):
stream event => {"jsonrpc":"2.0","id":"c6f21c0b7e5e497caaca4a692aaefd7a","result":{"id":"d7218dd3c122477c89d62e7d897fea0b","status":{"state":"working","message":{"role":"agent","parts":[{"type":"text","text":"Streaming?: one"}]},"timestamp":"2025-05-03T22:22:31.354656"},"final":false}}
stream event => {"jsonrpc":"2.0","id":"c6f21c0b7e5e497caaca4a692aaefd7a","result":{"id":"d7218dd3c122477c89d62e7d897fea0b","status":{"state":"working","message":{"role":"agent","parts":[{"type":"text","text":"Streaming?: two"}]},"timestamp":"2025-05-03T22:22:31.354684"},"final":false}}
stream event => {"jsonrpc":"2.0","id":"c6f21c0b7e5e497caaca4a692aaefd7a","result":{"id":"d7218dd3c122477c89d62e7d897fea0b","status":{"state":"completed","message":{"role":"agent","parts":[{"type":"text","text":"Streaming?: three"}]},"timestamp":"2025-05-03T22:22:31.354698"},"final":true}}
========= starting a new task ========

What do you want to send to the agent? (:q or quit to exit):

Sometimes an agent may need additional input. For example, perhaps the agent will ask the client whether it wants another 3 messages repeated. In that case, the agent responds with TaskState.INPUT_REQUIRED, and the client then re-sends send_task_streaming with the same task_id and session_id, but with an updated message providing the input the agent needs. On the server side, we'll update on_send_task_subscribe to handle this case:

import asyncio
from typing import AsyncIterable

from common.server.task_manager import InMemoryTaskManager
from common.types import (
  Artifact,
  JSONRPCResponse,
  Message,
  SendTaskRequest,
  SendTaskResponse,
  SendTaskStreamingRequest,
  SendTaskStreamingResponse,
  Task,
  TaskState,
  TaskStatus,
  TaskStatusUpdateEvent,
)

class MyAgentTaskManager(InMemoryTaskManager):
  def __init__(self):
    super().__init__()

  async def on_send_task(self, request: SendTaskRequest) -> SendTaskResponse:
    # Upsert the task stored by InMemoryTaskManager
    await self.upsert_task(request.params)

    task_id = request.params.id
    # Our custom logic that simply marks the task as complete
    # and returns the echo text
    received_text = request.params.message.parts[0].text
    task = await self._update_task(
      task_id=task_id,
      task_state=TaskState.COMPLETED,
      response_text=f"on_send_task received: {received_text}"
    )

    # Send the response
    return SendTaskResponse(id=request.id, result=task)

  async def _stream_3_messages(self, request: SendTaskStreamingRequest):
    task_id = request.params.id
    received_text = request.params.message.parts[0].text

    text_messages = ["one", "two", "three"]
    for text in text_messages:
      parts = [
        {
          "type": "text",
          "text": f"{received_text}: {text}",
        }
      ]
      message = Message(role="agent", parts=parts)
      # is_last = text == text_messages[-1]
      task_state = TaskState.WORKING
      # task_state = TaskState.COMPLETED if is_last else TaskState.WORKING
      task_status = TaskStatus(
        state=task_state,
        message=message
      )
      task_update_event = TaskStatusUpdateEvent(
        id=request.params.id,
        status=task_status,
        final=False,
      )
      await self.enqueue_events_for_sse(
        request.params.id,
        task_update_event
      )
    ask_message = Message(
      role="agent",
      parts=[
        {
          "type": "text",
          "text": "Would you like more messages? (Y/N)"
        }
      ]
    )
    task_update_event = TaskStatusUpdateEvent(
      id=request.params.id,
      status=TaskStatus(
        state=TaskState.INPUT_REQUIRED,
        message=ask_message
      ),
      final=True,
    )
    await self.enqueue_events_for_sse(
      request.params.id,
      task_update_event
    )

  async def on_send_task_subscribe(
    self,
    request: SendTaskStreamingRequest
  ) -> AsyncIterable[SendTaskStreamingResponse] | JSONRPCResponse:
    task_id = request.params.id
    is_new_task = task_id in self.tasks
    # Upsert the task stored by InMemoryTaskManager
    await self.upsert_task(request.params)

    received_text = request.params.message.parts[0].text
    sse_event_queue = await self.setup_sse_consumer(task_id=task_id)
    if not is_new_task and received_text == "N":
      task_update_event = TaskStatusUpdateEvent(
        id=request.params.id,
        status=TaskStatus(
          state=TaskState.COMPLETED,
          message=Message(
            role="agent",
            parts=[
              {
                "type": "text",
                "text": "All done!"
              }
            ]
          )
        ),
        final=True,
      )
      await self.enqueue_events_for_sse(
        request.params.id,
        task_update_event,
      )
    else:
      asyncio.create_task(self._stream_3_messages(request))

    return self.dequeue_events_for_sse(
      request_id=request.id,
      task_id=task_id,
      sse_event_queue=sse_event_queue,
    )

  async def _update_task(
    self,
    task_id: str,
    task_state: TaskState,
    response_text: str,
  ) -> Task:
    task = self.tasks[task_id]
    agent_response_parts = [
      {
        "type": "text",
        "text": response_text,
      }
    ]
    task.status = TaskStatus(
      state=task_state,
      message=Message(
        role="agent",
        parts=agent_response_parts,
      )
    )
    task.artifacts = [
      Artifact(
        parts=agent_response_parts,
      )
    ]
    return task

Now if we restart the server and run the cli, we can see the task keeps running until we tell the agent N:

uv run python -m hosts.cli --agent http://localhost:10002
======= Agent Card ========
{"name":"EchoAgent","description":"This agent echos the input given","url":"http://localhost:10002/","version":"0.1.0","capabilities":{"streaming":true,"pushNotifications":false,"stateTransitionHistory":false},"defaultInputModes":["text"],"defaultOutputModes":["text"],"skills":[{"id":"my-project-echo-skill","name":"EchoTool","description":"Echos the input given","tags":["echo","repeater"],"examples":["I will see this echoed back to me"],"inputModes":["text"],"outputModes":["text"]}]}
========= starting a new task ========

What do you want to send to the agent? (:q or quit to exit): Streaming?
Select a file path to attach? (press enter to skip):
stream event => {"jsonrpc":"2.0","id":"18357b72fc5841ef8e8ede073b91ac48","result":{"id":"b02f6989e72f44818560778d39fcef18","status":{"state":"working","message":{"role":"agent","parts":[{"type":"text","text":"Streaming?: one"}]},"timestamp":"2025-05-04T09:18:18.235994"},"final":false}}
stream event => {"jsonrpc":"2.0","id":"18357b72fc5841ef8e8ede073b91ac48","result":{"id":"b02f6989e72f44818560778d39fcef18","status":{"state":"working","message":{"role":"agent","parts":[{"type":"text","text":"Streaming?: two"}]},"timestamp":"2025-05-04T09:18:18.236021"},"final":false}}
stream event => {"jsonrpc":"2.0","id":"18357b72fc5841ef8e8ede073b91ac48","result":{"id":"b02f6989e72f44818560778d39fcef18","status":{"state":"working","message":{"role":"agent","parts":[{"type":"text","text":"Streaming?: three"}]},"timestamp":"2025-05-04T09:18:18.236033"},"final":false}}
stream event => {"jsonrpc":"2.0","id":"18357b72fc5841ef8e8ede073b91ac48","result":{"id":"b02f6989e72f44818560778d39fcef18","status":{"state":"input-required","message":{"role":"agent","parts":[{"type":"text","text":"Would you like more messages? (Y/N)"}]},"timestamp":"2025-05-04T09:18:18.236044"},"final":true}}
========= starting a new task ========

What do you want to send to the agent? (:q or quit to exit): N
Select a file path to attach? (press enter to skip):
stream event => {"jsonrpc":"2.0","id":"86ce510ba68b4797a5b68061c8c4780b","result":{"id":"64e51665dc354d2da7c31bcc45abc8f9","status":{"state":"completed","message":{"role":"agent","parts":[{"type":"text","text":"All done!"}]},"timestamp":"2025-05-04T09:22:24.598749"},"final":true}}
========= starting a new task ========

What do you want to send to the agent? (:q or quit to exit):

Congratulations! You now have an agent that can perform work asynchronously and ask the user for input when needed.

Using a local Ollama model

Now we get to the exciting part: adding AI to our A2A server.

In this tutorial, we'll set up a local Ollama model and integrate it with our A2A server.

Requirements

We'll install ollama and langchain, and download an ollama model that supports MCP tools (for a future tutorial).

1. Download ollama

2. Run the ollama server:

# Note: if ollama is already running, you may get an error such as
# Error: listen tcp 127.0.0.1:11434: bind: address already in use
# On Linux you can run systemctl stop ollama to stop it
ollama serve

3. Download a model from this list. We'll use qwen3:

ollama pull qwen3:4b
# only 2.4G

4. Install langchain:

uv add langchain langchain-ollama langgraph

Now that ollama is set up, we can start integrating it into our A2A server.

Integrating Ollama into our A2A server

First, open src/my_project/__init__.py:

# ...

@click.command()
@click.option("--host", default="localhost")
@click.option("--port", default=10002)
@click.option("--ollama-host", default="http://127.0.0.1:11434")
@click.option("--ollama-model", default=None)
def main(host, port, ollama_host, ollama_model):
  # ...
  capabilities = AgentCapabilities(
    streaming=False  # we'll leave streaming as an exercise for the reader
  )
  # ...
  task_manager = MyAgentTaskManager(
    ollama_host=ollama_host,
    ollama_model=ollama_model,
  )
  # ..

Now let's add the AI functionality in src/my_project/agent.py:

from langchain_ollama import ChatOllama
from langgraph.prebuilt import create_react_agent
from langgraph.graph.graph import CompiledGraph

def create_ollama_agent(ollama_base_url: str, ollama_model: str):
  ollama_chat_llm = ChatOllama(
    base_url=ollama_base_url,
    model=ollama_model,
    temperature=0.2
  )
  agent = create_react_agent(ollama_chat_llm, tools=[])
  return agent

async def run_ollama(ollama_agent: CompiledGraph, prompt: str):
  agent_response = await ollama_agent.ainvoke(
    {"messages": prompt}
  )
  message = agent_response["messages"][-1].content
  return str(message)

Finally, let's call our ollama agent from src/my_project/task_manager.py:

# ...
import typing

from my_project.agent import create_ollama_agent, run_ollama

class MyAgentTaskManager(InMemoryTaskManager):
  def __init__(
    self,
    ollama_host: str,
    ollama_model: typing.Union[None, str]
  ):
    super().__init__()
    if ollama_model is not None:
      self.ollama_agent = create_ollama_agent(
        ollama_base_url=ollama_host,
        ollama_model=ollama_model
      )
    else:
      self.ollama_agent = None

  async def on_send_task(self, request: SendTaskRequest) -> SendTaskResponse:
    # ...
    received_text = request.params.message.parts[0].text
    response_text = f"on_send_task received: {received_text}"
    if self.ollama_agent is not None:
      response_text = await run_ollama(ollama_agent=self.ollama_agent, prompt=received_text)

    task = await self._update_task(
      task_id=task_id,
      task_state=TaskState.COMPLETED,
      response_text=response_text
    )

    # Send the response
    return SendTaskResponse(id=request.id, result=task)

  # ...

Let's test it!

First, re-run our A2A server, substituting your downloaded ollama model for qwen3:4b if it differs:

uv run my-project --ollama-host http://127.0.0.1:11434 --ollama-model qwen3:4b

Then re-run the cli:

uv run python -m hosts.cli --agent http://localhost:10002

Note that if you're using a large model, it may take a while to load and the cli may time out. In that case, re-run the cli once the ollama server has finished loading the model.

You should see something like the following:

======= Agent Card ========
{"name":"EchoAgent","description":"This agent echos the input given","url":"http://localhost:10002/","version":"0.1.0","capabilities":{"streaming":false,"pushNotifications":false,"stateTransitionHistory":false},"defaultInputModes":["text"],"defaultOutputModes":["text"],"skills":[{"id":"my-project-echo-skill","name":"EchoTool","description":"Echos the input given","tags":["echo","repeater"],"examples":["I will see this echoed back to me"],"inputModes":["text"],"outputModes":["text"]}]}
========= starting a new task ========

What do you want to send to the agent? (:q or quit to exit): hey
Select a file path to attach? (press enter to skip):

{"jsonrpc":"2.0","id":"eca7ecf4d6da4a65a4ff99ab0954b957","result":{"id":"62636e021ac0483bb31d40c1473796fa","sessionId":"438927e3540f459389f3d3cb216dd945","status":{"state":"completed","message":{"role":"agent","parts":[{"type":"text","text":"<think>\nOkay, the user just said \"hey\". That's a pretty open-ended greeting. I need to respond in a friendly and welcoming way. Maybe start with a greeting like \"Hi there!\" to keep it casual. Then, ask how I can assist them. Since they didn't specify a topic, I should keep the response general but inviting. Let me make sure the tone is positive and approachable. Also, check if there's any specific context I should consider, but since there's no prior conversation, it's safe to assume they just want to start a new interaction. Alright, time to put that together.\n</think>\n\nHi there! How can I assist you today??"}]},"timestamp":"2025-05-04T10:01:55.068049"},"artifacts":[{"parts":[{"type":"text","text":"<think>\nOkay, the user just said \"hey\". That's a pretty open-ended greeting. I need to respond in a friendly and welcoming way. Maybe start with a greeting like \"Hi there!\" to keep it casual. Then, ask how I can assist them. Since they didn't specify a topic, I should keep the response general but inviting. Let me make sure the tone is positive and approachable. Also, check if there's any specific context I should consider, but since there's no prior conversation, it's safe to assume they just want to start a new interaction. Alright, time to put that together.\n</think>\n\nHi there! How can I assist you today??"}],"index":0}],"history":[{"role":"user","parts":[{"type":"text","text":"hey"}]}]}}
========= starting a new task ========

What do you want to send to the agent? (:q or quit to exit):

Congratulations! You now have an A2A server that uses an AI model to generate responses!

The existing A2A implementations, including the official one, are all still fairly bare-bones. Here are some other references:

- a2ajava by @vishalmysore: A pure Java implementation of Google's A2A protocol for Spring Boot applications, featuring both client and server implementations
- legion-a2a by @TheRaLabs: A TypeScript implementation of the A2A protocol with a focus on modularity and extensibility
- trpc-a2a-go by @trpc-group: Go A2A implementation by the tRPC team featuring full client/server support, in-memory task management, streaming responses, session management, multiple auth methods (JWT, API Key, OAuth2), and comprehensive examples
- jira-a2a by @tuannvm: A DevOps workflow automation platform using the tRPC-A2A-Go framework, consisting of independent Go agents that communicate via A2A messages
- a2a-go by @a2aserver: A Go library for building A2A servers, with example implementations
- a2a-rs by @EmilLindfors: An idiomatic Rust implementation following hexagonal architecture principles
- a2a_min by @pcingola: A minimalistic Python SDK for A2A communication
- a2adotnet by @azixaka: A C#/.NET implementation of the A2A protocol
- nestjs-a2a by @thestupd: A module for integrating the A2A protocol into NestJS applications
- python-a2a by @themanojdesai: An easy-to-use Python library for implementing the A2A protocol
- Aira by @IhateCreatingUserNames2: An A2A network implementation for hosting, registering, discovering, and interacting with agents
- Cognisphere by @IhateCreatingUserNames2: An AI agent development framework built on Google's ADK, facilitating agent creation potentially for A2A networks
- a2a-server by @chrishayuk: A lightweight A2A python implementation
- a2a-cli by @chrishayuk: A command-line client for the A2A
- A2A Test Suit by @robert-at-pretension-io: A2A Test Suite
- Grasp by @adcentury: A self-hosted browser-using agent with built-in MCP and A2A support
- swissknife by @daltonnyx: A multi-agent chat application with MCP support, aiming to expose agents via the A2A protocol and connect to remote A2A agents as a client
- artinet-sdk by @the-artinet-project: A JS/TS SDK for the Agent2Agent Protocol with a focus on developer experience and comprehensive features

Some community examples:

- a2a-agent-coder by @sing1ee: A Coder Agent implementation with A2A Server and Client
- agentic-trading by @kweinmeister: A sample application demonstrating Google ADK and A2A interoperability for trading automation
- python-a2a-tutorial by @sing1ee: A comprehensive tutorial for implementing A2A in Python with practical examples






Welcome to 链载Ai (https://www.lianzai.com/) Powered by Discuz! X3.5