process camapign emails in queue (#104)

* process camapign emails in queue

* remove validity

* add bulk queue

* fix typo
This commit is contained in:
KM Koushik
2025-02-08 21:34:48 +11:00
committed by GitHub
parent de85dde705
commit d497c29eba
4 changed files with 152 additions and 71 deletions

View File

@@ -0,0 +1,9 @@
export const SES_WEBHOOK_QUEUE = "ses-webhook";
export const CAMPAIGN_MAIL_PROCESSING_QUEUE = "campaign-emails-processing";
export const DEFAULT_QUEUE_OPTIONS = {
removeOnComplete: true,
removeOnFail: {
age: 30 * 24 * 3600, // 30 days
},
};

View File

@@ -5,6 +5,12 @@ import { env } from "~/env";
import { Campaign, Contact, EmailStatus } from "@prisma/client";
import { validateDomainFromEmail } from "./domain-service";
import { EmailQueueService } from "./email-queue-service";
import { Queue, Worker } from "bullmq";
import { getRedis } from "../redis";
import {
CAMPAIGN_MAIL_PROCESSING_QUEUE,
DEFAULT_QUEUE_OPTIONS,
} from "../queue/queue-constants";
export async function sendCampaign(id: string) {
let campaign = await db.campaign.findUnique({
@@ -196,6 +202,69 @@ type CampainEmail = {
contacts: Array<Contact>;
};
type CampaignEmailJob = {
contact: Contact;
campaign: Campaign;
emailConfig: {
from: string;
subject: string;
replyTo?: string[];
cc?: string[];
bcc?: string[];
teamId: number;
campaignId: string;
previewText?: string;
domainId: number;
region: string;
};
};
async function processContactEmail(jobData: CampaignEmailJob) {
const { contact, campaign, emailConfig } = jobData;
const jsonContent = JSON.parse(campaign.content || "{}");
const renderer = new EmailRenderer(jsonContent);
const unsubscribeUrl = createUnsubUrl(contact.id, emailConfig.campaignId);
const html = await renderer.render({
shouldReplaceVariableValues: true,
variableValues: {
email: contact.email,
firstName: contact.firstName,
lastName: contact.lastName,
},
linkValues: {
"{{unsend_unsubscribe_url}}": unsubscribeUrl,
},
});
// Create single email
const email = await db.email.create({
data: {
to: [contact.email],
replyTo: emailConfig.replyTo,
cc: emailConfig.cc,
bcc: emailConfig.bcc,
from: emailConfig.from,
subject: emailConfig.subject,
html,
text: emailConfig.previewText,
teamId: emailConfig.teamId,
campaignId: emailConfig.campaignId,
contactId: contact.id,
domainId: emailConfig.domainId,
},
});
// Queue email for sending
await EmailQueueService.queueEmail(
email.id,
emailConfig.region,
false,
unsubscribeUrl
);
}
export async function sendCampaignEmail(
campaign: Campaign,
emailData: CampainEmail
@@ -212,76 +281,31 @@ export async function sendCampaignEmail(
previewText,
} = emailData;
const jsonContent = JSON.parse(campaign.content || "{}");
const renderer = new EmailRenderer(jsonContent);
const domain = await validateDomainFromEmail(from, teamId);
const contactWithHtml = await Promise.all(
contacts.map(async (contact) => {
const unsubscribeUrl = createUnsubUrl(contact.id, campaignId);
console.log("Bulk queueing contacts");
return {
...contact,
html: await renderer.render({
shouldReplaceVariableValues: true,
variableValues: {
email: contact.email,
firstName: contact.firstName,
lastName: contact.lastName,
},
linkValues: {
"{{unsend_unsubscribe_url}}": unsubscribeUrl,
},
}),
};
})
);
// Create emails in bulk
await db.email.createMany({
data: contactWithHtml.map((contact) => ({
to: [contact.email],
replyTo: replyTo
? Array.isArray(replyTo)
? replyTo
: [replyTo]
: undefined,
cc: cc ? (Array.isArray(cc) ? cc : [cc]) : undefined,
bcc: bcc ? (Array.isArray(bcc) ? bcc : [bcc]) : undefined,
from,
subject,
html: contact.html,
text: previewText,
teamId,
campaignId,
contactId: contact.id,
domainId: domain.id,
})),
});
// Fetch created emails
const emails = await db.email.findMany({
where: {
teamId,
campaignId,
},
});
// Queue emails
await Promise.all(
emails.map((email) => {
let unsubscribeUrl = undefined;
if (email.contactId) {
unsubscribeUrl = createUnsubUrl(email.contactId, campaignId);
}
EmailQueueService.queueEmail(
email.id,
domain.region,
false,
unsubscribeUrl
);
})
await CampaignEmailService.queueBulkContacts(
contacts.map((contact) => ({
contact,
campaign,
emailConfig: {
from,
subject,
replyTo: replyTo
? Array.isArray(replyTo)
? replyTo
: [replyTo]
: undefined,
cc: cc ? (Array.isArray(cc) ? cc : [cc]) : undefined,
bcc: bcc ? (Array.isArray(bcc) ? bcc : [bcc]) : undefined,
teamId,
campaignId,
previewText,
domainId: domain.id,
region: domain.region,
},
}))
);
}
@@ -327,3 +351,42 @@ export async function updateCampaignAnalytics(
data: updateData,
});
}
const CAMPAIGN_EMAIL_CONCURRENCY = 200;
class CampaignEmailService {
private static campaignQueue = new Queue(CAMPAIGN_MAIL_PROCESSING_QUEUE, {
connection: getRedis(),
});
static worker = new Worker(
CAMPAIGN_MAIL_PROCESSING_QUEUE,
async (job) => {
await processContactEmail(job.data);
},
{
connection: getRedis(),
concurrency: CAMPAIGN_EMAIL_CONCURRENCY,
}
);
static async queueContact(data: CampaignEmailJob) {
return await this.campaignQueue.add(
`contact-${data.contact.id}`,
data,
DEFAULT_QUEUE_OPTIONS
);
}
static async queueBulkContacts(data: CampaignEmailJob[]) {
return await this.campaignQueue.addBulk(
data.map((item) => ({
name: `contact-${item.contact.id}`,
data: item,
opts: {
...DEFAULT_QUEUE_OPTIONS,
},
}))
);
}
}

View File

@@ -6,6 +6,7 @@ import { getConfigurationSetName } from "~/utils/ses-utils";
import { db } from "../db";
import { sendEmailThroughSes, sendEmailWithAttachments } from "../aws/ses";
import { getRedis } from "../redis";
import { DEFAULT_QUEUE_OPTIONS } from "../queue/queue-constants";
function createQueueAndWorker(region: string, quota: number, suffix: string) {
const connection = getRedis();
@@ -107,7 +108,7 @@ export class EmailQueueService {
queue.add(
emailId,
{ emailId, timestamp: Date.now(), unsubUrl, isBulk },
{ jobId: emailId, delay }
{ jobId: emailId, delay, ...DEFAULT_QUEUE_OPTIONS }
);
}

View File

@@ -5,6 +5,10 @@ import { updateCampaignAnalytics } from "./campaign-service";
import { env } from "~/env";
import { getRedis } from "../redis";
import { Queue, Worker } from "bullmq";
import {
DEFAULT_QUEUE_OPTIONS,
SES_WEBHOOK_QUEUE,
} from "../queue/queue-constants";
export async function parseSesHook(data: SesEvent) {
const mailStatus = getEmailStatus(data);
@@ -157,12 +161,12 @@ function getEmailData(data: SesEvent) {
}
export class SesHookParser {
private static sesHookQueue = new Queue("ses-web-hook", {
private static sesHookQueue = new Queue(SES_WEBHOOK_QUEUE, {
connection: getRedis(),
});
private static worker = new Worker(
"ses-web-hook",
SES_WEBHOOK_QUEUE,
async (job) => {
await this.execute(job.data);
},
@@ -177,6 +181,10 @@ export class SesHookParser {
}
static async queue(data: { event: SesEvent; messageId: string }) {
return await this.sesHookQueue.add(data.messageId, data.event);
return await this.sesHookQueue.add(
data.messageId,
data.event,
DEFAULT_QUEUE_OPTIONS
);
}
}