Partner Sync Worker - Task Data Flow Verification
Task Data Flow Analysis ✅
1. UPLOAD_PARTNER_JOB Task
Enqueued Data (from controllers/job.js):
{
  assignId: assignment._id.toString(),
  jobId: assignment.job._id.toString(),
  partnerCode: partnerCode,
  customerId: customerId,
  partnerAircraftId: assignment.getPartnerAircraftId()
}
Processing (processPartnerJobUpload):
- ✅ taskData.assignId → Passed to partnerSyncService.uploadJobToPartner()
- ✅ Other fields available for debugging/logging but not needed by service
- ✅ Service fetches assignment data internally, so minimal task data required
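The handoff described above can be sketched as follows. This is a minimal illustration, not the worker's actual code: the handler body, the single-argument call to uploadJobToPartner(), and the stub service are all assumptions made so the example runs on its own.

```javascript
// Sketch: forward only assignId to the service; keep the rest for log context.
// The service is injected here so the example is self-contained (assumption).
async function processPartnerJobUpload(taskData, partnerSyncService) {
  if (!taskData || !taskData.assignId) {
    throw new Error('UPLOAD_PARTNER_JOB task is missing assignId');
  }

  // These fields are not needed by the service; they only enrich logs.
  const logContext = {
    jobId: taskData.jobId,
    partnerCode: taskData.partnerCode,
    customerId: taskData.customerId,
    partnerAircraftId: taskData.partnerAircraftId,
  };

  // Assumed signature: the service looks up the assignment by id itself.
  const result = await partnerSyncService.uploadJobToPartner(taskData.assignId);
  return { result, logContext };
}

// Hypothetical stub standing in for the real partnerSyncService.
const stubService = {
  calls: [],
  async uploadJobToPartner(assignId) {
    this.calls.push(assignId);
    return { uploaded: true, assignId };
  },
};
```

Rejecting a task without assignId up front keeps a malformed message from reaching the service at all.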
2. PROCESS_PARTNER_LOG Task
Enqueued Data (from partner_data_polling_worker.js):
{
  customerId: group.customerId,
  partnerCode: group.partnerCode,
  aircraftId: aircraftId,
  logId: logInfo.id,
  logFileName: logInfo.logFileName,
  uploadedDate: logInfo.uploadedDate,
  localFilePath: downloadedPath,
  assignments: group.assignments.filter(...),
  trackerFilter: filter
}
Processing (processPartnerLog):
Database Operations:
- ✅ taskData.trackerFilter → Used for atomic claiming operations
- ✅ Fallback filter built from: logId, partnerCode, aircraftId, customerId
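The filter selection described above might look like the sketch below. The function name buildTrackerFilter and the rule that an empty trackerFilter triggers the fallback are assumptions for illustration; only the four fallback fields come from this document.

```javascript
// Sketch: prefer the filter enqueued with the task, otherwise rebuild one
// from the identifying fields. buildTrackerFilter is a hypothetical name.
function buildTrackerFilter(taskData) {
  const provided = taskData.trackerFilter;

  // Treat a missing or empty filter as "not provided" (assumption).
  if (provided && Object.keys(provided).length > 0) {
    return provided;
  }

  return {
    logId: taskData.logId,
    partnerCode: taskData.partnerCode,
    aircraftId: taskData.aircraftId,
    customerId: taskData.customerId,
  };
}

// Usage: a task enqueued without trackerFilter still yields a usable filter.
const fallbackFilter = buildTrackerFilter({
  logId: 'log-1',
  partnerCode: 'P1',
  aircraftId: 'AC-7',
  customerId: 'c-1',
});
```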
Context Data Building:
contextData = {
  // Core file info
  fileName: taskData.logFileName, // ✅
  fileSize: fileStats.size, // ✅
  uploadedDate: taskData.uploadedDate, // ✅
  // Task info (ALL fields now included)
  taskInfo: {
    source: 'partner_sync',
    partnerCode: taskData.partnerCode, // ✅
    aircraftId: taskData.aircraftId, // ✅
    customerId: taskData.customerId, // ✅ NEW
    logId: taskData.logId, // ✅
    logFileName: taskData.logFileName, // ✅
    fileSize: fileStats.size, // ✅
    uploadedDate: taskData.uploadedDate, // ✅
    processingTimestamp: new Date(), // ✅
    trackerFilter: taskData.trackerFilter // ✅ NEW
  },
  // Job matching data
  assignments: taskData.assignments, // ✅ NEW
  // Additional metadata
  meta: {
    enqueuedFrom: 'partner_data_polling_worker', // ✅ NEW
    taskType: 'PROCESS_PARTNER_LOG', // ✅ NEW
    localFilePath: taskData.localFilePath // ✅ NEW
  }
}
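One way to check that no enqueued field is dropped is to compare key names between the task payload and the context object. The helper below is a hypothetical sketch, not part of the worker: it compares names only, not values, so a field that was renamed on the way in would still be reported as missing.

```javascript
// Collect every key name that appears anywhere in a nested plain object.
// Arrays and Date instances are treated as leaf values.
function collectKeys(value, keys = new Set()) {
  const isPlainObject =
    value !== null &&
    typeof value === 'object' &&
    !Array.isArray(value) &&
    !(value instanceof Date);
  if (!isPlainObject) return keys;

  for (const [key, child] of Object.entries(value)) {
    keys.add(key);
    collectKeys(child, keys);
  }
  return keys;
}

// Return the task field names that do not appear anywhere in the context.
function findMissingTaskFields(taskData, contextData) {
  const contextKeys = collectKeys(contextData);
  return Object.keys(taskData).filter((key) => !contextKeys.has(key));
}

// Usage with made-up sample data: customerId was not copied into the context.
const sampleTask = { partnerCode: 'P1', aircraftId: 'AC-7', customerId: 'c-1' };
const sampleContext = {
  taskInfo: { partnerCode: 'P1', aircraftId: 'AC-7' },
  meta: { taskType: 'PROCESS_PARTNER_LOG' },
};
const missingFields = findMissingTaskFields(sampleTask, sampleContext);
```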
3. Error Handling & Logging
Enhanced Error Context:
// Task failed logging
pino.error({
  err: error,
  taskMsg: {
    logFileName: taskMsg.logFileName, // ✅
    type: taskMsg.type, // ✅
    customerId: taskMsg.customerId, // ✅ NEW
    partnerCode: taskMsg.partnerCode, // ✅ NEW
    aircraftId: taskMsg.aircraftId // ✅ NEW
  }
}, 'Partner task failed');
// DLQ warning logging
pino.warn({
  logFileName: taskMsg.logFileName, // ✅
  customerId: taskMsg.customerId, // ✅
  partnerCode: taskMsg.partnerCode, // ✅
  aircraftId: taskMsg.aircraftId, // ✅
  retryCount: taskMsg.retryCount, // ✅
  lastError: error.message, // ✅
  isRedelivered // ✅
}, 'Task exceeded max retries, sending to dead letter queue');
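The two log calls above imply a retry-or-dead-letter decision. A rough sketch of that decision follows, under several assumptions: the limit of 3 retries, the `>=` comparison, the function names, and the return values are invented for illustration. The logger is any object whose error/warn methods accept (object, message), which is the call shape used above.

```javascript
const MAX_RETRIES = 3; // assumed limit; the real value is not given here

// Shared identifying fields so both log entries carry the same context.
function buildTaskLogContext(taskMsg) {
  return {
    logFileName: taskMsg.logFileName,
    customerId: taskMsg.customerId,
    partnerCode: taskMsg.partnerCode,
    aircraftId: taskMsg.aircraftId,
  };
}

// Log the failure, then decide between another attempt and the DLQ.
function handleTaskFailure(taskMsg, error, logger, maxRetries = MAX_RETRIES) {
  const context = buildTaskLogContext(taskMsg);
  logger.error(
    { err: error, taskMsg: { ...context, type: taskMsg.type } },
    'Partner task failed'
  );

  const retryCount = taskMsg.retryCount || 0;
  if (retryCount >= maxRetries) {
    logger.warn(
      { ...context, retryCount, lastError: error.message },
      'Task exceeded max retries, sending to dead letter queue'
    );
    return 'dead-letter';
  }
  return 'retry';
}

// Hypothetical recording logger so the sketch runs without pino installed.
const recordingLogger = {
  entries: [],
  error(obj, msg) { this.entries.push({ level: 'error', obj, msg }); },
  warn(obj, msg) { this.entries.push({ level: 'warn', obj, msg }); },
};
```

Building the context in one place keeps the failure log and the DLQ log from drifting apart when a field is added later.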
4. Bug Fixes Applied
Fixed: Database Filter Mismatch
Before:
const filter = {
  partnerAircraftId: taskMsg.aircraftId // ❌ Wrong field name
};
After:
const filter = {
  aircraftId: taskMsg.aircraftId // ✅ Matches PartnerLogTracker model
};
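To illustrate why the field name matters, the sketch below uses a plain in-memory equality match as a stand-in for the database query. The sample tracker document and the matchesFilter helper are hypothetical; the real lookup runs against the PartnerLogTracker model.

```javascript
// Stand-in for a query by field equality: every filter field must match.
function matchesFilter(doc, filter) {
  return Object.entries(filter).every(([field, value]) => doc[field] === value);
}

// Made-up tracker documents that store the id under `aircraftId`.
const trackers = [
  { logId: 'log-1', partnerCode: 'P1', aircraftId: 'AC-7', customerId: 'c-1' },
];
const sampleTaskMsg = { aircraftId: 'AC-7' };

// Before the fix: the documents have no partnerAircraftId, so nothing matches.
const matchesBefore = trackers.filter((doc) =>
  matchesFilter(doc, { partnerAircraftId: sampleTaskMsg.aircraftId })
);

// After the fix: the filter uses the stored field name and finds the tracker.
const matchesAfter = trackers.filter((doc) =>
  matchesFilter(doc, { aircraftId: sampleTaskMsg.aircraftId })
);
```

A mismatch like this fails silently, since an empty result looks the same as "no tracker exists yet", which is why it is worth a dedicated check.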
Summary ✅
All enqueued task data is now properly passed down through the processing pipeline:
- Complete Data Passing - All available task data fields are included in context
- Enhanced Job Matching - Assignment data now available to SatLocApplicationProcessor
- Improved Debugging - Comprehensive logging with all task context
- Database Consistency - Fixed field name mismatches
- Atomic Operations - Tracker filters passed correctly for database operations
Data Flow Verification: COMPLETE ✅