Add schedule api (#60)

This commit is contained in:
KM Koushik
2024-08-21 17:19:11 +10:00
committed by GitHub
parent 1e66222425
commit 9b54fc1793
21 changed files with 920 additions and 86 deletions

View File

@@ -4,6 +4,7 @@ import { z } from "zod";
import { createTRPCRouter, teamProcedure } from "~/server/api/trpc";
import { db } from "~/server/db";
import { cancelEmail, updateEmail } from "~/server/service/email-service";
const statuses = Object.values(EmailStatus) as [EmailStatus];
@@ -39,6 +40,7 @@ export const emailRouter = createTRPCRouter({
latestStatus: true,
subject: true,
to: true,
scheduledAt: true,
},
orderBy: {
createdAt: "desc",
@@ -187,9 +189,22 @@ export const emailRouter = createTRPCRouter({
domainId: true,
text: true,
html: true,
scheduledAt: true,
},
});
return email;
}),
cancelEmail: teamProcedure
.input(z.object({ id: z.string() }))
.mutation(async ({ input }) => {
await cancelEmail(input.id);
}),
updateEmailScheduledAt: teamProcedure
.input(z.object({ id: z.string(), scheduledAt: z.string().datetime() }))
.mutation(async ({ input }) => {
await updateEmail(input.id, { scheduledAt: input.scheduledAt });
}),
});

View File

@@ -0,0 +1,46 @@
import { createRoute, z } from "@hono/zod-openapi";
import { PublicAPIApp } from "~/server/public-api/hono";
import { getTeamFromToken } from "~/server/public-api/auth";
import { cancelEmail } from "~/server/service/email-service";
const route = createRoute({
method: "post",
path: "/v1/emails/{emailId}/cancel",
request: {
params: z.object({
emailId: z
.string()
.min(3)
.openapi({
param: {
name: "emailId",
in: "path",
},
example: "cuiwqdj74rygf74",
}),
}),
},
responses: {
200: {
content: {
"application/json": {
schema: z.object({ emailId: z.string().optional() }),
},
},
description: "Retrieve the user",
},
},
});
function cancelScheduledEmail(app: PublicAPIApp) {
app.openapi(route, async (c) => {
await getTeamFromToken(c);
const emailId = c.req.param("emailId");
await cancelEmail(emailId);
return c.json({ emailId });
});
}
export default cancelScheduledEmail;

View File

@@ -28,6 +28,7 @@ const route = createRoute({
})
)
.optional(),
scheduledAt: z.string().datetime().optional(),
}),
},
},

View File

@@ -0,0 +1,58 @@
import { createRoute, z } from "@hono/zod-openapi";
import { PublicAPIApp } from "~/server/public-api/hono";
import { getTeamFromToken } from "~/server/public-api/auth";
import { updateEmail } from "~/server/service/email-service";
const route = createRoute({
method: "patch",
path: "/v1/emails/{emailId}",
request: {
params: z.object({
emailId: z
.string()
.min(3)
.openapi({
param: {
name: "emailId",
in: "path",
},
example: "cuiwqdj74rygf74",
}),
}),
body: {
required: true,
content: {
"application/json": {
schema: z.object({
scheduledAt: z.string().datetime(),
}),
},
},
},
},
responses: {
200: {
content: {
"application/json": {
schema: z.object({ emailId: z.string().optional() }),
},
},
description: "Retrieve the user",
},
},
});
function updateEmailScheduledAt(app: PublicAPIApp) {
app.openapi(route, async (c) => {
await getTeamFromToken(c);
const emailId = c.req.param("emailId");
await updateEmail(emailId, {
scheduledAt: c.req.valid("json").scheduledAt,
});
return c.json({ emailId });
});
}
export default updateEmailScheduledAt;

View File

@@ -5,6 +5,8 @@ import getEmail from "./api/emails/get-email";
import addContact from "./api/contacts/add-contact";
import updateContactInfo from "./api/contacts/update-contact";
import getContact from "./api/contacts/get-contact";
import updateEmailScheduledAt from "./api/emails/update-email";
import cancelScheduledEmail from "./api/emails/cancel-email";
export const app = getApp();
@@ -14,6 +16,8 @@ getDomains(app);
/**Email related APIs */
getEmail(app);
sendEmail(app);
updateEmailScheduledAt(app);
cancelScheduledEmail(app);
/**Contact related APIs */
addContact(app);

View File

@@ -41,13 +41,6 @@ export class EmailQueueService {
);
const marketingQuota = quota - transactionalQuota;
console.log(
"is transactional queue",
this.transactionalQueue.has(region),
"is marketing queue",
this.marketingQueue.has(region)
);
if (this.transactionalQueue.has(region)) {
console.log(
`[EmailQueueService]: Updating transactional quota for region ${region} to ${transactionalQuota}`
@@ -98,7 +91,8 @@ export class EmailQueueService {
emailId: string,
region: string,
transactional: boolean,
unsubUrl?: string
unsubUrl?: string,
delay?: number
) {
if (!this.initialized) {
await this.init();
@@ -109,7 +103,56 @@ export class EmailQueueService {
if (!queue) {
throw new Error(`Queue for region ${region} not found`);
}
queue.add("send-email", { emailId, timestamp: Date.now(), unsubUrl });
queue.add(
emailId,
{ emailId, timestamp: Date.now(), unsubUrl },
{ jobId: emailId, delay }
);
}
public static async changeDelay(
emailId: string,
region: string,
transactional: boolean,
delay: number
) {
if (!this.initialized) {
await this.init();
}
const queue = transactional
? this.transactionalQueue.get(region)
: this.marketingQueue.get(region);
if (!queue) {
throw new Error(`Queue for region ${region} not found`);
}
const job = await queue.getJob(emailId);
if (!job) {
throw new Error(`Job ${emailId} not found`);
}
await job.changeDelay(delay);
}
public static async chancelEmail(
emailId: string,
region: string,
transactional: boolean
) {
if (!this.initialized) {
await this.init();
}
const queue = transactional
? this.transactionalQueue.get(region)
: this.marketingQueue.get(region);
if (!queue) {
throw new Error(`Queue for region ${region} not found`);
}
const job = await queue.getJob(emailId);
if (!job) {
throw new Error(`Job ${emailId} not found`);
}
await job.remove();
}
public static async init() {

View File

@@ -3,9 +3,32 @@ import { db } from "../db";
import { UnsendApiError } from "~/server/public-api/api-error";
import { EmailQueueService } from "./email-queue-service";
import { validateDomainFromEmail } from "./domain-service";
import { Campaign, Contact } from "@prisma/client";
import { EmailRenderer } from "@unsend/email-editor/src/renderer";
import { createUnsubUrl } from "./campaign-service";
async function checkIfValidEmail(emailId: string) {
const email = await db.email.findUnique({
where: { id: emailId },
});
if (!email || !email.domainId) {
throw new UnsendApiError({
code: "BAD_REQUEST",
message: "Email not found",
});
}
const domain = await db.domain.findUnique({
where: { id: email.domainId },
});
if (!domain) {
throw new UnsendApiError({
code: "BAD_REQUEST",
message: "Email not found",
});
}
return { email, domain };
}
/**
Send transactional email
@@ -24,10 +47,16 @@ export async function sendEmail(
replyTo,
cc,
bcc,
scheduledAt,
} = emailContent;
const domain = await validateDomainFromEmail(from, teamId);
const scheduledAtDate = scheduledAt ? new Date(scheduledAt) : undefined;
const delay = scheduledAtDate
? Math.max(0, scheduledAtDate.getTime() - Date.now())
: undefined;
const email = await db.email.create({
data: {
to: Array.isArray(to) ? to : [to],
@@ -45,11 +74,19 @@ export async function sendEmail(
teamId,
domainId: domain.id,
attachments: attachments ? JSON.stringify(attachments) : undefined,
scheduledAt: scheduledAtDate,
latestStatus: scheduledAtDate ? "SCHEDULED" : "QUEUED",
},
});
try {
await EmailQueueService.queueEmail(email.id, domain.region, true);
await EmailQueueService.queueEmail(
email.id,
domain.region,
true,
undefined,
delay
);
} catch (error: any) {
await db.emailEvent.create({
data: {
@@ -69,3 +106,62 @@ export async function sendEmail(
return email;
}
export async function updateEmail(
emailId: string,
{
scheduledAt,
}: {
scheduledAt?: string;
}
) {
const { email, domain } = await checkIfValidEmail(emailId);
if (email.latestStatus !== "SCHEDULED") {
throw new UnsendApiError({
code: "BAD_REQUEST",
message: "Email already processed",
});
}
const scheduledAtDate = scheduledAt ? new Date(scheduledAt) : undefined;
const delay = scheduledAtDate
? Math.max(0, scheduledAtDate.getTime() - Date.now())
: undefined;
await db.email.update({
where: { id: emailId },
data: {
scheduledAt: scheduledAtDate,
},
});
await EmailQueueService.changeDelay(emailId, domain.region, true, delay ?? 0);
}
export async function cancelEmail(emailId: string) {
const { email, domain } = await checkIfValidEmail(emailId);
if (email.latestStatus !== "SCHEDULED") {
throw new UnsendApiError({
code: "BAD_REQUEST",
message: "Email already processed",
});
}
await EmailQueueService.chancelEmail(emailId, domain.region, true);
await db.email.update({
where: { id: emailId },
data: {
latestStatus: "CANCELLED",
},
});
await db.emailEvent.create({
data: {
emailId,
status: "CANCELLED",
},
});
}

View File

@@ -38,7 +38,7 @@ export async function parseSesHook(data: SesEvent) {
await db.$executeRaw`
UPDATE "Email"
SET "latestStatus" = CASE
WHEN ${mailStatus}::text::\"EmailStatus\" > "latestStatus" OR "latestStatus" IS NULL
WHEN ${mailStatus}::text::\"EmailStatus\" > "latestStatus" OR "latestStatus" IS NULL OR "latestStatus" = 'SCHEDULED'::\"EmailStatus\"
THEN ${mailStatus}::text::\"EmailStatus\"
ELSE "latestStatus"
END