feat: capture plan_snapshot at job claim in wait_for_job

- resetStaleClaimedJobs: also sets plan_snapshot = NULL on reset
- tryClaimJob: JOINs tasks table to read implementation_plan in the
  same atomic transaction, writes it to claude_jobs.plan_snapshot
- Empty-plan edge case: NULL becomes '' (non-null) in snapshot
- Exports both functions for unit testing

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Janpeter Visser 2026-04-30 19:27:52 +02:00
parent 5ecb9903e6
commit ddc773d20a

View file

@ -19,51 +19,57 @@ const inputSchema = z.object({
wait_seconds: z.number().int().min(1).max(MAX_WAIT_SECONDS).default(300),
})
async function resetStaleClaimedJobs(userId: string) {
export async function resetStaleClaimedJobs(userId: string) {
await prisma.$executeRaw`
UPDATE claude_jobs
SET status = 'QUEUED', claimed_by_token_id = NULL, claimed_at = NULL
SET status = 'QUEUED', claimed_by_token_id = NULL, claimed_at = NULL, plan_snapshot = NULL
WHERE user_id = ${userId}
AND status = 'CLAIMED'
AND claimed_at < NOW() - INTERVAL '30 minutes'
`
}
async function tryClaimJob(
export async function tryClaimJob(
userId: string,
tokenId: string,
productId?: string,
): Promise<string | null> {
// Atomic claim in a single transaction
// Atomic claim in a single transaction — also captures plan_snapshot from task
const rows = await prisma.$transaction(async (tx) => {
// SELECT FOR UPDATE SKIP LOCKED — skip jobs another worker has locked
// SELECT FOR UPDATE OF claude_jobs SKIP LOCKED — join tasks to read implementation_plan
const found = productId
? await tx.$queryRaw<Array<{ id: string }>>`
SELECT id FROM claude_jobs
WHERE user_id = ${userId}
AND product_id = ${productId}
AND status = 'QUEUED'
ORDER BY created_at ASC
? await tx.$queryRaw<Array<{ id: string; implementation_plan: string | null }>>`
SELECT cj.id, t.implementation_plan
FROM claude_jobs cj
JOIN tasks t ON t.id = cj.task_id
WHERE cj.user_id = ${userId}
AND cj.product_id = ${productId}
AND cj.status = 'QUEUED'
ORDER BY cj.created_at ASC
LIMIT 1
FOR UPDATE SKIP LOCKED
FOR UPDATE OF cj SKIP LOCKED
`
: await tx.$queryRaw<Array<{ id: string }>>`
SELECT id FROM claude_jobs
WHERE user_id = ${userId}
AND status = 'QUEUED'
ORDER BY created_at ASC
: await tx.$queryRaw<Array<{ id: string; implementation_plan: string | null }>>`
SELECT cj.id, t.implementation_plan
FROM claude_jobs cj
JOIN tasks t ON t.id = cj.task_id
WHERE cj.user_id = ${userId}
AND cj.status = 'QUEUED'
ORDER BY cj.created_at ASC
LIMIT 1
FOR UPDATE SKIP LOCKED
FOR UPDATE OF cj SKIP LOCKED
`
if (found.length === 0) return []
const jobId = found[0].id
const snapshot = found[0].implementation_plan ?? ''
await tx.$executeRaw`
UPDATE claude_jobs
SET status = 'CLAIMED',
claimed_by_token_id = ${tokenId},
claimed_at = NOW()
claimed_at = NOW(),
plan_snapshot = ${snapshot}
WHERE id = ${jobId}
`
return [{ id: jobId }]