agn-utils/device-tcp-socket/agnav-device.js

139 lines
4.0 KiB
JavaScript

'use strict';
const TCPDevice = require('./tcp-device');
const numberUtil = require('number-util');
// Read states of the AgNavTCPDevice parser (see AgNavTCPDevice#_onData).
// READ_0: _processRead_0 runs - reads PKG_SIZE / 2 bytes and searches them for START_MARK.
const READ_0 = 0;
// READ_1: _processRead_1 runs - a START_MARK was found; reads the rest and searches it for END_MARK.
const READ_1 = 1;
// READ_BLOCKED: set after push() returned false; _onData stops reading while in this state.
const READ_BLOCKED = 99;
// Two bytes 0xFB 0xFB that open a package.
const START_MARK = Buffer.from('FBFB', 'hex');
// Two bytes 0x0D 0x0A (CR LF) that close a package.
const END_MARK = Buffer.from('0D0A', 'hex');
// Maximum package length in bytes, START_MARK and END_MARK included
// (see the size check in AgNavTCPDevice#_processRead_1).
const PKG_SIZE = 40;
/**
 * Device that frames the byte stream of its TCP socket into AgNav packages.
 *
 * A package starts with START_MARK, ends with END_MARK and is at most PKG_SIZE
 * bytes long, both marks included. Parsing is a small state machine kept in
 * `_readState`:
 *   READ_0       - read PKG_SIZE / 2 bytes and look for START_MARK;
 *   READ_1       - read the remaining bytes, look for END_MARK and emit the message;
 *   READ_BLOCKED - push() reported backpressure; nothing is read in this state and
 *                  this class never leaves it on its own.
 *
 * Each emitted message is a Buffer of PKG_SIZE - START_MARK.length bytes: the
 * package without its start mark, END_MARK included, zero-padded at the end when
 * the package is shorter than PKG_SIZE.
 *
 * NOTE(review): `_socket`, `ops`, `push()`, `destroy()` and `_guardInvalidTries()`
 * are used but not defined here; presumably TCPDevice provides them - confirm there.
 */
class AgNavTCPDevice extends TCPDevice {
  /**
   * @param {object} socket - Forwarded unchanged to TCPDevice.
   * @param {object} ops - Forwarded unchanged to TCPDevice; `ops.maxInvalidPackages`
   *   is read by _processRead_1.
   */
  constructor(socket, ops) {
    super(socket, ops);
    // Zero-padded device id, taken from the first message that is parsed.
    this.id = null;
    // Number of messages successfully parsed so far.
    this.msgReceived = 0;
    // Initialised here so that `_invalidTry++` never starts from undefined (NaN),
    // which would make the limit check in _processRead_1 always false.
    this._invalidTry = 0;
    this._startPos = -1;
    this._endPos = -1;
    this._firstPart = null;
    this._secondPart = null;
    this._cmdBuf = null;
    this._readState = READ_0;
  }

  /**
   * _onData is triggered by the "readable" event on the underlying TCP socket.
   * It is called each time new data is received and runs the handler of the
   * current read state until a handler reports that no more progress is possible.
   * Any error thrown while parsing is logged and passed to destroy().
   */
  _onData() {
    try {
      // Loop while there is still data to process in the socket's buffer.
      // This stops when there is not enough data or when backpressure is hit.
      let readMore = true;
      do {
        switch (this._readState) {
          case READ_0:
            readMore = this._processRead_0();
            break;
          case READ_1:
            readMore = this._processRead_1();
            break;
          case READ_BLOCKED:
            readMore = false;
            break;
          default:
            throw new Error('Unknown read state');
        }
      } while (readMore);
    } catch (err) {
      console.log(err);
      // Terminate on failure: the data is corrupted, so further reads would only
      // produce more errors and we cannot recover.
      this.destroy(err);
    }
  }

  /**
   * READ_0 step: reads half a package and searches it for START_MARK.
   *
   * @returns {boolean} false when the socket could not supply the requested
   *   bytes, true when the caller should keep looping.
   */
  _processRead_0() {
    this._firstPart = this._socket.read(PKG_SIZE / 2);
    if (!this._firstPart) {
      return false;
    }

    this._startPos = this._firstPart.indexOf(START_MARK);
    if (this._startPos !== -1) {
      this._readState = READ_1;
      return true;
    }

    this._guardInvalidTries();

    // START_MARK may straddle two reads: if this chunk ends with the first mark
    // byte, hand that byte back to the socket so the next chunk starts with it.
    // Chunks of a single byte are not returned, otherwise the same byte would be
    // read and returned forever.
    const lastIndex = this._firstPart.length - 1;
    if (lastIndex > 0 && this._firstPart[lastIndex] === START_MARK[0]) {
      this._socket.unshift(this._firstPart.subarray(lastIndex));
    }

    this._readState = READ_0;
    return true;
  }

  /**
   * READ_1 step: reads the rest of the package, searches it for END_MARK and,
   * when found, pushes the assembled message to the consumer.
   *
   * @returns {boolean} false when the socket could not supply the requested
   *   bytes or when push() signalled backpressure, true otherwise.
   * @throws {Error} When more than `ops.maxInvalidPackages` oversized packages
   *   were counted.
   */
  _processRead_1() {
    // Read enough bytes to complete a full-size package that began at _startPos.
    this._secondPart = this._socket.read(this._startPos + (PKG_SIZE / 2));
    if (!this._secondPart) {
      return false;
    }

    this._endPos = this._secondPart.indexOf(END_MARK);
    if (this._endPos === -1) {
      this._guardInvalidTries();
      // No end mark: drop the first part and give the second part back to the
      // socket (the stream we read from) so it is scanned for the next start mark.
      this._socket.unshift(this._secondPart);
      this._readState = READ_0;
      return true;
    }

    const headLength = this._firstPart.length - this._startPos;
    const tailLength = this._endPos + END_MARK.length;
    if (headLength + tailLength > PKG_SIZE) {
      // Defensive check: with the read sizes used above the package cannot
      // exceed PKG_SIZE, so this branch is not expected to run.
      this._invalidTry++;
      if (this._invalidTry > this.ops.maxInvalidPackages) {
        throw new Error('Reached maximum invalid read try');
      }
      this._readState = READ_0;
      return true;
    }

    this._invalidTry = 0;
    this.msgReceived++;

    // Assemble the message: everything after START_MARK up to and including END_MARK.
    this._cmdBuf = Buffer.alloc(PKG_SIZE - START_MARK.length);
    this._firstPart.copy(this._cmdBuf, 0, this._startPos + START_MARK.length);
    this._secondPart.copy(this._cmdBuf, headLength - START_MARK.length, 0, tailLength);

    if (!this.id) {
      // The id is the 32-bit little-endian integer at offset 1 of the message.
      this.id = numberUtil.padZero(this._cmdBuf.readInt32LE(1), 10);
    }

    // Package shorter than PKG_SIZE: the bytes after END_MARK belong to the next
    // package, so return them to the socket's buffer for the next read.
    if (this._secondPart.length > tailLength) {
      this._socket.unshift(this._secondPart.subarray(tailLength));
    }

    // Push the message onto the read buffer for the consumer. A false return is
    // a backpressure signal from a slow consumer and is respected.
    if (this.push(this._cmdBuf)) {
      this._readState = READ_0;
      return true;
    }

    console.log('socket read is blocked');
    this._readState = READ_BLOCKED;
    return false;
  }
}
module.exports = AgNavTCPDevice;