'use strict';

const cron = require('node-cron'),
  debug = require('debug')('agm:delete_worker'),
  env = require('../helpers/env.js'),
  isProd = env.PRODUCTION,
  dbConn = require('../helpers/db/connect.js')(),
  // dbConn = require('../helpers/db/connect-remote.js')(),
  models = require('../model'),
  { JobStatus } = require('../helpers/job_constants.js'),
  jobUtil = require('../helpers/job_util'),
  utils = require('../helpers/utils.js'),
  moment = require('moment');

const app = {};

process
  .on('uncaughtException', function (err) {
    debug(err);
    process.exit(1);
  })
  .on('unhandledRejection', (reason, p) => {
    debug(reason, 'Unhandled Rejection at Promise', p);
  });

async function loadLibs() {
  const mod = await import('@mickeyjohn/geodesy/utm.js');
  app.locals = app.locals || {};
  app.locals.UTM = mod.default;
  app.locals.LatLonUTM = mod.LatLon; // Higher accuracy when converting from lat/lon to UTM
  app.locals.Dms = mod.Dms;
}
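
// loadLibs() is invoked lazily by the archiving task below, right before the job
// controller is created, so the geodesy module is only imported when it is first needed.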

/*
   * * * * * *
   | | | | | |
   | | | | | day of week
   | | | | month
   | | | day of month
   | | hour
   | minute
   second (optional)

   field          value
   second         0-59
   minute         0-59
   hour           0-23
   day of month   1-31
   month          1-12 (or names)
   day of week    0-7 (or names, 0 or 7 are Sunday)
*/
const cleanMarkedDeleteData = {
  schedule: isProd ? '*/5 * * * *' : '*/1 * * * 1-5',
  status: 0
};
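
// cleanMarkedDeleteTask removes documents that have been flagged with markedDelete: true,
// one batch per model per run. In production it runs every 5 minutes; in other
// environments every minute, Monday through Friday.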

const cleanMarkedDeleteTask = cron.schedule(cleanMarkedDeleteData.schedule, async () => {
  // Only proceed when the previous run is idle and the DB connection is established
  if (!dbConn || dbConn.readyState !== 1 || cleanMarkedDeleteData.status)
    return;

  // debug("Start cleanMarkedDeleteData Task at %s ...", moment.utc().toISOString());
  cleanMarkedDeleteData.status = 1;

  try {
    await cleanMarkedDeleteOnes(models.App, "Application");
    await cleanMarkedDeleteOnes(models.Client, "Client", 5);
    await cleanMarkedDeleteOnes(models.Job, "Job", 20);
    await cleanMarkedDeleteOnes(models.Customer, "Customer", 2);
    await cleanMarkedDeleteOnes(models.Vehicle, "Vehicle", 10);
    await cleanMarkedDeleteOnes(models.Pilot, "Pilot");
  } catch (error) {
    debug(error);
  } finally {
    // debug("cleanMarkedDeleteData is Done.", moment.utc().toISOString());
    cleanMarkedDeleteData.status = 0;
  }
},
{
  scheduled: true,
  timezone: "Etc/UTC",
  name: "cleanMarkedDeleteData"
});
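
// Note: with node-cron, `scheduled: true` (the default) starts the task as soon as it is
// created. Keeping the returned handle allows cleanMarkedDeleteTask.stop() / .start()
// to pause and resume the task if that is ever needed.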

async function cleanMarkedDeleteOnes(model, name, limit = null) {
  if (!model) return;

  // Apply the optional batch limit only when one is given
  const query = model.find({ markedDelete: true });
  if (limit) query.limit(limit);

  const delOnes = await query;
  if (delOnes.length) {
    debug(`Deleting ${delOnes.length} ${name} ...`);
    for (let i = 0; i < delOnes.length; i++) {
      await delOnes[i].removeFull(null, false);
    }
    debug(`Deleted ${delOnes.length} ${name}.`);
  }
}
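
// removeFull(null, false) is assumed to be a helper defined on the model instances in
// ../model that deletes a document along with its dependent records. The per-model
// limits used above (e.g. 20 Jobs, 10 Vehicles per run) keep each cron tick short.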

/**
 * Cleaning old data and archiving old jobs
 */
const cleanOldData = {
  schedule: isProd ? '00 6 * * *' : '0 */3 * * 0,6',
  status: 0,
  name: "cleanOldData-archivingJobs"
};
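
// cleanOldDataTask archives jobs with spray data whose applications have not been updated
// for env.ARCHIVE_JOBS_DAYS days. In production it runs daily at 06:00 UTC; in other
// environments every 3 hours on Saturdays and Sundays. runOnInit also triggers one run
// at startup.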

const cleanOldDataTask = cron.schedule(cleanOldData.schedule, async () => {
  // Only proceed when the previous run is idle and the DB connection is established
  if (!dbConn || dbConn.readyState !== 1 || cleanOldData.status)
    return;

  let numOfJobs = 0;
  debug(`Start ${cleanOldData.name} Task at %s ...`, moment.utc().toISOString());
  cleanOldData.status = 1;

  // Lazily load the geodesy libs and the job controller on the first run
  if (!app.jobCtl) {
    if (!app.locals) await loadLibs();
    app.jobCtl = require('../controllers/job.js')(app.locals);
  }

  try {
    // Filter for jobs eligible to be archived; skip jobs owned by agnav.com users for now.
    const excludedPIds = (await models.User.find({ username: /agnav.com/i, kind: '1' }, '_id', { lean: true })).map(u => u._id);
    const fromDate = moment.utc().subtract(env.ARCHIVE_JOBS_DAYS, 'days').toDate();
    const pipeline = [
      { $match: { markedDelete: { $ne: true }, byPuid: { $nin: excludedPIds }, status: { $ne: JobStatus.ARCHIVED } } },
      {
        $lookup: {
          from: 'applications',
          localField: '_id',
          foreignField: 'jobId',
          as: 'jobs_apps'
        }
      },
      { $unwind: { path: '$jobs_apps', preserveNullAndEmptyArrays: true } },
      {
        $group: {
          '_id': '$_id',
          'totalSprayed': { $sum: '$jobs_apps.totalSprayed' },
          'updateDate': { $max: '$jobs_apps.updateDate' }
        }
      },
      { $match: { updateDate: { $lt: fromDate }, totalSprayed: { $gt: 0 } } },
      // { $group: { _id: null, count: { $sum: 1 } } } // Count total number of records
      {
        $project: {
          '_id': 1
        }
      }
    ];
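
    // The pipeline yields only the _id of each job that is not deleted or already archived,
    // is not owned by an excluded user, recorded a positive total spray, and whose newest
    // application update is older than the archive cutoff.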

    // TODO: Use a transaction for atomic updates if possible. It would not work for jobs
    // with large amounts of data, though, because of the 1 minute max transaction timeout.
    const jobIds = (await models.Job.aggregate(pipeline).limit(100)).map(j => j._id);
    if (jobIds.length) {
      debug(`Start archiving ${jobIds.length} jobs`);

      let job, weatherInfo, updateObj;
      for (let i = 0; i < jobIds.length; i++) {
        const jobId = jobIds[i];
        updateObj = { status: JobStatus.ARCHIVED };

        const jobAppData = await app.jobCtl.getAppDataByJobId(jobId, '-_id lat lon sprayStat llnum gpsTime satsIn stdHdop', { wApps: true, dataOp: 1, wFileId: true, withJob: true });
        if (jobAppData && !utils.isEmptyArray(jobAppData.data)) {
          for await (const jad of jobAppData.data) {
            if (jad.data && jad.data.length)
              await models.AppFile.findOneAndUpdate({ _id: jad.id }, { data: jad.data }, { new: true, lean: true });
            // Remove the application details (recorded points) kept for this file in the DB collection
            await models.AppDetail.deleteMany({ fileId: jad.id });
          }

          job = jobAppData.job;
          // If the job has no weather info yet, derive it from the aggregated weather data of its files
          if (utils.isEmptyObj(job.weatherInfo)) {
            const wi = await jobUtil.getDataWeatherInfo(jobAppData.fileIds);
            if (!utils.isEmptyArray(wi) && !utils.isEmptyObj(wi[0])) {
              weatherInfo = {
                windSpd: utils.mpSecToKnot(wi[0]['avgWindSpd']).toFixed(1),
                windDir: utils.deg2Compass(wi[0]['avgWindDir']),
                temp: utils.inCorF(wi[0]['avgTemp'], job.measureUnit, false),
                humid: utils.truncR(wi[0]['avgHumid'], 0)
              };
            }
          }
          // Include the derived weather info in the job update, if any
          if (weatherInfo) updateObj = { ...updateObj, weatherInfo };
        }

        await models.Job.updateOne({ _id: jobId }, updateObj);
        numOfJobs++;
        job = null; weatherInfo = null; updateObj = null;
      }
    }
  } catch (error) {
    debug(error);
  } finally {
    let msg = `Done ${cleanOldData.name} at ${moment.utc().toISOString()}.`;
    if (numOfJobs) msg += ` ${numOfJobs} jobs were archived.`;
    debug(msg);

    cleanOldData.status = 0;
  }
},
{
  scheduled: true,
  timezone: "Etc/UTC",
  name: cleanOldData.name,
  runOnInit: true
});