From ddc773d20a57dc0607b3706d0ffc4c47529f7383 Mon Sep 17 00:00:00 2001 From: Madhura68 Date: Thu, 30 Apr 2026 19:27:52 +0200 Subject: [PATCH] feat: capture plan_snapshot at job claim in wait_for_job - resetStaleClaimedJobs: also sets plan_snapshot = NULL on reset - tryClaimJob: JOINs tasks table to read implementation_plan in the same atomic transaction, writes it to claude_jobs.plan_snapshot - Empty-plan edge case: NULL becomes '' (non-null) in snapshot - Exports both functions for unit testing Co-Authored-By: Claude Sonnet 4.6 --- src/tools/wait-for-job.ts | 44 ++++++++++++++++++++++----------------- 1 file changed, 25 insertions(+), 19 deletions(-) diff --git a/src/tools/wait-for-job.ts b/src/tools/wait-for-job.ts index 4430656..d4e5be5 100644 --- a/src/tools/wait-for-job.ts +++ b/src/tools/wait-for-job.ts @@ -19,51 +19,57 @@ const inputSchema = z.object({ wait_seconds: z.number().int().min(1).max(MAX_WAIT_SECONDS).default(300), }) -async function resetStaleClaimedJobs(userId: string) { +export async function resetStaleClaimedJobs(userId: string) { await prisma.$executeRaw` UPDATE claude_jobs - SET status = 'QUEUED', claimed_by_token_id = NULL, claimed_at = NULL + SET status = 'QUEUED', claimed_by_token_id = NULL, claimed_at = NULL, plan_snapshot = NULL WHERE user_id = ${userId} AND status = 'CLAIMED' AND claimed_at < NOW() - INTERVAL '30 minutes' ` } -async function tryClaimJob( +export async function tryClaimJob( userId: string, tokenId: string, productId?: string, ): Promise { - // Atomic claim in a single transaction + // Atomic claim in a single transaction — also captures plan_snapshot from task const rows = await prisma.$transaction(async (tx) => { - // SELECT FOR UPDATE SKIP LOCKED — skip jobs another worker has locked + // SELECT FOR UPDATE OF claude_jobs SKIP LOCKED — join tasks to read implementation_plan const found = productId - ? await tx.$queryRaw>` - SELECT id FROM claude_jobs - WHERE user_id = ${userId} - AND product_id = ${productId} - AND status = 'QUEUED' - ORDER BY created_at ASC + ? await tx.$queryRaw>` + SELECT cj.id, t.implementation_plan + FROM claude_jobs cj + JOIN tasks t ON t.id = cj.task_id + WHERE cj.user_id = ${userId} + AND cj.product_id = ${productId} + AND cj.status = 'QUEUED' + ORDER BY cj.created_at ASC LIMIT 1 - FOR UPDATE SKIP LOCKED + FOR UPDATE OF cj SKIP LOCKED ` - : await tx.$queryRaw>` - SELECT id FROM claude_jobs - WHERE user_id = ${userId} - AND status = 'QUEUED' - ORDER BY created_at ASC + : await tx.$queryRaw>` + SELECT cj.id, t.implementation_plan + FROM claude_jobs cj + JOIN tasks t ON t.id = cj.task_id + WHERE cj.user_id = ${userId} + AND cj.status = 'QUEUED' + ORDER BY cj.created_at ASC LIMIT 1 - FOR UPDATE SKIP LOCKED + FOR UPDATE OF cj SKIP LOCKED ` if (found.length === 0) return [] const jobId = found[0].id + const snapshot = found[0].implementation_plan ?? '' await tx.$executeRaw` UPDATE claude_jobs SET status = 'CLAIMED', claimed_by_token_id = ${tokenId}, - claimed_at = NOW() + claimed_at = NOW(), + plan_snapshot = ${snapshot} WHERE id = ${jobId} ` return [{ id: jobId }]