'use strict';

/**
 * Partner sync worker: consumes partner tasks from RabbitMQ (job uploads and
 * partner log processing) with retry/backoff, DLQ enrichment, a per-file
 * circuit breaker, and cleanup of stuck processing tasks.
 */

const logger = require('../helpers/logger'),
  pino = logger.child('partner_sync_worker'),
  env = require('../helpers/env.js'),
  { DBConnection } = require('../helpers/db/connect'),
  amqp = require('amqplib'), // Use promise-based version instead of callback
  { setupDLQQueues } = require('../helpers/dlq_queue_setup'), // DLQ helper module
  { PartnerTasks, AssignStatus, RecTypes, PartnerCodes, PartnerLogTrackerStatus } = require('../helpers/constants'),
  SatLocApplicationProcessor = require('../helpers/satloc_application_processor'),
  { enhancedRunInTransaction } = require('../helpers/mongo_enhanced'),
  { ApplicationFile, JobAssign } = require('../model'),
  PartnerLogTracker = require('../model/partner_log_tracker'),
  TaskTracker = require('../model/task_tracker'),
  { TaskTrackerStatus } = require('../model/task_tracker'),
  partnerSyncService = require('../services/partner_sync_service'),
  fs = require('fs').promises;

// Partner Log Tracker Status Constants (using centralized constants)
const TRACKER_STATUS = PartnerLogTrackerStatus;

// Initialize database connection
const workerDB = new DBConnection('Partner Sync Worker');

let amqpConn = null;
let amqpChannel = null;
let mqClosed = false;
let reconnecting = false; // Prevent multiple simultaneous reconnection attempts

// Interval tracking to prevent leaks on reconnect
let circuitBreakerCleanupInterval = null;
let periodicCleanupInterval = null;
let memoryMonitorInterval = null;

const PARTNER_QUEUE = env.QUEUE_NAME_PARTNER;

// Register fatal handlers
const path = require('path');
const { registerFatalHandlers } = require('../helpers/process_fatal_handlers');
registerFatalHandlers(process, {
  env,
  debug: (msg) => pino.info(msg),
  kindPrefix: 'partner_sync_worker',
  reportFilePath: path.join(__dirname, 'partner_sync_worker.rlog'),
});

// SIGINT and SIGTERM share the same graceful shutdown sequence
async function gracefulShutdown(signal) {
  pino.info(`Received ${signal}, shutting down gracefully...`);
  mqClosed = true;
  // Clear all intervals to prevent leaks
  if (circuitBreakerCleanupInterval) clearInterval(circuitBreakerCleanupInterval);
  if (periodicCleanupInterval) clearInterval(periodicCleanupInterval);
  if (memoryMonitorInterval) clearInterval(memoryMonitorInterval);
  // Close channel first to prevent memory leaks
  if (amqpChannel) {
    try {
      await amqpChannel.close();
      pino.info('AMQP channel closed');
    } catch (err) {
      pino.error({ err }, 'Error closing AMQP channel');
    }
  }
  if (amqpConn) {
    try {
      await amqpConn.close();
      pino.info('AMQP connection closed');
    } catch (err) {
      pino.error({ err }, 'Error closing AMQP connection');
    }
  }
  process.exit(0);
}

process
  .on('SIGINT', () => gracefulShutdown('SIGINT'))
  .on('SIGTERM', () => gracefulShutdown('SIGTERM'));

// Initialize the database connection
workerDB.initialize({ setupExitHandlers: false });
// Startup cleanup for stuck processing tasks
async function startupCleanup() {
  // Wait for database to be ready with retry logic
  let retries = 0;
  const maxRetries = 10;
  while (!workerDB.isReady() && retries < maxRetries) {
    pino.debug(`Database not ready for startup cleanup, waiting... (attempt ${retries + 1}/${maxRetries})`);
    await new Promise(resolve => setTimeout(resolve, 1000)); // Wait 1 second
    retries++;
  }
  if (!workerDB.isReady()) {
    pino.error('Database not ready after waiting, skipping startup cleanup');
    return;
  }
  try {
    const STUCK_PROCESSING_TIMEOUT_MS = 30 * 60 * 1000; // 30 minutes
    const cutoffTime = new Date(Date.now() - STUCK_PROCESSING_TIMEOUT_MS);

    // Find and reset stuck processing tasks (downloaded -> processing but never completed)
    const stuckProcessing = await PartnerLogTracker.find({
      status: TRACKER_STATUS.PROCESSING,
      processingStartedAt: { $lt: cutoffTime }
    });
    if (stuckProcessing.length > 0) {
      pino.info(`Found ${stuckProcessing.length} stuck processing tasks on startup, resetting to failed`);
      await PartnerLogTracker.updateMany(
        { status: TRACKER_STATUS.PROCESSING, processingStartedAt: { $lt: cutoffTime } },
        {
          $set: {
            status: TRACKER_STATUS.FAILED,
            errorMessage: 'Processing timeout - reset on worker startup',
            updatedAt: new Date()
          }
        }
      );
      pino.info(`Reset ${stuckProcessing.length} stuck processing tasks to failed status`);
    }

    // Also check for any processing tasks without processingStartedAt (corrupted state)
    const corruptedProcessing = await PartnerLogTracker.find({
      status: TRACKER_STATUS.PROCESSING,
      processingStartedAt: { $exists: false }
    });
    if (corruptedProcessing.length > 0) {
      pino.info(`Found ${corruptedProcessing.length} corrupted processing tasks on startup, resetting to failed`);
      await PartnerLogTracker.updateMany(
        { status: TRACKER_STATUS.PROCESSING, processingStartedAt: { $exists: false } },
        {
          $set: {
            status: TRACKER_STATUS.FAILED,
            errorMessage: 'Corrupted processing state - reset on worker startup',
            updatedAt: new Date()
          }
        }
      );
      pino.info(`Reset ${corruptedProcessing.length} corrupted processing tasks to failed status`);
    }
  } catch (error) {
    pino.error({ err: error }, 'Error during sync worker startup cleanup');
  }
}

// Start AMQP queue worker for partner tasks using modern async/await
async function startPartnerWorker() {
  // Prevent multiple simultaneous reconnection attempts
  if (reconnecting) {
    pino.debug('Reconnection already in progress, skipping duplicate attempt');
    return;
  }
  reconnecting = true;
  try {
    // Wait for database to be ready before starting worker
    let dbRetries = 0;
    const maxDbRetries = 20;
    while (!workerDB.isReady() && dbRetries < maxDbRetries) {
      pino.debug(`Waiting for database connection... (attempt ${dbRetries + 1}/${maxDbRetries})`);
      await new Promise(resolve => setTimeout(resolve, 1000));
      dbRetries++;
    }
    if (!workerDB.isReady()) {
      pino.error('Database connection failed after retries, retrying worker startup in 10s');
      reconnecting = false;
      setTimeout(startPartnerWorker, 10000);
      return;
    }
    pino.info('Database connection ready, starting AMQP worker...');

    const conOps = {
      protocol: 'amqp',
      hostname: env.QUEUE_HOST || 'localhost',
      port: env.QUEUE_PORT || 5672,
      username: env.QUEUE_USR || 'agmuser',
      password: env.QUEUE_PWD,
      vhost: env.QUEUE_VHOST || '/',
      heartbeat: env.QUEUE_HEARTBEAT || 0,
      frameMax: 0
    };

    // Close old channel if exists (prevents channel leak on reconnect)
    if (amqpChannel) {
      try {
        await amqpChannel.close();
        pino.debug('[AMQP] Closed old channel before reconnect');
      } catch (err) {
        pino.debug({ err }, '[AMQP] Error closing old channel (may already be closed)');
      }
      amqpChannel = null;
    }

    const conn = await amqp.connect(conOps);

    conn.on("error", (err) => {
      if (mqClosed) return; // Already handling reconnection
      pino.error({ err }, '[AMQP] Connection error');
      mqClosed = true;
      reconnecting = false; // Allow reconnection
      // Longer delay for permission errors to prevent tight loop
      const isPermissionError = err.message && err.message.includes('ACCESS_REFUSED');
      const delay = isPermissionError ? 30000 : 5000; // 30s for permission errors, 5s for others
      if (isPermissionError) {
        pino.error(`[AMQP] Permission error detected. Ensure RabbitMQ user has access to queue '${PARTNER_QUEUE}'. Retrying in ${delay / 1000}s`);
      }
      setTimeout(startPartnerWorker, delay);
    });

    conn.on("close", () => {
      if (mqClosed) return; // Already handling reconnection
      pino.info('[AMQP] Connection closed');
      mqClosed = true;
      reconnecting = false; // Allow reconnection
      setTimeout(startPartnerWorker, 5000);
    });

    amqpConn = conn;
    mqClosed = false;
    pino.info('[AMQP] Partner worker connected');

    // Use DLQ helper module to set up queue infrastructure
    const { channel, queueNames } = await setupDLQQueues(PARTNER_QUEUE, {
      connection: conn,
      retentionDays: env.DLQ_RETENTION_DAYS,
      prefetch: 1
    });
    amqpChannel = channel; // Store channel reference for cleanup on reconnect/shutdown

    pino.info('[AMQP] DLQ infrastructure configured via helper module');
    pino.info('[AMQP] Queue flow: %s -> %s (%d days TTL) -> %s', queueNames.main, queueNames.dlq, env.DLQ_RETENTION_DAYS, queueNames.archive);

    // Start periodic cleanup ONLY AFTER channel is successfully created
    startPeriodicCleanup();

    reconnecting = false; // Connection AND channel successful, allow future reconnections
    pino.info('[AMQP] Prefetch set to 1 for memory-safe processing');
    pino.info('[AMQP] Waiting for partner tasks...');

    await channel.consume(PARTNER_QUEUE, async (msg) => {
      if (!msg) return;
      let taskMsg;
      try {
        taskMsg = JSON.parse(msg.content);
      } catch (err) {
        pino.error({ err }, 'Failed to parse task message');
        channel.ack(msg);
        return;
      }

      // Check if message was redelivered (failed processing before)
      const isRedelivered = msg.fields && msg.fields.redelivered;

      // Add extra protection for redelivered messages
      if (isRedelivered && taskMsg.logFileName) {
        pino.debug(`Processing redelivered message for: ${taskMsg.logFileName}`);
        // Check if this was already successfully processed (crash recovery)
        if (await isTaskAlreadyProcessed(taskMsg)) {
          pino.info(`Redelivered task ${taskMsg.logFileName} was already processed, acknowledging`);
          channel.ack(msg);
          return;
        }
      }

      try {
        const result = await processPartnerTask(taskMsg, isRedelivered);
        if (!mqClosed) {
          pino.debug('Partner task completed:', result);
          channel.ack(msg);
        }
      } catch (error) {
        if (!mqClosed) {
          pino.error({
            err: error,
            taskMsg: {
              logFileName: taskMsg.logFileName,
              type: taskMsg.type,
              customerId: taskMsg.customerId,
              partnerCode: taskMsg.partnerCode,
              aircraftId: taskMsg.aircraftId
            }
          }, 'Partner task failed');

          // Check if task is already processed to avoid sending duplicates to DLQ
          if (await isTaskAlreadyProcessed(taskMsg)) {
            pino.debug(`Task ${taskMsg.logFileName} already processed, acknowledging without DLQ`);
            channel.ack(msg);
            return;
          }

          // For redelivered messages, be more aggressive about detecting fatal errors
          if (isRedelivered && isFatalError(error)) {
            pino.error({ err: error, taskMsg }, 'Fatal error on redelivered message, sending to DLQ immediately');
            // Enrich message with error metadata before DLQ
            const enriched = await sendToQueueWithEnrichment(channel, PARTNER_QUEUE, taskMsg, error);
            if (enriched) {
              channel.ack(msg); // Ack original; the enriched copy will fail again and route to the DLQ
            } else {
              channel.reject(msg, false); // Fallback to direct reject
            }
            return;
          }

          // Implement retry logic with exponential backoff
          if (await shouldRetryTask(taskMsg, error)) {
            pino.debug('Retrying task after delay...');
            // Reject with requeue to retry later
            channel.reject(msg, true);
          } else {
            // Final check before DLQ - log for monitoring
            pino.warn({
              logFileName: taskMsg.logFileName,
              customerId: taskMsg.customerId,
              partnerCode: taskMsg.partnerCode,
              aircraftId: taskMsg.aircraftId,
              retryCount: taskMsg.retryCount || 0,
              lastError: error.message,
              isRedelivered
            }, 'Task exceeded max retries, sending to dead letter queue');
            // Enrich message with error metadata before DLQ
            const enriched = await sendToQueueWithEnrichment(channel, PARTNER_QUEUE, taskMsg, error);
            if (enriched) {
              channel.ack(msg); // Ack original; the enriched copy will fail again and route to the DLQ
            } else {
              channel.reject(msg, false); // Fallback to direct reject
            }
          }
        }
      }
    }, { noAck: false });
  } catch (error) {
    pino.error({ err: error }, '[AMQP] Worker setup failed');
    // CRITICAL: Always reset reconnecting flag to allow future attempts
    reconnecting = false;
    mqClosed = true;
    // Close any partially created connection/channel to prevent leaks
    if (amqpChannel) {
      try {
        await amqpChannel.close();
        pino.debug('[AMQP] Closed channel after connection failure');
      } catch (err) {
        // Ignore close errors
      }
      amqpChannel = null;
    }
    if (amqpConn) {
      try {
        await amqpConn.close();
        pino.debug('[AMQP] Closed connection after setup failure');
      } catch (err) {
        // Ignore close errors
      }
      amqpConn = null;
    }
    // Check if it's a permission error
    const isPermissionError = error.message && (
      error.message.includes('ACCESS_REFUSED') || error.message.includes('403')
    );
    if (isPermissionError) {
      pino.error(`[AMQP] Permission denied for queue '${PARTNER_QUEUE}'. Check RabbitMQ user permissions. Retrying in 30s`);
      setTimeout(startPartnerWorker, 30000); // Longer delay for permission issues
    } else {
      pino.error('[AMQP] Connection failed, retrying in 5s');
      setTimeout(startPartnerWorker, 5000);
    }
  }
}

// Process individual partner tasks using async/await with timeout protection
async function processPartnerTask(taskMsg, isRedelivered = false) {
  pino.debug('Processing partner task:', taskMsg.type, taskMsg.data, { redelivered: isRedelivered });

  // Add timeout protection to prevent hanging tasks
  const TASK_TIMEOUT_MS = 30 * 60 * 1000; // 30 minutes

  const taskPromise = new Promise(async (resolve, reject) => {
    try {
      let result;
      switch (taskMsg.type) {
        case PartnerTasks.UPLOAD_PARTNER_JOB:
          result = await processPartnerJobUpload(taskMsg.data, isRedelivered);
          break;
        case PartnerTasks.PROCESS_PARTNER_LOG:
          result = await processPartnerLog(taskMsg.data, isRedelivered);
          break;
        default:
          throw new Error(`Unknown partner task type: ${taskMsg.type}`);
      }
      resolve(result);
    } catch (error) {
      reject(error);
    }
  });

  const timeoutPromise = new Promise((_, reject) => {
    setTimeout(() => {
      reject(new Error(`Task ${taskMsg.type} timed out after ${TASK_TIMEOUT_MS / 1000} seconds`));
    }, TASK_TIMEOUT_MS);
  });

  try {
    return await Promise.race([taskPromise, timeoutPromise]);
  } catch (error) {
    pino.error({
      err: error,
      taskType: taskMsg.type,
      logFileName: taskMsg.data?.logFileName,
      isTimeout: error.message.includes('timed out')
    }, 'Partner task processing error');
    throw error;
  }
}

// Constants for task processing
const MAX_RETRY_ATTEMPTS = env.PARTNER_MAX_RETRIES;
const RETRY_DELAY_MS = 10000;
const BASE_RETRY_DELAY = 5000; // 5 seconds base delay
const MAX_RETRY_DELAY = 60000; // Max 60 seconds delay
const PROCESSING_TIMEOUT_MS = 90 * 60 * 1000; // 90 minutes for large files
const CLEANUP_INTERVAL_MS = 30 * 60 * 1000; // 30 minutes (from 15 to prevent overhead)
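
// Illustrative backoff schedule implied by the constants above, where
// delay = min(BASE_RETRY_DELAY * 2^retryCount, MAX_RETRY_DELAY) (see shouldRetryTask):
//   retryCount 0 -> 5s, 1 -> 10s, 2 -> 20s, 3 -> 40s, 4+ -> capped at 60s
// The number of attempts is bounded by MAX_RETRY_ATTEMPTS (env.PARTNER_MAX_RETRIES).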

// Circuit breaker for problematic files - more lenient for development
const problematicFiles = new Map(); // filename -> { attempts, lastAttempt, blocked }
const MAX_FILE_ATTEMPTS = env.NODE_ENV === 'production' ? 3 : 10; // More attempts in dev
const FILE_BLOCK_DURATION = env.NODE_ENV === 'production' ? 60 * 60 * 1000 : 5 * 60 * 1000; // 5 minutes in dev vs 1 hour in prod

// Periodically clean up old entries from circuit breaker map to prevent memory leak
if (!circuitBreakerCleanupInterval) {
  circuitBreakerCleanupInterval = setInterval(() => {
    const now = Date.now();
    const cutoffTime = now - (FILE_BLOCK_DURATION * 2); // Keep for 2x block duration
    let cleanedCount = 0;
    for (const [key, info] of problematicFiles.entries()) {
      if (info.lastAttempt < cutoffTime) {
        problematicFiles.delete(key);
        cleanedCount++;
      }
    }
    if (cleanedCount > 0) {
      pino.debug(`Cleaned ${cleanedCount} old entries from circuit breaker map (size: ${problematicFiles.size})`);
    }
  }, 60 * 60 * 1000); // Clean every hour
  pino.info('Circuit breaker cleanup interval started');
}

// Debug: Log constants at startup
pino.info(`Partner sync worker constants - MAX_RETRY_ATTEMPTS: ${MAX_RETRY_ATTEMPTS}, PARTNER_MAX_RETRIES: ${env.PARTNER_MAX_RETRIES}`);
pino.info(`Circuit breaker settings - MAX_FILE_ATTEMPTS: ${MAX_FILE_ATTEMPTS}, FILE_BLOCK_DURATION: ${FILE_BLOCK_DURATION}ms (${FILE_BLOCK_DURATION / 60000} minutes)`);
pino.info(`Environment: ${env.NODE_ENV}, Development mode circuit breaker: ${env.NODE_ENV === 'development' ? 'LENIENT' : 'STRICT'}`);

// Cleanup stuck processing tasks periodically
function startPeriodicCleanup() {
  // Prevent multiple cleanup intervals on reconnect
  if (periodicCleanupInterval) {
    pino.debug('Periodic cleanup already running, skipping duplicate');
    return;
  }
  periodicCleanupInterval = setInterval(async () => {
    try {
      const cutoffTime = new Date(Date.now() - PROCESSING_TIMEOUT_MS);
      const stuckTasks = await PartnerLogTracker.find({
        status: TRACKER_STATUS.PROCESSING,
        processingStartedAt: { $lt: cutoffTime }
      });
      if (stuckTasks.length > 0) {
        pino.info(`Found ${stuckTasks.length} stuck processing tasks, resetting them`);
        await PartnerLogTracker.updateMany(
          { status: TRACKER_STATUS.PROCESSING, processingStartedAt: { $lt: cutoffTime } },
          {
            $set: {
              status: TRACKER_STATUS.FAILED,
              errorMessage: 'Processing timeout - automatically reset',
              updatedAt: new Date()
            }
          }
        );
        pino.info(`Reset ${stuckTasks.length} stuck processing tasks to failed status`);
      }
    } catch (error) {
      pino.error({ err: error }, 'Error during periodic cleanup of stuck tasks');
    }
  }, CLEANUP_INTERVAL_MS);
  pino.info('Periodic cleanup interval started');
}

// Check if a task should be retried based on error type and retry count
async function shouldRetryTask(taskMsg, error) {
  try {
    // Get current retry count from task message or initialize to 0
    const retryCount = taskMsg.retryCount || 0;

    // Debug logging to understand retry logic
    pino.debug(`shouldRetryTask: retryCount=${retryCount}, MAX_RETRY_ATTEMPTS=${MAX_RETRY_ATTEMPTS}, comparison=${retryCount >= MAX_RETRY_ATTEMPTS}`);

    // Don't retry if we've exceeded max attempts
    if (retryCount >= MAX_RETRY_ATTEMPTS) {
      pino.info(`Task ${taskMsg.type} exceeded max retry attempts (${retryCount} >= ${MAX_RETRY_ATTEMPTS}), sending to DLQ`);
      return false;
    }

    // Don't retry for certain error types (validation errors, etc.)
    if (isNonRetryableError(error)) {
      pino.debug(`Task ${taskMsg.type} failed with non-retryable error:`, error.message);
      return false;
    }

    // Calculate exponential backoff delay
    const delay = Math.min(BASE_RETRY_DELAY * Math.pow(2, retryCount), MAX_RETRY_DELAY);
    pino.debug(`Will retry task ${taskMsg.type} after ${delay}ms (attempt ${retryCount + 1}/${MAX_RETRY_ATTEMPTS})`);

    // Update retry count for next attempt
    taskMsg.retryCount = retryCount + 1;
    taskMsg.lastError = error.message;
    taskMsg.nextRetryAt = Date.now() + delay;

    // Wait for backoff delay
    await new Promise(resolve => setTimeout(resolve, delay));
    return true;
  } catch (err) {
    pino.debug('Error in shouldRetryTask:', err);
    return false;
  }
}

// Check if task is already processed to avoid DLQ duplicates
async function isTaskAlreadyProcessed(taskMsg) {
  try {
    if (!taskMsg.logFileName || !taskMsg.customerId || !taskMsg.partnerCode || !taskMsg.aircraftId) {
      return false; // If essential fields are missing, let it proceed to DLQ
    }
    const filter = {
      customerId: taskMsg.customerId,
      partnerCode: taskMsg.partnerCode,
      aircraftId: taskMsg.aircraftId, // Fixed: use aircraftId, not partnerAircraftId
      logFileName: taskMsg.logFileName
    };
    const tracker = await PartnerLogTracker.findOne(filter);
    // Consider processed if explicitly marked as processed OR in processed status
    const isProcessed = tracker && (tracker.processed === true || tracker.status === TRACKER_STATUS.PROCESSED);
    if (isProcessed) {
      pino.debug(`Task ${taskMsg.logFileName} already processed, avoiding DLQ duplicate`);
    }
    return isProcessed;
  } catch (err) {
    pino.debug('Error checking if task already processed:', err);
    return false; // On error, let it proceed to avoid blocking
  }
}

// Check if an error should not be retried
function isNonRetryableError(error) {
  const nonRetryablePatterns = [
    /validation/i,
    /invalid.*format/i,
    /malformed/i,
    // Authentication errors are now retryable (removed from this list)
    // /authentication.*failed/i,
    // /unauthorized/i,
    // /forbidden/i,
    /not.*found/i,
    /already.*exists/i,
    /already.*processed/i
  ];
  return nonRetryablePatterns.some(pattern => pattern.test(error.message));
}

// Check if an error is fatal and should immediately go to DLQ (especially for redelivered messages)
function isFatalError(error) {
  const fatalPatterns = [
    /out of memory/i,
    /heap.*limit/i,
    /maximum call stack/i,
    /cannot allocate memory/i,
    /worker killed/i,
    /signal.*kill/i,
    /database.*connection.*lost/i,
    /connection.*terminated/i,
    /timeout.*exceeded/i,
    /file.*corrupt/i,
    /parse.*error/i,
    /invalid.*file.*format/i
  ];
  return fatalPatterns.some(pattern => pattern.test(error.message));
}
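
// Illustrative walk-through of the classification helpers below (error messages
// are hypothetical; assumes a low retryCount and env.PARTNER_MAX_RETRIES > 0):
//   "connect ECONNREFUSED 10.0.0.5:5672"  -> categorizeError: 'transient'      -> calculateSeverity: 'low'
//   "Mongo transaction aborted"           -> categorizeError: 'infrastructure' -> calculateSeverity: 'high'
//   "validation failed: missing required" -> categorizeError: 'validation'     -> calculateSeverity: 'medium'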

/**
 * Categorize error for DLQ analysis
 * Based on https://rashadansari.medium.com/strategies-for-successful-dead-letter-queue-event-handling-e354f7dfbb3e
 */
function categorizeError(error) {
  const errorMsg = (error?.message || '').toLowerCase();
  // Transient errors - temporary issues that may resolve
  if (/timeout|timed out|connection|network|econnrefused|enotfound|socket|dns/i.test(errorMsg)) {
    return 'transient';
  }
  // Validation errors - data quality issues
  if (/validation|invalid|malformed|missing required|bad request|400/i.test(errorMsg)) {
    return 'validation';
  }
  // Infrastructure errors - database, filesystem
  if (/database|mongo|transaction|filesystem|eacces|enoent|disk/i.test(errorMsg)) {
    return 'infrastructure';
  }
  // Partner API errors
  if (/partner api|api error|http [45]\d\d|unauthorized|forbidden|authentication/i.test(errorMsg)) {
    return 'partner_api';
  }
  // Processing errors - business logic failures
  if (/processing|calculation|parse|transform|encode|decode/i.test(errorMsg)) {
    return 'processing';
  }
  return 'unknown';
}

/**
 * Calculate error severity for alerting
 */
function calculateSeverity(error, retryCount = 0) {
  const category = categorizeError(error);
  // Critical: fatal errors or high retry count
  if (isFatalError(error) || retryCount >= env.PARTNER_MAX_RETRIES) {
    return 'critical';
  }
  // High: infrastructure or persistent issues
  if (category === 'infrastructure' || retryCount >= 3) {
    return 'high';
  }
  // Medium: validation or processing errors
  if (category === 'validation' || category === 'processing') {
    return 'medium';
  }
  // Low: transient errors (likely to resolve)
  return 'low';
}

/**
 * Create enriched message properties for DLQ tracking
 */
function createDLQHeaders(taskMsg, error) {
  return {
    'x-error-category': categorizeError(error),
    'x-error-reason': error?.message || 'Unknown error',
    'x-task-type': taskMsg.type || 'unknown',
    'x-severity': calculateSeverity(error, taskMsg.retryCount),
    'x-first-death-time': Date.now(),
    'x-partner-code': taskMsg.partnerCode || 'unknown',
    'x-customer-id': taskMsg.customerId?.toString() || 'unknown'
  };
}

/**
 * Enrich message with error metadata and send to DLQ
 * Centralizes DLQ routing logic to avoid code duplication
 */
async function sendToQueueWithEnrichment(channel, queueName, taskMsg, error) {
  try {
    const enrichedTask = {
      ...taskMsg,
      lastError: error.message,
      failedAt: new Date().toISOString(),
      retryCount: (taskMsg.retryCount || 0) + 1
    };
    await channel.sendToQueue(
      queueName,
      Buffer.from(JSON.stringify(enrichedTask)),
      { persistent: true, headers: createDLQHeaders(taskMsg, error) }
    );
    return true;
  } catch (enrichError) {
    pino.error({ err: enrichError }, 'Failed to enrich message');
    return false;
  }
}
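
// The enriched copy carries the headers built by createDLQHeaders(); a downstream
// DLQ consumer can read them from msg.properties.headers (amqplib exposes publish
// headers there), e.g. x-error-category, x-severity, x-task-type, x-error-reason,
// x-first-death-time, x-partner-code and x-customer-id.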

// Process partner job upload using async/await
async function processPartnerJobUpload(taskData, isRedelivered = false) {
  pino.debug('Uploading job to partner:', taskData, { redelivered: isRedelivered });
  try {
    // If message was redelivered, check if upload was already successful
    if (isRedelivered) {
      const assignment = await JobAssign.findById(taskData.assignId);
      if (assignment && assignment.extJobId) {
        pino.debug('Job upload already completed on redelivered message:', assignment.extJobId);
        return { success: true, result: { alreadyProcessed: true, externalJobId: assignment.extJobId } };
      }
    }

    let result;
    // Use atomic transaction to ensure upload and status updates are consistent
    await enhancedRunInTransaction(async (session) => {
      // Upload job to partner with session for atomic operations.
      // This handles assignment status update and job logging internally
      result = await partnerSyncService.uploadJobToPartner(taskData.assignId, { session });
      if (!result.success) {
        throw new Error(`Upload failed: ${result.message || 'Unknown error'}`);
      }
    });

    pino.debug('Partner job upload result:', result);
    return { success: true, result };
  } catch (error) {
    pino.debug('Partner job upload error:', error);
    throw error;
  }
}

// Process partner log data using async/await
async function processPartnerLog(taskData, isRedelivered = false) {
  pino.debug('Processing partner log:', taskData.logFileName, { redelivered: isRedelivered });

  // TaskTracker idempotency check (parallel tracking)
  const { taskId, executionId } = taskData;
  if (taskId && executionId) {
    const taskTracker = await TaskTracker.findOneAndUpdate(
      { taskId, executionId, status: { $in: [TaskTrackerStatus.QUEUED, TaskTrackerStatus.FAILED] } },
      { $set: { status: TaskTrackerStatus.PROCESSING, processingStartedAt: new Date() } },
      { new: true }
    );
    if (!taskTracker) {
      pino.warn({ taskId, executionId, logFileName: taskData.logFileName }, 'Task already claimed or completed by another worker, skipping');
      return { skipped: true, reason: 'already_processed' };
    }
    pino.debug({ taskId, executionId }, 'TaskTracker claimed successfully');
  }

  // Circuit breaker: Check if this file has been problematic
  const fileKey = `${taskData.logFileName}-${taskData.partnerCode}`;
  const fileInfo = problematicFiles.get(fileKey);
  // In development, allow manual override by checking for recent successful processing
  if (fileInfo && fileInfo.blocked && (Date.now() - fileInfo.lastAttempt) < FILE_BLOCK_DURATION) {
    if (env.NODE_ENV === 'development') {
      pino.warn(`File ${taskData.logFileName} is temporarily blocked but allowing retry in development mode`);
      // Reset the block in development to allow retries
      problematicFiles.delete(fileKey);
    } else {
      pino.warn(`File ${taskData.logFileName} is temporarily blocked due to repeated failures`);
      throw new Error(`File temporarily blocked due to repeated failures - will retry later`);
    }
  }

  const processStartTime = Date.now();
  // Build filter for atomic operations from essential task fields
  // (trackerFilter is no longer passed in task payload - reconstruct from essential fields)
  const filter = {
    logId: taskData.logId,
    partnerCode: taskData.partnerCode,
    aircraftId: taskData.aircraftId,
    customerId: taskData.customerId
  };

  try {
    // Check database connection before processing
    if (!workerDB.isReady()) {
      throw new Error('Database connection not ready for processing');
    }

    // Atomically claim log for processing
    // For redelivered messages, be more aggressive about reclaiming stuck processing tasks
    const claimFilter = {
      ...filter,
      $or: [
        { processed: { $exists: false } },
        { processed: false }
      ]
    };

    // For redelivered messages, allow reclaiming stuck PROCESSING tasks (older than 5 minutes)
    if (isRedelivered) {
      const stuckProcessingCutoff = new Date(Date.now() - 5 * 60 * 1000); // 5 minutes ago
      claimFilter.$or.push(
        // Reclaim PROCESSING tasks that are stuck (started > 5 minutes ago)
        { status: TRACKER_STATUS.PROCESSING, processingStartedAt: { $lt: stuckProcessingCutoff } },
        // Reclaim PROCESSING tasks without processingStartedAt (corrupted state)
        { status: TRACKER_STATUS.PROCESSING, processingStartedAt: { $exists: false } }
      );
      // Also allow DOWNLOADED and FAILED
      claimFilter.$or.push(
        { status: TRACKER_STATUS.DOWNLOADED },
        { status: TRACKER_STATUS.FAILED }
      );
      pino.info(`Redelivered message - attempting to reclaim stuck/failed/downloaded task: ${taskData.logFileName}`);
    } else {
      // For non-redelivered messages, only claim DOWNLOADED or FAILED
      claimFilter.status = { $in: [TRACKER_STATUS.DOWNLOADED, TRACKER_STATUS.FAILED] };
    }

    const claimedTracker = await PartnerLogTracker.findOneAndUpdate(
      claimFilter,
      {
        $set: {
          status: TRACKER_STATUS.PROCESSING,
          processingStartedAt: new Date(),
          updatedAt: new Date(),
          ...(isRedelivered && { errorMessage: 'Reclaimed from redelivered message' })
        },
        $inc: { retryCount: 1 }
      },
      { new: true }
    );

    // Log current state for debugging
    if (!claimedTracker) {
      const existingTracker = await PartnerLogTracker.findOne(filter);
      pino.debug(`Unable to claim log ${taskData.logFileName}. Current state:`, {
        logFileName: taskData.logFileName,
        currentStatus: existingTracker?.status,
        processed: existingTracker?.processed,
        processingStartedAt: existingTracker?.processingStartedAt,
        retryCount: existingTracker?.retryCount
      });
    }

    if (!claimedTracker) {
      // Log was already processed or claimed by another worker
      const existingTracker = await PartnerLogTracker.findOne(filter);
      if (existingTracker?.processed) {
        pino.debug(`Log ${taskData.logFileName} already processed`);
        return { success: true, message: 'Already processed' };
      }

      // For redelivered messages, we already tried to reclaim processing tasks above
      if (isRedelivered) {
        pino.warn(`Redelivered message could not be claimed for ${taskData.logFileName}, may be genuinely processed by another worker`);
        return { success: true, message: 'Could not reclaim redelivered task - likely processed elsewhere' };
      }

      // For non-redelivered messages, handle processing status more conservatively
      if (existingTracker?.status === TRACKER_STATUS.PROCESSING) {
        // Check if processing is stuck (taking too long)
        const isStuck = existingTracker.processingStartedAt &&
          (Date.now() - existingTracker.processingStartedAt.getTime() > PROCESSING_TIMEOUT_MS);
        if (isStuck) {
          pino.debug(`Log ${taskData.logFileName} processing appears stuck, resetting to retry`);
          // Reset stuck processing to allow retry
          await PartnerLogTracker.findOneAndUpdate(
            filter,
            {
              $set: {
                status: TRACKER_STATUS.FAILED,
                errorMessage: 'Processing timeout - reset for retry',
                updatedAt: new Date()
              }
            }
          );
          // Throw error to trigger task requeue/retry
          throw new Error(`Log ${taskData.logFileName} processing was stuck and has been reset for retry`);
        } else {
          pino.debug(`Log ${taskData.logFileName} is currently being processed by another worker`);
          // Don't acknowledge - let message be redelivered later
          throw new Error(`Log ${taskData.logFileName} is currently being processed - will retry later`);
        }
      } else {
        pino.debug(`Log ${taskData.logFileName} was claimed by another worker or in wrong state`);
        // Don't acknowledge - let message be redelivered later
        throw new Error(`Log ${taskData.logFileName} in unexpected state - will retry later`);
      }
    }

    if (isRedelivered && claimedTracker) {
      pino.info(`Successfully reclaimed redelivered task: ${taskData.logFileName} (retry ${claimedTracker.retryCount})`);
    } else {
      pino.debug(`Successfully claimed log for processing: ${taskData.logFileName} (retry ${claimedTracker.retryCount})`);
    }

    try {
      let processingResult;
      // Resolve full file path - use service method for path-agnostic storage
      let localFilePath = taskData.localFilePath;
      if (!localFilePath) {
        const SatlocService = require('../services/satloc_service');
        const satlocService = new SatlocService();
        const savedFile = claimedTracker?.savedLocalFile || taskData.logFileName;
        localFilePath = satlocService.resolveLogFilePath(savedFile);
        pino.debug(`Resolved file path via service: ${localFilePath}`);
      }

      pino.debug(`Processing local file: ${localFilePath}`);

      // Verify file exists
      try {
        await fs.access(localFilePath);
      } catch (fileError) {
        throw new Error(`Local log file not found: ${localFilePath}`);
      }

      // Update taskData with resolved path for downstream processing
      taskData.localFilePath = localFilePath;
      processingResult = await processLocalLogFile(taskData, claimedTracker);

      // Atomically mark as successfully processed
      const completedTracker = await PartnerLogTracker.findOneAndUpdate(
        { ...filter, status: TRACKER_STATUS.PROCESSING, _id: claimedTracker._id },
        {
          $set: {
            status: TRACKER_STATUS.PROCESSED,
            processed: true,
            processedAt: new Date(),
            matchedJobs: processingResult.matchedJobs,
            appFileId: processingResult.appFileId,
            processTime: Date.now() - processStartTime,
            updatedAt: new Date()
          }
        },
        { new: true }
      );

      if (!completedTracker) {
        throw new Error(`Failed to update tracker status to processed for ${taskData.logFileName}`);
      }

      // Update TaskTracker to completed (parallel tracking)
      if (taskId && executionId) {
        await TaskTracker.updateOne(
          { executionId },
          {
            $set: {
              status: TaskTrackerStatus.COMPLETED,
              completedAt: new Date(),
              processTime: Date.now() - processStartTime,
              result: {
                matchedJobs: processingResult.matchedJobs,
                appFileId: processingResult.appFileId
              }
            }
          }
        ).catch(err => {
          // Non-blocking - log error but don't fail task
          pino.error({ err, executionId }, 'Failed to update TaskTracker to completed');
        });
      }

      pino.debug(`Successfully processed log ${taskData.logFileName}: ${processingResult.matchedJobs.length} jobs matched`);

      // Circuit breaker: Reset file tracking on success
      if (problematicFiles.has(fileKey)) {
        problematicFiles.delete(fileKey);
        pino.debug(`Cleared problematic file tracking for ${taskData.logFileName}`);
      }

      return { success: true, result: processingResult };
    } catch (error) {
      // Circuit breaker: Track problematic files
      const currentFileInfo = problematicFiles.get(fileKey) || { attempts: 0, lastAttempt: 0, blocked: false };
      currentFileInfo.attempts++;
      currentFileInfo.lastAttempt = Date.now();
      if (currentFileInfo.attempts >= MAX_FILE_ATTEMPTS) {
        currentFileInfo.blocked = true;
        pino.warn(`File ${taskData.logFileName} marked as problematic after ${currentFileInfo.attempts} attempts`);
      }
      problematicFiles.set(fileKey, currentFileInfo);

      // Atomically mark as failed
      await PartnerLogTracker.findOneAndUpdate(
        { ...filter, status: TRACKER_STATUS.PROCESSING, _id: claimedTracker._id },
        {
          $set: {
            status: TRACKER_STATUS.FAILED,
            errorMessage: error.message,
            processTime: Date.now() - processStartTime,
            updatedAt: new Date()
          }
        },
        { new: true }
      );

      // Update TaskTracker with error details (parallel tracking)
      if (taskId && executionId) {
        const errorCategory = categorizeError(error);
        const canRetry = currentFileInfo.attempts < MAX_FILE_ATTEMPTS;
        await TaskTracker.updateOne(
          { executionId },
          {
            $set: {
              status: canRetry ? TaskTrackerStatus.FAILED : TaskTrackerStatus.DLQ,
              errorMessage: error.message,
              errorCategory,
              errorStack: error.stack,
              failedAt: new Date(),
              processTime: Date.now() - processStartTime
            },
            $inc: { retryCount: 1 }
          }
        ).catch(err => {
          // Non-blocking - log error but don't fail task
          pino.error({ err, executionId }, 'Failed to update TaskTracker with error');
        });
      }

      throw error;
    }
  } catch (error) {
    pino.debug('Partner log processing error:', error);
    throw error;
  }
}

// Process local log file that was downloaded by polling worker
async function processLocalLogFile(taskData, claimedTracker = null) {
  pino.debug(`Processing local log file: ${taskData.localFilePath}`);
  const result = { matchedJobs: [], appFileId: null };

  try {
    // Verify file exists
    await fs.access(taskData.localFilePath);

    if (taskData.partnerCode === PartnerCodes.SATLOC) {
      // Use SatLoc Application Processor for all SatLoc log processing
      pino.debug(`Using SatLoc Application Processor for: ${taskData.localFilePath}`);

      // Check file size to prevent memory issues
      const fileStats = await fs.stat(taskData.localFilePath);
      const fileSizeMB = fileStats.size / (1024 * 1024);
      if (fileSizeMB > 100) { // Log files larger than 100MB
        pino.warn(`Large SatLoc log file detected: ${fileSizeMB.toFixed(2)}MB - ${taskData.localFilePath}`);
      }

      // Retrieve data that was removed from task payload for efficiency
      // - uploadedDate: from tracker record (set when log was detected)
      // - assignments: query from DB by customerId and aircraftId (via user.partnerInfo.partnerAircraftId)
      const uploadedDate = claimedTracker?.uploadedDate || null;

      // Query assignments for this aircraft from DB
      // partnerAircraftId is stored in user.partnerInfo.partnerAircraftId, not directly on JobAssign
      // Use populateWithPartnerInfo and filter in memory
      const allAssignments = await JobAssign.populateWithPartnerInfo({
        status: { $in: [AssignStatus.UPLOADED, AssignStatus.PROCESSING] }
      }, true); // lean = true

      // Filter by aircraft ID (matches user.partnerInfo.partnerAircraftId)
      const assignments = allAssignments.filter(assign => {
        const aircraftId = assign.user?.partnerInfo?.partnerAircraftId || assign.user?.partnerInfo?.tailNumber;
        return aircraftId === taskData.aircraftId;
      });
      pino.debug(`Found ${assignments.length} assignments for aircraft ${taskData.aircraftId} (from ${allAssignments.length} total)`);
      if (assignments.length === 0) {
        pino.warn(`No assignments found for aircraft ${taskData.aircraftId} - log file may not match any job`);
      }

      // Build context data with essential fields + retrieved data
      const contextData = {
        taskInfo: {
          source: 'partner_sync',
          // Essential fields from task payload
          customerId: taskData.customerId,
          partnerCode: taskData.partnerCode,
          aircraftId: taskData.aircraftId,
          logId: taskData.logId,
          logFileName: taskData.logFileName,
          localFilePath: taskData.localFilePath,
          // Retrieved from tracker/DB
          uploadedDate: uploadedDate,
          assignments: assignments
        },
        metadata: {
          fileSize: fileStats.size,
          fileSizeMB: fileSizeMB.toFixed(2)
        }
      };

      // Check if this is a retry scenario by using PartnerLogTracker retry count
      // If retryCount > 1, this indicates previous processing attempts
      const isRetry = claimedTracker && claimedTracker.retryCount > 1;

      const processor = new SatLocApplicationProcessor({ batchSize: 1000, enableRetryLogic: true });
      let processingResult;
      try {
        if (isRetry) {
          pino.debug(`Retrying existing log file: ${taskData.logFileName}`);
          processingResult = await processor.retryLogFile(taskData.localFilePath, contextData);
        } else {
          pino.debug(`Processing new log file: ${taskData.logFileName}`);
          processingResult = await processor.processLogFile({ filePath: taskData.localFilePath }, contextData);
        }
      } catch (processingError) {
        // Enhanced error handling for processor failures
        pino.error({
          err: processingError,
          filePath: taskData.localFilePath,
          fileSizeMB,
          isRetry
        }, 'SatLoc Application Processor failed');
        // Force garbage collection for large files
        if (fileSizeMB > 50 && global.gc) {
          global.gc();
          pino.debug('Forced garbage collection after processing error');
        }
        throw processingError;
      }

      if (processingResult.success) {
        // Extract results from the Application Processor
        const applications = processingResult.applications || [processingResult.application];
        const applicationFiles = processingResult.applicationFiles || [processingResult.applicationFile];

        // Build result compatible with existing tracking
        // Note: matchedJobs comes from satloc_application_processor already properly formatted
        // with assignId and jobId matching partner_log_tracker schema
        result.matchedJobs = processingResult.matchedJobs || [];

        for (let i = 0; i < applications.length; i++) {
          const application = applications[i];
          const applicationFile = applicationFiles[i];
          if (application && applicationFile) {
            // Store the first application file ID for backward compatibility
            if (!result.appFileId) {
              result.appFileId = applicationFile._id;
            }
          }
        }

        pino.debug(`SatLoc Application Processor completed successfully:`, {
          applicationsCreated: applications.length,
          applicationFilesCreated: applicationFiles.length,
          totalDetails: processingResult.totalDetails,
          statistics: processingResult.statistics,
          fileSizeMB: fileSizeMB
        });

        // Force garbage collection after every file to prevent memory buildup
        if (global.gc) {
          global.gc();
          pino.debug(`Forced garbage collection after processing file (${fileSizeMB.toFixed(2)}MB)`);
        }
      } else {
        throw new Error(`SatLoc Application Processor failed: ${processingResult.error}`);
      }
    } else {
      throw new Error(`Unsupported partner log format: ${taskData.partnerCode}`);
    }

    return result;
  } catch (error) {
    pino.error(`Error processing local log file:`, error);
    // Always force GC on error to free up memory from failed processing
    if (global.gc) {
      global.gc();
      pino.debug('Forced garbage collection after processing error');
    }
    throw error;
  }
}

// Start the workers if not in test mode
if (env.NODE_ENV !== 'test') {
  // Add memory monitoring with more aggressive cleanup (prevent duplicate intervals)
  if (!memoryMonitorInterval) {
    memoryMonitorInterval = setInterval(() => {
      const memUsage = process.memoryUsage();
      const memUsageMB = {
        rss: Math.round(memUsage.rss / 1024 / 1024),
        heapTotal: Math.round(memUsage.heapTotal / 1024 / 1024),
        heapUsed: Math.round(memUsage.heapUsed / 1024 / 1024),
        external: Math.round(memUsage.external / 1024 / 1024)
      };
      pino.debug({ memUsageMB, circuitBreakerSize: problematicFiles.size }, 'Memory usage check');
      // More aggressive threshold - trigger at 512MB heap used
      if (memUsageMB.heapUsed > 512) {
        pino.warn({ memUsageMB }, 'High memory usage detected, forcing GC');
        if (global.gc) {
          global.gc();
          const afterGC = process.memoryUsage();
          const afterMB = { heapUsed: Math.round(afterGC.heapUsed / 1024 / 1024) };
          pino.info({
            before: memUsageMB.heapUsed,
            after: afterMB.heapUsed,
            freed: memUsageMB.heapUsed - afterMB.heapUsed
          }, 'Garbage collection completed');
        }
      }
    }, 30000); // Every 30 seconds
    pino.info('Memory monitoring interval started');
  }
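
  // Note: global.gc is only defined when Node is started with the --expose-gc
  // flag (e.g. `node --expose-gc <this worker>`); without it, the forced-GC
  // branches above and in processLocalLogFile are silently skipped.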

  // Run startup cleanup after a short delay to ensure DB is connected
  setTimeout(async () => {
    pino.info('Running startup cleanup for sync worker...');
    try {
      await startupCleanup();
    } catch (err) {
      pino.error({ err }, 'Sync worker startup cleanup failed');
    }
  }, 2000); // 2 second delay

  startPartnerWorker(); // Start queue worker
  startPeriodicCleanup(); // Start periodic cleanup for stuck tasks
  pino.debug('Partner sync worker started');
}

module.exports = {
  // Only export the queue worker functionality
  startPartnerWorker,
  startupCleanup
};
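
/*
 * Illustrative task message shape (assumed from how this worker reads taskMsg;
 * the producer lives elsewhere and may include additional fields). Note that the
 * consumer reads logFileName/customerId/partnerCode/aircraftId both at the top
 * level (isTaskAlreadyProcessed, error logging) and under `data` (the task
 * handlers), so the producer presumably duplicates them:
 *
 * {
 *   "type": <PartnerTasks.PROCESS_PARTNER_LOG or PartnerTasks.UPLOAD_PARTNER_JOB>,
 *   "retryCount": 0,                              // maintained by shouldRetryTask()
 *   "logFileName": "...", "customerId": "...", "partnerCode": "...", "aircraftId": "...",
 *   "data": {
 *     "taskId": "...", "executionId": "...",      // optional TaskTracker idempotency keys
 *     "logId": "...", "logFileName": "...",
 *     "partnerCode": "...", "customerId": "...", "aircraftId": "...",
 *     "localFilePath": "...",                     // optional; resolved via SatlocService if absent
 *     "assignId": "..."                           // used by UPLOAD_PARTNER_JOB tasks
 *   }
 * }
 */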