/**
 * DLQ Alert Worker
 * Monitors Dead Letter Queues and sends email alerts when thresholds are exceeded.
 *
 * Features:
 * - Monitors multiple DLQs (partner_tasks, jobs, etc.)
 * - Threshold-based email alerts (warning, critical)
 * - Alert throttling to prevent spam
 * - Works with global DLQ architecture
 */

'use strict';

const logger = require('../helpers/logger');
const pino = logger.child('dlq_alert_worker');
const env = require('../helpers/env.js');
const amqp = require('amqplib');
const mailer = require('../helpers/mailer');

/**
 * Parse a positive integer setting.
 * Falls back to `fallback` when the value is missing, non-numeric, zero or
 * negative (a non-positive interval would make setInterval spin).
 * @param {*} value - Raw setting value (string or number).
 * @param {number} fallback - Value used when `value` is not a positive integer.
 * @returns {number}
 */
function readPositiveInt(value, fallback) {
  const parsed = Number.parseInt(value, 10);
  return Number.isInteger(parsed) && parsed > 0 ? parsed : fallback;
}

// Configuration from environment variables
const CHECK_INTERVAL_MS = readPositiveInt(env.DLQ_ALERT_INTERVAL_MS, 300000); // 5 minutes
const WARNING_THRESHOLD = readPositiveInt(env.DLQ_ALERT_THRESHOLD, 20);
const CRITICAL_THRESHOLD = readPositiveInt(env.DLQ_ALERT_CRITICAL, 50);
const ALERT_THROTTLE_MS = 3600000; // 1 hour - minimum time between similar alerts
const RECONNECT_DELAY_MS = 5000; // pause between reconnection attempts

// Queues to monitor; each one is checked through its `<name>_failed` DLQ.
const QUEUES_TO_MONITOR = [
  env.PRODUCTION ? 'partner_tasks' : 'dev_partner_tasks',
  env.PRODUCTION ? 'jobs' : 'dev_jobs',
];

/**
 * Name of the dead letter queue paired with `queueName`.
 * @param {string} queueName
 * @returns {string}
 */
function dlqNameFor(queueName) {
  return `${queueName}_failed`;
}

/**
 * Map a DLQ message count to an alert level using the configured thresholds.
 * @param {number} messageCount
 * @returns {'critical'|'warning'|null} null when below the warning threshold.
 */
function classifyCount(messageCount) {
  if (messageCount >= CRITICAL_THRESHOLD) {
    return 'critical';
  }
  if (messageCount >= WARNING_THRESHOLD) {
    return 'warning';
  }
  return null;
}

/**
 * Whether an error is an AMQP 404 NOT_FOUND.
 * Checks the numeric reply code first, then the message text.
 * @param {*} error
 * @returns {boolean}
 */
function isNotFoundError(error) {
  if (!error) {
    return false;
  }
  if (error.code === 404) {
    return true;
  }
  return typeof error.message === 'string' && error.message.includes('NOT_FOUND');
}

class DLQAlertWorker {
  constructor() {
    this.connection = null;
    this.channel = null;
    this.isRunning = false;
    this.checkInterval = null;
    this.lastAlerts = {}; // Track last alert time per queue/severity
    this.isReconnecting = false; // true while a reconnect loop is active
    this.isChecking = false; // true while a scheduled check is in flight
  }

  /**
   * Start the alert worker: connect to RabbitMQ and schedule periodic checks.
   * Returns without doing anything when alerts are disabled via env.
   * @throws Rethrows any error from the initial connection.
   */
  async start() {
    try {
      if (!env.DLQ_ALERT_ENABLED || env.NO_EMAIL_MODE) {
        pino.info('DLQ alerts disabled (DLQ_ALERT_ENABLED=false or NO_EMAIL_MODE=true)');
        return;
      }
      if (this.isRunning) {
        pino.warn('DLQ Alert Worker already running');
        return;
      }

      pino.info('Starting DLQ Alert Worker...');
      await this.connect();
      this.isRunning = true;
      this.startPeriodicCheck();
      pino.info(`DLQ Alert Worker started, checking every ${CHECK_INTERVAL_MS}ms`);
    } catch (error) {
      pino.error({ err: error }, 'Failed to start DLQ Alert Worker');
      throw error;
    }
  }

  /**
   * Connect to RabbitMQ and open a channel.
   * On success, sets `this.connection` and `this.channel`. If opening the
   * channel fails, the new connection is closed before the error is rethrown.
   */
  async connect() {
    const conOps = {
      protocol: 'amqp',
      hostname: env.QUEUE_HOST || 'localhost',
      port: env.QUEUE_PORT || 5672,
      username: env.QUEUE_USR,
      password: env.QUEUE_PWD,
      vhost: env.QUEUE_VHOST || '/',
      heartbeat: env.QUEUE_HEARTBEAT || 0,
    };

    const connection = await amqp.connect(conOps);

    // Attach listeners before any further await. An EventEmitter 'error'
    // with no listener is thrown.
    connection.on('error', (err) => {
      // amqplib follows an error with 'close', so reconnection is driven from
      // the 'close' handler only. Otherwise one failure would start two loops.
      pino.error({ err }, 'Connection error');
    });
    connection.on('close', () => {
      if (this.connection !== connection) {
        return; // stale connection, or one that stop() already released
      }
      this.connection = null;
      this.channel = null;
      if (!this.isRunning) {
        return;
      }
      pino.warn('Connection closed, reconnecting...');
      this.reconnect().catch((err) => {
        pino.error({ err }, 'Reconnect loop failed');
      });
    });

    let channel;
    try {
      channel = await this.openChannel(connection);
    } catch (error) {
      // Don't leak a half-initialised connection.
      await connection.close().catch(() => {});
      throw error;
    }

    this.connection = connection;
    this.channel = channel;
  }

  /**
   * Open a channel on `connection` and wire its lifecycle events.
   * When the channel closes, the cached reference is cleared so that
   * getChannel() opens a fresh one on the next use.
   * @param {object} connection - amqplib connection.
   * @returns {Promise<object>} the new channel
   */
  async openChannel(connection) {
    const channel = await connection.createChannel();
    channel.on('error', (err) => {
      pino.warn({ err }, 'Channel error');
    });
    channel.on('close', () => {
      if (this.channel === channel) {
        this.channel = null;
      }
    });
    return channel;
  }

  /**
   * Return the current channel, opening a new one on the live connection if
   * the previous channel was closed.
   * @returns {Promise<object>}
   * @throws {Error} when there is no live connection.
   */
  async getChannel() {
    if (this.channel) {
      return this.channel;
    }
    if (!this.connection) {
      throw new Error('Not connected to RabbitMQ');
    }
    this.channel = await this.openChannel(this.connection);
    return this.channel;
  }

  /**
   * Close the current channel and connection, if any, ignoring close errors.
   * References are cleared first so the 'close' handlers treat them as stale
   * and do not trigger a reconnect.
   */
  async closeResources() {
    const { channel, connection } = this;
    this.channel = null;
    this.connection = null;
    if (channel) {
      await channel.close().catch(() => {});
    }
    if (connection) {
      await connection.close().catch(() => {});
    }
  }

  /**
   * Reconnect to RabbitMQ, retrying every RECONNECT_DELAY_MS while the worker
   * is running and has no connection.
   * Only one reconnect loop runs at a time; extra calls return immediately.
   */
  async reconnect() {
    if (this.isReconnecting) {
      return;
    }
    this.isReconnecting = true;
    try {
      while (this.isRunning && !this.connection) {
        await new Promise((resolve) => setTimeout(resolve, RECONNECT_DELAY_MS));
        // stop() may have run during the sleep.
        if (!this.isRunning) {
          break;
        }
        try {
          await this.connect();
          if (this.isRunning) {
            pino.info('Reconnected to RabbitMQ');
          } else {
            // stop() ran while connecting; don't leave this connection open.
            await this.closeResources();
          }
        } catch (error) {
          pino.error({ err: error }, 'Reconnection failed');
        }
      }
    } finally {
      this.isReconnecting = false;
    }
  }

  /**
   * Start periodic DLQ checks every CHECK_INTERVAL_MS, plus one immediately.
   */
  startPeriodicCheck() {
    if (this.checkInterval) {
      clearInterval(this.checkInterval);
    }
    this.checkInterval = setInterval(() => {
      void this.runScheduledCheck('periodic');
    }, CHECK_INTERVAL_MS);

    // Run initial check immediately
    void this.runScheduledCheck('initial');
  }

  /**
   * Run checkAllQueues() unless a previous scheduled run is still in flight.
   * Overlapping runs could both pass the throttle check and send duplicate
   * emails. Never rejects.
   * @param {string} label - 'initial' or 'periodic'; used in log messages.
   */
  async runScheduledCheck(label) {
    if (this.isChecking) {
      pino.warn({ label }, 'Previous DLQ check still running, skipping');
      return;
    }
    this.isChecking = true;
    try {
      await this.checkAllQueues();
    } catch (error) {
      pino.error({ err: error }, `Error during ${label} check`);
    } finally {
      this.isChecking = false;
    }
  }

  /**
   * Check all monitored queues in sequence.
   * A failure on one queue is logged and does not stop the others.
   */
  async checkAllQueues() {
    for (const queueName of QUEUES_TO_MONITOR) {
      try {
        await this.checkQueue(queueName);
      } catch (error) {
        pino.error({ err: error, queue: queueName }, 'Error checking queue');
      }
    }
  }

  /**
   * Check a specific queue's DLQ and send an alert if a threshold is exceeded
   * and the alert is not throttled.
   * @param {string} queueName - Base queue name (without the `_failed` suffix).
   * @throws Errors other than NOT_FOUND are rethrown.
   */
  async checkQueue(queueName) {
    const dlqName = dlqNameFor(queueName);

    try {
      const channel = await this.getChannel();
      // NOTE(review): assertQueue creates the DLQ when it is missing, so the
      // NOT_FOUND branch below is only reachable if this call is removed.
      // If the DLQ was declared elsewhere with different arguments, RabbitMQ
      // rejects this declaration with a channel error. The channel 'close'
      // listener then lets getChannel() open a fresh channel next time.
      await channel.assertQueue(dlqName, { durable: true });
      const { messageCount } = await channel.checkQueue(dlqName);

      pino.debug({ queue: queueName, messageCount }, 'DLQ check');

      const alertLevel = classifyCount(messageCount);
      if (alertLevel && this.shouldSendAlert(queueName, alertLevel, messageCount)) {
        const sent = await this.sendAlert(queueName, alertLevel, messageCount);
        // Throttle only after a successful send. Otherwise a mail outage
        // would silence alerts for the whole throttle window.
        if (sent !== false) {
          this.recordAlert(queueName, alertLevel, messageCount);
        }
      }
    } catch (error) {
      if (isNotFoundError(error)) {
        pino.debug({ queue: dlqName }, 'DLQ does not exist yet');
      } else {
        throw error;
      }
    }
  }

  /**
   * Decide whether an alert should be sent (i.e. is not throttled).
   * Sends when there is no prior alert for this queue/level, when the
   * throttle window has passed, or when the count has at least doubled.
   * @param {string} queueName
   * @param {'critical'|'warning'} level
   * @param {number} currentCount
   * @returns {boolean}
   */
  shouldSendAlert(queueName, level, currentCount) {
    const key = `${queueName}:${level}`;
    const lastAlert = this.lastAlerts[key];

    if (!lastAlert) {
      return true; // First alert
    }

    const timeSinceLastAlert = Date.now() - lastAlert.timestamp;

    // Send if throttle period passed
    if (timeSinceLastAlert >= ALERT_THROTTLE_MS) {
      return true;
    }

    // Send if count significantly increased (doubled since last alert)
    return currentCount >= lastAlert.count * 2;
  }

  /**
   * Record that an alert was sent, for throttling.
   * @param {string} queueName
   * @param {'critical'|'warning'} level
   * @param {number} count - Message count at the time of the alert.
   */
  recordAlert(queueName, level, count) {
    const key = `${queueName}:${level}`;
    this.lastAlerts[key] = {
      timestamp: Date.now(),
      count: count,
    };
  }

  /**
   * Send an email alert to env.AGM_ADM_EMAIL.
   * Mail errors are logged, not thrown.
   * @param {string} queueName
   * @param {'critical'|'warning'} level
   * @param {number} messageCount
   * @returns {Promise<boolean>} true if the mailer accepted the message.
   */
  async sendAlert(queueName, level, messageCount) {
    const subject = level === 'critical'
      ? `🚨 CRITICAL: DLQ Alert - ${queueName} (${messageCount} messages)`
      : `⚠️ WARNING: DLQ Alert - ${queueName} (${messageCount} messages)`;

    // NOTE(review): these URLs are not referenced by the body below. Confirm
    // whether the email is meant to link them.
    const dashboardUrl = `${env.APP_URL}/dlq-monitor.html`;
    const apiStatsUrl = `${env.APP_URL}/api/dlq/${queueName}/stats`;

    const htmlBody = `

${level === 'critical' ? '🚨 CRITICAL' : '⚠️ WARNING'}: Dead Letter Queue Alert

Queue: ${queueName}

DLQ Messages: ${messageCount}

Threshold: ${level === 'critical' ? CRITICAL_THRESHOLD : WARNING_THRESHOLD}

Time: ${new Date().toISOString()}

Recommended Actions:

Alert Thresholds:

To disable these alerts, set DLQ_ALERT_ENABLED=false in environment config.

`;

    try {
      await mailer.sendMail({
        to: env.AGM_ADM_EMAIL,
        subject: subject,
        html: htmlBody,
      });
      pino.info({ queue: queueName, level, messageCount }, 'Alert email sent');
      return true;
    } catch (error) {
      pino.error({ err: error, queue: queueName }, 'Failed to send alert email');
      return false;
    }
  }

  /**
   * Stop the alert worker: cancel scheduled checks and close the channel and
   * connection. Pending reconnect loops observe `isRunning === false` and exit.
   */
  async stop() {
    pino.info('Stopping DLQ Alert Worker...');
    this.isRunning = false;

    if (this.checkInterval) {
      clearInterval(this.checkInterval);
      this.checkInterval = null;
    }

    await this.closeResources();

    pino.info('DLQ Alert Worker stopped');
  }

  /**
   * Get the current status: running flag, interval, thresholds and a
   * per-queue message count and level.
   * Queues that cannot be inspected get an `error` entry instead.
   * @returns {Promise<object>}
   */
  async getStatus() {
    const queues = [];

    for (const queueName of QUEUES_TO_MONITOR) {
      const dlqName = dlqNameFor(queueName);
      try {
        const channel = await this.getChannel();
        await channel.assertQueue(dlqName, { durable: true });
        const { messageCount } = await channel.checkQueue(dlqName);
        queues.push({
          queueName,
          dlqName,
          messageCount,
          status: classifyCount(messageCount) || 'ok',
        });
      } catch (error) {
        queues.push({
          queueName,
          dlqName,
          error: error.message,
        });
      }
    }

    return {
      isRunning: this.isRunning,
      checkIntervalMs: CHECK_INTERVAL_MS,
      thresholds: {
        warning: WARNING_THRESHOLD,
        critical: CRITICAL_THRESHOLD,
      },
      queues,
    };
  }
}

// Graceful shutdown (wired up only when this file is the entry point)
let worker = null;

/**
 * Stop the standalone worker (if any) and exit the process.
 * @param {string} signal - Name of the received signal, for logging.
 */
async function shutdown(signal) {
  pino.info(`Received ${signal}, shutting down gracefully...`);
  if (worker) {
    await worker.stop();
  }
  process.exit(0);
}

// Start worker if run directly
if (require.main === module) {
  // Registered here rather than at module scope, so that requiring this
  // module as a library does not install handlers that call process.exit().
  process.on('SIGTERM', () => shutdown('SIGTERM'));
  process.on('SIGINT', () => shutdown('SIGINT'));

  worker = new DLQAlertWorker();
  worker.start().catch((error) => {
    pino.error({ err: error }, 'Fatal error starting DLQ Alert Worker');
    process.exit(1);
  });
}

module.exports = DLQAlertWorker;