Move to threads based system.

This commit is contained in:
Gabriel Brown
2026-06-22 10:37:26 -04:00
parent 8ae6c4b533
commit 206b64176b
82 changed files with 6169 additions and 1930 deletions
+772 -52
View File
@@ -18,6 +18,51 @@ const jobStatus = v.union(
v.literal('timed_out'),
);
const runtime = v.literal('opencode');
const jobType = v.union(
v.literal('user_change'),
v.literal('maintenance_review'),
v.literal('conflict_resolution'),
);
const workspaceStatus = v.union(
v.literal('not_started'),
v.literal('starting'),
v.literal('active'),
v.literal('idle'),
v.literal('stopped'),
v.literal('expired'),
v.literal('failed'),
);
const messageRole = v.union(
v.literal('user'),
v.literal('assistant'),
v.literal('system'),
v.literal('tool'),
);
const messageStatus = v.union(
v.literal('queued'),
v.literal('streaming'),
v.literal('completed'),
v.literal('failed'),
);
const changeSource = v.union(
v.literal('user'),
v.literal('agent'),
v.literal('command'),
);
const changeType = v.union(
v.literal('added'),
v.literal('modified'),
v.literal('deleted'),
v.literal('renamed'),
);
const eventLevel = v.union(
v.literal('debug'),
v.literal('info'),
@@ -55,13 +100,34 @@ const artifactContentType = v.union(
v.literal('text/x-diff'),
);
const maintenanceDecision = v.union(
v.literal('sync'),
v.literal('ignore'),
v.literal('open_review_pr'),
v.literal('manual_review'),
v.literal('conflict_resolution'),
v.literal('unknown'),
);
const maintenanceRisk = v.union(
v.literal('low'),
v.literal('medium'),
v.literal('high'),
v.literal('unknown'),
);
const defaultAgentSettings = {
enabled: true,
runtime: 'opencode' as const,
branchPrefix: 'spoon/agent',
agentModel: 'gpt-5.1-codex',
reasoningEffort: 'high' as const,
agentModel: '',
reasoningEffort: 'medium' as const,
maxJobDurationMs: 1_800_000,
maxOutputBytes: 200_000,
envFilePath: '.env.local',
materializeEnvFileByDefault: false,
autoDetectCommands: true,
allowUserFileEditing: true,
};
const getWorkerToken = () => process.env.SPOON_WORKER_TOKEN?.trim();
@@ -94,6 +160,18 @@ const buildBranch = (
)}`;
};
const normalizeEnvFilePath = (value?: string) => {
const trimmed = optionalText(value);
if (!trimmed) return undefined;
if (trimmed.startsWith('/') || trimmed.includes('..')) {
throw new ConvexError('Env file path must stay inside the repository.');
}
if (!/^\.env(?:[./-][A-Za-z0-9_.-]+)?$/.test(trimmed)) {
throw new ConvexError('Env file path must be a .env-style path.');
}
return trimmed;
};
const getAgentSettings = async (ctx: MutationCtx, spoon: Doc<'spoons'>) => {
const settings = await ctx.db
.query('spoonAgentSettings')
@@ -120,12 +198,207 @@ const assertSecretOwnership = async (
}
};
const getJobProfile = async (
ctx: MutationCtx,
ownerId: Id<'users'>,
profileId?: Id<'aiProviderProfiles'>,
) => {
const profile = profileId
? await ctx.db.get(profileId)
: await getDefaultJobProfile(ctx, ownerId);
if (profile?.ownerId !== ownerId || !profile.enabled) {
throw new ConvexError('AI provider profile not found.');
}
if (profile.authType !== 'none' && !profile.encryptedSecret) {
throw new ConvexError('Selected AI provider is missing credentials.');
}
return profile;
};
const getDefaultJobProfile = async (ctx: MutationCtx, ownerId: Id<'users'>) => {
const profiles = await ctx.db
.query('aiProviderProfiles')
.withIndex('by_owner', (q) => q.eq('ownerId', ownerId))
.collect();
const configuredProfiles = profiles.filter(
(profile) =>
profile.enabled &&
(profile.authType === 'none' || Boolean(profile.encryptedSecret)),
);
const explicitDefault = configuredProfiles.find(
(profile) =>
(profile as Doc<'aiProviderProfiles'> & { isDefault?: boolean })
.isDefault,
);
const profile =
explicitDefault ??
(configuredProfiles.length === 1 ? configuredProfiles[0] : undefined);
if (!profile) {
throw new ConvexError(
'Choose a default AI provider before queueing agent work.',
);
}
return profile;
};
const listSpoonSecretIds = async (
ctx: MutationCtx,
spoonId: Id<'spoons'>,
ownerId: Id<'users'>,
) => {
const secrets = await ctx.db
.query('spoonSecrets')
.withIndex('by_spoon', (q) => q.eq('spoonId', spoonId))
.collect();
return secrets
.filter((secret) => secret.ownerId === ownerId)
.map((secret) => secret._id);
};
const insertJob = async (
ctx: MutationCtx,
{
ownerId,
spoon,
requestId,
prompt,
settings,
threadId,
requestedJobType,
baseBranch,
requestedBranchName,
requestedRuntime,
materializeEnvFile,
requestedEnvFilePath,
requestedProfileId,
}: {
ownerId: Id<'users'>;
spoon: Doc<'spoons'>;
requestId: Id<'agentRequests'>;
prompt: string;
settings: Awaited<ReturnType<typeof getAgentSettings>>;
threadId?: Id<'threads'>;
requestedJobType:
| 'user_change'
| 'maintenance_review'
| 'conflict_resolution';
baseBranch?: string;
requestedBranchName?: string;
requestedRuntime?: 'opencode';
materializeEnvFile?: boolean;
requestedEnvFilePath?: string;
requestedProfileId?: Id<'aiProviderProfiles'>;
},
) => {
if (spoon.provider !== 'github') {
throw new ConvexError('Agent jobs currently require a GitHub Spoon.');
}
if (!spoon.forkOwner || !spoon.forkRepo || !spoon.forkUrl) {
throw new ConvexError(
'Add fork repository metadata before queueing a job.',
);
}
if (!settings.enabled) {
throw new ConvexError('Agent jobs are disabled for this Spoon.');
}
const aiProviderProfileId =
requestedProfileId ?? settings.aiProviderProfileId;
const profile = await getJobProfile(ctx, ownerId, aiProviderProfileId);
const selectedSecretIds = await listSpoonSecretIds(ctx, spoon._id, ownerId);
const now = Date.now();
const resolvedBaseBranch =
optionalText(baseBranch) ?? settings.defaultBaseBranch;
const jobRuntime = requestedRuntime ?? 'opencode';
const shouldMaterializeEnvFile =
materializeEnvFile ?? settings.materializeEnvFileByDefault;
const envFilePath =
normalizeEnvFilePath(requestedEnvFilePath) ??
normalizeEnvFilePath(
settings.envFilePath === 'custom'
? settings.customEnvFilePath
: settings.envFilePath,
);
const workBranch = buildBranch(
requestId,
prompt,
settings.branchPrefix,
requestedBranchName,
);
const jobId = await ctx.db.insert('agentJobs', {
spoonId: spoon._id,
ownerId,
agentRequestId: requestId,
threadId,
jobType: requestedJobType,
status: 'queued',
prompt,
runtime: jobRuntime,
workspaceStatus: 'not_started',
baseBranch: resolvedBaseBranch,
workBranch,
envFilePath,
materializeEnvFile: shouldMaterializeEnvFile,
githubInstallationId: spoon.githubInstallationId,
forkOwner: spoon.forkOwner,
forkRepo: spoon.forkRepo,
forkUrl: spoon.forkUrl,
upstreamOwner: spoon.upstreamOwner,
upstreamRepo: spoon.upstreamRepo,
selectedSecretIds,
aiProviderProfileId: profile._id,
model: profile.defaultModel,
reasoningEffort: profile.reasoningEffort,
createdAt: now,
updatedAt: now,
});
await ctx.db.patch(requestId, {
agentJobId: jobId,
selectedSecretIds,
baseBranch: resolvedBaseBranch,
requestedBranchName: optionalText(requestedBranchName),
status: 'queued',
updatedAt: now,
});
if (threadId) {
await ctx.db.patch(threadId, {
latestAgentJobId: jobId,
relatedAgentRequestId: requestId,
status: 'queued',
updatedAt: now,
});
}
await ctx.db.insert('agentJobEvents', {
jobId,
spoonId: spoon._id,
ownerId,
level: 'info',
phase: 'queued',
message: 'OpenCode job queued.',
createdAt: now,
});
await ctx.db.insert('agentJobMessages', {
jobId,
spoonId: spoon._id,
ownerId,
role: 'user',
content: prompt,
status: 'completed',
createdAt: now,
updatedAt: now,
});
return jobId;
};
export const createFromRequest = mutation({
args: {
requestId: v.id('agentRequests'),
selectedSecretIds: v.array(v.id('spoonSecrets')),
baseBranch: v.optional(v.string()),
requestedBranchName: v.optional(v.string()),
runtime: v.optional(runtime),
materializeEnvFile: v.optional(v.boolean()),
envFilePath: v.optional(v.string()),
aiProviderProfileId: v.optional(v.id('aiProviderProfiles')),
},
handler: async (ctx, args) => {
const ownerId = await getRequiredUserId(ctx);
@@ -137,74 +410,158 @@ export const createFromRequest = mutation({
throw new ConvexError('This request already has an agent job.');
}
const spoon = await getOwnedSpoon(ctx, request.spoonId, ownerId);
if (spoon.provider !== 'github') {
throw new ConvexError('Agent jobs currently require a GitHub Spoon.');
}
if (!spoon.forkOwner || !spoon.forkRepo || !spoon.forkUrl) {
throw new ConvexError(
'Add fork repository metadata before queueing a job.',
);
}
const settings = await getAgentSettings(ctx, spoon);
if (!settings.enabled) {
throw new ConvexError('Agent jobs are disabled for this Spoon.');
}
await assertSecretOwnership(
ctx,
spoon._id,
ownerId,
args.selectedSecretIds,
);
const now = Date.now();
const baseBranch =
optionalText(args.baseBranch) ?? settings.defaultBaseBranch;
const workBranch = buildBranch(
request._id,
request.prompt,
settings.branchPrefix,
args.requestedBranchName,
);
const jobId = await ctx.db.insert('agentJobs', {
spoonId: spoon._id,
return await insertJob(ctx, {
ownerId,
agentRequestId: request._id,
status: 'queued',
spoon,
requestId: request._id,
prompt: request.prompt,
baseBranch,
workBranch,
githubInstallationId: spoon.githubInstallationId,
forkOwner: spoon.forkOwner,
forkRepo: spoon.forkRepo,
forkUrl: spoon.forkUrl,
upstreamOwner: spoon.upstreamOwner,
upstreamRepo: spoon.upstreamRepo,
selectedSecretIds: args.selectedSecretIds,
model: settings.agentModel,
reasoningEffort: settings.reasoningEffort,
createdAt: now,
updatedAt: now,
settings,
requestedJobType: 'user_change',
baseBranch: args.baseBranch,
requestedBranchName: args.requestedBranchName,
requestedRuntime: args.runtime,
materializeEnvFile: args.materializeEnvFile,
requestedEnvFilePath: args.envFilePath,
requestedProfileId: args.aiProviderProfileId,
});
await ctx.db.patch(request._id, {
agentJobId: jobId,
selectedSecretIds: args.selectedSecretIds,
baseBranch,
requestedBranchName: optionalText(args.requestedBranchName),
status: 'queued',
updatedAt: now,
});
await ctx.db.insert('agentJobEvents', {
jobId,
},
});
export const createForThread = mutation({
args: {
threadId: v.id('threads'),
jobType,
baseBranch: v.optional(v.string()),
requestedBranchName: v.optional(v.string()),
materializeEnvFile: v.optional(v.boolean()),
envFilePath: v.optional(v.string()),
aiProviderProfileId: v.optional(v.id('aiProviderProfiles')),
},
handler: async (ctx, args) => {
const ownerId = await getRequiredUserId(ctx);
const thread = await ctx.db.get(args.threadId);
if (thread?.ownerId !== ownerId || !thread.spoonId) {
throw new ConvexError('Thread not found.');
}
if (thread.latestAgentJobId) {
throw new ConvexError('This thread already has an agent job.');
}
const spoon = await getOwnedSpoon(ctx, thread.spoonId, ownerId);
const promptMessage = await ctx.db
.query('threadMessages')
.withIndex('by_thread', (q) => q.eq('threadId', args.threadId))
.order('desc')
.first();
const prompt =
promptMessage?.content ??
thread.summary ??
`Work on thread: ${thread.title}`;
const now = Date.now();
const requestId = await ctx.db.insert('agentRequests', {
spoonId: spoon._id,
ownerId,
level: 'info',
phase: 'queued',
message: 'Agent job queued.',
prompt,
status: 'queued',
requestType:
args.jobType === 'user_change'
? 'future_code_change'
: 'upstream_review',
priority: thread.priority,
source: thread.source === 'user_request' ? 'user' : 'system',
targetBranch: optionalText(args.baseBranch),
createdAt: now,
updatedAt: now,
});
const settings = await getAgentSettings(ctx, spoon);
const jobId = await insertJob(ctx, {
ownerId,
spoon,
requestId,
prompt,
settings,
threadId: args.threadId,
requestedJobType: args.jobType,
baseBranch: args.baseBranch,
requestedBranchName: args.requestedBranchName,
materializeEnvFile: args.materializeEnvFile,
requestedEnvFilePath: args.envFilePath,
requestedProfileId: args.aiProviderProfileId,
});
return jobId;
},
});
export const createForThreadInternal = internalMutation({
args: {
threadId: v.id('threads'),
ownerId: v.id('users'),
jobType,
baseBranch: v.optional(v.string()),
requestedBranchName: v.optional(v.string()),
materializeEnvFile: v.optional(v.boolean()),
envFilePath: v.optional(v.string()),
aiProviderProfileId: v.optional(v.id('aiProviderProfiles')),
},
handler: async (ctx, args) => {
const thread = await ctx.db.get(args.threadId);
if (thread?.ownerId !== args.ownerId || !thread.spoonId) {
throw new ConvexError('Thread not found.');
}
if (thread.latestAgentJobId) return thread.latestAgentJobId;
const spoon = await ctx.db.get(thread.spoonId);
if (spoon?.ownerId !== args.ownerId) {
throw new ConvexError('Spoon not found.');
}
const promptMessage = await ctx.db
.query('threadMessages')
.withIndex('by_thread', (q) => q.eq('threadId', args.threadId))
.order('desc')
.first();
const prompt =
promptMessage?.content ??
thread.summary ??
`Review maintenance thread: ${thread.title}`;
const now = Date.now();
const requestId = await ctx.db.insert('agentRequests', {
spoonId: spoon._id,
ownerId: args.ownerId,
prompt,
status: 'queued',
requestType:
args.jobType === 'user_change'
? 'future_code_change'
: 'upstream_review',
priority: thread.priority,
source: thread.source === 'user_request' ? 'user' : 'system',
targetBranch: optionalText(args.baseBranch),
createdAt: now,
updatedAt: now,
});
const settings = await getAgentSettings(ctx, spoon);
return await insertJob(ctx, {
ownerId: args.ownerId,
spoon,
requestId,
prompt,
settings,
threadId: args.threadId,
requestedJobType: args.jobType,
baseBranch: args.baseBranch,
requestedBranchName: args.requestedBranchName,
materializeEnvFile: args.materializeEnvFile,
requestedEnvFilePath: args.envFilePath,
requestedProfileId: args.aiProviderProfileId,
});
},
});
export const listForSpoon = query({
args: { spoonId: v.id('spoons'), limit: v.optional(v.number()) },
handler: async (ctx, { spoonId, limit }) => {
@@ -228,6 +585,66 @@ export const get = query({
},
});
export const assertOwned = query({
args: { jobId: v.id('agentJobs') },
handler: async (ctx, { jobId }) => {
const ownerId = await getRequiredUserId(ctx);
const job = await ctx.db.get(jobId);
if (job?.ownerId !== ownerId) throw new ConvexError('Agent job not found.');
return { job, ownerId };
},
});
export const listMessages = query({
args: { jobId: v.id('agentJobs'), limit: v.optional(v.number()) },
handler: async (ctx, { jobId, limit }) => {
const ownerId = await getRequiredUserId(ctx);
const job = await ctx.db.get(jobId);
if (job?.ownerId !== ownerId) throw new ConvexError('Agent job not found.');
return await ctx.db
.query('agentJobMessages')
.withIndex('by_job', (q) => q.eq('jobId', jobId))
.order('asc')
.take(limit ?? 200);
},
});
export const appendUserMessage = mutation({
args: { jobId: v.id('agentJobs'), content: v.string() },
handler: async (ctx, { jobId, content }) => {
const ownerId = await getRequiredUserId(ctx);
const job = await ctx.db.get(jobId);
if (job?.ownerId !== ownerId) throw new ConvexError('Agent job not found.');
const trimmed = optionalText(content);
if (!trimmed) throw new ConvexError('Message is required.');
const now = Date.now();
return await ctx.db.insert('agentJobMessages', {
jobId,
spoonId: job.spoonId,
ownerId,
role: 'user',
content: trimmed,
status: 'queued',
createdAt: now,
updatedAt: now,
});
},
});
export const listWorkspaceChanges = query({
args: { jobId: v.id('agentJobs'), limit: v.optional(v.number()) },
handler: async (ctx, { jobId, limit }) => {
const ownerId = await getRequiredUserId(ctx);
const job = await ctx.db.get(jobId);
if (job?.ownerId !== ownerId) throw new ConvexError('Agent job not found.');
return await ctx.db
.query('agentWorkspaceChanges')
.withIndex('by_job', (q) => q.eq('jobId', jobId))
.order('desc')
.take(limit ?? 100);
},
});
export const listEvents = query({
args: { jobId: v.id('agentJobs'), limit: v.optional(v.number()) },
handler: async (ctx, { jobId, limit }) => {
@@ -312,6 +729,9 @@ export const claimNextInternal = internalMutation({
.query('spoonAgentSettings')
.withIndex('by_spoon', (q) => q.eq('spoonId', job.spoonId))
.first();
const aiProviderProfile = job.aiProviderProfileId
? await ctx.db.get(job.aiProviderProfileId)
: null;
const secrets = [];
for (const secretId of job.selectedSecretIds) {
const secret = await ctx.db.get(secretId);
@@ -343,6 +763,8 @@ export const claimNextInternal = internalMutation({
job: { ...job, status: 'claimed' as const, claimedBy: workerId },
spoon,
aiSettings,
aiProviderProfile:
aiProviderProfile?.ownerId === job.ownerId ? aiProviderProfile : null,
agentSettings,
secrets,
};
@@ -380,6 +802,110 @@ export const updateStatus = mutation({
if (args.error !== undefined) patch.error = args.error;
if (args.summary !== undefined) patch.summary = args.summary;
await ctx.db.patch(args.jobId, patch);
if (job.threadId) {
const threadStatus =
args.status === 'queued' || args.status === 'claimed'
? 'queued'
: args.status === 'running' || args.status === 'checks_running'
? 'running'
: args.status === 'changes_ready'
? 'changes_ready'
: args.status === 'draft_pr_opened'
? 'draft_pr_opened'
: args.status === 'failed' || args.status === 'timed_out'
? 'failed'
: args.status === 'cancelled'
? 'cancelled'
: undefined;
if (threadStatus) {
const threadPatch: Partial<Doc<'threads'>> = {
status: threadStatus,
summary: args.summary ?? job.summary,
updatedAt: now,
};
if (
['draft_pr_opened', 'failed', 'cancelled', 'timed_out'].includes(
args.status,
)
) {
threadPatch.resolvedAt = now;
}
await ctx.db.patch(job.threadId, threadPatch);
}
}
return { success: true };
},
});
export const markWorkspaceActive = mutation({
args: {
workerToken: v.string(),
workerId: v.string(),
jobId: v.id('agentJobs'),
opencodeSessionId: v.optional(v.string()),
containerId: v.optional(v.string()),
workspaceUrl: v.optional(v.string()),
workspaceExpiresAt: v.optional(v.number()),
},
handler: async (ctx, args) => {
requireWorkerToken(args.workerToken);
const job = await ctx.db.get(args.jobId);
if (job?.claimedBy !== args.workerId) {
throw new ConvexError('Agent job not claimed by this worker.');
}
const now = Date.now();
await ctx.db.patch(args.jobId, {
workspaceStatus: 'active',
opencodeSessionId: optionalText(args.opencodeSessionId),
containerId: optionalText(args.containerId),
workspaceUrl: optionalText(args.workspaceUrl),
workspaceExpiresAt: args.workspaceExpiresAt,
lastHeartbeatAt: now,
updatedAt: now,
});
return { success: true };
},
});
export const markWorkspaceStopped = mutation({
args: {
workerToken: v.string(),
workerId: v.string(),
jobId: v.id('agentJobs'),
workspaceStatus: v.optional(workspaceStatus),
},
handler: async (ctx, args) => {
requireWorkerToken(args.workerToken);
const job = await ctx.db.get(args.jobId);
if (job?.claimedBy !== args.workerId) {
throw new ConvexError('Agent job not claimed by this worker.');
}
const now = Date.now();
await ctx.db.patch(args.jobId, {
workspaceStatus: args.workspaceStatus ?? 'stopped',
updatedAt: now,
});
return { success: true };
},
});
export const heartbeatWorkspace = mutation({
args: {
workerToken: v.string(),
workerId: v.string(),
jobId: v.id('agentJobs'),
},
handler: async (ctx, args) => {
requireWorkerToken(args.workerToken);
const job = await ctx.db.get(args.jobId);
if (job?.claimedBy !== args.workerId) {
throw new ConvexError('Agent job not claimed by this worker.');
}
await ctx.db.patch(args.jobId, {
workspaceStatus: 'active',
lastHeartbeatAt: Date.now(),
updatedAt: Date.now(),
});
return { success: true };
},
});
@@ -416,6 +942,98 @@ export const completeWithDraftPr = mutation({
summary: args.summary,
updatedAt: now,
});
if (job.threadId) {
await ctx.db.patch(job.threadId, {
status: 'draft_pr_opened',
summary: args.summary,
updatedAt: now,
resolvedAt: now,
});
}
return { success: true };
},
});
export const applyMaintenanceDecision = mutation({
args: {
workerToken: v.string(),
workerId: v.string(),
jobId: v.id('agentJobs'),
decision: maintenanceDecision,
risk: maintenanceRisk,
summary: v.string(),
ignoredCommitShas: v.array(v.string()),
ignoredReason: v.optional(v.string()),
recommendedAction: v.string(),
requiresUserApproval: v.boolean(),
},
handler: async (ctx, args) => {
requireWorkerToken(args.workerToken);
const job = await ctx.db.get(args.jobId);
if (job?.claimedBy !== args.workerId) {
throw new ConvexError('Agent job not claimed by this worker.');
}
if (!job.threadId) return { success: true };
const now = Date.now();
const outcome =
args.decision === 'sync'
? 'sync_recommended'
: args.decision === 'ignore'
? 'ignored'
: args.decision === 'open_review_pr'
? 'review_pr_recommended'
: args.decision === 'conflict_resolution'
? 'conflict_resolution_required'
: args.decision === 'manual_review'
? 'manual_review_required'
: 'unknown';
const status =
args.decision === 'ignore'
? 'ignored'
: args.decision === 'sync' && !args.requiresUserApproval
? 'resolved'
: 'waiting_for_user';
const threadPatch: Partial<Doc<'threads'>> = {
status,
maintenanceOutcome: outcome,
summary: args.summary,
ignoredCommitShas: args.ignoredCommitShas,
ignoredReason: args.ignoredReason,
updatedAt: now,
};
if (status === 'ignored' || status === 'resolved') {
threadPatch.resolvedAt = now;
}
await ctx.db.patch(job.threadId, threadPatch);
if (args.decision === 'ignore' && args.ignoredCommitShas.length > 0) {
const thread = await ctx.db.get(job.threadId);
await ctx.db.insert('ignoredUpstreamChanges', {
spoonId: job.spoonId,
ownerId: job.ownerId,
upstreamFrom: thread?.upstreamFrom,
upstreamTo: thread?.upstreamTo ?? job.upstreamRepo,
commitShas: args.ignoredCommitShas,
reason: args.ignoredReason ?? args.summary,
decidedBy: 'agent',
threadId: job.threadId,
createdAt: now,
});
}
await ctx.db.insert('threadMessages', {
threadId: job.threadId,
ownerId: job.ownerId,
spoonId: job.spoonId,
role: 'assistant',
content: args.summary,
status: 'completed',
metadata: JSON.stringify({
decision: args.decision,
risk: args.risk,
recommendedAction: args.recommendedAction,
}),
createdAt: now,
updatedAt: now,
});
return { success: true };
},
});
@@ -449,6 +1067,108 @@ export const appendEvent = mutation({
},
});
export const appendMessage = mutation({
args: {
workerToken: v.string(),
workerId: v.string(),
jobId: v.id('agentJobs'),
role: messageRole,
content: v.string(),
status: messageStatus,
metadata: v.optional(v.string()),
},
handler: async (ctx, args) => {
requireWorkerToken(args.workerToken);
const job = await ctx.db.get(args.jobId);
if (job?.claimedBy !== args.workerId) {
throw new ConvexError('Agent job not claimed by this worker.');
}
const now = Date.now();
const messageId = await ctx.db.insert('agentJobMessages', {
jobId: args.jobId,
spoonId: job.spoonId,
ownerId: job.ownerId,
role: args.role,
content: args.content,
status: args.status,
metadata: args.metadata,
createdAt: now,
updatedAt: now,
});
if (job.threadId) {
await ctx.db.insert('threadMessages', {
threadId: job.threadId,
spoonId: job.spoonId,
ownerId: job.ownerId,
role: args.role,
content: args.content,
status: args.status,
metadata: args.metadata,
createdAt: now,
updatedAt: now,
});
}
return messageId;
},
});
export const updateMessage = mutation({
args: {
workerToken: v.string(),
workerId: v.string(),
messageId: v.id('agentJobMessages'),
content: v.optional(v.string()),
status: v.optional(messageStatus),
metadata: v.optional(v.string()),
},
handler: async (ctx, args) => {
requireWorkerToken(args.workerToken);
const message = await ctx.db.get(args.messageId);
if (!message) throw new ConvexError('Agent message not found.');
const job = await ctx.db.get(message.jobId);
if (job?.claimedBy !== args.workerId) {
throw new ConvexError('Agent job not claimed by this worker.');
}
const patch: Partial<Doc<'agentJobMessages'>> = {
updatedAt: Date.now(),
};
if (args.content !== undefined) patch.content = args.content;
if (args.status !== undefined) patch.status = args.status;
if (args.metadata !== undefined) patch.metadata = args.metadata;
await ctx.db.patch(args.messageId, patch);
return { success: true };
},
});
export const recordWorkspaceChange = mutation({
args: {
workerToken: v.string(),
workerId: v.string(),
jobId: v.id('agentJobs'),
path: v.string(),
source: changeSource,
changeType,
diff: v.optional(v.string()),
},
handler: async (ctx, args) => {
requireWorkerToken(args.workerToken);
const job = await ctx.db.get(args.jobId);
if (job?.claimedBy !== args.workerId) {
throw new ConvexError('Agent job not claimed by this worker.');
}
return await ctx.db.insert('agentWorkspaceChanges', {
jobId: args.jobId,
spoonId: job.spoonId,
ownerId: job.ownerId,
path: args.path,
source: args.source,
changeType: args.changeType,
diff: args.diff,
createdAt: Date.now(),
});
},
});
export const addArtifact = mutation({
args: {
workerToken: v.string(),
+35 -6
View File
@@ -11,6 +11,7 @@ type ClaimedJob = {
job: Doc<'agentJobs'>;
spoon: Doc<'spoons'> | null;
aiSettings: Doc<'userAiSettings'> | null;
aiProviderProfile: Doc<'aiProviderProfiles'> | null;
agentSettings: Doc<'spoonAgentSettings'> | null;
secrets: Doc<'spoonSecrets'>[];
};
@@ -19,10 +20,20 @@ type WorkerClaim = {
job: Doc<'agentJobs'>;
spoon: Doc<'spoons'>;
openai: {
apiKey: string;
apiKey?: string;
model: string;
reasoningEffort: Doc<'agentJobs'>['reasoningEffort'];
};
aiProviderProfile?: {
id: string;
name: string;
provider: Doc<'aiProviderProfiles'>['provider'];
authType: Doc<'aiProviderProfiles'>['authType'];
secret?: string;
baseUrl?: string;
model: string;
reasoningEffort: Doc<'aiProviderProfiles'>['reasoningEffort'];
};
agentSettings: Doc<'spoonAgentSettings'> | null;
github: {
installationId?: string;
@@ -53,18 +64,36 @@ export const claimNextForWorker = action({
if (!claimed.spoon) {
throw new ConvexError('Claimed job points at a missing Spoon.');
}
if (!claimed.aiSettings?.encryptedApiKey) {
if (!claimed.aiProviderProfile) {
throw new ConvexError(
'OpenAI is not configured for this user. Add an OpenAI API key in settings.',
'AI is not configured for this user. Add an AI provider in settings.',
);
}
if (
claimed.aiProviderProfile.authType !== 'none' &&
!claimed.aiProviderProfile.encryptedSecret
) {
throw new ConvexError('Selected AI provider is missing credentials.');
}
const profile = claimed.aiProviderProfile;
return {
job: claimed.job,
spoon: claimed.spoon,
openai: {
apiKey: decryptSecret(claimed.aiSettings.encryptedApiKey),
model: claimed.job.model,
reasoningEffort: claimed.job.reasoningEffort,
model: profile.defaultModel,
reasoningEffort: profile.reasoningEffort,
},
aiProviderProfile: {
id: profile._id,
name: profile.name,
provider: profile.provider,
authType: profile.authType,
secret: profile.encryptedSecret
? decryptSecret(profile.encryptedSecret)
: undefined,
baseUrl: profile.baseUrl,
model: profile.defaultModel,
reasoningEffort: profile.reasoningEffort,
},
agentSettings: claimed.agentSettings,
github: {
@@ -0,0 +1,273 @@
import { ConvexError, v } from 'convex/values';
import type { Doc, Id } from './_generated/dataModel';
import type { MutationCtx } from './_generated/server';
import { internalMutation, mutation, query } from './_generated/server';
import { getRequiredUserId, optionalText } from './model';
type AiProviderProfileWithDefault = Doc<'aiProviderProfiles'> & {
isDefault?: boolean;
};
const provider = v.union(
v.literal('openai'),
v.literal('anthropic'),
v.literal('google'),
v.literal('openrouter'),
v.literal('requesty'),
v.literal('litellm'),
v.literal('cloudflare_ai_gateway'),
v.literal('custom_openai_compatible'),
v.literal('opencode_openai_login'),
);
const authType = v.union(
v.literal('api_key'),
v.literal('opencode_auth_json'),
v.literal('none'),
);
const reasoningEffort = v.union(
v.literal('none'),
v.literal('minimal'),
v.literal('low'),
v.literal('medium'),
v.literal('high'),
v.literal('xhigh'),
);
const isConfigured = (profile: Doc<'aiProviderProfiles'>) =>
profile.authType === 'none' || Boolean(profile.encryptedSecret);
const defaultPatch = (isDefault: boolean) =>
({ isDefault }) as Partial<Doc<'aiProviderProfiles'>>;
const publicProfile = (
profile: AiProviderProfileWithDefault,
defaultProfileId?: Id<'aiProviderProfiles'>,
) => ({
_id: profile._id,
_creationTime: profile._creationTime,
name: profile.name,
provider: profile.provider,
authType: profile.authType,
secretPreview: profile.secretPreview,
baseUrl: profile.baseUrl,
defaultModel: profile.defaultModel,
modelOptions: profile.modelOptions,
reasoningEffort: profile.reasoningEffort,
enabled: profile.enabled,
configured: isConfigured(profile),
isDefault: profile._id === defaultProfileId,
createdAt: profile.createdAt,
updatedAt: profile.updatedAt,
});
const requireOwnedProfile = async (
ctx: MutationCtx,
profileId: Id<'aiProviderProfiles'>,
ownerId: Id<'users'>,
) => {
const profile = await ctx.db.get(profileId);
if (profile?.ownerId !== ownerId) {
throw new ConvexError('AI provider profile not found.');
}
return profile;
};
export const listMine = query({
args: {},
handler: async (ctx) => {
const ownerId = await getRequiredUserId(ctx);
const profiles = await ctx.db
.query('aiProviderProfiles')
.withIndex('by_owner', (q) => q.eq('ownerId', ownerId))
.order('desc')
.collect();
const configuredProfiles = profiles.filter(
(profile) => profile.enabled && isConfigured(profile),
);
const explicitDefault = configuredProfiles.find(
(profile) => (profile as AiProviderProfileWithDefault).isDefault,
);
const defaultProfileId =
explicitDefault?._id ??
(configuredProfiles.length === 1
? configuredProfiles[0]?._id
: undefined);
return profiles.map((profile) => publicProfile(profile, defaultProfileId));
},
});
export const get = query({
args: { profileId: v.id('aiProviderProfiles') },
handler: async (ctx, { profileId }) => {
const ownerId = await getRequiredUserId(ctx);
const profile = await ctx.db.get(profileId);
if (profile?.ownerId !== ownerId) {
throw new ConvexError('AI provider profile not found.');
}
const profiles = await ctx.db
.query('aiProviderProfiles')
.withIndex('by_owner', (q) => q.eq('ownerId', ownerId))
.collect();
const configuredProfiles = profiles.filter(
(item) => item.enabled && isConfigured(item),
);
const explicitDefault = configuredProfiles.find(
(item) => (item as AiProviderProfileWithDefault).isDefault,
);
const defaultProfileId =
explicitDefault?._id ??
(configuredProfiles.length === 1
? configuredProfiles[0]?._id
: undefined);
return publicProfile(profile, defaultProfileId);
},
});
export const upsertEncryptedInternal = internalMutation({
args: {
ownerId: v.id('users'),
profileId: v.optional(v.id('aiProviderProfiles')),
name: v.string(),
provider,
authType,
encryptedSecret: v.optional(v.string()),
secretPreview: v.optional(v.string()),
baseUrl: v.optional(v.string()),
defaultModel: v.string(),
modelOptions: v.optional(v.array(v.string())),
reasoningEffort,
enabled: v.boolean(),
},
handler: async (ctx, args) => {
const now = Date.now();
const patch: Partial<Doc<'aiProviderProfiles'>> = {
name: args.name.trim() || 'AI provider',
provider: args.provider,
authType: args.authType,
baseUrl: optionalText(args.baseUrl),
defaultModel: args.defaultModel.trim() || 'gpt-5.5',
modelOptions: args.modelOptions
?.map((model) => model.trim())
.filter(Boolean),
reasoningEffort: args.reasoningEffort,
enabled: args.enabled,
updatedAt: now,
};
if (args.encryptedSecret !== undefined) {
patch.encryptedSecret = args.encryptedSecret;
patch.secretPreview = args.secretPreview;
}
if (args.profileId) {
await requireOwnedProfile(ctx, args.profileId, args.ownerId);
await ctx.db.patch(args.profileId, patch);
return args.profileId;
}
const existingProfiles = await ctx.db
.query('aiProviderProfiles')
.withIndex('by_owner', (q) => q.eq('ownerId', args.ownerId))
.collect();
const shouldBecomeDefault =
args.enabled &&
(args.authType === 'none' || Boolean(args.encryptedSecret)) &&
existingProfiles.filter(
(profile) => profile.enabled && isConfigured(profile),
).length === 0;
const profileId = await ctx.db.insert('aiProviderProfiles', {
ownerId: args.ownerId,
name: patch.name ?? 'AI provider',
provider: args.provider,
authType: args.authType,
encryptedSecret: args.encryptedSecret,
secretPreview: args.secretPreview,
baseUrl: optionalText(args.baseUrl),
defaultModel: patch.defaultModel ?? 'gpt-5.5',
modelOptions: patch.modelOptions,
reasoningEffort: args.reasoningEffort,
enabled: args.enabled,
createdAt: now,
updatedAt: now,
});
if (shouldBecomeDefault) {
await ctx.db.patch(profileId, defaultPatch(true));
}
return profileId;
},
});
export const updateMetadata = mutation({
args: {
profileId: v.id('aiProviderProfiles'),
name: v.string(),
baseUrl: v.optional(v.string()),
defaultModel: v.string(),
modelOptions: v.optional(v.array(v.string())),
reasoningEffort,
enabled: v.boolean(),
},
handler: async (ctx, args) => {
const ownerId = await getRequiredUserId(ctx);
await requireOwnedProfile(ctx, args.profileId, ownerId);
await ctx.db.patch(args.profileId, {
name: args.name.trim() || 'AI provider',
baseUrl: optionalText(args.baseUrl),
defaultModel: args.defaultModel.trim() || 'gpt-5.5',
modelOptions: args.modelOptions
?.map((model) => model.trim())
.filter(Boolean),
reasoningEffort: args.reasoningEffort,
enabled: args.enabled,
updatedAt: Date.now(),
});
return { success: true };
},
});
export const remove = mutation({
args: { profileId: v.id('aiProviderProfiles') },
handler: async (ctx, { profileId }) => {
const ownerId = await getRequiredUserId(ctx);
const profile = (await requireOwnedProfile(
ctx,
profileId,
ownerId,
)) as AiProviderProfileWithDefault;
await ctx.db.delete(profileId);
if (profile.isDefault) {
const remaining = await ctx.db
.query('aiProviderProfiles')
.withIndex('by_owner', (q) => q.eq('ownerId', ownerId))
.collect();
const nextDefault = remaining.find(
(item) => item.enabled && isConfigured(item),
);
if (nextDefault) {
await ctx.db.patch(nextDefault._id, defaultPatch(true));
}
}
return { success: true };
},
});
export const setDefault = mutation({
args: { profileId: v.id('aiProviderProfiles') },
handler: async (ctx, { profileId }) => {
const ownerId = await getRequiredUserId(ctx);
const target = await requireOwnedProfile(ctx, profileId, ownerId);
if (!target.enabled || !isConfigured(target)) {
throw new ConvexError('Default provider must be enabled and configured.');
}
const profiles = await ctx.db
.query('aiProviderProfiles')
.withIndex('by_owner', (q) => q.eq('ownerId', ownerId))
.collect();
await Promise.all(
profiles.map((profile) =>
ctx.db.patch(profile._id, defaultPatch(profile._id === profileId)),
),
);
return { success: true };
},
});
@@ -0,0 +1,90 @@
'use node';
import { getAuthUserId } from '@convex-dev/auth/server';
import { ConvexError, v } from 'convex/values';
import type { Id } from './_generated/dataModel';
import type { ActionCtx } from './_generated/server';
import { internal } from './_generated/api';
import { action } from './_generated/server';
import { encryptSecret } from './secretCrypto';
const provider = v.union(
v.literal('openai'),
v.literal('anthropic'),
v.literal('google'),
v.literal('openrouter'),
v.literal('requesty'),
v.literal('litellm'),
v.literal('cloudflare_ai_gateway'),
v.literal('custom_openai_compatible'),
v.literal('opencode_openai_login'),
);
const authType = v.union(
v.literal('api_key'),
v.literal('opencode_auth_json'),
v.literal('none'),
);
const reasoningEffort = v.union(
v.literal('none'),
v.literal('minimal'),
v.literal('low'),
v.literal('medium'),
v.literal('high'),
v.literal('xhigh'),
);
const getRequiredUserId = async (ctx: ActionCtx): Promise<Id<'users'>> => {
const userId = await getAuthUserId(ctx);
if (!userId) throw new ConvexError('Not authenticated.');
return userId;
};
const previewSecret = (secret: string) => {
const trimmed = secret.trim();
if (!trimmed) return undefined;
if (trimmed.startsWith('{')) return 'auth json configured';
if (trimmed.length <= 10) return 'configured';
return `${trimmed.slice(0, 7)}...${trimmed.slice(-4)}`;
};
export const save = action({
args: {
profileId: v.optional(v.id('aiProviderProfiles')),
name: v.string(),
provider,
authType,
secret: v.optional(v.string()),
baseUrl: v.optional(v.string()),
defaultModel: v.string(),
modelOptions: v.optional(v.array(v.string())),
reasoningEffort,
enabled: v.boolean(),
},
handler: async (ctx, args): Promise<Id<'aiProviderProfiles'>> => {
const ownerId = await getRequiredUserId(ctx);
const secret = args.secret?.trim();
if (!args.profileId && args.authType !== 'none' && !secret) {
throw new ConvexError('A credential is required for this provider.');
}
return await ctx.runMutation(
internal.aiProviderProfiles.upsertEncryptedInternal,
{
ownerId,
profileId: args.profileId,
name: args.name,
provider: args.provider,
authType: args.authType,
encryptedSecret: secret ? encryptSecret(secret) : undefined,
secretPreview: secret ? previewSecret(secret) : undefined,
baseUrl: args.baseUrl,
defaultModel: args.defaultModel,
modelOptions: args.modelOptions,
reasoningEffort: args.reasoningEffort,
enabled: args.enabled,
},
);
},
});
-201
View File
@@ -1,201 +0,0 @@
'use node';
import { getAuthUserId } from '@convex-dev/auth/server';
import { ConvexError, v } from 'convex/values';
import type { Doc, Id } from './_generated/dataModel';
import type { ActionCtx } from './_generated/server';
import { internal } from './_generated/api';
import { action } from './_generated/server';
import { reviewUpstreamCompatibility } from './openaiClient';
import { decryptSecret } from './secretCrypto';
const getRequiredUserId = async (ctx: ActionCtx): Promise<Id<'users'>> => {
const userId = await getAuthUserId(ctx);
if (!userId) throw new ConvexError('Not authenticated.');
return userId;
};
export const reviewLatestUpstreamChanges = action({
args: { spoonId: v.id('spoons') },
handler: async (
ctx,
{ spoonId },
): Promise<{
reviewId: Id<'aiReviews'>;
risk: 'low' | 'medium' | 'high';
recommendedAction:
| 'sync'
| 'open_review_pr'
| 'manual_review'
| 'do_not_sync';
}> => {
const ownerId = await getRequiredUserId(ctx);
const spoon: Doc<'spoons'> = await ctx.runQuery(
internal.spoons.getOwnedForAction,
{
spoonId,
ownerId,
},
);
const [state, settings, upstreamCommits, forkCommits]: [
Doc<'spoonRepositoryStates'> | null,
Doc<'spoonSettings'> | null,
Doc<'spoonCommits'>[],
Doc<'spoonCommits'>[],
] = await Promise.all([
ctx.runQuery(internal.spoonState.getInternal, { spoonId, ownerId }),
ctx.runQuery(internal.spoonSettings.getInternal, { spoonId, ownerId }),
ctx.runQuery(internal.spoonCommits.listInternal, {
spoonId,
ownerId,
side: 'upstream',
limit: 80,
}),
ctx.runQuery(internal.spoonCommits.listInternal, {
spoonId,
ownerId,
side: 'fork',
limit: 80,
}),
]);
const aiSettings: Doc<'userAiSettings'> | null = await ctx.runQuery(
internal.aiSettings.getForUserInternal,
{ userId: ownerId },
);
if (!aiSettings?.encryptedApiKey) {
throw new ConvexError(
'Add your OpenAI API key in Settings before running AI review.',
);
}
const model = aiSettings.model;
const syncRunId: Id<'syncRuns'> = await ctx.runMutation(
internal.syncRuns.createInternal,
{
spoonId,
ownerId,
kind: 'ai_review',
status: 'running',
summary: 'Reviewing upstream changes with OpenAI.',
},
);
const reviewId: Id<'aiReviews'> = await ctx.runMutation(
internal.aiReviews.createInternal,
{
spoonId,
ownerId,
syncRunId,
model,
status: 'running',
reviewType: 'upstream_update',
inputSummary: `${upstreamCommits.length} upstream commit(s), ${forkCommits.length} fork-only commit(s).`,
},
);
try {
if (upstreamCommits.length === 0) {
await ctx.runMutation(internal.aiReviews.completeInternal, {
reviewId,
outputSummary: 'The fork is already up to date with upstream.',
risk: 'low',
compatible: true,
requiresHumanReview: false,
recommendedAction: 'sync',
potentialConflicts: [],
importantFiles: [],
reasoningSummary:
'No upstream-only commits are cached for this Spoon, so there is nothing to review.',
});
await Promise.all([
ctx.runMutation(internal.spoons.patchSyncFields, {
spoonId,
lastAiReviewId: reviewId,
}),
ctx.runMutation(internal.syncRuns.patchInternal, {
syncRunId,
status: 'clean',
aiAssessment: 'No upstream-only commits are waiting.',
}),
]);
return { reviewId, risk: 'low', recommendedAction: 'sync' };
}
const review = await reviewUpstreamCompatibility(
{
spoonName: spoon.name,
upstreamFullName:
state?.upstreamFullName ??
`${spoon.upstreamOwner}/${spoon.upstreamRepo}`,
forkFullName:
state?.forkFullName ??
`${spoon.forkOwner ?? 'unknown'}/${spoon.forkRepo ?? 'unknown'}`,
status: state?.status ?? spoon.syncStatus ?? 'unknown',
upstreamAheadBy: state?.upstreamAheadBy ?? spoon.upstreamAheadBy ?? 0,
forkAheadBy: state?.forkAheadBy ?? spoon.forkAheadBy ?? 0,
upstreamCommits: upstreamCommits.map((commit) => ({
sha: commit.sha,
message: commit.message,
authorName: commit.authorName,
committedAt: commit.committedAt,
})),
forkCommits: forkCommits.map((commit) => ({
sha: commit.sha,
message: commit.message,
authorName: commit.authorName,
committedAt: commit.committedAt,
})),
importantFilePatterns: settings?.importantFilePatterns,
ignoredFilePatterns: settings?.ignoredFilePatterns,
},
{
apiKey: decryptSecret(aiSettings.encryptedApiKey),
model,
reasoningEffort: aiSettings.reasoningEffort,
},
);
await ctx.runMutation(internal.aiReviews.completeInternal, {
reviewId,
outputSummary: review.summary,
risk: review.risk,
compatible: review.compatible,
requiresHumanReview: review.requiresHumanReview,
recommendedAction: review.recommendedAction,
potentialConflicts: review.potentialConflicts,
importantFiles: review.importantFiles,
reasoningSummary: review.reasoningSummary,
});
await Promise.all([
ctx.runMutation(internal.spoons.patchSyncFields, {
spoonId,
lastAiReviewId: reviewId,
}),
ctx.runMutation(internal.syncRuns.patchInternal, {
syncRunId,
status:
review.compatible && review.risk === 'low'
? 'clean'
: 'needs_review',
aiAssessment: review.summary,
}),
]);
return {
reviewId,
risk: review.risk,
recommendedAction: review.recommendedAction,
};
} catch (error) {
const message = error instanceof Error ? error.message : String(error);
await Promise.all([
ctx.runMutation(internal.aiReviews.failInternal, {
reviewId,
error: message,
}),
ctx.runMutation(internal.syncRuns.patchInternal, {
syncRunId,
status: 'failed',
error: message,
}),
]);
throw new ConvexError(message);
}
},
});
+108
View File
@@ -50,6 +50,7 @@ const refreshOwnedSpoon = async (
ownerId: Id<'users'>,
spoonId: Id<'spoons'>,
kind: 'manual_check' | 'scheduled_check' = 'manual_check',
allowAutoSync = true,
): Promise<{
success: boolean;
status: ReturnType<typeof toStatus>;
@@ -200,6 +201,87 @@ const refreshOwnedSpoon = async (
status: status === 'diverged' ? 'needs_review' : 'clean',
summary: `GitHub refresh complete: ${upstreamCompare.aheadBy} upstream commit(s), ${forkCompare.aheadBy} fork-only commit(s).`,
});
if (status === 'behind' && forkCompare.aheadBy === 0 && allowAutoSync) {
try {
await syncForkBranch(octokit, {
forkOwner,
forkRepo,
branch: resolvedForkBranch,
});
await ctx.runMutation(internal.syncRuns.patchInternal, {
syncRunId,
status: 'merged',
decision: 'auto_synced',
summary:
'Fork had no custom commits, so Spoon synced it with upstream automatically.',
});
return await refreshOwnedSpoon(ctx, ownerId, spoonId, kind, false);
} catch (syncError) {
const message =
syncError instanceof Error ? syncError.message : String(syncError);
const threadId = await ctx.runMutation(
internal.threads.createMaintenanceThread,
{
spoonId,
ownerId,
source: 'merge_conflict',
title: `Resolve upstream sync conflict for ${spoon.name}`,
summary: `GitHub refused the automatic upstream sync: ${message}`,
upstreamFrom: upstreamCompare.mergeBaseSha,
upstreamTo: upstreamCompare.headSha ?? `${Date.now()}`,
forkHeadAtCreation: forkCompare.headSha,
mergeBaseAtCreation:
upstreamCompare.mergeBaseSha ?? forkCompare.mergeBaseSha,
relatedSyncRunId: syncRunId,
jobType: 'conflict_resolution',
},
);
await ctx.runMutation(internal.syncRuns.patchInternal, {
syncRunId,
threadId,
status: 'conflict',
decision: 'thread_created',
error: message,
});
await ctx.runMutation(internal.spoons.patchSyncFields, {
spoonId,
syncStatus: 'conflict',
lastError: message,
});
return {
success: true,
status: 'unknown' as const,
upstreamAheadBy: upstreamCompare.aheadBy,
forkAheadBy: forkCompare.aheadBy,
};
}
}
if (status === 'diverged') {
const threadId = await ctx.runMutation(
internal.threads.createMaintenanceThread,
{
spoonId,
ownerId,
source: 'upstream_update',
title: `Review upstream changes for ${spoon.name}`,
summary: `Upstream has ${upstreamCompare.aheadBy} commit(s) and the fork has ${forkCompare.aheadBy} custom commit(s). Review whether upstream should be merged, ignored, or resolved in a draft PR.`,
upstreamFrom: upstreamCompare.mergeBaseSha,
upstreamTo: upstreamCompare.headSha ?? `${Date.now()}`,
forkHeadAtCreation: forkCompare.headSha,
mergeBaseAtCreation:
upstreamCompare.mergeBaseSha ?? forkCompare.mergeBaseSha,
relatedSyncRunId: syncRunId,
jobType: 'maintenance_review',
},
);
await ctx.runMutation(internal.syncRuns.patchInternal, {
syncRunId,
threadId,
decision: 'thread_created',
});
}
return {
success: true,
status,
@@ -301,6 +383,32 @@ export const syncForkWithUpstream = action({
} catch (error) {
const message = error instanceof Error ? error.message : String(error);
const conflict = message.toLowerCase().includes('conflict');
if (conflict) {
const threadId = await ctx.runMutation(
internal.threads.createMaintenanceThread,
{
spoonId,
ownerId,
source: 'merge_conflict',
title: `Resolve upstream sync conflict for ${spoon.name}`,
summary: `GitHub reported a conflict while syncing upstream into this fork: ${message}`,
upstreamTo:
state.upstreamHeadSha ??
spoon.lastUpstreamCommit ??
`${Date.now()}`,
forkHeadAtCreation: state.forkHeadSha ?? spoon.lastForkCommit,
mergeBaseAtCreation:
state.mergeBaseSha ?? spoon.lastMergeBaseCommit,
relatedSyncRunId: syncRunId,
jobType: 'conflict_resolution',
},
);
await ctx.runMutation(internal.syncRuns.patchInternal, {
syncRunId,
threadId,
decision: 'thread_created',
});
}
await ctx.runMutation(internal.syncRuns.patchInternal, {
syncRunId,
status: conflict ? 'conflict' : 'failed',
-160
View File
@@ -1,160 +0,0 @@
import { ConvexError } from 'convex/values';
import OpenAI from 'openai';
type ReviewRisk = 'low' | 'medium' | 'high';
type ReviewAction = 'sync' | 'open_review_pr' | 'manual_review' | 'do_not_sync';
export type AiCompatibilityReview = {
summary: string;
risk: ReviewRisk;
compatible: boolean;
requiresHumanReview: boolean;
recommendedAction: ReviewAction;
potentialConflicts: string[];
importantFiles: string[];
reasoningSummary: string;
};
export type ReviewInput = {
spoonName: string;
upstreamFullName: string;
forkFullName: string;
status: string;
upstreamAheadBy: number;
forkAheadBy: number;
upstreamCommits: {
sha: string;
message: string;
authorName?: string;
committedAt?: number;
}[];
forkCommits: {
sha: string;
message: string;
authorName?: string;
committedAt?: number;
}[];
importantFilePatterns?: string[];
ignoredFilePatterns?: string[];
};
export type ReasoningEffort =
| 'none'
| 'minimal'
| 'low'
| 'medium'
| 'high'
| 'xhigh';
export type OpenAiReviewSettings = {
apiKey: string;
model: string;
reasoningEffort: ReasoningEffort;
};
const reviewSchema = {
type: 'object',
additionalProperties: false,
properties: {
summary: { type: 'string' },
risk: { type: 'string', enum: ['low', 'medium', 'high'] },
compatible: { type: 'boolean' },
requiresHumanReview: { type: 'boolean' },
recommendedAction: {
type: 'string',
enum: ['sync', 'open_review_pr', 'manual_review', 'do_not_sync'],
},
potentialConflicts: { type: 'array', items: { type: 'string' } },
importantFiles: { type: 'array', items: { type: 'string' } },
reasoningSummary: { type: 'string' },
},
required: [
'summary',
'risk',
'compatible',
'requiresHumanReview',
'recommendedAction',
'potentialConflicts',
'importantFiles',
'reasoningSummary',
],
} as const;
const isReviewRisk = (value: unknown): value is ReviewRisk =>
value === 'low' || value === 'medium' || value === 'high';
const isReviewAction = (value: unknown): value is ReviewAction =>
value === 'sync' ||
value === 'open_review_pr' ||
value === 'manual_review' ||
value === 'do_not_sync';
const validateReview = (value: unknown): AiCompatibilityReview => {
if (!value || typeof value !== 'object') {
throw new ConvexError('OpenAI returned an invalid review payload.');
}
const record = value as Record<string, unknown>;
if (
typeof record.summary !== 'string' ||
!isReviewRisk(record.risk) ||
typeof record.compatible !== 'boolean' ||
typeof record.requiresHumanReview !== 'boolean' ||
!isReviewAction(record.recommendedAction) ||
!Array.isArray(record.potentialConflicts) ||
!Array.isArray(record.importantFiles) ||
typeof record.reasoningSummary !== 'string'
) {
throw new ConvexError('OpenAI review did not match the expected schema.');
}
return {
summary: record.summary,
risk: record.risk,
compatible: record.compatible,
requiresHumanReview: record.requiresHumanReview,
recommendedAction: record.recommendedAction,
potentialConflicts: record.potentialConflicts.filter(
(item): item is string => typeof item === 'string',
),
importantFiles: record.importantFiles.filter(
(item): item is string => typeof item === 'string',
),
reasoningSummary: record.reasoningSummary,
};
};
export const reviewUpstreamCompatibility = async (
input: ReviewInput,
settings: OpenAiReviewSettings,
): Promise<AiCompatibilityReview> => {
const response = await new OpenAI({
apiKey: settings.apiKey,
}).responses.create({
model: settings.model,
store: false,
reasoning: {
effort: settings.reasoningEffort,
},
input: [
{
role: 'system',
content:
'You are reviewing whether upstream changes can be safely brought into a maintained fork. You do not execute code. You do not claim tests passed. Treat fork-only commits as user customizations that must be preserved. If changed files overlap with fork-only changes, increase risk. If patch context is incomplete, say so and require human review. Prefer conservative recommendations. Return only the required structured output.',
},
{
role: 'user',
content: JSON.stringify(input, null, 2),
},
],
text: {
format: {
type: 'json_schema',
name: 'spoon_upstream_compatibility_review',
strict: true,
schema: reviewSchema,
},
},
});
const raw = response.output_text;
if (!raw) throw new ConvexError('OpenAI returned an empty review.');
return validateReview(JSON.parse(raw));
};
+232
View File
@@ -219,6 +219,7 @@ const applicationTables = {
syncRuns: defineTable({
spoonId: v.id('spoons'),
ownerId: v.id('users'),
threadId: v.optional(v.id('threads')),
kind: v.union(
v.literal('scheduled_check'),
v.literal('manual_check'),
@@ -241,6 +242,14 @@ const applicationTables = {
aiAssessment: v.optional(v.string()),
mergeRequestUrl: v.optional(v.string()),
error: v.optional(v.string()),
decision: v.optional(
v.union(
v.literal('auto_synced'),
v.literal('thread_created'),
v.literal('ignored'),
v.literal('failed'),
),
),
createdAt: v.number(),
updatedAt: v.number(),
})
@@ -339,6 +348,46 @@ const applicationTables = {
})
.index('by_user', ['userId'])
.index('by_user_provider', ['userId', 'provider']),
aiProviderProfiles: defineTable({
ownerId: v.id('users'),
name: v.string(),
provider: v.union(
v.literal('openai'),
v.literal('anthropic'),
v.literal('google'),
v.literal('openrouter'),
v.literal('requesty'),
v.literal('litellm'),
v.literal('cloudflare_ai_gateway'),
v.literal('custom_openai_compatible'),
v.literal('opencode_openai_login'),
),
authType: v.union(
v.literal('api_key'),
v.literal('opencode_auth_json'),
v.literal('none'),
),
encryptedSecret: v.optional(v.string()),
secretPreview: v.optional(v.string()),
baseUrl: v.optional(v.string()),
defaultModel: v.string(),
modelOptions: v.optional(v.array(v.string())),
reasoningEffort: v.union(
v.literal('none'),
v.literal('minimal'),
v.literal('low'),
v.literal('medium'),
v.literal('high'),
v.literal('xhigh'),
),
enabled: v.boolean(),
isDefault: v.optional(v.boolean()),
createdAt: v.number(),
updatedAt: v.number(),
})
.index('by_owner', ['ownerId'])
.index('by_owner_provider', ['ownerId', 'provider'])
.index('by_owner_enabled', ['ownerId', 'enabled']),
agentRequests: defineTable({
spoonId: v.id('spoons'),
ownerId: v.id('users'),
@@ -395,6 +444,9 @@ const applicationTables = {
spoonId: v.id('spoons'),
ownerId: v.id('users'),
enabled: v.boolean(),
runtime: v.optional(
v.union(v.literal('opencode'), v.literal('openai_direct')),
),
defaultBaseBranch: v.optional(v.string()),
branchPrefix: v.string(),
installCommand: v.optional(v.string()),
@@ -411,6 +463,20 @@ const applicationTables = {
),
maxJobDurationMs: v.number(),
maxOutputBytes: v.number(),
envFilePath: v.optional(
v.union(
v.literal('.env'),
v.literal('.env.local'),
v.literal('.env.production'),
v.literal('.env.production.local'),
v.literal('custom'),
),
),
customEnvFilePath: v.optional(v.string()),
materializeEnvFileByDefault: v.optional(v.boolean()),
autoDetectCommands: v.optional(v.boolean()),
allowUserFileEditing: v.optional(v.boolean()),
aiProviderProfileId: v.optional(v.id('aiProviderProfiles')),
createdAt: v.number(),
updatedAt: v.number(),
})
@@ -420,6 +486,14 @@ const applicationTables = {
spoonId: v.id('spoons'),
ownerId: v.id('users'),
agentRequestId: v.id('agentRequests'),
threadId: v.optional(v.id('threads')),
jobType: v.optional(
v.union(
v.literal('user_change'),
v.literal('maintenance_review'),
v.literal('conflict_resolution'),
),
),
status: v.union(
v.literal('queued'),
v.literal('claimed'),
@@ -433,8 +507,29 @@ const applicationTables = {
v.literal('timed_out'),
),
prompt: v.string(),
runtime: v.optional(
v.union(v.literal('openai_direct'), v.literal('opencode')),
),
workspaceStatus: v.optional(
v.union(
v.literal('not_started'),
v.literal('starting'),
v.literal('active'),
v.literal('idle'),
v.literal('stopped'),
v.literal('expired'),
v.literal('failed'),
),
),
baseBranch: v.string(),
workBranch: v.string(),
opencodeSessionId: v.optional(v.string()),
containerId: v.optional(v.string()),
workspaceUrl: v.optional(v.string()),
workspaceExpiresAt: v.optional(v.number()),
lastHeartbeatAt: v.optional(v.number()),
envFilePath: v.optional(v.string()),
materializeEnvFile: v.optional(v.boolean()),
githubInstallationId: v.optional(v.string()),
forkOwner: v.string(),
forkRepo: v.string(),
@@ -442,6 +537,7 @@ const applicationTables = {
upstreamOwner: v.string(),
upstreamRepo: v.string(),
selectedSecretIds: v.array(v.id('spoonSecrets')),
aiProviderProfileId: v.optional(v.id('aiProviderProfiles')),
model: v.string(),
reasoningEffort: v.union(
v.literal('none'),
@@ -468,6 +564,50 @@ const applicationTables = {
.index('by_request', ['agentRequestId'])
.index('by_status', ['status'])
.index('by_claim', ['status', 'createdAt']),
agentJobMessages: defineTable({
jobId: v.id('agentJobs'),
spoonId: v.id('spoons'),
ownerId: v.id('users'),
role: v.union(
v.literal('user'),
v.literal('assistant'),
v.literal('system'),
v.literal('tool'),
),
content: v.string(),
status: v.union(
v.literal('queued'),
v.literal('streaming'),
v.literal('completed'),
v.literal('failed'),
),
metadata: v.optional(v.string()),
createdAt: v.number(),
updatedAt: v.number(),
})
.index('by_job', ['jobId'])
.index('by_owner', ['ownerId']),
agentWorkspaceChanges: defineTable({
jobId: v.id('agentJobs'),
spoonId: v.id('spoons'),
ownerId: v.id('users'),
path: v.string(),
source: v.union(
v.literal('user'),
v.literal('agent'),
v.literal('command'),
),
changeType: v.union(
v.literal('added'),
v.literal('modified'),
v.literal('deleted'),
v.literal('renamed'),
),
diff: v.optional(v.string()),
createdAt: v.number(),
})
.index('by_job', ['jobId'])
.index('by_path', ['jobId', 'path']),
agentJobEvents: defineTable({
jobId: v.id('agentJobs'),
spoonId: v.id('spoons'),
@@ -523,6 +663,98 @@ const applicationTables = {
.index('by_job', ['jobId'])
.index('by_spoon', ['spoonId'])
.index('by_owner', ['ownerId']),
threads: defineTable({
ownerId: v.id('users'),
spoonId: v.optional(v.id('spoons')),
title: v.string(),
summary: v.optional(v.string()),
source: v.union(
v.literal('user_request'),
v.literal('upstream_update'),
v.literal('merge_conflict'),
v.literal('manual_review'),
v.literal('system'),
),
status: v.union(
v.literal('open'),
v.literal('queued'),
v.literal('running'),
v.literal('waiting_for_user'),
v.literal('changes_ready'),
v.literal('draft_pr_opened'),
v.literal('resolved'),
v.literal('ignored'),
v.literal('failed'),
v.literal('cancelled'),
),
priority: v.union(v.literal('low'), v.literal('normal'), v.literal('high')),
upstreamFrom: v.optional(v.string()),
upstreamTo: v.optional(v.string()),
forkHeadAtCreation: v.optional(v.string()),
mergeBaseAtCreation: v.optional(v.string()),
relatedSyncRunId: v.optional(v.id('syncRuns')),
relatedAgentRequestId: v.optional(v.id('agentRequests')),
latestAgentJobId: v.optional(v.id('agentJobs')),
maintenanceOutcome: v.optional(
v.union(
v.literal('auto_synced'),
v.literal('sync_recommended'),
v.literal('ignored'),
v.literal('review_pr_recommended'),
v.literal('manual_review_required'),
v.literal('conflict_resolution_required'),
v.literal('failed'),
v.literal('unknown'),
),
),
ignoredCommitShas: v.optional(v.array(v.string())),
ignoredReason: v.optional(v.string()),
createdAt: v.number(),
updatedAt: v.number(),
resolvedAt: v.optional(v.number()),
})
.index('by_owner', ['ownerId'])
.index('by_owner_status', ['ownerId', 'status'])
.index('by_spoon', ['spoonId'])
.index('by_source', ['ownerId', 'source'])
.index('by_created', ['createdAt']),
threadMessages: defineTable({
threadId: v.id('threads'),
ownerId: v.id('users'),
spoonId: v.optional(v.id('spoons')),
role: v.union(
v.literal('user'),
v.literal('assistant'),
v.literal('system'),
v.literal('tool'),
),
content: v.string(),
status: v.union(
v.literal('queued'),
v.literal('streaming'),
v.literal('completed'),
v.literal('failed'),
),
metadata: v.optional(v.string()),
createdAt: v.number(),
updatedAt: v.number(),
})
.index('by_thread', ['threadId'])
.index('by_owner', ['ownerId']),
ignoredUpstreamChanges: defineTable({
spoonId: v.id('spoons'),
ownerId: v.id('users'),
upstreamFrom: v.optional(v.string()),
upstreamTo: v.string(),
commitShas: v.array(v.string()),
reason: v.string(),
decidedBy: v.union(v.literal('agent'), v.literal('user')),
threadId: v.optional(v.id('threads')),
createdAt: v.number(),
})
.index('by_spoon', ['spoonId'])
.index('by_owner', ['ownerId'])
.index('by_upstream_to', ['spoonId', 'upstreamTo']),
};
export default defineSchema({
+52 -9
View File
@@ -13,21 +13,28 @@ const reasoningEffort = v.union(
v.literal('xhigh'),
);
const agentModel = v.union(
v.literal('gpt-5.1-codex'),
v.literal('gpt-5.5'),
v.literal('gpt-5.5-pro'),
v.literal('gpt-5.4'),
v.literal('gpt-5.4-mini'),
const runtime = v.literal('opencode');
const envFilePath = v.union(
v.literal('.env'),
v.literal('.env.local'),
v.literal('.env.production'),
v.literal('.env.production.local'),
v.literal('custom'),
);
const defaults = {
enabled: true,
runtime: 'opencode' as const,
branchPrefix: 'spoon/agent',
agentModel: 'gpt-5.1-codex',
reasoningEffort: 'high' as const,
agentModel: '',
reasoningEffort: 'medium' as const,
maxJobDurationMs: 1_800_000,
maxOutputBytes: 200_000,
envFilePath: '.env.local' as const,
materializeEnvFileByDefault: false,
autoDetectCommands: true,
allowUserFileEditing: true,
};
export const getForSpoon = query({
@@ -60,10 +67,18 @@ export const update = mutation({
installCommand: v.optional(v.string()),
checkCommand: v.optional(v.string()),
testCommand: v.optional(v.string()),
agentModel: v.optional(agentModel),
runtime: v.optional(runtime),
agentModel: v.optional(v.string()),
reasoningEffort: v.optional(reasoningEffort),
maxJobDurationMs: v.optional(v.number()),
maxOutputBytes: v.optional(v.number()),
envFilePath: v.optional(envFilePath),
customEnvFilePath: v.optional(v.string()),
materializeEnvFileByDefault: v.optional(v.boolean()),
autoDetectCommands: v.optional(v.boolean()),
allowUserFileEditing: v.optional(v.boolean()),
aiProviderProfileId: v.optional(v.id('aiProviderProfiles')),
clearAiProviderProfile: v.optional(v.boolean()),
},
handler: async (ctx, args) => {
const ownerId = await getRequiredUserId(ctx);
@@ -107,6 +122,9 @@ export const update = mutation({
if (args.testCommand !== undefined) {
patch.testCommand = optionalText(args.testCommand);
}
if (args.runtime !== undefined) {
patch.runtime = 'opencode';
}
if (args.agentModel !== undefined) {
patch.agentModel = optionalText(args.agentModel) ?? defaults.agentModel;
}
@@ -119,6 +137,31 @@ export const update = mutation({
if (args.maxOutputBytes !== undefined) {
patch.maxOutputBytes = Math.max(10_000, args.maxOutputBytes);
}
if (args.envFilePath !== undefined) {
patch.envFilePath = args.envFilePath;
}
if (args.customEnvFilePath !== undefined) {
patch.customEnvFilePath = optionalText(args.customEnvFilePath);
}
if (args.materializeEnvFileByDefault !== undefined) {
patch.materializeEnvFileByDefault = args.materializeEnvFileByDefault;
}
if (args.autoDetectCommands !== undefined) {
patch.autoDetectCommands = args.autoDetectCommands;
}
if (args.allowUserFileEditing !== undefined) {
patch.allowUserFileEditing = args.allowUserFileEditing;
}
if (args.aiProviderProfileId !== undefined) {
const profile = await ctx.db.get(args.aiProviderProfileId);
if (profile?.ownerId !== ownerId) {
throw new Error('AI provider profile not found.');
}
patch.aiProviderProfileId = args.aiProviderProfileId;
}
if (args.clearAiProviderProfile) {
patch.aiProviderProfileId = undefined;
}
await ctx.db.patch(settings._id, patch);
return { success: true };
+61 -29
View File
@@ -100,33 +100,59 @@ export const getDetails = query({
handler: async (ctx, { spoonId }) => {
const ownerId = await getRequiredUserId(ctx);
const spoon = await getOwnedSpoon(ctx, spoonId, ownerId);
const [state, settings, latestReview, recentRuns, agentRequests] =
await Promise.all([
ctx.db
.query('spoonRepositoryStates')
.withIndex('by_spoon', (q) => q.eq('spoonId', spoonId))
.first(),
ctx.db
.query('spoonSettings')
.withIndex('by_spoon', (q) => q.eq('spoonId', spoonId))
.first(),
ctx.db
.query('aiReviews')
.withIndex('by_spoon', (q) => q.eq('spoonId', spoonId))
.order('desc')
.first(),
ctx.db
.query('syncRuns')
.withIndex('by_spoon', (q) => q.eq('spoonId', spoonId))
.order('desc')
.take(10),
ctx.db
.query('agentRequests')
.withIndex('by_spoon', (q) => q.eq('spoonId', spoonId))
.order('desc')
.take(10),
]);
return { spoon, state, settings, latestReview, recentRuns, agentRequests };
const [
state,
settings,
latestReview,
recentRuns,
agentRequests,
ignoredChanges,
] = await Promise.all([
ctx.db
.query('spoonRepositoryStates')
.withIndex('by_spoon', (q) => q.eq('spoonId', spoonId))
.first(),
ctx.db
.query('spoonSettings')
.withIndex('by_spoon', (q) => q.eq('spoonId', spoonId))
.first(),
ctx.db
.query('aiReviews')
.withIndex('by_spoon', (q) => q.eq('spoonId', spoonId))
.order('desc')
.first(),
ctx.db
.query('syncRuns')
.withIndex('by_spoon', (q) => q.eq('spoonId', spoonId))
.order('desc')
.take(10),
ctx.db
.query('agentRequests')
.withIndex('by_spoon', (q) => q.eq('spoonId', spoonId))
.order('desc')
.take(10),
ctx.db
.query('ignoredUpstreamChanges')
.withIndex('by_spoon', (q) => q.eq('spoonId', spoonId))
.collect(),
]);
const ignoredShas = new Set(
ignoredChanges.flatMap((change) => change.commitShas),
);
const effectiveUpstreamAheadBy = Math.max(
0,
(state?.upstreamAheadBy ?? spoon.upstreamAheadBy ?? 0) - ignoredShas.size,
);
return {
spoon,
state,
settings,
latestReview,
recentRuns,
agentRequests,
ignoredChanges,
effectiveUpstreamAheadBy,
};
},
});
@@ -210,12 +236,18 @@ export const createManual = mutation({
spoonId,
ownerId,
enabled: true,
runtime: 'opencode',
defaultBaseBranch: forkDefaultBranch ?? args.upstreamDefaultBranch,
branchPrefix: 'spoon/agent',
agentModel: 'gpt-5.1-codex',
reasoningEffort: 'high',
agentModel: '',
reasoningEffort: 'medium',
maxJobDurationMs: 1_800_000,
maxOutputBytes: 200_000,
envFilePath: '.env.local',
materializeEnvFileByDefault: false,
autoDetectCommands: true,
allowUserFileEditing: true,
aiProviderProfileId: undefined,
createdAt: now,
updatedAt: now,
});
+13
View File
@@ -22,6 +22,13 @@ const syncStatus = v.union(
v.literal('merged'),
);
const syncDecision = v.union(
v.literal('auto_synced'),
v.literal('thread_created'),
v.literal('ignored'),
v.literal('failed'),
);
export const listRecent = query({
args: { limit: v.optional(v.number()) },
handler: async (ctx, { limit }) => {
@@ -52,6 +59,7 @@ export const createInternal = internalMutation({
args: {
spoonId: v.id('spoons'),
ownerId: v.id('users'),
threadId: v.optional(v.id('threads')),
kind: syncKind,
status: syncStatus,
upstreamFrom: v.optional(v.string()),
@@ -60,6 +68,7 @@ export const createInternal = internalMutation({
aiAssessment: v.optional(v.string()),
mergeRequestUrl: v.optional(v.string()),
error: v.optional(v.string()),
decision: v.optional(syncDecision),
},
handler: async (ctx, args): Promise<Id<'syncRuns'>> => {
const now = Date.now();
@@ -74,6 +83,7 @@ export const createInternal = internalMutation({
export const patchInternal = internalMutation({
args: {
syncRunId: v.id('syncRuns'),
threadId: v.optional(v.id('threads')),
status: v.optional(syncStatus),
upstreamFrom: v.optional(v.string()),
upstreamTo: v.optional(v.string()),
@@ -81,9 +91,11 @@ export const patchInternal = internalMutation({
aiAssessment: v.optional(v.string()),
mergeRequestUrl: v.optional(v.string()),
error: v.optional(v.string()),
decision: v.optional(syncDecision),
},
handler: async (ctx, args) => {
const patch: Partial<Doc<'syncRuns'>> = { updatedAt: Date.now() };
if (args.threadId !== undefined) patch.threadId = args.threadId;
if (args.status !== undefined) patch.status = args.status;
if (args.upstreamFrom !== undefined) patch.upstreamFrom = args.upstreamFrom;
if (args.upstreamTo !== undefined) patch.upstreamTo = args.upstreamTo;
@@ -95,6 +107,7 @@ export const patchInternal = internalMutation({
patch.mergeRequestUrl = args.mergeRequestUrl;
}
if (args.error !== undefined) patch.error = args.error;
if (args.decision !== undefined) patch.decision = args.decision;
await ctx.db.patch(args.syncRunId, patch);
return { success: true };
},
+458
View File
@@ -0,0 +1,458 @@
import { ConvexError, v } from 'convex/values';
import type { Doc } from './_generated/dataModel';
import { internal } from './_generated/api';
import {
internalMutation,
internalQuery,
mutation,
query,
} from './_generated/server';
import {
getOwnedSpoon,
getRequiredUserId,
optionalText,
requireText,
} from './model';
const threadSource = v.union(
v.literal('user_request'),
v.literal('upstream_update'),
v.literal('merge_conflict'),
v.literal('manual_review'),
v.literal('system'),
);
const threadStatus = v.union(
v.literal('open'),
v.literal('queued'),
v.literal('running'),
v.literal('waiting_for_user'),
v.literal('changes_ready'),
v.literal('draft_pr_opened'),
v.literal('resolved'),
v.literal('ignored'),
v.literal('failed'),
v.literal('cancelled'),
);
const maintenanceOutcome = v.union(
v.literal('auto_synced'),
v.literal('sync_recommended'),
v.literal('ignored'),
v.literal('review_pr_recommended'),
v.literal('manual_review_required'),
v.literal('conflict_resolution_required'),
v.literal('failed'),
v.literal('unknown'),
);
const messageRole = v.union(
v.literal('user'),
v.literal('assistant'),
v.literal('system'),
v.literal('tool'),
);
const messageStatus = v.union(
v.literal('queued'),
v.literal('streaming'),
v.literal('completed'),
v.literal('failed'),
);
const titleFromPrompt = (prompt: string) => {
const firstLine = prompt.trim().split('\n')[0] ?? 'Thread';
return firstLine.length > 80 ? `${firstLine.slice(0, 77)}...` : firstLine;
};
const publicThread = (thread: Doc<'threads'>) => thread;
export const listMine = query({
args: {
status: v.optional(v.union(threadStatus, v.literal('all'))),
source: v.optional(v.union(threadSource, v.literal('all'))),
spoonId: v.optional(v.id('spoons')),
limit: v.optional(v.number()),
},
handler: async (ctx, args) => {
const ownerId = await getRequiredUserId(ctx);
const threads = await ctx.db
.query('threads')
.withIndex('by_owner', (q) => q.eq('ownerId', ownerId))
.order('desc')
.take(args.limit ?? 50);
return threads.filter((thread) => {
if (
args.status &&
args.status !== 'all' &&
thread.status !== args.status
) {
return false;
}
if (
args.source &&
args.source !== 'all' &&
thread.source !== args.source
) {
return false;
}
if (args.spoonId && thread.spoonId !== args.spoonId) return false;
return true;
});
},
});
export const listForSpoon = query({
args: { spoonId: v.id('spoons'), limit: v.optional(v.number()) },
handler: async (ctx, { spoonId, limit }) => {
const ownerId = await getRequiredUserId(ctx);
await getOwnedSpoon(ctx, spoonId, ownerId);
return await ctx.db
.query('threads')
.withIndex('by_spoon', (q) => q.eq('spoonId', spoonId))
.order('desc')
.take(limit ?? 25);
},
});
export const get = query({
args: { threadId: v.id('threads') },
handler: async (ctx, { threadId }) => {
const ownerId = await getRequiredUserId(ctx);
const thread = await ctx.db.get(threadId);
if (thread?.ownerId !== ownerId) throw new ConvexError('Thread not found.');
const spoon = thread.spoonId ? await ctx.db.get(thread.spoonId) : null;
const job = thread.latestAgentJobId
? await ctx.db.get(thread.latestAgentJobId)
: null;
return {
thread: publicThread(thread),
spoon: spoon?.ownerId === ownerId ? spoon : null,
latestJob: job?.ownerId === ownerId ? job : null,
};
},
});
export const listMessages = query({
args: { threadId: v.id('threads'), limit: v.optional(v.number()) },
handler: async (ctx, { threadId, limit }) => {
const ownerId = await getRequiredUserId(ctx);
const thread = await ctx.db.get(threadId);
if (thread?.ownerId !== ownerId) throw new ConvexError('Thread not found.');
return await ctx.db
.query('threadMessages')
.withIndex('by_thread', (q) => q.eq('threadId', threadId))
.order('asc')
.take(limit ?? 200);
},
});
export const createUserThread = mutation({
args: {
spoonId: v.id('spoons'),
title: v.optional(v.string()),
prompt: v.string(),
baseBranch: v.optional(v.string()),
requestedBranchName: v.optional(v.string()),
materializeEnvFile: v.optional(v.boolean()),
envFilePath: v.optional(v.string()),
aiProviderProfileId: v.optional(v.id('aiProviderProfiles')),
},
handler: async (ctx, args) => {
const ownerId = await getRequiredUserId(ctx);
await getOwnedSpoon(ctx, args.spoonId, ownerId);
const prompt = requireText(args.prompt, 'Prompt');
const now = Date.now();
const threadId = await ctx.db.insert('threads', {
ownerId,
spoonId: args.spoonId,
title: optionalText(args.title) ?? titleFromPrompt(prompt),
summary: prompt,
source: 'user_request',
status: 'open',
priority: 'normal',
createdAt: now,
updatedAt: now,
});
await ctx.db.insert('threadMessages', {
threadId,
ownerId,
spoonId: args.spoonId,
role: 'user',
content: prompt,
status: 'completed',
createdAt: now,
updatedAt: now,
});
await ctx.scheduler.runAfter(
0,
internal.agentJobs.createForThreadInternal,
{
threadId,
ownerId,
jobType: 'user_change',
baseBranch: args.baseBranch,
requestedBranchName: args.requestedBranchName,
materializeEnvFile: args.materializeEnvFile,
envFilePath: args.envFilePath,
aiProviderProfileId: args.aiProviderProfileId,
},
);
return threadId;
},
});
export const appendUserMessage = mutation({
args: { threadId: v.id('threads'), content: v.string() },
handler: async (ctx, { threadId, content }) => {
const ownerId = await getRequiredUserId(ctx);
const thread = await ctx.db.get(threadId);
if (thread?.ownerId !== ownerId) throw new ConvexError('Thread not found.');
const now = Date.now();
return await ctx.db.insert('threadMessages', {
threadId,
ownerId,
spoonId: thread.spoonId,
role: 'user',
content: requireText(content, 'Message'),
status: 'queued',
createdAt: now,
updatedAt: now,
});
},
});
export const cancel = mutation({
args: { threadId: v.id('threads') },
handler: async (ctx, { threadId }) => {
const ownerId = await getRequiredUserId(ctx);
const thread = await ctx.db.get(threadId);
if (thread?.ownerId !== ownerId) throw new ConvexError('Thread not found.');
await ctx.db.patch(threadId, {
status: 'cancelled',
updatedAt: Date.now(),
resolvedAt: Date.now(),
});
return { success: true };
},
});
export const markResolved = mutation({
args: { threadId: v.id('threads') },
handler: async (ctx, { threadId }) => {
const ownerId = await getRequiredUserId(ctx);
const thread = await ctx.db.get(threadId);
if (thread?.ownerId !== ownerId) throw new ConvexError('Thread not found.');
await ctx.db.patch(threadId, {
status: 'resolved',
updatedAt: Date.now(),
resolvedAt: Date.now(),
});
return { success: true };
},
});
export const findOpenMaintenanceThread = internalQuery({
args: {
spoonId: v.id('spoons'),
ownerId: v.id('users'),
upstreamTo: v.string(),
},
handler: async (ctx, { spoonId, ownerId, upstreamTo }) => {
const threads = await ctx.db
.query('threads')
.withIndex('by_spoon', (q) => q.eq('spoonId', spoonId))
.order('desc')
.collect();
return (
threads.find(
(thread) =>
thread.ownerId === ownerId &&
thread.upstreamTo === upstreamTo &&
!['resolved', 'ignored', 'failed', 'cancelled'].includes(
thread.status,
),
) ?? null
);
},
});
export const createMaintenanceThread = internalMutation({
args: {
spoonId: v.id('spoons'),
ownerId: v.id('users'),
source: v.union(v.literal('upstream_update'), v.literal('merge_conflict')),
title: v.string(),
summary: v.string(),
upstreamFrom: v.optional(v.string()),
upstreamTo: v.string(),
forkHeadAtCreation: v.optional(v.string()),
mergeBaseAtCreation: v.optional(v.string()),
relatedSyncRunId: v.optional(v.id('syncRuns')),
jobType: v.union(
v.literal('maintenance_review'),
v.literal('conflict_resolution'),
),
},
handler: async (ctx, args) => {
const now = Date.now();
const existing = await ctx.db
.query('threads')
.withIndex('by_spoon', (q) => q.eq('spoonId', args.spoonId))
.order('desc')
.collect()
.then((threads) =>
threads.find(
(thread) =>
thread.ownerId === args.ownerId &&
thread.upstreamTo === args.upstreamTo &&
!['resolved', 'ignored', 'failed', 'cancelled'].includes(
thread.status,
),
),
);
if (existing) {
await ctx.db.insert('threadMessages', {
threadId: existing._id,
ownerId: args.ownerId,
spoonId: args.spoonId,
role: 'system',
content: args.summary,
status: 'completed',
createdAt: now,
updatedAt: now,
});
await ctx.db.patch(existing._id, {
relatedSyncRunId: args.relatedSyncRunId,
updatedAt: now,
});
return existing._id;
}
const threadId = await ctx.db.insert('threads', {
ownerId: args.ownerId,
spoonId: args.spoonId,
title: args.title,
summary: args.summary,
source: args.source,
status: 'open',
priority: args.source === 'merge_conflict' ? 'high' : 'normal',
upstreamFrom: args.upstreamFrom,
upstreamTo: args.upstreamTo,
forkHeadAtCreation: args.forkHeadAtCreation,
mergeBaseAtCreation: args.mergeBaseAtCreation,
relatedSyncRunId: args.relatedSyncRunId,
maintenanceOutcome: 'unknown',
createdAt: now,
updatedAt: now,
});
await ctx.db.insert('threadMessages', {
threadId,
ownerId: args.ownerId,
spoonId: args.spoonId,
role: 'system',
content: args.summary,
status: 'completed',
createdAt: now,
updatedAt: now,
});
await ctx.scheduler.runAfter(
0,
internal.agentJobs.createForThreadInternal,
{
threadId,
ownerId: args.ownerId,
jobType: args.jobType,
},
);
return threadId;
},
});
export const patchThreadInternal = internalMutation({
args: {
threadId: v.id('threads'),
status: v.optional(threadStatus),
summary: v.optional(v.string()),
maintenanceOutcome: v.optional(maintenanceOutcome),
ignoredCommitShas: v.optional(v.array(v.string())),
ignoredReason: v.optional(v.string()),
latestAgentJobId: v.optional(v.id('agentJobs')),
},
handler: async (ctx, args) => {
const thread = await ctx.db.get(args.threadId);
if (!thread) throw new ConvexError('Thread not found.');
const patch: Partial<Doc<'threads'>> = { updatedAt: Date.now() };
if (args.status !== undefined) patch.status = args.status;
if (args.summary !== undefined) patch.summary = optionalText(args.summary);
if (args.maintenanceOutcome !== undefined) {
patch.maintenanceOutcome = args.maintenanceOutcome;
}
if (args.ignoredCommitShas !== undefined) {
patch.ignoredCommitShas = args.ignoredCommitShas;
}
if (args.ignoredReason !== undefined) {
patch.ignoredReason = optionalText(args.ignoredReason);
}
if (args.latestAgentJobId !== undefined) {
patch.latestAgentJobId = args.latestAgentJobId;
}
if (
args.status &&
['resolved', 'ignored', 'failed', 'cancelled'].includes(args.status)
) {
patch.resolvedAt = Date.now();
}
await ctx.db.patch(args.threadId, patch);
return { success: true };
},
});
export const appendMessageInternal = internalMutation({
args: {
threadId: v.id('threads'),
ownerId: v.id('users'),
role: messageRole,
content: v.string(),
status: v.optional(messageStatus),
metadata: v.optional(v.string()),
},
handler: async (ctx, args) => {
const thread = await ctx.db.get(args.threadId);
if (thread?.ownerId !== args.ownerId) {
throw new ConvexError('Thread not found.');
}
const now = Date.now();
return await ctx.db.insert('threadMessages', {
threadId: args.threadId,
ownerId: args.ownerId,
spoonId: thread.spoonId,
role: args.role,
content: args.content,
status: args.status ?? 'completed',
metadata: optionalText(args.metadata),
createdAt: now,
updatedAt: now,
});
},
});
export const recordIgnoredUpstreamChange = internalMutation({
args: {
spoonId: v.id('spoons'),
ownerId: v.id('users'),
upstreamFrom: v.optional(v.string()),
upstreamTo: v.string(),
commitShas: v.array(v.string()),
reason: v.string(),
decidedBy: v.union(v.literal('agent'), v.literal('user')),
threadId: v.optional(v.id('threads')),
},
handler: async (ctx, args) => {
return await ctx.db.insert('ignoredUpstreamChanges', {
...args,
createdAt: Date.now(),
});
},
});