#!/usr/bin/env node /** * Worker Process Manager * Starts all worker processes for the AgMission system */ const { spawn } = require('child_process'); const path = require('path'); const debug = require('debug')('agm:worker-manager'); // Load environment variables from .env file const envPath = path.join(__dirname, 'environment.env'); require('dotenv').config({ path: envPath }); // Ensure critical environment variables are set const requiredEnvVars = [ 'DB_HOSTS', 'DB_NAME', 'DB_USR', 'DB_PWD', 'QUEUE_HOST', 'QUEUE_PORT', 'QUEUE_USR', 'QUEUE_PWD' ]; const missingVars = requiredEnvVars.filter(varName => !process.env[varName]); if (missingVars.length > 0) { console.error('Missing required environment variables:', missingVars); console.error('Please check environment.env file and ensure all required variables are set'); process.exit(1); } // Worker configurations const WORKERS = [ // { // name: 'job_worker', // script: './workers/job_worker.js', // description: 'Processes internal job tasks including partner log processing' // }, { name: 'partner_sync_worker', script: './workers/partner_sync_worker.js', description: 'Handles partner system synchronization' }, { name: 'partner_polling_worker', script: './workers/partner_data_polling_worker.js', description: 'Polls partner systems for new data' }, { name: 'dlq_alert_worker', script: './workers/dlq_alert_worker.js', description: 'Monitors DLQs and sends email alerts' }, // { // name: 'cleanup_worker', // script: './workers/cleanup_worker.js', // description: 'Handles cleanup tasks' // }, // { // name: 'obstacle_worker', // script: './workers/obstacle_worker.js', // description: 'Processes obstacle detection tasks' // }, // { // name: 'invoice_worker', // script: './workers/invoice_worker.js', // description: 'Handles invoice processing' // } ]; // Storage for worker processes const workerProcesses = new Map(); // Signal handling process.on('SIGINT', () => { debug('Received SIGINT, shutting down workers...'); shutdownWorkers(); }); process.on('SIGTERM', () => { debug('Received SIGTERM, shutting down workers...'); shutdownWorkers(); }); // Start a worker process function startWorker(worker) { debug(`Starting ${worker.name}: ${worker.description}`); const child = spawn('node', [worker.script], { cwd: __dirname, stdio: ['pipe', 'pipe', 'pipe'], env: { ...process.env } }); // Handle stdout child.stdout.on('data', (data) => { process.stdout.write(`[${worker.name}] ${data}`); }); // Handle stderr child.stderr.on('data', (data) => { process.stderr.write(`[${worker.name}] ${data}`); }); // Handle process exit child.on('exit', (code, signal) => { debug(`${worker.name} exited with code ${code}, signal ${signal}`); workerProcesses.delete(worker.name); // Restart worker after delay if not intentionally stopped if (code !== 0 && !shutdownInProgress) { debug(`Restarting ${worker.name} in 5 seconds...`); setTimeout(() => { if (!shutdownInProgress) { startWorker(worker); } }, 5000); } }); // Handle process error child.on('error', (err) => { debug(`${worker.name} error:`, err); }); workerProcesses.set(worker.name, { process: child, config: worker }); debug(`${worker.name} started with PID ${child.pid}`); } // Shutdown all workers let shutdownInProgress = false; function shutdownWorkers() { if (shutdownInProgress) return; shutdownInProgress = true; debug('Shutting down all workers...'); const shutdownPromises = []; for (const [name, worker] of workerProcesses) { shutdownPromises.push(new Promise((resolve) => { const proc = worker.process; debug(`Stopping ${name}...`); // Set a timeout for forceful termination const forceTimeout = setTimeout(() => { debug(`Force killing ${name}...`); proc.kill('SIGKILL'); resolve(); }, 10000); // Handle graceful exit proc.on('exit', () => { clearTimeout(forceTimeout); debug(`${name} stopped`); resolve(); }); // Send SIGTERM for graceful shutdown proc.kill('SIGTERM'); })); } Promise.all(shutdownPromises).then(() => { debug('All workers stopped'); process.exit(0); }); } // Start all workers function startAllWorkers() { debug('Starting AgMission worker processes...'); WORKERS.forEach(worker => { startWorker(worker); }); debug(`Started ${WORKERS.length} worker processes`); } // Display status function displayStatus() { console.log('\n=== AgMission Worker Status ==='); for (const [name, worker] of workerProcesses) { console.log(`${name}: PID ${worker.process.pid} - ${worker.config.description}`); } console.log(`Total workers: ${workerProcesses.size}`); console.log('==============================\n'); } // Main execution if (require.main === module) { debug('AgMission Worker Manager starting...'); startAllWorkers(); // // Display initial status // setTimeout(displayStatus, 2000); // // Display status every 30 seconds // setInterval(displayStatus, 30000); } module.exports = { startWorker, shutdownWorkers, workerProcesses };