// agmission/Development/track-server/track-channel.js
//
// 659 lines
// 19 KiB
// JavaScript

'use strict';
const uniqid = require('uniqid'),
utils = require('./helpers/utils'),
once = require('events'),
ObjectId = require('db-util').ObjectId,
simplify = require('simplify-path'),
env = require('./helpers/env'),
error = require('error-handler'),
debug = require('debug')('track-channel'); // console.log;
/**
 * Simulate an asynchronous call: invoke `func` after `delay` milliseconds and
 * resolve the returned promise with whatever `func` returns.
 *
 * @param {Function} func Zero-argument function producing the resolved value.
 * @param {number} [delay=500] Milliseconds to wait before calling `func`.
 * @returns {Promise<*>} Promise resolved with the return value of `func`.
 */
const mockPromise = function (func, delay = 500) {
  return new Promise(function (resolve) {
    setTimeout(function () {
      resolve(func());
    }, delay);
  });
};
/**
 * Server-Sent-Events (SSE) channel that pushes vehicle GPS updates to the
 * browsers tracking them, plus a few HTTP handlers that read vehicle and track
 * data from the database handle passed as `options.db`.
 */
class TrackChannel {
  /**
   * @param {Object} [options]
   * @param {number} [options.pingInterval] Keep-alive period in ms (default: env.HEARTBEAT or 59, in seconds).
   *        A falsy value disables the ping timer.
   * @param {number} [options.clientRetryInterval] Value sent in the SSE `retry:` field, in ms
   *        (default: env.RETRY_INT or 3, in seconds).
   * @param {number} [options.historySize=30] Maximum number of cached positions per vehicle.
   * @param {Object} [options.db] Database handle exposing `collection(name)`.
   */
  constructor(options) {
    this.options = Object.assign({}, {
      pingInterval: (env && env.HEARTBEAT || 59) * 1000,
      clientRetryInterval: (env && env.RETRY_INT || 3) * 1000,
      historySize: 30
    }, options);
    this.db = this.options.db;

    /**
     * Connected tracking clients, keyed by SSE id:
     * { 'sseId': { req, res, onClose, lastPing: <Date>, ready: <boolean, ready for new track data?> } }
     */
    this.clients = {};

    /**
     * Tracked vehicles, keyed by vehicle id:
     * { 'vId': { trackers: { 'sseId': <last GPS Date> }, pos: [recent positions (max of historySize)] } }
     */
    this.vehicles = {};

    // Broadcast a ping to every connected client so browsers keep the connection alive.
    // The timer fires twice per pingInterval; publishOne() skips pings that are not needed.
    if (this.options.pingInterval) {
      this.pingTimer = setInterval(() => {
        this.publish('', 'p').catch(err => debug(err));
      }, this.options.pingInterval / 2);
    }
  }

  /**
   * Cache new GPS data for a vehicle and forward it to every ready client tracking it.
   * Only the first own property of `gData` is processed.
   *
   * @param {Object} gData Vehicle GPS data: { 'vehId': [data] }
   * @notes Data elements must contain at least: { lat: <lat>, lon: <lon>, gdt: <gpsDateTime> }
   * TEST: var now=new Date(); setVD({ veh1: [{ gdt: now }]})
   */
  async setVehGpsData(gData) {
    if (utils.isEmpty(gData)) return;
    const vId = Object.getOwnPropertyNames(gData)[0];
    if (!vId) return;

    try {
      const incoming = gData[vId];
      // Only the first element's timestamp is normalized to a Date (as before).
      incoming[0].gdt = new Date(incoming[0].gdt);

      const cached = this.vehicles[vId];
      if (!cached || utils.isEmpty(cached.trackers)) {
        // Nobody is tracking this vehicle: restart its cache from the new data.
        this.vehicles[vId] = { trackers: {}, pos: this._trimHistory(incoming) };
      } else {
        // Simply append the new data to the cache.
        // TODO: might need to sort pos by gdt ascending to guarantee ordering.
        cached.pos = this._trimHistory([...(cached.pos || []), ...incoming]);
      }

      const entry = this.vehicles[vId];
      if (entry && !utils.isEmpty(entry.trackers)) {
        // TODO: also replay positions received while a client was not ready
        // (the gap after a reconnect), using the per-tracker last GPS time.
        const sendTo = async (sseId) => this.publishOne(sseId, utils.clone(gData), 'd');
        const sends = [];
        for (const sseId of Object.keys(entry.trackers)) {
          const client = this.clients[sseId];
          if (client && client.ready) { // Skip clients that are not ready yet
            // A failed send must not prevent delivery to the other trackers.
            sends.push(sendTo(sseId).catch(err => debug(err)));
          }
        }
        await Promise.all(sends);
      }
    } catch (err) {
      debug(err);
    }
  }

  /**
   * Keep at most `options.historySize` of the most recent entries of `list`.
   * @param {Array} list Positions, oldest first.
   * @returns {Array} `list` itself when it fits, otherwise a trimmed copy.
   */
  _trimHistory(list) {
    const overflow = list.length - this.options.historySize;
    return overflow > 0 ? list.slice(overflow) : list;
  }

  /**
   * HTTP handler: (re)define the list of vehicles a registered SSE client tracks.
   * Expects `req.body` as { sId: <sseId>, vt: [{ v: <vehicleId>, gdt: <last known GPS time> }] }.
   * Responds 409 when `sId` is missing or unknown.
   */
  async trackOn(req, res) {
    debug(req.url, new Date());
    try {
      const input = req.body || {};
      const client = input.sId ? this.clients[input.sId] : undefined;
      if (!client) {
        res.status(409).end();
        return;
      }

      client.ready = false; // Not ready for realtime data while the list is rebuilt
      this._removeVTracker(input.sId); // Clear the previous tracking list, then re-create it

      if (input.vt && input.vt.length) {
        for (let i = 0; i < input.vt.length; i++) {
          const tv = input.vt[i];
          if (!this.vehicles[tv.v]) {
            this.vehicles[tv.v] = { trackers: {}, pos: [] };
          }
          // TODO: fetch the latest data of each vehicle (see _getRecentVehData) and send it
          // to the client before going live; for now only the client's last GPS time is recorded.
          this.vehicles[tv.v].trackers[input.sId] = new Date(tv.gdt);
        }
      }

      client.ready = true; // Ready to be updated with real data
      res.send({ 'trackOn': 'OK' }).end();
    } catch (err) {
      error.handleResErr(res, err);
    }
  }

  /**
   * Simulation only: pretend to load recent data from the db to avoid a gap.
   *
   * @param {string} vId Vehicle id.
   * @param {*} fromDT Date (or date-like value) the client last received data for.
   * @returns {Promise<Object>} `{}` when `fromDT` is falsy, otherwise { [vId]: [fake positions] }.
   */
  async _getRecentVehData(vId, fromDT) {
    const sample = 1;
    if (!fromDT) {
      // "get latest record" is not implemented
      return {};
    }
    return mockPromise(() => {
      const time = (new Date(fromDT)).getTime() - sample * 10000;
      const gdata = [];
      for (let i = 0; i < sample; i++) {
        gdata.push({ lat: 33.44, lon: -79.33, gdt: (new Date(time + i * 10000)) });
      }
      const res = {};
      res[vId] = gdata;
      return res;
    });
  }

  /**
   * HTTP handler: open an SSE stream for the request and register it as a client.
   * Sends the `retry:` field and a `login` event carrying the generated SSE id.
   *
   * @returns {Promise<Object|undefined>} The client record (undefined if it was
   *          unregistered while the login event was being sent).
   */
  async register(req, res) {
    const c = { req: req, res: res };
    c.req.socket.setTimeout(0);
    c.req.socket.setKeepAlive(true);
    c.res.writeHead(200, {
      'Connection': 'keep-alive',
      'Content-Type': 'text/event-stream',
      'Cache-Control': 'no-cache'
    });
    c.res.write('retry: ' + this.options.clientRetryInterval + '\n\n');

    const sseId = uniqid();
    // Keep a reference to the handler: removing a listener requires the very same function.
    c.onClose = () => this._unregister(sseId);
    this.clients[sseId] = c;
    c.req.on('close', c.onClose);
    c.res.on('close', c.onClose);

    await this.publishOne(sseId, sseId, 'login');
    return this.clients[sseId];
  }

  /**
   * Drop a client: detach its close listeners, remove it from every vehicle's
   * tracker list and end its response. Unknown ids are ignored.
   * @param {string} sseId
   */
  _unregister(sseId) {
    const c = this.clients[sseId];
    if (!c) return;

    // Remove the record first so a 'close' event triggered by end() finds nothing to do.
    delete this.clients[sseId];
    this._removeVTracker(sseId);

    if (c.onClose) {
      if (c.req && typeof c.req.off === 'function') c.req.off('close', c.onClose);
      if (c.res && typeof c.res.off === 'function') c.res.off('close', c.onClose);
    }
    // res may be missing for a corrupted registration (see publishOne).
    if (c.res && typeof c.res.end === 'function') c.res.end();
  }

  /**
   * Remove the client from the tracker list of every vehicle.
   * @param {string} sseId
   */
  _removeVTracker(sseId) {
    for (const vId of Object.keys(this.vehicles)) {
      const trackers = this.vehicles[vId] && this.vehicles[vId].trackers;
      if (trackers && Object.prototype.hasOwnProperty.call(trackers, sseId)) {
        delete trackers[sseId];
      }
    }
  }

  /**
   * Send an event to one client, or to all of them.
   *
   * @param {*} data Payload (see publishOne).
   * @param {string} eventName SSE event name.
   * @param {string} [toId='*'] Target SSE id; '*' or a falsy value broadcasts.
   */
  async publish(data, eventName, toId = '*') {
    if (toId && toId !== '*') {
      await this.publishOne(toId, data, eventName);
      return;
    }
    // Fan out in parallel so a client that is slow to drain does not delay the others.
    await Promise.all(
      Object.keys(this.clients).map(sseId => this.publishOne(sseId, data, eventName))
    );
  }

  /**
   * Write one SSE event to a single client and wait for the socket to drain
   * when the write signals backpressure.
   *
   * @param {string} toId Target SSE id; unknown ids are ignored.
   * @param {*} data Objects are JSON-serialized; other truthy values are sent as text;
   *        falsy values produce an empty `data:` line.
   * @param {string} eventName SSE event name; 'p' marks a keep-alive ping.
   */
  async publishOne(toId, data, eventName) {
    const client = toId ? this.clients[toId] : undefined;
    if (!client) return;

    const clientSocket = client.res;
    if (!clientSocket) { // Corrupted registration => remove
      this._unregister(toId);
      return;
    }

    if (eventName === 'p' && client.lastPing) { // Check and skip unnecessary pings
      const now = new Date();
      const pToNow = now - client.lastPing;
      if (pToNow < this.options.pingInterval) {
        // Something was written less than pingInterval ago: skip this ping and rewind
        // lastPing. NOTE(review): the intent of this arithmetic is not documented; kept as is.
        client.lastPing = new Date((now.getTime() + pToNow) - this.options.pingInterval);
        return;
      }
    }

    const text = (typeof data === 'object') ? JSON.stringify(data) : data;
    // SSE framing: every line of the payload becomes its own "data: <line>" field.
    // Split on real line terminators only (CRLF, CR or LF).
    const payload = text
      ? String(text).split(/\r\n|\r|\n/).map(line => 'data: ' + line).join('\n')
      : '';
    const output = (eventName ? 'event: ' + eventName + '\n' : '') +
      (payload || 'data: ') + '\n\n';

    if (!clientSocket.write(output)) {
      try {
        await this._waitForDrain(client); // Handle backpressure on write
      } catch (err) {
        debug(err);
      }
    }
    // Use the captured record: the client may have been unregistered while waiting.
    client.lastPing = new Date();
  }

  /**
   * Wait until the client's response emits 'drain' (also settles on 'close' or 'error'
   * so a vanished client cannot leave the caller waiting forever).
   * Concurrent callers share one pending promise per client, so listeners do not pile up.
   * Implemented with plain listeners so it does not depend on how `events.once` is imported.
   *
   * @param {Object} client Client record whose `res` signalled backpressure.
   * @returns {Promise<void>}
   */
  _waitForDrain(client) {
    if (!client.drainWait) {
      const stream = client.res;
      client.drainWait = new Promise((resolve) => {
        const settle = () => {
          stream.removeListener('drain', settle);
          stream.removeListener('close', settle);
          stream.removeListener('error', onError);
          client.drainWait = null;
          resolve();
        };
        const onError = (err) => {
          debug(err);
          settle();
        };
        stream.once('drain', settle);
        stream.once('close', settle);
        stream.once('error', onError);
      });
    }
    return client.drainWait;
  }

  /**
   * Group the ids of connected clients by IP address
   * (`x-real-ip` header when present, otherwise the socket's remote address).
   * @returns {Object} { '<ip>': ['sseId', ...] }
   */
  listClients() {
    const rollupByIP = {};
    for (const sseId of Object.keys(this.clients)) {
      const cv = this.clients[sseId];
      if (!cv) continue;
      const ip = cv.req.headers['x-real-ip'] || cv.req.socket.remoteAddress;
      if (!(ip in rollupByIP)) {
        rollupByIP[ip] = [];
      }
      rollupByIP[ip].push(sseId);
    }
    return rollupByIP;
  }

  /**
   * @returns {number} Number of registered clients.
   */
  getClientsCount() {
    return Object.keys(this.clients).length;
  }

  /**
   * Stop the ping timer and detach every client from the vehicles' tracker lists.
   * Client connections themselves are left open.
   */
  close() {
    clearInterval(this.pingTimer);
    for (const sseId of Object.keys(this.clients)) {
      this._removeVTracker(sseId);
    }
    debug('Channel closed gracefully !');
  }

  /**
   * Broadcast GPS data to ALL connected clients, one event per vehicle.
   * ** FOR TESTING ONLY **
   * @param {Object} [gData] Vehicle GPS data { 'vehId': [data] }; sample data is used when omitted.
   */
  async setGpsData(gData) {
    if (!gData) { // For dev testing only
      const sample = (lat) => ({ lat: lat, lon: -79.33, stat: 0, gdt: new Date() });
      const burst = () => [sample(33.3), sample(33.3), sample(33.3), sample(33.3)];
      gData = {
        veh1: burst(),
        veh2: burst(),
        veh3: burst(),
        veh4: [sample(33.33)],
        veh5: [sample(33.34)]
      };
    }

    const all = Object.keys(gData).map((vId) => {
      const payload = {};
      payload[vId] = gData[vId];
      return this.publish(payload, 'd', '*');
    });
    await Promise.all(all).catch(err => debug(err));
  }

  /**
   * Forget all cached vehicle data and disconnect every client.
   */
  clear() {
    this.vehicles = {};
    for (const sseId of Object.keys(this.clients)) {
      try {
        this._unregister(sseId); // also ends the client's response
      } catch (err) {
        debug(err); // keep disconnecting the remaining clients
      }
    }
  }

  /**
   * HTTP handler: list the vehicles (users of kind '9' with a unitId) under a parent
   * account together with their cached last location, followed by the active users
   * of that account that have a cached location.
   * Expects `req.body.byPuid` (parent id) and `req.uid` (caller id, excluded from the users).
   */
  async getVehicles(req, res) {
    // TODO: add param validation
    try {
      // Inside the try so that a malformed id is reported through the error handler.
      const applId = ObjectId(req.body.byPuid);

      const vPipeline = [
        {
          $match: {
            parent: applId, unitId: { $nin: [null, ''] }, kind: '9'
          }
        },
        {
          $lookup: {
            from: 'location_cache',
            localField: 'unitId',
            foreignField: 'unitId',
            as: 'locCache'
          }
        },
        { $unwind: { path: '$locCache', 'preserveNullAndEmptyArrays': true } },
        {
          $project: {
            unitId: '$unitId',
            name: '$name',
            acType: { $ifNull: ['$vehicleType', 0] },
            kind: 1,
            model: '$model',
            color: { $ifNull: ['$color', 'yellow'] },
            last: {
              $cond: [
                { $not: ['$locCache'] },
                null,
                {
                  lat: '$locCache.lat',
                  lon: '$locCache.lon',
                  alt: '$locCache.alt',
                  inputs: '$locCache.inputs',
                  speed: '$locCache.speed',
                  head: '$locCache.head',
                  temp: '$locCache.temp',
                  humid: '$locCache.humid',
                  appRate: '$locCache.appRate',
                  windSpd: '$locCache.windSpd',
                  windHdg: '$locCache.windHdg',
                  gdt: '$locCache.gdt'
                }
              ]
            }
          }
        }
      ];
      const vehs = await this.db.collection('users').aggregate(vPipeline).toArray();

      const uPipeline = [
        {
          $match: {
            $and: [
              { kind: { $ne: '9' }, _id: { $ne: ObjectId(req.uid) }, active: true, username: { $nin: [null, ''] } },
              { $or: [{ _id: applId }, { parent: applId }] }
            ]
          }
        },
        {
          // location_cache is joined on the string form of the user's _id
          $addFields: { 'userId': { '$toString': '$_id' } }
        },
        {
          $lookup: {
            let: { 'userId': '$userId' },
            from: 'location_cache',
            pipeline: [
              { $match: { $expr: { $eq: ['$unitId', '$$userId'] } } }
            ],
            as: 'locCache'
          }
        },
        { $unwind: { path: '$locCache', 'preserveNullAndEmptyArrays': true } },
        {
          $project: {
            unitId: '$userId', kind: 1,
            usernames: { $split: ['$username', '@'] },
            last: {
              $cond: [
                { $not: ['$locCache'] },
                null,
                {
                  lat: '$locCache.lat',
                  lon: '$locCache.lon',
                  gdt: '$locCache.gdt'
                }
              ]
            }
          }
        },
        {
          // Users without a cached location are dropped
          $match: { last: { $ne: null } }
        },
        {
          $project: {
            unitId: 1, last: 1, kind: 1,
            // Display name is the part of the username before '@'
            name: { $arrayElemAt: ['$usernames', 0] }
          }
        }
      ];
      const users = await this.db.collection('users').aggregate(uPipeline).toArray();

      res.json(vehs.concat(users)).end();
    } catch (err) {
      error.handleResErr(res, err);
    }
  }

  /**
   * HTTP handler: summarize the daily track documents of a unit within a date range.
   * Expects `req.body` as { unitId, from, to }; `from`/`to` must be parseable dates and are
   * matched against the `date` field by their UTC day (YYYY-MM-DD).
   * Responds with [] when `unitId` is missing and with 'invalid_param' for unparseable dates.
   */
  async findTracks(req, res) {
    try {
      const ops = req.body || {};
      let result = [];

      if (ops.unitId) {
        const from = new Date(ops.from);
        const to = new Date(ops.to);
        // A Date object is always truthy, so validity has to be checked through its time value.
        if (isNaN(from.getTime()) || isNaN(to.getTime())) {
          error.handleResErr(res, 'invalid_param');
          return;
        }

        const fromDay = from.toISOString().slice(0, 10);
        const toDay = to.toISOString().slice(0, 10);
        const filter = {
          unitId: ops.unitId,
          date: fromDay === toDay ? fromDay : { $gte: fromDay, $lte: toDay }
        };

        result = await this.db.collection('locations').aggregate([
          { $match: filter },
          {
            $project: {
              dist: 1, total: 1,
              start: { $arrayElemAt: ['$locs', 0] }, end: { $arrayElemAt: ['$locs', -1] }
            }
          },
          {
            $project: {
              start: '$start.gdt', end: '$end.gdt',
              dist: 1, total: 1
            }
          }
        ]).toArray();
      }

      res.json(result).end();
    } catch (err) {
      error.handleResErr(res, err);
    }
  }

  /**
   * Split a location list into consecutive spray-off / spray-on segments.
   * A segment ends when `inputs` changes, when two consecutive points are at least
   * 5 * 1e3 apart according to `utils.distance` (presumably meters - TODO confirm),
   * or at the last point. Segments with fewer than two points are dropped.
   *
   * @param {Array} data Track location list ({ lat, lon, inputs, ... }).
   * @returns {Array} [{ spray: <true/false>, data: [<locations>] }]
   */
  _getTracksSegments(data) {
    if (!data || !data.length) return [];

    const maxGap = 5 * 1e3;
    const lastIdx = data.length - 1;
    const segs = [];
    let start = 0;

    for (let cur = 1; cur < data.length; cur++) {
      const prev = data[cur - 1];
      const here = data[cur];
      const tooFar = utils.distance([prev.lat, prev.lon], [here.lat, here.lon]) >= maxGap;
      // Loose comparison kept from the original implementation.
      // NOTE(review): confirm whether `inputs` can mix numbers and strings.
      const inputsChanged = here.inputs != prev.inputs;
      if (!tooFar && !inputsChanged && cur !== lastIdx) continue;

      // After a jump the current point starts a new segment; otherwise it also closes
      // the current one so adjacent segments share their boundary point.
      const seg = data.slice(start, tooFar ? cur : cur + 1);
      if (seg.length > 1) {
        segs.push({ spray: !!(seg[0].inputs), data: seg });
      }
      start = cur;
    }
    return segs;
  }

  /**
   * HTTP handler: return the simplified track segments of a unit for the given days.
   * Expects `req.body` as { unitId, dates: ['YYYY-MM-DD', ...] }.
   * Responds with [{ date, end: <gdt of last location or null>, dist, data: [{ spray, data: [[lat, lon]] }] }].
   */
  async getTracks(req, res) {
    try {
      const ops = req.body;
      const result = [];

      if (ops && ops.unitId && ops.dates && ops.dates.length) {
        const vlocs = await this.db.collection('locations')
          .find({ unitId: ops.unitId, date: { $in: ops.dates } })
          .toArray();

        for (const vloc of (vlocs || [])) {
          const locs = (vloc.locs && vloc.locs.length) ? vloc.locs : null;
          const segs = locs ? this._getTracksSegments(locs) : [];

          for (const seg of segs) {
            if (seg.data.length > 2) {
              // simplify() is fed [lon, lat] pairs; convert back to [lat, lon] afterwards.
              const simplified = simplify(seg.data.map(it => [it.lon, it.lat]), 0.00001);
              seg.data = simplified.map(it => [it[1], it[0]]);
            } else {
              seg.data = seg.data.map(it => [it.lat, it.lon]);
            }
          }

          result.push({
            date: vloc.date,
            // A day document without locations must not fail the whole request.
            end: locs ? utils.last(locs).gdt : null,
            dist: vloc.dist,
            data: segs
          });
        }
      }

      res.json(result).end();
    } catch (err) {
      error.handleResErr(res, err);
    }
  }

  /**
   * export Tracks on demand - API.
   * Expects `req.body` as { unitId, from, [to] } and `req.uid` as the owning parent id.
   * Returns the flattened locations of the unit recorded after `from` (and up to `to`
   * when a parseable `to` is given). Responds 'invalid_param' when `unitId` or a
   * parseable `from` is missing.
   */
  async exportTracks(req, res) {
    const ops = req.body || {};
    if (!ops.unitId || !ops.from || isNaN(Date.parse(ops.from)))
      return error.handleResErr(res, 'invalid_param');

    try {
      const fromDate = ops.from.slice(0, 10);
      const hasTo = Boolean(ops.to) && !isNaN(Date.parse(ops.to));

      // Day-level filter on the documents, then time-level filter on their locations.
      const lookupVars = { unitId: '$unitId', fromDate: fromDate };
      const dayConds = [
        { $eq: ['$unitId', '$$unitId'] },
        { $gte: ['$date', '$$fromDate'] }
      ];
      const locConds = [
        { $gt: ['$$loc.gdt', new Date(ops.from)] }
      ];
      if (hasTo) {
        lookupVars['toDate'] = ops.to.slice(0, 10);
        dayConds.push({ $lte: ['$date', '$$toDate'] });
        locConds.push({ $lte: ['$$loc.gdt', new Date(ops.to)] });
      }

      const pipeline = [
        {
          $match: { parent: ObjectId(req.uid), unitId: ops.unitId.trim() }
        },
        {
          $lookup: {
            from: 'locations',
            let: lookupVars,
            pipeline: [
              { $match: { $expr: { $and: dayConds } } }
            ],
            as: 'vlocs'
          }
        },
        { $unwind: { path: '$vlocs', 'preserveNullAndEmptyArrays': true } },
        {
          $project: {
            uid: '$unitId',
            locs: {
              $filter: {
                input: '$vlocs.locs', as: 'loc',
                cond: { $and: locConds }
              }
            }
          }
        },
        { $unwind: '$locs' },
        {
          $project: {
            _id: 0,
            uid: '$uid',
            lat: '$locs.lat',
            lng: '$locs.lon',
            alt: '$locs.alt',
            hdg: '$locs.head',
            spd: '$locs.speed',
            gdt: '$locs.gdt',
            spr: '$locs.inputs'
          }
        }
      ];

      const result = await this.db.collection('users').aggregate(pipeline).toArray();
      res.json(result).end();
    } catch (err) {
      error.handleResErr(res, err);
    }
  }
}
// Public API of this module: the SSE tracking channel class.
module.exports = { TrackChannel: TrackChannel };