跳转到主要内容

Documentation Index

Fetch the complete documentation index at: https://docs.xpertai.cn/llms.txt

Use this file to discover all available pages before exploring further.

Xpert Chat API 使用 handoff 队列在 HTTP/SSE 进程之外执行智能体对话。HTTP 实例只负责保持客户端 SSE 连接、订阅实时结果通道并转发事件;真正的 XpertChatCommand 由 Bull handoff worker 执行,worker 再通过 Redis Pub/Sub 把流式结果发布回来。 该架构覆盖:
  • POST /api/xpert/:id/chat
  • POST /api/xpert/:name/chat-app
这条链路替换了 Xpert Chat SSE 以前依赖进程内 task map 和 pending-result map 的回传方式,避免多实例部署下结果回不到原请求进程。

目标

  • 支持多个 API 实例和 worker 实例,不依赖粘性会话。
  • 保持 HTTP API 和正常 SSE 流式事件格式不变。
  • 避免 Xpert Chat SSE 请求因为进程内状态导致 Local task not foundPending result timeout
  • Redis Pub/Sub 只做实时转发;浏览器断线后不补历史事件。

运行链路

Browser
  |
  | POST /api/xpert/:id/chat 或 /api/xpert/:name/chat-app
  v
API instance
  |
  | 1. 创建 runId: xpert-chat-<uuid>
  | 2. 订阅 Redis channel ai:handoff:agent-chat:<runId>
  | 3. 订阅成功后 enqueue agent.chat_dispatch.v1
  v
Bull handoff queue
  |
  | 4. 任意 worker 实例消费 job
  v
AgentChatDispatchHandoffProcessor
  |
  | 5. 执行 XpertChatCommand(request, options)
  | 6. 向 Redis Pub/Sub 发布 stream、complete 或 error
  v
API instance
  |
  | 7. 将 stream 原样转发为 SSE MessageEvent
  | 8. complete 结束;error 发出 event: error 后结束
  v
Browser
API 实例会先完成订阅,再把 handoff job 入队。这样可以避免 worker 执行太快,首个 token 在 SSE 进程开始监听前就被发布出去。

核心组件

组件职责
XpertController接收 chat 请求,构造可序列化的 agent.chat_dispatch.v1 handoff message,并返回 SSE observable。
AgentChatRealtimeService管理 Redis Pub/Sub channel,保证先订阅后入队,将实时 payload 转成 SSE,并在客户端断开时取消 run。
HandoffQueueService将 handoff message 入 Bull 队列。
AgentChatDispatchHandoffProcessor在 worker 进程执行 XpertChatCommand,并把 observable 事件发布到 Redis Pub/Sub。
StopHandoffMessageCommandSSE 客户端断开时,跨实例取消 waiting 或 active handoff job。

消息契约

controller 入队的 handoff message 类型为 agent.chat_dispatch.v1。由于 job 可能在另一个进程执行,requestoptions 必须是可序列化数据。
{
  id: "xpert-chat-<uuid>",
  type: "agent.chat_dispatch.v1",
  tenantId,
  sessionKey,
  businessKey: sessionKey,
  traceId,
  payload: {
    request,
    options,
    callback: {
      transport: "redis-pubsub"
    },
    executionId
  },
  headers: {
    organizationId,
    userId,
    language,
    conversationId,
    source: "chat"
  }
}
runId 直接使用 handoff message 的 id。实时 Redis channel 约定为:
ai:handoff:agent-chat:<runId>

Callback Transport

AgentChatCallbackTarget 支持两类回传 transport:
type AgentChatCallbackTarget =
  | {
      transport?: "handoff-message"
      messageType: string
      headers?: Record<string, string>
      context?: Record<string, unknown>
    }
  | {
      transport: "redis-pubsub"
      context?: Record<string, unknown>
    }
handoff-message 是兼容模式,会继续通过 handoff 队列发布 callback message,因此仍需要 messageType redis-pubsub 是 Xpert Chat SSE 使用的模式,会直接向 Redis 发布实时 envelope:
{
  kind: "stream" | "complete" | "error",
  sourceMessageId: runId,
  sequence,
  event,
  error,
  context
}

SSE 语义

  • stream:原样转发为 MessageEvent
  • complete:结束 SSE observable。
  • error:发送 SSE error event,然后结束 observable。
  • 客户端断开:触发 StopHandoffMessageCommand({ messageIds: [runId] })
  • keepalive 行为保持不变,controller 仍然套用 keepAlive(30000)takeUntilClose(res)
Redis Pub/Sub 只作为实时通道使用。如果浏览器断开,本次 chat run 会被取消,客户端需要重新发起请求;系统不会从数据库或 Redis 中补发历史流式事件。

运维与排障

  • Redis 是 Bull 队列和实时 Pub/Sub 的必备依赖。
  • API 实例和 worker 实例可以独立扩缩容;执行 XpertChatCommand 的 worker 不需要和接收 HTTP 请求的进程相同。
  • 如果请求卡住,优先检查 API 实例是否订阅了 ai:handoff:agent-chat:<runId>,handoff job 是否成功入队,以及 worker 是否发布了 streamcompleteerror
  • Local task not foundPending result timeout 不应再出现在 Xpert Chat SSE 链路上。如果仍然出现,通常说明该请求还在走旧的本地 handoff 路径。

代码位置

文件作用
xpert/packages/server-ai/src/xpert/xpert.controller.ts构造 dispatch message,并打开实时 SSE stream。
xpert/packages/server-ai/src/handoff/agent-chat-realtime.service.tsRedis Pub/Sub 结果桥接和客户端断开取消。
xpert/packages/server-ai/src/handoff/plugins/agent-chat/agent-chat-dispatch.processor.tsworker 侧执行和实时 callback 发布。
xpert/packages/plugin-sdk/src/lib/agent/handoff/agent-chat.contract.ts共享 message 与 callback transport 契约。
xpert/packages/server-ai/src/handoff/message-queue.module.ts注册并导出 realtime service。