¿Qué es Moleculer?
Framework de microservicios rápido, moderno y potente para Node.js.
Diseñado para construir sistemas distribuidos confiables sin burocracia.
Moleculer es un framework de microservicios que permite dividir una aplicación en servicios independientes que se comunican entre sí mediante mensajes. Cada servicio expone acciones (como endpoints) y emite/escucha eventos. El ServiceBroker es el orquestador central que conecta todo.
Su punto diferencial: viene con Service Discovery automático, balanceo de carga, circuit breaker, caché, reintentos y métricas incorporados. No necesitas configurar nada para empezar a tener un sistema resiliente.
broker.call('inventory.descuentoStock') — puede estar en el mismo proceso o en un servidor en Singapur. El código es idéntico; solo cambia el transporter.
ServiceBroker
El corazón de Moleculer. Es el contenedor que gestiona todos los servicios, el transporter de mensajes, el registro de nodos y la configuración global del sistema.
const { ServiceBroker } = require('moleculer'); const broker = new ServiceBroker({ // ── Identidad del nodo ───────────────────────────── nodeID: 'node-cedis-01', // único en el cluster namespace: 'erp-prod', // separa ambientes (prod/staging) // ── Transporter (cómo hablan los nodos entre sí) ── transporter: { type: 'Redis', options: { host: process.env.REDIS_HOST, port: 6379, password: process.env.REDIS_PASSWORD, }, }, // ── Caché (opcional, mejora latencia de actions) ── cacher: { type: 'Redis', options: { ttl: 60 }, // 60 segundos default }, // ── Circuit Breaker global ───────────────────────── circuitBreaker: { enabled: true, threshold: 0.5, // 50% de errores activa el corte minRequestCount: 10, windowTime: 60, // ventana de 60s halfOpenTime: 5000, }, // ── Reintentos globales ──────────────────────────── retryPolicy: { enabled: true, retries: 3, delay: 500, maxDelay: 2000, factor: 2, // backoff exponencial }, // ── Logging ──────────────────────────────────────── logger: { type: 'Console', options: { level: 'info', formatter: 'full' }, }, // ── Métricas (Prometheus) ────────────────────────── metrics: { enabled: true, reporter: { type: 'Prometheus', options: { port: 3030 } }, }, }); // Arrancar el broker y todos sus servicios broker.start() .then(() => console.log('✅ Broker activo — nodo:', broker.nodeID)); module.exports = broker;
Services — Unidad Básica
Un servicio es un módulo con nombre, que contiene acciones (llamables desde fuera), eventos (que emite o escucha) y lógica interna. Es el equivalente a un controlador + modelo en arquitecturas clásicas.
// Un servicio es un objeto plano que el broker registra module.exports = { name: 'inventario', // identificador único del servicio version: 2, // versiona acciones: v2.inventario.stock // ── Configuración y constantes del servicio ──────── settings: { stockMinimo: 5, alertaEmail: 'inventario@carniceria.mx', }, // ── Dependencias: espera a que estén disponibles ── dependencies: [ 'productos', 'notificaciones', ], // ── Acciones (expuestas al exterior) ──────────────── actions: { obtenerStock: { cache: { ttl: 30 }, // cachear 30s params: { productoId: 'string', // validación automática sucursalId: 'string', }, async handler(ctx) { return this.obtenerDelDB(ctx.params); }, }, descontarStock: { params: { productoId: 'string', cantidad: 'number|positive', sucursalId: 'string', ventaId: 'string', }, async handler(ctx) { const { productoId, cantidad, sucursalId } = ctx.params; const stock = await this.actualizarStock(productoId, -cantidad, sucursalId); // Emitir evento si el stock queda bajo mínimo if (stock.actual <= this.settings.stockMinimo) { ctx.emit('inventario.stockBajo', { productoId, sucursalId, stock: stock.actual }); } // Invalidar caché de obtenerStock await this.broker.cacher.clean(`inventario.obtenerStock:**`); return stock; }, }, }, // ── Eventos (que el servicio escucha) ─────────────── events: { 'ventas.completada': { async handler(ctx) { // Cuando se registra una venta → descontar stock for (const item of ctx.params.items) { await this.broker.call('inventario.descontarStock', item); } }, }, }, // ── Métodos privados del servicio ─────────────────── methods: { async actualizarStock(productoId, delta, sucursalId) { // lógica de DB aquí… }, async obtenerDelDB(params) { // query a PostgreSQL… }, }, // ── Hooks de ciclo de vida ────────────────────────── async started() { this.logger.info('🥩 Servicio inventario iniciado'); }, async stopped() { this.logger.info('🛑 Servicio inventario detenido'); }, };
Actions — Llamadas RPC
Las acciones son como endpoints de una API REST, pero invocables entre servicios. Soportan validación de parámetros, caché, streaming y metadatos de contexto.
// ── Llamada simple ──────────────────────────────────── const resultado = await broker.call('inventario.obtenerStock', { productoId: 'prod-001', sucursalId: 'suc-07', }); // ── Llamada con opciones ────────────────────────────── const stock = await broker.call('inventario.obtenerStock', params, { timeout: 5000, // ms antes de lanzar TimeoutError retries: 3, // override del retry global nodeID: 'node-suc-07',// forzar nodo específico meta: { userId: 'usr-1', traceId: 'abc123' }, // propagado en toda la cadena }); // ── Llamadas en paralelo (mcall) ────────────────────── const [producto, proveedor, historial] = await broker.mcall([ { action: 'productos.get', params: { id: 'p-1' } }, { action: 'proveedores.get', params: { id: 'pr-1' } }, { action: 'compras.historial', params: { productoId: 'p-1' } }, ]); // Ejecutan en paralelo → latencia = MAX(t1, t2, t3) // ── Llamada desde dentro de un servicio (ctx.call) ─── actions: { crearOrden: { async handler(ctx) { // ctx.call propaga metadatos y trace automáticamente const ok = await ctx.call('inventario.verificarStock', ctx.params); if (!ok) throw new MoleculerError('Stock insuficiente', 422); // ... }, }, }, // ── Validación automática de parámetros ─────────────── // Moleculer usa fastest-validator para las params definidas params: { nombre: 'string|min:2|max:100', precio: 'number|positive', cantidad: 'number|integer|positive', email: 'email|optional', tags: 'array|items:string|optional', } // Si los params no pasan validación → ValidationError automático
Events — Pub/Sub
Los eventos permiten comunicación desacoplada. Un servicio emite un evento; todos los servicios suscritos lo reciben. No hay respuesta (fire-and-forget), pero son ideales para notificar cambios de estado.
// ── Emitir eventos ──────────────────────────────────── // Desde un handler de acción (ctx.emit → mismo namespace) async handler(ctx) { const venta = await this.guardarVenta(ctx.params); // emit: todos los servicios suscritos reciben el evento ctx.emit('ventas.completada', { venta, sucursalId: ctx.params.sucursalId }); // broadcast: todos los NODOS reciben el evento (no solo un worker) ctx.broadcast('sistema.ventaRegistrada', { ventaId: venta.id }); return venta; }, // Desde el broker (fuera de un servicio) broker.emit('inventario.stockActualizado', { productoId, nuevo: 42 }); // ── Escuchar eventos (en otro servicio) ─────────────── events: { // Escucha simple 'ventas.completada'(ctx) { this.logger.info('📦 Venta recibida, actualizando stock...'); return this.actualizarInventario(ctx.params.venta); }, // Con opciones avanzadas 'inventario.stockBajo': { group: 'reposicion', // con group: solo UNA instancia del servicio recibe async handler(ctx) { const { productoId, sucursalId, stock } = ctx.params; await ctx.call('notificaciones.alertarReposicion', { productoId, sucursalId, stock, mensaje: `⚠️ Stock bajo: ${stock} unidades en ${sucursalId}`, }); }, }, },
ctx.emit() — solo una instancia del grupo recibe el evento (load-balanced entre instancias del mismo servicio). Ideal para tareas. ctx.broadcast() — TODOS los nodos que tienen el servicio reciben el evento. Ideal para invalidar cachés locales o sincronizar estado entre nodos.
Transporter — Redis
El Transporter es la capa de mensajería que conecta múltiples nodos en un cluster distribuido. Con Redis Transporter, los servicios en diferentes servidores se comunican transparentemente.
// En CEDIS (nodo central) const brokerCEDIS = new ServiceBroker({ nodeID: 'cedis', transporter: 'redis://user:pass@elasticache.aws.com:6379', // Con TLS para AWS ElastiCache: transporter: { type: 'Redis', options: { host: 'my-cluster.cache.amazonaws.com', port: 6380, tls: {}, // TLS habilitado password: process.env.REDIS_AUTH, }, }, }); // En Sucursal #7 (nodo remoto, distinto servidor) const brokerSuc7 = new ServiceBroker({ nodeID: 'sucursal-07', transporter: 'redis://...', // mismo Redis = mismo cluster }); // ← Ahora brokerCEDIS.call('ventas.registrar') puede llamar // ← al servicio ventas que corre en sucursal-07 de forma transparente. // ← El broker resuelve la ubicación automáticamente. // ── Qué usa cada canal Redis ────────────────────────── // MOL-erp-prod.DISCOVER → heartbeat y descubrimiento // MOL-erp-prod.INFO → info del nodo (servicios disponibles) // MOL-erp-prod.REQ.xxx → requests (acciones) // MOL-erp-prod.RES.xxx → responses // MOL-erp-prod.EVENT.xxx → eventos // MOL-erp-prod.DISCONNECT → notificación de desconexión
Lifecycle Hooks
module.exports = { name: 'ventas', // ── created: servicio instanciado pero no iniciado ── created() { this.db = new PostgresPool(config); this.cache = new Map(); }, // ── started: broker conectado, Redis disponible ───── async started() { await this.db.connect(); this.logger.info('💼 Servicio Ventas iniciado'); // Registrar jobs repetitivos en BullMQ al arrancar await registrarJobsRepetitivos(); }, // ── stopped: SIGTERM, broker.stop(), etc. ────────── async stopped() { await this.db.end(); // cerrar conexiones await this.worker.close(); // cerrar BullMQ worker this.logger.info('🛑 Servicio Ventas detenido limpiamente'); }, };
Mixins — Reutilización
Los mixins permiten compartir acciones, eventos, settings y métodos entre múltiples servicios. Son como clases base que se mezclan con el servicio hijo.
// ── Definir un mixin reutilizable ───────────────────── const BaseCrudMixin = { settings: { pageSize: 20, }, actions: { listar: { params: { pagina: 'number|integer|optional' }, async handler(ctx) { return this.paginar(ctx.params.pagina); }, }, obtener: { params: { id: 'string' }, async handler(ctx) { return this.findById(ctx.params.id); }, }, eliminar: { params: { id: 'string' }, async handler(ctx) { return this.softDelete(ctx.params.id); }, }, }, methods: { paginar(pagina = 1) { /* query con LIMIT/OFFSET */ }, softDelete(id) { /* marca deleted_at en lugar de DELETE */ }, }, }; // ── Usar el mixin en servicios ──────────────────────── module.exports = { name: 'productos', mixins: [BaseCrudMixin], // ← hereda listar, obtener, eliminar actions: { // Solo agrega las acciones específicas de productos buscarPorCodigo: { params: { codigo: 'string' }, async handler(ctx) { return this.findByCodigo(ctx.params.codigo); }, }, }, };
Middleware — Interceptores
Los middlewares envuelven el pipeline de llamadas. Permiten autenticación, logging, validación de JWT, rate limiting y cualquier lógica transversal a todos los servicios.
// Middleware de autenticación JWT para todas las acciones module.exports = { name: 'AuthMiddleware', // Envuelve TODAS las llamadas a acciones localAction(next, action) { return async function(ctx) { // Acciones públicas: no requieren auth const esPublica = action.name.startsWith('auth.') || action.public; if (!esPublica) { const token = ctx.meta.token; if (!token) throw new MoleculerError('Token requerido', 401); const usuario = verificarJWT(token); ctx.meta.usuario = usuario; // disponible en ctx.meta.usuario } return next(ctx); // continuar con la acción }; }, }; // Registrar el middleware en el broker const broker = new ServiceBroker({ middlewares: [ require('./middleware/auth.middleware'), require('./middleware/audit.middleware'), require('./middleware/rate-limit.middleware'), ], });
API Gateway — moleculer-web
moleculer-web expone las acciones de todos los servicios como endpoints HTTP REST (y WebSocket). Los clientes React se conectan solo al Gateway; no saben de los servicios internos.
const ApiService = require('moleculer-web'); module.exports = { name: 'api', mixins: [ApiService], // toda la magia viene del mixin settings: { port: 4000, cors: { origin: ['https://erp.carniceria.mx'], credentials: true, }, routes: [ { path: '/api/v1', // Auth por defecto en todas las rutas authorization: true, // Mapeo manual de endpoints (o usar auto-aliases) aliases: { // REST completo para productos: 'REST /productos': 'productos', // Rutas específicas: 'POST /ventas': 'ventas.registrar', 'GET /ventas/:id': 'ventas.obtener', 'GET /stock/:prod': 'inventario.obtenerStock', 'POST /auth/login': 'auth.login', }, // Whitelist: solo expone acciones listadas aquí whitelist: [ 'ventas.*', 'productos.*', 'inventario.*', 'auth.*', ], // Extraer token JWT del header y ponerlo en ctx.meta onBeforeCall(ctx, route, req) { ctx.meta.token = req.headers['authorization']?.split(' ')[1]; }, // Serializar errores de Moleculer a HTTP onError(req, res, err) { res.status(err.code || 500).json({ error: err.message, data: err.data }); }, }, // Ruta pública sin auth (login, health) { path: '/public', authorization: false, aliases: { 'POST /login': 'auth.login', 'GET /health': '$node.health', // acción interna }, }, ], }, };
Circuit Breaker
Protege el sistema ante fallos en cascada. Si un servicio falla repetidamente, el breaker "corta el circuito" y devuelve error inmediato sin intentar llamar al servicio caído.
actions: { timbrarCFDI: { params: { ventaId: 'string' }, // Circuit breaker específico para esta acción circuitBreaker: { enabled: true, threshold: 0.3, // corta con 30% de errores minRequestCount: 5, windowTime: 30, // en 30 segundos halfOpenTime: 10_000, // intenta recuperar a los 10s }, // Fallback cuando el breaker está abierto fallback(ctx, err) { this.logger.warn('⚠️ PAC no disponible, encolando para reintento'); // BullMQ guardará el job para cuando el PAC se recupere return encolarCFDIPendiente(ctx.params.ventaId); }, async handler(ctx) { return timbrarEnPAC(ctx.params); }, }, },
Retry & Fallback
// ── Retry por llamada ───────────────────────────────── const resultado = await ctx.call('banco.verificarPago', params, { retries: 5, delay: 1000, // espera 1s, 2s, 4s, 8s… (backoff exponencial) factor: 2, // Reintentar solo ante errores de red, no ValidationError check(err) { return err.type === 'NETWORK_ERROR'; }, }); // ── Fallback en la definición de la acción ──────────── actions: { consultarPrecio: { async handler(ctx) { return obtenerPrecioEnLinea(ctx.params); }, // Si el handler falla → devuelve precio de la caché local async fallback(ctx, err) { this.logger.warn('Usando precio cacheado:', err.message); return this.precioCache.get(ctx.params.productoId); }, }, },
Caching de Acciones
actions: { // Cache simple: TTL de 60s, key = todos los params listarProductos: { cache: true, // usa TTL del cacher global async handler(ctx) { return this.getProductos(); }, }, // Cache con TTL y keys específicas obtenerProducto: { cache: { ttl: 120, // 2 minutos keys: ['id'], // key = solo el param 'id' }, params: { id: 'string' }, async handler(ctx) { return this.findById(ctx.params.id); }, }, // Acción que actualiza → invalida caché relacionada actualizarProducto: { params: { id: 'string' }, async handler(ctx) { const prod = await this.update(ctx.params); // Limpiar entrada específica del caché await this.broker.cacher.clean(`productos.obtenerProducto:**`); await this.broker.cacher.clean(`productos.listarProductos:**`); // O emitir evento de caché (todos los nodos limpian su caché local) await ctx.broadcast('cache.limpiar', { servicio: 'productos' }); return prod; }, }, },
Framework MoRe
MoRe = Moleculer + React. La arquitectura custom del ERP Carnicerías que combina el frontend React con el backend Moleculer en una sola estructura de proyecto cohesiva.
more-erp/ ├── backend/ │ ├── broker.js ← ServiceBroker central │ ├── services/ │ │ ├── api.service.js ← API Gateway (moleculer-web) │ │ ├── ventas.service.js │ │ ├── inventario.service.js │ │ ├── finanzas.service.js │ │ ├── rrhh.service.js │ │ ├── calidad.service.js │ │ ├── reportes.service.js │ │ └── sync.service.js ← maneja offline de sucursales │ ├── workers/ ← BullMQ workers │ │ ├── cfdi.worker.js │ │ └── nomina.worker.js │ ├── queues/ ← BullMQ queues │ └── mixins/ │ ├── base-crud.mixin.js │ └── auditable.mixin.js │ ├── frontend/ │ ├── src/ │ │ ├── features/ ← módulos React por dominio │ │ │ ├── pos/ │ │ │ ├── inventario/ │ │ │ └── reportes/ │ │ ├── hooks/ │ │ │ ├── useMoleculer.js ← wrapper de broker.call → fetch │ │ │ └── useBascula.js │ │ └── App.jsx │ └── shared/ └── types/ ← TypeScript interfaces compartidas
Los 8 Servicios del ERP
REDIS
const { MoleculerError } = require('moleculer'); const AuditableMixin = require('../mixins/auditable.mixin'); const { encolarCFDI } = require('../queues'); module.exports = { name: 'ventas', mixins: [AuditableMixin], actions: { registrar: { params: { sucursalId: 'string', cajeroId: 'string', items: 'array|min:1', clienteRFC: 'string|optional', formaPago: 'string|enum:efectivo,tarjeta,transferencia', }, async handler(ctx) { const { sucursalId, cajeroId, items, clienteRFC, formaPago } = ctx.params; // 1. Verificar stock disponible (llama a inventario) const stockOk = await ctx.call('inventario.verificarItems', { items, sucursalId }); if (!stockOk) throw new MoleculerError('Stock insuficiente', 422, 'STOCK_ERROR'); // 2. Guardar la venta en DB const total = items.reduce((s, i) => s + i.precio * i.cantidad, 0); const venta = await this.guardarVenta({ sucursalId, cajeroId, items, total, formaPago }); // 3. Emitir evento → inventario descuenta, finanzas crea póliza ctx.emit('ventas.completada', { venta, items, sucursalId }); // 4. Encolar CFDI en BullMQ (asíncrono, no bloquea la respuesta) await encolarCFDI({ ventaId: venta.id, sucursalId, items, clienteRFC, total }); // 5. Respuesta inmediata al POS (el CFDI llega después por WebSocket) return { ventaId: venta.id, total, status: 'registrada' }; }, }, }, };
Patrones y Buenas Prácticas
version: 2 en el service. Permite deployar nuevas versiones sin romper clientes: v2.ventas.registrar.mcall. Es la diferencia entre latencia × 3 y latencia MAX.| Patrón | Moleculer | Cuándo |
|---|---|---|
| Síncrono | broker.call() | Necesitas la respuesta para continuar |
| Paralelo | broker.mcall() | Múltiples datos independientes a la vez |
| Fire & Forget | broker.emit() | Notificar sin esperar respuesta |
| Broadcast | broker.broadcast() | Invalidar caché en TODOS los nodos |
| Asíncrono | BullMQ Queue | Tareas pesadas, CFDI, nómina, reportes |
| Stream | broker.call() + stream | Archivos grandes, exportaciones CSV |