- [Controller 层](#Controller%20%E5%B1%82) - [`generate()`的执行时机](#%60generate()%60%E7%9A%84%E6%89%A7%E8%A1%8C%E6%97%B6%E6%9C%BA) - [Service 层](#Service%20%E5%B1%82) # Summary 1. [意图识别](意图识别.md),一层还是多层,如何 check,如何和入口做轻度绑定 # Cues [LangGraph](LangGraph.md) [[异步回调]] [[python异步生成器]] [[sse_starlette]] [流式处理](流式处理.md) [生产者-消费者模式](生产者-消费者模式) # Notes ## Controller 层 ```python @app.post("/chat/stream") async def chat_stream(request: ChatRequest): """SSE流式聊天接口""" try: # 检查API密钥 api_key = OPENROUTER_API_KEY or os.getenv("OPENROUTER_API_KEY") if not api_key: raise HTTPException( status_code=500, detail="请设置环境变量 OPENROUTER_API_KEY" ) # 生成会话ID session_id = request.session_id or str(uuid.uuid4()) # 处理消息并返回SSE流 async def generate(): try: async for event in workflow_service.process_message( request.message, session_id ): yield event except Exception as e: logger.error(f"处理消息时出错: {e}") yield f"data: {{'type': 'error', 'error': '{str(e)}'}}\n\n" return StreamingResponse( generate(), media_type="text/event-stream", headers={ "Cache-Control": "no-cache", "Connection": "keep-alive", "X-Accel-Buffering": "no" # 禁用Nginx缓冲 } ) except Exception as e: logger.error(f"聊天接口错误: {e}") raise HTTPException(status_code=500, detail=str(e))``` ```Java 时间轴: 客户端 服务器 │ │ │ ├─1─────►│ POST /chat/stream │ │ │ │ ├─2─────►│ │ 验证API密钥 │ │ │ 生成session_id │ │ │ 定义 generate() 函数 ← 只是定义,不执行 ├─3─────►│ │ 创建StreamingResponse │ │ │ 设置响应头 │ │ │ ├─4─────►│ 收到响应头: │ │ │ HTTP/1.1 200 OK │ │ │ Content-Type: text/event-stream │ │ Cache-Control: no-cache │ │ Connection: keep-alive │ │ X-Accel-Buffering: no │ │ │ ├─5─────►│ 建立SSE连接,等待数据 │ │ │ │ ├─6─────►│ │ 启动异步处理 │ │ │ workflow_service.process_message() │ │ │ ├─7─────►│ 收到数据: data: {...} │ yield event1 │ │ │ ├─8─────►│ 收到数据: data: {...} │ yield event2 │ │ │ ├─9─────►│ 收到数据: data: {...} │ yield event3 │ │ │ └─10────►│ 连接关闭 │ 处理完成 ``` 1. 客户端发送 POST /chat/stream 请求 ↓ 2. 服务器立即返回响应头 (HTTP 200 + 各种headers),触发[SSE协议](SSE协议) ↓ 3. 客户端收到响应头,建立SSE连接 ↓ 4. 服务器开始异步处理,通过 generate() 函数推送数据 ↓ 5. 客户端持续接收流式数据 ## `generate()`的执行时机 [[python异步生成器]] 1. async def generate(): - 只是定义函数,不执行 2. generate() - 在 StreamingResponse 中调用时才真正执行 ``` python # 如果立即执行,会阻塞响应头发送 async def chat_stream(request): # ❌ 错误方式:立即执行会阻塞 result = await workflow_service.process_message(...) # 阻塞! return StreamingResponse(result, ...) # 响应头延迟发送 # ✅ 正确方式:延迟执行 async def chat_stream(request): async def generate(): # 定义但不执行 # 这里的代码在响应头发送后才执行 async for event in workflow_service.process_message(...): yield event return StreamingResponse(generate(), ...) # 立即返回,响应头先发送 ``` # Service 层 [[生产者-消费者模式]] ## demo 版 ```python async def process_message( self, user_input: str, session_id: str ) -> AsyncGenerator[str, None]: """处理用户消息并返回SSE流""" # 创建输出队列 await self.output_queue_sevice.create_queue(session_id) # 准备输入状态 initial_state = { "user_input": user_input, "messages": [], "metadata": {} } # 创建配置 config = { "configurable": { "thread_id": session_id } } # 启动两个并行任务 event_task = asyncio.create_task( self._process_events(initial_state, config, session_id) ) output_task = asyncio.create_task( self._process_output_queue(session_id) ) # 流式输出结果 try: async for event in self._yield_events(session_id): yield event finally: # 确保任务完成 await event_task output_task.cancel() self.output_queue_service.cleanup(session_id) ``` 时间轴 ```Java # 时间轴 t0: 用户发送消息 "你好" t1: 启动 _process_events 任务(生产者) t2: 启动 _yield_events 循环(消费者) t3: _yield_events 调用 get(),队列为空,阻塞等待 t4: _process_events 产生事件,放入队列 t5: _yield_events 从队列获取到事件,推送给客户端 t6: _yield_events 继续调用 get(),队列为空,阻塞等待 t7: _process_events 产生更多事件,放入队列 t8: _yield_events 从队列获取到事件,推送给客户端 ... tN: _process_events 产生 END 事件,放入队列 tN+1: _yield_events 获取到 END 事件,退出循环 ``` ## 生产级版本 ```python async def run_agent_workflow( user_input_content: str, user_input_messages: list, debug: bool = False, conversation_id: str = "", message_id: str = "", route_id: str = "", extMap: Optional[Dict[str, Any]] = None, c: str = None, qunar_info: Optional[Dict[str, Any]] = None, ): """ Run the agent workflow with the given user input. Args: user_input_messages: The user request messages debug: If True, enables debug level logging Returns: The final state after the workflow completes """ if not user_input_messages: raise ValueError("Input could not be empty") logger.info(f"Starting workflow with user input: {user_input_messages}") if not conversation_id: conversation_id = str(uuid.uuid4()) if not message_id: message_id = str(uuid.uuid4()) # 生成工作流消息ID workflow_id = str(uuid.uuid4()).replace('-', '') output_queue_service = OutputQueueService() # 每个请求使用独立实例 skip_thinker_flag = skip_thinker(user_input_content, extMap) output_queue_service.init_workflow(workflow_id, skip_thinker_flag) # 初始化事件处理器 thinking_handler = ThinkingOutputStreamHandler(conversation_id, message_id, workflow_id, output_queue_service) stream_handler = OutputStreamHandler(conversation_id, message_id, workflow_id, output_queue_service) unstream_handler = OutputUnstreamHandler(conversation_id, message_id, workflow_id, output_queue_service) assistant_agent_card_handler = AssistantAgentCardHandler(conversation_id, message_id, workflow_id, output_queue_service) repeater_handler = RepeaterHandler(conversation_id, message_id, workflow_id, output_queue_service) drawer_chat_handler = DrawerChatHandler(conversation_id, message_id, workflow_id, output_queue_service) error_handler = ErrorHandler(conversation_id, message_id, workflow_id, output_queue_service) react_final_handler = ReActFinalStreamHandler(conversation_id, message_id, workflow_id, output_queue_service) try: # 获取历史消息 history_messages = get_history_messages(conversation_id) + user_input_messages parse_message_content(history_messages) transcript_context = query_transcript_context(conversation_id) logger.info("run_agent_workflow_save_message_start") # human_message_data human_message_data = { "conversationId": conversation_id, "messageId": message_id, "content": user_input_messages[0].get("content"), } # 存储当前用户消息 save_messages([ to_save_message_param(conversation_id, message_id, human_message_data, EventType.HUMAN_MESSAGE.value) ], qunar_info.get("userName")) logger.info("run_agent_workflow_save_message_end") start_end_event_data = {"conversationId": conversation_id, "messageId": workflow_id} common_param = json_utils.from_json(c) or {} user_location = http_request_service.get_location_info({ "latitude": common_param.get("lat", "") , "longitude": common_param.get("lgt", "") }) or {} country = user_location.get("country", "") city = user_location.get("cityWithoutSuffix", "") is_cn = "中国" == country QMonitor.record_one(f"ai_chat_user_country_is_cn_{is_cn}") user_location_tags = [f"user_country_{to_pinyin(country)}", f"user_city_{to_pinyin(city)}"] if is_cn and len(city) > 0: QMonitor.record_one(f"ai_chat_user_city_{to_pinyin(city)}") at_lt = parse_auto_test_langfuse_tag(conversation_id) # 准备事件处理器 handlers = { 'repeater': repeater_handler, 'thinking': thinking_handler, 'stream': stream_handler, 'unstream': unstream_handler, 'agent_card': assistant_agent_card_handler, 'drawer_chat': drawer_chat_handler, 'error_handler': error_handler, 'react_final': react_final_handler } event_processor = graph.astream_events( { # Runtime Variables "messages": history_messages, "conversation_id": conversation_id, "message_id": message_id, "workflow_id": workflow_id, "ext_map": extMap, "route_id": route_id, "debug": debug, "c": c, "common_param": common_param, "user_location": user_location, "qunar_info": qunar_info, "output_queue_service": output_queue_service, "skip_thinker": skip_thinker_flag, "transcript_context": transcript_context, # 用户当前发送的消息内容 "human_input_content" : user_input_messages[0].get("content"), } # 配置 langfuse 的回调 , config={ "callbacks": [get_langfuse_callback()] if is_langfuse_enabled() else [], "metadata": { "langfuse_session_id": conversation_id or "", "langfuse_user_id": (qunar_info or {}).get("userId", ""), "langfuse_tags": get_langfuse_tags() + ([at_lt] if at_lt else []) + user_location_tags, } } ) # 创建事件处理和队列消费生成器 event_gen = process_events(event_processor, handlers, output_queue_service, conversation_id, workflow_id, message_id) queue_gen = consume_output_queue(output_queue_service, workflow_id) # 创建输出队列用于实时收集结果 output_queue = asyncio.Queue() async def run_events(): try: async for event in event_gen: if event: await output_queue.put(('event', event)) finally: await output_queue.put(('done', 'events')) async def run_queue(): try: async for output in queue_gen: if output: await output_queue.put(('output', output)) finally: await output_queue.put(('done', 'queue')) # 启动两个并行任务 event_task = asyncio.create_task(run_events()) queue_task = asyncio.create_task(run_queue()) # 实时处理输出 done_count = 0 try: while done_count < 2: item_type, item_data = await output_queue.get() if item_type == 'done': done_count += 1 # 如果是事件处理完成,设置输出结束状态,让队列知道不会再有新输出 if item_data == 'events': output_queue_service.set_end_output_state(workflow_id, True) elif item_type in ['event', 'output']: # 控制输出速度 - 使用输出中的事件类型,如果没有则使用默认速度 event_type = item_data.get("event", "default") await asyncio.sleep(get_output_speed(event_type)) yield item_data finally: # 确保任务完成 if not event_task.done(): event_task.cancel() if not queue_task.done(): queue_task.cancel() await asyncio.gather(event_task, queue_task, return_exceptions=True) # 发送结束事件 output_queue_service.add_save_message( workflow_id, [to_save_message_param(conversation_id, workflow_id, start_end_event_data, EventType.CONVERSATION_ENDING.value)], qunar_info.get("userName") ) # 统一保存消息队列中的所有消息,确保顺序一致 save_message_requests = output_queue_service.get_save_messages(workflow_id) collect_and_save_messages(save_message_requests, qunar_info.get("userName", "")) # 当前对话所有消息处理完成,发送对话结束事件 # 处理reception节点的goto结果 _handle_reception_goto_result(output_queue_service, workflow_id, conversation_id, message_id) yield { "event": EventType.CONVERSATION_ENDING.value, "data": start_end_event_data } except asyncio.CancelledError: logger.error("Workflow cancelled at top level", exc_info=True) QMonitor.record_one("workflow_cancelled_at_top_level") raise finally: # 设置输出结束状态,确保SSE流能够正常关闭 output_queue_service.set_end_output_state(workflow_id, True) output_queue_service.clear_workflow(workflow_id) ``` ### 两个生产者 + 一个消费者 两个生产者 + 一个消费者 同时生产同时消费 时间轴: ```Java event_task queue_task 主循环(消费者) │ │ │ │ ├─0─────►│ 开始处理AI事件 │ │ ├─0─────►│ │ 开始处理输出队列 │ ├─0─────►│ │ │ 开始消费循环 │ │ │ │ ├─1─────►│ 产生思考事件 │ │ ├─1─────►│ 放入队列 │ │ ├─1─────►│ │ │ 获取到思考事件 ├─1─────►│ │ │ 推送给客户端 │ │ │ │ ├─2─────►│ 继续处理 │ 从输出队列获取数据 │ ├─2─────►│ │ 放入统一队列 │ ├─2─────►│ │ │ 获取到输出数据 ├─2─────►│ │ │ 推送给客户端 │ │ │ │ ├─3─────►│ 产生回复事件 │ 继续处理 │ ├─3─────►│ 放入队列 │ │ ├─3─────►│ │ │ 获取到回复事件 ├─3─────►│ │ │ 推送给客户端 │ │ │ │ └─4─────►│ 任务完成 │ 任务完成 │ 循环结束 ```