
Partner Log File Processing

This document describes how partner log files are discovered, downloaded, and processed in the AgMission partner integration system.

Last updated: February 2026. For full architecture diagrams see PARTNER_INTEGRATION_ARCHITECTURE.md.

Overview

Partner log files flow through a three-stage pipeline split across two workers:

  1. Discovery & Download: partner_data_polling_worker.js (cron-driven)
  2. Queue handoff: PROCESS_PARTNER_LOG task enqueued to the partner_tasks queue
  3. Parsing & Storage: partner_sync_worker.js processes the local file

Components

1. Partner Data Polling Worker (workers/partner_data_polling_worker.js)

  • Cron schedule: every 15 min (prod) / 1 min (dev)
  • Queries JobAssign where status = UPLOADED to find jobs awaiting log data
  • Calls partnerService.getAircraftLogs(customerId, aircraftId) to list available logs
  • Filters out already-processed logs via PartnerLogTracker (processed = true)
  • Downloads each new log via partnerService.getAircraftLogData(customerId, logId) — returns binary buffer
  • Saves binary to SATLOC_STORAGE_PATH/{logFileName}
  • Updates PartnerLogTracker: PENDING → DOWNLOADING → DOWNLOADED
  • Enqueues PROCESS_PARTNER_LOG task to partner_tasks queue
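
The filtering step above can be sketched as a pure helper. This is illustrative only: `logId` and `processed` mirror the tracker fields named in this document, but the actual query runs against MongoDB rather than in-memory arrays.

```javascript
// Illustrative sketch: keep only logs the tracker has not marked processed.
// Field names (logId, processed) follow this document; the real worker
// filters via a PartnerLogTracker query, not in memory.
function selectNewLogs(apiLogs, trackerRows) {
  const processedIds = new Set(
    trackerRows.filter((t) => t.processed).map((t) => t.logId)
  );
  return apiLogs.filter((log) => !processedIds.has(log.logId));
}
```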

2. Partner Log Tracker (model/partner_log_tracker.js)

Tracks per-log-file lifecycle atomically to prevent duplicate processing:

PENDING → DOWNLOADING → DOWNLOADED → PROCESSING → PROCESSED / FAILED

Key fields: logId, partnerCode, aircraftId, customerId, logFileName, savedLocalFile, enqueuedAt, retryCount
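
The legal transitions of the lifecycle above can be encoded as a small guard. This is a sketch of the state machine only; the real model presumably enforces it atomically (e.g. with a conditional findOneAndUpdate) rather than with a standalone helper.

```javascript
// Illustrative encoding of the tracker lifecycle shown above.
const TRANSITIONS = {
  PENDING: ['DOWNLOADING'],
  DOWNLOADING: ['DOWNLOADED', 'FAILED'],
  DOWNLOADED: ['PROCESSING', 'FAILED'],
  PROCESSING: ['PROCESSED', 'FAILED'],
  PROCESSED: [],                 // terminal
  FAILED: ['DOWNLOADING'],       // retried on the next cron run
};

function canTransition(from, to) {
  return (TRANSITIONS[from] || []).includes(to);
}
```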

3. Partner Sync Worker (workers/partner_sync_worker.js)

  • Consumes PROCESS_PARTNER_LOG tasks from partner_tasks queue
  • TaskTracker idempotency check — skips if already claimed by another worker instance
  • Atomically claims PartnerLogTracker → PROCESSING
  • Reads the saved local file from SATLOC_STORAGE_PATH
  • Parses with SatLocLogParser (helpers/satloc_log_parser.js)
  • Processes records with SatLocApplicationProcessor (helpers/satloc_application_processor.js)
  • Writes ApplicationFile + ApplicationDetail records to MongoDB
  • Sets PartnerLogTracker → PROCESSED
  • On unrecoverable error: nack → RabbitMQ routes to partner_tasks_failed DLQ
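
The TaskTracker idempotency check can be pictured with an in-memory stand-in: the first worker to claim a taskId wins, and later claims are skipped. The production check is done against MongoDB so it holds across worker instances; this sketch only shows the claim semantics.

```javascript
// In-memory stand-in for the TaskTracker claim semantics (illustrative only;
// the real check is an atomic database operation shared across workers).
function makeTaskTracker() {
  const claimed = new Set();
  return {
    claim(taskId) {
      if (claimed.has(taskId)) return false; // already claimed elsewhere: skip
      claimed.add(taskId);
      return true;
    },
  };
}
```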

4. SatLocLogParser (helpers/satloc_log_parser.js)

  • Core binary parser supporting 43+ SatLoc record types
  • Parses .LOG binary files produced by SatLoc G4 devices
  • Memory-efficient streaming processing

5. SatLocApplicationProcessor (helpers/satloc_application_processor.js)

  • Converts parsed records to ApplicationDetail documents
  • Matches log data to JobAssign records via aircraft ID + time proximity
  • Handles multi-file grouping: one Application → many ApplicationFile → many ApplicationDetail

6. Partner Service Factory (helpers/partner_service_factory.js)

  • Singleton factory; creates and caches partner service instances
  • Provides fetchLogFile(logFileName, partnerCode) for manual/utility use
  • All workers get partner services through this factory
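
The cache-by-partner-code pattern a singleton factory typically uses can be sketched as follows; `createService` stands in for the real per-partner constructors, which are not shown in this document.

```javascript
// Sketch of a singleton factory that caches one service instance per
// partner code. `createService` is a placeholder for the real constructors.
function makePartnerServiceFactory(createService) {
  const cache = new Map();
  return {
    getService(partnerCode) {
      if (!cache.has(partnerCode)) {
        cache.set(partnerCode, createService(partnerCode));
      }
      return cache.get(partnerCode); // same instance on every call
    },
  };
}
```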

Process Flow

Cron trigger (15 min prod / 1 min dev)
  └─ Query JobAssign WHERE status = UPLOADED
  └─ Per aircraft: getAircraftLogs(customerId, aircraftId) → SatLoc API
  └─ Filter new logs (not in PartnerLogTracker with processed=true)
  └─ Per new log:
        ├─ Upsert PartnerLogTracker (PENDING)
        ├─ Claim → DOWNLOADING
        ├─ getAircraftLogData(customerId, logId) → binary Buffer
        ├─ Write to SATLOC_STORAGE_PATH/
        ├─ Update tracker → DOWNLOADED
        └─ Queue PROCESS_PARTNER_LOG {customerId, partnerCode, aircraftId, logId, logFileName, taskId, executionId}

partner_sync_worker consumes PROCESS_PARTNER_LOG:
  ├─ TaskTracker idempotency check (skip if claimed)
  ├─ Claim PartnerLogTracker → PROCESSING
  ├─ Read file from SATLOC_STORAGE_PATH
  ├─ SatLocLogParser.parse() → records[]
  ├─ SatLocApplicationProcessor.process() → ApplicationDetail[]
  ├─ Write to MongoDB
  ├─ PartnerLogTracker → PROCESSED
  └─ ack() message

Task Message Format

// PROCESS_PARTNER_LOG task payload (enqueued by polling worker)
{
  type: 'process_partner_log',
  data: {
    customerId: '<ObjectId string>',
    partnerCode: 'SATLOC',
    aircraftId: '<partnerAircraftId>',
    logId: '<partner log ID>',
    logFileName: '2507140724SatlocG4_b4ef.LOG',
    taskId: '<deterministic task ID>',      // for deduplication
    executionId: '<random execution ID>'    // for idempotency
  }
}

SatLoc Data Mapping

Core Position Data (Record Type 1)

  • GPS Coordinates: lat, lon → ApplicationDetail.lat/lon
  • Timestamps: timestamp → gpsTime (Unix epoch)
  • Motion: speed, track, altitude → grSpeed, head, alt
  • Spray status: sprayStat → boolean mapping

Environmental Records

  • Wind (Type 50): windSpeed, windDirection → windSpd, windDir
  • Environment (Type 110): temperature, humidity → temp, humid

Flow & Application Data

  • Flow Monitor (Type 30): pressure, flowRate → psi, lminApp
  • Target Rates (Type 32): targetRate → lminReq
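
The position mapping above can be sketched as a pure function. Input field names follow the parser output described in this document; the output field names are the ApplicationDetail fields listed above. The boolean spray mapping shown is an assumption.

```javascript
// Illustrative mapper from a parsed Type 1 position record to the
// ApplicationDetail fields named above. The sprayStat → boolean rule
// is assumed, not confirmed by the source.
function toApplicationDetail(record) {
  return {
    lat: record.lat,
    lon: record.lon,
    gpsTime: record.timestamp,      // Unix epoch
    grSpeed: record.speed,
    head: record.track,
    alt: record.altitude,
    spraying: Boolean(record.sprayStat),
  };
}
```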

File Type Support

Currently Supported

  • SatLoc .LOG files — binary format per Transland_SATLOC_Log_File_Formats_v3_76.md

Adding New Partner File Types

To add a second partner:

  1. Implement services/<partner>_service.js extending base_partner_service.js
  2. Register in helpers/partner_config.js
  3. Register in helpers/partner_service_factory.js
  4. The polling worker and sync worker will auto-discover via the factory

Error Handling

  • Download failure: tracker reset to FAILED; retried on next cron run (up to PARTNER_MAX_RETRIES)
  • Processing failure: nack → DLQ partner_tasks_failed; retry via /api/dlq/partner_tasks/retryAll
  • Stuck tasks: periodic cleanup in polling worker resets DOWNLOADING/DOWNLOADED/PROCESSING trackers that exceed timeout thresholds
  • Circuit breaker: sync worker blocks files that fail repeatedly (configurable; lenient in dev)
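
The stuck-task sweep can be pictured as a per-state timeout check: a tracker is stuck when it has sat in a transient state longer than that state's threshold. The thresholds below are illustrative placeholders, not the production values.

```javascript
// Illustrative stuck-tracker check. Timeout values are placeholders; the
// real thresholds live in the polling worker's cleanup configuration.
const TIMEOUT_MS = {
  DOWNLOADING: 10 * 60 * 1000,
  DOWNLOADED: 30 * 60 * 1000,
  PROCESSING: 60 * 60 * 1000,
};

function isStuck(tracker, nowMs) {
  const limit = TIMEOUT_MS[tracker.status];
  if (limit === undefined) return false; // terminal states never expire
  return nowMs - tracker.updatedAtMs > limit;
}
```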

Environment Variables

SATLOC_STORAGE_PATH=/path/to/satloc/uploads  # where log files are stored
SATLOC_API_ENDPOINT=https://www.satloccloudfc.com/api/Satloc
SATLOC_API_TIMEOUT=30000
PARTNER_MAX_RETRIES=5
PARTNER_POLLING_ENABLED=true
PARTNER_SYNC_ENABLED=true

DLQ Recovery

# Retry all failed log processing tasks
POST /api/dlq/partner_tasks/retryAll

# Retry specific position
POST /api/dlq/partner_tasks/retryByPosition  { "position": 0 }

# Retry by partner code
POST /api/dlq/partner_tasks/retryByHeader  { "headerName": "x-partner-code", "headerValue": "SATLOC" }

See DLQ_API_REFERENCE.md for full DLQ documentation.

Enhanced Architecture

The sections below describe the enhanced version of the pipeline, which layers local file storage, the SatLocBinaryProcessor wrapper, and the PROCESS_PARTNER_DATA_FILE task type on top of the components introduced above.

Components

  1. Partner Data Polling Worker (workers/partner_data_polling_worker.js)

    • Enhanced: Downloads log files using partnerService.downloadLogFile()
    • Enhanced: Stores files locally in partner-specific directories
    • Enhanced: Updates PartnerLogTracker with download status and file paths
    • Enhanced: Enqueues PROCESS_PARTNER_DATA_FILE tasks with local paths
  2. SatLocBinaryProcessor (helpers/satloc_binary_processor.js)

    • New: Wrapper around proven SatLocLogParser with enhanced statistics
    • Achievement: 100% parsing success rate (21,601/21,601 records)
    • Performance: < 2 seconds for 20MB+ files, < 100MB memory peak
    • Statistics: Comprehensive spray/environmental metrics calculation
  3. Partner Sync Worker (workers/partner_sync_worker.js)

    • Enhanced: Processes local partner log files using SatLocBinaryProcessor
    • Enhanced: Comprehensive statistics calculation and application metrics
    • Handles SatLoc binary log files and other partner formats
    • Saves enhanced application details and updates application status
  4. SatLocLogParser (helpers/satloc_log_parser.js)

    • Proven: Core binary parsing engine supporting 43+ record types
    • Battle-tested with comprehensive error handling
    • Memory-efficient streaming processing
    • Foundation for 100% parsing success rate
  5. Partner Service Factory (helpers/partner_service_factory.js)

    • Centralized factory for creating partner service instances
    • Provides unified downloadLogFile() method across all partners
    • Enhanced: Supports local file storage and tracking
  6. Partner Services (services/satloc_service.js, etc.)

    • Individual service classes for each partner
    • Enhanced: Implement downloadLogFile() for reliable file acquisition
    • Partner-specific API communication and file fetching logic
  7. Partner Configuration (helpers/partner_config.js)

    • Storage configuration for each partner
    • File path settings, extensions, and validation rules

Enhanced Process Flow

1. File Download and Storage Process (Partner Data Polling Worker)

  1. Discovery: Polling worker identifies new logs via partner API calls
  2. Download: Uses partnerService.downloadLogFile() to download base64 content
  3. Storage: Decodes and stores files in partner-specific directories
  4. Tracking: Updates PartnerLogTracker with local file path and download status
  5. Queuing: Enqueues PROCESS_PARTNER_DATA_FILE task with local file path

2. Binary Log Processing (Partner Sync Worker)

  1. Receive Task: Partner Sync Worker receives PROCESS_PARTNER_DATA_FILE task
  2. Binary Processing: Uses SatLocBinaryProcessor to parse local file with 100% success
  3. Statistics Calculation: Enhanced metrics including spray/environmental data
  4. Application Details: Extracts comprehensive application details and saves to database
  5. Status Updates: Updates ApplicationFile and Application status, PartnerLogTracker completion

3. Performance Achievements

  • Success Rate: 100% (21,601/21,601 valid records)
  • Processing Speed: < 2 seconds for 20MB+ binary files
  • Memory Efficiency: < 100MB peak memory usage
  • Record Coverage: 43+ supported SatLoc record types
  • Statistics Enhancement: Comprehensive spray and environmental metrics

Configuration

Core configuration is unchanged from the base pipeline described earlier; the environment variables below add the per-partner storage settings.

Environment Variables

Add the following environment variables for each partner:

# SatLoc Configuration
SATLOC_STORAGE_PATH=/data/partners/satloc
SATLOC_TEMP_PATH=/tmp/satloc
SATLOC_MAX_FILE_AGE=7776000000  # 90 days in milliseconds

# AGIDRONEX Configuration
AGIDRONEX_STORAGE_PATH=/data/partners/agidronex
AGIDRONEX_TEMP_PATH=/tmp/agidronex
AGIDRONEX_MAX_FILE_AGE=7776000000

Storage Directory Structure

/data/partners/
├── satloc/
│   ├── 2007281153SatlocG40010.log
│   ├── 2007281154SatlocG40010.log
│   └── ...
└── agidronex/
    ├── flight_log_001.log
    ├── flight_log_002.log
    └── ...

Usage Examples

1. Partner Log File Processing (Automatic)

Partner log files are automatically processed when uploaded as part of a job:

// When a job is uploaded with .log files, they are automatically:
// 1. Detected by the job worker during file classification
// 2. ApplicationFile record is created
// 3. Enqueued to partner sync worker for processing
// 4. Processed asynchronously by partner sync worker

// The process is transparent to the user - no manual intervention required

2. Manual Partner Log File Processing

To manually enqueue a partner data file for processing:

// In job worker context
const filePath = '/path/to/partner/logfile.log';
const fileId = applicationFile._id;
const applicationId = application._id;
const fileName = 'example.log';

const success = await enqueuePartnerDataFile(filePath, fileId, applicationId, fileName);
if (success) {
  console.log('Partner data file enqueued successfully');
} else {
  console.log('Failed to enqueue partner data file');
}

3. Partner Sync Worker Task Processing

The partner sync worker processes different types of tasks:

// Task types handled by partner sync worker
const taskTypes = {
  UPLOAD_PARTNER_JOB: 'upload_partner_job',           // Upload jobs to partner systems
  PROCESS_PARTNER_LOG: 'process_partner_log',         // Process logs from partner APIs  
  PROCESS_PARTNER_DATA_FILE: 'process_partner_data_file' // Process uploaded partner files
};

// Example task message for partner data file processing
const taskMessage = {
  type: 'process_partner_data_file',
  data: {
    filePath: '/tmp/uploads/example.log',
    fileId: '60a7c8d8f123456789abcdef',
    applicationId: '60a7c8d8f123456789abcde0',
    fileName: 'example.log',
    enqueuedAt: new Date()
  }
};

File Type Support

Currently Supported

  1. SatLoc Log Files (.log)
    • Binary format according to LOGFileFormat_Air_3_76.md specification
    • Parsed using SatLocLogParser class
    • Extracts position, GPS, flow, and application data

Adding New Partner File Types

To add support for new partner file formats:

  1. Add file extension detection in job_worker.js:
// In importData() function, file classification section
} else if ((match = basename.match(/^.*\.dat$/i))) {
  // New partner format
  const stats = fs.statSync(file);
  const m = stats && stats.mtime ? moment.utc(stats.mtime) : moment.utc();
  agn = m.format('YYMMDDHHmm').substring(1);
  typedFiles.push({ type: FILE.DATA_PARTNER_LOG, agn: agn, file: file });
  2. Update parser logic in readPartnerLogFile():
// Add new parser for different file extensions
if (fileName.endsWith('.log')) {
  // SatLoc format
  const parser = new SatLocLogParser(options);
  // ... existing logic
} else if (fileName.endsWith('.dat')) {
  // New partner format
  const parser = new NewPartnerParser(options);
  // ... new parsing logic
}
  3. Add partner configuration in partner_config.js:
storage: {
  basePath: env.NEWPARTNER_STORAGE_PATH || '/data/partners/newpartner',
  tempPath: env.NEWPARTNER_TEMP_PATH || '/tmp/newpartner',
  logFileExtensions: ['.dat', '.bin', '.txt'],
  maxFileAge: parseInt(env.NEWPARTNER_MAX_FILE_AGE) || 7776000000
}

Error Handling

The system includes comprehensive error handling:

  • File not found: Proper error messages when log files don't exist
  • Permission errors: Clear feedback on access issues
  • Invalid file extensions: Validation against allowed extensions
  • File age validation: Optional warnings for old files
  • Parser errors: Graceful handling of corrupted or invalid log files

Performance Considerations

  1. File Size Limits: Configure maxFileSize in partner configuration
  2. Batch Processing: Job worker processes files in batches of 1000 records
  3. Memory Management: Parser includes garbage collection hints
  4. Caching: File metadata cached to avoid repeated filesystem calls
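
The batch-of-1000 pattern from point 2 can be sketched as a simple chunking helper, so each database insert stays bounded regardless of log size. The real worker presumably interleaves chunking with inserts; this shows only the split.

```javascript
// Split parsed records into fixed-size chunks for bounded batch inserts.
function toBatches(records, batchSize = 1000) {
  const batches = [];
  for (let i = 0; i < records.length; i += batchSize) {
    batches.push(records.slice(i, i + batchSize));
  }
  return batches;
}
```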

Testing

To test the log file processing:

  1. Create test log files in partner storage directories

  2. Run syntax checks:

    node -c helpers/partner_service_factory.js
    node -c services/satloc_service.js
    node -c workers/job_worker.js
    
  3. Test file fetching:

    node -e "
    const factory = require('./helpers/partner_service_factory');
    factory.fetchLogFile('test.log', 'SATLOC').then(console.log).catch(console.error);
    "
    

Troubleshooting

Common Issues

  1. Storage path not found

    • Ensure PARTNER_STORAGE_PATH environment variables are set
    • Verify directory permissions
  2. File extension not supported

    • Check logFileExtensions configuration
    • Add new extensions as needed
  3. Parser errors

    • Verify file format matches expected partner specification
    • Check parser configuration options
  4. Memory issues with large files

    • Adjust batchSize in parser options
    • Monitor memory usage during processing

Security

  • File access is restricted to configured storage directories
  • File extension validation prevents processing of unauthorized file types
  • Path traversal protection ensures files cannot be accessed outside storage areas
  • File age validation helps prevent processing of potentially corrupted old files