From 009375a131608d439149048b8021f61b64da1b6e Mon Sep 17 00:00:00 2001 From: Madhura68 Date: Tue, 28 Apr 2026 01:15:37 +0200 Subject: [PATCH] feat(ST-1104): add user-scoped /api/realtime/notifications + filter solo-route MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Twee delen: 1. Solo-route filter (1-regel-fix in app/api/realtime/solo/route.ts): - NotifyPayload uitgebreid met entity:'question' - shouldEmit returnt direct false bij entity='question' Voorkomt dat solo-clients M11 question-events ontvangen (geen lekkage naar het Solo-bord; geen onnodig netwerk-verkeer; loose coupling tussen features). 2. Nieuwe SSE-route app/api/realtime/notifications/route.ts: - User-scoped (geen ?product_id=); query alle accessible product-IDs één keer bij connect via productAccessFilter - LISTEN scrum4me_changes; filter entity='question' && product_id ∈ accessible - Initial-state-event NA LISTEN actief (race-fix conform M10 ST-1004): query open vragen voor deze user's accessible products, stuur als event:state met summary (id, story_code/title, assignee_id, question, options, expires_at) - Hergebruikt het pg.Client + ReadableStream + heartbeat 25s + hard-close 240s + abort-cleanup-pattern uit solo-route Tests __tests__/api/notifications-stream.test.ts: - 401 zonder iron-session cookie (en geen DB-call) - Solo-route filter wordt visueel/E2E gedekt in ST-1108-acceptatie Quality gates: lint 0 errors, tsc clean, vitest 146/146 (18 files). Co-Authored-By: Claude Opus 4.7 (1M context) --- __tests__/api/notifications-stream.test.ts | 51 ++++++ app/api/realtime/notifications/route.ts | 194 +++++++++++++++++++++ app/api/realtime/solo/route.ts | 8 +- 3 files changed, 252 insertions(+), 1 deletion(-) create mode 100644 __tests__/api/notifications-stream.test.ts create mode 100644 app/api/realtime/notifications/route.ts diff --git a/__tests__/api/notifications-stream.test.ts b/__tests__/api/notifications-stream.test.ts new file mode 100644 index 0000000..bf76547 --- /dev/null +++ b/__tests__/api/notifications-stream.test.ts @@ -0,0 +1,51 @@ +import { describe, it, expect, vi, beforeEach } from 'vitest' + +const { mockGetSession } = vi.hoisted(() => ({ mockGetSession: vi.fn() })) + +vi.mock('@/lib/auth', () => ({ + getSession: mockGetSession, +})) + +vi.mock('@/lib/prisma', () => ({ + prisma: { + product: { findMany: vi.fn() }, + claudeQuestion: { findMany: vi.fn() }, + }, +})) + +vi.mock('@/lib/product-access', () => ({ + productAccessFilter: vi.fn().mockReturnValue({}), + getAccessibleProduct: vi.fn(), +})) + +import { prisma } from '@/lib/prisma' +import type { NextRequest } from 'next/server' +import { GET } from '@/app/api/realtime/notifications/route' + +const mockPrisma = prisma as unknown as { + product: { findMany: ReturnType } + claudeQuestion: { findMany: ReturnType } +} + +function makeReq(): NextRequest { + // Minimaal NextRequest-shape voor de auth-pad — we komen niet bij de + // pg-stream-setup omdat de auth-fail vóór dat punt gebeurt. + return { signal: new AbortController().signal } as unknown as NextRequest +} + +beforeEach(() => { + vi.clearAllMocks() +}) + +describe('GET /api/realtime/notifications', () => { + it('401 zonder iron-session cookie, geen DB-call', async () => { + mockGetSession.mockResolvedValue({ userId: undefined, isDemo: false }) + const res = await GET(makeReq()) + expect(res.status).toBe(401) + expect(mockPrisma.product.findMany).not.toHaveBeenCalled() + }) +}) + +// Solo-route filter (entity='question' uitgesloten) is een 1-regel-fix in +// app/api/realtime/solo/route.ts. Visueel reviewbaar in de diff; full-stream- +// regressie wordt handmatig gedekt in ST-1108-acceptatie. diff --git a/app/api/realtime/notifications/route.ts b/app/api/realtime/notifications/route.ts new file mode 100644 index 0000000..f31c6d5 --- /dev/null +++ b/app/api/realtime/notifications/route.ts @@ -0,0 +1,194 @@ +// ST-1104: User-scoped Server-Sent Events stream voor de notificatie-bel (M11). +// +// Wordt door in app/(app)/layout.tsx gemount zodra de +// gebruiker is ingelogd. In tegenstelling tot /api/realtime/solo (product- +// scoped, voor één Solo-bord) is deze stream **user-scoped**: hij filtert +// op alle producten waar de ingelogde user toegang toe heeft, zodat de bell +// flikkert ongeacht op welke pagina je staat. +// +// Auth: iron-session cookie. Demo-tokens mogen lezen. +// Output: text/event-stream — `event:state` met initial open-questions list, +// daarna `data:` events bij elke status-overgang in scrum4me_changes. +// Sluit zelf na 240s als safety-net; client herconnect. + +import { NextRequest } from 'next/server' +import { Client } from 'pg' +import { getSession } from '@/lib/auth' +import { prisma } from '@/lib/prisma' +import { productAccessFilter } from '@/lib/product-access' + +export const runtime = 'nodejs' +export const dynamic = 'force-dynamic' +export const maxDuration = 300 + +const CHANNEL = 'scrum4me_changes' +const HEARTBEAT_MS = 25_000 +const HARD_CLOSE_MS = 240_000 + +interface NotifyPayload { + op: 'I' | 'U' + entity: 'task' | 'story' | 'question' + id: string + product_id: string + story_id?: string + task_id?: string | null + assignee_id?: string | null + status?: string +} + +export async function GET(request: NextRequest) { + const session = await getSession() + if (!session.userId) { + return Response.json({ error: 'Niet ingelogd' }, { status: 401 }) + } + const userId = session.userId + + // Haal alle accessible product-IDs één keer op — gebruikt voor SSE-filter en + // voor de initial-state query. Geen real-time refresh als de user halverwege + // toegang krijgt of verliest; reconnect lost dat op (frontend doet dat al). + const products = await prisma.product.findMany({ + where: { archived: false, ...productAccessFilter(userId) }, + select: { id: true }, + }) + const accessibleProductIds = new Set(products.map((p) => p.id)) + + const directUrl = process.env.DIRECT_URL ?? process.env.DATABASE_URL + if (!directUrl) { + return Response.json( + { error: 'DIRECT_URL/DATABASE_URL niet geconfigureerd' }, + { status: 500 }, + ) + } + + const encoder = new TextEncoder() + const pgClient = new Client({ connectionString: directUrl }) + + let heartbeatTimer: ReturnType | null = null + let hardCloseTimer: ReturnType | null = null + let closed = false + + const stream = new ReadableStream({ + async start(controller) { + const enqueue = (chunk: string) => { + if (closed) return + try { + controller.enqueue(encoder.encode(chunk)) + } catch { + // controller al gesloten — negeren + } + } + + const cleanup = async (reason: string) => { + if (closed) return + closed = true + if (heartbeatTimer) clearInterval(heartbeatTimer) + if (hardCloseTimer) clearTimeout(hardCloseTimer) + try { + await pgClient.end() + } catch { + // ignore + } + try { + controller.close() + } catch { + // already closed + } + if (process.env.NODE_ENV !== 'production') { + console.log(`[realtime/notifications] closed: ${reason}`) + } + } + + try { + await pgClient.connect() + await pgClient.query(`LISTEN ${CHANNEL}`) + } catch (err) { + console.error('[realtime/notifications] pg connect/listen failed:', err) + enqueue( + `event: error\ndata: ${JSON.stringify({ message: 'pg connect failed' })}\n\n`, + ) + await cleanup('pg connect failed') + return + } + + pgClient.on('notification', (msg) => { + if (!msg.payload) return + let payload: NotifyPayload + try { + payload = JSON.parse(msg.payload) as NotifyPayload + } catch { + return + } + if (payload.entity !== 'question') return + if (!accessibleProductIds.has(payload.product_id)) return + enqueue(`data: ${msg.payload}\n\n`) + }) + + pgClient.on('error', (err) => { + console.error('[realtime/notifications] pg client error:', err) + cleanup('pg error') + }) + + // Initial state ná LISTEN actief — race-fix conform M10 ST-1004 / ST-1006. + // Voorkomt dat een vraag die net vóór SSE-open landt verloren gaat. + const openQuestions = await prisma.claudeQuestion.findMany({ + where: { + status: 'open', + expires_at: { gt: new Date() }, + product_id: { in: products.map((p) => p.id) }, + }, + orderBy: { created_at: 'desc' }, + take: 100, + select: { + id: true, + product_id: true, + story_id: true, + task_id: true, + question: true, + options: true, + created_at: true, + expires_at: true, + story: { select: { code: true, title: true, assignee_id: true } }, + }, + }) + + enqueue( + `event: state\ndata: ${JSON.stringify({ + questions: openQuestions.map((q) => ({ + id: q.id, + product_id: q.product_id, + story_id: q.story_id, + task_id: q.task_id, + story_code: q.story.code, + story_title: q.story.title, + assignee_id: q.story.assignee_id, + question: q.question, + options: q.options, + created_at: q.created_at.toISOString(), + expires_at: q.expires_at.toISOString(), + })), + })}\n\n`, + ) + + heartbeatTimer = setInterval(() => { + enqueue(`: heartbeat\n\n`) + }, HEARTBEAT_MS) + + hardCloseTimer = setTimeout(() => { + cleanup('hard close 240s') + }, HARD_CLOSE_MS) + + request.signal.addEventListener('abort', () => { + cleanup('client aborted') + }) + }, + }) + + return new Response(stream, { + headers: { + 'Content-Type': 'text/event-stream; charset=utf-8', + 'Cache-Control': 'no-cache, no-transform', + Connection: 'keep-alive', + 'X-Accel-Buffering': 'no', + }, + }) +} diff --git a/app/api/realtime/solo/route.ts b/app/api/realtime/solo/route.ts index 06127ff..ba68b63 100644 --- a/app/api/realtime/solo/route.ts +++ b/app/api/realtime/solo/route.ts @@ -25,7 +25,10 @@ const HARD_CLOSE_MS = 240_000 interface NotifyPayload { op: 'I' | 'U' | 'D' - entity: 'task' | 'story' + // M11 (ST-1101) voegt entity:'question' toe op hetzelfde scrum4me_changes- + // kanaal; we filteren die hieronder weg zodat solo-clients geen + // notification-events ontvangen waar ze niets mee doen. + entity: 'task' | 'story' | 'question' id: string story_id?: string product_id: string @@ -40,6 +43,9 @@ function shouldEmit( activeSprintId: string | null, userId: string, ): boolean { + // M11 (ST-1104): question-events horen op /api/realtime/notifications, niet hier. + if (payload.entity === 'question') return false + if (payload.product_id !== productId) return false // Sprint scope: alleen events binnen de actieve sprint (of zonder sprint