331 lines
14 KiB
JavaScript
331 lines
14 KiB
JavaScript
'use strict';
|
|
|
|
const cron = require('node-cron'),
|
|
debug = require('debug')('agm:invoice_worker'),
|
|
env = require('../helpers/env.js'),
|
|
isProd = env.PRODUCTION,
|
|
{ DBConnection } = require('../helpers/db/connect'),
|
|
models = require('../model'),
|
|
{ InvoiceStatus, InvoiceStatusAction, DEFAULT_LANG, SubType } = require('../helpers/constants'),
|
|
utils = require('../helpers/utils.js'),
|
|
{ isEqual, cloneDeep } = require('lodash'),
|
|
moment = require('moment'),
|
|
{ stripe } = require('../helpers/subscription_util'),
|
|
mailer = require('../helpers/mailer');
|
|
|
|
// Dedicated database connection handle for this worker process.
const workerDB = new DBConnection('Invoice Worker');

// Process-level fatal handlers are registered before the connection is opened.
const path = require('path');
const { registerFatalHandlers } = require('../helpers/process_fatal_handlers');

registerFatalHandlers(process, {
  env: env,
  debug: debug,
  kindPrefix: 'invoice_worker',
  // The report file lives next to this worker script.
  reportFilePath: path.join(__dirname, 'invoice_worker.rlog'),
});

// Open the connection without letting it install its own exit handlers.
// NOTE(review): presumably because the fatal handlers registered above own process shutdown — confirm.
workerDB.initialize({ setupExitHandlers: false });
|
|
|
|
// Check invoices and update them accordingly (i.e. automatically transition their status to the next one, such as Draft to Open)
|
|
// Scheduling descriptor of the invoice status task: cron expression, busy flag and task name.
const processInvoices = {
  // Every minute; both branches currently hold the same expression.
  schedule: isProd ? '*/1 * * * *' : '*/1 * * * *',
  // 0 = idle, 1 = a run is in progress (used to skip overlapping ticks).
  status: 0,
  name: 'processInvoices',
};
|
|
/**
 * Cron task that advances invoices through their automatic status transitions.
 *
 * A tick is skipped while a previous run is still in progress or while the worker
 * DB connection is not ready. Otherwise the task:
 *  1. loads Draft invoices whose openDate has been reached plus all Open invoices,
 *     grouped by applicator (byPuid) and capped at env.INV_PROCESS_LIMIT groups;
 *  2. decides the next status of every invoice:
 *     - Draft -> Open once the open date has been reached,
 *     - Open -> Uncollectible when the applicator's invoice setting requests it and the
 *       invoice is overdue by more than the configured number of days,
 *     - Open -> Paid when every client has a defined amountDue that is not positive;
 *  3. persists the changes of each applicator with a single bulkWrite.
 *
 * Errors are logged through debug and not rethrown, so a failed run does not stop the schedule.
 *
 * @returns {Promise<{nModified: number}|undefined>} number of modified invoices (undefined when the tick was skipped or failed)
 */
const processInvoicesTask = cron.schedule(processInvoices.schedule, async () => {
  // Only proceed when idle and the db connection is connected
  if (!workerDB.isReady() || processInvoices.status) {
    return;
  }

  const result = { nModified: 0 };
  try {
    processInvoices.status = 1;

    // Shift today to the end of the day so day-granular comparisons include all of today
    const currDateUTC = moment.utc().endOf('day');

    // 1. Retrieve one batch of eligible invoices, grouped per applicator.
    const applInvoices = await models.Invoice.aggregate([
      {
        $match: {
          $or: [
            // Aggregation pipelines are not cast against the schema, so a native Date
            // (not a moment object) must be supplied for the date comparison to work.
            { status: InvoiceStatus.Draft, openDate: { $lte: currDateUTC.toDate() } },
            { status: InvoiceStatus.Open }
          ]
        }
      },
      {
        $group: { _id: "$byPuid", "invoices": { $push: "$$ROOT" } }
      },
      {
        $project: { "invoices._id": 1, "invoices.status": 1, "invoices.openDate": 1, "invoices.dueDate": 1, "invoices.clients": 1 }
      },
      {
        $sort: { "invoices.openDate": 1, "invoices.dueDate": 1 }
      },
      { $limit: env.INV_PROCESS_LIMIT }
    ]);

    for (const applInv of applInvoices) {
      if (!applInv?._id || utils.isEmptyArray(applInv.invoices)) continue;

      // Every group has a distinct byPuid, so the setting is fetched exactly once per applicator.
      const applId = applInv._id;
      const invSetting = await models.InvoiceSetting.findOne({ byPuid: applId, userId: null }, {}, { lean: true });

      // 0 disables the Open -> Uncollectible transition for this applicator
      let dueToUncollectibleDays = 0;
      if (invSetting && invSetting.dueToUncollectibleOp === InvoiceStatusAction.MARK_UNCOLLECTIBLE) {
        dueToUncollectibleDays = invSetting.dueToUncollectibleDays || env.INV_MAX_OVERDUE_DAYS;
      }

      // Operations are collected per applicator so that a batch is never sent twice.
      const bulkUpdateOps = [];

      for (const invoice of applInv.invoices) {
        let nextStatus = invoice.status;
        const openDate = moment.utc(invoice.openDate);
        const dueDate = moment.utc(invoice.dueDate);

        // Case 1: invoice is a draft and current date >= open date => open it
        if (invoice.status === InvoiceStatus.Draft && currDateUTC.valueOf() >= openDate.valueOf()) {
          nextStatus = InvoiceStatus.Open;
        }

        // Case 2: an Open invoice becomes Uncollectible (overdue too long) or Paid (nothing left due)
        if (invoice.status === InvoiceStatus.Open) {
          if (dueToUncollectibleDays && currDateUTC.diff(dueDate, "days") > dueToUncollectibleDays) {
            nextStatus = InvoiceStatus.Uncollectible;
          } else {
            // A missing clients list is treated as "not paid" instead of aborting the whole run.
            // An empty list keeps the previous behaviour (counts as fully paid).
            const isPaid = Array.isArray(invoice.clients) && invoice.clients.every(
              (client) => client?.amountDue != null && !(Number(client.amountDue) > 0)
            );
            if (isPaid) {
              nextStatus = InvoiceStatus.Paid;
            }
          }
        }

        if (nextStatus !== invoice.status) {
          bulkUpdateOps.push({
            updateOne: {
              // Never overwrite an invoice that was marked Paid in the meantime
              filter: { _id: invoice._id, status: { $ne: InvoiceStatus.Paid } },
              update: { $set: { status: nextStatus } }
            }
          });
        }
      }

      if (bulkUpdateOps.length > 0) {
        const upResult = await models.Invoice.bulkWrite(bulkUpdateOps);
        if (upResult?.ok) {
          // Newer drivers expose modifiedCount; older ones only nModified
          result.nModified += upResult.modifiedCount ?? upResult.nModified ?? 0;
        }
      }
    }

    return result;
  } catch (error) {
    debug(error);
  } finally {
    debug('Done. processInvoicesTask:', result);
    processInvoices.status = 0;
  }
},
{
  scheduled: true,
  timezone: "Etc/UTC",
  name: processInvoices.name,
  runOnInit: true
});
|
|
|
|
// ─── Promo expiry advance-warning reminder ────────────────────────────────────
|
|
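// Runs once daily at 9 AM UTC in production and every minute elsewhere (overridable
// via PROMO_EXPIRY_WARNING_CRON). For every active Stripe subscription schedule
// whose subscription carries promoId metadata and that has a discounted phase ending within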
|
|
// PROMO_EXPIRY_WARNING_DAYS days, sends a "your promo ends soon" email reusing
|
|
// the promo-expired template (with isWarning=true).
|
|
// Deduplication: stores promoReminderSentAt in Stripe SUBSCRIPTION metadata so
|
|
// the email is sent at most once per subscription. The flag is automatically
|
|
// cleared (set to '') by updatePromoSubscriptionSchedules and
|
|
// updateScheduleEndBehavior whenever the discount end-date is updated, so a
|
|
// fresh reminder will be sent as the new deadline approaches.
|
|
// Scheduling descriptor of the promo expiry reminder task: cron expression, busy flag and task name.
const promoReminder = {
  // An explicit PROMO_EXPIRY_WARNING_CRON wins; otherwise 09:00 daily in production, every minute elsewhere.
  schedule: process.env.PROMO_EXPIRY_WARNING_CRON || (isProd ? '0 9 * * *' : '*/1 * * * *'),
  // 0 = idle, 1 = a run is in progress (used to skip overlapping ticks).
  status: 0,
  name: 'checkPromoExpiryReminders',
};
|
|
|
|
/**
 * Cron task that sends the "your promo ends soon" warning email.
 *
 * A tick is skipped while a previous run is in progress, while the worker DB connection
 * is not ready, or when env.PROMO_EXPIRY_WARNING_DAYS is not a positive number.
 * Otherwise every Stripe subscription schedule is inspected (plus, outside production,
 * the schedules of subscriptions attached to ready test clocks) and a reminder is sent
 * at most once per subscription; the dedup flag is the promoReminderSentAt key in the
 * subscription metadata. Failures of a single schedule are logged and do not stop the run.
 */
cron.schedule(promoReminder.schedule, async () => {
  if (!workerDB.isReady() || promoReminder.status) return;

  // Coerce once so a string-valued setting is compared numerically everywhere below
  const warningDays = Number(env.PROMO_EXPIRY_WARNING_DAYS);
  if (!(warningDays > 0)) return;

  let reminded = 0;

  // The global settings document is the same for every schedule: load it once per run.
  let cachedSettings;
  let settingsLoaded = false;
  async function loadSettings() {
    if (!settingsLoaded) {
      cachedSettings = await models.Setting.findOne({ userId: null });
      settingsLoaded = true; // only set after a successful load, so a failed query is retried
    }
    return cachedSettings;
  }

  /**
   * Process a single active schedule for the promo expiry reminder.
   * @param {Object} schedule - Stripe subscriptionSchedule object
   * @param {number} effectiveNow - Unix timestamp to use as "now" (frozen_time for test clocks)
   * @returns {Promise<boolean>} true if a reminder was sent
   */
  async function processScheduleReminder(schedule, effectiveNow) {
    if (schedule.status !== 'active') { return false; }
    if (!schedule.subscription) { return false; }

    // Find a coupon-bearing phase ending within the warning window.
    // Evaluated per-phase so each subscription is judged on its own end_date,
    // not a globally-computed cutoff (which would drift if the worker was offline).
    const expiringPhase = schedule.phases?.find((ph) => {
      if (!ph.end_date || !(ph.coupon || ph.discounts?.length)) return false;
      const daysUntilEnd = (ph.end_date - effectiveNow) / 86400;
      return daysUntilEnd > 0 && daysUntilEnd <= warningDays;
    });
    if (!expiringPhase) {
      if (!env.PRODUCTION) {
        debug(`skip ${schedule.id}: no phase ending within ${warningDays}d (effectiveNow=${effectiveNow})`);
      }
      return false;
    }

    // Retrieve the subscription — it is the canonical store for promoId and the
    // reminder dedup flag (schedule metadata is not reliably set across all creation
    // paths, and schedules can be recreated when toggling cancel_at_period_end).
    const subscription = await stripe.subscriptions.retrieve(schedule.subscription, {
      expand: ['items.data.price.product']
    });

    const promoId = subscription.metadata?.promoId;
    if (!promoId) { debug(`skip ${schedule.id}: subscription has no promoId`); return false; }

    // Already sent a reminder for this subscription?
    if (subscription.metadata?.promoReminderSentAt) { debug(`skip ${schedule.id}: reminder already sent`); return false; }

    // Look up the promo definition from settings
    const settings = await loadSettings();
    const promo = settings?.subscriptionPromos?.find((p) => p._id?.toString() === promoId);

    // Look up customer in DB (active, not deleted)
    const applicator = await models.Customer.findOne({
      'membership.custId': schedule.customer,
      active: true,
      markedDelete: { $ne: true }
    }).lean();
    if (!applicator) { debug(`skip ${schedule.id}: no active DB customer for ${schedule.customer}`); return false; }

    const items = subscription.items?.data ?? [];

    // Fall back to the generic label also when no item carries an expanded product name
    const subName = items.map((it) => it.price?.product?.name).filter(Boolean).join(', ') || 'Subscription';

    const daysRemaining = Math.max(1, Math.round((expiringPhase.end_date - effectiveNow) / 86400));
    const promoEndDate = moment.unix(expiringPhase.end_date).utc().format('MMMM D, YYYY [UTC]');
    const promoStartDate = expiringPhase.start_date
      ? moment.unix(expiringPhase.start_date).utc().format('MMMM D, YYYY [UTC]')
      : null;

    const promoDiscount = utils.formatPromoDiscount(promo);

    const isTaxable = subscription.automatic_tax?.enabled === true;

    // Sum of the item prices in major currency units; an item without a numeric
    // unit_amount contributes 0 instead of producing "$NaN" or throwing.
    const chargeAmount = items.length
      ? `$${(items.reduce((sum, it) => sum + ((Number(it.price?.unit_amount) || 0) * (it.quantity || 1)), 0) / 100).toFixed(2)}`
      : null;

    const nextBillingDate = subscription.current_period_end
      ? moment.unix(subscription.current_period_end).utc().format('MMMM D, YYYY [UTC]')
      : null;

    // Infer sub kind from price keys when the promo does not state it
    let subKind = promo?.type || SubType.PACKAGE;
    if (!promo?.type && items.length) {
      const allAddon = items.every((it) => {
        const pk = env.PRICE_MAP[it.price?.id];
        return pk && pk.startsWith('addon_');
      });
      subKind = allAddon ? SubType.ADDON : SubType.PACKAGE;
    }

    const emailLocals = {
      name: applicator.name || applicator.contact || applicator.username,
      promoName: promo?.name || 'Promotional Discount',
      subName,
      subKind,
      promoDiscount,
      promoStartDate,
      promoEndDate,
      daysRemaining,
      newBillingDate: nextBillingDate,
      chargeAmount,
      isTaxable,
      isWarning: true,
      userId: applicator._id?.toString(),
      lang: applicator.lang || DEFAULT_LANG
    };

    await mailer.sendPromoExpiredEmail(emailLocals, applicator.username);

    // Mark reminder sent on the subscription so the flag survives schedule recreation
    // (e.g. toggling cancel_at_period_end recreates the schedule).
    // Store effectiveNow so test-clock runs reflect the frozen time, not wall-clock.
    await stripe.subscriptions.update(subscription.id, {
      metadata: { ...subscription.metadata, promoReminderSentAt: String(effectiveNow) }
    });

    debug(`Promo expiry warning sent to ${applicator.username} for schedule ${schedule.id} (${daysRemaining} days remaining)`);
    return true;
  }

  promoReminder.status = 1;
  try {
    const now = moment.utc().unix();

    // ── Main loop: schedules returned by the standard list ─────────────────────
    // Test-clock-attached schedules are handled separately below in dev mode.
    for await (const schedule of stripe.subscriptionSchedules.list()) {
      try {
        if (await processScheduleReminder(schedule, now)) {
          reminded += 1;
        }
      } catch (innerErr) {
        debug(`promoReminder: error processing schedule ${schedule.id}:`, innerErr?.message ?? innerErr);
      }
    }

    // ── Dev-only loop: schedules attached to Stripe test clocks ────────────────
    // Test clocks are iterated explicitly and their subscriptions listed, using each
    // clock's frozen_time as effectiveNow so the window check is relative to the
    // simulated date rather than real wall-clock time.
    if (!isProd) {
      for await (const clock of stripe.testHelpers.testClocks.list()) {
        if (clock.status !== 'ready' || !clock.frozen_time) continue;
        const effectiveNow = clock.frozen_time;

        for await (const sub of stripe.subscriptions.list({ test_clock: clock.id, status: 'active' })) {
          if (!sub.schedule) continue;
          // sub.schedule is either the schedule id or an expanded schedule object
          const scheduleId = typeof sub.schedule === 'string' ? sub.schedule : sub.schedule.id;
          try {
            const schedule = await stripe.subscriptionSchedules.retrieve(scheduleId);
            if (await processScheduleReminder(schedule, effectiveNow)) {
              reminded += 1;
            }
          } catch (innerErr) {
            debug(`promoReminder: error processing test-clock schedule ${scheduleId}:`, innerErr?.message ?? innerErr);
          }
        }
      }
    }

    debug(`checkPromoExpiryReminders: sent ${reminded} reminder(s)`);
  } catch (err) {
    debug('checkPromoExpiryReminders error:', err?.message ?? err);
  } finally {
    promoReminder.status = 0;
  }
}, {
  scheduled: true,
  timezone: 'Etc/UTC',
  name: promoReminder.name,
  runOnInit: false
});
|