106 lines
3.2 KiB
JavaScript
106 lines
3.2 KiB
JavaScript
const utils = require('./utils'),
|
|
mongoose = require('mongoose'),
|
|
debug = require('debug')('agm:mongo_util');
|
|
|
|
function getTranOps(readLevel = 'snapshot') {
|
|
return ({
|
|
readConcern: { level: readLevel },
|
|
writeConcern: { w: 'majority' }
|
|
});
|
|
}
|
|
|
|
/**
|
|
* Runs the txnFunc and retries if TransientTransactionError encountered
|
|
* @param {*} txnFunc a function which performs transaction within
|
|
* @param {*} session the mongo connection's session
|
|
*/
|
|
async function runTransactionWithRetry(txnFunc, session) {
|
|
if (!txnFunc && typeof (txnFunc) !== "function") throw new Error("invalid_func_param");
|
|
|
|
while (true) {
|
|
try {
|
|
await txnFunc(session); // performs transaction
|
|
break;
|
|
} catch (error) {
|
|
// If transient error, retry the whole transaction
|
|
if (error.hasOwnProperty("errorLabels") && error.errorLabels.includes("TransientTransactionError")) {
|
|
debug("TransientTransactionError, retrying transaction ...");
|
|
continue;
|
|
} else {
|
|
throw error;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Retries commit if UnknownTransactionCommitResult encountered
|
|
* @param {*} session the mongo connection's session
|
|
*/
|
|
async function commitWithRetry(session) {
|
|
if (!session) return;
|
|
while (true) {
|
|
try {
|
|
await session.commitTransaction(); // Uses write concern set at transaction start.
|
|
break;
|
|
} catch (error) {
|
|
// Can retry commit
|
|
if (error.hasOwnProperty("errorLabels") && error.errorLabels.includes("UnknownTransactionCommitResult")) {
|
|
debug("UnknownTransactionCommitResult, retrying commit operation ...");
|
|
continue;
|
|
} else {
|
|
throw error;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Perform a function with retry when mongo "TransientTransactionError" error occurs
|
|
* @param {*} fnc the function to run and retry
|
|
* @param {*} maxRetries maximum retry times. 1-5 times.
|
|
* @param {*} delaySecs delay, sleep time before trying again, 1 - 15 seconds. Default 5 seconds
|
|
*/
|
|
async function runWithRetry(fnc, maxRetries = 3, delaySecs = 5) {
|
|
if (!func && typeof (func) !== "function") throw new Error("invalid_func_param");
|
|
const _delay = Math.min(Math.max(delaySecs, 1), 15);
|
|
const _maxRetries = Math.min(Math.max(maxRetries, 1), 5);
|
|
|
|
let tries = 0;
|
|
while (tries < _maxRetries) {
|
|
try {
|
|
tries++;
|
|
await fnc();
|
|
break;
|
|
} catch (error) {
|
|
// If transient error, retry after delay
|
|
if (error.hasOwnProperty("errorLabels") && error.errorLabels.includes("TransientTransactionError")) {
|
|
await utils.delay(_delay * 1000);
|
|
continue;
|
|
} else {
|
|
throw error;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
async function runInTransaction(func, ses = null, endSesDone = true) {
|
|
if (!func && typeof (func) !== "function") throw new Error("invalid_func_param");
|
|
|
|
const session = ses || (await mongoose.startSession(getTranOps()));
|
|
try {
|
|
session.startTransaction();
|
|
await func(session);
|
|
await commitWithRetry(session);
|
|
} catch (error) {
|
|
await session.abortTransaction();
|
|
debug(error);
|
|
throw error;
|
|
} finally {
|
|
endSesDone && session.endSession();
|
|
}
|
|
}
|
|
|
|
module.exports = {
|
|
getTranOps, runTransactionWithRetry, commitWithRetry, runWithRetry, runInTransaction
|
|
} |