// agmission/Development/track-server/track-server.js
//
// 300 lines
// 7.9 KiB
// JavaScript

'use strict';
const express = require('express'),
TrackChannel = require('./track-channel').TrackChannel,
jwt = require('jsonwebtoken'),
repl = require('repl'),
amqp = require('amqplib'),
dbUtil = require('db-util'),
spdy = require('spdy'), // Use Node-spdy for HTTP2 support instead of express's https
fs = require('fs'),
env = require('./helpers/env'),
debug = require('debug')('track-server'); // console.log
// Express application; connectDb() stores the shared Mongo handles on `app.locals`.
const app = express();

// Create the SSE channel. Non-default options can be passed as an object and only affect that instance.
const trackChannel = new TrackChannel();

// connectRabbitMq() shuts the process down once qConnTry reaches this value.
const MAX_QUEUE_CONN_RETRY = 6;

// AMQP channel currently in use; reassigned on every (re)connection.
let mqChannel;
// Connection attempts made since the last successful AMQP connect.
let qConnTry = 0;
/**
 * Returns a promise that resolves (with no value) after `ms` milliseconds.
 *
 * @param {number} ms - Time to wait, in milliseconds.
 * @returns {Promise<void>}
 */
const delay = (ms) => new Promise((resolve) => {
  setTimeout(resolve, ms);
});
/**
 * Opens the MongoDB connection described by the DB_* environment settings and
 * caches it on the Express app: the connection as `app.locals.conn`, and the
 * result of its `db()` call as `app.locals.db`.
 *
 * @returns {Promise<void>} Rejects when `dbUtil.native.connect` rejects.
 */
async function connectDb() {
  const {
    DB_NAME: db,
    DB_USR: user,
    DB_PWD: pass,
    DB_HOSTS: hosts,
    DB_REPLSET: replicaSet,
  } = env;

  const connection = await dbUtil.native.connect({ db, user, pass, hosts, replicaSet });
  app.locals.conn = connection;
  app.locals.db = connection.db();
  debug('Mongodb connected.');
}
/**
 * Declares the durable GPS-data queue on the current `mqChannel` and starts
 * consuming it, handing every decoded message to `trackChannel.setVehGpsData`.
 *
 * Failures while declaring the queue or registering the consumer are logged,
 * not thrown. The per-message handler never rejects either, so a single bad
 * message cannot surface as an unhandled promise rejection.
 *
 * @returns {Promise<void>}
 */
async function setUpMQ() {
  const onMessage = async (msg) => {
    // amqplib invokes the callback with null when the broker cancels the consumer.
    if (msg === null) {
      return;
    }

    let gdata;
    try {
      gdata = JSON.parse(msg.content.toString());
    } catch (err) {
      // A payload that is not valid JSON will never become valid: reject it
      // without requeueing instead of leaving it to be redelivered forever.
      debug('[AMQP] dropping malformed message', err);
      try {
        mqChannel.nack(msg, false, false);
      } catch (nackErr) {
        debug(nackErr);
      }
      return;
    }

    try {
      await trackChannel.setVehGpsData(gdata);
    } catch (err) {
      // The message stays unacknowledged, so the broker can redeliver it once
      // this channel goes away; only the failure is reported here.
      debug('[AMQP] failed to process message', err);
      return;
    }

    try {
      await mqChannel.ack(msg);
    } catch (err) {
      debug(err);
    }
  };

  try {
    await mqChannel.assertQueue(env.QUEUE_NAME_GDATA, { durable: true });
    await mqChannel.consume(env.QUEUE_NAME_GDATA, onMessage);
  } catch (err) {
    debug(err);
  }
}
// Pause between AMQP (re)connection attempts, in milliseconds.
const QUEUE_RECONNECT_DELAY_MS = 3000;

/**
 * Connects to RabbitMQ, creates the shared channel (`mqChannel`) and starts the
 * queue consumer via setUpMQ().
 *
 * Every call counts as one attempt; the counter is reset after a channel has
 * been created. Once the counter reaches MAX_QUEUE_CONN_RETRY the process is
 * shut down through exit(). A failed attempt, or a later loss of the
 * connection, schedules a new attempt after QUEUE_RECONNECT_DELAY_MS.
 *
 * @returns {Promise<void>} Settles once this attempt, including any retry it
 *   scheduled from its own failure, has finished.
 */
async function connectRabbitMq() {
  qConnTry++;
  if (qConnTry >= MAX_QUEUE_CONN_RETRY) {
    return exit();
  }

  // One follow-up attempt at most per call: a failure after the connection was
  // opened can be reported both by the 'close' event and by the catch block
  // below, and two retries would leave two live connections consuming the queue.
  let pendingReconnect = null;
  const reconnect = () => {
    if (pendingReconnect === null) {
      pendingReconnect = delay(QUEUE_RECONNECT_DELAY_MS).then(() => connectRabbitMq());
    }
    return pendingReconnect;
  };

  let conn;
  try {
    conn = await amqp.connect({
      protocol: 'amqp',
      hostname: env.QUEUE_HOST,
      port: env.QUEUE_PORT || 5672,
      username: env.QUEUE_USR,
      password: env.QUEUE_PWD,
      vhost: env.QUEUE_VHOST || '/',
      heartbeat: env.QUEUE_HEARTBEAT || 580,
    });
    conn.on('error', (err) => {
      if (err.message !== 'Connection closing') {
        debug('[AMQP] conn error', err.message);
      }
    });
    conn.on('close', () => {
      console.error('[AMQP] reconnecting');
      reconnect().catch((err) => debug(err));
    });
    mqChannel = await conn.createChannel();
    debug('[AMQP] Channel created');
    qConnTry = 0;
    await setUpMQ();
  } catch (err) {
    console.error(err);
    if (conn) {
      // The connection was opened but could not be used: close it so it is not
      // leaked. Not awaited, so a slow or failing close cannot hold up the retry.
      const abandoned = conn;
      Promise.resolve()
        .then(() => abandoned.close())
        .catch((closeErr) => debug('[AMQP] close failed', closeErr.message));
    }
    debug(`[AMQP] reconnecting in ${QUEUE_RECONNECT_DELAY_MS / 1000}s`);
    return reconnect();
  }
}
// Serve the static part of the demo
app.use(express.static(`${__dirname}/demo/public`));
// Parse JSON request bodies (at most 1 MB)
app.use(express.json({ limit: '1mb' }));
// for parsing application/x-www-form-urlencoded
// app.use(express.urlencoded({ limit: "10mb", extended: true, parameterLimit: 100000 }));
// Allow all CORS requests
app.use(function (req, res, next) {
  res.setHeader('Access-Control-Allow-Origin', '*');
  // The `*` wildcard does not cover the Authorization header, which the /track
  // endpoints read, so it has to be listed explicitly.
  res.setHeader('Access-Control-Allow-Headers', '*, Authorization');
  res.setHeader('Access-Control-Allow-Methods', '*');
  if (req.method === 'OPTIONS') {
    // Preflight requests never carry the token. Answer them here; otherwise the
    // /track auth guard registered after this middleware rejects them with 401
    // and the browser blocks the real request.
    return res.status(204).end();
  }
  next();
});
/**
 * Authentication guard for every route under /track/.
 *
 * The token is read from the `token` query parameter for paths ending in
 * `/stream` and from the Authorization header for everything else. Only the
 * last space-separated part is used, so a leading scheme such as "Bearer" is
 * ignored. On success the token's `uid` and `ut` claims are copied to
 * `req.uid` and `req.ut`; any other outcome is answered with
 * 401 `not_authorized`.
 *
 * NOTE(review): jwt.decode() only parses the token; it does not check the
 * signature or the expiry. Unless tokens are verified before they reach this
 * service, this should become jwt.verify() with the signing key.
 */
app.use('/track/', function (req, res, next) {
  const rawToken = req.path.endsWith('/stream') ? req.query.token : req.headers.authorization;

  // A repeated `?token=` arrives as an array and a missing one as undefined;
  // neither can be split.
  if (typeof rawToken !== 'string') {
    return res.status(401).send('not_authorized');
  }

  const parts = rawToken.split(' ');
  const encoded = parts[parts.length - 1].trim();
  if (!encoded) {
    return res.status(401).send('not_authorized');
  }

  // jwt.decode() returns null when the token cannot be decoded; treat that as
  // unauthorized instead of crashing on the property access below.
  const claims = jwt.decode(encoded);
  if (!claims || typeof claims !== 'object' || !claims.uid || !claims.ut) {
    return res.status(401).send('not_authorized');
  }

  req.uid = claims.uid;
  req.ut = claims.ut;
  next();
});
/**
 * Wraps an async route handler so that a rejection is logged and, when no
 * response has been started yet, answered with a 500. Without this a failing
 * handler leaves the request hanging and the rejection unhandled.
 *
 * @param {function(object, object): Promise<*>} handler - Async (req, res) handler.
 * @returns {function(object, object): Promise<void>} Express-compatible handler.
 */
function asyncRoute(handler) {
  return async (req, res) => {
    try {
      await handler(req, res);
    } catch (error) {
      debug(error);
      // Once headers are out (e.g. an event stream that already started) the
      // status can no longer be changed.
      if (!res.headersSent) {
        res.status(500).end();
      }
    }
  };
}

app.post('/track/getVehicles', asyncRoute((req, res) => trackChannel.getVehicles(req, res)));

app.post('/track/trackOn', asyncRoute((req, res) => trackChannel.trackOn(req, res)));

// Serve the event streams.
// Register this request to the channel. The request is now attached to the SSE channel instance and
// will receive any events published to it.
app.get('/track/stream', asyncRoute((req, res) => trackChannel.register(req, res)));

app.post('/track/setTrackData', asyncRoute(async (req, res) => {
  await trackChannel.setVehGpsData(req.body);
  res.send({ ok: true });
}));

app.post('/track/findTracks', asyncRoute((req, res) => trackChannel.findTracks(req, res)));

app.post('/track/getTracks', asyncRoute((req, res) => trackChannel.getTracks(req, res)));
/**
* @api {post} /api/v1/tracks Get Tracking Data
* @apiVersion 1.0.0
* @apiName PostTracks
* @apiGroup Tracks
* @apiDescription Query for tracking data.
* @apiParam {String} unitId The tracking unit Identification (required)
* @apiParam {String} from DateTime string to get the data from (required)
* @apiParam {String} to DateTime string to get the data to (optional).
* It is ignored if the value is not specified or is invalid.
* @apiParamExample {json} Request-Example:
* {
* "unitId": "0000000xxx", "from": "2017-12-01T00:00:00.000Z", "to": "2017-12-01T23:59:59.000Z"
* }
* Or
* {
* "unitId": "0000000xxx", "from": "2017-12-01 00:00:00.000Z", "to": "2017-12-01 23:59:59.000Z"
* }
* @apiSuccess {Object[]} items List of found track data in JSON array.
* @apiSuccess {Object} items.track Track Data object
* @apiSuccess {String} items.track.uid The tracking unit Identification
* @apiSuccess {Number} items.track.lat Latitude in degrees
* @apiSuccess {Number} items.track.lng Longitude in degrees
* @apiSuccess {Number} items.track.alt Altitude in Meters
* @apiSuccess {Number} items.track.hdg Heading (GPS bearing 0..359)
* @apiSuccess {Number} items.track.spd Speed in Km/h
* @apiSuccess {String} items.track.gdt GPS datetime in ISO 8601 format
* @apiSuccess {Number} items.track.spr 1: spray on, 0: spray off
* @apiSuccessExample {json} Success
* [
* {
* "uid": "0000000xxx",
* "lat": 24.72025,
* "lng": -81.0701,
* "alt": 3,
* "hdg": 66,
* "spd": 9.72,
* "gdt": "2017-12-01T21:53:42.000Z",
* "spr": 0
* },
* {
* "uid": "0000000xxx",
* "lat": 24.72035,
* "lng": -81.06985,
* "alt": 3,
* "hdg": 66,
* "spd": 9.72,
* "gdt": "2017-12-01T21:53:52.000Z",
* "spr": 0
* }
* ]
*/
app.post('/track/tracks', async (req, res) => {
  try {
    await trackChannel.exportTracks(req, res);
  } catch (error) {
    // A rejection from an async handler would otherwise leave the request
    // without a response; log it and fail the request explicitly.
    debug(error);
    if (!res.headersSent) {
      res.status(500).end();
    }
  }
});
// Fallback for every request that no route above answered: an uncacheable 404.
app.use(function (req, res) {
  res
    .set('Cache-Control', 'private, no-store')
    .status(404)
    .end('Not found');
});
/**
 * Logs a fatal startup error and terminates the process with exit code 1.
 *
 * @param {*} err - The error that prevented the server from starting.
 */
function onAppErr(err) {
  debug(err);
  process.exit(1);
}

// Create a HTTPS - HTTP2 server
spdy.createServer({ key: fs.readFileSync(env.SSL_KEY), cert: fs.readFileSync(env.SSL_CERT) }, app)
  // Listen failures (for example a port that is already in use) are delivered
  // through the 'error' event rather than to the listen callback.
  .on('error', onAppErr)
  .listen(env.PORT, async (error) => {
    // Kept for backward compatibility in case a caller-supplied error is passed.
    if (error) return onAppErr(error);
    try {
      await connectDb();
      trackChannel.db = app.locals.db;
      await connectRabbitMq();
      debug(`HTTPS-v2 Server listening on port ${env.PORT}`);
    } catch (err) {
      onAppErr(err);
    }
  });
// Shut down on Ctrl-C and on a termination request.
process.on('SIGINT', () => {
  exit();
});
process.on('SIGTERM', () => {
  exit();
});

/**
 * Releases the database connection and the track channel, then terminates the
 * process. A failure in either cleanup step is logged and never prevents the
 * process from exiting.
 *
 * NOTE(review): the exit code is 1 for every caller, including the SIGINT and
 * SIGTERM handlers — confirm whether a signal-initiated shutdown should report
 * success instead.
 */
function exit() {
  try {
    if (app.locals.conn) {
      app.locals.conn.close();
      debug('db connection closed.');
    }
  } catch (err) {
    debug('db connection close failed', err);
  }

  try {
    trackChannel.close();
  } catch (err) {
    debug('track channel close failed', err);
  } finally {
    process.exit(1);
  }
}
/* Command line support for server status monitoring */
const rc = repl.start('').context;

// Helpers exposed as globals inside the REPL session.
Object.assign(rc, {
  // cls: wipe the terminal and move the cursor to the top-left corner.
  cls: () => {
    process.stdout.write('\u001B[2J\u001B[0;0f');
  },
  // clear: calls trackChannel.clear(); it does not touch the screen (use cls for that).
  clear: () => {
    trackChannel.clear();
  },
  // l: print the channel's client list.
  l: () => {
    console.log(trackChannel.listClients());
  },
  // c: print the number of clients.
  c: () => {
    const count = trackChannel.getClientsCount();
    console.log(`${count} clients`);
  },
  // p: publish `data` under the given event through the channel.
  p: (data, event) => {
    trackChannel.publish(data, event);
  },
  // setData: forward a GPS payload to trackChannel.setGpsData().
  // NOTE(review): every other caller in this file uses setVehGpsData — confirm setGpsData exists.
  setData: (gData) => {
    trackChannel.setGpsData(gData);
  },
  // setVD: push one fixed sample position for vehicle `vId`, stamped with the current time.
  setVD: (vId) => {
    const sample = { lat: 44.44, lon: -79.433, gdt: new Date() };
    trackChannel.setVehGpsData({ [vId]: [sample] }).catch((err) => debug(err));
  },
  // getVehicles: call trackChannel.getVehicles() without arguments and return its result.
  getVehicles: async () => trackChannel.getVehicles(),
  // mu: log memory usage in MiB, forcing a GC first when global.gc is available.
  mu: () => {
    if (global.gc) global.gc();
    const usage = process.memoryUsage();
    for (const field of ['heapTotal', 'rss', 'heapUsed', 'external']) {
      usage[field] = usage[field] / 1024 / 1024;
    }
    debug(usage);
  },
});