feat: add REDIS_KEY_PREFIX env var for Redis ACL namespace isolation (#365)

* feat: add REDIS_KEY_PREFIX env var for Redis ACL namespace isolation

Adds optional REDIS_KEY_PREFIX env var that prefixes all Redis keys
(BullMQ queues via `prefix` option, cache/lock/rate-limit keys via
`redisKey()` helper). When unset, behavior is unchanged (BullMQ
defaults to "bull:", cache keys are unprefixed).

This enables self-hosters using Redis ACL multi-tenancy to restrict
useSend to its own key namespace (e.g. `~usesend:*`).

16 files changed across env schema, Redis module, 9 BullMQ queue/worker
files, and 5 direct Redis key operation sites.

* docs: add REDIS_KEY_PREFIX to self-host assets and fix docker run example

Add REDIS_KEY_PREFIX env var to docker/prod/compose.yml, .env.example,
.env.selfhost.example, and self-hosting docs. Fix missing trailing
backslashes in standalone docker run example.

* fix(redis): disable ioredis ready check and BullMQ version check

Redis ACL blocks INFO command (in @dangerous category). ioredis uses
INFO for ready check, BullMQ uses it for version detection. Without
these flags, BullMQ workers fail to initialize and silently stop
processing jobs.

- Add enableReadyCheck: false to ioredis connection
- Add skipVersionCheck: true to all 5 Queue + 5 Worker constructors

* fix(redis): add skipVersionCheck to remaining BullMQ job queues

Add skipVersionCheck: true to Queue and Worker constructors in all 4 job
files (campaign-scheduler, cleanup-email-bodies, usage-job,
webhook-cleanup) to match the pattern already used in service files.
This prevents BullMQ version mismatch errors when using REDIS_KEY_PREFIX
with Redis ACL namespace isolation.
This commit is contained in:
Michał Ordon
2026-03-01 21:12:47 +00:00
committed by GitHub
parent 69eeb2d96e
commit 62d7c44efc
21 changed files with 100 additions and 37 deletions
+2
View File
@@ -59,6 +59,8 @@ DATABASE_URL="postgres://<username>:<password>@<host>:<port>/<database-name>"
REDIS_URL="redis://<username>:<password>@<host>:<port>"
```
If you're sharing a Redis instance across multiple apps and using Redis ACL for isolation, set `REDIS_KEY_PREFIX` to namespace all keys (e.g. `REDIS_KEY_PREFIX="usesend"` prefixes all keys with `usesend:`).
</Step>
<Step title="Next auth url and secret">
Url is the app url you're going to use and secret is random string. You can generate a random secret using this command.
@@ -2,7 +2,7 @@ import NextAuth from "next-auth";
import { authOptions } from "~/server/auth";
import { env } from "~/env";
import { getRedis } from "~/server/redis";
import { getRedis, redisKey } from "~/server/redis";
import { logger } from "~/server/logger/log";
const handler = NextAuth(authOptions);
@@ -60,7 +60,7 @@ export async function POST(req: Request, ctx: any) {
return handler(req, ctx);
}
const redis = getRedis();
const key = `auth-rl:${ip}`;
const key = redisKey(`auth-rl:${ip}`);
const ttl = 60;
const count = await redis.incr(key);
if (count === 1) await redis.expire(key, ttl);
+2
View File
@@ -53,6 +53,7 @@ export const env = createEnv({
FOUNDER_EMAIL: z.string().optional(),
DISCORD_WEBHOOK_URL: z.string().optional(),
REDIS_URL: z.string(),
REDIS_KEY_PREFIX: z.string().default(""),
S3_COMPATIBLE_ACCESS_KEY: z.string().optional(),
S3_COMPATIBLE_SECRET_KEY: z.string().optional(),
S3_COMPATIBLE_API_URL: z.string().optional(),
@@ -116,6 +117,7 @@ export const env = createEnv({
FOUNDER_EMAIL: process.env.FOUNDER_EMAIL,
DISCORD_WEBHOOK_URL: process.env.DISCORD_WEBHOOK_URL,
REDIS_URL: process.env.REDIS_URL,
REDIS_KEY_PREFIX: process.env.REDIS_KEY_PREFIX,
FROM_EMAIL: process.env.FROM_EMAIL,
S3_COMPATIBLE_ACCESS_KEY: process.env.S3_COMPATIBLE_ACCESS_KEY,
S3_COMPATIBLE_SECRET_KEY: process.env.S3_COMPATIBLE_SECRET_KEY,
+2 -2
View File
@@ -4,7 +4,7 @@ import { env } from "~/env";
import { authedProcedure, createTRPCRouter } from "~/server/api/trpc";
import { logger } from "~/server/logger/log";
import { sendMail } from "~/server/mailer";
import { getRedis } from "~/server/redis";
import { getRedis, redisKey } from "~/server/redis";
import {
WAITLIST_EMAIL_TYPES,
waitlistSubmissionSchema,
@@ -40,7 +40,7 @@ export const waitlistRouter = createTRPCRouter({
}
const redis = getRedis();
const rateKey = `waitlist:requests:${user.id}`;
const rateKey = redisKey(`waitlist:requests:${user.id}`);
const currentCountRaw = await redis.get(rateKey);
const currentCount = currentCountRaw ? Number(currentCountRaw) : 0;
@@ -4,7 +4,7 @@ import {
CAMPAIGN_SCHEDULER_QUEUE,
DEFAULT_QUEUE_OPTIONS,
} from "../queue/queue-constants";
import { getRedis } from "../redis";
import { getRedis, BULL_PREFIX } from "../redis";
import { CampaignBatchService } from "../service/campaign-service";
import { db } from "../db";
import { logger } from "../logger/log";
@@ -18,6 +18,8 @@ export class CampaignSchedulerService {
CAMPAIGN_SCHEDULER_QUEUE,
{
connection: getRedis(),
prefix: BULL_PREFIX,
skipVersionCheck: true,
}
);
@@ -82,7 +84,7 @@ export class CampaignSchedulerService {
logger.error({ err }, "Campaign scheduler tick failed");
}
}),
{ connection: getRedis(), concurrency: 1 }
{ connection: getRedis(), concurrency: 1, prefix: BULL_PREFIX, skipVersionCheck: true }
);
static async start() {
@@ -1,6 +1,6 @@
import {Queue, Worker} from "bullmq";
import {db} from "~/server/db";
import {getRedis} from "~/server/redis";
import {getRedis, BULL_PREFIX} from "~/server/redis";
import {logger} from "../logger/log";
import {DEFAULT_QUEUE_OPTIONS} from "../queue/queue-constants";
import {env} from "~/env";
@@ -19,6 +19,8 @@ if (isSelfHosted() && isEmailCleanupEnabled()) {
*/
const cleanupQueue = new Queue(CLEANUP_QUEUE_NAME, {
connection: getRedis(),
prefix: BULL_PREFIX,
skipVersionCheck: true,
});
const worker = new Worker(
@@ -47,6 +49,8 @@ if (isSelfHosted() && isEmailCleanupEnabled()) {
},
{
connection: getRedis(),
prefix: BULL_PREFIX,
skipVersionCheck: true,
}
);
+5 -1
View File
@@ -3,7 +3,7 @@ import { db } from "~/server/db";
import { env } from "~/env";
import { getUsageDate, getUsageUnits } from "~/lib/usage";
import { sendUsageToStripe } from "~/server/billing/usage";
import { getRedis } from "~/server/redis";
import { getRedis, BULL_PREFIX } from "~/server/redis";
import { DEFAULT_QUEUE_OPTIONS } from "../queue/queue-constants";
import { logger } from "../logger/log";
@@ -11,6 +11,8 @@ const USAGE_QUEUE_NAME = "usage-reporting";
const usageQueue = new Queue(USAGE_QUEUE_NAME, {
connection: getRedis(),
prefix: BULL_PREFIX,
skipVersionCheck: true,
});
const worker = new Worker(
@@ -69,6 +71,8 @@ const worker = new Worker(
},
{
connection: getRedis(),
prefix: BULL_PREFIX,
skipVersionCheck: true,
},
);
@@ -1,7 +1,7 @@
import { Queue, Worker } from "bullmq";
import { subDays } from "date-fns";
import { db } from "~/server/db";
import { getRedis } from "~/server/redis";
import { getRedis, BULL_PREFIX } from "~/server/redis";
import { DEFAULT_QUEUE_OPTIONS, WEBHOOK_CLEANUP_QUEUE } from "../queue/queue-constants";
import { logger } from "../logger/log";
@@ -9,6 +9,8 @@ const WEBHOOK_RETENTION_DAYS = 30;
const webhookCleanupQueue = new Queue(WEBHOOK_CLEANUP_QUEUE, {
connection: getRedis(),
prefix: BULL_PREFIX,
skipVersionCheck: true,
});
const worker = new Worker(
@@ -30,6 +32,8 @@ const worker = new Worker(
},
{
connection: getRedis(),
prefix: BULL_PREFIX,
skipVersionCheck: true,
}
);
+2 -2
View File
@@ -3,7 +3,7 @@ import { swaggerUI } from "@hono/swagger-ui";
import { Context, Next } from "hono";
import { handleError } from "./api-error";
import { env } from "~/env";
import { getRedis } from "~/server/redis";
import { getRedis, redisKey } from "~/server/redis";
import { getTeamFromToken } from "~/server/public-api/auth";
import { isSelfHosted } from "~/utils/common";
import { UnsendApiError } from "./api-error";
@@ -66,7 +66,7 @@ export function getApp() {
const team = c.var.team;
const limit = team.apiRateLimit ?? 2; // Default limit from your previous setup
const key = `rl:${team.id}`; // Rate limit key for Redis
const key = redisKey(`rl:${team.id}`); // Rate limit key for Redis
const redis = getRedis();
let currentRequests: number;
+24 -2
View File
@@ -3,10 +3,31 @@ import { env } from "~/env";
export let connection: IORedis | null = null;
/**
* Key prefix derived from REDIS_KEY_PREFIX env var.
* When set (e.g. "usesend"), all cache keys become "usesend:team:1", etc.
* When empty, keys are unprefixed (backwards compatible).
*/
export const REDIS_PREFIX = env.REDIS_KEY_PREFIX
? `${env.REDIS_KEY_PREFIX}:`
: "";
/**
* BullMQ prefix (no trailing colon — BullMQ adds its own separator).
* When REDIS_KEY_PREFIX is empty, falls back to BullMQ's default "bull".
*/
export const BULL_PREFIX = env.REDIS_KEY_PREFIX || "bull";
/** Prefix a cache key with REDIS_KEY_PREFIX. */
export function redisKey(key: string): string {
return `${REDIS_PREFIX}${key}`;
}
export const getRedis = () => {
if (!connection || connection.status === "end") {
connection = new IORedis(`${env.REDIS_URL}?family=0`, {
maxRetriesPerRequest: null,
enableReadyCheck: false,
});
}
return connection;
@@ -24,9 +45,10 @@ export async function withCache<T>(
const { ttlSeconds = 120, disable = false } = options ?? {};
const redis = getRedis();
const prefixedKey = redisKey(key);
if (!disable) {
const cached = await redis.get(key);
const cached = await redis.get(prefixedKey);
if (cached) {
try {
return JSON.parse(cached) as T;
@@ -40,7 +62,7 @@ export async function withCache<T>(
if (!disable) {
try {
await redis.setex(key, ttlSeconds, JSON.stringify(value));
await redis.setex(prefixedKey, ttlSeconds, JSON.stringify(value));
} catch {
// ignore cache set errors
}
@@ -10,7 +10,7 @@ import {
} from "@prisma/client";
import { EmailQueueService } from "./email-queue-service";
import { Queue, Worker } from "bullmq";
import { getRedis } from "../redis";
import { getRedis, BULL_PREFIX } from "../redis";
import {
CAMPAIGN_BATCH_QUEUE,
DEFAULT_QUEUE_OPTIONS,
@@ -928,6 +928,8 @@ export class CampaignBatchService {
CAMPAIGN_BATCH_QUEUE,
{
connection: getRedis(),
prefix: BULL_PREFIX,
skipVersionCheck: true,
}
);
@@ -1028,7 +1030,7 @@ export class CampaignBatchService {
data: { lastCursor: newCursor, lastSentAt: new Date() },
});
}),
{ connection: getRedis(), concurrency: 20 }
{ connection: getRedis(), concurrency: 20, prefix: BULL_PREFIX, skipVersionCheck: true }
);
static async queueBatch({
@@ -1,5 +1,5 @@
import { Queue, Worker } from "bullmq";
import { getRedis } from "../redis";
import { getRedis, BULL_PREFIX } from "../redis";
import {
DEFAULT_QUEUE_OPTIONS,
CONTACT_BULK_ADD_QUEUE,
@@ -19,6 +19,8 @@ type ContactJob = TeamJob<ContactJobData>;
class ContactQueueService {
public static queue = new Queue<ContactJobData>(CONTACT_BULK_ADD_QUEUE, {
connection: getRedis(),
prefix: BULL_PREFIX,
skipVersionCheck: true,
defaultJobOptions: DEFAULT_QUEUE_OPTIONS,
});
@@ -27,6 +29,8 @@ class ContactQueueService {
createWorkerHandler(processContactJob),
{
connection: getRedis(),
prefix: BULL_PREFIX,
skipVersionCheck: true,
concurrency: 20,
},
);
@@ -5,7 +5,7 @@ import { convert as htmlToText } from "html-to-text";
import { getConfigurationSetName } from "~/utils/ses-utils";
import { db } from "../db";
import { sendRawEmail } from "../aws/ses";
import { getRedis } from "../redis";
import { getRedis, BULL_PREFIX } from "../redis";
import { DEFAULT_QUEUE_OPTIONS } from "../queue/queue-constants";
import { logger } from "../logger/log";
import { createWorkerHandler, TeamJob } from "../queue/bullmq-context";
@@ -25,12 +25,14 @@ function createQueueAndWorker(region: string, quota: number, suffix: string) {
const queueName = `${region}-${suffix}`;
const queue = new Queue(queueName, { connection });
const queue = new Queue(queueName, { connection, prefix: BULL_PREFIX, skipVersionCheck: true });
// TODO: Add team context to job data when queueing
const worker = new Worker(queueName, createWorkerHandler(executeEmail), {
concurrency: quota,
connection,
prefix: BULL_PREFIX,
skipVersionCheck: true,
});
return { queue, worker };
@@ -1,4 +1,4 @@
import { getRedis } from "~/server/redis";
import { getRedis, redisKey } from "~/server/redis";
import { canonicalizePayload } from "~/server/utils/idempotency";
import { UnsendApiError } from "~/server/public-api/api-error";
import { logger } from "~/server/logger/log";
@@ -22,11 +22,11 @@ export type IdempotencyHandlerOptions<TPayload, TResult> = {
};
function resultKey(teamId: number, key: string) {
return `idem:${teamId}:${key}`;
return redisKey(`idem:${teamId}:${key}`);
}
function lockKey(teamId: number, key: string) {
return `idemlock:${teamId}:${key}`;
return redisKey(`idemlock:${teamId}:${key}`);
}
export const IdempotencyService = {
@@ -21,7 +21,7 @@ import {
updateCampaignAnalytics,
} from "./campaign-service";
import { env } from "~/env";
import { getRedis } from "../redis";
import { getRedis, BULL_PREFIX } from "../redis";
import { Queue, Worker } from "bullmq";
import {
DEFAULT_QUEUE_OPTIONS,
@@ -619,6 +619,8 @@ function getEmailData(data: SesEvent) {
export class SesHookParser {
private static sesHookQueue = new Queue(SES_WEBHOOK_QUEUE, {
connection: getRedis(),
prefix: BULL_PREFIX,
skipVersionCheck: true,
});
private static worker = new Worker(
@@ -635,6 +637,8 @@ export class SesHookParser {
},
{
connection: getRedis(),
prefix: BULL_PREFIX,
skipVersionCheck: true,
concurrency: 50,
},
);
+4 -4
View File
@@ -5,7 +5,7 @@ import { sendMail, sendTeamInviteEmail } from "~/server/mailer";
import { logger } from "~/server/logger/log";
import type { Prisma, Team, TeamInvite } from "@prisma/client";
import { UnsendApiError } from "../public-api/api-error";
import { getRedis } from "~/server/redis";
import { getRedis, redisKey } from "~/server/redis";
import { LimitReason } from "~/lib/constants/plans";
import { LimitService } from "./limit-service";
import { renderUsageLimitReachedEmail } from "../email-templates/UsageLimitReachedEmail";
@@ -17,7 +17,7 @@ const TEAM_CACHE_TTL_SECONDS = 120; // 2 minutes
export class TeamService {
private static cacheKey(teamId: number) {
return `team:${teamId}`;
return redisKey(`team:${teamId}`);
}
static async refreshTeamCache(teamId: number): Promise<Team | null> {
@@ -396,7 +396,7 @@ export class TeamService {
}
const redis = getRedis();
const cacheKey = `limit:notify:${teamId}:${reason}`;
const cacheKey = redisKey(`limit:notify:${teamId}:${reason}`);
// Atomic SET NX to prevent race conditions: only one concurrent caller
// can acquire the cooldown key. TTL = 24 hours (one notification per day).
const acquired = await redis.set(cacheKey, "1", "EX", 24 * 60 * 60, "NX");
@@ -493,7 +493,7 @@ export class TeamService {
}
const redis = getRedis();
const cacheKey = `limit:warning:${teamId}:${reason}`;
const cacheKey = redisKey(`limit:warning:${teamId}:${reason}`);
// Atomic SET NX to prevent race conditions: only one concurrent caller
// can acquire the cooldown key. TTL = 24 hours (one notification per day).
const acquired = await redis.set(cacheKey, "1", "EX", 24 * 60 * 60, "NX");
@@ -10,7 +10,7 @@ import {
type WebhookEventType,
} from "@usesend/lib/src/webhook/webhook-events";
import { db } from "../db";
import { getRedis } from "../redis";
import { getRedis, BULL_PREFIX, redisKey } from "../redis";
import {
DEFAULT_QUEUE_OPTIONS,
WEBHOOK_DISPATCH_QUEUE,
@@ -42,6 +42,8 @@ type WebhookEventInput<TType extends WebhookEventType> =
export class WebhookQueueService {
private static queue = new Queue<WebhookCallJobData>(WEBHOOK_DISPATCH_QUEUE, {
connection: getRedis(),
prefix: BULL_PREFIX,
skipVersionCheck: true,
defaultJobOptions: {
...DEFAULT_QUEUE_OPTIONS,
attempts: WEBHOOK_MAX_ATTEMPTS,
@@ -57,6 +59,8 @@ export class WebhookQueueService {
createWorkerHandler(processWebhookCall),
{
connection: getRedis(),
prefix: BULL_PREFIX,
skipVersionCheck: true,
concurrency: WEBHOOK_DISPATCH_CONCURRENCY,
},
);
@@ -446,7 +450,7 @@ async function processWebhookCall(job: WebhookCallJob) {
},
});
const lockKey = `webhook:lock:${call.webhookId}`;
const lockKey = redisKey(`webhook:lock:${call.webhookId}`);
const redis = getRedis();
const lockValue = randomUUID();