'use strict'; const async = require('async'), amqp = require('amqplib/callback_api'), dbConn = require('../helpers/db/connect.js')(), path = require('path'), debug = require('debug')('agm:worker'), glob = require('glob'), fs = require('fs-extra'), csv = require('fast-csv'), unzip = require('extract-zip'), moment = require('moment'), Redis = require('ioredis'), turf = require('@turf/turf'), Job = require('../model/job'), App = require('../model/application'), AppFile = require('../model/application_file'), AppDetail = require('../model/application_detail'), User = require('../model/user'), Pilot = require('../model/pilot'), ObjectId = require('mongodb').ObjectId, utils = require('../helpers/utils'), FILE = require('../helpers/file_constants'), fileHelper = require('../helpers/file_helper'), fileAgNav = require('../helpers/file_storage'), fileKML = require('../helpers/file_kml'), fileShp = require('../helpers/file_shp'), fileSatLog = require('../helpers/file_satlog'), jobUtil = require('../helpers/job_util'), geoUtil = require('../helpers/geo_util'), subUtil = require('../helpers/subscription_util'), { DataUtil, WorkRecord } = require('../helpers/work_record'), { JobUpdateOp, JobStatus } = require('../helpers/job_constants'), CVCST = require('../helpers/convert_constants'), { Errors, RecTypes, UserTypes, AppStatus, AppProStatus, Fields, DEL_APP_IDS, DEFAULT_LANG } = require('../helpers/constants'), { AppError, AppMembershipError, AppAuthError } = require('../helpers/app_error.js'), { SubFields } = require('../model/subscription.js'), env = require('../helpers/env'), errorUtil = require('../helpers/error'); require('../model/crop'); process.setMaxListeners(0); errorUtil.registerUnCaughtProcessErrorsHandler(process, path.join(__dirname, 'job_worker.rlog')); const WkrStatus = Object.freeze({ BROKE: 'broke', CANCELLED: 'cancelled', SKIPPED: 'skipped', DONE: 'done' }); const redis = new Redis({ password: env.REDIS_PWD, showFriendlyErrorStack: true }); const globOps = { nonull: false, nocase: true, dot: true }; const sprayItemsOps = { nonull: false, nocase: true, dot: true, ignore: ['**/{(q[0-9]*.t*|n[0-9]*.t*),n[0-9]*spr+(on|off).+(dbf|prj|shp|shx)}'] }; const uploadPath = env.UPLOAD_DIR; const unzipPath = env.UNZIP_DIR; const jobQueue = env.PRODUCTION ? env.QUEUE_NAME_JOBS : 'dev_jobs'; let amqpConn = null; let mqClosed = false; let jobMainFile, jobUnZipPath, isKmlOrKmz = false; dbConn.on('disconnected', () => { debug('-> MongoDB lost connection -> Going to exit...'); process.exit(1); }); dbConn.on('error', (err) => { debug('-> MongoDB connection error -> Going to exit..., Err: ', err); process.exit(1); }); dbConn.on('connected', () => { if (!amqpConn) start(); }); // Rabbitmq connection. If the connection is closed or fails to be established at all, we will reconnect function start() { const conOps = { protocol: 'amqp', hostname: env.QUEUE_HOST || 'localhost', port: env.QUEUE_PORT || 5672, username: env.QUEUE_USR || 'agmuser', password: env.QUEUE_PWD, vhost: env.QUEUE_VHOST || '/', heartbeat: env.QUEUE_HEARTBEAT || 0, // Will depends on server settings or whichever lower value, 0 will use server default 60s frameMax: 0 }; amqp.connect(conOps, function (err, conn) { if (err) { return process.exit(); } conn.on("error", function (err) { debug(err); return process.exit(); }); conn.on("close", function () { mqClosed = true; return process.exit(); }); amqpConn = conn; process.once('SIGINT', () => { closeOnErr('force_close_SIGINT'); }); whenConnected(); }); } function whenConnected() { debug("[AMQP] Job Worker connected"); mqClosed = false; startWorker(); } // A worker that acks messages only if processed succesfully function startWorker() { debug("Worker started !"); if (!amqpConn) { debug("Ampq connection is null. Please check !"); return; } amqpConn.createChannel(function (err, ch) { if (closeOnErr(err)) return; ch.on("error", function (err) { debug("[AMQP] channel error", err.message); }); ch.on("close", function () { debug("[AMQP] channel closed"); }); ch.prefetch(1); // This tells RabbitMQ not to give more than one message to a worker at a time ch.assertQueue(jobQueue, { durable: true }, (err) => { if (closeOnErr(err)) return; ch.consume(jobQueue, processMsg, { noAck: false }); debug("Worker has started consuming messages..."); }); function processMsg(msg) { if (!msg) return; // Ignore empty message let impMsg; try { impMsg = JSON.parse(msg.content); } catch (err) { debug(err); } if (!impMsg) return; const msgJobId = impMsg.jobId; work(impMsg, (msg.fields && msg.fields.redelivered), (err, result) => { if (mqClosed) { debug("MQ conn already closed -- Skipping..."); return; } let confirmOk = true; try { if (!err) { // Acknowledgement must be sent on the same channel the delivery it is for was received on. if (result.appError || result.wasCancelled) ch.reject(msg, false); else ch.ack(msg); } } catch (rErr) { confirmOk = false; debug('[Work] channel ack/reject error', rErr); closeOnErr(rErr); } finally { const logDone = () => { debug("[x] Done for: ", msg.content.toString()); }; if (result.wasCancelled) { const cleanJobFn = async (impMsg) => { const app = await App.findOne({ _id: ObjectId(impMsg.appId) }); if (!app) return; // The case of import file to create a job => remove that job fully if (app.byImport && undefined === msgJobId && app.jobId) { const job = await Job.findById(app.jobId); if (job) await job.removeFull(); } else { // Only need to remove the application await jobUtil.deleteAppById(result.appId); } await redis.srem(DEL_APP_IDS, result.appId); }; cleanJobFn(impMsg).catch(err => { if (err) debug(err); }).finally(() => { debug(`Task with appId:${impMsg.appId} was cancelled!`); }); } else if (result.wasSkipped) { // Do nothing yet. Normally for marked as deleted applications. They going to be cleaned by a Cleanup Worker logDone(); } else { _cleanup(confirmOk, result.appError); logDone(); } } }); } }); } /** * Clean up files or folders depends on the task processing conditions * @param {boolean} taskAcked Whether the task was acknowledged (done either w/ success or error, and ready to take the next task). * @param {boolean} hasAppError Where there was any application cope errors happened (crashes, unhandled promise exception occured) */ function _cleanup(taskAcked = true, hasAppError = false) { let removeList = []; if (taskAcked && hasAppError) removeList = isKmlOrKmz ? [jobMainFile, path.join(jobUnZipPath, jobMainFile)] : [jobMainFile, jobUnZipPath]; else removeList = isKmlOrKmz ? [path.join(jobUnZipPath, jobMainFile)] : [jobUnZipPath]; if (!utils.isEmptyArray(removeList)) fileHelper.removeFiles(removeList, err => { if (err) debug(err) }); } /** * Calculate offset in hours given GPS time in seconds and time from an Agnav data filename * * @param {any} gpsTimeSeconds * @param {any} fileNameAgNavDT AgNav datetime used in file name ([last digit of the full year][MM][dd][HHmm]) * @returns offset in hours */ function computeGPSTimeOffsetLocalTime(gpsTimeSeconds, fileNameAgNavDT) { const gpsDuration = moment.duration(new Date(1000 * gpsTimeSeconds).toISOString().substring(11, 16)); const localDuration = moment.duration(utils.dateTimePartsFromAgNav(fileNameAgNavDT).time); const diff = localDuration.subtract(gpsDuration); return Number(diff.asHours().toFixed(2)); } function computeStartEndDate(startGPSTimeSecs, endGPSTimeSecs, firstFileAgNavDT, endFileAgNavDT) { const offsetHrs = computeGPSTimeOffsetLocalTime(startGPSTimeSecs, firstFileAgNavDT); const sdate = utils.dateTimePartsFromAgNav(firstFileAgNavDT, 2); const startDate = utils.toUTCDateTime(startGPSTimeSecs, sdate.date); const edate = utils.dateTimePartsFromAgNav(endFileAgNavDT, 2); const endDate = utils.toUTCDateTime(endGPSTimeSecs, edate.date); if (offsetHrs) { startDate.add(offsetHrs, 'hours'); endDate.add(offsetHrs, 'hours'); } if (endDate < startDate) endDate.add(1, 'days'); return { start: startDate, end: endDate }; } function removeAppData(appId, cb) { AppFile.find({ appId: appId }, (err, appFiles) => { if (err) { return cb(err); } async.eachSeries(appFiles, (appFile, cb) => { appFile.remove(cb); }, cb); }); } function wasCancelled(appId, cb) { let ret; canAppProceed(appId) .then(res => ret = res) .catch(err => ret = err) .finally(() => cb && cb(ret)); } async function canAppProceed(appId) { const wasCancelled = await redis.sismember(DEL_APP_IDS, appId); if (wasCancelled) return WkrStatus.CANCELLED; const app = await App.findById(appId, { lean: true }); if (!app || app[Fields.MARKED_DELETE] === true) return WkrStatus.SKIPPED; else if (app && app.status === AppStatus.WAS_CANCELLED) return WkrStatus.CANCELLED; return null; } /** * Import data or job items of the job into db. * * STEPS: * 1. Unzip the arvchive into unzip folder * 2. Create a new Application with status 1 * 3. Read area files and override the job's items (from NO1, PRJ) * 4. Read data files and import to App-Detail collections * 5. When finished importing, update status as done (3 - done, 0 - Error), * * Notes: Now import job items(spray, xcl, waypoint) found within .zip, (kml/kmz): buffer, placemark as well * @param {any} impMsg the queued import message taken from the queue * @param {boolean} redelivered whether the message was redelivered (failed to processed before) * @param {any} cb callback method */ function work(impMsg, redelivered, cb) { debug("Got msg ", impMsg); let areaTypeAndFiles = [], areaFiles = []; let appl = null, appData, hasData = false; let _job, proStatus = AppProStatus.ERROR, hasAppError = false; jobMainFile = path.join(uploadPath, impMsg.file.name); isKmlOrKmz = /.*(.kml|.kmz)+$/i.test(jobMainFile); jobUnZipPath = isKmlOrKmz ? uploadPath : path.join(unzipPath, impMsg.file.name.replace(path.extname(impMsg.file.name), '')); jobUnZipPath = path.join(jobUnZipPath, '/'); async.series([ function (callback) { return wasCancelled(ObjectId(impMsg.appId), callback); }, function (callback) { App.findById(ObjectId(impMsg.appId), (err, data) => { if (err) return callback(err); appl = data; if (!appl) return callback(AppError.create(WkrStatus.BROKE)); else if (appl.status === AppStatus.DONE) return callback(AppError.create(WkrStatus.DONE)); else if (appl[Fields.MARKED_DELETE] === true) return callback(AppError.create(WkrStatus.SKIPPED)); callback(); }); }, function (callback) { return wasCancelled(appl._id, callback); }, function (callback) { if (impMsg.jobId !== undefined) { Job.findById(impMsg.jobId) .populate({ path: 'crop', select: '_id name color', skipInvalidIds: true }) .then(job => { if (job) { _job = job; return callback(); } else { AppError.throw(Errors.JOB_NOT_FOUND); } }) .catch(err => callback(err)); } else callback(); }, function (callback) { return wasCancelled(appl._id, callback); }, function (callback) { // Handle redelivered (in case of crashed during processing the task) message to avoid duplicated data if (redelivered) { if (!utils.isBlank(appl.errorMsg) && errorUtil.isAppErrorMessage(appl.errorMsg)) { return callback(AppError.create(WkrStatus.SKIPPED)); } else if (appl.byImport && !appl.jobId) { return appl.status === AppStatus.CREATED ? callback() : callback(AppError.create()); } else { if (impMsg.updateOp !== JobUpdateOp.XCLS) removeAppData(appl._id, callback); // remove previous fragmented imported data else callback(); // re-process the task } } else callback(); }, function (callback) { return wasCancelled(appl._id, callback); }, function (callback) { if (appl.status !== AppStatus.IN_PROGRESS) { appl.status = AppStatus.IN_PROGRESS; return appl.save(callback); } else callback(); }, function (callback) { return wasCancelled(appl._id, callback); }, function (callback) { if (isKmlOrKmz) { areaFiles.push(jobMainFile); return callback(); } else { async.series([ function (callback) { unzip(jobMainFile, { dir: jobUnZipPath }, (err) => { if (err) return callback(AppError.create(Errors.CORRUPTED_ZIP)); callback(); }); }, function (callback) { if (impMsg.updateOp === JobUpdateOp.DATA_ONLY) return callback(); // Find the files which might contain job items glob(path.join(jobUnZipPath, "**/*.{job,kml,kmz,no1,prj,shp,dbf,agn,dsp,xcl,vfr}"), sprayItemsOps, (err, files) => { if (err) return callback(err); areaFiles = files; if (areaFiles.length === 0) return callback(impMsg.updateOp === JobUpdateOp.OVERWRITE ? null : AppError.create(Errors.ITEMS_NOT_FOUND)); callback(); }); }, ], callback); } }, function (callback) { return wasCancelled(appl._id, callback); }, function (callback) { if (impMsg.updateOp !== JobUpdateOp.DATA_ONLY) areaTypeAndFiles = fileHelper.areasFromList(jobUnZipPath, areaFiles); callback(); }, function (callback) { return wasCancelled(appl._id, callback); }, function (callback) { if (impMsg.updateOp > JobUpdateOp.DATA_ONLY) { if (areaTypeAndFiles.length) { updateJobItems(_job || {}, impMsg, jobUnZipPath, areaTypeAndFiles, impMsg.uid, (err, data) => { if (err) return callback(err); _job = data.job; // Why _job passed by ref does not work any longer ???? if (data.updated) { proStatus = AppProStatus.ERROR + 10; if (!appl.jobId) appl.jobId = impMsg.jobId; if (data.dup) { if (appl.warnMsg) // Update numbers of duplicated areas appl.warnMsg.set('dup', data.dup); else appl.warnMsg = { dup: data.dup }; } } callback(); }); } else callback(!_job ? AppError.create(Errors.ITEMS_NOT_FOUND) : null); } else callback(); }, function (callback) { return wasCancelled(appl._id, callback); }, function (callback) { if (isKmlOrKmz || impMsg.updateOp === JobUpdateOp.XCLS) return callback(); // Not import data for KML/KMZ, XCL files importData(jobUnZipPath, appl._id, _job.toObject(), (err, data) => { if (err) return callback(err); appData = data; hasData = !!(data); callback(); }); } ], function (err) { proStatus += hasData ? AppProStatus.WITH_DATA : AppProStatus.NO_DATA; if (err) { if ((hasAppError = (err instanceof AppError))) { if (WkrStatus.BROKE === err.message) return cb(null, { appError: true }); else if (WkrStatus.DONE === err.message) return cb(null, { appError: false }); else if (WkrStatus.SKIPPED === err.message) return cb(null, { wasSkipped: true }); if (WkrStatus.CANCELLED === err.message) { return cb(null, { appError: hasAppError, wasCancelled: true, appId: impMsg.appId }); } } if (appl !== null) { appl.errorMsg = hasAppError ? err.message : utils.trimStrTo(err.toString(), 100); appl.status = AppStatus.ERROR; } else { return cb(null, { appError: hasAppError }); } } else { if (appl !== null) { appl.status = AppStatus.DONE; if (hasData) { if (utils.isNumber(appData.appRate)) appl.appRate = utils.roundTo(appData.appRate); if (utils.isNumber(appData.totalSprayed)) appl.totalSprayed = appData.totalSprayed * 1E-4; // update and convert from square meters to ha if (utils.isNumber(appData.totalSprLength)) appl.totalSprLength = appData.totalSprLength; // meters if (utils.isNumber(appData.totalTurnTime)) appl.totalTurnTime = appData.totalTurnTime; if (utils.isNumber(appData.totalSprayTime)) appl.totalSprayTime = appData.totalSprayTime; if (utils.isNumber(appData.totalFlightTime)) appl.totalFlightTime = appData.totalFlightTime; if (utils.isNumber(appData.totalSprayMat)) { appl.totalSprayMat = appData.totalSprayMat; appl.totalSprayMatUnit = appData.totalSprayMatUnit; } appl.startDateTime = appData.startDateTime.format('YYYYMMDDTHHmmss'); appl.endDateTime = appData.endDateTime.format('YYYYMMDDTHHmmss'); } } } if (appl !== null) { appl.proStatus = proStatus; appl.updateDate = Date.now(); appl.save() .then(() => { if (hasData && impMsg.jobId) return Job.updateOne({ _id: impMsg.jobId }, { $set: { status: JobStatus.SPRAYED } }); // Update Job Status as Downloaded }) .then(() => cb(null, { appError: hasAppError })) .catch(err => { debug(err); cb(err); }) } }); } function updateJobItems(job, impMsg, filePath, areaTypeAndFiles, uid, cb) { let jobItems = [], anyUpdated = false; let updateMeta = false; // Determine whether to update new job meta from the first found file let userWSettings, meta, dup = 0; let usageLimits = { maxAcres: 0, totalSprAcres: 0 }; let sprZoneColor = 'blue'; const createJob = utils.isEmptyObj(job); async.series([ function (callback) { return wasCancelled(impMsg.appId, callback); }, function (callback) { if (!uid) return callback(); return User.aggregate([ { $match: { _id: ObjectId(uid) } }, { $project: { kind: 1, parent: 1, membership: 1, lang: { $ifNull: ["$lang", DEFAULT_LANG] } } }, { $lookup: { from: "settings", localField: "_id", foreignField: "userId", as: "cfg" } }, { $unwind: { path: "$cfg", "preserveNullAndEmptyArrays": true } }, ]) .exec((err, result) => { if (err) return callback(err); if (result && result.length) { userWSettings = result[0]; if (userWSettings.cfg && userWSettings.cfg.colors && userWSettings.cfg.colors["sprayZone"]) sprZoneColor = userWSettings.cfg.colors.sprayZone; } else return callback(AppError.create(Errors.USER_NOT_FOUND)); callback(); }) }, function (callback) { return wasCancelled(impMsg.appId, callback); }, function (callback) { // Get membership subscription info (only if it is about job modification) and check for the 1st round if (impMsg.updateOp === JobUpdateOp.APPEND || impMsg.updateOp === JobUpdateOp.OVERWRITE) { getUsageLimits(userWSettings) .then(theUsageLimits => { theUsageLimits && (usageLimits = theUsageLimits); callback(); // callback((usageLimits.maxAcres && (usageLimits.totalSprAcres) >= usageLimits.maxAcres) ? Errors.REACHED_AREA_LIMIT : null); }) .catch(err => { callback(err); }) } else callback(); }, function (callback) { if (createJob) { // create new job for the case of not existed updateMeta = true; let isUs = (userWSettings.lang === 'pt') ? false : true; job = new Job({ client: ObjectId(impMsg.clientId), name: utils.normalizeName(impMsg.file.originalName.replace(path.extname(impMsg.file.originalName), '')), measureUnit: isUs, swathWidth: isUs ? 30 : 10, appRate: 10, appRateUnit: isUs ? 1 : 3, }); job.byPuid = userWSettings.kind === UserTypes.APP ? userWSettings._id : userWSettings.parent; new Promise(resolve => resolve({ isPilot: userWSettings.kind === UserTypes.OPERATOR })) .then(value => { if (value.isPilot) return Pilot.findById(ObjectId(uid), '-password', { lean: true }); else return null; }) .then(pilot => { if (pilot) job.operator = pilot._id; // Try reading the first non-empty Qfile for aditional meta-data return utils.execAsync(`find -P "${filePath}" -name "q*.t*" -type f -not -empty | head -n 1`); }) .then(qfile => { if (qfile) return fileAgNav.readQFile_Async(qfile.replace(/\r?\n|\r/g, '')); else return null; }) .then(data => { if (data) { job.flightNumber = data.flightNumber; job.remark = data.remark; // job.crop = data.crob; // May be try to lookup w entities first with case-insensitive ? job.appRate = utils.toNumber(data.appRate, job.appRate); job.appRateUnit = utils.rateStringToCode(data.appRateUnitStr); job.measureUnit = (job.appRateUnit > 2) ? false : true; } callback(); }) .catch(err => callback(err)); } else callback(); }, function (callback) { return wasCancelled(impMsg.appId, callback); }, function (outterCB) { const ops = { sprZoneColor: sprZoneColor, isUS: job.measureUnit, appRate: job.appRate, crop: job.crop }; async.eachSeries(areaTypeAndFiles, (areaFile, esCB) => { // debug("Reading file: ", areaFile); async.series([ function (callback) { return wasCancelled(impMsg.appId, callback); }, function (callback) { switch (areaFile.type) { case FILE.FILE_NO1: case FILE.FILE_PRJ: fileAgNav.readAGN_PRJ(filePath, areaFile, ops, (err, data) => { if (err) { debug(err.stack); if (areaTypeAndFiles.length === 1) return callback(AppError.create(Errors.INVALID_JOB_FILE)); //else Just ignore to move to the next file but log } if (data) { jobItems.push({ type: FILE.FILE_NO1, items: data }); if (updateMeta && !meta && data.meta) { meta = data.meta; } } callback(); }); break; case FILE.FILE_KMZ: case FILE.FILE_KML: fileKML.readKmlKmzToGeoItems(filePath, areaFile, ops, (err, data) => { if (err) { debug(err); if (areaTypeAndFiles.length === 1) return callback(AppError.create(Errors.INVALID_JOB_FILE)); //else Just ignore to move to the next file but log } if (data) jobItems.push({ type: FILE.FILE_KML, items: data }); callback(); }); break; case FILE.FILE_SHP: fileShp.readSHPToGeoItems(filePath, areaFile, ops) .then((data) => { if (data) jobItems.push({ type: FILE.FILE_SHP, items: data }); callback(); }) .catch(err => { if (err) { if (err instanceof AppError) return callback(err); if (areaTypeAndFiles.length === 1) callback(AppError.create(Errors.INVALID_JOB_FILE)); } else callback(); // Ignore error in the case the zip file contains multiple area files }) break; case FILE.FILE_SATLOG_JOB: fileSatLog.readSatLogJob(filePath, areaFile, ops, (err, data) => { if (err) { debug(err); if (areaTypeAndFiles.length === 1) return callback(AppError.create(Errors.INVALID_JOB_FILE)); //else Just ignore to move to the next file but log } if (data) { jobItems.push({ type: FILE.FILE_SATLOG_JOB, items: data }); if (updateMeta && !meta && data.meta) { meta = data.meta; } } callback(); }); break; default: return callback(); } }, function (callback) { if (updateMeta && !meta && areaFile.agn && (areaFile.type === FILE.FILE_SHP || areaFile.type === FILE.FILE_KML || areaFile.type === FILE.FILE_KMZ)) { fileAgNav.readAGN_PRJ(filePath, { area: areaFile.agn, type: FILE.FILE_AGN }, ops, (err, data) => { if (data && data.meta) { meta = data.meta; } callback(); }); } else callback(); } ], (err) => { esCB(err); }); }, (err) => { outterCB(err); }); }, function (callback) { return wasCancelled(impMsg.appId, callback); }, function (callback) { if (!jobItems.length) return callback(impMsg.updateOp === JobUpdateOp.OVERWRITE ? null : AppError.create(Errors.ITEMS_NOT_FOUND)); try { if (updateMeta && meta) { // Update the new job with read measureUnit and swath job.measureUnit = (meta.measureUnit == 1); if (meta.swath > 0) job.swathWidth = meta.swath; } // Collect all items from area files here and do the Job update let sprayAreas = [], xclAreas = [], waypoints = [], bufs = [], places = []; for (let i = 0; i < jobItems.length; i++) { const item = jobItems[i]; if (item.items) { if (!utils.isEmptyArray(item.items.sprayAreas)) sprayAreas = sprayAreas.concat(item.items.sprayAreas); if (!utils.isEmptyArray(item.items.xclAreas)) xclAreas = xclAreas.concat(item.items.xclAreas); if (!utils.isEmptyArray(item.items.waypoints)) waypoints = waypoints.concat(item.items.waypoints); if (item.type === FILE.FILE_KML) { if (!utils.isEmptyArray(item.items.bufs)) bufs = bufs.concat(item.items.bufs); if (!utils.isEmptyArray(item.items.places)) places = places.concat(item.items.places); } } } let oldAreaIds = [], checkRes; //Check for the import item modification options to decide how to update the job's items sprayAreas = jobUtil.cleanAreas(sprayAreas); xclAreas = jobUtil.cleanAreas(xclAreas); const shouldCheckLimits = !!(usageLimits.maxAcres); if (impMsg.updateOp === JobUpdateOp.APPEND) { const allAreas = utils.appendArray(job.sprayAreas, job.excludedAreas); if (!utils.isEmptyArray(sprayAreas)) { checkRes = jobUtil.checkDupAreas(allAreas, sprayAreas); dup += checkRes.dup; job.sprayAreas = utils.appendArray(job.sprayAreas, checkRes.areas); } if (!utils.isEmptyArray(xclAreas)) { checkRes = jobUtil.checkDupAreas(allAreas, xclAreas); dup += checkRes.dup; job.excludedAreas = utils.appendArray(job.excludedAreas, checkRes.areas); } job.waypoints = jobUtil.cleanGeoPoints(utils.appendArray(job.waypoints, waypoints)); job.bufs = utils.appendArray(job.bufs, bufs); job.places = jobUtil.cleanGeoPoints(utils.appendArray(job.places, places)); if (shouldCheckLimits && job.ttSprArea) usageLimits.totalSprAcres -= job.ttSprArea * CVCST.SM2ACR; } else if (impMsg.updateOp === JobUpdateOp.OVERWRITE) { if (job.sprayAreas.length) oldAreaIds = job.sprayAreas.map(a => a._id); job.sprayAreas = sprayAreas; job.excludedAreas = xclAreas; job.waypoints = jobUtil.cleanGeoPoints(waypoints); job.bufs = bufs; job.places = jobUtil.cleanGeoPoints(places); if (shouldCheckLimits && job.ttSprArea) usageLimits.totalSprAcres -= job.ttSprArea * CVCST.SM2ACR; } else if (impMsg.updateOp === JobUpdateOp.XCLS) { let xcl; for (let i = 0; i < sprayAreas.length; i++) { // Convert all found sprayAreas to xcls xcl = jobUtil.sprayToXCL(sprayAreas[i]); if (xcl) xclAreas.push(xcl); } checkRes = jobUtil.checkDupAreas(job.excludedAreas, xclAreas); dup += checkRes.dup; job.excludedAreas = jobUtil.cleanAreas(utils.appendArray(job.excludedAreas, checkRes.areas)); } // Ensure Maximum of Items if (!utils.isEmptyArray(job.sprayAreas) && job.sprayAreas.length >= FILE.MAX_ITEM) job.sprayAreas = job.sprayAreas.slice(0, FILE.MAX_ITEM); if (!utils.isEmptyArray(job.excludedAreas) && job.excludedAreas.length >= FILE.MAX_ITEM) job.excludedAreas = job.excludedAreas.slice(0, FILE.MAX_ITEM); if (!utils.isEmptyArray(job.waypoints) && job.waypoints.length >= FILE.MAX_ITEM) job.waypoints = job.waypoints.slice(0, FILE.MAX_ITEM); if (!utils.isEmptyArray(job.bufs) && job.bufs.length >= FILE.MAX_ITEM) job.bufs = job.bufs.slice(0, FILE.MAX_ITEM); if (!utils.isEmptyArray(job.places) && job.places.length >= FILE.MAX_ITEM) job.places = job.places.slice(0, FILE.MAX_ITEM); const newTTSprSqrMeters = jobUtil.calcTTSprayAreas(job.sprayAreas, job.excludedAreas); if (shouldCheckLimits) { if (usageLimits.totalSprAcres + (newTTSprSqrMeters * CVCST.SM2ACR) >= usageLimits.maxAcres) AppMembershipError.throw(Errors.REACHED_AREA_LIMIT); } job.ttSprArea = newTTSprSqrMeters * CVCST.SM2HA; const updateFn = async (job) => { if (undefined === job._id) { await dbConn.transaction(async (session) => { await job.save({ session }); await App.updateOne({ _id: ObjectId(impMsg.appId) }, { $set: { jobId: job._id } }).session(session); }); } else { await job.save({ validateModifiedOnly: true }); } }; updateFn(job) .then(() => { if (!impMsg.jobId) impMsg.jobId = job._id; if (impMsg.updateOp === JobUpdateOp.OVERWRITE) return jobUtil.deleteAreaLines(oldAreaIds); // Remove grid lines of previous areas (before sprayAreas were overriten) }) .then(() => { anyUpdated = true; callback(); }) .catch(err => { debug(err.stack); return callback(err); }); } catch (err) { callback(err); } } ], function (err) { jobItems = null; if (err) { debug('Error while updating Job Item', err); cb(err); } else { cb(null, { job: job, updated: anyUpdated, dup: dup }); } }); } async function getUsageLimits(user) { if (!user || (user.kind !== UserTypes.APP) && !user.parent) return user; try { let memUser = user, usageLimits = { maxAcres: 0, totalSprAcres: 0 }; if (UserTypes.APP !== memUser.kind && memUser.parent) { memUser = await User.findById({ parent: memUser.parent }); } if (!memUser) AppAuthError.throw(); const pkgSub = subUtil.getPkgSubfromUserInfo(memUser); usageLimits.maxAcres = subUtil.getSubMetaField(pkgSub, SubFields.MAX_ACRES) || 0; if (usageLimits.maxAcres) { usageLimits.totalSprAcres = await subUtil.calcTotalAreaByUser(memUser._id, pkgSub.periodStart, pkgSub.periodEnd) * CVCST.HA2ACR; } return usageLimits; } catch (err) { throw err; } } function importData(dataPath, appId, job, cb) { let appData, totalSprays = 0, totalSprLength = 0, avgRates = [], totalTurnTime = 0, totalSprayTime = 0, totalFlightTime = 0, totalSprMats = 0, dataFiles = [], sprMatsUnit; const importInfo = []; const begin = Date.now(); // DEBUG - Measering total import data time async.series([ function (callback) { return wasCancelled(appId, callback); }, function (callback) { glob(path.join(dataPath, "**/{n*[0-9]*\.t[0-9]*,n*[0-9]*?(spr)@(on|off)*\.dbf,*\.asc}"), globOps, (err, files) => { if (err || files.length === 0) { return callback(); } // Classify found data files into known data format file entries let match, file, typedFiles = []; for (let i = 0; i < files.length; i++) { file = files[i]; let basename = path.basename(file), agn; // AG-NAV data nt files if ((match = basename.match(/^n(\d{7})(-\d+)?.*\.t(\d{1,2})$/i))) { agn = match.length >= 3 ? match[1] + match[3] + (match[2] || "") : match[1] + match[2]; typedFiles.push({ type: FILE.DATA_AGNAV, agn: agn, file: file }); // ESRI Shape spray on/off files } else if ((match = basename.match(/^n(\d{9})(-\d+)?.*spr(?:on|off).*\.dbf$/i))) { agn = match[1]; if (match[2] && match[2].startsWith('-')) agn += match[2]; typedFiles.push({ type: FILE.DATA_SHAPE, agn: agn, file: file }); // SATLOG exported ascii data files } else if ((match = basename.match(/^.*.asc$/i))) { const stats = fs.statSync(file); const m = stats && stats.mtime ? moment.utc(stats.mtime) : moment.utc(); // Note: might think about updating this later from the data if needed agn = m.format('YYMMDDHHmm').substring(1); typedFiles.push({ type: FILE.DATA_SALOG, agn: agn, file: file }); } } // fs.writeFileSync('./data.json', JSON.stringify(typedFiles) , 'utf-8'); if (typedFiles.length) { typedFiles.sort(utils.dynamicSort('agn')); // Sort them in ascending datetime (from file names) order // Pick non-SHAPE data file items dataFiles = utils.appendArray(dataFiles, typedFiles.filter(it => it.type !== FILE.DATA_SHAPE).map(it => [it])); // Pick SHAPE spray-on and off pairs or spray-on data file items only let i = 0, type2Files = typedFiles.filter(it => it.type === FILE.DATA_SHAPE); while (type2Files.length > 0) { let item = type2Files[i]; type2Files.splice(i, 1); let m = item.file.match(/^(.*n(?:\d{9}).*spr)(on|off).*\.dbf$/i); item['sprayOn'] = (m[2].toLocaleLowerCase() === 'on'); let find = item['sprayOn'] ? 'off' : 'on'; let j = type2Files.length - 1; let foundPair = false; while (type2Files.length && j >= 0) { let regex = new RegExp(`^${m[1]}${find}.*.dbf$`, 'i'); if (regex.test(type2Files[j].file) && item.agn === type2Files[j].agn) { type2Files[j]['sprayOn'] = !item['sprayOn']; dataFiles.push([item, type2Files[j]]); type2Files.splice(j, 1); foundPair = true; break; } j--; } if (!foundPair) { if (/^.*n\d{9}.*spron.*\.dbf$/i.test(item.file)) dataFiles.push([item]); } } } return callback(); }); }, function (callback) { return wasCancelled(appId, callback); }, function (callback) { if (!dataFiles.length) return callback(); // debug("Files: ", dataFiles); async.eachSeries(dataFiles, function (fileItems, callback) { async.series([ function (callback) { wasCancelled(appId, callback) }, function (callback) { importDataFiles(fileItems, appId, job, (err, data) => { // debug("Done read for: ", fileItems); if (data) { importInfo.push({ agn: fileItems[0].agn, info: data }); if (utils.isNumber(data.totalSprayed)) totalSprays += data.totalSprayed; if (utils.isNumber(data.totalSprLength)) totalSprLength += data.totalSprLength; if (utils.isNumber(data.turnTime)) totalTurnTime += data.turnTime; if (utils.isNumber(data.turnTime)) totalSprayTime += data.sprayTime; if (utils.isNumber(data.totalTime)) totalFlightTime += data.totalTime; if (data.avgRate) avgRates.push(data.avgRate); if (utils.isNumber(data.totalSprayMat)) { totalSprMats += data.totalSprayMat; sprMatsUnit = data.totalSprayMatUnit; // Asssume that all material unit from files are the same } } callback(); }); } ], callback); }, err => { // If any of the saves produced an error, err would equal that error debug(`All ${dataFiles.length} files Imported.`); if (err) callback(err); else callback(); }); }, function (callback) { return wasCancelled(appId, callback); }, function (callback) { if (!importInfo.length) return callback(); let first = importInfo[0]; let last = importInfo[importInfo.length - 1]; // Compute application start and end time const startendDate = computeStartEndDate(first.info.firstTime, last.info.lastTime, first.agn, last.agn); appData = { startDateTime: startendDate.start, endDateTime: startendDate.end, appRate: avgRates.length ? avgRates.reduce(function (a, b) { return Number(a) + Number(b); }) / avgRates.length : 0, totalSprayed: totalSprays, totalSprLength: totalSprLength, totalSprayTime: totalSprayTime, totalTurnTime: totalTurnTime, totalFlightTime: totalFlightTime, totalSprayMat: totalSprMats, totalSprayMatUnit: sprMatsUnit } const duration = Date.now() - begin; debug('Total (data): %dms', duration); callback(); } ], (err) => { if (err) { return cb(err); } cb(null, appData); }); } function importDataFiles(fileItems, appId, job, cb) { let appFile, firstTime, lastTime, totalSprMats = 0, sprMatsUnit;; let importInfo = { firstTime: 0, lastTime: 0, turnTime: 0, sprayTime: 0, totalTime: 0 }; let fileName = path.basename(fileItems[0].file), fileMeta, hasData = false; const dataType = fileItems[0].type; if (fileItems.length > 1 && dataType === FILE.DATA_SHAPE) { fileName = fileName.replace(new RegExp("(on|off)", "i"), "onoff"); fileItems.sort(utils.dynamicSort('-sprayOn')); // Make spray-on file first in the list } async.series([ function (callback) { // Read q file if exists for AGNAV binary data const file = fileItems[0]; if (file.type === FILE.DATA_AGNAV || file.type === FILE.DATA_SHAPE) { let qfilePath = path.join(path.dirname(file.file), 'q' + (file.type === FILE.DATA_AGNAV ? path.basename(file.file).slice(1) : file.agn)); fileAgNav.readQFile(qfilePath, (err, meta) => { fileMeta = meta; callback(); }); } else { callback(); } }, function (callback) { const fileObj = { appId: appId, name: fileName, agn: fileItems[0].agn }; if (utils.isEmptyObj(fileMeta)) { fileObj.note = "NO_QFILE"; fileMeta = { fcType: 'none', appRate: job.appRate, rateUnit: job.appRateUnit, hasQfile: false }; } else { fileMeta.hasQfile = true; } fileObj.meta = fileMeta; appFile = new AppFile(fileObj); appFile.save(err => { if (err) return callback(err); callback(); }); }, function (callback) { async.eachSeries(fileItems, function (dataFile, callback) { switch (dataFile.type) { case FILE.DATA_AGNAV: readNTFile(dataFile.file, fileMeta, appFile._id, (err, data) => { if (err) return callback(err); importInfo = data; if (importInfo && !utils.isEmptyArray(importInfo.records)) { firstTime = importInfo.records[0].gpsTime; lastTime = importInfo.records[importInfo.records.length - 1].gpsTime; if (utils.isNumber(data.totalSprayMat)) { totalSprMats += data.totalSprayMat; sprMatsUnit = data.totalSprayMatUnit; } } callback(); }); break; case FILE.DATA_SHAPE: readShapeDataFile(dataFile, fileMeta, appFile._id, (err, data) => { if (err) return callback(err); if (data) { if (dataFile['sprayOn']) { importInfo.totalSprayed = data.totalSprayed; importInfo.avgRate = data.avgRate; if (utils.isNumber(data.totalSprayMat)) { totalSprMats += data.totalSprayMat; sprMatsUnit = data.totalSprayMatUnit; } } if (!utils.isEmptyArray(data.records)) { if (utils.isEmptyArray(importInfo.records)) { firstTime = data.records[0].gpsTime; lastTime = data.records[data.records.length - 1].gpsTime; } else { if (data.records[0].gpsTime < firstTime) firstTime = data.records[0].gpsTime; if (data.records[data.records.length - 1].gpsTime > lastTime) lastTime = data.records[data.records.length - 1].gpsTime; } } importInfo.records = utils.appendArray(importInfo.records, data.records); } // else if (!dataFile['sprayOn']) { // // Skip the spray-off if no sprayon file // return callback(new Error(Errors.DATA_NO_SPRAYON)); // } callback(); }); break; case FILE.DATA_SALOG: readSatLogAsc(dataFile.file, appFile._id, (err, data) => { if (err) return callback(err); importInfo = data; if (importInfo && !utils.isEmptyArray(importInfo.records)) { firstTime = importInfo.records[0].gpsTime; lastTime = importInfo.records[importInfo.records.length - 1].gpsTime; // Update agn from the Date and time from the 1st item if (importInfo.records[0]['Date']) { let fileDT = moment(`${new Date().getFullYear().toString().substring(0, 2)}${importInfo.records[0]['Date']} ${importInfo.records[0]['Time']}`, 'YYYYMMDD HH:mm:ss'); if (fileDT.isValid()) { fileItems[0].agn = fileDT.format('YYMMDDHHmm').substring(1); } } } callback(); }); break; default: return callback(); } }, err => { // Ensure skipping the loop normally incase of no spray-on data if (err && err.message == Errors.DATA_NO_SPRAYON) callback(); else callback(err); }) // eachSeries }, // Empty file ? => remove the created appFile entry function (callback) { hasData = (importInfo && !utils.isEmptyArray(importInfo.records)); if (appFile && !hasData) { appFile.remove(callback); } else callback(); }, // Save records of the file to db function (callback) { if (!hasData) return callback(); // Sort spray-on and off data by gspTime (provided data in the two files has been already compensated a day after passing midnight) if (fileItems.length > 1 && dataType === FILE.DATA_SHAPE) importInfo.records.sort(utils.dynamicSort('gpsTime', true)); const chunks = utils.chunkArray(importInfo.records, 1000); async.eachSeries(chunks, (chunk, cb) => { // Use series to ensure correct insert order of the records AppDetail.insertMany(chunk, { rawResult: true, ordered: true }, (err) => { if (err) return cb(err); cb(); }); }, (err) => { // err & console.log(err); callback(err); }); }, function (callback) { if (!hasData) return callback(); /* Calculate turn time for each file or each pair of shape on/off files Turn Time: total turn times after each spray line. Counting start from spray OFF the previous line to spray ON on the next line (skip the segments within the same line) */ let turnTime = { line: null, at: null, nextOff: false, total: 0 }, timeDif = 0, totalSprTime = 0, totalTime = 0; let prevTime = -999, prevSprTime = -999; let record; for (let i = 0; i < importInfo.records.length; i++) { record = importInfo.records[i]; // Calculate total flight time if (prevTime != -999 && prevTime !== record.gpsTime) { timeDif = record.gpsTime - prevTime; if (timeDif < 0) { if (Math.abs(timeDif) >= 80000) timeDif = (86400 - prevTime) + record.gpsTime; } if (timeDif > 0 && timeDif <= 120) totalTime += timeDif; } prevTime = record.gpsTime; // Calculate spray time (secs) if (record.sprayStat > 0) { if (prevSprTime != -999 && record.sprayStat !== 3) { timeDif = record.gpsTime - prevSprTime; if (timeDif < 0) { if (Math.abs(timeDif) >= 80000) timeDif = (86400 - prevSprTime) + record.gpsTime; } if (timeDif > 0 && timeDif <= 120) totalSprTime += timeDif; } prevSprTime = record.gpsTime; } if (fileItems[0].type !== FILE.DATA_SALOG) { // Calculate turn time (secs) if (null === turnTime.line) { if (!record.sprayStat) { turnTime.line = record.llnum; turnTime.at = record.gpsTime; } } else { if (turnTime.line != record.llnum) { if (record.sprayStat) { timeDif = record.gpsTime - turnTime.at; if (timeDif < 0) { if (Math.abs(timeDif) >= 80000) timeDif = (86400 - turnTime.at) + record.gpsTime; } if (timeDif >= 5 && timeDif <= 120) turnTime.total += timeDif; turnTime.line = record.llnum; turnTime.nextOff = true; } } else { if (!record.sprayStat && turnTime.nextOff) { // Mark start for the next turn turnTime.at = record.gpsTime; turnTime.nextOff = false; } else if (record.sprayStat) { turnTime.nextOff = true; } } } } } importInfo.turnTime = turnTime.total; importInfo.sprayTime = totalSprTime; importInfo.totalTime = totalTime; callback(); }, function (callback) { if (hasData && appFile) { if (utils.isNumber(importInfo.totalSprayed)) appFile.totalSprayed = importInfo.totalSprayed * 1E-4; // m2 to ha if (utils.isNumber(importInfo.turnTime)) appFile.totalTurnTime = importInfo.turnTime; if (utils.isNumber(importInfo.sprayTime)) appFile.totalSprayTime = importInfo.sprayTime; if (utils.isNumber(importInfo.totalTime)) appFile.totalFlightTime = importInfo.totalTime; if (utils.isNumber(importInfo.totalSprLength)) appFile.totalSprLength = importInfo.totalSprLength; if (totalSprMats) { appFile.totalSprayMat = totalSprMats; appFile.totalSprayMatUnit = sprMatsUnit; } appFile.save(callback); } else callback(); } ], (err) => { if (err) return cb(err); if (importInfo && !utils.isEmptyArray(importInfo.records)) { importInfo.firstTime = firstTime; importInfo.lastTime = lastTime; importInfo.totalSprayMat = totalSprMats; importInfo.totalSprayMatUnit = sprMatsUnit; delete importInfo.records; gc(); return cb(null, importInfo); } else return cb(); }); } function readNTFile(file, fileMeta, fileId, cb) { let binBuf, records = []; let sprayedSeg = 0, totalSprays = 0, totalSprMats = 0, sprMatsUnit, totalAppRates = 0, totalSprayRecs = 0; let prevUTM_X, prevUTM_Y, prevSwath, prevLine, prevStat = 0; async.series([ function (callback) { fs.readFile(file, (err, data) => { if (err) return callback(err); binBuf = data; callback(); }); }, function (callback) { if (!binBuf || !Buffer.isBuffer(binBuf)) return callback(); let latlon, startIdx = 0, timeOffset = 0, appliedRate; const rateInfo = utils.rateInfoFromFileMeta(fileMeta, RecTypes.AGN_BIN_LQD), recType = rateInfo.recType; while (binBuf.length - startIdx >= FILE.AGN_PACK_SIZE) { if (DataUtil.isValidAgn(binBuf, startIdx)) { const record = DataUtil.readAgnBinary(binBuf, startIdx, recType); if (!record) continue; if (record.type === RecTypes.AGN_AMS) { if (records.length) { // Copy AMS record fields to the last main record records[records.length - 1] = DataUtil.mergeAgnAms(records[records.length - 1], record); } } else { if (record.timeAdv > 0.0) { // Shift the lat lon coordinate according to time Advance (system lag) latlon = geoUtil.projectLatLong(record.lat, record.lon, record.grSpeed, record.head, record.timeAdv); record.lat = latlon.lat; record.lon = latlon.lon; } record["fileId"] = fileId; records.push(record); // For compensation with a day when gpsTime passing mid-night (rolloff) if (records.length > 1 && timeOffset === 0 && ((records[records.length - 1].gpsTime - records[records.length - 2].gpsTime) < 0 && Math.abs(records[records.length - 1].gpsTime - records[records.length - 2].gpsTime) >= 80000)) timeOffset = 86400; record.adjustGpsTime(timeOffset); if (record.sprayStat > 0) { ({ appliedRate, sprMatsUnit } = getAppliedRate(record, rateInfo, rateInfo.recType === RecTypes.AGN_BIN_LQD)); if (record.lhaReq > 0) { totalSprayRecs++; totalAppRates += record.lhaReq; } if (record.sprayStat === 3) { prevUTM_X = record.utmX; prevUTM_Y = record.utmY; prevSwath = record.swath; prevLine = record.llnum; prevStat = record.sprayStat; } else { if (prevStat > 0 && prevLine === record.llnum) { sprayedSeg = Math.hypot(record.utmX - prevUTM_X, record.utmY - prevUTM_Y) * prevSwath; if (sprayedSeg) { totalSprays += sprayedSeg; if (appliedRate > 0) { totalSprMats += (sprayedSeg * CVCST.SM2HA) * appliedRate; } } } prevUTM_X = record.utmX; prevUTM_Y = record.utmY; prevSwath = record.swath; prevLine = record.llnum; prevStat = record.sprayStat; } } } } startIdx += FILE.AGN_PACK_SIZE; } binBuf = null; callback(); } ], err => { if (err) return cb(err); const fileDataInfo = { records: records, totalSprayMat: totalSprMats, totalSprayMatUnit: sprMatsUnit, totalSprayed: totalSprays, avgRate: totalSprayRecs > 0 ? totalAppRates / totalSprayRecs : 0 } // gc(); cb(null, fileDataInfo); }); } /** * Determine the applied rate based on metadata from file or from recorded data record * @param {*} record * @param {*} rateInfo * @param {*} isLiquid * @returns { appliedRate, sprMatsUnit } in Metric: L/Ha or Kg/Ha */ function getAppliedRate(record, rateInfo, isLiquid) { let appliedRate, sprMatsUnit; if (rateInfo && rateInfo.appRate && (!rateInfo.useFC || !record.lminApp)) { appliedRate = rateInfo.appRate; if (rateInfo.rateUnit === 0) { appliedRate = appliedRate * CVCST.OZPA2LPHA; sprMatsUnit = 3; // L/Ha } else { sprMatsUnit = rateInfo.rateUnit; // Convert to metric rate to store to db later const metricRate = utils.toMetricRate(appliedRate, sprMatsUnit); appliedRate = metricRate.value; sprMatsUnit = metricRate.unit; } } else { if (isLiquid) { appliedRate = utils.appRateFromFlowRate(record.lminApp, record.swath, record.grSpeed); sprMatsUnit = 3; // L/Ha } else { appliedRate = record.lminApp; sprMatsUnit = 4; // Kg/Ha } } return { appliedRate, sprMatsUnit }; } function readShapeDataFile(dataFile, fileMeta, fileId, cb) { let sprayedSeg = 0, totalAppRates = 0, totalSprMats = 0, sprMatsUnit, totalSprays = 0, totalSprayRecs = 0; let prevUTM_X, prevUTM_Y, prevSwath, prevLine; let records = [], latlon; fileShp.readDBF4Items(dataFile.file, ["GPSTIME", "LATITUDE", "LONGITUDE", "GRNDSPEED"], (err, items) => { if (err) { debug(`Error in readShapeDataFile(): ${dataFile}`, err) cb(err); return; } let timeOffset = 0, appliedRate; const rateInfo = utils.rateInfoFromFileMeta(fileMeta, RecTypes.AGN_SHP), recType = rateInfo.recType; for (let i = 0; i < items.length; i++) { const record = DataUtil.readShpRecord(items[i]); if (!record) continue; if (record.timeAdv > 0.0) { // Shift the lat lon coordinate according to time Advance (system lag) latlon = geoUtil.projectLatLong(record.lat, record.lon, record.grSpeed, record.head, record.timeAdv); record.lat = latlon.lat; record.lon = latlon.lon; } record["fileId"] = fileId; records.push(record); // For compensation with a day when gpsTime passing mid-night (rolloff) if (i > 0 && records.length > 1 && timeOffset === 0 && ((records[records.length - 1].gpsTime - records[records.length - 2].gpsTime) < 0 && Math.abs(records[records.length - 1].gpsTime - records[records.length - 2].gpsTime) >= 80000)) record.adjustGpsTime(timeOffset); // FOR DEDUG // fs.appendFileSync(`./${fileName}.csv`, `${appDetail.gpsTime}, ${appDetail.sprayStat}, ${appDetail.llnum}, ${appDetail.lat}, ${appDetail.lon}` + endOfLine); if (dataFile['sprayOn']) { if (record.lhaReq > 0) { totalSprayRecs++; totalAppRates += record.lhaReq; } if (record.sprayStat === 3) { prevUTM_X = record.utmX; prevUTM_Y = record.utmY; prevSwath = record.swath; prevLine = record.llnum; } else { ({ appliedRate, sprMatsUnit } = getAppliedRate(record, rateInfo, rateInfo.recType === RecTypes.AGN_SHP)); if (prevLine === record.llnum) { sprayedSeg = Math.hypot(record.utmX - prevUTM_X, record.utmY - prevUTM_Y) * prevSwath; if (sprayedSeg) { totalSprays += sprayedSeg; if (appliedRate > 0) { totalSprMats += (sprayedSeg * CVCST.SM2HA) * appliedRate; } } } prevUTM_X = record.utmX; prevUTM_Y = record.utmY; prevSwath = record.swath; prevLine = record.llnum; } } } let fileDataInfo = null; if (records.length) { fileDataInfo = { records: records, totalSprayMat: totalSprMats, totalSprayMatUnit: sprMatsUnit, totalSprayed: totalSprays, avgRate: totalSprayRecs > 0 ? totalAppRates / totalSprayRecs : 0 } } items = null; return cb(null, fileDataInfo); }); } function readSatLogAsc(dataFile, fileId, cb) { /* const hdrs = ['Time', undefined, undefined, 'Alt', undefined, undefined, undefined, undefined, undefined, 'Date', undefined, 'DOP', undefined, undefined, undefined, 'Hdg', 'Lat', 'Lon', 'RHumi', 'Speed', 'Spray', undefined, 'SU', undefined, 'Temperature', undefined, undefined, undefined, undefined, 'X-Track']; */ let records = [], totalSprLength = 0, currStat = -999, prevStat = -999, curLonLat = turf.point([0, 0]), prevLonLat = turf.point([0, 0]), segLength = 0; fs.createReadStream(dataFile) .pipe(csv.parse({ headers: true, ignoreEmpty: true })) .on('error', err => { if (cb) cb(err); }) .on('data', rowRec => { const record = new WorkRecord(); record.readSLAscRecord(rowRec); if (record.gpsTime && /(?:[01]\d|2[0123]):(?:[012345]\d):(?:[012345]\d)/.test(record.gpsTime)) { record.decodeSL(); record["fileId"] = fileId; records.push(record); curLonLat.geometry.coordinates = [record.lon, record.lat]; currStat = record.sprayStat; if (prevStat != -999) { if (prevStat > 0 && currStat > 0 || record.sprayStat != prevStat) { segLength = turf.distance(prevLonLat, curLonLat, { units: "meters" }); if (segLength <= 1000) totalSprLength += segLength; } } prevStat = record.sprayStat; prevLonLat.geometry.coordinates = [curLonLat.lon, curLonLat.lat]; } }) .on('end', rowCount => { curLonLat = prevLonLat = null; const fileDataInfo = { records: records, totalSprayed: 0, totalSprLength: totalSprLength, avgRate: 0 } if (cb) cb(null, fileDataInfo); }); } function closeOnErr(err) { if (!err) return false; try { debug("[AMQP] error:", err); amqpConn.close(); } catch (error) { } return true; } function gc() { if (global.gc) global.gc(); }