// Source file: agmission/Development/server/workers/dlq_archival_worker.js
// (listing metadata from original capture: 257 lines, 6.8 KiB, JavaScript)

'use strict';
/**
* DLQ Archival Worker
* Consumes expired messages from DLQ archive queue and writes them to filesystem
* for long-term retention and audit compliance
*
* Features:
* - Automatic archival of TTL-expired DLQ messages
* - Organized by date (year/month/day) for easy retrieval
* - Preserves full message content, headers, and metadata
* - Graceful error handling and recovery
*/
const logger = require('../helpers/logger');
const pino = logger.child('dlq_archival_worker');
const env = require('../helpers/env.js');
const amqp = require('amqplib');
const fs = require('fs').promises;
const path = require('path');
// Queue holding TTL-expired DLQ messages awaiting archival
const ARCHIVE_QUEUE = 'partner_tasks_archive';
// Module-level AMQP state, shared between start() and stop()
let connection = null;
let channel = null;
// Set by stop(); suppresses the reconnect logic and consume handler during shutdown
let isClosing = false;
/**
 * Create the archive root directory (env.DLQ_ARCHIVE_PATH) if it does not
 * already exist. Logs and rethrows on failure so start() can abort and retry.
 *
 * @returns {Promise<void>}
 * @throws when the directory cannot be created
 */
async function ensureArchiveDir() {
  const rootDir = env.DLQ_ARCHIVE_PATH;
  try {
    await fs.mkdir(rootDir, { recursive: true });
  } catch (error) {
    pino.error({ err: error }, 'Failed to create archive directory');
    throw error;
  }
  pino.info(`Archive directory ready: ${rootDir}`);
}
/**
 * Build the dated archive directory path: <DLQ_ARCHIVE_PATH>/YYYY/MM/DD.
 *
 * @param {Date} [date=new Date()] - Date whose year/month/day pick the folder.
 * @returns {string} Path under env.DLQ_ARCHIVE_PATH for that calendar day.
 */
function getArchivePath(date = new Date()) {
  const pad2 = (n) => String(n).padStart(2, '0');
  return path.join(
    env.DLQ_ARCHIVE_PATH,
    String(date.getFullYear()),
    pad2(date.getMonth() + 1),
    pad2(date.getDate())
  );
}
/**
 * Archive a single DLQ message to the filesystem for long-term retention.
 *
 * Writes one pretty-printed JSON file per message into the dated archive
 * directory (see getArchivePath). Payloads that are not valid JSON are
 * archived verbatim with a recorded `parse_error` instead of failing:
 * previously a parse failure made this function return false, and the
 * consumer's nack-with-requeue then cycled the poison message forever.
 *
 * @param {object} msg - amqplib delivery ({ content: Buffer, properties }).
 * @returns {Promise<boolean>} true if the archive file was written,
 *   false on failure (the caller decides whether to requeue).
 */
async function archiveMessage(msg) {
  try {
    const timestamp = Date.now();
    const rawContent = msg.content.toString();
    // Parse defensively: fall back to archiving the raw string so a
    // malformed message can still be archived and acked.
    let taskMsg;
    let parseError = null;
    try {
      taskMsg = JSON.parse(rawContent);
    } catch (err) {
      parseError = err.message;
      taskMsg = { raw: rawContent };
    }
    const headers = msg.properties?.headers || {};
    // Extract metadata; explicit headers win over fields in the message body
    const taskType = headers['x-task-type'] || taskMsg.type || 'unknown';
    const errorCategory = headers['x-error-category'] || 'unknown';
    const severity = headers['x-severity'] || 'unknown';
    // Create archive record
    const archiveRecord = {
      archived_at: new Date().toISOString(),
      timestamp,
      queue_name: 'partner_tasks',
      dlq_name: 'partner_tasks_failed',
      task_type: taskType,
      error_category: errorCategory,
      severity,
      // Non-null only when the payload was not valid JSON
      parse_error: parseError,
      // Message headers
      headers: {
        'x-error-category': headers['x-error-category'],
        'x-error-reason': headers['x-error-reason'],
        'x-task-type': headers['x-task-type'],
        'x-severity': headers['x-severity'],
        'x-first-death-time': headers['x-first-death-time'],
        'x-partner-code': headers['x-partner-code'],
        'x-customer-id': headers['x-customer-id']
      },
      // Message properties
      properties: {
        contentType: msg.properties?.contentType,
        contentEncoding: msg.properties?.contentEncoding,
        deliveryMode: msg.properties?.deliveryMode,
        priority: msg.properties?.priority,
        timestamp: msg.properties?.timestamp,
        expiration: msg.properties?.expiration
      },
      // Full message content (or { raw: <string> } when unparsable)
      message: taskMsg
    };
    // Ensure the dated archive directory exists
    const archiveDir = getArchivePath();
    await fs.mkdir(archiveDir, { recursive: true });
    // Build a unique filename: the random suffix guards against overwrites
    // when two messages are archived within the same millisecond
    const logFileName = taskMsg.logFileName || 'unknown';
    const sanitizedFilename = logFileName.replace(/[^a-zA-Z0-9._-]/g, '_');
    const suffix = Math.random().toString(36).slice(2, 8);
    const filename = `${timestamp}_${suffix}_${taskType}_${sanitizedFilename}.json`;
    const filepath = path.join(archiveDir, filename);
    // Write archive file (pretty-printed for manual inspection)
    await fs.writeFile(
      filepath,
      JSON.stringify(archiveRecord, null, 2),
      'utf8'
    );
    pino.info({
      filepath,
      taskType,
      errorCategory,
      severity,
      logFileName: taskMsg.logFileName
    }, 'Message archived successfully');
    return true;
  } catch (error) {
    pino.error({ err: error }, 'Failed to archive message');
    return false;
  }
}
/**
 * Start the archival worker: connect to RabbitMQ, assert the archive queue,
 * and consume messages one at a time, acking only after a successful write.
 * On startup failure the whole sequence retries after 10s; on a dropped
 * connection it retries after 5s via the 'close' handler.
 */
async function start() {
try {
pino.info('Starting DLQ Archival Worker...');
// Ensure archive directory exists before accepting any messages
await ensureArchiveDir();
// Connect to RabbitMQ; every field except the password has a fallback
const conOps = {
protocol: 'amqp',
hostname: env.QUEUE_HOST || 'localhost',
port: env.QUEUE_PORT || 5672,
username: env.QUEUE_USR || 'agmuser',
password: env.QUEUE_PWD,
vhost: env.QUEUE_VHOST || '/',
// NOTE(review): an unset/falsy QUEUE_HEARTBEAT resolves to 0, which
// disables AMQP heartbeats entirely — confirm this is intended.
heartbeat: env.QUEUE_HEARTBEAT || 0
};
connection = await amqp.connect(conOps);
channel = await connection.createChannel();
// Ensure archive queue exists (durable: declared to survive broker restart)
await channel.assertQueue(ARCHIVE_QUEUE, { durable: true });
// Set prefetch to 1 for memory-safe processing: at most one unacked
// message is in flight at a time
await channel.prefetch(1);
pino.info(`Connected to RabbitMQ, consuming from ${ARCHIVE_QUEUE}`);
// Consume messages with manual acknowledgement (noAck: false below)
await channel.consume(ARCHIVE_QUEUE, async (msg) => {
// msg is null on consumer cancellation; skip work during shutdown
if (!msg || isClosing) return;
try {
const success = await archiveMessage(msg);
if (success) {
channel.ack(msg);
} else {
// Nack and requeue failed archives (with delay via retry)
// NOTE(review): requeue=true means a message that can never be
// archived will loop through this path indefinitely — verify.
pino.warn('Archive failed, requeuing message');
channel.nack(msg, false, true);
}
} catch (error) {
pino.error({ err: error }, 'Error processing archive message');
// Nack without requeue on fatal errors — the message is discarded
channel.nack(msg, false, false);
}
}, { noAck: false });
pino.info('DLQ Archival Worker started successfully');
// Setup connection error handlers; reconnection is driven solely by 'close'
connection.on('error', (err) => {
if (!isClosing) {
pino.error({ err }, 'RabbitMQ connection error');
}
});
connection.on('close', () => {
if (!isClosing) {
pino.warn('RabbitMQ connection closed, reconnecting in 5s...');
setTimeout(start, 5000);
}
});
} catch (error) {
pino.error({ err: error }, 'Failed to start DLQ Archival Worker');
// Retry connection after delay (longer backoff than the reconnect path)
setTimeout(start, 10000);
}
}
/**
 * Graceful shutdown: flag the worker as closing (so reconnect logic and the
 * consume handler stand down), then close the channel and the connection in
 * that order. Teardown errors are logged, never thrown.
 *
 * @returns {Promise<void>}
 */
async function stop() {
  isClosing = true;
  pino.info('Stopping DLQ Archival Worker...');
  // Channel must be closed before its parent connection
  const teardownSteps = [
    [channel, 'Channel closed'],
    [connection, 'Connection closed']
  ];
  try {
    for (const [resource, doneMsg] of teardownSteps) {
      if (!resource) continue;
      await resource.close();
      pino.info(doneMsg);
    }
  } catch (error) {
    pino.error({ err: error }, 'Error during shutdown');
  }
  pino.info('DLQ Archival Worker stopped');
}
// Handle shutdown signals
// On SIGINT/SIGTERM: run graceful shutdown, then exit with status 0.
process.on('SIGINT', async () => {
pino.info('Received SIGINT');
await stop();
process.exit(0);
});
process.on('SIGTERM', async () => {
pino.info('Received SIGTERM');
await stop();
process.exit(0);
});
// Start worker if run directly (node dlq_archival_worker.js); when
// require()d as a module, the caller drives the lifecycle via start()/stop().
if (require.main === module) {
start().catch(error => {
pino.fatal({ err: error }, 'Fatal error starting worker');
process.exit(1);
});
}
module.exports = { start, stop };