# 核心机制解析
## 1. 事件流与队列的协作
```python
# 思考阶段 - 实时输出
if agent_name in THINKING_AGENTS:
output = handlers["thinking"].process_event(event)
if output:
yield output # 直接输出,不进队列
# 其他阶段 - 进入队列
elif agent_name in STREAM_OUTPUT_AGENTS:
output = handlers["stream"].process_event(event)
if output:
output_queue_service.add_output(workflow_id, output) # 加入队列
```
## 2. 输出队列的控制逻辑
- 思考未结束时:队列中的内容被阻塞
- 思考结束后:`set_thinking_ended(True)` 释放队列
- 所有输出按顺序释放
## 3. 并行处理架构
```python
# 两个并行任务
event_task = asyncio.create_task(run_events()) # 处理事件
queue_task = asyncio.create_task(run_queue()) # 消费队列
```
## 4. 运行效果
从输出可以看到:
1. 思考内容立即显示(实时)
2. 分析结果在思考结束后才显示(从队列释放)
3. 助手回答流式输出
4. 酒店卡片结构化展示
5. 推荐详情最后输出
这个 demo 展示了原项目的核心设计思想:通过事件驱动和队列机制,实现多 Agent 协同工作时的输出控制和顺序保证。
___
## 1. 用户输入处理
用户发送"帮我推荐北京的酒店",系统开始处理:
```python
# workflow_service.py:289
event_processor = graph.astream_events({
"messages": history_messages, # 包含用户输入
"conversation_id": conversation_id,
"workflow_id": workflow_id,
...
})
```
## 2. 多 Agent 协作流程
### 阶段1:思考阶段(Thinker Agent)
```python
# workflow_service.py:127-133
if name in INTENTANALYSER_AGENTS: # thinker 在这个列表中
thinker_output = handlers['thinking'].process_event(event)
if thinker_output:
yield thinker_output # 直接输出思考内容
if kind == "on_chain_end":
output_queue_service.set_end_thinking_state(workflow_id, True)
```
**事件流:**
- `on_chain_start` → 输出"正在思考..."
- `on_chat_model_stream` → 流式输出思考内容:"用户想要推荐北京的酒店,需要了解预算、入住时间等信息..."
- `on_chain_end` → 思考结束,允许后续输出
### 阶段2:意图识别(Travel Intent Sector)
由于 `travel_intent_sector` 在 `OUTPUT_UNSTREAM_AGENTS` 中,此时输出被加入队列,等待思考结束后才输出。
```python
# workflow_service.py:140
if name in OUTPUT_UNSTREAM_AGENTS:
output = handlers['unstream'].process_event(event)
if output:
output_queue_service.add_output(workflow_id, output) # 加入队列
```
### 阶段3:酒店咨询(Hotel Consult Sector)
`hotel_consult_sector` 在 `OUTPUT_STREAM_AGENTS` 中,会流式输出内容。
```python
# workflow_service.py:135
if name in OUTPUT_STREAM_AGENTS:
output = handlers['stream'].process_event(event)
if output:
output_queue_service.add_output(workflow_id, output)
```
**流式输出内容:**
- "我来为您推荐北京的酒店。"
- "北京作为首都,酒店选择非常丰富..."
- "请问您的预算范围是多少?"
### 阶段4:Agent 卡片输出
`hotel_consult_sector_by_list` 在 `AGENT_CARD_AGENTS` 中,输出酒店卡片信息,包含酒店列表、价格、位置等。
```python
# workflow_service.py:155
if name in AGENT_CARD_AGENTS:
result = handlers['agent_card'].process_event(event)
output = process_generator_output(result, output_queue_service, workflow_id)
```
## 3. 输出队列机制
### 并行处理架构
```python
# workflow_service.py:324-342
async def run_events():
async for event in event_gen:
if event:
await output_queue.put(('event', event))
async def run_queue():
async for output in queue_gen:
if output:
await output_queue.put(('output', output))
# 启动并行任务
event_task = asyncio.create_task(run_events())
queue_task = asyncio.create_task(run_queue())
```
### 队列消费逻辑
```python
# workflow_service.py:33-65 (consume_output_queue)
while True:
# 1. 检查思考是否结束
if output_queue_service.is_end_thinking(workflow_id):
outputs = output_queue_service.get_outputs(workflow_id)
if outputs:
for output in outputs:
yield output # 输出队列中的内容
# 2. 检查整体是否结束
if output_queue_service.is_end_output(workflow_id):
# 处理剩余输出
while True:
remaining_outputs = output_queue_service.get_outputs(workflow_id)
if not remaining_outputs:
break
for output in remaining_outputs:
yield output
break
```
## 4. 输出速度控制
不同类型的输出有不同的速度,模拟真实的打字效果。
```python
# workflow_service.py:356-358
event_type = item_data.get("event", "default")
await asyncio.sleep(get_output_speed(event_type))
yield item_data
```
## 5. 最终输出序列
用户看到的输出顺序:
1. **思考阶段(实时)**:"正在分析您的需求..."
2. **等待队列(思考结束后)**:
- 意图识别结果
- 酒店推荐开始消息
3. **流式输出**:"我来为您推荐北京的酒店..."
4. **Agent 卡片**:酒店列表展示
5. **结束消息**:对话结束标记
---
> #### ⏺ Update Todos
> - [x] 查找和分析 agent 配置文件,了解不同 agent 类型
> - [x] 追踪一个完整的事件流处理过程
> - [x] 分析输出队列的工作机制
> #### ⏺ 这个机制的核心优势在于:
> - **解耦性**:各 agent 独立产生输出,通过事件机制解耦
> - **顺序保证**:通过队列机制确保输出顺序的正确性
> - **用户体验**:思考内容实时显示,其他内容有序输出
> - **扩展性**:易于添加新的 agent 类型和输出处理器