const debug = require('debug')('agm:job-queuer'), amqp = require('amqplib/callback_api'), util = require('util'), utils = require('./utils'), env = require('./env'); let amqpConn = null; let connTimer; const JOB_QUEUE = env.PRODUCTION ? env.QUEUE_NAME_JOBS : 'dev_jobs' const GDATA_QUEUE = 'gdata'; const PARTNER_QUEUE = env.QUEUE_NAME_PARTNER; function JobQueuer(ops = {}) { this._conOps = { protocol: ops.protocol || 'amqp', hostname: ops.hostname || env.QUEUE_HOST, port: ops.port || env.QUEUE_PORT || 5672, username: ops.username || env.QUEUE_USR, password: ops.password || env.QUEUE_PWD, vhost: ops.vhost || env.QUEUE_VHOST || '/', hearbeat: ops.heartbeat || env.QUEUE_HEARTBEAT || 0 }; } // Rabbitmq connection. If the connection is closed or fails to be established at all, we will reconnect JobQueuer.prototype.start = function start() { if (amqpConn) closeOnErr('force_close_reconnect'); amqp.connect(JobQueuer.instance._conOps, function (err, conn) { if (err) { console.error("[AMQP]", err.message); connTimer = setTimeout(start, 5000); return; } conn.on("error", function (err) { if (err.message !== "Connection closing") { console.error("[AMQP] conn error", err.message); } }); conn.on("close", function () { console.error("[AMQP] reconnecting"); connTimer = setTimeout(start, 5000); return; }); amqpConn = conn; process.once('SIGINT', () => { closeOnErr('force_close_SIGINT'); }); whenConnected(); }); } function whenConnected() { debug("[AMQP] Job Queuer connected"); startPublisher(); } /** * Rabbitmq message queue testing section */ let pubChannel = null; let offlinePubQueue = []; function startPublisher() { amqpConn.createConfirmChannel(function (err, ch) { if (closeOnErr(err)) return; ch.on("error", function (err) { console.error("[AMQP] channel error", err.message); pubChannel = null; // Reset channel on error }); ch.on("close", function () { console.error("[AMQP] channel closed"); pubChannel = null; // Reset channel on close }); pubChannel = ch; debug("[AMQP] Publisher channel created"); // Start processing for the pending offline queue items left on the queue if any let processedCount = 0; let m; while ((m = offlinePubQueue.shift())) { if (!m) break; // Handle both old format (3 items) and new format (4 items with callback) if (m.length === 4) { // New format with callback pubChannel.publish(m[0], m[1], m[2], { persistent: true }, m[3]); } else { // Old format without callback pubChannel.publish(m[0], m[1], m[2], { persistent: true }); } processedCount++; } if (processedCount > 0) { debug(`[AMQP] Processed ${processedCount} offline queued messages`); } }); } /** * Enqueue a new task to the background task queue * @param {*} exchange then exchange name, default direct exchange to be '' * @param {*} routingKey the queue name, '', default will be the jobs queue * @param {*} content the item binary content. i.e.: Buffer.from(JSON.stringify(importTask)) */ JobQueuer.prototype.publish = function publish(exchange, routingKey, content, cb) { if (!pubChannel) { const error = new Error("Publish channel for the Job Queue is not available - connection may not be established yet"); debug("Publish channel for the Job Queue is not valid !"); // Add to offline queue for later processing offlinePubQueue.push([exchange, routingKey, content, cb]); // Call callback with error if provided if (cb) { return cb(error); } return; } pubChannel.publish(exchange, !routingKey ? JOB_QUEUE : routingKey, content, { persistent: true }, function (err) { if (err) { debug('Publish error:', err); offlinePubQueue.push([exchange, routingKey, content, cb]); closeOnErr(err); } if (cb) cb(err); // debug(' [.] Published %s', content.toString()); }); } JobQueuer.prototype.publishJobTask = function publish(content, cb) { this.publish('', JOB_QUEUE, content, cb); } JobQueuer.prototype.publishLocTask = function publish(content, cb) { this.publish('', GDATA_QUEUE, content, cb); } // Add partner-specific task publishing JobQueuer.prototype.publishPartnerTask = function publish(content, cb) { // Use dedicated partner queue for partner tasks const partnerContent = Buffer.isBuffer(content) ? content : Buffer.from(content); this.publish('', PARTNER_QUEUE, partnerContent, cb); } // Add task helper method for easier usage JobQueuer.prototype.addTask = function addTask(taskType, taskData, cb) { try { const task = { type: taskType, data: taskData, timestamp: new Date().toISOString() }; // Convert task to string, then to Buffer const taskString = JSON.stringify(task); this.publishPartnerTask(taskString, cb); } catch (error) { debug('Error in addTask:', error); if (cb) cb(error); } } JobQueuer.prototype.publishJobTaskASync = util.promisify(JobQueuer.prototype.publishJobTask); JobQueuer.prototype.publishPartnerTaskASync = util.promisify(JobQueuer.prototype.publishPartnerTask); JobQueuer.prototype.addTaskASync = util.promisify(JobQueuer.prototype.addTask); function closeOnErr(err) { if (connTimer) clearTimeout(connTimer); if (!err) return false; console.error("[AMQP] error", err); if (amqpConn) { try { amqpConn.close(); } catch (ignore) { } } return true; } /* ************************************************************************ SINGLETON CLASS DEFINITION ************************************************************************ */ JobQueuer.instance = null; /** * Singleton getInstance definition * @return singleton class */ module.exports = { getInstance(ops) { // TODO: Add connection options to set connection params from the caller; refactor to ES6 class (optional) if (JobQueuer.instance === null) { debug("Created a new JobQueuer"); JobQueuer.instance = new JobQueuer(ops); } return JobQueuer.instance; } }