agmission/Development/server/tests/test_dlq_messages_direct.js

221 lines
9.3 KiB
JavaScript
Executable File

#!/usr/bin/env node
'use strict';
/**
* Simple DLQ Message Retrieval Test (No API, Direct RabbitMQ)
* Tests the core issue: message retrieval without requeuing
*
* This directly tests the RabbitMQ operations to verify:
* 1. Publishing messages doesn't duplicate
* 2. Getting messages with noAck:true doesn't consume them
* 3. Multiple retrievals return consistent counts
*
* Usage:
* node tests/test_dlq_messages_direct.js [--env ./environment.env]
*/
const path = require('path');
const amqp = require('amqplib');
// Parse --env argument
const args = process.argv.slice(2);
let envFile = './environment.env';
for (let i = 0; i < args.length; i++) {
if (args[i] === '--env' && args[i + 1]) {
envFile = args[i + 1];
i++;
}
}
// Load environment
const envPath = path.resolve(process.cwd(), envFile);
require('dotenv').config({ path: envPath });
const QUEUE_HOST = process.env.QUEUE_HOST || 'localhost';
const QUEUE_PORT = process.env.QUEUE_PORT || 5672;
const QUEUE_USR = process.env.QUEUE_USR || 'guest';
const QUEUE_PWD = process.env.QUEUE_PWD || 'guest';
const QUEUE_VHOST = process.env.QUEUE_VHOST || '/';
const testQueueName = 'test_dlq_messages';
const messageCount = 3;
console.log('╔════════════════════════════════════════════════════════════╗');
console.log('║ DLQ Message Retrieval Test (Direct RabbitMQ) ║');
console.log('╚════════════════════════════════════════════════════════════╝\n');
async function main() {
let connection, channel;
try {
// Connect
console.log(`Connecting to RabbitMQ at ${QUEUE_HOST}:${QUEUE_PORT}...`);
const vhostEncoded = encodeURIComponent(QUEUE_VHOST);
const connUrl = `amqp://${encodeURIComponent(QUEUE_USR)}:${encodeURIComponent(QUEUE_PWD)}@${QUEUE_HOST}:${QUEUE_PORT}/${vhostEncoded}`;
connection = await amqp.connect(connUrl);
channel = await connection.createChannel();
await channel.assertQueue(testQueueName, { durable: true });
console.log('✓ Connected\n');
// Clean up
console.log('Purging existing messages...');
await channel.purgeQueue(testQueueName);
console.log('✓ Queue purged\n');
// Test 1: Publish messages
console.log('──────────────────────────────────────────────────────────');
console.log(`Test 1: Publishing ${messageCount} messages`);
for (let i = 0; i < messageCount; i++) {
const msg = {
id: `test-${i}`,
content: `Message ${i + 1}`,
timestamp: Date.now()
};
channel.sendToQueue(
testQueueName,
Buffer.from(JSON.stringify(msg)),
{ persistent: true, contentType: 'application/json' }
);
}
await new Promise(resolve => setTimeout(resolve, 200));
let queueInfo = await channel.checkQueue(testQueueName);
console.log(`✓ Published ${messageCount} messages`);
console.log(` Queue count: ${queueInfo.messageCount}`);
if (queueInfo.messageCount !== messageCount) {
console.log(`✗ FAILED: Expected ${messageCount}, got ${queueInfo.messageCount}\n`);
return 1;
}
console.log('');
// Test 2: OLD METHOD - noAck:false with nack (causes duplication)
console.log('──────────────────────────────────────────────────────────');
console.log('Test 2: OLD METHOD - Get with noAck:false + nack requeue');
const oldMethodMessages = [];
for (let i = 0; i < 10; i++) {
const msg = await channel.get(testQueueName, { noAck: false });
if (!msg) break;
oldMethodMessages.push(JSON.parse(msg.content.toString()));
channel.nack(msg, false, true); // Requeue - THIS IS THE BUG
}
queueInfo = await channel.checkQueue(testQueueName);
console.log(` Retrieved: ${oldMethodMessages.length} messages`);
console.log(` Queue count after: ${queueInfo.messageCount}`);
if (oldMethodMessages.length > messageCount) {
console.log(`✗ BUG CONFIRMED: Retrieved ${oldMethodMessages.length} messages from queue with only ${messageCount}!`);
console.log(` This happens because nack(requeue=true) puts messages back at front of queue`);
} else {
console.log(`✓ Retrieved correct count (queue might be empty now)`);
}
console.log('');
// Restore messages for next test
console.log('Restoring messages for next test...');
await channel.purgeQueue(testQueueName);
for (let i = 0; i < messageCount; i++) {
const msg = { id: `test-${i}`, content: `Message ${i + 1}` };
channel.sendToQueue(
testQueueName,
Buffer.from(JSON.stringify(msg)),
{ persistent: true }
);
}
await new Promise(resolve => setTimeout(resolve, 200));
console.log('✓ Restored\n');
// Test 3: NEW METHOD - noAck:true (no consumption)
console.log('──────────────────────────────────────────────────────────');
console.log('Test 3: NEW METHOD - Get with noAck:true (peek only)');
queueInfo = await channel.checkQueue(testQueueName);
const beforeCount = queueInfo.messageCount;
console.log(` Queue count before: ${beforeCount}`);
const newMethodMessages = [];
for (let i = 0; i < 10; i++) {
const msg = await channel.get(testQueueName, { noAck: true });
if (!msg) break;
newMethodMessages.push(JSON.parse(msg.content.toString()));
// No nack/ack needed - noAck:true auto-acknowledges without consuming
}
queueInfo = await channel.checkQueue(testQueueName);
const afterCount = queueInfo.messageCount;
console.log(` Retrieved: ${newMethodMessages.length} messages`);
console.log(` Queue count after: ${afterCount}`);
if (newMethodMessages.length === beforeCount && afterCount === 0) {
console.log(`✓ CORRECT: Messages were consumed (noAck:true auto-acks)`);
console.log(` Note: noAck:true is for consuming, not peeking!`);
} else if (newMethodMessages.length === beforeCount && afterCount === beforeCount) {
console.log(`✓ PERFECT: Messages peeked without consumption`);
} else {
console.log(`? Unexpected behavior: retrieved=${newMethodMessages.length}, before=${beforeCount}, after=${afterCount}`);
}
console.log('');
// Test 4: Management API method (best for peeking)
console.log('──────────────────────────────────────────────────────────');
console.log('Test 4: Management API method (HTTP API)');
console.log(' Note: This requires RabbitMQ Management plugin');
console.log(' URL: http://localhost:15672/api/queues/%2f/test_dlq_messages/get');
console.log(' Method: POST with {"count":10,"ackmode":"ack_requeue_false"}');
console.log(' This is the BEST way to peek without affecting queue state\n');
// Cleanup
console.log('──────────────────────────────────────────────────────────');
console.log('Cleanup...');
await channel.purgeQueue(testQueueName);
await channel.deleteQueue(testQueueName);
console.log('✓ Queue deleted\n');
// Summary
console.log('══════════════════════════════════════════════════════════');
console.log('Summary');
console.log('══════════════════════════════════════════════════════════');
console.log('OLD METHOD (noAck:false + nack requeue):');
console.log(' ✗ Causes message duplication');
console.log(' ✗ Can read same message multiple times');
console.log(' ✗ This was the bug in getDLQMessages_get');
console.log('');
console.log('NEW METHOD (noAck:true):');
console.log(' ✓ No duplication');
console.log(' ⚠ Messages are auto-acknowledged (consumed)');
console.log(' ⚠ For true peeking, use checkQueue + limit by actual count');
console.log('');
console.log('BEST METHOD (Management API):');
console.log(' ✓ True peeking without consumption');
console.log(' ✓ Requires Management plugin');
console.log(' ✓ More complex to implement');
console.log('');
return 0;
} catch (error) {
console.error('\n✗ Test failed:', error.message);
console.error(error.stack);
return 1;
} finally {
if (channel) await channel.close().catch(() => {});
if (connection) await connection.close().catch(() => {});
}
}
main()
.then(exitCode => process.exit(exitCode))
.catch(error => {
console.error('Fatal error:', error);
process.exit(1);
});