'use strict';

const logger = require('../helpers/logger'),
  pino = logger.child('partner_sync_worker'),
  env = require('../helpers/env.js'),
  { DBConnection } = require('../helpers/db/connect'),
  amqp = require('amqplib'), // Use promise-based version instead of callback
  { setupDLQQueues } = require('../helpers/dlq_queue_setup'), // DLQ helper module
  { PartnerTasks, AssignStatus, RecTypes, PartnerCodes, PartnerLogTrackerStatus } = require('../helpers/constants'),
  SatLocApplicationProcessor = require('../helpers/satloc_application_processor'),
  { enhancedRunInTransaction } = require('../helpers/mongo_enhanced'),
  { ApplicationFile, JobAssign } = require('../model'),
  PartnerLogTracker = require('../model/partner_log_tracker'),
  TaskTracker = require('../model/task_tracker'),
  { TaskTrackerStatus } = require('../model/task_tracker'),
  partnerSyncService = require('../services/partner_sync_service'),
  fs = require('fs').promises;

// Partner Log Tracker Status Constants (using centralized constants)
const TRACKER_STATUS = PartnerLogTrackerStatus;

// Initialize database connection
const workerDB = new DBConnection('Partner Sync Worker');

let amqpConn = null;
let amqpChannel = null;
let mqClosed = false;
let reconnecting = false; // Prevent multiple simultaneous reconnection attempts

// Interval tracking to prevent leaks on reconnect
let circuitBreakerCleanupInterval = null;
let periodicCleanupInterval = null;
let memoryMonitorInterval = null;

const PARTNER_QUEUE = env.QUEUE_NAME_PARTNER;

// Register fatal handlers
const path = require('path');
const { registerFatalHandlers } = require('../helpers/process_fatal_handlers');
registerFatalHandlers(process, {
  env,
  debug: (msg) => pino.info(msg),
  kindPrefix: 'partner_sync_worker',
  reportFilePath: path.join(__dirname, 'partner_sync_worker.rlog'),
});
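
// Configuration sketch (illustrative placeholder values, not authoritative defaults).
// The keys below are the env.js settings referenced in this file:
//   QUEUE_NAME_PARTNER=partner_tasks   # queue this worker consumes
//   QUEUE_HOST=localhost
//   QUEUE_PORT=5672
//   QUEUE_USR=agmuser
//   QUEUE_PWD=<secret>
//   QUEUE_VHOST=/
//   QUEUE_HEARTBEAT=0
//   DLQ_RETENTION_DAYS=7
//   PARTNER_MAX_RETRIES=5
//   NODE_ENV=development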

// Graceful shutdown shared by the SIGINT and SIGTERM handlers
async function gracefulShutdown(signal) {
  pino.info(`Received ${signal}, shutting down gracefully...`);
  mqClosed = true;

  // Clear all intervals to prevent leaks
  if (circuitBreakerCleanupInterval) clearInterval(circuitBreakerCleanupInterval);
  if (periodicCleanupInterval) clearInterval(periodicCleanupInterval);
  if (memoryMonitorInterval) clearInterval(memoryMonitorInterval);

  // Close channel first to prevent memory leaks
  if (amqpChannel) {
    try {
      await amqpChannel.close();
      pino.info('AMQP channel closed');
    } catch (err) {
      pino.error({ err }, 'Error closing AMQP channel');
    }
  }

  if (amqpConn) {
    try {
      await amqpConn.close();
      pino.info('AMQP connection closed');
    } catch (err) {
      pino.error({ err }, 'Error closing AMQP connection');
    }
  }
  process.exit(0);
}

process
  .on('SIGINT', () => gracefulShutdown('SIGINT'))
  .on('SIGTERM', () => gracefulShutdown('SIGTERM'));

// Initialize the database connection
workerDB.initialize({ setupExitHandlers: false });

// Startup cleanup for stuck processing tasks
async function startupCleanup() {
  // Wait for database to be ready with retry logic
  let retries = 0;
  const maxRetries = 10;

  while (!workerDB.isReady() && retries < maxRetries) {
    pino.debug(`Database not ready for startup cleanup, waiting... (attempt ${retries + 1}/${maxRetries})`);
    await new Promise(resolve => setTimeout(resolve, 1000)); // Wait 1 second
    retries++;
  }

  if (!workerDB.isReady()) {
    pino.error('Database not ready after waiting, skipping startup cleanup');
    return;
  }

  try {
    const STUCK_PROCESSING_TIMEOUT_MS = 30 * 60 * 1000; // 30 minutes
    const cutoffTime = new Date(Date.now() - STUCK_PROCESSING_TIMEOUT_MS);

    // Find and reset stuck processing tasks (downloaded -> processing but never completed)
    const stuckProcessing = await PartnerLogTracker.find({
      status: TRACKER_STATUS.PROCESSING,
      processingStartedAt: { $lt: cutoffTime }
    });

    if (stuckProcessing.length > 0) {
      pino.info(`Found ${stuckProcessing.length} stuck processing tasks on startup, resetting to failed`);

      await PartnerLogTracker.updateMany(
        {
          status: TRACKER_STATUS.PROCESSING,
          processingStartedAt: { $lt: cutoffTime }
        },
        {
          $set: {
            status: TRACKER_STATUS.FAILED,
            errorMessage: 'Processing timeout - reset on worker startup',
            updatedAt: new Date()
          }
        }
      );

      pino.info(`Reset ${stuckProcessing.length} stuck processing tasks to failed status`);
    }

    // Also check for any processing tasks without processingStartedAt (corrupted state)
    const corruptedProcessing = await PartnerLogTracker.find({
      status: TRACKER_STATUS.PROCESSING,
      processingStartedAt: { $exists: false }
    });

    if (corruptedProcessing.length > 0) {
      pino.info(`Found ${corruptedProcessing.length} corrupted processing tasks on startup, resetting to failed`);

      await PartnerLogTracker.updateMany(
        {
          status: TRACKER_STATUS.PROCESSING,
          processingStartedAt: { $exists: false }
        },
        {
          $set: {
            status: TRACKER_STATUS.FAILED,
            errorMessage: 'Corrupted processing state - reset on worker startup',
            updatedAt: new Date()
          }
        }
      );

      pino.info(`Reset ${corruptedProcessing.length} corrupted processing tasks to failed status`);
    }

  } catch (error) {
    pino.error({ err: error }, 'Error during sync worker startup cleanup');
  }
}

// Start AMQP queue worker for partner tasks using modern async/await
async function startPartnerWorker() {
  // Prevent multiple simultaneous reconnection attempts
  if (reconnecting) {
    pino.debug('Reconnection already in progress, skipping duplicate attempt');
    return;
  }
  reconnecting = true;

  try {
    // Wait for database to be ready before starting worker
    let dbRetries = 0;
    const maxDbRetries = 20;

    while (!workerDB.isReady() && dbRetries < maxDbRetries) {
      pino.debug(`Waiting for database connection... (attempt ${dbRetries + 1}/${maxDbRetries})`);
      await new Promise(resolve => setTimeout(resolve, 1000));
      dbRetries++;
    }

    if (!workerDB.isReady()) {
      pino.error('Database connection failed after retries, retrying worker startup in 10s');
      reconnecting = false;
      setTimeout(startPartnerWorker, 10000);
      return;
    }

    pino.info('Database connection ready, starting AMQP worker...');

    const conOps = {
      protocol: 'amqp',
      hostname: env.QUEUE_HOST || 'localhost',
      port: env.QUEUE_PORT || 5672,
      username: env.QUEUE_USR || 'agmuser',
      password: env.QUEUE_PWD,
      vhost: env.QUEUE_VHOST || '/',
      heartbeat: env.QUEUE_HEARTBEAT || 0,
      frameMax: 0
    };

    // Close old channel if exists (prevents channel leak on reconnect)
    if (amqpChannel) {
      try {
        await amqpChannel.close();
        pino.debug('[AMQP] Closed old channel before reconnect');
      } catch (err) {
        pino.debug({ err }, '[AMQP] Error closing old channel (may already be closed)');
      }
      amqpChannel = null;
    }

    const conn = await amqp.connect(conOps);

    conn.on('error', (err) => {
      if (mqClosed) return; // Already handling reconnection

      pino.error({ err }, '[AMQP] Connection error');
      mqClosed = true;
      reconnecting = false; // Allow reconnection

      // Longer delay for permission errors to prevent tight loop
      const isPermissionError = err.message && err.message.includes('ACCESS_REFUSED');
      const delay = isPermissionError ? 30000 : 5000; // 30s for permission errors, 5s for others

      if (isPermissionError) {
        pino.error(`[AMQP] Permission error detected. Ensure RabbitMQ user has access to queue '${PARTNER_QUEUE}'. Retrying in ${delay / 1000}s`);
      }

      setTimeout(startPartnerWorker, delay);
    });

    conn.on('close', () => {
      if (mqClosed) return; // Already handling reconnection

      pino.info('[AMQP] Connection closed');
      mqClosed = true;
      reconnecting = false; // Allow reconnection
      setTimeout(startPartnerWorker, 5000);
    });

    amqpConn = conn;
    mqClosed = false;
    pino.info('[AMQP] Partner worker connected');

    // Use DLQ helper module to set up queue infrastructure
    const { channel, queueNames } = await setupDLQQueues(PARTNER_QUEUE, {
      connection: conn,
      retentionDays: env.DLQ_RETENTION_DAYS,
      prefetch: 1
    });

    amqpChannel = channel; // Store channel reference for cleanup on reconnect/shutdown

    pino.info('[AMQP] DLQ infrastructure configured via helper module');
    pino.info('[AMQP] Queue flow: %s -> %s (%d days TTL) -> %s',
      queueNames.main, queueNames.dlq, env.DLQ_RETENTION_DAYS, queueNames.archive);

    // Start periodic cleanup ONLY AFTER channel is successfully created
    startPeriodicCleanup();

    reconnecting = false; // Connection AND channel successful, allow future reconnections

    pino.info('[AMQP] Prefetch set to 1 for memory-safe processing');
    pino.info('[AMQP] Waiting for partner tasks...');
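
    // Queue topology sketch (an assumption drawn from the log line above; the actual
    // queue names and dead-letter wiring live in ../helpers/dlq_queue_setup):
    //   queueNames.main    - work queue this consumer reads with prefetch 1
    //   queueNames.dlq     - rejected messages are dead-lettered here, retained for DLQ_RETENTION_DAYS
    //   queueNames.archive - expired DLQ messages end up here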

    await channel.consume(PARTNER_QUEUE, async (msg) => {
      if (!msg) return;

      let taskMsg;
      try {
        taskMsg = JSON.parse(msg.content);
      } catch (err) {
        pino.error({ err }, 'Failed to parse task message');
        channel.ack(msg);
        return;
      }

      // Check if message was redelivered (failed processing before)
      const isRedelivered = msg.fields && msg.fields.redelivered;

      // Add extra protection for redelivered messages
      if (isRedelivered && taskMsg.logFileName) {
        pino.debug(`Processing redelivered message for: ${taskMsg.logFileName}`);

        // Check if this was already successfully processed (crash recovery)
        if (await isTaskAlreadyProcessed(taskMsg)) {
          pino.info(`Redelivered task ${taskMsg.logFileName} was already processed, acknowledging`);
          channel.ack(msg);
          return;
        }
      }

      try {
        const result = await processPartnerTask(taskMsg, isRedelivered);
        if (!mqClosed) {
          pino.debug('Partner task completed:', result);
          channel.ack(msg);
        }
      } catch (error) {
        if (!mqClosed) {
          pino.error({
            err: error,
            taskMsg: {
              logFileName: taskMsg.logFileName,
              type: taskMsg.type,
              customerId: taskMsg.customerId,
              partnerCode: taskMsg.partnerCode,
              aircraftId: taskMsg.aircraftId
            }
          }, 'Partner task failed');

          // Check if task is already processed to avoid sending duplicates to DLQ
          if (await isTaskAlreadyProcessed(taskMsg)) {
            pino.debug(`Task ${taskMsg.logFileName} already processed, acknowledging without DLQ`);
            channel.ack(msg);
            return;
          }

          // For redelivered messages, be more aggressive about detecting fatal errors
          if (isRedelivered && isFatalError(error)) {
            pino.error({ err: error, taskMsg }, 'Fatal error on redelivered message, sending to DLQ immediately');

            // Enrich message with error metadata before DLQ
            const enriched = await sendToQueueWithEnrichment(channel, PARTNER_QUEUE, taskMsg, error);
            if (enriched) {
              channel.ack(msg); // Ack the original; the enriched copy is dead-lettered when it fails again
            } else {
              channel.reject(msg, false); // Fallback to direct reject
            }
            return;
          }

          // Implement retry logic with exponential backoff
          if (await shouldRetryTask(taskMsg, error)) {
            pino.debug('Retrying task after delay...');
            // Reject with requeue to retry later
            channel.reject(msg, true);
          } else {
            // Final check before DLQ - log for monitoring
            pino.warn({
              logFileName: taskMsg.logFileName,
              customerId: taskMsg.customerId,
              partnerCode: taskMsg.partnerCode,
              aircraftId: taskMsg.aircraftId,
              retryCount: taskMsg.retryCount || 0,
              lastError: error.message,
              isRedelivered
            }, 'Task exceeded max retries, sending to dead letter queue');

            // Enrich message with error metadata before DLQ
            const enriched = await sendToQueueWithEnrichment(channel, PARTNER_QUEUE, taskMsg, error);
            if (enriched) {
              channel.ack(msg); // Ack the original; the enriched copy is dead-lettered when it fails again
            } else {
              channel.reject(msg, false); // Fallback to direct reject
            }
          }
        }
      }
    }, { noAck: false });

  } catch (error) {
    pino.error({ err: error }, '[AMQP] Worker setup failed');

    // CRITICAL: Always reset reconnecting flag to allow future attempts
    reconnecting = false;
    mqClosed = true;

    // Close any partially created connection/channel to prevent leaks
    if (amqpChannel) {
      try {
        await amqpChannel.close();
        pino.debug('[AMQP] Closed channel after connection failure');
      } catch (err) {
        // Ignore close errors
      }
      amqpChannel = null;
    }

    if (amqpConn) {
      try {
        await amqpConn.close();
        pino.debug('[AMQP] Closed connection after setup failure');
      } catch (err) {
        // Ignore close errors
      }
      amqpConn = null;
    }

    // Check if it's a permission error
    const isPermissionError = error.message && (
      error.message.includes('ACCESS_REFUSED') ||
      error.message.includes('403')
    );

    if (isPermissionError) {
      pino.error(`[AMQP] Permission denied for queue '${PARTNER_QUEUE}'. Check RabbitMQ user permissions. Retrying in 30s`);
      setTimeout(startPartnerWorker, 30000); // Longer delay for permission issues
    } else {
      pino.error('[AMQP] Connection failed, retrying in 5s');
      setTimeout(startPartnerWorker, 5000);
    }
  }
}

// Process individual partner tasks using async/await with timeout protection
async function processPartnerTask(taskMsg, isRedelivered = false) {
  pino.debug('Processing partner task:', taskMsg.type, taskMsg.data, { redelivered: isRedelivered });

  // Add timeout protection to prevent hanging tasks
  const TASK_TIMEOUT_MS = 30 * 60 * 1000; // 30 minutes

  const taskPromise = (async () => {
    switch (taskMsg.type) {
      case PartnerTasks.UPLOAD_PARTNER_JOB:
        return processPartnerJobUpload(taskMsg.data, isRedelivered);

      case PartnerTasks.PROCESS_PARTNER_LOG:
        return processPartnerLog(taskMsg.data, isRedelivered);

      default:
        throw new Error(`Unknown partner task type: ${taskMsg.type}`);
    }
  })();

  let timeoutHandle;
  const timeoutPromise = new Promise((_, reject) => {
    timeoutHandle = setTimeout(() => {
      reject(new Error(`Task ${taskMsg.type} timed out after ${TASK_TIMEOUT_MS / 1000} seconds`));
    }, TASK_TIMEOUT_MS);
  });

  try {
    return await Promise.race([taskPromise, timeoutPromise]);
  } catch (error) {
    pino.error({
      err: error,
      taskType: taskMsg.type,
      logFileName: taskMsg.data?.logFileName,
      isTimeout: error.message.includes('timed out')
    }, 'Partner task processing error');
    throw error;
  } finally {
    // Clear the timer so it cannot fire (or keep the event loop alive) after the task settles
    clearTimeout(timeoutHandle);
  }
}

// Constants for task processing
const MAX_RETRY_ATTEMPTS = env.PARTNER_MAX_RETRIES;
const RETRY_DELAY_MS = 10000;
const BASE_RETRY_DELAY = 5000; // 5 seconds base delay
const MAX_RETRY_DELAY = 60000; // Max 60 seconds delay
const PROCESSING_TIMEOUT_MS = 90 * 60 * 1000; // 90 minutes for large files
const CLEANUP_INTERVAL_MS = 30 * 60 * 1000; // 30 minutes (raised from 15 to reduce overhead)

// Circuit breaker for problematic files - more lenient for development
const problematicFiles = new Map(); // filename -> { attempts, lastAttempt, blocked }
const MAX_FILE_ATTEMPTS = env.NODE_ENV === 'production' ? 3 : 10; // More attempts in dev
const FILE_BLOCK_DURATION = env.NODE_ENV === 'production' ? 60 * 60 * 1000 : 5 * 60 * 1000; // 1 hour in prod vs 5 minutes in dev

// Periodically clean up old entries from circuit breaker map to prevent memory leak
if (!circuitBreakerCleanupInterval) {
  circuitBreakerCleanupInterval = setInterval(() => {
    const now = Date.now();
    const cutoffTime = now - (FILE_BLOCK_DURATION * 2); // Keep for 2x block duration
    let cleanedCount = 0;

    for (const [key, info] of problematicFiles.entries()) {
      if (info.lastAttempt < cutoffTime) {
        problematicFiles.delete(key);
        cleanedCount++;
      }
    }

    if (cleanedCount > 0) {
      pino.debug(`Cleaned ${cleanedCount} old entries from circuit breaker map (size: ${problematicFiles.size})`);
    }
  }, 60 * 60 * 1000); // Clean every hour

  pino.info('Circuit breaker cleanup interval started');
}

// Debug: Log constants at startup
pino.info(`Partner sync worker constants - MAX_RETRY_ATTEMPTS: ${MAX_RETRY_ATTEMPTS}, PARTNER_MAX_RETRIES: ${env.PARTNER_MAX_RETRIES}`);
pino.info(`Circuit breaker settings - MAX_FILE_ATTEMPTS: ${MAX_FILE_ATTEMPTS}, FILE_BLOCK_DURATION: ${FILE_BLOCK_DURATION}ms (${FILE_BLOCK_DURATION / 60000} minutes)`);
pino.info(`Environment: ${env.NODE_ENV}, Development mode circuit breaker: ${env.NODE_ENV === 'development' ? 'LENIENT' : 'STRICT'}`);
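
// Illustrative circuit-breaker entry (hypothetical file name; the key format
// `${logFileName}-${partnerCode}` and the thresholds are the ones defined above):
//   problematicFiles.get('flight_20240101.log-SATLOC')
//   // -> { attempts: 3, lastAttempt: 1704067200000, blocked: true }
// Entries older than twice FILE_BLOCK_DURATION are dropped by the hourly cleanup interval.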

// Cleanup stuck processing tasks periodically
function startPeriodicCleanup() {
  // Prevent multiple cleanup intervals on reconnect
  if (periodicCleanupInterval) {
    pino.debug('Periodic cleanup already running, skipping duplicate');
    return;
  }

  periodicCleanupInterval = setInterval(async () => {
    try {
      const cutoffTime = new Date(Date.now() - PROCESSING_TIMEOUT_MS);

      const stuckTasks = await PartnerLogTracker.find({
        status: TRACKER_STATUS.PROCESSING,
        processingStartedAt: { $lt: cutoffTime }
      });

      if (stuckTasks.length > 0) {
        pino.info(`Found ${stuckTasks.length} stuck processing tasks, resetting them`);

        await PartnerLogTracker.updateMany(
          {
            status: TRACKER_STATUS.PROCESSING,
            processingStartedAt: { $lt: cutoffTime }
          },
          {
            $set: {
              status: TRACKER_STATUS.FAILED,
              errorMessage: 'Processing timeout - automatically reset',
              updatedAt: new Date()
            }
          }
        );

        pino.info(`Reset ${stuckTasks.length} stuck processing tasks to failed status`);
      }
    } catch (error) {
      pino.error({ err: error }, 'Error during periodic cleanup of stuck tasks');
    }
  }, CLEANUP_INTERVAL_MS);

  pino.info('Periodic cleanup interval started');
}

// Check if a task should be retried based on error type and retry count
async function shouldRetryTask(taskMsg, error) {
  try {
    // Get current retry count from task message or initialize to 0
    const retryCount = taskMsg.retryCount || 0;

    // Debug logging to understand retry logic
    pino.debug(`shouldRetryTask: retryCount=${retryCount}, MAX_RETRY_ATTEMPTS=${MAX_RETRY_ATTEMPTS}, comparison=${retryCount >= MAX_RETRY_ATTEMPTS}`);

    // Don't retry if we've exceeded max attempts
    if (retryCount >= MAX_RETRY_ATTEMPTS) {
      pino.info(`Task ${taskMsg.type} exceeded max retry attempts (${retryCount} >= ${MAX_RETRY_ATTEMPTS}), sending to DLQ`);
      return false;
    }

    // Don't retry for certain error types (validation errors, etc.)
    if (isNonRetryableError(error)) {
      pino.debug(`Task ${taskMsg.type} failed with non-retryable error:`, error.message);
      return false;
    }

    // Calculate exponential backoff delay
    const delay = Math.min(BASE_RETRY_DELAY * Math.pow(2, retryCount), MAX_RETRY_DELAY);
    pino.debug(`Will retry task ${taskMsg.type} after ${delay}ms (attempt ${retryCount + 1}/${MAX_RETRY_ATTEMPTS})`);

    // Update retry count for next attempt
    taskMsg.retryCount = retryCount + 1;
    taskMsg.lastError = error.message;
    taskMsg.nextRetryAt = Date.now() + delay;

    // Wait for backoff delay
    await new Promise(resolve => setTimeout(resolve, delay));

    return true;
  } catch (err) {
    pino.debug('Error in shouldRetryTask:', err);
    return false;
  }
}
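
// Backoff schedule produced by the formula above, with BASE_RETRY_DELAY = 5s and
// MAX_RETRY_DELAY = 60s (how many attempts actually run depends on PARTNER_MAX_RETRIES):
//   retryCount 0 -> 5s, 1 -> 10s, 2 -> 20s, 3 -> 40s, 4+ -> capped at 60s
// The delay is awaited inline before the message is requeued, so the consumer holds
// the unacked message for the duration of the backoff (prefetch is 1).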

// Check if task is already processed to avoid DLQ duplicates
async function isTaskAlreadyProcessed(taskMsg) {
  try {
    if (!taskMsg.logFileName || !taskMsg.customerId || !taskMsg.partnerCode || !taskMsg.aircraftId) {
      return false; // If essential fields are missing, let it proceed to DLQ
    }

    const filter = {
      customerId: taskMsg.customerId,
      partnerCode: taskMsg.partnerCode,
      aircraftId: taskMsg.aircraftId, // Fixed: use aircraftId, not partnerAircraftId
      logFileName: taskMsg.logFileName
    };

    const tracker = await PartnerLogTracker.findOne(filter);

    // Consider processed if explicitly marked as processed OR in processed status
    const isProcessed = tracker && (tracker.processed === true || tracker.status === TRACKER_STATUS.PROCESSED);

    if (isProcessed) {
      pino.debug(`Task ${taskMsg.logFileName} already processed, avoiding DLQ duplicate`);
    }

    return isProcessed;
  } catch (err) {
    pino.debug('Error checking if task already processed:', err);
    return false; // On error, let it proceed to avoid blocking
  }
}

// Check if an error should not be retried
function isNonRetryableError(error) {
  const nonRetryablePatterns = [
    /validation/i,
    /invalid.*format/i,
    /malformed/i,
    // Authentication errors are now retryable (removed from this list)
    // /authentication.*failed/i,
    // /unauthorized/i,
    // /forbidden/i,
    /not.*found/i,
    /already.*exists/i,
    /already.*processed/i
  ];

  return nonRetryablePatterns.some(pattern => pattern.test(error.message));
}

// Check if an error is fatal and should immediately go to DLQ (especially for redelivered messages)
function isFatalError(error) {
  const fatalPatterns = [
    /out of memory/i,
    /heap.*limit/i,
    /maximum call stack/i,
    /cannot allocate memory/i,
    /worker killed/i,
    /signal.*kill/i,
    /database.*connection.*lost/i,
    /connection.*terminated/i,
    /timeout.*exceeded/i,
    /file.*corrupt/i,
    /parse.*error/i,
    /invalid.*file.*format/i
  ];

  return fatalPatterns.some(pattern => pattern.test(error.message));
}

/**
 * Categorize error for DLQ analysis
 * Based on https://rashadansari.medium.com/strategies-for-successful-dead-letter-queue-event-handling-e354f7dfbb3e
 */
function categorizeError(error) {
  const errorMsg = (error?.message || '').toLowerCase();

  // Transient errors - temporary issues that may resolve
  if (/timeout|timed out|connection|network|econnrefused|enotfound|socket|dns/i.test(errorMsg)) {
    return 'transient';
  }

  // Validation errors - data quality issues
  if (/validation|invalid|malformed|missing required|bad request|400/i.test(errorMsg)) {
    return 'validation';
  }

  // Infrastructure errors - database, filesystem
  if (/database|mongo|transaction|filesystem|eacces|enoent|disk/i.test(errorMsg)) {
    return 'infrastructure';
  }

  // Partner API errors
  if (/partner api|api error|http [45]\d\d|unauthorized|forbidden|authentication/i.test(errorMsg)) {
    return 'partner_api';
  }

  // Processing errors - business logic failures
  if (/processing|calculation|parse|transform|encode|decode/i.test(errorMsg)) {
    return 'processing';
  }

  return 'unknown';
}
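
// Examples of how the patterns above classify messages (order matters - the first
// matching branch wins, so an error mentioning both a timeout and validation is 'transient'):
//   'connect ECONNREFUSED 10.0.0.5:443'          -> 'transient'
//   'Validation failed: missing required field'  -> 'validation'
//   'MongoServerError: transaction aborted'      -> 'infrastructure'
//   'Partner API error: HTTP 403 Forbidden'      -> 'partner_api'
//   'Upload failed: Unknown error'               -> 'unknown'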

/**
 * Calculate error severity for alerting
 */
function calculateSeverity(error, retryCount = 0) {
  const category = categorizeError(error);

  // Critical: fatal errors or high retry count
  if (isFatalError(error) || retryCount >= env.PARTNER_MAX_RETRIES) {
    return 'critical';
  }

  // High: infrastructure or persistent issues
  if (category === 'infrastructure' || retryCount >= 3) {
    return 'high';
  }

  // Medium: validation or processing errors
  if (category === 'validation' || category === 'processing') {
    return 'medium';
  }

  // Low: transient errors (likely to resolve)
  return 'low';
}

/**
 * Create enriched message properties for DLQ tracking
 */
function createDLQHeaders(taskMsg, error) {
  return {
    'x-error-category': categorizeError(error),
    'x-error-reason': error?.message || 'Unknown error',
    'x-task-type': taskMsg.type || 'unknown',
    'x-severity': calculateSeverity(error, taskMsg.retryCount),
    'x-first-death-time': Date.now(),
    'x-partner-code': taskMsg.partnerCode || 'unknown',
    'x-customer-id': taskMsg.customerId?.toString() || 'unknown'
  };
}
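
// Illustrative header set for a timed-out PROCESS_PARTNER_LOG task, assuming a low retry
// count (all values are hypothetical; 'x-first-death-time' is an epoch-millisecond timestamp):
//   {
//     'x-error-category': 'transient',
//     'x-error-reason': 'Task process_partner_log timed out after 1800 seconds',
//     'x-task-type': 'process_partner_log',
//     'x-severity': 'low',
//     'x-partner-code': 'SATLOC',
//     'x-customer-id': '64f1c2...'
//   }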

/**
 * Enrich a failed task with error metadata and republish it to the given queue with
 * DLQ tracking headers. Centralizes the DLQ routing logic to avoid code duplication.
 */
async function sendToQueueWithEnrichment(channel, queueName, taskMsg, error) {
  try {
    const enrichedTask = {
      ...taskMsg,
      lastError: error.message,
      failedAt: new Date().toISOString(),
      retryCount: (taskMsg.retryCount || 0) + 1
    };

    await channel.sendToQueue(
      queueName,
      Buffer.from(JSON.stringify(enrichedTask)),
      {
        persistent: true,
        headers: createDLQHeaders(taskMsg, error)
      }
    );
    return true;
  } catch (enrichError) {
    pino.error({ err: enrichError }, 'Failed to enrich message');
    return false;
  }
}
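
// Sketch of the enriched payload for a task that has already failed twice (field values
// are illustrative; everything from the original message is preserved by the spread):
//   {
//     ...originalTaskMsg,
//     lastError: 'Local log file not found: /data/logs/flight_20240101.log',
//     failedAt: '2024-01-01T00:00:00.000Z',
//     retryCount: 3
//   }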

// Process partner job upload using async/await
async function processPartnerJobUpload(taskData, isRedelivered = false) {
  pino.debug('Uploading job to partner:', taskData, { redelivered: isRedelivered });

  try {
    // If message was redelivered, check if upload was already successful
    if (isRedelivered) {
      const assignment = await JobAssign.findById(taskData.assignId);
      if (assignment && assignment.extJobId) {
        pino.debug('Job upload already completed on redelivered message:', assignment.extJobId);
        return { success: true, result: { alreadyProcessed: true, externalJobId: assignment.extJobId } };
      }
    }

    let result;

    // Use atomic transaction to ensure upload and status updates are consistent
    await enhancedRunInTransaction(async (session) => {
      // Upload job to partner with session for atomic operations.
      // This handles assignment status update and job logging internally.
      result = await partnerSyncService.uploadJobToPartner(taskData.assignId, { session });

      if (!result.success) {
        throw new Error(`Upload failed: ${result.message || 'Unknown error'}`);
      }
    });

    pino.debug('Partner job upload result:', result);
    return { success: true, result };
  } catch (error) {
    pino.debug('Partner job upload error:', error);
    throw error;
  }
}

// Process partner log data using async/await
async function processPartnerLog(taskData, isRedelivered = false) {
  pino.debug('Processing partner log:', taskData.logFileName, { redelivered: isRedelivered });

  // TaskTracker idempotency check (parallel tracking)
  const { taskId, executionId } = taskData;
  if (taskId && executionId) {
    const taskTracker = await TaskTracker.findOneAndUpdate(
      {
        taskId,
        executionId,
        status: { $in: [TaskTrackerStatus.QUEUED, TaskTrackerStatus.FAILED] }
      },
      {
        $set: {
          status: TaskTrackerStatus.PROCESSING,
          processingStartedAt: new Date()
        }
      },
      { new: true }
    );

    if (!taskTracker) {
      pino.warn({ taskId, executionId, logFileName: taskData.logFileName },
        'Task already claimed or completed by another worker, skipping');
      return { skipped: true, reason: 'already_processed' };
    }
    pino.debug({ taskId, executionId }, 'TaskTracker claimed successfully');
  }

  // Circuit breaker: Check if this file has been problematic
  const fileKey = `${taskData.logFileName}-${taskData.partnerCode}`;
  const fileInfo = problematicFiles.get(fileKey);

  // In development, allow manual override by checking for recent successful processing
  if (fileInfo && fileInfo.blocked && (Date.now() - fileInfo.lastAttempt) < FILE_BLOCK_DURATION) {
    if (env.NODE_ENV === 'development') {
      pino.warn(`File ${taskData.logFileName} is temporarily blocked but allowing retry in development mode`);
      // Reset the block in development to allow retries
      problematicFiles.delete(fileKey);
    } else {
      pino.warn(`File ${taskData.logFileName} is temporarily blocked due to repeated failures`);
      throw new Error(`File temporarily blocked due to repeated failures - will retry later`);
    }
  }

  const processStartTime = Date.now();

  // Build filter for atomic operations from essential task fields
  // (trackerFilter is no longer passed in task payload - reconstruct from essential fields)
  const filter = {
    logId: taskData.logId,
    partnerCode: taskData.partnerCode,
    aircraftId: taskData.aircraftId,
    customerId: taskData.customerId
  };

  try {
    // Check database connection before processing
    if (!workerDB.isReady()) {
      throw new Error('Database connection not ready for processing');
    }

    // Atomically claim log for processing
    // For redelivered messages, be more aggressive about reclaiming stuck processing tasks
    const claimFilter = {
      ...filter,
      $or: [
        { processed: { $exists: false } },
        { processed: false }
      ]
    };

    // For redelivered messages, allow reclaiming stuck PROCESSING tasks (older than 5 minutes)
    if (isRedelivered) {
      const stuckProcessingCutoff = new Date(Date.now() - 5 * 60 * 1000); // 5 minutes ago
      claimFilter.$or.push(
        // Reclaim PROCESSING tasks that are stuck (started > 5 minutes ago)
        {
          status: TRACKER_STATUS.PROCESSING,
          processingStartedAt: { $lt: stuckProcessingCutoff }
        },
        // Reclaim PROCESSING tasks without processingStartedAt (corrupted state)
        {
          status: TRACKER_STATUS.PROCESSING,
          processingStartedAt: { $exists: false }
        }
      );
      // Also allow DOWNLOADED and FAILED
      claimFilter.$or.push(
        { status: TRACKER_STATUS.DOWNLOADED },
        { status: TRACKER_STATUS.FAILED }
      );
      pino.info(`Redelivered message - attempting to reclaim stuck/failed/downloaded task: ${taskData.logFileName}`);
    } else {
      // For non-redelivered messages, only claim DOWNLOADED or FAILED
      claimFilter.status = { $in: [TRACKER_STATUS.DOWNLOADED, TRACKER_STATUS.FAILED] };
    }

    const claimedTracker = await PartnerLogTracker.findOneAndUpdate(
      claimFilter,
      {
        $set: {
          status: TRACKER_STATUS.PROCESSING,
          processingStartedAt: new Date(),
          updatedAt: new Date(),
          ...(isRedelivered && { errorMessage: 'Reclaimed from redelivered message' })
        },
        $inc: { retryCount: 1 }
      },
      { new: true }
    );

    // Log current state for debugging
    if (!claimedTracker) {
      const existingTracker = await PartnerLogTracker.findOne(filter);
      pino.debug(`Unable to claim log ${taskData.logFileName}. Current state:`, {
        logFileName: taskData.logFileName,
        currentStatus: existingTracker?.status,
        processed: existingTracker?.processed,
        processingStartedAt: existingTracker?.processingStartedAt,
        retryCount: existingTracker?.retryCount
      });
    }

    if (!claimedTracker) {
      // Log was already processed or claimed by another worker
      const existingTracker = await PartnerLogTracker.findOne(filter);

      if (existingTracker?.processed) {
        pino.debug(`Log ${taskData.logFileName} already processed`);
        return { success: true, message: 'Already processed' };
      }

      // For redelivered messages, we already tried to reclaim processing tasks above
      if (isRedelivered) {
        pino.warn(`Redelivered message could not be claimed for ${taskData.logFileName}, may be genuinely processed by another worker`);
        return { success: true, message: 'Could not reclaim redelivered task - likely processed elsewhere' };
      }

      // For non-redelivered messages, handle processing status more conservatively
      if (existingTracker?.status === TRACKER_STATUS.PROCESSING) {
        // Check if processing is stuck (taking too long)
        const isStuck = existingTracker.processingStartedAt &&
          (Date.now() - existingTracker.processingStartedAt.getTime() > PROCESSING_TIMEOUT_MS);

        if (isStuck) {
          pino.debug(`Log ${taskData.logFileName} processing appears stuck, resetting to retry`);
          // Reset stuck processing to allow retry
          await PartnerLogTracker.findOneAndUpdate(
            filter,
            {
              $set: {
                status: TRACKER_STATUS.FAILED,
                errorMessage: 'Processing timeout - reset for retry',
                updatedAt: new Date()
              }
            }
          );
          // Throw error to trigger task requeue/retry
          throw new Error(`Log ${taskData.logFileName} processing was stuck and has been reset for retry`);
        } else {
          pino.debug(`Log ${taskData.logFileName} is currently being processed by another worker`);
          // Don't acknowledge - let message be redelivered later
          throw new Error(`Log ${taskData.logFileName} is currently being processed - will retry later`);
        }
      } else {
        pino.debug(`Log ${taskData.logFileName} was claimed by another worker or in wrong state`);
        // Don't acknowledge - let message be redelivered later
        throw new Error(`Log ${taskData.logFileName} in unexpected state - will retry later`);
      }
    }

    if (isRedelivered && claimedTracker) {
      pino.info(`Successfully reclaimed redelivered task: ${taskData.logFileName} (retry ${claimedTracker.retryCount})`);
    } else {
      pino.debug(`Successfully claimed log for processing: ${taskData.logFileName} (retry ${claimedTracker.retryCount})`);
    }

    try {
      let processingResult;

      // Resolve full file path - use service method for path-agnostic storage
      let localFilePath = taskData.localFilePath;
      if (!localFilePath) {
        const SatlocService = require('../services/satloc_service');
        const satlocService = new SatlocService();
        const savedFile = claimedTracker?.savedLocalFile || taskData.logFileName;
        localFilePath = satlocService.resolveLogFilePath(savedFile);
        pino.debug(`Resolved file path via service: ${localFilePath}`);
      }

      pino.debug(`Processing local file: ${localFilePath}`);

      // Verify file exists
      try {
        await fs.access(localFilePath);
      } catch (fileError) {
        throw new Error(`Local log file not found: ${localFilePath}`);
      }

      // Update taskData with resolved path for downstream processing
      taskData.localFilePath = localFilePath;

      processingResult = await processLocalLogFile(taskData, claimedTracker);

      // Atomically mark as successfully processed
      const completedTracker = await PartnerLogTracker.findOneAndUpdate(
        {
          ...filter,
          status: TRACKER_STATUS.PROCESSING,
          _id: claimedTracker._id
        },
        {
          $set: {
            status: TRACKER_STATUS.PROCESSED,
            processed: true,
            processedAt: new Date(),
            matchedJobs: processingResult.matchedJobs,
            appFileId: processingResult.appFileId,
            processTime: Date.now() - processStartTime,
            updatedAt: new Date()
          }
        },
        { new: true }
      );

      if (!completedTracker) {
        throw new Error(`Failed to update tracker status to processed for ${taskData.logFileName}`);
      }

      // Update TaskTracker to completed (parallel tracking)
      if (taskId && executionId) {
        await TaskTracker.updateOne(
          { executionId },
          {
            $set: {
              status: TaskTrackerStatus.COMPLETED,
              completedAt: new Date(),
              processTime: Date.now() - processStartTime,
              result: {
                matchedJobs: processingResult.matchedJobs,
                appFileId: processingResult.appFileId
              }
            }
          }
        ).catch(err => {
          // Non-blocking - log error but don't fail task
          pino.error({ err, executionId }, 'Failed to update TaskTracker to completed');
        });
      }

      pino.debug(`Successfully processed log ${taskData.logFileName}: ${processingResult.matchedJobs.length} jobs matched`);

      // Circuit breaker: Reset file tracking on success
      if (problematicFiles.has(fileKey)) {
        problematicFiles.delete(fileKey);
        pino.debug(`Cleared problematic file tracking for ${taskData.logFileName}`);
      }

      return {
        success: true,
        result: processingResult
      };

    } catch (error) {
      // Circuit breaker: Track problematic files
      const currentFileInfo = problematicFiles.get(fileKey) || { attempts: 0, lastAttempt: 0, blocked: false };
      currentFileInfo.attempts++;
      currentFileInfo.lastAttempt = Date.now();

      if (currentFileInfo.attempts >= MAX_FILE_ATTEMPTS) {
        currentFileInfo.blocked = true;
        pino.warn(`File ${taskData.logFileName} marked as problematic after ${currentFileInfo.attempts} attempts`);
      }

      problematicFiles.set(fileKey, currentFileInfo);

      // Atomically mark as failed
      await PartnerLogTracker.findOneAndUpdate(
        {
          ...filter,
          status: TRACKER_STATUS.PROCESSING,
          _id: claimedTracker._id
        },
        {
          $set: {
            status: TRACKER_STATUS.FAILED,
            errorMessage: error.message,
            processTime: Date.now() - processStartTime,
            updatedAt: new Date()
          }
        },
        { new: true }
      );

      // Update TaskTracker with error details (parallel tracking)
      if (taskId && executionId) {
        const errorCategory = categorizeError(error);
        const canRetry = currentFileInfo.attempts < MAX_FILE_ATTEMPTS;

        await TaskTracker.updateOne(
          { executionId },
          {
            $set: {
              status: canRetry ? TaskTrackerStatus.FAILED : TaskTrackerStatus.DLQ,
              errorMessage: error.message,
              errorCategory,
              errorStack: error.stack,
              failedAt: new Date(),
              processTime: Date.now() - processStartTime
            },
            $inc: { retryCount: 1 }
          }
        ).catch(err => {
          // Non-blocking - log error but don't fail task
          pino.error({ err, executionId }, 'Failed to update TaskTracker with error');
        });
      }

      throw error;
    }
  } catch (error) {
    pino.debug('Partner log processing error:', error);
    throw error;
  }
}
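
// PartnerLogTracker status flow implemented above (status values come from
// PartnerLogTrackerStatus; DOWNLOADED is expected to be set by the polling worker
// that fetched the file):
//   DOWNLOADED / FAILED -> PROCESSING  (atomic claim via findOneAndUpdate, retryCount incremented)
//   PROCESSING -> PROCESSED            (processed: true, matchedJobs, appFileId, processTime recorded)
//   PROCESSING -> FAILED               (errorMessage recorded; stuck PROCESSING entries are reset to
//                                       FAILED by startupCleanup and the periodic cleanup interval)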

// Process local log file that was downloaded by polling worker
async function processLocalLogFile(taskData, claimedTracker = null) {
  pino.debug(`Processing local log file: ${taskData.localFilePath}`);

  const result = {
    matchedJobs: [],
    appFileId: null
  };

  try {
    // Verify file exists
    await fs.access(taskData.localFilePath);

    if (taskData.partnerCode === PartnerCodes.SATLOC) {
      // Use SatLoc Application Processor for all SatLoc log processing
      pino.debug(`Using SatLoc Application Processor for: ${taskData.localFilePath}`);

      // Check file size to prevent memory issues
      const fileStats = await fs.stat(taskData.localFilePath);
      const fileSizeMB = fileStats.size / (1024 * 1024);

      if (fileSizeMB > 100) { // Log files larger than 100MB
        pino.warn(`Large SatLoc log file detected: ${fileSizeMB.toFixed(2)}MB - ${taskData.localFilePath}`);
      }

      // Retrieve data that was removed from task payload for efficiency
      // - uploadedDate: from tracker record (set when log was detected)
      // - assignments: query from DB by customerId and aircraftId (via user.partnerInfo.partnerAircraftId)
      const uploadedDate = claimedTracker?.uploadedDate || null;

      // Query assignments for this aircraft from DB
      // partnerAircraftId is stored in user.partnerInfo.partnerAircraftId, not directly on JobAssign
      // Use populateWithPartnerInfo and filter in memory
      const allAssignments = await JobAssign.populateWithPartnerInfo({
        status: { $in: [AssignStatus.UPLOADED, AssignStatus.PROCESSING] }
      }, true); // lean = true

      // Filter by aircraft ID (matches user.partnerInfo.partnerAircraftId)
      const assignments = allAssignments.filter(assign => {
        const aircraftId = assign.user?.partnerInfo?.partnerAircraftId || assign.user?.partnerInfo?.tailNumber;
        return aircraftId === taskData.aircraftId;
      });

      pino.debug(`Found ${assignments.length} assignments for aircraft ${taskData.aircraftId} (from ${allAssignments.length} total)`);

      if (assignments.length === 0) {
        pino.warn(`No assignments found for aircraft ${taskData.aircraftId} - log file may not match any job`);
      }

      // Build context data with essential fields + retrieved data
      const contextData = {
        taskInfo: {
          source: 'partner_sync',
          // Essential fields from task payload
          customerId: taskData.customerId,
          partnerCode: taskData.partnerCode,
          aircraftId: taskData.aircraftId,
          logId: taskData.logId,
          logFileName: taskData.logFileName,
          localFilePath: taskData.localFilePath,
          // Retrieved from tracker/DB
          uploadedDate: uploadedDate,
          assignments: assignments
        },
        metadata: { fileSize: fileStats.size, fileSizeMB: fileSizeMB.toFixed(2) }
      };

      // Check if this is a retry scenario by using PartnerLogTracker retry count
      // If retryCount > 1, this indicates previous processing attempts
      const isRetry = claimedTracker && claimedTracker.retryCount > 1;

      const processor = new SatLocApplicationProcessor({
        batchSize: 1000,
        enableRetryLogic: true
      });

      let processingResult;

      try {
        if (isRetry) {
          pino.debug(`Retrying existing log file: ${taskData.logFileName}`);
          processingResult = await processor.retryLogFile(taskData.localFilePath, contextData);
        } else {
          pino.debug(`Processing new log file: ${taskData.logFileName}`);
          processingResult = await processor.processLogFile({ filePath: taskData.localFilePath }, contextData);
        }
      } catch (processingError) {
        // Enhanced error handling for processor failures
        pino.error({
          err: processingError,
          filePath: taskData.localFilePath,
          fileSizeMB,
          isRetry
        }, 'SatLoc Application Processor failed');

        // Force garbage collection for large files
        if (fileSizeMB > 50 && global.gc) {
          global.gc();
          pino.debug('Forced garbage collection after processing error');
        }

        throw processingError;
      }

      if (processingResult.success) {
        // Extract results from the Application Processor
        const applications = processingResult.applications || [processingResult.application];
        const applicationFiles = processingResult.applicationFiles || [processingResult.applicationFile];

        // Build result compatible with existing tracking
        // Note: matchedJobs comes from satloc_application_processor already properly formatted
        // with assignId and jobId matching partner_log_tracker schema
        result.matchedJobs = processingResult.matchedJobs || [];

        for (let i = 0; i < applications.length; i++) {
          const application = applications[i];
          const applicationFile = applicationFiles[i];

          if (application && applicationFile) {
            // Store the first application file ID for backward compatibility
            if (!result.appFileId) {
              result.appFileId = applicationFile._id;
            }
          }
        }

        pino.debug(`SatLoc Application Processor completed successfully:`, {
          applicationsCreated: applications.length,
          applicationFilesCreated: applicationFiles.length,
          totalDetails: processingResult.totalDetails,
          statistics: processingResult.statistics,
          fileSizeMB: fileSizeMB
        });

        // Force garbage collection after every file to prevent memory buildup
        if (global.gc) {
          global.gc();
          pino.debug(`Forced garbage collection after processing file (${fileSizeMB.toFixed(2)}MB)`);
        }

      } else {
        throw new Error(`SatLoc Application Processor failed: ${processingResult.error}`);
      }
    } else {
      throw new Error(`Unsupported partner log format: ${taskData.partnerCode}`);
    }

    return result;

  } catch (error) {
    pino.error(`Error processing local log file:`, error);

    // Always force GC on error to free up memory from failed processing
    if (global.gc) {
      global.gc();
      pino.debug('Forced garbage collection after processing error');
    }

    throw error;
  }
}
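
// Note: global.gc is only defined when Node.js is started with the --expose-gc flag
// (e.g. node --expose-gc <this worker file>); without it, the forced-GC branches above
// and in the memory monitor below are silently skipped.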

// Start the workers if not in test mode
if (env.NODE_ENV !== 'test') {
  // Add memory monitoring with more aggressive cleanup (prevent duplicate intervals)
  if (!memoryMonitorInterval) {
    memoryMonitorInterval = setInterval(() => {
      const memUsage = process.memoryUsage();
      const memUsageMB = {
        rss: Math.round(memUsage.rss / 1024 / 1024),
        heapTotal: Math.round(memUsage.heapTotal / 1024 / 1024),
        heapUsed: Math.round(memUsage.heapUsed / 1024 / 1024),
        external: Math.round(memUsage.external / 1024 / 1024)
      };

      pino.debug({ memUsageMB, circuitBreakerSize: problematicFiles.size }, 'Memory usage check');

      // More aggressive threshold - trigger at 512MB heap used
      if (memUsageMB.heapUsed > 512) {
        pino.warn({ memUsageMB }, 'High memory usage detected, forcing GC');

        if (global.gc) {
          global.gc();
          const afterGC = process.memoryUsage();
          const afterMB = {
            heapUsed: Math.round(afterGC.heapUsed / 1024 / 1024)
          };
          pino.info({ before: memUsageMB.heapUsed, after: afterMB.heapUsed, freed: memUsageMB.heapUsed - afterMB.heapUsed },
            'Garbage collection completed');
        }
      }
    }, 30000); // Every 30 seconds

    pino.info('Memory monitoring interval started');
  }

  // Run startup cleanup after a short delay to ensure DB is connected
  setTimeout(async () => {
    pino.info('Running startup cleanup for sync worker...');
    try {
      await startupCleanup();
    } catch (err) {
      pino.error({ err }, 'Sync worker startup cleanup failed');
    }
  }, 2000); // 2 second delay

  startPartnerWorker(); // Start queue worker
  startPeriodicCleanup(); // Start periodic cleanup for stuck tasks
  pino.debug('Partner sync worker started');
}

module.exports = {
  // Only export the queue worker functionality
  startPartnerWorker,
  startupCleanup
};