1611 lines
59 KiB
JavaScript
1611 lines
59 KiB
JavaScript
'use strict';
|
|
|
|
const
|
|
util = require('util'),
|
|
async = require('async'),
|
|
amqp = require('amqplib/callback_api'),
|
|
dbConn = require('../helpers/db/connect.js')(),
|
|
path = require('path'),
|
|
debug = require('debug')('agm:worker'),
|
|
glob = require('glob'),
|
|
fs = require('fs-extra'),
|
|
csv = require('fast-csv'),
|
|
unzip = util.callbackify(require('extract-zip')),
|
|
moment = require('moment'),
|
|
Redis = require('ioredis'),
|
|
turf = require('@turf/turf'),
|
|
Job = require('../model/job'),
|
|
App = require('../model/application'),
|
|
AppFile = require('../model/application_file'),
|
|
AppDetail = require('../model/application_detail'),
|
|
User = require('../model/user'),
|
|
Pilot = require('../model/pilot'),
|
|
ObjectId = require('mongodb').ObjectId,
|
|
utils = require('../helpers/utils'),
|
|
FILE = require('../helpers/file_constants'),
|
|
fileHelper = require('../helpers/file_helper'),
|
|
fileAgNav = require('../helpers/file_storage'),
|
|
fileKML = require('../helpers/file_kml'),
|
|
fileShp = require('../helpers/file_shp'),
|
|
fileSatLog = require('../helpers/file_satlog'),
|
|
jobUtil = require('../helpers/job_util'),
|
|
geoUtil = require('../helpers/geo_util'),
|
|
subUtil = require('../helpers/subscription_util'),
|
|
{ DataUtil, WorkRecord } = require('../helpers/work_record'),
|
|
{ JobUpdateOp, JobStatus } = require('../helpers/job_constants'),
|
|
CVCST = require('../helpers/convert_constants'),
|
|
{ Errors, RecTypes, UserTypes, AppStatus, AppProStatus, Fields, DEL_APP_IDS, DEFAULT_LANG } = require('../helpers/constants'),
|
|
{ AppError, AppMembershipError, AppAuthError } = require('../helpers/app_error.js'),
|
|
{ SubFields } = require('../model/subscription.js'),
|
|
env = require('../helpers/env'),
|
|
errorHandler = require('error-handler').errorHandler,
|
|
errorCommon = require('error-handler').common;
|
|
|
|
|
|
require('../model/crop');
|
|
|
|
// Unlimited listeners: the worker attaches several process/connection handlers over its lifetime.
process.setMaxListeners(0);
// Route uncaught process errors to a rotating log file next to this script (best-effort; errorHandler may be absent).
errorHandler && (errorHandler.registerUnCaughtProcessErrorsHandler(process, path.join(__dirname, 'job_worker.rlog')));

// Terminal worker outcomes, propagated through AppError messages (see work()'s final handler).
const WkrStatus = Object.freeze({
    BROKE: 'broke',
    CANCELLED: 'cancelled',
    SKIPPED: 'skipped',
    DONE: 'done'
});

// Shared Redis client; DEL_APP_IDS set membership marks applications cancelled mid-flight.
const redis = new Redis({ password: env.REDIS_PWD, showFriendlyErrorStack: true });

// glob options for data files (case-insensitive, include dotfiles, no literal fallback on no match).
const globOps = { nonull: false, nocase: true, dot: true };
// Same, but excluding AG-NAV data/log files (q*/n* tracks and spray on/off shapes) when scanning for job items.
const sprayItemsOps = { nonull: false, nocase: true, dot: true, ignore: ['**/{(q[0-9]*.t*|n[0-9]*.t*),n[0-9]*spr+(on|off).+(dbf|prj|shp|shx)}'] };
const uploadPath = env.UPLOAD_DIR;
const unzipPath = env.UNZIP_DIR;
// Non-production environments consume from a fixed dev queue.
const jobQueue = env.PRODUCTION ? env.QUEUE_NAME_JOBS : 'dev_jobs';

let amqpConn = null;
let mqClosed = false;

// Per-task state shared between work() and _cleanup(); safe only because the
// channel prefetch is 1, so a single task is processed at a time.
let jobMainFile, jobUnZipPath, isKmlOrKmz = false;

// Any MongoDB connectivity loss is fatal: exit and let the supervisor restart the worker.
dbConn.on('disconnected', () => {
    debug('-> MongoDB lost connection -> Going to exit...');
    process.exit(1);
});
dbConn.on('error', (err) => {
    debug('-> MongoDB connection error -> Going to exit..., Err: ', err);
    process.exit(1);
});
dbConn.on('connected', () => {
    if (!amqpConn)
        start();
});
|
|
|
|
// Rabbitmq connection. If the connection is closed or fails to be established at all, we will reconnect
|
|
/**
 * Establish the RabbitMQ connection and start consuming.
 * Any connection failure is fatal: the process exits (non-zero) and is expected
 * to be restarted by a supervisor, mirroring the MongoDB handlers above.
 */
function start() {
    const conOps = {
        protocol: 'amqp',
        hostname: env.QUEUE_HOST || 'localhost',
        port: env.QUEUE_PORT || 5672,
        username: env.QUEUE_USR || 'agmuser',
        password: env.QUEUE_PWD,
        vhost: env.QUEUE_VHOST || '/',
        heartbeat: env.QUEUE_HEARTBEAT || 0, // Will depends on server settings or whichever lower value, 0 will use server default 60s
        frameMax: 0
    };

    amqp.connect(conOps, function (err, conn) {

        if (err) {
            // BUG FIX: previously exited silently with code 0 on a failed connect;
            // log the error and exit non-zero so supervisors see the failure.
            debug('[AMQP] connect error', err);
            return process.exit(1);
        }
        conn.on("error", function (err) {
            debug(err);
            // Connection-level error is a failure state -> non-zero exit.
            return process.exit(1);
        });
        conn.on("close", function () {
            // Mark closed so in-flight work() callbacks skip ack/reject on a dead channel.
            mqClosed = true;
            return process.exit();
        });

        amqpConn = conn;
        process.once('SIGINT', () => {
            closeOnErr('force_close_SIGINT');
        });
        whenConnected();
    });
}
|
|
|
|
/** Post-connect hook: reset the closed flag and begin consuming jobs. */
function whenConnected() {
    mqClosed = false;
    debug("[AMQP] Job Worker connected");
    startWorker();
}
|
|
|
|
// A worker that acks messages only if processed succesfully
|
|
/**
 * Create a channel and consume the job queue with manual acknowledgements.
 * prefetch(1) guarantees one in-flight task, which the module-level
 * jobMainFile/jobUnZipPath/isKmlOrKmz globals depend on.
 */
function startWorker() {
    debug("Worker started !");
    if (!amqpConn) {
        debug("Ampq connection is null. Please check !");
        return;
    }
    amqpConn.createChannel(function (err, ch) {
        if (closeOnErr(err)) return;

        ch.on("error", function (err) {
            debug("[AMQP] channel error", err.message);
        });

        ch.on("close", function () {
            debug("[AMQP] channel closed");
        });

        ch.prefetch(1); // This tells RabbitMQ not to give more than one message to a worker at a time

        ch.assertQueue(jobQueue, { durable: true }, (err) => {
            if (closeOnErr(err)) return;
            ch.consume(jobQueue, processMsg, { noAck: false });
            debug("Worker has started consuming messages...");
        });

        /**
         * Handle one queued import message: parse, run work(), then
         * ack/reject and clean up according to the outcome.
         * @param {object} msg raw amqplib delivery
         */
        function processMsg(msg) {
            if (!msg) return; // Ignore empty message

            let impMsg;
            try {
                impMsg = JSON.parse(msg.content);
            } catch (err) {
                debug(err); // Unparseable payload: drop silently (no ack -> broker redelivers)
            }
            if (!impMsg) return;

            const msgJobId = impMsg.jobId;
            work(impMsg, (msg.fields && msg.fields.redelivered), (err, result) => {
                // BUG FIX: work() may invoke this callback as cb(err) with no result
                // (e.g. its appl.save().catch path); the finally block below reads
                // result.wasCancelled unconditionally, which would throw. Normalize.
                result = result || {};

                if (mqClosed) {
                    debug("MQ conn already closed -- Skipping...");
                    return;
                }
                let confirmOk = true;
                try {
                    if (!err) {
                        // Acknowledgement must be sent on the same channel the delivery it is for was received on.
                        if (result.appError || result.wasCancelled)
                            ch.reject(msg, false); // do not requeue: app-level failure/cancel
                        else
                            ch.ack(msg);
                    }
                }
                catch (rErr) {
                    confirmOk = false;
                    debug('[Work] channel ack/reject error', rErr);
                    closeOnErr(rErr);
                }
                finally {
                    const logDone = () => { debug("[x] Done for: ", msg.content.toString()); };
                    if (result.wasCancelled) {
                        // Remove the data created for a cancelled task.
                        const cleanJobFn = async (impMsg) => {
                            const app = await App.findOne({ _id: ObjectId(impMsg.appId) });
                            if (!app) return;
                            // The case of import file to create a job => remove that job fully
                            if (app.byImport && undefined === msgJobId && app.jobId) {
                                const job = await Job.findById(app.jobId);
                                if (job) await job.removeFull();
                            } else {
                                // Only need to remove the application
                                await jobUtil.deleteAppById(result.appId);
                            }
                            await redis.srem(DEL_APP_IDS, result.appId);
                        };

                        cleanJobFn(impMsg).catch(err => {
                            if (err) debug(err);
                        }).finally(() => {
                            debug(`Task with appId:${impMsg.appId} was cancelled!`);
                        });
                    } else if (result.wasSkipped) {
                        // Do nothing yet. Normally for marked as deleted applications. They going to be cleaned by a Cleanup Worker
                        logDone();
                    } else {
                        _cleanup(confirmOk, result.appError);
                        logDone();
                    }
                }
            });
        }
    });
}
|
|
|
|
/**
|
|
* Clean up files or folders depends on the task processing conditions
|
|
* @param {boolean} taskAcked Whether the task was acknowledged (done either w/ success or error, and ready to take the next task).
|
|
* @param {boolean} hasAppError Where there was any application cope errors happened (crashes, unhandled promise exception occured)
|
|
*/
|
|
/**
 * Remove task files/folders according to how the task ended.
 * On an acknowledged application error the uploaded archive itself is also
 * removed; otherwise only the extracted/working copy is deleted.
 * @param {boolean} taskAcked whether the task was acknowledged (done with success or error)
 * @param {boolean} hasAppError whether an application-scope error occurred
 */
function _cleanup(taskAcked = true, hasAppError = false) {
    const dropUpload = taskAcked && hasAppError;
    let targets;

    if (isKmlOrKmz) {
        const workingCopy = path.join(jobUnZipPath, jobMainFile);
        targets = dropUpload ? [jobMainFile, workingCopy] : [workingCopy];
    } else {
        targets = dropUpload ? [jobMainFile, jobUnZipPath] : [jobUnZipPath];
    }

    if (!utils.isEmptyArray(targets))
        fileHelper.removeFiles(targets, (err) => { if (err) debug(err) });
}
|
|
|
|
/**
|
|
* Calculate offset in hours given GPS time in seconds and time from an Agnav data filename
|
|
*
|
|
* @param {any} gpsTimeSeconds
|
|
* @param {any} fileNameAgNavDT AgNav datetime used in file name ([last digit of the full year][MM][dd][HHmm])
|
|
* @returns offset in hours
|
|
*/
|
|
/**
 * Calculate the offset in hours between GPS time-of-day and the local time
 * encoded in an AgNav data filename.
 *
 * @param {any} gpsTimeSeconds GPS time in seconds
 * @param {any} fileNameAgNavDT AgNav datetime used in file name ([last digit of the full year][MM][dd][HHmm])
 * @returns offset in hours, rounded to 2 decimals
 */
function computeGPSTimeOffsetLocalTime(gpsTimeSeconds, fileNameAgNavDT) {
    // Extract "HH:mm" of the GPS timestamp's time-of-day (UTC).
    const gpsHHmm = new Date(1000 * gpsTimeSeconds).toISOString().substring(11, 16);
    const gpsDuration = moment.duration(gpsHHmm);
    const localDuration = moment.duration(utils.dateTimePartsFromAgNav(fileNameAgNavDT).time);

    // duration.subtract mutates localDuration; its value is only used here.
    const diffHours = localDuration.subtract(gpsDuration).asHours();
    return Number(diffHours.toFixed(2));
}
|
|
|
|
/**
 * Build start/end moments (UTC) from GPS seconds plus the AgNav filename
 * datetimes, shifted by the computed local-time offset. If the shifted end
 * falls before the start, the end is assumed to be on the next day.
 * @returns {{start: any, end: any}} moment-like start/end datetimes
 */
function computeStartEndDate(startGPSTimeSecs, endGPSTimeSecs, firstFileAgNavDT, endFileAgNavDT) {
    const offsetHrs = computeGPSTimeOffsetLocalTime(startGPSTimeSecs, firstFileAgNavDT);

    const startParts = utils.dateTimePartsFromAgNav(firstFileAgNavDT, 2);
    const endParts = utils.dateTimePartsFromAgNav(endFileAgNavDT, 2);
    const startDate = utils.toUTCDateTime(startGPSTimeSecs, startParts.date);
    const endDate = utils.toUTCDateTime(endGPSTimeSecs, endParts.date);

    // A zero offset needs no shifting (falsy check is intentional here).
    if (offsetHrs) {
        startDate.add(offsetHrs, 'hours');
        endDate.add(offsetHrs, 'hours');
    }

    // Crossed midnight: roll the end date forward one day.
    if (endDate < startDate) endDate.add(1, 'days');

    return { start: startDate, end: endDate };
}
|
|
|
|
/**
 * Remove all AppFile documents belonging to an application, one at a time.
 * @param {any} appId application id
 * @param {function} cb node-style callback(err)
 */
function removeAppData(appId, cb) {
    AppFile.find({ appId: appId }, (findErr, appFiles) => {
        if (findErr) return cb(findErr);

        // Sequential removal so each document's remove hooks run in order.
        async.eachSeries(
            appFiles,
            (doc, next) => doc.remove(next),
            cb
        );
    });
}
|
|
|
|
/**
 * Callback adapter over canAppProceed() for use as an async.series step.
 * Invokes cb(null) when processing may continue, or cb(AppError) carrying a
 * WkrStatus message when the task must stop (cancelled/skipped).
 *
 * BUG FIX: canAppProceed resolves to a plain WkrStatus string; passing it
 * straight to cb meant work()'s final handler (which checks
 * `err instanceof AppError` and `err.message`) never recognized the
 * cancellation/skip and treated it as an ordinary error. Wrap non-Error
 * outcomes in AppError, matching how the rest of this file signals statuses
 * (e.g. AppError.create(WkrStatus.BROKE)).
 * @param {any} appId application id
 * @param {function} cb series callback
 */
function wasCancelled(appId, cb) {
    let outcome;
    canAppProceed(appId)
        .then(status => outcome = status)
        .catch(err => outcome = err) // redis/db failure: propagate the real error
        .finally(() => {
            if (!cb) return;
            // Normalize WkrStatus strings into AppError so downstream
            // `err.message === WkrStatus.X` checks work as intended.
            if (outcome && !(outcome instanceof Error))
                outcome = AppError.create(outcome);
            cb(outcome);
        });
}
|
|
|
|
/**
 * Decide whether processing of an application may proceed.
 * @param {any} appId application id
 * @returns {Promise<string|null>} WkrStatus.CANCELLED / WkrStatus.SKIPPED,
 *          or null when processing may continue.
 */
async function canAppProceed(appId) {
    // Fast path: app ids queued for deletion are tracked in a Redis set.
    const wasCancelled = await redis.sismember(DEL_APP_IDS, appId);
    if (wasCancelled)
        return WkrStatus.CANCELLED;

    // BUG FIX: findById's second argument is a *projection*, not options;
    // `findById(appId, { lean: true })` projected a field literally named
    // "lean", hiding `status` and the marked-delete flag from the checks
    // below. Pass the options object in the third position instead.
    const app = await App.findById(appId, null, { lean: true });
    if (!app || app[Fields.MARKED_DELETE] === true)
        return WkrStatus.SKIPPED;
    else if (app && app.status === AppStatus.WAS_CANCELLED)
        return WkrStatus.CANCELLED;

    return null;
}
|
|
|
|
/**
|
|
* Import data or job items of the job into db.
|
|
*
|
|
* STEPS:
|
|
* 1. Unzip the arvchive into unzip folder
|
|
* 2. Create a new Application with status 1
|
|
* 3. Read area files and override the job's items (from NO1, PRJ)
|
|
* 4. Read data files and import to App-Detail collections
|
|
* 5. When finished importing, update status as done (3 - done, 0 - Error),
|
|
*
|
|
* Notes: Now import job items(spray, xcl, waypoint) found within .zip, (kml/kmz): buffer, placemark as well
|
|
* @param {any} impMsg the queued import message taken from the queue
|
|
* @param {boolean} redelivered whether the message was redelivered (failed to processed before)
|
|
* @param {any} cb callback method
|
|
*/
|
|
function work(impMsg, redelivered, cb) {
    debug("Got msg ", impMsg);

    // Typed area entries and raw candidate file paths discovered in the archive.
    let areaTypeAndFiles = [], areaFiles = [];
    // `appl`: the Application doc driving this task; `appData`: aggregated import stats.
    let appl = null, appData, hasData = false;
    // `_job`: the (possibly newly created) Job; `proStatus` starts at ERROR and is
    // bumped as milestones succeed (see +10 below and the final += WITH_DATA/NO_DATA).
    let _job, proStatus = AppProStatus.ERROR, hasAppError = false;

    // Module-level globals shared with _cleanup(); safe because prefetch(1)
    // guarantees a single in-flight task.
    jobMainFile = path.join(uploadPath, impMsg.file.name);
    isKmlOrKmz = /.*(.kml|.kmz)+$/i.test(jobMainFile);

    // KML/KMZ are read in place from the upload dir; archives get their own unzip folder.
    jobUnZipPath = isKmlOrKmz ? uploadPath : path.join(unzipPath, impMsg.file.name.replace(path.extname(impMsg.file.name), ''));
    jobUnZipPath = path.join(jobUnZipPath, '/'); // ensure trailing separator

    async.series([
        // Step 0: bail out early if the app was cancelled/deleted while queued.
        function (callback) {
            return wasCancelled(ObjectId(impMsg.appId), callback);
        },
        // Step 1: load the Application; abort on terminal states via AppError-coded statuses.
        function (callback) {
            App.findById(ObjectId(impMsg.appId), (err, data) => {
                if (err)
                    return callback(err);
                appl = data;
                if (!appl)
                    return callback(AppError.create(WkrStatus.BROKE));
                else if (appl.status === AppStatus.DONE)
                    return callback(AppError.create(WkrStatus.DONE));
                else if (appl[Fields.MARKED_DELETE] === true)
                    return callback(AppError.create(WkrStatus.SKIPPED));

                callback();
            });
        },
        // Re-check cancellation between each long-running step.
        function (callback) {
            return wasCancelled(appl._id, callback);
        },
        // Step 2: resolve the target Job when the message refers to one.
        function (callback) {
            if (impMsg.jobId !== undefined) {
                Job.findById(impMsg.jobId)
                    .populate({ path: 'crop', select: '_id name color', skipInvalidIds: true })
                    .then(job => {
                        if (job) {
                            _job = job;
                            return callback();
                        } else {
                            AppError.throw(Errors.JOB_NOT_FOUND);
                        }
                    })
                    .catch(err => callback(err));
            }
            else callback();
        },
        function (callback) {
            return wasCancelled(appl._id, callback);
        },
        // Step 3: redelivery handling to avoid duplicated data after a crash.
        function (callback) {
            // Handle redelivered (in case of crashed during processing the task) message to avoid duplicated data
            if (redelivered) {
                if (!utils.isBlank(appl.errorMsg) && errorCommon.isAppError(appl.errorMsg)) {
                    // Already failed with a recorded app error: do not retry.
                    return callback(AppError.create(WkrStatus.SKIPPED));
                } else if (appl.byImport && !appl.jobId) {
                    // Import-created app with no job yet: only retry from the initial state.
                    return appl.status === AppStatus.CREATED ? callback() : callback(AppError.create());
                } else {
                    if (impMsg.updateOp !== JobUpdateOp.XCLS)
                        removeAppData(appl._id, callback); // remove previous fragmented imported data
                    else
                        callback(); // re-process the task
                }
            } else callback();
        },
        function (callback) {
            return wasCancelled(appl._id, callback);
        },
        // Step 4: mark the application as in-progress (idempotent).
        function (callback) {
            if (appl.status !== AppStatus.IN_PROGRESS) {
                appl.status = AppStatus.IN_PROGRESS;
                return appl.save(callback);
            }
            else callback();
        },
        function (callback) {
            return wasCancelled(appl._id, callback);
        },
        // Step 5: extract the archive (unless KML/KMZ) and locate candidate job-item files.
        function (callback) {
            if (isKmlOrKmz) {
                areaFiles.push(jobMainFile);
                return callback();
            }
            else {
                async.series([
                    function (callback) {
                        unzip(jobMainFile, { dir: jobUnZipPath }, (err) => {
                            if (err) {
                                debug(err);
                                return callback(AppError.create(Errors.CORRUPTED_ZIP));
                            }
                            callback();
                        });
                    },
                    function (callback) {
                        if (impMsg.updateOp === JobUpdateOp.DATA_ONLY)
                            return callback();
                        // Find the files which might contain job items
                        glob(path.join(jobUnZipPath, "**/*.{job,kml,kmz,no1,prj,shp,dbf,agn,dsp,xcl,vfr}"), sprayItemsOps, (err, files) => {
                            if (err)
                                return callback(err);

                            areaFiles = files;
                            if (areaFiles.length === 0)
                                // OVERWRITE tolerates an empty item set; other ops require items.
                                return callback(impMsg.updateOp === JobUpdateOp.OVERWRITE ? null : AppError.create(Errors.ITEMS_NOT_FOUND));
                            callback();
                        });
                    },
                ], callback);
            }
        },
        function (callback) {
            return wasCancelled(appl._id, callback);
        },
        // Step 6: classify discovered files into typed area entries.
        function (callback) {
            if (impMsg.updateOp !== JobUpdateOp.DATA_ONLY)
                areaTypeAndFiles = fileHelper.areasFromList(jobUnZipPath, areaFiles);
            callback();
        },
        function (callback) {
            return wasCancelled(appl._id, callback);
        },
        // Step 7: merge the discovered items into the Job (create/append/overwrite/xcls).
        function (callback) {
            if (impMsg.updateOp > JobUpdateOp.DATA_ONLY) {
                if (areaTypeAndFiles.length) {
                    updateJobItems(_job || {}, impMsg, jobUnZipPath, areaTypeAndFiles, impMsg.uid, (err, data) => {
                        if (err)
                            return callback(err);

                        _job = data.job; // Why _job passed by ref does not work any longer ????

                        if (data.updated) {
                            proStatus = AppProStatus.ERROR + 10; // milestone: job items were updated
                            if (!appl.jobId) appl.jobId = impMsg.jobId;
                            if (data.dup) {
                                if (appl.warnMsg) // Update numbers of duplicated areas
                                    appl.warnMsg.set('dup', data.dup);
                                else appl.warnMsg = { dup: data.dup };
                            }
                        }
                        callback();
                    });
                }
                else
                    // No area files: only acceptable when a Job already exists.
                    callback(!_job ? AppError.create(Errors.ITEMS_NOT_FOUND) : null);
            }
            else callback();
        },
        function (callback) {
            return wasCancelled(appl._id, callback);
        },
        // Step 8: import spray data into AppDetail (skipped for KML/KMZ and XCL-only ops).
        function (callback) {
            if (isKmlOrKmz || impMsg.updateOp === JobUpdateOp.XCLS)
                return callback(); // Not import data for KML/KMZ, XCL files

            importData(jobUnZipPath, appl._id, _job.toObject(), (err, data) => {
                if (err)
                    return callback(err);

                appData = data;
                hasData = !!(data);
                callback();
            });
        }
    ], function (err) {
        // Finalize the processing status regardless of outcome.
        proStatus += hasData ? AppProStatus.WITH_DATA : AppProStatus.NO_DATA;

        if (err) {
            // AppError messages may carry a WkrStatus terminal code; map each to
            // the result contract consumed by processMsg().
            if ((hasAppError = (err instanceof AppError))) {
                if (WkrStatus.BROKE === err.message)
                    return cb(null, { appError: true });
                else if (WkrStatus.DONE === err.message)
                    return cb(null, { appError: false });
                else if (WkrStatus.SKIPPED === err.message)
                    return cb(null, { wasSkipped: true });
                if (WkrStatus.CANCELLED === err.message) {
                    return cb(null, { appError: hasAppError, wasCancelled: true, appId: impMsg.appId });
                }
            }

            if (appl !== null) {
                // Record the failure on the application (app errors keep their code,
                // unexpected errors are truncated to 100 chars).
                appl.errorMsg = hasAppError ? err.message : utils.trimStrTo(err.toString(), 100);
                appl.status = AppStatus.ERROR;
            }
            else {
                return cb(null, { appError: hasAppError });
            }
        }
        else {
            if (appl !== null) {
                appl.status = AppStatus.DONE;
                if (hasData) {
                    // Copy aggregated spray statistics onto the application.
                    if (utils.isNumber(appData.appRate)) appl.appRate = utils.roundTo(appData.appRate);
                    if (utils.isNumber(appData.totalSprayed)) appl.totalSprayed = appData.totalSprayed * 1E-4; // update and convert from square meters to ha
                    if (utils.isNumber(appData.totalSprLength)) appl.totalSprLength = appData.totalSprLength; // meters
                    if (utils.isNumber(appData.totalTurnTime)) appl.totalTurnTime = appData.totalTurnTime;
                    if (utils.isNumber(appData.totalSprayTime)) appl.totalSprayTime = appData.totalSprayTime;
                    if (utils.isNumber(appData.totalFlightTime)) appl.totalFlightTime = appData.totalFlightTime;
                    if (utils.isNumber(appData.totalSprayMat)) {
                        appl.totalSprayMat = appData.totalSprayMat;
                        appl.totalSprayMatUnit = appData.totalSprayMatUnit;
                    }
                    appl.startDateTime = appData.startDateTime.format('YYYYMMDDTHHmmss');
                    appl.endDateTime = appData.endDateTime.format('YYYYMMDDTHHmmss');
                }
            }
        }
        if (appl !== null) {
            // Persist final state, then flag the Job as sprayed when data was imported.
            appl.proStatus = proStatus;
            appl.updateDate = Date.now();
            appl.save()
                .then(() => {
                    if (hasData && impMsg.jobId)
                        return Job.updateOne({ _id: impMsg.jobId }, { $set: { status: JobStatus.SPRAYED } }); // Update Job Status as Downloaded
                })
                .then(() => cb(null, { appError: hasAppError }))
                .catch(err => {
                    debug(err);
                    // NOTE(review): cb(err) here reaches processMsg with no result object;
                    // processMsg must tolerate an undefined result.
                    cb(err);
                })
        }
    });
}
|
|
|
|
/**
 * Read all discovered area files and merge their items (spray areas, excluded
 * areas, waypoints, KML buffers/placemarks) into the Job according to
 * impMsg.updateOp (APPEND / OVERWRITE / XCLS), creating the Job first when an
 * empty object is passed in. Enforces the membership's max-acres limit.
 *
 * @param {any} job existing Job doc, or {} to create a new one
 * @param {any} impMsg the queued import message
 * @param {string} filePath root folder containing the extracted files
 * @param {Array} areaTypeAndFiles typed area-file entries to read
 * @param {any} uid id of the user who triggered the import
 * @param {function} cb callback(err, { job, updated, dup })
 */
function updateJobItems(job, impMsg, filePath, areaTypeAndFiles, uid, cb) {
    // Items read from each file, tagged by source type.
    let jobItems = [], anyUpdated = false;
    let updateMeta = false; // Determine whether to update new job meta from the first found file
    // `meta`: measureUnit/swath read from the first file that provides it; `dup`: duplicate-area count.
    let userWSettings, meta, dup = 0;
    let usageLimits = { maxAcres: 0, totalSprAcres: 0 };
    let sprZoneColor = 'blue'; // default spray-zone color, possibly overridden by user settings

    const createJob = utils.isEmptyObj(job);

    async.series([
        // Abort if the application was cancelled meanwhile.
        function (callback) {
            return wasCancelled(impMsg.appId, callback);
        },
        // Load the user together with their settings (joined from the settings collection).
        function (callback) {
            if (!uid) return callback();

            return User.aggregate([
                { $match: { _id: ObjectId(uid) } },
                { $project: { kind: 1, parent: 1, membership: 1, lang: { $ifNull: ["$lang", DEFAULT_LANG] } } },
                {
                    $lookup:
                    {
                        from: "settings",
                        localField: "_id",
                        foreignField: "userId",
                        as: "cfg"
                    }
                },
                { $unwind: { path: "$cfg", "preserveNullAndEmptyArrays": true } },
            ])
                .exec((err, result) => {
                    if (err)
                        return callback(err);

                    if (result && result.length) {
                        userWSettings = result[0];
                        // Prefer the user's configured spray-zone color when present.
                        if (userWSettings.cfg && userWSettings.cfg.colors && userWSettings.cfg.colors["sprayZone"])
                            sprZoneColor = userWSettings.cfg.colors.sprayZone;
                    }
                    else return callback(AppError.create(Errors.USER_NOT_FOUND));

                    callback();
                })
        },
        function (callback) {
            return wasCancelled(impMsg.appId, callback);
        },
        // First-round area-limit check for job-modifying operations.
        function (callback) {
            // Get membership subscription info (only if it is about job modification) and check for the 1st round
            if (impMsg.updateOp === JobUpdateOp.APPEND || impMsg.updateOp === JobUpdateOp.OVERWRITE) {
                getUsageLimits(userWSettings)
                    .then(theUsageLimits => {
                        theUsageLimits && (usageLimits = theUsageLimits);
                        // NOTE(review): passes the raw Errors constant (not an AppError) — confirm intended.
                        callback((usageLimits.maxAcres && (usageLimits.totalSprAcres) >= usageLimits.maxAcres) ? Errors.REACHED_AREA_LIMIT : null);
                    })
                    .catch(err => {
                        callback(err);
                    })
            } else callback();
        },
        // Create a new Job (import-created flow), seeding defaults by user locale
        // and, when available, meta from the first non-empty Q-file.
        function (callback) {
            if (createJob) { // create new job for the case of not existed
                updateMeta = true;
                // Locale heuristic: pt -> metric defaults, otherwise US units.
                let isUs = (userWSettings.lang === 'pt') ? false : true;
                job = new Job({
                    client: ObjectId(impMsg.clientId),
                    name: utils.normalizeName(impMsg.file.originalName.replace(path.extname(impMsg.file.originalName), '')),
                    measureUnit: isUs,
                    swathWidth: isUs ? 30 : 10,
                    appRate: 10,
                    appRateUnit: isUs ? 1 : 3,
                });

                // Owner: the applicator account itself, or the operator's parent applicator.
                job.byPuid = userWSettings.kind === UserTypes.APP ? userWSettings._id : userWSettings.parent;

                new Promise(resolve => resolve({ isPilot: userWSettings.kind === UserTypes.OPERATOR }))
                    .then(value => {
                        if (value.isPilot)
                            return Pilot.findById(ObjectId(uid), '-password', { lean: true });
                        else
                            return null;
                    })
                    .then(pilot => {
                        if (pilot)
                            job.operator = pilot._id;
                        // Try reading the first non-empty Qfile for aditional meta-data
                        return utils.execAsync(`find -P "${filePath}" -name "q*.t*" -type f -not -empty | head -n 1`);
                    })
                    .then(qfile => {
                        // Strip the trailing newline emitted by `find | head`.
                        if (qfile) return fileAgNav.readQFile_Async(qfile.replace(/\r?\n|\r/g, ''));
                        else return null;
                    })
                    .then(data => {
                        if (data) {
                            job.flightNumber = data.flightNumber;
                            job.remark = data.remark;
                            // job.crop = data.crob; // May be try to lookup w entities first with case-insensitive ?
                            job.appRate = utils.toNumber(data.appRate, job.appRate);
                            job.appRateUnit = utils.rateStringToCode(data.appRateUnitStr);
                            // Rate-unit codes above 2 imply metric measure units.
                            job.measureUnit = (job.appRateUnit > 2) ? false : true;
                        }
                        callback();
                    })
                    .catch(err => callback(err));
            }
            else callback();
        },
        function (callback) {
            return wasCancelled(impMsg.appId, callback);
        },
        // Read every area file (by type) into jobItems; a parse failure is fatal
        // only when it is the sole file, otherwise it is logged and skipped.
        function (outterCB) {
            const ops = { sprZoneColor: sprZoneColor, isUS: job.measureUnit, appRate: job.appRate, crop: job.crop };

            async.eachSeries(areaTypeAndFiles, (areaFile, esCB) => {
                // debug("Reading file: ", areaFile);
                async.series([
                    function (callback) {
                        return wasCancelled(impMsg.appId, callback);
                    },
                    function (callback) {
                        switch (areaFile.type) {
                            case FILE.FILE_NO1:
                            case FILE.FILE_PRJ:
                                fileAgNav.readAGN_PRJ(filePath, areaFile, ops, (err, data) => {
                                    if (err) {
                                        debug(err.stack);
                                        if (areaTypeAndFiles.length === 1)
                                            return callback(AppError.create(Errors.INVALID_JOB_FILE));
                                        //else Just ignore to move to the next file but log
                                    }
                                    if (data) {
                                        jobItems.push({ type: FILE.FILE_NO1, items: data });
                                        // Capture job meta (units/swath) from the first file providing it.
                                        if (updateMeta && !meta && data.meta) {
                                            meta = data.meta;
                                        }
                                    }
                                    callback();
                                });
                                break;
                            case FILE.FILE_KMZ:
                            case FILE.FILE_KML:
                                fileKML.readKmlKmzToGeoItems(filePath, areaFile, ops, (err, data) => {
                                    if (err) {
                                        debug(err);
                                        if (areaTypeAndFiles.length === 1)
                                            return callback(AppError.create(Errors.INVALID_JOB_FILE));
                                        //else Just ignore to move to the next file but log
                                    }
                                    if (data)
                                        jobItems.push({ type: FILE.FILE_KML, items: data });
                                    callback();
                                });
                                break;
                            case FILE.FILE_SHP:
                                fileShp.readSHPToGeoItems(filePath, areaFile, ops)
                                    .then((data) => {
                                        if (data)
                                            jobItems.push({ type: FILE.FILE_SHP, items: data });
                                        callback();
                                    })
                                    .catch(err => {
                                        if (err) {
                                            if (err instanceof AppError)
                                                return callback(err);
                                            if (areaTypeAndFiles.length === 1)
                                                callback(AppError.create(Errors.INVALID_JOB_FILE));
                                            // NOTE(review): a non-App error with multiple files falls through
                                            // without invoking callback — confirm this stall is intended.
                                        }
                                        else
                                            callback(); // Ignore error in the case the zip file contains multiple area files
                                    })
                                break;
                            case FILE.FILE_SATLOG_JOB:
                                fileSatLog.readSatLogJob(filePath, areaFile, ops, (err, data) => {
                                    if (err) {
                                        debug(err);
                                        if (areaTypeAndFiles.length === 1)
                                            return callback(AppError.create(Errors.INVALID_JOB_FILE));
                                        //else Just ignore to move to the next file but log
                                    }
                                    if (data) {
                                        jobItems.push({ type: FILE.FILE_SATLOG_JOB, items: data });
                                        if (updateMeta && !meta && data.meta) {
                                            meta = data.meta;
                                        }
                                    }
                                    callback();
                                });
                                break;
                            default:
                                return callback();
                        }
                    },
                    // Fallback: pull meta from a sibling .agn file for SHP/KML/KMZ sources.
                    function (callback) {
                        if (updateMeta && !meta && areaFile.agn && (areaFile.type === FILE.FILE_SHP || areaFile.type === FILE.FILE_KML || areaFile.type === FILE.FILE_KMZ)) {
                            fileAgNav.readAGN_PRJ(filePath, { area: areaFile.agn, type: FILE.FILE_AGN }, ops, (err, data) => {
                                if (data && data.meta) {
                                    meta = data.meta;
                                }
                                callback();
                            });
                        }
                        else callback();
                    }
                ], (err) => {
                    esCB(err);
                });
            }, (err) => {
                outterCB(err);
            });
        },
        function (callback) {
            return wasCancelled(impMsg.appId, callback);
        },
        // Merge collected items into the Job per updateOp, enforce item caps and
        // the acreage limit, then persist (transactionally when creating).
        function (callback) {
            if (!jobItems.length)
                return callback(impMsg.updateOp === JobUpdateOp.OVERWRITE ? null : AppError.create(Errors.ITEMS_NOT_FOUND));

            try {

                if (updateMeta && meta) { // Update the new job with read measureUnit and swath
                    job.measureUnit = (meta.measureUnit == 1);
                    if (meta.swath > 0) job.swathWidth = meta.swath;
                }

                // Collect all items from area files here and do the Job update
                let sprayAreas = [], xclAreas = [], waypoints = [], bufs = [], places = [];
                for (let i = 0; i < jobItems.length; i++) {
                    const item = jobItems[i];
                    if (item.items) {
                        if (!utils.isEmptyArray(item.items.sprayAreas))
                            sprayAreas = sprayAreas.concat(item.items.sprayAreas);

                        if (!utils.isEmptyArray(item.items.xclAreas))
                            xclAreas = xclAreas.concat(item.items.xclAreas);

                        if (!utils.isEmptyArray(item.items.waypoints))
                            waypoints = waypoints.concat(item.items.waypoints);

                        // Buffers and placemarks only come from KML sources.
                        if (item.type === FILE.FILE_KML) {
                            if (!utils.isEmptyArray(item.items.bufs))
                                bufs = bufs.concat(item.items.bufs);

                            if (!utils.isEmptyArray(item.items.places))
                                places = places.concat(item.items.places);
                        }
                    }
                }

                let oldAreaIds = [], checkRes;
                //Check for the import item modification options to decide how to update the job's items
                sprayAreas = jobUtil.cleanAreas(sprayAreas);
                xclAreas = jobUtil.cleanAreas(xclAreas);
                const shouldCheckLimits = !!(usageLimits.maxAcres);

                if (impMsg.updateOp === JobUpdateOp.APPEND) {
                    // Append: dedupe new areas against everything already on the job.
                    const allAreas = utils.appendArray(job.sprayAreas, job.excludedAreas);
                    if (!utils.isEmptyArray(sprayAreas)) {
                        checkRes = jobUtil.checkDupAreas(allAreas, sprayAreas);
                        dup += checkRes.dup;
                        job.sprayAreas = utils.appendArray(job.sprayAreas, checkRes.areas);
                    }
                    if (!utils.isEmptyArray(xclAreas)) {
                        checkRes = jobUtil.checkDupAreas(allAreas, xclAreas);
                        dup += checkRes.dup;
                        job.excludedAreas = utils.appendArray(job.excludedAreas, checkRes.areas);
                    }

                    job.waypoints = jobUtil.cleanGeoPoints(utils.appendArray(job.waypoints, waypoints));
                    job.bufs = utils.appendArray(job.bufs, bufs);
                    job.places = jobUtil.cleanGeoPoints(utils.appendArray(job.places, places));

                    // Exclude the job's previous area from the usage total; it is re-counted below.
                    if (shouldCheckLimits && job.ttSprArea) usageLimits.totalSprAcres -= job.ttSprArea * CVCST.SM2ACR;
                }
                else if (impMsg.updateOp === JobUpdateOp.OVERWRITE) {
                    // Overwrite: remember old area ids so their grid lines can be purged after save.
                    if (job.sprayAreas.length)
                        oldAreaIds = job.sprayAreas.map(a => a._id);

                    job.sprayAreas = sprayAreas;
                    job.excludedAreas = xclAreas;
                    job.waypoints = jobUtil.cleanGeoPoints(waypoints);
                    job.bufs = bufs;
                    job.places = jobUtil.cleanGeoPoints(places);

                    if (shouldCheckLimits && job.ttSprArea) usageLimits.totalSprAcres -= job.ttSprArea * CVCST.SM2ACR;
                }
                else if (impMsg.updateOp === JobUpdateOp.XCLS) {
                    let xcl;
                    for (let i = 0; i < sprayAreas.length; i++) { // Convert all found sprayAreas to xcls
                        xcl = jobUtil.sprayToXCL(sprayAreas[i]);
                        if (xcl) xclAreas.push(xcl);
                    }
                    checkRes = jobUtil.checkDupAreas(job.excludedAreas, xclAreas);
                    dup += checkRes.dup;
                    job.excludedAreas = jobUtil.cleanAreas(utils.appendArray(job.excludedAreas, checkRes.areas));
                }

                // Ensure Maximum of Items
                if (!utils.isEmptyArray(job.sprayAreas) && job.sprayAreas.length >= FILE.MAX_ITEM)
                    job.sprayAreas = job.sprayAreas.slice(0, FILE.MAX_ITEM);

                if (!utils.isEmptyArray(job.excludedAreas) && job.excludedAreas.length >= FILE.MAX_ITEM)
                    job.excludedAreas = job.excludedAreas.slice(0, FILE.MAX_ITEM);

                if (!utils.isEmptyArray(job.waypoints) && job.waypoints.length >= FILE.MAX_ITEM)
                    job.waypoints = job.waypoints.slice(0, FILE.MAX_ITEM);

                if (!utils.isEmptyArray(job.bufs) && job.bufs.length >= FILE.MAX_ITEM)
                    job.bufs = job.bufs.slice(0, FILE.MAX_ITEM);

                if (!utils.isEmptyArray(job.places) && job.places.length >= FILE.MAX_ITEM)
                    job.places = job.places.slice(0, FILE.MAX_ITEM);

                // Net sprayable area (square meters) after exclusions.
                const newTTSprSqrMeters = jobUtil.calcTTSprayAreas(job.sprayAreas, job.excludedAreas);

                // Second-round limit check against the merged area total.
                if (shouldCheckLimits) {
                    if (usageLimits.totalSprAcres + (newTTSprSqrMeters * CVCST.SM2ACR) >= usageLimits.maxAcres)
                        AppMembershipError.throw(Errors.REACHED_AREA_LIMIT);
                }

                job.ttSprArea = newTTSprSqrMeters * CVCST.SM2HA; // stored in hectares

                // New jobs are saved together with the App.jobId link in one transaction.
                const updateFn = async (job) => {
                    if (undefined === job._id) {
                        await dbConn.transaction(async (session) => {
                            await job.save({ session });
                            await App.updateOne({ _id: ObjectId(impMsg.appId) }, { $set: { jobId: job._id } }).session(session);
                        });
                    } else {
                        await job.save({ validateModifiedOnly: true });
                    }
                };

                updateFn(job)
                    .then(() => {
                        if (!impMsg.jobId)
                            impMsg.jobId = job._id;

                        if (impMsg.updateOp === JobUpdateOp.OVERWRITE)
                            return jobUtil.deleteAreaLines(oldAreaIds); // Remove grid lines of previous areas (before sprayAreas were overriten)
                    })
                    .then(() => {
                        anyUpdated = true;
                        callback();
                    })
                    .catch(err => {
                        debug(err.stack);
                        return callback(err);
                    });
            } catch (err) {
                callback(err);
            }
        }
    ], function (err) {
        jobItems = null; // release references early

        if (err) {
            debug('Error while updating Job Item', err);
            cb(err);
        } else {
            cb(null, { job: job, updated: anyUpdated, dup: dup });
        }
    });
}
|
|
|
|
/**
 * Resolve the membership acreage limits for a user. Operators are resolved to
 * their parent applicator account, whose package subscription carries the cap.
 * @param {any} user user doc (must have kind/parent/membership)
 * @returns {Promise<{maxAcres: number, totalSprAcres: number}>} 0 maxAcres means unlimited
 * @throws AppAuthError when the account or its applicator cannot be resolved
 */
async function getUsageLimits(user) {
    if (!user || (user.kind !== UserTypes.APP) && !user.parent) AppAuthError.throw(Errors.INVALID_ACCOUNT);

    const usageLimits = { maxAcres: 0, totalSprAcres: 0 };

    // Operators inherit the limit from their parent applicator account.
    let memUser = user;
    if (memUser.kind !== UserTypes.APP && memUser.parent) {
        memUser = await User.findById(memUser.parent, 'membership', { lean: true });
    }
    if (!memUser) AppAuthError.throw(Errors.APPLICATOR_NOT_FOUND);

    const pkgSub = subUtil.getPkgSubfromUserInfo(memUser);
    usageLimits.maxAcres = subUtil.getSubMetaField(pkgSub, SubFields.MAX_ACRES) || 0;

    // Only tally current usage when a cap actually applies (ha -> acres).
    if (usageLimits.maxAcres) {
        const usedHa = await subUtil.calcTotalAreaByUser(memUser._id, pkgSub.periodStart, pkgSub.periodEnd);
        usageLimits.totalSprAcres = usedHa * CVCST.HA2ACR;
    }
    return usageLimits;
}
|
|
|
|
/**
 * Import all recognized spray data files found under dataPath and aggregate
 * per-application totals (sprayed area, spray/turn/flight times, materials).
 *
 * Supported formats, classified by file-name convention:
 *  - AG-NAV binary nt files:       n<7 digits>[-k]*.t<1-2 digits>
 *  - ESRI shape spray on/off DBFs: n<9 digits>[-k]*spr(on|off)*.dbf (imported as pairs when possible)
 *  - SATLOG exported ascii logs:   *.asc
 *
 * @param {string} dataPath - root folder to scan for data files
 * @param {*} appId - application id (used for cancellation checks, passed to importDataFiles)
 * @param {*} job - job document supplying fallback rate info
 * @param {Function} cb - cb(err, appData); appData is undefined when nothing was imported
 */
function importData(dataPath, appId, job, cb) {
    let appData, totalSprays = 0, totalSprLength = 0, avgRates = [], totalTurnTime = 0, totalSprayTime = 0, totalFlightTime = 0, totalSprMats = 0, dataFiles = [], sprMatsUnit;
    const importInfo = [];
    const begin = Date.now(); // DEBUG - Measuring total import data time

    async.series([
        function (callback) {
            return wasCancelled(appId, callback);
        },
        function (callback) {
            // Find candidate data files and classify them by naming convention
            glob(path.join(dataPath, "**/{n*[0-9]*\.t[0-9]*,n*[0-9]*?(spr)@(on|off)*\.dbf,*\.asc}"), globOps, (err, files) => {
                if (err || files.length === 0) {
                    return callback();
                }
                // Classify found data files into known data format file entries
                let match, file, typedFiles = [];
                for (let i = 0; i < files.length; i++) {
                    file = files[i];
                    let basename = path.basename(file), agn;

                    // AG-NAV data nt files
                    if ((match = basename.match(/^n(\d{7})(-\d+)?.*\.t(\d{1,2})$/i))) {
                        agn = match.length >= 3 ? match[1] + match[3] + (match[2] || "") : match[1] + match[2];
                        typedFiles.push({ type: FILE.DATA_AGNAV, agn: agn, file: file });
                        // ESRI Shape spray on/off files
                    } else if ((match = basename.match(/^n(\d{9})(-\d+)?.*spr(?:on|off).*\.dbf$/i))) {
                        agn = match[1];
                        if (match[2] && match[2].startsWith('-'))
                            agn += match[2];
                        typedFiles.push({ type: FILE.DATA_SHAPE, agn: agn, file: file });
                        // SATLOG exported ascii data files
                    } else if ((match = basename.match(/\.asc$/i))) { // FIX: escaped dot; /^.*.asc$/ also matched names like "fooXasc"
                        const stats = fs.statSync(file);
                        const m = stats && stats.mtime ? moment.utc(stats.mtime) : moment.utc();
                        // Note: might think about updating this later from the data if needed
                        agn = m.format('YYMMDDHHmm').substring(1);
                        typedFiles.push({ type: FILE.DATA_SALOG, agn: agn, file: file });
                    }
                }

                if (typedFiles.length) {
                    typedFiles.sort(utils.dynamicSort('agn')); // Sort them in ascending datetime (from file names) order

                    // Pick non-SHAPE data file items (each imported on its own)
                    dataFiles = utils.appendArray(dataFiles, typedFiles.filter(it => it.type !== FILE.DATA_SHAPE).map(it => [it]));

                    // Pick SHAPE spray-on and off pairs or spray-on data file items only
                    let i = 0, type2Files = typedFiles.filter(it => it.type === FILE.DATA_SHAPE);
                    while (type2Files.length > 0) {
                        let item = type2Files[i];
                        type2Files.splice(i, 1);

                        let m = item.file.match(/^(.*n(?:\d{9}).*spr)(on|off).*\.dbf$/i);
                        item['sprayOn'] = (m[2].toLocaleLowerCase() === 'on');
                        let find = item['sprayOn'] ? 'off' : 'on';

                        // Search backwards for the counterpart file (same agn, opposite on/off)
                        let j = type2Files.length - 1;
                        let foundPair = false;
                        while (type2Files.length && j >= 0) {
                            let regex = new RegExp(`^${m[1]}${find}.*.dbf$`, 'i');
                            if (regex.test(type2Files[j].file) && item.agn === type2Files[j].agn) {
                                type2Files[j]['sprayOn'] = !item['sprayOn'];
                                dataFiles.push([item, type2Files[j]]);
                                type2Files.splice(j, 1);
                                foundPair = true;
                                break;
                            }
                            j--;
                        }
                        if (!foundPair) {
                            // Unpaired spray-on files are still imported; lone spray-off files are dropped
                            if (/^.*n\d{9}.*spron.*\.dbf$/i.test(item.file))
                                dataFiles.push([item]);
                        }
                    }
                }
                return callback();
            });
        },
        function (callback) {
            return wasCancelled(appId, callback);
        },
        function (callback) {
            if (!dataFiles.length) return callback();

            // Import each file (or on/off pair) sequentially and accumulate app-level totals
            async.eachSeries(dataFiles,
                function (fileItems, callback) {
                    async.series([
                        function (callback) {
                            wasCancelled(appId, callback)
                        },
                        function (callback) {
                            importDataFiles(fileItems, appId, job, (err, data) => {
                                if (data) {
                                    importInfo.push({ agn: fileItems[0].agn, info: data });

                                    if (utils.isNumber(data.totalSprayed)) totalSprays += data.totalSprayed;
                                    if (utils.isNumber(data.totalSprLength)) totalSprLength += data.totalSprLength;
                                    if (utils.isNumber(data.turnTime)) totalTurnTime += data.turnTime;
                                    // FIX: guard the spray-time accumulation on data.sprayTime itself
                                    // (previously it checked data.turnTime, adding a possibly
                                    // non-numeric sprayTime, or skipping a valid one)
                                    if (utils.isNumber(data.sprayTime)) totalSprayTime += data.sprayTime;
                                    if (utils.isNumber(data.totalTime)) totalFlightTime += data.totalTime;

                                    if (data.avgRate)
                                        avgRates.push(data.avgRate);

                                    if (utils.isNumber(data.totalSprayMat)) {
                                        totalSprMats += data.totalSprayMat;
                                        sprMatsUnit = data.totalSprayMatUnit; // Assume that all material units from files are the same
                                    }
                                }
                                callback();
                            });
                        }
                    ], callback);
                }, err => { // If any of the saves produced an error, err would equal that error
                    debug(`All ${dataFiles.length} files Imported.`);
                    if (err) callback(err);
                    else callback();
                });
        },
        function (callback) {
            return wasCancelled(appId, callback);
        },
        function (callback) {
            if (!importInfo.length) return callback();

            let first = importInfo[0];
            let last = importInfo[importInfo.length - 1];
            // Compute application start and end time
            const startendDate = computeStartEndDate(first.info.firstTime, last.info.lastTime, first.agn, last.agn);
            appData = {
                startDateTime: startendDate.start,
                endDateTime: startendDate.end,
                appRate: avgRates.length ? avgRates.reduce(function (a, b) { return Number(a) + Number(b); }) / avgRates.length : 0,
                totalSprayed: totalSprays,
                totalSprLength: totalSprLength,
                totalSprayTime: totalSprayTime,
                totalTurnTime: totalTurnTime,
                totalFlightTime: totalFlightTime,
                totalSprayMat: totalSprMats,
                totalSprayMatUnit: sprMatsUnit
            }

            const duration = Date.now() - begin;
            debug('Total (data): %dms', duration);
            callback();
        }
    ], (err) => {
        if (err) {
            return cb(err);
        }
        cb(null, appData);
    });
}
|
|
|
|
/**
 * Import one group of related data files (a single AG-NAV nt file, a single
 * SATLOG ascii file, or a SHAPE spray-on/off pair) for an application:
 *  - reads the optional AG-NAV "q" metadata file,
 *  - creates an AppFile entry, parses the data records,
 *  - bulk-inserts the records into AppDetail,
 *  - computes turn / spray / flight times and saves the totals onto the AppFile.
 *
 * @param {Array<{type, agn, file, sprayOn?}>} fileItems - one or two classified file entries
 * @param {*} appId - owning application id
 * @param {*} job - job document (fallback rate info when no q file exists)
 * @param {Function} cb - cb(err, info); info carries firstTime/lastTime and the
 *        time/material totals (records removed), or undefined when the file had no records
 */
function importDataFiles(fileItems, appId, job, cb) {
    let appFile, firstTime, lastTime, totalSprMats = 0, sprMatsUnit;;
    let importInfo = { firstTime: 0, lastTime: 0, turnTime: 0, sprayTime: 0, totalTime: 0 };
    let fileName = path.basename(fileItems[0].file), fileMeta, hasData = false;

    const dataType = fileItems[0].type;

    if (fileItems.length > 1 && dataType === FILE.DATA_SHAPE) {
        // Store the pair under a single combined "...sprOnOff..." file name
        fileName = fileName.replace(new RegExp("(on|off)", "i"), "onoff");
        fileItems.sort(utils.dynamicSort('-sprayOn')); // Make spray-on file first in the list
    }

    async.series([
        function (callback) {
            // Read q file if exists for AGNAV binary data
            const file = fileItems[0];
            if (file.type === FILE.DATA_AGNAV || file.type === FILE.DATA_SHAPE) {
                let qfilePath = path.join(path.dirname(file.file), 'q' + (file.type === FILE.DATA_AGNAV ? path.basename(file.file).slice(1) : file.agn));
                fileAgNav.readQFile(qfilePath, (err, meta) => {
                    // A missing/unreadable q file is not fatal; meta stays empty
                    fileMeta = meta;
                    callback();
                });
            } else {
                callback();
            }
        },
        function (callback) {
            // Create the AppFile db entry carrying the (possibly defaulted) metadata
            const fileObj = { appId: appId, name: fileName, agn: fileItems[0].agn };
            if (utils.isEmptyObj(fileMeta)) {
                fileObj.note = "NO_QFILE";
                // Fall back to the job's configured application rate when no q file was found
                fileMeta = { fcType: 'none', appRate: job.appRate, rateUnit: job.appRateUnit, hasQfile: false };
            } else {
                fileMeta.hasQfile = true;
            }
            fileObj.meta = fileMeta;

            appFile = new AppFile(fileObj);

            appFile.save(err => {
                if (err) return callback(err);
                callback();
            });
        },
        function (callback) {
            // Parse the record data out of each file of the group
            async.eachSeries(fileItems,
                function (dataFile, callback) {
                    switch (dataFile.type) {
                        case FILE.DATA_AGNAV:
                            readNTFile(dataFile.file, fileMeta, appFile._id, (err, data) => {
                                if (err) return callback(err);
                                importInfo = data;
                                if (importInfo && !utils.isEmptyArray(importInfo.records)) {
                                    // Records are time-ordered, so first/last give the time span
                                    firstTime = importInfo.records[0].gpsTime;
                                    lastTime = importInfo.records[importInfo.records.length - 1].gpsTime;
                                    if (utils.isNumber(data.totalSprayMat)) {
                                        totalSprMats += data.totalSprayMat;
                                        sprMatsUnit = data.totalSprayMatUnit;
                                    }
                                }
                                callback();
                            });
                            break;
                        case FILE.DATA_SHAPE:
                            readShapeDataFile(dataFile, fileMeta, appFile._id, (err, data) => {
                                if (err) return callback(err);
                                if (data) {
                                    if (dataFile['sprayOn']) {
                                        // Sprayed-area / rate totals only come from the spray-on file
                                        importInfo.totalSprayed = data.totalSprayed;
                                        importInfo.avgRate = data.avgRate;
                                        if (utils.isNumber(data.totalSprayMat)) {
                                            totalSprMats += data.totalSprayMat;
                                            sprMatsUnit = data.totalSprayMatUnit;
                                        }
                                    }
                                    if (!utils.isEmptyArray(data.records)) {
                                        // Widen the first/last time span across both files of the pair
                                        if (utils.isEmptyArray(importInfo.records)) {
                                            firstTime = data.records[0].gpsTime;
                                            lastTime = data.records[data.records.length - 1].gpsTime;
                                        } else {
                                            if (data.records[0].gpsTime < firstTime)
                                                firstTime = data.records[0].gpsTime;
                                            if (data.records[data.records.length - 1].gpsTime > lastTime)
                                                lastTime = data.records[data.records.length - 1].gpsTime;
                                        }
                                    }
                                    importInfo.records = utils.appendArray(importInfo.records, data.records);
                                }
                                // else if (!dataFile['sprayOn']) {
                                //     // Skip the spray-off if no sprayon file
                                //     return callback(new Error(Errors.DATA_NO_SPRAYON));
                                // }
                                callback();
                            });
                            break;
                        case FILE.DATA_SALOG:
                            readSatLogAsc(dataFile.file, appFile._id, (err, data) => {
                                if (err) return callback(err);
                                importInfo = data;
                                if (importInfo && !utils.isEmptyArray(importInfo.records)) {
                                    firstTime = importInfo.records[0].gpsTime;
                                    lastTime = importInfo.records[importInfo.records.length - 1].gpsTime;

                                    // Update agn from the Date and time from the 1st item
                                    if (importInfo.records[0]['Date']) {
                                        let fileDT = moment(`${new Date().getFullYear().toString().substring(0, 2)}${importInfo.records[0]['Date']} ${importInfo.records[0]['Time']}`, 'YYYYMMDD HH:mm:ss');
                                        if (fileDT.isValid()) {
                                            fileItems[0].agn = fileDT.format('YYMMDDHHmm').substring(1);
                                        }
                                    }
                                }
                                callback();
                            });
                            break;
                        default:
                            return callback();
                    }
                }, err => {
                    // Ensure skipping the loop normally in case of no spray-on data
                    if (err && err.message == Errors.DATA_NO_SPRAYON)
                        callback();
                    else
                        callback(err);
                }) // eachSeries
        },
        // Empty file ? => remove the created appFile entry
        function (callback) {
            hasData = (importInfo && !utils.isEmptyArray(importInfo.records));
            if (appFile && !hasData) {
                appFile.remove(callback);
            }
            else callback();
        },
        // Save records of the file to db
        function (callback) {
            if (!hasData) return callback();

            // Sort spray-on and off data by gspTime (provided data in the two files has been already compensated a day after passing midnight)
            if (fileItems.length > 1 && dataType === FILE.DATA_SHAPE)
                importInfo.records.sort(utils.dynamicSort('gpsTime', true));

            // Insert in fixed-size batches to bound memory / request size
            const chunks = utils.chunkArray(importInfo.records, 1000);
            async.eachSeries(chunks, (chunk, cb) => { // Use series to ensure correct insert order of the records
                AppDetail.insertMany(chunk, { rawResult: true, ordered: true }, (err) => {
                    if (err) return cb(err);
                    cb();
                });
            }, (err) => {
                // err & console.log(err);
                callback(err);
            });
        },
        function (callback) {
            if (!hasData) return callback();

            /* Calculate turn time for each file or each pair of shape on/off files
               Turn Time: total turn times after each spray line. Counting start from spray OFF the previous line to spray ON on the next line
               (skip the segments within the same line)
            */
            let turnTime = { line: null, at: null, nextOff: false, total: 0 }, timeDif = 0, totalSprTime = 0, totalTime = 0;
            let prevTime = -999, prevSprTime = -999; // -999 marks "no previous record yet"
            let record;
            for (let i = 0; i < importInfo.records.length; i++) {
                record = importInfo.records[i];

                // Calculate total flight time
                if (prevTime != -999 && prevTime !== record.gpsTime) {
                    timeDif = record.gpsTime - prevTime;
                    if (timeDif < 0) {
                        // Large negative jump => midnight rollover, not a glitch
                        if (Math.abs(timeDif) >= 80000)
                            timeDif = (86400 - prevTime) + record.gpsTime;
                    }
                    // Ignore gaps longer than 2 minutes (separate flights / data gaps)
                    if (timeDif > 0 && timeDif <= 120)
                        totalTime += timeDif;
                }
                prevTime = record.gpsTime;

                // Calculate spray time (secs)
                if (record.sprayStat > 0) {
                    if (prevSprTime != -999 && record.sprayStat !== 3) {
                        timeDif = record.gpsTime - prevSprTime;
                        if (timeDif < 0) {
                            if (Math.abs(timeDif) >= 80000)
                                timeDif = (86400 - prevSprTime) + record.gpsTime;
                        }
                        if (timeDif > 0 && timeDif <= 120)
                            totalSprTime += timeDif;
                    }
                    prevSprTime = record.gpsTime;
                }

                if (fileItems[0].type !== FILE.DATA_SALOG) {
                    // Calculate turn time (secs)
                    if (null === turnTime.line) {
                        // Waiting for the first spray-off record to start timing a turn
                        if (!record.sprayStat) {
                            turnTime.line = record.llnum;
                            turnTime.at = record.gpsTime;
                        }
                    } else {
                        if (turnTime.line != record.llnum) {
                            // New line reached while spraying => the turn ends here
                            if (record.sprayStat) {
                                timeDif = record.gpsTime - turnTime.at;
                                if (timeDif < 0) {
                                    if (Math.abs(timeDif) >= 80000)
                                        timeDif = (86400 - turnTime.at) + record.gpsTime;
                                }
                                // Only count plausible turns (5s .. 2min)
                                if (timeDif >= 5 && timeDif <= 120)
                                    turnTime.total += timeDif;

                                turnTime.line = record.llnum;
                                turnTime.nextOff = true;
                            }
                        }
                        else {
                            if (!record.sprayStat && turnTime.nextOff) { // Mark start for the next turn
                                turnTime.at = record.gpsTime;
                                turnTime.nextOff = false;
                            } else if (record.sprayStat) {
                                turnTime.nextOff = true;
                            }
                        }
                    }
                }
            }
            importInfo.turnTime = turnTime.total;
            importInfo.sprayTime = totalSprTime;
            importInfo.totalTime = totalTime;

            callback();
        },
        function (callback) {
            // Persist the computed totals back onto the AppFile entry
            if (hasData && appFile) {
                if (utils.isNumber(importInfo.totalSprayed)) appFile.totalSprayed = importInfo.totalSprayed * 1E-4; // m2 to ha
                if (utils.isNumber(importInfo.turnTime)) appFile.totalTurnTime = importInfo.turnTime;
                if (utils.isNumber(importInfo.sprayTime)) appFile.totalSprayTime = importInfo.sprayTime;
                if (utils.isNumber(importInfo.totalTime)) appFile.totalFlightTime = importInfo.totalTime;
                if (utils.isNumber(importInfo.totalSprLength)) appFile.totalSprLength = importInfo.totalSprLength;
                if (totalSprMats) {
                    appFile.totalSprayMat = totalSprMats;
                    appFile.totalSprayMatUnit = sprMatsUnit;
                }
                appFile.save(callback);
            } else callback();
        }
    ], (err) => {
        if (err) return cb(err);

        if (importInfo && !utils.isEmptyArray(importInfo.records)) {
            importInfo.firstTime = firstTime;
            importInfo.lastTime = lastTime;

            importInfo.totalSprayMat = totalSprMats;
            importInfo.totalSprayMatUnit = sprMatsUnit;

            // Records were persisted above; drop them before returning the summary
            delete importInfo.records;
            gc();
            return cb(null, importInfo);
        }
        else
            return cb();
    });
}
|
|
|
|
/**
 * Parse an AG-NAV binary nt data file into work records and spray totals.
 * Walks the buffer in fixed AGN_PACK_SIZE packets, merges AMS records into
 * the preceding main record, compensates gpsTime for midnight rollover, and
 * accumulates sprayed area / material totals from spray-on segments.
 *
 * @param {string} file - path of the nt binary file
 * @param {*} fileMeta - metadata from the q file (or job defaults)
 * @param {*} fileId - AppFile id stamped onto every record
 * @param {Function} cb - cb(err, { records, totalSprayMat, totalSprayMatUnit, totalSprayed, avgRate })
 */
function readNTFile(file, fileMeta, fileId, cb) {
    let binBuf, records = [];
    let sprayedSeg = 0, totalSprays = 0, totalSprMats = 0, sprMatsUnit, totalAppRates = 0, totalSprayRecs = 0;
    let prevUTM_X, prevUTM_Y, prevSwath, prevLine, prevStat = 0;

    async.series([
        function (callback) {
            fs.readFile(file, (err, data) => {
                if (err) return callback(err);
                binBuf = data;
                callback();
            });
        },
        function (callback) {
            if (!binBuf || !Buffer.isBuffer(binBuf)) return callback();

            let latlon, timeOffset = 0, appliedRate;
            const rateInfo = utils.rateInfoFromFileMeta(fileMeta, RecTypes.AGN_BIN_LQD), recType = rateInfo.recType;

            // FIX: iterate with the packet-size increment in the for-header. The
            // previous while-loop put the increment at the bottom, so the
            // `if (!record) continue;` on an unreadable packet skipped it and
            // spun forever on the same offset.
            for (let startIdx = 0; binBuf.length - startIdx >= FILE.AGN_PACK_SIZE; startIdx += FILE.AGN_PACK_SIZE) {
                if (!DataUtil.isValidAgn(binBuf, startIdx)) continue;

                const record = DataUtil.readAgnBinary(binBuf, startIdx, recType);
                if (!record) continue;

                if (record.type === RecTypes.AGN_AMS) {
                    if (records.length) {
                        // Copy AMS record fields to the last main record
                        records[records.length - 1] = DataUtil.mergeAgnAms(records[records.length - 1], record);
                    }
                    continue;
                }

                if (record.timeAdv > 0.0) {
                    // Shift the lat lon coordinate according to time Advance (system lag)
                    latlon = geoUtil.projectLatLong(record.lat, record.lon, record.grSpeed, record.head, record.timeAdv);
                    record.lat = latlon.lat; record.lon = latlon.lon;
                }
                record["fileId"] = fileId;
                records.push(record);

                // For compensation with a day when gpsTime passing mid-night (rolloff)
                if (records.length > 1 && timeOffset === 0
                    && ((records[records.length - 1].gpsTime - records[records.length - 2].gpsTime) < 0
                        && Math.abs(records[records.length - 1].gpsTime - records[records.length - 2].gpsTime) >= 80000))
                    timeOffset = 86400;
                record.adjustGpsTime(timeOffset);

                if (record.sprayStat > 0) {
                    ({ appliedRate, sprMatsUnit } = getAppliedRate(record, rateInfo, rateInfo.recType === RecTypes.AGN_BIN_LQD));

                    if (record.lhaReq > 0) {
                        totalSprayRecs++;
                        totalAppRates += record.lhaReq;
                    }
                    if (record.sprayStat === 3) {
                        // Spray just switched on: start a new sprayed segment here
                        prevUTM_X = record.utmX;
                        prevUTM_Y = record.utmY;
                        prevSwath = record.swath;
                        prevLine = record.llnum;
                        prevStat = record.sprayStat;
                    }
                    else {
                        // Continuing a spray run: accumulate the segment area
                        // (segment length x previous swath) within the same line
                        if (prevStat > 0 && prevLine === record.llnum) {
                            sprayedSeg = Math.hypot(record.utmX - prevUTM_X, record.utmY - prevUTM_Y) * prevSwath;
                            if (sprayedSeg) {
                                totalSprays += sprayedSeg;
                                if (appliedRate > 0) {
                                    totalSprMats += (sprayedSeg * CVCST.SM2HA) * appliedRate;
                                }
                            }
                        }
                        prevUTM_X = record.utmX;
                        prevUTM_Y = record.utmY;
                        prevSwath = record.swath;
                        prevLine = record.llnum;
                        prevStat = record.sprayStat;
                    }
                }
            }
            binBuf = null; // release the buffer as early as possible
            callback();
        }
    ], err => {
        if (err) return cb(err);

        const fileDataInfo = {
            records: records,
            totalSprayMat: totalSprMats,
            totalSprayMatUnit: sprMatsUnit,
            totalSprayed: totalSprays,
            avgRate: totalSprayRecs > 0 ? totalAppRates / totalSprayRecs : 0
        }
        cb(null, fileDataInfo);
    });
}
|
|
|
|
/**
 * Determine the applied rate for a record, preferring the rate configured in
 * the file metadata over the rate derived from the recorded data.
 * @param {*} record - work record (lminApp, swath, grSpeed are read)
 * @param {*} rateInfo - rate info derived from the file metadata (appRate, rateUnit, useFC)
 * @param {*} isLiquid - true for liquid material, false for dry
 * @returns { appliedRate, sprMatsUnit } in Metric: L/Ha (unit 3) or Kg/Ha (unit 4)
 */
function getAppliedRate(record, rateInfo, isLiquid) {
    // Use the metadata rate unless flow-control data is both expected and present
    const useMetaRate = rateInfo && rateInfo.appRate && (!rateInfo.useFC || !record.lminApp);

    if (useMetaRate) {
        if (rateInfo.rateUnit === 0) {
            // oz/acre -> L/Ha
            return { appliedRate: rateInfo.appRate * CVCST.OZPA2LPHA, sprMatsUnit: 3 };
        }
        // Convert to metric rate to store to db later
        const metricRate = utils.toMetricRate(rateInfo.appRate, rateInfo.rateUnit);
        return { appliedRate: metricRate.value, sprMatsUnit: metricRate.unit };
    }

    if (isLiquid) {
        // Derive L/Ha from the recorded flow rate, swath and ground speed
        return {
            appliedRate: utils.appRateFromFlowRate(record.lminApp, record.swath, record.grSpeed),
            sprMatsUnit: 3 // L/Ha
        };
    }
    // Dry material: the recorded value is already the rate
    return { appliedRate: record.lminApp, sprMatsUnit: 4 }; // Kg/Ha
}
|
|
|
|
/**
 * Parse an ESRI shape spray DBF file into work records and (for spray-on
 * files) sprayed area / material totals.
 *
 * @param {{file, agn, sprayOn?}} dataFile - classified shape file entry
 * @param {*} fileMeta - metadata from the q file (or job defaults)
 * @param {*} fileId - AppFile id stamped onto every record
 * @param {Function} cb - cb(err, fileDataInfo); fileDataInfo is null when the file had no records
 */
function readShapeDataFile(dataFile, fileMeta, fileId, cb) {
    let sprayedSeg = 0, totalAppRates = 0, totalSprMats = 0, sprMatsUnit, totalSprays = 0, totalSprayRecs = 0;
    let prevUTM_X, prevUTM_Y, prevSwath, prevLine;
    let records = [], latlon;

    fileShp.readDBF4Items(dataFile.file, ["GPSTIME", "LATITUDE", "LONGITUDE", "GRNDSPEED"], (err, items) => {
        if (err) {
            debug(`Error in readShapeDataFile(): ${dataFile}`, err)
            cb(err);
            return;
        }
        let timeOffset = 0, appliedRate;
        const rateInfo = utils.rateInfoFromFileMeta(fileMeta, RecTypes.AGN_SHP), recType = rateInfo.recType;

        for (let i = 0; i < items.length; i++) {
            const record = DataUtil.readShpRecord(items[i]);
            if (!record) continue;

            if (record.timeAdv > 0.0) {
                // Shift the lat lon coordinate according to time Advance (system lag)
                latlon = geoUtil.projectLatLong(record.lat, record.lon, record.grSpeed, record.head, record.timeAdv);
                record.lat = latlon.lat; record.lon = latlon.lon;
            }
            record["fileId"] = fileId;
            records.push(record);

            // FIX: midnight (gpsTime) rollover — latch a +86400s offset once the
            // rollover is detected and apply it to every record from then on.
            // Previously timeOffset was never set, so adjustGpsTime was only
            // ever called with 0 and times after midnight were never shifted
            // (readNTFile already does it this way).
            if (records.length > 1 && timeOffset === 0
                && ((records[records.length - 1].gpsTime - records[records.length - 2].gpsTime) < 0
                    && Math.abs(records[records.length - 1].gpsTime - records[records.length - 2].gpsTime) >= 80000))
                timeOffset = 86400;
            record.adjustGpsTime(timeOffset);

            if (dataFile['sprayOn']) {
                if (record.lhaReq > 0) {
                    totalSprayRecs++;
                    totalAppRates += record.lhaReq;
                }
                if (record.sprayStat === 3) {
                    // Spray just switched on: start a new sprayed segment here
                    prevUTM_X = record.utmX;
                    prevUTM_Y = record.utmY;
                    prevSwath = record.swath;
                    prevLine = record.llnum;
                }
                else {
                    ({ appliedRate, sprMatsUnit } = getAppliedRate(record, rateInfo, rateInfo.recType === RecTypes.AGN_SHP));

                    // Accumulate segment area (length x previous swath) within the same line
                    if (prevLine === record.llnum) {
                        sprayedSeg = Math.hypot(record.utmX - prevUTM_X, record.utmY - prevUTM_Y) * prevSwath;
                        if (sprayedSeg) {
                            totalSprays += sprayedSeg;
                            if (appliedRate > 0) {
                                totalSprMats += (sprayedSeg * CVCST.SM2HA) * appliedRate;
                            }
                        }
                    }
                    prevUTM_X = record.utmX;
                    prevUTM_Y = record.utmY;
                    prevSwath = record.swath;
                    prevLine = record.llnum;
                }
            }
        }

        let fileDataInfo = null;
        if (records.length) {
            fileDataInfo = {
                records: records,
                totalSprayMat: totalSprMats,
                totalSprayMatUnit: sprMatsUnit,
                totalSprayed: totalSprays,
                avgRate: totalSprayRecs > 0 ? totalAppRates / totalSprayRecs : 0
            }
        }
        items = null;
        return cb(null, fileDataInfo);
    });
}
|
|
|
|
/**
 * Parse a SATLOG exported ascii (CSV) log into work records and the total
 * sprayed track length (metres, summed over consecutive spraying records).
 *
 * @param {string} dataFile - path of the .asc file
 * @param {*} fileId - AppFile id stamped onto every record
 * @param {Function} cb - cb(err, { records, totalSprayed: 0, totalSprLength, avgRate: 0 })
 */
function readSatLogAsc(dataFile, fileId, cb) {
    /*
    const hdrs = ['Time', undefined, undefined, 'Alt', undefined, undefined, undefined, undefined, undefined, 'Date', undefined, 'DOP', undefined, undefined,
        undefined, 'Hdg', 'Lat', 'Lon', 'RHumi', 'Speed', 'Spray', undefined, 'SU', undefined, 'Temperature', undefined, undefined, undefined, undefined, 'X-Track'];
    */
    let records = [], totalSprLength = 0, currStat = -999, prevStat = -999, curLonLat = turf.point([0, 0]), prevLonLat = turf.point([0, 0]), segLength = 0;
    fs.createReadStream(dataFile)
        .pipe(csv.parse({ headers: true, ignoreEmpty: true }))
        .on('error', err => {
            if (cb) cb(err);
        })
        .on('data', rowRec => {
            const record = new WorkRecord();
            record.readSLAscRecord(rowRec);
            // Only accept rows carrying a well-formed HH:MM:SS gps time
            if (record.gpsTime && /(?:[01]\d|2[0123]):(?:[012345]\d):(?:[012345]\d)/.test(record.gpsTime)) {
                record.decodeSL();
                record["fileId"] = fileId;
                records.push(record);

                curLonLat.geometry.coordinates = [record.lon, record.lat];
                currStat = record.sprayStat;
                if (prevStat != -999) {
                    // Count the segment when spraying continues or the spray status toggled
                    if (prevStat > 0 && currStat > 0 || record.sprayStat != prevStat) {
                        segLength = turf.distance(prevLonLat, curLonLat, { units: "meters" });
                        if (segLength <= 1000) // guard against GPS glitches / large gaps
                            totalSprLength += segLength;
                    }
                }

                prevStat = record.sprayStat;
                // FIX: carry this row's coordinates into prevLonLat. The previous
                // code read curLonLat.lon / curLonLat.lat, which do not exist on a
                // turf point, leaving prevLonLat at [undefined, undefined] so no
                // spray length was ever accumulated.
                prevLonLat.geometry.coordinates = [record.lon, record.lat];
            }
        })
        .on('end', rowCount => {
            curLonLat = prevLonLat = null;

            const fileDataInfo = {
                records: records,
                totalSprayed: 0,
                totalSprLength: totalSprLength,
                avgRate: 0
            }
            if (cb) cb(null, fileDataInfo);
        });
}
|
|
|
|
/**
 * Best-effort AMQP error handler: when err is truthy, log it and try to close
 * the shared connection, swallowing any failure of the close itself.
 * @param {*} err - error value; falsy means "no error".
 * @returns {boolean} true when an error was handled, false otherwise.
 */
function closeOnErr(err) {
    const hasError = Boolean(err);
    if (hasError) {
        try {
            debug("[AMQP] error:", err);
            amqpConn.close();
        } catch (ignored) {
            // The connection may already be closed/broken — nothing more to do.
        }
    }
    return hasError;
}
|
|
|
|
/**
 * Manually trigger V8 garbage collection when the process was started with
 * --expose-gc; otherwise a no-op.
 */
function gc() {
    if (global.gc) {
        global.gc();
    }
}
|