diff --git a/__tests__/lib/realtime/pg-client-cleanup.test.ts b/__tests__/lib/realtime/pg-client-cleanup.test.ts new file mode 100644 index 0000000..099032b --- /dev/null +++ b/__tests__/lib/realtime/pg-client-cleanup.test.ts @@ -0,0 +1,66 @@ +import { describe, it, expect, vi, beforeEach } from 'vitest' +import type { Client } from 'pg' +import { closePgClientSafely } from '@/lib/realtime/pg-client-cleanup' + +function makeFakeClient(opts: { + endResolves?: Promise + destroy?: ReturnType +}): Client { + const handlers = new Map void>>() + const fake = { + end: vi.fn().mockReturnValue(opts.endResolves ?? Promise.resolve()), + on: vi.fn((event: string, fn: (...args: unknown[]) => void) => { + const list = handlers.get(event) ?? [] + list.push(fn) + handlers.set(event, list) + return fake + }), + removeAllListeners: vi.fn((event: string) => { + handlers.delete(event) + return fake + }), + connection: { + stream: { destroy: opts.destroy ?? vi.fn() }, + }, + } + return fake as unknown as Client +} + +describe('closePgClientSafely', () => { + beforeEach(() => { + vi.useRealTimers() + }) + + it('drops listeners and awaits client.end() when it resolves quickly', async () => { + const destroy = vi.fn() + const client = makeFakeClient({ destroy }) + + await closePgClientSafely(client, 'test') + + expect(client.removeAllListeners).toHaveBeenCalledWith('notification') + expect(client.removeAllListeners).toHaveBeenCalledWith('error') + expect(client.end).toHaveBeenCalledOnce() + expect(destroy).not.toHaveBeenCalled() // ended in time + }) + + it('falls back to socket-destroy when client.end() hangs past the timeout', async () => { + const destroy = vi.fn() + // .end() never resolves + const client = makeFakeClient({ endResolves: new Promise(() => {}), destroy }) + + vi.useFakeTimers() + const promise = closePgClientSafely(client, 'test-hang') + await vi.advanceTimersByTimeAsync(2_001) + await promise + + expect(destroy).toHaveBeenCalledOnce() + const arg = destroy.mock.calls[0][0] + expect(arg).toBeInstanceOf(Error) + }) + + it('does not throw when client.end() rejects', async () => { + const client = makeFakeClient({ endResolves: Promise.reject(new Error('boom')) }) + + await expect(closePgClientSafely(client, 'test-reject')).resolves.toBeUndefined() + }) +}) diff --git a/app/api/realtime/backlog/route.ts b/app/api/realtime/backlog/route.ts index 1736710..dfbd835 100644 --- a/app/api/realtime/backlog/route.ts +++ b/app/api/realtime/backlog/route.ts @@ -6,6 +6,7 @@ import { NextRequest } from 'next/server' import { Client } from 'pg' import { getSession } from '@/lib/auth' import { getAccessibleProduct } from '@/lib/product-access' +import { closePgClientSafely } from '@/lib/realtime/pg-client-cleanup' export const runtime = 'nodejs' export const dynamic = 'force-dynamic' @@ -68,7 +69,7 @@ export async function GET(request: NextRequest) { closed = true if (heartbeatTimer) clearInterval(heartbeatTimer) if (hardCloseTimer) clearTimeout(hardCloseTimer) - try { await pgClient.end() } catch { /* ignore */ } + await closePgClientSafely(pgClient, 'realtime/backlog') try { controller.close() } catch { /* already closed */ } if (process.env.NODE_ENV !== 'production') { console.log(`[realtime/backlog] closed: ${reason}`) diff --git a/app/api/realtime/notifications/route.ts b/app/api/realtime/notifications/route.ts index f31c6d5..907898a 100644 --- a/app/api/realtime/notifications/route.ts +++ b/app/api/realtime/notifications/route.ts @@ -16,6 +16,7 @@ import { Client } from 'pg' import { getSession } from '@/lib/auth' import { prisma } from '@/lib/prisma' import { productAccessFilter } from '@/lib/product-access' +import { closePgClientSafely } from '@/lib/realtime/pg-client-cleanup' export const runtime = 'nodejs' export const dynamic = 'force-dynamic' @@ -83,11 +84,7 @@ export async function GET(request: NextRequest) { closed = true if (heartbeatTimer) clearInterval(heartbeatTimer) if (hardCloseTimer) clearTimeout(hardCloseTimer) - try { - await pgClient.end() - } catch { - // ignore - } + await closePgClientSafely(pgClient, 'realtime/notifications') try { controller.close() } catch { diff --git a/app/api/realtime/solo/route.ts b/app/api/realtime/solo/route.ts index 112e0cc..0553cf6 100644 --- a/app/api/realtime/solo/route.ts +++ b/app/api/realtime/solo/route.ts @@ -14,6 +14,7 @@ import { NextRequest } from 'next/server' import { Client } from 'pg' import { getSession } from '@/lib/auth' import { getAccessibleProduct } from '@/lib/product-access' +import { closePgClientSafely } from '@/lib/realtime/pg-client-cleanup' export const runtime = 'nodejs' export const dynamic = 'force-dynamic' @@ -145,11 +146,7 @@ export async function GET(request: NextRequest) { closed = true if (heartbeatTimer) clearInterval(heartbeatTimer) if (hardCloseTimer) clearTimeout(hardCloseTimer) - try { - await pgClient.end() - } catch { - // ignore - } + await closePgClientSafely(pgClient, 'realtime/solo') try { controller.close() } catch { diff --git a/lib/realtime/pg-client-cleanup.ts b/lib/realtime/pg-client-cleanup.ts new file mode 100644 index 0000000..6021320 --- /dev/null +++ b/lib/realtime/pg-client-cleanup.ts @@ -0,0 +1,55 @@ +// Robust pg.Client cleanup for SSE-routes that hold a long-running LISTEN- +// connection. Without this helper, `pgClient.end()` can hang silently when +// the underlying socket is in a weird state (Fast Refresh, abrupt browser +// close, Vercel function recycle), leaving the connection in 'idle' on the +// Postgres server. After ~10-20 reconnects the Neon connection-pool fills +// up and new SSE-connections fail with ERR_INCOMPLETE_CHUNKED_ENCODING. +// +// Strategy: race `pgClient.end()` against a short timeout; if the timeout +// wins, force-destroy the underlying socket so the OS releases the FD and +// Neon notices the disconnect. + +import type { Client } from 'pg' + +const END_TIMEOUT_MS = 2_000 + +interface PgClientWithStream { + connection?: { stream?: { destroy?: (err?: Error) => void } } +} + +export async function closePgClientSafely( + client: Client, + label: string, +): Promise { + // Drop notification/error handlers so a late event from the dying + // connection cannot trigger downstream cleanup again. + client.removeAllListeners('notification') + client.removeAllListeners('error') + client.on('error', () => { + // Swallow: connection is being torn down on purpose. + }) + + let timer: ReturnType | null = null + const timeout = new Promise<'timeout'>((resolve) => { + timer = setTimeout(() => resolve('timeout'), END_TIMEOUT_MS) + }) + + const result = await Promise.race([ + client.end().then(() => 'ended' as const), + timeout, + ]).catch(() => 'error' as const) + + if (timer) clearTimeout(timer) + + if (result !== 'ended') { + if (process.env.NODE_ENV !== 'production') { + console.warn(`[${label}] pgClient.end() did not finish in ${END_TIMEOUT_MS}ms — forcing socket destroy`) + } + const stream = (client as unknown as PgClientWithStream).connection?.stream + try { + stream?.destroy?.(new Error(`forced socket destroy from ${label}`)) + } catch { + // best-effort + } + } +}