agmission/Development/server/docs/PARTNER_INTEGRATION_IMPLEMENTATION.md

326 lines
11 KiB
Markdown

# Partner Integration System - Implementation Summary
## Overview
This document outlines the comprehensive partner integration system implemented for SatLoc and other partner aircraft systems. The system provides immediate job upload capabilities, automated data polling, and robust data synchronization.
## Architecture Components
### 1. Dedicated Partner Queue System
**Queue Configuration:**
- **Queue Name:** `partner_tasks` (production) / `dev_partner_tasks` (development)
- **DLQ:** `partner_tasks_failed` — managed via global `/api/dlq/:queueName/*` endpoints
- **Purpose:** Separate partner operations from internal job processing (`dev_jobs`)
- **Worker:** `partner_sync_worker.js` consumes all partner-related tasks
**Task Types** (from `PartnerTasks` in `helpers/constants.js`):
- `upload_partner_job` — Upload job assignments to partner aircraft
- `process_partner_log` — Process a downloaded partner log file
- `process_partner_data_file` — Process a locally-stored partner data file
> Note: Data polling is done by cron inside `partner_data_polling_worker.js` (every 15 min prod / 1 min dev), not via a queue task.
**Queue Improvements (Recent):**
- **Channel Management:** Proper channel reset on errors/close
- **Offline Queue:** Tasks are queued when connection is unavailable
- **Error Propagation:** Proper error handling with callbacks
- **Async Support:** Added `addTaskASync()` for promise-based usage
- **Enum Constants:** All task types use `PartnerTasks` enum for consistency
### 2. Enhanced Job Assignment (`controllers/job.js`)
**Design Architecture:**
- **Internal User IDs:** All job assignments use internal user IDs, never partner system user IDs
- **Partner Detection:** System detects partner integration requirements from assignment context
- **Credential Separation:** Partner system users provide API credentials for worker communication only
**New Features:**
- **Immediate Upload:** When assigning jobs to partner aircraft, system attempts immediate upload if API is available
- **Health Checking:** Validates partner API connectivity before upload attempts
- **Fallback Queuing:** Jobs are queued for later processing if immediate upload fails
- **Status Tracking:** New `AssignStatus.UPLOADED = 2` for tracking successfully uploaded jobs
**Key Functions:**
```javascript
processPartnerAssignment(assignment, jobData) // Main assignment processing (consolidated method)
checkPartnerAPIHealth(partnerCode, customerId) // API health validation
```
**Assignment Flow:**
1. Accept assignment request with internal user ID
2. Create JobAssign record with internal user ID
3. Detect partner integration requirements from user/customer context
4. Queue partner sync jobs using partner system user credentials
### 3. Partner Data Polling Worker (`workers/partner_data_polling_worker.js`)
**Functionality:**
- **Automated Discovery:** Periodically polls all partner systems for new aircraft and logs
- **Smart Filtering:** Only processes new logs that haven't been handled before
- **Task Queuing:** Creates processing tasks for discovered data
- **Configurable Intervals:** Adjustable polling frequency per partner system
**Polling Logic:**
- Groups partners by customer for efficient API calls
- Retrieves aircraft lists and available log data
- Filters new logs using `PartnerLogTracker` model
- Queues `process_partner_log` tasks for new data
### 4. Partner Log Tracking (`model/partner_log_tracker.js`)
**Purpose:** Prevents duplicate processing of partner log data
**Schema Features:**
- Unique indexing on partner, aircraft, and log combinations
- Processing status and retry count tracking
- Job matching and application file references
- Processing duration and error tracking
### 5. SatLoc API Integration (`services/partner_sync_service.js`)
**Real Implementation:**
- **Authentication:** Individual partner system user credentials per customer
- **Endpoints:** Full implementation of SatLoc Cloud API specification
- **Error Handling:** Comprehensive error handling with graceful fallbacks
- **Health Monitoring:** API connectivity monitoring for immediate upload decisions
**Key Endpoints:**
- `UploadJobData` - Upload job assignments to aircraft
- `GetAircraftList` - Retrieve available aircraft
- `GetAircraftLogData` - Download application log data
### 6. Data Processing Pipeline
**Log Processing Workflow:**
1. **Discovery:** Polling worker identifies new logs
2. **Queuing:** Creates `process_partner_log` tasks
3. **Processing:** Downloads and parses log data
4. **Matching:** Matches logs to specific job assignments
5. **Storage:** Creates AgMission application files and details
6. **Tracking:** Updates processing status and prevents duplicates
**Job Matching Algorithm:**
- Aircraft ID correlation (primary)
- Time proximity analysis (secondary)
- Geographic bounding box comparison (future enhancement)
## Deployment and Operation
### Starting Workers
**All Workers:**
```bash
npm run start:workers
```
**Individual Workers:**
```bash
npm run start:job-worker # Internal job processing only
npm run start:partner-sync # Partner synchronization tasks
npm run start:partner-polling # Automated partner data discovery
```
### Configuration
**Environment Variables:**
- `QUEUE_HOST`, `QUEUE_PORT` - AMQP broker configuration
- `PARTNER_POLLING_INTERVAL` - Polling frequency (default: 300000ms)
- Partner-specific credentials in database
### Monitoring
**Worker Manager (`start_workers.js`):**
- Automatic worker restart on failure
- Process status monitoring
- Graceful shutdown handling
- Centralized logging with worker identification
**Logging:**
- Debug namespace: `agm:partner-*` for all partner operations
- Individual worker namespaces for targeted debugging
- Processing duration and error tracking
## Database Schema Updates
### 1. Assignment Status Enhancement
```javascript
// helpers/constants.js - AssignStatus (actual values)
const AssignStatus = Object.freeze({
NEW: 0, // Freshly assigned
DOWNLOADED: 1, // Log file downloaded from partner
UPLOADED: 2 // Job successfully uploaded to partner system
});
```
### 2. Job Assignment Model (`model/job_assign.js`)
```javascript
// Partner-specific fields:
extJobId: String, // External job ID from partner system (indexed)
notes: String, // Partner-specific notes or instructions
status: Number, // AssignStatus enum: NEW=0, DOWNLOADED=1, UPLOADED=2
// Static helpers: populateWithPartnerInfo(), findByIdWithPartnerInfo()
// Instance helpers: hasPartnerIntegration(), getPartnerCode(), getPartnerAircraftId()
```
### 3. Partner Log Tracking (`model/partner_log_tracker.js`)
One record per log file. Status lifecycle:
```
PENDING → DOWNLOADING → DOWNLOADED → PROCESSING → PROCESSED | FAILED | SKIPPED
```
Key fields: `logId`, `partnerCode`, `aircraftId`, `customerId`, `logFileName`, `localFilePath`, `retryCount`, `errorMessage`, `matchedJobId`, `processedAt`
### 4. Task Tracker (`model/task_tracker.js`) — Added Phase 2, Jan 2026
One record per queue task. Status lifecycle:
```
QUEUED → PROCESSING → COMPLETED | FAILED
```
Key features:
- Unique `taskId + executionId` index prevents duplicate tracking
- `findRetryChain(taskId)` — all retry attempts for a single logical task
- `findStuckTasks(queueName, timeoutMs)` — detects hung tasks
- `getQueueStats(queueName)` — aggregate status counts
### DLQ Recovery
All DLQ operations are queue-native (no MongoDB coupling). Use the global API or web dashboard:
```bash
# Retry all failed tasks
POST /api/dlq/partner_tasks/retryAll
# Retry by RabbitMQ position
POST /api/dlq/partner_tasks/retryByPosition { "position": 0 }
# Retry all SATLOC tasks
POST /api/dlq/partner_tasks/retryByHeader { "headerName": "x-partner-code", "headerValue": "SATLOC" }
# Web dashboard:
http://localhost:4100/public/dlq-monitor.html
```
See [DLQ_INDEX.md](DLQ_INDEX.md) for full DLQ documentation.
### SatLoc Cloud API Implementation
**Authentication:**
- Individual customer credentials stored securely
- Token-based authentication with refresh logic
- Environment-specific endpoint configuration
**Job Upload Format:**
```javascript
{
CustomerId: "customer_id",
AircraftList: [{
AircraftId: "aircraft_id",
JobDataList: [{
JobName: "job_name",
TargetSprayRate: rate_value,
Notes: "additional_notes",
AreaData: "base64_encoded_kml"
}]
}]
}
```
**Data Retrieval:**
- Automated aircraft list polling
- Log data download with metadata
- Structured data parsing and validation
## Error Handling and Resilience
### Partner API Failures
- Immediate upload attempts with health checking
- Graceful fallback to queue-based processing
- Retry logic with exponential backoff
- Comprehensive error logging and tracking
### Data Processing Errors
- Individual log processing failure isolation
- Retry mechanisms with configurable limits
- Error state tracking and reporting
- Partial success handling for batch operations
### System Recovery
- Worker restart on failure
- Queue persistence for unprocessed tasks
- State recovery from database tracking
- Duplicate prevention during recovery
## Performance Considerations
### Polling Optimization
- Efficient customer grouping for API calls
- Configurable polling intervals
- Smart filtering to reduce API load
- Batch processing for multiple logs
### Queue Management
- Dedicated partner queue prevents internal job blocking
- Task prioritization for time-sensitive operations
- Worker scaling capabilities
- Queue monitoring and alerts
### Database Optimization
- Efficient indexing on partner tracking collections
- Bulk operations for application data insertion
- Query optimization for job matching
- Archive strategies for processed logs
## Security Implementation
### API Security
- Individual customer credentials isolation
- Secure credential storage and retrieval
- Token refresh and expiration handling
- API endpoint validation and sanitization
### Data Protection
- Secure log data transmission
- Encrypted storage of sensitive information
- Access control for partner operations
- Audit logging for all partner interactions
## Future Enhancements
### Geographic Matching
- Enhanced job-log matching using geographic boundaries
- Spatial indexing for performance optimization
- Area coverage analysis and validation
### Real-time Notifications
- WebSocket integration for immediate status updates
- Partner API webhook support
- Real-time dashboard for partner operations
### Advanced Analytics
- Partner performance metrics and reporting
- Data quality analysis and validation
- Predictive maintenance and optimization
### Multi-Partner Support
- Additional partner system integrations
- Unified partner management interface
- Cross-partner data correlation and analysis
## Testing and Validation
### Integration Testing
- Partner API connectivity validation
- End-to-end workflow testing
- Error scenario simulation
- Performance and load testing
### Data Validation
- Log parsing accuracy verification
- Job matching algorithm validation
- Application data integrity checks
- Duplicate prevention testing
This implementation provides a robust, scalable foundation for partner aircraft integration with immediate upload capabilities, automated data discovery, and comprehensive error handling.