'use strict';

const uniqid = require('uniqid'),
  utils = require('./helpers/utils'),
  // NOTE: `require('events')` itself is the EventEmitter class; the promise-returning
  // helper is its `once` property, so it has to be destructured.
  { once } = require('events'),
  ObjectId = require('db-util').ObjectId,
  simplify = require('simplify-path'),
  env = require('./helpers/env'),
  error = require('error-handler'),
  debug = require('debug')('track-channel'); // console.log;

/**
 * Resolve with the result of `func()` after `delay` milliseconds.
 * Only used to simulate a slow data source.
 * @param {Function} func producer invoked once the delay has elapsed
 * @param {number} [delay=500] delay in milliseconds
 * @returns {Promise<*>}
 */
const mockPromise = (func, delay = 500) =>
  new Promise((resolve) => setTimeout(() => resolve(func()), delay));

/**
 * Wait until a back-pressured writable stream can accept data again.
 * Resolves on 'drain' and also on 'close', so a client that disconnects while
 * we are waiting cannot leave the caller pending forever. Rejects if the stream
 * emits 'error' while waiting.
 * @param {object} stream writable stream (the SSE response)
 * @returns {Promise<void>}
 */
const waitForDrain = async (stream) => {
  let onClose;
  const closed = new Promise((resolve) => {
    onClose = resolve;
    stream.once('close', onClose);
  });
  try {
    await Promise.race([once(stream, 'drain'), closed]);
  } finally {
    stream.removeListener('close', onClose);
  }
};

/**
 * Server-Sent-Events channel that pushes vehicle GPS updates to connected
 * clients, plus a few HTTP handlers that read track data from `options.db`.
 */
class TrackChannel {
  /**
   * @param {object} [options] overrides for the defaults below; `options.db` is the
   *   database handle used by the query handlers.
   */
  constructor(options) {
    this.options = Object.assign({}, {
      pingInterval: (env && env.HEARTBEAT || 59) * 1000,
      clientRetryInterval: (env && env.RETRY_INT || 3) * 1000,
      historySize: 30
    }, options);
    this.db = this.options.db;

    /** Connected tracking clients: { 'sseId': { req, res, onClose, lastPing, ready } } */
    this.clients = {};

    /** Tracked vehicles: { 'vId': { trackers: { 'sseId': <last gdt> }, pos: [recent positions, max historySize] } } */
    this.vehicles = {};

    // Broadcast a ping to every client so browsers keep the connection alive.
    // The timer ticks at half the ping interval; publishOne() skips redundant pings.
    if (this.options.pingInterval) {
      this.pingTimer = setInterval(
        () => this.publish('', 'p').catch((err) => debug(err)),
        this.options.pingInterval / 2
      );
    }
  }

  /**
   * Cache new GPS data for a vehicle and push it to the clients tracking it.
   * Only the FIRST vehicle key of `gData` is processed, and only the first
   * element's `gdt` is normalised to a Date (in place, on the caller's object).
   * Errors are logged, never thrown.
   * @param {*} gData Vehicle GPS data { 'vehId': [data] }
   * @notes Data element must contain at least: { lat: , lon: , gdt: }
   * TEST: var now=new Date(); setVD({ veh1: [{ gdt: now }]})
   */
  async setVehGpsData(gData) {
    if (utils.isEmpty(gData)) return;
    const vId = Object.getOwnPropertyNames(gData)[0];
    if (!vId) return;
    const points = gData[vId];
    if (!Array.isArray(points) || !points.length) return;

    try {
      points[0].gdt = new Date(points[0].gdt);

      const known = this.vehicles[vId];
      const tracked = !!known && !utils.isEmpty(known.trackers);

      // Untracked vehicles restart their cache from the new data; tracked ones append.
      // TODO: might need to sort by gdt ascending / check for continuous gdt.
      let pos = tracked ? [...(known.pos || []), ...points] : points;
      const limit = this.options.historySize;
      if (pos.length > limit) pos = pos.slice(pos.length - limit); // keep exactly `limit` newest

      if (!tracked) {
        this.vehicles[vId] = { trackers: {}, pos: pos };
        return; // nobody to notify
      }
      known.pos = pos;

      // TODO: when a client was not ready (e.g. just reconnected), include the cached
      // positions newer than trackers[sseId] instead of only the new data.
      const sends = Object.keys(known.trackers)
        .filter((sseId) => this.clients[sseId] && this.clients[sseId].ready) // skip until available
        .map(async (sseId) => {
          try {
            await this.publishOne(sseId, utils.clone(gData), 'd');
          } catch (err) {
            // Ignore sending errors, continue sending to the others
            debug(err);
          }
        });
      await Promise.all(sends);
    } catch (err) {
      debug(err);
    }
  }

  /**
   * HTTP handler: (re)define the list of vehicles a registered client tracks.
   * Expects `req.body` = { sId: <sseId>, vt: [{ v: <vehicle id>, gdt: <last known time> }] }.
   * Responds 409 when `sId` is missing or unknown.
   */
  async trackOn(req, res) {
    debug(req.url, new Date());
    try {
      const input = req.body || {};
      const client = input.sId ? this.clients[input.sId] : undefined;
      if (!client) {
        res.status(409).end();
        return;
      }

      client.ready = false; // Mark this client as not yet ready for realtime data
      this._removeVTracker(input.sId); // Clear previous tracking list then re-update

      const wanted = Array.isArray(input.vt) ? input.vt : [];
      for (const tv of wanted) {
        if (!this.vehicles[tv.v]) this.vehicles[tv.v] = { trackers: {}, pos: [] };
        // TODO: send the latest data of each vehicle to the client before going live, e.g.
        //   const recentData = await this._getRecentVehData(tv.v, tv.gdt);
        //   await this.publishOne(input.sId, recentData, 'd');
        //   this.vehicles[tv.v].trackers[input.sId] = utils.last(recentData[tv.v]).gdt;
        this.vehicles[tv.v].trackers[input.sId] = new Date(tv.gdt);
      }

      client.ready = true; // Mark the client ready to be updated with real data
      res.send({ 'trackOn': 'OK' }).end();
    } catch (err) {
      error.handleResErr(res, err);
    }
  }

  /**
   * Simulation only: pretend to load recent positions from the db to avoid a gap.
   * @param {string} vId vehicle id
   * @param {*} fromDT reference time; when falsy an empty object is returned
   * @returns {Promise<object>} { [vId]: [{ lat, lon, gdt }] } or {}
   */
  async _getRecentVehData(vId, fromDT) {
    const sample = 1;
    const stepMs = 10000;
    if (!fromDT) {
      // TODO: getLatestRecord
      return {};
    }
    return mockPromise(() => {
      const startMs = (new Date(fromDT)).getTime() - sample * stepMs;
      const gdata = [];
      for (let i = 0; i < sample; i++) {
        gdata.push({ lat: 33.44, lon: -79.33, gdt: new Date(startMs + i * stepMs) });
      }
      const out = {};
      out[vId] = gdata;
      return out;
    });
  }

  /**
   * HTTP handler: open an SSE stream for the caller and register it as a client.
   * Sends the generated id to the client in a 'login' event.
   * @returns {Promise<object>} the client record (undefined if it closed meanwhile)
   */
  async register(req, res) {
    const c = { req: req, res: res };
    req.socket.setTimeout(0);
    req.socket.setKeepAlive(true);
    res.writeHead(200, {
      'Connection': 'keep-alive',
      'Content-Type': 'text/event-stream',
      'Cache-Control': 'no-cache'
    });
    // (A 2kB ':' comment padding could be written here for old IE if ever needed.)
    res.write('retry: ' + this.options.clientRetryInterval + '\n\n');

    const sseId = uniqid();
    // Keep the handler on the record so _unregister() can detach this exact function.
    c.onClose = () => this._unregister(sseId);
    this.clients[sseId] = c;
    req.on('close', c.onClose);
    res.on('close', c.onClose);

    await this.publishOne(sseId, sseId, 'login');
    return this.clients[sseId];
  }

  /**
   * Drop a client: forget it, remove it from every vehicle's tracker list,
   * detach its close listeners and end its response. Safe to call repeatedly.
   */
  _unregister(sseId) {
    const c = this.clients[sseId];
    if (!c) return;

    // Forget the client first so a re-entrant call (e.g. from 'close') is a no-op.
    delete this.clients[sseId];
    this._removeVTracker(sseId);

    if (c.req && c.onClose) c.req.removeListener('close', c.onClose);
    if (c.res) {
      if (c.onClose) c.res.removeListener('close', c.onClose);
      c.res.end();
    }
  }

  /** Remove `sseId` from the tracker list of every vehicle. */
  _removeVTracker(sseId) {
    for (const vId of Object.keys(this.vehicles)) {
      const trackers = this.vehicles[vId].trackers;
      if (trackers && Object.prototype.hasOwnProperty.call(trackers, sseId)) {
        delete trackers[sseId];
      }
    }
  }

  /**
   * Send an event to one client, or to all of them when `toId` is '*' or falsy.
   * When broadcasting, a failure for one client is logged and does not stop the others.
   */
  async publish(data, eventName, toId = '*') {
    if (toId && toId !== '*') {
      await this.publishOne(toId, data, eventName);
      return;
    }
    await Promise.all(Object.keys(this.clients).map(async (sseId) => {
      try {
        await this.publishOne(sseId, data, eventName);
      } catch (err) {
        debug(err);
      }
    }));
  }

  /**
   * Write one SSE event to a single client and wait out back-pressure.
   * @param {string} toId client id; unknown ids are ignored
   * @param {*} data objects are JSON-encoded, other values are stringified
   * @param {string} [eventName] SSE event name; 'p' is the keep-alive ping
   */
  async publishOne(toId, data, eventName) {
    const client = toId ? this.clients[toId] : undefined;
    if (!client) return;

    const clientSocket = client.res;
    if (!clientSocket || clientSocket.finished || clientSocket.writableEnded || clientSocket.destroyed) {
      // Corrupted or already finished registration => remove
      this._unregister(toId);
      return;
    }

    if (eventName === 'p' && client.lastPing) {
      // Skip the ping if something was sent less than one ping interval ago.
      const now = new Date();
      const sinceLast = now - client.lastPing;
      if (sinceLast < this.options.pingInterval) {
        // Back-date lastPing by the skipped amount so the next half-interval
        // timer tick sees a full interval elapsed and does send the ping.
        client.lastPing = new Date((now.getTime() + sinceLast) - this.options.pingInterval);
        return;
      }
    }

    let payload;
    if (typeof data === 'object') payload = JSON.stringify(data);
    else payload = data === undefined ? '' : String(data);

    // SSE framing: every line of the payload becomes its own "data: " line.
    const body = payload
      ? payload.split(/\r\n|\r|\n/).map((line) => 'data: ' + line).join('\n')
      : 'data: ';
    const output = (eventName ? 'event: ' + eventName + '\n' : '') + body + '\n\n';

    if (!clientSocket.write(output)) {
      try {
        await waitForDrain(clientSocket); // Handle backpressure on write
      } catch (err) {
        debug(err);
      }
    }
    // Use the captured record: the client may have been unregistered while we waited.
    client.lastPing = new Date();
  }

  /**
   * Group connected client ids by IP ('x-real-ip' header, else the socket address).
   * @returns {object} { ip: [sseId, ...] }
   */
  listClients() {
    const rollupByIP = {};
    for (const sseId of Object.keys(this.clients)) {
      const cv = this.clients[sseId];
      if (!cv) continue;
      const ip = cv.req.headers['x-real-ip'] || cv.req.socket.remoteAddress;
      if (!(ip in rollupByIP)) rollupByIP[ip] = [];
      rollupByIP[ip].push(sseId);
    }
    return rollupByIP;
  }

  /** @returns {number} number of registered clients */
  getClientsCount() {
    return Object.keys(this.clients).length;
  }

  /**
   * Stop the ping timer and drop every client's tracker entries.
   * Client connections themselves are left open (see clear()).
   */
  close() {
    clearInterval(this.pingTimer);
    Object.keys(this.clients).forEach((sseId) => this._removeVTracker(sseId));
    debug('Channel closed gracefully !');
  }

  /**
   * Broadcast GPS data to ALL clients, one 'd' event per vehicle, bypassing the
   * tracker lists. With no argument a fixed sample data set is sent.
   * ** FOR TESTING ONLY **
   * @param {*} gData Vehicle GPS data { 'vehId': [data] }
   */
  async setGpsData(gData) {
    if (!gData) {
      // For dev testing only
      const point = (lat) => ({ lat: lat, lon: -79.33, stat: 0, gdt: new Date() });
      const four = () => [point(33.3), point(33.3), point(33.3), point(33.3)];
      gData = {};
      gData['veh1'] = four();
      gData['veh2'] = four();
      gData['veh3'] = four();
      gData['veh4'] = [point(33.33)];
      gData['veh5'] = [point(33.34)];
    }

    const all = Object.keys(gData).map((vId) => {
      const payload = {};
      payload[vId] = gData[vId];
      return this.publish(payload, 'd', '*');
    });
    await Promise.all(all).catch((err) => debug(err));
  }

  /** Forget all vehicles and disconnect every client. */
  clear() {
    this.vehicles = {};
    for (const sseId of Object.keys(this.clients)) {
      try {
        this._unregister(sseId); // also ends the response
      } catch (err) {
        debug(err);
      }
    }
  }

  /**
   * HTTP handler: list the vehicles (kind '9') under `req.body.byPuid` and the
   * active users of that account that have a cached location, each with its
   * latest location from `location_cache`.
   * @param {*} ops { byPuid: }
   */
  async getVehicles(req, res) {
    // TODO: add param validation
    try {
      const applId = ObjectId(req.body.byPuid);

      const vPipeline = [
        {
          $match: {
            markedDelete: { $ne: true },
            parent: applId,
            unitId: { $nin: [null, ''] },
            kind: '9',
          }
        },
        {
          $lookup: {
            from: 'location_cache',
            localField: 'unitId',
            foreignField: 'unitId',
            as: 'locCache'
          }
        },
        { $unwind: { path: '$locCache', 'preserveNullAndEmptyArrays': true } },
        {
          $project: {
            unitId: '$unitId',
            name: '$name',
            acType: { $ifNull: ['$vehicleType', 0] },
            kind: 1,
            model: '$model',
            color: { $ifNull: ['$color', 'yellow'] },
            last: {
              $cond: [
                { $not: ['$locCache'] },
                null,
                {
                  lat: '$locCache.lat',
                  lon: '$locCache.lon',
                  alt: '$locCache.alt',
                  inputs: '$locCache.inputs',
                  speed: '$locCache.speed',
                  head: '$locCache.head',
                  temp: '$locCache.temp',
                  humid: '$locCache.humid',
                  appRate: '$locCache.appRate',
                  windSpd: '$locCache.windSpd',
                  windHdg: '$locCache.windHdg',
                  gdt: '$locCache.gdt'
                }
              ]
            },
            tracking: 1,
            trackonDate: 1
          }
        }
      ];

      const uPipeline = [
        {
          $match: {
            $and: [
              {
                kind: { $ne: '9' },
                _id: { $ne: ObjectId(req.uid) },
                active: true,
                username: { $nin: [null, ''] }
              },
              { $or: [{ _id: applId }, { parent: applId }] }
            ]
          }
        },
        // location_cache keys users by the string form of their _id
        { $addFields: { 'userId': { '$toString': '$_id' } } },
        {
          $lookup: {
            let: { 'userId': '$userId' },
            from: 'location_cache',
            pipeline: [
              { $match: { $expr: { $eq: ['$unitId', '$$userId'] } } }
            ],
            as: 'locCache'
          }
        },
        { $unwind: { path: '$locCache', 'preserveNullAndEmptyArrays': true } },
        {
          $project: {
            unitId: '$userId',
            kind: 1,
            usernames: { $split: ['$username', '@'] },
            last: {
              $cond: [
                { $not: ['$locCache'] },
                null,
                { lat: '$locCache.lat', lon: '$locCache.lon', gdt: '$locCache.gdt' }
              ]
            }
          }
        },
        { $match: { last: { $ne: null } } },
        { $project: { unitId: 1, last: 1, kind: 1, name: { $arrayElemAt: ['$usernames', 0] } } }
      ];

      // The two queries are independent, so run them concurrently.
      const usersColl = this.db.collection('users');
      const [vehs, users] = await Promise.all([
        usersColl.aggregate(vPipeline).toArray(),
        usersColl.aggregate(uPipeline).toArray()
      ]);

      res.json(vehs.concat(users)).end();
    } catch (err) {
      error.handleResErr(res, err);
    }
  }

  /**
   * HTTP handler: summarise the stored daily tracks of a unit in a date range.
   * Expects `req.body` = { unitId, from, to }; responds with
   * [{ start, end, dist, total }] or [] when a parameter is missing/invalid.
   */
  async findTracks(req, res) {
    let result = [];
    try {
      const ops = req.body || {};
      const from = new Date(ops.from);
      const to = new Date(ops.to);
      const datesOk = ops.from != null && ops.to != null &&
        !Number.isNaN(from.getTime()) && !Number.isNaN(to.getTime());

      if (ops.unitId && datesOk) {
        // 'date' is matched as a 'YYYY-MM-DD' string (UTC day of the given instants)
        const fromDay = from.toISOString().slice(0, 10);
        const toDay = to.toISOString().slice(0, 10);
        const filter = {
          unitId: ops.unitId,
          date: fromDay === toDay ? fromDay : { $gte: fromDay, $lte: toDay }
        };

        result = await this.db.collection('locations').aggregate([
          { $match: filter },
          {
            $project: {
              dist: 1,
              total: 1,
              start: { $arrayElemAt: ['$locs', 0] },
              end: { $arrayElemAt: ['$locs', -1] }
            }
          },
          { $project: { start: '$start.gdt', end: '$end.gdt', dist: 1, total: 1 } }
        ]).toArray();
      }
      res.json(result).end();
    } catch (err) {
      error.handleResErr(res, err);
    }
  }

  /**
   * Extract location list into spray-off and spray-on segments.
   * A segment ends where `inputs` changes, where two consecutive points are
   * `utils.distance` >= 5000 apart (presumably meters - confirm against utils),
   * or at the last point. Segments with fewer than two points are dropped.
   * @param {*} data track location list
   * @returns [{ spray: , data: [location] }]
   */
  _getTracksSegments(data) {
    if (!data || !data.length) return [];

    const maxGap = 5 * 1e3;
    const segs = [];
    let start = 0;

    for (let cur = 1; cur < data.length; cur++) {
      const prev = data[cur - 1];
      const here = data[cur];
      const tooFar = utils.distance([prev.lat, prev.lon], [here.lat, here.lon]) >= maxGap;
      // Loose comparison kept on purpose: `inputs` may not have a consistent type.
      const boundary = tooFar || here.inputs != prev.inputs || cur === data.length - 1;
      if (!boundary) continue;

      // On a plain state change the boundary point closes this segment AND opens
      // the next one; on a jump it only opens the next one.
      const seg = data.slice(start, tooFar ? cur : cur + 1);
      if (seg.length > 1) segs.push({ spray: !!(seg[0].inputs), data: seg });
      start = cur;
    }
    return segs;
  }

  /**
   * HTTP handler: return the simplified track segments of a unit for given days.
   * Expects `req.body` = { unitId, dates: ['YYYY-MM-DD', ...] }; responds with
   * [{ date, end, dist, data: [{ spray, data: [[lat, lon], ...] }] }].
   */
  async getTracks(req, res) {
    const result = [];
    try {
      const ops = req.body;
      if (ops && ops.unitId && ops.dates && ops.dates.length) {
        const vlocs = await this.db.collection('locations')
          .find({ unitId: ops.unitId, date: { $in: ops.dates } })
          .toArray();

        for (const vloc of (vlocs || [])) {
          const hasLocs = !!(vloc.locs && vloc.locs.length);
          const segs = hasLocs ? this._getTracksSegments(vloc.locs) : [];

          for (const seg of segs) {
            if (seg.data.length > 2) {
              // simplify-path works on [x, y] pairs, hence the lon/lat swap and back
              const lonlats = seg.data.map((it) => [it.lon, it.lat]);
              seg.data = simplify(lonlats, 0.00001).map((it) => [it[1], it[0]]);
            } else {
              seg.data = seg.data.map((it) => [it.lat, it.lon]);
            }
          }

          result.push({
            date: vloc.date,
            end: hasLocs ? utils.last(vloc.locs).gdt : null, // a day without points has no end time
            dist: vloc.dist,
            data: segs
          });
        }
      }
      res.json(result).end();
    } catch (err) {
      error.handleResErr(res, err);
    }
  }

  /**
   * export Tracks on demand - API
   * Expects `req.body` = { unitId, from, [to] } where from/to are parseable date
   * strings whose first 10 characters are the 'YYYY-MM-DD' day. Responds with a
   * flat list of points { uid, lat, lng, alt, hdg, spd, gdt, spr }.
   */
  async exportTracks(req, res) {
    const ops = req.body;
    if (!ops || !ops.unitId || typeof ops.from !== 'string' || Number.isNaN(Date.parse(ops.from))) {
      return error.handleResErr(res, 'invalid_param');
    }

    try {
      const hasTo = typeof ops.to === 'string' && !Number.isNaN(Date.parse(ops.to));

      // Day-level filter on the 'locations' documents ...
      const lookupVars = { unitId: '$unitId', fromDate: ops.from.slice(0, 10) };
      const dayConds = [
        { $eq: ['$unitId', '$$unitId'] },
        { $gte: ['$date', '$$fromDate'] }
      ];
      // ... then an exact timestamp filter on the individual points.
      const pointConds = [{ $gt: ['$$loc.gdt', new Date(ops.from)] }];

      if (hasTo) {
        lookupVars.toDate = ops.to.slice(0, 10);
        dayConds.push({ $lte: ['$date', '$$toDate'] });
        pointConds.push({ $lte: ['$$loc.gdt', new Date(ops.to)] });
      }

      const pipeline = [
        { $match: { parent: ObjectId(req.uid), unitId: ops.unitId.trim() } },
        {
          $lookup: {
            from: 'locations',
            let: lookupVars,
            pipeline: [
              { $match: { $expr: { $and: dayConds } } },
            ],
            as: 'vlocs'
          }
        },
        { $unwind: { path: '$vlocs', 'preserveNullAndEmptyArrays': true } },
        {
          $project: {
            uid: '$unitId',
            locs: {
              $filter: {
                input: '$vlocs.locs',
                as: 'loc',
                cond: { $and: pointConds }
              }
            }
          }
        },
        { $unwind: '$locs' },
        {
          $project: {
            _id: 0,
            uid: '$uid',
            lat: '$locs.lat',
            lng: '$locs.lon',
            alt: '$locs.alt',
            hdg: '$locs.head',
            spd: '$locs.speed',
            gdt: '$locs.gdt',
            spr: '$locs.inputs'
          }
        }
      ];

      const result = await this.db.collection('users').aggregate(pipeline).toArray();
      res.json(result).end();
    } catch (err) {
      error.handleResErr(res, err);
    }
  }
}

module.exports = { TrackChannel };