#!/usr/bin/env tsx
// run-one-job.ts — handles exactly one claimed Scrum4Me ClaudeJob.
//
// Architecture (see docs/plans/queue-loop-extraction.md in the Scrum4Me repo):
// scrum4me-docker/bin/run-agent.sh invokes this script once per iteration.
// Steps:
//   1. getAuth → resolves userId/tokenId from SCRUM4ME_TOKEN.
//   2. Quota probe (used to be Claude's responsibility, CLAUDE.md steps 0.x).
//   3. resetStaleClaimedJobs → tryClaimJob, with a LISTEN fallback (270s,
//      re-polling every 5s) when the queue is empty.
//   4. getFullJobContext → resolved JobConfig + kind-specific payload.
//   5. attachWorktreeToJob (TASK_IMPLEMENTATION only).
//   6. Write the payload to /tmp/job-<job_id>/payload.json.
//   7. Build CLI args from ctx.config + mapBudgetToEffort.
//   8. setInterval(60s) lease renewal for SPRINT_IMPLEMENTATION.
//   9. Spawn 'claude' with stdout/stderr piped and echoed to our own stdio,
//      then scan the captured output for token-expiry patterns.
//  10. If Claude exits non-zero without update_job_status → rollbackClaim.
//  11. Clean up the payload directory + prisma.$disconnect().
//
// Exit codes:
//   0 = job handled, or no job within the wait deadline (idle)
//   1 = generic failure (claim, context fetch, worktree, spawn)
//   3 = TOKEN_EXPIRED detected → run-agent.sh writes the TOKEN_EXPIRED marker
// NOTE(review): a non-zero exit code from Claude itself is passed through
// unchanged, so it can in principle collide with the codes above.

import { spawn, spawnSync } from 'node:child_process'
import { mkdirSync, rmSync, symlinkSync, writeFileSync } from 'node:fs'
import { basename, join } from 'node:path'
import { Client as PgClient } from 'pg'
import { getAuth } from '/opt/scrum4me-mcp/src/auth.js'
import { prisma } from '/opt/scrum4me-mcp/src/prisma.js'
import {
  attachWorktreeToJob,
  getFullJobContext,
  resetStaleClaimedJobs,
  rollbackClaim,
  tryClaimJob,
} from '/opt/scrum4me-mcp/src/tools/wait-for-job.js'
import { releaseLocksOnTerminal } from '/opt/scrum4me-mcp/src/git/job-locks.js'
import { mapBudgetToEffort } from '/opt/scrum4me-mcp/src/lib/job-config.js'
import { getKindPromptText } from '/opt/scrum4me-mcp/src/lib/kind-prompts.js'
import { registerWorker } from '/opt/scrum4me-mcp/src/presence/worker.js'
import { startHeartbeat } from '/opt/scrum4me-mcp/src/presence/heartbeat.js'

// ----- logging --------------------------------------------------------

/** Writes a timestamped informational line to stdout. */
const log = (msg: string) =>
  console.log(`${new Date().toISOString()} [run-one-job] ${msg}`)

/** Writes a timestamped error line to stderr. */
const logError = (msg: string) =>
  console.error(`${new Date().toISOString()} [run-one-job] ERROR ${msg}`)

// ----- constants ------------------------------------------------------

const WAIT_DEADLINE_SECONDS = 270 // comfortably within wait_for_job's MAX_WAIT_SECONDS
const POLL_INTERVAL_MS = 5000
const HEARTBEAT_INTERVAL_MS = 60_000
const WORKER_HEARTBEAT_INTERVAL_MS = 10_000
const MCP_CONFIG = '/opt/agent/mcp-config.json'
const QUOTA_PROBE_PATH = '/opt/agent/bin/worker-quota-probe.sh'
const QUOTA_BACKOFF_CAP_MS = 30 * 60 * 1000

const TOKEN_EXPIRY_PATTERNS: RegExp[] = [
  /invalid_api_key/i,
  /authentication.*failed/i,
  /401.*unauthor/i,
  /OAuth.*expired/i,
]

// ----- quota probe ----------------------------------------------------

/** Fields this script reads from the probe script's JSON output. */
type QuotaProbeResult = {
  pct?: number
  limit?: number
  remaining?: number
  reset_at_iso?: string
  error?: string
}

/**
 * Runs the quota probe script and, when the measured quota percentage is
 * below the user's `min_quota_pct`, sleeps until the reported reset time
 * (capped at QUOTA_BACKOFF_CAP_MS).
 *
 * Soft-fail by design: if the probe cannot be started, exits non-zero,
 * prints something that is not a JSON object, or reports no numeric `pct`
 * (some Anthropic endpoints do not return rate-limit headers), a line is
 * logged and the function returns without gating. This mirrors CLAUDE.md
 * step 0.4 ("otherwise: continue").
 *
 * @param userId id of the user whose `min_quota_pct` threshold applies
 * @throws when the prisma lookup of the user fails
 */
async function quotaProbe(userId: string): Promise<void> {
  const probe = spawnSync(QUOTA_PROBE_PATH, [], { encoding: 'utf8' })

  // When the process cannot be spawned at all (missing script, no exec bit)
  // Node sets `error` and leaves stdout null at runtime, despite the typings.
  const stdout = probe.stdout ?? ''
  if (probe.error) {
    log(`quota probe could not be spawned (${probe.error.message}) — continuing without gate`)
    return
  }
  if (probe.status !== 0) {
    log(
      `quota probe non-zero status=${probe.status} stdout=${stdout.slice(0, 200).trim()} — continuing without gate`,
    )
    return
  }

  let parsed: QuotaProbeResult
  try {
    const raw: unknown = JSON.parse(stdout)
    // Valid JSON that is not an object (null, a number, ...) is as unusable
    // as unparsable output; treat both the same way.
    if (raw === null || typeof raw !== 'object') throw new Error('not a JSON object')
    parsed = raw as QuotaProbeResult
  } catch {
    log(`quota probe stdout not JSON (continuing): ${stdout.slice(0, 200)}`)
    return
  }

  // A null or non-numeric pct must not reach the comparison below: `null < n`
  // coerces to `0 < n` and would trigger a long sleep on bogus data.
  if (typeof parsed.pct !== 'number') {
    log(`quota probe no pct (continuing): error=${parsed.error ?? '-'}`)
    return
  }

  const user = await prisma.user.findUnique({
    where: { id: userId },
    select: { min_quota_pct: true },
  })
  const minPct = user?.min_quota_pct ?? 0
  log(
    `quota probe pct=${parsed.pct} min_quota_pct=${minPct} ` +
      `reset_at=${parsed.reset_at_iso ?? '-'}`,
  )
  if (parsed.pct >= minPct) return

  // Sleep until the reset moment when it is known, in the future and sooner
  // than the cap; otherwise sleep for the cap.
  let sleepMs = QUOTA_BACKOFF_CAP_MS
  if (parsed.reset_at_iso) {
    const delta = new Date(parsed.reset_at_iso).getTime() - Date.now()
    if (delta > 0 && delta < sleepMs) sleepMs = delta
  }
  log(`quota below min — sleeping ${Math.round(sleepMs / 1000)}s until reset`)
  await new Promise<void>((resolve) => setTimeout(resolve, sleepMs))
}

// ----- LISTEN fallback for an empty queue -----------------------------

/**
 * Waits up to WAIT_DEADLINE_SECONDS for a job to become available for
 * `userId`, listening on the `scrum4me_changes` Postgres channel.
 *
 * The wait is split into slices of at most POLL_INTERVAL_MS. A slice ends
 * early when a `claude_job_enqueued` notification for this user arrives.
 * After every slice `onWake` (if given) is invoked; the function returns as
 * soon as it resolves to `true`. Polling on a timer as well as on
 * notifications means a missed notification only costs one poll interval.
 *
 * Without `onWake` the function returns on the first matching notification
 * or when the deadline passes.
 *
 * @param userId only notifications whose `user_id` equals this are honoured
 * @param onWake optional claim attempt; resolve `true` to stop waiting
 * @throws when DATABASE_URL is unset, or when connect / LISTEN fails
 */
async function waitForEnqueue(
  userId: string,
  onWake?: () => Promise<boolean>,
): Promise<void> {
  const dburl = process.env.DATABASE_URL
  if (!dburl) throw new Error('DATABASE_URL not set')

  const client = new PgClient({ connectionString: dburl })
  // A long-lived idle pg client emits 'error' when its connection drops; with
  // no listener Node would treat that as an uncaught exception. Timer-based
  // polling keeps working without the connection, so logging is enough.
  client.on('error', (err: Error) => {
    logError(`LISTEN connection error (falling back to polling): ${err.message}`)
  })

  const deadline = Date.now() + WAIT_DEADLINE_SECONDS * 1000
  try {
    // Inside the try so a failed connect/LISTEN still closes the client.
    await client.connect()
    await client.query('LISTEN scrum4me_changes')

    while (Date.now() < deadline) {
      const sliceMs = Math.min(POLL_INTERVAL_MS, Math.max(0, deadline - Date.now()))

      const notified = await new Promise<boolean>((resolve) => {
        // Single exit path so timer and listener are always released together.
        const finish = (gotNotification: boolean) => {
          clearTimeout(pollTimer)
          client.removeListener('notification', onNotify)
          resolve(gotNotification)
        }
        const onNotify = (msg: { payload?: string }) => {
          try {
            const payload = JSON.parse(msg.payload ?? '{}')
            if (payload.type === 'claude_job_enqueued' && payload.user_id === userId) {
              finish(true)
            }
          } catch {
            // Malformed or unrelated payloads are ignored.
          }
        }
        const pollTimer = setTimeout(() => finish(false), sliceMs)
        client.on('notification', onNotify)
      })

      if (onWake) {
        if (await onWake()) return
      } else if (notified) {
        return
      }
    }
  } finally {
    await client.end().catch(() => {})
  }
}

// ----- claude process -------------------------------------------------

/**
 * Spawns the `claude` CLI, echoes its stdout/stderr to this process's
 * stdout/stderr as it arrives, and collects both streams into one string.
 *
 * @param args command-line arguments for `claude`
 * @param cwd working directory for the child process
 * @returns the exit code (`null` when the child was ended by a signal) and
 *   the combined stdout + stderr text
 * @throws (rejects) when the child process cannot be spawned
 */
function runClaude(
  args: string[],
  cwd: string,
): Promise<{ exitCode: number | null; output: string }> {
  return new Promise((resolve, reject) => {
    // NOTE(review): stdin is left as an open, unused pipe (Node's default).
    const child = spawn('claude', args, { cwd })
    // NOTE(review): the whole output is kept in memory for the expiry scan;
    // very long stream-json runs make this buffer correspondingly large.
    let output = ''
    const tee = (sink: NodeJS.WriteStream) => (chunk: string) => {
      sink.write(chunk)
      output += chunk
    }
    // setEncoding decodes through a streaming decoder, so a multi-byte UTF-8
    // character split across two chunks is not mangled.
    child.stdout.setEncoding('utf8')
    child.stderr.setEncoding('utf8')
    child.stdout.on('data', tee(process.stdout))
    child.stderr.on('data', tee(process.stderr))
    child.on('error', reject)
    child.on('close', (code) => resolve({ exitCode: code, output }))
  })
}

// ----- main -----------------------------------------------------------

/**
 * Claims at most one job, runs Claude on it and cleans up.
 *
 * @returns the process exit code: 0 (handled / idle), 1 (generic failure),
 *   3 (token expiry detected), or Claude's own non-zero exit code
 */
async function main(): Promise<number> {
  log('claim attempt starting')

  const { userId, tokenId } = await getAuth()
  log(`auth ok user_id=${userId} token_id=${tokenId}`)

  // Worker presence — the UI reads claude_workers.last_seen_at.
  // UPSERT so the row survives between iterations (no flicker); the heartbeat
  // keeps last_seen_at fresh during quota backoff, LISTEN wait, claude spawn
  // and cleanup. No unregister on exit: the UI prunes rows older than 60s.
  try {
    await registerWorker({ userId, tokenId })
  } catch (err) {
    logError(`registerWorker failed (non-fatal): ${(err as Error).message}`)
  }
  const workerHeartbeat = startHeartbeat({
    userId,
    tokenId,
    intervalMs: WORKER_HEARTBEAT_INTERVAL_MS,
  })

  try {
    // 1. Quota probe (gate before every claim).
    try {
      await quotaProbe(userId)
    } catch (err) {
      logError(`quota probe error: ${(err as Error).message}`)
      return 1
    }

    // 2. Reset stale claims, then attempt to claim. On an empty queue keep
    //    retrying after every LISTEN wake-up until the wait deadline.
    await resetStaleClaimedJobs(userId)
    let claimed = await tryClaimJob(userId, tokenId)
    if (!claimed) {
      log(`no job claimed — LISTEN scrum4me_changes deadline=${WAIT_DEADLINE_SECONDS}s`)
      await waitForEnqueue(userId, async () => {
        await resetStaleClaimedJobs(userId)
        claimed = await tryClaimJob(userId, tokenId)
        return Boolean(claimed)
      })
    }
    if (!claimed) {
      log(`claim timeout after ${WAIT_DEADLINE_SECONDS}s — exiting 0`)
      return 0
    }
    const jobId = claimed
    log(`claimed job_id=${jobId}`)

    // Per-job log: symlink jobs/<job_id>.log -> the runs/ log file of this
    // iteration. runs/ files are timestamp-named, so without this a job's
    // output is only findable by grepping. run-agent.sh passes the run-log
    // path via RUN_LOG. Relative target so it survives the host bind-mount.
    // Best-effort — never fail the job over a log convenience. Dangling links
    // (after the runs/ file is gzipped/deleted) are pruned by log-cleanup.sh.
    const runLog = process.env.RUN_LOG
    if (runLog) {
      try {
        const jobsDir = join(process.env.AGENT_LOG_DIR ?? '/var/log/agent', 'jobs')
        mkdirSync(jobsDir, { recursive: true })
        const linkPath = join(jobsDir, `${jobId}.log`)
        rmSync(linkPath, { force: true })
        symlinkSync(join('..', 'runs', basename(runLog)), linkPath)
      } catch (err) {
        log(`per-job log symlink skipped for ${jobId}: ${(err as Error).message}`)
      }
    }

    // 3. Resolve full context.
    let ctx: Awaited<ReturnType<typeof getFullJobContext>> = null
    try {
      ctx = await getFullJobContext(jobId)
    } catch (err) {
      logError(`getFullJobContext error job_id=${jobId} ${(err as Error).message}`)
      log(`rollback claim job_id=${jobId} reason=context_fetch_failed`)
      await rollbackClaim(jobId)
      return 1
    }
    if (!ctx) {
      logError(`getFullJobContext returned null for job_id=${jobId}`)
      await rollbackClaim(jobId)
      return 1
    }

    // 4. Attach a worktree for TASK_IMPLEMENTATION; sprint/idea jobs already
    //    have their worktree path filled in by getFullJobContext.
    //    `any` is used because getFullJobContext returns a discriminated
    //    union and, without narrowing on kind, TypeScript exposes none of
    //    these fields; the runtime checks below cover every path.
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    const ctxAny = ctx as any
    let worktreePath: string | null =
      ctxAny.worktree_path ?? ctxAny.primary_worktree_path ?? null

    if (ctx.kind === 'TASK_IMPLEMENTATION') {
      if (!ctxAny.story || !ctxAny.task) {
        logError(`TASK_IMPLEMENTATION job has incomplete story/task context`)
        await rollbackClaim(jobId)
        await releaseLocksOnTerminal(jobId)
        return 1
      }
      const wt = await attachWorktreeToJob(
        ctxAny.product.id,
        jobId,
        ctxAny.story.id,
        ctxAny.task.repo_url,
      )
      if ('error' in wt) {
        logError(`attachWorktreeToJob error job_id=${jobId} ${wt.error}`)
        log(`rollback claim job_id=${jobId} reason=worktree_attach_failed`)
        await rollbackClaim(jobId)
        await releaseLocksOnTerminal(jobId)
        return 1
      }
      worktreePath = wt.worktree_path
      // Written back so the payload serialised below carries these values.
      ctxAny.worktree_path = wt.worktree_path
      ctxAny.branch_name = wt.branch_name
      log(`worktree path=${wt.worktree_path} branch=${wt.branch_name}`)
    } else if (worktreePath) {
      log(`worktree path=${worktreePath} (pre-resolved)`)
    }

    // 5. Resolved config — logged for audit.
    const cfg = ctx.config
    const effort = mapBudgetToEffort(cfg.thinking_budget)
    log(
      `config job_id=${jobId} model=${cfg.model} ` +
        `permission_mode=${cfg.permission_mode} ` +
        `thinking_budget=${cfg.thinking_budget} effort=${effort ?? '-'} ` +
        `max_turns=${cfg.max_turns ?? 'null'} ` +
        `allowed_tools_count=${cfg.allowed_tools?.length ?? 0}`,
    )

    const payloadDir = `/tmp/job-${jobId}`
    const payloadPath = `${payloadDir}/payload.json`
    let heartbeatTimer: NodeJS.Timeout | null = null

    // From here on the payload directory may exist; the finally below removes
    // it on every exit path, including a failed spawn.
    try {
      // 6. Write the payload to /tmp/job-<job_id>/payload.json.
      mkdirSync(payloadDir, { recursive: true })
      const payloadJson = JSON.stringify(ctx, null, 2)
      writeFileSync(payloadPath, payloadJson, 'utf8')
      log(
        `payload written path=${payloadPath} size_bytes=${Buffer.byteLength(payloadJson)}`,
      )

      // 7. Build CLI args.
      // split/join substitutes every occurrence of the placeholder and inserts
      // the path literally (String.replace with a string pattern replaces only
      // the first match and interprets `$` sequences in the replacement).
      const promptText = getKindPromptText(ctx.kind)
        .split('$PAYLOAD_PATH')
        .join(payloadPath)

      // --output-format is configurable via env. The default 'stream-json'
      // puts the full event stream (every tool call, every message) live in
      // the run log instead of only Claude's final summary. stream-json
      // requires --verbose in print mode. Set AGENT_CLAUDE_OUTPUT_FORMAT=text
      // to get the old terse output back. TOKEN_EXPIRED detection is
      // unaffected: the auth error strings also appear inside the JSON events.
      const outputFormat = process.env.AGENT_CLAUDE_OUTPUT_FORMAT ?? 'stream-json'
      const args: string[] = [
        '-p',
        promptText,
        '--model',
        cfg.model,
        '--permission-mode',
        cfg.permission_mode,
        '--allowedTools',
        (cfg.allowed_tools ?? []).join(','),
        '--mcp-config',
        MCP_CONFIG,
        '--add-dir',
        '/opt/agent',
        '--output-format',
        outputFormat,
      ]
      if (outputFormat === 'stream-json') args.push('--verbose')
      if (effort) args.push('--effort', effort)

      const cwd = worktreePath ?? '/opt/agent'
      // Log the args without the prompt text (index 1), which can be
      // kilobytes long; a visible placeholder keeps the logged command
      // line readable.
      const argsForLog = args.map((a, i) => (i === 1 ? '<prompt>' : a)).join(' ')
      log(`spawn claude job_id=${jobId} cwd=${cwd} args="${argsForLog}"`)

      // 8. Lease-renewal heartbeat for SPRINT_IMPLEMENTATION.
      if (ctx.kind === 'SPRINT_IMPLEMENTATION') {
        heartbeatTimer = setInterval(() => {
          prisma
            .$executeRaw`UPDATE claude_jobs SET lease_until = NOW() + INTERVAL '5 minutes' WHERE id = ${jobId}`
            .then(() => {
              log(
                `heartbeat tick job_id=${jobId} lease_until=${new Date(
                  Date.now() + 5 * 60_000,
                ).toISOString()}`,
              )
            })
            .catch((err: Error) => {
              logError(`heartbeat error: ${err.message}`)
            })
        }, HEARTBEAT_INTERVAL_MS)
      }

      // 9. Spawn Claude.
      const start = Date.now()
      let run: { exitCode: number | null; output: string }
      try {
        run = await runClaude(args, cwd)
      } catch (err) {
        logError(`spawn error: ${(err as Error).message}`)
        await rollbackClaim(jobId).catch(() => {})
        await releaseLocksOnTerminal(jobId).catch(() => {})
        return 1
      } finally {
        // Stop renewing the lease as soon as Claude is done or failed to start.
        if (heartbeatTimer) clearInterval(heartbeatTimer)
      }
      const { exitCode, output } = run
      const durationMs = Date.now() - start
      log(
        `claude done job_id=${jobId} exit_code=${exitCode ?? 'null'} ` +
          `duration_ms=${durationMs} wall_clock_seconds=${Math.round(durationMs / 1000)}`,
      )

      // 10. Token-expiry detection over the combined stdout + stderr.
      const expiryPattern = TOKEN_EXPIRY_PATTERNS.find((pat) => pat.test(output))
      const tokenExpired = expiryPattern !== undefined
      if (expiryPattern) {
        log(`TOKEN_EXPIRED detected pattern="${expiryPattern.source}" exiting code=3`)
      }

      // 11. Roll back the claim if Claude exited non-zero without updating
      //     the job status. PBI-50 lease-driven recovery picks up any
      //     remaining stale jobs after 5 minutes.
      if (exitCode !== 0 && !tokenExpired) {
        const jobNow = await prisma.claudeJob.findUnique({
          where: { id: jobId },
          select: { status: true },
        })
        if (jobNow?.status === 'CLAIMED' || jobNow?.status === 'RUNNING') {
          log(
            `rollback claim job_id=${jobId} reason=claude_exit_${exitCode}_without_status_update`,
          )
          await rollbackClaim(jobId).catch(() => {})
          await releaseLocksOnTerminal(jobId).catch(() => {})
        }
      }

      if (tokenExpired) return 3
      return exitCode ?? 1
    } finally {
      // 12. Clean up the payload directory (best-effort).
      try {
        rmSync(payloadDir, { recursive: true, force: true })
      } catch {
        // non-fatal
      }
      log(`cleanup payload_removed=true heartbeat_stopped=${heartbeatTimer !== null}`)
    }
  } finally {
    workerHeartbeat.stop()
  }
}

// ----- entry ----------------------------------------------------------

// On SIGTERM: close the database connection, then exit with 143 (128 + 15).
process.on('SIGTERM', () => {
  prisma.$disconnect().finally(() => process.exit(143))
})

main()
  .then(async (code) => {
    log(`exit code=${code}`)
    await prisma.$disconnect().catch(() => {})
    process.exit(code)
  })
  .catch(async (err) => {
    logError(`fatal: ${(err as Error).stack ?? err}`)
    await prisma.$disconnect().catch(() => {})
    process.exit(1)
  })