Add agent workflows & stuff
This commit is contained in:
@@ -0,0 +1,368 @@
|
||||
'use node';
|
||||
|
||||
import { getAuthUserId } from '@convex-dev/auth/server';
|
||||
import { ConvexError, v } from 'convex/values';
|
||||
|
||||
import type { Doc, Id } from './_generated/dataModel';
|
||||
import type { ActionCtx } from './_generated/server';
|
||||
import type { GitHubCompareSummary } from './githubClient';
|
||||
import { internal } from './_generated/api';
|
||||
import { action, internalAction } from './_generated/server';
|
||||
import {
|
||||
compareAcrossForkNetwork,
|
||||
getInstallationOctokit,
|
||||
getRepository,
|
||||
getSpoonInstallationId,
|
||||
listPullRequests,
|
||||
syncForkBranch,
|
||||
} from './githubClient';
|
||||
|
||||
const getRequiredUserId = async (ctx: ActionCtx) => {
|
||||
const userId = await getAuthUserId(ctx);
|
||||
if (!userId) throw new ConvexError('Not authenticated.');
|
||||
return userId;
|
||||
};
|
||||
|
||||
const toStatus = (upstreamAheadBy: number, forkAheadBy: number) => {
|
||||
if (upstreamAheadBy === 0 && forkAheadBy === 0) return 'up_to_date' as const;
|
||||
if (upstreamAheadBy > 0 && forkAheadBy === 0) return 'behind' as const;
|
||||
if (upstreamAheadBy === 0 && forkAheadBy > 0) return 'ahead' as const;
|
||||
if (upstreamAheadBy > 0 && forkAheadBy > 0) return 'diverged' as const;
|
||||
return 'unknown' as const;
|
||||
};
|
||||
|
||||
const getLastCommitAt = (compare: GitHubCompareSummary) =>
|
||||
compare.commits[compare.commits.length - 1]?.committedAt;
|
||||
|
||||
const ensureForkMetadata = (spoon: Doc<'spoons'>) => {
|
||||
if (!spoon.forkOwner || !spoon.forkRepo) {
|
||||
throw new ConvexError('Fork metadata is required before GitHub refresh.');
|
||||
}
|
||||
return {
|
||||
forkOwner: spoon.forkOwner,
|
||||
forkRepo: spoon.forkRepo,
|
||||
forkBranch: spoon.forkDefaultBranch ?? spoon.upstreamDefaultBranch,
|
||||
};
|
||||
};
|
||||
|
||||
const refreshOwnedSpoon = async (
|
||||
ctx: ActionCtx,
|
||||
ownerId: Id<'users'>,
|
||||
spoonId: Id<'spoons'>,
|
||||
kind: 'manual_check' | 'scheduled_check' = 'manual_check',
|
||||
): Promise<{
|
||||
success: boolean;
|
||||
status: ReturnType<typeof toStatus>;
|
||||
upstreamAheadBy: number;
|
||||
forkAheadBy: number;
|
||||
}> => {
|
||||
const spoon: Doc<'spoons'> = await ctx.runQuery(
|
||||
internal.spoons.getOwnedForAction,
|
||||
{
|
||||
spoonId,
|
||||
ownerId,
|
||||
},
|
||||
);
|
||||
if (spoon.provider !== 'github') {
|
||||
throw new ConvexError(
|
||||
'GitHub refresh is only available for GitHub Spoons.',
|
||||
);
|
||||
}
|
||||
const connection = await ctx.runQuery(internal.github.getConnectionForUser, {
|
||||
userId: ownerId,
|
||||
});
|
||||
const installationId = getSpoonInstallationId(spoon, connection);
|
||||
const { forkOwner, forkRepo, forkBranch } = ensureForkMetadata(spoon);
|
||||
const syncRunId = await ctx.runMutation(internal.syncRuns.createInternal, {
|
||||
spoonId,
|
||||
ownerId,
|
||||
kind,
|
||||
status: 'running',
|
||||
summary: 'Refreshing GitHub repository state.',
|
||||
});
|
||||
|
||||
await ctx.runMutation(internal.spoons.patchSyncFields, {
|
||||
spoonId,
|
||||
syncStatus: 'checking',
|
||||
lastSyncRunId: syncRunId,
|
||||
lastGithubRefreshAt: Date.now(),
|
||||
});
|
||||
|
||||
try {
|
||||
const octokit = getInstallationOctokit(installationId);
|
||||
const [upstreamRepo, forkRepoData] = await Promise.all([
|
||||
getRepository(octokit, spoon.upstreamOwner, spoon.upstreamRepo),
|
||||
getRepository(octokit, forkOwner, forkRepo),
|
||||
]);
|
||||
const upstreamBranch =
|
||||
spoon.upstreamDefaultBranch || upstreamRepo.default_branch;
|
||||
const resolvedForkBranch = forkBranch || forkRepoData.default_branch;
|
||||
const [upstreamCompare, forkCompare, forkPulls, upstreamPulls]: [
|
||||
GitHubCompareSummary,
|
||||
GitHubCompareSummary,
|
||||
Awaited<ReturnType<typeof listPullRequests>>,
|
||||
Awaited<ReturnType<typeof listPullRequests>>,
|
||||
] = await Promise.all([
|
||||
compareAcrossForkNetwork(octokit, {
|
||||
owner: spoon.upstreamOwner,
|
||||
repo: spoon.upstreamRepo,
|
||||
baseOwner: forkOwner,
|
||||
baseBranch: resolvedForkBranch,
|
||||
headOwner: spoon.upstreamOwner,
|
||||
headBranch: upstreamBranch,
|
||||
}),
|
||||
compareAcrossForkNetwork(octokit, {
|
||||
owner: spoon.upstreamOwner,
|
||||
repo: spoon.upstreamRepo,
|
||||
baseOwner: spoon.upstreamOwner,
|
||||
baseBranch: upstreamBranch,
|
||||
headOwner: forkOwner,
|
||||
headBranch: resolvedForkBranch,
|
||||
}),
|
||||
listPullRequests(octokit, { owner: forkOwner, repo: forkRepo }),
|
||||
listPullRequests(octokit, {
|
||||
owner: spoon.upstreamOwner,
|
||||
repo: spoon.upstreamRepo,
|
||||
head: `${forkOwner}:${resolvedForkBranch}`,
|
||||
}),
|
||||
]);
|
||||
const status = toStatus(upstreamCompare.aheadBy, forkCompare.aheadBy);
|
||||
const openForkPullRequestCount = forkPulls.filter(
|
||||
(pull) => pull.state === 'open',
|
||||
).length;
|
||||
const openUpstreamPullRequestCount = upstreamPulls.filter(
|
||||
(pull) => pull.state === 'open',
|
||||
).length;
|
||||
const now = Date.now();
|
||||
|
||||
await Promise.all([
|
||||
ctx.runMutation(internal.spoonState.upsert, {
|
||||
spoonId,
|
||||
ownerId,
|
||||
upstreamFullName: upstreamRepo.full_name,
|
||||
forkFullName: forkRepoData.full_name,
|
||||
upstreamDefaultBranch: upstreamRepo.default_branch,
|
||||
forkDefaultBranch: forkRepoData.default_branch,
|
||||
upstreamHeadSha: upstreamCompare.headSha,
|
||||
forkHeadSha: forkCompare.headSha,
|
||||
mergeBaseSha: upstreamCompare.mergeBaseSha ?? forkCompare.mergeBaseSha,
|
||||
upstreamAheadBy: upstreamCompare.aheadBy,
|
||||
forkAheadBy: forkCompare.aheadBy,
|
||||
status,
|
||||
openForkPullRequestCount,
|
||||
openUpstreamPullRequestCount,
|
||||
lastCommitAt:
|
||||
getLastCommitAt(upstreamCompare) ?? getLastCommitAt(forkCompare),
|
||||
rawCompareUrl: upstreamCompare.htmlUrl,
|
||||
}),
|
||||
ctx.runMutation(internal.spoonCommits.replaceForSpoon, {
|
||||
spoonId,
|
||||
ownerId,
|
||||
side: 'upstream',
|
||||
commits: upstreamCompare.commits,
|
||||
}),
|
||||
ctx.runMutation(internal.spoonCommits.replaceForSpoon, {
|
||||
spoonId,
|
||||
ownerId,
|
||||
side: 'fork',
|
||||
commits: forkCompare.commits,
|
||||
}),
|
||||
ctx.runMutation(internal.spoonPullRequests.replaceForSpoon, {
|
||||
spoonId,
|
||||
ownerId,
|
||||
scope: 'fork',
|
||||
pullRequests: forkPulls,
|
||||
}),
|
||||
ctx.runMutation(internal.spoonPullRequests.replaceForSpoon, {
|
||||
spoonId,
|
||||
ownerId,
|
||||
scope: 'from_fork_to_upstream',
|
||||
pullRequests: upstreamPulls,
|
||||
}),
|
||||
]);
|
||||
|
||||
await ctx.runMutation(internal.spoons.patchSyncFields, {
|
||||
spoonId,
|
||||
syncStatus: status,
|
||||
upstreamAheadBy: upstreamCompare.aheadBy,
|
||||
forkAheadBy: forkCompare.aheadBy,
|
||||
lastMergeBaseCommit:
|
||||
upstreamCompare.mergeBaseSha ?? forkCompare.mergeBaseSha,
|
||||
lastUpstreamCommit: upstreamCompare.headSha,
|
||||
lastForkCommit: forkCompare.headSha,
|
||||
lastGithubRefreshAt: now,
|
||||
lastSuccessfulRefreshAt: now,
|
||||
lastCheckedAt: now,
|
||||
lastError: '',
|
||||
});
|
||||
await ctx.runMutation(internal.syncRuns.patchInternal, {
|
||||
syncRunId,
|
||||
status: status === 'diverged' ? 'needs_review' : 'clean',
|
||||
summary: `GitHub refresh complete: ${upstreamCompare.aheadBy} upstream commit(s), ${forkCompare.aheadBy} fork-only commit(s).`,
|
||||
});
|
||||
return {
|
||||
success: true,
|
||||
status,
|
||||
upstreamAheadBy: upstreamCompare.aheadBy,
|
||||
forkAheadBy: forkCompare.aheadBy,
|
||||
};
|
||||
} catch (error) {
|
||||
const message = error instanceof Error ? error.message : String(error);
|
||||
await Promise.all([
|
||||
ctx.runMutation(internal.spoons.patchSyncFields, {
|
||||
spoonId,
|
||||
syncStatus: 'error',
|
||||
lastGithubRefreshAt: Date.now(),
|
||||
lastCheckedAt: Date.now(),
|
||||
lastError: message,
|
||||
}),
|
||||
ctx.runMutation(internal.syncRuns.patchInternal, {
|
||||
syncRunId,
|
||||
status: 'failed',
|
||||
error: message,
|
||||
}),
|
||||
]);
|
||||
throw new ConvexError(message);
|
||||
}
|
||||
};
|
||||
|
||||
export const refreshSpoonGithubState = action({
|
||||
args: { spoonId: v.id('spoons') },
|
||||
handler: async (
|
||||
ctx,
|
||||
{ spoonId },
|
||||
): Promise<{
|
||||
success: boolean;
|
||||
status: ReturnType<typeof toStatus>;
|
||||
upstreamAheadBy: number;
|
||||
forkAheadBy: number;
|
||||
}> => {
|
||||
const ownerId = await getRequiredUserId(ctx);
|
||||
return await refreshOwnedSpoon(ctx, ownerId, spoonId);
|
||||
},
|
||||
});
|
||||
|
||||
export const syncForkWithUpstream = action({
|
||||
args: { spoonId: v.id('spoons') },
|
||||
handler: async (
|
||||
ctx,
|
||||
{ spoonId },
|
||||
): Promise<{
|
||||
success: boolean;
|
||||
status: ReturnType<typeof toStatus>;
|
||||
upstreamAheadBy: number;
|
||||
forkAheadBy: number;
|
||||
}> => {
|
||||
const ownerId = await getRequiredUserId(ctx);
|
||||
const spoon: Doc<'spoons'> = await ctx.runQuery(
|
||||
internal.spoons.getOwnedForAction,
|
||||
{
|
||||
spoonId,
|
||||
ownerId,
|
||||
},
|
||||
);
|
||||
const state = await ctx.runQuery(internal.spoonState.getInternal, {
|
||||
spoonId,
|
||||
ownerId,
|
||||
});
|
||||
if (state?.status !== 'behind' || state.forkAheadBy !== 0) {
|
||||
throw new ConvexError(
|
||||
'Sync is only available for behind, non-diverged forks.',
|
||||
);
|
||||
}
|
||||
const connection = await ctx.runQuery(
|
||||
internal.github.getConnectionForUser,
|
||||
{
|
||||
userId: ownerId,
|
||||
},
|
||||
);
|
||||
const installationId = getSpoonInstallationId(spoon, connection);
|
||||
const { forkOwner, forkRepo, forkBranch } = ensureForkMetadata(spoon);
|
||||
const syncRunId = await ctx.runMutation(internal.syncRuns.createInternal, {
|
||||
spoonId,
|
||||
ownerId,
|
||||
kind: 'merge_attempt',
|
||||
status: 'running',
|
||||
summary: 'Syncing fork branch with upstream.',
|
||||
});
|
||||
try {
|
||||
const octokit = getInstallationOctokit(installationId);
|
||||
await syncForkBranch(octokit, {
|
||||
forkOwner,
|
||||
forkRepo,
|
||||
branch: forkBranch,
|
||||
});
|
||||
await ctx.runMutation(internal.syncRuns.patchInternal, {
|
||||
syncRunId,
|
||||
status: 'merged',
|
||||
summary: 'GitHub fork sync completed.',
|
||||
});
|
||||
return await refreshOwnedSpoon(ctx, ownerId, spoonId, 'manual_check');
|
||||
} catch (error) {
|
||||
const message = error instanceof Error ? error.message : String(error);
|
||||
const conflict = message.toLowerCase().includes('conflict');
|
||||
await ctx.runMutation(internal.syncRuns.patchInternal, {
|
||||
syncRunId,
|
||||
status: conflict ? 'conflict' : 'failed',
|
||||
error: message,
|
||||
});
|
||||
await ctx.runMutation(internal.spoons.patchSyncFields, {
|
||||
spoonId,
|
||||
syncStatus: conflict ? 'conflict' : 'error',
|
||||
lastError: message,
|
||||
});
|
||||
throw new ConvexError(message);
|
||||
}
|
||||
},
|
||||
});
|
||||
|
||||
export const refreshDueSpoons = internalAction({
|
||||
args: { limit: v.optional(v.number()) },
|
||||
handler: async (
|
||||
ctx,
|
||||
{ limit },
|
||||
): Promise<
|
||||
(
|
||||
| {
|
||||
success: boolean;
|
||||
status: ReturnType<typeof toStatus>;
|
||||
upstreamAheadBy: number;
|
||||
forkAheadBy: number;
|
||||
}
|
||||
| { success: false; spoonId: Id<'spoons'>; error: string }
|
||||
)[]
|
||||
> => {
|
||||
const due: { spoonId: Id<'spoons'>; ownerId: Id<'users'> }[] =
|
||||
await ctx.runQuery(internal.spoonSettings.listRefreshDue, {
|
||||
limit: limit ?? 10,
|
||||
});
|
||||
const results: (
|
||||
| {
|
||||
success: boolean;
|
||||
status: ReturnType<typeof toStatus>;
|
||||
upstreamAheadBy: number;
|
||||
forkAheadBy: number;
|
||||
}
|
||||
| { success: false; spoonId: Id<'spoons'>; error: string }
|
||||
)[] = [];
|
||||
for (const item of due) {
|
||||
try {
|
||||
results.push(
|
||||
await refreshOwnedSpoon(
|
||||
ctx,
|
||||
item.ownerId,
|
||||
item.spoonId,
|
||||
'scheduled_check',
|
||||
),
|
||||
);
|
||||
} catch (error) {
|
||||
results.push({
|
||||
success: false,
|
||||
spoonId: item.spoonId,
|
||||
error: error instanceof Error ? error.message : String(error),
|
||||
});
|
||||
}
|
||||
}
|
||||
return results;
|
||||
},
|
||||
});
|
||||
Reference in New Issue
Block a user