agmission/Development/server/helpers/job_queue.js

153 lines
4.2 KiB
JavaScript

const
debug = require('debug')('agm:job-queuer'),
amqp = require('amqplib/callback_api'),
util = require('util'),
utils = require('./utils'),
env = require('./env');
let amqpConn = null;
let connTimer;
const JOB_QUEUE = env.PRODUCTION ? env.QUEUE_NAME_JOBS : 'dev_jobs'
const GDATA_QUEUE = 'gdata';
function JobQueuer(ops = {}) {
this._conOps = {
protocol: ops.protocol || 'amqp',
hostname: ops.hostname || env.QUEUE_HOST,
port: ops.port || env.QUEUE_PORT || 5672,
username: ops.username || env.QUEUE_USR,
password: ops.password || env.QUEUE_PWD,
vhost: ops.vhost || env.QUEUE_VHOST || '/',
hearbeat: ops.heartbeat || env.QUEUE_HEARTBEAT || 0
};
}
// Rabbitmq connection. If the connection is closed or fails to be established at all, we will reconnect
JobQueuer.prototype.start = function start() {
if (amqpConn)
closeOnErr('force_close_reconnect');
amqp.connect(JobQueuer.instance._conOps, function (err, conn) {
if (err) {
console.error("[AMQP]", err.message);
connTimer = setTimeout(start, 5000);
return;
}
conn.on("error", function (err) {
if (err.message !== "Connection closing") {
console.error("[AMQP] conn error", err.message);
}
});
conn.on("close", function () {
console.error("[AMQP] reconnecting");
connTimer = setTimeout(start, 5000);
return;
});
amqpConn = conn;
process.once('SIGINT', () => {
closeOnErr('force_close_SIGINT');
});
whenConnected();
});
}
function whenConnected() {
debug("[AMQP] Job Queuer connected");
startPublisher();
}
/**
* Rabbitmq message queue testing section
*/
let pubChannel = null;
let offlinePubQueue = [];
function startPublisher() {
amqpConn.createConfirmChannel(function (err, ch) {
if (closeOnErr(err)) return;
ch.on("error", function (err) {
console.error("[AMQP] channel error", err.message);
});
ch.on("close", function () {
console.error("[AMQP] channel closed");
});
pubChannel = ch;
// Start processing for the pending offline queue items left on the queue if any
let m;
while ((m = offlinePubQueue.shift())) {
if (!m) break;
pubChannel.publish(m[0], m[1], m[2]);
}
});
}
/**
* Enqueue a new task to the background task queue
* @param {*} exchange then exchange name, default direct exchange to be ''
* @param {*} routingKey the queue name, '', default will be the jobs queue
* @param {*} content the item binary content. i.e.: Buffer.from(JSON.stringify(importTask))
*/
JobQueuer.prototype.publish = function publish(exchange, routingKey, content, cb) {
if (!pubChannel) {
debug("Publish channel for the Job Queue is not valid !");
return;
}
pubChannel.publish(exchange, !routingKey ? JOB_QUEUE : routingKey, content, { persistent: true }, function (err) {
if (err) {
debug(err);
offlinePubQueue.push([exchange, routingKey, content]);
closeOnErr(err);
cb && cb(err);
}
cb && cb();
// debug(' [.] Published %s', content.toString());
});
}
JobQueuer.prototype.publishJobTask = function publish(content, cb) {
this.publish('', JOB_QUEUE, content, cb);
}
JobQueuer.prototype.publishLocTask = function publish(content, cb) {
this.publish('', GDATA_QUEUE, content, cb);
}
JobQueuer.prototype.publishJobTaskASync = util.promisify(JobQueuer.prototype.publishJobTask);
function closeOnErr(err) {
if (connTimer) clearTimeout(connTimer);
if (!err) return false;
console.error("[AMQP] error", err);
if (amqpConn) {
try {
amqpConn.close();
} catch (ignore) { }
}
return true;
}
/* ************************************************************************
SINGLETON CLASS DEFINITION
************************************************************************ */
JobQueuer.instance = null;
/**
* Singleton getInstance definition
* @return singleton class
*/
module.exports = {
getInstance(ops) {
// TODO: Add connection options to set connection params from the caller; refactor to ES6 class (optional)
if (JobQueuer.instance === null) {
debug("Created a new JobQueuer");
JobQueuer.instance = new JobQueuer(ops);
}
return JobQueuer.instance;
}
}