add logging (#187)

This commit is contained in:
KM Koushik
2025-07-26 20:05:34 +10:00
committed by GitHub
parent 5612d7a3eb
commit 202fbeacb6
24 changed files with 490 additions and 134 deletions

View File

@@ -8,6 +8,15 @@ import { sendRawEmail } from "../aws/ses";
import { getRedis } from "../redis";
import { DEFAULT_QUEUE_OPTIONS } from "../queue/queue-constants";
import { Prisma } from "@prisma/client";
import { logger } from "../logger/log";
import { createWorkerHandler, TeamJob } from "../queue/bullmq-context";
type QueueEmailJob = TeamJob<{
emailId: string;
timestamp: number;
unsubUrl?: string;
isBulk?: boolean;
}>;
function createQueueAndWorker(region: string, quota: number, suffix: string) {
const connection = getRedis();
@@ -16,7 +25,8 @@ function createQueueAndWorker(region: string, quota: number, suffix: string) {
const queue = new Queue(queueName, { connection });
const worker = new Worker(queueName, executeEmail, {
// TODO: Add team context to job data when queueing
const worker = new Worker(queueName, createWorkerHandler(executeEmail), {
concurrency: quota,
connection,
});
@@ -26,9 +36,9 @@ function createQueueAndWorker(region: string, quota: number, suffix: string) {
export class EmailQueueService {
private static initialized = false;
public static transactionalQueue = new Map<string, Queue>();
public static transactionalQueue = new Map<string, Queue<QueueEmailJob>>();
private static transactionalWorker = new Map<string, Worker>();
public static marketingQueue = new Map<string, Queue>();
public static marketingQueue = new Map<string, Queue<QueueEmailJob>>();
private static marketingWorker = new Map<string, Worker>();
public static initializeQueue(
@@ -36,7 +46,10 @@ export class EmailQueueService {
quota: number,
transactionalQuotaPercentage: number
) {
console.log(`[EmailQueueService]: Initializing queue for region ${region}`);
logger.info(
{ region },
`[EmailQueueService]: Initializing queue for region`
);
const transactionalQuota = Math.floor(
(quota * transactionalQuotaPercentage) / 100
@@ -44,8 +57,9 @@ export class EmailQueueService {
const marketingQuota = quota - transactionalQuota;
if (this.transactionalQueue.has(region)) {
console.log(
`[EmailQueueService]: Updating transactional quota for region ${region} to ${transactionalQuota}`
logger.info(
{ region, transactionalQuota },
`[EmailQueueService]: Updating transactional quota for region`
);
const transactionalWorker = this.transactionalWorker.get(region);
if (transactionalWorker) {
@@ -53,8 +67,9 @@ export class EmailQueueService {
transactionalQuota !== 0 ? transactionalQuota : 1;
}
} else {
console.log(
`[EmailQueueService]: Creating transactional queue for region ${region} with quota ${transactionalQuota}`
logger.info(
{ region, transactionalQuota },
`[EmailQueueService]: Creating transactional queue for region`
);
const { queue: transactionalQueue, worker: transactionalWorker } =
createQueueAndWorker(
@@ -67,16 +82,18 @@ export class EmailQueueService {
}
if (this.marketingQueue.has(region)) {
console.log(
`[EmailQueueService]: Updating marketing quota for region ${region} to ${marketingQuota}`
logger.info(
{ region, marketingQuota },
`[EmailQueueService]: Updating marketing quota for region`
);
const marketingWorker = this.marketingWorker.get(region);
if (marketingWorker) {
marketingWorker.concurrency = marketingQuota !== 0 ? marketingQuota : 1;
}
} else {
console.log(
`[EmailQueueService]: Creating marketing queue for region ${region} with quota ${marketingQuota}`
logger.info(
{ region, marketingQuota },
`[EmailQueueService]: Creating marketing queue for region`
);
const { queue: marketingQueue, worker: marketingWorker } =
createQueueAndWorker(
@@ -91,6 +108,7 @@ export class EmailQueueService {
public static async queueEmail(
emailId: string,
teamId: number,
region: string,
transactional: boolean,
unsubUrl?: string,
@@ -108,7 +126,7 @@ export class EmailQueueService {
}
queue.add(
emailId,
{ emailId, timestamp: Date.now(), unsubUrl, isBulk },
{ emailId, timestamp: Date.now(), unsubUrl, isBulk, teamId },
{ jobId: emailId, delay, ...DEFAULT_QUEUE_OPTIONS }
);
}
@@ -123,6 +141,7 @@ export class EmailQueueService {
public static async queueBulk(
jobs: {
emailId: string;
teamId: number;
region: string;
transactional: boolean;
unsubUrl?: string;
@@ -131,7 +150,7 @@ export class EmailQueueService {
}[]
): Promise<void> {
if (jobs.length === 0) {
console.log("[EmailQueueService]: No jobs provided for bulk queue.");
logger.info("[EmailQueueService]: No jobs provided for bulk queue.");
return;
}
@@ -139,8 +158,9 @@ export class EmailQueueService {
await this.init();
}
console.log(
`[EmailQueueService]: Starting bulk queue for ${jobs.length} jobs.`
logger.info(
{ count: jobs.length },
`[EmailQueueService]: Starting bulk queue for jobs.`
);
// Group jobs by region and type
@@ -176,8 +196,9 @@ export class EmailQueueService {
for (const groupKey in groupedJobs) {
const group = groupedJobs[groupKey];
if (!group || !group.queue) {
console.error(
`[EmailQueueService]: Queue not found for group ${groupKey} during bulk add. Skipping ${group?.jobDetails?.length ?? 0} jobs.`
logger.error(
{ groupKey, count: group?.jobDetails?.length ?? 0 },
`[EmailQueueService]: Queue not found for group during bulk add. Skipping jobs.`
);
// Optionally: handle these skipped jobs (e.g., mark corresponding emails as failed)
continue;
@@ -192,6 +213,7 @@ export class EmailQueueService {
timestamp: job.timestamp ?? Date.now(),
unsubUrl: job.unsubUrl,
isBulk,
teamId: job.teamId,
},
opts: {
jobId: job.emailId, // Use emailId as jobId
@@ -200,14 +222,15 @@ export class EmailQueueService {
},
}));
console.log(
`[EmailQueueService]: Adding ${bulkData.length} jobs to queue ${queue.name}`
logger.info(
{ count: bulkData.length, queue: queue.name },
`[EmailQueueService]: Adding jobs to queue`
);
bulkAddPromises.push(
queue.addBulk(bulkData).catch((error) => {
console.error(
`[EmailQueueService]: Failed to add bulk jobs to queue ${queue.name}:`,
error
logger.error(
{ err: error, queue: queue.name },
`[EmailQueueService]: Failed to add bulk jobs to queue`
);
// Optionally: handle bulk add failure (e.g., mark corresponding emails as failed)
})
@@ -215,7 +238,7 @@ export class EmailQueueService {
}
await Promise.allSettled(bulkAddPromises);
console.log(
logger.info(
"[EmailQueueService]: Finished processing bulk queue requests."
);
}
@@ -278,16 +301,10 @@ export class EmailQueueService {
}
}
async function executeEmail(
job: Job<{
emailId: string;
timestamp: number;
unsubUrl?: string;
isBulk?: boolean;
}>
) {
console.log(
`[EmailQueueService]: Executing email job ${job.data.emailId}, time elapsed: ${Date.now() - job.data.timestamp}ms`
async function executeEmail(job: QueueEmailJob) {
logger.info(
{ emailId: job.data.emailId, elapsed: Date.now() - job.data.timestamp },
`[EmailQueueService]: Executing email job`
);
const email = await db.email.findUnique({
@@ -301,7 +318,10 @@ async function executeEmail(
: null;
if (!email) {
console.log(`[EmailQueueService]: Email not found, skipping`);
logger.info(
{ emailId: job.data.emailId },
`[EmailQueueService]: Email not found, skipping`
);
return;
}
@@ -309,7 +329,7 @@ async function executeEmail(
? JSON.parse(email.attachments)
: [];
console.log(`Domain: ${JSON.stringify(domain)}`);
logger.info({ domain }, `Domain`);
const configurationSetName = await getConfigurationSetName(
domain?.clickTracking ?? false,
@@ -321,7 +341,7 @@ async function executeEmail(
return;
}
console.log(`[EmailQueueService]: Sending email ${email.id}`);
logger.info({ emailId: email.id }, `[EmailQueueService]: Sending email`);
const unsubUrl = job.data.unsubUrl;
const isBulk = job.data.isBulk;