群聊模式
多 Agent 协作的三种方式
📖 概述
群聊(Group Chat)允许多个 Agent 共享上下文、协作完成任务。
| 模式 | 说明 | 适用场景 |
|---|---|---|
| [轮询](#RoundRobinGroupChat) | 按顺序发言 | 简单轮流讨论 |
| [选择器](#SelectorGroupChat) | 动态选择下一个 | 需要智能调度 |
| [群体](#SwarmGroupChat) | 基于 handoff | 复杂任务分解 |
1. RoundRobinGroupChat (轮询)
按固定顺序让每个 Agent 发言。
python
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_agentchat.conditions import MaxMessageTermination
# 创建 Agent
agent1 = AssistantAgent("analyst", model_client=model)
agent2 = AssistantAgent("writer", model_client=model)
agent3 = AssistantAgent("reviewer", model_client=model)
# 创建轮询团队
team = RoundRobinGroupChat(
[agent1, agent2, agent3],
max_turns=5, # 最多 5 轮
)
# 运行
result = await team.run(task="写一篇关于 AI 的文章")流程
轮次 1: analyst → writer → reviewer
轮次 2: analyst → writer → reviewer
...2. SelectorGroupChat (选择器)
使用模型动态选择下一个发言的 Agent。
python
from autogen_agentchat.teams import SelectorGroupChat
team = SelectorGroupChat(
[agent1, agent2, agent3],
model_client=model, # 用于选择下一个发言人
max_turns=5
)流程
轮次 1: analyst (模型选择) → writer (模型选择) → reviewer
轮次 2: analyst (根据上下文选择下一个) → ...3. SwarmGroupChat (群体)
基于 Handoff 的群体协作。
python
from autogen_agentchat.teams import SwarmGroupChat
from autogen_agentchat.base import Handoff
# 定义 handoff
handoff_to_writer = Handoff(target="writer", name="转交写作")
analyst = AssistantAgent("analyst", model_client=model, handoffs=[handoff_to_writer])
writer = AssistantAgent("writer", model_client=model)
team = SwarmGroupChat(
[analyst, writer],
model_client=model,
max_turns=5
)4. 终止条件
MaxMessageTermination
python
from autogen_agentchat.conditions import MaxMessageTermination
team = RoundRobinGroupChat(
[agent1, agent2],
termination_condition=MaxMessageTermination(max_messages=10)
)TextMentionTermination
python
from autogen_agentchat.conditions import TextMentionTermination
team = RoundRobinGroupChat(
[agent1, agent2],
termination_condition=TextMentionTermination("完成任务")
)组合条件
python
from autogen_agentchat.conditions import OrTermination
team = RoundRobinGroupChat(
[agent1, agent2],
termination_condition=OrTermination([
MaxMessageTermination(10),
TextMentionTermination("完成")
])
)5. 完整示例
写作团队
python
import asyncio
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_agentchat.ui import Console
from autogen_ext.models.openai import OpenAIChatCompletionClient
async def main():
model = OpenAIChatCompletionClient(model="gpt-4.1")
planner = AssistantAgent(
"planner",
model_client=model,
system_message="你是规划专家,负责制定写作大纲"
)
writer = AssistantAgent(
"writer",
model_client=model,
system_message="你是写作专家,负责根据大纲撰写文章"
)
reviewer = AssistantAgent(
"reviewer",
model_client=model,
system_message="你是审稿专家,负责审查文章质量"
)
team = RoundRobinGroupChat(
[planner, writer, reviewer],
max_turns=3
)
await Console(team.run_stream(task="写一篇关于量子计算的科普文章"))
asyncio.run(main())6. 消息流
用户任务
↓
┌─────────────────────────────────┐
│ GroupChat │
│ ┌─────────────────────────┐ │
│ │ Round 1 │ │
│ │ Agent A → Agent B → C │ │
│ └─────────────────────────┘ │
│ ┌─────────────────────────┐ │
│ │ Round 2 │ │
│ │ Agent A → Agent B → C │ │
│ └─────────────────────────┘ │
└─────────────────────────────────┘
↓
TaskResult (所有消息)7. 最佳实践
选择合适的模式
- 轮询: 简单的头脑风暴、轮流审查
- 选择器: 需要智能调度的复杂任务
- 群体: 任务需要明确分工和交接
设置合理的终止条件
python
# 避免无限循环
team = SelectorGroupChat(
[agent1, agent2],
max_turns=10, # 必须设置
termination_condition=TextMentionTermination("完成")
)📝 练习
- 创建一个三轮写作的 RoundRobinGroupChat
- 尝试使用 SelectorGroupChat 动态选择发言人
- 设置组合终止条件
🔗 相关链接
- [00-AgentChat 总览](00-AgentChat 总览)
- 01-AssistantAgent
- 03-终止条件