// agmission/Development/server/docs/archived/partner_dlq.js
// (archived source listing: 973 lines, 29 KiB, JavaScript)

'use strict';
const { AppError, AppParamError } = require('../helpers/app_error');
const { Errors } = require('../helpers/constants');
const PartnerLogTracker = require('../model/partner_log_tracker');
const amqp = require('amqplib');
const env = require('../helpers/env');
const pino = require('../helpers/logger').child('partner_dlq');
/**
* @api {get} /api/dlq/:queueName/stats Get DLQ Statistics
* @apiName GetDLQStats
* @apiGroup PartnerDLQ
* @apiDescription Get comprehensive statistics about the Dead Letter Queue and partner log processing
*
* @apiSuccess {Object} dlq DLQ message queue statistics
* @apiSuccess {Number} dlq.messageCount Number of messages in DLQ
* @apiSuccess {Number} dlq.consumerCount Number of active consumers
* @apiSuccess {String} dlq.queueName Name of the DLQ
* @apiSuccess {Object} trackers Partner log tracker status counts
* @apiSuccess {Number} trackers.failed Number of failed tasks
* @apiSuccess {Number} trackers.processing Number of currently processing tasks
* @apiSuccess {Number} trackers.downloaded Number of downloaded tasks
* @apiSuccess {Number} trackers.processed Number of successfully processed tasks
* @apiSuccess {Number} trackers.archived Number of archived tasks
* @apiSuccess {Array} recentFailures Recent failed tasks with details
*/
exports.getDLQStats_get = async (req, res, next) => {
try {
// Get queue statistics
const queueName = env.QUEUE_NAME_PARTNER;
const dlqName = `${queueName}_failed`;
let connection, channel, queueInfo;
try {
// Connect to RabbitMQ
connection = await amqp.connect({
protocol: 'amqp',
hostname: env.QUEUE_HOST || 'localhost',
port: env.QUEUE_PORT || 5672,
username: env.QUEUE_USR,
password: env.QUEUE_PWD
});
channel = await connection.createChannel();
await channel.assertQueue(dlqName, { durable: true });
queueInfo = await channel.checkQueue(dlqName);
} catch (error) {
pino.error('Error connecting to RabbitMQ:', error);
queueInfo = {
messageCount: -1,
consumerCount: 0,
error: error.message
};
} finally {
if (channel) await channel.close().catch(() => { });
if (connection) await connection.close().catch(() => { });
}
// Get tracker statistics
const trackerStats = await PartnerLogTracker.aggregate([
{
$group: {
_id: '$status',
count: { $sum: 1 }
}
}
]);
const trackers = {
failed: 0,
processing: 0,
downloaded: 0,
processed: 0,
archived: 0
};
trackerStats.forEach(stat => {
if (trackers.hasOwnProperty(stat._id)) {
trackers[stat._id] = stat.count;
}
});
// Get recent failures (last 20)
const recentFailures = await PartnerLogTracker.find({ status: 'failed' })
.sort({ updatedAt: -1 })
.limit(20)
.select('_id logFileName partnerCode errorMessage retryCount updatedAt')
.populate('customerId', 'name username')
.lean();
// Format failures for response
const formattedFailures = recentFailures.map(f => ({
id: f._id.toString(),
logFileName: f.logFileName,
partnerCode: f.partnerCode,
customer: f.customerId,
errorMessage: f.errorMessage,
retryCount: f.retryCount,
failedAt: f.updatedAt
}));
res.json({
dlq: {
messageCount: queueInfo.messageCount,
consumerCount: queueInfo.consumerCount,
queueName: dlqName,
...(queueInfo.error && { error: queueInfo.error })
},
trackers,
recentFailures: formattedFailures
});
} catch (error) {
pino.error('Error getting DLQ stats:', error);
next(new AppError(Errors.UNKNOWN_APP_ERROR, 'Failed to get DLQ statistics'));
}
};
/**
* @api {get} /api/dlq/:queueName/messages Get DLQ Messages
* @apiName GetDLQMessages
* @apiGroup PartnerDLQ
* @apiDescription Retrieve messages from the Dead Letter Queue without consuming them
*
* @apiQuery {Number} [limit=50] Maximum number of messages to retrieve
*
* @apiSuccess {Array} messages Array of DLQ messages
* @apiSuccess {String} messages.taskInfo Task information
* @apiSuccess {String} messages.errorMessage Error message if available
* @apiSuccess {Number} messages.retryCount Number of retries
* @apiSuccess {Date} messages.enqueuedAt When message was added to DLQ
*/
exports.getDLQMessages_get = async (req, res, next) => {
let connection, channel;
try {
const limit = parseInt(req.query.limit) || 50;
const queueName = env.QUEUE_NAME_PARTNER;
const dlqName = `${queueName}_failed`;
// Connect to RabbitMQ
connection = await amqp.connect({
protocol: 'amqp',
hostname: env.QUEUE_HOST || 'localhost',
port: env.QUEUE_PORT || 5672,
username: env.QUEUE_USR,
password: env.QUEUE_PWD
});
channel = await connection.createChannel();
await channel.assertQueue(dlqName, { durable: true });
const messages = [];
// Get messages without consuming them
for (let i = 0; i < limit; i++) {
const msg = await channel.get(dlqName, { noAck: false });
if (!msg) break;
try {
const content = JSON.parse(msg.content.toString());
messages.push({
taskInfo: content.taskInfo || content,
errorMessage: content.errorMessage,
retryCount: content.retryCount || 0,
enqueuedAt: msg.properties.timestamp || null,
headers: msg.properties.headers
});
// Requeue the message (we're just peeking)
channel.nack(msg, false, true);
} catch (parseError) {
pino.error('Error parsing DLQ message:', parseError);
channel.nack(msg, false, true);
}
}
res.json({ messages });
} catch (error) {
pino.error('Error getting DLQ messages:', error);
next(new AppError(Errors.UNKNOWN_APP_ERROR, 'Failed to get DLQ messages'));
} finally {
if (channel) await channel.close().catch(() => { });
if (connection) await connection.close().catch(() => { });
}
};
/**
* @api {post} /api/dlq/:queueName/process Process DLQ
* @apiName ProcessDLQ
* @apiGroup PartnerDLQ
* @apiDescription Process messages in the Dead Letter Queue - categorize errors and retry/archive
*
* @apiBody {Number} [maxMessages=100] Maximum number of messages to process
* @apiBody {Boolean} [dryRun=false] If true, only analyze without taking action
*
* @apiSuccess {Number} processed Number of messages processed
* @apiSuccess {Number} retried Number of messages retried
* @apiSuccess {Number} archived Number of messages archived
* @apiSuccess {Object} categorization Error categorization results
*/
exports.processDLQ_post = async (req, res, next) => {
let connection, channel;
try {
const maxMessages = parseInt(req.body.maxMessages) || 100;
const dryRun = req.body.dryRun === true;
const queueName = env.QUEUE_NAME_PARTNER;
const dlqName = `${queueName}_failed`;
// Connect to RabbitMQ
connection = await amqp.connect({
protocol: 'amqp',
hostname: env.QUEUE_HOST || 'localhost',
port: env.QUEUE_PORT || 5672,
username: env.QUEUE_USR,
password: env.QUEUE_PWD
});
channel = await connection.createChannel();
await channel.assertQueue(dlqName, { durable: true });
// Try to assert queue with DLX, fallback to existing queue configuration
try {
await channel.assertQueue(queueName, {
durable: true,
arguments: {
'x-dead-letter-exchange': '',
'x-dead-letter-routing-key': dlqName
}
});
} catch (error) {
if (error.message && error.message.includes('PRECONDITION_FAILED')) {
// Queue exists with different configuration - use as-is
await channel.assertQueue(queueName, { durable: true });
pino.warn('Using existing queue without DLX configuration: %s', queueName);
} else {
throw error;
}
}
const results = {
processed: 0,
retried: 0,
archived: 0,
categorization: {
transient: 0,
validation: 0,
processing: 0,
infrastructure: 0,
partner_api: 0,
unknown: 0
}
};
// Process messages
for (let i = 0; i < maxMessages; i++) {
const msg = await channel.get(dlqName, { noAck: false });
if (!msg) break;
try {
const taskInfo = JSON.parse(msg.content.toString());
results.processed++;
// Get tracker info for error categorization
const tracker = await PartnerLogTracker.findOne({
logFileName: taskInfo.logFileName,
partnerId: taskInfo.partnerId,
customerId: taskInfo.customerId
});
if (!tracker) {
pino.warn(`No tracker found for ${taskInfo.logFileName}`);
channel.ack(msg);
continue;
}
// Categorize error
const category = categorizeError(tracker.errorMessage);
results.categorization[category]++;
// Determine action based on category and age
const messageAge = Date.now() - new Date(tracker.updatedAt).getTime();
const AUTO_RETRY_WINDOW_MS = 2 * 60 * 60 * 1000; // 2 hours
const MAX_AGE_MS = 24 * 60 * 60 * 1000; // 24 hours
let action = 'keep';
if (category === 'transient' && messageAge < AUTO_RETRY_WINDOW_MS) {
action = 'retry';
} else if (category === 'validation' || messageAge > MAX_AGE_MS) {
action = 'archive';
}
if (!dryRun) {
if (action === 'retry') {
// Reset tracker and retry
await PartnerLogTracker.updateOne(
{ _id: tracker._id },
{
$set: { status: 'downloaded' },
$unset: { errorMessage: 1 }
}
);
// Send back to main queue
channel.sendToQueue(queueName, msg.content, {
persistent: true,
headers: {
...msg.properties.headers,
'x-retry-from-dlq': true,
'x-dlq-retry-time': new Date().toISOString()
}
});
results.retried++;
channel.ack(msg);
} else if (action === 'archive') {
// Archive the tracker
await PartnerLogTracker.updateOne(
{ _id: tracker._id },
{
$set: {
status: 'archived',
archivedAt: new Date(),
archivedReason: `DLQ: ${category} error, age: ${Math.round(messageAge / 3600000)}h`
}
}
);
results.archived++;
channel.ack(msg);
} else {
// Keep in DLQ for manual review
channel.nack(msg, false, true);
}
} else {
// Dry run - just requeue
channel.nack(msg, false, true);
}
} catch (error) {
pino.error('Error processing DLQ message:', error);
channel.nack(msg, false, true);
}
}
res.json({
...results,
dryRun,
timestamp: new Date().toISOString()
});
} catch (error) {
pino.error('Error processing DLQ:', error);
next(new AppError(Errors.UNKNOWN_APP_ERROR, 'Failed to process DLQ'));
} finally {
if (channel) await channel.close().catch(() => { });
if (connection) await connection.close().catch(() => { });
}
};
/**
* @api {post} /api/dlq/:queueName/retryAll Retry All DLQ Messages
* @apiName RetryAllDLQ
* @apiGroup PartnerDLQ
* @apiDescription Retry all messages currently in the DLQ (queue-native operation)
*
* @apiParam {String} queueName Queue name (e.g., 'partner_tasks')
*
* @apiSuccess {Boolean} success Whether retry was successful
* @apiSuccess {String} message Status message
* @apiSuccess {Number} retriedCount Number of messages retried
*
* @deprecated This JSDoc refers to old tracker-ID-based retry.
* See retryAllDLQ_post, retryDLQByPosition_post, retryDLQByHeader_post for current implementations.
*/
exports.retryFailedTask_post = async (req, res, next) => {
let connection, channel;
try {
const { id } = req.params;
// Find the tracker
const tracker = await PartnerLogTracker.findById(id);
if (!tracker) {
throw new AppParamError('Partner log tracker not found');
}
if (tracker.status !== 'failed' && tracker.status !== 'archived') {
throw new AppParamError(`Cannot retry task with status: ${tracker.status}`);
}
// Connect to RabbitMQ
const queueName = env.QUEUE_NAME_PARTNER;
connection = await amqp.connect({
protocol: 'amqp',
hostname: env.QUEUE_HOST || 'localhost',
port: env.QUEUE_PORT || 5672,
username: env.QUEUE_USR,
password: env.QUEUE_PWD
});
channel = await connection.createChannel();
await channel.assertQueue(queueName, { durable: true });
// Reset tracker status
await PartnerLogTracker.updateOne(
{ _id: tracker._id },
{
$set: {
status: 'downloaded',
processingStartedAt: null
},
$unset: { errorMessage: 1 }
}
);
// Send to queue
const taskInfo = {
logFileName: tracker.logFileName,
partnerId: tracker.partnerId.toString(),
customerId: tracker.customerId.toString()
};
channel.sendToQueue(queueName, Buffer.from(JSON.stringify(taskInfo)), {
persistent: true,
headers: {
'x-manual-retry': true,
'x-retry-time': new Date().toISOString(),
'x-retry-by': req.user?.username || 'admin'
}
});
pino.info(`Manually retried task: ${tracker.logFileName}`);
res.json({
success: true,
message: 'Task has been queued for retry',
taskInfo
});
} catch (error) {
if (error instanceof AppParamError) {
next(error);
} else {
pino.error('Error retrying task:', error);
next(new AppError(Errors.UNKNOWN_APP_ERROR, 'Failed to retry task'));
}
} finally {
if (channel) await channel.close().catch(() => { });
if (connection) await connection.close().catch(() => { });
}
};
/**
* @api {post} /api/dlq/:queueName/retryByPosition Retry DLQ by Position
* @apiName RetryDLQByPosition
* @apiGroup PartnerDLQ
* @apiDescription Retry messages by position range (queue-native operation)
*
* @apiParam {String} queueName Queue name
* @apiBody {Number} startPosition Start position (1-based)
* @apiBody {Number} endPosition End position (inclusive)
*
* @apiSuccess {Boolean} success Whether retry was successful
* @apiSuccess {String} message Status message
* @apiSuccess {Number} retriedCount Number of messages retried
*
* @deprecated This JSDoc refers to old tracker-ID-based archive.
* Archive functionality has been replaced with queue-native retry operations.
*/
exports.archiveFailedTask_post = async (req, res, next) => {
try {
const { id } = req.params;
const { reason } = req.body;
// Find the tracker
const tracker = await PartnerLogTracker.findById(id);
if (!tracker) {
throw new AppParamError('Partner log tracker not found');
}
// Archive the tracker
await PartnerLogTracker.updateOne(
{ _id: tracker._id },
{
$set: {
status: 'archived',
archivedAt: new Date(),
archivedReason: reason || 'Manually archived',
archivedBy: req.user?.username || 'admin'
}
}
);
pino.info(`Archived task: ${tracker.logFileName}, reason: ${reason || 'Manual'}`);
res.json({
success: true,
message: 'Task has been archived'
});
} catch (error) {
if (error instanceof AppParamError) {
next(error);
} else {
pino.error('Error archiving task:', error);
next(new AppError(Errors.UNKNOWN_APP_ERROR, 'Failed to archive task'));
}
}
};
/**
* @api {delete} /api/dlq/:queueName/purge Purge DLQ
* @apiName PurgeDLQ
* @apiGroup PartnerDLQ
* @apiDescription Purge all messages from the Dead Letter Queue (USE WITH CAUTION)
*
* @apiBody {Boolean} confirm Must be set to true to confirm purge
*
* @apiSuccess {Boolean} success Whether purge was successful
* @apiSuccess {Number} purgedCount Number of messages purged
*/
exports.purgeDLQ_delete = async (req, res, next) => {
let connection, channel;
try {
const { confirm } = req.body;
if (confirm !== true) {
throw new AppParamError('Must confirm purge by setting confirm=true');
}
const queueName = env.QUEUE_NAME_PARTNER;
const dlqName = `${queueName}_failed`;
// Connect to RabbitMQ
connection = await amqp.connect({
protocol: 'amqp',
hostname: env.QUEUE_HOST || 'localhost',
port: env.QUEUE_PORT || 5672,
username: env.QUEUE_USR,
password: env.QUEUE_PWD
});
channel = await connection.createChannel();
await channel.assertQueue(dlqName, { durable: true });
// Get count before purge
const queueInfo = await channel.checkQueue(dlqName);
const messageCount = queueInfo.messageCount;
// Purge the queue
await channel.purgeQueue(dlqName);
pino.warn(`DLQ purged by ${req.user?.username || 'admin'}, ${messageCount} messages deleted`);
res.json({
success: true,
purgedCount: messageCount,
message: `Purged ${messageCount} messages from DLQ`
});
} catch (error) {
if (error instanceof AppParamError) {
next(error);
} else {
pino.error('Error purging DLQ:', error);
next(new AppError(Errors.UNKNOWN_APP_ERROR, 'Failed to purge DLQ'));
}
} finally {
if (channel) await channel.close().catch(() => { });
if (connection) await connection.close().catch(() => { });
}
};
/**
* @api {post} /api/dlq/:queueName/retryAll Retry All DLQ Messages
* @apiName RetryAllDLQMessages
* @apiGroup PartnerDLQ
* @apiDescription Queue-native retry - moves all messages from DLQ back to main queue
*
* @apiParam {String} queueName Main queue name (e.g., 'partner_tasks')
* @apiBody {Number} [maxMessages=100] Maximum number of messages to retry
*
* @apiSuccess {Boolean} success Whether retry was successful
* @apiSuccess {Number} retriedCount Number of messages retried
*/
exports.retryAllDLQ_post = async (req, res, next) => {
let connection, channel;
try {
const { queueName } = req.params;
const maxMessages = parseInt(req.body.maxMessages) || 100;
// Validate queue name
if (!queueName || typeof queueName !== 'string') {
throw new AppParamError('Invalid queue name');
}
const dlqName = `${queueName}_dlq`;
// Connect to RabbitMQ
connection = await amqp.connect({
protocol: 'amqp',
hostname: env.QUEUE_HOST || 'localhost',
port: env.QUEUE_PORT || 5672,
username: env.QUEUE_USR,
password: env.QUEUE_PWD
});
channel = await connection.createChannel();
// Check if queues exist
await channel.checkQueue(queueName);
await channel.checkQueue(dlqName);
let retriedCount = 0;
// Move messages from DLQ to main queue
for (let i = 0; i < maxMessages; i++) {
const msg = await channel.get(dlqName, { noAck: false });
if (!msg) break;
try {
// Send to main queue with retry metadata
channel.sendToQueue(queueName, msg.content, {
persistent: true,
headers: {
...msg.properties.headers,
'x-retry-from-dlq': true,
'x-retry-time': new Date().toISOString(),
'x-retry-by': req.user?.username || 'admin',
'x-retry-method': 'retryAll'
}
});
channel.ack(msg);
retriedCount++;
} catch (error) {
pino.error({ err: error }, 'Error retrying message');
channel.nack(msg, false, true); // Requeue on error
}
}
pino.info(`Retried ${retriedCount} messages from ${dlqName} to ${queueName}`);
res.json({
success: true,
retriedCount,
queueName,
dlqName
});
} catch (error) {
if (error instanceof AppParamError) {
next(error);
} else {
pino.error('Error retrying all DLQ messages:', error);
next(new AppError(Errors.UNKNOWN_APP_ERROR, 'Failed to retry DLQ messages'));
}
} finally {
if (channel) await channel.close().catch(() => { });
if (connection) await connection.close().catch(() => { });
}
};
/**
* @api {post} /api/dlq/:queueName/retryByPosition Retry DLQ Message by Position
* @apiName RetryDLQByPosition
* @apiGroup PartnerDLQ
* @apiDescription Queue-native retry - retry a specific message by its position in the DLQ
*
* @apiParam {String} queueName Main queue name (e.g., 'partner_tasks')
* @apiBody {Number} position Position of the message in DLQ (0-based index)
*
* @apiSuccess {Boolean} success Whether retry was successful
* @apiSuccess {Object} message Message information
*/
exports.retryDLQByPosition_post = async (req, res, next) => {
let connection, channel;
try {
const { queueName } = req.params;
const { position } = req.body;
// Validate inputs
if (!queueName || typeof queueName !== 'string') {
throw new AppParamError('Invalid queue name');
}
if (typeof position !== 'number' || position < 0) {
throw new AppParamError('Position must be a non-negative number');
}
const dlqName = `${queueName}_dlq`;
// Connect to RabbitMQ
connection = await amqp.connect({
protocol: 'amqp',
hostname: env.QUEUE_HOST || 'localhost',
port: env.QUEUE_PORT || 5672,
username: env.QUEUE_USR,
password: env.QUEUE_PWD
});
channel = await connection.createChannel();
await channel.checkQueue(queueName);
const dlqInfo = await channel.checkQueue(dlqName);
if (position >= dlqInfo.messageCount) {
throw new AppParamError(`Position ${position} is out of range (DLQ has ${dlqInfo.messageCount} messages)`);
}
// Collect and requeue messages before target
const messagesToRequeue = [];
let targetMessage = null;
// Get messages up to and including target position
for (let i = 0; i <= position; i++) {
const msg = await channel.get(dlqName, { noAck: false });
if (!msg) {
throw new AppError(Errors.UNKNOWN_APP_ERROR, 'Failed to retrieve message at position');
}
if (i === position) {
targetMessage = msg;
} else {
messagesToRequeue.push(msg);
}
}
// Requeue the messages we skipped
for (const msg of messagesToRequeue) {
channel.sendToQueue(dlqName, msg.content, {
persistent: true,
headers: msg.properties.headers
});
channel.ack(msg);
}
// Retry the target message to main queue
if (targetMessage) {
const taskInfo = JSON.parse(targetMessage.content.toString());
channel.sendToQueue(queueName, targetMessage.content, {
persistent: true,
headers: {
...targetMessage.properties.headers,
'x-retry-from-dlq': true,
'x-retry-time': new Date().toISOString(),
'x-retry-by': req.user?.username || 'admin',
'x-retry-method': 'retryByPosition',
'x-retry-position': position
}
});
channel.ack(targetMessage);
pino.info(`Retried message at position ${position} from ${dlqName} to ${queueName}`);
res.json({
success: true,
message: {
position,
taskInfo,
headers: targetMessage.properties.headers
}
});
} else {
throw new AppError(Errors.UNKNOWN_APP_ERROR, 'Failed to retrieve target message');
}
} catch (error) {
if (error instanceof AppParamError || error instanceof AppError) {
next(error);
} else {
pino.error('Error retrying DLQ message by position:', error);
next(new AppError(Errors.UNKNOWN_APP_ERROR, 'Failed to retry DLQ message'));
}
} finally {
if (channel) await channel.close().catch(() => { });
if (connection) await connection.close().catch(() => { });
}
};
/**
* @api {post} /api/dlq/:queueName/retryByHeader Retry DLQ Messages by Header
* @apiName RetryDLQByHeader
* @apiGroup PartnerDLQ
* @apiDescription Queue-native retry - retry messages matching specific header criteria
*
* @apiParam {String} queueName Main queue name (e.g., 'partner_tasks')
* @apiBody {String} headerName Header name to match (e.g., 'x-partner-code')
* @apiBody {String} headerValue Header value to match
* @apiBody {Number} [maxMessages=100] Maximum number of messages to retry
*
* @apiSuccess {Boolean} success Whether retry was successful
* @apiSuccess {Number} retriedCount Number of messages retried
* @apiSuccess {Number} scannedCount Number of messages scanned
*/
exports.retryDLQByHeader_post = async (req, res, next) => {
let connection, channel;
try {
const { queueName } = req.params;
const { headerName, headerValue, maxMessages = 100 } = req.body;
// Validate inputs
if (!queueName || typeof queueName !== 'string') {
throw new AppParamError('Invalid queue name');
}
if (!headerName || typeof headerName !== 'string') {
throw new AppParamError('Header name is required');
}
if (headerValue === undefined || headerValue === null) {
throw new AppParamError('Header value is required');
}
const dlqName = `${queueName}_dlq`;
// Connect to RabbitMQ
connection = await amqp.connect({
protocol: 'amqp',
hostname: env.QUEUE_HOST || 'localhost',
port: env.QUEUE_PORT || 5672,
username: env.QUEUE_USR,
password: env.QUEUE_PWD
});
channel = await connection.createChannel();
await channel.checkQueue(queueName);
await channel.checkQueue(dlqName);
let retriedCount = 0;
let scannedCount = 0;
const messagesToRequeue = [];
// Scan DLQ for matching messages
for (let i = 0; i < maxMessages * 2; i++) { // Scan up to 2x maxMessages to find matches
const msg = await channel.get(dlqName, { noAck: false });
if (!msg) break;
scannedCount++;
const msgHeaderValue = msg.properties.headers?.[headerName];
if (msgHeaderValue === headerValue || String(msgHeaderValue) === String(headerValue)) {
// Match found - retry to main queue
channel.sendToQueue(queueName, msg.content, {
persistent: true,
headers: {
...msg.properties.headers,
'x-retry-from-dlq': true,
'x-retry-time': new Date().toISOString(),
'x-retry-by': req.user?.username || 'admin',
'x-retry-method': 'retryByHeader',
'x-retry-header': `${headerName}=${headerValue}`
}
});
channel.ack(msg);
retriedCount++;
if (retriedCount >= maxMessages) {
break;
}
} else {
// No match - requeue to DLQ for later processing
messagesToRequeue.push(msg);
}
}
// Requeue non-matching messages back to DLQ
for (const msg of messagesToRequeue) {
channel.sendToQueue(dlqName, msg.content, {
persistent: true,
headers: msg.properties.headers
});
channel.ack(msg);
}
pino.info(`Retried ${retriedCount} messages matching ${headerName}=${headerValue} from ${dlqName}`);
res.json({
success: true,
retriedCount,
scannedCount,
headerName,
headerValue,
queueName,
dlqName
});
} catch (error) {
if (error instanceof AppParamError) {
next(error);
} else {
pino.error('Error retrying DLQ messages by header:', error);
next(new AppError(Errors.UNKNOWN_APP_ERROR, 'Failed to retry DLQ messages by header'));
}
} finally {
if (channel) await channel.close().catch(() => { });
if (connection) await connection.close().catch(() => { });
}
};
/**
 * Classify an error message into a coarse category by case-insensitive keyword search.
 *
 * Categories are tested in a fixed order and the first one with a matching keyword wins:
 * transient -> validation -> processing -> infrastructure -> partner_api.
 * Matching is by substring, so a keyword also matches inside a longer word.
 *
 * @param {*} errorMessage Error text; non-string values are converted with String()
 * @returns {string} One of 'transient', 'validation', 'processing', 'infrastructure',
 *   'partner_api' or 'unknown' (empty/missing message, or no keyword matched)
 */
function categorizeError(errorMessage) {
  if (!errorMessage) return 'unknown';
  // Coerce so a non-string value (e.g. an Error object) does not throw on toLowerCase().
  const msg = String(errorMessage).toLowerCase();
  const hasAny = (keywords) => keywords.some((keyword) => msg.includes(keyword));

  // Transient errors
  if (hasAny(['timeout', 'econnrefused', 'enotfound', 'network', 'connection'])) {
    return 'transient';
  }
  // Validation errors
  if (hasAny(['validation', 'invalid', 'required', 'missing', 'format'])) {
    return 'validation';
  }
  // Processing errors. 'data' must not be satisfied by the word 'database':
  // that keyword belongs to the infrastructure group below and would otherwise
  // never be reached.
  if (hasAny(['parse', 'calculation', 'processing']) || /data(?!base)/.test(msg)) {
    return 'processing';
  }
  // Infrastructure errors
  if (hasAny(['database', 'mongo', 'filesystem', 'disk'])) {
    return 'infrastructure';
  }
  // Partner API errors
  if (hasAny(['api', 'authentication', 'unauthorized', 'rate limit'])) {
    return 'partner_api';
  }
  return 'unknown';
}