消息传递机制
RPC 消息 vs 广播消息
📖 两种消息类型
AutoGen 支持两种消息传递模式:
| 特性 | RPC 消息 | 广播消息 |
|---|---|---|
| 方法 | send_message() | publish_message() |
| 响应 | 期待返回 | 不期待返回 |
| 模式 | 一对一 | 一对多 |
| 使用场景 | 请求 - 响应 | 通知、事件 |
1. RPC 消息
发送
python
response = await runtime.send_message(
message,
recipient=agent_id,
sender=sender_id, # 可选
cancellation_token=token # 可选
)处理
python
@message_handler(match=lambda _, ctx: ctx.is_rpc)
async def on_rpc_message(self, message: MessageType, ctx: MessageContext):
return response # 返回响应2. 广播消息(事件)
发送
python
await runtime.publish_message(
message,
topic_id=topic_id,
sender=sender_id # 可选
)处理
python
@message_handler(match=lambda _, ctx: not ctx.is_rpc)
async def on_broadcast_message(self, message: MessageType, ctx: MessageContext):
pass # 不返回响应3. TopicId 结构
python
topic_id = TopicId(
type="news.tech", # 主题类型
source="default" # 主题来源
)- type: 消息的逻辑分类
- source: 消息的来源标识
4. 代码对比
RPC 示例
python
# 发送
response = await runtime.send_message(
Greeting("你好"),
recipient=AgentId("hello", "default")
)
print(f"收到响应:{response.content}")
# 接收方
@message_handler
async def on_greeting(self, message: Greeting, ctx: MessageContext):
return Greeting("你好!") # 返回广播示例
python
# 发送
await runtime.publish_message(
News("AI 突破!"),
topic_id=TopicId("news.tech", "default")
)
# 没有响应
# 接收方
@message_handler
async def on_news(self, message: News, ctx: MessageContext):
print(f"收到新闻:{message.headline}")
# 不返回5. 判断消息类型
在处理器中可以通过 ctx.is_rpc 判断:
python
@message_handler
async def on_message(self, message: Msg, ctx: MessageContext):
if ctx.is_rpc:
# RPC 消息,需要返回响应
return Response(...)
else:
# 广播消息,不需要返回
print(f"收到事件:{message}")6. 最佳实践
使用 RPC 的场景
- 需要获取结果
- 一对一通信
- 同步调用
使用广播的场景
- 通知多个订阅者
- 事件驱动架构
- 不需要响应
📝 练习
参考 02-发布订阅 示例代码进行实践。