275 lines
6.8 KiB
JavaScript
275 lines
6.8 KiB
JavaScript
'use strict';
|
|
|
|
const mongoose = require('mongoose'),
|
|
Schema = mongoose.Schema;
|
|
|
|
/**
|
|
* TaskTracker - Universal task execution tracking across all queue types
|
|
*
|
|
* Provides:
|
|
* - Deduplication: Prevents same task from being enqueued multiple times
|
|
* - Idempotency: Prevents same execution from being processed twice
|
|
* - Tracing: Links all retry attempts through shared taskId
|
|
* - Monitoring: Real-time status tracking and metrics
|
|
*
|
|
* Key Design:
|
|
* - taskId: Business identity (stable across retries)
|
|
* - executionId: Execution identity (unique per attempt)
|
|
* - No separate correlationId needed - taskId serves this purpose
|
|
*/
|
|
|
|
const TaskTrackerStatus = Object.freeze({
|
|
QUEUED: 'queued', // Task enqueued, waiting for processing
|
|
PROCESSING: 'processing', // Currently being processed by worker
|
|
COMPLETED: 'completed', // Successfully completed
|
|
FAILED: 'failed', // Failed but eligible for retry
|
|
DLQ: 'dlq', // Sent to Dead Letter Queue (max retries exceeded)
|
|
ARCHIVED: 'archived' // Archived from DLQ (manually or automatically)
|
|
});
|
|
|
|
const ErrorCategory = Object.freeze({
|
|
TRANSIENT: 'transient', // Network timeouts, temporary unavailability
|
|
VALIDATION: 'validation', // Invalid input, missing required fields
|
|
PROCESSING: 'processing', // Business logic errors, data inconsistencies
|
|
INFRASTRUCTURE: 'infrastructure', // Database, filesystem, resource errors
|
|
PARTNER_API: 'partner_api', // External partner API errors
|
|
UNKNOWN: 'unknown' // Unclassified errors
|
|
});
|
|
|
|
const taskTrackerSchema = new Schema({
|
|
// === Universal Task Identity ===
|
|
taskId: {
|
|
type: String,
|
|
required: true,
|
|
index: true,
|
|
trim: true
|
|
// Format: "{queueType}:{naturalKey}"
|
|
// Examples:
|
|
// "partner_tasks:SATLOC:695d:02220710"
|
|
// "jobs:12345:userId123:process"
|
|
// "notifications:userId456:EMAIL:8a3f9c2e"
|
|
},
|
|
|
|
// === Execution Identity (Idempotency) ===
|
|
executionId: {
|
|
type: String,
|
|
required: true,
|
|
unique: true,
|
|
index: true,
|
|
trim: true
|
|
// UUID v4 - unique per execution attempt
|
|
// New executionId generated for each retry
|
|
},
|
|
|
|
// === Queue Context ===
|
|
queueName: {
|
|
type: String,
|
|
required: true,
|
|
index: true,
|
|
trim: true
|
|
// e.g., 'dev_partner_tasks', 'dev_jobs', 'partner_tasks'
|
|
},
|
|
|
|
messageVersion: {
|
|
type: String,
|
|
default: '1.0',
|
|
trim: true
|
|
// Allows schema evolution without breaking existing messages
|
|
},
|
|
|
|
// === Processing Status ===
|
|
status: {
|
|
type: String,
|
|
enum: Object.values(TaskTrackerStatus),
|
|
default: TaskTrackerStatus.QUEUED,
|
|
required: true,
|
|
index: true
|
|
},
|
|
|
|
// === Lifecycle Timestamps ===
|
|
enqueuedAt: {
|
|
type: Date,
|
|
required: true,
|
|
default: Date.now,
|
|
index: true
|
|
},
|
|
|
|
processingStartedAt: {
|
|
type: Date,
|
|
index: true
|
|
// Used for stuck task detection
|
|
},
|
|
|
|
completedAt: {
|
|
type: Date
|
|
},
|
|
|
|
failedAt: {
|
|
type: Date
|
|
},
|
|
|
|
archivedAt: {
|
|
type: Date
|
|
},
|
|
|
|
// === Retry Logic ===
|
|
retryCount: {
|
|
type: Number,
|
|
default: 0,
|
|
index: true
|
|
},
|
|
|
|
maxRetries: {
|
|
type: Number,
|
|
default: 5
|
|
},
|
|
|
|
lastRetryAt: {
|
|
type: Date
|
|
},
|
|
|
|
// === Error Tracking ===
|
|
errorMessage: {
|
|
type: String
|
|
},
|
|
|
|
errorCategory: {
|
|
type: String,
|
|
enum: Object.values(ErrorCategory),
|
|
index: true
|
|
},
|
|
|
|
errorStack: {
|
|
type: String
|
|
},
|
|
|
|
// === Domain-Specific Data ===
|
|
metadata: {
|
|
type: Schema.Types.Mixed,
|
|
default: {}
|
|
// Flexible JSON for queue-specific fields:
|
|
// Partner tasks: { partnerCode, aircraftId, logId, customerId, logFileName }
|
|
// Jobs: { jobId, userId, jobType, assignId }
|
|
// Notifications: { userId, notificationType, recipientEmail }
|
|
},
|
|
|
|
// === Archival Info ===
|
|
archivedReason: {
|
|
type: String,
|
|
trim: true
|
|
// e.g., "Manual retry initiated", "Auto-archived after 7 days", "Non-recoverable error"
|
|
},
|
|
|
|
archivedBy: {
|
|
type: String,
|
|
trim: true
|
|
// User ID or system process that archived the task
|
|
},
|
|
|
|
// === Audit Trail ===
|
|
createdAt: {
|
|
type: Date,
|
|
default: Date.now,
|
|
index: true
|
|
},
|
|
|
|
updatedAt: {
|
|
type: Date,
|
|
default: Date.now
|
|
}
|
|
});
|
|
|
|
// === INDEXES ===
|
|
|
|
// Primary unique constraint: taskId + executionId (prevents duplicate processing)
|
|
taskTrackerSchema.index({ taskId: 1, executionId: 1 }, { unique: true });
|
|
|
|
// Deduplication check: Find recent tasks with same taskId
|
|
taskTrackerSchema.index({ taskId: 1, status: 1, enqueuedAt: -1 });
|
|
|
|
// Queue operations: Filter by queue and status
|
|
taskTrackerSchema.index({ queueName: 1, status: 1, enqueuedAt: -1 });
|
|
|
|
// Stuck task detection: Find PROCESSING tasks older than threshold
|
|
taskTrackerSchema.index({ status: 1, processingStartedAt: 1 });
|
|
|
|
// Error analysis: Group by error category
|
|
taskTrackerSchema.index({ errorCategory: 1, failedAt: -1 });
|
|
|
|
// Retry chain lookup: Find all executions for a taskId
|
|
taskTrackerSchema.index({ taskId: 1, createdAt: 1 });
|
|
|
|
// Monitoring: Count by status
|
|
taskTrackerSchema.index({ status: 1, updatedAt: -1 });
|
|
|
|
// === METHODS ===
|
|
|
|
// Update updatedAt on save
|
|
taskTrackerSchema.pre('save', function (next) {
|
|
this.updatedAt = new Date();
|
|
next();
|
|
});
|
|
|
|
// Instance method: Check if task can be retried
|
|
taskTrackerSchema.methods.canRetry = function() {
|
|
return this.retryCount < this.maxRetries;
|
|
};
|
|
|
|
// Instance method: Check if task is stuck
|
|
taskTrackerSchema.methods.isStuck = function(timeoutMs = 30 * 60 * 1000) {
|
|
if (this.status !== TaskTrackerStatus.PROCESSING) {
|
|
return false;
|
|
}
|
|
if (!this.processingStartedAt) {
|
|
return false;
|
|
}
|
|
return Date.now() - this.processingStartedAt.getTime() > timeoutMs;
|
|
};
|
|
|
|
// Static method: Find retry chain
|
|
taskTrackerSchema.statics.findRetryChain = async function(taskId) {
|
|
return this.find({ taskId })
|
|
.sort({ createdAt: 1 })
|
|
.lean();
|
|
};
|
|
|
|
// Static method: Find stuck tasks
|
|
taskTrackerSchema.statics.findStuckTasks = async function(queueName, timeoutMs = 30 * 60 * 1000) {
|
|
const threshold = new Date(Date.now() - timeoutMs);
|
|
return this.find({
|
|
queueName,
|
|
status: TaskTrackerStatus.PROCESSING,
|
|
processingStartedAt: { $lt: threshold }
|
|
}).lean();
|
|
};
|
|
|
|
// Static method: Get queue statistics
|
|
taskTrackerSchema.statics.getQueueStats = async function(queueName) {
|
|
const stats = await this.aggregate([
|
|
{ $match: { queueName } },
|
|
{ $group: {
|
|
_id: '$status',
|
|
count: { $sum: 1 }
|
|
}}
|
|
]);
|
|
|
|
const result = {
|
|
queued: 0,
|
|
processing: 0,
|
|
completed: 0,
|
|
failed: 0,
|
|
dlq: 0,
|
|
archived: 0
|
|
};
|
|
|
|
stats.forEach(stat => {
|
|
result[stat._id.toLowerCase()] = stat.count;
|
|
});
|
|
|
|
return result;
|
|
};
|
|
|
|
module.exports = mongoose.model('Task_Tracker', taskTrackerSchema);
|
|
module.exports.TaskTrackerStatus = TaskTrackerStatus;
|
|
module.exports.ErrorCategory = ErrorCategory;
|