Scrum4Me/app/api/realtime/backlog/route.ts
janpeter visser 801da46f11 fix(realtime): force-destroy pg socket on cleanup timeout (SSE leak)
Three SSE-routes (solo, backlog, notifications) each create a long-
running pg.Client that LISTENs on scrum4me_changes. On abrupt close
(Fast Refresh, browser refresh, Vercel function recycle) the
pgClient.end()-await sometimes hangs silently, leaving the underlying
socket connected to Postgres. The connection stays in 'idle' on Neon's
side and after ~10-20 reconnects the connection-pool fills up — new
SSE connects fail with ERR_INCOMPLETE_CHUNKED_ENCODING in the browser.

Fix: shared `closePgClientSafely` helper that races client.end()
against a 2 s timeout; on timeout it force-destroys the underlying
socket so the OS releases the FD and Postgres notices the disconnect.

Validated by direct DB inspection: 18 stale 'idle LISTEN'-connections
were piled up before the fix; after manual pg_terminate_backend cleanup
the SSE-stream stabilised. This change makes the pile-up impossible
going forward.

- new lib/realtime/pg-client-cleanup.ts
- 3 routes use the helper instead of bare `await pgClient.end()`
- 3 unit tests for the helper (timely-end, hang-falls-back-to-destroy,
  end-rejection-is-swallowed)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-01 20:03:15 +02:00

130 lines
4.2 KiB
TypeScript

// SSE endpoint for the backlog 3-pane (PBI / story / task changes).
// Simpler than /api/realtime/solo — no sprint or user scoping, just product_id filter.
// Auth: iron-session cookie. Demo users may read (no 403 for demo).
import { NextRequest } from 'next/server'
import { Client } from 'pg'
import { getSession } from '@/lib/auth'
import { getAccessibleProduct } from '@/lib/product-access'
import { closePgClientSafely } from '@/lib/realtime/pg-client-cleanup'
export const runtime = 'nodejs'
export const dynamic = 'force-dynamic'
export const maxDuration = 300
const CHANNEL = 'scrum4me_changes'
const HEARTBEAT_MS = 25_000
const HARD_CLOSE_MS = 240_000
type NotifyPayload = Record<string, unknown>
function shouldEmit(payload: NotifyPayload, productId: string): boolean {
if ('type' in payload) return false // job / worker events — not relevant here
const entity = payload.entity as string | undefined
if (!entity || !['pbi', 'story', 'task'].includes(entity)) return false
return payload.product_id === productId
}
export async function GET(request: NextRequest) {
const session = await getSession()
if (!session.userId) {
return Response.json({ error: 'Niet ingelogd' }, { status: 401 })
}
const productId = request.nextUrl.searchParams.get('product_id')
if (!productId) {
return Response.json({ error: 'product_id is verplicht' }, { status: 400 })
}
const product = await getAccessibleProduct(productId, session.userId)
if (!product) {
return Response.json({ error: 'Geen toegang tot dit product' }, { status: 403 })
}
const directUrl = process.env.DIRECT_URL ?? process.env.DATABASE_URL
if (!directUrl) {
return Response.json({ error: 'DIRECT_URL/DATABASE_URL niet geconfigureerd' }, { status: 500 })
}
const encoder = new TextEncoder()
const pgClient = new Client({ connectionString: directUrl })
let heartbeatTimer: ReturnType<typeof setInterval> | null = null
let hardCloseTimer: ReturnType<typeof setTimeout> | null = null
let closed = false
const stream = new ReadableStream({
async start(controller) {
const enqueue = (chunk: string) => {
if (closed) return
try {
controller.enqueue(encoder.encode(chunk))
} catch {
// stream already closed
}
}
const cleanup = async (reason: string) => {
if (closed) return
closed = true
if (heartbeatTimer) clearInterval(heartbeatTimer)
if (hardCloseTimer) clearTimeout(hardCloseTimer)
await closePgClientSafely(pgClient, 'realtime/backlog')
try { controller.close() } catch { /* already closed */ }
if (process.env.NODE_ENV !== 'production') {
console.log(`[realtime/backlog] closed: ${reason}`)
}
}
try {
await pgClient.connect()
await pgClient.query(`LISTEN ${CHANNEL}`)
} catch (err) {
console.error('[realtime/backlog] pg connect/listen failed:', err)
enqueue(`event: error\ndata: ${JSON.stringify({ message: 'pg connect failed' })}\n\n`)
await cleanup('pg connect failed')
return
}
pgClient.on('notification', (msg) => {
if (!msg.payload) return
let payload: NotifyPayload
try {
payload = JSON.parse(msg.payload) as NotifyPayload
} catch {
return
}
if (!shouldEmit(payload, productId)) return
enqueue(`data: ${msg.payload}\n\n`)
})
pgClient.on('error', async (err) => {
console.error('[realtime/backlog] pg client error:', err)
await cleanup('pg error')
})
enqueue(`event: ready\ndata: ${JSON.stringify({ product_id: productId })}\n\n`)
heartbeatTimer = setInterval(() => {
enqueue(`: heartbeat\n\n`)
}, HEARTBEAT_MS)
hardCloseTimer = setTimeout(() => {
cleanup('hard close 240s')
}, HARD_CLOSE_MS)
request.signal.addEventListener('abort', () => {
cleanup('client aborted')
})
},
})
return new Response(stream, {
headers: {
'Content-Type': 'text/event-stream; charset=utf-8',
'Cache-Control': 'no-cache, no-transform',
Connection: 'keep-alive',
'X-Accel-Buffering': 'no',
},
})
}