From 801da46f11cb1f81a4548f06d0ff8e9a9c1a7c25 Mon Sep 17 00:00:00 2001 From: janpeter visser Date: Fri, 1 May 2026 20:03:15 +0200 Subject: [PATCH] fix(realtime): force-destroy pg socket on cleanup timeout (SSE leak) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three SSE-routes (solo, backlog, notifications) each create a long- running pg.Client that LISTENs on scrum4me_changes. On abrupt close (Fast Refresh, browser refresh, Vercel function recycle) the pgClient.end()-await sometimes hangs silently, leaving the underlying socket connected to Postgres. The connection stays in 'idle' on Neon's side and after ~10-20 reconnects the connection-pool fills up — new SSE connects fail with ERR_INCOMPLETE_CHUNKED_ENCODING in the browser. Fix: shared `closePgClientSafely` helper that races client.end() against a 2 s timeout; on timeout it force-destroys the underlying socket so the OS releases the FD and Postgres notices the disconnect. Validated by direct DB inspection: 18 stale 'idle LISTEN'-connections were piled up before the fix; after manual pg_terminate_backend cleanup the SSE-stream stabilised. This change makes the pile-up impossible going forward. - new lib/realtime/pg-client-cleanup.ts - 3 routes use the helper instead of bare `await pgClient.end()` - 3 unit tests for the helper (timely-end, hang-falls-back-to-destroy, end-rejection-is-swallowed) Co-Authored-By: Claude Opus 4.7 (1M context) --- .../lib/realtime/pg-client-cleanup.test.ts | 66 +++++++++++++++++++ app/api/realtime/backlog/route.ts | 3 +- app/api/realtime/notifications/route.ts | 7 +- app/api/realtime/solo/route.ts | 7 +- lib/realtime/pg-client-cleanup.ts | 55 ++++++++++++++++ 5 files changed, 127 insertions(+), 11 deletions(-) create mode 100644 __tests__/lib/realtime/pg-client-cleanup.test.ts create mode 100644 lib/realtime/pg-client-cleanup.ts diff --git a/__tests__/lib/realtime/pg-client-cleanup.test.ts b/__tests__/lib/realtime/pg-client-cleanup.test.ts new file mode 100644 index 0000000..099032b --- /dev/null +++ b/__tests__/lib/realtime/pg-client-cleanup.test.ts @@ -0,0 +1,66 @@ +import { describe, it, expect, vi, beforeEach } from 'vitest' +import type { Client } from 'pg' +import { closePgClientSafely } from '@/lib/realtime/pg-client-cleanup' + +function makeFakeClient(opts: { + endResolves?: Promise + destroy?: ReturnType +}): Client { + const handlers = new Map void>>() + const fake = { + end: vi.fn().mockReturnValue(opts.endResolves ?? Promise.resolve()), + on: vi.fn((event: string, fn: (...args: unknown[]) => void) => { + const list = handlers.get(event) ?? [] + list.push(fn) + handlers.set(event, list) + return fake + }), + removeAllListeners: vi.fn((event: string) => { + handlers.delete(event) + return fake + }), + connection: { + stream: { destroy: opts.destroy ?? vi.fn() }, + }, + } + return fake as unknown as Client +} + +describe('closePgClientSafely', () => { + beforeEach(() => { + vi.useRealTimers() + }) + + it('drops listeners and awaits client.end() when it resolves quickly', async () => { + const destroy = vi.fn() + const client = makeFakeClient({ destroy }) + + await closePgClientSafely(client, 'test') + + expect(client.removeAllListeners).toHaveBeenCalledWith('notification') + expect(client.removeAllListeners).toHaveBeenCalledWith('error') + expect(client.end).toHaveBeenCalledOnce() + expect(destroy).not.toHaveBeenCalled() // ended in time + }) + + it('falls back to socket-destroy when client.end() hangs past the timeout', async () => { + const destroy = vi.fn() + // .end() never resolves + const client = makeFakeClient({ endResolves: new Promise(() => {}), destroy }) + + vi.useFakeTimers() + const promise = closePgClientSafely(client, 'test-hang') + await vi.advanceTimersByTimeAsync(2_001) + await promise + + expect(destroy).toHaveBeenCalledOnce() + const arg = destroy.mock.calls[0][0] + expect(arg).toBeInstanceOf(Error) + }) + + it('does not throw when client.end() rejects', async () => { + const client = makeFakeClient({ endResolves: Promise.reject(new Error('boom')) }) + + await expect(closePgClientSafely(client, 'test-reject')).resolves.toBeUndefined() + }) +}) diff --git a/app/api/realtime/backlog/route.ts b/app/api/realtime/backlog/route.ts index 1736710..dfbd835 100644 --- a/app/api/realtime/backlog/route.ts +++ b/app/api/realtime/backlog/route.ts @@ -6,6 +6,7 @@ import { NextRequest } from 'next/server' import { Client } from 'pg' import { getSession } from '@/lib/auth' import { getAccessibleProduct } from '@/lib/product-access' +import { closePgClientSafely } from '@/lib/realtime/pg-client-cleanup' export const runtime = 'nodejs' export const dynamic = 'force-dynamic' @@ -68,7 +69,7 @@ export async function GET(request: NextRequest) { closed = true if (heartbeatTimer) clearInterval(heartbeatTimer) if (hardCloseTimer) clearTimeout(hardCloseTimer) - try { await pgClient.end() } catch { /* ignore */ } + await closePgClientSafely(pgClient, 'realtime/backlog') try { controller.close() } catch { /* already closed */ } if (process.env.NODE_ENV !== 'production') { console.log(`[realtime/backlog] closed: ${reason}`) diff --git a/app/api/realtime/notifications/route.ts b/app/api/realtime/notifications/route.ts index f31c6d5..907898a 100644 --- a/app/api/realtime/notifications/route.ts +++ b/app/api/realtime/notifications/route.ts @@ -16,6 +16,7 @@ import { Client } from 'pg' import { getSession } from '@/lib/auth' import { prisma } from '@/lib/prisma' import { productAccessFilter } from '@/lib/product-access' +import { closePgClientSafely } from '@/lib/realtime/pg-client-cleanup' export const runtime = 'nodejs' export const dynamic = 'force-dynamic' @@ -83,11 +84,7 @@ export async function GET(request: NextRequest) { closed = true if (heartbeatTimer) clearInterval(heartbeatTimer) if (hardCloseTimer) clearTimeout(hardCloseTimer) - try { - await pgClient.end() - } catch { - // ignore - } + await closePgClientSafely(pgClient, 'realtime/notifications') try { controller.close() } catch { diff --git a/app/api/realtime/solo/route.ts b/app/api/realtime/solo/route.ts index 112e0cc..0553cf6 100644 --- a/app/api/realtime/solo/route.ts +++ b/app/api/realtime/solo/route.ts @@ -14,6 +14,7 @@ import { NextRequest } from 'next/server' import { Client } from 'pg' import { getSession } from '@/lib/auth' import { getAccessibleProduct } from '@/lib/product-access' +import { closePgClientSafely } from '@/lib/realtime/pg-client-cleanup' export const runtime = 'nodejs' export const dynamic = 'force-dynamic' @@ -145,11 +146,7 @@ export async function GET(request: NextRequest) { closed = true if (heartbeatTimer) clearInterval(heartbeatTimer) if (hardCloseTimer) clearTimeout(hardCloseTimer) - try { - await pgClient.end() - } catch { - // ignore - } + await closePgClientSafely(pgClient, 'realtime/solo') try { controller.close() } catch { diff --git a/lib/realtime/pg-client-cleanup.ts b/lib/realtime/pg-client-cleanup.ts new file mode 100644 index 0000000..6021320 --- /dev/null +++ b/lib/realtime/pg-client-cleanup.ts @@ -0,0 +1,55 @@ +// Robust pg.Client cleanup for SSE-routes that hold a long-running LISTEN- +// connection. Without this helper, `pgClient.end()` can hang silently when +// the underlying socket is in a weird state (Fast Refresh, abrupt browser +// close, Vercel function recycle), leaving the connection in 'idle' on the +// Postgres server. After ~10-20 reconnects the Neon connection-pool fills +// up and new SSE-connections fail with ERR_INCOMPLETE_CHUNKED_ENCODING. +// +// Strategy: race `pgClient.end()` against a short timeout; if the timeout +// wins, force-destroy the underlying socket so the OS releases the FD and +// Neon notices the disconnect. + +import type { Client } from 'pg' + +const END_TIMEOUT_MS = 2_000 + +interface PgClientWithStream { + connection?: { stream?: { destroy?: (err?: Error) => void } } +} + +export async function closePgClientSafely( + client: Client, + label: string, +): Promise { + // Drop notification/error handlers so a late event from the dying + // connection cannot trigger downstream cleanup again. + client.removeAllListeners('notification') + client.removeAllListeners('error') + client.on('error', () => { + // Swallow: connection is being torn down on purpose. + }) + + let timer: ReturnType | null = null + const timeout = new Promise<'timeout'>((resolve) => { + timer = setTimeout(() => resolve('timeout'), END_TIMEOUT_MS) + }) + + const result = await Promise.race([ + client.end().then(() => 'ended' as const), + timeout, + ]).catch(() => 'error' as const) + + if (timer) clearTimeout(timer) + + if (result !== 'ended') { + if (process.env.NODE_ENV !== 'production') { + console.warn(`[${label}] pgClient.end() did not finish in ${END_TIMEOUT_MS}ms — forcing socket destroy`) + } + const stream = (client as unknown as PgClientWithStream).connection?.stream + try { + stream?.destroy?.(new Error(`forced socket destroy from ${label}`)) + } catch { + // best-effort + } + } +}