'use strict'; const { AppError, AppParamError } = require('../helpers/app_error'); const { Errors, PartnerLogTrackerStatus } = require('../helpers/constants'); const PartnerLogTracker = require('../model/partner_log_tracker'); const amqp = require('amqplib'); const axios = require('axios'); const env = require('../helpers/env'); const pino = require('../helpers/logger').child('dlq'); /** * RabbitMQ Management HTTP API helper for non-destructive queue operations * Requires RabbitMQ Management plugin to be enabled */ const RABBITMQ_MGMT_PORT = process.env.RABBITMQ_MGMT_PORT || 15672; const RABBITMQ_MGMT_ENABLED = process.env.RABBITMQ_MGMT_ENABLED !== 'false'; /** * Peek at messages using RabbitMQ Management API (non-destructive) * @param {string} queueName - Queue name * @param {number} count - Number of messages to peek * @returns {Promise} Messages without consuming them */ async function peekMessagesViaManagementAPI(queueName, count = 50) { try { const vhost = encodeURIComponent(env.QUEUE_VHOST || '/'); const queueEncoded = encodeURIComponent(queueName); const url = `http://${env.QUEUE_HOST}:${RABBITMQ_MGMT_PORT}/api/queues/${vhost}/${queueEncoded}/get`; const response = await axios.post( url, { count, ackmode: 'ack_requeue_true', // Requeue messages immediately (non-destructive) encoding: 'auto', truncate: 50000 }, { auth: { username: env.QUEUE_USR, password: env.QUEUE_PWD }, timeout: 10000 } ); return response.data.map((msg, idx) => ({ position: idx + 1, payload: msg.payload, properties: msg.properties, exchange: msg.exchange, routing_key: msg.routing_key, message_count: msg.message_count, redelivered: msg.redelivered })); } catch (error) { pino.warn({ err: error }, 'Management API not available for peeking messages'); return null; } } /** * Utility: Create RabbitMQ connection * @returns {Promise} connection object */ async function createRabbitMQConnection() { return await amqp.connect({ protocol: 'amqp', hostname: env.QUEUE_HOST || 'localhost', port: env.QUEUE_PORT || 5672, username: env.QUEUE_USR, password: env.QUEUE_PWD }); } /** * Utility: Get DLQ name from main queue name * @param {string} queueName - Main queue name * @returns {string} DLQ name with _failed suffix */ function getDLQName(queueName) { return `${queueName}_failed`; } /** * Utility: Check if queue is a partner queue * @param {string} queueName - Queue name to check * @returns {boolean} True if queue is partner-related */ function isPartnerQueue(queueName) { return queueName.includes('partner'); } /** * Utility: Assert both main queue and DLQ exist * @param {Object} channel - RabbitMQ channel * @param {string} queueName - Main queue name * @param {string} dlqName - DLQ name * @param {boolean} withDLX - Whether to configure DLX on main queue */ async function assertQueues(channel, queueName, dlqName, withDLX = false) { await channel.assertQueue(dlqName, { durable: true }); if (withDLX) { try { await channel.assertQueue(queueName, { durable: true, arguments: { 'x-dead-letter-exchange': '', 'x-dead-letter-routing-key': dlqName } }); } catch (error) { if (error.message && error.message.includes('PRECONDITION_FAILED')) { // Queue exists with different configuration - use as-is await channel.assertQueue(queueName, { durable: true }); pino.warn('Using existing queue without DLX configuration: %s', queueName); } else { throw error; } } } else { await channel.assertQueue(queueName, { durable: true }); } } /** * Utility: Get partner tracker statistics (partner queues only) * @returns {Promise} Tracker statistics object */ async function getPartnerTrackerStats() { const trackerStats = await PartnerLogTracker.aggregate([ { $group: { _id: '$status', count: { $sum: 1 } } } ]); const trackers = { [PartnerLogTrackerStatus.FAILED]: 0, [PartnerLogTrackerStatus.PROCESSING]: 0, [PartnerLogTrackerStatus.DOWNLOADED]: 0, [PartnerLogTrackerStatus.PROCESSED]: 0, [PartnerLogTrackerStatus.ARCHIVED]: 0 }; trackerStats.forEach(stat => { if (trackers.hasOwnProperty(stat._id)) { trackers[stat._id] = stat.count; } }); return trackers; } /** * Utility: Get recent partner failures (partner queues only) * @param {number} limit - Maximum number of failures to return * @returns {Promise} Array of formatted failure records */ async function getPartnerRecentFailures(limit = 20) { const recentFailures = await PartnerLogTracker.find({ status: PartnerLogTrackerStatus.FAILED }) .sort({ updatedAt: -1 }) .limit(limit) .select('_id logFileName partnerCode errorMessage retryCount updatedAt') .populate('customerId', 'name username') .lean(); return recentFailures.map(f => ({ id: f._id.toString(), logFileName: f.logFileName, partnerCode: f.partnerCode, customer: f.customerId, errorMessage: f.errorMessage, retryCount: f.retryCount, failedAt: f.updatedAt })); } /** * Utility: Close RabbitMQ channel and connection safely * @param {Object} channel - RabbitMQ channel to close * @param {Object} connection - RabbitMQ connection to close */ async function closeRabbitMQ(channel, connection) { if (channel) await channel.close().catch(() => {}); if (connection) await connection.close().catch(() => {}); } /** * @api {get} /api/dlq/:queueName/stats Get DLQ Statistics * @apiName GetDLQStats * @apiGroup DLQ * @apiDescription Get comprehensive statistics about a specific Dead Letter Queue * * Queue-agnostic: Works for any queue type (jobs, partner_tasks, etc.) * Partner queues get additional tracker statistics and recent failure details. * * @apiParam {String} queueName Queue name (e.g., 'partner_tasks', 'dev_jobs') * * @apiSuccess {Object} dlq DLQ message queue statistics * @apiSuccess {Number} dlq.messageCount Number of messages in DLQ * @apiSuccess {Number} dlq.consumerCount Number of active consumers * @apiSuccess {String} dlq.queueName Name of the DLQ * @apiSuccess {Object} [trackers] Tracker status counts (partner queues only) * @apiSuccess {Array} [recentFailures] Recent failed tasks with details (partner queues only) */ exports.getDLQStats_get = async (req, res, next) => { let connection, channel; try { const queueName = req.params.queueName; if (!queueName) { throw new AppParamError('queueName parameter is required'); } const dlqName = getDLQName(queueName); let queueInfo; try { connection = await createRabbitMQConnection(); channel = await connection.createChannel(); await channel.assertQueue(dlqName, { durable: true }); queueInfo = await channel.checkQueue(dlqName); } catch (error) { pino.error('Error connecting to RabbitMQ:', error); queueInfo = { messageCount: -1, consumerCount: 0, error: error.message }; } finally { await closeRabbitMQ(channel, connection); } const response = { dlq: { messageCount: queueInfo.messageCount, consumerCount: queueInfo.consumerCount, queueName: dlqName, ...(queueInfo.error && { error: queueInfo.error }) } }; // Only query trackers for partner queues (partner-specific feature) if (isPartnerQueue(queueName)) { response.trackers = await getPartnerTrackerStats(); response.recentFailures = await getPartnerRecentFailures(20); } res.json(response); } catch (error) { pino.error('Error getting DLQ stats:', error); next(new AppError(Errors.UNKNOWN_APP_ERROR, 'Failed to get DLQ statistics')); } }; /** * @api {get} /api/dlq/:queueName/messages Get DLQ Messages * @apiName GetDLQMessages * @apiGroup DLQ * @apiDescription Retrieve messages from the Dead Letter Queue without consuming them (non-destructive peek) * * Requires RabbitMQ Management plugin enabled: rabbitmq-plugins enable rabbitmq_management * * @apiQuery {Number} [limit=50] Maximum number of messages to retrieve * * @apiSuccess {Array} messages Array of DLQ messages * @apiSuccess {Number} messages.position Message position in queue * @apiSuccess {Object} messages.taskInfo Task information * @apiSuccess {String} messages.errorMessage Error message if available * @apiSuccess {Number} messages.retryCount Number of retries * @apiSuccess {Date} messages.enqueuedAt When message was added to DLQ * @apiSuccess {Object} messages.headers Message headers * @apiSuccess {Boolean} messages.redelivered Whether message was redelivered * @apiSuccess {Number} count Number of messages returned * @apiSuccess {String} queueName DLQ name * @apiSuccess {String} method Always 'management-api' * * @apiError (409) {Object} error Error object * @apiError (409) {String} error..tag Error constant ('unknown_app_error') * @apiError (409) {String} error.message 'RabbitMQ Management API not available' */ exports.getDLQMessages_get = async (req, res, next) => { const limit = parseInt(req.query.limit) || 50; const queueName = req.params.queueName; if (!queueName) { throw new AppParamError('queueName parameter is required'); } const dlqName = getDLQName(queueName); // Check if Management API is enabled if (!RABBITMQ_MGMT_ENABLED) { pino.error('RabbitMQ Management API is disabled in configuration'); throw new AppError( Errors.RABBITMQ_MGMT_DISABLED, 'RabbitMQ Management API not available. Set RABBITMQ_MGMT_ENABLED=true and enable plugin: rabbitmq-plugins enable rabbitmq_management' ); } // Use Management API for non-destructive peek const mgmtMessages = await peekMessagesViaManagementAPI(dlqName, limit); if (!mgmtMessages) { pino.error('Failed to retrieve messages via Management API'); throw new AppError( Errors.RABBITMQ_MGMT_DISABLED, 'RabbitMQ Management API not available. Ensure plugin is enabled: rabbitmq-plugins enable rabbitmq_management Or the access credentials have monitoring permissions.' ); } pino.debug(`Retrieved ${mgmtMessages.length} messages via Management API (non-destructive)`); // Format messages for response const formatted = mgmtMessages.map(msg => { try { const content = typeof msg.payload === 'string' ? JSON.parse(msg.payload) : msg.payload; return { position: msg.position, taskInfo: content.taskInfo || content, errorMessage: content.errorMessage, retryCount: content.retryCount || 0, enqueuedAt: msg.properties?.timestamp || null, headers: msg.properties?.headers, redelivered: msg.redelivered }; } catch (parseError) { pino.error('Error parsing message:', parseError); return { position: msg.position, parseError: parseError.message, rawContent: JSON.stringify(msg.payload).substring(0, 100) }; } }); res.json({ messages: formatted, count: formatted.length, queueName: dlqName, method: 'management-api' }); }; /** * @api {post} /api/dlq/:queueName/process Process DLQ * @apiName ProcessDLQ * @apiGroup DLQ * @apiDescription Process messages in the Dead Letter Queue - categorize errors and retry/archive * * @apiBody {Number} [maxMessages=100] Maximum number of messages to process * @apiBody {Boolean} [dryRun=false] If true, only analyze without taking action * * @apiSuccess {Number} processed Number of messages processed * @apiSuccess {Number} retried Number of messages retried * @apiSuccess {Number} archived Number of messages archived * @apiSuccess {Object} categorization Error categorization results */ exports.processDLQ_post = async (req, res, next) => { let connection, channel; try { const maxMessages = parseInt(req.body.maxMessages) || 100; const dryRun = req.body.dryRun === true; const queueName = req.params.queueName; if (!queueName) { throw new AppParamError('queueName parameter is required'); } const dlqName = getDLQName(queueName); connection = await createRabbitMQConnection(); channel = await connection.createChannel(); await assertQueues(channel, queueName, dlqName, true); const results = { processed: 0, retried: 0, archived: 0, categorization: { transient: 0, validation: 0, processing: 0, infrastructure: 0, partner_api: 0, unknown: 0 } }; // Process messages for (let i = 0; i < maxMessages; i++) { const msg = await channel.get(dlqName, { noAck: false }); if (!msg) break; try { const taskInfo = JSON.parse(msg.content.toString()); results.processed++; // Only use tracker for partner queues (partner-specific feature) const partnerQueue = isPartnerQueue(queueName); let category = 'unknown'; let messageAge = 0; let tracker = null; if (partnerQueue) { // Get tracker info for error categorization tracker = await PartnerLogTracker.findOne({ logFileName: taskInfo.logFileName, partnerId: taskInfo.partnerId, customerId: taskInfo.customerId }); if (!tracker) { pino.warn(`No tracker found for ${taskInfo.logFileName}`); channel.ack(msg); continue; } // Categorize error category = categorizeError(tracker.errorMessage); results.categorization[category]++; // Determine action based on category and age messageAge = Date.now() - new Date(tracker.updatedAt).getTime(); } else { // For non-partner queues, categorize from message headers const errorMsg = msg.properties?.headers?.['x-error-message']; category = categorizeError(errorMsg); results.categorization[category]++; messageAge = msg.properties?.headers?.['x-failed-at'] ? Date.now() - new Date(msg.properties.headers['x-failed-at']).getTime() : 0; } // Determine action based on category and age const AUTO_RETRY_WINDOW_MS = 2 * 60 * 60 * 1000; // 2 hours const MAX_AGE_MS = 24 * 60 * 60 * 1000; // 24 hours let action = 'keep'; if (category === 'transient' && messageAge < AUTO_RETRY_WINDOW_MS) { action = 'retry'; } else if (category === 'validation' || messageAge > MAX_AGE_MS) { action = 'archive'; } if (!dryRun) { if (action === 'retry') { // Reset tracker if partner queue if (partnerQueue && tracker) { await PartnerLogTracker.updateOne( { _id: tracker._id }, { $set: { status: PartnerLogTrackerStatus.DOWNLOADED }, $unset: { errorMessage: 1 } } ); } // Send back to main queue channel.sendToQueue(queueName, msg.content, { persistent: true, headers: { ...msg.properties.headers, 'x-retry-from-dlq': true, 'x-dlq-retry-time': new Date().toISOString() } }); results.retried++; channel.ack(msg); } else if (action === 'archive') { // Archive the tracker if partner queue if (partnerQueue && tracker) { await PartnerLogTracker.updateOne( { _id: tracker._id }, { $set: { status: PartnerLogTrackerStatus.ARCHIVED, archivedAt: new Date(), archivedReason: `DLQ: ${category} error, age: ${Math.round(messageAge / 3600000)}h` } } ); } results.archived++; channel.ack(msg); } else { // Keep in DLQ for manual review channel.nack(msg, false, true); } } else { // Dry run - just requeue channel.nack(msg, false, true); } } catch (error) { pino.error('Error processing DLQ message:', error); channel.nack(msg, false, true); } } res.json({ ...results, dryRun, timestamp: new Date().toISOString() }); } catch (error) { pino.error('Error processing DLQ:', error); next(new AppError(Errors.UNKNOWN_APP_ERROR, 'Failed to process DLQ')); } finally { await closeRabbitMQ(channel, connection); } }; /** * @api {post} /api/partners/dlq/retry/:id Retry Failed Task (DEPRECATED - Partner-Specific) * @apiName RetryFailedTask * @apiGroup PartnerDLQ * @apiDescription DEPRECATED: Old tracker-ID-based retry (partner-specific only) * * Use queue-native methods instead: * - POST /api/dlq/:queueName/retryAll * - POST /api/dlq/:queueName/retryByPosition * - POST /api/dlq/:queueName/retryByHeader * * @apiParam {String} id Tracker ID (MongoDB _id) * @apiParam {String} queueName Queue name (should be 'partner_tasks') * * @apiSuccess {Boolean} success Whether retry was successful * @apiSuccess {String} message Status message * @apiSuccess {Object} taskInfo Task information * * @deprecated Use queue-native retry methods instead. This method only works for partner_tasks queue. */ exports.retryFailedTask_post = async (req, res, next) => { let connection, channel; try { const { id } = req.params; const queueName = req.params.queueName; // This is a partner-specific legacy method if (!queueName || !isPartnerQueue(queueName)) { pino.warn('retryFailedTask_post called for non-partner queue, use queue-native methods instead'); throw new AppParamError('This method is for partner queues only. Use /api/dlq/:queueName/retryAll instead'); } // Find the tracker const tracker = await PartnerLogTracker.findById(id); if (!tracker) { throw new AppParamError('Partner log tracker not found'); } if (tracker.status !== PartnerLogTrackerStatus.FAILED && tracker.status !== PartnerLogTrackerStatus.ARCHIVED) { throw new AppParamError(`Cannot retry task with status: ${tracker.status}`); } connection = await createRabbitMQConnection(); channel = await connection.createChannel(); await channel.assertQueue(queueName, { durable: true }); // Reset tracker status await PartnerLogTracker.updateOne( { _id: tracker._id }, { $set: { status: PartnerLogTrackerStatus.DOWNLOADED, processingStartedAt: null }, $unset: { errorMessage: 1 } } ); // Send to queue const taskInfo = { logFileName: tracker.logFileName, partnerId: tracker.partnerId.toString(), customerId: tracker.customerId.toString() }; channel.sendToQueue(queueName, Buffer.from(JSON.stringify(taskInfo)), { persistent: true, headers: { 'x-manual-retry': true, 'x-retry-time': new Date().toISOString(), 'x-retry-by': req.user?.username || 'admin' } }); pino.info(`Manually retried task: ${tracker.logFileName}`); res.json({ success: true, message: 'Task has been queued for retry', taskInfo }); } catch (error) { if (error instanceof AppParamError) { next(error); } else { pino.error('Error retrying task:', error); next(new AppError(Errors.UNKNOWN_APP_ERROR, 'Failed to retry task')); } } finally { await closeRabbitMQ(channel, connection); } }; /** * @api {post} /api/partners/dlq/archive/:id Archive Failed Task (DEPRECATED - Partner-Specific) * @apiName ArchiveFailedTask * @apiGroup PartnerDLQ * @apiDescription DEPRECATED: Old tracker-ID-based archive (partner-specific only) * * Archive functionality should now use DLQ purge or message-specific operations. * * @apiParam {String} id Tracker ID (MongoDB _id) * @apiBody {String} [reason] Archive reason * * @apiSuccess {Boolean} success Whether archive was successful * @apiSuccess {String} message Status message * * @deprecated Archive should be done via DLQ operations, not tracker updates. Partner-specific only. */ exports.archiveFailedTask_post = async (req, res, next) => { try { const { id } = req.params; const { reason } = req.body; // Find the tracker const tracker = await PartnerLogTracker.findById(id); if (!tracker) { throw new AppParamError('Partner log tracker not found'); } // Archive the tracker await PartnerLogTracker.updateOne( { _id: tracker._id }, { $set: { status: PartnerLogTrackerStatus.ARCHIVED, archivedAt: new Date(), archivedReason: reason || 'Manually archived', archivedBy: req.user?.username || 'admin' } } ); pino.info(`Archived task: ${tracker.logFileName}, reason: ${reason || 'Manual'}`); res.json({ success: true, message: 'Task has been archived' }); } catch (error) { if (error instanceof AppParamError) { next(error); } else { pino.error('Error archiving task:', error); next(new AppError(Errors.UNKNOWN_APP_ERROR, 'Failed to archive task')); } } }; /** * @api {delete} /api/dlq/:queueName/purge Purge DLQ * @apiName PurgeDLQ * @apiGroup DLQ * @apiDescription Purge all messages from the Dead Letter Queue (USE WITH CAUTION) * * @apiBody {Boolean} confirm Must be set to true to confirm purge * * @apiSuccess {Boolean} success Whether purge was successful * @apiSuccess {Number} purgedCount Number of messages purged */ exports.purgeDLQ_delete = async (req, res, next) => { let connection, channel; try { const { confirm } = req.body; if (confirm !== true) { throw new AppParamError('Must confirm purge by setting confirm=true'); } const queueName = req.params.queueName; if (!queueName) { throw new AppParamError('queueName parameter is required'); } const dlqName = getDLQName(queueName); connection = await createRabbitMQConnection(); channel = await connection.createChannel(); await channel.assertQueue(dlqName, { durable: true }); // Get count before purge const queueInfo = await channel.checkQueue(dlqName); const messageCount = queueInfo.messageCount; // Purge the queue await channel.purgeQueue(dlqName); pino.warn(`DLQ purged by ${req.user?.username || 'admin'}, ${messageCount} messages deleted`); res.json({ success: true, purgedCount: messageCount, message: `Purged ${messageCount} messages from DLQ` }); } catch (error) { if (error instanceof AppParamError) { next(error); } else { pino.error('Error purging DLQ:', error); next(new AppError(Errors.UNKNOWN_APP_ERROR, 'Failed to purge DLQ')); } } finally { await closeRabbitMQ(channel, connection); } }; /** * @api {post} /api/dlq/:queueName/:queueName/retryAll Retry All DLQ Messages * @apiName RetryAllDLQMessages * @apiGroup DLQ * @apiDescription Queue-native retry - moves all messages from DLQ back to main queue * * @apiParam {String} queueName Main queue name (e.g., 'partner_tasks') * @apiBody {Number} [maxMessages=100] Maximum number of messages to retry * * @apiSuccess {Boolean} success Whether retry was successful * @apiSuccess {Number} retriedCount Number of messages retried */ exports.retryAllDLQ_post = async (req, res, next) => { let connection, channel; try { const { queueName } = req.params; const maxMessages = parseInt(req.body.maxMessages) || 100; // Validate queue name if (!queueName || typeof queueName !== 'string') { throw new AppParamError('Invalid queue name'); } const dlqName = getDLQName(queueName); connection = await createRabbitMQConnection(); channel = await connection.createChannel(); // Check if queues exist await channel.checkQueue(queueName); await channel.checkQueue(dlqName); let retriedCount = 0; let failedCount = 0; // Move messages from DLQ to main queue (non-disruptive: ack only after successful send) for (let i = 0; i < maxMessages; i++) { const msg = await channel.get(dlqName, { noAck: false }); if (!msg) break; try { // Send to main queue with retry metadata await channel.sendToQueue(queueName, msg.content, { persistent: true, headers: { ...msg.properties.headers, 'x-retry-from-dlq': true, 'x-retry-time': new Date().toISOString(), 'x-retry-by': req.user?.username || 'admin', 'x-retry-method': 'retryAll' } }); // Only ack after successful send to main queue channel.ack(msg); retriedCount++; } catch (error) { pino.error({ err: error }, 'Error retrying message, keeping in DLQ'); // Reject and requeue to DLQ - message stays in DLQ on failure channel.nack(msg, false, true); failedCount++; } } pino.info(`Retried ${retriedCount} messages from ${dlqName} to ${queueName}, ${failedCount} failed`); res.json({ success: true, processed: retriedCount, retriedCount, // Deprecated: use 'processed' instead failedCount, queueName, dlqName }); } catch (error) { if (error instanceof AppParamError) { next(error); } else { pino.error('Error retrying all DLQ messages:', error); next(new AppError(Errors.UNKNOWN_APP_ERROR, 'Failed to retry DLQ messages')); } } finally { await closeRabbitMQ(channel, connection); } }; /** * @api {post} /api/dlq/:queueName/:queueName/retryByPosition Retry DLQ Message by Position * @apiName RetryDLQByPosition * @apiGroup DLQ * @apiDescription Queue-native retry - retry a specific message by its position in the DLQ * * @apiParam {String} queueName Main queue name (e.g., 'partner_tasks') * @apiBody {Number} position Position of the message in DLQ (0-based index) * * @apiSuccess {Boolean} success Whether retry was successful * @apiSuccess {Object} message Message information */ exports.retryDLQByPosition_post = async (req, res, next) => { let connection, channel; try { const { queueName } = req.params; const { position } = req.body; // Validate inputs if (!queueName || typeof queueName !== 'string') { throw new AppParamError('Invalid queue name'); } if (typeof position !== 'number' || position < 0) { throw new AppParamError('Position must be a non-negative number'); } const dlqName = getDLQName(queueName); connection = await createRabbitMQConnection(); channel = await connection.createChannel(); await channel.checkQueue(queueName); const dlqInfo = await channel.checkQueue(dlqName); if (position >= dlqInfo.messageCount) { throw new AppParamError(`Position ${position} is out of range (DLQ has ${dlqInfo.messageCount} messages)`); } // Collect and requeue messages before target (non-disruptive) const messagesToRequeue = []; let targetMessage = null; // Get messages up to and including target position for (let i = 0; i <= position; i++) { const msg = await channel.get(dlqName, { noAck: false }); if (!msg) { throw new AppError(Errors.UNKNOWN_APP_ERROR, 'Failed to retrieve message at position'); } if (i === position) { targetMessage = msg; } else { messagesToRequeue.push(msg); } } // Requeue the messages we skipped (non-disruptive: send then ack) for (const msg of messagesToRequeue) { try { await channel.sendToQueue(dlqName, msg.content, { persistent: true, headers: msg.properties.headers }); channel.ack(msg); } catch (error) { pino.error({ err: error }, 'Failed to requeue skipped message'); channel.nack(msg, false, true); // Keep in DLQ on failure } } // Retry the target message to main queue (non-disruptive: send then ack) if (targetMessage) { const taskInfo = JSON.parse(targetMessage.content.toString()); try { await channel.sendToQueue(queueName, targetMessage.content, { persistent: true, headers: { ...targetMessage.properties.headers, 'x-retry-from-dlq': true, 'x-retry-time': new Date().toISOString(), 'x-retry-by': req.user?.username || 'admin', 'x-retry-method': 'retryByPosition', 'x-retry-position': position } }); // Only ack after successful send to main queue channel.ack(targetMessage); pino.info(`Retried message at position ${position} from ${dlqName} to ${queueName}`); res.json({ success: true, message: { position, taskInfo, headers: targetMessage.properties.headers } }); } catch (error) { pino.error({ err: error }, 'Failed to send message to main queue, keeping in DLQ'); // Reject and requeue to DLQ - message stays in DLQ on failure channel.nack(targetMessage, false, true); throw new AppError(Errors.UNKNOWN_APP_ERROR, `Failed to retry message: ${error.message}`); } } else { throw new AppError(Errors.UNKNOWN_APP_ERROR, 'Failed to retrieve target message'); } } catch (error) { if (error instanceof AppParamError || error instanceof AppError) { next(error); } else { pino.error('Error retrying DLQ message by position:', error); next(new AppError(Errors.UNKNOWN_APP_ERROR, 'Failed to retry DLQ message')); } } finally { await closeRabbitMQ(channel, connection); } }; /** * @api {post} /api/dlq/:queueName/:queueName/retryByHeader Retry DLQ Messages by Header * @apiName RetryDLQByHeader * @apiGroup DLQ * @apiDescription Queue-native retry - retry messages matching specific header criteria (for filtering only) * * Diagnostic headers (all queues): x-error-category, x-task-type, x-severity * Context headers (partner_tasks): x-partner-code, x-customer-id * * @apiParam {String} queueName Main queue name (e.g., 'partner_tasks') * @apiBody {String} headerName Header name to match (e.g., 'x-partner-code', 'x-error-category') * @apiBody {String} headerValue Header value to match (e.g., 'SATLOC', 'transient') * @apiBody {Number} [maxMessages=100] Maximum number of messages to retry * * @apiSuccess {Boolean} success Whether retry was successful * @apiSuccess {Number} retriedCount Number of messages retried * @apiSuccess {Number} scannedCount Number of messages scanned */ exports.retryDLQByHeader_post = async (req, res, next) => { let connection, channel; try { const { queueName } = req.params; const { headerName, headerValue, maxMessages = 100 } = req.body; // Validate inputs if (!queueName || typeof queueName !== 'string') { throw new AppParamError('Invalid queue name'); } if (!headerName || typeof headerName !== 'string') { throw new AppParamError('Header name is required'); } if (headerValue === undefined || headerValue === null) { throw new AppParamError('Header value is required'); } const dlqName = getDLQName(queueName); connection = await createRabbitMQConnection(); channel = await connection.createChannel(); await channel.checkQueue(queueName); await channel.checkQueue(dlqName); let retriedCount = 0; let scannedCount = 0; let failedCount = 0; const messagesToRequeue = []; // Scan DLQ for matching messages (non-disruptive: send then ack) for (let i = 0; i < maxMessages * 2; i++) { // Scan up to 2x maxMessages to find matches const msg = await channel.get(dlqName, { noAck: false }); if (!msg) break; scannedCount++; const msgHeaderValue = msg.properties.headers?.[headerName]; if (msgHeaderValue === headerValue || String(msgHeaderValue) === String(headerValue)) { // Match found - retry to main queue (non-disruptive) try { await channel.sendToQueue(queueName, msg.content, { persistent: true, headers: { ...msg.properties.headers, 'x-retry-from-dlq': true, 'x-retry-time': new Date().toISOString(), 'x-retry-by': req.user?.username || 'admin', 'x-retry-method': 'retryByHeader', 'x-retry-header': `${headerName}=${headerValue}` } }); // Only ack after successful send to main queue channel.ack(msg); retriedCount++; if (retriedCount >= maxMessages) { break; } } catch (error) { pino.error({ err: error }, 'Failed to send matching message to main queue'); // Reject and requeue to DLQ - message stays in DLQ on failure channel.nack(msg, false, true); failedCount++; } } else { // No match - requeue to DLQ for later processing messagesToRequeue.push(msg); } } // Requeue non-matching messages back to DLQ (non-disruptive) for (const msg of messagesToRequeue) { try { await channel.sendToQueue(dlqName, msg.content, { persistent: true, headers: msg.properties.headers }); channel.ack(msg); } catch (error) { pino.error({ err: error }, 'Failed to requeue non-matching message'); channel.nack(msg, false, true); } } pino.info(`Retried ${retriedCount} messages matching ${headerName}=${headerValue} from ${dlqName}, ${failedCount} failed`); res.json({ success: true, retriedCount, scannedCount, failedCount, headerName, headerValue, queueName, dlqName }); } catch (error) { if (error instanceof AppParamError) { next(error); } else { pino.error('Error retrying DLQ messages by header:', error); next(new AppError(Errors.UNKNOWN_APP_ERROR, 'Failed to retry DLQ messages by header')); } } finally { await closeRabbitMQ(channel, connection); } }; /** * Helper function to categorize errors */ function categorizeError(errorMessage) { if (!errorMessage) return 'unknown'; const msg = errorMessage.toLowerCase(); // Transient errors if (msg.includes('timeout') || msg.includes('econnrefused') || msg.includes('enotfound') || msg.includes('network') || msg.includes('connection')) { return 'transient'; } // Validation errors if (msg.includes('validation') || msg.includes('invalid') || msg.includes('required') || msg.includes('missing') || msg.includes('format')) { return 'validation'; } // Processing errors if (msg.includes('parse') || msg.includes('calculation') || msg.includes('processing') || msg.includes('data')) { return 'processing'; } // Infrastructure errors if (msg.includes('database') || msg.includes('mongo') || msg.includes('filesystem') || msg.includes('disk')) { return 'infrastructure'; } // Partner API errors if (msg.includes('api') || msg.includes('authentication') || msg.includes('unauthorized') || msg.includes('rate limit')) { return 'partner_api'; } return 'unknown'; }