// PBI-76: User-scoped SSE stream voor user-settings cross-tab/cross-device sync. // // Wordt door in app/(app)/layout.tsx geopend zodra de // gebruiker is ingelogd. Filtert pg_notify-payloads op // `kind === 'user_settings' && userId === session.userId`. Settings worden // via prop al gehydrateerd; deze route levert alleen incrementele patches. // // Auth: iron-session cookie. Demo-tokens openen geen subscription (bridge // skipt voor isDemo). // Output: text/event-stream — `data:` met de patch (Partial). // Sluit zelf na 240s als safety-net; client herconnect. import { NextRequest } from 'next/server' import { Client } from 'pg' import { getSession } from '@/lib/auth' import { closePgClientSafely } from '@/lib/realtime/pg-client-cleanup' export const runtime = 'nodejs' export const dynamic = 'force-dynamic' export const maxDuration = 300 const CHANNEL = 'scrum4me_changes' const HEARTBEAT_MS = 25_000 const HARD_CLOSE_MS = 240_000 interface UserSettingsPayload { kind: 'user_settings' userId: string patch: Record } function isUserSettingsPayload(p: unknown): p is UserSettingsPayload { if (typeof p !== 'object' || p === null) return false const obj = p as Record return ( obj.kind === 'user_settings' && typeof obj.userId === 'string' && typeof obj.patch === 'object' && obj.patch !== null ) } export async function GET(request: NextRequest) { const session = await getSession() if (!session.userId) { return Response.json({ error: 'Niet ingelogd' }, { status: 401 }) } const userId = session.userId const directUrl = process.env.DIRECT_URL ?? process.env.DATABASE_URL if (!directUrl) { return Response.json( { error: 'DIRECT_URL/DATABASE_URL niet geconfigureerd' }, { status: 500 }, ) } const encoder = new TextEncoder() const pgClient = new Client({ connectionString: directUrl }) let heartbeatTimer: ReturnType | null = null let hardCloseTimer: ReturnType | null = null let closed = false const stream = new ReadableStream({ async start(controller) { const enqueue = (chunk: string) => { if (closed) return try { controller.enqueue(encoder.encode(chunk)) } catch { // controller already closed } } const cleanup = async (reason: string) => { if (closed) return closed = true if (heartbeatTimer) clearInterval(heartbeatTimer) if (hardCloseTimer) clearTimeout(hardCloseTimer) await closePgClientSafely(pgClient, 'realtime/user-settings') try { controller.close() } catch { // already closed } if (process.env.NODE_ENV !== 'production') { console.log(`[realtime/user-settings] closed: ${reason}`) } } try { await pgClient.connect() await pgClient.query(`LISTEN ${CHANNEL}`) } catch (err) { console.error('[realtime/user-settings] pg connect/listen failed:', err) enqueue( `event: error\ndata: ${JSON.stringify({ message: 'pg connect failed' })}\n\n`, ) await cleanup('pg connect failed') return } pgClient.on('notification', (msg) => { if (!msg.payload) return let payload: unknown try { payload = JSON.parse(msg.payload) } catch { return } if (!isUserSettingsPayload(payload)) return if (payload.userId !== userId) return enqueue(`data: ${JSON.stringify(payload.patch)}\n\n`) }) pgClient.on('error', (err) => { console.error('[realtime/user-settings] pg client error:', err) cleanup('pg error') }) enqueue(`: connected\n\n`) heartbeatTimer = setInterval(() => { enqueue(`: heartbeat\n\n`) }, HEARTBEAT_MS) hardCloseTimer = setTimeout(() => { cleanup('hard close 240s') }, HARD_CLOSE_MS) request.signal.addEventListener('abort', () => { cleanup('client aborted') }) }, }) return new Response(stream, { headers: { 'Content-Type': 'text/event-stream; charset=utf-8', 'Cache-Control': 'no-cache, no-transform', Connection: 'keep-alive', 'X-Accel-Buffering': 'no', }, }) }