feat(M13): retry-tracking — stale CLAIMED jobs → QUEUED (retry_count++) or FAILED (≥2 retries)
resetStaleClaimedJobs now uses $queryRaw with RETURNING so it can send pg_notify claude_job_status events per transitioned job. Jobs under the retry limit are re-queued with retry_count incremented; jobs at ≥2 retries are marked FAILED.
This commit is contained in:
parent
2343915a6a
commit
095ebc40f8
2 changed files with 93 additions and 15 deletions
|
|
@ -2,7 +2,7 @@ import { describe, it, expect, vi, beforeEach } from 'vitest'
|
||||||
|
|
||||||
vi.mock('../src/prisma.js', () => ({
|
vi.mock('../src/prisma.js', () => ({
|
||||||
prisma: {
|
prisma: {
|
||||||
$executeRaw: vi.fn(),
|
$queryRaw: vi.fn(),
|
||||||
$transaction: vi.fn(),
|
$transaction: vi.fn(),
|
||||||
},
|
},
|
||||||
}))
|
}))
|
||||||
|
|
@ -11,27 +11,41 @@ import { prisma } from '../src/prisma.js'
|
||||||
import { resetStaleClaimedJobs, tryClaimJob } from '../src/tools/wait-for-job.js'
|
import { resetStaleClaimedJobs, tryClaimJob } from '../src/tools/wait-for-job.js'
|
||||||
|
|
||||||
const mockPrisma = prisma as unknown as {
|
const mockPrisma = prisma as unknown as {
|
||||||
$executeRaw: ReturnType<typeof vi.fn>
|
$queryRaw: ReturnType<typeof vi.fn>
|
||||||
$transaction: ReturnType<typeof vi.fn>
|
$transaction: ReturnType<typeof vi.fn>
|
||||||
}
|
}
|
||||||
|
|
||||||
beforeEach(() => {
|
beforeEach(() => {
|
||||||
vi.clearAllMocks()
|
vi.clearAllMocks()
|
||||||
|
// Default: no stale jobs returned from either query
|
||||||
|
mockPrisma.$queryRaw.mockResolvedValue([])
|
||||||
})
|
})
|
||||||
|
|
||||||
describe('resetStaleClaimedJobs', () => {
|
describe('resetStaleClaimedJobs', () => {
|
||||||
it('resets plan_snapshot to NULL when resetting stale claimed jobs', async () => {
|
it('runs two $queryRaw calls: one for FAILED, one for QUEUED re-enqueue', async () => {
|
||||||
mockPrisma.$executeRaw.mockResolvedValue(0)
|
|
||||||
await resetStaleClaimedJobs('user-1')
|
await resetStaleClaimedJobs('user-1')
|
||||||
|
// Two queries: failed jobs + requeued jobs
|
||||||
|
expect(mockPrisma.$queryRaw).toHaveBeenCalledTimes(2)
|
||||||
|
})
|
||||||
|
|
||||||
expect(mockPrisma.$executeRaw).toHaveBeenCalledOnce()
|
it('FAILED query includes plan_snapshot = NULL reset and retry_count >= 2', async () => {
|
||||||
// Verify the template literal includes plan_snapshot = NULL
|
await resetStaleClaimedJobs('user-1')
|
||||||
const call = mockPrisma.$executeRaw.mock.calls[0]
|
const calls = mockPrisma.$queryRaw.mock.calls
|
||||||
const sqlParts: string[] = call[0]
|
// First call: FAILED transition
|
||||||
const fullSql = sqlParts.join('')
|
const failedSql = (calls[0][0] as string[]).join('')
|
||||||
expect(fullSql).toContain('plan_snapshot = NULL')
|
expect(failedSql).toContain("status = 'FAILED'")
|
||||||
expect(fullSql).toContain("status = 'QUEUED'")
|
expect(failedSql).toContain('retry_count >= 2')
|
||||||
expect(fullSql).toContain('claimed_at < NOW()')
|
})
|
||||||
|
|
||||||
|
it('QUEUED re-enqueue query includes plan_snapshot = NULL and retry_count increment', async () => {
|
||||||
|
await resetStaleClaimedJobs('user-1')
|
||||||
|
const calls = mockPrisma.$queryRaw.mock.calls
|
||||||
|
// Second call: re-enqueue transition
|
||||||
|
const requeueSql = (calls[1][0] as string[]).join('')
|
||||||
|
expect(requeueSql).toContain("status = 'QUEUED'")
|
||||||
|
expect(requeueSql).toContain('plan_snapshot = NULL')
|
||||||
|
expect(requeueSql).toContain('retry_count = retry_count + 1')
|
||||||
|
expect(requeueSql).toContain('retry_count < 2')
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -73,14 +73,78 @@ const inputSchema = z.object({
|
||||||
wait_seconds: z.number().int().min(1).max(MAX_WAIT_SECONDS).default(300),
|
wait_seconds: z.number().int().min(1).max(MAX_WAIT_SECONDS).default(300),
|
||||||
})
|
})
|
||||||
|
|
||||||
export async function resetStaleClaimedJobs(userId: string) {
|
const STALE_ERROR_MSG = 'agent did not complete job within 2 attempts'
|
||||||
await prisma.$executeRaw`
|
|
||||||
|
export async function resetStaleClaimedJobs(userId: string): Promise<void> {
|
||||||
|
// Jobs that exceeded the retry limit → FAILED
|
||||||
|
const failedRows = await prisma.$queryRaw<
|
||||||
|
Array<{ id: string; task_id: string; product_id: string }>
|
||||||
|
>`
|
||||||
UPDATE claude_jobs
|
UPDATE claude_jobs
|
||||||
SET status = 'QUEUED', claimed_by_token_id = NULL, claimed_at = NULL, plan_snapshot = NULL
|
SET status = 'FAILED',
|
||||||
|
finished_at = NOW(),
|
||||||
|
error = ${STALE_ERROR_MSG}
|
||||||
WHERE user_id = ${userId}
|
WHERE user_id = ${userId}
|
||||||
AND status = 'CLAIMED'
|
AND status = 'CLAIMED'
|
||||||
AND claimed_at < NOW() - INTERVAL '30 minutes'
|
AND claimed_at < NOW() - INTERVAL '30 minutes'
|
||||||
|
AND retry_count >= 2
|
||||||
|
RETURNING id, task_id, product_id
|
||||||
`
|
`
|
||||||
|
|
||||||
|
// Jobs under the retry limit → back to QUEUED, increment retry_count
|
||||||
|
const requeuedRows = await prisma.$queryRaw<
|
||||||
|
Array<{ id: string; task_id: string; product_id: string; retry_count: number }>
|
||||||
|
>`
|
||||||
|
UPDATE claude_jobs
|
||||||
|
SET status = 'QUEUED',
|
||||||
|
claimed_by_token_id = NULL,
|
||||||
|
claimed_at = NULL,
|
||||||
|
plan_snapshot = NULL,
|
||||||
|
retry_count = retry_count + 1
|
||||||
|
WHERE user_id = ${userId}
|
||||||
|
AND status = 'CLAIMED'
|
||||||
|
AND claimed_at < NOW() - INTERVAL '30 minutes'
|
||||||
|
AND retry_count < 2
|
||||||
|
RETURNING id, task_id, product_id, retry_count
|
||||||
|
`
|
||||||
|
|
||||||
|
if (failedRows.length === 0 && requeuedRows.length === 0) return
|
||||||
|
|
||||||
|
// Notify UI via SSE for each transition (best-effort)
|
||||||
|
try {
|
||||||
|
const pg = new Client({ connectionString: process.env.DATABASE_URL })
|
||||||
|
await pg.connect()
|
||||||
|
for (const j of failedRows) {
|
||||||
|
await pg.query('SELECT pg_notify($1, $2)', [
|
||||||
|
'scrum4me_changes',
|
||||||
|
JSON.stringify({
|
||||||
|
type: 'claude_job_status',
|
||||||
|
job_id: j.id,
|
||||||
|
task_id: j.task_id,
|
||||||
|
user_id: userId,
|
||||||
|
product_id: j.product_id,
|
||||||
|
status: 'failed',
|
||||||
|
error: STALE_ERROR_MSG,
|
||||||
|
}),
|
||||||
|
])
|
||||||
|
}
|
||||||
|
for (const j of requeuedRows) {
|
||||||
|
await pg.query('SELECT pg_notify($1, $2)', [
|
||||||
|
'scrum4me_changes',
|
||||||
|
JSON.stringify({
|
||||||
|
type: 'claude_job_status',
|
||||||
|
job_id: j.id,
|
||||||
|
task_id: j.task_id,
|
||||||
|
user_id: userId,
|
||||||
|
product_id: j.product_id,
|
||||||
|
status: 'queued',
|
||||||
|
}),
|
||||||
|
])
|
||||||
|
}
|
||||||
|
await pg.end()
|
||||||
|
} catch {
|
||||||
|
// non-fatal — status transitions are already persisted
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
export async function tryClaimJob(
|
export async function tryClaimJob(
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue