agmission/Development/server/controllers/upload_job.js

'use strict';
module.exports = function (app) {
    const debug = require('debug')('agm:upload-job'),
        util = require('util'),
        glob = util.promisify(require('glob')),
        domain = require('domain'),
        Job = require('../model/job'),
        App = require('../model/application'),
        User = require('../model/user'),
        path = require('path'),
        multer = require('multer'),
        uniqid = require('uniqid'),
        fs = require('fs-extra'),
        unzip = require('extract-zip'),
        unzipStream = require('unzip-stream'),
        taskQHelper = require('../helpers/job_queue').getInstance(),
        fileHelper = require('../helpers/file_helper'),
        fileAgNav = require('../helpers/file_storage'),
        fileKML = require('../helpers/file_kml'),
        fileShp = require('../helpers/file_shp'),
        fileSatLog = require('../helpers/file_satlog'),
        utils = require('../helpers/utils'),
        polyUtil = require('../helpers/poly_util'),
        jobUtil = require('../helpers/job_util'),
        ObjectId = require('mongodb').ObjectId,
        Redis = require('ioredis'),
        FILE = require('../helpers/file_constants'),
        { JobUpdateOp } = require('../helpers/job_constants'),
        { UserTypes, Errors, AppStatus, DEL_APP_IDS } = require('../helpers/constants'),
        { AppError, AppParamError } = require('../helpers/app_error'),
        { checkRqUsageLimits } = require('../middlewares/app_validator'),
        env = require('../helpers/env');
    const redis = new Redis({ password: env.REDIS_PWD, showFriendlyErrorStack: !app.isProd });
    const uploadPath = env.UPLOAD_DIR;
    const unzipPath = env.UNZIP_DIR;
    const filenameFnc = function (req, file, cb) {
        cb(null, uniqid() + '_' + file.originalname);
    };
    const storageFnc = multer.diskStorage({
        destination: function (req, file, cb) {
            cb(null, uploadPath);
        },
        filename: filenameFnc
    });
    const limits = { fileSize: (env.MAX_UPLOAD_SIZE_MB || 120) * 1048576, files: env.MAX_UPLOAD_FILES || 99 };
    const upload = multer({
        storage: storageFnc,
        limits: limits
    }).any("jobs");
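    /**
     * Reject the upload when an identical file (same name, case-insensitive, and same
     * size) has already been imported into the job and was neither deleted nor failed.
     * @param {*} userInfo { userId: <ObjectId>, kind: <user type> }
     * @param {*} jobId target job id
     * @param {*} file uploaded multer file object
     * @param {*} updateOp requested update operation (see JobUpdateOp)
     */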
    async function checkFileDuplication(userInfo, jobId, file, updateOp) {
        if (!userInfo || !jobId || !file) throw AppParamError.create();
        const filter = {
            jobId: jobId,
            fileName: { $regex: new RegExp('^' + utils.escapeRegExp(`${file.originalname}`) + '$', 'i') },
            fileSize: file.size,
            status: { $ne: -1 },
            errorMsg: null,
            markedDelete: { $ne: true }
        };
        const existing = await App.findOne(filter).lean();
        if (existing) {
            if (userInfo.kind === UserTypes.DEVICE || (existing.updateOp === updateOp) || (existing.proStatus > 0 && existing.updateOp <= 3)) {
                throw AppError.create(Errors.DUPLICATED_FILE);
            }
        }
    }
    /**
     * Check whether the Job exists and the user has access to it
     * @param {*} userInfo userInfo with { userId: <userId>, kind: <user type> }
     * @param {*} jobId jobId
     * @returns The Job, or throws if the uploading user has no access to it (it belongs to another applicator or client)
     */
    async function checkUserJobUpload(userInfo, jobId) {
        if (!jobId) throw AppError.create(Errors.JOB_NOT_FOUND);
        const job = await Job.findById(jobId, '_id client byPuid')
            .populate({
                path: 'client',
                model: UserTypes.CLIENT,
                select: '_id name parent'
            })
            .lean();
        if (!job) throw AppError.create(Errors.JOB_NOT_FOUND);
        if (userInfo.kind === UserTypes.APP) {
            if (!job.byPuid.equals(userInfo.userId)) throw AppError.create(Errors.JOB_NOT_FOUND);
        } else {
            const matched = await User.findOne({ _id: userInfo.userId, parent: job.byPuid }, '_id').lean();
            if (!matched) throw AppError.create(Errors.JOB_NOT_FOUND);
        }
        return job;
    }
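    /**
     * Stream-scan a zip archive for the first job.json entry and return its raw content
     * without extracting the whole archive to disk.
     * @param {string} zipFilePath path of the uploaded zip file
     * @returns {Promise<{file: string, content: string}|undefined>} the entry path and its
     *          JSON text, or undefined when the archive contains no job.json
     */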
    async function readJobInfoFromZip(zipFilePath) {
        return new Promise((resolve, reject) => {
            let foundJson = false, jobJsonFN, jobJsonContent, returned = false;
            const d = domain.create();
            d.on('error', err => {
                if (!returned) {
                    returned = true;
                    debug(err);
                    reject(err);
                } else {
                    d.exit(); // Exit the domain so that further uncatchable errors (bound to the streams) do not fire again
                }
            });
            d.run(function () {
                const zipStream = fs.createReadStream(zipFilePath);
                // Note: a domain is required to catch errors thrown while unzip-stream parses the piped stream; on('error') alone does not catch them
                zipStream.pipe(unzipStream.Parse({ debug: false }))
                    .on('entry', entry => {
                        if (!foundJson && entry.type == 'File' && entry.path && /job\.json$/i.test(entry.path)) {
                            foundJson = true;
                            jobJsonFN = entry.path;
                            const chunks = [];
                            entry.on('error', err => { throw err; });
                            entry.on('data', chunk => chunks.push(chunk));
                            entry.on('end', () => {
                                // Resolve even when job.json is empty so the promise never hangs
                                if (chunks.length) {
                                    jobJsonContent = Buffer.concat(chunks).toString();
                                }
                                returned = true;
                                resolve(jobJsonContent ? { file: jobJsonFN, content: jobJsonContent } : undefined);
                            });
                            // 'close' is never emitted for entries
                        } else {
                            entry.autodrain();
                        }
                    })
                    .on('close', () => {
                        if (!foundJson && !returned) { // 'close' can occur multiple times
                            returned = true;
                            resolve();
                        }
                    })
                    .on('error', err => { throw err; });
            });
        });
    }
    /**
     * Create an Application entry along with a background task for processing the uploaded file
     */
    async function createAppEntry(request, importTask, byImport = false) {
        const appl = new App({
            jobId: importTask.jobId,
            fileName: importTask.file.originalName,
            savedFilename: importTask.file.name,
            fileSize: importTask.file.size,
            updateOp: importTask.updateOp,
            status: importTask.err ? 0 : 1,
            errorMsg: importTask.err,
            cid: importTask.clientId,
            byUser: ObjectId(request.uid),
            byImport: byImport
        });
        const saved = await appl.save();
        importTask.appId = saved._id;
        if (!importTask.err) { // No need to enqueue a new import task if there was a validation error
            await taskQHelper.publishJobTaskASync(Buffer.from(JSON.stringify(importTask)));
        }
        const doneObj = {
            jobId: saved.jobId || null,
            fileName: saved.fileName || importTask.file.originalName,
            fileSize: saved.fileSize || importTask.file.size,
            updateOp: importTask.updateOp,
            errorMsg: saved.errorMsg || '',
            createdDate: saved.createdDate
        };
        if (saved && request.ut != UserTypes.DEVICE) {
            doneObj.status = saved.status;
            doneObj.proStatus = saved.proStatus;
            doneObj._id = saved._id;
        }
        return doneObj;
    }
    const uploadData = multer({
        storage: storageFnc,
        limits: limits,
    }).any("data");
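    /**
     * POST handler for bulk data uploads (form field "data"). Each zip that embeds a
     * job.json keeps its job id (DATA_ONLY update); files without job metadata are
     * imported as new jobs under the supplied clientId.
     */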
    async function uploadData_post(req, res) {
        /**
         * Handle multiple files:
         * 1. Check whether each file is valid
         * 2. Create a background task to process each file
         * 3. Respond to the client with the initial result
         *
         * For each file without a metadata file (.job) => automatically create a new job under the provided client (clientId)
         */
        await new Promise((resolve, reject) => {
            uploadData(req, res, err => {
                if (err) return reject(err);
                resolve();
            });
        });
        const input = req.body;
        if (utils.isEmptyArray(req.files)) throw AppError.create(Errors.NO_FILE);
        const processResult = [];
        const userInfo = { userId: ObjectId(req.uid), kind: req.ut };
        for (const file of req.files) {
            const filePath = path.join(uploadPath, file.filename);
            const isKmlOrKmz = /\.(kml|kmz)$/i.test(filePath);
            const importTask = {
                file: { name: file.filename, originalName: file.originalname, size: file.size },
                metaFile: '',
                updateOp: JobUpdateOp.OVERWRITE,
                clientId: input.clientId,
                uid: req.uid
            };
            if (!isKmlOrKmz) {
                try {
                    const data = await readJobInfoFromZip(filePath);
                    if (data) {
                        importTask.metaFile = data.file;
                        const jsonData = JSON.parse(data.content);
                        const parsedJobId = Number(jsonData ? jsonData._id || 0 : 0);
                        if (parsedJobId) {
                            importTask.jobId = parsedJobId;
                            importTask.updateOp = JobUpdateOp.DATA_ONLY;
                        }
                    }
                } catch (err) {
                    importTask.err = Errors.CORRUPTED_ZIP;
                }
            }
            let _job = null;
            if (importTask.jobId) {
                try {
                    _job = await checkUserJobUpload(userInfo, importTask.jobId);
                } catch (err) {
                    importTask.err = Errors.JOB_NOT_FOUND;
                }
                if (importTask.err !== Errors.JOB_NOT_FOUND) {
                    try {
                        await checkFileDuplication(userInfo, importTask.jobId, file, JobUpdateOp.DATA_ONLY);
                    } catch (err) {
                        importTask.err = Errors.DUPLICATED_FILE;
                    }
                }
            } else {
                try {
                    await checkRqUsageLimits(req, res);
                } catch (err) {
                    importTask.err = err;
                }
            }
            try {
                const data = await createAppEntry(req, importTask, true);
                if (_job && _job.client) {
                    data["cname"] = _job.client.name;
                    data["cid"] = _job.client._id;
                } else if (importTask.clientId) {
                    data["cid"] = importTask.clientId;
                }
                processResult.push(data);
            } catch (err) {
                await fileHelper.removeFiles([file.path]);
            }
        }
        res.json(processResult);
    }
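    /**
     * POST handler for uploading a file into an existing job (form field "jobs"):
     * validates job access, file duplication, and usage limits, then creates the
     * Application entry and queues the background import task.
     */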
    async function uploadJob_post(req, res) {
        try {
            await new Promise((resolve, reject) => {
                upload(req, res, (err) => {
                    if (err) return reject(err);
                    resolve();
                });
            });
            // Check for empty form files
            if (utils.isEmptyArray(req.files)) throw AppError.create(Errors.NO_FILE);
            const jobId = req.body.jobId, file = req.files[0];
            const jobImportTask = {
                jobId: jobId,
                file: { name: file.filename, originalName: file.originalname, size: file.size },
                metaFile: '',
                updateOp: Number(req.body.updateOp) || 1, // Default: Append
                uid: req.uid
            };
            const filePath = path.join(uploadPath, file.filename),
                isKmlOrKmz = /\.(kml|kmz)$/i.test(filePath),
                userInfo = { userId: ObjectId(req.uid), kind: req.ut };
            let jsonContent = null;
            await checkUserJobUpload(userInfo, jobId);
            await checkFileDuplication(userInfo, jobId, file, jobImportTask.updateOp);
            // Check whether the usage limits would be exceeded
            if (jobImportTask.updateOp == JobUpdateOp.APPEND || jobImportTask.updateOp == JobUpdateOp.OVERWRITE) {
                await checkRqUsageLimits(req, res);
            }
            if (isKmlOrKmz) {
                const uploadRes = await createAppEntry(req, jobImportTask, false);
                return res.json(uploadRes);
            }
            const data = await readJobInfoFromZip(filePath);
            if (data) {
                jobImportTask.metaFile = data.file;
                jsonContent = data.content;
            }
            // On OVERWRITE, make sure the job id embedded in the zip matches the target job
            if (jsonContent && jobImportTask.updateOp === JobUpdateOp.OVERWRITE) {
                let jsonData, parsedJobId = 0;
                try {
                    jsonData = JSON.parse(jsonContent);
                    parsedJobId = Number(jsonData ? jsonData._id || 0 : 0);
                } catch (err) { /* Malformed job.json: fall through with parsedJobId = 0 */ }
                if (parsedJobId && parsedJobId !== Number(jobId))
                    throw AppError.create(Errors.WRONG_JOB_FILE);
            }
            const uploadRes = await createAppEntry(req, jobImportTask, false);
            res.json(uploadRes);
        } catch (err) {
            if (req.files && req.files[0]) {
                await fileHelper.removeFiles([req.files[0].path]);
            }
            throw err;
        }
    }
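    /**
     * POST handler to cancel pending imports: records the app ids in the Redis
     * DEL_APP_IDS set (so background processing can skip them) and marks all
     * not-yet-done Application entries as cancelled.
     */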
    async function cancelImport_post(req, res) {
        const appIds = req.body.appIds;
        if (!Array.isArray(appIds) || !appIds.length) return res.json(null);
        await redis.sadd(DEL_APP_IDS, appIds);
        const appObIds = appIds.map(it => ObjectId(it));
        await App.updateMany({ _id: { $in: appObIds }, status: { $ne: AppStatus.DONE } }, { status: AppStatus.WAS_CANCELLED });
        res.json(appIds);
    }
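    // Glob options for picking up area files: case-insensitive, dotfiles included,
    // ignoring q[0-9]*.t*/n[0-9]*.t* files and n[0-9]*spr(on|off) shapefile parts (.dbf/.prj/.shp/.shx)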
    const sprayItemsOps = { nonull: false, nocase: true, dot: true, ignore: ['**/{(q[0-9]*.t*|n[0-9]*.t*),n[0-9]*spr+(on|off).+(dbf|prj|shp|shx)}'] };
    const uploadAreas = multer({
        storage: multer.diskStorage({
            destination: function (req, file, cb) {
                cb(null, env.AREAS_UPLOAD_DIR);
            },
            filename: filenameFnc
        }),
        limits: limits,
    }).any("areas");
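    /**
     * POST handler for area uploads (form field "areas"): accepts a single kml/kmz
     * file or a zip of supported area files (.job, .kml, .kmz, .no1, .prj, .shp, ...),
     * converts the polygons found, saves them to the client's area library, and
     * responds with the new areas plus duplicate and bad-file information.
     */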
    async function uploadAreas_post(req, res, next) {
        const areasUploadPath = env.AREAS_UPLOAD_DIR, areasUnzipPath = path.join(areasUploadPath, 'unzip/');
        let input, isKmlOrKmz = false, uFile, unZipPath;
        try {
            await new Promise((resolve, reject) => {
                uploadAreas(req, res, (err) => {
                    if (err) return reject(err);
                    resolve();
                });
            });
            req.setTimeout(0);
            res.setTimeout(0);
            input = req.body; // All additional form data {key, value}; read here, after multer has populated the request with the uploaded files
            if (!input.readOnly && !input.clientId) throw AppParamError.create();
            if (utils.isEmptyArray(req.files)) throw AppError.create(Errors.NO_FILE);
            let areaTypeAndFiles = [], areaFiles = [], newAreas = [], dup = 0;
            uFile = req.files[0];
            isKmlOrKmz = /\.(kml|kmz)$/i.test(uFile.originalname);
            if (isKmlOrKmz) areaFiles = [uFile.path];
            unZipPath = isKmlOrKmz ? areasUploadPath : path.join(areasUnzipPath, uFile.filename.replace(path.extname(uFile.filename), ''), '/');
            if (!isKmlOrKmz) {
                try {
                    await unzip(uFile.path, { dir: unZipPath });
                } catch (err) {
                    throw AppError.create(Errors.CORRUPTED_ZIP);
                }
                areaFiles = await glob(path.join(unZipPath, "**/*.{job,kml,kmz,no1,prj,shp,dbf,agn,dsp,xcl}"), sprayItemsOps);
                if (areaFiles.length === 0) throw AppError.create(Errors.AREAS_NOT_FOUND);
            }
            areaTypeAndFiles = await fileHelper.areasFromListASync(unZipPath, areaFiles);
            const ops = { onlyAreas: true, sprZoneColor: 'blue', isUS: false, appRate: -1 };
            const badFiles = [];
            for (const areaFile of areaTypeAndFiles) {
                let data, areas = [];
                try {
                    switch (areaFile.type) {
                        case FILE.FILE_NO1:
                        case FILE.FILE_PRJ:
                            data = await fileAgNav.readAGN_PRJAsync(unZipPath, areaFile, ops);
                            break;
                        case FILE.FILE_KMZ:
                        case FILE.FILE_KML:
                            data = await fileKML.readKmlKmzToGeoItemsASync(unZipPath, areaFile, ops);
                            break;
                        case FILE.FILE_SHP:
                            data = await fileShp.readSHPToGeoItems(unZipPath, areaFile, ops);
                            break;
                        case FILE.FILE_SATLOG_JOB:
                            data = await fileSatLog.readSatLogJob(unZipPath, areaFile, ops);
                            break;
                        default:
                            continue;
                    }
                    if (data) {
                        if (!utils.isEmptyArray(data.sprayAreas)) areas = areas.concat(data.sprayAreas);
                        if (!utils.isEmptyArray(data.xclAreas)) areas = areas.concat(data.xclAreas);
                    }
                    // debug(`areas: ${areaFile.area}`, areas.length);
                    // Mar 11/2025, changed logic: try to save the found areas to the DB. On success, add them to the
                    // result; on error or invalid polygons, record the file and continue with the next one
                    if (!utils.isEmptyArray(areas)) {
                        const addRS = await jobUtil.addAreasToLib(areas, { returnAreas: true, clientId: input.clientId, debug: false/*!app.isProd*/ });
                        newAreas = newAreas.concat(addRS.areas); // Add areas to the result
                        // debug(`areas: ${areaFile.area}`, addRS.areas.length, "dup: ", addRS.dup);
                        dup += addRS.dup; // Count the duplicated areas
                    }
                } catch (err) {
                    debug(areaFile, err);
                    if (areaTypeAndFiles.length === 1) throw AppError.create(Errors.INVALID_AREAS_FILE);
                    // Record the bad file in the result and continue with the next one
                    badFiles.push({ name: areaFile.area, count: areas.length });
                }
            }
            if (utils.isEmptyArray(newAreas) && !dup) throw AppError.create(Errors.AREAS_NOT_FOUND);
            res.json({ areas: polyUtil.toGeoItems(newAreas), dup: dup, ...(badFiles.length && { badFiles: badFiles }) });
        } finally {
            // Clean up uploaded files or folders after processing (guard against failures before uFile is set)
            const delFiles = uFile ? [uFile.path] : [];
            if (!isKmlOrKmz && unZipPath) delFiles.push(unZipPath);
            !utils.isEmptyArray(delFiles) && (await fileHelper.removeFilesAsync(delFiles));
        }
    }
    return ({
        uploadJob_post, uploadData_post, cancelImport_post, uploadAreas_post
    });
};
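// A minimal wiring sketch (assumptions: an Express app, an auth middleware named
// `authGuard`, and an async error wrapper `asyncWrap` — all three are hypothetical
// here; the real route registration lives elsewhere in the project):
//
//   const jobUpload = require('./controllers/upload_job')(app);
//   app.post('/upload/job', authGuard, asyncWrap(jobUpload.uploadJob_post));
//   app.post('/upload/data', authGuard, asyncWrap(jobUpload.uploadData_post));
//   app.post('/upload/areas', authGuard, asyncWrap(jobUpload.uploadAreas_post));
//   app.post('/import/cancel', authGuard, asyncWrap(jobUpload.cancelImport_post));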