'use node';

import { getAuthUserId } from '@convex-dev/auth/server';
import { ConvexError, v } from 'convex/values';

import type { Doc, Id } from './_generated/dataModel';
import type { ActionCtx } from './_generated/server';
import type { GitHubCompareSummary } from './githubClient';
import { internal } from './_generated/api';
import { action, internalAction } from './_generated/server';
import {
  compareAcrossForkNetwork,
  getInstallationOctokit,
  getRepository,
  getSpoonInstallationId,
  listPullRequests,
  syncForkBranch,
} from './githubClient';

/**
 * Resolves the authenticated user's id for the current action.
 *
 * @throws ConvexError when `getAuthUserId` yields no user id.
 */
const getRequiredUserId = async (ctx: ActionCtx) => {
  const userId = await getAuthUserId(ctx);
  if (!userId) throw new ConvexError('Not authenticated.');
  return userId;
};

/**
 * Classifies the fork/upstream relationship from the two "ahead by" counts.
 *
 * - both zero            -> 'up_to_date'
 * - only upstream ahead  -> 'behind'
 * - only fork ahead      -> 'ahead'
 * - both ahead           -> 'diverged'
 *
 * Any other combination (for example a negative or NaN count) falls through
 * to 'unknown'.
 */
const toStatus = (upstreamAheadBy: number, forkAheadBy: number) => {
  if (upstreamAheadBy === 0 && forkAheadBy === 0) return 'up_to_date' as const;
  if (upstreamAheadBy > 0 && forkAheadBy === 0) return 'behind' as const;
  if (upstreamAheadBy === 0 && forkAheadBy > 0) return 'ahead' as const;
  if (upstreamAheadBy > 0 && forkAheadBy > 0) return 'diverged' as const;
  return 'unknown' as const;
};

/** Status literal union produced by {@link toStatus}. */
type SpoonSyncStatus = ReturnType<typeof toStatus>;

/** Result of one successful GitHub state refresh. */
type RefreshResult = {
  success: boolean;
  status: SpoonSyncStatus;
  upstreamAheadBy: number;
  forkAheadBy: number;
};

/** Per-spoon failure entry reported by the scheduled batch refresh. */
type RefreshFailure = {
  success: false;
  spoonId: Id<'spoons'>;
  error: string;
};

/** Resolved value of `listPullRequests`. */
type PullRequestList = Awaited<ReturnType<typeof listPullRequests>>;

/** Extracts a message from an arbitrary thrown value. */
const toErrorMessage = (error: unknown): string =>
  error instanceof Error ? error.message : String(error);

/**
 * Returns `committedAt` of the last entry in `compare.commits`, or
 * `undefined` when the list is empty.
 */
const getLastCommitAt = (compare: GitHubCompareSummary) =>
  compare.commits[compare.commits.length - 1]?.committedAt;

/**
 * Extracts the fork coordinates from a spoon.
 *
 * The fork branch falls back to the spoon's upstream default branch when no
 * fork default branch is stored.
 *
 * @throws ConvexError when `forkOwner` or `forkRepo` is missing/empty.
 */
const ensureForkMetadata = (spoon: Doc<'spoons'>) => {
  if (!spoon.forkOwner || !spoon.forkRepo) {
    throw new ConvexError('Fork metadata is required before GitHub refresh.');
  }
  return {
    forkOwner: spoon.forkOwner,
    forkRepo: spoon.forkRepo,
    forkBranch: spoon.forkDefaultBranch ?? spoon.upstreamDefaultBranch,
  };
};

/**
 * Refreshes the stored GitHub state (compare summaries, commits, pull
 * requests and sync fields) for a spoon owned by `ownerId`.
 *
 * A sync run of the given `kind` is created up front and the spoon is marked
 * 'checking'. On success the run is patched to 'needs_review' (diverged) or
 * 'clean'; on failure the spoon is marked 'error', the run 'failed', and the
 * failure is rethrown.
 *
 * @param ctx     Action context used for all queries and mutations.
 * @param ownerId Owner the spoon is looked up for.
 * @param spoonId Spoon to refresh.
 * @param kind    Sync run kind to record; defaults to 'manual_check'.
 * @returns The computed status and both ahead-by counts.
 * @throws ConvexError when the spoon is not a GitHub spoon, when fork
 *   metadata is missing, or (wrapping the message) when any step inside the
 *   refresh itself fails.
 */
const refreshOwnedSpoon = async (
  ctx: ActionCtx,
  ownerId: Id<'users'>,
  spoonId: Id<'spoons'>,
  kind: 'manual_check' | 'scheduled_check' = 'manual_check',
): Promise<RefreshResult> => {
  const spoon: Doc<'spoons'> = await ctx.runQuery(
    internal.spoons.getOwnedForAction,
    {
      spoonId,
      ownerId,
    },
  );
  if (spoon.provider !== 'github') {
    throw new ConvexError(
      'GitHub refresh is only available for GitHub Spoons.',
    );
  }

  const connection = await ctx.runQuery(internal.github.getConnectionForUser, {
    userId: ownerId,
  });
  const installationId = getSpoonInstallationId(spoon, connection);
  const { forkOwner, forkRepo, forkBranch } = ensureForkMetadata(spoon);

  const syncRunId = await ctx.runMutation(internal.syncRuns.createInternal, {
    spoonId,
    ownerId,
    kind,
    status: 'running',
    summary: 'Refreshing GitHub repository state.',
  });
  await ctx.runMutation(internal.spoons.patchSyncFields, {
    spoonId,
    syncStatus: 'checking',
    lastSyncRunId: syncRunId,
    lastGithubRefreshAt: Date.now(),
  });

  try {
    const octokit = getInstallationOctokit(installationId);
    const [upstreamRepo, forkRepoData] = await Promise.all([
      getRepository(octokit, spoon.upstreamOwner, spoon.upstreamRepo),
      getRepository(octokit, forkOwner, forkRepo),
    ]);

    // `||` (not `??`) so an empty stored branch name also falls back to the
    // branch GitHub reports as the repository default.
    const upstreamBranch =
      spoon.upstreamDefaultBranch || upstreamRepo.default_branch;
    const resolvedForkBranch = forkBranch || forkRepoData.default_branch;

    const [upstreamCompare, forkCompare, forkPulls, upstreamPulls]: [
      GitHubCompareSummary,
      GitHubCompareSummary,
      PullRequestList,
      PullRequestList,
    ] = await Promise.all([
      // base = fork branch, head = upstream branch; its `aheadBy` is stored
      // as `upstreamAheadBy`.
      compareAcrossForkNetwork(octokit, {
        owner: spoon.upstreamOwner,
        repo: spoon.upstreamRepo,
        baseOwner: forkOwner,
        baseBranch: resolvedForkBranch,
        headOwner: spoon.upstreamOwner,
        headBranch: upstreamBranch,
      }),
      // base = upstream branch, head = fork branch; its `aheadBy` is stored
      // as `forkAheadBy`.
      compareAcrossForkNetwork(octokit, {
        owner: spoon.upstreamOwner,
        repo: spoon.upstreamRepo,
        baseOwner: spoon.upstreamOwner,
        baseBranch: upstreamBranch,
        headOwner: forkOwner,
        headBranch: resolvedForkBranch,
      }),
      listPullRequests(octokit, { owner: forkOwner, repo: forkRepo }),
      listPullRequests(octokit, {
        owner: spoon.upstreamOwner,
        repo: spoon.upstreamRepo,
        head: `${forkOwner}:${resolvedForkBranch}`,
      }),
    ]);

    const status = toStatus(upstreamCompare.aheadBy, forkCompare.aheadBy);
    const mergeBaseSha =
      upstreamCompare.mergeBaseSha ?? forkCompare.mergeBaseSha;
    const openForkPullRequestCount = forkPulls.filter(
      (pull) => pull.state === 'open',
    ).length;
    const openUpstreamPullRequestCount = upstreamPulls.filter(
      (pull) => pull.state === 'open',
    ).length;
    const now = Date.now();

    await Promise.all([
      ctx.runMutation(internal.spoonState.upsert, {
        spoonId,
        ownerId,
        upstreamFullName: upstreamRepo.full_name,
        forkFullName: forkRepoData.full_name,
        upstreamDefaultBranch: upstreamRepo.default_branch,
        forkDefaultBranch: forkRepoData.default_branch,
        upstreamHeadSha: upstreamCompare.headSha,
        forkHeadSha: forkCompare.headSha,
        mergeBaseSha,
        upstreamAheadBy: upstreamCompare.aheadBy,
        forkAheadBy: forkCompare.aheadBy,
        status,
        openForkPullRequestCount,
        openUpstreamPullRequestCount,
        lastCommitAt:
          getLastCommitAt(upstreamCompare) ?? getLastCommitAt(forkCompare),
        rawCompareUrl: upstreamCompare.htmlUrl,
      }),
      ctx.runMutation(internal.spoonCommits.replaceForSpoon, {
        spoonId,
        ownerId,
        side: 'upstream',
        commits: upstreamCompare.commits,
      }),
      ctx.runMutation(internal.spoonCommits.replaceForSpoon, {
        spoonId,
        ownerId,
        side: 'fork',
        commits: forkCompare.commits,
      }),
      ctx.runMutation(internal.spoonPullRequests.replaceForSpoon, {
        spoonId,
        ownerId,
        scope: 'fork',
        pullRequests: forkPulls,
      }),
      ctx.runMutation(internal.spoonPullRequests.replaceForSpoon, {
        spoonId,
        ownerId,
        scope: 'from_fork_to_upstream',
        pullRequests: upstreamPulls,
      }),
    ]);

    await ctx.runMutation(internal.spoons.patchSyncFields, {
      spoonId,
      syncStatus: status,
      upstreamAheadBy: upstreamCompare.aheadBy,
      forkAheadBy: forkCompare.aheadBy,
      lastMergeBaseCommit: mergeBaseSha,
      lastUpstreamCommit: upstreamCompare.headSha,
      lastForkCommit: forkCompare.headSha,
      lastGithubRefreshAt: now,
      lastSuccessfulRefreshAt: now,
      lastCheckedAt: now,
      lastError: '',
    });
    await ctx.runMutation(internal.syncRuns.patchInternal, {
      syncRunId,
      status: status === 'diverged' ? 'needs_review' : 'clean',
      summary: `GitHub refresh complete: ${upstreamCompare.aheadBy} upstream commit(s), ${forkCompare.aheadBy} fork-only commit(s).`,
    });

    return {
      success: true,
      status,
      upstreamAheadBy: upstreamCompare.aheadBy,
      forkAheadBy: forkCompare.aheadBy,
    };
  } catch (error) {
    const message = toErrorMessage(error);
    // One timestamp so both recorded fields agree, as in the success path.
    const failedAt = Date.now();
    await Promise.all([
      ctx.runMutation(internal.spoons.patchSyncFields, {
        spoonId,
        syncStatus: 'error',
        lastGithubRefreshAt: failedAt,
        lastCheckedAt: failedAt,
        lastError: message,
      }),
      ctx.runMutation(internal.syncRuns.patchInternal, {
        syncRunId,
        status: 'failed',
        error: message,
      }),
    ]);
    throw new ConvexError(message);
  }
};

/**
 * Public action: refreshes the GitHub state of a spoon on behalf of the
 * authenticated user (recorded as a 'manual_check' run).
 *
 * @throws ConvexError when unauthenticated or when the refresh fails.
 */
export const refreshSpoonGithubState = action({
  args: { spoonId: v.id('spoons') },
  handler: async (ctx, { spoonId }): Promise<RefreshResult> => {
    const ownerId = await getRequiredUserId(ctx);
    return await refreshOwnedSpoon(ctx, ownerId, spoonId);
  },
});

/**
 * Public action: syncs the fork branch with upstream, then refreshes the
 * stored GitHub state.
 *
 * Only allowed when the stored state is 'behind' with `forkAheadBy === 0`.
 * The sync is recorded as a 'merge_attempt' run. A failure whose message
 * contains "conflict" (case-insensitive) is recorded as 'conflict',
 * otherwise as 'failed'/'error'.
 *
 * The follow-up refresh runs outside the merge error handler: it records its
 * own sync run and failure state, so a refresh failure must not relabel a
 * merge that already completed.
 *
 * @throws ConvexError when unauthenticated, when the spoon is not eligible,
 *   when the sync fails, or when the follow-up refresh fails.
 */
export const syncForkWithUpstream = action({
  args: { spoonId: v.id('spoons') },
  handler: async (ctx, { spoonId }): Promise<RefreshResult> => {
    const ownerId = await getRequiredUserId(ctx);
    const spoon: Doc<'spoons'> = await ctx.runQuery(
      internal.spoons.getOwnedForAction,
      {
        spoonId,
        ownerId,
      },
    );
    const state = await ctx.runQuery(internal.spoonState.getInternal, {
      spoonId,
      ownerId,
    });
    if (state?.status !== 'behind' || state.forkAheadBy !== 0) {
      throw new ConvexError(
        'Sync is only available for behind, non-diverged forks.',
      );
    }

    const connection = await ctx.runQuery(
      internal.github.getConnectionForUser,
      {
        userId: ownerId,
      },
    );
    const installationId = getSpoonInstallationId(spoon, connection);
    const { forkOwner, forkRepo, forkBranch } = ensureForkMetadata(spoon);

    const syncRunId = await ctx.runMutation(internal.syncRuns.createInternal, {
      spoonId,
      ownerId,
      kind: 'merge_attempt',
      status: 'running',
      summary: 'Syncing fork branch with upstream.',
    });

    try {
      const octokit = getInstallationOctokit(installationId);
      await syncForkBranch(octokit, {
        forkOwner,
        forkRepo,
        branch: forkBranch,
      });
      await ctx.runMutation(internal.syncRuns.patchInternal, {
        syncRunId,
        status: 'merged',
        summary: 'GitHub fork sync completed.',
      });
    } catch (error) {
      const message = toErrorMessage(error);
      const conflict = message.toLowerCase().includes('conflict');
      await ctx.runMutation(internal.syncRuns.patchInternal, {
        syncRunId,
        status: conflict ? 'conflict' : 'failed',
        error: message,
      });
      await ctx.runMutation(internal.spoons.patchSyncFields, {
        spoonId,
        syncStatus: conflict ? 'conflict' : 'error',
        lastError: message,
      });
      throw new ConvexError(message);
    }

    try {
      return await refreshOwnedSpoon(ctx, ownerId, spoonId, 'manual_check');
    } catch (error) {
      // Keep surfacing refresh failures to callers as a ConvexError carrying
      // the message, without touching the already-merged sync run.
      throw new ConvexError(toErrorMessage(error));
    }
  },
});

/**
 * Internal action: refreshes every spoon returned by
 * `internal.spoonSettings.listRefreshDue` (at most `limit`, default 10),
 * one after another, recording each as a 'scheduled_check' run.
 *
 * A failure for one spoon is captured as a `RefreshFailure` entry and does
 * not stop the remaining spoons from being processed.
 */
export const refreshDueSpoons = internalAction({
  args: { limit: v.optional(v.number()) },
  handler: async (
    ctx,
    { limit },
  ): Promise<(RefreshResult | RefreshFailure)[]> => {
    const due: { spoonId: Id<'spoons'>; ownerId: Id<'users'> }[] =
      await ctx.runQuery(internal.spoonSettings.listRefreshDue, {
        limit: limit ?? 10,
      });

    const results: (RefreshResult | RefreshFailure)[] = [];
    for (const item of due) {
      try {
        results.push(
          await refreshOwnedSpoon(
            ctx,
            item.ownerId,
            item.spoonId,
            'scheduled_check',
          ),
        );
      } catch (error) {
        results.push({
          success: false,
          spoonId: item.spoonId,
          error: toErrorMessage(error),
        });
      }
    }
    return results;
  },
});