feat: batch campaigns (#227)

This commit is contained in:
KM Koushik
2025-10-12 22:43:16 +11:00
committed by GitHub
parent 159b15e37e
commit e631f16c85
22 changed files with 13574 additions and 6314 deletions
+96 -38
View File
@@ -11,10 +11,7 @@ import {
} from "~/server/api/trpc";
import { logger } from "~/server/logger/log";
import { nanoid } from "~/server/nanoid";
import {
sendCampaign,
subscribeContact,
} from "~/server/service/campaign-service";
import * as campaignService from "~/server/service/campaign-service";
import { validateDomainFromEmail } from "~/server/service/domain-service";
import {
getDocumentUploadUrl,
@@ -29,10 +26,10 @@ export const campaignRouter = createTRPCRouter({
z.object({
page: z.number().optional(),
status: z.enum(statuses).optional().nullable(),
}),
search: z.string().optional().nullable(),
})
)
.query(async ({ ctx: { db, team }, input }) => {
let completeTime = performance.now();
const page = input.page || 1;
const limit = 30;
const offset = (page - 1) * limit;
@@ -45,6 +42,23 @@ export const campaignRouter = createTRPCRouter({
whereConditions.status = input.status;
}
if (input.search) {
whereConditions.OR = [
{
name: {
contains: input.search,
mode: "insensitive",
},
},
{
subject: {
contains: input.search,
mode: "insensitive",
},
},
];
}
const countP = db.campaign.count({ where: whereConditions });
const campaignsP = db.campaign.findMany({
@@ -57,6 +71,11 @@ export const campaignRouter = createTRPCRouter({
createdAt: true,
updatedAt: true,
status: true,
scheduledAt: true,
total: true,
sent: true,
delivered: true,
unsubscribed: true,
},
orderBy: {
createdAt: "desc",
@@ -64,19 +83,8 @@ export const campaignRouter = createTRPCRouter({
skip: offset,
take: limit,
});
let time = performance.now();
campaignsP.then((campaigns) => {
logger.info(
`Time taken to get campaigns: ${performance.now() - time} milliseconds`,
);
});
const [campaigns, count] = await Promise.all([campaignsP, countP]);
logger.info(
{ duration: performance.now() - completeTime },
`Time taken to complete request`,
);
return { campaigns, totalPage: Math.ceil(count / limit) };
}),
@@ -87,7 +95,7 @@ export const campaignRouter = createTRPCRouter({
name: z.string(),
from: z.string(),
subject: z.string(),
}),
})
)
.mutation(async ({ ctx: { db, team }, input }) => {
const domain = await validateDomainFromEmail(input.from, team.id);
@@ -113,7 +121,7 @@ export const campaignRouter = createTRPCRouter({
content: z.string().optional(),
contactBookId: z.string().optional(),
replyTo: z.string().array().optional(),
}),
})
)
.mutation(async ({ ctx: { db, team, campaign: campaignOld }, input }) => {
const { campaignId, ...data } = input;
@@ -155,14 +163,9 @@ export const campaignRouter = createTRPCRouter({
return campaign;
}),
deleteCampaign: campaignProcedure.mutation(
async ({ ctx: { db, team }, input }) => {
const campaign = await db.campaign.delete({
where: { id: input.campaignId, teamId: team.id },
});
return campaign;
},
),
deleteCampaign: campaignProcedure.mutation(async ({ input }) => {
return await campaignService.deleteCampaign(input.campaignId);
}),
getCampaign: campaignProcedure.query(async ({ ctx: { db, team }, input }) => {
const campaign = await db.campaign.findUnique({
@@ -191,10 +194,31 @@ export const campaignRouter = createTRPCRouter({
};
}),
sendCampaign: campaignProcedure.mutation(
async ({ ctx: { db, team }, input }) => {
await sendCampaign(input.campaignId);
},
latestEmails: campaignProcedure.query(
async ({ ctx: { db, team, campaign } }) => {
const emails = await db.email.findMany({
where: {
teamId: team.id,
campaignId: campaign.id,
},
orderBy: [
{ updatedAt: "desc" },
{ createdAt: "desc" },
],
take: 10,
select: {
id: true,
subject: true,
to: true,
latestStatus: true,
createdAt: true,
updatedAt: true,
scheduledAt: true,
},
});
return emails;
}
),
reSubscribeContact: publicProcedure
@@ -202,14 +226,14 @@ export const campaignRouter = createTRPCRouter({
z.object({
id: z.string(),
hash: z.string(),
}),
})
)
.mutation(async ({ ctx: { db }, input }) => {
await subscribeContact(input.id, input.hash);
.mutation(async ({ input }) => {
await campaignService.subscribeContact(input.id, input.hash);
}),
duplicateCampaign: campaignProcedure.mutation(
async ({ ctx: { db, team, campaign }, input }) => {
async ({ ctx: { db, team, campaign } }) => {
const newCampaign = await db.campaign.create({
data: {
name: `${campaign.name} (Copy)`,
@@ -223,15 +247,49 @@ export const campaignRouter = createTRPCRouter({
});
return newCampaign;
},
}
),
scheduleCampaign: campaignProcedure
.input(
z.object({
campaignId: z.string(),
scheduledAt: z.union([z.string().datetime(), z.date()]).optional(),
batchSize: z.number().min(1).max(100_000).optional(),
})
)
.mutation(async ({ ctx: { team }, input }) => {
await campaignService.scheduleCampaign({
campaignId: input.campaignId,
teamId: team.id,
scheduledAt: input.scheduledAt,
batchSize: input.batchSize,
});
return { ok: true };
}),
pauseCampaign: campaignProcedure.mutation(async ({ ctx: { campaign } }) => {
await campaignService.pauseCampaign({
campaignId: campaign.id,
teamId: campaign.teamId,
});
return { ok: true };
}),
resumeCampaign: campaignProcedure.mutation(async ({ ctx: { campaign } }) => {
await campaignService.resumeCampaign({
campaignId: campaign.id,
teamId: campaign.teamId,
});
return { ok: true };
}),
generateImagePresignedUrl: campaignProcedure
.input(
z.object({
name: z.string(),
type: z.string(),
}),
})
)
.mutation(async ({ ctx: { team }, input }) => {
const extension = input.name.split(".").pop();
@@ -239,7 +297,7 @@ export const campaignRouter = createTRPCRouter({
const url = await getDocumentUploadUrl(
`${team.id}/${randomName}`,
input.type,
input.type
);
const imageUrl = `${env.S3_COMPATIBLE_PUBLIC_URL}/${team.id}/${randomName}`;
@@ -0,0 +1,104 @@
import { Queue, Worker } from "bullmq";
import { createWorkerHandler, TeamJob } from "../queue/bullmq-context";
import {
CAMPAIGN_SCHEDULER_QUEUE,
DEFAULT_QUEUE_OPTIONS,
} from "../queue/queue-constants";
import { getRedis } from "../redis";
import { CampaignBatchService } from "../service/campaign-service";
import { db } from "../db";
import { logger } from "../logger/log";
const SCHEDULER_TICK_MS = 1500;
type SchedulerJob = TeamJob<{}>;
export class CampaignSchedulerService {
private static schedulerQueue = new Queue<SchedulerJob>(
CAMPAIGN_SCHEDULER_QUEUE,
{
connection: getRedis(),
}
);
static worker = new Worker(
CAMPAIGN_SCHEDULER_QUEUE,
createWorkerHandler(async (_job: SchedulerJob) => {
try {
const now = new Date();
const campaigns = await db.campaign.findMany({
where: {
status: { in: ["SCHEDULED", "RUNNING"] },
OR: [{ scheduledAt: null }, { scheduledAt: { lte: now } }],
},
select: {
id: true,
teamId: true,
lastSentAt: true,
batchWindowMinutes: true,
},
});
const enqueuePromises: Promise<any>[] = [];
for (const c of campaigns) {
const windowMin = c.batchWindowMinutes ?? 0;
if (windowMin > 0 && c.lastSentAt) {
const elapsedMs = now.getTime() - new Date(c.lastSentAt).getTime();
const windowMs = windowMin * 60 * 1000;
if (elapsedMs < windowMs) {
const remainingMs = windowMs - elapsedMs;
logger.debug(
{ campaignId: c.id, remainingMs, windowMs },
"Skip queueing batch; window not elapsed"
);
continue;
}
}
enqueuePromises.push(
CampaignBatchService.queueBatch({
campaignId: c.id,
teamId: c.teamId,
}).catch((err) => {
logger.error(
{ err, campaignId: c.id },
"Failed to enqueue campaign batch"
);
})
);
}
if (enqueuePromises.length > 0) {
const results = await Promise.allSettled(enqueuePromises);
const rejected = results.filter(
(r) => r.status === "rejected"
).length;
const fulfilled = results.length - rejected;
logger.debug(
{ total: results.length, fulfilled, rejected },
"Scheduler enqueue summary"
);
}
} catch (err) {
logger.error({ err }, "Campaign scheduler tick failed");
}
}),
{ connection: getRedis(), concurrency: 1 }
);
static async start() {
try {
await this.schedulerQueue.add(
"tick",
{},
{
jobId: "campaign-scheduler",
repeat: { every: SCHEDULER_TICK_MS },
...DEFAULT_QUEUE_OPTIONS,
}
);
} catch (err) {
// Adding the same repeatable job is idempotent; ignore job-exists errors
logger.info({ err }, "Scheduler start attempted");
}
}
}
@@ -1,6 +1,8 @@
export const SES_WEBHOOK_QUEUE = "ses-webhook";
export const CAMPAIGN_MAIL_PROCESSING_QUEUE = "campaign-emails-processing";
export const CONTACT_BULK_ADD_QUEUE = "contact-bulk-add";
export const CAMPAIGN_BATCH_QUEUE = "campaign-batch";
export const CAMPAIGN_SCHEDULER_QUEUE = "campaign-scheduler";
export const DEFAULT_QUEUE_OPTIONS = {
removeOnComplete: true,
+367 -123
View File
@@ -8,17 +8,17 @@ import {
EmailStatus,
UnsubscribeReason,
} from "@prisma/client";
import { validateDomainFromEmail } from "./domain-service";
import { EmailQueueService } from "./email-queue-service";
import { Queue, Worker } from "bullmq";
import { getRedis } from "../redis";
import {
CAMPAIGN_MAIL_PROCESSING_QUEUE,
CAMPAIGN_BATCH_QUEUE,
DEFAULT_QUEUE_OPTIONS,
} from "../queue/queue-constants";
import { logger } from "../logger/log";
import { createWorkerHandler, TeamJob } from "../queue/bullmq-context";
import { SuppressionService } from "./suppression-service";
import { UnsendApiError } from "../public-api/api-error";
const CAMPAIGN_UNSUB_PLACEHOLDER_TOKENS = [
"{{unsend_unsubscribe_url}}",
@@ -57,21 +57,6 @@ export async function sendCampaign(id: string) {
throw new Error("No contact book found for campaign");
}
const contactBook = await db.contactBook.findUnique({
where: { id: campaign.contactBookId },
include: {
contacts: {
where: {
subscribed: true,
},
},
},
});
if (!contactBook) {
throw new Error("Contact book not found");
}
if (!campaign.html) {
throw new Error("No HTML content for campaign");
}
@@ -83,27 +68,194 @@ export async function sendCampaign(id: string) {
);
if (!unsubPlaceholderFound) {
throw new Error(
"Campaign must include an unsubscribe link before sending"
);
throw new Error("Campaign must include an unsubscribe link before sending");
}
await sendCampaignEmail(campaign, {
campaignId: campaign.id,
from: campaign.from,
subject: campaign.subject,
html: campaign.html,
replyTo: campaign.replyTo,
cc: campaign.cc,
bcc: campaign.bcc,
teamId: campaign.teamId,
contacts: contactBook.contacts,
// Count subscribed contacts for total, don't load all into memory
const total = await db.contact.count({
where: { contactBookId: campaign.contactBookId, subscribed: true },
});
// Mark as scheduled (or keep running if already running), set totals and scheduledAt if not set
await db.campaign.update({
where: { id },
data: { status: "SENT", total: contactBook.contacts.length },
data: {
status: "SCHEDULED",
total,
scheduledAt: campaign.scheduledAt ?? new Date(),
lastCursor: campaign.lastCursor ?? null,
},
});
// Kick off first batch immediately (idempotent by jobId)
await CampaignBatchService.queueBatch({
campaignId: id,
teamId: campaign.teamId,
});
}
export async function scheduleCampaign({
campaignId,
teamId,
scheduledAt: scheduledAtInput,
batchSize,
}: {
campaignId: string;
teamId: number;
scheduledAt?: Date | string;
batchSize?: number;
}) {
let campaign = await db.campaign.findUnique({
where: { id: campaignId, teamId },
});
if (!campaign) {
throw new UnsendApiError({
code: "NOT_FOUND",
message: "Campaign not found",
});
}
if (!campaign.content) {
throw new UnsendApiError({
code: "BAD_REQUEST",
message: "No content added for campaign",
});
}
// Parse & render HTML (idempotent) similar to sendCampaign
try {
const jsonContent = JSON.parse(campaign.content);
const renderer = new EmailRenderer(jsonContent);
const html = await renderer.render();
campaign = await db.campaign.update({
where: { id: campaign.id },
data: { html },
});
} catch (err) {
throw new UnsendApiError({
code: "BAD_REQUEST",
message: "Invalid content",
});
}
if (!campaign.contactBookId) {
throw new UnsendApiError({
code: "BAD_REQUEST",
message: "No contact book found for campaign",
});
}
if (!campaign.html) {
throw new UnsendApiError({
code: "BAD_REQUEST",
message: "No HTML content for campaign",
});
}
const unsubPlaceholderFound = CAMPAIGN_UNSUB_PLACEHOLDER_TOKENS.some(
(placeholder) =>
campaign.content?.includes(placeholder) ||
campaign.html?.includes(placeholder)
);
if (!unsubPlaceholderFound) {
throw new UnsendApiError({
code: "BAD_REQUEST",
message: "Campaign must include an unsubscribe link before scheduling",
});
}
// Count subscribed contacts for total
const total = await db.contact.count({
where: { contactBookId: campaign.contactBookId, subscribed: true },
});
if (total === 0) {
throw new UnsendApiError({
code: "BAD_REQUEST",
message: "No subscribed contacts to send",
});
}
const scheduledAt = scheduledAtInput
? scheduledAtInput instanceof Date
? scheduledAtInput
: new Date(scheduledAtInput)
: new Date();
const shouldResetCursor =
campaign.status === "DRAFT" || campaign.status === "SENT";
await db.campaign.update({
where: { id: campaign.id },
data: {
status: "SCHEDULED",
scheduledAt,
total,
...(batchSize ? { batchSize } : {}),
...(shouldResetCursor ? { lastCursor: null } : {}),
},
});
return { ok: true };
}
export async function pauseCampaign({
campaignId,
teamId,
}: {
campaignId: string;
teamId: number;
}) {
const campaign = await db.campaign.findUnique({
where: { id: campaignId, teamId },
});
if (!campaign) {
throw new UnsendApiError({
code: "NOT_FOUND",
message: "Campaign not found",
});
}
await db.campaign.update({
where: { id: campaignId },
data: { status: "PAUSED" },
});
return { ok: true };
}
export async function resumeCampaign({
campaignId,
teamId,
}: {
campaignId: string;
teamId: number;
}) {
const campaign = await db.campaign.findUnique({
where: { id: campaignId, teamId },
});
if (!campaign) {
throw new UnsendApiError({
code: "NOT_FOUND",
message: "Campaign not found",
});
}
if (campaign.scheduledAt && campaign.scheduledAt.getTime() > Date.now()) {
await db.campaign.update({
where: { id: campaignId },
data: { status: "SCHEDULED" },
});
} else {
await db.campaign.update({
where: { id: campaignId },
data: { status: "RUNNING" },
});
}
return { ok: true };
}
export function createUnsubUrl(contactId: string, campaignId: string) {
@@ -242,18 +394,21 @@ export async function subscribeContact(id: string, hash: string) {
}
}
type CampainEmail = {
campaignId: string;
from: string;
subject: string;
html: string;
previewText?: string;
replyTo?: string[];
cc?: string[];
bcc?: string[];
teamId: number;
contacts: Array<Contact>;
};
export async function deleteCampaign(id: string) {
const campaign = await db.$transaction(async (tx) => {
await tx.campaignEmail.deleteMany({
where: { campaignId: id },
});
const campaign = await tx.campaign.delete({
where: { id },
});
return campaign;
});
return campaign;
}
type CampaignEmailJob = {
contact: Contact;
@@ -272,8 +427,6 @@ type CampaignEmailJob = {
};
};
type QueueCampaignEmailJob = TeamJob<CampaignEmailJob>;
async function processContactEmail(jobData: CampaignEmailJob) {
const { contact, campaign, emailConfig } = jobData;
const jsonContent = JSON.parse(campaign.content || "{}");
@@ -367,6 +520,18 @@ async function processContactEmail(jobData: CampaignEmailJob) {
},
});
try {
await db.campaignEmail.create({
data: {
campaignId: emailConfig.campaignId,
contactId: contact.id,
emailId: email.id,
},
});
} catch (error) {
logger.error({ err: error }, "Failed to create campaign email record");
}
return;
}
@@ -413,6 +578,22 @@ async function processContactEmail(jobData: CampaignEmailJob) {
},
});
try {
await db.campaignEmail.create({
data: {
campaignId: emailConfig.campaignId,
contactId: contact.id,
emailId: email.id,
},
});
} catch (error) {
logger.error(
{ err: error },
"Failed to create campaign email record so skipping email sending"
);
return;
}
// Queue email for sending
await EmailQueueService.queueEmail(
email.id,
@@ -423,50 +604,6 @@ async function processContactEmail(jobData: CampaignEmailJob) {
);
}
export async function sendCampaignEmail(
campaign: Campaign,
emailData: CampainEmail
) {
const {
campaignId,
from,
subject,
replyTo,
cc,
bcc,
teamId,
contacts,
previewText,
} = emailData;
const domain = await validateDomainFromEmail(from, teamId);
logger.info("Bulk queueing contacts");
await CampaignEmailService.queueBulkContacts(
contacts.map((contact) => ({
contact,
campaign,
emailConfig: {
from,
subject,
replyTo: replyTo
? Array.isArray(replyTo)
? replyTo
: [replyTo]
: undefined,
cc: cc ? (Array.isArray(cc) ? cc : [cc]) : undefined,
bcc: bcc ? (Array.isArray(bcc) ? bcc : [bcc]) : undefined,
teamId,
campaignId,
previewText,
domainId: domain.id,
region: domain.region,
},
}))
);
}
export async function updateCampaignAnalytics(
campaignId: string,
emailStatus: EmailStatus,
@@ -514,51 +651,158 @@ export async function updateCampaignAnalytics(
});
}
const CAMPAIGN_EMAIL_CONCURRENCY = 50;
// ---------------------------
// Simple campaign batch queue
// ---------------------------
class CampaignEmailService {
private static campaignQueue = new Queue<QueueCampaignEmailJob>(
CAMPAIGN_MAIL_PROCESSING_QUEUE,
type CampaignBatchJob = TeamJob<{ campaignId: string }>;
export class CampaignBatchService {
private static batchQueue = new Queue<CampaignBatchJob>(
CAMPAIGN_BATCH_QUEUE,
{
connection: getRedis(),
}
);
// TODO: Add team context to job data when queueing
static worker = new Worker(
CAMPAIGN_MAIL_PROCESSING_QUEUE,
createWorkerHandler(async (job: QueueCampaignEmailJob) => {
await processContactEmail(job.data);
CAMPAIGN_BATCH_QUEUE,
createWorkerHandler(async (job: CampaignBatchJob) => {
const { campaignId } = job.data;
const campaign = await db.campaign.findUnique({
where: { id: campaignId },
});
if (!campaign) return;
if (!campaign.contactBookId) return;
// Skip paused campaigns
if (campaign.status === "PAUSED") return;
// Respect scheduledAt if set
if (campaign.scheduledAt && campaign.scheduledAt.getTime() > Date.now())
return;
// First touch moves SCHEDULED -> RUNNING
if (campaign.status === "SCHEDULED") {
await db.campaign.update({
where: { id: campaignId },
data: { status: "RUNNING" },
});
}
const batchSize = campaign.batchSize ?? 500;
const where = {
contactBookId: campaign.contactBookId,
subscribed: true,
} as const;
const pagination: any = {
take: batchSize,
orderBy: { id: "asc" as const },
};
if (campaign.lastCursor) {
pagination.cursor = { id: campaign.lastCursor };
pagination.skip = 1; // do not include the cursor row
}
const contacts = await db.contact.findMany({ where, ...pagination });
if (contacts.length === 0) {
// No more contacts -> mark SENT
await db.campaign.update({
where: { id: campaignId },
data: { status: "SENT" },
});
return;
}
// Fetch domain for region and id
const domain = await db.domain.findUnique({
where: { id: campaign.domainId },
});
if (!domain) return;
// Bulk existence check to avoid duplicates while unique is not enforced
const existing = await db.campaignEmail.findMany({
where: {
campaignId: campaign.id,
contactId: { in: contacts.map((c) => c.id) },
},
select: { contactId: true },
});
const existingSet = new Set(existing.map((e) => e.contactId));
// Process each contact in this batch
for (const contact of contacts) {
if (existingSet.has(contact.id)) continue;
await processContactEmail({
contact,
campaign,
emailConfig: {
from: campaign.from,
subject: campaign.subject,
replyTo: Array.isArray(campaign.replyTo) ? campaign.replyTo : [],
cc: Array.isArray(campaign.cc) ? campaign.cc : [],
bcc: Array.isArray(campaign.bcc) ? campaign.bcc : [],
teamId: campaign.teamId,
campaignId: campaign.id,
previewText: campaign.previewText ?? undefined,
domainId: domain.id,
region: domain.region,
},
});
}
// Advance cursor and timestamp
const newCursor = contacts[contacts.length - 1]?.id;
await db.campaign.update({
where: { id: campaignId },
data: { lastCursor: newCursor, lastSentAt: new Date() },
});
}),
{
connection: getRedis(),
concurrency: CAMPAIGN_EMAIL_CONCURRENCY,
}
{ connection: getRedis(), concurrency: 20 }
);
static async queueContact(data: CampaignEmailJob) {
return await this.campaignQueue.add(
`contact-${data.contact.id}`,
{
...data,
teamId: data.emailConfig.teamId,
},
DEFAULT_QUEUE_OPTIONS
);
}
static async queueBatch({
campaignId,
teamId,
}: {
campaignId: string;
teamId?: number;
}) {
// Defensive check: avoid enqueue if window not elapsed (scheduler already enforces)
try {
const campaign = await db.campaign.findUnique({
where: { id: campaignId },
select: { lastSentAt: true, batchWindowMinutes: true, status: true },
});
if (!campaign) return;
if (campaign.status === "PAUSED" || campaign.status === "SENT") return;
const windowMin = campaign.batchWindowMinutes ?? 0;
if (windowMin > 0 && campaign.lastSentAt) {
const elapsedMs = Date.now() - new Date(campaign.lastSentAt).getTime();
const windowMs = windowMin * 60 * 1000;
if (elapsedMs < windowMs) {
logger.debug(
{ campaignId, remainingMs: windowMs - elapsedMs },
"Defensive skip enqueue; window not elapsed"
);
return;
}
}
} catch (err) {
logger.warn(
{ err, campaignId },
"Failed defensive window check; proceeding to enqueue"
);
}
static async queueBulkContacts(data: CampaignEmailJob[]) {
return await this.campaignQueue.addBulk(
data.map((item) => ({
name: `contact-${item.contact.id}`,
data: {
...item,
teamId: item.emailConfig.teamId,
},
opts: {
...DEFAULT_QUEUE_OPTIONS,
},
}))
await this.batchQueue.add(
`campaign-${campaignId}`,
{ campaignId, teamId },
{ jobId: `campaign-batch:${campaignId}`, ...DEFAULT_QUEUE_OPTIONS }
);
}
}