// 01

Roles y Permisos

PostgreSQL usa un modelo de roles — no hay distincion entre usuarios y grupos. Un rol puede tener contrasena (usuario) o no (grupo). Todo acceso a objetos pasa por GRANT/REVOKE.

admin/roles.sqlSQL
-- ── Crear roles para el ERP ─────────────────────────
CREATE ROLE erp_readonly;
CREATE ROLE erp_app;
CREATE ROLE erp_admin;

-- Crear usuarios (roles con LOGIN y contrasena)
CREATE USER erp_api_user
  WITH PASSWORD 'p4ssword-seguro'
  CONNECTION LIMIT 50
  IN ROLE erp_app;           -- hereda permisos de erp_app

CREATE USER erp_readonly_user
  WITH PASSWORD 'readonly-pass'
  IN ROLE erp_readonly;

-- ── Permisos sobre schema ────────────────────────────
GRANT USAGE ON SCHEMA public TO erp_app;
GRANT USAGE ON SCHEMA public TO erp_readonly;

-- Permisos sobre tablas existentes
GRANT SELECT, INSERT, UPDATE, DELETE ON ALL TABLES IN SCHEMA public TO erp_app;
GRANT SELECT ON ALL TABLES IN SCHEMA public TO erp_readonly;

-- Permisos sobre secuencias (para SERIAL/BIGSERIAL)
GRANT USAGE, SELECT ON ALL SEQUENCES IN SCHEMA public TO erp_app;

-- DEFAULT PRIVILEGES — aplica a tablas FUTURAS tambien
ALTER DEFAULT PRIVILEGES IN SCHEMA public
  GRANT SELECT, INSERT, UPDATE, DELETE ON TABLES TO erp_app;

ALTER DEFAULT PRIVILEGES IN SCHEMA public
  GRANT SELECT ON TABLES TO erp_readonly;

ALTER DEFAULT PRIVILEGES IN SCHEMA public
  GRANT USAGE, SELECT ON SEQUENCES TO erp_app;

-- ── Row Level Security (RLS) ─────────────────────────
-- Cada sucursal solo ve sus propias ventas
ALTER TABLE ventas ENABLE ROW LEVEL SECURITY;

CREATE POLICY ventas_sucursal_policy ON ventas
  USING (sucursal_id = current_setting('app.sucursal_id')::uuid);

-- En Node.js, antes de cada query de sucursal:
-- await client.query("SET app.sucursal_id = $1", [sucursalId])

-- erp_admin bypasea RLS
ALTER TABLE ventas FORCE ROW LEVEL SECURITY;
GRANT BYPASSRLS TO erp_admin;

-- ── Ver roles y permisos ─────────────────────────────
SELECT rolname, rolsuper, rolcreatedb, rolcanlogin, rolconnlimit
FROM   pg_roles
WHERE  rolname LIKE 'erp%';
💡
Principio de minimo privilegioLa aplicacion Node.js (erp_api_user) solo necesita SELECT/INSERT/UPDATE/DELETE — nunca DROP TABLE ni CREATE. El schema lo gestiona un usuario de migracion separado. Esto limita el daño si las credenciales se comprometen.

// 02

Schemas y Namespaces

Un schema es un namespace dentro de una base de datos. Permite organizar tablas por dominio, separar datos de la app de datos internos, y dar permisos granulares por area.

admin/schemas.sqlSQL
-- Schemas para el ERP carnicerías
CREATE SCHEMA ventas;       -- tablas de punto de venta
CREATE SCHEMA inventario;   -- stock, movimientos, almacen
CREATE SCHEMA rrhh;         -- empleados, turnos, nomina
CREATE SCHEMA contabilidad; -- cfdi, cuentas, cierres
CREATE SCHEMA _audit;       -- logs de cambios (prefijo _ = interno)

-- Tablas con schema explicito
CREATE TABLE ventas.ventas ( ... );
CREATE TABLE inventario.movimientos ( ... );
CREATE TABLE _audit.log_cambios ( ... );

-- search_path — schemas por defecto para queries sin prefijo
SET search_path TO ventas, inventario, public;

-- Para un usuario especifico (persistente)
ALTER USER erp_api_user SET search_path TO ventas, inventario, public;

-- Permisos por schema
GRANT USAGE ON SCHEMA ventas      TO erp_app;
GRANT USAGE ON SCHEMA inventario  TO erp_app;
GRANT USAGE ON SCHEMA rrhh        TO erp_admin;   -- solo admin
GRANT USAGE ON SCHEMA contabilidad TO erp_admin;

-- Ver todos los schemas
SELECT schema_name, schema_owner
FROM   information_schema.schemata
WHERE  schema_name NOT IN ('pg_catalog', 'information_schema');

-- Ver tablas de un schema especifico
SELECT table_name
FROM   information_schema.tables
WHERE  table_schema = 'ventas'
ORDER BY table_name;

// 03

Configuración del Servidor

postgresql.conf controla el comportamiento del servidor. Los parametros mas importantes para produccion son memoria, conexiones maximas y logging de queries lentas.

postgresql.conf — parametros clave para produccionCONFIG
# ── Conexiones ───────────────────────────────────────
max_connections = 100          # ajustar segun RAM y uso
                                # Node.js Pool usa ~10-20 conexiones

# ── Memoria ─────────────────────────────────────────
shared_buffers = 256MB         # 25% de la RAM disponible
effective_cache_size = 1GB     # 75% de la RAM (ayuda al planner)
work_mem = 16MB                # por operacion de sort/hash
maintenance_work_mem = 128MB   # para VACUUM, CREATE INDEX

# ── WAL y durabilidad ────────────────────────────────
wal_level = replica            # necesario para replicacion
synchronous_commit = on        # off = mas rapido, menos seguro
checkpoint_completion_target = 0.9

# ── Logging de queries lentas ────────────────────────
log_min_duration_statement = 1000  # loggear queries > 1 segundo
log_line_prefix = '%t [%p]: [%l-1] user=%u,db=%d,app=%a,client=%h '
log_checkpoints = on
log_connections = on
log_disconnections = on

# ── Autovacuum ──────────────────────────────────────
autovacuum = on                # siempre on en produccion
autovacuum_max_workers = 3
autovacuum_vacuum_scale_factor = 0.05  # vacuum al 5% de cambios

# ── Ver configuracion actual ─────────────────────────
SHOW shared_buffers;
SHOW max_connections;
SELECT name, setting, unit, context
FROM   pg_settings
WHERE  name IN ('max_connections','shared_buffers','work_mem');

// 04

Pool de Conexiones

Una conexion a PostgreSQL cuesta ~5MB de RAM y ~50ms para establecer. El Pool reutiliza conexiones existentes — la app arranca 10 conexiones y las comparte entre miles de requests concurrentes.

src/db/pool.ts — configuracion del poolNODE
import { Pool, PoolConfig } from 'pg';
import { config } from '../config/index.js';

const poolConfig: PoolConfig = {
  host:     config.db.host,
  port:     config.db.port,
  database: config.db.database,
  user:     config.db.user,
  password: config.db.password,

  // ── Tamano del pool ──────────────────────────────
  max:     20,      // conexiones maximas simultaneas
  min:      2,      // conexiones minimas en idle
  idleTimeoutMillis:    30000,  // cerrar conexion idle tras 30s
  connectionTimeoutMillis: 5000,  // error si no hay conexion en 5s

  // ── SSL en produccion ────────────────────────────
  ssl: config.isProduction ? { rejectUnauthorized: true } : false,

  // ── Application name (visible en pg_stat_activity) ──
  application_name: 'erp-carniceria-api',
};

export const pool = new Pool(poolConfig);

// Loggear errores del pool (conexiones muertas, timeouts)
pool.on('error', (err, client) => {
  console.error('Pool error inesperado en cliente idle', err);
});

pool.on('connect', (client) => {
  // Configurar search_path en cada nueva conexion
  client.query('SET search_path TO ventas, inventario, public');
});

// ── Helper tipado para queries ────────────────────
export async function query<T = Record<string, unknown>>(
  text: string,
  params?: unknown[]
): Promise<T[]> {
  const start = Date.now();
  const { rows } = await pool.query<T>(text, params);
  const duration = Date.now() - start;
  if (duration > 1000) {
    console.warn(`Query lenta (${duration}ms): ${text.slice(0, 80)}`);
  }
  return rows;
}

// Query que devuelve una sola fila o null
export async function queryOne<T>(
  text: string,
  params?: unknown[]
): Promise<T | null> {
  const rows = await query<T>(text, params);
  return rows[0] ?? null;
}

// Graceful shutdown — cerrar pool al apagar la app
export async function closePool(): Promise<void> {
  await pool.end();
  console.log('Pool cerrado correctamente');
}

process.on('SIGTERM', closePool);
process.on('SIGINT',  closePool);
ParametroValor tipico ERPRazon
max10–20max_connections PG = 100, deja margen para otras apps
min2Mantiene conexiones listas para requests inmediatos
idleTimeoutMillis30 000Cierra conexiones que llevan 30s sin usar
connectionTimeoutMillis5 000Falla rapido si el pool esta lleno — mejor que colgar

// 05

Queries Parametrizadas

Los parametros ($1, $2...) son la unica defensa contra SQL injection. El driver pg envía la query y los valores por separado — PostgreSQL nunca interpreta los valores como SQL.

src/services/productos.service.tsNODE
import { query, queryOne } from '../db/pool.js';

// ── Interfaz del dominio ─────────────────────────────
interface Producto {
  id: string; nombre: string; precio: number;
  categoria: string; unidad: string; activo: boolean;
}

// ── SELECT tipado ─────────────────────────────────────
export async function listarProductos(filtros?: {
  categoria?: string; activo?: boolean; limite?: number; offset?: number;
}): Promise<Producto[]> {
  const { categoria, activo = true, limite = 20, offset = 0 } = filtros ?? {};
  const params: unknown[] = [activo, limite, offset];
  let sql = `
    SELECT id, nombre, precio, categoria, unidad, activo
    FROM   productos
    WHERE  activo = $1`;

  if (categoria) { params.splice(1, 0, categoria); sql += ` AND categoria = $2`; }

  sql += ` ORDER BY nombre LIMIT $${params.length-1} OFFSET $${params.length}`;
  return query<Producto>(sql, params);
}

// ── INSERT con RETURNING tipado ───────────────────────
interface CrearProductoDTO {
  nombre: string; codigo: string; precio: number;
  categoria: string; unidad: string;
}

export async function crearProducto(dto: CrearProductoDTO): Promise<Producto> {
  const row = await queryOne<Producto>(
    `INSERT INTO productos (nombre, codigo, precio, categoria, unidad)
     VALUES ($1, $2, $3, $4, $5)
     RETURNING id, nombre, precio, categoria, unidad, activo`,
    [dto.nombre, dto.codigo, dto.precio, dto.categoria, dto.unidad]
  );
  if (!row) throw new Error('INSERT no retorno datos');
  return row;
}

// ── UPSERT — on conflict ──────────────────────────────
export async function upsertProducto(dto: CrearProductoDTO): Promise<Producto> {
  const row = await queryOne<Producto>(
    `INSERT INTO productos (nombre, codigo, precio, categoria, unidad)
     VALUES ($1, $2, $3, $4, $5)
     ON CONFLICT (codigo) DO UPDATE SET
       precio         = EXCLUDED.precio,
       actualizado_en = now()
     RETURNING *`,
    [dto.nombre, dto.codigo, dto.precio, dto.categoria, dto.unidad]
  );
  if (!row) throw new Error('Upsert fallo');
  return row;
}

// ── Busqueda full-text ────────────────────────────────
export async function buscarProductos(termino: string): Promise<Producto[]> {
  return query<Producto>(
    `SELECT id, nombre, precio, categoria, unidad,
            ts_rank(to_tsvector('spanish', unaccent(nombre)),
                    plainto_tsquery('spanish', unaccent($1))) AS rank
     FROM   productos
     WHERE  to_tsvector('spanish', unaccent(nombre))
            @@ plainto_tsquery('spanish', unaccent($1))
       AND  activo = true
     ORDER BY rank DESC
     LIMIT 20`,
    [termino]
  );
}
🚨
Nunca concatenes strings en queries SQLquery("SELECT * FROM ventas WHERE id = '" + id + "'") es SQL injection. Un atacante puede mandar id = "' OR '1'='1" y leer toda la tabla. Siempre usa parametros: query("... WHERE id = $1", [id]).

// 06

Migraciones con Node.js

Las migraciones son archivos SQL con numeros de version que se aplican en orden. Cada deploy aplica solo las nuevas. Nunca edites una migracion ya aplicada — crea una nueva.

src/db/migrations/migrate.ts — runner de migracionesNODE
import fs   from 'node:fs/promises';
import path from 'node:path';
import { pool } from '../pool.js';

// Tabla que registra que migraciones se han aplicado
async function ensureMigrationsTable(): Promise<void> {
  await pool.query(`
    CREATE TABLE IF NOT EXISTS _migrations (
      id         SERIAL      PRIMARY KEY,
      filename   TEXT        UNIQUE NOT NULL,
      applied_at TIMESTAMPTZ NOT NULL DEFAULT now()
    )
  `);
}

export async function runMigrations(): Promise<void> {
  await ensureMigrationsTable();

  // Leer todos los .sql de la carpeta migrations/sql/
  const migsDir = new URL('./sql', import.meta.url).pathname;
  const files = (await fs.readdir(migsDir))
    .filter(f => f.endsWith('.sql'))
    .sort();

  // Ver cuales ya estan aplicadas
  const { rows: applied } = await pool.query<{ filename: string }>(
    'SELECT filename FROM _migrations'
  );
  const appliedSet = new Set(applied.map(r => r.filename));

  // Aplicar solo las nuevas, en una transaccion cada una
  for (const file of files) {
    if (appliedSet.has(file)) continue;

    const sql = await fs.readFile(path.join(migsDir, file), 'utf-8');
    const client = await pool.connect();
    try {
      await client.query('BEGIN');
      await client.query(sql);
      await client.query(
        'INSERT INTO _migrations (filename) VALUES ($1)', [file]
      );
      await client.query('COMMIT');
      console.log(`✓ Migración aplicada: ${file}`);
    } catch (err) {
      await client.query('ROLLBACK');
      console.error(`✗ Error en migración ${file}:`, err);
      throw err;   // detener — no continuar con migraciones siguientes
    } finally {
      client.release();
    }
  }
  console.log('Migraciones completadas');
}
migrations/sql/ — archivos de migracionSQL
-- 001_initial_schema.sql
CREATE EXTENSION IF NOT EXISTS "pgcrypto";
CREATE EXTENSION IF NOT EXISTS "unaccent";
CREATE TABLE sucursales ( ... );
CREATE TABLE productos  ( ... );

-- 002_add_ventas.sql
CREATE TABLE ventas      ( ... );
CREATE TABLE venta_items( ... );
CREATE INDEX idx_ventas_sucursal ON ventas(sucursal_id, creado_en DESC);

-- 003_add_inventario.sql
CREATE TABLE inventario ( ... );
CREATE INDEX idx_inv_stock_bajo ON inventario(sucursal_id, producto_id)
  WHERE stock <= stock_minimo;

-- 004_add_clientes_fts.sql  ← migration nueva, no toca las anteriores
ALTER TABLE clientes ADD COLUMN IF NOT EXISTS
  nombre_search tsvector
  GENERATED ALWAYS AS (to_tsvector('spanish', unaccent(nombre))) STORED;
CREATE INDEX idx_clientes_fts ON clientes USING GIN(nombre_search);
⚠️
Regla de oro: las migraciones son inmutablesUna vez aplicada una migracion en produccion, nunca la edites. Si necesitas cambiar algo, crea 005_fix_xxx.sql. Esto garantiza que dev, staging y prod tienen exactamente el mismo historial.

// 07

Transacciones en Node.js

En Node.js, las transacciones requieren usar el mismo client para todas las queries — no el pool. El pool puede dar clients diferentes a cada llamada. El patron correcto: connect(), BEGIN, operaciones, COMMIT/ROLLBACK, release().

src/db/transaction.ts — helper genericoNODE
import { pool } from './pool.js';
import type { PoolClient } from 'pg';

// Helper que maneja BEGIN/COMMIT/ROLLBACK automaticamente
export async function withTransaction<T>(
  fn: (client: PoolClient) => Promise<T>
): Promise<T> {
  const client = await pool.connect();
  try {
    await client.query('BEGIN');
    const result = await fn(client);
    await client.query('COMMIT');
    return result;
  } catch (err) {
    await client.query('ROLLBACK');
    throw err;
  } finally {
    client.release();   // SIEMPRE devolver al pool
  }
}

// ── Uso: registrar venta con items e inventario ───────
export async function registrarVenta(dto: CrearVentaDTO): Promise<Venta> {
  return withTransaction(async (client) => {
    // 1. Verificar stock disponible
    for (const item of dto.items) {
      const { rows } = await client.query(
        `SELECT stock FROM inventario
         WHERE  sucursal_id = $1 AND producto_id = $2
         FOR UPDATE`,          // bloquear fila hasta COMMIT
        [dto.sucursalId, item.productoId]
      );
      if (!rows[0] || rows[0].stock < item.cantidad) {
        throw new Error(`Stock insuficiente: ${item.productoId}`);
      }
    }

    // 2. Crear la venta
    const { rows: [venta] } = await client.query(
      `INSERT INTO ventas (sucursal_id, folio, total, metodo_pago)
       VALUES ($1, $2, $3, $4) RETURNING *`,
      [dto.sucursalId, dto.folio, dto.total, dto.metodoPago]
    );

    // 3. Insertar items y descontar inventario
    for (const item of dto.items) {
      await client.query(
        `INSERT INTO venta_items (venta_id, producto_id, cantidad, precio)
         VALUES ($1, $2, $3, $4)`,
        [venta.id, item.productoId, item.cantidad, item.precio]
      );
      await client.query(
        `UPDATE inventario SET stock = stock - $1
         WHERE  sucursal_id = $2 AND producto_id = $3`,
        [item.cantidad, dto.sucursalId, item.productoId]
      );
    }

    return venta as Venta;
  });  // si cualquier paso falla -> ROLLBACK automatico
}
Demo — simulacion de transaccion con ROLLBACK▶ LIVE
// Simular transaccion con fallo async function withTransaction(fn) { const log = []; const client = { db: { stock: { 'prod-1': 5, 'prod-2': 2 }, ventas: [] }, query: async (sql, params) => { log.push('SQL: ' + sql.trim().split(' ')[0].slice(0,50)); return { rows: [] }; } }; log.push('BEGIN'); try { const r = await fn(client, log); log.push('COMMIT ✓'); return { ok: true, result: r, log }; } catch(e) { log.push('ROLLBACK ← ' + e.message); return { ok: false, error: e.message, log }; } } async function testVenta(stockSuficiente) { return withTransaction(async (client, log) => { const stock = stockSuficiente ? 10 : 1; log.push(' → stock disponible: ' + stock + ' kg'); if (stock < 3) throw new Error('Stock insuficiente'); log.push(' → INSERT ventas'); log.push(' → UPDATE inventario -3kg'); return { ventaId: 'vta-' + Date.now() }; }); } const r1 = await testVenta(true); console.log('Caso 1 (stock OK):', r1.ok ? 'COMMIT' : 'ROLLBACK'); r1.log.forEach(l => console.log(' ', l)); const r2 = await testVenta(false); console.log('Caso 2 (sin stock):', r2.ok ? 'COMMIT' : 'ROLLBACK'); r2.log.forEach(l => console.log(' ', l));
// outputHaz clic en Ejecutar...

// 08

Funciones y PL/pgSQL

PL/pgSQL es el lenguaje procedural de PostgreSQL. Las funciones y stored procedures se ejecutan dentro del servidor — reducen round-trips de red y encapsulan logica de negocio compleja.

funciones-plpgsql.sql — logica del ERP en el servidorPL/pgSQL
-- ── FUNCTION — retorna un valor ──────────────────────
CREATE OR REPLACE FUNCTION calcular_precio_con_iva(
  precio_base NUMERIC,
  porcentaje  NUMERIC DEFAULT 0.16
) RETURNS NUMERIC
LANGUAGE plpgsql
IMMUTABLE                -- no cambia datos, mismo input = mismo output
AS $$
BEGIN
  RETURN ROUND(precio_base * (1 + porcentaje), 2);
END;
$$;

-- Uso:
SELECT nombre, precio, calcular_precio_con_iva(precio) AS precio_iva
FROM productos;

-- ── FUNCTION que retorna TABLE ────────────────────────
CREATE OR REPLACE FUNCTION corte_de_caja(
  p_sucursal_id UUID,
  p_fecha       DATE DEFAULT CURRENT_DATE
)
RETURNS TABLE(
  metodo_pago  TEXT,
  num_ventas   BIGINT,
  monto        NUMERIC,
  pct_total    NUMERIC
)
LANGUAGE plpgsql
STABLE                   -- no modifica datos
AS $$
DECLARE
  v_total NUMERIC;
BEGIN
  -- Calcular total del dia
  SELECT COALESCE(SUM(v.total), 0)
  INTO   v_total
  FROM   ventas v
  WHERE  v.sucursal_id = p_sucursal_id
    AND  v.creado_en::date = p_fecha
    AND  v.estado = 'pagada';

  IF v_total = 0 THEN
    RAISE NOTICE 'Sin ventas para sucursal % en %', p_sucursal_id, p_fecha;
  END IF;

  -- Retornar filas por metodo de pago
  RETURN QUERY
    SELECT
      v.metodo_pago,
      COUNT(*)::BIGINT,
      SUM(v.total),
      CASE WHEN v_total > 0
        THEN ROUND(SUM(v.total) * 100.0 / v_total, 1)
        ELSE 0
      END
    FROM   ventas v
    WHERE  v.sucursal_id = p_sucursal_id
      AND  v.creado_en::date = p_fecha
      AND  v.estado = 'pagada'
    GROUP BY v.metodo_pago
    ORDER BY monto DESC;
END;
$$;

-- Llamar desde Node.js:
-- const rows = await query('SELECT * FROM corte_de_caja($1, $2)', [sucursalId, fecha])

-- ── PROCEDURE — para operaciones con transaccion ─────
CREATE OR REPLACE PROCEDURE transferir_stock(
  p_producto_id    UUID,
  p_origen_id      UUID,
  p_destino_id     UUID,
  p_cantidad       NUMERIC
)
LANGUAGE plpgsql
AS $$
DECLARE
  v_stock_origen NUMERIC;
BEGIN
  SELECT stock INTO v_stock_origen
  FROM   inventario
  WHERE  producto_id = p_producto_id
    AND  sucursal_id = p_origen_id
  FOR UPDATE;

  IF v_stock_origen < p_cantidad THEN
    RAISE EXCEPTION 'Stock insuficiente: tiene % pero pide %',
      v_stock_origen, p_cantidad;
  END IF;

  UPDATE inventario SET stock = stock - p_cantidad
  WHERE  producto_id = p_producto_id AND sucursal_id = p_origen_id;

  INSERT INTO inventario (sucursal_id, producto_id, stock)
  VALUES (p_destino_id, p_producto_id, p_cantidad)
  ON CONFLICT (sucursal_id, producto_id) DO UPDATE
    SET stock = inventario.stock + p_cantidad;
END;
$$;

-- Llamar: CALL transferir_stock($1, $2, $3, $4)

// 09

Triggers Avanzados

Un trigger es una funcion que PostgreSQL ejecuta automaticamente en respuesta a INSERT, UPDATE o DELETE. Ideal para auditoría, timestamps automaticos, validaciones complejas y eventos de dominio.

triggers.sql — triggers del ERPPL/pgSQL
-- ── 1. Trigger de timestamp automatico ───────────────
CREATE OR REPLACE FUNCTION fn_set_actualizado_en()
RETURNS TRIGGER LANGUAGE plpgsql AS $$
BEGIN
  NEW.actualizado_en = now();
  RETURN NEW;
END; $$;

-- Aplicar a todas las tablas que lo necesiten
CREATE TRIGGER trg_productos_ts
  BEFORE UPDATE ON productos
  FOR EACH ROW EXECUTE FUNCTION fn_set_actualizado_en();

CREATE TRIGGER trg_ventas_ts
  BEFORE UPDATE ON ventas
  FOR EACH ROW EXECUTE FUNCTION fn_set_actualizado_en();

-- ── 2. Trigger de auditoria completa ─────────────────
CREATE TABLE _audit.log (
  id          BIGSERIAL   PRIMARY KEY,
  tabla       TEXT        NOT NULL,
  operacion   TEXT        NOT NULL,  -- INSERT | UPDATE | DELETE
  row_id      TEXT,
  datos_antes JSONB,
  datos_des   JSONB,
  usuario     TEXT        DEFAULT current_user,
  app_user    TEXT,       -- usuario de la app (SET app.user_id = ...)
  ts          TIMESTAMPTZ DEFAULT now()
);

CREATE OR REPLACE FUNCTION fn_audit()
RETURNS TRIGGER LANGUAGE plpgsql
SECURITY DEFINER        -- se ejecuta con permisos del dueno
AS $$
BEGIN
  INSERT INTO _audit.log (tabla, operacion, row_id, datos_antes, datos_des, app_user)
  VALUES (
    TG_TABLE_NAME,
    TG_OP,
    CASE TG_OP WHEN 'DELETE' THEN OLD.id::TEXT ELSE NEW.id::TEXT END,
    CASE TG_OP WHEN 'INSERT' THEN NULL ELSE to_jsonb(OLD) END,
    CASE TG_OP WHEN 'DELETE' THEN NULL ELSE to_jsonb(NEW) END,
    current_setting('app.user_id', true)
  );
  RETURN COALESCE(NEW, OLD);
END; $$;

-- Aplicar auditoria a ventas
CREATE TRIGGER trg_ventas_audit
  AFTER INSERT OR UPDATE OR DELETE ON ventas
  FOR EACH ROW EXECUTE FUNCTION fn_audit();

-- En Node.js, antes de cada operacion sensible:
-- await client.query("SET app.user_id = $1", [userId])

-- ── 3. Trigger de validacion compleja ────────────────
CREATE OR REPLACE FUNCTION fn_validar_venta_item()
RETURNS TRIGGER LANGUAGE plpgsql AS $$
DECLARE
  v_precio NUMERIC;
  v_activo BOOLEAN;
BEGIN
  SELECT precio, activo INTO v_precio, v_activo
  FROM   productos
  WHERE  id = NEW.producto_id;

  IF NOT v_activo THEN
    RAISE EXCEPTION 'Producto % no esta activo', NEW.producto_id;
  END IF;

  -- Precio no puede desviarse mas del 20% del precio actual
  IF ABS(NEW.precio - v_precio) / v_precio > 0.20 THEN
    RAISE WARNING 'Precio % muy diferente al catalogo %', NEW.precio, v_precio;
  END IF;

  RETURN NEW;
END; $$;

CREATE TRIGGER trg_venta_item_validar
  BEFORE INSERT ON venta_items
  FOR EACH ROW EXECUTE FUNCTION fn_validar_venta_item();

-- ── Ver todos los triggers de una tabla ──────────────
SELECT trigger_name, event_manipulation, action_timing, action_statement
FROM   information_schema.triggers
WHERE  event_object_table = 'ventas'
ORDER BY action_timing, event_manipulation;
💡
BEFORE vs AFTER triggerBEFORE: puede modificar NEW antes de que se escriba — ideal para timestamps y validaciones. AFTER: la fila ya está escrita — ideal para auditoría y efectos secundarios (emitir eventos, actualizar otras tablas). Para auditoría siempre AFTER para que datos_des refleje el valor final real.

// 10

Backups y pg_dump

Un backup que no se ha probado no es un backup. pg_dump exporta una base de datos completa o selectiva. pg_restore la reconstruye. WAL archiving permite point-in-time recovery.

scripts/backup.sh — backups automatizadosOPS
#!/bin/bash
# Variables de entorno: DB_HOST, DB_NAME, DB_USER, BACKUP_DIR, S3_BUCKET

FECHA=$(date +%Y%m%d_%H%M%S)
ARCHIVO="$BACKUP_DIR/erp_${DB_NAME}_${FECHA}.dump"

# ── pg_dump en formato custom (comprimido, restaurable selectivamente) ─
PGPASSWORD="$DB_PASS" pg_dump   --host="$DB_HOST"   --username="$DB_USER"   --dbname="$DB_NAME"   --format=custom \        # formato binario comprimido (preferido)
  --compress=9 \           # maxima compresion
  --no-owner \             # no incluir SET OWNER (facilita restaurar en otro server)
  --no-privileges   --file="$ARCHIVO"

# ── Verificar que el backup es valido ────────────────
pg_restore --list "$ARCHIVO" > /dev/null 2>&1
if [ $? -ne 0 ]; then
  echo "ERROR: backup corrupto $ARCHIVO"
  exit 1
fi

# ── Subir a S3 ───────────────────────────────────────
aws s3 cp "$ARCHIVO" "s3://$S3_BUCKET/backups/$(basename $ARCHIVO)"   --storage-class STANDARD_IA   # Infrequent Access = mas barato

# ── Limpiar backups locales > 7 dias ─────────────────
find "$BACKUP_DIR" -name "*.dump" -mtime +7 -delete

echo "Backup completado: $ARCHIVO"

## ── RESTAURAR un backup ──────────────────────────────

# Crear DB nueva (no restaurar sobre produccion activa)
createdb -h "$DB_HOST" -U "$DB_USER" erp_restore_test

# Restaurar completo
PGPASSWORD="$DB_PASS" pg_restore   --host="$DB_HOST"   --username="$DB_USER"   --dbname=erp_restore_test   --jobs=4 \               # restauracion paralela (mas rapido)
  --verbose   "$ARCHIVO"

# Restaurar solo una tabla especifica
PGPASSWORD="$DB_PASS" pg_restore   --host="$DB_HOST" --username="$DB_USER" --dbname="$DB_NAME"   --table=productos   "$ARCHIVO"

## ── pg_dumpall — backup de roles y config global ─────
PGPASSWORD="$DB_PASS" pg_dumpall   --host="$DB_HOST" --username=postgres   --globals-only \         # solo roles y tablespaces, no datos
  --file="roles_${FECHA}.sql"
scripts/backup-node.ts — backup desde Node.jsNODE
import { exec }  from 'node:child_process';
import { promisify } from 'node:util';
import { S3Client, PutObjectCommand } from '@aws-sdk/client-s3';
import fs from 'node:fs';

const execAsync = promisify(exec);

export async function runBackup(): Promise<string> {
  const ts       = new Date().toISOString().replace(/[:.]/g, '-');
  const filename = `erp_${ts}.dump`;
  const filepath = `/tmp/${filename}`;

  // Ejecutar pg_dump
  await execAsync(
    `PGPASSWORD=${process.env.DB_PASS} pg_dump       -h ${process.env.DB_HOST}       -U ${process.env.DB_USER}       -d ${process.env.DB_NAME}       -F c -Z 9 --no-owner -f ${filepath}`
  );

  // Subir a S3
  const s3 = new S3Client({ region: 'us-east-1' });
  await s3.send(new PutObjectCommand({
    Bucket: process.env.S3_BUCKET,
    Key:    `backups/${filename}`,
    Body:   fs.createReadStream(filepath),
    StorageClass: 'STANDARD_IA',
  }));

  fs.unlinkSync(filepath);
  return filename;
}
Formato pg_dumpFlagUso ideal
custom-F cProduccion — comprimido, restauracion selectiva por tabla
directory-F dBases grandes — permite --jobs paralelo en pg_restore
plain SQL-F pDev/debugging — texto legible, no comprimido

// 11

Replicación Básica

PostgreSQL soporta streaming replication — un servidor primario transmite cambios WAL en tiempo real a uno o más standbys. El standby puede usarse como lectura o como failover.

replication/setup.sql + postgresql.confOPS
-- ── Configuracion en el PRIMARIO ────────────────────
-- postgresql.conf del primario:
-- wal_level = replica
-- max_wal_senders = 3      # max conexiones de standbys
-- wal_keep_size = 256MB    # conservar WAL para standbys lentos

-- Crear usuario de replicacion
CREATE USER replicador
  WITH REPLICATION
  ENCRYPTED PASSWORD 'pass-replicacion'
  CONNECTION LIMIT 5;

-- pg_hba.conf del primario (permitir conexion del standby):
-- host  replication  replicador  10.0.0.2/32  scram-sha-256

-- ── Crear base del standby (en el servidor standby) ─
-- pg_basebackup \
--   -h 10.0.0.1 -U replicador \
--   -D /var/lib/postgresql/16/main \
--   --wal-method=stream \
--   --progress --verbose

-- postgresql.conf del standby:
-- primary_conninfo = 'host=10.0.0.1 user=replicador password=xxx'
-- hot_standby = on          # permitir SELECT en el standby

-- postgresql.auto.conf del standby:
-- recovery_target_timeline = 'latest'

-- Crear archivo de señal (PG 12+)
-- touch /var/lib/postgresql/16/main/standby.signal

-- ── Monitorear replicacion desde el PRIMARIO ─────────
SELECT
  client_addr,
  state,
  sent_lsn,
  write_lsn,
  flush_lsn,
  replay_lsn,
  (sent_lsn - replay_lsn)  AS replication_lag_bytes,
  sync_state
FROM   pg_stat_replication;

-- Lag en segundos (requiere PG 10+)
SELECT
  client_addr,
  EXTRACT(EPOCH FROM replay_lag) AS lag_segundos
FROM   pg_stat_replication;

-- ── Node.js: pool separado para lectura ──────────────
// src/db/pool.ts
// export const poolWrite = new Pool({ host: '10.0.0.1', ... });  // primario
// export const poolRead  = new Pool({ host: '10.0.0.2', ... });  // standby
//
// Reportes y listados van al standby, escrituras al primario
// const productos = await queryRead('SELECT ...') // standby
// const venta     = await query('INSERT ...')     // primario
⚠️
Read replicas y consistenciaEl standby puede estar unos milisegundos detrás del primario. Si un usuario hace INSERT y luego inmediatamente un SELECT del mismo dato, envía el SELECT al primario también — o acepta que puede no ver el dato de inmediato. Para el ERP: reportes al standby, consultas de POS al primario.

// 12

Monitoreo y Métricas

PostgreSQL expone estadísticas detalladas en vistas del sistema (pg_stat_*). Desde Node.js puedes construir un endpoint de health que consulta estas vistas y alerta antes de que algo falle.

src/health/db-health.ts + queries de monitoreoNODE
-- ── Queries de monitoreo esenciales ──────────────────

-- Conexiones activas por estado
SELECT
  state,
  COUNT(*)         AS total,
  MAX(EXTRACT(EPOCH FROM (now() - query_start))) AS max_seg
FROM   pg_stat_activity
WHERE  datname = current_database()
GROUP BY state;

-- Queries actualmente en ejecucion > 5 segundos
SELECT
  pid,
  usename,
  application_name,
  state,
  ROUND(EXTRACT(EPOCH FROM (now() - query_start))::numeric, 1) AS seg,
  LEFT(query, 80) AS query_resumen
FROM   pg_stat_activity
WHERE  state = 'active'
  AND  query_start < now() - '5 seconds'::interval
  AND  pid <> pg_backend_pid()
ORDER BY seg DESC;

-- Tamano de tablas y sus indices
SELECT
  tablename,
  pg_size_pretty(pg_table_size(schemaname||'.'||tablename))   AS datos,
  pg_size_pretty(pg_indexes_size(schemaname||'.'||tablename)) AS indices,
  pg_size_pretty(pg_total_relation_size(schemaname||'.'||tablename)) AS total
FROM   pg_tables
WHERE  schemaname = 'public'
ORDER BY pg_total_relation_size(schemaname||'.'||tablename) DESC
LIMIT 10;

-- Hits de cache (debe ser > 95%)
SELECT
  ROUND(heap_blks_hit * 100.0 /
    NULLIF(heap_blks_hit + heap_blks_read, 0), 1) AS cache_hit_pct
FROM   pg_statio_user_tables
WHERE  relname = 'ventas';

-- ── Endpoint de health en Moleculer ──────────────────
// actions: {
//   'db.health': async (ctx) => {
//     const [conn, slow, size] = await Promise.all([
//       query('SELECT COUNT(*) FROM pg_stat_activity WHERE state=$1', ['active']),
//       query("SELECT COUNT(*) FROM pg_stat_activity WHERE state='active' AND query_start < now()-'5s'::interval"),
//       query("SELECT pg_size_pretty(pg_database_size(current_database())) AS size"),
//     ]);
//     return {
//       active_connections: +conn[0].count,
//       slow_queries:       +slow[0].count,
//       db_size:            size[0].size,
//       pool: { total: pool.totalCount, idle: pool.idleCount, waiting: pool.waitingCount },
//     };
//   }
// }
Demo — simulador de pg_stat_activity + pool health▶ LIVE
// Simular snapshot de pg_stat_activity const pgStatActivity = [ { pid:101, state:'active', app:'erp-api', query_sec:0.3 }, { pid:102, state:'active', app:'erp-api', query_sec:8.2 }, { pid:103, state:'idle', app:'erp-api', query_sec:null }, { pid:104, state:'idle', app:'erp-api', query_sec:null }, { pid:105, state:'active', app:'erp-api', query_sec:12.5 }, { pid:106, state:'waiting', app:'pgAdmin', query_sec:2.1 }, ]; const pool = { total:10, idle:6, waiting:1 }; function dbHealth(activity, pool) { const byState = {}; activity.forEach(r => { byState[r.state] = (byState[r.state] || 0) + 1; }); const slow = activity.filter(r => r.query_sec > 5); const cacheHit = 98.4; // simulado console.log('=== DB Health Report ==='); console.log('Conexiones por estado:', JSON.stringify(byState)); console.log('Queries lentas (>5s):', slow.length); slow.forEach(q => console.log(' PID', q.pid, '-', q.query_sec + 's')); console.log('Pool:', `${pool.total} total, ${pool.idle} idle, ${pool.waiting} waiting`); console.log('Cache hit:', cacheHit + '%', cacheHit >= 95 ? '✓ OK' : '⚠ BAJO'); const status = slow.length === 0 && pool.waiting < 3 ? 'healthy' : 'degraded'; console.log('Status:', status); } dbHealth(pgStatActivity, pool);
// outputHaz clic en Ejecutar...

// 13

PostgreSQL en el ERP Real

Cómo todo lo del tutorial se conecta en producción: arquitectura de la conexión Node.js → Pool → PostgreSQL dentro del stack Moleculer del ERP.

src/services/ventas.service.ts — servicio Moleculer completoREAL
import { Service, Context }     from 'moleculer';
import { withTransaction, query } from '../db/pool.js';
import type { PoolClient }         from 'pg';

export default class VentasService extends Service {
  name = 'ventas';

  actions = {
    // Accion principal: registrar venta completa
    'registrar': {
      params: {
        sucursalId: 'uuid',
        items: { type: 'array', min: 1 },
        metodoPago: 'string',
      },
      async handler(ctx: Context<RegistrarVentaDTO>) {
        const { sucursalId, items, metodoPago } = ctx.params;

        const venta = await withTransaction(async (client: PoolClient) => {
          // Setear contexto para audit trigger
          await client.query('SET LOCAL app.user_id = $1', [ctx.meta.userId]);
          await client.query('SET LOCAL app.sucursal_id = $1', [sucursalId]);

          // Calcular total con precios actuales del catalogo
          const precios = await client.query(
            `SELECT id, precio FROM productos
             WHERE id = ANY($1) AND activo = true`,
            [items.map((i: any) => i.productoId)]
          );
          const precioMap = Object.fromEntries(
            precios.rows.map((r: any) => [r.id, +r.precio])
          );

          const total = items.reduce((sum: number, item: any) =>
            sum + precioMap[item.productoId] * item.cantidad, 0
          );
          const folio = `VTA-${Date.now()}`;

          // INSERT venta
          const { rows: [v] } = await client.query(
            `INSERT INTO ventas (sucursal_id, folio, total, metodo_pago)
             VALUES ($1,$2,$3,$4) RETURNING *`,
            [sucursalId, folio, total.toFixed(2), metodoPago]
          );

          // INSERT items + descontar inventario
          for (const item of items) {
            await client.query(
              `INSERT INTO venta_items (venta_id, producto_id, cantidad, precio)
               VALUES ($1,$2,$3,$4)`,
              [v.id, item.productoId, item.cantidad, precioMap[item.productoId]]
            );
            await client.query(
              `UPDATE inventario SET stock = stock - $1
               WHERE sucursal_id = $2 AND producto_id = $3`,
              [item.cantidad, sucursalId, item.productoId]
            );
          }
          return v;
        });

        // Emitir evento Moleculer (sin bloquear)
        ctx.emit('venta.registrada', { ventaId: venta.id, sucursalId });
        return venta;
      }
    },

    // Corte de caja via funcion PL/pgSQL
    'corte': {
      params: { sucursalId: 'uuid', fecha: { type: 'string', optional: true } },
      async handler(ctx: Context) {
        const { sucursalId, fecha } = ctx.params as any;
        return query(
          'SELECT * FROM corte_de_caja($1, $2)',
          [sucursalId, fecha ?? null]
        );
      }
    }
  };
}
Demo — simulacion completa: pool + query tipada + auditoria▶ LIVE
// Simular stack completo: Pool → Query → Trigger → Audit const pool = { totalCount: 10, idleCount: 7, waitingCount: 0, clients: [], connect() { const client = { context: {}, auditLog: [], db: { ventas: [], inventario: { 'p1': 8.5, 'p2': 3.0 } }, async query(sql, params = []) { const up = sql.toUpperCase().trim(); if (up.startsWith('SET LOCAL APP.')) { const [key, val] = [sql.match(/APP\.([\w]+)/)[1], params[0]]; this.context[key] = val; return { rows: [] }; } if (up.startsWith('INSERT INTO VENTAS')) { const venta = { id: 'v-' + Date.now(), folio: params[1], total: params[2] }; this.db.ventas.push(venta); // Simular trigger de auditoria this.auditLog.push({ op:'INSERT', tabla:'ventas', row_id: venta.id, app_user: this.context.user_id }); return { rows: [venta] }; } if (up.startsWith('UPDATE INVENTARIO')) { const [cantidad, , prodId] = params; this.db.inventario[prodId] -= cantidad; return { rows: [] }; } return { rows: [] }; }, release() { pool.idleCount++; } }; pool.idleCount--; return client; } }; async function registrarVenta(dto, userId) { const client = await pool.connect(); try { await client.query('SET LOCAL app.user_id = $1', [userId]); await client.query('SET LOCAL app.sucursal_id = $1', [dto.sucursalId]); const [v] = (await client.query( 'INSERT INTO ventas(suc,folio,total,metodo) VALUES($1,$2,$3,$4)', [dto.sucursalId, dto.folio, dto.total, dto.metodoPago] )).rows; for (const item of dto.items) { await client.query('UPDATE inventario SET stock=stock-$1 WHERE suc=$2 AND prod=$3', [item.cantidad, dto.sucursalId, item.productoId]); } console.log('✓ Venta:', v.folio, '— total: $' + v.total); console.log(' Audit log:', JSON.stringify(client.auditLog)); console.log(' Inventario final:', JSON.stringify(client.db.inventario)); console.log(' Pool idle:', pool.idleCount + '/10'); client.release(); return v; } catch(e) { client.release(); throw e; } } await registrarVenta({ sucursalId: 's1', folio: 'VTA-9001', total: 567.50, metodoPago: 'efectivo', items: [{ productoId: 'p1', cantidad: 2.5 }, { productoId: 'p2', cantidad: 1.0 }] }, 'user-admin');
// outputHaz clic en Ejecutar...
🐘
PostgreSQL completa el stack del ERPNode.js habla con PostgreSQL via el pool de pg. Los triggers mantienen timestamps y auditoría sin código extra en la app. Las funciones PL/pgSQL encapsulan la lógica de corte de caja y transferencias. Moleculer llama a estas queries desde sus acciones y emite eventos que BullMQ procesa asincrónamente.