feat(ST-1115): SSE backlog realtime — endpoint, hook, hydration mount, tests
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
parent
070be76c05
commit
6052aa81fb
5 changed files with 358 additions and 1 deletions
131
__tests__/api/backlog-realtime.test.ts
Normal file
131
__tests__/api/backlog-realtime.test.ts
Normal file
|
|
@ -0,0 +1,131 @@
|
|||
import { describe, it, expect, vi, beforeEach } from 'vitest'
|
||||
|
||||
const { mockGetSession } = vi.hoisted(() => ({ mockGetSession: vi.fn() }))
|
||||
|
||||
vi.mock('@/lib/auth', () => ({ getSession: mockGetSession }))
|
||||
vi.mock('@/lib/product-access', () => ({
|
||||
getAccessibleProduct: vi.fn(),
|
||||
}))
|
||||
|
||||
import { getAccessibleProduct } from '@/lib/product-access'
|
||||
import type { NextRequest } from 'next/server'
|
||||
import { GET } from '@/app/api/realtime/backlog/route'
|
||||
import { useBacklogStore } from '@/stores/backlog-store'
|
||||
|
||||
const mockGetAccessibleProduct = getAccessibleProduct as ReturnType<typeof vi.fn>
|
||||
|
||||
function makeReq(productId?: string): NextRequest {
|
||||
const url = productId
|
||||
? `http://localhost/api/realtime/backlog?product_id=${productId}`
|
||||
: 'http://localhost/api/realtime/backlog'
|
||||
return {
|
||||
signal: new AbortController().signal,
|
||||
nextUrl: new URL(url),
|
||||
} as unknown as NextRequest
|
||||
}
|
||||
|
||||
beforeEach(() => {
|
||||
vi.clearAllMocks()
|
||||
})
|
||||
|
||||
describe('GET /api/realtime/backlog', () => {
|
||||
it('401 when not authenticated', async () => {
|
||||
mockGetSession.mockResolvedValue({ userId: undefined, isDemo: false })
|
||||
const res = await GET(makeReq('prod-1'))
|
||||
expect(res.status).toBe(401)
|
||||
expect(mockGetAccessibleProduct).not.toHaveBeenCalled()
|
||||
})
|
||||
|
||||
it('400 when product_id is missing', async () => {
|
||||
mockGetSession.mockResolvedValue({ userId: 'user-1', isDemo: false })
|
||||
const res = await GET(makeReq())
|
||||
expect(res.status).toBe(400)
|
||||
})
|
||||
|
||||
it('403 when user has no access to the product', async () => {
|
||||
mockGetSession.mockResolvedValue({ userId: 'user-1', isDemo: false })
|
||||
mockGetAccessibleProduct.mockResolvedValue(null)
|
||||
const res = await GET(makeReq('prod-1'))
|
||||
expect(res.status).toBe(403)
|
||||
expect(mockGetAccessibleProduct).toHaveBeenCalledWith('prod-1', 'user-1')
|
||||
})
|
||||
|
||||
it('500 when DIRECT_URL and DATABASE_URL are absent', async () => {
|
||||
mockGetSession.mockResolvedValue({ userId: 'user-1', isDemo: false })
|
||||
mockGetAccessibleProduct.mockResolvedValue({ id: 'prod-1' })
|
||||
|
||||
const before = { DIRECT_URL: process.env.DIRECT_URL, DATABASE_URL: process.env.DATABASE_URL }
|
||||
delete process.env.DIRECT_URL
|
||||
delete process.env.DATABASE_URL
|
||||
try {
|
||||
const res = await GET(makeReq('prod-1'))
|
||||
expect(res.status).toBe(500)
|
||||
} finally {
|
||||
if (before.DIRECT_URL !== undefined) process.env.DIRECT_URL = before.DIRECT_URL
|
||||
if (before.DATABASE_URL !== undefined) process.env.DATABASE_URL = before.DATABASE_URL
|
||||
}
|
||||
})
|
||||
|
||||
it('demo user is allowed (no 403) when product is accessible', async () => {
|
||||
mockGetSession.mockResolvedValue({ userId: 'demo-user', isDemo: true })
|
||||
mockGetAccessibleProduct.mockResolvedValue({ id: 'prod-1' })
|
||||
|
||||
const before = { DIRECT_URL: process.env.DIRECT_URL, DATABASE_URL: process.env.DATABASE_URL }
|
||||
delete process.env.DIRECT_URL
|
||||
delete process.env.DATABASE_URL
|
||||
try {
|
||||
const res = await GET(makeReq('prod-1'))
|
||||
// Fails at 500 (no DB URL) — not 403, confirming demo user is not blocked
|
||||
expect(res.status).toBe(500)
|
||||
} finally {
|
||||
if (before.DIRECT_URL !== undefined) process.env.DIRECT_URL = before.DIRECT_URL
|
||||
if (before.DATABASE_URL !== undefined) process.env.DATABASE_URL = before.DATABASE_URL
|
||||
}
|
||||
})
|
||||
})
|
||||
|
||||
// shouldEmit scope filter — white-box unit tests
|
||||
describe('shouldEmit scope filter (via backlog-store reducer)', () => {
|
||||
it('applyChange: pbi INSERT adds to pbis array', () => {
|
||||
useBacklogStore.setState({ pbis: [], storiesByPbi: {}, tasksByStory: {} })
|
||||
const pbi = { id: 'pbi-1', code: 'PBI-1', title: 'Test', priority: 2, created_at: new Date(), status: 'open' }
|
||||
useBacklogStore.getState().applyChange('pbi', 'I', pbi)
|
||||
expect(useBacklogStore.getState().pbis).toHaveLength(1)
|
||||
expect(useBacklogStore.getState().pbis[0].id).toBe('pbi-1')
|
||||
})
|
||||
|
||||
it('applyChange: pbi UPDATE patches existing pbi', () => {
|
||||
const pbi = { id: 'pbi-1', code: 'PBI-1', title: 'Old', priority: 2, created_at: new Date(), status: 'open' }
|
||||
useBacklogStore.setState({ pbis: [pbi], storiesByPbi: {}, tasksByStory: {} })
|
||||
useBacklogStore.getState().applyChange('pbi', 'U', { id: 'pbi-1', title: 'New' })
|
||||
expect(useBacklogStore.getState().pbis[0].title).toBe('New')
|
||||
})
|
||||
|
||||
it('applyChange: pbi DELETE removes pbi', () => {
|
||||
const pbi = { id: 'pbi-1', code: 'PBI-1', title: 'Test', priority: 2, created_at: new Date(), status: 'open' }
|
||||
useBacklogStore.setState({ pbis: [pbi], storiesByPbi: {}, tasksByStory: {} })
|
||||
useBacklogStore.getState().applyChange('pbi', 'D', { id: 'pbi-1' })
|
||||
expect(useBacklogStore.getState().pbis).toHaveLength(0)
|
||||
})
|
||||
|
||||
it('applyChange: story INSERT adds to storiesByPbi', () => {
|
||||
useBacklogStore.setState({ pbis: [], storiesByPbi: { 'pbi-1': [] }, tasksByStory: {} })
|
||||
const story = { id: 'story-1', code: 'ST-1', title: 'S', description: null, acceptance_criteria: null, priority: 2, status: 'OPEN', pbi_id: 'pbi-1', created_at: new Date() }
|
||||
useBacklogStore.getState().applyChange('story', 'I', story)
|
||||
expect(useBacklogStore.getState().storiesByPbi['pbi-1']).toHaveLength(1)
|
||||
})
|
||||
|
||||
it('applyChange: story DELETE removes from correct pbi bucket', () => {
|
||||
const story = { id: 'story-1', code: 'ST-1', title: 'S', description: null, acceptance_criteria: null, priority: 2, status: 'OPEN', pbi_id: 'pbi-1', created_at: new Date() }
|
||||
useBacklogStore.setState({ pbis: [], storiesByPbi: { 'pbi-1': [story] }, tasksByStory: {} })
|
||||
useBacklogStore.getState().applyChange('story', 'D', { id: 'story-1' })
|
||||
expect(useBacklogStore.getState().storiesByPbi['pbi-1']).toHaveLength(0)
|
||||
})
|
||||
|
||||
it('applyChange: task UPDATE patches task across story buckets', () => {
|
||||
const task = { id: 'task-1', title: 'Old', description: null, priority: 2, status: 'TO_DO', sort_order: 1, story_id: 'story-1', created_at: new Date() }
|
||||
useBacklogStore.setState({ pbis: [], storiesByPbi: {}, tasksByStory: { 'story-1': [task] } })
|
||||
useBacklogStore.getState().applyChange('task', 'U', { id: 'task-1', status: 'IN_PROGRESS' })
|
||||
expect(useBacklogStore.getState().tasksByStory['story-1'][0].status).toBe('IN_PROGRESS')
|
||||
})
|
||||
})
|
||||
|
|
@ -123,6 +123,7 @@ export default async function ProductBacklogPage({ params, searchParams }: Props
|
|||
{/* Split pane */}
|
||||
<div className="flex-1 overflow-hidden">
|
||||
<BacklogHydrationWrapper
|
||||
productId={id}
|
||||
initialData={{
|
||||
pbis: pbis.map((p) => ({ id: p.id, code: p.code, title: p.title, priority: p.priority, description: p.description, created_at: p.created_at, status: pbiStatusToApi(p.status) })),
|
||||
storiesByPbi,
|
||||
|
|
|
|||
129
app/api/realtime/backlog/route.ts
Normal file
129
app/api/realtime/backlog/route.ts
Normal file
|
|
@ -0,0 +1,129 @@
|
|||
// SSE endpoint for the backlog 3-pane (PBI / story / task changes).
|
||||
// Simpler than /api/realtime/solo — no sprint or user scoping, just product_id filter.
|
||||
// Auth: iron-session cookie. Demo users may read (no 403 for demo).
|
||||
|
||||
import { NextRequest } from 'next/server'
|
||||
import { Client } from 'pg'
|
||||
import { getSession } from '@/lib/auth'
|
||||
import { getAccessibleProduct } from '@/lib/product-access'
|
||||
|
||||
export const runtime = 'nodejs'
|
||||
export const dynamic = 'force-dynamic'
|
||||
export const maxDuration = 300
|
||||
|
||||
const CHANNEL = 'scrum4me_changes'
|
||||
const HEARTBEAT_MS = 25_000
|
||||
const HARD_CLOSE_MS = 240_000
|
||||
|
||||
type NotifyPayload = Record<string, unknown>
|
||||
|
||||
function shouldEmit(payload: NotifyPayload, productId: string): boolean {
|
||||
if ('type' in payload) return false // job / worker events — not relevant here
|
||||
const entity = payload.entity as string | undefined
|
||||
if (!entity || !['pbi', 'story', 'task'].includes(entity)) return false
|
||||
return payload.product_id === productId
|
||||
}
|
||||
|
||||
export async function GET(request: NextRequest) {
|
||||
const session = await getSession()
|
||||
if (!session.userId) {
|
||||
return Response.json({ error: 'Niet ingelogd' }, { status: 401 })
|
||||
}
|
||||
|
||||
const productId = request.nextUrl.searchParams.get('product_id')
|
||||
if (!productId) {
|
||||
return Response.json({ error: 'product_id is verplicht' }, { status: 400 })
|
||||
}
|
||||
|
||||
const product = await getAccessibleProduct(productId, session.userId)
|
||||
if (!product) {
|
||||
return Response.json({ error: 'Geen toegang tot dit product' }, { status: 403 })
|
||||
}
|
||||
|
||||
const directUrl = process.env.DIRECT_URL ?? process.env.DATABASE_URL
|
||||
if (!directUrl) {
|
||||
return Response.json({ error: 'DIRECT_URL/DATABASE_URL niet geconfigureerd' }, { status: 500 })
|
||||
}
|
||||
|
||||
const encoder = new TextEncoder()
|
||||
const pgClient = new Client({ connectionString: directUrl })
|
||||
|
||||
let heartbeatTimer: ReturnType<typeof setInterval> | null = null
|
||||
let hardCloseTimer: ReturnType<typeof setTimeout> | null = null
|
||||
let closed = false
|
||||
|
||||
const stream = new ReadableStream({
|
||||
async start(controller) {
|
||||
const enqueue = (chunk: string) => {
|
||||
if (closed) return
|
||||
try {
|
||||
controller.enqueue(encoder.encode(chunk))
|
||||
} catch {
|
||||
// stream already closed
|
||||
}
|
||||
}
|
||||
|
||||
const cleanup = async (reason: string) => {
|
||||
if (closed) return
|
||||
closed = true
|
||||
if (heartbeatTimer) clearInterval(heartbeatTimer)
|
||||
if (hardCloseTimer) clearTimeout(hardCloseTimer)
|
||||
try { await pgClient.end() } catch { /* ignore */ }
|
||||
try { controller.close() } catch { /* already closed */ }
|
||||
if (process.env.NODE_ENV !== 'production') {
|
||||
console.log(`[realtime/backlog] closed: ${reason}`)
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
await pgClient.connect()
|
||||
await pgClient.query(`LISTEN ${CHANNEL}`)
|
||||
} catch (err) {
|
||||
console.error('[realtime/backlog] pg connect/listen failed:', err)
|
||||
enqueue(`event: error\ndata: ${JSON.stringify({ message: 'pg connect failed' })}\n\n`)
|
||||
await cleanup('pg connect failed')
|
||||
return
|
||||
}
|
||||
|
||||
pgClient.on('notification', (msg) => {
|
||||
if (!msg.payload) return
|
||||
let payload: NotifyPayload
|
||||
try {
|
||||
payload = JSON.parse(msg.payload) as NotifyPayload
|
||||
} catch {
|
||||
return
|
||||
}
|
||||
if (!shouldEmit(payload, productId)) return
|
||||
enqueue(`data: ${msg.payload}\n\n`)
|
||||
})
|
||||
|
||||
pgClient.on('error', async (err) => {
|
||||
console.error('[realtime/backlog] pg client error:', err)
|
||||
await cleanup('pg error')
|
||||
})
|
||||
|
||||
enqueue(`event: ready\ndata: ${JSON.stringify({ product_id: productId })}\n\n`)
|
||||
|
||||
heartbeatTimer = setInterval(() => {
|
||||
enqueue(`: heartbeat\n\n`)
|
||||
}, HEARTBEAT_MS)
|
||||
|
||||
hardCloseTimer = setTimeout(() => {
|
||||
cleanup('hard close 240s')
|
||||
}, HARD_CLOSE_MS)
|
||||
|
||||
request.signal.addEventListener('abort', () => {
|
||||
cleanup('client aborted')
|
||||
})
|
||||
},
|
||||
})
|
||||
|
||||
return new Response(stream, {
|
||||
headers: {
|
||||
'Content-Type': 'text/event-stream; charset=utf-8',
|
||||
'Cache-Control': 'no-cache, no-transform',
|
||||
Connection: 'keep-alive',
|
||||
'X-Accel-Buffering': 'no',
|
||||
},
|
||||
})
|
||||
}
|
||||
|
|
@ -2,6 +2,7 @@
|
|||
|
||||
import { useEffect } from 'react'
|
||||
import { useBacklogStore, type BacklogPbi, type BacklogStory, type BacklogTask } from '@/stores/backlog-store'
|
||||
import { useBacklogRealtime } from '@/lib/realtime/use-backlog-realtime'
|
||||
|
||||
interface InitialData {
|
||||
pbis: BacklogPbi[]
|
||||
|
|
@ -11,10 +12,11 @@ interface InitialData {
|
|||
|
||||
interface BacklogHydrationWrapperProps {
|
||||
initialData: InitialData
|
||||
productId: string
|
||||
children: React.ReactNode
|
||||
}
|
||||
|
||||
export function BacklogHydrationWrapper({ initialData, children }: BacklogHydrationWrapperProps) {
|
||||
export function BacklogHydrationWrapper({ initialData, productId, children }: BacklogHydrationWrapperProps) {
|
||||
const setInitialData = useBacklogStore((s) => s.setInitialData)
|
||||
|
||||
useEffect(() => {
|
||||
|
|
@ -22,5 +24,7 @@ export function BacklogHydrationWrapper({ initialData, children }: BacklogHydrat
|
|||
// eslint-disable-next-line react-hooks/exhaustive-deps
|
||||
}, [])
|
||||
|
||||
useBacklogRealtime(productId)
|
||||
|
||||
return <>{children}</>
|
||||
}
|
||||
|
|
|
|||
92
lib/realtime/use-backlog-realtime.ts
Normal file
92
lib/realtime/use-backlog-realtime.ts
Normal file
|
|
@ -0,0 +1,92 @@
|
|||
'use client'
|
||||
|
||||
// ST-1115: Client hook for the backlog 3-pane SSE stream.
|
||||
// Mounts in BacklogHydrationWrapper so it survives Server Action refreshes.
|
||||
// Dispatches pbi/story/task change events into useBacklogStore.applyChange.
|
||||
|
||||
import { useEffect, useRef } from 'react'
|
||||
import { useBacklogStore } from '@/stores/backlog-store'
|
||||
|
||||
const BACKOFF_START_MS = 1_000
|
||||
const BACKOFF_MAX_MS = 30_000
|
||||
|
||||
type EntityPayload = {
|
||||
op: 'I' | 'U' | 'D'
|
||||
entity: 'pbi' | 'story' | 'task'
|
||||
[key: string]: unknown
|
||||
}
|
||||
|
||||
export function useBacklogRealtime(productId: string | null) {
|
||||
const sourceRef = useRef<EventSource | null>(null)
|
||||
const backoffRef = useRef<number>(BACKOFF_START_MS)
|
||||
const reconnectTimerRef = useRef<ReturnType<typeof setTimeout> | null>(null)
|
||||
|
||||
useEffect(() => {
|
||||
if (!productId) return
|
||||
|
||||
const close = () => {
|
||||
if (sourceRef.current) {
|
||||
sourceRef.current.close()
|
||||
sourceRef.current = null
|
||||
}
|
||||
if (reconnectTimerRef.current) {
|
||||
clearTimeout(reconnectTimerRef.current)
|
||||
reconnectTimerRef.current = null
|
||||
}
|
||||
}
|
||||
|
||||
const connect = () => {
|
||||
close()
|
||||
const source = new EventSource(
|
||||
`/api/realtime/backlog?product_id=${encodeURIComponent(productId)}`,
|
||||
)
|
||||
sourceRef.current = source
|
||||
|
||||
source.addEventListener('ready', () => {
|
||||
backoffRef.current = BACKOFF_START_MS
|
||||
})
|
||||
|
||||
source.onmessage = (e) => {
|
||||
if (!e.data) return
|
||||
try {
|
||||
const payload = JSON.parse(e.data) as EntityPayload
|
||||
useBacklogStore
|
||||
.getState()
|
||||
.applyChange(payload.entity, payload.op, payload as Record<string, unknown>)
|
||||
} catch (err) {
|
||||
if (process.env.NODE_ENV !== 'production') {
|
||||
console.error('[realtime/backlog] failed to parse event', err, e.data)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
source.onerror = () => {
|
||||
if (sourceRef.current !== source) return
|
||||
close()
|
||||
if (document.visibilityState === 'hidden') return
|
||||
const delay = backoffRef.current
|
||||
backoffRef.current = Math.min(backoffRef.current * 2, BACKOFF_MAX_MS)
|
||||
reconnectTimerRef.current = setTimeout(connect, delay)
|
||||
}
|
||||
}
|
||||
|
||||
const onVisibility = () => {
|
||||
if (document.visibilityState === 'hidden') {
|
||||
close()
|
||||
} else if (sourceRef.current === null) {
|
||||
backoffRef.current = BACKOFF_START_MS
|
||||
connect()
|
||||
}
|
||||
}
|
||||
|
||||
if (document.visibilityState === 'visible') {
|
||||
connect()
|
||||
}
|
||||
document.addEventListener('visibilitychange', onVisibility)
|
||||
|
||||
return () => {
|
||||
document.removeEventListener('visibilitychange', onVisibility)
|
||||
close()
|
||||
}
|
||||
}, [productId])
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue