fix(realtime): force-destroy pg socket on cleanup timeout (SSE leak) (#44)

Three SSE-routes (solo, backlog, notifications) each create a long-
running pg.Client that LISTENs on scrum4me_changes. On abrupt close
(Fast Refresh, browser refresh, Vercel function recycle) the
pgClient.end()-await sometimes hangs silently, leaving the underlying
socket connected to Postgres. The connection stays in 'idle' on Neon's
side and after ~10-20 reconnects the connection-pool fills up — new
SSE connects fail with ERR_INCOMPLETE_CHUNKED_ENCODING in the browser.

Fix: shared `closePgClientSafely` helper that races client.end()
against a 2 s timeout; on timeout it force-destroys the underlying
socket so the OS releases the FD and Postgres notices the disconnect.

Validated by direct DB inspection: 18 stale 'idle LISTEN'-connections
were piled up before the fix; after manual pg_terminate_backend cleanup
the SSE-stream stabilised. This change makes the pile-up impossible
going forward.

- new lib/realtime/pg-client-cleanup.ts
- 3 routes use the helper instead of bare `await pgClient.end()`
- 3 unit tests for the helper (timely-end, hang-falls-back-to-destroy,
  end-rejection-is-swallowed)

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Janpeter Visser 2026-05-01 20:04:22 +02:00 committed by GitHub
parent 070e1d9ea2
commit 6c6c8b96b7
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 127 additions and 11 deletions

View file

@ -0,0 +1,66 @@
import { describe, it, expect, vi, beforeEach } from 'vitest'
import type { Client } from 'pg'
import { closePgClientSafely } from '@/lib/realtime/pg-client-cleanup'
function makeFakeClient(opts: {
endResolves?: Promise<void>
destroy?: ReturnType<typeof vi.fn>
}): Client {
const handlers = new Map<string, Array<(...args: unknown[]) => void>>()
const fake = {
end: vi.fn().mockReturnValue(opts.endResolves ?? Promise.resolve()),
on: vi.fn((event: string, fn: (...args: unknown[]) => void) => {
const list = handlers.get(event) ?? []
list.push(fn)
handlers.set(event, list)
return fake
}),
removeAllListeners: vi.fn((event: string) => {
handlers.delete(event)
return fake
}),
connection: {
stream: { destroy: opts.destroy ?? vi.fn() },
},
}
return fake as unknown as Client
}
describe('closePgClientSafely', () => {
beforeEach(() => {
vi.useRealTimers()
})
it('drops listeners and awaits client.end() when it resolves quickly', async () => {
const destroy = vi.fn()
const client = makeFakeClient({ destroy })
await closePgClientSafely(client, 'test')
expect(client.removeAllListeners).toHaveBeenCalledWith('notification')
expect(client.removeAllListeners).toHaveBeenCalledWith('error')
expect(client.end).toHaveBeenCalledOnce()
expect(destroy).not.toHaveBeenCalled() // ended in time
})
it('falls back to socket-destroy when client.end() hangs past the timeout', async () => {
const destroy = vi.fn()
// .end() never resolves
const client = makeFakeClient({ endResolves: new Promise(() => {}), destroy })
vi.useFakeTimers()
const promise = closePgClientSafely(client, 'test-hang')
await vi.advanceTimersByTimeAsync(2_001)
await promise
expect(destroy).toHaveBeenCalledOnce()
const arg = destroy.mock.calls[0][0]
expect(arg).toBeInstanceOf(Error)
})
it('does not throw when client.end() rejects', async () => {
const client = makeFakeClient({ endResolves: Promise.reject(new Error('boom')) })
await expect(closePgClientSafely(client, 'test-reject')).resolves.toBeUndefined()
})
})

View file

@ -6,6 +6,7 @@ import { NextRequest } from 'next/server'
import { Client } from 'pg'
import { getSession } from '@/lib/auth'
import { getAccessibleProduct } from '@/lib/product-access'
import { closePgClientSafely } from '@/lib/realtime/pg-client-cleanup'
export const runtime = 'nodejs'
export const dynamic = 'force-dynamic'
@ -68,7 +69,7 @@ export async function GET(request: NextRequest) {
closed = true
if (heartbeatTimer) clearInterval(heartbeatTimer)
if (hardCloseTimer) clearTimeout(hardCloseTimer)
try { await pgClient.end() } catch { /* ignore */ }
await closePgClientSafely(pgClient, 'realtime/backlog')
try { controller.close() } catch { /* already closed */ }
if (process.env.NODE_ENV !== 'production') {
console.log(`[realtime/backlog] closed: ${reason}`)

View file

@ -16,6 +16,7 @@ import { Client } from 'pg'
import { getSession } from '@/lib/auth'
import { prisma } from '@/lib/prisma'
import { productAccessFilter } from '@/lib/product-access'
import { closePgClientSafely } from '@/lib/realtime/pg-client-cleanup'
export const runtime = 'nodejs'
export const dynamic = 'force-dynamic'
@ -83,11 +84,7 @@ export async function GET(request: NextRequest) {
closed = true
if (heartbeatTimer) clearInterval(heartbeatTimer)
if (hardCloseTimer) clearTimeout(hardCloseTimer)
try {
await pgClient.end()
} catch {
// ignore
}
await closePgClientSafely(pgClient, 'realtime/notifications')
try {
controller.close()
} catch {

View file

@ -14,6 +14,7 @@ import { NextRequest } from 'next/server'
import { Client } from 'pg'
import { getSession } from '@/lib/auth'
import { getAccessibleProduct } from '@/lib/product-access'
import { closePgClientSafely } from '@/lib/realtime/pg-client-cleanup'
export const runtime = 'nodejs'
export const dynamic = 'force-dynamic'
@ -145,11 +146,7 @@ export async function GET(request: NextRequest) {
closed = true
if (heartbeatTimer) clearInterval(heartbeatTimer)
if (hardCloseTimer) clearTimeout(hardCloseTimer)
try {
await pgClient.end()
} catch {
// ignore
}
await closePgClientSafely(pgClient, 'realtime/solo')
try {
controller.close()
} catch {

View file

@ -0,0 +1,55 @@
// Robust pg.Client cleanup for SSE-routes that hold a long-running LISTEN-
// connection. Without this helper, `pgClient.end()` can hang silently when
// the underlying socket is in a weird state (Fast Refresh, abrupt browser
// close, Vercel function recycle), leaving the connection in 'idle' on the
// Postgres server. After ~10-20 reconnects the Neon connection-pool fills
// up and new SSE-connections fail with ERR_INCOMPLETE_CHUNKED_ENCODING.
//
// Strategy: race `pgClient.end()` against a short timeout; if the timeout
// wins, force-destroy the underlying socket so the OS releases the FD and
// Neon notices the disconnect.
import type { Client } from 'pg'
const END_TIMEOUT_MS = 2_000
interface PgClientWithStream {
connection?: { stream?: { destroy?: (err?: Error) => void } }
}
export async function closePgClientSafely(
client: Client,
label: string,
): Promise<void> {
// Drop notification/error handlers so a late event from the dying
// connection cannot trigger downstream cleanup again.
client.removeAllListeners('notification')
client.removeAllListeners('error')
client.on('error', () => {
// Swallow: connection is being torn down on purpose.
})
let timer: ReturnType<typeof setTimeout> | null = null
const timeout = new Promise<'timeout'>((resolve) => {
timer = setTimeout(() => resolve('timeout'), END_TIMEOUT_MS)
})
const result = await Promise.race([
client.end().then(() => 'ended' as const),
timeout,
]).catch(() => 'error' as const)
if (timer) clearTimeout(timer)
if (result !== 'ended') {
if (process.env.NODE_ENV !== 'production') {
console.warn(`[${label}] pgClient.end() did not finish in ${END_TIMEOUT_MS}ms — forcing socket destroy`)
}
const stream = (client as unknown as PgClientWithStream).connection?.stream
try {
stream?.destroy?.(new Error(`forced socket destroy from ${label}`))
} catch {
// best-effort
}
}
}