- [Controller 层](#Controller%20%E5%B1%82)
- [`generate()`的执行时机](#%60generate()%60%E7%9A%84%E6%89%A7%E8%A1%8C%E6%97%B6%E6%9C%BA)
- [Service 层](#Service%20%E5%B1%82)
# Summary
1. [意图识别](意图识别.md),一层还是多层,如何 check,如何和入口做轻度绑定
# Cues
[LangGraph](LangGraph.md)
[[异步回调]]
[[python异步生成器]]
[[sse_starlette]]
[流式处理](流式处理.md)
[生产者-消费者模式](生产者-消费者模式.md)
# Notes
## Controller 层
```python
@app.post("/chat/stream")
async def chat_stream(request: ChatRequest):
"""SSE流式聊天接口"""
try:
# 检查API密钥
api_key = OPENROUTER_API_KEY or os.getenv("OPENROUTER_API_KEY")
if not api_key:
raise HTTPException(
status_code=500,
detail="请设置环境变量 OPENROUTER_API_KEY"
)
# 生成会话ID
session_id = request.session_id or str(uuid.uuid4())
# 处理消息并返回SSE流
async def generate():
try:
async for event in workflow_service.process_message(
request.message,
session_id
):
yield event
except Exception as e:
logger.error(f"处理消息时出错: {e}")
yield f"data: {{'type': 'error', 'error': '{str(e)}'}}\n\n"
return StreamingResponse(
generate(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no" # 禁用Nginx缓冲
}
)
except Exception as e:
logger.error(f"聊天接口错误: {e}")
raise HTTPException(status_code=500, detail=str(e))```
```text
时间轴: 客户端 服务器
│ │ │
├─1─────►│ POST /chat/stream │
│ │ │
├─2─────►│ │ 验证API密钥
│ │ │ 生成session_id
│ │ │ 定义 generate() 函数 ← 只是定义,不执行
├─3─────►│ │ 创建StreamingResponse
│ │ │ 设置响应头
│ │ │
├─4─────►│ 收到响应头: │
│ │ HTTP/1.1 200 OK │
│ │ Content-Type: text/event-stream
│ │ Cache-Control: no-cache
│ │ Connection: keep-alive
│ │ X-Accel-Buffering: no
│ │ │
├─5─────►│ 建立SSE连接,等待数据 │
│ │ │
├─6─────►│ │ 启动异步处理
│ │ │ workflow_service.process_message()
│ │ │
├─7─────►│ 收到数据: data: {...} │ yield event1
│ │ │
├─8─────►│ 收到数据: data: {...} │ yield event2
│ │ │
├─9─────►│ 收到数据: data: {...} │ yield event3
│ │ │
└─10────►│ 连接关闭 │ 处理完成
```
1. 客户端发送 POST /chat/stream 请求
↓
2. 服务器立即返回响应头 (HTTP 200 + 各种headers),触发[SSE协议](SSE协议.md)
↓
3. 客户端收到响应头,建立SSE连接
↓
4. 服务器开始异步处理,通过 generate() 函数推送数据
↓
5. 客户端持续接收流式数据
## `generate()`的执行时机
[[python异步生成器]]
1. async def generate(): - 只是定义函数,不执行
2. generate() - 在 StreamingResponse 中调用时才真正执行
``` python
# If executed eagerly, sending the response headers is blocked
async def chat_stream(request):
    # ❌ Wrong: awaiting here blocks until processing finishes
    result = await workflow_service.process_message(...)  # blocks!
    return StreamingResponse(result, ...)  # response headers are delayed

# ✅ Right: defer execution
async def chat_stream(request):
    async def generate():  # only defined here, not executed
        # this code runs only after the response headers have been sent
        async for event in workflow_service.process_message(...):
            yield event
    return StreamingResponse(generate(), ...)  # returns at once; headers go out first
```
## Service 层
[[生产者-消费者模式]]
## demo 版
```python
async def process_message(
    self,
    user_input: str,
    session_id: str
) -> AsyncGenerator[str, None]:
    """Process one user message and yield SSE event strings.

    Producer/consumer wiring: `_process_events` (producer) and
    `_process_output_queue` run as parallel tasks; `_yield_events`
    (consumer) drains the per-session queue and streams to the caller.

    Args:
        user_input: Raw text of the user's message.
        session_id: Session key used for the queue and the graph thread.

    Yields:
        SSE-formatted event strings.
    """
    # BUG FIX: was `self.output_queue_sevice` (typo) — the cleanup path
    # below uses `output_queue_service`, so this call raised AttributeError.
    await self.output_queue_service.create_queue(session_id)

    # Initial graph state for this turn.
    initial_state = {
        "user_input": user_input,
        "messages": [],
        "metadata": {}
    }

    # LangGraph config: thread_id keys the checkpoint/session state.
    config = {
        "configurable": {
            "thread_id": session_id
        }
    }

    # Launch the two producer tasks in parallel.
    event_task = asyncio.create_task(
        self._process_events(initial_state, config, session_id)
    )
    output_task = asyncio.create_task(
        self._process_output_queue(session_id)
    )

    # Consume: stream queued events back to the caller as they arrive.
    try:
        async for event in self._yield_events(session_id):
            yield event
    finally:
        # Wait for the producer, stop the queue worker, drop the queue.
        await event_task
        output_task.cancel()
        self.output_queue_service.cleanup(session_id)
```
时间轴
```text
# 时间轴
t0: 用户发送消息 "你好"
t1: 启动 _process_events 任务(生产者)
t2: 启动 _yield_events 循环(消费者)
t3: _yield_events 调用 get(),队列为空,阻塞等待
t4: _process_events 产生事件,放入队列
t5: _yield_events 从队列获取到事件,推送给客户端
t6: _yield_events 继续调用 get(),队列为空,阻塞等待
t7: _process_events 产生更多事件,放入队列
t8: _yield_events 从队列获取到事件,推送给客户端
...
tN: _process_events 产生 END 事件,放入队列
tN+1: _yield_events 获取到 END 事件,退出循环
```
## 生产级版本
```python
async def run_agent_workflow(
    user_input_content: str,
    user_input_messages: list,
    debug: bool = False,
    conversation_id: str = "",
    message_id: str = "",
    route_id: str = "",
    extMap: Optional[Dict[str, Any]] = None,
    c: Optional[str] = None,
    qunar_info: Optional[Dict[str, Any]] = None,
):
    """
    Run the agent workflow with the given user input.

    Args:
        user_input_content: Raw text of the current user message (also fed
            to skip_thinker).
        user_input_messages: The user request messages; element 0 is assumed
            to be a dict with a "content" key — TODO confirm against callers.
        debug: If True, enables debug level logging.
        conversation_id: Existing conversation id; a uuid4 is minted if empty.
        message_id: Id of the current human message; minted if empty.
        route_id: Routing hint forwarded into the graph state.
        extMap: Optional extra parameters, also consulted by skip_thinker.
        c: JSON-encoded common client params ("lat"/"lgt" are read below).
        qunar_info: Optional user info dict; "userName"/"userId" are read
            from it.

    Yields:
        Event dicts for the SSE layer, ending with a CONVERSATION_ENDING
        event once all output has been flushed.
    """
    if not user_input_messages:
        raise ValueError("Input could not be empty")
    logger.info(f"Starting workflow with user input: {user_input_messages}")
    if not conversation_id:
        conversation_id = str(uuid.uuid4())
    if not message_id:
        message_id = str(uuid.uuid4())
    # Generate the workflow message id (uuid4 without dashes)
    workflow_id = str(uuid.uuid4()).replace('-', '')
    output_queue_service = OutputQueueService()  # dedicated instance per request
    skip_thinker_flag = skip_thinker(user_input_content, extMap)
    output_queue_service.init_workflow(workflow_id, skip_thinker_flag)
    # Initialise the event handlers — all share this request's queue service
    thinking_handler = ThinkingOutputStreamHandler(conversation_id, message_id, workflow_id, output_queue_service)
    stream_handler = OutputStreamHandler(conversation_id, message_id, workflow_id, output_queue_service)
    unstream_handler = OutputUnstreamHandler(conversation_id, message_id, workflow_id, output_queue_service)
    assistant_agent_card_handler = AssistantAgentCardHandler(conversation_id,
                                                            message_id,
                                                            workflow_id,
                                                            output_queue_service)
    repeater_handler = RepeaterHandler(conversation_id, message_id, workflow_id, output_queue_service)
    drawer_chat_handler = DrawerChatHandler(conversation_id, message_id, workflow_id, output_queue_service)
    error_handler = ErrorHandler(conversation_id, message_id, workflow_id, output_queue_service)
    react_final_handler = ReActFinalStreamHandler(conversation_id, message_id, workflow_id, output_queue_service)
    try:
        # Fetch conversation history and append the current user messages
        history_messages = get_history_messages(conversation_id) + user_input_messages
        parse_message_content(history_messages)
        transcript_context = query_transcript_context(conversation_id)
        logger.info("run_agent_workflow_save_message_start")
        # Payload persisted for the human message
        human_message_data = {
            "conversationId": conversation_id,
            "messageId": message_id,
            "content": user_input_messages[0].get("content"),
        }
        # Persist the current user message
        # NOTE(review): qunar_info.get(...) raises AttributeError when
        # qunar_info is None despite the Optional annotation — confirm
        # callers always pass it (same pattern again further below).
        save_messages([
            to_save_message_param(conversation_id, message_id, human_message_data, EventType.HUMAN_MESSAGE.value)
        ], qunar_info.get("userName"))
        logger.info("run_agent_workflow_save_message_end")
        start_end_event_data = {"conversationId": conversation_id, "messageId": workflow_id}
        common_param = json_utils.from_json(c) or {}
        user_location = http_request_service.get_location_info({
            "latitude": common_param.get("lat", "")
            , "longitude": common_param.get("lgt", "")
        }) or {}
        country = user_location.get("country", "")
        city = user_location.get("cityWithoutSuffix", "")
        is_cn = "中国" == country
        QMonitor.record_one(f"ai_chat_user_country_is_cn_{is_cn}")
        user_location_tags = [f"user_country_{to_pinyin(country)}", f"user_city_{to_pinyin(city)}"]
        if is_cn and len(city) > 0:
            QMonitor.record_one(f"ai_chat_user_city_{to_pinyin(city)}")
        at_lt = parse_auto_test_langfuse_tag(conversation_id)
        # Handler lookup table handed to process_events
        handlers = {
            'repeater': repeater_handler,
            'thinking': thinking_handler,
            'stream': stream_handler,
            'unstream': unstream_handler,
            'agent_card': assistant_agent_card_handler,
            'drawer_chat': drawer_chat_handler,
            'error_handler': error_handler,
            'react_final': react_final_handler
        }
        event_processor = graph.astream_events(
            {
                # Runtime Variables
                "messages": history_messages,
                "conversation_id": conversation_id,
                "message_id": message_id,
                "workflow_id": workflow_id,
                "ext_map": extMap,
                "route_id": route_id,
                "debug": debug,
                "c": c,
                "common_param": common_param,
                "user_location": user_location,
                "qunar_info": qunar_info,
                "output_queue_service": output_queue_service,
                "skip_thinker": skip_thinker_flag,
                "transcript_context": transcript_context,
                # Content of the message the user just sent
                "human_input_content" : user_input_messages[0].get("content"),
            }
            # Configure the Langfuse callback (only when enabled)
            , config={
                "callbacks": [get_langfuse_callback()] if is_langfuse_enabled() else [],
                "metadata": {
                    "langfuse_session_id": conversation_id or "",
                    "langfuse_user_id": (qunar_info or {}).get("userId", ""),
                    "langfuse_tags": get_langfuse_tags() + ([at_lt] if at_lt else []) + user_location_tags,
                }
            }
        )
        # Build the event-processing and queue-consuming generators
        event_gen = process_events(event_processor, handlers, output_queue_service, conversation_id, workflow_id, message_id)
        queue_gen = consume_output_queue(output_queue_service, workflow_id)
        # Unified queue merging both producers for real-time output
        output_queue = asyncio.Queue()

        async def run_events():
            # Producer 1: forward AI events into the unified queue.
            try:
                async for event in event_gen:
                    if event:
                        await output_queue.put(('event', event))
            finally:
                await output_queue.put(('done', 'events'))

        async def run_queue():
            # Producer 2: forward queue-service output into the unified queue.
            try:
                async for output in queue_gen:
                    if output:
                        await output_queue.put(('output', output))
            finally:
                await output_queue.put(('done', 'queue'))

        # Start the two producer tasks in parallel
        event_task = asyncio.create_task(run_events())
        queue_task = asyncio.create_task(run_queue())
        # Consume the merged output in real time
        done_count = 0
        try:
            while done_count < 2:
                item_type, item_data = await output_queue.get()
                if item_type == 'done':
                    done_count += 1
                    # Events finished: mark end-of-output so the queue
                    # consumer knows no new output will arrive
                    if item_data == 'events':
                        output_queue_service.set_end_output_state(workflow_id, True)
                elif item_type in ['event', 'output']:
                    # Throttle output — per-event-type speed, default otherwise
                    event_type = item_data.get("event", "default")
                    await asyncio.sleep(get_output_speed(event_type))
                    yield item_data
        finally:
            # Ensure both producer tasks are finished
            if not event_task.done():
                event_task.cancel()
            if not queue_task.done():
                queue_task.cancel()
            await asyncio.gather(event_task, queue_task, return_exceptions=True)
        # Enqueue the conversation-ending save message
        output_queue_service.add_save_message(
            workflow_id,
            [to_save_message_param(conversation_id, workflow_id, start_end_event_data,
                                   EventType.CONVERSATION_ENDING.value)],
            qunar_info.get("userName")
        )
        # Save every queued message in one batch to keep ordering consistent
        save_message_requests = output_queue_service.get_save_messages(workflow_id)
        collect_and_save_messages(save_message_requests, qunar_info.get("userName", ""))
        # All messages for this turn are processed; emit the ending event
        # Handle the reception node's goto result first
        _handle_reception_goto_result(output_queue_service, workflow_id, conversation_id, message_id)
        yield {
            "event": EventType.CONVERSATION_ENDING.value,
            "data": start_end_event_data
        }
    except asyncio.CancelledError:
        logger.error("Workflow cancelled at top level", exc_info=True)
        QMonitor.record_one("workflow_cancelled_at_top_level")
        raise
    finally:
        # Mark end-of-output so the SSE stream can close cleanly
        output_queue_service.set_end_output_state(workflow_id, True)
        output_queue_service.clear_workflow(workflow_id)
```
### 两个生产者 + 一个消费者
两个生产者 + 一个消费者 同时生产同时消费
时间轴:
```text
event_task queue_task 主循环(消费者)
│ │ │ │
├─0─────►│ 开始处理AI事件 │ │
├─0─────►│ │ 开始处理输出队列 │
├─0─────►│ │ │ 开始消费循环
│ │ │ │
├─1─────►│ 产生思考事件 │ │
├─1─────►│ 放入队列 │ │
├─1─────►│ │ │ 获取到思考事件
├─1─────►│ │ │ 推送给客户端
│ │ │ │
├─2─────►│ 继续处理 │ 从输出队列获取数据 │
├─2─────►│ │ 放入统一队列 │
├─2─────►│ │ │ 获取到输出数据
├─2─────►│ │ │ 推送给客户端
│ │ │ │
├─3─────►│ 产生回复事件 │ 继续处理 │
├─3─────►│ 放入队列 │ │
├─3─────►│ │ │ 获取到回复事件
├─3─────►│ │ │ 推送给客户端
│ │ │ │
└─4─────►│ 任务完成 │ 任务完成 │ 循环结束
```