375 lines
12 KiB
JavaScript
375 lines
12 KiB
JavaScript
'use strict';
|
|
|
|
const amqp = require('amqplib');
|
|
const env = require('./env');
|
|
const pino = require('./logger').child('dlq_queue_setup');
|
|
|
|
/**
|
|
* DLQ Queue Setup Helper Module
|
|
*
|
|
* Provides reusable functions to set up Dead Letter Queue (DLQ) infrastructure
|
|
* for any RabbitMQ queue with configurable TTL and archival routing.
|
|
*
|
|
* Architecture:
|
|
* - Main Queue → DLQ (with TTL) → Archive Exchange → Archive Queue → Filesystem
|
|
*
|
|
* Usage:
|
|
* const { setupDLQQueues, getDLQConnection } = require('../helpers/dlq_queue_setup');
|
|
*
|
|
* // In worker startup:
|
|
* const { connection, channel } = await setupDLQQueues('partner_tasks');
|
|
*/
|
|
|
|
/**
 * Create and return a RabbitMQ connection.
 *
 * Each setting is resolved from `options` first, then from the matching env
 * value, then from a hard-coded default. An 'error' listener is attached so
 * connection failures are logged rather than raised as unhandled 'error'
 * events, and 'close' events are logged as warnings.
 *
 * @param {Object} [options] - Connection overrides: hostname, port, username,
 *   password, vhost, heartbeat, frameMax. Omitted values fall back to env.
 *   An explicit `heartbeat: 0` is forwarded as-is instead of being replaced by
 *   the default, because 0 is a meaningful value in AMQP heartbeat negotiation.
 * @returns {Promise<Object>} RabbitMQ connection returned by `amqp.connect`
 */
async function getDLQConnection(options = {}) {
  // `||` would turn an explicit 0 into the default; only null/undefined fall back here.
  const heartbeat = options.heartbeat !== undefined && options.heartbeat !== null
    ? options.heartbeat
    : (env.QUEUE_HEARTBEAT || 580);

  const conOps = {
    protocol: 'amqp',
    hostname: options.hostname || env.QUEUE_HOST || 'localhost',
    port: options.port || env.QUEUE_PORT || 5672,
    username: options.username || env.QUEUE_USR || 'agmuser',
    password: options.password || env.QUEUE_PWD,
    vhost: options.vhost || env.QUEUE_VHOST || '/',
    heartbeat,
    frameMax: options.frameMax || 0
  };

  const connection = await amqp.connect(conOps);

  connection.on('error', (err) => {
    pino.error({ err }, '[DLQ Setup] Connection error');
  });

  connection.on('close', () => {
    pino.warn('[DLQ Setup] Connection closed');
  });

  return connection;
}
|
|
|
|
/**
 * Set up complete DLQ infrastructure for a queue.
 *
 * Declares, in order:
 *   1. a durable fanout archive exchange `<queue>_archive_exchange`,
 *   2. a durable archive queue `<queue>_archive` bound to it,
 *   3. the DLQ `<queue>_failed` (message TTL + dead-lettering into the archive exchange),
 *   4. the main queue (dead-lettering into the DLQ through the default exchange).
 * The DLQ and main queue are only declared when they do not exist yet, so an
 * existing queue's arguments are never re-declared.
 *
 * Existence probes and declarations run on short-lived setup channels. A failed
 * passive declare (NOT_FOUND) or an inequivalent re-declare (PRECONDITION_FAILED)
 * makes the broker close the channel it happened on, so those failures must never
 * hit the channel handed back to the caller. That channel is opened last.
 *
 * @param {string} queueName - Name of the main queue (e.g., 'partner_tasks')
 * @param {Object} options - Configuration options
 * @param {Object} options.connection - Existing AMQP connection (optional; never closed by this function)
 * @param {number} options.retentionDays - DLQ retention in days (default: env.DLQ_RETENTION_DAYS, else 365)
 * @param {boolean} options.durable - Main queue durability (default: true)
 * @param {number} options.prefetch - Consumer prefetch count (default: 1)
 * @param {Object} options.queueArgs - Extra arguments merged into both the DLQ and main queue declarations (optional)
 *
 * @returns {Promise<Object>} { connection, channel, queueNames: { main, dlq, archive, archiveExchange } }
 * @throws {Error} When retentionDays is not a non-negative number, or a broker operation fails.
 *   A connection opened by this function is closed before the error is rethrown.
 */
async function setupDLQQueues(queueName, options = {}) {
  const {
    connection: existingConnection,
    retentionDays = env.DLQ_RETENTION_DAYS || 365,
    durable = true,
    prefetch = 1,
    queueArgs = {}
  } = options;

  // Env values arrive as strings. Coerce explicitly and reject anything that would
  // reach the broker as a NaN or negative TTL. x-message-ttl must be an integer.
  // NOTE(review): RabbitMQ may reject x-message-ttl values above 2^32-1 ms (~49.7 days),
  // which the 365-day default exceeds. Confirm against the broker version in use.
  const retentionMs = Math.round(Number(retentionDays) * 24 * 60 * 60 * 1000);
  if (!Number.isFinite(retentionMs) || retentionMs < 0) {
    throw new Error(`[DLQ Setup] Invalid retentionDays for ${queueName}: ${retentionDays}`);
  }

  // Queue names - use the `_failed` suffix as the canonical DLQ name
  const dlqName = `${queueName}_failed`;
  const archiveExchange = `${queueName}_archive_exchange`;
  const archiveQueue = `${queueName}_archive`;

  // Only a connection opened here is ours to close when setup fails.
  const ownsConnection = !existingConnection;
  const connection = existingConnection || await getDLQConnection();
  let channel = null;

  pino.info(`[DLQ Setup] Setting up DLQ infrastructure for queue: ${queueName}`);
  pino.info(`[DLQ Setup] Retention period: ${retentionDays} days (${retentionMs}ms)`);

  try {
    // 1-3. Archive exchange (fanout - broadcasts to all bound queues) + archive queue bound to it
    await withSetupChannel(connection, async (setupChannel) => {
      await setupChannel.assertExchange(archiveExchange, 'fanout', { durable: true });
      pino.debug(`[DLQ Setup] Created archive exchange: ${archiveExchange}`);

      await setupChannel.assertQueue(archiveQueue, { durable: true });
      pino.debug(`[DLQ Setup] Created archive queue: ${archiveQueue}`);

      await setupChannel.bindQueue(archiveQueue, archiveExchange, '');
      pino.debug(`[DLQ Setup] Bound archive queue to exchange`);
    });

    // We always use the `<queue>_failed` DLQ name per DLQ_SYSTEM_GUIDE.md
    pino.debug(`[DLQ Setup] Using DLQ name: ${dlqName}`);

    // 4. DLQ: messages expire after the retention period and are dead-lettered to the archive exchange
    let dlqCreated;
    try {
      dlqCreated = await ensureQueue(connection, dlqName, {
        durable: true,
        arguments: {
          'x-message-ttl': retentionMs,
          'x-dead-letter-exchange': archiveExchange,
          ...queueArgs
        }
      });
    } catch (err) {
      pino.error({ err }, '[DLQ Setup] Failed to ensure DLQ exists');
      throw err;
    }
    pino.debug(dlqCreated
      ? `[DLQ Setup] Created DLQ: ${dlqName} (TTL: ${retentionMs}ms)`
      : `[DLQ Setup] DLQ already exists: ${dlqName}`);

    // 5. Main queue: dead-lettered messages go to the DLQ via the default exchange
    const mainCreated = await ensureQueue(connection, queueName, {
      durable,
      arguments: {
        'x-dead-letter-exchange': '',
        'x-dead-letter-routing-key': dlqName,
        ...queueArgs
      }
    });
    pino.debug(mainCreated
      ? `[DLQ Setup] Created main queue: ${queueName} with DLQ routing to ${dlqName}`
      : `[DLQ Setup] Main queue already exists: ${queueName} — leaving existing arguments intact`);

    // Consumer channel for the caller, opened only after every step that could close a channel.
    channel = await connection.createChannel();
    await channel.prefetch(prefetch);

    pino.info(`[DLQ Setup] Successfully configured DLQ infrastructure for ${queueName}`);
    pino.info(`[DLQ Setup] Flow: ${queueName} → ${dlqName} (${retentionDays}d TTL) → ${archiveQueue}`);

    return {
      connection,
      channel,
      queueNames: {
        main: queueName,
        dlq: dlqName,
        archive: archiveQueue,
        archiveExchange
      }
    };
  } catch (error) {
    pino.error({ err: error, queueName }, '[DLQ Setup] Failed to set up DLQ infrastructure');

    // The caller never receives these handles on failure, so release them here.
    if (channel) {
      try {
        await channel.close();
      } catch (closeErr) {
        pino.debug({ err: closeErr }, '[DLQ Setup] Error closing channel (may already be closed)');
      }
    }
    if (ownsConnection) {
      try {
        await connection.close();
      } catch (closeErr) {
        pino.debug({ err: closeErr }, '[DLQ Setup] Error closing connection (may already be closed)');
      }
    }
    throw error;
  }
}

/**
 * Run `work` on a freshly created channel and always close that channel afterwards.
 *
 * @param {Object} connection - AMQP connection providing `createChannel()`
 * @param {Function} work - async (channel) => result
 * @returns {Promise<*>} Whatever `work` resolves to; rejections from `work` propagate.
 */
async function withSetupChannel(connection, work) {
  const setupChannel = await connection.createChannel();
  // A broker-initiated close may also be reported as an 'error' event. The failure
  // already reaches us through the rejected operation, so the listener only logs it.
  setupChannel.on('error', (err) => {
    pino.debug({ err }, '[DLQ Setup] Setup channel closed by broker');
  });
  try {
    return await work(setupChannel);
  } finally {
    try {
      await setupChannel.close();
    } catch (closeErr) {
      // Expected when the broker already closed the channel.
      pino.debug({ err: closeErr }, '[DLQ Setup] Setup channel already closed');
    }
  }
}

/**
 * True when `err` is an AMQP reply error with the given reply code or reply-text prefix.
 */
function isAmqpReplyError(err, replyCode, replyName) {
  return Boolean(err) && (
    err.code === replyCode ||
    (typeof err.message === 'string' && err.message.includes(replyName))
  );
}

/**
 * Report whether a queue exists, using a passive declare on a throwaway channel.
 * Only a NOT_FOUND reply means "missing"; any other failure is rethrown.
 */
async function queueExists(connection, name) {
  try {
    await withSetupChannel(connection, (setupChannel) => setupChannel.checkQueue(name));
    return true;
  } catch (err) {
    if (isAmqpReplyError(err, 404, 'NOT_FOUND')) {
      return false;
    }
    throw err;
  }
}

/**
 * Declare `name` with `queueOptions` unless it already exists.
 *
 * @returns {Promise<boolean>} true if this call created the queue, false if it already existed.
 */
async function ensureQueue(connection, name, queueOptions) {
  if (await queueExists(connection, name)) {
    return false;
  }
  try {
    await withSetupChannel(connection, (setupChannel) => setupChannel.assertQueue(name, queueOptions));
    return true;
  } catch (err) {
    if (!isAmqpReplyError(err, 406, 'PRECONDITION_FAILED')) {
      throw err;
    }
    // Race: another process declared the queue with different arguments between the
    // existence probe and our declare. Accept its configuration if the queue is there.
    pino.warn({ err }, `[DLQ Setup] PRECONDITION_FAILED while declaring ${name}; assuming existing queue configuration will be used`);
    try {
      await withSetupChannel(connection, (setupChannel) => setupChannel.checkQueue(name));
    } catch (chkErr) {
      pino.error({ err: chkErr }, `[DLQ Setup] checkQueue failed after PRECONDITION_FAILED for ${name}`);
      throw chkErr;
    }
    return false;
  }
}
|
|
|
|
/**
 * Read the message and consumer counts of a queue via `channel.checkQueue`.
 *
 * On failure the error is logged and a sentinel is returned instead of a
 * rejection: `{ messageCount: -1, consumerCount: 0, error: <error message> }`.
 *
 * NOTE(review): amqplib documents that `checkQueue` on a missing queue makes
 * the broker close the channel, so after a failed call `channel` is likely
 * unusable for the caller. Confirm before reusing it.
 *
 * @param {Object} channel - AMQP channel
 * @param {string} queueName - Name of the queue
 * @returns {Promise<Object>} { messageCount, consumerCount } or the sentinel described above
 */
async function getQueueStats(channel, queueName) {
  try {
    const { messageCount, consumerCount } = await channel.checkQueue(queueName);
    return { messageCount, consumerCount };
  } catch (failure) {
    pino.error({ err: failure, queueName }, '[DLQ Setup] Failed to get queue stats');
    const sentinel = {
      messageCount: -1,
      consumerCount: 0,
      error: failure.message
    };
    return sentinel;
  }
}
|
|
|
|
/**
 * Build the AMQP headers attached to a message that is being dead-lettered.
 *
 * Headers are for filtering/identification only. Task data contains all fields for reprocessing.
 *
 * Always present:
 * - x-first-death-time: ISO timestamp taken when this function runs
 * - x-task-type: taskInfo.taskType, or 'unknown'
 * Entries of `additionalHeaders` are merged in after those two and can replace them.
 *
 * Context headers, set only when the taskInfo field is truthy (partner_tasks queue):
 * - x-partner-code: taskInfo.partnerCode (for filtering)
 * - x-customer-id: String(taskInfo.customerId) (for filtering)
 *
 * Diagnostic headers, set only when `error` is truthy:
 * - x-error-reason: error.message (or String(error)), cut to its first 255 UTF-16 code units
 * - x-error-category: categorizeError(<full message>)
 * - x-severity: calculateSeverity(<full message>)
 *
 * @param {Object} taskInfo - Task information object
 * @param {Error|string|null} [error=null] - Failure that caused the dead-lettering (optional)
 * @param {Object} [additionalHeaders={}] - Additional headers to include (optional)
 * @returns {Object} Message headers for DLQ
 */
function createDLQHeaders(taskInfo, error = null, additionalHeaders = {}) {
  const { taskType, partnerCode, customerId } = taskInfo;

  const headers = {
    'x-first-death-time': new Date().toISOString(),
    'x-task-type': taskType || 'unknown',
    ...additionalHeaders
  };

  if (partnerCode) {
    headers['x-partner-code'] = partnerCode;
  }
  if (customerId) {
    headers['x-customer-id'] = String(customerId);
  }

  if (!error) {
    return headers;
  }

  // Plain strings (or other non-Error values) are accepted as the failure too.
  const reason = error.message || String(error);
  headers['x-error-reason'] = reason.substring(0, 255);
  headers['x-error-category'] = categorizeError(reason);
  headers['x-severity'] = calculateSeverity(reason);
  return headers;
}
|
|
|
|
/**
 * Keyword rules for categorizeError, checked in order. The first category with
 * a keyword contained in the lower-cased message wins.
 */
const ERROR_CATEGORY_RULES = Object.freeze([
  // Infrastructure errors (checked before transient to catch "database connection")
  ['infrastructure', ['database', 'mongo', 'filesystem', 'disk']],
  // Transient errors (network, timeouts). Node's ETIMEDOUT / ECONNRESET codes contain
  // neither "timeout" nor "connection", so they are listed explicitly.
  ['transient', ['timeout', 'etimedout', 'econnrefused', 'econnreset', 'enotfound', 'network', 'connection']],
  // Validation errors
  ['validation', ['validation', 'invalid', 'required', 'missing', 'format']],
  // Processing errors
  ['processing', ['parse', 'calculation', 'processing', 'data']],
  // Partner API errors
  ['partner_api', ['api', 'authentication', 'unauthorized', 'rate limit']]
]);

/**
 * Categorize an error by case-insensitive substring matches on its message.
 *
 * @param {string} errorMessage - Error message to classify; non-strings are stringified.
 * @returns {string} One of 'infrastructure', 'transient', 'validation',
 *   'processing', 'partner_api' or 'unknown' (also returned for an empty/missing message).
 */
function categorizeError(errorMessage) {
  if (!errorMessage) return 'unknown';

  const msg = String(errorMessage).toLowerCase();
  const rule = ERROR_CATEGORY_RULES.find(([, keywords]) => keywords.some((keyword) => msg.includes(keyword)));
  return rule ? rule[0] : 'unknown';
}
|
|
|
|
/**
 * Keyword rules for calculateSeverity, checked from most to least severe. The
 * first level with a keyword contained in the lower-cased message wins.
 */
const SEVERITY_RULES = Object.freeze([
  // Critical - data loss or corruption
  ['critical', ['corrupt', 'data loss', 'fatal']],
  // High - business impact
  ['high', ['authentication', 'authorization', 'database', 'disk full']],
  // Medium - retryable errors. Node socket error codes ("connect ECONNREFUSED ...",
  // "read ECONNRESET", "ETIMEDOUT") match neither "connection" nor "timeout".
  ['medium', ['timeout', 'etimedout', 'connection', 'econnrefused', 'econnreset', 'rate limit']]
]);

/**
 * Derive an alert severity from an error message by case-insensitive substring matches.
 *
 * @param {string} errorMessage - Error message to rate; non-strings are stringified.
 * @returns {string} 'critical', 'high', 'medium' or 'low'. 'low' covers validation
 *   or expected errors, unmatched messages, and an empty/missing message.
 */
function calculateSeverity(errorMessage) {
  if (!errorMessage) return 'low';

  const msg = String(errorMessage).toLowerCase();
  const rule = SEVERITY_RULES.find(([, keywords]) => keywords.some((keyword) => msg.includes(keyword)));
  return rule ? rule[0] : 'low';
}
|
|
|
|
/**
 * Close an AMQP channel and then its connection, best-effort.
 *
 * Falsy arguments are skipped. The channel close is awaited before the
 * connection is closed. A failing close (for example because the resource is
 * already closed) is logged at debug level and never rethrown.
 *
 * @param {Object} [connection] - AMQP connection to close
 * @param {Object} [channel] - AMQP channel to close before the connection
 * @returns {Promise<void>}
 */
async function closeConnection(connection, channel) {
  const steps = [
    {
      resource: channel,
      closedMsg: '[DLQ Setup] Channel closed',
      failedMsg: '[DLQ Setup] Error closing channel (may already be closed)'
    },
    {
      resource: connection,
      closedMsg: '[DLQ Setup] Connection closed',
      failedMsg: '[DLQ Setup] Error closing connection (may already be closed)'
    }
  ];

  for (const { resource, closedMsg, failedMsg } of steps) {
    if (!resource) continue;
    try {
      await resource.close();
      pino.debug(closedMsg);
    } catch (err) {
      pino.debug({ err }, failedMsg);
    }
  }
}
|
|
|
|
// Public API of the DLQ setup helpers (same export names and order as before).
const dlqQueueSetupApi = {
  // Broker topology and connections
  setupDLQQueues,
  getDLQConnection,
  getQueueStats,
  // Message metadata helpers
  createDLQHeaders,
  categorizeError,
  calculateSeverity,
  // Teardown
  closeConnection
};

module.exports = dlqQueueSetupApi;
|