// ST-1104: User-scoped Server-Sent Events stream for the notification bell (M11).
//
// Mounted from app/(app)/layout.tsx as soon as the user is logged in. Unlike
// /api/realtime/solo (product-scoped, for a single Solo board) this stream is
// **user-scoped**: it filters on every product the logged-in user has access
// to, so the bell flickers regardless of which page you are on.
//
// Auth: iron-session cookie. Demo tokens may read.
// Output: text/event-stream — `event:state` with the initial open-questions
// list, followed by `data:` events for every status transition published on
// scrum4me_changes.
// Closes itself after 240s as a safety net; the client reconnects.

import type { NextRequest } from 'next/server'
import { Client } from 'pg'
import { getSession } from '@/lib/auth'
import { prisma } from '@/lib/prisma'
import { productAccessFilter } from '@/lib/product-access'
import { closePgClientSafely } from '@/lib/realtime/pg-client-cleanup'

// Route segment config: Node.js runtime, never statically rendered, and a
// maximum duration (300s) that leaves headroom above HARD_CLOSE_MS (240s).
export const runtime = 'nodejs'
export const dynamic = 'force-dynamic'
export const maxDuration = 300

// Postgres NOTIFY channel this stream LISTENs on.
const CHANNEL = 'scrum4me_changes'
// Interval between SSE comment frames (`: heartbeat`) sent to the client.
const HEARTBEAT_MS = 25_000
// The stream closes itself after this long; the client is expected to reconnect.
const HARD_CLOSE_MS = 240_000

// Question payloads: emitted by the notify_question_change trigger on
// claude_questions. story_id and idea_id are mutually exclusive (DB-level
// check constraint added in M12).
interface QuestionPayload {
  op: 'I' | 'U'
  entity: 'question'
  id: string
  product_id: string
  story_id?: string | null
  task_id?: string | null
  idea_id?: string | null
  assignee_id?: string | null
  status?: string
}

// Idea-job payloads: emitted by actions/ideas.ts (startGrillJobAction etc.)
// via prisma.$executeRaw pg_notify. Always carries user_id + idea_id + kind.
interface IdeaJobPayload {
  type: 'claude_job_enqueued' | 'claude_job_status'
  job_id: string
  idea_id: string
  user_id: string
  product_id?: string | null
  kind: 'IDEA_GRILL' | 'IDEA_MAKE_PLAN' | 'PLAN_CHAT'
  status: string
}

// UserQuestion payloads: emitted by app/api/user-questions/[id]/answer and
// actions/user-questions.ts via prisma.$executeRaw pg_notify.
interface UserQuestionPayload {
  op: 'I' | 'U'
  entity: 'user_question'
  id: string
  idea_id: string
  status: 'pending' | 'answered'
}

/** Every payload shape this route forwards from the notify channel. */
type NotifyPayload = QuestionPayload | IdeaJobPayload | UserQuestionPayload

/** What the logged-in user may see; captured once when the stream opens. */
interface AccessScope {
  userId: string
  ideaIds: ReadonlySet<string>
  productIds: ReadonlySet<string>
}

/**
 * True for any non-null object. `JSON.parse` can also yield `null`, numbers,
 * strings and booleans; probing those with the `in` operator throws, so every
 * guard below checks this first.
 */
function isRecord(value: unknown): value is Record<string, unknown> {
  return typeof value === 'object' && value !== null
}

/** Matches payloads tagged `entity: 'question'`. */
function isQuestionPayload(p: unknown): p is QuestionPayload {
  return isRecord(p) && p.entity === 'question'
}

/** Matches payloads tagged `entity: 'user_question'`. */
function isUserQuestionPayload(p: unknown): p is UserQuestionPayload {
  return isRecord(p) && p.entity === 'user_question'
}

/**
 * Matches idea-job payloads: a known `type`, a known `kind`, and an `idea_id`
 * key. `user_id` is not verified here; the visibility check compares it.
 */
function isIdeaJobPayload(p: unknown): p is IdeaJobPayload {
  if (!isRecord(p)) return false
  const knownType = p.type === 'claude_job_enqueued' || p.type === 'claude_job_status'
  const knownKind =
    p.kind === 'IDEA_GRILL' || p.kind === 'IDEA_MAKE_PLAN' || p.kind === 'PLAN_CHAT'
  return knownType && 'idea_id' in p && knownKind
}

/**
 * Parses a raw NOTIFY payload.
 *
 * @returns the payload when it is valid JSON matching one of the known
 *   shapes, otherwise `null` (malformed JSON, non-object JSON, unknown shape).
 */
function parseNotifyPayload(raw: string): NotifyPayload | null {
  let parsed: unknown
  try {
    parsed = JSON.parse(raw)
  } catch {
    return null
  }
  if (isIdeaJobPayload(parsed) || isUserQuestionPayload(parsed) || isQuestionPayload(parsed)) {
    return parsed
  }
  return null
}

/** Decides whether a notification may be forwarded to the user in `scope`. */
function isVisibleToUser(payload: NotifyPayload, scope: AccessScope): boolean {
  // M12: idea jobs are user-scoped, not product-scoped.
  if (isIdeaJobPayload(payload)) return payload.user_id === scope.userId
  // UserQuestion (PLAN_CHAT answer event): user-scoped via idea ownership.
  if (isUserQuestionPayload(payload)) return scope.ideaIds.has(payload.idea_id)
  // Idea question: only for the owner of the idea.
  if (payload.idea_id) return scope.ideaIds.has(payload.idea_id)
  // Story question: product-access check.
  return scope.productIds.has(payload.product_id)
}

/**
 * Loads the snapshot sent as the initial `event: state` frame: open,
 * unexpired story questions for the given products, open, unexpired idea
 * questions owned by the user, and the user's most recent user-questions
 * (each query capped at 100 rows).
 */
async function loadInitialState(userId: string, productIds: string[]) {
  const now = new Date()
  const [storyOpen, ideaOpen, userQuestions] = await Promise.all([
    prisma.claudeQuestion.findMany({
      where: {
        status: 'open',
        expires_at: { gt: now },
        product_id: { in: productIds },
      },
      orderBy: { created_at: 'desc' },
      take: 100,
      select: {
        id: true,
        product_id: true,
        story_id: true,
        task_id: true,
        question: true,
        options: true,
        created_at: true,
        expires_at: true,
        story: { select: { code: true, title: true, assignee_id: true } },
      },
    }),
    prisma.claudeQuestion.findMany({
      where: {
        status: 'open',
        expires_at: { gt: now },
        idea: { user_id: userId },
      },
      orderBy: { created_at: 'desc' },
      take: 100,
      select: {
        id: true,
        product_id: true,
        idea_id: true,
        question: true,
        options: true,
        created_at: true,
        expires_at: true,
        idea: { select: { id: true, code: true, title: true } },
      },
    }),
    prisma.userQuestion.findMany({
      where: { idea: { user_id: userId } },
      orderBy: { created_at: 'desc' },
      take: 100,
      select: {
        id: true,
        idea_id: true,
        question: true,
        answer: true,
        status: true,
        created_at: true,
      },
    }),
  ])

  const questions = [
    ...storyOpen.flatMap((q) => {
      if (!q.story || q.story_id === null) return []
      return [
        {
          kind: 'story' as const,
          id: q.id,
          product_id: q.product_id,
          story_id: q.story_id,
          task_id: q.task_id,
          story_code: q.story.code,
          story_title: q.story.title,
          assignee_id: q.story.assignee_id,
          question: q.question,
          options: q.options,
          created_at: q.created_at.toISOString(),
          expires_at: q.expires_at.toISOString(),
        },
      ]
    }),
    ...ideaOpen.flatMap((q) => {
      if (!q.idea || q.idea_id === null) return []
      return [
        {
          kind: 'idea' as const,
          id: q.id,
          product_id: q.product_id,
          idea_id: q.idea_id,
          idea_code: q.idea.code,
          idea_title: q.idea.title,
          question: q.question,
          options: q.options,
          created_at: q.created_at.toISOString(),
          expires_at: q.expires_at.toISOString(),
        },
      ]
    }),
    // Newest first. ISO-8601 strings order lexicographically; ties return 0 so
    // the comparator stays consistent.
  ].sort((a, b) => (a.created_at < b.created_at ? 1 : a.created_at > b.created_at ? -1 : 0))

  return {
    questions,
    userQuestions: userQuestions.map((uq) => ({
      ...uq,
      created_at: uq.created_at.toISOString(),
    })),
  }
}

/**
 * GET handler: opens the user-scoped SSE stream.
 *
 * Responds 401 when there is no logged-in session and 500 when no database
 * URL is configured. Otherwise returns a `text/event-stream` response that
 * emits one `event: state` frame with the initial snapshot, then a `data:`
 * frame (the raw NOTIFY payload) for every notification the user may see,
 * plus periodic `: heartbeat` comment frames. The stream ends on client
 * abort, on a pg error, or after HARD_CLOSE_MS.
 */
export async function GET(request: NextRequest) {
  const session = await getSession()
  if (!session.userId) {
    return Response.json({ error: 'Niet ingelogd' }, { status: 401 })
  }
  const userId = session.userId

  // Fetch the accessible product ids and the user's idea ids once, up front,
  // so the notification path needs no DB call per event. There is no live
  // refresh when access changes mid-stream; a reconnect picks it up.
  // M12: idea questions are strictly user_id-only (no productAccessFilter).
  const [products, userIdeas] = await Promise.all([
    prisma.product.findMany({
      where: { archived: false, ...productAccessFilter(userId) },
      select: { id: true },
    }),
    prisma.idea.findMany({
      where: { user_id: userId },
      select: { id: true },
    }),
  ])
  const productIds = products.map((p) => p.id)
  const scope: AccessScope = {
    userId,
    ideaIds: new Set(userIdeas.map((i) => i.id)),
    productIds: new Set(productIds),
  }

  // NOTE(review): DIRECT_URL is preferred over DATABASE_URL — presumably
  // because LISTEN needs a direct, non-pooled connection; confirm.
  const directUrl = process.env.DIRECT_URL ?? process.env.DATABASE_URL
  if (!directUrl) {
    return Response.json(
      { error: 'DIRECT_URL/DATABASE_URL niet geconfigureerd' },
      { status: 500 },
    )
  }

  const encoder = new TextEncoder()
  const pgClient = new Client({ connectionString: directUrl })

  let heartbeatTimer: ReturnType<typeof setInterval> | null = null
  let hardCloseTimer: ReturnType<typeof setTimeout> | null = null
  let closed = false

  const stream = new ReadableStream<Uint8Array>({
    async start(controller) {
      const enqueue = (chunk: string) => {
        if (closed) return
        try {
          controller.enqueue(encoder.encode(chunk))
        } catch {
          // controller already closed — ignore
        }
      }

      // Idempotent teardown: stops the timers, closes the pg client and ends
      // the stream. Only the first call does any work.
      const cleanup = async (reason: string): Promise<void> => {
        if (closed) return
        closed = true
        if (heartbeatTimer) clearInterval(heartbeatTimer)
        if (hardCloseTimer) clearTimeout(hardCloseTimer)
        await closePgClientSafely(pgClient, 'realtime/notifications')
        try {
          controller.close()
        } catch {
          // already closed
        }
        if (process.env.NODE_ENV !== 'production') {
          console.log(`[realtime/notifications] closed: ${reason}`)
        }
      }

      // Attached before connect() so a connection error at any later point
      // (including while LISTEN is in flight) never becomes an unhandled
      // 'error' event on the client.
      pgClient.on('error', (err) => {
        console.error('[realtime/notifications] pg client error:', err)
        void cleanup('pg error')
      })

      try {
        await pgClient.connect()
        await pgClient.query(`LISTEN ${CHANNEL}`)
      } catch (err) {
        console.error('[realtime/notifications] pg connect/listen failed:', err)
        enqueue(
          `event: error\ndata: ${JSON.stringify({ message: 'pg connect failed' })}\n\n`,
        )
        await cleanup('pg connect failed')
        return
      }

      pgClient.on('notification', (msg) => {
        if (!msg.payload) return
        const payload = parseNotifyPayload(msg.payload)
        if (payload === null || !isVisibleToUser(payload, scope)) return
        // Forward the raw payload text unchanged.
        enqueue(`data: ${msg.payload}\n\n`)
      })

      // Initial state is loaded only after LISTEN is active — race fix per
      // M10 ST-1004 / ST-1006, so a question that lands just before the
      // stream opens is not lost. M12 hotfix: idea questions (user-private)
      // are included so the bell stays hydrated across close+reconnect.
      try {
        const state = await loadInitialState(userId, productIds)
        enqueue(`event: state\ndata: ${JSON.stringify(state)}\n\n`)
      } catch (err) {
        // No timers or abort listener exist yet, so without this the pg
        // connection would stay open after a failed query.
        console.error('[realtime/notifications] initial state failed:', err)
        enqueue(
          `event: error\ndata: ${JSON.stringify({ message: 'initial state failed' })}\n\n`,
        )
        await cleanup('initial state failed')
        return
      }

      // A pg error during hydration may already have torn the stream down;
      // starting timers now would leave an interval nobody clears.
      if (closed) return

      heartbeatTimer = setInterval(() => {
        enqueue(`: heartbeat\n\n`)
      }, HEARTBEAT_MS)

      hardCloseTimer = setTimeout(() => {
        void cleanup('hard close 240s')
      }, HARD_CLOSE_MS)

      request.signal.addEventListener(
        'abort',
        () => {
          void cleanup('client aborted')
        },
        { once: true },
      )
      // The client may have disconnected while we were connecting or
      // hydrating; in that case the 'abort' event has already fired.
      if (request.signal.aborted) void cleanup('client aborted')
    },
  })

  return new Response(stream, {
    headers: {
      'Content-Type': 'text/event-stream; charset=utf-8',
      'Cache-Control': 'no-cache, no-transform',
      Connection: 'keep-alive',
      'X-Accel-Buffering': 'no',
    },
  })
}