// ST-802: Server-Sent Events stream for the Solo Panel.
//
// Opens a dedicated pg connection (DIRECT_URL, falling back to DATABASE_URL)
// and LISTENs on the `scrum4me_changes` channel. Events are filtered
// server-side by product (from the query param), sprint (the product's
// active sprint), and personal relevance (assignee_id == userId, or
// assignee_id IS NULL for stories — the claim list).
//
// Auth: iron-session cookie. Demo tokens may read.
// Output: text/event-stream with JSON payloads + heartbeat comments.
// Closes itself after 240s as a safety net; the client reconnects.

import { NextRequest } from 'next/server'
import { Client } from 'pg'
import { getSession } from '@/lib/auth'
import { getAccessibleProduct } from '@/lib/product-access'

export const runtime = 'nodejs'
export const dynamic = 'force-dynamic'
export const maxDuration = 300

/** Postgres NOTIFY channel this route listens on. */
const CHANNEL = 'scrum4me_changes'
/** Interval between SSE heartbeat comments. */
const HEARTBEAT_MS = 25_000
/** The stream closes itself after this long; must stay below `maxDuration`. */
const HARD_CLOSE_MS = 240_000

/** Row-level change notification for a task, story or question. */
type EntityPayload = {
  op: 'I' | 'U' | 'D'
  // M11 (ST-1101) adds entity:'question' on the same scrum4me_changes
  // channel; those are filtered out in shouldEmit so solo clients do not
  // receive notification events they have no use for.
  entity: 'task' | 'story' | 'question'
  id: string
  story_id?: string
  product_id: string
  sprint_id: string | null
  assignee_id: string | null
  changed_fields?: string[]
}

/** Claude job lifecycle notification. */
type JobPayload = {
  type: 'claude_job_enqueued' | 'claude_job_status'
  job_id: string
  task_id: string
  user_id: string
  product_id: string
  status: string
  branch?: string
  pushed_at?: string
  pr_url?: string
  verify_result?: string
  summary?: string
  error?: string
}

/** Worker connect/disconnect notification. */
type WorkerPayload = {
  type: 'worker_connected' | 'worker_disconnected'
  user_id: string
  token_id: string
  product_id?: string
}

type NotifyPayload = EntityPayload | JobPayload | WorkerPayload

/** Narrows a payload to a job event based on its `type` tag. */
function isJobPayload(p: NotifyPayload): p is JobPayload {
  return 'type' in p && (p.type === 'claude_job_enqueued' || p.type === 'claude_job_status')
}

/** Narrows a payload to a worker event based on its `type` tag. */
function isWorkerPayload(p: NotifyPayload): p is WorkerPayload {
  return 'type' in p && (p.type === 'worker_connected' || p.type === 'worker_disconnected')
}

/**
 * Decides whether a notification should be forwarded to this connection.
 *
 * @param payload        Parsed notification payload.
 * @param productId      Product this stream was opened for.
 * @param activeSprintId Active sprint of that product, or null when there is none.
 * @param userId         Authenticated user owning the stream.
 * @returns true when the event must be written to the SSE stream.
 */
function shouldEmit(
  payload: NotifyPayload,
  productId: string,
  activeSprintId: string | null,
  userId: string,
): boolean {
  // Job events: only the user's own jobs for this product.
  if (isJobPayload(payload)) {
    return payload.user_id === userId && payload.product_id === productId
  }
  // Worker events: only the user's own workers (not scoped to a product).
  if (isWorkerPayload(payload)) {
    return payload.user_id === userId
  }

  // M11 (ST-1104): question events belong on /api/realtime/notifications, not here.
  if (payload.entity === 'question') return false

  if (payload.product_id !== productId) return false

  // Unassigned story (claim list): forwarded when it sits in the active
  // sprint or in no sprint at all.
  // NOTE(review): the original comment said "always shown, regardless of
  // sprint", but stories in a different (non-active) sprint are dropped —
  // confirm which behaviour is intended.
  if (payload.entity === 'story' && payload.assignee_id === null) {
    return payload.sprint_id === activeSprintId || payload.sprint_id === null
  }

  // Sprint scope: everything else must be inside the active sprint.
  if (payload.sprint_id !== activeSprintId) return false

  // Personal relevance: only items assigned to this user.
  return payload.assignee_id === userId
}

/**
 * Opens the Solo Panel SSE stream for one product.
 *
 * Responds with 401 without a session, 400 without `product_id`, 403 when the
 * product is not accessible to the user, and 500 when no database URL is
 * configured. Otherwise returns a `text/event-stream` response that emits a
 * `ready` event, optional `claude_jobs_initial`, a `workers_initial` event,
 * then filtered notifications and periodic heartbeat comments.
 *
 * @param request Incoming request; `product_id` is read from the query string
 *                and `request.signal` is used to detect client disconnects.
 * @returns A JSON error response or the streaming SSE response.
 */
export async function GET(request: NextRequest) {
  const session = await getSession()
  if (!session.userId) {
    return Response.json({ error: 'Niet ingelogd' }, { status: 401 })
  }
  const userId = session.userId

  const productId = request.nextUrl.searchParams.get('product_id')
  if (!productId) {
    return Response.json({ error: 'product_id is verplicht' }, { status: 400 })
  }

  const product = await getAccessibleProduct(productId, userId)
  if (!product) {
    return Response.json({ error: 'Geen toegang tot dit product' }, { status: 403 })
  }

  const directUrl = process.env.DIRECT_URL ?? process.env.DATABASE_URL
  if (!directUrl) {
    return Response.json({ error: 'DIRECT_URL/DATABASE_URL niet geconfigureerd' }, { status: 500 })
  }

  const encoder = new TextEncoder()
  const pgClient = new Client({ connectionString: directUrl })

  let heartbeatTimer: ReturnType<typeof setInterval> | null = null
  let hardCloseTimer: ReturnType<typeof setTimeout> | null = null
  let closed = false
  let streamController: ReadableStreamDefaultController<Uint8Array> | null = null

  // Writes one SSE chunk; silently a no-op once the stream has been closed.
  const enqueue = (chunk: string): void => {
    if (closed || !streamController) return
    try {
      streamController.enqueue(encoder.encode(chunk))
    } catch {
      // Stream already closed — the controller throws, ignore.
    }
  }

  // Idempotent teardown: stops timers, ends the pg connection and closes the
  // stream. Never rejects, so callers may fire-and-forget it.
  const cleanup = async (reason: string): Promise<void> => {
    if (closed) return
    closed = true
    if (heartbeatTimer) clearInterval(heartbeatTimer)
    if (hardCloseTimer) clearTimeout(hardCloseTimer)
    try {
      await pgClient.end()
    } catch {
      // Best effort — the connection may never have been established.
    }
    try {
      streamController?.close()
    } catch {
      // Already closed or cancelled by the consumer.
    }
    if (process.env.NODE_ENV !== 'production') {
      console.log(`[realtime/solo] closed: ${reason}`)
    }
  }

  const stream = new ReadableStream<Uint8Array>({
    async start(controller) {
      streamController = controller

      try {
        // Resolve the active sprint once per connection.
        const sprint = await prisma_sprint_findActive(productId)
        const activeSprintId = sprint?.id ?? null
        // The consumer may have cancelled while we were waiting.
        if (closed) return

        // Attach the error listener before connecting so a connection error
        // can never surface as an unhandled 'error' event.
        pgClient.on('error', (err) => {
          console.error('[realtime/solo] pg client error:', err)
          void cleanup('pg error')
        })

        pgClient.on('notification', (msg) => {
          if (!msg.payload) return
          let parsed: unknown
          try {
            parsed = JSON.parse(msg.payload)
          } catch {
            return
          }
          // Valid JSON that is not an object (null, number, string) would make
          // the `in` checks in shouldEmit throw inside this event handler.
          if (typeof parsed !== 'object' || parsed === null) return
          const payload = parsed as NotifyPayload
          if (!shouldEmit(payload, productId, activeSprintId, userId)) return
          // Forward the raw payload text unchanged.
          enqueue(`data: ${msg.payload}\n\n`)
        })

        try {
          await pgClient.connect()
          await pgClient.query(`LISTEN ${CHANNEL}`)
        } catch (err) {
          console.error('[realtime/solo] pg connect/listen failed:', err)
          enqueue(`event: error\ndata: ${JSON.stringify({ message: 'pg connect failed' })}\n\n`)
          await cleanup('pg connect failed')
          return
        }
        if (closed) return

        // Send a "ready" event first so the client knows the connection is up.
        enqueue(
          `event: ready\ndata: ${JSON.stringify({
            product_id: productId,
            sprint_id: activeSprintId,
          })}\n\n`,
        )

        // Send the initial ClaudeJob state so the UI is in sync on reconnect.
        const activeJobs = await prisma_jobs_findActive(userId, productId)
        if (activeJobs.length > 0) {
          enqueue(`event: claude_jobs_initial\ndata: ${JSON.stringify(activeJobs)}\n\n`)
        }

        // Remove stale workers, then send the active worker count.
        await prisma_workers_cleanup()
        const workerCount = await prisma_workers_count(userId)
        enqueue(`event: workers_initial\ndata: ${JSON.stringify({ count: workerCount })}\n\n`)

        // cleanup() may have run during the awaits above (pg error, consumer
        // cancel). Creating timers now would leak them, since cleanup() has
        // already done its one-time clearInterval/clearTimeout.
        if (closed) return

        // Heartbeat as an SSE comment — prevents proxy timeouts.
        heartbeatTimer = setInterval(() => {
          enqueue(`: heartbeat\n\n`)
        }, HEARTBEAT_MS)

        // Hard-close safety: the platform cuts the request at maxDuration;
        // close earlier ourselves.
        hardCloseTimer = setTimeout(() => {
          void cleanup('hard close 240s')
        }, HARD_CLOSE_MS)

        // Client pulled the plug (tab closed, refresh, etc.).
        request.signal.addEventListener(
          'abort',
          () => {
            void cleanup('client aborted')
          },
          { once: true },
        )
        // The abort may have fired during setup, before the listener existed.
        if (request.signal.aborted) {
          await cleanup('client aborted')
        }
      } catch (err) {
        // Any failure during setup must still release the pg connection.
        console.error('[realtime/solo] stream init failed:', err)
        enqueue(`event: error\ndata: ${JSON.stringify({ message: 'stream init failed' })}\n\n`)
        await cleanup('init failed')
      }
    },
    // Consumer cancelled the response body; release the pg connection.
    async cancel() {
      await cleanup('stream cancelled')
    },
  })

  return new Response(stream, {
    headers: {
      'Content-Type': 'text/event-stream; charset=utf-8',
      'Cache-Control': 'no-cache, no-transform',
      Connection: 'keep-alive',
      'X-Accel-Buffering': 'no',
    },
  })
}

/**
 * Looks up the most recently created sprint with status ACTIVE for a product.
 *
 * @param productId Product to search in.
 * @returns The sprint id wrapper, or null when the product has no active sprint.
 */
async function prisma_sprint_findActive(productId: string): Promise<{ id: string } | null> {
  const { prisma } = await import('@/lib/prisma')
  return prisma.sprint.findFirst({
    where: { product_id: productId, status: 'ACTIVE' },
    select: { id: true },
    orderBy: { created_at: 'desc' },
  })
}

/**
 * Loads the user's jobs for a product that are still in flight
 * (QUEUED/CLAIMED/RUNNING) or that finished (DONE/FAILED) since midnight,
 * oldest first, mapped to the API shape sent in `claude_jobs_initial`.
 *
 * @param userId    Owner of the jobs.
 * @param productId Product the jobs belong to.
 * @returns Job snapshots; null database columns are mapped to undefined.
 */
async function prisma_jobs_findActive(userId: string, productId: string) {
  const { prisma } = await import('@/lib/prisma')
  const { jobStatusToApi } = await import('@/lib/job-status')

  // Midnight in the server's local time zone.
  const today = new Date()
  today.setHours(0, 0, 0, 0)

  const jobs = await prisma.claudeJob.findMany({
    where: {
      user_id: userId,
      product_id: productId,
      OR: [
        { status: { in: ['QUEUED', 'CLAIMED', 'RUNNING'] } },
        { status: { in: ['DONE', 'FAILED'] }, finished_at: { gte: today } },
      ],
    },
    select: {
      id: true,
      task_id: true,
      status: true,
      branch: true,
      pushed_at: true,
      pr_url: true,
      verify_result: true,
      summary: true,
      error: true,
    },
    orderBy: { created_at: 'asc' },
  })

  return jobs.map(j => ({
    job_id: j.id,
    task_id: j.task_id,
    status: jobStatusToApi(j.status),
    branch: j.branch ?? undefined,
    pushed_at: j.pushed_at?.toISOString() ?? undefined,
    pr_url: j.pr_url ?? undefined,
    verify_result: j.verify_result?.toLowerCase() as import('@/stores/solo-store').VerifyResultApi | undefined,
    summary: j.summary ?? undefined,
    error: j.error ?? undefined,
  }))
}

/** Workers not seen for this long are deleted by prisma_workers_cleanup. */
const WORKER_STALE_MS = 60_000
/** Workers seen within this window are counted by prisma_workers_count. */
const WORKER_ACTIVE_MS = 15_000

/**
 * Deletes every worker row whose `last_seen_at` is older than WORKER_STALE_MS.
 * The delete is not scoped to a user.
 */
async function prisma_workers_cleanup() {
  const { prisma } = await import('@/lib/prisma')
  await prisma.claudeWorker.deleteMany({
    where: { last_seen_at: { lt: new Date(Date.now() - WORKER_STALE_MS) } },
  })
}

/**
 * Counts the user's workers seen within the last WORKER_ACTIVE_MS.
 *
 * @param userId Owner of the workers.
 * @returns Number of recently seen workers.
 */
async function prisma_workers_count(userId: string): Promise<number> {
  const { prisma } = await import('@/lib/prisma')
  return prisma.claudeWorker.count({
    where: {
      user_id: userId,
      last_seen_at: { gt: new Date(Date.now() - WORKER_ACTIVE_MS) },
    },
  })
}