agmission/Development/server/docs/PARTNER_INTEGRATION_ARCHITECTURE.md


Partner Integration Architecture Documentation

Recent Updates Summary (August 2025)

Binary Processing Architecture Enhancement

  • SatLocBinaryProcessor: New wrapper around proven SatLocLogParser

    • 100% parsing success rate (21,601/21,601 records)
    • Enhanced application statistics calculation
    • Memory-efficient processing delegation
    • Comprehensive spray/environmental metrics
  • File Download Integration: Enhanced polling worker functionality

    • Downloads and stores partner files locally before processing
    • Separates file acquisition from file processing
    • Improved reliability and error handling
    • Local file path tracking in PartnerLogTracker

Worker Responsibilities Update

  • Partner Sync Worker:

    • Consumes UPLOAD_PARTNER_JOB and PROCESS_PARTNER_LOG tasks from partner_tasks queue
    • UPLOAD_PARTNER_JOB: calls partnerSyncService.uploadJobToPartner() → stores extJobId on JobAssign, sets status = UPLOADED
    • PROCESS_PARTNER_LOG: claims PartnerLogTracker atomically, runs SatLocLogParser + SatLocApplicationProcessor → creates ApplicationDetail records
    • Circuit breaker prevents repeated failures on problematic files
    • Uses individual partner system user credentials (no global environment variables)
  • Partner Data Polling Worker:

    • Cron-driven (every 15 min prod / 1 min dev); does not consume queue tasks
    • Queries JobAssign where status = UPLOADED to find jobs successfully sent to partner
    • Groups by partnerCode + customerId; calls partnerService.getAircraftLogs(customerId, aircraftId)
    • Downloads new log files via partnerService.getAircraftLogData(customerId, logId) → stored to SATLOC_STORAGE_PATH
    • Tracks per-log state in PartnerLogTracker (PENDING → DOWNLOADING → DOWNLOADED)
    • Enqueues PROCESS_PARTNER_LOG tasks to partner_tasks queue
    • Periodic cleanup of stuck DOWNLOADING / DOWNLOADED / PROCESSING tracker records
  • Job Worker:

    • Handles only data submitted by internal systems/clients
    • Removed partner task processing (moved to dedicated partner sync worker)
    • Focuses on traditional job processing workflows
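
The circuit breaker mentioned above can be sketched as a small per-file failure counter with a cooldown. This is an illustrative sketch only, not the worker's actual implementation; the class name, thresholds, and in-memory storage are assumptions.

```javascript
// Hypothetical sketch of a per-log-file circuit breaker: after maxFailures
// consecutive failures, the file is skipped until cooldownMs has elapsed.
class FileCircuitBreaker {
  constructor({ maxFailures = 3, cooldownMs = 15 * 60 * 1000 } = {}) {
    this.maxFailures = maxFailures;
    this.cooldownMs = cooldownMs;
    this.failures = new Map(); // logFileName -> { count, lastFailureAt }
  }

  // Returns false while the breaker is "open" for this file.
  canProcess(logFileName, now = Date.now()) {
    const entry = this.failures.get(logFileName);
    if (!entry || entry.count < this.maxFailures) return true;
    if (now - entry.lastFailureAt >= this.cooldownMs) {
      this.failures.delete(logFileName); // half-open: allow one retry
      return true;
    }
    return false;
  }

  recordFailure(logFileName, now = Date.now()) {
    const entry = this.failures.get(logFileName) || { count: 0, lastFailureAt: 0 };
    entry.count += 1;
    entry.lastFailureAt = now;
    this.failures.set(logFileName, entry);
  }

  recordSuccess(logFileName) {
    this.failures.delete(logFileName);
  }
}
```

The key property is that one repeatedly failing file cannot starve the rest of the queue: it is skipped until its cooldown expires, then given a single retry.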

Queue Architecture Update

  • Dedicated Partner Queue: partner_tasks (production) / dev_partner_tasks (development)
  • DLQ: partner_tasks_failed — dead-letter queue; managed via global /api/dlq/:queueName/* endpoints (Step 8)
  • Job Upload Triggers: After successful job upload to partner, polling worker discovers and processes returned log files
  • Task Types: upload_partner_job, process_partner_log, process_partner_data_file (see PartnerTasks in helpers/constants.js)
  • Polling: cron in partner_data_polling_worker.js — every 15 min (prod) / 1 min (dev); not queue-driven

Model Changes

  • Job Assignment Model (model/job_assign.js): Fields relevant to partner integration:
    • extJobId: External job ID returned by partner after successful upload
    • notes: Optional free-text notes
    • status: AssignStatus enum — NEW=0, DOWNLOADED=1, UPLOADED=2
    • partnerAircraftId is on the assigned User's partnerInfo.partnerAircraftId (not on JobAssign directly)
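
The enum values above can be expressed as a small frozen map; the helper below is an illustrative sketch (the actual model code in model/job_assign.js may differ).

```javascript
// Mirrors the documented AssignStatus values; freeze/helper style is assumed.
const AssignStatus = Object.freeze({ NEW: 0, DOWNLOADED: 1, UPLOADED: 2 });

// The polling worker only looks at assignments already sent to the partner.
function isPollable(jobAssign) {
  return jobAssign.status === AssignStatus.UPLOADED;
}
```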

API Endpoint Updates

  • Partner Management APIs: Updated to use RESTful conventions with consistent :id parameters

    • GET /api/partners (list partners)
    • GET /api/partners/:id (get partner details)
    • POST /api/partners (create partner)
    • PUT /api/partners/:id (update partner)
    • DELETE /api/partners/:id (soft delete partner)
  • Partner System User APIs: Full RESTful CRUD operations

    • GET /api/partners/systemUsers (list all system users)
    • POST /api/partners/systemUsers (create system user)
    • POST /api/partners/systemUsers/testAuth (test partner credentials)
    • GET /api/partners/systemUsers/:id (get system user details)
    • PUT /api/partners/systemUsers/:id (update system user)
    • DELETE /api/partners/systemUsers/:id (soft delete system user)

    Single-record operations also work through the standard user endpoints:

    • GET /api/users/:id (auto-populates partner + parent refs for PARTNER_SYSTEM_USER kind)
    • PUT /api/users/:id (update via generic user handler)
    • DELETE /api/users/:id (soft delete — active: false — for PARTNER_SYSTEM_USER kind)
  • Other Partner Routes:

    • GET /api/partners/customers (list customers for a partner, with subscription info)
    • GET /api/partners/aircraft (get aircraft list from partner API)
    • POST /api/partners/uploadJob (manually trigger job upload to partner)
    • POST /api/partners/syncData (trigger partner data sync)
  • Global DLQ APIs (all queues including partner_tasks):

    • GET /api/dlq/:queueName/list
    • POST /api/dlq/:queueName/retryAll
    • POST /api/dlq/:queueName/retryByPosition
    • POST /api/dlq/:queueName/retryByHeader
    • POST /api/dlq/:queueName/purge

SatLoc Integration Updates

  • API Endpoints: Updated to match actual SatLoc Cloud API structure
    • GET /api/Satloc/IsAlive - Health check
    • GET /api/Satloc/AuthenticateAPIUser - Authentication
    • GET /api/Satloc/GetAircraftList - Get aircraft list
    • GET /api/Satloc/GetAircraftLogs - Get aircraft logs
    • GET /api/Satloc/GetAircraftLogData - Get log data
    • POST /api/Satloc/UploadJobData - Upload job data

Service Layer Updates

  • SatLoc Service: Updated to match actual API endpoints and responses
  • Environment Configuration: Added proper SatLoc configuration parameters

Current Architecture Status (Feb 2026)

This is the authoritative current-state summary. See individual sections below for diagrams and detail.

Active Partners

| Code | Service File | Status |
|------|--------------|--------|
| SATLOC | services/satloc_service.js | Active |
| AGIDRONEX | (not yet implemented) | 🔲 Planned (config stub exists in helpers/partner_config.js) |

Service Layer

  • services/base_partner_service.js — abstract base; defines contract: uploadJobDataToAircraft, healthCheck, getAircraftList, getAircraftLogs, getStoragePath, resolveLogFilePath
  • helpers/partner_service_factory.js — singleton factory with instance cache; used by all workers and services/partner_sync_service.js
  • services/partner_sync_service.js — orchestrates upload/sync flows; used by partner_sync_worker.js
  • services/satloc_service.js — full SatLoc implementation; Redis-backed auth cache, per-customer PartnerSystemUser credential lookup

Queue System

| Queue | Env Var | Dev Name | Purpose |
|-------|---------|----------|---------|
| Partner tasks | QUEUE_NAME_PARTNER | dev_partner_tasks | All partner operations |
| DLQ | (auto) | dev_partner_tasks_failed | Failed messages |

Task types (PartnerTasks in helpers/constants.js):

  • UPLOAD_PARTNER_JOB — upload job to partner aircraft
  • PROCESS_PARTNER_LOG — process a downloaded log file
  • PROCESS_PARTNER_DATA_FILE — process already-downloaded local file

Note: Polling is cron-driven; there is no POLL_PARTNER_DATA queue task.
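
A queued task can be thought of as a typed envelope validated against the constants above. The builder below is a hypothetical sketch; the actual enqueue code lives in the workers, and the envelope shape here is an assumption.

```javascript
// Lowercase wire values match the task types listed in helpers/constants.js.
const PartnerTasks = Object.freeze({
  UPLOAD_PARTNER_JOB: 'upload_partner_job',
  PROCESS_PARTNER_LOG: 'process_partner_log',
  PROCESS_PARTNER_DATA_FILE: 'process_partner_data_file',
});

// Hypothetical helper: reject unknown task types before publishing.
function buildPartnerTask(type, payload) {
  if (!Object.values(PartnerTasks).includes(type)) {
    throw new Error(`Unknown partner task type: ${type}`);
  }
  return { type, payload, enqueuedAt: new Date().toISOString() };
}
```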

Workers

| Worker | Responsibility |
|--------|----------------|
| partner_data_polling_worker.js | Cron polls partner APIs (15 min prod / 1 min dev), downloads log files, creates PartnerLogTracker, enqueues PROCESS_PARTNER_LOG |
| partner_sync_worker.js | Consumes partner_tasks queue; handles uploads and log processing |
| job_worker.js | Internal job processing only; no partner tasks |

Tracking Models

  • model/partner_log_tracker.js — per-log-file lifecycle: PENDING → DOWNLOADING → DOWNLOADED → PROCESSING → PROCESSED/FAILED/SKIPPED
  • model/task_tracker.js — per-queue-task lifecycle (added Phase 2, Jan 2026): QUEUED → PROCESSING → COMPLETED/FAILED; supports retry chain tracking, stuck-task detection
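
The PartnerLogTracker lifecycle above implies a transition table. The sketch below is illustrative only; in particular, the retry edges out of FAILED are an assumption, not documented behavior of the model.

```javascript
// Allowed transitions derived from the documented lifecycle
// PENDING → DOWNLOADING → DOWNLOADED → PROCESSING → PROCESSED/FAILED/SKIPPED.
const LOG_TRANSITIONS = {
  PENDING: ['DOWNLOADING'],
  DOWNLOADING: ['DOWNLOADED', 'FAILED'],
  DOWNLOADED: ['PROCESSING', 'FAILED'],
  PROCESSING: ['PROCESSED', 'FAILED', 'SKIPPED'],
  PROCESSED: [],
  FAILED: ['DOWNLOADING', 'PROCESSING'], // assumed retry paths
  SKIPPED: [],
};

function canTransition(from, to) {
  return (LOG_TRANSITIONS[from] || []).includes(to);
}
```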

Auth/Credential Flow

  1. PartnerSystemUser record (kind: PARTNER_SYSTEM_USER) stores per-customer credentials
  2. satloc_service.getCachedAuth(customerId) — checks Redis first; authenticates via SatLoc API on miss; caches JWT with TTL; auto-retries once on expired cache
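
The cache-aside flow in step 2 can be sketched as below, with an in-memory Map standing in for Redis and `authenticate()` standing in for the SatLoc AuthenticateAPIUser call. Names, key format, and TTL are illustrative assumptions.

```javascript
// Minimal cache-aside sketch of getCachedAuth: check cache, authenticate on
// miss, store the token with a TTL. Dependencies are injected for clarity.
async function getCachedAuth(customerId, { cache, authenticate, ttlMs = 30 * 60 * 1000, now = Date.now }) {
  const key = `satloc:auth:${customerId}`; // hypothetical key format
  const hit = cache.get(key);
  if (hit && hit.expiresAt > now()) return hit.token; // cache hit, token still valid
  const token = await authenticate(customerId); // cache miss: call partner API
  cache.set(key, { token, expiresAt: now() + ttlMs });
  return token;
}
```

The real service also retries once when a cached token turns out to be expired server-side; that path is omitted here for brevity.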

DLQ Recovery

All DLQ operations use queue-native global API (no MongoDB coupling):

POST /api/dlq/partner_tasks/retryAll
POST /api/dlq/partner_tasks/retryByPosition  { "position": 0 }
POST /api/dlq/partner_tasks/retryByHeader    { "headerName": "x-partner-code", "headerValue": "SATLOC" }

Overview

This document provides comprehensive design diagrams and documentation for the multi-partner integration system in the AgMission platform. The system is designed to handle job assignments, data downloads, and processing for multiple partners including Satloc and future integrations.

User Architecture Design

Internal vs Partner System Users

The partner integration system employs a dual-user architecture that separates assignment concerns from API credential management:

  1. Internal Users: Regular AgMission users who receive job assignments

    • Used for all job assignments in the JobAssign collection
    • Maintain normal user permissions and access patterns
    • Assignment API accepts only internal user IDs in the uid field
  2. Partner System Users: Credential-only records for external API access

    • Store partner-specific authentication credentials (API keys, tokens, etc.)
    • Used exclusively by workers for communicating with partner external APIs
    • Never used directly for job assignments
    • Managed through /api/partners/systemUsers (collection) or /api/users/:id (single record)
    • GET /api/users/:id populates partner + parent refs; DELETE /api/users/:id performs soft delete

Assignment Flow

graph LR
    A[Job Assignment Request] --> B[Internal User ID]
    B --> C[JobAssign Record]
    C --> D[Partner Detection]
    D --> E[Worker Queue]
    E --> F[Partner System User Credentials]
    F --> G[External Partner API]

Key Principle: Job assignments always use internal user IDs, while partner system users provide the credentials needed for external API communication.

Table of Contents

  1. User Architecture Design
  2. System Architecture Overview
  3. Current State Analysis
  4. Enhanced Architecture Design
  5. Data Flow Diagrams
  6. Database Schema
  7. API Design
  8. Sequence Diagrams
  9. Error Handling and Retry Logic
  10. Monitoring and Observability
  11. Deployment Architecture

System Architecture Overview

Current Architecture

Reflects actual implementation (Feb 2026).

graph TB
    subgraph "Job Assignment  (controllers/job.js)"
        A["POST /api/jobs/assign"] --> B["Create JobAssign<br/>status = NEW"]
        B --> C{Partner aircraft?}
        C -->|Yes| D[checkPartnerAPIHealth]
        D -->|API live| E["uploadJobToPartner<br/>(immediate, in transaction)"]
        D -->|API offline or fails| F["Queue UPLOAD_PARTNER_JOB<br/>→ partner_tasks"]
        E --> G["JobAssign status = UPLOADED<br/>extJobId stored"]
        F --> PQ[(partner_tasks queue)]
        PQ --> SW
        SW[partner_sync_worker] -->|"UPLOAD_PARTNER_JOB"| E
        C -->|No| J["JobAssign status = NEW<br/>(internal only)"]
    end

    subgraph "Data Polling  (partner_data_polling_worker — cron)"
        CRON["Cron<br/>15 min prod / 1 min dev"] --> PA
        PA["Query JobAssign<br/>status = UPLOADED"] --> PG
        PG["Group by partnerCode + customerId"] --> AL
        AL["getAircraftLogs<br/>(per aircraft)"] --> DL
        DL["Download log file<br/>getAircraftLogData<br/>→ SATLOC_STORAGE_PATH"] --> TRK
        TRK["PartnerLogTracker<br/>PENDING → DOWNLOADING → DOWNLOADED"] --> EQ
        EQ["Queue PROCESS_PARTNER_LOG<br/>→ partner_tasks"]
    end

    subgraph "Log Processing  (partner_sync_worker)"
        EQ --> SW2[partner_sync_worker]
        SW2 -->|"PROCESS_PARTNER_LOG<br/>(claim tracker atomically)"| PARSE
        PARSE["SatLocLogParser<br/>+ SatLocApplicationProcessor"] --> OUT
        OUT["ApplicationFile +<br/>ApplicationDetail records"]
        PARSE --> DONE["PartnerLogTracker → PROCESSED"]
        PARSE -->|on error| DLQ["DLQ: partner_tasks_failed"]
    end

    subgraph "SatLoc Cloud API"
        SLAPI["POST /UploadJobData<br/>GET /GetAircraftLogs<br/>GET /GetAircraftLogData"]
    end

    E --> SLAPI
    AL --> SLAPI
    DL --> SLAPI

    style A fill:#e1f5fe
    style CRON fill:#fff9c4
    style DONE fill:#d4edda
    style DLQ fill:#f8d7da

Multi-Partner Architecture (Actual Implementation)

graph TB
    subgraph "Partner Service Layer"
        PSF["helpers/partner_service_factory.js<br/>(singleton, instance cache)"]
        BASE["services/base_partner_service.js<br/>(abstract base)"]
        SLSVC["services/satloc_service.js<br/>(SatLoc impl, Redis auth cache)"]
        PSS["services/partner_sync_service.js<br/>(orchestrates upload/sync)"]
        PSF --> SLSVC
        SLSVC --> BASE
        PSS --> PSF
    end

    subgraph "Workers"
        PW["partner_data_polling_worker<br/>(cron-driven)"]
        SW["partner_sync_worker<br/>(queue consumer)"]
        PW --> PSF
        SW --> PSS
    end

    subgraph "Queue"
        Q[("partner_tasks queue<br/>dev_partner_tasks in dev")]
        DLQ[("partner_tasks_failed DLQ")]
    end

    subgraph "External APIs"
        SLAPI["SatLoc Cloud API<br/>https://satloccloudfc.com/api/Satloc"]
        FUTURE["Future Partners<br/>(AGIDRONEX, etc)"]  
    end

    subgraph "Data"
        DB[(MongoDB)]
        FS["Local File Storage<br/>SATLOC_STORAGE_PATH"]
        REDIS[(Redis auth cache)]
    end

    PW -->|"1. getAircraftLogs"| SLAPI
    PW -->|"2. getAircraftLogData"| SLAPI
    PW -->|"3. save file"| FS
    PW -->|"4. Queue PROCESS_PARTNER_LOG"| Q
    Q -->|consume| SW
    SW -->|"UPLOAD_PARTNER_JOB: POST /UploadJobData"| SLAPI
    SW -->|"PROCESS_PARTNER_LOG: read file"| FS
    SW -->|"write ApplicationDetail"| DB
    SW -->|on error| DLQ
    SLSVC <-->|JWT cache| REDIS

    style PSF fill:#e8f5e8
    style Q fill:#fff9c4
    style DLQ fill:#f8d7da
    style SLAPI fill:#e3f2fd

⚠️ The sections below (Current State Analysis through Implementation Roadmap) are the original design specification from the initial architecture phase (July-August 2025). They describe intended design patterns and may not match the current implementation. For the current state, see the Current Architecture Status section above.

Original Design: Enhanced Multi-Partner Architecture

graph TB
    subgraph "Core Platform"
        A[Job Management] --> B[Partner Registry]
        B --> C[Partner Service Factory]
        C --> D[Partner Adapters]
        
        E[Enhanced JobAssign] --> F[Sync Queue System]
        F --> G[Data Polling Service]
        G --> H[Data Processing Pipeline]
    end
    
    subgraph "Partner Services"
        D --> I[Satloc Service]
        D --> J[Partner 2 Service]
        D --> K[Future Partner Services]
    end
    
    subgraph "External APIs"
        I --> L[Satloc API]
        J --> M[Partner 2 API]
        K --> N[Partner N API]
    end
    
    subgraph "Data Storage"
        H --> O[Enhanced Application Schema]
        H --> P[Partner Metadata Store]
        E --> Q[Extended JobAssign Schema]
    end
    
    style A fill:#e8f5e8
    style F fill:#fff3e0
    style O fill:#f3e5f5

Current State Analysis

Original pre-implementation analysis — kept for historical context.

Existing Components

1. Job Assignment Flow

  • Controller: controllers/job.js - assign_post() function
  • Model: model/job_assign.js - Simple assignment tracking
  • Process: Store assignments in job_assign collection → Clients poll for assignments

2. Job Download Flow

  • Controller: controllers/export.js - newJobs_post(), downloadJob_post()
  • Process: Clients poll → Download job data → Update assignment status

3. Data Processing Flow

  • Collections: application, application_file, application_detail
  • Queue: RabbitMQ coordination
  • Worker: job_worker.js handles data processing

Current Limitations

  1. Single Integration Pattern: Designed for internal workflow only
  2. No Partner Abstraction: Direct API calls without abstraction layer
  3. Limited Retry Logic: Basic error handling without sophisticated retry
  4. No State Tracking: Minimal sync state management
  5. Polling Only: No push notification capabilities

Enhanced Architecture Design

Original core design components (design spec).

Core Components

1. Partner Service Interface

interface PartnerService {
  getPartnerCode(): string;
  assignJob(job: Job, aircraft: Aircraft): Promise<PartnerJobResult>;
  downloadFlightData(aircraftId: string, jobId: string): Promise<FlightDataFile[]>;
  uploadJobDefinition(job: Job): Promise<string>;
  processMissionData(data: any): Promise<MissionResult>;
  syncJobStatus(externalJobId: string): Promise<JobStatus>;
  convertToInternalFormat(data: any, format: string): Promise<InternalData[]>;
}

2. Partner Registry

class PartnerRegistry {
  private partners: Map<string, PartnerService> = new Map();
  
  register(partnerCode: string, service: PartnerService): void;
  get(partnerCode: string): PartnerService | undefined;
  getAll(): PartnerService[];
  isPartnerSupported(partnerCode: string): boolean;
}
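
A minimal JavaScript rendering of the registry contract above, with a stub service, shows the intended usage (the stub service and codes are illustrative):

```javascript
// Plain-JS version of the PartnerRegistry sketch, plus a stub PartnerService.
class PartnerRegistry {
  constructor() { this.partners = new Map(); }
  register(code, service) { this.partners.set(code, service); }
  get(code) { return this.partners.get(code); }
  getAll() { return [...this.partners.values()]; }
  isPartnerSupported(code) { return this.partners.has(code); }
}

const registry = new PartnerRegistry();
registry.register('satloc', { getPartnerCode: () => 'satloc' });
```

Callers resolve a service by partner code at runtime, so adding a partner is a registration call rather than a code change in the assignment flow.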

3. Enhanced Data Models

interface EnhancedJobAssign {
  _id: ObjectId;
  user: ObjectId;
  job: ObjectId;
  status: AssignStatus;
  date: Date;
  
  // Partner integration fields
  partnerCode: 'internal' | 'satloc' | string;
  externalJobId?: string;
  partnerMetadata?: any;
  
  // Sync state tracking
  syncState: {
    jobUpload: SyncStatus;
    dataPolling: SyncStatus;
  };
  
  // Retry configuration
  retryConfig: RetryConfig;
}

interface SyncStatus {
  status: 'pending' | 'syncing' | 'synced' | 'failed';
  attempts: number;
  lastAttempt?: Date;
  lastSuccess?: Date;
  error?: string;
}

Data Flow Diagrams

1. Job Assignment Flow with Partners

Shows actual implementation flow in controllers/job.js.

sequenceDiagram
    participant A as User (FE/API Client)
    participant JC as Job Controller
    participant PSS as partnerSyncService
    participant SA as SatLoc API
    participant Q as partner_tasks queue
    participant DB as Database

    A->>JC: POST /api/jobs/assign
    JC->>DB: Validate job + users
    DB-->>JC: Job + assignment data
    JC->>DB: Create JobAssign (status=NEW)

    Note over JC,PSS: Partner integration detected from user context
    JC->>PSS: checkPartnerAPIHealth(partnerCode)

    alt API is live
        PSS->>SA: GET /IsAlive
        SA-->>PSS: alive
        JC->>PSS: uploadJobToPartner(assignId)
        PSS->>SA: POST /UploadJobData
        SA-->>PSS: extJobId
        PSS->>DB: JobAssign status=UPLOADED, extJobId stored
        JC-->>A: Assignment complete
    else API offline or upload fails
        JC->>Q: Queue UPLOAD_PARTNER_JOB {assignId}
        JC-->>A: Assignment complete (upload queued)
        Note over Q,DB: partner_sync_worker processes later
        Q->>PSS: UPLOAD_PARTNER_JOB
        PSS->>SA: POST /UploadJobData
        SA-->>PSS: extJobId
        PSS->>DB: JobAssign status=UPLOADED, extJobId stored
    end

2. Data Polling and Processing Flow

Shows actual cron-driven polling in partner_data_polling_worker.js.

sequenceDiagram
    participant CR as Cron (15min/1min)
    participant PW as partner_data_polling_worker
    participant DB as Database
    participant PSS as partnerSyncService
    participant SA as SatLoc API
    participant FS as Local Storage
    participant Q as partner_tasks queue
    participant SW as partner_sync_worker

    CR->>PW: trigger pollAllPartnerSystems()
    PW->>DB: Query JobAssign WHERE status=UPLOADED
    DB-->>PW: Uploaded assignments (grouped by partnerCode+customerId)

    loop per aircraft
        PW->>SA: GET /GetAircraftLogs (customerId, aircraftId)
        SA-->>PW: log list
        PW->>DB: Filter out already-processed logs (PartnerLogTracker processed=true)

        loop per new log
            PW->>DB: Upsert PartnerLogTracker (PENDING → DOWNLOADING)
            PW->>SA: GET /GetAircraftLogData (customerId, logId)
            SA-->>PW: log file binary
            PW->>FS: Save to SATLOC_STORAGE_PATH
            PW->>DB: PartnerLogTracker → DOWNLOADED
            PW->>Q: Queue PROCESS_PARTNER_LOG {logId, logFileName, customerId, aircraftId, taskId, executionId}
            PW->>DB: PartnerLogTracker.enqueuedAt set
        end
    end

    Q->>SW: PROCESS_PARTNER_LOG
    SW->>DB: TaskTracker idempotency check (claim or skip)
    SW->>DB: PartnerLogTracker → PROCESSING (atomic claim)
    SW->>FS: Read log file
    SW->>SW: SatLocLogParser + SatLocApplicationProcessor
    SW->>DB: Save ApplicationFile + ApplicationDetail records
    SW->>DB: PartnerLogTracker → PROCESSED
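
The "atomic claim" step in the diagram above is essentially a compare-and-set: update the tracker to PROCESSING only if it is still DOWNLOADED, in one operation (in MongoDB this would be a single filtered findOneAndUpdate). The sketch below illustrates the pattern against an in-memory store; it is not the actual model code.

```javascript
// In-memory stand-in for something like:
//   PartnerLogTracker.findOneAndUpdate(
//     { logId, status: 'DOWNLOADED' },
//     { $set: { status: 'PROCESSING' } })
// Only the first claimer gets the tracker; later claimers get null.
function claimTracker(store, logId) {
  const tracker = store.get(logId);
  if (!tracker || tracker.status !== 'DOWNLOADED') return null; // already claimed or not ready
  tracker.status = 'PROCESSING';
  return tracker;
}
```

Because the filter and the update happen as one step, two workers consuming a duplicate PROCESS_PARTNER_LOG message cannot both process the same file.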

3. Error Handling and Retry Flow

flowchart TD
    A[Operation Start] --> B{Operation Success?}
    B -->|Yes| C[Update Success State]
    B -->|No| D{Max Retries Reached?}
    D -->|No| E[Calculate Backoff Delay]
    E --> F[Increment Retry Count]
    F --> G[Schedule Retry]
    G --> H[Wait for Delay]
    H --> A
    D -->|Yes| I[Mark as Failed]
    I --> J[Send Alert]
    J --> K[Dead Letter Queue]
    C --> L[Continue Normal Flow]
    
    style C fill:#d4edda
    style I fill:#f8d7da
    style K fill:#fff3cd

Database Schema

Enhanced JobAssign Schema

const JobAssignSchema = new Schema({
  _id: ObjectId,
  user: { type: ObjectId, ref: 'User', required: true },
  job: { type: Number, ref: 'Job', required: true },
  status: { 
    type: Number, 
    enum: [0, 1, 2], // 0: pending, 1: downloaded, 2: completed
    default: 0 
  },
  date: { type: Date, default: Date.now },
  
  // Partner integration fields
  partnerCode: { 
    type: String, 
    enum: ['internal', 'satloc', 'other'], 
    default: 'internal' 
  },
  externalJobId: { type: String, sparse: true },
  partnerMetadata: { type: Schema.Types.Mixed },
  
  // Sync state tracking
  syncState: {
    jobUpload: {
      status: { 
        type: String, 
        enum: ['pending', 'syncing', 'synced', 'failed'], 
        default: 'pending' 
      },
      attempts: { type: Number, default: 0 },
      lastAttempt: Date,
      lastSuccess: Date,
      error: String
    },
    dataPolling: {
      status: { 
        type: String, 
        enum: ['idle', 'polling', 'synced', 'failed'], 
        default: 'idle' 
      },
      attempts: { type: Number, default: 0 },
      lastAttempt: Date,
      lastSuccess: Date,
      lastDataCheck: Date,
      error: String
    }
  },
  
  // Retry configuration
  retryConfig: {
    maxAttempts: { type: Number, default: 5 },
    backoffMultiplier: { type: Number, default: 2 },
    baseDelay: { type: Number, default: 5000 }
  }
});

// Indexes for performance
JobAssignSchema.index({ user: 1, status: 1 });
JobAssignSchema.index({ partnerCode: 1, 'syncState.jobUpload.status': 1 });
JobAssignSchema.index({ partnerCode: 1, 'syncState.dataPolling.status': 1 });

Enhanced Application Schema

const EnhancedApplicationSchema = new Schema({
  // Existing fields
  jobId: { type: Number, required: true },
  fileName: { type: String, required: true },
  fileSize: { type: Number, required: true },
  status: { type: Number, default: 1 },
  
  // Partner-specific fields
  partnerCode: { type: String, default: 'internal' },
  externalJobId: { type: String, sparse: true },
  partnerMetadata: { type: Schema.Types.Mixed },
  dataFormat: { 
    type: String, 
    enum: ['internal', 'satloc', 'other'], 
    default: 'internal' 
  },
  
  // Processing tracking
  processingStage: { 
    type: String, 
    enum: ['uploaded', 'parsing', 'processing', 'completed', 'failed'],
    default: 'uploaded'
  },
  processingStarted: Date,
  processingCompleted: Date,
  processingErrors: [String],
  
  // Performance metrics
  processingDuration: Number, // in milliseconds
  recordsProcessed: Number,
  conversionMetrics: {
    originalFormat: String,
    conversionTime: Number,
    dataLoss: Number // percentage
  }
});

API Design

Enhanced Job Assignment API

// POST /api/jobs/:jobId/assign
{
  "dlOp": { "type": 1 },
  "asUsers": [
    {
      "uid": "internal_user_id",  // Always use internal user IDs for assignments
      "partnerCode": "satloc",    // Optional, defaults to 'internal'
      "partnerConfig": {          // Partner-specific configuration
        "aircraftId": "AC001",
        "priority": "high"
      }
    }
  ],
  "avUsers": [...]
}

// Response
{
  "ok": true,
  "assignments": [
    {
      "userId": "internal_user_id",  // Assignment uses internal user ID
      "partnerCode": "satloc",
      "externalJobId": "satloc_job_123",
      "syncStatus": "synced"
    }
  ],
  "errors": []
}

Partner Management API

// GET /api/partners
[
  {
    "_id": "partner_id",
    "name": "Satloc",
    "code": "satloc",
    "active": true,
    "capabilities": ["job_upload", "data_download", "real_time_sync"],
    "apiVersion": "v2.1"
  }
]

// GET /api/partners/:partnerId/status
{
  "partnerId": "satloc",
  "status": "online",
  "lastSync": "2025-07-18T10:30:00Z",
  "activeJobs": 15,
  "pendingSyncs": 2,
  "errors": []
}

Enhanced Job Download API

// GET /api/export/newJobs
{
  "internal": [
    {
      "job": { "_id": 123, "name": "Field A" },
      "date": "2025-07-18T09:00:00Z",
      "assignmentId": "assign_123"
    }
  ],
  "partners": {
    "satloc": [
      {
        "job": { "_id": 124, "name": "Field B" },
        "date": "2025-07-18T09:15:00Z",
        "assignmentId": "assign_124",
        "externalJobId": "satloc_job_456",
        "syncStatus": "synced"
      }
    ]
  }
}

// POST /api/export/downloadJob
{
  "jobId": 124,
  "partnerType": "satloc",
  "format": "native" // or "converted"
}

Sequence Diagrams

Complete Job Assignment to Data Processing Flow

Accurate end-to-end sequence reflecting actual code (Feb 2026).

sequenceDiagram
    participant A as Admin
    participant JC as Job Controller
    participant PSS as partnerSyncService
    participant SA as SatLoc API
    participant Q as partner_tasks queue
    participant DB as Database
    participant CR as Cron (15min/1min)
    participant PW as Polling Worker
    participant FS as Local Storage
    participant SW as Sync Worker

    Note over A,DB: ── Job Assignment Phase ──
    A->>JC: POST /api/jobs/assign
    JC->>DB: Create JobAssign (status=NEW)
    JC->>PSS: checkPartnerAPIHealth(partnerCode)
    PSS->>SA: GET /IsAlive

    alt Partner API live — immediate upload
        SA-->>PSS: alive
        JC->>PSS: uploadJobToPartner(assignId)
        PSS->>SA: POST /UploadJobData
        SA-->>PSS: extJobId
        PSS->>DB: JobAssign status=UPLOADED, extJobId saved
        JC-->>A: Assignment complete
    else Partner API offline or upload fails
        JC->>Q: Queue UPLOAD_PARTNER_JOB {assignId}
        JC-->>A: Assignment complete (upload queued)
        Q->>SW: consume UPLOAD_PARTNER_JOB
        SW->>PSS: uploadJobToPartner(assignId)
        PSS->>SA: POST /UploadJobData
        SA-->>PSS: extJobId
        PSS->>DB: JobAssign status=UPLOADED, extJobId saved
    end

    Note over CR,DB: ── Polling Phase (cron, independent of above) ──
    CR->>PW: trigger poll
    PW->>DB: Query JobAssign WHERE status=UPLOADED
    DB-->>PW: assigned aircraft list

    loop per aircraft with UPLOADED assignment
        PW->>SA: GET /GetAircraftLogs (customerId, aircraftId)
        SA-->>PW: available log list
        PW->>DB: Filter already-processed (PartnerLogTracker processed=true)

        loop per new log file
            PW->>DB: Upsert PartnerLogTracker PENDING→DOWNLOADING
            PW->>SA: GET /GetAircraftLogData (customerId, logId)
            SA-->>PW: log binary
            PW->>FS: Save to SATLOC_STORAGE_PATH
            PW->>DB: PartnerLogTracker → DOWNLOADED
            PW->>Q: Queue PROCESS_PARTNER_LOG
        end
    end

    Note over SW,DB: ── Log Processing Phase ──
    Q->>SW: consume PROCESS_PARTNER_LOG
    SW->>DB: TaskTracker idempotency check (skip if already claimed)
    SW->>DB: PartnerLogTracker → PROCESSING (atomic)
    SW->>FS: Read saved log file
    SW->>SW: SatLocLogParser → parse binary records
    SW->>SW: SatLocApplicationProcessor → match to JobAssign
    SW->>DB: Save ApplicationFile + ApplicationDetail records
    SW->>DB: PartnerLogTracker → PROCESSED
    SW->>Q: ack message

Error Handling and Recovery Flow

sequenceDiagram
    participant S as Service
    participant DB as Database
    participant Q as Queue System
    participant M as Monitor
    participant A as Alert System
    
    S->>DB: Operation Attempt
    DB-->>S: Error Response
    S->>DB: Increment Retry Count
    S->>Q: Calculate Backoff Delay
    
    alt Retry Available
        Q->>S: Schedule Retry Task
        Note over S,Q: Exponential Backoff Delay
        Q->>S: Execute Retry
        S->>DB: Retry Operation
    else Max Retries Reached
        S->>DB: Mark as Failed
        S->>M: Report Failure
        M->>A: Send Alert
        M->>Q: Move to Dead Letter Queue
    end

Error Handling and Retry Logic

Retry Strategy Configuration

const RetryConfig = {
  jobUpload: {
    maxAttempts: 5,
    baseDelay: 5000,      // 5 seconds
    maxDelay: 300000,     // 5 minutes
    backoffMultiplier: 2,
    jitter: true
  },
  dataPolling: {
    maxAttempts: 3,
    baseDelay: 10000,     // 10 seconds
    maxDelay: 600000,     // 10 minutes
    backoffMultiplier: 2,
    jitter: false
  },
  dataProcessing: {
    maxAttempts: 3,
    baseDelay: 2000,      // 2 seconds
    maxDelay: 60000,      // 1 minute
    backoffMultiplier: 2,
    jitter: true
  }
};
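
The delay for a given attempt follows from the config shape above: exponential growth from baseDelay, capped at maxDelay, with optional jitter. The "full jitter" variant used below is an assumption; the spec only says jitter is applied.

```javascript
// Delay in ms for attempt n (0-based), per the RetryConfig shape above.
function backoffDelay(cfg, attempt, random = Math.random) {
  const raw = cfg.baseDelay * Math.pow(cfg.backoffMultiplier, attempt);
  const capped = Math.min(raw, cfg.maxDelay);
  // Full jitter (assumed variant): pick uniformly in [0, capped).
  return cfg.jitter ? Math.floor(random() * capped) : capped;
}
```

For dataPolling this yields 10 s, 20 s, 40 s, ... capped at 10 minutes; jittered configs spread retries out to avoid thundering-herd retries against a recovering partner API.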

Error Categories and Handling

| Error Type | Retry Strategy | Alert Level | Action |
|------------|----------------|-------------|--------|
| Network Timeout | Exponential Backoff | Warning | Auto-retry |
| Authentication Error | Fixed Delay | High | Manual intervention |
| Rate Limit | Fixed Delay | Low | Auto-retry |
| Data Format Error | No Retry | High | Log and skip |
| Partner API Down | Long Backoff | Critical | Alert ops team |

Monitoring and Observability

Key Metrics to Track

  1. Assignment Metrics

    • Assignment success rate by partner
    • Time to sync job to partner
    • External job ID generation rate
  2. Sync Metrics

    • Sync queue depth
    • Sync success/failure rates
    • Average sync duration by partner
  3. Data Processing Metrics

    • Data polling frequency
    • Data conversion success rate
    • Processing latency by data format
  4. Error Metrics

    • Error rate by operation type
    • Retry exhaustion rate
    • Dead letter queue depth

Monitoring Dashboard Structure

graph TB
    subgraph "Operations Dashboard"
        A[Partner Health Status]
        B[Active Jobs by Partner]
        C[Sync Queue Metrics]
    end
    
    subgraph "Performance Dashboard"
        D[Assignment Success Rates]
        E[Data Processing Latency]
        F[Error Rates by Partner]
    end
    
    subgraph "Error Dashboard"
        G[Failed Operations]
        H[Retry Statistics]
        I[Dead Letter Queue]
    end
    
    style A fill:#d4edda
    style G fill:#f8d7da
    style C fill:#fff3cd

Deployment Architecture

Production Environment

graph TB
    subgraph "Load Balancer"
        LB[Nginx/ALB]
    end
    
    subgraph "Application Tier"
        A1[App Server 1]
        A2[App Server 2]
        A3[App Server N]
    end
    
    subgraph "Queue Infrastructure"
        R1[Redis Cluster]
        R2[RabbitMQ Cluster]
    end
    
    subgraph "Background Workers"
        W1[Sync Worker Pool]
        W2[Processing Worker Pool]
        W3[Monitor Worker]
    end
    
    subgraph "Data Tier"
        M1[MongoDB Primary]
        M2[MongoDB Secondary]
        M3[MongoDB Arbiter]
    end
    
    subgraph "External Services"
        S1[Satloc API]
        S2[Partner 2 API]
        S3[Partner N API]
    end
    
    LB --> A1
    LB --> A2
    LB --> A3
    
    A1 --> R1
    A1 --> R2
    A1 --> M1
    
    W1 --> R1
    W1 --> S1
    W2 --> M1
    W3 --> R1
    
    style LB fill:#e3f2fd
    style W1 fill:#f3e5f5
    style M1 fill:#e8f5e8

Scalability Considerations

  1. Horizontal Scaling

    • Stateless application servers
    • Worker pool scaling based on queue depth
    • Database read replicas for reporting
  2. Partner Isolation

    • Separate queues per partner type
    • Circuit breaker pattern for partner APIs
    • Independent retry policies
  3. Data Partitioning

    • Partition by partner type
    • Time-based partitioning for historical data
    • Separate indexes for partner queries

Implementation Roadmap

Phase 1: Foundation (Weeks 1-2)

  • Implement Partner Service interface
  • Create Partner Registry
  • Enhance JobAssign schema
  • Basic Satloc service implementation

Phase 2: Queue System (Weeks 3-4)

  • Implement Partner Sync Queue
  • Add retry logic with exponential backoff
  • Create monitoring dashboard
  • Error handling and dead letter queues

Phase 3: Data Processing (Weeks 5-6)

  • Enhanced Application schema
  • Partner data conversion pipeline
  • Performance optimizations
  • Comprehensive testing

Phase 4: Production (Weeks 7-8)

  • Production deployment
  • Monitoring and alerting setup
  • Documentation and training
  • Performance tuning

This architecture provides a robust, scalable foundation for multi-partner integration while maintaining backward compatibility with existing systems.