'use strict'; const amqp = require('amqplib'); const env = require('./env'); const pino = require('./logger').child('dlq_queue_setup'); /** * DLQ Queue Setup Helper Module * * Provides reusable functions to set up Dead Letter Queue (DLQ) infrastructure * for any RabbitMQ queue with configurable TTL and archival routing. * * Architecture: * - Main Queue → DLQ (with TTL) → Archive Exchange → Archive Queue → Filesystem * * Usage: * const { setupDLQQueues, getDLQConnection } = require('../helpers/dlq_queue_setup'); * * // In worker startup: * const { connection, channel } = await setupDLQQueues('partner_tasks'); */ /** * Create and return a RabbitMQ connection * @param {Object} options - Connection options (optional, uses env defaults) * @returns {Promise} RabbitMQ connection */ async function getDLQConnection(options = {}) { const conOps = { protocol: 'amqp', hostname: options.hostname || env.QUEUE_HOST || 'localhost', port: options.port || env.QUEUE_PORT || 5672, username: options.username || env.QUEUE_USR || 'agmuser', password: options.password || env.QUEUE_PWD, vhost: options.vhost || env.QUEUE_VHOST || '/', heartbeat: options.heartbeat || env.QUEUE_HEARTBEAT || 580, frameMax: options.frameMax || 0 }; const connection = await amqp.connect(conOps); connection.on('error', (err) => { pino.error({ err }, '[DLQ Setup] Connection error'); }); connection.on('close', () => { pino.warn('[DLQ Setup] Connection closed'); }); return connection; } /** * Set up complete DLQ infrastructure for a queue * * @param {string} queueName - Name of the main queue (e.g., 'partner_tasks') * @param {Object} options - Configuration options * @param {Object} options.connection - Existing AMQP connection (optional) * @param {number} options.retentionDays - DLQ retention in days (default: env.DLQ_RETENTION_DAYS) * @param {boolean} options.durable - Queue durability (default: true) * @param {number} options.prefetch - Consumer prefetch count (default: 1) * @param {Object} options.queueArgs - Additional queue arguments (optional) * * @returns {Promise} { connection, channel, queueNames: { main, dlq, archive } } */ async function setupDLQQueues(queueName, options = {}) { const { connection: existingConnection, retentionDays = env.DLQ_RETENTION_DAYS || 365, durable = true, prefetch = 1, queueArgs = {} } = options; // Use existing connection or create new one const connection = existingConnection || await getDLQConnection(); const channel = await connection.createChannel(); // Set prefetch for fair dispatch channel.prefetch(prefetch); // Calculate TTL in milliseconds const retentionMs = retentionDays * 24 * 60 * 60 * 1000; // Queue names - use the `_failed` suffix as the canonical DLQ name const dlqName = `${queueName}_failed`; const archiveExchange = `${queueName}_archive_exchange`; const archiveQueue = `${queueName}_archive`; pino.info(`[DLQ Setup] Setting up DLQ infrastructure for queue: ${queueName}`); pino.info(`[DLQ Setup] Retention period: ${retentionDays} days (${retentionMs}ms)`); try { // 1. Create archive exchange (fanout - broadcasts to all bound queues) await channel.assertExchange(archiveExchange, 'fanout', { durable: true }); pino.debug(`[DLQ Setup] Created archive exchange: ${archiveExchange}`); // 2. Create archive queue (receives expired DLQ messages) await channel.assertQueue(archiveQueue, { durable: true }); pino.debug(`[DLQ Setup] Created archive queue: ${archiveQueue}`); // 3. Bind archive queue to archive exchange await channel.bindQueue(archiveQueue, archiveExchange, ''); pino.debug(`[DLQ Setup] Bound archive queue to exchange`); // We always use the `_failed` DLQ name per DLQ_SYSTEM_GUIDE.md pino.debug(`[DLQ Setup] Using DLQ name: ${dlqName}`); // 4. Ensure DLQ exists (create if not present) try { // If queue exists this will succeed, otherwise create it with desired args try { await channel.checkQueue(dlqName); pino.debug(`[DLQ Setup] DLQ already exists: ${dlqName}`); } catch (e) { // Not found — create DLQ with TTL and dead-letter routing to archive await channel.assertQueue(dlqName, { durable: true, arguments: { 'x-message-ttl': retentionMs, // Messages expire after retention period 'x-dead-letter-exchange': archiveExchange, // Route expired messages to archive ...queueArgs } }); pino.debug(`[DLQ Setup] Created DLQ: ${dlqName} (TTL: ${retentionMs}ms)`); } } catch (err) { pino.error({ err }, '[DLQ Setup] Failed to ensure DLQ exists'); throw err; } // 5. Create main queue with DLQ routing only if it does not already exist try { try { await channel.checkQueue(queueName); pino.debug(`[DLQ Setup] Main queue already exists: ${queueName} — leaving existing arguments intact`); } catch (e) { // Queue not found — safe to create with DLQ routing arguments await channel.assertQueue(queueName, { durable, arguments: { 'x-dead-letter-exchange': '', // Use default exchange 'x-dead-letter-routing-key': dlqName, // Route failed messages to DLQ ...queueArgs } }); pino.debug(`[DLQ Setup] Created main queue: ${queueName} with DLQ routing to ${dlqName}`); } } catch (error) { // If a PRECONDITION_FAILED still occurs (race with another creator) try to proceed without re-declaring if (error && error.message && error.message.includes('PRECONDITION_FAILED')) { pino.warn({ err: error }, `[DLQ Setup] PRECONDITION_FAILED while declaring ${queueName}; assuming existing queue configuration will be used`); try { await channel.checkQueue(queueName); } catch (chkErr) { pino.error({ err: chkErr }, `[DLQ Setup] checkQueue failed after PRECONDITION_FAILED for ${queueName}`); throw chkErr; } } else { throw error; } } pino.info(`[DLQ Setup] Successfully configured DLQ infrastructure for ${queueName}`); pino.info(`[DLQ Setup] Flow: ${queueName} → ${dlqName} (${retentionDays}d TTL) → ${archiveQueue}`); return { connection, channel, queueNames: { main: queueName, dlq: dlqName, archive: archiveQueue, archiveExchange } }; } catch (error) { pino.error({ err: error, queueName }, '[DLQ Setup] Failed to set up DLQ infrastructure'); throw error; } } /** * Get queue statistics * * @param {Object} channel - AMQP channel * @param {string} queueName - Name of the queue * @returns {Promise} Queue statistics { messageCount, consumerCount } */ async function getQueueStats(channel, queueName) { try { const queueInfo = await channel.checkQueue(queueName); return { messageCount: queueInfo.messageCount, consumerCount: queueInfo.consumerCount }; } catch (error) { pino.error({ err: error, queueName }, '[DLQ Setup] Failed to get queue stats'); return { messageCount: -1, consumerCount: 0, error: error.message }; } } /** * Enrich message with DLQ metadata headers * * @param {Object} taskInfo - Task information object * @param {Error} error - Error object (optional) * @param {Object} additionalHeaders - Additional headers to include (optional) * @returns {Object} Message headers for DLQ * * Headers are for filtering/identification only. Task data contains all fields for reprocessing. * * Diagnostic Headers (All Queues): * - x-first-death-time: Timestamp of failure * - x-task-type: Type of task being processed * - x-error-category: Error classification * - x-error-reason: Error message * - x-severity: Alert severity * * Context Headers (partner_tasks queue): * - x-partner-code: Partner identifier (for filtering) * - x-customer-id: Customer ObjectId (for filtering) */ function createDLQHeaders(taskInfo, error = null, additionalHeaders = {}) { const headers = { 'x-first-death-time': new Date().toISOString(), 'x-task-type': taskInfo.taskType || 'unknown', ...additionalHeaders }; // Add partner context headers for filtering (partner_tasks queue only) if (taskInfo.partnerCode) headers['x-partner-code'] = taskInfo.partnerCode; if (taskInfo.customerId) headers['x-customer-id'] = String(taskInfo.customerId); // Add error metadata if error provided if (error) { const errorMessage = error.message || String(error); headers['x-error-reason'] = errorMessage.substring(0, 255); // Limit length headers['x-error-category'] = categorizeError(errorMessage); headers['x-severity'] = calculateSeverity(errorMessage); } return headers; } /** * Categorize error type based on error message */ function categorizeError(errorMessage) { if (!errorMessage) return 'unknown'; const msg = errorMessage.toLowerCase(); // Infrastructure errors (check before transient to catch "database connection") if (msg.includes('database') || msg.includes('mongo') || msg.includes('filesystem') || msg.includes('disk')) { return 'infrastructure'; } // Transient errors (network, timeouts) if (msg.includes('timeout') || msg.includes('econnrefused') || msg.includes('enotfound') || msg.includes('network') || msg.includes('connection')) { return 'transient'; } // Validation errors if (msg.includes('validation') || msg.includes('invalid') || msg.includes('required') || msg.includes('missing') || msg.includes('format')) { return 'validation'; } // Processing errors if (msg.includes('parse') || msg.includes('calculation') || msg.includes('processing') || msg.includes('data')) { return 'processing'; } // Partner API errors if (msg.includes('api') || msg.includes('authentication') || msg.includes('unauthorized') || msg.includes('rate limit')) { return 'partner_api'; } return 'unknown'; } /** * Calculate message severity based on error type */ function calculateSeverity(errorMessage) { if (!errorMessage) return 'low'; const msg = errorMessage.toLowerCase(); // Critical - data loss or corruption if (msg.includes('corrupt') || msg.includes('data loss') || msg.includes('fatal')) { return 'critical'; } // High - business impact if (msg.includes('authentication') || msg.includes('authorization') || msg.includes('database') || msg.includes('disk full')) { return 'high'; } // Medium - retryable errors if (msg.includes('timeout') || msg.includes('connection') || msg.includes('rate limit')) { return 'medium'; } // Low - validation or expected errors return 'low'; } /** * Close connection and channel safely */ async function closeConnection(connection, channel) { if (channel) { try { await channel.close(); pino.debug('[DLQ Setup] Channel closed'); } catch (err) { pino.debug({ err }, '[DLQ Setup] Error closing channel (may already be closed)'); } } if (connection) { try { await connection.close(); pino.debug('[DLQ Setup] Connection closed'); } catch (err) { pino.debug({ err }, '[DLQ Setup] Error closing connection (may already be closed)'); } } } module.exports = { setupDLQQueues, getDLQConnection, getQueueStats, createDLQHeaders, categorizeError, calculateSeverity, closeConnection };