const express = require('express'); const { v4: uuidv4 } = require('uuid'); const router = express.Router(); const { createObjectCsvWriter } = require('csv-writer'); const fs = require('fs'); const path = require('path'); const emailService = require('../services/emailService'); module.exports = (pool, query, authMiddleware) => { // Apply authentication middleware to all routes router.use(authMiddleware); /** * Get all email campaigns * GET /api/admin/email-campaigns */ router.get('/', async (req, res, next) => { try { const { status } = req.query; if (!req.user.is_admin) { return res.status(403).json({ error: true, message: 'Admin access required' }); } // Prepare query conditions let condition = ''; const params = []; if (status) { condition = 'WHERE status = $1'; params.push(status); } // Get all campaigns with basic stats const campaignsQuery = ` SELECT ec.*, COALESCE(stats.recipient_count, 0) as recipient_count FROM email_campaigns ec LEFT JOIN ( SELECT campaign_id, COUNT(DISTINCT subscriber_id) as recipient_count FROM campaign_recipients GROUP BY campaign_id ) stats ON ec.id = stats.campaign_id ${condition} ORDER BY CASE WHEN ec.status = 'draft' THEN 1 WHEN ec.status = 'scheduled' THEN 2 WHEN ec.status = 'sending' THEN 3 WHEN ec.status = 'sent' THEN 4 ELSE 5 END, ec.created_at DESC `; const result = await query(campaignsQuery, params); res.json(result.rows); } catch (error) { next(error); } }); /** * Get a single email campaign * GET /api/admin/email-campaigns/:id */ router.get('/:id', async (req, res, next) => { try { const { id } = req.params; if (!req.user.is_admin) { return res.status(403).json({ error: true, message: 'Admin access required' }); } // Get campaign with list info const campaignQuery = ` SELECT ec.*, ml.name as list_name, COUNT(cr.subscriber_id) as recipient_count FROM email_campaigns ec LEFT JOIN campaign_recipients cr ON ec.id = cr.campaign_id LEFT JOIN mailing_lists ml ON ec.list_ids[1] = ml.id WHERE ec.id = $1 GROUP BY ec.id, ml.name `; const campaignResult = await query(campaignQuery, [id]); if (campaignResult.rows.length === 0) { return res.status(404).json({ error: true, message: 'Campaign not found' }); } res.json(campaignResult.rows[0]); } catch (error) { next(error); } }); /** * Create a new email campaign * POST /api/admin/email-campaigns */ router.post('/', async (req, res, next) => { try { const { name, subject, preheader, fromName, fromEmail, content, design, listIds, status = 'draft' } = req.body; if (!req.user.is_admin) { return res.status(403).json({ error: true, message: 'Admin access required' }); } // Validate required fields if (!name || !subject || !fromName || !fromEmail) { return res.status(400).json({ error: true, message: 'Name, subject, from name, and from email are required' }); } // Create new campaign const campaignId = uuidv4(); const result = await query( `INSERT INTO email_campaigns ( id, name, subject, preheader, from_name, from_email, content, design, list_ids, status, created_by ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) RETURNING *`, [ campaignId, name, subject, preheader, fromName, fromEmail, content, design, listIds, status, req.user.id ] ); res.status(201).json(result.rows[0]); } catch (error) { next(error); } }); /** * Update an email campaign * PUT /api/admin/email-campaigns/:id */ router.put('/:id', async (req, res, next) => { try { const { id } = req.params; const { name, subject, preheader, fromName, fromEmail, content, design, listIds, status } = req.body; if (!req.user.is_admin) { return res.status(403).json({ error: true, message: 'Admin access required' }); } // Check if campaign exists const campaignCheck = await query( 'SELECT status FROM email_campaigns WHERE id = $1', [id] ); if (campaignCheck.rows.length === 0) { return res.status(404).json({ error: true, message: 'Campaign not found' }); } // Cannot edit campaigns that are already sent const currentStatus = campaignCheck.rows[0].status; if (currentStatus === 'sent' || currentStatus === 'sending') { return res.status(400).json({ error: true, message: 'Cannot edit a campaign that has already been sent or is currently sending' }); } // Update campaign const result = await query( `UPDATE email_campaigns SET name = $1, subject = $2, preheader = $3, from_name = $4, from_email = $5, content = $6, design = $7, list_ids = $8, status = $9, updated_at = NOW() WHERE id = $10 RETURNING *`, [ name, subject, preheader, fromName, fromEmail, content, design, listIds, status, id ] ); res.json(result.rows[0]); } catch (error) { next(error); } }); /** * Delete an email campaign * DELETE /api/admin/email-campaigns/:id */ router.delete('/:id', async (req, res, next) => { try { const { id } = req.params; if (!req.user.is_admin) { return res.status(403).json({ error: true, message: 'Admin access required' }); } // Check if campaign exists const campaignCheck = await query( 'SELECT status FROM email_campaigns WHERE id = $1', [id] ); if (campaignCheck.rows.length === 0) { return res.status(404).json({ error: true, message: 'Campaign not found' }); } // Cannot delete campaigns that have been sent const currentStatus = campaignCheck.rows[0].status; if (currentStatus === 'sent' || currentStatus === 'sending') { return res.status(400).json({ error: true, message: 'Cannot delete a campaign that has already been sent or is currently sending' }); } // Begin transaction const client = await pool.connect(); try { await client.query('BEGIN'); // Delete campaign recipients await client.query( 'DELETE FROM campaign_recipients WHERE campaign_id = $1', [id] ); // Delete campaign await client.query( 'DELETE FROM email_campaigns WHERE id = $1', [id] ); await client.query('COMMIT'); res.json({ success: true, message: 'Campaign deleted successfully' }); } catch (error) { await client.query('ROLLBACK'); throw error; } finally { client.release(); } } catch (error) { next(error); } }); /** * Duplicate an email campaign * POST /api/admin/email-campaigns/:id/duplicate */ router.post('/:id/duplicate', async (req, res, next) => { try { const { id } = req.params; if (!req.user.is_admin) { return res.status(403).json({ error: true, message: 'Admin access required' }); } // Check if campaign exists const campaignCheck = await query( 'SELECT * FROM email_campaigns WHERE id = $1', [id] ); if (campaignCheck.rows.length === 0) { return res.status(404).json({ error: true, message: 'Campaign not found' }); } const originalCampaign = campaignCheck.rows[0]; // Create new campaign ID const newCampaignId = uuidv4(); // Insert duplicated campaign const result = await query( `INSERT INTO email_campaigns ( id, name, subject, preheader, from_name, from_email, content, design, list_ids, status, created_by ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, 'draft', $10) RETURNING *`, [ newCampaignId, `Copy of ${originalCampaign.name}`, originalCampaign.subject, originalCampaign.preheader, originalCampaign.from_name, originalCampaign.from_email, originalCampaign.content, originalCampaign.design, originalCampaign.list_ids, req.user.id ] ); res.status(201).json(result.rows[0]); } catch (error) { next(error); } }); /** * Preview/send test email * POST /api/admin/email-campaigns/:id/preview */ router.post('/:id/preview', async (req, res, next) => { try { const { id } = req.params; const { email } = req.body; if (!req.user.is_admin) { return res.status(403).json({ error: true, message: 'Admin access required' }); } if (!email) { return res.status(400).json({ error: true, message: 'Email address is required' }); } // Get campaign details const campaignQuery = ` SELECT * FROM email_campaigns WHERE id = $1 `; const campaignResult = await query(campaignQuery, [id]); if (campaignResult.rows.length === 0) { return res.status(404).json({ error: true, message: 'Campaign not found' }); } const campaign = campaignResult.rows[0]; // Send test email with emailService try { await emailService.sendTestEmail({ to: email, subject: `[TEST] ${campaign.subject}`, preheader: campaign.preheader, from: `${campaign.from_name} <${campaign.from_email}>`, content: campaign.content }); res.json({ success: true, message: `Test email sent to ${email}` }); } catch (error) { return res.status(500).json({ error: true, message: `Failed to send test email: ${error.message}` }); } } catch (error) { next(error); } }); /** * Send campaign immediately * POST /api/admin/email-campaigns/:id/send */ router.post('/:id/send', async (req, res, next) => { try { const { id } = req.params; if (!req.user.is_admin) { return res.status(403).json({ error: true, message: 'Admin access required' }); } // Get campaign details const campaignQuery = ` SELECT * FROM email_campaigns WHERE id = $1 `; const campaignResult = await query(campaignQuery, [id]); if (campaignResult.rows.length === 0) { return res.status(404).json({ error: true, message: 'Campaign not found' }); } const campaign = campaignResult.rows[0]; // Ensure campaign is in a valid state for sending if (campaign.status !== 'draft' && campaign.status !== 'scheduled') { return res.status(400).json({ error: true, message: `Cannot send campaign with status "${campaign.status}"` }); } // Begin transaction const client = await pool.connect(); try { await client.query('BEGIN'); // Update campaign status await client.query( `UPDATE email_campaigns SET status = 'sending', sent_at = NOW(), updated_at = NOW() WHERE id = $1`, [id] ); // Get recipients from mailing lists if (!campaign.list_ids || campaign.list_ids.length === 0) { await client.query('ROLLBACK'); return res.status(400).json({ error: true, message: 'Campaign has no mailing lists selected' }); } // Prepare the query parameters for the list IDs const listIdParams = campaign.list_ids; const placeholders = listIdParams.map((_, i) => `$${i + 1}`).join(','); // Get active subscribers from selected mailing lists const subscribersQuery = ` SELECT DISTINCT s.id, s.email, s.first_name, s.last_name FROM subscribers s JOIN mailing_list_subscribers ms ON s.id = ms.subscriber_id WHERE ms.list_id IN (${placeholders}) AND s.status = 'active' `; const subscribersResult = await client.query(subscribersQuery, listIdParams); const subscribers = subscribersResult.rows; if (subscribers.length === 0) { await client.query('ROLLBACK'); return res.status(400).json({ error: true, message: 'Selected mailing lists have no active subscribers' }); } // Add recipients to campaign_recipients table for (const subscriber of subscribers) { await client.query( `INSERT INTO campaign_recipients (campaign_id, subscriber_id) VALUES ($1, $2) ON CONFLICT (campaign_id, subscriber_id) DO NOTHING`, [id, subscriber.id] ); } await client.query('COMMIT'); // Now send the actual emails using a background process // We'll update the campaign status after all emails have been sent sendCampaignEmails(id, campaign, subscribers).then(async () => { // Update campaign status to sent after all emails have been processed await query( `UPDATE email_campaigns SET status = 'sent', updated_at = NOW() WHERE id = $1`, [id] ); console.log(`Campaign ${id} completed sending to all recipients`); }).catch(err => { console.error(`Error sending campaign ${id}:`, err); }); res.json({ success: true, message: `Campaign scheduled for sending to ${subscribers.length} recipients`, id }); } catch (error) { await client.query('ROLLBACK'); throw error; } finally { client.release(); } } catch (error) { next(error); } }); /** * Helper function to send campaign emails to all subscribers * This runs asynchronously after the HTTP response has been sent * * @param {string} campaignId - The campaign ID * @param {Object} campaign - The campaign object * @param {Array} subscribers - Array of subscriber objects * @returns {Promise} */ async function sendCampaignEmails(campaignId, campaign, subscribers) { // Use a smaller batch size to avoid overwhelming the email server const batchSize = 20; const totalSubscribers = subscribers.length; let processed = 0; try { // Process subscribers in batches for (let i = 0; i < totalSubscribers; i += batchSize) { const batch = subscribers.slice(i, i + batchSize); // Send emails in parallel within each batch await Promise.all(batch.map(async (subscriber) => { try { let personalizedContent = personalizeContent(campaign.content, subscriber); let personalizedSubject = personalizeContent(campaign.subject, subscriber); let personalizedPreheader = campaign.preheader ? personalizeContent(campaign.preheader, subscriber) : ''; await emailService.sendCampaignEmail({ to: subscriber.email, subject: personalizedSubject, preheader: personalizedPreheader, from: `${campaign.from_name} <${campaign.from_email}>`, content: personalizedContent, campaignId: campaignId, subscriberId: subscriber.id }); // Log email sending activity await query( `INSERT INTO subscriber_activity (subscriber_id, campaign_id, type) VALUES ($1, $2, 'sent')`, [subscriber.id, campaignId] ); processed++; // Log progress periodically if (processed % 50 === 0 || processed === totalSubscribers) { console.log(`Campaign ${campaignId}: ${processed}/${totalSubscribers} emails sent`); } } catch (error) { console.error(`Error sending email to ${subscriber.email}:`, error); // Log error in subscriber activity try { await query( `INSERT INTO subscriber_activity (subscriber_id, campaign_id, type, details) VALUES ($1, $2, 'error', $3)`, [subscriber.id, campaignId, error.message.substring(0, 255)] ); } catch (logError) { console.error('Error logging subscriber activity:', logError); } } })); // Add a small delay between batches to avoid rate limiting if (i + batchSize < totalSubscribers) { await new Promise(resolve => setTimeout(resolve, 1000)); } } console.log(`Campaign ${campaignId} completed: ${processed}/${totalSubscribers} emails sent successfully`); return processed; } catch (error) { console.error(`Error sending campaign ${campaignId}:`, error); throw error; } } /** * Schedule campaign for later * POST /api/admin/email-campaigns/:id/schedule */ router.post('/:id/schedule', async (req, res, next) => { try { const { id } = req.params; const { scheduledDate } = req.body; if (!req.user.is_admin) { return res.status(403).json({ error: true, message: 'Admin access required' }); } if (!scheduledDate) { return res.status(400).json({ error: true, message: 'Scheduled date is required' }); } // Validate date is in the future const scheduleTime = new Date(scheduledDate); const now = new Date(); if (scheduleTime <= now) { return res.status(400).json({ error: true, message: 'Scheduled date must be in the future' }); } // Get campaign details const campaignQuery = ` SELECT * FROM email_campaigns WHERE id = $1 `; const campaignResult = await query(campaignQuery, [id]); if (campaignResult.rows.length === 0) { return res.status(404).json({ error: true, message: 'Campaign not found' }); } const campaign = campaignResult.rows[0]; // Ensure campaign is in a valid state for scheduling if (campaign.status !== 'draft' && campaign.status !== 'scheduled') { return res.status(400).json({ error: true, message: `Cannot schedule campaign with status "${campaign.status}"` }); } // Update campaign const result = await query( `UPDATE email_campaigns SET status = 'scheduled', scheduled_for = $1, updated_at = NOW() WHERE id = $2 RETURNING *`, [scheduleTime, id] ); res.json({ success: true, message: `Campaign scheduled for ${scheduleTime.toISOString()}`, campaign: result.rows[0] }); } catch (error) { next(error); } }); /** * Get campaign analytics * GET /api/admin/email-campaigns/:id/analytics */ router.get('/:id/analytics', async (req, res, next) => { try { const { id } = req.params; if (!req.user.is_admin) { return res.status(403).json({ error: true, message: 'Admin access required' }); } // Check if campaign exists const campaignCheck = await query( 'SELECT id FROM email_campaigns WHERE id = $1', [id] ); if (campaignCheck.rows.length === 0) { return res.status(404).json({ error: true, message: 'Campaign not found' }); } // Get campaign recipients count const recipientsQuery = ` SELECT COUNT(*) as sent FROM campaign_recipients WHERE campaign_id = $1 `; const recipientsResult = await query(recipientsQuery, [id]); const sent = parseInt(recipientsResult.rows[0].sent, 10); // Get delivery stats const deliveryQuery = ` SELECT COUNT(*) FILTER (WHERE a.type = 'bounce') as bounced, COUNT(*) FILTER (WHERE a.type = 'sent') as delivered FROM campaign_recipients cr LEFT JOIN subscriber_activity a ON cr.subscriber_id = a.subscriber_id AND a.campaign_id = cr.campaign_id WHERE cr.campaign_id = $1 `; const deliveryResult = await query(deliveryQuery, [id]); const bounced = parseInt(deliveryResult.rows[0].bounced, 10); const delivered = sent - bounced; // In reality, delivered would be from webhooks // Get engagement stats const engagementQuery = ` SELECT COUNT(DISTINCT subscriber_id) FILTER (WHERE type = 'open') as opened, COUNT(DISTINCT subscriber_id) FILTER (WHERE type = 'click') as clicked, COUNT(DISTINCT subscriber_id) FILTER (WHERE type = 'unsubscribe') as unsubscribed FROM subscriber_activity WHERE campaign_id = $1 `; const engagementResult = await query(engagementQuery, [id]); const opened = parseInt(engagementResult.rows[0].opened, 10); const clicked = parseInt(engagementResult.rows[0].clicked, 10); const unsubscribed = parseInt(engagementResult.rows[0].unsubscribed, 10); // Get link performance const linksQuery = ` SELECT l.id, l.url, l.text, COUNT(a.id) as clicks, COUNT(DISTINCT a.subscriber_id) as unique_clicks FROM campaign_links l LEFT JOIN subscriber_activity a ON l.id = a.link_id AND a.type = 'click' WHERE l.campaign_id = $1 GROUP BY l.id, l.url, l.text ORDER BY unique_clicks DESC `; const linksResult = await query(linksQuery, [id]); // Get opens by hour (for graphing) const opensByHourQuery = ` SELECT DATE_TRUNC('hour', timestamp) as hour, COUNT(*) as opens FROM subscriber_activity WHERE campaign_id = $1 AND type = 'open' GROUP BY hour ORDER BY hour `; const opensByHourResult = await query(opensByHourQuery, [id]); res.json({ sent, delivered, bounced, opened, clicked, unsubscribed, links: linksResult.rows, opens_by_hour: opensByHourResult.rows }); } catch (error) { next(error); } }); /** * Get campaign subscriber activity * GET /api/admin/email-campaigns/:id/activity */ router.get('/:id/activity', async (req, res, next) => { try { const { id } = req.params; const { page = 0, pageSize = 25, search = '', type } = req.query; if (!req.user.is_admin) { return res.status(403).json({ error: true, message: 'Admin access required' }); } // Check if campaign exists const campaignCheck = await query( 'SELECT id FROM email_campaigns WHERE id = $1', [id] ); if (campaignCheck.rows.length === 0) { return res.status(404).json({ error: true, message: 'Campaign not found' }); } // Prepare query conditions const conditions = ['a.campaign_id = $1']; const queryParams = [id]; let paramIndex = 2; if (search) { conditions.push(`( s.email ILIKE ${paramIndex} OR s.first_name ILIKE ${paramIndex} OR s.last_name ILIKE ${paramIndex} )`); queryParams.push(`%${search}%`); paramIndex++; } if (type && type !== 'all') { conditions.push(`a.type = ${paramIndex}`); queryParams.push(type); paramIndex++; } // Build WHERE clause const whereClause = conditions.length > 0 ? `WHERE ${conditions.join(' AND ')}` : ''; // Get total count const countQuery = ` SELECT COUNT(*) as total FROM subscriber_activity a JOIN subscribers s ON a.subscriber_id = s.id ${whereClause} `; const countResult = await query(countQuery, queryParams); const totalCount = parseInt(countResult.rows[0].total, 10); // Get activity with pagination const offset = page * pageSize; const activityQuery = ` SELECT a.id, a.type, a.timestamp, a.details, a.url, s.email, s.first_name, s.last_name FROM subscriber_activity a JOIN subscribers s ON a.subscriber_id = s.id ${whereClause} ORDER BY a.timestamp DESC LIMIT ${paramIndex} OFFSET ${paramIndex + 1} `; queryParams.push(parseInt(pageSize, 10), offset); console.log("queryParams", queryParams) console.log("activityQuery", activityQuery) const activityResult = await query(activityQuery, queryParams); res.json({ activities: activityResult.rows, totalCount, page: parseInt(page, 10), pageSize: parseInt(pageSize, 10) }); } catch (error) { next(error); } }); /** * Export campaign report * GET /api/admin/email-campaigns/:id/export */ router.get('/:id/export', async (req, res, next) => { try { const { id } = req.params; if (!req.user.is_admin) { return res.status(403).json({ error: true, message: 'Admin access required' }); } // Check if campaign exists const campaignCheck = await query( 'SELECT name FROM email_campaigns WHERE id = $1', [id] ); if (campaignCheck.rows.length === 0) { return res.status(404).json({ error: true, message: 'Campaign not found' }); } const campaignName = campaignCheck.rows[0].name; // Get all subscriber activity for this campaign const activityQuery = ` SELECT s.email, s.first_name, s.last_name, a.type, a.timestamp, a.details, a.url FROM subscriber_activity a JOIN subscribers s ON a.subscriber_id = s.id WHERE a.campaign_id = $1 ORDER BY s.email, a.timestamp `; const activityResult = await query(activityQuery, [id]); const activities = activityResult.rows; // Create a temp file for CSV const timestamp = new Date().toISOString().replace(/[:.]/g, '-'); const fileName = `campaign-report-${campaignName.replace(/[^a-z0-9]/gi, '-').toLowerCase()}-${timestamp}.csv`; const filePath = path.join(__dirname, '../uploads/temp', fileName); // Create CSV writer const csvWriter = createObjectCsvWriter({ path: filePath, header: [ { id: 'email', title: 'Email' }, { id: 'first_name', title: 'First Name' }, { id: 'last_name', title: 'Last Name' }, { id: 'type', title: 'Activity Type' }, { id: 'timestamp', title: 'Timestamp' }, { id: 'details', title: 'Details' }, { id: 'url', title: 'URL' } ] }); // Format dates in the data const formattedActivities = activities.map(activity => ({ ...activity, timestamp: activity.timestamp ? new Date(activity.timestamp).toISOString() : '' })); // Write to CSV await csvWriter.writeRecords(formattedActivities); // Send file and delete after sending res.download(filePath, fileName, (err) => { // Delete the temp file after sending fs.unlink(filePath, (unlinkErr) => { if (unlinkErr) console.error('Error deleting temp file:', unlinkErr); }); if (err) { console.error('Error sending file:', err); if (!res.headersSent) { res.status(500).json({ error: true, message: 'Error sending file' }); } } }); } catch (error) { next(error); } }); return router; }; /** * Replace personalization variables in content * * @param {string} content - The email content * @param {Object} subscriber - The subscriber object * @returns {string} - Content with variables replaced */ function personalizeContent(content, subscriber) { if (!content) return ''; let personalized = content; if (personalized.includes('{{first_name}}')) { personalized = personalized.replace(/{{first_name}}/g, subscriber.first_name || ''); } if (personalized.includes('{{last_name}}')) { personalized = personalized.replace(/{{last_name}}/g, subscriber.last_name || ''); } if (personalized.includes('{{email}}')) { personalized = personalized.replace(/{{email}}/g, subscriber.email || ''); } return personalized; }