feat(ST-1104): add user-scoped /api/realtime/notifications + filter solo-route
Twee delen:
1. Solo-route filter (1-regel-fix in app/api/realtime/solo/route.ts):
- NotifyPayload uitgebreid met entity:'question'
- shouldEmit returnt direct false bij entity='question'
Voorkomt dat solo-clients M11 question-events ontvangen (geen lekkage naar
het Solo-bord; geen onnodig netwerk-verkeer; loose coupling tussen features).
2. Nieuwe SSE-route app/api/realtime/notifications/route.ts:
- User-scoped (geen ?product_id=); query alle accessible product-IDs één keer
bij connect via productAccessFilter
- LISTEN scrum4me_changes; filter entity='question' && product_id ∈ accessible
- Initial-state-event NA LISTEN actief (race-fix conform M10 ST-1004):
query open vragen voor deze user's accessible products, stuur als event:state
met summary (id, story_code/title, assignee_id, question, options, expires_at)
- Hergebruikt het pg.Client + ReadableStream + heartbeat 25s + hard-close 240s +
abort-cleanup-pattern uit solo-route
Tests __tests__/api/notifications-stream.test.ts:
- 401 zonder iron-session cookie (en geen DB-call)
- Solo-route filter wordt visueel/E2E gedekt in ST-1108-acceptatie
Quality gates: lint 0 errors, tsc clean, vitest 146/146 (18 files).
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
c642c29b58
commit
009375a131
3 changed files with 252 additions and 1 deletions
51
__tests__/api/notifications-stream.test.ts
Normal file
51
__tests__/api/notifications-stream.test.ts
Normal file
|
|
@ -0,0 +1,51 @@
|
|||
import { describe, it, expect, vi, beforeEach } from 'vitest'
|
||||
|
||||
const { mockGetSession } = vi.hoisted(() => ({ mockGetSession: vi.fn() }))
|
||||
|
||||
vi.mock('@/lib/auth', () => ({
|
||||
getSession: mockGetSession,
|
||||
}))
|
||||
|
||||
vi.mock('@/lib/prisma', () => ({
|
||||
prisma: {
|
||||
product: { findMany: vi.fn() },
|
||||
claudeQuestion: { findMany: vi.fn() },
|
||||
},
|
||||
}))
|
||||
|
||||
vi.mock('@/lib/product-access', () => ({
|
||||
productAccessFilter: vi.fn().mockReturnValue({}),
|
||||
getAccessibleProduct: vi.fn(),
|
||||
}))
|
||||
|
||||
import { prisma } from '@/lib/prisma'
|
||||
import type { NextRequest } from 'next/server'
|
||||
import { GET } from '@/app/api/realtime/notifications/route'
|
||||
|
||||
const mockPrisma = prisma as unknown as {
|
||||
product: { findMany: ReturnType<typeof vi.fn> }
|
||||
claudeQuestion: { findMany: ReturnType<typeof vi.fn> }
|
||||
}
|
||||
|
||||
function makeReq(): NextRequest {
|
||||
// Minimaal NextRequest-shape voor de auth-pad — we komen niet bij de
|
||||
// pg-stream-setup omdat de auth-fail vóór dat punt gebeurt.
|
||||
return { signal: new AbortController().signal } as unknown as NextRequest
|
||||
}
|
||||
|
||||
beforeEach(() => {
|
||||
vi.clearAllMocks()
|
||||
})
|
||||
|
||||
describe('GET /api/realtime/notifications', () => {
|
||||
it('401 zonder iron-session cookie, geen DB-call', async () => {
|
||||
mockGetSession.mockResolvedValue({ userId: undefined, isDemo: false })
|
||||
const res = await GET(makeReq())
|
||||
expect(res.status).toBe(401)
|
||||
expect(mockPrisma.product.findMany).not.toHaveBeenCalled()
|
||||
})
|
||||
})
|
||||
|
||||
// Solo-route filter (entity='question' uitgesloten) is een 1-regel-fix in
|
||||
// app/api/realtime/solo/route.ts. Visueel reviewbaar in de diff; full-stream-
|
||||
// regressie wordt handmatig gedekt in ST-1108-acceptatie.
|
||||
194
app/api/realtime/notifications/route.ts
Normal file
194
app/api/realtime/notifications/route.ts
Normal file
|
|
@ -0,0 +1,194 @@
|
|||
// ST-1104: User-scoped Server-Sent Events stream voor de notificatie-bel (M11).
|
||||
//
|
||||
// Wordt door <NotificationsBridge /> in app/(app)/layout.tsx gemount zodra de
|
||||
// gebruiker is ingelogd. In tegenstelling tot /api/realtime/solo (product-
|
||||
// scoped, voor één Solo-bord) is deze stream **user-scoped**: hij filtert
|
||||
// op alle producten waar de ingelogde user toegang toe heeft, zodat de bell
|
||||
// flikkert ongeacht op welke pagina je staat.
|
||||
//
|
||||
// Auth: iron-session cookie. Demo-tokens mogen lezen.
|
||||
// Output: text/event-stream — `event:state` met initial open-questions list,
|
||||
// daarna `data:` events bij elke status-overgang in scrum4me_changes.
|
||||
// Sluit zelf na 240s als safety-net; client herconnect.
|
||||
|
||||
import { NextRequest } from 'next/server'
|
||||
import { Client } from 'pg'
|
||||
import { getSession } from '@/lib/auth'
|
||||
import { prisma } from '@/lib/prisma'
|
||||
import { productAccessFilter } from '@/lib/product-access'
|
||||
|
||||
export const runtime = 'nodejs'
|
||||
export const dynamic = 'force-dynamic'
|
||||
export const maxDuration = 300
|
||||
|
||||
const CHANNEL = 'scrum4me_changes'
|
||||
const HEARTBEAT_MS = 25_000
|
||||
const HARD_CLOSE_MS = 240_000
|
||||
|
||||
interface NotifyPayload {
|
||||
op: 'I' | 'U'
|
||||
entity: 'task' | 'story' | 'question'
|
||||
id: string
|
||||
product_id: string
|
||||
story_id?: string
|
||||
task_id?: string | null
|
||||
assignee_id?: string | null
|
||||
status?: string
|
||||
}
|
||||
|
||||
export async function GET(request: NextRequest) {
|
||||
const session = await getSession()
|
||||
if (!session.userId) {
|
||||
return Response.json({ error: 'Niet ingelogd' }, { status: 401 })
|
||||
}
|
||||
const userId = session.userId
|
||||
|
||||
// Haal alle accessible product-IDs één keer op — gebruikt voor SSE-filter en
|
||||
// voor de initial-state query. Geen real-time refresh als de user halverwege
|
||||
// toegang krijgt of verliest; reconnect lost dat op (frontend doet dat al).
|
||||
const products = await prisma.product.findMany({
|
||||
where: { archived: false, ...productAccessFilter(userId) },
|
||||
select: { id: true },
|
||||
})
|
||||
const accessibleProductIds = new Set(products.map((p) => p.id))
|
||||
|
||||
const directUrl = process.env.DIRECT_URL ?? process.env.DATABASE_URL
|
||||
if (!directUrl) {
|
||||
return Response.json(
|
||||
{ error: 'DIRECT_URL/DATABASE_URL niet geconfigureerd' },
|
||||
{ status: 500 },
|
||||
)
|
||||
}
|
||||
|
||||
const encoder = new TextEncoder()
|
||||
const pgClient = new Client({ connectionString: directUrl })
|
||||
|
||||
let heartbeatTimer: ReturnType<typeof setInterval> | null = null
|
||||
let hardCloseTimer: ReturnType<typeof setTimeout> | null = null
|
||||
let closed = false
|
||||
|
||||
const stream = new ReadableStream({
|
||||
async start(controller) {
|
||||
const enqueue = (chunk: string) => {
|
||||
if (closed) return
|
||||
try {
|
||||
controller.enqueue(encoder.encode(chunk))
|
||||
} catch {
|
||||
// controller al gesloten — negeren
|
||||
}
|
||||
}
|
||||
|
||||
const cleanup = async (reason: string) => {
|
||||
if (closed) return
|
||||
closed = true
|
||||
if (heartbeatTimer) clearInterval(heartbeatTimer)
|
||||
if (hardCloseTimer) clearTimeout(hardCloseTimer)
|
||||
try {
|
||||
await pgClient.end()
|
||||
} catch {
|
||||
// ignore
|
||||
}
|
||||
try {
|
||||
controller.close()
|
||||
} catch {
|
||||
// already closed
|
||||
}
|
||||
if (process.env.NODE_ENV !== 'production') {
|
||||
console.log(`[realtime/notifications] closed: ${reason}`)
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
await pgClient.connect()
|
||||
await pgClient.query(`LISTEN ${CHANNEL}`)
|
||||
} catch (err) {
|
||||
console.error('[realtime/notifications] pg connect/listen failed:', err)
|
||||
enqueue(
|
||||
`event: error\ndata: ${JSON.stringify({ message: 'pg connect failed' })}\n\n`,
|
||||
)
|
||||
await cleanup('pg connect failed')
|
||||
return
|
||||
}
|
||||
|
||||
pgClient.on('notification', (msg) => {
|
||||
if (!msg.payload) return
|
||||
let payload: NotifyPayload
|
||||
try {
|
||||
payload = JSON.parse(msg.payload) as NotifyPayload
|
||||
} catch {
|
||||
return
|
||||
}
|
||||
if (payload.entity !== 'question') return
|
||||
if (!accessibleProductIds.has(payload.product_id)) return
|
||||
enqueue(`data: ${msg.payload}\n\n`)
|
||||
})
|
||||
|
||||
pgClient.on('error', (err) => {
|
||||
console.error('[realtime/notifications] pg client error:', err)
|
||||
cleanup('pg error')
|
||||
})
|
||||
|
||||
// Initial state ná LISTEN actief — race-fix conform M10 ST-1004 / ST-1006.
|
||||
// Voorkomt dat een vraag die net vóór SSE-open landt verloren gaat.
|
||||
const openQuestions = await prisma.claudeQuestion.findMany({
|
||||
where: {
|
||||
status: 'open',
|
||||
expires_at: { gt: new Date() },
|
||||
product_id: { in: products.map((p) => p.id) },
|
||||
},
|
||||
orderBy: { created_at: 'desc' },
|
||||
take: 100,
|
||||
select: {
|
||||
id: true,
|
||||
product_id: true,
|
||||
story_id: true,
|
||||
task_id: true,
|
||||
question: true,
|
||||
options: true,
|
||||
created_at: true,
|
||||
expires_at: true,
|
||||
story: { select: { code: true, title: true, assignee_id: true } },
|
||||
},
|
||||
})
|
||||
|
||||
enqueue(
|
||||
`event: state\ndata: ${JSON.stringify({
|
||||
questions: openQuestions.map((q) => ({
|
||||
id: q.id,
|
||||
product_id: q.product_id,
|
||||
story_id: q.story_id,
|
||||
task_id: q.task_id,
|
||||
story_code: q.story.code,
|
||||
story_title: q.story.title,
|
||||
assignee_id: q.story.assignee_id,
|
||||
question: q.question,
|
||||
options: q.options,
|
||||
created_at: q.created_at.toISOString(),
|
||||
expires_at: q.expires_at.toISOString(),
|
||||
})),
|
||||
})}\n\n`,
|
||||
)
|
||||
|
||||
heartbeatTimer = setInterval(() => {
|
||||
enqueue(`: heartbeat\n\n`)
|
||||
}, HEARTBEAT_MS)
|
||||
|
||||
hardCloseTimer = setTimeout(() => {
|
||||
cleanup('hard close 240s')
|
||||
}, HARD_CLOSE_MS)
|
||||
|
||||
request.signal.addEventListener('abort', () => {
|
||||
cleanup('client aborted')
|
||||
})
|
||||
},
|
||||
})
|
||||
|
||||
return new Response(stream, {
|
||||
headers: {
|
||||
'Content-Type': 'text/event-stream; charset=utf-8',
|
||||
'Cache-Control': 'no-cache, no-transform',
|
||||
Connection: 'keep-alive',
|
||||
'X-Accel-Buffering': 'no',
|
||||
},
|
||||
})
|
||||
}
|
||||
|
|
@ -25,7 +25,10 @@ const HARD_CLOSE_MS = 240_000
|
|||
|
||||
interface NotifyPayload {
|
||||
op: 'I' | 'U' | 'D'
|
||||
entity: 'task' | 'story'
|
||||
// M11 (ST-1101) voegt entity:'question' toe op hetzelfde scrum4me_changes-
|
||||
// kanaal; we filteren die hieronder weg zodat solo-clients geen
|
||||
// notification-events ontvangen waar ze niets mee doen.
|
||||
entity: 'task' | 'story' | 'question'
|
||||
id: string
|
||||
story_id?: string
|
||||
product_id: string
|
||||
|
|
@ -40,6 +43,9 @@ function shouldEmit(
|
|||
activeSprintId: string | null,
|
||||
userId: string,
|
||||
): boolean {
|
||||
// M11 (ST-1104): question-events horen op /api/realtime/notifications, niet hier.
|
||||
if (payload.entity === 'question') return false
|
||||
|
||||
if (payload.product_id !== productId) return false
|
||||
|
||||
// Sprint scope: alleen events binnen de actieve sprint (of zonder sprint
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue