feat: add webhooks (#334)

This commit is contained in:
KM Koushik
2026-01-18 20:50:54 +11:00
committed by GitHub
parent f40a311cc9
commit 8676965019
58 changed files with 5334 additions and 245 deletions
+2
View File
@@ -14,6 +14,7 @@ import { suppressionRouter } from "./routers/suppression";
import { limitsRouter } from "./routers/limits";
import { waitlistRouter } from "./routers/waitlist";
import { feedbackRouter } from "./routers/feedback";
import { webhookRouter } from "./routers/webhook";
/**
* This is the primary router for your server.
@@ -36,6 +37,7 @@ export const appRouter = createTRPCRouter({
limits: limitsRouter,
waitlist: waitlistRouter,
feedback: feedbackRouter,
webhook: webhookRouter,
});
// export type definition of API
+4 -2
View File
@@ -152,12 +152,13 @@ export const contactsRouter = createTRPCRouter({
subscribed: z.boolean().optional(),
}),
)
.mutation(async ({ ctx: { contactBook }, input }) => {
.mutation(async ({ ctx: { contactBook, team }, input }) => {
const { contactId, ...contact } = input;
const updatedContact = await contactService.updateContactInContactBook(
contactId,
contactBook.id,
contact,
team.id,
);
if (!updatedContact) {
@@ -172,10 +173,11 @@ export const contactsRouter = createTRPCRouter({
deleteContact: contactBookProcedure
.input(z.object({ contactId: z.string() }))
.mutation(async ({ ctx: { contactBook }, input }) => {
.mutation(async ({ ctx: { contactBook, team }, input }) => {
const deletedContact = await contactService.deleteContactInContactBook(
input.contactId,
contactBook.id,
team.id,
);
if (!deletedContact) {
+18 -11
View File
@@ -2,7 +2,7 @@ import { Email, EmailStatus, Prisma } from "@prisma/client";
import { format, subDays } from "date-fns";
import { z } from "zod";
import { DEFAULT_QUERY_LIMIT } from "~/lib/constants";
import { BOUNCE_ERROR_MESSAGES } from "~/lib/constants/ses-errors";
import { BOUNCE_ERROR_MESSAGES } from "@usesend/lib/src";
import type { SesBounce } from "~/types/aws-types";
import {
@@ -95,12 +95,12 @@ export const emailRouter = createTRPCRouter({
const offset = (page - 1) * limit;
const emails = await db.$queryRaw<Array<Email>>`
SELECT
id,
"createdAt",
"latestStatus",
subject,
"to",
SELECT
id,
"createdAt",
"latestStatus",
subject,
"to",
"scheduledAt"
FROM "Email"
WHERE "teamId" = ${ctx.team.id}
@@ -110,9 +110,9 @@ export const emailRouter = createTRPCRouter({
${
input.search
? Prisma.sql`AND (
"subject" ILIKE ${`%${input.search}%`}
"subject" ILIKE ${`%${input.search}%`}
OR EXISTS (
SELECT 1 FROM unnest("to") AS email
SELECT 1 FROM unnest("to") AS email
WHERE email ILIKE ${`%${input.search}%`}
)
)`
@@ -201,7 +201,12 @@ export const emailRouter = createTRPCRouter({
} as const;
if (email.latestStatus !== "BOUNCED" || !email.bounceData) {
return { ...base, bounceType: undefined, bounceSubType: undefined, bounceReason: undefined };
return {
...base,
bounceType: undefined,
bounceSubType: undefined,
bounceReason: undefined,
};
}
const bounce = ensureBounceObject(email.bounceData);
@@ -209,7 +214,9 @@ export const emailRouter = createTRPCRouter({
const bounceSubType = bounce?.bounceSubType
? bounce.bounceSubType.toString().trim().replace(/\s+/g, "")
: undefined;
const bounceReason = bounce ? getBounceReasonFromParsed(bounce) : undefined;
const bounceReason = bounce
? getBounceReasonFromParsed(bounce)
: undefined;
return { ...base, bounceType, bounceSubType, bounceReason };
});
+3 -1
View File
@@ -8,7 +8,7 @@ export const limitsRouter = createTRPCRouter({
.input(
z.object({
type: z.nativeEnum(LimitReason),
})
}),
)
.query(async ({ ctx, input }) => {
switch (input.type) {
@@ -18,6 +18,8 @@ export const limitsRouter = createTRPCRouter({
return LimitService.checkDomainLimit(ctx.team.id);
case LimitReason.TEAM_MEMBER:
return LimitService.checkTeamMemberLimit(ctx.team.id);
case LimitReason.WEBHOOK:
return LimitService.checkWebhookLimit(ctx.team.id);
default:
// exhaustive guard
throw new Error("Unsupported limit type");
+135
View File
@@ -0,0 +1,135 @@
import { z } from "zod";
import { createTRPCRouter, teamProcedure } from "~/server/api/trpc";
import { WebhookCallStatus, WebhookStatus } from "@prisma/client";
import { WebhookEvents } from "@usesend/lib/src/webhook/webhook-events";
import { WebhookService } from "~/server/service/webhook-service";
const EVENT_TYPES_ENUM = z.enum(WebhookEvents);
export const webhookRouter = createTRPCRouter({
list: teamProcedure.query(async ({ ctx }) => {
return WebhookService.listWebhooks(ctx.team.id);
}),
getById: teamProcedure
.input(z.object({ id: z.string() }))
.query(async ({ ctx, input }) => {
return WebhookService.getWebhook({
id: input.id,
teamId: ctx.team.id,
});
}),
create: teamProcedure
.input(
z.object({
url: z.string().url(),
description: z.string().optional(),
eventTypes: z.array(EVENT_TYPES_ENUM),
secret: z.string().min(16).optional(),
}),
)
.mutation(async ({ ctx, input }) => {
return WebhookService.createWebhook({
teamId: ctx.team.id,
userId: ctx.session.user.id,
url: input.url,
description: input.description,
eventTypes: input.eventTypes,
secret: input.secret,
});
}),
update: teamProcedure
.input(
z.object({
id: z.string(),
url: z.string().url().optional(),
description: z.string().nullable().optional(),
eventTypes: z.array(EVENT_TYPES_ENUM).optional(),
rotateSecret: z.boolean().optional(),
secret: z.string().min(16).optional(),
}),
)
.mutation(async ({ ctx, input }) => {
return WebhookService.updateWebhook({
id: input.id,
teamId: ctx.team.id,
url: input.url,
description: input.description,
eventTypes: input.eventTypes,
rotateSecret: input.rotateSecret,
secret: input.secret,
});
}),
setStatus: teamProcedure
.input(
z.object({
id: z.string(),
status: z.nativeEnum(WebhookStatus),
}),
)
.mutation(async ({ ctx, input }) => {
return WebhookService.setWebhookStatus({
id: input.id,
teamId: ctx.team.id,
status: input.status,
});
}),
delete: teamProcedure
.input(z.object({ id: z.string() }))
.mutation(async ({ ctx, input }) => {
return WebhookService.deleteWebhook({
id: input.id,
teamId: ctx.team.id,
});
}),
test: teamProcedure
.input(z.object({ id: z.string() }))
.mutation(async ({ ctx, input }) => {
return WebhookService.testWebhook({
webhookId: input.id,
teamId: ctx.team.id,
});
}),
listCalls: teamProcedure
.input(
z.object({
webhookId: z.string().optional(),
status: z.nativeEnum(WebhookCallStatus).optional(),
limit: z.number().min(1).max(50).default(20),
cursor: z.string().optional(),
}),
)
.query(async ({ ctx, input }) => {
return WebhookService.listWebhookCalls({
teamId: ctx.team.id,
webhookId: input.webhookId,
status: input.status,
limit: input.limit,
cursor: input.cursor,
});
}),
getCall: teamProcedure
.input(z.object({ id: z.string() }))
.query(async ({ ctx, input }) => {
return WebhookService.getWebhookCall({
id: input.id,
teamId: ctx.team.id,
});
}),
retryCall: teamProcedure
.input(z.object({ id: z.string() }))
.mutation(async ({ ctx, input }) => {
return WebhookService.retryCall({
callId: input.id,
teamId: ctx.team.id,
});
}),
});
@@ -0,0 +1,55 @@
import { Queue, Worker } from "bullmq";
import { subDays } from "date-fns";
import { db } from "~/server/db";
import { getRedis } from "~/server/redis";
import { DEFAULT_QUEUE_OPTIONS, WEBHOOK_CLEANUP_QUEUE } from "../queue/queue-constants";
import { logger } from "../logger/log";
const WEBHOOK_RETENTION_DAYS = 30;
const webhookCleanupQueue = new Queue(WEBHOOK_CLEANUP_QUEUE, {
connection: getRedis(),
});
const worker = new Worker(
WEBHOOK_CLEANUP_QUEUE,
async () => {
const cutoff = subDays(new Date(), WEBHOOK_RETENTION_DAYS);
const result = await db.webhookCall.deleteMany({
where: {
createdAt: {
lt: cutoff,
},
},
});
logger.info(
{ deleted: result.count, cutoff: cutoff.toISOString() },
"[WebhookCleanupJob]: Deleted old webhook calls",
);
},
{
connection: getRedis(),
}
);
await webhookCleanupQueue.upsertJobScheduler(
"webhook-cleanup-daily",
{
pattern: "0 3 * * *", // daily at 03:00 UTC
tz: "UTC",
},
{
opts: {
...DEFAULT_QUEUE_OPTIONS,
},
}
);
worker.on("completed", (job) => {
logger.info({ jobId: job.id }, "[WebhookCleanupJob]: Job completed");
});
worker.on("failed", (job, err) => {
logger.error({ err, jobId: job?.id }, "[WebhookCleanupJob]: Job failed");
});
@@ -1,6 +1,5 @@
import { createRoute, z } from "@hono/zod-openapi";
import { PublicAPIApp } from "~/server/public-api/hono";
import { getTeamFromToken } from "~/server/public-api/auth";
import { addOrUpdateContact } from "~/server/service/contact-service";
import { getContactBook } from "../../api-utils";
@@ -55,7 +54,8 @@ function addContact(app: PublicAPIApp) {
const contact = await addOrUpdateContact(
contactBook.id,
c.req.valid("json")
c.req.valid("json"),
team.id,
);
return c.json({ contactId: contact.id });
@@ -47,6 +47,7 @@ function deleteContactHandler(app: PublicAPIApp) {
const deletedContact = await deleteContactInContactBook(
contactId,
contactBook.id,
team.id,
);
if (!deletedContact) {
@@ -61,6 +61,7 @@ function updateContactInfo(app: PublicAPIApp) {
contactId,
contactBook.id,
c.req.valid("json"),
team.id,
);
if (!contact) {
@@ -1,6 +1,5 @@
import { createRoute, z } from "@hono/zod-openapi";
import { PublicAPIApp } from "~/server/public-api/hono";
import { getTeamFromToken } from "~/server/public-api/auth";
import { addOrUpdateContact } from "~/server/service/contact-service";
import { getContactBook } from "../../api-utils";
@@ -55,7 +54,8 @@ function upsertContact(app: PublicAPIApp) {
const contact = await addOrUpdateContact(
contactBook.id,
c.req.valid("json")
c.req.valid("json"),
team.id,
);
return c.json({ contactId: contact.id });
@@ -3,6 +3,8 @@ export const CAMPAIGN_MAIL_PROCESSING_QUEUE = "campaign-emails-processing";
export const CONTACT_BULK_ADD_QUEUE = "contact-bulk-add";
export const CAMPAIGN_BATCH_QUEUE = "campaign-batch";
export const CAMPAIGN_SCHEDULER_QUEUE = "campaign-scheduler";
export const WEBHOOK_DISPATCH_QUEUE = "webhook-dispatch";
export const WEBHOOK_CLEANUP_QUEUE = "webhook-cleanup";
export const DEFAULT_QUEUE_OPTIONS = {
removeOnComplete: true,
@@ -97,7 +97,7 @@ class ContactQueueService {
}
async function processContactJob(job: ContactJob) {
const { contactBookId, contact } = job.data;
const { contactBookId, contact, teamId } = job.data;
logger.info(
{ contactEmail: contact.email, contactBookId },
@@ -105,7 +105,7 @@ async function processContactJob(job: ContactJob) {
);
try {
await addOrUpdateContact(contactBookId, contact);
await addOrUpdateContact(contactBookId, contact, teamId);
logger.info(
{ contactEmail: contact.email },
"[ContactQueueService]: Successfully processed contact job",
+78 -4
View File
@@ -1,5 +1,12 @@
import { type Contact } from "@prisma/client";
import {
type ContactPayload,
type ContactWebhookEventType,
} from "@usesend/lib/src/webhook/webhook-events";
import { db } from "../db";
import { ContactQueueService } from "./contact-queue-service";
import { WebhookService } from "./webhook-service";
import { logger } from "../logger/log";
export type ContactInput = {
email: string;
@@ -12,6 +19,7 @@ export type ContactInput = {
export async function addOrUpdateContact(
contactBookId: string,
contact: ContactInput,
teamId?: number,
) {
// Check if contact exists to handle subscribed logic
const existingContact = await db.contact.findUnique({
@@ -37,7 +45,7 @@ export async function addOrUpdateContact(
// All other cases (Yes→No, Yes→Yes, No→No) are allowed naturally
}
const createdContact = await db.contact.upsert({
const savedContact = await db.contact.upsert({
where: {
contactBookId_email: {
contactBookId,
@@ -60,7 +68,13 @@ export async function addOrUpdateContact(
},
});
return createdContact;
const eventType: ContactWebhookEventType = existingContact
? "contact.updated"
: "contact.created";
await emitContactEvent(savedContact, eventType, teamId);
return savedContact;
}
export async function getContactInContactBook(
@@ -79,6 +93,7 @@ export async function updateContactInContactBook(
contactId: string,
contactBookId: string,
contact: Partial<ContactInput>,
teamId?: number,
) {
const existingContact = await getContactInContactBook(
contactId,
@@ -89,17 +104,22 @@ export async function updateContactInContactBook(
return null;
}
return db.contact.update({
const updatedContact = await db.contact.update({
where: {
id: contactId,
},
data: contact,
});
await emitContactEvent(updatedContact, "contact.updated", teamId);
return updatedContact;
}
export async function deleteContactInContactBook(
contactId: string,
contactBookId: string,
teamId?: number,
) {
const existingContact = await getContactInContactBook(
contactId,
@@ -110,11 +130,15 @@ export async function deleteContactInContactBook(
return null;
}
return db.contact.delete({
const deletedContact = await db.contact.delete({
where: {
id: contactId,
},
});
await emitContactEvent(deletedContact, "contact.deleted", teamId);
return deletedContact;
}
export async function bulkAddContacts(
@@ -151,3 +175,53 @@ export async function subscribeContact(contactId: string) {
},
});
}
function buildContactPayload(contact: Contact): ContactPayload {
return {
id: contact.id,
email: contact.email,
contactBookId: contact.contactBookId,
subscribed: contact.subscribed,
properties: (contact.properties ?? {}) as Record<string, unknown>,
firstName: contact.firstName,
lastName: contact.lastName,
createdAt: contact.createdAt.toISOString(),
updatedAt: contact.updatedAt.toISOString(),
};
}
async function emitContactEvent(
contact: Contact,
type: ContactWebhookEventType,
teamId?: number,
) {
try {
const resolvedTeamId =
teamId ??
(await db.contactBook
.findUnique({
where: { id: contact.contactBookId },
select: { teamId: true },
})
.then((contactBook) => contactBook?.teamId));
if (!resolvedTeamId) {
logger.warn(
{ contactId: contact.id },
"[ContactService]: Skipping webhook emission, teamId not found",
);
return;
}
await WebhookService.emit(
resolvedTeamId,
type,
buildContactPayload(contact),
);
} catch (error) {
logger.error(
{ error, contactId: contact.id, type },
"[ContactService]: Failed to emit contact webhook event",
);
}
}
+63 -10
View File
@@ -7,8 +7,13 @@ import { SesSettingsService } from "./ses-settings-service";
import { UnsendApiError } from "../public-api/api-error";
import { logger } from "../logger/log";
import { ApiKey, DomainStatus, type Domain } from "@prisma/client";
import {
type DomainPayload,
type DomainWebhookEventType,
} from "@usesend/lib/src/webhook/webhook-events";
import { LimitService } from "./limit-service";
import type { DomainDnsRecord } from "~/types/domain";
import { WebhookService } from "./webhook-service";
const DOMAIN_STATUS_VALUES = new Set(Object.values(DomainStatus));
@@ -72,7 +77,7 @@ function buildDnsRecords(domain: Domain): DomainDnsRecord[] {
}
function withDnsRecords<T extends Domain>(
domain: T
domain: T,
): T & { dnsRecords: DomainDnsRecord[] } {
return {
...domain,
@@ -82,6 +87,24 @@ function withDnsRecords<T extends Domain>(
const dnsResolveTxt = util.promisify(dns.resolveTxt);
function buildDomainPayload(domain: Domain): DomainPayload {
return {
id: domain.id,
name: domain.name,
status: domain.status,
region: domain.region,
createdAt: domain.createdAt.toISOString(),
updatedAt: domain.updatedAt.toISOString(),
clickTracking: domain.clickTracking,
openTracking: domain.openTracking,
subdomain: domain.subdomain,
sesTenantId: domain.sesTenantId,
dkimStatus: domain.dkimStatus,
spfDetails: domain.spfDetails,
dmarcAdded: domain.dmarcAdded,
};
}
export async function validateDomainFromEmail(email: string, teamId: number) {
// Extract email from format like 'Name <email@domain>' this will allow entries such as "Someone @ something <some@domain.com>" to parse correctly as well.
const match = email.match(/<([^>]+)>/);
@@ -130,7 +153,7 @@ export async function validateDomainFromEmail(email: string, teamId: number) {
export async function validateApiKeyDomainAccess(
email: string,
teamId: number,
apiKey: ApiKey & { domain?: { name: string } | null }
apiKey: ApiKey & { domain?: { name: string } | null },
) {
// First validate the domain exists and is verified
const domain = await validateDomainFromEmail(email, teamId);
@@ -155,7 +178,7 @@ export async function createDomain(
teamId: number,
name: string,
region: string,
sesTenantId?: string
sesTenantId?: string,
) {
const domainStr = tldts.getDomain(name);
@@ -187,7 +210,7 @@ export async function createDomain(
name,
region,
sesTenantId,
dkimSelector
dkimSelector,
);
const domain = await db.domain.create({
@@ -204,6 +227,8 @@ export async function createDomain(
},
});
await emitDomainEvent(domain, "domain.created");
return withDnsRecords(domain);
}
@@ -223,9 +248,10 @@ export async function getDomain(id: number, teamId: number) {
}
if (domain.isVerifying) {
const previousStatus = domain.status;
const domainIdentity = await ses.getDomainIdentity(
domain.name,
domain.region
domain.region,
);
const dkimStatus = domainIdentity.DkimAttributes?.Status;
@@ -268,7 +294,7 @@ export async function getDomain(id: number, teamId: number) {
? lastCheckedTime.toISOString()
: (lastCheckedTime ?? null);
return {
const response = {
...domainWithDns,
dkimStatus: normalizedDomain.dkimStatus,
spfDetails: normalizedDomain.spfDetails,
@@ -276,6 +302,16 @@ export async function getDomain(id: number, teamId: number) {
lastCheckedTime: normalizedLastCheckedTime,
dmarcAdded: normalizedDomain.dmarcAdded,
};
if (previousStatus !== domainWithDns.status) {
const eventType: DomainWebhookEventType =
domainWithDns.status === DomainStatus.SUCCESS
? "domain.verified"
: "domain.updated";
await emitDomainEvent(domainWithDns, eventType);
}
return response;
}
return withDnsRecords(domain);
@@ -283,12 +319,16 @@ export async function getDomain(id: number, teamId: number) {
export async function updateDomain(
id: number,
data: { clickTracking?: boolean; openTracking?: boolean }
data: { clickTracking?: boolean; openTracking?: boolean },
) {
return db.domain.update({
const updated = await db.domain.update({
where: { id },
data,
});
await emitDomainEvent(updated, "domain.updated");
return updated;
}
export async function deleteDomain(id: number) {
@@ -303,7 +343,7 @@ export async function deleteDomain(id: number) {
const deleted = await ses.deleteDomain(
domain.name,
domain.region,
domain.sesTenantId ?? undefined
domain.sesTenantId ?? undefined,
);
if (!deleted) {
@@ -312,12 +352,14 @@ export async function deleteDomain(id: number) {
const deletedRecord = await db.domain.delete({ where: { id } });
await emitDomainEvent(domain, "domain.deleted");
return deletedRecord;
}
export async function getDomains(
teamId: number,
options?: { domainId?: number }
options?: { domainId?: number },
) {
const domains = await db.domain.findMany({
where: {
@@ -341,3 +383,14 @@ async function getDmarcRecord(domain: string) {
return null; // or handle error as appropriate
}
}
async function emitDomainEvent(domain: Domain, type: DomainWebhookEventType) {
try {
await WebhookService.emit(domain.teamId, type, buildDomainPayload(domain));
} catch (error) {
logger.error(
{ error, domainId: domain.id, type },
"[DomainService]: Failed to emit domain webhook event",
);
}
}
@@ -101,6 +101,36 @@ export class LimitService {
};
}
static async checkWebhookLimit(teamId: number): Promise<{
isLimitReached: boolean;
limit: number;
reason?: LimitReason;
}> {
// Limits only apply in cloud mode
if (!env.NEXT_PUBLIC_IS_CLOUD) {
return { isLimitReached: false, limit: -1 };
}
const team = await TeamService.getTeamCached(teamId);
const currentCount = await db.webhook.count({
where: { teamId },
});
const limit = PLAN_LIMITS[getActivePlan(team)].webhooks;
if (isLimitExceeded(currentCount, limit)) {
return {
isLimitReached: true,
limit,
reason: LimitReason.WEBHOOK,
};
}
return {
isLimitReached: false,
limit,
};
}
// Checks email sending limits and also triggers usage notifications.
// Side effects:
// - Sends "warning" emails when nearing daily/monthly limits (rate-limited in TeamService)
+217 -2
View File
@@ -1,9 +1,14 @@
import {
EmailStatus,
Prisma,
UnsubscribeReason,
SuppressionReason,
UnsubscribeReason,
type Email,
} from "@prisma/client";
import {
type EmailBasePayload,
type EmailEventPayloadMap,
type EmailWebhookEventType,
} from "@usesend/lib/src/webhook/webhook-events";
import {
SesBounce,
SesClick,
@@ -25,6 +30,7 @@ import {
import { getChildLogger, logger, withLogger } from "../logger/log";
import { randomUUID } from "crypto";
import { SuppressionService } from "./suppression-service";
import { WebhookService } from "./webhook-service";
export async function parseSesHook(data: SesEvent) {
const mailStatus = getEmailStatus(data);
@@ -295,9 +301,218 @@ export async function parseSesHook(data: SesEvent) {
logger.info("Email event created");
try {
const occurredAt = data.mail.timestamp
? new Date(data.mail.timestamp).toISOString()
: new Date().toISOString();
const metadata = buildEmailMetadata(mailStatus, mailData);
await WebhookService.emit(
email.teamId,
emailStatusToEvent(mailStatus),
buildEmailWebhookPayload({
email,
status: mailStatus,
occurredAt,
eventData: mailData,
metadata,
}),
);
} catch (error) {
logger.error(
{ error, emailId: email.id, mailStatus },
"[SesHookParser]: Failed to emit webhook",
);
}
return true;
}
type EmailBounceSubType =
EmailEventPayloadMap["email.bounced"]["bounce"]["subType"];
function buildEmailWebhookPayload(params: {
email: Email;
status: EmailStatus;
occurredAt: string;
eventData: SesEvent | SesEvent[SesEventDataKey];
metadata?: Record<string, unknown>;
}): EmailEventPayloadMap[EmailWebhookEventType] {
const { email, status, eventData, occurredAt, metadata } = params;
const basePayload: EmailBasePayload = {
id: email.id,
status,
from: email.from,
to: email.to,
occurredAt,
campaignId: email.campaignId ?? undefined,
contactId: email.contactId ?? undefined,
domainId: email.domainId ?? null,
subject: email.subject,
metadata,
};
switch (status) {
case EmailStatus.BOUNCED: {
const bounce = eventData as SesBounce | undefined;
return {
...basePayload,
bounce: {
type: bounce?.bounceType ?? "Undetermined",
subType: normalizeBounceSubType(bounce?.bounceSubType),
message: bounce?.bouncedRecipients?.[0]?.diagnosticCode,
},
};
}
case EmailStatus.OPENED: {
const openData = eventData as SesEvent["open"];
return {
...basePayload,
open: {
timestamp: openData?.timestamp ?? occurredAt,
userAgent: openData?.userAgent,
ip: openData?.ipAddress,
},
};
}
case EmailStatus.CLICKED: {
const clickData = eventData as SesClick | undefined;
return {
...basePayload,
click: {
timestamp: clickData?.timestamp ?? occurredAt,
url: clickData?.link ?? "",
userAgent: clickData?.userAgent,
ip: clickData?.ipAddress,
},
};
}
default:
return basePayload;
}
}
function normalizeBounceSubType(
subType: SesBounce["bounceSubType"] | undefined,
): EmailBounceSubType {
const normalized = subType?.replace(/\s+/g, "") as
| EmailBounceSubType
| undefined;
const validSubTypes: EmailBounceSubType[] = [
"General",
"NoEmail",
"Suppressed",
"OnAccountSuppressionList",
"MailboxFull",
"MessageTooLarge",
"ContentRejected",
"AttachmentRejected",
];
if (normalized && validSubTypes.includes(normalized)) {
return normalized;
}
return "General";
}
function emailStatusToEvent(status: EmailStatus): EmailWebhookEventType {
switch (status) {
case EmailStatus.QUEUED:
return "email.queued";
case EmailStatus.SENT:
return "email.sent";
case EmailStatus.DELIVERY_DELAYED:
return "email.delivery_delayed";
case EmailStatus.DELIVERED:
return "email.delivered";
case EmailStatus.BOUNCED:
return "email.bounced";
case EmailStatus.REJECTED:
return "email.rejected";
case EmailStatus.RENDERING_FAILURE:
return "email.rendering_failure";
case EmailStatus.COMPLAINED:
return "email.complained";
case EmailStatus.FAILED:
return "email.failed";
case EmailStatus.CANCELLED:
return "email.cancelled";
case EmailStatus.SUPPRESSED:
return "email.suppressed";
case EmailStatus.OPENED:
return "email.opened";
case EmailStatus.CLICKED:
return "email.clicked";
default:
return "email.queued";
}
}
function buildEmailMetadata(
status: EmailStatus,
mailData: SesEvent | SesEvent[SesEventDataKey],
) {
switch (status) {
case EmailStatus.BOUNCED: {
const bounce = mailData as SesBounce;
return {
bounceType: bounce.bounceType,
bounceSubType: bounce.bounceSubType,
diagnosticCode: bounce.bouncedRecipients?.[0]?.diagnosticCode,
};
}
case EmailStatus.COMPLAINED: {
const complaintInfo = (mailData as any)?.complaint ?? mailData;
return {
feedbackType: complaintInfo?.complaintFeedbackType,
userAgent: complaintInfo?.userAgent,
};
}
case EmailStatus.OPENED: {
const openData = (mailData as any)?.open ?? mailData;
return {
ipAddress: openData?.ipAddress,
userAgent: openData?.userAgent,
};
}
case EmailStatus.CLICKED: {
const click = mailData as SesClick;
return {
ipAddress: click.ipAddress,
userAgent: click.userAgent,
link: click.link,
};
}
case EmailStatus.RENDERING_FAILURE: {
const failure = mailData as SesEvent["renderingFailure"];
return {
errorMessage: failure?.errorMessage,
templateName: failure?.templateName,
};
}
case EmailStatus.DELIVERY_DELAYED: {
const deliveryDelay = mailData as SesEvent["deliveryDelay"];
return {
delayType: deliveryDelay?.delayType,
expirationTime: deliveryDelay?.expirationTime,
delayedRecipients: deliveryDelay?.delayedRecipients,
};
}
case EmailStatus.REJECTED: {
const reject = mailData as SesEvent["reject"];
return {
reason: reject?.reason,
};
}
default:
return undefined;
}
}
async function checkUnsubscribe({
contactId,
campaignId,
@@ -0,0 +1,819 @@
import { WebhookCallStatus, WebhookStatus } from "@prisma/client";
import { Queue, Worker } from "bullmq";
import { createHmac, randomUUID, randomBytes } from "crypto";
import {
WebhookEventData,
WebhookPayloadData,
WEBHOOK_EVENT_VERSION,
type WebhookEvent,
type WebhookEventPayloadMap,
type WebhookEventType,
} from "@usesend/lib/src/webhook/webhook-events";
import { db } from "../db";
import { getRedis } from "../redis";
import {
DEFAULT_QUEUE_OPTIONS,
WEBHOOK_DISPATCH_QUEUE,
} from "../queue/queue-constants";
import { createWorkerHandler, TeamJob } from "../queue/bullmq-context";
import { logger } from "../logger/log";
import { LimitService } from "./limit-service";
import { UnsendApiError } from "../public-api/api-error";
const WEBHOOK_DISPATCH_CONCURRENCY = 25;
const WEBHOOK_MAX_ATTEMPTS = 6;
const WEBHOOK_BASE_BACKOFF_MS = 5_000;
const WEBHOOK_LOCK_TTL_MS = 15_000;
const WEBHOOK_LOCK_RETRY_DELAY_MS = 2_000;
const WEBHOOK_AUTO_DISABLE_THRESHOLD = 30;
const WEBHOOK_REQUEST_TIMEOUT_MS = 10_000;
const WEBHOOK_RESPONSE_TEXT_LIMIT = 4_096;
type WebhookCallJobData = {
callId: string;
teamId?: number;
};
type WebhookCallJob = TeamJob<WebhookCallJobData>;
type WebhookEventInput<TType extends WebhookEventType> =
WebhookPayloadData<TType>;
export class WebhookQueueService {
private static queue = new Queue<WebhookCallJobData>(WEBHOOK_DISPATCH_QUEUE, {
connection: getRedis(),
defaultJobOptions: {
...DEFAULT_QUEUE_OPTIONS,
attempts: WEBHOOK_MAX_ATTEMPTS,
backoff: {
type: "exponential",
delay: WEBHOOK_BASE_BACKOFF_MS,
},
},
});
private static worker = new Worker(
WEBHOOK_DISPATCH_QUEUE,
createWorkerHandler(processWebhookCall),
{
connection: getRedis(),
concurrency: WEBHOOK_DISPATCH_CONCURRENCY,
},
);
static {
this.worker.on("error", (error) => {
logger.error({ error }, "[WebhookQueueService]: Worker error");
});
logger.info("[WebhookQueueService]: Initialized webhook queue service");
}
public static async enqueueCall(callId: string, teamId: number) {
await this.queue.add(
callId,
{
callId,
teamId,
},
{ jobId: callId },
);
}
}
export class WebhookService {
public static async emit<TType extends WebhookEventType>(
teamId: number,
type: TType,
payload: WebhookEventInput<TType>,
) {
const activeWebhooks = await db.webhook.findMany({
where: {
teamId,
status: WebhookStatus.ACTIVE,
OR: [
{
eventTypes: {
has: type,
},
},
{
eventTypes: {
isEmpty: true,
},
},
],
},
});
if (activeWebhooks.length === 0) {
logger.debug(
{ teamId, type },
"[WebhookService]: No active webhooks for event type",
);
return;
}
const payloadString = stringifyPayload(payload);
for (const webhook of activeWebhooks) {
const call = await db.webhookCall.create({
data: {
webhookId: webhook.id,
teamId: webhook.teamId,
type: type,
payload: payloadString,
status: WebhookCallStatus.PENDING,
attempt: 0,
},
});
await WebhookQueueService.enqueueCall(call.id, webhook.teamId);
}
}
public static async retryCall(params: { callId: string; teamId: number }) {
const call = await db.webhookCall.findFirst({
where: { id: params.callId, teamId: params.teamId },
});
if (!call) {
throw new Error("Webhook call not found");
}
await db.webhookCall.update({
where: { id: call.id },
data: {
status: WebhookCallStatus.PENDING,
attempt: 0,
nextAttemptAt: null,
lastError: null,
responseStatus: null,
responseTimeMs: null,
responseText: null,
},
});
await WebhookQueueService.enqueueCall(call.id, params.teamId);
return call.id;
}
public static async testWebhook(params: {
webhookId: string;
teamId: number;
}) {
const webhook = await db.webhook.findFirst({
where: { id: params.webhookId, teamId: params.teamId },
});
if (!webhook) {
throw new Error("Webhook not found");
}
const payload = {
test: true,
webhookId: webhook.id,
sentAt: new Date().toISOString(),
};
const call = await db.webhookCall.create({
data: {
webhookId: webhook.id,
teamId: webhook.teamId,
type: "webhook.test",
payload: stringifyPayload(payload),
status: WebhookCallStatus.PENDING,
attempt: 0,
},
});
await WebhookQueueService.enqueueCall(call.id, webhook.teamId);
return call.id;
}
public static generateSecret() {
return `whsec_${randomBytes(32).toString("hex")}`;
}
public static async listWebhooks(teamId: number) {
return db.webhook.findMany({
where: { teamId },
orderBy: { createdAt: "desc" },
});
}
public static async getWebhook(params: { id: string; teamId: number }) {
const webhook = await db.webhook.findFirst({
where: { id: params.id, teamId: params.teamId },
});
if (!webhook) {
throw new UnsendApiError({
code: "NOT_FOUND",
message: "Webhook not found",
});
}
return webhook;
}
public static async createWebhook(params: {
teamId: number;
userId: number;
url: string;
description?: string;
eventTypes: string[];
secret?: string;
}) {
const { isLimitReached, reason } = await LimitService.checkWebhookLimit(
params.teamId,
);
if (isLimitReached) {
throw new UnsendApiError({
code: "FORBIDDEN",
message: reason ?? "Webhook limit reached",
});
}
const secret = params.secret ?? WebhookService.generateSecret();
return db.webhook.create({
data: {
teamId: params.teamId,
url: params.url,
description: params.description,
secret,
eventTypes: params.eventTypes,
status: WebhookStatus.ACTIVE,
createdByUserId: params.userId,
},
});
}
public static async updateWebhook(params: {
id: string;
teamId: number;
url?: string;
description?: string | null;
eventTypes?: string[];
rotateSecret?: boolean;
secret?: string;
}) {
const webhook = await db.webhook.findFirst({
where: { id: params.id, teamId: params.teamId },
});
if (!webhook) {
throw new UnsendApiError({
code: "NOT_FOUND",
message: "Webhook not found",
});
}
const secret =
params.rotateSecret === true
? WebhookService.generateSecret()
: params.secret;
return db.webhook.update({
where: { id: webhook.id },
data: {
url: params.url ?? webhook.url,
description:
params.description === undefined
? webhook.description
: (params.description ?? null),
eventTypes: params.eventTypes ?? webhook.eventTypes,
secret: secret ?? webhook.secret,
},
});
}
public static async setWebhookStatus(params: {
id: string;
teamId: number;
status: WebhookStatus;
}) {
const webhook = await db.webhook.findFirst({
where: { id: params.id, teamId: params.teamId },
});
if (!webhook) {
throw new UnsendApiError({
code: "NOT_FOUND",
message: "Webhook not found",
});
}
return db.webhook.update({
where: { id: webhook.id },
data: {
status: params.status,
consecutiveFailures:
params.status === WebhookStatus.ACTIVE
? 0
: webhook.consecutiveFailures,
},
});
}
public static async deleteWebhook(params: { id: string; teamId: number }) {
const webhook = await db.webhook.findFirst({
where: { id: params.id, teamId: params.teamId },
});
if (!webhook) {
throw new UnsendApiError({
code: "NOT_FOUND",
message: "Webhook not found",
});
}
return db.webhook.delete({
where: { id: webhook.id },
});
}
public static async listWebhookCalls(params: {
teamId: number;
webhookId?: string;
status?: WebhookCallStatus;
limit: number;
cursor?: string;
}) {
const calls = await db.webhookCall.findMany({
where: {
teamId: params.teamId,
webhookId: params.webhookId,
status: params.status,
},
orderBy: { createdAt: "desc" },
take: params.limit + 1,
cursor: params.cursor ? { id: params.cursor } : undefined,
});
let nextCursor: string | null = null;
if (calls.length > params.limit) {
const next = calls.pop();
nextCursor = next?.id ?? null;
}
return {
items: calls,
nextCursor,
};
}
public static async getWebhookCall(params: { id: string; teamId: number }) {
const call = await db.webhookCall.findFirst({
where: { id: params.id, teamId: params.teamId },
include: {
webhook: {
select: {
apiVersion: true,
},
},
},
});
if (!call) {
throw new UnsendApiError({
code: "NOT_FOUND",
message: "Webhook call not found",
});
}
return call;
}
}
function stringifyPayload(payload: unknown) {
if (typeof payload === "string") {
return payload;
}
try {
return JSON.stringify(payload);
} catch (error) {
logger.error(
{ error },
"[WebhookService]: Failed to stringify payload, falling back to empty object",
);
return "{}";
}
}
async function processWebhookCall(job: WebhookCallJob) {
const attempt = job.attemptsMade + 1;
const call = await db.webhookCall.findUnique({
where: { id: job.data.callId },
include: {
webhook: true,
},
});
if (!call) {
logger.warn(
{ callId: job.data.callId },
"[WebhookQueueService]: Call not found",
);
return;
}
if (call.webhook.status !== WebhookStatus.ACTIVE) {
await db.webhookCall.update({
where: { id: call.id },
data: {
status: WebhookCallStatus.DISCARDED,
attempt,
},
});
logger.info(
{ callId: call.id, webhookId: call.webhookId },
"[WebhookQueueService]: Discarded call because webhook is not active",
);
return;
}
await db.webhookCall.update({
where: { id: call.id },
data: {
status: WebhookCallStatus.IN_PROGRESS,
attempt,
},
});
const lockKey = `webhook:lock:${call.webhookId}`;
const redis = getRedis();
const lockValue = randomUUID();
const lockAcquired = await acquireLock(redis, lockKey, lockValue);
if (!lockAcquired) {
await db.webhookCall.update({
where: { id: call.id },
data: {
nextAttemptAt: new Date(Date.now() + WEBHOOK_LOCK_RETRY_DELAY_MS),
status: WebhookCallStatus.PENDING,
},
});
// Let BullMQ handle retry timing; this records observability.
throw new Error("Webhook lock not acquired");
}
try {
const body = buildPayload(call, attempt);
const { responseStatus, responseTimeMs, responseText } = await postWebhook({
url: call.webhook.url,
secret: call.webhook.secret,
type: call.type,
callId: call.id,
body,
});
logger.info(
`Webhook call ${call.id} completed successfully, response status: ${responseStatus}, response time: ${responseTimeMs}ms, `,
);
await db.$transaction([
db.webhookCall.update({
where: { id: call.id },
data: {
status: WebhookCallStatus.DELIVERED,
attempt,
responseStatus,
responseTimeMs,
lastError: null,
nextAttemptAt: null,
responseText,
},
}),
db.webhook.update({
where: { id: call.webhookId },
data: {
consecutiveFailures: 0,
lastSuccessAt: new Date(),
},
}),
]);
} catch (error) {
const errorMessage =
error instanceof Error ? error.message : "Unknown webhook error";
const responseStatus =
error instanceof WebhookHttpError ? error.statusCode : null;
const responseTimeMs =
error instanceof WebhookHttpError ? error.responseTimeMs : null;
const responseText =
error instanceof WebhookHttpError ? error.responseText : null;
const nextAttemptAt =
attempt < WEBHOOK_MAX_ATTEMPTS
? new Date(Date.now() + computeBackoff(attempt))
: null;
const updatedWebhook = await db.webhook.update({
where: { id: call.webhookId },
data: {
consecutiveFailures: {
increment: 1,
},
lastFailureAt: new Date(),
status:
call.webhook.consecutiveFailures + 1 >= WEBHOOK_AUTO_DISABLE_THRESHOLD
? WebhookStatus.AUTO_DISABLED
: call.webhook.status,
},
});
await db.webhookCall.update({
where: { id: call.id },
data: {
status:
attempt >= WEBHOOK_MAX_ATTEMPTS
? WebhookCallStatus.FAILED
: WebhookCallStatus.PENDING,
attempt,
nextAttemptAt,
lastError: errorMessage,
responseStatus: responseStatus ?? undefined,
responseTimeMs: responseTimeMs ?? undefined,
responseText: responseText ?? undefined,
},
});
const statusLabel =
updatedWebhook.status === WebhookStatus.AUTO_DISABLED
? "auto-disabled"
: "failed";
logger.warn(
{
callId: call.id,
webhookId: call.webhookId,
statusLabel,
attempt,
responseStatus,
nextAttemptAt,
error: errorMessage,
},
"[WebhookQueueService]: Webhook call failure",
);
if (updatedWebhook.status === WebhookStatus.AUTO_DISABLED) {
return;
}
throw error;
} finally {
await releaseLock(redis, lockKey, lockValue);
}
}
async function acquireLock(
redis: ReturnType<typeof getRedis>,
key: string,
value: string,
) {
const result = await redis.set(key, value, "PX", WEBHOOK_LOCK_TTL_MS, "NX");
return result === "OK";
}
async function releaseLock(
redis: ReturnType<typeof getRedis>,
key: string,
value: string,
) {
const script = `
if redis.call("GET", KEYS[1]) == ARGV[1] then
return redis.call("DEL", KEYS[1])
else
return 0
end
`;
try {
await redis.eval(script, 1, key, value);
} catch (error) {
logger.error({ error }, "[WebhookQueueService]: Failed to release lock");
}
}
function computeBackoff(attempt: number) {
const base = WEBHOOK_BASE_BACKOFF_MS * Math.pow(2, attempt - 1);
const jitter = base * 0.3 * Math.random();
return base + jitter;
}
type WebhookPayload = {
id: string;
type: string;
version: string | null;
createdAt: string;
teamId: number;
data: unknown;
attempt: number;
};
function buildPayload(
call: {
id: string;
webhookId: string;
teamId: number;
type: string;
payload: string;
createdAt: Date;
webhook: { apiVersion: string | null };
},
attempt: number,
): WebhookPayload {
let parsed: unknown = call.payload;
try {
parsed = JSON.parse(call.payload);
} catch {
// keep string payload as-is
}
return {
id: call.id,
type: call.type,
version: call.webhook.apiVersion ?? WEBHOOK_EVENT_VERSION,
createdAt: call.createdAt.toISOString(),
teamId: call.teamId,
data: parsed,
attempt,
};
}
class WebhookHttpError extends Error {
public statusCode: number | null;
public responseTimeMs: number | null;
public responseText: string | null;
constructor(
message: string,
statusCode: number | null,
responseTimeMs: number | null,
responseText: string | null,
) {
super(message);
this.statusCode = statusCode;
this.responseTimeMs = responseTimeMs;
this.responseText = responseText;
}
}
async function postWebhook(params: {
url: string;
secret: string;
type: string;
callId: string;
body: WebhookPayload;
}) {
const controller = new AbortController();
const timeout = setTimeout(
() => controller.abort(),
WEBHOOK_REQUEST_TIMEOUT_MS,
);
const stringBody = JSON.stringify(params.body);
const timestamp = Date.now().toString();
const signature = signBody(params.secret, timestamp, stringBody);
const headers = {
"Content-Type": "application/json",
"User-Agent": "UseSend-Webhook/1.0",
"X-UseSend-Event": params.type,
"X-UseSend-Call": params.callId,
"X-UseSend-Timestamp": timestamp,
"X-UseSend-Signature": signature,
"X-UseSend-Retry": params.body.attempt > 1 ? "true" : "false",
};
const start = Date.now();
try {
const response = await fetch(params.url, {
method: "POST",
headers,
body: stringBody,
redirect: "manual",
signal: controller.signal,
});
const responseTimeMs = Date.now() - start;
const responseText = await captureResponseText(response);
if (response.ok) {
return {
responseStatus: response.status,
responseTimeMs,
responseText,
};
}
throw new WebhookHttpError(
`Non-2xx response: ${response.status}`,
response.status,
responseTimeMs,
responseText,
);
} catch (error) {
const responseTimeMs = Date.now() - start;
if (error instanceof WebhookHttpError) {
throw error;
}
if (error instanceof DOMException && error.name === "AbortError") {
throw new WebhookHttpError(
"Webhook request timed out",
null,
responseTimeMs,
null,
);
}
throw new WebhookHttpError(
error instanceof Error ? error.message : "Unknown fetch error",
null,
responseTimeMs,
null,
);
} finally {
clearTimeout(timeout);
}
}
function signBody(secret: string, timestamp: string, body: string) {
const hmac = createHmac("sha256", secret);
hmac.update(`${timestamp}.${body}`);
return `v1=${hmac.digest("hex")}`;
}
async function captureResponseText(response: Response) {
const contentType = response.headers.get("content-type");
const isText =
contentType?.startsWith("text/") ||
contentType?.includes("application/json") ||
contentType?.includes("application/xml");
if (!isText) {
return null;
}
const contentLengthHeader = response.headers.get("content-length");
const contentLength = contentLengthHeader
? Number.parseInt(contentLengthHeader, 10)
: null;
if (contentLength && Number.isFinite(contentLength)) {
if (contentLength <= 0) {
return "";
}
if (contentLength > WEBHOOK_RESPONSE_TEXT_LIMIT * 2) {
return `<omitted: content-length ${contentLength} exceeds limit ${WEBHOOK_RESPONSE_TEXT_LIMIT}>`;
}
}
const body = response.body;
if (body && typeof body.getReader === "function") {
const reader = body.getReader();
const decoder = new TextDecoder();
let received = 0;
let chunks = "";
let truncated = false;
while (true) {
const { done, value } = await reader.read();
if (done) {
break;
}
if (value) {
const decoded = decoder.decode(value, { stream: true });
received += decoded.length;
if (received > WEBHOOK_RESPONSE_TEXT_LIMIT) {
const sliceRemaining =
WEBHOOK_RESPONSE_TEXT_LIMIT - (received - decoded.length);
chunks += decoded.slice(0, Math.max(0, sliceRemaining));
truncated = true;
await reader.cancel();
break;
} else {
chunks += decoded;
}
}
}
if (truncated) {
return `${chunks}...<truncated>`;
}
return chunks;
}
const text = await response.text();
if (text.length > WEBHOOK_RESPONSE_TEXT_LIMIT) {
return `${text.slice(0, WEBHOOK_RESPONSE_TEXT_LIMIT)}...<truncated>`;
}
return text;
}