Roles y Permisos
PostgreSQL usa un modelo de roles — no hay distincion entre usuarios y grupos. Un rol puede tener contrasena (usuario) o no (grupo). Todo acceso a objetos pasa por GRANT/REVOKE.
-- ── Crear roles para el ERP ───────────────────────── CREATE ROLE erp_readonly; CREATE ROLE erp_app; CREATE ROLE erp_admin; -- Crear usuarios (roles con LOGIN y contrasena) CREATE USER erp_api_user WITH PASSWORD 'p4ssword-seguro' CONNECTION LIMIT 50 IN ROLE erp_app; -- hereda permisos de erp_app CREATE USER erp_readonly_user WITH PASSWORD 'readonly-pass' IN ROLE erp_readonly; -- ── Permisos sobre schema ──────────────────────────── GRANT USAGE ON SCHEMA public TO erp_app; GRANT USAGE ON SCHEMA public TO erp_readonly; -- Permisos sobre tablas existentes GRANT SELECT, INSERT, UPDATE, DELETE ON ALL TABLES IN SCHEMA public TO erp_app; GRANT SELECT ON ALL TABLES IN SCHEMA public TO erp_readonly; -- Permisos sobre secuencias (para SERIAL/BIGSERIAL) GRANT USAGE, SELECT ON ALL SEQUENCES IN SCHEMA public TO erp_app; -- DEFAULT PRIVILEGES — aplica a tablas FUTURAS tambien ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT SELECT, INSERT, UPDATE, DELETE ON TABLES TO erp_app; ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT SELECT ON TABLES TO erp_readonly; ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT USAGE, SELECT ON SEQUENCES TO erp_app; -- ── Row Level Security (RLS) ───────────────────────── -- Cada sucursal solo ve sus propias ventas ALTER TABLE ventas ENABLE ROW LEVEL SECURITY; CREATE POLICY ventas_sucursal_policy ON ventas USING (sucursal_id = current_setting('app.sucursal_id')::uuid); -- En Node.js, antes de cada query de sucursal: -- await client.query("SET app.sucursal_id = $1", [sucursalId]) -- erp_admin bypasea RLS ALTER TABLE ventas FORCE ROW LEVEL SECURITY; GRANT BYPASSRLS TO erp_admin; -- ── Ver roles y permisos ───────────────────────────── SELECT rolname, rolsuper, rolcreatedb, rolcanlogin, rolconnlimit FROM pg_roles WHERE rolname LIKE 'erp%';
Schemas y Namespaces
Un schema es un namespace dentro de una base de datos. Permite organizar tablas por dominio, separar datos de la app de datos internos, y dar permisos granulares por area.
-- Schemas para el ERP carnicerías CREATE SCHEMA ventas; -- tablas de punto de venta CREATE SCHEMA inventario; -- stock, movimientos, almacen CREATE SCHEMA rrhh; -- empleados, turnos, nomina CREATE SCHEMA contabilidad; -- cfdi, cuentas, cierres CREATE SCHEMA _audit; -- logs de cambios (prefijo _ = interno) -- Tablas con schema explicito CREATE TABLE ventas.ventas ( ... ); CREATE TABLE inventario.movimientos ( ... ); CREATE TABLE _audit.log_cambios ( ... ); -- search_path — schemas por defecto para queries sin prefijo SET search_path TO ventas, inventario, public; -- Para un usuario especifico (persistente) ALTER USER erp_api_user SET search_path TO ventas, inventario, public; -- Permisos por schema GRANT USAGE ON SCHEMA ventas TO erp_app; GRANT USAGE ON SCHEMA inventario TO erp_app; GRANT USAGE ON SCHEMA rrhh TO erp_admin; -- solo admin GRANT USAGE ON SCHEMA contabilidad TO erp_admin; -- Ver todos los schemas SELECT schema_name, schema_owner FROM information_schema.schemata WHERE schema_name NOT IN ('pg_catalog', 'information_schema'); -- Ver tablas de un schema especifico SELECT table_name FROM information_schema.tables WHERE table_schema = 'ventas' ORDER BY table_name;
Configuración del Servidor
postgresql.conf controla el comportamiento del servidor. Los parametros mas importantes para produccion son memoria, conexiones maximas y logging de queries lentas.
# ── Conexiones ─────────────────────────────────────── max_connections = 100 # ajustar segun RAM y uso # Node.js Pool usa ~10-20 conexiones # ── Memoria ───────────────────────────────────────── shared_buffers = 256MB # 25% de la RAM disponible effective_cache_size = 1GB # 75% de la RAM (ayuda al planner) work_mem = 16MB # por operacion de sort/hash maintenance_work_mem = 128MB # para VACUUM, CREATE INDEX # ── WAL y durabilidad ──────────────────────────────── wal_level = replica # necesario para replicacion synchronous_commit = on # off = mas rapido, menos seguro checkpoint_completion_target = 0.9 # ── Logging de queries lentas ──────────────────────── log_min_duration_statement = 1000 # loggear queries > 1 segundo log_line_prefix = '%t [%p]: [%l-1] user=%u,db=%d,app=%a,client=%h ' log_checkpoints = on log_connections = on log_disconnections = on # ── Autovacuum ────────────────────────────────────── autovacuum = on # siempre on en produccion autovacuum_max_workers = 3 autovacuum_vacuum_scale_factor = 0.05 # vacuum al 5% de cambios # ── Ver configuracion actual ───────────────────────── SHOW shared_buffers; SHOW max_connections; SELECT name, setting, unit, context FROM pg_settings WHERE name IN ('max_connections','shared_buffers','work_mem');
Pool de Conexiones
Una conexion a PostgreSQL cuesta ~5MB de RAM y ~50ms para establecer. El Pool reutiliza conexiones existentes — la app arranca 10 conexiones y las comparte entre miles de requests concurrentes.
import { Pool, PoolConfig } from 'pg'; import { config } from '../config/index.js'; const poolConfig: PoolConfig = { host: config.db.host, port: config.db.port, database: config.db.database, user: config.db.user, password: config.db.password, // ── Tamano del pool ────────────────────────────── max: 20, // conexiones maximas simultaneas min: 2, // conexiones minimas en idle idleTimeoutMillis: 30000, // cerrar conexion idle tras 30s connectionTimeoutMillis: 5000, // error si no hay conexion en 5s // ── SSL en produccion ──────────────────────────── ssl: config.isProduction ? { rejectUnauthorized: true } : false, // ── Application name (visible en pg_stat_activity) ── application_name: 'erp-carniceria-api', }; export const pool = new Pool(poolConfig); // Loggear errores del pool (conexiones muertas, timeouts) pool.on('error', (err, client) => { console.error('Pool error inesperado en cliente idle', err); }); pool.on('connect', (client) => { // Configurar search_path en cada nueva conexion client.query('SET search_path TO ventas, inventario, public'); }); // ── Helper tipado para queries ──────────────────── export async function query<T = Record<string, unknown>>( text: string, params?: unknown[] ): Promise<T[]> { const start = Date.now(); const { rows } = await pool.query<T>(text, params); const duration = Date.now() - start; if (duration > 1000) { console.warn(`Query lenta (${duration}ms): ${text.slice(0, 80)}`); } return rows; } // Query que devuelve una sola fila o null export async function queryOne<T>( text: string, params?: unknown[] ): Promise<T | null> { const rows = await query<T>(text, params); return rows[0] ?? null; } // Graceful shutdown — cerrar pool al apagar la app export async function closePool(): Promise<void> { await pool.end(); console.log('Pool cerrado correctamente'); } process.on('SIGTERM', closePool); process.on('SIGINT', closePool);
| Parametro | Valor tipico ERP | Razon |
|---|---|---|
| max | 10–20 | max_connections PG = 100, deja margen para otras apps |
| min | 2 | Mantiene conexiones listas para requests inmediatos |
| idleTimeoutMillis | 30 000 | Cierra conexiones que llevan 30s sin usar |
| connectionTimeoutMillis | 5 000 | Falla rapido si el pool esta lleno — mejor que colgar |
Queries Parametrizadas
Los parametros ($1, $2...) son la unica defensa contra SQL injection. El driver pg envía la query y los valores por separado — PostgreSQL nunca interpreta los valores como SQL.
import { query, queryOne } from '../db/pool.js'; // ── Interfaz del dominio ───────────────────────────── interface Producto { id: string; nombre: string; precio: number; categoria: string; unidad: string; activo: boolean; } // ── SELECT tipado ───────────────────────────────────── export async function listarProductos(filtros?: { categoria?: string; activo?: boolean; limite?: number; offset?: number; }): Promise<Producto[]> { const { categoria, activo = true, limite = 20, offset = 0 } = filtros ?? {}; const params: unknown[] = [activo, limite, offset]; let sql = ` SELECT id, nombre, precio, categoria, unidad, activo FROM productos WHERE activo = $1`; if (categoria) { params.splice(1, 0, categoria); sql += ` AND categoria = $2`; } sql += ` ORDER BY nombre LIMIT $${params.length-1} OFFSET $${params.length}`; return query<Producto>(sql, params); } // ── INSERT con RETURNING tipado ─────────────────────── interface CrearProductoDTO { nombre: string; codigo: string; precio: number; categoria: string; unidad: string; } export async function crearProducto(dto: CrearProductoDTO): Promise<Producto> { const row = await queryOne<Producto>( `INSERT INTO productos (nombre, codigo, precio, categoria, unidad) VALUES ($1, $2, $3, $4, $5) RETURNING id, nombre, precio, categoria, unidad, activo`, [dto.nombre, dto.codigo, dto.precio, dto.categoria, dto.unidad] ); if (!row) throw new Error('INSERT no retorno datos'); return row; } // ── UPSERT — on conflict ────────────────────────────── export async function upsertProducto(dto: CrearProductoDTO): Promise<Producto> { const row = await queryOne<Producto>( `INSERT INTO productos (nombre, codigo, precio, categoria, unidad) VALUES ($1, $2, $3, $4, $5) ON CONFLICT (codigo) DO UPDATE SET precio = EXCLUDED.precio, actualizado_en = now() RETURNING *`, [dto.nombre, dto.codigo, dto.precio, dto.categoria, dto.unidad] ); if (!row) throw new Error('Upsert fallo'); return row; } // ── Busqueda full-text ──────────────────────────────── export async function buscarProductos(termino: string): Promise<Producto[]> { return query<Producto>( `SELECT id, nombre, precio, categoria, unidad, ts_rank(to_tsvector('spanish', unaccent(nombre)), plainto_tsquery('spanish', unaccent($1))) AS rank FROM productos WHERE to_tsvector('spanish', unaccent(nombre)) @@ plainto_tsquery('spanish', unaccent($1)) AND activo = true ORDER BY rank DESC LIMIT 20`, [termino] ); }
Migraciones con Node.js
Las migraciones son archivos SQL con numeros de version que se aplican en orden. Cada deploy aplica solo las nuevas. Nunca edites una migracion ya aplicada — crea una nueva.
import fs from 'node:fs/promises'; import path from 'node:path'; import { pool } from '../pool.js'; // Tabla que registra que migraciones se han aplicado async function ensureMigrationsTable(): Promise<void> { await pool.query(` CREATE TABLE IF NOT EXISTS _migrations ( id SERIAL PRIMARY KEY, filename TEXT UNIQUE NOT NULL, applied_at TIMESTAMPTZ NOT NULL DEFAULT now() ) `); } export async function runMigrations(): Promise<void> { await ensureMigrationsTable(); // Leer todos los .sql de la carpeta migrations/sql/ const migsDir = new URL('./sql', import.meta.url).pathname; const files = (await fs.readdir(migsDir)) .filter(f => f.endsWith('.sql')) .sort(); // Ver cuales ya estan aplicadas const { rows: applied } = await pool.query<{ filename: string }>( 'SELECT filename FROM _migrations' ); const appliedSet = new Set(applied.map(r => r.filename)); // Aplicar solo las nuevas, en una transaccion cada una for (const file of files) { if (appliedSet.has(file)) continue; const sql = await fs.readFile(path.join(migsDir, file), 'utf-8'); const client = await pool.connect(); try { await client.query('BEGIN'); await client.query(sql); await client.query( 'INSERT INTO _migrations (filename) VALUES ($1)', [file] ); await client.query('COMMIT'); console.log(`✓ Migración aplicada: ${file}`); } catch (err) { await client.query('ROLLBACK'); console.error(`✗ Error en migración ${file}:`, err); throw err; // detener — no continuar con migraciones siguientes } finally { client.release(); } } console.log('Migraciones completadas'); }
-- 001_initial_schema.sql CREATE EXTENSION IF NOT EXISTS "pgcrypto"; CREATE EXTENSION IF NOT EXISTS "unaccent"; CREATE TABLE sucursales ( ... ); CREATE TABLE productos ( ... ); -- 002_add_ventas.sql CREATE TABLE ventas ( ... ); CREATE TABLE venta_items( ... ); CREATE INDEX idx_ventas_sucursal ON ventas(sucursal_id, creado_en DESC); -- 003_add_inventario.sql CREATE TABLE inventario ( ... ); CREATE INDEX idx_inv_stock_bajo ON inventario(sucursal_id, producto_id) WHERE stock <= stock_minimo; -- 004_add_clientes_fts.sql ← migration nueva, no toca las anteriores ALTER TABLE clientes ADD COLUMN IF NOT EXISTS nombre_search tsvector GENERATED ALWAYS AS (to_tsvector('spanish', unaccent(nombre))) STORED; CREATE INDEX idx_clientes_fts ON clientes USING GIN(nombre_search);
Transacciones en Node.js
En Node.js, las transacciones requieren usar el mismo client para todas las queries — no el pool. El pool puede dar clients diferentes a cada llamada. El patron correcto: connect(), BEGIN, operaciones, COMMIT/ROLLBACK, release().
import { pool } from './pool.js'; import type { PoolClient } from 'pg'; // Helper que maneja BEGIN/COMMIT/ROLLBACK automaticamente export async function withTransaction<T>( fn: (client: PoolClient) => Promise<T> ): Promise<T> { const client = await pool.connect(); try { await client.query('BEGIN'); const result = await fn(client); await client.query('COMMIT'); return result; } catch (err) { await client.query('ROLLBACK'); throw err; } finally { client.release(); // SIEMPRE devolver al pool } } // ── Uso: registrar venta con items e inventario ─────── export async function registrarVenta(dto: CrearVentaDTO): Promise<Venta> { return withTransaction(async (client) => { // 1. Verificar stock disponible for (const item of dto.items) { const { rows } = await client.query( `SELECT stock FROM inventario WHERE sucursal_id = $1 AND producto_id = $2 FOR UPDATE`, // bloquear fila hasta COMMIT [dto.sucursalId, item.productoId] ); if (!rows[0] || rows[0].stock < item.cantidad) { throw new Error(`Stock insuficiente: ${item.productoId}`); } } // 2. Crear la venta const { rows: [venta] } = await client.query( `INSERT INTO ventas (sucursal_id, folio, total, metodo_pago) VALUES ($1, $2, $3, $4) RETURNING *`, [dto.sucursalId, dto.folio, dto.total, dto.metodoPago] ); // 3. Insertar items y descontar inventario for (const item of dto.items) { await client.query( `INSERT INTO venta_items (venta_id, producto_id, cantidad, precio) VALUES ($1, $2, $3, $4)`, [venta.id, item.productoId, item.cantidad, item.precio] ); await client.query( `UPDATE inventario SET stock = stock - $1 WHERE sucursal_id = $2 AND producto_id = $3`, [item.cantidad, dto.sucursalId, item.productoId] ); } return venta as Venta; }); // si cualquier paso falla -> ROLLBACK automatico }
Funciones y PL/pgSQL
PL/pgSQL es el lenguaje procedural de PostgreSQL. Las funciones y stored procedures se ejecutan dentro del servidor — reducen round-trips de red y encapsulan logica de negocio compleja.
-- ── FUNCTION — retorna un valor ────────────────────── CREATE OR REPLACE FUNCTION calcular_precio_con_iva( precio_base NUMERIC, porcentaje NUMERIC DEFAULT 0.16 ) RETURNS NUMERIC LANGUAGE plpgsql IMMUTABLE -- no cambia datos, mismo input = mismo output AS $$ BEGIN RETURN ROUND(precio_base * (1 + porcentaje), 2); END; $$; -- Uso: SELECT nombre, precio, calcular_precio_con_iva(precio) AS precio_iva FROM productos; -- ── FUNCTION que retorna TABLE ──────────────────────── CREATE OR REPLACE FUNCTION corte_de_caja( p_sucursal_id UUID, p_fecha DATE DEFAULT CURRENT_DATE ) RETURNS TABLE( metodo_pago TEXT, num_ventas BIGINT, monto NUMERIC, pct_total NUMERIC ) LANGUAGE plpgsql STABLE -- no modifica datos AS $$ DECLARE v_total NUMERIC; BEGIN -- Calcular total del dia SELECT COALESCE(SUM(v.total), 0) INTO v_total FROM ventas v WHERE v.sucursal_id = p_sucursal_id AND v.creado_en::date = p_fecha AND v.estado = 'pagada'; IF v_total = 0 THEN RAISE NOTICE 'Sin ventas para sucursal % en %', p_sucursal_id, p_fecha; END IF; -- Retornar filas por metodo de pago RETURN QUERY SELECT v.metodo_pago, COUNT(*)::BIGINT, SUM(v.total), CASE WHEN v_total > 0 THEN ROUND(SUM(v.total) * 100.0 / v_total, 1) ELSE 0 END FROM ventas v WHERE v.sucursal_id = p_sucursal_id AND v.creado_en::date = p_fecha AND v.estado = 'pagada' GROUP BY v.metodo_pago ORDER BY monto DESC; END; $$; -- Llamar desde Node.js: -- const rows = await query('SELECT * FROM corte_de_caja($1, $2)', [sucursalId, fecha]) -- ── PROCEDURE — para operaciones con transaccion ───── CREATE OR REPLACE PROCEDURE transferir_stock( p_producto_id UUID, p_origen_id UUID, p_destino_id UUID, p_cantidad NUMERIC ) LANGUAGE plpgsql AS $$ DECLARE v_stock_origen NUMERIC; BEGIN SELECT stock INTO v_stock_origen FROM inventario WHERE producto_id = p_producto_id AND sucursal_id = p_origen_id FOR UPDATE; IF v_stock_origen < p_cantidad THEN RAISE EXCEPTION 'Stock insuficiente: tiene % pero pide %', v_stock_origen, p_cantidad; END IF; UPDATE inventario SET stock = stock - p_cantidad WHERE producto_id = p_producto_id AND sucursal_id = p_origen_id; INSERT INTO inventario (sucursal_id, producto_id, stock) VALUES (p_destino_id, p_producto_id, p_cantidad) ON CONFLICT (sucursal_id, producto_id) DO UPDATE SET stock = inventario.stock + p_cantidad; END; $$; -- Llamar: CALL transferir_stock($1, $2, $3, $4)
Triggers Avanzados
Un trigger es una funcion que PostgreSQL ejecuta automaticamente en respuesta a INSERT, UPDATE o DELETE. Ideal para auditoría, timestamps automaticos, validaciones complejas y eventos de dominio.
-- ── 1. Trigger de timestamp automatico ─────────────── CREATE OR REPLACE FUNCTION fn_set_actualizado_en() RETURNS TRIGGER LANGUAGE plpgsql AS $$ BEGIN NEW.actualizado_en = now(); RETURN NEW; END; $$; -- Aplicar a todas las tablas que lo necesiten CREATE TRIGGER trg_productos_ts BEFORE UPDATE ON productos FOR EACH ROW EXECUTE FUNCTION fn_set_actualizado_en(); CREATE TRIGGER trg_ventas_ts BEFORE UPDATE ON ventas FOR EACH ROW EXECUTE FUNCTION fn_set_actualizado_en(); -- ── 2. Trigger de auditoria completa ───────────────── CREATE TABLE _audit.log ( id BIGSERIAL PRIMARY KEY, tabla TEXT NOT NULL, operacion TEXT NOT NULL, -- INSERT | UPDATE | DELETE row_id TEXT, datos_antes JSONB, datos_des JSONB, usuario TEXT DEFAULT current_user, app_user TEXT, -- usuario de la app (SET app.user_id = ...) ts TIMESTAMPTZ DEFAULT now() ); CREATE OR REPLACE FUNCTION fn_audit() RETURNS TRIGGER LANGUAGE plpgsql SECURITY DEFINER -- se ejecuta con permisos del dueno AS $$ BEGIN INSERT INTO _audit.log (tabla, operacion, row_id, datos_antes, datos_des, app_user) VALUES ( TG_TABLE_NAME, TG_OP, CASE TG_OP WHEN 'DELETE' THEN OLD.id::TEXT ELSE NEW.id::TEXT END, CASE TG_OP WHEN 'INSERT' THEN NULL ELSE to_jsonb(OLD) END, CASE TG_OP WHEN 'DELETE' THEN NULL ELSE to_jsonb(NEW) END, current_setting('app.user_id', true) ); RETURN COALESCE(NEW, OLD); END; $$; -- Aplicar auditoria a ventas CREATE TRIGGER trg_ventas_audit AFTER INSERT OR UPDATE OR DELETE ON ventas FOR EACH ROW EXECUTE FUNCTION fn_audit(); -- En Node.js, antes de cada operacion sensible: -- await client.query("SET app.user_id = $1", [userId]) -- ── 3. Trigger de validacion compleja ──────────────── CREATE OR REPLACE FUNCTION fn_validar_venta_item() RETURNS TRIGGER LANGUAGE plpgsql AS $$ DECLARE v_precio NUMERIC; v_activo BOOLEAN; BEGIN SELECT precio, activo INTO v_precio, v_activo FROM productos WHERE id = NEW.producto_id; IF NOT v_activo THEN RAISE EXCEPTION 'Producto % no esta activo', NEW.producto_id; END IF; -- Precio no puede desviarse mas del 20% del precio actual IF ABS(NEW.precio - v_precio) / v_precio > 0.20 THEN RAISE WARNING 'Precio % muy diferente al catalogo %', NEW.precio, v_precio; END IF; RETURN NEW; END; $$; CREATE TRIGGER trg_venta_item_validar BEFORE INSERT ON venta_items FOR EACH ROW EXECUTE FUNCTION fn_validar_venta_item(); -- ── Ver todos los triggers de una tabla ────────────── SELECT trigger_name, event_manipulation, action_timing, action_statement FROM information_schema.triggers WHERE event_object_table = 'ventas' ORDER BY action_timing, event_manipulation;
Backups y pg_dump
Un backup que no se ha probado no es un backup. pg_dump exporta una base de datos completa o selectiva. pg_restore la reconstruye. WAL archiving permite point-in-time recovery.
#!/bin/bash # Variables de entorno: DB_HOST, DB_NAME, DB_USER, BACKUP_DIR, S3_BUCKET FECHA=$(date +%Y%m%d_%H%M%S) ARCHIVO="$BACKUP_DIR/erp_${DB_NAME}_${FECHA}.dump" # ── pg_dump en formato custom (comprimido, restaurable selectivamente) ─ PGPASSWORD="$DB_PASS" pg_dump --host="$DB_HOST" --username="$DB_USER" --dbname="$DB_NAME" --format=custom \ # formato binario comprimido (preferido) --compress=9 \ # maxima compresion --no-owner \ # no incluir SET OWNER (facilita restaurar en otro server) --no-privileges --file="$ARCHIVO" # ── Verificar que el backup es valido ──────────────── pg_restore --list "$ARCHIVO" > /dev/null 2>&1 if [ $? -ne 0 ]; then echo "ERROR: backup corrupto $ARCHIVO" exit 1 fi # ── Subir a S3 ─────────────────────────────────────── aws s3 cp "$ARCHIVO" "s3://$S3_BUCKET/backups/$(basename $ARCHIVO)" --storage-class STANDARD_IA # Infrequent Access = mas barato # ── Limpiar backups locales > 7 dias ───────────────── find "$BACKUP_DIR" -name "*.dump" -mtime +7 -delete echo "Backup completado: $ARCHIVO" ## ── RESTAURAR un backup ────────────────────────────── # Crear DB nueva (no restaurar sobre produccion activa) createdb -h "$DB_HOST" -U "$DB_USER" erp_restore_test # Restaurar completo PGPASSWORD="$DB_PASS" pg_restore --host="$DB_HOST" --username="$DB_USER" --dbname=erp_restore_test --jobs=4 \ # restauracion paralela (mas rapido) --verbose "$ARCHIVO" # Restaurar solo una tabla especifica PGPASSWORD="$DB_PASS" pg_restore --host="$DB_HOST" --username="$DB_USER" --dbname="$DB_NAME" --table=productos "$ARCHIVO" ## ── pg_dumpall — backup de roles y config global ───── PGPASSWORD="$DB_PASS" pg_dumpall --host="$DB_HOST" --username=postgres --globals-only \ # solo roles y tablespaces, no datos --file="roles_${FECHA}.sql"
import { exec } from 'node:child_process'; import { promisify } from 'node:util'; import { S3Client, PutObjectCommand } from '@aws-sdk/client-s3'; import fs from 'node:fs'; const execAsync = promisify(exec); export async function runBackup(): Promise<string> { const ts = new Date().toISOString().replace(/[:.]/g, '-'); const filename = `erp_${ts}.dump`; const filepath = `/tmp/${filename}`; // Ejecutar pg_dump await execAsync( `PGPASSWORD=${process.env.DB_PASS} pg_dump -h ${process.env.DB_HOST} -U ${process.env.DB_USER} -d ${process.env.DB_NAME} -F c -Z 9 --no-owner -f ${filepath}` ); // Subir a S3 const s3 = new S3Client({ region: 'us-east-1' }); await s3.send(new PutObjectCommand({ Bucket: process.env.S3_BUCKET, Key: `backups/${filename}`, Body: fs.createReadStream(filepath), StorageClass: 'STANDARD_IA', })); fs.unlinkSync(filepath); return filename; }
| Formato pg_dump | Flag | Uso ideal |
|---|---|---|
| custom | -F c | Produccion — comprimido, restauracion selectiva por tabla |
| directory | -F d | Bases grandes — permite --jobs paralelo en pg_restore |
| plain SQL | -F p | Dev/debugging — texto legible, no comprimido |
Replicación Básica
PostgreSQL soporta streaming replication — un servidor primario transmite cambios WAL en tiempo real a uno o más standbys. El standby puede usarse como lectura o como failover.
-- ── Configuracion en el PRIMARIO ──────────────────── -- postgresql.conf del primario: -- wal_level = replica -- max_wal_senders = 3 # max conexiones de standbys -- wal_keep_size = 256MB # conservar WAL para standbys lentos -- Crear usuario de replicacion CREATE USER replicador WITH REPLICATION ENCRYPTED PASSWORD 'pass-replicacion' CONNECTION LIMIT 5; -- pg_hba.conf del primario (permitir conexion del standby): -- host replication replicador 10.0.0.2/32 scram-sha-256 -- ── Crear base del standby (en el servidor standby) ─ -- pg_basebackup \ -- -h 10.0.0.1 -U replicador \ -- -D /var/lib/postgresql/16/main \ -- --wal-method=stream \ -- --progress --verbose -- postgresql.conf del standby: -- primary_conninfo = 'host=10.0.0.1 user=replicador password=xxx' -- hot_standby = on # permitir SELECT en el standby -- postgresql.auto.conf del standby: -- recovery_target_timeline = 'latest' -- Crear archivo de señal (PG 12+) -- touch /var/lib/postgresql/16/main/standby.signal -- ── Monitorear replicacion desde el PRIMARIO ───────── SELECT client_addr, state, sent_lsn, write_lsn, flush_lsn, replay_lsn, (sent_lsn - replay_lsn) AS replication_lag_bytes, sync_state FROM pg_stat_replication; -- Lag en segundos (requiere PG 10+) SELECT client_addr, EXTRACT(EPOCH FROM replay_lag) AS lag_segundos FROM pg_stat_replication; -- ── Node.js: pool separado para lectura ────────────── // src/db/pool.ts // export const poolWrite = new Pool({ host: '10.0.0.1', ... }); // primario // export const poolRead = new Pool({ host: '10.0.0.2', ... }); // standby // // Reportes y listados van al standby, escrituras al primario // const productos = await queryRead('SELECT ...') // standby // const venta = await query('INSERT ...') // primario
Monitoreo y Métricas
PostgreSQL expone estadísticas detalladas en vistas del sistema (pg_stat_*). Desde Node.js puedes construir un endpoint de health que consulta estas vistas y alerta antes de que algo falle.
-- ── Queries de monitoreo esenciales ────────────────── -- Conexiones activas por estado SELECT state, COUNT(*) AS total, MAX(EXTRACT(EPOCH FROM (now() - query_start))) AS max_seg FROM pg_stat_activity WHERE datname = current_database() GROUP BY state; -- Queries actualmente en ejecucion > 5 segundos SELECT pid, usename, application_name, state, ROUND(EXTRACT(EPOCH FROM (now() - query_start))::numeric, 1) AS seg, LEFT(query, 80) AS query_resumen FROM pg_stat_activity WHERE state = 'active' AND query_start < now() - '5 seconds'::interval AND pid <> pg_backend_pid() ORDER BY seg DESC; -- Tamano de tablas y sus indices SELECT tablename, pg_size_pretty(pg_table_size(schemaname||'.'||tablename)) AS datos, pg_size_pretty(pg_indexes_size(schemaname||'.'||tablename)) AS indices, pg_size_pretty(pg_total_relation_size(schemaname||'.'||tablename)) AS total FROM pg_tables WHERE schemaname = 'public' ORDER BY pg_total_relation_size(schemaname||'.'||tablename) DESC LIMIT 10; -- Hits de cache (debe ser > 95%) SELECT ROUND(heap_blks_hit * 100.0 / NULLIF(heap_blks_hit + heap_blks_read, 0), 1) AS cache_hit_pct FROM pg_statio_user_tables WHERE relname = 'ventas'; -- ── Endpoint de health en Moleculer ────────────────── // actions: { // 'db.health': async (ctx) => { // const [conn, slow, size] = await Promise.all([ // query('SELECT COUNT(*) FROM pg_stat_activity WHERE state=$1', ['active']), // query("SELECT COUNT(*) FROM pg_stat_activity WHERE state='active' AND query_start < now()-'5s'::interval"), // query("SELECT pg_size_pretty(pg_database_size(current_database())) AS size"), // ]); // return { // active_connections: +conn[0].count, // slow_queries: +slow[0].count, // db_size: size[0].size, // pool: { total: pool.totalCount, idle: pool.idleCount, waiting: pool.waitingCount }, // }; // } // }
PostgreSQL en el ERP Real
Cómo todo lo del tutorial se conecta en producción: arquitectura de la conexión Node.js → Pool → PostgreSQL dentro del stack Moleculer del ERP.
import { Service, Context } from 'moleculer'; import { withTransaction, query } from '../db/pool.js'; import type { PoolClient } from 'pg'; export default class VentasService extends Service { name = 'ventas'; actions = { // Accion principal: registrar venta completa 'registrar': { params: { sucursalId: 'uuid', items: { type: 'array', min: 1 }, metodoPago: 'string', }, async handler(ctx: Context<RegistrarVentaDTO>) { const { sucursalId, items, metodoPago } = ctx.params; const venta = await withTransaction(async (client: PoolClient) => { // Setear contexto para audit trigger await client.query('SET LOCAL app.user_id = $1', [ctx.meta.userId]); await client.query('SET LOCAL app.sucursal_id = $1', [sucursalId]); // Calcular total con precios actuales del catalogo const precios = await client.query( `SELECT id, precio FROM productos WHERE id = ANY($1) AND activo = true`, [items.map((i: any) => i.productoId)] ); const precioMap = Object.fromEntries( precios.rows.map((r: any) => [r.id, +r.precio]) ); const total = items.reduce((sum: number, item: any) => sum + precioMap[item.productoId] * item.cantidad, 0 ); const folio = `VTA-${Date.now()}`; // INSERT venta const { rows: [v] } = await client.query( `INSERT INTO ventas (sucursal_id, folio, total, metodo_pago) VALUES ($1,$2,$3,$4) RETURNING *`, [sucursalId, folio, total.toFixed(2), metodoPago] ); // INSERT items + descontar inventario for (const item of items) { await client.query( `INSERT INTO venta_items (venta_id, producto_id, cantidad, precio) VALUES ($1,$2,$3,$4)`, [v.id, item.productoId, item.cantidad, precioMap[item.productoId]] ); await client.query( `UPDATE inventario SET stock = stock - $1 WHERE sucursal_id = $2 AND producto_id = $3`, [item.cantidad, sucursalId, item.productoId] ); } return v; }); // Emitir evento Moleculer (sin bloquear) ctx.emit('venta.registrada', { ventaId: venta.id, sucursalId }); return venta; } }, // Corte de caja via funcion PL/pgSQL 'corte': { params: { sucursalId: 'uuid', fecha: { type: 'string', optional: true } }, async handler(ctx: Context) { const { sucursalId, fecha } = ctx.params as any; return query( 'SELECT * FROM corte_de_caja($1, $2)', [sucursalId, fecha ?? null] ); } } }; }