feat(ST-802): SSE route /api/realtime/solo for Solo Paneel updates
Node.js-runtime route handler die een dedicated pg-Client opent op DIRECT_URL en LISTEN't op het scrum4me_changes-kanaal. Per inkomende NOTIFY-payload filtert het server-side op product (uit query-param), sprint (active sprint van het product) en persoonlijke relevantie (assignee_id == userId, of assignee_id IS NULL voor stories voor de claim-lijst). Auth via iron-session cookie: - 401 als sessie ontbreekt - 400 als product_id query-param ontbreekt - 403 als de user geen toegang heeft tot het product - demo-tokens mogen lezen (geen write-tools op deze route) Stream-bouw: - text/event-stream met juiste headers (no-cache, no-transform, X-Accel-Buffering: no voor proxy-vrije buffering) - ready-event bij connect met product_id en active sprint_id - heartbeat (SSE-comment) elke 25s - hard-close na 240s als safety-net onder Vercel maxDuration; client herconnect via EventSource - cleanup op request.signal abort (tab dicht / refresh) Cleanup-pad sluit de pg-client en de stream-controller idempotent. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
16ce4dd13d
commit
e6be578c28
1 changed files with 190 additions and 0 deletions
190
app/api/realtime/solo/route.ts
Normal file
190
app/api/realtime/solo/route.ts
Normal file
|
|
@ -0,0 +1,190 @@
|
|||
// ST-802: Server-Sent Events stream voor het Solo Paneel.
|
||||
//
|
||||
// Opent een dedicated pg-verbinding (DIRECT_URL) en LISTEN't op het
|
||||
// `scrum4me_changes`-kanaal. Filtert events server-side op product
|
||||
// (uit query-param), sprint (actieve sprint van het product), en
|
||||
// persoonlijke relevantie (assignee_id == userId, of assignee_id IS NULL
|
||||
// voor stories — claim-lijst).
|
||||
//
|
||||
// Auth: iron-session cookie. Demo-tokens mogen lezen.
|
||||
// Output: text/event-stream met JSON-payloads + heartbeat-comments.
|
||||
// Sluit zelf na 240s als safety-net; client herconnect.
|
||||
|
||||
import { NextRequest } from 'next/server'
|
||||
import { Client } from 'pg'
|
||||
import { getSession } from '@/lib/auth'
|
||||
import { getAccessibleProduct } from '@/lib/product-access'
|
||||
|
||||
export const runtime = 'nodejs'
|
||||
export const dynamic = 'force-dynamic'
|
||||
export const maxDuration = 300
|
||||
|
||||
const CHANNEL = 'scrum4me_changes'
|
||||
const HEARTBEAT_MS = 25_000
|
||||
const HARD_CLOSE_MS = 240_000
|
||||
|
||||
interface NotifyPayload {
|
||||
op: 'I' | 'U' | 'D'
|
||||
entity: 'task' | 'story'
|
||||
id: string
|
||||
story_id?: string
|
||||
product_id: string
|
||||
sprint_id: string | null
|
||||
assignee_id: string | null
|
||||
changed_fields?: string[]
|
||||
}
|
||||
|
||||
function shouldEmit(
|
||||
payload: NotifyPayload,
|
||||
productId: string,
|
||||
activeSprintId: string | null,
|
||||
userId: string,
|
||||
): boolean {
|
||||
if (payload.product_id !== productId) return false
|
||||
|
||||
// Sprint scope: alleen events binnen de actieve sprint (of zonder sprint
|
||||
// voor unassigned-story claims die in de claim-sheet horen).
|
||||
if (payload.entity === 'story' && payload.assignee_id === null) {
|
||||
// Unassigned story (claim-lijst) — toon altijd, ongeacht sprint
|
||||
return payload.sprint_id === activeSprintId || payload.sprint_id === null
|
||||
}
|
||||
if (payload.sprint_id !== activeSprintId) return false
|
||||
|
||||
// Persoonlijke relevantie
|
||||
return payload.assignee_id === userId
|
||||
}
|
||||
|
||||
export async function GET(request: NextRequest) {
|
||||
const session = await getSession()
|
||||
if (!session.userId) {
|
||||
return Response.json({ error: 'Niet ingelogd' }, { status: 401 })
|
||||
}
|
||||
const userId = session.userId
|
||||
|
||||
const productId = request.nextUrl.searchParams.get('product_id')
|
||||
if (!productId) {
|
||||
return Response.json({ error: 'product_id is verplicht' }, { status: 400 })
|
||||
}
|
||||
|
||||
const product = await getAccessibleProduct(productId, userId)
|
||||
if (!product) {
|
||||
return Response.json({ error: 'Geen toegang tot dit product' }, { status: 403 })
|
||||
}
|
||||
|
||||
const directUrl = process.env.DIRECT_URL ?? process.env.DATABASE_URL
|
||||
if (!directUrl) {
|
||||
return Response.json({ error: 'DIRECT_URL/DATABASE_URL niet geconfigureerd' }, { status: 500 })
|
||||
}
|
||||
|
||||
const encoder = new TextEncoder()
|
||||
const pgClient = new Client({ connectionString: directUrl })
|
||||
|
||||
let heartbeatTimer: ReturnType<typeof setInterval> | null = null
|
||||
let hardCloseTimer: ReturnType<typeof setTimeout> | null = null
|
||||
let closed = false
|
||||
|
||||
const stream = new ReadableStream({
|
||||
async start(controller) {
|
||||
const enqueue = (chunk: string) => {
|
||||
if (closed) return
|
||||
try {
|
||||
controller.enqueue(encoder.encode(chunk))
|
||||
} catch {
|
||||
// Stream al gesloten — controller throwt, negeren
|
||||
}
|
||||
}
|
||||
|
||||
const cleanup = async (reason: string) => {
|
||||
if (closed) return
|
||||
closed = true
|
||||
if (heartbeatTimer) clearInterval(heartbeatTimer)
|
||||
if (hardCloseTimer) clearTimeout(hardCloseTimer)
|
||||
try {
|
||||
await pgClient.end()
|
||||
} catch {
|
||||
// ignore
|
||||
}
|
||||
try {
|
||||
controller.close()
|
||||
} catch {
|
||||
// already closed
|
||||
}
|
||||
if (process.env.NODE_ENV !== 'production') {
|
||||
console.log(`[realtime/solo] closed: ${reason}`)
|
||||
}
|
||||
}
|
||||
|
||||
// Resolve actieve sprint éénmalig per connectie
|
||||
const sprint = await prisma_sprint_findActive(productId)
|
||||
const activeSprintId = sprint?.id ?? null
|
||||
|
||||
try {
|
||||
await pgClient.connect()
|
||||
await pgClient.query(`LISTEN ${CHANNEL}`)
|
||||
} catch (err) {
|
||||
enqueue(`event: error\ndata: ${JSON.stringify({ message: 'pg connect failed' })}\n\n`)
|
||||
await cleanup('pg connect failed')
|
||||
return
|
||||
}
|
||||
|
||||
pgClient.on('notification', (msg) => {
|
||||
if (!msg.payload) return
|
||||
let payload: NotifyPayload
|
||||
try {
|
||||
payload = JSON.parse(msg.payload) as NotifyPayload
|
||||
} catch {
|
||||
return
|
||||
}
|
||||
if (!shouldEmit(payload, productId, activeSprintId, userId)) return
|
||||
enqueue(`data: ${msg.payload}\n\n`)
|
||||
})
|
||||
|
||||
pgClient.on('error', async () => {
|
||||
await cleanup('pg error')
|
||||
})
|
||||
|
||||
// Stuur eerst een "ready"-event zodat de client weet dat de connectie staat
|
||||
enqueue(
|
||||
`event: ready\ndata: ${JSON.stringify({
|
||||
product_id: productId,
|
||||
sprint_id: activeSprintId,
|
||||
})}\n\n`,
|
||||
)
|
||||
|
||||
// Heartbeat als SSE-comment — voorkomt proxy-timeouts
|
||||
heartbeatTimer = setInterval(() => {
|
||||
enqueue(`: heartbeat\n\n`)
|
||||
}, HEARTBEAT_MS)
|
||||
|
||||
// Hard-close safety: Vercel kapt na maxDuration; we sluiten zelf eerder
|
||||
hardCloseTimer = setTimeout(() => {
|
||||
cleanup('hard close 240s')
|
||||
}, HARD_CLOSE_MS)
|
||||
|
||||
// Client trekt de stekker (tab dicht, refresh, etc.)
|
||||
request.signal.addEventListener('abort', () => {
|
||||
cleanup('client aborted')
|
||||
})
|
||||
},
|
||||
})
|
||||
|
||||
return new Response(stream, {
|
||||
headers: {
|
||||
'Content-Type': 'text/event-stream; charset=utf-8',
|
||||
'Cache-Control': 'no-cache, no-transform',
|
||||
Connection: 'keep-alive',
|
||||
'X-Accel-Buffering': 'no',
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
// Lokaal helper — Prisma vermijden voor deze ene query om de pg-only flow
|
||||
// schoon te houden. Geeft de actieve sprint van een product, of null.
|
||||
async function prisma_sprint_findActive(productId: string): Promise<{ id: string } | null> {
|
||||
const { prisma } = await import('@/lib/prisma')
|
||||
return prisma.sprint.findFirst({
|
||||
where: { product_id: productId, status: 'ACTIVE' },
|
||||
select: { id: true },
|
||||
orderBy: { created_at: 'desc' },
|
||||
})
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue