feat: bootstrap worker presence at server startup, remove inline presence from wait-for-job

This commit is contained in:
Janpeter Visser 2026-05-01 14:31:46 +02:00
parent 1d6652b7c8
commit 88f9156504
2 changed files with 55 additions and 108 deletions

View file

@ -22,6 +22,10 @@ import { registerUpdateJobStatusTool } from './tools/update-job-status.js'
import { registerVerifyTaskAgainstPlanTool } from './tools/verify-task-against-plan.js'
import { registerCleanupMyWorktreesTool } from './tools/cleanup-my-worktrees.js'
import { registerImplementNextStoryPrompt } from './prompts/implement-next-story.js'
import { getAuth } from './auth.js'
import { registerWorker } from './presence/worker.js'
import { startHeartbeat } from './presence/heartbeat.js'
import { registerShutdownHandlers } from './presence/shutdown.js'
const VERSION = '0.1.0'
@ -59,6 +63,12 @@ async function main() {
const transport = new StdioServerTransport()
await server.connect(transport)
const auth = await getAuth()
await registerWorker({ userId: auth.userId, tokenId: auth.tokenId })
const { stop: stopHeartbeat } = startHeartbeat({ tokenId: auth.tokenId })
registerShutdownHandlers({ userId: auth.userId, tokenId: auth.tokenId, stopHeartbeat })
console.error(`scrum4me-mcp ${VERSION} running on stdio`)
}

View file

@ -1,6 +1,5 @@
// wait_for_job — blokkeert tot een QUEUED ClaudeJob beschikbaar is, claimt 'm
// atomisch via FOR UPDATE SKIP LOCKED, en retourneert de volledige task-context.
// Registreert ook de worker-presence (ClaudeWorker upsert + heartbeat).
import { z } from 'zod'
import type { McpServer } from '@modelcontextprotocol/sdk/server/mcp.js'
@ -66,7 +65,6 @@ export async function attachWorktreeToJob(
const MAX_WAIT_SECONDS = 600
const POLL_INTERVAL_MS = 5_000
const STALE_CLAIMED_INTERVAL = "30 minutes"
const WORKER_HEARTBEAT_INTERVAL_MS = 5_000
const inputSchema = z.object({
product_id: z.string().min(1).optional(),
@ -196,25 +194,6 @@ export async function tryClaimJob(
return rows.length > 0 ? rows[0].id : null
}
async function upsertWorker(userId: string, tokenId: string, productId?: string) {
await prisma.claudeWorker.upsert({
where: { token_id: tokenId },
create: {
user_id: userId,
token_id: tokenId,
product_id: productId ?? null,
},
update: {
last_seen_at: new Date(),
product_id: productId ?? null,
},
})
}
async function deleteWorker(tokenId: string) {
await prisma.claudeWorker.deleteMany({ where: { token_id: tokenId } })
}
async function getFullJobContext(jobId: string) {
const job = await prisma.claudeJob.findUnique({
where: { id: jobId },
@ -282,7 +261,6 @@ export function registerWaitForJobTool(server: McpServer) {
'and return full task context (implementation_plan, story, pbi, sprint, repo_url). ' +
'Also creates a git worktree for the job and returns worktree_path and branch_name. ' +
'Work exclusively in worktree_path — do all file edits and commits there. ' +
'Registers worker presence so the Scrum4Me UI can show "Agent verbonden". ' +
'Resets stale CLAIMED jobs (>30min) back to QUEUED before scanning. ' +
'Pass optional product_id to scope to a specific product. ' +
'Returns { status: "timeout" } when wait_seconds elapses without a job. ' +
@ -294,103 +272,62 @@ export function registerWaitForJobTool(server: McpServer) {
const auth = await requireWriteAccess()
const { userId, tokenId } = auth
// Register presence
await upsertWorker(userId, tokenId, product_id)
// 1. Reset stale claimed jobs
await resetStaleClaimedJobs(userId)
// Notify worker_connected (best-effort — geen fatal error bij mislukken)
try {
const pg = new Client({ connectionString: process.env.DATABASE_URL })
await pg.connect()
await pg.query(
`SELECT pg_notify('scrum4me_changes', $1)`,
[JSON.stringify({ type: 'worker_connected', user_id: userId, product_id: product_id ?? null, token_id: tokenId })],
)
await pg.end()
} catch {
// non-fatal
// 2. Try immediate claim
let jobId = await tryClaimJob(userId, tokenId, product_id)
if (jobId) {
const ctx = await getFullJobContext(jobId)
if (!ctx) return toolError('Job claimed but context fetch failed')
const wt = await attachWorktreeToJob(ctx.product.id, jobId)
if ('error' in wt) return toolError(wt.error)
return toolJson({ ...ctx, worktree_path: wt.worktree_path, branch_name: wt.branch_name })
}
// 3. No job available — LISTEN and poll until timeout
const deadline = Date.now() + wait_seconds * 1000
const listenClient = new Client({ connectionString: process.env.DATABASE_URL })
await listenClient.connect()
await listenClient.query('LISTEN scrum4me_changes')
try {
// 1. Reset stale claimed jobs
await resetStaleClaimedJobs(userId)
// 2. Try immediate claim
let jobId = await tryClaimJob(userId, tokenId, product_id)
if (jobId) {
const ctx = await getFullJobContext(jobId)
if (!ctx) return toolError('Job claimed but context fetch failed')
const wt = await attachWorktreeToJob(ctx.product.id, jobId)
if ('error' in wt) return toolError(wt.error)
return toolJson({ ...ctx, worktree_path: wt.worktree_path, branch_name: wt.branch_name })
}
// 3. No job available — LISTEN and poll until timeout
const deadline = Date.now() + wait_seconds * 1000
const listenClient = new Client({ connectionString: process.env.DATABASE_URL })
await listenClient.connect()
await listenClient.query('LISTEN scrum4me_changes')
const heartbeatTimer = setInterval(async () => {
try {
await upsertWorker(userId, tokenId, product_id)
} catch {
// non-fatal
}
}, WORKER_HEARTBEAT_INTERVAL_MS)
try {
while (Date.now() < deadline) {
// Wait for a notification or poll interval
await new Promise<void>((resolve) => {
const pollTimer = setTimeout(resolve, POLL_INTERVAL_MS)
listenClient.once('notification', (msg) => {
try {
const payload = JSON.parse(msg.payload ?? '{}')
if (
payload.type === 'claude_job_enqueued' &&
payload.user_id === userId &&
(!product_id || payload.product_id === product_id)
) {
clearTimeout(pollTimer)
resolve()
}
} catch {
// ignore parse errors
while (Date.now() < deadline) {
// Wait for a notification or poll interval
await new Promise<void>((resolve) => {
const pollTimer = setTimeout(resolve, POLL_INTERVAL_MS)
listenClient.once('notification', (msg) => {
try {
const payload = JSON.parse(msg.payload ?? '{}')
if (
payload.type === 'claude_job_enqueued' &&
payload.user_id === userId &&
(!product_id || payload.product_id === product_id)
) {
clearTimeout(pollTimer)
resolve()
}
})
} catch {
// ignore parse errors
}
})
})
await resetStaleClaimedJobs(userId)
jobId = await tryClaimJob(userId, tokenId, product_id)
if (jobId) {
const ctx = await getFullJobContext(jobId)
if (!ctx) return toolError('Job claimed but context fetch failed')
const wt = await attachWorktreeToJob(ctx.product.id, jobId)
if ('error' in wt) return toolError(wt.error)
return toolJson({ ...ctx, worktree_path: wt.worktree_path, branch_name: wt.branch_name })
}
await resetStaleClaimedJobs(userId)
jobId = await tryClaimJob(userId, tokenId, product_id)
if (jobId) {
const ctx = await getFullJobContext(jobId)
if (!ctx) return toolError('Job claimed but context fetch failed')
const wt = await attachWorktreeToJob(ctx.product.id, jobId)
if ('error' in wt) return toolError(wt.error)
return toolJson({ ...ctx, worktree_path: wt.worktree_path, branch_name: wt.branch_name })
}
} finally {
clearInterval(heartbeatTimer)
await listenClient.end().catch(() => {})
}
return toolJson({ status: 'timeout', message: 'No job available within wait window' })
} finally {
// Deregister presence and notify
await deleteWorker(tokenId).catch(() => {})
try {
const pg = new Client({ connectionString: process.env.DATABASE_URL })
await pg.connect()
await pg.query(
`SELECT pg_notify('scrum4me_changes', $1)`,
[JSON.stringify({ type: 'worker_disconnected', user_id: userId, token_id: tokenId })],
)
await pg.end()
} catch {
// non-fatal
}
await listenClient.end().catch(() => {})
}
return toolJson({ status: 'timeout', message: 'No job available within wait window' })
}),
)
}