# Partner Integration Architecture Documentation
## Recent Updates Summary (August 2025)
### Binary Processing Architecture Enhancement
- **SatLocBinaryProcessor**: New wrapper around the proven `SatLocLogParser`
- 100% parsing success rate (21,601/21,601 records)
- Enhanced application statistics calculation
- Memory-efficient processing delegation
- Comprehensive spray/environmental metrics
- **File Download Integration**: Enhanced polling worker functionality
- Downloads and stores partner files locally before processing
- Separates file acquisition from file processing
- Improved reliability and error handling
- Local file path tracking in `PartnerLogTracker`
### Worker Responsibilities Update
- **Partner Sync Worker**:
- Consumes `UPLOAD_PARTNER_JOB` and `PROCESS_PARTNER_LOG` tasks from `partner_tasks` queue
- `UPLOAD_PARTNER_JOB`: calls `partnerSyncService.uploadJobToPartner()` → stores `extJobId` on `JobAssign`, sets `status = UPLOADED`
- `PROCESS_PARTNER_LOG`: claims `PartnerLogTracker` atomically, runs `SatLocLogParser` + `SatLocApplicationProcessor` → creates `ApplicationDetail` records
- Circuit breaker prevents repeated failures on problematic files
- Uses individual partner system user credentials (no global environment variables)
- **Partner Data Polling Worker**:
- Cron-driven (every 15 min prod / 1 min dev); does not consume queue tasks
- Queries `JobAssign` where `status = UPLOADED` to find jobs successfully sent to partner
- Groups by `partnerCode` + `customerId`; calls `partnerService.getAircraftLogs(customerId, aircraftId)`
- Downloads new log files via `partnerService.getAircraftLogData(customerId, logId)` → stored to `SATLOC_STORAGE_PATH`
- Tracks per-log state in `PartnerLogTracker` (PENDING → DOWNLOADING → DOWNLOADED)
- Enqueues `PROCESS_PARTNER_LOG` tasks to `partner_tasks` queue
- Periodic cleanup of stuck DOWNLOADING / DOWNLOADED / PROCESSING tracker records
- **Job Worker**:
- Handles only internal data submitted by internal systems/clients
- Removed partner task processing (moved to dedicated partner sync worker)
- Focuses on traditional job processing workflows
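The circuit breaker mentioned for the sync worker can be sketched as a small per-file failure counter with a cooldown. The class name, threshold, and cooldown values below are illustrative assumptions, not the actual worker code:

```javascript
// Sketch of a per-file circuit breaker: after maxFailures consecutive
// failures on a log file, further processing attempts are rejected until
// a cooldown elapses. All names and defaults here are assumptions.
class FileCircuitBreaker {
  constructor({ maxFailures = 3, cooldownMs = 15 * 60 * 1000 } = {}) {
    this.maxFailures = maxFailures;
    this.cooldownMs = cooldownMs;
    this.state = new Map(); // logFileName -> { failures, openedAt }
  }

  // Returns false while the breaker is open for this file.
  canProcess(fileName, now = Date.now()) {
    const entry = this.state.get(fileName);
    if (!entry || entry.failures < this.maxFailures) return true;
    if (now - entry.openedAt >= this.cooldownMs) {
      this.state.delete(fileName); // half-open: allow one retry
      return true;
    }
    return false;
  }

  recordFailure(fileName, now = Date.now()) {
    const entry = this.state.get(fileName) || { failures: 0, openedAt: 0 };
    entry.failures += 1;
    if (entry.failures >= this.maxFailures) entry.openedAt = now;
    this.state.set(fileName, entry);
  }

  recordSuccess(fileName) {
    this.state.delete(fileName); // any success resets the breaker
  }
}
```

This keeps one repeatedly failing binary file from blocking the rest of the queue while still allowing periodic retries.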
### Queue Architecture Update
- **Dedicated Partner Queue**: `partner_tasks` (production) / `dev_partner_tasks` (development)
- **DLQ**: `partner_tasks_failed` — dead-letter queue; managed via global `/api/dlq/:queueName/*` endpoints (Step 8)
- **Job Upload Triggers**: After a successful job upload to a partner, the polling worker discovers and processes the returned log files
- **Task Types**: `upload_partner_job`, `process_partner_log`, `process_partner_data_file` (see `PartnerTasks` in `helpers/constants.js`)
- **Polling**: cron in `partner_data_polling_worker.js` — every 15 min (prod) / 1 min (dev); not queue-driven
### Model Changes
- **Job Assignment Model** (`model/job_assign.js`): Fields relevant to partner integration:
- `extJobId`: External job ID returned by partner after successful upload
- `notes`: Optional free-text notes
- `status`: `AssignStatus` enum — `NEW=0`, `DOWNLOADED=1`, `UPLOADED=2`
- `partnerAircraftId` lives on the assigned **User**'s `partnerInfo` (not on `JobAssign` directly)
### API Endpoint Updates
- **Partner Management APIs**: Updated to use RESTful conventions with consistent `:id` parameters
- `GET /api/partners` (list partners)
- `GET /api/partners/:id` (get partner details)
- `POST /api/partners` (create partner)
- `PUT /api/partners/:id` (update partner)
- `DELETE /api/partners/:id` (soft delete partner)
- **Partner System User APIs**: Full RESTful CRUD operations
- `GET /api/partners/systemUsers` (list all system users)
- `POST /api/partners/systemUsers` (create system user)
- `POST /api/partners/systemUsers/testAuth` (test partner credentials)
- `GET /api/partners/systemUsers/:id` (get system user details)
- `PUT /api/partners/systemUsers/:id` (update system user)
- `DELETE /api/partners/systemUsers/:id` (soft delete system user)
Single-record operations also work through the standard user endpoints:
- `GET /api/users/:id` (auto-populates `partner` + `parent` refs for `PARTNER_SYSTEM_USER` kind)
- `PUT /api/users/:id` (update via generic user handler)
- `DELETE /api/users/:id` (soft delete — `active: false` — for `PARTNER_SYSTEM_USER` kind)
- **Other Partner Routes**:
- `GET /api/partners/customers` (list customers for a partner, with subscription info)
- `GET /api/partners/aircraft` (get aircraft list from partner API)
- `POST /api/partners/uploadJob` (manually trigger job upload to partner)
- `POST /api/partners/syncData` (trigger partner data sync)
- **Global DLQ APIs** (all queues including `partner_tasks`):
- `GET /api/dlq/:queueName/list`
- `POST /api/dlq/:queueName/retryAll`
- `POST /api/dlq/:queueName/retryByPosition`
- `POST /api/dlq/:queueName/retryByHeader`
- `POST /api/dlq/:queueName/purge`
### SatLoc Integration Updates
- **API Endpoints**: Updated to match actual SatLoc Cloud API structure
- `GET /api/Satloc/IsAlive` - Health check
- `GET /api/Satloc/AuthenticateAPIUser` - Authentication
- `GET /api/Satloc/GetAircraftList` - Get aircraft list
- `GET /api/Satloc/GetAircraftLogs` - Get aircraft logs
- `GET /api/Satloc/GetAircraftLogData` - Get log data
- `POST /api/Satloc/UploadJobData` - Upload job data
### Service Layer Updates
- **SatLoc Service**: Updated to match actual API endpoints and responses
- **Environment Configuration**: Added proper SatLoc configuration parameters
## Current Architecture Status (Feb 2026)
> This is the authoritative current-state summary. See individual sections below for diagrams and detail.
### Active Partners
| Code | Service File | Status |
|------|-------------|--------|
| `SATLOC` | `services/satloc_service.js` | ✅ Active |
| `AGIDRONEX` | *(not yet implemented)* | 🔲 Planned (config stub exists in `helpers/partner_config.js`) |
### Service Layer
- `services/base_partner_service.js` — abstract base; defines contract: `uploadJobDataToAircraft`, `healthCheck`, `getAircraftList`, `getAircraftLogs`, `getStoragePath`, `resolveLogFilePath`
- `helpers/partner_service_factory.js` — singleton factory with instance cache; used by all workers and `services/partner_sync_service.js`
- `services/partner_sync_service.js` — orchestrates upload/sync flows; used by `partner_sync_worker.js`
- `services/satloc_service.js` — full SatLoc implementation; Redis-backed auth cache, per-customer `PartnerSystemUser` credential lookup
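The factory's singleton-with-instance-cache behavior can be illustrated with a minimal sketch; the `SatLocService` stub and registry shape are assumptions, and only the pattern mirrors `helpers/partner_service_factory.js`:

```javascript
// Illustrative singleton factory: one lazily created, cached service
// instance per partner code. The stub service below is not the real one.
class SatLocService {
  constructor() { this.code = 'SATLOC'; }
}

const registry = { SATLOC: SatLocService };
const instances = new Map();

function getPartnerService(partnerCode) {
  const Ctor = registry[partnerCode];
  if (!Ctor) throw new Error(`Unsupported partner: ${partnerCode}`);
  if (!instances.has(partnerCode)) instances.set(partnerCode, new Ctor());
  return instances.get(partnerCode); // same instance on every call
}
```

Caching instances matters here because each service carries state worth reusing across workers, such as the Redis-backed auth cache client.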
### Queue System
| Queue | Env Var | Dev Name | Purpose |
|-------|---------|----------|---------|
| Partner tasks | `QUEUE_NAME_PARTNER` | `dev_partner_tasks` | All partner operations |
| DLQ | *(auto)* | `dev_partner_tasks_failed` | Failed messages |
**Task types** (`PartnerTasks` in `helpers/constants.js`):
- `UPLOAD_PARTNER_JOB` — upload job to partner aircraft
- `PROCESS_PARTNER_LOG` — process a downloaded log file
- `PROCESS_PARTNER_DATA_FILE` — process already-downloaded local file
> Note: Polling is cron-driven; there is no `POLL_PARTNER_DATA` queue task.
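A worker consuming these task types might dispatch on them roughly as below; the wire values match the task names listed under Queue Architecture Update, while the handler names are hypothetical:

```javascript
// Hypothetical dispatch for the three PartnerTasks types. Wire values are
// the ones documented above; handler names are illustrative only.
const PartnerTasks = Object.freeze({
  UPLOAD_PARTNER_JOB: 'upload_partner_job',
  PROCESS_PARTNER_LOG: 'process_partner_log',
  PROCESS_PARTNER_DATA_FILE: 'process_partner_data_file',
});

function dispatch(task, handlers) {
  const handler = {
    [PartnerTasks.UPLOAD_PARTNER_JOB]: handlers.uploadJob,
    [PartnerTasks.PROCESS_PARTNER_LOG]: handlers.processLog,
    [PartnerTasks.PROCESS_PARTNER_DATA_FILE]: handlers.processDataFile,
  }[task.type];
  if (!handler) throw new Error(`Unknown partner task type: ${task.type}`);
  return handler(task);
}
```

An unknown type throws rather than being silently acked, so malformed messages end up in the DLQ instead of disappearing.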
### Workers
| Worker | Responsibility |
|--------|---------------|
| `partner_data_polling_worker.js` | Cron polls partner APIs (15 min prod / 1 min dev), downloads log files, creates `PartnerLogTracker`, enqueues `PROCESS_PARTNER_LOG` |
| `partner_sync_worker.js` | Consumes `partner_tasks` queue; handles uploads and log processing |
| `job_worker.js` | Internal job processing only; no partner tasks |
### Tracking Models
- `model/partner_log_tracker.js` — per-log-file lifecycle: `PENDING → DOWNLOADING → DOWNLOADED → PROCESSING → PROCESSED/FAILED/SKIPPED`
- `model/task_tracker.js` — per-queue-task lifecycle (added Phase 2, Jan 2026): `QUEUED → PROCESSING → COMPLETED/FAILED`; supports retry chain tracking, stuck-task detection
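The atomic claim that moves a tracker from DOWNLOADED to PROCESSING amounts to a compare-and-set on its status. In MongoDB this would be a single `findOneAndUpdate` matching on the old status; the in-memory store below is only for illustration:

```javascript
// Illustrative compare-and-set claim: a tracker moves DOWNLOADED -> PROCESSING
// only if no other worker claimed it first. In Mongoose this corresponds
// roughly to: Model.findOneAndUpdate(
//   { _id, status: 'DOWNLOADED' }, { $set: { status: 'PROCESSING' } })
const trackers = new Map(); // logId -> { status }

function claimForProcessing(logId) {
  const t = trackers.get(logId);
  if (!t || t.status !== 'DOWNLOADED') return false; // missing or already claimed
  t.status = 'PROCESSING';
  return true; // this worker owns the tracker now
}
```

Because the match and the update happen in one step, two workers racing on the same `PROCESS_PARTNER_LOG` message cannot both process the file: exactly one claim succeeds.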
### Auth/Credential Flow
1. `PartnerSystemUser` record (kind: `PARTNER_SYSTEM_USER`) stores per-customer credentials
2. `satloc_service.getCachedAuth(customerId)` — checks Redis first; authenticates via SatLoc API on miss; caches JWT with TTL; auto-retries once on expired cache
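Step 2 can be sketched as a cache-aside lookup; the key format and TTL below are assumptions, the cache and authenticate function are injected to keep the flow visible, and the expired-cache retry described above is omitted for brevity:

```javascript
// Sketch of a Redis-first auth lookup: return a cached JWT when present,
// otherwise authenticate against the partner API and cache the result.
// Key format and TTL are assumptions, not the real implementation.
async function getCachedAuth(customerId, cache, authenticate, ttlSeconds = 3600) {
  const key = `satloc:auth:${customerId}`;
  const cached = await cache.get(key);
  if (cached) return cached; // cache hit: reuse the JWT
  const token = await authenticate(customerId); // e.g. AuthenticateAPIUser on miss
  await cache.set(key, token, ttlSeconds); // TTL keeps stale tokens from lingering
  return token;
}
```

With per-customer keys, each `PartnerSystemUser`'s credentials produce an independent cached token, so one customer's expiry never affects another's.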
### DLQ Recovery
All DLQ operations use queue-native global API (no MongoDB coupling):
```
POST /api/dlq/partner_tasks/retryAll
POST /api/dlq/partner_tasks/retryByPosition { "position": 0 }
POST /api/dlq/partner_tasks/retryByHeader { "headerName": "x-partner-code", "headerValue": "SATLOC" }
```
---
## Overview
This document provides comprehensive design diagrams and documentation for the multi-partner integration system in the AgMission platform. The system is designed to handle job assignments, data downloads, and processing for multiple partners including Satloc and future integrations.
## User Architecture Design
### Internal vs Partner System Users
The partner integration system employs a dual-user architecture that separates assignment concerns from API credential management:
1. **Internal Users**: Regular AgMission users who receive job assignments
- Used for all job assignments in the `JobAssign` collection
- Maintain normal user permissions and access patterns
- Assignment API accepts only internal user IDs in the `uid` field
2. **Partner System Users**: Credential-only records for external API access
- Store partner-specific authentication credentials (API keys, tokens, etc.)
- Used exclusively by workers for communicating with partner external APIs
- Never used directly for job assignments
- Managed through `/api/partners/systemUsers` (collection) or `/api/users/:id` (single record)
- `GET /api/users/:id` populates `partner` + `parent` refs; `DELETE /api/users/:id` performs soft delete
### Assignment Flow
```mermaid
graph LR
A[Job Assignment Request] --> B[Internal User ID]
B --> C[JobAssign Record]
C --> D[Partner Detection]
D --> E[Worker Queue]
E --> F[Partner System User Credentials]
F --> G[External Partner API]
```
**Key Principle**: Job assignments always use internal user IDs, while partner system users provide the credentials needed for external API communication.
## Table of Contents
1. [User Architecture Design](#user-architecture-design)
2. [System Architecture Overview](#system-architecture-overview)
3. [Current State Analysis](#current-state-analysis)
4. [Enhanced Architecture Design](#enhanced-architecture-design)
5. [Data Flow Diagrams](#data-flow-diagrams)
6. [Database Schema](#database-schema)
7. [API Design](#api-design)
8. [Sequence Diagrams](#sequence-diagrams)
9. [Error Handling and Retry Logic](#error-handling-and-retry-logic)
10. [Monitoring and Observability](#monitoring-and-observability)
11. [Deployment Architecture](#deployment-architecture)
## System Architecture Overview
### Current Architecture
> Reflects actual implementation (Feb 2026).
```mermaid
graph TB
subgraph "Job Assignment (controllers/job.js)"
A["POST /api/jobs/assign"] --> B["Create JobAssign<br/>status = NEW"]
B --> C{Partner aircraft?}
C -->|Yes| D[checkPartnerAPIHealth]
D -->|API live| E["uploadJobToPartner<br/>(immediate, in transaction)"]
D -->|API offline or fails| F["Queue UPLOAD_PARTNER_JOB<br/>→ partner_tasks"]
E --> G["JobAssign status = UPLOADED<br/>extJobId stored"]
F --> PQ[(partner_tasks queue)]
PQ --> SW
SW[partner_sync_worker] -->|"UPLOAD_PARTNER_JOB"| E
C -->|No| J["JobAssign status = NEW<br/>(internal only)"]
end
subgraph "Data Polling (partner_data_polling_worker — cron)"
CRON["Cron<br/>15 min prod / 1 min dev"] --> PA
PA["Query JobAssign<br/>status = UPLOADED"] --> PG
PG["Group by partnerCode + customerId"] --> AL
AL["getAircraftLogs<br/>(per aircraft)"] --> DL
DL["Download log file<br/>getAircraftLogData<br/>→ SATLOC_STORAGE_PATH"] --> TRK
TRK["PartnerLogTracker<br/>PENDING → DOWNLOADING → DOWNLOADED"] --> EQ
EQ["Queue PROCESS_PARTNER_LOG<br/>→ partner_tasks"]
end
subgraph "Log Processing (partner_sync_worker)"
EQ --> SW2[partner_sync_worker]
SW2 -->|"PROCESS_PARTNER_LOG<br/>(claim tracker atomically)"| PARSE
PARSE["SatLocLogParser<br/>+ SatLocApplicationProcessor"] --> OUT
OUT["ApplicationFile +<br/>ApplicationDetail records"]
PARSE --> DONE["PartnerLogTracker → PROCESSED"]
PARSE -->|on error| DLQ["DLQ: partner_tasks_failed"]
end
subgraph "SatLoc Cloud API"
SLAPI["POST /UploadJobData<br/>GET /GetAircraftLogs<br/>GET /GetAircraftLogData"]
end
E --> SLAPI
AL --> SLAPI
DL --> SLAPI
style A fill:#e1f5fe
style CRON fill:#fff9c4
style DONE fill:#d4edda
style DLQ fill:#f8d7da
```
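The assign-time branch in the diagram (upload immediately when the partner API is live, otherwise queue an `UPLOAD_PARTNER_JOB` task) can be sketched as follows; the function names come from the diagram, but the wiring and return values are illustrative:

```javascript
// Sketch of the health-check-then-upload-or-queue branch. Dependencies are
// injected; in the real controller these would be partnerSyncService calls
// and the partner_tasks queue publisher.
async function uploadOrQueue(assignId, deps) {
  const { checkPartnerAPIHealth, uploadJobToPartner, enqueue } = deps;
  try {
    if (await checkPartnerAPIHealth()) {
      await uploadJobToPartner(assignId); // sets status=UPLOADED, stores extJobId
      return 'uploaded';
    }
  } catch (err) {
    // Any health-check or upload failure falls through to the queue path.
  }
  await enqueue({ type: 'upload_partner_job', assignId });
  return 'queued'; // partner_sync_worker retries the upload later
}
```

Either way the assignment call returns success to the client; only the timing of the partner upload differs.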
### Multi-Partner Architecture (Actual Implementation)
```mermaid
graph TB
subgraph "Partner Service Layer"
PSF["helpers/partner_service_factory.js<br/>(singleton, instance cache)"]
BASE["services/base_partner_service.js<br/>(abstract base)"]
SLSVC["services/satloc_service.js<br/>(SatLoc impl, Redis auth cache)"]
PSS["services/partner_sync_service.js<br/>(orchestrates upload/sync)"]
PSF --> SLSVC
SLSVC --> BASE
PSS --> PSF
end
subgraph "Workers"
PW["partner_data_polling_worker<br/>(cron-driven)"]
SW["partner_sync_worker<br/>(queue consumer)"]
PW --> PSF
SW --> PSS
end
subgraph "Queue"
Q[("partner_tasks queue<br/>dev_partner_tasks in dev")]
DLQ[("partner_tasks_failed DLQ")]
end
subgraph "External APIs"
SLAPI["SatLoc Cloud API<br/>https://satloccloudfc.com/api/Satloc"]
FUTURE["Future Partners<br/>(AGIDRONEX, etc)"]
end
subgraph "Data"
DB[(MongoDB)]
FS["Local File Storage<br/>SATLOC_STORAGE_PATH"]
REDIS[(Redis auth cache)]
end
PW -->|"1. getAircraftLogs"| SLAPI
PW -->|"2. getAircraftLogData"| SLAPI
PW -->|"3. save file"| FS
PW -->|"4. Queue PROCESS_PARTNER_LOG"| Q
Q -->|consume| SW
SW -->|"UPLOAD_PARTNER_JOB: POST /UploadJobData"| SLAPI
SW -->|"PROCESS_PARTNER_LOG: read file"| FS
SW -->|"write ApplicationDetail"| DB
SW -->|on error| DLQ
SLSVC <-->|JWT cache| REDIS
style PSF fill:#e8f5e8
style Q fill:#fff9c4
style DLQ fill:#f8d7da
style SLAPI fill:#e3f2fd
```
---
> ⚠️ **The sections below (Current State Analysis through Roadmap) are the original design specification from the initial architecture phase, Jul-Aug 2025.** They describe intended design patterns and may not match the current implementation. For current state see the [Current Architecture Status](#current-architecture-status-feb-2026) section above.
### Original Design: Enhanced Multi-Partner Architecture
```mermaid
graph TB
subgraph "Core Platform"
A[Job Management] --> B[Partner Registry]
B --> C[Partner Service Factory]
C --> D[Partner Adapters]
E[Enhanced JobAssign] --> F[Sync Queue System]
F --> G[Data Polling Service]
G --> H[Data Processing Pipeline]
end
subgraph "Partner Services"
D --> I[Satloc Service]
D --> J[Partner 2 Service]
D --> K[Future Partner Services]
end
subgraph "External APIs"
I --> L[Satloc API]
J --> M[Partner 2 API]
K --> N[Partner N API]
end
subgraph "Data Storage"
H --> O[Enhanced Application Schema]
H --> P[Partner Metadata Store]
E --> Q[Extended JobAssign Schema]
end
style A fill:#e8f5e8
style F fill:#fff3e0
style O fill:#f3e5f5
```
## Current State Analysis
> *Original pre-implementation analysis — kept for historical context.*
### Existing Components
#### 1. Job Assignment Flow
- **Controller**: `controllers/job.js` - `assign_post()` function
- **Model**: `model/job_assign.js` - Simple assignment tracking
- **Process**: Store assignments in `job_assign` collection → Clients poll for assignments
#### 2. Job Download Flow
- **Controller**: `controllers/export.js` - `newJobs_post()`, `downloadJob_post()`
- **Process**: Clients poll → Download job data → Update assignment status
#### 3. Data Processing Flow
- **Collections**: `application`, `application_file`, `application_detail`
- **Queue**: RabbitMQ coordination
- **Worker**: `job_worker.js` handles data processing
### Current Limitations
1. **Single Integration Pattern**: Designed for internal workflow only
2. **No Partner Abstraction**: Direct API calls without abstraction layer
3. **Limited Retry Logic**: Basic error handling without sophisticated retry
4. **No State Tracking**: Minimal sync state management
5. **Polling Only**: No push notification capabilities
## Enhanced Architecture Design
> *Original core design components (design spec).*
### Core Components
#### 1. Partner Service Interface
```typescript
interface PartnerService {
getPartnerCode(): string;
assignJob(job: Job, aircraft: Aircraft): Promise<PartnerJobResult>;
downloadFlightData(aircraftId: string, jobId: string): Promise<FlightDataFile[]>;
uploadJobDefinition(job: Job): Promise<string>;
processMissionData(data: any): Promise<MissionResult>;
syncJobStatus(externalJobId: string): Promise<JobStatus>;
convertToInternalFormat(data: any, format: string): Promise<InternalData[]>;
}
```
#### 2. Partner Registry
```typescript
class PartnerRegistry {
private partners: Map<string, PartnerService> = new Map();
register(partnerCode: string, service: PartnerService): void;
get(partnerCode: string): PartnerService | undefined;
getAll(): PartnerService[];
isPartnerSupported(partnerCode: string): boolean;
}
```
#### 3. Enhanced Data Models
```typescript
interface EnhancedJobAssign {
_id: ObjectId;
user: ObjectId;
job: ObjectId;
status: AssignStatus;
date: Date;
// Partner integration fields
partnerCode: 'internal' | 'satloc' | string;
externalJobId?: string;
partnerMetadata?: any;
// Sync state tracking
syncState: {
jobUpload: SyncStatus;
dataPolling: SyncStatus;
};
// Retry configuration
retryConfig: RetryConfig;
}
interface SyncStatus {
status: 'pending' | 'syncing' | 'synced' | 'failed';
attempts: number;
lastAttempt?: Date;
lastSuccess?: Date;
error?: string;
}
```
## Data Flow Diagrams
### 1. Job Assignment Flow with Partners
> Shows actual implementation flow in `controllers/job.js`.
```mermaid
sequenceDiagram
participant A as User (FE/API Client)
participant JC as Job Controller
participant PSS as partnerSyncService
participant SA as SatLoc API
participant Q as partner_tasks queue
participant DB as Database
A->>JC: POST /api/jobs/assign
JC->>DB: Validate job + users
DB-->>JC: Job + assignment data
JC->>DB: Create JobAssign (status=NEW)
Note over JC,PSS: Partner integration detected from user context
JC->>PSS: checkPartnerAPIHealth(partnerCode)
alt API is live
PSS->>SA: GET /IsAlive
SA-->>PSS: alive
JC->>PSS: uploadJobToPartner(assignId)
PSS->>SA: POST /UploadJobData
SA-->>PSS: extJobId
PSS->>DB: JobAssign status=UPLOADED, extJobId stored
JC-->>A: Assignment complete
else API offline or upload fails
JC->>Q: Queue UPLOAD_PARTNER_JOB {assignId}
JC-->>A: Assignment complete (upload queued)
Note over Q,DB: partner_sync_worker processes later
Q->>PSS: UPLOAD_PARTNER_JOB
PSS->>SA: POST /UploadJobData
SA-->>PSS: extJobId
PSS->>DB: JobAssign status=UPLOADED, extJobId stored
end
```
### 2. Data Polling and Processing Flow
> Shows actual cron-driven polling in `partner_data_polling_worker.js`.
```mermaid
sequenceDiagram
participant CR as Cron (15min/1min)
participant PW as partner_data_polling_worker
participant DB as Database
participant PSS as partnerSyncService
participant SA as SatLoc API
participant FS as Local Storage
participant Q as partner_tasks queue
participant SW as partner_sync_worker
CR->>PW: trigger pollAllPartnerSystems()
PW->>DB: Query JobAssign WHERE status=UPLOADED
DB-->>PW: Uploaded assignments (grouped by partnerCode+customerId)
loop per aircraft
PW->>SA: GET /GetAircraftLogs (customerId, aircraftId)
SA-->>PW: log list
PW->>DB: Filter out already-processed logs (PartnerLogTracker processed=true)
loop per new log
PW->>DB: Upsert PartnerLogTracker (PENDING → DOWNLOADING)
PW->>SA: GET /GetAircraftLogData (customerId, logId)
SA-->>PW: log file binary
PW->>FS: Save to SATLOC_STORAGE_PATH
PW->>DB: PartnerLogTracker → DOWNLOADED
PW->>Q: Queue PROCESS_PARTNER_LOG {logId, logFileName, customerId, aircraftId, taskId, executionId}
PW->>DB: PartnerLogTracker.enqueuedAt set
end
end
Q->>SW: PROCESS_PARTNER_LOG
SW->>DB: TaskTracker idempotency check (claim or skip)
SW->>DB: PartnerLogTracker → PROCESSING (atomic claim)
SW->>FS: Read log file
SW->>SW: SatLocLogParser + SatLocApplicationProcessor
SW->>DB: Save ApplicationFile + ApplicationDetail records
SW->>DB: PartnerLogTracker → PROCESSED
```
### 3. Error Handling and Retry Flow
```mermaid
flowchart TD
A[Operation Start] --> B{Operation Success?}
B -->|Yes| C[Update Success State]
B -->|No| D{Max Retries Reached?}
D -->|No| E[Calculate Backoff Delay]
E --> F[Increment Retry Count]
F --> G[Schedule Retry]
G --> H[Wait for Delay]
H --> A
D -->|Yes| I[Mark as Failed]
I --> J[Send Alert]
J --> K[Dead Letter Queue]
C --> L[Continue Normal Flow]
style C fill:#d4edda
style I fill:#f8d7da
style K fill:#fff3cd
```
## Database Schema
### Enhanced JobAssign Schema
```javascript
const JobAssignSchema = new Schema({
_id: ObjectId,
user: { type: ObjectId, ref: 'User', required: true },
job: { type: Number, ref: 'Job', required: true },
status: {
type: Number,
enum: [0, 1, 2], // 0: pending, 1: downloaded, 2: completed
default: 0
},
date: { type: Date, default: Date.now },
// Partner integration fields
partnerCode: {
type: String,
enum: ['internal', 'satloc', 'other'],
default: 'internal'
},
externalJobId: { type: String, sparse: true },
partnerMetadata: { type: Schema.Types.Mixed },
// Sync state tracking
syncState: {
jobUpload: {
status: {
type: String,
enum: ['pending', 'syncing', 'synced', 'failed'],
default: 'pending'
},
attempts: { type: Number, default: 0 },
lastAttempt: Date,
lastSuccess: Date,
error: String
},
dataPolling: {
status: {
type: String,
enum: ['idle', 'polling', 'synced', 'failed'],
default: 'idle'
},
attempts: { type: Number, default: 0 },
lastAttempt: Date,
lastSuccess: Date,
lastDataCheck: Date,
error: String
}
},
// Retry configuration
retryConfig: {
maxAttempts: { type: Number, default: 5 },
backoffMultiplier: { type: Number, default: 2 },
baseDelay: { type: Number, default: 5000 }
}
});
// Indexes for performance
JobAssignSchema.index({ user: 1, status: 1 });
JobAssignSchema.index({ partnerCode: 1, 'syncState.jobUpload.status': 1 });
JobAssignSchema.index({ partnerCode: 1, 'syncState.dataPolling.status': 1 });
```
### Enhanced Application Schema
```javascript
const EnhancedApplicationSchema = new Schema({
// Existing fields
jobId: { type: Number, required: true },
fileName: { type: String, required: true },
fileSize: { type: Number, required: true },
status: { type: Number, default: 1 },
// Partner-specific fields
partnerCode: { type: String, default: 'internal' },
externalJobId: { type: String, sparse: true },
partnerMetadata: { type: Schema.Types.Mixed },
dataFormat: {
type: String,
enum: ['internal', 'satloc', 'other'],
default: 'internal'
},
// Processing tracking
processingStage: {
type: String,
enum: ['uploaded', 'parsing', 'processing', 'completed', 'failed'],
default: 'uploaded'
},
processingStarted: Date,
processingCompleted: Date,
processingErrors: [String],
// Performance metrics
processingDuration: Number, // in milliseconds
recordsProcessed: Number,
conversionMetrics: {
originalFormat: String,
conversionTime: Number,
dataLoss: Number // percentage
}
});
```
## API Design
### Enhanced Job Assignment API
```javascript
// POST /api/jobs/:jobId/assign
{
"dlOp": { "type": 1 },
"asUsers": [
{
"uid": "internal_user_id", // Always use internal user IDs for assignments
"partnerCode": "satloc", // Optional, defaults to 'internal'
"partnerConfig": { // Partner-specific configuration
"aircraftId": "AC001",
"priority": "high"
}
}
],
"avUsers": [...]
}
// Response
{
"ok": true,
"assignments": [
{
"userId": "internal_user_id", // Assignment uses internal user ID
"partnerCode": "satloc",
"externalJobId": "satloc_job_123",
"syncStatus": "synced"
}
],
"errors": []
}
```
### Partner Management API
```javascript
// GET /api/partners
[
{
"_id": "partner_id",
"name": "Satloc",
"code": "satloc",
"active": true,
"capabilities": ["job_upload", "data_download", "real_time_sync"],
"apiVersion": "v2.1"
}
]
// GET /api/partners/:partnerId/status
{
"partnerId": "satloc",
"status": "online",
"lastSync": "2025-07-18T10:30:00Z",
"activeJobs": 15,
"pendingSyncs": 2,
"errors": []
}
```
### Enhanced Job Download API
```javascript
// GET /api/export/newJobs
{
"internal": [
{
"job": { "_id": 123, "name": "Field A" },
"date": "2025-07-18T09:00:00Z",
"assignmentId": "assign_123"
}
],
"partners": {
"satloc": [
{
"job": { "_id": 124, "name": "Field B" },
"date": "2025-07-18T09:15:00Z",
"assignmentId": "assign_124",
"externalJobId": "satloc_job_456",
"syncStatus": "synced"
}
]
}
}
// POST /api/export/downloadJob
{
"jobId": 124,
"partnerType": "satloc",
"format": "native" // or "converted"
}
```
## Sequence Diagrams
### Complete Job Assignment to Data Processing Flow
> Accurate end-to-end sequence reflecting actual code (Feb 2026).
```mermaid
sequenceDiagram
participant A as Admin
participant JC as Job Controller
participant PSS as partnerSyncService
participant SA as SatLoc API
participant Q as partner_tasks queue
participant DB as Database
participant CR as Cron (15min/1min)
participant PW as Polling Worker
participant FS as Local Storage
participant SW as Sync Worker
Note over A,DB: ── Job Assignment Phase ──
A->>JC: POST /api/jobs/assign
JC->>DB: Create JobAssign (status=NEW)
JC->>PSS: checkPartnerAPIHealth(partnerCode)
PSS->>SA: GET /IsAlive
alt Partner API live — immediate upload
SA-->>PSS: alive
JC->>PSS: uploadJobToPartner(assignId)
PSS->>SA: POST /UploadJobData
SA-->>PSS: extJobId
PSS->>DB: JobAssign status=UPLOADED, extJobId saved
JC-->>A: Assignment complete
else Partner API offline or upload fails
JC->>Q: Queue UPLOAD_PARTNER_JOB {assignId}
JC-->>A: Assignment complete (upload queued)
Q->>SW: consume UPLOAD_PARTNER_JOB
SW->>PSS: uploadJobToPartner(assignId)
PSS->>SA: POST /UploadJobData
SA-->>PSS: extJobId
PSS->>DB: JobAssign status=UPLOADED, extJobId saved
end
Note over CR,DB: ── Polling Phase (cron, independent of above) ──
CR->>PW: trigger poll
PW->>DB: Query JobAssign WHERE status=UPLOADED
DB-->>PW: assigned aircraft list
loop per aircraft with UPLOADED assignment
PW->>SA: GET /GetAircraftLogs (customerId, aircraftId)
SA-->>PW: available log list
PW->>DB: Filter already-processed (PartnerLogTracker processed=true)
loop per new log file
PW->>DB: Upsert PartnerLogTracker PENDING→DOWNLOADING
PW->>SA: GET /GetAircraftLogData (customerId, logId)
SA-->>PW: log binary
PW->>FS: Save to SATLOC_STORAGE_PATH
PW->>DB: PartnerLogTracker → DOWNLOADED
PW->>Q: Queue PROCESS_PARTNER_LOG
end
end
Note over SW,DB: ── Log Processing Phase ──
Q->>SW: consume PROCESS_PARTNER_LOG
SW->>DB: TaskTracker idempotency check (skip if already claimed)
SW->>DB: PartnerLogTracker → PROCESSING (atomic)
SW->>FS: Read saved log file
SW->>SW: SatLocLogParser → parse binary records
SW->>SW: SatLocApplicationProcessor → match to JobAssign
SW->>DB: Save ApplicationFile + ApplicationDetail records
SW->>DB: PartnerLogTracker → PROCESSED
SW->>Q: ack message
```
### Error Handling and Recovery Flow
```mermaid
sequenceDiagram
participant S as Service
participant DB as Database
participant Q as Queue System
participant M as Monitor
participant A as Alert System
S->>DB: Operation Attempt
DB-->>S: Error Response
S->>DB: Increment Retry Count
S->>Q: Calculate Backoff Delay
alt Retry Available
Q->>S: Schedule Retry Task
Note over S,Q: Exponential Backoff Delay
Q->>S: Execute Retry
S->>DB: Retry Operation
else Max Retries Reached
S->>DB: Mark as Failed
S->>M: Report Failure
M->>A: Send Alert
M->>Q: Move to Dead Letter Queue
end
```
## Error Handling and Retry Logic
### Retry Strategy Configuration
```javascript
const RetryConfig = {
jobUpload: {
maxAttempts: 5,
baseDelay: 5000, // 5 seconds
maxDelay: 300000, // 5 minutes
backoffMultiplier: 2,
jitter: true
},
dataPolling: {
maxAttempts: 3,
baseDelay: 10000, // 10 seconds
maxDelay: 600000, // 10 minutes
backoffMultiplier: 2,
jitter: false
},
dataProcessing: {
maxAttempts: 3,
baseDelay: 2000, // 2 seconds
maxDelay: 60000, // 1 minute
backoffMultiplier: 2,
jitter: true
}
};
```
### Error Categories and Handling
| Error Type | Retry Strategy | Alert Level | Action |
|------------|----------------|-------------|---------|
| Network Timeout | Exponential Backoff | Warning | Auto-retry |
| Authentication Error | Fixed Delay | High | Manual intervention |
| Rate Limit | Fixed Delay | Low | Auto-retry |
| Data Format Error | No Retry | High | Log and skip |
| Partner API Down | Long Backoff | Critical | Alert ops team |
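One way to encode the table above is a lookup from error category to policy; the categories, retry strategies, and alert levels are copied from the table, while the constant names and default fallback are assumptions:

```javascript
// Policy table transcribed from the error-handling matrix above.
// Constant names and the unknown-error fallback are illustrative.
const ERROR_POLICIES = Object.freeze({
  NETWORK_TIMEOUT: { retry: 'exponential_backoff', alert: 'warning' },
  AUTH_ERROR:      { retry: 'fixed_delay',         alert: 'high' },
  RATE_LIMIT:      { retry: 'fixed_delay',         alert: 'low' },
  DATA_FORMAT:     { retry: 'none',                alert: 'high' },
  PARTNER_DOWN:    { retry: 'long_backoff',        alert: 'critical' },
});

function policyFor(errorType) {
  // Unclassified errors default to no retry + high alert (assumption),
  // matching the "log and skip" posture for unrecoverable failures.
  return ERROR_POLICIES[errorType] || { retry: 'none', alert: 'high' };
}
```

Centralizing the mapping keeps the retry/alert decision out of individual handlers, so adding a partner-specific category is a one-line change.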
## Monitoring and Observability
### Key Metrics to Track
1. **Assignment Metrics**
- Assignment success rate by partner
- Time to sync job to partner
- External job ID generation rate
2. **Sync Metrics**
- Sync queue depth
- Sync success/failure rates
- Average sync duration by partner
3. **Data Processing Metrics**
- Data polling frequency
- Data conversion success rate
- Processing latency by data format
4. **Error Metrics**
- Error rate by operation type
- Retry exhaustion rate
- Dead letter queue depth
### Monitoring Dashboard Structure
```mermaid
graph TB
subgraph "Operations Dashboard"
A[Partner Health Status]
B[Active Jobs by Partner]
C[Sync Queue Metrics]
end
subgraph "Performance Dashboard"
D[Assignment Success Rates]
E[Data Processing Latency]
F[Error Rates by Partner]
end
subgraph "Error Dashboard"
G[Failed Operations]
H[Retry Statistics]
I[Dead Letter Queue]
end
style A fill:#d4edda
style G fill:#f8d7da
style C fill:#fff3cd
```
## Deployment Architecture
### Production Environment
```mermaid
graph TB
subgraph "Load Balancer"
LB[Nginx/ALB]
end
subgraph "Application Tier"
A1[App Server 1]
A2[App Server 2]
A3[App Server N]
end
subgraph "Queue Infrastructure"
R1[Redis Cluster]
R2[RabbitMQ Cluster]
end
subgraph "Background Workers"
W1[Sync Worker Pool]
W2[Processing Worker Pool]
W3[Monitor Worker]
end
subgraph "Data Tier"
M1[MongoDB Primary]
M2[MongoDB Secondary]
M3[MongoDB Arbiter]
end
subgraph "External Services"
S1[Satloc API]
S2[Partner 2 API]
S3[Partner N API]
end
LB --> A1
LB --> A2
LB --> A3
A1 --> R1
A1 --> R2
A1 --> M1
W1 --> R1
W1 --> S1
W2 --> M1
W3 --> R1
style LB fill:#e3f2fd
style W1 fill:#f3e5f5
style M1 fill:#e8f5e8
```
### Scalability Considerations
1. **Horizontal Scaling**
- Stateless application servers
- Worker pool scaling based on queue depth
- Database read replicas for reporting
2. **Partner Isolation**
- Separate queues per partner type
- Circuit breaker pattern for partner APIs
- Independent retry policies
3. **Data Partitioning**
- Partition by partner type
- Time-based partitioning for historical data
- Separate indexes for partner queries
## Implementation Roadmap
### Phase 1: Foundation (Weeks 1-2)
- [ ] Implement Partner Service interface
- [ ] Create Partner Registry
- [ ] Enhance JobAssign schema
- [ ] Basic Satloc service implementation
### Phase 2: Queue System (Weeks 3-4)
- [ ] Implement Partner Sync Queue
- [ ] Add retry logic with exponential backoff
- [ ] Create monitoring dashboard
- [ ] Error handling and dead letter queues
### Phase 3: Data Processing (Weeks 5-6)
- [ ] Enhanced Application schema
- [ ] Partner data conversion pipeline
- [ ] Performance optimizations
- [ ] Comprehensive testing
### Phase 4: Production (Weeks 7-8)
- [ ] Production deployment
- [ ] Monitoring and alerting setup
- [ ] Documentation and training
- [ ] Performance tuning
This architecture provides a robust, scalable foundation for multi-partner integration while maintaining backward compatibility with existing systems.