// 610 lines · 20 KiB · JavaScript
'use strict';
|
|
|
|
module.exports = function (app) {
|
|
const debug = require('debug')('agm:upload-job'),
|
|
async = require('async'),
|
|
glob = require('glob'),
|
|
domain = require('domain'),
|
|
Job = require('../model/job'),
|
|
App = require('../model/application'),
|
|
User = require('../model/user'),
|
|
path = require('path'),
|
|
multer = require('multer'),
|
|
uniqid = require('uniqid'),
|
|
fs = require('fs-extra'),
|
|
unzip = require('extract-zip'),
|
|
unzipStream = require('unzip-stream'),
|
|
taskQHelper = require('../helpers/job_queue').getInstance(),
|
|
fileHelper = require('../helpers/file_helper'),
|
|
fileAgNav = require('../helpers/file_storage'),
|
|
fileKML = require('../helpers/file_kml'),
|
|
fileShp = require('../helpers/file_shp'),
|
|
fileSatLog = require('../helpers/file_satlog'),
|
|
utils = require('../helpers/utils'),
|
|
polyUtil = require('../helpers/poly_util'),
|
|
jobUtil = require('../helpers/job_util'),
|
|
ObjectId = require('mongodb').ObjectId,
|
|
Redis = require('ioredis'),
|
|
FILE = require('../helpers/file_constants'),
|
|
{ JobUpdateOp } = require('../helpers/job_constants'),
|
|
{ UserTypes, Errors, AppStatus, DEL_APP_IDS } = require('../helpers/constants'),
|
|
{ AppError, AppParamError, AppInputError } = require('../helpers/app_error'),
|
|
{ getUserInfo, checkUsageLimits } = require('../middlewares/app_validator'),
|
|
env = require('../helpers/env');
|
|
|
|
// Redis client used by this module (cancelImport_post records cancelled app ids in it).
// Friendly error stacks are enabled whenever the app is not flagged as production.
const redis = new Redis({
  password: env.REDIS_PWD,
  showFriendlyErrorStack: !app.isProd,
});

// uploadPath: folder multer writes raw uploads into.
// unzipPath: folder under which uploaded archives are extracted.
const { UPLOAD_DIR: uploadPath, UNZIP_DIR: unzipPath } = env;
|
|
|
|
// Multer disk storage shared by every upload endpoint in this module:
// files land in uploadPath and are saved as "<uniqid>_<original file name>".
const storage = multer.diskStorage({
  destination(req, file, done) {
    done(null, uploadPath);
  },
  filename(req, file, done) {
    const savedName = `${uniqid()}_${file.originalname}`;
    done(null, savedName);
  },
});
|
|
|
|
// Upload limits handed to multer.
// Values read from the environment may arrive as strings, so they are coerced
// to numbers explicitly; a missing, zero or non-numeric value falls back to the
// defaults (120 MB per file, 99 files per request).
// NOTE(review): the underlying multipart parser is expected to honour only
// numeric limit values — confirm against the installed multer/busboy version.
const limits = {
  fileSize: (Number(env.MAX_UPLOAD_SIZE_MB) || 120) * 1048576, // MB -> bytes
  files: Number(env.MAX_UPLOAD_FILES) || 99,
};

// Multer middleware used by uploadJob_post; accepts files from any form field.
// NOTE(review): multer documents any() without parameters, so "jobs" looks
// purely descriptive — kept unchanged.
const upload = multer({ storage, limits }).any("jobs");
|
|
|
|
/**
 * Best-effort removal of temporary upload files / folders.
 *
 * Failures never propagate to the caller (cleanup must not break a request),
 * but they are reported through the module's debug logger instead of being
 * dropped silently.
 *
 * @param {string[]} files paths handed to fileHelper.removeFiles
 */
function removeTempFiles(files) {
  // Nothing to clean up: avoid a pointless helper call.
  if (!files || files.length === 0) return;

  fileHelper.removeFiles(files, (err) => {
    if (err) debug('Failed to remove temp files %o: %o', files, err);
  });
}
|
|
|
|
/**
 * Check whether the same file was already uploaded for a job.
 *
 * Looks for an App record of `jobId` whose fileName equals file.originalname
 * (exact match, case-insensitive), with the same fileSize, a status other than
 * -1, no errorMsg and not marked as deleted. Such a record counts as a
 * duplicate when the uploader is a DEVICE user, when the record was created
 * with the same updateOp, or when the record has proStatus > 0 and an
 * updateOp <= 3.
 *
 * @param {{ userId: *, kind: * }} userInfo uploading user
 * @param {*} jobId job the file is uploaded for
 * @param {{ originalname: string, size: number }} file uploaded file descriptor
 * @param {number} updateOp requested update operation
 * @param {Function} [cb] cb(err): AppParamError on missing arguments,
 *   DUPLICATED_FILE on a duplicate, the query error on failure, no argument otherwise
 */
function checkFileDuplication(userInfo, jobId, file, updateOp, cb) {
  const done = typeof cb === 'function' ? cb : () => { };

  if (!userInfo || !jobId || !file) return done(AppParamError.create());

  const exactName = new RegExp(`^${utils.escapeRegExp(String(file.originalname))}$`, 'i');
  const filter = {
    jobId,
    fileName: { $regex: exactName },
    fileSize: file.size,
    status: { $ne: -1 },
    errorMsg: null,
    markedDelete: { $ne: true },
  };

  const isDuplicate = (existing) => Boolean(existing) && (
    userInfo.kind === UserTypes.DEVICE
    || existing.updateOp === updateOp
    || (existing.proStatus > 0 && existing.updateOp <= 3)
  );

  // Two-argument then(): the rejection handler only sees query failures, so an
  // exception thrown by the callback itself can no longer trigger a second call.
  App.findOne(filter)
    .lean()
    .then(
      (existing) => {
        if (isDuplicate(existing)) return done(AppError.create(Errors.DUPLICATED_FILE));
        return done();
      },
      (err) => done(err)
    );
}
|
|
|
|
/**
 * Check whether the job exists and the uploading user may upload to it.
 *
 * The job is loaded (with its client populated) and access is granted when
 *  - the user is of kind UserTypes.APP and job.byPuid equals the user's id, or
 *  - for any other kind, a User with that id and `parent === job.byPuid` exists.
 *
 * @param {{ userId: *, kind: * }} userInfo uploading user
 * @param {*} jobId id of the job
 * @param {Function} [cb] cb(err, job): JOB_NOT_FOUND when the id is missing,
 *   the job does not exist or the user has no access; the lean job document
 *   (fields _id, client, byPuid) otherwise
 */
function checkUserJobUpload(userInfo, jobId, cb) {
  const done = typeof cb === 'function' ? cb : () => { };

  if (!jobId) return done(AppError.create(Errors.JOB_NOT_FOUND));

  let foundJob;
  Job.findById(jobId, '_id client byPuid')
    .populate({
      path: 'client',
      model: UserTypes.CLIENT,
      select: '_id name parent',
    })
    .lean()
    .then((job) => {
      if (!job) AppError.throw(Errors.JOB_NOT_FOUND);

      foundJob = job;

      if (userInfo.kind === UserTypes.APP) return job.byPuid.equals(userInfo.userId);

      return User.findOne({ _id: userInfo.userId, parent: job.byPuid }, '_id').lean();
    })
    // Two-argument then(): lookup failures go to the second handler, while an
    // exception thrown by the callback can no longer cause a second invocation.
    .then(
      (matched) => (matched ? done(null, foundJob) : done(AppError.create(Errors.JOB_NOT_FOUND))),
      (err) => done(err)
    );
}
|
|
|
|
/**
 * Callback-style wrapper around checkUsageLimits for the requesting user.
 *
 * @param {object} req request; the user info is derived from it with getUserInfo
 * @param {Function} [cb] cb(err): AppInputError when no request is given, the
 *   rejection reason of checkUsageLimits when it rejects, no argument otherwise
 */
function checkForUsageLimits(req, cb) {
  const done = typeof cb === 'function' ? cb : () => { };

  if (!req) return done(AppInputError.create());

  // Two-argument then(): an exception thrown by the callback on the success
  // path must not be fed back into the same callback as an "error".
  checkUsageLimits(getUserInfo(req)).then(
    () => done(),
    (err) => done(err)
  );
}
|
|
|
|
/**
 * POST handler: upload ONE job data file (.zip, .kml or .kmz) for an existing job.
 *
 * 1. Check that the job exists and the user may upload to it, that the file is
 *    not a duplicate and (for APPEND / OVERWRITE) that usage limits allow it.
 * 2. For archives, look for a job.json inside; on OVERWRITE its `_id` (when
 *    present and numeric) must match the jobId of the request.
 * 3. Create the Application entry / background import task (createAppEntry)
 *    and answer with its summary.
 *
 * Notes:
 *  - Only the first uploaded file is imported; any further files stored by
 *    multer are removed immediately because nothing references them.
 *  - The jobId always comes from the request body.
 *  - On any error the stored upload is removed and the error goes to next().
 *
 * @param {object} req Express request (expects req.uid, req.ut, body.jobId, body.updateOp)
 * @param {object} res Express response
 * @param {Function} next Express next()
 */
function uploadJob_post(req, res, next) {
  upload(req, res, (uploadErr) => {
    if (uploadErr) return next(uploadErr);

    // Check for empty form files
    if (utils.isEmptyArray(req.files)) return next(AppError.create(Errors.NO_FILE));

    const [file, ...extraFiles] = req.files;
    if (extraFiles.length) removeTempFiles(extraFiles.map((extra) => extra.path));

    const jobId = req.body.jobId;
    const jobImportTask = {
      jobId,
      file: { name: file.filename, originalName: file.originalname, size: file.size },
      metaFile: '',
      updateOp: Number(req.body.updateOp) || 1, // Default Append
      uid: req.uid,
    };
    const filePath = path.join(uploadPath, file.filename);
    // Real extension check: the dot is escaped so names merely ending in "kml"/"kmz" do not qualify.
    const isKmlOrKmz = /\.(kml|kmz)$/i.test(filePath);

    // A malformed uid must end the request with an error instead of escaping the upload callback.
    let userInfo;
    try {
      userInfo = { userId: ObjectId(req.uid), kind: req.ut };
    } catch (uidErr) {
      removeTempFiles([file.path]);
      return next(uidErr);
    }

    let jsonContent = null;
    let uploadRes;

    async.series([
      (callback) => checkUserJobUpload(userInfo, jobId, callback),
      (callback) => checkFileDuplication(userInfo, jobId, file, jobImportTask.updateOp, callback),
      (callback) => {
        // Check for over usage limits
        const op = jobImportTask.updateOp;
        if (op === JobUpdateOp.APPEND || op === JobUpdateOp.OVERWRITE) return checkForUsageLimits(req, callback);
        callback();
      },
      (callback) => {
        // Archives only: pick up the job.json meta file, if there is one.
        if (isKmlOrKmz) return callback();

        readJobInfoFromZip(filePath, (err, data) => {
          if (err) return callback(AppError.create(Errors.CORRUPTED_ZIP));

          if (data) {
            jobImportTask.metaFile = data.file;
            jsonContent = data.content;
          }
          callback();
        });
      },
      (callback) => {
        // Archives only: an OVERWRITE must target the job the archive was exported from.
        if (isKmlOrKmz || !jsonContent || jobImportTask.updateOp !== JobUpdateOp.OVERWRITE) return callback();

        let parsedJobId = 0;
        try {
          const jsonData = JSON.parse(jsonContent);
          parsedJobId = Number(jsonData ? jsonData._id || 0 : 0);
        } catch (parseErr) {
          // Unreadable meta data is tolerated (no id to compare against), but no longer silently.
          debug('Ignoring unreadable job.json of %s: %s', file.originalname, parseErr.message);
        }

        if (parsedJobId && parsedJobId !== Number(jobId)) return callback(AppError.create(Errors.WRONG_JOB_FILE));
        callback();
      },
      (callback) => {
        // Enqueue the Job importing task for this job with this job file
        createAppEntry(req, jobImportTask, false, (err, data) => {
          if (err) return callback(err);

          uploadRes = data;
          callback();
        });
      },
    ], (err) => {
      if (err) {
        next(err);
        removeTempFiles([file.path]);
        return;
      }
      res.json(uploadRes);
    });
  });
}
|
|
|
|
/**
 * Scan a zip archive for its job.json entry without extracting the archive.
 *
 * The first file entry whose path ends in "job.json" (case-insensitive) is
 * read into memory; all other entries are drained.
 *
 * The callback is invoked exactly once:
 *  - cb(err)                      when reading / parsing the archive fails,
 *  - cb(null, { file, content })  when a job.json entry was found (content is
 *                                 '' for an empty entry),
 *  - cb()                         when the archive holds no job.json.
 *
 * @param {string} zipFilePath path of the uploaded archive
 * @param {Function} cb see above
 */
function readJobInfoFromZip(zipFilePath, cb) {
  let foundJson = false;
  let returned = false;

  // Single exit point: whichever stream event reports first wins, later ones are ignored.
  const finish = (err, result) => {
    if (returned) return;
    returned = true;
    cb(err, result);
  };

  const d = domain.create();
  d.on('error', (err) => {
    if (returned) {
      d.exit(); // Exit the domain so further errors bound to the streams are not routed here
      return;
    }
    debug(err);
    finish(err);
  });

  d.run(() => {
    // Notes: a domain is required to catch errors raised while unzip-stream parses
    // the piped stream; on('error') alone does not work (per the original author).
    fs.createReadStream(zipFilePath)
      .pipe(unzipStream.Parse({ debug: false }))
      .on('entry', (entry) => {
        const isJobJson = !foundJson && entry.type === 'File' && entry.path && /job\.json$/i.test(entry.path);
        if (!isJobJson) {
          entry.autodrain();
          return;
        }

        foundJson = true;
        const chunks = [];
        entry.on('error', (err) => { throw err; });
        entry.on('data', (chunk) => chunks.push(chunk));
        entry.on('end', () => {
          // Report empty entries too: previously an empty job.json left the caller waiting forever,
          // because 'close' below deliberately stays silent once a job.json entry was seen.
          finish(null, { file: entry.path, content: Buffer.concat(chunks).toString() });
        });
      })
      .on('close', () => {
        // 'close' can fire several times and is not ordered against the entry's 'end',
        // so it only reports when no job.json entry was seen at all.
        if (!foundJson) finish();
      })
      .on('error', (err) => { throw err; });
  });
}
|
|
|
|
/**
 * Create an Application entry along with a background task for processing the uploaded file.
 *
 * The App document is saved first; its _id is written to importTask.appId.
 * When the task carries no validation error it is then published to the job
 * queue. A publish failure is only logged (per the original note the task is
 * re-enqueued from the offline list later), so the caller still receives the
 * summary.
 *
 * @param {object} request request of the uploading user (uses request.uid and request.ut)
 * @param {object} importTask task descriptor: { jobId?, file: { name, originalName, size },
 *   updateOp, err?, clientId?, ... }; mutated (appId is added)
 * @param {boolean} [byImport=false] stored on the App entry as `byImport`
 * @param {Function} cb cb(err, summary) where summary is
 *   { jobId, fileName, fileSize, updateOp, errorMsg, createdDate[, status, proStatus, _id] }
 */
function createAppEntry(request, importTask, byImport = false, cb) {
  const appl = new App({
    jobId: importTask.jobId,
    fileName: importTask.file.originalName,
    savedFilename: importTask.file.name,
    fileSize: importTask.file.size,
    updateOp: importTask.updateOp,
    status: importTask.err ? 0 : 1,
    errorMsg: importTask.err,
    cid: importTask.clientId,
    byUser: ObjectId(request.uid),
    byImport,
  });

  appl.save((saveErr, saved) => {
    if (saveErr) return cb(saveErr);

    importTask.appId = saved._id;

    const respond = () => {
      const summary = {
        jobId: saved.jobId || null,
        // The task stores the name as `originalName`; the former `originalname`
        // lookup always produced undefined.
        fileName: saved.fileName || importTask.file.originalName,
        fileSize: saved.fileSize || importTask.file.size,
        updateOp: importTask.updateOp,
        errorMsg: saved.errorMsg || '',
        createdDate: saved.createdDate,
      };
      // NOTE(review): 9 is an undocumented user-type code whose callers get the
      // reduced summary — confirm which UserTypes member it stands for.
      if (request.ut != 9) {
        summary.status = saved.status;
        summary.proStatus = saved.proStatus;
        summary._id = saved._id;
      }
      cb(null, summary);
    };

    // No need to enqueue a new importing task if there was any validation error
    if (importTask.err) return respond();

    taskQHelper.publishJobTask(Buffer.from(JSON.stringify(importTask)), (queueErr) => {
      if (queueErr) {
        // If failed to add to the queue, It will be re-enqueued from the offline list later.
        debug("Create AppTask failed !", queueErr);
      }
      respond();
    });
  });
}
|
|
|
|
// Multer middleware used by uploadData_post; shares the disk storage and the
// limits of the other upload endpoints and accepts files from any form field.
const uploadDataOptions = { storage, limits };
const uploadData = multer(uploadDataOptions).any("data");
|
|
|
|
|
|
/**
 * POST handler: upload several data files (.zip / .kml / .kmz) at once.
 *
 * Files are handled one after the other. For each file:
 *  1. Archives are scanned for a job.json; a numeric `_id` in it selects the
 *     target job and switches the task to DATA_ONLY (default is OVERWRITE).
 *  2. With a target job: check access (JOB_NOT_FOUND) and duplication
 *     (DUPLICATED_FILE). Without one and without error: check usage limits.
 *  3. An Application entry is always created; validation problems are stored in
 *     importTask.err, which keeps the task from being enqueued.
 *  4. The stored upload is removed when the file failed.
 *
 * The response is the list of entry summaries, each extended with the client
 * of ITS OWN job (cname / cid) or, failing that, with body.clientId as cid.
 *
 * @param {object} req Express request (expects req.uid, req.ut, optional body.clientId)
 * @param {object} res Express response
 * @param {Function} next Express next()
 */
function uploadData_post(req, res, next) {
  uploadData(req, res, (uploadErr) => {
    if (uploadErr) return next(uploadErr);

    const input = req.body; // All additional form data (k,v)

    if (utils.isEmptyArray(req.files)) return next(AppError.create(Errors.NO_FILE));

    const processResult = [];
    const userInfo = { userId: ObjectId(req.uid), kind: req.ut };

    async.eachSeries(req.files, (file, nextFile) => {
      const importTask = {
        file: { name: file.filename, originalName: file.originalname, size: file.size },
        metaFile: '',
        updateOp: JobUpdateOp.OVERWRITE,
        clientId: input.clientId,
        uid: req.uid,
      };
      const filePath = path.join(uploadPath, file.filename);
      // Real extension check: the dot is escaped so names merely ending in "kml"/"kmz" do not qualify.
      const isKmlOrKmz = /\.(kml|kmz)$/i.test(filePath);
      // Job resolved for THIS file only; it must not leak into the result of the next file.
      let job = null;

      async.series([
        (callback) => {
          if (isKmlOrKmz) return callback();

          readJobInfoFromZip(filePath, (err, data) => {
            if (err) importTask.err = Errors.CORRUPTED_ZIP;

            if (data) {
              importTask.metaFile = data.file;
              try {
                const jsonData = JSON.parse(data.content);
                const parsedJobId = Number(jsonData ? jsonData._id || 0 : 0);
                if (parsedJobId) {
                  importTask.jobId = parsedJobId;
                  importTask.updateOp = JobUpdateOp.DATA_ONLY;
                }
              } catch (parseErr) {
                // Unreadable meta data: the file is then handled like one without a job id.
                debug('Ignoring unreadable job.json of %s: %s', file.originalname, parseErr.message);
              }
            }
            callback();
          });
        },
        (callback) => {
          if (!importTask.jobId) return callback();

          checkUserJobUpload(userInfo, importTask.jobId, (err, foundJob) => {
            if (err || !foundJob) importTask.err = Errors.JOB_NOT_FOUND;
            else job = foundJob;
            callback();
          });
        },
        (callback) => {
          if (!importTask.jobId || importTask.err == Errors.JOB_NOT_FOUND) return callback();

          checkFileDuplication(userInfo, importTask.jobId, file, JobUpdateOp.DATA_ONLY, (err) => {
            // NOTE(review): any failure of the check (including query errors) is recorded as DUPLICATED_FILE.
            if (err) importTask.err = Errors.DUPLICATED_FILE;
            callback();
          });
        },
        (callback) => {
          if (importTask.jobId || importTask.err) return callback();

          checkForUsageLimits(req, (err) => {
            if (err) importTask.err = err;
            callback();
          });
        },
        (callback) => {
          // Save to db app list or enqueue a new Job importing task
          createAppEntry(req, importTask, true, (err, data) => {
            if (err) return callback(err);

            if (job && job.client) {
              data["cname"] = job.client.name;
              data["cid"] = job.client._id;
            } else if (importTask.clientId) {
              data["cid"] = importTask.clientId;
            }
            processResult.push(data);
            callback();
          });
        },
      ], (fileErr) => {
        // A failing file never aborts the batch, but the reason should at least be traceable.
        if (fileErr) debug('Import of %s failed: %o', file.originalname, fileErr);

        if (fileErr || importTask.err) removeTempFiles([file.path]);
        nextFile();
      }); // Inner async series
    }, (err) => {
      if (err) return next(err);
      res.json(processResult);
    }); // Outer async series
  });
}
|
|
|
|
/**
 * POST handler: cancel pending imports.
 *
 * The ids in body.appIds are added to the DEL_APP_IDS Redis set and every
 * matching App entry that is not DONE yet is switched to WAS_CANCELLED. The
 * response is the list of ids as received, or null when no ids were sent.
 *
 * Failures (invalid id, Redis or database error) are passed to `next` when the
 * router provides it; otherwise the returned promise rejects as before.
 *
 * @param {object} req Express request (expects body.appIds: string[])
 * @param {object} res Express response
 * @param {Function} [next] Express next()
 */
async function cancelImport_post(req, res, next) {
  const appIds = req.body.appIds;
  if (!Array.isArray(appIds) || !appIds.length) return res.json(null);

  try {
    // Convert first: an invalid id must fail before anything is written to Redis.
    const appObIds = appIds.map((id) => ObjectId(id));

    await redis.sadd(DEL_APP_IDS, appIds);
    await App.updateMany(
      { _id: { $in: appObIds }, status: { $ne: AppStatus.DONE } },
      { status: AppStatus.WAS_CANCELLED }
    );
    res.json(appIds);
  } catch (err) {
    if (typeof next === 'function') return next(err);
    throw err;
  }
}
|
|
|
|
// Multer middleware used by uploadAreas_post; same disk storage and limits as
// the other upload endpoints, accepting files from any form field.
const uploadAreasOptions = { storage, limits };
const uploadAreas = multer(uploadAreasOptions).any("areas");

// glob options for locating area files inside an extracted archive:
// case-insensitive, dot files included, no fallback to the pattern itself,
// and every path matching the ignore pattern below is skipped.
const sprayItemsOps = {
  nonull: false,
  nocase: true,
  dot: true,
  ignore: [
    '**/{(q[0-9]*.t*|n[0-9]*.t*),n[0-9]*spr+(on|off).+(dbf|prj|shp|shx)}',
  ],
};
|
|
|
|
/**
 * POST handler: read spray / exclusion areas from an uploaded file.
 *
 * The first uploaded file is either a single .kml/.kmz file or an archive that
 * is extracted and searched for area files. All areas found are either returned
 * as they are (body.readOnly) or added to the area library of body.clientId.
 *
 * Response: { areas: <geo items>, dup: <duplicate counter reported by addAreasToLib, 0 for readOnly> }
 * Errors go to next(): AppParamError when neither readOnly nor clientId is
 * given, NO_FILE without an upload, CORRUPTED_ZIP, AREAS_NOT_FOUND, or any
 * error raised by a file reader / the area library.
 *
 * Every file stored by multer for this request and the extraction folder are
 * removed afterwards, whatever the outcome.
 *
 * @param {object} req Express request (body.readOnly or body.clientId required)
 * @param {object} res Express response
 * @param {Function} next Express next()
 */
function uploadAreas_post(req, res, next) {
  uploadAreas(req, res, (uploadErr) => {
    // Timeout 0 on both sockets, as before (presumably because parsing large area files can take long).
    req.setTimeout(0);
    res.setTimeout(0);

    if (uploadErr) return next(uploadErr);

    const input = req.body; // All additional form data {key,value}
    const uploaded = Array.isArray(req.files) ? req.files : [];
    const uploadedPaths = uploaded.map((item) => item.path);

    // Early exits must not leave the stored uploads behind.
    const rejectEarly = (err) => {
      if (uploadedPaths.length) removeTempFiles(uploadedPaths);
      return next(err);
    };

    if (!input.readOnly && !input.clientId) return rejectEarly(AppParamError.create());
    if (uploaded.length === 0) return rejectEarly(AppError.create(Errors.NO_FILE));

    const uFile = uploaded[0];
    // Real extension check: the dot is escaped so names merely ending in "kml"/"kmz" do not qualify.
    const isKmlOrKmz = /\.(kml|kmz)$/i.test(uFile.originalname);
    // Archives are extracted into <unzipPath>/<stored file name without its extension>/
    const unZipPath = isKmlOrKmz ? uploadPath : path.join(unzipPath, path.parse(uFile.filename).name, '/');

    let areaFiles = isKmlOrKmz ? [uFile.path] : [];
    let areaTypeAndFiles = [];
    let areas = [];
    let newAreas = [];
    let dup = 0;

    async.series([
      // 1. Extract the archive
      (callback) => {
        if (isKmlOrKmz) return callback();

        unzip(uFile.path, { dir: unZipPath }, (err) => {
          if (err) return callback(AppError.create(Errors.CORRUPTED_ZIP));
          callback();
        });
      },
      // 2. Collect candidate area files from the extracted archive
      (callback) => {
        if (isKmlOrKmz) return callback();

        glob(path.join(unZipPath, "**/*.{job,kml,kmz,no1,prj,shp,dbf,agn,dsp,xcl}"), sprayItemsOps, (err, files) => {
          if (err) return callback(err);

          areaFiles = files;
          if (areaFiles.length === 0) return callback(AppError.create(Errors.AREAS_NOT_FOUND));
          callback();
        });
      },
      // 3. Classify the candidates by file type
      (callback) => {
        fileHelper.areasFromList(unZipPath, areaFiles, (err, classified) => {
          // Still non-fatal (an empty list ends in AREAS_NOT_FOUND below), but no longer silent.
          if (err) debug('areasFromList failed: %o', err);
          areaTypeAndFiles = classified || [];
          callback();
        });
      },
      // 4. Read the areas of every classified file
      (callback) => {
        const ops = { onlyAreas: true, sprZoneColor: 'blue', isUS: false, appRate: -1 };

        // Any reader error aborts the import. (The former INVALID_AREAS_FILE branch for
        // single-file uploads sat behind this very check and could never run.)
        const collect = (err, data, done) => {
          if (err) return done(err);

          if (data) {
            if (!utils.isEmptyArray(data.sprayAreas)) areas = areas.concat(data.sprayAreas);
            if (!utils.isEmptyArray(data.xclAreas)) areas = areas.concat(data.xclAreas);
          }
          done();
        };

        async.eachSeries(areaTypeAndFiles, (areaFile, esCB) => {
          const onRead = (err, data) => collect(err, data, esCB);

          switch (areaFile.type) {
            case FILE.FILE_NO1:
            case FILE.FILE_PRJ:
              fileAgNav.readAGN_PRJ(unZipPath, areaFile, ops, onRead);
              break;
            case FILE.FILE_KMZ:
            case FILE.FILE_KML:
              fileKML.readKmlKmzToGeoItems(unZipPath, areaFile, ops, onRead);
              break;
            case FILE.FILE_SHP:
              fileShp.readSHPToGeoItems(unZipPath, areaFile, ops, onRead);
              break;
            case FILE.FILE_SATLOG_JOB:
              fileSatLog.readSatLogJob(unZipPath, areaFile, ops, onRead);
              break;
            default:
              return esCB();
          }
        }, callback);
      },
      // 5. Return the areas as read, or store them in the client's library
      (callback) => {
        if (utils.isEmptyArray(areas)) return callback(AppError.create(Errors.AREAS_NOT_FOUND));

        if (input.readOnly) {
          newAreas = areas;
          return callback();
        }

        // Two-argument then(): an exception raised further down the series must not
        // come back here and invoke the series callback a second time.
        jobUtil.addAreasToLib(areas, { returnAreas: true, clientId: input.clientId, debug: !app.isProd })
          .then(
            (data) => {
              newAreas = data.areas;
              dup = data.dup;
              callback();
            },
            (err) => callback(err)
          );
      },
    ], (err) => {
      if (!app.isProd) debug('DONE ...');

      try {
        if (err) next(err);
        else res.json({ areas: polyUtil.toGeoItems(newAreas), dup });
      } finally {
        // Clean up files or folders here, even when building the response failed
        const delFiles = [...uploadedPaths];
        if (!isKmlOrKmz) delFiles.push(unZipPath);
        removeTempFiles(delFiles);
      }
    });
  }); // upload handling
}
|
|
|
|
return ({
|
|
uploadJob_post, uploadData_post, cancelImport_post, uploadAreas_post
|
|
});
|
|
} |