// ST-1104: User-scoped Server-Sent Events stream voor de notificatie-bel (M11). // // Wordt door in app/(app)/layout.tsx gemount zodra de // gebruiker is ingelogd. In tegenstelling tot /api/realtime/solo (product- // scoped, voor één Solo-bord) is deze stream **user-scoped**: hij filtert // op alle producten waar de ingelogde user toegang toe heeft, zodat de bell // flikkert ongeacht op welke pagina je staat. // // Auth: iron-session cookie. Demo-tokens mogen lezen. // Output: text/event-stream — `event:state` met initial open-questions list, // daarna `data:` events bij elke status-overgang in scrum4me_changes. // Sluit zelf na 240s als safety-net; client herconnect. import { NextRequest } from 'next/server' import { Client } from 'pg' import { getSession } from '@/lib/auth' import { prisma } from '@/lib/prisma' import { productAccessFilter } from '@/lib/product-access' import { closePgClientSafely } from '@/lib/realtime/pg-client-cleanup' export const runtime = 'nodejs' export const dynamic = 'force-dynamic' export const maxDuration = 300 const CHANNEL = 'scrum4me_changes' const HEARTBEAT_MS = 25_000 const HARD_CLOSE_MS = 240_000 interface NotifyPayload { op: 'I' | 'U' entity: 'task' | 'story' | 'question' id: string product_id: string story_id?: string task_id?: string | null assignee_id?: string | null status?: string } export async function GET(request: NextRequest) { const session = await getSession() if (!session.userId) { return Response.json({ error: 'Niet ingelogd' }, { status: 401 }) } const userId = session.userId // Haal alle accessible product-IDs één keer op — gebruikt voor SSE-filter en // voor de initial-state query. Geen real-time refresh als de user halverwege // toegang krijgt of verliest; reconnect lost dat op (frontend doet dat al). const products = await prisma.product.findMany({ where: { archived: false, ...productAccessFilter(userId) }, select: { id: true }, }) const accessibleProductIds = new Set(products.map((p) => p.id)) const directUrl = process.env.DIRECT_URL ?? process.env.DATABASE_URL if (!directUrl) { return Response.json( { error: 'DIRECT_URL/DATABASE_URL niet geconfigureerd' }, { status: 500 }, ) } const encoder = new TextEncoder() const pgClient = new Client({ connectionString: directUrl }) let heartbeatTimer: ReturnType | null = null let hardCloseTimer: ReturnType | null = null let closed = false const stream = new ReadableStream({ async start(controller) { const enqueue = (chunk: string) => { if (closed) return try { controller.enqueue(encoder.encode(chunk)) } catch { // controller al gesloten — negeren } } const cleanup = async (reason: string) => { if (closed) return closed = true if (heartbeatTimer) clearInterval(heartbeatTimer) if (hardCloseTimer) clearTimeout(hardCloseTimer) await closePgClientSafely(pgClient, 'realtime/notifications') try { controller.close() } catch { // already closed } if (process.env.NODE_ENV !== 'production') { console.log(`[realtime/notifications] closed: ${reason}`) } } try { await pgClient.connect() await pgClient.query(`LISTEN ${CHANNEL}`) } catch (err) { console.error('[realtime/notifications] pg connect/listen failed:', err) enqueue( `event: error\ndata: ${JSON.stringify({ message: 'pg connect failed' })}\n\n`, ) await cleanup('pg connect failed') return } pgClient.on('notification', (msg) => { if (!msg.payload) return let payload: NotifyPayload try { payload = JSON.parse(msg.payload) as NotifyPayload } catch { return } if (payload.entity !== 'question') return if (!accessibleProductIds.has(payload.product_id)) return enqueue(`data: ${msg.payload}\n\n`) }) pgClient.on('error', (err) => { console.error('[realtime/notifications] pg client error:', err) cleanup('pg error') }) // Initial state ná LISTEN actief — race-fix conform M10 ST-1004 / ST-1006. // Voorkomt dat een vraag die net vóór SSE-open landt verloren gaat. const openQuestions = await prisma.claudeQuestion.findMany({ where: { status: 'open', expires_at: { gt: new Date() }, product_id: { in: products.map((p) => p.id) }, story_id: { not: null }, }, orderBy: { created_at: 'desc' }, take: 100, select: { id: true, product_id: true, story_id: true, task_id: true, question: true, options: true, created_at: true, expires_at: true, story: { select: { code: true, title: true, assignee_id: true } }, }, }) enqueue( `event: state\ndata: ${JSON.stringify({ questions: openQuestions.flatMap((q) => { if (!q.story || q.story_id === null) return [] return [{ id: q.id, product_id: q.product_id, story_id: q.story_id, task_id: q.task_id, story_code: q.story.code, story_title: q.story.title, assignee_id: q.story.assignee_id, question: q.question, options: q.options, created_at: q.created_at.toISOString(), expires_at: q.expires_at.toISOString(), }] }), })}\n\n`, ) heartbeatTimer = setInterval(() => { enqueue(`: heartbeat\n\n`) }, HEARTBEAT_MS) hardCloseTimer = setTimeout(() => { cleanup('hard close 240s') }, HARD_CLOSE_MS) request.signal.addEventListener('abort', () => { cleanup('client aborted') }) }, }) return new Response(stream, { headers: { 'Content-Type': 'text/event-stream; charset=utf-8', 'Cache-Control': 'no-cache, no-transform', Connection: 'keep-alive', 'X-Accel-Buffering': 'no', }, }) }