feat(PBI-4/ST-005): runner moves the queue loop out of Claude (one invocation per job)
Replaces the long seed-prompt loop with a Node runner that handles exactly one claimed job per iteration. One Claude invocation = one job, with the correct per-kind config (model/permission-mode/effort/allowed_tools) per PBI-67's resolveJobConfig.

- T-18/19/20/21: bin/run-one-job.ts (new, ESM tsx). Imports directly from /opt/scrum4me-mcp/src/. Steps: auth → quota probe → claim with 270s LISTEN fallback → getFullJobContext → attachWorktreeToJob (TASK) → write payload → build CLI args + mapBudgetToEffort → spawn claude → token-expiry detection → rollbackClaim on exit≠0 without update_job_status → cleanup. ISO-timestamped logging for every phase. setInterval(60s) lease renewal only for SPRINT_IMPLEMENTATION.
- T-22: bin/run-agent.sh — SEED_PROMPT + ALLOWED_TOOLS removed; claude -p replaced by `tsx /opt/agent/bin/run-one-job.ts`. TOKEN_EXPIRED detection extended with an exit_code==3 trigger.
- T-23: CLAUDE.md rewritten — operational loop removed, architecture explanation added, hard-stop rules (no wait_for_job, check_queue_empty, job_heartbeat, git push). T-24 smoke test deferred until the scrum4me-mcp PR merges (the Dockerfile clones via MCP_GIT_REF, default 'main'); see the test_result log for the verification commands.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
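The "one Claude invocation = one job" flow above hinges on turning a resolved per-kind JobConfig into CLI flags. A minimal sketch of that mapping, assuming a simplified `JobConfig` shape and made-up effort thresholds — the real `resolveJobConfig` and `mapBudgetToEffort` live in scrum4me-mcp and are not reproduced here:

```typescript
// Hypothetical shapes — the real JobConfig comes from PBI-67's resolveJobConfig.
type JobConfig = {
  model: string
  permission_mode: string
  thinking_budget: number | null
  allowed_tools: string[] | null
}

// Stand-in for mapBudgetToEffort: bucket a thinking budget into a CLI effort
// level. The actual thresholds in src/lib/job-config.ts may differ.
function mapBudgetToEffort(budget: number | null): string | null {
  if (budget === null) return null
  if (budget <= 4_000) return 'low'
  if (budget <= 16_000) return 'medium'
  return 'high'
}

// One flag pair per config field; --effort only when the budget maps to one.
function buildClaudeArgs(cfg: JobConfig, prompt: string): string[] {
  const args = [
    '-p', prompt,
    '--model', cfg.model,
    '--permission-mode', cfg.permission_mode,
    '--allowedTools', (cfg.allowed_tools ?? []).join(','),
    '--output-format', 'text',
  ]
  const effort = mapBudgetToEffort(cfg.thinking_budget)
  if (effort) args.push('--effort', effort)
  return args
}

const args = buildClaudeArgs(
  {
    model: 'claude-opus',
    permission_mode: 'bypassPermissions',
    thinking_budget: 8_000,
    allowed_tools: ['Read', 'Edit', 'Bash'],
  },
  'handle the claimed job',
)
console.log(args.join(' '))
```

Because every flag is derived from one resolved config object, a job kind can tighten or loosen its tool surface without the shell wrapper knowing anything about it.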
This commit is contained in:
parent b6bea1ecbb
commit a6079892d7
3 changed files with 475 additions and 197 deletions
@@ -3,14 +3,19 @@
 #
 # Strategy:
 # - First a pre-flight token check (one-off, blocks startup on failure)
-# - Loop: claude -p with the seed prompt
-# - Exit 0 → the queue was empty, sleep briefly, repeat
+# - Loop: tsx /opt/agent/bin/run-one-job.ts (one claimed job per iteration)
+# - Exit 0 → the queue was empty or the job finished, sleep briefly, repeat
+# - Exit 3 → run-one-job detected TOKEN_EXPIRED in Claude's output
 # - Exit ≠ 0 → exponential backoff, log, write state, repeat
 # - After N consecutive failures → write an UNHEALTHY marker; the health
 #   endpoint switches to 503, the container keeps running for diagnosis
 # - On detected token expiry → write a TOKEN_EXPIRED marker
 #   and exit (compose restarts us, but the entrypoint will see the same
 #   marker via the health server)
 #
+# The claim/exec loop lives in bin/run-one-job.ts (Node + tsx); this shell
+# script only does daemon/backoff/health/log-rotation. See
+# docs/plans/queue-loop-extraction.md in the Scrum4Me repo.
 
 set -uo pipefail  # note: no -e, we want to inspect exit codes
 
@@ -40,13 +45,10 @@ rm -f "${AGENT_STATE_DIR}/UNHEALTHY" "${AGENT_STATE_DIR}/TOKEN_EXPIRED"
 /opt/agent/bin/rotate-logs.sh || true
 /opt/agent/bin/log-cleanup.sh || true
 
-# ----- seed prompt ------------------------------------------------------
-SEED_PROMPT='Claim the next job from the Scrum4Me queue and drain the queue following the loop in /opt/agent/CLAUDE.md. Do not stop between jobs. Only exit once wait_for_job comes back without a claim after the full block time.'
-
-# Tools allowlist: every MCP tool that scrum4me-mcp offers plus the standard
-# file/bash tools. No WebFetch, no WebSearch — the agent does not need them
-# and excluding them shrinks the surface.
-ALLOWED_TOOLS='Read,Edit,Write,Bash,Grep,Glob,mcp__scrum4me__health,mcp__scrum4me__list_products,mcp__scrum4me__get_claude_context,mcp__scrum4me__wait_for_job,mcp__scrum4me__check_queue_empty,mcp__scrum4me__update_job_status,mcp__scrum4me__update_task_status,mcp__scrum4me__update_task_plan,mcp__scrum4me__log_implementation,mcp__scrum4me__log_test_result,mcp__scrum4me__log_commit,mcp__scrum4me__create_pbi,mcp__scrum4me__create_story,mcp__scrum4me__create_task,mcp__scrum4me__create_todo,mcp__scrum4me__ask_user_question,mcp__scrum4me__get_question_answer,mcp__scrum4me__list_open_questions,mcp__scrum4me__cancel_question,mcp__scrum4me__get_worker_settings,mcp__scrum4me__worker_heartbeat'
+# No seed prompt and no ALLOWED_TOOLS string anymore: per-job CLI flags
+# (incl. --model, --permission-mode, --effort, --allowedTools and the
+# kind-specific prompt) are built by run-one-job.ts from JobConfig
+# (resolved via PBI-67's resolveJobConfig).
 
 CONSEC_FAILURES=0
 BACKOFF=${AGENT_BACKOFF_START}
@@ -60,32 +62,25 @@ while true; do
     --argjson failures "$CONSEC_FAILURES" \
     '{status:"running", currentBatchStartedAt:$started, consecutiveFailures:$failures}')"
 
-  log "starting batch (log: ${run_log})"
+  log "starting iteration (log: ${run_log})"
 
-  # claude -p with our MCP config and allowlist.
-  # cwd = /opt/agent so our CLAUDE.md is auto-loaded.
-  #
-  # --permission-mode bypassPermissions: disables all remaining permission
-  # prompts. Safe in this container because (1) we run as the non-root
-  # agent user, (2) there are no push credentials, (3) writes are limited
-  # to /tmp/job-*. The allowlist above stays as a belt-and-braces second
-  # filter.
+  # One iteration = one claimed job (or "no job" → exit 0). The runner
+  # claims via tryClaimJob itself, reads the JobConfig (PBI-67), builds the
+  # right Claude CLI args, spawns 'claude', waits, exits.
   set +e
-  claude -p "${SEED_PROMPT}" \
-    --mcp-config /opt/agent/mcp-config.json \
-    --allowedTools "${ALLOWED_TOOLS}" \
-    --permission-mode bypassPermissions \
-    --output-format text \
-    > "${run_log}" 2>&1
+  tsx /opt/agent/bin/run-one-job.ts > "${run_log}" 2>&1
   exit_code=$?
   set -e
 
   iteration_end=$(date -u +%Y-%m-%dT%H:%M:%SZ)
   log "batch ended exit=${exit_code}"
 
-  # Token-expiry detection: parse stderr/stdout for known strings.
-  if grep -qE '(invalid_api_key|authentication.*failed|401.*unauthor|OAuth.*expired)' "${run_log}"; then
-    log "AUTH FAILURE detected in run log — marking TOKEN_EXPIRED"
+  # Token-expiry detection: run-one-job.ts returns exit 3 when it spots
+  # known auth-error strings in Claude's output. We also check the log text
+  # in case another path hits the pattern (e.g. a Prisma connection error
+  # with OAuth-expired in the error body).
+  if [[ "$exit_code" -eq 3 ]] || grep -qE '(invalid_api_key|authentication.*failed|401.*unauthor|OAuth.*expired)' "${run_log}"; then
+    log "AUTH FAILURE detected (exit=$exit_code or pattern in log) — marking TOKEN_EXPIRED"
     touch "${AGENT_STATE_DIR}/TOKEN_EXPIRED"
     write_state "$(jq -n \
       --arg endedAt "$iteration_end" \
bin/run-one-job.ts — 387 additions (new file)
@@ -0,0 +1,387 @@
#!/usr/bin/env tsx
// run-one-job.ts — handles one claimed Scrum4Me ClaudeJob.
//
// Architecture (see docs/plans/queue-loop-extraction.md in the Scrum4Me repo):
// scrum4me-docker/bin/run-agent.sh invokes this script once per iteration.
// Steps:
//  1. getAuth → resolves userId/tokenId from SCRUM4ME_TOKEN.
//  2. Quota probe (previously Claude's responsibility, CLAUDE.md steps 0.x).
//  3. resetStaleClaimedJobs → tryClaimJob, with a LISTEN fallback (270s) when the queue is empty.
//  4. getFullJobContext → resolved JobConfig + kind-specific payload.
//  5. attachWorktreeToJob (TASK_IMPLEMENTATION only).
//  6. Write the payload to /tmp/job-<id>/payload.json.
//  7. Build CLI args from ctx.config + mapBudgetToEffort.
//  8. setInterval(60s) lease renewal for SPRINT_IMPLEMENTATION.
//  9. Spawn 'claude' with inherited stdio + scan for token-expiry patterns.
// 10. try/finally: on Claude exit ≠ 0 without update_job_status → rollbackClaim.
// 11. Clean up the payload + prisma.$disconnect().
//
// Exit codes:
//   0 = job handled, or no job within the wait deadline (idle)
//   1 = generic failure (claim, context fetch, worktree, spawn)
//   3 = TOKEN_EXPIRED detected → run-agent.sh writes the TOKEN_EXPIRED marker

import { spawn, spawnSync } from 'node:child_process'
import { mkdirSync, rmSync, writeFileSync } from 'node:fs'

import { Client as PgClient } from 'pg'

import { getAuth } from '/opt/scrum4me-mcp/src/auth.js'
import { prisma } from '/opt/scrum4me-mcp/src/prisma.js'
import {
  attachWorktreeToJob,
  getFullJobContext,
  releaseLocksOnTerminal,
  resetStaleClaimedJobs,
  rollbackClaim,
  tryClaimJob,
} from '/opt/scrum4me-mcp/src/tools/wait-for-job.js'
import { mapBudgetToEffort } from '/opt/scrum4me-mcp/src/lib/job-config.js'
import { getKindPromptText } from '/opt/scrum4me-mcp/src/lib/kind-prompts.js'

// ----- logging --------------------------------------------------------
const log = (msg: string) =>
  console.log(`${new Date().toISOString()} [run-one-job] ${msg}`)
const logError = (msg: string) =>
  console.error(`${new Date().toISOString()} [run-one-job] ERROR ${msg}`)

// ----- constants ------------------------------------------------------
const WAIT_DEADLINE_SECONDS = 270 // well within wait_for_job's MAX_WAIT_SECONDS
const POLL_INTERVAL_MS = 5000
const HEARTBEAT_INTERVAL_MS = 60_000
const MCP_CONFIG = '/opt/agent/mcp-config.json'
const QUOTA_PROBE_PATH = '/opt/agent/bin/worker-quota-probe.sh'
const QUOTA_BACKOFF_CAP_MS = 30 * 60 * 1000
const TOKEN_EXPIRY_PATTERNS: RegExp[] = [
  /invalid_api_key/i,
  /authentication.*failed/i,
  /401.*unauthor/i,
  /OAuth.*expired/i,
]

// ----- quota probe ----------------------------------------------------
async function quotaProbe(userId: string): Promise<void> {
  const probe = spawnSync(QUOTA_PROBE_PATH, [], { encoding: 'utf8' })
  if (probe.status !== 0) {
    logError(
      `quota probe failed: status=${probe.status} stderr=${(probe.stderr ?? '').trim()}`,
    )
    throw new Error('quota probe failed')
  }
  let parsed: { pct?: number; limit?: number; remaining?: number; reset_at_iso?: string }
  try {
    parsed = JSON.parse(probe.stdout)
  } catch {
    logError(`quota probe stdout not JSON: ${probe.stdout.slice(0, 200)}`)
    throw new Error('quota probe stdout invalid')
  }
  if (parsed.pct === undefined) {
    logError(`quota probe missing pct field: ${probe.stdout.slice(0, 200)}`)
    throw new Error('quota probe missing pct')
  }

  const user = await prisma.user.findUnique({
    where: { id: userId },
    select: { min_quota_pct: true },
  })
  const minPct = user?.min_quota_pct ?? 0

  log(
    `quota probe pct=${parsed.pct} min_quota_pct=${minPct} ` +
      `reset_at=${parsed.reset_at_iso ?? '-'}`,
  )

  if (parsed.pct < minPct) {
    let sleepMs = QUOTA_BACKOFF_CAP_MS
    if (parsed.reset_at_iso) {
      const resetAt = new Date(parsed.reset_at_iso).getTime()
      const delta = resetAt - Date.now()
      if (delta > 0 && delta < sleepMs) sleepMs = delta
    }
    log(`quota below min — sleeping ${Math.round(sleepMs / 1000)}s until reset`)
    await new Promise((resolve) => setTimeout(resolve, sleepMs))
  }
}

// ----- LISTEN fallback for an empty queue ------------------------------
async function waitForEnqueue(userId: string): Promise<void> {
  const dburl = process.env.DATABASE_URL
  if (!dburl) throw new Error('DATABASE_URL not set')
  const client = new PgClient({ connectionString: dburl })
  await client.connect()
  await client.query('LISTEN scrum4me_changes')

  const deadline = Date.now() + WAIT_DEADLINE_SECONDS * 1000

  try {
    while (Date.now() < deadline) {
      const notified = await new Promise<boolean>((resolve) => {
        const onNotify = (msg: { payload?: string }) => {
          try {
            const payload = JSON.parse(msg.payload ?? '{}')
            if (
              payload.type === 'claude_job_enqueued' &&
              payload.user_id === userId
            ) {
              clearTimeout(pollTimer)
              client.removeListener('notification', onNotify)
              resolve(true)
            }
          } catch {
            // ignore parse errors
          }
        }
        const pollTimer = setTimeout(() => {
          // Poll tick without a matching NOTIFY: detach the listener so it
          // does not accumulate across iterations, and keep waiting.
          client.removeListener('notification', onNotify)
          resolve(false)
        }, POLL_INTERVAL_MS)
        client.on('notification', onNotify)
      })
      // Only return early on a matching notification — the caller then
      // retries tryClaimJob. On a plain poll tick, keep looping until the
      // deadline.
      if (notified) return
    }
  } finally {
    await client.end().catch(() => {})
  }
}

// ----- main -----------------------------------------------------------
async function main(): Promise<number> {
  log('claim attempt starting')
  const { userId, tokenId } = await getAuth()
  log(`auth ok user_id=${userId} token_id=${tokenId}`)

  // 1. Quota probe (gate before every claim).
  try {
    await quotaProbe(userId)
  } catch (err) {
    logError(`quota probe error: ${(err as Error).message}`)
    return 1
  }

  // 2. Reset stale claims, then attempt to claim.
  await resetStaleClaimedJobs(userId)
  let jobId = await tryClaimJob(userId, tokenId)
  if (!jobId) {
    log(`no job claimed — LISTEN scrum4me_changes deadline=${WAIT_DEADLINE_SECONDS}s`)
    await waitForEnqueue(userId)
    await resetStaleClaimedJobs(userId)
    jobId = await tryClaimJob(userId, tokenId)
  }
  if (!jobId) {
    log(`claim timeout after ${WAIT_DEADLINE_SECONDS}s — exiting 0`)
    return 0
  }

  log(`claimed job_id=${jobId}`)

  // 3. Resolve full context.
  let ctx: Awaited<ReturnType<typeof getFullJobContext>> | null = null
  try {
    ctx = await getFullJobContext(jobId)
  } catch (err) {
    logError(`getFullJobContext error job_id=${jobId} ${(err as Error).message}`)
    log(`rollback claim job_id=${jobId} reason=context_fetch_failed`)
    await rollbackClaim(jobId)
    return 1
  }
  if (!ctx) {
    logError(`getFullJobContext returned null for job_id=${jobId}`)
    await rollbackClaim(jobId)
    return 1
  }

  // 4. Attach a worktree for TASK_IMPLEMENTATION; sprint/idea jobs already
  // have their worktree path filled in by getFullJobContext.
  // We use `any` here because the return type of getFullJobContext is a
  // discriminated union and TypeScript exposes no fields without a
  // kind-narrow; the runtime checks cover all paths.
  const ctxAny = ctx as any
  let worktreePath: string | null =
    ctxAny.worktree_path ?? ctxAny.primary_worktree_path ?? null

  if (ctx.kind === 'TASK_IMPLEMENTATION') {
    if (!ctxAny.story || !ctxAny.task) {
      logError(`TASK_IMPLEMENTATION job has incomplete story/task context`)
      await rollbackClaim(jobId)
      await releaseLocksOnTerminal(jobId)
      return 1
    }
    const wt = await attachWorktreeToJob(
      ctxAny.product.id,
      jobId,
      ctxAny.story.id,
      ctxAny.task.repo_url,
    )
    if ('error' in wt) {
      logError(`attachWorktreeToJob error job_id=${jobId} ${wt.error}`)
      log(`rollback claim job_id=${jobId} reason=worktree_attach_failed`)
      await rollbackClaim(jobId)
      await releaseLocksOnTerminal(jobId)
      return 1
    }
    worktreePath = wt.worktree_path
    ctxAny.worktree_path = wt.worktree_path
    ctxAny.branch_name = wt.branch_name
    log(`worktree path=${wt.worktree_path} branch=${wt.branch_name}`)
  } else if (worktreePath) {
    log(`worktree path=${worktreePath} (pre-resolved)`)
  }

  // 5. Resolved config — log it for audit purposes.
  const cfg = ctx.config
  const effort = mapBudgetToEffort(cfg.thinking_budget)
  log(
    `config job_id=${jobId} model=${cfg.model} ` +
      `permission_mode=${cfg.permission_mode} ` +
      `thinking_budget=${cfg.thinking_budget} effort=${effort ?? '-'} ` +
      `max_turns=${cfg.max_turns ?? 'null'} ` +
      `allowed_tools_count=${cfg.allowed_tools?.length ?? 0}`,
  )

  // 6. Write the payload to /tmp/job-<id>/payload.json.
  const payloadDir = `/tmp/job-${jobId}`
  const payloadPath = `${payloadDir}/payload.json`
  mkdirSync(payloadDir, { recursive: true })
  const payloadJson = JSON.stringify(ctx, null, 2)
  writeFileSync(payloadPath, payloadJson, 'utf8')
  log(
    `payload written path=${payloadPath} size_bytes=${Buffer.byteLength(payloadJson)}`,
  )

  // 7. Build CLI args.
  const promptText = getKindPromptText(ctx.kind).replace('$PAYLOAD_PATH', payloadPath)
  const args: string[] = [
    '-p',
    promptText,
    '--model',
    cfg.model,
    '--permission-mode',
    cfg.permission_mode,
    '--allowedTools',
    (cfg.allowed_tools ?? []).join(','),
    '--mcp-config',
    MCP_CONFIG,
    '--add-dir',
    '/opt/agent',
    '--output-format',
    'text',
  ]
  if (effort) args.push('--effort', effort)

  const cwd = worktreePath ?? '/opt/agent'
  // Log the args without the full prompt text (it can be kilobytes long).
  const argsForLog = args
    .map((a, i) => (i === 1 ? `<prompt-${promptText.length}-chars>` : a))
    .join(' ')
  log(`spawn claude job_id=${jobId} cwd=${cwd} args="${argsForLog}"`)

  // 8. Lease-renewal heartbeat for SPRINT_IMPLEMENTATION.
  let heartbeatTimer: NodeJS.Timeout | null = null
  if (ctx.kind === 'SPRINT_IMPLEMENTATION') {
    heartbeatTimer = setInterval(() => {
      prisma
        .$executeRaw`UPDATE claude_jobs SET lease_until = NOW() + INTERVAL '5 minutes' WHERE id = ${jobId}`
        .then(() => {
          log(
            `heartbeat tick job_id=${jobId} lease_until=${new Date(
              Date.now() + 5 * 60_000,
            ).toISOString()}`,
          )
        })
        .catch((err: Error) => {
          logError(`heartbeat error: ${err.message}`)
        })
    }, HEARTBEAT_INTERVAL_MS)
  }

  // 9. Spawn Claude.
  const start = Date.now()
  let exitCode: number | null = null
  let stdoutBuf = ''
  try {
    await new Promise<void>((resolve, reject) => {
      const child = spawn('claude', args, { cwd })
      child.stdout.on('data', (chunk) => {
        const s = chunk.toString()
        process.stdout.write(s)
        stdoutBuf += s
      })
      child.stderr.on('data', (chunk) => {
        const s = chunk.toString()
        process.stderr.write(s)
        stdoutBuf += s
      })
      child.on('error', (err) => reject(err))
      child.on('close', (code) => {
        exitCode = code
        resolve()
      })
    })
  } catch (err) {
    logError(`spawn error: ${(err as Error).message}`)
    if (heartbeatTimer) clearInterval(heartbeatTimer)
    await rollbackClaim(jobId).catch(() => {})
    await releaseLocksOnTerminal(jobId).catch(() => {})
    return 1
  } finally {
    if (heartbeatTimer) clearInterval(heartbeatTimer)
  }

  const durationMs = Date.now() - start
  log(
    `claude done job_id=${jobId} exit_code=${exitCode ?? 'null'} ` +
      `duration_ms=${durationMs} wall_clock_seconds=${Math.round(durationMs / 1000)}`,
  )

  // 10. Token-expiry detection.
  let tokenExpired = false
  for (const pat of TOKEN_EXPIRY_PATTERNS) {
    if (pat.test(stdoutBuf)) {
      tokenExpired = true
      log(`TOKEN_EXPIRED detected pattern="${pat.source}" exiting code=3`)
      break
    }
  }

  // 11. Roll back the claim if Claude exited non-zero without updating the
  // job status. PBI-50's lease-driven recovery picks up any remaining stale
  // jobs after 5 minutes.
  if (exitCode !== 0 && !tokenExpired) {
    const jobNow = await prisma.claudeJob.findUnique({
      where: { id: jobId },
      select: { status: true },
    })
    if (jobNow?.status === 'CLAIMED' || jobNow?.status === 'RUNNING') {
      log(
        `rollback claim job_id=${jobId} reason=claude_exit_${exitCode}_without_status_update`,
      )
      await rollbackClaim(jobId).catch(() => {})
      await releaseLocksOnTerminal(jobId).catch(() => {})
    }
  }

  // 12. Clean up the payload directory.
  try {
    rmSync(payloadDir, { recursive: true, force: true })
  } catch {
    // non-fatal
  }
  log(`cleanup payload_removed=true heartbeat_stopped=${heartbeatTimer !== null}`)

  if (tokenExpired) return 3
  return exitCode ?? 1
}

// ----- entry ----------------------------------------------------------
process.on('SIGTERM', () => {
  prisma.$disconnect().finally(() => process.exit(143))
})

main()
  .then(async (code) => {
    log(`exit code=${code}`)
    await prisma.$disconnect().catch(() => {})
    process.exit(code)
  })
  .catch(async (err) => {
    logError(`fatal: ${(err as Error).stack ?? err}`)
    await prisma.$disconnect().catch(() => {})
    process.exit(1)
  })
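The pattern scan in step 10 can be exercised in isolation. The regex list below is copied verbatim from run-one-job.ts above; the `detectTokenExpiry` wrapper is a hypothetical helper added here for illustration:

```typescript
// Pattern list copied from run-one-job.ts; the wrapper function is a sketch.
const TOKEN_EXPIRY_PATTERNS: RegExp[] = [
  /invalid_api_key/i,
  /authentication.*failed/i,
  /401.*unauthor/i,
  /OAuth.*expired/i,
]

// Returns the source of the first matching pattern, or null when nothing
// in the captured output looks like an auth failure.
function detectTokenExpiry(output: string): string | null {
  for (const pat of TOKEN_EXPIRY_PATTERNS) {
    if (pat.test(output)) return pat.source
  }
  return null
}

console.log(detectTokenExpiry('API Error: 401 Unauthorized'))
console.log(detectTokenExpiry('job completed normally'))
```

Returning the matched pattern's source (as the runner logs it) makes the resulting TOKEN_EXPIRED marker traceable to the exact string that triggered it.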