'use strict';

/**
 * DLQ Archival Worker
 *
 * Consumes expired messages from the DLQ archive queue and writes them to the
 * filesystem for long-term retention and audit compliance.
 *
 * Features:
 * - Automatic archival of TTL-expired DLQ messages
 * - Archive files organized by date (year/month/day) for easy retrieval
 * - Preserves the message content, selected headers, and delivery properties
 * - Graceful error handling and recovery
 */

const logger = require('../helpers/logger');
const pino = logger.child('dlq_archival_worker');
const env = require('../helpers/env.js');
const amqp = require('amqplib');
const fs = require('fs').promises;
const path = require('path');

// Name of the queue this worker consumes from.
const ARCHIVE_QUEUE = 'partner_tasks_archive';

// AMQP handles shared between start() (which assigns them) and stop() (which closes them).
let connection = null;
let channel = null;

// Set to true by stop(); checked by the consumer and the connection event handlers.
let isClosing = false;

/**
 * Ensure the archive root directory (`env.DLQ_ARCHIVE_PATH`) exists,
 * creating any missing parent directories.
 *
 * @returns {Promise<void>}
 * @throws {Error} If `env.DLQ_ARCHIVE_PATH` is not set, or if the directory
 *   cannot be created. The error is logged before it is rethrown.
 */
async function ensureArchiveDir() {
  try {
    const archiveRoot = env.DLQ_ARCHIVE_PATH;

    // Fail with an explicit message rather than the opaque argument-type
    // error fs.mkdir raises for an undefined path.
    if (!archiveRoot) {
      throw new Error('DLQ_ARCHIVE_PATH is not configured');
    }

    await fs.mkdir(archiveRoot, { recursive: true });
    pino.info(`Archive directory ready: ${archiveRoot}`);
  } catch (error) {
    pino.error({ err: error }, 'Failed to create archive directory');
    throw error;
  }
}

/**
 * Build the dated archive directory path:
 * `<DLQ_ARCHIVE_PATH>/<YYYY>/<MM>/<DD>`.
 *
 * Year, month and day are read with the local-time Date getters, so the
 * directory follows the time zone of the running process.
 *
 * @param {Date|number|string} [date=new Date()] - Moment to build the path
 *   for. Non-Date values are passed through `new Date(value)`.
 * @returns {string} Directory path (not created by this function).
 * @throws {RangeError} If `date` does not represent a valid date; this
 *   prevents writing into a `NaN/NaN/NaN` directory.
 */
function getArchivePath(date = new Date()) {
  const when = date instanceof Date ? date : new Date(date);

  if (Number.isNaN(when.getTime())) {
    throw new RangeError(`Invalid archive date: ${String(date)}`);
  }

  const year = String(when.getFullYear());
  // getMonth() is zero-based.
  const month = String(when.getMonth() + 1).padStart(2, '0');
  const day = String(when.getDate()).padStart(2, '0');

  return path.join(env.DLQ_ARCHIVE_PATH, year, month, day);
}

/**
 * Archive a DLQ message to the filesystem as a pretty-printed JSON file.
 *
 * The file is written to the directory returned by `getArchivePath()` and is
 * named `<timestamp>_<taskType>_<logFileName>.json`, where every
 * message-derived part is reduced to `[a-zA-Z0-9._-]` so that header or
 * payload values cannot introduce path separators.
 *
 * A body that is not valid JSON is still archived: the raw text is stored in
 * `message` and the parser error in `parse_error`. Returning `false` for such
 * a message would make the caller requeue it indefinitely.
 *
 * @param {object} msg - Delivered message. Reads `msg.content` (converted
 *   with `toString()`), and the optional `msg.properties` and
 *   `msg.properties.headers`.
 * @returns {Promise<boolean>} `true` once the archive file has been written;
 *   `false` if anything failed (the error is logged, never thrown).
 */
async function archiveMessage(msg) {
  // Longest base name (before ".json") that keeps the file name within the
  // 255-byte limit commonly imposed by filesystems. Sanitized names are ASCII.
  const MAX_BASENAME_LENGTH = 250;
  const toSafeSegment = (value) => String(value).replace(/[^a-zA-Z0-9._-]/g, '_');

  try {
    // Single clock read so the record, the file name and the date directory agree.
    const now = new Date();
    const timestamp = now.getTime();

    const properties = msg.properties || {};
    const headers = properties.headers || {};
    const rawContent = msg.content.toString();

    let taskMsg;
    let parseError = null;
    try {
      taskMsg = JSON.parse(rawContent);
    } catch (error) {
      parseError = error;
      taskMsg = rawContent;
    }

    // Valid JSON may still be null or a primitive; only read fields from objects.
    const taskFields = taskMsg !== null && typeof taskMsg === 'object' ? taskMsg : {};

    // Extract metadata
    const taskType = headers['x-task-type'] || taskFields.type || 'unknown';
    const errorCategory = headers['x-error-category'] || 'unknown';
    const severity = headers['x-severity'] || 'unknown';

    // Create archive record
    const archiveRecord = {
      archived_at: now.toISOString(),
      timestamp,
      queue_name: 'partner_tasks',
      dlq_name: 'partner_tasks_failed',
      task_type: taskType,
      error_category: errorCategory,
      severity,

      // Only present when the body could not be parsed as JSON.
      ...(parseError ? { parse_error: parseError.message } : {}),

      // Message headers
      headers: {
        'x-error-category': headers['x-error-category'],
        'x-error-reason': headers['x-error-reason'],
        'x-task-type': headers['x-task-type'],
        'x-severity': headers['x-severity'],
        'x-first-death-time': headers['x-first-death-time'],
        'x-partner-code': headers['x-partner-code'],
        'x-customer-id': headers['x-customer-id']
      },

      // Message properties
      properties: {
        contentType: properties.contentType,
        contentEncoding: properties.contentEncoding,
        deliveryMode: properties.deliveryMode,
        priority: properties.priority,
        timestamp: properties.timestamp,
        expiration: properties.expiration
      },

      // Full message content (raw text when parsing failed)
      message: taskMsg
    };

    // Get archive directory path
    const archiveDir = getArchivePath(now);
    await fs.mkdir(archiveDir, { recursive: true });

    // Create filename with timestamp and task identifiers
    const logFileName = taskFields.logFileName || 'unknown';
    const baseName = `${timestamp}_${toSafeSegment(taskType)}_${toSafeSegment(logFileName)}`
      .slice(0, MAX_BASENAME_LENGTH);
    const filepath = path.join(archiveDir, `${baseName}.json`);

    // Write archive file
    await fs.writeFile(filepath, JSON.stringify(archiveRecord, null, 2), 'utf8');

    pino.info({
      filepath,
      taskType,
      errorCategory,
      severity,
      logFileName: taskFields.logFileName
    }, 'Message archived successfully');

    return true;
  } catch (error) {
    pino.error({ err: error }, 'Failed to archive message');
    return false;
  }
}

/**
 * Start the archival worker.
 *
 * Ensures the archive directory exists, connects to RabbitMQ, asserts the
 * durable archive queue and consumes it with a prefetch of 1. Each delivery
 * is passed to `archiveMessage()`:
 * - archived successfully -> ack;
 * - archive failed        -> wait `REQUEUE_DELAY_MS`, then nack with requeue;
 * - unexpected exception  -> nack without requeue.
 *
 * Recovery behavior:
 * - a startup failure closes any half-open connection and retries in 10s;
 * - a connection closed after startup reconnects in 5s;
 * - a closed channel or a broker-side consumer cancel recycles the
 *   connection, which then goes through the reconnect path;
 * - nothing is retried once `stop()` has set `isClosing`.
 *
 * @returns {Promise<void>} Never rejects; failures are logged and retried.
 */
async function start() {
  // Pause before requeueing so a persistent archive failure (e.g. a full
  // disk) does not spin the same message in a hot loop.
  const REQUEUE_DELAY_MS = 5000;

  // A pending retry timer may fire after stop() was requested.
  if (isClosing) return;

  let conn = null;
  // True once the consumer is registered. Until then, failures are retried by
  // the catch block below, so the 'close' handlers must not schedule a retry.
  let consuming = false;

  // Tear the connection down so its 'close' handler performs the reconnect.
  const recycleConnection = (reason) => {
    if (isClosing || !consuming) return;
    pino.warn(`${reason}, recycling RabbitMQ connection`);
    Promise.resolve()
      .then(() => conn.close())
      .catch(() => {
        // Deliberately ignored: the connection is already closing or closed,
        // and its own 'close' event drives the reconnect.
      });
  };

  try {
    pino.info('Starting DLQ Archival Worker...');

    // Ensure archive directory exists
    await ensureArchiveDir();

    // Connect to RabbitMQ
    const conOps = {
      protocol: 'amqp',
      hostname: env.QUEUE_HOST || 'localhost',
      port: env.QUEUE_PORT || 5672,
      username: env.QUEUE_USR || 'agmuser',
      password: env.QUEUE_PWD,
      vhost: env.QUEUE_VHOST || '/',
      heartbeat: env.QUEUE_HEARTBEAT || 0
    };

    conn = await amqp.connect(conOps);

    // Attach handlers immediately: an 'error' event without a listener would
    // be thrown as an uncaught exception.
    conn.on('error', (err) => {
      if (!isClosing) {
        pino.error({ err }, 'RabbitMQ connection error');
      }
    });

    conn.on('close', () => {
      // Drop stale handles so stop() does not try to close them again.
      if (connection === conn) {
        connection = null;
        channel = null;
      }
      if (isClosing || !consuming) return;
      pino.warn('RabbitMQ connection closed, reconnecting in 5s...');
      setTimeout(start, 5000);
    });

    const ch = await conn.createChannel();

    ch.on('error', (err) => {
      if (!isClosing) {
        pino.error({ err }, 'RabbitMQ channel error');
      }
    });
    ch.on('close', () => recycleConnection('RabbitMQ channel closed'));

    connection = conn;
    channel = ch;

    // Ensure archive queue exists
    await ch.assertQueue(ARCHIVE_QUEUE, { durable: true });

    // Set prefetch to 1 for memory-safe processing
    await ch.prefetch(1);

    pino.info(`Connected to RabbitMQ, consuming from ${ARCHIVE_QUEUE}`);

    // Consume messages. Deliveries are settled on `ch`, the channel that
    // delivered them, not on the module-level `channel`, which may point at
    // a newer channel after a reconnect.
    await ch.consume(ARCHIVE_QUEUE, async (msg) => {
      if (isClosing) return;

      // A null message means the broker cancelled this consumer.
      if (!msg) {
        recycleConnection('Archive consumer cancelled by broker');
        return;
      }

      try {
        const success = await archiveMessage(msg);

        if (success) {
          ch.ack(msg);
          return;
        }

        pino.warn('Archive failed, requeuing message');
        await new Promise((resolve) => setTimeout(resolve, REQUEUE_DELAY_MS));

        // If shutdown began during the delay, leave the message unacked; the
        // broker requeues it when the channel closes.
        if (isClosing) return;
        ch.nack(msg, false, true);
      } catch (error) {
        pino.error({ err: error }, 'Error processing archive message');

        // Nack without requeue on fatal errors. The nack itself throws if the
        // channel is already closed; that must not escape as an unhandled
        // rejection.
        try {
          ch.nack(msg, false, false);
        } catch (nackError) {
          pino.error({ err: nackError }, 'Failed to nack archive message');
        }
      }
    }, { noAck: false });

    consuming = true;
    pino.info('DLQ Archival Worker started successfully');
  } catch (error) {
    pino.error({ err: error }, 'Failed to start DLQ Archival Worker');

    // Do not leak a half-initialised connection on every retry.
    if (conn) {
      try {
        await conn.close();
      } catch (closeError) {
        pino.warn({ err: closeError }, 'Failed to close RabbitMQ connection after startup error');
      }
    }

    // Retry connection after delay
    if (!isClosing) {
      setTimeout(start, 10000);
    }
  }
}

/**
 * Graceful shutdown.
 *
 * Sets `isClosing`, then closes the channel and the connection. Each close is
 * attempted independently, so a failure closing the channel (for example
 * because it is already closed) does not leave the connection open. The
 * shared handles are cleared first, which makes repeated calls harmless.
 *
 * @returns {Promise<void>} Never rejects; close errors are logged.
 */
async function stop() {
  isClosing = true;
  pino.info('Stopping DLQ Archival Worker...');

  const openChannel = channel;
  const openConnection = connection;
  channel = null;
  connection = null;

  if (openChannel) {
    try {
      await openChannel.close();
      pino.info('Channel closed');
    } catch (error) {
      pino.error({ err: error }, 'Error closing channel during shutdown');
    }
  }

  if (openConnection) {
    try {
      await openConnection.close();
      pino.info('Connection closed');
    } catch (error) {
      pino.error({ err: error }, 'Error closing connection during shutdown');
    }
  }

  pino.info('DLQ Archival Worker stopped');
}

// Handle shutdown signals: stop the worker, then exit. Registered when the
// module is loaded, whether it is run directly or required.
for (const signal of ['SIGINT', 'SIGTERM']) {
  process.on(signal, async () => {
    pino.info(`Received ${signal}`);

    try {
      await stop();
      process.exit(0);
    } catch (error) {
      // Exit non-zero so a failed shutdown is visible to the supervisor.
      pino.fatal({ err: error }, 'Shutdown failed');
      process.exit(1);
    }
  });
}

// Start worker if run directly (not when required by another module).
if (require.main === module) {
  start().catch((error) => {
    pino.fatal({ err: error }, 'Fatal error starting worker');
    process.exit(1);
  });
}

// Public API: lets an embedding process control the worker lifecycle.
module.exports = { start, stop };