Files
spoon/packages/backend/convex/threads.ts
T
2026-06-22 10:37:26 -04:00

459 lines
13 KiB
TypeScript

import { ConvexError, v } from 'convex/values';
import type { Doc } from './_generated/dataModel';
import { internal } from './_generated/api';
import {
internalMutation,
internalQuery,
mutation,
query,
} from './_generated/server';
import {
getOwnedSpoon,
getRequiredUserId,
optionalText,
requireText,
} from './model';
/**
 * Validator for the origin of a thread: exactly one of the five literal
 * strings below. In this file it validates the optional `source` filter of
 * `listMine`; `createUserThread` stores `'user_request'` and
 * `createMaintenanceThread` stores `'upstream_update'` or `'merge_conflict'`.
 * NOTE(review): presumably mirrors the `threads.source` schema field — confirm
 * against the schema definition.
 */
const threadSource = v.union(
v.literal('user_request'),
v.literal('upstream_update'),
v.literal('merge_conflict'),
v.literal('manual_review'),
v.literal('system'),
);
/**
 * Validator for a thread's lifecycle status: exactly one of the literal
 * strings below. Used for the optional `status` filter of `listMine` and the
 * optional `status` argument of `patchThreadInternal`.
 *
 * Elsewhere in this file `'resolved'`, `'ignored'`, `'failed'` and
 * `'cancelled'` are treated as closed: they are excluded when searching for an
 * open maintenance thread, and `patchThreadInternal` stamps `resolvedAt` when
 * one of them is set.
 */
const threadStatus = v.union(
v.literal('open'),
v.literal('queued'),
v.literal('running'),
v.literal('waiting_for_user'),
v.literal('changes_ready'),
v.literal('draft_pr_opened'),
v.literal('resolved'),
v.literal('ignored'),
v.literal('failed'),
v.literal('cancelled'),
);
/**
 * Validator for the outcome recorded on a maintenance thread: exactly one of
 * the literal strings below. Used for the optional `maintenanceOutcome`
 * argument of `patchThreadInternal`; `createMaintenanceThread` initialises new
 * threads with `'unknown'`.
 */
const maintenanceOutcome = v.union(
v.literal('auto_synced'),
v.literal('sync_recommended'),
v.literal('ignored'),
v.literal('review_pr_recommended'),
v.literal('manual_review_required'),
v.literal('conflict_resolution_required'),
v.literal('failed'),
v.literal('unknown'),
);
/**
 * Validator for the author role of a thread message: exactly one of the four
 * literal strings below. Used for the `role` argument of
 * `appendMessageInternal`.
 */
const messageRole = v.union(
v.literal('user'),
v.literal('assistant'),
v.literal('system'),
v.literal('tool'),
);
/**
 * Validator for the delivery status of a thread message: exactly one of the
 * four literal strings below. Used for the optional `status` argument of
 * `appendMessageInternal`, which falls back to `'completed'` when omitted.
 */
const messageStatus = v.union(
v.literal('queued'),
v.literal('streaming'),
v.literal('completed'),
v.literal('failed'),
);
/**
 * Derives a short thread title from the first line of a prompt.
 *
 * - Surrounding whitespace is ignored and both `\n` and `\r\n` line endings
 *   are recognised, so the title never carries a stray `\r` or trailing blanks.
 * - An empty or whitespace-only prompt yields the fallback title `'Thread'`.
 * - Titles longer than 80 characters are cut to 77 characters plus `...`.
 *
 * @param prompt Raw prompt text.
 * @returns A non-empty title of at most 80 characters.
 */
const titleFromPrompt = (prompt: string): string => {
  // `split` always returns at least one element, so `??` alone can never reach
  // the fallback; `||` also covers the empty-string case.
  const firstLine = prompt.trim().split(/\r?\n/)[0]?.trimEnd() || 'Thread';
  return firstLine.length > 80 ? `${firstLine.slice(0, 77)}...` : firstLine;
};
/**
 * Maps a stored thread document to the shape returned to clients.
 * Currently an identity mapping: the document is returned unchanged.
 */
const publicThread = (thread: Doc<'threads'>) => {
  return thread;
};
/**
 * Lists threads owned by the signed-in user in descending `by_owner` index
 * order, optionally narrowed by status, source and spoon.
 *
 * Filters are applied inside the database query, *before* the limit, so the
 * result holds up to `limit` matching threads. (Filtering the page after
 * `take()` could return fewer rows — even none — while older matching threads
 * still exist.)
 *
 * Args:
 * - `status` / `source`: a specific value, or `'all'` / omitted for no filter.
 * - `spoonId`: only threads attached to this spoon.
 * - `limit`: maximum number of threads to return (default 50).
 *
 * @returns The matching thread documents.
 */
export const listMine = query({
  args: {
    status: v.optional(v.union(threadStatus, v.literal('all'))),
    source: v.optional(v.union(threadSource, v.literal('all'))),
    spoonId: v.optional(v.id('spoons')),
    limit: v.optional(v.number()),
  },
  handler: async (ctx, args) => {
    const ownerId = await getRequiredUserId(ctx);

    // `'all'` and an omitted value both mean "do not filter on this field".
    const status = args.status === 'all' ? undefined : args.status;
    const source = args.source === 'all' ? undefined : args.source;
    const { spoonId } = args;

    let threads = ctx.db
      .query('threads')
      .withIndex('by_owner', (q) => q.eq('ownerId', ownerId))
      .order('desc');
    // Chained `.filter()` calls are combined with AND.
    if (status !== undefined) {
      threads = threads.filter((q) => q.eq(q.field('status'), status));
    }
    if (source !== undefined) {
      threads = threads.filter((q) => q.eq(q.field('source'), source));
    }
    if (spoonId !== undefined) {
      threads = threads.filter((q) => q.eq(q.field('spoonId'), spoonId));
    }

    return await threads.take(args.limit ?? 50);
  },
});
/**
 * Lists threads attached to one spoon in descending `by_spoon` index order.
 *
 * `getOwnedSpoon` is awaited before any thread is read; it presumably rejects
 * callers who do not own the spoon — confirm in `./model`.
 *
 * Args:
 * - `spoonId`: the spoon whose threads are listed.
 * - `limit`: maximum number of threads to return (default 25).
 */
export const listForSpoon = query({
  args: { spoonId: v.id('spoons'), limit: v.optional(v.number()) },
  handler: async (ctx, args) => {
    const callerId = await getRequiredUserId(ctx);
    await getOwnedSpoon(ctx, args.spoonId, callerId);

    const pageSize = args.limit ?? 25;
    const spoonThreads = ctx.db
      .query('threads')
      .withIndex('by_spoon', (q) => q.eq('spoonId', args.spoonId))
      .order('desc');
    return await spoonThreads.take(pageSize);
  },
});
/**
 * Loads one thread together with its spoon and latest agent job.
 *
 * Throws `ConvexError('Thread not found.')` both when the thread does not
 * exist and when it belongs to another user. The linked spoon and job are
 * returned only if they are owned by the caller as well; otherwise the
 * corresponding field is `null`.
 *
 * @returns `{ thread, spoon, latestJob }`.
 */
export const get = query({
  args: { threadId: v.id('threads') },
  handler: async (ctx, args) => {
    const callerId = await getRequiredUserId(ctx);
    const thread = await ctx.db.get(args.threadId);
    if (thread?.ownerId !== callerId) {
      throw new ConvexError('Thread not found.');
    }

    const linkedSpoon = thread.spoonId
      ? await ctx.db.get(thread.spoonId)
      : null;
    const linkedJob = thread.latestAgentJobId
      ? await ctx.db.get(thread.latestAgentJobId)
      : null;

    // Related documents are exposed only when the caller owns them too.
    const spoon = linkedSpoon?.ownerId === callerId ? linkedSpoon : null;
    const latestJob = linkedJob?.ownerId === callerId ? linkedJob : null;

    return { thread: publicThread(thread), spoon, latestJob };
  },
});
/**
 * Lists the messages of a thread in ascending `by_thread` index order.
 *
 * Throws `ConvexError('Thread not found.')` when the thread is missing or is
 * owned by another user.
 *
 * Args:
 * - `threadId`: the thread whose messages are listed.
 * - `limit`: maximum number of messages to return (default 200).
 */
export const listMessages = query({
  args: { threadId: v.id('threads'), limit: v.optional(v.number()) },
  handler: async (ctx, args) => {
    const callerId = await getRequiredUserId(ctx);
    const parent = await ctx.db.get(args.threadId);
    if (parent?.ownerId !== callerId) {
      throw new ConvexError('Thread not found.');
    }

    const pageSize = args.limit ?? 200;
    const messages = ctx.db
      .query('threadMessages')
      .withIndex('by_thread', (q) => q.eq('threadId', args.threadId))
      .order('asc');
    return await messages.take(pageSize);
  },
});
/**
 * Creates a user-requested thread on a spoon and queues an agent job for it.
 *
 * Steps, in order:
 * 1. Resolve the caller and await `getOwnedSpoon` (presumably rejects callers
 *    who do not own the spoon — confirm in `./model`).
 * 2. Validate the prompt with `requireText`.
 * 3. Insert the thread (`source: 'user_request'`, `status: 'open'`,
 *    `priority: 'normal'`). The title is the explicit `title` when
 *    `optionalText` yields one, otherwise it is derived from the prompt.
 * 4. Insert the prompt as the first `user` message, already `completed`.
 * 5. Schedule `internal.agentJobs.createForThreadInternal` with zero delay,
 *    forwarding the branch, env-file and AI-provider options untouched.
 *
 * @returns The id of the new thread.
 */
export const createUserThread = mutation({
  args: {
    spoonId: v.id('spoons'),
    title: v.optional(v.string()),
    prompt: v.string(),
    baseBranch: v.optional(v.string()),
    requestedBranchName: v.optional(v.string()),
    materializeEnvFile: v.optional(v.boolean()),
    envFilePath: v.optional(v.string()),
    aiProviderProfileId: v.optional(v.id('aiProviderProfiles')),
  },
  handler: async (ctx, args) => {
    const {
      spoonId,
      baseBranch,
      requestedBranchName,
      materializeEnvFile,
      envFilePath,
      aiProviderProfileId,
    } = args;

    const ownerId = await getRequiredUserId(ctx);
    await getOwnedSpoon(ctx, spoonId, ownerId);
    const prompt = requireText(args.prompt, 'Prompt');

    // Thread and first message share the same creation timestamp.
    const now = Date.now();
    const timestamps = { createdAt: now, updatedAt: now };

    const threadId = await ctx.db.insert('threads', {
      ownerId,
      spoonId,
      title: optionalText(args.title) ?? titleFromPrompt(prompt),
      summary: prompt,
      source: 'user_request',
      status: 'open',
      priority: 'normal',
      ...timestamps,
    });

    await ctx.db.insert('threadMessages', {
      threadId,
      ownerId,
      spoonId,
      role: 'user',
      content: prompt,
      status: 'completed',
      ...timestamps,
    });

    await ctx.scheduler.runAfter(
      0,
      internal.agentJobs.createForThreadInternal,
      {
        threadId,
        ownerId,
        jobType: 'user_change',
        baseBranch,
        requestedBranchName,
        materializeEnvFile,
        envFilePath,
        aiProviderProfileId,
      },
    );

    return threadId;
  },
});
/**
 * Appends a message written by the signed-in user to one of their threads.
 *
 * Throws `ConvexError('Thread not found.')` when the thread is missing or is
 * owned by another user. The content is passed through `requireText`
 * (presumably rejects blank text — confirm in `./model`) and the message is
 * stored with role `user` and status `queued`.
 *
 * @returns The id of the inserted message.
 */
export const appendUserMessage = mutation({
  args: { threadId: v.id('threads'), content: v.string() },
  handler: async (ctx, args) => {
    const ownerId = await getRequiredUserId(ctx);
    const thread = await ctx.db.get(args.threadId);
    if (thread?.ownerId !== ownerId) {
      throw new ConvexError('Thread not found.');
    }

    const now = Date.now();
    const content = requireText(args.content, 'Message');
    return await ctx.db.insert('threadMessages', {
      threadId: args.threadId,
      ownerId,
      spoonId: thread.spoonId,
      role: 'user',
      content,
      status: 'queued',
      createdAt: now,
      updatedAt: now,
    });
  },
});
/**
 * Marks one of the caller's threads as `cancelled`, stamping both `updatedAt`
 * and `resolvedAt` with the current time.
 *
 * Throws `ConvexError('Thread not found.')` when the thread is missing or is
 * owned by another user. Only the thread document is patched here.
 *
 * @returns `{ success: true }`.
 */
export const cancel = mutation({
  args: { threadId: v.id('threads') },
  handler: async (ctx, args) => {
    const callerId = await getRequiredUserId(ctx);
    const target = await ctx.db.get(args.threadId);
    if (target?.ownerId !== callerId) {
      throw new ConvexError('Thread not found.');
    }

    await ctx.db.patch(args.threadId, {
      status: 'cancelled',
      updatedAt: Date.now(),
      resolvedAt: Date.now(),
    });

    return { success: true };
  },
});
/**
 * Marks one of the caller's threads as `resolved`, stamping both `updatedAt`
 * and `resolvedAt` with the current time.
 *
 * Throws `ConvexError('Thread not found.')` when the thread is missing or is
 * owned by another user.
 *
 * @returns `{ success: true }`.
 */
export const markResolved = mutation({
  args: { threadId: v.id('threads') },
  handler: async (ctx, args) => {
    const callerId = await getRequiredUserId(ctx);
    const target = await ctx.db.get(args.threadId);
    if (target?.ownerId !== callerId) {
      throw new ConvexError('Thread not found.');
    }

    await ctx.db.patch(args.threadId, {
      status: 'resolved',
      updatedAt: Date.now(),
      resolvedAt: Date.now(),
    });

    return { success: true };
  },
});
/**
 * Finds the first still-open thread, in descending `by_spoon` index order,
 * for a given spoon, owner and upstream target revision.
 *
 * A thread counts as open unless its status is `resolved`, `ignored`,
 * `failed` or `cancelled`.
 *
 * The match is expressed as a query filter and fetched with `.first()`, so the
 * scan stops at the first hit instead of collecting every thread of the spoon
 * into memory.
 *
 * @returns The matching thread document, or `null` when there is none.
 */
export const findOpenMaintenanceThread = internalQuery({
  args: {
    spoonId: v.id('spoons'),
    ownerId: v.id('users'),
    upstreamTo: v.string(),
  },
  handler: async (ctx, { spoonId, ownerId, upstreamTo }) => {
    const closedStatuses = [
      'resolved',
      'ignored',
      'failed',
      'cancelled',
    ] as const;

    return await ctx.db
      .query('threads')
      .withIndex('by_spoon', (q) => q.eq('spoonId', spoonId))
      .order('desc')
      .filter((q) =>
        q.and(
          q.eq(q.field('ownerId'), ownerId),
          q.eq(q.field('upstreamTo'), upstreamTo),
          ...closedStatuses.map((status) => q.neq(q.field('status'), status)),
        ),
      )
      .first();
  },
});
/**
 * Opens a maintenance thread for an upstream change, or reuses the open one.
 *
 * If a still-open thread already exists for the same spoon, owner and
 * `upstreamTo` (open = status is not `resolved`, `ignored`, `failed` or
 * `cancelled`), the summary is appended to it as a completed `system` message,
 * its `updatedAt` is bumped, and its id is returned. No new agent job is
 * scheduled in that case.
 *
 * Otherwise a new `open` thread is inserted (`priority` is `high` for
 * `merge_conflict`, `normal` otherwise; `maintenanceOutcome` starts as
 * `unknown`), the summary becomes its first `system` message, and
 * `internal.agentJobs.createForThreadInternal` is scheduled with zero delay
 * using `jobType`.
 *
 * @returns The id of the reused or newly created thread.
 */
export const createMaintenanceThread = internalMutation({
  args: {
    spoonId: v.id('spoons'),
    ownerId: v.id('users'),
    source: v.union(v.literal('upstream_update'), v.literal('merge_conflict')),
    title: v.string(),
    summary: v.string(),
    upstreamFrom: v.optional(v.string()),
    upstreamTo: v.string(),
    forkHeadAtCreation: v.optional(v.string()),
    mergeBaseAtCreation: v.optional(v.string()),
    relatedSyncRunId: v.optional(v.id('syncRuns')),
    jobType: v.union(
      v.literal('maintenance_review'),
      v.literal('conflict_resolution'),
    ),
  },
  handler: async (ctx, args) => {
    const now = Date.now();
    const closedStatuses = [
      'resolved',
      'ignored',
      'failed',
      'cancelled',
    ] as const;

    // Filter in the query and stop at the first hit rather than collecting
    // every thread of the spoon just to pick one.
    const existing = await ctx.db
      .query('threads')
      .withIndex('by_spoon', (q) => q.eq('spoonId', args.spoonId))
      .order('desc')
      .filter((q) =>
        q.and(
          q.eq(q.field('ownerId'), args.ownerId),
          q.eq(q.field('upstreamTo'), args.upstreamTo),
          ...closedStatuses.map((status) => q.neq(q.field('status'), status)),
        ),
      )
      .first();

    if (existing) {
      await ctx.db.insert('threadMessages', {
        threadId: existing._id,
        ownerId: args.ownerId,
        spoonId: args.spoonId,
        role: 'system',
        content: args.summary,
        status: 'completed',
        createdAt: now,
        updatedAt: now,
      });

      // `db.patch` removes fields whose value is `undefined`, so only touch
      // `relatedSyncRunId` when the caller supplied one; otherwise the link
      // recorded earlier would be erased.
      const threadPatch: Partial<Doc<'threads'>> = { updatedAt: now };
      if (args.relatedSyncRunId !== undefined) {
        threadPatch.relatedSyncRunId = args.relatedSyncRunId;
      }
      await ctx.db.patch(existing._id, threadPatch);
      return existing._id;
    }

    const threadId = await ctx.db.insert('threads', {
      ownerId: args.ownerId,
      spoonId: args.spoonId,
      title: args.title,
      summary: args.summary,
      source: args.source,
      status: 'open',
      priority: args.source === 'merge_conflict' ? 'high' : 'normal',
      upstreamFrom: args.upstreamFrom,
      upstreamTo: args.upstreamTo,
      forkHeadAtCreation: args.forkHeadAtCreation,
      mergeBaseAtCreation: args.mergeBaseAtCreation,
      relatedSyncRunId: args.relatedSyncRunId,
      maintenanceOutcome: 'unknown',
      createdAt: now,
      updatedAt: now,
    });

    await ctx.db.insert('threadMessages', {
      threadId,
      ownerId: args.ownerId,
      spoonId: args.spoonId,
      role: 'system',
      content: args.summary,
      status: 'completed',
      createdAt: now,
      updatedAt: now,
    });

    await ctx.scheduler.runAfter(
      0,
      internal.agentJobs.createForThreadInternal,
      {
        threadId,
        ownerId: args.ownerId,
        jobType: args.jobType,
      },
    );

    return threadId;
  },
});
/**
 * Applies a partial update to a thread on behalf of internal callers.
 *
 * Only arguments that are not `undefined` are written. `updatedAt` is always
 * refreshed, and `resolvedAt` is stamped when the new status is `resolved`,
 * `ignored`, `failed` or `cancelled`. `summary` and `ignoredReason` pass
 * through `optionalText` first.
 * NOTE(review): if `optionalText` yields `undefined` for blank text, the patch
 * carries an `undefined` value for that field — confirm this is intended.
 *
 * Throws `ConvexError('Thread not found.')` when the thread does not exist.
 * No ownership check is made here.
 *
 * @returns `{ success: true }`.
 */
export const patchThreadInternal = internalMutation({
  args: {
    threadId: v.id('threads'),
    status: v.optional(threadStatus),
    summary: v.optional(v.string()),
    maintenanceOutcome: v.optional(maintenanceOutcome),
    ignoredCommitShas: v.optional(v.array(v.string())),
    ignoredReason: v.optional(v.string()),
    latestAgentJobId: v.optional(v.id('agentJobs')),
  },
  handler: async (ctx, args) => {
    const current = await ctx.db.get(args.threadId);
    if (!current) {
      throw new ConvexError('Thread not found.');
    }

    const closingStatuses: string[] = [
      'resolved',
      'ignored',
      'failed',
      'cancelled',
    ];
    const closesThread =
      args.status !== undefined && closingStatuses.includes(args.status);

    // Each conditional spread contributes its key only when the matching
    // argument was supplied.
    const changes: Partial<Doc<'threads'>> = {
      updatedAt: Date.now(),
      ...(args.status !== undefined ? { status: args.status } : {}),
      ...(args.summary !== undefined
        ? { summary: optionalText(args.summary) }
        : {}),
      ...(args.maintenanceOutcome !== undefined
        ? { maintenanceOutcome: args.maintenanceOutcome }
        : {}),
      ...(args.ignoredCommitShas !== undefined
        ? { ignoredCommitShas: args.ignoredCommitShas }
        : {}),
      ...(args.ignoredReason !== undefined
        ? { ignoredReason: optionalText(args.ignoredReason) }
        : {}),
      ...(args.latestAgentJobId !== undefined
        ? { latestAgentJobId: args.latestAgentJobId }
        : {}),
      ...(closesThread ? { resolvedAt: Date.now() } : {}),
    };

    await ctx.db.patch(args.threadId, changes);
    return { success: true };
  },
});
/**
 * Appends a message with an arbitrary role to a thread on behalf of internal
 * callers.
 *
 * Throws `ConvexError('Thread not found.')` when the thread is missing or its
 * owner differs from `ownerId`. `status` defaults to `completed`; `metadata`
 * passes through `optionalText`; `content` is stored as given.
 *
 * @returns The id of the inserted message.
 */
export const appendMessageInternal = internalMutation({
  args: {
    threadId: v.id('threads'),
    ownerId: v.id('users'),
    role: messageRole,
    content: v.string(),
    status: v.optional(messageStatus),
    metadata: v.optional(v.string()),
  },
  handler: async (ctx, { threadId, ownerId, role, content, status, metadata }) => {
    const thread = await ctx.db.get(threadId);
    if (thread?.ownerId !== ownerId) {
      throw new ConvexError('Thread not found.');
    }

    const now = Date.now();
    return await ctx.db.insert('threadMessages', {
      threadId,
      ownerId,
      spoonId: thread.spoonId,
      role,
      content,
      status: status ?? 'completed',
      metadata: optionalText(metadata),
      createdAt: now,
      updatedAt: now,
    });
  },
});
/**
 * Records a decision to ignore a range of upstream commits.
 *
 * All validated arguments are stored as-is in `ignoredUpstreamChanges`
 * together with a `createdAt` timestamp. No ownership or existence checks are
 * made here.
 *
 * @returns The id of the inserted record.
 */
export const recordIgnoredUpstreamChange = internalMutation({
  args: {
    spoonId: v.id('spoons'),
    ownerId: v.id('users'),
    upstreamFrom: v.optional(v.string()),
    upstreamTo: v.string(),
    commitShas: v.array(v.string()),
    reason: v.string(),
    decidedBy: v.union(v.literal('agent'), v.literal('user')),
    threadId: v.optional(v.id('threads')),
  },
  handler: async (ctx, args) => {
    const createdAt = Date.now();
    const recordId = await ctx.db.insert('ignoredUpstreamChanges', {
      ...args,
      createdAt,
    });
    return recordId;
  },
});