diff --git a/apps/web/src/env.js b/apps/web/src/env.js index 758019b..f80b6bc 100644 --- a/apps/web/src/env.js +++ b/apps/web/src/env.js @@ -66,6 +66,10 @@ export const env = createEnv({ SMTP_HOST: z.string().default("smtp.usesend.com"), SMTP_USER: z.string().default("usesend"), CONTACT_BOOK_ID: z.string().optional(), + EMAIL_CLEANUP_DAYS: z + .string() + .optional() + .transform((str) => (str ? parseInt(str, 10) : undefined)), }, /** @@ -122,6 +126,7 @@ export const env = createEnv({ SMTP_HOST: process.env.SMTP_HOST, SMTP_USER: process.env.SMTP_USER, CONTACT_BOOK_ID: process.env.CONTACT_BOOK_ID, + EMAIL_CLEANUP_DAYS: process.env.EMAIL_CLEANUP_DAYS, }, /** * Run `build` or `dev` with `SKIP_ENV_VALIDATION` to skip env validation. This is especially diff --git a/apps/web/src/instrumentation.ts b/apps/web/src/instrumentation.ts index 81d3186..8cade36 100644 --- a/apps/web/src/instrumentation.ts +++ b/apps/web/src/instrumentation.ts @@ -1,5 +1,5 @@ import { env } from "./env"; -import { isCloud } from "./utils/common"; +import { isCloud , isEmailCleanupEnabled } from "./utils/common"; let initialized = false; @@ -25,6 +25,10 @@ export async function register() { await import("~/server/jobs/usage-job"); } + if (isEmailCleanupEnabled()) { + await import("~/server/jobs/cleanup-email-bodies"); + } + const { CampaignSchedulerService } = await import( "~/server/jobs/campaign-scheduler-job" ); diff --git a/apps/web/src/server/jobs/cleanup-email-bodies.ts b/apps/web/src/server/jobs/cleanup-email-bodies.ts new file mode 100644 index 0000000..5e221d6 --- /dev/null +++ b/apps/web/src/server/jobs/cleanup-email-bodies.ts @@ -0,0 +1,73 @@ +import {Queue, Worker} from "bullmq"; +import {db} from "~/server/db"; +import {getRedis} from "~/server/redis"; +import {logger} from "../logger/log"; +import {DEFAULT_QUEUE_OPTIONS} from "../queue/queue-constants"; +import {env} from "~/env"; +import {isSelfHosted, isEmailCleanupEnabled} from "~/utils/common"; + +const CLEANUP_QUEUE_NAME = "cleanup-email-bodies"; + +const CLEANUP_CRON = "0 0 * * *"; // default: midnight UTC + +// Only initialize if self hosted and cleanup enabled +if (isSelfHosted() && isEmailCleanupEnabled()) { + const CLEANUP_DAYS = env.EMAIL_CLEANUP_DAYS!; + + /** + * Initialize Queue + */ + const cleanupQueue = new Queue(CLEANUP_QUEUE_NAME, { + connection: getRedis(), + }); + + const worker = new Worker( + CLEANUP_QUEUE_NAME, + async () => { + logger.info(`[Cleanup] Starting cleanup for emails older than ${CLEANUP_DAYS} days...`); + + const cutoffDate = new Date(); + cutoffDate.setDate(cutoffDate.getDate() - CLEANUP_DAYS); + + const result = await db.email.updateMany({ + where: { + createdAt: {lt: cutoffDate}, + OR: [ + {text: {not: null}}, + {html: {not: null}}, + ], + }, + data: { + text: null, + html: null, + }, + }); + + logger.info(`[Cleanup] Emails cleaned: ${result.count}`); + }, + { + connection: getRedis(), + } + ); + + await cleanupQueue.upsertJobScheduler( + "scheduled-email-cleanup", + { + pattern: CLEANUP_CRON, + tz: "UTC", + }, + { + opts: { + ...DEFAULT_QUEUE_OPTIONS, + }, + } + ); + + worker.on("completed", (job) => { + logger.info({jobId: job.id}, ` Email Body cleanup job completed`); + }); + + worker.on("failed", (job, err) => { + logger.error({err, jobId: job?.id}, `Email Body cleanup job failed`); + }); +} diff --git a/apps/web/src/utils/common.ts b/apps/web/src/utils/common.ts index d4da6d1..4e332dd 100644 --- a/apps/web/src/utils/common.ts +++ b/apps/web/src/utils/common.ts @@ -1,9 +1,17 @@ -import { env } from "~/env"; +import {env} from "~/env"; export function isCloud() { - return env.NEXT_PUBLIC_IS_CLOUD; + return env.NEXT_PUBLIC_IS_CLOUD; } export function isSelfHosted() { - return !isCloud(); + return !isCloud(); +} + +export function isEmailCleanupEnabled() { + const days = env.EMAIL_CLEANUP_DAYS; + if (days === undefined || isNaN(days) || days <= 0) { + return false; + } + return true; }