diff --git a/bin/run-one-job.ts b/bin/run-one-job.ts index 0b3580b..3da7cf9 100644 --- a/bin/run-one-job.ts +++ b/bin/run-one-job.ts @@ -38,6 +38,8 @@ import { import { releaseLocksOnTerminal } from '/opt/scrum4me-mcp/src/git/job-locks.js' import { mapBudgetToEffort } from '/opt/scrum4me-mcp/src/lib/job-config.js' import { getKindPromptText } from '/opt/scrum4me-mcp/src/lib/kind-prompts.js' +import { registerWorker } from '/opt/scrum4me-mcp/src/presence/worker.js' +import { startHeartbeat } from '/opt/scrum4me-mcp/src/presence/heartbeat.js' // ----- logging -------------------------------------------------------- const log = (msg: string) => @@ -49,6 +51,7 @@ const logError = (msg: string) => const WAIT_DEADLINE_SECONDS = 270 // ruim binnen MAX_WAIT_SECONDS van wait_for_job const POLL_INTERVAL_MS = 5000 const HEARTBEAT_INTERVAL_MS = 60_000 +const WORKER_HEARTBEAT_INTERVAL_MS = 10_000 const MCP_CONFIG = '/opt/agent/mcp-config.json' const QUOTA_PROBE_PATH = '/opt/agent/bin/worker-quota-probe.sh' const QUOTA_BACKOFF_CAP_MS = 30 * 60 * 1000 @@ -152,225 +155,245 @@ async function main(): Promise { const { userId, tokenId } = await getAuth() log(`auth ok user_id=${userId} token_id=${tokenId}`) - // 1. Quota probe (gate vóór elke claim). + // Worker presence — UI leest claude_workers.last_seen_at. + // UPSERT zodat de rij tussen iteraties blijft bestaan (geen flicker), + // en heartbeat houdt last_seen_at vers tijdens quota-backoff, + // LISTEN-wait, claude-spawn, en cleanup. Niet unregisteren bij exit: + // de UI prunet zelf rijen ouder dan 60s. try { - await quotaProbe(userId) + await registerWorker({ userId, tokenId }) } catch (err) { - logError(`quota probe error: ${(err as Error).message}`) - return 1 + logError(`registerWorker failed (non-fatal): ${(err as Error).message}`) } + const workerHeartbeat = startHeartbeat({ + userId, + tokenId, + intervalMs: WORKER_HEARTBEAT_INTERVAL_MS, + }) - // 2. Reset stale claims, then attempt to claim. - await resetStaleClaimedJobs(userId) - let jobId = await tryClaimJob(userId, tokenId) - if (!jobId) { - log(`no job claimed — LISTEN scrum4me_changes deadline=${WAIT_DEADLINE_SECONDS}s`) - await waitForEnqueue(userId) + try { + // 1. Quota probe (gate vóór elke claim). + try { + await quotaProbe(userId) + } catch (err) { + logError(`quota probe error: ${(err as Error).message}`) + return 1 + } + + // 2. Reset stale claims, then attempt to claim. await resetStaleClaimedJobs(userId) - jobId = await tryClaimJob(userId, tokenId) - } - if (!jobId) { - log(`claim timeout after ${WAIT_DEADLINE_SECONDS}s — exiting 0`) - return 0 - } + let jobId = await tryClaimJob(userId, tokenId) + if (!jobId) { + log(`no job claimed — LISTEN scrum4me_changes deadline=${WAIT_DEADLINE_SECONDS}s`) + await waitForEnqueue(userId) + await resetStaleClaimedJobs(userId) + jobId = await tryClaimJob(userId, tokenId) + } + if (!jobId) { + log(`claim timeout after ${WAIT_DEADLINE_SECONDS}s — exiting 0`) + return 0 + } - log(`claimed job_id=${jobId}`) + log(`claimed job_id=${jobId}`) - // 3. Resolve full context. - let ctx: Awaited> = null - try { - ctx = await getFullJobContext(jobId) - } catch (err) { - logError(`getFullJobContext error job_id=${jobId} ${(err as Error).message}`) - log(`rollback claim job_id=${jobId} reason=context_fetch_failed`) - await rollbackClaim(jobId) - return 1 - } - if (!ctx) { - logError(`getFullJobContext returned null for job_id=${jobId}`) - await rollbackClaim(jobId) - return 1 - } - - // 4. Attach worktree for TASK_IMPLEMENTATION; sprint/idea-jobs hebben hun - // eigen worktree-pad al ingevuld door getFullJobContext. - // We werken hier met `any` omdat de return-type van getFullJobContext een - // discriminated union is en TypeScript hier zonder kind-narrow geen velden - // exposed; de runtime checks dekken alle paden af. - const ctxAny = ctx as any - let worktreePath: string | null = - ctxAny.worktree_path ?? ctxAny.primary_worktree_path ?? null - - if (ctx.kind === 'TASK_IMPLEMENTATION') { - if (!ctxAny.story || !ctxAny.task) { - logError(`TASK_IMPLEMENTATION job has incomplete story/task context`) + // 3. Resolve full context. + let ctx: Awaited> = null + try { + ctx = await getFullJobContext(jobId) + } catch (err) { + logError(`getFullJobContext error job_id=${jobId} ${(err as Error).message}`) + log(`rollback claim job_id=${jobId} reason=context_fetch_failed`) await rollbackClaim(jobId) - await releaseLocksOnTerminal(jobId) return 1 } - const wt = await attachWorktreeToJob( - ctxAny.product.id, - jobId, - ctxAny.story.id, - ctxAny.task.repo_url, - ) - if ('error' in wt) { - logError(`attachWorktreeToJob error job_id=${jobId} ${wt.error}`) - log(`rollback claim job_id=${jobId} reason=worktree_attach_failed`) + if (!ctx) { + logError(`getFullJobContext returned null for job_id=${jobId}`) await rollbackClaim(jobId) - await releaseLocksOnTerminal(jobId) return 1 } - worktreePath = wt.worktree_path - ctxAny.worktree_path = wt.worktree_path - ctxAny.branch_name = wt.branch_name - log(`worktree path=${wt.worktree_path} branch=${wt.branch_name}`) - } else if (worktreePath) { - log(`worktree path=${worktreePath} (pre-resolved)`) - } - // 5. Resolved config — log voor audit. - const cfg = ctx.config - const effort = mapBudgetToEffort(cfg.thinking_budget) - log( - `config job_id=${jobId} model=${cfg.model} ` + - `permission_mode=${cfg.permission_mode} ` + - `thinking_budget=${cfg.thinking_budget} effort=${effort ?? '-'} ` + - `max_turns=${cfg.max_turns ?? 'null'} ` + - `allowed_tools_count=${cfg.allowed_tools?.length ?? 0}`, - ) + // 4. Attach worktree for TASK_IMPLEMENTATION; sprint/idea-jobs hebben hun + // eigen worktree-pad al ingevuld door getFullJobContext. + // We werken hier met `any` omdat de return-type van getFullJobContext een + // discriminated union is en TypeScript hier zonder kind-narrow geen velden + // exposed; de runtime checks dekken alle paden af. + const ctxAny = ctx as any + let worktreePath: string | null = + ctxAny.worktree_path ?? ctxAny.primary_worktree_path ?? null - // 6. Write payload to /tmp/job-/payload.json. - const payloadDir = `/tmp/job-${jobId}` - const payloadPath = `${payloadDir}/payload.json` - mkdirSync(payloadDir, { recursive: true }) - const payloadJson = JSON.stringify(ctx, null, 2) - writeFileSync(payloadPath, payloadJson, 'utf8') - log( - `payload written path=${payloadPath} size_bytes=${Buffer.byteLength(payloadJson)}`, - ) - - // 7. Build CLI args. - const promptText = getKindPromptText(ctx.kind).replace('$PAYLOAD_PATH', payloadPath) - const args: string[] = [ - '-p', - promptText, - '--model', - cfg.model, - '--permission-mode', - cfg.permission_mode, - '--allowedTools', - (cfg.allowed_tools ?? []).join(','), - '--mcp-config', - MCP_CONFIG, - '--add-dir', - '/opt/agent', - '--output-format', - 'text', - ] - if (effort) args.push('--effort', effort) - - const cwd = worktreePath ?? '/opt/agent' - // Log args zonder de volledige prompt-tekst (kan kilo's groot zijn). - const argsForLog = args - .map((a, i) => (i === 1 ? `` : a)) - .join(' ') - log(`spawn claude job_id=${jobId} cwd=${cwd} args="${argsForLog}"`) - - // 8. Lease-renewal heartbeat for SPRINT_IMPLEMENTATION. - let heartbeatTimer: NodeJS.Timeout | null = null - if (ctx.kind === 'SPRINT_IMPLEMENTATION') { - heartbeatTimer = setInterval(() => { - prisma - .$executeRaw`UPDATE claude_jobs SET lease_until = NOW() + INTERVAL '5 minutes' WHERE id = ${jobId}` - .then(() => { - log( - `heartbeat tick job_id=${jobId} lease_until=${new Date( - Date.now() + 5 * 60_000, - ).toISOString()}`, - ) - }) - .catch((err: Error) => { - logError(`heartbeat error: ${err.message}`) - }) - }, HEARTBEAT_INTERVAL_MS) - } - - // 9. Spawn Claude. - const start = Date.now() - let exitCode: number | null = null - let stdoutBuf = '' - try { - await new Promise((resolve, reject) => { - const child = spawn('claude', args, { cwd }) - child.stdout.on('data', (chunk) => { - const s = chunk.toString() - process.stdout.write(s) - stdoutBuf += s - }) - child.stderr.on('data', (chunk) => { - const s = chunk.toString() - process.stderr.write(s) - stdoutBuf += s - }) - child.on('error', (err) => reject(err)) - child.on('close', (code) => { - exitCode = code - resolve() - }) - }) - } catch (err) { - logError(`spawn error: ${(err as Error).message}`) - if (heartbeatTimer) clearInterval(heartbeatTimer) - await rollbackClaim(jobId).catch(() => {}) - await releaseLocksOnTerminal(jobId).catch(() => {}) - return 1 - } finally { - if (heartbeatTimer) clearInterval(heartbeatTimer) - } - - const durationMs = Date.now() - start - log( - `claude done job_id=${jobId} exit_code=${exitCode ?? 'null'} ` + - `duration_ms=${durationMs} wall_clock_seconds=${Math.round(durationMs / 1000)}`, - ) - - // 10. Token-expiry detection. - let tokenExpired = false - for (const pat of TOKEN_EXPIRY_PATTERNS) { - if (pat.test(stdoutBuf)) { - tokenExpired = true - log(`TOKEN_EXPIRED detected pattern="${pat.source}" exiting code=3`) - break - } - } - - // 11. Rollback claim if Claude exited non-zero without updating job-status. - // PBI-50 lease-driven recovery vangt resterende stale jobs op na 5 min. - if (exitCode !== 0 && !tokenExpired) { - const jobNow = await prisma.claudeJob.findUnique({ - where: { id: jobId }, - select: { status: true }, - }) - if (jobNow?.status === 'CLAIMED' || jobNow?.status === 'RUNNING') { - log( - `rollback claim job_id=${jobId} reason=claude_exit_${exitCode}_without_status_update`, + if (ctx.kind === 'TASK_IMPLEMENTATION') { + if (!ctxAny.story || !ctxAny.task) { + logError(`TASK_IMPLEMENTATION job has incomplete story/task context`) + await rollbackClaim(jobId) + await releaseLocksOnTerminal(jobId) + return 1 + } + const wt = await attachWorktreeToJob( + ctxAny.product.id, + jobId, + ctxAny.story.id, + ctxAny.task.repo_url, ) + if ('error' in wt) { + logError(`attachWorktreeToJob error job_id=${jobId} ${wt.error}`) + log(`rollback claim job_id=${jobId} reason=worktree_attach_failed`) + await rollbackClaim(jobId) + await releaseLocksOnTerminal(jobId) + return 1 + } + worktreePath = wt.worktree_path + ctxAny.worktree_path = wt.worktree_path + ctxAny.branch_name = wt.branch_name + log(`worktree path=${wt.worktree_path} branch=${wt.branch_name}`) + } else if (worktreePath) { + log(`worktree path=${worktreePath} (pre-resolved)`) + } + + // 5. Resolved config — log voor audit. + const cfg = ctx.config + const effort = mapBudgetToEffort(cfg.thinking_budget) + log( + `config job_id=${jobId} model=${cfg.model} ` + + `permission_mode=${cfg.permission_mode} ` + + `thinking_budget=${cfg.thinking_budget} effort=${effort ?? '-'} ` + + `max_turns=${cfg.max_turns ?? 'null'} ` + + `allowed_tools_count=${cfg.allowed_tools?.length ?? 0}`, + ) + + // 6. Write payload to /tmp/job-/payload.json. + const payloadDir = `/tmp/job-${jobId}` + const payloadPath = `${payloadDir}/payload.json` + mkdirSync(payloadDir, { recursive: true }) + const payloadJson = JSON.stringify(ctx, null, 2) + writeFileSync(payloadPath, payloadJson, 'utf8') + log( + `payload written path=${payloadPath} size_bytes=${Buffer.byteLength(payloadJson)}`, + ) + + // 7. Build CLI args. + const promptText = getKindPromptText(ctx.kind).replace('$PAYLOAD_PATH', payloadPath) + const args: string[] = [ + '-p', + promptText, + '--model', + cfg.model, + '--permission-mode', + cfg.permission_mode, + '--allowedTools', + (cfg.allowed_tools ?? []).join(','), + '--mcp-config', + MCP_CONFIG, + '--add-dir', + '/opt/agent', + '--output-format', + 'text', + ] + if (effort) args.push('--effort', effort) + + const cwd = worktreePath ?? '/opt/agent' + // Log args zonder de volledige prompt-tekst (kan kilo's groot zijn). + const argsForLog = args + .map((a, i) => (i === 1 ? `` : a)) + .join(' ') + log(`spawn claude job_id=${jobId} cwd=${cwd} args="${argsForLog}"`) + + // 8. Lease-renewal heartbeat for SPRINT_IMPLEMENTATION. + let heartbeatTimer: NodeJS.Timeout | null = null + if (ctx.kind === 'SPRINT_IMPLEMENTATION') { + heartbeatTimer = setInterval(() => { + prisma + .$executeRaw`UPDATE claude_jobs SET lease_until = NOW() + INTERVAL '5 minutes' WHERE id = ${jobId}` + .then(() => { + log( + `heartbeat tick job_id=${jobId} lease_until=${new Date( + Date.now() + 5 * 60_000, + ).toISOString()}`, + ) + }) + .catch((err: Error) => { + logError(`heartbeat error: ${err.message}`) + }) + }, HEARTBEAT_INTERVAL_MS) + } + + // 9. Spawn Claude. + const start = Date.now() + let exitCode: number | null = null + let stdoutBuf = '' + try { + await new Promise((resolve, reject) => { + const child = spawn('claude', args, { cwd }) + child.stdout.on('data', (chunk) => { + const s = chunk.toString() + process.stdout.write(s) + stdoutBuf += s + }) + child.stderr.on('data', (chunk) => { + const s = chunk.toString() + process.stderr.write(s) + stdoutBuf += s + }) + child.on('error', (err) => reject(err)) + child.on('close', (code) => { + exitCode = code + resolve() + }) + }) + } catch (err) { + logError(`spawn error: ${(err as Error).message}`) + if (heartbeatTimer) clearInterval(heartbeatTimer) await rollbackClaim(jobId).catch(() => {}) await releaseLocksOnTerminal(jobId).catch(() => {}) + return 1 + } finally { + if (heartbeatTimer) clearInterval(heartbeatTimer) } - } - // 12. Cleanup payload directory. - try { - rmSync(payloadDir, { recursive: true, force: true }) - } catch { - // non-fatal - } - log(`cleanup payload_removed=true heartbeat_stopped=${heartbeatTimer !== null}`) + const durationMs = Date.now() - start + log( + `claude done job_id=${jobId} exit_code=${exitCode ?? 'null'} ` + + `duration_ms=${durationMs} wall_clock_seconds=${Math.round(durationMs / 1000)}`, + ) - if (tokenExpired) return 3 - return exitCode ?? 1 + // 10. Token-expiry detection. + let tokenExpired = false + for (const pat of TOKEN_EXPIRY_PATTERNS) { + if (pat.test(stdoutBuf)) { + tokenExpired = true + log(`TOKEN_EXPIRED detected pattern="${pat.source}" exiting code=3`) + break + } + } + + // 11. Rollback claim if Claude exited non-zero without updating job-status. + // PBI-50 lease-driven recovery vangt resterende stale jobs op na 5 min. + if (exitCode !== 0 && !tokenExpired) { + const jobNow = await prisma.claudeJob.findUnique({ + where: { id: jobId }, + select: { status: true }, + }) + if (jobNow?.status === 'CLAIMED' || jobNow?.status === 'RUNNING') { + log( + `rollback claim job_id=${jobId} reason=claude_exit_${exitCode}_without_status_update`, + ) + await rollbackClaim(jobId).catch(() => {}) + await releaseLocksOnTerminal(jobId).catch(() => {}) + } + } + + // 12. Cleanup payload directory. + try { + rmSync(payloadDir, { recursive: true, force: true }) + } catch { + // non-fatal + } + log(`cleanup payload_removed=true heartbeat_stopped=${heartbeatTimer !== null}`) + + if (tokenExpired) return 3 + return exitCode ?? 1 + } finally { + workerHeartbeat.stop() + } } // ----- entry ----------------------------------------------------------