feat: add daily email usage (#97)

* add daily email usage

* remove console
This commit is contained in:
KM Koushik
2025-02-02 07:57:49 +11:00
committed by GitHub
parent 6b9696e715
commit f60c66acbe
9 changed files with 283 additions and 158 deletions

View File

@@ -3,6 +3,8 @@ import { SesClick, SesEvent, SesEventDataKey } from "~/types/aws-types";
import { db } from "../db";
import { updateCampaignAnalytics } from "./campaign-service";
import { env } from "~/env";
import { getRedis } from "../redis";
import { Queue, Worker } from "bullmq";
export async function parseSesHook(data: SesEvent) {
const mailStatus = getEmailStatus(data);
@@ -45,6 +47,49 @@ export async function parseSesHook(data: SesEvent) {
WHERE id = ${email.id}
`;
// Update daily email usage statistics
const today = new Date().toISOString().split("T")[0] as string; // Format: YYYY-MM-DD
if (
[
"DELIVERED",
"OPENED",
"CLICKED",
"BOUNCED",
"COMPLAINED",
"SENT",
].includes(mailStatus)
) {
const updateField = mailStatus.toLowerCase();
await db.dailyEmailUsage.upsert({
where: {
teamId_domainId_date_type: {
teamId: email.teamId,
domainId: email.domainId ?? 0,
date: today,
type: email.campaignId ? "MARKETING" : "TRANSACTIONAL",
},
},
create: {
teamId: email.teamId,
domainId: email.domainId ?? 0,
date: today,
type: email.campaignId ? "MARKETING" : "TRANSACTIONAL",
delivered: updateField === "delivered" ? 1 : 0,
opened: updateField === "opened" ? 1 : 0,
clicked: updateField === "clicked" ? 1 : 0,
bounced: updateField === "bounced" ? 1 : 0,
complained: updateField === "complained" ? 1 : 0,
},
update: {
[updateField]: {
increment: 1,
},
},
});
}
if (email.campaignId) {
if (
mailStatus !== "CLICKED" ||
@@ -109,3 +154,28 @@ function getEmailData(data: SesEvent) {
return data[eventType.toLowerCase() as SesEventDataKey];
}
}
export class SesHookParser {
private static sesHookQueue = new Queue("ses-web-hook", {
connection: getRedis(),
});
private static worker = new Worker(
"ses-web-hook",
async (job) => {
await this.execute(job.data);
},
{
connection: getRedis(),
concurrency: 200,
}
);
private static async execute(event: SesEvent) {
await parseSesHook(event);
}
static async queue(data: { event: SesEvent; messageId: string }) {
return await this.sesHookQueue.add(data.messageId, data.event);
}
}