// File: agmission/Development/server/model/task_tracker.js
// Viewer metadata: 275 lines, 6.8 KiB, JavaScript

'use strict';
const mongoose = require('mongoose'),
Schema = mongoose.Schema;
/**
* TaskTracker - Universal task execution tracking across all queue types
*
* Provides:
* - Deduplication: Prevents same task from being enqueued multiple times
* - Idempotency: Prevents same execution from being processed twice
* - Tracing: Links all retry attempts through shared taskId
* - Monitoring: Real-time status tracking and metrics
*
* Key Design:
* - taskId: Business identity (stable across retries)
* - executionId: Execution identity (unique per attempt)
* - No separate correlationId needed - taskId serves this purpose
*/
/**
 * Lifecycle states stored in a tracker document's `status` field.
 * Frozen so the shared constant cannot be modified at runtime.
 */
const TaskTrackerStatus = Object.freeze({
  // Task enqueued, waiting for processing
  QUEUED: 'queued',
  // Currently being processed by worker
  PROCESSING: 'processing',
  // Successfully completed
  COMPLETED: 'completed',
  // Failed but eligible for retry
  FAILED: 'failed',
  // Sent to Dead Letter Queue (max retries exceeded)
  DLQ: 'dlq',
  // Archived from DLQ (manually or automatically)
  ARCHIVED: 'archived'
});
/**
 * Classification values stored in a tracker document's `errorCategory` field.
 * Frozen so the shared constant cannot be modified at runtime.
 */
const ErrorCategory = Object.freeze({
  // Network timeouts, temporary unavailability
  TRANSIENT: 'transient',
  // Invalid input, missing required fields
  VALIDATION: 'validation',
  // Business logic errors, data inconsistencies
  PROCESSING: 'processing',
  // Database, filesystem, resource errors
  INFRASTRUCTURE: 'infrastructure',
  // External partner API errors
  PARTNER_API: 'partner_api',
  // Unclassified errors
  UNKNOWN: 'unknown'
});
/**
 * Schema for a single tracked task execution.
 *
 * One document is keyed by `executionId` (declared unique) and carries the
 * `taskId` it belongs to, the queue it was placed on, its current status,
 * lifecycle timestamps, retry counters, error details and free-form metadata.
 */
const taskTrackerSchema = new Schema({
  // === Universal Task Identity ===
  // Format: "{queueType}:{naturalKey}"
  // Examples:
  //   "partner_tasks:SATLOC:695d:02220710"
  //   "jobs:12345:userId123:process"
  //   "notifications:userId456:EMAIL:8a3f9c2e"
  taskId: { type: String, required: true, index: true, trim: true },

  // === Execution Identity (Idempotency) ===
  // UUID v4 - unique per execution attempt.
  // New executionId generated for each retry.
  executionId: { type: String, required: true, unique: true, index: true, trim: true },

  // === Queue Context ===
  // e.g., 'dev_partner_tasks', 'dev_jobs', 'partner_tasks'
  queueName: { type: String, required: true, index: true, trim: true },

  // Allows schema evolution without breaking existing messages
  messageVersion: { type: String, default: '1.0', trim: true },

  // === Processing Status ===
  // Restricted to the TaskTrackerStatus values; new documents start as QUEUED.
  status: {
    type: String,
    enum: Object.values(TaskTrackerStatus),
    default: TaskTrackerStatus.QUEUED,
    required: true,
    index: true
  },

  // === Lifecycle Timestamps ===
  enqueuedAt: { type: Date, required: true, default: Date.now, index: true },

  // Used for stuck task detection
  processingStartedAt: { type: Date, index: true },

  completedAt: { type: Date },
  failedAt: { type: Date },
  archivedAt: { type: Date },

  // === Retry Logic ===
  retryCount: { type: Number, default: 0, index: true },
  maxRetries: { type: Number, default: 5 },
  lastRetryAt: { type: Date },

  // === Error Tracking ===
  errorMessage: { type: String },

  // Restricted to the ErrorCategory values; no default.
  errorCategory: { type: String, enum: Object.values(ErrorCategory), index: true },

  errorStack: { type: String },

  // === Domain-Specific Data ===
  // Flexible JSON for queue-specific fields:
  //   Partner tasks: { partnerCode, aircraftId, logId, customerId, logFileName }
  //   Jobs: { jobId, userId, jobType, assignId }
  //   Notifications: { userId, notificationType, recipientEmail }
  metadata: { type: Schema.Types.Mixed, default: {} },

  // === Archival Info ===
  // e.g., "Manual retry initiated", "Auto-archived after 7 days", "Non-recoverable error"
  archivedReason: { type: String, trim: true },

  // User ID or system process that archived the task
  archivedBy: { type: String, trim: true },

  // === Audit Trail ===
  createdAt: { type: Date, default: Date.now, index: true },
  updatedAt: { type: Date, default: Date.now }
});
// === INDEXES ===
// Compound indexes as { fields, options } entries, registered in the order listed.
const taskTrackerIndexes = [
  // Primary unique constraint: taskId + executionId (prevents duplicate processing)
  { fields: { taskId: 1, executionId: 1 }, options: { unique: true } },

  // Deduplication check: Find recent tasks with same taskId
  { fields: { taskId: 1, status: 1, enqueuedAt: -1 } },

  // Queue operations: Filter by queue and status
  { fields: { queueName: 1, status: 1, enqueuedAt: -1 } },

  // Stuck task detection: Find PROCESSING tasks older than threshold
  { fields: { status: 1, processingStartedAt: 1 } },

  // Error analysis: Group by error category
  { fields: { errorCategory: 1, failedAt: -1 } },

  // Retry chain lookup: Find all executions for a taskId
  { fields: { taskId: 1, createdAt: 1 } },

  // Monitoring: Count by status
  { fields: { status: 1, updatedAt: -1 } }
];

for (const { fields, options } of taskTrackerIndexes) {
  // `options` is undefined for entries that declare none, which matches
  // calling index() with a single argument.
  taskTrackerSchema.index(fields, options);
}
// === METHODS ===
/**
 * Pre-save hook: stamps `updatedAt` with the current time, then hands control
 * back to the save pipeline via `next()`.
 *
 * NOTE(review): this is registered for 'save' only; updates issued through
 * query helpers (updateOne / findOneAndUpdate) presumably bypass it - confirm
 * callers set `updatedAt` themselves where that matters.
 */
taskTrackerSchema.pre('save', function touchUpdatedAt(next) {
  this.updatedAt = new Date();
  next();
});
/**
 * Instance method: reports whether this execution still has retries left.
 *
 * @returns {boolean} true while `retryCount` is strictly below `maxRetries`.
 */
taskTrackerSchema.methods.canRetry = function() {
  const attemptsUsed = this.retryCount;
  const attemptsAllowed = this.maxRetries;
  return attemptsUsed < attemptsAllowed;
};
/**
 * Instance method: reports whether this execution has been in PROCESSING
 * for longer than the given timeout.
 *
 * @param {number} [timeoutMs=1800000] - Allowed processing time in milliseconds
 *   (defaults to 30 minutes).
 * @returns {boolean} false when the status is not PROCESSING or no
 *   `processingStartedAt` is recorded; otherwise whether the elapsed time
 *   since `processingStartedAt` exceeds `timeoutMs`.
 */
taskTrackerSchema.methods.isStuck = function(timeoutMs = 30 * 60 * 1000) {
  const inProgress = this.status === TaskTrackerStatus.PROCESSING;
  if (!inProgress || !this.processingStartedAt) {
    return false;
  }

  const elapsedMs = Date.now() - this.processingStartedAt.getTime();
  return elapsedMs > timeoutMs;
};
/**
 * Static method: fetches every tracker document sharing a taskId,
 * ordered by `createdAt` ascending.
 *
 * @param {string} taskId - Value matched against the `taskId` field.
 * @returns {Promise<Object[]>} Matching documents as plain objects (lean).
 */
taskTrackerSchema.statics.findRetryChain = async function(taskId) {
  const oldestFirst = { createdAt: 1 };
  const chainQuery = this.find({ taskId }).sort(oldestFirst).lean();
  return chainQuery;
};
/**
 * Static method: fetches executions on a queue that are in PROCESSING and
 * whose `processingStartedAt` is earlier than (now - timeoutMs).
 *
 * @param {string} queueName - Value matched against the `queueName` field.
 * @param {number} [timeoutMs=1800000] - Allowed processing time in milliseconds
 *   (defaults to 30 minutes).
 * @returns {Promise<Object[]>} Matching documents as plain objects (lean).
 */
taskTrackerSchema.statics.findStuckTasks = async function(queueName, timeoutMs = 30 * 60 * 1000) {
  const cutoff = new Date(Date.now() - timeoutMs);
  const stuckFilter = {
    queueName,
    status: TaskTrackerStatus.PROCESSING,
    processingStartedAt: { $lt: cutoff }
  };
  return this.find(stuckFilter).lean();
};
/**
 * Static method: counts a queue's tracker documents per status.
 *
 * Every TaskTrackerStatus value is present in the result (0 when no document
 * has it). Status strings are lower-cased before being used as keys, and
 * groups that map to the same key are summed. Groups whose status is not a
 * string (e.g. documents stored without a `status`) are reported under
 * `unknown` instead of aborting the whole call.
 *
 * @param {string} queueName - Value matched against the `queueName` field.
 * @returns {Promise<Object<string, number>>} Map of status key to document count.
 */
taskTrackerSchema.statics.getQueueStats = async function(queueName) {
  const stats = await this.aggregate([
    { $match: { queueName } },
    { $group: { _id: '$status', count: { $sum: 1 } } }
  ]);

  // Seed from the enum so the result keys cannot drift from TaskTrackerStatus.
  const result = {};
  Object.values(TaskTrackerStatus).forEach(status => {
    result[status] = 0;
  });

  stats.forEach(({ _id, count }) => {
    // A group key is only safe to lower-case when it is a string; anything
    // else (null for a missing status, a number, ...) used to throw here.
    const key = typeof _id === 'string' ? _id.toLowerCase() : 'unknown';
    // Accumulate rather than assign so groups that differ only by case do not
    // overwrite each other.
    result[key] = (result[key] || 0) + count;
  });

  return result;
};
// Register the schema under the model name 'Task_Tracker' and expose the two
// enums as properties of the exported model.
const TaskTracker = mongoose.model('Task_Tracker', taskTrackerSchema);
TaskTracker.TaskTrackerStatus = TaskTrackerStatus;
TaskTracker.ErrorCategory = ErrorCategory;
module.exports = TaskTracker;