
TaskTracker: Simplified 2-Key Design

Overview

TaskTracker provides universal task execution tracking for all queue types (partner_tasks, jobs, notifications), using a simplified 2-key design instead of the traditional 3-key approach.

Architecture Decision: 2 Keys vs 3 Keys

Traditional Approach (3 Keys)

{
  taskId: "partner_tasks:SATLOC:695d:02220710",        // Business identity
  idempotencyKey: "a1b2c3d4-...",                       // Execution identity
  correlationId: "partner_tasks:SATLOC:695d:02220710"  // Trace chain
}

Problems:

  • Redundant: correlationId is often identical to taskId
  • Complex: three fields to manage and more indexes to maintain
  • Confusing: it is unclear which ID to use when

Simplified Approach (2 Keys)

{
  taskId: "partner_tasks:SATLOC:695d:02220710",  // Business identity (stable)
  executionId: "a1b2c3d4-e5f6-..."               // Execution identity (unique)
}
// Note: No separate correlationId - taskId serves this purpose!
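The two keys differ in how they are produced: taskId is deterministic, so the same business task always yields the same value, while executionId is random for every attempt. A minimal sketch, assuming taskId is the queue name (with an environment prefix such as `dev_` stripped) joined with a few business fields. `makeTaskId`, `makeExecutionId`, and `KEY_FIELDS` are hypothetical stand-ins, and the real `services/task_id_generator.js` may derive the ID differently.

```javascript
const { randomUUID } = require('node:crypto');

// Hypothetical: message fields that make up the business identity.
const KEY_FIELDS = ['partnerCode', 'aircraftId', 'logId'];

// Hypothetical stand-in for generateTaskId: deterministic, so re-enqueuing
// the same business task produces the same taskId.
function makeTaskId(queueName, message) {
  const base = queueName.replace(/^(dev|staging|prod)_/, ''); // assumed env prefixes
  const parts = KEY_FIELDS.map((field) => String(message[field]));
  return [base, ...parts].join(':');
}

// Hypothetical stand-in for generateExecutionId: a fresh UUID v4 per attempt.
function makeExecutionId() {
  return randomUUID();
}

const message = { partnerCode: 'SATLOC', aircraftId: '695d', logId: '02220710' };
const taskId = makeTaskId('dev_partner_tasks', message);
const firstAttempt = makeExecutionId();
const secondAttempt = makeExecutionId();

console.log(taskId); // partner_tasks:SATLOC:695d:02220710
console.log(firstAttempt === secondAttempt); // false
```

Calling `makeTaskId` again with the same message returns the same string, which is what lets taskId act as both the deduplication key and the correlation key.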

Benefits:

  • Simpler: 2 keys instead of 3
  • Same functionality: Deduplication + Idempotency + Tracing
  • Better performance: fewer indexes to maintain and fewer fields to filter on
  • Easier to understand: business identity (taskId) and execution identity (executionId) are clearly separated

How It Works

1. Deduplication (Prevent Duplicate Enqueues)

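Use taskId only. An enqueue is skipped if a tracker with the same taskId is still queued or processing and was enqueued within the last 5 minutes: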

const taskId = generateTaskId('dev_partner_tasks', message);
// => "partner_tasks:SATLOC:695d:02220710"

// Check if already queued/processing
const recentTask = await TaskTracker.findOne({
  taskId,
  status: { $in: ['queued', 'processing'] },
  enqueuedAt: { $gt: new Date(Date.now() - 5 * 60000) } // enqueued within the last 5 minutes
});

if (recentTask) {
  return; // Skip duplicate
}
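The deduplication rule reduces to a predicate over existing trackers. This in-memory sketch (a plain array standing in for the TaskTracker collection, millisecond numbers standing in for `enqueuedAt` dates, and a hypothetical `isDuplicate` helper) makes the window boundary concrete. One caveat: the `findOne`-then-`create` sequence is not atomic, so two producers enqueuing the same taskId at nearly the same moment could both pass the check unless uniqueness is also enforced at the database level.

```javascript
const DEDUP_WINDOW_MS = 5 * 60 * 1000; // 5 minutes, same as 5 * 60000 above
const ACTIVE_STATUSES = new Set(['queued', 'processing']);

// Mirrors the findOne filter: same taskId, still active, enqueued inside the window.
// Hypothetical helper; enqueuedAt is a millisecond timestamp here instead of a Date.
function isDuplicate(trackers, taskId, now = Date.now()) {
  return trackers.some(
    (t) =>
      t.taskId === taskId &&
      ACTIVE_STATUSES.has(t.status) &&
      t.enqueuedAt > now - DEDUP_WINDOW_MS
  );
}

const id = 'partner_tasks:SATLOC:695d:02220710';
const trackers = [{ taskId: id, status: 'queued', enqueuedAt: 1_000_000 }];

const oneMinuteLater = isDuplicate(trackers, id, 1_000_000 + 60_000);
const pastWindow = isDuplicate(trackers, id, 1_000_000 + DEDUP_WINDOW_MS + 1);
console.log(oneMinuteLater, pastWindow); // true false
```

A tracker that is already `completed`, `failed`, or `dlq` never blocks a new enqueue, because only active statuses count.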

2. Idempotency (Prevent Duplicate Processing)

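Use taskId + executionId. The executionId is generated once per execution attempt when the task is enqueued, carried in the message, and read back by the worker before it claims the task:

const executionId = generateExecutionId(); // at enqueue time, carried in the message
// => UUID v4: "a1b2c3d4-e5f6-7890-abcd-ef1234567890"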

// Atomic claim: succeeds only if this execution is still queued, or failed and awaiting retry
const tracker = await TaskTracker.findOneAndUpdate(
  { 
    taskId,           // Same business task
    executionId,      // Specific execution attempt
    status: { $in: ['queued', 'failed'] }
  },
  { $set: { status: 'processing' } },
  { new: true }
);

if (!tracker) {
  return; // Already claimed or finished (processing, completed, or dlq)
}
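The claim behaves like a compare-and-set on the tracker's status. This in-memory sketch uses a `Map` keyed by executionId in place of the collection and a hypothetical `claim` helper that returns the updated tracker or `null`, like `findOneAndUpdate` with `{ new: true }`. Because the function is synchronous, Node.js cannot interleave two calls, so check and update are trivially atomic here; across workers, MongoDB's single-document `findOneAndUpdate` is what provides that guarantee.

```javascript
const CLAIMABLE = new Set(['queued', 'failed']);

// Hypothetical compare-and-set: flip to 'processing' only if the execution is claimable.
// Returns the updated tracker, or null when nothing matched the filter.
function claim(store, taskId, executionId, now = new Date()) {
  const tracker = store.get(executionId);
  if (!tracker || tracker.taskId !== taskId || !CLAIMABLE.has(tracker.status)) {
    return null;
  }
  tracker.status = 'processing';
  tracker.processingStartedAt = now;
  return tracker;
}

const store = new Map([
  ['exec-1', { taskId: 'task-A', executionId: 'exec-1', status: 'queued' }],
]);

const first = claim(store, 'task-A', 'exec-1');  // wins the claim
const second = claim(store, 'task-A', 'exec-1'); // redelivery: already processing
console.log(first.status, second); // processing null
```

A redelivered message for an execution that is already `processing` or `completed` gets `null` and is acked without reprocessing, while a `failed` execution can be claimed again for its retry.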

3. Tracing (Track Retry Chains)

Use taskId for tracing (no separate correlationId needed!):

// Find complete retry history - single query!
const retryChain = await TaskTracker.find({ taskId })
  .sort({ createdAt: 1 })
  .lean();

// Returns all attempts:
// [
//   { executionId: "uuid-1", status: "failed", error: "timeout" },
//   { executionId: "uuid-2", status: "failed", error: "timeout" },
//   { executionId: "uuid-3", status: "completed" }
// ]

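Why no correlationId? The taskId itself correlates all attempts. A retry that requeues the same message reuses its executionId and only increments retryCount on that tracker. When a task is retried from the DLQ instead:

  • Keep the same taskId (business identity preserved)
  • Generate a new executionId (new execution attempt)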
  • Query { taskId } returns entire chain automatically
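The DLQ replay rule can be sketched in memory. `replayFromDlq` and `retryChain` are hypothetical helpers, plain objects stand in for tracker documents, and small integers stand in for `createdAt` timestamps; `retryChain` is the array equivalent of `TaskTracker.find({ taskId }).sort({ createdAt: 1 })`.

```javascript
const { randomUUID } = require('node:crypto');

// Hypothetical DLQ replay: keep the business identity, mint a new execution identity.
function replayFromDlq(trackers, dead, createdAt) {
  const attempt = {
    taskId: dead.taskId,       // unchanged: correlates the whole chain
    executionId: randomUUID(), // new: this is a separate execution attempt
    status: 'queued',
    createdAt,
  };
  trackers.push(attempt);
  return attempt;
}

// Array equivalent of TaskTracker.find({ taskId }).sort({ createdAt: 1 }).
function retryChain(trackers, taskId) {
  return trackers
    .filter((t) => t.taskId === taskId)
    .sort((a, b) => a.createdAt - b.createdAt);
}

const trackers = [
  { taskId: 'task-A', executionId: randomUUID(), status: 'dlq', createdAt: 1 },
  { taskId: 'task-B', executionId: randomUUID(), status: 'completed', createdAt: 2 },
];

const replay = replayFromDlq(trackers, trackers[0], 3);
replay.status = 'completed'; // the replayed attempt succeeds
const chain = retryChain(trackers, 'task-A');
console.log(chain.map((t) => t.status).join(' -> ')); // dlq -> completed
```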

Files

| File | Purpose |
|------|---------|
| model/task_tracker.js | TaskTracker model with 2-key design |
| services/task_id_generator.js | Generates taskId and executionId |
| tests/test_task_tracker_2key.js | Test script demonstrating the 2-key design |

Usage Example

At Enqueue Time

const { generateTaskId, generateExecutionId } = require('./services/task_id_generator');
const TaskTracker = require('./model/task_tracker');

// Generate IDs
const taskId = generateTaskId('dev_partner_tasks', message);
const executionId = generateExecutionId();

// Deduplication check
const existing = await TaskTracker.findOne({
  taskId,
  status: { $in: ['queued', 'processing'] },
  enqueuedAt: { $gt: new Date(Date.now() - 5 * 60000) }
});

if (existing) {
  console.log('Task already queued, skipping');
  return;
}

// Create tracker
await TaskTracker.create({
  taskId,
  executionId,
  queueName: 'dev_partner_tasks',
  status: 'queued',
  enqueuedAt: new Date(), // read by the deduplication window check
  metadata: {
    partnerCode: message.partnerCode,
    aircraftId: message.aircraftId,
    logId: message.logId,
    customerId: message.customerId
  }
});

// Enqueue message with IDs (amqplib's sendToQueue returns a boolean, so no await)
channel.sendToQueue('dev_partner_tasks', Buffer.from(JSON.stringify({
  ...message,
  taskId,
  executionId
})), {
  persistent: true,
  headers: { taskId, executionId }
});
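Because the IDs travel in both the JSON body and the message headers, a consumer can recover them from either place. A sketch with hypothetical `buildEnvelope` and `extractIds` helpers and a hand-built object shaped like an amqplib delivery (`content` as a Buffer, headers under `properties.headers`); preferring headers and falling back to the body is a choice made for this sketch, not something the design above specifies.

```javascript
// Hypothetical: build the same body/options pair the enqueue code passes to sendToQueue.
function buildEnvelope(message, taskId, executionId) {
  return {
    content: Buffer.from(JSON.stringify({ ...message, taskId, executionId })),
    options: { persistent: true, headers: { taskId, executionId } },
  };
}

// Hypothetical: read IDs from headers first, falling back to the JSON body.
function extractIds(msg) {
  const headers = (msg.properties && msg.properties.headers) || {};
  const body = JSON.parse(msg.content.toString());
  return {
    taskId: headers.taskId ?? body.taskId,
    executionId: headers.executionId ?? body.executionId,
  };
}

const envelope = buildEnvelope({ logId: '02220710' }, 'task-A', 'exec-1');
// Simulated deliveries: one with headers, one without.
const withHeaders = { content: envelope.content, properties: { headers: envelope.options.headers } };
const fromHeaders = extractIds(withHeaders);
const fromBody = extractIds({ content: envelope.content, properties: {} });
console.log(fromHeaders.executionId, fromBody.executionId); // exec-1 exec-1
```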

In Worker

// Extract payload and IDs from the message body
const message = JSON.parse(msg.content.toString());
const { taskId, executionId } = message;

// Atomic claim (idempotency)
const tracker = await TaskTracker.findOneAndUpdate(
  { taskId, executionId, status: { $in: ['queued', 'failed'] } },
  { $set: { status: 'processing', processingStartedAt: new Date() } },
  { new: true }
);

if (!tracker) {
  console.log('Already processed, skipping');
  channel.ack(msg);
  return;
}

try {
  // Process task
  await processTask(message);
  
  // Mark completed
  await TaskTracker.updateOne(
    { executionId },
    { $set: { status: 'completed', completedAt: new Date() } }
  );
  
  channel.ack(msg);
  
} catch (error) {
  if (tracker.canRetry()) {
    await TaskTracker.updateOne(
      { executionId },
      { 
        $set: { status: 'failed', errorMessage: error.message },
        $inc: { retryCount: 1 }
      }
    );
    channel.nack(msg, false, true); // Requeue
  } else {
    await TaskTracker.updateOne(
      { executionId },
      { $set: { status: 'dlq', errorMessage: error.message } }
    );
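    // Reject without requeue; RabbitMQ routes it to the DLQ only if the queue
    // has a dead-letter exchange configured
    channel.nack(msg, false, false);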
  }
}
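The worker relies on `tracker.canRetry()`, whose definition is not shown here. A minimal sketch of one plausible policy, assuming the tracker carries `retryCount` and a hypothetical `maxRetries` limit (the default of 3 is arbitrary), together with the routing decision the catch block makes; the real model method may use different fields or limits.

```javascript
// Hypothetical retry policy: allow a requeue while retryCount is below the limit.
function canRetry(tracker, maxRetries = 3) {
  return (tracker.retryCount ?? 0) < maxRetries;
}

// Mirrors the catch block: returns the tracker update and the nack requeue flag.
function onFailure(tracker, error) {
  if (canRetry(tracker)) {
    return {
      update: { $set: { status: 'failed', errorMessage: error.message }, $inc: { retryCount: 1 } },
      requeue: true, // channel.nack(msg, false, true)
    };
  }
  return {
    update: { $set: { status: 'dlq', errorMessage: error.message } },
    requeue: false, // channel.nack(msg, false, false)
  };
}

const err = new Error('timeout');
const early = onFailure({ retryCount: 0 }, err);
const exhausted = onFailure({ retryCount: 3 }, err);
console.log(early.requeue, exhausted.requeue); // true false
```

Because `canRetry()` is evaluated before `retryCount` is incremented, this policy attempts an execution up to `maxRetries + 1` times before dead-lettering it.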

Tracing Full History

// Find all attempts for a task (automatic correlation via taskId!)
const history = await TaskTracker.find({ taskId })
  .sort({ createdAt: 1 })
  .lean();

history.forEach((execution, index) => {
  console.log(`Attempt ${index + 1}:`);
  console.log(`  executionId: ${execution.executionId}`);
  console.log(`  status: ${execution.status}`);
  console.log(`  error: ${execution.errorMessage || 'N/A'}`);
  console.log(`  created: ${execution.createdAt}`);
});

Testing

# Run test script
node tests/test_task_tracker_2key.js

# Expected output:
# ✓ taskId generation
# ✓ executionId generation (unique per attempt)
# ✓ Deduplication works
# ✓ Idempotency works (atomic claim)
# ✓ Retry chain tracing (no correlationId needed!)

Migration from PartnerLogTracker

Phase 1: Parallel Tracking

  • Deploy TaskTracker alongside existing PartnerLogTracker
  • Workers update both systems
  • Validate that both systems record the same status for each task

Phase 2: Switch Reads

  • APIs query TaskTracker instead of PartnerLogTracker
  • Workers still update both
  • Monitor performance

Phase 3: Deprecate Old System

  • Workers stop updating PartnerLogTracker
  • Archive old data
  • Remove old model and indexes
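The three phases amount to a switch on where workers write and where APIs read. A minimal sketch with in-memory `Map`s standing in for the PartnerLogTracker and TaskTracker collections, both keyed by the same task key for simplicity; `recordStatus`, `readStatus`, and `findMismatches` are hypothetical names, not part of the source.

```javascript
// Hypothetical stores standing in for the two collections (key -> status).
function createStores() {
  return { legacy: new Map(), taskTracker: new Map() };
}

// Phases 1-2: workers write both systems; phase 3: TaskTracker only.
function recordStatus(stores, phase, key, status) {
  stores.taskTracker.set(key, status);
  if (phase < 3) stores.legacy.set(key, status);
}

// Phase 1: APIs still read the legacy tracker; phases 2-3: TaskTracker.
function readStatus(stores, phase, key) {
  return phase === 1 ? stores.legacy.get(key) : stores.taskTracker.get(key);
}

// Phase 1 consistency check: keys whose status differs between the two systems.
function findMismatches(stores) {
  return [...stores.legacy]
    .filter(([key, status]) => stores.taskTracker.get(key) !== status)
    .map(([key]) => key);
}

const stores = createStores();
recordStatus(stores, 1, 'task-A', 'completed');
const phase1Read = readStatus(stores, 1, 'task-A');
recordStatus(stores, 3, 'task-B', 'queued');
console.log(phase1Read, findMismatches(stores).length); // completed 0
```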

Benefits Summary

| Feature | Old (3 keys) | New (2 keys) |
|---------|--------------|--------------|
| Keys | taskId + idempotencyKey + correlationId | taskId + executionId |
| Indexes | 9+ | 6 |
| Deduplication | Manual logic | Built-in via taskId |
| Idempotency | Complex checks | Atomic via taskId + executionId |
| Tracing | Separate correlationId | Automatic via taskId |
| Query performance | Slower (more fields) | Faster (fewer fields) |
| Code complexity | Higher | Lower |
| Universal | No (partner_tasks only) | Yes (all queue types) |

Key Insight

The taskId IS the correlationId! There's no need for a separate field because:

  • All retries share the same taskId (business identity)
  • Each new execution attempt (for example, a DLQ replay) has a unique executionId (execution identity)
  • Query { taskId } naturally returns the entire retry chain

This reduces complexity while maintaining full functionality.


Status: Implementation complete
Next Steps: Test with real partner_tasks, then roll out to other queues
Documentation: See PARTNER_TASK_DATA_FLOW_ANALYSIS.md