// copyCollection.js — generic MongoDB collection copy worker (see header comment below).

'use strict';
/**
* Generic Collection Copy Worker
*
* This script copies documents from one MongoDB collection to another based on date filtering.
* It uses ObjectId timestamp filtering for documents created from a specific date.
*
* Key Features:
* - Works with any MongoDB collection (configurable source/target)
* - Can use Mongoose models or direct collection access
* - Filters documents by ObjectId timestamp (more reliable than date fields)
* - Uses batch processing with configurable batch sizes (default 1000 documents)
* - Implements bulk upsert operations for efficient data transfer
* - Provides detailed progress tracking and statistics
* - Supports dry-run mode for testing
* - Handles duplicate documents gracefully with upsert operations
* - Implements robust error handling with retry logic
* - Command-line argument support for easier scripting
* - Supports end-date filtering for safety with live data
* - Allows checkpoint saving for resumable operations
*
* Usage Examples:
* # Copy application details (default behavior for backward compatibility)
* DEBUG=agm:copy-collection node workers/copyApplicationDetails.js
*
* # Copy users collection using environment variables
* DEBUG=agm:copy-collection SOURCE_COLLECTION=users TARGET_COLLECTION=users_backup SOURCE_MODEL=user node workers/copyApplicationDetails.js
*
* # Copy any collection without model (direct collection access)
* DEBUG=agm:copy-collection SOURCE_COLLECTION=jobs TARGET_COLLECTION=jobs_archive USE_MODEL=false node workers/copyApplicationDetails.js
*
* # Copy with custom date filter using environment variables
* DEBUG=agm:copy-collection FILTER_DATE="2025-01-01" SOURCE_COLLECTION=invoices TARGET_COLLECTION=invoices_2025 node workers/copyApplicationDetails.js
*
* # Copy using command-line arguments (overrides environment variables)
* node workers/copyApplicationDetails.js --source=users --target=users_backup --model=user --filter-date="2024-01-01"
*
* # Copy in dry-run mode with command-line arguments
* node workers/copyApplicationDetails.js --source=logs --target=logs_backup --dry-run --verbose
*
* # Generic collection copying (same functionality, clearer naming)
* node workers/copyApplicationDetails.js --source=any_collection --target=backup_collection --batch-size=500
*
* Command-line Arguments (override environment variables):
* --source Source collection name
* --target Target collection name
* --model Source model file name (without .js extension)
* --filter-date Date to filter from (YYYY-MM-DD format or ISO string)
* --end-date Date to filter to (YYYY-MM-DD format or ISO string) - for safety with live data
* --batch-size Number of documents per batch
* --dry-run Only show what would be copied without making changes
* --no-model Use direct collection access instead of Mongoose model
* --verbose, -v Enable detailed logging
* --checkpoint-file File to save/load processing checkpoint (for resumable operations)
* --resume Resume from last checkpoint/execution (auto-detect last processed ID)
* --last-run Show when the script was last executed
* --status Show current execution status and progress
* --help, -h Show this help message
*
* Environment Variables:
* - DRY_RUN=true # Only logs what would happen without making changes
* - FILTER_DATE="2025-05-06" # Date to filter from (default: May 06, 2025)
* - END_DATE # Date to filter to (optional, for safety with live data)
* - BATCH_SIZE=1000 # Number of documents per batch (default: 1000)
* - SOURCE_COLLECTION="application_details" # Source collection name
* - TARGET_COLLECTION="application_details_old" # Target collection name
* - SOURCE_MODEL="application_detail" # Source model file name (without .js)
* - USE_MODEL=true # Whether to use Mongoose model or direct collection access (default: true)
* - MAX_RETRIES=3 # Maximum number of retries for errors (default: 3)
* - RETRY_DELAY=1000 # Base delay in ms between retries (default: 1000)
* - SHOW_PROGRESS=true # Whether to show progress indicator (default: true)
* - CONNECTION_REFRESH_INTERVAL=20 # Refresh connection every N batches (default: 20)
* - QUERY_TIMEOUT_MS=30000 # Query timeout in milliseconds (default: 30000)
* - VERBOSE=false # Enable verbose logging for debugging (default: false)
* - CHECKPOINT_FILE # File to save/load processing checkpoint
*/
const debug = require('debug')('agm:copy-collection');
const env = require('../helpers/env.js');
const dbConn = require('../helpers/db/connect.js')();
const mongoose = require('mongoose');
const utils = require('../helpers/utils.js');
/**
* Parse command-line arguments
* @returns {Object} Parsed arguments object
*/
function parseArguments() {
const args = process.argv.slice(2);
const parsed = {};
for (let i = 0; i < args.length; i++) {
const arg = args[i];
if (arg === '--help' || arg === '-h') {
console.log(`
Generic Collection Copy Worker
Usage: node copyApplicationDetails.js [options]
Options:
--source <collection> Source collection name
--target <collection> Target collection name
--model <model> Source model file name (without .js extension)
--filter-date <date> Date to filter from (YYYY-MM-DD format or ISO string)
--end-date <date> Date to filter to (YYYY-MM-DD format or ISO string) - for safety with live data
--batch-size <size> Number of documents per batch (default: 1000)
--dry-run Only show what would be copied without making changes
--no-model Use direct collection access instead of Mongoose model
--verbose, -v Enable detailed logging
--checkpoint-file <file> File to save/load processing checkpoint (for resumable operations)
--resume Resume from last checkpoint/execution (auto-detect last processed ID)
--last-run Show when the script was last executed
--status Show current execution status and progress
--help, -h Show this help message
Environment Variables:
DRY_RUN, FILTER_DATE, END_DATE, BATCH_SIZE, SOURCE_COLLECTION, TARGET_COLLECTION,
SOURCE_MODEL, USE_MODEL, MAX_RETRIES, RETRY_DELAY, SHOW_PROGRESS,
CONNECTION_REFRESH_INTERVAL, QUERY_TIMEOUT_MS, VERBOSE, CHECKPOINT_FILE
Examples:
# Copy application details with default settings
node copyApplicationDetails.js
# Copy users to backup with command-line args
node copyApplicationDetails.js --source=users --target=users_backup --model=user
# Copy in dry-run mode with verbose logging
node copyApplicationDetails.js --source=logs --target=logs_backup --dry-run --verbose
# Check when script was last executed
node copyApplicationDetails.js --last-run
# Check current execution status
node copyApplicationDetails.js --status
# Resume from last checkpoint (auto-detect where to continue)
node copyApplicationDetails.js --resume --verbose
`);
process.exit(0);
}
if (arg === '--last-run') {
parsed.showLastRun = true;
} else if (arg === '--status') {
parsed.showStatus = true;
} else if (arg === '--resume') {
parsed.resume = true;
} else if (arg.startsWith('--source=')) {
parsed.sourceCollection = arg.split('=')[1];
} else if (arg.startsWith('--target=')) {
parsed.targetCollection = arg.split('=')[1];
} else if (arg.startsWith('--model=')) {
parsed.sourceModel = arg.split('=')[1];
} else if (arg.startsWith('--filter-date=')) {
parsed.filterDate = arg.split('=')[1];
} else if (arg.startsWith('--end-date=')) {
parsed.endDate = arg.split('=')[1];
} else if (arg.startsWith('--batch-size=')) {
const value = arg.split('=')[1];
parsed.batchSize = parseInt(value, 10);
} else if (arg === '--dry-run') {
parsed.dryRun = true;
} else if (arg === '--no-model') {
parsed.useModel = false;
} else if (arg === '--verbose' || arg === '-v') {
parsed.verbose = true;
} else if (arg.startsWith('--checkpoint-file')) {
parsed.checkpointFile = arg.startsWith('--checkpoint-file=') ? arg.split('=')[1] : args[++i];
}
}
return parsed;
}
// Parse command-line arguments (these override environment variables)
const cliArgs = parseArguments();
// Configuration from environment variables with CLI argument overrides.
// NOTE(review): string/number CLI values merge with `||`, so falsy-but-valid
// values (e.g. --batch-size=0 or an empty string) fall back to the env value
// or default; only the boolean settings use explicit `!== undefined` checks.
const DRY_RUN = cliArgs.dryRun !== undefined ? cliArgs.dryRun : (process.env.DRY_RUN === 'true');
// Start of the copy window; documents whose ObjectId timestamp is before this date are skipped.
const FILTER_DATE = cliArgs.filterDate || process.env.FILTER_DATE || '2025-05-06';
// Optional end of the copy window (null = copy up to the newest document).
const END_DATE = cliArgs.endDate || process.env.END_DATE || null;
// Number of documents fetched and bulk-written per iteration.
const BATCH_SIZE = cliArgs.batchSize || parseInt(process.env.BATCH_SIZE || '1000', 10);
const SOURCE_COLLECTION = cliArgs.sourceCollection || process.env.SOURCE_COLLECTION || 'application_details';
const TARGET_COLLECTION = cliArgs.targetCollection || process.env.TARGET_COLLECTION || 'application_details_old';
const SOURCE_MODEL = cliArgs.sourceModel || process.env.SOURCE_MODEL || 'application_detail'; // Model file name without .js
// Retry/backoff tuning for transient database errors (env-only, no CLI flags).
const MAX_RETRIES = parseInt(process.env.MAX_RETRIES || '3', 10);
const RETRY_DELAY_MS = parseInt(process.env.RETRY_DELAY || '1000', 10);
const SHOW_PROGRESS = process.env.SHOW_PROGRESS !== 'false';
// Ping the database every N batches to keep long-running sessions alive.
const CONNECTION_REFRESH_INTERVAL = parseInt(process.env.CONNECTION_REFRESH_INTERVAL || '20', 10);
const QUERY_TIMEOUT_MS = parseInt(process.env.QUERY_TIMEOUT_MS || '30000', 10);
const VERBOSE = cliArgs.verbose !== undefined ? cliArgs.verbose : (process.env.VERBOSE === 'true');
const USE_MODEL = cliArgs.useModel !== undefined ? cliArgs.useModel : (process.env.USE_MODEL !== 'false'); // Whether to use Mongoose model or direct collection access
// Checkpoint path defaults to a name derived from the source/target pair so
// different copy jobs do not clobber each other's progress files.
const CHECKPOINT_FILE = cliArgs.checkpointFile || process.env.CHECKPOINT_FILE || `${SOURCE_COLLECTION}_to_${TARGET_COLLECTION}_checkpoint.json`;
const RESUME_MODE = cliArgs.resume || false;
// Handle informational commands (--last-run / --status).
// These only read the checkpoint file and print a report, then exit before
// any database connection work happens. (Removed an unused
// `const path = require('path')` that the original declared here.)
if (cliArgs.showLastRun || cliArgs.showStatus) {
  const fs = require('fs');
  try {
    // Without a checkpoint file there is no history to report.
    if (!fs.existsSync(CHECKPOINT_FILE)) {
      console.log(`No checkpoint file found at: ${CHECKPOINT_FILE}`);
      console.log('This script has never been run or no checkpoint was saved.');
      process.exit(0);
    }
    const checkpoint = JSON.parse(fs.readFileSync(CHECKPOINT_FILE, 'utf8'));
    if (cliArgs.showLastRun) {
      console.log('='.repeat(50));
      console.log('LAST EXECUTION INFORMATION');
      console.log('='.repeat(50));
      if (checkpoint.lastExecution) {
        console.log(`Last started: ${new Date(checkpoint.lastExecution.startTime).toLocaleString()}`);
        if (checkpoint.lastExecution.endTime) {
          console.log(`Last completed: ${new Date(checkpoint.lastExecution.endTime).toLocaleString()}`);
          const duration = (new Date(checkpoint.lastExecution.endTime) - new Date(checkpoint.lastExecution.startTime)) / 1000;
          console.log(`Duration: ${duration.toFixed(2)} seconds`);
          console.log(`Status: COMPLETED`);
        } else {
          // An execution with no endTime was interrupted mid-run.
          console.log(`Status: INTERRUPTED (never completed)`);
        }
        if (checkpoint.lastExecution.stats) {
          const stats = checkpoint.lastExecution.stats;
          console.log(`Documents processed: ${stats.processed || 0}`);
          console.log(`Documents found: ${stats.totalFound || 0}`);
          console.log(`Batches processed: ${stats.batches || 0}`);
          console.log(`Errors: ${stats.errors || 0}`);
        }
        console.log(`Source: ${checkpoint.lastExecution.sourceCollection || 'Unknown'}`);
        console.log(`Target: ${checkpoint.lastExecution.targetCollection || 'Unknown'}`);
        console.log(`Filter date: ${checkpoint.lastExecution.filterDate || 'None'}`);
        console.log(`End date: ${checkpoint.lastExecution.endDate || 'None'}`);
      } else {
        console.log('No execution history found in checkpoint file.');
      }
    }
    if (cliArgs.showStatus) {
      console.log('='.repeat(50));
      console.log('CURRENT STATUS');
      console.log('='.repeat(50));
      // "Running" means a current execution exists without an endTime stamp.
      if (checkpoint.currentExecution && !checkpoint.currentExecution.endTime) {
        console.log(`Status: RUNNING`);
        console.log(`Started: ${new Date(checkpoint.currentExecution.startTime).toLocaleString()}`);
        const runningTime = (Date.now() - new Date(checkpoint.currentExecution.startTime)) / 1000;
        console.log(`Running for: ${runningTime.toFixed(2)} seconds`);
        if (checkpoint.currentExecution.lastProcessedId) {
          console.log(`Last processed ID: ${checkpoint.currentExecution.lastProcessedId}`);
        }
        if (checkpoint.currentExecution.stats) {
          const stats = checkpoint.currentExecution.stats;
          console.log(`Documents processed: ${stats.processed || 0}`);
          console.log(`Batches processed: ${stats.batches || 0}`);
          if (stats.totalFound) {
            const progress = ((stats.processed || 0) / stats.totalFound * 100).toFixed(1);
            console.log(`Progress: ${progress}%`);
          }
        }
      } else {
        console.log(`Status: NOT RUNNING`);
        if (checkpoint.lastExecution && checkpoint.lastExecution.endTime) {
          console.log(`Last completed: ${new Date(checkpoint.lastExecution.endTime).toLocaleString()}`);
        }
      }
    }
    console.log('='.repeat(50));
  } catch (error) {
    console.error(`Error reading checkpoint file: ${error.message}`);
    process.exit(1);
  }
  process.exit(0);
}
// Dynamically load the source model if specified.
// When USE_MODEL is enabled we try to require the Mongoose model so queries
// go through it; on failure we log a warning and leave SourceModel null,
// which the query helpers treat as "use direct collection access".
let SourceModel = null;
if (USE_MODEL) {
try {
SourceModel = require(`../model/${SOURCE_MODEL}`);
if (VERBOSE) debug(`Loaded model: ${SOURCE_MODEL}`);
} catch (error) {
debug(`Warning: Could not load model '../model/${SOURCE_MODEL}': ${error.message}`);
debug('Falling back to direct collection access...');
}
}
// Configure mongoose for long-running operations.
// NOTE(review): 'maxTimeMS' is not a documented mongoose *global* option in
// all versions — confirm this call has effect for the mongoose version in
// use (per-query .maxTimeMS() below is what actually bounds the queries).
mongoose.set('maxTimeMS', 60000); // 60 seconds timeout for queries
// Only set up connection event handlers in verbose mode
// (keeps normal runs quiet; transient errors still surface via retry logic).
if (VERBOSE) {
mongoose.connection.on('error', (err) => {
debug('MongoDB connection error:', err);
});
mongoose.connection.on('disconnected', () => {
debug('MongoDB disconnected, attempting to reconnect...');
});
mongoose.connection.on('reconnected', () => {
debug('MongoDB reconnected successfully');
});
}
/**
 * Load checkpoint data from file.
 *
 * Missing files and read/parse failures both yield null (failures are
 * logged), so callers can always fall back to a fresh checkpoint object.
 *
 * @returns {Object|null} Checkpoint data or null if file doesn't exist
 */
function loadCheckpoint() {
  const fs = require('fs');
  let parsed = null;
  try {
    if (fs.existsSync(CHECKPOINT_FILE)) {
      const raw = fs.readFileSync(CHECKPOINT_FILE, 'utf8');
      parsed = JSON.parse(raw);
      if (VERBOSE) debug(`Loaded checkpoint from: ${CHECKPOINT_FILE}`);
    }
  } catch (error) {
    debug(`Error loading checkpoint: ${error.message}`);
    parsed = null;
  }
  return parsed;
}
/**
 * Save checkpoint data to file.
 *
 * Creates the checkpoint's parent directory if missing and writes the
 * checkpoint as pretty-printed JSON. Failures are logged, never thrown, so a
 * checkpoint write problem cannot abort the copy itself.
 *
 * @param {Object} checkpoint - Checkpoint data to save
 */
function saveCheckpoint(checkpoint) {
  const fs = require('fs');
  const path = require('path');
  try {
    // Make sure the directory for the checkpoint file exists before writing.
    const directory = path.dirname(CHECKPOINT_FILE);
    if (!fs.existsSync(directory)) {
      fs.mkdirSync(directory, { recursive: true });
    }
    const serialized = JSON.stringify(checkpoint, null, 2);
    fs.writeFileSync(CHECKPOINT_FILE, serialized);
    if (VERBOSE) debug(`Saved checkpoint to: ${CHECKPOINT_FILE}`);
  } catch (error) {
    debug(`Error saving checkpoint: ${error.message}`);
  }
}
/**
 * Initialize execution tracking.
 *
 * Loads the checkpoint file (if any) and decides where this run starts:
 * - Resume mode + interrupted run (currentExecution with no endTime):
 *   returns that execution unchanged so processing continues from its
 *   lastProcessedId.
 * - Resume mode + completed previous run: starts a new execution whose
 *   filterDate is the previous run's end point.
 * - Otherwise: starts a fresh execution from the configured FILTER_DATE.
 * Whatever is chosen is stored as checkpoint.currentExecution and saved.
 *
 * @returns {Object} Execution metadata (startTime, filterDate, stats, ...)
 */
function initializeExecution() {
// Load existing checkpoint or create new one
let checkpoint = loadCheckpoint() || {};
// Handle resume mode
if (RESUME_MODE) {
if (checkpoint.currentExecution && !checkpoint.currentExecution.endTime) {
// Resume from interrupted execution (no endTime = it never finished).
if (VERBOSE) debug('Resuming from interrupted execution...');
debug(`Resuming from: ${new Date(checkpoint.currentExecution.startTime).toLocaleString()}`);
if (checkpoint.currentExecution.lastProcessedId) {
debug(`Last processed ID: ${checkpoint.currentExecution.lastProcessedId}`);
}
// Returned as-is: the caller picks up lastProcessedId from this object.
return checkpoint.currentExecution;
} else if (checkpoint.lastExecution && checkpoint.lastExecution.endTime) {
// Resume from last completed execution's end point
if (VERBOSE) debug('Starting new execution from last completed run end point...');
debug(`Last completed: ${new Date(checkpoint.lastExecution.endTime).toLocaleString()}`);
// Create new execution starting from where the last one ended
const execution = {
startTime: new Date().toISOString(),
endTime: null,
sourceCollection: SOURCE_COLLECTION,
targetCollection: TARGET_COLLECTION,
filterDate: checkpoint.lastExecution.endDate || checkpoint.lastExecution.filterDate, // Use end date of last run as start
endDate: END_DATE,
batchSize: BATCH_SIZE,
dryRun: DRY_RUN,
lastProcessedId: checkpoint.lastExecution.lastProcessedId || null,
stats: null,
resumedFrom: 'lastExecution'
};
checkpoint.currentExecution = execution;
saveCheckpoint(checkpoint);
return execution;
} else {
// --resume was requested but the checkpoint has no usable history.
debug('No previous execution found to resume from. Starting fresh...');
}
}
// Create new execution (non-resume path, or resume with no history).
const execution = {
startTime: new Date().toISOString(),
endTime: null,
sourceCollection: SOURCE_COLLECTION,
targetCollection: TARGET_COLLECTION,
filterDate: FILTER_DATE,
endDate: END_DATE,
batchSize: BATCH_SIZE,
dryRun: DRY_RUN,
lastProcessedId: null,
stats: null
};
// Move current execution to last execution if it was completed
if (checkpoint.currentExecution && checkpoint.currentExecution.endTime) {
checkpoint.lastExecution = checkpoint.currentExecution;
}
// Set new current execution
checkpoint.currentExecution = execution;
saveCheckpoint(checkpoint);
if (VERBOSE) debug('Initialized execution tracking');
return execution;
}
/**
 * Persist in-flight progress for the current execution.
 *
 * Snapshots the running stats (and optionally the last processed document
 * id) onto the execution record, then writes it back to the checkpoint file
 * so an interrupted run can be resumed.
 *
 * @param {Object} execution - Current execution metadata (mutated in place)
 * @param {Object} stats - Current statistics
 * @param {String} lastProcessedId - Last processed document ID (optional)
 */
function updateExecutionProgress(execution, stats, lastProcessedId = null) {
  // Copy the stats so later mutations of the live object don't leak in.
  execution.stats = Object.assign({}, stats);
  if (lastProcessedId) {
    execution.lastProcessedId = lastProcessedId.toString();
  }
  const checkpoint = loadCheckpoint() || {};
  checkpoint.currentExecution = execution;
  saveCheckpoint(checkpoint);
}
/**
 * Mark the current execution as finished and persist it.
 *
 * Stamps endTime, snapshots the final stats, and records the execution as
 * both currentExecution and lastExecution in the checkpoint file.
 *
 * @param {Object} execution - Current execution metadata (mutated in place)
 * @param {Object} finalStats - Final statistics
 */
function completeExecution(execution, finalStats) {
  execution.endTime = new Date().toISOString();
  execution.stats = Object.assign({}, finalStats);
  const checkpoint = loadCheckpoint() || {};
  checkpoint.currentExecution = execution;
  checkpoint.lastExecution = execution;
  saveCheckpoint(checkpoint);
  if (VERBOSE) debug('Completed execution tracking');
}
/**
 * Create ObjectId from date for filtering.
 *
 * Builds a synthetic ObjectId whose leading 4 bytes encode the date's Unix
 * timestamp and whose remaining 8 bytes are zero, so it sorts at/before any
 * real ObjectId generated from that moment onward.
 *
 * Fixes: the original used `timestamp.toString(16)` unpadded, so any
 * timestamp below 0x10000000 (dates before ~1973) produced fewer than 8 hex
 * digits and therefore an invalid, non-24-character ObjectId string. Invalid
 * dates likewise produced a 'NaN…' hex string; both now fail fast with a
 * clear error.
 *
 * @param {Date|string} date - Date to convert to ObjectId
 * @returns {mongoose.Types.ObjectId} ObjectId with timestamp
 * @throws {Error} If the supplied date cannot be parsed
 */
function createObjectIdFromDate(date) {
  const dateObj = new Date(date);
  if (Number.isNaN(dateObj.getTime())) {
    throw new Error(`Invalid date for ObjectId filter: ${date}`);
  }
  const timestamp = Math.floor(dateObj.getTime() / 1000);
  // Pad the timestamp to exactly 8 hex digits (4 bytes) before appending the
  // 8 zero bytes, guaranteeing a valid 24-character ObjectId hex string.
  const objectIdHex = timestamp.toString(16).padStart(8, '0') + '0000000000000000';
  return new mongoose.Types.ObjectId(objectIdHex);
}
/**
 * Sleep for specified milliseconds.
 *
 * @param {number} ms - Milliseconds to sleep
 * @returns {Promise<void>} Resolves (with undefined) after the delay elapses
 */
function sleep(ms) {
  return new Promise((resolve) => {
    setTimeout(resolve, ms);
  });
}
/**
 * Retry wrapper for operations with exponential backoff.
 *
 * Runs `operation` up to maxRetries + 1 times. Between failed attempts it
 * waits RETRY_DELAY_MS * 2^attempt milliseconds; the final failure's error
 * is rethrown to the caller.
 *
 * @param {Function} operation - Operation to retry (sync or async)
 * @param {string} operationName - Name of the operation for logging
 * @param {number} maxRetries - Maximum number of retries
 * @returns {Promise<any>} Result of the operation
 */
async function withRetry(operation, operationName, maxRetries = MAX_RETRIES) {
  let attempt = 0;
  for (;;) {
    try {
      return await operation();
    } catch (error) {
      // Out of retries: surface the last error to the caller.
      if (attempt >= maxRetries) {
        if (VERBOSE) debug(`${operationName} failed after ${maxRetries + 1} attempts: ${error.message}`);
        throw error;
      }
      const delay = RETRY_DELAY_MS * Math.pow(2, attempt);
      if (VERBOSE) debug(`${operationName} failed (attempt ${attempt + 1}/${maxRetries + 1}): ${error.message}. Retrying in ${delay}ms...`);
      await sleep(delay);
      attempt += 1;
    }
  }
}
/**
 * Get the total count of documents matching the filter.
 *
 * Counts documents whose _id falls in [filterObjectId, endObjectId] (upper
 * bound only applied when endObjectId is provided), retrying transient
 * failures via withRetry.
 *
 * @param {mongoose.Types.ObjectId} filterObjectId - ObjectId to filter from
 * @param {mongoose.Types.ObjectId} endObjectId - ObjectId to filter to
 * @returns {Promise<number>} Total count of matching documents
 */
async function getTotalCount(filterObjectId, endObjectId) {
  const query = { _id: { $gte: filterObjectId } };
  if (endObjectId) {
    query._id.$lte = endObjectId;
  }
  // Prefer the Mongoose model when one was loaded; otherwise count through
  // the raw driver collection.
  const countMatching = async () => {
    if (SourceModel) {
      return SourceModel.countDocuments(query);
    }
    const sourceCollection = mongoose.connection.db.collection(SOURCE_COLLECTION);
    return sourceCollection.countDocuments(query);
  };
  return withRetry(countMatching, 'Count documents');
}
/**
 * Process a batch of documents with bulk upsert.
 *
 * In dry-run mode only the counters are updated; otherwise each document is
 * upserted (replaceOne by _id) into the target collection in a single
 * unordered bulkWrite, retried via withRetry on failure.
 *
 * @param {Array} documents - Documents to process
 * @param {Object} targetCollection - Target MongoDB collection
 * @param {Object} stats - Statistics object to update (mutated in place)
 * @returns {Promise<void>}
 */
async function processBatch(documents, targetCollection, stats) {
  if (!documents || documents.length === 0) {
    return;
  }
  const batchSize = documents.length;
  if (VERBOSE) debug(`Processing batch of ${batchSize} documents...`);
  if (DRY_RUN) {
    if (VERBOSE) debug(`DRY RUN: Would upsert ${batchSize} documents to ${TARGET_COLLECTION}`);
    stats.processed += batchSize;
    stats.dryRunCount += batchSize;
    return;
  }
  // Build one replaceOne-with-upsert op per document, converting Mongoose
  // documents to plain objects when a toObject method is present.
  const bulkOps = [];
  for (const doc of documents) {
    bulkOps.push({
      replaceOne: {
        filter: { _id: doc._id },
        replacement: doc.toObject ? doc.toObject() : doc,
        upsert: true
      }
    });
  }
  const performBulkUpsert = () => targetCollection.bulkWrite(bulkOps, {
    ordered: false, // Allow partial success
    writeConcern: { w: 'majority', j: true }
  });
  const result = await withRetry(performBulkUpsert, `Bulk upsert batch of ${batchSize} documents`);
  // Fold the driver's result counters into the running statistics.
  stats.processed += batchSize;
  stats.upserted += result.upsertedCount || 0;
  stats.modified += result.modifiedCount || 0;
  stats.matched += result.matchedCount || 0;
  if (VERBOSE) debug(`Batch completed: ${result.upsertedCount} upserted, ${result.modifiedCount} modified, ${result.matchedCount} matched`);
}
/**
 * Display progress information.
 *
 * Logs percentage complete, average throughput, and an ETA derived from the
 * rate so far. No-op when progress display is disabled or total is zero.
 *
 * Fixes: when nothing has been processed yet the rate is 0 and the original
 * computed `remaining / 0 === Infinity`, which formatTime rendered as
 * "Infinityh NaNm"; the ETA is now clamped to 0 in that case.
 *
 * @param {number} processed - Number of documents processed
 * @param {number} total - Total number of documents
 * @param {Date} startTime - Start time of the operation
 */
function showProgress(processed, total, startTime) {
  if (!SHOW_PROGRESS || total === 0) return;
  const elapsed = Date.now() - startTime.getTime();
  const rate = processed / (elapsed / 1000);
  const remaining = total - processed;
  // Guard against a zero (or non-finite) rate, which would otherwise yield a
  // nonsensical infinite ETA.
  const eta = remaining > 0 && Number.isFinite(rate) && rate > 0 ? remaining / rate : 0;
  const percentage = ((processed / total) * 100).toFixed(1);
  // Render a duration in seconds as "Xh Ym", "Ym Zs", or "Zs".
  const formatTime = (seconds) => {
    const hours = Math.floor(seconds / 3600);
    const minutes = Math.floor((seconds % 3600) / 60);
    const secs = Math.floor(seconds % 60);
    if (hours > 0) {
      return `${hours}h ${minutes}m`;
    } else if (minutes > 0) {
      return `${minutes}m ${secs}s`;
    } else {
      return `${secs}s`;
    }
  };
  // Simplified progress format for non-verbose mode
  if (VERBOSE) {
    debug(`Progress: ${processed}/${total} (${percentage}%) | Rate: ${rate.toFixed(1)} docs/sec | ETA: ${formatTime(eta)}`);
  } else {
    debug(`${percentage}% (${processed}/${total}) | ${rate.toFixed(0)} docs/sec | ETA: ${formatTime(eta)}`);
  }
}
/**
 * Main function to copy documents between collections.
 *
 * Streams documents from SOURCE_COLLECTION to TARGET_COLLECTION in batches,
 * filtering by an _id (ObjectId timestamp) range and paginating on _id.
 * Progress is checkpointed after every batch so an interrupted run can be
 * resumed with --resume. Batch-level errors are counted and skipped past;
 * a fatal (setup) error is recorded in the checkpoint and rethrown.
 *
 * @returns {Promise<Object>} Statistics about the copy operation
 */
async function copyCollection() {
const startTime = new Date();
// Initialize execution tracking
const execution = initializeExecution();
// Use execution's filter date for resume scenarios
const effectiveFilterDate = execution.filterDate || FILTER_DATE;
const effectiveEndDate = execution.endDate || END_DATE;
debug(`Starting collection copy from ${effectiveFilterDate}...`);
debug(`Configuration: DRY_RUN=${DRY_RUN}, BATCH_SIZE=${BATCH_SIZE}, SOURCE=${SOURCE_COLLECTION}, TARGET=${TARGET_COLLECTION}`);
debug(`Model: ${SourceModel ? SOURCE_MODEL : 'Direct collection access'}, USE_MODEL=${USE_MODEL}, VERBOSE=${VERBOSE}`);
if (CHECKPOINT_FILE) {
debug(`Checkpoint file: ${CHECKPOINT_FILE}`);
}
if (RESUME_MODE) {
debug(`Resume mode: ${RESUME_MODE}, using filterDate: ${effectiveFilterDate}`);
}
// Create ObjectId filter for the specified date
const filterObjectId = createObjectIdFromDate(effectiveFilterDate);
const endObjectId = effectiveEndDate ? createObjectIdFromDate(effectiveEndDate) : null;
// NOTE(review): these two log lines interpolate FILTER_DATE/END_DATE, but
// the filters above were built from effectiveFilterDate/effectiveEndDate —
// in resume mode the logged dates can differ from the ones actually used.
debug(`Using ObjectId filter: ${filterObjectId} (created from date: ${FILTER_DATE})`);
if (endObjectId) {
debug(`Using end ObjectId filter: ${endObjectId} (created from date: ${END_DATE})`);
}
// Initialize statistics
const stats = {
processed: 0,
upserted: 0,
modified: 0,
matched: 0,
errors: 0,
dryRunCount: 0,
totalFound: 0,
batches: 0
};
try {
// Get total count for progress tracking
if (VERBOSE) debug('Counting documents to process...');
const totalCount = await getTotalCount(filterObjectId, endObjectId);
stats.totalFound = totalCount;
// Update execution with initial stats
updateExecutionProgress(execution, stats);
if (totalCount === 0) {
debug(`No documents found with _id >= ${filterObjectId} and _id <= ${endObjectId || 'Infinity'} (created from ${FILTER_DATE} to ${END_DATE || 'Infinity'})`);
completeExecution(execution, stats);
return stats;
}
debug(`Found ${totalCount} documents to process`);
// Get target collection
const targetCollection = mongoose.connection.db.collection(TARGET_COLLECTION);
// Process documents in batches using cursor-based pagination on _id:
// each iteration fetches BATCH_SIZE documents with _id >= lastProcessedId
// in ascending order, then advances the cursor past the last one seen.
let lastProcessedId = filterObjectId;
// Resume from checkpoint if available
if (execution.lastProcessedId) {
lastProcessedId = new mongoose.Types.ObjectId(execution.lastProcessedId);
if (VERBOSE) debug(`Resuming from last processed ID: ${lastProcessedId}`);
}
let hasMore = true;
while (hasMore) {
try {
// Fetch batch of documents using cursor-based pagination
if (VERBOSE) debug(`Fetching batch ${stats.batches + 1} (lastId: ${lastProcessedId}, limit: ${BATCH_SIZE})...`);
const documents = await withRetry(async () => {
const query = {
_id: { $gte: lastProcessedId }
};
if (endObjectId) {
query._id.$lte = endObjectId;
}
if (SourceModel) {
// Use Mongoose model
return await SourceModel.find(query)
.sort({ _id: 1 })
.limit(BATCH_SIZE)
.lean() // Use lean() for better performance
.maxTimeMS(QUERY_TIMEOUT_MS); // Set query timeout
} else {
// Use direct collection access
const sourceCollection = mongoose.connection.db.collection(SOURCE_COLLECTION);
return await sourceCollection.find(query)
.sort({ _id: 1 })
.limit(BATCH_SIZE)
.maxTimeMS(QUERY_TIMEOUT_MS)
.toArray();
}
}, `Fetch batch ${stats.batches + 1}`);
if (documents.length === 0) {
hasMore = false;
break;
}
stats.batches++;
// Process the batch
await processBatch(documents, targetCollection, stats);
// Update execution progress checkpoint
updateExecutionProgress(execution, stats, lastProcessedId);
// Update progress (only show every 10 batches unless verbose)
if (VERBOSE || stats.batches % 10 === 0) {
showProgress(stats.processed, totalCount, startTime);
}
// Update cursor for next iteration
lastProcessedId = new mongoose.Types.ObjectId(documents[documents.length - 1]._id.toString());
// Increment the ObjectId to avoid processing the same document twice:
// the last 8 hex chars (counter/random bytes) are treated as a
// big-endian number and bumped by one, since the next query uses $gte.
// NOTE(review): if those 8 chars are already 'ffffffff', the result is
// 9 hex chars and the ObjectId constructor would throw — extremely
// unlikely in practice, but not handled here.
const buffer = lastProcessedId.toHexString();
const incremented = (parseInt(buffer.slice(-8), 16) + 1).toString(16).padStart(8, '0');
lastProcessedId = new mongoose.Types.ObjectId(buffer.slice(0, -8) + incremented);
// Check if we've processed all documents
if (documents.length < BATCH_SIZE) {
hasMore = false;
}
// Small delay between batches to reduce database load (skip if processing quickly)
if (hasMore && (VERBOSE || stats.batches % 50 === 0)) {
await sleep(100);
}
// Refresh connection every N batches to prevent session timeout
if (stats.batches % CONNECTION_REFRESH_INTERVAL === 0) {
if (VERBOSE) debug(`Refreshing database connection after ${stats.batches} batches...`);
try {
// Ping the database to ensure connection is alive
await mongoose.connection.db.admin().ping();
if (VERBOSE) debug('Database connection refreshed successfully');
} catch (pingError) {
if (VERBOSE) debug(`Database ping failed: ${pingError.message}`);
// The retry logic will handle reconnection if needed
}
}
} catch (error) {
debug(`Error processing batch ${stats.batches + 1}: ${error.message}`);
stats.errors++;
// Handle session expiration errors specifically: ping to refresh the
// connection and retry the SAME batch (cursor not advanced).
if (error.message.includes('expired sessions') ||
error.message.includes('session') ||
error.message.includes('topology') ||
error.codeName === 'Interrupted') {
if (VERBOSE) debug('Session-related error detected, attempting to refresh connection...');
try {
await mongoose.connection.db.admin().ping();
if (VERBOSE) debug('Connection refreshed, retrying batch...');
// Don't increment skip/lastProcessedId, retry the same batch
continue;
} catch (refreshError) {
if (VERBOSE) debug(`Connection refresh failed: ${refreshError.message}`);
}
}
// Move to next batch for other errors.
// NOTE(review): adding BATCH_SIZE to the low 4 counter bytes is a
// best-effort skip — ObjectId counter bytes are not sequential across
// writers, so this does not skip exactly BATCH_SIZE documents, and an
// overflow past 0xffffffff would produce an invalid 25-char hex string.
if (lastProcessedId) {
// Increment ObjectId to skip problematic batch
const buffer = lastProcessedId.toHexString();
const incremented = (parseInt(buffer.slice(-8), 16) + BATCH_SIZE).toString(16).padStart(8, '0');
lastProcessedId = new mongoose.Types.ObjectId(buffer.slice(0, -8) + incremented);
}
// Break if too many errors. NOTE(review): stats.errors is cumulative
// over the whole run, not consecutive, despite retries in between.
if (stats.errors > 5) {
debug('Too many errors, stopping operation');
break;
}
}
}
const endTime = new Date();
const duration = (endTime.getTime() - startTime.getTime()) / 1000;
debug('Copy operation completed!');
debug('='.repeat(50));
debug(`Source collection: ${SOURCE_COLLECTION}`);
debug(`Target collection: ${TARGET_COLLECTION}`);
debug(`Total documents found: ${stats.totalFound}`);
debug(`Total batches processed: ${stats.batches}`);
debug(`Documents processed: ${stats.processed}`);
if (DRY_RUN) {
debug(`Dry run count: ${stats.dryRunCount}`);
} else {
debug(`Documents upserted: ${stats.upserted}`);
debug(`Documents modified: ${stats.modified}`);
debug(`Documents matched: ${stats.matched}`);
}
debug(`Errors encountered: ${stats.errors}`);
debug(`Duration: ${duration.toFixed(2)} seconds`);
debug(`Average rate: ${(stats.processed / duration).toFixed(1)} documents/second`);
debug('='.repeat(50));
// Complete execution tracking
completeExecution(execution, stats);
return stats;
} catch (error) {
debug(`Fatal error during copy operation: ${error.message}`);
// Update execution with error status so --status/--last-run can report it.
if (execution) {
execution.error = error.message;
execution.endTime = new Date().toISOString();
const checkpoint = loadCheckpoint() || {};
checkpoint.currentExecution = execution;
saveCheckpoint(checkpoint);
}
throw error;
}
}
/**
 * Set up global process error handling.
 *
 * Any uncaught exception or unhandled promise rejection is logged and
 * terminates the worker with a non-zero exit code.
 */
process.on('uncaughtException', (err) => {
  debug('Uncaught Exception:', err);
  process.exit(1);
});
process.on('unhandledRejection', (reason, p) => {
  debug('Unhandled Rejection at Promise:', p, 'reason:', reason);
  process.exit(1);
});
/**
 * Main execution - follows the same pattern as migrateJobIds.js
 *
 * Waits for the database connection to open, runs the copy, then always
 * closes the connection and exits the process (success or failure), so the
 * worker never hangs on open handles.
 */
dbConn.once('open', async () => {
try {
debug('Database connected');
// Run the copy operation
const result = await copyCollection();
// Log final result
if (result.errors > 0) {
debug(`Operation completed with ${result.errors} errors`);
} else {
debug('Operation completed successfully');
}
} catch (error) {
// copyCollection already recorded the failure in the checkpoint; just log.
debug('Operation failed:', error);
} finally {
await mongoose.connection.close();
process.exit();
}
});
// Export the main function for programmatic use (for example, from copyCollection.js wrapper)
module.exports = {
copyCollection,
// Keep the old name for backward compatibility
copyApplicationDetails: copyCollection
};