agmission/Development/server/start_workers.js

207 lines
5.2 KiB
JavaScript

#!/usr/bin/env node
/**
* Worker Process Manager
* Starts all worker processes for the AgMission system
*/
const { spawn } = require('child_process');
const path = require('path');
const debug = require('debug')('agm:worker-manager');
// Read settings from the environment.env file that sits next to this script
// and merge them into process.env via dotenv.
const envPath = path.join(__dirname, 'environment.env');
require('dotenv').config({ path: envPath });
// Names that must be present (and non-empty) before any worker is started
const requiredEnvVars = [
  'DB_HOSTS',
  'DB_NAME',
  'DB_USR',
  'DB_PWD',
  'QUEUE_HOST',
  'QUEUE_PORT',
  'QUEUE_USR',
  'QUEUE_PWD'
];
// Gather every required name whose value is unset or an empty string
const missingVars = [];
for (const varName of requiredEnvVars) {
  if (!process.env[varName]) {
    missingVars.push(varName);
  }
}
// Refuse to continue with an incomplete configuration: report and exit with status 1
if (missingVars.length !== 0) {
  console.error('Missing required environment variables:', missingVars);
  console.error('Please check environment.env file and ensure all required variables are set');
  process.exit(1);
}
// Worker configurations
// Each row is [name, script, description] and is expanded below into a
// { name, script, description } record. Rows that are commented out are
// workers that are currently disabled.
const WORKERS = [
  // ['job_worker', './workers/job_worker.js', 'Processes internal job tasks including partner log processing'],
  ['partner_sync_worker', './workers/partner_sync_worker.js', 'Handles partner system synchronization'],
  ['partner_polling_worker', './workers/partner_data_polling_worker.js', 'Polls partner systems for new data'],
  ['dlq_alert_worker', './workers/dlq_alert_worker.js', 'Monitors DLQs and sends email alerts']
  // ['cleanup_worker', './workers/cleanup_worker.js', 'Handles cleanup tasks'],
  // ['obstacle_worker', './workers/obstacle_worker.js', 'Processes obstacle detection tasks'],
  // ['invoice_worker', './workers/invoice_worker.js', 'Handles invoice processing']
].map(([name, script, description]) => ({ name, script, description }));
// Registry of live workers, keyed by worker name.
// Each value is { process: <child process>, config: <worker definition> }.
const workerProcesses = new Map();
// Signal handling: SIGINT and SIGTERM both trigger the same worker shutdown
for (const signalName of ['SIGINT', 'SIGTERM']) {
  process.on(signalName, () => {
    debug(`Received ${signalName}, shutting down workers...`);
    shutdownWorkers();
  });
}
// Start a worker process
/**
 * Spawn one worker script as a child `node` process and supervise it.
 *
 * The child's stdout and stderr are forwarded to this process's stdout and
 * stderr, each chunk prefixed with `[<worker.name>]`. The child is registered
 * in `workerProcesses` under `worker.name` for as long as it is alive.
 *
 * A restart is scheduled after 5 seconds (unless a shutdown is in progress)
 * when the child exits with anything other than exit code 0 (this includes
 * termination by a signal, where the code is null) or when the process could
 * not be spawned at all.
 *
 * @param {{name: string, script: string, description: string}} worker
 *   Worker definition; `script` is passed to `node` with this file's
 *   directory as the working directory.
 * @returns {void}
 */
function startWorker(worker) {
  const RESTART_DELAY_MS = 5000;
  debug(`Starting ${worker.name}: ${worker.description}`);
  const child = spawn('node', [worker.script], {
    cwd: __dirname,
    stdio: ['pipe', 'pipe', 'pipe'],
    env: { ...process.env }
  });

  // Forward a child stream to one of our own streams with a name prefix.
  const forward = (source, sink) => {
    // The stdio streams can be missing when the spawn itself failed.
    if (!source) return;
    // Decode through the stream so multi-byte UTF-8 characters that are split
    // across two chunks are not corrupted when the chunk is stringified.
    source.setEncoding('utf8');
    source.on('data', (data) => {
      sink.write(`[${worker.name}] ${data}`);
    });
  };
  forward(child.stdout, process.stdout);
  forward(child.stderr, process.stderr);

  // 'error' and 'exit' may both fire for the same child, so the teardown
  // (deregister + optional restart) is guarded to run at most once.
  let finished = false;
  const finish = (shouldRestart) => {
    if (finished) return;
    finished = true;
    // Only drop the registry entry if it still belongs to this child; a newer
    // instance registered under the same name must not be evicted.
    const current = workerProcesses.get(worker.name);
    if (current && current.process === child) {
      workerProcesses.delete(worker.name);
    }
    if (shouldRestart && !shutdownInProgress) {
      debug(`Restarting ${worker.name} in ${RESTART_DELAY_MS / 1000} seconds...`);
      setTimeout(() => {
        // Re-check: a shutdown may have started while we were waiting.
        if (!shutdownInProgress) {
          startWorker(worker);
        }
      }, RESTART_DELAY_MS);
    }
  };

  // Handle process exit
  child.on('exit', (code, signal) => {
    debug(`${worker.name} exited with code ${code}, signal ${signal}`);
    // Restart worker after delay if it did not exit cleanly
    finish(code !== 0);
  });

  // Handle process error
  child.on('error', (err) => {
    debug(`${worker.name} error:`, err);
    // An undefined pid means the process never started, so no 'exit' event
    // will follow; clean up and retry here. Errors on a running child (for
    // example a failed kill) leave the worker registered.
    if (child.pid === undefined) {
      finish(true);
    }
  });

  workerProcesses.set(worker.name, { process: child, config: worker });
  debug(`${worker.name} started with PID ${child.pid}`);
}
// Shutdown all workers
// Set once shutdown has begun; also read by the restart logic to suppress restarts.
let shutdownInProgress = false;
/**
 * Stop every registered worker and then exit this process with code 0.
 *
 * Each worker is sent SIGTERM; if it has not exited within 10 seconds it is
 * sent SIGKILL. Workers that never spawned, have already exited, or cannot be
 * signalled are treated as stopped immediately, because no 'exit' event will
 * arrive for them. Calling this function again while a shutdown is already in
 * progress does nothing.
 *
 * @returns {void}
 */
function shutdownWorkers() {
  if (shutdownInProgress) return;
  shutdownInProgress = true;
  debug('Shutting down all workers...');
  const FORCE_KILL_DELAY_MS = 10000;
  const shutdownPromises = [];
  for (const [name, worker] of workerProcesses) {
    shutdownPromises.push(new Promise((resolve) => {
      const proc = worker.process;
      debug(`Stopping ${name}...`);

      // A child with no pid never started, and one with an exit code or
      // signal code has already terminated: nothing to wait for.
      if (proc.pid === undefined || proc.exitCode != null || proc.signalCode != null) {
        debug(`${name} stopped`);
        resolve();
        return;
      }

      let forceTimeout;
      // Handle graceful exit
      proc.once('exit', () => {
        clearTimeout(forceTimeout);
        debug(`${name} stopped`);
        resolve();
      });

      // Send SIGTERM for graceful shutdown. kill() returns false when the
      // signal could not be delivered, in which case waiting for the force
      // timeout would only delay the shutdown.
      if (!proc.kill('SIGTERM')) {
        debug(`${name} stopped`);
        resolve();
        return;
      }

      // Set a timeout for forceful termination
      forceTimeout = setTimeout(() => {
        debug(`Force killing ${name}...`);
        proc.kill('SIGKILL');
        resolve();
      }, FORCE_KILL_DELAY_MS);
    }));
  }
  Promise.all(shutdownPromises).then(() => {
    debug('All workers stopped');
    process.exit(0);
  });
}
// Start all workers
/**
 * Launch every worker listed in WORKERS, in list order, by handing each
 * definition to startWorker, and log how many were started.
 *
 * @returns {void}
 */
function startAllWorkers() {
  debug('Starting AgMission worker processes...');
  for (const definition of WORKERS) {
    startWorker(definition);
  }
  const total = WORKERS.length;
  debug(`Started ${total} worker processes`);
}
// Display status
/**
 * Print a snapshot of the worker registry to the console: one line per
 * registered worker (name, PID, description) followed by the total count.
 *
 * @returns {void}
 */
function displayStatus() {
  console.log('\n=== AgMission Worker Status ===');
  workerProcesses.forEach((entry, workerName) => {
    const { process: child, config } = entry;
    console.log(`${workerName}: PID ${child.pid} - ${config.description}`);
  });
  console.log(`Total workers: ${workerProcesses.size}`);
  console.log('==============================\n');
}
// Main execution
if (require.main === module) {
debug('AgMission Worker Manager starting...');
startAllWorkers();
// // Display initial status
// setTimeout(displayStatus, 2000);
// // Display status every 30 seconds
// setInterval(displayStatus, 30000);
}
module.exports = {
startWorker,
shutdownWorkers,
workerProcesses
};