// agmission/Development/server/workers/partner_sync_worker.js
'use strict';

// --- Dependencies (one declaration per const; grouped: helpers, models, services, stdlib) ---
const logger = require('../helpers/logger');
const pino = logger.child('partner_sync_worker');
const env = require('../helpers/env.js');
const path = require('path');
const amqp = require('amqplib'); // Promise-based AMQP client (not the callback API)
const { DBConnection } = require('../helpers/db/connect');
const { setupDLQQueues } = require('../helpers/dlq_queue_setup'); // DLQ helper module
const { PartnerTasks, AssignStatus, RecTypes, PartnerCodes, PartnerLogTrackerStatus } = require('../helpers/constants');
const SatLocApplicationProcessor = require('../helpers/satloc_application_processor');
const { enhancedRunInTransaction } = require('../helpers/mongo_enhanced');
const { registerFatalHandlers } = require('../helpers/process_fatal_handlers');
const { ApplicationFile, JobAssign } = require('../model');
const PartnerLogTracker = require('../model/partner_log_tracker');
const TaskTracker = require('../model/task_tracker');
const { TaskTrackerStatus } = require('../model/task_tracker');
const partnerSyncService = require('../services/partner_sync_service');
const fs = require('fs').promises;

// Partner Log Tracker Status Constants (using centralized constants)
const TRACKER_STATUS = PartnerLogTrackerStatus;

// Database connection handle; the connection itself is opened below via initialize()
const workerDB = new DBConnection('Partner Sync Worker');

// AMQP connection state shared across the reconnect and shutdown paths
let amqpConn = null;
let amqpChannel = null;
let mqClosed = false;
let reconnecting = false; // Prevent multiple simultaneous reconnection attempts

// Interval handles tracked so reconnect/shutdown can clear them (prevents timer leaks)
let circuitBreakerCleanupInterval = null;
let periodicCleanupInterval = null;
let memoryMonitorInterval = null;

const PARTNER_QUEUE = env.QUEUE_NAME_PARTNER;

// Register fatal handlers (uncaught exception / unhandled rejection reporting)
registerFatalHandlers(process, {
  env,
  debug: (msg) => pino.info(msg),
  kindPrefix: 'partner_sync_worker',
  reportFilePath: path.join(__dirname, 'partner_sync_worker.rlog'),
});
/**
 * Shared graceful-shutdown routine for SIGINT and SIGTERM (the two handlers
 * were previously byte-for-byte duplicates). Clears periodic intervals,
 * closes the AMQP channel before the connection (channel first, so in-flight
 * deliveries are settled), then exits the process with code 0.
 * @param {string} signal - Name of the received signal, used for logging.
 */
async function gracefulShutdown(signal) {
  pino.info(`Received ${signal}, shutting down gracefully...`);
  mqClosed = true;
  // Clear all intervals to prevent leaks
  if (circuitBreakerCleanupInterval) clearInterval(circuitBreakerCleanupInterval);
  if (periodicCleanupInterval) clearInterval(periodicCleanupInterval);
  if (memoryMonitorInterval) clearInterval(memoryMonitorInterval);
  // Close channel first to prevent memory leaks
  if (amqpChannel) {
    try {
      await amqpChannel.close();
      pino.info('AMQP channel closed');
    } catch (err) {
      pino.error({ err }, 'Error closing AMQP channel');
    }
  }
  if (amqpConn) {
    try {
      await amqpConn.close();
      pino.info('AMQP connection closed');
    } catch (err) {
      pino.error({ err }, 'Error closing AMQP connection');
    }
  }
  process.exit(0);
}
process
  .on('SIGINT', () => gracefulShutdown('SIGINT'))
  .on('SIGTERM', () => gracefulShutdown('SIGTERM'));
// Open the database connection; this worker manages its own exit handlers above
workerDB.initialize({ setupExitHandlers: false });
/**
 * Startup cleanup for stuck processing tasks.
 * Resets PartnerLogTracker rows left in PROCESSING by a previous crash back to
 * FAILED so they can be retried: (1) rows whose processingStartedAt is older
 * than 30 minutes, and (2) corrupted rows with no processingStartedAt at all.
 * Uses single atomic updateMany calls instead of the previous find+updateMany
 * pair, which cost an extra round-trip and could drift between read and write.
 * Waits up to ~10s for the DB to become ready; skips cleanup if it never does.
 * @returns {Promise<void>}
 */
async function startupCleanup() {
  // Wait for database to be ready with retry logic
  let retries = 0;
  const maxRetries = 10;
  while (!workerDB.isReady() && retries < maxRetries) {
    pino.debug(`Database not ready for startup cleanup, waiting... (attempt ${retries + 1}/${maxRetries})`);
    await new Promise(resolve => setTimeout(resolve, 1000)); // Wait 1 second
    retries++;
  }
  if (!workerDB.isReady()) {
    pino.error('Database not ready after waiting, skipping startup cleanup');
    return;
  }
  try {
    const STUCK_PROCESSING_TIMEOUT_MS = 30 * 60 * 1000; // 30 minutes
    const cutoffTime = new Date(Date.now() - STUCK_PROCESSING_TIMEOUT_MS);
    // Reset stuck processing tasks (downloaded -> processing but never completed)
    const stuckResult = await PartnerLogTracker.updateMany(
      {
        status: TRACKER_STATUS.PROCESSING,
        processingStartedAt: { $lt: cutoffTime }
      },
      {
        $set: {
          status: TRACKER_STATUS.FAILED,
          errorMessage: 'Processing timeout - reset on worker startup',
          updatedAt: new Date()
        }
      }
    );
    // modifiedCount: Mongoose >=6 / MongoDB driver >=4 result shape
    if (stuckResult.modifiedCount > 0) {
      pino.info(`Reset ${stuckResult.modifiedCount} stuck processing tasks to failed status`);
    }
    // Also reset any processing tasks without processingStartedAt (corrupted state)
    const corruptedResult = await PartnerLogTracker.updateMany(
      {
        status: TRACKER_STATUS.PROCESSING,
        processingStartedAt: { $exists: false }
      },
      {
        $set: {
          status: TRACKER_STATUS.FAILED,
          errorMessage: 'Corrupted processing state - reset on worker startup',
          updatedAt: new Date()
        }
      }
    );
    if (corruptedResult.modifiedCount > 0) {
      pino.info(`Reset ${corruptedResult.modifiedCount} corrupted processing tasks to failed status`);
    }
  } catch (error) {
    pino.error({ err: error }, 'Error during sync worker startup cleanup');
  }
}
/**
 * Start (or restart) the AMQP queue worker for partner tasks.
 * Waits for the DB, connects to RabbitMQ, sets up DLQ topology via the helper
 * module, and consumes PARTNER_QUEUE with ack/retry/DLQ handling.
 * Reconnection is self-scheduled via setTimeout from the connection's
 * error/close events and from the setup catch block, guarded by the
 * module-level `reconnecting` flag so only one attempt runs at a time.
 * @returns {Promise<void>} resolves once the consumer is registered.
 */
async function startPartnerWorker() {
  // Prevent multiple simultaneous reconnection attempts
  if (reconnecting) {
    pino.debug('Reconnection already in progress, skipping duplicate attempt');
    return;
  }
  reconnecting = true;
  try {
    // Wait for database to be ready before starting worker (up to ~20s)
    let dbRetries = 0;
    const maxDbRetries = 20;
    while (!workerDB.isReady() && dbRetries < maxDbRetries) {
      pino.debug(`Waiting for database connection... (attempt ${dbRetries + 1}/${maxDbRetries})`);
      await new Promise(resolve => setTimeout(resolve, 1000));
      dbRetries++;
    }
    if (!workerDB.isReady()) {
      pino.error('Database connection failed after retries, retrying worker startup in 10s');
      reconnecting = false;
      setTimeout(startPartnerWorker, 10000);
      return;
    }
    pino.info('Database connection ready, starting AMQP worker...');
    const conOps = {
      protocol: 'amqp',
      hostname: env.QUEUE_HOST || 'localhost',
      port: env.QUEUE_PORT || 5672,
      username: env.QUEUE_USR || 'agmuser',
      password: env.QUEUE_PWD,
      vhost: env.QUEUE_VHOST || '/',
      heartbeat: env.QUEUE_HEARTBEAT || 0, // 0 disables heartbeats unless configured
      frameMax: 0
    };
    // Close old channel if exists (prevents channel leak on reconnect)
    if (amqpChannel) {
      try {
        await amqpChannel.close();
        pino.debug('[AMQP] Closed old channel before reconnect');
      } catch (err) {
        pino.debug({ err }, '[AMQP] Error closing old channel (may already be closed)');
      }
      amqpChannel = null;
    }
    const conn = await amqp.connect(conOps);
    conn.on("error", (err) => {
      if (mqClosed) return; // Already handling reconnection
      pino.error({ err }, '[AMQP] Connection error');
      mqClosed = true;
      reconnecting = false; // Allow reconnection
      // Longer delay for permission errors to prevent tight loop
      const isPermissionError = err.message && err.message.includes('ACCESS_REFUSED');
      const delay = isPermissionError ? 30000 : 5000; // 30s for permission errors, 5s for others
      if (isPermissionError) {
        pino.error(`[AMQP] Permission error detected. Ensure RabbitMQ user has access to queue '${PARTNER_QUEUE}'. Retrying in ${delay / 1000}s`);
      }
      setTimeout(startPartnerWorker, delay);
    });
    conn.on("close", () => {
      if (mqClosed) return; // Already handling reconnection
      pino.info('[AMQP] Connection closed');
      mqClosed = true;
      reconnecting = false; // Allow reconnection
      setTimeout(startPartnerWorker, 5000);
    });
    amqpConn = conn;
    mqClosed = false;
    pino.info('[AMQP] Partner worker connected');
    // Use DLQ helper module to set up queue infrastructure
    const { channel, queueNames } = await setupDLQQueues(PARTNER_QUEUE, {
      connection: conn,
      retentionDays: env.DLQ_RETENTION_DAYS,
      prefetch: 1
    });
    amqpChannel = channel; // Store channel reference for cleanup on reconnect/shutdown
    pino.info('[AMQP] DLQ infrastructure configured via helper module');
    pino.info('[AMQP] Queue flow: %s -> %s (%d days TTL) -> %s',
      queueNames.main, queueNames.dlq, env.DLQ_RETENTION_DAYS, queueNames.archive);
    // Start periodic cleanup ONLY AFTER channel is successfully created
    startPeriodicCleanup();
    reconnecting = false; // Connection AND channel successful, allow future reconnections
    pino.info('[AMQP] Prefetch set to 1 for memory-safe processing');
    pino.info('[AMQP] Waiting for partner tasks...');
    await channel.consume(PARTNER_QUEUE, async (msg) => {
      if (!msg) return; // null delivery: consumer was cancelled by the broker
      let taskMsg;
      try {
        taskMsg = JSON.parse(msg.content);
      } catch (err) {
        // Unparseable payload: ack to drop it — requeueing could never succeed
        pino.error({ err }, 'Failed to parse task message');
        channel.ack(msg);
        return;
      }
      // Check if message was redelivered (failed processing before)
      const isRedelivered = msg.fields && msg.fields.redelivered;
      // Add extra protection for redelivered messages
      if (isRedelivered && taskMsg.logFileName) {
        pino.debug(`Processing redelivered message for: ${taskMsg.logFileName}`);
        // Check if this was already successfully processed (crash recovery)
        if (await isTaskAlreadyProcessed(taskMsg)) {
          pino.info(`Redelivered task ${taskMsg.logFileName} was already processed, acknowledging`);
          channel.ack(msg);
          return;
        }
      }
      try {
        const result = await processPartnerTask(taskMsg, isRedelivered);
        // mqClosed guard: never ack/reject on a channel that is shutting down
        if (!mqClosed) {
          pino.debug('Partner task completed:', result);
          channel.ack(msg);
        }
      } catch (error) {
        if (!mqClosed) {
          pino.error({
            err: error,
            taskMsg: {
              logFileName: taskMsg.logFileName,
              type: taskMsg.type,
              customerId: taskMsg.customerId,
              partnerCode: taskMsg.partnerCode,
              aircraftId: taskMsg.aircraftId
            }
          }, 'Partner task failed');
          // Check if task is already processed to avoid sending duplicates to DLQ
          if (await isTaskAlreadyProcessed(taskMsg)) {
            pino.debug(`Task ${taskMsg.logFileName} already processed, acknowledging without DLQ`);
            channel.ack(msg);
            return;
          }
          // For redelivered messages, be more aggressive about detecting fatal errors
          if (isRedelivered && isFatalError(error)) {
            pino.error({ err: error, taskMsg }, 'Fatal error on redelivered message, sending to DLQ immediately');
            // Enrich message with error metadata before DLQ
            const enriched = await sendToQueueWithEnrichment(channel, PARTNER_QUEUE, taskMsg, error);
            if (enriched) {
              channel.ack(msg); // Ack original, enriched version will fail to DLQ
            } else {
              channel.reject(msg, false); // Fallback to direct reject
            }
            return;
          }
          // Implement retry logic with exponential backoff
          // NOTE(review): reject(msg, true) requeues the ORIGINAL payload, so the
          // retryCount mutated inside shouldRetryTask is not persisted in the
          // requeued message — confirm retry accounting behaves as intended.
          if (await shouldRetryTask(taskMsg, error)) {
            pino.debug('Retrying task after delay...');
            // Reject with requeue to retry later
            channel.reject(msg, true);
          } else {
            // Final check before DLQ - log for monitoring
            pino.warn({
              logFileName: taskMsg.logFileName,
              customerId: taskMsg.customerId,
              partnerCode: taskMsg.partnerCode,
              aircraftId: taskMsg.aircraftId,
              retryCount: taskMsg.retryCount || 0,
              lastError: error.message,
              isRedelivered
            }, 'Task exceeded max retries, sending to dead letter queue');
            // Enrich message with error metadata before DLQ
            const enriched = await sendToQueueWithEnrichment(channel, PARTNER_QUEUE, taskMsg, error);
            if (enriched) {
              channel.ack(msg); // Ack original, enriched version will fail to DLQ
            } else {
              channel.reject(msg, false); // Fallback to direct reject
            }
          }
        }
      }
    }, { noAck: false });
  } catch (error) {
    pino.error({ err: error }, '[AMQP] Worker setup failed');
    // CRITICAL: Always reset reconnecting flag to allow future attempts
    reconnecting = false;
    mqClosed = true;
    // Close any partially created connection/channel to prevent leaks
    if (amqpChannel) {
      try {
        await amqpChannel.close();
        pino.debug('[AMQP] Closed channel after connection failure');
      } catch (err) {
        // Ignore close errors
      }
      amqpChannel = null;
    }
    if (amqpConn) {
      try {
        await amqpConn.close();
        pino.debug('[AMQP] Closed connection after setup failure');
      } catch (err) {
        // Ignore close errors
      }
      amqpConn = null;
    }
    // Check if it's a permission error
    const isPermissionError = error.message && (
      error.message.includes('ACCESS_REFUSED') ||
      error.message.includes('403')
    );
    if (isPermissionError) {
      pino.error(`[AMQP] Permission denied for queue '${PARTNER_QUEUE}'. Check RabbitMQ user permissions. Retrying in 30s`);
      setTimeout(startPartnerWorker, 30000); // Longer delay for permission issues
    } else {
      pino.error('[AMQP] Connection failed, retrying in 5s');
      setTimeout(startPartnerWorker, 5000);
    }
  }
}
/**
 * Dispatch an individual partner task to its handler with timeout protection.
 * Fixes over the previous version: the task body no longer runs inside a
 * `new Promise(async ...)` (explicit-construction anti-pattern that can
 * swallow rejections), and the timeout timer is cleared in `finally` —
 * previously every task leaked a live 30-minute timer even on success.
 * @param {object} taskMsg - Parsed queue message; `type` selects the handler,
 *   `data` is the handler payload.
 * @param {boolean} [isRedelivered=false] - Whether the broker redelivered the message.
 * @returns {Promise<object>} the handler's result.
 * @throws {Error} on unknown task type, handler failure, or timeout.
 */
async function processPartnerTask(taskMsg, isRedelivered = false) {
  pino.debug({ type: taskMsg.type, data: taskMsg.data, redelivered: isRedelivered }, 'Processing partner task');
  const TASK_TIMEOUT_MS = 30 * 60 * 1000; // 30 minutes

  // Route to the handler for this task type
  const runTask = async () => {
    switch (taskMsg.type) {
      case PartnerTasks.UPLOAD_PARTNER_JOB:
        return processPartnerJobUpload(taskMsg.data, isRedelivered);
      case PartnerTasks.PROCESS_PARTNER_LOG:
        return processPartnerLog(taskMsg.data, isRedelivered);
      default:
        throw new Error(`Unknown partner task type: ${taskMsg.type}`);
    }
  };

  let timeoutId;
  const timeoutPromise = new Promise((_, reject) => {
    timeoutId = setTimeout(() => {
      reject(new Error(`Task ${taskMsg.type} timed out after ${TASK_TIMEOUT_MS / 1000} seconds`));
    }, TASK_TIMEOUT_MS);
  });

  try {
    return await Promise.race([runTask(), timeoutPromise]);
  } catch (error) {
    pino.error({
      err: error,
      taskType: taskMsg.type,
      logFileName: taskMsg.data?.logFileName,
      isTimeout: error.message.includes('timed out')
    }, 'Partner task processing error');
    throw error;
  } finally {
    // Always release the timer; otherwise it keeps the loop busy for 30 min per task
    clearTimeout(timeoutId);
  }
}
// --- Constants for task processing ---
const MAX_RETRY_ATTEMPTS = env.PARTNER_MAX_RETRIES; // DLQ threshold; value comes from env
const RETRY_DELAY_MS = 10000; // not referenced in this section; presumably used later in the file — TODO confirm
const BASE_RETRY_DELAY = 5000; // 5 seconds base delay (doubled per attempt in shouldRetryTask)
const MAX_RETRY_DELAY = 60000; // Max 60 seconds delay (exponential backoff cap)
const PROCESSING_TIMEOUT_MS = 90 * 60 * 1000; // 90 minutes for large files
const CLEANUP_INTERVAL_MS = 30 * 60 * 1000; // 30 minutes (from 15 to prevent overhead)

// --- Circuit breaker for problematic files - more lenient for development ---
// Keyed by "<logFileName>-<partnerCode>"; value: { attempts, lastAttempt, blocked }
const problematicFiles = new Map(); // filename -> { attempts, lastAttempt, blocked }
const MAX_FILE_ATTEMPTS = env.NODE_ENV === 'production' ? 3 : 10; // More attempts in dev
const FILE_BLOCK_DURATION = env.NODE_ENV === 'production' ? 60 * 60 * 1000 : 5 * 60 * 1000; // 5 minutes in dev vs 1 hour in prod

// Periodically clean up old entries from circuit breaker map to prevent memory leak.
// Guarded by the module-level handle so a module re-evaluation never stacks intervals
// (the handle is also cleared on shutdown).
if (!circuitBreakerCleanupInterval) {
  circuitBreakerCleanupInterval = setInterval(() => {
    const now = Date.now();
    const cutoffTime = now - (FILE_BLOCK_DURATION * 2); // Keep for 2x block duration
    let cleanedCount = 0;
    for (const [key, info] of problematicFiles.entries()) {
      if (info.lastAttempt < cutoffTime) {
        problematicFiles.delete(key);
        cleanedCount++;
      }
    }
    if (cleanedCount > 0) {
      pino.debug(`Cleaned ${cleanedCount} old entries from circuit breaker map (size: ${problematicFiles.size})`);
    }
  }, 60 * 60 * 1000); // Clean every hour
  pino.info('Circuit breaker cleanup interval started');
}

// Debug: Log effective constants at startup so misconfiguration is visible in logs
pino.info(`Partner sync worker constants - MAX_RETRY_ATTEMPTS: ${MAX_RETRY_ATTEMPTS}, PARTNER_MAX_RETRIES: ${env.PARTNER_MAX_RETRIES}`);
pino.info(`Circuit breaker settings - MAX_FILE_ATTEMPTS: ${MAX_FILE_ATTEMPTS}, FILE_BLOCK_DURATION: ${FILE_BLOCK_DURATION}ms (${FILE_BLOCK_DURATION / 60000} minutes)`);
pino.info(`Environment: ${env.NODE_ENV}, Development mode circuit breaker: ${env.NODE_ENV === 'development' ? 'LENIENT' : 'STRICT'}`);
/**
 * Start the periodic sweep that resets stuck PROCESSING trackers to FAILED
 * so they can be retried. A tracker is "stuck" when its processingStartedAt
 * is older than PROCESSING_TIMEOUT_MS. Guarded by the module-level handle so
 * AMQP reconnects never stack multiple intervals.
 * Uses a single atomic updateMany (previously a find followed by updateMany
 * with the same filter — an extra round-trip whose counts could drift).
 */
function startPeriodicCleanup() {
  // Prevent multiple cleanup intervals on reconnect
  if (periodicCleanupInterval) {
    pino.debug('Periodic cleanup already running, skipping duplicate');
    return;
  }
  periodicCleanupInterval = setInterval(async () => {
    try {
      const cutoffTime = new Date(Date.now() - PROCESSING_TIMEOUT_MS);
      const result = await PartnerLogTracker.updateMany(
        {
          status: TRACKER_STATUS.PROCESSING,
          processingStartedAt: { $lt: cutoffTime }
        },
        {
          $set: {
            status: TRACKER_STATUS.FAILED,
            errorMessage: 'Processing timeout - automatically reset',
            updatedAt: new Date()
          }
        }
      );
      // modifiedCount: Mongoose >=6 / MongoDB driver >=4 result shape
      if (result.modifiedCount > 0) {
        pino.info(`Reset ${result.modifiedCount} stuck processing tasks to failed status`);
      }
    } catch (error) {
      pino.error({ err: error }, 'Error during periodic cleanup of stuck tasks');
    }
  }, CLEANUP_INTERVAL_MS);
  pino.info('Periodic cleanup interval started');
}
/**
 * Decide whether a failed task should be retried, applying exponential backoff.
 * Mutates taskMsg in place (retryCount, lastError, nextRetryAt) and sleeps for
 * the backoff delay before returning true.
 * NOTE(review): the caller requeues the ORIGINAL broker message via
 * channel.reject(msg, true), so the retryCount mutated here is only visible
 * within this delivery in this process — confirm retry accounting works as
 * intended across redeliveries.
 * @param {object} taskMsg - Parsed task payload (may carry retryCount).
 * @param {Error} error - The failure that triggered this decision.
 * @returns {Promise<boolean>} true to requeue for retry, false to route to DLQ.
 */
async function shouldRetryTask(taskMsg, error) {
  try {
    // Get current retry count from task message or initialize to 0
    const retryCount = taskMsg.retryCount || 0;
    // Debug logging to understand retry logic
    pino.debug(`shouldRetryTask: retryCount=${retryCount}, MAX_RETRY_ATTEMPTS=${MAX_RETRY_ATTEMPTS}, comparison=${retryCount >= MAX_RETRY_ATTEMPTS}`);
    // Don't retry if we've exceeded max attempts
    if (retryCount >= MAX_RETRY_ATTEMPTS) {
      pino.info(`Task ${taskMsg.type} exceeded max retry attempts (${retryCount} >= ${MAX_RETRY_ATTEMPTS}), sending to DLQ`);
      return false;
    }
    // Don't retry for certain error types (validation errors, etc.)
    if (isNonRetryableError(error)) {
      // Embed the message in the log line: pino drops extra args that have no
      // %s placeholder, so the old two-arg form silently lost error.message.
      pino.debug(`Task ${taskMsg.type} failed with non-retryable error: ${error.message}`);
      return false;
    }
    // Calculate exponential backoff delay, capped at MAX_RETRY_DELAY
    const delay = Math.min(BASE_RETRY_DELAY * Math.pow(2, retryCount), MAX_RETRY_DELAY);
    pino.debug(`Will retry task ${taskMsg.type} after ${delay}ms (attempt ${retryCount + 1}/${MAX_RETRY_ATTEMPTS})`);
    // Update retry count for next attempt
    taskMsg.retryCount = retryCount + 1;
    taskMsg.lastError = error.message;
    taskMsg.nextRetryAt = Date.now() + delay;
    // Wait for backoff delay (note: blocks this consumer, prefetch is 1)
    await new Promise(resolve => setTimeout(resolve, delay));
    return true;
  } catch (err) {
    // Fail closed: on internal errors route to DLQ rather than retry forever.
    // Object form so pino serializes the error (was a dropped second arg).
    pino.debug({ err }, 'Error in shouldRetryTask');
    return false;
  }
}
/**
 * Check whether a task's log was already processed, to avoid sending
 * duplicates to the DLQ or re-processing on redelivery.
 * A task counts as processed when its PartnerLogTracker row has
 * processed === true or status === PROCESSED.
 * @param {object} taskMsg - Must carry logFileName, customerId, partnerCode, aircraftId.
 * @returns {Promise<boolean>} false when essential fields are missing or the
 *   lookup fails (callers then proceed, which is the safe default).
 */
async function isTaskAlreadyProcessed(taskMsg) {
  try {
    if (!taskMsg.logFileName || !taskMsg.customerId || !taskMsg.partnerCode || !taskMsg.aircraftId) {
      return false; // If essential fields are missing, let it proceed to DLQ
    }
    const filter = {
      customerId: taskMsg.customerId,
      partnerCode: taskMsg.partnerCode,
      aircraftId: taskMsg.aircraftId, // Fixed: use aircraftId, not partnerAircraftId
      logFileName: taskMsg.logFileName
    };
    const tracker = await PartnerLogTracker.findOne(filter);
    // Consider processed if explicitly marked as processed OR in processed status
    const isProcessed = tracker && (tracker.processed === true || tracker.status === TRACKER_STATUS.PROCESSED);
    if (isProcessed) {
      pino.debug(`Task ${taskMsg.logFileName} already processed, avoiding DLQ duplicate`);
    }
    return isProcessed;
  } catch (err) {
    // Object form so pino serializes the error (was a dropped second arg)
    pino.debug({ err }, 'Error checking if task already processed');
    return false; // On error, let it proceed to avoid blocking
  }
}
/**
 * Check whether an error should NOT be retried (permanent data/state issues).
 * Null-safe on errors without a .message (consistent with categorizeError):
 * previously `pattern.test(undefined)` matched against the string "undefined".
 * @param {Error|object} error - The failure to classify.
 * @returns {boolean} true when the error is permanent and retrying is pointless.
 */
function isNonRetryableError(error) {
  const message = error?.message || '';
  const nonRetryablePatterns = [
    /validation/i,
    /invalid.*format/i,
    /malformed/i,
    // Authentication errors are now retryable (removed from this list)
    // /authentication.*failed/i,
    // /unauthorized/i,
    // /forbidden/i,
    /not.*found/i,
    /already.*exists/i,
    /already.*processed/i
  ];
  return nonRetryablePatterns.some(pattern => pattern.test(message));
}
/**
 * Check whether an error is fatal and should go straight to the DLQ
 * (used aggressively for redelivered messages).
 * Null-safe on errors without a .message (consistent with categorizeError):
 * previously `pattern.test(undefined)` matched against the string "undefined".
 * @param {Error|object} error - The failure to classify.
 * @returns {boolean} true for resource-exhaustion / corruption / lost-connection errors.
 */
function isFatalError(error) {
  const message = error?.message || '';
  const fatalPatterns = [
    /out of memory/i,
    /heap.*limit/i,
    /maximum call stack/i,
    /cannot allocate memory/i,
    /worker killed/i,
    /signal.*kill/i,
    /database.*connection.*lost/i,
    /connection.*terminated/i,
    /timeout.*exceeded/i,
    /file.*corrupt/i,
    /parse.*error/i,
    /invalid.*file.*format/i
  ];
  return fatalPatterns.some(pattern => pattern.test(message));
}
/**
 * Categorize an error message for DLQ analysis.
 * Walks an ordered rule table (first match wins) over the lowercased message.
 * Based on https://rashadansari.medium.com/strategies-for-successful-dead-letter-queue-event-handling-e354f7dfbb3e
 * @param {Error|object|null} error - Failure to classify; missing message => 'unknown'.
 * @returns {string} one of 'transient' | 'validation' | 'infrastructure' |
 *   'partner_api' | 'processing' | 'unknown'.
 */
function categorizeError(error) {
  const errorMsg = (error?.message || '').toLowerCase();
  // Ordered first-match rule table: [category, pattern]
  const rules = [
    // Transient errors - temporary issues that may resolve
    ['transient', /timeout|timed out|connection|network|econnrefused|enotfound|socket|dns/i],
    // Validation errors - data quality issues
    ['validation', /validation|invalid|malformed|missing required|bad request|400/i],
    // Infrastructure errors - database, filesystem
    ['infrastructure', /database|mongo|transaction|filesystem|eacces|enoent|disk/i],
    // Partner API errors
    ['partner_api', /partner api|api error|http [45]\d\d|unauthorized|forbidden|authentication/i],
    // Processing errors - business logic failures
    ['processing', /processing|calculation|parse|transform|encode|decode/i],
  ];
  const matched = rules.find(([, pattern]) => pattern.test(errorMsg));
  return matched ? matched[0] : 'unknown';
}
/**
 * Compute an alerting severity for a failure.
 * Severity ladder (highest first): critical for fatal errors or retry count at
 * the configured maximum; high for infrastructure failures or >= 3 retries;
 * medium for validation/processing failures; low otherwise (likely transient).
 * @param {Error|object} error - Failure to grade.
 * @param {number} [retryCount=0] - Attempts made so far.
 * @returns {string} 'critical' | 'high' | 'medium' | 'low'
 */
function calculateSeverity(error, retryCount = 0) {
  // Critical: fatal errors or retries exhausted
  if (isFatalError(error) || retryCount >= env.PARTNER_MAX_RETRIES) {
    return 'critical';
  }
  const category = categorizeError(error);
  // High: persistent retries or infrastructure trouble
  if (retryCount >= 3 || category === 'infrastructure') {
    return 'high';
  }
  // Medium: data-quality or business-logic failures; everything else is low
  return ['validation', 'processing'].includes(category) ? 'medium' : 'low';
}
/**
 * Build the enriched AMQP header set used for DLQ tracking and analysis.
 * All values are stringifiable primitives; unknown fields fall back to 'unknown'.
 * @param {object} taskMsg - Original task payload (type, partnerCode, customerId, retryCount).
 * @param {Error|object} error - Failure being recorded.
 * @returns {object} header map keyed by x-* names.
 */
function createDLQHeaders(taskMsg, error) {
  const reason = error?.message || 'Unknown error';
  const headers = {};
  headers['x-error-category'] = categorizeError(error);
  headers['x-error-reason'] = reason;
  headers['x-task-type'] = taskMsg.type || 'unknown';
  headers['x-severity'] = calculateSeverity(error, taskMsg.retryCount);
  headers['x-first-death-time'] = Date.now();
  headers['x-partner-code'] = taskMsg.partnerCode || 'unknown';
  headers['x-customer-id'] = taskMsg.customerId?.toString() || 'unknown';
  return headers;
}
/**
 * Enrich a failed task message with error metadata and republish it.
 * Centralizes DLQ routing logic to avoid code duplication.
 *
 * NOTE(review): this publishes the enriched copy back to `queueName` — call
 * sites pass the MAIN queue (PARTNER_QUEUE), and their comments say the
 * enriched copy "will fail to DLQ" on its next processing attempt. Confirm
 * this round-trip is the intended routing rather than publishing straight to
 * the DLQ.
 *
 * @param {object} channel - amqplib channel used to publish.
 * @param {string} queueName - Destination queue for the enriched copy.
 * @param {object} taskMsg - Original parsed task payload.
 * @param {Error} error - Failure being recorded (message embedded in payload).
 * @returns {Promise<boolean>} true when the enriched copy was published;
 *   false on publish failure (callers fall back to channel.reject).
 */
async function sendToQueueWithEnrichment(channel, queueName, taskMsg, error) {
  try {
    // Copy the payload and stamp failure metadata; retryCount is bumped here
    // because reject(msg, true) would requeue the un-mutated original.
    const enrichedTask = {
      ...taskMsg,
      lastError: error.message,
      failedAt: new Date().toISOString(),
      retryCount: (taskMsg.retryCount || 0) + 1
    };
    // NOTE(review): headers are derived from the PRE-increment taskMsg, so
    // x-severity sees the old retryCount — confirm this is intentional.
    await channel.sendToQueue(
      queueName,
      Buffer.from(JSON.stringify(enrichedTask)),
      {
        persistent: true,
        headers: createDLQHeaders(taskMsg, error)
      }
    );
    return true;
  } catch (enrichError) {
    pino.error({ err: enrichError }, 'Failed to enrich message');
    return false;
  }
}
/**
 * Upload a job assignment to the external partner inside a transaction.
 * Idempotent on redelivery: if the assignment already carries an extJobId,
 * the upload is treated as complete and skipped.
 * Logging switched to pino's object form — extra args without %o placeholders
 * are silently dropped by pino, so the old two-arg debug calls lost their data.
 * @param {object} taskData - Must contain assignId.
 * @param {boolean} [isRedelivered=false] - Whether the broker redelivered the message.
 * @returns {Promise<{success: boolean, result: object}>}
 * @throws {Error} when the partner upload reports failure (the thrown error
 *   aborts and rolls back the surrounding transaction).
 */
async function processPartnerJobUpload(taskData, isRedelivered = false) {
  pino.debug({ taskData, redelivered: isRedelivered }, 'Uploading job to partner');
  try {
    // If message was redelivered, check if upload was already successful
    if (isRedelivered) {
      const assignment = await JobAssign.findById(taskData.assignId);
      if (assignment && assignment.extJobId) {
        pino.debug(`Job upload already completed on redelivered message: ${assignment.extJobId}`);
        return { success: true, result: { alreadyProcessed: true, externalJobId: assignment.extJobId } };
      }
    }
    let result;
    // Atomic transaction keeps upload and status updates consistent; the
    // service handles assignment status updates and job logging internally.
    await enhancedRunInTransaction(async (session) => {
      result = await partnerSyncService.uploadJobToPartner(taskData.assignId, { session });
      if (!result.success) {
        throw new Error(`Upload failed: ${result.message || 'Unknown error'}`);
      }
    });
    pino.debug({ result }, 'Partner job upload result');
    return { success: true, result };
  } catch (error) {
    // Log at debug and rethrow: the consumer loop logs the failure at error level
    pino.debug({ err: error }, 'Partner job upload error');
    throw error;
  }
}
// Process partner log data using async/await
async function processPartnerLog(taskData, isRedelivered = false) {
pino.debug('Processing partner log:', taskData.logFileName, { redelivered: isRedelivered });
// TaskTracker idempotency check (parallel tracking)
const { taskId, executionId } = taskData;
if (taskId && executionId) {
const taskTracker = await TaskTracker.findOneAndUpdate(
{
taskId,
executionId,
status: { $in: [TaskTrackerStatus.QUEUED, TaskTrackerStatus.FAILED] }
},
{
$set: {
status: TaskTrackerStatus.PROCESSING,
processingStartedAt: new Date()
}
},
{ new: true }
);
if (!taskTracker) {
pino.warn({ taskId, executionId, logFileName: taskData.logFileName },
'Task already claimed or completed by another worker, skipping');
return { skipped: true, reason: 'already_processed' };
}
pino.debug({ taskId, executionId }, 'TaskTracker claimed successfully');
}
// Circuit breaker: Check if this file has been problematic
const fileKey = `${taskData.logFileName}-${taskData.partnerCode}`;
const fileInfo = problematicFiles.get(fileKey);
// In development, allow manual override by checking for recent successful processing
if (fileInfo && fileInfo.blocked && (Date.now() - fileInfo.lastAttempt) < FILE_BLOCK_DURATION) {
if (env.NODE_ENV === 'development') {
pino.warn(`File ${taskData.logFileName} is temporarily blocked but allowing retry in development mode`);
// Reset the block in development to allow retries
problematicFiles.delete(fileKey);
} else {
pino.warn(`File ${taskData.logFileName} is temporarily blocked due to repeated failures`);
throw new Error(`File temporarily blocked due to repeated failures - will retry later`);
}
}
const processStartTime = Date.now();
// Build filter for atomic operations from essential task fields
// (trackerFilter is no longer passed in task payload - reconstruct from essential fields)
const filter = {
logId: taskData.logId,
partnerCode: taskData.partnerCode,
aircraftId: taskData.aircraftId,
customerId: taskData.customerId
};
try {
// Check database connection before processing
if (!workerDB.isReady()) {
throw new Error('Database connection not ready for processing');
}
// Atomically claim log for processing
// For redelivered messages, be more aggressive about reclaiming stuck processing tasks
const claimFilter = {
...filter,
$or: [
{ processed: { $exists: false } },
{ processed: false }
]
};
// For redelivered messages, allow reclaiming stuck PROCESSING tasks (older than 5 minutes)
if (isRedelivered) {
const stuckProcessingCutoff = new Date(Date.now() - 5 * 60 * 1000); // 5 minutes ago
claimFilter.$or.push(
// Reclaim PROCESSING tasks that are stuck (started > 5 minutes ago)
{
status: TRACKER_STATUS.PROCESSING,
processingStartedAt: { $lt: stuckProcessingCutoff }
},
// Reclaim PROCESSING tasks without processingStartedAt (corrupted state)
{
status: TRACKER_STATUS.PROCESSING,
processingStartedAt: { $exists: false }
}
);
// Also allow DOWNLOADED and FAILED
claimFilter.$or.push(
{ status: TRACKER_STATUS.DOWNLOADED },
{ status: TRACKER_STATUS.FAILED }
);
pino.info(`Redelivered message - attempting to reclaim stuck/failed/downloaded task: ${taskData.logFileName}`);
} else {
// For non-redelivered messages, only claim DOWNLOADED or FAILED
claimFilter.status = { $in: [TRACKER_STATUS.DOWNLOADED, TRACKER_STATUS.FAILED] };
}
const claimedTracker = await PartnerLogTracker.findOneAndUpdate(
claimFilter,
{
$set: {
status: TRACKER_STATUS.PROCESSING,
processingStartedAt: new Date(),
updatedAt: new Date(),
...(isRedelivered && { errorMessage: 'Reclaimed from redelivered message' })
},
$inc: { retryCount: 1 }
},
{ new: true }
);
// Log current state for debugging
if (!claimedTracker) {
const existingTracker = await PartnerLogTracker.findOne(filter);
pino.debug(`Unable to claim log ${taskData.logFileName}. Current state:`, {
logFileName: taskData.logFileName,
currentStatus: existingTracker?.status,
processed: existingTracker?.processed,
processingStartedAt: existingTracker?.processingStartedAt,
retryCount: existingTracker?.retryCount
});
}
if (!claimedTracker) {
// Log was already processed or claimed by another worker
const existingTracker = await PartnerLogTracker.findOne(filter);
if (existingTracker?.processed) {
pino.debug(`Log ${taskData.logFileName} already processed`);
return { success: true, message: 'Already processed' };
}
// For redelivered messages, we already tried to reclaim processing tasks above
if (isRedelivered) {
pino.warn(`Redelivered message could not be claimed for ${taskData.logFileName}, may be genuinely processed by another worker`);
return { success: true, message: 'Could not reclaim redelivered task - likely processed elsewhere' };
}
// For non-redelivered messages, handle processing status more conservatively
if (existingTracker?.status === TRACKER_STATUS.PROCESSING) {
// Check if processing is stuck (taking too long)
const isStuck = existingTracker.processingStartedAt &&
(Date.now() - existingTracker.processingStartedAt.getTime() > PROCESSING_TIMEOUT_MS);
if (isStuck) {
pino.debug(`Log ${taskData.logFileName} processing appears stuck, resetting to retry`);
// Reset stuck processing to allow retry
await PartnerLogTracker.findOneAndUpdate(
filter,
{
$set: {
status: TRACKER_STATUS.FAILED,
errorMessage: 'Processing timeout - reset for retry',
updatedAt: new Date()
}
}
);
// Throw error to trigger task requeue/retry
throw new Error(`Log ${taskData.logFileName} processing was stuck and has been reset for retry`);
} else {
pino.debug(`Log ${taskData.logFileName} is currently being processed by another worker`);
// Don't acknowledge - let message be redelivered later
throw new Error(`Log ${taskData.logFileName} is currently being processed - will retry later`);
}
} else {
pino.debug(`Log ${taskData.logFileName} was claimed by another worker or in wrong state`);
// Don't acknowledge - let message be redelivered later
throw new Error(`Log ${taskData.logFileName} in unexpected state - will retry later`);
}
}
if (isRedelivered && claimedTracker) {
pino.info(`Successfully reclaimed redelivered task: ${taskData.logFileName} (retry ${claimedTracker.retryCount})`);
} else {
pino.debug(`Successfully claimed log for processing: ${taskData.logFileName} (retry ${claimedTracker.retryCount})`);
}
try {
let processingResult;
// Resolve full file path - use service method for path-agnostic storage
let localFilePath = taskData.localFilePath;
if (!localFilePath) {
const SatlocService = require('../services/satloc_service');
const satlocService = new SatlocService();
const savedFile = claimedTracker?.savedLocalFile || taskData.logFileName;
localFilePath = satlocService.resolveLogFilePath(savedFile);
pino.debug(`Resolved file path via service: ${localFilePath}`);
}
pino.debug(`Processing local file: ${localFilePath}`);
// Verify file exists
try {
await fs.access(localFilePath);
} catch (fileError) {
throw new Error(`Local log file not found: ${localFilePath}`);
}
// Update taskData with resolved path for downstream processing
taskData.localFilePath = localFilePath;
processingResult = await processLocalLogFile(taskData, claimedTracker);
// Atomically mark as successfully processed
const completedTracker = await PartnerLogTracker.findOneAndUpdate(
{
...filter,
status: TRACKER_STATUS.PROCESSING,
_id: claimedTracker._id
},
{
$set: {
status: TRACKER_STATUS.PROCESSED,
processed: true,
processedAt: new Date(),
matchedJobs: processingResult.matchedJobs,
appFileId: processingResult.appFileId,
processTime: Date.now() - processStartTime,
updatedAt: new Date()
}
},
{ new: true }
);
if (!completedTracker) {
throw new Error(`Failed to update tracker status to processed for ${taskData.logFileName}`);
}
// Update TaskTracker to completed (parallel tracking)
if (taskId && executionId) {
await TaskTracker.updateOne(
{ executionId },
{
$set: {
status: TaskTrackerStatus.COMPLETED,
completedAt: new Date(),
processTime: Date.now() - processStartTime,
result: {
matchedJobs: processingResult.matchedJobs,
appFileId: processingResult.appFileId
}
}
}
).catch(err => {
// Non-blocking - log error but don't fail task
pino.error({ err, executionId }, 'Failed to update TaskTracker to completed');
});
}
pino.debug(`Successfully processed log ${taskData.logFileName}: ${processingResult.matchedJobs.length} jobs matched`);
// Circuit breaker: Reset file tracking on success
if (problematicFiles.has(fileKey)) {
problematicFiles.delete(fileKey);
pino.debug(`Cleared problematic file tracking for ${taskData.logFileName}`);
}
return {
success: true,
result: processingResult
};
} catch (error) {
// Circuit breaker: Track problematic files
const currentFileInfo = problematicFiles.get(fileKey) || { attempts: 0, lastAttempt: 0, blocked: false };
currentFileInfo.attempts++;
currentFileInfo.lastAttempt = Date.now();
if (currentFileInfo.attempts >= MAX_FILE_ATTEMPTS) {
currentFileInfo.blocked = true;
pino.warn(`File ${taskData.logFileName} marked as problematic after ${currentFileInfo.attempts} attempts`);
}
problematicFiles.set(fileKey, currentFileInfo);
// Atomically mark as failed
await PartnerLogTracker.findOneAndUpdate(
{
...filter,
status: TRACKER_STATUS.PROCESSING,
_id: claimedTracker._id
},
{
$set: {
status: TRACKER_STATUS.FAILED,
errorMessage: error.message,
processTime: Date.now() - processStartTime,
updatedAt: new Date()
}
},
{ new: true }
);
// Update TaskTracker with error details (parallel tracking)
if (taskId && executionId) {
const errorCategory = categorizeError(error);
const canRetry = currentFileInfo.attempts < MAX_FILE_ATTEMPTS;
await TaskTracker.updateOne(
{ executionId },
{
$set: {
status: canRetry ? TaskTrackerStatus.FAILED : TaskTrackerStatus.DLQ,
errorMessage: error.message,
errorCategory,
errorStack: error.stack,
failedAt: new Date(),
processTime: Date.now() - processStartTime
},
$inc: { retryCount: 1 }
}
).catch(err => {
// Non-blocking - log error but don't fail task
pino.error({ err, executionId }, 'Failed to update TaskTracker with error');
});
}
throw error;
}
} catch (error) {
pino.debug('Partner log processing error:', error);
throw error;
}
}
// Process local log file that was downloaded by polling worker
/**
 * Processes a partner log file that the polling worker previously downloaded
 * to local disk, producing Application / ApplicationFile records via the
 * SatLoc Application Processor.
 *
 * Only SatLoc logs (`PartnerCodes.SATLOC`) are supported; any other partner
 * code raises an error.
 *
 * @param {Object} taskData - Queue task payload. Must contain `localFilePath`,
 *   `partnerCode`, and `aircraftId`; identifying fields (`customerId`,
 *   `logId`, `logFileName`) are forwarded to the processor context.
 * @param {Object|null} [claimedTracker=null] - PartnerLogTracker document
 *   claimed for this task; supplies `uploadedDate` and `retryCount` (a
 *   retryCount > 1 switches the processor into retry mode).
 * @returns {Promise<{matchedJobs: Array, appFileId: *}>} Jobs matched by the
 *   processor and the first created ApplicationFile id (kept for backward
 *   compatibility with older single-application tracking).
 * @throws {Error} If the local file is missing, the partner format is
 *   unsupported, or the SatLoc Application Processor reports failure.
 */
async function processLocalLogFile(taskData, claimedTracker = null) {
  pino.debug(`Processing local log file: ${taskData.localFilePath}`);
  const result = {
    matchedJobs: [],
    appFileId: null
  };
  try {
    // Verify file exists (fs.access throws if the path is not readable)
    await fs.access(taskData.localFilePath);
    if (taskData.partnerCode === PartnerCodes.SATLOC) {
      // Use SatLoc Application Processor for all SatLoc log processing
      pino.debug(`Using SatLoc Application Processor for: ${taskData.localFilePath}`);
      // Check file size to prevent memory issues
      const fileStats = await fs.stat(taskData.localFilePath);
      const fileSizeMB = fileStats.size / (1024 * 1024);
      if (fileSizeMB > 100) { // Log files larger than 100MB
        pino.warn(`Large SatLoc log file detected: ${fileSizeMB.toFixed(2)}MB - ${taskData.localFilePath}`);
      }
      // Retrieve data that was removed from task payload for efficiency
      // - uploadedDate: from tracker record (set when log was detected)
      // - assignments: query from DB by customerId and aircraftId (via user.partnerInfo.partnerAircraftId)
      const uploadedDate = claimedTracker?.uploadedDate || null;
      // Query assignments for this aircraft from DB
      // partnerAircraftId is stored in user.partnerInfo.partnerAircraftId, not directly on JobAssign
      // Use populateWithPartnerInfo and filter in memory
      const allAssignments = await JobAssign.populateWithPartnerInfo({
        status: { $in: [AssignStatus.UPLOADED, AssignStatus.PROCESSING] }
      }, true); // lean = true
      // Filter by aircraft ID (matches user.partnerInfo.partnerAircraftId,
      // falling back to tailNumber when partnerAircraftId is absent)
      const assignments = allAssignments.filter(assign => {
        const aircraftId = assign.user?.partnerInfo?.partnerAircraftId || assign.user?.partnerInfo?.tailNumber;
        return aircraftId === taskData.aircraftId;
      });
      pino.debug(`Found ${assignments.length} assignments for aircraft ${taskData.aircraftId} (from ${allAssignments.length} total)`);
      if (assignments.length === 0) {
        pino.warn(`No assignments found for aircraft ${taskData.aircraftId} - log file may not match any job`);
      }
      // Build context data with essential fields + retrieved data
      const contextData = {
        taskInfo: {
          source: 'partner_sync',
          // Essential fields from task payload
          customerId: taskData.customerId,
          partnerCode: taskData.partnerCode,
          aircraftId: taskData.aircraftId,
          logId: taskData.logId,
          logFileName: taskData.logFileName,
          localFilePath: taskData.localFilePath,
          // Retrieved from tracker/DB
          uploadedDate: uploadedDate,
          assignments: assignments
        },
        metadata: { fileSize: fileStats.size, fileSizeMB: fileSizeMB.toFixed(2) }
      };
      // Check if this is a retry scenario by using PartnerLogTracker retry count
      // If retryCount > 1, this indicates previous processing attempts
      const isRetry = claimedTracker && claimedTracker.retryCount > 1;
      const processor = new SatLocApplicationProcessor({
        batchSize: 1000,
        enableRetryLogic: true
      });
      let processingResult;
      try {
        if (isRetry) {
          pino.debug(`Retrying existing log file: ${taskData.logFileName}`);
          processingResult = await processor.retryLogFile(taskData.localFilePath, contextData);
        } else {
          pino.debug(`Processing new log file: ${taskData.logFileName}`);
          processingResult = await processor.processLogFile({ filePath: taskData.localFilePath }, contextData);
        }
      } catch (processingError) {
        // Enhanced error handling for processor failures
        pino.error({
          err: processingError,
          filePath: taskData.localFilePath,
          fileSizeMB,
          isRetry
        }, 'SatLoc Application Processor failed');
        // Force garbage collection for large files
        if (fileSizeMB > 50 && global.gc) {
          global.gc();
          pino.debug('Forced garbage collection after processing error');
        }
        throw processingError;
      }
      if (processingResult.success) {
        // Extract results from the Application Processor; older processor
        // versions returned singular `application`/`applicationFile` fields
        const applications = processingResult.applications || [processingResult.application];
        const applicationFiles = processingResult.applicationFiles || [processingResult.applicationFile];
        // Build result compatible with existing tracking
        // Note: matchedJobs comes from satloc_application_processor already properly formatted
        // with assignId and jobId matching partner_log_tracker schema
        result.matchedJobs = processingResult.matchedJobs || [];
        for (let i = 0; i < applications.length; i++) {
          const application = applications[i];
          const applicationFile = applicationFiles[i];
          if (application && applicationFile) {
            // Store the first application file ID for backward compatibility
            if (!result.appFileId) {
              result.appFileId = applicationFile._id;
            }
          }
        }
        // NOTE: pino is object-first — passing the details object as a second
        // argument after a message string would silently drop it, so the
        // structured data must come first.
        pino.debug({
          applicationsCreated: applications.length,
          applicationFilesCreated: applicationFiles.length,
          totalDetails: processingResult.totalDetails,
          statistics: processingResult.statistics,
          fileSizeMB: fileSizeMB
        }, 'SatLoc Application Processor completed successfully');
        // Force garbage collection after every file to prevent memory buildup
        if (global.gc) {
          global.gc();
          pino.debug(`Forced garbage collection after processing file (${fileSizeMB.toFixed(2)}MB)`);
        }
      } else {
        throw new Error(`SatLoc Application Processor failed: ${processingResult.error}`);
      }
    } else {
      throw new Error(`Unsupported partner log format: ${taskData.partnerCode}`);
    }
    return result;
  } catch (error) {
    // Object-first pino call so the error (stack included) is serialized;
    // pino ignores extra varargs when the first argument is a string.
    pino.error({ err: error }, 'Error processing local log file');
    // Always force GC on error to free up memory from failed processing
    if (global.gc) {
      global.gc();
      pino.debug('Forced garbage collection after processing error');
    }
    throw error;
  }
}
// Start the workers if not in test mode
if (env.NODE_ENV !== 'test') {
  // Memory monitoring with aggressive cleanup; the module-level handle guard
  // prevents a duplicate interval from being registered on re-entry.
  if (!memoryMonitorInterval) {
    const toMB = (bytes) => Math.round(bytes / 1024 / 1024);
    memoryMonitorInterval = setInterval(() => {
      const usage = process.memoryUsage();
      const memUsageMB = {
        rss: toMB(usage.rss),
        heapTotal: toMB(usage.heapTotal),
        heapUsed: toMB(usage.heapUsed),
        external: toMB(usage.external)
      };
      pino.debug({ memUsageMB, circuitBreakerSize: problematicFiles.size }, 'Memory usage check');
      // Aggressive threshold: only act above 512MB of used heap
      if (memUsageMB.heapUsed <= 512) {
        return;
      }
      pino.warn({ memUsageMB }, 'High memory usage detected, forcing GC');
      // global.gc is only available when node runs with --expose-gc
      if (!global.gc) {
        return;
      }
      global.gc();
      const heapAfterMB = toMB(process.memoryUsage().heapUsed);
      pino.info(
        { before: memUsageMB.heapUsed, after: heapAfterMB, freed: memUsageMB.heapUsed - heapAfterMB },
        'Garbage collection completed'
      );
    }, 30000); // Every 30 seconds
    pino.info('Memory monitoring interval started');
  }
  // Defer startup cleanup briefly so the DB connection has time to establish
  const runDeferredCleanup = async () => {
    pino.info('Running startup cleanup for sync worker...');
    try {
      await startupCleanup();
    } catch (err) {
      pino.error({ err }, 'Sync worker startup cleanup failed');
    }
  };
  setTimeout(runDeferredCleanup, 2000); // 2 second delay
  startPartnerWorker(); // Start queue worker
  startPeriodicCleanup(); // Start periodic cleanup for stuck tasks
  pino.debug('Partner sync worker started');
}
// Public surface: the queue worker entry point and the startup cleanup routine.
module.exports = { startPartnerWorker, startupCleanup };