> ## Documentation Index
> Fetch the complete documentation index at: https://docs.xpertai.cn/llms.txt
> Use this file to discover all available pages before exploring further.

# 托管队列

托管队列（Managed Queues）让插件可以运行后台任务，但不再由插件自己持有 BullMQ 基础设施。

平台统一持有 Redis 连接、物理 BullMQ 队列、worker、重试、延迟调度、取消和租户上下文恢复。插件只负责入队一个逻辑 job，并声明 `{pluginName, queueName, jobName}` 对应的 handler。

适合使用托管队列的场景：

* 出站消息和通知
* 入站事件聚合或 debounce 窗口
* 可重试的第三方 API 调用
* 延迟执行的后续任务
* 不应阻塞 HTTP callback 的长耗时插件任务

新的插件任务不应再使用插件本地的 `BullModule.forRoot()`、`BullModule.registerQueue()`、`@Processor()`、`WorkerHost` 或 `InjectQueue`。

## 为什么插件不应自管 BullMQ

插件自建 BullMQ 在单进程里很方便，但在生产环境中容易出现问题：

* 多个租户安装同一个插件时，队列名或 Redis key 可能冲突
* worker 执行时没有 HTTP 请求，容易丢失租户和组织上下文
* 每个插件各自创建 Redis 连接和重试行为
* 取消、延迟、失败处理变成插件私有实现
* 多 Pod 排查时没有统一的平台视图

托管队列把物理队列固定为平台队列：

```text theme={null}
xpert:managed-queue:plugin-jobs
```

插件身份和逻辑队列信息放在 job data envelope 中。

## 入队任务

从 plugin context 解析队列服务：

```ts theme={null}
import {
  MANAGED_QUEUE_SERVICE_TOKEN,
  type ManagedQueueService
} from '@xpert-ai/plugin-sdk'

const managedQueue = ctx.resolve<ManagedQueueService>(MANAGED_QUEUE_SERVICE_TOKEN)
```

入队示例：

```ts theme={null}
const result = await managedQueue.enqueue({
  pluginName: '@xpert-ai/plugin-community-wechat',
  queueName: 'wechat.outbound',
  jobName: 'send-message',
  payload: {
    integrationId,
    outboundLogId
  },
  tenantId,
  organizationId,
  scopeKey,
  jobId: `wechat:outbound:${outboundLogId}`,
  delayMs: 1_000,
  attempts: 3,
  backoffMs: {
    type: 'fixed',
    delay: 60_000
  },
  removeOnComplete: {
    age: 24 * 60 * 60,
    count: 5000
  }
})
```

如果用户后续需要取消、查看或重试任务，应把 `result.jobId` 保存到插件业务表。

取消 waiting 或 delayed job：

```ts theme={null}
await managedQueue.cancel({ jobId: result.jobId })
```

如果 job 已经 active、completed 或 failed，平台会返回非成功结果，插件应结合自己的业务状态决定界面提示。

## 声明 Job Processor

使用 `@PluginJobProcessor()`，不要使用 BullMQ 的 `@Processor()`：

```ts theme={null}
import {
  PluginJobProcessor,
  type ManagedQueueJob
} from '@xpert-ai/plugin-sdk'

@PluginJobProcessor({
  pluginName: '@xpert-ai/plugin-community-wechat',
  queueName: 'wechat.outbound',
  jobName: 'send-message',
  concurrency: 1
})
@Injectable()
export class WechatOutboundJobHandler {
  async handle(job: ManagedQueueJob<WechatOutboundQueueJobData>) {
    await this.queueService.processSendTextJob(job)
  }
}
```

平台会在调用 `handle()` 之前，从 job envelope 恢复 tenant 和 organization context。

一个类可以处理多个逻辑 job：

```ts theme={null}
@PluginJobProcessor({ pluginName, queueName: 'wechat.inbound', jobName: 'aggregate' })
@PluginJobProcessor({ pluginName, queueName: 'wechat.inbound', jobName: 'flush' })
@Injectable()
export class WechatInboundJobHandler {
  async handle(job: ManagedQueueJob<InboundJobData>) {
    if (job.name === 'aggregate') {
      return this.aggregate(job.data)
    }
    if (job.name === 'flush') {
      return this.flush(job.data)
    }
  }
}
```

## Redis 状态和锁

如果插件需要限流状态、聚合状态或分布式锁，请通过 `ManagedQueueService.getRedis()` 使用平台 Redis：

```ts theme={null}
const redis = await managedQueue.getRedis()
await redis.set(lockKey, '1', 'PX', 30_000, 'NX')
```

Redis key 必须带作用域。常见组成部分：

* `pluginName`
* `tenantId`
* `organizationId`
* `integrationId`
* 账号 id 或外部会话 id

示例：

```text theme={null}
plugin_wechat:{tenantId}:{integrationId}:lock:outbound
plugin_wechat:{tenantId}:{organizationId}:inbound:{aggregateKey}
```

不要通过 BullMQ 的私有 client 访问 Redis。

## 重试和失败

当任务需要重试时，handler 直接抛错：

```ts theme={null}
async handle(job: ManagedQueueJob<Payload>) {
  try {
    await this.send(job.data)
  } catch (error) {
    await this.recordFailure(job, error)
    throw error
  }
}
```

BullMQ 会根据入队时的 `attempts` 和 `backoffMs` 重试。插件仍然要维护自己的业务记录，例如消息日志或 integration 健康状态。

## 并发

`concurrency` 限制当前 API 进程中同一个逻辑 handler 的本地并发数量。它不提供集群级串行保证。

跨 Pod 的顺序控制或限流应使用 Redis lock 或业务状态。例如出站消息插件在发送前仍应按 tenant 和 integration 加锁。

## 迁移已有插件

从插件自管 BullMQ 迁移到托管队列时：

1. 删除插件包里的 BullMQ imports 和 module registration
2. 用 `ManagedQueueService.enqueue()` 替换 `queue.add()`
3. 用 `ManagedQueueService.cancel()` 替换直接删除 BullMQ job
4. 用 `@PluginJobProcessor()` handler 替换 `@Processor()` class
5. 把 Redis 状态和锁迁移到平台 Redis
6. 所有 key 都加上 tenant、organization 和 integration 作用域
7. 确保 handler 不依赖当前 HTTP request
8. 部署前排空旧物理队列

以 WeChat community 插件为例，新版本不会保留旧队列 consumer。生产切换前，运维人员必须先排空旧的 `plugin_wechat` BullMQ 队列。
