diff --git a/__tests__/wait-for-job-snapshot.test.ts b/__tests__/wait-for-job-snapshot.test.ts
index e4eb059..bb2e871 100644
--- a/__tests__/wait-for-job-snapshot.test.ts
+++ b/__tests__/wait-for-job-snapshot.test.ts
@@ -2,7 +2,7 @@ import { describe, it, expect, vi, beforeEach } from 'vitest'
 
 vi.mock('../src/prisma.js', () => ({
   prisma: {
-    $executeRaw: vi.fn(),
+    $queryRaw: vi.fn(),
     $transaction: vi.fn(),
   },
 }))
@@ -11,27 +11,41 @@ import { prisma } from '../src/prisma.js'
 import { resetStaleClaimedJobs, tryClaimJob } from '../src/tools/wait-for-job.js'
 
 const mockPrisma = prisma as unknown as {
-  $executeRaw: ReturnType<typeof vi.fn>
+  $queryRaw: ReturnType<typeof vi.fn>
   $transaction: ReturnType<typeof vi.fn>
 }
 
 beforeEach(() => {
   vi.clearAllMocks()
+  // Default: no stale jobs returned from either query
+  mockPrisma.$queryRaw.mockResolvedValue([])
 })
 
 describe('resetStaleClaimedJobs', () => {
-  it('resets plan_snapshot to NULL when resetting stale claimed jobs', async () => {
-    mockPrisma.$executeRaw.mockResolvedValue(0)
+  it('runs two $queryRaw calls: one for FAILED, one for QUEUED re-enqueue', async () => {
     await resetStaleClaimedJobs('user-1')
+    // Two queries: failed jobs + requeued jobs
+    expect(mockPrisma.$queryRaw).toHaveBeenCalledTimes(2)
+  })
 
-    expect(mockPrisma.$executeRaw).toHaveBeenCalledOnce()
-    // Verify the template literal includes plan_snapshot = NULL
-    const call = mockPrisma.$executeRaw.mock.calls[0]
-    const sqlParts: string[] = call[0]
-    const fullSql = sqlParts.join('')
-    expect(fullSql).toContain('plan_snapshot = NULL')
-    expect(fullSql).toContain("status = 'QUEUED'")
-    expect(fullSql).toContain('claimed_at < NOW()')
+  it('FAILED query sets status FAILED and filters on retry_count >= 2', async () => {
+    await resetStaleClaimedJobs('user-1')
+    const calls = mockPrisma.$queryRaw.mock.calls
+    // First call: FAILED transition
+    const failedSql = (calls[0][0] as string[]).join('')
+    expect(failedSql).toContain("status = 'FAILED'")
+    expect(failedSql).toContain('retry_count >= 2')
+  })
+
+  it('QUEUED re-enqueue query includes plan_snapshot = NULL and retry_count increment', async () => {
+    await resetStaleClaimedJobs('user-1')
+    const calls = mockPrisma.$queryRaw.mock.calls
+    // Second call: re-enqueue transition
+    const requeueSql = (calls[1][0] as string[]).join('')
+    expect(requeueSql).toContain("status = 'QUEUED'")
+    expect(requeueSql).toContain('plan_snapshot = NULL')
+    expect(requeueSql).toContain('retry_count = retry_count + 1')
+    expect(requeueSql).toContain('retry_count < 2')
   })
 })
 
diff --git a/src/tools/wait-for-job.ts b/src/tools/wait-for-job.ts
index 740710b..4aff070 100644
--- a/src/tools/wait-for-job.ts
+++ b/src/tools/wait-for-job.ts
@@ -73,14 +73,82 @@ const inputSchema = z.object({
   wait_seconds: z.number().int().min(1).max(MAX_WAIT_SECONDS).default(300),
 })
 
-export async function resetStaleClaimedJobs(userId: string) {
-  await prisma.$executeRaw`
+const STALE_ERROR_MSG = 'agent did not complete job within 2 attempts'
+
+export async function resetStaleClaimedJobs(userId: string): Promise<void> {
+  // Jobs that exceeded the retry limit → FAILED
+  const failedRows = await prisma.$queryRaw<
+    Array<{ id: string; task_id: string; product_id: string }>
+  >`
     UPDATE claude_jobs
-    SET status = 'QUEUED', claimed_by_token_id = NULL, claimed_at = NULL, plan_snapshot = NULL
+    SET status = 'FAILED',
+        finished_at = NOW(),
+        error = ${STALE_ERROR_MSG}
     WHERE user_id = ${userId}
       AND status = 'CLAIMED'
       AND claimed_at < NOW() - INTERVAL '30 minutes'
+      AND retry_count >= 2
+    RETURNING id, task_id, product_id
   `
+
+  // Jobs under the retry limit → back to QUEUED, increment retry_count
+  const requeuedRows = await prisma.$queryRaw<
+    Array<{ id: string; task_id: string; product_id: string; retry_count: number }>
+  >`
+    UPDATE claude_jobs
+    SET status = 'QUEUED',
+        claimed_by_token_id = NULL,
+        claimed_at = NULL,
+        plan_snapshot = NULL,
+        retry_count = retry_count + 1
+    WHERE user_id = ${userId}
+      AND status = 'CLAIMED'
+      AND claimed_at < NOW() - INTERVAL '30 minutes'
+      AND retry_count < 2
+    RETURNING id, task_id, product_id, retry_count
+  `
+
+  if (failedRows.length === 0 && requeuedRows.length === 0) return
+
+  // Notify UI via SSE for each transition (best-effort)
+  try {
+    const pg = new Client({ connectionString: process.env.DATABASE_URL })
+    await pg.connect()
+    try {
+      for (const j of failedRows) {
+        await pg.query('SELECT pg_notify($1, $2)', [
+          'scrum4me_changes',
+          JSON.stringify({
+            type: 'claude_job_status',
+            job_id: j.id,
+            task_id: j.task_id,
+            user_id: userId,
+            product_id: j.product_id,
+            status: 'failed',
+            error: STALE_ERROR_MSG,
+          }),
+        ])
+      }
+      for (const j of requeuedRows) {
+        await pg.query('SELECT pg_notify($1, $2)', [
+          'scrum4me_changes',
+          JSON.stringify({
+            type: 'claude_job_status',
+            job_id: j.id,
+            task_id: j.task_id,
+            user_id: userId,
+            product_id: j.product_id,
+            status: 'queued',
+          }),
+        ])
+      }
+    } finally {
+      // Close the connection even when a notify query throws, so it is not leaked
+      await pg.end()
+    }
+  } catch {
+    // non-fatal — status transitions are already persisted
+  }
 }
 
 export async function tryClaimJob(