Add unsend campaign feature (#45)

* Add unsend email editor

Add email editor

Add more email editor

Add renderer partial

Add more marketing email features

* Add more campaign feature

* Add variables

* Getting there

* campaign is there mfs

* Add migration
This commit is contained in:
KM Koushik
2024-08-10 10:09:10 +10:00
committed by GitHub
parent 0c072579b9
commit 5ddc0a7bb9
92 changed files with 11766 additions and 338 deletions

View File

@@ -0,0 +1,309 @@
import { EmailRenderer } from "@unsend/email-editor/src/renderer";
import { db } from "../db";
import { createHash } from "crypto";
import { env } from "~/env";
import { Campaign, Contact, EmailStatus } from "@prisma/client";
import { validateDomainFromEmail } from "./domain-service";
import { EmailQueueService } from "./email-queue-service";
export async function sendCampaign(id: string) {
let campaign = await db.campaign.findUnique({
where: { id },
});
if (!campaign) {
throw new Error("Campaign not found");
}
if (!campaign.content) {
throw new Error("No content added for campaign");
}
let jsonContent: Record<string, any>;
try {
jsonContent = JSON.parse(campaign.content);
const renderer = new EmailRenderer(jsonContent);
const html = await renderer.render();
campaign = await db.campaign.update({
where: { id },
data: { html },
});
} catch (error) {
console.error(error);
throw new Error("Failed to parse campaign content");
}
if (!campaign.contactBookId) {
throw new Error("No contact book found for campaign");
}
const contactBook = await db.contactBook.findUnique({
where: { id: campaign.contactBookId },
include: {
contacts: {
where: {
subscribed: true,
},
},
},
});
if (!contactBook) {
throw new Error("Contact book not found");
}
if (!campaign.html) {
throw new Error("No HTML content for campaign");
}
await sendCampaignEmail(campaign, {
campaignId: campaign.id,
from: campaign.from,
subject: campaign.subject,
html: campaign.html,
replyTo: campaign.replyTo,
cc: campaign.cc,
bcc: campaign.bcc,
teamId: campaign.teamId,
contacts: contactBook.contacts,
});
await db.campaign.update({
where: { id },
data: { status: "SENT", total: contactBook.contacts.length },
});
}
export function createUnsubUrl(contactId: string, campaignId: string) {
const unsubId = `${contactId}-${campaignId}`;
const unsubHash = createHash("sha256")
.update(`${unsubId}-${env.NEXTAUTH_SECRET}`)
.digest("hex");
return `${env.NEXTAUTH_URL}/unsubscribe?id=${unsubId}&hash=${unsubHash}`;
}
export async function unsubscribeContact(id: string, hash: string) {
const [contactId, campaignId] = id.split("-");
if (!contactId || !campaignId) {
throw new Error("Invalid unsubscribe link");
}
// Verify the hash
const expectedHash = createHash("sha256")
.update(`${id}-${env.NEXTAUTH_SECRET}`)
.digest("hex");
if (hash !== expectedHash) {
throw new Error("Invalid unsubscribe link");
}
// Update the contact's subscription status
try {
const contact = await db.contact.findUnique({
where: { id: contactId },
});
if (!contact) {
throw new Error("Contact not found");
}
if (contact.subscribed) {
await db.contact.update({
where: { id: contactId },
data: { subscribed: false },
});
await db.campaign.update({
where: { id: campaignId },
data: {
unsubscribed: {
increment: 1,
},
},
});
}
return contact;
} catch (error) {
console.error("Error unsubscribing contact:", error);
throw new Error("Failed to unsubscribe contact");
}
}
export async function subscribeContact(id: string, hash: string) {
const [contactId, campaignId] = id.split("-");
if (!contactId || !campaignId) {
throw new Error("Invalid subscribe link");
}
// Verify the hash
const expectedHash = createHash("sha256")
.update(`${id}-${env.NEXTAUTH_SECRET}`)
.digest("hex");
if (hash !== expectedHash) {
throw new Error("Invalid subscribe link");
}
// Update the contact's subscription status
try {
const contact = await db.contact.findUnique({
where: { id: contactId },
});
if (!contact) {
throw new Error("Contact not found");
}
if (!contact.subscribed) {
await db.contact.update({
where: { id: contactId },
data: { subscribed: true },
});
await db.campaign.update({
where: { id: campaignId },
data: {
unsubscribed: {
decrement: 1,
},
},
});
}
return true;
} catch (error) {
console.error("Error subscribing contact:", error);
throw new Error("Failed to subscribe contact");
}
}
type CampainEmail = {
campaignId: string;
from: string;
subject: string;
html: string;
replyTo?: string[];
cc?: string[];
bcc?: string[];
teamId: number;
contacts: Array<Contact>;
};
export async function sendCampaignEmail(
campaign: Campaign,
emailData: CampainEmail
) {
const { campaignId, from, subject, replyTo, cc, bcc, teamId, contacts } =
emailData;
const jsonContent = JSON.parse(campaign.content || "{}");
const renderer = new EmailRenderer(jsonContent);
const domain = await validateDomainFromEmail(from, teamId);
const contactWithHtml = await Promise.all(
contacts.map(async (contact) => {
const unsubscribeUrl = createUnsubUrl(contact.id, campaignId);
return {
...contact,
html: await renderer.render({
shouldReplaceVariableValues: true,
variableValues: {
email: contact.email,
firstName: contact.firstName,
lastName: contact.lastName,
},
linkValues: {
"{{unsend_unsubscribe_url}}": unsubscribeUrl,
},
}),
};
})
);
// Create emails in bulk
await db.email.createMany({
data: contactWithHtml.map((contact) => ({
to: [contact.email],
replyTo: replyTo
? Array.isArray(replyTo)
? replyTo
: [replyTo]
: undefined,
cc: cc ? (Array.isArray(cc) ? cc : [cc]) : undefined,
bcc: bcc ? (Array.isArray(bcc) ? bcc : [bcc]) : undefined,
from,
subject,
html: contact.html,
teamId,
campaignId,
contactId: contact.id,
domainId: domain.id,
})),
});
// Fetch created emails
const emails = await db.email.findMany({
where: {
teamId,
campaignId,
},
});
// Queue emails
await Promise.all(
emails.map((email) =>
EmailQueueService.queueEmail(email.id, domain.region, false)
)
);
}
export async function updateCampaignAnalytics(
campaignId: string,
emailStatus: EmailStatus
) {
const campaign = await db.campaign.findUnique({
where: { id: campaignId },
});
if (!campaign) {
throw new Error("Campaign not found");
}
const updateData: Record<string, any> = {};
switch (emailStatus) {
case EmailStatus.SENT:
updateData.sent = { increment: 1 };
break;
case EmailStatus.DELIVERED:
updateData.delivered = { increment: 1 };
break;
case EmailStatus.OPENED:
updateData.opened = { increment: 1 };
break;
case EmailStatus.CLICKED:
updateData.clicked = { increment: 1 };
break;
case EmailStatus.BOUNCED:
updateData.bounced = { increment: 1 };
break;
case EmailStatus.COMPLAINED:
updateData.complained = { increment: 1 };
break;
default:
break;
}
await db.campaign.update({
where: { id: campaignId },
data: updateData,
});
}

View File

@@ -0,0 +1,92 @@
import { db } from "../db";
export type ContactInput = {
email: string;
firstName?: string;
lastName?: string;
properties?: Record<string, string>;
subscribed?: boolean;
};
export async function addOrUpdateContact(
contactBookId: string,
contact: ContactInput
) {
const createdContact = await db.contact.upsert({
where: {
contactBookId_email: {
contactBookId,
email: contact.email,
},
},
create: {
contactBookId,
email: contact.email,
firstName: contact.firstName,
lastName: contact.lastName,
properties: contact.properties ?? {},
subscribed: contact.subscribed,
},
update: {
firstName: contact.firstName,
lastName: contact.lastName,
properties: contact.properties ?? {},
subscribed: contact.subscribed,
},
});
return createdContact;
}
export async function updateContact(
contactId: string,
contact: Partial<ContactInput>
) {
return db.contact.update({
where: {
id: contactId,
},
data: contact,
});
}
export async function deleteContact(contactId: string) {
return db.contact.delete({
where: {
id: contactId,
},
});
}
export async function bulkAddContacts(
contactBookId: string,
contacts: Array<ContactInput>
) {
const createdContacts = await Promise.all(
contacts.map((contact) => addOrUpdateContact(contactBookId, contact))
);
return createdContacts;
}
export async function unsubscribeContact(contactId: string) {
await db.contact.update({
where: {
id: contactId,
},
data: {
subscribed: false,
},
});
}
export async function subscribeContact(contactId: string) {
await db.contact.update({
where: {
id: contactId,
},
data: {
subscribed: true,
},
});
}

View File

@@ -4,9 +4,45 @@ import * as tldts from "tldts";
import * as ses from "~/server/aws/ses";
import { db } from "~/server/db";
import { SesSettingsService } from "./ses-settings-service";
import { UnsendApiError } from "../public-api/api-error";
const dnsResolveTxt = util.promisify(dns.resolveTxt);
export async function validateDomainFromEmail(email: string, teamId: number) {
let fromDomain = email.split("@")[1];
if (fromDomain?.endsWith(">")) {
fromDomain = fromDomain.slice(0, -1);
}
if (!fromDomain) {
throw new UnsendApiError({
code: "BAD_REQUEST",
message: "From email is invalid",
});
}
const domain = await db.domain.findUnique({
where: { name: fromDomain, teamId },
});
if (!domain) {
throw new UnsendApiError({
code: "BAD_REQUEST",
message:
"Domain of from email is wrong. Use the domain verified by unsend",
});
}
if (domain.status !== "SUCCESS") {
throw new UnsendApiError({
code: "BAD_REQUEST",
message: "Domain is not verified",
});
}
return domain;
}
export async function createDomain(
teamId: number,
name: string,

View File

@@ -5,54 +5,120 @@ import { getConfigurationSetName } from "~/utils/ses-utils";
import { db } from "../db";
import { sendEmailThroughSes, sendEmailWithAttachments } from "../aws/ses";
import { getRedis } from "../redis";
import { createUnsubUrl } from "./campaign-service";
function createQueueAndWorker(region: string, quota: number, suffix: string) {
const connection = getRedis();
const queueName = `${region}-${suffix}`;
const queue = new Queue(queueName, { connection });
const worker = new Worker(queueName, executeEmail, {
concurrency: quota,
connection,
});
return { queue, worker };
}
export class EmailQueueService {
private static initialized = false;
private static regionQueue = new Map<string, Queue>();
private static regionWorker = new Map<string, Worker>();
public static transactionalQueue = new Map<string, Queue>();
private static transactionalWorker = new Map<string, Worker>();
public static marketingQueue = new Map<string, Queue>();
private static marketingWorker = new Map<string, Worker>();
public static initializeQueue(region: string, quota: number) {
const connection = getRedis();
public static initializeQueue(
region: string,
quota: number,
transactionalQuotaPercentage: number
) {
console.log(`[EmailQueueService]: Initializing queue for region ${region}`);
const queueName = `${region}-transaction`;
const transactionalQuota = Math.floor(
(quota * transactionalQuotaPercentage) / 100
);
const marketingQuota = quota - transactionalQuota;
const queue = new Queue(queueName, { connection });
console.log(
"is transactional queue",
this.transactionalQueue.has(region),
"is marketing queue",
this.marketingQueue.has(region)
);
const worker = new Worker(queueName, executeEmail, {
limiter: {
max: quota,
duration: 1000,
},
concurrency: quota,
connection,
});
if (this.transactionalQueue.has(region)) {
console.log(
`[EmailQueueService]: Updating transactional quota for region ${region} to ${transactionalQuota}`
);
const transactionalWorker = this.transactionalWorker.get(region);
if (transactionalWorker) {
transactionalWorker.concurrency = transactionalQuota;
}
} else {
console.log(
`[EmailQueueService]: Creating transactional queue for region ${region} with quota ${transactionalQuota}`
);
const { queue: transactionalQueue, worker: transactionalWorker } =
createQueueAndWorker(region, transactionalQuota ?? 1, "transaction");
this.transactionalQueue.set(region, transactionalQueue);
this.transactionalWorker.set(region, transactionalWorker);
}
this.regionQueue.set(region, queue);
this.regionWorker.set(region, worker);
if (this.marketingQueue.has(region)) {
console.log(
`[EmailQueueService]: Updating marketing quota for region ${region} to ${marketingQuota}`
);
const marketingWorker = this.marketingWorker.get(region);
if (marketingWorker) {
marketingWorker.concurrency = marketingQuota;
}
} else {
console.log(
`[EmailQueueService]: Creating marketing queue for region ${region} with quota ${marketingQuota}`
);
const { queue: marketingQueue, worker: marketingWorker } =
createQueueAndWorker(region, marketingQuota ?? 1, "marketing");
this.marketingQueue.set(region, marketingQueue);
this.marketingWorker.set(region, marketingWorker);
}
}
public static async queueEmail(emailId: string, region: string) {
public static async queueEmail(
emailId: string,
region: string,
transactional: boolean,
unsubUrl?: string
) {
if (!this.initialized) {
await this.init();
}
const queue = this.regionQueue.get(region);
const queue = transactional
? this.transactionalQueue.get(region)
: this.marketingQueue.get(region);
if (!queue) {
throw new Error(`Queue for region ${region} not found`);
}
queue.add("send-email", { emailId, timestamp: Date.now() });
queue.add("send-email", { emailId, timestamp: Date.now(), unsubUrl });
}
public static async init() {
const sesSettings = await db.sesSetting.findMany();
for (const sesSetting of sesSettings) {
this.initializeQueue(sesSetting.region, sesSetting.sesEmailRateLimit);
this.initializeQueue(
sesSetting.region,
sesSetting.sesEmailRateLimit,
sesSetting.transactionalQuota
);
}
this.initialized = true;
}
}
async function executeEmail(job: Job<{ emailId: string; timestamp: number }>) {
async function executeEmail(
job: Job<{ emailId: string; timestamp: number; unsubUrl?: string }>
) {
console.log(
`[EmailQueueService]: Executing email job ${job.data.emailId}, time elapsed: ${Date.now() - job.data.timestamp}ms`
);
@@ -88,13 +154,15 @@ async function executeEmail(job: Job<{ emailId: string; timestamp: number }>) {
}
console.log(`[EmailQueueService]: Sending email ${email.id}`);
const unsubUrl = job.data.unsubUrl;
try {
const messageId = attachments.length
? await sendEmailWithAttachments({
to: email.to,
from: email.from,
subject: email.subject,
text: email.text ?? undefined,
text: email.text ?? "",
html: email.html ?? undefined,
region: domain?.region ?? env.AWS_DEFAULT_REGION,
configurationSetName,
@@ -105,11 +173,12 @@ async function executeEmail(job: Job<{ emailId: string; timestamp: number }>) {
from: email.from,
subject: email.subject,
replyTo: email.replyTo ?? undefined,
text: email.text ?? undefined,
text: email.text ?? "",
html: email.html ?? undefined,
region: domain?.region ?? env.AWS_DEFAULT_REGION,
configurationSetName,
attachments,
unsubUrl,
});
// Delete attachments after sending the email

View File

@@ -2,7 +2,14 @@ import { EmailContent } from "~/types";
import { db } from "../db";
import { UnsendApiError } from "~/server/public-api/api-error";
import { EmailQueueService } from "./email-queue-service";
import { validateDomainFromEmail } from "./domain-service";
import { Campaign, Contact } from "@prisma/client";
import { EmailRenderer } from "@unsend/email-editor/src/renderer";
import { createUnsubUrl } from "./campaign-service";
/**
Send transactional email
*/
export async function sendEmail(
emailContent: EmailContent & { teamId: number }
) {
@@ -19,29 +26,7 @@ export async function sendEmail(
bcc,
} = emailContent;
let fromDomain = from.split("@")[1];
if (fromDomain?.endsWith(">")) {
fromDomain = fromDomain.slice(0, -1);
}
const domain = await db.domain.findFirst({
where: { teamId, name: fromDomain },
});
if (!domain) {
throw new UnsendApiError({
code: "BAD_REQUEST",
message:
"Domain of from email is wrong. Use the email verified by unsend",
});
}
if (domain.status !== "SUCCESS") {
throw new UnsendApiError({
code: "BAD_REQUEST",
message: "Domain is not verified",
});
}
const domain = await validateDomainFromEmail(from, teamId);
const email = await db.email.create({
data: {
@@ -64,7 +49,7 @@ export async function sendEmail(
});
try {
await EmailQueueService.queueEmail(email.id, domain.region);
await EmailQueueService.queueEmail(email.id, domain.region, true);
} catch (error: any) {
await db.emailEvent.create({
data: {

View File

@@ -1,8 +1,8 @@
import { EmailStatus } from "@prisma/client";
import { SesEvent, SesEventDataKey } from "~/types/aws-types";
import { EmailStatus, Prisma } from "@prisma/client";
import { SesClick, SesEvent, SesEventDataKey } from "~/types/aws-types";
import { db } from "../db";
const STATUS_LIST = Object.values(EmailStatus);
import { updateCampaignAnalytics } from "./campaign-service";
import { env } from "~/env";
export async function parseSesHook(data: SesEvent) {
const mailStatus = getEmailStatus(data);
@@ -34,14 +34,34 @@ export async function parseSesHook(data: SesEvent) {
return true;
}
await db.email.update({
where: {
id: email.id,
},
data: {
latestStatus: getLatestStatus(email.latestStatus, mailStatus),
},
});
// Update the latest status and to avoid race conditions
await db.$executeRaw`
UPDATE "Email"
SET "latestStatus" = CASE
WHEN ${mailStatus}::text::\"EmailStatus\" > "latestStatus" OR "latestStatus" IS NULL
THEN ${mailStatus}::text::\"EmailStatus\"
ELSE "latestStatus"
END
WHERE id = ${email.id}
`;
if (email.campaignId) {
if (
mailStatus !== "CLICKED" ||
!(mailData as SesClick).link.startsWith(`${env.NEXTAUTH_URL}/unsubscribe`)
) {
const mailEvent = await db.emailEvent.findFirst({
where: {
emailId: email.id,
status: mailStatus,
},
});
if (!mailEvent) {
await updateCampaignAnalytics(email.campaignId, mailStatus);
}
}
}
await db.emailEvent.create({
data: {
@@ -89,12 +109,3 @@ function getEmailData(data: SesEvent) {
return data[eventType.toLowerCase() as SesEventDataKey];
}
}
function getLatestStatus(
currentEmailStatus: EmailStatus,
incomingStatus: EmailStatus
) {
const index = STATUS_LIST.indexOf(currentEmailStatus);
const incomingIndex = STATUS_LIST.indexOf(incomingStatus);
return STATUS_LIST[Math.max(index, incomingIndex)] ?? incomingStatus;
}

View File

@@ -52,9 +52,13 @@ export class SesSettingsService {
public static async createSesSetting({
region,
unsendUrl,
sendingRateLimit,
transactionalQuota,
}: {
region: string;
unsendUrl: string;
sendingRateLimit: number;
transactionalQuota: number;
}) {
await this.checkInitialized();
if (this.cache[region]) {
@@ -80,12 +84,62 @@ export class SesSettingsService {
region,
callbackUrl: `${parsedUrl}/api/ses_callback`,
topic: `${idPrefix}-${region}-unsend`,
sesEmailRateLimit: sendingRateLimit,
transactionalQuota,
idPrefix,
},
});
await createSettingInAws(setting);
EmailQueueService.initializeQueue(region, setting.sesEmailRateLimit);
EmailQueueService.initializeQueue(
region,
setting.sesEmailRateLimit,
setting.transactionalQuota
);
console.log(
EmailQueueService.transactionalQueue,
EmailQueueService.marketingQueue
);
await this.invalidateCache();
}
public static async updateSesSetting({
id,
sendingRateLimit,
transactionalQuota,
}: {
id: string;
sendingRateLimit: number;
transactionalQuota: number;
}) {
await this.checkInitialized();
const setting = await db.sesSetting.update({
where: {
id,
},
data: {
transactionalQuota,
sesEmailRateLimit: sendingRateLimit,
},
});
console.log(
EmailQueueService.transactionalQueue,
EmailQueueService.marketingQueue
);
EmailQueueService.initializeQueue(
setting.region,
setting.sesEmailRateLimit,
setting.transactionalQuota
);
console.log(
EmailQueueService.transactionalQueue,
EmailQueueService.marketingQueue
);
await this.invalidateCache();
}