/* eslint-disable vars-on-top */ 'use strict'; const net = require('net'), amqp = require('amqplib'), async = require('async'), numUtil = require('number-util'), keys = require('./keys'), LatLon = require('geodesy').LatLonEllipsoidal, fs = require('fs-extra'), dbUtil = require('db-util'), env = require('./helpers/env'), debug = require('debug')('gps-server'); const protoOps = { 'AGNAV': { port: 6080, device: require('device-tcp-socket').AgNavTCPDevice, parser: require('binary-package-parser').AgNavParser }, 'RAP': { port: 6082, device: require('device-tcp-socket').RAPTCPDevice, parser: require('binary-package-parser').RAPParser } }; const protoOp = protoOps[env.PROTOCOL]; if (!protoOp) { return shutDown(`INVALID_PROTO. The server's PROTOCOL must be one of the values in [${Object.keys(protoOps)}]`); } const MAX_QUEUE_CONN_RETRY = 6; const MAX_TRK_DIST = 5; // kilometers. Max track distance is for determiming when splitting into track segments between 2 consecutive points const LOC_COL = 'locations'; const LOC_CACHE_COL = 'location_cache'; const BK_FILE = './backup.json'; var server, srvClosing = false, mgClient; const trkQueue = env.QUEUE_NAME_GDATA || 'gdata'; var mqChannel; var trkVehs = {}; // { 'unitId': { last: , } } const delay = (ms) => new Promise((resolve => setTimeout(resolve, ms))); async function connectDb() { const conOps = { db: env.DB_NAME, user: env.DB_USR, pass: env.DB_PWD, hosts: env.DB_HOSTS, replicaSet: env.DB_REPL_SET }; const dbClient = await dbUtil.native.connect(conOps); debug('Mongodb connected.'); return dbClient; } function isClientConnected(client) { if (!client) return false; return client.hasOwnProperty('isConnected') ? client.isConnected() : (client.hasOwnProperty('topology') && client.topology.isConnected()); } var qConnTry = 0; async function connectRabbitMq() { const conOps = { protocol: 'amqp', hostname: env.QUEUE_HOST, port: env.QUEUE_PORT || 5672, username: env.QUEUE_USR, password: env.QUEUE_PWD, vhost: env.QUEUE_VHOST || '/', heartbeat: env.QUEUE_HEARTBEAT || 60 }; qConnTry++; if (qConnTry >= MAX_QUEUE_CONN_RETRY) { handleError(new Error('Reached MAX_QUEUE_CONN_RETRY'), true); return; } try { const conn = await amqp.connect(conOps); conn.on('error', function (err) { if (err.message !== 'Connection closing') { debug('[AMQP] conn error', err.message); } }); conn.on('close', async () => { console.error('[AMQP] reconnecting'); await delay(3000); connectRabbitMq(); }); mqChannel = await conn.createChannel(); debug('[AMQP] Channel created'); qConnTry = 0; } catch (err) { console.error(err); handleError(err); debug('[AMQP] reconnecting in 3s'); return delay(3000).then(() => connectRabbitMq()); } } function makeTrackPackage(id, gdata) { const pkg = {}; pkg[id.toString()] = [Object.assign({}, gdata)]; return pkg; } function updateData(gdata) { // Fix rollup gpsdatetime problem if any if (gdata.gdt instanceof Date) { // Compensate 1024 weeks to the date if older than 19.6 years if (((((new Date() - gdata.gdt) / 86400000) / 365) / 19.6) >= 1) gdata.gdt.setDate((gdata.gdt.getDate() + (7 * 1024))); } gdata.lat = numUtil.fixedTo(gdata.lat, 6); gdata.lon = numUtil.fixedTo(gdata.lon, 6); gdata.appRate = gdata['appRate'] ? numUtil.fixedTo(gdata.appRate, 1) : 0; } async function main() { try { srvClosing = false; mgClient = await connectDb(); await connectRabbitMq(); await mqChannel.assertQueue(trkQueue, { durable: true }); const parser = new protoOp.parser(); // Get vehicles with lastest track location await getVehsLastTrack(); // Handle last parsed non-processed data in special cases (RabbitMq or MongoDB connection broke down) if (await fs.pathExistsSync(BK_FILE)) { const bkData = await fs.readJson(BK_FILE); if (bkData && bkData.length) { let dataById; for (const v in trkVehs) { if (!trkVehs[v] || !trkVehs[v].last) continue; dataById = bkData.filter(it => it.id === v && (it.data.gdt && it.data.gdt > trkVehs[v].last.gdt.toISOString())); if (dataById && dataById.length) { // Re-enqueue last unprocess data for this AC if (!trkVehs[v].queue) trkVehs[v].queue = getProcessQueue(); dataById.forEach(data => { data.data.gdt = new Date(data.data.gdt); trkVehs[v].queue.push(data); }); } } } await fs.remove(BK_FILE); } // Construct a server server = new net.Server(socket => { const session = `${socket.remoteAddress}:${socket.remotePort}`; debug('New connection: %s', session); const devSocket = new protoOp.device(socket); devSocket.on('error', err => debug(session + ': ' + err)); devSocket.on('close', hadError => { // if (hadError) debug('Connection closed: %s, Err: %o', session, hadError); devSocket.destroy(); }); // Process incomming data from each device connection devSocket.on('data', (resPkg) => { if (!devSocket.id || !(resPkg && resPkg.length)) return; // LOGGING if ((+keys.LOG_RAW) && keys.LOG_IDS.length && keys.LOG_IDS.split(',').includes(devSocket.id)) { debug('RAW %s: %s', devSocket.id, resPkg.toString('hex').toUpperCase()); } try { // Setup 1 data process task queue for each connection if (!trkVehs[devSocket.id]) trkVehs[devSocket.id] = { queue: getProcessQueue() }; else if (!trkVehs[devSocket.id].queue) trkVehs[devSocket.id].queue = getProcessQueue(); if (!trkVehs[devSocket.id].socket) trkVehs[devSocket.id].socket = devSocket; const gdata = parser.parse(resPkg); if (gdata) { trkVehs[devSocket.id].queue.push({ id: devSocket.id, data: gdata }); // LOGGING if (+keys.LOG_DEBUG && keys.LOG_IDS.length && keys.LOG_IDS.split(',').includes(devSocket.id)) { debug('DEBUG %s: %o', devSocket.id, gdata); } } } catch (error) { debug(error); debug('ERR %s: %s', devSocket.id, resPkg.toString('hex').toUpperCase()); } }); }); server.on('error', (err) => debug(err)); server.on('close', () => debug('Server shutdowned')); server.listen(protoOp.port, '0.0.0.0', () => debug(`Listening on port ${protoOp.port} for protocol '${env.PROTOCOL}'...`)); } catch (error) { handleError(error, true); } } /** * Worker function that processes each incoming parsed data package * @param {*} data { id: , data: gdata } */ async function processData(data) { if (!data || !data['id'] || !data['data']) return; let trkVeh, id = data.id, gdata = data.data, gDate, dist = 0; try { updateData(gdata); // Set new track location await mqChannel.sendToQueue(trkQueue, Buffer.from(JSON.stringify(makeTrackPackage(id, gdata)))); if (!isClientConnected(mgClient)) mgClient = connectDb(); // Calculate distance offset and update the new track for the date trkVeh = trkVehs[id]; gDate = gdata.gdt.toISOString().slice(0, 10); if (trkVeh && trkVeh.last) { const prevLL = new LatLon(trkVeh.last.lat, trkVeh.last.lon); const currLL = new LatLon(gdata.lat, gdata.lon); if (prevLL.lat != currLL.lat && prevLL.lon != currLL.lon) { dist = prevLL.distanceTo(currLL); if ((dist * 1e-3) >= MAX_TRK_DIST) dist = 0; } // debug('%s,%s, prevLL.lat, prevLL.lon); // debug('%s,%s,%s, %s, %s', currLL.lat, currLL.lon, gdata.inputs, gdata.gdt.toISOString(), dist); trkVeh.date = gDate; trkVeh.last = gdata; } else { trkVehs[id].last = gdata; } // Update the Track to DB const _session = mgClient.startSession(); try { await _session.withTransaction(async () => { await mgClient.db(env.DB_NAME).collection(LOC_COL).updateOne( { 'unitId': id, date: gDate }, { $push: { 'locs': gdata }, // Q: Sort when adding new a track point ??? probably not if settings configured properly $inc: { 'total': 1, 'dist': dist } }, { upsert: true, session: _session }); // Update TRK_CACHE_COL for the latest track point await mgClient.db(env.DB_NAME).collection(LOC_CACHE_COL).updateOne( { 'unitId': id }, { $currentDate: { 'updatedAt': true }, $set: gdata }, { upsert: true, session: _session }); }); } finally { await _session.endSession(); } } catch (error) { throw error; } } /** * Create a data process queue instance */ function getProcessQueue() { return async.queue((data, callback) => { processData(data) .catch(error => { if (error instanceof MongoServerError || (error.message && error.message.includes('ECONNREFUSED') || error.message.includes('Channel closed'))) { if (trkVehs[data.id].queue) { trkVehs[data.id].queue.pause(); trkVehs[data.id].queue.unshift(data); } if (!srvClosing) { // Marked as closing was handled by one of the socket's queue worker to avoid doing this multiple times srvClosing = true; server.close(); let bkData = []; for (const tv of Object.values(trkVehs).filter(it => it.socket || it.queue)) { if (tv.socket) tv.socket.destroy(new Error("CONN_ERROR")); if (tv.queue && !tv.queue.idle()) { tv.queue.pause(); bkData = bkData.concat([...tv.queue]); } } // Backup unprocessed data to local file if (bkData.length) fs.writeJSONSync(BK_FILE, bkData); handleError(error, true); } } }) .finally(() => callback()); }); } /** * Get AC list with latest data if any */ async function getVehsLastTrack() { if (!isClientConnected(mgClient)) mgClient = await connectDb(); const vTrks = await mgClient.db(env.DB_DB).collection(LOC_COL).aggregate([ { $sort: { date: 1 } }, { $group: { _id: '$unitId', item: { $last: '$$ROOT' } } }, { $project: { _id: 0, unitId: '$item.unitId', date: '$item.date', last: { $arrayElemAt: ['$item.locs', -1] } } } ], { 'allowDiskUse': true }).toArray(); for (let i = 0; i < vTrks.length; i++) { const vt = vTrks[i]; if (!trkVehs[vt.unitId]) trkVehs[vt.unitId] = { last: vt.last }; else trkVehs[vt.unitId].last = vt.last; } } async function handleError(error, forceExit = false) { if (error) debug(error); if (forceExit) process.exit(0); } // process.on('SIGINT', () => { // shutDown('SIGINT'); // }); process.on('SIGTERM', () => { shutDown('SIGTERM'); }); function shutDown(msg) { // Try to shutdown the server gracefully debug(`${msg} signal received.`); debug('Closing gps server.'); if (server) { server.close(() => { debug('GPS server closed.'); if (mgClient) { mgClient.close(false, () => { // boolean means [force], see in mongoose doc debug('MongoDb connection closed.'); process.exit(0); }); } }); } } // Start the server app main();