DLQ Non-Destructive Message Retrieval - Implementation Summary

Date: January 20, 2026
Changes: Queue-agnostic DLQ operations + RabbitMQ Management API integration


What Was Fixed

1. Non-Destructive Message Peeking (/api/dlq/:queueName/messages)

Exclusively uses RabbitMQ Management API for true non-destructive peeking.

Implementation: Management API Only

// POST http://localhost:15672/api/queues/%2f/queue_name/get
{
  "count": 50,
  "ackmode": "ack_requeue_true",  // ← TRUE non-destructive peek
  "encoding": "auto"
}
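As a rough illustration, the request above could be issued from Node roughly as follows. This is a minimal sketch, not the project's actual code: `buildPeekRequest` and `peekMessages` are hypothetical names, the host, port, and vhost defaults are assumptions, and the global `fetch` assumes Node 18 or later.

```javascript
// Hypothetical helper: builds the Management API "get messages" request.
// Defaults (localhost, 15672, vhost "/") are assumptions taken from this document.
function buildPeekRequest(queueName, options = {}) {
  const { host = 'localhost', port = 15672, vhost = '/', count = 50, user, pass } = options;
  // The vhost "/" must be URL-encoded (it becomes %2F) inside the path.
  const url = `http://${host}:${port}/api/queues/` +
    `${encodeURIComponent(vhost)}/${encodeURIComponent(queueName)}/get`;
  return {
    url,
    init: {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
        Authorization: 'Basic ' + Buffer.from(`${user}:${pass}`).toString('base64'),
      },
      // ack_requeue_true asks the broker to requeue every fetched message.
      body: JSON.stringify({ count, ackmode: 'ack_requeue_true', encoding: 'auto' }),
    },
  };
}

// Hypothetical wrapper: sends the request and returns the parsed message array.
async function peekMessages(queueName, options) {
  const { url, init } = buildPeekRequest(queueName, options);
  const res = await fetch(url, init);
  if (!res.ok) {
    throw new Error(`Management API responded with HTTP ${res.status}`);
  }
  return res.json();
}
```

Keeping request construction separate from the network call makes the URL encoding and request body easy to check without a running broker.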

Benefits:

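  • Non-destructive read - with ack_requeue_true the broker requeues every fetched message, so nothing is consumed (requeued messages are flagged as redelivered on later reads)
  • Order preserved - the broker handles requeuing, so the application never republishes messages itself
  • No application-level race conditions - fetch and requeue happen within a single Management API call
  • Multiple concurrent reads - safe for simultaneous access
  • No fallback complexity - single, reliable method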

Requirements:

  • RabbitMQ Management plugin enabled: rabbitmq-plugins enable rabbitmq_management
  • Set RABBITMQ_MGMT_ENABLED=true in environment (default: true)
  • Default port: 15672

Error Handling: If the Management API is not available, the endpoint returns a 503 error with a clear message:

{
  "error": {
    ".tag": "unknown_app_error",
    "message": "RabbitMQ Management API not available. Ensure plugin is enabled: rabbitmq-plugins enable rabbitmq_management"
  }
}
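One way the 503 body above might be produced is sketched below. The function names are hypothetical, and treating network-level error codes such as `ECONNREFUSED` as "Management API not available" is an assumption about how the server detects the condition, not a description of the actual implementation.

```javascript
// Message text and ".tag" value are copied from the documented 503 body.
const MGMT_UNAVAILABLE_MESSAGE =
  'RabbitMQ Management API not available. Ensure plugin is enabled: ' +
  'rabbitmq-plugins enable rabbitmq_management';

// Hypothetical helper: the status and body a route handler could send.
function managementUnavailableResponse() {
  return {
    status: 503,
    body: { error: { '.tag': 'unknown_app_error', message: MGMT_UNAVAILABLE_MESSAGE } },
  };
}

// Assumption: a network-level failure means the plugin is unreachable.
// Node's fetch wraps the low-level error in `err.cause`, so both places are checked.
function isManagementUnavailable(err) {
  const code = err && (err.code || (err.cause && err.cause.code));
  return ['ECONNREFUSED', 'ENOTFOUND', 'ETIMEDOUT'].includes(code);
}
```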

Response Format:

{
  "messages": [
    {
      "position": 1,
      "taskInfo": {...},
      "errorMessage": "...",
      "retryCount": 0,
      "enqueuedAt": "2026-01-20T10:30:00Z",
      "headers": {...},
      "redelivered": false
    }
  ],
  "count": 3,
  "queueName": "dev_partner_tasks_failed",
  "method": "management-api"
}
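The mapping from raw Management API messages to this response shape might look like the sketch below. The `payload`, `redelivered`, and `properties.headers` fields are part of the Management API's message format; the header names `x-error-message`, `x-retry-count`, and `x-enqueued-at` are purely hypothetical placeholders, since this document does not say which headers the server reads.

```javascript
// Hypothetical mapper from one Management API message to one response entry.
function toDlqEntry(mgmtMessage, position) {
  const headers = (mgmtMessage.properties && mgmtMessage.properties.headers) || {};
  let taskInfo;
  try {
    taskInfo = JSON.parse(mgmtMessage.payload);
  } catch {
    // Non-JSON payloads are kept as raw text rather than dropped.
    taskInfo = { raw: mgmtMessage.payload };
  }
  return {
    position,
    taskInfo,
    errorMessage: headers['x-error-message'] ?? null, // hypothetical header name
    retryCount: Number(headers['x-retry-count'] ?? 0), // hypothetical header name
    enqueuedAt: headers['x-enqueued-at'] ?? null,      // hypothetical header name
    headers,
    redelivered: Boolean(mgmtMessage.redelivered),
  };
}

// Wraps the mapped entries in the documented envelope; positions are 1-based.
function toDlqResponse(mgmtMessages, dlqName) {
  const messages = mgmtMessages.map((m, i) => toDlqEntry(m, i + 1));
  return { messages, count: messages.length, queueName: dlqName, method: 'management-api' };
}
```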

Note: AMQP fallback has been removed. The endpoint now exclusively uses Management API for guaranteed non-destructive operation.


2. Queue-Agnostic DLQ Operations

Fixed: All DLQ operations now use the standard _failed suffix (not _dlq)

Before (inconsistent):

// ❌ Some used _dlq, some used _failed
const dlqName = `${queueName}_dlq`;  // Wrong!

After (consistent):

// ✅ All operations use _failed
const dlqName = `${queueName}_failed`;  // Standard convention
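To keep the convention in one place, the suffix could live in a single helper that every endpoint calls. This is only a sketch; `getDlqName` and `DLQ_SUFFIX` are hypothetical names, not ones confirmed by the codebase.

```javascript
// Hypothetical shared constant for the naming convention described above.
const DLQ_SUFFIX = '_failed';

// Derives the DLQ name for any main queue name.
function getDlqName(queueName) {
  if (typeof queueName !== 'string' || queueName.length === 0) {
    throw new TypeError('queueName must be a non-empty string');
  }
  return `${queueName}${DLQ_SUFFIX}`;
}
```

Centralizing the suffix makes it harder for a single endpoint to drift back to a different naming scheme.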

Endpoints Fixed:

  • /api/dlq/:queueName/retryAll
  • /api/dlq/:queueName/retryByPosition
  • /api/dlq/:queueName/retryByHeader
  • /api/dlq/:queueName/messages
  • /api/dlq/:queueName/stats
  • /api/dlq/:queueName/purge

Now works for ANY queue:

# Partner tasks
curl http://localhost:4100/api/dlq/dev_partner_tasks/messages

# Job processing
curl http://localhost:4100/api/dlq/dev_jobs/messages

# Any custom queue
curl http://localhost:4100/api/dlq/my_custom_queue/messages

3. RetryAll Improvements

What /retryAll Does:

  • Moves messages from DLQ back to main queue
  • Adds retry metadata headers
  • Processes up to maxMessages (default: 100)
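The behavior listed above can be sketched with an amqplib-style channel (`get`, `sendToQueue`, `ack`). This is an illustration under assumptions, not the actual handler: `retryAll` as a standalone function is hypothetical, and the header names `x-retry-count` and `x-retried-at` are placeholders for whatever retry metadata the server really adds.

```javascript
// Sketch: move up to maxMessages from "<queue>_failed" back to "<queue>".
// `channel` is assumed to expose amqplib's get / sendToQueue / ack methods.
async function retryAll(channel, queueName, maxMessages = 100) {
  const dlqName = `${queueName}_failed`;
  let processed = 0;

  while (processed < maxMessages) {
    // get() resolves to false when the queue is empty.
    const msg = await channel.get(dlqName, { noAck: false });
    if (msg === false) break;

    // Copy existing headers and add retry metadata (hypothetical header names).
    const headers = { ...(msg.properties.headers || {}) };
    headers['x-retry-count'] = (Number(headers['x-retry-count']) || 0) + 1;
    headers['x-retried-at'] = new Date().toISOString();

    channel.sendToQueue(queueName, msg.content, {
      headers,
      persistent: true,
      contentType: msg.properties.contentType,
    });
    // Acknowledge only after republishing, so the DLQ copy is removed last.
    channel.ack(msg);
    processed += 1;
  }

  return { success: true, processed, retriedCount: processed, queueName, dlqName };
}
```

On a plain channel, a publish that fails after the acknowledgement could still lose a message; a confirm channel would close that gap at the cost of extra round trips.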

Response Format:

{
  "success": true,
  "processed": 42,
  "retriedCount": 42,  // Deprecated: use 'processed'
  "queueName": "dev_partner_tasks",
  "dlqName": "dev_partner_tasks_failed"
}

Usage:

# Retry all messages (up to 100)
curl -X POST http://localhost:4100/api/dlq/dev_partner_tasks/retryAll

# Retry specific count
curl -X POST http://localhost:4100/api/dlq/dev_partner_tasks/retryAll \
  -H "Content-Type: application/json" \
  -d '{"maxMessages": 10}'

Is it queue-agnostic? YES - Works with ANY queue type:

  • dev_partner_tasks → dev_partner_tasks_failed
  • dev_jobs → dev_jobs_failed
  • notifications → notifications_failed
  • Any custom queue with _failed DLQ

📁 Files Modified

Core Implementation

Test Scripts


🧪 Testing

Quick Test (No Auth Required)

# Demonstrates old bug vs new fix
node tests/test_dlq_messages_direct.js

# Test Management API integration
node tests/test_dlq_mgmt_api.js

Full Integration Test (Requires Auth)

# Get admin token first, then:
node tests/test_dlq_routes.js --queue dev_partner_tasks --token YOUR_TOKEN

🔧 Configuration

Enable Management API (Required)

1. Enable plugin:

rabbitmq-plugins enable rabbitmq_management

2. Configure user permissions:

RabbitMQ users need specific tags to access the Management API. The agm user needs the monitoring or management tag.

# Option A: Set the monitoring tag (RECOMMENDED)
rabbitmqctl set_user_tags agm monitoring

# Option B: Set the management tag (baseline Management API access)
rabbitmqctl set_user_tags agm management

# Option C: Keep an existing tag (here: policymaker) and add monitoring
# Note: set_user_tags replaces the whole tag list, so list every tag to keep
rabbitmqctl set_user_tags agm monitoring policymaker

# Verify user tags
rabbitmqctl list_users

User Tag Permissions:

  • management: Baseline Management API access, limited to the vhosts the user has permissions on
  • monitoring: Everything management allows, plus visibility into all vhosts, connections, channels, and node-level data. Recommended for production
  • administrator: Full admin access (user management, vhosts, etc.)

3. Set vhost permissions (if needed):

# Ensure agm user has access to the vhost
rabbitmqctl set_permissions -p / agm ".*" ".*" ".*"

4. Verify access:

# Test authentication
curl -u agm:YOUR_PASSWORD http://localhost:15672/api/overview

# Should return JSON with RabbitMQ cluster info
# If 401 error: wrong credentials, or user lacks Management API tags
# If connection refused: Management plugin not enabled
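The same check can be scripted from Node, for example as a startup probe. The sketch below is an assumption-laden illustration: `explainOverviewCheck` and `checkManagementAccess` are hypothetical names, and the global `fetch` assumes Node 18 or later.

```javascript
// Hypothetical helper: turns the outcome of GET /api/overview into a hint,
// mirroring the troubleshooting comments above.
function explainOverviewCheck({ status, errorCode } = {}) {
  if (errorCode === 'ECONNREFUSED') {
    return { ok: false, hint: 'Connection refused: Management plugin not enabled' };
  }
  if (status === 401) {
    return { ok: false, hint: '401: wrong credentials, or user lacks Management API tags' };
  }
  if (status === 200) {
    return { ok: true, hint: 'Management API reachable' };
  }
  return { ok: false, hint: `Unexpected result (status: ${status}, error: ${errorCode})` };
}

// Hypothetical probe: performs the request and classifies the outcome.
async function checkManagementAccess(baseUrl, user, pass) {
  const auth = 'Basic ' + Buffer.from(`${user}:${pass}`).toString('base64');
  try {
    const res = await fetch(`${baseUrl}/api/overview`, { headers: { Authorization: auth } });
    return explainOverviewCheck({ status: res.status });
  } catch (err) {
    // Node's fetch exposes the low-level error code on err.cause.
    return explainOverviewCheck({ errorCode: err.cause && err.cause.code });
  }
}
```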

5. Configure in environment:

RABBITMQ_MGMT_PORT=15672
RABBITMQ_MGMT_ENABLED=true  # Default: true
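How the server parses these variables is not shown in this document; the following is a minimal sketch of one plausible approach, with `readMgmtConfig` as a hypothetical name. It applies the documented defaults (port 15672, enabled unless set otherwise) and assumes that only the literal value `false` disables the integration.

```javascript
// Hypothetical config reader; accepts an env object so it can be tested in isolation.
function readMgmtConfig(env = process.env) {
  const port = Number.parseInt(env.RABBITMQ_MGMT_PORT ?? '15672', 10);
  if (!Number.isInteger(port) || port < 1 || port > 65535) {
    throw new Error(`Invalid RABBITMQ_MGMT_PORT: ${env.RABBITMQ_MGMT_PORT}`);
  }
  // Assumption: enabled by default, disabled only by the literal value "false".
  const enabled = (env.RABBITMQ_MGMT_ENABLED ?? 'true').trim().toLowerCase() !== 'false';
  return { port, enabled };
}
```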

6. Restart server:

node server.js

Troubleshooting 401 Errors

If you see 401 errors:

# Check current user tags
rabbitmqctl list_users
# Output: agm []  ← No tags means no Management API access!

# Add monitoring tag
rabbitmqctl set_user_tags agm monitoring

# Verify
rabbitmqctl list_users
# Output: agm [monitoring]  ← Now has access!

Without Management API

The endpoint returns a 503 error with instructions to enable the plugin. No fallback is provided, so a read can never fall back to a destructive method.


📊 Performance Comparison

| Method | Truly Non-Destructive? | Order Preserved? | Concurrent Safe? | Requires Plugin? | Status |
| --- | --- | --- | --- | --- | --- |
| Management API | Yes | Yes | Yes | ⚠️ Yes | ACTIVE |
| AMQP Batch | ⚠️ Mostly | No | ⚠️ Risky | No | REMOVED |
| Old AMQP Loop | No | No | No | No | REMOVED |

🎯 Summary

Problem → Solution

  1. Message Duplication → Removed AMQP fallback entirely
  2. No True Peeking → Exclusively use Management API
  3. Inconsistent Naming → Standardized _failed suffix
  4. Not Queue-Agnostic → Works with any queue type

Key Takeaways

  • /messages endpoint is now truly non-destructive (Management API only)
  • All DLQ operations work with ANY queue type
  • Clear error messages when Management API unavailable
  • Test scripts demonstrate the old bug and verify the new behavior
  • Simplified codebase (removed fallback complexity)
  • ⚠️ Breaking change: Requires the Management plugin to be enabled

Next Steps

  1. Enable RabbitMQ Management plugin (required)
  2. Run test scripts to verify setup
  3. Monitor DLQ operations via /stats endpoint
  4. Use /messages for debugging without side effects