¿Qué es BullMQ?
Sistema de colas de mensajes y tareas de alta performance para Node.js,
respaldado por Redis. Sucesor moderno de Bull, escrito en TypeScript.
BullMQ es una librería que permite encolar trabajos (jobs) para procesarlos de forma asíncrona, en segundo plano, con reintentos automáticos, prioridades, retardos y flujos complejos. Piensa en ella como una fábrica de trabajos: tú entregas el pedido (job), y la fábrica lo procesa en el momento adecuado.
Usa Redis como backend de persistencia, lo que le da velocidad, durabilidad y soporte para múltiples workers distribuidos en varios servidores.
Conceptos Clave
Antes de escribir código, entiende los tres actores principales.
Ciclo de vida de un Job:
Instalación y Setup
# Instalar BullMQ npm install bullmq # También necesitas Redis corriendo # Con Docker (desarrollo local): docker run -d -p 6379:6379 redis:7-alpine # O en tu servidor CEDIS / sucursal: # sudo apt install redis-server
// Conexión Redis compartida — usada por Queue y Worker const connection = { host: process.env.REDIS_HOST || 'localhost', port: parseInt(process.env.REDIS_PORT) || 6379, password: process.env.REDIS_PASSWORD, // Para producción en AWS ElastiCache: tls: process.env.NODE_ENV === 'production' ? {} : undefined, }; module.exports = { connection };
La Queue
La Queue es el punto de entrada. Aquí defines el canal y agregas jobs. Puede vivir en cualquier parte de tu app: un endpoint REST, un evento de Moleculer, un cron, etc.
const { Queue } = require('bullmq'); const { connection } = require('../config/redis'); // Crear la cola — el nombre debe ser único y descriptivo const cfdiQueue = new Queue('cfdi', { connection, defaultJobOptions: { attempts: 5, // reintentos ante fallo backoff: { type: 'exponential', // espera 2s, 4s, 8s… delay: 2000, }, removeOnComplete: { count: 500 }, // guardar los últimos 500 removeOnFail: { count: 200 }, // guardar últimos 200 fallos }, }); // ─── Agregar un job de timbrado CFDI ─────────────────── async function encolarCFDI(venta) { const job = await cfdiQueue.add( 'timbrar-venta', // nombre del job { // payload (datos del job) ventaId: venta.id, sucursalId: venta.sucursalId, total: venta.total, receptor: venta.clienteRFC, items: venta.items, }, { // opciones del job (override) priority: 10, // menor número = más prioridad jobId: `cfdi-${venta.id}`, // ID único evita duplicados } ); console.log(`✅ Job encolado: ${job.id}`); return job.id; } module.exports = { cfdiQueue, encolarCFDI };
jobId: 'cfdi-${ventaId}' garantiza que el mismo CFDI no se timbre dos veces. BullMQ ignora silenciosamente el job si ya existe con ese ID.
El Worker
El Worker es el procesador. Escucha la cola, toma un job a la vez (o N en paralelo) y ejecuta tu función. Si lanza un error, BullMQ reintenta automáticamente según la configuración.
const { Worker } = require('bullmq'); const { connection } = require('../config/redis'); const { timbrarEnPAC } = require('../services/pac'); const worker = new Worker( 'cfdi', // debe coincidir con el nombre de la Queue async (job) => { // processor function console.log(`📄 Procesando job ${job.id}: ${job.name}`); // Reportar progreso (visible en Bull Board) await job.updateProgress(10); // Acceder a los datos del job const { ventaId, items, receptor, total } = job.data; await job.updateProgress(30); // Tu lógica de negocio aquí const resultado = await timbrarEnPAC({ ventaId, items, receptor, total, }); await job.updateProgress(90); // Guardar UUID del CFDI en el job para referencia await job.updateData({ ...job.data, uuid: resultado.uuid }); await job.updateProgress(100); // Lo que retornes se guarda en job.returnvalue return { uuid: resultado.uuid, xml: resultado.xml }; }, { connection, concurrency: 3, // procesar 3 jobs en paralelo limiter: { max: 10, // máximo 10 jobs por ventana duration: 1000, // ventana de 1 segundo (rate limit al PAC) }, } ); // Eventos del worker worker.on('completed', (job) => { console.log(`✅ CFDI timbrado: ${job.returnvalue.uuid}`); }); worker.on('failed', (job, err) => { console.error(`❌ Fallo intento ${job.attemptsMade}/${job.opts.attempts}: ${err.message}`); }); console.log('🐂 Worker CFDI iniciado, escuchando cola...');
Job Options
Cada job puede tener opciones específicas que anulan las defaults de la Queue.
| Opción | Tipo | Default | Descripción |
|---|---|---|---|
| attempts | number | 1 | Máximo de intentos ante fallo |
| backoff | object | - | Estrategia de reintento: fixed | exponential |
| delay | number | 0 | Ms antes de pasar a WAITING (job diferido) |
| priority | number | 0 | Menor = más prioridad. 1 se procesa antes que 10 |
| jobId | string | auto UUID | ID único. Previene duplicados si ya existe |
| removeOnComplete | boolean/object | false | Limpiar jobs completados. { count: N } guarda los últimos N |
| removeOnFail | boolean/object | false | Limpiar jobs fallidos. Igual que removeOnComplete |
| timeout | number | - | Ms máximos de ejecución antes de abortar |
| stackTraceLimit | number | 10 | Líneas de stack trace guardadas en fallo |
// Job con delay (enviar reporte en 5 minutos) await reportQueue.add('reporte-dia', data, { delay: 5 * 60 * 1000, // 5 minutos en ms }); // Job de alta prioridad (cierre de caja urgente) await cfdiQueue.add('cancelar-cfdi', data, { priority: 1, // máxima prioridad attempts: 10, backoff: { type: 'fixed', delay: 5000 }, }); // Job con timeout (no puede tardar más de 30s) await syncQueue.add('sync-sucursal', data, { timeout: 30_000, attempts: 3, backoff: { type: 'exponential', delay: 10_000 }, });
Eventos y QueueEvents
BullMQ emite eventos en tiempo real. Puedes escucharlos en el Worker (sin overhead extra) o desde cualquier proceso usando QueueEvents (escucha vía Redis).
const { QueueEvents } = require('bullmq'); const { connection } = require('../config/redis'); // QueueEvents permite escuchar desde cualquier proceso / servicio const cfdiEvents = new QueueEvents('cfdi', { connection }); // ── Eventos principales ────────────────────────────── cfdiEvents.on('waiting', ({ jobId }) => { console.log(`⏳ Encolado: ${jobId}`); }); cfdiEvents.on('active', ({ jobId, prev }) => { console.log(`🔄 Procesando: ${jobId} (anterior: ${prev})`); }); cfdiEvents.on('progress', ({ jobId, data }) => { console.log(`📊 Progreso ${jobId}: ${data}%`); }); cfdiEvents.on('completed', ({ jobId, returnvalue }) => { // ← Aquí podría notificar al Moleculer Finance Service console.log(`✅ CFDI listo: ${jobId} · UUID: ${returnvalue}`); }); cfdiEvents.on('failed', ({ jobId, failedReason }) => { console.error(`❌ Falló ${jobId}: ${failedReason}`); // ← Aquí podría mandar alerta a Postmark / Twilio }); cfdiEvents.on('delayed', ({ jobId, delay }) => { console.log(`⏱ Retrasado ${jobId} por ${delay}ms`); });
Jobs Repetitivos
BullMQ soporta jobs periódicos usando expresiones cron o intervalos en ms. Ideal para tareas de mantenimiento, reportes automáticos y sincronizaciones programadas.
const { reportQueue, syncQueue, inventarioQueue } = require('./queues'); async function registrarJobsRepetitivos() { // ── Reporte de ventas diario a las 23:55 ───────────── await reportQueue.add( 'reporte-ventas-diario', { tipo: 'ventas', destinatarios: ['gerencia@vlim.mx'] }, { repeat: { pattern: '55 23 * * *' }, // cron: 23:55 cada día jobId: 'reporte-ventas-diario', // evitar duplicados } ); // ── Alerta de vigencias próximas: cada 6 horas ─────── await inventarioQueue.add( 'check-vigencias', { diasAlerta: 3 }, { repeat: { pattern: '0 */6 * * *' }, // cada 6h jobId: 'check-vigencias', } ); // ── Sync offline de sucursales: cada 5 minutos ─────── await syncQueue.add( 'sync-delta', { todas: true }, { repeat: { every: 5 * 60 * 1000 }, // cada 5 min jobId: 'sync-delta', } ); // ── Conciliación bancaria: lunes a viernes 9am ─────── await reportQueue.add( 'conciliacion-bancaria', {}, { repeat: { pattern: '0 9 * * 1-5' }, // L-V 09:00 jobId: 'conciliacion-bancaria', } ); console.log('⏰ Jobs repetitivos registrados'); } registrarJobsRepetitivos();
queue.add() con repeat desde múltiples instancias puede crear jobs duplicados. Registra los jobs repetitivos en un solo proceso al arrancar la app (o usa jobId único para deduplicar).
Flows — Cadenas de Jobs
FlowProducer permite crear árboles de jobs con dependencias. Un job padre no se completa hasta que todos sus hijos terminen. Perfecto para pipelines multi-paso.
const { FlowProducer } = require('bullmq'); const { connection } = require('../config/redis'); const flow = new FlowProducer({ connection }); // Pipeline: Cierre de caja diario en sucursal // El job padre (resumen-dia) espera a que todos los hijos terminen await flow.add({ name: 'resumen-dia', // ← padre: ejecuta al final queueName: 'reportes', data: { sucursalId: 7, fecha: '2025-01-15' }, children: [ { name: 'timbrar-cfdis-pendientes', // hijo 1 queueName: 'cfdi', data: { sucursalId: 7, lote: 'cierre' }, }, { name: 'sync-inventario-final', // hijo 2 queueName: 'sync', data: { sucursalId: 7, tipo: 'inventario' }, }, { name: 'calcular-mermas-dia', // hijo 3 (tiene nieto) queueName: 'inventario', data: { sucursalId: 7 }, children: [ { name: 'exportar-mermas-pdf', // nieto queueName: 'reportes', data: { sucursalId: 7, formato: 'pdf' }, }, ], }, ], });
Prioridades y Rate Limiting
// ── Prioridades (menor número = mayor urgencia) ─────── await cfdiQueue.add('cancelacion-urgente', data, { priority: 1 }); await cfdiQueue.add('timbrar-venta', data, { priority: 10 }); await cfdiQueue.add('cfdi-nomina', data, { priority: 50 }); // → Se procesarán en orden: 1, 10, 50 // ── Rate Limiter: proteger API del PAC ───────────── const workerConLimite = new Worker('cfdi', processor, { connection, concurrency: 5, limiter: { max: 20, // max 20 jobs… duration: 1000, // …por segundo }, // Los jobs que excedan el límite esperan automáticamente });
Dead-Letter Queue
Cuando un job agota todos sus reintentos, queda en estado failed. Con una Dead-Letter Queue puedes moverlos a una cola separada para inspección manual, alertas o reprocesamiento posterior.
const { Worker, Queue } = require('bullmq'); const { connection } = require('../config/redis'); const { enviarAlertaPostmark } = require('../services/postmark'); const dlqQueue = new Queue('cfdi-dlq', { connection }); // Worker principal: al agotar reintentos → mueve a DLQ const cfdiWorker = new Worker('cfdi', processor, { connection, concurrency: 3, }); cfdiWorker.on('failed', async (job, err) => { // ¿Agotó todos los reintentos? if (job.attemptsMade >= job.opts.attempts) { // Mover a Dead-Letter Queue con contexto de error await dlqQueue.add('job-muerto', { originalJob: job.name, originalData: job.data, failedReason: err.message, attemptsMade: job.attemptsMade, failedAt: new Date().toISOString(), }, { jobId: `dlq-${job.id}` }); // Alerta por Postmark a operaciones await enviarAlertaPostmark({ to: 'operaciones@vlim.mx', subject: `❌ CFDI sin timbrar: venta ${job.data.ventaId}`, body: `Error: ${err.message} · Sucursal: ${job.data.sucursalId}`, }); } }); // Worker que procesa la DLQ (para reproceso manual) new Worker('cfdi-dlq', async (job) => { console.warn('☠️ Job en DLQ esperando revisión manual:', job.data); // Aquí: notificar, guardar en BD, o reprocesar con fix }, { connection, concurrency: 1 });
Bull Board — Dashboard
Bull Board es una UI web para monitorear todas tus colas en tiempo real. Muestra jobs activos, fallidos, completados, permite reintentar jobs manualmente y ver el stack trace de errores.
const { createBullBoard } = require('@bull-board/api'); const { BullMQAdapter } = require('@bull-board/api/bullMQAdapter'); const { ExpressAdapter } = require('@bull-board/express'); const express = require('express'); // npm install @bull-board/api @bull-board/express const { cfdiQueue, syncQueue, reportQueue, nominaQueue } = require('./queues'); const serverAdapter = new ExpressAdapter(); serverAdapter.setBasePath('/admin/queues'); createBullBoard({ queues: [ new BullMQAdapter(cfdiQueue), new BullMQAdapter(syncQueue), new BullMQAdapter(reportQueue), new BullMQAdapter(nominaQueue), ], serverAdapter, }); const app = express(); // ⚠️ Protege con autenticación en producción! app.use('/admin/queues', serverAdapter.getRouter()); app.listen(3001, () => { console.log('📊 Bull Board: http://localhost:3001/admin/queues'); });
ERP Carnicerías — Caso Real
Mapa completo de colas usadas en el ERP, con sus workers y el rol de cada una en la operación del negocio.
const { Queue } = require('bullmq'); const { connection } = require('../config/redis'); const defaults = { connection, defaultJobOptions: { removeOnComplete: { count: 1000 }, removeOnFail: { count: 500 }, }, }; module.exports = { // ── Fiscales (alta prioridad) ────────────────────── cfdiQueue: new Queue('cfdi', defaults), // timbrado CFDI 4.0 cartaPorteQueue: new Queue('carta-porte',defaults), // complemento traslado nominaQueue: new Queue('nomina', defaults), // CFDI nómina empleados // ── Inventario y calidad ─────────────────────────── mermasQueue: new Queue('mermas', defaults), // cálculo y registro vigenciasQueue: new Queue('vigencias', defaults), // alertas caducidad stockQueue: new Queue('stock', defaults), // reposición automática // ── Sincronización offline ───────────────────────── syncQueue: new Queue('sync', defaults), // delta sync sucursales // ── Reportes y notificaciones ───────────────────── reportQueue: new Queue('reportes', defaults), // generación async notifQueue: new Queue('notif', defaults), // Postmark/SMS/Push // ── Dead-letter ──────────────────────────────────── dlqQueue: new Queue('dlq', defaults), // jobs sin recuperar };
sale.completed → Sales Service (Moleculer) → encola en cfdi (timbrar) + sync (delta a AWS) + stock (descontar inventario) → BullMQ procesa en paralelo → notif envía ticket por email vía Postmark.
Patrones y Buenas Prácticas
jobId derivado del ID de negocio. Si el job ya existe, BullMQ lo ignora. Previene doble timbrado en reconexiones.timeout en jobs con llamadas externas (PAC, bancos). Sin timeout, un job colgado bloquea un slot de concurrency.// Cierre limpio al detener el servidor (SIGTERM en ECS) async function gracefulShutdown() { console.log('🛑 Cerrando workers...'); // Espera a que los jobs activos terminen (max 30s) await Promise.all([ cfdiWorker.close(), syncWorker.close(), reportWorker.close(), ]); console.log('✅ Workers cerrados limpiamente'); process.exit(0); } process.on('SIGTERM', gracefulShutdown); process.on('SIGINT', gracefulShutdown);