agmission/Development/server/helpers/dlq_queue_setup.js

375 lines
12 KiB
JavaScript

'use strict';
const amqp = require('amqplib');
const env = require('./env');
const pino = require('./logger').child('dlq_queue_setup');
/**
* DLQ Queue Setup Helper Module
*
* Provides reusable functions to set up Dead Letter Queue (DLQ) infrastructure
* for any RabbitMQ queue with configurable TTL and archival routing.
*
* Architecture:
* - Main Queue → DLQ (with TTL) → Archive Exchange → Archive Queue → Filesystem
*
* Usage:
* const { setupDLQQueues, getDLQConnection } = require('../helpers/dlq_queue_setup');
*
* // In worker startup:
* const { connection, channel } = await setupDLQQueues('partner_tasks');
*/
/**
 * Open a RabbitMQ connection via amqplib and attach logging listeners.
 *
 * Every setting is resolved in the order: explicit option → environment
 * value → hard-coded fallback (the password has no hard-coded fallback).
 * Resolution uses `||`, so falsy option values fall through to the next source.
 *
 * @param {Object} [options] - Connection overrides
 * @param {string} [options.hostname] - Broker host (env.QUEUE_HOST, then 'localhost')
 * @param {number} [options.port] - Broker port (env.QUEUE_PORT, then 5672)
 * @param {string} [options.username] - User name (env.QUEUE_USR, then 'agmuser')
 * @param {string} [options.password] - Password (env.QUEUE_PWD)
 * @param {string} [options.vhost] - Virtual host (env.QUEUE_VHOST, then '/')
 * @param {number} [options.heartbeat] - Heartbeat (env.QUEUE_HEARTBEAT, then 580)
 * @param {number} [options.frameMax] - Frame size limit (defaults to 0)
 * @returns {Promise<Object>} The connected amqplib connection
 */
async function getDLQConnection(options = {}) {
  const {
    hostname,
    port,
    username,
    password,
    vhost,
    heartbeat,
    frameMax
  } = options;

  const connection = await amqp.connect({
    protocol: 'amqp',
    hostname: hostname || env.QUEUE_HOST || 'localhost',
    port: port || env.QUEUE_PORT || 5672,
    username: username || env.QUEUE_USR || 'agmuser',
    password: password || env.QUEUE_PWD,
    vhost: vhost || env.QUEUE_VHOST || '/',
    heartbeat: heartbeat || env.QUEUE_HEARTBEAT || 580,
    frameMax: frameMax || 0
  });

  // Listeners only log; reconnect policy is left to the caller.
  connection.on('error', function onConnectionError(err) {
    pino.error({ err }, '[DLQ Setup] Connection error');
  });
  connection.on('close', function onConnectionClose() {
    pino.warn('[DLQ Setup] Connection closed');
  });

  return connection;
}
/**
 * Tell whether an amqplib error carries a given AMQP reply code.
 * Falls back to the broker's textual label for errors without a `code` field.
 *
 * @param {Error} err - Error rejected by an amqplib call
 * @param {number} code - AMQP reply code (e.g. 404, 406)
 * @param {string} label - Text the broker puts in the message (e.g. 'NOT_FOUND')
 * @returns {boolean} true when the error matches
 */
function isAmqpReplyCode(err, code, label) {
  if (!err) return false;
  if (err.code === code) return true;
  return typeof err.message === 'string' && err.message.includes(label);
}

/**
 * Check whether a queue exists using a throw-away channel.
 *
 * A passive declare (checkQueue) on a missing queue makes the broker close the
 * channel it was issued on, so the probe must never run on the work channel.
 *
 * @param {Object} connection - Open AMQP connection
 * @param {string} queueName - Queue to look for
 * @returns {Promise<boolean>} true if the queue exists, false on 404 NOT_FOUND
 * @throws {Error} Any failure other than NOT_FOUND
 */
async function probeQueueExists(connection, queueName) {
  const probe = await connection.createChannel();
  // An 'error' event without a listener would be thrown as an uncaught exception.
  probe.on('error', () => {});
  try {
    await probe.checkQueue(queueName);
    return true;
  } catch (err) {
    if (isAmqpReplyCode(err, 404, 'NOT_FOUND')) return false;
    throw err;
  } finally {
    try {
      await probe.close();
    } catch (closeErr) {
      // Expected when the broker already closed the probe channel.
      pino.debug({ err: closeErr }, '[DLQ Setup] Probe channel already closed');
    }
  }
}

/**
 * Open the channel handed back to the caller: error listener + prefetch applied.
 *
 * @param {Object} connection - Open AMQP connection
 * @param {number} prefetch - Consumer prefetch count
 * @returns {Promise<Object>} Ready-to-use channel
 */
async function openSetupChannel(connection, prefetch) {
  const channel = await connection.createChannel();
  channel.on('error', (err) => {
    pino.error({ err }, '[DLQ Setup] Channel error');
  });
  try {
    await channel.prefetch(prefetch);
  } catch (err) {
    try {
      await channel.close();
    } catch (closeErr) {
      pino.debug({ err: closeErr }, '[DLQ Setup] Error closing channel (may already be closed)');
    }
    throw err;
  }
  return channel;
}

/**
 * Set up complete DLQ infrastructure for a queue
 *
 * Declares, in order: the archive exchange (fanout), the archive queue and its
 * binding, the DLQ `<queueName>_failed` (TTL + dead-lettering to the archive
 * exchange) and finally the main queue (dead-lettering to the DLQ through the
 * default exchange). Queues that already exist are left untouched so their
 * current arguments never trigger PRECONDITION_FAILED.
 *
 * @param {string} queueName - Name of the main queue (e.g., 'partner_tasks')
 * @param {Object} options - Configuration options
 * @param {Object} options.connection - Existing AMQP connection (optional)
 * @param {number} options.retentionDays - DLQ retention in days (default: env.DLQ_RETENTION_DAYS, then 365)
 * @param {boolean} options.durable - Main queue durability (default: true)
 * @param {number} options.prefetch - Consumer prefetch count (default: 1)
 * @param {Object} options.queueArgs - Additional queue arguments, applied to both the DLQ and the main queue (optional)
 *
 * @returns {Promise<Object>} { connection, channel, queueNames: { main, dlq, archive, archiveExchange } }
 * @throws {TypeError} If queueName is not a non-empty string or retentionDays is not a non-negative number
 * @throws {Error} Any broker error; the channel (and a connection created here) is closed first
 */
async function setupDLQQueues(queueName, options = {}) {
  const {
    connection: existingConnection,
    retentionDays = env.DLQ_RETENTION_DAYS || 365,
    durable = true,
    prefetch = 1,
    queueArgs = {}
  } = options;

  // Validate before touching the broker so bad input cannot leak a connection.
  if (typeof queueName !== 'string' || queueName.length === 0) {
    throw new TypeError('[DLQ Setup] queueName must be a non-empty string');
  }
  // env values may arrive as strings; x-message-ttl must be a non-negative integer.
  const retentionMs = Math.round(Number(retentionDays) * 24 * 60 * 60 * 1000);
  if (!Number.isFinite(retentionMs) || retentionMs < 0) {
    throw new TypeError(`[DLQ Setup] Invalid retentionDays: ${retentionDays}`);
  }

  // Queue names - use the `_failed` suffix as the canonical DLQ name
  const dlqName = `${queueName}_failed`;
  const archiveExchange = `${queueName}_archive_exchange`;
  const archiveQueue = `${queueName}_archive`;

  // Use existing connection or create new one
  const connection = existingConnection || await getDLQConnection();
  let channel = null;

  pino.info(`[DLQ Setup] Setting up DLQ infrastructure for queue: ${queueName}`);
  pino.info(`[DLQ Setup] Retention period: ${retentionDays} days (${retentionMs}ms)`);

  try {
    channel = await openSetupChannel(connection, prefetch);

    // 1. Create archive exchange (fanout - broadcasts to all bound queues)
    await channel.assertExchange(archiveExchange, 'fanout', { durable: true });
    pino.debug(`[DLQ Setup] Created archive exchange: ${archiveExchange}`);

    // 2. Create archive queue (receives expired DLQ messages)
    await channel.assertQueue(archiveQueue, { durable: true });
    pino.debug(`[DLQ Setup] Created archive queue: ${archiveQueue}`);

    // 3. Bind archive queue to archive exchange
    await channel.bindQueue(archiveQueue, archiveExchange, '');
    pino.debug(`[DLQ Setup] Bound archive queue to exchange`);

    // We always use the `<queue>_failed` DLQ name per DLQ_SYSTEM_GUIDE.md
    pino.debug(`[DLQ Setup] Using DLQ name: ${dlqName}`);

    // 4. Ensure DLQ exists (create if not present)
    try {
      if (await probeQueueExists(connection, dlqName)) {
        pino.debug(`[DLQ Setup] DLQ already exists: ${dlqName}`);
      } else {
        await channel.assertQueue(dlqName, {
          durable: true,
          arguments: {
            'x-message-ttl': retentionMs, // Messages expire after retention period
            'x-dead-letter-exchange': archiveExchange, // Route expired messages to archive
            ...queueArgs
          }
        });
        pino.debug(`[DLQ Setup] Created DLQ: ${dlqName} (TTL: ${retentionMs}ms)`);
      }
    } catch (err) {
      pino.error({ err }, '[DLQ Setup] Failed to ensure DLQ exists');
      throw err;
    }

    // 5. Create main queue with DLQ routing only if it does not already exist
    if (await probeQueueExists(connection, queueName)) {
      pino.debug(`[DLQ Setup] Main queue already exists: ${queueName} — leaving existing arguments intact`);
    } else {
      try {
        await channel.assertQueue(queueName, {
          durable,
          arguments: {
            'x-dead-letter-exchange': '', // Use default exchange
            'x-dead-letter-routing-key': dlqName, // Route failed messages to DLQ
            ...queueArgs
          }
        });
        pino.debug(`[DLQ Setup] Created main queue: ${queueName} with DLQ routing to ${dlqName}`);
      } catch (error) {
        // Race with another creator: the queue appeared with different arguments.
        if (!isAmqpReplyCode(error, 406, 'PRECONDITION_FAILED')) throw error;
        pino.warn({ err: error }, `[DLQ Setup] PRECONDITION_FAILED while declaring ${queueName}; assuming existing queue configuration will be used`);
        // The broker closed the work channel along with the failed declare.
        channel = null;
        if (!(await probeQueueExists(connection, queueName))) {
          pino.error({ err: error }, `[DLQ Setup] checkQueue failed after PRECONDITION_FAILED for ${queueName}`);
          throw error;
        }
        channel = await openSetupChannel(connection, prefetch);
      }
    }

    pino.info(`[DLQ Setup] Successfully configured DLQ infrastructure for ${queueName}`);
    pino.info(`[DLQ Setup] Flow: ${queueName} → ${dlqName} (${retentionDays}d TTL) → ${archiveQueue}`);

    return {
      connection,
      channel,
      queueNames: {
        main: queueName,
        dlq: dlqName,
        archive: archiveQueue,
        archiveExchange
      }
    };
  } catch (error) {
    pino.error({ err: error, queueName }, '[DLQ Setup] Failed to set up DLQ infrastructure');
    // Release what this call opened; a caller-supplied connection stays open.
    await closeConnection(existingConnection ? null : connection, channel);
    throw error;
  }
}
/**
 * Read the message and consumer counts of a queue.
 *
 * Never rejects on a lookup failure: the failure is logged and a sentinel
 * object ({ messageCount: -1, consumerCount: 0, error }) is returned instead.
 *
 * NOTE(review): amqplib documents that checkQueue on a missing queue causes
 * the channel to be closed — confirm callers only pass queues known to exist
 * or use a disposable channel.
 *
 * @param {Object} channel - AMQP channel
 * @param {string} queueName - Name of the queue
 * @returns {Promise<Object>} { messageCount, consumerCount } or the sentinel with `error`
 */
async function getQueueStats(channel, queueName) {
  let stats;
  try {
    const { messageCount, consumerCount } = await channel.checkQueue(queueName);
    stats = { messageCount, consumerCount };
  } catch (error) {
    pino.error({ err: error, queueName }, '[DLQ Setup] Failed to get queue stats');
    stats = {
      messageCount: -1,
      consumerCount: 0,
      error: error.message
    };
  }
  return stats;
}
/**
 * Enrich message with DLQ metadata headers
 *
 * @param {Object} taskInfo - Task information object; null/undefined is treated as an empty task
 * @param {Error|string} error - Error object or message (optional)
 * @param {Object} additionalHeaders - Additional headers to include (optional)
 * @returns {Object} Message headers for DLQ
 *
 * Headers are for filtering/identification only. Task data contains all fields for reprocessing.
 *
 * Diagnostic Headers (All Queues):
 * - x-first-death-time: Timestamp of failure (ISO 8601, taken when this function runs)
 * - x-task-type: Type of task being processed ('unknown' when absent)
 * - x-error-category: Error classification (only when `error` is given)
 * - x-error-reason: Error message, truncated to 255 characters (only when `error` is given)
 * - x-severity: Alert severity (only when `error` is given)
 *
 * Context Headers (partner_tasks queue):
 * - x-partner-code: Partner identifier (for filtering)
 * - x-customer-id: Customer ObjectId (for filtering)
 *
 * `additionalHeaders` may override the two base headers, but the context and
 * error headers computed here are written afterwards and take precedence.
 */
function createDLQHeaders(taskInfo, error = null, additionalHeaders = {}) {
  // This runs on the failure path, where the task may never have been parsed;
  // a missing taskInfo must not turn header creation into a second failure.
  const task = taskInfo || {};

  const headers = {
    'x-first-death-time': new Date().toISOString(),
    'x-task-type': task.taskType || 'unknown',
    ...additionalHeaders
  };

  // Add partner context headers for filtering (partner_tasks queue only)
  if (task.partnerCode) headers['x-partner-code'] = task.partnerCode;
  if (task.customerId) headers['x-customer-id'] = String(task.customerId);

  // Add error metadata if error provided
  if (error) {
    // Coerce so a non-string `message` (or a bare string error) cannot break substring().
    const errorMessage = String(error.message || error);
    headers['x-error-reason'] = errorMessage.substring(0, 255); // Limit length
    headers['x-error-category'] = categorizeError(errorMessage);
    headers['x-severity'] = calculateSeverity(errorMessage);
  }

  return headers;
}
/**
 * Classify an error message into a coarse category by keyword matching.
 *
 * The message is lower-cased and tested against keyword groups in a fixed
 * order; the first group with a matching substring wins. Infrastructure is
 * tested before transient so that e.g. "database connection" is reported as
 * infrastructure rather than transient.
 *
 * @param {string} errorMessage - Error text to classify
 * @returns {string} 'infrastructure' | 'transient' | 'validation' | 'processing' |
 *                   'partner_api' | 'unknown' (also for empty/missing input)
 */
function categorizeError(errorMessage) {
  if (!errorMessage) return 'unknown';

  const text = errorMessage.toLowerCase();

  // Order matters: earlier entries take precedence over later ones.
  const rules = [
    ['infrastructure', ['database', 'mongo', 'filesystem', 'disk']],
    ['transient', ['timeout', 'econnrefused', 'enotfound', 'network', 'connection']],
    ['validation', ['validation', 'invalid', 'required', 'missing', 'format']],
    ['processing', ['parse', 'calculation', 'processing', 'data']],
    ['partner_api', ['api', 'authentication', 'unauthorized', 'rate limit']]
  ];

  const match = rules.find(([, keywords]) =>
    keywords.some((keyword) => text.includes(keyword))
  );

  return match ? match[0] : 'unknown';
}
/**
 * Derive an alert severity from an error message by keyword matching.
 *
 * The message is lower-cased and tested against severity tiers from most to
 * least severe; the first tier with a matching substring wins. Anything that
 * matches no tier — including empty/missing input — is 'low'.
 *
 * @param {string} errorMessage - Error text to grade
 * @returns {string} 'critical' | 'high' | 'medium' | 'low'
 */
function calculateSeverity(errorMessage) {
  if (!errorMessage) return 'low';

  const text = errorMessage.toLowerCase();

  // Ordered from most to least severe; the first hit decides.
  const tiers = [
    // Critical - data loss or corruption
    { level: 'critical', keywords: ['corrupt', 'data loss', 'fatal'] },
    // High - business impact
    { level: 'high', keywords: ['authentication', 'authorization', 'database', 'disk full'] },
    // Medium - retryable errors
    { level: 'medium', keywords: ['timeout', 'connection', 'rate limit'] }
  ];

  for (const { level, keywords } of tiers) {
    if (keywords.some((keyword) => text.includes(keyword))) {
      return level;
    }
  }

  // Low - validation or expected errors
  return 'low';
}
/**
 * Best-effort shutdown of an AMQP channel and connection.
 *
 * The channel is closed first, then the connection. Either argument may be
 * omitted (falsy values are skipped). Close failures are logged at debug
 * level and never propagated.
 *
 * @param {Object} [connection] - AMQP connection to close
 * @param {Object} [channel] - AMQP channel to close
 * @returns {Promise<void>}
 */
async function closeConnection(connection, channel) {
  const steps = [
    {
      target: channel,
      closedMsg: '[DLQ Setup] Channel closed',
      failedMsg: '[DLQ Setup] Error closing channel (may already be closed)'
    },
    {
      target: connection,
      closedMsg: '[DLQ Setup] Connection closed',
      failedMsg: '[DLQ Setup] Error closing connection (may already be closed)'
    }
  ];

  for (const { target, closedMsg, failedMsg } of steps) {
    if (!target) continue;
    try {
      await target.close();
      pino.debug(closedMsg);
    } catch (err) {
      pino.debug({ err }, failedMsg);
    }
  }
}
// Public API of this module: queue/connection setup and teardown, queue
// statistics, and the DLQ header helpers (including the two classifiers that
// createDLQHeaders relies on).
module.exports = {
setupDLQQueues,
getDLQConnection,
getQueueStats,
createDLQHeaders,
categorizeError,
calculateSeverity,
closeConnection
};