feat: worker presence layer + batch-loop docs (#7)

* feat: add next_action field to update_job_status response

* docs: add Batch-loop section to README

* feat: presence layer — registerWorker, startHeartbeat, registerShutdownHandlers

* feat: bootstrap worker presence at server startup, remove inline presence from wait-for-job

* docs: document worker presence layer in CLAUDE.md

* docs: refine Batch-loop intro — add 'Hier is de flow:' per implementation plan
This commit is contained in:
Janpeter Visser 2026-05-01 16:39:26 +02:00 committed by GitHub
parent c990d8547e
commit f87b20744b
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
9 changed files with 243 additions and 108 deletions

View file

@ -42,6 +42,17 @@ If no repo root is found, `wait_for_job` rolls the claim back to QUEUED and retu
Run `cleanup_my_worktrees` (no arguments) to scan `~/.scrum4me-agent-worktrees/` and remove worktrees for jobs that are in a terminal state (DONE, FAILED, CANCELLED). Worktrees for active jobs (QUEUED, CLAIMED, RUNNING) are left untouched. Returns `{ removed, kept, skipped }`.
## Worker presence
Server-startup registers a `ClaudeWorker` record + starts a 5 s heartbeat; SIGTERM/SIGINT cleans it up. The Scrum4Me NavBar counts active workers via `last_seen_at < now() - 15s`.
| File | Purpose |
|---|---|
| `src/presence/worker.ts` | `registerWorker` (upsert + pg_notify worker_connected) + `unregisterWorker` |
| `src/presence/heartbeat.ts` | `startHeartbeat` — 5 s interval, stops on record-not-found |
| `src/presence/shutdown.ts` | `registerShutdownHandlers` — SIGTERM/SIGINT → stop heartbeat + unregister |
| `src/index.ts` | Bootstrap: calls `getAuth``registerWorker``startHeartbeat``registerShutdownHandlers` |
## Key source files
| File | Purpose |

View file

@ -156,6 +156,35 @@ After starting the server on the feature branch:
4. In the **main checkout**: `git status` → clean (no agent changes).
5. Call `update_job_status(done)` → worktree directory disappears.
## Batch-loop
De agent draait in een lus tot de queue leeg is. Hier is de flow:
1. Roep `wait_for_job` aan.
2. Voer de job uit conform het meegegeven `implementation_plan`.
3. Roep `update_job_status('done' | 'failed')` aan.
4. Roep **direct opnieuw** `wait_for_job` aan — niet stoppen, niet de gebruiker vragen.
5. Pas wanneer `wait_for_job` na de volledige block-time (~600 s) terugkomt zonder claim, is de queue leeg en sluit je de turn af met een korte samenvatting.
```
wait_for_job → claim → run → update_job_status(done|failed)
┌────────────┴───────────────┐
▼ ▼
next_action='wait_for_job_again' next_action='queue_empty'
│ │
└──────── loop terug ─────────┘ stop
```
De `update_job_status`-response bevat het veld `next_action`:
- `wait_for_job_again` — er staan nog jobs in de queue; roep `wait_for_job` meteen opnieuw aan
- `queue_empty` — de queue is leeg; sluit de batch-run af
Minimale agent-prompt (geen CLAUDE.md-context nodig):
> *Pak de volgende job uit de Scrum4Me-queue.*
## Schema sync
The Prisma schema is the source of truth in the upstream Scrum4Me

View file

@ -0,0 +1,25 @@
import { describe, it, expect } from 'vitest'
import { resolveNextAction } from '../src/tools/update-job-status.js'
describe('resolveNextAction', () => {
it('returns wait_for_job_again when queue has jobs after done', () => {
expect(resolveNextAction(3, 'done')).toBe('wait_for_job_again')
})
it('returns queue_empty when queue is empty after done', () => {
expect(resolveNextAction(0, 'done')).toBe('queue_empty')
})
it('returns wait_for_job_again when queue has jobs after failed', () => {
expect(resolveNextAction(1, 'failed')).toBe('wait_for_job_again')
})
it('returns queue_empty when queue is empty after failed', () => {
expect(resolveNextAction(0, 'failed')).toBe('queue_empty')
})
it('returns idle for running status regardless of queue count', () => {
expect(resolveNextAction(5, 'running')).toBe('idle')
expect(resolveNextAction(0, 'running')).toBe('idle')
})
})

View file

@ -22,6 +22,10 @@ import { registerUpdateJobStatusTool } from './tools/update-job-status.js'
import { registerVerifyTaskAgainstPlanTool } from './tools/verify-task-against-plan.js'
import { registerCleanupMyWorktreesTool } from './tools/cleanup-my-worktrees.js'
import { registerImplementNextStoryPrompt } from './prompts/implement-next-story.js'
import { getAuth } from './auth.js'
import { registerWorker } from './presence/worker.js'
import { startHeartbeat } from './presence/heartbeat.js'
import { registerShutdownHandlers } from './presence/shutdown.js'
const VERSION = '0.1.0'
@ -59,6 +63,12 @@ async function main() {
const transport = new StdioServerTransport()
await server.connect(transport)
const auth = await getAuth()
await registerWorker({ userId: auth.userId, tokenId: auth.tokenId })
const { stop: stopHeartbeat } = startHeartbeat({ tokenId: auth.tokenId })
registerShutdownHandlers({ userId: auth.userId, tokenId: auth.tokenId, stopHeartbeat })
console.error(`scrum4me-mcp ${VERSION} running on stdio`)
}

23
src/presence/heartbeat.ts Normal file
View file

@ -0,0 +1,23 @@
import { prisma } from '../prisma.js'
export function startHeartbeat(opts: {
tokenId: string
intervalMs?: number
}): { stop: () => void } {
const timer = setInterval(async () => {
try {
const result = await prisma.claudeWorker.updateMany({
where: { token_id: opts.tokenId },
data: { last_seen_at: new Date() },
})
if (result.count === 0) {
console.error('[scrum4me-mcp] Heartbeat: worker record not found — token may be revoked. Stopping.')
clearInterval(timer)
}
} catch {
// non-fatal
}
}, opts.intervalMs ?? 5_000)
return { stop: () => clearInterval(timer) }
}

20
src/presence/shutdown.ts Normal file
View file

@ -0,0 +1,20 @@
import { unregisterWorker } from './worker.js'
export function registerShutdownHandlers(opts: {
userId: string
tokenId: string
stopHeartbeat: () => void
}): void {
let exiting = false
const shutdown = async () => {
if (exiting) return
exiting = true
opts.stopHeartbeat()
await unregisterWorker({ userId: opts.userId, tokenId: opts.tokenId })
process.exit(0)
}
process.on('SIGTERM', () => void shutdown())
process.on('SIGINT', () => void shutdown())
}

61
src/presence/worker.ts Normal file
View file

@ -0,0 +1,61 @@
import { Client } from 'pg'
import { prisma } from '../prisma.js'
export async function registerWorker(opts: {
userId: string
tokenId: string
productId?: string | null
}): Promise<void> {
await prisma.claudeWorker.upsert({
where: { token_id: opts.tokenId },
create: {
user_id: opts.userId,
token_id: opts.tokenId,
product_id: opts.productId ?? null,
},
update: {
last_seen_at: new Date(),
product_id: opts.productId ?? null,
},
})
try {
const pg = new Client({ connectionString: process.env.DATABASE_URL })
await pg.connect()
await pg.query('SELECT pg_notify($1, $2)', [
'scrum4me_changes',
JSON.stringify({
type: 'worker_connected',
user_id: opts.userId,
token_id: opts.tokenId,
product_id: opts.productId ?? null,
}),
])
await pg.end()
} catch {
// non-fatal
}
}
export async function unregisterWorker(opts: {
userId: string
tokenId: string
}): Promise<void> {
await prisma.claudeWorker.deleteMany({ where: { token_id: opts.tokenId } }).catch(() => {})
try {
const pg = new Client({ connectionString: process.env.DATABASE_URL })
await pg.connect()
await pg.query('SELECT pg_notify($1, $2)', [
'scrum4me_changes',
JSON.stringify({
type: 'worker_disconnected',
user_id: opts.userId,
token_id: opts.tokenId,
}),
])
await pg.end()
} catch {
// non-fatal
}
}

View file

@ -118,6 +118,14 @@ const DB_STATUS_MAP = {
failed: 'FAILED',
} as const
export function resolveNextAction(
queueCount: number,
status: 'running' | 'done' | 'failed',
): 'wait_for_job_again' | 'queue_empty' | 'idle' {
if (status === 'running') return 'idle'
return queueCount > 0 ? 'wait_for_job_again' : 'queue_empty'
}
export async function maybeCreateAutoPr(opts: {
jobId: string
productId: string
@ -161,9 +169,14 @@ export function registerUpdateJobStatusTool(server: McpServer) {
'Report progress on a claimed ClaudeJob. Allowed transitions from CLAIMED/RUNNING: ' +
'running (start), done (finished), failed (error). ' +
'The Bearer token must match the token that claimed the job. ' +
<<<<<<< feat/job-mgskzyvx
'Automatically emits an SSE event so the Scrum4Me UI updates in real time. ' +
'Response includes next_action: when wait_for_job_again, immediately call wait_for_job again. When queue_empty, the agent batch is done.',
=======
'Before marking done: call verify_task_against_plan first — done is rejected when ' +
'verify_result is null or EMPTY (unless task.verify_only is true). ' +
'Automatically emits an SSE event so the Scrum4Me UI updates in real time.',
>>>>>>> main
inputSchema,
},
async ({ job_id, status, branch, summary, error }) =>
@ -295,6 +308,11 @@ export function registerUpdateJobStatusTool(server: McpServer) {
await cleanupWorktreeForTerminalStatus(job.product_id, job_id, actualStatus, branchToWrite)
}
const queueCount = await prisma.claudeJob.count({
where: { user_id: userId, status: 'QUEUED' },
})
const nextAction = resolveNextAction(queueCount, actualStatus)
return toolJson({
job_id: updated.id,
status: actualStatus,
@ -306,6 +324,7 @@ export function registerUpdateJobStatusTool(server: McpServer) {
error: updated.error,
started_at: updated.started_at?.toISOString() ?? null,
finished_at: updated.finished_at?.toISOString() ?? null,
next_action: nextAction,
})
}),
)

View file

@ -1,6 +1,5 @@
// wait_for_job — blokkeert tot een QUEUED ClaudeJob beschikbaar is, claimt 'm
// atomisch via FOR UPDATE SKIP LOCKED, en retourneert de volledige task-context.
// Registreert ook de worker-presence (ClaudeWorker upsert + heartbeat).
import { z } from 'zod'
import type { McpServer } from '@modelcontextprotocol/sdk/server/mcp.js'
@ -66,7 +65,6 @@ export async function attachWorktreeToJob(
const MAX_WAIT_SECONDS = 600
const POLL_INTERVAL_MS = 5_000
const STALE_CLAIMED_INTERVAL = "30 minutes"
const WORKER_HEARTBEAT_INTERVAL_MS = 5_000
const inputSchema = z.object({
product_id: z.string().min(1).optional(),
@ -196,25 +194,6 @@ export async function tryClaimJob(
return rows.length > 0 ? rows[0].id : null
}
async function upsertWorker(userId: string, tokenId: string, productId?: string) {
await prisma.claudeWorker.upsert({
where: { token_id: tokenId },
create: {
user_id: userId,
token_id: tokenId,
product_id: productId ?? null,
},
update: {
last_seen_at: new Date(),
product_id: productId ?? null,
},
})
}
async function deleteWorker(tokenId: string) {
await prisma.claudeWorker.deleteMany({ where: { token_id: tokenId } })
}
async function getFullJobContext(jobId: string) {
const job = await prisma.claudeJob.findUnique({
where: { id: jobId },
@ -282,7 +261,6 @@ export function registerWaitForJobTool(server: McpServer) {
'and return full task context (implementation_plan, story, pbi, sprint, repo_url). ' +
'Also creates a git worktree for the job and returns worktree_path and branch_name. ' +
'Work exclusively in worktree_path — do all file edits and commits there. ' +
'Registers worker presence so the Scrum4Me UI can show "Agent verbonden". ' +
'Resets stale CLAIMED jobs (>30min) back to QUEUED before scanning. ' +
'Pass optional product_id to scope to a specific product. ' +
'Returns { status: "timeout" } when wait_seconds elapses without a job. ' +
@ -294,103 +272,62 @@ export function registerWaitForJobTool(server: McpServer) {
const auth = await requireWriteAccess()
const { userId, tokenId } = auth
// Register presence
await upsertWorker(userId, tokenId, product_id)
// 1. Reset stale claimed jobs
await resetStaleClaimedJobs(userId)
// Notify worker_connected (best-effort — geen fatal error bij mislukken)
try {
const pg = new Client({ connectionString: process.env.DATABASE_URL })
await pg.connect()
await pg.query(
`SELECT pg_notify('scrum4me_changes', $1)`,
[JSON.stringify({ type: 'worker_connected', user_id: userId, product_id: product_id ?? null, token_id: tokenId })],
)
await pg.end()
} catch {
// non-fatal
// 2. Try immediate claim
let jobId = await tryClaimJob(userId, tokenId, product_id)
if (jobId) {
const ctx = await getFullJobContext(jobId)
if (!ctx) return toolError('Job claimed but context fetch failed')
const wt = await attachWorktreeToJob(ctx.product.id, jobId)
if ('error' in wt) return toolError(wt.error)
return toolJson({ ...ctx, worktree_path: wt.worktree_path, branch_name: wt.branch_name })
}
// 3. No job available — LISTEN and poll until timeout
const deadline = Date.now() + wait_seconds * 1000
const listenClient = new Client({ connectionString: process.env.DATABASE_URL })
await listenClient.connect()
await listenClient.query('LISTEN scrum4me_changes')
try {
// 1. Reset stale claimed jobs
await resetStaleClaimedJobs(userId)
// 2. Try immediate claim
let jobId = await tryClaimJob(userId, tokenId, product_id)
if (jobId) {
const ctx = await getFullJobContext(jobId)
if (!ctx) return toolError('Job claimed but context fetch failed')
const wt = await attachWorktreeToJob(ctx.product.id, jobId)
if ('error' in wt) return toolError(wt.error)
return toolJson({ ...ctx, worktree_path: wt.worktree_path, branch_name: wt.branch_name })
}
// 3. No job available — LISTEN and poll until timeout
const deadline = Date.now() + wait_seconds * 1000
const listenClient = new Client({ connectionString: process.env.DATABASE_URL })
await listenClient.connect()
await listenClient.query('LISTEN scrum4me_changes')
const heartbeatTimer = setInterval(async () => {
try {
await upsertWorker(userId, tokenId, product_id)
} catch {
// non-fatal
}
}, WORKER_HEARTBEAT_INTERVAL_MS)
try {
while (Date.now() < deadline) {
// Wait for a notification or poll interval
await new Promise<void>((resolve) => {
const pollTimer = setTimeout(resolve, POLL_INTERVAL_MS)
listenClient.once('notification', (msg) => {
try {
const payload = JSON.parse(msg.payload ?? '{}')
if (
payload.type === 'claude_job_enqueued' &&
payload.user_id === userId &&
(!product_id || payload.product_id === product_id)
) {
clearTimeout(pollTimer)
resolve()
}
} catch {
// ignore parse errors
while (Date.now() < deadline) {
// Wait for a notification or poll interval
await new Promise<void>((resolve) => {
const pollTimer = setTimeout(resolve, POLL_INTERVAL_MS)
listenClient.once('notification', (msg) => {
try {
const payload = JSON.parse(msg.payload ?? '{}')
if (
payload.type === 'claude_job_enqueued' &&
payload.user_id === userId &&
(!product_id || payload.product_id === product_id)
) {
clearTimeout(pollTimer)
resolve()
}
})
} catch {
// ignore parse errors
}
})
})
await resetStaleClaimedJobs(userId)
jobId = await tryClaimJob(userId, tokenId, product_id)
if (jobId) {
const ctx = await getFullJobContext(jobId)
if (!ctx) return toolError('Job claimed but context fetch failed')
const wt = await attachWorktreeToJob(ctx.product.id, jobId)
if ('error' in wt) return toolError(wt.error)
return toolJson({ ...ctx, worktree_path: wt.worktree_path, branch_name: wt.branch_name })
}
await resetStaleClaimedJobs(userId)
jobId = await tryClaimJob(userId, tokenId, product_id)
if (jobId) {
const ctx = await getFullJobContext(jobId)
if (!ctx) return toolError('Job claimed but context fetch failed')
const wt = await attachWorktreeToJob(ctx.product.id, jobId)
if ('error' in wt) return toolError(wt.error)
return toolJson({ ...ctx, worktree_path: wt.worktree_path, branch_name: wt.branch_name })
}
} finally {
clearInterval(heartbeatTimer)
await listenClient.end().catch(() => {})
}
return toolJson({ status: 'timeout', message: 'No job available within wait window' })
} finally {
// Deregister presence and notify
await deleteWorker(tokenId).catch(() => {})
try {
const pg = new Client({ connectionString: process.env.DATABASE_URL })
await pg.connect()
await pg.query(
`SELECT pg_notify('scrum4me_changes', $1)`,
[JSON.stringify({ type: 'worker_disconnected', user_id: userId, token_id: tokenId })],
)
await pg.end()
} catch {
// non-fatal
}
await listenClient.end().catch(() => {})
}
return toolJson({ status: 'timeout', message: 'No job available within wait window' })
}),
)
}