# Partner Integration Implementation Guide

## Overview

This guide provides step-by-step implementation instructions for the AgMission partner integration system, which uses a dual-user approach with environment-based configuration.

## System Architecture

### Dual User Model

The system uses two types of user entities:

1. **Partner Organizations**: Companies providing integration services (e.g., SatLoc, AgIDronex)
2. **Partner System Users**: Customer accounts within each partner system

```
AgMission Customer
        ↓
Partner System User (SatLoc Account)
        ↓
Partner Organization (SatLoc Company)
        ↓
External API Integration
```

### Benefits of This Approach

- **Simplified Management**: Partners are User entities with authentication/authorization
- **Customer Isolation**: Each customer has their own partner system credentials
- **Environment Configuration**: Partner settings are managed via environment variables
- **Scalability**: New partners and customer accounts are easy to add
## Implementation Steps

### Step 1: Environment Configuration

Create the partner configuration in your `.env` file:

```bash
# Global Partner Settings
PARTNER_SYNC_INTERVAL=300000
PARTNER_HEALTH_CHECK_INTERVAL=60000
PARTNER_MAX_CONCURRENT_JOBS=10
PARTNER_ENCRYPT_CREDENTIALS=true

# SatLoc Configuration
SATLOC_API_ENDPOINT=https://www.satloccloud.com/api/Satloc
SATLOC_API_KEY=your_default_satloc_key
SATLOC_API_SECRET=your_default_satloc_secret
SATLOC_API_TIMEOUT=30000
SATLOC_RETRY_ATTEMPTS=3
SATLOC_RATE_LIMIT=60
```
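These variables can be parsed and validated once at startup instead of reading `process.env` throughout the codebase. A minimal sketch — the `loadSatlocConfig` helper and its failure behavior are assumptions, not part of the existing codebase:

```javascript
// Hypothetical helper: parse and validate SatLoc settings from the environment.
// Numeric values fall back to the defaults shown in the .env example above.
function loadSatlocConfig(env = process.env) {
  const required = ['SATLOC_API_ENDPOINT', 'SATLOC_API_KEY', 'SATLOC_API_SECRET'];
  const missing = required.filter((name) => !env[name]);
  if (missing.length > 0) {
    throw new Error(`Missing SatLoc configuration: ${missing.join(', ')}`);
  }
  return {
    endpoint: env.SATLOC_API_ENDPOINT,
    apiKey: env.SATLOC_API_KEY,
    apiSecret: env.SATLOC_API_SECRET,
    timeout: parseInt(env.SATLOC_API_TIMEOUT, 10) || 30000,
    retryAttempts: parseInt(env.SATLOC_RETRY_ATTEMPTS, 10) || 3,
    rateLimit: parseInt(env.SATLOC_RATE_LIMIT, 10) || 60
  };
}

module.exports = { loadSatlocConfig };
```

Failing fast on missing credentials keeps misconfiguration from surfacing later as opaque authentication errors.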
### Step 2: Create Partner Organization

```javascript
// Create the SatLoc partner organization
POST /api/partner/createPartner
{
  "name": "SatLoc Cloud",
  "username": "satloc",
  "partnerCode": "SATLOC",
  "partnerName": "SatLoc Cloud Services",
  "active": true,
  "configuration": {
    "supportsRealtime": false,
    "maxFileSize": 10485760,
    "supportedFormats": ["kml", "shp", "geojson"]
  }
}
```
### Step 3: Create Partner System Users

For each customer that needs SatLoc integration:

```javascript
// Create the customer's SatLoc account
POST /api/partner/createSystemUser
{
  "partnerId": "partner_organization_id",
  "customerId": "agmission_customer_id",
  "name": "Customer SatLoc Account",
  "partnerUserId": "customer_satloc_user_id",
  "partnerUsername": "customer_username_in_satloc",
  "companyId": "customer_satloc_company_id",
  "apiKey": "customer_specific_api_key",
  "apiSecret": "customer_specific_api_secret",
  "active": true
}
```
## Implementation Phases

### Phase 1: Foundation Setup (Weeks 1-2)

#### 1.1 Partner Service Interface Implementation

Create the core partner service interface:

```javascript
// File: services/partners/PartnerService.js
class PartnerService {
  constructor(config) {
    this.config = config;
    this.partnerType = config.code;
  }

  /**
   * Get the partner type identifier
   * @returns {string} Partner type (e.g., 'satloc', 'dji')
   */
  getPartnerType() {
    return this.partnerType;
  }

  /**
   * Upload a job definition to the partner system
   * @param {Object} job - Job object from the database
   * @returns {Promise<{externalJobId: string, metadata?: any}>}
   */
  async uploadJobDefinition(job) {
    throw new Error('uploadJobDefinition must be implemented by partner service');
  }

  /**
   * Download flight data from the partner system
   * @param {string} aircraftId - Partner's aircraft identifier
   * @param {string} externalJobId - Partner's job identifier
   * @returns {Promise<FlightDataFile[]>}
   */
  async downloadFlightData(aircraftId, externalJobId) {
    throw new Error('downloadFlightData must be implemented by partner service');
  }

  /**
   * Check job status in the partner system
   * @param {string} externalJobId - Partner's job identifier
   * @returns {Promise<JobStatus>}
   */
  async syncJobStatus(externalJobId) {
    throw new Error('syncJobStatus must be implemented by partner service');
  }

  /**
   * Convert partner data format to the internal format
   * @param {any} data - Raw partner data
   * @param {string} format - Original data format
   * @returns {Promise<InternalData[]>}
   */
  async convertToInternalFormat(data, format) {
    throw new Error('convertToInternalFormat must be implemented by partner service');
  }

  /**
   * Validate the partner configuration
   * @returns {Promise<boolean>}
   */
  async validateConfiguration() {
    return true;
  }

  /**
   * Health check for the partner service
   * @returns {Promise<{status: string, responseTime: number}>}
   */
  async healthCheck() {
    const start = Date.now();
    try {
      // Delegate to the partner-specific health check
      await this.ping();
      return {
        status: 'healthy',
        responseTime: Date.now() - start
      };
    } catch (error) {
      return {
        status: 'unhealthy',
        responseTime: Date.now() - start,
        error: error.message
      };
    }
  }

  /**
   * Basic ping to the partner API
   * @returns {Promise<void>}
   */
  async ping() {
    // Default implementation - override in specific services
    throw new Error('ping must be implemented by partner service');
  }
}

module.exports = PartnerService;
```
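The base class above follows a template-method pattern: the abstract operations throw until a subclass overrides them, while `healthCheck()` wraps whatever `ping()` the subclass provides and turns failures into data. A condensed, self-contained sketch of that contract — `BaseService` and `EchoService` here are illustrative stand-ins, not classes from the codebase:

```javascript
// Minimal stand-ins demonstrating the abstract-method contract above.
class BaseService {
  async ping() {
    throw new Error('ping must be implemented by partner service');
  }

  async healthCheck() {
    const start = Date.now();
    try {
      await this.ping();
      return { status: 'healthy', responseTime: Date.now() - start };
    } catch (error) {
      return { status: 'unhealthy', responseTime: Date.now() - start, error: error.message };
    }
  }
}

// Hypothetical subclass that fulfils the contract.
class EchoService extends BaseService {
  async ping() {
    return; // a real service would call its partner API here
  }
}
```

Calling `healthCheck()` on the base class resolves to `unhealthy` because `ping()` throws, while the subclass resolves to `healthy` — callers never need try/catch around health checks.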
#### 1.2 Partner Registry Implementation

```javascript
// File: services/partners/PartnerRegistry.js
const EventEmitter = require('events');
const debug = require('debug')('agm:partner-registry');

// Extends EventEmitter so consumers can subscribe to health monitoring events
class PartnerRegistry extends EventEmitter {
  constructor() {
    super();
    this.partners = new Map();
    this.healthCheckInterval =
      parseInt(process.env.PARTNER_HEALTH_CHECK_INTERVAL, 10) || 300000; // default: 5 minutes
    this.healthCheckTimer = null;
  }

  /**
   * Register a partner service
   * @param {string} partnerType - Partner type identifier
   * @param {PartnerService} service - Partner service instance
   */
  register(partnerType, service) {
    if (!service || typeof service.getPartnerType !== 'function') {
      throw new Error(`Invalid partner service for type: ${partnerType}`);
    }

    this.partners.set(partnerType, service);
    debug(`Registered partner service: ${partnerType}`);
  }

  /**
   * Get a partner service by type
   * @param {string} partnerType - Partner type identifier
   * @returns {PartnerService|undefined}
   */
  get(partnerType) {
    return this.partners.get(partnerType);
  }

  /**
   * Get all registered partner services
   * @returns {PartnerService[]}
   */
  getAll() {
    return Array.from(this.partners.values());
  }

  /**
   * Check if a partner type is supported
   * @param {string} partnerType - Partner type identifier
   * @returns {boolean}
   */
  isPartnerSupported(partnerType) {
    return this.partners.has(partnerType);
  }

  /**
   * Get the list of supported partner types
   * @returns {string[]}
   */
  getSupportedPartners() {
    return Array.from(this.partners.keys());
  }

  /**
   * Start health monitoring for all partners
   */
  startHealthMonitoring() {
    if (this.healthCheckTimer) {
      clearInterval(this.healthCheckTimer);
    }

    this.healthCheckTimer = setInterval(async () => {
      await this.performHealthChecks();
    }, this.healthCheckInterval);

    debug('Started partner health monitoring');
  }

  /**
   * Stop health monitoring
   */
  stopHealthMonitoring() {
    if (this.healthCheckTimer) {
      clearInterval(this.healthCheckTimer);
      this.healthCheckTimer = null;
    }
    debug('Stopped partner health monitoring');
  }

  /**
   * Perform health checks on all partners
   */
  async performHealthChecks() {
    const healthResults = [];

    for (const [partnerType, service] of this.partners) {
      try {
        const result = await service.healthCheck();
        healthResults.push({
          partnerType,
          ...result,
          timestamp: new Date()
        });
      } catch (error) {
        healthResults.push({
          partnerType,
          status: 'error',
          error: error.message,
          timestamp: new Date()
        });
      }
    }

    // Emit results so consumers can store or alert on them
    this.emit('healthCheck', healthResults);
    return healthResults;
  }

  /**
   * Validate all partner configurations
   * @returns {Promise<{valid: boolean, errors: string[]}>}
   */
  async validateAllConfigurations() {
    const errors = [];

    for (const [partnerType, service] of this.partners) {
      try {
        const isValid = await service.validateConfiguration();
        if (!isValid) {
          errors.push(`Invalid configuration for partner: ${partnerType}`);
        }
      } catch (error) {
        errors.push(`Configuration validation failed for ${partnerType}: ${error.message}`);
      }
    }

    return {
      valid: errors.length === 0,
      errors
    };
  }
}

module.exports = PartnerRegistry;
```
#### 1.3 Satloc Service Implementation

```javascript
// File: services/partners/SatlocService.js
const PartnerService = require('./PartnerService');
const axios = require('axios');
const FormData = require('form-data');
const debug = require('debug')('agm:satloc-service');

class SatlocService extends PartnerService {
  constructor(config) {
    super(config);
    this.baseUrl = process.env.SATLOC_API_ENDPOINT || 'https://www.satloccloud.com/api/Satloc';
    this.credentials = null;
    this.apiClient = axios.create({
      baseURL: this.baseUrl,
      timeout: config.apiConfig?.timeout?.request || 30000,
      headers: {
        'Content-Type': 'application/json',
        'User-Agent': 'AgMission-Integration/1.0'
      }
    });

    // Add request/response interceptors for logging and error handling
    this.setupInterceptors();
  }

  setupInterceptors() {
    this.apiClient.interceptors.request.use(
      (config) => {
        debug(`Satloc API Request: ${config.method?.toUpperCase()} ${config.url}`);
        return config;
      },
      (error) => {
        debug(`Satloc API Request Error: ${error.message}`);
        return Promise.reject(error);
      }
    );

    this.apiClient.interceptors.response.use(
      (response) => {
        debug(`Satloc API Response: ${response.status} ${response.config.url}`);
        return response;
      },
      (error) => {
        debug(`Satloc API Error: ${error.response?.status} ${error.message}`);
        return Promise.reject(this.handleApiError(error));
      }
    );
  }

  handleApiError(error) {
    if (error.response) {
      const satlocError = error.response.data;
      if (satlocError && !satlocError.IsSuccess) {
        return new Error(`Satloc API Error: ${satlocError.ErrorMessage}`);
      }
      return new Error(`Satloc API Error: ${error.response.status} - ${error.response.data?.message || error.message}`);
    } else if (error.request) {
      return new Error('Satloc API Timeout: No response received');
    } else {
      return new Error(`Satloc API Error: ${error.message}`);
    }
  }

  // Credentials come from the partner system user record (Step 3); they
  // fall back to the service configuration when not passed explicitly.
  async authenticate(partnerSystemUser = this.config) {
    try {
      const response = await this.apiClient.get('/AuthenticateAPIUser', {
        params: {
          userLogin: partnerSystemUser.username,
          password: partnerSystemUser.password
        }
      });

      if (response.data.IsSuccess) {
        this.credentials = response.data.Result;
        debug('Satloc authentication successful');
        return {
          success: true,
          data: this.credentials
        };
      } else {
        throw new Error(response.data.ErrorMessage);
      }
    } catch (error) {
      debug(`Satloc authentication failed: ${error.message}`);
      return {
        success: false,
        error: error.message
      };
    }
  }

  async uploadJobDefinition(job) {
    if (!this.credentials) {
      await this.authenticate();
    }

    try {
      // Convert the AgMission job format to the Satloc format
      const satlocJobData = this.convertJobToSatlocFormat(job);

      const formData = new FormData();
      formData.append('userId', this.credentials.UserId);
      formData.append('aircraftId', job.aircraftId); // Should be mapped to a Satloc aircraft
      formData.append('jobData', satlocJobData.fileBuffer, satlocJobData.filename);
      formData.append('metadata', JSON.stringify(satlocJobData.metadata));

      const response = await axios.post(`${this.baseUrl}/UploadJobData`, formData, {
        headers: {
          ...formData.getHeaders(),
          'Authorization': `Bearer ${this.credentials.Token}`
        },
        timeout: this.config.apiConfig?.timeout?.request || 30000
      });

      if (response.data.IsSuccess) {
        debug(`Job ${job._id} uploaded to Satloc successfully`);
        return {
          externalJobId: response.data.Result.JobId,
          metadata: {
            externalJobId: response.data.Result.JobId,
            uploadTime: new Date(),
            status: response.data.Result.Status,
            estimatedCompletion: response.data.Result.EstimatedCompletion
          }
        };
      } else {
        throw new Error(response.data.ErrorMessage);
      }
    } catch (error) {
      debug(`Failed to upload job ${job._id} to Satloc: ${error.message}`);
      throw error;
    }
  }

  async downloadFlightData(aircraftId, logId) {
    if (!this.credentials) {
      await this.authenticate();
    }

    try {
      const response = await this.apiClient.get('/GetAircraftLogData', {
        params: {
          userId: this.credentials.UserId,
          logId: logId
        },
        responseType: 'stream'
      });

      // Convert the stream to a buffer
      const chunks = [];
      for await (const chunk of response.data) {
        chunks.push(chunk);
      }
      const buffer = Buffer.concat(chunks);

      const dataFiles = [{
        filename: `satloc_flight_${logId}.dat`,
        data: buffer,
        format: 'satloc',
        timestamp: new Date(),
        size: buffer.length,
        metadata: {
          logId: logId,
          aircraftId: aircraftId
        }
      }];

      debug(`Downloaded flight data for log ${logId} from Satloc`);
      return dataFiles;
    } catch (error) {
      debug(`Failed to download flight data for log ${logId}: ${error.message}`);
      throw error;
    }
  }

  async syncJobStatus(externalJobId) {
    if (!this.credentials) {
      await this.authenticate();
    }

    try {
      // Satloc has no direct job status endpoint, so we check via aircraft logs
      const aircraftResponse = await this.apiClient.get('/GetAircraftList', {
        params: {
          userId: this.credentials.UserId,
          companyId: this.credentials.CompanyId
        }
      });

      if (aircraftResponse.data.IsSuccess) {
        // For now, return an in-progress status;
        // a full implementation would correlate with aircraft logs
        return {
          status: 'in_progress',
          progress: 50,
          lastUpdate: new Date(),
          metadata: {
            externalJobId: externalJobId
          }
        };
      } else {
        throw new Error(aircraftResponse.data.ErrorMessage);
      }
    } catch (error) {
      debug(`Failed to sync job status for ${externalJobId}: ${error.message}`);
      throw error;
    }
  }

  async convertToInternalFormat(data, format) {
    try {
      const internalData = [];

      if (format === 'satloc') {
        // Parse the Satloc data format and convert to the internal format
        const parsed = this.parseSatlocData(data);

        for (const record of parsed) {
          internalData.push({
            lat: record.latitude,
            lon: record.longitude,
            sprayStat: record.sprayStatus,
            gpsTime: record.timestamp,
            alt: record.altitude,
            grSpeed: record.groundSpeed,
            partnerSpecific: {
              satlocRecord: record
            }
          });
        }
      }

      debug(`Converted ${internalData.length} records from Satloc format`);
      return internalData;
    } catch (error) {
      debug(`Failed to convert Satloc data to internal format: ${error.message}`);
      throw error;
    }
  }

  async ping() {
    try {
      const response = await this.apiClient.get('/IsAlive');
      if (response.data.IsSuccess) {
        return {
          status: 'healthy',
          message: response.data.Result
        };
      } else {
        throw new Error(response.data.ErrorMessage);
      }
    } catch (error) {
      throw new Error(`Satloc ping failed: ${error.message}`);
    }
  }

  async getAircraft() {
    if (!this.credentials) {
      await this.authenticate();
    }

    try {
      const response = await this.apiClient.get('/GetAircraftList', {
        params: {
          userId: this.credentials.UserId,
          companyId: this.credentials.CompanyId
        }
      });

      if (response.data.IsSuccess) {
        return response.data.Result.map(aircraft => ({
          id: aircraft.AircraftId,
          name: aircraft.AircraftName,
          type: aircraft.AircraftType,
          status: aircraft.Status,
          lastSeen: aircraft.LastSeen
        }));
      } else {
        throw new Error(response.data.ErrorMessage);
      }
    } catch (error) {
      debug(`Failed to get aircraft list: ${error.message}`);
      throw error;
    }
  }

  // Helper methods
  convertJobToSatlocFormat(job) {
    // Convert an AgMission job to the Satloc format
    const satlocJob = {
      name: job.name,
      areas: job.sprayAreas?.map(area => ({
        name: area.properties?.name || 'Spray Area',
        coordinates: area.geometry?.coordinates || [],
        applicationRate: area.properties?.appRate || job.appRate
      })) || [],
      aircraft: {
        swathWidth: job.swathWidth,
        measurementUnit: job.measureUnit ? 'imperial' : 'metric'
      },
      schedule: {
        startDate: job.startDate,
        endDate: job.endDate
      }
    };

    // Create a file buffer from the job data
    const jobJson = JSON.stringify(satlocJob, null, 2);
    const fileBuffer = Buffer.from(jobJson, 'utf-8');

    return {
      fileBuffer: fileBuffer,
      filename: `agm_job_${job._id}.json`,
      metadata: {
        jobId: job._id.toString(),
        jobName: job.name,
        missionType: 'survey',
        plannedDate: job.startDate,
        priority: 'normal',
        estimatedDuration: 3600
      }
    };
  }

  parseSatlocData(data) {
    // Implement Satloc format parsing.
    // This is a placeholder - the actual implementation depends on Satloc's data format
    const records = [];

    try {
      // Basic parsing logic for the Satloc data format;
      // a real implementation would parse the actual binary format
      const textData = data.toString('utf-8');
      const lines = textData.split('\n');

      for (const line of lines) {
        if (line.trim() && !line.startsWith('#')) {
          const parts = line.split(',');
          if (parts.length >= 6) {
            records.push({
              timestamp: new Date(parts[0]),
              latitude: parseFloat(parts[1]),
              longitude: parseFloat(parts[2]),
              altitude: parseFloat(parts[3]),
              groundSpeed: parseFloat(parts[4]),
              sprayStatus: parseInt(parts[5], 10) || 0
            });
          }
        }
      }
    } catch (error) {
      debug(`Error parsing Satloc data: ${error.message}`);
    }

    return records;
  }
}

module.exports = SatlocService;
```
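The `parseSatlocData` placeholder above assumes a CSV-like text payload (timestamp, latitude, longitude, altitude, ground speed, spray status, with `#` comment lines). That loop is easiest to verify extracted as a standalone function — the function below is the same placeholder logic, not Satloc's real binary format:

```javascript
// Standalone version of the placeholder parser: one record per CSV line,
// skipping blank lines, '#' comments, and lines with fewer than six fields.
function parseSatlocData(buffer) {
  const records = [];
  const lines = buffer.toString('utf-8').split('\n');
  for (const line of lines) {
    if (line.trim() && !line.startsWith('#')) {
      const parts = line.split(',');
      if (parts.length >= 6) {
        records.push({
          timestamp: new Date(parts[0]),
          latitude: parseFloat(parts[1]),
          longitude: parseFloat(parts[2]),
          altitude: parseFloat(parts[3]),
          groundSpeed: parseFloat(parts[4]),
          sprayStatus: parseInt(parts[5], 10) || 0
        });
      }
    }
  }
  return records;
}
```

Keeping the parser pure (buffer in, records out) makes it trivial to unit-test against captured payloads before wiring it into the service.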
#### 1.4 Enhanced JobAssign Model

Update the existing JobAssign model to support partner integration:

```javascript
// File: model/job_assign.js (Enhanced)
const mongoose = require('mongoose'),
  Schema = mongoose.Schema,
  { AssignStatus } = require('../helpers/constants');

const syncStateSchema = new Schema({
  status: {
    type: String,
    enum: ['pending', 'syncing', 'synced', 'failed', 'idle', 'polling'],
    default: 'pending'
  },
  attempts: { type: Number, default: 0, min: 0 },
  lastAttempt: { type: Date },
  lastSuccess: { type: Date },
  lastDataCheck: { type: Date },
  error: { type: String },
  errorCode: { type: String },
  nextRetry: { type: Date }
}, { _id: false });

const retryPolicySchema = new Schema({
  maxAttempts: { type: Number, default: 5, min: 1, max: 10 },
  baseDelay: { type: Number, default: 5000, min: 1000 },
  maxDelay: { type: Number, default: 300000 },
  backoffMultiplier: { type: Number, default: 2, min: 1, max: 5 },
  jitter: { type: Boolean, default: true }
}, { _id: false });

const schema = new Schema({
  // Existing fields
  job: { type: Number, ref: 'Job', required: true },
  user: { type: Schema.Types.ObjectId, ref: 'User', required: true },
  status: {
    type: Number,
    enum: Object.values(AssignStatus),
    default: AssignStatus.NEW
  },
  date: { type: Date, required: false, default: Date.now },

  // New partner integration fields
  partnerType: {
    type: String,
    enum: ['internal', 'satloc', 'dji', 'parrot', 'other'],
    default: 'internal',
    index: true
  },

  externalJobId: {
    type: String,
    sparse: true,
    index: true
  },

  partnerMetadata: {
    type: Schema.Types.Mixed,
    default: null
  },

  // Sync state tracking
  syncState: {
    jobUpload: {
      type: syncStateSchema,
      default: function() {
        return {
          status: this.partnerType === 'internal' ? 'synced' : 'pending',
          attempts: 0
        };
      }
    },
    dataPolling: {
      type: syncStateSchema,
      default: () => ({
        status: 'idle',
        attempts: 0
      })
    }
  },

  // Partner-specific configuration
  partnerConfig: {
    aircraftId: { type: String },
    priority: {
      type: String,
      enum: ['low', 'normal', 'high'],
      default: 'normal'
    },
    timeout: { type: Number, default: 30000 },
    retryPolicy: {
      type: retryPolicySchema,
      default: () => ({})
    }
  },

  // Performance metrics
  metrics: {
    syncDuration: { type: Number },
    dataSize: { type: Number },
    conversionTime: { type: Number },
    uploadTime: { type: Number },
    downloadTime: { type: Number }
  }
}, {
  timestamps: true,
  toJSON: { virtuals: true },
  toObject: { virtuals: true }
});

// Indexes
schema.index({ user: 1, status: 1 });
schema.index({ job: 1, partnerType: 1 });
schema.index({ partnerType: 1, 'syncState.jobUpload.status': 1 });
schema.index({ partnerType: 1, 'syncState.dataPolling.status': 1 });
schema.index({ externalJobId: 1, partnerType: 1 }, { unique: true, sparse: true });

// Virtual fields
schema.virtual('isPartnerAssignment').get(function() {
  return this.partnerType !== 'internal';
});

schema.virtual('needsSync').get(function() {
  return this.isPartnerAssignment &&
    this.syncState.jobUpload.status === 'pending';
});

schema.virtual('needsPolling').get(function() {
  return this.isPartnerAssignment &&
    this.syncState.jobUpload.status === 'synced' &&
    this.syncState.dataPolling.status === 'idle' &&
    this.status < 2;
});

// Methods
schema.methods.updateSyncState = function(operation, update) {
  if (!this.syncState[operation]) {
    this.syncState[operation] = {};
  }
  Object.assign(this.syncState[operation], update);
  this.markModified(`syncState.${operation}`);
};

schema.methods.incrementRetryCount = function(operation) {
  this.syncState[operation].attempts = (this.syncState[operation].attempts || 0) + 1;
  this.syncState[operation].lastAttempt = new Date();
  this.markModified(`syncState.${operation}`);
};

schema.methods.calculateNextRetry = function(operation) {
  const policy = this.partnerConfig.retryPolicy;
  const attempts = this.syncState[operation].attempts || 0;

  let delay = policy.baseDelay * Math.pow(policy.backoffMultiplier, attempts);
  delay = Math.min(delay, policy.maxDelay);

  if (policy.jitter) {
    delay = delay * (0.5 + Math.random() * 0.5);
  }

  return new Date(Date.now() + delay);
};

module.exports = mongoose.model('JobAssign', schema);
```
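`calculateNextRetry` implements capped exponential backoff with jitter. Extracted as a pure function it is easy to reason about and test; the default values below mirror `retryPolicySchema`:

```javascript
// Capped exponential backoff with optional jitter, mirroring calculateNextRetry.
// Returns the delay in milliseconds before the next attempt.
function backoffDelay(attempts, policy = {}) {
  const {
    baseDelay = 5000,
    maxDelay = 300000,
    backoffMultiplier = 2,
    jitter = true
  } = policy;

  // Exponential growth, capped at maxDelay
  let delay = baseDelay * Math.pow(backoffMultiplier, attempts);
  delay = Math.min(delay, maxDelay);

  if (jitter) {
    // Randomize to 50-100% of the computed delay to avoid thundering herds
    delay = delay * (0.5 + Math.random() * 0.5);
  }

  return delay;
}
```

With the defaults and jitter disabled, the sequence is 5s, 10s, 20s, 40s, 80s, 160s, then capped at 5 minutes.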
### Phase 2: Queue System Implementation (Weeks 3-4)
|
|
|
|
#### 2.1 Partner Sync Queue System
|
|
|
|
```javascript
|
|
// File: services/queues/PartnerSyncQueue.js
|
|
const Bull = require('bull');
|
|
const Redis = require('ioredis');
|
|
const JobAssign = require('../../model/job_assign');
|
|
const debug = require('debug')('agm:partner-sync-queue');
|
|
|
|
class PartnerSyncQueue {
|
|
constructor(partnerRegistry, redisConfig) {
|
|
this.partnerRegistry = partnerRegistry;
|
|
this.redis = new Redis(redisConfig);
|
|
|
|
// Create separate queues for different operations
|
|
this.jobSyncQueue = new Bull('partner-job-sync', {
|
|
redis: redisConfig,
|
|
defaultJobOptions: {
|
|
removeOnComplete: 50,
|
|
removeOnFail: 100,
|
|
attempts: 1 // We handle retries manually
|
|
}
|
|
});
|
|
|
|
this.dataPollQueue = new Bull('partner-data-poll', {
|
|
redis: redisConfig,
|
|
defaultJobOptions: {
|
|
removeOnComplete: 20,
|
|
removeOnFail: 50,
|
|
attempts: 1
|
|
}
|
|
});
|
|
|
|
this.setupProcessors();
|
|
this.setupEventHandlers();
|
|
}
|
|
|
|
setupProcessors() {
|
|
// Job sync processor
|
|
this.jobSyncQueue.process('sync-job', 5, async (job) => {
|
|
return await this.processSyncJob(job.data);
|
|
});
|
|
|
|
// Data polling processor
|
|
this.dataPollQueue.process('poll-data', 10, async (job) => {
|
|
return await this.processDataPoll(job.data);
|
|
});
|
|
}
|
|
|
|
setupEventHandlers() {
|
|
this.jobSyncQueue.on('completed', (job, result) => {
|
|
debug(`Job sync completed: ${job.id}`, result);
|
|
});
|
|
|
|
this.jobSyncQueue.on('failed', (job, err) => {
|
|
debug(`Job sync failed: ${job.id}`, err.message);
|
|
});
|
|
|
|
this.dataPollQueue.on('completed', (job, result) => {
|
|
debug(`Data poll completed: ${job.id}`, result);
|
|
});
|
|
|
|
this.dataPollQueue.on('failed', (job, err) => {
|
|
debug(`Data poll failed: ${job.id}`, err.message);
|
|
});
|
|
}
|
|
|
|
async queueJobSync(assignmentId, options = {}) {
|
|
const delay = options.delay || 0;
|
|
const priority = this.getPriority(options.priority || 'normal');
|
|
|
|
const jobOptions = {
|
|
delay,
|
|
priority,
|
|
jobId: `sync-${assignmentId}-${Date.now()}`,
|
|
...options.jobOptions
|
|
};
|
|
|
|
const job = await this.jobSyncQueue.add('sync-job',
|
|
{ assignmentId, options },
|
|
jobOptions
|
|
);
|
|
|
|
debug(`Queued job sync for assignment ${assignmentId}, job ID: ${job.id}`);
|
|
return job;
|
|
}
|
|
|
|
async queueDataPoll(assignmentId, options = {}) {
|
|
const delay = options.delay || 0;
|
|
const priority = this.getPriority(options.priority || 'normal');
|
|
|
|
const jobOptions = {
|
|
delay,
|
|
priority,
|
|
jobId: `poll-${assignmentId}-${Date.now()}`,
|
|
...options.jobOptions
|
|
};
|
|
|
|
const job = await this.dataPollQueue.add('poll-data',
|
|
{ assignmentId, options },
|
|
jobOptions
|
|
);
|
|
|
|
debug(`Queued data poll for assignment ${assignmentId}, job ID: ${job.id}`);
|
|
return job;
|
|
}
|
|
|
|
async processSyncJob(data) {
|
|
const { assignmentId, options } = data;
|
|
const startTime = Date.now();
|
|
|
|
try {
|
|
const assignment = await JobAssign.findById(assignmentId)
|
|
.populate('job')
|
|
.populate('user');
|
|
|
|
if (!assignment) {
|
|
throw new Error(`Assignment not found: ${assignmentId}`);
|
|
}
|
|
|
|
// Update sync state to syncing
|
|
assignment.updateSyncState('jobUpload', {
|
|
status: 'syncing',
|
|
lastAttempt: new Date()
|
|
});
|
|
assignment.incrementRetryCount('jobUpload');
|
|
await assignment.save();
|
|
|
|
// Get partner service
|
|
const partnerService = this.partnerRegistry.get(assignment.partnerType);
|
|
if (!partnerService) {
|
|
throw new Error(`Partner service not found: ${assignment.partnerType}`);
|
|
}
|
|
|
|
// Upload job to partner
|
|
const result = await partnerService.uploadJobDefinition(assignment.job);
|
|
|
|
// Update assignment with success
|
|
assignment.externalJobId = result.externalJobId;
|
|
assignment.partnerMetadata = result.metadata;
|
|
assignment.updateSyncState('jobUpload', {
|
|
status: 'synced',
|
|
lastSuccess: new Date(),
|
|
error: null,
|
|
errorCode: null
|
|
});
|
|
assignment.metrics = assignment.metrics || {};
|
|
assignment.metrics.syncDuration = Date.now() - startTime;
|
|
assignment.metrics.uploadTime = Date.now() - startTime;
|
|
|
|
await assignment.save();
|
|
|
|
// Schedule data polling
|
|
await this.queueDataPoll(assignmentId, {
|
|
delay: 30000, // Poll after 30 seconds
|
|
priority: assignment.partnerConfig.priority
|
|
});
|
|
|
|
return {
|
|
success: true,
|
|
externalJobId: result.externalJobId,
|
|
syncDuration: Date.now() - startTime
|
|
};
|
|
|
|
} catch (error) {
|
|
// Handle sync failure
|
|
await this.handleSyncFailure(assignmentId, 'jobUpload', error);
|
|
throw error;
|
|
}
|
|
}
|
|
|
|
async processDataPoll(data) {
|
|
const { assignmentId, options } = data;
|
|
const startTime = Date.now();
|
|
|
|
try {
|
|
const assignment = await JobAssign.findById(assignmentId)
|
|
.populate('job')
|
|
.populate('user');
|
|
|
|
if (!assignment) {
|
|
throw new Error(`Assignment not found: ${assignmentId}`);
|
|
}
|
|
|
|
if (!assignment.externalJobId) {
|
|
throw new Error(`No external job ID for assignment: ${assignmentId}`);
|
|
}
|
|
|
|
// Update polling state
|
|
assignment.updateSyncState('dataPolling', {
|
|
status: 'polling',
|
|
lastAttempt: new Date()
|
|
});
|
|
assignment.incrementRetryCount('dataPolling');
|
|
await assignment.save();
|
|
|
|
// Get partner service
|
|
const partnerService = this.partnerRegistry.get(assignment.partnerType);
|
|
if (!partnerService) {
|
|
throw new Error(`Partner service not found: ${assignment.partnerType}`);
|
|
}
|
|
|
|
// Check for available data
|
|
const dataFiles = await partnerService.downloadFlightData(
|
|
assignment.partnerConfig.aircraftId || assignment.user._id.toString(),
|
|
assignment.externalJobId
|
|
);
|
|
|
|
if (dataFiles && dataFiles.length > 0) {
|
|
// Process data files
|
|
await this.processDataFiles(assignment, dataFiles);
|
|
|
|
// Update polling state to synced
|
|
assignment.updateSyncState('dataPolling', {
|
|
status: 'synced',
|
|
lastSuccess: new Date(),
|
|
error: null,
|
|
errorCode: null
|
|
});
|
|
assignment.metrics = assignment.metrics || {};
|
|
assignment.metrics.downloadTime = Date.now() - startTime;
|
|
assignment.metrics.dataSize = dataFiles.reduce((sum, file) => sum + file.data.length, 0);
|
|
|
|
await assignment.save();
|
|
|
|
return {
|
|
success: true,
|
|
filesFound: dataFiles.length,
|
|
dataSize: assignment.metrics.dataSize
|
|
};
|
|
|
|
} else {
|
|
// No data available yet, schedule next poll
|
|
assignment.updateSyncState('dataPolling', {
|
|
status: 'idle',
|
|
lastDataCheck: new Date()
|
|
});
|
|
|
|
await assignment.save();
|
|
|
|
// Schedule next poll if job is still active
|
|
if (assignment.status < 2) {
|
|
const pollInterval = assignment.partnerConfig.pollInterval || 60000;
|
|
await this.queueDataPoll(assignmentId, {
|
|
delay: pollInterval,
|
|
priority: assignment.partnerConfig.priority
|
|
});
|
|
}
|
|
|
|
return { success: true, filesFound: 0 };
|
|
}
|
|
|
|
} catch (error) {
|
|
// Handle polling failure
|
|
await this.handleSyncFailure(assignmentId, 'dataPolling', error);
|
|
throw error;
|
|
}
|
|
}
|
|
|
|
  async processDataFiles(assignment, dataFiles) {
    const { JobQueuer } = require('../../helpers/job_queue');
    const { App, AppFile } = require('../../model');
    const jobQueuer = JobQueuer.getInstance();

    for (const dataFile of dataFiles) {
      try {
        // Create Application record
        const app = new App({
          jobId: assignment.job._id,
          fileName: dataFile.filename,
          fileSize: dataFile.data.length,
          status: 1, // Processing
          partnerType: assignment.partnerType,
          externalJobId: assignment.externalJobId,
          assignmentId: assignment._id,
          originalData: {
            format: dataFile.format,
            encoding: 'binary',
            checksum: this.calculateChecksum(dataFile.data)
          },
          partnerMetadata: dataFile.metadata,
          processingStage: 'uploaded'
        });

        await app.save();

        // Create AppFile record
        const appFile = new AppFile({
          appId: app._id,
          name: dataFile.filename,
          data: dataFile.data,
          partnerType: assignment.partnerType,
          originalFormat: dataFile.format,
          meta: dataFile.metadata
        });

        await appFile.save();

        // Queue for processing
        const processingMessage = {
          appId: app._id,
          fileId: appFile._id,
          jobId: assignment.job._id,
          partnerType: assignment.partnerType,
          externalJobId: assignment.externalJobId,
          assignmentId: assignment._id,
          timestamp: new Date()
        };

        // Use partner-specific queue
        const queueName = `${assignment.partnerType}_jobs`;
        await jobQueuer.publish('', queueName, Buffer.from(JSON.stringify(processingMessage)));

        debug(`Queued processing for file ${dataFile.filename} from ${assignment.partnerType}`);
      } catch (error) {
        debug(`Error processing data file ${dataFile.filename}:`, error.message);
        throw error;
      }
    }
  }

  async handleSyncFailure(assignmentId, operation, error) {
    try {
      const assignment = await JobAssign.findById(assignmentId);
      if (!assignment) return;

      const attempts = assignment.syncState[operation].attempts || 0;
      const maxAttempts = assignment.partnerConfig.retryPolicy.maxAttempts || 5;

      assignment.updateSyncState(operation, {
        status: 'failed',
        error: error.message,
        errorCode: this.categorizeError(error)
      });

      if (attempts < maxAttempts) {
        // Schedule retry
        const nextRetry = assignment.calculateNextRetry(operation);
        assignment.syncState[operation].nextRetry = nextRetry;

        await assignment.save();

        // Queue retry
        if (operation === 'jobUpload') {
          await this.queueJobSync(assignmentId, {
            delay: nextRetry.getTime() - Date.now(),
            priority: 'high' // Increase priority for retries
          });
        } else if (operation === 'dataPolling') {
          await this.queueDataPoll(assignmentId, {
            delay: nextRetry.getTime() - Date.now(),
            priority: 'high'
          });
        }

        debug(`Scheduled retry ${attempts + 1}/${maxAttempts} for ${operation} at ${nextRetry}`);
      } else {
        // Max retries reached
        await assignment.save();
        debug(`Max retries reached for ${operation} on assignment ${assignmentId}`);

        // Emit event for monitoring
        this.emit('maxRetriesReached', {
          assignmentId,
          operation,
          error: error.message,
          attempts
        });
      }
    } catch (saveError) {
      debug(`Error handling sync failure:`, saveError.message);
    }
  }

  categorizeError(error) {
    const message = error.message.toLowerCase();

    if (message.includes('timeout') || message.includes('network')) {
      return 'NETWORK_ERROR';
    } else if (message.includes('authentication') || message.includes('unauthorized')) {
      return 'AUTH_ERROR';
    } else if (message.includes('rate limit')) {
      return 'RATE_LIMIT';
    } else if (message.includes('not found')) {
      return 'NOT_FOUND';
    } else {
      return 'UNKNOWN_ERROR';
    }
  }

  calculateChecksum(data) {
    const crypto = require('crypto');
    return crypto.createHash('sha256').update(data).digest('hex');
  }

  getPriority(priority) {
    const priorities = {
      low: 1,
      normal: 5,
      high: 10
    };
    return priorities[priority] || priorities.normal;
  }

  // Clean up and monitoring methods
  async getQueueStats() {
    const [syncActive, syncWaiting, syncCompleted, syncFailed] = await Promise.all([
      this.jobSyncQueue.getActive(),
      this.jobSyncQueue.getWaiting(),
      this.jobSyncQueue.getCompleted(),
      this.jobSyncQueue.getFailed()
    ]);

    const [pollActive, pollWaiting, pollCompleted, pollFailed] = await Promise.all([
      this.dataPollQueue.getActive(),
      this.dataPollQueue.getWaiting(),
      this.dataPollQueue.getCompleted(),
      this.dataPollQueue.getFailed()
    ]);

    return {
      jobSync: {
        active: syncActive.length,
        waiting: syncWaiting.length,
        completed: syncCompleted.length,
        failed: syncFailed.length
      },
      dataPoll: {
        active: pollActive.length,
        waiting: pollWaiting.length,
        completed: pollCompleted.length,
        failed: pollFailed.length
      }
    };
  }

  async shutdown() {
    debug('Shutting down partner sync queue...');
    await Promise.all([
      this.jobSyncQueue.close(),
      this.dataPollQueue.close()
    ]);
    await this.redis.disconnect();
    debug('Partner sync queue shutdown complete');
  }
}

// Mix EventEmitter methods into the prototype so the queue can emit
// monitoring events (e.g. 'maxRetriesReached'); declaring the class as
// `class PartnerSyncQueue extends EventEmitter` is the more idiomatic equivalent
const EventEmitter = require('events');
Object.setPrototypeOf(PartnerSyncQueue.prototype, EventEmitter.prototype);

module.exports = PartnerSyncQueue;
```
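
The retry scheduling above calls `assignment.calculateNextRetry(operation)`, a JobAssign model method not shown here. As a rough illustration of the backoff it implies, based on the `retryPolicy` fields stored in `partnerConfig` (`baseDelay`, `maxDelay`, `backoffMultiplier`, `jitter`), a standalone sketch might look like this (the function name and the ±25% jitter spread are assumptions, not the model's actual implementation):

```javascript
// Illustrative only: compute the next retry delay (ms) from a retryPolicy
// shaped like the one stored in partnerConfig. Not the JobAssign model method.
function nextRetryDelay(policy, attempts) {
  // Exponential growth: baseDelay * multiplier^attempts, capped at maxDelay
  const raw = policy.baseDelay * Math.pow(policy.backoffMultiplier, attempts);
  const capped = Math.min(raw, policy.maxDelay);
  // Optional jitter: randomize within ±25% of the capped delay
  if (policy.jitter) {
    const spread = capped * 0.25;
    return Math.round(capped - spread + Math.random() * 2 * spread);
  }
  return capped;
}

const policy = { baseDelay: 5000, maxDelay: 300000, backoffMultiplier: 2, jitter: false };
console.log(nextRetryDelay(policy, 0));  // 5000
console.log(nextRetryDelay(policy, 3));  // 40000
console.log(nextRetryDelay(policy, 10)); // 300000 (capped at maxDelay)
```

Jittering the capped delay keeps many failed assignments from retrying in lockstep against the same partner API after an outage.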

### Phase 3: Enhanced Job Controller (Week 4)

#### 3.1 Update Job Assignment Controller

Update the existing `assign_post` function to support partner integration:

```javascript
// File: controllers/job.js (Enhanced assign_post function)
async function assign_post(req, res) {
  const _params = req.body;
  if (!_params || !_params.jobId || !_params.dlOp || !_params.asUsers) {
    AppParamError.throw();
  }

  const job = await Job.findById(_params.jobId).select('dlOp');
  if (!job) AppError.throw(Errors.JOB_NOT_FOUND);

  // Update job download options
  if (_params.dlOp.type !== job.dlOp.type) {
    await Job.updateOne({ _id: _params.jobId }, {
      $set: { "dlOp.type": _params.dlOp.type }
    });
  }

  // Handle removal of assignments
  let _avIds = [];
  if (!utils.isEmptyArray(_params.avUsers)) {
    for (const it of _params.avUsers) {
      if (ObjectId.isValid(it.uid)) _avIds.push(ObjectId(it.uid));
    }
  }

  if (_avIds.length) {
    await JobAssign.deleteMany({
      $or: [
        { job: _params.jobId, status: 0 },
        { job: _params.jobId, user: { $in: _avIds } }
      ]
    });
  } else {
    await JobAssign.deleteMany({ job: _params.jobId, status: 0 });
  }

  // Handle new assignments
  const assignmentResults = [];
  const errors = [];

  if (!utils.isEmptyArray(_params.asUsers)) {
    const asUIds = [];
    for (const it of _params.asUsers) {
      if (ObjectId.isValid(it.uid)) asUIds.push(ObjectId(it.uid));
    }

    const doneJUs = await JobAssign.find({
      job: _params.jobId,
      user: { $in: asUIds },
      status: { $gt: 0 }
    }, 'user').lean();

    const _doneIds = doneJUs ? doneJUs.map(it => it.user) : [];

    const newItems = [];
    for (const it of _params.asUsers) {
      if (!utils.objectIdIn(_doneIds, it.uid)) {
        const partnerType = it.partnerType || 'internal';

        // Validate partner type
        if (partnerType !== 'internal') {
          const partnerRegistry = req.app.locals.partnerRegistry;
          if (!partnerRegistry.isPartnerSupported(partnerType)) {
            errors.push({
              userId: it.uid,
              error: `Unsupported partner type: ${partnerType}`
            });
            continue;
          }
        }

        const assignmentData = {
          user: it.uid,
          job: _params.jobId,
          status: 0,
          partnerType,
          partnerConfig: {
            aircraftId: it.partnerConfig?.aircraftId,
            priority: it.partnerConfig?.priority || 'normal',
            timeout: it.partnerConfig?.timeout || 30000,
            retryPolicy: {
              maxAttempts: 5,
              baseDelay: 5000,
              maxDelay: 300000,
              backoffMultiplier: 2,
              jitter: true
            }
          },
          syncState: {
            jobUpload: {
              status: partnerType === 'internal' ? 'synced' : 'pending',
              attempts: 0
            },
            dataPolling: {
              status: 'idle',
              attempts: 0
            }
          }
        };

        // Add partner-specific metadata
        if (it.partnerConfig) {
          assignmentData.partnerMetadata = it.partnerConfig.customFields || {};
        }

        newItems.push(assignmentData);
      }
    }

    if (newItems.length) {
      const insertedAssignments = await JobAssign.insertMany(newItems);

      // Queue partner sync tasks for non-internal assignments
      const partnerSyncQueue = req.app.locals.partnerSyncQueue;

      for (const assignment of insertedAssignments) {
        if (assignment.partnerType !== 'internal') {
          try {
            const syncJob = await partnerSyncQueue.queueJobSync(assignment._id, {
              priority: assignment.partnerConfig.priority,
              delay: 1000 // Small delay to ensure database consistency
            });

            assignmentResults.push({
              assignmentId: assignment._id,
              userId: assignment.user,
              partnerType: assignment.partnerType,
              status: 'assigned',
              syncStatus: 'queued',
              syncJobId: syncJob.id
            });
          } catch (syncError) {
            errors.push({
              userId: assignment.user,
              partnerType: assignment.partnerType,
              error: `Failed to queue sync: ${syncError.message}`
            });
          }
        } else {
          assignmentResults.push({
            assignmentId: assignment._id,
            userId: assignment.user,
            partnerType: assignment.partnerType,
            status: 'assigned',
            syncStatus: 'synced'
          });
        }
      }
    }
  }

  // Return enhanced response
  res.json({
    ok: true,
    assignments: assignmentResults,
    errors,
    summary: {
      totalAssignments: assignmentResults.length + errors.length,
      successfulAssignments: assignmentResults.length,
      failedAssignments: errors.length,
      partnerAssignments: assignmentResults.filter(a => a.partnerType !== 'internal').length,
      internalAssignments: assignmentResults.filter(a => a.partnerType === 'internal').length
    }
  });
}
```
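
As a usage reference, a request body for the enhanced endpoint could be shaped as follows (IDs and values are illustrative; an `asUsers` entry without `partnerType` falls back to `'internal'`, mirroring the controller's defaulting logic):

```javascript
// Illustrative request body for the enhanced assign endpoint.
// All IDs/values are examples, not real records.
const assignRequest = {
  jobId: '64f0c2e9a1b2c3d4e5f6a7b8',
  dlOp: { type: 1 },
  avUsers: [], // assignments to remove
  asUsers: [
    { uid: '64f0c2e9a1b2c3d4e5f6a7b9' }, // internal assignment (no partnerType)
    {
      uid: '64f0c2e9a1b2c3d4e5f6a7ba',
      partnerType: 'satloc',
      partnerConfig: {
        aircraftId: 'AC001',
        priority: 'high',
        timeout: 45000,
        customFields: { fieldNotes: 'north block' }
      }
    }
  ]
};

// Mirror of the controller's defaulting for illustration
const firstPartnerType = assignRequest.asUsers[0].partnerType || 'internal';
console.log(firstPartnerType); // 'internal'
```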

## Testing Strategy

### Unit Tests

```javascript
// tests/unit/services/partners/SatlocService.test.js
const SatlocService = require('../../../../services/partners/SatlocService');
const nock = require('nock');

describe('SatlocService', () => {
  let satlocService;
  let mockConfig;

  beforeEach(() => {
    mockConfig = {
      code: 'satloc',
      apiConfig: {
        baseUrl: 'https://api.satloc.test',
        timeout: { request: 5000 },
        authentication: {
          credentials: { token: 'test-token' }
        }
      }
    };
    satlocService = new SatlocService(mockConfig);
  });

  describe('uploadJobDefinition', () => {
    it('should upload job successfully', async () => {
      const mockJob = {
        _id: 123,
        name: 'Test Job',
        sprayAreas: [],
        swathWidth: 10,
        measureUnit: true
      };

      nock('https://api.satloc.test')
        .post('/jobs')
        .reply(200, {
          jobId: 'satloc_123',
          version: '1.0'
        });

      const result = await satlocService.uploadJobDefinition(mockJob);

      expect(result.externalJobId).toBe('satloc_123');
      expect(result.metadata.satlocJobId).toBe('satloc_123');
    });

    it('should handle upload failure', async () => {
      const mockJob = { _id: 123, name: 'Test Job' };

      nock('https://api.satloc.test')
        .post('/jobs')
        .reply(500, { message: 'Internal Server Error' });

      await expect(satlocService.uploadJobDefinition(mockJob))
        .rejects.toThrow('Satloc API Error: 500');
    });
  });
});
```

### Integration Tests

```javascript
// tests/integration/partner-assignment.test.js
const request = require('supertest');
const app = require('../../server');
const JobAssign = require('../../model/job_assign');

describe('Partner Assignment Integration', () => {
  beforeEach(async () => {
    await JobAssign.deleteMany({});
  });

  it('should assign job to partner successfully', async () => {
    const assignmentData = {
      jobId: 123,
      dlOp: { type: 1 },
      asUsers: [{
        uid: 'user_123',
        partnerType: 'satloc',
        partnerConfig: {
          aircraftId: 'AC001',
          priority: 'high'
        }
      }]
    };

    const response = await request(app)
      .post('/api/jobs/123/assign')
      .send(assignmentData)
      .expect(200);

    expect(response.body.ok).toBe(true);
    expect(response.body.assignments).toHaveLength(1);
    expect(response.body.assignments[0].partnerType).toBe('satloc');

    // Verify database record
    const assignment = await JobAssign.findById(response.body.assignments[0].assignmentId);
    expect(assignment.partnerType).toBe('satloc');
    expect(assignment.partnerConfig.aircraftId).toBe('AC001');
  });
});
```

## Deployment Guide

### Environment Setup

```bash
# 1. Install dependencies
npm install bull ioredis form-data

# 2. Set environment variables (customer-specific credentials are stored in PartnerSystemUser records)
export REDIS_URL=redis://localhost:6379
export SATLOC_BASE_URL=https://www.satloccloud.com/api/Satloc
export SATLOC_TIMEOUT=30000

# 3. Run database migration
node scripts/migrate-job-assignments.js

# 4. Start queue workers
node workers/partner-sync-worker.js &
```

### Production Checklist

- [ ] Redis cluster configured for queue persistence
- [ ] Partner API credentials securely stored
- [ ] Monitoring and alerting set up
- [ ] Queue worker processes configured with PM2
- [ ] Database indexes created
- [ ] Backup and recovery procedures tested
- [ ] Load testing completed
- [ ] Documentation updated
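
For the PM2 checklist item, an `ecosystem.config.js` along these lines is one option (the instance count, memory limit, and environment values are assumptions to adapt per deployment; the worker path matches the setup commands above):

```javascript
// ecosystem.config.js — illustrative PM2 setup for the sync worker.
// Tune instances and memory limits for your deployment.
module.exports = {
  apps: [
    {
      name: 'partner-sync-worker',
      script: 'workers/partner-sync-worker.js',
      instances: 2,               // parallel queue consumers
      max_memory_restart: '512M', // restart a worker that leaks memory
      env: {
        REDIS_URL: 'redis://localhost:6379',
        SATLOC_BASE_URL: 'https://www.satloccloud.com/api/Satloc'
      }
    }
  ]
};
```

Start it with `pm2 start ecosystem.config.js`; Bull-based workers tolerate multiple instances, since each instance simply consumes jobs from the shared Redis-backed queues.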

This implementation guide provides a comprehensive roadmap for integrating the multi-partner system while maintaining backward compatibility and ensuring robust operation.