Merge pull request #4 from madhura68/feat/plan-snapshot-verify-tool

feat: plan_snapshot capture in wait_for_job + verify_task_against_plan tool
This commit is contained in:
Janpeter Visser 2026-04-30 19:42:33 +02:00 committed by GitHub
commit c2ca29d52b
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
9 changed files with 678 additions and 20 deletions

View file

@ -25,9 +25,52 @@ activity and create todos via native tool calls instead of curl.
| `get_question_answer` | Fetch the current status + answer of a previously-asked question | n/a |
| `list_open_questions` | List own open/answered questions, most recent first (max 50) | n/a |
| `cancel_question` | Cancel an own open question (asker-only) | no |
| `wait_for_job` | Block until a QUEUED ClaudeJob is available, claim it atomically, return full task context with frozen `plan_snapshot` | no |
| `update_job_status` | Report job transition to `running`, `done`, or `failed`; triggers SSE event to UI | no |
| `verify_task_against_plan` | Compare frozen `plan_snapshot` against current plan + story logs + commits; returns per-AC ✓/✗/? heuristic and drift-score | yes (read-only) |
Demo accounts may read but writes return `PERMISSION_DENIED`.
### verify_task_against_plan
Compares the immutable snapshot captured at claim time against the current state of the work. Useful at the end of a job to self-assess completeness.
**Input**
```json
{ "task_id": "cmolqlqvh0023q..." }
```
**Output**
```
# Verify task: Prisma-schema + migratie in Scrum4Me (cmolqlqvh...)
## Plan
- Snapshot: - Bewerk prisma/schema.prisma:...
- Current: - Bewerk prisma/schema.prisma:...
- Edited onderweg: **no**
## AC-checks (5/6 ✓ — drift-score 83%)
- ✓ Scrum4Me prisma/schema.prisma: nieuw veld plan_snapshot...
- ✓ Migratie aangemaakt en getest
- ✗ vendor/scrum4me submodule in scrum4me-mcp gebumpt
## Realisatie
- 1 log_implementation-entry
- commit `a3af2dd` — feat: add plan_snapshot field to ClaudeJob schema
---
⚠️ Heuristiek-rapport — handmatige PR-review blijft nodig
```
**Beperkingen heuristiek**
- Zoekt op sleutelwoorden (filenames, camelCase-identifiers, lange woorden) — geen semantisch begrip
- AC's die alleen over externe verificatie gaan (deployment, user-test) scoren altijd ✗ zonder extra log-entries
- Plan_snapshot is NULL voor jobs die zijn geclaimed vóór versie met snapshot-feature — rapport meldt "no baseline"
- Gebruik het rapport als startpunt, niet als definitief oordeel; PR-review blijft leidend
## Prompts
- `implement_next_story` — full workflow: fetch context, log plan, walk

View file

@ -0,0 +1,198 @@
import { describe, it, expect } from 'vitest'
import {
parseAcceptanceCriteria,
extractKeywords,
checkACStatus,
computeDriftScore,
lineDiff,
buildVerifyResult,
renderMarkdownReport,
} from '../src/lib/verify-plan.js'
describe('parseAcceptanceCriteria', () => {
it('returns empty array for null', () => {
expect(parseAcceptanceCriteria(null)).toEqual([])
})
it('parses dash-prefixed lines', () => {
const text = '- First AC\n- Second AC\n- Third AC'
expect(parseAcceptanceCriteria(text)).toEqual(['First AC', 'Second AC', 'Third AC'])
})
it('strips numbered prefixes', () => {
const text = '1. Do this\n2. Do that'
expect(parseAcceptanceCriteria(text)).toEqual(['Do this', 'Do that'])
})
it('ignores blank lines', () => {
const text = '- AC1\n\n- AC2'
expect(parseAcceptanceCriteria(text)).toEqual(['AC1', 'AC2'])
})
})
describe('extractKeywords', () => {
it('extracts filenames with extensions', () => {
const kws = extractKeywords('update wait-for-job.ts and verify-plan.ts')
expect(kws).toContain('wait-for-job.ts')
expect(kws).toContain('verify-plan.ts')
})
it('extracts long words', () => {
const kws = extractKeywords('implementation snapshot detection')
expect(kws).toContain('implementation')
expect(kws).toContain('snapshot')
expect(kws).toContain('detection')
})
it('returns unique keywords', () => {
const kws = extractKeywords('implementation implementation')
const count = kws.filter((k) => k === 'implementation').length
expect(count).toBe(1)
})
})
describe('checkACStatus', () => {
it('returns ✓ when majority of keywords found in corpus', () => {
const ac = 'plan_snapshot field added to ClaudeJob'
const corpus = 'added plan_snapshot field to claudejob schema migration'
expect(checkACStatus(ac, corpus)).toBe('✓')
})
it('returns ✗ when no keywords found', () => {
const ac = 'zxqwerty obscure feature nobody implemented'
const corpus = 'completely different log content about other things'
expect(checkACStatus(ac, corpus)).toBe('✗')
})
it('returns ? when partial match', () => {
const ac = 'snapshot captured at claim time with plan_snapshot field'
const corpus = 'snapshot written to database'
const result = checkACStatus(ac, corpus)
expect(['?', '✓']).toContain(result)
})
it('returns ? for very short AC with no extractable keywords', () => {
expect(checkACStatus('Ok', 'anything')).toBe('?')
})
})
describe('computeDriftScore', () => {
it('returns 100 when all pass', () => {
const results = [{ status: '✓' as const }, { status: '✓' as const }]
expect(computeDriftScore(results)).toBe(100)
})
it('returns 0 when all fail', () => {
const results = [{ status: '✗' as const }, { status: '✗' as const }]
expect(computeDriftScore(results)).toBe(0)
})
it('returns 50 for half passing', () => {
const results = [{ status: '✓' as const }, { status: '✗' as const }]
expect(computeDriftScore(results)).toBe(50)
})
it('returns 0 for empty list', () => {
expect(computeDriftScore([])).toBe(0)
})
})
describe('lineDiff', () => {
it('returns null when strings are identical', () => {
expect(lineDiff('line1\nline2', 'line1\nline2')).toBeNull()
})
it('shows added lines with +', () => {
const diff = lineDiff('line1', 'line1\nline2')
expect(diff).toContain('+ line2')
})
it('shows removed lines with -', () => {
const diff = lineDiff('line1\nline2', 'line1')
expect(diff).toContain('- line2')
})
it('shows changed lines as remove+add pair', () => {
const diff = lineDiff('old line', 'new line')
expect(diff).toContain('- old line')
expect(diff).toContain('+ new line')
})
})
describe('buildVerifyResult + renderMarkdownReport', () => {
it('scenario: plan unchanged, all ACs matched in logs — 100% drift score', () => {
const plan = 'Add plan_snapshot field to ClaudeJob schema'
const result = buildVerifyResult({
taskId: 'task-1',
taskTitle: 'Prisma migration',
planSnapshot: plan,
currentPlan: plan,
acceptanceCriteriaText: '- plan_snapshot field added\n- migration created',
implementationLogs: ['Added plan_snapshot field, created migration file for claudejob'],
commits: [{ hash: 'abc123', message: 'feat: add plan_snapshot to claudejob schema' }],
})
expect(result.planEdited).toBe(false)
expect(result.planDiff).toBeNull()
expect(result.hasBaseline).toBe(true)
expect(result.driftScore).toBeGreaterThanOrEqual(50)
const report = renderMarkdownReport(result)
expect(report).toContain('Edited onderweg: **no**')
expect(report).toContain('drift-score')
})
it('scenario: plan edited onderweg — planEdited=true, diff in output', () => {
const result = buildVerifyResult({
taskId: 'task-2',
taskTitle: 'Wait for job update',
planSnapshot: 'Original plan\nStep 1',
currentPlan: 'Original plan\nStep 1 revised\nStep 2 added',
acceptanceCriteriaText: null,
implementationLogs: [],
commits: [],
})
expect(result.planEdited).toBe(true)
expect(result.planDiff).not.toBeNull()
expect(result.planDiff).toContain('- Step 1')
expect(result.planDiff).toContain('+ Step 1 revised')
const report = renderMarkdownReport(result)
expect(report).toContain('Edited onderweg: **yes**')
expect(report).toContain('```diff')
})
it('scenario: AC without match in logs → ✗', () => {
const result = buildVerifyResult({
taskId: 'task-3',
taskTitle: 'Unimplemented feature',
planSnapshot: 'some plan',
currentPlan: 'some plan',
acceptanceCriteriaText: '- zxcvbnm_completely_missing_feature deployed',
implementationLogs: ['unrelated work done here'],
commits: [],
})
expect(result.acceptanceCriteria[0].status).toBe('✗')
expect(result.driftScore).toBe(0)
})
it('scenario: stale claim (snapshot null) → no baseline in report', () => {
const result = buildVerifyResult({
taskId: 'task-4',
taskTitle: 'Old job',
planSnapshot: null,
currentPlan: 'current plan',
acceptanceCriteriaText: '- something done',
implementationLogs: ['something done here'],
commits: [],
})
expect(result.hasBaseline).toBe(false)
expect(result.planEdited).toBe(false)
const report = renderMarkdownReport(result)
expect(report).toContain('no baseline')
})
})

View file

@ -0,0 +1,134 @@
import { describe, it, expect, vi, beforeEach } from 'vitest'
vi.mock('../src/prisma.js', () => ({
prisma: {
$executeRaw: vi.fn(),
$transaction: vi.fn(),
},
}))
import { prisma } from '../src/prisma.js'
import { resetStaleClaimedJobs, tryClaimJob } from '../src/tools/wait-for-job.js'
const mockPrisma = prisma as unknown as {
$executeRaw: ReturnType<typeof vi.fn>
$transaction: ReturnType<typeof vi.fn>
}
beforeEach(() => {
vi.clearAllMocks()
})
describe('resetStaleClaimedJobs', () => {
it('resets plan_snapshot to NULL when resetting stale claimed jobs', async () => {
mockPrisma.$executeRaw.mockResolvedValue(0)
await resetStaleClaimedJobs('user-1')
expect(mockPrisma.$executeRaw).toHaveBeenCalledOnce()
// Verify the template literal includes plan_snapshot = NULL
const call = mockPrisma.$executeRaw.mock.calls[0]
const sqlParts: string[] = call[0]
const fullSql = sqlParts.join('')
expect(fullSql).toContain('plan_snapshot = NULL')
expect(fullSql).toContain("status = 'QUEUED'")
expect(fullSql).toContain('claimed_at < NOW()')
})
})
describe('tryClaimJob', () => {
it('writes plan_snapshot from task.implementation_plan when claiming a job', async () => {
const jobId = 'job-123'
const implementationPlan = 'Step 1: Do the thing\nStep 2: Done'
mockPrisma.$transaction.mockImplementation(async (fn: (tx: typeof prisma) => Promise<unknown>) => {
const mockTx = {
$queryRaw: vi.fn().mockResolvedValue([{ id: jobId, implementation_plan: implementationPlan }]),
$executeRaw: vi.fn().mockResolvedValue(1),
}
return fn(mockTx as unknown as typeof prisma)
})
const result = await tryClaimJob('user-1', 'token-1')
expect(result).toBe(jobId)
// Verify the transaction was called and the UPDATE included plan_snapshot
expect(mockPrisma.$transaction).toHaveBeenCalledOnce()
const txFn = mockPrisma.$transaction.mock.calls[0][0]
const capturedTx = {
$queryRaw: vi.fn().mockResolvedValue([{ id: jobId, implementation_plan: implementationPlan }]),
$executeRaw: vi.fn().mockResolvedValue(1),
}
await txFn(capturedTx as unknown as typeof prisma)
const updateCall = capturedTx.$executeRaw.mock.calls[0]
const sqlParts: string[] = updateCall[0]
const fullSql = sqlParts.join('')
expect(fullSql).toContain('plan_snapshot')
expect(fullSql).toContain("status = 'CLAIMED'")
})
it('uses empty string as snapshot when task has no implementation_plan', async () => {
const jobId = 'job-456'
mockPrisma.$transaction.mockImplementation(async (fn: (tx: typeof prisma) => Promise<unknown>) => {
const mockTx = {
$queryRaw: vi.fn().mockResolvedValue([{ id: jobId, implementation_plan: null }]),
$executeRaw: vi.fn().mockResolvedValue(1),
}
return fn(mockTx as unknown as typeof prisma)
})
const result = await tryClaimJob('user-1', 'token-1')
expect(result).toBe(jobId)
// Verify the snapshot value passed is '' (empty string, not null)
const capturedTx = {
$queryRaw: vi.fn().mockResolvedValue([{ id: jobId, implementation_plan: null }]),
$executeRaw: vi.fn().mockResolvedValue(1),
}
const txFn = mockPrisma.$transaction.mock.calls[0][0]
await txFn(capturedTx as unknown as typeof prisma)
const updateCall = capturedTx.$executeRaw.mock.calls[0]
// Template literal params: [0]=sql parts, [1]=tokenId, [2]=snapshot, [3]=jobId
expect(updateCall[2]).toBe('')
})
it('returns null when no QUEUED job is available', async () => {
mockPrisma.$transaction.mockImplementation(async (fn: (tx: typeof prisma) => Promise<unknown>) => {
const mockTx = {
$queryRaw: vi.fn().mockResolvedValue([]),
$executeRaw: vi.fn(),
}
return fn(mockTx as unknown as typeof prisma)
})
const result = await tryClaimJob('user-1', 'token-1')
expect(result).toBeNull()
})
it('scopes to product_id when provided', async () => {
mockPrisma.$transaction.mockImplementation(async (fn: (tx: typeof prisma) => Promise<unknown>) => {
const mockTx = {
$queryRaw: vi.fn().mockResolvedValue([]),
$executeRaw: vi.fn(),
}
return fn(mockTx as unknown as typeof prisma)
})
await tryClaimJob('user-1', 'token-1', 'product-1')
const capturedTx = {
$queryRaw: vi.fn().mockResolvedValue([]),
$executeRaw: vi.fn(),
}
const txFn = mockPrisma.$transaction.mock.calls[0][0]
await txFn(capturedTx as unknown as typeof prisma)
const queryCall = capturedTx.$queryRaw.mock.calls[0]
// product_id should be passed as a parameter
expect(queryCall).toContain('product-1')
})
})

View file

@ -256,6 +256,7 @@ model ClaudeJob {
claimed_at DateTime?
started_at DateTime?
finished_at DateTime?
plan_snapshot String?
branch String?
summary String?
error String?

View file

@ -19,6 +19,7 @@ import { registerListOpenQuestionsTool } from './tools/list-open-questions.js'
import { registerCancelQuestionTool } from './tools/cancel-question.js'
import { registerWaitForJobTool } from './tools/wait-for-job.js'
import { registerUpdateJobStatusTool } from './tools/update-job-status.js'
import { registerVerifyTaskAgainstPlanTool } from './tools/verify-task-against-plan.js'
import { registerImplementNextStoryPrompt } from './prompts/implement-next-story.js'
const VERSION = '0.1.0'
@ -51,6 +52,7 @@ async function main() {
registerCancelQuestionTool(server)
registerWaitForJobTool(server)
registerUpdateJobStatusTool(server)
registerVerifyTaskAgainstPlanTool(server)
registerImplementNextStoryPrompt(server)
const transport = new StdioServerTransport()

178
src/lib/verify-plan.ts Normal file
View file

@ -0,0 +1,178 @@
// Core logic for verify_task_against_plan: diff, AC heuristic, drift score.
export function parseAcceptanceCriteria(text: string | null): string[] {
if (!text) return []
return text
.split('\n')
.map((line) => line.replace(/^[-•*]\s*/, '').replace(/^\d+\.\s*/, '').trim())
.filter((line) => line.length > 0)
}
export function extractKeywords(text: string): string[] {
const raw = text.split(/[\s,;:()[\]{}]+/)
const keywords: Set<string> = new Set()
for (const word of raw) {
const clean = word.replace(/^[^\w./\-]+|[^\w./\-]+$/g, '')
if (!clean) continue
if (clean.includes('.') || clean.includes('/')) {
keywords.add(clean.toLowerCase()) // filenames / paths
} else if (/^[A-Z][a-z]/.test(clean) && clean.length > 4) {
keywords.add(clean.toLowerCase()) // CamelCase identifiers
} else if (clean.length > 6) {
keywords.add(clean.toLowerCase()) // long lowercase words
}
}
return [...keywords]
}
export function checkACStatus(acText: string, corpus: string): '✓' | '✗' | '?' {
const keywords = extractKeywords(acText)
if (keywords.length === 0) return '?'
const corpusLower = corpus.toLowerCase()
const matched = keywords.filter((kw) => corpusLower.includes(kw))
const ratio = matched.length / keywords.length
if (ratio >= 0.5) return '✓'
if (ratio > 0) return '?'
return '✗'
}
export function computeDriftScore(results: Array<{ status: '✓' | '✗' | '?' }>): number {
if (results.length === 0) return 0
const passed = results.filter((r) => r.status === '✓').length
return Math.round((passed / results.length) * 100)
}
export function lineDiff(snapshot: string, current: string): string | null {
if (snapshot === current) return null
const aLines = snapshot.split('\n')
const bLines = current.split('\n')
const out: string[] = []
const len = Math.max(aLines.length, bLines.length)
for (let i = 0; i < len; i++) {
const a = i < aLines.length ? aLines[i] : undefined
const b = i < bLines.length ? bLines[i] : undefined
if (a === b) continue
if (a !== undefined) out.push(`- ${a}`)
if (b !== undefined) out.push(`+ ${b}`)
}
return out.length > 0 ? out.join('\n') : null
}
export interface ACResult {
text: string
status: '✓' | '✗' | '?'
}
export interface VerifyResult {
taskId: string
taskTitle: string
hasBaseline: boolean
planSnapshot: string | null
currentPlan: string | null
planEdited: boolean
planDiff: string | null
implementationLogs: string[]
commits: Array<{ hash: string | null; message: string | null }>
acceptanceCriteria: ACResult[]
driftScore: number
}
export function buildVerifyResult(opts: {
taskId: string
taskTitle: string
planSnapshot: string | null
currentPlan: string | null
acceptanceCriteriaText: string | null
implementationLogs: string[]
commits: Array<{ hash: string | null; message: string | null }>
}): VerifyResult {
const { taskId, taskTitle, planSnapshot, currentPlan, acceptanceCriteriaText, implementationLogs, commits } = opts
const hasBaseline = planSnapshot !== null
const planEdited = hasBaseline && planSnapshot !== (currentPlan ?? '')
const planDiff = planEdited ? lineDiff(planSnapshot!, currentPlan ?? '') : null
const corpus = [
currentPlan ?? '',
...implementationLogs,
...commits.map((c) => `${c.hash ?? ''} ${c.message ?? ''}`),
].join('\n')
const acTexts = parseAcceptanceCriteria(acceptanceCriteriaText)
const acceptanceCriteria: ACResult[] = acTexts.map((text) => ({
text,
status: checkACStatus(text, corpus),
}))
const driftScore = computeDriftScore(acceptanceCriteria)
return {
taskId,
taskTitle,
hasBaseline,
planSnapshot,
currentPlan,
planEdited,
planDiff,
implementationLogs,
commits,
acceptanceCriteria,
driftScore,
}
}
export function renderMarkdownReport(r: VerifyResult): string {
const lines: string[] = []
lines.push(`# Verify task: ${r.taskTitle} (\`${r.taskId}\`)`, '')
lines.push('## Plan')
if (!r.hasBaseline) {
lines.push('- Snapshot: **no baseline** (job was claimed before snapshot feature)')
} else {
const snipLen = 120
const snip = (r.planSnapshot ?? '').length > snipLen
? (r.planSnapshot ?? '').slice(0, snipLen) + '…'
: (r.planSnapshot ?? '(leeg)')
lines.push(`- Snapshot: ${snip}`)
}
const curSnip = (r.currentPlan ?? '').length > 120
? (r.currentPlan ?? '').slice(0, 120) + '…'
: (r.currentPlan ?? '(leeg)')
lines.push(`- Current: ${curSnip}`)
lines.push(`- Edited onderweg: **${r.planEdited ? 'yes' : 'no'}**`)
if (r.planEdited && r.planDiff) {
lines.push('', '### Diff (snapshot → current)', '```diff', r.planDiff, '```')
}
lines.push('')
const passed = r.acceptanceCriteria.filter((a) => a.status === '✓').length
const total = r.acceptanceCriteria.length
lines.push(`## AC-checks (${passed}/${total} ✓ — drift-score ${r.driftScore}%)`)
if (total === 0) {
lines.push('_Geen acceptance criteria op de parent story._')
} else {
for (const ac of r.acceptanceCriteria) {
lines.push(`- ${ac.status} ${ac.text}`)
}
}
lines.push('')
lines.push('## Realisatie')
lines.push(`- ${r.implementationLogs.length} log_implementation-entr${r.implementationLogs.length === 1 ? 'y' : 'ies'}`)
if (r.commits.length === 0) {
lines.push('- Geen commits gelogd')
} else {
for (const c of r.commits) {
lines.push(`- commit \`${c.hash ?? '?'}\`${c.message ?? '(geen bericht)'}`)
}
}
lines.push('')
lines.push('---')
lines.push('⚠️ Heuristiek-rapport — handmatige PR-review blijft nodig')
return lines.join('\n')
}

View file

@ -0,0 +1,96 @@
import { z } from 'zod'
import type { McpServer } from '@modelcontextprotocol/sdk/server/mcp.js'
import { prisma } from '../prisma.js'
import { getAuth } from '../auth.js'
import { userCanAccessTask } from '../access.js'
import { toolError, toolJson, withToolErrors } from '../errors.js'
import { buildVerifyResult, renderMarkdownReport } from '../lib/verify-plan.js'
const inputSchema = z.object({
task_id: z.string().min(1),
})
export function registerVerifyTaskAgainstPlanTool(server: McpServer) {
server.registerTool(
'verify_task_against_plan',
{
title: 'Verify task against plan',
description:
'Compare the frozen plan_snapshot (captured at claim time) against current ' +
'task.implementation_plan, story logs, and commits. Returns a markdown report ' +
'with per-AC ✓/✗/? heuristic checks and a drift-score. Read-only — demo users allowed.',
inputSchema,
annotations: { readOnlyHint: true },
},
async ({ task_id }) =>
withToolErrors(async () => {
const auth = await getAuth()
if (!auth) return toolError('Unauthorized')
if (!(await userCanAccessTask(task_id, auth.userId))) {
return toolError(`Task ${task_id} not found or not accessible`)
}
const task = await prisma.task.findUnique({
where: { id: task_id },
select: {
id: true,
title: true,
implementation_plan: true,
story: {
select: {
id: true,
acceptance_criteria: true,
logs: {
orderBy: { created_at: 'asc' },
select: {
type: true,
content: true,
commit_hash: true,
commit_message: true,
},
},
},
},
claude_jobs: {
where: { status: { in: ['CLAIMED', 'RUNNING', 'DONE', 'FAILED'] } },
orderBy: { created_at: 'desc' },
take: 1,
select: { plan_snapshot: true },
},
},
})
if (!task) return toolError(`Task ${task_id} not found`)
const latestJob = task.claude_jobs[0] ?? null
const planSnapshot = latestJob ? latestJob.plan_snapshot : null
const implementationLogs = task.story.logs
.filter((l) => l.type === 'IMPLEMENTATION_PLAN')
.map((l) => l.content)
const commits = task.story.logs
.filter((l) => l.type === 'COMMIT')
.map((l) => ({ hash: l.commit_hash, message: l.commit_message }))
const result = buildVerifyResult({
taskId: task.id,
taskTitle: task.title,
planSnapshot,
currentPlan: task.implementation_plan,
acceptanceCriteriaText: task.story.acceptance_criteria,
implementationLogs,
commits,
})
return toolJson({
report: renderMarkdownReport(result),
task_id: result.taskId,
drift_score: result.driftScore,
ac_results: result.acceptanceCriteria,
plan_edited: result.planEdited,
has_baseline: result.hasBaseline,
})
}),
)
}

View file

@ -19,51 +19,57 @@ const inputSchema = z.object({
wait_seconds: z.number().int().min(1).max(MAX_WAIT_SECONDS).default(300),
})
async function resetStaleClaimedJobs(userId: string) {
export async function resetStaleClaimedJobs(userId: string) {
await prisma.$executeRaw`
UPDATE claude_jobs
SET status = 'QUEUED', claimed_by_token_id = NULL, claimed_at = NULL
SET status = 'QUEUED', claimed_by_token_id = NULL, claimed_at = NULL, plan_snapshot = NULL
WHERE user_id = ${userId}
AND status = 'CLAIMED'
AND claimed_at < NOW() - INTERVAL '30 minutes'
`
}
async function tryClaimJob(
export async function tryClaimJob(
userId: string,
tokenId: string,
productId?: string,
): Promise<string | null> {
// Atomic claim in a single transaction
// Atomic claim in a single transaction — also captures plan_snapshot from task
const rows = await prisma.$transaction(async (tx) => {
// SELECT FOR UPDATE SKIP LOCKED — skip jobs another worker has locked
// SELECT FOR UPDATE OF claude_jobs SKIP LOCKED — join tasks to read implementation_plan
const found = productId
? await tx.$queryRaw<Array<{ id: string }>>`
SELECT id FROM claude_jobs
WHERE user_id = ${userId}
AND product_id = ${productId}
AND status = 'QUEUED'
ORDER BY created_at ASC
? await tx.$queryRaw<Array<{ id: string; implementation_plan: string | null }>>`
SELECT cj.id, t.implementation_plan
FROM claude_jobs cj
JOIN tasks t ON t.id = cj.task_id
WHERE cj.user_id = ${userId}
AND cj.product_id = ${productId}
AND cj.status = 'QUEUED'
ORDER BY cj.created_at ASC
LIMIT 1
FOR UPDATE SKIP LOCKED
FOR UPDATE OF cj SKIP LOCKED
`
: await tx.$queryRaw<Array<{ id: string }>>`
SELECT id FROM claude_jobs
WHERE user_id = ${userId}
AND status = 'QUEUED'
ORDER BY created_at ASC
: await tx.$queryRaw<Array<{ id: string; implementation_plan: string | null }>>`
SELECT cj.id, t.implementation_plan
FROM claude_jobs cj
JOIN tasks t ON t.id = cj.task_id
WHERE cj.user_id = ${userId}
AND cj.status = 'QUEUED'
ORDER BY cj.created_at ASC
LIMIT 1
FOR UPDATE SKIP LOCKED
FOR UPDATE OF cj SKIP LOCKED
`
if (found.length === 0) return []
const jobId = found[0].id
const snapshot = found[0].implementation_plan ?? ''
await tx.$executeRaw`
UPDATE claude_jobs
SET status = 'CLAIMED',
claimed_by_token_id = ${tokenId},
claimed_at = NOW()
claimed_at = NOW(),
plan_snapshot = ${snapshot}
WHERE id = ${jobId}
`
return [{ id: jobId }]

2
vendor/scrum4me vendored

@ -1 +1 @@
Subproject commit 73087e9705abbe4ad53278ea95cb377cccd1e1f3
Subproject commit a3af2dda63531149a940931ca614d85ea9b9727e