// 01

Que es Redis

Redis (Remote Dictionary Server) es una base de datos en memoria, extremadamente rapida. Persiste en disco opcionalmente. En el ERP lo usamos para tres cosas: cache, pub/sub entre microservicios, y cola de jobs con BullMQ.

Sub-milisegundo
Lecturas y escrituras en ~0.1ms. PostgreSQL tarda 1-10ms. 10-100x mas rapido para datos calientes.
🧠
En memoria
Todos los datos viven en RAM. Puede persistir a disco con RDB snapshots o AOF log.
🗂️
Estructuras nativas
String, Hash, List, Set, Sorted Set, Stream. Cada una con comandos especializados y atomicos.
📡
Pub/Sub + Streams
Mensajeria entre servicios. BullMQ usa Redis Streams internamente para las colas de jobs.
primeros-pasos — redis-cliCLI
# Instalar y arrancar Redis
# brew install redis && brew services start redis   (macOS)
# sudo apt install redis-server && sudo systemctl start redis  (Ubuntu)
# docker run -d -p 6379:6379 redis:7-alpine  (Docker)

# Conectar con redis-cli
redis-cli

# Comandos basicos de diagnostico
PING                     # PONG — confirma que Redis responde
INFO server              # version, uptime, modo
INFO memory              # uso de RAM
INFO stats               # comandos procesados, hits/misses de keyspace
DBSIZE                   # numero de keys en la BD actual
CONFIG GET maxmemory     # limite de memoria configurado
MONITOR                  # ver todos los comandos en tiempo real (dev only)

# Bases de datos — Redis tiene 16 (0-15) por defecto
SELECT 0                 # BD 0 (default)
SELECT 1                 # BD 1 para tests
FLUSHDB                  # borrar BD actual (cuidado en prod)
src/redis/client.ts — conexion con ioredisNODE
import Redis from 'ioredis';

// ioredis es el cliente Node.js mas completo para Redis
// npm install ioredis

export const redis = new Redis({
  host:     process.env.REDIS_HOST ?? 'localhost',
  port:     parseInt(process.env.REDIS_PORT ?? '6379'),
  password: process.env.REDIS_PASSWORD,
  db:       0,

  // Reconexion automatica
  retryStrategy: (times) => {
    if (times > 10) return null;  // rendirse despues de 10 intentos
    return Math.min(times * 200, 3000);  // esperar 200ms, 400ms... max 3s
  },

  // Prefijo global para separar namespaces
  keyPrefix: 'erp:',

  // Timeout de conexion
  connectTimeout:  5000,
  commandTimeout: 3000,

  // TLS en produccion
  tls: process.env.REDIS_TLS === 'true' ? {} : undefined,
});

redis.on('connect',     () => console.log('Redis conectado'));
redis.on('error',  (err) => console.error('Redis error:', err.message));
redis.on('reconnecting', () => console.warn('Redis reconectando...'));

// Graceful shutdown
export async function closeRedis() {
  await redis.quit();
}
process.on('SIGTERM', closeRedis);
process.on('SIGINT',  closeRedis);

// 02

Strings y Contadores

String es el tipo mas simple y versatil de Redis. Puede guardar texto, numeros, JSON serializado o datos binarios. INCR/DECR son atomicos — perfectos para contadores concurrentes.

strings.redis + strings.tsSTRING
# ── Comandos basicos ─────────────────────────────────
SET  usuario:101:nombre "Maria Garcia"
GET  usuario:101:nombre          # "Maria Garcia"
DEL  usuario:101:nombre

# SET con opciones
SET  sesion:abc123 "user-101"  EX 3600   # expira en 1 hora
SET  lock:venta   "worker-1"  NX        # NX = solo si NO existe (mutex)
SET  config:iva   "0.16"       XX        # XX = solo si YA existe
GETEX sesion:abc123             EX 3600   # GET y renovar TTL
GETDEL token:temporal                     # GET y DELETE atomico

# Contadores atomicos
SET   ventas:hoy:count 0
INCR  ventas:hoy:count           # 1 (atomico, seguro con concurrencia)
INCRBY ventas:hoy:total 450     # suma 450 al contador
INCRBYFLOAT ventas:hoy:kg 2.5   # suma float
DECR  stock:prod-01              # restar 1

# Multiple SET/GET en una sola operacion (mas eficiente)
MSET p:r001:precio "189.50"  p:r001:stock "25"  p:r001:activo "1"
MGET p:r001:precio p:r001:stock p:r001:activo

# JSON serializado (patron muy comun)
SET  producto:uuid-001 '{"id":"uuid","nombre":"Arrachera","precio":189.50}'  EX 300
GET  producto:uuid-001
strings.ts — uso en Node.js con ioredisNODE
import { redis } from './client.js';

// Guardar y recuperar JSON
async function cacheProducto(producto: Producto): Promise<void> {
  await redis.set(
    `producto:${producto.id}`,
    JSON.stringify(producto),
    'EX', 300            // TTL 5 minutos
  );
}

async function getProductoCache(id: string): Promise<Producto | null> {
  const raw = await redis.get(`producto:${id}`);
  return raw ? JSON.parse(raw) : null;
}

// Contador de ventas del dia (atomico, seguro con multiples workers)
async function incrementarContadorVenta(sucursalId: string, monto: number) {
  const hoy = new Date().toISOString().slice(0,10);  // "2024-01-15"
  const pipe = redis.pipeline();
  pipe.incr(`ventas:${sucursalId}:${hoy}:count`);
  pipe.incrbyfloat(`ventas:${sucursalId}:${hoy}:total`, monto);
  pipe.expireat(`ventas:${sucursalId}:${hoy}:count`, endOfDay());
  pipe.expireat(`ventas:${sucursalId}:${hoy}:total`, endOfDay());
  await pipe.exec();
}

// Distributed lock — evitar doble procesamiento
async function adquirirLock(recurso: string, ttl = 30): Promise<boolean> {
  const ok = await redis.set(
    `lock:${recurso}`, '1', 'EX', ttl, 'NX'
  );
  return ok === 'OK';
}

async function liberarLock(recurso: string): Promise<void> {
  await redis.del(`lock:${recurso}`);
}

// 03

Hashes — Objetos

Un Hash es un objeto clave-valor dentro de una key de Redis. Ideal para representar entidades como productos, sesiones o configuraciones — mas eficiente en memoria que serializar JSON.

hashes.redis + hashes.tsHASH
# ── Comandos HASH ─────────────────────────────────────
HSET  producto:uuid-001  nombre "Arrachera Premium"  precio "189.50"  categoria "res"
HGET  producto:uuid-001  nombre          # "Arrachera Premium"
HMGET producto:uuid-001  nombre precio   # ["Arrachera Premium", "189.50"]
HGETALL producto:uuid-001               # todos los campos

# Actualizar un campo sin tocar los demas
HSET  producto:uuid-001  precio "199.00"
HINCRBY producto:uuid-001  stock 10      # incrementar campo numerico
HINCRBYFLOAT producto:uuid-001 precio 5.50

# Verificar existencia
HEXISTS producto:uuid-001  nombre        # 1 (si existe), 0 (no)
HLEN    producto:uuid-001                # numero de campos
HKEYS   producto:uuid-001                # ["nombre","precio","categoria",...]
HVALS   producto:uuid-001                # ["Arrachera Premium","199.00",...]
HDEL    producto:uuid-001  descripcion   # eliminar campo

# HSETNX — set solo si el campo NO existe
HSETNX producto:uuid-001  codigo "RES-001"

# Caso de uso: sesion de usuario
HSET  sesion:tok-abc123
  userId    "user-101"
  sucursalId "suc-01"
  rol       "cajero"
  loginAt   "2024-01-15T08:00:00Z"
EXPIRE sesion:tok-abc123 28800           # 8 horas
session.service.ts — sesiones con HashNODE
import { redis } from './client.js';
import { randomUUID } from 'node:crypto';

interface Session {
  userId: string; sucursalId: string;
  rol: string; loginAt: string;
}

const SESSION_TTL = 8 * 60 * 60;  // 8 horas en segundos

export async function crearSesion(data: Session): Promise<string> {
  const token = randomUUID();
  const key   = `sesion:${token}`;

  // Pipeline: HSET + EXPIRE en una sola ida al servidor
  const pipe = redis.pipeline();
  pipe.hset(key, data as any);
  pipe.expire(key, SESSION_TTL);
  await pipe.exec();

  return token;
}

export async function getSesion(token: string): Promise<Session | null> {
  const data = await redis.hgetall(`sesion:${token}`);
  if (!data || !data.userId) return null;

  // Renovar TTL en cada uso (sliding expiration)
  await redis.expire(`sesion:${token}`, SESSION_TTL);
  return data as Session;
}

export async function cerrarSesion(token: string): Promise<void> {
  await redis.del(`sesion:${token}`);
}

// Actualizar solo el campo de ultima actividad
export async function actualizarActividad(token: string): Promise<void> {
  await redis.hset(`sesion:${token}`, 'lastActivity', Date.now().toString());
}

// 04

Lists y Queues

Una List de Redis es una lista enlazada. Inserciones en los extremos son O(1). Ideal para colas FIFO (LPUSH + BRPOP), stacks LIFO, y los ultimos N eventos de actividad.

lists.redis + queue.tsLIST
# ── Comandos LIST ─────────────────────────────────────
# LPUSH agrega al inicio (left), RPUSH al final (right)
RPUSH cola:notificaciones '{"tipo":"venta","id":"v-001"}'
RPUSH cola:notificaciones '{"tipo":"stock_bajo","prodId":"p-01"}'
LPUSH cola:urgente       '{"tipo":"alerta_critica"}'  # insert al frente

# Leer sin consumir
LLEN   cola:notificaciones        # longitud
LRANGE cola:notificaciones 0 -1  # todos los elementos
LRANGE cola:notificaciones 0  9  # primeros 10
LINDEX cola:notificaciones 0     # primer elemento

# Consumir (pop)
LPOP  cola:notificaciones         # consumir del inicio (FIFO con RPUSH)
RPOP  cola:notificaciones         # consumir del final (LIFO)
LPOP  cola:notificaciones 5       # consumir 5 a la vez (Redis 6.2+)

# BRPOP / BLPOP — blocking pop (espera hasta que haya datos)
BRPOP cola:notificaciones cola:urgente 30  # esperar 30s, prioridad a cola:urgente

# Historial de actividad — mantener solo los ultimos 100
LPUSH  actividad:user-101 '{"accion":"login","ts":1705312800}'
LTRIM  actividad:user-101 0 99     # conservar solo 100 ultimos
ℹ️
Lists vs BullMQ para colasPara colas simples con un consumer, BRPOP funciona. Para colas con reintentos, prioridad, jobs fallidos, concurrencia controlada y visibilidad — usa BullMQ que internamente usa Redis Streams. En el ERP, BullMQ maneja la mayoria de colas de trabajo.

// 05

Sets y Sorted Sets

Set: coleccion sin duplicados, operaciones de conjuntos O(1). Sorted Set (ZSet): igual pero cada miembro tiene un score numerico — permite rankings, leaderboards y rangos por tiempo.

sets-zsets.redisSET
# ── SET — sin duplicados, O(1) ────────────────────────
SADD   sucursal:s1:cajeros  "user-101" "user-102" "user-103"
SREM   sucursal:s1:cajeros  "user-103"        # eliminar miembro
SISMEMBER sucursal:s1:cajeros "user-101"      # 1 (es miembro)
SMEMBERS sucursal:s1:cajeros                  # todos los miembros
SCARD  sucursal:s1:cajeros                    # cantidad

# Operaciones de conjuntos
SUNION  sucursal:s1:cajeros sucursal:s2:cajeros  # union
SINTER  permisos:admin      permisos:cajero      # interseccion
SDIFF   permisos:admin      permisos:cajero      # diferencia

# ── SORTED SET — con score numerico ──────────────────
# ZADD key score member
ZADD ranking:ventas:mes  45320.50  "suc-centro"
ZADD ranking:ventas:mes  38100.00  "suc-polanco"
ZADD ranking:ventas:mes  52700.75  "suc-coyoacan"

# Leer ranking (0=primero, -1=ultimo)
ZRANGE     ranking:ventas:mes 0 -1 WITHSCORES REV  # mayor a menor
ZREVRANK   ranking:ventas:mes "suc-centro"             # posicion (0=1er lugar)
ZSCORE     ranking:ventas:mes "suc-centro"             # 45320.50
ZINCRBY    ranking:ventas:mes 1250.00 "suc-centro"     # incrementar score

# Rango por score — sucursales con ventas entre 30k y 50k
ZRANGEBYSCORE ranking:ventas:mes 30000 50000 WITHSCORES

# Rate limiting con ZSet — ventana deslizante
# (ver cap 06 para el patron completo)
ZADD rate:user-101:api 1705312800000 "req-1"
ZREMRANGEBYSCORE rate:user-101:api 0 "(1705312740000"  # eliminar > 1 min
ZCARD rate:user-101:api                                 # requests en ultimo min

// 06

TTL y Expiración

Redis puede expirar keys automaticamente. TTL (Time To Live) es fundamental para cache, sesiones, rate limiting y cualquier dato temporal. Sin TTL, Redis crece hasta llenar la RAM.

expiry.redis + rate-limit.tsTTL
# ── Comandos de expiracion ────────────────────────────
EXPIRE    sesion:tok    3600       # TTL en segundos
PEXPIRE   sesion:tok  3600000    # TTL en milisegundos
EXPIREAT  cache:reporte 1705359600 # timestamp Unix exacto
PERSIST   config:global           # eliminar TTL (hacer permanente)

# Consultar TTL
TTL   sesion:tok   # segundos restantes (-1=sin TTL, -2=no existe)
PTTL  sesion:tok   # milisegundos restantes

# Expiracion por campo en Hash (Redis 7.4+)
HEXPIRE producto:uuid 300 FIELDS 1 precio_oferta
rate-limit.ts — rate limiting con ventana deslizanteNODE
import { redis } from './client.js';

// Rate limiter con Sorted Set — ventana deslizante de 60 segundos
export async function checkRateLimit(
  userId: string,
  maxRequests = 100,
  windowSec   = 60
): Promise<{ allowed: boolean; remaining: number; resetIn: number }> {
  const key  = `rate:${userId}`;
  const now  = Date.now();
  const from = now - windowSec * 1000;

  const pipe = redis.pipeline();
  pipe.zremrangebyscore(key, 0, from);              // limpiar fuera de ventana
  pipe.zadd(key, now, `${now}-${Math.random()}`);    # registrar request actual
  pipe.zcard(key);                                    // contar requests en ventana
  pipe.pexpire(key, windowSec * 1000);              // renovar TTL
  const results = await pipe.exec();

  const count   = results?.[2]?.[1] as number ?? 0;
  const allowed = count <= maxRequests;

  if (!allowed) {
    // Si se excede, deshacer el zadd que acabamos de hacer
    await redis.zremrangebyscore(key, now, now + 1);
  }

  return {
    allowed,
    remaining: Math.max(0, maxRequests - count),
    resetIn:   windowSec,
  };
}

// Middleware de Moleculer
export const rateLimitMiddleware = {
  async localAction(handler: Function, action: any) {
    return async (ctx: any) => {
      if (ctx.meta.userId) {
        const { allowed, remaining } = await checkRateLimit(ctx.meta.userId);
        if (!allowed) throw new Error('Rate limit excedido');
        ctx.meta.$responseHeaders = { 'X-RateLimit-Remaining': remaining };
      }
      return handler(ctx);
    };
  }
};
Caso de usoTTL recomendadoEstructura
Sesion de cajero8 horas (28800s)Hash
Cache de producto5 minutos (300s)String / Hash
Cache de reporte diariohasta medianocheString (JSON)
Token de reset password15 minutos (900s)String
Distributed lock30s (con heartbeat)String + NX
Rate limit window=ventana (60s)Sorted Set

// 07

Cache con Node.js

El patron cache-aside es el mas comun: buscar en Redis, si no esta (cache miss) consultar PostgreSQL y guardar en Redis. El decorador @Cached encapsula esto en el ERP.

Demo — simulacion Cache-Aside con hit/miss y TTL▶ LIVE
// Simular Cache-Aside pattern const redisStore = new Map(); // simula Redis const dbQueryCount = { n: 0 }; // contador de queries a PG // Simula postgres (lento, 50ms) async function queryDB(id) { dbQueryCount.n++; await new Promise(r => setTimeout(r, 50)); // simular latencia return { id, nombre: 'Arrachera Premium', precio: 189.50 }; } // Cache-aside: Redis primero, PG si miss async function getProducto(id) { const cacheKey = `producto:${id}`; const cached = redisStore.get(cacheKey); if (cached) { console.log(`HIT ${cacheKey} ← Redis (0ms)`); return JSON.parse(cached); } console.log(`MISS ${cacheKey} → consultar PostgreSQL`); const t0 = Date.now(); const producto = await queryDB(id); const ms = Date.now() - t0; // Guardar en cache con TTL simulado (5 min) redisStore.set(cacheKey, JSON.stringify(producto)); console.log(`SET ${cacheKey} ← guardado en Redis (${ms}ms de PG)`); return producto; } // Test: 4 requests al mismo producto for (let i = 0; i < 4; i++) { const p = await getProducto('uuid-001'); console.log(` → nombre: ${p.nombre}`); } console.log(` Total queries a PostgreSQL: ${dbQueryCount.n} (de 4 requests)`);
// outputHaz clic en Ejecutar...
src/cache/cache.service.ts — cache generico tipadoNODE
import { redis } from '../redis/client.js';

export class CacheService {
  // Cache-aside generico
  static async get<T>(
    key:    string,
    fetch:  () => Promise<T>,
    ttlSec = 300
  ): Promise<T> {
    const cached = await redis.get(key);
    if (cached) return JSON.parse(cached) as T;

    const data = await fetch();
    await redis.set(key, JSON.stringify(data), 'EX', ttlSec);
    return data;
  }

  // Invalidar una key o patron
  static async del(key: string): Promise<void> {
    await redis.del(key);
  }

  // Invalidar por patron (p.ej. "producto:*" — usa SCAN, no KEYS)
  static async delPattern(pattern: string): Promise<number> {
    let cursor = '0';
    let deleted = 0;
    do {
      const [next, keys] = await redis.scan(cursor, 'MATCH', pattern, 'COUNT', 100);
      cursor = next;
      if (keys.length) {
        await redis.del(...keys);
        deleted += keys.length;
      }
    } while (cursor !== '0');
    return deleted;
  }

  // Invalidar cache cuando se modifica un producto
  static async invalidarProducto(productoId: string): Promise<void> {
    await Promise.all([
      redis.del(`producto:${productoId}`),
      this.delPattern(`productos:lista:*`),  // todas las listas cacheadas
    ]);
  }
}

// Uso en servicio Moleculer:
// const producto = await CacheService.get(
//   `producto:${id}`,
//   () => db.getProducto(id),
//   300  // 5 min
// );
⚠️
Nunca uses KEYS * en produccionKEYS bloquea Redis hasta completar el scan de toda la BD — puede congelar todos los clientes durante segundos. Siempre usa SCAN con cursor para iterar incrementalmente sin bloquear.

// 08

Pub/Sub

Pub/Sub desacopla publicadores de suscriptores. Un publisher emite a un canal, todos los subscribers lo reciben en tiempo real. En el ERP: cambios de precio, alertas de stock, eventos entre servicios.

pubsub.ts — publisher y subscriber en ioredisPUBSUB
import Redis from 'ioredis';

// IMPORTANTE: ioredis necesita conexiones separadas para pub y sub
// Un cliente en modo SUBSCRIBE no puede ejecutar otros comandos
export const publisher  = new Redis(redisConfig);
export const subscriber = new Redis(redisConfig);

// ── PUBLISHER ────────────────────────────────────────
interface PrecioActualizadoEvent {
  productoId: string; precioAnterior: number; precioNuevo: number;
  actualizadoPor: string; ts: string;
}

export async function publishPrecioActualizado(e: PrecioActualizadoEvent) {
  await publisher.publish(
    'eventos:precio_actualizado',
    JSON.stringify(e)
  );
}

// ── SUBSCRIBER ───────────────────────────────────────
export async function iniciarSubscribers(): Promise<void> {
  // Suscribirse a canales especificos
  await subscriber.subscribe(
    'eventos:precio_actualizado',
    'eventos:stock_bajo',
    'eventos:venta_registrada'
  );

  // Suscribirse con patron (recibe de todos los canales que coincidan)
  await subscriber.psubscribe('eventos:*');

  subscriber.on('message', (channel, message) => {
    const evento = JSON.parse(message);
    console.log(`[${channel}]`, evento);

    switch (channel) {
      case 'eventos:precio_actualizado':
        handlePrecioActualizado(evento);
        break;
      case 'eventos:stock_bajo':
        handleStockBajo(evento);
        break;
    }
  });

  subscriber.on('pmessage', (pattern, channel, message) => {
    // Recibe de psubscribe
    console.log(`[pattern:${pattern}] canal:${channel}`);
  });
}

async function handlePrecioActualizado(e: PrecioActualizadoEvent) {
  // Invalidar cache del producto
  await CacheService.invalidarProducto(e.productoId);
  // Notificar a los POS activos via WebSocket (ejemplo)
  await notificarPOS(e.productoId, e.precioNuevo);
}

// ── Keyspace notifications (eventos internos de Redis) ─
// redis.conf: notify-keyspace-events "Ex"  (expired events)
// subscriber.psubscribe('__keyevent@0__:expired')
// subscriber.on('pmessage', (_, channel, expiredKey) => {
//   console.log('Key expirada:', expiredKey)
// })
📡
Pub/Sub vs Streams vs Moleculer eventsPub/Sub: fire-and-forget, no persiste, si el subscriber esta caido pierde el mensaje. Streams: persiste, permite consumer groups, ideal para colas durables. En el ERP, Moleculer emit() usa Pub/Sub via NATS/Redis para eventos internos, y BullMQ Streams para jobs que deben persistir.

// 09

Redis Streams

Streams son un log de mensajes con ID de tiempo. Consumer groups permiten que multiples workers consuman el mismo stream sin duplicar mensajes — con ACK para garantizar procesamiento. BullMQ usa Streams internamente.

streams.redis + streams.tsSTREAM
# ── XADD — agregar mensajes al stream ────────────────
XADD stream:ventas  *          # * = ID autoincremental (timestamp-seq)
  ventaId    "uuid-venta"
  sucursalId "suc-01"
  total      "567.50"
  metodo     "efectivo"

# Limitar tamano del stream
XADD stream:ventas MAXLEN ~ 10000 *  nombre "val"  # max ~10000 entradas

# ── XREAD — leer sin consumer group ──────────────────
XREAD COUNT 10 STREAMS stream:ventas 0   # desde el inicio
XREAD COUNT 10 BLOCK 5000 STREAMS stream:ventas $  # solo nuevos, block 5s

# ── Consumer Groups — multiples workers ──────────────
XGROUP CREATE stream:ventas  workers  $  MKSTREAM

# Leer como consumer de un grupo (cada worker toma mensajes distintos)
XREADGROUP GROUP workers  worker-1  COUNT 5 STREAMS stream:ventas  >

# ACK — confirmar procesamiento
XACK stream:ventas workers "1705312800000-0"

# Ver mensajes pendientes (no confirmados)
XPENDING stream:ventas workers - + 10
stream-consumer.ts — consumer group en Node.jsNODE
import { redis } from './client.js';

const STREAM  = 'stream:ventas';
const GROUP   = 'workers';
const WORKER  = `worker-${process.pid}`;

export async function startConsumer(): Promise<void> {
  // Crear grupo si no existe
  try {
    await redis.xgroup('CREATE', STREAM, GROUP, '$', 'MKSTREAM');
  } catch(e: any) {
    if (!e.message.includes('BUSYGROUP')) throw e;  // ya existe, ok
  }

  console.log(`Consumer ${WORKER} iniciado`);

  while (true) {
    // Leer hasta 10 mensajes, bloquear 2s si no hay
    const results = await redis.xreadgroup(
      'GROUP', GROUP, WORKER,
      'COUNT', 10,
      'BLOCK', 2000,
      'STREAMS', STREAM, '>'
    ) as any;

    if (!results) continue;  // timeout — intentar de nuevo

    const [[, messages]] = results;
    for (const [id, fields] of messages) {
      // Convertir array plano a objeto
      const data = Object.fromEntries(
        fields.reduce((acc: any[], v: string, i: number) =>
          i % 2 === 0 ? [...acc, [v]] : (acc[acc.length-1].push(v), acc), [])
      );

      try {
        await procesarVenta(data);
        await redis.xack(STREAM, GROUP, id);  // confirmar procesamiento
      } catch(err) {
        console.error(`Error procesando ${id}:`, err);
        // No hacer XACK — quedara en pending para reprocesar
      }
    }
  }
}

// 10

Scripts Lua

Los scripts Lua se ejecutan en Redis de forma atomica — ninguna otra operacion puede interrumpirlos. Ideal para operaciones que requieren leer y escribir en una sola transaccion sin race conditions.

scripts/lua-scripts.ts — scripts atomicos del ERPLUA
import { redis } from '../redis/client.js';

// ── Script 1: Distributed Lock con verificacion de owner ─
// Solo libera el lock si el valor coincide (el que lo adquirio)
const UNLOCK_SCRIPT = `
  if redis.call("get", KEYS[1]) == ARGV[1] then
    return redis.call("del", KEYS[1])
  else
    return 0
  end
`;

export async function acquireLock(
  resource: string, ownerId: string, ttlSec = 30
): Promise<boolean> {
  const ok = await redis.set(`lock:${resource}`, ownerId, 'EX', ttlSec, 'NX');
  return ok === 'OK';
}

export async function releaseLock(resource: string, ownerId: string): Promise<boolean> {
  const result = await redis.eval(UNLOCK_SCRIPT, 1, `lock:${resource}`, ownerId);
  return result === 1;
}

// ── Script 2: Descontar stock con verificacion atomica ────
// Sin Lua: GET stock, verificar, UPDATE — entre GET y UPDATE otro worker puede leer igual
// Con Lua: todo atomico, imposible race condition
const DECR_STOCK_SCRIPT = `
  local stock = tonumber(redis.call("hget", KEYS[1], "stock"))
  if stock == nil then
    return redis.error_reply("NOKEY: producto no encontrado en cache")
  end
  local cantidad = tonumber(ARGV[1])
  if stock < cantidad then
    return redis.error_reply("NOSTOCK: stock=" .. stock .. " pedido=" .. cantidad)
  end
  redis.call("hincrbyfloat", KEYS[1], "stock", -cantidad)
  return stock - cantidad
`;

export async function descontarStockCache(
  productoId: string, cantidad: number
): Promise<number> {
  try {
    const nuevoStock = await redis.eval(
      DECR_STOCK_SCRIPT, 1,
      `producto:${productoId}`,
      cantidad.toString()
    );
    return nuevoStock as number;
  } catch (err: any) {
    if (err.message.startsWith('NOSTOCK')) throw new Error('Stock insuficiente');
    if (err.message.startsWith('NOKEY'))   throw new Error('Producto no en cache');
    throw err;
  }
}

// ── Script 3: Contador con techo (rate limit atomico) ────
const RATE_LIMIT_SCRIPT = `
  local current = tonumber(redis.call("get", KEYS[1])) or 0
  if current >= tonumber(ARGV[1]) then
    return 0  -- limite alcanzado
  end
  redis.call("incr", KEYS[1])
  redis.call("expire", KEYS[1], ARGV[2])
  return 1  -- permitido
`;

export async function checkLimit(
  key: string, max: number, windowSec: number
): Promise<boolean> {
  const result = await redis.eval(
    RATE_LIMIT_SCRIPT, 1, key,
    max.toString(), windowSec.toString()
  );
  return result === 1;
}

// ── EVALSHA — cachear script en Redis por su SHA1 ────────
// En lugar de enviar el script completo cada vez, Redis lo guarda por hash
const sha = await redis.script('LOAD', UNLOCK_SCRIPT);   // sha1 del script
await redis.evalsha(sha, 1, 'lock:recurso', 'owner-1');  // mas eficiente
💡
Cuando usar Lua vs Pipeline vs transaccionesPipeline: enviar multiples comandos en un solo round-trip, sin atomicidad. MULTI/EXEC: transaccion atomica pero sin logica condicional. Lua: atomicidad + logica condicional (if/else, bucles). Para el patron check-then-act (verificar stock y descontar) siempre Lua — es la unica forma segura.

// 11

Persistencia y Backups

Redis es in-memory pero puede persistir a disco. RDB hace snapshots periodicos. AOF loggea cada escritura. En produccion se usa RDB + AOF juntos. Sin persistencia, un reinicio pierde todos los datos.

redis.conf — configuracion de persistenciaCONFIG
## ── RDB — snapshots periodicos ──────────────────────
## save <segundos> <cambios_minimos>
save 900 1      # snapshot si 1 cambio en 15 min
save 300 10     # snapshot si 10 cambios en 5 min
save 60  10000  # snapshot si 10000 cambios en 1 min

rdbcompression yes
rdbchecksum    yes
dbfilename     dump.rdb
dir            /var/lib/redis/

## ── AOF — append only file ──────────────────────────
appendonly yes
appendfilename "appendonly.aof"

## fsync: everysec = balance rendimiento/durabilidad (recomendado)
## always = mas seguro pero lento | no = mas rapido, puede perder datos
appendfsync everysec

## AOF rewrite — compactar el log cuando crece mucho
auto-aof-rewrite-percentage 100   # rewrite al doble del tamano base
auto-aof-rewrite-min-size   64mb  # minimo para triggerear rewrite

## ── Limite de memoria ────────────────────────────────
maxmemory 512mb
## allkeys-lru: cuando se llena, evictar la LRU de TODAS las keys
## volatile-lru: solo evictar keys con TTL
maxmemory-policy allkeys-lru   # recomendado para cache puro
## volatile-lru es mejor si mezclas datos con y sin TTL
backup-redis.sh — backup manual y restauracionOPS
#!/bin/bash
FECHA=$(date +%Y%m%d_%H%M%S)
REDIS_DIR="/var/lib/redis"

# ── Backup: forzar snapshot RDB ───────────────────────
redis-cli BGSAVE                          # snapshot en background (no bloquea)
# O para snapshot sincrono (bloquea brevemente):
redis-cli DEBUG SLEEP 0                   # asegurar que AOF esta al dia
redis-cli BGSAVE

# Esperar a que termine BGSAVE
while [ $(redis-cli LASTSAVE) == $LAST_SAVE ]; do
  sleep 1
done

# Copiar dump.rdb
cp "$REDIS_DIR/dump.rdb" "/backup/redis_${FECHA}.rdb"
aws s3 cp "/backup/redis_${FECHA}.rdb" "s3://mi-bucket/redis/"

# ── Restauracion ─────────────────────────────────────
# 1. Detener Redis
sudo systemctl stop redis

# 2. Reemplazar dump.rdb
cp "/backup/redis_20240115.rdb" "$REDIS_DIR/dump.rdb"
chown redis:redis "$REDIS_DIR/dump.rdb"

# 3. Reiniciar (Redis carga el RDB al arrancar)
sudo systemctl start redis

# ── Info util desde redis-cli ─────────────────────────
redis-cli INFO persistence   # estado de RDB y AOF
redis-cli INFO memory        # uso de RAM
redis-cli LASTSAVE           # timestamp del ultimo snapshot exitoso
redis-cli DEBUG JMAP         # distribucion de memoria por tipo
ModoDurabilidadRendimientoRecomendado para
Sin persistencia0% (todo se pierde)MaximoCache puro, datos efimeros
Solo RDBHasta ultimo snapshotMuy altoDatos que pueden reconstruirse
Solo AOFHasta ultima escrituraMedioDatos importantes sin RDB
RDB + AOFMuy altaMedio-altoProduccion con sesiones y estado

// 12

Cluster y Alta Disponibilidad

Redis Sentinel: monitorea un primario y N replicas, hace failover automatico si el primario cae. Redis Cluster: sharding automatico en multiples nodos. Para el ERP en AWS usamos ElastiCache con replicacion.

redis-ha.ts — conexion a Sentinel y Cluster con ioredisHA
import Redis from 'ioredis';

// ── Sentinel — failover automatico ───────────────────
// Sentinel monitorea: 1 primario + 2 replicas
// Si el primario cae, Sentinel elige una replica como nuevo primario
export const redisSentinel = new Redis({
  sentinels: [
    { host: 'sentinel-1', port: 26379 },
    { host: 'sentinel-2', port: 26379 },
    { host: 'sentinel-3', port: 26379 },
  ],
  name: 'mymaster',      // nombre del grupo en sentinel.conf
  sentinelPassword: process.env.SENTINEL_PASS,
  password:         process.env.REDIS_PASS,
  role: 'master',        // 'master' para escrituras, 'slave' para lecturas
});

// ── Redis Cluster — sharding en 6 nodos (3 primarios + 3 replicas) ─
import { Cluster } from 'ioredis';

export const redisCluster = new Cluster([
  { host: 'redis-1', port: 7001 },
  { host: 'redis-2', port: 7002 },
  { host: 'redis-3', port: 7003 },
], {
  redisOptions: { password: process.env.REDIS_PASS },
  enableReadyCheck: true,
  scaleReads: 'slave',    // lecturas a replicas, escrituras al primario
  clusterRetryStrategy: (times) => Math.min(times * 300, 5000),
});

// ── AWS ElastiCache (Redis managed) ──────────────────
// Para produccion en AWS, ElastiCache simplifica la operacion
// Cluster mode disabled + replica = Sentinel gestionado por AWS
export const elasticache = new Redis({
  host:     process.env.ELASTICACHE_ENDPOINT,  // xxx.cache.amazonaws.com
  port:     6379,
  tls:      {},                // ElastiCache requiere TLS
  password: process.env.ELASTICACHE_TOKEN,
  keyPrefix: 'erp:',
});

// ── Limitacion de Cluster: keys deben estar en el mismo slot ─
// Las hash tags {} fuerzan que las keys vayan al mismo slot
// CORRECTO — todas las keys de una venta en el mismo nodo:
// `{venta:uuid-001}:datos`
// `{venta:uuid-001}:items`
// `{venta:uuid-001}:lock`
// INCORRECTO para pipelines multi-key en cluster:
// `venta:uuid-001` y `producto:uuid-002` pueden estar en nodos distintos

// 13

Redis en el ERP Real

En el ERP para carnicerías, Redis cumple tres roles: cache de catálogo y sesiones, pub/sub para sincronizar cambios entre sucursales, y backend de BullMQ para la cola de jobs asincrónos.

Cache catálogo Sesiones cajero Eventos entre servicios BullMQ backend Rate limiting API Distributed locks Contadores en tiempo real
src/services/pos.service.ts — servicio POS completo con RedisREAL
import { Service, Context }  from 'moleculer';
import { redis, publisher }   from '../redis/client.js';
import { CacheService }       from '../cache/cache.service.js';
import { acquireLock, releaseLock } from '../scripts/lua-scripts.js';
import { getSesion }          from '../sessions/session.service.js';
import { withTransaction }    from '../db/transaction.js';

export default class PosService extends Service {
  name = 'pos';

  actions = {

    // ── 1. Buscar producto — cache-aside ─────────────────
    'buscarProducto': {
      params: { termino: 'string' },
      async handler(ctx: Context<{ termino: string }>) {
        const cacheKey = `busqueda:${ctx.params.termino.toLowerCase()}`;
        return CacheService.get(
          cacheKey,
          () => ctx.call('productos.buscar', { termino: ctx.params.termino }),
          60   // 1 minuto — busquedas cambian poco
        );
      }
    },

    // ── 2. Verificar sesion — Hash en Redis ──────────────
    'verificarSesion': {
      params: { token: string },
      async handler(ctx: Context<{ token: string }>) {
        const sesion = await getSesion(ctx.params.token);
        if (!sesion) throw new Error('Sesion invalida o expirada');
        return sesion;
      }
    },

    // ── 3. Registrar venta — lock + transaccion PG + pub/sub ─
    'registrarVenta': {
      async handler(ctx: Context<RegistrarVentaDTO>) {
        const { sucursalId, items } = ctx.params;
        const lockId = randomUUID();

        // Lock para evitar doble registro del mismo folio
        const locked = await acquireLock(`venta:${sucursalId}`, lockId, 30);
        if (!locked) throw new Error('Operacion en progreso, reintenta');

        try {
          // Transaccion en PostgreSQL
          const venta = await withTransaction(async (client) => {
            /* ... INSERT ventas, items, UPDATE inventario ... */
          });

          // Actualizar contadores en Redis (sin bloquear respuesta)
          const hoy = new Date().toISOString().slice(0,10);
          const pipe = redis.pipeline();
          pipe.incr(`stats:${sucursalId}:${hoy}:ventas`);
          pipe.incrbyfloat(`stats:${sucursalId}:${hoy}:total`, venta.total);
          pipe.exec();   // sin await — fire and forget

          // Publicar evento para otros servicios (CFDI, inventario, reportes)
          await publisher.publish(
            'eventos:venta_registrada',
            JSON.stringify({ ventaId: venta.id, sucursalId, total: venta.total })
          );

          return venta;
        } finally {
          await releaseLock(`venta:${sucursalId}`, lockId);
        }
      }
    },

    // ── 4. Dashboard en tiempo real — leer contadores ────
    'dashboardTiempoReal': {
      params: { sucursalId: string },
      async handler(ctx: Context<{ sucursalId: string }>) {
        const { sucursalId } = ctx.params;
        const hoy = new Date().toISOString().slice(0,10);
        const prefix = `stats:${sucursalId}:${hoy}`;

        const [numVentas, totalVentas, rankingRaw] = await Promise.all([
          redis.get(`${prefix}:ventas`),
          redis.get(`${prefix}:total`),
          redis.zrange(`ranking:ventas:${hoy}`, 0, 10, 'WITHSCORES', 'REV'),
        ]);

        return {
          numVentas:   parseInt(numVentas ?? '0'),
          totalVentas: parseFloat(totalVentas ?? '0'),
          ranking:     parseRanking(rankingRaw),
          fuente:      'redis',   // sub-milisegundo
        };
      }
    }
  };
}
Demo — simulacion completa: Cache + Pub/Sub + Contadores + Lock▶ LIVE
// Simular Redis en memoria const store = new Map(); const pubsub = { listeners: {} }; const redis = { get: (k) => Promise.resolve(store.get(k) ?? null), set: (k, v, ...opts) => { store.set(k, v); return Promise.resolve('OK'); }, del: (k) => { store.delete(k); return Promise.resolve(1); }, incr: (k) => { const v = parseInt(store.get(k) ?? '0') + 1; store.set(k, String(v)); return Promise.resolve(v); }, incrbyfloat: (k, n) => { const v = parseFloat(store.get(k) ?? '0') + n; store.set(k, v.toFixed(2)); return Promise.resolve(v); }, publish: (ch, msg) => { (pubsub.listeners[ch] || []).forEach(fn => fn(msg)); return Promise.resolve(1); }, subscribe: (ch, fn) => { pubsub.listeners[ch] = pubsub.listeners[ch] || []; pubsub.listeners[ch].push(fn); } }; // Suscribir a eventos de venta redis.subscribe('eventos:venta_registrada', (msg) => { const e = JSON.parse(msg); console.log('📡 PUB/SUB recibido:', `venta ${e.ventaId} — $${e.total}`); }); // Simular registro de venta async function registrarVenta(dto) { const hoy = new Date().toISOString().slice(0,10); const lockKey = `lock:venta:${dto.sucursalId}`; // 1. Distributed lock const locked = !(await redis.get(lockKey)); if (!locked) throw new Error('Lock ocupado'); await redis.set(lockKey, 'worker-1', 'EX', 30); console.log('🔒 Lock adquirido'); try { // 2. Simular INSERT en PostgreSQL const venta = { id: 'vta-' + Date.now(), ...dto }; console.log('🐘 PostgreSQL: INSERT venta', venta.id); // 3. Actualizar contadores en Redis const [numVentas, total] = await Promise.all([ redis.incr(`stats:${dto.sucursalId}:${hoy}:ventas`), redis.incrbyfloat(`stats:${dto.sucursalId}:${hoy}:total`, dto.total), ]); console.log(`📊 Contadores → ${numVentas} ventas, total acumulado: $${total}`); // 4. Pub/Sub: notificar otros servicios await redis.publish('eventos:venta_registrada', JSON.stringify({ ventaId: venta.id, sucursalId: dto.sucursalId, total: dto.total })); return venta; } finally { await redis.del(lockKey); console.log('🔓 Lock liberado'); } } // Ejecutar 2 ventas await registrarVenta({ sucursalId: 'suc-01', total: 567.50 }); console.log('---'); await registrarVenta({ sucursalId: 'suc-01', total: 329.00 }); // Dashboard tiempo real const hoy = new Date().toISOString().slice(0,10); const num = await redis.get(`stats:suc-01:${hoy}:ventas`); const tot = await redis.get(`stats:suc-01:${hoy}:total`); console.log(` 📈 Dashboard: ${num} ventas hoy — Total: $${tot}`);
// outputHaz clic en Ejecutar...
Redis completa el stack del ERPPostgreSQL guarda el estado permanente. Redis guarda el estado caliente: sesiones de cajeros, precios cacheados, contadores del dashboard, locks distribuidos entre sucursales. BullMQ usa Redis como backend para las colas de CFDI, reportes y sincronizacion. Moleculer usa Redis como transporte pub/sub entre microservicios.