Merge pull request #17 from madhura68/claude/determined-wright-14cfbf

fix(runner): worker-presence heartbeat in run-one-job
This commit is contained in:
Janpeter Visser 2026-05-11 02:37:41 +02:00 committed by GitHub
commit 7ec32c8def
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -38,6 +38,8 @@ import {
import { releaseLocksOnTerminal } from '/opt/scrum4me-mcp/src/git/job-locks.js' import { releaseLocksOnTerminal } from '/opt/scrum4me-mcp/src/git/job-locks.js'
import { mapBudgetToEffort } from '/opt/scrum4me-mcp/src/lib/job-config.js' import { mapBudgetToEffort } from '/opt/scrum4me-mcp/src/lib/job-config.js'
import { getKindPromptText } from '/opt/scrum4me-mcp/src/lib/kind-prompts.js' import { getKindPromptText } from '/opt/scrum4me-mcp/src/lib/kind-prompts.js'
import { registerWorker } from '/opt/scrum4me-mcp/src/presence/worker.js'
import { startHeartbeat } from '/opt/scrum4me-mcp/src/presence/heartbeat.js'
// ----- logging -------------------------------------------------------- // ----- logging --------------------------------------------------------
const log = (msg: string) => const log = (msg: string) =>
@ -49,6 +51,7 @@ const logError = (msg: string) =>
const WAIT_DEADLINE_SECONDS = 270 // ruim binnen MAX_WAIT_SECONDS van wait_for_job const WAIT_DEADLINE_SECONDS = 270 // ruim binnen MAX_WAIT_SECONDS van wait_for_job
const POLL_INTERVAL_MS = 5000 const POLL_INTERVAL_MS = 5000
const HEARTBEAT_INTERVAL_MS = 60_000 const HEARTBEAT_INTERVAL_MS = 60_000
const WORKER_HEARTBEAT_INTERVAL_MS = 10_000
const MCP_CONFIG = '/opt/agent/mcp-config.json' const MCP_CONFIG = '/opt/agent/mcp-config.json'
const QUOTA_PROBE_PATH = '/opt/agent/bin/worker-quota-probe.sh' const QUOTA_PROBE_PATH = '/opt/agent/bin/worker-quota-probe.sh'
const QUOTA_BACKOFF_CAP_MS = 30 * 60 * 1000 const QUOTA_BACKOFF_CAP_MS = 30 * 60 * 1000
@ -152,225 +155,245 @@ async function main(): Promise<number> {
const { userId, tokenId } = await getAuth() const { userId, tokenId } = await getAuth()
log(`auth ok user_id=${userId} token_id=${tokenId}`) log(`auth ok user_id=${userId} token_id=${tokenId}`)
// 1. Quota probe (gate vóór elke claim). // Worker presence — UI leest claude_workers.last_seen_at.
// UPSERT zodat de rij tussen iteraties blijft bestaan (geen flicker),
// en heartbeat houdt last_seen_at vers tijdens quota-backoff,
// LISTEN-wait, claude-spawn, en cleanup. Niet unregisteren bij exit:
// de UI prunet zelf rijen ouder dan 60s.
try { try {
await quotaProbe(userId) await registerWorker({ userId, tokenId })
} catch (err) { } catch (err) {
logError(`quota probe error: ${(err as Error).message}`) logError(`registerWorker failed (non-fatal): ${(err as Error).message}`)
return 1
} }
const workerHeartbeat = startHeartbeat({
userId,
tokenId,
intervalMs: WORKER_HEARTBEAT_INTERVAL_MS,
})
// 2. Reset stale claims, then attempt to claim. try {
await resetStaleClaimedJobs(userId) // 1. Quota probe (gate vóór elke claim).
let jobId = await tryClaimJob(userId, tokenId) try {
if (!jobId) { await quotaProbe(userId)
log(`no job claimed — LISTEN scrum4me_changes deadline=${WAIT_DEADLINE_SECONDS}s`) } catch (err) {
await waitForEnqueue(userId) logError(`quota probe error: ${(err as Error).message}`)
return 1
}
// 2. Reset stale claims, then attempt to claim.
await resetStaleClaimedJobs(userId) await resetStaleClaimedJobs(userId)
jobId = await tryClaimJob(userId, tokenId) let jobId = await tryClaimJob(userId, tokenId)
} if (!jobId) {
if (!jobId) { log(`no job claimed — LISTEN scrum4me_changes deadline=${WAIT_DEADLINE_SECONDS}s`)
log(`claim timeout after ${WAIT_DEADLINE_SECONDS}s — exiting 0`) await waitForEnqueue(userId)
return 0 await resetStaleClaimedJobs(userId)
} jobId = await tryClaimJob(userId, tokenId)
}
if (!jobId) {
log(`claim timeout after ${WAIT_DEADLINE_SECONDS}s — exiting 0`)
return 0
}
log(`claimed job_id=${jobId}`) log(`claimed job_id=${jobId}`)
// 3. Resolve full context. // 3. Resolve full context.
let ctx: Awaited<ReturnType<typeof getFullJobContext>> = null let ctx: Awaited<ReturnType<typeof getFullJobContext>> = null
try { try {
ctx = await getFullJobContext(jobId) ctx = await getFullJobContext(jobId)
} catch (err) { } catch (err) {
logError(`getFullJobContext error job_id=${jobId} ${(err as Error).message}`) logError(`getFullJobContext error job_id=${jobId} ${(err as Error).message}`)
log(`rollback claim job_id=${jobId} reason=context_fetch_failed`) log(`rollback claim job_id=${jobId} reason=context_fetch_failed`)
await rollbackClaim(jobId)
return 1
}
if (!ctx) {
logError(`getFullJobContext returned null for job_id=${jobId}`)
await rollbackClaim(jobId)
return 1
}
// 4. Attach worktree for TASK_IMPLEMENTATION; sprint/idea-jobs hebben hun
// eigen worktree-pad al ingevuld door getFullJobContext.
// We werken hier met `any` omdat de return-type van getFullJobContext een
// discriminated union is en TypeScript hier zonder kind-narrow geen velden
// exposed; de runtime checks dekken alle paden af.
const ctxAny = ctx as any
let worktreePath: string | null =
ctxAny.worktree_path ?? ctxAny.primary_worktree_path ?? null
if (ctx.kind === 'TASK_IMPLEMENTATION') {
if (!ctxAny.story || !ctxAny.task) {
logError(`TASK_IMPLEMENTATION job has incomplete story/task context`)
await rollbackClaim(jobId) await rollbackClaim(jobId)
await releaseLocksOnTerminal(jobId)
return 1 return 1
} }
const wt = await attachWorktreeToJob( if (!ctx) {
ctxAny.product.id, logError(`getFullJobContext returned null for job_id=${jobId}`)
jobId,
ctxAny.story.id,
ctxAny.task.repo_url,
)
if ('error' in wt) {
logError(`attachWorktreeToJob error job_id=${jobId} ${wt.error}`)
log(`rollback claim job_id=${jobId} reason=worktree_attach_failed`)
await rollbackClaim(jobId) await rollbackClaim(jobId)
await releaseLocksOnTerminal(jobId)
return 1 return 1
} }
worktreePath = wt.worktree_path
ctxAny.worktree_path = wt.worktree_path
ctxAny.branch_name = wt.branch_name
log(`worktree path=${wt.worktree_path} branch=${wt.branch_name}`)
} else if (worktreePath) {
log(`worktree path=${worktreePath} (pre-resolved)`)
}
// 5. Resolved config — log voor audit. // 4. Attach worktree for TASK_IMPLEMENTATION; sprint/idea-jobs hebben hun
const cfg = ctx.config // eigen worktree-pad al ingevuld door getFullJobContext.
const effort = mapBudgetToEffort(cfg.thinking_budget) // We werken hier met `any` omdat de return-type van getFullJobContext een
log( // discriminated union is en TypeScript hier zonder kind-narrow geen velden
`config job_id=${jobId} model=${cfg.model} ` + // exposed; de runtime checks dekken alle paden af.
`permission_mode=${cfg.permission_mode} ` + const ctxAny = ctx as any
`thinking_budget=${cfg.thinking_budget} effort=${effort ?? '-'} ` + let worktreePath: string | null =
`max_turns=${cfg.max_turns ?? 'null'} ` + ctxAny.worktree_path ?? ctxAny.primary_worktree_path ?? null
`allowed_tools_count=${cfg.allowed_tools?.length ?? 0}`,
)
// 6. Write payload to /tmp/job-<id>/payload.json. if (ctx.kind === 'TASK_IMPLEMENTATION') {
const payloadDir = `/tmp/job-${jobId}` if (!ctxAny.story || !ctxAny.task) {
const payloadPath = `${payloadDir}/payload.json` logError(`TASK_IMPLEMENTATION job has incomplete story/task context`)
mkdirSync(payloadDir, { recursive: true }) await rollbackClaim(jobId)
const payloadJson = JSON.stringify(ctx, null, 2) await releaseLocksOnTerminal(jobId)
writeFileSync(payloadPath, payloadJson, 'utf8') return 1
log( }
`payload written path=${payloadPath} size_bytes=${Buffer.byteLength(payloadJson)}`, const wt = await attachWorktreeToJob(
) ctxAny.product.id,
jobId,
// 7. Build CLI args. ctxAny.story.id,
const promptText = getKindPromptText(ctx.kind).replace('$PAYLOAD_PATH', payloadPath) ctxAny.task.repo_url,
const args: string[] = [
'-p',
promptText,
'--model',
cfg.model,
'--permission-mode',
cfg.permission_mode,
'--allowedTools',
(cfg.allowed_tools ?? []).join(','),
'--mcp-config',
MCP_CONFIG,
'--add-dir',
'/opt/agent',
'--output-format',
'text',
]
if (effort) args.push('--effort', effort)
const cwd = worktreePath ?? '/opt/agent'
// Log args zonder de volledige prompt-tekst (kan kilo's groot zijn).
const argsForLog = args
.map((a, i) => (i === 1 ? `<prompt-${promptText.length}-chars>` : a))
.join(' ')
log(`spawn claude job_id=${jobId} cwd=${cwd} args="${argsForLog}"`)
// 8. Lease-renewal heartbeat for SPRINT_IMPLEMENTATION.
let heartbeatTimer: NodeJS.Timeout | null = null
if (ctx.kind === 'SPRINT_IMPLEMENTATION') {
heartbeatTimer = setInterval(() => {
prisma
.$executeRaw`UPDATE claude_jobs SET lease_until = NOW() + INTERVAL '5 minutes' WHERE id = ${jobId}`
.then(() => {
log(
`heartbeat tick job_id=${jobId} lease_until=${new Date(
Date.now() + 5 * 60_000,
).toISOString()}`,
)
})
.catch((err: Error) => {
logError(`heartbeat error: ${err.message}`)
})
}, HEARTBEAT_INTERVAL_MS)
}
// 9. Spawn Claude.
const start = Date.now()
let exitCode: number | null = null
let stdoutBuf = ''
try {
await new Promise<void>((resolve, reject) => {
const child = spawn('claude', args, { cwd })
child.stdout.on('data', (chunk) => {
const s = chunk.toString()
process.stdout.write(s)
stdoutBuf += s
})
child.stderr.on('data', (chunk) => {
const s = chunk.toString()
process.stderr.write(s)
stdoutBuf += s
})
child.on('error', (err) => reject(err))
child.on('close', (code) => {
exitCode = code
resolve()
})
})
} catch (err) {
logError(`spawn error: ${(err as Error).message}`)
if (heartbeatTimer) clearInterval(heartbeatTimer)
await rollbackClaim(jobId).catch(() => {})
await releaseLocksOnTerminal(jobId).catch(() => {})
return 1
} finally {
if (heartbeatTimer) clearInterval(heartbeatTimer)
}
const durationMs = Date.now() - start
log(
`claude done job_id=${jobId} exit_code=${exitCode ?? 'null'} ` +
`duration_ms=${durationMs} wall_clock_seconds=${Math.round(durationMs / 1000)}`,
)
// 10. Token-expiry detection.
let tokenExpired = false
for (const pat of TOKEN_EXPIRY_PATTERNS) {
if (pat.test(stdoutBuf)) {
tokenExpired = true
log(`TOKEN_EXPIRED detected pattern="${pat.source}" exiting code=3`)
break
}
}
// 11. Rollback claim if Claude exited non-zero without updating job-status.
// PBI-50 lease-driven recovery vangt resterende stale jobs op na 5 min.
if (exitCode !== 0 && !tokenExpired) {
const jobNow = await prisma.claudeJob.findUnique({
where: { id: jobId },
select: { status: true },
})
if (jobNow?.status === 'CLAIMED' || jobNow?.status === 'RUNNING') {
log(
`rollback claim job_id=${jobId} reason=claude_exit_${exitCode}_without_status_update`,
) )
if ('error' in wt) {
logError(`attachWorktreeToJob error job_id=${jobId} ${wt.error}`)
log(`rollback claim job_id=${jobId} reason=worktree_attach_failed`)
await rollbackClaim(jobId)
await releaseLocksOnTerminal(jobId)
return 1
}
worktreePath = wt.worktree_path
ctxAny.worktree_path = wt.worktree_path
ctxAny.branch_name = wt.branch_name
log(`worktree path=${wt.worktree_path} branch=${wt.branch_name}`)
} else if (worktreePath) {
log(`worktree path=${worktreePath} (pre-resolved)`)
}
// 5. Resolved config — log voor audit.
const cfg = ctx.config
const effort = mapBudgetToEffort(cfg.thinking_budget)
log(
`config job_id=${jobId} model=${cfg.model} ` +
`permission_mode=${cfg.permission_mode} ` +
`thinking_budget=${cfg.thinking_budget} effort=${effort ?? '-'} ` +
`max_turns=${cfg.max_turns ?? 'null'} ` +
`allowed_tools_count=${cfg.allowed_tools?.length ?? 0}`,
)
// 6. Write payload to /tmp/job-<id>/payload.json.
const payloadDir = `/tmp/job-${jobId}`
const payloadPath = `${payloadDir}/payload.json`
mkdirSync(payloadDir, { recursive: true })
const payloadJson = JSON.stringify(ctx, null, 2)
writeFileSync(payloadPath, payloadJson, 'utf8')
log(
`payload written path=${payloadPath} size_bytes=${Buffer.byteLength(payloadJson)}`,
)
// 7. Build CLI args.
const promptText = getKindPromptText(ctx.kind).replace('$PAYLOAD_PATH', payloadPath)
const args: string[] = [
'-p',
promptText,
'--model',
cfg.model,
'--permission-mode',
cfg.permission_mode,
'--allowedTools',
(cfg.allowed_tools ?? []).join(','),
'--mcp-config',
MCP_CONFIG,
'--add-dir',
'/opt/agent',
'--output-format',
'text',
]
if (effort) args.push('--effort', effort)
const cwd = worktreePath ?? '/opt/agent'
// Log args zonder de volledige prompt-tekst (kan kilo's groot zijn).
const argsForLog = args
.map((a, i) => (i === 1 ? `<prompt-${promptText.length}-chars>` : a))
.join(' ')
log(`spawn claude job_id=${jobId} cwd=${cwd} args="${argsForLog}"`)
// 8. Lease-renewal heartbeat for SPRINT_IMPLEMENTATION.
let heartbeatTimer: NodeJS.Timeout | null = null
if (ctx.kind === 'SPRINT_IMPLEMENTATION') {
heartbeatTimer = setInterval(() => {
prisma
.$executeRaw`UPDATE claude_jobs SET lease_until = NOW() + INTERVAL '5 minutes' WHERE id = ${jobId}`
.then(() => {
log(
`heartbeat tick job_id=${jobId} lease_until=${new Date(
Date.now() + 5 * 60_000,
).toISOString()}`,
)
})
.catch((err: Error) => {
logError(`heartbeat error: ${err.message}`)
})
}, HEARTBEAT_INTERVAL_MS)
}
// 9. Spawn Claude.
const start = Date.now()
let exitCode: number | null = null
let stdoutBuf = ''
try {
await new Promise<void>((resolve, reject) => {
const child = spawn('claude', args, { cwd })
child.stdout.on('data', (chunk) => {
const s = chunk.toString()
process.stdout.write(s)
stdoutBuf += s
})
child.stderr.on('data', (chunk) => {
const s = chunk.toString()
process.stderr.write(s)
stdoutBuf += s
})
child.on('error', (err) => reject(err))
child.on('close', (code) => {
exitCode = code
resolve()
})
})
} catch (err) {
logError(`spawn error: ${(err as Error).message}`)
if (heartbeatTimer) clearInterval(heartbeatTimer)
await rollbackClaim(jobId).catch(() => {}) await rollbackClaim(jobId).catch(() => {})
await releaseLocksOnTerminal(jobId).catch(() => {}) await releaseLocksOnTerminal(jobId).catch(() => {})
return 1
} finally {
if (heartbeatTimer) clearInterval(heartbeatTimer)
} }
}
// 12. Cleanup payload directory. const durationMs = Date.now() - start
try { log(
rmSync(payloadDir, { recursive: true, force: true }) `claude done job_id=${jobId} exit_code=${exitCode ?? 'null'} ` +
} catch { `duration_ms=${durationMs} wall_clock_seconds=${Math.round(durationMs / 1000)}`,
// non-fatal )
}
log(`cleanup payload_removed=true heartbeat_stopped=${heartbeatTimer !== null}`)
if (tokenExpired) return 3 // 10. Token-expiry detection.
return exitCode ?? 1 let tokenExpired = false
for (const pat of TOKEN_EXPIRY_PATTERNS) {
if (pat.test(stdoutBuf)) {
tokenExpired = true
log(`TOKEN_EXPIRED detected pattern="${pat.source}" exiting code=3`)
break
}
}
// 11. Rollback claim if Claude exited non-zero without updating job-status.
// PBI-50 lease-driven recovery vangt resterende stale jobs op na 5 min.
if (exitCode !== 0 && !tokenExpired) {
const jobNow = await prisma.claudeJob.findUnique({
where: { id: jobId },
select: { status: true },
})
if (jobNow?.status === 'CLAIMED' || jobNow?.status === 'RUNNING') {
log(
`rollback claim job_id=${jobId} reason=claude_exit_${exitCode}_without_status_update`,
)
await rollbackClaim(jobId).catch(() => {})
await releaseLocksOnTerminal(jobId).catch(() => {})
}
}
// 12. Cleanup payload directory.
try {
rmSync(payloadDir, { recursive: true, force: true })
} catch {
// non-fatal
}
log(`cleanup payload_removed=true heartbeat_stopped=${heartbeatTimer !== null}`)
if (tokenExpired) return 3
return exitCode ?? 1
} finally {
workerHeartbeat.stop()
}
} }
// ----- entry ---------------------------------------------------------- // ----- entry ----------------------------------------------------------