// agmission/Development/server/helpers/satloc_application_processor.js
// (676 lines, 26 KiB, JavaScript — file-viewer header converted to a comment)

/**
* SatLoc Application Processor - Handles log grouping and application creation logic
* Similar to Job Worker pattern but designed for SatLoc log files
*/
const debug = require('debug')('agm:satloc-processor');
const { enhancedRunInTransaction, runWithSessionOrTransaction } = require('./mongo_enhanced');
const JobAssign = require('../model/job_assign');
const Application = require('../model/application');
const ApplicationFile = require('../model/application_file');
const ApplicationDetail = require('../model/application_detail');
const { SatLocLogParser } = require('./satloc_log_parser');
const { AppStatus, AppProStatus, AssignStatus, UserTypes, FCTypes, RateUnits } = require('./constants');
const { JobUpdateOp } = require('./job_constants');
const { AppInputError } = require('./app_error');
const env = require('./env');
const path = require('path');
const fs = require('fs').promises;
const _ = require('lodash');
// Max gap between consecutive GPS samples (in gpsTime units; seconds, per the
// `* 1000` epoch-to-Date conversion used when persisting start/end times) that
// is still counted toward flight/spray time totals; larger gaps are treated as
// log discontinuities and ignored.
const MAX_TIME_DIFF = 120;
/**
 * Processes parsed SatLoc log files into Application / ApplicationFile /
 * ApplicationDetail records. One Application + ApplicationFile pair is created
 * per job group found in the log; per-record spray/flight statistics are
 * aggregated and written back onto both documents inside a transaction.
 */
class SatLocApplicationProcessor {
  /**
   * @param {Object} [options] - Processor options.
   * @param {number} [options.batchSize=1000] - Batch size for ApplicationDetail inserts.
   * @param {boolean} [options.enableRetryLogic=true] - False only when explicitly passed as false.
   */
  constructor(options = {}) {
    // NOTE: the trailing spread re-applies caller-supplied values, so an
    // explicitly falsy option (e.g. batchSize: 0) overrides the computed
    // defaults above it.
    this.options = {
      batchSize: options.batchSize || 1000,
      enableRetryLogic: options.enableRetryLogic !== false,
      ...options
    };
  }
  /**
   * Process a SatLoc log file and create/update Application records with proper grouping.
   * @param {Object} logFileData - Log file information ({ filePath }).
   * @param {Object} contextData - Context data (taskInfo with assignments/logFileName/uploadedDate, metadata, etc.).
   * @returns {Promise<Object>} Processing results: applications, applicationFiles,
   *   matchedJobs, totalDetails, appFileId (first file, backward compat), multiJob flag.
   * @throws Re-throws any parse/processing error after recording it on result.error.
   */
  async processLogFile(logFileData, contextData = {}) {
    debug(`Processing SatLoc log file: ${logFileData.filePath}`);
    let result = {
      success: false,
      matchedJobs: [],
      numberOfJobGroups: 0,
      applications: [],
      applicationFiles: [],
      appFileId: null, // For backward compatibility - first application file ID
      totalDetails: 0,
      error: null
    };
    try {
      // 1. Use SatLocLogParser directly with integrated calculations AND grouping
      const parserOptions = {
        skipUnknownRecords: true,
        validateChecksums: true
      };
      // Only add maxPositionsPerJob if it's set in ENV
      if (env.SATLOC_MAX_POSITIONS_PER_JOB !== undefined) {
        parserOptions.maxPositionsPerJob = env.SATLOC_MAX_POSITIONS_PER_JOB;
      }
      const parser = new SatLocLogParser(parserOptions);
      const parseResults = await parser.parseFile(logFileData.filePath, contextData);
      if (!parseResults.success) {
        throw new Error(`Failed to parse log file: ${parseResults.error}`);
      }
      // 2. Use pre-grouped job groups from parsing (no duplication)
      const jobGroups = parseResults.jobGroups;
      result.numberOfJobGroups = Object.keys(jobGroups).length;
      result.statistics = parseResults.statistics;
      debug(`Found ${result.numberOfJobGroups} job groups in log file`);
      // Index assignments by SatLoc job name (falling back to job._id).
      // NOTE(review): _.groupBy values are ARRAYS, so assignedSet[satlocJobId]
      // below assigns an array (not a single assignment doc) to matchedAssign
      // in the non-testing branch; downstream code reads matchedAssign._id and
      // matchedAssign.job._id as if it were one document. Confirm whether
      // _.keyBy (or taking element [0]) was intended.
      const assignedSet = _.groupBy(contextData.taskInfo.assignments, it => it.jobName || it.job._id);
      let matchedAssign = null;
      // 3. Process each job group separately by jobId and aircraftId
      for (const [satlocJobId, applicationDetails] of Object.entries(jobGroups)) {
        // Check if we have any assignments before processing
        const assignments = contextData.taskInfo?.assignments;
        if (!assignments || assignments.length === 0) {
          debug(`No assignments available for job group: ${satlocJobId} => skip processing.`);
          continue;
        }
        // Make it configurable via env param to toggle off later when needed
        if (env.SATLOC_1ST_ASSIGNMENT_ALWAYS_MATCH) {
          matchedAssign = assignments[0]; // TODO: To be removed later. For testing, assume single assignment
        } else {
          // Only process if the current job group has a matching, (.job (the or ._id) or jobName), assignment
          if (!(matchedAssign = assignedSet[satlocJobId])) {
            debug(`No assignment found for job group: ${satlocJobId} => skip processing !`);
            continue;
          }
        }
        // Each job group runs in its own transaction so one group's failure
        // does not roll back previously committed groups.
        const jobGroupResult = await enhancedRunInTransaction(async (session) => {
          // Create consolidated job group context
          const jobGroupContext = {
            taskInfo: contextData.taskInfo,
            matchedAssign,
            applicationDetails,
            // NOTE(review): the `|| {}` is dead code — an object spread always
            // yields an object, which is truthy.
            metadata: { ...contextData.metadata, ...parseResults.metadata } || {},
            geoInfo: { utmZone: parseResults.utmZone, boundingBox: parseResults.boundingBox },
            parseResults
          };
          return await this.processJobGroup(jobGroupContext, session);
        });
        // Collect results from job group processing
        if (jobGroupResult) {
          if (jobGroupResult.application) {
            result.applications.push(jobGroupResult.application);
            result.matchedJobs.push({
              assignId: matchedAssign._id, // Job assignment reference
              jobId: jobGroupResult.application.jobId,
              confidence: 1.0 // Full confidence for direct assignment match
            });
          }
          if (jobGroupResult.applicationFile) {
            result.applicationFiles.push(jobGroupResult.applicationFile);
            // Set first appFileId for backward compatibility
            if (!result.appFileId) {
              result.appFileId = jobGroupResult.applicationFile._id;
            }
          }
          result.totalDetails += jobGroupResult.detailsCount || 0;
        }
      }
      result.success = true;
    } catch (error) {
      debug(`Error processing log file: ${error.message}`);
      result.error = error.message;
      throw error;
    }
    // Multiple jobs: return new format
    return {
      ...result,
      multiJob: result.numberOfJobGroups > 1,
    };
  }
  /**
   * Process a single job group with calculations and database operations:
   * creates the Application + ApplicationFile, converts coordinates to UTM,
   * aggregates flight/spray statistics, batch-inserts details, then writes
   * the computed totals back onto both documents.
   * @param {Object} groupContext - Consolidated context containing all job group data
   *   (matchedAssign, applicationDetails, taskInfo, metadata, geoInfo, parseResults).
   * @param {Object} session - MongoDB session (all writes join this transaction).
   * @returns {Promise<Object|undefined>} Processing results for this job group,
   *   or undefined when the assignment no longer exists in the DB.
   */
  async processJobGroup(groupContext, session) {
    const { matchedAssign, applicationDetails, taskInfo, metadata, geoInfo, parseResults } = groupContext;
    debug(`Processing job group: ${matchedAssign?.job?._id} with ${applicationDetails.length} details`);
    // Check the assignment still exists and valid for processing against the db
    const freshAssign = await JobAssign.findById(matchedAssign._id, {}, { lean: true }).session(session);
    if (!freshAssign) {
      debug(`No valid assignment found for job group: ${matchedAssign.job._id} => skip processing.`);
      return;
    }
    const finalJobId = matchedAssign.job._id;
    // Extract user ID from assignment - this is the pilot/operator who will perform the application
    const assignedUserId = matchedAssign.user || freshAssign.user;
    // Create context for this job group
    const appContext = {
      jobId: finalJobId,
      fileName: taskInfo.logFileName,
      fileSize: metadata.fileSize,
      userId: assignedUserId // Use the assigned user (pilot) from the job assignment
    };
    // Create new Application for this job group (1:1 mapping per job group)
    const application = await this.createApplication(appContext, session);
    // Process each application detail in this job group (with needed calculations: UTM coords, spray stats, etc.).
    const processedDetails = [];
    // Initialize aggregation variables for this job group
    let totalSprayTime = 0, totalFlightTime = 0, totalSprayed = 0, totalSprayMat = 0;
    let totalSprayLength = 0;
    let spraySegments = [];
    let startDateTime = null, endDateTime = null;
    // Tracking variables for calculations (-999 / undefined mean "no previous sample yet")
    let prevTime = -999, prevSprTime = -999, prevUTM_X, prevUTM_Y;
    let prevSprayStat = -1; // Track previous spray status for transition detection (-1 = first record)
    let currentSegment = null;
    // Load UTM conversion library (ESM, hence dynamic import from CommonJS)
    const utmMod = await import('@mickeyjohn/geodesy/utm.js');
    const LatLonUTM = utmMod.LatLon;
    const refUTMZone = geoInfo.utmZone;
    debug(`Processing ${applicationDetails.length} application details for job group ${finalJobId}`);
    debug(`Using UTM zone: ${refUTMZone.zoneNumber}${refUTMZone.hemisphere} for coordinate conversion`);
    // Create ApplicationFile with calculated data, and ref for application details for this job group
    // Normalize metadata according to DATA_FORMAT_NOTES.md
    const { DataTypes, MatTypes } = require('./constants');
    const normalizedMeta = {
      // Normalized fields (common format for both AgNav and SatLoc)
      type: DataTypes.SATLOC, // Data source type
      matType: metadata.fcType === FCTypes.DRY ? MatTypes.DRY : MatTypes.WET, // Material type: 'wet' or 'dry'
      operator: metadata.pilotName || null, // Pilot name (from System Setup record Type 100)
      fcName: metadata.fcName || null, // Flow controller name (from Controller Type record Type 46)
      // All original metadata fields (preserved completely)
      ...metadata,
      // Additional processing metadata
      recordCount: applicationDetails.length,
      parseStats: parseResults.statistics,
      version: parseResults.headerInfo?.version,
      utmZone: parseResults.utmZone,
      bbox: parseResults.boundingBox
    };
    const appFile = await this.createApplicationFile(application._id, appContext.fileName, taskInfo.uploadedDate,
      normalizedMeta,
      session
    );
    // Process each detail with calculations
    for (let i = 0; i < applicationDetails.length; i++) {
      const record = applicationDetails[i];
      // Convert lat/lon to UTM coordinates using reference zone
      if (record.lat && record.lon && !isNaN(record.lat) && !isNaN(record.lon) && refUTMZone.zoneNumber) {
        try {
          const latLonObj = new LatLonUTM(record.lat, record.lon);
          const utm = latLonObj.toUtm(refUTMZone.zoneNumber, refUTMZone.hemisphere);
          record.utmX = utm.easting;
          record.utmY = utm.northing;
        } catch (error) {
          // Conversion failure is non-fatal: record is kept with zeroed coords
          debug(`UTM conversion error for record ${i}: ${error.message}`);
          record.utmX = 0;
          record.utmY = 0;
        }
      } else {
        debug(`Skipping record ${i} due to invalid coordinates`);
        continue; // Skip records without valid coordinates
      }
      // Track overall time range for this job group
      if (!startDateTime || record.gpsTime < startDateTime) {
        startDateTime = record.gpsTime;
      }
      if (!endDateTime || record.gpsTime > endDateTime) {
        endDateTime = record.gpsTime;
      }
      // Calculate total flight time
      if (prevTime !== -999 && prevTime !== record.gpsTime) {
        let timeDif = record.gpsTime - prevTime;
        // Only count reasonable time differences (gaps > MAX_TIME_DIFF are treated as discontinuities)
        if (timeDif > 0 && timeDif <= MAX_TIME_DIFF) {
          totalFlightTime += timeDif;
        }
      }
      prevTime = record.gpsTime;
      // Detect spray segment transitions based on sprayStat changes
      const curSprayStat = record.sprayStat || 0;
      // Handle first record: if spray starts ON, create initial segment
      if (prevSprayStat === -1) {
        if (curSprayStat > 0) {
          env && !env.PRODUCTION && debug(`Job group ${finalJobId}: Log starts with spray ON at time ${record.gpsTime}, creating initial segment`);
          currentSegment = {
            startTime: record.gpsTime,
            endTime: record.gpsTime,
            startLat: record.lat,
            startLon: record.lon,
            endLat: record.lat,
            endLon: record.lon,
            distance: 0,
            area: 0,
            points: []
          };
        }
      } else {
        // Check for spray transitions: OFF to ON (start segment) or ON to OFF (end segment)
        // Spray turning ON (previous OFF, current ON) - start new segment
        if (prevSprayStat === 0 && curSprayStat > 0) {
          env && !env.PRODUCTION && debug(`Job group ${finalJobId}: Spray ON detected at time ${record.gpsTime}, starting new segment`);
          currentSegment = {
            startTime: record.gpsTime,
            endTime: record.gpsTime,
            startLat: record.lat,
            startLon: record.lon,
            endLat: record.lat,
            endLon: record.lon,
            distance: 0,
            area: 0,
            points: []
          };
        }
        // Spray turning OFF (previous ON, current OFF) - close current segment
        if (prevSprayStat > 0 && curSprayStat === 0) {
          env && !env.PRODUCTION && debug(`Job group ${finalJobId}: Spray OFF detected at time ${record.gpsTime}, closing segment`);
          if (currentSegment) {
            spraySegments.push(currentSegment);
            currentSegment = null;
          }
        }
      }
      // Calculate spray time and area when spray is active
      if (curSprayStat > 0) {
        // ALWAYS add spray ON points to current segment
        if (currentSegment) {
          currentSegment.endTime = record.gpsTime;
          currentSegment.endLat = record.lat;
          currentSegment.endLon = record.lon;
          // Add EVERY spray ON point to the segment
          currentSegment.points.push({
            lat: record.lat,
            lon: record.lon,
            gpsTime: record.gpsTime,
            sprayStat: record.sprayStat
          });
        }
        // Calculate spray time and area calculations
        // (only when the PREVIOUS sample was also spray ON, so we have a spray-to-spray leg)
        if (prevUTM_X !== undefined && prevUTM_Y !== undefined && prevSprayStat > 0) {
          const timeDif = record.gpsTime - prevSprTime;
          if (timeDif > 0 && timeDif <= MAX_TIME_DIFF) {
            totalSprayTime += timeDif;
          }
          const distance = Math.hypot(record.utmX - prevUTM_X, record.utmY - prevUTM_Y);
          if (distance > 0 && record.swath > 0) {
            const swathArea = distance * record.swath;
            totalSprayed += swathArea;
            totalSprayLength += distance;
            // Track spray material usage
            const appRate = record.lhaApp || record.lhaReq || 0;
            if (appRate > 0) {
              // NOTE(review): swathArea is in m² here; the /10000 scales it to
              // hectares so (ha * rate-per-ha) yields material volume — confirm
              // appRate units against the parser.
              totalSprayMat += (swathArea * appRate) / 10000; // Convert Liters/m² to Liters/ha
            }
            // Update segment distance and area
            if (currentSegment) {
              currentSegment.distance += distance;
              currentSegment.area += swathArea;
            }
          }
        }
        prevSprTime = record.gpsTime;
        // Update UTM position for next iteration (only when spray is ON)
        prevUTM_X = record.utmX;
        prevUTM_Y = record.utmY;
      }
      // Update previous spray status for next iteration (MOVED OUTSIDE spray condition)
      // This ensures ALL transitions (ON→OFF, OFF→ON) are properly tracked
      prevSprayStat = curSprayStat;
      // Create processed detail with calculations (will be linked to file after ApplicationFile creation)
      const processedDetail = {
        ...record,
        fileId: appFile._id
      };
      processedDetails.push(processedDetail);
    }
    // Convert units (like job worker)
    totalSprayed = totalSprayed * 1E-4; // Convert m² to hectares
    debug(`Job group ${finalJobId} calculated totals - Flight: ${totalFlightTime} s, Spray: ${totalSprayTime} s, Area: ${totalSprayed} Ha, Material: ${totalSprayMat} L/Kg, Segments: ${spraySegments.length}`);
    const sprayMatUnit = metadata.fcType === FCTypes.LIQUID ? RateUnits.LIT_PER_HA : RateUnits.KG_PER_HA;
    // Save ApplicationDetails in batches for efficiency
    // await this.saveApplicationDetails(processedDetails, session);
    // ==============================================================
    // Prepare job group statistics for Application update
    const jobGroupStats = {
      totalSprayTime,
      totalFlightTime,
      totalSprayed,
      totalSprayMat,
      totalSprayMatUnit: sprayMatUnit,
      totalSprayLength,
      startDateTime,
      endDateTime,
      spraySegments
    };
    // Batch insert ApplicationDetails for this job group
    if (processedDetails.length > 0) {
      await this.saveApplicationDetails(processedDetails, session);
    }
    // Update ApplicationFile with calculated totals for this job group
    await ApplicationFile.updateOne({ _id: appFile._id },
      {
        $set: {
          totalSprLength: jobGroupStats.totalSprayLength || 0,
          totalSprayTime: jobGroupStats.totalSprayTime || 0,
          totalFlightTime: jobGroupStats.totalFlightTime || 0,
          totalSprayed: jobGroupStats.totalSprayed || 0,
          totalSprayMat: jobGroupStats.totalSprayMat || 0,
          totalSprayMatUnit: jobGroupStats.totalSprayMatUnit || sprayMatUnit,
        }
      },
      { session }
    );
    // Update Application with calculated totals for this job group
    await Application.updateOne({ _id: application._id },
      {
        $set: {
          status: AppStatus.DONE,
          proStatus: processedDetails.length > 0 ? AppProStatus.WITH_DATA : AppProStatus.NO_DATA,
          totalSprayTime: jobGroupStats.totalSprayTime || 0,
          totalFlightTime: jobGroupStats.totalFlightTime || 0,
          totalSprayed: jobGroupStats.totalSprayed || 0,
          totalSprayMat: jobGroupStats.totalSprayMat || 0,
          totalSprayMatUnit: jobGroupStats.totalSprayMatUnit || sprayMatUnit,
          totalSprLength: jobGroupStats.totalSprayLength || 0,
          appRate: 0, // Average application rate (L/ha or Kg/ha). To be calculated if needed
          // gpsTime values are epoch seconds; scale to ms for Date
          startDateTime: jobGroupStats.startDateTime ? new Date(jobGroupStats.startDateTime * 1000).toISOString() : null,
          endDateTime: jobGroupStats.endDateTime ? new Date(jobGroupStats.endDateTime * 1000).toISOString() : null,
          updateDate: new Date()
        }
      },
      { session }
    );
    debug(`Completed job group ${finalJobId}: ${processedDetails.length} details processed, ${jobGroupStats.totalSprayed.toFixed(2)}ha sprayed, ${jobGroupStats.spraySegments.length} segments`);
    // Finalize last segment for this job group
    // NOTE(review): this finalization runs AFTER the DB updates and the debug
    // line above, so a log ending with spray ON is appended to spraySegments
    // too late to appear in the logged segment count — confirm the intended
    // ordering if segments are ever persisted.
    if (currentSegment) {
      const lastRecord = applicationDetails[applicationDetails.length - 1]; // NOTE(review): unused variable
      // Only finalize if segment has meaningful data (consecutive spray ON points)
      if (currentSegment.points.length > 0) {
        spraySegments.push(currentSegment);
        debug(`Job group ${finalJobId}: Finalized last spray segment with ${currentSegment.points.length} consecutive spray ON points`);
      } else {
        debug(`Job group ${finalJobId}: Discarded empty last segment - no spray ON points`);
      }
    }
    // Return results for this job group including application and applicationFile for tracker update
    return {
      success: true,
      application, // Application document for this job group
      applicationFile: appFile, // ApplicationFile document for this job group
      detailsCount: processedDetails.length,
      skipped: false
    };
  }
  /**
   * Find matching assignment where status is UPLOADED and jobName equals the
   * SatLoc job ID; warns (does not fail) on partnerAircraftId mismatch.
   * @param {string} aircraftId - Aircraft ID to match against user's partnerInfo.partnerAircraftId, if needed
   * @param {string} satlocJobId - SatLoc job ID for additional matching, using now
   * @param {Object} session - MongoDB session
   * @returns {Promise<Object>} Assignment with populated job, or null (also null on any error)
   */
  async findJobAssignment(aircraftId, satlocJobId, session = null) {
    let recentAssignment = null;
    try {
      debug(`Looking for assignment with partnerAircraftId: ${aircraftId}, satlocJobId: ${satlocJobId}`);
      if (!aircraftId || !satlocJobId) AppInputError.throw();
      // NOTE(review): the second argument of Model.find() is the PROJECTION;
      // passing { lean: true } here projects a field literally named "lean"
      // instead of enabling lean mode — confirm whether .lean() or the options
      // (third) argument was intended.
      const validAssignments = await JobAssign.find({ status: { $in: [AssignStatus.UPLOADED] }, jobName: satlocJobId }, { lean: true })
        .populate({ path: 'job', select: '_id status' })
        .populate({ path: 'partnerInfo.partner', model: UserTypes.PARTNER, select: 'partnerAircraftId' })
        .session(session);
      if (validAssignments.length > 0) {
        recentAssignment = validAssignments[0];
        // NOTE(review): userId is not populated by the query above, so
        // .username / .partnerInfo reads below are likely undefined — verify.
        debug(`Found recent assignment: ${recentAssignment.job} (${recentAssignment.job.status}) for user: ${recentAssignment.userId.username}`);
        if (recentAssignment.userId.partnerInfo?.partnerAircraftId !== aircraftId) {
          debug(`Warning: partnerAircraftId mismatch: expected ${aircraftId}, found ${recentAssignment.userId.partnerInfo?.partnerAircraftId}`);
        }
      }
      return recentAssignment;
    } catch (error) {
      // Best-effort lookup: swallow and return null so callers can fall back
      debug(`Error finding assignment by partner aircraft ID: ${error.message}`);
      return null;
    }
  }
  /**
   * Handle retry logic for existing files - marks prior ApplicationFile data
   * for deletion (soft delete) and reprocesses the log file from scratch.
   * @param {string} filePath - Path to the log file
   * @param {Object} contextData - Context data
   * @returns {Promise<Object>} Processing results (same shape as processLogFile)
   * @throws Re-throws any error from the soft delete or the reprocessing
   */
  async retryLogFile(filePath, contextData = {}) {
    debug(`Retrying SatLoc log file: ${filePath}`);
    try {
      /* NOTE: Use markedDelete flag instead of deleting data for the case of accesive old application details
      => Cleanup worker will handle actual data deletion later
      */
      await enhancedRunInTransaction(async (session) => {
        // Find existing ApplicationFile by the log file name
        const logFileName = contextData.taskInfo?.logFileName || path.basename(filePath);
        await ApplicationFile.updateMany({ fileName: logFileName }, { $set: { markedDelete: true } }).session(session);
      });
      // Now process the file normally
      return await this.processLogFile({ filePath }, contextData);
    } catch (error) {
      debug(`Error retrying log file: ${error.message}`);
      throw error;
    }
  }
  /**
   * Find existing Application or create new one - simplified for 1:1 file-to-application mapping.
   * Each log file gets its own Application record, similar to Job Worker pattern.
   * @param {Object} ctxData - Context data (jobId, fileName, fileSize, userId)
   * @param {Object} session - MongoDB session
   * @returns {Promise<Object>} Saved Application document
   */
  async createApplication(ctxData, session) {
    // Always create a new Application for each log file
    const newApplication = new Application({
      jobId: ctxData.jobId,
      fileName: ctxData.fileName,
      savedFilename: ctxData.fileName,
      fileSize: ctxData.fileSize,
      updateOp: JobUpdateOp.DATA_ONLY,
      status: AppStatus.CREATED,
      createdDate: new Date(),
      updateDate: new Date(),
      errorMsg: null,
      cid: null,
      byUser: ctxData.userId || null, // User ID from job assignment (vehicle/partner aircraft)
      byImport: true // Mark as import since this comes from partner sync, not direct user upload, needs review later
    });
    const savedApplication = await newApplication.save({ session });
    debug(`Created new application ${savedApplication._id} for SatLoc log file: ${ctxData.fileName}`);
    return savedApplication;
  }
  /**
   * Create ApplicationFile record for individual log file
   * @param {ObjectId} applicationId - Application ID
   * @param {string} fileName - Log file name
   * @param {string} uploadedDate - Uploaded date as string (timezone unknown, stored as-is from partner API)
   * @param {Object} logMetadata - Normalized metadata object with fields:
   *   - type: 'satloc' | 'agnav' - Data source type
   *   - matType: 'wet' | 'dry' - Material type (normalized from fcType)
   *   - operator: string - Pilot name (from pilotName)
   *   - fcName: string - Flow controller name
   *   - aircraftId: string - Aircraft identifier (SatLoc-specific)
   *   - jobId: string - Job identifier (SatLoc-specific)
   *   - fcType: FCTypes.LIQUID | FCTypes.DRY - Original flow controller type
   *   - recordCount: number - Number of application details
   *   - parseStats: Object - Parser statistics
   *   - version: string - Log file version
   *   - utmZone: Object - UTM zone information
   *   - bbox: Array - Bounding box [minLon, minLat, maxLon, maxLat]
   * @param {Object} session - MongoDB session
   * @returns {Promise<Object>} Saved ApplicationFile document
   */
  async createApplicationFile(applicationId, fileName, uploadedDate, logMetadata, session) {
    debug(`Creating ApplicationFile for application ${applicationId}, file: ${fileName}`);
    const applicationFile = new ApplicationFile({
      appId: applicationId,
      name: fileName,
      // agn is falsy (undefined/'' propagated) when uploadedDate is missing
      agn: (uploadedDate && this.generateAGNFromISO(uploadedDate)),
      meta: logMetadata || {}
    });
    const savedFile = await applicationFile.save({ session });
    debug(`Created application file ${savedFile._id} for log ${fileName}`);
    return savedFile;
  }
  /**
   * Generate AGN (AgNav identifier) from timestamp: "YYMMDDHHmm" in local time.
   * NOTE(review): near-duplicate of generateAGNFromISO, but WITHOUT the
   * invalid-date guard — an unparsable timestamp produces "NaN" components
   * here. Consider delegating to generateAGNFromISO.
   * @param {string|number|Date} timestamp - Any value accepted by new Date()
   * @returns {string} 10-char AGN string, or '0000000000' when timestamp is falsy
   */
  generateAGN(timestamp) {
    if (!timestamp) return '0000000000';
    const date = new Date(timestamp);
    const year = date.getFullYear().toString().substr(-2);
    const month = String(date.getMonth() + 1).padStart(2, '0');
    const day = String(date.getDate()).padStart(2, '0');
    const hour = String(date.getHours()).padStart(2, '0');
    const minute = String(date.getMinutes()).padStart(2, '0');
    return `${year}${month}${day}${hour}${minute}`;
  }
  /**
   * Generate AGN (AgNav File Group Name) from ISO 8601 timestamp.
   * Output is "YYMMDDHHmm" in the SERVER'S local timezone (uses getHours etc.).
   * @param {string} isoString - ISO 8601 timestamp, i.e. "2023-10-05T14:30:00Z"
   * @returns {string} - AGN string, or '0000000000' when input is falsy/unparsable
   */
  generateAGNFromISO(isoString) {
    if (!isoString) return '0000000000';
    const date = new Date(isoString);
    if (isNaN(date.getTime())) return '0000000000';
    const year = date.getFullYear().toString().substr(-2);
    const month = String(date.getMonth() + 1).padStart(2, '0');
    const day = String(date.getDate()).padStart(2, '0');
    const hour = String(date.getHours()).padStart(2, '0');
    const minute = String(date.getMinutes()).padStart(2, '0');
    return `${year}${month}${day}${hour}${minute}`;
  }
  /**
   * Batch insert application details with proper error handling.
   * A failed batch is logged and skipped (best effort); insertion continues
   * with the next batch, so the returned count may be less than the input size.
   * @param {Object[]} applicationDetails - Detail documents to insert
   * @param {Object} session - MongoDB session
   * @returns {Promise<{inserted: number}>} Count of successfully inserted records
   */
  async saveApplicationDetails(applicationDetails, session) {
    if (!applicationDetails || applicationDetails.length === 0) {
      return { inserted: 0 };
    }
    const batchSize = this.options.batchSize;
    let totalInserted = 0;
    for (let i = 0; i < applicationDetails.length; i += batchSize) {
      const batch = applicationDetails.slice(i, i + batchSize);
      try {
        const result = await ApplicationDetail.insertMany(batch, {
          ordered: false,
          lean: true,
          session
        });
        totalInserted += result.length;
        env && !env.PRODUCTION && debug(`Inserted batch ${Math.floor(i / batchSize) + 1}: ${result.length} records`);
      } catch (error) {
        debug(`Error inserting batch: ${error.message}`);
        // Continue with next batch on error
      }
    }
    return { inserted: totalInserted };
  }
  /**
   * Get processing statistics (a snapshot of the effective processor options).
   * @returns {{batchSize: number, enableRetryLogic: boolean}}
   */
  getStatistics() {
    return {
      batchSize: this.options.batchSize,
      enableRetryLogic: this.options.enableRetryLogic
    };
  }
}
// Export the processor class (instantiated by callers with optional options).
module.exports = SatLocApplicationProcessor;