diff --git a/apps/web/prisma/migrations/20250201051912_add_indexes/migration.sql b/apps/web/prisma/migrations/20250201051912_add_indexes/migration.sql new file mode 100644 index 0000000..9c02e93 --- /dev/null +++ b/apps/web/prisma/migrations/20250201051912_add_indexes/migration.sql @@ -0,0 +1,8 @@ +-- CreateIndex +CREATE INDEX "Campaign_createdAt_idx" ON "Campaign"("createdAt" DESC); + +-- CreateIndex +CREATE INDEX "Email_createdAt_idx" ON "Email"("createdAt" DESC); + +-- CreateIndex +CREATE INDEX "EmailEvent_emailId_idx" ON "EmailEvent"("emailId"); diff --git a/apps/web/prisma/migrations/20250201131024_add_daily_usage/migration.sql b/apps/web/prisma/migrations/20250201131024_add_daily_usage/migration.sql new file mode 100644 index 0000000..761b7dc --- /dev/null +++ b/apps/web/prisma/migrations/20250201131024_add_daily_usage/migration.sql @@ -0,0 +1,22 @@ +-- CreateEnum +CREATE TYPE "EmailUsageType" AS ENUM ('TRANSACTIONAL', 'MARKETING'); + +-- CreateTable +CREATE TABLE "DailyEmailUsage" ( + "teamId" INTEGER NOT NULL, + "date" TEXT NOT NULL, + "type" "EmailUsageType" NOT NULL, + "domainId" INTEGER NOT NULL, + "delivered" INTEGER NOT NULL, + "opened" INTEGER NOT NULL, + "clicked" INTEGER NOT NULL, + "bounced" INTEGER NOT NULL, + "complained" INTEGER NOT NULL, + "createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + "updatedAt" TIMESTAMP(3) NOT NULL, + + CONSTRAINT "DailyEmailUsage_pkey" PRIMARY KEY ("teamId","domainId","date","type") +); + +-- AddForeignKey +ALTER TABLE "DailyEmailUsage" ADD CONSTRAINT "DailyEmailUsage_teamId_fkey" FOREIGN KEY ("teamId") REFERENCES "Team"("id") ON DELETE CASCADE ON UPDATE CASCADE; diff --git a/apps/web/prisma/migrations/20250201131959_add_sent_to_daily_usage/migration.sql b/apps/web/prisma/migrations/20250201131959_add_sent_to_daily_usage/migration.sql new file mode 100644 index 0000000..fcdcaa4 --- /dev/null +++ b/apps/web/prisma/migrations/20250201131959_add_sent_to_daily_usage/migration.sql @@ -0,0 +1,7 @@ +-- AlterTable +ALTER TABLE "DailyEmailUsage" ADD COLUMN "sent" INTEGER NOT NULL DEFAULT 0, +ALTER COLUMN "delivered" SET DEFAULT 0, +ALTER COLUMN "opened" SET DEFAULT 0, +ALTER COLUMN "clicked" SET DEFAULT 0, +ALTER COLUMN "bounced" SET DEFAULT 0, +ALTER COLUMN "complained" SET DEFAULT 0; diff --git a/apps/web/prisma/schema.prisma b/apps/web/prisma/schema.prisma index 870085e..5594c93 100644 --- a/apps/web/prisma/schema.prisma +++ b/apps/web/prisma/schema.prisma @@ -91,16 +91,17 @@ model User { } model Team { - id Int @id @default(autoincrement()) - name String - createdAt DateTime @default(now()) - updatedAt DateTime @updatedAt - teamUsers TeamUser[] - domains Domain[] - apiKeys ApiKey[] - emails Email[] - contactBooks ContactBook[] - campaigns Campaign[] + id Int @id @default(autoincrement()) + name String + createdAt DateTime @default(now()) + updatedAt DateTime @updatedAt + teamUsers TeamUser[] + domains Domain[] + apiKeys ApiKey[] + emails Email[] + contactBooks ContactBook[] + campaigns Campaign[] + dailyEmailUsages DailyEmailUsage[] } enum Role { @@ -203,6 +204,8 @@ model Email { contactId String? team Team @relation(fields: [teamId], references: [id], onDelete: Cascade) emailEvents EmailEvent[] + + @@index([createdAt(sort: Desc)]) } model EmailEvent { @@ -212,6 +215,8 @@ model EmailEvent { data Json? createdAt DateTime @default(now()) email Email @relation(fields: [emailId], references: [id], onDelete: Cascade) + + @@index([emailId]) } model ContactBook { @@ -277,4 +282,30 @@ model Campaign { updatedAt DateTime @updatedAt team Team @relation(fields: [teamId], references: [id], onDelete: Cascade) + + @@index([createdAt(sort: Desc)]) +} + +enum EmailUsageType { + TRANSACTIONAL + MARKETING +} + +model DailyEmailUsage { + teamId Int + date String + type EmailUsageType + domainId Int + sent Int @default(0) + delivered Int @default(0) + opened Int @default(0) + clicked Int @default(0) + bounced Int @default(0) + complained Int @default(0) + createdAt DateTime @default(now()) + updatedAt DateTime @updatedAt + + team Team @relation(fields: [teamId], references: [id], onDelete: Cascade) + + @@id([teamId, domainId, date, type]) } diff --git a/apps/web/src/app/(dashboard)/dashboard/dashboard-chart.tsx b/apps/web/src/app/(dashboard)/dashboard/dashboard-chart.tsx index 8e7d3ab..bea16f8 100644 --- a/apps/web/src/app/(dashboard)/dashboard/dashboard-chart.tsx +++ b/apps/web/src/app/(dashboard)/dashboard/dashboard-chart.tsx @@ -43,42 +43,47 @@ export default function DashboardChart() { <> @@ -108,7 +113,7 @@ export default function DashboardChart() { - + { const data = payload?.[0]?.payload as Record< - EmailStatus, + | "sent" + | "delivered" + | "opened" + | "clicked" + | "bounced" + | "complained", number - > & { name: string }; + > & { date: string }; - if ( - !data || - (!data.BOUNCED && - !data.COMPLAINED && - !data.DELIVERED && - !data.OPENED && - !data.CLICKED) - ) - return null; + if (!data || data.sent === 0) return null; return (

- {data.name} + {data.date}

- {data.DELIVERED ? ( + {data.delivered ? (

Delivered

- {data.DELIVERED} + {data.delivered}

) : null} - {data.BOUNCED ? ( + {data.bounced ? (

Bounced

-

{data.BOUNCED}

+

{data.bounced}

) : null} - {data.COMPLAINED ? ( + {data.complained ? (

Complained

- {data.COMPLAINED} + {data.complained}

) : null} - {data.OPENED ? ( + {data.opened ? (

Opened

-

{data.OPENED}

+

{data.opened}

) : null} - {data.CLICKED ? ( + {data.clicked ? (

Clicked

-

{data.CLICKED}

+

{data.clicked}

) : null}
@@ -198,14 +200,14 @@ export default function DashboardChart() { {/* */} - - - - + + + +
@@ -237,7 +239,7 @@ const DashboardItemCard: React.FC = ({ {count} {status !== "total" ? ( -
{percentage}%
+
{(percentage * 100).toFixed(0)}%
) : null} diff --git a/apps/web/src/app/api/ses_callback/route.ts b/apps/web/src/app/api/ses_callback/route.ts index 672d4ac..7ea0623 100644 --- a/apps/web/src/app/api/ses_callback/route.ts +++ b/apps/web/src/app/api/ses_callback/route.ts @@ -1,5 +1,5 @@ import { db } from "~/server/db"; -import { parseSesHook } from "~/server/service/ses-hook-parser"; +import { parseSesHook, SesHookParser } from "~/server/service/ses-hook-parser"; import { SesSettingsService } from "~/server/service/ses-settings-service"; import { SnsNotificationMessage } from "~/types/aws-types"; @@ -16,7 +16,7 @@ export async function POST(req: Request) { const isEventValid = await checkEventValidity(data); - console.log("isEventValid: ", isEventValid); + console.log("Is event valid: ", isEventValid); if (!isEventValid) { return Response.json({ data: "Event is not valid" }); @@ -30,7 +30,10 @@ export async function POST(req: Request) { try { message = JSON.parse(data.Message || "{}"); - const status = await parseSesHook(message); + const status = await SesHookParser.queue({ + event: message, + messageId: data.MessageId, + }); console.log("Error is parsing hook", !status); if (!status) { return Response.json({ data: "Error is parsing hook" }); @@ -43,6 +46,9 @@ export async function POST(req: Request) { } } +/** + * Handles the subscription confirmation event. called only once for a webhook + */ async function handleSubscription(message: any) { await fetch(message.SubscribeURL, { method: "GET", @@ -73,7 +79,9 @@ async function handleSubscription(message: any) { return Response.json({ data: "Success" }); } -// A simple check to ensure that the event is from the correct topic +/** + * A simple check to ensure that the event is from the correct topic + */ async function checkEventValidity(message: SnsNotificationMessage) { const { TopicArn } = message; const configuredTopicArn = await SesSettingsService.getTopicArns(); diff --git a/apps/web/src/server/api/routers/email.ts b/apps/web/src/server/api/routers/email.ts index d1b3a0a..7beedf6 100644 --- a/apps/web/src/server/api/routers/email.ts +++ b/apps/web/src/server/api/routers/email.ts @@ -68,108 +68,85 @@ export const emailRouter = createTRPCRouter({ .query(async ({ ctx, input }) => { const { team } = ctx; const days = input.days !== 7 ? 30 : 7; - const daysInMs = days * 24 * 60 * 60 * 1000; - const rawEmailStatusCounts = await db.email.findMany({ - where: { - teamId: team.id, - createdAt: { - gt: new Date(Date.now() - daysInMs), - }, - }, - select: { - latestStatus: true, - createdAt: true, - }, - }); + const startDate = new Date(); + startDate.setDate(startDate.getDate() - days); + const isoStartDate = startDate.toISOString().split("T")[0]; - const totalCount = rawEmailStatusCounts.length; + type DailyEmailUsage = { + date: string; + sent: number; + delivered: number; + opened: number; + clicked: number; + bounced: number; + complained: number; + }; - const emailStatusCounts = rawEmailStatusCounts.reduce( - (acc, cur) => { - acc[cur.latestStatus] = { - count: (acc[cur.latestStatus]?.count || 0) + 1, - percentage: Number( - ( - (((acc[cur.latestStatus]?.count || 0) + 1) / totalCount) * - 100 - ).toFixed(0) - ), - }; + const result = await db.$queryRaw>` + SELECT + date, + sent, + delivered, + opened, + clicked, + bounced, + complained + FROM "DailyEmailUsage" + WHERE "teamId" = ${team.id} + AND "date" >= ${isoStartDate} + ORDER BY "date" ASC + `; + + // Fill in any missing dates with 0 values + const filledResult: DailyEmailUsage[] = []; + const endDateObj = new Date(); + + for (let i = days; i > -1; i--) { + const dateStr = subDays(endDateObj, i) + .toISOString() + .split("T")[0] as string; + const existingData = result.find((r) => r.date === dateStr); + + if (existingData) { + filledResult.push({ + ...existingData, + date: format(dateStr, "MMM dd"), + }); + } else { + filledResult.push({ + date: format(dateStr, "MMM dd"), + sent: 0, + delivered: 0, + opened: 0, + clicked: 0, + bounced: 0, + complained: 0, + }); + } + } + + const totalCounts = result.reduce( + (acc, curr) => { + acc.sent += curr.sent; + acc.delivered += curr.delivered; + acc.opened += curr.opened; + acc.clicked += curr.clicked; + acc.bounced += curr.bounced; + acc.complained += curr.complained; return acc; }, { - DELIVERED: { count: 0, percentage: 0 }, - COMPLAINED: { count: 0, percentage: 0 }, - OPENED: { count: 0, percentage: 0 }, - CLICKED: { count: 0, percentage: 0 }, - BOUNCED: { count: 0, percentage: 0 }, - } as Record + sent: 0, + delivered: 0, + opened: 0, + clicked: 0, + bounced: 0, + complained: 0, + } ); - const dateRecord: Record< - string, - Record< - "DELIVERED" | "COMPLAINED" | "OPENED" | "CLICKED" | "BOUNCED", - number - > - > = {}; - - const currentDate = new Date(); - - for (let i = 0; i < (input.days || 7); i++) { - const actualDate = subDays(currentDate, i); - dateRecord[format(actualDate, "MMM dd")] = { - DELIVERED: 0, - COMPLAINED: 0, - OPENED: 0, - CLICKED: 0, - BOUNCED: 0, - }; - } - - const _emailDailyStatusCounts = rawEmailStatusCounts.reduce( - (acc, { latestStatus, createdAt }) => { - const day = format(createdAt, "MMM dd"); - - if ( - !day || - ![ - "DELIVERED", - "COMPLAINED", - "OPENED", - "CLICKED", - "BOUNCED", - ].includes(latestStatus) - ) { - return acc; - } - - if (!acc[day]) { - return acc; - } - - acc[day]![ - latestStatus as - | "DELIVERED" - | "COMPLAINED" - | "OPENED" - | "CLICKED" - | "BOUNCED" - ]++; - return acc; - }, - dateRecord - ); - - const emailDailyStatusCounts = Object.entries(_emailDailyStatusCounts) - .reverse() - .map(([date, counts]) => ({ - name: date, - ...counts, - })); - - return { emailStatusCounts, totalCount, emailDailyStatusCounts }; + return { result: filledResult, totalCounts }; }), getEmail: emailProcedure.query(async ({ input }) => { diff --git a/apps/web/src/server/aws/ses.ts b/apps/web/src/server/aws/ses.ts index 8214b54..b7fa192 100644 --- a/apps/web/src/server/aws/ses.ts +++ b/apps/web/src/server/aws/ses.ts @@ -162,15 +162,14 @@ export async function sendEmailThroughSes({ ...(unsubUrl ? [ { Name: "List-Unsubscribe", Value: `<${unsubUrl}>` }, - { Name: "List-Unsubscribe-Post", Value: "List-Unsubscribe=One-Click" }, + { + Name: "List-Unsubscribe-Post", + Value: "List-Unsubscribe=One-Click", + }, ] - : [] - ), + : []), // Spread in the precedence header if present - ...(isBulk - ? [{ Name: "Precedence", Value: "bulk" }] - : [] - ), + ...(isBulk ? [{ Name: "Precedence", Value: "bulk" }] : []), ], }, }, @@ -216,7 +215,8 @@ export async function sendEmailWithAttachments({ rawEmail += `To: ${Array.isArray(to) ? to.join(", ") : to}\n`; rawEmail += cc && cc.length ? `Cc: ${cc.join(", ")}\n` : ""; rawEmail += bcc && bcc.length ? `Bcc: ${bcc.join(", ")}\n` : ""; - rawEmail += replyTo && replyTo.length ? `Reply-To: ${replyTo.join(", ")}\n` : ""; + rawEmail += + replyTo && replyTo.length ? `Reply-To: ${replyTo.join(", ")}\n` : ""; rawEmail += `Subject: ${subject}\n`; rawEmail += `MIME-Version: 1.0\n`; rawEmail += `Content-Type: multipart/mixed; boundary="${boundary}"\n\n`; @@ -266,7 +266,7 @@ export async function addWebhookConfiguration( configName: string, topicArn: string, eventTypes: EventType[], - region: string, + region: string ) { const sesClient = getSesClient(region); diff --git a/apps/web/src/server/service/ses-hook-parser.ts b/apps/web/src/server/service/ses-hook-parser.ts index 1baec0b..7217eb9 100644 --- a/apps/web/src/server/service/ses-hook-parser.ts +++ b/apps/web/src/server/service/ses-hook-parser.ts @@ -3,6 +3,8 @@ import { SesClick, SesEvent, SesEventDataKey } from "~/types/aws-types"; import { db } from "../db"; import { updateCampaignAnalytics } from "./campaign-service"; import { env } from "~/env"; +import { getRedis } from "../redis"; +import { Queue, Worker } from "bullmq"; export async function parseSesHook(data: SesEvent) { const mailStatus = getEmailStatus(data); @@ -45,6 +47,49 @@ export async function parseSesHook(data: SesEvent) { WHERE id = ${email.id} `; + // Update daily email usage statistics + const today = new Date().toISOString().split("T")[0] as string; // Format: YYYY-MM-DD + + if ( + [ + "DELIVERED", + "OPENED", + "CLICKED", + "BOUNCED", + "COMPLAINED", + "SENT", + ].includes(mailStatus) + ) { + const updateField = mailStatus.toLowerCase(); + + await db.dailyEmailUsage.upsert({ + where: { + teamId_domainId_date_type: { + teamId: email.teamId, + domainId: email.domainId ?? 0, + date: today, + type: email.campaignId ? "MARKETING" : "TRANSACTIONAL", + }, + }, + create: { + teamId: email.teamId, + domainId: email.domainId ?? 0, + date: today, + type: email.campaignId ? "MARKETING" : "TRANSACTIONAL", + delivered: updateField === "delivered" ? 1 : 0, + opened: updateField === "opened" ? 1 : 0, + clicked: updateField === "clicked" ? 1 : 0, + bounced: updateField === "bounced" ? 1 : 0, + complained: updateField === "complained" ? 1 : 0, + }, + update: { + [updateField]: { + increment: 1, + }, + }, + }); + } + if (email.campaignId) { if ( mailStatus !== "CLICKED" || @@ -109,3 +154,28 @@ function getEmailData(data: SesEvent) { return data[eventType.toLowerCase() as SesEventDataKey]; } } + +export class SesHookParser { + private static sesHookQueue = new Queue("ses-web-hook", { + connection: getRedis(), + }); + + private static worker = new Worker( + "ses-web-hook", + async (job) => { + await this.execute(job.data); + }, + { + connection: getRedis(), + concurrency: 200, + } + ); + + private static async execute(event: SesEvent) { + await parseSesHook(event); + } + + static async queue(data: { event: SesEvent; messageId: string }) { + return await this.sesHookQueue.add(data.messageId, data.event); + } +}