diff --git a/.gitignore b/.gitignore index e86311d..c2b5d84 100644 --- a/.gitignore +++ b/.gitignore @@ -41,4 +41,7 @@ yarn-error.log* prod_db.tar bin -__pycache__ \ No newline at end of file +__pycache__ +memory-profiles +memory-baseline.json +memory-analysis-report.html \ No newline at end of file diff --git a/AGENTS.md b/AGENTS.md index e6d4dca..7ee8585 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -27,7 +27,7 @@ - Formatting: Prettier 3; run `pnpm format`. - Files: React components PascalCase (e.g., `AppSideBar.tsx`); folders kebab/lowercase. - Paths (web): use alias `~/` for src imports (e.g., `import { x } from "~/utils/x"`). -- Never use dynamic imports +- NEVER USE DYNAMIC IMPORTS. ALWAYS IMPORT ON THE TOP ## Rules diff --git a/apps/web/src/app/(dashboard)/contacts/[contactBookId]/add-contact.tsx b/apps/web/src/app/(dashboard)/contacts/[contactBookId]/add-contact.tsx index 27326a2..638f3b1 100644 --- a/apps/web/src/app/(dashboard)/contacts/[contactBookId]/add-contact.tsx +++ b/apps/web/src/app/(dashboard)/contacts/[contactBookId]/add-contact.tsx @@ -66,7 +66,7 @@ export default function AddContact({ onSuccess: async () => { utils.contacts.contacts.invalidate(); setOpen(false); - toast.success("Contacts added successfully"); + toast.success("Contacts queued for processing"); }, onError: async (error) => { toast.error(error.message); diff --git a/apps/web/src/server/api/routers/contacts.ts b/apps/web/src/server/api/routers/contacts.ts index 8bb2abc..44964e3 100644 --- a/apps/web/src/server/api/routers/contacts.ts +++ b/apps/web/src/server/api/routers/contacts.ts @@ -20,7 +20,7 @@ export const contactsRouter = createTRPCRouter({ .input( z.object({ name: z.string(), - }) + }), ) .mutation(async ({ ctx: { team }, input }) => { const { name } = input; @@ -38,7 +38,7 @@ export const contactsRouter = createTRPCRouter({ unsubscribedContacts, campaigns, }; - } + }, ), updateContactBook: contactBookProcedure @@ -48,7 +48,7 @@ export const contactsRouter = createTRPCRouter({ name: z.string().optional(), properties: z.record(z.string()).optional(), emoji: z.string().optional(), - }) + }), ) .mutation(async ({ ctx: { contactBook }, input }) => { const { contactBookId, ...data } = input; @@ -67,7 +67,7 @@ export const contactsRouter = createTRPCRouter({ page: z.number().optional(), subscribed: z.boolean().optional(), search: z.string().optional(), - }) + }), ) .query(async ({ ctx: { db }, input }) => { const page = input.page || 1; @@ -126,12 +126,16 @@ export const contactsRouter = createTRPCRouter({ lastName: z.string().optional(), properties: z.record(z.string()).optional(), subscribed: z.boolean().optional(), - }) + }), ), - }) + }), ) - .mutation(async ({ ctx: { contactBook }, input }) => { - return contactService.bulkAddContacts(contactBook.id, input.contacts); + .mutation(async ({ ctx: { contactBook, team }, input }) => { + return contactService.bulkAddContacts( + contactBook.id, + input.contacts, + team.id, + ); }), updateContact: contactBookProcedure @@ -143,7 +147,7 @@ export const contactsRouter = createTRPCRouter({ lastName: z.string().optional(), properties: z.record(z.string()).optional(), subscribed: z.boolean().optional(), - }) + }), ) .mutation(async ({ input }) => { const { contactId, ...contact } = input; diff --git a/apps/web/src/server/queue/queue-constants.ts b/apps/web/src/server/queue/queue-constants.ts index e00ce95..91d7fc9 100644 --- a/apps/web/src/server/queue/queue-constants.ts +++ b/apps/web/src/server/queue/queue-constants.ts @@ -1,5 +1,6 @@ export const SES_WEBHOOK_QUEUE = "ses-webhook"; export const CAMPAIGN_MAIL_PROCESSING_QUEUE = "campaign-emails-processing"; +export const CONTACT_BULK_ADD_QUEUE = "contact-bulk-add"; export const DEFAULT_QUEUE_OPTIONS = { removeOnComplete: true, diff --git a/apps/web/src/server/service/contact-queue-service.ts b/apps/web/src/server/service/contact-queue-service.ts new file mode 100644 index 0000000..3ed6ef0 --- /dev/null +++ b/apps/web/src/server/service/contact-queue-service.ts @@ -0,0 +1,122 @@ +import { Queue, Worker } from "bullmq"; +import { getRedis } from "../redis"; +import { + DEFAULT_QUEUE_OPTIONS, + CONTACT_BULK_ADD_QUEUE, +} from "../queue/queue-constants"; +import { logger } from "../logger/log"; +import { createWorkerHandler, TeamJob } from "../queue/bullmq-context"; +import { addOrUpdateContact, ContactInput } from "./contact-service"; + +type ContactJobData = { + contactBookId: string; + contact: ContactInput; + teamId?: number; +}; + +type ContactJob = TeamJob; + +class ContactQueueService { + public static queue = new Queue(CONTACT_BULK_ADD_QUEUE, { + connection: getRedis(), + defaultJobOptions: DEFAULT_QUEUE_OPTIONS, + }); + + public static worker = new Worker( + CONTACT_BULK_ADD_QUEUE, + createWorkerHandler(processContactJob), + { + connection: getRedis(), + concurrency: 20, + }, + ); + + static { + this.worker.on("error", (err) => { + logger.error({ err }, "[ContactQueueService]: Worker error"); + }); + + logger.info("[ContactQueueService]: Initialized contact queue service"); + } + + public static async addContactJob( + contactBookId: string, + contact: ContactInput, + teamId?: number, + delay?: number, + ) { + await this.queue.add( + `add-contact-${contact.email}`, + { + contactBookId, + contact, + teamId, + }, + { + delay, + ...DEFAULT_QUEUE_OPTIONS, + }, + ); + } + + public static async addBulkContactJobs( + contactBookId: string, + contacts: ContactInput[], + teamId?: number, + ) { + const jobs = contacts.map((contact) => ({ + name: `add-contact-${contact.email}`, + data: { + contactBookId, + contact, + teamId, + }, + opts: DEFAULT_QUEUE_OPTIONS, + })); + + await this.queue.addBulk(jobs); + logger.info( + { count: contacts.length, contactBookId }, + "[ContactQueueService]: Added bulk contact jobs to queue", + ); + } + + public static async getQueueStats() { + const waiting = await this.queue.getWaiting(); + const active = await this.queue.getActive(); + const completed = await this.queue.getCompleted(); + const failed = await this.queue.getFailed(); + + return { + waiting: waiting.length, + active: active.length, + completed: completed.length, + failed: failed.length, + }; + } +} + +async function processContactJob(job: ContactJob) { + const { contactBookId, contact } = job.data; + + logger.info( + { contactEmail: contact.email, contactBookId }, + "[ContactQueueService]: Processing contact job", + ); + + try { + await addOrUpdateContact(contactBookId, contact); + logger.info( + { contactEmail: contact.email }, + "[ContactQueueService]: Successfully processed contact job", + ); + } catch (error) { + logger.error( + { contactEmail: contact.email, error }, + "[ContactQueueService]: Failed to process contact job", + ); + throw error; + } +} + +export { ContactQueueService }; diff --git a/apps/web/src/server/service/contact-service.ts b/apps/web/src/server/service/contact-service.ts index d42ce73..6d5b082 100644 --- a/apps/web/src/server/service/contact-service.ts +++ b/apps/web/src/server/service/contact-service.ts @@ -1,4 +1,5 @@ import { db } from "../db"; +import { ContactQueueService } from "./contact-queue-service"; export type ContactInput = { email: string; @@ -60,13 +61,15 @@ export async function deleteContact(contactId: string) { export async function bulkAddContacts( contactBookId: string, - contacts: Array + contacts: Array, + teamId?: number ) { - const createdContacts = await Promise.all( - contacts.map((contact) => addOrUpdateContact(contactBookId, contact)) - ); + await ContactQueueService.addBulkContactJobs(contactBookId, contacts, teamId); - return createdContacts; + return { + message: `Queued ${contacts.length} contacts for processing`, + count: contacts.length, + }; } export async function unsubscribeContact(contactId: string) {