diff --git a/apps/web/src/server/queue/queue-constants.ts b/apps/web/src/server/queue/queue-constants.ts new file mode 100644 index 0000000..e00ce95 --- /dev/null +++ b/apps/web/src/server/queue/queue-constants.ts @@ -0,0 +1,9 @@ +export const SES_WEBHOOK_QUEUE = "ses-webhook"; +export const CAMPAIGN_MAIL_PROCESSING_QUEUE = "campaign-emails-processing"; + +export const DEFAULT_QUEUE_OPTIONS = { + removeOnComplete: true, + removeOnFail: { + age: 30 * 24 * 3600, // 30 days + }, +}; diff --git a/apps/web/src/server/service/campaign-service.ts b/apps/web/src/server/service/campaign-service.ts index c245e0d..890d04b 100644 --- a/apps/web/src/server/service/campaign-service.ts +++ b/apps/web/src/server/service/campaign-service.ts @@ -5,6 +5,12 @@ import { env } from "~/env"; import { Campaign, Contact, EmailStatus } from "@prisma/client"; import { validateDomainFromEmail } from "./domain-service"; import { EmailQueueService } from "./email-queue-service"; +import { Queue, Worker } from "bullmq"; +import { getRedis } from "../redis"; +import { + CAMPAIGN_MAIL_PROCESSING_QUEUE, + DEFAULT_QUEUE_OPTIONS, +} from "../queue/queue-constants"; export async function sendCampaign(id: string) { let campaign = await db.campaign.findUnique({ @@ -196,6 +202,69 @@ type CampainEmail = { contacts: Array; }; +type CampaignEmailJob = { + contact: Contact; + campaign: Campaign; + emailConfig: { + from: string; + subject: string; + replyTo?: string[]; + cc?: string[]; + bcc?: string[]; + teamId: number; + campaignId: string; + previewText?: string; + domainId: number; + region: string; + }; +}; + +async function processContactEmail(jobData: CampaignEmailJob) { + const { contact, campaign, emailConfig } = jobData; + const jsonContent = JSON.parse(campaign.content || "{}"); + const renderer = new EmailRenderer(jsonContent); + + const unsubscribeUrl = createUnsubUrl(contact.id, emailConfig.campaignId); + + const html = await renderer.render({ + shouldReplaceVariableValues: true, + variableValues: { + email: contact.email, + firstName: contact.firstName, + lastName: contact.lastName, + }, + linkValues: { + "{{unsend_unsubscribe_url}}": unsubscribeUrl, + }, + }); + + // Create single email + const email = await db.email.create({ + data: { + to: [contact.email], + replyTo: emailConfig.replyTo, + cc: emailConfig.cc, + bcc: emailConfig.bcc, + from: emailConfig.from, + subject: emailConfig.subject, + html, + text: emailConfig.previewText, + teamId: emailConfig.teamId, + campaignId: emailConfig.campaignId, + contactId: contact.id, + domainId: emailConfig.domainId, + }, + }); + + // Queue email for sending + await EmailQueueService.queueEmail( + email.id, + emailConfig.region, + false, + unsubscribeUrl + ); +} + export async function sendCampaignEmail( campaign: Campaign, emailData: CampainEmail @@ -212,76 +281,31 @@ export async function sendCampaignEmail( previewText, } = emailData; - const jsonContent = JSON.parse(campaign.content || "{}"); - const renderer = new EmailRenderer(jsonContent); - const domain = await validateDomainFromEmail(from, teamId); - const contactWithHtml = await Promise.all( - contacts.map(async (contact) => { - const unsubscribeUrl = createUnsubUrl(contact.id, campaignId); + console.log("Bulk queueing contacts"); - return { - ...contact, - html: await renderer.render({ - shouldReplaceVariableValues: true, - variableValues: { - email: contact.email, - firstName: contact.firstName, - lastName: contact.lastName, - }, - linkValues: { - "{{unsend_unsubscribe_url}}": unsubscribeUrl, - }, - }), - }; - }) - ); - - // Create emails in bulk - await db.email.createMany({ - data: contactWithHtml.map((contact) => ({ - to: [contact.email], - replyTo: replyTo - ? Array.isArray(replyTo) - ? replyTo - : [replyTo] - : undefined, - cc: cc ? (Array.isArray(cc) ? cc : [cc]) : undefined, - bcc: bcc ? (Array.isArray(bcc) ? bcc : [bcc]) : undefined, - from, - subject, - html: contact.html, - text: previewText, - teamId, - campaignId, - contactId: contact.id, - domainId: domain.id, - })), - }); - - // Fetch created emails - const emails = await db.email.findMany({ - where: { - teamId, - campaignId, - }, - }); - - // Queue emails - await Promise.all( - emails.map((email) => { - let unsubscribeUrl = undefined; - if (email.contactId) { - unsubscribeUrl = createUnsubUrl(email.contactId, campaignId); - } - EmailQueueService.queueEmail( - email.id, - domain.region, - false, - unsubscribeUrl - ); - }) + await CampaignEmailService.queueBulkContacts( + contacts.map((contact) => ({ + contact, + campaign, + emailConfig: { + from, + subject, + replyTo: replyTo + ? Array.isArray(replyTo) + ? replyTo + : [replyTo] + : undefined, + cc: cc ? (Array.isArray(cc) ? cc : [cc]) : undefined, + bcc: bcc ? (Array.isArray(bcc) ? bcc : [bcc]) : undefined, + teamId, + campaignId, + previewText, + domainId: domain.id, + region: domain.region, + }, + })) ); } @@ -327,3 +351,42 @@ export async function updateCampaignAnalytics( data: updateData, }); } + +const CAMPAIGN_EMAIL_CONCURRENCY = 200; + +class CampaignEmailService { + private static campaignQueue = new Queue(CAMPAIGN_MAIL_PROCESSING_QUEUE, { + connection: getRedis(), + }); + + static worker = new Worker( + CAMPAIGN_MAIL_PROCESSING_QUEUE, + async (job) => { + await processContactEmail(job.data); + }, + { + connection: getRedis(), + concurrency: CAMPAIGN_EMAIL_CONCURRENCY, + } + ); + + static async queueContact(data: CampaignEmailJob) { + return await this.campaignQueue.add( + `contact-${data.contact.id}`, + data, + DEFAULT_QUEUE_OPTIONS + ); + } + + static async queueBulkContacts(data: CampaignEmailJob[]) { + return await this.campaignQueue.addBulk( + data.map((item) => ({ + name: `contact-${item.contact.id}`, + data: item, + opts: { + ...DEFAULT_QUEUE_OPTIONS, + }, + })) + ); + } +} diff --git a/apps/web/src/server/service/email-queue-service.ts b/apps/web/src/server/service/email-queue-service.ts index 37d8524..0954ed9 100644 --- a/apps/web/src/server/service/email-queue-service.ts +++ b/apps/web/src/server/service/email-queue-service.ts @@ -6,6 +6,7 @@ import { getConfigurationSetName } from "~/utils/ses-utils"; import { db } from "../db"; import { sendEmailThroughSes, sendEmailWithAttachments } from "../aws/ses"; import { getRedis } from "../redis"; +import { DEFAULT_QUEUE_OPTIONS } from "../queue/queue-constants"; function createQueueAndWorker(region: string, quota: number, suffix: string) { const connection = getRedis(); @@ -107,7 +108,7 @@ export class EmailQueueService { queue.add( emailId, { emailId, timestamp: Date.now(), unsubUrl, isBulk }, - { jobId: emailId, delay } + { jobId: emailId, delay, ...DEFAULT_QUEUE_OPTIONS } ); } diff --git a/apps/web/src/server/service/ses-hook-parser.ts b/apps/web/src/server/service/ses-hook-parser.ts index f09aa22..4de3130 100644 --- a/apps/web/src/server/service/ses-hook-parser.ts +++ b/apps/web/src/server/service/ses-hook-parser.ts @@ -5,6 +5,10 @@ import { updateCampaignAnalytics } from "./campaign-service"; import { env } from "~/env"; import { getRedis } from "../redis"; import { Queue, Worker } from "bullmq"; +import { + DEFAULT_QUEUE_OPTIONS, + SES_WEBHOOK_QUEUE, +} from "../queue/queue-constants"; export async function parseSesHook(data: SesEvent) { const mailStatus = getEmailStatus(data); @@ -157,12 +161,12 @@ function getEmailData(data: SesEvent) { } export class SesHookParser { - private static sesHookQueue = new Queue("ses-web-hook", { + private static sesHookQueue = new Queue(SES_WEBHOOK_QUEUE, { connection: getRedis(), }); private static worker = new Worker( - "ses-web-hook", + SES_WEBHOOK_QUEUE, async (job) => { await this.execute(job.data); }, @@ -177,6 +181,10 @@ export class SesHookParser { } static async queue(data: { event: SesEvent; messageId: string }) { - return await this.sesHookQueue.add(data.messageId, data.event); + return await this.sesHookQueue.add( + data.messageId, + data.event, + DEFAULT_QUEUE_OPTIONS + ); } }