const debug = require('debug')('agm:job-queuer'), amqp = require('amqplib/callback_api'), util = require('util'), utils = require('./utils'), env = require('./env'); let amqpConn = null; let connTimer; const JOB_QUEUE = env.PRODUCTION ? env.QUEUE_NAME_JOBS : 'dev_jobs' const GDATA_QUEUE = 'gdata'; function JobQueuer(ops = {}) { this._conOps = { protocol: ops.protocol || 'amqp', hostname: ops.hostname || env.QUEUE_HOST, port: ops.port || env.QUEUE_PORT || 5672, username: ops.username || env.QUEUE_USR, password: ops.password || env.QUEUE_PWD, vhost: ops.vhost || env.QUEUE_VHOST || '/', hearbeat: ops.heartbeat || env.QUEUE_HEARTBEAT || 0 }; } // Rabbitmq connection. If the connection is closed or fails to be established at all, we will reconnect JobQueuer.prototype.start = function start() { if (amqpConn) closeOnErr('force_close_reconnect'); amqp.connect(JobQueuer.instance._conOps, function (err, conn) { if (err) { console.error("[AMQP]", err.message); connTimer = setTimeout(start, 5000); return; } conn.on("error", function (err) { if (err.message !== "Connection closing") { console.error("[AMQP] conn error", err.message); } }); conn.on("close", function () { console.error("[AMQP] reconnecting"); connTimer = setTimeout(start, 5000); return; }); amqpConn = conn; process.once('SIGINT', () => { closeOnErr('force_close_SIGINT'); }); whenConnected(); }); } function whenConnected() { debug("[AMQP] Job Queuer connected"); startPublisher(); } /** * Rabbitmq message queue testing section */ let pubChannel = null; let offlinePubQueue = []; function startPublisher() { amqpConn.createConfirmChannel(function (err, ch) { if (closeOnErr(err)) return; ch.on("error", function (err) { console.error("[AMQP] channel error", err.message); }); ch.on("close", function () { console.error("[AMQP] channel closed"); }); pubChannel = ch; // Start processing for the pending offline queue items left on the queue if any let m; while ((m = offlinePubQueue.shift())) { if (!m) break; pubChannel.publish(m[0], m[1], m[2]); } }); } /** * Enqueue a new task to the background task queue * @param {*} exchange then exchange name, default direct exchange to be '' * @param {*} routingKey the queue name, '', default will be the jobs queue * @param {*} content the item binary content. i.e.: Buffer.from(JSON.stringify(importTask)) */ JobQueuer.prototype.publish = function publish(exchange, routingKey, content, cb) { if (!pubChannel) { debug("Publish channel for the Job Queue is not valid !"); return; } pubChannel.publish(exchange, !routingKey ? JOB_QUEUE : routingKey, content, { persistent: true }, function (err) { if (err) { debug(err); offlinePubQueue.push([exchange, routingKey, content]); closeOnErr(err); cb && cb(err); } cb && cb(); // debug(' [.] Published %s', content.toString()); }); } JobQueuer.prototype.publishJobTask = function publish(content, cb) { this.publish('', JOB_QUEUE, content, cb); } JobQueuer.prototype.publishLocTask = function publish(content, cb) { this.publish('', GDATA_QUEUE, content, cb); } JobQueuer.prototype.publishJobTaskASync = util.promisify(JobQueuer.prototype.publishJobTask); function closeOnErr(err) { if (connTimer) clearTimeout(connTimer); if (!err) return false; console.error("[AMQP] error", err); if (amqpConn) { try { amqpConn.close(); } catch (ignore) { } } return true; } /* ************************************************************************ SINGLETON CLASS DEFINITION ************************************************************************ */ JobQueuer.instance = null; /** * Singleton getInstance definition * @return singleton class */ module.exports = { getInstance(ops) { // TODO: Add connection options to set connection params from the caller; refactor to ES6 class (optional) if (JobQueuer.instance === null) { debug("Created a new JobQueuer"); JobQueuer.instance = new JobQueuer(ops); } return JobQueuer.instance; } }