326 lines
11 KiB
Markdown
326 lines
11 KiB
Markdown
# Partner Integration System - Implementation Summary
|
|
|
|
## Overview
|
|
|
|
This document outlines the comprehensive partner integration system implemented for SatLoc and other partner aircraft systems. The system provides immediate job upload capabilities, automated data polling, and robust data synchronization.
|
|
|
|
## Architecture Components
|
|
|
|
### 1. Dedicated Partner Queue System
|
|
|
|
**Queue Configuration:**
|
|
- **Queue Name:** `partner_tasks` (production) / `dev_partner_tasks` (development)
|
|
- **DLQ:** `partner_tasks_failed` — managed via global `/api/dlq/:queueName/*` endpoints
|
|
- **Purpose:** Separate partner operations from internal job processing (`dev_jobs`)
|
|
- **Worker:** `partner_sync_worker.js` consumes all partner-related tasks
|
|
|
|
**Task Types** (from `PartnerTasks` in `helpers/constants.js`):
|
|
- `upload_partner_job` — Upload job assignments to partner aircraft
|
|
- `process_partner_log` — Process a downloaded partner log file
|
|
- `process_partner_data_file` — Process a locally-stored partner data file
|
|
|
|
> Note: Data polling is done by cron inside `partner_data_polling_worker.js` (every 15 min prod / 1 min dev), not via a queue task.
|
|
|
|
**Queue Improvements (Recent):**
|
|
- **Channel Management:** Proper channel reset on errors/close
|
|
- **Offline Queue:** Tasks are queued when connection is unavailable
|
|
- **Error Propagation:** Proper error handling with callbacks
|
|
- **Async Support:** Added `addTaskASync()` for promise-based usage
|
|
- **Enum Constants:** All task types use `PartnerTasks` enum for consistency
|
|
|
|
### 2. Enhanced Job Assignment (`controllers/job.js`)
|
|
|
|
**Design Architecture:**
|
|
- **Internal User IDs:** All job assignments use internal user IDs, never partner system user IDs
|
|
- **Partner Detection:** System detects partner integration requirements from assignment context
|
|
- **Credential Separation:** Partner system users provide API credentials for worker communication only
|
|
|
|
**New Features:**
|
|
- **Immediate Upload:** When assigning jobs to partner aircraft, system attempts immediate upload if API is available
|
|
- **Health Checking:** Validates partner API connectivity before upload attempts
|
|
- **Fallback Queuing:** Jobs are queued for later processing if immediate upload fails
|
|
- **Status Tracking:** New `AssignStatus.UPLOADED = 2` for tracking successfully uploaded jobs
|
|
|
|
**Key Functions:**
|
|
```javascript
|
|
processPartnerAssignment(assignment, jobData) // Main assignment processing (consolidated method)
|
|
checkPartnerAPIHealth(partnerCode, customerId) // API health validation
|
|
```
|
|
|
|
**Assignment Flow:**
|
|
1. Accept assignment request with internal user ID
|
|
2. Create JobAssign record with internal user ID
|
|
3. Detect partner integration requirements from user/customer context
|
|
4. Queue partner sync jobs using partner system user credentials
|
|
|
|
### 3. Partner Data Polling Worker (`workers/partner_data_polling_worker.js`)
|
|
|
|
**Functionality:**
|
|
- **Automated Discovery:** Periodically polls all partner systems for new aircraft and logs
|
|
- **Smart Filtering:** Only processes new logs that haven't been handled before
|
|
- **Task Queuing:** Creates processing tasks for discovered data
|
|
- **Configurable Intervals:** Adjustable polling frequency per partner system
|
|
|
|
**Polling Logic:**
|
|
- Groups partners by customer for efficient API calls
|
|
- Retrieves aircraft lists and available log data
|
|
- Filters new logs using `PartnerLogTracker` model
|
|
- Queues `process_partner_log` tasks for new data
|
|
|
|
### 4. Partner Log Tracking (`model/partner_log_tracker.js`)
|
|
|
|
**Purpose:** Prevents duplicate processing of partner log data
|
|
|
|
**Schema Features:**
|
|
- Unique indexing on partner, aircraft, and log combinations
|
|
- Processing status and retry count tracking
|
|
- Job matching and application file references
|
|
- Processing duration and error tracking
|
|
|
|
### 5. SatLoc API Integration (`services/partner_sync_service.js`)
|
|
|
|
**Real Implementation:**
|
|
- **Authentication:** Individual partner system user credentials per customer
|
|
- **Endpoints:** Full implementation of SatLoc Cloud API specification
|
|
- **Error Handling:** Comprehensive error handling with graceful fallbacks
|
|
- **Health Monitoring:** API connectivity monitoring for immediate upload decisions
|
|
|
|
**Key Endpoints:**
|
|
- `UploadJobData` - Upload job assignments to aircraft
|
|
- `GetAircraftList` - Retrieve available aircraft
|
|
- `GetAircraftLogData` - Download application log data
|
|
|
|
### 6. Data Processing Pipeline
|
|
|
|
**Log Processing Workflow:**
|
|
1. **Discovery:** Polling worker identifies new logs
|
|
2. **Queuing:** Creates `process_partner_log` tasks
|
|
3. **Processing:** Downloads and parses log data
|
|
4. **Matching:** Matches logs to specific job assignments
|
|
5. **Storage:** Creates AgMission application files and details
|
|
6. **Tracking:** Updates processing status and prevents duplicates
|
|
|
|
**Job Matching Algorithm:**
|
|
- Aircraft ID correlation (primary)
|
|
- Time proximity analysis (secondary)
|
|
- Geographic bounding box comparison (future enhancement)
|
|
|
|
## Deployment and Operation
|
|
|
|
### Starting Workers
|
|
|
|
**All Workers:**
|
|
```bash
|
|
npm run start:workers
|
|
```
|
|
|
|
**Individual Workers:**
|
|
```bash
|
|
npm run start:job-worker # Internal job processing only
|
|
npm run start:partner-sync # Partner synchronization tasks
|
|
npm run start:partner-polling # Automated partner data discovery
|
|
```
|
|
|
|
### Configuration
|
|
|
|
**Environment Variables:**
|
|
- `QUEUE_HOST`, `QUEUE_PORT` - AMQP broker configuration
|
|
- `PARTNER_POLLING_INTERVAL` - Polling frequency (default: 300000ms)
|
|
- Partner-specific credentials in database
|
|
|
|
### Monitoring
|
|
|
|
**Worker Manager (`start_workers.js`):**
|
|
- Automatic worker restart on failure
|
|
- Process status monitoring
|
|
- Graceful shutdown handling
|
|
- Centralized logging with worker identification
|
|
|
|
**Logging:**
|
|
- Debug namespace: `agm:partner-*` for all partner operations
|
|
- Individual worker namespaces for targeted debugging
|
|
- Processing duration and error tracking
|
|
|
|
## Database Schema Updates
|
|
|
|
### 1. Assignment Status Enhancement
|
|
```javascript
|
|
// helpers/constants.js - AssignStatus (actual values)
|
|
const AssignStatus = Object.freeze({
|
|
NEW: 0, // Freshly assigned
|
|
DOWNLOADED: 1, // Log file downloaded from partner
|
|
UPLOADED: 2 // Job successfully uploaded to partner system
|
|
});
|
|
```
|
|
|
|
### 2. Job Assignment Model (`model/job_assign.js`)
|
|
```javascript
|
|
// Partner-specific fields:
|
|
extJobId: String, // External job ID from partner system (indexed)
|
|
notes: String, // Partner-specific notes or instructions
|
|
status: Number, // AssignStatus enum: NEW=0, DOWNLOADED=1, UPLOADED=2
|
|
// Static helpers: populateWithPartnerInfo(), findByIdWithPartnerInfo()
|
|
// Instance helpers: hasPartnerIntegration(), getPartnerCode(), getPartnerAircraftId()
|
|
```
|
|
|
|
### 3. Partner Log Tracking (`model/partner_log_tracker.js`)
|
|
|
|
One record per log file. Status lifecycle:
|
|
```
|
|
PENDING → DOWNLOADING → DOWNLOADED → PROCESSING → PROCESSED | FAILED | SKIPPED
|
|
```
|
|
|
|
Key fields: `logId`, `partnerCode`, `aircraftId`, `customerId`, `logFileName`, `localFilePath`, `retryCount`, `errorMessage`, `matchedJobId`, `processedAt`
|
|
|
|
### 4. Task Tracker (`model/task_tracker.js`) — Added Phase 2, Jan 2026
|
|
|
|
One record per queue task. Status lifecycle:
|
|
```
|
|
QUEUED → PROCESSING → COMPLETED | FAILED
|
|
```
|
|
|
|
Key features:
|
|
- Unique `taskId + executionId` index prevents duplicate tracking
|
|
- `findRetryChain(taskId)` — all retry attempts for a single logical task
|
|
- `findStuckTasks(queueName, timeoutMs)` — detects hung tasks
|
|
- `getQueueStats(queueName)` — aggregate status counts
|
|
|
|
### DLQ Recovery
|
|
|
|
All DLQ operations are queue-native (no MongoDB coupling). Use the global API or web dashboard:
|
|
```bash
|
|
# Retry all failed tasks
|
|
POST /api/dlq/partner_tasks/retryAll
|
|
|
|
# Retry by RabbitMQ position
|
|
POST /api/dlq/partner_tasks/retryByPosition { "position": 0 }
|
|
|
|
# Retry all SATLOC tasks
|
|
POST /api/dlq/partner_tasks/retryByHeader { "headerName": "x-partner-code", "headerValue": "SATLOC" }
|
|
|
|
# Web dashboard:
|
|
http://localhost:4100/public/dlq-monitor.html
|
|
```
|
|
|
|
See [DLQ_INDEX.md](DLQ_INDEX.md) for full DLQ documentation.
|
|
|
|
### SatLoc Cloud API Implementation
|
|
|
|
**Authentication:**
|
|
- Individual customer credentials stored securely
|
|
- Token-based authentication with refresh logic
|
|
- Environment-specific endpoint configuration
|
|
|
|
**Job Upload Format:**
|
|
```javascript
|
|
{
|
|
CustomerId: "customer_id",
|
|
AircraftList: [{
|
|
AircraftId: "aircraft_id",
|
|
JobDataList: [{
|
|
JobName: "job_name",
|
|
TargetSprayRate: rate_value,
|
|
Notes: "additional_notes",
|
|
AreaData: "base64_encoded_kml"
|
|
}]
|
|
}]
|
|
}
|
|
```
|
|
|
|
**Data Retrieval:**
|
|
- Automated aircraft list polling
|
|
- Log data download with metadata
|
|
- Structured data parsing and validation
|
|
|
|
## Error Handling and Resilience
|
|
|
|
### Partner API Failures
|
|
- Immediate upload attempts with health checking
|
|
- Graceful fallback to queue-based processing
|
|
- Retry logic with exponential backoff
|
|
- Comprehensive error logging and tracking
|
|
|
|
### Data Processing Errors
|
|
- Individual log processing failure isolation
|
|
- Retry mechanisms with configurable limits
|
|
- Error state tracking and reporting
|
|
- Partial success handling for batch operations
|
|
|
|
### System Recovery
|
|
- Worker restart on failure
|
|
- Queue persistence for unprocessed tasks
|
|
- State recovery from database tracking
|
|
- Duplicate prevention during recovery
|
|
|
|
## Performance Considerations
|
|
|
|
### Polling Optimization
|
|
- Efficient customer grouping for API calls
|
|
- Configurable polling intervals
|
|
- Smart filtering to reduce API load
|
|
- Batch processing for multiple logs
|
|
|
|
### Queue Management
|
|
- Dedicated partner queue prevents internal job blocking
|
|
- Task prioritization for time-sensitive operations
|
|
- Worker scaling capabilities
|
|
- Queue monitoring and alerts
|
|
|
|
### Database Optimization
|
|
- Efficient indexing on partner tracking collections
|
|
- Bulk operations for application data insertion
|
|
- Query optimization for job matching
|
|
- Archive strategies for processed logs
|
|
|
|
## Security Implementation
|
|
|
|
### API Security
|
|
- Individual customer credentials isolation
|
|
- Secure credential storage and retrieval
|
|
- Token refresh and expiration handling
|
|
- API endpoint validation and sanitization
|
|
|
|
### Data Protection
|
|
- Secure log data transmission
|
|
- Encrypted storage of sensitive information
|
|
- Access control for partner operations
|
|
- Audit logging for all partner interactions
|
|
|
|
## Future Enhancements
|
|
|
|
### Geographic Matching
|
|
- Enhanced job-log matching using geographic boundaries
|
|
- Spatial indexing for performance optimization
|
|
- Area coverage analysis and validation
|
|
|
|
### Real-time Notifications
|
|
- WebSocket integration for immediate status updates
|
|
- Partner API webhook support
|
|
- Real-time dashboard for partner operations
|
|
|
|
### Advanced Analytics
|
|
- Partner performance metrics and reporting
|
|
- Data quality analysis and validation
|
|
- Predictive maintenance and optimization
|
|
|
|
### Multi-Partner Support
|
|
- Additional partner system integrations
|
|
- Unified partner management interface
|
|
- Cross-partner data correlation and analysis
|
|
|
|
## Testing and Validation
|
|
|
|
### Integration Testing
|
|
- Partner API connectivity validation
|
|
- End-to-end workflow testing
|
|
- Error scenario simulation
|
|
- Performance and load testing
|
|
|
|
### Data Validation
|
|
- Log parsing accuracy verification
|
|
- Job matching algorithm validation
|
|
- Application data integrity checks
|
|
- Duplicate prevention testing
|
|
|
|
This implementation provides a robust, scalable foundation for partner aircraft integration with immediate upload capabilities, automated data discovery, and comprehensive error handling.
|