973 lines
29 KiB
JavaScript
973 lines
29 KiB
JavaScript
'use strict';
|
|
|
|
const { AppError, AppParamError } = require('../helpers/app_error');
|
|
const { Errors } = require('../helpers/constants');
|
|
const PartnerLogTracker = require('../model/partner_log_tracker');
|
|
const amqp = require('amqplib');
|
|
const env = require('../helpers/env');
|
|
const pino = require('../helpers/logger').child('partner_dlq');
|
|
|
|
/**
|
|
* @api {get} /api/dlq/:queueName/stats Get DLQ Statistics
|
|
* @apiName GetDLQStats
|
|
* @apiGroup PartnerDLQ
|
|
* @apiDescription Get comprehensive statistics about the Dead Letter Queue and partner log processing
|
|
*
|
|
* @apiSuccess {Object} dlq DLQ message queue statistics
|
|
* @apiSuccess {Number} dlq.messageCount Number of messages in DLQ
|
|
* @apiSuccess {Number} dlq.consumerCount Number of active consumers
|
|
* @apiSuccess {String} dlq.queueName Name of the DLQ
|
|
* @apiSuccess {Object} trackers Partner log tracker status counts
|
|
* @apiSuccess {Number} trackers.failed Number of failed tasks
|
|
* @apiSuccess {Number} trackers.processing Number of currently processing tasks
|
|
* @apiSuccess {Number} trackers.downloaded Number of downloaded tasks
|
|
* @apiSuccess {Number} trackers.processed Number of successfully processed tasks
|
|
* @apiSuccess {Number} trackers.archived Number of archived tasks
|
|
* @apiSuccess {Array} recentFailures Recent failed tasks with details
|
|
*/
|
|
exports.getDLQStats_get = async (req, res, next) => {
|
|
try {
|
|
// Get queue statistics
|
|
const queueName = env.QUEUE_NAME_PARTNER;
|
|
const dlqName = `${queueName}_failed`;
|
|
|
|
let connection, channel, queueInfo;
|
|
|
|
try {
|
|
// Connect to RabbitMQ
|
|
connection = await amqp.connect({
|
|
protocol: 'amqp',
|
|
hostname: env.QUEUE_HOST || 'localhost',
|
|
port: env.QUEUE_PORT || 5672,
|
|
username: env.QUEUE_USR,
|
|
password: env.QUEUE_PWD
|
|
});
|
|
|
|
channel = await connection.createChannel();
|
|
await channel.assertQueue(dlqName, { durable: true });
|
|
queueInfo = await channel.checkQueue(dlqName);
|
|
} catch (error) {
|
|
pino.error('Error connecting to RabbitMQ:', error);
|
|
queueInfo = {
|
|
messageCount: -1,
|
|
consumerCount: 0,
|
|
error: error.message
|
|
};
|
|
} finally {
|
|
if (channel) await channel.close().catch(() => { });
|
|
if (connection) await connection.close().catch(() => { });
|
|
}
|
|
|
|
// Get tracker statistics
|
|
const trackerStats = await PartnerLogTracker.aggregate([
|
|
{
|
|
$group: {
|
|
_id: '$status',
|
|
count: { $sum: 1 }
|
|
}
|
|
}
|
|
]);
|
|
|
|
const trackers = {
|
|
failed: 0,
|
|
processing: 0,
|
|
downloaded: 0,
|
|
processed: 0,
|
|
archived: 0
|
|
};
|
|
|
|
trackerStats.forEach(stat => {
|
|
if (trackers.hasOwnProperty(stat._id)) {
|
|
trackers[stat._id] = stat.count;
|
|
}
|
|
});
|
|
|
|
// Get recent failures (last 20)
|
|
const recentFailures = await PartnerLogTracker.find({ status: 'failed' })
|
|
.sort({ updatedAt: -1 })
|
|
.limit(20)
|
|
.select('_id logFileName partnerCode errorMessage retryCount updatedAt')
|
|
.populate('customerId', 'name username')
|
|
.lean();
|
|
|
|
// Format failures for response
|
|
const formattedFailures = recentFailures.map(f => ({
|
|
id: f._id.toString(),
|
|
logFileName: f.logFileName,
|
|
partnerCode: f.partnerCode,
|
|
customer: f.customerId,
|
|
errorMessage: f.errorMessage,
|
|
retryCount: f.retryCount,
|
|
failedAt: f.updatedAt
|
|
}));
|
|
|
|
res.json({
|
|
dlq: {
|
|
messageCount: queueInfo.messageCount,
|
|
consumerCount: queueInfo.consumerCount,
|
|
queueName: dlqName,
|
|
...(queueInfo.error && { error: queueInfo.error })
|
|
},
|
|
trackers,
|
|
recentFailures: formattedFailures
|
|
});
|
|
|
|
} catch (error) {
|
|
pino.error('Error getting DLQ stats:', error);
|
|
next(new AppError(Errors.UNKNOWN_APP_ERROR, 'Failed to get DLQ statistics'));
|
|
}
|
|
};
|
|
|
|
/**
|
|
* @api {get} /api/dlq/:queueName/messages Get DLQ Messages
|
|
* @apiName GetDLQMessages
|
|
* @apiGroup PartnerDLQ
|
|
* @apiDescription Retrieve messages from the Dead Letter Queue without consuming them
|
|
*
|
|
* @apiQuery {Number} [limit=50] Maximum number of messages to retrieve
|
|
*
|
|
* @apiSuccess {Array} messages Array of DLQ messages
|
|
* @apiSuccess {String} messages.taskInfo Task information
|
|
* @apiSuccess {String} messages.errorMessage Error message if available
|
|
* @apiSuccess {Number} messages.retryCount Number of retries
|
|
* @apiSuccess {Date} messages.enqueuedAt When message was added to DLQ
|
|
*/
|
|
exports.getDLQMessages_get = async (req, res, next) => {
|
|
let connection, channel;
|
|
|
|
try {
|
|
const limit = parseInt(req.query.limit) || 50;
|
|
const queueName = env.QUEUE_NAME_PARTNER;
|
|
const dlqName = `${queueName}_failed`;
|
|
|
|
// Connect to RabbitMQ
|
|
connection = await amqp.connect({
|
|
protocol: 'amqp',
|
|
hostname: env.QUEUE_HOST || 'localhost',
|
|
port: env.QUEUE_PORT || 5672,
|
|
username: env.QUEUE_USR,
|
|
password: env.QUEUE_PWD
|
|
});
|
|
|
|
channel = await connection.createChannel();
|
|
await channel.assertQueue(dlqName, { durable: true });
|
|
|
|
const messages = [];
|
|
|
|
// Get messages without consuming them
|
|
for (let i = 0; i < limit; i++) {
|
|
const msg = await channel.get(dlqName, { noAck: false });
|
|
if (!msg) break;
|
|
|
|
try {
|
|
const content = JSON.parse(msg.content.toString());
|
|
messages.push({
|
|
taskInfo: content.taskInfo || content,
|
|
errorMessage: content.errorMessage,
|
|
retryCount: content.retryCount || 0,
|
|
enqueuedAt: msg.properties.timestamp || null,
|
|
headers: msg.properties.headers
|
|
});
|
|
|
|
// Requeue the message (we're just peeking)
|
|
channel.nack(msg, false, true);
|
|
} catch (parseError) {
|
|
pino.error('Error parsing DLQ message:', parseError);
|
|
channel.nack(msg, false, true);
|
|
}
|
|
}
|
|
|
|
res.json({ messages });
|
|
|
|
} catch (error) {
|
|
pino.error('Error getting DLQ messages:', error);
|
|
next(new AppError(Errors.UNKNOWN_APP_ERROR, 'Failed to get DLQ messages'));
|
|
} finally {
|
|
if (channel) await channel.close().catch(() => { });
|
|
if (connection) await connection.close().catch(() => { });
|
|
}
|
|
};
|
|
|
|
/**
|
|
* @api {post} /api/dlq/:queueName/process Process DLQ
|
|
* @apiName ProcessDLQ
|
|
* @apiGroup PartnerDLQ
|
|
* @apiDescription Process messages in the Dead Letter Queue - categorize errors and retry/archive
|
|
*
|
|
* @apiBody {Number} [maxMessages=100] Maximum number of messages to process
|
|
* @apiBody {Boolean} [dryRun=false] If true, only analyze without taking action
|
|
*
|
|
* @apiSuccess {Number} processed Number of messages processed
|
|
* @apiSuccess {Number} retried Number of messages retried
|
|
* @apiSuccess {Number} archived Number of messages archived
|
|
* @apiSuccess {Object} categorization Error categorization results
|
|
*/
|
|
exports.processDLQ_post = async (req, res, next) => {
|
|
let connection, channel;
|
|
|
|
try {
|
|
const maxMessages = parseInt(req.body.maxMessages) || 100;
|
|
const dryRun = req.body.dryRun === true;
|
|
|
|
const queueName = env.QUEUE_NAME_PARTNER;
|
|
const dlqName = `${queueName}_failed`;
|
|
|
|
// Connect to RabbitMQ
|
|
connection = await amqp.connect({
|
|
protocol: 'amqp',
|
|
hostname: env.QUEUE_HOST || 'localhost',
|
|
port: env.QUEUE_PORT || 5672,
|
|
username: env.QUEUE_USR,
|
|
password: env.QUEUE_PWD
|
|
});
|
|
|
|
channel = await connection.createChannel();
|
|
await channel.assertQueue(dlqName, { durable: true });
|
|
|
|
// Try to assert queue with DLX, fallback to existing queue configuration
|
|
try {
|
|
await channel.assertQueue(queueName, {
|
|
durable: true,
|
|
arguments: {
|
|
'x-dead-letter-exchange': '',
|
|
'x-dead-letter-routing-key': dlqName
|
|
}
|
|
});
|
|
} catch (error) {
|
|
if (error.message && error.message.includes('PRECONDITION_FAILED')) {
|
|
// Queue exists with different configuration - use as-is
|
|
await channel.assertQueue(queueName, { durable: true });
|
|
pino.warn('Using existing queue without DLX configuration: %s', queueName);
|
|
} else {
|
|
throw error;
|
|
}
|
|
}
|
|
|
|
const results = {
|
|
processed: 0,
|
|
retried: 0,
|
|
archived: 0,
|
|
categorization: {
|
|
transient: 0,
|
|
validation: 0,
|
|
processing: 0,
|
|
infrastructure: 0,
|
|
partner_api: 0,
|
|
unknown: 0
|
|
}
|
|
};
|
|
|
|
// Process messages
|
|
for (let i = 0; i < maxMessages; i++) {
|
|
const msg = await channel.get(dlqName, { noAck: false });
|
|
if (!msg) break;
|
|
|
|
try {
|
|
const taskInfo = JSON.parse(msg.content.toString());
|
|
results.processed++;
|
|
|
|
// Get tracker info for error categorization
|
|
const tracker = await PartnerLogTracker.findOne({
|
|
logFileName: taskInfo.logFileName,
|
|
partnerId: taskInfo.partnerId,
|
|
customerId: taskInfo.customerId
|
|
});
|
|
|
|
if (!tracker) {
|
|
pino.warn(`No tracker found for ${taskInfo.logFileName}`);
|
|
channel.ack(msg);
|
|
continue;
|
|
}
|
|
|
|
// Categorize error
|
|
const category = categorizeError(tracker.errorMessage);
|
|
results.categorization[category]++;
|
|
|
|
// Determine action based on category and age
|
|
const messageAge = Date.now() - new Date(tracker.updatedAt).getTime();
|
|
const AUTO_RETRY_WINDOW_MS = 2 * 60 * 60 * 1000; // 2 hours
|
|
const MAX_AGE_MS = 24 * 60 * 60 * 1000; // 24 hours
|
|
|
|
let action = 'keep';
|
|
|
|
if (category === 'transient' && messageAge < AUTO_RETRY_WINDOW_MS) {
|
|
action = 'retry';
|
|
} else if (category === 'validation' || messageAge > MAX_AGE_MS) {
|
|
action = 'archive';
|
|
}
|
|
|
|
if (!dryRun) {
|
|
if (action === 'retry') {
|
|
// Reset tracker and retry
|
|
await PartnerLogTracker.updateOne(
|
|
{ _id: tracker._id },
|
|
{
|
|
$set: { status: 'downloaded' },
|
|
$unset: { errorMessage: 1 }
|
|
}
|
|
);
|
|
|
|
// Send back to main queue
|
|
channel.sendToQueue(queueName, msg.content, {
|
|
persistent: true,
|
|
headers: {
|
|
...msg.properties.headers,
|
|
'x-retry-from-dlq': true,
|
|
'x-dlq-retry-time': new Date().toISOString()
|
|
}
|
|
});
|
|
|
|
results.retried++;
|
|
channel.ack(msg);
|
|
|
|
} else if (action === 'archive') {
|
|
// Archive the tracker
|
|
await PartnerLogTracker.updateOne(
|
|
{ _id: tracker._id },
|
|
{
|
|
$set: {
|
|
status: 'archived',
|
|
archivedAt: new Date(),
|
|
archivedReason: `DLQ: ${category} error, age: ${Math.round(messageAge / 3600000)}h`
|
|
}
|
|
}
|
|
);
|
|
|
|
results.archived++;
|
|
channel.ack(msg);
|
|
|
|
} else {
|
|
// Keep in DLQ for manual review
|
|
channel.nack(msg, false, true);
|
|
}
|
|
} else {
|
|
// Dry run - just requeue
|
|
channel.nack(msg, false, true);
|
|
}
|
|
|
|
} catch (error) {
|
|
pino.error('Error processing DLQ message:', error);
|
|
channel.nack(msg, false, true);
|
|
}
|
|
}
|
|
|
|
res.json({
|
|
...results,
|
|
dryRun,
|
|
timestamp: new Date().toISOString()
|
|
});
|
|
|
|
} catch (error) {
|
|
pino.error('Error processing DLQ:', error);
|
|
next(new AppError(Errors.UNKNOWN_APP_ERROR, 'Failed to process DLQ'));
|
|
} finally {
|
|
if (channel) await channel.close().catch(() => { });
|
|
if (connection) await connection.close().catch(() => { });
|
|
}
|
|
};
|
|
|
|
/**
|
|
* @api {post} /api/dlq/:queueName/retryAll Retry All DLQ Messages
|
|
* @apiName RetryAllDLQ
|
|
* @apiGroup PartnerDLQ
|
|
* @apiDescription Retry all messages currently in the DLQ (queue-native operation)
|
|
*
|
|
* @apiParam {String} queueName Queue name (e.g., 'partner_tasks')
|
|
*
|
|
* @apiSuccess {Boolean} success Whether retry was successful
|
|
* @apiSuccess {String} message Status message
|
|
* @apiSuccess {Number} retriedCount Number of messages retried
|
|
*
|
|
* @deprecated This JSDoc refers to old tracker-ID-based retry.
|
|
* See retryAllDLQ_post, retryDLQByPosition_post, retryDLQByHeader_post for current implementations.
|
|
*/
|
|
exports.retryFailedTask_post = async (req, res, next) => {
|
|
let connection, channel;
|
|
|
|
try {
|
|
const { id } = req.params;
|
|
|
|
// Find the tracker
|
|
const tracker = await PartnerLogTracker.findById(id);
|
|
|
|
if (!tracker) {
|
|
throw new AppParamError('Partner log tracker not found');
|
|
}
|
|
|
|
if (tracker.status !== 'failed' && tracker.status !== 'archived') {
|
|
throw new AppParamError(`Cannot retry task with status: ${tracker.status}`);
|
|
}
|
|
|
|
// Connect to RabbitMQ
|
|
const queueName = env.QUEUE_NAME_PARTNER;
|
|
|
|
connection = await amqp.connect({
|
|
protocol: 'amqp',
|
|
hostname: env.QUEUE_HOST || 'localhost',
|
|
port: env.QUEUE_PORT || 5672,
|
|
username: env.QUEUE_USR,
|
|
password: env.QUEUE_PWD
|
|
});
|
|
|
|
channel = await connection.createChannel();
|
|
await channel.assertQueue(queueName, { durable: true });
|
|
|
|
// Reset tracker status
|
|
await PartnerLogTracker.updateOne(
|
|
{ _id: tracker._id },
|
|
{
|
|
$set: {
|
|
status: 'downloaded',
|
|
processingStartedAt: null
|
|
},
|
|
$unset: { errorMessage: 1 }
|
|
}
|
|
);
|
|
|
|
// Send to queue
|
|
const taskInfo = {
|
|
logFileName: tracker.logFileName,
|
|
partnerId: tracker.partnerId.toString(),
|
|
customerId: tracker.customerId.toString()
|
|
};
|
|
|
|
channel.sendToQueue(queueName, Buffer.from(JSON.stringify(taskInfo)), {
|
|
persistent: true,
|
|
headers: {
|
|
'x-manual-retry': true,
|
|
'x-retry-time': new Date().toISOString(),
|
|
'x-retry-by': req.user?.username || 'admin'
|
|
}
|
|
});
|
|
|
|
pino.info(`Manually retried task: ${tracker.logFileName}`);
|
|
|
|
res.json({
|
|
success: true,
|
|
message: 'Task has been queued for retry',
|
|
taskInfo
|
|
});
|
|
|
|
} catch (error) {
|
|
if (error instanceof AppParamError) {
|
|
next(error);
|
|
} else {
|
|
pino.error('Error retrying task:', error);
|
|
next(new AppError(Errors.UNKNOWN_APP_ERROR, 'Failed to retry task'));
|
|
}
|
|
} finally {
|
|
if (channel) await channel.close().catch(() => { });
|
|
if (connection) await connection.close().catch(() => { });
|
|
}
|
|
};
|
|
|
|
/**
|
|
* @api {post} /api/dlq/:queueName/retryByPosition Retry DLQ by Position
|
|
* @apiName RetryDLQByPosition
|
|
* @apiGroup PartnerDLQ
|
|
* @apiDescription Retry messages by position range (queue-native operation)
|
|
*
|
|
* @apiParam {String} queueName Queue name
|
|
* @apiBody {Number} startPosition Start position (1-based)
|
|
* @apiBody {Number} endPosition End position (inclusive)
|
|
*
|
|
* @apiSuccess {Boolean} success Whether retry was successful
|
|
* @apiSuccess {String} message Status message
|
|
* @apiSuccess {Number} retriedCount Number of messages retried
|
|
*
|
|
* @deprecated This JSDoc refers to old tracker-ID-based archive.
|
|
* Archive functionality has been replaced with queue-native retry operations.
|
|
*/
|
|
exports.archiveFailedTask_post = async (req, res, next) => {
|
|
try {
|
|
const { id } = req.params;
|
|
const { reason } = req.body;
|
|
|
|
// Find the tracker
|
|
const tracker = await PartnerLogTracker.findById(id);
|
|
|
|
if (!tracker) {
|
|
throw new AppParamError('Partner log tracker not found');
|
|
}
|
|
|
|
// Archive the tracker
|
|
await PartnerLogTracker.updateOne(
|
|
{ _id: tracker._id },
|
|
{
|
|
$set: {
|
|
status: 'archived',
|
|
archivedAt: new Date(),
|
|
archivedReason: reason || 'Manually archived',
|
|
archivedBy: req.user?.username || 'admin'
|
|
}
|
|
}
|
|
);
|
|
|
|
pino.info(`Archived task: ${tracker.logFileName}, reason: ${reason || 'Manual'}`);
|
|
|
|
res.json({
|
|
success: true,
|
|
message: 'Task has been archived'
|
|
});
|
|
|
|
} catch (error) {
|
|
if (error instanceof AppParamError) {
|
|
next(error);
|
|
} else {
|
|
pino.error('Error archiving task:', error);
|
|
next(new AppError(Errors.UNKNOWN_APP_ERROR, 'Failed to archive task'));
|
|
}
|
|
}
|
|
};
|
|
|
|
/**
|
|
* @api {delete} /api/dlq/:queueName/purge Purge DLQ
|
|
* @apiName PurgeDLQ
|
|
* @apiGroup PartnerDLQ
|
|
* @apiDescription Purge all messages from the Dead Letter Queue (USE WITH CAUTION)
|
|
*
|
|
* @apiBody {Boolean} confirm Must be set to true to confirm purge
|
|
*
|
|
* @apiSuccess {Boolean} success Whether purge was successful
|
|
* @apiSuccess {Number} purgedCount Number of messages purged
|
|
*/
|
|
exports.purgeDLQ_delete = async (req, res, next) => {
|
|
let connection, channel;
|
|
|
|
try {
|
|
const { confirm } = req.body;
|
|
|
|
if (confirm !== true) {
|
|
throw new AppParamError('Must confirm purge by setting confirm=true');
|
|
}
|
|
|
|
const queueName = env.QUEUE_NAME_PARTNER;
|
|
const dlqName = `${queueName}_failed`;
|
|
|
|
// Connect to RabbitMQ
|
|
connection = await amqp.connect({
|
|
protocol: 'amqp',
|
|
hostname: env.QUEUE_HOST || 'localhost',
|
|
port: env.QUEUE_PORT || 5672,
|
|
username: env.QUEUE_USR,
|
|
password: env.QUEUE_PWD
|
|
});
|
|
|
|
channel = await connection.createChannel();
|
|
await channel.assertQueue(dlqName, { durable: true });
|
|
|
|
// Get count before purge
|
|
const queueInfo = await channel.checkQueue(dlqName);
|
|
const messageCount = queueInfo.messageCount;
|
|
|
|
// Purge the queue
|
|
await channel.purgeQueue(dlqName);
|
|
|
|
pino.warn(`DLQ purged by ${req.user?.username || 'admin'}, ${messageCount} messages deleted`);
|
|
|
|
res.json({
|
|
success: true,
|
|
purgedCount: messageCount,
|
|
message: `Purged ${messageCount} messages from DLQ`
|
|
});
|
|
|
|
} catch (error) {
|
|
if (error instanceof AppParamError) {
|
|
next(error);
|
|
} else {
|
|
pino.error('Error purging DLQ:', error);
|
|
next(new AppError(Errors.UNKNOWN_APP_ERROR, 'Failed to purge DLQ'));
|
|
}
|
|
} finally {
|
|
if (channel) await channel.close().catch(() => { });
|
|
if (connection) await connection.close().catch(() => { });
|
|
}
|
|
};
|
|
|
|
/**
|
|
* @api {post} /api/dlq/:queueName/retryAll Retry All DLQ Messages
|
|
* @apiName RetryAllDLQMessages
|
|
* @apiGroup PartnerDLQ
|
|
* @apiDescription Queue-native retry - moves all messages from DLQ back to main queue
|
|
*
|
|
* @apiParam {String} queueName Main queue name (e.g., 'partner_tasks')
|
|
* @apiBody {Number} [maxMessages=100] Maximum number of messages to retry
|
|
*
|
|
* @apiSuccess {Boolean} success Whether retry was successful
|
|
* @apiSuccess {Number} retriedCount Number of messages retried
|
|
*/
|
|
exports.retryAllDLQ_post = async (req, res, next) => {
|
|
let connection, channel;
|
|
|
|
try {
|
|
const { queueName } = req.params;
|
|
const maxMessages = parseInt(req.body.maxMessages) || 100;
|
|
|
|
// Validate queue name
|
|
if (!queueName || typeof queueName !== 'string') {
|
|
throw new AppParamError('Invalid queue name');
|
|
}
|
|
|
|
const dlqName = `${queueName}_dlq`;
|
|
|
|
// Connect to RabbitMQ
|
|
connection = await amqp.connect({
|
|
protocol: 'amqp',
|
|
hostname: env.QUEUE_HOST || 'localhost',
|
|
port: env.QUEUE_PORT || 5672,
|
|
username: env.QUEUE_USR,
|
|
password: env.QUEUE_PWD
|
|
});
|
|
|
|
channel = await connection.createChannel();
|
|
|
|
// Check if queues exist
|
|
await channel.checkQueue(queueName);
|
|
await channel.checkQueue(dlqName);
|
|
|
|
let retriedCount = 0;
|
|
|
|
// Move messages from DLQ to main queue
|
|
for (let i = 0; i < maxMessages; i++) {
|
|
const msg = await channel.get(dlqName, { noAck: false });
|
|
if (!msg) break;
|
|
|
|
try {
|
|
// Send to main queue with retry metadata
|
|
channel.sendToQueue(queueName, msg.content, {
|
|
persistent: true,
|
|
headers: {
|
|
...msg.properties.headers,
|
|
'x-retry-from-dlq': true,
|
|
'x-retry-time': new Date().toISOString(),
|
|
'x-retry-by': req.user?.username || 'admin',
|
|
'x-retry-method': 'retryAll'
|
|
}
|
|
});
|
|
|
|
channel.ack(msg);
|
|
retriedCount++;
|
|
} catch (error) {
|
|
pino.error({ err: error }, 'Error retrying message');
|
|
channel.nack(msg, false, true); // Requeue on error
|
|
}
|
|
}
|
|
|
|
pino.info(`Retried ${retriedCount} messages from ${dlqName} to ${queueName}`);
|
|
|
|
res.json({
|
|
success: true,
|
|
retriedCount,
|
|
queueName,
|
|
dlqName
|
|
});
|
|
|
|
} catch (error) {
|
|
if (error instanceof AppParamError) {
|
|
next(error);
|
|
} else {
|
|
pino.error('Error retrying all DLQ messages:', error);
|
|
next(new AppError(Errors.UNKNOWN_APP_ERROR, 'Failed to retry DLQ messages'));
|
|
}
|
|
} finally {
|
|
if (channel) await channel.close().catch(() => { });
|
|
if (connection) await connection.close().catch(() => { });
|
|
}
|
|
};
|
|
|
|
/**
|
|
* @api {post} /api/dlq/:queueName/retryByPosition Retry DLQ Message by Position
|
|
* @apiName RetryDLQByPosition
|
|
* @apiGroup PartnerDLQ
|
|
* @apiDescription Queue-native retry - retry a specific message by its position in the DLQ
|
|
*
|
|
* @apiParam {String} queueName Main queue name (e.g., 'partner_tasks')
|
|
* @apiBody {Number} position Position of the message in DLQ (0-based index)
|
|
*
|
|
* @apiSuccess {Boolean} success Whether retry was successful
|
|
* @apiSuccess {Object} message Message information
|
|
*/
|
|
exports.retryDLQByPosition_post = async (req, res, next) => {
|
|
let connection, channel;
|
|
|
|
try {
|
|
const { queueName } = req.params;
|
|
const { position } = req.body;
|
|
|
|
// Validate inputs
|
|
if (!queueName || typeof queueName !== 'string') {
|
|
throw new AppParamError('Invalid queue name');
|
|
}
|
|
|
|
if (typeof position !== 'number' || position < 0) {
|
|
throw new AppParamError('Position must be a non-negative number');
|
|
}
|
|
|
|
const dlqName = `${queueName}_dlq`;
|
|
|
|
// Connect to RabbitMQ
|
|
connection = await amqp.connect({
|
|
protocol: 'amqp',
|
|
hostname: env.QUEUE_HOST || 'localhost',
|
|
port: env.QUEUE_PORT || 5672,
|
|
username: env.QUEUE_USR,
|
|
password: env.QUEUE_PWD
|
|
});
|
|
|
|
channel = await connection.createChannel();
|
|
|
|
await channel.checkQueue(queueName);
|
|
const dlqInfo = await channel.checkQueue(dlqName);
|
|
|
|
if (position >= dlqInfo.messageCount) {
|
|
throw new AppParamError(`Position ${position} is out of range (DLQ has ${dlqInfo.messageCount} messages)`);
|
|
}
|
|
|
|
// Collect and requeue messages before target
|
|
const messagesToRequeue = [];
|
|
let targetMessage = null;
|
|
|
|
// Get messages up to and including target position
|
|
for (let i = 0; i <= position; i++) {
|
|
const msg = await channel.get(dlqName, { noAck: false });
|
|
if (!msg) {
|
|
throw new AppError(Errors.UNKNOWN_APP_ERROR, 'Failed to retrieve message at position');
|
|
}
|
|
|
|
if (i === position) {
|
|
targetMessage = msg;
|
|
} else {
|
|
messagesToRequeue.push(msg);
|
|
}
|
|
}
|
|
|
|
// Requeue the messages we skipped
|
|
for (const msg of messagesToRequeue) {
|
|
channel.sendToQueue(dlqName, msg.content, {
|
|
persistent: true,
|
|
headers: msg.properties.headers
|
|
});
|
|
channel.ack(msg);
|
|
}
|
|
|
|
// Retry the target message to main queue
|
|
if (targetMessage) {
|
|
const taskInfo = JSON.parse(targetMessage.content.toString());
|
|
|
|
channel.sendToQueue(queueName, targetMessage.content, {
|
|
persistent: true,
|
|
headers: {
|
|
...targetMessage.properties.headers,
|
|
'x-retry-from-dlq': true,
|
|
'x-retry-time': new Date().toISOString(),
|
|
'x-retry-by': req.user?.username || 'admin',
|
|
'x-retry-method': 'retryByPosition',
|
|
'x-retry-position': position
|
|
}
|
|
});
|
|
|
|
channel.ack(targetMessage);
|
|
|
|
pino.info(`Retried message at position ${position} from ${dlqName} to ${queueName}`);
|
|
|
|
res.json({
|
|
success: true,
|
|
message: {
|
|
position,
|
|
taskInfo,
|
|
headers: targetMessage.properties.headers
|
|
}
|
|
});
|
|
} else {
|
|
throw new AppError(Errors.UNKNOWN_APP_ERROR, 'Failed to retrieve target message');
|
|
}
|
|
|
|
} catch (error) {
|
|
if (error instanceof AppParamError || error instanceof AppError) {
|
|
next(error);
|
|
} else {
|
|
pino.error('Error retrying DLQ message by position:', error);
|
|
next(new AppError(Errors.UNKNOWN_APP_ERROR, 'Failed to retry DLQ message'));
|
|
}
|
|
} finally {
|
|
if (channel) await channel.close().catch(() => { });
|
|
if (connection) await connection.close().catch(() => { });
|
|
}
|
|
};
|
|
|
|
/**
|
|
* @api {post} /api/dlq/:queueName/retryByHeader Retry DLQ Messages by Header
|
|
* @apiName RetryDLQByHeader
|
|
* @apiGroup PartnerDLQ
|
|
* @apiDescription Queue-native retry - retry messages matching specific header criteria
|
|
*
|
|
* @apiParam {String} queueName Main queue name (e.g., 'partner_tasks')
|
|
* @apiBody {String} headerName Header name to match (e.g., 'x-partner-code')
|
|
* @apiBody {String} headerValue Header value to match
|
|
* @apiBody {Number} [maxMessages=100] Maximum number of messages to retry
|
|
*
|
|
* @apiSuccess {Boolean} success Whether retry was successful
|
|
* @apiSuccess {Number} retriedCount Number of messages retried
|
|
* @apiSuccess {Number} scannedCount Number of messages scanned
|
|
*/
|
|
exports.retryDLQByHeader_post = async (req, res, next) => {
|
|
let connection, channel;
|
|
|
|
try {
|
|
const { queueName } = req.params;
|
|
const { headerName, headerValue, maxMessages = 100 } = req.body;
|
|
|
|
// Validate inputs
|
|
if (!queueName || typeof queueName !== 'string') {
|
|
throw new AppParamError('Invalid queue name');
|
|
}
|
|
|
|
if (!headerName || typeof headerName !== 'string') {
|
|
throw new AppParamError('Header name is required');
|
|
}
|
|
|
|
if (headerValue === undefined || headerValue === null) {
|
|
throw new AppParamError('Header value is required');
|
|
}
|
|
|
|
const dlqName = `${queueName}_dlq`;
|
|
|
|
// Connect to RabbitMQ
|
|
connection = await amqp.connect({
|
|
protocol: 'amqp',
|
|
hostname: env.QUEUE_HOST || 'localhost',
|
|
port: env.QUEUE_PORT || 5672,
|
|
username: env.QUEUE_USR,
|
|
password: env.QUEUE_PWD
|
|
});
|
|
|
|
channel = await connection.createChannel();
|
|
|
|
await channel.checkQueue(queueName);
|
|
await channel.checkQueue(dlqName);
|
|
|
|
let retriedCount = 0;
|
|
let scannedCount = 0;
|
|
const messagesToRequeue = [];
|
|
|
|
// Scan DLQ for matching messages
|
|
for (let i = 0; i < maxMessages * 2; i++) { // Scan up to 2x maxMessages to find matches
|
|
const msg = await channel.get(dlqName, { noAck: false });
|
|
if (!msg) break;
|
|
|
|
scannedCount++;
|
|
const msgHeaderValue = msg.properties.headers?.[headerName];
|
|
|
|
if (msgHeaderValue === headerValue || String(msgHeaderValue) === String(headerValue)) {
|
|
// Match found - retry to main queue
|
|
channel.sendToQueue(queueName, msg.content, {
|
|
persistent: true,
|
|
headers: {
|
|
...msg.properties.headers,
|
|
'x-retry-from-dlq': true,
|
|
'x-retry-time': new Date().toISOString(),
|
|
'x-retry-by': req.user?.username || 'admin',
|
|
'x-retry-method': 'retryByHeader',
|
|
'x-retry-header': `${headerName}=${headerValue}`
|
|
}
|
|
});
|
|
|
|
channel.ack(msg);
|
|
retriedCount++;
|
|
|
|
if (retriedCount >= maxMessages) {
|
|
break;
|
|
}
|
|
} else {
|
|
// No match - requeue to DLQ for later processing
|
|
messagesToRequeue.push(msg);
|
|
}
|
|
}
|
|
|
|
// Requeue non-matching messages back to DLQ
|
|
for (const msg of messagesToRequeue) {
|
|
channel.sendToQueue(dlqName, msg.content, {
|
|
persistent: true,
|
|
headers: msg.properties.headers
|
|
});
|
|
channel.ack(msg);
|
|
}
|
|
|
|
pino.info(`Retried ${retriedCount} messages matching ${headerName}=${headerValue} from ${dlqName}`);
|
|
|
|
res.json({
|
|
success: true,
|
|
retriedCount,
|
|
scannedCount,
|
|
headerName,
|
|
headerValue,
|
|
queueName,
|
|
dlqName
|
|
});
|
|
|
|
} catch (error) {
|
|
if (error instanceof AppParamError) {
|
|
next(error);
|
|
} else {
|
|
pino.error('Error retrying DLQ messages by header:', error);
|
|
next(new AppError(Errors.UNKNOWN_APP_ERROR, 'Failed to retry DLQ messages by header'));
|
|
}
|
|
} finally {
|
|
if (channel) await channel.close().catch(() => { });
|
|
if (connection) await connection.close().catch(() => { });
|
|
}
|
|
};
|
|
|
|
/**
|
|
* Helper function to categorize errors
|
|
*/
|
|
function categorizeError(errorMessage) {
|
|
if (!errorMessage) return 'unknown';
|
|
|
|
const msg = errorMessage.toLowerCase();
|
|
|
|
// Transient errors
|
|
if (msg.includes('timeout') ||
|
|
msg.includes('econnrefused') ||
|
|
msg.includes('enotfound') ||
|
|
msg.includes('network') ||
|
|
msg.includes('connection')) {
|
|
return 'transient';
|
|
}
|
|
|
|
// Validation errors
|
|
if (msg.includes('validation') ||
|
|
msg.includes('invalid') ||
|
|
msg.includes('required') ||
|
|
msg.includes('missing') ||
|
|
msg.includes('format')) {
|
|
return 'validation';
|
|
}
|
|
|
|
// Processing errors
|
|
if (msg.includes('parse') ||
|
|
msg.includes('calculation') ||
|
|
msg.includes('processing') ||
|
|
msg.includes('data')) {
|
|
return 'processing';
|
|
}
|
|
|
|
// Infrastructure errors
|
|
if (msg.includes('database') ||
|
|
msg.includes('mongo') ||
|
|
msg.includes('filesystem') ||
|
|
msg.includes('disk')) {
|
|
return 'infrastructure';
|
|
}
|
|
|
|
// Partner API errors
|
|
if (msg.includes('api') ||
|
|
msg.includes('authentication') ||
|
|
msg.includes('unauthorized') ||
|
|
msg.includes('rate limit')) {
|
|
return 'partner_api';
|
|
}
|
|
|
|
return 'unknown';
|
|
}
|