{pluginName, queueName, jobName} 对应的 handler。
适合使用托管队列的场景:
- 出站消息和通知
- 入站事件聚合或 debounce 窗口
- 可重试的第三方 API 调用
- 延迟执行的后续任务
- 不应阻塞 HTTP callback 的长耗时插件任务
BullModule.forRoot()、BullModule.registerQueue()、@Processor()、WorkerHost 或 InjectQueue。
为什么插件不应自管 BullMQ
插件自建 BullMQ 在单进程里很方便,但在生产环境中容易出现问题:- 多个租户安装同一个插件时,队列名或 Redis key 可能冲突
- worker 执行时没有 HTTP 请求,容易丢失租户和组织上下文
- 每个插件各自创建 Redis 连接和重试行为
- 取消、延迟、失败处理变成插件私有实现
- 多 Pod 排查时没有统一的平台视图
入队任务
从 plugin context 解析队列服务:result.jobId 保存到插件业务表。
取消 waiting 或 delayed job:
声明 Job Processor
使用@PluginJobProcessor(),不要使用 BullMQ 的 @Processor():
handle() 之前,从 job envelope 恢复 tenant 和 organization context。
一个类可以处理多个逻辑 job:
Redis 状态和锁
如果插件需要限流状态、聚合状态或分布式锁,请通过ManagedQueueService.getRedis() 使用平台 Redis:
pluginNametenantIdorganizationIdintegrationId- 账号 id 或外部会话 id
重试和失败
当任务需要重试时,handler 直接抛错:attempts 和 backoffMs 重试。插件仍然要维护自己的业务记录,例如消息日志或 integration 健康状态。
并发
concurrency 限制当前 API 进程中同一个逻辑 handler 的本地并发数量。它不提供集群级串行保证。
跨 Pod 的顺序控制或限流应使用 Redis lock 或业务状态。例如出站消息插件在发送前仍应按 tenant 和 integration 加锁。
迁移已有插件
从插件自管 BullMQ 迁移到托管队列时:- 删除插件包里的 BullMQ imports 和 module registration
- 用
ManagedQueueService.enqueue()替换queue.add() - 用
ManagedQueueService.cancel()替换直接删除 BullMQ job - 用
@PluginJobProcessor()handler 替换@Processor()class - 把 Redis 状态和锁迁移到平台 Redis
- 所有 key 都加上 tenant、organization 和 integration 作用域
- 确保 handler 不依赖当前 HTTP request
- 部署前排空旧物理队列
plugin_wechat BullMQ 队列。