'use strict'; /** * Generic Collection Copy Worker * * This script copies documents from one MongoDB collection to another based o } else if (arg.startsWith('--filter-date=') || arg === '-d') { parsed.filterDate = arg.startsWith('--filter-date=') ? arg.split('=')[1] : args[++i]; } else if (arg.startsWith('--end-date=') || arg === '-e') { parsed.endDate = arg.startsWith('--end-date=') ? arg.split('=')[1] : args[++i]; } else if (arg.startsWith('--batch-size=') || arg === '-b') {ate filtering. * It uses ObjectId timestamp filtering for documents created from a specific date. * * Key Features: * - Works with any MongoDB collection (configurable source/target) * - Can use Mongoose models or direct collection access * - Filters documents by ObjectId timestamp (more reliable than date fields) * - Uses batch processing with configurable batch sizes (default 1000 documents) * - Implements bulk upsert operations for efficient data transfer * - OPTIMIZED: Uses progressive counting instead of expensive countDocuments() * - Provides detailed progress tracking and statistics without scanning entire collections * - Supports dry-run mode for testing * - Handles duplicate documents gracefully with upsert operations * - Implements robust error handling with retry logic * - Command-line argument support for easier scripting * - Supports end-date filtering for safety with live data * - Allows checkpoint saving for resumable operations * * Usage Examples: * # Copy application details (default behavior for backward compatibility) * DEBUG=agm:copy-collection node workers/copyApplicationDetails.js * * # Copy users collection using environment variables * DEBUG=agm:copy-collection SOURCE_COLLECTION=users TARGET_COLLECTION=users_backup SOURCE_MODEL=user node workers/copyApplicationDetails.js * * # Copy any collection without model (direct collection access) * DEBUG=agm:copy-collection SOURCE_COLLECTION=jobs TARGET_COLLECTION=jobs_archive USE_MODEL=false node workers/copyApplicationDetails.js * * # Copy with custom date filter using environment variables * DEBUG=agm:copy-collection FILTER_DATE="2025-01-01" SOURCE_COLLECTION=invoices TARGET_COLLECTION=invoices_2025 node workers/copyApplicationDetails.js * * # Copy using command-line arguments (overrides environment variables) * node workers/copyApplicationDetails.js --source=users --target=users_backup --model=user --filter-date="2024-01-01" * * # Copy in dry-run mode with command-line arguments * node workers/copyApplicationDetails.js --source=logs --target=logs_backup --dry-run --verbose * * # Generic collection copying (same functionality, clearer naming) * node workers/copyApplicationDetails.js --source=any_collection --target=backup_collection --batch-size=500 * * Command-line Arguments (override environment variables): * --source Source collection name * --target Target collection name * --model Source model file name (without .js extension) * --filter-date Date to filter from (YYYY-MM-DD format or ISO string) * --end-date Date to filter to (YYYY-MM-DD format or ISO string) - for safety with live data * --batch-size Number of documents per batch * --dry-run Only show what would be copied without making changes * --no-model Use direct collection access instead of Mongoose model * --verbose, -v Enable detailed logging * --checkpoint-file File to save/load processing checkpoint (for resumable operations) * --resume Resume from last checkpoint/execution (auto-detect last processed ID) * --last-run Show when the script was last executed * --status Show current execution status and progress * --help, -h Show this help message * * Environment Variables: * - DRY_RUN=true # Only logs what would happen without making changes * - FILTER_DATE="2025-05-06" # Date to filter from (default: May 06, 2025) * - END_DATE # Date to filter to (optional, for safety with live data) * - BATCH_SIZE=1000 # Number of documents per batch (default: 1000) * - SOURCE_COLLECTION="application_details" # Source collection name * - TARGET_COLLECTION="application_details_old" # Target collection name * - SOURCE_MODEL="application_detail" # Source model file name (without .js) * - USE_MODEL=true # Whether to use Mongoose model or direct collection access (default: true) * - MAX_RETRIES=3 # Maximum number of retries for errors (default: 3) * - RETRY_DELAY=1000 # Base delay in ms between retries (default: 1000) * - SHOW_PROGRESS=true # Whether to show progress indicator (default: true) * - CONNECTION_REFRESH_INTERVAL=20 # Refresh connection every N batches (default: 20) * - QUERY_TIMEOUT_MS=30000 # Query timeout in milliseconds (default: 30000) * - VERBOSE=false # Enable verbose logging for debugging (default: false) * - CHECKPOINT_FILE # File to save/load processing checkpoint */ const debug = require('debug')('agm:copy-collection'); const env = require('../helpers/env.js'); const { DBConnection } = require('../helpers/db/connect.js'); const mongoose = require('mongoose'); const utils = require('../helpers/utils.js'); /** * Parse command-line arguments * @returns {Object} Parsed arguments object */ function parseArguments() { const args = process.argv.slice(2); const parsed = {}; for (let i = 0; i < args.length; i++) { const arg = args[i]; if (arg === '--help' || arg === '-h') { console.log(` Generic Collection Copy Worker Usage: node copyApplicationDetails.js [options] Options: --source Source collection name --target Target collection name --model Source model file name (without .js extension) --filter-date Date to filter from (YYYY-MM-DD format or ISO string) --end-date Date to filter to (YYYY-MM-DD format or ISO string) - for safety with live data --batch-size Number of documents per batch (default: 1000) --dry-run Only show what would be copied without making changes --no-model Use direct collection access instead of Mongoose model --verbose, -v Enable detailed logging --checkpoint-file File to save/load processing checkpoint (for resumable operations) --resume Resume from last checkpoint/execution (auto-detect last processed ID) --last-run Show when the script was last executed --status Show current execution status and progress --help, -h Show this help message Environment Variables: DRY_RUN, FILTER_DATE, END_DATE, BATCH_SIZE, SOURCE_COLLECTION, TARGET_COLLECTION, SOURCE_MODEL, USE_MODEL, MAX_RETRIES, RETRY_DELAY, SHOW_PROGRESS, CONNECTION_REFRESH_INTERVAL, QUERY_TIMEOUT_MS, VERBOSE, CHECKPOINT_FILE Examples: # Copy application details with default settings node copyApplicationDetails.js # Copy users to backup with command-line args node copyApplicationDetails.js --source=users --target=users_backup --model=user # Copy in dry-run mode with verbose logging node copyApplicationDetails.js --source=logs --target=logs_backup --dry-run --verbose # Check when script was last executed node copyApplicationDetails.js --last-run # Check current execution status node copyApplicationDetails.js --status # Resume from last checkpoint (auto-detect where to continue) node copyApplicationDetails.js --resume --verbose `); process.exit(0); } if (arg === '--last-run') { parsed.showLastRun = true; } else if (arg === '--status') { parsed.showStatus = true; } else if (arg === '--resume') { parsed.resume = true; } else if (arg.startsWith('--source=')) { parsed.sourceCollection = arg.split('=')[1]; } else if (arg.startsWith('--target=')) { parsed.targetCollection = arg.split('=')[1]; } else if (arg.startsWith('--model=')) { parsed.sourceModel = arg.split('=')[1]; } else if (arg.startsWith('--filter-date=')) { parsed.filterDate = arg.split('=')[1]; } else if (arg.startsWith('--end-date=')) { parsed.endDate = arg.split('=')[1]; } else if (arg.startsWith('--batch-size=')) { const value = arg.split('=')[1]; parsed.batchSize = parseInt(value, 10); } else if (arg === '--dry-run') { parsed.dryRun = true; } else if (arg === '--no-model') { parsed.useModel = false; } else if (arg === '--verbose' || arg === '-v') { parsed.verbose = true; } else if (arg.startsWith('--checkpoint-file')) { parsed.checkpointFile = arg.startsWith('--checkpoint-file=') ? arg.split('=')[1] : args[++i]; } } return parsed; } // Parse command-line arguments (these override environment variables) const cliArgs = parseArguments(); // Configuration from environment variables with CLI argument overrides const DRY_RUN = cliArgs.dryRun !== undefined ? cliArgs.dryRun : (process.env.DRY_RUN === 'true'); const FILTER_DATE = cliArgs.filterDate || process.env.FILTER_DATE || '2025-05-06'; const END_DATE = cliArgs.endDate || process.env.END_DATE || null; const BATCH_SIZE = cliArgs.batchSize || parseInt(process.env.BATCH_SIZE || '1000', 10); const SOURCE_COLLECTION = cliArgs.sourceCollection || process.env.SOURCE_COLLECTION || 'application_details'; const TARGET_COLLECTION = cliArgs.targetCollection || process.env.TARGET_COLLECTION || 'application_details_old'; const SOURCE_MODEL = cliArgs.sourceModel || process.env.SOURCE_MODEL || 'application_detail'; // Model file name without .js const MAX_RETRIES = parseInt(process.env.MAX_RETRIES || '3', 10); const RETRY_DELAY_MS = parseInt(process.env.RETRY_DELAY || '1000', 10); const SHOW_PROGRESS = process.env.SHOW_PROGRESS !== 'false'; const CONNECTION_REFRESH_INTERVAL = parseInt(process.env.CONNECTION_REFRESH_INTERVAL || '20', 10); const QUERY_TIMEOUT_MS = parseInt(process.env.QUERY_TIMEOUT_MS || '30000', 10); const VERBOSE = cliArgs.verbose !== undefined ? cliArgs.verbose : (process.env.VERBOSE === 'true'); const USE_MODEL = cliArgs.useModel !== undefined ? cliArgs.useModel : (process.env.USE_MODEL !== 'false'); // Whether to use Mongoose model or direct collection access const CHECKPOINT_FILE = cliArgs.checkpointFile || process.env.CHECKPOINT_FILE || `${SOURCE_COLLECTION}_to_${TARGET_COLLECTION}_checkpoint.json`; const RESUME_MODE = cliArgs.resume || false; // Handle informational commands if (cliArgs.showLastRun || cliArgs.showStatus) { const fs = require('fs'); const path = require('path'); try { if (!fs.existsSync(CHECKPOINT_FILE)) { console.log(`No checkpoint file found at: ${CHECKPOINT_FILE}`); console.log('This script has never been run or no checkpoint was saved.'); process.exit(0); } const checkpoint = JSON.parse(fs.readFileSync(CHECKPOINT_FILE, 'utf8')); if (cliArgs.showLastRun) { console.log('='.repeat(50)); console.log('LAST EXECUTION INFORMATION'); console.log('='.repeat(50)); if (checkpoint.lastExecution) { console.log(`Last started: ${new Date(checkpoint.lastExecution.startTime).toLocaleString()}`); if (checkpoint.lastExecution.endTime) { console.log(`Last completed: ${new Date(checkpoint.lastExecution.endTime).toLocaleString()}`); const duration = (new Date(checkpoint.lastExecution.endTime) - new Date(checkpoint.lastExecution.startTime)) / 1000; console.log(`Duration: ${duration.toFixed(2)} seconds`); console.log(`Status: COMPLETED`); } else { console.log(`Status: INTERRUPTED (never completed)`); } if (checkpoint.lastExecution.stats) { const stats = checkpoint.lastExecution.stats; console.log(`Documents processed: ${stats.processed || 0}`); console.log(`Documents found: ${stats.totalFound || 0}`); console.log(`Batches processed: ${stats.batches || 0}`); console.log(`Errors: ${stats.errors || 0}`); } console.log(`Source: ${checkpoint.lastExecution.sourceCollection || 'Unknown'}`); console.log(`Target: ${checkpoint.lastExecution.targetCollection || 'Unknown'}`); console.log(`Filter date: ${checkpoint.lastExecution.filterDate || 'None'}`); console.log(`End date: ${checkpoint.lastExecution.endDate || 'None'}`); } else { console.log('No execution history found in checkpoint file.'); } } if (cliArgs.showStatus) { console.log('='.repeat(50)); console.log('CURRENT STATUS'); console.log('='.repeat(50)); if (checkpoint.currentExecution && !checkpoint.currentExecution.endTime) { console.log(`Status: RUNNING`); console.log(`Started: ${new Date(checkpoint.currentExecution.startTime).toLocaleString()}`); const runningTime = (Date.now() - new Date(checkpoint.currentExecution.startTime)) / 1000; console.log(`Running for: ${runningTime.toFixed(2)} seconds`); if (checkpoint.currentExecution.lastProcessedId) { console.log(`Last processed ID: ${checkpoint.currentExecution.lastProcessedId}`); } if (checkpoint.currentExecution.stats) { const stats = checkpoint.currentExecution.stats; console.log(`Documents processed: ${stats.processed || 0}`); console.log(`Batches processed: ${stats.batches || 0}`); if (stats.totalFound) { const progress = ((stats.processed || 0) / stats.totalFound * 100).toFixed(1); console.log(`Progress: ${progress}%`); } } } else { console.log(`Status: NOT RUNNING`); if (checkpoint.lastExecution && checkpoint.lastExecution.endTime) { console.log(`Last completed: ${new Date(checkpoint.lastExecution.endTime).toLocaleString()}`); } } } console.log('='.repeat(50)); } catch (error) { console.error(`Error reading checkpoint file: ${error.message}`); process.exit(1); } process.exit(0); } // Dynamically load the source model if specified let SourceModel = null; if (USE_MODEL) { try { SourceModel = require(`../model/${SOURCE_MODEL}`); if (VERBOSE) debug(`Loaded model: ${SOURCE_MODEL}`); } catch (error) { debug(`Warning: Could not load model '../model/${SOURCE_MODEL}': ${error.message}`); debug('Falling back to direct collection access...'); } } // Configure mongoose for long-running operations mongoose.set('maxTimeMS', 60000); // 60 seconds timeout for queries // Only set up connection event handlers in verbose mode if (VERBOSE) { mongoose.connection.on('error', (err) => { debug('MongoDB connection error:', err); }); mongoose.connection.on('disconnected', () => { debug('MongoDB disconnected, attempting to reconnect...'); }); mongoose.connection.on('reconnected', () => { debug('MongoDB reconnected successfully'); }); } /** * Load checkpoint data from file * @returns {Object|null} Checkpoint data or null if file doesn't exist */ function loadCheckpoint() { const fs = require('fs'); try { if (!fs.existsSync(CHECKPOINT_FILE)) { return null; } const checkpoint = JSON.parse(fs.readFileSync(CHECKPOINT_FILE, 'utf8')); if (VERBOSE) debug(`Loaded checkpoint from: ${CHECKPOINT_FILE}`); return checkpoint; } catch (error) { debug(`Error loading checkpoint: ${error.message}`); return null; } } /** * Save checkpoint data to file * @param {Object} checkpoint - Checkpoint data to save */ function saveCheckpoint(checkpoint) { const fs = require('fs'); try { // Ensure directory exists const dir = require('path').dirname(CHECKPOINT_FILE); if (!fs.existsSync(dir)) { fs.mkdirSync(dir, { recursive: true }); } fs.writeFileSync(CHECKPOINT_FILE, JSON.stringify(checkpoint, null, 2)); if (VERBOSE) debug(`Saved checkpoint to: ${CHECKPOINT_FILE}`); } catch (error) { debug(`Error saving checkpoint: ${error.message}`); } } /** * Initialize execution tracking * @returns {Object} Execution metadata */ function initializeExecution() { // Load existing checkpoint or create new one let checkpoint = loadCheckpoint() || {}; // Handle resume mode if (RESUME_MODE) { if (checkpoint.currentExecution && !checkpoint.currentExecution.endTime) { // Resume from interrupted execution if (VERBOSE) debug('Resuming from interrupted execution...'); debug(`Resuming from: ${new Date(checkpoint.currentExecution.startTime).toLocaleString()}`); if (checkpoint.currentExecution.lastProcessedId) { debug(`Last processed ID: ${checkpoint.currentExecution.lastProcessedId}`); } return checkpoint.currentExecution; } else if (checkpoint.lastExecution && checkpoint.lastExecution.endTime) { // Resume from last completed execution's end point if (VERBOSE) debug('Starting new execution from last completed run end point...'); debug(`Last completed: ${new Date(checkpoint.lastExecution.endTime).toLocaleString()}`); // Create new execution starting from where the last one ended const execution = { startTime: new Date().toISOString(), endTime: null, sourceCollection: SOURCE_COLLECTION, targetCollection: TARGET_COLLECTION, filterDate: checkpoint.lastExecution.endDate || checkpoint.lastExecution.filterDate, // Use end date of last run as start endDate: END_DATE, batchSize: BATCH_SIZE, dryRun: DRY_RUN, lastProcessedId: checkpoint.lastExecution.lastProcessedId || null, stats: null, resumedFrom: 'lastExecution' }; checkpoint.currentExecution = execution; saveCheckpoint(checkpoint); return execution; } else { debug('No previous execution found to resume from. Starting fresh...'); } } // Create new execution const execution = { startTime: new Date().toISOString(), endTime: null, sourceCollection: SOURCE_COLLECTION, targetCollection: TARGET_COLLECTION, filterDate: FILTER_DATE, endDate: END_DATE, batchSize: BATCH_SIZE, dryRun: DRY_RUN, lastProcessedId: null, stats: null }; // Move current execution to last execution if it was completed if (checkpoint.currentExecution && checkpoint.currentExecution.endTime) { checkpoint.lastExecution = checkpoint.currentExecution; } // Set new current execution checkpoint.currentExecution = execution; saveCheckpoint(checkpoint); if (VERBOSE) debug('Initialized execution tracking'); return execution; } /** * Update execution progress * @param {Object} execution - Current execution metadata * @param {Object} stats - Current statistics * @param {String} lastProcessedId - Last processed document ID */ function updateExecutionProgress(execution, stats, lastProcessedId = null) { execution.stats = { ...stats }; if (lastProcessedId) { execution.lastProcessedId = lastProcessedId.toString(); } const checkpoint = loadCheckpoint() || {}; checkpoint.currentExecution = execution; saveCheckpoint(checkpoint); } /** * Complete execution tracking * @param {Object} execution - Current execution metadata * @param {Object} finalStats - Final statistics */ function completeExecution(execution, finalStats) { execution.endTime = new Date().toISOString(); execution.stats = { ...finalStats }; const checkpoint = loadCheckpoint() || {}; checkpoint.currentExecution = execution; checkpoint.lastExecution = execution; saveCheckpoint(checkpoint); if (VERBOSE) debug('Completed execution tracking'); } /** * Create ObjectId from date for filtering * @param {Date|string} date - Date to convert to ObjectId * @returns {mongoose.Types.ObjectId} ObjectId with timestamp */ function createObjectIdFromDate(date) { const dateObj = new Date(date); const timestamp = Math.floor(dateObj.getTime() / 1000); const objectIdHex = timestamp.toString(16) + '0000000000000000'; return new mongoose.Types.ObjectId(objectIdHex); } /** * Sleep for specified milliseconds * @param {number} ms - Milliseconds to sleep * @returns {Promise} */ function sleep(ms) { return new Promise(resolve => setTimeout(resolve, ms)); } /** * Retry wrapper for operations with exponential backoff * @param {Function} operation - Operation to retry * @param {string} operationName - Name of the operation for logging * @param {number} maxRetries - Maximum number of retries * @returns {Promise} Result of the operation */ async function withRetry(operation, operationName, maxRetries = MAX_RETRIES) { let lastError; for (let attempt = 0; attempt <= maxRetries; attempt++) { try { return await operation(); } catch (error) { lastError = error; if (attempt === maxRetries) { if (VERBOSE) debug(`${operationName} failed after ${maxRetries + 1} attempts: ${error.message}`); throw error; } const delay = RETRY_DELAY_MS * Math.pow(2, attempt); if (VERBOSE) debug(`${operationName} failed (attempt ${attempt + 1}/${maxRetries + 1}): ${error.message}. Retrying in ${delay}ms...`); await sleep(delay); } } throw lastError; } /** * Process a batch of documents with bulk upsert * @param {Array} documents - Documents to process * @param {Object} targetCollection - Target MongoDB collection * @param {Object} stats - Statistics object to update * @returns {Promise} */ async function processBatch(documents, targetCollection, stats) { if (!documents || documents.length === 0) { return; } if (VERBOSE) debug(`Processing batch of ${documents.length} documents...`); if (DRY_RUN) { if (VERBOSE) debug(`DRY RUN: Would upsert ${documents.length} documents to ${TARGET_COLLECTION}`); stats.processed += documents.length; stats.dryRunCount += documents.length; return; } // Prepare bulk operations for upsert const bulkOps = documents.map(doc => ({ replaceOne: { filter: { _id: doc._id }, replacement: doc.toObject ? doc.toObject() : doc, upsert: true } })); // Execute bulk operation with retry const result = await withRetry(async () => { return await targetCollection.bulkWrite(bulkOps, { ordered: false, // Allow partial success writeConcern: { w: 'majority', j: true } }); }, `Bulk upsert batch of ${documents.length} documents`); // Update statistics stats.processed += documents.length; stats.upserted += result.upsertedCount || 0; stats.modified += result.modifiedCount || 0; stats.matched += result.matchedCount || 0; if (VERBOSE) debug(`Batch completed: ${result.upsertedCount} upserted, ${result.modifiedCount} modified, ${result.matchedCount} matched`); } /** * Display progress information * @param {number} processed - Number of documents processed * @param {number} total - Total number of documents * @param {Date} startTime - Start time of the operation */ function showProgress(processed, total, startTime) { if (!SHOW_PROGRESS || total === 0) return; const elapsed = Date.now() - startTime.getTime(); const rate = processed / (elapsed / 1000); const remaining = total - processed; const eta = remaining > 0 ? remaining / rate : 0; const percentage = ((processed / total) * 100).toFixed(1); const formatTime = (seconds) => { const hours = Math.floor(seconds / 3600); const minutes = Math.floor((seconds % 3600) / 60); const secs = Math.floor(seconds % 60); if (hours > 0) { return `${hours}h ${minutes}m`; } else if (minutes > 0) { return `${minutes}m ${secs}s`; } else { return `${secs}s`; } }; // Simplified progress format for non-verbose mode if (VERBOSE) { debug(`Progress: ${processed}/${total} (${percentage}%) | Rate: ${rate.toFixed(1)} docs/sec | ETA: ${formatTime(eta)}`); } else { debug(`${percentage}% (${processed}/${total}) | ${rate.toFixed(0)} docs/sec | ETA: ${formatTime(eta)}`); } } /** * Main function to copy documents between collections * @returns {Promise} Statistics about the copy operation */ async function copyCollection() { const startTime = new Date(); // Initialize execution tracking const execution = initializeExecution(); // Use execution's filter date for resume scenarios const effectiveFilterDate = execution.filterDate || FILTER_DATE; const effectiveEndDate = execution.endDate || END_DATE; debug(`Starting collection copy from ${effectiveFilterDate}...`); debug(`Configuration: DRY_RUN=${DRY_RUN}, BATCH_SIZE=${BATCH_SIZE}, SOURCE=${SOURCE_COLLECTION}, TARGET=${TARGET_COLLECTION}`); debug(`Model: ${SourceModel ? SOURCE_MODEL : 'Direct collection access'}, USE_MODEL=${USE_MODEL}, VERBOSE=${VERBOSE}`); if (CHECKPOINT_FILE) { debug(`Checkpoint file: ${CHECKPOINT_FILE}`); } if (RESUME_MODE) { debug(`Resume mode: ${RESUME_MODE}, using filterDate: ${effectiveFilterDate}`); } // Create ObjectId filter for the specified date const filterObjectId = createObjectIdFromDate(effectiveFilterDate); const endObjectId = effectiveEndDate ? createObjectIdFromDate(effectiveEndDate) : null; debug(`Using ObjectId filter: ${filterObjectId} (created from date: ${FILTER_DATE})`); if (endObjectId) { debug(`Using end ObjectId filter: ${endObjectId} (created from date: ${END_DATE})`); } // Initialize statistics const stats = { processed: 0, upserted: 0, modified: 0, matched: 0, errors: 0, dryRunCount: 0, totalFound: 0, batches: 0 }; try { // Skip expensive counting for large collections - use progressive counting instead // This optimization prevents scanning billion+ records just for progress reporting if (VERBOSE) debug('Starting document processing with progressive counting...'); // Quick check if there are any documents to process const hasDocuments = await withRetry(async () => { const query = { _id: { $gte: filterObjectId } }; if (endObjectId) query._id.$lte = endObjectId; if (SourceModel) { return await SourceModel.findOne(query).select('_id').lean(); } else { const sourceCollection = mongoose.connection.db.collection(SOURCE_COLLECTION); return await sourceCollection.findOne(query, { projection: { _id: 1 } }); } }, 'Check for documents'); if (!hasDocuments) { debug(`No documents found with _id >= ${filterObjectId} and _id <= ${endObjectId || 'Infinity'} (created from ${FILTER_DATE} to ${END_DATE || 'Infinity'})`); completeExecution(execution, stats); return stats; } debug(`Found documents to process - starting copy operation with progressive counting`); // Get target collection const targetCollection = mongoose.connection.db.collection(TARGET_COLLECTION); // Process documents in batches using cursor-based pagination let lastProcessedId = filterObjectId; // Resume from checkpoint if available if (execution.lastProcessedId) { lastProcessedId = new mongoose.Types.ObjectId(execution.lastProcessedId); if (VERBOSE) debug(`Resuming from last processed ID: ${lastProcessedId}`); } let hasMore = true; while (hasMore) { try { // Fetch batch of documents using cursor-based pagination if (VERBOSE) debug(`Fetching batch ${stats.batches + 1} (lastId: ${lastProcessedId}, limit: ${BATCH_SIZE})...`); const documents = await withRetry(async () => { const query = { _id: { $gte: lastProcessedId } }; if (endObjectId) { query._id.$lte = endObjectId; } if (SourceModel) { // Use Mongoose model return await SourceModel.find(query) .sort({ _id: 1 }) .limit(BATCH_SIZE) .lean() // Use lean() for better performance .maxTimeMS(QUERY_TIMEOUT_MS); // Set query timeout } else { // Use direct collection access const sourceCollection = mongoose.connection.db.collection(SOURCE_COLLECTION); return await sourceCollection.find(query) .sort({ _id: 1 }) .limit(BATCH_SIZE) .maxTimeMS(QUERY_TIMEOUT_MS) .toArray(); } }, `Fetch batch ${stats.batches + 1}`); if (documents.length === 0) { hasMore = false; break; } stats.batches++; // Process the batch await processBatch(documents, targetCollection, stats); // Update execution progress checkpoint updateExecutionProgress(execution, stats, lastProcessedId); // Update progress (only show every 10 batches unless verbose) if (VERBOSE || stats.batches % 10 === 0) { // Show progressive progress without total count const elapsedSeconds = (Date.now() - startTime.getTime()) / 1000; const rate = stats.processed / (elapsedSeconds || 1); debug(`Progress: ${stats.processed} processed (${rate.toFixed(1)} docs/sec) | Batch ${stats.batches}`); } // Update cursor for next iteration lastProcessedId = new mongoose.Types.ObjectId(documents[documents.length - 1]._id.toString()); // Increment the ObjectId to avoid processing the same document twice const buffer = lastProcessedId.toHexString(); const incremented = (parseInt(buffer.slice(-8), 16) + 1).toString(16).padStart(8, '0'); lastProcessedId = new mongoose.Types.ObjectId(buffer.slice(0, -8) + incremented); // Check if we've processed all documents if (documents.length < BATCH_SIZE) { hasMore = false; } // Small delay between batches to reduce database load (skip if processing quickly) if (hasMore && (VERBOSE || stats.batches % 50 === 0)) { await sleep(100); } // Refresh connection every N batches to prevent session timeout if (stats.batches % CONNECTION_REFRESH_INTERVAL === 0) { if (VERBOSE) debug(`Refreshing database connection after ${stats.batches} batches...`); try { // Ping the database to ensure connection is alive await mongoose.connection.db.admin().ping(); if (VERBOSE) debug('Database connection refreshed successfully'); } catch (pingError) { if (VERBOSE) debug(`Database ping failed: ${pingError.message}`); // The retry logic will handle reconnection if needed } } } catch (error) { debug(`Error processing batch ${stats.batches + 1}: ${error.message}`); stats.errors++; // Handle session expiration errors specifically if (error.message.includes('expired sessions') || error.message.includes('session') || error.message.includes('topology') || error.codeName === 'Interrupted') { if (VERBOSE) debug('Session-related error detected, attempting to refresh connection...'); try { await mongoose.connection.db.admin().ping(); if (VERBOSE) debug('Connection refreshed, retrying batch...'); // Don't increment skip/lastProcessedId, retry the same batch continue; } catch (refreshError) { if (VERBOSE) debug(`Connection refresh failed: ${refreshError.message}`); } } // Move to next batch for other errors if (lastProcessedId) { // Increment ObjectId to skip problematic batch const buffer = lastProcessedId.toHexString(); const incremented = (parseInt(buffer.slice(-8), 16) + BATCH_SIZE).toString(16).padStart(8, '0'); lastProcessedId = new mongoose.Types.ObjectId(buffer.slice(0, -8) + incremented); } // Break if too many consecutive errors if (stats.errors > 5) { debug('Too many errors, stopping operation'); break; } } } const endTime = new Date(); const duration = (endTime.getTime() - startTime.getTime()) / 1000; debug('Copy operation completed!'); debug('='.repeat(50)); debug(`Source collection: ${SOURCE_COLLECTION}`); debug(`Target collection: ${TARGET_COLLECTION}`); debug(`Total documents processed: ${stats.processed} (progressive counting enabled)`); debug(`Total batches processed: ${stats.batches}`); debug(`Documents processed: ${stats.processed}`); if (DRY_RUN) { debug(`Dry run count: ${stats.dryRunCount}`); } else { debug(`Documents upserted: ${stats.upserted}`); debug(`Documents modified: ${stats.modified}`); debug(`Documents matched: ${stats.matched}`); } debug(`Errors encountered: ${stats.errors}`); debug(`Duration: ${duration.toFixed(2)} seconds`); debug(`Average rate: ${(stats.processed / duration).toFixed(1)} documents/second`); debug('='.repeat(50)); // Complete execution tracking completeExecution(execution, stats); return stats; } catch (error) { debug(`Fatal error during copy operation: ${error.message}`); // Update execution with error status if (execution) { execution.error = error.message; execution.endTime = new Date().toISOString(); const checkpoint = loadCheckpoint() || {}; checkpoint.currentExecution = execution; saveCheckpoint(checkpoint); } throw error; } } /** * Set up global process error handling */ process .on('uncaughtException', function (err) { debug('Uncaught Exception:', err); process.exit(1); }) .on('unhandledRejection', (reason, p) => { debug('Unhandled Rejection at Promise:', p, 'reason:', reason); process.exit(1); }); /** * Main execution - follows the same pattern as migrateJobIds.js */ async function main() { const dbConn = new DBConnection('Copy Collection Script'); try { await dbConn.initialize({ setupExitHandlers: false }); debug('Database connected'); // Run the copy operation const result = await copyCollection(); // Log final result if (result.errors > 0) { debug(`Operation completed with ${result.errors} errors`); } else { debug('Operation completed successfully'); } } catch (error) { debug('Operation failed:', error); } finally { await dbConn.close(); process.exit(); } } // Execute main function if script is run directly if (require.main === module) { main(); } // Export the main function for programmatic use (for example, from copyCollection.js wrapper) module.exports = { copyCollection, // Keep the old name for backward compatibility copyApplicationDetails: copyCollection };