diff --git a/apps/docs/api-reference/openapi.json b/apps/docs/api-reference/openapi.json index 8632d2b..2828c42 100644 --- a/apps/docs/api-reference/openapi.json +++ b/apps/docs/api-reference/openapi.json @@ -243,9 +243,10 @@ { "schema": { "type": "number", + "nullable": true, "example": 1 }, - "required": true, + "required": false, "name": "id", "in": "path" } @@ -494,21 +495,25 @@ "to": { "anyOf": [ { - "type": "string" + "type": "string", + "format": "email" }, { "type": "array", "items": { - "type": "string" + "type": "string", + "format": "email" } } ] }, "from": { - "type": "string" + "type": "string", + "format": "email" }, "subject": { "type": "string", + "minLength": 1, "description": "Optional when templateId is provided" }, "templateId": { @@ -524,12 +529,14 @@ "replyTo": { "anyOf": [ { - "type": "string" + "type": "string", + "format": "email" }, { "type": "array", "items": { - "type": "string" + "type": "string", + "format": "email" } } ] @@ -537,12 +544,14 @@ "cc": { "anyOf": [ { - "type": "string" + "type": "string", + "format": "email" }, { "type": "array", "items": { - "type": "string" + "type": "string", + "format": "email" } } ] @@ -550,21 +559,27 @@ "bcc": { "anyOf": [ { - "type": "string" + "type": "string", + "format": "email" }, { "type": "array", "items": { - "type": "string" + "type": "string", + "format": "email" } } ] }, "text": { - "type": "string" + "type": "string", + "nullable": true, + "minLength": 1 }, "html": { - "type": "string" + "type": "string", + "nullable": true, + "minLength": 1 }, "attachments": { "type": "array", @@ -572,17 +587,20 @@ "type": "object", "properties": { "filename": { - "type": "string" + "type": "string", + "minLength": 1 }, "content": { - "type": "string" + "type": "string", + "minLength": 1 } }, "required": [ "filename", "content" ] - } + }, + "maxItems": 10 }, "scheduledAt": { "type": "string", @@ -616,6 +634,175 @@ } } }, + "/v1/emails/batch": { + "post": { + "requestBody": { + "required": true, + "content": { + "application/json": { + "schema": { + "type": "array", + "items": { + "type": "object", + "properties": { + "to": { + "anyOf": [ + { + "type": "string", + "format": "email" + }, + { + "type": "array", + "items": { + "type": "string", + "format": "email" + } + } + ] + }, + "from": { + "type": "string", + "format": "email" + }, + "subject": { + "type": "string", + "minLength": 1, + "description": "Optional when templateId is provided" + }, + "templateId": { + "type": "string", + "description": "ID of a template from the dashboard" + }, + "variables": { + "type": "object", + "additionalProperties": { + "type": "string" + } + }, + "replyTo": { + "anyOf": [ + { + "type": "string", + "format": "email" + }, + { + "type": "array", + "items": { + "type": "string", + "format": "email" + } + } + ] + }, + "cc": { + "anyOf": [ + { + "type": "string", + "format": "email" + }, + { + "type": "array", + "items": { + "type": "string", + "format": "email" + } + } + ] + }, + "bcc": { + "anyOf": [ + { + "type": "string", + "format": "email" + }, + { + "type": "array", + "items": { + "type": "string", + "format": "email" + } + } + ] + }, + "text": { + "type": "string", + "nullable": true, + "minLength": 1 + }, + "html": { + "type": "string", + "nullable": true, + "minLength": 1 + }, + "attachments": { + "type": "array", + "items": { + "type": "object", + "properties": { + "filename": { + "type": "string", + "minLength": 1 + }, + "content": { + "type": "string", + "minLength": 1 + } + }, + "required": [ + "filename", + "content" + ] + }, + "maxItems": 10 + }, + "scheduledAt": { + "type": "string", + "format": "date-time" + } + }, + "required": [ + "to", + "from" + ] + }, + "maxItems": 100 + } + } + } + }, + "responses": { + "200": { + "description": "List of successfully created email IDs", + "content": { + "application/json": { + "schema": { + "type": "object", + "properties": { + "data": { + "type": "array", + "items": { + "type": "object", + "properties": { + "emailId": { + "type": "string" + } + }, + "required": [ + "emailId" + ] + } + } + }, + "required": [ + "data" + ] + } + } + } + } + } + } + }, "/v1/emails/{emailId}/cancel": { "post": { "parameters": [ diff --git a/apps/web/src/server/public-api/api/emails/batch-email.ts b/apps/web/src/server/public-api/api/emails/batch-email.ts new file mode 100644 index 0000000..0738c76 --- /dev/null +++ b/apps/web/src/server/public-api/api/emails/batch-email.ts @@ -0,0 +1,73 @@ +import { createRoute, z } from "@hono/zod-openapi"; +import { PublicAPIApp } from "~/server/public-api/hono"; +import { getTeamFromToken } from "~/server/public-api/auth"; +import { sendBulkEmails } from "~/server/service/email-service"; +import { EmailContent } from "~/types"; +import { emailSchema } from "../../schemas/email-schema"; // Corrected import path + +// Define the schema for a single email within the bulk request +// This is similar to the schema in send-email.ts but without the top-level 'required' +// Removed inline emailSchema definition + +const route = createRoute({ + method: "post", + path: "/v1/emails/batch", + request: { + body: { + required: true, + content: { + "application/json": { + // Use the imported schema in an array + schema: z.array(emailSchema).max(100, { + message: + "Cannot send more than 100 emails in a single bulk request", + }), + }, + }, + }, + }, + responses: { + 200: { + content: { + "application/json": { + // Return an array of objects with the created email IDs + schema: z.object({ + data: z.array(z.object({ emailId: z.string() })), + }), + }, + }, + description: "List of successfully created email IDs", + }, + // Add other potential error responses based on sendBulkEmails logic if needed + }, +}); + +function sendBatch(app: PublicAPIApp) { + app.openapi(route, async (c) => { + const team = await getTeamFromToken(c); + const emailPayloads = c.req.valid("json"); + + // Add teamId and apiKeyId to each email payload + const emailsToSend: Array< + EmailContent & { teamId: number; apiKeyId?: number } + > = emailPayloads.map((payload) => ({ + ...payload, + text: payload.text ?? undefined, + html: payload.html ?? undefined, + teamId: team.id, + apiKeyId: team.apiKeyId, + })); + + // Call the service function to send emails in bulk + const createdEmails = await sendBulkEmails(emailsToSend); + + // Map the result to the response format + const responseData = createdEmails.map((email) => ({ + emailId: email.id, + })); + + return c.json({ data: responseData }); + }); +} + +export default sendBatch; diff --git a/apps/web/src/server/public-api/api/emails/send-email.ts b/apps/web/src/server/public-api/api/emails/send-email.ts index 41ac37f..144ca95 100644 --- a/apps/web/src/server/public-api/api/emails/send-email.ts +++ b/apps/web/src/server/public-api/api/emails/send-email.ts @@ -2,6 +2,7 @@ import { createRoute, z } from "@hono/zod-openapi"; import { PublicAPIApp } from "~/server/public-api/hono"; import { getTeamFromToken } from "~/server/public-api/auth"; import { sendEmail } from "~/server/service/email-service"; +import { emailSchema } from "../../schemas/email-schema"; const route = createRoute({ method: "post", @@ -11,36 +12,7 @@ const route = createRoute({ required: true, content: { "application/json": { - schema: z - .object({ - to: z.string().or(z.array(z.string())), - from: z.string(), - subject: z.string().optional().openapi({ - description: "Optional when templateId is provided", - }), - templateId: z.string().optional().openapi({ - description: "ID of a template from the dashboard", - }), - variables: z.record(z.string()).optional(), - replyTo: z.string().or(z.array(z.string())).optional(), - cc: z.string().or(z.array(z.string())).optional(), - bcc: z.string().or(z.array(z.string())).optional(), - text: z.string().optional().nullable(), - html: z.string().optional().nullable(), - attachments: z - .array( - z.object({ - filename: z.string(), - content: z.string(), - }) - ) - .optional(), - scheduledAt: z.string().datetime().optional(), - }) - .refine( - (data) => !!data.subject || !!data.templateId, - "Either subject or templateId should be passed." - ), + schema: emailSchema, }, }, }, diff --git a/apps/web/src/server/public-api/index.ts b/apps/web/src/server/public-api/index.ts index abe0bdb..e63e698 100644 --- a/apps/web/src/server/public-api/index.ts +++ b/apps/web/src/server/public-api/index.ts @@ -12,6 +12,7 @@ import upsertContact from "./api/contacts/upsert-contact"; import createDomain from "./api/domains/create-domain"; import deleteContact from "./api/contacts/delete-contact"; import verifyDomain from "./api/domains/verify-domain"; +import sendBatch from "./api/emails/batch-email"; export const app = getApp(); @@ -23,6 +24,7 @@ verifyDomain(app); /**Email related APIs */ getEmail(app); sendEmail(app); +sendBatch(app); updateEmailScheduledAt(app); cancelScheduledEmail(app); diff --git a/apps/web/src/server/public-api/schemas/email-schema.ts b/apps/web/src/server/public-api/schemas/email-schema.ts new file mode 100644 index 0000000..d6ab8ee --- /dev/null +++ b/apps/web/src/server/public-api/schemas/email-schema.ts @@ -0,0 +1,40 @@ +import { z } from "@hono/zod-openapi"; + +/** + * Reusable Zod schema for a single email payload used in public API requests. + */ +export const emailSchema = z + .object({ + to: z.string().email().or(z.array(z.string().email())), + from: z.string().email(), + subject: z.string().min(1).optional().openapi({ + description: "Optional when templateId is provided", + }), + templateId: z.string().optional().openapi({ + description: "ID of a template from the dashboard", + }), + variables: z.record(z.string()).optional(), + replyTo: z.string().email().or(z.array(z.string().email())).optional(), + cc: z.string().email().or(z.array(z.string().email())).optional(), + bcc: z.string().email().or(z.array(z.string().email())).optional(), + text: z.string().min(1).optional().nullable(), + html: z.string().min(1).optional().nullable(), + attachments: z + .array( + z.object({ + filename: z.string().min(1), + content: z.string().min(1), // Consider base64 validation if needed + }) + ) + .max(10) // Limit attachments array size if desired + .optional(), + scheduledAt: z.string().datetime({ offset: true }).optional(), // Ensure ISO 8601 format with offset + }) + .refine( + (data) => !!data.subject || !!data.templateId, + "Either subject or templateId must be provided." + ) + .refine( + (data) => !!data.text || !!data.html, + "Either text or html content must be provided." + ); diff --git a/apps/web/src/server/service/email-queue-service.ts b/apps/web/src/server/service/email-queue-service.ts index f5fc618..b94c6d9 100644 --- a/apps/web/src/server/service/email-queue-service.ts +++ b/apps/web/src/server/service/email-queue-service.ts @@ -7,6 +7,7 @@ import { db } from "../db"; import { sendEmailThroughSes, sendEmailWithAttachments } from "../aws/ses"; import { getRedis } from "../redis"; import { DEFAULT_QUEUE_OPTIONS } from "../queue/queue-constants"; +import { Prisma } from "@prisma/client"; function createQueueAndWorker(region: string, quota: number, suffix: string) { const connection = getRedis(); @@ -112,6 +113,113 @@ export class EmailQueueService { ); } + /** + * Efficiently queues multiple pre-defined email jobs using BullMQ's addBulk. + * Jobs are grouped by region and type (transactional/marketing) before queuing. + * + * @param jobs - Array of job details to queue. + * @returns A promise that resolves when all bulk additions are attempted. + */ + public static async queueBulk( + jobs: { + emailId: string; + region: string; + transactional: boolean; + unsubUrl?: string; + delay?: number; + timestamp?: number; // Optional: pass timestamp if needed for data + }[] + ): Promise { + if (jobs.length === 0) { + console.log("[EmailQueueService]: No jobs provided for bulk queue."); + return; + } + + if (!this.initialized) { + await this.init(); + } + + console.log( + `[EmailQueueService]: Starting bulk queue for ${jobs.length} jobs.` + ); + + // Group jobs by region and type + const groupedJobs = jobs.reduce( + (acc, job) => { + const key = `${job.region}-${job.transactional ? "transactional" : "marketing"}`; + if (!acc[key]) { + acc[key] = { + queue: job.transactional + ? this.transactionalQueue.get(job.region) + : this.marketingQueue.get(job.region), + region: job.region, + transactional: job.transactional, + jobDetails: [], + }; + } + acc[key]?.jobDetails.push(job); + return acc; + }, + {} as Record< + string, + { + queue: Queue | undefined; + region: string; + transactional: boolean; + jobDetails: typeof jobs; + } + > + ); + + const bulkAddPromises: Promise[] = []; + + for (const groupKey in groupedJobs) { + const group = groupedJobs[groupKey]; + if (!group || !group.queue) { + console.error( + `[EmailQueueService]: Queue not found for group ${groupKey} during bulk add. Skipping ${group?.jobDetails?.length ?? 0} jobs.` + ); + // Optionally: handle these skipped jobs (e.g., mark corresponding emails as failed) + continue; + } + + const queue = group.queue; + const isBulk = !group.transactional; + const bulkData = group.jobDetails.map((job) => ({ + name: job.emailId, // Use emailId as job name (matches single queue logic) + data: { + emailId: job.emailId, + timestamp: job.timestamp ?? Date.now(), + unsubUrl: job.unsubUrl, + isBulk, + }, + opts: { + jobId: job.emailId, // Use emailId as jobId + delay: job.delay, + ...DEFAULT_QUEUE_OPTIONS, // Apply default options (attempts, backoff) + }, + })); + + console.log( + `[EmailQueueService]: Adding ${bulkData.length} jobs to queue ${queue.name}` + ); + bulkAddPromises.push( + queue.addBulk(bulkData).catch((error) => { + console.error( + `[EmailQueueService]: Failed to add bulk jobs to queue ${queue.name}:`, + error + ); + // Optionally: handle bulk add failure (e.g., mark corresponding emails as failed) + }) + ); + } + + await Promise.allSettled(bulkAddPromises); + console.log( + "[EmailQueueService]: Finished processing bulk queue requests." + ); + } + public static async changeDelay( emailId: string, region: string, diff --git a/apps/web/src/server/service/email-service.ts b/apps/web/src/server/service/email-service.ts index c47b759..8cb66c1 100644 --- a/apps/web/src/server/service/email-service.ts +++ b/apps/web/src/server/service/email-service.ts @@ -220,3 +220,213 @@ export async function cancelEmail(emailId: string) { }, }); } + +/** + * Send multiple emails in bulk (up to 100 at a time) + * Handles template rendering, variable replacement, and efficient bulk queuing + */ +export async function sendBulkEmails( + emailContents: Array< + EmailContent & { + teamId: number; + apiKeyId?: number; + } + > +) { + if (emailContents.length === 0) { + throw new UnsendApiError({ + code: "BAD_REQUEST", + message: "No emails provided for bulk send", + }); + } + + if (emailContents.length > 100) { + throw new UnsendApiError({ + code: "BAD_REQUEST", + message: "Cannot send more than 100 emails in a single bulk request", + }); + } + + // Group emails by domain to minimize domain validations + const emailsByDomain = new Map< + string, + { + domain: Awaited>; + emails: typeof emailContents; + } + >(); + + // First pass: validate domains and group emails + for (const content of emailContents) { + const { from } = content; + if (!emailsByDomain.has(from)) { + const domain = await validateDomainFromEmail(from, content.teamId); + emailsByDomain.set(from, { domain, emails: [] }); + } + emailsByDomain.get(from)?.emails.push(content); + } + + // Cache templates to avoid repeated database queries + const templateCache = new Map< + number, + { subject: string; content: any; renderer: EmailRenderer } + >(); + + const createdEmails = []; + const queueJobs = []; + + // Process each domain group + for (const { domain, emails } of emailsByDomain.values()) { + // Process emails in each domain group + for (const content of emails) { + const { + to, + from, + subject: subjectFromApiCall, + templateId, + variables, + text, + html: htmlFromApiCall, + teamId, + attachments, + replyTo, + cc, + bcc, + scheduledAt, + apiKeyId, + } = content; + + let subject = subjectFromApiCall; + let html = htmlFromApiCall; + + // Process template if specified + if (templateId) { + let templateData = templateCache.get(Number(templateId)); + if (!templateData) { + const template = await db.template.findUnique({ + where: { id: templateId }, + }); + if (template) { + const jsonContent = JSON.parse(template.content || "{}"); + templateData = { + subject: template.subject || "", + content: jsonContent, + renderer: new EmailRenderer(jsonContent), + }; + templateCache.set(Number(templateId), templateData); + } + } + + if (templateData) { + subject = replaceVariables(templateData.subject, variables || {}); + + // {{}} for link replacements + const modifiedVariables = { + ...variables, + ...Object.keys(variables || {}).reduce( + (acc, key) => { + acc[`{{${key}}}`] = variables?.[key] || ""; + return acc; + }, + {} as Record + ), + }; + + html = await templateData.renderer.render({ + shouldReplaceVariableValues: true, + variableValues: modifiedVariables, + }); + } + } + + if (!text && !html) { + throw new UnsendApiError({ + code: "BAD_REQUEST", + message: `Either text or html is required for email to ${to}`, + }); + } + + const scheduledAtDate = scheduledAt ? new Date(scheduledAt) : undefined; + const delay = scheduledAtDate + ? Math.max(0, scheduledAtDate.getTime() - Date.now()) + : undefined; + + try { + // Create email record + const email = await db.email.create({ + data: { + to: Array.isArray(to) ? to : [to], + from, + subject: subject as string, + replyTo: replyTo + ? Array.isArray(replyTo) + ? replyTo + : [replyTo] + : undefined, + cc: cc ? (Array.isArray(cc) ? cc : [cc]) : undefined, + bcc: bcc ? (Array.isArray(bcc) ? bcc : [bcc]) : undefined, + text, + html, + teamId, + domainId: domain.id, + attachments: attachments ? JSON.stringify(attachments) : undefined, + scheduledAt: scheduledAtDate, + latestStatus: scheduledAtDate ? "SCHEDULED" : "QUEUED", + apiId: apiKeyId, + }, + }); + + createdEmails.push(email); + + // Prepare queue job + queueJobs.push({ + emailId: email.id, + region: domain.region, + transactional: true, // Bulk emails are still transactional + delay, + timestamp: Date.now(), + }); + } catch (error: any) { + console.error( + `Failed to create email record for recipient ${to}:`, + error + ); + // Continue processing other emails + } + } + } + + if (queueJobs.length === 0) { + throw new UnsendApiError({ + code: "INTERNAL_SERVER_ERROR", + message: "Failed to create any email records", + }); + } + + // Bulk queue all jobs + try { + await EmailQueueService.queueBulk(queueJobs); + } catch (error: any) { + // Mark all created emails as failed + await Promise.all( + createdEmails.map(async (email) => { + await db.emailEvent.create({ + data: { + emailId: email.id, + status: "FAILED", + data: { + error: error.toString(), + }, + }, + }); + await db.email.update({ + where: { id: email.id }, + data: { latestStatus: "FAILED" }, + }); + }) + ); + throw error; + } + + return createdEmails; +} diff --git a/packages/sdk/package.json b/packages/sdk/package.json index 5fae8a0..5a11674 100644 --- a/packages/sdk/package.json +++ b/packages/sdk/package.json @@ -1,6 +1,6 @@ { "name": "unsend", - "version": "1.4.2", + "version": "1.5.0", "description": "", "main": "./dist/index.js", "module": "./dist/index.mjs", diff --git a/packages/sdk/src/email.ts b/packages/sdk/src/email.ts index 24a9ce7..3f7b301 100644 --- a/packages/sdk/src/email.ts +++ b/packages/sdk/src/email.ts @@ -46,6 +46,27 @@ type CancelEmailResponse = { type CancelEmailResponseSuccess = paths["/v1/emails/{emailId}/cancel"]["post"]["responses"]["200"]["content"]["application/json"]; +// Batch emails types +/** + * Payload for sending multiple emails in a single batch request. + */ +type BatchEmailPayload = + paths["/v1/emails/batch"]["post"]["requestBody"]["content"]["application/json"]; + +/** + * Successful response schema for batch email send. + */ +type BatchEmailResponseSuccess = + paths["/v1/emails/batch"]["post"]["responses"]["200"]["content"]["application/json"]; + +/** + * Response structure for the batch method. + */ +type BatchEmailResponse = { + data: BatchEmailResponseSuccess["data"] | null; + error: ErrorResponse | null; +}; + export class Emails { constructor(private readonly unsend: Unsend) { this.unsend = unsend; @@ -69,6 +90,24 @@ export class Emails { return data; } + /** + * Send up to 100 emails in a single request. + * + * @param payload An array of email payloads. Max 100 emails. + * @returns A promise that resolves to the list of created email IDs or an error. + */ + async batch(payload: BatchEmailPayload): Promise { + // Note: React element rendering is not supported in batch mode. + const response = await this.unsend.post( + "/emails/batch", + payload + ); + return { + data: response.data ? response.data.data : null, + error: response.error, + }; + } + async get(id: string): Promise { const data = await this.unsend.get( `/emails/${id}` diff --git a/packages/sdk/types/schema.d.ts b/packages/sdk/types/schema.d.ts index 46608af..fd5cfa0 100644 --- a/packages/sdk/types/schema.d.ts +++ b/packages/sdk/types/schema.d.ts @@ -109,7 +109,7 @@ export interface paths { put: { parameters: { path: { - id: number; + id: number | null; }; }; responses: { @@ -192,6 +192,7 @@ export interface paths { content: { "application/json": { to: string | string[]; + /** Format: email */ from: string; /** @description Optional when templateId is provided */ subject?: string; @@ -203,8 +204,8 @@ export interface paths { replyTo?: string | string[]; cc?: string | string[]; bcc?: string | string[]; - text?: string; - html?: string; + text?: string | null; + html?: string | null; attachments?: { filename: string; content: string; @@ -226,6 +227,49 @@ export interface paths { }; }; }; + "/v1/emails/batch": { + post: { + requestBody: { + content: { + "application/json": ({ + to: string | string[]; + /** Format: email */ + from: string; + /** @description Optional when templateId is provided */ + subject?: string; + /** @description ID of a template from the dashboard */ + templateId?: string; + variables?: { + [key: string]: string; + }; + replyTo?: string | string[]; + cc?: string | string[]; + bcc?: string | string[]; + text?: string | null; + html?: string | null; + attachments?: { + filename: string; + content: string; + }[]; + /** Format: date-time */ + scheduledAt?: string; + })[]; + }; + }; + responses: { + /** @description List of successfully created email IDs */ + 200: { + content: { + "application/json": { + data: { + emailId: string; + }[]; + }; + }; + }; + }; + }; + }; "/v1/emails/{emailId}/cancel": { post: { parameters: {