'use strict';

/**
 * Processing flows of Partner Data Polling Worker:
 *
 * ┌─────────────────┐    ┌──────────────────┐    ┌─────────────────┐    ┌──────────────────┐
 * │    Cron Job     │───▶│  Poll Partners   │───▶│  Download Logs  │───▶│  Enqueue Tasks   │
 * │  (15min/1min)   │    │   for New Data   │    │     Locally     │    │  for Processing  │
 * └─────────────────┘    └──────────────────┘    └─────────────────┘    └──────────────────┘
 */

const cron = require('node-cron');
const logger = require('../helpers/logger');
const pino = logger.child('partner_data_polling_worker');
const env = require('../helpers/env.js');
const isProd = env.PRODUCTION;
const { DBConnection } = require('../helpers/db/connect');
const { JobAssign, PartnerLogTracker } = require('../model');
const { AssignStatus, UserTypes, PartnerTasks, PartnerLogTrackerStatus } = require('../helpers/constants');
const TaskTracker = require('../model/task_tracker');
const { TaskTrackerStatus } = require('../model/task_tracker');
const { generateTaskId, generateExecutionId } = require('../services/task_id_generator');

// Partner Log Tracker Status Constants (using centralized constants)
const TRACKER_STATUS = PartnerLogTrackerStatus;

// Partner queue name (auto-prefixed with 'dev_' in development by env.js)
const PARTNER_QUEUE = env.QUEUE_NAME_PARTNER;

// Initialize database connection
const workerDB = new DBConnection('Partner Data Polling Worker');

// Initialize task queue for enqueueing partner log processing tasks
const taskQHelper = require('../helpers/job_queue').getInstance();
taskQHelper.start();

// Register fatal handlers
const path = require('path');
const { registerFatalHandlers } = require('../helpers/process_fatal_handlers');
registerFatalHandlers(process, {
  env,
  debug: (msg) => pino.info(msg),
  kindPrefix: 'partner_data_polling_worker',
  reportFilePath: path.join(__dirname, 'partner_data_polling_worker.rlog'),
});

// Initialize the database connection
workerDB.initialize({ setupExitHandlers: false });
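// Tracker status lifecycle, as driven by this worker and the downstream
// processing task (summary of the code below, for orientation):
//
//   PENDING -> DOWNLOADING -> DOWNLOADED -(enqueuedAt set)-> PROCESSING -> PROCESSED
//   any of the above (on timeout/error) -> FAILED, retried after a cooldown
//
// FAILED tasks are retried until retryCount reaches env.PARTNER_MAX_RETRIES.
// Every transition is claimed atomically with findOneAndUpdate so concurrent
// worker instances cannot double-claim the same log.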
// Startup cleanup for stuck polling tasks
async function startupCleanup() {
  // Wait for database to be ready with retry logic
  let retries = 0;
  const maxRetries = 10;
  while (!workerDB.isReady() && retries < maxRetries) {
    pino.debug(`Database not ready for startup cleanup, waiting... (attempt ${retries + 1}/${maxRetries})`);
    await new Promise(resolve => setTimeout(resolve, 1000)); // Wait 1 second
    retries++;
  }

  if (!workerDB.isReady()) {
    pino.error('Database not ready after waiting, skipping startup cleanup');
    return;
  }

  try {
    const STUCK_TIMEOUT_MS = 10 * 60 * 1000; // 10 minutes
    const PROCESSING_TIMEOUT_MS = 30 * 60 * 1000; // 30 minutes for processing tasks
    const cutoffTime = new Date(Date.now() - STUCK_TIMEOUT_MS);
    const processingCutoffTime = new Date(Date.now() - PROCESSING_TIMEOUT_MS);

    // Find and reset stuck downloading tasks (pending -> downloading but never completed)
    const stuckDownloading = await PartnerLogTracker.find({
      status: TRACKER_STATUS.DOWNLOADING,
      updatedAt: { $lt: cutoffTime }
    });

    if (stuckDownloading.length > 0) {
      pino.info(`Found ${stuckDownloading.length} stuck downloading tasks on startup, resetting to failed`);
      await PartnerLogTracker.updateMany(
        { status: TRACKER_STATUS.DOWNLOADING, updatedAt: { $lt: cutoffTime } },
        {
          $set: {
            status: TRACKER_STATUS.FAILED,
            errorMessage: 'Download timeout - reset on worker startup',
            updatedAt: new Date()
          }
        }
      );
      pino.info(`Reset ${stuckDownloading.length} stuck downloading tasks to failed status`);
    }

    // Find and reset stuck downloaded tasks that were never enqueued
    const stuckDownloaded = await PartnerLogTracker.find({
      status: TRACKER_STATUS.DOWNLOADED,
      $or: [{ enqueuedAt: { $exists: false } }, { enqueuedAt: null }],
      updatedAt: { $lt: cutoffTime }
    });

    if (stuckDownloaded.length > 0) {
      pino.info(`Found ${stuckDownloaded.length} stuck downloaded tasks (never enqueued) on startup, resetting to failed`);
      await PartnerLogTracker.updateMany(
        {
          status: TRACKER_STATUS.DOWNLOADED,
          $or: [{ enqueuedAt: { $exists: false } }, { enqueuedAt: null }],
          updatedAt: { $lt: cutoffTime }
        },
        {
          $set: {
            status: TRACKER_STATUS.FAILED,
            errorMessage: 'Downloaded but never enqueued - reset on worker startup',
            updatedAt: new Date()
          }
        }
      );
      pino.info(`Reset ${stuckDownloaded.length} stuck downloaded tasks to failed status`);
    }

    // Find and reset stuck processing tasks (downloaded -> processing but never completed)
    const stuckProcessing = await PartnerLogTracker.find({
      status: TRACKER_STATUS.PROCESSING,
      $or: [
        { processingStartedAt: { $lt: processingCutoffTime } },
        { processingStartedAt: { $exists: false } } // Corrupted processing state
      ]
    });

    if (stuckProcessing.length > 0) {
      pino.info(`Found ${stuckProcessing.length} stuck processing tasks on startup, resetting to failed`);
      await PartnerLogTracker.updateMany(
        {
          status: TRACKER_STATUS.PROCESSING,
          $or: [
            { processingStartedAt: { $lt: processingCutoffTime } },
            { processingStartedAt: { $exists: false } }
          ]
        },
        {
          $set: {
            status: TRACKER_STATUS.FAILED,
            errorMessage: 'Processing timeout - reset on worker startup',
            updatedAt: new Date()
          },
          $inc: { retryCount: 1 } // Increment retry count for timeout failures
        }
      );
      pino.info(`Reset ${stuckProcessing.length} stuck processing tasks to failed status`);
    }

    // Find and reset old pending tasks that might be stuck (optional - helps with very old pending tasks)
    const oldPendingCutoff = new Date(Date.now() - 24 * 60 * 60 * 1000); // 24 hours
    const oldPending = await PartnerLogTracker.find({
      status: TRACKER_STATUS.PENDING,
      createdAt: { $lt: oldPendingCutoff },
      retryCount: { $gte: env.PARTNER_MAX_RETRIES }
    });

    if (oldPending.length > 0) {
      pino.info(`Found ${oldPending.length} old pending tasks with max retries exceeded, marking as failed`);
      await PartnerLogTracker.updateMany(
        {
          status: TRACKER_STATUS.PENDING,
          createdAt: { $lt: oldPendingCutoff },
          retryCount: { $gte: env.PARTNER_MAX_RETRIES }
        },
        {
          $set: {
            status: TRACKER_STATUS.FAILED,
            errorMessage: 'Max retries exceeded - marked failed on startup',
            updatedAt: new Date()
          }
        }
      );
      pino.info(`Marked ${oldPending.length} old pending tasks as failed`);
    }
  } catch (error) {
    pino.error({ err: error }, 'Error during polling worker startup cleanup');
  }
}
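// Note: pollPartnerData.status below is a simple in-process re-entrancy guard.
// The cron callback returns immediately while a previous polling pass is still
// in flight, so a slow pass is never overlapped by the next tick.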
// Poll partner systems for available data at regular intervals
const pollPartnerData = {
  schedule: isProd ? '*/15 * * * *' : '*/1 * * * *', // Every 15 minutes in prod, 1 minute in dev
  status: 0,
  name: 'partner_data_polling' // Direct polling worker, not a queued task
};

const pollPartnerDataTask = cron.schedule(pollPartnerData.schedule, async () => {
  // Only proceed when the worker is idle and the DB connection is ready
  if (!workerDB.isReady() || pollPartnerData.status) return;
  try {
    pollPartnerData.status = 1;
    pino.info('Starting partner data polling...');
    await pollAllPartnerSystems();
    pino.info('Partner data polling completed');
  } catch (error) {
    pino.error({ err: error }, 'Partner data polling worker error');
  } finally {
    pollPartnerData.status = 0;
  }
}, {
  scheduled: true,
  timezone: 'Etc/UTC',
  name: pollPartnerData.name,
  runOnInit: true
});

/**
 * Poll all partner systems for available data
 */
async function pollAllPartnerSystems() {
  // Get assignments with UPLOADED status that have partner integration
  const partnerAssignments = await JobAssign.populateWithPartnerInfo({ status: AssignStatus.UPLOADED }, true);

  // Filter for valid partner assignments:
  // - Must have user (aircraft/device) with partner info and partner
  // - Must have partnerAircraftId
  // - User must be DEVICE type (aircraft)
  const validPartnerAssignments = partnerAssignments.filter(a =>
    a.user &&
    a.user.partnerInfo &&
    a.user.partnerInfo.partner &&
    a.user.partnerInfo.partnerAircraftId &&
    a.user.kind === UserTypes.DEVICE
  );

  if (validPartnerAssignments.length === 0) {
    pino.debug('No uploaded partner assignments found');
    return;
  }

  // Group assignments by partner and customer (parent ID) for efficient polling
  const pollingGroups = {};
  for (const assignment of validPartnerAssignments) {
    const partnerCode = assignment.user.partnerInfo.partner.partnerCode;
    const partnerAircraftId = assignment.user.partnerInfo.partnerAircraftId;
    // Use parent ID as customer ID (applicator is parent of aircraft devices)
    const customerId = assignment.user.parent || assignment.user._id;
    const key = `${partnerCode}_${customerId}`;

    if (!pollingGroups[key]) {
      pollingGroups[key] = {
        partnerCode: partnerCode,
        customerId: customerId,
        aircraftIds: new Set(),
        assignments: []
      };
    }
    pollingGroups[key].aircraftIds.add(partnerAircraftId);
    pollingGroups[key].assignments.push({ ...assignment, partnerAircraftId });
  }

  pino.debug(`Found ${Object.keys(pollingGroups).length} partner polling groups for uploaded assignments`);

  // Process each polling group
  for (const [groupKey, group] of Object.entries(pollingGroups)) {
    try {
      await pollPartnerGroup(group);
    } catch (error) {
      pino.error({ err: error, groupKey }, `Error polling partner group ${groupKey}`);
    }
  }
}
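// For reference, each pollingGroups entry built above has this shape
// (values are hypothetical, for illustration only):
//
//   'PARTNER_X_605c...': {
//     partnerCode: 'PARTNER_X',
//     customerId: ObjectId('605c...'),
//     aircraftIds: Set { 'AC-001', 'AC-002' },
//     assignments: [/* uploaded JobAssign docs, each with partnerAircraftId */]
//   }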
/**
 * Poll a specific partner group for available data
 * @param {object} group - Partner group with aircraft and assignments
 */
async function pollPartnerGroup(group) {
  const partnerSyncService = require('../services/partner_sync_service');

  if (!partnerSyncService.isPartnerAvailable(group.partnerCode)) {
    pino.warn(`Partner service not available: ${group.partnerCode}`);
    return;
  }

  const partnerService = partnerSyncService.activeServices.get(group.partnerCode);
  if (!partnerService) {
    pino.error(`Partner service not found: ${group.partnerCode}`);
    return;
  }

  pino.info(`Polling ${group.partnerCode} for customer ${group.customerId}, aircraft: ${Array.from(group.aircraftIds).join(', ')}`);

  // Check each aircraft for available data
  for (const aircraftId of group.aircraftIds) {
    pino.info(`[LOOP] About to poll aircraft ${aircraftId}`);
    try {
      await pollAircraftData(partnerService, group, aircraftId);
      pino.info(`[LOOP] Completed polling aircraft ${aircraftId}`);
    } catch (error) {
      pino.error({ err: error, aircraftId }, `Error polling aircraft ${aircraftId}`);
    }
  }
}

/**
 * Periodic cleanup for stuck tasks during normal operation
 */
async function periodicCleanup() {
  if (!workerDB.isReady()) {
    return;
  }

  try {
    const CLEANUP_TIMEOUT_MS = 5 * 60 * 1000; // 5 minutes for periodic cleanup (shorter than startup)
    const PROCESSING_TIMEOUT_MS = 30 * 60 * 1000; // 30 minutes for processing tasks
    const cutoffTime = new Date(Date.now() - CLEANUP_TIMEOUT_MS);
    const processingCutoffTime = new Date(Date.now() - PROCESSING_TIMEOUT_MS);

    // Find and reset stuck downloading tasks
    const stuckDownloading = await PartnerLogTracker.find({
      status: TRACKER_STATUS.DOWNLOADING,
      updatedAt: { $lt: cutoffTime }
    });

    if (stuckDownloading.length > 0) {
      pino.info(`Periodic cleanup: Found ${stuckDownloading.length} stuck downloading tasks, resetting to failed`);
      await PartnerLogTracker.updateMany(
        { status: TRACKER_STATUS.DOWNLOADING, updatedAt: { $lt: cutoffTime } },
        {
          $set: {
            status: TRACKER_STATUS.FAILED,
            errorMessage: 'Download timeout - reset by periodic cleanup',
            updatedAt: new Date()
          }
        }
      );
    }

    // Find and reset stuck downloaded tasks that were never enqueued
    const stuckDownloaded = await PartnerLogTracker.find({
      status: TRACKER_STATUS.DOWNLOADED,
      $or: [{ enqueuedAt: { $exists: false } }, { enqueuedAt: null }],
      updatedAt: { $lt: cutoffTime }
    });

    if (stuckDownloaded.length > 0) {
      pino.info(`Periodic cleanup: Found ${stuckDownloaded.length} stuck downloaded tasks, resetting to failed`);
      await PartnerLogTracker.updateMany(
        {
          status: TRACKER_STATUS.DOWNLOADED,
          $or: [{ enqueuedAt: { $exists: false } }, { enqueuedAt: null }],
          updatedAt: { $lt: cutoffTime }
        },
        {
          $set: {
            status: TRACKER_STATUS.FAILED,
            errorMessage: 'Downloaded but never enqueued - reset by periodic cleanup',
            updatedAt: new Date()
          }
        }
      );
    }

    // Find and reset stuck processing tasks (longer timeout since processing takes time)
    const stuckProcessing = await PartnerLogTracker.find({
      status: TRACKER_STATUS.PROCESSING,
      $or: [
        { processingStartedAt: { $lt: processingCutoffTime } },
        { processingStartedAt: { $exists: false } } // Corrupted processing state
      ]
    });

    if (stuckProcessing.length > 0) {
      pino.info(`Periodic cleanup: Found ${stuckProcessing.length} stuck processing tasks, resetting to failed`);
      await PartnerLogTracker.updateMany(
        {
          status: TRACKER_STATUS.PROCESSING,
          $or: [
            { processingStartedAt: { $lt: processingCutoffTime } },
            { processingStartedAt: { $exists: false } }
          ]
        },
        {
          $set: {
            status: TRACKER_STATUS.FAILED,
            errorMessage: 'Processing timeout - reset by periodic cleanup',
            updatedAt: new Date()
          },
          $inc: { retryCount: 1 } // Increment retry count for timeout failures
        }
      );
    }

    // Find tasks in unknown states (not in expected status values)
    const unknownStatusTasks = await PartnerLogTracker.find({
      status: {
        $nin: [
          TRACKER_STATUS.PENDING,
          TRACKER_STATUS.DOWNLOADING,
          TRACKER_STATUS.DOWNLOADED,
          TRACKER_STATUS.PROCESSING,
          TRACKER_STATUS.PROCESSED,
          TRACKER_STATUS.FAILED
        ]
      },
      processed: { $ne: true } // Don't touch successfully processed tasks
    });

    if (unknownStatusTasks.length > 0) {
      pino.warn(`Periodic cleanup: Found ${unknownStatusTasks.length} tasks with unknown status, resetting to failed`);
      await PartnerLogTracker.updateMany(
        {
          status: {
            $nin: [
              TRACKER_STATUS.PENDING,
              TRACKER_STATUS.DOWNLOADING,
              TRACKER_STATUS.DOWNLOADED,
              TRACKER_STATUS.PROCESSING,
              TRACKER_STATUS.PROCESSED,
              TRACKER_STATUS.FAILED
            ]
          },
          processed: { $ne: true }
        },
        {
          $set: {
            status: TRACKER_STATUS.FAILED,
            errorMessage: 'Unknown status - reset by periodic cleanup',
            updatedAt: new Date()
          }
        }
      );
    }
  } catch (error) {
    pino.error({ err: error }, 'Error during periodic cleanup');
  }
}
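// Per-log decision matrix applied inside pollAircraftData below
// (tracker status x local file presence -> action taken):
//
//   PENDING    + file missing      -> claim, download, then enqueue
//   PENDING    + file present      -> claim, enqueue only
//   DOWNLOADED + never enqueued    -> re-enqueue after a 5-minute cooldown
//   FAILED     + retries remaining -> retry after a 5-minute cooldown
//   FAILED     + max retries hit   -> skip (avoids DLQ pollution)
//   DOWNLOADING / PROCESSING / PROCESSED -> skip, another actor owns it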
/**
 * Poll specific aircraft for available data
 * @param {object} partnerService - Partner service instance
 * @param {object} group - Partner group info
 * @param {string} aircraftId - Aircraft ID to poll
 */
async function pollAircraftData(partnerService, group, aircraftId) {
  pino.info(`[ENTRY] pollAircraftData called for aircraft ${aircraftId}`);
  const fs = require('fs').promises;

  // Run periodic cleanup to handle any stuck tasks
  await periodicCleanup();

  try {
    // Get available logs/data for this aircraft
    const availableLogs = await partnerService.getAircraftLogs(group.customerId, aircraftId);
    if (!availableLogs || availableLogs.length === 0) {
      pino.info(`No logs available for aircraft ${aircraftId}`);
      return;
    }

    pino.info(`Found ${availableLogs.length} logs for aircraft ${aircraftId}`);

    // Check which logs are new (not processed yet)
    const newLogs = await filterNewLogs(availableLogs, aircraftId, group.partnerCode);
    pino.info(`[DEBUG] After filtering: ${newLogs.length} new logs for aircraft ${aircraftId}`);
    if (newLogs.length === 0) {
      pino.info(`No new logs for aircraft ${aircraftId}`);
      return;
    }

    pino.info(`Found ${newLogs.length} new logs for aircraft ${aircraftId}`);

    // Get storage path from partner service (centralized path management)
    const storagePath = partnerService.getStoragePath();

    // Ensure storage directory exists. TODO: move this logic when starting the server if needed
    try {
      await fs.mkdir(storagePath, { recursive: true });
    } catch (mkdirError) {
      pino.error({ err: mkdirError, storagePath }, `Failed to create storage directory: ${storagePath}`);
      throw mkdirError;
    }

    // Download and store each log file before enqueueing
    for (const logInfo of newLogs) {
      let localFilePath = null; // Full path for file operations
      try {
        // Create or update tracker record atomically to prevent race conditions
        const filter = {
          logId: logInfo.id,
          partnerCode: group.partnerCode,
          aircraftId: aircraftId
        };
        const update = {
          $setOnInsert: {
            logId: logInfo.id,
            partnerCode: group.partnerCode,
            aircraftId: aircraftId,
            customerId: group.customerId,
            logFileName: logInfo.logFileName,
            uploadedDate: logInfo.uploadedDate, // Stored as-is (string), timezone unknown from partner API
            status: TRACKER_STATUS.PENDING,
            processed: false,
            processedAt: null,
            retryCount: 0,
            enqueuedAt: null,
            createdAt: new Date(),
            updatedAt: new Date()
          }
        };

        // Try to atomically create or get existing tracker
        let tracker = await PartnerLogTracker.findOneAndUpdate(
          filter,
          update,
          { upsert: true, new: true, setDefaultsOnInsert: true }
        );

        let shouldDownloadAndEnqueue = false;
        let shouldOnlyEnqueue = false;
        let fileAlreadyExists = false;

        // Check if file already exists locally
        const expectedLocalPath = partnerService.resolveLogFilePath(logInfo.logFileName);
        try {
          await fs.access(expectedLocalPath);
          fileAlreadyExists = true;
          localFilePath = expectedLocalPath;
        } catch (accessError) {
          // File doesn't exist, will need to download
          fileAlreadyExists = false;
        }

        // Determine action based on tracker status and file existence
        if (tracker.status === TRACKER_STATUS.PENDING) {
          // New log or failed previous attempt
          if (fileAlreadyExists) {
            // File exists but tracker is pending - probably from previous run
            // Try to atomically claim for enqueueing
            const claimedTracker = await PartnerLogTracker.findOneAndUpdate(
              { ...filter, status: TRACKER_STATUS.PENDING },
              {
                $set: {
                  status: TRACKER_STATUS.DOWNLOADED,
                  savedLocalFile: logInfo.logFileName, // Store filename only (path agnostic)
                  updatedAt: new Date()
                }
              },
              { new: true }
            );
            if (claimedTracker) {
              shouldOnlyEnqueue = true;
              pino.debug(`Claimed existing file for enqueueing: ${logInfo.logFileName}`);
            } else {
              pino.debug(`File exists but tracker was claimed by another instance: ${logInfo.logFileName}`);
            }
          } else {
            // File doesn't exist - try to claim for downloading
            const claimedTracker = await PartnerLogTracker.findOneAndUpdate(
              { ...filter, status: TRACKER_STATUS.PENDING },
              { $set: { status: TRACKER_STATUS.DOWNLOADING, updatedAt: new Date() } },
              { new: true }
            );
            if (claimedTracker) {
              shouldDownloadAndEnqueue = true;
              pino.debug(`Claimed for downloading: ${logInfo.logFileName}`);
            } else {
              pino.debug(`Log was claimed by another instance for downloading: ${logInfo.logFileName}`);
            }
          }
        } else if (tracker.status === TRACKER_STATUS.DOWNLOADED && !tracker.enqueuedAt) {
          // File was downloaded but not yet enqueued (stuck state)
          const fiveMinutesAgo = new Date(Date.now() - 5 * 60 * 1000); // Reduced from 1 hour to 5 minutes
          if (tracker.updatedAt <= fiveMinutesAgo) {
            // Try to claim stuck task for enqueueing
            // (enqueuedAt: null matches both missing and null-valued fields, which
            // covers trackers created above with enqueuedAt: null)
            const claimedTracker = await PartnerLogTracker.findOneAndUpdate(
              { ...filter, status: TRACKER_STATUS.DOWNLOADED, enqueuedAt: null },
              { $set: { updatedAt: new Date() }, $inc: { retryCount: 1 } },
              { new: true }
            );
            if (claimedTracker) {
              shouldOnlyEnqueue = true;
              // Reconstruct full path using service method
              localFilePath = partnerService.resolveLogFilePath(claimedTracker.savedLocalFile || logInfo.logFileName);
              pino.debug(`Re-enqueueing stuck downloaded task: ${logInfo.logFileName} (retry ${claimedTracker.retryCount})`);
            }
          }
        } else if (tracker.status === TRACKER_STATUS.FAILED) {
          // Previous attempt failed - check if we should retry
          const fiveMinutesAgo = new Date(Date.now() - 5 * 60 * 1000); // Reduced from 1 hour to 5 minutes
          if (tracker.retryCount < env.PARTNER_MAX_RETRIES && tracker.updatedAt <= fiveMinutesAgo) {
            // Try to claim for retry
            const claimedTracker = await PartnerLogTracker.findOneAndUpdate(
              { ...filter, status: TRACKER_STATUS.FAILED, retryCount: { $lt: env.PARTNER_MAX_RETRIES } },
              {
                $set: {
                  status: fileAlreadyExists ? TRACKER_STATUS.DOWNLOADED : TRACKER_STATUS.DOWNLOADING,
                  updatedAt: new Date()
                },
                $inc: { retryCount: 1 }
              },
              { new: true }
            );
            if (claimedTracker) {
              shouldDownloadAndEnqueue = !fileAlreadyExists;
              shouldOnlyEnqueue = fileAlreadyExists;
              pino.debug(`Retrying failed task: ${logInfo.logFileName} (retry ${claimedTracker.retryCount})`);
            }
          } else if (tracker.retryCount >= env.PARTNER_MAX_RETRIES) {
            // Task has exceeded max retries, don't re-enqueue to avoid DLQ pollution
            pino.warn({
              logFileName: logInfo.logFileName,
              retryCount: tracker.retryCount,
              maxRetries: env.PARTNER_MAX_RETRIES,
              lastError: tracker.errorMessage,
              failedAt: tracker.updatedAt
            }, `Task ${logInfo.logFileName} has exceeded max retries, skipping`);
          } else {
            // Task failed recently, wait for cooldown period
            pino.debug(`Task ${logInfo.logFileName} failed recently, waiting for cooldown period`);
          }
        } else {
          // Log is in downloading, processing, or processed state - skip
          pino.debug(`Log ${logInfo.logFileName} is in ${tracker.status} state - skipping`);
        }
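        // Failure semantics for the block below: a failed download unlinks any
        // partial file and marks the tracker FAILED (retryable); a failed
        // enqueue keeps the downloaded file on disk and only flips the tracker
        // to FAILED, so a later pass can re-enqueue without re-downloading.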
        // Download file and enqueue if needed, or just enqueue if file exists
        if (shouldDownloadAndEnqueue || shouldOnlyEnqueue) {
          try {
            let downloadedPath = localFilePath;

            if (shouldDownloadAndEnqueue) {
              // Download the file first
              try {
                pino.debug(`Downloading log file: ${logInfo.logFileName}`);
                downloadedPath = partnerService.resolveLogFilePath(logInfo.logFileName);

                // Download file from partner system using correct method
                const logData = await partnerService.getAircraftLogData(group.customerId, logInfo.id);
                if (!logData || !logData.logFile) {
                  throw new Error(`No log data received for ${logInfo.logFileName}`);
                }

                // Save file to local storage - getAircraftLogData returns logFile as Buffer
                const fileContent = Buffer.isBuffer(logData.logFile) ? logData.logFile : Buffer.from(logData.logFile, 'base64');
                await fs.writeFile(downloadedPath, fileContent);

                // Atomically update to downloaded status
                const updatedTracker = await PartnerLogTracker.findOneAndUpdate(
                  filter,
                  {
                    $set: {
                      status: TRACKER_STATUS.DOWNLOADED,
                      savedLocalFile: logInfo.logFileName, // Store filename only (path agnostic)
                      updatedAt: new Date()
                    }
                  },
                  { new: true }
                );
                if (!updatedTracker) {
                  throw new Error(`Failed to update tracker status to downloaded for ${logInfo.logFileName}`);
                }

                pino.info(`Downloaded and stored log file: ${logInfo.logFileName} -> ${downloadedPath}`);
              } catch (downloadError) {
                // Clean up partial file if download failed
                if (downloadedPath) {
                  try {
                    await fs.unlink(downloadedPath);
                    pino.debug(`Cleaned up partial file: ${downloadedPath}`);
                  } catch (cleanupError) {
                    pino.warn({ err: cleanupError }, `Failed to clean up partial file: ${downloadedPath}`);
                  }
                }

                // Mark as failed atomically
                await PartnerLogTracker.findOneAndUpdate(
                  filter,
                  {
                    $set: {
                      status: TRACKER_STATUS.FAILED,
                      updatedAt: new Date(),
                      errorMessage: downloadError.message
                    }
                  },
                  { new: true }
                );
                pino.error({ err: downloadError, logFileName: logInfo.logFileName }, `Failed to download log file: ${logInfo.logFileName}`);
                continue; // Skip to next log
              }
            } else if (shouldOnlyEnqueue) {
              pino.debug(`Using existing local file for ${logInfo.logFileName}: ${downloadedPath}`);
            }

            // Enqueue for processing
            try {
              // Generate TaskTracker IDs
              const taskId = generateTaskId(PARTNER_QUEUE, {
                partnerCode: group.partnerCode,
                aircraftId: aircraftId,
                logId: logInfo.id
              });
              const executionId = generateExecutionId();

              // Deduplication check - prevent duplicate enqueues
              const recentTask = await TaskTracker.findOne({
                taskId,
                status: { $in: [TaskTrackerStatus.QUEUED, TaskTrackerStatus.PROCESSING] },
                enqueuedAt: { $gt: new Date(Date.now() - 5 * 60000) } // Last 5 minutes
              }).lean();

              if (recentTask) {
                pino.debug({
                  taskId,
                  existingExecutionId: recentTask.executionId,
                  logFileName: logInfo.logFileName
                }, 'Task already queued/processing, skipping duplicate');
                continue; // Skip enqueue
              }

              // Create TaskTracker entry (parallel tracking)
              await TaskTracker.create({
                taskId,
                executionId,
                queueName: PARTNER_QUEUE,
                status: TaskTrackerStatus.QUEUED,
                metadata: {
                  partnerCode: group.partnerCode,
                  aircraftId: aircraftId,
                  logId: logInfo.id,
                  customerId: group.customerId,
                  logFileName: logInfo.logFileName,
                  localFilePath: downloadedPath
                }
              });

              // Enqueue processing task with minimal essential fields only
              // - localFilePath: resolved from tracker.savedLocalFile or logFileName when processing
              // - assignments: queried from JobAssign collection when processing
              // - uploadedDate: retrieved from tracker record when processing
              await taskQHelper.addTaskASync(PartnerTasks.PROCESS_PARTNER_LOG, {
                // Essential identification fields (used for tracker lookup and file resolution)
                customerId: group.customerId,
                partnerCode: group.partnerCode,
                aircraftId: aircraftId,
                logId: logInfo.id,
                logFileName: logInfo.logFileName,
                // TaskTracker IDs for idempotency
                taskId,
                executionId
              });

              // Mark as enqueued atomically
              await PartnerLogTracker.findOneAndUpdate(
                filter,
                { $set: { enqueuedAt: new Date(), updatedAt: new Date() } },
                { new: true }
              );

              pino.info(shouldDownloadAndEnqueue
                ? `Downloaded and queued log processing task for ${logInfo.logFileName}`
                : `Re-queued existing log file for processing: ${logInfo.logFileName}`);
            } catch (enqueueError) {
              // File is already downloaded successfully, don't delete it
              // Mark as failed to enqueue but keep downloaded status
              await PartnerLogTracker.findOneAndUpdate(
                filter,
                {
                  $set: {
                    status: TRACKER_STATUS.FAILED,
                    errorMessage: `Enqueue failed: ${enqueueError.message}`,
                    updatedAt: new Date()
                  }
                },
                { new: true }
              );
              pino.error({ err: enqueueError, logFileName: logInfo.logFileName }, `Failed to enqueue log file (file preserved): ${logInfo.logFileName}`);
            }
          } catch (generalError) {
            // Mark as failed
            await PartnerLogTracker.findOneAndUpdate(
              filter,
              {
                $set: {
                  status: TRACKER_STATUS.FAILED,
                  updatedAt: new Date(),
                  errorMessage: generalError.message
                }
              },
              { new: true }
            );
            pino.error({ err: generalError, logFileName: logInfo.logFileName }, `Unexpected error processing log file: ${logInfo.id}-${logInfo.logFileName}`);
          }
        }
      } catch (generalError) {
        pino.error({ err: generalError, logFileName: logInfo.logFileName }, `Unexpected error processing log file: ${logInfo.id}-${logInfo.logFileName}`);
      }
    }
  } catch (error) {
    pino.error({ err: error, aircraftId }, `Error polling aircraft data for ${aircraftId}`);
  }
}

/**
 * Filter logs down to the ones not yet successfully processed; in-flight
 * duplicates are handled by the enqueueing logic, not here
 * @param {array} logs - Available logs from partner system
 * @param {string} aircraftId - Aircraft ID
 * @param {string} partnerCode - Partner code
 * @returns {array} New logs to process
 */
async function filterNewLogs(logs, aircraftId, partnerCode) {
  pino.info(`[ENTRY] filterNewLogs called: ${logs.length} logs for aircraft ${aircraftId}`);

  // Get only completed logs for this aircraft (let enqueueing logic handle duplicates)
  const completedLogs = await PartnerLogTracker.find({
    aircraftId: aircraftId,
    partnerCode: partnerCode,
    processed: true
  }, 'logId logFileName').lean();

  const completedLogIds = new Set(completedLogs.map(log => log.logId));
  const completedLogNames = new Set(completedLogs.map(log => log.logFileName));

  // Filter out only completed logs - let enqueueing handle other duplicates
  const newLogs = logs.filter(log => {
    return !completedLogIds.has(log.id) && !completedLogNames.has(log.logFileName);
  });

  // Log some debug info
  if (completedLogs.length > 0) {
    pino.debug(`Found ${completedLogs.length} completed logs for aircraft ${aircraftId}`);
  }

  return newLogs;
}

// Start the polling worker if not in test mode
pino.info(`NODE_ENV: ${env.NODE_ENV}, Starting worker: ${env.NODE_ENV !== 'test'}`);
pino.info(`LOG_LEVEL: ${process.env.LOG_LEVEL}, LOG_MODULES: ${process.env.LOG_MODULES}`);
if (env.NODE_ENV !== 'test') {
  // Run startup cleanup after a short delay to ensure DB is connected
  setTimeout(async () => {
    pino.info('Running startup cleanup for polling worker...');
    try {
      await startupCleanup();
    } catch (err) {
      pino.error({ err }, 'Startup cleanup failed');
    }
  }, 2000); // 2 second delay

  pollPartnerDataTask.start();
  pino.info('Partner data polling worker started');
} else {
  pino.info('Partner data polling worker NOT started (NODE_ENV is test)');
}

module.exports = {
  pollPartnerDataTask,
  pollPartnerData,
  pollAllPartnerSystems,
  pollPartnerGroup,
  pollAircraftData,
  startupCleanup,
  periodicCleanup
};
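// Minimal sketch of exercising the exported helpers from a test (hypothetical
// harness; the cron task only auto-starts when NODE_ENV is not 'test'):
//
//   const worker = require('./partner_data_polling_worker');
//   await worker.startupCleanup();        // reset stuck tracker rows
//   await worker.pollAllPartnerSystems(); // run one manual polling pass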