Xpert Chat API 使用 handoff 队列在 HTTP/SSE 进程之外执行智能体对话。HTTP 实例只负责保持客户端 SSE 连接、订阅实时结果通道并转发事件;真正的Documentation Index
Fetch the complete documentation index at: https://docs.xpertai.cn/llms.txt
Use this file to discover all available pages before exploring further.
XpertChatCommand 由 Bull handoff worker 执行,worker 再通过 Redis Pub/Sub 把流式结果发布回来。
该架构覆盖:
POST /api/xpert/:id/chatPOST /api/xpert/:name/chat-app
目标
- 支持多个 API 实例和 worker 实例,不依赖粘性会话。
- 保持 HTTP API 和正常 SSE 流式事件格式不变。
- 避免 Xpert Chat SSE 请求因为进程内状态导致
Local task not found和Pending result timeout。 - Redis Pub/Sub 只做实时转发;浏览器断线后不补历史事件。
运行链路
核心组件
| 组件 | 职责 |
|---|---|
XpertController | 接收 chat 请求,构造可序列化的 agent.chat_dispatch.v1 handoff message,并返回 SSE observable。 |
AgentChatRealtimeService | 管理 Redis Pub/Sub channel,保证先订阅后入队,将实时 payload 转成 SSE,并在客户端断开时取消 run。 |
HandoffQueueService | 将 handoff message 入 Bull 队列。 |
AgentChatDispatchHandoffProcessor | 在 worker 进程执行 XpertChatCommand,并把 observable 事件发布到 Redis Pub/Sub。 |
StopHandoffMessageCommand | SSE 客户端断开时,跨实例取消 waiting 或 active handoff job。 |
消息契约
controller 入队的 handoff message 类型为agent.chat_dispatch.v1。由于 job 可能在另一个进程执行,request 和 options 必须是可序列化数据。
runId 直接使用 handoff message 的 id。实时 Redis channel 约定为:
Callback Transport
AgentChatCallbackTarget 支持两类回传 transport:
handoff-message 是兼容模式,会继续通过 handoff 队列发布 callback message,因此仍需要 messageType。
redis-pubsub 是 Xpert Chat SSE 使用的模式,会直接向 Redis 发布实时 envelope:
SSE 语义
stream:原样转发为MessageEvent。complete:结束 SSE observable。error:发送 SSE error event,然后结束 observable。- 客户端断开:触发
StopHandoffMessageCommand({ messageIds: [runId] })。 - keepalive 行为保持不变,controller 仍然套用
keepAlive(30000)和takeUntilClose(res)。
运维与排障
- Redis 是 Bull 队列和实时 Pub/Sub 的必备依赖。
- API 实例和 worker 实例可以独立扩缩容;执行
XpertChatCommand的 worker 不需要和接收 HTTP 请求的进程相同。 - 如果请求卡住,优先检查 API 实例是否订阅了
ai:handoff:agent-chat:<runId>,handoff job 是否成功入队,以及 worker 是否发布了stream、complete或error。 Local task not found和Pending result timeout不应再出现在 Xpert Chat SSE 链路上。如果仍然出现,通常说明该请求还在走旧的本地 handoff 路径。
代码位置
| 文件 | 作用 |
|---|---|
xpert/packages/server-ai/src/xpert/xpert.controller.ts | 构造 dispatch message,并打开实时 SSE stream。 |
xpert/packages/server-ai/src/handoff/agent-chat-realtime.service.ts | Redis Pub/Sub 结果桥接和客户端断开取消。 |
xpert/packages/server-ai/src/handoff/plugins/agent-chat/agent-chat-dispatch.processor.ts | worker 侧执行和实时 callback 发布。 |
xpert/packages/plugin-sdk/src/lib/agent/handoff/agent-chat.contract.ts | 共享 message 与 callback transport 契约。 |
xpert/packages/server-ai/src/handoff/message-queue.module.ts | 注册并导出 realtime service。 |