Added transaction to creating an SES setting & upgraded to Prisma ORM 6 (#109)
* Added transaction to creating SES setting & upgraded Prisma to Prisma ORM 6 * Keep logging of the queue status in --------- Co-authored-by: Kumarion <121711454+Kumarion@users.noreply.github.com>
This commit is contained in:
@@ -46,10 +46,6 @@ export const env = createEnv({
|
||||
ADMIN_EMAIL: z.string().optional(),
|
||||
DISCORD_WEBHOOK_URL: z.string().optional(),
|
||||
REDIS_URL: z.string(),
|
||||
ENABLE_PRISMA_CLIENT: z
|
||||
.string()
|
||||
.default("false")
|
||||
.transform((str) => str === "true"), // Converts string "true" to boolean true
|
||||
S3_COMPATIBLE_ACCESS_KEY: z.string().optional(),
|
||||
S3_COMPATIBLE_SECRET_KEY: z.string().optional(),
|
||||
S3_COMPATIBLE_API_URL: z.string().optional(),
|
||||
@@ -93,7 +89,6 @@ export const env = createEnv({
|
||||
DISCORD_WEBHOOK_URL: process.env.DISCORD_WEBHOOK_URL,
|
||||
REDIS_URL: process.env.REDIS_URL,
|
||||
FROM_EMAIL: process.env.FROM_EMAIL,
|
||||
ENABLE_PRISMA_CLIENT: process.env.ENABLE_PRISMA_CLIENT, // Add this line
|
||||
S3_COMPATIBLE_ACCESS_KEY: process.env.S3_COMPATIBLE_ACCESS_KEY,
|
||||
S3_COMPATIBLE_SECRET_KEY: process.env.S3_COMPATIBLE_SECRET_KEY,
|
||||
S3_COMPATIBLE_API_URL: process.env.S3_COMPATIBLE_API_URL,
|
||||
|
@@ -2,6 +2,7 @@ import {
|
||||
SNSClient,
|
||||
CreateTopicCommand,
|
||||
SubscribeCommand,
|
||||
DeleteTopicCommand,
|
||||
} from "@aws-sdk/client-sns";
|
||||
import { env } from "~/env";
|
||||
|
||||
@@ -25,6 +26,11 @@ export async function createTopic(topic: string, region: string) {
|
||||
return data.TopicArn;
|
||||
}
|
||||
|
||||
export async function deleteTopic(topicArn: string, region: string) {
|
||||
const client = getSnsClient(region);
|
||||
await client.send(new DeleteTopicCommand({ TopicArn: topicArn }));
|
||||
}
|
||||
|
||||
export async function subscribeEndpoint(
|
||||
topicArn: string,
|
||||
endpointUrl: string,
|
||||
|
@@ -1,5 +1,4 @@
|
||||
import { PrismaClient } from "@prisma/client";
|
||||
import { withOptimize } from "@prisma/extension-optimize";
|
||||
import { env } from "~/env";
|
||||
|
||||
const createPrismaClient = () => {
|
||||
@@ -7,9 +6,6 @@ const createPrismaClient = () => {
|
||||
log:
|
||||
env.NODE_ENV === "development" ? ["query", "error", "warn"] : ["error"],
|
||||
});
|
||||
if (env.ENABLE_PRISMA_CLIENT) {
|
||||
return client.$extends(withOptimize());
|
||||
}
|
||||
|
||||
return client;
|
||||
};
|
||||
|
@@ -78,31 +78,64 @@ export class SesSettingsService {
|
||||
}
|
||||
|
||||
const idPrefix = smallNanoid(10);
|
||||
let topicArn: string | undefined;
|
||||
|
||||
const setting = await db.sesSetting.create({
|
||||
data: {
|
||||
try {
|
||||
const topicName = `${idPrefix}-${region}-unsend`;
|
||||
topicArn = await sns.createTopic(topicName, region);
|
||||
if (!topicArn) {
|
||||
throw new Error("Failed to create SNS topic");
|
||||
}
|
||||
|
||||
const setting = await db.$transaction(async (tx) => {
|
||||
const setting = await tx.sesSetting.create({
|
||||
data: {
|
||||
region,
|
||||
callbackUrl: `${parsedUrl}/api/ses_callback`,
|
||||
topic: topicName,
|
||||
topicArn,
|
||||
sesEmailRateLimit: sendingRateLimit,
|
||||
transactionalQuota,
|
||||
idPrefix,
|
||||
},
|
||||
});
|
||||
|
||||
await sns.subscribeEndpoint(
|
||||
topicArn!,
|
||||
`${setting.callbackUrl}`,
|
||||
setting.region
|
||||
);
|
||||
|
||||
return setting;
|
||||
});
|
||||
|
||||
if (!setting) {
|
||||
throw new Error("Failed to create setting");
|
||||
}
|
||||
|
||||
await registerConfigurationSet(setting);
|
||||
|
||||
EmailQueueService.initializeQueue(
|
||||
region,
|
||||
callbackUrl: `${parsedUrl}/api/ses_callback`,
|
||||
topic: `${idPrefix}-${region}-unsend`,
|
||||
sesEmailRateLimit: sendingRateLimit,
|
||||
transactionalQuota,
|
||||
idPrefix,
|
||||
},
|
||||
});
|
||||
setting.sesEmailRateLimit,
|
||||
setting.transactionalQuota
|
||||
);
|
||||
console.log(
|
||||
EmailQueueService.transactionalQueue,
|
||||
EmailQueueService.marketingQueue
|
||||
);
|
||||
|
||||
await createSettingInAws(setting);
|
||||
|
||||
EmailQueueService.initializeQueue(
|
||||
region,
|
||||
setting.sesEmailRateLimit,
|
||||
setting.transactionalQuota
|
||||
);
|
||||
console.log(
|
||||
EmailQueueService.transactionalQueue,
|
||||
EmailQueueService.marketingQueue
|
||||
);
|
||||
|
||||
await this.invalidateCache();
|
||||
await this.invalidateCache();
|
||||
} catch (error) {
|
||||
if (topicArn) {
|
||||
try {
|
||||
await sns.deleteTopic(topicArn, region);
|
||||
} catch (deleteError) {
|
||||
console.error('Failed to delete SNS topic after error:', deleteError);
|
||||
}
|
||||
}
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
public static async updateSesSetting({
|
||||
@@ -163,41 +196,6 @@ export class SesSettingsService {
|
||||
}
|
||||
}
|
||||
|
||||
async function createSettingInAws(setting: SesSetting) {
|
||||
await registerTopicInAws(setting).then(registerConfigurationSet);
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new topic in AWS and subscribes the callback URL to it
|
||||
*/
|
||||
async function registerTopicInAws(setting: SesSetting) {
|
||||
const topicArn = await sns.createTopic(setting.topic, setting.region);
|
||||
|
||||
if (!topicArn) {
|
||||
throw new Error("Failed to create SNS topic");
|
||||
}
|
||||
|
||||
const _setting = await db.sesSetting.update({
|
||||
where: {
|
||||
id: setting.id,
|
||||
},
|
||||
data: {
|
||||
topicArn,
|
||||
},
|
||||
});
|
||||
|
||||
// Invalidate the cache to update the topicArn list
|
||||
SesSettingsService.invalidateCache();
|
||||
|
||||
await sns.subscribeEndpoint(
|
||||
topicArn,
|
||||
`${setting.callbackUrl}`,
|
||||
setting.region
|
||||
);
|
||||
|
||||
return _setting;
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new configuration set in AWS for given region
|
||||
* Totally consist of 4 configs.
|
||||
|
Reference in New Issue
Block a user