Add email queue (#1)

* Add pgboss queue support

* Implement queue for sending emails

* Add migrations
This commit is contained in:
KM Koushik
2024-05-10 16:21:26 +10:00
committed by GitHub
parent 5931174889
commit 64c7613d8c
11 changed files with 329 additions and 71 deletions

View File

@@ -38,6 +38,7 @@
"mime-types": "^2.1.35",
"next": "^14.2.1",
"next-auth": "^4.24.6",
"pg-boss": "^9.0.3",
"pnpm": "^8.15.5",
"prisma": "^5.11.0",
"query-string": "^9.0.0",

View File

@@ -0,0 +1,5 @@
-- AlterEnum
ALTER TYPE "EmailStatus" ADD VALUE 'QUEUED';
-- AlterTable
ALTER TABLE "Email" ADD COLUMN "attachments" TEXT;

View File

@@ -0,0 +1,2 @@
-- AlterTable
ALTER TABLE "Email" ALTER COLUMN "latestStatus" SET DEFAULT 'QUEUED';

View File

@@ -139,6 +139,7 @@ model ApiKey {
}
enum EmailStatus {
QUEUED
SENT
OPENED
CLICKED
@@ -158,11 +159,12 @@ model Email {
subject String
text String?
html String?
latestStatus EmailStatus @default(SENT)
latestStatus EmailStatus @default(QUEUED)
teamId Int
domainId Int?
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt
attachments String?
team Team @relation(fields: [teamId], references: [id], onDelete: Cascade)
emailEvents EmailEvent[]
}

View File

@@ -38,6 +38,8 @@ export const env = createEnv({
UNSEND_URL: z.string(),
GOOGLE_CLIENT_ID: z.string(),
GOOGLE_CLIENT_SECRET: z.string(),
SES_QUEUE_LIMIT: z.string().transform((str) => parseInt(str, 10)),
AWS_DEFAULT_REGION: z.string().default("us-east-1"),
},
/**
@@ -68,6 +70,8 @@ export const env = createEnv({
UNSEND_URL: process.env.UNSEND_URL,
GOOGLE_CLIENT_ID: process.env.GOOGLE_CLIENT_ID,
GOOGLE_CLIENT_SECRET: process.env.GOOGLE_CLIENT_SECRET,
SES_QUEUE_LIMIT: process.env.SES_QUEUE_LIMIT,
AWS_DEFAULT_REGION: process.env.AWS_DEFAULT_REGION,
},
/**
* Run `build` or `dev` with `SKIP_ENV_VALIDATION` to skip env validation. This is especially

View File

@@ -172,12 +172,21 @@ export const emailRouter = createTRPCRouter({
where: {
id: input.id,
},
include: {
select: {
emailEvents: {
orderBy: {
createdAt: "asc",
},
},
id: true,
createdAt: true,
latestStatus: true,
subject: true,
to: true,
from: true,
domainId: true,
text: true,
html: true,
},
});

View File

@@ -1,7 +1,7 @@
import { EmailContent } from "~/types";
import { db } from "../db";
import { sendEmailThroughSes, sendEmailWithAttachments } from "../aws/ses";
import { APP_SETTINGS } from "~/utils/constants";
import { UnsendApiError } from "~/server/public-api/api-error";
import { queueEmail } from "./job-service";
export async function sendEmail(
emailContent: EmailContent & { teamId: number }
@@ -15,72 +15,34 @@ export async function sendEmail(
});
if (!domain) {
throw new Error(
"Domain of from email is wrong. Use the email verified by unsend"
);
throw new UnsendApiError({
code: "BAD_REQUEST",
message:
"Domain of from email is wrong. Use the email verified by unsend",
});
}
if (domain.status !== "SUCCESS") {
throw new Error("Domain is not verified");
}
const messageId = attachments
? await sendEmailWithAttachments({
to,
from,
subject,
text,
html,
region: domain.region,
configurationSetName: getConfigurationSetName(
domain.clickTracking,
domain.openTracking
),
attachments,
})
: await sendEmailThroughSes({
to,
from,
subject,
text,
html,
region: domain.region,
configurationSetName: getConfigurationSetName(
domain.clickTracking,
domain.openTracking
),
attachments,
});
if (messageId) {
return await db.email.create({
data: {
to,
from,
subject,
text,
html,
sesEmailId: messageId,
teamId,
domainId: domain.id,
},
throw new UnsendApiError({
code: "BAD_REQUEST",
message: "Domain is not verified",
});
}
}
function getConfigurationSetName(
clickTracking: boolean,
openTracking: boolean
) {
if (clickTracking && openTracking) {
return APP_SETTINGS.SES_CONFIGURATION_FULL;
}
if (clickTracking) {
return APP_SETTINGS.SES_CONFIGURATION_CLICK_TRACKING;
}
if (openTracking) {
return APP_SETTINGS.SES_CONFIGURATION_OPEN_TRACKING;
}
const email = await db.email.create({
data: {
to,
from,
subject,
text,
html,
teamId,
domainId: domain.id,
attachments: attachments ? JSON.stringify(attachments) : undefined,
},
});
return APP_SETTINGS.SES_CONFIGURATION_GENERAL;
queueEmail(email.id);
return email;
}

View File

@@ -0,0 +1,95 @@
import pgBoss from "pg-boss";
import { env } from "~/env";
import { EmailAttachment, EmailContent } from "~/types";
import { db } from "../db";
import { sendEmailThroughSes, sendEmailWithAttachments } from "../aws/ses";
import { getConfigurationSetName } from "~/utils/ses-utils";
const boss = new pgBoss({
connectionString: env.DATABASE_URL,
archiveCompletedAfterSeconds: 60 * 60 * 24, // 24 hours
deleteAfterDays: 7, // 7 days
});
let started = false;
async function getBoss() {
if (!started) {
await boss.start();
await boss.work(
"send_email",
{
teamConcurrency: env.SES_QUEUE_LIMIT,
teamSize: env.SES_QUEUE_LIMIT,
teamRefill: true,
},
executeEmail
);
started = true;
}
return boss;
}
export async function queueEmail(emailId: string) {
const boss = await getBoss();
await boss.send("send_email", { emailId, timestamp: Date.now() });
}
async function executeEmail(
job: pgBoss.Job<{ emailId: string; timestamp: number }>
) {
console.log(
`[EmailJob]: Executing email job ${job.data.emailId}, time elapsed: ${Date.now() - job.data.timestamp}ms`
);
const email = await db.email.findUnique({
where: { id: job.data.emailId },
});
const domain = email?.domainId
? await db.domain.findUnique({
where: { id: email?.domainId },
})
: null;
if (!email) {
console.log(`[EmailJob]: Email not found, skipping`);
return;
}
const attachments: Array<EmailAttachment> = email.attachments
? JSON.parse(email.attachments)
: [];
const messageId = attachments.length
? await sendEmailWithAttachments({
to: email.to,
from: email.from,
subject: email.subject,
text: email.text ?? undefined,
html: email.html ?? undefined,
region: domain?.region ?? env.AWS_DEFAULT_REGION,
configurationSetName: getConfigurationSetName(
domain?.clickTracking ?? false,
domain?.openTracking ?? false
),
attachments,
})
: await sendEmailThroughSes({
to: email.to,
from: email.from,
subject: email.subject,
text: email.text ?? undefined,
html: email.html ?? undefined,
region: domain?.region ?? env.AWS_DEFAULT_REGION,
configurationSetName: getConfigurationSetName(
domain?.clickTracking ?? false,
domain?.openTracking ?? false
),
attachments,
});
await db.email.update({
where: { id: email.id },
data: { sesEmailId: messageId, attachments: undefined },
});
}

View File

@@ -4,8 +4,10 @@ export type EmailContent = {
subject: string;
text?: string;
html?: string;
attachments?: {
filename: string;
content: string;
}[];
attachments?: Array<EmailAttachment>;
};
export type EmailAttachment = {
filename: string;
content: string;
};

View File

@@ -0,0 +1,18 @@
import { APP_SETTINGS } from "./constants";
export function getConfigurationSetName(
clickTracking: boolean,
openTracking: boolean
) {
if (clickTracking && openTracking) {
return APP_SETTINGS.SES_CONFIGURATION_FULL;
}
if (clickTracking) {
return APP_SETTINGS.SES_CONFIGURATION_CLICK_TRACKING;
}
if (openTracking) {
return APP_SETTINGS.SES_CONFIGURATION_OPEN_TRACKING;
}
return APP_SETTINGS.SES_CONFIGURATION_GENERAL;
}

162
pnpm-lock.yaml generated
View File

@@ -154,6 +154,9 @@ importers:
next-auth:
specifier: ^4.24.6
version: 4.24.7(next@14.2.1)(react-dom@18.2.0)(react@18.2.0)
pg-boss:
specifier: ^9.0.3
version: 9.0.3
pnpm:
specifier: ^8.15.5
version: 8.15.5
@@ -3601,6 +3604,14 @@ packages:
resolution: {integrity: sha512-j5QzrmsokwWWp6kUcJQySpbG+xfOBqqKnup3OIk1pz+kB/80SLorZ9V8zHFLO92Lcd+hbvq8bT+zOGoPkmBV0Q==}
dev: false
/aggregate-error@3.1.0:
resolution: {integrity: sha512-4I7Td01quW/RpocfNayFdFVk1qSuoh0E7JrbRJ16nH01HhKFQ88INq9Sd+nd72zqRySlr9BmDA8xlEJ6vJMrYA==}
engines: {node: '>=8'}
dependencies:
clean-stack: 2.2.0
indent-string: 4.0.0
dev: false
/ajv@6.12.6:
resolution: {integrity: sha512-j3fVLgvTo527anyYyJOGTYJbG+vnnQYvE0m5mmkc1TK+nxAppkCLMIL0aZ4dblVCNoGShhm+kzE4ZUykBoMg4g==}
dependencies:
@@ -3971,6 +3982,11 @@ packages:
escape-string-regexp: 1.0.5
dev: true
/clean-stack@2.2.0:
resolution: {integrity: sha512-4diC9HaTE+KRAMWhDhrGOECgWZxoevMc5TlkObMqNSsVU62PYzXZ/SMTjzyGAFF1YusgxGcSWTEXBhp0CPwQ1A==}
engines: {node: '>=6'}
dev: false
/client-only@0.0.1:
resolution: {integrity: sha512-IV3Ou0jSMzZrd3pZ48nLkT9DA7Ag1pnPzaiQhpW7c3RbcqqzvzzVu+L8gfqMp/8IM2MQtSiqaCxrrcfu8I8rMA==}
dev: false
@@ -4037,6 +4053,13 @@ packages:
is-what: 4.1.16
dev: false
/cron-parser@4.9.0:
resolution: {integrity: sha512-p0SaNjrHOnQeR8/VnfGbmg9te2kfyYSQ7Sc/j/6DtPL3JQvKxmjO9TSjNFpujqV3vEYYBvNNvXSxzyksBWAx1Q==}
engines: {node: '>=12.0.0'}
dependencies:
luxon: 3.4.4
dev: false
/cross-spawn@7.0.3:
resolution: {integrity: sha512-iRDPJKUPVEND7dHPO8rkbOnPpyDygcDFtWjpeWNCgy8WP2rXcxXL8TskReQl6OrB2G7+UJrags1q15Fudc7G6w==}
engines: {node: '>= 8'}
@@ -4186,6 +4209,11 @@ packages:
object-keys: 1.1.1
dev: true
/delay@5.0.0:
resolution: {integrity: sha512-ReEBKkIfe4ya47wlPYf/gu5ib6yUG0/Aez0JQZQz94kiWtRQvZIQbTiehsnwHvLSWJnQdhVeqYue7Id1dKr0qw==}
engines: {node: '>=10'}
dev: false
/dequal@2.0.3:
resolution: {integrity: sha512-0je+qPKHEMohvfRTCEo3CrPG6cAzAYgmzKyxRiYSSDkS6eGJdyVJm7WaYA5ECaAD9wLB2T4EEeymA5aFVcYXCA==}
engines: {node: '>=6'}
@@ -5397,7 +5425,6 @@ packages:
/indent-string@4.0.0:
resolution: {integrity: sha512-EdDDZu4A2OyIK7Lr/2zG+w5jmbuk1DVBnEwREQvBzspBJkCEbRa8GxU1lghYcaGJCnRWibjDXlq779X1/y5xwg==}
engines: {node: '>=8'}
dev: true
/inflight@1.0.6:
resolution: {integrity: sha512-k92I/b08q4wvFscXCLvqfsHCrjrF7yiXsQuIVvVE7N82W3+aqpzuUdBbfhWcy/FZR3/4IgflMgKLOsvPDrGCJA==}
@@ -5807,6 +5834,10 @@ packages:
p-locate: 5.0.0
dev: true
/lodash.debounce@4.0.8:
resolution: {integrity: sha512-FT1yDzDYEoYWhnSGnpE/4Kj1fLZkDFyqRb7fNt6FdYOSxlUWAtp42Eh6Wb0rGIv/m9Bgo7x4GhQbm5Ys4SG5ow==}
dev: false
/lodash.merge@4.6.2:
resolution: {integrity: sha512-0KpjqXRVvrYyCsX1swR/XTK0va6VQkQM6MNo7PqW77ByjAhoARA8EfrP1N4+KlKj8YS0ZUCtRT/YUuhyYDujIQ==}
dev: true
@@ -5851,6 +5882,11 @@ packages:
react: 18.2.0
dev: false
/luxon@3.4.4:
resolution: {integrity: sha512-zobTr7akeGHnv7eBOXcRgMeCP6+uyYsczwmeRCauvpvaAltgNyTbLH/+VaEAPUeWBT+1GuNmz4wC/6jtQzbbVA==}
engines: {node: '>=12'}
dev: false
/merge2@1.4.1:
resolution: {integrity: sha512-8q7VEgMJW4J8tcfVPy8g09NcQwZdbwFEqhe/WZkoIzjn/3TGDwtOCYtXGxA3O8tPzpczCCDgv+P2P5y00ZJOOg==}
engines: {node: '>= 8'}
@@ -6211,6 +6247,13 @@ packages:
p-limit: 3.1.0
dev: true
/p-map@4.0.0:
resolution: {integrity: sha512-/bjOqmgETBYB5BoEeGVea8dmvHb2m9GLy1E9W43yeyfP6QQCZGFNa+XRceJEuDB6zqr+gKpIAmlLebMpykw/MQ==}
engines: {node: '>=10'}
dependencies:
aggregate-error: 3.1.0
dev: false
/p-try@2.2.0:
resolution: {integrity: sha512-R4nPAVTAU0B9D35/Gk3uJf/7XYbQcyohSKdvAxIRSNghFl4e71hVoGnBNQz9cWaXxO2I10KTC+3jMdvvoKw6dQ==}
engines: {node: '>=6'}
@@ -6273,6 +6316,83 @@ packages:
engines: {node: '>=8'}
dev: true
/pg-boss@9.0.3:
resolution: {integrity: sha512-cUWUiv3sr563yNy0nCZ25Tv5U0m59Y9MhX/flm0vTR012yeVCrqpfboaZP4xFOQPdWipMJpuu4g94HR0SncTgw==}
engines: {node: '>=16'}
dependencies:
cron-parser: 4.9.0
delay: 5.0.0
lodash.debounce: 4.0.8
p-map: 4.0.0
pg: 8.11.5
serialize-error: 8.1.0
uuid: 9.0.1
transitivePeerDependencies:
- pg-native
dev: false
/pg-cloudflare@1.1.1:
resolution: {integrity: sha512-xWPagP/4B6BgFO+EKz3JONXv3YDgvkbVrGw2mTo3D6tVDQRh1e7cqVGvyR3BE+eQgAvx1XhW/iEASj4/jCWl3Q==}
requiresBuild: true
dev: false
optional: true
/pg-connection-string@2.6.4:
resolution: {integrity: sha512-v+Z7W/0EO707aNMaAEfiGnGL9sxxumwLl2fJvCQtMn9Fxsg+lPpPkdcyBSv/KFgpGdYkMfn+EI1Or2EHjpgLCA==}
dev: false
/pg-int8@1.0.1:
resolution: {integrity: sha512-WCtabS6t3c8SkpDBUlb1kjOs7l66xsGdKpIPZsg4wR+B3+u9UAum2odSsF9tnvxg80h4ZxLWMy4pRjOsFIqQpw==}
engines: {node: '>=4.0.0'}
dev: false
/pg-pool@3.6.2(pg@8.11.5):
resolution: {integrity: sha512-Htjbg8BlwXqSBQ9V8Vjtc+vzf/6fVUuak/3/XXKA9oxZprwW3IMDQTGHP+KDmVL7rtd+R1QjbnCFPuTHm3G4hg==}
peerDependencies:
pg: '>=8.0'
dependencies:
pg: 8.11.5
dev: false
/pg-protocol@1.6.1:
resolution: {integrity: sha512-jPIlvgoD63hrEuihvIg+tJhoGjUsLPn6poJY9N5CnlPd91c2T18T/9zBtLxZSb1EhYxBRoZJtzScCaWlYLtktg==}
dev: false
/pg-types@2.2.0:
resolution: {integrity: sha512-qTAAlrEsl8s4OiEQY69wDvcMIdQN6wdz5ojQiOy6YRMuynxenON0O5oCpJI6lshc6scgAY8qvJ2On/p+CXY0GA==}
engines: {node: '>=4'}
dependencies:
pg-int8: 1.0.1
postgres-array: 2.0.0
postgres-bytea: 1.0.0
postgres-date: 1.0.7
postgres-interval: 1.2.0
dev: false
/pg@8.11.5:
resolution: {integrity: sha512-jqgNHSKL5cbDjFlHyYsCXmQDrfIX/3RsNwYqpd4N0Kt8niLuNoRNH+aazv6cOd43gPh9Y4DjQCtb+X0MH0Hvnw==}
engines: {node: '>= 8.0.0'}
peerDependencies:
pg-native: '>=3.0.1'
peerDependenciesMeta:
pg-native:
optional: true
dependencies:
pg-connection-string: 2.6.4
pg-pool: 3.6.2(pg@8.11.5)
pg-protocol: 1.6.1
pg-types: 2.2.0
pgpass: 1.0.5
optionalDependencies:
pg-cloudflare: 1.1.1
dev: false
/pgpass@1.0.5:
resolution: {integrity: sha512-FdW9r/jQZhSeohs1Z3sI1yxFQNFvMcnmfuj4WBMUTxOrAyLMaTcE1aAMBiTlbMNaXvBCQuVi0R7hd8udDSP7ug==}
dependencies:
split2: 4.2.0
dev: false
/picocolors@1.0.0:
resolution: {integrity: sha512-1fygroTLlHu66zi26VoTDv8yRgm0Fccecssto+MhsZ0D/DGW2sm8E8AjW7NU5VVTRt5GxbeZ5qBuJr+HyLYkjQ==}
@@ -6385,6 +6505,28 @@ packages:
picocolors: 1.0.0
source-map-js: 1.1.0
/postgres-array@2.0.0:
resolution: {integrity: sha512-VpZrUqU5A69eQyW2c5CA1jtLecCsN2U/bD6VilrFDWq5+5UIEVO7nazS3TEcHf1zuPYO/sqGvUvW62g86RXZuA==}
engines: {node: '>=4'}
dev: false
/postgres-bytea@1.0.0:
resolution: {integrity: sha512-xy3pmLuQqRBZBXDULy7KbaitYqLcmxigw14Q5sj8QBVLqEwXfeybIKVWiqAXTlcvdvb0+xkOtDbfQMOf4lST1w==}
engines: {node: '>=0.10.0'}
dev: false
/postgres-date@1.0.7:
resolution: {integrity: sha512-suDmjLVQg78nMK2UZ454hAG+OAW+HQPZ6n++TNDUX+L0+uUlLywnoxJKDou51Zm+zTCjrCl0Nq6J9C5hP9vK/Q==}
engines: {node: '>=0.10.0'}
dev: false
/postgres-interval@1.2.0:
resolution: {integrity: sha512-9ZhXKM/rw350N1ovuWHbGxnGh/SNJ4cnxHiM0rxE4VN41wsg8P8zWn9hv/buK00RP4WvlOyr/RBDiptyxVbkZQ==}
engines: {node: '>=0.10.0'}
dependencies:
xtend: 4.0.2
dev: false
/preact-render-to-string@5.2.3(preact@10.11.3):
resolution: {integrity: sha512-aPDxUn5o3GhWdtJtW0svRC2SS/l8D9MAgo2+AWml+BhDImb27ALf04Q2d+AHqUUOc6RdSXFIBVa2gxzgMKgtZA==}
peerDependencies:
@@ -6856,6 +6998,13 @@ packages:
lru-cache: 6.0.0
dev: true
/serialize-error@8.1.0:
resolution: {integrity: sha512-3NnuWfM6vBYoy5gZFvHiYsVbafvI9vZv/+jlIigFn4oP4zjNPK3LhcY0xSCgeb1a5L8jO71Mit9LlNoi2UfDDQ==}
engines: {node: '>=10'}
dependencies:
type-fest: 0.20.2
dev: false
/server-only@0.0.1:
resolution: {integrity: sha512-qepMx2JxAa5jjfzxG79yPPq+8BuFToHd1hm7kI+Z4zAq1ftQiP7HcxMhDDItrbtwVeLg/cY2JnKnrcFkmiswNA==}
dev: false
@@ -6983,6 +7132,11 @@ packages:
engines: {node: '>=12'}
dev: false
/split2@4.2.0:
resolution: {integrity: sha512-UcjcJOWknrNkF6PLX83qcHM6KHgVKNkV62Y8a5uYDVv9ydGQVwAHMKqHdJje1VTWpljG0WYpCDhrCdAOYH4TWg==}
engines: {node: '>= 10.x'}
dev: false
/streamsearch@1.1.0:
resolution: {integrity: sha512-Mcc5wHehp9aXz1ax6bZUyY5afg9u2rv5cqQI3mRrYkGC8rW2hM02jWuwjtL++LS5qinSyhj2QfLyNsuc+VsExg==}
engines: {node: '>=10.0.0'}
@@ -7336,7 +7490,6 @@ packages:
/type-fest@0.20.2:
resolution: {integrity: sha512-Ne+eE4r0/iWnpAxD852z3A+N0Bt5RN//NjJwRd2VFHEmrywxf5vsZlh4R6lixl6B+wz/8d+maTSAkN1FIkI3LQ==}
engines: {node: '>=10'}
dev: true
/type-fest@0.6.0:
resolution: {integrity: sha512-q+MB8nYR1KDLrgr4G5yemftpMC7/QLqVndBmEEdqzmNj5dcFOO4Oo8qlwZE3ULT3+Zim1F8Kq4cBnikNhlCMlg==}
@@ -7466,6 +7619,11 @@ packages:
hasBin: true
dev: false
/uuid@9.0.1:
resolution: {integrity: sha512-b+1eJOlsR9K8HJpow9Ok3fiWOWSIcIzXodvv0rQjVoOVNpWMpxf1wZNpt4y9h10odCNrqnYp1OBzRktckBe3sA==}
hasBin: true
dev: false
/validate-npm-package-license@3.0.4:
resolution: {integrity: sha512-DpKm2Ui/xN7/HQKCtpZxoRWBhZ9Z0kqtygG8XCgNQ8ZlDnxuQmWhj566j8fN4Cu3/JmbhsDo7fcAJq4s9h27Ew==}
dependencies: