idempotency (#282)

This commit is contained in:
KM Koushik
2025-11-17 11:42:09 +11:00
committed by GitHub
parent eacf231173
commit cb489654b5
14 changed files with 859 additions and 1118 deletions
@@ -1,9 +1,9 @@
import { createRoute, z } from "@hono/zod-openapi";
import { PublicAPIApp } from "~/server/public-api/hono";
import { getTeamFromToken } from "~/server/public-api/auth";
import { sendBulkEmails } from "~/server/service/email-service";
import { EmailContent } from "~/types";
import { emailSchema } from "../../schemas/email-schema"; // Corrected import path
import { emailSchema } from "../../schemas/email-schema";
import { IdempotencyService } from "~/server/service/idempotency-service";
// Define the schema for a single email within the bulk request
// This is similar to the schema in send-email.ts but without the top-level 'required'
@@ -13,6 +13,24 @@ const route = createRoute({
method: "post",
path: "/v1/emails/batch",
request: {
headers: z
.object({
"Idempotency-Key": z
.string()
.min(1)
.max(256)
.optional()
.openapi({
description: `Pass the optional Idempotency-Key header to make the request safe to retry. The key can be up to 256 characters. The server stores the canonical request body and behaves as follows:
- Same key + same request body → returns the original emailId with 200 OK without re-sending.
- Same key + different request body → returns 409 Conflict with code: NOT_UNIQUE so you can detect the mismatch.
- Same key while another request is still being processed → returns 409 Conflict; retry after a short delay or once the first request completes.
Entries expire after 24 hours. Use a unique key per logical send (for example, an order or signup ID).`,
}),
})
.partial(),
body: {
required: true,
content: {
@@ -47,27 +65,41 @@ function sendBatch(app: PublicAPIApp) {
const team = c.var.team;
const emailPayloads = c.req.valid("json");
// Add teamId and apiKeyId to each email payload
const emailsToSend: Array<
EmailContent & { teamId: number; apiKeyId?: number }
> = emailPayloads.map((payload) => ({
const normalizedPayloads = emailPayloads.map((payload) => ({
...payload,
text: payload.text ?? undefined,
html:
payload.html && payload.html !== "true" && payload.html !== "false"
? payload.html
: undefined,
}));
const idemKey = c.req.header("Idempotency-Key") ?? undefined;
const responseData = await IdempotencyService.withIdempotency({
teamId: team.id,
apiKeyId: team.apiKeyId,
}));
idemKey,
payload: normalizedPayloads,
operation: async () => {
const emailsToSend: Array<
EmailContent & { teamId: number; apiKeyId?: number }
> = normalizedPayloads.map((payload) => ({
...payload,
teamId: team.id,
apiKeyId: team.apiKeyId,
}));
// Call the service function to send emails in bulk
const createdEmails = await sendBulkEmails(emailsToSend);
const createdEmails = await sendBulkEmails(emailsToSend);
// Map the result to the response format
const responseData = createdEmails.map((email) => ({
emailId: email.id,
}));
return createdEmails.map((email) => ({
emailId: email.id,
}));
},
extractEmailIds: (data) => data.map((item) => item.emailId),
formatCachedResponse: (emailIds) =>
emailIds.map((id) => ({ emailId: id })),
logContext: "bulk email send",
});
return c.json({ data: responseData });
});
@@ -2,11 +2,30 @@ import { createRoute, z } from "@hono/zod-openapi";
import { PublicAPIApp } from "~/server/public-api/hono";
import { sendEmail } from "~/server/service/email-service";
import { emailSchema } from "../../schemas/email-schema";
import { IdempotencyService } from "~/server/service/idempotency-service";
const route = createRoute({
method: "post",
path: "/v1/emails",
request: {
headers: z
.object({
"Idempotency-Key": z
.string()
.min(1)
.max(256)
.optional()
.openapi({
description: `Pass the optional Idempotency-Key header to make the request safe to retry. The key can be up to 256 characters. The server stores the canonical request body and behaves as follows:
- Same key + same request body → returns the original emailId with 200 OK without re-sending.
- Same key + different request body → returns 409 Conflict with code: NOT_UNIQUE so you can detect the mismatch.
- Same key while another request is still being processed → returns 409 Conflict; retry after a short delay or once the first request completes.
Entries expire after 24 hours. Use a unique key per logical send (for example, an order or signup ID).`,
}),
})
.partial(),
body: {
required: true,
content: {
@@ -31,24 +50,43 @@ const route = createRoute({
function send(app: PublicAPIApp) {
app.openapi(route, async (c) => {
const team = c.var.team;
const requestBody = c.req.valid("json");
let html = undefined;
const _html = c.req.valid("json")?.html?.toString();
if (_html && _html !== "true" && _html !== "false") {
html = _html;
let html: string | undefined;
const rawHtml = requestBody?.html?.toString();
if (rawHtml && rawHtml !== "true" && rawHtml !== "false") {
html = rawHtml;
}
const email = await sendEmail({
...c.req.valid("json"),
const clientPayload = {
...requestBody,
text: requestBody.text ?? undefined,
html,
};
const idemKey = c.req.header("Idempotency-Key") ?? undefined;
const result = await IdempotencyService.withIdempotency<
typeof clientPayload,
{ emailId?: string }
>({
teamId: team.id,
apiKeyId: team.apiKeyId,
text: c.req.valid("json").text ?? undefined,
html: html,
idemKey,
payload: clientPayload,
operation: async () => {
const email = await sendEmail({
...clientPayload,
teamId: team.id,
apiKeyId: team.apiKeyId,
});
return { emailId: email?.id };
},
extractEmailIds: (result) => (result.emailId ? [result.emailId] : []),
formatCachedResponse: (emailIds) => ({ emailId: emailIds[0] }),
logContext: "email send",
});
return c.json({ emailId: email?.id });
return c.json(result);
});
}
@@ -0,0 +1,177 @@
import { getRedis } from "~/server/redis";
import { canonicalizePayload } from "~/server/utils/idempotency";
import { UnsendApiError } from "~/server/public-api/api-error";
import { logger } from "~/server/logger/log";
const IDEMPOTENCY_RESULT_TTL_SECONDS = 24 * 60 * 60; // 24h
const IDEMPOTENCY_LOCK_TTL_SECONDS = 60; // 60s
export type IdempotencyRecord = {
bodyHash: string;
emailIds: string[];
};
export type IdempotencyHandlerOptions<TPayload, TResult> = {
teamId: number;
idemKey: string | undefined;
payload: TPayload;
operation: () => Promise<TResult>;
extractEmailIds: (result: TResult) => string[];
formatCachedResponse: (emailIds: string[]) => TResult;
logContext: string;
};
function resultKey(teamId: number, key: string) {
return `idem:${teamId}:${key}`;
}
function lockKey(teamId: number, key: string) {
return `idemlock:${teamId}:${key}`;
}
export const IdempotencyService = {
async getResult(
teamId: number,
key: string,
): Promise<IdempotencyRecord | null> {
const redis = getRedis();
const raw = await redis.get(resultKey(teamId, key));
if (!raw) return null;
try {
const parsed = JSON.parse(raw);
if (
parsed &&
typeof parsed === "object" &&
typeof (parsed as any).bodyHash === "string" &&
Array.isArray((parsed as any).emailIds)
) {
return parsed as IdempotencyRecord;
}
return null;
} catch {
return null;
}
},
async setResult(
teamId: number,
key: string,
record: IdempotencyRecord,
): Promise<void> {
const redis = getRedis();
await redis.setex(
resultKey(teamId, key),
IDEMPOTENCY_RESULT_TTL_SECONDS,
JSON.stringify(record),
);
},
async acquireLock(teamId: number, key: string): Promise<boolean> {
const redis = getRedis();
const ok = await redis.set(
lockKey(teamId, key),
"1",
"EX",
IDEMPOTENCY_LOCK_TTL_SECONDS,
"NX",
);
return ok === "OK";
},
async releaseLock(teamId: number, key: string): Promise<void> {
const redis = getRedis();
await redis.del(lockKey(teamId, key));
},
async withIdempotency<TPayload, TResult>(
options: IdempotencyHandlerOptions<TPayload, TResult>,
): Promise<TResult> {
const {
teamId,
idemKey,
payload,
operation,
extractEmailIds,
formatCachedResponse,
logContext,
} = options;
// Validate idempotency key length
if (idemKey !== undefined && (idemKey.length < 1 || idemKey.length > 256)) {
throw new UnsendApiError({
code: "BAD_REQUEST",
message: "Invalid Idempotency-Key length",
});
}
// If no idempotency key, just execute the operation
if (!idemKey) {
return await operation();
}
// Calculate payload hash
const { bodyHash: payloadHash } = canonicalizePayload(payload);
// Check for existing result
const existing = await this.getResult(teamId, idemKey);
if (existing) {
if (existing.bodyHash === payloadHash) {
logger.info({ teamId }, `Idempotency hit for ${logContext}`);
return formatCachedResponse(existing.emailIds);
}
throw new UnsendApiError({
code: "NOT_UNIQUE",
message: "Idempotency-Key already used with a different payload",
});
}
// Try to acquire lock
const lockAcquired = await this.acquireLock(teamId, idemKey);
if (!lockAcquired) {
// Check again in case another request completed
const again = await this.getResult(teamId, idemKey);
if (again) {
if (again.bodyHash === payloadHash) {
logger.info(
{ teamId },
`Idempotency hit after contention for ${logContext}`,
);
return formatCachedResponse(again.emailIds);
}
throw new UnsendApiError({
code: "NOT_UNIQUE",
message: "Idempotency-Key already used with a different payload",
});
}
throw new UnsendApiError({
code: "NOT_UNIQUE",
message:
"Request with same Idempotency-Key is in progress. Retry later.",
});
}
try {
// Execute the operation
const result = await operation();
// Store the result for future idempotency checks
await this.setResult(teamId, idemKey, {
bodyHash: payloadHash,
emailIds: extractEmailIds(result),
});
return result;
} finally {
// Always release the lock
await this.releaseLock(teamId, idemKey);
}
},
};
export const IDEMPOTENCY_CONSTANTS = {
RESULT_TTL_SECONDS: IDEMPOTENCY_RESULT_TTL_SECONDS,
LOCK_TTL_SECONDS: IDEMPOTENCY_LOCK_TTL_SECONDS,
};
+69
View File
@@ -0,0 +1,69 @@
import { createHash } from "crypto";
type CanonicalValue =
| string
| number
| boolean
| null
| CanonicalValue[]
| { [key: string]: CanonicalValue };
function normalize(value: unknown): CanonicalValue | undefined {
if (value === undefined) {
return undefined;
}
if (value === null) {
return null;
}
if (Array.isArray(value)) {
return value.map((item) => normalize(item) ?? null);
}
if (value instanceof Date) {
return value.toISOString();
}
if (typeof value === "object") {
const entries = Object.entries(value as Record<string, unknown>).sort(
([keyA], [keyB]) => (keyA < keyB ? -1 : keyA > keyB ? 1 : 0)
);
const result: Record<string, CanonicalValue> = {};
for (const [key, val] of entries) {
const normalized = normalize(val);
if (normalized !== undefined) {
result[key] = normalized;
}
}
return result;
}
if (typeof value === "string") {
return value;
}
if (typeof value === "number") {
return value;
}
if (typeof value === "boolean") {
return value;
}
if (typeof value === "bigint") {
return value.toString();
}
return String(value);
}
export function canonicalizePayload(payload: unknown) {
const normalized = normalize(payload);
const canonical = JSON.stringify(normalized ?? null);
const bodyHash = createHash("sha256").update(canonical).digest("hex");
return { canonical, bodyHash };
}