'use strict';

/**
 * DLQ Archival Worker
 * Consumes expired messages from the DLQ archive queue and writes them to the
 * filesystem for long-term retention and audit compliance.
 *
 * Features:
 * - Automatic archival of TTL-expired DLQ messages
 * - Organized by date (year/month/day) for easy retrieval
 * - Preserves full message content, headers, and metadata
 *   (bodies that are not valid JSON are archived as raw text)
 * - Graceful error handling and recovery
 */

const logger = require('../helpers/logger');
const pino = logger.child('dlq_archival_worker');
const env = require('../helpers/env.js');
const amqp = require('amqplib');
const fs = require('fs').promises;
const path = require('path');

const ARCHIVE_QUEUE = 'partner_tasks_archive';

// Delay before reconnecting after an established connection closes.
const RECONNECT_DELAY_MS = 5000;
// Delay before retrying a start() attempt that failed.
const START_RETRY_DELAY_MS = 10000;
// Pause before requeuing a message whose archive write failed, so a
// persistent filesystem problem does not become a tight redelivery loop.
const REQUEUE_DELAY_MS = 5000;
// Upper bound for each caller-controlled filename component; keeps the final
// name below the 255-byte limit that is common on filesystems.
const MAX_FILENAME_PART_LENGTH = 100;
// How many numeric suffixes to try when an archive filename already exists.
const MAX_FILENAME_ATTEMPTS = 100;

let connection = null;
let channel = null;
let isClosing = false;
let reconnectTimer = null;

/**
 * Resolve after the given number of milliseconds.
 *
 * @param {number} ms - Delay in milliseconds.
 * @returns {Promise<void>}
 */
function delay(ms) {
  return new Promise((resolve) => {
    setTimeout(resolve, ms);
  });
}

/**
 * Ensure archive directory exists
 *
 * @returns {Promise<void>}
 * @throws Rethrows the mkdir error after logging it.
 */
async function ensureArchiveDir() {
  try {
    await fs.mkdir(env.DLQ_ARCHIVE_PATH, { recursive: true });
    pino.info(`Archive directory ready: ${env.DLQ_ARCHIVE_PATH}`);
  } catch (error) {
    pino.error({ err: error }, 'Failed to create archive directory');
    throw error;
  }
}

/**
 * Get archive directory path organized by date (local time):
 * <DLQ_ARCHIVE_PATH>/<year>/<month>/<day>
 *
 * @param {Date} [date] - Instant to derive the directory from; defaults to now.
 * @returns {string} Directory path for that day.
 */
function getArchivePath(date = new Date()) {
  const year = date.getFullYear();
  const month = String(date.getMonth() + 1).padStart(2, '0');
  const day = String(date.getDate()).padStart(2, '0');

  return path.join(env.DLQ_ARCHIVE_PATH, String(year), month, day);
}

/**
 * Turn an arbitrary value into a safe, bounded filename component.
 * Falsy values become 'unknown'; every character outside [a-zA-Z0-9._-] is
 * replaced by '_' (so path separators can never leak into the filename).
 *
 * @param {*} value - Header or body value to embed in a filename.
 * @returns {string} Sanitized component, at most MAX_FILENAME_PART_LENGTH chars.
 */
function sanitizeFilenamePart(value) {
  const text = value ? String(value) : 'unknown';
  return text
    .replace(/[^a-zA-Z0-9._-]/g, '_')
    .slice(0, MAX_FILENAME_PART_LENGTH);
}

/**
 * Decode a message body. Invalid JSON is not an error here: the raw text is
 * returned so the message can still be archived instead of being requeued
 * forever.
 *
 * @param {object} msg - Delivery whose `content` is converted with toString().
 * @returns {{ body: *, parseError: (string|null) }} Parsed body (or raw text)
 *   and the JSON parse error message, if any.
 */
function parseContent(msg) {
  const raw = msg.content.toString();
  try {
    return { body: JSON.parse(raw), parseError: null };
  } catch (error) {
    return { body: raw, parseError: error.message };
  }
}

/**
 * Write `contents` to a new file named `<baseName>.json` inside `dir`, never
 * overwriting an existing archive. On a name collision a numeric suffix
 * (`_1`, `_2`, ...) is appended.
 *
 * @param {string} dir - Target directory (must exist).
 * @param {string} baseName - Filename without extension.
 * @param {string} contents - File contents.
 * @returns {Promise<string>} Path of the file that was written.
 * @throws Any write error other than EEXIST, or an Error when no free name
 *   was found within MAX_FILENAME_ATTEMPTS.
 */
async function writeArchiveFile(dir, baseName, contents) {
  for (let attempt = 0; attempt < MAX_FILENAME_ATTEMPTS; attempt += 1) {
    const suffix = attempt === 0 ? '' : `_${attempt}`;
    const filepath = path.join(dir, `${baseName}${suffix}.json`);
    try {
      // 'wx' fails with EEXIST instead of silently replacing a record.
      await fs.writeFile(filepath, contents, { encoding: 'utf8', flag: 'wx' });
      return filepath;
    } catch (error) {
      if (error.code !== 'EEXIST') {
        throw error;
      }
    }
  }
  throw new Error(`No free archive filename available for ${baseName}`);
}

/**
 * Archive a DLQ message to filesystem
 *
 * @param {object} msg - Delivery with `content` and optional `properties`.
 * @returns {Promise<boolean>} true when the record was written, false when
 *   archiving failed (the error is logged, never thrown).
 */
async function archiveMessage(msg) {
  try {
    // One clock read so archived_at, timestamp and the date directory agree.
    const now = new Date();
    const timestamp = now.getTime();

    const { body: taskMsg, parseError } = parseContent(msg);
    // Valid JSON may still be null, a string, a number...; only read fields
    // from real objects.
    const fields = taskMsg !== null && typeof taskMsg === 'object' ? taskMsg : {};
    const headers = msg.properties?.headers || {};

    // Extract metadata
    const taskType = headers['x-task-type'] || fields.type || 'unknown';
    const errorCategory = headers['x-error-category'] || 'unknown';
    const severity = headers['x-severity'] || 'unknown';

    // Create archive record
    const archiveRecord = {
      archived_at: now.toISOString(),
      timestamp,
      queue_name: 'partner_tasks',
      dlq_name: 'partner_tasks_failed',
      task_type: taskType,
      error_category: errorCategory,
      severity,

      // Message headers
      headers: {
        'x-error-category': headers['x-error-category'],
        'x-error-reason': headers['x-error-reason'],
        'x-task-type': headers['x-task-type'],
        'x-severity': headers['x-severity'],
        'x-first-death-time': headers['x-first-death-time'],
        'x-partner-code': headers['x-partner-code'],
        'x-customer-id': headers['x-customer-id']
      },

      // Message properties
      properties: {
        contentType: msg.properties?.contentType,
        contentEncoding: msg.properties?.contentEncoding,
        deliveryMode: msg.properties?.deliveryMode,
        priority: msg.properties?.priority,
        timestamp: msg.properties?.timestamp,
        expiration: msg.properties?.expiration
      },

      // Full message content (raw text when the body was not valid JSON)
      message: taskMsg
    };

    if (parseError) {
      archiveRecord.parse_error = parseError;
      pino.warn({ parseError }, 'Message body is not valid JSON, archiving raw content');
    }

    // Get archive directory path
    const archiveDir = getArchivePath(now);
    await fs.mkdir(archiveDir, { recursive: true });

    // Create filename with timestamp and task identifiers. Both identifiers
    // come from the message, so both are sanitized.
    const baseName = [
      timestamp,
      sanitizeFilenamePart(taskType),
      sanitizeFilenamePart(fields.logFileName)
    ].join('_');

    // Write archive file
    const filepath = await writeArchiveFile(
      archiveDir,
      baseName,
      JSON.stringify(archiveRecord, null, 2)
    );

    pino.info({
      filepath,
      taskType,
      errorCategory,
      severity,
      logFileName: fields.logFileName
    }, 'Message archived successfully');

    return true;
  } catch (error) {
    pino.error({ err: error }, 'Failed to archive message');
    return false;
  }
}

/**
 * Schedule a single start() attempt. No-op while shutting down or when an
 * attempt is already pending, so overlapping failure paths (start() error
 * plus connection 'close') cannot spawn parallel connections.
 *
 * @param {number} delayMs - Delay before start() is invoked.
 */
function scheduleStart(delayMs) {
  if (isClosing || reconnectTimer) {
    return;
  }
  reconnectTimer = setTimeout(() => {
    reconnectTimer = null;
    // start() handles its own errors and never rejects.
    start();
  }, delayMs);
}

/**
 * Close a connection that is being abandoned, logging instead of throwing
 * (it may already be closed).
 *
 * @param {object} conn - Connection returned by amqp.connect().
 * @returns {Promise<void>}
 */
async function discardConnection(conn) {
  try {
    await conn.close();
  } catch (error) {
    pino.warn({ err: error }, 'Could not close RabbitMQ connection cleanly');
  }
}

/**
 * Process one delivery: archive it, then ack on success or nack-with-requeue
 * (after a delay) on failure. Never rejects.
 *
 * @param {object} activeChannel - Channel the delivery arrived on; ack/nack
 *   must go to this channel, not to whatever channel is current after a
 *   reconnect.
 * @param {object|null} msg - Delivery, or null when the broker cancelled the
 *   consumer.
 * @returns {Promise<void>}
 */
async function handleDelivery(activeChannel, msg) {
  if (!msg) {
    pino.warn('Consumer was cancelled by the broker');
    return;
  }
  if (isClosing) {
    return;
  }

  try {
    const success = await archiveMessage(msg);

    if (success) {
      activeChannel.ack(msg);
    } else {
      // Nack and requeue failed archives, after a pause so the broker does
      // not redeliver immediately.
      pino.warn('Archive failed, requeuing message');
      await delay(REQUEUE_DELAY_MS);
      activeChannel.nack(msg, false, true);
    }
  } catch (error) {
    pino.error({ err: error }, 'Error processing archive message');
    try {
      // Nack without requeue on fatal errors
      activeChannel.nack(msg, false, false);
    } catch (nackError) {
      // Typically the channel is already closed; the broker will redeliver
      // the unacknowledged message on its own.
      pino.error({ err: nackError }, 'Failed to nack message');
    }
  }
}

/**
 * Start the archival worker: ensure the archive directory exists, connect to
 * RabbitMQ, and consume ARCHIVE_QUEUE with a prefetch of 1.
 *
 * Failures are logged and retried after a delay; the returned promise does
 * not reject. Does nothing once stop() has been called.
 *
 * @returns {Promise<void>}
 */
async function start() {
  if (isClosing) {
    pino.warn('DLQ Archival Worker is shutting down, not starting');
    return;
  }

  let conn = null;

  try {
    pino.info('Starting DLQ Archival Worker...');

    // Ensure archive directory exists
    await ensureArchiveDir();

    // Connect to RabbitMQ
    const conOps = {
      protocol: 'amqp',
      hostname: env.QUEUE_HOST || 'localhost',
      port: env.QUEUE_PORT || 5672,
      username: env.QUEUE_USR || 'agmuser',
      password: env.QUEUE_PWD,
      vhost: env.QUEUE_VHOST || '/',
      heartbeat: env.QUEUE_HEARTBEAT || 0
    };

    conn = await amqp.connect(conOps);
    const activeConnection = conn;

    // Attach handlers before any further awaits: an 'error' event without a
    // listener would crash the process.
    activeConnection.on('error', (err) => {
      if (!isClosing) {
        pino.error({ err }, 'RabbitMQ connection error');
      }
    });

    activeConnection.on('close', () => {
      if (connection === activeConnection) {
        connection = null;
        channel = null;
      }
      if (!isClosing) {
        pino.warn('RabbitMQ connection closed, reconnecting in 5s...');
        scheduleStart(RECONNECT_DELAY_MS);
      }
    });

    if (isClosing) {
      // stop() ran while we were connecting; it could not see this connection.
      await discardConnection(activeConnection);
      return;
    }
    connection = activeConnection;

    const activeChannel = await activeConnection.createChannel();

    activeChannel.on('error', (err) => {
      if (!isClosing) {
        pino.error({ err }, 'RabbitMQ channel error');
      }
    });

    activeChannel.on('close', () => {
      if (isClosing || channel !== activeChannel) {
        return;
      }
      // The channel died while the connection may still be up. Recycle the
      // connection; its 'close' handler schedules the restart.
      channel = null;
      pino.warn('RabbitMQ channel closed, recycling connection');
      discardConnection(activeConnection);
    });

    channel = activeChannel;

    // Ensure archive queue exists
    await activeChannel.assertQueue(ARCHIVE_QUEUE, { durable: true });

    // Set prefetch to 1 for memory-safe processing
    await activeChannel.prefetch(1);

    pino.info(`Connected to RabbitMQ, consuming from ${ARCHIVE_QUEUE}`);

    // Consume messages
    await activeChannel.consume(
      ARCHIVE_QUEUE,
      (msg) => handleDelivery(activeChannel, msg),
      { noAck: false }
    );

    pino.info('DLQ Archival Worker started successfully');
  } catch (error) {
    pino.error({ err: error }, 'Failed to start DLQ Archival Worker');

    // Do not leak a half-initialised connection before retrying.
    if (conn) {
      await discardConnection(conn);
    }

    // Retry connection after delay (ignored while shutting down or when a
    // retry is already pending).
    scheduleStart(START_RETRY_DELAY_MS);
  }
}

/**
 * Close a channel or connection during shutdown, logging the outcome.
 *
 * @param {object|null} resource - Object exposing close(); ignored when null.
 * @param {string} label - Name used in the success log line.
 * @returns {Promise<void>}
 */
async function closeDuringShutdown(resource, label) {
  if (!resource) {
    return;
  }
  try {
    await resource.close();
    pino.info(`${label} closed`);
  } catch (error) {
    pino.error({ err: error }, 'Error during shutdown');
  }
}

/**
 * Graceful shutdown: cancel any pending reconnect, then close the channel and
 * the connection. Each close is attempted independently, so a failing channel
 * close does not leave the connection open. Never rejects.
 *
 * @returns {Promise<void>}
 */
async function stop() {
  isClosing = true;
  pino.info('Stopping DLQ Archival Worker...');

  if (reconnectTimer) {
    clearTimeout(reconnectTimer);
    reconnectTimer = null;
  }

  const activeChannel = channel;
  const activeConnection = connection;
  channel = null;
  connection = null;

  await closeDuringShutdown(activeChannel, 'Channel');
  await closeDuringShutdown(activeConnection, 'Connection');

  pino.info('DLQ Archival Worker stopped');
}

// Handle shutdown signals
for (const signal of ['SIGINT', 'SIGTERM']) {
  process.on(signal, async () => {
    pino.info(`Received ${signal}`);
    await stop();
    process.exit(0);
  });
}

// Start worker if run directly
if (require.main === module) {
  start().catch((error) => {
    pino.fatal({ err: error }, 'Fatal error starting worker');
    process.exit(1);
  });
}

module.exports = { start, stop };