background worker, deploy script
This commit is contained in:
parent
c065835d7a
commit
7b45659a50
6 changed files with 2005 additions and 1 deletions
1788
backend/package-lock.json
generated
Normal file
1788
backend/package-lock.json
generated
Normal file
File diff suppressed because it is too large
Load diff
|
|
@ -10,6 +10,7 @@
|
|||
},
|
||||
"dependencies": {
|
||||
"@aws-sdk/client-s3": "^3.802.0",
|
||||
"@aws-sdk/client-sqs": "^3.799.0",
|
||||
"axios": "^1.9.0",
|
||||
"cors": "^2.8.5",
|
||||
"csv-parser": "^3.2.0",
|
||||
|
|
|
|||
54
backend/src/services/queueService.js
Normal file
54
backend/src/services/queueService.js
Normal file
|
|
@ -0,0 +1,54 @@
|
|||
// services/queueService.js
|
||||
const { SQSClient, SendMessageCommand } = require('@aws-sdk/client-sqs');
|
||||
const config = require('../config');
|
||||
|
||||
class QueueService {
|
||||
constructor() {
|
||||
this.enabled = config.aws.sqs.enabled;
|
||||
this.sqsClient = null;
|
||||
|
||||
if (this.enabled) {
|
||||
this.sqsClient = new SQSClient({ region: config.aws.region });
|
||||
}
|
||||
}
|
||||
|
||||
async sendMessage(queueUrl, messageBody) {
|
||||
if (!this.enabled) {
|
||||
// In self-hosted mode, execute immediately
|
||||
console.log('Direct execution (no queue):', messageBody);
|
||||
await this._processMessageDirectly(messageBody);
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
const command = new SendMessageCommand({
|
||||
QueueUrl: queueUrl,
|
||||
MessageBody: JSON.stringify(messageBody)
|
||||
});
|
||||
|
||||
await this.sqsClient.send(command);
|
||||
} catch (error) {
|
||||
console.error('Queue send error:', error);
|
||||
// Fallback to direct processing
|
||||
await this._processMessageDirectly(messageBody);
|
||||
}
|
||||
}
|
||||
|
||||
async _processMessageDirectly(messageBody) {
|
||||
// Direct processing for self-hosted mode
|
||||
const emailService = require('./emailService');
|
||||
|
||||
switch(messageBody.type) {
|
||||
case 'LOW_STOCK_ALERT':
|
||||
await emailService.sendLowStockAlert(messageBody);
|
||||
break;
|
||||
case 'ORDER_CONFIRMATION':
|
||||
await emailService.sendOrderConfirmation(messageBody);
|
||||
break;
|
||||
// Add other message types
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const queueService = new QueueService();
|
||||
module.exports = queueService;
|
||||
93
backend/src/worker.js
Normal file
93
backend/src/worker.js
Normal file
|
|
@ -0,0 +1,93 @@
|
|||
const { pool, query } = require('./db');
|
||||
const config = require('./config');
|
||||
const notificationService = require('./services/notificationService');
|
||||
const queueService = require('./services/queueService');
|
||||
const { Consumer } = require('sqs-consumer');
|
||||
const { SQSClient } = require('@aws-sdk/client-sqs');
|
||||
|
||||
console.log('Starting worker process...');
|
||||
console.log(`Environment: ${process.env.ENVIRONMENT || 'beta'}`);
|
||||
console.log(`Deployment mode: ${process.env.DEPLOYMENT_MODE || 'self-hosted'}`);
|
||||
|
||||
// Worker initialization
|
||||
async function initWorker() {
|
||||
try {
|
||||
await pool.connect();
|
||||
console.log('Worker connected to database');
|
||||
|
||||
// Set up processing intervals for database-based notifications
|
||||
const interval = process.env.ENVIRONMENT === 'prod' ? 10 * 60 * 1000 : 2 * 60 * 1000;
|
||||
|
||||
setInterval(async () => {
|
||||
try {
|
||||
console.log('Processing low stock notifications...');
|
||||
const processedCount = await notificationService.processLowStockNotifications(pool, query);
|
||||
console.log(`Processed ${processedCount} low stock notifications`);
|
||||
} catch (error) {
|
||||
console.error('Error processing low stock notifications:', error);
|
||||
}
|
||||
}, interval);
|
||||
|
||||
// For cloud mode, add SQS message consumption here
|
||||
if (config.aws && config.aws.sqs && config.aws.sqs.enabled && config.aws.sqs.queueUrl) {
|
||||
console.log(`Starting SQS consumer for queue: ${config.aws.sqs.queueUrl}`);
|
||||
|
||||
// Create SQS consumer
|
||||
const consumer = Consumer.create({
|
||||
queueUrl: config.aws.sqs.queueUrl,
|
||||
handleMessage: async (message) => {
|
||||
try {
|
||||
console.log('Processing SQS message:', message.MessageId);
|
||||
const messageBody = JSON.parse(message.Body);
|
||||
|
||||
// Use the direct processing method from queueService
|
||||
await queueService._processMessageDirectly(messageBody);
|
||||
|
||||
console.log('Successfully processed message:', message.MessageId);
|
||||
} catch (error) {
|
||||
console.error('Error processing message:', message.MessageId, error);
|
||||
throw error; // Rethrow to handle message as failed
|
||||
}
|
||||
},
|
||||
sqs: new SQSClient({ region: config.aws.region }),
|
||||
batchSize: 10,
|
||||
visibilityTimeout: 60,
|
||||
waitTimeSeconds: 20
|
||||
});
|
||||
|
||||
consumer.on('error', (err) => {
|
||||
console.error('SQS consumer error:', err.message);
|
||||
});
|
||||
|
||||
consumer.on('processing_error', (err) => {
|
||||
console.error('SQS message processing error:', err.message);
|
||||
});
|
||||
|
||||
consumer.start();
|
||||
console.log('SQS consumer started');
|
||||
}
|
||||
|
||||
} catch (error) {
|
||||
console.error('Worker initialization error:', error);
|
||||
process.exit(1);
|
||||
}
|
||||
}
|
||||
|
||||
// Start the worker
|
||||
initWorker().catch(err => {
|
||||
console.error('Unhandled worker error:', err);
|
||||
process.exit(1);
|
||||
});
|
||||
|
||||
// Handle graceful shutdown
|
||||
process.on('SIGTERM', async () => {
|
||||
console.log('Worker received SIGTERM, shutting down gracefully');
|
||||
await pool.end();
|
||||
process.exit(0);
|
||||
});
|
||||
|
||||
process.on('SIGINT', async () => {
|
||||
console.log('Worker received SIGINT, shutting down gracefully');
|
||||
await pool.end();
|
||||
process.exit(0);
|
||||
});
|
||||
|
|
@ -24,7 +24,7 @@ services:
|
|||
env_file:
|
||||
- ./backend/.env
|
||||
ports:
|
||||
- "4000:4000"
|
||||
- "${PORT:-4000}:4000"
|
||||
volumes:
|
||||
- ./backend:/app
|
||||
- /app/node_modules
|
||||
|
|
@ -32,6 +32,9 @@ services:
|
|||
depends_on:
|
||||
db:
|
||||
condition: service_healthy
|
||||
redis:
|
||||
condition: service_started
|
||||
required: false
|
||||
networks:
|
||||
- app-network
|
||||
|
||||
|
|
@ -55,8 +58,50 @@ services:
|
|||
networks:
|
||||
- app-network
|
||||
|
||||
# Redis service - only active in cloud mode
|
||||
redis:
|
||||
image: redis:alpine
|
||||
command: ${REDIS_PASSWORD:+--requirepass ${REDIS_PASSWORD}}
|
||||
volumes:
|
||||
- redis_data:/data
|
||||
networks:
|
||||
- app-network
|
||||
profiles:
|
||||
- ${DEPLOYMENT_MODE:-self-hosted} == cloud ? default : tools
|
||||
healthcheck:
|
||||
test: ["CMD", "redis-cli", ${REDIS_PASSWORD:+"-a", "${REDIS_PASSWORD}"}, "ping"]
|
||||
interval: 5s
|
||||
timeout: 5s
|
||||
retries: 5
|
||||
|
||||
# Background worker for SQS and job processing - only active in cloud mode
|
||||
worker:
|
||||
build:
|
||||
context: ./backend
|
||||
dockerfile: Dockerfile
|
||||
command: node src/worker.js
|
||||
env_file:
|
||||
- ./backend/.env
|
||||
environment:
|
||||
- WORKER_MODE=true
|
||||
volumes:
|
||||
- ./backend:/app
|
||||
- /app/node_modules
|
||||
depends_on:
|
||||
db:
|
||||
condition: service_healthy
|
||||
redis:
|
||||
condition: service_started
|
||||
required: false
|
||||
networks:
|
||||
- app-network
|
||||
profiles:
|
||||
- cloud
|
||||
restart: always
|
||||
|
||||
volumes:
|
||||
postgres_data:
|
||||
redis_data:
|
||||
|
||||
networks:
|
||||
app-network:
|
||||
|
|
|
|||
23
start.sh
Executable file
23
start.sh
Executable file
|
|
@ -0,0 +1,23 @@
|
|||
#!/bin/bash
|
||||
# start.sh
|
||||
|
||||
MODE=${1:-self-hosted}
|
||||
|
||||
if [ "$MODE" == "cloud" ]; then
|
||||
echo "Starting in CLOUD mode"
|
||||
# Make sure .env has cloud settings
|
||||
grep -q "DEPLOYMENT_MODE=cloud" ./backend/.env || \
|
||||
sed -i 's/DEPLOYMENT_MODE=.*/DEPLOYMENT_MODE=cloud/' ./backend/.env
|
||||
|
||||
# Start with cloud profile
|
||||
docker compose --profile cloud up -d
|
||||
else
|
||||
echo "Starting in SELF-HOSTED mode"
|
||||
# Make sure .env has self-hosted settings
|
||||
grep -q "DEPLOYMENT_MODE=self-hosted" ./backend/.env || \
|
||||
sed -i 's/DEPLOYMENT_MODE=.*/DEPLOYMENT_MODE=self-hosted/' ./backend/.env
|
||||
|
||||
# Start without extra services
|
||||
docker compose up -d
|
||||
fi
|
||||
echo "Deployment complete in $MODE mode"
|
||||
Loading…
Reference in a new issue