// 459 lines · 13 KiB · TypeScript
import { ConvexError, v } from 'convex/values';
|
|
|
|
import type { Doc } from './_generated/dataModel';
|
|
import { internal } from './_generated/api';
|
|
import {
|
|
internalMutation,
|
|
internalQuery,
|
|
mutation,
|
|
query,
|
|
} from './_generated/server';
|
|
import {
|
|
getOwnedSpoon,
|
|
getRequiredUserId,
|
|
optionalText,
|
|
requireText,
|
|
} from './model';
|
|
|
|
/**
 * Validator for a thread's `source` field.
 *
 * Within this file, `createUserThread` writes `'user_request'`,
 * `createMaintenanceThread` accepts `'upstream_update'` or `'merge_conflict'`,
 * and `listMine` accepts any of these values (or `'all'`) as a filter.
 */
const threadSource = v.union(
  v.literal('user_request'),
  v.literal('upstream_update'),
  v.literal('merge_conflict'),
  v.literal('manual_review'),
  v.literal('system'),
);
|
|
|
|
/**
 * Validator for a thread's `status` field.
 *
 * Elsewhere in this file `'resolved'`, `'ignored'`, `'failed'` and
 * `'cancelled'` are treated as closed states: they exclude a thread from
 * maintenance-thread reuse and cause `patchThreadInternal` to stamp
 * `resolvedAt`.
 */
const threadStatus = v.union(
  v.literal('open'),
  v.literal('queued'),
  v.literal('running'),
  v.literal('waiting_for_user'),
  v.literal('changes_ready'),
  v.literal('draft_pr_opened'),
  v.literal('resolved'),
  v.literal('ignored'),
  v.literal('failed'),
  v.literal('cancelled'),
);
|
|
|
|
/**
 * Validator for a thread's `maintenanceOutcome` field.
 *
 * `createMaintenanceThread` initialises new threads with `'unknown'`;
 * `patchThreadInternal` accepts any of these values as an update.
 */
const maintenanceOutcome = v.union(
  v.literal('auto_synced'),
  v.literal('sync_recommended'),
  v.literal('ignored'),
  v.literal('review_pr_recommended'),
  v.literal('manual_review_required'),
  v.literal('conflict_resolution_required'),
  v.literal('failed'),
  v.literal('unknown'),
);
|
|
|
|
/**
 * Validator for the `role` of a thread message, as accepted by
 * `appendMessageInternal`.
 */
const messageRole = v.union(
  v.literal('user'),
  v.literal('assistant'),
  v.literal('system'),
  v.literal('tool'),
);
|
|
|
|
/**
 * Validator for the `status` of a thread message, as accepted by
 * `appendMessageInternal` (which defaults it to `'completed'`).
 */
const messageStatus = v.union(
  v.literal('queued'),
  v.literal('streaming'),
  v.literal('completed'),
  v.literal('failed'),
);
|
|
|
|
/**
 * Derives a short thread title from the first line of a prompt.
 *
 * @param prompt Free-form prompt text; may span several lines.
 * @returns The trimmed first line, shortened to at most 80 characters
 *   (77 characters plus `...`), or `'Thread'` when that line is empty.
 */
const titleFromPrompt = (prompt: string): string => {
  // Accept both LF and CRLF so Windows line endings do not leave a stray '\r'
  // at the end of the title.
  const firstLine = (prompt.trim().split(/\r?\n/)[0] ?? '').trim();
  // `split` always yields at least one element, so a blank prompt produces ''
  // rather than undefined; the fallback therefore has to test for emptiness.
  if (firstLine.length === 0) return 'Thread';
  return firstLine.length > 80 ? `${firstLine.slice(0, 77)}...` : firstLine;
};
|
|
|
|
/**
 * Projection applied to a thread document before `get` returns it.
 * Currently the identity: the document is returned unchanged.
 */
const publicThread = (thread: Doc<'threads'>) => {
  return thread;
};
|
|
|
|
/**
 * Lists the signed-in user's threads, newest first, optionally narrowed by
 * status, source and spoon.
 *
 * Args:
 * - `status` / `source`: a specific value to match, or `'all'` / omitted for
 *   no restriction.
 * - `spoonId`: when given, only threads attached to that spoon are returned.
 * - `limit`: maximum number of threads to return (default 50). Fractions are
 *   rounded down; negative or non-finite values are treated as 0 / default.
 *
 * Returns up to `limit` *matching* threads. The filters are applied while
 * reading, so a filtered call is not starved by non-matching newer threads.
 */
export const listMine = query({
  args: {
    status: v.optional(v.union(threadStatus, v.literal('all'))),
    source: v.optional(v.union(threadSource, v.literal('all'))),
    spoonId: v.optional(v.id('spoons')),
    limit: v.optional(v.number()),
  },
  handler: async (ctx, args) => {
    const ownerId = await getRequiredUserId(ctx);

    const requested = args.limit ?? 50;
    const limit = Number.isFinite(requested)
      ? Math.max(0, Math.floor(requested))
      : 50;
    if (limit === 0) return [];

    // 'all' means "no restriction", same as leaving the argument out.
    const status = args.status === 'all' ? undefined : args.status;
    const source = args.source === 'all' ? undefined : args.source;
    const spoonId = args.spoonId;

    const newestFirst = ctx.db
      .query('threads')
      .withIndex('by_owner', (q) => q.eq('ownerId', ownerId))
      .order('desc');

    // Without filters the first `limit` rows are exactly the answer.
    if (!status && !source && !spoonId) {
      return await newestFirst.take(limit);
    }

    // With filters, keep reading until `limit` matches are found. Taking
    // `limit` rows first and filtering afterwards would drop matches that sit
    // behind newer non-matching threads.
    // NOTE(review): a rarely matching filter scans the owner's whole history;
    // consider a compound index if that becomes a problem.
    const page: Doc<'threads'>[] = [];
    for await (const thread of newestFirst) {
      if (status && thread.status !== status) continue;
      if (source && thread.source !== source) continue;
      if (spoonId && thread.spoonId !== spoonId) continue;
      page.push(thread);
      if (page.length >= limit) break;
    }
    return page;
  },
});
|
|
|
|
/**
 * Lists the threads attached to one spoon, newest first.
 *
 * Args:
 * - `spoonId`: the spoon whose threads are listed.
 * - `limit`: maximum number of threads to return (default 25).
 */
export const listForSpoon = query({
  args: { spoonId: v.id('spoons'), limit: v.optional(v.number()) },
  handler: async (ctx, args) => {
    const { spoonId, limit } = args;
    const ownerId = await getRequiredUserId(ctx);

    // Result is unused; presumably this rejects when the spoon does not
    // belong to the caller — confirm against `getOwnedSpoon` in ./model.
    await getOwnedSpoon(ctx, spoonId, ownerId);

    const pageSize = limit ?? 25;
    const newestFirst = ctx.db
      .query('threads')
      .withIndex('by_spoon', (q) => q.eq('spoonId', spoonId))
      .order('desc');
    return await newestFirst.take(pageSize);
  },
});
|
|
|
|
/**
 * Loads one thread owned by the caller together with its spoon and its
 * latest agent job.
 *
 * Throws `ConvexError('Thread not found.')` when the thread does not exist or
 * belongs to someone else. The related spoon and job are returned as `null`
 * when absent or when their `ownerId` differs from the caller's.
 */
export const get = query({
  args: { threadId: v.id('threads') },
  handler: async (ctx, args) => {
    const ownerId = await getRequiredUserId(ctx);

    const thread = await ctx.db.get(args.threadId);
    if (!thread || thread.ownerId !== ownerId) {
      throw new ConvexError('Thread not found.');
    }

    const { spoonId, latestAgentJobId } = thread;
    const spoon = spoonId ? await ctx.db.get(spoonId) : null;
    const job = latestAgentJobId ? await ctx.db.get(latestAgentJobId) : null;

    // Related documents are only exposed when they share the caller's ownerId.
    const ownedSpoon = spoon && spoon.ownerId === ownerId ? spoon : null;
    const ownedJob = job && job.ownerId === ownerId ? job : null;

    return {
      thread: publicThread(thread),
      spoon: ownedSpoon,
      latestJob: ownedJob,
    };
  },
});
|
|
|
|
/**
 * Lists the messages of a thread owned by the caller, in ascending index
 * order.
 *
 * Args:
 * - `threadId`: the thread to read.
 * - `limit`: maximum number of messages to return (default 200).
 *
 * Throws `ConvexError('Thread not found.')` when the thread does not exist or
 * belongs to someone else.
 */
export const listMessages = query({
  args: { threadId: v.id('threads'), limit: v.optional(v.number()) },
  handler: async (ctx, args) => {
    const ownerId = await getRequiredUserId(ctx);

    const thread = await ctx.db.get(args.threadId);
    if (!thread || thread.ownerId !== ownerId) {
      throw new ConvexError('Thread not found.');
    }

    // NOTE(review): ascending order plus `take` returns the *first* `limit`
    // messages, so anything beyond that is never returned — confirm this is
    // intended for long threads.
    const maxMessages = args.limit ?? 200;
    const oldestFirst = ctx.db
      .query('threadMessages')
      .withIndex('by_thread', (q) => q.eq('threadId', args.threadId))
      .order('asc');
    return await oldestFirst.take(maxMessages);
  },
});
|
|
|
|
/**
 * Creates a user-requested thread on a spoon, records the prompt as the
 * first message, and schedules agent-job creation for it.
 *
 * Steps, in order:
 * 1. Resolve the caller and run the `getOwnedSpoon` check on `spoonId`.
 * 2. Validate the prompt with `requireText`.
 * 3. Insert the thread (`source: 'user_request'`, `status: 'open'`,
 *    `priority: 'normal'`); the title is the supplied one or, when
 *    `optionalText` yields nothing, one derived from the prompt.
 * 4. Insert the prompt as a completed `'user'` message.
 * 5. Schedule `internal.agentJobs.createForThreadInternal` with zero delay,
 *    forwarding the branch / env-file / provider options untouched.
 *
 * Returns the id of the new thread.
 */
export const createUserThread = mutation({
  args: {
    spoonId: v.id('spoons'),
    title: v.optional(v.string()),
    prompt: v.string(),
    baseBranch: v.optional(v.string()),
    requestedBranchName: v.optional(v.string()),
    materializeEnvFile: v.optional(v.boolean()),
    envFilePath: v.optional(v.string()),
    aiProviderProfileId: v.optional(v.id('aiProviderProfiles')),
  },
  handler: async (ctx, args) => {
    const { spoonId } = args;
    const ownerId = await getRequiredUserId(ctx);
    await getOwnedSpoon(ctx, spoonId, ownerId);

    const prompt = requireText(args.prompt, 'Prompt');
    const now = Date.now();
    const title = optionalText(args.title) ?? titleFromPrompt(prompt);

    const threadId = await ctx.db.insert('threads', {
      ownerId,
      spoonId,
      title,
      summary: prompt,
      source: 'user_request',
      status: 'open',
      priority: 'normal',
      createdAt: now,
      updatedAt: now,
    });

    // The prompt doubles as the thread's opening message.
    await ctx.db.insert('threadMessages', {
      threadId,
      ownerId,
      spoonId,
      role: 'user',
      content: prompt,
      status: 'completed',
      createdAt: now,
      updatedAt: now,
    });

    const jobRequest = {
      threadId,
      ownerId,
      jobType: 'user_change' as const,
      baseBranch: args.baseBranch,
      requestedBranchName: args.requestedBranchName,
      materializeEnvFile: args.materializeEnvFile,
      envFilePath: args.envFilePath,
      aiProviderProfileId: args.aiProviderProfileId,
    };
    await ctx.scheduler.runAfter(
      0,
      internal.agentJobs.createForThreadInternal,
      jobRequest,
    );

    return threadId;
  },
});
|
|
|
|
/**
 * Appends a user-authored message to a thread owned by the caller.
 *
 * The message is stored with `role: 'user'` and `status: 'queued'`.
 * Throws `ConvexError('Thread not found.')` when the thread does not exist or
 * belongs to someone else. Returns the id of the inserted message.
 */
export const appendUserMessage = mutation({
  args: { threadId: v.id('threads'), content: v.string() },
  handler: async (ctx, args) => {
    const ownerId = await getRequiredUserId(ctx);

    const thread = await ctx.db.get(args.threadId);
    if (!thread || thread.ownerId !== ownerId) {
      throw new ConvexError('Thread not found.');
    }

    const now = Date.now();
    const text = requireText(args.content, 'Message');

    // NOTE(review): nothing in this mutation schedules work for the queued
    // message; presumably a consumer elsewhere picks it up — confirm.
    const messageId = await ctx.db.insert('threadMessages', {
      threadId: args.threadId,
      ownerId,
      spoonId: thread.spoonId,
      role: 'user',
      content: text,
      status: 'queued',
      createdAt: now,
      updatedAt: now,
    });
    return messageId;
  },
});
|
|
|
|
/**
 * Marks a thread owned by the caller as `'cancelled'` and stamps
 * `updatedAt` / `resolvedAt`.
 *
 * Throws `ConvexError('Thread not found.')` when the thread does not exist or
 * belongs to someone else. Returns `{ success: true }`.
 */
export const cancel = mutation({
  args: { threadId: v.id('threads') },
  handler: async (ctx, args) => {
    const ownerId = await getRequiredUserId(ctx);

    const thread = await ctx.db.get(args.threadId);
    if (!thread || thread.ownerId !== ownerId) {
      throw new ConvexError('Thread not found.');
    }

    // Only the thread document is touched here; any agent job linked to the
    // thread is left as-is by this mutation.
    await ctx.db.patch(args.threadId, {
      status: 'cancelled',
      updatedAt: Date.now(),
      resolvedAt: Date.now(),
    });

    return { success: true };
  },
});
|
|
|
|
/**
 * Marks a thread owned by the caller as `'resolved'` and stamps
 * `updatedAt` / `resolvedAt`.
 *
 * Throws `ConvexError('Thread not found.')` when the thread does not exist or
 * belongs to someone else. Returns `{ success: true }`.
 */
export const markResolved = mutation({
  args: { threadId: v.id('threads') },
  handler: async (ctx, args) => {
    const ownerId = await getRequiredUserId(ctx);

    const thread = await ctx.db.get(args.threadId);
    if (!thread || thread.ownerId !== ownerId) {
      throw new ConvexError('Thread not found.');
    }

    await ctx.db.patch(args.threadId, {
      status: 'resolved',
      updatedAt: Date.now(),
      resolvedAt: Date.now(),
    });

    return { success: true };
  },
});
|
|
|
|
/**
 * Finds the newest still-open thread on a spoon that targets a given
 * upstream revision.
 *
 * A thread matches when it has the given `ownerId`, its `upstreamTo` equals
 * the argument, and its status is none of `'resolved'`, `'ignored'`,
 * `'failed'` or `'cancelled'`.
 *
 * Returns the matching thread document, or `null` when there is none.
 */
export const findOpenMaintenanceThread = internalQuery({
  args: {
    spoonId: v.id('spoons'),
    ownerId: v.id('users'),
    upstreamTo: v.string(),
  },
  handler: async (ctx, args) => {
    const closedStatuses = ['resolved', 'ignored', 'failed', 'cancelled'];

    const spoonThreads = await ctx.db
      .query('threads')
      .withIndex('by_spoon', (q) => q.eq('spoonId', args.spoonId))
      .order('desc')
      .collect();

    // Threads arrive newest first, so the first hit is the newest match.
    for (const candidate of spoonThreads) {
      if (candidate.ownerId !== args.ownerId) continue;
      if (candidate.upstreamTo !== args.upstreamTo) continue;
      if (closedStatuses.includes(candidate.status)) continue;
      return candidate;
    }
    return null;
  },
});
|
|
|
|
/**
 * Opens a maintenance thread for an upstream revision, or reuses the open one
 * that already targets it.
 *
 * Reuse path: when the spoon already has a thread with the same `ownerId` and
 * `upstreamTo` whose status is not `'resolved'`, `'ignored'`, `'failed'` or
 * `'cancelled'`, the summary is appended to it as a completed `'system'`
 * message, `updatedAt` is bumped, and its id is returned. No agent job is
 * scheduled on this path.
 *
 * Create path: otherwise a new `'open'` thread is inserted (priority `'high'`
 * for `'merge_conflict'`, `'normal'` otherwise, `maintenanceOutcome`
 * `'unknown'`), the summary is recorded as its first `'system'` message, and
 * `internal.agentJobs.createForThreadInternal` is scheduled with zero delay
 * using `jobType`.
 *
 * Returns the id of the reused or newly created thread.
 */
export const createMaintenanceThread = internalMutation({
  args: {
    spoonId: v.id('spoons'),
    ownerId: v.id('users'),
    source: v.union(v.literal('upstream_update'), v.literal('merge_conflict')),
    title: v.string(),
    summary: v.string(),
    upstreamFrom: v.optional(v.string()),
    upstreamTo: v.string(),
    forkHeadAtCreation: v.optional(v.string()),
    mergeBaseAtCreation: v.optional(v.string()),
    relatedSyncRunId: v.optional(v.id('syncRuns')),
    jobType: v.union(
      v.literal('maintenance_review'),
      v.literal('conflict_resolution'),
    ),
  },
  handler: async (ctx, args) => {
    const now = Date.now();

    const spoonThreads = await ctx.db
      .query('threads')
      .withIndex('by_spoon', (q) => q.eq('spoonId', args.spoonId))
      .order('desc')
      .collect();
    const existing = spoonThreads.find(
      (thread) =>
        thread.ownerId === args.ownerId &&
        thread.upstreamTo === args.upstreamTo &&
        !['resolved', 'ignored', 'failed', 'cancelled'].includes(
          thread.status,
        ),
    );

    if (existing) {
      await ctx.db.insert('threadMessages', {
        threadId: existing._id,
        ownerId: args.ownerId,
        spoonId: args.spoonId,
        role: 'system',
        content: args.summary,
        status: 'completed',
        createdAt: now,
        updatedAt: now,
      });
      // `patch` removes any field whose value is `undefined`, so the sync-run
      // link is only included when the caller supplied one; otherwise the
      // link already stored on the thread is kept.
      await ctx.db.patch(existing._id, {
        updatedAt: now,
        ...(args.relatedSyncRunId !== undefined
          ? { relatedSyncRunId: args.relatedSyncRunId }
          : {}),
      });
      return existing._id;
    }

    const threadId = await ctx.db.insert('threads', {
      ownerId: args.ownerId,
      spoonId: args.spoonId,
      title: args.title,
      summary: args.summary,
      source: args.source,
      status: 'open',
      priority: args.source === 'merge_conflict' ? 'high' : 'normal',
      upstreamFrom: args.upstreamFrom,
      upstreamTo: args.upstreamTo,
      forkHeadAtCreation: args.forkHeadAtCreation,
      mergeBaseAtCreation: args.mergeBaseAtCreation,
      relatedSyncRunId: args.relatedSyncRunId,
      maintenanceOutcome: 'unknown',
      createdAt: now,
      updatedAt: now,
    });

    await ctx.db.insert('threadMessages', {
      threadId,
      ownerId: args.ownerId,
      spoonId: args.spoonId,
      role: 'system',
      content: args.summary,
      status: 'completed',
      createdAt: now,
      updatedAt: now,
    });

    await ctx.scheduler.runAfter(
      0,
      internal.agentJobs.createForThreadInternal,
      {
        threadId,
        ownerId: args.ownerId,
        jobType: args.jobType,
      },
    );

    return threadId;
  },
});
|
|
|
|
/**
 * Applies a partial update to a thread from internal callers.
 *
 * Only the arguments that are provided are written; `updatedAt` is always
 * refreshed. When `status` is one of `'resolved'`, `'ignored'`, `'failed'` or
 * `'cancelled'`, `resolvedAt` is stamped as well.
 *
 * Throws `ConvexError('Thread not found.')` when the thread does not exist.
 * Returns `{ success: true }`.
 */
export const patchThreadInternal = internalMutation({
  args: {
    threadId: v.id('threads'),
    status: v.optional(threadStatus),
    summary: v.optional(v.string()),
    maintenanceOutcome: v.optional(maintenanceOutcome),
    ignoredCommitShas: v.optional(v.array(v.string())),
    ignoredReason: v.optional(v.string()),
    latestAgentJobId: v.optional(v.id('agentJobs')),
  },
  handler: async (ctx, args) => {
    const {
      threadId,
      status,
      summary,
      maintenanceOutcome: outcome,
      ignoredCommitShas,
      ignoredReason,
      latestAgentJobId,
    } = args;

    // Fetched purely as an existence check; the document itself is not read.
    const current = await ctx.db.get(threadId);
    if (!current) {
      throw new ConvexError('Thread not found.');
    }

    const changes: Partial<Doc<'threads'>> = { updatedAt: Date.now() };

    if (status !== undefined) {
      changes.status = status;
    }
    // NOTE(review): if `optionalText` can return undefined (e.g. for blank
    // input), the assignments below hand `undefined` to `patch` for `summary`
    // / `ignoredReason` — confirm that is the intended effect.
    if (summary !== undefined) {
      changes.summary = optionalText(summary);
    }
    if (outcome !== undefined) {
      changes.maintenanceOutcome = outcome;
    }
    if (ignoredCommitShas !== undefined) {
      changes.ignoredCommitShas = ignoredCommitShas;
    }
    if (ignoredReason !== undefined) {
      changes.ignoredReason = optionalText(ignoredReason);
    }
    if (latestAgentJobId !== undefined) {
      changes.latestAgentJobId = latestAgentJobId;
    }

    const closedStatuses = ['resolved', 'ignored', 'failed', 'cancelled'];
    if (status && closedStatuses.includes(status)) {
      changes.resolvedAt = Date.now();
    }

    await ctx.db.patch(threadId, changes);
    return { success: true };
  },
});
|
|
|
|
/**
 * Appends a message to a thread on behalf of internal callers.
 *
 * The thread must exist and have the supplied `ownerId`; otherwise
 * `ConvexError('Thread not found.')` is thrown. `status` defaults to
 * `'completed'`, `metadata` is passed through `optionalText`, and `content`
 * is stored exactly as given. Returns the id of the inserted message.
 */
export const appendMessageInternal = internalMutation({
  args: {
    threadId: v.id('threads'),
    ownerId: v.id('users'),
    role: messageRole,
    content: v.string(),
    status: v.optional(messageStatus),
    metadata: v.optional(v.string()),
  },
  handler: async (ctx, args) => {
    const { threadId, ownerId, role, content } = args;

    const thread = await ctx.db.get(threadId);
    if (!thread || thread.ownerId !== ownerId) {
      throw new ConvexError('Thread not found.');
    }

    const now = Date.now();
    const status = args.status ?? 'completed';
    const metadata = optionalText(args.metadata);

    const messageId = await ctx.db.insert('threadMessages', {
      threadId,
      ownerId,
      // The spoon is taken from the thread, not from the caller.
      spoonId: thread.spoonId,
      role,
      content,
      status,
      metadata,
      createdAt: now,
      updatedAt: now,
    });
    return messageId;
  },
});
|
|
|
|
/**
 * Records a decision to ignore a set of upstream commits.
 *
 * Every argument is stored as-is in `ignoredUpstreamChanges`, together with a
 * `createdAt` timestamp. Returns the id of the inserted document.
 */
export const recordIgnoredUpstreamChange = internalMutation({
  args: {
    spoonId: v.id('spoons'),
    ownerId: v.id('users'),
    upstreamFrom: v.optional(v.string()),
    upstreamTo: v.string(),
    commitShas: v.array(v.string()),
    reason: v.string(),
    decidedBy: v.union(v.literal('agent'), v.literal('user')),
    threadId: v.optional(v.id('threads')),
  },
  handler: async (ctx, args) => {
    const {
      spoonId,
      ownerId,
      upstreamFrom,
      upstreamTo,
      commitShas,
      reason,
      decidedBy,
      threadId,
    } = args;

    const recordId = await ctx.db.insert('ignoredUpstreamChanges', {
      spoonId,
      ownerId,
      upstreamFrom,
      upstreamTo,
      commitShas,
      reason,
      decidedBy,
      threadId,
      createdAt: Date.now(),
    });
    return recordId;
  },
});
|