跳转到主要内容
托管队列(Managed Queues)让插件可以运行后台任务,但不再由插件自己持有 BullMQ 基础设施。 平台统一持有 Redis 连接、物理 BullMQ 队列、worker、重试、延迟调度、取消和租户上下文恢复。插件只负责入队一个逻辑 job,并声明 {pluginName, queueName, jobName} 对应的 handler。 适合使用托管队列的场景:
  • 出站消息和通知
  • 入站事件聚合或 debounce 窗口
  • 可重试的第三方 API 调用
  • 延迟执行的后续任务
  • 不应阻塞 HTTP callback 的长耗时插件任务
新的插件任务不应再使用插件本地的 BullModule.forRoot()BullModule.registerQueue()@Processor()WorkerHostInjectQueue

为什么插件不应自管 BullMQ

插件自建 BullMQ 在单进程里很方便,但在生产环境中容易出现问题:
  • 多个租户安装同一个插件时,队列名或 Redis key 可能冲突
  • worker 执行时没有 HTTP 请求,容易丢失租户和组织上下文
  • 每个插件各自创建 Redis 连接和重试行为
  • 取消、延迟、失败处理变成插件私有实现
  • 多 Pod 排查时没有统一的平台视图
托管队列把物理队列固定为平台队列:
xpert:managed-queue:plugin-jobs
插件身份和逻辑队列信息放在 job data envelope 中。

入队任务

从 plugin context 解析队列服务:
import {
  MANAGED_QUEUE_SERVICE_TOKEN,
  type ManagedQueueService
} from '@xpert-ai/plugin-sdk'

const managedQueue = ctx.resolve<ManagedQueueService>(MANAGED_QUEUE_SERVICE_TOKEN)
入队示例:
const result = await managedQueue.enqueue({
  pluginName: '@xpert-ai/plugin-community-wechat',
  queueName: 'wechat.outbound',
  jobName: 'send-message',
  payload: {
    integrationId,
    outboundLogId
  },
  tenantId,
  organizationId,
  scopeKey,
  jobId: `wechat:outbound:${outboundLogId}`,
  delayMs: 1_000,
  attempts: 3,
  backoffMs: {
    type: 'fixed',
    delay: 60_000
  },
  removeOnComplete: {
    age: 24 * 60 * 60,
    count: 5000
  }
})
如果用户后续需要取消、查看或重试任务,应把 result.jobId 保存到插件业务表。 取消 waiting 或 delayed job:
await managedQueue.cancel({ jobId: result.jobId })
如果 job 已经 active、completed 或 failed,平台会返回非成功结果,插件应结合自己的业务状态决定界面提示。

声明 Job Processor

使用 @PluginJobProcessor(),不要使用 BullMQ 的 @Processor()
import {
  PluginJobProcessor,
  type ManagedQueueJob
} from '@xpert-ai/plugin-sdk'

@PluginJobProcessor({
  pluginName: '@xpert-ai/plugin-community-wechat',
  queueName: 'wechat.outbound',
  jobName: 'send-message',
  concurrency: 1
})
@Injectable()
export class WechatOutboundJobHandler {
  async handle(job: ManagedQueueJob<WechatOutboundQueueJobData>) {
    await this.queueService.processSendTextJob(job)
  }
}
平台会在调用 handle() 之前,从 job envelope 恢复 tenant 和 organization context。 一个类可以处理多个逻辑 job:
@PluginJobProcessor({ pluginName, queueName: 'wechat.inbound', jobName: 'aggregate' })
@PluginJobProcessor({ pluginName, queueName: 'wechat.inbound', jobName: 'flush' })
@Injectable()
export class WechatInboundJobHandler {
  async handle(job: ManagedQueueJob<InboundJobData>) {
    if (job.name === 'aggregate') {
      return this.aggregate(job.data)
    }
    if (job.name === 'flush') {
      return this.flush(job.data)
    }
  }
}

Redis 状态和锁

如果插件需要限流状态、聚合状态或分布式锁,请通过 ManagedQueueService.getRedis() 使用平台 Redis:
const redis = await managedQueue.getRedis()
await redis.set(lockKey, '1', 'PX', 30_000, 'NX')
Redis key 必须带作用域。常见组成部分:
  • pluginName
  • tenantId
  • organizationId
  • integrationId
  • 账号 id 或外部会话 id
示例:
plugin_wechat:{tenantId}:{integrationId}:lock:outbound
plugin_wechat:{tenantId}:{organizationId}:inbound:{aggregateKey}
不要通过 BullMQ 的私有 client 访问 Redis。

重试和失败

当任务需要重试时,handler 直接抛错:
async handle(job: ManagedQueueJob<Payload>) {
  try {
    await this.send(job.data)
  } catch (error) {
    await this.recordFailure(job, error)
    throw error
  }
}
BullMQ 会根据入队时的 attemptsbackoffMs 重试。插件仍然要维护自己的业务记录,例如消息日志或 integration 健康状态。

并发

concurrency 限制当前 API 进程中同一个逻辑 handler 的本地并发数量。它不提供集群级串行保证。 跨 Pod 的顺序控制或限流应使用 Redis lock 或业务状态。例如出站消息插件在发送前仍应按 tenant 和 integration 加锁。

迁移已有插件

从插件自管 BullMQ 迁移到托管队列时:
  1. 删除插件包里的 BullMQ imports 和 module registration
  2. ManagedQueueService.enqueue() 替换 queue.add()
  3. ManagedQueueService.cancel() 替换直接删除 BullMQ job
  4. @PluginJobProcessor() handler 替换 @Processor() class
  5. 把 Redis 状态和锁迁移到平台 Redis
  6. 所有 key 都加上 tenant、organization 和 integration 作用域
  7. 确保 handler 不依赖当前 HTTP request
  8. 部署前排空旧物理队列
以 WeChat community 插件为例,新版本不会保留旧队列 consumer。生产切换前,运维人员必须先排空旧的 plugin_wechat BullMQ 队列。