diff --git a/src/index.ts b/src/index.ts index 15479e3..2059b32 100644 --- a/src/index.ts +++ b/src/index.ts @@ -22,6 +22,10 @@ import { registerUpdateJobStatusTool } from './tools/update-job-status.js' import { registerVerifyTaskAgainstPlanTool } from './tools/verify-task-against-plan.js' import { registerCleanupMyWorktreesTool } from './tools/cleanup-my-worktrees.js' import { registerImplementNextStoryPrompt } from './prompts/implement-next-story.js' +import { getAuth } from './auth.js' +import { registerWorker } from './presence/worker.js' +import { startHeartbeat } from './presence/heartbeat.js' +import { registerShutdownHandlers } from './presence/shutdown.js' const VERSION = '0.1.0' @@ -59,6 +63,12 @@ async function main() { const transport = new StdioServerTransport() await server.connect(transport) + + const auth = await getAuth() + await registerWorker({ userId: auth.userId, tokenId: auth.tokenId }) + const { stop: stopHeartbeat } = startHeartbeat({ tokenId: auth.tokenId }) + registerShutdownHandlers({ userId: auth.userId, tokenId: auth.tokenId, stopHeartbeat }) + console.error(`scrum4me-mcp ${VERSION} running on stdio`) } diff --git a/src/tools/wait-for-job.ts b/src/tools/wait-for-job.ts index 4aff070..03ed979 100644 --- a/src/tools/wait-for-job.ts +++ b/src/tools/wait-for-job.ts @@ -1,6 +1,5 @@ // wait_for_job — blokkeert tot een QUEUED ClaudeJob beschikbaar is, claimt 'm // atomisch via FOR UPDATE SKIP LOCKED, en retourneert de volledige task-context. -// Registreert ook de worker-presence (ClaudeWorker upsert + heartbeat). import { z } from 'zod' import type { McpServer } from '@modelcontextprotocol/sdk/server/mcp.js' @@ -66,7 +65,6 @@ export async function attachWorktreeToJob( const MAX_WAIT_SECONDS = 600 const POLL_INTERVAL_MS = 5_000 const STALE_CLAIMED_INTERVAL = "30 minutes" -const WORKER_HEARTBEAT_INTERVAL_MS = 5_000 const inputSchema = z.object({ product_id: z.string().min(1).optional(), @@ -196,25 +194,6 @@ export async function tryClaimJob( return rows.length > 0 ? rows[0].id : null } -async function upsertWorker(userId: string, tokenId: string, productId?: string) { - await prisma.claudeWorker.upsert({ - where: { token_id: tokenId }, - create: { - user_id: userId, - token_id: tokenId, - product_id: productId ?? null, - }, - update: { - last_seen_at: new Date(), - product_id: productId ?? null, - }, - }) -} - -async function deleteWorker(tokenId: string) { - await prisma.claudeWorker.deleteMany({ where: { token_id: tokenId } }) -} - async function getFullJobContext(jobId: string) { const job = await prisma.claudeJob.findUnique({ where: { id: jobId }, @@ -282,7 +261,6 @@ export function registerWaitForJobTool(server: McpServer) { 'and return full task context (implementation_plan, story, pbi, sprint, repo_url). ' + 'Also creates a git worktree for the job and returns worktree_path and branch_name. ' + 'Work exclusively in worktree_path — do all file edits and commits there. ' + - 'Registers worker presence so the Scrum4Me UI can show "Agent verbonden". ' + 'Resets stale CLAIMED jobs (>30min) back to QUEUED before scanning. ' + 'Pass optional product_id to scope to a specific product. ' + 'Returns { status: "timeout" } when wait_seconds elapses without a job. ' + @@ -294,103 +272,62 @@ export function registerWaitForJobTool(server: McpServer) { const auth = await requireWriteAccess() const { userId, tokenId } = auth - // Register presence - await upsertWorker(userId, tokenId, product_id) + // 1. Reset stale claimed jobs + await resetStaleClaimedJobs(userId) - // Notify worker_connected (best-effort — geen fatal error bij mislukken) - try { - const pg = new Client({ connectionString: process.env.DATABASE_URL }) - await pg.connect() - await pg.query( - `SELECT pg_notify('scrum4me_changes', $1)`, - [JSON.stringify({ type: 'worker_connected', user_id: userId, product_id: product_id ?? null, token_id: tokenId })], - ) - await pg.end() - } catch { - // non-fatal + // 2. Try immediate claim + let jobId = await tryClaimJob(userId, tokenId, product_id) + if (jobId) { + const ctx = await getFullJobContext(jobId) + if (!ctx) return toolError('Job claimed but context fetch failed') + const wt = await attachWorktreeToJob(ctx.product.id, jobId) + if ('error' in wt) return toolError(wt.error) + return toolJson({ ...ctx, worktree_path: wt.worktree_path, branch_name: wt.branch_name }) } + // 3. No job available — LISTEN and poll until timeout + const deadline = Date.now() + wait_seconds * 1000 + const listenClient = new Client({ connectionString: process.env.DATABASE_URL }) + await listenClient.connect() + await listenClient.query('LISTEN scrum4me_changes') + try { - // 1. Reset stale claimed jobs - await resetStaleClaimedJobs(userId) - - // 2. Try immediate claim - let jobId = await tryClaimJob(userId, tokenId, product_id) - if (jobId) { - const ctx = await getFullJobContext(jobId) - if (!ctx) return toolError('Job claimed but context fetch failed') - const wt = await attachWorktreeToJob(ctx.product.id, jobId) - if ('error' in wt) return toolError(wt.error) - return toolJson({ ...ctx, worktree_path: wt.worktree_path, branch_name: wt.branch_name }) - } - - // 3. No job available — LISTEN and poll until timeout - const deadline = Date.now() + wait_seconds * 1000 - const listenClient = new Client({ connectionString: process.env.DATABASE_URL }) - await listenClient.connect() - await listenClient.query('LISTEN scrum4me_changes') - - const heartbeatTimer = setInterval(async () => { - try { - await upsertWorker(userId, tokenId, product_id) - } catch { - // non-fatal - } - }, WORKER_HEARTBEAT_INTERVAL_MS) - - try { - while (Date.now() < deadline) { - // Wait for a notification or poll interval - await new Promise((resolve) => { - const pollTimer = setTimeout(resolve, POLL_INTERVAL_MS) - listenClient.once('notification', (msg) => { - try { - const payload = JSON.parse(msg.payload ?? '{}') - if ( - payload.type === 'claude_job_enqueued' && - payload.user_id === userId && - (!product_id || payload.product_id === product_id) - ) { - clearTimeout(pollTimer) - resolve() - } - } catch { - // ignore parse errors + while (Date.now() < deadline) { + // Wait for a notification or poll interval + await new Promise((resolve) => { + const pollTimer = setTimeout(resolve, POLL_INTERVAL_MS) + listenClient.once('notification', (msg) => { + try { + const payload = JSON.parse(msg.payload ?? '{}') + if ( + payload.type === 'claude_job_enqueued' && + payload.user_id === userId && + (!product_id || payload.product_id === product_id) + ) { + clearTimeout(pollTimer) + resolve() } - }) + } catch { + // ignore parse errors + } }) + }) - await resetStaleClaimedJobs(userId) - jobId = await tryClaimJob(userId, tokenId, product_id) - if (jobId) { - const ctx = await getFullJobContext(jobId) - if (!ctx) return toolError('Job claimed but context fetch failed') - const wt = await attachWorktreeToJob(ctx.product.id, jobId) - if ('error' in wt) return toolError(wt.error) - return toolJson({ ...ctx, worktree_path: wt.worktree_path, branch_name: wt.branch_name }) - } + await resetStaleClaimedJobs(userId) + jobId = await tryClaimJob(userId, tokenId, product_id) + if (jobId) { + const ctx = await getFullJobContext(jobId) + if (!ctx) return toolError('Job claimed but context fetch failed') + const wt = await attachWorktreeToJob(ctx.product.id, jobId) + if ('error' in wt) return toolError(wt.error) + return toolJson({ ...ctx, worktree_path: wt.worktree_path, branch_name: wt.branch_name }) } - } finally { - clearInterval(heartbeatTimer) - await listenClient.end().catch(() => {}) } - - return toolJson({ status: 'timeout', message: 'No job available within wait window' }) } finally { - // Deregister presence and notify - await deleteWorker(tokenId).catch(() => {}) - try { - const pg = new Client({ connectionString: process.env.DATABASE_URL }) - await pg.connect() - await pg.query( - `SELECT pg_notify('scrum4me_changes', $1)`, - [JSON.stringify({ type: 'worker_disconnected', user_id: userId, token_id: tokenId })], - ) - await pg.end() - } catch { - // non-fatal - } + await listenClient.end().catch(() => {}) } + + return toolJson({ status: 'timeout', message: 'No job available within wait window' }) }), ) }