'use strict';

/**
 * Job ID Migration and Copy Worker
 *
 * This script provides two main functions:
 *
 * 1. Migrate job IDs within the same database:
 * - Takes specific job IDs and maps them to new IDs
 * - Updates references in job_logs, job_assigns, applications, app_files, app_details, and invoices
 * - Creates new jobs with new IDs and deletes the old ones in a transaction
 * - Maintains all relationships
 *
 * 2. Copy jobs from a previous database to the current one:
 * - Connects to both source and destination databases
 * - Copies jobs and all related data (logs, assignments, applications, app_files, app_details)
 * - Properly maintains all relationships between entities
 * - Optionally assigns new IDs to the copied jobs
 * - Handles duplicate records with configurable linking behavior
 * - Provides detailed statistics on created vs linked records
 * - Implements robust error handling with retries and recovery
 *
 * Key Features:
 * - Transaction-level retry logic with exponential backoff for error resilience
 * - Detailed statistics tracking for created vs linked records
 * - Comprehensive error categorization and reporting
 * - Configurable behavior for duplicate records and error handling
 *
 * Usage:
 * # To migrate job IDs within the same database:
 * DEBUG=agm:reconcile-jobs node server/workers/migrateJobIds.js
 *
 * # To copy jobs from a previous database:
 * DEBUG=agm:reconcile-jobs SOURCE_DB_URI="mongodb://user:pass@host:port/dbname" JOB_IDS="123,456,789" node server/workers/migrateJobIds.js copy
 *
 * Environment Variables:
 * - DRY_RUN=true                    # Only logs what would happen without making changes
 * - SOURCE_DB_URI                   # MongoDB URI for source database (required for copy operation)
 * - JOB_IDS                         # Comma-separated list of job IDs to copy (alternative to command-line argument)
 * - USE_NEW_IDS=true                # Whether to assign new IDs to copied jobs (default: false)
 * - KEEP_RELATED_IDS=true           # Whether to keep original IDs for applications, app files, and app details (default: false)
 * - LINK_TO_EXISTING=false          # Whether to link related data to existing records when duplicates are found (default: true)
 * - COPY_APP_DETAILS=false          # Whether to copy application details (default: true)
 * - MAX_RETRIES=7                   # Maximum number of retries for network errors and transactions (default: 7)
 * - RETRY_DELAY=1000                # Base delay in ms between retries, multiplied by 2^(retryCount - 1) (default: 1000)
 * - CHUNK_SIZE=5                    # Applications processed per batch in chunked mode (default: 5)
 * - LOG_BATCH_SIZE / ASSIGN_BATCH_SIZE # Batch sizes for job logs and job assignments
 * - USE_CHUNKED_PROCESSING=true     # Use chunked processing for all jobs (default: true)
 * - USE_SEPARATE_TRANSACTIONS=false # Use separate transactions for all entity types (default: false)
 */

const debug = require('debug')('agm:reconcile-jobs'),
  env = require('../helpers/env.js'),
  isProd = env.PRODUCTION,
  dbConn = require('../helpers/db/connect.js')(),
  mongoose = require('mongoose'),
  mongoUtil = require('../helpers/mongo.js'),
  mongoEnhanced = require('../helpers/mongo_enhanced.js'),
  utils = require('../helpers/utils.js'),
  Job = require('../model/job.js'),
  JobLog = require('../model/job_log.js'),
  JobAssign = require('../model/job_assign.js'),
  Application = require('../model/application.js'),
  AppFile = require('../model/application_file.js'),
  AppDetail = require('../model/application_detail.js'),
  Invoice = require('../model/invoice.js');

// New starting ID
const NEW_START_ID = isProd ? 91105 + 1 : 30;

// Specific job IDs to migrate
const JOB_IDS_TO_MIGRATE = isProd ?
  [
    87628, 87629, 87631, 87637, 87640, 87645, 87646,
    87630, 87632, 87633, 87634, 87635, 87638, 87639, 87641, 87642, 87643, 87644, 87647
  ]
  : [20];

// Honor the documented DRY_RUN environment variable: when true, only log what would happen
const DRY_RUN = process.env.DRY_RUN === 'true';

// Use enhanced transaction options from mongoEnhanced for consistency
const DEFAULT_TRANSACTION_OPTIONS = {
  ...mongoEnhanced.DEFAULT_TRANSACTION_OPTIONS,
  maxCommitTimeMS: 60000, // 60 seconds for better reliability
};

// Constants for retry mechanism
const MAX_RETRIES = parseInt(process.env.MAX_RETRIES || '7', 10);
const RETRY_DELAY_MS = parseInt(process.env.RETRY_DELAY || '1000', 10);
const USE_CHUNKED_PROCESSING = process.env.USE_CHUNKED_PROCESSING !== 'false';
const USE_SEPARATE_TRANSACTIONS = process.env.USE_SEPARATE_TRANSACTIONS === 'true';
const COPY_APP_DETAILS = process.env.COPY_APP_DETAILS !== 'false'; // Default true, set to false to skip

// Chunk size for processing - smaller chunks reduce transaction timeout risk
const CHUNK_SIZE = parseInt(process.env.CHUNK_SIZE || '5', 10);

// Use enhanced retry logic from mongoEnhanced
const withTransactionRetry = mongoEnhanced.withTransactionRetry;

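// Backoff schedule (illustrative, derived from the constants above and the
// retry loop in safeInsertWithRecovery below): with RETRY_DELAY=1000,
//   attempt 1 -> ~1000ms, attempt 2 -> ~2000ms, attempt 3 -> ~4000ms, ...
// i.e. delay = RETRY_DELAY_MS * Math.pow(2, retryCount - 1), plus up to 50% jitter.
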
// Enhanced transaction wrapper using mongoEnhanced for better reliability
async function withTransaction(operation, sessionOptions = DEFAULT_TRANSACTION_OPTIONS) {
  return mongoEnhanced.enhancedRunInTransaction(operation, sessionOptions);
}

// Use enhanced safe operation from mongoEnhanced
const safeMongoOperation = mongoEnhanced.safeMongoOperation;

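// Illustrative usage (a sketch mirroring the call sites below): withTransaction
// hands the callback a session, and every read/write inside passes it along.
//
//   await withTransaction(async (session) => {
//     await safeMongoOperation(
//       async (s) => JobLog.updateMany({ job: oldId }, { $set: { job: newId } }, { session: s }),
//       session
//     );
//   });
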
/**
 * Set up global process error handling
 */
process
  .on('uncaughtException', function (err) {
    debug(err);
    process.exit(1);
  })
  .on('unhandledRejection', (reason, p) => {
    debug(reason, 'Unhandled Rejection at Promise', p);
  });

/**
 * Shift specific job IDs to new IDs starting from NEW_START_ID
 * Uses MongoDB transactions to ensure atomicity
 * Updates all references in related collections
 * Processes one job at a time to ensure data consistency
 * Creates new jobs with new IDs and deletes the old ones
 *
 * @returns {Promise<void>}
 */
async function shiftSpecificJobIds() {
  // First get the jobs outside of the transaction to determine what we need to migrate
  const jobs = await Job.find({ _id: { $in: JOB_IDS_TO_MIGRATE } }).sort({ _id: 1 });
  debug(`Found ${jobs.length} specified jobs to migrate`);

  if (!jobs.length) {
    debug('No jobs to migrate');
    return;
  }

  // Create an ID mapping (oldId -> newId)
  const idMapping = {};
  jobs.forEach((job, index) => {
    idMapping[job._id] = NEW_START_ID + index;
  });

  debug('ID mapping created:');
  debug(Object.entries(idMapping).map(([oldId, newId]) => ({ oldId, newId })));

  if (DRY_RUN) {
    debug('DRY RUN MODE - No changes will be made');
    return;
  }

  // Process one job at a time for better data consistency
  for (const job of jobs) {
    const oldJobId = job._id;
    const newJobId = idMapping[oldJobId];

    debug(`Processing job ${oldJobId} -> ${newJobId}`);

    // Perform the migration using the enhanced transaction wrapper
    await withTransaction(async (session) => {
      // 1. Update references in related collections using safe operations
      // 1.1 Update JobLog references
      const jobLogResult = await safeMongoOperation(
        async (s) => JobLog.updateMany(
          { job: oldJobId },
          { $set: { job: newJobId } },
          { session: s }
        ),
        session
      );
      debug(`Updated ${jobLogResult.modifiedCount} job logs for job ${oldJobId}`);

      // 1.2 Update JobAssign references
      const jobAssignResult = await safeMongoOperation(
        async (s) => JobAssign.updateMany(
          { job: oldJobId },
          { $set: { job: newJobId } },
          { session: s }
        ),
        session
      );
      debug(`Updated ${jobAssignResult.modifiedCount} job assignments for job ${oldJobId}`);

      // 1.3 Update Application references
      const appResult = await safeMongoOperation(
        async (s) => Application.updateMany(
          { jobId: oldJobId },
          { $set: { jobId: newJobId } },
          { session: s }
        ),
        session
      );
      debug(`Updated ${appResult.modifiedCount} applications for job ${oldJobId}`);

      // 1.4 Get all affected application IDs to process app files and app details
      const affectedApplications = await safeMongoOperation(
        async (s) => Application.find(
          { jobId: newJobId },
          { _id: 1 },
          { session: s }
        ),
        session
      );

      const applicationIds = affectedApplications.map(app => app._id);

      if (applicationIds.length) {
        debug(`Found ${applicationIds.length} applications affected by job ID change`);
      }

      // 1.5 Update Invoice references that contain this job
      const invoicesToUpdate = await safeMongoOperation(
        async (s) => Invoice.find({
          'jobs.job': oldJobId
        }, { _id: 1 }, { session: s }),
        session
      );

      if (invoicesToUpdate.length) {
        debug(`Found ${invoicesToUpdate.length} invoices referencing job ${oldJobId}`);

        // For each invoice, update the job references in the jobs array
        for (const invoice of invoicesToUpdate) {
          // We need to fetch, modify, and save each invoice
          const fullInvoice = await safeMongoOperation(
            async (s) => Invoice.findById(invoice._id, {}, { session: s }),
            session
          );

          let updated = false;
          if (fullInvoice.jobs && fullInvoice.jobs.length) {
            fullInvoice.jobs.forEach(jobItem => {
              // Loose equality (==) kept deliberately: jobItem.job may be stored as a string or a number
              if (jobItem.job && jobItem.job == oldJobId) {
                jobItem.job = newJobId;
                updated = true;
              }
            });

            if (updated) {
              await safeMongoOperation(
                async (s) => fullInvoice.save({ session: s }),
                session
              );
              debug(`Updated invoice ${fullInvoice._id} with new job ID`);
            }
          }
        }
      }

      // 2. Update the job document with the new ID
      // Note: MongoDB doesn't allow direct updates to the _id field.
      // We need a two-step process: 1) Create a new document with the new ID, 2) Delete the old document

      // Get the current state of the job (in case it was modified since we queried it)
      const currentJob = await safeMongoOperation(
        async (s) => Job.findById(oldJobId, {}, { session: s }),
        session
      );

      if (!currentJob) {
        debug(`Warning: Could not find job ${oldJobId} to update, it may have been deleted or modified`);
      } else {
        // Create a new document based on the current state but with the new ID
        const jobData = currentJob.toObject();
        delete jobData._id; // Remove the old ID

        // Insert a new document with the new ID using upsert to avoid duplicate key errors
        await safeMongoOperation(
          async (s) => Job.collection.replaceOne({ _id: newJobId }, { ...jobData, _id: newJobId }, { session: s, upsert: true }),
          session
        );
        debug(`Created new job with ID ${newJobId}`);

        // Delete the old document with the old ID
        const deleteResult = await safeMongoOperation(
          async (s) => Job.deleteOne({ _id: oldJobId }, { session: s }),
          session
        );

        if (deleteResult.deletedCount === 1) {
          debug(`Successfully migrated job ID from ${oldJobId} to ${newJobId}`);
        } else {
          debug(`Warning: Failed to delete old job with ID ${oldJobId}`);
          throw new Error(`Failed to migrate job with ID ${oldJobId}`);
        }
      }

      debug(`Transaction for job ${oldJobId} -> ${newJobId} completed successfully`);
    });

    debug(`Successfully migrated job ${oldJobId} -> ${newJobId}`);
  }

  debug('All jobs have been successfully migrated');
}

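// Why the clone-and-delete dance above: MongoDB treats _id as immutable, so an
// in-place rename is rejected (illustrative):
//   await Job.updateOne({ _id: 87628 }, { $set: { _id: 91106 } }); // fails: _id cannot be modified
// Hence replaceOne with the new _id followed by deleteOne of the old document,
// both inside one transaction so no reader sees zero or two copies of the job.
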
/**
 * Copy jobs from a previous database to the current database
 * Connects to both databases and copies all related data
 *
 * @param {Object} options - Copy options
 * @param {string} options.sourceDbUri - Source database URI
 * @param {Array<number>} options.jobIds - Job IDs to copy
 * @param {boolean} options.useNewIds - Whether to assign new IDs to copied jobs
 * @param {boolean} options.keepRelatedIds - Whether to keep original IDs for related entities
 * @param {boolean} options.linkToExisting - Whether to link to existing records
 * @param {number} options.newStartId - Starting ID for new jobs
 * @param {boolean} options.dryRun - Whether to only log without making changes
 * @returns {Promise<Object>} Statistics about the copy operation
 */
async function copyJobsFromPreviousDb({
  sourceDbUri,
  jobIds,
  useNewIds = false,
  keepRelatedIds = false,
  linkToExisting = true,
  newStartId = NEW_START_ID,
  dryRun = false
}) {
  debug(`Copying jobs from previous database: ${jobIds.join(', ')}`);
  debug(`Options: useNewIds=${useNewIds}, keepRelatedIds=${keepRelatedIds}, linkToExisting=${linkToExisting}`);

  if (dryRun) {
    debug('DRY RUN MODE - No changes will be made');
  }

  // Initialize statistics
  const stats = {
    jobs: 0,
    jobLogs: 0,
    jobAssigns: 0,
    applications: 0,
    appFiles: 0,
    appDetails: 0,
    linkedLogs: 0,
    linkedAssigns: 0,
    linkedApplications: 0,
    linkedFiles: 0,
    linkedDetails: 0,
    failureReasons: {}
  };

  // Results to return
  const result = {
    processed: 0,
    succeeded: 0,
    skipped: 0,
    failed: 0,
    skippedIds: [],
    failedIds: [],
    stats,
    linked: {
      logs: 0,
      assigns: 0,
      applications: 0,
      files: 0,
      details: 0
    }
  };

  // Connect to source database
  let sourceConnection;
  try {
    sourceConnection = await mongoose.createConnection(sourceDbUri, {
      useNewUrlParser: true,
      useUnifiedTopology: true
    });
    // Wait for the connection to be open before using models.
    // Guard on readyState so we do not wait for an 'open' event that has already fired.
    if (sourceConnection.readyState !== 1) {
      await new Promise((resolve, reject) => {
        sourceConnection.once('open', resolve);
        sourceConnection.once('error', reject);
      });
    }
    debug('Connected to source database');
  } catch (error) {
    debug(`Failed to connect to source database: ${error.message}`);
    throw error;
  }

  // Create models for the source database. Each mongoose Connection keeps its
  // own model registry, so registering the same schemas here gives read access
  // to the source database without touching the default (destination) connection.
  const SourceJob = sourceConnection.model('Job', Job.schema);
  const SourceJobLog = sourceConnection.model('Job_Log', JobLog.schema);
  const SourceJobAssign = sourceConnection.model('Job_Assign', JobAssign.schema);
  const SourceApplication = sourceConnection.model('Application', Application.schema);
  const SourceAppFile = sourceConnection.model('AppFile', AppFile.schema);
  const SourceAppDetail = sourceConnection.model('Application_Detail', AppDetail.schema);

  try {
    // Process each job
    for (const jobId of jobIds) {
      debug(`Processing job ID ${jobId}...`);
      result.processed++;

      // Check whether the job already exists in the destination database.
      // The skip behavior below is currently disabled; existing jobs are
      // overwritten by the upsert performed later in this loop.
      const existingJob = await Job.findById(jobId);
      if (existingJob) {
        debug(`Job ${jobId} already exists in destination database, continuing`);
        // debug(`Job ${jobId} already exists in destination database, skipping`);
        // result.skipped++;
        // result.skippedIds.push(jobId);
        // continue;
      }

      try {
        // Fetch job from source database
        const sourceJob = await SourceJob.findById(jobId);
        if (!sourceJob) {
          debug(`Job ${jobId} not found in source database`);
          result.failed++;
          result.failedIds.push(jobId);
          stats.failureReasons[jobId] = {
            category: 'NotFound',
            message: 'Job not found in source database'
          };
          continue;
        }

        // Determine new job ID
        const newJobId = useNewIds ? newStartId + result.succeeded : jobId;

        // Fetch related data
        const jobLogs = await SourceJobLog.find({ job: jobId });
        const jobAssigns = await SourceJobAssign.find({ job: jobId });
        const applications = await SourceApplication.find({ jobId });

        debug(`Found ${jobLogs.length} logs, ${jobAssigns.length} assignments, ${applications.length} applications`);

        // Create mappings for IDs
        const applicationIdMapping = {};
        const appFileIdMapping = {};

        // Create lists for related data
        const appFiles = [];
        const appDetails = [];

        // Fetch application files and details
        for (const app of applications) {
          // Generate new ID for application if needed
          const newAppId = keepRelatedIds ? app._id : new mongoose.Types.ObjectId();
          applicationIdMapping[app._id] = newAppId;

          // Get application files
          const files = await SourceAppFile.find({ appId: app._id });
          for (const file of files) {
            const newFileId = keepRelatedIds ? file._id : new mongoose.Types.ObjectId();
            appFileIdMapping[file._id] = newFileId;
            appFiles.push(file);
          }

          // Get application details (if enabled)
          if (COPY_APP_DETAILS) {
            // Get application details by both appId and fileId relationships
            const fileIds = await SourceAppFile.find({ appId: app._id }).distinct('_id');
            const detailsByAppId = await SourceAppDetail.find({ appId: app._id });
            const detailsByFileId = fileIds.length > 0 ? await SourceAppDetail.find({ fileId: { $in: fileIds } }) : [];

            // Combine and deduplicate application details
            const allDetails = [...detailsByAppId, ...detailsByFileId];
            const uniqueDetails = allDetails.filter((detail, index, self) =>
              index === self.findIndex(d => d._id.toString() === detail._id.toString())
            );
            appDetails.push(...uniqueDetails);
          }
        }

        debug(`Found ${appFiles.length} app files${COPY_APP_DETAILS ? `, ${appDetails.length} app details` : ' (app details skipped)'}`);

        if (dryRun) {
          debug(`DRY RUN: Would copy job ${jobId} with ${jobLogs.length} logs, ${jobAssigns.length} assignments, ${applications.length} applications, ${appFiles.length} files${COPY_APP_DETAILS ? `, ${appDetails.length} details` : ' (app details skipped)'}`);
          result.succeeded++;
          continue;
        }

        // Use different processing approaches based on configuration
        let success = false;

        if (USE_SEPARATE_TRANSACTIONS) {
          // Use completely separate transactions
          debug(`Using separate transactions for job ${jobId}`);
          success = await processJobWithSeparateTransactions(
            sourceJob, jobId, newJobId, jobLogs, jobAssigns, applications,
            appFiles, appDetails, applicationIdMapping, appFileIdMapping, keepRelatedIds, stats
          );
        } else if (USE_CHUNKED_PROCESSING) {
          // Use chunked processing for regular jobs
          debug(`Using chunked processing for job ${jobId}`);
          success = await withTransactionRetry(async () => {
            const session = await mongoose.startSession(DEFAULT_TRANSACTION_OPTIONS);
            try {
              session.startTransaction();

              // 1. Create the job using upsert to avoid duplicate key errors
              const jobData = sourceJob.toObject();
              const newJob = { ...jobData, _id: newJobId };
              const jobUpsertResult = await Job.collection.replaceOne(
                { _id: newJobId },
                newJob,
                { session, upsert: true }
              );

              if (jobUpsertResult.upsertedCount > 0) {
                debug(`Created job with ID ${newJobId} in destination database via upsert`);
                stats.jobs++;
              } else if (jobUpsertResult.matchedCount > 0) {
                debug(`Updated existing job with ID ${newJobId} in destination database`);
                stats.jobs++;
              } else {
                debug(`Warning: Job upsert for ${newJobId} did not create or match any documents`);
              }

              // 2. Process all related data in chunks
              await processRegularJobInChunks(
                sourceJob, jobId, newJobId, jobLogs, jobAssigns, applications,
                appFiles, appDetails, applicationIdMapping, appFileIdMapping, keepRelatedIds, session, linkToExisting, stats
              );

              await session.commitTransaction();
              return true;
            } catch (error) {
              if (session.inTransaction()) {
                try {
                  await session.abortTransaction();
                } catch (abortError) {
                  debug(`Error aborting transaction: ${abortError.message}`);
                }
              }
              throw error;
            } finally {
              if (typeof session.endSession === 'function') {
                try {
                  session.endSession();
                } catch (endError) {
                  debug(`Error ending session: ${endError.message}`);
                }
              }
            }
          });
        } else {
          // Use a single transaction for the entire job
          debug(`Using single transaction for job ${jobId}`);
          success = await withTransactionRetry(async () => {
            const session = await mongoose.startSession(DEFAULT_TRANSACTION_OPTIONS);
            try {
              session.startTransaction();

              // 1. Create the job using upsert to avoid duplicate key errors
              const jobData = sourceJob.toObject();
              const newJob = { ...jobData, _id: newJobId };
              const jobUpsertResult = await Job.collection.replaceOne(
                { _id: newJobId },
                newJob,
                { session, upsert: true }
              );

              if (jobUpsertResult.upsertedCount > 0) {
                debug(`Created job with ID ${newJobId} in destination database via upsert`);
                stats.jobs++;
              } else if (jobUpsertResult.matchedCount > 0) {
                debug(`Updated existing job with ID ${newJobId} in destination database`);
                stats.jobs++;
              } else {
                debug(`Warning: Job upsert for ${newJobId} did not create or match any documents`);
              }

              // 2. Create job logs
              for (const sourceJobLog of jobLogs) {
                const logData = sourceJobLog.toObject();
                const logId = keepRelatedIds ? sourceJobLog._id : new mongoose.Types.ObjectId();
                const newJobLog = { ...logData, job: newJobId, _id: logId };
                await safeInsertWithRecovery(JobLog.collection, newJobLog, { job: newJobId }, session, 'linkedLogs', false, linkToExisting, stats);
              }
              stats.jobLogs += jobLogs.length;

              // 3. Create job assignments
              for (const sourceJobAssign of jobAssigns) {
                const assignData = sourceJobAssign.toObject();
                const assignId = keepRelatedIds ? sourceJobAssign._id : new mongoose.Types.ObjectId();
                const newJobAssign = { ...assignData, job: newJobId, _id: assignId };
                await safeInsertWithRecovery(JobAssign.collection, newJobAssign, { job: newJobId }, session, 'linkedAssigns', false, linkToExisting, stats);
              }
              stats.jobAssigns += jobAssigns.length;

              // 4. Create applications and related data
              for (const sourceApp of applications) {
                const appData = sourceApp.toObject();
                const newAppId = applicationIdMapping[sourceApp._id];
                const newApp = { ...appData, jobId: newJobId, _id: newAppId };
                await safeInsertWithRecovery(Application.collection, newApp, { jobId: newJobId }, session, 'linkedApplications', false, linkToExisting, stats);
                stats.applications++;

                // 5. Create application files
                const appFilesForThisApp = appFiles.filter(file => file.appId.toString() === sourceApp._id.toString());
                for (const sourceAppFile of appFilesForThisApp) {
                  const fileData = sourceAppFile.toObject();
                  const newFileId = appFileIdMapping[sourceAppFile._id];
                  const newAppFile = {
                    ...fileData,
                    appId: newAppId,
                    _id: newFileId
                  };
                  await safeInsertWithRecovery(AppFile.collection, newAppFile, { appId: newAppId }, session, 'linkedFiles', false, linkToExisting, stats);
                  stats.appFiles++;
                }

                // 6. Create application details (if enabled)
                if (COPY_APP_DETAILS) {
                  // Filter application details by both appId and fileId relationships
                  const sourceAppFileIds = appFiles
                    .filter(file => file.appId.toString() === sourceApp._id.toString())
                    .map(file => file._id.toString());

                  const appDetailsForThisApp = appDetails.filter(detail =>
                    detail.appId?.toString() === sourceApp._id.toString() ||
                    (detail.fileId && sourceAppFileIds.includes(detail.fileId.toString()))
                  );

                  for (const sourceAppDetail of appDetailsForThisApp) {
                    const detailData = sourceAppDetail.toObject();
                    const detailId = keepRelatedIds ? sourceAppDetail._id : new mongoose.Types.ObjectId();

                    // Determine the fileId to use
                    let newFileId = null;
                    if (sourceAppDetail.fileId && appFileIdMapping[sourceAppDetail.fileId]) {
                      newFileId = appFileIdMapping[sourceAppDetail.fileId];
                    }

                    const newAppDetail = {
                      ...detailData,
                      fileId: newFileId, // Primary relationship using fileId
                      appId: sourceAppDetail.appId ? newAppId : undefined, // Preserve appId if it exists
                      _id: detailId
                    };

                    // Remove undefined fields
                    if (newAppDetail.appId === undefined) delete newAppDetail.appId;

                    await safeInsertWithRecovery(AppDetail.collection, newAppDetail, { fileId: newFileId }, session, 'linkedDetails', false, linkToExisting, stats);
                    stats.appDetails++;
                  }
                }
              }

              await session.commitTransaction();
              return true;
            } catch (error) {
              if (session.inTransaction()) {
                try {
                  await session.abortTransaction();
                } catch (abortError) {
                  debug(`Error aborting transaction: ${abortError.message}`);
                }
              }
              throw error;
            } finally {
              if (typeof session.endSession === 'function') {
                try {
                  session.endSession();
                } catch (endError) {
                  debug(`Error ending session: ${endError.message}`);
                }
              }
            }
          });
        }

        if (success) {
          debug(`Successfully copied job ${jobId} to ${newJobId}`);
          result.succeeded++;
        } else {
          debug(`Failed to copy job ${jobId}`);
          result.failed++;
          result.failedIds.push(jobId);
          stats.failureReasons[jobId] = {
            category: 'ProcessingError',
            message: 'Failed to process job'
          };
        }
      } catch (error) {
        debug(`Error processing job ${jobId}: ${error.message}`);
        result.failed++;
        result.failedIds.push(jobId);
        stats.failureReasons[jobId] = {
          category: error.name || 'UnknownError',
          message: error.message
        };
      }
    }

    // Update linked counts in result
    result.linked = {
      logs: stats.linkedLogs,
      assigns: stats.linkedAssigns,
      applications: stats.linkedApplications,
      files: stats.linkedFiles,
      details: stats.linkedDetails
    };

    return result;
  } finally {
    // Close source database connection
    if (sourceConnection) {
      await sourceConnection.close();
      debug('Closed connection to source database');
    }
  }
}

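// Programmatic use (illustrative sketch; the URI and IDs are placeholders):
//
//   const { copyJobsFromPreviousDb } = require('./migrateJobIds.js');
//   const result = await copyJobsFromPreviousDb({
//     sourceDbUri: 'mongodb://user:pass@host:27017/old_db',
//     jobIds: [123, 456],
//     useNewIds: false,
//     dryRun: true
//   });
//   // result.processed / result.succeeded / result.failed / result.linked summarize the run
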
/**
 * Process a regular job in chunks to reduce transaction size and duration
 *
 * @param {Object} sourceJob - The source job
 * @param {Number} sourceJobId - The original job ID from source database
 * @param {Number} newJobId - The new job ID in destination database
 * @param {Array} jobLogs - Job logs to copy
 * @param {Array} jobAssigns - Job assignments to copy
 * @param {Array} applications - Applications to copy
 * @param {Array} appFiles - Application files to copy
 * @param {Array} appDetails - Application details to copy
 * @param {Object} applicationIdMapping - Mapping from old to new application IDs
 * @param {Object} appFileIdMapping - Mapping from old to new application file IDs
 * @param {Boolean} keepRelatedIds - Whether to keep original IDs for related entities
 * @param {Object} session - MongoDB session
 * @param {Boolean} linkToExisting - Whether to link to existing records
 * @param {Object} stats - Statistics object to update
 * @returns {Promise<void>}
 */
async function processRegularJobInChunks(
  sourceJob,
  sourceJobId,
  newJobId,
  jobLogs,
  jobAssigns,
  applications,
  appFiles,
  appDetails,
  applicationIdMapping,
  appFileIdMapping,
  keepRelatedIds,
  session,
  linkToExisting,
  stats
) {
  // Configurable batch sizes - smaller batches reduce transaction timeout risk
  const LOG_BATCH_SIZE = parseInt(process.env.LOG_BATCH_SIZE || '30', 10);
  const ASSIGN_BATCH_SIZE = parseInt(process.env.ASSIGN_BATCH_SIZE || '15', 10);
  const APP_BATCH_SIZE = CHUNK_SIZE; // Use the main chunk size (default 5)

  // 1. Process job logs in batches
  if (jobLogs.length > 0) {
    debug(`Processing ${jobLogs.length} job logs in batches of ${LOG_BATCH_SIZE}`);
    for (let i = 0; i < jobLogs.length; i += LOG_BATCH_SIZE) {
      const batch = jobLogs.slice(i, i + LOG_BATCH_SIZE);
      debug(`Processing batch of ${batch.length} job logs (${i + 1}-${Math.min(i + LOG_BATCH_SIZE, jobLogs.length)} of ${jobLogs.length})`);

      const logCreationPromises = batch.map(async (sourceJobLog) => {
        const logData = sourceJobLog.toObject();
        const logId = keepRelatedIds ? sourceJobLog._id : new mongoose.Types.ObjectId();
        const newJobLog = { ...logData, job: newJobId, _id: logId };
        return safeInsertWithRecovery(JobLog.collection, newJobLog, { job: newJobId }, session, 'linkedLogs', false, linkToExisting, stats);
      });

      await Promise.all(logCreationPromises);
      stats.jobLogs += batch.length;

      // Add a small delay between batches to reduce contention
      if (i + LOG_BATCH_SIZE < jobLogs.length) {
        await new Promise(resolve => setTimeout(resolve, 50));
      }
    }
  }

  // 2. Process job assignments in batches
  if (jobAssigns.length > 0) {
    debug(`Processing ${jobAssigns.length} job assignments in batches of ${ASSIGN_BATCH_SIZE}`);
    for (let i = 0; i < jobAssigns.length; i += ASSIGN_BATCH_SIZE) {
      const batch = jobAssigns.slice(i, i + ASSIGN_BATCH_SIZE);
      debug(`Processing batch of ${batch.length} job assignments (${i + 1}-${Math.min(i + ASSIGN_BATCH_SIZE, jobAssigns.length)} of ${jobAssigns.length})`);

      const assignCreationPromises = batch.map(async (sourceJobAssign) => {
        const assignData = sourceJobAssign.toObject();
        const assignId = keepRelatedIds ? sourceJobAssign._id : new mongoose.Types.ObjectId();
        const newJobAssign = { ...assignData, job: newJobId, _id: assignId };
        return safeInsertWithRecovery(JobAssign.collection, newJobAssign, { job: newJobId }, session, 'linkedAssigns', false, linkToExisting, stats);
      });

      await Promise.all(assignCreationPromises);
      stats.jobAssigns += batch.length;

      // Add a small delay between batches to reduce contention
      if (i + ASSIGN_BATCH_SIZE < jobAssigns.length) {
        await new Promise(resolve => setTimeout(resolve, 50));
      }
    }
  }

  // 3. Process applications and related data in batches
  if (applications.length > 0) {
    debug(`Processing ${applications.length} applications in batches of ${APP_BATCH_SIZE}`);

    for (let i = 0; i < applications.length; i += APP_BATCH_SIZE) {
      const batch = applications.slice(i, i + APP_BATCH_SIZE);
      debug(`Processing batch of ${batch.length} applications (${i + 1}-${Math.min(i + APP_BATCH_SIZE, applications.length)} of ${applications.length})`);

      for (const sourceApp of batch) {
        // Create application
        const appData = sourceApp.toObject();
        const newAppId = applicationIdMapping[sourceApp._id];
        const newApp = { ...appData, jobId: newJobId, _id: newAppId };
        await safeInsertWithRecovery(Application.collection, newApp, { jobId: newJobId }, session, 'linkedApplications', false, linkToExisting, stats);
        stats.applications++;

        // Find related app files and details
        const relatedAppFiles = appFiles.filter(file => file.appId.toString() === sourceApp._id.toString());

        // Filter application details by both appId and fileId relationships
        let relatedAppDetails = [];
        if (COPY_APP_DETAILS) {
          const sourceAppFileIds = relatedAppFiles.map(file => file._id.toString());
          relatedAppDetails = appDetails.filter(detail =>
            detail.appId?.toString() === sourceApp._id.toString() ||
            (detail.fileId && sourceAppFileIds.includes(detail.fileId.toString()))
          );
        }

        // Process app files sequentially to avoid transaction conflicts
        if (relatedAppFiles.length > 0) {
          debug(`Processing ${relatedAppFiles.length} app files sequentially for application ${newAppId}`);
          for (let fileIndex = 0; fileIndex < relatedAppFiles.length; fileIndex++) {
            const sourceAppFile = relatedAppFiles[fileIndex];
            const fileData = sourceAppFile.toObject();
            const newFileId = appFileIdMapping[sourceAppFile._id];
            const newAppFile = {
              ...fileData,
              appId: newAppId,
              _id: newFileId
            };

            debug(`Processing app file ${fileIndex + 1}/${relatedAppFiles.length} (ID: ${newFileId})`);
            const updateFields = { appId: newAppId };
            await safeInsertWithRecovery(
              AppFile.collection,
              newAppFile,
              updateFields,
              session,
              'linkedFiles',
              false, // Standard retry logic for app files
              linkToExisting,
              stats
            );
            stats.appFiles++;

            // Add a small delay between app file operations to reduce contention
            if (fileIndex < relatedAppFiles.length - 1) {
              await new Promise(resolve => setTimeout(resolve, 25 + Math.random() * 25));
            }
          }
          debug(`Completed processing ${relatedAppFiles.length} app files for application ${newAppId}`);
        }

        // Process app details using bulk insert (if enabled)
        if (COPY_APP_DETAILS && relatedAppDetails.length > 0) {
          debug(`Processing ${relatedAppDetails.length} app details using bulk insert for application ${newAppId}`);

          // Prepare all app details for bulk insert
          const appDetailsToInsert = relatedAppDetails.map((sourceAppDetail) => {
            const detailData = sourceAppDetail.toObject();
            const detailId = keepRelatedIds ? sourceAppDetail._id : new mongoose.Types.ObjectId();

            // Determine the fileId to use
            let newFileId = null;
            if (sourceAppDetail.fileId && appFileIdMapping[sourceAppDetail.fileId]) {
              newFileId = appFileIdMapping[sourceAppDetail.fileId];
            }

            const newAppDetail = {
              ...detailData,
              fileId: newFileId, // Primary relationship using fileId
              appId: sourceAppDetail.appId ? newAppId : undefined, // Preserve appId if it exists
              _id: detailId
            };

            // Remove undefined fields
            if (newAppDetail.appId === undefined) delete newAppDetail.appId;

            return newAppDetail;
          });

          // Process in chunks of 1000 documents
          const chunks = utils.chunkArray(appDetailsToInsert, 1000);
          debug(`Processing ${appDetailsToInsert.length} app details in ${chunks.length} chunks of max 1000 documents each`);

          // Process chunks sequentially using native async/await
          for (let chunkIndex = 0; chunkIndex < chunks.length; chunkIndex++) {
            const chunk = chunks[chunkIndex];
            try {
              await AppDetail.insertMany(chunk, { rawResult: true, ordered: true, session });
              debug(`Successfully inserted chunk ${chunkIndex + 1}/${chunks.length} with ${chunk.length} app details`);
            } catch (err) {
              debug(`Error in bulk insert for app details chunk ${chunkIndex + 1}/${chunks.length}: ${err.message}`);
              throw err;
            }
          }

          stats.appDetails += relatedAppDetails.length;
          debug(`Completed bulk insert of ${relatedAppDetails.length} app details for application ${newAppId}`);
        }
      }

      // Add a small delay between application batches to reduce contention
      if (i + APP_BATCH_SIZE < applications.length) {
        await new Promise(resolve => setTimeout(resolve, 50 + Math.random() * 100));
      }
    }
  }

  debug(`All data for job ${newJobId} processed in smaller chunks`);
}

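// Caution (an observation, not a behavior change): the Promise.all batches in
// processRegularJobInChunks issue several operations against a single
// ClientSession. MongoDB client sessions are not safe for concurrent use, so
// if "Cannot use a session that has ended" or similar errors surface here,
// the sequential variant is the safer sketch:
//   for (const doc of batch) {
//     await safeInsertWithRecovery(JobLog.collection, doc, { job: newJobId }, session, 'linkedLogs', false, linkToExisting, stats);
//   }
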
/**
 * Safe insert with recovery for various errors using upsert operations
 * Uses replaceOne with upsert to handle both new documents and duplicates automatically
 * without causing transaction aborts
 *
 * @param {Object} collection - MongoDB collection to insert into
 * @param {Object} document - Document to insert
 * @param {Object} updateFields - Fields to update for finding existing documents (legacy parameter, not used with upsert approach)
 * @param {Object} session - MongoDB session
 * @param {string} statCounter - Name of the linked counter in stats
 * @param {boolean} extendedRetry - Positional flag supplied by all call sites; currently unused by this body (name assumed from the "Standard retry logic" call-site comments)
 * @param {boolean} linkToExisting - Whether to link to existing records
 * @param {Object} stats - Statistics object to update
 * @returns {Promise<boolean>} - Whether the operation succeeded
 */
async function safeInsertWithRecovery(
  collection,
  document,
  updateFields,
  session,
  statCounter = null,
  extendedRetry = false, // every call site passes 8 arguments; without this parameter, linkToExisting and stats would each shift by one position
  linkToExisting = true,
  stats = null
) {
  const maxRetries = MAX_RETRIES;
  let retryCount = 0;
  const collectionName = collection.collectionName;

  while (retryCount <= maxRetries) {
    try {
      // Use replaceOne with upsert directly to handle both new inserts and duplicates
      const filter = { _id: document._id };
      const updateDoc = { ...document };

      const upsertResult = await collection.replaceOne(
        filter,
        updateDoc,
        {
          session,
          upsert: true
        }
      );

      if (upsertResult.upsertedCount > 0) {
        debug(`Created new ${collectionName} document via upsert`);
        return true;
      } else if (upsertResult.matchedCount > 0) {
        debug(`Linked to existing ${collectionName} document via upsert (matched: ${upsertResult.matchedCount}, modified: ${upsertResult.modifiedCount})`);

        // Increment the linked counter since we found an existing document
        if (statCounter && stats && linkToExisting) {
          stats[statCounter]++;
        }
        return true;
      } else {
        // Count this as an attempt and back off briefly so the loop cannot spin forever
        retryCount++;
        debug(`Upsert for ${collectionName} did not create or match any documents, retrying (attempt ${retryCount}/${maxRetries})`);
        await new Promise(resolve => setTimeout(resolve, RETRY_DELAY_MS));
      }
    } catch (error) {
      retryCount++;

      // Check if we should retry based on error type
      const shouldRetry = retryCount <= maxRetries && (
        (error.errorLabels && error.errorLabels.includes("TransientTransactionError")) ||
        (error.message && (
          error.message.includes("has been aborted") ||
          error.message.includes("transaction aborted") ||
          error.message.includes("Transaction has been aborted") ||
          error.message.includes("Cannot use a session that has ended")
        ))
      );

      if (shouldRetry) {
        const baseDelay = RETRY_DELAY_MS * Math.pow(2, retryCount - 1);
        const jitter = Math.random() * 0.5 * baseDelay; // Up to 50% jitter
        const delay = Math.floor(baseDelay + jitter);

        debug(`Retrying ${collectionName} operation (attempt ${retryCount}/${maxRetries}) after ${delay}ms delay due to error: ${error.message}`);

        await new Promise(resolve => setTimeout(resolve, delay));
      } else {
        // If retries are exhausted or the error is not retryable, rethrow it
        throw error;
      }
    }
  }

  throw new Error(`Failed to insert ${collectionName} document after ${maxRetries} retries`);
}

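// Driver semantics relied on above (for reference):
//   const r = await coll.replaceOne({ _id: doc._id }, doc, { upsert: true });
//   r.upsertedCount === 1  -> no document had that _id, so a new one was created
//   r.matchedCount  === 1  -> a document with that _id already existed (it was replaced)
// This is why duplicates are "linked" instead of throwing E11000 duplicate key errors.
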
/**
 * Process a job using separate transactions for each entity type.
 * This is the most conservative approach for jobs that require maximum transaction isolation.
 *
 * @param {Object} sourceJob - The source job to copy
 * @param {Number} oldJobId - The original job ID
 * @param {Number} newJobId - The new job ID to use
 * @param {Array} jobLogs - Job logs to copy
 * @param {Array} jobAssigns - Job assignments to copy
 * @param {Array} applications - Applications to copy
 * @param {Array} appFiles - Application files to copy
 * @param {Array} appDetails - Application details to copy
 * @param {Object} applicationIdMapping - Mapping from old to new application IDs
 * @param {Object} appFileIdMapping - Mapping from old to new application file IDs
 * @param {Boolean} keepRelatedIds - Whether to keep original IDs for related entities
 * @param {Object} stats - Statistics object to update
 * @returns {Promise<Boolean>} - Whether the job was successfully processed
 */
async function processJobWithSeparateTransactions(
  sourceJob,
  oldJobId,
  newJobId,
  jobLogs,
  jobAssigns,
  applications,
  appFiles,
  appDetails,
  applicationIdMapping,
  appFileIdMapping,
  keepRelatedIds,
  stats
) {
  debug(`Processing job ${oldJobId} -> ${newJobId} using completely separate transactions`);

  try {
    // Transaction 1: Create the job
    await withTransactionRetry(async () => {
      const session = await mongoose.startSession(DEFAULT_TRANSACTION_OPTIONS);

      try {
        session.startTransaction();
        const jobData = sourceJob.toObject();
        const newJob = { ...jobData, _id: newJobId };
        const jobUpsertResult = await Job.collection.replaceOne(
          { _id: newJobId },
          newJob,
          { session, upsert: true }
        );

        if (jobUpsertResult.upsertedCount > 0) {
          debug(`Created job with ID ${newJobId} in destination database via upsert (separate transaction)`);
          stats.jobs++;
        } else if (jobUpsertResult.matchedCount > 0) {
          debug(`Updated existing job with ID ${newJobId} in destination database (separate transaction)`);
          stats.jobs++;
        } else {
          debug(`Warning: Job upsert for ${newJobId} did not create or match any documents (separate transaction)`);
        }

        await session.commitTransaction();
      } catch (error) {
        if (session.inTransaction()) {
          try {
            await session.abortTransaction();
          } catch (abortError) {
            debug(`Error aborting transaction: ${abortError.message}`);
          }
        }
        throw error;
      } finally {
        if (typeof session.endSession === 'function') {
          try {
            session.endSession();
          } catch (endError) {
            debug(`Error ending session: ${endError.message}`);
          }
        }
      }
    });

    // Add delay between transactions
    await new Promise(resolve => setTimeout(resolve, 500 + Math.random() * 300));

    // Transaction 2: Process job logs in small batches
    if (jobLogs.length > 0) {
      const BATCH_SIZE = parseInt(process.env.LOG_BATCH_SIZE || '15', 10);
      for (let i = 0; i < jobLogs.length; i += BATCH_SIZE) {
        const batch = jobLogs.slice(i, i + BATCH_SIZE);
        debug(`Processing batch of ${batch.length} job logs (${i + 1}-${Math.min(i + BATCH_SIZE, jobLogs.length)} of ${jobLogs.length})`);

        await withTransactionRetry(async () => {
          const session = await mongoose.startSession(DEFAULT_TRANSACTION_OPTIONS);

          try {
            session.startTransaction();

            const promises = batch.map(async (sourceJobLog) => {
              const logData = sourceJobLog.toObject();
              const logId = keepRelatedIds ? sourceJobLog._id : new mongoose.Types.ObjectId();
              const newJobLog = { ...logData, job: newJobId, _id: logId };
              return safeInsertWithRecovery(JobLog.collection, newJobLog, { job: newJobId }, session, 'linkedLogs', true, true, stats);
            });

            await Promise.all(promises);
            await session.commitTransaction();
          } catch (error) {
            if (session.inTransaction()) {
              try {
                await session.abortTransaction();
              } catch (abortError) {
                debug(`Error aborting transaction: ${abortError.message}`);
              }
            }
            throw error;
          } finally {
            if (typeof session.endSession === 'function') {
              try {
                session.endSession();
              } catch (endError) {
                debug(`Error ending session: ${endError.message}`);
              }
            }
          }
        });

        // Add delay between batches
        await new Promise(resolve => setTimeout(resolve, 200 + Math.random() * 200));
      }
      stats.jobLogs += jobLogs.length;
      debug(`Completed processing all ${jobLogs.length} job logs`);
    }

    // Transaction 3: Process job assignments
    if (jobAssigns.length > 0) {
      await withTransactionRetry(async () => {
        const session = await mongoose.startSession(DEFAULT_TRANSACTION_OPTIONS);

        try {
          session.startTransaction();

          const promises = jobAssigns.map(async (sourceJobAssign) => {
            const assignData = sourceJobAssign.toObject();
            const assignId = keepRelatedIds ? sourceJobAssign._id : new mongoose.Types.ObjectId();
            const newJobAssign = { ...assignData, job: newJobId, _id: assignId };
            return safeInsertWithRecovery(JobAssign.collection, newJobAssign, { job: newJobId }, session, 'linkedAssigns', true, true, stats);
          });

          await Promise.all(promises);
          stats.jobAssigns += jobAssigns.length;

          await session.commitTransaction();
        } catch (error) {
          if (session.inTransaction()) {
            try {
              await session.abortTransaction();
            } catch (abortError) {
              debug(`Error aborting transaction: ${abortError.message}`);
            }
          }
          throw error;
        } finally {
          if (typeof session.endSession === 'function') {
            try {
              session.endSession();
            } catch (endError) {
              debug(`Error ending session: ${endError.message}`);
            }
          }
        }
      });

      // Add delay between transactions
      await new Promise(resolve => setTimeout(resolve, 300 + Math.random() * 200));
    }

    // Transaction 4: Process applications and related data one by one
    for (let i = 0; i < applications.length; i++) {
      const sourceApp = applications[i];
      const appData = sourceApp.toObject();
      const newAppId = applicationIdMapping[sourceApp._id];

      // Create application in its own transaction
      await withTransactionRetry(async () => {
        const session = await mongoose.startSession(DEFAULT_TRANSACTION_OPTIONS);

        try {
          session.startTransaction();

          const newApp = { ...appData, jobId: newJobId, _id: newAppId };
          await safeInsertWithRecovery(
            Application.collection,
            newApp,
            { jobId: newJobId },
            session,
            'linkedApplications',
            true,
            true,
            stats
          );
          stats.applications++;

          await session.commitTransaction();
        } catch (error) {
          if (session.inTransaction()) {
            try {
              await session.abortTransaction();
            } catch (abortError) {
              debug(`Error aborting transaction: ${abortError.message}`);
            }
          }
          throw error;
        } finally {
          if (typeof session.endSession === 'function') {
            try {
              session.endSession();
            } catch (endError) {
              debug(`Error ending session: ${endError.message}`);
            }
          }
        }
      });

      // Small delay between application transactions
      await new Promise(resolve => setTimeout(resolve, 100));

      // Find related app files
      const relatedAppFiles = appFiles.filter(file => file.appId.toString() === sourceApp._id.toString());

      // Process app files in a separate transaction
      if (relatedAppFiles.length > 0) {
        await withTransactionRetry(async () => {
          const session = await mongoose.startSession(DEFAULT_TRANSACTION_OPTIONS);

          try {
            session.startTransaction();

            const promises = relatedAppFiles.map(async (sourceAppFile) => {
              const fileData = sourceAppFile.toObject();
              const newFileId = appFileIdMapping[sourceAppFile._id];
              const newAppFile = {
                ...fileData,
                appId: newAppId,
                _id: newFileId
              };

              return safeInsertWithRecovery(
                AppFile.collection,
                newAppFile,
                { appId: newAppId },
                session,
                'linkedFiles',
                true,
                true,
                stats
              );
            });

            await Promise.all(promises);
            stats.appFiles += relatedAppFiles.length;

            await session.commitTransaction();
          } catch (error) {
            if (session.inTransaction()) {
              try {
                await session.abortTransaction();
              } catch (abortError) {
                debug(`Error aborting transaction: ${abortError.message}`);
              }
            }
            throw error;
          } finally {
            if (typeof session.endSession === 'function') {
              try {
                session.endSession();
              } catch (endError) {
                debug(`Error ending session: ${endError.message}`);
              }
            }
          }
        });

        // Small delay after file processing
        await new Promise(resolve => setTimeout(resolve, 100));
      }

      // Find related app details and process them (if enabled)
      if (COPY_APP_DETAILS) {
        // Filter application details by both appId and fileId relationships
        const sourceAppFileIds = relatedAppFiles.map(file => file._id.toString());
        const relatedAppDetails = appDetails.filter(detail =>
          detail.appId?.toString() === sourceApp._id.toString() ||
          (detail.fileId && sourceAppFileIds.includes(detail.fileId.toString()))
        );

        // Process app details using bulk insert in a separate transaction
        if (relatedAppDetails.length > 0) {
          await withTransactionRetry(async () => {
            const session = await mongoose.startSession(DEFAULT_TRANSACTION_OPTIONS);

            try {
              session.startTransaction();

              // Prepare all app details for bulk insert
              const appDetailsToInsert = relatedAppDetails.map((sourceAppDetail) => {
                const detailData = sourceAppDetail.toObject();
                const detailId = keepRelatedIds ? sourceAppDetail._id : new mongoose.Types.ObjectId();

                // Determine the fileId to use
                let newFileId = null;
                if (sourceAppDetail.fileId && appFileIdMapping[sourceAppDetail.fileId]) {
                  newFileId = appFileIdMapping[sourceAppDetail.fileId];
                }

                const newAppDetail = {
                  ...detailData,
                  fileId: newFileId, // Primary relationship using fileId
                  appId: sourceAppDetail.appId ? newAppId : undefined, // Preserve appId if it exists
                  _id: detailId
                };

                // Remove undefined fields
                if (newAppDetail.appId === undefined) delete newAppDetail.appId;

                return newAppDetail;
              });

              // Process in chunks of 1000 documents
              const chunks = utils.chunkArray(appDetailsToInsert, 1000);
              debug(`Processing ${appDetailsToInsert.length} app details in ${chunks.length} chunks using bulk insert (separate transaction)`);

              // Process chunks sequentially using native async/await
              for (let chunkIndex = 0; chunkIndex < chunks.length; chunkIndex++) {
                const chunk = chunks[chunkIndex];
                try {
                  await AppDetail.insertMany(chunk, { rawResult: true, ordered: true, session });
                  debug(`Successfully inserted chunk ${chunkIndex + 1}/${chunks.length} with ${chunk.length} app details (separate transaction)`);
                } catch (err) {
                  debug(`Error in bulk insert for app details chunk ${chunkIndex + 1}/${chunks.length} (separate transaction): ${err.message}`);
                  throw err;
                }
              }

              stats.appDetails += relatedAppDetails.length;
              debug(`Completed bulk insert of ${relatedAppDetails.length} app details (separate transaction)`);

              await session.commitTransaction();
            } catch (error) {
              if (session.inTransaction()) {
                try {
                  await session.abortTransaction();
                } catch (abortError) {
                  debug(`Error aborting transaction: ${abortError.message}`);
                }
              }
              throw error;
            } finally {
              if (typeof session.endSession === 'function') {
                try {
                  session.endSession();
                } catch (endError) {
                  debug(`Error ending session: ${endError.message}`);
                }
              }
            }
          });
        }
      }

      debug(`Processed application ${i + 1}/${applications.length}`);
    }

    debug(`Successfully processed job ${oldJobId} -> ${newJobId} using separate transactions`);
    return true;
  } catch (error) {
    debug(`Error processing job ${oldJobId} with separate transactions: ${error.message}`);
    throw error;
  }
}

/**
 * Main execution function
 * Parses command-line arguments and executes the appropriate function
 */
(async function main() {
  const args = process.argv.slice(2);
  const command = args[0]?.toLowerCase();

  dbConn.once('open', async () => {
    try {
      debug('Database connected');

      if (command === 'copy') {
        const sourceDbUri = process.env.SOURCE_DB_URI;
        const jobIdsArg = process.env.JOB_IDS || args[1] || JOB_IDS_TO_MIGRATE.join(',');
        const useNewIds = process.env.USE_NEW_IDS === 'true';
        const keepRelatedIds = process.env.KEEP_RELATED_IDS === 'true';
        const linkToExisting = process.env.LINK_TO_EXISTING !== 'false';

        if (!sourceDbUri) {
          debug('Error: SOURCE_DB_URI environment variable is required for copy operation');
          process.exit(1);
        }

        if (!jobIdsArg) {
          debug('Error: JOB_IDS environment variable or command-line argument required for copy operation');
          process.exit(1);
        }

        const jobIds = jobIdsArg.split(',').map(id => parseInt(id.trim(), 10)).filter(id => !isNaN(id));

        if (!jobIds.length) {
          debug('Error: No valid job IDs provided');
          process.exit(1);
        }

        debug('Starting copy operation from external database');

        if (mongoose.connection.readyState !== 1) {
          debug(`Error: Destination database is not connected (readyState: ${mongoose.connection.readyState})`);
          debug('Waiting for connection to be established...');

          let attempts = 0;
          while (mongoose.connection.readyState !== 1 && attempts < 10) {
            await new Promise(resolve => setTimeout(resolve, 1000));
            attempts++;
            debug(`Connection attempt ${attempts}, readyState: ${mongoose.connection.readyState}`);
          }

          if (mongoose.connection.readyState !== 1) {
            debug('Error: Failed to establish connection to destination database');
            process.exit(1);
          }

          debug('Destination database connection established successfully');
        }

        const result = await copyJobsFromPreviousDb({
          sourceDbUri,
          jobIds,
          useNewIds,
          keepRelatedIds,
          linkToExisting,
          newStartId: NEW_START_ID,
          dryRun: DRY_RUN
        });

        debug('Copy operation result:', result);
        if (result.skipped > 0) {
          debug(`Skipped ${result.skipped} jobs that already exist in destination database:`, result.skippedIds.join(', '));
        }
        if (result.failed > 0) {
          debug(`Failed to copy ${result.failed} jobs:`);
          result.failedIds.forEach(jobId => {
            const reason = result.stats.failureReasons[jobId];
            debug(`  Job ${jobId}: [${reason.category || 'Unknown'}] ${reason.message}`);
          });
        }

        const linked = result.linked;
        if (linked && (linked.logs > 0 || linked.assigns > 0 || linked.applications > 0 || linked.files > 0 || linked.details > 0)) {
          debug('Linked existing records in destination database:');
          if (linked.logs > 0) debug(`  - ${linked.logs} job logs`);
          if (linked.assigns > 0) debug(`  - ${linked.assigns} job assignments`);
          if (linked.applications > 0) debug(`  - ${linked.applications} applications`);
          if (linked.files > 0) debug(`  - ${linked.files} application files`);
          if (linked.details > 0) debug(`  - ${linked.details} application details`);
        }
      } else {
        await shiftSpecificJobIds();
        debug('Job ID shifting completed successfully');
      }
    } catch (error) {
      debug('Operation failed:', error);
    } finally {
      await mongoose.connection.close();
      process.exit();
    }
  });
})();

module.exports = {
  shiftSpecificJobIds,
  safeMongoOperation,
  withTransactionRetry,
  copyJobsFromPreviousDb,
  processJobWithSeparateTransactions
};