Clean up old stuff & fix ui errors
This commit is contained in:
@@ -160,6 +160,27 @@ const requireWorkerToken = (workerToken: string) => {
|
||||
if (workerToken !== expected) throw new ConvexError('Invalid worker token.');
|
||||
};
|
||||
|
||||
const mergeMessageMetadata = (
|
||||
metadata: string | undefined,
|
||||
patch: Record<string, unknown>,
|
||||
) => {
|
||||
if (!metadata) return JSON.stringify(patch);
|
||||
try {
|
||||
return JSON.stringify({ ...(JSON.parse(metadata) as object), ...patch });
|
||||
} catch {
|
||||
return JSON.stringify({ note: metadata, ...patch });
|
||||
}
|
||||
};
|
||||
|
||||
const parseMessageMetadata = (metadata: string | undefined) => {
|
||||
if (!metadata) return null;
|
||||
try {
|
||||
return JSON.parse(metadata) as Record<string, unknown>;
|
||||
} catch {
|
||||
return null;
|
||||
}
|
||||
};
|
||||
|
||||
const slugify = (value: string) =>
|
||||
value
|
||||
.toLowerCase()
|
||||
@@ -736,6 +757,7 @@ export const getWorkspaceUiState = query({
|
||||
activeFilePath: undefined,
|
||||
vimEnabled: false,
|
||||
expandedDirectoryPaths: [],
|
||||
agentThreadWidth: 420,
|
||||
createdAt: Date.now(),
|
||||
updatedAt: Date.now(),
|
||||
}
|
||||
@@ -750,6 +772,7 @@ export const patchWorkspaceUiState = mutation({
|
||||
activeFilePath: v.optional(v.string()),
|
||||
vimEnabled: v.optional(v.boolean()),
|
||||
expandedDirectoryPaths: v.optional(v.array(v.string())),
|
||||
agentThreadWidth: v.optional(v.number()),
|
||||
},
|
||||
handler: async (ctx, args) => {
|
||||
const ownerId = await getRequiredUserId(ctx);
|
||||
@@ -780,6 +803,14 @@ export const patchWorkspaceUiState = mutation({
|
||||
),
|
||||
}
|
||||
: {}),
|
||||
...(args.agentThreadWidth !== undefined
|
||||
? {
|
||||
agentThreadWidth: Math.min(
|
||||
Math.max(Math.round(args.agentThreadWidth), 320),
|
||||
720,
|
||||
),
|
||||
}
|
||||
: {}),
|
||||
updatedAt: now,
|
||||
};
|
||||
if (existing) {
|
||||
@@ -794,6 +825,7 @@ export const patchWorkspaceUiState = mutation({
|
||||
activeFilePath: patch.activeFilePath,
|
||||
vimEnabled: patch.vimEnabled ?? false,
|
||||
expandedDirectoryPaths: patch.expandedDirectoryPaths ?? [],
|
||||
agentThreadWidth: patch.agentThreadWidth ?? 420,
|
||||
createdAt: now,
|
||||
updatedAt: now,
|
||||
});
|
||||
@@ -1537,7 +1569,9 @@ export const appendMessage = mutation({
|
||||
role: args.role,
|
||||
content: args.content,
|
||||
status: args.status,
|
||||
metadata: args.metadata,
|
||||
metadata: mergeMessageMetadata(args.metadata, {
|
||||
agentJobMessageId: messageId,
|
||||
}),
|
||||
createdAt: now,
|
||||
updatedAt: now,
|
||||
});
|
||||
@@ -1570,6 +1604,32 @@ export const updateMessage = mutation({
|
||||
if (args.status !== undefined) patch.status = args.status;
|
||||
if (args.metadata !== undefined) patch.metadata = args.metadata;
|
||||
await ctx.db.patch(args.messageId, patch);
|
||||
const threadId = job.threadId;
|
||||
if (threadId) {
|
||||
const threadMessages = await ctx.db
|
||||
.query('threadMessages')
|
||||
.withIndex('by_thread', (q) => q.eq('threadId', threadId))
|
||||
.order('desc')
|
||||
.take(300);
|
||||
const mirrored = threadMessages.find(
|
||||
(threadMessage) =>
|
||||
parseMessageMetadata(threadMessage.metadata)?.agentJobMessageId ===
|
||||
args.messageId,
|
||||
);
|
||||
if (mirrored) {
|
||||
const threadPatch: Partial<Doc<'threadMessages'>> = {
|
||||
updatedAt: patch.updatedAt,
|
||||
};
|
||||
if (args.content !== undefined) threadPatch.content = args.content;
|
||||
if (args.status !== undefined) threadPatch.status = args.status;
|
||||
if (args.metadata !== undefined) {
|
||||
threadPatch.metadata = mergeMessageMetadata(args.metadata, {
|
||||
agentJobMessageId: args.messageId,
|
||||
});
|
||||
}
|
||||
await ctx.db.patch(mirrored._id, threadPatch);
|
||||
}
|
||||
}
|
||||
return { success: true };
|
||||
},
|
||||
});
|
||||
|
||||
@@ -444,6 +444,7 @@ const applicationTables = {
|
||||
spoonId: v.id('spoons'),
|
||||
ownerId: v.id('users'),
|
||||
enabled: v.boolean(),
|
||||
// Legacy records may contain openai_direct. New writes use opencode only.
|
||||
runtime: v.optional(
|
||||
v.union(v.literal('opencode'), v.literal('openai_direct')),
|
||||
),
|
||||
@@ -507,6 +508,7 @@ const applicationTables = {
|
||||
v.literal('timed_out'),
|
||||
),
|
||||
prompt: v.string(),
|
||||
// Legacy jobs may contain openai_direct. New jobs use opencode only.
|
||||
runtime: v.optional(
|
||||
v.union(v.literal('openai_direct'), v.literal('opencode')),
|
||||
),
|
||||
@@ -603,6 +605,7 @@ const applicationTables = {
|
||||
activeFilePath: v.optional(v.string()),
|
||||
vimEnabled: v.boolean(),
|
||||
expandedDirectoryPaths: v.array(v.string()),
|
||||
agentThreadWidth: v.optional(v.number()),
|
||||
createdAt: v.number(),
|
||||
updatedAt: v.number(),
|
||||
})
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
import { ConvexError, v } from 'convex/values';
|
||||
|
||||
import type { Doc } from './_generated/dataModel';
|
||||
import type { MutationCtx } from './_generated/server';
|
||||
import { internal } from './_generated/api';
|
||||
import {
|
||||
internalMutation,
|
||||
@@ -68,6 +69,53 @@ const titleFromPrompt = (prompt: string) => {
|
||||
|
||||
const publicThread = (thread: Doc<'threads'>) => thread;
|
||||
|
||||
const isDeletableThreadJob = (job: Doc<'agentJobs'>) =>
|
||||
['failed', 'cancelled', 'timed_out', 'draft_pr_opened'].includes(
|
||||
job.status,
|
||||
) || ['stopped', 'expired', 'failed'].includes(job.workspaceStatus ?? '');
|
||||
|
||||
const deleteThreadJobRows = async (ctx: MutationCtx, job: Doc<'agentJobs'>) => {
|
||||
const [messages, events, artifacts, changes, uiStates, interactions] =
|
||||
await Promise.all([
|
||||
ctx.db
|
||||
.query('agentJobMessages')
|
||||
.withIndex('by_job', (q) => q.eq('jobId', job._id))
|
||||
.collect(),
|
||||
ctx.db
|
||||
.query('agentJobEvents')
|
||||
.withIndex('by_job', (q) => q.eq('jobId', job._id))
|
||||
.collect(),
|
||||
ctx.db
|
||||
.query('agentJobArtifacts')
|
||||
.withIndex('by_job', (q) => q.eq('jobId', job._id))
|
||||
.collect(),
|
||||
ctx.db
|
||||
.query('agentWorkspaceChanges')
|
||||
.withIndex('by_job', (q) => q.eq('jobId', job._id))
|
||||
.collect(),
|
||||
ctx.db
|
||||
.query('agentWorkspaceUiStates')
|
||||
.withIndex('by_job', (q) => q.eq('jobId', job._id))
|
||||
.collect(),
|
||||
ctx.db
|
||||
.query('agentInteractionRequests')
|
||||
.withIndex('by_job', (q) => q.eq('jobId', job._id))
|
||||
.collect(),
|
||||
]);
|
||||
|
||||
for (const row of [
|
||||
...messages,
|
||||
...events,
|
||||
...artifacts,
|
||||
...changes,
|
||||
...uiStates,
|
||||
...interactions,
|
||||
]) {
|
||||
await ctx.db.delete(row._id);
|
||||
}
|
||||
await ctx.db.delete(job._id);
|
||||
};
|
||||
|
||||
export const listMine = query({
|
||||
args: {
|
||||
status: v.optional(v.union(threadStatus, v.literal('all'))),
|
||||
@@ -275,6 +323,42 @@ export const markResolved = mutation({
|
||||
},
|
||||
});
|
||||
|
||||
export const deleteThread = mutation({
|
||||
args: { threadId: v.id('threads') },
|
||||
handler: async (ctx, { threadId }) => {
|
||||
const ownerId = await getRequiredUserId(ctx);
|
||||
const thread = await ctx.db.get(threadId);
|
||||
if (thread?.ownerId !== ownerId) throw new ConvexError('Thread not found.');
|
||||
|
||||
const jobs = (
|
||||
await ctx.db
|
||||
.query('agentJobs')
|
||||
.withIndex('by_owner', (q) => q.eq('ownerId', ownerId))
|
||||
.collect()
|
||||
).filter((job) => job.threadId === threadId);
|
||||
|
||||
const activeJob = jobs.find((job) => !isDeletableThreadJob(job));
|
||||
if (activeJob) {
|
||||
throw new ConvexError(
|
||||
'Stop or cancel active workspace runs before deleting this thread.',
|
||||
);
|
||||
}
|
||||
|
||||
const messages = await ctx.db
|
||||
.query('threadMessages')
|
||||
.withIndex('by_thread', (q) => q.eq('threadId', threadId))
|
||||
.collect();
|
||||
for (const job of jobs) {
|
||||
await deleteThreadJobRows(ctx, job);
|
||||
}
|
||||
for (const message of messages) {
|
||||
await ctx.db.delete(message._id);
|
||||
}
|
||||
await ctx.db.delete(threadId);
|
||||
return { deletedJobs: jobs.length, deletedMessages: messages.length };
|
||||
},
|
||||
});
|
||||
|
||||
export const findOpenMaintenanceThread = internalQuery({
|
||||
args: {
|
||||
spoonId: v.id('spoons'),
|
||||
|
||||
@@ -48,6 +48,7 @@ const createAgentJob = async (
|
||||
spoonId: Id<'spoons'>;
|
||||
status: 'running' | 'failed' | 'cancelled';
|
||||
workspaceStatus?: 'active' | 'stopped' | 'failed' | 'expired';
|
||||
threadId?: Id<'threads'>;
|
||||
},
|
||||
) =>
|
||||
await t.mutation(async (ctx) => {
|
||||
@@ -64,6 +65,7 @@ const createAgentJob = async (
|
||||
spoonId: args.spoonId,
|
||||
ownerId: args.ownerId,
|
||||
agentRequestId: requestId,
|
||||
threadId: args.threadId,
|
||||
status: args.status,
|
||||
prompt: 'Clean this workspace',
|
||||
runtime: 'opencode',
|
||||
@@ -299,6 +301,126 @@ describe('convex-test harness', () => {
|
||||
).rejects.toThrow('Agent job not found.');
|
||||
});
|
||||
|
||||
test('persists and clamps workspace agent thread width', async () => {
|
||||
const t = convexTest(schema, modules);
|
||||
const ownerId = (await createUser(t, 'owner@example.com')) as Id<'users'>;
|
||||
const spoonId = await authed(t, ownerId).mutation(
|
||||
api.spoons.createManual,
|
||||
spoonInput,
|
||||
);
|
||||
const jobId = await createAgentJob(t, {
|
||||
ownerId,
|
||||
spoonId,
|
||||
status: 'running',
|
||||
workspaceStatus: 'active',
|
||||
});
|
||||
|
||||
const defaults = await authed(t, ownerId).query(
|
||||
api.agentJobs.getWorkspaceUiState,
|
||||
{ jobId },
|
||||
);
|
||||
expect(defaults.agentThreadWidth).toBe(420);
|
||||
|
||||
await authed(t, ownerId).mutation(api.agentJobs.patchWorkspaceUiState, {
|
||||
jobId,
|
||||
agentThreadWidth: 999,
|
||||
});
|
||||
const wide = await authed(t, ownerId).query(
|
||||
api.agentJobs.getWorkspaceUiState,
|
||||
{ jobId },
|
||||
);
|
||||
expect(wide.agentThreadWidth).toBe(720);
|
||||
|
||||
await authed(t, ownerId).mutation(api.agentJobs.patchWorkspaceUiState, {
|
||||
jobId,
|
||||
agentThreadWidth: 100,
|
||||
});
|
||||
const narrow = await authed(t, ownerId).query(
|
||||
api.agentJobs.getWorkspaceUiState,
|
||||
{ jobId },
|
||||
);
|
||||
expect(narrow.agentThreadWidth).toBe(320);
|
||||
});
|
||||
|
||||
test('deletes terminal threads and attached terminal workspace rows', async () => {
|
||||
const t = convexTest(schema, modules);
|
||||
const ownerId = (await createUser(t, 'owner@example.com')) as Id<'users'>;
|
||||
const spoonId = await authed(t, ownerId).mutation(
|
||||
api.spoons.createManual,
|
||||
spoonInput,
|
||||
);
|
||||
const threadId = await t.mutation(async (ctx) => {
|
||||
return await ctx.db.insert('threads', {
|
||||
ownerId,
|
||||
spoonId,
|
||||
title: 'Failed attempt',
|
||||
source: 'user_request',
|
||||
status: 'failed',
|
||||
priority: 'normal',
|
||||
createdAt: Date.now(),
|
||||
updatedAt: Date.now(),
|
||||
});
|
||||
});
|
||||
const jobId = await createAgentJob(t, {
|
||||
ownerId,
|
||||
spoonId,
|
||||
threadId,
|
||||
status: 'failed',
|
||||
workspaceStatus: 'failed',
|
||||
});
|
||||
await t.mutation(async (ctx) => {
|
||||
await ctx.db.patch(threadId, { latestAgentJobId: jobId });
|
||||
});
|
||||
|
||||
await authed(t, ownerId).mutation(api.threads.deleteThread, { threadId });
|
||||
|
||||
const [thread, job, messages] = await t.run(async (ctx) => {
|
||||
const rows = await ctx.db
|
||||
.query('agentJobMessages')
|
||||
.withIndex('by_job', (q) => q.eq('jobId', jobId))
|
||||
.collect();
|
||||
return [await ctx.db.get(threadId), await ctx.db.get(jobId), rows];
|
||||
});
|
||||
expect(thread).toBeNull();
|
||||
expect(job).toBeNull();
|
||||
expect(messages).toHaveLength(0);
|
||||
});
|
||||
|
||||
test('does not delete threads with active workspace runs', async () => {
|
||||
const t = convexTest(schema, modules);
|
||||
const ownerId = (await createUser(t, 'owner@example.com')) as Id<'users'>;
|
||||
const spoonId = await authed(t, ownerId).mutation(
|
||||
api.spoons.createManual,
|
||||
spoonInput,
|
||||
);
|
||||
const threadId = await t.mutation(async (ctx) => {
|
||||
return await ctx.db.insert('threads', {
|
||||
ownerId,
|
||||
spoonId,
|
||||
title: 'Active attempt',
|
||||
source: 'user_request',
|
||||
status: 'running',
|
||||
priority: 'normal',
|
||||
createdAt: Date.now(),
|
||||
updatedAt: Date.now(),
|
||||
});
|
||||
});
|
||||
const jobId = await createAgentJob(t, {
|
||||
ownerId,
|
||||
spoonId,
|
||||
threadId,
|
||||
status: 'running',
|
||||
workspaceStatus: 'active',
|
||||
});
|
||||
await t.mutation(async (ctx) => {
|
||||
await ctx.db.patch(threadId, { latestAgentJobId: jobId });
|
||||
});
|
||||
|
||||
await expect(
|
||||
authed(t, ownerId).mutation(api.threads.deleteThread, { threadId }),
|
||||
).rejects.toThrow('Stop or cancel active workspace runs');
|
||||
});
|
||||
|
||||
test('queues a new thread job after the previous job is terminal', async () => {
|
||||
const t = convexTest(schema, modules);
|
||||
const ownerId = (await createUser(t, 'owner@example.com')) as Id<'users'>;
|
||||
|
||||
Reference in New Issue
Block a user