Improve Self host setup (#30)

* Add self host setup

* Improve blunders

* Move to bull mq

* More changes

* Add example code for sending test emails
This commit is contained in:
KM Koushik
2024-06-24 08:21:37 +10:00
committed by GitHub
parent 8a2769621c
commit f77a8829be
67 changed files with 1771 additions and 688 deletions

View File

@@ -1,38 +0,0 @@
import { db } from "../db";
import { JsonValue } from "@prisma/client/runtime/library";
export class AppSettingsService {
private static cache: Record<string, JsonValue> = {};
public static async getSetting(key: string) {
if (!this.cache[key]) {
const setting = await db.appSetting.findUnique({
where: { key },
});
if (setting) {
this.cache[key] = setting.value;
} else {
return null;
}
}
return this.cache[key];
}
public static async setSetting(key: string, value: string) {
await db.appSetting.upsert({
where: { key },
update: { value },
create: { key, value },
});
this.cache[key] = value;
return value;
}
public static async initializeCache(): Promise<void> {
const settings = await db.appSetting.findMany();
settings.forEach((setting) => {
this.cache[setting.key] = setting.value;
});
}
}

View File

@@ -3,18 +3,29 @@ import util from "util";
import * as tldts from "tldts";
import * as ses from "~/server/aws/ses";
import { db } from "~/server/db";
import { SesSettingsService } from "./ses-settings-service";
const dnsResolveTxt = util.promisify(dns.resolveTxt);
export async function createDomain(teamId: number, name: string) {
export async function createDomain(
teamId: number,
name: string,
region: string
) {
const domainStr = tldts.getDomain(name);
if (!domainStr) {
throw new Error("Invalid domain");
}
const setting = await SesSettingsService.getSetting(region);
if (!setting) {
throw new Error("Ses setting not found");
}
const subdomain = tldts.getSubdomain(name);
const publicKey = await ses.addDomain(name);
const publicKey = await ses.addDomain(name, region);
const domain = await db.domain.create({
data: {
@@ -22,6 +33,7 @@ export async function createDomain(teamId: number, name: string) {
publicKey,
teamId,
subdomain,
region,
},
});

View File

@@ -0,0 +1,135 @@
import { Job, Queue, Worker } from "bullmq";
import { env } from "~/env";
import { EmailAttachment } from "~/types";
import { getConfigurationSetName } from "~/utils/ses-utils";
import { db } from "../db";
import { sendEmailThroughSes, sendEmailWithAttachments } from "../aws/ses";
import { getRedis } from "../redis";
export class EmailQueueService {
private static initialized = false;
private static regionQueue = new Map<string, Queue>();
private static regionWorker = new Map<string, Worker>();
public static initializeQueue(region: string, quota: number) {
const connection = getRedis();
console.log(`[EmailQueueService]: Initializing queue for region ${region}`);
const queueName = `${region}-transaction`;
const queue = new Queue(queueName, { connection });
const worker = new Worker(queueName, executeEmail, {
limiter: {
max: quota,
duration: 1000,
},
concurrency: quota,
connection,
});
this.regionQueue.set(region, queue);
this.regionWorker.set(region, worker);
}
public static async queueEmail(emailId: string, region: string) {
if (!this.initialized) {
await this.init();
}
const queue = this.regionQueue.get(region);
if (!queue) {
throw new Error(`Queue for region ${region} not found`);
}
queue.add("send-email", { emailId, timestamp: Date.now() });
}
public static async init() {
const sesSettings = await db.sesSetting.findMany();
for (const sesSetting of sesSettings) {
this.initializeQueue(sesSetting.region, sesSetting.sesEmailRateLimit);
}
this.initialized = true;
}
}
async function executeEmail(job: Job<{ emailId: string; timestamp: number }>) {
console.log(
`[EmailQueueService]: Executing email job ${job.data.emailId}, time elapsed: ${Date.now() - job.data.timestamp}ms`
);
const email = await db.email.findUnique({
where: { id: job.data.emailId },
});
const domain = email?.domainId
? await db.domain.findUnique({
where: { id: email?.domainId },
})
: null;
if (!email) {
console.log(`[EmailQueueService]: Email not found, skipping`);
return;
}
const attachments: Array<EmailAttachment> = email.attachments
? JSON.parse(email.attachments)
: [];
const configurationSetName = await getConfigurationSetName(
domain?.clickTracking ?? false,
domain?.openTracking ?? false,
domain?.region ?? env.AWS_DEFAULT_REGION
);
if (!configurationSetName) {
console.log(`[EmailQueueService]: Configuration set not found, skipping`);
return;
}
console.log(`[EmailQueueService]: Sending email ${email.id}`);
try {
const messageId = attachments.length
? await sendEmailWithAttachments({
to: email.to,
from: email.from,
subject: email.subject,
text: email.text ?? undefined,
html: email.html ?? undefined,
region: domain?.region ?? env.AWS_DEFAULT_REGION,
configurationSetName,
attachments,
})
: await sendEmailThroughSes({
to: email.to,
from: email.from,
subject: email.subject,
replyTo: email.replyTo ?? undefined,
text: email.text ?? undefined,
html: email.html ?? undefined,
region: domain?.region ?? env.AWS_DEFAULT_REGION,
configurationSetName,
attachments,
});
// Delete attachments after sending the email
await db.email.update({
where: { id: email.id },
data: { sesEmailId: messageId, attachments: undefined },
});
} catch (error: any) {
await db.emailEvent.create({
data: {
emailId: email.id,
status: "FAILED",
data: {
error: error.toString(),
},
},
});
await db.email.update({
where: { id: email.id },
data: { latestStatus: "FAILED" },
});
}
}

View File

@@ -1,13 +1,23 @@
import { EmailContent } from "~/types";
import { db } from "../db";
import { UnsendApiError } from "~/server/public-api/api-error";
import { queueEmail } from "./job-service";
import { EmailQueueService } from "./email-queue-service";
export async function sendEmail(
emailContent: EmailContent & { teamId: number }
) {
const { to, from, subject, text, html, teamId, attachments, replyTo } =
emailContent;
const {
to,
from,
subject,
text,
html,
teamId,
attachments,
replyTo,
cc,
bcc,
} = emailContent;
const fromDomain = from.split("@")[1];
@@ -32,10 +42,16 @@ export async function sendEmail(
const email = await db.email.create({
data: {
to,
to: Array.isArray(to) ? to : [to],
from,
subject,
replyTo,
replyTo: replyTo
? Array.isArray(replyTo)
? replyTo
: [replyTo]
: undefined,
cc: cc ? (Array.isArray(cc) ? cc : [cc]) : undefined,
bcc: bcc ? (Array.isArray(bcc) ? bcc : [bcc]) : undefined,
text,
html,
teamId,
@@ -44,7 +60,24 @@ export async function sendEmail(
},
});
queueEmail(email.id);
try {
await EmailQueueService.queueEmail(email.id, domain.region);
} catch (error: any) {
await db.emailEvent.create({
data: {
emailId: email.id,
status: "FAILED",
data: {
error: error.toString(),
},
},
});
await db.email.update({
where: { id: email.id },
data: { latestStatus: "FAILED" },
});
throw error;
}
return email;
}

View File

@@ -1,109 +0,0 @@
import pgBoss from "pg-boss";
import { env } from "~/env";
import { EmailAttachment } from "~/types";
import { db } from "~/server/db";
import {
sendEmailThroughSes,
sendEmailWithAttachments,
} from "~/server/aws/ses";
import { getConfigurationSetName } from "~/utils/ses-utils";
import { sendToDiscord } from "./notification-service";
const boss = new pgBoss({
connectionString: env.DATABASE_URL,
archiveCompletedAfterSeconds: 60 * 60 * 24, // 24 hours
deleteAfterDays: 7, // 7 days
});
let started = false;
export async function getBoss() {
if (!started) {
await boss.start();
await boss.work(
"send_email",
{
teamConcurrency: env.SES_QUEUE_LIMIT,
teamSize: env.SES_QUEUE_LIMIT,
teamRefill: true,
},
executeEmail
);
boss.on("error", async (error) => {
console.error(error);
sendToDiscord(
`Error in pg-boss: ${error.name} \n ${error.cause}\n ${error.message}\n ${error.stack}`
);
await boss.stop();
started = false;
});
started = true;
}
return boss;
}
export async function queueEmail(emailId: string) {
const boss = await getBoss();
await boss.send("send_email", { emailId, timestamp: Date.now() });
}
async function executeEmail(
job: pgBoss.Job<{ emailId: string; timestamp: number }>
) {
console.log(
`[EmailJob]: Executing email job ${job.data.emailId}, time elapsed: ${Date.now() - job.data.timestamp}ms`
);
const email = await db.email.findUnique({
where: { id: job.data.emailId },
});
const domain = email?.domainId
? await db.domain.findUnique({
where: { id: email?.domainId },
})
: null;
if (!email) {
console.log(`[EmailJob]: Email not found, skipping`);
return;
}
const attachments: Array<EmailAttachment> = email.attachments
? JSON.parse(email.attachments)
: [];
const messageId = attachments.length
? await sendEmailWithAttachments({
to: email.to,
from: email.from,
subject: email.subject,
text: email.text ?? undefined,
html: email.html ?? undefined,
region: domain?.region ?? env.AWS_DEFAULT_REGION,
configurationSetName: getConfigurationSetName(
domain?.clickTracking ?? false,
domain?.openTracking ?? false
),
attachments,
})
: await sendEmailThroughSes({
to: email.to,
from: email.from,
subject: email.subject,
replyTo: email.replyTo ?? undefined,
text: email.text ?? undefined,
html: email.html ?? undefined,
region: domain?.region ?? env.AWS_DEFAULT_REGION,
configurationSetName: getConfigurationSetName(
domain?.clickTracking ?? false,
domain?.openTracking ?? false
),
attachments,
});
await db.email.update({
where: { id: email.id },
data: { sesEmailId: messageId, attachments: undefined },
});
}

View File

@@ -2,6 +2,8 @@ import { EmailStatus } from "@prisma/client";
import { SesEvent, SesEventDataKey } from "~/types/aws-types";
import { db } from "../db";
const STATUS_LIST = Object.values(EmailStatus);
export async function parseSesHook(data: SesEvent) {
const mailStatus = getEmailStatus(data);
@@ -30,21 +32,12 @@ export async function parseSesHook(data: SesEvent) {
id: email.id,
},
data: {
latestStatus: mailStatus,
latestStatus: getLatestStatus(email.latestStatus, mailStatus),
},
});
await db.emailEvent.upsert({
where: {
emailId_status: {
emailId: email.id,
status: mailStatus,
},
},
update: {
data: mailData as any,
},
create: {
await db.emailEvent.create({
data: {
emailId: email.id,
status: mailStatus,
data: mailData as any,
@@ -89,3 +82,12 @@ function getEmailData(data: SesEvent) {
return data[eventType.toLowerCase() as SesEventDataKey];
}
}
function getLatestStatus(
currentEmailStatus: EmailStatus,
incomingStatus: EmailStatus
) {
const index = STATUS_LIST.indexOf(currentEmailStatus);
const incomingIndex = STATUS_LIST.indexOf(incomingStatus);
return STATUS_LIST[Math.max(index, incomingIndex)] ?? incomingStatus;
}

View File

@@ -5,8 +5,9 @@ import { customAlphabet } from "nanoid";
import * as sns from "~/server/aws/sns";
import * as ses from "~/server/aws/ses";
import { EventType } from "@aws-sdk/client-sesv2";
import { EmailQueueService } from "./email-queue-service";
const nanoid = customAlphabet("1234567890abcdef", 10);
const nanoid = customAlphabet("1234567890abcdefghijklmnopqrstuvwxyz", 10);
const GENERAL_EVENTS: EventType[] = [
"BOUNCE",
@@ -21,15 +22,26 @@ const GENERAL_EVENTS: EventType[] = [
export class SesSettingsService {
private static cache: Record<string, SesSetting> = {};
private static topicArns: Array<string> = [];
private static initialized = false;
public static getSetting(region = env.AWS_DEFAULT_REGION): SesSetting | null {
public static async getSetting(
region = env.AWS_DEFAULT_REGION
): Promise<SesSetting | null> {
await this.checkInitialized();
if (this.cache[region]) {
return this.cache[region] as SesSetting;
}
return null;
}
public static getAllSettings() {
public static async getTopicArns() {
await this.checkInitialized();
return this.topicArns;
}
public static async getAllSettings() {
await this.checkInitialized();
return Object.values(this.cache);
}
@@ -46,15 +58,20 @@ export class SesSettingsService {
region: string;
unsendUrl: string;
}) {
await this.checkInitialized();
if (this.cache[region]) {
throw new Error(`SesSetting for region ${region} already exists`);
}
const unsendUrlValidation = await isValidUnsendUrl(unsendUrl);
const parsedUrl = unsendUrl.endsWith("/")
? unsendUrl.substring(0, unsendUrl.length - 1)
: unsendUrl;
const unsendUrlValidation = await isValidUnsendUrl(parsedUrl);
if (!unsendUrlValidation.isValid) {
throw new Error(
`Unsend URL ${unsendUrl} is not valid, status: ${unsendUrlValidation.code} ${unsendUrlValidation.error}`
`Unsend URL: ${unsendUrl} is not valid, status: ${unsendUrlValidation.code} message:${unsendUrlValidation.error}`
);
}
@@ -63,28 +80,35 @@ export class SesSettingsService {
const setting = await db.sesSetting.create({
data: {
region,
callbackUrl: `${unsendUrl}/api/ses_callback`,
callbackUrl: `${parsedUrl}/api/ses_callback`,
topic: `${idPrefix}-${region}-unsend`,
idPrefix,
},
});
await createSettingInAws(setting);
EmailQueueService.initializeQueue(region, setting.sesEmailRateLimit);
this.invalidateCache();
await this.invalidateCache();
}
public static async init() {
public static async checkInitialized() {
if (!this.initialized) {
await this.invalidateCache();
this.initialized = true;
}
}
static async invalidateCache() {
this.cache = {};
const settings = await db.sesSetting.findMany();
settings.forEach((setting) => {
this.cache[setting.region] = setting;
if (setting.topicArn) {
this.topicArns.push(setting.topicArn);
}
});
}
static invalidateCache() {
this.cache = {};
this.init();
}
}
async function createSettingInAws(setting: SesSetting) {
@@ -95,18 +119,13 @@ async function createSettingInAws(setting: SesSetting) {
* Creates a new topic in AWS and subscribes the callback URL to it
*/
async function registerTopicInAws(setting: SesSetting) {
const topicArn = await sns.createTopic(setting.topic);
const topicArn = await sns.createTopic(setting.topic, setting.region);
if (!topicArn) {
throw new Error("Failed to create SNS topic");
}
await sns.subscribeEndpoint(
topicArn,
`${setting.callbackUrl}/api/ses_callback`
);
return await db.sesSetting.update({
const _setting = await db.sesSetting.update({
where: {
id: setting.id,
},
@@ -114,6 +133,17 @@ async function registerTopicInAws(setting: SesSetting) {
topicArn,
},
});
// Invalidate the cache to update the topicArn list
SesSettingsService.invalidateCache();
await sns.subscribeEndpoint(
topicArn,
`${setting.callbackUrl}`,
setting.region
);
return _setting;
}
/**
@@ -133,28 +163,32 @@ async function registerConfigurationSet(setting: SesSetting) {
const generalStatus = await ses.addWebhookConfiguration(
configGeneral,
setting.topicArn,
GENERAL_EVENTS
GENERAL_EVENTS,
setting.region
);
const configClick = `${setting.idPrefix}-${setting.region}-unsend-click`;
const clickStatus = await ses.addWebhookConfiguration(
configClick,
setting.topicArn,
[...GENERAL_EVENTS, "CLICK"]
[...GENERAL_EVENTS, "CLICK"],
setting.region
);
const configOpen = `${setting.idPrefix}-${setting.region}-unsend-open`;
const openStatus = await ses.addWebhookConfiguration(
configOpen,
setting.topicArn,
[...GENERAL_EVENTS, "OPEN"]
[...GENERAL_EVENTS, "OPEN"],
setting.region
);
const configFull = `${setting.idPrefix}-${setting.region}-unsend-full`;
const fullStatus = await ses.addWebhookConfiguration(
configFull,
setting.topicArn,
[...GENERAL_EVENTS, "CLICK", "OPEN"]
[...GENERAL_EVENTS, "CLICK", "OPEN"],
setting.region
);
return await db.sesSetting.update({
@@ -175,10 +209,10 @@ async function registerConfigurationSet(setting: SesSetting) {
}
async function isValidUnsendUrl(url: string) {
console.log("Checking if URL is valid", url);
try {
const response = await fetch(`${url}/api/ses_callback`, {
method: "POST",
body: JSON.stringify({ fromUnsend: true }),
method: "GET",
});
return {
isValid: response.status === 200,
@@ -186,6 +220,7 @@ async function isValidUnsendUrl(url: string) {
error: response.statusText,
};
} catch (e) {
console.log("Error checking if URL is valid", e);
return {
isValid: false,
code: 500,