Partner DLQ System Architecture Diagrams

System Overview

graph TB
    subgraph Users
        Web[Web Dashboard]
        API[API Client]
        CLI[CLI Tool]
    end
    
    Web -->|HTTP/REST| Router
    API -->|HTTP/REST| Router
    CLI -->|HTTP/REST| Router
    
    Router[Express Router<br/>/api/dlq/*]
    
    Router --> Auth[Authentication<br/>authAllowAdmin]
    Auth --> Controller[Partner DLQ Controller<br/>partner_dlq.js]
    
    Controller --> RabbitMQ[RabbitMQ DLQ Queue]
    Controller --> MongoDB[MongoDB Tracker Status]
    
    subgraph Background Services
        Worker[DLQ Handler Worker<br/>partner_dlq_handler.js<br/>- Monitors DLQ<br/>- Auto-processes messages<br/>- Categorizes errors]
    end
    
    Worker -.-> RabbitMQ
    Worker -.-> MongoDB

Message Flow

flowchart TD
    Polling[Polling Worker<br/>Downloads] -->|Enqueue Task| Queue[Partner Tasks Queue<br/>partner_tasks]
    Queue -->|Consume| Sync[Sync Worker<br/>Process]
    
    Sync -->|Success| Processed[Status: PROCESSED]
    Sync -->|Retry < Max| Requeue[Back to Main Queue]
    Sync -->|Max Retries| DLQ[DLQ - Failed<br/>partner_tasks_failed]
    
    DLQ -->|DLQ Handler| Handler{Error Analysis}
    
    Handler -->|Transient<br/>Age < 2h| Retry[RETRY<br/>Requeue]
    Handler -->|Validation<br/>Non-recoverable| Archive1[ARCHIVE<br/>Record]
    Handler -->|Other<br/>Age > 24h| Archive2[ARCHIVE<br/>Record]
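
The sync worker's three outcomes in the flow above can be sketched as a small pure function. This is only an illustration under stated assumptions: `routeSyncResult` and `MAX_RETRIES` are hypothetical names, and the limit of 3 is invented because the diagram says only "Max Retries".

```javascript
// Hypothetical retry limit; the diagram only says "Max Retries".
const MAX_RETRIES = 3;

// Decide where a task goes once the sync worker has attempted it.
// `retryCount` is the number of retries already made for this task.
function routeSyncResult({ succeeded, retryCount }) {
  if (succeeded) return 'PROCESSED';
  // Below the limit, the task goes back to partner_tasks.
  if (retryCount < MAX_RETRIES) return 'REQUEUE';
  // Otherwise it is dead-lettered to partner_tasks_failed.
  return 'DLQ';
}

console.log(routeSyncResult({ succeeded: false, retryCount: 1 })); // REQUEUE
console.log(routeSyncResult({ succeeded: false, retryCount: 3 })); // DLQ
```

Keeping the decision free of broker calls makes the retry limit easy to unit test without RabbitMQ running.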

Error Categorization

flowchart TD
    Failed[Failed Message in DLQ] --> Analyze[Analyze Error Message]
    
    Analyze --> Keyword[Keyword Matching]
    Analyze --> Pattern[Pattern Recognition]
    Analyze --> Context[Context Analysis]
    
    Keyword --> Category[Error Category]
    Pattern --> Category
    Context --> Category
    
    Category --> Transient[TRANSIENT<br/>• timeout<br/>• network<br/>• connref]
    Category --> Validation[VALIDATION<br/>• invalid<br/>• missing<br/>• format]
    Category --> Processing[PROCESSING<br/>• parse<br/>• calc err<br/>• data]
    Category --> Infrastructure[INFRASTRUCTURE<br/>• database<br/>• fs error<br/>• disk]
    Category --> PartnerAPI[PARTNER API<br/>• auth<br/>• rate limit]
    Category --> Unknown[UNKNOWN<br/>• ???]
    
    Transient -->|Age < 2h| Retry1[RETRY]
    Validation --> Archive1[ARCHIVE]
    Processing --> Keep1[KEEP]
    Infrastructure -->|Retry w/ backoff| Retry2[RETRY]
    PartnerAPI -->|Retry w/ delay| Retry3[RETRY]
    Unknown -->|Manual Review| Keep2[KEEP]
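
As a rough sketch of the keyword-matching branch above, the categories and actions could be expressed as follows. The keyword lists are expanded guesses at the abbreviated labels in the diagram, the rule order is an assumption, and the fallback of `KEEP` for transient errors older than two hours is not stated in the diagram. Pattern recognition, context analysis, backoff, and delay are not modeled, and the real `categorizeError` may differ.

```javascript
const HOUR_MS = 60 * 60 * 1000;

// Ordered keyword rules: the first category with a matching keyword wins.
// INFRASTRUCTURE is listed before PROCESSING so "database" is not caught by "data".
const CATEGORY_RULES = [
  ['TRANSIENT', ['timeout', 'network', 'econnrefused']],
  ['VALIDATION', ['invalid', 'missing', 'format']],
  ['INFRASTRUCTURE', ['database', 'filesystem', 'disk']],
  ['PARTNER_API', ['auth', 'rate limit']],
  ['PROCESSING', ['parse', 'calculation', 'data']],
];

// Map a free-text error message to one of the categories in the diagram.
function categorizeError(errorMessage) {
  const text = String(errorMessage || '').toLowerCase();
  for (const [category, keywords] of CATEGORY_RULES) {
    if (keywords.some((keyword) => text.includes(keyword))) return category;
  }
  return 'UNKNOWN';
}

// Map a category (and message age) to RETRY, ARCHIVE, or KEEP.
function decideAction(category, ageMs) {
  switch (category) {
    case 'TRANSIENT':
      // Assumption: transient errors older than 2h are kept for review.
      return ageMs < 2 * HOUR_MS ? 'RETRY' : 'KEEP';
    case 'VALIDATION':
      return 'ARCHIVE';
    case 'INFRASTRUCTURE':
    case 'PARTNER_API':
      return 'RETRY';
    default:
      // PROCESSING and UNKNOWN stay in the DLQ.
      return 'KEEP';
  }
}

const category = categorizeError('Connection timeout after 30s');
console.log(category, decideAction(category, 10 * 60 * 1000)); // TRANSIENT RETRY
```

Substring matching is order-sensitive, which is why the rules are an ordered list and not an object keyed by category.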

API Endpoint Structure

graph TD
    Root["/api/dlq/"]
    
    Root --> Stats[GET /stats]
    Root --> Messages[GET /messages?limit=50]
    Root --> Process[POST /process]
    Root --> RetryAll[POST /:queueName/retryAll]
    Root --> RetryPos[POST /:queueName/retryByPosition]
    Root --> RetryHeader[POST /:queueName/retryByHeader]
    Root --> Purge[DELETE /purge]
    
    Stats --> RMQ1[RabbitMQ.checkQueue<br/>messageCount, consumerCount]
    Stats --> Mongo1[MongoDB.aggregate<br/>status counts]
    Stats --> Mongo2[MongoDB.find<br/>recent failures]
    
    Messages --> RMQ2[RabbitMQ.get noAck=false<br/>Peek & requeue messages]
    
    Process --> RMQ3[RabbitMQ.get in loop<br/>Parse message]
    Process --> Mongo3[MongoDB.findOne<br/>Get error details]
    Process --> Cat[categorizeError<br/>Determine category]
    Cat --> Decision{Decision Logic}
    Decision -->|RETRY| Update1[Update tracker + requeue]
    Decision -->|ARCHIVE| Update2[Update tracker + ack]
    Decision -->|KEEP| Requeue[Requeue unchanged]
    
    RetryAll --> Retry
    RetryPos --> Retry
    RetryHeader --> Retry
    Retry --> Mongo4[MongoDB.findById<br/>Validate tracker]
    Retry --> Mongo5[MongoDB.updateOne<br/>Reset status to downloaded]
    Retry --> RMQ4[RabbitMQ.sendToQueue<br/>Enqueue with retry headers]
    
    Update2 --> Archive
    Archive --> Mongo6[MongoDB.updateOne<br/>Set status=archived + metadata]
    
    Purge --> Validate[Validate confirm=true]
    Validate --> RMQ5[RabbitMQ.purgeQueue<br/>Clear all DLQ messages]
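
The purge branch above (validate `confirm=true`, then clear the queue) might look like the sketch below. `isPurgeConfirmed`, `purgeDlq`, and the response shapes are hypothetical, and accepting both a boolean and the string `"true"` is an assumption, since the diagram does not say whether `confirm` arrives in the body or the query string. The in-memory `fakeChannel` stands in for a real channel so the example runs without a broker.

```javascript
// Accept confirm=true from a JSON body (boolean) or a query string ("true").
function isPurgeConfirmed(params) {
  return Boolean(params) && (params.confirm === true || params.confirm === 'true');
}

// `channel` only needs a purgeQueue(queue) method that resolves to
// { messageCount }, the shape amqplib's channel.purgeQueue resolves with.
async function purgeDlq(channel, queueName, params) {
  if (!isPurgeConfirmed(params)) {
    // Refuse before touching the broker.
    return { status: 400, body: { error: 'confirm=true is required' } };
  }
  const { messageCount } = await channel.purgeQueue(queueName);
  return { status: 200, body: { purged: messageCount } };
}

// In-memory stand-in for a channel holding seven dead-lettered messages.
const fakeChannel = {
  pending: 7,
  async purgeQueue() {
    const messageCount = this.pending;
    this.pending = 0;
    return { messageCount };
  },
};

purgeDlq(fakeChannel, 'partner_tasks_failed', { confirm: true })
  .then((result) => console.log(result.status, result.body.purged)); // 200 7
```

Checking the confirmation before calling the broker means an unconfirmed request can never delete messages.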

Web Dashboard Architecture

graph TD
    Dashboard[dlq-monitor.html]
    
    subgraph HTML Structure
        Stats[Statistics Grid<br/>6 cards:<br/>DLQ Messages, Failed Tasks<br/>Processing, Downloaded<br/>Processed, Archived]
        Actions[Actions Section<br/>Refresh, Process DLQ<br/>Dry Run, Purge]
        Failures[Failures List<br/>Recent 20 failures<br/>Error details, Category badge<br/>Retry/Archive buttons]
    end
    
%%     subgraph CSS Styling
%%         Gradient[Gradient background]
%%         Cards[Card layouts Grid]
%%         Colors[Color coding:<br/>Red=danger, Green=success<br/>Yellow=warning, Blue=info]
%%         Responsive[Responsive design]
%%     end
    
    subgraph JavaScript Logic
        Refresh[refreshStats<br/>fetch /api/dlq/partner_tasks/stats]
        ProcessDLQ[processDLQ dryRun<br/>fetch /api/dlq/partner_tasks/process]
        RetryAll[retryAll queueName<br/>fetch /api/dlq/:queueName/retryAll]
        RetryByPos[retryByPosition<br/>fetch /api/dlq/:queueName/retryByPosition]
        RetryByHeader[retryByHeader<br/>fetch /api/dlq/:queueName/retryByHeader]
        PurgeDLQ[purgeDLQ<br/>fetch /api/dlq/partner_tasks/purge]
        Categorize[categorizeError<br/>Pattern matching client-side]
        AutoRefresh[Auto-refresh 30s<br/>setInterval refreshStats 30000]
    end
    
    Dashboard --> Stats
    Dashboard --> Actions
    Dashboard --> Failures
    Dashboard --> Refresh
    Dashboard --> ProcessDLQ
    Dashboard --> RetryAll
    Dashboard --> RetryByPos
    Dashboard --> RetryByHeader
    Dashboard --> PurgeDLQ
    Dashboard --> Categorize
    Dashboard --> AutoRefresh
    
    Refresh -->|Updates| Stats
    Refresh -->|Populates| Failures

Data Models

erDiagram
    Partner ||--o{ PartnerLogTracker : "partnerId"
    Customer ||--o{ PartnerLogTracker : "customerId"
    
    PartnerLogTracker {
        ObjectId _id
        String logFileName
        ObjectId partnerId
        ObjectId customerId
        String status
        String errorMessage
        Number retryCount
        Date processingStartedAt
        Date updatedAt
        Date archivedAt
        String archivedReason
        String archivedBy
    }
    
    Partner {
        String name
        String code
    }
    
    Customer {
        String name
        String email
    }
    
    DLQMessage {
        Buffer content
        Object properties
        Object fields
    }
    
    DLQMessage ||--|| Properties : has
    DLQMessage ||--|| Fields : has
    
    Properties {
        Date timestamp
        Object headers
    }
    
    Fields {
        Number deliveryTag
        String routingKey
        Boolean redelivered
    }

Status Values:

  • downloaded → Initial state
  • processing → Currently being processed
  • processed → Successfully completed
  • failed → Processing failed; once retries are exhausted the message is routed to the DLQ
  • archived → Manually archived from DLQ
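
One way to read these status values is as a small state machine. The sketch below is an interpretation, not the project's code: the diagrams only confirm that a retry resets a tracker to `downloaded` and that archiving sets `archived`; the other transitions, and the names `STATUS_TRANSITIONS` and `canTransition`, are assumptions.

```javascript
// Allowed status transitions, inferred from the diagrams in this document.
const STATUS_TRANSITIONS = {
  downloaded: ['processing'],
  processing: ['processed', 'failed'],
  // A DLQ retry resets the tracker to downloaded; archiving is terminal.
  failed: ['downloaded', 'archived'],
  processed: [],
  archived: [],
};

// Return true when a tracker may move from one status to another.
function canTransition(from, to) {
  if (!Object.prototype.hasOwnProperty.call(STATUS_TRANSITIONS, from)) return false;
  return STATUS_TRANSITIONS[from].includes(to);
}

console.log(canTransition('failed', 'downloaded')); // true
console.log(canTransition('archived', 'processing')); // false
```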

DLQ Message Headers:

  • x-retry-count - Number of retry attempts
  • x-death - Death information from RabbitMQ
  • x-retry-from-dlq - Flag indicating manual retry
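
When a message is requeued from the DLQ, these headers could be built roughly as follows. `buildRetryHeaders` is a hypothetical helper, and incrementing `x-retry-count` on a manual retry (as opposed to resetting it) is an assumption. Existing headers, including the `x-death` entry written by RabbitMQ, are carried over unchanged.

```javascript
// Build the headers for a message being requeued from the DLQ.
function buildRetryHeaders(headers) {
  const existing = headers || {};
  // Treat a missing or non-numeric count as zero.
  const previousCount = Number(existing['x-retry-count']) || 0;
  return {
    ...existing,
    'x-retry-count': previousCount + 1,
    'x-retry-from-dlq': true,
  };
}

console.log(buildRetryHeaders({ 'x-retry-count': 2 }));
// { 'x-retry-count': 3, 'x-retry-from-dlq': true }
```

The result would typically be passed as the `headers` option when the message is published back to the main queue.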

Security Flow

flowchart TD
    Request[HTTP Request<br/>Header: Authorization Bearer token]
    
    Request --> Router[Express Router]
    Router --> Auth[authAllowAdmin Middleware]
    
    Auth --> Verify{Verify JWT token}
    
    Verify -->|Invalid| Unauth[401 Unauthorized]
    Verify -->|Valid| Role{Check Role}
    
    Role -->|Admin| Allow[req.user = decoded<br/>Allow access]
    Role -->|User| Forbid1[403 Forbidden]
    Role -->|Other| Forbid2[403 Forbidden]
    
    Allow --> Controller[Partner DLQ Controller<br/>Authorized Access]
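
The checks in this flow could be sketched as an Express-style middleware. This is not the actual `authAllowAdmin` implementation: the factory `makeAuthAllowAdmin`, the injected `verifyToken` function, the `role === 'admin'` test, and the error bodies are all assumptions. `verifyToken` stands in for whatever JWT verification the project uses and is expected to throw on an invalid token.

```javascript
// Build an admin-only middleware around an injected token verifier.
function makeAuthAllowAdmin(verifyToken) {
  return function authAllowAdmin(req, res, next) {
    const header = (req.headers && req.headers.authorization) || '';
    const [scheme, token] = header.split(' ');
    // Missing or malformed Authorization header.
    if (scheme !== 'Bearer' || !token) {
      return res.status(401).json({ error: 'Unauthorized' });
    }
    let decoded;
    try {
      decoded = verifyToken(token);
    } catch (err) {
      // Invalid or expired token.
      return res.status(401).json({ error: 'Unauthorized' });
    }
    // Any role other than admin is rejected.
    if (!decoded || decoded.role !== 'admin') {
      return res.status(403).json({ error: 'Forbidden' });
    }
    req.user = decoded;
    return next();
  };
}

// Toy verifier for illustration only: accepts two hard-coded tokens.
function toyVerify(token) {
  if (token === 'admin-token') return { id: 1, role: 'admin' };
  if (token === 'user-token') return { id: 2, role: 'user' };
  throw new Error('invalid token');
}

const authAllowAdmin = makeAuthAllowAdmin(toyVerify);
```

Injecting the verifier keeps the 401/403 branching testable without real tokens or secrets.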

Together, these diagrams cover the Partner DLQ system's components, message flow, error categorization, API endpoints, dashboard, data models, and authentication flow.