diff --git a/src/index.ts b/src/index.ts index d05900c..2938c70 100644 --- a/src/index.ts +++ b/src/index.ts @@ -29,6 +29,10 @@ import { registerUpdateIdeaPlanMdTool } from './tools/update-idea-plan-md.js' import { registerLogIdeaDecisionTool } from './tools/log-idea-decision.js' import { registerGetWorkerSettingsTool } from './tools/get-worker-settings.js' import { registerWorkerHeartbeatTool } from './tools/worker-heartbeat.js' +// PBI-50: SPRINT_IMPLEMENTATION-tools +import { registerVerifySprintTaskTool } from './tools/verify-sprint-task.js' +import { registerUpdateTaskExecutionTool } from './tools/update-task-execution.js' +import { registerJobHeartbeatTool } from './tools/job-heartbeat.js' import { registerImplementNextStoryPrompt } from './prompts/implement-next-story.js' import { getAuth } from './auth.js' import { registerWorker } from './presence/worker.js' @@ -92,6 +96,10 @@ async function main() { // M13: worker quota-gate tools registerGetWorkerSettingsTool(server) registerWorkerHeartbeatTool(server) + // PBI-50: SPRINT_IMPLEMENTATION-tools + registerVerifySprintTaskTool(server) + registerUpdateTaskExecutionTool(server) + registerJobHeartbeatTool(server) registerImplementNextStoryPrompt(server) // Presence bootstrap MUST run before server.connect — the stdio transport diff --git a/src/lib/tasks-status-update.ts b/src/lib/tasks-status-update.ts index 3549f3d..64e2ac6 100644 --- a/src/lib/tasks-status-update.ts +++ b/src/lib/tasks-status-update.ts @@ -38,6 +38,11 @@ export async function propagateStatusUpwards( taskId: string, newStatus: TaskStatus, client?: Prisma.TransactionClient, + // PBI-50: optionele expliciete sprint_run_id voor SPRINT_IMPLEMENTATION + // (waar geen ClaudeJob.task_id-koppeling bestaat). Wanneer afwezig valt + // de helper terug op de lookup via ClaudeJob.task_id, met als laatste + // fallback Story → Sprint → SprintRun.findFirst({ status: active }). + sprintRunId?: string, ): Promise { const run = async (tx: Prisma.TransactionClient): Promise => { const task = await tx.task.update({ @@ -151,18 +156,43 @@ export async function propagateStatusUpwards( } } - // SprintRun herevalueren — via ClaudeJob.sprint_run_id van deze task + // SprintRun herevalueren. Resolve sprint_run_id in volgorde: + // 1. Expliciete sprintRunId-arg (PBI-50: SPRINT_IMPLEMENTATION-pad). + // 2. ClaudeJob.task_id-lookup (PER_TASK-flow). + // 3. Story → Sprint → SprintRun.findFirst({ status: active }) (geen + // task-job, bv. handmatige task-statuswijziging via UI). let sprintRunChanged = false if (nextSprintStatus === 'FAILED' || nextSprintStatus === 'COMPLETED') { - const job = await tx.claudeJob.findFirst({ - where: { task_id: taskId, sprint_run_id: { not: null } }, - orderBy: { created_at: 'desc' }, - select: { id: true, sprint_run_id: true }, - }) + let resolvedRunId: string | null = sprintRunId ?? null + let cancelExceptJobId: string | null = null - if (job?.sprint_run_id) { + if (!resolvedRunId) { + const job = await tx.claudeJob.findFirst({ + where: { task_id: taskId, sprint_run_id: { not: null } }, + orderBy: { created_at: 'desc' }, + select: { id: true, sprint_run_id: true }, + }) + if (job?.sprint_run_id) { + resolvedRunId = job.sprint_run_id + cancelExceptJobId = job.id + } + } + + if (!resolvedRunId && story.sprint_id) { + const activeRun = await tx.sprintRun.findFirst({ + where: { + sprint_id: story.sprint_id, + status: { in: ['QUEUED', 'RUNNING', 'PAUSED'] }, + }, + orderBy: { created_at: 'desc' }, + select: { id: true }, + }) + if (activeRun) resolvedRunId = activeRun.id + } + + if (resolvedRunId) { const sprintRun = await tx.sprintRun.findUnique({ - where: { id: job.sprint_run_id }, + where: { id: resolvedRunId }, select: { id: true, status: true }, }) if ( @@ -180,11 +210,16 @@ export async function propagateStatusUpwards( failed_task_id: taskId, }, }) + // Cancel sibling-jobs binnen dezelfde SprintRun behalve de + // huidige task-job (als die er is). Voor SPRINT_IMPLEMENTATION + // is cancelExceptJobId null en hebben we geen siblings om te + // cancellen — de SPRINT-job zelf blijft actief en de worker + // detecteert dit via job_heartbeat. await tx.claudeJob.updateMany({ where: { sprint_run_id: sprintRun.id, status: { in: ['QUEUED', 'CLAIMED', 'RUNNING'] }, - id: { not: job.id }, + ...(cancelExceptJobId ? { id: { not: cancelExceptJobId } } : {}), }, data: { status: 'CANCELLED', @@ -230,14 +265,16 @@ export interface UpdateTaskStatusResult { task: PropagationResult['task'] storyStatusChange: StoryStatusChange storyId: string + sprintRunChanged: boolean } export async function updateTaskStatusWithStoryPromotion( taskId: string, newStatus: TaskStatus, client?: Prisma.TransactionClient, + sprintRunId?: string, ): Promise { - const result = await propagateStatusUpwards(taskId, newStatus, client) + const result = await propagateStatusUpwards(taskId, newStatus, client, sprintRunId) let storyStatusChange: StoryStatusChange = null if (result.storyChanged) { storyStatusChange = newStatus === 'DONE' ? 'promoted' : 'demoted' @@ -246,5 +283,6 @@ export async function updateTaskStatusWithStoryPromotion( task: result.task, storyStatusChange, storyId: result.storyId, + sprintRunChanged: result.sprintRunChanged, } } diff --git a/src/tools/job-heartbeat.ts b/src/tools/job-heartbeat.ts new file mode 100644 index 0000000..36c42a2 --- /dev/null +++ b/src/tools/job-heartbeat.ts @@ -0,0 +1,81 @@ +// PBI-50 F3-T3: job_heartbeat +// +// Verlengt ClaudeJob.lease_until met 5 min zodat resetStaleClaimedJobs een +// long-running job (bv. SPRINT_IMPLEMENTATION over 30+ min) niet ten onrechte +// als stale markt. Worker draait een achtergrond-loop elke 60s. +// +// Voor SPRINT-jobs: respons bevat sprint_run_status zodat de worker zijn +// loop kan breken bij ≠ RUNNING (bv. UI-side cancel of MERGE_CONFLICT-pause). + +import { z } from 'zod' +import type { McpServer } from '@modelcontextprotocol/sdk/server/mcp.js' +import { prisma } from '../prisma.js' +import { requireWriteAccess } from '../auth.js' +import { toolError, toolJson, withToolErrors } from '../errors.js' + +const inputSchema = z.object({ + job_id: z.string().min(1), +}) + +export function registerJobHeartbeatTool(server: McpServer) { + server.registerTool( + 'job_heartbeat', + { + title: 'Job heartbeat', + description: + 'Extend the lease on a CLAIMED/RUNNING job by 5 minutes. Token must own the job. ' + + 'For SPRINT_IMPLEMENTATION jobs: response includes sprint_run_status so the worker ' + + 'can break its task-loop on UI-side cancel/pause without an extra query. ' + + 'Worker should call this every ~60s during long-running batches. ' + + 'Forbidden for demo accounts.', + inputSchema, + }, + async ({ job_id }) => + withToolErrors(async () => { + const auth = await requireWriteAccess() + + // Atomic conditional UPDATE so a non-owner / non-active job is rejected + // without a separate read. + const updated = await prisma.$queryRaw< + Array<{ id: string; lease_until: Date; kind: string; sprint_run_id: string | null }> + >` + UPDATE claude_jobs + SET lease_until = NOW() + INTERVAL '5 minutes' + WHERE id = ${job_id} + AND claimed_by_token_id = ${auth.tokenId} + AND status IN ('CLAIMED', 'RUNNING') + RETURNING id, lease_until, kind::text AS kind, sprint_run_id + ` + if (updated.length === 0) { + return toolError( + `Job ${job_id} not found, not claimed by your token, or in terminal state`, + ) + } + const row = updated[0] + + let sprint_run_status: string | null = null + let sprint_run_pause_reason: string | null = null + if (row.kind === 'SPRINT_IMPLEMENTATION' && row.sprint_run_id) { + const sprintRun = await prisma.sprintRun.findUnique({ + where: { id: row.sprint_run_id }, + select: { status: true, pause_context: true }, + }) + sprint_run_status = sprintRun?.status ?? null + // Extract pause_reason from pause_context Json (best-effort) + const ctx = sprintRun?.pause_context as + | { pause_reason?: string } + | null + | undefined + sprint_run_pause_reason = ctx?.pause_reason ?? null + } + + return toolJson({ + ok: true, + job_id: row.id, + lease_until: row.lease_until.toISOString(), + sprint_run_status, + sprint_run_pause_reason, + }) + }), + ) +} diff --git a/src/tools/update-task-execution.ts b/src/tools/update-task-execution.ts new file mode 100644 index 0000000..8b3213a --- /dev/null +++ b/src/tools/update-task-execution.ts @@ -0,0 +1,110 @@ +// PBI-50 F3-T2: update_task_execution +// +// SPRINT_IMPLEMENTATION-flow lifecycle-tool. Worker roept dit aan voor elke +// task in de batch om de SprintTaskExecution-row te muteren: +// PENDING → RUNNING → DONE/FAILED/SKIPPED +// Idempotent: dezelfde call kan veilig herhaald worden. + +import { z } from 'zod' +import type { McpServer } from '@modelcontextprotocol/sdk/server/mcp.js' +import { prisma } from '../prisma.js' +import { requireWriteAccess } from '../auth.js' +import { toolError, toolJson, withToolErrors } from '../errors.js' + +const inputSchema = z.object({ + execution_id: z.string().min(1), + status: z.enum(['PENDING', 'RUNNING', 'DONE', 'FAILED', 'SKIPPED']), + base_sha: z.string().optional(), + head_sha: z.string().optional(), + skip_reason: z.string().max(2000).optional(), +}) + +export function registerUpdateTaskExecutionTool(server: McpServer) { + server.registerTool( + 'update_task_execution', + { + title: 'Update SprintTaskExecution status', + description: + 'Mutate a SprintTaskExecution row in a SPRINT_IMPLEMENTATION batch. ' + + 'Status: PENDING|RUNNING|DONE|FAILED|SKIPPED. Worker calls this for each ' + + 'task transition. Token must own the parent SPRINT_IMPLEMENTATION ClaudeJob. ' + + 'Idempotent — safe to retry. Schrijft started_at (RUNNING) en finished_at ' + + '(DONE/FAILED/SKIPPED). Forbidden for demo accounts.', + inputSchema, + }, + async ({ execution_id, status, base_sha, head_sha, skip_reason }) => + withToolErrors(async () => { + const auth = await requireWriteAccess() + + const execution = await prisma.sprintTaskExecution.findUnique({ + where: { id: execution_id }, + select: { + id: true, + sprint_job_id: true, + sprint_job: { + select: { claimed_by_token_id: true, status: true, kind: true }, + }, + }, + }) + if (!execution) { + return toolError(`SprintTaskExecution ${execution_id} not found`) + } + if (execution.sprint_job.kind !== 'SPRINT_IMPLEMENTATION') { + return toolError( + `Execution ${execution_id} hangs at job kind ${execution.sprint_job.kind}, expected SPRINT_IMPLEMENTATION`, + ) + } + if (execution.sprint_job.claimed_by_token_id !== auth.tokenId) { + return toolError( + `Forbidden: token does not own SPRINT_IMPLEMENTATION job for execution ${execution_id}`, + ) + } + if ( + execution.sprint_job.status !== 'CLAIMED' && + execution.sprint_job.status !== 'RUNNING' + ) { + return toolError( + `Sprint job is in terminal state ${execution.sprint_job.status}`, + ) + } + + const now = new Date() + const updated = await prisma.sprintTaskExecution.update({ + where: { id: execution_id }, + data: { + status, + ...(base_sha !== undefined ? { base_sha } : {}), + ...(head_sha !== undefined ? { head_sha } : {}), + ...(skip_reason !== undefined ? { skip_reason } : {}), + ...(status === 'RUNNING' ? { started_at: now } : {}), + ...(status === 'DONE' || status === 'FAILED' || status === 'SKIPPED' + ? { finished_at: now } + : {}), + }, + select: { + id: true, + status: true, + base_sha: true, + head_sha: true, + verify_result: true, + verify_summary: true, + skip_reason: true, + started_at: true, + finished_at: true, + }, + }) + + return toolJson({ + execution_id: updated.id, + status: updated.status, + base_sha: updated.base_sha, + head_sha: updated.head_sha, + verify_result: updated.verify_result, + verify_summary: updated.verify_summary, + skip_reason: updated.skip_reason, + started_at: updated.started_at?.toISOString() ?? null, + finished_at: updated.finished_at?.toISOString() ?? null, + }) + }), + ) +} diff --git a/src/tools/update-task-status.ts b/src/tools/update-task-status.ts index d3756ce..8ac8463 100644 --- a/src/tools/update-task-status.ts +++ b/src/tools/update-task-status.ts @@ -1,5 +1,6 @@ import { z } from 'zod' import type { McpServer } from '@modelcontextprotocol/sdk/server/mcp.js' +import { prisma } from '../prisma.js' import { requireWriteAccess } from '../auth.js' import { userCanAccessTask } from '../access.js' import { toolError, toolJson, withToolErrors } from '../errors.js' @@ -9,6 +10,10 @@ import { updateTaskStatusWithStoryPromotion } from '../lib/tasks-status-update.j const inputSchema = z.object({ task_id: z.string().min(1), status: z.enum(TASK_STATUS_API_VALUES as [string, ...string[]]), + // PBI-50: optionele sprint_run_id voor SPRINT_IMPLEMENTATION-flow. + // Wanneer aanwezig: server valideert dat task in deze sprint zit, run + // actief is, en de huidige token een ClaudeJob in deze run heeft geclaimt. + sprint_run_id: z.string().min(1).optional(), }) export function registerUpdateTaskStatusTool(server: McpServer) { @@ -17,11 +22,14 @@ export function registerUpdateTaskStatusTool(server: McpServer) { { title: 'Update task status', description: - 'Set the status of a task. Allowed values: todo, in_progress, review, done. ' + + 'Set the status of a task. Allowed values: todo, in_progress, review, done, failed. ' + + 'Optional sprint_run_id binds the update to a SPRINT_IMPLEMENTATION run for ' + + 'cascade-propagation; the server validates that the task belongs to the sprint ' + + 'and that the calling token has claimed a job in that run. ' + 'Forbidden for demo accounts.', inputSchema, }, - async ({ task_id, status }) => + async ({ task_id, status, sprint_run_id }) => withToolErrors(async () => { const auth = await requireWriteAccess() const dbStatus = taskStatusFromApi(status) @@ -31,15 +39,74 @@ export function registerUpdateTaskStatusTool(server: McpServer) { if (!(await userCanAccessTask(task_id, auth.userId))) { return toolError(`Task ${task_id} not found or not accessible`) } - const { task, storyStatusChange } = await updateTaskStatusWithStoryPromotion( - task_id, - dbStatus, - ) + + // PBI-50: validate explicit sprint_run_id binding. + if (sprint_run_id) { + const sprintRun = await prisma.sprintRun.findUnique({ + where: { id: sprint_run_id }, + select: { id: true, status: true, sprint_id: true }, + }) + if (!sprintRun) { + return toolError(`SprintRun ${sprint_run_id} not found`) + } + if ( + sprintRun.status !== 'QUEUED' && + sprintRun.status !== 'RUNNING' && + sprintRun.status !== 'PAUSED' + ) { + return toolError( + `SprintRun ${sprint_run_id} is in terminal state ${sprintRun.status}; cannot update task status against it`, + ) + } + + // Task moet in deze sprint zitten + const task = await prisma.task.findUnique({ + where: { id: task_id }, + select: { story: { select: { sprint_id: true } } }, + }) + if (!task || task.story.sprint_id !== sprintRun.sprint_id) { + return toolError( + `Task ${task_id} is not in sprint ${sprintRun.sprint_id} (sprint_run ${sprint_run_id})`, + ) + } + + // Token-coupling: huidige token moet een actieve ClaudeJob in deze + // SprintRun hebben geclaimt (typisch de SPRINT_IMPLEMENTATION-job). + const tokenJob = await prisma.claudeJob.findFirst({ + where: { + sprint_run_id, + claimed_by_token_id: auth.tokenId, + status: { in: ['CLAIMED', 'RUNNING'] }, + }, + select: { id: true }, + }) + if (!tokenJob) { + return toolError( + `Forbidden: current token has no active claim in sprint_run ${sprint_run_id}`, + ) + } + } + + const { task, storyStatusChange, sprintRunChanged } = + await updateTaskStatusWithStoryPromotion(task_id, dbStatus, undefined, sprint_run_id) + + // Voor SPRINT-flow: stuur expliciete sprint_run_status mee zodat + // worker zijn loop kan breken bij FAILED/PAUSED zonder extra query. + let sprintRunStatusChange: string | null = null + if (sprintRunChanged && sprint_run_id) { + const updated = await prisma.sprintRun.findUnique({ + where: { id: sprint_run_id }, + select: { status: true }, + }) + sprintRunStatusChange = updated?.status ?? null + } + return toolJson({ id: task.id, status: taskStatusToApi(task.status), implementation_plan: task.implementation_plan, story_status_change: storyStatusChange, + sprint_run_status_change: sprintRunStatusChange, }) }), ) diff --git a/src/tools/verify-sprint-task.ts b/src/tools/verify-sprint-task.ts new file mode 100644 index 0000000..fbd62d2 --- /dev/null +++ b/src/tools/verify-sprint-task.ts @@ -0,0 +1,151 @@ +// PBI-50 F3-T1: verify_sprint_task +// +// Execution-aware verify-tool voor SPRINT_IMPLEMENTATION-flow. +// Verschilt van verify_task_against_plan in: +// - input via execution_id (niet task_id) +// - base_sha komt uit SprintTaskExecution.base_sha; voor task[1..N] zonder +// base_sha vult de tool dynamisch met head_sha van vorige DONE-execution +// - plan_snapshot komt uit execution.plan_snapshot (frozen op claim-tijd) +// - resultaat opgeslagen op execution-row, niet op ClaudeJob.verify_result +// - response geeft allowed_for_done direct mee + +import { execFile } from 'node:child_process' +import { promisify } from 'node:util' +import { z } from 'zod' +import type { McpServer } from '@modelcontextprotocol/sdk/server/mcp.js' +import { prisma } from '../prisma.js' +import { requireWriteAccess } from '../auth.js' +import { toolError, toolJson, withToolErrors } from '../errors.js' +import { classifyDiffAgainstPlan } from '../verify/classify.js' +import { checkVerifyGate } from './update-job-status.js' + +const exec = promisify(execFile) + +const inputSchema = z.object({ + execution_id: z.string().min(1), + worktree_path: z.string().min(1), + summary: z.string().max(2000).optional(), +}) + +export function registerVerifySprintTaskTool(server: McpServer) { + server.registerTool( + 'verify_sprint_task', + { + title: 'Verify SprintTaskExecution against frozen plan', + description: + 'Run `git diff ...HEAD` in the worktree and classify against the ' + + 'frozen plan_snapshot of this SprintTaskExecution. Returns ALIGNED|PARTIAL|EMPTY|' + + 'DIVERGENT plus reasoning + allowed_for_done (computed via the standard verify-gate ' + + 'with the execution\'s frozen verify_required/verify_only). ' + + 'For task[1..N] zonder base_sha vult de tool die in op basis van de head_sha van de ' + + 'vorige DONE-execution. Optional summary is opgeslagen voor PARTIAL/DIVERGENT-rationale ' + + 'en gebruikt door de gate. ' + + 'Call this BEFORE update_task_execution(DONE) for each task in the sprint batch. ' + + 'Forbidden for demo accounts.', + inputSchema, + annotations: { readOnlyHint: false }, + }, + async ({ execution_id, worktree_path, summary }) => + withToolErrors(async () => { + const auth = await requireWriteAccess() + + const execution = await prisma.sprintTaskExecution.findUnique({ + where: { id: execution_id }, + select: { + id: true, + sprint_job_id: true, + order: true, + base_sha: true, + plan_snapshot: true, + verify_required_snapshot: true, + verify_only_snapshot: true, + sprint_job: { + select: { claimed_by_token_id: true, status: true, kind: true }, + }, + }, + }) + if (!execution) { + return toolError(`SprintTaskExecution ${execution_id} not found`) + } + if (execution.sprint_job.kind !== 'SPRINT_IMPLEMENTATION') { + return toolError( + `Execution ${execution_id} hangs at job kind ${execution.sprint_job.kind}, expected SPRINT_IMPLEMENTATION`, + ) + } + if (execution.sprint_job.claimed_by_token_id !== auth.tokenId) { + return toolError( + `Forbidden: token does not own SPRINT_IMPLEMENTATION job for execution ${execution_id}`, + ) + } + + // Resolve base_sha. Voor task[0] is dit gevuld bij claim. Voor + // task[1..N] wordt dit dynamisch gevuld op basis van de vorige + // DONE-execution's head_sha. Persist na fill zodat herhaalde calls + // dezelfde base gebruiken. + let baseSha = execution.base_sha + if (!baseSha) { + const previousDone = await prisma.sprintTaskExecution.findFirst({ + where: { + sprint_job_id: execution.sprint_job_id, + order: { lt: execution.order }, + status: 'DONE', + head_sha: { not: null }, + }, + orderBy: { order: 'desc' }, + select: { head_sha: true }, + }) + if (!previousDone?.head_sha) { + return toolError( + `MISSING_BASE_SHA: execution ${execution_id} has no base_sha and no previous DONE-execution with head_sha. Did you skip update_task_execution(DONE) on a prior task?`, + ) + } + baseSha = previousDone.head_sha + await prisma.sprintTaskExecution.update({ + where: { id: execution_id }, + data: { base_sha: baseSha }, + }) + } + + let diff: string + try { + const { stdout } = await exec('git', ['diff', `${baseSha}...HEAD`], { + cwd: worktree_path, + }) + diff = stdout + } catch (err) { + return toolError( + `git diff failed in worktree (${worktree_path}): ${(err as Error).message ?? 'unknown error'}`, + ) + } + + const { result, reasoning } = classifyDiffAgainstPlan({ + diff, + plan: execution.plan_snapshot, + }) + + await prisma.sprintTaskExecution.update({ + where: { id: execution_id }, + data: { + verify_result: result, + ...(summary !== undefined ? { verify_summary: summary } : {}), + }, + }) + + const gate = checkVerifyGate( + result, + execution.verify_only_snapshot, + execution.verify_required_snapshot, + summary, + ) + + return toolJson({ + execution_id: execution.id, + result: result.toLowerCase() as 'aligned' | 'partial' | 'empty' | 'divergent', + reasoning, + base_sha: baseSha, + allowed_for_done: gate.allowed, + reason: gate.allowed ? null : gate.error, + }) + }), + ) +}