fix(runner): registreer worker-presence + 10s heartbeat in run-one-job
Tot nu toe schreef de NAS-runner nooit naar `claude_workers`, waardoor de UI de worker als offline toonde ondanks gezonde container-health. Direct na `getAuth()` doen we nu een UPSERT via `registerWorker` en starten we een 10s heartbeat die `last_seen_at` vers houdt tijdens quota-backoff, LISTEN-wait, claude-spawn en cleanup. De heartbeat stopt via try/finally op elk exit-pad. Bewust geen `unregisterWorker`: tussen iteraties zou dat UI-flicker geven, en abnormale exits worden door de UI's eigen 60s-prune opgevangen. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
38c0e5f103
commit
e8c4518abb
1 changed files with 221 additions and 198 deletions
|
|
@ -38,6 +38,8 @@ import {
|
|||
import { releaseLocksOnTerminal } from '/opt/scrum4me-mcp/src/git/job-locks.js'
|
||||
import { mapBudgetToEffort } from '/opt/scrum4me-mcp/src/lib/job-config.js'
|
||||
import { getKindPromptText } from '/opt/scrum4me-mcp/src/lib/kind-prompts.js'
|
||||
import { registerWorker } from '/opt/scrum4me-mcp/src/presence/worker.js'
|
||||
import { startHeartbeat } from '/opt/scrum4me-mcp/src/presence/heartbeat.js'
|
||||
|
||||
// ----- logging --------------------------------------------------------
|
||||
const log = (msg: string) =>
|
||||
|
|
@ -49,6 +51,7 @@ const logError = (msg: string) =>
|
|||
const WAIT_DEADLINE_SECONDS = 270 // ruim binnen MAX_WAIT_SECONDS van wait_for_job
|
||||
const POLL_INTERVAL_MS = 5000
|
||||
const HEARTBEAT_INTERVAL_MS = 60_000
|
||||
const WORKER_HEARTBEAT_INTERVAL_MS = 10_000
|
||||
const MCP_CONFIG = '/opt/agent/mcp-config.json'
|
||||
const QUOTA_PROBE_PATH = '/opt/agent/bin/worker-quota-probe.sh'
|
||||
const QUOTA_BACKOFF_CAP_MS = 30 * 60 * 1000
|
||||
|
|
@ -152,225 +155,245 @@ async function main(): Promise<number> {
|
|||
const { userId, tokenId } = await getAuth()
|
||||
log(`auth ok user_id=${userId} token_id=${tokenId}`)
|
||||
|
||||
// 1. Quota probe (gate vóór elke claim).
|
||||
// Worker presence — UI leest claude_workers.last_seen_at.
|
||||
// UPSERT zodat de rij tussen iteraties blijft bestaan (geen flicker),
|
||||
// en heartbeat houdt last_seen_at vers tijdens quota-backoff,
|
||||
// LISTEN-wait, claude-spawn, en cleanup. Niet unregisteren bij exit:
|
||||
// de UI prunet zelf rijen ouder dan 60s.
|
||||
try {
|
||||
await quotaProbe(userId)
|
||||
await registerWorker({ userId, tokenId })
|
||||
} catch (err) {
|
||||
logError(`quota probe error: ${(err as Error).message}`)
|
||||
return 1
|
||||
logError(`registerWorker failed (non-fatal): ${(err as Error).message}`)
|
||||
}
|
||||
const workerHeartbeat = startHeartbeat({
|
||||
userId,
|
||||
tokenId,
|
||||
intervalMs: WORKER_HEARTBEAT_INTERVAL_MS,
|
||||
})
|
||||
|
||||
// 2. Reset stale claims, then attempt to claim.
|
||||
await resetStaleClaimedJobs(userId)
|
||||
let jobId = await tryClaimJob(userId, tokenId)
|
||||
if (!jobId) {
|
||||
log(`no job claimed — LISTEN scrum4me_changes deadline=${WAIT_DEADLINE_SECONDS}s`)
|
||||
await waitForEnqueue(userId)
|
||||
try {
|
||||
// 1. Quota probe (gate vóór elke claim).
|
||||
try {
|
||||
await quotaProbe(userId)
|
||||
} catch (err) {
|
||||
logError(`quota probe error: ${(err as Error).message}`)
|
||||
return 1
|
||||
}
|
||||
|
||||
// 2. Reset stale claims, then attempt to claim.
|
||||
await resetStaleClaimedJobs(userId)
|
||||
jobId = await tryClaimJob(userId, tokenId)
|
||||
}
|
||||
if (!jobId) {
|
||||
log(`claim timeout after ${WAIT_DEADLINE_SECONDS}s — exiting 0`)
|
||||
return 0
|
||||
}
|
||||
let jobId = await tryClaimJob(userId, tokenId)
|
||||
if (!jobId) {
|
||||
log(`no job claimed — LISTEN scrum4me_changes deadline=${WAIT_DEADLINE_SECONDS}s`)
|
||||
await waitForEnqueue(userId)
|
||||
await resetStaleClaimedJobs(userId)
|
||||
jobId = await tryClaimJob(userId, tokenId)
|
||||
}
|
||||
if (!jobId) {
|
||||
log(`claim timeout after ${WAIT_DEADLINE_SECONDS}s — exiting 0`)
|
||||
return 0
|
||||
}
|
||||
|
||||
log(`claimed job_id=${jobId}`)
|
||||
log(`claimed job_id=${jobId}`)
|
||||
|
||||
// 3. Resolve full context.
|
||||
let ctx: Awaited<ReturnType<typeof getFullJobContext>> = null
|
||||
try {
|
||||
ctx = await getFullJobContext(jobId)
|
||||
} catch (err) {
|
||||
logError(`getFullJobContext error job_id=${jobId} ${(err as Error).message}`)
|
||||
log(`rollback claim job_id=${jobId} reason=context_fetch_failed`)
|
||||
await rollbackClaim(jobId)
|
||||
return 1
|
||||
}
|
||||
if (!ctx) {
|
||||
logError(`getFullJobContext returned null for job_id=${jobId}`)
|
||||
await rollbackClaim(jobId)
|
||||
return 1
|
||||
}
|
||||
|
||||
// 4. Attach worktree for TASK_IMPLEMENTATION; sprint/idea-jobs hebben hun
|
||||
// eigen worktree-pad al ingevuld door getFullJobContext.
|
||||
// We werken hier met `any` omdat de return-type van getFullJobContext een
|
||||
// discriminated union is en TypeScript hier zonder kind-narrow geen velden
|
||||
// exposed; de runtime checks dekken alle paden af.
|
||||
const ctxAny = ctx as any
|
||||
let worktreePath: string | null =
|
||||
ctxAny.worktree_path ?? ctxAny.primary_worktree_path ?? null
|
||||
|
||||
if (ctx.kind === 'TASK_IMPLEMENTATION') {
|
||||
if (!ctxAny.story || !ctxAny.task) {
|
||||
logError(`TASK_IMPLEMENTATION job has incomplete story/task context`)
|
||||
// 3. Resolve full context.
|
||||
let ctx: Awaited<ReturnType<typeof getFullJobContext>> = null
|
||||
try {
|
||||
ctx = await getFullJobContext(jobId)
|
||||
} catch (err) {
|
||||
logError(`getFullJobContext error job_id=${jobId} ${(err as Error).message}`)
|
||||
log(`rollback claim job_id=${jobId} reason=context_fetch_failed`)
|
||||
await rollbackClaim(jobId)
|
||||
await releaseLocksOnTerminal(jobId)
|
||||
return 1
|
||||
}
|
||||
const wt = await attachWorktreeToJob(
|
||||
ctxAny.product.id,
|
||||
jobId,
|
||||
ctxAny.story.id,
|
||||
ctxAny.task.repo_url,
|
||||
)
|
||||
if ('error' in wt) {
|
||||
logError(`attachWorktreeToJob error job_id=${jobId} ${wt.error}`)
|
||||
log(`rollback claim job_id=${jobId} reason=worktree_attach_failed`)
|
||||
if (!ctx) {
|
||||
logError(`getFullJobContext returned null for job_id=${jobId}`)
|
||||
await rollbackClaim(jobId)
|
||||
await releaseLocksOnTerminal(jobId)
|
||||
return 1
|
||||
}
|
||||
worktreePath = wt.worktree_path
|
||||
ctxAny.worktree_path = wt.worktree_path
|
||||
ctxAny.branch_name = wt.branch_name
|
||||
log(`worktree path=${wt.worktree_path} branch=${wt.branch_name}`)
|
||||
} else if (worktreePath) {
|
||||
log(`worktree path=${worktreePath} (pre-resolved)`)
|
||||
}
|
||||
|
||||
// 5. Resolved config — log voor audit.
|
||||
const cfg = ctx.config
|
||||
const effort = mapBudgetToEffort(cfg.thinking_budget)
|
||||
log(
|
||||
`config job_id=${jobId} model=${cfg.model} ` +
|
||||
`permission_mode=${cfg.permission_mode} ` +
|
||||
`thinking_budget=${cfg.thinking_budget} effort=${effort ?? '-'} ` +
|
||||
`max_turns=${cfg.max_turns ?? 'null'} ` +
|
||||
`allowed_tools_count=${cfg.allowed_tools?.length ?? 0}`,
|
||||
)
|
||||
// 4. Attach worktree for TASK_IMPLEMENTATION; sprint/idea-jobs hebben hun
|
||||
// eigen worktree-pad al ingevuld door getFullJobContext.
|
||||
// We werken hier met `any` omdat de return-type van getFullJobContext een
|
||||
// discriminated union is en TypeScript hier zonder kind-narrow geen velden
|
||||
// exposed; de runtime checks dekken alle paden af.
|
||||
const ctxAny = ctx as any
|
||||
let worktreePath: string | null =
|
||||
ctxAny.worktree_path ?? ctxAny.primary_worktree_path ?? null
|
||||
|
||||
// 6. Write payload to /tmp/job-<id>/payload.json.
|
||||
const payloadDir = `/tmp/job-${jobId}`
|
||||
const payloadPath = `${payloadDir}/payload.json`
|
||||
mkdirSync(payloadDir, { recursive: true })
|
||||
const payloadJson = JSON.stringify(ctx, null, 2)
|
||||
writeFileSync(payloadPath, payloadJson, 'utf8')
|
||||
log(
|
||||
`payload written path=${payloadPath} size_bytes=${Buffer.byteLength(payloadJson)}`,
|
||||
)
|
||||
|
||||
// 7. Build CLI args.
|
||||
const promptText = getKindPromptText(ctx.kind).replace('$PAYLOAD_PATH', payloadPath)
|
||||
const args: string[] = [
|
||||
'-p',
|
||||
promptText,
|
||||
'--model',
|
||||
cfg.model,
|
||||
'--permission-mode',
|
||||
cfg.permission_mode,
|
||||
'--allowedTools',
|
||||
(cfg.allowed_tools ?? []).join(','),
|
||||
'--mcp-config',
|
||||
MCP_CONFIG,
|
||||
'--add-dir',
|
||||
'/opt/agent',
|
||||
'--output-format',
|
||||
'text',
|
||||
]
|
||||
if (effort) args.push('--effort', effort)
|
||||
|
||||
const cwd = worktreePath ?? '/opt/agent'
|
||||
// Log args zonder de volledige prompt-tekst (kan kilo's groot zijn).
|
||||
const argsForLog = args
|
||||
.map((a, i) => (i === 1 ? `<prompt-${promptText.length}-chars>` : a))
|
||||
.join(' ')
|
||||
log(`spawn claude job_id=${jobId} cwd=${cwd} args="${argsForLog}"`)
|
||||
|
||||
// 8. Lease-renewal heartbeat for SPRINT_IMPLEMENTATION.
|
||||
let heartbeatTimer: NodeJS.Timeout | null = null
|
||||
if (ctx.kind === 'SPRINT_IMPLEMENTATION') {
|
||||
heartbeatTimer = setInterval(() => {
|
||||
prisma
|
||||
.$executeRaw`UPDATE claude_jobs SET lease_until = NOW() + INTERVAL '5 minutes' WHERE id = ${jobId}`
|
||||
.then(() => {
|
||||
log(
|
||||
`heartbeat tick job_id=${jobId} lease_until=${new Date(
|
||||
Date.now() + 5 * 60_000,
|
||||
).toISOString()}`,
|
||||
)
|
||||
})
|
||||
.catch((err: Error) => {
|
||||
logError(`heartbeat error: ${err.message}`)
|
||||
})
|
||||
}, HEARTBEAT_INTERVAL_MS)
|
||||
}
|
||||
|
||||
// 9. Spawn Claude.
|
||||
const start = Date.now()
|
||||
let exitCode: number | null = null
|
||||
let stdoutBuf = ''
|
||||
try {
|
||||
await new Promise<void>((resolve, reject) => {
|
||||
const child = spawn('claude', args, { cwd })
|
||||
child.stdout.on('data', (chunk) => {
|
||||
const s = chunk.toString()
|
||||
process.stdout.write(s)
|
||||
stdoutBuf += s
|
||||
})
|
||||
child.stderr.on('data', (chunk) => {
|
||||
const s = chunk.toString()
|
||||
process.stderr.write(s)
|
||||
stdoutBuf += s
|
||||
})
|
||||
child.on('error', (err) => reject(err))
|
||||
child.on('close', (code) => {
|
||||
exitCode = code
|
||||
resolve()
|
||||
})
|
||||
})
|
||||
} catch (err) {
|
||||
logError(`spawn error: ${(err as Error).message}`)
|
||||
if (heartbeatTimer) clearInterval(heartbeatTimer)
|
||||
await rollbackClaim(jobId).catch(() => {})
|
||||
await releaseLocksOnTerminal(jobId).catch(() => {})
|
||||
return 1
|
||||
} finally {
|
||||
if (heartbeatTimer) clearInterval(heartbeatTimer)
|
||||
}
|
||||
|
||||
const durationMs = Date.now() - start
|
||||
log(
|
||||
`claude done job_id=${jobId} exit_code=${exitCode ?? 'null'} ` +
|
||||
`duration_ms=${durationMs} wall_clock_seconds=${Math.round(durationMs / 1000)}`,
|
||||
)
|
||||
|
||||
// 10. Token-expiry detection.
|
||||
let tokenExpired = false
|
||||
for (const pat of TOKEN_EXPIRY_PATTERNS) {
|
||||
if (pat.test(stdoutBuf)) {
|
||||
tokenExpired = true
|
||||
log(`TOKEN_EXPIRED detected pattern="${pat.source}" exiting code=3`)
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
// 11. Rollback claim if Claude exited non-zero without updating job-status.
|
||||
// PBI-50 lease-driven recovery vangt resterende stale jobs op na 5 min.
|
||||
if (exitCode !== 0 && !tokenExpired) {
|
||||
const jobNow = await prisma.claudeJob.findUnique({
|
||||
where: { id: jobId },
|
||||
select: { status: true },
|
||||
})
|
||||
if (jobNow?.status === 'CLAIMED' || jobNow?.status === 'RUNNING') {
|
||||
log(
|
||||
`rollback claim job_id=${jobId} reason=claude_exit_${exitCode}_without_status_update`,
|
||||
if (ctx.kind === 'TASK_IMPLEMENTATION') {
|
||||
if (!ctxAny.story || !ctxAny.task) {
|
||||
logError(`TASK_IMPLEMENTATION job has incomplete story/task context`)
|
||||
await rollbackClaim(jobId)
|
||||
await releaseLocksOnTerminal(jobId)
|
||||
return 1
|
||||
}
|
||||
const wt = await attachWorktreeToJob(
|
||||
ctxAny.product.id,
|
||||
jobId,
|
||||
ctxAny.story.id,
|
||||
ctxAny.task.repo_url,
|
||||
)
|
||||
if ('error' in wt) {
|
||||
logError(`attachWorktreeToJob error job_id=${jobId} ${wt.error}`)
|
||||
log(`rollback claim job_id=${jobId} reason=worktree_attach_failed`)
|
||||
await rollbackClaim(jobId)
|
||||
await releaseLocksOnTerminal(jobId)
|
||||
return 1
|
||||
}
|
||||
worktreePath = wt.worktree_path
|
||||
ctxAny.worktree_path = wt.worktree_path
|
||||
ctxAny.branch_name = wt.branch_name
|
||||
log(`worktree path=${wt.worktree_path} branch=${wt.branch_name}`)
|
||||
} else if (worktreePath) {
|
||||
log(`worktree path=${worktreePath} (pre-resolved)`)
|
||||
}
|
||||
|
||||
// 5. Resolved config — log voor audit.
|
||||
const cfg = ctx.config
|
||||
const effort = mapBudgetToEffort(cfg.thinking_budget)
|
||||
log(
|
||||
`config job_id=${jobId} model=${cfg.model} ` +
|
||||
`permission_mode=${cfg.permission_mode} ` +
|
||||
`thinking_budget=${cfg.thinking_budget} effort=${effort ?? '-'} ` +
|
||||
`max_turns=${cfg.max_turns ?? 'null'} ` +
|
||||
`allowed_tools_count=${cfg.allowed_tools?.length ?? 0}`,
|
||||
)
|
||||
|
||||
// 6. Write payload to /tmp/job-<id>/payload.json.
|
||||
const payloadDir = `/tmp/job-${jobId}`
|
||||
const payloadPath = `${payloadDir}/payload.json`
|
||||
mkdirSync(payloadDir, { recursive: true })
|
||||
const payloadJson = JSON.stringify(ctx, null, 2)
|
||||
writeFileSync(payloadPath, payloadJson, 'utf8')
|
||||
log(
|
||||
`payload written path=${payloadPath} size_bytes=${Buffer.byteLength(payloadJson)}`,
|
||||
)
|
||||
|
||||
// 7. Build CLI args.
|
||||
const promptText = getKindPromptText(ctx.kind).replace('$PAYLOAD_PATH', payloadPath)
|
||||
const args: string[] = [
|
||||
'-p',
|
||||
promptText,
|
||||
'--model',
|
||||
cfg.model,
|
||||
'--permission-mode',
|
||||
cfg.permission_mode,
|
||||
'--allowedTools',
|
||||
(cfg.allowed_tools ?? []).join(','),
|
||||
'--mcp-config',
|
||||
MCP_CONFIG,
|
||||
'--add-dir',
|
||||
'/opt/agent',
|
||||
'--output-format',
|
||||
'text',
|
||||
]
|
||||
if (effort) args.push('--effort', effort)
|
||||
|
||||
const cwd = worktreePath ?? '/opt/agent'
|
||||
// Log args zonder de volledige prompt-tekst (kan kilo's groot zijn).
|
||||
const argsForLog = args
|
||||
.map((a, i) => (i === 1 ? `<prompt-${promptText.length}-chars>` : a))
|
||||
.join(' ')
|
||||
log(`spawn claude job_id=${jobId} cwd=${cwd} args="${argsForLog}"`)
|
||||
|
||||
// 8. Lease-renewal heartbeat for SPRINT_IMPLEMENTATION.
|
||||
let heartbeatTimer: NodeJS.Timeout | null = null
|
||||
if (ctx.kind === 'SPRINT_IMPLEMENTATION') {
|
||||
heartbeatTimer = setInterval(() => {
|
||||
prisma
|
||||
.$executeRaw`UPDATE claude_jobs SET lease_until = NOW() + INTERVAL '5 minutes' WHERE id = ${jobId}`
|
||||
.then(() => {
|
||||
log(
|
||||
`heartbeat tick job_id=${jobId} lease_until=${new Date(
|
||||
Date.now() + 5 * 60_000,
|
||||
).toISOString()}`,
|
||||
)
|
||||
})
|
||||
.catch((err: Error) => {
|
||||
logError(`heartbeat error: ${err.message}`)
|
||||
})
|
||||
}, HEARTBEAT_INTERVAL_MS)
|
||||
}
|
||||
|
||||
// 9. Spawn Claude.
|
||||
const start = Date.now()
|
||||
let exitCode: number | null = null
|
||||
let stdoutBuf = ''
|
||||
try {
|
||||
await new Promise<void>((resolve, reject) => {
|
||||
const child = spawn('claude', args, { cwd })
|
||||
child.stdout.on('data', (chunk) => {
|
||||
const s = chunk.toString()
|
||||
process.stdout.write(s)
|
||||
stdoutBuf += s
|
||||
})
|
||||
child.stderr.on('data', (chunk) => {
|
||||
const s = chunk.toString()
|
||||
process.stderr.write(s)
|
||||
stdoutBuf += s
|
||||
})
|
||||
child.on('error', (err) => reject(err))
|
||||
child.on('close', (code) => {
|
||||
exitCode = code
|
||||
resolve()
|
||||
})
|
||||
})
|
||||
} catch (err) {
|
||||
logError(`spawn error: ${(err as Error).message}`)
|
||||
if (heartbeatTimer) clearInterval(heartbeatTimer)
|
||||
await rollbackClaim(jobId).catch(() => {})
|
||||
await releaseLocksOnTerminal(jobId).catch(() => {})
|
||||
return 1
|
||||
} finally {
|
||||
if (heartbeatTimer) clearInterval(heartbeatTimer)
|
||||
}
|
||||
}
|
||||
|
||||
// 12. Cleanup payload directory.
|
||||
try {
|
||||
rmSync(payloadDir, { recursive: true, force: true })
|
||||
} catch {
|
||||
// non-fatal
|
||||
}
|
||||
log(`cleanup payload_removed=true heartbeat_stopped=${heartbeatTimer !== null}`)
|
||||
const durationMs = Date.now() - start
|
||||
log(
|
||||
`claude done job_id=${jobId} exit_code=${exitCode ?? 'null'} ` +
|
||||
`duration_ms=${durationMs} wall_clock_seconds=${Math.round(durationMs / 1000)}`,
|
||||
)
|
||||
|
||||
if (tokenExpired) return 3
|
||||
return exitCode ?? 1
|
||||
// 10. Token-expiry detection.
|
||||
let tokenExpired = false
|
||||
for (const pat of TOKEN_EXPIRY_PATTERNS) {
|
||||
if (pat.test(stdoutBuf)) {
|
||||
tokenExpired = true
|
||||
log(`TOKEN_EXPIRED detected pattern="${pat.source}" exiting code=3`)
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
// 11. Rollback claim if Claude exited non-zero without updating job-status.
|
||||
// PBI-50 lease-driven recovery vangt resterende stale jobs op na 5 min.
|
||||
if (exitCode !== 0 && !tokenExpired) {
|
||||
const jobNow = await prisma.claudeJob.findUnique({
|
||||
where: { id: jobId },
|
||||
select: { status: true },
|
||||
})
|
||||
if (jobNow?.status === 'CLAIMED' || jobNow?.status === 'RUNNING') {
|
||||
log(
|
||||
`rollback claim job_id=${jobId} reason=claude_exit_${exitCode}_without_status_update`,
|
||||
)
|
||||
await rollbackClaim(jobId).catch(() => {})
|
||||
await releaseLocksOnTerminal(jobId).catch(() => {})
|
||||
}
|
||||
}
|
||||
|
||||
// 12. Cleanup payload directory.
|
||||
try {
|
||||
rmSync(payloadDir, { recursive: true, force: true })
|
||||
} catch {
|
||||
// non-fatal
|
||||
}
|
||||
log(`cleanup payload_removed=true heartbeat_stopped=${heartbeatTimer !== null}`)
|
||||
|
||||
if (tokenExpired) return 3
|
||||
return exitCode ?? 1
|
||||
} finally {
|
||||
workerHeartbeat.stop()
|
||||
}
|
||||
}
|
||||
|
||||
// ----- entry ----------------------------------------------------------
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue