import { ConvexError, v } from 'convex/values'; import type { Doc } from './_generated/dataModel'; import { internal } from './_generated/api'; import { internalMutation, internalQuery, mutation, query, } from './_generated/server'; import { getOwnedSpoon, getRequiredUserId, optionalText, requireText, } from './model'; const threadSource = v.union( v.literal('user_request'), v.literal('upstream_update'), v.literal('merge_conflict'), v.literal('manual_review'), v.literal('system'), ); const threadStatus = v.union( v.literal('open'), v.literal('queued'), v.literal('running'), v.literal('waiting_for_user'), v.literal('changes_ready'), v.literal('draft_pr_opened'), v.literal('resolved'), v.literal('ignored'), v.literal('failed'), v.literal('cancelled'), ); const maintenanceOutcome = v.union( v.literal('auto_synced'), v.literal('sync_recommended'), v.literal('ignored'), v.literal('review_pr_recommended'), v.literal('manual_review_required'), v.literal('conflict_resolution_required'), v.literal('failed'), v.literal('unknown'), ); const messageRole = v.union( v.literal('user'), v.literal('assistant'), v.literal('system'), v.literal('tool'), ); const messageStatus = v.union( v.literal('queued'), v.literal('streaming'), v.literal('completed'), v.literal('failed'), ); const titleFromPrompt = (prompt: string) => { const firstLine = prompt.trim().split('\n')[0] ?? 'Thread'; return firstLine.length > 80 ? `${firstLine.slice(0, 77)}...` : firstLine; }; const publicThread = (thread: Doc<'threads'>) => thread; export const listMine = query({ args: { status: v.optional(v.union(threadStatus, v.literal('all'))), source: v.optional(v.union(threadSource, v.literal('all'))), spoonId: v.optional(v.id('spoons')), limit: v.optional(v.number()), }, handler: async (ctx, args) => { const ownerId = await getRequiredUserId(ctx); const threads = await ctx.db .query('threads') .withIndex('by_owner', (q) => q.eq('ownerId', ownerId)) .order('desc') .take(args.limit ?? 50); return threads.filter((thread) => { if ( args.status && args.status !== 'all' && thread.status !== args.status ) { return false; } if ( args.source && args.source !== 'all' && thread.source !== args.source ) { return false; } if (args.spoonId && thread.spoonId !== args.spoonId) return false; return true; }); }, }); export const listForSpoon = query({ args: { spoonId: v.id('spoons'), limit: v.optional(v.number()) }, handler: async (ctx, { spoonId, limit }) => { const ownerId = await getRequiredUserId(ctx); await getOwnedSpoon(ctx, spoonId, ownerId); return await ctx.db .query('threads') .withIndex('by_spoon', (q) => q.eq('spoonId', spoonId)) .order('desc') .take(limit ?? 25); }, }); export const get = query({ args: { threadId: v.id('threads') }, handler: async (ctx, { threadId }) => { const ownerId = await getRequiredUserId(ctx); const thread = await ctx.db.get(threadId); if (thread?.ownerId !== ownerId) throw new ConvexError('Thread not found.'); const spoon = thread.spoonId ? await ctx.db.get(thread.spoonId) : null; const job = thread.latestAgentJobId ? await ctx.db.get(thread.latestAgentJobId) : null; return { thread: publicThread(thread), spoon: spoon?.ownerId === ownerId ? spoon : null, latestJob: job?.ownerId === ownerId ? job : null, }; }, }); export const listMessages = query({ args: { threadId: v.id('threads'), limit: v.optional(v.number()) }, handler: async (ctx, { threadId, limit }) => { const ownerId = await getRequiredUserId(ctx); const thread = await ctx.db.get(threadId); if (thread?.ownerId !== ownerId) throw new ConvexError('Thread not found.'); return await ctx.db .query('threadMessages') .withIndex('by_thread', (q) => q.eq('threadId', threadId)) .order('asc') .take(limit ?? 200); }, }); export const createUserThread = mutation({ args: { spoonId: v.id('spoons'), title: v.optional(v.string()), prompt: v.string(), baseBranch: v.optional(v.string()), requestedBranchName: v.optional(v.string()), materializeEnvFile: v.optional(v.boolean()), envFilePath: v.optional(v.string()), aiProviderProfileId: v.optional(v.id('aiProviderProfiles')), }, handler: async (ctx, args) => { const ownerId = await getRequiredUserId(ctx); await getOwnedSpoon(ctx, args.spoonId, ownerId); const prompt = requireText(args.prompt, 'Prompt'); const now = Date.now(); const threadId = await ctx.db.insert('threads', { ownerId, spoonId: args.spoonId, title: optionalText(args.title) ?? titleFromPrompt(prompt), summary: prompt, source: 'user_request', status: 'open', priority: 'normal', createdAt: now, updatedAt: now, }); await ctx.db.insert('threadMessages', { threadId, ownerId, spoonId: args.spoonId, role: 'user', content: prompt, status: 'completed', createdAt: now, updatedAt: now, }); await ctx.scheduler.runAfter( 0, internal.agentJobs.createForThreadInternal, { threadId, ownerId, jobType: 'user_change', baseBranch: args.baseBranch, requestedBranchName: args.requestedBranchName, materializeEnvFile: args.materializeEnvFile, envFilePath: args.envFilePath, aiProviderProfileId: args.aiProviderProfileId, }, ); return threadId; }, }); export const appendUserMessage = mutation({ args: { threadId: v.id('threads'), content: v.string() }, handler: async (ctx, { threadId, content }) => { const ownerId = await getRequiredUserId(ctx); const thread = await ctx.db.get(threadId); if (thread?.ownerId !== ownerId) throw new ConvexError('Thread not found.'); const now = Date.now(); return await ctx.db.insert('threadMessages', { threadId, ownerId, spoonId: thread.spoonId, role: 'user', content: requireText(content, 'Message'), status: 'queued', createdAt: now, updatedAt: now, }); }, }); export const cancel = mutation({ args: { threadId: v.id('threads') }, handler: async (ctx, { threadId }) => { const ownerId = await getRequiredUserId(ctx); const thread = await ctx.db.get(threadId); if (thread?.ownerId !== ownerId) throw new ConvexError('Thread not found.'); await ctx.db.patch(threadId, { status: 'cancelled', updatedAt: Date.now(), resolvedAt: Date.now(), }); return { success: true }; }, }); export const markResolved = mutation({ args: { threadId: v.id('threads') }, handler: async (ctx, { threadId }) => { const ownerId = await getRequiredUserId(ctx); const thread = await ctx.db.get(threadId); if (thread?.ownerId !== ownerId) throw new ConvexError('Thread not found.'); await ctx.db.patch(threadId, { status: 'resolved', updatedAt: Date.now(), resolvedAt: Date.now(), }); return { success: true }; }, }); export const findOpenMaintenanceThread = internalQuery({ args: { spoonId: v.id('spoons'), ownerId: v.id('users'), upstreamTo: v.string(), }, handler: async (ctx, { spoonId, ownerId, upstreamTo }) => { const threads = await ctx.db .query('threads') .withIndex('by_spoon', (q) => q.eq('spoonId', spoonId)) .order('desc') .collect(); return ( threads.find( (thread) => thread.ownerId === ownerId && thread.upstreamTo === upstreamTo && !['resolved', 'ignored', 'failed', 'cancelled'].includes( thread.status, ), ) ?? null ); }, }); export const createMaintenanceThread = internalMutation({ args: { spoonId: v.id('spoons'), ownerId: v.id('users'), source: v.union(v.literal('upstream_update'), v.literal('merge_conflict')), title: v.string(), summary: v.string(), upstreamFrom: v.optional(v.string()), upstreamTo: v.string(), forkHeadAtCreation: v.optional(v.string()), mergeBaseAtCreation: v.optional(v.string()), relatedSyncRunId: v.optional(v.id('syncRuns')), jobType: v.union( v.literal('maintenance_review'), v.literal('conflict_resolution'), ), }, handler: async (ctx, args) => { const now = Date.now(); const existing = await ctx.db .query('threads') .withIndex('by_spoon', (q) => q.eq('spoonId', args.spoonId)) .order('desc') .collect() .then((threads) => threads.find( (thread) => thread.ownerId === args.ownerId && thread.upstreamTo === args.upstreamTo && !['resolved', 'ignored', 'failed', 'cancelled'].includes( thread.status, ), ), ); if (existing) { await ctx.db.insert('threadMessages', { threadId: existing._id, ownerId: args.ownerId, spoonId: args.spoonId, role: 'system', content: args.summary, status: 'completed', createdAt: now, updatedAt: now, }); await ctx.db.patch(existing._id, { relatedSyncRunId: args.relatedSyncRunId, updatedAt: now, }); return existing._id; } const threadId = await ctx.db.insert('threads', { ownerId: args.ownerId, spoonId: args.spoonId, title: args.title, summary: args.summary, source: args.source, status: 'open', priority: args.source === 'merge_conflict' ? 'high' : 'normal', upstreamFrom: args.upstreamFrom, upstreamTo: args.upstreamTo, forkHeadAtCreation: args.forkHeadAtCreation, mergeBaseAtCreation: args.mergeBaseAtCreation, relatedSyncRunId: args.relatedSyncRunId, maintenanceOutcome: 'unknown', createdAt: now, updatedAt: now, }); await ctx.db.insert('threadMessages', { threadId, ownerId: args.ownerId, spoonId: args.spoonId, role: 'system', content: args.summary, status: 'completed', createdAt: now, updatedAt: now, }); await ctx.scheduler.runAfter( 0, internal.agentJobs.createForThreadInternal, { threadId, ownerId: args.ownerId, jobType: args.jobType, }, ); return threadId; }, }); export const patchThreadInternal = internalMutation({ args: { threadId: v.id('threads'), status: v.optional(threadStatus), summary: v.optional(v.string()), maintenanceOutcome: v.optional(maintenanceOutcome), ignoredCommitShas: v.optional(v.array(v.string())), ignoredReason: v.optional(v.string()), latestAgentJobId: v.optional(v.id('agentJobs')), }, handler: async (ctx, args) => { const thread = await ctx.db.get(args.threadId); if (!thread) throw new ConvexError('Thread not found.'); const patch: Partial> = { updatedAt: Date.now() }; if (args.status !== undefined) patch.status = args.status; if (args.summary !== undefined) patch.summary = optionalText(args.summary); if (args.maintenanceOutcome !== undefined) { patch.maintenanceOutcome = args.maintenanceOutcome; } if (args.ignoredCommitShas !== undefined) { patch.ignoredCommitShas = args.ignoredCommitShas; } if (args.ignoredReason !== undefined) { patch.ignoredReason = optionalText(args.ignoredReason); } if (args.latestAgentJobId !== undefined) { patch.latestAgentJobId = args.latestAgentJobId; } if ( args.status && ['resolved', 'ignored', 'failed', 'cancelled'].includes(args.status) ) { patch.resolvedAt = Date.now(); } await ctx.db.patch(args.threadId, patch); return { success: true }; }, }); export const appendMessageInternal = internalMutation({ args: { threadId: v.id('threads'), ownerId: v.id('users'), role: messageRole, content: v.string(), status: v.optional(messageStatus), metadata: v.optional(v.string()), }, handler: async (ctx, args) => { const thread = await ctx.db.get(args.threadId); if (thread?.ownerId !== args.ownerId) { throw new ConvexError('Thread not found.'); } const now = Date.now(); return await ctx.db.insert('threadMessages', { threadId: args.threadId, ownerId: args.ownerId, spoonId: thread.spoonId, role: args.role, content: args.content, status: args.status ?? 'completed', metadata: optionalText(args.metadata), createdAt: now, updatedAt: now, }); }, }); export const recordIgnoredUpstreamChange = internalMutation({ args: { spoonId: v.id('spoons'), ownerId: v.id('users'), upstreamFrom: v.optional(v.string()), upstreamTo: v.string(), commitShas: v.array(v.string()), reason: v.string(), decidedBy: v.union(v.literal('agent'), v.literal('user')), threadId: v.optional(v.id('threads')), }, handler: async (ctx, args) => { return await ctx.db.insert('ignoredUpstreamChanges', { ...args, createdAt: Date.now(), }); }, });