358 lines
9.3 KiB
JavaScript
358 lines
9.3 KiB
JavaScript
/**
 * DLQ Alert Worker
 * Monitors Dead Letter Queues and sends email alerts when thresholds are exceeded.
 *
 * Features:
 * - Monitors multiple DLQs (partner_tasks, jobs, etc.)
 * - Threshold-based email alerts (warning, critical)
 * - Alert throttling to prevent spam
 * - Works with global DLQ architecture
 */
|
|
|
|
'use strict';
|
|
|
|
const logger = require('../helpers/logger');
|
|
const pino = logger.child('dlq_alert_worker');
|
|
const env = require('../helpers/env.js');
|
|
const amqp = require('amqplib');
|
|
const mailer = require('../helpers/mailer');
|
|
|
|
// Configuration from environment variables

/**
 * Parse a setting as a positive base-10 integer.
 *
 * @param {*} value - Raw setting value (typically a string, or undefined when unset).
 * @param {number} fallback - Value used when `value` is missing, not numeric, zero or negative.
 * @returns {number} The parsed integer, or `fallback`.
 */
function readPositiveInt(value, fallback) {
  // An explicit radix keeps the result independent of any prefix in the
  // string; non-positive values are rejected because a zero or negative
  // interval/threshold would make the worker check or alert continuously.
  const parsed = Number.parseInt(value, 10);
  return Number.isInteger(parsed) && parsed > 0 ? parsed : fallback;
}

// Delay between two rounds of DLQ checks.
const CHECK_INTERVAL_MS = readPositiveInt(env.DLQ_ALERT_INTERVAL_MS, 300000); // 5 minutes
// DLQ message count at which a "warning" alert is raised.
const WARNING_THRESHOLD = readPositiveInt(env.DLQ_ALERT_THRESHOLD, 20);
// DLQ message count at which a "critical" alert is raised.
const CRITICAL_THRESHOLD = readPositiveInt(env.DLQ_ALERT_CRITICAL, 50);
const ALERT_THROTTLE_MS = 3600000; // 1 hour - minimum time between similar alerts
|
|
|
|
// Queues to monitor: production uses the bare queue names, every other
// environment uses the same names with a `dev_` prefix.
const QUEUES_TO_MONITOR = ['partner_tasks', 'jobs'].map(
  (baseName) => (env.PRODUCTION ? baseName : `dev_${baseName}`)
);
|
|
|
|
/**
 * Periodically reads the message count of the `<queue>_failed` dead letter
 * queue of every entry in QUEUES_TO_MONITOR and emails an alert when the
 * count reaches the warning or critical threshold.
 *
 * Lifecycle: `start()` connects to RabbitMQ and schedules the checks,
 * `stop()` cancels them and closes the channel and connection.
 */
class DLQAlertWorker {
  constructor() {
    this.connection = null;
    this.channel = null;
    this.isRunning = false;
    this.checkInterval = null;
    this.lastAlerts = {}; // Last alert ({ timestamp, count }) keyed by "<queue>:<level>"
    this.isReconnecting = false; // True while a reconnect loop is in progress
  }

  /**
   * Start the alert worker.
   *
   * Does nothing when alerts are disabled through the environment.
   *
   * @returns {Promise<void>}
   * @throws Rethrows any error raised while connecting, after logging it.
   */
  async start() {
    try {
      if (!env.DLQ_ALERT_ENABLED || env.NO_EMAIL_MODE) {
        pino.info('DLQ alerts disabled (DLQ_ALERT_ENABLED=false or NO_EMAIL_MODE=true)');
        return;
      }

      pino.info('Starting DLQ Alert Worker...');

      await this.connect();

      this.isRunning = true;
      this.startPeriodicCheck();

      pino.info(`DLQ Alert Worker started, checking every ${CHECK_INTERVAL_MS}ms`);
    } catch (error) {
      pino.error({ err: error }, 'Failed to start DLQ Alert Worker');
      throw error;
    }
  }

  /**
   * Connect to RabbitMQ and open the channel used for the DLQ checks.
   *
   * @returns {Promise<void>}
   * @throws If the connection or the channel cannot be opened.
   */
  async connect() {
    const conOps = {
      protocol: 'amqp',
      hostname: env.QUEUE_HOST || 'localhost',
      port: env.QUEUE_PORT || 5672,
      username: env.QUEUE_USR,
      password: env.QUEUE_PWD,
      vhost: env.QUEUE_VHOST || '/',
      heartbeat: env.QUEUE_HEARTBEAT || 0
    };

    const connection = await amqp.connect(conOps);
    this.connection = connection;

    // Listeners are attached before the next await so an event emitted while
    // the channel is being opened always finds a handler.
    connection.on('error', (err) => {
      // Only log here: amqplib emits 'close' after 'error', and reconnecting
      // from both handlers would start two reconnect chains for one failure.
      pino.error({ err }, 'Connection error');
    });

    connection.on('close', () => {
      if (this.connection !== connection) {
        return; // Stale connection that has already been replaced or discarded
      }
      this.connection = null;
      this.channel = null;
      if (this.isRunning) {
        pino.warn('Connection closed, reconnecting...');
        this.reconnect();
      }
    });

    try {
      await this.openChannel(connection);
    } catch (error) {
      // Do not leak the connection when the channel cannot be opened.
      // Detaching it first makes the 'close' handler above ignore it.
      if (this.connection === connection) {
        this.connection = null;
      }
      await connection.close().catch(() => {});
      throw error;
    }
  }

  /**
   * Open a channel on `connection` and store it as the current channel.
   *
   * @param {object} connection - An open amqplib connection.
   * @returns {Promise<object>} The newly opened channel.
   */
  async openChannel(connection) {
    const channel = await connection.createChannel();

    // An 'error' event emitted without a listener is thrown by EventEmitter,
    // so a failed channel operation must not go unhandled here.
    channel.on('error', (err) => {
      pino.error({ err }, 'Channel error');
    });

    // Forget a closed channel so ensureChannel() opens a fresh one.
    channel.on('close', () => {
      if (this.channel === channel) {
        this.channel = null;
      }
    });

    this.channel = channel;
    return channel;
  }

  /**
   * Return the current channel, opening a new one when the previous channel
   * was closed while the connection is still available.
   *
   * @returns {Promise<object>} A usable channel.
   * @throws {Error} When there is no connection to open a channel on.
   */
  async ensureChannel() {
    if (this.channel) {
      return this.channel;
    }
    if (!this.connection) {
      throw new Error('Not connected to RabbitMQ');
    }
    return this.openChannel(this.connection);
  }

  /**
   * Reconnect to RabbitMQ, retrying every 5 seconds until a connection is
   * established or the worker is stopped.
   *
   * Only one reconnect loop runs at a time; calls made while a loop is in
   * progress, or while the worker is not running, return immediately.
   *
   * @returns {Promise<void>} Never rejects; failures are logged and retried.
   */
  async reconnect() {
    if (!this.isRunning || this.isReconnecting) {
      return;
    }

    this.isReconnecting = true;
    try {
      while (this.isRunning && !this.connection) {
        await new Promise(resolve => setTimeout(resolve, 5000));

        // stop() may have been called during the wait.
        if (!this.isRunning) {
          break;
        }

        try {
          await this.connect();
        } catch (error) {
          pino.error({ err: error }, 'Reconnection failed');
        }
      }
    } finally {
      this.isReconnecting = false;
    }
  }

  /**
   * Start periodic DLQ checks: one check immediately, then one every
   * CHECK_INTERVAL_MS milliseconds.
   */
  startPeriodicCheck() {
    // Replace any previous timer so a repeated call cannot stack intervals.
    if (this.checkInterval) {
      clearInterval(this.checkInterval);
    }

    this.checkInterval = setInterval(async () => {
      try {
        await this.checkAllQueues();
      } catch (error) {
        pino.error({ err: error }, 'Error during periodic check');
      }
    }, CHECK_INTERVAL_MS);

    // Run initial check immediately
    this.checkAllQueues().catch(err => {
      pino.error({ err }, 'Error during initial check');
    });
  }

  /**
   * Check all monitored queues, one after the other. A failure on one queue
   * is logged and does not prevent the remaining queues from being checked.
   *
   * @returns {Promise<void>}
   */
  async checkAllQueues() {
    for (const queueName of QUEUES_TO_MONITOR) {
      try {
        await this.checkQueue(queueName);
      } catch (error) {
        pino.error({ err: error, queue: queueName }, 'Error checking queue');
      }
    }
  }

  /**
   * Read the number of messages waiting in a dead letter queue.
   *
   * @param {string} dlqName - Name of the dead letter queue.
   * @returns {Promise<number>} The queue's current message count.
   */
  async getDlqMessageCount(dlqName) {
    const channel = await this.ensureChannel();
    await channel.assertQueue(dlqName, { durable: true });
    const queueInfo = await channel.checkQueue(dlqName);
    return queueInfo.messageCount;
  }

  /**
   * Check a specific queue and send an alert if its DLQ has reached a
   * threshold and the alert is not throttled.
   *
   * @param {string} queueName - Monitored queue; its DLQ is `<queueName>_failed`.
   * @returns {Promise<void>}
   * @throws Any error raised while reading the DLQ, except NOT_FOUND errors.
   */
  async checkQueue(queueName) {
    const dlqName = `${queueName}_failed`;

    let messageCount;
    try {
      messageCount = await this.getDlqMessageCount(dlqName);
    } catch (error) {
      if (error.message && error.message.includes('NOT_FOUND')) {
        pino.debug({ queue: dlqName }, 'DLQ does not exist yet');
        return;
      }
      throw error;
    }

    pino.debug({ queue: queueName, messageCount }, 'DLQ check');

    // Determine alert level
    let alertLevel = null;
    if (messageCount >= CRITICAL_THRESHOLD) {
      alertLevel = 'critical';
    } else if (messageCount >= WARNING_THRESHOLD) {
      alertLevel = 'warning';
    }

    if (!alertLevel || !this.shouldSendAlert(queueName, alertLevel, messageCount)) {
      return;
    }

    // Only a delivered alert starts the throttle window; otherwise a mail
    // failure would silence the alert for the whole throttle period.
    const sent = await this.sendAlert(queueName, alertLevel, messageCount);
    if (sent) {
      this.recordAlert(queueName, alertLevel, messageCount);
    }
  }

  /**
   * Check if an alert should be sent (not throttled).
   *
   * @param {string} queueName - Monitored queue name.
   * @param {string} level - Alert level ('warning' or 'critical').
   * @param {number} currentCount - Current DLQ message count.
   * @returns {boolean} True for the first alert of this queue/level, once
   *   ALERT_THROTTLE_MS has elapsed since the last one, or when the count has
   *   at least doubled since the last one.
   */
  shouldSendAlert(queueName, level, currentCount) {
    const key = `${queueName}:${level}`;
    const lastAlert = this.lastAlerts[key];

    if (!lastAlert) {
      return true; // First alert
    }

    const timeSinceLastAlert = Date.now() - lastAlert.timestamp;

    // Send if throttle period passed
    if (timeSinceLastAlert >= ALERT_THROTTLE_MS) {
      return true;
    }

    // Send if count significantly increased (doubled since last alert)
    if (currentCount >= lastAlert.count * 2) {
      return true;
    }

    return false;
  }

  /**
   * Record that an alert was sent, for later throttling decisions.
   *
   * @param {string} queueName - Monitored queue name.
   * @param {string} level - Alert level ('warning' or 'critical').
   * @param {number} count - DLQ message count reported in the alert.
   */
  recordAlert(queueName, level, count) {
    const key = `${queueName}:${level}`;
    this.lastAlerts[key] = {
      timestamp: Date.now(),
      count: count
    };
  }

  /**
   * Send email alert to the address configured in env.AGM_ADM_EMAIL.
   *
   * @param {string} queueName - Monitored queue name.
   * @param {string} level - Alert level ('warning' or 'critical').
   * @param {number} messageCount - Current DLQ message count.
   * @returns {Promise<boolean>} True when the mailer accepted the message,
   *   false when sending failed (the failure is logged, not thrown).
   */
  async sendAlert(queueName, level, messageCount) {
    const subject = level === 'critical'
      ? `🚨 CRITICAL: DLQ Alert - ${queueName} (${messageCount} messages)`
      : `⚠️ WARNING: DLQ Alert - ${queueName} (${messageCount} messages)`;

    const dashboardUrl = `${env.APP_URL}/dlq-monitor.html`;
    const apiStatsUrl = `${env.APP_URL}/api/dlq/${queueName}/stats`;

    const htmlBody = `
      <h2>${level === 'critical' ? '🚨 CRITICAL' : '⚠️ WARNING'}: Dead Letter Queue Alert</h2>

      <p><strong>Queue:</strong> ${queueName}</p>
      <p><strong>DLQ Messages:</strong> ${messageCount}</p>
      <p><strong>Threshold:</strong> ${level === 'critical' ? CRITICAL_THRESHOLD : WARNING_THRESHOLD}</p>
      <p><strong>Time:</strong> ${new Date().toISOString()}</p>

      <h3>Recommended Actions:</h3>
      <ul>
        <li>Review failed messages in the <a href="${dashboardUrl}">DLQ Dashboard</a></li>
        <li>Check <a href="${apiStatsUrl}">DLQ statistics</a> for details</li>
        <li>Investigate root cause of failures</li>
        <li>Retry messages after fixing issues: <code>POST /api/dlq/${queueName}/retryAll</code></li>
      </ul>

      <h3>Alert Thresholds:</h3>
      <ul>
        <li>Warning: ${WARNING_THRESHOLD} messages</li>
        <li>Critical: ${CRITICAL_THRESHOLD} messages</li>
      </ul>

      <p><small>To disable these alerts, set DLQ_ALERT_ENABLED=false in environment config.</small></p>
    `;

    try {
      await mailer.sendMail({
        to: env.AGM_ADM_EMAIL,
        subject: subject,
        html: htmlBody
      });

      pino.info({
        queue: queueName,
        level,
        messageCount
      }, 'Alert email sent');

      return true;
    } catch (error) {
      pino.error({ err: error, queue: queueName }, 'Failed to send alert email');
      return false;
    }
  }

  /**
   * Stop the alert worker: cancel the periodic check and close the channel
   * and the connection. Errors raised while closing are ignored.
   *
   * @returns {Promise<void>}
   */
  async stop() {
    pino.info('Stopping DLQ Alert Worker...');

    // Cleared first so the connection 'close' handler does not reconnect.
    this.isRunning = false;

    if (this.checkInterval) {
      clearInterval(this.checkInterval);
      this.checkInterval = null;
    }

    if (this.channel) {
      await this.channel.close().catch(() => {});
      this.channel = null;
    }

    if (this.connection) {
      await this.connection.close().catch(() => {});
      this.connection = null;
    }

    pino.info('DLQ Alert Worker stopped');
  }

  /**
   * Get current status of the worker and of every monitored DLQ.
   *
   * @returns {Promise<object>} `{ isRunning, checkIntervalMs, thresholds, queues }`
   *   where each `queues` entry holds `queueName`, `dlqName` and either
   *   `messageCount` with `status` ('ok' | 'warning' | 'critical') or `error`
   *   (the message of the error raised while reading that queue).
   */
  async getStatus() {
    const queues = [];

    for (const queueName of QUEUES_TO_MONITOR) {
      const dlqName = `${queueName}_failed`;
      try {
        const messageCount = await this.getDlqMessageCount(dlqName);

        let status = 'ok';
        if (messageCount >= CRITICAL_THRESHOLD) {
          status = 'critical';
        } else if (messageCount >= WARNING_THRESHOLD) {
          status = 'warning';
        }

        queues.push({
          queueName,
          dlqName,
          messageCount,
          status
        });
      } catch (error) {
        queues.push({
          queueName,
          dlqName,
          error: error.message
        });
      }
    }

    return {
      isRunning: this.isRunning,
      checkIntervalMs: CHECK_INTERVAL_MS,
      thresholds: {
        warning: WARNING_THRESHOLD,
        critical: CRITICAL_THRESHOLD
      },
      queues
    };
  }
}
|
|
|
|
// Graceful shutdown
// Holds the worker created when this file is run directly; stays null when
// the module is only imported.
let worker = null;

/**
 * Stop the running worker (if there is one) and exit the process with code 0.
 *
 * @param {string} signal - Name of the signal that triggered the shutdown, used in the log line.
 * @returns {Promise<void>}
 */
async function shutdown(signal) {
  pino.info(`Received ${signal}, shutting down gracefully...`);

  const activeWorker = worker;
  if (activeWorker) {
    await activeWorker.stop();
  }

  process.exit(0);
}

// Same handler for both termination signals, registered in the original order.
for (const signalName of ['SIGTERM', 'SIGINT']) {
  process.on(signalName, () => shutdown(signalName));
}
|
|
|
|
// Start worker if run directly
if (require.main === module) {
  // A start-up failure is fatal for the standalone process: log it and exit
  // with a non-zero status.
  const exitOnStartFailure = (error) => {
    pino.error({ err: error }, 'Fatal error starting DLQ Alert Worker');
    process.exit(1);
  };

  worker = new DLQAlertWorker();
  worker.start().catch(exitOnStartFailure);
}

// The class itself is the module's export.
module.exports = DLQAlertWorker;
|