345 lines
11 KiB
JavaScript
345 lines
11 KiB
JavaScript
/* eslint-disable vars-on-top */
|
|
'use strict';
|
|
|
|
const net = require('net'),
|
|
amqp = require('amqplib'),
|
|
async = require('async'),
|
|
MongoClient = require('mongodb').MongoClient,
|
|
MongoError = require('mongodb').MongoError,
|
|
numUtil = require('number-util'),
|
|
keys = require('./keys'),
|
|
LatLon = require('geodesy').LatLonEllipsoidal,
|
|
fs = require('fs-extra'),
|
|
debug = require('debug')('gps-server');
|
|
|
|
const protoOps = {
|
|
'AGNAV': { port: 6080, device: require('device-tcp-socket').AgNavTCPDevice, parser: require('binary-package-parser').AgNavParser },
|
|
'RAP': { port: 6082, device: require('device-tcp-socket').RAPTCPDevice, parser: require('binary-package-parser').RAPParser }
|
|
};
|
|
const protoOp = protoOps[process.env.PROTOCOL];
|
|
if (!protoOp)
|
|
shutDown('INVALID_PROTO');
|
|
|
|
const MAX_QUEUE_CONN_RETRY = 6;
|
|
const MAX_TRK_DIST = 5; // kilometers
|
|
const DB_NAME = 'agmission';
|
|
const LOC_COL = 'locations';
|
|
const LOC_CACHE_COL = 'location_cache';
|
|
|
|
const MCON_URI = `mongodb://localhost:27017/${DB_NAME}/?replSet=rs0&retryWrites=false`;
|
|
const connOps = { family: 4, useNewUrlParser: true, useUnifiedTopology: true, auth: { user: 'agm', password: 'Agm2017', authSource: DB_NAME } };
|
|
|
|
const BK_FILE = './backup.json';
|
|
|
|
var server, srvClosing = false, mgClient;
|
|
const trkQueue = 'gdata';
|
|
var mqChannel;
|
|
var trkVehs = {}; // { 'unitId': { last: <last location>, } }
|
|
|
|
const delay = (ms) => new Promise((resolve => setTimeout(resolve, ms)));
|
|
|
|
var qConnTry = 0;
|
|
|
|
async function connectRabbitMq() {
|
|
qConnTry++;
|
|
if (qConnTry >= MAX_QUEUE_CONN_RETRY) {
|
|
handleError(new Error('Reached MAX_QUEUE_CONN_RETRY'), true);
|
|
return;
|
|
}
|
|
try {
|
|
const conn = await amqp.connect('amqp://127.0.0.1', { heartbeat: 60 });
|
|
conn.on('error', function (err) {
|
|
if (err.message !== 'Connection closing') {
|
|
debug('[AMQP] conn error', err.message);
|
|
}
|
|
});
|
|
conn.on('close', async () => {
|
|
console.error('[AMQP] reconnecting');
|
|
await delay(3000);
|
|
connectRabbitMq();
|
|
});
|
|
mqChannel = await conn.createChannel();
|
|
debug('Channel created');
|
|
qConnTry = 0;
|
|
} catch (err) {
|
|
console.error(err);
|
|
handleError(err);
|
|
debug('[AMQP] reconnecting in 3s');
|
|
return delay(3000).then(() => connectRabbitMq());
|
|
}
|
|
}
|
|
|
|
function makeTrackPackage(id, gdata) {
|
|
const pkg = {};
|
|
pkg[id.toString()] = [Object.assign({}, gdata)];
|
|
return pkg;
|
|
}
|
|
|
|
function updateData(gdata) {
|
|
// Fix rollup gpsdatetime problem if any
|
|
if (gdata.gdt instanceof Date) {
|
|
// Compensate 1024 weeks to the date if older than 19.6 years
|
|
if (((((new Date() - gdata.gdt) / 86400000) / 365) / 19.6) >= 1)
|
|
gdata.gdt.setDate((gdata.gdt.getDate() + (7 * 1024)));
|
|
}
|
|
gdata.lat = numUtil.fixedTo(gdata.lat, 6);
|
|
gdata.lon = numUtil.fixedTo(gdata.lon, 6);
|
|
gdata.appRate = gdata['appRate'] ? numUtil.fixedTo(gdata.appRate, 1) : 0;
|
|
}
|
|
|
|
async function main() {
|
|
try {
|
|
srvClosing = false;
|
|
|
|
mgClient = await MongoClient.connect(MCON_URI, connOps);
|
|
await connectRabbitMq();
|
|
await mqChannel.assertQueue(trkQueue, { durable: true });
|
|
|
|
const parser = new protoOp.parser();
|
|
|
|
// Get vehicles with lastest track location
|
|
await getVehsLastTrack();
|
|
|
|
// Handle last parsed non-processed data in special cases (RabbitMq or MongoDB connection broke down)
|
|
if (await fs.pathExistsSync(BK_FILE)) {
|
|
const bkData = await fs.readJson(BK_FILE);
|
|
if (bkData && bkData.length) {
|
|
let dataById;
|
|
for (const v in trkVehs) {
|
|
if (!trkVehs[v] || !trkVehs[v].last) continue;
|
|
|
|
dataById = bkData.filter(it => it.id === v && (it.data.gdt && it.data.gdt > trkVehs[v].last.gdt.toISOString()));
|
|
if (dataById && dataById.length) {
|
|
// Re-enqueue last unprocess data for this AC
|
|
if (!trkVehs[v].queue) trkVehs[v].queue = getProcessQueue();
|
|
dataById.forEach(data => {
|
|
data.data.gdt = new Date(data.data.gdt);
|
|
trkVehs[v].queue.push(data);
|
|
});
|
|
}
|
|
}
|
|
}
|
|
await fs.remove(BK_FILE);
|
|
}
|
|
|
|
// Construct a server
|
|
server = new net.Server(socket => {
|
|
const session = `${socket.remoteAddress}:${socket.remotePort}`;
|
|
debug('New connection: %s', session);
|
|
|
|
const devSocket = new protoOp.device(socket);
|
|
|
|
devSocket.on('error', err => debug(session + ': ' + err));
|
|
devSocket.on('close', hadError => {
|
|
// if (hadError)
|
|
debug('Connection closed: %s, Err: %o', session, hadError);
|
|
devSocket.destroy();
|
|
});
|
|
|
|
// Process incomming data from each device connection
|
|
devSocket.on('data', (resPkg) => {
|
|
if (!devSocket.id || !(resPkg && resPkg.length)) return;
|
|
|
|
// LOGGING
|
|
if ((+keys.LOG_RAW) && keys.LOG_IDS.length && keys.LOG_IDS.split(',').includes(devSocket.id)) {
|
|
debug('RAW %s: %s', devSocket.id, resPkg.toString('hex').toUpperCase());
|
|
}
|
|
|
|
try {
|
|
// Setup 1 data process task queue for each connection
|
|
if (!trkVehs[devSocket.id])
|
|
trkVehs[devSocket.id] = { queue: getProcessQueue() };
|
|
else if (!trkVehs[devSocket.id].queue)
|
|
trkVehs[devSocket.id].queue = getProcessQueue();
|
|
|
|
if (!trkVehs[devSocket.id].socket) trkVehs[devSocket.id].socket = devSocket;
|
|
|
|
const gdata = parser.parse(resPkg);
|
|
if (gdata) {
|
|
trkVehs[devSocket.id].queue.push({ id: devSocket.id, data: gdata });
|
|
// LOGGING
|
|
if (+keys.LOG_DEBUG && keys.LOG_IDS.length && keys.LOG_IDS.split(',').includes(devSocket.id)) {
|
|
debug('DEBUG %s: %o', devSocket.id, gdata);
|
|
}
|
|
}
|
|
} catch (error) {
|
|
debug(error);
|
|
debug('ERR %s: %s', devSocket.id, resPkg.toString('hex').toUpperCase());
|
|
}
|
|
});
|
|
});
|
|
|
|
server.on('error', (err) => debug(err));
|
|
server.on('close', () => debug('Server shutdowned'));
|
|
|
|
server.listen(protoOp.port, '0.0.0.0', () => debug(`Listening on port ${protoOp.port}...`));
|
|
|
|
} catch (error) {
|
|
handleError(error, true);
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Worker function that processes each incoming parsed data package
|
|
* @param {*} data { id: <unitId>, data: gdata }
|
|
*/
|
|
async function processData(data) {
|
|
if (!data || !data['id'] || !data['data']) return;
|
|
|
|
let trkVeh, id = data.id, gdata = data.data, gDate, dist = 0;
|
|
try {
|
|
updateData(gdata);
|
|
|
|
// Set new track location
|
|
await mqChannel.sendToQueue(trkQueue, Buffer.from(JSON.stringify(makeTrackPackage(id, gdata))));
|
|
|
|
if (!mgClient.isConnected())
|
|
mgClient = await MongoClient.connect(MCON_URI, connOps);
|
|
|
|
// Calculate distance offset and update the new track for the date
|
|
trkVeh = trkVehs[id];
|
|
gDate = gdata.gdt.toISOString().slice(0, 10);
|
|
|
|
if (trkVeh && trkVeh.last) {
|
|
const prevLL = new LatLon(trkVeh.last.lat, trkVeh.last.lon);
|
|
const currLL = new LatLon(gdata.lat, gdata.lon);
|
|
|
|
if (prevLL.lat != currLL.lat && prevLL.lon != currLL.lon) {
|
|
dist = prevLL.distanceTo(currLL);
|
|
if ((dist * 1e-3) >= MAX_TRK_DIST)
|
|
dist = 0;
|
|
}
|
|
// debug('%s,%s, prevLL.lat, prevLL.lon);
|
|
// debug('%s,%s,%s, %s, %s', currLL.lat, currLL.lon, gdata.inputs, gdata.gdt.toISOString(), dist);
|
|
trkVeh.date = gDate;
|
|
trkVeh.last = gdata;
|
|
} else {
|
|
trkVehs[id].last = gdata;
|
|
}
|
|
|
|
// Update the Track to DB
|
|
const _session = mgClient.startSession();
|
|
try {
|
|
await _session.withTransaction(async () => {
|
|
await mgClient.db(DB_NAME).collection(LOC_COL).updateOne(
|
|
{ 'unitId': id, date: gDate },
|
|
{
|
|
$push: { 'locs': gdata }, // Q: Sort when adding new a track point ??? probably not if settings configured properly
|
|
$inc: { 'total': 1, 'dist': dist }
|
|
},
|
|
{ upsert: true, session: _session });
|
|
// Update TRK_CACHE_COL for the latest track point
|
|
await mgClient.db(DB_NAME).collection(LOC_CACHE_COL).updateOne(
|
|
{ 'unitId': id },
|
|
{ $currentDate: { 'updatedAt': true }, $set: gdata },
|
|
{ upsert: true, session: _session });
|
|
});
|
|
} finally {
|
|
await _session.endSession();
|
|
}
|
|
} catch (error) {
|
|
throw error;
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Create a data process queue instance
|
|
*/
|
|
function getProcessQueue() {
|
|
return async.queue((data, callback) => {
|
|
processData(data)
|
|
.catch(error => {
|
|
if (error instanceof MongoError || (error.message && error.message.includes('ECONNREFUSED') || error.message.includes('Channel closed'))) {
|
|
if (trkVehs[data.id].queue) {
|
|
trkVehs[data.id].queue.pause();
|
|
trkVehs[data.id].queue.unshift(data);
|
|
}
|
|
|
|
if (!srvClosing) {
|
|
// Marked as closing was handled by one of the socket's queue worker to avoid doing this multiple times
|
|
srvClosing = true;
|
|
server.close();
|
|
let bkData = [];
|
|
for (const tv of Object.values(trkVehs).filter(it => it.socket || it.queue)) {
|
|
if (tv.socket) tv.socket.destroy(new Error("CONN_ERROR"));
|
|
if (tv.queue && !tv.queue.idle()) {
|
|
tv.queue.pause();
|
|
bkData = bkData.concat([...tv.queue]);
|
|
}
|
|
}
|
|
// Backup unprocessed data to local file
|
|
if (bkData.length) fs.writeJSONSync(BK_FILE, bkData);
|
|
|
|
handleError(error, true);
|
|
}
|
|
}
|
|
})
|
|
.finally(() => callback());
|
|
});
|
|
}
|
|
|
|
/**
|
|
* Get AC list with latest data if any
|
|
*/
|
|
async function getVehsLastTrack() {
|
|
if (!mgClient.isConnected())
|
|
mgClient = await MongoClient.connect(MCON_URI, connOps);
|
|
const vTrks = await mgClient.db(DB_NAME).collection(LOC_COL).aggregate([
|
|
{ $sort: { date: 1 } },
|
|
{
|
|
$group:
|
|
{
|
|
_id: '$unitId',
|
|
item: { $last: '$$ROOT' }
|
|
}
|
|
},
|
|
{
|
|
$project: {
|
|
_id: 0,
|
|
unitId: '$item.unitId',
|
|
date: '$item.date',
|
|
last: { $arrayElemAt: ['$item.locs', -1] }
|
|
}
|
|
}
|
|
], { 'allowDiskUse': true }).toArray();
|
|
|
|
for (let i = 0; i < vTrks.length; i++) {
|
|
const vt = vTrks[i];
|
|
if (!trkVehs[vt.unitId])
|
|
trkVehs[vt.unitId] = { last: vt.last };
|
|
else
|
|
trkVehs[vt.unitId].last = vt.last;
|
|
}
|
|
}
|
|
|
|
async function handleError(error, forceExit = false) {
|
|
if (error) debug(error);
|
|
if (forceExit) process.exit(0);
|
|
}
|
|
|
|
// process.on('SIGINT', () => {
|
|
// shutDown('SIGINT');
|
|
// });
|
|
process.on('SIGTERM', () => {
|
|
shutDown('SIGTERM');
|
|
});
|
|
|
|
function shutDown(msg) {
|
|
// Try to shutdown the server gracefully
|
|
debug(`${msg} signal received.`);
|
|
debug('Closing gps server.');
|
|
if (server) {
|
|
server.close(() => {
|
|
debug('GPS server closed.');
|
|
if (mgClient) {
|
|
mgClient.close(false, () => { // boolean means [force], see in mongoose doc
|
|
debug('MongoDb connection closed.');
|
|
process.exit(0);
|
|
});
|
|
}
|
|
});
|
|
}
|
|
}
|
|
// Start the server app
|
|
main();
|