436 lines
16 KiB
Markdown
436 lines
16 KiB
Markdown
# Partner Log File Processing
|
|
|
|
This document describes how partner log files are discovered, downloaded, and processed in the AgMission partner integration system.
|
|
|
|
> **Last updated**: February 2026. For full architecture diagrams see [PARTNER_INTEGRATION_ARCHITECTURE.md](PARTNER_INTEGRATION_ARCHITECTURE.md).
|
|
|
|
## Overview
|
|
|
|
Partner log files flow through a three-stage pipeline split across two workers:
|
|
|
|
1. **Discovery & Download** — `partner_data_polling_worker.js` (cron-driven)
|
|
2. **Queue handoff** — `PROCESS_PARTNER_LOG` task to `partner_tasks` queue
|
|
3. **Parsing & Storage** — `partner_sync_worker.js` processes the local file
|
|
|
|
## Components
|
|
|
|
### 1. Partner Data Polling Worker (`workers/partner_data_polling_worker.js`)
|
|
- Cron schedule: every 15 min (prod) / 1 min (dev)
|
|
- Queries `JobAssign` where `status = UPLOADED` to find jobs awaiting log data
|
|
- Calls `partnerService.getAircraftLogs(customerId, aircraftId)` to list available logs
|
|
- Filters out already-processed logs via `PartnerLogTracker` (`processed = true`)
|
|
- Downloads each new log via `partnerService.getAircraftLogData(customerId, logId)` — returns binary buffer
|
|
- Saves binary to `SATLOC_STORAGE_PATH/{logFileName}`
|
|
- Updates `PartnerLogTracker`: `PENDING → DOWNLOADING → DOWNLOADED`
|
|
- Enqueues `PROCESS_PARTNER_LOG` task to `partner_tasks` queue
|
|
|
|
### 2. Partner Log Tracker (`model/partner_log_tracker.js`)
|
|
Tracks per-log-file lifecycle atomically to prevent duplicate processing:
|
|
```
|
|
PENDING → DOWNLOADING → DOWNLOADED → PROCESSING → PROCESSED / FAILED
|
|
```
|
|
Key fields: `logId`, `partnerCode`, `aircraftId`, `customerId`, `logFileName`, `savedLocalFile`, `enqueuedAt`, `retryCount`
|
|
|
|
### 3. Partner Sync Worker (`workers/partner_sync_worker.js`)
|
|
- Consumes `PROCESS_PARTNER_LOG` tasks from `partner_tasks` queue
|
|
- `TaskTracker` idempotency check — skips if already claimed by another worker instance
|
|
- Atomically claims `PartnerLogTracker` → `PROCESSING`
|
|
- Reads the saved local file from `SATLOC_STORAGE_PATH`
|
|
- Parses with `SatLocLogParser` (`helpers/satloc_log_parser.js`)
|
|
- Processes records with `SatLocApplicationProcessor` (`helpers/satloc_application_processor.js`)
|
|
- Writes `ApplicationFile` + `ApplicationDetail` records to MongoDB
|
|
- Sets `PartnerLogTracker → PROCESSED`
|
|
- On unrecoverable error: `nack` → RabbitMQ routes to `partner_tasks_failed` DLQ
|
|
|
|
### 4. SatLocLogParser (`helpers/satloc_log_parser.js`)
|
|
- Core binary parser supporting 43+ SatLoc record types
|
|
- Parses `.LOG` binary files produced by SatLoc G4 devices
|
|
- Memory-efficient streaming processing
|
|
|
|
### 5. SatLocApplicationProcessor (`helpers/satloc_application_processor.js`)
|
|
- Converts parsed records to `ApplicationDetail` documents
|
|
- Matches log data to `JobAssign` records via aircraft ID + time proximity
|
|
- Handles multi-file grouping: one `Application` → many `ApplicationFile` → many `ApplicationDetail`
|
|
|
|
### 6. Partner Service Factory (`helpers/partner_service_factory.js`)
|
|
- Singleton factory; creates and caches partner service instances
|
|
- Provides `fetchLogFile(logFileName, partnerCode)` for manual/utility use
|
|
- All workers get partner services through this factory
|
|
|
|
## Process Flow
|
|
|
|
```
|
|
Cron trigger (15 min prod / 1 min dev)
|
|
└─ Query JobAssign WHERE status = UPLOADED
|
|
└─ Per aircraft: getAircraftLogs(customerId, aircraftId) → SatLoc API
|
|
└─ Filter new logs (not in PartnerLogTracker with processed=true)
|
|
└─ Per new log:
|
|
├─ Upsert PartnerLogTracker (PENDING)
|
|
├─ Claim → DOWNLOADING
|
|
├─ getAircraftLogData(customerId, logId) → binary Buffer
|
|
├─ Write to SATLOC_STORAGE_PATH/
|
|
├─ Update tracker → DOWNLOADED
|
|
└─ Queue PROCESS_PARTNER_LOG {customerId, partnerCode, aircraftId, logId, logFileName, taskId, executionId}
|
|
|
|
partner_sync_worker consumes PROCESS_PARTNER_LOG:
|
|
├─ TaskTracker idempotency check (skip if claimed)
|
|
├─ Claim PartnerLogTracker → PROCESSING
|
|
├─ Read file from SATLOC_STORAGE_PATH
|
|
├─ SatLocLogParser.parse() → records[]
|
|
├─ SatLocApplicationProcessor.process() → ApplicationDetail[]
|
|
├─ Write to MongoDB
|
|
├─ PartnerLogTracker → PROCESSED
|
|
└─ ack() message
|
|
```
|
|
|
|
## Task Message Format
|
|
|
|
```javascript
|
|
// PROCESS_PARTNER_LOG task payload (enqueued by polling worker)
|
|
{
|
|
type: 'process_partner_log',
|
|
data: {
|
|
customerId: '<ObjectId string>',
|
|
partnerCode: 'SATLOC',
|
|
aircraftId: '<partnerAircraftId>',
|
|
logId: '<partner log ID>',
|
|
logFileName: '2507140724SatlocG4_b4ef.LOG',
|
|
taskId: '<deterministic task ID>', // for deduplication
|
|
executionId: '<random execution ID>' // for idempotency
|
|
}
|
|
}
|
|
```
|
|
|
|
## SatLoc Data Mapping
|
|
|
|
### Core Position Data (Record Type 1)
|
|
- GPS Coordinates: `lat`, `lon` → `ApplicationDetail.lat/lon`
|
|
- Timestamps: `timestamp` → `gpsTime` (Unix epoch)
|
|
- Motion: `speed`, `track`, `altitude` → `grSpeed`, `head`, `alt`
|
|
- Spray status: `sprayStat` → boolean mapping
|
|
|
|
### Environmental Records
|
|
- **Wind (Type 50)**: `windSpeed`, `windDirection` → `windSpd`, `windDir`
|
|
- **Environment (Type 110)**: `temperature`, `humidity` → `temp`, `humid`
|
|
|
|
### Flow & Application Data
|
|
- **Flow Monitor (Type 30)**: `pressure`, `flowRate` → `psi`, `lminApp`
|
|
- **Target Rates (Type 32)**: `targetRate` → `lminReq`
|
|
|
|
## File Type Support
|
|
|
|
### Currently Supported
|
|
- **SatLoc `.LOG` files** — binary format per `Transland_SATLOC_Log_File_Formats_v3_76.md`
|
|
|
|
### Adding New Partner File Types
|
|
|
|
To add a second partner:
|
|
1. Implement `services/<partner>_service.js` extending `base_partner_service.js`
|
|
2. Register in `helpers/partner_config.js`
|
|
3. Register in `helpers/partner_service_factory.js`
|
|
4. The polling worker and sync worker will auto-discover via the factory
|
|
|
|
## Error Handling
|
|
|
|
- **Download failure**: tracker reset to `FAILED`; retried on next cron run (up to `PARTNER_MAX_RETRIES`)
|
|
- **Processing failure**: `nack` → DLQ `partner_tasks_failed`; retry via `/api/dlq/partner_tasks/retryAll`
|
|
- **Stuck tasks**: periodic cleanup in polling worker resets `DOWNLOADING`/`DOWNLOADED`/`PROCESSING` trackers that exceed timeout thresholds
|
|
- **Circuit breaker**: sync worker blocks files that fail repeatedly (configurable; lenient in dev)
|
|
|
|
## Environment Variables
|
|
|
|
```bash
|
|
SATLOC_STORAGE_PATH=/path/to/satloc/uploads # where log files are stored
|
|
SATLOC_API_ENDPOINT=https://www.satloccloudfc.com/api/Satloc
|
|
SATLOC_API_TIMEOUT=30000
|
|
PARTNER_MAX_RETRIES=5
|
|
PARTNER_POLLING_ENABLED=true
|
|
PARTNER_SYNC_ENABLED=true
|
|
```
|
|
|
|
## DLQ Recovery
|
|
|
|
```bash
|
|
# Retry all failed log processing tasks
|
|
POST /api/dlq/partner_tasks/retryAll
|
|
|
|
# Retry specific position
|
|
POST /api/dlq/partner_tasks/retryByPosition { "position": 0 }
|
|
|
|
# Retry by partner code
|
|
POST /api/dlq/partner_tasks/retryByHeader { "headerName": "x-partner-code", "headerValue": "SATLOC" }
|
|
```
|
|
|
|
See [DLQ_API_REFERENCE.md](DLQ_API_REFERENCE.md) for full DLQ documentation.
|
|
|
|
## Enhanced Architecture
|
|
|
|
### Components
|
|
|
|
1. **Partner Data Polling Worker** (`workers/partner_data_polling_worker.js`)
|
|
- **Enhanced**: Downloads log files using `partnerService.downloadLogFile()`
|
|
- **Enhanced**: Stores files locally in partner-specific directories
|
|
- **Enhanced**: Updates `PartnerLogTracker` with download status and file paths
|
|
- **Enhanced**: Enqueues `PROCESS_PARTNER_DATA_FILE` tasks with local paths
|
|
|
|
2. **SatLocBinaryProcessor** (`helpers/satloc_binary_processor.js`)
|
|
- **New**: Wrapper around proven `SatLocLogParser` with enhanced statistics
|
|
- **Achievement**: 100% parsing success rate (21,601/21,601 records)
|
|
- **Performance**: < 2 seconds for 20MB+ files, < 100MB memory peak
|
|
- **Statistics**: Comprehensive spray/environmental metrics calculation
|
|
|
|
3. **Partner Sync Worker** (`workers/partner_sync_worker.js`)
|
|
- **Enhanced**: Processes local partner log files using `SatLocBinaryProcessor`
|
|
- **Enhanced**: Comprehensive statistics calculation and application metrics
|
|
- Handles SatLoc binary log files and other partner formats
|
|
- Saves enhanced application details and updates application status
|
|
|
|
4. **SatLocLogParser** (`helpers/satloc_log_parser.js`)
|
|
- **Proven**: Core binary parsing engine supporting 43+ record types
|
|
- Battle-tested with comprehensive error handling
|
|
- Memory-efficient streaming processing
|
|
- Foundation for 100% parsing success rate
|
|
|
|
5. **Partner Service Factory** (`helpers/partner_service_factory.js`)
|
|
- Centralized factory for creating partner service instances
|
|
- Provides unified `downloadLogFile()` method across all partners
|
|
- **Enhanced**: Supports local file storage and tracking
|
|
|
|
6. **Partner Services** (`services/satloc_service.js`, etc.)
|
|
- Individual service classes for each partner
|
|
- **Enhanced**: Implement `downloadLogFile()` for reliable file acquisition
|
|
- Partner-specific API communication and file fetching logic
|
|
|
|
7. **Partner Configuration** (`helpers/partner_config.js`)
|
|
- Storage configuration for each partner
|
|
- File path settings, extensions, and validation rules
|
|
|
|
## Enhanced Process Flow
|
|
|
|
### 1. File Download and Storage Process (Partner Data Polling Worker)
|
|
1. **Discovery**: Polling worker identifies new logs via partner API calls
|
|
2. **Download**: Uses `partnerService.downloadLogFile()` to download base64 content
|
|
3. **Storage**: Decodes and stores files in partner-specific directories
|
|
4. **Tracking**: Updates `PartnerLogTracker` with local file path and download status
|
|
5. **Queuing**: Enqueues `PROCESS_PARTNER_DATA_FILE` task with local file path
|
|
|
|
### 2. Binary Log Processing (Partner Sync Worker)
|
|
1. **Receive Task**: Partner Sync Worker receives `PROCESS_PARTNER_DATA_FILE` task
|
|
2. **Binary Processing**: Uses `SatLocBinaryProcessor` to parse local file with 100% success
|
|
3. **Statistics Calculation**: Enhanced metrics including spray/environmental data
|
|
4. **Application Details**: Extracts comprehensive application details and saves to database
|
|
5. **Status Updates**: Updates ApplicationFile and Application status, PartnerLogTracker completion
|
|
|
|
### 3. Performance Achievements
|
|
- **Success Rate**: 100% (21,601/21,601 valid records)
|
|
- **Processing Speed**: < 2 seconds for 20MB+ binary files
|
|
- **Memory Efficiency**: < 100MB peak memory usage
|
|
- **Record Coverage**: 43+ supported SatLoc record types
|
|
- **Statistics Enhancement**: Comprehensive spray and environmental metrics
|
|
|
|
## Configuration
|
|
|
|
Same as before - see environment variables section below.
|
|
|
|
### Environment Variables
|
|
|
|
Add the following environment variables for each partner:
|
|
|
|
```bash
|
|
# SatLoc Configuration
|
|
SATLOC_STORAGE_PATH=/data/partners/satloc
|
|
SATLOC_TEMP_PATH=/tmp/satloc
|
|
SATLOC_MAX_FILE_AGE=7776000000 # 90 days in milliseconds
|
|
|
|
# AGIDRONEX Configuration
|
|
AGIDRONEX_STORAGE_PATH=/data/partners/agidronex
|
|
AGIDRONEX_TEMP_PATH=/tmp/agidronex
|
|
AGIDRONEX_MAX_FILE_AGE=7776000000
|
|
```
|
|
|
|
### Storage Directory Structure
|
|
|
|
```
|
|
/data/partners/
|
|
├── satloc/
|
|
│ ├── 2007281153SatlocG40010.log
|
|
│ ├── 2007281154SatlocG40010.log
|
|
│ └── ...
|
|
└── agidronex/
|
|
├── flight_log_001.log
|
|
├── flight_log_002.log
|
|
└── ...
|
|
```
|
|
|
|
## Usage Examples
|
|
|
|
### 1. Partner Log File Processing (Automatic)
|
|
|
|
Partner log files are automatically processed when uploaded as part of a job:
|
|
|
|
```javascript
|
|
// When a job is uploaded with .log files, they are automatically:
|
|
// 1. Detected by the job worker during file classification
|
|
// 2. ApplicationFile record is created
|
|
// 3. Enqueued to partner sync worker for processing
|
|
// 4. Processed asynchronously by partner sync worker
|
|
|
|
// The process is transparent to the user - no manual intervention required
|
|
```
|
|
|
|
### 2. Manual Partner Log File Processing
|
|
|
|
To manually enqueue a partner data file for processing:
|
|
|
|
```javascript
|
|
// In job worker context
|
|
const filePath = '/path/to/partner/logfile.log';
|
|
const fileId = applicationFile._id;
|
|
const applicationId = application._id;
|
|
const fileName = 'example.log';
|
|
|
|
const success = await enqueuePartnerDataFile(filePath, fileId, applicationId, fileName);
|
|
if (success) {
|
|
console.log('Partner data file enqueued successfully');
|
|
} else {
|
|
console.log('Failed to enqueue partner data file');
|
|
}
|
|
```
|
|
|
|
### 3. Partner Sync Worker Task Processing
|
|
|
|
The partner sync worker processes different types of tasks:
|
|
|
|
```javascript
|
|
// Task types handled by partner sync worker
|
|
const taskTypes = {
|
|
UPLOAD_PARTNER_JOB: 'upload_partner_job', // Upload jobs to partner systems
|
|
PROCESS_PARTNER_LOG: 'process_partner_log', // Process logs from partner APIs
|
|
PROCESS_PARTNER_DATA_FILE: 'process_partner_data_file' // Process uploaded partner files
|
|
};
|
|
|
|
// Example task message for partner data file processing
|
|
const taskMessage = {
|
|
type: 'process_partner_data_file',
|
|
data: {
|
|
filePath: '/tmp/uploads/example.log',
|
|
fileId: '60a7c8d8f123456789abcdef',
|
|
applicationId: '60a7c8d8f123456789abcde0',
|
|
fileName: 'example.log',
|
|
enqueuedAt: new Date()
|
|
}
|
|
};
|
|
```
|
|
|
|
## File Type Support
|
|
|
|
### Currently Supported
|
|
|
|
1. **SatLoc Log Files** (`.log`)
|
|
- Binary format according to LOGFileFormat_Air_3_76.md specification
|
|
- Parsed using `SatLocLogParser` class
|
|
- Extracts position, GPS, flow, and application data
|
|
|
|
### Adding New Partner File Types
|
|
|
|
To add support for new partner file formats:
|
|
|
|
1. **Add file extension detection** in `job_worker.js`:
|
|
```javascript
|
|
// In importData() function, file classification section
|
|
} else if ((match = basename.match(/^.*\.dat$/i))) {
|
|
// New partner format
|
|
const stats = fs.statSync(file);
|
|
const m = stats && stats.mtime ? moment.utc(stats.mtime) : moment.utc();
|
|
agn = m.format('YYMMDDHHmm').substring(1);
|
|
typedFiles.push({ type: FILE.DATA_PARTNER_LOG, agn: agn, file: file });
|
|
```
|
|
|
|
2. **Update parser logic** in `readPartnerLogFile()`:
|
|
```javascript
|
|
// Add new parser for different file extensions
|
|
if (fileName.endsWith('.log')) {
|
|
// SatLoc format
|
|
const parser = new SatLocLogParser(options);
|
|
// ... existing logic
|
|
} else if (fileName.endsWith('.dat')) {
|
|
// New partner format
|
|
const parser = new NewPartnerParser(options);
|
|
// ... new parsing logic
|
|
}
|
|
```
|
|
|
|
3. **Add partner configuration** in `partner_config.js`:
|
|
```javascript
|
|
storage: {
|
|
basePath: env.NEWPARTNER_STORAGE_PATH || '/data/partners/newpartner',
|
|
tempPath: env.NEWPARTNER_TEMP_PATH || '/tmp/newpartner',
|
|
logFileExtensions: ['.dat', '.bin', '.txt'],
|
|
maxFileAge: parseInt(env.NEWPARTNER_MAX_FILE_AGE) || 7776000000
|
|
}
|
|
```
|
|
|
|
## Error Handling
|
|
|
|
The system includes comprehensive error handling:
|
|
|
|
- **File not found**: Proper error messages when log files don't exist
|
|
- **Permission errors**: Clear feedback on access issues
|
|
- **Invalid file extensions**: Validation against allowed extensions
|
|
- **File age validation**: Optional warnings for old files
|
|
- **Parser errors**: Graceful handling of corrupted or invalid log files
|
|
|
|
## Performance Considerations
|
|
|
|
1. **File Size Limits**: Configure `maxFileSize` in partner configuration
|
|
2. **Batch Processing**: Job worker processes files in batches of 1000 records
|
|
3. **Memory Management**: Parser includes garbage collection hints
|
|
4. **Caching**: File metadata cached to avoid repeated filesystem calls
|
|
|
|
## Testing
|
|
|
|
To test the log file processing:
|
|
|
|
1. **Create test log files** in partner storage directories
|
|
2. **Run syntax checks**:
|
|
```bash
|
|
node -c helpers/partner_service_factory.js
|
|
node -c services/satloc_service.js
|
|
node -c workers/job_worker.js
|
|
```
|
|
|
|
3. **Test file fetching**:
|
|
```bash
|
|
node -e "
|
|
const factory = require('./helpers/partner_service_factory');
|
|
factory.fetchLogFile('test.log', 'SATLOC').then(console.log).catch(console.error);
|
|
"
|
|
```
|
|
|
|
## Troubleshooting
|
|
|
|
### Common Issues
|
|
|
|
1. **Storage path not found**
|
|
- Ensure `PARTNER_STORAGE_PATH` environment variables are set
|
|
- Verify directory permissions
|
|
|
|
2. **File extension not supported**
|
|
- Check `logFileExtensions` configuration
|
|
- Add new extensions as needed
|
|
|
|
3. **Parser errors**
|
|
- Verify file format matches expected partner specification
|
|
- Check parser configuration options
|
|
|
|
4. **Memory issues with large files**
|
|
- Adjust `batchSize` in parser options
|
|
- Monitor memory usage during processing
|
|
|
|
## Security
|
|
|
|
- File access is restricted to configured storage directories
|
|
- File extension validation prevents processing of unauthorized file types
|
|
- Path traversal protection ensures files cannot be accessed outside storage areas
|
|
- File age validation helps prevent processing of potentially corrupted old files
|