订阅和主题
Topic 是消息通道,Subscription 决定谁能接收
📖 核心概念
Topic (主题)
消息的逻辑通道,由 TopicId 标识。
python
topic_id = TopicId(type="news.tech", source="default")Subscription (订阅)
决定哪些 Agent 能接收哪些 Topic 的消息。
python
subscription = TypeSubscription(
topic_type="news.tech", # 订阅的主题类型
agent_type="tech_agent" # 接收的 Agent 类型
)1. Subscription 类型
TypeSubscription
订阅特定类型的 Topic。
python
# 订阅 news.tech 类型的所有消息
await runtime.add_subscription(
TypeSubscription("news.tech", "tech_agent")
)TypePrefixSubscription
订阅特定前缀的 Topic。
python
# 订阅所有以 "news." 开头的主题
await runtime.add_subscription(
TypePrefixSubscription("news.", "news_agent")
)DefaultSubscription
使用默认订阅配置。
python
@default_subscription
class MyAgent(RoutedAgent):
pass2. 添加订阅的时机
注册时自动添加
python
# 注册 Agent 时添加
await MyAgent.register(
runtime,
"my_agent",
lambda: MyAgent("desc"),
skip_class_subscriptions=False # 添加类定义的订阅
)手动添加
python
await runtime.add_subscription(
TypeSubscription("topic_type", "agent_type")
)3. 订阅模式
一对一
Agent A ---subscription---> Topic X一对多
Topic X ---subscription---> Agent A
---subscription---> Agent B
---subscription---> Agent C多对一
Topic X ---subscription--─┐
Topic Y ---subscription--─┼──> Agent A
Topic Z ---subscription--─┘4. 消息流示例
发布消息 接收消息
↓ ↑
┌─────────────┐ ┌──────────┐
│ publish_ │ ──────→ │ 订阅匹配 │
│ message() │ 消息 │ 规则检查 │
└─────────────┘ └──────────┘
↓
┌──────────┐
│ Agent 处理│
└──────────┘5. 代码示例
新闻订阅系统
python
from dataclasses import dataclass
from autogen_core import (
RoutedAgent, message_handler, MessageContext,
SingleThreadedAgentRuntime, TopicId,
TypeSubscription, AgentId
)
@dataclass
class News:
category: str
headline: str
class NewsSubscriber(RoutedAgent):
def __init__(self, categories: list[str]):
super().__init__("新闻订阅者")
self.categories = categories
self.news_received = []
@message_handler
async def on_news(self, message: News, ctx: MessageContext):
if message.category in self.categories:
self.news_received.append(message)
print(f"[{self.id}] 收到:{message.headline}")
async def main():
runtime = SingleThreadedAgentRuntime()
# 注册订阅者
await NewsSubscriber.register(
runtime,
"tech_subscriber",
lambda: NewsSubscriber(["tech", "ai"])
)
# 添加订阅
await runtime.add_subscription(
TypeSubscription("news.tech", "tech_subscriber")
)
runtime.start()
# 发布消息
await runtime.publish_message(
News("tech", "新科技发布"),
topic_id=TopicId("news.tech", "default")
)
await runtime.stop_when_idle()6. 关键点
Subscription 的作用域
- Subscription 是全局的,影响所有相同类型的 Agent
- 一个 Agent 可以有多个 Subscription
Topic 匹配规则
TypeSubscription: 精确匹配topic_typeTypePrefixSubscription: 前缀匹配
📝 练习
参考 02-发布订阅 示例代码进行实践。