queue bulk contacts (#273)

This commit is contained in:
KM Koushik
2025-10-12 06:18:04 +11:00
committed by GitHub
parent 2fe2d5cdab
commit 159b15e37e
7 changed files with 150 additions and 17 deletions
+4 -1
View File
@@ -41,4 +41,7 @@ yarn-error.log*
prod_db.tar prod_db.tar
bin bin
__pycache__ __pycache__
memory-profiles
memory-baseline.json
memory-analysis-report.html
+1 -1
View File
@@ -27,7 +27,7 @@
- Formatting: Prettier 3; run `pnpm format`. - Formatting: Prettier 3; run `pnpm format`.
- Files: React components PascalCase (e.g., `AppSideBar.tsx`); folders kebab/lowercase. - Files: React components PascalCase (e.g., `AppSideBar.tsx`); folders kebab/lowercase.
- Paths (web): use alias `~/` for src imports (e.g., `import { x } from "~/utils/x"`). - Paths (web): use alias `~/` for src imports (e.g., `import { x } from "~/utils/x"`).
- Never use dynamic imports - NEVER USE DYNAMIC IMPORTS. ALWAYS IMPORT ON THE TOP
## Rules ## Rules
@@ -66,7 +66,7 @@ export default function AddContact({
onSuccess: async () => { onSuccess: async () => {
utils.contacts.contacts.invalidate(); utils.contacts.contacts.invalidate();
setOpen(false); setOpen(false);
toast.success("Contacts added successfully"); toast.success("Contacts queued for processing");
}, },
onError: async (error) => { onError: async (error) => {
toast.error(error.message); toast.error(error.message);
+13 -9
View File
@@ -20,7 +20,7 @@ export const contactsRouter = createTRPCRouter({
.input( .input(
z.object({ z.object({
name: z.string(), name: z.string(),
}) }),
) )
.mutation(async ({ ctx: { team }, input }) => { .mutation(async ({ ctx: { team }, input }) => {
const { name } = input; const { name } = input;
@@ -38,7 +38,7 @@ export const contactsRouter = createTRPCRouter({
unsubscribedContacts, unsubscribedContacts,
campaigns, campaigns,
}; };
} },
), ),
updateContactBook: contactBookProcedure updateContactBook: contactBookProcedure
@@ -48,7 +48,7 @@ export const contactsRouter = createTRPCRouter({
name: z.string().optional(), name: z.string().optional(),
properties: z.record(z.string()).optional(), properties: z.record(z.string()).optional(),
emoji: z.string().optional(), emoji: z.string().optional(),
}) }),
) )
.mutation(async ({ ctx: { contactBook }, input }) => { .mutation(async ({ ctx: { contactBook }, input }) => {
const { contactBookId, ...data } = input; const { contactBookId, ...data } = input;
@@ -67,7 +67,7 @@ export const contactsRouter = createTRPCRouter({
page: z.number().optional(), page: z.number().optional(),
subscribed: z.boolean().optional(), subscribed: z.boolean().optional(),
search: z.string().optional(), search: z.string().optional(),
}) }),
) )
.query(async ({ ctx: { db }, input }) => { .query(async ({ ctx: { db }, input }) => {
const page = input.page || 1; const page = input.page || 1;
@@ -126,12 +126,16 @@ export const contactsRouter = createTRPCRouter({
lastName: z.string().optional(), lastName: z.string().optional(),
properties: z.record(z.string()).optional(), properties: z.record(z.string()).optional(),
subscribed: z.boolean().optional(), subscribed: z.boolean().optional(),
}) }),
), ),
}) }),
) )
.mutation(async ({ ctx: { contactBook }, input }) => { .mutation(async ({ ctx: { contactBook, team }, input }) => {
return contactService.bulkAddContacts(contactBook.id, input.contacts); return contactService.bulkAddContacts(
contactBook.id,
input.contacts,
team.id,
);
}), }),
updateContact: contactBookProcedure updateContact: contactBookProcedure
@@ -143,7 +147,7 @@ export const contactsRouter = createTRPCRouter({
lastName: z.string().optional(), lastName: z.string().optional(),
properties: z.record(z.string()).optional(), properties: z.record(z.string()).optional(),
subscribed: z.boolean().optional(), subscribed: z.boolean().optional(),
}) }),
) )
.mutation(async ({ input }) => { .mutation(async ({ input }) => {
const { contactId, ...contact } = input; const { contactId, ...contact } = input;
@@ -1,5 +1,6 @@
export const SES_WEBHOOK_QUEUE = "ses-webhook"; export const SES_WEBHOOK_QUEUE = "ses-webhook";
export const CAMPAIGN_MAIL_PROCESSING_QUEUE = "campaign-emails-processing"; export const CAMPAIGN_MAIL_PROCESSING_QUEUE = "campaign-emails-processing";
export const CONTACT_BULK_ADD_QUEUE = "contact-bulk-add";
export const DEFAULT_QUEUE_OPTIONS = { export const DEFAULT_QUEUE_OPTIONS = {
removeOnComplete: true, removeOnComplete: true,
@@ -0,0 +1,122 @@
import { Queue, Worker } from "bullmq";
import { getRedis } from "../redis";
import {
DEFAULT_QUEUE_OPTIONS,
CONTACT_BULK_ADD_QUEUE,
} from "../queue/queue-constants";
import { logger } from "../logger/log";
import { createWorkerHandler, TeamJob } from "../queue/bullmq-context";
import { addOrUpdateContact, ContactInput } from "./contact-service";
type ContactJobData = {
contactBookId: string;
contact: ContactInput;
teamId?: number;
};
type ContactJob = TeamJob<ContactJobData>;
class ContactQueueService {
public static queue = new Queue<ContactJobData>(CONTACT_BULK_ADD_QUEUE, {
connection: getRedis(),
defaultJobOptions: DEFAULT_QUEUE_OPTIONS,
});
public static worker = new Worker(
CONTACT_BULK_ADD_QUEUE,
createWorkerHandler(processContactJob),
{
connection: getRedis(),
concurrency: 20,
},
);
static {
this.worker.on("error", (err) => {
logger.error({ err }, "[ContactQueueService]: Worker error");
});
logger.info("[ContactQueueService]: Initialized contact queue service");
}
public static async addContactJob(
contactBookId: string,
contact: ContactInput,
teamId?: number,
delay?: number,
) {
await this.queue.add(
`add-contact-${contact.email}`,
{
contactBookId,
contact,
teamId,
},
{
delay,
...DEFAULT_QUEUE_OPTIONS,
},
);
}
public static async addBulkContactJobs(
contactBookId: string,
contacts: ContactInput[],
teamId?: number,
) {
const jobs = contacts.map((contact) => ({
name: `add-contact-${contact.email}`,
data: {
contactBookId,
contact,
teamId,
},
opts: DEFAULT_QUEUE_OPTIONS,
}));
await this.queue.addBulk(jobs);
logger.info(
{ count: contacts.length, contactBookId },
"[ContactQueueService]: Added bulk contact jobs to queue",
);
}
public static async getQueueStats() {
const waiting = await this.queue.getWaiting();
const active = await this.queue.getActive();
const completed = await this.queue.getCompleted();
const failed = await this.queue.getFailed();
return {
waiting: waiting.length,
active: active.length,
completed: completed.length,
failed: failed.length,
};
}
}
async function processContactJob(job: ContactJob) {
const { contactBookId, contact } = job.data;
logger.info(
{ contactEmail: contact.email, contactBookId },
"[ContactQueueService]: Processing contact job",
);
try {
await addOrUpdateContact(contactBookId, contact);
logger.info(
{ contactEmail: contact.email },
"[ContactQueueService]: Successfully processed contact job",
);
} catch (error) {
logger.error(
{ contactEmail: contact.email, error },
"[ContactQueueService]: Failed to process contact job",
);
throw error;
}
}
export { ContactQueueService };
@@ -1,4 +1,5 @@
import { db } from "../db"; import { db } from "../db";
import { ContactQueueService } from "./contact-queue-service";
export type ContactInput = { export type ContactInput = {
email: string; email: string;
@@ -60,13 +61,15 @@ export async function deleteContact(contactId: string) {
export async function bulkAddContacts( export async function bulkAddContacts(
contactBookId: string, contactBookId: string,
contacts: Array<ContactInput> contacts: Array<ContactInput>,
teamId?: number
) { ) {
const createdContacts = await Promise.all( await ContactQueueService.addBulkContactJobs(contactBookId, contacts, teamId);
contacts.map((contact) => addOrUpdateContact(contactBookId, contact))
);
return createdContacts; return {
message: `Queued ${contacts.length} contacts for processing`,
count: contacts.length,
};
} }
export async function unsubscribeContact(contactId: string) { export async function unsubscribeContact(contactId: string) {