# TaskTracker: Simplified 2-Key Design

## Overview

Universal task-execution tracking for all queue types (partner_tasks, jobs, notifications), using a simplified 2-key design instead of the traditional 3-key approach.
## Architecture Decision: 2 Keys vs 3 Keys

### Traditional Approach (3 Keys)

```javascript
{
  taskId: "partner_tasks:SATLOC:695d:02220710",        // Business identity
  idempotencyKey: "a1b2c3d4-...",                      // Execution identity
  correlationId: "partner_tasks:SATLOC:695d:02220710"  // Trace chain
}
```
Problems:
- Redundant: `correlationId` is often identical to `taskId`.
- Complex: three fields to manage and more indexes to maintain.
- Confusing: it is unclear when to use which ID.
### Simplified Approach (2 Keys) ✅

```javascript
{
  taskId: "partner_tasks:SATLOC:695d:02220710",  // Business identity (stable)
  executionId: "a1b2c3d4-e5f6-..."               // Execution identity (unique)
}
// Note: no separate correlationId; taskId serves this purpose.
```
Benefits:
- ✅ Simpler: 2 keys instead of 3.
- ✅ Same functionality: deduplication, idempotency and tracing.
- ✅ Better performance: fewer indexes to maintain and fewer fields to match in each query.
- ✅ Easier to understand: a clear split between business identity (`taskId`) and execution identity (`executionId`).
## How It Works

### 1. Deduplication (Prevent Duplicate Enqueues)

Use `taskId` only:

```javascript
const taskId = generateTaskId('dev_partner_tasks', message);
// => "partner_tasks:SATLOC:695d:02220710"

// Check if already queued/processing (enqueued within the last 5 minutes)
const recentTask = await TaskTracker.findOne({
  taskId,
  status: { $in: ['queued', 'processing'] },
  enqueuedAt: { $gt: new Date(Date.now() - 5 * 60000) }
});

if (recentTask) {
  return; // Skip duplicate
}
```
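The deduplication rule can be exercised without MongoDB. Below is a minimal in-memory sketch, assuming tracker records are plain objects with `taskId`, `status` and `enqueuedAt` fields as in the query above; `DEDUP_WINDOW_MS` and `isDuplicate` are hypothetical names. It mirrors the `findOne` filter: a task counts as a duplicate only if a record with the same `taskId` is still `queued` or `processing` and was enqueued within the last five minutes. Note that a check followed by a separate insert is not atomic. Two producers can both pass the check before either writes its record, so this guards against repeated enqueues rather than guaranteeing exactly-once enqueue.

```javascript
// Hypothetical in-memory model of the deduplication check (not the real TaskTracker API).
const DEDUP_WINDOW_MS = 5 * 60 * 1000; // 5 minutes, same as 5 * 60000 above

// True if `records` already holds an active record for `taskId`
// that was enqueued inside the dedup window ending at `now`.
function isDuplicate(records, taskId, now = Date.now()) {
  return records.some(
    (r) =>
      r.taskId === taskId &&
      (r.status === 'queued' || r.status === 'processing') &&
      r.enqueuedAt.getTime() > now - DEDUP_WINDOW_MS
  );
}

const now = Date.now();
const taskId = 'partner_tasks:SATLOC:695d:02220710';
const records = [
  { taskId, status: 'completed', enqueuedAt: new Date(now - 60 * 1000) },
];

// Only a completed record exists, so this is not a duplicate.
const firstCheck = isDuplicate(records, taskId, now);

// A queued record from 1 minute ago now blocks a new enqueue.
records.push({ taskId, status: 'queued', enqueuedAt: new Date(now - 60 * 1000) });
const secondCheck = isDuplicate(records, taskId, now);

// Ten minutes later the same queued record falls outside the window.
const staleCheck = isDuplicate(records, taskId, now + 10 * 60 * 1000);
```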
### 2. Idempotency (Prevent Duplicate Processing)

Use `taskId` + `executionId`:

```javascript
const executionId = generateExecutionId();
// => UUID v4: "a1b2c3d4-e5f6-7890-abcd-ef1234567890"

// Atomic claim: only one worker can move this execution to 'processing'
const tracker = await TaskTracker.findOneAndUpdate(
  {
    taskId,       // Same business task
    executionId,  // Specific execution attempt
    status: { $in: ['queued', 'failed'] }
  },
  { $set: { status: 'processing' } },
  { new: true }   // Return the updated document
);

if (!tracker) {
  return; // Already claimed or processed by another worker
}
```
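The claim is safe because `findOneAndUpdate` matches and modifies a single document atomically: when two workers receive the same message, only one of them still sees a `queued`/`failed` record. The sketch below models that compare-and-set behaviour in memory. `claim` is a hypothetical helper. In a single-threaded JavaScript process a synchronous function like this is atomic by construction, which stands in here for the per-document guarantee MongoDB provides.

```javascript
// Hypothetical in-memory stand-in for the atomic claim (not the Mongoose API).
// Records are keyed by executionId, which is unique per execution attempt.
const trackers = new Map();
const TASK_ID = 'partner_tasks:SATLOC:695d:02220710';

// Mirrors findOneAndUpdate({ taskId, executionId, status: { $in: ['queued', 'failed'] } },
//                          { $set: { status: 'processing' } }, { new: true }).
// Returns the updated record, or null if nothing matched.
function claim(taskId, executionId) {
  const record = trackers.get(executionId);
  if (!record || record.taskId !== taskId) return null;
  if (record.status !== 'queued' && record.status !== 'failed') return null;
  record.status = 'processing';
  return record;
}

trackers.set('uuid-1', { taskId: TASK_ID, executionId: 'uuid-1', status: 'queued' });

// Two workers receive the same message; only the first claim succeeds.
const workerA = claim(TASK_ID, 'uuid-1');
const workerB = claim(TASK_ID, 'uuid-1');

// After a failure the same record becomes claimable again (in-queue retry).
trackers.get('uuid-1').status = 'failed';
const retryClaim = claim(TASK_ID, 'uuid-1');
```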
### 3. Tracing (Track Retry Chains)

Use `taskId` for tracing; no separate `correlationId` is needed:

```javascript
// Find complete retry history (single query)
const retryChain = await TaskTracker.find({ taskId })
  .sort({ createdAt: 1 })
  .lean();

// Returns all attempts:
// [
//   { executionId: "uuid-1", status: "failed", error: "timeout" },
//   { executionId: "uuid-2", status: "failed", error: "timeout" },
//   { executionId: "uuid-3", status: "completed" }
// ]
```
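Why no `correlationId`? The `taskId` itself correlates all retries. When you retry a task from the DLQ:
- Keep the same `taskId` (business identity preserved).
- Generate a new `executionId` (new execution attempt).
- Query `{ taskId }` to return the entire chain automatically.

In-queue retries (a `nack` with requeue) reuse the same `executionId`: the worker increments `retryCount` on the existing record, so only a DLQ replay adds a new record to the chain.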
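The DLQ replay and chain query can be sketched in memory as well. This assumes each tracker record carries a `createdAt` timestamp (as the `sort({ createdAt: 1 })` query implies). `recordAttempt`, `replayFromDlq` and `retryChain` are hypothetical helpers, and Node's `crypto.randomUUID()` stands in for `generateExecutionId()`. A replay keeps the `taskId` and mints a new `executionId`, and filtering on `taskId` alone recovers every attempt in order.

```javascript
const { randomUUID } = require('crypto');

// Hypothetical in-memory tracker collection: one record per execution attempt.
const collection = [];

function recordAttempt(taskId, status, createdAt, errorMessage) {
  const record = { taskId, executionId: randomUUID(), status, createdAt, errorMessage };
  collection.push(record);
  return record;
}

// Replaying a dead-lettered task: same taskId (business identity), new executionId.
function replayFromDlq(dlqRecord, createdAt) {
  return recordAttempt(dlqRecord.taskId, 'queued', createdAt);
}

// In-memory equivalent of TaskTracker.find({ taskId }).sort({ createdAt: 1 }).
function retryChain(taskId) {
  return collection
    .filter((r) => r.taskId === taskId)
    .sort((a, b) => a.createdAt - b.createdAt);
}

const taskId = 'partner_tasks:SATLOC:695d:02220710';
const first = recordAttempt(taskId, 'dlq', new Date('2024-01-01T00:00:00Z'), 'timeout');
const second = replayFromDlq(first, new Date('2024-01-01T01:00:00Z'));
second.status = 'completed';
// An unrelated task (hypothetical ID) must not show up in the chain.
recordAttempt('jobs:other-task', 'completed', new Date('2024-01-01T00:30:00Z'));

const chain = retryChain(taskId);
```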
## Files

| File | Purpose |
|---|---|
| `model/task_tracker.js` | TaskTracker model with the 2-key design |
| `services/task_id_generator.js` | Generates `taskId` and `executionId` |
| `tests/test_task_tracker_2key.js` | Test script demonstrating the 2-key design |
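This document does not show the contents of `services/task_id_generator.js`. As a hedged illustration only, a generator pair might look like the sketch below. It assumes the `taskId` is built from the queue name with an environment prefix such as `dev_` stripped, followed by `partnerCode`, `aircraftId` and `logId` from the message. That field mapping is a guess based on the example ID and the metadata fields used in the enqueue example, not the real format. `executionId` uses Node's `crypto.randomUUID()`, which returns a random version-4 UUID, matching the UUID v4 format noted in the idempotency example.

```javascript
const { randomUUID } = require('crypto');

// HYPOTHETICAL: the real services/task_id_generator.js may build IDs differently.
// Assumes queue names may carry an environment prefix like "dev_".
function generateTaskId(queueName, message) {
  const baseQueue = queueName.replace(/^(dev|staging|prod)_/, '');
  // Deterministic: the same message always yields the same taskId.
  return [baseQueue, message.partnerCode, message.aircraftId, message.logId].join(':');
}

// Random per attempt: every call returns a fresh UUID v4.
function generateExecutionId() {
  return randomUUID();
}

const message = { partnerCode: 'SATLOC', aircraftId: '695d', logId: '02220710' };
const taskId = generateTaskId('dev_partner_tasks', message);
const idA = generateExecutionId();
const idB = generateExecutionId();
```

The design point is the asymmetry: `taskId` must be a pure function of the message so that duplicates collide, while `executionId` must never repeat so that each attempt can be claimed independently.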
## Usage Example

### At Enqueue Time

```javascript
// Assumes `message` (the task payload) and an amqplib `channel` are in scope.
const { generateTaskId, generateExecutionId } = require('./services/task_id_generator');
const TaskTracker = require('./model/task_tracker');

const queueName = 'dev_partner_tasks';

// Generate IDs
const taskId = generateTaskId(queueName, message);
const executionId = generateExecutionId();

// Deduplication check
const existing = await TaskTracker.findOne({
  taskId,
  status: { $in: ['queued', 'processing'] },
  enqueuedAt: { $gt: new Date(Date.now() - 5 * 60000) }
});

if (existing) {
  console.log('Task already queued, skipping');
  return;
}

// Create tracker (enqueuedAt is not set here; the dedup query relies on the model providing it)
await TaskTracker.create({
  taskId,
  executionId,
  queueName,
  status: 'queued',
  metadata: {
    partnerCode: message.partnerCode,
    aircraftId: message.aircraftId,
    logId: message.logId,
    customerId: message.customerId
  }
});

// Enqueue message with IDs in both the body and the headers.
// amqplib's sendToQueue returns a boolean (false = write buffer full), not a promise.
channel.sendToQueue(queueName, Buffer.from(JSON.stringify({
  ...message,
  taskId,
  executionId
})), {
  persistent: true,
  headers: { taskId, executionId }
});
```
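Because the IDs travel both in the JSON body and in the AMQP headers, a consumer can read them from either place. The sketch below assumes the amqplib consumer message shape: `msg.content` is a `Buffer`, and custom headers sit under `msg.properties.headers`. It builds the same envelope as the enqueue step and reads the IDs back, preferring the headers and falling back to the body. `buildEnvelope` and `extractIds` are hypothetical helpers, and no broker connection is needed to run it.

```javascript
// Hypothetical helpers; the object shape mirrors what amqplib hands to a consumer.
function buildEnvelope(message, taskId, executionId) {
  return {
    content: Buffer.from(JSON.stringify({ ...message, taskId, executionId })),
    properties: { headers: { taskId, executionId } },
  };
}

// Prefer headers (no JSON parse needed); fall back to the body for messages without them.
function extractIds(msg) {
  const headers = (msg.properties && msg.properties.headers) || {};
  if (headers.taskId && headers.executionId) {
    return { taskId: headers.taskId, executionId: headers.executionId };
  }
  const body = JSON.parse(msg.content.toString());
  return { taskId: body.taskId, executionId: body.executionId };
}

const envelope = buildEnvelope(
  { partnerCode: 'SATLOC' },
  'partner_tasks:SATLOC:695d:02220710',
  'uuid-1'
);
const fromHeaders = extractIds(envelope);
const fromBody = extractIds({ content: envelope.content, properties: {} });
```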
### In Worker

```javascript
// Extract from message (keep the full payload for processTask)
const message = JSON.parse(msg.content.toString());
const { taskId, executionId } = message;

// Atomic claim (idempotency)
const tracker = await TaskTracker.findOneAndUpdate(
  { taskId, executionId, status: { $in: ['queued', 'failed'] } },
  { $set: { status: 'processing', processingStartedAt: new Date() } },
  { new: true }
);

if (!tracker) {
  console.log('Already processed, skipping');
  channel.ack(msg);
  return;
}

try {
  // Process task
  await processTask(message);

  // Mark completed
  await TaskTracker.updateOne(
    { executionId },
    { $set: { status: 'completed', completedAt: new Date() } }
  );
  channel.ack(msg);
} catch (error) {
  if (tracker.canRetry()) {
    await TaskTracker.updateOne(
      { executionId },
      {
        $set: { status: 'failed', errorMessage: error.message },
        $inc: { retryCount: 1 }
      }
    );
    channel.nack(msg, false, true); // Requeue (same executionId)
  } else {
    await TaskTracker.updateOne(
      { executionId },
      { $set: { status: 'dlq', errorMessage: error.message } }
    );
    // Reject without requeue. RabbitMQ routes the message to the DLQ only if the
    // queue has a dead-letter exchange configured; otherwise it is dropped.
    channel.nack(msg, false, false);
  }
}
```
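The worker's `catch` branch depends on `tracker.canRetry()`, which is defined on the model and not shown here. The sketch below assumes it compares `retryCount` against a per-record `maxRetries` limit (a hypothetical field). It simulates a task that always fails, to show how many requeues happen before the message is dead-lettered. `onFailure` is a hypothetical helper that returns the update the worker would apply and a label for the `nack` call it would make.

```javascript
// HYPOTHETICAL: assumes canRetry() compares retryCount with a maxRetries limit.
function canRetry(tracker) {
  return tracker.retryCount < tracker.maxRetries;
}

// Returns the tracker update and broker action the worker's catch block would apply.
function onFailure(tracker, error) {
  if (canRetry(tracker)) {
    return {
      update: { $set: { status: 'failed', errorMessage: error.message }, $inc: { retryCount: 1 } },
      action: 'nack-requeue', // channel.nack(msg, false, true)
    };
  }
  return {
    update: { $set: { status: 'dlq', errorMessage: error.message } },
    action: 'nack-dead-letter', // channel.nack(msg, false, false)
  };
}

// Simulate a task that fails on every attempt, with maxRetries = 3.
const tracker = { retryCount: 0, maxRetries: 3, status: 'processing' };
const actions = [];
for (;;) {
  const { update, action } = onFailure(tracker, new Error('timeout'));
  tracker.status = update.$set.status;
  tracker.retryCount += (update.$inc && update.$inc.retryCount) || 0;
  actions.push(action);
  if (action === 'nack-dead-letter') break;
}
```

Under this assumption, `maxRetries = 3` means four attempts in total (the original plus three requeues), all under one `executionId`, before the record is marked `dlq`.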
### Tracing Full History

```javascript
// Find all attempts for a task (automatic correlation via taskId)
const history = await TaskTracker.find({ taskId })
  .sort({ createdAt: 1 })
  .lean();

history.forEach((execution, index) => {
  console.log(`Attempt ${index + 1}:`);
  console.log(`  executionId: ${execution.executionId}`);
  console.log(`  status: ${execution.status}`);
  console.log(`  error: ${execution.errorMessage || 'N/A'}`);
  console.log(`  created: ${execution.createdAt}`);
});
```
## Testing

```bash
# Run test script
node tests/test_task_tracker_2key.js

# Expected output:
# ✓ taskId generation
# ✓ executionId generation (unique per attempt)
# ✓ Deduplication works
# ✓ Idempotency works (atomic claim)
# ✓ Retry chain tracing (no correlationId needed!)
```
## Migration from PartnerLogTracker

### Phase 1: Parallel Tracking
- Deploy TaskTracker alongside existing PartnerLogTracker
- Workers update both systems
- Validate consistency
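Phase 1's consistency validation could be as simple as comparing the status each system recorded for the same work item. The sketch below is hypothetical throughout. It assumes both trackers can be reduced to a `Map` from a shared key (for example the `logId` stored in TaskTracker metadata) to a status string. It also assumes PartnerLogTracker's statuses translate one-to-one through `STATUS_MAP`; the legacy status names are invented for illustration.

```javascript
// HYPOTHETICAL mapping from PartnerLogTracker statuses to TaskTracker statuses.
const STATUS_MAP = { pending: 'queued', running: 'processing', done: 'completed', error: 'failed' };

// Compares two Map<key, status> snapshots and returns every key whose statuses disagree,
// including keys present in only one system.
function findMismatches(legacy, current) {
  const mismatches = [];
  const keys = new Set([...legacy.keys(), ...current.keys()]);
  for (const key of keys) {
    const expected = legacy.has(key) ? STATUS_MAP[legacy.get(key)] : undefined;
    const actual = current.get(key);
    if (expected !== actual) {
      mismatches.push({ key, legacy: legacy.get(key), current: actual });
    }
  }
  return mismatches;
}

const legacy = new Map([['log-1', 'done'], ['log-2', 'running'], ['log-3', 'error']]);
const current = new Map([['log-1', 'completed'], ['log-2', 'completed'], ['log-4', 'queued']]);
const report = findMismatches(legacy, current);
```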
### Phase 2: Switch Reads
- APIs query TaskTracker instead of PartnerLogTracker
- Workers still update both
- Monitor performance
### Phase 3: Deprecate Old System
- Workers stop updating PartnerLogTracker
- Archive old data
- Remove old model and indexes
## Benefits Summary
| Feature | Old (3 keys) | New (2 keys) |
|---|---|---|
| Keys | taskId + idempotencyKey + correlationId | taskId + executionId |
| Indexes | 9+ | 6 |
| Deduplication | Manual logic | Built-in via taskId |
| Idempotency | Complex checks | Atomic via taskId + executionId |
| Tracing | Separate correlationId | Automatic via taskId |
| Query Performance | Slower (more fields) | Faster (fewer fields) |
| Code Complexity | Higher | Lower |
| Universal | No (partner_tasks only) | Yes (all queue types) |
## Key Insight
The `taskId` IS the correlationId. There's no need for a separate field because:
- All retries share the same `taskId` (business identity).
- Each execution attempt has its own unique `executionId` (execution identity).
- Querying `{ taskId }` naturally returns the entire retry chain.
This reduces complexity while maintaining full functionality.
Status: Implementation complete
Next Steps: Test with real partner_tasks, then roll out to other queues
Documentation: See PARTNER_TASK_DATA_FLOW_ANALYSIS.md