PBI-50 F2-T2/T3: SPRINT_IMPLEMENTATION-pad in getFullJobContext + lease-driven stale-reset

F2-T2 — getFullJobContext branche voor `kind === 'SPRINT_IMPLEMENTATION'`:
- Fetch sprint_run met deep include (sprint → product + stories → pbi + tasks).
- resolveRepoRoot via product; rollbackClaim bij faal.
- Branch-resolutie: previous_run_id + branch → reuse (resume-pad), anders
  verse `feat/sprint-<run_id-suffix>`. createWorktreeForJob met juiste
  reuseBranch-flag.
- Capture base_sha via `git rev-parse HEAD` na worktree-add.
- Frozen scope-snapshot: SprintTaskExecution.createMany met plan_snapshot,
  verify_required_snapshot, verify_only_snapshot per task in scope. Order
  is PBI→Story→Task. base_sha alleen op task[0] (rest fillt verify-tool).
- Update job.branch + job.base_sha + sprint_run.branch in één transactie.
- Lookup execution_ids voor response shape.

F2-T3 — resetStaleClaimedJobs lease-driven:
- WHERE-clause uitgebreid naar `status IN ('CLAIMED','RUNNING')` met OR-clause
  `lease_until < NOW() OR (lease_until IS NULL AND claimed_at < NOW() - 30min)`.
  Legacy jobs zonder lease blijven via claimed_at-pad werken; nieuwe jobs
  via lease_until.
- RETURNING uitgebreid met kind, sprint_run_id, branch.
- Bij stale FAILED SPRINT_IMPLEMENTATION: push branch (geen mark-ready,
  geen PR-promotie) zodat werk niet verloren gaat. Vul SprintRun.failure_reason
  met laatst-RUNNING execution voor diagnose.

Imports: getWorktreeRoot uit worktree-paths.js, pushBranchForJob uit push.js.

Tests: 31 files, 243 passing.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Madhura68 2026-05-07 12:33:55 +02:00
parent de6bbd4edd
commit 35601e8e4b

View file

@ -15,7 +15,9 @@ const execFileP = promisify(execFile)
import { requireWriteAccess } from '../auth.js' import { requireWriteAccess } from '../auth.js'
import { toolJson, toolError, withToolErrors } from '../errors.js' import { toolJson, toolError, withToolErrors } from '../errors.js'
import { createWorktreeForJob } from '../git/worktree.js' import { createWorktreeForJob } from '../git/worktree.js'
import { getWorktreeRoot } from '../git/worktree-paths.js'
import { setupProductWorktrees, releaseLocksOnTerminal } from '../git/job-locks.js' import { setupProductWorktrees, releaseLocksOnTerminal } from '../git/job-locks.js'
import { pushBranchForJob } from '../git/push.js'
/** Parse `https://github.com/<owner>/<name>(.git)?` → `<name>`. */ /** Parse `https://github.com/<owner>/<name>(.git)?` → `<name>`. */
export function repoNameFromUrl(repoUrl: string | null | undefined): string | null { export function repoNameFromUrl(repoUrl: string | null | undefined): string | null {
@ -225,45 +227,96 @@ const inputSchema = z.object({
const STALE_ERROR_MSG = 'agent did not complete job within 2 attempts' const STALE_ERROR_MSG = 'agent did not complete job within 2 attempts'
export async function resetStaleClaimedJobs(userId: string): Promise<void> { export async function resetStaleClaimedJobs(userId: string): Promise<void> {
// Jobs that exceeded the retry limit → FAILED // PBI-50: lease-driven stale-detection. Jobs in CLAIMED of RUNNING met
const failedRows = await prisma.$queryRaw< // verlopen lease_until (default 5 min, verlengd door job_heartbeat) worden
Array<{ id: string; task_id: string; product_id: string }> // gereset. Legacy jobs zonder lease_until vallen terug op de oude
>` // claimed_at + 30-min-regel.
type StaleRow = {
id: string
task_id: string | null
product_id: string
kind: string
sprint_run_id: string | null
branch: string | null
}
const failedRows = await prisma.$queryRaw<StaleRow[]>`
UPDATE claude_jobs UPDATE claude_jobs
SET status = 'FAILED', SET status = 'FAILED',
finished_at = NOW(), finished_at = NOW(),
error = ${STALE_ERROR_MSG} error = ${STALE_ERROR_MSG}
WHERE user_id = ${userId} WHERE user_id = ${userId}
AND status = 'CLAIMED' AND status IN ('CLAIMED', 'RUNNING')
AND claimed_at < NOW() - INTERVAL '30 minutes'
AND retry_count >= 2 AND retry_count >= 2
RETURNING id, task_id, product_id AND (
lease_until < NOW()
OR (lease_until IS NULL AND claimed_at < NOW() - INTERVAL '30 minutes')
)
RETURNING id, task_id, product_id, kind::text AS kind, sprint_run_id, branch
` `
// Jobs under the retry limit → back to QUEUED, increment retry_count
const requeuedRows = await prisma.$queryRaw< const requeuedRows = await prisma.$queryRaw<
Array<{ id: string; task_id: string; product_id: string; retry_count: number }> (StaleRow & { retry_count: number })[]
>` >`
UPDATE claude_jobs UPDATE claude_jobs
SET status = 'QUEUED', SET status = 'QUEUED',
claimed_by_token_id = NULL, claimed_by_token_id = NULL,
claimed_at = NULL, claimed_at = NULL,
plan_snapshot = NULL, plan_snapshot = NULL,
lease_until = NULL,
retry_count = retry_count + 1 retry_count = retry_count + 1
WHERE user_id = ${userId} WHERE user_id = ${userId}
AND status = 'CLAIMED' AND status IN ('CLAIMED', 'RUNNING')
AND claimed_at < NOW() - INTERVAL '30 minutes'
AND retry_count < 2 AND retry_count < 2
RETURNING id, task_id, product_id, retry_count AND (
lease_until < NOW()
OR (lease_until IS NULL AND claimed_at < NOW() - INTERVAL '30 minutes')
)
RETURNING id, task_id, product_id, kind::text AS kind, sprint_run_id, branch, retry_count
` `
if (failedRows.length === 0 && requeuedRows.length === 0) return if (failedRows.length === 0 && requeuedRows.length === 0) return
// PBI-9: release any product-worktree locks held by these stale jobs. // PBI-9: release any product-worktree locks held by these stale jobs.
// No-op for jobs without registered locks (TASK_IMPLEMENTATION).
for (const j of failedRows) await releaseLocksOnTerminal(j.id) for (const j of failedRows) await releaseLocksOnTerminal(j.id)
for (const j of requeuedRows) await releaseLocksOnTerminal(j.id) for (const j of requeuedRows) await releaseLocksOnTerminal(j.id)
// PBI-50: voor stale FAILED SPRINT_IMPLEMENTATION jobs — push de branch
// zodat het werk niet verloren gaat (geen mark-ready / PR-promotie),
// en zet SprintRun.failure_reason met een verwijzing naar de laatst
// RUNNING execution voor diagnose.
for (const j of failedRows.filter((r) => r.kind === 'SPRINT_IMPLEMENTATION')) {
if (j.branch && j.product_id) {
const repoRoot = await resolveRepoRoot(j.product_id).catch(() => null)
if (repoRoot) {
const worktreeDir = getWorktreeRoot()
const worktreePath = path.join(worktreeDir, j.id)
try {
await pushBranchForJob({ worktreePath, branchName: j.branch })
} catch (err) {
console.warn(`[stale-reset] push failed for stale sprint-job ${j.id}:`, err)
}
}
}
if (j.sprint_run_id) {
const lastRunning = await prisma.sprintTaskExecution.findFirst({
where: { sprint_job_id: j.id, status: 'RUNNING' },
orderBy: { order: 'desc' },
select: { order: true, task_id: true },
})
const reasonSuffix = lastRunning
? `, last execution: order ${lastRunning.order} task ${lastRunning.task_id}`
: ''
await prisma.sprintRun.update({
where: { id: j.sprint_run_id },
data: {
status: 'FAILED',
failure_reason: `stale: lease verlopen${reasonSuffix}`,
},
})
}
}
// Notify UI via SSE for each transition (best-effort) // Notify UI via SSE for each transition (best-effort)
try { try {
const pg = new Client({ connectionString: process.env.DATABASE_URL }) const pg = new Client({ connectionString: process.env.DATABASE_URL })
@ -490,6 +543,177 @@ async function getFullJobContext(jobId: string) {
} }
} }
// PBI-50: SPRINT_IMPLEMENTATION — single-session sprint runner.
// Eén ClaudeJob per SprintRun handelt sequentieel alle TO_DO-tasks af.
// Bij claim: maak frozen scope-snapshot via SprintTaskExecution-rows,
// resolve worktree (verse branch of hergebruikt via previous_run_id),
// capture base_sha. Worker werkt uitsluitend op deze frozen snapshot.
if (job.kind === 'SPRINT_IMPLEMENTATION') {
if (!job.sprint_run_id) {
await rollbackClaim(job.id)
return null
}
const sprintRun = await prisma.sprintRun.findUnique({
where: { id: job.sprint_run_id },
include: {
sprint: {
include: {
product: true,
stories: {
where: { status: { not: 'DONE' } },
include: {
pbi: {
select: { id: true, code: true, title: true, priority: true, sort_order: true, status: true },
},
tasks: {
where: { status: 'TO_DO' },
orderBy: [{ priority: 'asc' }, { sort_order: 'asc' }],
},
},
orderBy: [{ priority: 'asc' }, { sort_order: 'asc' }],
},
},
},
},
})
if (!sprintRun) {
await rollbackClaim(job.id)
return null
}
const repoRoot = await resolveRepoRoot(sprintRun.sprint.product_id)
if (!repoRoot) {
await rollbackClaim(job.id)
return null
}
// Branch resolution: previous_run_id + branch → reuse; anders verse.
const isResume = !!(sprintRun.previous_run_id && sprintRun.branch)
const branchName = isResume
? sprintRun.branch!
: `feat/sprint-${job.sprint_run_id.slice(-8)}`
let worktreePath: string
let baseSha: string
try {
const wt = await createWorktreeForJob({
repoRoot,
jobId: job.id,
branchName,
reuseBranch: isResume,
})
worktreePath = wt.worktreePath
const { stdout: headSha } = await execFileP('git', ['rev-parse', 'HEAD'], {
cwd: worktreePath,
})
baseSha = headSha.trim()
} catch (err) {
console.warn(`[wait-for-job] sprint-worktree setup failed for ${job.id}:`, err)
await rollbackClaim(job.id)
return null
}
// Verzamel ordered tasks in flat list, behoud volgorde
const orderedTasks = sprintRun.sprint.stories.flatMap((s) =>
s.tasks.map((t) => ({ ...t, story_pbi_id: s.pbi.id })),
)
// Persist branch + base_sha + scope-snapshot in één transactie
await prisma.$transaction([
prisma.claudeJob.update({
where: { id: job.id },
data: { branch: branchName, base_sha: baseSha },
}),
prisma.sprintTaskExecution.createMany({
data: orderedTasks.map((t, idx) => ({
sprint_job_id: job.id,
task_id: t.id,
order: idx,
plan_snapshot: t.implementation_plan ?? '',
verify_required_snapshot: t.verify_required,
verify_only_snapshot: t.verify_only,
base_sha: idx === 0 ? baseSha : null,
status: 'PENDING' as const,
})),
}),
prisma.sprintRun.update({
where: { id: job.sprint_run_id },
data: { branch: branchName },
}),
])
// Lookup execution_ids in volgorde voor de response
const executions = await prisma.sprintTaskExecution.findMany({
where: { sprint_job_id: job.id },
orderBy: { order: 'asc' },
select: { id: true, task_id: true, order: true, base_sha: true },
})
const execIdByTaskId = new Map(executions.map((e) => [e.task_id, e.id]))
// Dedupe PBIs uit de stories (één PBI kan meerdere stories hebben)
const pbiMap = new Map<string, typeof sprintRun.sprint.stories[number]['pbi']>()
for (const s of sprintRun.sprint.stories) pbiMap.set(s.pbi.id, s.pbi)
return {
job_id: job.id,
kind: job.kind,
status: 'claimed',
sprint: {
id: sprintRun.sprint.id,
sprint_goal: sprintRun.sprint.sprint_goal,
status: sprintRun.sprint.status,
},
sprint_run: {
id: sprintRun.id,
pr_strategy: sprintRun.pr_strategy,
branch: branchName,
previous_run_id: sprintRun.previous_run_id,
},
product: {
id: sprintRun.sprint.product.id,
name: sprintRun.sprint.product.name,
repo_url: sprintRun.sprint.product.repo_url,
definition_of_done: sprintRun.sprint.product.definition_of_done,
auto_pr: sprintRun.sprint.product.auto_pr,
},
pbis: Array.from(pbiMap.values()).map((p) => ({
id: p.id,
code: p.code,
title: p.title,
priority: p.priority,
sort_order: p.sort_order,
status: p.status,
})),
stories: sprintRun.sprint.stories.map((s) => ({
id: s.id,
code: s.code,
title: s.title,
pbi_id: s.pbi_id,
priority: s.priority,
sort_order: s.sort_order,
status: s.status,
})),
task_executions: orderedTasks.map((t, idx) => ({
execution_id: execIdByTaskId.get(t.id)!,
task_id: t.id,
code: t.code,
title: t.title,
story_id: t.story_id,
order: idx,
plan_snapshot: t.implementation_plan ?? '',
verify_required: t.verify_required,
verify_only: t.verify_only,
base_sha: idx === 0 ? baseSha : null,
})),
worktree_path: worktreePath,
branch_name: branchName,
repo_url: sprintRun.sprint.product.repo_url,
base_sha: baseSha,
heartbeat_interval_seconds: 60,
}
}
// TASK_IMPLEMENTATION (default) — bestaande gedrag onaangetast. // TASK_IMPLEMENTATION (default) — bestaande gedrag onaangetast.
const { task } = job const { task } = job
if (!task) return null if (!task) return null