Capítulo 01

¿Qué es BullMQ?

Sistema de colas de mensajes y tareas de alta performance para Node.js,
respaldado por Redis. Sucesor moderno de Bull, escrito en TypeScript.

BullMQ es una librería que permite encolar trabajos (jobs) para procesarlos de forma asíncrona, en segundo plano, con reintentos automáticos, prioridades, retardos y flujos complejos. Piensa en ella como una fábrica de trabajos: tú entregas el pedido (job), y la fábrica lo procesa en el momento adecuado.

Usa Redis como backend de persistencia, lo que le da velocidad, durabilidad y soporte para múltiples workers distribuidos en varios servidores.

Alta Performance
Miles de jobs/seg. Redis como motor. Latencia <1ms en LAN.
🔁
Reintentos Automáticos
Backoff exponencial. Configurable por tipo de error.
🌊
Flujos Complejos
Chains, pipelines, dependencias entre jobs con Flows API.
📊
Observabilidad
Bull Board UI. Métricas, logs, historial de fallos.
ℹ️
¿Bull o BullMQ? Bull (sin MQ) es la versión anterior, en mantenimiento pasivo. BullMQ es la versión moderna reescrita en TypeScript con mejor API, soporte para Flows, y mantenimiento activo. Usa siempre BullMQ para proyectos nuevos.

Capítulo 02

Conceptos Clave

Antes de escribir código, entiende los tres actores principales.

📋
Queue (Cola)
Canal nombrado donde se depositan los jobs. No procesa nada por sí sola; solo almacena. Puede tener múltiples productores y consumidores. Ejemplo: "cfdi-queue", "nomina-queue", "sync-queue".
⚙️
Worker (Trabajador)
Proceso que escucha la cola y ejecuta los jobs. Puede correr en el mismo proceso Node.js o en servidores separados. Varios workers pueden escuchar la misma cola para procesar en paralelo.
📄
Job (Trabajo)
Unidad mínima de trabajo. Contiene un nombre, datos (payload JSON) y opciones (reintentos, delay, prioridad). Tiene estados: waiting → active → completed | failed.

Ciclo de vida de un Job:

ADDED
queue.add()
WAITING
en cola Redis
🔄
ACTIVE
worker procesando
COMPLETED
job exitoso
⏱️
DELAYED
espera delay
⏸️
PAUSED
queue pausada
FAILED
max retries
☠️
DLQ
dead-letter
WAITING ACTIVE COMPLETED FAILED DELAYED PAUSED

Capítulo 03

Instalación y Setup

terminal BASH
# Instalar BullMQ
npm install bullmq

# También necesitas Redis corriendo
# Con Docker (desarrollo local):
docker run -d -p 6379:6379 redis:7-alpine

# O en tu servidor CEDIS / sucursal:
# sudo apt install redis-server
config/redis.js JS
// Conexión Redis compartida — usada por Queue y Worker
const connection = {
  host: process.env.REDIS_HOST || 'localhost',
  port: parseInt(process.env.REDIS_PORT) || 6379,
  password: process.env.REDIS_PASSWORD,
  // Para producción en AWS ElastiCache:
  tls: process.env.NODE_ENV === 'production' ? {} : undefined,
};

module.exports = { connection };

Capítulo 04

La Queue

La Queue es el punto de entrada. Aquí defines el canal y agregas jobs. Puede vivir en cualquier parte de tu app: un endpoint REST, un evento de Moleculer, un cron, etc.

queues/cfdi.queue.js JS
const { Queue } = require('bullmq');
const { connection } = require('../config/redis');

// Crear la cola — el nombre debe ser único y descriptivo
const cfdiQueue = new Queue('cfdi', {
  connection,
  defaultJobOptions: {
    attempts: 5,            // reintentos ante fallo
    backoff: {
      type: 'exponential',   // espera 2s, 4s, 8s…
      delay: 2000,
    },
    removeOnComplete: { count: 500 }, // guardar los últimos 500
    removeOnFail:     { count: 200 }, // guardar últimos 200 fallos
  },
});

// ─── Agregar un job de timbrado CFDI ───────────────────
async function encolarCFDI(venta) {
  const job = await cfdiQueue.add(
    'timbrar-venta',          // nombre del job
    {                          // payload (datos del job)
      ventaId:    venta.id,
      sucursalId: venta.sucursalId,
      total:      venta.total,
      receptor:   venta.clienteRFC,
      items:      venta.items,
    },
    {                          // opciones del job (override)
      priority:  10,           // menor número = más prioridad
      jobId:     `cfdi-${venta.id}`, // ID único evita duplicados
    }
  );

  console.log(`✅ Job encolado: ${job.id}`);
  return job.id;
}

module.exports = { cfdiQueue, encolarCFDI };
💡
jobId único previene duplicados Si se pierde la conexión y el cliente reintenta, usar jobId: 'cfdi-${ventaId}' garantiza que el mismo CFDI no se timbre dos veces. BullMQ ignora silenciosamente el job si ya existe con ese ID.

Capítulo 05

El Worker

El Worker es el procesador. Escucha la cola, toma un job a la vez (o N en paralelo) y ejecuta tu función. Si lanza un error, BullMQ reintenta automáticamente según la configuración.

workers/cfdi.worker.js JS
const { Worker } = require('bullmq');
const { connection } = require('../config/redis');
const { timbrarEnPAC } = require('../services/pac');

const worker = new Worker(
  'cfdi',             // debe coincidir con el nombre de la Queue

  async (job) => {   // processor function
    console.log(`📄 Procesando job ${job.id}: ${job.name}`);

    // Reportar progreso (visible en Bull Board)
    await job.updateProgress(10);

    // Acceder a los datos del job
    const { ventaId, items, receptor, total } = job.data;

    await job.updateProgress(30);

    // Tu lógica de negocio aquí
    const resultado = await timbrarEnPAC({
      ventaId, items, receptor, total,
    });

    await job.updateProgress(90);

    // Guardar UUID del CFDI en el job para referencia
    await job.updateData({ ...job.data, uuid: resultado.uuid });

    await job.updateProgress(100);

    // Lo que retornes se guarda en job.returnvalue
    return { uuid: resultado.uuid, xml: resultado.xml };
  },

  {
    connection,
    concurrency: 3,   // procesar 3 jobs en paralelo
    limiter: {
      max:      10,   // máximo 10 jobs por ventana
      duration: 1000, // ventana de 1 segundo (rate limit al PAC)
    },
  }
);

// Eventos del worker
worker.on('completed', (job) => {
  console.log(`✅ CFDI timbrado: ${job.returnvalue.uuid}`);
});

worker.on('failed', (job, err) => {
  console.error(`❌ Fallo intento ${job.attemptsMade}/${job.opts.attempts}: ${err.message}`);
});

console.log('🐂 Worker CFDI iniciado, escuchando cola...');
Job: timbrar-venta #cfdi-8842 ACTIVE
attemptsMade: 1 / 5 · worker-node-1 · latency: 42ms

Capítulo 06

Job Options

Cada job puede tener opciones específicas que anulan las defaults de la Queue.

OpciónTipoDefaultDescripción
attemptsnumber1Máximo de intentos ante fallo
backoffobject-Estrategia de reintento: fixed | exponential
delaynumber0Ms antes de pasar a WAITING (job diferido)
prioritynumber0Menor = más prioridad. 1 se procesa antes que 10
jobIdstringauto UUIDID único. Previene duplicados si ya existe
removeOnCompleteboolean/objectfalseLimpiar jobs completados. { count: N } guarda los últimos N
removeOnFailboolean/objectfalseLimpiar jobs fallidos. Igual que removeOnComplete
timeoutnumber-Ms máximos de ejecución antes de abortar
stackTraceLimitnumber10Líneas de stack trace guardadas en fallo
Ejemplos de opciones JS
// Job con delay (enviar reporte en 5 minutos)
await reportQueue.add('reporte-dia', data, {
  delay: 5 * 60 * 1000,   // 5 minutos en ms
});

// Job de alta prioridad (cierre de caja urgente)
await cfdiQueue.add('cancelar-cfdi', data, {
  priority: 1,             // máxima prioridad
  attempts: 10,
  backoff: { type: 'fixed', delay: 5000 },
});

// Job con timeout (no puede tardar más de 30s)
await syncQueue.add('sync-sucursal', data, {
  timeout: 30_000,
  attempts: 3,
  backoff: { type: 'exponential', delay: 10_000 },
});

Capítulo 07

Eventos y QueueEvents

BullMQ emite eventos en tiempo real. Puedes escucharlos en el Worker (sin overhead extra) o desde cualquier proceso usando QueueEvents (escucha vía Redis).

events/cfdi.events.js JS
const { QueueEvents } = require('bullmq');
const { connection } = require('../config/redis');

// QueueEvents permite escuchar desde cualquier proceso / servicio
const cfdiEvents = new QueueEvents('cfdi', { connection });

// ── Eventos principales ──────────────────────────────
cfdiEvents.on('waiting', ({ jobId }) => {
  console.log(`⏳ Encolado: ${jobId}`);
});

cfdiEvents.on('active', ({ jobId, prev }) => {
  console.log(`🔄 Procesando: ${jobId} (anterior: ${prev})`);
});

cfdiEvents.on('progress', ({ jobId, data }) => {
  console.log(`📊 Progreso ${jobId}: ${data}%`);
});

cfdiEvents.on('completed', ({ jobId, returnvalue }) => {
  // ← Aquí podría notificar al Moleculer Finance Service
  console.log(`✅ CFDI listo: ${jobId} · UUID: ${returnvalue}`);
});

cfdiEvents.on('failed', ({ jobId, failedReason }) => {
  console.error(`❌ Falló ${jobId}: ${failedReason}`);
  // ← Aquí podría mandar alerta a Postmark / Twilio
});

cfdiEvents.on('delayed', ({ jobId, delay }) => {
  console.log(`⏱ Retrasado ${jobId} por ${delay}ms`);
});

Capítulo 08

Jobs Repetitivos

BullMQ soporta jobs periódicos usando expresiones cron o intervalos en ms. Ideal para tareas de mantenimiento, reportes automáticos y sincronizaciones programadas.

jobs/scheduled.js JS
const { reportQueue, syncQueue, inventarioQueue } = require('./queues');

async function registrarJobsRepetitivos() {

  // ── Reporte de ventas diario a las 23:55 ─────────────
  await reportQueue.add(
    'reporte-ventas-diario',
    { tipo: 'ventas', destinatarios: ['gerencia@vlim.mx'] },
    {
      repeat: { pattern: '55 23 * * *' }, // cron: 23:55 cada día
      jobId: 'reporte-ventas-diario',     // evitar duplicados
    }
  );

  // ── Alerta de vigencias próximas: cada 6 horas ───────
  await inventarioQueue.add(
    'check-vigencias',
    { diasAlerta: 3 },
    {
      repeat: { pattern: '0 */6 * * *' }, // cada 6h
      jobId: 'check-vigencias',
    }
  );

  // ── Sync offline de sucursales: cada 5 minutos ───────
  await syncQueue.add(
    'sync-delta',
    { todas: true },
    {
      repeat: { every: 5 * 60 * 1000 }, // cada 5 min
      jobId: 'sync-delta',
    }
  );

  // ── Conciliación bancaria: lunes a viernes 9am ───────
  await reportQueue.add(
    'conciliacion-bancaria',
    {},
    {
      repeat: { pattern: '0 9 * * 1-5' }, // L-V 09:00
      jobId: 'conciliacion-bancaria',
    }
  );

  console.log('⏰ Jobs repetitivos registrados');
}

registrarJobsRepetitivos();
⚠️
Importante: registrar solo una vez Llamar a queue.add() con repeat desde múltiples instancias puede crear jobs duplicados. Registra los jobs repetitivos en un solo proceso al arrancar la app (o usa jobId único para deduplicar).

Capítulo 09

Flows — Cadenas de Jobs

FlowProducer permite crear árboles de jobs con dependencias. Un job padre no se completa hasta que todos sus hijos terminen. Perfecto para pipelines multi-paso.

flows/cierre-caja.flow.js JS
const { FlowProducer } = require('bullmq');
const { connection } = require('../config/redis');

const flow = new FlowProducer({ connection });

// Pipeline: Cierre de caja diario en sucursal
// El job padre (resumen-dia) espera a que todos los hijos terminen
await flow.add({
  name:  'resumen-dia',         // ← padre: ejecuta al final
  queueName: 'reportes',
  data: { sucursalId: 7, fecha: '2025-01-15' },

  children: [
    {
      name:      'timbrar-cfdis-pendientes', // hijo 1
      queueName: 'cfdi',
      data:      { sucursalId: 7, lote: 'cierre' },
    },
    {
      name:      'sync-inventario-final',     // hijo 2
      queueName: 'sync',
      data:      { sucursalId: 7, tipo: 'inventario' },
    },
    {
      name:      'calcular-mermas-dia',       // hijo 3 (tiene nieto)
      queueName: 'inventario',
      data:      { sucursalId: 7 },
      children: [
        {
          name:      'exportar-mermas-pdf',   // nieto
          queueName: 'reportes',
          data:      { sucursalId: 7, formato: 'pdf' },
        },
      ],
    },
  ],
});
🧾
timbrar-cfdis
queue: cfdi
🔄
sync-inventario
queue: sync
📉
calcular-mermas
queue: inventario
↓ todos completados ↓
📊
resumen-dia
queue: reportes · padre

Capítulo 10

Prioridades y Rate Limiting

Prioridades + Limiter JS
// ── Prioridades (menor número = mayor urgencia) ───────
await cfdiQueue.add('cancelacion-urgente', data, { priority: 1 });
await cfdiQueue.add('timbrar-venta',        data, { priority: 10 });
await cfdiQueue.add('cfdi-nomina',           data, { priority: 50 });
// → Se procesarán en orden: 1, 10, 50

// ── Rate Limiter: proteger API del PAC ─────────────
const workerConLimite = new Worker('cfdi', processor, {
  connection,
  concurrency: 5,
  limiter: {
    max:      20,     // max 20 jobs…
    duration: 1000,  // …por segundo
  },
  // Los jobs que excedan el límite esperan automáticamente
});

Capítulo 11

Dead-Letter Queue

Cuando un job agota todos sus reintentos, queda en estado failed. Con una Dead-Letter Queue puedes moverlos a una cola separada para inspección manual, alertas o reprocesamiento posterior.

workers/dlq.handler.js JS
const { Worker, Queue } = require('bullmq');
const { connection } = require('../config/redis');
const { enviarAlertaPostmark } = require('../services/postmark');

const dlqQueue = new Queue('cfdi-dlq', { connection });

// Worker principal: al agotar reintentos → mueve a DLQ
const cfdiWorker = new Worker('cfdi', processor, {
  connection,
  concurrency: 3,
});

cfdiWorker.on('failed', async (job, err) => {
  // ¿Agotó todos los reintentos?
  if (job.attemptsMade >= job.opts.attempts) {
    // Mover a Dead-Letter Queue con contexto de error
    await dlqQueue.add('job-muerto', {
      originalJob:    job.name,
      originalData:   job.data,
      failedReason:   err.message,
      attemptsMade:   job.attemptsMade,
      failedAt:       new Date().toISOString(),
    }, { jobId: `dlq-${job.id}` });

    // Alerta por Postmark a operaciones
    await enviarAlertaPostmark({
      to:      'operaciones@vlim.mx',
      subject: `❌ CFDI sin timbrar: venta ${job.data.ventaId}`,
      body:    `Error: ${err.message} · Sucursal: ${job.data.sucursalId}`,
    });
  }
});

// Worker que procesa la DLQ (para reproceso manual)
new Worker('cfdi-dlq', async (job) => {
  console.warn('☠️ Job en DLQ esperando revisión manual:', job.data);
  // Aquí: notificar, guardar en BD, o reprocesar con fix
}, { connection, concurrency: 1 });

Capítulo 12

Bull Board — Dashboard

Bull Board es una UI web para monitorear todas tus colas en tiempo real. Muestra jobs activos, fallidos, completados, permite reintentar jobs manualmente y ver el stack trace de errores.

dashboard/bull-board.js JS
const { createBullBoard } = require('@bull-board/api');
const { BullMQAdapter }  = require('@bull-board/api/bullMQAdapter');
const { ExpressAdapter }  = require('@bull-board/express');
const express            = require('express');

// npm install @bull-board/api @bull-board/express

const { cfdiQueue, syncQueue, reportQueue, nominaQueue } =
  require('./queues');

const serverAdapter = new ExpressAdapter();
serverAdapter.setBasePath('/admin/queues');

createBullBoard({
  queues: [
    new BullMQAdapter(cfdiQueue),
    new BullMQAdapter(syncQueue),
    new BullMQAdapter(reportQueue),
    new BullMQAdapter(nominaQueue),
  ],
  serverAdapter,
});

const app = express();
// ⚠️ Protege con autenticación en producción!
app.use('/admin/queues', serverAdapter.getRouter());
app.listen(3001, () => {
  console.log('📊 Bull Board: http://localhost:3001/admin/queues');
});
🔒
Protege Bull Board en producción Bull Board da acceso total a tus colas, incluyendo datos de jobs (pueden contener RFC, datos de nómina, etc.). Protégelo con middleware de autenticación (JWT, sesión) o restringe el acceso a la red interna VPN del CEDIS.

Capítulo 13

ERP Carnicerías — Caso Real

Mapa completo de colas usadas en el ERP, con sus workers y el rol de cada una en la operación del negocio.

queues/index.js — todas las colas del ERP JS
const { Queue } = require('bullmq');
const { connection } = require('../config/redis');

const defaults = {
  connection,
  defaultJobOptions: {
    removeOnComplete: { count: 1000 },
    removeOnFail:     { count: 500  },
  },
};

module.exports = {
  // ── Fiscales (alta prioridad) ──────────────────────
  cfdiQueue:       new Queue('cfdi',       defaults), // timbrado CFDI 4.0
  cartaPorteQueue: new Queue('carta-porte',defaults), // complemento traslado
  nominaQueue:     new Queue('nomina',     defaults), // CFDI nómina empleados

  // ── Inventario y calidad ───────────────────────────
  mermasQueue:     new Queue('mermas',     defaults), // cálculo y registro
  vigenciasQueue:  new Queue('vigencias',  defaults), // alertas caducidad
  stockQueue:      new Queue('stock',      defaults), // reposición automática

  // ── Sincronización offline ─────────────────────────
  syncQueue:       new Queue('sync',       defaults), // delta sync sucursales

  // ── Reportes y notificaciones ─────────────────────
  reportQueue:     new Queue('reportes',   defaults), // generación async
  notifQueue:      new Queue('notif',      defaults), // Postmark/SMS/Push

  // ── Dead-letter ────────────────────────────────────
  dlqQueue:        new Queue('dlq',        defaults), // jobs sin recuperar
};
🥩
Flujo completo: venta en sucursal Cajero cobra → POS emite evento sale.completed → Sales Service (Moleculer) → encola en cfdi (timbrar) + sync (delta a AWS) + stock (descontar inventario) → BullMQ procesa en paralelo → notif envía ticket por email vía Postmark.

Capítulo 14

Patrones y Buenas Prácticas

🆔
jobId idempotente
Siempre usa jobId derivado del ID de negocio. Si el job ya existe, BullMQ lo ignora. Previene doble timbrado en reconexiones.
🔀
Separar Queue por dominio
No uses una sola cola para todo. Separa por dominio (cfdi, sync, nomina) para poder escalar workers independientemente.
🛡️
Siempre DLQ
Nunca dejes jobs fallidos sin atención. Implementa DLQ + alerta Postmark para cualquier cola crítica del ERP.
📊
updateProgress()
Reporta progreso en jobs largos (nómina, reportes). Permite ver el avance en Bull Board y calcular ETAs.
🧪
Testea el procesador
La función procesadora es pure JS. Testéala con Jest sin necesidad de Redis. Mockea job.data y job.updateProgress.
⏱️
timeout en jobs largos
Pon siempre timeout en jobs con llamadas externas (PAC, bancos). Sin timeout, un job colgado bloquea un slot de concurrency.
Patrón: Graceful Shutdown PRODUCCIÓN
// Cierre limpio al detener el servidor (SIGTERM en ECS)
async function gracefulShutdown() {
  console.log('🛑 Cerrando workers...');

  // Espera a que los jobs activos terminen (max 30s)
  await Promise.all([
    cfdiWorker.close(),
    syncWorker.close(),
    reportWorker.close(),
  ]);

  console.log('✅ Workers cerrados limpiamente');
  process.exit(0);
}

process.on('SIGTERM', gracefulShutdown);
process.on('SIGINT',  gracefulShutdown);
📚
Recursos oficiales docs.bullmq.io — documentación completa · github.com/taskforcesh/bullmq — código fuente · github.com/felixmosh/bull-board — UI dashboard