diff --git a/CLAUDE.md b/CLAUDE.md index af9950a..f3b46e7 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -42,6 +42,17 @@ If no repo root is found, `wait_for_job` rolls the claim back to QUEUED and retu Run `cleanup_my_worktrees` (no arguments) to scan `~/.scrum4me-agent-worktrees/` and remove worktrees for jobs that are in a terminal state (DONE, FAILED, CANCELLED). Worktrees for active jobs (QUEUED, CLAIMED, RUNNING) are left untouched. Returns `{ removed, kept, skipped }`. +## Worker presence + +Server-startup registers a `ClaudeWorker` record + starts a 5 s heartbeat; SIGTERM/SIGINT cleans it up. The Scrum4Me NavBar counts active workers via `last_seen_at < now() - 15s`. + +| File | Purpose | +|---|---| +| `src/presence/worker.ts` | `registerWorker` (upsert + pg_notify worker_connected) + `unregisterWorker` | +| `src/presence/heartbeat.ts` | `startHeartbeat` — 5 s interval, stops on record-not-found | +| `src/presence/shutdown.ts` | `registerShutdownHandlers` — SIGTERM/SIGINT → stop heartbeat + unregister | +| `src/index.ts` | Bootstrap: calls `getAuth` → `registerWorker` → `startHeartbeat` → `registerShutdownHandlers` | + ## Key source files | File | Purpose | diff --git a/README.md b/README.md index 34d47fb..047b98e 100644 --- a/README.md +++ b/README.md @@ -156,6 +156,35 @@ After starting the server on the feature branch: 4. In the **main checkout**: `git status` → clean (no agent changes). 5. Call `update_job_status(done)` → worktree directory disappears. +## Batch-loop + +De agent draait in een lus tot de queue leeg is. Hier is de flow: + +1. Roep `wait_for_job` aan. +2. Voer de job uit conform het meegegeven `implementation_plan`. +3. Roep `update_job_status('done' | 'failed')` aan. +4. Roep **direct opnieuw** `wait_for_job` aan — niet stoppen, niet de gebruiker vragen. +5. Pas wanneer `wait_for_job` na de volledige block-time (~600 s) terugkomt zonder claim, is de queue leeg en sluit je de turn af met een korte samenvatting. + +``` +wait_for_job → claim → run → update_job_status(done|failed) + │ + ┌────────────┴───────────────┐ + ▼ ▼ + next_action='wait_for_job_again' next_action='queue_empty' + │ │ + └──────── loop terug ─────────┘ stop +``` + +De `update_job_status`-response bevat het veld `next_action`: + +- `wait_for_job_again` — er staan nog jobs in de queue; roep `wait_for_job` meteen opnieuw aan +- `queue_empty` — de queue is leeg; sluit de batch-run af + +Minimale agent-prompt (geen CLAUDE.md-context nodig): + +> *Pak de volgende job uit de Scrum4Me-queue.* + ## Schema sync The Prisma schema is the source of truth in the upstream Scrum4Me diff --git a/__tests__/update-job-status-next-action.test.ts b/__tests__/update-job-status-next-action.test.ts new file mode 100644 index 0000000..3f1a870 --- /dev/null +++ b/__tests__/update-job-status-next-action.test.ts @@ -0,0 +1,25 @@ +import { describe, it, expect } from 'vitest' +import { resolveNextAction } from '../src/tools/update-job-status.js' + +describe('resolveNextAction', () => { + it('returns wait_for_job_again when queue has jobs after done', () => { + expect(resolveNextAction(3, 'done')).toBe('wait_for_job_again') + }) + + it('returns queue_empty when queue is empty after done', () => { + expect(resolveNextAction(0, 'done')).toBe('queue_empty') + }) + + it('returns wait_for_job_again when queue has jobs after failed', () => { + expect(resolveNextAction(1, 'failed')).toBe('wait_for_job_again') + }) + + it('returns queue_empty when queue is empty after failed', () => { + expect(resolveNextAction(0, 'failed')).toBe('queue_empty') + }) + + it('returns idle for running status regardless of queue count', () => { + expect(resolveNextAction(5, 'running')).toBe('idle') + expect(resolveNextAction(0, 'running')).toBe('idle') + }) +}) diff --git a/src/index.ts b/src/index.ts index 15479e3..2059b32 100644 --- a/src/index.ts +++ b/src/index.ts @@ -22,6 +22,10 @@ import { registerUpdateJobStatusTool } from './tools/update-job-status.js' import { registerVerifyTaskAgainstPlanTool } from './tools/verify-task-against-plan.js' import { registerCleanupMyWorktreesTool } from './tools/cleanup-my-worktrees.js' import { registerImplementNextStoryPrompt } from './prompts/implement-next-story.js' +import { getAuth } from './auth.js' +import { registerWorker } from './presence/worker.js' +import { startHeartbeat } from './presence/heartbeat.js' +import { registerShutdownHandlers } from './presence/shutdown.js' const VERSION = '0.1.0' @@ -59,6 +63,12 @@ async function main() { const transport = new StdioServerTransport() await server.connect(transport) + + const auth = await getAuth() + await registerWorker({ userId: auth.userId, tokenId: auth.tokenId }) + const { stop: stopHeartbeat } = startHeartbeat({ tokenId: auth.tokenId }) + registerShutdownHandlers({ userId: auth.userId, tokenId: auth.tokenId, stopHeartbeat }) + console.error(`scrum4me-mcp ${VERSION} running on stdio`) } diff --git a/src/presence/heartbeat.ts b/src/presence/heartbeat.ts new file mode 100644 index 0000000..f4cb230 --- /dev/null +++ b/src/presence/heartbeat.ts @@ -0,0 +1,23 @@ +import { prisma } from '../prisma.js' + +export function startHeartbeat(opts: { + tokenId: string + intervalMs?: number +}): { stop: () => void } { + const timer = setInterval(async () => { + try { + const result = await prisma.claudeWorker.updateMany({ + where: { token_id: opts.tokenId }, + data: { last_seen_at: new Date() }, + }) + if (result.count === 0) { + console.error('[scrum4me-mcp] Heartbeat: worker record not found — token may be revoked. Stopping.') + clearInterval(timer) + } + } catch { + // non-fatal + } + }, opts.intervalMs ?? 5_000) + + return { stop: () => clearInterval(timer) } +} diff --git a/src/presence/shutdown.ts b/src/presence/shutdown.ts new file mode 100644 index 0000000..53eb567 --- /dev/null +++ b/src/presence/shutdown.ts @@ -0,0 +1,20 @@ +import { unregisterWorker } from './worker.js' + +export function registerShutdownHandlers(opts: { + userId: string + tokenId: string + stopHeartbeat: () => void +}): void { + let exiting = false + + const shutdown = async () => { + if (exiting) return + exiting = true + opts.stopHeartbeat() + await unregisterWorker({ userId: opts.userId, tokenId: opts.tokenId }) + process.exit(0) + } + + process.on('SIGTERM', () => void shutdown()) + process.on('SIGINT', () => void shutdown()) +} diff --git a/src/presence/worker.ts b/src/presence/worker.ts new file mode 100644 index 0000000..9fb39d7 --- /dev/null +++ b/src/presence/worker.ts @@ -0,0 +1,61 @@ +import { Client } from 'pg' +import { prisma } from '../prisma.js' + +export async function registerWorker(opts: { + userId: string + tokenId: string + productId?: string | null +}): Promise { + await prisma.claudeWorker.upsert({ + where: { token_id: opts.tokenId }, + create: { + user_id: opts.userId, + token_id: opts.tokenId, + product_id: opts.productId ?? null, + }, + update: { + last_seen_at: new Date(), + product_id: opts.productId ?? null, + }, + }) + + try { + const pg = new Client({ connectionString: process.env.DATABASE_URL }) + await pg.connect() + await pg.query('SELECT pg_notify($1, $2)', [ + 'scrum4me_changes', + JSON.stringify({ + type: 'worker_connected', + user_id: opts.userId, + token_id: opts.tokenId, + product_id: opts.productId ?? null, + }), + ]) + await pg.end() + } catch { + // non-fatal + } +} + +export async function unregisterWorker(opts: { + userId: string + tokenId: string +}): Promise { + await prisma.claudeWorker.deleteMany({ where: { token_id: opts.tokenId } }).catch(() => {}) + + try { + const pg = new Client({ connectionString: process.env.DATABASE_URL }) + await pg.connect() + await pg.query('SELECT pg_notify($1, $2)', [ + 'scrum4me_changes', + JSON.stringify({ + type: 'worker_disconnected', + user_id: opts.userId, + token_id: opts.tokenId, + }), + ]) + await pg.end() + } catch { + // non-fatal + } +} diff --git a/src/tools/update-job-status.ts b/src/tools/update-job-status.ts index 614da29..05f4dfd 100644 --- a/src/tools/update-job-status.ts +++ b/src/tools/update-job-status.ts @@ -118,6 +118,14 @@ const DB_STATUS_MAP = { failed: 'FAILED', } as const +export function resolveNextAction( + queueCount: number, + status: 'running' | 'done' | 'failed', +): 'wait_for_job_again' | 'queue_empty' | 'idle' { + if (status === 'running') return 'idle' + return queueCount > 0 ? 'wait_for_job_again' : 'queue_empty' +} + export async function maybeCreateAutoPr(opts: { jobId: string productId: string @@ -161,9 +169,14 @@ export function registerUpdateJobStatusTool(server: McpServer) { 'Report progress on a claimed ClaudeJob. Allowed transitions from CLAIMED/RUNNING: ' + 'running (start), done (finished), failed (error). ' + 'The Bearer token must match the token that claimed the job. ' + +<<<<<<< feat/job-mgskzyvx + 'Automatically emits an SSE event so the Scrum4Me UI updates in real time. ' + + 'Response includes next_action: when wait_for_job_again, immediately call wait_for_job again. When queue_empty, the agent batch is done.', +======= 'Before marking done: call verify_task_against_plan first — done is rejected when ' + 'verify_result is null or EMPTY (unless task.verify_only is true). ' + 'Automatically emits an SSE event so the Scrum4Me UI updates in real time.', +>>>>>>> main inputSchema, }, async ({ job_id, status, branch, summary, error }) => @@ -295,6 +308,11 @@ export function registerUpdateJobStatusTool(server: McpServer) { await cleanupWorktreeForTerminalStatus(job.product_id, job_id, actualStatus, branchToWrite) } + const queueCount = await prisma.claudeJob.count({ + where: { user_id: userId, status: 'QUEUED' }, + }) + const nextAction = resolveNextAction(queueCount, actualStatus) + return toolJson({ job_id: updated.id, status: actualStatus, @@ -306,6 +324,7 @@ export function registerUpdateJobStatusTool(server: McpServer) { error: updated.error, started_at: updated.started_at?.toISOString() ?? null, finished_at: updated.finished_at?.toISOString() ?? null, + next_action: nextAction, }) }), ) diff --git a/src/tools/wait-for-job.ts b/src/tools/wait-for-job.ts index 4aff070..03ed979 100644 --- a/src/tools/wait-for-job.ts +++ b/src/tools/wait-for-job.ts @@ -1,6 +1,5 @@ // wait_for_job — blokkeert tot een QUEUED ClaudeJob beschikbaar is, claimt 'm // atomisch via FOR UPDATE SKIP LOCKED, en retourneert de volledige task-context. -// Registreert ook de worker-presence (ClaudeWorker upsert + heartbeat). import { z } from 'zod' import type { McpServer } from '@modelcontextprotocol/sdk/server/mcp.js' @@ -66,7 +65,6 @@ export async function attachWorktreeToJob( const MAX_WAIT_SECONDS = 600 const POLL_INTERVAL_MS = 5_000 const STALE_CLAIMED_INTERVAL = "30 minutes" -const WORKER_HEARTBEAT_INTERVAL_MS = 5_000 const inputSchema = z.object({ product_id: z.string().min(1).optional(), @@ -196,25 +194,6 @@ export async function tryClaimJob( return rows.length > 0 ? rows[0].id : null } -async function upsertWorker(userId: string, tokenId: string, productId?: string) { - await prisma.claudeWorker.upsert({ - where: { token_id: tokenId }, - create: { - user_id: userId, - token_id: tokenId, - product_id: productId ?? null, - }, - update: { - last_seen_at: new Date(), - product_id: productId ?? null, - }, - }) -} - -async function deleteWorker(tokenId: string) { - await prisma.claudeWorker.deleteMany({ where: { token_id: tokenId } }) -} - async function getFullJobContext(jobId: string) { const job = await prisma.claudeJob.findUnique({ where: { id: jobId }, @@ -282,7 +261,6 @@ export function registerWaitForJobTool(server: McpServer) { 'and return full task context (implementation_plan, story, pbi, sprint, repo_url). ' + 'Also creates a git worktree for the job and returns worktree_path and branch_name. ' + 'Work exclusively in worktree_path — do all file edits and commits there. ' + - 'Registers worker presence so the Scrum4Me UI can show "Agent verbonden". ' + 'Resets stale CLAIMED jobs (>30min) back to QUEUED before scanning. ' + 'Pass optional product_id to scope to a specific product. ' + 'Returns { status: "timeout" } when wait_seconds elapses without a job. ' + @@ -294,103 +272,62 @@ export function registerWaitForJobTool(server: McpServer) { const auth = await requireWriteAccess() const { userId, tokenId } = auth - // Register presence - await upsertWorker(userId, tokenId, product_id) + // 1. Reset stale claimed jobs + await resetStaleClaimedJobs(userId) - // Notify worker_connected (best-effort — geen fatal error bij mislukken) - try { - const pg = new Client({ connectionString: process.env.DATABASE_URL }) - await pg.connect() - await pg.query( - `SELECT pg_notify('scrum4me_changes', $1)`, - [JSON.stringify({ type: 'worker_connected', user_id: userId, product_id: product_id ?? null, token_id: tokenId })], - ) - await pg.end() - } catch { - // non-fatal + // 2. Try immediate claim + let jobId = await tryClaimJob(userId, tokenId, product_id) + if (jobId) { + const ctx = await getFullJobContext(jobId) + if (!ctx) return toolError('Job claimed but context fetch failed') + const wt = await attachWorktreeToJob(ctx.product.id, jobId) + if ('error' in wt) return toolError(wt.error) + return toolJson({ ...ctx, worktree_path: wt.worktree_path, branch_name: wt.branch_name }) } + // 3. No job available — LISTEN and poll until timeout + const deadline = Date.now() + wait_seconds * 1000 + const listenClient = new Client({ connectionString: process.env.DATABASE_URL }) + await listenClient.connect() + await listenClient.query('LISTEN scrum4me_changes') + try { - // 1. Reset stale claimed jobs - await resetStaleClaimedJobs(userId) - - // 2. Try immediate claim - let jobId = await tryClaimJob(userId, tokenId, product_id) - if (jobId) { - const ctx = await getFullJobContext(jobId) - if (!ctx) return toolError('Job claimed but context fetch failed') - const wt = await attachWorktreeToJob(ctx.product.id, jobId) - if ('error' in wt) return toolError(wt.error) - return toolJson({ ...ctx, worktree_path: wt.worktree_path, branch_name: wt.branch_name }) - } - - // 3. No job available — LISTEN and poll until timeout - const deadline = Date.now() + wait_seconds * 1000 - const listenClient = new Client({ connectionString: process.env.DATABASE_URL }) - await listenClient.connect() - await listenClient.query('LISTEN scrum4me_changes') - - const heartbeatTimer = setInterval(async () => { - try { - await upsertWorker(userId, tokenId, product_id) - } catch { - // non-fatal - } - }, WORKER_HEARTBEAT_INTERVAL_MS) - - try { - while (Date.now() < deadline) { - // Wait for a notification or poll interval - await new Promise((resolve) => { - const pollTimer = setTimeout(resolve, POLL_INTERVAL_MS) - listenClient.once('notification', (msg) => { - try { - const payload = JSON.parse(msg.payload ?? '{}') - if ( - payload.type === 'claude_job_enqueued' && - payload.user_id === userId && - (!product_id || payload.product_id === product_id) - ) { - clearTimeout(pollTimer) - resolve() - } - } catch { - // ignore parse errors + while (Date.now() < deadline) { + // Wait for a notification or poll interval + await new Promise((resolve) => { + const pollTimer = setTimeout(resolve, POLL_INTERVAL_MS) + listenClient.once('notification', (msg) => { + try { + const payload = JSON.parse(msg.payload ?? '{}') + if ( + payload.type === 'claude_job_enqueued' && + payload.user_id === userId && + (!product_id || payload.product_id === product_id) + ) { + clearTimeout(pollTimer) + resolve() } - }) + } catch { + // ignore parse errors + } }) + }) - await resetStaleClaimedJobs(userId) - jobId = await tryClaimJob(userId, tokenId, product_id) - if (jobId) { - const ctx = await getFullJobContext(jobId) - if (!ctx) return toolError('Job claimed but context fetch failed') - const wt = await attachWorktreeToJob(ctx.product.id, jobId) - if ('error' in wt) return toolError(wt.error) - return toolJson({ ...ctx, worktree_path: wt.worktree_path, branch_name: wt.branch_name }) - } + await resetStaleClaimedJobs(userId) + jobId = await tryClaimJob(userId, tokenId, product_id) + if (jobId) { + const ctx = await getFullJobContext(jobId) + if (!ctx) return toolError('Job claimed but context fetch failed') + const wt = await attachWorktreeToJob(ctx.product.id, jobId) + if ('error' in wt) return toolError(wt.error) + return toolJson({ ...ctx, worktree_path: wt.worktree_path, branch_name: wt.branch_name }) } - } finally { - clearInterval(heartbeatTimer) - await listenClient.end().catch(() => {}) } - - return toolJson({ status: 'timeout', message: 'No job available within wait window' }) } finally { - // Deregister presence and notify - await deleteWorker(tokenId).catch(() => {}) - try { - const pg = new Client({ connectionString: process.env.DATABASE_URL }) - await pg.connect() - await pg.query( - `SELECT pg_notify('scrum4me_changes', $1)`, - [JSON.stringify({ type: 'worker_disconnected', user_id: userId, token_id: tokenId })], - ) - await pg.end() - } catch { - // non-fatal - } + await listenClient.end().catch(() => {}) } + + return toolJson({ status: 'timeout', message: 'No job available within wait window' }) }), ) }