149 lines
4.0 KiB
JavaScript
149 lines
4.0 KiB
JavaScript
const
|
|
debug = require('debug')('agm:job-queuer'),
|
|
amqp = require('amqplib/callback_api'),
|
|
utils = require('./utils'),
|
|
env = require('./env');
|
|
|
|
// Module-wide connection state, shared by the functions below.
// `amqpConn` stays null until a connection has been established.
let amqpConn = null;
// Handle of the reconnect timer created with setTimeout, when one is scheduled.
let connTimer;

// Queue targeted by publishLocTask().
const GDATA_QUEUE = 'gdata';
// Default jobs queue: the configured name in production, a fixed dev queue otherwise.
const JOB_QUEUE = env.PRODUCTION ? env.QUEUE_NAME_JOBS : 'dev_jobs';
|
|
|
|
/**
 * Holds the AMQP connection settings that `start()` hands to `amqp.connect`.
 *
 * Every setting is taken from `ops` when it is truthy, otherwise from the
 * matching `env.QUEUE_*` value, otherwise from the built-in default shown
 * below (where one exists).
 *
 * @param {Object} [ops] optional connection overrides
 * @param {string} [ops.protocol] defaults to 'amqp'
 * @param {string} [ops.hostname] defaults to env.QUEUE_HOST
 * @param {number} [ops.port] defaults to env.QUEUE_PORT, then 5672
 * @param {string} [ops.username] defaults to env.QUEUE_USR
 * @param {string} [ops.password] defaults to env.QUEUE_PWD
 * @param {string} [ops.vhost] defaults to env.QUEUE_VHOST, then '/'
 * @param {number} [ops.heartbeat] defaults to env.QUEUE_HEARTBEAT, then 0
 */
function JobQueuer(ops = {}) {
  // The default parameter only covers `undefined`; also tolerate an explicit null.
  const overrides = ops || {};

  this._conOps = {
    protocol: overrides.protocol || 'amqp',
    hostname: overrides.hostname || env.QUEUE_HOST,
    port: overrides.port || env.QUEUE_PORT || 5672,
    username: overrides.username || env.QUEUE_USR,
    password: overrides.password || env.QUEUE_PWD,
    vhost: overrides.vhost || env.QUEUE_VHOST || '/',
    // The key has to be spelled `heartbeat`; the previous `hearbeat` spelling
    // meant the configured value never reached the connection options.
    heartbeat: overrides.heartbeat || env.QUEUE_HEARTBEAT || 0
  };
}
|
|
|
|
// SIGINT handler registered by the most recent successful connect. It is kept
// so the next connect can replace it instead of stacking one more listener on
// `process` per reconnect.
let sigintHandler = null;

/**
 * RabbitMQ connection. If the connection is closed or fails to be established
 * at all, a new attempt is scheduled 5 seconds later.
 *
 * Calling `start()` while a connection is already open forces a reconnect:
 * the current connection is closed and a fresh one is dialed immediately.
 *
 * Connection options are read from `JobQueuer.instance._conOps` rather than
 * `this`, because the retry timer re-invokes `start` without a receiver.
 */
JobQueuer.prototype.start = function start() {
  // This call supersedes any reconnect that is still waiting on its timer.
  if (connTimer) {
    clearTimeout(connTimer);
    connTimer = null;
  }

  if (amqpConn) {
    const stale = amqpConn;
    closeOnErr('force_close_reconnect');
    // Detach the old connection so that its (asynchronous) "close" event is
    // recognised as superseded and does not schedule another reconnect that
    // would tear down the connection opened below.
    if (amqpConn === stale) amqpConn = null;
  }

  amqp.connect(JobQueuer.instance._conOps, function (err, conn) {
    if (err) {
      console.error("[AMQP]", err.message);
      connTimer = setTimeout(start, 5000);
      return;
    }

    conn.on("error", function (err) {
      if (err.message !== "Connection closing") {
        console.error("[AMQP] conn error", err.message);
      }
    });

    conn.on("close", function () {
      // Only the connection currently in use may trigger a reconnect.
      if (amqpConn !== conn) return;
      // Drop the dead handle so nothing tries to close or reuse it.
      amqpConn = null;
      console.error("[AMQP] reconnecting");
      connTimer = setTimeout(start, 5000);
    });

    amqpConn = conn;

    // Keep exactly one SIGINT listener alive across reconnects.
    // NOTE(review): closing on SIGINT goes through the "close" handler above,
    // so a reconnect is scheduled afterwards — confirm that is intended.
    if (sigintHandler) process.removeListener('SIGINT', sigintHandler);
    sigintHandler = () => {
      closeOnErr('force_close_SIGINT');
    };
    process.once('SIGINT', sigintHandler);

    whenConnected();
  });
};
|
|
|
|
/**
 * Post-connect hook: notes the successful connection in the debug log and
 * then brings up the publishing channel.
 */
function whenConnected() {
  debug("[AMQP] Job Queuer connected");
  startPublisher();
}
|
|
|
|
/**
 * RabbitMQ publisher section
 */
// Confirm channel used for publishing; null until startPublisher() succeeds.
let pubChannel = null;
// Messages whose publish failed, stored as [exchange, routingKey, content]
// and replayed the next time a channel is opened.
let offlinePubQueue = [];

/**
 * Opens a confirm channel on the current connection, stores it in
 * `pubChannel` and replays everything waiting in `offlinePubQueue`.
 *
 * If the channel cannot be created, the error is handed to `closeOnErr`
 * and nothing else happens.
 */
function startPublisher() {
  amqpConn.createConfirmChannel(function (err, ch) {
    if (closeOnErr(err)) return;

    ch.on("error", function (err) {
      console.error("[AMQP] channel error", err.message);
    });
    ch.on("close", function () {
      console.error("[AMQP] channel closed");
    });

    pubChannel = ch;

    // Replay the pending offline items. Work on a snapshot so that entries
    // re-queued by a failed replay wait for the next channel instead of
    // being retried within this same pass.
    const pending = offlinePubQueue.splice(0);
    for (const [exchange, routingKey, content] of pending) {
      // Same routing-key default and delivery options as a live publish():
      // a falsy routing key means the jobs queue, and messages are persistent.
      ch.publish(exchange, routingKey || JOB_QUEUE, content, { persistent: true }, function (err) {
        if (!err) return;
        debug(err);
        offlinePubQueue.push([exchange, routingKey, content]);
        closeOnErr(err);
      });
    }
  });
}
|
|
|
|
/**
 * Enqueue a new task to the background task queue.
 *
 * The message is published as persistent on the confirm channel. If the
 * broker rejects it, the message is parked in `offlinePubQueue` and the
 * error is passed to `closeOnErr`.
 *
 * @param {*} exchange the exchange name; '' is the default direct exchange
 * @param {*} routingKey the queue name; when falsy the jobs queue (JOB_QUEUE) is used
 * @param {*} content the item binary content. i.e.: Buffer.from(JSON.stringify(importTask))
 * @param {Function} [cb] invoked exactly once: without arguments on success,
 *   or with the error when the message could not be published (including
 *   the case where no publish channel is available yet)
 */
JobQueuer.prototype.publish = function publish(exchange, routingKey, content, cb) {
  if (!pubChannel) {
    debug("Publish channel for the Job Queue is not valid !");
    // Tell the caller instead of leaving the callback pending forever.
    if (cb) cb(new Error("Publish channel for the Job Queue is not valid !"));
    return;
  }

  const queueName = routingKey || JOB_QUEUE;

  pubChannel.publish(exchange, queueName, content, { persistent: true }, function (err) {
    if (err) {
      debug(err);
      // Park the message under the resolved queue name so a later replay
      // targets the same queue this attempt did.
      offlinePubQueue.push([exchange, queueName, content]);
      closeOnErr(err);
      if (cb) cb(err);
      // Stop here: falling through would call `cb` a second time as a success.
      return;
    }
    if (cb) cb();
  });
};
|
|
|
|
/**
 * Publishes `content` to the jobs queue (JOB_QUEUE) through the default
 * exchange ('').
 *
 * The function expression carries the method's own name (it used to be
 * called `publish`) so stack traces and `.name` identify the right method.
 *
 * @param {*} content the item binary content, passed through to publish()
 * @param {Function} [cb] forwarded unchanged to publish()
 */
JobQueuer.prototype.publishJobTask = function publishJobTask(content, cb) {
  this.publish('', JOB_QUEUE, content, cb);
};
|
|
|
|
/**
 * Publishes `content` to the GDATA_QUEUE queue through the default
 * exchange ('').
 *
 * The function expression carries the method's own name (it used to be
 * called `publish`) so stack traces and `.name` identify the right method.
 *
 * @param {*} content the item binary content, passed through to publish()
 * @param {Function} [cb] forwarded unchanged to publish()
 */
JobQueuer.prototype.publishLocTask = function publishLocTask(content, cb) {
  this.publish('', GDATA_QUEUE, content, cb);
};
|
|
|
|
/**
 * Cancels any pending reconnect timer and, when `err` is truthy, logs it and
 * closes the current connection.
 *
 * @param {*} err the error (or reason string) to act on; falsy means "no error"
 * @return {boolean} true when `err` was truthy (callers use this to bail
 *   out), false otherwise
 */
function closeOnErr(err) {
  // The timer is cancelled even when there is no error.
  if (connTimer) {
    clearTimeout(connTimer);
    // Forget the handle so a cancelled timer is not cleared again later.
    connTimer = null;
  }
  if (!err) return false;

  console.error("[AMQP] error", err);

  if (amqpConn) {
    try {
      amqpConn.close();
    } catch (closeErr) {
      // Closing is best effort (presumably close() throws when the connection
      // is already closed — TODO confirm), but leave a trace instead of
      // swallowing the failure silently.
      debug("[AMQP] ignoring close failure", closeErr);
    }
  }
  return true;
}
|
|
|
|
/* ************************************************************************
|
|
SINGLETON CLASS DEFINITION
|
|
************************************************************************ */
|
|
JobQueuer.instance = null;
|
|
|
|
/**
|
|
* Singleton getInstance definition
|
|
* @return singleton class
|
|
*/
|
|
module.exports = {
|
|
getInstance(ops) {
|
|
// TODO: Add connection options to set connection params from the caller; refactor to ES6 class (optional)
|
|
if (JobQueuer.instance === null) {
|
|
debug("Created a new JobQueuer");
|
|
JobQueuer.instance = new JobQueuer(ops);
|
|
}
|
|
return JobQueuer.instance;
|
|
}
|
|
} |