# 核心机制解析 ## 1. 事件流与队列的协作 ```python # 思考阶段 - 实时输出 if agent_name in THINKING_AGENTS: output = handlers["thinking"].process_event(event) if output: yield output # 直接输出,不进队列 # 其他阶段 - 进入队列 elif agent_name in STREAM_OUTPUT_AGENTS: output = handlers["stream"].process_event(event) if output: output_queue_service.add_output(workflow_id, output) # 加入队列 ``` ## 2. 输出队列的控制逻辑 - 思考未结束时:队列中的内容被阻塞 - 思考结束后:`set_thinking_ended(True)` 释放队列 - 所有输出按顺序释放 ## 3. 并行处理架构 ```python # 两个并行任务 event_task = asyncio.create_task(run_events()) # 处理事件 queue_task = asyncio.create_task(run_queue()) # 消费队列 ``` ## 4. 运行效果 从输出可以看到: 1. 思考内容立即显示(实时) 2. 分析结果在思考结束后才显示(从队列释放) 3. 助手回答流式输出 4. 酒店卡片结构化展示 5. 推荐详情最后输出 这个 demo 展示了原项目的核心设计思想:通过事件驱动和队列机制,实现多 Agent 协同工作时的输出控制和顺序保证。 ___ ## 1. 用户输入处理 用户发送"帮我推荐北京的酒店",系统开始处理: ```python # workflow_service.py:289 event_processor = graph.astream_events({ "messages": history_messages, # 包含用户输入 "conversation_id": conversation_id, "workflow_id": workflow_id, ... }) ``` ## 2. 多 Agent 协作流程 ### 阶段1:思考阶段(Thinker Agent) ```python # workflow_service.py:127-133 if name in INTENTANALYSER_AGENTS: # thinker 在这个列表中 thinker_output = handlers['thinking'].process_event(event) if thinker_output: yield thinker_output # 直接输出思考内容 if kind == "on_chain_end": output_queue_service.set_end_thinking_state(workflow_id, True) ``` **事件流:** - `on_chain_start` → 输出"正在思考..." - `on_chat_model_stream` → 流式输出思考内容:"用户想要推荐北京的酒店,需要了解预算、入住时间等信息..." - `on_chain_end` → 思考结束,允许后续输出 ### 阶段2:意图识别(Travel Intent Sector) 由于 `travel_intent_sector` 在 `OUTPUT_UNSTREAM_AGENTS` 中,此时输出被加入队列,等待思考结束后才输出。 ```python # workflow_service.py:140 if name in OUTPUT_UNSTREAM_AGENTS: output = handlers['unstream'].process_event(event) if output: output_queue_service.add_output(workflow_id, output) # 加入队列 ``` ### 阶段3:酒店咨询(Hotel Consult Sector) `hotel_consult_sector` 在 `OUTPUT_STREAM_AGENTS` 中,会流式输出内容。 ```python # workflow_service.py:135 if name in OUTPUT_STREAM_AGENTS: output = handlers['stream'].process_event(event) if output: output_queue_service.add_output(workflow_id, output) ``` **流式输出内容:** - "我来为您推荐北京的酒店。" - "北京作为首都,酒店选择非常丰富..." - "请问您的预算范围是多少?" ### 阶段4:Agent 卡片输出 `hotel_consult_sector_by_list` 在 `AGENT_CARD_AGENTS` 中,输出酒店卡片信息,包含酒店列表、价格、位置等。 ```python # workflow_service.py:155 if name in AGENT_CARD_AGENTS: result = handlers['agent_card'].process_event(event) output = process_generator_output(result, output_queue_service, workflow_id) ``` ## 3. 输出队列机制 ### 并行处理架构 ```python # workflow_service.py:324-342 async def run_events(): async for event in event_gen: if event: await output_queue.put(('event', event)) async def run_queue(): async for output in queue_gen: if output: await output_queue.put(('output', output)) # 启动并行任务 event_task = asyncio.create_task(run_events()) queue_task = asyncio.create_task(run_queue()) ``` ### 队列消费逻辑 ```python # workflow_service.py:33-65 (consume_output_queue) while True: # 1. 检查思考是否结束 if output_queue_service.is_end_thinking(workflow_id): outputs = output_queue_service.get_outputs(workflow_id) if outputs: for output in outputs: yield output # 输出队列中的内容 # 2. 检查整体是否结束 if output_queue_service.is_end_output(workflow_id): # 处理剩余输出 while True: remaining_outputs = output_queue_service.get_outputs(workflow_id) if not remaining_outputs: break for output in remaining_outputs: yield output break ``` ## 4. 输出速度控制 不同类型的输出有不同的速度,模拟真实的打字效果。 ```python # workflow_service.py:356-358 event_type = item_data.get("event", "default") await asyncio.sleep(get_output_speed(event_type)) yield item_data ``` ## 5. 最终输出序列 用户看到的输出顺序: 1. **思考阶段(实时)**:"正在分析您的需求..." 2. **等待队列(思考结束后)**: - 意图识别结果 - 酒店推荐开始消息 3. **流式输出**:"我来为您推荐北京的酒店..." 4. **Agent 卡片**:酒店列表展示 5. **结束消息**:对话结束标记 --- > #### ⏺ Update Todos > - [x] 查找和分析 agent 配置文件,了解不同 agent 类型 > - [x] 追踪一个完整的事件流处理过程 > - [x] 分析输出队列的工作机制 > #### ⏺ 这个机制的核心优势在于: > - **解耦性**:各 agent 独立产生输出,通过事件机制解耦 > - **顺序保证**:通过队列机制确保输出顺序的正确性 > - **用户体验**:思考内容实时显示,其他内容有序输出 > - **扩展性**:易于添加新的 agent 类型和输出处理器