# Partner Integration Implementation Guide
## Overview
This guide provides step-by-step implementation instructions for the AgMission partner integration system using a dual-user approach with environment-based configuration.
## System Architecture
### Dual User Model
The system uses two types of user entities:
1. **Partner Organizations**: Companies providing integration services (e.g., SatLoc, AgIDronex)
2. **Partner System Users**: Customer accounts within each partner system
```
AgMission Customer
↓
Partner System User (SatLoc Account)
↓
Partner Organization (SatLoc Company)
↓
External API Integration
```
### Benefits of This Approach
- **Simplified Management**: Partners are User entities, so they get the same authentication and authorization handling as other users
- **Customer Isolation**: Each customer has their own partner system credentials
- **Environment Configuration**: Partner settings managed via environment variables
- **Scalability**: Easy to add new partners and customer accounts
## Implementation Steps
### Step 1: Environment Configuration
Create partner configuration in your `.env` file:
```bash
# Global Partner Settings
PARTNER_SYNC_INTERVAL=300000
PARTNER_HEALTH_CHECK_INTERVAL=60000
PARTNER_MAX_CONCURRENT_JOBS=10
PARTNER_ENCRYPT_CREDENTIALS=true
# SatLoc Configuration
SATLOC_API_ENDPOINT=https://www.satloccloud.com/api/Satloc
SATLOC_API_KEY=your_default_satloc_key
SATLOC_API_SECRET=your_default_satloc_secret
SATLOC_API_TIMEOUT=30000
SATLOC_RETRY_ATTEMPTS=3
SATLOC_RATE_LIMIT=60
```
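The variables above arrive as strings, so the server has to parse them before use. The following is a minimal sketch of one way to do that; `loadPartnerConfig`, `toInt`, and the shape of the returned object are hypothetical names chosen for illustration and are not part of the AgMission codebase. The fallback values simply repeat the sample values from the `.env` listing.

```javascript
// Sketch only: loadPartnerConfig and its return shape are assumptions for illustration.
function toInt(value, fallback) {
  const parsed = Number.parseInt(value, 10);
  return Number.isNaN(parsed) ? fallback : parsed;
}

function loadPartnerConfig(env = process.env) {
  return {
    global: {
      syncInterval: toInt(env.PARTNER_SYNC_INTERVAL, 300000),
      healthCheckInterval: toInt(env.PARTNER_HEALTH_CHECK_INTERVAL, 60000),
      maxConcurrentJobs: toInt(env.PARTNER_MAX_CONCURRENT_JOBS, 10),
      // Environment values are strings, so compare against 'true' explicitly
      encryptCredentials: env.PARTNER_ENCRYPT_CREDENTIALS === 'true'
    },
    satloc: {
      endpoint: env.SATLOC_API_ENDPOINT,
      apiKey: env.SATLOC_API_KEY,
      apiSecret: env.SATLOC_API_SECRET,
      timeout: toInt(env.SATLOC_API_TIMEOUT, 30000),
      retryAttempts: toInt(env.SATLOC_RETRY_ATTEMPTS, 3),
      rateLimit: toInt(env.SATLOC_RATE_LIMIT, 60)
    }
  };
}
```

Passing the environment in as an argument keeps the function easy to test with a plain object instead of the real `process.env`.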
### Step 2: Create Partner Organization
```javascript
// Create SatLoc partner organization
POST /api/partner/createPartner
{
"name": "SatLoc Cloud",
"username": "satloc",
"partnerCode": "SATLOC",
"partnerName": "SatLoc Cloud Services",
"active": true,
"configuration": {
"supportsRealtime": false,
"maxFileSize": 10485760,
"supportedFormats": ["kml", "shp", "geojson"]
}
}
```
### Step 3: Create Partner System Users
For each customer that needs SatLoc integration:
```javascript
// Create customer's SatLoc account
POST /api/partner/createSystemUser
{
"partnerId": "partner_organization_id",
"customerId": "agmission_customer_id",
"name": "Customer SatLoc Account",
"partnerUserId": "customer_satloc_user_id",
"partnerUsername": "customer_username_in_satloc",
"companyId": "customer_satloc_company_id",
"apiKey": "customer_specific_api_key",
"apiSecret": "customer_specific_api_secret",
"active": true
}
```
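Step 1 sets `PARTNER_ENCRYPT_CREDENTIALS=true`, but the guide does not show how `apiKey` and `apiSecret` are protected at rest. One possible approach, sketched here under the assumption that a 32-byte key is available from a secret store, is AES-256-GCM from Node's built-in `crypto` module. The helper names `encryptCredential` and `decryptCredential` and the dot-separated storage format are hypothetical.

```javascript
const { createCipheriv, createDecipheriv, randomBytes } = require('crypto');

// Hypothetical helper: encrypts one credential string with AES-256-GCM.
// `key` must be a 32-byte Buffer; where it comes from is outside this sketch.
function encryptCredential(plainText, key) {
  const iv = randomBytes(12); // fresh 96-bit nonce for every encryption
  const cipher = createCipheriv('aes-256-gcm', key, iv);
  const encrypted = Buffer.concat([cipher.update(plainText, 'utf8'), cipher.final()]);
  const tag = cipher.getAuthTag();
  // Store nonce, auth tag, and ciphertext together as base64 parts
  return [iv, tag, encrypted].map((part) => part.toString('base64')).join('.');
}

// Hypothetical helper: reverses encryptCredential and throws if the data was tampered with.
function decryptCredential(payload, key) {
  const [iv, tag, encrypted] = payload.split('.').map((part) => Buffer.from(part, 'base64'));
  const decipher = createDecipheriv('aes-256-gcm', key, iv);
  decipher.setAuthTag(tag);
  return Buffer.concat([decipher.update(encrypted), decipher.final()]).toString('utf8');
}
```

GCM is used here because it authenticates the ciphertext, so a modified or wrongly keyed value fails loudly on decryption instead of yielding garbage credentials.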
## Implementation Phases
### Phase 1: Foundation Setup (Weeks 1-2)
#### 1.1 Partner Service Interface Implementation
Create the core partner service interface:
```javascript
// File: services/partners/PartnerService.js
class PartnerService {
constructor(config) {
this.config = config;
this.partnerType = config.code;
}
/**
* Get the partner type identifier
* @returns {string} Partner type (e.g., 'satloc', 'dji')
*/
getPartnerType() {
return this.partnerType;
}
/**
* Upload job definition to partner system
* @param {Object} job - Job object from database
* @returns {Promise<{externalJobId: string, metadata?: any}>}
*/
async uploadJobDefinition(job) {
throw new Error('uploadJobDefinition must be implemented by partner service');
}
/**
* Download flight data from partner system
* @param {string} aircraftId - Partner's aircraft identifier
* @param {string} externalJobId - Partner's job identifier
* @returns {Promise<FlightDataFile[]>}
*/
async downloadFlightData(aircraftId, externalJobId) {
throw new Error('downloadFlightData must be implemented by partner service');
}
/**
* Check job status in partner system
* @param {string} externalJobId - Partner's job identifier
* @returns {Promise<JobStatus>}
*/
async syncJobStatus(externalJobId) {
throw new Error('syncJobStatus must be implemented by partner service');
}
/**
* Convert partner data format to internal format
* @param {any} data - Raw partner data
* @param {string} format - Original data format
* @returns {Promise<InternalData[]>}
*/
async convertToInternalFormat(data, format) {
throw new Error('convertToInternalFormat must be implemented by partner service');
}
/**
* Validate partner configuration
* @returns {Promise<boolean>}
*/
async validateConfiguration() {
return true;
}
/**
* Health check for partner service
* @returns {Promise<{status: string, responseTime: number}>}
*/
async healthCheck() {
const start = Date.now();
try {
// Implement partner-specific health check
await this.ping();
return {
status: 'healthy',
responseTime: Date.now() - start
};
} catch (error) {
return {
status: 'unhealthy',
responseTime: Date.now() - start,
error: error.message
};
}
}
/**
* Basic ping to partner API
* @returns {Promise<void>}
*/
async ping() {
// Default implementation - override in specific services
throw new Error('ping must be implemented by partner service');
}
}
module.exports = PartnerService;
```
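Because `healthCheck()` delegates to `ping()`, a subclass only has to override `ping()` to take part in health monitoring. The sketch below shows this with an in-memory partner that could be useful in unit tests. `MockPartnerService` is a hypothetical class, and the base class is a trimmed stand-in for `services/partners/PartnerService.js` so that the block runs by itself.

```javascript
// Stand-in for services/partners/PartnerService.js, reduced to what this sketch needs
class PartnerService {
  constructor(config) {
    this.config = config;
    this.partnerType = config.code;
  }
  getPartnerType() {
    return this.partnerType;
  }
  async ping() {
    throw new Error('ping must be implemented by partner service');
  }
  async healthCheck() {
    const start = Date.now();
    try {
      await this.ping();
      return { status: 'healthy', responseTime: Date.now() - start };
    } catch (error) {
      return { status: 'unhealthy', responseTime: Date.now() - start, error: error.message };
    }
  }
}

// Hypothetical in-memory partner for tests; no network access involved
class MockPartnerService extends PartnerService {
  constructor(config, { reachable = true } = {}) {
    super(config);
    this.reachable = reachable;
    this.uploads = [];
  }
  async ping() {
    if (!this.reachable) throw new Error('mock partner unreachable');
  }
  async uploadJobDefinition(job) {
    this.uploads.push(job);
    return { externalJobId: `mock-${job._id}` };
  }
}
```

A test can then construct `new MockPartnerService({ code: 'mock' }, { reachable: false })` to exercise the unhealthy path without touching a real partner API.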
#### 1.2 Partner Registry Implementation
```javascript
// File: services/partners/PartnerRegistry.js
const debug = require('debug')('agm:partner-registry');
class PartnerRegistry {
constructor() {
this.partners = new Map();
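// Milliseconds; honors PARTNER_HEALTH_CHECK_INTERVAL from Step 1, else 5 minutes
this.healthCheckInterval = Number(process.env.PARTNER_HEALTH_CHECK_INTERVAL) || 300000;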
this.healthCheckTimer = null;
}
/**
* Register a partner service
* @param {string} partnerType - Partner type identifier
* @param {PartnerService} service - Partner service instance
*/
register(partnerType, service) {
if (!service || typeof service.getPartnerType !== 'function') {
throw new Error(`Invalid partner service for type: ${partnerType}`);
}
this.partners.set(partnerType, service);
debug(`Registered partner service: ${partnerType}`);
}
/**
* Get a partner service by type
* @param {string} partnerType - Partner type identifier
* @returns {PartnerService|undefined}
*/
get(partnerType) {
return this.partners.get(partnerType);
}
/**
* Get all registered partner services
* @returns {PartnerService[]}
*/
getAll() {
return Array.from(this.partners.values());
}
/**
* Check if a partner type is supported
* @param {string} partnerType - Partner type identifier
* @returns {boolean}
*/
isPartnerSupported(partnerType) {
return this.partners.has(partnerType);
}
/**
* Get list of supported partner types
* @returns {string[]}
*/
getSupportedPartners() {
return Array.from(this.partners.keys());
}
/**
* Start health monitoring for all partners
*/
startHealthMonitoring() {
if (this.healthCheckTimer) {
clearInterval(this.healthCheckTimer);
}
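this.healthCheckTimer = setInterval(() => {
// Catch here so a failing run never becomes an unhandled promise rejection
this.performHealthChecks().catch((error) => {
debug(`Health check run failed: ${error.message}`);
});
}, this.healthCheckInterval);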
debug('Started partner health monitoring');
}
/**
* Stop health monitoring
*/
stopHealthMonitoring() {
if (this.healthCheckTimer) {
clearInterval(this.healthCheckTimer);
this.healthCheckTimer = null;
}
debug('Stopped partner health monitoring');
}
/**
* Perform health checks on all partners
*/
async performHealthChecks() {
const healthResults = [];
for (const [partnerType, service] of this.partners) {
try {
const result = await service.healthCheck();
healthResults.push({
partnerType,
...result,
timestamp: new Date()
});
} catch (error) {
healthResults.push({
partnerType,
status: 'error',
error: error.message,
timestamp: new Date()
});
}
}
// Store health results or emit events
this.emit('healthCheck', healthResults);
return healthResults;
}
/**
* Validate all partner configurations
* @returns {Promise<{valid: boolean, errors: string[]}>}
*/
async validateAllConfigurations() {
const errors = [];
for (const [partnerType, service] of this.partners) {
try {
const isValid = await service.validateConfiguration();
if (!isValid) {
errors.push(`Invalid configuration for partner: ${partnerType}`);
}
} catch (error) {
errors.push(`Configuration validation failed for ${partnerType}: ${error.message}`);
}
}
return {
valid: errors.length === 0,
errors
};
}
}
// Make registry an event emitter for health monitoring
const EventEmitter = require('events');
Object.setPrototypeOf(PartnerRegistry.prototype, EventEmitter.prototype);
module.exports = PartnerRegistry;
```
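`performHealthChecks()` awaits each partner in turn, so one slow partner delays the results for all the others. If that becomes a problem, the checks could run concurrently instead. The following is a sketch of that variant using `Promise.allSettled`; `checkAllPartners` is a hypothetical helper that takes the same kind of `Map` the registry keeps in `this.partners`.

```javascript
// Hypothetical helper: runs every partner health check at the same time.
// `partners` is a Map of partnerType -> service, like PartnerRegistry.partners.
async function checkAllPartners(partners) {
  const entries = Array.from(partners.entries());
  const settled = await Promise.allSettled(
    entries.map(([, service]) => service.healthCheck())
  );
  return settled.map((outcome, index) => {
    const partnerType = entries[index][0];
    const timestamp = new Date();
    if (outcome.status === 'fulfilled') {
      return { partnerType, ...outcome.value, timestamp };
    }
    // A rejected check is reported in the same shape the registry uses for errors
    const reason = outcome.reason;
    const message = reason && reason.message ? reason.message : String(reason);
    return { partnerType, status: 'error', error: message, timestamp };
  });
}
```

With this shape the total wait is roughly that of the slowest partner rather than the sum of all of them, and a rejection from one service does not prevent the others from reporting.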
#### 1.3 SatLoc Service Implementation
```javascript
// File: services/partners/SatlocService.js
const PartnerService = require('./PartnerService');
const axios = require('axios');
const FormData = require('form-data');
const debug = require('debug')('agm:satloc-service');
class SatlocService extends PartnerService {
constructor(config) {
super(config);
// Reads SATLOC_API_ENDPOINT, the variable defined in Step 1
this.baseUrl = process.env.SATLOC_API_ENDPOINT || 'https://www.satloccloud.com/api/Satloc';
this.credentials = null;
this.apiClient = axios.create({
baseURL: this.baseUrl,
timeout: config.apiConfig?.timeout?.request || 30000,
headers: {
'Content-Type': 'application/json',
'User-Agent': 'AgMission-Integration/1.0'
}
});
// Add request/response interceptors for logging and error handling
this.setupInterceptors();
}
setupInterceptors() {
this.apiClient.interceptors.request.use(
(config) => {
debug(`Satloc API Request: ${config.method?.toUpperCase()} ${config.url}`);
return config;
},
(error) => {
debug(`Satloc API Request Error: ${error.message}`);
return Promise.reject(error);
}
);
this.apiClient.interceptors.response.use(
(response) => {
debug(`Satloc API Response: ${response.status} ${response.config.url}`);
return response;
},
(error) => {
debug(`Satloc API Error: ${error.response?.status} ${error.message}`);
return Promise.reject(this.handleApiError(error));
}
);
}
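handleApiError(error) {
if (error.response) {
const satlocError = error.response.data;
// Only use the Satloc envelope when it actually carries an error message;
// otherwise a plain-text or HTML body would produce "Satloc API Error: undefined"
if (satlocError && satlocError.IsSuccess === false && satlocError.ErrorMessage) {
return new Error(`Satloc API Error: ${satlocError.ErrorMessage}`);
}
return new Error(`Satloc API Error: ${error.response.status} - ${error.response.data?.message || error.message}`);
} else if (error.request) {
// The request was sent but no response arrived (timeout, DNS failure, connection reset)
return new Error(`Satloc API Timeout: No response received (${error.message})`);
} else {
return new Error(`Satloc API Error: ${error.message}`);
}
}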
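async authenticate() {
// Assumption: the partner system user's login is supplied as config.credentials.
// Adapt this lookup to wherever your deployment stores those values.
const { username, password } = this.config.credentials || {};
if (!username || !password) {
throw new Error('Satloc authentication failed: missing credentials');
}
try {
const response = await this.apiClient.get('/AuthenticateAPIUser', {
params: {
userLogin: username,
password: password
}
});
if (response.data.IsSuccess) {
this.credentials = response.data.Result;
debug('Satloc authentication successful');
return {
success: true,
data: this.credentials
};
} else {
throw new Error(response.data.ErrorMessage);
}
} catch (error) {
debug(`Satloc authentication failed: ${error.message}`);
// Rethrow so callers never continue with this.credentials still null
throw new Error(`Satloc authentication failed: ${error.message}`);
}
}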
async uploadJobDefinition(job) {
if (!this.credentials) {
await this.authenticate();
}
try {
// Convert AgMission job format to Satloc format
const satlocJobData = this.convertJobToSatlocFormat(job);
const formData = new FormData();
formData.append('userId', this.credentials.UserId);
formData.append('aircraftId', job.aircraftId); // Should be mapped to Satloc aircraft
formData.append('jobData', satlocJobData.fileBuffer, satlocJobData.filename);
formData.append('metadata', JSON.stringify(satlocJobData.metadata));
const response = await axios.post(`${this.baseUrl}/UploadJobData`, formData, {
headers: {
...formData.getHeaders(),
'Authorization': `Bearer ${this.credentials.Token}`
},
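// Same timeout source as this.apiClient; this.config.timeout may be undefined
timeout: this.config.apiConfig?.timeout?.request || 30000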
});
if (response.data.IsSuccess) {
debug(`Job ${job._id} uploaded to Satloc successfully`);
return {
externalJobId: response.data.Result.JobId,
metadata: {
externalJobId: response.data.Result.JobId,
uploadTime: new Date(),
status: response.data.Result.Status,
estimatedCompletion: response.data.Result.EstimatedCompletion
}
};
} else {
throw new Error(response.data.ErrorMessage);
}
} catch (error) {
debug(`Failed to upload job ${job._id} to Satloc: ${error.message}`);
throw error;
}
}
async downloadFlightData(aircraftId, logId) {
if (!this.credentials) {
await this.authenticate();
}
try {
const response = await this.apiClient.get('/GetAircraftLogData', {
params: {
userId: this.credentials.UserId,
logId: logId
},
responseType: 'stream'
});
// Convert stream to buffer
const chunks = [];
for await (const chunk of response.data) {
chunks.push(chunk);
}
const buffer = Buffer.concat(chunks);
const dataFiles = [{
filename: `satloc_flight_${logId}.dat`,
data: buffer,
format: 'satloc',
timestamp: new Date(),
size: buffer.length,
metadata: {
logId: logId,
aircraftId: aircraftId
}
}];
debug(`Downloaded flight data for log ${logId} from Satloc`);
return dataFiles;
} catch (error) {
debug(`Failed to download flight data for log ${logId}: ${error.message}`);
throw error;
}
}
async syncJobStatus(externalJobId) {
if (!this.credentials) {
await this.authenticate();
}
try {
// Satloc doesn't have direct job status endpoint, so we check via aircraft logs
const aircraftResponse = await this.apiClient.get('/GetAircraftList', {
params: {
userId: this.credentials.UserId,
companyId: this.credentials.CompanyId
}
});
if (aircraftResponse.data.IsSuccess) {
// For now, return in-progress status
// In actual implementation, you would correlate with aircraft logs
return {
status: 'in_progress',
progress: 50,
lastUpdate: new Date(),
metadata: {
externalJobId: externalJobId
}
};
} else {
throw new Error(aircraftResponse.data.ErrorMessage);
}
} catch (error) {
debug(`Failed to sync job status for ${externalJobId}: ${error.message}`);
throw error;
}
}
async convertToInternalFormat(data, format) {
try {
const internalData = [];
if (format === 'satloc') {
// Parse Satloc binary format and convert to internal format
const parsed = this.parseSatlocData(data);
for (const record of parsed) {
internalData.push({
lat: record.latitude,
lon: record.longitude,
sprayStat: record.sprayStatus,
gpsTime: record.timestamp,
alt: record.altitude,
grSpeed: record.groundSpeed,
partnerSpecific: {
satlocRecord: record
}
});
}
}
debug(`Converted ${internalData.length} records from Satloc format`);
return internalData;
} catch (error) {
debug(`Failed to convert Satloc data to internal format: ${error.message}`);
throw error;
}
}
async ping() {
try {
const response = await this.apiClient.get('/IsAlive');
if (response.data.IsSuccess) {
return {
status: 'healthy',
message: response.data.Result
};
} else {
throw new Error(response.data.ErrorMessage);
}
} catch (error) {
throw new Error(`Satloc ping failed: ${error.message}`);
}
}
async getAircraft() {
if (!this.credentials) {
await this.authenticate();
}
try {
const response = await this.apiClient.get('/GetAircraftList', {
params: {
userId: this.credentials.UserId,
companyId: this.credentials.CompanyId
}
});
if (response.data.IsSuccess) {
return response.data.Result.map(aircraft => ({
id: aircraft.AircraftId,
name: aircraft.AircraftName,
type: aircraft.AircraftType,
status: aircraft.Status,
lastSeen: aircraft.LastSeen
}));
} else {
throw new Error(response.data.ErrorMessage);
}
} catch (error) {
debug(`Failed to get aircraft list: ${error.message}`);
throw error;
}
}
// Helper methods
convertJobToSatlocFormat(job) {
// Convert AgMission job to Satloc format
const satlocJob = {
name: job.name,
areas: job.sprayAreas?.map(area => ({
name: area.properties?.name || 'Spray Area',
coordinates: area.geometry?.coordinates || [],
applicationRate: area.properties?.appRate || job.appRate
})) || [],
aircraft: {
swathWidth: job.swathWidth,
measurementUnit: job.measureUnit ? 'imperial' : 'metric'
},
schedule: {
startDate: job.startDate,
endDate: job.endDate
}
};
// Create file buffer from job data
const jobJson = JSON.stringify(satlocJob, null, 2);
const fileBuffer = Buffer.from(jobJson, 'utf-8');
return {
fileBuffer: fileBuffer,
filename: `agm_job_${job._id}.json`,
metadata: {
jobId: job._id.toString(),
jobName: job.name,
missionType: 'survey',
plannedDate: job.startDate,
priority: 'normal',
estimatedDuration: 3600
}
};
}
parseSatlocData(data) {
// Implement Satloc binary format parsing
// This is a placeholder - actual implementation would depend on Satloc's data format
const records = [];
try {
// Basic parsing logic for Satloc data format
// In real implementation, this would parse the actual binary format
const textData = data.toString('utf-8');
const lines = textData.split('\n');
for (const line of lines) {
if (line.trim() && !line.startsWith('#')) {
const parts = line.split(',');
if (parts.length >= 6) {
records.push({
timestamp: new Date(parts[0]),
latitude: parseFloat(parts[1]),
longitude: parseFloat(parts[2]),
altitude: parseFloat(parts[3]),
groundSpeed: parseFloat(parts[4]),
sprayStatus: parseInt(parts[5], 10) || 0
});
}
}
}
} catch (error) {
debug(`Error parsing Satloc data: ${error.message}`);
}
return records;
}
}
module.exports = SatlocService;
```
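Each method in `SatlocService` checks `if (!this.credentials)` and then awaits `authenticate()`. When several calls start before the first login has finished, each of them may trigger its own login request. One way to avoid that is to share a single in-flight promise between callers. The sketch below illustrates the idea in isolation; `createAuthGuard` is a hypothetical helper, and `authenticate` stands for any function that returns a promise of credentials.

```javascript
// Hypothetical helper: lets concurrent callers share one authentication attempt.
function createAuthGuard(authenticate) {
  let credentials = null;
  let pending = null;
  return {
    async get() {
      if (credentials) return credentials;
      if (!pending) {
        // First caller starts the login; later callers reuse the same promise
        pending = Promise.resolve()
          .then(() => authenticate())
          .then((result) => {
            credentials = result;
            return result;
          })
          .finally(() => {
            pending = null; // allow a fresh attempt after success or failure
          });
      }
      return pending;
    },
    // Call this after an authorization failure to force a new login next time
    invalidate() {
      credentials = null;
    }
  };
}
```

A service could hold one guard per partner system user and call `guard.get()` wherever it currently checks `this.credentials`.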
#### 1.4 Enhanced JobAssign Model
Update the existing JobAssign model to support partner integration:
```javascript
// File: model/job_assign.js (Enhanced)
const mongoose = require('mongoose'),
Schema = mongoose.Schema,
{ AssignStatus } = require('../helpers/constants');
const syncStateSchema = new Schema({
status: {
type: String,
// 'idle' and 'polling' are used by the dataPolling state
enum: ['pending', 'syncing', 'synced', 'failed', 'idle', 'polling'],
default: 'pending'
},
attempts: { type: Number, default: 0, min: 0 },
lastAttempt: { type: Date },
lastSuccess: { type: Date },
lastDataCheck: { type: Date }, // set when a poll finds no new data
error: { type: String },
errorCode: { type: String },
nextRetry: { type: Date }
}, { _id: false });
const retryPolicySchema = new Schema({
maxAttempts: { type: Number, default: 5, min: 1, max: 10 },
baseDelay: { type: Number, default: 5000, min: 1000 },
maxDelay: { type: Number, default: 300000 },
backoffMultiplier: { type: Number, default: 2, min: 1, max: 5 },
jitter: { type: Boolean, default: true }
}, { _id: false });
const schema = new Schema({
// Existing fields
job: { type: Number, ref: 'Job', required: true },
user: { type: Schema.Types.ObjectId, ref: 'User', required: true },
status: {
type: Number,
enum: {
values: Object.values(AssignStatus)
},
// default belongs on the field itself, not inside the enum options
default: AssignStatus.NEW
},
date: { type: Date, required: false, default: Date.now },
// New partner integration fields
partnerType: {
type: String,
enum: ['internal', 'satloc', 'dji', 'parrot', 'other'],
default: 'internal',
index: true
},
externalJobId: {
type: String,
sparse: true,
index: true
},
partnerMetadata: {
type: Schema.Types.Mixed,
default: null
},
// Sync state tracking
syncState: {
jobUpload: {
type: syncStateSchema,
default: function() {
return {
status: this.partnerType === 'internal' ? 'synced' : 'pending',
attempts: 0
};
}
},
dataPolling: {
type: syncStateSchema,
default: () => ({
status: 'idle',
attempts: 0
})
}
},
// Partner-specific configuration
partnerConfig: {
aircraftId: { type: String },
priority: {
type: String,
enum: ['low', 'normal', 'high'],
default: 'normal'
},
timeout: { type: Number, default: 30000 },
pollInterval: { type: Number, default: 60000 }, // read by the data-poll processor
retryPolicy: {
type: retryPolicySchema,
default: () => ({})
}
},
// Performance metrics
metrics: {
syncDuration: { type: Number },
dataSize: { type: Number },
conversionTime: { type: Number },
uploadTime: { type: Number },
downloadTime: { type: Number }
}
}, {
timestamps: true,
toJSON: { virtuals: true },
toObject: { virtuals: true }
});
// Indexes
schema.index({ user: 1, status: 1 });
schema.index({ job: 1, partnerType: 1 });
schema.index({ partnerType: 1, 'syncState.jobUpload.status': 1 });
schema.index({ partnerType: 1, 'syncState.dataPolling.status': 1 });
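// A sparse compound index still indexes documents that have only partnerType, so
// internal assignments without an externalJobId would collide. Use a partial index instead.
schema.index(
{ externalJobId: 1, partnerType: 1 },
{ unique: true, partialFilterExpression: { externalJobId: { $type: 'string' } } }
);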
// Virtual fields
schema.virtual('isPartnerAssignment').get(function() {
return this.partnerType !== 'internal';
});
schema.virtual('needsSync').get(function() {
return this.isPartnerAssignment &&
this.syncState.jobUpload.status === 'pending';
});
schema.virtual('needsPolling').get(function() {
return this.isPartnerAssignment &&
this.syncState.jobUpload.status === 'synced' &&
this.syncState.dataPolling.status === 'idle' &&
this.status < 2;
});
// Methods
schema.methods.updateSyncState = function(operation, update) {
if (!this.syncState[operation]) {
this.syncState[operation] = {};
}
Object.assign(this.syncState[operation], update);
this.markModified(`syncState.${operation}`);
};
schema.methods.incrementRetryCount = function(operation) {
this.syncState[operation].attempts = (this.syncState[operation].attempts || 0) + 1;
this.syncState[operation].lastAttempt = new Date();
this.markModified(`syncState.${operation}`);
};
schema.methods.calculateNextRetry = function(operation) {
const policy = this.partnerConfig.retryPolicy;
const attempts = this.syncState[operation].attempts || 0;
let delay = policy.baseDelay * Math.pow(policy.backoffMultiplier, attempts);
delay = Math.min(delay, policy.maxDelay);
if (policy.jitter) {
delay = delay * (0.5 + Math.random() * 0.5);
}
return new Date(Date.now() + delay);
};
module.exports = mongoose.model('JobAssign', schema);
```
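To make the retry schedule concrete, the sketch below reproduces the arithmetic of `calculateNextRetry` as standalone functions and evaluates it with the default policy. `backoffDelay` and `withJitter` are hypothetical names, and the random source is injectable only so the example is deterministic.

```javascript
// Same formula as calculateNextRetry, without the Mongoose document around it
function backoffDelay(attempts, policy) {
  const raw = policy.baseDelay * Math.pow(policy.backoffMultiplier, attempts);
  return Math.min(raw, policy.maxDelay);
}

// Jitter scales the delay into the range [50%, 100%) of its nominal value
function withJitter(delay, random = Math.random) {
  return delay * (0.5 + random() * 0.5);
}

const defaultPolicy = { baseDelay: 5000, maxDelay: 300000, backoffMultiplier: 2 };

// Nominal delays in milliseconds after 1..7 recorded attempts
const schedule = [1, 2, 3, 4, 5, 6, 7].map((n) => backoffDelay(n, defaultPolicy));
console.log(schedule);
// [10000, 20000, 40000, 80000, 160000, 300000, 300000]
```

When the attempt counter is incremented before the failure is handled, as the queue processors in Phase 2 do, the first retry sees `attempts = 1` and therefore waits a nominal 10 seconds rather than the 5 second base delay. With jitter enabled the actual wait falls between half of the nominal value and the nominal value.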
### Phase 2: Queue System Implementation (Weeks 3-4)
#### 2.1 Partner Sync Queue System
```javascript
// File: services/queues/PartnerSyncQueue.js
const Bull = require('bull');
const Redis = require('ioredis');
const JobAssign = require('../../model/job_assign');
const debug = require('debug')('agm:partner-sync-queue');
class PartnerSyncQueue {
constructor(partnerRegistry, redisConfig) {
this.partnerRegistry = partnerRegistry;
this.redis = new Redis(redisConfig);
// Create separate queues for different operations
this.jobSyncQueue = new Bull('partner-job-sync', {
redis: redisConfig,
defaultJobOptions: {
removeOnComplete: 50,
removeOnFail: 100,
attempts: 1 // We handle retries manually
}
});
this.dataPollQueue = new Bull('partner-data-poll', {
redis: redisConfig,
defaultJobOptions: {
removeOnComplete: 20,
removeOnFail: 50,
attempts: 1
}
});
this.setupProcessors();
this.setupEventHandlers();
}
setupProcessors() {
// Job sync processor
this.jobSyncQueue.process('sync-job', 5, async (job) => {
return await this.processSyncJob(job.data);
});
// Data polling processor
this.dataPollQueue.process('poll-data', 10, async (job) => {
return await this.processDataPoll(job.data);
});
}
setupEventHandlers() {
this.jobSyncQueue.on('completed', (job, result) => {
debug(`Job sync completed: ${job.id}`, result);
});
this.jobSyncQueue.on('failed', (job, err) => {
debug(`Job sync failed: ${job.id}`, err.message);
});
this.dataPollQueue.on('completed', (job, result) => {
debug(`Data poll completed: ${job.id}`, result);
});
this.dataPollQueue.on('failed', (job, err) => {
debug(`Data poll failed: ${job.id}`, err.message);
});
}
async queueJobSync(assignmentId, options = {}) {
const delay = options.delay || 0;
const priority = this.getPriority(options.priority || 'normal');
const jobOptions = {
delay,
priority,
jobId: `sync-${assignmentId}-${Date.now()}`,
...options.jobOptions
};
const job = await this.jobSyncQueue.add('sync-job',
{ assignmentId, options },
jobOptions
);
debug(`Queued job sync for assignment ${assignmentId}, job ID: ${job.id}`);
return job;
}
async queueDataPoll(assignmentId, options = {}) {
const delay = options.delay || 0;
const priority = this.getPriority(options.priority || 'normal');
const jobOptions = {
delay,
priority,
jobId: `poll-${assignmentId}-${Date.now()}`,
...options.jobOptions
};
const job = await this.dataPollQueue.add('poll-data',
{ assignmentId, options },
jobOptions
);
debug(`Queued data poll for assignment ${assignmentId}, job ID: ${job.id}`);
return job;
}
async processSyncJob(data) {
const { assignmentId, options } = data;
const startTime = Date.now();
try {
const assignment = await JobAssign.findById(assignmentId)
.populate('job')
.populate('user');
if (!assignment) {
throw new Error(`Assignment not found: ${assignmentId}`);
}
// Update sync state to syncing
assignment.updateSyncState('jobUpload', {
status: 'syncing',
lastAttempt: new Date()
});
assignment.incrementRetryCount('jobUpload');
await assignment.save();
// Get partner service
const partnerService = this.partnerRegistry.get(assignment.partnerType);
if (!partnerService) {
throw new Error(`Partner service not found: ${assignment.partnerType}`);
}
// Upload job to partner
const result = await partnerService.uploadJobDefinition(assignment.job);
// Update assignment with success
assignment.externalJobId = result.externalJobId;
assignment.partnerMetadata = result.metadata;
assignment.updateSyncState('jobUpload', {
status: 'synced',
lastSuccess: new Date(),
error: null,
errorCode: null
});
assignment.metrics = assignment.metrics || {};
assignment.metrics.syncDuration = Date.now() - startTime;
assignment.metrics.uploadTime = Date.now() - startTime;
await assignment.save();
// Schedule data polling
await this.queueDataPoll(assignmentId, {
delay: 30000, // Poll after 30 seconds
priority: assignment.partnerConfig.priority
});
return {
success: true,
externalJobId: result.externalJobId,
syncDuration: Date.now() - startTime
};
} catch (error) {
// Handle sync failure
await this.handleSyncFailure(assignmentId, 'jobUpload', error);
throw error;
}
}
async processDataPoll(data) {
const { assignmentId, options } = data;
const startTime = Date.now();
try {
const assignment = await JobAssign.findById(assignmentId)
.populate('job')
.populate('user');
if (!assignment) {
throw new Error(`Assignment not found: ${assignmentId}`);
}
if (!assignment.externalJobId) {
throw new Error(`No external job ID for assignment: ${assignmentId}`);
}
// Update polling state
assignment.updateSyncState('dataPolling', {
status: 'polling',
lastAttempt: new Date()
});
assignment.incrementRetryCount('dataPolling');
await assignment.save();
// Get partner service
const partnerService = this.partnerRegistry.get(assignment.partnerType);
if (!partnerService) {
throw new Error(`Partner service not found: ${assignment.partnerType}`);
}
// Check for available data
const dataFiles = await partnerService.downloadFlightData(
assignment.partnerConfig.aircraftId || assignment.user._id.toString(),
assignment.externalJobId
);
if (dataFiles && dataFiles.length > 0) {
// Process data files
await this.processDataFiles(assignment, dataFiles);
// Update polling state to synced
assignment.updateSyncState('dataPolling', {
status: 'synced',
lastSuccess: new Date(),
error: null,
errorCode: null
});
assignment.metrics = assignment.metrics || {};
assignment.metrics.downloadTime = Date.now() - startTime;
assignment.metrics.dataSize = dataFiles.reduce((sum, file) => sum + file.data.length, 0);
await assignment.save();
return {
success: true,
filesFound: dataFiles.length,
dataSize: assignment.metrics.dataSize
};
} else {
// No data available yet, schedule next poll
assignment.updateSyncState('dataPolling', {
status: 'idle',
lastDataCheck: new Date()
});
await assignment.save();
// Schedule next poll if job is still active
if (assignment.status < 2) {
const pollInterval = assignment.partnerConfig.pollInterval || 60000;
await this.queueDataPoll(assignmentId, {
delay: pollInterval,
priority: assignment.partnerConfig.priority
});
}
return { success: true, filesFound: 0 };
}
} catch (error) {
// Handle polling failure
await this.handleSyncFailure(assignmentId, 'dataPolling', error);
throw error;
}
}
async processDataFiles(assignment, dataFiles) {
const { JobQueuer } = require('../../helpers/job_queue');
const { App, AppFile } = require('../../model');
const jobQueuer = JobQueuer.getInstance();
for (const dataFile of dataFiles) {
try {
// Create Application record
const app = new App({
jobId: assignment.job._id,
fileName: dataFile.filename,
fileSize: dataFile.data.length,
status: 1, // Processing
partnerType: assignment.partnerType,
externalJobId: assignment.externalJobId,
assignmentId: assignment._id,
originalData: {
format: dataFile.format,
encoding: 'binary',
checksum: this.calculateChecksum(dataFile.data)
},
partnerMetadata: dataFile.metadata,
processingStage: 'uploaded'
});
await app.save();
// Create AppFile record
const appFile = new AppFile({
appId: app._id,
name: dataFile.filename,
data: dataFile.data,
partnerType: assignment.partnerType,
originalFormat: dataFile.format,
meta: dataFile.metadata
});
await appFile.save();
// Queue for processing
const processingMessage = {
appId: app._id,
fileId: appFile._id,
jobId: assignment.job._id,
partnerType: assignment.partnerType,
externalJobId: assignment.externalJobId,
assignmentId: assignment._id,
timestamp: new Date()
};
// Use partner-specific queue
const queueName = `${assignment.partnerType}_jobs`;
await jobQueuer.publish('', queueName, Buffer.from(JSON.stringify(processingMessage)));
debug(`Queued processing for file ${dataFile.filename} from ${assignment.partnerType}`);
} catch (error) {
debug(`Error processing data file ${dataFile.filename}:`, error.message);
throw error;
}
}
}
async handleSyncFailure(assignmentId, operation, error) {
try {
const assignment = await JobAssign.findById(assignmentId);
if (!assignment) return;
const attempts = assignment.syncState[operation].attempts || 0;
const maxAttempts = assignment.partnerConfig.retryPolicy.maxAttempts || 5;
assignment.updateSyncState(operation, {
status: 'failed',
error: error.message,
errorCode: this.categorizeError(error)
});
if (attempts < maxAttempts) {
// Schedule retry
const nextRetry = assignment.calculateNextRetry(operation);
assignment.syncState[operation].nextRetry = nextRetry;
await assignment.save();
// Queue retry
if (operation === 'jobUpload') {
await this.queueJobSync(assignmentId, {
delay: nextRetry.getTime() - Date.now(),
priority: 'high' // Increase priority for retries
});
} else if (operation === 'dataPolling') {
await this.queueDataPoll(assignmentId, {
delay: nextRetry.getTime() - Date.now(),
priority: 'high'
});
}
debug(`Scheduled retry ${attempts + 1}/${maxAttempts} for ${operation} at ${nextRetry}`);
} else {
// Max retries reached
await assignment.save();
debug(`Max retries reached for ${operation} on assignment ${assignmentId}`);
// Emit event for monitoring
this.emit('maxRetriesReached', {
assignmentId,
operation,
error: error.message,
attempts
});
}
} catch (saveError) {
debug(`Error handling sync failure:`, saveError.message);
}
}
categorizeError(error) {
const message = error.message.toLowerCase();
if (message.includes('timeout') || message.includes('network')) {
return 'NETWORK_ERROR';
} else if (message.includes('authentication') || message.includes('unauthorized')) {
return 'AUTH_ERROR';
} else if (message.includes('rate limit')) {
return 'RATE_LIMIT';
} else if (message.includes('not found')) {
return 'NOT_FOUND';
} else {
return 'UNKNOWN_ERROR';
}
}
calculateChecksum(data) {
const crypto = require('crypto');
return crypto.createHash('sha256').update(data).digest('hex');
}
getPriority(priority) {
// Bull treats 1 as the highest priority; larger numbers are processed later
const priorities = {
high: 1,
normal: 5,
low: 10
};
return priorities[priority] || priorities.normal;
}
// Clean up and monitoring methods
async getQueueStats() {
// getJobCounts() returns counters without loading every job into memory
const [syncCounts, pollCounts] = await Promise.all([
this.jobSyncQueue.getJobCounts(),
this.dataPollQueue.getJobCounts()
]);
const pick = (counts) => ({
active: counts.active,
waiting: counts.waiting,
completed: counts.completed,
failed: counts.failed
});
return {
jobSync: pick(syncCounts),
dataPoll: pick(pollCounts)
};
}
async shutdown() {
debug('Shutting down partner sync queue...');
await Promise.all([
this.jobSyncQueue.close(),
this.dataPollQueue.close()
]);
await this.redis.disconnect();
debug('Partner sync queue shutdown complete');
}
}
// Make it an event emitter for monitoring
const EventEmitter = require('events');
Object.setPrototypeOf(PartnerSyncQueue.prototype, EventEmitter.prototype);
module.exports = PartnerSyncQueue;
```
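`handleSyncFailure` retries every failure until `maxAttempts` is reached, whatever `categorizeError` returns. Some categories, such as rejected credentials, will probably fail the same way on every attempt. A possible refinement, sketched here as an assumption rather than as part of the design above, is to consult the error code before scheduling a retry. `shouldRetry` and the contents of `NON_RETRYABLE` are hypothetical; which codes belong in the set is a judgment call for each partner.

```javascript
// Hypothetical policy: codes mirror the strings returned by categorizeError
const NON_RETRYABLE = new Set(['AUTH_ERROR', 'NOT_FOUND']);

// Returns true when another attempt is both allowed and likely to help
function shouldRetry(errorCode, attempts, maxAttempts) {
  if (NON_RETRYABLE.has(errorCode)) {
    return false;
  }
  return attempts < maxAttempts;
}

// Example: a network error on the second attempt is retried, an auth error is not
console.log(shouldRetry('NETWORK_ERROR', 2, 5));
console.log(shouldRetry('AUTH_ERROR', 1, 5));
```

Inside `handleSyncFailure`, the existing `attempts < maxAttempts` check could be replaced by a call like this, with the non-retryable case falling through to the `maxRetriesReached` branch.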
### Phase 3: Enhanced Job Controller (Week 4)
#### 3.1 Update Job Assignment Controller
Update the existing `assign_post` function to support partner integration:
```javascript
// File: controllers/job.js (Enhanced assign_post function)
async function assign_post(req, res) {
  const _params = req.body;
  if (!_params || !_params.jobId || !_params.dlOp || !_params.asUsers) {
    AppParamError.throw();
  }

  const job = await Job.findById(_params.jobId).select('dlOp');
  if (!job) AppError.throw(Errors.JOB_NOT_FOUND);

  // Update job download options (optional chaining guards against a missing dlOp)
  if (_params.dlOp.type !== job.dlOp?.type) {
    await Job.updateOne({ _id: _params.jobId }, {
      $set: { 'dlOp.type': _params.dlOp.type }
    });
  }

  // Handle removal of assignments
  const _avIds = [];
  if (!utils.isEmptyArray(_params.avUsers)) {
    for (const it of _params.avUsers) {
      // `new` is required by recent bson/mongoose versions and is harmless on older ones
      if (ObjectId.isValid(it.uid)) _avIds.push(new ObjectId(it.uid));
    }
  }
  if (_avIds.length) {
    await JobAssign.deleteMany({
      $or: [
        { job: _params.jobId, status: 0 },
        { job: _params.jobId, user: { $in: _avIds } }
      ]
    });
  } else {
    await JobAssign.deleteMany({ job: _params.jobId, status: 0 });
  }

  // Handle new assignments
  const assignmentResults = [];
  const errors = [];
  if (!utils.isEmptyArray(_params.asUsers)) {
    const asUIds = [];
    for (const it of _params.asUsers) {
      if (ObjectId.isValid(it.uid)) asUIds.push(new ObjectId(it.uid));
    }

    // Assignments that have already progressed (status > 0) must not be recreated
    const doneJUs = await JobAssign.find({
      job: _params.jobId,
      user: { $in: asUIds },
      status: { $gt: 0 }
    }, 'user').lean();
    const _doneIds = doneJUs ? doneJUs.map(it => it.user) : [];

    const newItems = [];
    for (const it of _params.asUsers) {
      // Report malformed user IDs instead of letting insertMany fail on a cast error
      if (!ObjectId.isValid(it.uid)) {
        errors.push({ userId: it.uid, error: 'Invalid user ID' });
        continue;
      }
      if (!utils.objectIdIn(_doneIds, it.uid)) {
        const partnerType = it.partnerType || 'internal';

        // Validate partner type
        if (partnerType !== 'internal') {
          const partnerRegistry = req.app.locals.partnerRegistry;
          if (!partnerRegistry.isPartnerSupported(partnerType)) {
            errors.push({
              userId: it.uid,
              error: `Unsupported partner type: ${partnerType}`
            });
            continue;
          }
        }

        const assignmentData = {
          user: it.uid,
          job: _params.jobId,
          status: 0,
          partnerType,
          partnerConfig: {
            aircraftId: it.partnerConfig?.aircraftId,
            priority: it.partnerConfig?.priority || 'normal',
            timeout: it.partnerConfig?.timeout || 30000,
            retryPolicy: {
              maxAttempts: 5,
              baseDelay: 5000,
              maxDelay: 300000,
              backoffMultiplier: 2,
              jitter: true
            }
          },
          syncState: {
            jobUpload: {
              // Internal assignments have nothing to upload, so they start as synced
              status: partnerType === 'internal' ? 'synced' : 'pending',
              attempts: 0
            },
            dataPolling: {
              status: 'idle',
              attempts: 0
            }
          }
        };

        // Add partner-specific metadata
        if (it.partnerConfig) {
          assignmentData.partnerMetadata = it.partnerConfig.customFields || {};
        }
        newItems.push(assignmentData);
      }
    }

    if (newItems.length) {
      const insertedAssignments = await JobAssign.insertMany(newItems);

      // Queue partner sync tasks for non-internal assignments
      const partnerSyncQueue = req.app.locals.partnerSyncQueue;
      for (const assignment of insertedAssignments) {
        if (assignment.partnerType !== 'internal') {
          try {
            const syncJob = await partnerSyncQueue.queueJobSync(assignment._id, {
              priority: assignment.partnerConfig.priority,
              delay: 1000 // Small delay to ensure database consistency
            });
            assignmentResults.push({
              assignmentId: assignment._id,
              userId: assignment.user,
              partnerType: assignment.partnerType,
              status: 'assigned',
              syncStatus: 'queued',
              syncJobId: syncJob.id
            });
          } catch (syncError) {
            errors.push({
              userId: assignment.user,
              partnerType: assignment.partnerType,
              error: `Failed to queue sync: ${syncError.message}`
            });
          }
        } else {
          assignmentResults.push({
            assignmentId: assignment._id,
            userId: assignment.user,
            partnerType: assignment.partnerType,
            status: 'assigned',
            syncStatus: 'synced'
          });
        }
      }
    }
  }

  // Return enhanced response
  res.json({
    ok: true,
    assignments: assignmentResults,
    errors,
    summary: {
      // Total counts every processed request, both successes and failures
      totalAssignments: assignmentResults.length + errors.length,
      successfulAssignments: assignmentResults.length,
      failedAssignments: errors.length,
      partnerAssignments: assignmentResults.filter(a => a.partnerType !== 'internal').length,
      internalAssignments: assignmentResults.filter(a => a.partnerType === 'internal').length
    }
  });
}
```
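The `retryPolicy` stored on each assignment (`maxAttempts`, `baseDelay`, `maxDelay`, `backoffMultiplier`, `jitter`) suggests exponential backoff, but the controller does not show how a delay is derived from it. One plausible reading, offered as a sketch: the function name `computeRetryDelay` and the jitter range (a factor between 0.8 and 1.2) are assumptions, not taken from the guide.

```javascript
// Hypothetical helper: delay in ms before retry number `attempt` (1-based).
function computeRetryDelay(attempt, policy, random = Math.random) {
  // Attempt 1 waits baseDelay, attempt 2 waits baseDelay * multiplier, and so on
  const exponential = policy.baseDelay * Math.pow(policy.backoffMultiplier, attempt - 1);
  const capped = Math.min(exponential, policy.maxDelay);
  if (!policy.jitter) return capped;
  // Assumed jitter: scale by a random factor in [0.8, 1.2), then re-apply the cap
  const factor = 0.8 + random() * 0.4;
  return Math.min(Math.round(capped * factor), policy.maxDelay);
}

// Same values as the controller's retryPolicy, with jitter off for a deterministic listing
const policy = { maxAttempts: 5, baseDelay: 5000, maxDelay: 300000, backoffMultiplier: 2, jitter: false };
const delays = [];
for (let attempt = 1; attempt <= policy.maxAttempts; attempt++) {
  delays.push(computeRetryDelay(attempt, policy));
}
console.log(delays); // [ 5000, 10000, 20000, 40000, 80000 ]
```

Under this reading the 300000 ms cap is never reached within five attempts; it would first apply at attempt 7 (5000 × 2^6 = 320000).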
## Testing Strategy
### Unit Tests
```javascript
// tests/unit/services/partners/SatlocService.test.js
const SatlocService = require('../../../../services/partners/SatlocService');
const nock = require('nock');

describe('SatlocService', () => {
  let satlocService;
  let mockConfig;

  beforeEach(() => {
    mockConfig = {
      code: 'satloc',
      apiConfig: {
        baseUrl: 'https://api.satloc.test',
        timeout: { request: 5000 },
        authentication: {
          credentials: { token: 'test-token' }
        }
      }
    };
    satlocService = new SatlocService(mockConfig);
  });

  // Remove unused interceptors so one test cannot affect the next
  afterEach(() => {
    nock.cleanAll();
  });

  describe('uploadJobDefinition', () => {
    it('should upload job successfully', async () => {
      const mockJob = {
        _id: 123,
        name: 'Test Job',
        sprayAreas: [],
        swathWidth: 10,
        measureUnit: true
      };

      nock('https://api.satloc.test')
        .post('/jobs')
        .reply(200, {
          jobId: 'satloc_123',
          version: '1.0'
        });

      const result = await satlocService.uploadJobDefinition(mockJob);
      expect(result.externalJobId).toBe('satloc_123');
      expect(result.metadata.satlocJobId).toBe('satloc_123');
    });

    it('should handle upload failure', async () => {
      const mockJob = { _id: 123, name: 'Test Job' };

      nock('https://api.satloc.test')
        .post('/jobs')
        .reply(500, { message: 'Internal Server Error' });

      await expect(satlocService.uploadJobDefinition(mockJob))
        .rejects.toThrow('Satloc API Error: 500');
    });
  });
});
```
### Integration Tests
```javascript
// tests/integration/partner-assignment.test.js
const request = require('supertest');
const app = require('../../server');
const JobAssign = require('../../model/job_assign');

describe('Partner Assignment Integration', () => {
  // The controller only accepts valid ObjectId values, so the fixtures use 24-character hex IDs.
  // A Job document with this _id must be seeded first; otherwise the request fails with JOB_NOT_FOUND.
  const jobId = '64b7f0c2a1d3e4f5a6b7c8d9';
  const userId = '64b7f0c2a1d3e4f5a6b7c8da';

  beforeEach(async () => {
    await JobAssign.deleteMany({});
  });

  it('should assign job to partner successfully', async () => {
    const assignmentData = {
      jobId,
      dlOp: { type: 1 },
      asUsers: [{
        uid: userId,
        partnerType: 'satloc',
        partnerConfig: {
          aircraftId: 'AC001',
          priority: 'high'
        }
      }]
    };

    const response = await request(app)
      .post(`/api/jobs/${jobId}/assign`)
      .send(assignmentData)
      .expect(200);

    expect(response.body.ok).toBe(true);
    expect(response.body.assignments).toHaveLength(1);
    expect(response.body.assignments[0].partnerType).toBe('satloc');

    // Verify database record
    const assignment = await JobAssign.findById(response.body.assignments[0].assignmentId);
    expect(assignment.partnerType).toBe('satloc');
    expect(assignment.partnerConfig.aircraftId).toBe('AC001');
  });
});
```
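The `assign_post` controller reads `partnerRegistry` and `partnerSyncQueue` from `req.app.locals`, so an integration test can substitute in-memory stand-ins rather than connect to Redis. A minimal sketch: the factory names `createStubRegistry` and `createStubSyncQueue` are hypothetical, and only the two methods the controller calls (`isPartnerSupported` and `queueJobSync`) are implemented.

```javascript
// Hypothetical in-memory stand-ins for the objects the controller expects on app.locals.
function createStubRegistry(supportedPartners) {
  const supported = new Set(supportedPartners);
  return {
    isPartnerSupported: (partnerType) => supported.has(partnerType)
  };
}

function createStubSyncQueue() {
  const queued = [];
  return {
    queued, // exposed so tests can assert on what was enqueued
    async queueJobSync(assignmentId, options = {}) {
      const job = { id: `stub-${queued.length + 1}`, assignmentId, options };
      queued.push(job);
      return job;
    }
  };
}

// In a test's beforeEach these would be attached to the app, for example:
//   app.locals.partnerRegistry = createStubRegistry(['satloc']);
//   app.locals.partnerSyncQueue = createStubSyncQueue();
const registry = createStubRegistry(['satloc']);
const syncQueue = createStubSyncQueue();
```

After the request completes, a test can inspect `syncQueue.queued` to confirm that exactly one sync job was enqueued with the expected priority.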
## Deployment Guide
### Environment Setup
```bash
# 1. Install dependencies
npm install bull ioredis form-data

# 2. Set environment variables
#    (customer-specific credentials are stored in PartnerSystemUser records, not here;
#    variable names match the .env settings from Step 1)
export REDIS_URL=redis://localhost:6379
export SATLOC_API_ENDPOINT=https://www.satloccloud.com/api/Satloc
export SATLOC_API_TIMEOUT=30000

# 3. Run database migration
node scripts/migrate-job-assignments.js

# 4. Start queue workers
node workers/partner-sync-worker.js &
```
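A worker that starts without its configuration tends to fail later in less obvious ways, so it can help to check required variables at startup. A minimal sketch: `findMissingEnv` is a hypothetical helper, and the list of required names should match whatever your deployment actually reads.

```javascript
// Hypothetical startup check: returns the names that are unset or blank.
function findMissingEnv(requiredNames, env = process.env) {
  return requiredNames.filter((name) => {
    const value = env[name];
    return value === undefined || String(value).trim() === '';
  });
}

// Example: REDIS_URL comes from the setup steps above; add partner variables as needed.
// A literal object is passed here for illustration; omit it to check process.env.
const missing = findMissingEnv(['REDIS_URL'], { REDIS_URL: 'redis://localhost:6379' });
if (missing.length > 0) {
  console.error(`Missing required environment variables: ${missing.join(', ')}`);
  process.exitCode = 1;
}
```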
### Production Checklist
- [ ] Redis cluster configured for queue persistence
- [ ] Partner API credentials securely stored
- [ ] Monitoring and alerting set up
- [ ] Queue worker processes configured with PM2
- [ ] Database indexes created
- [ ] Backup and recovery procedures tested
- [ ] Load testing completed
- [ ] Documentation updated
Together, these phases add multi-partner support to job assignment while keeping existing internal assignments backward compatible.