fix: prevent duplicate notification emails via atomic Redis SET NX (#346)
The warning and limit-reached notification emails were being sent multiple times because of a race condition: concurrent workers could both read the Redis cooldown key as empty (GET), both send emails, then both set the key (SETEX). Replaced the non-atomic GET + SETEX pattern with a single atomic SET ... NX EX that claims the cooldown slot before any emails are sent. Also increased cooldown from 6 hours to 24 hours so each notification is sent at most once per day. https://claude.ai/code/session_01VBYXi5e64Vtq1cXWsfTYTw Co-authored-by: Claude <noreply@anthropic.com>
This commit is contained in:
@@ -356,7 +356,7 @@ export class TeamService {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Notify all team users that email limit has been reached, at most once per 6 hours.
|
* Notify all team users that email limit has been reached, at most once per day.
|
||||||
*/
|
*/
|
||||||
static async maybeNotifyEmailLimitReached(
|
static async maybeNotifyEmailLimitReached(
|
||||||
teamId: number,
|
teamId: number,
|
||||||
@@ -390,13 +390,15 @@ export class TeamService {
|
|||||||
|
|
||||||
const redis = getRedis();
|
const redis = getRedis();
|
||||||
const cacheKey = `limit:notify:${teamId}:${reason}`;
|
const cacheKey = `limit:notify:${teamId}:${reason}`;
|
||||||
const alreadySent = await redis.get(cacheKey);
|
// Atomic SET NX to prevent race conditions: only one concurrent caller
|
||||||
if (alreadySent) {
|
// can acquire the cooldown key. TTL = 24 hours (one notification per day).
|
||||||
|
const acquired = await redis.set(cacheKey, "1", "EX", 24 * 60 * 60, "NX");
|
||||||
|
if (acquired !== "OK") {
|
||||||
logger.info(
|
logger.info(
|
||||||
{ teamId, cacheKey },
|
{ teamId, cacheKey },
|
||||||
"[TeamService]: Skipping notify — cooldown active",
|
"[TeamService]: Skipping notify — cooldown active",
|
||||||
);
|
);
|
||||||
return; // within cooldown window
|
return; // another request already claimed this window
|
||||||
}
|
}
|
||||||
|
|
||||||
const team = await TeamService.getTeamCached(teamId);
|
const team = await TeamService.getTeamCached(teamId);
|
||||||
@@ -447,17 +449,11 @@ export class TeamService {
|
|||||||
throw err;
|
throw err;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Set cooldown for 6 hours
|
|
||||||
await redis.setex(cacheKey, 6 * 60 * 60, "1");
|
|
||||||
logger.info(
|
|
||||||
{ teamId, cacheKey },
|
|
||||||
"[TeamService]: Set limit reached notification cooldown",
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Notify all team users that they're nearing their email limit.
|
* Notify all team users that they're nearing their email limit.
|
||||||
* Rate limited via Redis to avoid spamming; sends at most once per 6 hours per reason.
|
* Rate limited via Redis to avoid spamming; sends at most once per day per reason.
|
||||||
*/
|
*/
|
||||||
static async sendWarningEmail(
|
static async sendWarningEmail(
|
||||||
teamId: number,
|
teamId: number,
|
||||||
@@ -492,13 +488,15 @@ export class TeamService {
|
|||||||
|
|
||||||
const redis = getRedis();
|
const redis = getRedis();
|
||||||
const cacheKey = `limit:warning:${teamId}:${reason}`;
|
const cacheKey = `limit:warning:${teamId}:${reason}`;
|
||||||
const alreadySent = await redis.get(cacheKey);
|
// Atomic SET NX to prevent race conditions: only one concurrent caller
|
||||||
if (alreadySent) {
|
// can acquire the cooldown key. TTL = 24 hours (one notification per day).
|
||||||
|
const acquired = await redis.set(cacheKey, "1", "EX", 24 * 60 * 60, "NX");
|
||||||
|
if (acquired !== "OK") {
|
||||||
logger.info(
|
logger.info(
|
||||||
{ teamId, cacheKey },
|
{ teamId, cacheKey },
|
||||||
"[TeamService]: Skipping warning — cooldown active",
|
"[TeamService]: Skipping warning — cooldown active",
|
||||||
);
|
);
|
||||||
return; // within cooldown window
|
return; // another request already claimed this window
|
||||||
}
|
}
|
||||||
|
|
||||||
const team = await TeamService.getTeamCached(teamId);
|
const team = await TeamService.getTeamCached(teamId);
|
||||||
@@ -558,12 +556,6 @@ export class TeamService {
|
|||||||
throw err;
|
throw err;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Set cooldown for 6 hours
|
|
||||||
await redis.setex(cacheKey, 6 * 60 * 60, "1");
|
|
||||||
logger.info(
|
|
||||||
{ teamId, cacheKey },
|
|
||||||
"[TeamService]: Set warning notification cooldown",
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user