diff --git a/app/api/realtime/solo/route.ts b/app/api/realtime/solo/route.ts new file mode 100644 index 0000000..ea6fea8 --- /dev/null +++ b/app/api/realtime/solo/route.ts @@ -0,0 +1,190 @@ +// ST-802: Server-Sent Events stream voor het Solo Paneel. +// +// Opent een dedicated pg-verbinding (DIRECT_URL) en LISTEN't op het +// `scrum4me_changes`-kanaal. Filtert events server-side op product +// (uit query-param), sprint (actieve sprint van het product), en +// persoonlijke relevantie (assignee_id == userId, of assignee_id IS NULL +// voor stories — claim-lijst). +// +// Auth: iron-session cookie. Demo-tokens mogen lezen. +// Output: text/event-stream met JSON-payloads + heartbeat-comments. +// Sluit zelf na 240s als safety-net; client herconnect. + +import { NextRequest } from 'next/server' +import { Client } from 'pg' +import { getSession } from '@/lib/auth' +import { getAccessibleProduct } from '@/lib/product-access' + +export const runtime = 'nodejs' +export const dynamic = 'force-dynamic' +export const maxDuration = 300 + +const CHANNEL = 'scrum4me_changes' +const HEARTBEAT_MS = 25_000 +const HARD_CLOSE_MS = 240_000 + +interface NotifyPayload { + op: 'I' | 'U' | 'D' + entity: 'task' | 'story' + id: string + story_id?: string + product_id: string + sprint_id: string | null + assignee_id: string | null + changed_fields?: string[] +} + +function shouldEmit( + payload: NotifyPayload, + productId: string, + activeSprintId: string | null, + userId: string, +): boolean { + if (payload.product_id !== productId) return false + + // Sprint scope: alleen events binnen de actieve sprint (of zonder sprint + // voor unassigned-story claims die in de claim-sheet horen). + if (payload.entity === 'story' && payload.assignee_id === null) { + // Unassigned story (claim-lijst) — toon altijd, ongeacht sprint + return payload.sprint_id === activeSprintId || payload.sprint_id === null + } + if (payload.sprint_id !== activeSprintId) return false + + // Persoonlijke relevantie + return payload.assignee_id === userId +} + +export async function GET(request: NextRequest) { + const session = await getSession() + if (!session.userId) { + return Response.json({ error: 'Niet ingelogd' }, { status: 401 }) + } + const userId = session.userId + + const productId = request.nextUrl.searchParams.get('product_id') + if (!productId) { + return Response.json({ error: 'product_id is verplicht' }, { status: 400 }) + } + + const product = await getAccessibleProduct(productId, userId) + if (!product) { + return Response.json({ error: 'Geen toegang tot dit product' }, { status: 403 }) + } + + const directUrl = process.env.DIRECT_URL ?? process.env.DATABASE_URL + if (!directUrl) { + return Response.json({ error: 'DIRECT_URL/DATABASE_URL niet geconfigureerd' }, { status: 500 }) + } + + const encoder = new TextEncoder() + const pgClient = new Client({ connectionString: directUrl }) + + let heartbeatTimer: ReturnType | null = null + let hardCloseTimer: ReturnType | null = null + let closed = false + + const stream = new ReadableStream({ + async start(controller) { + const enqueue = (chunk: string) => { + if (closed) return + try { + controller.enqueue(encoder.encode(chunk)) + } catch { + // Stream al gesloten — controller throwt, negeren + } + } + + const cleanup = async (reason: string) => { + if (closed) return + closed = true + if (heartbeatTimer) clearInterval(heartbeatTimer) + if (hardCloseTimer) clearTimeout(hardCloseTimer) + try { + await pgClient.end() + } catch { + // ignore + } + try { + controller.close() + } catch { + // already closed + } + if (process.env.NODE_ENV !== 'production') { + console.log(`[realtime/solo] closed: ${reason}`) + } + } + + // Resolve actieve sprint éénmalig per connectie + const sprint = await prisma_sprint_findActive(productId) + const activeSprintId = sprint?.id ?? null + + try { + await pgClient.connect() + await pgClient.query(`LISTEN ${CHANNEL}`) + } catch (err) { + enqueue(`event: error\ndata: ${JSON.stringify({ message: 'pg connect failed' })}\n\n`) + await cleanup('pg connect failed') + return + } + + pgClient.on('notification', (msg) => { + if (!msg.payload) return + let payload: NotifyPayload + try { + payload = JSON.parse(msg.payload) as NotifyPayload + } catch { + return + } + if (!shouldEmit(payload, productId, activeSprintId, userId)) return + enqueue(`data: ${msg.payload}\n\n`) + }) + + pgClient.on('error', async () => { + await cleanup('pg error') + }) + + // Stuur eerst een "ready"-event zodat de client weet dat de connectie staat + enqueue( + `event: ready\ndata: ${JSON.stringify({ + product_id: productId, + sprint_id: activeSprintId, + })}\n\n`, + ) + + // Heartbeat als SSE-comment — voorkomt proxy-timeouts + heartbeatTimer = setInterval(() => { + enqueue(`: heartbeat\n\n`) + }, HEARTBEAT_MS) + + // Hard-close safety: Vercel kapt na maxDuration; we sluiten zelf eerder + hardCloseTimer = setTimeout(() => { + cleanup('hard close 240s') + }, HARD_CLOSE_MS) + + // Client trekt de stekker (tab dicht, refresh, etc.) + request.signal.addEventListener('abort', () => { + cleanup('client aborted') + }) + }, + }) + + return new Response(stream, { + headers: { + 'Content-Type': 'text/event-stream; charset=utf-8', + 'Cache-Control': 'no-cache, no-transform', + Connection: 'keep-alive', + 'X-Accel-Buffering': 'no', + }, + }) +} + +// Lokaal helper — Prisma vermijden voor deze ene query om de pg-only flow +// schoon te houden. Geeft de actieve sprint van een product, of null. +async function prisma_sprint_findActive(productId: string): Promise<{ id: string } | null> { + const { prisma } = await import('@/lib/prisma') + return prisma.sprint.findFirst({ + where: { product_id: productId, status: 'ACTIVE' }, + select: { id: true }, + orderBy: { created_at: 'desc' }, + }) +}