diff --git a/AGENTS.md b/AGENTS.md index 7ee8585..a5e8198 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -19,6 +19,7 @@ - `pnpm format`: Prettier over ts/tsx/md. - `pnpm dx` / `pnpm dx:up` / `pnpm dx:down`: Spin up/down local infra via Docker Compose, then run migrations. - Database (apps/web filter): `pnpm db:generate` | `db:migrate-dev` | `db:push` | `db:studio`. +- Never run migrations unless users explicitly asked ## Coding Style & Naming Conventions diff --git a/apps/web/package.json b/apps/web/package.json index 5ada494..231838c 100644 --- a/apps/web/package.json +++ b/apps/web/package.json @@ -14,7 +14,11 @@ "db:migrate-dev": "prisma migrate dev", "db:migrate-deploy": "prisma migrate deploy", "db:studio": "prisma studio", - "db:migrate-reset": "prisma migrate reset" + "db:migrate-reset": "prisma migrate reset", + "memory:monitor": "node --expose-gc scripts/memory-monitor.js", + "memory:profile": "node --expose-gc scripts/memory-profiler.js", + "memory:test": "node --expose-gc -e \"const MemoryMonitor = require('./scripts/memory-monitor'); const monitor = new MemoryMonitor(); monitor.start(1000); setTimeout(() => monitor.stop(), 30000)\"", + "memory:baseline": "node --expose-gc scripts/baseline-test.js" }, "dependencies": { "@auth/prisma-adapter": "^2.9.0", @@ -100,4 +104,4 @@ "initVersion": "7.30.0" }, "packageManager": "pnpm@8.9.2" -} \ No newline at end of file +} diff --git a/apps/web/prisma/migrations/20251006185736_campaign_batching/migration.sql b/apps/web/prisma/migrations/20251006185736_campaign_batching/migration.sql new file mode 100644 index 0000000..48358ed --- /dev/null +++ b/apps/web/prisma/migrations/20251006185736_campaign_batching/migration.sql @@ -0,0 +1,36 @@ +-- AlterEnum +-- This migration adds more than one value to an enum. +-- With PostgreSQL versions 11 and earlier, this is not possible +-- in a single migration. This can be worked around by creating +-- multiple migrations, each migration adding only one value to +-- the enum. + + +ALTER TYPE "CampaignStatus" ADD VALUE 'RUNNING'; +ALTER TYPE "CampaignStatus" ADD VALUE 'PAUSED'; + +-- AlterTable +ALTER TABLE "Campaign" ADD COLUMN "batchSize" INTEGER NOT NULL DEFAULT 500, +ADD COLUMN "batchWindowMinutes" INTEGER NOT NULL DEFAULT 0, +ADD COLUMN "lastCursor" TEXT, +ADD COLUMN "lastSentAt" TIMESTAMP(3), +ADD COLUMN "scheduledAt" TIMESTAMP(3); + +-- CreateTable +CREATE TABLE "CampaignEmail" ( + "campaignId" TEXT NOT NULL, + "contactId" TEXT NOT NULL, + "emailId" TEXT NOT NULL, + "createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + + CONSTRAINT "CampaignEmail_pkey" PRIMARY KEY ("campaignId","contactId") +); + +-- CreateIndex +CREATE INDEX "Campaign_status_scheduledAt_idx" ON "Campaign"("status", "scheduledAt"); + +-- CreateIndex +CREATE INDEX "Contact_contactBookId_id_idx" ON "Contact"("contactBookId", "id"); + +-- CreateIndex +CREATE INDEX "Email_campaignId_contactId_idx" ON "Email"("campaignId", "contactId"); diff --git a/apps/web/prisma/schema.prisma b/apps/web/prisma/schema.prisma index 3c898ce..19cc463 100644 --- a/apps/web/prisma/schema.prisma +++ b/apps/web/prisma/schema.prisma @@ -263,9 +263,19 @@ model Email { team Team @relation(fields: [teamId], references: [id], onDelete: Cascade) emailEvents EmailEvent[] + @@index([campaignId, contactId]) @@index([createdAt(sort: Desc)]) } +model CampaignEmail { + campaignId String + contactId String + emailId String + createdAt DateTime @default(now()) + + @@id([campaignId, contactId]) +} + model EmailEvent { id String @id @default(cuid()) emailId String @@ -320,44 +330,53 @@ model Contact { contactBook ContactBook @relation(fields: [contactBookId], references: [id], onDelete: Cascade) @@unique([contactBookId, email]) + @@index([contactBookId, id]) } enum CampaignStatus { DRAFT SCHEDULED + RUNNING + PAUSED SENT } model Campaign { - id String @id @default(cuid()) - name String - teamId Int - from String - cc String[] - bcc String[] - replyTo String[] - domainId Int - subject String - previewText String? - html String? - content String? - contactBookId String? - total Int @default(0) - sent Int @default(0) - delivered Int @default(0) - opened Int @default(0) - clicked Int @default(0) - unsubscribed Int @default(0) - bounced Int @default(0) - hardBounced Int @default(0) - complained Int @default(0) - status CampaignStatus @default(DRAFT) - createdAt DateTime @default(now()) - updatedAt DateTime @updatedAt + id String @id @default(cuid()) + name String + teamId Int + from String + cc String[] + bcc String[] + replyTo String[] + domainId Int + subject String + previewText String? + html String? + content String? + contactBookId String? + scheduledAt DateTime? + total Int @default(0) + sent Int @default(0) + delivered Int @default(0) + opened Int @default(0) + clicked Int @default(0) + unsubscribed Int @default(0) + bounced Int @default(0) + hardBounced Int @default(0) + complained Int @default(0) + status CampaignStatus @default(DRAFT) + batchSize Int @default(500) + batchWindowMinutes Int @default(0) + lastCursor String? + lastSentAt DateTime? + createdAt DateTime @default(now()) + updatedAt DateTime @updatedAt team Team @relation(fields: [teamId], references: [id], onDelete: Cascade) @@index([createdAt(sort: Desc)]) + @@index([status, scheduledAt]) } model Template { diff --git a/apps/web/src/app/(dashboard)/campaigns/[campaignId]/edit/page.tsx b/apps/web/src/app/(dashboard)/campaigns/[campaignId]/edit/page.tsx index f08aa17..3fb1cba 100644 --- a/apps/web/src/app/(dashboard)/campaigns/[campaignId]/edit/page.tsx +++ b/apps/web/src/app/(dashboard)/campaigns/[campaignId]/edit/page.tsx @@ -41,6 +41,8 @@ import { AccordionItem, AccordionTrigger, } from "@usesend/ui/src/accordion"; +import ScheduleCampaign from "../../schedule-campaign"; +import { useRouter } from "next/navigation"; const sendSchema = z.object({ confirmation: z.string(), @@ -63,7 +65,7 @@ export default function EditCampaignPage({ { campaignId }, { enabled: !!campaignId, - }, + } ); if (isLoading) { @@ -94,11 +96,12 @@ function CampaignEditor({ }: { campaign: Campaign & { imageUploadSupported: boolean }; }) { + const router = useRouter(); const contactBooksQuery = api.contacts.getContactBooks.useQuery({}); const utils = api.useUtils(); const [json, setJson] = useState | undefined>( - campaign.content ? JSON.parse(campaign.content) : undefined, + campaign.content ? JSON.parse(campaign.content) : undefined ); const [isSaving, setIsSaving] = useState(false); const [name, setName] = useState(campaign.name); @@ -106,12 +109,11 @@ function CampaignEditor({ const [from, setFrom] = useState(campaign.from); const [contactBookId, setContactBookId] = useState(campaign.contactBookId); const [replyTo, setReplyTo] = useState( - campaign.replyTo[0], + campaign.replyTo[0] ); const [previewText, setPreviewText] = useState( - campaign.previewText, + campaign.previewText ); - const [openSendDialog, setOpenSendDialog] = useState(false); const updateCampaignMutation = api.campaign.updateCampaign.useMutation({ onSuccess: () => { @@ -119,13 +121,8 @@ function CampaignEditor({ setIsSaving(false); }, }); - const sendCampaignMutation = api.campaign.sendCampaign.useMutation(); const getUploadUrl = api.campaign.generateImagePresignedUrl.useMutation(); - const sendForm = useForm>({ - resolver: zodResolver(sendSchema), - }); - function updateEditorContent() { updateCampaignMutation.mutate({ campaignId: campaign.id, @@ -135,39 +132,13 @@ function CampaignEditor({ const deboucedUpdateCampaign = useDebouncedCallback( updateEditorContent, - 1000, + 1000 ); - async function onSendCampaign(values: z.infer) { - if ( - values.confirmation?.toLocaleLowerCase() !== "Send".toLocaleLowerCase() - ) { - sendForm.setError("confirmation", { - message: "Please type 'Send' to confirm", - }); - return; - } - - sendCampaignMutation.mutate( - { - campaignId: campaign.id, - }, - { - onSuccess: () => { - setOpenSendDialog(false); - toast.success(`Campaign sent successfully`); - }, - onError: (error) => { - toast.error(`Failed to send campaign: ${error.message}`); - }, - }, - ); - } - const handleFileChange = async (file: File) => { if (file.size > IMAGE_SIZE_LIMIT) { throw new Error( - `File should be less than ${IMAGE_SIZE_LIMIT / 1024 / 1024}MB`, + `File should be less than ${IMAGE_SIZE_LIMIT / 1024 / 1024}MB` ); } @@ -191,10 +162,8 @@ function CampaignEditor({ return imageUrl; }; - const confirmation = sendForm.watch("confirmation"); - const contactBook = contactBooksQuery.data?.find( - (book) => book.id === contactBookId, + (book) => book.id === contactBookId ); return ( @@ -220,7 +189,7 @@ function CampaignEditor({ toast.error(`${e.message}. Reverting changes.`); setName(campaign.name); }, - }, + } ); }} /> @@ -235,56 +204,13 @@ function CampaignEditor({ ? "just now" : `${formatDistanceToNow(campaign.updatedAt)} ago`} - - - - - - - Send Campaign - - Are you sure you want to send this campaign? This action - cannot be undone. - - -
-
- - ( - - Type 'Send' to confirm - - - - - - )} - /> -
- -
- - -
-
-
+ + { + router.push(`/campaigns/${campaign.id}`); + }} + /> @@ -315,7 +241,7 @@ function CampaignEditor({ toast.error(`${e.message}. Reverting changes.`); setSubject(campaign.subject); }, - }, + } ); }} className="mt-1 py-1 text-sm block w-full outline-none border-b border-transparent focus:border-border bg-transparent" @@ -350,7 +276,7 @@ function CampaignEditor({ toast.error(`${e.message}. Reverting changes.`); setFrom(campaign.from); }, - }, + } ); }} /> @@ -381,7 +307,7 @@ function CampaignEditor({ toast.error(`${e.message}. Reverting changes.`); setReplyTo(campaign.replyTo[0]); }, - }, + } ); }} /> @@ -414,7 +340,7 @@ function CampaignEditor({ toast.error(`${e.message}. Reverting changes.`); setPreviewText(campaign.previewText ?? ""); }, - }, + } ); }} className="mt-1 py-1 text-sm block w-full outline-none border-b border-transparent bg-transparent focus:border-border" @@ -440,7 +366,7 @@ function CampaignEditor({ onError: () => { setContactBookId(campaign.contactBookId); }, - }, + } ); setContactBookId(val); }} diff --git a/apps/web/src/app/(dashboard)/campaigns/[campaignId]/page.tsx b/apps/web/src/app/(dashboard)/campaigns/[campaignId]/page.tsx index 5f549e4..a668f48 100644 --- a/apps/web/src/app/(dashboard)/campaigns/[campaignId]/page.tsx +++ b/apps/web/src/app/(dashboard)/campaigns/[campaignId]/page.tsx @@ -14,6 +14,14 @@ import { H2 } from "@usesend/ui"; import Spinner from "@usesend/ui/src/spinner"; import { api } from "~/trpc/react"; import { use } from "react"; +import { CampaignStatus } from "@prisma/client"; +import { formatDistanceToNow } from "date-fns"; +import TogglePauseCampaign from "../toggle-pause-campaign"; +import CampaignStatusBadge from "../../campaigns/campaign-status-badge"; +import { Button } from "@usesend/ui/src/button"; +import { Card, CardContent, CardHeader, CardTitle } from "@usesend/ui/src/card"; +import { EmailStatusBadge } from "../../emails/email-status-badge"; +import { AnimatePresence, motion } from "framer-motion"; export default function CampaignDetailsPage({ params, @@ -22,9 +30,31 @@ export default function CampaignDetailsPage({ }) { const { campaignId } = use(params); - const { data: campaign, isLoading } = api.campaign.getCampaign.useQuery({ - campaignId: campaignId, - }); + const { data: campaign, isLoading } = api.campaign.getCampaign.useQuery( + { campaignId: campaignId }, + { + refetchInterval: (query) => { + const c: any = query.state.data; + if (!c) return false; + + if ( + c.status === CampaignStatus.RUNNING || + c.status === CampaignStatus.PAUSED + ) { + return 5000; + } + return false; + }, + } + ); + + const { data: latestEmails, isLoading: latestEmailsLoading } = + api.campaign.latestEmails.useQuery( + { campaignId: campaignId }, + { + refetchInterval: 5000, + } + ); if (isLoading) { return ( @@ -38,74 +68,181 @@ export default function CampaignDetailsPage({ return
Campaign not found
; } - const statusCards = [ + const deliveredCount = campaign.delivered ?? 0; + const openedCount = campaign.opened ?? 0; + const clickedCount = campaign.clicked ?? 0; + const unsubscribedCount = campaign.unsubscribed ?? 0; + const deliveredDenominator = deliveredCount > 0 ? deliveredCount : 0; + const percentageOfDelivered = (value: number) => + deliveredDenominator > 0 ? (value / deliveredDenominator) * 100 : 0; + + const statisticsRows = [ { status: "delivered", - count: campaign.delivered, - percentage: 100, + count: deliveredCount, + percentage: deliveredDenominator > 0 ? 100 : 0, }, { status: "unsubscribed", - count: campaign.unsubscribed, - percentage: (campaign.unsubscribed / campaign.delivered) * 100, + count: unsubscribedCount, + percentage: percentageOfDelivered(unsubscribedCount), }, { status: "clicked", - count: campaign.clicked, - percentage: (campaign.clicked / campaign.delivered) * 100, + count: clickedCount, + percentage: percentageOfDelivered(clickedCount), }, { status: "opened", - count: campaign.opened, - percentage: (campaign.opened / campaign.delivered) * 100, + count: openedCount, + percentage: percentageOfDelivered(openedCount), }, ]; + const total = campaign.total ?? 0; + const processed = campaign.sent ?? 0; + return (
- - - - - - Campaigns - - - - - - - {campaign.name} - - - - -
-

Statistics

-
- {statusCards.map((card) => ( -
-
- {card.status !== "total" ? ( - - ) : null} -
{card.status.toLowerCase()}
-
-
-
- {card.count} +
+ + + + + + Campaigns + + + + + + +
+
{campaign.name}
+
- {card.status !== "total" ? ( -
- {card.percentage.toFixed(1)}% + + + + + {campaign.status === "SCHEDULED" ? ( + + + + ) : ( + + )} +
+ +
+
+ + +
+ Statistics + {total > 0 ? ( +
+ {processed.toLocaleString()} of {total.toLocaleString()}{" "} + processed
- ) : null} + ) : ( +
+ No recipients processed yet +
+ )}
-
- ))} + + + {statisticsRows.map((row, index) => ( +
+
+ +
+
{row.status}
+
+
+
+
{row.count}
+ {row.status !== "delivered" ? ( +
+ {row.percentage.toFixed(1)}% of delivered +
+ ) : null} +
+
+ ))} +
+ + + + + Live activity + + +
+ {latestEmailsLoading ? ( +
+ +
+ ) : !latestEmails || latestEmails.length === 0 ? ( +
+
+ No recent user actions yet. +
+
+ ) : ( +
+ + {latestEmails.map((email) => { + const recipients = email.to ?? []; + const primaryRecipient = + recipients.length > 0 + ? recipients[0] + : "Unknown recipient"; + const timestamp = + email.latestStatus === "SCHEDULED" && + email.scheduledAt + ? new Date(email.scheduledAt) + : new Date(email.updatedAt ?? email.createdAt); + const relativeTime = formatDistanceToNow(timestamp, { + addSuffix: true, + }); + + return ( + +
+ {primaryRecipient} +
+
+ + + {relativeTime} + +
+
+ ); + })} +
+
+ )} +
+
+
@@ -146,43 +283,29 @@ export default function CampaignDetailsPage({ ); } -const CampaignStatusBadge: React.FC<{ status: string }> = ({ status }) => { - let outsideColor = "bg-gray"; - let insideColor = "bg-gray/50"; +const CampaignStatusIndicator: React.FC<{ status: string }> = ({ status }) => { + let colorClass = "bg-gray"; switch (status) { case "delivered": - outsideColor = "bg-green/30"; - insideColor = "bg-green"; + colorClass = "bg-green"; break; case "bounced": case "unsubscribed": - outsideColor = "bg-red/30"; - insideColor = "bg-red"; + colorClass = "bg-red"; break; case "clicked": - outsideColor = "bg-blue/30"; - insideColor = "bg-blue"; + colorClass = "bg-blue"; break; case "opened": - outsideColor = "bg-purple/30"; - insideColor = "bg-purple"; + colorClass = "bg-purple"; break; - case "complained": - outsideColor = "bg-yellow/30"; - insideColor = "bg-yellow"; + colorClass = "bg-yellow"; break; default: - outsideColor = "bg-gray/40"; - insideColor = "bg-gray"; + colorClass = "bg-gray"; } - return ( -
-
-
- ); + return
; }; diff --git a/apps/web/src/app/(dashboard)/campaigns/campaign-card.tsx b/apps/web/src/app/(dashboard)/campaigns/campaign-card.tsx new file mode 100644 index 0000000..5d16d42 --- /dev/null +++ b/apps/web/src/app/(dashboard)/campaigns/campaign-card.tsx @@ -0,0 +1,146 @@ +"use client"; + +import { CampaignStatus } from "@prisma/client"; +import { format } from "date-fns"; +import Link from "next/link"; + +import { + Tooltip, + TooltipContent, + TooltipProvider, + TooltipTrigger, +} from "@usesend/ui/src/tooltip"; +import DeleteCampaign from "./delete-campaign"; +import DuplicateCampaign from "./duplicate-campaign"; +import TogglePauseCampaign from "./toggle-pause-campaign"; +import CampaignStatusBadge from "./campaign-status-badge"; + +interface CampaignCardProps { + campaign: { + id: string; + name: string; + subject: string; + from: string; + status: CampaignStatus; + createdAt: Date; + updatedAt: Date; + scheduledAt?: Date | null; + total: number; + sent: number; + delivered: number; + unsubscribed: number; + }; +} + +export default function CampaignCard({ campaign }: CampaignCardProps) { + const sentPercentage = + campaign.total > 0 ? Math.round((campaign.sent / campaign.total) * 100) : 0; + const pendingCount = campaign.total - campaign.sent; + + return ( +
+ {/* Header: Campaign name + status badge */} +
+
+ +
+ {campaign.name} +
+ + +
+ {campaign.status === CampaignStatus.SCHEDULED ? ( + campaign.scheduledAt && ( +
+ At{" "} + + {format(new Date(campaign.scheduledAt), "MMM do, hh:mm a")} + +
+ ) + ) : campaign.status === CampaignStatus.SENT ? ( +
+ + Delivered {campaign.delivered}, + + {/* + | + */} + + Unsubscribed {campaign.unsubscribed} + +
+ ) : ( +
+ + Sent {campaign.sent}, + + + {pendingCount > 0 && ( + + Pending {pendingCount} + + )} +
+ )} +
+
+ + + + {/* Actions */} + +
+ {(campaign.status === CampaignStatus.SCHEDULED || + campaign.status === CampaignStatus.RUNNING || + campaign.status === CampaignStatus.PAUSED) && ( + + + + + + + + {campaign.status === CampaignStatus.PAUSED + ? "Resume campaign" + : "Pause campaign"} + + + )} + + + + + + + + Duplicate campaign + + + + + + + + + + Delete campaign + + +
+
+
+ + {/* Scheduled date for scheduled campaigns */} + + {/* Mini stats */} +
+ ); +} diff --git a/apps/web/src/app/(dashboard)/campaigns/campaign-list.tsx b/apps/web/src/app/(dashboard)/campaigns/campaign-list.tsx index 19c46e4..4b388a6 100644 --- a/apps/web/src/app/(dashboard)/campaigns/campaign-list.tsx +++ b/apps/web/src/app/(dashboard)/campaigns/campaign-list.tsx @@ -1,43 +1,73 @@ "use client"; -import { - Table, - TableHeader, - TableRow, - TableHead, - TableBody, - TableCell, -} from "@usesend/ui/src/table"; import { api } from "~/trpc/react"; import { useUrlState } from "~/hooks/useUrlState"; import { Button } from "@usesend/ui/src/button"; import Spinner from "@usesend/ui/src/spinner"; -import { formatDistanceToNow } from "date-fns"; import { CampaignStatus } from "@prisma/client"; -import DeleteCampaign from "./delete-campaign"; -import Link from "next/link"; -import DuplicateCampaign from "./duplicate-campaign"; import { Select, SelectTrigger, SelectContent, SelectItem, } from "@usesend/ui/src/select"; +import { Input } from "@usesend/ui/src/input"; +import { Search } from "lucide-react"; +import { useDebouncedCallback } from "use-debounce"; +import CampaignCard from "./campaign-card"; export default function CampaignList() { const [page, setPage] = useUrlState("page", "1"); const [status, setStatus] = useUrlState("status"); + const [searchTerm, setSearchTerm] = useUrlState("search"); + const [search, setSearch] = useUrlState("search"); + + const debouncedSearch = useDebouncedCallback((value: string) => { + setSearch(value); + }, 1000); + + const onSearch = (value: string) => { + setSearchTerm(value); + debouncedSearch(value); + }; const pageNumber = Number(page); - const campaignsQuery = api.campaign.getCampaigns.useQuery({ - page: pageNumber, - status: status as CampaignStatus | null, - }); + const campaignsQuery = api.campaign.getCampaigns.useQuery( + { + page: pageNumber, + status: status as CampaignStatus | null, + search, + }, + { + refetchInterval: (query) => { + const c = query.state.data?.campaigns; + if (!c) return false; + const shouldPoll = c.some( + (campaign) => + campaign.status === CampaignStatus.RUNNING || + campaign.status === CampaignStatus.SCHEDULED + ); + return shouldPoll ? 5000 : false; + }, + } + ); return (
-
+
+ {/* Search input */} +
+ + onSearch(e.target.value)} + className="pl-10" + /> +
+ + {/* Status filter */}
-
- - - - Name - Status - Created At - Actions - - - - {campaignsQuery.isLoading ? ( - - - - - - ) : campaignsQuery.data?.campaigns.length ? ( - campaignsQuery.data?.campaigns.map((campaign) => ( - - - - {campaign.name} - - - -
- {campaign.status.toLowerCase()} -
-
- - {formatDistanceToNow(new Date(campaign.createdAt), { - addSuffix: true, - })} - - -
- - -
-
-
- )) - ) : ( - - - No campaigns found - - + {/* Campaign cards */} +
+ {campaignsQuery.isLoading ? ( +
+ +
+ ) : campaignsQuery.data?.campaigns.length ? ( + campaignsQuery.data?.campaigns.map((campaign) => ( + + )) + ) : ( +
+ No campaigns found + {(search || status) && ( +
+ Try adjusting your search or filters +
)} - -
+
+ )}
+ + + + Schedule Campaign + +
+
+ +
+ onScheduleInputChange(e.target.value)} + /> + + + + + + +
+ { + if (d) setDatePreserveTime(d); + }} + className="rounded-md border w-[250px] h-[300px] shrink-0 font-mono" + /> +
{ + e.stopPropagation(); + }} + onTouchMoveCapture={(e) => { + e.stopPropagation(); + }} + > + {timeOptions.map((opt) => { + const isActive = selectedDate + ? getMinutesOfDay(selectedDate) === opt.minutes + : false; + return ( + + ); + })} +
+
+
+
+
+ +
+ {selectedDate ? ( + + {format(selectedDate, "MMMM do, h:mm a")} + + ) : ( + No date selected + )} +
+
+ + {error && ( +
+ {error} +
+ )} + +
+ {isConfirmNow ? ( +
+ + Are you sure you want to send this campaign now? + + + +
+ ) : ( + <> + + + + )} +
+
+
+ +
+ ); +}; + +export default ScheduleCampaign; diff --git a/apps/web/src/app/(dashboard)/campaigns/toggle-pause-campaign.tsx b/apps/web/src/app/(dashboard)/campaigns/toggle-pause-campaign.tsx new file mode 100644 index 0000000..816d3f1 --- /dev/null +++ b/apps/web/src/app/(dashboard)/campaigns/toggle-pause-campaign.tsx @@ -0,0 +1,97 @@ +"use client"; + +import { Button } from "@usesend/ui/src/button"; +import { api } from "~/trpc/react"; +import React from "react"; +import { Pause, Play } from "lucide-react"; +import { Campaign, CampaignStatus } from "@prisma/client"; +import { toast } from "@usesend/ui/src/toaster"; + +export const TogglePauseCampaign: React.FC<{ + campaign: Partial & { id: string; status?: CampaignStatus }; + mode?: "icon" | "full"; +}> = ({ campaign, mode = "icon" }) => { + const utils = api.useUtils(); + const pauseMutation = api.campaign.pauseCampaign.useMutation(); + const resumeMutation = api.campaign.resumeCampaign.useMutation(); + + const isPaused = campaign.status === CampaignStatus.PAUSED; + + const onToggle = () => { + if (isPaused) { + resumeMutation.mutate( + { campaignId: campaign.id }, + { + onSuccess: () => { + utils.campaign.getCampaigns.invalidate(); + utils.campaign.getCampaign.invalidate(); + toast.success("Campaign resumed"); + }, + } + ); + } else { + pauseMutation.mutate( + { campaignId: campaign.id }, + { + onSuccess: () => { + utils.campaign.getCampaigns.invalidate(); + utils.campaign.getCampaign.invalidate(); + toast.success("Campaign paused"); + }, + } + ); + } + }; + + const pending = pauseMutation.isPending || resumeMutation.isPending; + + if ( + campaign.status !== CampaignStatus.PAUSED && + campaign.status !== CampaignStatus.RUNNING + ) { + return null; + } + + return ( + <> + {mode === "icon" ? ( + + ) : ( + + )} + + ); +}; + +export default TogglePauseCampaign; diff --git a/apps/web/src/instrumentation.ts b/apps/web/src/instrumentation.ts index 81f1e66..81d3186 100644 --- a/apps/web/src/instrumentation.ts +++ b/apps/web/src/instrumentation.ts @@ -25,6 +25,11 @@ export async function register() { await import("~/server/jobs/usage-job"); } + const { CampaignSchedulerService } = await import( + "~/server/jobs/campaign-scheduler-job" + ); + await CampaignSchedulerService.start(); + initialized = true; } } diff --git a/apps/web/src/server/api/routers/campaign.ts b/apps/web/src/server/api/routers/campaign.ts index 03eb8d1..6f1cac2 100644 --- a/apps/web/src/server/api/routers/campaign.ts +++ b/apps/web/src/server/api/routers/campaign.ts @@ -11,10 +11,7 @@ import { } from "~/server/api/trpc"; import { logger } from "~/server/logger/log"; import { nanoid } from "~/server/nanoid"; -import { - sendCampaign, - subscribeContact, -} from "~/server/service/campaign-service"; +import * as campaignService from "~/server/service/campaign-service"; import { validateDomainFromEmail } from "~/server/service/domain-service"; import { getDocumentUploadUrl, @@ -29,10 +26,10 @@ export const campaignRouter = createTRPCRouter({ z.object({ page: z.number().optional(), status: z.enum(statuses).optional().nullable(), - }), + search: z.string().optional().nullable(), + }) ) .query(async ({ ctx: { db, team }, input }) => { - let completeTime = performance.now(); const page = input.page || 1; const limit = 30; const offset = (page - 1) * limit; @@ -45,6 +42,23 @@ export const campaignRouter = createTRPCRouter({ whereConditions.status = input.status; } + if (input.search) { + whereConditions.OR = [ + { + name: { + contains: input.search, + mode: "insensitive", + }, + }, + { + subject: { + contains: input.search, + mode: "insensitive", + }, + }, + ]; + } + const countP = db.campaign.count({ where: whereConditions }); const campaignsP = db.campaign.findMany({ @@ -57,6 +71,11 @@ export const campaignRouter = createTRPCRouter({ createdAt: true, updatedAt: true, status: true, + scheduledAt: true, + total: true, + sent: true, + delivered: true, + unsubscribed: true, }, orderBy: { createdAt: "desc", @@ -64,19 +83,8 @@ export const campaignRouter = createTRPCRouter({ skip: offset, take: limit, }); - let time = performance.now(); - - campaignsP.then((campaigns) => { - logger.info( - `Time taken to get campaigns: ${performance.now() - time} milliseconds`, - ); - }); const [campaigns, count] = await Promise.all([campaignsP, countP]); - logger.info( - { duration: performance.now() - completeTime }, - `Time taken to complete request`, - ); return { campaigns, totalPage: Math.ceil(count / limit) }; }), @@ -87,7 +95,7 @@ export const campaignRouter = createTRPCRouter({ name: z.string(), from: z.string(), subject: z.string(), - }), + }) ) .mutation(async ({ ctx: { db, team }, input }) => { const domain = await validateDomainFromEmail(input.from, team.id); @@ -113,7 +121,7 @@ export const campaignRouter = createTRPCRouter({ content: z.string().optional(), contactBookId: z.string().optional(), replyTo: z.string().array().optional(), - }), + }) ) .mutation(async ({ ctx: { db, team, campaign: campaignOld }, input }) => { const { campaignId, ...data } = input; @@ -155,14 +163,9 @@ export const campaignRouter = createTRPCRouter({ return campaign; }), - deleteCampaign: campaignProcedure.mutation( - async ({ ctx: { db, team }, input }) => { - const campaign = await db.campaign.delete({ - where: { id: input.campaignId, teamId: team.id }, - }); - return campaign; - }, - ), + deleteCampaign: campaignProcedure.mutation(async ({ input }) => { + return await campaignService.deleteCampaign(input.campaignId); + }), getCampaign: campaignProcedure.query(async ({ ctx: { db, team }, input }) => { const campaign = await db.campaign.findUnique({ @@ -191,10 +194,31 @@ export const campaignRouter = createTRPCRouter({ }; }), - sendCampaign: campaignProcedure.mutation( - async ({ ctx: { db, team }, input }) => { - await sendCampaign(input.campaignId); - }, + latestEmails: campaignProcedure.query( + async ({ ctx: { db, team, campaign } }) => { + const emails = await db.email.findMany({ + where: { + teamId: team.id, + campaignId: campaign.id, + }, + orderBy: [ + { updatedAt: "desc" }, + { createdAt: "desc" }, + ], + take: 10, + select: { + id: true, + subject: true, + to: true, + latestStatus: true, + createdAt: true, + updatedAt: true, + scheduledAt: true, + }, + }); + + return emails; + } ), reSubscribeContact: publicProcedure @@ -202,14 +226,14 @@ export const campaignRouter = createTRPCRouter({ z.object({ id: z.string(), hash: z.string(), - }), + }) ) - .mutation(async ({ ctx: { db }, input }) => { - await subscribeContact(input.id, input.hash); + .mutation(async ({ input }) => { + await campaignService.subscribeContact(input.id, input.hash); }), duplicateCampaign: campaignProcedure.mutation( - async ({ ctx: { db, team, campaign }, input }) => { + async ({ ctx: { db, team, campaign } }) => { const newCampaign = await db.campaign.create({ data: { name: `${campaign.name} (Copy)`, @@ -223,15 +247,49 @@ export const campaignRouter = createTRPCRouter({ }); return newCampaign; - }, + } ), + scheduleCampaign: campaignProcedure + .input( + z.object({ + campaignId: z.string(), + scheduledAt: z.union([z.string().datetime(), z.date()]).optional(), + batchSize: z.number().min(1).max(100_000).optional(), + }) + ) + .mutation(async ({ ctx: { team }, input }) => { + await campaignService.scheduleCampaign({ + campaignId: input.campaignId, + teamId: team.id, + scheduledAt: input.scheduledAt, + batchSize: input.batchSize, + }); + return { ok: true }; + }), + + pauseCampaign: campaignProcedure.mutation(async ({ ctx: { campaign } }) => { + await campaignService.pauseCampaign({ + campaignId: campaign.id, + teamId: campaign.teamId, + }); + return { ok: true }; + }), + + resumeCampaign: campaignProcedure.mutation(async ({ ctx: { campaign } }) => { + await campaignService.resumeCampaign({ + campaignId: campaign.id, + teamId: campaign.teamId, + }); + return { ok: true }; + }), + generateImagePresignedUrl: campaignProcedure .input( z.object({ name: z.string(), type: z.string(), - }), + }) ) .mutation(async ({ ctx: { team }, input }) => { const extension = input.name.split(".").pop(); @@ -239,7 +297,7 @@ export const campaignRouter = createTRPCRouter({ const url = await getDocumentUploadUrl( `${team.id}/${randomName}`, - input.type, + input.type ); const imageUrl = `${env.S3_COMPATIBLE_PUBLIC_URL}/${team.id}/${randomName}`; diff --git a/apps/web/src/server/jobs/campaign-scheduler-job.ts b/apps/web/src/server/jobs/campaign-scheduler-job.ts new file mode 100644 index 0000000..e00d99d --- /dev/null +++ b/apps/web/src/server/jobs/campaign-scheduler-job.ts @@ -0,0 +1,104 @@ +import { Queue, Worker } from "bullmq"; +import { createWorkerHandler, TeamJob } from "../queue/bullmq-context"; +import { + CAMPAIGN_SCHEDULER_QUEUE, + DEFAULT_QUEUE_OPTIONS, +} from "../queue/queue-constants"; +import { getRedis } from "../redis"; +import { CampaignBatchService } from "../service/campaign-service"; +import { db } from "../db"; +import { logger } from "../logger/log"; + +const SCHEDULER_TICK_MS = 1500; + +type SchedulerJob = TeamJob<{}>; + +export class CampaignSchedulerService { + private static schedulerQueue = new Queue( + CAMPAIGN_SCHEDULER_QUEUE, + { + connection: getRedis(), + } + ); + + static worker = new Worker( + CAMPAIGN_SCHEDULER_QUEUE, + createWorkerHandler(async (_job: SchedulerJob) => { + try { + const now = new Date(); + const campaigns = await db.campaign.findMany({ + where: { + status: { in: ["SCHEDULED", "RUNNING"] }, + OR: [{ scheduledAt: null }, { scheduledAt: { lte: now } }], + }, + select: { + id: true, + teamId: true, + lastSentAt: true, + batchWindowMinutes: true, + }, + }); + + const enqueuePromises: Promise[] = []; + for (const c of campaigns) { + const windowMin = c.batchWindowMinutes ?? 0; + if (windowMin > 0 && c.lastSentAt) { + const elapsedMs = now.getTime() - new Date(c.lastSentAt).getTime(); + const windowMs = windowMin * 60 * 1000; + if (elapsedMs < windowMs) { + const remainingMs = windowMs - elapsedMs; + logger.debug( + { campaignId: c.id, remainingMs, windowMs }, + "Skip queueing batch; window not elapsed" + ); + continue; + } + } + enqueuePromises.push( + CampaignBatchService.queueBatch({ + campaignId: c.id, + teamId: c.teamId, + }).catch((err) => { + logger.error( + { err, campaignId: c.id }, + "Failed to enqueue campaign batch" + ); + }) + ); + } + + if (enqueuePromises.length > 0) { + const results = await Promise.allSettled(enqueuePromises); + const rejected = results.filter( + (r) => r.status === "rejected" + ).length; + const fulfilled = results.length - rejected; + logger.debug( + { total: results.length, fulfilled, rejected }, + "Scheduler enqueue summary" + ); + } + } catch (err) { + logger.error({ err }, "Campaign scheduler tick failed"); + } + }), + { connection: getRedis(), concurrency: 1 } + ); + + static async start() { + try { + await this.schedulerQueue.add( + "tick", + {}, + { + jobId: "campaign-scheduler", + repeat: { every: SCHEDULER_TICK_MS }, + ...DEFAULT_QUEUE_OPTIONS, + } + ); + } catch (err) { + // Adding the same repeatable job is idempotent; ignore job-exists errors + logger.info({ err }, "Scheduler start attempted"); + } + } +} diff --git a/apps/web/src/server/queue/queue-constants.ts b/apps/web/src/server/queue/queue-constants.ts index 91d7fc9..fb6d4c9 100644 --- a/apps/web/src/server/queue/queue-constants.ts +++ b/apps/web/src/server/queue/queue-constants.ts @@ -1,6 +1,8 @@ export const SES_WEBHOOK_QUEUE = "ses-webhook"; export const CAMPAIGN_MAIL_PROCESSING_QUEUE = "campaign-emails-processing"; export const CONTACT_BULK_ADD_QUEUE = "contact-bulk-add"; +export const CAMPAIGN_BATCH_QUEUE = "campaign-batch"; +export const CAMPAIGN_SCHEDULER_QUEUE = "campaign-scheduler"; export const DEFAULT_QUEUE_OPTIONS = { removeOnComplete: true, diff --git a/apps/web/src/server/service/campaign-service.ts b/apps/web/src/server/service/campaign-service.ts index 4af9bc3..387321a 100644 --- a/apps/web/src/server/service/campaign-service.ts +++ b/apps/web/src/server/service/campaign-service.ts @@ -8,17 +8,17 @@ import { EmailStatus, UnsubscribeReason, } from "@prisma/client"; -import { validateDomainFromEmail } from "./domain-service"; import { EmailQueueService } from "./email-queue-service"; import { Queue, Worker } from "bullmq"; import { getRedis } from "../redis"; import { - CAMPAIGN_MAIL_PROCESSING_QUEUE, + CAMPAIGN_BATCH_QUEUE, DEFAULT_QUEUE_OPTIONS, } from "../queue/queue-constants"; import { logger } from "../logger/log"; import { createWorkerHandler, TeamJob } from "../queue/bullmq-context"; import { SuppressionService } from "./suppression-service"; +import { UnsendApiError } from "../public-api/api-error"; const CAMPAIGN_UNSUB_PLACEHOLDER_TOKENS = [ "{{unsend_unsubscribe_url}}", @@ -57,21 +57,6 @@ export async function sendCampaign(id: string) { throw new Error("No contact book found for campaign"); } - const contactBook = await db.contactBook.findUnique({ - where: { id: campaign.contactBookId }, - include: { - contacts: { - where: { - subscribed: true, - }, - }, - }, - }); - - if (!contactBook) { - throw new Error("Contact book not found"); - } - if (!campaign.html) { throw new Error("No HTML content for campaign"); } @@ -83,27 +68,194 @@ export async function sendCampaign(id: string) { ); if (!unsubPlaceholderFound) { - throw new Error( - "Campaign must include an unsubscribe link before sending" - ); + throw new Error("Campaign must include an unsubscribe link before sending"); } - await sendCampaignEmail(campaign, { - campaignId: campaign.id, - from: campaign.from, - subject: campaign.subject, - html: campaign.html, - replyTo: campaign.replyTo, - cc: campaign.cc, - bcc: campaign.bcc, - teamId: campaign.teamId, - contacts: contactBook.contacts, + // Count subscribed contacts for total, don't load all into memory + const total = await db.contact.count({ + where: { contactBookId: campaign.contactBookId, subscribed: true }, }); + // Mark as scheduled (or keep running if already running), set totals and scheduledAt if not set await db.campaign.update({ where: { id }, - data: { status: "SENT", total: contactBook.contacts.length }, + data: { + status: "SCHEDULED", + total, + scheduledAt: campaign.scheduledAt ?? new Date(), + lastCursor: campaign.lastCursor ?? null, + }, }); + + // Kick off first batch immediately (idempotent by jobId) + await CampaignBatchService.queueBatch({ + campaignId: id, + teamId: campaign.teamId, + }); +} + +export async function scheduleCampaign({ + campaignId, + teamId, + scheduledAt: scheduledAtInput, + batchSize, +}: { + campaignId: string; + teamId: number; + scheduledAt?: Date | string; + batchSize?: number; +}) { + let campaign = await db.campaign.findUnique({ + where: { id: campaignId, teamId }, + }); + if (!campaign) { + throw new UnsendApiError({ + code: "NOT_FOUND", + message: "Campaign not found", + }); + } + + if (!campaign.content) { + throw new UnsendApiError({ + code: "BAD_REQUEST", + message: "No content added for campaign", + }); + } + + // Parse & render HTML (idempotent) similar to sendCampaign + try { + const jsonContent = JSON.parse(campaign.content); + const renderer = new EmailRenderer(jsonContent); + const html = await renderer.render(); + campaign = await db.campaign.update({ + where: { id: campaign.id }, + data: { html }, + }); + } catch (err) { + throw new UnsendApiError({ + code: "BAD_REQUEST", + message: "Invalid content", + }); + } + + if (!campaign.contactBookId) { + throw new UnsendApiError({ + code: "BAD_REQUEST", + message: "No contact book found for campaign", + }); + } + + if (!campaign.html) { + throw new UnsendApiError({ + code: "BAD_REQUEST", + message: "No HTML content for campaign", + }); + } + + const unsubPlaceholderFound = CAMPAIGN_UNSUB_PLACEHOLDER_TOKENS.some( + (placeholder) => + campaign.content?.includes(placeholder) || + campaign.html?.includes(placeholder) + ); + if (!unsubPlaceholderFound) { + throw new UnsendApiError({ + code: "BAD_REQUEST", + message: "Campaign must include an unsubscribe link before scheduling", + }); + } + + // Count subscribed contacts for total + const total = await db.contact.count({ + where: { contactBookId: campaign.contactBookId, subscribed: true }, + }); + + if (total === 0) { + throw new UnsendApiError({ + code: "BAD_REQUEST", + message: "No subscribed contacts to send", + }); + } + + const scheduledAt = scheduledAtInput + ? scheduledAtInput instanceof Date + ? scheduledAtInput + : new Date(scheduledAtInput) + : new Date(); + + const shouldResetCursor = + campaign.status === "DRAFT" || campaign.status === "SENT"; + + await db.campaign.update({ + where: { id: campaign.id }, + data: { + status: "SCHEDULED", + scheduledAt, + total, + ...(batchSize ? { batchSize } : {}), + ...(shouldResetCursor ? { lastCursor: null } : {}), + }, + }); + + return { ok: true }; +} + +export async function pauseCampaign({ + campaignId, + teamId, +}: { + campaignId: string; + teamId: number; +}) { + const campaign = await db.campaign.findUnique({ + where: { id: campaignId, teamId }, + }); + + if (!campaign) { + throw new UnsendApiError({ + code: "NOT_FOUND", + message: "Campaign not found", + }); + } + + await db.campaign.update({ + where: { id: campaignId }, + data: { status: "PAUSED" }, + }); + + return { ok: true }; +} + +export async function resumeCampaign({ + campaignId, + teamId, +}: { + campaignId: string; + teamId: number; +}) { + const campaign = await db.campaign.findUnique({ + where: { id: campaignId, teamId }, + }); + + if (!campaign) { + throw new UnsendApiError({ + code: "NOT_FOUND", + message: "Campaign not found", + }); + } + + if (campaign.scheduledAt && campaign.scheduledAt.getTime() > Date.now()) { + await db.campaign.update({ + where: { id: campaignId }, + data: { status: "SCHEDULED" }, + }); + } else { + await db.campaign.update({ + where: { id: campaignId }, + data: { status: "RUNNING" }, + }); + } + + return { ok: true }; } export function createUnsubUrl(contactId: string, campaignId: string) { @@ -242,18 +394,21 @@ export async function subscribeContact(id: string, hash: string) { } } -type CampainEmail = { - campaignId: string; - from: string; - subject: string; - html: string; - previewText?: string; - replyTo?: string[]; - cc?: string[]; - bcc?: string[]; - teamId: number; - contacts: Array; -}; +export async function deleteCampaign(id: string) { + const campaign = await db.$transaction(async (tx) => { + await tx.campaignEmail.deleteMany({ + where: { campaignId: id }, + }); + + const campaign = await tx.campaign.delete({ + where: { id }, + }); + + return campaign; + }); + + return campaign; +} type CampaignEmailJob = { contact: Contact; @@ -272,8 +427,6 @@ type CampaignEmailJob = { }; }; -type QueueCampaignEmailJob = TeamJob; - async function processContactEmail(jobData: CampaignEmailJob) { const { contact, campaign, emailConfig } = jobData; const jsonContent = JSON.parse(campaign.content || "{}"); @@ -367,6 +520,18 @@ async function processContactEmail(jobData: CampaignEmailJob) { }, }); + try { + await db.campaignEmail.create({ + data: { + campaignId: emailConfig.campaignId, + contactId: contact.id, + emailId: email.id, + }, + }); + } catch (error) { + logger.error({ err: error }, "Failed to create campaign email record"); + } + return; } @@ -413,6 +578,22 @@ async function processContactEmail(jobData: CampaignEmailJob) { }, }); + try { + await db.campaignEmail.create({ + data: { + campaignId: emailConfig.campaignId, + contactId: contact.id, + emailId: email.id, + }, + }); + } catch (error) { + logger.error( + { err: error }, + "Failed to create campaign email record so skipping email sending" + ); + return; + } + // Queue email for sending await EmailQueueService.queueEmail( email.id, @@ -423,50 +604,6 @@ async function processContactEmail(jobData: CampaignEmailJob) { ); } -export async function sendCampaignEmail( - campaign: Campaign, - emailData: CampainEmail -) { - const { - campaignId, - from, - subject, - replyTo, - cc, - bcc, - teamId, - contacts, - previewText, - } = emailData; - - const domain = await validateDomainFromEmail(from, teamId); - - logger.info("Bulk queueing contacts"); - - await CampaignEmailService.queueBulkContacts( - contacts.map((contact) => ({ - contact, - campaign, - emailConfig: { - from, - subject, - replyTo: replyTo - ? Array.isArray(replyTo) - ? replyTo - : [replyTo] - : undefined, - cc: cc ? (Array.isArray(cc) ? cc : [cc]) : undefined, - bcc: bcc ? (Array.isArray(bcc) ? bcc : [bcc]) : undefined, - teamId, - campaignId, - previewText, - domainId: domain.id, - region: domain.region, - }, - })) - ); -} - export async function updateCampaignAnalytics( campaignId: string, emailStatus: EmailStatus, @@ -514,51 +651,158 @@ export async function updateCampaignAnalytics( }); } -const CAMPAIGN_EMAIL_CONCURRENCY = 50; +// --------------------------- +// Simple campaign batch queue +// --------------------------- -class CampaignEmailService { - private static campaignQueue = new Queue( - CAMPAIGN_MAIL_PROCESSING_QUEUE, +type CampaignBatchJob = TeamJob<{ campaignId: string }>; + +export class CampaignBatchService { + private static batchQueue = new Queue( + CAMPAIGN_BATCH_QUEUE, { connection: getRedis(), } ); - // TODO: Add team context to job data when queueing static worker = new Worker( - CAMPAIGN_MAIL_PROCESSING_QUEUE, - createWorkerHandler(async (job: QueueCampaignEmailJob) => { - await processContactEmail(job.data); + CAMPAIGN_BATCH_QUEUE, + createWorkerHandler(async (job: CampaignBatchJob) => { + const { campaignId } = job.data; + + const campaign = await db.campaign.findUnique({ + where: { id: campaignId }, + }); + if (!campaign) return; + if (!campaign.contactBookId) return; + + // Skip paused campaigns + if (campaign.status === "PAUSED") return; + + // Respect scheduledAt if set + if (campaign.scheduledAt && campaign.scheduledAt.getTime() > Date.now()) + return; + + // First touch moves SCHEDULED -> RUNNING + if (campaign.status === "SCHEDULED") { + await db.campaign.update({ + where: { id: campaignId }, + data: { status: "RUNNING" }, + }); + } + + const batchSize = campaign.batchSize ?? 500; + + const where = { + contactBookId: campaign.contactBookId, + subscribed: true, + } as const; + const pagination: any = { + take: batchSize, + orderBy: { id: "asc" as const }, + }; + if (campaign.lastCursor) { + pagination.cursor = { id: campaign.lastCursor }; + pagination.skip = 1; // do not include the cursor row + } + + const contacts = await db.contact.findMany({ where, ...pagination }); + + if (contacts.length === 0) { + // No more contacts -> mark SENT + await db.campaign.update({ + where: { id: campaignId }, + data: { status: "SENT" }, + }); + return; + } + + // Fetch domain for region and id + const domain = await db.domain.findUnique({ + where: { id: campaign.domainId }, + }); + if (!domain) return; + + // Bulk existence check to avoid duplicates while unique is not enforced + const existing = await db.campaignEmail.findMany({ + where: { + campaignId: campaign.id, + contactId: { in: contacts.map((c) => c.id) }, + }, + select: { contactId: true }, + }); + const existingSet = new Set(existing.map((e) => e.contactId)); + + // Process each contact in this batch + for (const contact of contacts) { + if (existingSet.has(contact.id)) continue; + + await processContactEmail({ + contact, + campaign, + emailConfig: { + from: campaign.from, + subject: campaign.subject, + replyTo: Array.isArray(campaign.replyTo) ? campaign.replyTo : [], + cc: Array.isArray(campaign.cc) ? campaign.cc : [], + bcc: Array.isArray(campaign.bcc) ? campaign.bcc : [], + teamId: campaign.teamId, + campaignId: campaign.id, + previewText: campaign.previewText ?? undefined, + domainId: domain.id, + region: domain.region, + }, + }); + } + + // Advance cursor and timestamp + const newCursor = contacts[contacts.length - 1]?.id; + await db.campaign.update({ + where: { id: campaignId }, + data: { lastCursor: newCursor, lastSentAt: new Date() }, + }); }), - { - connection: getRedis(), - concurrency: CAMPAIGN_EMAIL_CONCURRENCY, - } + { connection: getRedis(), concurrency: 20 } ); - static async queueContact(data: CampaignEmailJob) { - return await this.campaignQueue.add( - `contact-${data.contact.id}`, - { - ...data, - teamId: data.emailConfig.teamId, - }, - DEFAULT_QUEUE_OPTIONS - ); - } + static async queueBatch({ + campaignId, + teamId, + }: { + campaignId: string; + teamId?: number; + }) { + // Defensive check: avoid enqueue if window not elapsed (scheduler already enforces) + try { + const campaign = await db.campaign.findUnique({ + where: { id: campaignId }, + select: { lastSentAt: true, batchWindowMinutes: true, status: true }, + }); + if (!campaign) return; + if (campaign.status === "PAUSED" || campaign.status === "SENT") return; + const windowMin = campaign.batchWindowMinutes ?? 0; + if (windowMin > 0 && campaign.lastSentAt) { + const elapsedMs = Date.now() - new Date(campaign.lastSentAt).getTime(); + const windowMs = windowMin * 60 * 1000; + if (elapsedMs < windowMs) { + logger.debug( + { campaignId, remainingMs: windowMs - elapsedMs }, + "Defensive skip enqueue; window not elapsed" + ); + return; + } + } + } catch (err) { + logger.warn( + { err, campaignId }, + "Failed defensive window check; proceeding to enqueue" + ); + } - static async queueBulkContacts(data: CampaignEmailJob[]) { - return await this.campaignQueue.addBulk( - data.map((item) => ({ - name: `contact-${item.contact.id}`, - data: { - ...item, - teamId: item.emailConfig.teamId, - }, - opts: { - ...DEFAULT_QUEUE_OPTIONS, - }, - })) + await this.batchQueue.add( + `campaign-${campaignId}`, + { campaignId, teamId }, + { jobId: `campaign-batch:${campaignId}`, ...DEFAULT_QUEUE_OPTIONS } ); } } diff --git a/packages/ui/package.json b/packages/ui/package.json index 067fc3d..ed4f279 100644 --- a/packages/ui/package.json +++ b/packages/ui/package.json @@ -46,12 +46,14 @@ "class-variance-authority": "^0.7.1", "clsx": "^2.1.1", "cmdk": "^1.1.1", + "date-fns": "^4.1.0", "framer-motion": "^12.9.2", "hast-util-to-jsx-runtime": "^2.3.6", "input-otp": "^1.4.2", "lucide-react": "^0.503.0", "next-themes": "^0.4.6", "pnpm": "^10.9.0", + "react-day-picker": "^9.10.0", "react-hook-form": "^7.56.1", "recharts": "^2.15.3", "shiki": "^3.3.0", diff --git a/packages/ui/src/button.tsx b/packages/ui/src/button.tsx index 4d0a4d0..a4cb9bd 100644 --- a/packages/ui/src/button.tsx +++ b/packages/ui/src/button.tsx @@ -59,6 +59,7 @@ const Button = React.forwardRef( ref ) => { const Comp = asChild ? Slot : "button"; + return ( ( disabled={isLoading || props.disabled} {...props} > - {isLoading && showSpinner ? : null} + {isLoading && showSpinner ? ( + + ) : null} {children} ); diff --git a/packages/ui/src/calendar.tsx b/packages/ui/src/calendar.tsx new file mode 100644 index 0000000..37599c1 --- /dev/null +++ b/packages/ui/src/calendar.tsx @@ -0,0 +1,213 @@ +"use client"; + +import * as React from "react"; +import { + ChevronDownIcon, + ChevronLeftIcon, + ChevronRightIcon, +} from "lucide-react"; +import { DayButton, DayPicker, getDefaultClassNames } from "react-day-picker"; + +import { cn } from "../lib/utils"; +import { Button, buttonVariants } from "./button"; + +function Calendar({ + className, + classNames, + showOutsideDays = true, + captionLayout = "label", + buttonVariant = "ghost", + formatters, + components, + ...props +}: React.ComponentProps & { + buttonVariant?: React.ComponentProps["variant"]; +}) { + const defaultClassNames = getDefaultClassNames(); + + return ( + svg]:rotate-180`, + String.raw`rtl:**:[.rdp-button\_previous>svg]:rotate-180`, + className + )} + captionLayout={captionLayout} + formatters={{ + formatMonthDropdown: (date) => + date.toLocaleString("default", { month: "short" }), + ...formatters, + }} + classNames={{ + root: cn("w-fit", defaultClassNames.root), + months: cn( + "flex gap-4 flex-col md:flex-row relative", + defaultClassNames.months + ), + month: cn("flex flex-col w-full gap-4", defaultClassNames.month), + nav: cn( + "flex items-center gap-1 w-full absolute top-0 inset-x-0 justify-between", + defaultClassNames.nav + ), + button_previous: cn( + buttonVariants({ variant: buttonVariant }), + "size-(--cell-size) aria-disabled:opacity-50 p-0 select-none", + defaultClassNames.button_previous + ), + button_next: cn( + buttonVariants({ variant: buttonVariant }), + "size-(--cell-size) aria-disabled:opacity-50 p-0 select-none", + defaultClassNames.button_next + ), + month_caption: cn( + "flex items-center justify-center h-(--cell-size) w-full px-(--cell-size)", + defaultClassNames.month_caption + ), + dropdowns: cn( + "w-full flex items-center text-sm font-medium justify-center h-(--cell-size) gap-1.5", + defaultClassNames.dropdowns + ), + dropdown_root: cn( + "relative has-focus:border-ring border border-input shadow-xs has-focus:ring-ring/50 has-focus:ring-[3px] rounded-md", + defaultClassNames.dropdown_root + ), + dropdown: cn( + "absolute bg-popover inset-0 opacity-0", + defaultClassNames.dropdown + ), + caption_label: cn( + "select-none font-medium", + captionLayout === "label" + ? "text-sm" + : "rounded-md pl-2 pr-1 flex items-center gap-1 text-sm h-8 [&>svg]:text-muted-foreground [&>svg]:size-3.5", + defaultClassNames.caption_label + ), + table: "w-full border-collapse", + weekdays: cn("flex", defaultClassNames.weekdays), + weekday: cn( + "text-muted-foreground rounded-md flex-1 font-normal text-[0.8rem] select-none", + defaultClassNames.weekday + ), + week: cn("flex w-full mt-2", defaultClassNames.week), + week_number_header: cn( + "select-none w-(--cell-size)", + defaultClassNames.week_number_header + ), + week_number: cn( + "text-[0.8rem] select-none text-muted-foreground", + defaultClassNames.week_number + ), + day: cn( + "relative w-full h-full p-0 text-center [&:first-child[data-selected=true]_button]:rounded-l-md [&:last-child[data-selected=true]_button]:rounded-r-md group/day aspect-square select-none", + defaultClassNames.day + ), + range_start: cn( + "rounded-l-md bg-accent", + defaultClassNames.range_start + ), + range_middle: cn("rounded-none", defaultClassNames.range_middle), + range_end: cn("rounded-r-md bg-accent", defaultClassNames.range_end), + today: cn( + "bg-accent text-accent-foreground rounded-md data-[selected=true]:rounded-none", + defaultClassNames.today + ), + outside: cn( + "text-muted-foreground aria-selected:text-muted-foreground", + defaultClassNames.outside + ), + disabled: cn( + "text-muted-foreground opacity-50", + defaultClassNames.disabled + ), + hidden: cn("invisible", defaultClassNames.hidden), + ...classNames, + }} + components={{ + Root: ({ className, rootRef, ...props }) => { + return ( +
+ ); + }, + Chevron: ({ className, orientation, ...props }) => { + if (orientation === "left") { + return ( + + ); + } + + if (orientation === "right") { + return ( + + ); + } + + return ( + + ); + }, + DayButton: CalendarDayButton, + WeekNumber: ({ children, ...props }) => { + return ( + +
+ {children} +
+ + ); + }, + ...components, + }} + {...props} + /> + ); +} + +function CalendarDayButton({ + className, + day, + modifiers, + ...props +}: React.ComponentProps) { + const defaultClassNames = getDefaultClassNames(); + + const ref = React.useRef(null); + React.useEffect(() => { + if (modifiers.focused) ref.current?.focus(); + }, [modifiers.focused]); + + return ( +