#!/usr/bin/env node 'use strict'; /** * Script to migrate existing partner queue to use dead letter exchange * WARNING: This will temporarily delete and recreate the queue! * Only run this when no workers are running and no messages are in the queue. * * Usage: node scripts/migrate_queue_to_dlx.js [--confirm] */ const amqp = require('amqplib'); const env = require('../helpers/env'); const PARTNER_QUEUE = env.PRODUCTION ? env.QUEUE_NAME_PARTNER || 'partner_tasks' : 'dev_partner_tasks'; const DLQ_QUEUE = `${PARTNER_QUEUE}_failed`; async function migrateQueueToDLX() { const args = process.argv.slice(2); const isConfirmed = args.includes('--confirm'); if (!isConfirmed) { console.log('⚠️ QUEUE MIGRATION TO DEAD LETTER EXCHANGE'); console.log(''); console.log('This script will:'); console.log('1. Check if the queue has messages'); console.log('2. Delete the existing queue (if empty)'); console.log('3. Recreate it with dead letter exchange support'); console.log(''); console.log('⚠️ WARNING: This will temporarily delete the queue!'); console.log('Make sure no workers are running and no messages are queued.'); console.log(''); console.log('Run with --confirm to proceed'); return; } console.log(`Migrating queue: ${PARTNER_QUEUE}`); const conOps = { protocol: 'amqp', hostname: env.QUEUE_HOST || 'localhost', port: env.QUEUE_PORT || 5672, username: env.QUEUE_USR || 'agm', password: env.QUEUE_PWD, vhost: env.QUEUE_VHOST || '/', heartbeat: env.QUEUE_HEARTBEAT || 0, frameMax: 0 }; try { const conn = await amqp.connect(conOps); const ch = await conn.createChannel(); // Check if queue exists and has messages try { const queueInfo = await ch.checkQueue(PARTNER_QUEUE); console.log(`Current queue status: ${queueInfo.messageCount} messages, ${queueInfo.consumerCount} consumers`); if (queueInfo.messageCount > 0) { console.log('❌ Queue has messages! Cannot migrate safely.'); console.log('Please wait for all messages to be processed first.'); await conn.close(); return; } if (queueInfo.consumerCount > 0) { console.log('❌ Queue has active consumers! Cannot migrate safely.'); console.log('Please stop all workers first.'); await conn.close(); return; } } catch (error) { if (error.message.includes('NOT_FOUND')) { console.log('Queue does not exist yet - will create with DLX'); } else { throw error; } } console.log('✅ Safe to proceed with migration'); // Step 1: Create DLQ infrastructure (simplified) console.log('Creating failed message queue...'); await ch.assertQueue(DLQ_QUEUE, { durable: true }); console.log(`✅ Created failed queue: ${DLQ_QUEUE}`); // Step 2: Delete existing queue (if it exists) try { await ch.deleteQueue(PARTNER_QUEUE, { ifEmpty: true }); console.log(`✅ Deleted existing queue: ${PARTNER_QUEUE}`); } catch (error) { if (error.message.includes('NOT_FOUND')) { console.log(`Queue ${PARTNER_QUEUE} did not exist`); } else { throw error; } } // Step 3: Recreate queue with simplified DLX await ch.assertQueue(PARTNER_QUEUE, { durable: true, arguments: { 'x-dead-letter-exchange': '', 'x-dead-letter-routing-key': DLQ_QUEUE } }); console.log(`✅ Created queue with simplified DLX: ${PARTNER_QUEUE}`); // Verify the setup const newQueueInfo = await ch.checkQueue(PARTNER_QUEUE); console.log(`✅ Migration complete! Queue: ${newQueueInfo.messageCount} messages`); console.log(''); console.log('Queue structure:'); console.log(` Main Queue: ${PARTNER_QUEUE} (with simplified DLX)`); console.log(` Failed Queue: ${DLQ_QUEUE}`); await conn.close(); } catch (error) { console.error('❌ Migration failed:', error.message); process.exit(1); } } // Handle command line execution if (require.main === module) { migrateQueueToDLX().catch(console.error); } module.exports = { migrateQueueToDLX };