scrum4me-docker/bin/run-one-job.ts
Janpeter Visser 0b5a044ea5 feat(logs): per-job log-symlink jobs/<job_id>.log -> runs/<ts>.log (IDEA-063)
Run-logs in /var/log/agent/runs/ zijn timestamp-named, dus de output van
een specifieke job was alleen via grep te vinden. De map jobs/ bestond al
maar werd niet gevuld.

- run-agent.sh: geeft het run-log-pad door als RUN_LOG env-var aan
  run-one-job.ts.
- run-one-job.ts: legt direct na de claim een symlink
  jobs/<job_id>.log -> ../runs/<ts>.log. Relatief pad (overleeft de
  host bind-mount), best-effort (faalt de job nooit over een log-gemak).
- log-cleanup.sh: ruimt dangling per-job symlinks op met `find -xtype l`
  — nodig omdat rotate-logs.sh het doel na 24u gzipt (.log -> .log.gz)
  of na 30d verwijdert, en de bestaande `-type f` cleanup symlinks niet
  raakt.

Functioneel geverifieerd: symlink resolveert, dangling-prune werkt,
`-type f` negeert de symlink (geen voortijdige delete). run-one-job.ts
parseert schoon (node --check + type-strip).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-14 19:22:40 +02:00

434 lines
16 KiB
TypeScript

#!/usr/bin/env tsx
// run-one-job.ts — handelt één geclaimde Scrum4Me-ClaudeJob af.
//
// Architectuur (zie docs/plans/queue-loop-extraction.md in Scrum4Me-repo):
// scrum4me-docker/bin/run-agent.sh roept dit script per iteratie aan.
// Stappen:
// 1. getAuth → resolved userId/tokenId uit SCRUM4ME_TOKEN.
// 2. quota-probe (was Claude's verantwoordelijkheid in CLAUDE.md stappen 0.x).
// 3. resetStaleClaimedJobs → tryClaimJob, met LISTEN-fallback (270s) bij lege queue.
// 4. getFullJobContext → resolved JobConfig + kind-specifieke payload.
// 5. attachWorktreeToJob (alleen TASK_IMPLEMENTATION).
// 6. Schrijf payload naar /tmp/job-<id>/payload.json.
// 7. Bouw CLI-args uit ctx.config + mapBudgetToEffort.
// 8. setInterval(60s) lease-renewal voor SPRINT_IMPLEMENTATION.
// 9. spawn 'claude' (stdout/stderr via pipes gespiegeld naar dit proces) + scan voor token-expiry-patterns.
// 10. try/finally: bij Claude-exit≠0 zonder update_job_status → rollbackClaim.
// 11. cleanup payload + prisma.$disconnect().
//
// Exit-codes:
// 0 = job afgehandeld of geen job binnen wait-deadline (idle)
// 1 = generieke fout (claim, context-fetch, worktree, spawn)
// 3 = TOKEN_EXPIRED detected → run-agent.sh schrijft TOKEN_EXPIRED marker
import { spawn, spawnSync } from 'node:child_process'
import { mkdirSync, rmSync, symlinkSync, writeFileSync } from 'node:fs'
import { basename, join } from 'node:path'
import { Client as PgClient } from 'pg'
import { getAuth } from '/opt/scrum4me-mcp/src/auth.js'
import { prisma } from '/opt/scrum4me-mcp/src/prisma.js'
import {
attachWorktreeToJob,
getFullJobContext,
resetStaleClaimedJobs,
rollbackClaim,
tryClaimJob,
} from '/opt/scrum4me-mcp/src/tools/wait-for-job.js'
import { releaseLocksOnTerminal } from '/opt/scrum4me-mcp/src/git/job-locks.js'
import { mapBudgetToEffort } from '/opt/scrum4me-mcp/src/lib/job-config.js'
import { getKindPromptText } from '/opt/scrum4me-mcp/src/lib/kind-prompts.js'
import { registerWorker } from '/opt/scrum4me-mcp/src/presence/worker.js'
import { startHeartbeat } from '/opt/scrum4me-mcp/src/presence/heartbeat.js'
// ----- logging --------------------------------------------------------
/** Build one log line: ISO-8601 timestamp, the `[run-one-job]` tag, then `msg`. */
function formatLogLine(msg: string): string {
  const timestamp = new Date().toISOString()
  return `${timestamp} [run-one-job] ${msg}`
}

/** Write an informational line to stdout. */
function log(msg: string): void {
  console.log(formatLogLine(msg))
}

/** Write an error line, tagged `ERROR`, to stderr. */
function logError(msg: string): void {
  console.error(formatLogLine(`ERROR ${msg}`))
}
// ----- constants ------------------------------------------------------
// Claim waiting: total LISTEN wait on an empty queue (kept well inside
// MAX_WAIT_SECONDS of wait_for_job) and the length of one wait slice.
const WAIT_DEADLINE_SECONDS = 270
const POLL_INTERVAL_MS = 5_000

// Lease renewal cadence while a SPRINT_IMPLEMENTATION job runs.
const HEARTBEAT_INTERVAL_MS = 60 * 1000
// Worker-presence heartbeat cadence (keeps claude_workers.last_seen_at fresh).
const WORKER_HEARTBEAT_INTERVAL_MS = 10 * 1000

// MCP configuration handed to the Claude CLI via --mcp-config.
const MCP_CONFIG = '/opt/agent/mcp-config.json'

// Quota gate: probe script and the longest single backoff sleep (30 minutes).
const QUOTA_PROBE_PATH = '/opt/agent/bin/worker-quota-probe.sh'
const QUOTA_BACKOFF_CAP_MS = 1_800_000

// Output patterns treated as "credentials no longer valid" (exit code 3).
const TOKEN_EXPIRY_PATTERNS: RegExp[] = [
  /invalid_api_key/i,
  /authentication.*failed/i,
  /401.*unauthor/i,
  /OAuth.*expired/i,
]
// ----- quota probe ----------------------------------------------------
/**
 * Gate the next claim on the measured Anthropic quota.
 *
 * Runs QUOTA_PROBE_PATH synchronously and parses its stdout as JSON (fields
 * read here: `pct`, `reset_at_iso`, `error`). If `pct` is below the user's
 * `min_quota_pct` (0 when unset), sleeps until `reset_at_iso`, capped at
 * QUOTA_BACKOFF_CAP_MS (the cap is also used when no usable reset time is
 * reported).
 *
 * Soft-fail: a probe that cannot be started, exits non-zero, prints non-JSON
 * or reports no numeric `pct` (some Anthropic endpoints return no rate-limit
 * headers) is logged and ignored. Only a measured quota shortfall sleeps.
 * This mirrors CLAUDE.md step 0.4 ("otherwise: continue").
 *
 * @param userId - user whose `min_quota_pct` threshold applies
 * @throws when the Prisma user lookup fails
 */
async function quotaProbe(userId: string): Promise<void> {
  const probe = spawnSync(QUOTA_PROBE_PATH, [], { encoding: 'utf8' })
  // When the probe cannot be started at all (e.g. ENOENT/EACCES), spawnSync
  // sets `error` and leaves stdout null despite the typings. Calling .slice()
  // on it would throw and turn this soft-fail into a hard failure.
  const stdout: string = probe.stdout ?? ''
  if (probe.error || probe.status !== 0) {
    const spawnErr = probe.error ? ` error=${probe.error.message}` : ''
    log(
      `quota probe non-zero status=${probe.status}${spawnErr} stdout=${stdout.slice(0, 200).trim()} — continuing without gate`,
    )
    return
  }
  let parsed: { pct?: unknown; reset_at_iso?: string; error?: string } | null
  try {
    parsed = JSON.parse(stdout)
  } catch {
    log(`quota probe stdout not JSON (continuing): ${stdout.slice(0, 200)}`)
    return
  }
  const pct = parsed?.pct
  // Also rejects a `null` document or `"pct": null`. The latter would
  // otherwise be coerced to 0 in the comparison below and trigger a sleep.
  if (typeof pct !== 'number') {
    log(`quota probe no pct (continuing): error=${parsed?.error ?? '-'}`)
    return
  }
  const resetAtIso = parsed?.reset_at_iso
  const user = await prisma.user.findUnique({
    where: { id: userId },
    select: { min_quota_pct: true },
  })
  const minPct = user?.min_quota_pct ?? 0
  log(
    `quota probe pct=${pct} min_quota_pct=${minPct} ` +
      `reset_at=${resetAtIso ?? '-'}`,
  )
  if (pct < minPct) {
    let sleepMs = QUOTA_BACKOFF_CAP_MS
    if (resetAtIso) {
      const delta = new Date(resetAtIso).getTime() - Date.now()
      // An unparseable timestamp yields NaN, fails both checks and keeps the cap.
      if (delta > 0 && delta < sleepMs) sleepMs = delta
    }
    log(`quota below min — sleeping ${Math.round(sleepMs / 1000)}s until reset`)
    await new Promise((resolve) => setTimeout(resolve, sleepMs))
  }
}
// ----- LISTEN fallback for an empty queue -----------------------------
/**
 * Wait, at most WAIT_DEADLINE_SECONDS, for a `claude_job_enqueued`
 * notification for `userId` on the `scrum4me_changes` Postgres channel.
 *
 * Resolves as soon as a matching notification arrives, when the deadline
 * passes, or when the LISTEN connection reports an error. The caller then
 * re-runs tryClaimJob. Each individual wait is capped at POLL_INTERVAL_MS,
 * so the deadline is re-checked at least that often.
 *
 * Uses a dedicated pg connection (DATABASE_URL) that is always closed before
 * returning.
 *
 * @throws Error when DATABASE_URL is unset, or when connecting or issuing
 *   LISTEN fails.
 */
async function waitForEnqueue(userId: string): Promise<void> {
  const dburl = process.env.DATABASE_URL
  if (!dburl) throw new Error('DATABASE_URL not set')
  const client = new PgClient({ connectionString: dburl })
  const deadline = Date.now() + WAIT_DEADLINE_SECONDS * 1000

  // `stop` flips once a matching notification (or a connection error) arrives.
  // `wake` cuts the current wait slice short so we react immediately.
  let stop = false
  let wake: (() => void) | null = null
  const finish = (): void => {
    stop = true
    wake?.()
  }
  const onNotify = (msg: { payload?: string }): void => {
    try {
      const payload = JSON.parse(msg.payload ?? '{}')
      if (payload?.type === 'claude_job_enqueued' && payload?.user_id === userId) {
        finish()
      }
    } catch {
      // ignore payloads that are not JSON
    }
  }
  // Without an 'error' listener, an error event on the pg client would be an
  // unhandled EventEmitter error and crash the process.
  const onError = (err: Error): void => {
    logError(`LISTEN connection error: ${err.message}`)
    finish()
  }
  client.on('notification', onNotify)
  client.on('error', onError)
  try {
    await client.connect()
    await client.query('LISTEN scrum4me_changes')
    while (!stop && Date.now() < deadline) {
      const waitMs = Math.min(POLL_INTERVAL_MS, deadline - Date.now())
      await new Promise<void>((resolve) => {
        const timer = setTimeout(resolve, waitMs)
        wake = () => {
          clearTimeout(timer)
          resolve()
        }
      })
      wake = null
    }
  } finally {
    client.removeListener('notification', onNotify)
    await client.end().catch(() => {})
  }
}
// ----- main -----------------------------------------------------------

/** Outcome of one Claude CLI run: exit code (null when terminated by a signal) and combined output. */
interface ClaudeRun {
  exitCode: number | null
  output: string
}

/**
 * Best-effort per-job log link: jobs/<jobId>.log -> ../runs/<basename of RUN_LOG>.
 *
 * run-agent.sh passes the path of this iteration's timestamp-named run log via
 * RUN_LOG. Without the link, a job's output is only findable by grepping runs/.
 * The target is relative so it survives the host bind-mount. Failures are
 * logged and ignored; a log convenience must never fail the job. Dangling
 * links (after the runs/ file is gzipped/deleted) are pruned by log-cleanup.sh.
 */
function linkJobLog(jobId: string): void {
  const runLog = process.env.RUN_LOG
  if (!runLog) return
  try {
    const jobsDir = join(process.env.AGENT_LOG_DIR ?? '/var/log/agent', 'jobs')
    mkdirSync(jobsDir, { recursive: true })
    const linkPath = join(jobsDir, `${jobId}.log`)
    // Remove any existing entry first; symlinkSync fails with EEXIST otherwise.
    rmSync(linkPath, { force: true })
    symlinkSync(join('..', 'runs', basename(runLog)), linkPath)
  } catch (err) {
    log(`per-job log symlink skipped for ${jobId}: ${(err as Error).message}`)
  }
}

/** Return the first TOKEN_EXPIRY_PATTERNS entry matching `output`, or null. */
function findTokenExpiryPattern(output: string): RegExp | null {
  return TOKEN_EXPIRY_PATTERNS.find((pat) => pat.test(output)) ?? null
}

/**
 * Every HEARTBEAT_INTERVAL_MS, push lease_until of `jobId` to NOW() + 5 minutes.
 * Errors are logged, not thrown. The caller must clearInterval the result.
 */
function startLeaseRenewal(jobId: string): NodeJS.Timeout {
  return setInterval(() => {
    prisma
      .$executeRaw`UPDATE claude_jobs SET lease_until = NOW() + INTERVAL '5 minutes' WHERE id = ${jobId}`
      .then(() => {
        log(
          `heartbeat tick job_id=${jobId} lease_until=${new Date(
            Date.now() + 5 * 60_000,
          ).toISOString()}`,
        )
      })
      .catch((err: Error) => {
        logError(`heartbeat error: ${err.message}`)
      })
  }, HEARTBEAT_INTERVAL_MS)
}

/**
 * Spawn the Claude CLI in `cwd` and resolve once it closes.
 *
 * stdout and stderr are mirrored to this process's stdout/stderr and also
 * collected, in arrival order, into `output` for token-expiry scanning.
 * Rejects when the child emits 'error' (e.g. the binary cannot be spawned).
 */
function runClaude(args: string[], cwd: string): Promise<ClaudeRun> {
  return new Promise<ClaudeRun>((resolve, reject) => {
    // Nothing is ever written to Claude's stdin. 'ignore' gives it an immediate
    // EOF instead of an open pipe that is never closed.
    const child = spawn('claude', args, { cwd, stdio: ['ignore', 'pipe', 'pipe'] })
    let output = ''
    const mirrorTo = (sink: NodeJS.WriteStream) => (chunk: string) => {
      sink.write(chunk)
      output += chunk
    }
    // Let the streams decode UTF-8 so multi-byte characters split across chunk
    // boundaries are not mangled (per-chunk Buffer#toString would split them).
    child.stdout.setEncoding('utf8')
    child.stderr.setEncoding('utf8')
    child.stdout.on('data', mirrorTo(process.stdout))
    child.stderr.on('data', mirrorTo(process.stderr))
    child.on('error', (err) => reject(err))
    child.on('close', (code) => resolve({ exitCode: code, output }))
  })
}

/**
 * Handle at most one Scrum4Me ClaudeJob and return the process exit code.
 *
 * Flow:
 * 1. auth and worker presence
 * 2. quota gate
 * 3. claim, with LISTEN fallback
 * 4. per-job log link
 * 5. job context
 * 6. worktree (TASK_IMPLEMENTATION only)
 * 7. payload file
 * 8. Claude run (with lease renewal for SPRINT_IMPLEMENTATION)
 * 9. token-expiry check
 * 10. rollback when Claude failed without updating the job
 * 11. payload cleanup
 *
 * @returns one of:
 *   - 0 when no job was claimed before the wait deadline;
 *   - 1 on quota-probe, context, worktree, payload or spawn failure;
 *   - 3 when a failed Claude run shows a token-expiry pattern;
 *   - otherwise Claude's exit code (1 if it was terminated by a signal).
 */
async function main(): Promise<number> {
  log('claim attempt starting')
  const { userId, tokenId } = await getAuth()
  log(`auth ok user_id=${userId} token_id=${tokenId}`)
  // Worker presence: the UI reads claude_workers.last_seen_at.
  // - registerWorker UPSERTs, so the row survives between iterations (no flicker).
  // - The heartbeat keeps last_seen_at fresh during quota backoff, the LISTEN
  //   wait, the Claude run and cleanup.
  // - Deliberately not unregistered on exit: the UI prunes rows older than 60s.
  try {
    await registerWorker({ userId, tokenId })
  } catch (err) {
    logError(`registerWorker failed (non-fatal): ${(err as Error).message}`)
  }
  const workerHeartbeat = startHeartbeat({
    userId,
    tokenId,
    intervalMs: WORKER_HEARTBEAT_INTERVAL_MS,
  })
  try {
    // 1. Quota probe (gate before every claim).
    try {
      await quotaProbe(userId)
    } catch (err) {
      logError(`quota probe error: ${(err as Error).message}`)
      return 1
    }

    // 2. Reset stale claims, then claim. On an empty queue, wait for an
    //    enqueue notification and try exactly once more.
    await resetStaleClaimedJobs(userId)
    let jobId = await tryClaimJob(userId, tokenId)
    if (!jobId) {
      log(`no job claimed — LISTEN scrum4me_changes deadline=${WAIT_DEADLINE_SECONDS}s`)
      await waitForEnqueue(userId)
      await resetStaleClaimedJobs(userId)
      jobId = await tryClaimJob(userId, tokenId)
    }
    if (!jobId) {
      log(`claim timeout after ${WAIT_DEADLINE_SECONDS}s — exiting 0`)
      return 0
    }
    log(`claimed job_id=${jobId}`)
    linkJobLog(jobId)

    // 3. Resolve the full job context.
    let ctx: Awaited<ReturnType<typeof getFullJobContext>> = null
    try {
      ctx = await getFullJobContext(jobId)
    } catch (err) {
      logError(`getFullJobContext error job_id=${jobId} ${(err as Error).message}`)
      log(`rollback claim job_id=${jobId} reason=context_fetch_failed`)
      await rollbackClaim(jobId)
      return 1
    }
    if (!ctx) {
      logError(`getFullJobContext returned null for job_id=${jobId}`)
      await rollbackClaim(jobId)
      return 1
    }

    // 4. Attach a worktree for TASK_IMPLEMENTATION. Other kinds use whatever
    //    worktree path getFullJobContext already filled in.
    //    `any` because getFullJobContext returns a discriminated union and the
    //    fields below are not exposed without narrowing on kind. The runtime
    //    checks cover the paths.
    const ctxAny = ctx as any
    let worktreePath: string | null =
      ctxAny.worktree_path ?? ctxAny.primary_worktree_path ?? null
    if (ctx.kind === 'TASK_IMPLEMENTATION') {
      if (!ctxAny.story || !ctxAny.task) {
        logError(`TASK_IMPLEMENTATION job has incomplete story/task context`)
        await rollbackClaim(jobId)
        await releaseLocksOnTerminal(jobId)
        return 1
      }
      const wt = await attachWorktreeToJob(
        ctxAny.product.id,
        jobId,
        ctxAny.story.id,
        ctxAny.task.repo_url,
      )
      if ('error' in wt) {
        logError(`attachWorktreeToJob error job_id=${jobId} ${wt.error}`)
        log(`rollback claim job_id=${jobId} reason=worktree_attach_failed`)
        await rollbackClaim(jobId)
        await releaseLocksOnTerminal(jobId)
        return 1
      }
      worktreePath = wt.worktree_path
      ctxAny.worktree_path = wt.worktree_path
      ctxAny.branch_name = wt.branch_name
      log(`worktree path=${wt.worktree_path} branch=${wt.branch_name}`)
    } else if (worktreePath) {
      log(`worktree path=${worktreePath} (pre-resolved)`)
    }

    // 5. Log the resolved config for audit.
    const cfg = ctx.config
    const effort = mapBudgetToEffort(cfg.thinking_budget)
    log(
      `config job_id=${jobId} model=${cfg.model} ` +
        `permission_mode=${cfg.permission_mode} ` +
        `thinking_budget=${cfg.thinking_budget} effort=${effort ?? '-'} ` +
        `max_turns=${cfg.max_turns ?? 'null'} ` +
        `allowed_tools_count=${cfg.allowed_tools?.length ?? 0}`,
    )

    // 6. Write the payload to /tmp/job-<id>/payload.json. On failure, hand the
    //    claim back (same as the spawn-error path) instead of leaving it
    //    claimed until lease recovery.
    const payloadDir = `/tmp/job-${jobId}`
    const payloadPath = `${payloadDir}/payload.json`
    try {
      mkdirSync(payloadDir, { recursive: true })
      const payloadJson = JSON.stringify(ctx, null, 2)
      writeFileSync(payloadPath, payloadJson, 'utf8')
      log(
        `payload written path=${payloadPath} size_bytes=${Buffer.byteLength(payloadJson)}`,
      )
    } catch (err) {
      logError(`payload write error job_id=${jobId} ${(err as Error).message}`)
      log(`rollback claim job_id=${jobId} reason=payload_write_failed`)
      await rollbackClaim(jobId).catch(() => {})
      await releaseLocksOnTerminal(jobId).catch(() => {})
      return 1
    }

    // 7. Build CLI args. split/join substitutes every $PAYLOAD_PATH occurrence
    //    (String#replace with a string pattern only replaces the first) and
    //    avoids `$`-pattern interpretation in the replacement.
    const promptText = getKindPromptText(ctx.kind).split('$PAYLOAD_PATH').join(payloadPath)
    const args: string[] = [
      '-p',
      promptText,
      '--model',
      cfg.model,
      '--permission-mode',
      cfg.permission_mode,
      '--allowedTools',
      (cfg.allowed_tools ?? []).join(','),
      '--mcp-config',
      MCP_CONFIG,
      '--add-dir',
      '/opt/agent',
      '--output-format',
      'text',
    ]
    if (effort) args.push('--effort', effort)
    const cwd = worktreePath ?? '/opt/agent'
    // Log the args without the full prompt text (which can be large).
    const argsForLog = args
      .map((a, i) => (i === 1 ? `<prompt-${promptText.length}-chars>` : a))
      .join(' ')
    log(`spawn claude job_id=${jobId} cwd=${cwd} args="${argsForLog}"`)

    // 8. Lease renewal for SPRINT_IMPLEMENTATION while Claude runs.
    const leaseTimer = ctx.kind === 'SPRINT_IMPLEMENTATION' ? startLeaseRenewal(jobId) : null

    // 9. Run Claude.
    const start = Date.now()
    let run: ClaudeRun | undefined
    try {
      run = await runClaude(args, cwd)
    } catch (err) {
      logError(`spawn error: ${(err as Error).message}`)
    } finally {
      // Stop lease renewal before any rollback below hands the job back.
      if (leaseTimer) clearInterval(leaseTimer)
    }
    if (!run) {
      await rollbackClaim(jobId).catch(() => {})
      await releaseLocksOnTerminal(jobId).catch(() => {})
      return 1
    }
    const { exitCode, output } = run
    const durationMs = Date.now() - start
    log(
      `claude done job_id=${jobId} exit_code=${exitCode ?? 'null'} ` +
        `duration_ms=${durationMs} wall_clock_seconds=${Math.round(durationMs / 1000)}`,
    )

    // 10. Token-expiry detection, on failed runs only. A run that exits 0 is
    //     treated as authenticated, even if its normal output happens to
    //     contain text like "authentication failed".
    const expiredPattern = exitCode === 0 ? null : findTokenExpiryPattern(output)
    const tokenExpired = expiredPattern !== null
    if (expiredPattern) {
      log(`TOKEN_EXPIRED detected pattern="${expiredPattern.source}" exiting code=3`)
    }

    // 11. Roll back the claim if Claude exited non-zero without updating the
    //     job status. PBI-50 lease-driven recovery catches remaining stale
    //     jobs after 5 min.
    if (exitCode !== 0 && !tokenExpired) {
      const jobNow = await prisma.claudeJob.findUnique({
        where: { id: jobId },
        select: { status: true },
      })
      if (jobNow?.status === 'CLAIMED' || jobNow?.status === 'RUNNING') {
        log(
          `rollback claim job_id=${jobId} reason=claude_exit_${exitCode}_without_status_update`,
        )
        await rollbackClaim(jobId).catch(() => {})
        await releaseLocksOnTerminal(jobId).catch(() => {})
      }
    }

    // 12. Clean up the payload directory (non-fatal).
    try {
      rmSync(payloadDir, { recursive: true, force: true })
    } catch {
      // non-fatal
    }
    log(`cleanup payload_removed=true heartbeat_stopped=${leaseTimer !== null}`)
    if (tokenExpired) return 3
    return exitCode ?? 1
  } finally {
    workerHeartbeat.stop()
  }
}
// ----- entry ----------------------------------------------------------
// Exit status used when the process is stopped with SIGTERM (128 + 15).
const SIGTERM_EXIT_CODE = 143

// On SIGTERM: close the Prisma connection, then exit however the disconnect settles.
process.on('SIGTERM', () => {
  void prisma.$disconnect().finally(() => process.exit(SIGTERM_EXIT_CODE))
})

// Run main() once, then:
// - log its exit code, or log a fatal error when main (or that log) throws;
// - disconnect Prisma, ignoring errors;
// - exit with main's code, or 1 on failure.
void (async (): Promise<void> => {
  let exitWith: number
  try {
    const code = await main()
    log(`exit code=${code}`)
    exitWith = code
  } catch (err) {
    logError(`fatal: ${(err as Error).stack ?? err}`)
    exitWith = 1
  }
  await prisma.$disconnect().catch(() => {})
  process.exit(exitWith)
})()