Scrum4Me/app/api/realtime/solo/route.ts
Madhura68 847fc84faf fix(M8): make SSE-stream survive Solo Paneel mutations
Symptoom op feat/ST-801-realtime-triggers initial implementation:
elke task-update sloot de open SSE-stream af en triggerde een
herverbinding met backoff. In de tussentijd gemiste events.

Oorzaak: Server Actions in App Router doen een impliciete
route-tree refresh die client components remount; daarmee killt
React de useEffect die de EventSource beheert.

Fix in twee delen:

1. Hef de realtime-hook op naar de (app)-layout via een nieuwe
   `SoloRealtimeBridge`-component. Layouts overleven Server-
   Action-refreshes beter dan pages, en de bridge leest het
   product-id uit de URL via usePathname. Connection-status
   (status, showConnectingIndicator) gaat naar de solo-store
   zodat SoloBoard 'm uit een gedeelde plek kan lezen.

2. Vervang updateTaskStatusAction en updateTaskPlanAction in de
   Solo-componenten door fetch naar de bestaande Route Handler
   `PATCH /api/tasks/[id]`. Route Handlers triggeren geen
   page-refresh, dus de SSE-stream blijft staan. lib/api-auth.ts
   accepteert nu naast Bearer-tokens ook iron-session cookies
   zodat browser-fetches zonder token werken.

Bijkomend: actions/tasks.ts laat /solo bewust niet meer
revalideren (wordt nu via realtime gedekt). Sprint/planning blijft
wel revalidaten — geen realtime daar.

Toegevoegd:
- components/solo/realtime-bridge.tsx — mount in (app) layout
- scripts/realtime-mutate.ts — handige test-helper voor externe
  mutaties (alsof MCP/REST schrijft) tijdens acceptance

Debug-logs in app/api/realtime/solo/route.ts staan nog aan voor
ST-806 acceptance; worden later gestript.

Bekend issue: Chrome op localhost (HTTP/1.1) cycle't EventSource
om de paar seconden vanwege de 6-connectie-limiet en retry-
heuristiek. Safari werkt stabiel. Productie op Vercel (HTTP/2
multiplexing) zou beide browsers stabiel moeten houden — Vercel
preview test is volgende stap.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-27 04:35:12 +02:00

209 lines
6.7 KiB
TypeScript

// ST-802: Server-Sent Events stream voor het Solo Paneel.
//
// Opent een dedicated pg-verbinding (DIRECT_URL) en LISTEN't op het
// `scrum4me_changes`-kanaal. Filtert events server-side op product
// (uit query-param), sprint (actieve sprint van het product), en
// persoonlijke relevantie (assignee_id == userId, of assignee_id IS NULL
// voor stories — claim-lijst).
//
// Auth: iron-session cookie. Demo-tokens mogen lezen.
// Output: text/event-stream met JSON-payloads + heartbeat-comments.
// Sluit zelf na 240s als safety-net; client herconnect.
import { NextRequest } from 'next/server'
import { Client } from 'pg'
import { getSession } from '@/lib/auth'
import { getAccessibleProduct } from '@/lib/product-access'
export const runtime = 'nodejs'
export const dynamic = 'force-dynamic'
export const maxDuration = 300
const CHANNEL = 'scrum4me_changes'
const HEARTBEAT_MS = 25_000
const HARD_CLOSE_MS = 240_000
interface NotifyPayload {
op: 'I' | 'U' | 'D'
entity: 'task' | 'story'
id: string
story_id?: string
product_id: string
sprint_id: string | null
assignee_id: string | null
changed_fields?: string[]
}
function shouldEmit(
payload: NotifyPayload,
productId: string,
activeSprintId: string | null,
userId: string,
): boolean {
if (payload.product_id !== productId) return false
// Sprint scope: alleen events binnen de actieve sprint (of zonder sprint
// voor unassigned-story claims die in de claim-sheet horen).
if (payload.entity === 'story' && payload.assignee_id === null) {
// Unassigned story (claim-lijst) — toon altijd, ongeacht sprint
return payload.sprint_id === activeSprintId || payload.sprint_id === null
}
if (payload.sprint_id !== activeSprintId) return false
// Persoonlijke relevantie
return payload.assignee_id === userId
}
export async function GET(request: NextRequest) {
const session = await getSession()
if (!session.userId) {
return Response.json({ error: 'Niet ingelogd' }, { status: 401 })
}
const userId = session.userId
const productId = request.nextUrl.searchParams.get('product_id')
if (!productId) {
return Response.json({ error: 'product_id is verplicht' }, { status: 400 })
}
const product = await getAccessibleProduct(productId, userId)
if (!product) {
return Response.json({ error: 'Geen toegang tot dit product' }, { status: 403 })
}
const directUrl = process.env.DIRECT_URL ?? process.env.DATABASE_URL
if (!directUrl) {
return Response.json({ error: 'DIRECT_URL/DATABASE_URL niet geconfigureerd' }, { status: 500 })
}
const encoder = new TextEncoder()
const pgClient = new Client({ connectionString: directUrl })
let heartbeatTimer: ReturnType<typeof setInterval> | null = null
let hardCloseTimer: ReturnType<typeof setTimeout> | null = null
let closed = false
const stream = new ReadableStream({
async start(controller) {
const enqueue = (chunk: string) => {
if (closed) return
try {
controller.enqueue(encoder.encode(chunk))
} catch {
// Stream al gesloten — controller throwt, negeren
}
}
const cleanup = async (reason: string) => {
if (closed) return
closed = true
if (heartbeatTimer) clearInterval(heartbeatTimer)
if (hardCloseTimer) clearTimeout(hardCloseTimer)
try {
await pgClient.end()
} catch {
// ignore
}
try {
controller.close()
} catch {
// already closed
}
if (process.env.NODE_ENV !== 'production') {
console.log(`[realtime/solo] closed: ${reason}`)
}
}
// Resolve actieve sprint éénmalig per connectie
const sprint = await prisma_sprint_findActive(productId)
const activeSprintId = sprint?.id ?? null
const isPooled = directUrl.includes('pooler.')
if (process.env.NODE_ENV !== 'production') {
console.log(
`[realtime/solo] connecting (${isPooled ? 'POOLED — LISTEN may not work!' : 'direct'})`,
)
}
try {
await pgClient.connect()
await pgClient.query(`LISTEN ${CHANNEL}`)
if (process.env.NODE_ENV !== 'production') {
console.log(`[realtime/solo] LISTEN ${CHANNEL} ready`)
}
} catch (err) {
if (process.env.NODE_ENV !== 'production') {
console.error('[realtime/solo] pg connect/listen failed:', err)
}
enqueue(`event: error\ndata: ${JSON.stringify({ message: 'pg connect failed' })}\n\n`)
await cleanup('pg connect failed')
return
}
pgClient.on('notification', (msg) => {
if (!msg.payload) return
let payload: NotifyPayload
try {
payload = JSON.parse(msg.payload) as NotifyPayload
} catch {
return
}
const emit = shouldEmit(payload, productId, activeSprintId, userId)
if (process.env.NODE_ENV !== 'production') {
console.log(
`[realtime/solo] NOTIFY ${payload.entity}:${payload.id} ${payload.op}${emit ? 'EMIT' : 'skip'} (sprint=${payload.sprint_id} assignee=${payload.assignee_id} user=${userId})`,
)
}
if (!emit) return
enqueue(`data: ${msg.payload}\n\n`)
})
pgClient.on('error', async () => {
await cleanup('pg error')
})
// Stuur eerst een "ready"-event zodat de client weet dat de connectie staat
enqueue(
`event: ready\ndata: ${JSON.stringify({
product_id: productId,
sprint_id: activeSprintId,
})}\n\n`,
)
// Heartbeat als SSE-comment — voorkomt proxy-timeouts
heartbeatTimer = setInterval(() => {
enqueue(`: heartbeat\n\n`)
}, HEARTBEAT_MS)
// Hard-close safety: Vercel kapt na maxDuration; we sluiten zelf eerder
hardCloseTimer = setTimeout(() => {
cleanup('hard close 240s')
}, HARD_CLOSE_MS)
// Client trekt de stekker (tab dicht, refresh, etc.)
request.signal.addEventListener('abort', () => {
cleanup('client aborted')
})
},
})
return new Response(stream, {
headers: {
'Content-Type': 'text/event-stream; charset=utf-8',
'Cache-Control': 'no-cache, no-transform',
Connection: 'keep-alive',
'X-Accel-Buffering': 'no',
},
})
}
// Lokaal helper — Prisma vermijden voor deze ene query om de pg-only flow
// schoon te houden. Geeft de actieve sprint van een product, of null.
async function prisma_sprint_findActive(productId: string): Promise<{ id: string } | null> {
const { prisma } = await import('@/lib/prisma')
return prisma.sprint.findFirst({
where: { product_id: productId, status: 'ACTIVE' },
select: { id: true },
orderBy: { created_at: 'desc' },
})
}