The Xpert chat API uses the handoff queue to run agent chat work outside the HTTP/SSE process. The HTTP instance only keeps the client stream open and subscribes to realtime results. A Bull handoff worker executes the actualDocumentation Index
Fetch the complete documentation index at: https://docs.xpertai.cn/llms.txt
Use this file to discover all available pages before exploring further.
XpertChatCommand and publishes stream events back through Redis Pub/Sub.
This architecture is used by:
POST /api/xpert/:id/chatPOST /api/xpert/:name/chat-app
Goals
- Support multiple API and worker instances without sticky sessions.
- Keep the public HTTP API and normal SSE stream payloads unchanged.
- Avoid
Local task not foundandPending result timeouton Xpert Chat SSE requests caused by process-local state. - Use Redis Pub/Sub only for live forwarding. The system does not replay missed chat stream events after disconnect.
Runtime Flow
Main Components
| Component | Responsibility |
|---|---|
XpertController | Receives chat requests, builds a serializable agent.chat_dispatch.v1 handoff message, and returns an SSE observable. |
AgentChatRealtimeService | Owns the Redis Pub/Sub channel, subscribes before enqueue, forwards realtime payloads to SSE, and cancels the run on client disconnect. |
HandoffQueueService | Enqueues the handoff message into the Bull queue. |
AgentChatDispatchHandoffProcessor | Runs XpertChatCommand in the worker process and publishes observable events back to Redis Pub/Sub. |
StopHandoffMessageCommand | Cancels waiting or active handoff jobs across instances when the SSE client closes. |
Message Contract
The controller enqueues a handoff message with typeagent.chat_dispatch.v1. The request and options must be serializable because the job can run in another process.
runId is the same value as the handoff message id. The realtime Redis channel is:
Callback Transports
AgentChatCallbackTarget supports two callback transports:
handoff-message is the compatibility mode. It publishes callback messages through the handoff queue and still requires messageType.
redis-pubsub is the Xpert Chat SSE mode. It publishes realtime envelopes directly to Redis:
SSE Semantics
stream: forwarded as the originalMessageEventpayload.complete: completes the SSE observable.error: sends an SSE error event and then completes the observable.- Client close: triggers
StopHandoffMessageCommand({ messageIds: [runId] }). - Keepalive behavior remains unchanged. The controller still applies
keepAlive(30000)andtakeUntilClose(res).
Operational Notes
- Redis is required for both Bull queues and realtime Pub/Sub.
- API and worker instances can scale independently. The worker that executes
XpertChatCommanddoes not need to be the same process that accepted the HTTP request. - If a request stalls, check whether the API instance subscribed to
ai:handoff:agent-chat:<runId>, whether the handoff job was enqueued, and whether the worker publishedstream,complete, orerror. Local task not foundandPending result timeoutshould not appear on this Xpert Chat SSE path. If they do, the request is likely still using an older local handoff path.
Code Map
| File | Purpose |
|---|---|
xpert/packages/server-ai/src/xpert/xpert.controller.ts | Builds the dispatch message and opens the realtime SSE stream. |
xpert/packages/server-ai/src/handoff/agent-chat-realtime.service.ts | Redis Pub/Sub bridge for stream results and client disconnect cancellation. |
xpert/packages/server-ai/src/handoff/plugins/agent-chat/agent-chat-dispatch.processor.ts | Worker-side execution and realtime callback publishing. |
xpert/packages/plugin-sdk/src/lib/agent/handoff/agent-chat.contract.ts | Shared message and callback transport contract. |
xpert/packages/server-ai/src/handoff/message-queue.module.ts | Registers and exports the realtime service. |