The Xpert chat API uses the handoff queue to run agent chat work outside the HTTP/SSE process. The API instance that accepts the HTTP request only keeps the client SSE stream open and subscribes to realtime results. A Bull handoff worker executes the actual XpertChatCommand and publishes stream events back through Redis Pub/Sub. This architecture is used by:
  • POST /api/xpert/:id/chat
  • POST /api/xpert/:name/chat-app
It replaces the previous process-local callback path for Xpert Chat SSE, where the request process had to wait on in-memory task and pending-result maps.

Goals

  • Support multiple API and worker instances without sticky sessions.
  • Keep the public HTTP API and normal SSE stream payloads unchanged.
  • Avoid the "Local task not found" and "Pending result timeout" errors that process-local state caused on Xpert Chat SSE requests.
  • Use Redis Pub/Sub only for live forwarding. The system does not replay missed chat stream events after disconnect.

Runtime Flow

```text
Browser
  |
  | POST /api/xpert/:id/chat or /api/xpert/:name/chat-app
  v
API instance
  |
  | 1. create runId: xpert-chat-<uuid>
  | 2. subscribe to Redis channel ai:handoff:agent-chat:<runId>
  | 3. after subscribe succeeds, enqueue agent.chat_dispatch.v1
  v
Bull handoff queue
  |
  | 4. any worker instance consumes the job
  v
AgentChatDispatchHandoffProcessor
  |
  | 5. execute XpertChatCommand(request, options)
  | 6. publish stream, complete, or error payloads to Redis Pub/Sub
  v
API instance
  |
  | 7. forward stream payloads as SSE MessageEvent
  | 8. complete or emit event: error, then close
  v
Browser
```
The API instance subscribes before enqueueing the job. This prevents a fast worker from publishing the first token before the SSE process is listening.
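The ordering requirement can be illustrated with a minimal sketch. It assumes a hypothetical in-memory broker, InMemoryPubSub, that mimics Redis Pub/Sub's fire-and-forget delivery (a message published to a channel with no subscribers is dropped), and a hypothetical enqueueFastWorker that publishes as soon as the job is enqueued. Neither name comes from the Xpert codebase.

```typescript
// Hypothetical in-memory stand-in for Redis Pub/Sub. Like Redis, a message
// published to a channel that has no subscribers is silently dropped.
type Listener = (payload: string) => void;

class InMemoryPubSub {
  private readonly channels = new Map<string, Set<Listener>>();

  subscribe(channel: string, listener: Listener): () => void {
    const listeners = this.channels.get(channel) ?? new Set<Listener>();
    this.channels.set(channel, listeners);
    listeners.add(listener);
    return () => {
      listeners.delete(listener);
    };
  }

  // Returns the number of receivers, as Redis PUBLISH does.
  publish(channel: string, payload: string): number {
    const listeners = this.channels.get(channel);
    listeners?.forEach((listener) => listener(payload));
    return listeners?.size ?? 0;
  }
}

const channelFor = (runId: string): string => `ai:handoff:agent-chat:${runId}`;

// Worst case: a worker that picks up the job and publishes before enqueue returns.
function enqueueFastWorker(bus: InMemoryPubSub, runId: string): void {
  bus.publish(channelFor(runId), "first-token");
}

function runChat(bus: InMemoryPubSub, runId: string, subscribeFirst: boolean): string[] {
  const received: string[] = [];
  const listen = () => bus.subscribe(channelFor(runId), (payload) => received.push(payload));

  if (subscribeFirst) {
    listen(); // step 2: subscribe
    enqueueFastWorker(bus, runId); // step 3: enqueue only after subscribe succeeded
  } else {
    enqueueFastWorker(bus, runId); // token is published while nobody is listening
    listen();
  }
  return received;
}

const bus = new InMemoryPubSub();
const ordered = runChat(bus, "xpert-chat-a", true); // ["first-token"]
const racy = runChat(bus, "xpert-chat-b", false); // [] because the token was dropped
```

Subscribing first costs nothing when the worker is slow, and it is the only order that is safe when the worker is fast.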

Main Components

| Component | Responsibility |
| --- | --- |
| XpertController | Receives chat requests, builds a serializable agent.chat_dispatch.v1 handoff message, and returns an SSE observable. |
| AgentChatRealtimeService | Owns the Redis Pub/Sub channel, subscribes before enqueueing, forwards realtime payloads to SSE, and cancels the run on client disconnect. |
| HandoffQueueService | Enqueues the handoff message into the Bull queue. |
| AgentChatDispatchHandoffProcessor | Runs XpertChatCommand in the worker process and publishes observable events back to Redis Pub/Sub. |
| StopHandoffMessageCommand | Cancels waiting or active handoff jobs across instances when the SSE client closes. |

Message Contract

The controller enqueues a handoff message with type agent.chat_dispatch.v1. The request and options must be serializable because the job can run in another process.
```typescript
{
  id: "xpert-chat-<uuid>",
  type: "agent.chat_dispatch.v1",
  tenantId,
  sessionKey,
  businessKey: sessionKey,
  traceId,
  payload: {
    request,
    options,
    callback: {
      transport: "redis-pubsub"
    },
    executionId
  },
  headers: {
    organizationId,
    userId,
    language,
    conversationId,
    source: "chat"
  }
}
```
runId is the same value as the handoff message id. The realtime Redis channel is:
ai:handoff:agent-chat:<runId>
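The contract can be sketched as a small builder plus a serializability check. This assumes a flat, hypothetical ChatDispatchInput and hypothetical helpers buildChatDispatchMessage, realtimeChannel, and findNonSerializable; the real controller may assemble the message differently. The check flags functions, undefined, non-finite numbers, and class instances such as Date, none of which survive a plain JSON trip to another process unchanged.

```typescript
// Hypothetical flat input; the real controller derives these values from the
// request body, the authenticated user, and the conversation.
interface ChatDispatchInput {
  uuid: string;
  tenantId: string;
  sessionKey: string;
  traceId: string;
  executionId: string;
  request: unknown;
  options: unknown;
  organizationId: string;
  userId: string;
  language: string;
  conversationId: string;
}

const realtimeChannel = (runId: string): string => `ai:handoff:agent-chat:${runId}`;

function buildChatDispatchMessage(input: ChatDispatchInput) {
  return {
    id: `xpert-chat-${input.uuid}`, // also the runId
    type: "agent.chat_dispatch.v1",
    tenantId: input.tenantId,
    sessionKey: input.sessionKey,
    businessKey: input.sessionKey,
    traceId: input.traceId,
    payload: {
      request: input.request,
      options: input.options,
      callback: { transport: "redis-pubsub" },
      executionId: input.executionId,
    },
    headers: {
      organizationId: input.organizationId,
      userId: input.userId,
      language: input.language,
      conversationId: input.conversationId,
      source: "chat",
    },
  };
}

// Returns the path of the first value that plain JSON cannot carry unchanged
// to another process, or null if everything is plain data.
function findNonSerializable(value: unknown, path = "$"): string | null {
  if (value === null || typeof value === "string" || typeof value === "boolean") return null;
  if (typeof value === "number") return Number.isFinite(value) ? null : path;
  if (Array.isArray(value)) {
    for (let i = 0; i < value.length; i++) {
      const bad = findNonSerializable(value[i], `${path}[${i}]`);
      if (bad !== null) return bad;
    }
    return null;
  }
  if (typeof value === "object") {
    const proto = Object.getPrototypeOf(value);
    if (proto !== Object.prototype && proto !== null) return path; // Date, Map, class instances
    for (const [key, child] of Object.entries(value)) {
      const bad = findNonSerializable(child, `${path}.${key}`);
      if (bad !== null) return bad;
    }
    return null;
  }
  return path; // undefined, function, symbol, bigint
}

// Placeholder values for illustration only.
const msg = buildChatDispatchMessage({
  uuid: "1234", tenantId: "t1", sessionKey: "s1", traceId: "tr1", executionId: "e1",
  request: { prompt: "Hello" }, options: {},
  organizationId: "o1", userId: "u1", language: "en", conversationId: "c1",
});
// realtimeChannel(msg.id) === "ai:handoff:agent-chat:xpert-chat-1234"
// findNonSerializable(msg) === null
const badPath = findNonSerializable({ request: { onToken: () => {} } }); // "$.request.onToken"
```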

Callback Transports

AgentChatCallbackTarget supports two callback transports:
```typescript
type AgentChatCallbackTarget =
  | {
      transport?: "handoff-message"
      messageType: string
      headers?: Record<string, string>
      context?: Record<string, unknown>
    }
  | {
      transport: "redis-pubsub"
      context?: Record<string, unknown>
    }
```
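Because transport is the discriminant, code that receives an AgentChatCallbackTarget can narrow it with a single comparison; a target that omits transport falls into the handoff-message variant, where messageType is guaranteed to exist. The helper describeCallbackRoute and the message type example.callback.v1 are hypothetical and only illustrate the narrowing.

```typescript
// Mirrors the AgentChatCallbackTarget union on this page.
type AgentChatCallbackTarget =
  | {
      transport?: "handoff-message";
      messageType: string;
      headers?: Record<string, string>;
      context?: Record<string, unknown>;
    }
  | {
      transport: "redis-pubsub";
      context?: Record<string, unknown>;
    };

// Hypothetical helper: describes where callbacks for a run would be delivered.
function describeCallbackRoute(target: AgentChatCallbackTarget, runId: string): string {
  if (target.transport === "redis-pubsub") {
    return `Redis channel ai:handoff:agent-chat:${runId}`;
  }
  // Narrowed to the handoff-message variant (explicit or omitted transport),
  // so messageType is available here.
  return `handoff queue message of type ${target.messageType}`;
}

const viaRedis = describeCallbackRoute({ transport: "redis-pubsub" }, "xpert-chat-1");
// "Redis channel ai:handoff:agent-chat:xpert-chat-1"
const viaQueue = describeCallbackRoute({ messageType: "example.callback.v1" }, "xpert-chat-1");
// "handoff queue message of type example.callback.v1"
```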
handoff-message is the compatibility mode, and it also applies when transport is omitted. It publishes callback messages through the handoff queue and therefore still requires messageType. redis-pubsub is the Xpert Chat SSE mode: it publishes realtime envelopes directly to the run's Redis channel:
```typescript
{
  kind: "stream" | "complete" | "error",
  sourceMessageId: runId,
  sequence,
  event,
  error,
  context
}
```
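On the worker side, publishing these envelopes might look roughly like the following. It is a sketch, not the code in agent-chat-dispatch.processor.ts: processChatDispatch and the ExecuteChat and Publish stand-ins are hypothetical, a synchronous iterable takes the place of the observable returned by XpertChatCommand, and the envelope field types are assumptions. The sequence counter increases by one per envelope within a run.

```typescript
// Envelope shape from this page; the field types are assumptions.
interface RealtimeEnvelope {
  kind: "stream" | "complete" | "error";
  sourceMessageId: string;
  sequence: number;
  event?: unknown;
  error?: string;
  context?: Record<string, unknown>;
}

// Hypothetical subset of the dispatch message that the worker needs.
interface ChatDispatchMessage {
  id: string;
  payload: {
    request: unknown;
    callback: { transport: "redis-pubsub"; context?: Record<string, unknown> };
  };
}

// Stand-ins: a synchronous iterable replaces the command's observable stream,
// and publish replaces a Redis PUBLISH of the JSON-encoded envelope.
type ExecuteChat = (request: unknown) => Iterable<unknown>;
type Publish = (channel: string, envelope: RealtimeEnvelope) => void;

function processChatDispatch(message: ChatDispatchMessage, execute: ExecuteChat, publish: Publish): void {
  const runId = message.id; // runId is the handoff message id
  const channel = `ai:handoff:agent-chat:${runId}`;
  const context = message.payload.callback.context;
  let sequence = 0;

  try {
    for (const event of execute(message.payload.request)) {
      publish(channel, { kind: "stream", sourceMessageId: runId, sequence: sequence++, event, context });
    }
    publish(channel, { kind: "complete", sourceMessageId: runId, sequence: sequence++, context });
  } catch (err) {
    const error = err instanceof Error ? err.message : String(err);
    publish(channel, { kind: "error", sourceMessageId: runId, sequence: sequence++, error, context });
  }
}

const message: ChatDispatchMessage = {
  id: "xpert-chat-1",
  payload: { request: { prompt: "Hello" }, callback: { transport: "redis-pubsub" } },
};

const ok: RealtimeEnvelope[] = [];
processChatDispatch(message, () => ["tok-1", "tok-2"], (_channel, e) => { ok.push(e); });
// ok: stream (0), stream (1), complete (2)

const failed: RealtimeEnvelope[] = [];
processChatDispatch(
  message,
  function* () {
    yield "tok-1";
    throw new Error("model unavailable");
  },
  (_channel, e) => { failed.push(e); },
);
// failed: stream (0), then error (1) carrying "model unavailable"
```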

SSE Semantics

  • stream: forwarded as the original MessageEvent payload.
  • complete: completes the SSE observable.
  • error: sends an SSE error event and then completes the observable.
  • Client close: triggers StopHandoffMessageCommand({ messageIds: [runId] }).
  • Keepalive behavior remains unchanged. The controller still applies keepAlive(30000) and takeUntilClose(res).
Redis Pub/Sub is a live transport only. If the browser disconnects, the chat run is canceled and the client must start a new request. There is no DB persistence or event replay for this stream.
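The API-side rules above can be sketched as a small bridge between the run's Redis channel and the SSE response. createRealtimeBridge, the SseSink interface, the stopRun callback, and the { message } shape of the error event are all hypothetical; in the real service, cancellation goes through StopHandoffMessageCommand.

```typescript
// Envelope fields used by the API side; types are assumptions.
interface RealtimeEnvelope {
  kind: "stream" | "complete" | "error";
  sourceMessageId: string;
  sequence: number;
  event?: unknown;
  error?: string;
}

// Hypothetical minimal view of the SSE response.
interface SseSink {
  send(data: unknown, eventName?: string): void;
  end(): void;
}

function createRealtimeBridge(runId: string, sink: SseSink, stopRun: (messageIds: string[]) => void) {
  let closed = false;
  const finish = (): void => {
    closed = true;
    sink.end();
  };

  return {
    // Called for each message received on ai:handoff:agent-chat:<runId>.
    onPayload(raw: string): void {
      if (closed) return; // payloads after completion or client close are dropped
      const envelope = JSON.parse(raw) as RealtimeEnvelope;
      if (envelope.sourceMessageId !== runId) return;
      switch (envelope.kind) {
        case "stream":
          sink.send(envelope.event); // original MessageEvent payload, unchanged
          break;
        case "complete":
          finish();
          break;
        case "error":
          sink.send({ message: envelope.error }, "error"); // SSE "event: error"
          finish();
          break;
      }
    },
    // Called when the browser disconnects before a terminal envelope arrived.
    onClientClose(): void {
      if (closed) return;
      closed = true;
      stopRun([runId]); // the real service uses StopHandoffMessageCommand({ messageIds: [runId] })
    },
  };
}

// Usage with a recording sink.
const log: string[] = [];
const sink: SseSink = {
  send: (data, eventName) => { log.push(`${eventName ?? "message"}:${JSON.stringify(data)}`); },
  end: () => { log.push("end"); },
};
const bridge = createRealtimeBridge("xpert-chat-1", sink, (ids) => { log.push(`stop:${ids.join(",")}`); });
bridge.onPayload(JSON.stringify({ kind: "stream", sourceMessageId: "xpert-chat-1", sequence: 0, event: { data: "Hi" } }));
bridge.onPayload(JSON.stringify({ kind: "error", sourceMessageId: "xpert-chat-1", sequence: 1, error: "boom" }));
bridge.onClientClose(); // no-op: the run already ended

const log2: string[] = [];
const bridge2 = createRealtimeBridge("xpert-chat-2", { send: () => {}, end: () => {} }, (ids) => { log2.push(`stop:${ids.join(",")}`); });
bridge2.onClientClose(); // cancels xpert-chat-2
```

Marking the bridge closed on the first terminal event makes a later client close a no-op, so a finished run is never canceled, and payloads that arrive after the client left are dropped rather than written to a closed response.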

Operational Notes

  • Redis is required for both Bull queues and realtime Pub/Sub.
  • API and worker instances can scale independently. The worker that executes XpertChatCommand does not need to be the same process that accepted the HTTP request.
  • If a request stalls, check whether the API instance subscribed to ai:handoff:agent-chat:<runId>, whether the handoff job was enqueued, and whether a worker published stream, complete, or error envelopes to that channel.
  • The "Local task not found" and "Pending result timeout" errors should not appear on this Xpert Chat SSE path. If they do, the request is probably still going through an older, process-local handoff path.
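The stall checklist above can be turned into a small debugging helper. The checkpoint names, the hint strings, and the functions diagnoseStalledRun and looksLikeLegacyPath are hypothetical; the sketch assumes you already know, per runId, which steps were observed (for example from logs).

```typescript
// Hypothetical checkpoints recorded per runId while debugging a stalled request.
type Checkpoint = "subscribed" | "enqueued" | "published";

const CHECK_ORDER: readonly Checkpoint[] = ["subscribed", "enqueued", "published"];

const HINTS: Record<Checkpoint, string> = {
  subscribed: "the API instance never subscribed to ai:handoff:agent-chat:<runId>",
  enqueued: "the agent.chat_dispatch.v1 handoff job was never enqueued",
  published: "no worker published a stream, complete, or error envelope",
};

// Returns a hint for the first checkpoint that was not observed, or null.
function diagnoseStalledRun(seen: ReadonlySet<Checkpoint>): string | null {
  for (const step of CHECK_ORDER) {
    if (!seen.has(step)) return `${step}: ${HINTS[step]}`;
  }
  return null;
}

// Errors that belong to the old process-local path, not to this SSE path.
const LEGACY_ERRORS = ["Local task not found", "Pending result timeout"];

function looksLikeLegacyPath(errorMessage: string): boolean {
  return LEGACY_ERRORS.some((text) => errorMessage.includes(text));
}

const hint = diagnoseStalledRun(new Set<Checkpoint>(["subscribed"]));
// "enqueued: the agent.chat_dispatch.v1 handoff job was never enqueued"
```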

Code Map

| File | Purpose |
| --- | --- |
| xpert/packages/server-ai/src/xpert/xpert.controller.ts | Builds the dispatch message and opens the realtime SSE stream. |
| xpert/packages/server-ai/src/handoff/agent-chat-realtime.service.ts | Redis Pub/Sub bridge for stream results and client disconnect cancellation. |
| xpert/packages/server-ai/src/handoff/plugins/agent-chat/agent-chat-dispatch.processor.ts | Worker-side execution and realtime callback publishing. |
| xpert/packages/plugin-sdk/src/lib/agent/handoff/agent-chat.contract.ts | Shared message and callback transport contract. |
| xpert/packages/server-ai/src/handoff/message-queue.module.ts | Registers and exports the realtime service. |