// agmission/Development/server/workers/partner_data_polling_worker.js
'use strict';
/**
 * Processing flow of the Partner Data Polling Worker:
 * ┌─────────────────┐    ┌──────────────────┐    ┌─────────────────┐    ┌──────────────────┐
 * │    Cron Job     │───▶│  Poll Partners   │───▶│  Download Logs  │───▶│  Enqueue Tasks   │
 * │  (15min/1min)   │    │   for New Data   │    │     Locally     │    │  for Processing  │
 * └─────────────────┘    └──────────────────┘    └─────────────────┘    └──────────────────┘
 */
const cron = require('node-cron'),
  logger = require('../helpers/logger'),
  pino = logger.child('partner_data_polling_worker'),
  env = require('../helpers/env.js'),
  isProd = env.PRODUCTION,
  { DBConnection } = require('../helpers/db/connect'),
  { JobAssign, PartnerLogTracker } = require('../model'),
  { AssignStatus, UserTypes, PartnerTasks, PartnerLogTrackerStatus } = require('../helpers/constants'),
  TaskTracker = require('../model/task_tracker'),
  { TaskTrackerStatus } = require('../model/task_tracker'),
  { generateTaskId, generateExecutionId } = require('../services/task_id_generator');
// Partner Log Tracker Status Constants (using centralized constants)
const TRACKER_STATUS = PartnerLogTrackerStatus;
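// Tracker status lifecycle as driven by this worker (a sketch inferred from
// the transitions below; the processing -> processed step is performed by the
// downstream queue consumer, not in this file):
//
//   pending -> downloading -> downloaded -> processing -> processed
//
// Any stage may drop to `failed`; failed entries are retried until
// `retryCount` reaches env.PARTNER_MAX_RETRIES, after which they are skipped.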
// Partner queue name (auto-prefixed with 'dev_' in development by env.js)
const PARTNER_QUEUE = env.QUEUE_NAME_PARTNER;
// Initialize database connection
const workerDB = new DBConnection('Partner Data Polling Worker');
// Initialize task queue for enqueueing partner log processing tasks
const taskQHelper = require('../helpers/job_queue').getInstance();
taskQHelper.start();
// Register fatal handlers
const path = require('path');
const { registerFatalHandlers } = require('../helpers/process_fatal_handlers');
registerFatalHandlers(process, {
  env,
  debug: (msg) => pino.info(msg),
  kindPrefix: 'partner_data_polling_worker',
  reportFilePath: path.join(__dirname, 'partner_data_polling_worker.rlog'),
});
// Initialize the database connection
workerDB.initialize({ setupExitHandlers: false });
// Startup cleanup for stuck polling tasks
async function startupCleanup() {
  // Wait for the database to be ready, with retry logic
  let retries = 0;
  const maxRetries = 10;
  while (!workerDB.isReady() && retries < maxRetries) {
    pino.debug(`Database not ready for startup cleanup, waiting... (attempt ${retries + 1}/${maxRetries})`);
    await new Promise(resolve => setTimeout(resolve, 1000)); // Wait 1 second
    retries++;
  }
  if (!workerDB.isReady()) {
    pino.error('Database not ready after waiting, skipping startup cleanup');
    return;
  }
  try {
    const STUCK_TIMEOUT_MS = 10 * 60 * 1000; // 10 minutes
    const PROCESSING_TIMEOUT_MS = 30 * 60 * 1000; // 30 minutes for processing tasks
    const cutoffTime = new Date(Date.now() - STUCK_TIMEOUT_MS);
    const processingCutoffTime = new Date(Date.now() - PROCESSING_TIMEOUT_MS);
    // Find and reset stuck downloading tasks (pending -> downloading but never completed)
    const stuckDownloading = await PartnerLogTracker.find({
      status: TRACKER_STATUS.DOWNLOADING,
      updatedAt: { $lt: cutoffTime }
    });
    if (stuckDownloading.length > 0) {
      pino.info(`Found ${stuckDownloading.length} stuck downloading tasks on startup, resetting to failed`);
      await PartnerLogTracker.updateMany(
        {
          status: TRACKER_STATUS.DOWNLOADING,
          updatedAt: { $lt: cutoffTime }
        },
        {
          $set: {
            status: TRACKER_STATUS.FAILED,
            errorMessage: 'Download timeout - reset on worker startup',
            updatedAt: new Date()
          }
        }
      );
      pino.info(`Reset ${stuckDownloading.length} stuck downloading tasks to failed status`);
    }
    // Find and reset stuck downloaded tasks that were never enqueued
    const stuckDownloaded = await PartnerLogTracker.find({
      status: TRACKER_STATUS.DOWNLOADED,
      $or: [
        { enqueuedAt: { $exists: false } },
        { enqueuedAt: null }
      ],
      updatedAt: { $lt: cutoffTime }
    });
    if (stuckDownloaded.length > 0) {
      pino.info(`Found ${stuckDownloaded.length} stuck downloaded tasks (never enqueued) on startup, resetting to failed`);
      await PartnerLogTracker.updateMany(
        {
          status: TRACKER_STATUS.DOWNLOADED,
          $or: [
            { enqueuedAt: { $exists: false } },
            { enqueuedAt: null }
          ],
          updatedAt: { $lt: cutoffTime }
        },
        {
          $set: {
            status: TRACKER_STATUS.FAILED,
            errorMessage: 'Downloaded but never enqueued - reset on worker startup',
            updatedAt: new Date()
          }
        }
      );
      pino.info(`Reset ${stuckDownloaded.length} stuck downloaded tasks to failed status`);
    }
    // Find and reset stuck processing tasks (downloaded -> processing but never completed)
    const stuckProcessing = await PartnerLogTracker.find({
      status: TRACKER_STATUS.PROCESSING,
      $or: [
        { processingStartedAt: { $lt: processingCutoffTime } },
        { processingStartedAt: { $exists: false } } // Corrupted processing state
      ]
    });
    if (stuckProcessing.length > 0) {
      pino.info(`Found ${stuckProcessing.length} stuck processing tasks on startup, resetting to failed`);
      await PartnerLogTracker.updateMany(
        {
          status: TRACKER_STATUS.PROCESSING,
          $or: [
            { processingStartedAt: { $lt: processingCutoffTime } },
            { processingStartedAt: { $exists: false } }
          ]
        },
        {
          $set: {
            status: TRACKER_STATUS.FAILED,
            errorMessage: 'Processing timeout - reset on worker startup',
            updatedAt: new Date()
          },
          $inc: { retryCount: 1 } // Increment retry count for timeout failures
        }
      );
      pino.info(`Reset ${stuckProcessing.length} stuck processing tasks to failed status`);
    }
    // Mark very old pending tasks that have exhausted their retries as failed
    const oldPendingCutoff = new Date(Date.now() - 24 * 60 * 60 * 1000); // 24 hours
    const oldPending = await PartnerLogTracker.find({
      status: TRACKER_STATUS.PENDING,
      createdAt: { $lt: oldPendingCutoff },
      retryCount: { $gte: env.PARTNER_MAX_RETRIES }
    });
    if (oldPending.length > 0) {
      pino.info(`Found ${oldPending.length} old pending tasks with max retries exceeded, marking as failed`);
      await PartnerLogTracker.updateMany(
        {
          status: TRACKER_STATUS.PENDING,
          createdAt: { $lt: oldPendingCutoff },
          retryCount: { $gte: env.PARTNER_MAX_RETRIES }
        },
        {
          $set: {
            status: TRACKER_STATUS.FAILED,
            errorMessage: 'Max retries exceeded - marked failed on startup',
            updatedAt: new Date()
          }
        }
      );
      pino.info(`Marked ${oldPending.length} old pending tasks as failed`);
    }
  } catch (error) {
    pino.error({ err: error }, 'Error during polling worker startup cleanup');
  }
}
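// Cleanup thresholds used above and in periodicCleanup() below, collected here
// for quick reference (all values are hard-coded in this file):
//
//   downloading stuck         10 min (startup) /  5 min (periodic)
//   downloaded, not enqueued  10 min (startup) /  5 min (periodic)
//   processing stuck          30 min (both)
//   pending at max retries    24 h   (startup only)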
// Poll partner systems for available data at regular intervals
const pollPartnerData = {
  schedule: isProd ? '*/15 * * * *' : '*/1 * * * *', // Every 15 minutes in prod, every minute in dev
  status: 0,
  name: 'partner_data_polling' // Direct polling worker, not a queued task
};
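// For reference, the five cron fields are minute / hour / day-of-month /
// month / day-of-week, so '*/15 * * * *' fires at minutes 0, 15, 30 and 45 of
// every hour, and '*/1 * * * *' fires once a minute.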
const pollPartnerDataTask = cron.schedule(pollPartnerData.schedule, async () => {
  // Only proceed when the previous run has finished and the DB connection is ready
  if (!workerDB.isReady() || pollPartnerData.status)
    return;
  try {
    pollPartnerData.status = 1;
    pino.info('Starting partner data polling...');
    await pollAllPartnerSystems();
    pino.info('Partner data polling completed');
  } catch (error) {
    pino.error({ err: error }, 'Partner data polling worker error');
  } finally {
    pollPartnerData.status = 0;
  }
}, {
  scheduled: false, // started explicitly below so the test-mode guard is honored
  timezone: 'Etc/UTC',
  name: pollPartnerData.name,
  runOnInit: true
});
/**
 * Poll all partner systems for available data
 */
async function pollAllPartnerSystems() {
  // Get assignments with UPLOADED status that have partner integration
  const partnerAssignments = await JobAssign.populateWithPartnerInfo({ status: AssignStatus.UPLOADED }, true);
  // Filter for valid partner assignments:
  // - Must have a user (aircraft/device) with partner info and a partner
  // - Must have a partnerAircraftId
  // - User must be DEVICE type (aircraft)
  const validPartnerAssignments = partnerAssignments.filter(a =>
    a.user &&
    a.user.partnerInfo &&
    a.user.partnerInfo.partner &&
    a.user.partnerInfo.partnerAircraftId &&
    a.user.kind === UserTypes.DEVICE
  );
  if (validPartnerAssignments.length === 0) {
    pino.debug('No uploaded partner assignments found');
    return;
  }
  // Group assignments by partner and customer (parent ID) for efficient polling
  const pollingGroups = {};
  for (const assignment of validPartnerAssignments) {
    const partnerCode = assignment.user.partnerInfo.partner.partnerCode;
    const partnerAircraftId = assignment.user.partnerInfo.partnerAircraftId;
    // Use the parent ID as the customer ID (the applicator is the parent of aircraft devices)
    const customerId = assignment.user.parent || assignment.user._id;
    const key = `${partnerCode}_${customerId}`;
    if (!pollingGroups[key]) {
      pollingGroups[key] = {
        partnerCode: partnerCode,
        customerId: customerId,
        aircraftIds: new Set(),
        assignments: []
      };
    }
    pollingGroups[key].aircraftIds.add(partnerAircraftId);
    pollingGroups[key].assignments.push({ ...assignment, partnerAircraftId });
  }
  pino.debug(`Found ${Object.keys(pollingGroups).length} partner polling groups for uploaded assignments`);
  // Process each polling group
  for (const [groupKey, group] of Object.entries(pollingGroups)) {
    try {
      await pollPartnerGroup(group);
    } catch (error) {
      pino.error({ err: error, groupKey }, `Error polling partner group ${groupKey}`);
    }
  }
}
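// Shape of a single polling group built above (illustrative values only; the
// partner code, customer ID, and aircraft ID are hypothetical):
//
//   pollingGroups['AG-PARTNER_64f0c2...'] = {
//     partnerCode: 'AG-PARTNER',
//     customerId: ObjectId('64f0c2...'),       // applicator (parent of the device)
//     aircraftIds: Set { 'N123AB' },           // partner-side aircraft IDs
//     assignments: [ /* JobAssign docs spread with partnerAircraftId */ ]
//   };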
/**
 * Poll a specific partner group for available data
 * @param {object} group - Partner group with aircraft and assignments
 */
async function pollPartnerGroup(group) {
  const partnerSyncService = require('../services/partner_sync_service');
  if (!partnerSyncService.isPartnerAvailable(group.partnerCode)) {
    pino.warn(`Partner service not available: ${group.partnerCode}`);
    return;
  }
  const partnerService = partnerSyncService.activeServices.get(group.partnerCode);
  if (!partnerService) {
    pino.error(`Partner service not found: ${group.partnerCode}`);
    return;
  }
  pino.info(`Polling ${group.partnerCode} for customer ${group.customerId}, aircraft: ${Array.from(group.aircraftIds).join(', ')}`);
  // Check each aircraft for available data
  for (const aircraftId of group.aircraftIds) {
    pino.debug(`About to poll aircraft ${aircraftId}`);
    try {
      await pollAircraftData(partnerService, group, aircraftId);
      pino.debug(`Completed polling aircraft ${aircraftId}`);
    } catch (error) {
      pino.error({ err: error, aircraftId }, `Error polling aircraft ${aircraftId}`);
    }
  }
}
/**
 * Periodic cleanup for stuck tasks during normal operation
 */
async function periodicCleanup() {
  if (!workerDB.isReady()) {
    return;
  }
  try {
    const CLEANUP_TIMEOUT_MS = 5 * 60 * 1000; // 5 minutes for periodic cleanup (shorter than startup)
    const PROCESSING_TIMEOUT_MS = 30 * 60 * 1000; // 30 minutes for processing tasks
    const cutoffTime = new Date(Date.now() - CLEANUP_TIMEOUT_MS);
    const processingCutoffTime = new Date(Date.now() - PROCESSING_TIMEOUT_MS);
    // Find and reset stuck downloading tasks
    const stuckDownloading = await PartnerLogTracker.find({
      status: TRACKER_STATUS.DOWNLOADING,
      updatedAt: { $lt: cutoffTime }
    });
    if (stuckDownloading.length > 0) {
      pino.info(`Periodic cleanup: Found ${stuckDownloading.length} stuck downloading tasks, resetting to failed`);
      await PartnerLogTracker.updateMany(
        {
          status: TRACKER_STATUS.DOWNLOADING,
          updatedAt: { $lt: cutoffTime }
        },
        {
          $set: {
            status: TRACKER_STATUS.FAILED,
            errorMessage: 'Download timeout - reset by periodic cleanup',
            updatedAt: new Date()
          }
        }
      );
    }
    // Find and reset stuck downloaded tasks that were never enqueued
    const stuckDownloaded = await PartnerLogTracker.find({
      status: TRACKER_STATUS.DOWNLOADED,
      $or: [
        { enqueuedAt: { $exists: false } },
        { enqueuedAt: null }
      ],
      updatedAt: { $lt: cutoffTime }
    });
    if (stuckDownloaded.length > 0) {
      pino.info(`Periodic cleanup: Found ${stuckDownloaded.length} stuck downloaded tasks, resetting to failed`);
      await PartnerLogTracker.updateMany(
        {
          status: TRACKER_STATUS.DOWNLOADED,
          $or: [
            { enqueuedAt: { $exists: false } },
            { enqueuedAt: null }
          ],
          updatedAt: { $lt: cutoffTime }
        },
        {
          $set: {
            status: TRACKER_STATUS.FAILED,
            errorMessage: 'Downloaded but never enqueued - reset by periodic cleanup',
            updatedAt: new Date()
          }
        }
      );
    }
    // Find and reset stuck processing tasks (longer timeout since processing takes time)
    const stuckProcessing = await PartnerLogTracker.find({
      status: TRACKER_STATUS.PROCESSING,
      $or: [
        { processingStartedAt: { $lt: processingCutoffTime } },
        { processingStartedAt: { $exists: false } } // Corrupted processing state
      ]
    });
    if (stuckProcessing.length > 0) {
      pino.info(`Periodic cleanup: Found ${stuckProcessing.length} stuck processing tasks, resetting to failed`);
      await PartnerLogTracker.updateMany(
        {
          status: TRACKER_STATUS.PROCESSING,
          $or: [
            { processingStartedAt: { $lt: processingCutoffTime } },
            { processingStartedAt: { $exists: false } }
          ]
        },
        {
          $set: {
            status: TRACKER_STATUS.FAILED,
            errorMessage: 'Processing timeout - reset by periodic cleanup',
            updatedAt: new Date()
          },
          $inc: { retryCount: 1 } // Increment retry count for timeout failures
        }
      );
    }
    // Find tasks in unknown states (not in the expected status values)
    const unknownStatusTasks = await PartnerLogTracker.find({
      status: {
        $nin: [
          TRACKER_STATUS.PENDING,
          TRACKER_STATUS.DOWNLOADING,
          TRACKER_STATUS.DOWNLOADED,
          TRACKER_STATUS.PROCESSING,
          TRACKER_STATUS.PROCESSED,
          TRACKER_STATUS.FAILED
        ]
      },
      processed: { $ne: true } // Don't touch successfully processed tasks
    });
    if (unknownStatusTasks.length > 0) {
      pino.warn(`Periodic cleanup: Found ${unknownStatusTasks.length} tasks with unknown status, resetting to failed`);
      await PartnerLogTracker.updateMany(
        {
          status: {
            $nin: [
              TRACKER_STATUS.PENDING,
              TRACKER_STATUS.DOWNLOADING,
              TRACKER_STATUS.DOWNLOADED,
              TRACKER_STATUS.PROCESSING,
              TRACKER_STATUS.PROCESSED,
              TRACKER_STATUS.FAILED
            ]
          },
          processed: { $ne: true }
        },
        {
          $set: {
            status: TRACKER_STATUS.FAILED,
            errorMessage: 'Unknown status - reset by periodic cleanup',
            updatedAt: new Date()
          }
        }
      );
    }
  } catch (error) {
    pino.error({ err: error }, 'Error during periodic cleanup');
  }
}
/**
 * Poll a specific aircraft for available data
 * @param {object} partnerService - Partner service instance
 * @param {object} group - Partner group info
 * @param {string} aircraftId - Aircraft ID to poll
 */
async function pollAircraftData(partnerService, group, aircraftId) {
  pino.debug(`pollAircraftData called for aircraft ${aircraftId}`);
  const fs = require('fs').promises;
  // Run periodic cleanup to handle any stuck tasks
  await periodicCleanup();
  try {
    // Get available logs/data for this aircraft
    const availableLogs = await partnerService.getAircraftLogs(group.customerId, aircraftId);
    if (!availableLogs || availableLogs.length === 0) {
      pino.info(`No logs available for aircraft ${aircraftId}`);
      return;
    }
    pino.info(`Found ${availableLogs.length} logs for aircraft ${aircraftId}`);
    // Check which logs are new (not processed yet)
    const newLogs = await filterNewLogs(availableLogs, aircraftId, group.partnerCode);
    pino.debug(`After filtering: ${newLogs.length} new logs for aircraft ${aircraftId}`);
    if (newLogs.length === 0) {
      pino.info(`No new logs for aircraft ${aircraftId}`);
      return;
    }
    pino.info(`Found ${newLogs.length} new logs for aircraft ${aircraftId}`);
    // Get the storage path from the partner service (centralized path management)
    const storagePath = partnerService.getStoragePath();
    // Ensure the storage directory exists. TODO: move this logic to server startup if needed
    try {
      await fs.mkdir(storagePath, { recursive: true });
    } catch (mkdirError) {
      pino.error({ err: mkdirError, storagePath }, `Failed to create storage directory: ${storagePath}`);
      throw mkdirError;
    }
    // Download and store each log file before enqueueing
    for (const logInfo of newLogs) {
      let localFilePath = null; // Full path for file operations
      try {
        // Create or update the tracker record atomically to prevent race conditions
        const filter = { logId: logInfo.id, partnerCode: group.partnerCode, aircraftId: aircraftId };
        const update = {
          $setOnInsert: {
            logId: logInfo.id,
            partnerCode: group.partnerCode,
            aircraftId: aircraftId,
            customerId: group.customerId,
            logFileName: logInfo.logFileName,
            uploadedDate: logInfo.uploadedDate, // Stored as-is (string), timezone unknown from partner API
            status: TRACKER_STATUS.PENDING,
            processed: false,
            processedAt: null,
            retryCount: 0,
            enqueuedAt: null,
            createdAt: new Date(),
            updatedAt: new Date()
          }
        };
        // Atomically create the tracker, or fetch the existing one
        let tracker = await PartnerLogTracker.findOneAndUpdate(
          filter,
          update,
          { upsert: true, new: true, setDefaultsOnInsert: true }
        );
        let shouldDownloadAndEnqueue = false;
        let shouldOnlyEnqueue = false;
        let fileAlreadyExists = false;
        // Check whether the file already exists locally
        const expectedLocalPath = partnerService.resolveLogFilePath(logInfo.logFileName);
        try {
          await fs.access(expectedLocalPath);
          fileAlreadyExists = true;
          localFilePath = expectedLocalPath;
        } catch (accessError) {
          // File doesn't exist; it will need to be downloaded
          fileAlreadyExists = false;
        }
        // Determine the action based on tracker status and file existence
        if (tracker.status === TRACKER_STATUS.PENDING) {
          // New log, or a failed previous attempt
          if (fileAlreadyExists) {
            // File exists but the tracker is pending - probably from a previous run
            // Try to atomically claim it for enqueueing
            const claimedTracker = await PartnerLogTracker.findOneAndUpdate(
              { ...filter, status: TRACKER_STATUS.PENDING },
              {
                $set: {
                  status: TRACKER_STATUS.DOWNLOADED,
                  savedLocalFile: logInfo.logFileName, // Store filename only (path agnostic)
                  updatedAt: new Date()
                }
              },
              { new: true }
            );
            if (claimedTracker) {
              shouldOnlyEnqueue = true;
              pino.debug(`Claimed existing file for enqueueing: ${logInfo.logFileName}`);
            } else {
              pino.debug(`File exists but tracker was claimed by another instance: ${logInfo.logFileName}`);
            }
          } else {
            // File doesn't exist - try to claim it for downloading
            const claimedTracker = await PartnerLogTracker.findOneAndUpdate(
              { ...filter, status: TRACKER_STATUS.PENDING },
              {
                $set: {
                  status: TRACKER_STATUS.DOWNLOADING,
                  updatedAt: new Date()
                }
              },
              { new: true }
            );
            if (claimedTracker) {
              shouldDownloadAndEnqueue = true;
              pino.debug(`Claimed for downloading: ${logInfo.logFileName}`);
            } else {
              pino.debug(`Log was claimed by another instance for downloading: ${logInfo.logFileName}`);
            }
          }
        } else if (tracker.status === TRACKER_STATUS.DOWNLOADED && !tracker.enqueuedAt) {
          // File was downloaded but not yet enqueued (stuck state)
          const fiveMinutesAgo = new Date(Date.now() - 5 * 60 * 1000); // Reduced from 1 hour to 5 minutes
          if (tracker.updatedAt <= fiveMinutesAgo) {
            // Try to claim the stuck task for enqueueing. Note: enqueuedAt is
            // initialized to null on insert, so match null as well as missing.
            const claimedTracker = await PartnerLogTracker.findOneAndUpdate(
              {
                ...filter,
                status: TRACKER_STATUS.DOWNLOADED,
                $or: [{ enqueuedAt: { $exists: false } }, { enqueuedAt: null }]
              },
              {
                $set: { updatedAt: new Date() },
                $inc: { retryCount: 1 }
              },
              { new: true }
            );
            if (claimedTracker) {
              shouldOnlyEnqueue = true;
              // Reconstruct the full path using the service method
              localFilePath = partnerService.resolveLogFilePath(claimedTracker.savedLocalFile || logInfo.logFileName);
              pino.debug(`Re-enqueueing stuck downloaded task: ${logInfo.logFileName} (retry ${claimedTracker.retryCount})`);
            }
          }
        } else if (tracker.status === TRACKER_STATUS.FAILED) {
          // Previous attempt failed - check whether we should retry
          const fiveMinutesAgo = new Date(Date.now() - 5 * 60 * 1000); // Reduced from 1 hour to 5 minutes
          if (tracker.retryCount < env.PARTNER_MAX_RETRIES && tracker.updatedAt <= fiveMinutesAgo) {
            // Try to claim it for retry
            const claimedTracker = await PartnerLogTracker.findOneAndUpdate(
              { ...filter, status: TRACKER_STATUS.FAILED, retryCount: { $lt: env.PARTNER_MAX_RETRIES } },
              {
                $set: {
                  status: fileAlreadyExists ? TRACKER_STATUS.DOWNLOADED : TRACKER_STATUS.DOWNLOADING,
                  updatedAt: new Date()
                },
                $inc: { retryCount: 1 }
              },
              { new: true }
            );
            if (claimedTracker) {
              shouldDownloadAndEnqueue = !fileAlreadyExists;
              shouldOnlyEnqueue = fileAlreadyExists;
              pino.debug(`Retrying failed task: ${logInfo.logFileName} (retry ${claimedTracker.retryCount})`);
            }
          } else if (tracker.retryCount >= env.PARTNER_MAX_RETRIES) {
            // Task has exceeded max retries; don't re-enqueue, to avoid DLQ pollution
            pino.warn({
              logFileName: logInfo.logFileName,
              retryCount: tracker.retryCount,
              maxRetries: env.PARTNER_MAX_RETRIES,
              lastError: tracker.errorMessage,
              failedAt: tracker.updatedAt
            }, `Task ${logInfo.logFileName} has exceeded max retries, skipping`);
          } else {
            // Task failed recently; wait for the cooldown period
            pino.debug(`Task ${logInfo.logFileName} failed recently, waiting for cooldown period`);
          }
        } else {
          // Log is in downloading, processing, or processed state - skip
          pino.debug(`Log ${logInfo.logFileName} is in ${tracker.status} state - skipping`);
        }
        // Download the file and enqueue it if needed, or just enqueue if the file exists
        if (shouldDownloadAndEnqueue || shouldOnlyEnqueue) {
          try {
            let downloadedPath = localFilePath;
            if (shouldDownloadAndEnqueue) {
              // Download the file first
              try {
                pino.debug(`Downloading log file: ${logInfo.logFileName}`);
                downloadedPath = partnerService.resolveLogFilePath(logInfo.logFileName);
                // Download the file from the partner system
                const logData = await partnerService.getAircraftLogData(group.customerId, logInfo.id);
                if (!logData || !logData.logFile) {
                  throw new Error(`No log data received for ${logInfo.logFileName}`);
                }
                // Save the file to local storage - getAircraftLogData returns logFile as a Buffer
                const fileContent = Buffer.isBuffer(logData.logFile) ? logData.logFile : Buffer.from(logData.logFile, 'base64');
                await fs.writeFile(downloadedPath, fileContent);
                // Atomically update to downloaded status
                const updatedTracker = await PartnerLogTracker.findOneAndUpdate(
                  filter,
                  {
                    $set: {
                      status: TRACKER_STATUS.DOWNLOADED,
                      savedLocalFile: logInfo.logFileName, // Store filename only (path agnostic)
                      updatedAt: new Date()
                    }
                  },
                  { new: true }
                );
                if (!updatedTracker) {
                  throw new Error(`Failed to update tracker status to downloaded for ${logInfo.logFileName}`);
                }
                pino.info(`Downloaded and stored log file: ${logInfo.logFileName} -> ${downloadedPath}`);
              } catch (downloadError) {
                // Clean up the partial file if the download failed
                if (downloadedPath) {
                  try {
                    await fs.unlink(downloadedPath);
                    pino.debug(`Cleaned up partial file: ${downloadedPath}`);
                  } catch (cleanupError) {
                    pino.warn({ err: cleanupError }, `Failed to clean up partial file: ${downloadedPath}`);
                  }
                }
                // Mark as failed atomically
                await PartnerLogTracker.findOneAndUpdate(
                  filter,
                  {
                    $set: {
                      status: TRACKER_STATUS.FAILED,
                      updatedAt: new Date(),
                      errorMessage: downloadError.message
                    }
                  },
                  { new: true }
                );
                pino.error({ err: downloadError, logFileName: logInfo.logFileName }, `Failed to download log file: ${logInfo.logFileName}`);
                continue; // Skip to the next log
              }
            } else if (shouldOnlyEnqueue) {
              pino.debug(`Using existing local file for ${logInfo.logFileName}: ${downloadedPath}`);
            }
            // Enqueue for processing
            try {
              // Generate TaskTracker IDs
              const taskId = generateTaskId(PARTNER_QUEUE, {
                partnerCode: group.partnerCode,
                aircraftId: aircraftId,
                logId: logInfo.id
              });
              const executionId = generateExecutionId();
              // Deduplication check - prevent duplicate enqueues
              const recentTask = await TaskTracker.findOne({
                taskId,
                status: { $in: [TaskTrackerStatus.QUEUED, TaskTrackerStatus.PROCESSING] },
                enqueuedAt: { $gt: new Date(Date.now() - 5 * 60000) } // Last 5 minutes
              }).lean();
              if (recentTask) {
                pino.debug({ taskId, existingExecutionId: recentTask.executionId, logFileName: logInfo.logFileName },
                  'Task already queued/processing, skipping duplicate');
                continue; // Skip enqueue
              }
              // Create a TaskTracker entry (parallel tracking)
              await TaskTracker.create({
                taskId,
                executionId,
                queueName: PARTNER_QUEUE,
                status: TaskTrackerStatus.QUEUED,
                metadata: {
                  partnerCode: group.partnerCode,
                  aircraftId: aircraftId,
                  logId: logInfo.id,
                  customerId: group.customerId,
                  logFileName: logInfo.logFileName,
                  localFilePath: downloadedPath
                }
              });
              // Enqueue the processing task with minimal essential fields only:
              // - localFilePath: resolved from tracker.savedLocalFile or logFileName when processing
              // - assignments: queried from the JobAssign collection when processing
              // - uploadedDate: retrieved from the tracker record when processing
              await taskQHelper.addTaskASync(PartnerTasks.PROCESS_PARTNER_LOG, {
                // Essential identification fields (used for tracker lookup and file resolution)
                customerId: group.customerId,
                partnerCode: group.partnerCode,
                aircraftId: aircraftId,
                logId: logInfo.id,
                logFileName: logInfo.logFileName,
                // TaskTracker IDs for idempotency
                taskId,
                executionId
              });
              // Mark as enqueued atomically
              await PartnerLogTracker.findOneAndUpdate(
                filter,
                {
                  $set: {
                    enqueuedAt: new Date(),
                    updatedAt: new Date()
                  }
                },
                { new: true }
              );
              pino.info(shouldDownloadAndEnqueue ?
                `Downloaded and queued log processing task for ${logInfo.logFileName}` :
                `Re-queued existing log file for processing: ${logInfo.logFileName}`);
            } catch (enqueueError) {
              // The file has already been downloaded successfully, so don't delete it:
              // mark the enqueue as failed but keep the file on disk
              await PartnerLogTracker.findOneAndUpdate(
                filter,
                {
                  $set: {
                    status: TRACKER_STATUS.FAILED,
                    errorMessage: `Enqueue failed: ${enqueueError.message}`,
                    updatedAt: new Date()
                  }
                },
                { new: true }
              );
              pino.error({ err: enqueueError, logFileName: logInfo.logFileName }, `Failed to enqueue log file (file preserved): ${logInfo.logFileName}`);
            }
          } catch (generalError) {
            // Mark as failed
            await PartnerLogTracker.findOneAndUpdate(
              filter,
              {
                $set: {
                  status: TRACKER_STATUS.FAILED,
                  updatedAt: new Date(),
                  errorMessage: generalError.message
                }
              },
              { new: true }
            );
            pino.error({ err: generalError, logFileName: logInfo.logFileName }, `Unexpected error processing log file: ${logInfo.id}-${logInfo.logFileName}`);
          }
        }
      } catch (generalError) {
        pino.error({ err: generalError, logFileName: logInfo.logFileName }, `Unexpected error tracking log file: ${logInfo.id}-${logInfo.logFileName}`);
      }
    }
  } catch (error) {
    pino.error({ err: error, aircraftId }, `Error polling aircraft data for ${aircraftId}`);
  }
}
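// Example of a task payload enqueued above (illustrative values only; real IDs
// come from the partner API, and taskId is presumably deterministic for the
// same log since it is used for deduplication against TaskTracker):
//
//   {
//     customerId: '64f0c2...',
//     partnerCode: 'AG-PARTNER',
//     aircraftId: 'N123AB',
//     logId: 'log-0042',
//     logFileName: 'N123AB_20240101_120000.log',
//     taskId: '<from generateTaskId(PARTNER_QUEUE, metadata)>',
//     executionId: '<unique per enqueue, from generateExecutionId()>'
//   }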
/**
 * Filter logs down to those that have not yet been fully processed
 * (in-flight duplicates are handled later by the enqueueing logic)
 * @param {array} logs - Available logs from the partner system
 * @param {string} aircraftId - Aircraft ID
 * @param {string} partnerCode - Partner code
 * @returns {array} New logs to process
 */
async function filterNewLogs(logs, aircraftId, partnerCode) {
  pino.debug(`filterNewLogs called: ${logs.length} logs for aircraft ${aircraftId}`);
  // Get only completed logs for this aircraft (let the enqueueing logic handle duplicates)
  const completedLogs = await PartnerLogTracker.find({
    aircraftId: aircraftId,
    partnerCode: partnerCode,
    processed: true
  }, 'logId logFileName').lean();
  const completedLogIds = new Set(completedLogs.map(log => log.logId));
  const completedLogNames = new Set(completedLogs.map(log => log.logFileName));
  // Filter out only completed logs - enqueueing handles other duplicates
  const newLogs = logs.filter(log => {
    return !completedLogIds.has(log.id) && !completedLogNames.has(log.logFileName);
  });
  // Log some debug info
  if (completedLogs.length > 0) {
    pino.debug(`Found ${completedLogs.length} completed logs for aircraft ${aircraftId}`);
  }
  return newLogs;
}
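// Example (illustrative values): given partner logs
//   [{ id: 'log-1', logFileName: 'a.log' }, { id: 'log-2', logFileName: 'b.log' }]
// and a tracker row { logId: 'log-1', processed: true }, filterNewLogs
// resolves to [{ id: 'log-2', logFileName: 'b.log' }].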
// Start the polling worker if not in test mode
pino.info(`NODE_ENV: ${env.NODE_ENV}, Starting worker: ${env.NODE_ENV !== 'test'}`);
pino.info(`LOG_LEVEL: ${process.env.LOG_LEVEL}, LOG_MODULES: ${process.env.LOG_MODULES}`);
if (env.NODE_ENV !== 'test') {
  // Run startup cleanup after a short delay to ensure the DB is connected
  setTimeout(async () => {
    pino.info('Running startup cleanup for polling worker...');
    try {
      await startupCleanup();
    } catch (err) {
      pino.error({ err }, 'Startup cleanup failed');
    }
  }, 2000); // 2 second delay
  pollPartnerDataTask.start();
  pino.info('Partner data polling worker started');
} else {
  pino.info('Partner data polling worker NOT started (NODE_ENV is test)');
}
module.exports = {
  pollPartnerDataTask,
  pollPartnerData,
  pollAllPartnerSystems,
  pollPartnerGroup,
  pollAircraftData,
  startupCleanup,
  periodicCleanup
};
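// A minimal usage sketch for tests or one-off ops scripts (assumes
// NODE_ENV=test so requiring this module does not start the cron task; note
// that the DB connection and the task queue are still initialized at require
// time):
//
//   const worker = require('./workers/partner_data_polling_worker');
//   await worker.startupCleanup();        // reset stuck tracker rows
//   await worker.pollAllPartnerSystems(); // run one polling pass manually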