'use strict';

const TCPDevice = require('./tcp-device');
const numberUtil = require('number-util');

// Read states of the framing state machine driven by _onData().
const READ_0 = 0; // Looking for the start mark in the first half of a package.
const READ_1 = 1; // Start mark found; waiting for the remainder of the package.
// push() reported backpressure; _onData() stops reading while in this state.
// NOTE(review): nothing in this file leaves READ_BLOCKED — presumably the base
// class resets the state when the consumer asks for more data; confirm.
const READ_BLOCKED = 99;

// Byte sequences delimiting one package on the wire.
const START_MARK = Buffer.from('FBFB', 'hex');
const END_MARK = Buffer.from('0D0A', 'hex');
// Maximum size of one package in bytes, start and end marks included.
const PKG_SIZE = 40;

/**
 * TCP device that frames the raw socket byte stream into packages delimited
 * by START_MARK / END_MARK and pushes each framed package to its consumer.
 *
 * Relies on members that are not defined in this file and are assumed to be
 * provided by TCPDevice: `_socket`, `ops`, `push()`, `destroy()` and
 * `_guardInvalidTries()`.
 */
class AgNavTCPDevice extends TCPDevice {
  /**
   * @param {object} socket - Forwarded unchanged to the TCPDevice constructor.
   * @param {object} ops - Forwarded unchanged to the TCPDevice constructor.
   *   `this.ops.maxInvalidPackages` is read by this class as the number of
   *   oversized packages tolerated before the read loop throws.
   */
  constructor(socket, ops) {
    super(socket, ops);
    // Set from the first successfully framed package (see _processRead_1).
    this.id = null;
    // Number of packages framed so far.
    this.msgReceived = 0;
    this._startPos = -1;
    this._endPos = -1;
    this._firstPart = null;
    this._secondPart = null;
    this._cmdBuf = null;
    this._readState = READ_0;
    // Make sure the invalid-package counter is numeric: `undefined++` yields
    // NaN, and `NaN > max` is always false, which would silently disable the
    // maxInvalidPackages guard. A value already set by the base class is kept.
    this._invalidTry = this._invalidTry || 0;
  }

  /**
   * _onData is triggered by the "readable" event on the underlying TCP socket
   * (the wiring lives outside this file). It is called each time new data is
   * received and runs the read state machine until no further progress can
   * be made.
   *
   * Any error raised while reading is logged and then passed to destroy().
   */
  _onData() {
    try {
      // Loop while there is still data to process in the socket's buffer.
      // The loop stops when there is not enough data for the next step or
      // when the consumer signals backpressure.
      let readMore = true;
      do {
        switch (this._readState) {
          case READ_0:
            readMore = this._processRead_0();
            break;
          case READ_1:
            readMore = this._processRead_1();
            break;
          case READ_BLOCKED:
            readMore = false;
            break;
          default:
            throw new Error('Unknown read state');
        }
      } while (readMore);
    } catch (err) {
      console.log(err);
      // Terminate on failure: the data is corrupted, so we cannot recover and
      // would not be able to read any more data without additional errors.
      this.destroy(err);
    }
  }

  /**
   * READ_0 step: read half a package from the socket and look for the start
   * mark in it.
   *
   * NOTE(review): a chunk without a start mark is discarded, so a start mark
   * split across two consecutive chunks is not detected.
   *
   * @returns {boolean} false when the socket could not supply the requested
   *   bytes, true when the caller should keep looping.
   */
  _processRead_0() {
    this._firstPart = this._socket.read(PKG_SIZE / 2);
    if (!this._firstPart) {
      return false;
    }

    this._startPos = this._firstPart.indexOf(START_MARK);
    if (this._startPos === -1) {
      this._guardInvalidTries();
      this._readState = READ_0;
    } else {
      this._readState = READ_1;
    }
    return true;
  }

  /**
   * READ_1 step: read the remainder of the package, locate the end mark,
   * assemble the package (without its start mark) and push it to the
   * consumer.
   *
   * @returns {boolean} false when the socket could not supply the requested
   *   bytes or the consumer signalled backpressure, true when the caller
   *   should keep looping.
   * @throws {Error} when more than `ops.maxInvalidPackages` oversized
   *   packages have been seen since the last valid one.
   */
  _processRead_1() {
    // The package starts _startPos bytes into the first part, so that many
    // bytes are needed on top of the second half to cover PKG_SIZE bytes.
    this._secondPart = this._socket.read(this._startPos + (PKG_SIZE / 2));
    if (!this._secondPart) {
      return false;
    }

    this._endPos = this._secondPart.indexOf(END_MARK);
    if (this._endPos === -1) {
      this._guardInvalidTries();
      // No end mark: drop the first part and hand the second part back to the
      // socket so it is rescanned for the next start mark. It must go back to
      // the socket — unshifting onto this device would deliver the raw bytes
      // to the consumer as if they were a framed message.
      this._socket.unshift(this._secondPart);
      this._readState = READ_0;
      return true;
    }

    const headLength = this._firstPart.length - this._startPos;
    const tailLength = this._endPos + END_MARK.length;
    if (headLength + tailLength > PKG_SIZE) {
      // Marks were found but the span between them exceeds one package.
      this._invalidTry++;
      if (this._invalidTry > this.ops.maxInvalidPackages) {
        throw new Error('Reached maximum invalid read try');
      }
      this._readState = READ_0;
      return true;
    }

    this._invalidTry = 0;
    this.msgReceived++;

    // Assemble the package without its start mark. The buffer has a fixed
    // size and is zero-filled, so a short package is padded with zero bytes.
    this._cmdBuf = Buffer.alloc(PKG_SIZE - START_MARK.length);
    this._firstPart.copy(this._cmdBuf, 0, this._startPos + START_MARK.length);
    this._secondPart.copy(this._cmdBuf, headLength - START_MARK.length, 0, tailLength);

    if (!this.id) {
      // The id is taken from the 32-bit little-endian integer at offset 1 of
      // the first package and formatted with numberUtil.padZero(value, 10).
      this.id = numberUtil.padZero(this._cmdBuf.readInt32LE(1), 10);
    }

    // Package shorter than PKG_SIZE: the bytes after the end mark belong to
    // the next package, so return them to the socket for the next read.
    if (this._secondPart.length > tailLength) {
      this._socket.unshift(this._secondPart.slice(tailLength));
    }

    // Push the message onto the read buffer for the consumer to read. We are
    // mindful of slow consumers and respect backpressure signals.
    const pushOk = this.push(this._cmdBuf);
    if (pushOk) {
      this._readState = READ_0;
      return true;
    }

    console.log('socket read is blocked');
    this._readState = READ_BLOCKED;
    return false;
  }
}

module.exports = AgNavTCPDevice;