* feat(M13 T-519b): SSE worker_heartbeat + NavBar stand-by badge Aanvulling op scrum4me-mcp PR #25 (worker_heartbeat MCP-tool). - app/api/realtime/solo/route.ts: WorkerHeartbeatPayload type + isWorkerHeartbeatPayload guard + shouldEmit-routing op user_id. - stores/solo-store.ts: workerQuotaPct + workerQuotaCheckAt state + setWorkerQuota action. Reset bij decrementWorkers naar 0. - lib/realtime/use-solo-realtime.ts: handle worker_heartbeat-event, roep setWorkerQuota. - components/solo/nav-status-indicators.tsx: stand-by badge wanneer workerQuotaPct < minQuotaPct + tooltip met drempel. - components/shared/nav-bar.tsx + app/(app)/layout.tsx: minQuotaPct prop plumbing van User.min_quota_pct naar NavStatusIndicators. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * docs(M13 T-520b): pre-flight quota-check sectie in mcp-integration Documenteert de batch-loop-uitbreiding: 1. get_worker_settings → min_quota_pct 2. bin/worker-quota-probe.sh → pct + reset 3. worker_heartbeat naar server (NavBar stand-by-badge) 4. Sleep tot reset bij low quota; anders wait_for_job Verwijst naar bin/worker-quota-probe.sh in scrum4me-docker (zie PR daar). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> --------- Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
319 lines
10 KiB
TypeScript
319 lines
10 KiB
TypeScript
// ST-802: Server-Sent Events stream for the Solo Panel.
//
// Opens a dedicated pg connection (DIRECT_URL) and LISTENs on the
// `scrum4me_changes` channel. Filters events server-side by product
// (from the query param), sprint (the product's active sprint), and
// personal relevance (assignee_id == userId, or assignee_id IS NULL
// for stories — the claim list).
//
// Auth: iron-session cookie. Demo tokens may read.
// Output: text/event-stream with JSON payloads + heartbeat comments.
// Closes itself after 240s as a safety net; the client reconnects.
|
|
|
|
import { NextRequest } from 'next/server'
|
|
import { Client } from 'pg'
|
|
import { getSession } from '@/lib/auth'
|
|
import { getAccessibleProduct } from '@/lib/product-access'
|
|
import { closePgClientSafely } from '@/lib/realtime/pg-client-cleanup'
|
|
|
|
// Next.js route segment config. These stay plain literals so the framework
// can read them statically.
export const runtime = 'nodejs'
export const dynamic = 'force-dynamic'
export const maxDuration = 300

/** Postgres NOTIFY channel this route LISTENs on. */
const CHANNEL = 'scrum4me_changes'

/** Interval between SSE heartbeat comments (25 seconds). */
const HEARTBEAT_MS = 25 * 1_000

/** The stream closes itself after 4 minutes, i.e. before `maxDuration` (300s) is reached. */
const HARD_CLOSE_MS = 4 * 60 * 1_000
|
|
|
|
/** Row-change notification for an entity, as published on the `scrum4me_changes` channel. */
type EntityPayload = {
  /** Operation code. NOTE(review): presumably Insert / Update / Delete — confirm against the DB trigger. */
  op: 'I' | 'U' | 'D'
  /**
   * M11 (ST-1101) adds entity 'question' on the same scrum4me_changes channel;
   * those are filtered out in this route so solo clients do not receive
   * notification events they have no use for.
   */
  entity: 'task' | 'story' | 'question'
  id: string
  story_id?: string
  product_id: string
  sprint_id: null | string
  assignee_id: null | string
  changed_fields?: Array<string>
}
|
|
|
|
/** Claude-job notification (enqueue or status change). */
type JobPayload = {
  type: 'claude_job_enqueued' | 'claude_job_status'
  job_id: string
  task_id?: null | string
  /**
   * M12: idea jobs set kind + idea_id instead of task_id. The solo stream
   * filters those out (idea jobs belong on /api/realtime/notifications,
   * not on the Solo Panel).
   */
  idea_id?: null | string
  kind?: 'TASK_IMPLEMENTATION' | 'IDEA_GRILL' | 'IDEA_MAKE_PLAN'
  user_id: string
  product_id: string
  status: string
  branch?: string
  pushed_at?: string
  pr_url?: string
  verify_result?: string
  summary?: string
  error?: string
}
|
|
|
|
/** Worker presence notification: a worker connected or disconnected. */
type WorkerPayload = {
  type: 'worker_connected' | 'worker_disconnected'
  user_id: string
  token_id: string
  product_id?: string
}
|
|
|
|
/**
 * M13: per-iteration quota report sent by the worker. It carries no product
 * scope — each heartbeat applies to every product the user has access to.
 */
type WorkerHeartbeatPayload = {
  type: 'worker_heartbeat'
  user_id: string
  token_id: string
  last_quota_pct: number
  last_quota_check_at: string
}
|
|
|
|
/** Every payload shape this route handles from the notify channel. */
type NotifyPayload =
  | EntityPayload
  | JobPayload
  | WorkerPayload
  | WorkerHeartbeatPayload
|
|
|
|
/** Type guard: true when `p` carries a `type` tag naming a Claude-job event. */
function isJobPayload(p: NotifyPayload): p is JobPayload {
  if (!('type' in p)) return false
  switch (p.type) {
    case 'claude_job_enqueued':
    case 'claude_job_status':
      return true
    default:
      return false
  }
}
|
|
|
|
/** Type guard: true when `p` carries a `type` tag naming a worker connect/disconnect event. */
function isWorkerPayload(p: NotifyPayload): p is WorkerPayload {
  if (!('type' in p)) return false
  switch (p.type) {
    case 'worker_connected':
    case 'worker_disconnected':
      return true
    default:
      return false
  }
}
|
|
|
|
/** Type guard: true when `p` carries the `worker_heartbeat` type tag. */
function isWorkerHeartbeatPayload(p: NotifyPayload): p is WorkerHeartbeatPayload {
  if (!('type' in p)) return false
  return p.type === 'worker_heartbeat'
}
|
|
|
|
/**
 * Decides whether a notification should be forwarded to this SSE client.
 *
 * Typed events (those carrying a `type` tag):
 * - Claude-job events: forwarded only for non-idea jobs that belong to both
 *   this user and this product (idea jobs belong on /api/realtime/notifications).
 * - Worker connect/disconnect and heartbeat events: forwarded when they
 *   belong to this user; no product check is applied.
 *
 * Entity events:
 * - 'question' events are never forwarded (M11 / ST-1104: they belong on
 *   /api/realtime/notifications).
 * - Events for another product are dropped.
 * - Unassigned stories (claim list) are forwarded when they are in the active
 *   sprint or have no sprint at all.
 * - Everything else must be in the active sprint and assigned to this user.
 *
 * @param payload        Parsed notification payload.
 * @param productId      Product the client subscribed to.
 * @param activeSprintId Active sprint of that product, or null when there is none.
 * @param userId         Id of the authenticated user.
 * @returns true when the event should be written to the stream.
 */
function shouldEmit(
  payload: NotifyPayload,
  productId: string,
  activeSprintId: string | null,
  userId: string,
): boolean {
  if ('type' in payload) {
    switch (payload.type) {
      case 'claude_job_enqueued':
      case 'claude_job_status': {
        const isIdeaJob = payload.kind === 'IDEA_GRILL' || payload.kind === 'IDEA_MAKE_PLAN'
        if (isIdeaJob) return false
        return payload.user_id === userId && payload.product_id === productId
      }
      case 'worker_connected':
      case 'worker_disconnected':
      case 'worker_heartbeat':
        return payload.user_id === userId
    }
    // An unrecognised `type` value falls through to the entity rules below.
  }

  const { entity, product_id: eventProductId, sprint_id: eventSprintId, assignee_id: assigneeId } = payload

  if (entity === 'question') return false
  if (eventProductId !== productId) return false

  const inActiveSprint = eventSprintId === activeSprintId

  // Unassigned story (claim list): active sprint or no sprint.
  if (entity === 'story' && assigneeId === null) {
    return inActiveSprint || eventSprintId === null
  }

  // Otherwise: must be in the active sprint and personally relevant.
  return inActiveSprint && assigneeId === userId
}
|
|
|
|
/**
 * GET handler: opens a Server-Sent Events stream for the Solo Panel.
 *
 * Flow:
 * 1. Validates the session (401), the `product_id` query param (400), access
 *    to the product (403) and the presence of a database URL (500).
 * 2. Inside the stream's `start`: resolves the active sprint once, connects a
 *    dedicated pg client and LISTENs on the notify channel.
 * 3. Forwards every notification that passes `shouldEmit` as an SSE `data:`
 *    frame, after sending a `ready` event and the initial job/worker state.
 * 4. Keeps the stream alive with heartbeat comments and closes it itself
 *    after HARD_CLOSE_MS, on client abort, or on a pg client error.
 *
 * All teardown goes through the idempotent `cleanup`. Any setup failure after
 * the request is accepted is reported as an `event: error` frame followed by
 * cleanup, so the pg connection is released on those paths as well.
 *
 * @param request Incoming request; `product_id` is read from its query string
 *                and its abort signal is used to detect client disconnects.
 * @returns A JSON error response, or a `text/event-stream` response.
 */
export async function GET(request: NextRequest): Promise<Response> {
  const session = await getSession()
  if (!session.userId) {
    return Response.json({ error: 'Niet ingelogd' }, { status: 401 })
  }
  const userId = session.userId

  const productId = request.nextUrl.searchParams.get('product_id')
  if (!productId) {
    return Response.json({ error: 'product_id is verplicht' }, { status: 400 })
  }

  const product = await getAccessibleProduct(productId, userId)
  if (!product) {
    return Response.json({ error: 'Geen toegang tot dit product' }, { status: 403 })
  }

  const directUrl = process.env.DIRECT_URL ?? process.env.DATABASE_URL
  if (!directUrl) {
    return Response.json({ error: 'DIRECT_URL/DATABASE_URL niet geconfigureerd' }, { status: 500 })
  }

  const encoder = new TextEncoder()
  const pgClient = new Client({ connectionString: directUrl })

  let heartbeatTimer: ReturnType<typeof setInterval> | null = null
  let hardCloseTimer: ReturnType<typeof setTimeout> | null = null
  let closed = false

  const stream = new ReadableStream({
    async start(controller) {
      // Writes one SSE frame; a no-op once the stream has been torn down.
      const enqueue = (chunk: string): void => {
        if (closed) return
        try {
          controller.enqueue(encoder.encode(chunk))
        } catch {
          // The controller throws once the stream is closed — ignore.
        }
      }

      // Idempotent teardown: stops timers, releases the pg client and closes
      // the stream. Written so that it never rejects, because it is invoked
      // fire-and-forget from timers and event listeners.
      const cleanup = async (reason: string): Promise<void> => {
        if (closed) return
        closed = true
        if (heartbeatTimer) clearInterval(heartbeatTimer)
        if (hardCloseTimer) clearTimeout(hardCloseTimer)
        try {
          // NOTE(review): the helper presumably never throws; the guard only
          // ensures the stream is still closed if it ever does.
          await closePgClientSafely(pgClient, 'realtime/solo')
        } catch (err) {
          console.error('[realtime/solo] pg close failed:', err)
        }
        try {
          controller.close()
        } catch {
          // already closed
        }
        if (process.env.NODE_ENV !== 'production') {
          console.log(`[realtime/solo] closed: ${reason}`)
        }
      }

      // Client pulled the plug (tab closed, refresh, …). Registered before any
      // await: a listener added after the signal has already fired is never
      // called, which would keep the pg connection open until the hard close.
      if (request.signal.aborted) {
        await cleanup('client aborted')
        return
      }
      request.signal.addEventListener(
        'abort',
        () => {
          void cleanup('client aborted')
        },
        { once: true },
      )

      // Attached before connecting so an 'error' event emitted at any point in
      // the client's life has a listener (an unhandled 'error' event throws).
      pgClient.on('error', (err) => {
        console.error('[realtime/solo] pg client error:', err)
        void cleanup('pg error')
      })

      // Resolve the active sprint once per connection.
      let sprint: { id: string } | null
      try {
        sprint = await prisma_sprint_findActive(productId)
      } catch (err) {
        console.error('[realtime/solo] active sprint lookup failed:', err)
        enqueue(`event: error\ndata: ${JSON.stringify({ message: 'sprint lookup failed' })}\n\n`)
        await cleanup('sprint lookup failed')
        return
      }
      const activeSprintId = sprint?.id ?? null
      // The client may have aborted while the lookup was in flight.
      if (closed) return

      try {
        await pgClient.connect()
        await pgClient.query(`LISTEN ${CHANNEL}`)
      } catch (err) {
        console.error('[realtime/solo] pg connect/listen failed:', err)
        enqueue(`event: error\ndata: ${JSON.stringify({ message: 'pg connect failed' })}\n\n`)
        await cleanup('pg connect failed')
        return
      }
      if (closed) return

      pgClient.on('notification', (msg) => {
        if (!msg.payload) return
        let parsed: unknown
        try {
          parsed = JSON.parse(msg.payload)
        } catch {
          return
        }
        // Valid JSON can still be a primitive or null; the `in` checks used
        // for filtering throw on those, so drop them here.
        if (typeof parsed !== 'object' || parsed === null) return
        const payload = parsed as NotifyPayload
        if (!shouldEmit(payload, productId, activeSprintId, userId)) return
        enqueue(`data: ${msg.payload}\n\n`)
      })

      // Send a "ready" event first so the client knows the connection is up.
      enqueue(
        `event: ready\ndata: ${JSON.stringify({
          product_id: productId,
          sprint_id: activeSprintId,
        })}\n\n`,
      )

      try {
        // Send the initial ClaudeJob state so the UI is in sync after a reconnect.
        const activeJobs = await prisma_jobs_findActive(userId, productId)
        if (activeJobs.length > 0) {
          enqueue(`event: claude_jobs_initial\ndata: ${JSON.stringify(activeJobs)}\n\n`)
        }

        // Remove stale workers, then send the active worker count.
        await prisma_workers_cleanup()
        const workerCount = await prisma_workers_count(userId)
        enqueue(`event: workers_initial\ndata: ${JSON.stringify({ count: workerCount })}\n\n`)
      } catch (err) {
        // Without this, a failing query would reject start() while the pg
        // client stays connected and LISTENing with nothing left to close it.
        console.error('[realtime/solo] initial state failed:', err)
        enqueue(`event: error\ndata: ${JSON.stringify({ message: 'initial state failed' })}\n\n`)
        await cleanup('initial state failed')
        return
      }
      // If teardown already ran during the awaits above, do not start timers:
      // nothing would ever clear them.
      if (closed) return

      // Heartbeat as an SSE comment — prevents proxy timeouts.
      heartbeatTimer = setInterval(() => {
        enqueue(`: heartbeat\n\n`)
      }, HEARTBEAT_MS)

      // Hard-close safety: Vercel cuts the function off at maxDuration; close earlier ourselves.
      hardCloseTimer = setTimeout(() => {
        void cleanup('hard close 240s')
      }, HARD_CLOSE_MS)
    },
  })

  return new Response(stream, {
    headers: {
      'Content-Type': 'text/event-stream; charset=utf-8',
      'Cache-Control': 'no-cache, no-transform',
      Connection: 'keep-alive',
      'X-Accel-Buffering': 'no',
    },
  })
}
|
|
|
|
/**
 * Looks up the most recently created sprint with status 'ACTIVE' for a product.
 *
 * The Prisma client is imported lazily inside the function.
 *
 * @param productId Product whose active sprint is wanted.
 * @returns `{ id }` of that sprint, or null when the product has none.
 */
async function prisma_sprint_findActive(productId: string): Promise<{ id: string } | null> {
  const db = (await import('@/lib/prisma')).prisma
  const activeSprint = await db.sprint.findFirst({
    where: { product_id: productId, status: 'ACTIVE' },
    select: { id: true },
    orderBy: { created_at: 'desc' },
  })
  return activeSprint
}
|
|
|
|
/**
 * Loads the Claude jobs used to seed the client's job state on (re)connect:
 * every job of this user and product that is still QUEUED, CLAIMED or RUNNING,
 * plus the DONE/FAILED jobs that finished since the start of the current day
 * (midnight in the server's local time zone), oldest first.
 *
 * @param userId    Owner of the jobs.
 * @param productId Product the jobs belong to.
 * @returns The jobs mapped to the API shape; database nulls become `undefined`.
 */
async function prisma_jobs_findActive(userId: string, productId: string) {
  const db = (await import('@/lib/prisma')).prisma
  const toApiStatus = (await import('@/lib/job-status')).jobStatusToApi

  // setHours returns the adjusted timestamp, i.e. local midnight of today.
  const startOfToday = new Date(new Date().setHours(0, 0, 0, 0))

  const rows = await db.claudeJob.findMany({
    where: {
      user_id: userId,
      product_id: productId,
      OR: [
        { status: { in: ['QUEUED', 'CLAIMED', 'RUNNING'] } },
        { status: { in: ['DONE', 'FAILED'] }, finished_at: { gte: startOfToday } },
      ],
    },
    select: {
      id: true,
      task_id: true,
      status: true,
      branch: true,
      pushed_at: true,
      pr_url: true,
      verify_result: true,
      summary: true,
      error: true,
    },
    orderBy: { created_at: 'asc' },
  })

  return rows.map(row => {
    const verifyResult = row.verify_result?.toLowerCase() as
      | import('@/stores/solo-store').VerifyResultApi
      | undefined
    return {
      job_id: row.id,
      task_id: row.task_id,
      status: toApiStatus(row.status),
      branch: row.branch ?? undefined,
      pushed_at: row.pushed_at?.toISOString() ?? undefined,
      pr_url: row.pr_url ?? undefined,
      verify_result: verifyResult,
      summary: row.summary ?? undefined,
      error: row.error ?? undefined,
    }
  })
}
|
|
|
|
/** A worker row whose `last_seen_at` is older than this (60 seconds) is deleted as stale. */
const WORKER_STALE_MS = 60 * 1_000

/**
 * Deletes every worker row whose `last_seen_at` is older than WORKER_STALE_MS.
 * The delete is not scoped to a user.
 */
async function prisma_workers_cleanup() {
  const db = (await import('@/lib/prisma')).prisma
  const staleBefore = new Date(Date.now() - WORKER_STALE_MS)
  await db.claudeWorker.deleteMany({
    where: { last_seen_at: { lt: staleBefore } },
  })
}
|
|
|
|
/**
 * Counts the workers of a user that were seen within the last 15 seconds.
 *
 * @param userId User whose workers are counted.
 * @returns Number of worker rows with a `last_seen_at` newer than 15 seconds ago.
 */
async function prisma_workers_count(userId: string): Promise<number> {
  const db = (await import('@/lib/prisma')).prisma
  const recentWindowMs = 15_000
  const seenAfter = new Date(Date.now() - recentWindowMs)
  const activeCount = await db.claudeWorker.count({
    where: {
      user_id: userId,
      last_seen_at: { gt: seenAfter },
    },
  })
  return activeCount
}
|