agmission/Development/server/controllers/api_export.js
Devin Major df31b2080d
All checks were successful
Server Tests / Mocha – Unit & Utility Tests (push) Successful in 42s
-(#3013) Data Export - Implement Data Export API BE (Cont.)
+ Added public data export API enhancements, tests, and customer documentation
  + Extended /api/v1 data export endpoints with richer session, records, area, and async export output
  + Added confirmed/fallback report values, client metadata, mapped area, over-spray, volume/apprate (string) units, and weather blocks
+ Normalized flowController to "No FC" and aligned record field names with playback output
+ Converted record wind speed output to knots, added Flight Master–only record/export fields behind fm=true, and persisted fm on export jobs
  + Added export status/area constants, HTTP 202 support, route-level API docs, and per-account export rate limiting support
  + Added comprehensive endpoint, format, and verification test coverage plus test-suite README
  + Added customer-facing data export design, integration, rate-limit, and documentation index guides
  + Updated README/DLQ docs and related documentation links to current HTTPS dashboard paths
2026-04-24 09:05:55 -04:00

513 lines
20 KiB
JavaScript
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

'use strict';
/**
* Async Export controller — /api/v1/jobs/:jobId/export and /api/v1/exports/:exportId
*
* Flow:
* 1. POST /api/v1/jobs/:jobId/export → creates ExportJob record (status=pending),
* kicks off async generation, returns { exportId, status: 'pending' }.
* 2. GET /api/v1/exports/:exportId → poll status; when ready returns { status: 'ready', downloadUrl }.
* 3. GET /api/v1/exports/:exportId/download → streams the file, schedules cleanup.
*
* FE / integration notes:
* - For the daily 17:00 batch: POST export after previous day's jobs are confirmed sprayed,
poll every 10–30 s, then download the CSV when ready.
* - interval param works identically to the records endpoint (GPS point thinning).
* - CSV has all raw trace fields + job/session header columns repeated per row for
* direct Power BI / data-warehouse import without joins.
*/
const path = require('path');
const fs = require('fs');
const { Transform, pipeline } = require('stream');
const { promisify } = require('util');
const pipelineAsync = promisify(pipeline);
const ObjectId = require('mongodb').ObjectId;
const moment = require('moment');
const { Job, App, AppFile, AppDetail } = require('../model');
const ExportJob = require('../model/export_job');
const { AppParamError, AppAuthError } = require('../helpers/app_error');
const { Errors, HttpStatus, ExportUnits, ExportJobStatus } = require('../helpers/constants');
const utils = require('../helpers/utils');
const env = require('../helpers/env');
const EXPORT_TTL_HOURS = env.EXPORT_TTL_HOURS || 24;
const EXPORT_DEDUP_MINS = env.EXPORT_DEDUP_MINS ?? 5;
/**
 * On startup: delete orphaned export files whose ExportJob is expired or missing.
 * Runs fire-and-forget (setImmediate + swallowed errors) so it never blocks
 * or crashes server startup.
 */
setImmediate(async () => {
try {
// Export artifacts are named export_<ObjectId>.<csv|geojson> — see generateExport's outPath.
const pattern = /^export_[a-f0-9]+\.(csv|geojson)$/;
// A missing/unreadable TEMP_DIR degrades to an empty sweep instead of throwing.
const files = await fs.promises.readdir(env.TEMP_DIR).catch(() => []);
for (const file of files) {
if (!pattern.test(file)) continue;
// Recover the ExportJob id from the file name.
const id = file.replace(/^export_/, '').replace(/\.(csv|geojson)$/, '');
// Keep the file only when a matching, not-yet-expired ExportJob still exists.
const exists = ObjectId.isValid(id) && await ExportJob.exists({ _id: id, expiresAt: { $gt: new Date() } });
if (!exists) {
// Best-effort delete; errors ignored (file may already be gone).
fs.unlink(path.join(env.TEMP_DIR, file), () => {});
}
}
} catch { /* non-fatal */ }
});
// Re-use the same helpers from api_pub (inline to avoid a shared helper module for now)
/**
 * Parse a raw interval value (query/body string or number) into a positive
 * finite float. Missing, empty, non-numeric, zero, or negative input → null.
 */
function parseInterval(raw) {
  if (raw == null || raw === '') return null;
  const seconds = Number.parseFloat(raw);
  if (!Number.isFinite(seconds)) return null;
  return seconds > 0 ? seconds : null;
}
/** Decode satellites-in-view; values > 99 carry a +100 flag offset that is stripped. Non-numbers → null. */
function decodeSatsIn(raw) {
  if (!utils.isNumber(raw)) return null;
  return raw > 99 ? raw - 100 : raw;
}
/**
 * Decode GPS correction fields from raw AppDetail values.
 * - correctionId: tslu with a +100 flag offset stripped when > 100 (null if non-numeric).
 * - waasId: calcodeFreq minus 20000 when it falls in the 20001–29999 WAAS range, else null.
 */
function decodeCorrectionFields(tslu, calcodeFreq) {
  let correctionId = null;
  if (utils.isNumber(tslu)) {
    correctionId = tslu > 100 ? tslu - 100 : tslu;
  }
  const inWaasRange = utils.isNumber(calcodeFreq) && calcodeFreq >= 20001 && calcodeFreq <= 29999;
  const waasId = inWaasRange ? calcodeFreq - 20000 : null;
  return { correctionId, waasId };
}
/**
 * Derive applied application rate (L/ha) from flow (L/min), ground speed, and swath width.
 * Returns null when any input is non-numeric or when speed/swath is zero (division guard).
 */
function computeAppRateApplied(lminApp, grSpeed, swath) {
  const allNumeric = [lminApp, grSpeed, swath].every(v => utils.isNumber(v));
  if (!allNumeric) return null;
  if (grSpeed === 0 || swath === 0) return null;
  return (lminApp / (grSpeed * swath)) * 10000;
}
/**
 * Read the laser altimeter value from an AppDetail record.
 * The schema historically misspells the field as `raserAlt`, so both
 * spellings are checked; a missing value collapses to ''.
 */
function getLaserAlt(detail) {
  if (detail == null) return '';
  return detail.laserAlt ?? detail.raserAlt ?? '';
}
/**
 * Convert AppDetail.gpsTime to an ISO UTC timestamp string.
 * Supports both epoch-seconds and legacy seconds-of-day values.
 *
 * @param {number} gpsTime - epoch seconds, or legacy seconds-of-day (< year 2000).
 * @param {string} appStartDateTime - session start (ISO 8601 or YYYYMMDDTHHmmss),
 *   used as the day anchor for legacy values.
 * @returns {string|null} ISO timestamp, or null for non-numeric input.
 */
function toRecordTimeUtc(gpsTime, appStartDateTime) {
  if (!utils.isNumber(gpsTime)) return null;
  // 946684800 = 2000-01-01T00:00:00Z; anything at or after it is epoch seconds.
  if (gpsTime >= 946684800) {
    return moment.unix(gpsTime).utc().toISOString();
  }
  // Legacy format: seconds-of-day, anchored to the session start date when valid.
  const anchor = moment.utc(appStartDateTime, [moment.ISO_8601, 'YYYYMMDDTHHmmss'], true);
  if (!anchor.isValid()) {
    // Malformed app start datetime — fall back to treating the value as epoch seconds.
    return moment.unix(gpsTime).utc().toISOString();
  }
  const dayOffset = Math.floor(gpsTime / 86400);
  const secondsIntoDay = ((gpsTime % 86400) + 86400) % 86400;
  return anchor
    .clone()
    .startOf('day')
    .add(dayOffset, 'days')
    .add(secondsIntoDay, 'seconds')
    .toISOString();
}
/**
 * Load a non-deleted job and verify the caller owns it.
 * Throws AppParamError when the job is missing, AppAuthError on ownership mismatch.
 */
async function ownerJob(jobId, ownerId) {
  const job = await Job.findOne({ _id: jobId, markedDelete: { $ne: true } }).lean();
  if (!job) AppParamError.throw(Errors.JOB_NOT_FOUND);
  const isOwner = Boolean(job.byPuid) && job.byPuid.toString() === ownerId.toString();
  if (!isOwner) AppAuthError.throw();
  return job;
}
// ─── Unit conversion helpers ─────────────────────────────────────────────────
// All raw AppDetail values are stored in SI/metric units.
// When units='us', these factors convert to US customary equivalents.
// Frozen: this table is shared module-wide and must never be mutated at runtime.
const CONV = Object.freeze({
  msToMph: v => +(v * 2.23694).toFixed(4), // m/s → mph
  msToKt: v => +(v * 1.94384).toFixed(4), // m/s → kt (knots, matches playback display)
  mToFt: v => +(v * 3.28084).toFixed(3), // m → ft
  cToF: v => +(v * 9 / 5 + 32).toFixed(2), // °C → °F
  LminToGmin: v => +(v * 0.264172).toFixed(4), // L/min → gal/min
  LhaToGac: v => +(v * 0.10694).toFixed(4), // L/ha → gal/ac
});
/**
 * Apply a numeric conversion only when a value is present.
 * null/undefined/'' pass through untouched so empty CSV cells stay empty.
 */
function applyConv(v, fn) {
  if (v == null || v === '') return v;
  return fn(Number(v));
}
/**
 * Returns CSV column definitions for the requested unit system.
 * Each entry: { key (row-object property), header (CSV column name) }.
 * When `header` is omitted, `key` doubles as the column name.
 *
 * NOTE: array order here defines the CSV column order — recordToRow emits
 * cells by iterating this same list, so keep the two in sync.
 *
 * @param {string} units - ExportUnits.US → US customary headers; anything else → metric.
 * @param {boolean} [includeFm=false] - append Flight Master / AgDisp columns when true.
 * @returns {Array<{key: string, header?: string}>}
 */
function getCsvColumns(units, includeFm = false) {
const us = units === ExportUnits.US;
const cols = [
// Job / session metadata — no unit conversion
{ key: 'jobId' }, { key: 'orderNumber' }, { key: 'jobName' },
{ key: 'clientId' }, { key: 'clientName' },
{ key: 'sessionId' }, { key: 'fileName' }, { key: 'pilotName' },
// GPS data
{ key: 'timeUtc' }, { key: 'gpsTime' }, { key: 'lat' }, { key: 'lon' },
{ key: 'utmX' }, { key: 'utmY' },
{ key: 'alt', header: us ? 'alt_ft' : 'alt_m' },
{ key: 'grSpeed', header: us ? 'groundSpeed_mph' : 'groundSpeed_ms' },
{ key: 'heading' },
{ key: 'xTrack', header: us ? 'crossTrackError_ft' : 'crossTrackError_m' },
{ key: 'lockedLine' }, { key: 'hdop' }, { key: 'satsInView' },
{ key: 'correctionId' }, { key: 'waasId' },
{ key: 'sprayStat' },
// Application data
{ key: 'flowRateApplied', header: us ? 'flowRateApplied_galMin' : 'flowRateApplied_Lmin' },
{ key: 'flowRateRequired', header: us ? 'flowRateRequired_galMin' : 'flowRateRequired_Lmin' },
{ key: 'appRateRequired', header: us ? 'appRateRequired_galAc' : 'appRateRequired_Lha' },
{ key: 'appRateApplied', header: us ? 'appRateApplied_galAc' : 'appRateApplied_Lha' },
{ key: 'swathWidth', header: us ? 'swathWidth_ft' : 'swathWidth_m' },
// Pressure is stored and exported in psi for both unit systems
{ key: 'boomPressure_psi' },
{ key: 'flowController' },
{ key: 'sprayOnLag_s' }, { key: 'sprayOffLag_s' }, { key: 'pulsesPerLiter' },
{ key: 'rpm' },
// MET — wind in knots (metric) or mph (US) to match playback display
{ key: 'windSpeed_kt', header: us ? 'windSpeed_mph' : 'windSpeed_kt' },
{ key: 'windDir_deg' },
{ key: 'temp_c', header: us ? 'temp_f' : 'temp_c' },
{ key: 'humidity_pct' },
];
if (includeFm) {
// Flight Master / AgDisp fields — only when fm=true requested
cols.push(
{ key: 'sprayHeight_m' },
{ key: 'driftX_m' }, { key: 'driftY_m' },
{ key: 'depositX_m' }, { key: 'depositY_m' },
{ key: 'radarAlt_m' },
{ key: 'laserAlt_m' } // DB field is raserAlt (schema typo); exposed as laserAlt_m
);
}
return cols;
}
/**
 * Escape a single CSV field per RFC 4180: wrap in quotes when the value
 * contains a comma, double quote, or newline, doubling any embedded quotes.
 * null/undefined become the empty string; everything else is stringified.
 */
function escapeCsv(val) {
  if (val == null) return '';
  const text = String(val);
  if (!/[",\n]/.test(text)) return text;
  return `"${text.replace(/"/g, '""')}"`;
}
/**
 * Serialize one AppDetail record into a single CSV row string (newline-terminated).
 * Cell order comes from getCsvColumns, so the row always matches the header.
 *
 * @param {object} d - raw AppDetail document (SI/metric units).
 * @param {object} sessionMeta - { appId, fileName, operator, meta, appStartDateTime }.
 * @param {object} jobHeader - job-level columns repeated on every row.
 * @param {string} units - ExportUnits.US triggers US-customary conversions.
 * @param {boolean} [includeFm=false] - append Flight Master / AgDisp fields.
 * @returns {string} escaped CSV row ending in '\n'.
 */
function recordToRow(d, sessionMeta, jobHeader, units, includeFm = false) {
const us = units === ExportUnits.US;
const { correctionId, waasId } = decodeCorrectionFields(d.tslu, d.calcodeFreq);
// Applied rate is derived, not stored: flow / (speed × swath).
const appRateApplied = computeAppRateApplied(d.lminApp, d.grSpeed, d.swath);
const fcName = sessionMeta.meta?.fcName;
const row = {
...jobHeader,
sessionId: sessionMeta.appId,
fileName: sessionMeta.fileName,
pilotName: sessionMeta.operator ?? '',
timeUtc: toRecordTimeUtc(d.gpsTime, sessionMeta.appStartDateTime),
gpsTime: d.gpsTime ?? '',
lat: d.lat ?? '', lon: d.lon ?? '',
utmX: d.utmX ?? '', utmY: d.utmY ?? '',
// Metric values pass through untouched; US conversions only when requested.
alt: us ? applyConv(d.alt, CONV.mToFt) : (d.alt ?? ''),
grSpeed: us ? applyConv(d.grSpeed, CONV.msToMph) : (d.grSpeed ?? ''),
heading: d.head ?? '',
xTrack: us ? applyConv(d.xTrack, CONV.mToFt) : (d.xTrack ?? ''),
lockedLine: d.llnum ?? '', hdop: d.stdHdop ?? '',
satsInView: decodeSatsIn(d.satsIn) ?? '',
correctionId: correctionId ?? '', waasId: waasId ?? '',
sprayStat: d.sprayStat ?? '',
flowRateApplied: us ? applyConv(d.lminApp, CONV.LminToGmin) : (d.lminApp ?? ''),
flowRateRequired: us ? applyConv(d.lminReq, CONV.LminToGmin) : (d.lminReq ?? ''),
appRateRequired: us ? applyConv(d.lhaReq, CONV.LhaToGac) : (d.lhaReq ?? ''),
appRateApplied: us ? applyConv(appRateApplied, CONV.LhaToGac) : (appRateApplied ?? ''),
swathWidth: us ? applyConv(d.swath, CONV.mToFt) : (d.swath ?? ''),
boomPressure_psi: d.psi ?? '',
// Missing or any "none"-like name is normalized to the literal "No FC".
flowController: (fcName && !/none/i.test(fcName)) ? fcName : 'No FC',
sprayOnLag_s: sessionMeta.meta?.sprOnLag ?? '',
sprayOffLag_s: sessionMeta.meta?.sprOffLag ?? '',
pulsesPerLiter: sessionMeta.meta?.pulsesPerLit ?? '',
// rpm is an array; serialize as JSON so it survives in one CSV cell.
rpm: (Array.isArray(d.rpm) && d.rpm.length) ? JSON.stringify(d.rpm) : '',
// Wind speed in knots (metric) or mph (US) — matches playback display
windSpeed_kt: us ? applyConv(d.windSpd, CONV.msToMph) : applyConv(d.windSpd, CONV.msToKt),
windDir_deg: d.windDir ?? '',
temp_c: us ? applyConv(d.temp, CONV.cToF) : (d.temp ?? ''),
humidity_pct: d.humid ?? ''
};
if (includeFm) {
// Flight Master / AgDisp extras — always raw metric, no unit conversion.
row.sprayHeight_m = d.sprayHeight ?? '';
row.driftX_m = d.driftX ?? '';
row.driftY_m = d.driftY ?? '';
row.depositX_m = d.depositX ?? '';
row.depositY_m = d.depositY ?? '';
row.radarAlt_m = d.radarAlt ?? '';
row.laserAlt_m = getLaserAlt(d);
}
const cols = getCsvColumns(units, includeFm);
return cols.map(c => escapeCsv(row[c.key])).join(',') + '\n';
}
// ─── Async generation ─────────────────────────────────────────────────────────
/**
 * Generate the export file for an ExportJob (runs async, fire-and-forget).
 *
 * Transitions: PENDING → PROCESSING → READY (filePath + expiresAt set)
 * or → ERROR (errorMsg set). Never throws: failures are persisted on the
 * ExportJob and logged. Writes export_<exportJobId>.<format> into TEMP_DIR.
 *
 * @param {ObjectId} exportJobId - id of the ExportJob record to process.
 */
async function generateExport(exportJobId) {
const exportJob = await ExportJob.findById(exportJobId);
// Job may have been deleted between scheduling and execution — nothing to do.
if (!exportJob) return;
try {
exportJob.status = ExportJobStatus.PROCESSING;
await exportJob.save();
// Job-level header columns repeated on every CSV row (denormalized for BI import).
const job = await Job.findById(exportJob.jobId, 'name orderNumber client')
.populate('client', '_id name')
.lean();
const jobHeader = {
jobId: exportJob.jobId,
orderNumber: job?.orderNumber ?? '',
jobName: job?.name ?? '',
clientId: job?.client?._id?.toString() ?? '',
clientName: job?.client?.name ?? ''
};
// All non-deleted sessions (Apps) for the job, and their data files grouped by session.
const apps = await App.find({ jobId: exportJob.jobId, markedDelete: { $ne: true } }).lean();
const appFiles = await AppFile.find(
{ appId: { $in: apps.map(a => a._id) }, markedDelete: { $ne: true } }
).lean();
const filesByAppId = {};
for (const f of appFiles) {
const key = f.appId.toString();
if (!filesByAppId[key]) filesByAppId[key] = [];
filesByAppId[key].push(f);
}
const interval = exportJob.interval;
const includeFm = !!exportJob.fm;
// Output goes to TEMP_DIR; the startup sweep removes it once the job expires.
const outPath = path.join(env.TEMP_DIR, `export_${exportJobId}.${exportJob.format}`);
const writeStream = fs.createWriteStream(outPath);
const units = exportJob.units || 'metric';
if (exportJob.format === 'csv') {
// Write header row (unit-aware column names)
const cols = getCsvColumns(units, includeFm);
writeStream.write(cols.map(c => c.header || c.key).join(',') + '\n');
for (const app of apps) {
const files = filesByAppId[app._id.toString()] || [];
for (const appFile of files) {
const sessionMeta = {
appId: app._id,
fileName: app.fileName,
operator: appFile.meta?.operator,
meta: appFile.meta,
appStartDateTime: app.startDateTime
};
// Cursor keeps memory flat regardless of trace size; sprayStat=3 excluded
// (segment START marker, not application data — same filter as geojson below).
const cursor = AppDetail.find(
{ fileId: appFile._id, sprayStat: { $ne: 3 } },
null,
{ sort: { _id: 1 }, lean: true }
).cursor();
let prevGpsTime = null;
for await (const record of cursor) {
// interval = GPS point thinning: skip records closer than `interval` seconds.
if (interval) {
if (prevGpsTime !== null && (record.gpsTime - prevGpsTime) < interval) continue;
prevGpsTime = record.gpsTime;
}
writeStream.write(recordToRow(record, sessionMeta, jobHeader, units, includeFm));
}
}
}
} else if (exportJob.format === 'geojson') {
// GeoJSON FeatureCollection — one Feature per GPS point.
// sprayStat=3 excluded: it is a spray segment START marker, not application data.
// Features are streamed with manual comma separation to avoid buffering the collection.
writeStream.write('{"type":"FeatureCollection","features":[\n');
let first = true;
for (const app of apps) {
const files = filesByAppId[app._id.toString()] || [];
for (const appFile of files) {
const cursor = AppDetail.find(
{ fileId: appFile._id, sprayStat: { $ne: 3 } },
null,
{ sort: { _id: 1 }, lean: true }
).cursor();
let prevGpsTime = null;
for await (const d of cursor) {
if (interval) {
if (prevGpsTime !== null && (d.gpsTime - prevGpsTime) < interval) continue;
prevGpsTime = d.gpsTime;
}
// Points without valid coordinates cannot be represented in GeoJSON — skip.
if (!utils.isNumber(d.lon) || !utils.isNumber(d.lat)) continue;
const feature = {
type: 'Feature',
geometry: { type: 'Point', coordinates: [d.lon, d.lat, d.alt ?? 0] },
properties: {
jobId: exportJob.jobId, sessionId: String(app._id), fileName: app.fileName,
timeUtc: toRecordTimeUtc(d.gpsTime, app.startDateTime) || null,
sprayStat: d.sprayStat, grSpeed: d.grSpeed
}
};
writeStream.write((first ? '' : ',\n') + JSON.stringify(feature));
first = false;
}
}
}
writeStream.write('\n]}');
}
// Flush and close before marking READY so the download endpoint never sees a partial file.
await new Promise((resolve, reject) => {
writeStream.end();
writeStream.on('finish', resolve);
writeStream.on('error', reject);
});
const expiresAt = new Date(Date.now() + EXPORT_TTL_HOURS * 3600 * 1000);
exportJob.status = ExportJobStatus.READY;
exportJob.filePath = outPath;
exportJob.expiresAt = expiresAt;
await exportJob.save();
} catch (err) {
// Persist the failure so pollers see status=error instead of hanging on pending.
exportJob.status = ExportJobStatus.ERROR;
exportJob.errorMsg = err.message;
await exportJob.save();
console.error('[export] generation failed', err);
}
}
// ─── Route handlers ───────────────────────────────────────────────────────────
/**
 * POST /api/v1/jobs/:jobId/export
 * Body: { format: 'csv' | 'geojson', interval?: number, units?: string, fm?: boolean }
 *
 * Creates (or reuses) an ExportJob and kicks off async generation.
 * Responses:
 * - 202 { exportId, status } for a new or still in-flight export,
 * - 200 with downloadUrl when a matching READY export is reused,
 * - 400 for an invalid format.
 * Throws AppParamError/AppAuthError for bad jobId or ownership mismatch.
 */
async function triggerExport(req, res) {
const jobId = parseInt(req.params.jobId, 10);
if (!isFinite(jobId)) AppParamError.throw('invalid jobId');
// Ownership check — throws if the job is missing or not the caller's.
await ownerJob(jobId, req.uid);
const format = req.body?.format;
if (!['csv', 'geojson'].includes(format)) {
return res.status(HttpStatus.BAD_REQUEST).json({ error: 'format must be csv or geojson' });
}
const interval = parseInterval(req.body?.interval);
const rawUnits = req.body?.units;
// Anything other than an explicit 'us' request falls back to metric.
const units = rawUnits === ExportUnits.US ? ExportUnits.US : ExportUnits.METRIC;
const fm = req.body?.fm === true; // opt-in: include Flight Master / AgDisp fields
// Deduplication: reuse an existing export for the same params within the dedup window.
// - ready + not yet expired → can be re-downloaded immediately
// - pending/processing + created within dedup window → generation already in flight
const dedupSince = new Date(Date.now() - EXPORT_DEDUP_MINS * 60 * 1000);
const existing = await ExportJob.findOne({
owner: ObjectId(req.uid),
jobId,
format,
interval: interval ?? null,
units,
fm: fm || false,
$or: [
{ status: ExportJobStatus.READY, expiresAt: { $gt: new Date() } },
{ status: { $in: [ExportJobStatus.PENDING, ExportJobStatus.PROCESSING] }, createdAt: { $gte: dedupSince } }
]
}).sort({ createdAt: -1 }).lean();
if (existing) {
// 200 when immediately downloadable, 202 while still generating.
const statusCode = existing.status === ExportJobStatus.READY ? HttpStatus.OK : HttpStatus.ACCEPTED;
const payload = {
exportId: existing._id,
status: existing.status,
format: existing.format,
units: existing.units,
createdAt: existing.createdAt,
reused: true
};
if (existing.status === ExportJobStatus.READY) payload.downloadUrl = `/api/v1/exports/${existing._id}/download`;
return res.status(statusCode).json(payload);
}
const exportJob = await ExportJob.create({
owner: ObjectId(req.uid),
jobId,
format,
interval,
units,
fm,
status: ExportJobStatus.PENDING
});
// Kick off async generation — do not await
setImmediate(() => generateExport(exportJob._id));
res.status(HttpStatus.ACCEPTED).json({
exportId: exportJob._id,
status: exportJob.status,
format: exportJob.format,
units: exportJob.units,
createdAt: exportJob.createdAt
});
}
/**
 * GET /api/v1/exports/:exportId
 * Poll an export job's status (scoped to the authenticated owner).
 * Responds 404 when the export doesn't exist or belongs to another user;
 * once READY, the payload carries downloadUrl for the FE to stream the file.
 */
async function getExportStatus(req, res) {
  const { exportId } = req.params;
  if (!ObjectId.isValid(exportId)) AppParamError.throw('invalid exportId');
  const exportJob = await ExportJob.findOne({
    _id: ObjectId(exportId),
    owner: ObjectId(req.uid)
  }).lean();
  if (!exportJob) {
    return res.status(HttpStatus.NOT_FOUND).json({ error: Errors.NOT_FOUND });
  }
  const payload = {
    exportId: exportJob._id,
    status: exportJob.status,
    format: exportJob.format,
    units: exportJob.units,
    createdAt: exportJob.createdAt,
    expiresAt: exportJob.expiresAt ?? null,
    error: exportJob.errorMsg ?? null
  };
  const isReady = exportJob.status === ExportJobStatus.READY;
  if (isReady) {
    // Provide a download URL — the frontend calls this to stream the file
    payload.downloadUrl = `/api/v1/exports/${exportId}/download`;
  }
  res.json(payload);
}
/**
 * GET /api/v1/exports/:exportId/download
 * Streams the generated export file to the caller (owner-scoped, READY only).
 * Responds 404 when the export is missing, not owned, not ready, or has no file.
 * File cleanup is handled by the TTL-based startup sweep, not here.
 */
async function downloadExport(req, res) {
  const exportId = req.params.exportId;
  if (!ObjectId.isValid(exportId)) AppParamError.throw('invalid exportId');
  const exportJob = await ExportJob.findOne({
    _id: ObjectId(exportId),
    owner: ObjectId(req.uid),
    status: ExportJobStatus.READY
  }).lean();
  if (!exportJob || !exportJob.filePath) {
    return res.status(HttpStatus.NOT_FOUND).json({ error: Errors.NOT_FOUND });
  }
  const ext = exportJob.format === 'geojson' ? 'geojson' : 'csv';
  const contentType = exportJob.format === 'geojson' ? 'application/geo+json' : 'text/csv';
  const filename = `export_job${exportJob.jobId}_${exportJob._id}.${ext}`;
  res.setHeader('Content-Type', contentType);
  // BUG FIX: header previously contained the literal string "$(unknown)"
  // (shell-style interpolation typo) — interpolate the computed filename so
  // browsers save the file under a meaningful name.
  res.setHeader('Content-Disposition', `attachment; filename="${filename}"`);
  const readStream = fs.createReadStream(exportJob.filePath);
  readStream.pipe(res);
  readStream.on('error', (err) => {
    // File may have been swept between the status check and this read.
    console.error('[export] stream error', err);
    if (!res.headersSent) res.status(HttpStatus.NOT_FOUND);
    res.end();
  });
}
// Public route handlers for the async export API.
module.exports = { triggerExport, getExportStatus, downloadExport };