From 35601e8e4bc976ff8f7cb2885402cfd4700ede3d Mon Sep 17 00:00:00 2001 From: Madhura68 Date: Thu, 7 May 2026 12:33:55 +0200 Subject: [PATCH] PBI-50 F2-T2/T3: SPRINT_IMPLEMENTATION-pad in getFullJobContext + lease-driven stale-reset MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit F2-T2 — getFullJobContext branche voor `kind === 'SPRINT_IMPLEMENTATION'`: - Fetch sprint_run met deep include (sprint → product + stories → pbi + tasks). - resolveRepoRoot via product; rollbackClaim bij faal. - Branch-resolutie: previous_run_id + branch → reuse (resume-pad), anders verse `feat/sprint-`. createWorktreeForJob met juiste reuseBranch-flag. - Capture base_sha via `git rev-parse HEAD` na worktree-add. - Frozen scope-snapshot: SprintTaskExecution.createMany met plan_snapshot, verify_required_snapshot, verify_only_snapshot per task in scope. Order is PBI→Story→Task. base_sha alleen op task[0] (rest fillt verify-tool). - Update job.branch + job.base_sha + sprint_run.branch in één transactie. - Lookup execution_ids voor response shape. F2-T3 — resetStaleClaimedJobs lease-driven: - WHERE-clause uitgebreid naar `status IN ('CLAIMED','RUNNING')` met OR-clause `lease_until < NOW() OR (lease_until IS NULL AND claimed_at < NOW() - 30min)`. Legacy jobs zonder lease blijven via claimed_at-pad werken; nieuwe jobs via lease_until. - RETURNING uitgebreid met kind, sprint_run_id, branch. - Bij stale FAILED SPRINT_IMPLEMENTATION: push branch (geen mark-ready, geen PR-promotie) zodat werk niet verloren gaat. Vul SprintRun.failure_reason met laatst-RUNNING execution voor diagnose. Imports: getWorktreeRoot uit worktree-paths.js, pushBranchForJob uit push.js. Tests: 31 files, 243 passing. Co-Authored-By: Claude Opus 4.7 (1M context) --- src/tools/wait-for-job.ts | 250 ++++++++++++++++++++++++++++++++++++-- 1 file changed, 237 insertions(+), 13 deletions(-) diff --git a/src/tools/wait-for-job.ts b/src/tools/wait-for-job.ts index d05e9f8..1323e50 100644 --- a/src/tools/wait-for-job.ts +++ b/src/tools/wait-for-job.ts @@ -15,7 +15,9 @@ const execFileP = promisify(execFile) import { requireWriteAccess } from '../auth.js' import { toolJson, toolError, withToolErrors } from '../errors.js' import { createWorktreeForJob } from '../git/worktree.js' +import { getWorktreeRoot } from '../git/worktree-paths.js' import { setupProductWorktrees, releaseLocksOnTerminal } from '../git/job-locks.js' +import { pushBranchForJob } from '../git/push.js' /** Parse `https://github.com//(.git)?` → ``. */ export function repoNameFromUrl(repoUrl: string | null | undefined): string | null { @@ -225,45 +227,96 @@ const inputSchema = z.object({ const STALE_ERROR_MSG = 'agent did not complete job within 2 attempts' export async function resetStaleClaimedJobs(userId: string): Promise { - // Jobs that exceeded the retry limit → FAILED - const failedRows = await prisma.$queryRaw< - Array<{ id: string; task_id: string; product_id: string }> - >` + // PBI-50: lease-driven stale-detection. Jobs in CLAIMED of RUNNING met + // verlopen lease_until (default 5 min, verlengd door job_heartbeat) worden + // gereset. Legacy jobs zonder lease_until vallen terug op de oude + // claimed_at + 30-min-regel. + type StaleRow = { + id: string + task_id: string | null + product_id: string + kind: string + sprint_run_id: string | null + branch: string | null + } + + const failedRows = await prisma.$queryRaw` UPDATE claude_jobs SET status = 'FAILED', finished_at = NOW(), error = ${STALE_ERROR_MSG} WHERE user_id = ${userId} - AND status = 'CLAIMED' - AND claimed_at < NOW() - INTERVAL '30 minutes' + AND status IN ('CLAIMED', 'RUNNING') AND retry_count >= 2 - RETURNING id, task_id, product_id + AND ( + lease_until < NOW() + OR (lease_until IS NULL AND claimed_at < NOW() - INTERVAL '30 minutes') + ) + RETURNING id, task_id, product_id, kind::text AS kind, sprint_run_id, branch ` - // Jobs under the retry limit → back to QUEUED, increment retry_count const requeuedRows = await prisma.$queryRaw< - Array<{ id: string; task_id: string; product_id: string; retry_count: number }> + (StaleRow & { retry_count: number })[] >` UPDATE claude_jobs SET status = 'QUEUED', claimed_by_token_id = NULL, claimed_at = NULL, plan_snapshot = NULL, + lease_until = NULL, retry_count = retry_count + 1 WHERE user_id = ${userId} - AND status = 'CLAIMED' - AND claimed_at < NOW() - INTERVAL '30 minutes' + AND status IN ('CLAIMED', 'RUNNING') AND retry_count < 2 - RETURNING id, task_id, product_id, retry_count + AND ( + lease_until < NOW() + OR (lease_until IS NULL AND claimed_at < NOW() - INTERVAL '30 minutes') + ) + RETURNING id, task_id, product_id, kind::text AS kind, sprint_run_id, branch, retry_count ` if (failedRows.length === 0 && requeuedRows.length === 0) return // PBI-9: release any product-worktree locks held by these stale jobs. - // No-op for jobs without registered locks (TASK_IMPLEMENTATION). for (const j of failedRows) await releaseLocksOnTerminal(j.id) for (const j of requeuedRows) await releaseLocksOnTerminal(j.id) + // PBI-50: voor stale FAILED SPRINT_IMPLEMENTATION jobs — push de branch + // zodat het werk niet verloren gaat (geen mark-ready / PR-promotie), + // en zet SprintRun.failure_reason met een verwijzing naar de laatst + // RUNNING execution voor diagnose. + for (const j of failedRows.filter((r) => r.kind === 'SPRINT_IMPLEMENTATION')) { + if (j.branch && j.product_id) { + const repoRoot = await resolveRepoRoot(j.product_id).catch(() => null) + if (repoRoot) { + const worktreeDir = getWorktreeRoot() + const worktreePath = path.join(worktreeDir, j.id) + try { + await pushBranchForJob({ worktreePath, branchName: j.branch }) + } catch (err) { + console.warn(`[stale-reset] push failed for stale sprint-job ${j.id}:`, err) + } + } + } + if (j.sprint_run_id) { + const lastRunning = await prisma.sprintTaskExecution.findFirst({ + where: { sprint_job_id: j.id, status: 'RUNNING' }, + orderBy: { order: 'desc' }, + select: { order: true, task_id: true }, + }) + const reasonSuffix = lastRunning + ? `, last execution: order ${lastRunning.order} task ${lastRunning.task_id}` + : '' + await prisma.sprintRun.update({ + where: { id: j.sprint_run_id }, + data: { + status: 'FAILED', + failure_reason: `stale: lease verlopen${reasonSuffix}`, + }, + }) + } + } + // Notify UI via SSE for each transition (best-effort) try { const pg = new Client({ connectionString: process.env.DATABASE_URL }) @@ -490,6 +543,177 @@ async function getFullJobContext(jobId: string) { } } + // PBI-50: SPRINT_IMPLEMENTATION — single-session sprint runner. + // Eén ClaudeJob per SprintRun handelt sequentieel alle TO_DO-tasks af. + // Bij claim: maak frozen scope-snapshot via SprintTaskExecution-rows, + // resolve worktree (verse branch of hergebruikt via previous_run_id), + // capture base_sha. Worker werkt uitsluitend op deze frozen snapshot. + if (job.kind === 'SPRINT_IMPLEMENTATION') { + if (!job.sprint_run_id) { + await rollbackClaim(job.id) + return null + } + const sprintRun = await prisma.sprintRun.findUnique({ + where: { id: job.sprint_run_id }, + include: { + sprint: { + include: { + product: true, + stories: { + where: { status: { not: 'DONE' } }, + include: { + pbi: { + select: { id: true, code: true, title: true, priority: true, sort_order: true, status: true }, + }, + tasks: { + where: { status: 'TO_DO' }, + orderBy: [{ priority: 'asc' }, { sort_order: 'asc' }], + }, + }, + orderBy: [{ priority: 'asc' }, { sort_order: 'asc' }], + }, + }, + }, + }, + }) + if (!sprintRun) { + await rollbackClaim(job.id) + return null + } + + const repoRoot = await resolveRepoRoot(sprintRun.sprint.product_id) + if (!repoRoot) { + await rollbackClaim(job.id) + return null + } + + // Branch resolution: previous_run_id + branch → reuse; anders verse. + const isResume = !!(sprintRun.previous_run_id && sprintRun.branch) + const branchName = isResume + ? sprintRun.branch! + : `feat/sprint-${job.sprint_run_id.slice(-8)}` + + let worktreePath: string + let baseSha: string + try { + const wt = await createWorktreeForJob({ + repoRoot, + jobId: job.id, + branchName, + reuseBranch: isResume, + }) + worktreePath = wt.worktreePath + + const { stdout: headSha } = await execFileP('git', ['rev-parse', 'HEAD'], { + cwd: worktreePath, + }) + baseSha = headSha.trim() + } catch (err) { + console.warn(`[wait-for-job] sprint-worktree setup failed for ${job.id}:`, err) + await rollbackClaim(job.id) + return null + } + + // Verzamel ordered tasks in flat list, behoud volgorde + const orderedTasks = sprintRun.sprint.stories.flatMap((s) => + s.tasks.map((t) => ({ ...t, story_pbi_id: s.pbi.id })), + ) + + // Persist branch + base_sha + scope-snapshot in één transactie + await prisma.$transaction([ + prisma.claudeJob.update({ + where: { id: job.id }, + data: { branch: branchName, base_sha: baseSha }, + }), + prisma.sprintTaskExecution.createMany({ + data: orderedTasks.map((t, idx) => ({ + sprint_job_id: job.id, + task_id: t.id, + order: idx, + plan_snapshot: t.implementation_plan ?? '', + verify_required_snapshot: t.verify_required, + verify_only_snapshot: t.verify_only, + base_sha: idx === 0 ? baseSha : null, + status: 'PENDING' as const, + })), + }), + prisma.sprintRun.update({ + where: { id: job.sprint_run_id }, + data: { branch: branchName }, + }), + ]) + + // Lookup execution_ids in volgorde voor de response + const executions = await prisma.sprintTaskExecution.findMany({ + where: { sprint_job_id: job.id }, + orderBy: { order: 'asc' }, + select: { id: true, task_id: true, order: true, base_sha: true }, + }) + const execIdByTaskId = new Map(executions.map((e) => [e.task_id, e.id])) + + // Dedupe PBIs uit de stories (één PBI kan meerdere stories hebben) + const pbiMap = new Map() + for (const s of sprintRun.sprint.stories) pbiMap.set(s.pbi.id, s.pbi) + + return { + job_id: job.id, + kind: job.kind, + status: 'claimed', + sprint: { + id: sprintRun.sprint.id, + sprint_goal: sprintRun.sprint.sprint_goal, + status: sprintRun.sprint.status, + }, + sprint_run: { + id: sprintRun.id, + pr_strategy: sprintRun.pr_strategy, + branch: branchName, + previous_run_id: sprintRun.previous_run_id, + }, + product: { + id: sprintRun.sprint.product.id, + name: sprintRun.sprint.product.name, + repo_url: sprintRun.sprint.product.repo_url, + definition_of_done: sprintRun.sprint.product.definition_of_done, + auto_pr: sprintRun.sprint.product.auto_pr, + }, + pbis: Array.from(pbiMap.values()).map((p) => ({ + id: p.id, + code: p.code, + title: p.title, + priority: p.priority, + sort_order: p.sort_order, + status: p.status, + })), + stories: sprintRun.sprint.stories.map((s) => ({ + id: s.id, + code: s.code, + title: s.title, + pbi_id: s.pbi_id, + priority: s.priority, + sort_order: s.sort_order, + status: s.status, + })), + task_executions: orderedTasks.map((t, idx) => ({ + execution_id: execIdByTaskId.get(t.id)!, + task_id: t.id, + code: t.code, + title: t.title, + story_id: t.story_id, + order: idx, + plan_snapshot: t.implementation_plan ?? '', + verify_required: t.verify_required, + verify_only: t.verify_only, + base_sha: idx === 0 ? baseSha : null, + })), + worktree_path: worktreePath, + branch_name: branchName, + repo_url: sprintRun.sprint.product.repo_url, + base_sha: baseSha, + heartbeat_interval_seconds: 60, + } + } + // TASK_IMPLEMENTATION (default) — bestaande gedrag onaangetast. const { task } = job if (!task) return null