agmission/Development/server/workers/cleanup_worker.js

210 lines
7.1 KiB
JavaScript

'use strict';
const cron = require('node-cron'),
debug = require('debug')('agm:delete_worker'),
env = require('../helpers/env.js'),
isProd = env.PRODUCTION,
{ DBConnection } = require('../helpers/db/connect'),
models = require('../model'),
{ JobStatus } = require('../helpers/job_constants.js'),
jobUtil = require('../helpers/job_util'),
utils = require('../helpers/utils.js'),
moment = require('moment');
// Module-level holder for lazily created state: `app.locals` (geodesy classes)
// and `app.jobCtl` (job controller) are attached to it on first use.
const app = {};

// Database connection owned by this worker.
const workerDB = new DBConnection('Cleanup Worker');

// An uncaught exception is logged and then terminates the process with exit code 1.
process.on('uncaughtException', (err) => {
  debug(err);
  process.exit(1);
});

// An unhandled promise rejection is only logged; the process keeps running.
process.on('unhandledRejection', (reason, p) => {
  debug(reason, 'Unhandled Rejection at Promise', p);
});

// Open the connection. NOTE(review): `setupExitHandlers: false` presumably stops the
// connection helper from installing its own process exit handlers - confirm in helpers/db/connect.
workerDB.initialize({ setupExitHandlers: false });
/**
 * Loads the geodesy UTM module through a dynamic `import()` and publishes its
 * exports on `app.locals` (creating `app.locals` when it does not exist yet).
 *
 * @returns {Promise<void>} Resolves once `app.locals.UTM`, `app.locals.LatLonUTM`
 *   and `app.locals.Dms` have been set.
 */
async function loadLibs() {
  const { default: Utm, LatLon, Dms } = await import('@mickeyjohn/geodesy/utm.js');
  if (!app.locals) {
    app.locals = {};
  }
  Object.assign(app.locals, {
    UTM: Utm,
    // More accuracy, for converting from ll to utm
    LatLonUTM: LatLon,
    Dms
  });
}
/*
 Cron expression format (fields are separated by spaces):

 * * * * * *
 | | | | | |
 | | | | | day of week
 | | | | month
 | | | day of month
 | | hour
 | minute
 second (optional)

 Field          Allowed values
 second         0-59
 minute         0-59
 hour           0-23
 day of month   1-31
 month          1-12 (or names)
 day of week    0-7 (or names; 0 and 7 are both Sunday)
*/
// Schedule and run-state of the "purge marked-deleted records" task.
// Production: every 5 minutes. Otherwise: every minute, Monday to Friday.
// `status` is 1 while a run is in progress and 0 when idle.
const cleanMarkedDeleteData = {
  schedule: isProd ? '*/5 * * * *' : '*/1 * * * 1-5',
  status: 0
};

// Cron task that removes every record flagged `markedDelete`, one model after another.
const cleanMarkDeletedTask = cron.schedule(
  cleanMarkedDeleteData.schedule,
  async () => {
    // Skip this tick when the DB connection is not ready ...
    if (!workerDB.isReady()) return;
    // ... or when the previous run has not finished yet.
    if (cleanMarkedDeleteData.status) return;

    cleanMarkedDeleteData.status = 1;
    try {
      // [key in `models`, label used in the log, optional per-run batch limit]
      const purgePlan = [
        ['App', 'Application'],
        ['Client', 'Client', 5],
        ['Job', 'Job', 20],
        ['Customer', 'Customer', 2],
        ['Vehicle', 'Vehicle', 10],
        ['Pilot', 'Pilot']
      ];
      // Strictly sequential; the first failure aborts the rest of this run.
      for (const [modelKey, ...args] of purgePlan) {
        await cleanMarkedDeleteOnes(models[modelKey], ...args);
      }
    } catch (error) {
      debug(error);
    } finally {
      // Always release the busy flag so the next tick can run.
      cleanMarkedDeleteData.status = 0;
    }
  },
  {
    scheduled: true,
    timezone: 'Etc/UTC',
    name: 'cleanMarkedDeleteData'
  }
);
/**
 * Finds the documents of `model` flagged with `markedDelete: true` and removes
 * them one by one through each document's `removeFull(null, false)`.
 *
 * @param {object} model - Model exposing `find(...).limit(...)`; nothing is done when falsy.
 * @param {string} name - Label used in the debug log lines.
 * @param {?number} [limit=null] - Value handed to `.limit()` to cap the batch size.
 * @returns {Promise<void>}
 */
async function cleanMarkedDeleteOnes(model, name, limit = null) {
  if (!model) {
    return;
  }

  const flagged = await model.find({ markedDelete: true }).limit(limit);
  const total = flagged.length;
  if (!total) {
    return;
  }

  debug(`Deleting ${total} ${name} ...`);
  // Sequential on purpose: each removal completes before the next one starts.
  for (const doc of flagged) {
    await doc.removeFull(null, false);
  }
  debug(`Deleted ${total} ${name}.`);
}
/**
* Cleaning old data and archiving old jobs
*/
// Schedule, run-state and name of the job archiving task.
// Production: daily at 06:00 UTC. Otherwise: every 3rd hour on Saturday and Sunday.
// `status` is 1 while a run is in progress and 0 when idle.
const cleanOldData = {
  schedule: isProd ? `00 6 * * *` : `0 */3 * * 0,6`,
  status: 0,
  name: "cleanOldData-archivingJobs"
};

// Cron task that archives old jobs. For each eligible job (at most 100 per run) it:
//   1. stores the data returned by the job controller on the matching AppFile documents,
//   2. deletes the AppDetail records of those files,
//   3. fills in the job's weatherInfo from the recorded data when the job has none,
//   4. marks the job as ARCHIVED.
// A job is eligible when it is not marked for deletion, not already archived, not owned
// by an excluded user, has sprayed data, and its latest application update is older than
// env.ARCHIVE_JOBS_DAYS days.
const cleanOldDataTask = cron.schedule(cleanOldData.schedule, async () => {
  // Only proceed when the DB connection is ready and no previous run is still active.
  if (!workerDB.isReady() || cleanOldData.status) return;

  let numOfJobs = 0;
  debug(`Start ${cleanOldData.name} Task at %s ...`, moment.utc().toISOString());
  cleanOldData.status = 1;

  try {
    // Lazy initialisation lives inside the try block so that a failing import/require
    // still reaches `finally` and releases the busy flag; otherwise the task would stay
    // locked for the remaining lifetime of the process.
    if (!app.jobCtl) {
      if (!app.locals) await loadLibs();
      app.jobCtl = require('../controllers/job.js')(app.locals);
    }

    // Filter for eligible jobs to be archived, skip the ones from agnav.com's users for now.
    // The dot is escaped so it only matches a literal ".".
    const excludedUsers = await models.User.find({ username: /agnav\.com/i, kind: '1' }, '_id', { lean: true });
    const excludedPIds = excludedUsers.map(u => u._id);

    // NOTE(review): this relies on env.ARCHIVE_JOBS_DAYS being a valid number of days;
    // confirm env.js always provides one, because archiving deletes AppDetail records.
    const fromDate = moment.utc().subtract(env.ARCHIVE_JOBS_DAYS, 'days').toDate();

    const pipeline = [
      { $match: { markedDelete: { $ne: true }, byPuid: { $nin: excludedPIds }, status: { $ne: JobStatus.ARCHIVED } } },
      {
        $lookup: {
          from: 'applications',
          localField: '_id',
          foreignField: 'jobId',
          as: 'jobs_apps'
        }
      },
      { $unwind: { path: '$jobs_apps', preserveNullAndEmptyArrays: true } },
      {
        $group: {
          '_id': '$_id',
          'totalSprayed': { $sum: '$jobs_apps.totalSprayed' },
          'updateDate': { $max: '$jobs_apps.updateDate' }
        }
      },
      // Keep only jobs with sprayed data whose latest application update is old enough.
      { $match: { updateDate: { $lt: fromDate }, totalSprayed: { $gt: 0 } } },
      // Debug hint: append { $group: { _id: null, count: { $sum: 1 } } } to count the matches.
      { $project: { '_id': 1 } }
    ];

    //TODO: Use a transaction for atomic updates if possible. However, it would not work for jobs with large amount of data because of the 1 minute max transaction timeout
    const jobIds = (await models.Job.aggregate(pipeline).limit(100)).map(j => j._id);

    if (jobIds.length) {
      debug(`Start archiving ${jobIds.length} jobs`);

      for (const jobId of jobIds) {
        // Fresh per-job state, so nothing can leak from one job into the next.
        const updateObj = { status: JobStatus.ARCHIVED };
        const jobAppData = await app.jobCtl.getAppDataByJobId(
          jobId,
          '-_id lat lon sprayStat llnum gpsTime satsIn stdHdop',
          { wApps: true, dataOp: 1, wFileId: true, withJob: true }
        );

        if (jobAppData && !utils.isEmptyArray(jobAppData.data)) {
          // `for await` is kept from the original: it also works for plain arrays.
          for await (const jad of jobAppData.data) {
            if (jad.data && jad.data.length) {
              await models.AppFile.findOneAndUpdate({ _id: jad.id }, { data: jad.data }, { new: true, lean: true });
            }
            // Remove all application details or recorded points of files stored within the DB collection.
            await models.AppDetail.deleteMany({ fileId: jad.id });
          }

          // Update the job weatherInfo with the aggregated wi from data if this info does not exist with the job.
          const job = jobAppData.job;
          if (job && utils.isEmptyObj(job.weatherInfo)) {
            const wi = await jobUtil.getDataWeatherInfo(jobAppData.fileIds);
            if (!utils.isEmptyArray(wi) && !utils.isEmptyObj(wi[0])) {
              updateObj.weatherInfo = {
                windSpd: utils.mpSecToKnot(wi[0]['avgWindSpd']).toFixed(1),
                windDir: utils.deg2Compass(wi[0]['avgWindDir']),
                temp: utils.inCorF(wi[0]['avgTemp'], job.measureUnit, false),
                humid: utils.truncR(wi[0]['avgHumid'], 0)
              };
            }
          }
        }

        // Update the job status (and weatherInfo if any).
        await models.Job.updateOne({ _id: jobId }, updateObj);
        numOfJobs++;
      }
    }
  } catch (error) {
    debug(error);
  } finally {
    let msg = `Done ${cleanOldData.name} at ${moment.utc().toISOString()}.`;
    if (numOfJobs) msg += ` ${numOfJobs} jobs were archived.`;
    debug(msg);
    // Always release the busy flag so the next scheduled run can start.
    cleanOldData.status = 0;
  }
},
{
  scheduled: true,
  timezone: "Etc/UTC",
  name: cleanOldData.name,
  // NOTE(review): asks node-cron to also fire the task once at start-up - confirm
  // this option is supported by the installed node-cron version.
  runOnInit: true
});