Compare commits
1 commit
main
...
fix/sse-pg
| Author | SHA1 | Date | |
|---|---|---|---|
| 801da46f11 |
5 changed files with 127 additions and 11 deletions
66
__tests__/lib/realtime/pg-client-cleanup.test.ts
Normal file
66
__tests__/lib/realtime/pg-client-cleanup.test.ts
Normal file
|
|
@ -0,0 +1,66 @@
|
||||||
|
import { describe, it, expect, vi, beforeEach } from 'vitest'
|
||||||
|
import type { Client } from 'pg'
|
||||||
|
import { closePgClientSafely } from '@/lib/realtime/pg-client-cleanup'
|
||||||
|
|
||||||
|
function makeFakeClient(opts: {
|
||||||
|
endResolves?: Promise<void>
|
||||||
|
destroy?: ReturnType<typeof vi.fn>
|
||||||
|
}): Client {
|
||||||
|
const handlers = new Map<string, Array<(...args: unknown[]) => void>>()
|
||||||
|
const fake = {
|
||||||
|
end: vi.fn().mockReturnValue(opts.endResolves ?? Promise.resolve()),
|
||||||
|
on: vi.fn((event: string, fn: (...args: unknown[]) => void) => {
|
||||||
|
const list = handlers.get(event) ?? []
|
||||||
|
list.push(fn)
|
||||||
|
handlers.set(event, list)
|
||||||
|
return fake
|
||||||
|
}),
|
||||||
|
removeAllListeners: vi.fn((event: string) => {
|
||||||
|
handlers.delete(event)
|
||||||
|
return fake
|
||||||
|
}),
|
||||||
|
connection: {
|
||||||
|
stream: { destroy: opts.destroy ?? vi.fn() },
|
||||||
|
},
|
||||||
|
}
|
||||||
|
return fake as unknown as Client
|
||||||
|
}
|
||||||
|
|
||||||
|
describe('closePgClientSafely', () => {
|
||||||
|
beforeEach(() => {
|
||||||
|
vi.useRealTimers()
|
||||||
|
})
|
||||||
|
|
||||||
|
it('drops listeners and awaits client.end() when it resolves quickly', async () => {
|
||||||
|
const destroy = vi.fn()
|
||||||
|
const client = makeFakeClient({ destroy })
|
||||||
|
|
||||||
|
await closePgClientSafely(client, 'test')
|
||||||
|
|
||||||
|
expect(client.removeAllListeners).toHaveBeenCalledWith('notification')
|
||||||
|
expect(client.removeAllListeners).toHaveBeenCalledWith('error')
|
||||||
|
expect(client.end).toHaveBeenCalledOnce()
|
||||||
|
expect(destroy).not.toHaveBeenCalled() // ended in time
|
||||||
|
})
|
||||||
|
|
||||||
|
it('falls back to socket-destroy when client.end() hangs past the timeout', async () => {
|
||||||
|
const destroy = vi.fn()
|
||||||
|
// .end() never resolves
|
||||||
|
const client = makeFakeClient({ endResolves: new Promise(() => {}), destroy })
|
||||||
|
|
||||||
|
vi.useFakeTimers()
|
||||||
|
const promise = closePgClientSafely(client, 'test-hang')
|
||||||
|
await vi.advanceTimersByTimeAsync(2_001)
|
||||||
|
await promise
|
||||||
|
|
||||||
|
expect(destroy).toHaveBeenCalledOnce()
|
||||||
|
const arg = destroy.mock.calls[0][0]
|
||||||
|
expect(arg).toBeInstanceOf(Error)
|
||||||
|
})
|
||||||
|
|
||||||
|
it('does not throw when client.end() rejects', async () => {
|
||||||
|
const client = makeFakeClient({ endResolves: Promise.reject(new Error('boom')) })
|
||||||
|
|
||||||
|
await expect(closePgClientSafely(client, 'test-reject')).resolves.toBeUndefined()
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
@ -6,6 +6,7 @@ import { NextRequest } from 'next/server'
|
||||||
import { Client } from 'pg'
|
import { Client } from 'pg'
|
||||||
import { getSession } from '@/lib/auth'
|
import { getSession } from '@/lib/auth'
|
||||||
import { getAccessibleProduct } from '@/lib/product-access'
|
import { getAccessibleProduct } from '@/lib/product-access'
|
||||||
|
import { closePgClientSafely } from '@/lib/realtime/pg-client-cleanup'
|
||||||
|
|
||||||
export const runtime = 'nodejs'
|
export const runtime = 'nodejs'
|
||||||
export const dynamic = 'force-dynamic'
|
export const dynamic = 'force-dynamic'
|
||||||
|
|
@ -68,7 +69,7 @@ export async function GET(request: NextRequest) {
|
||||||
closed = true
|
closed = true
|
||||||
if (heartbeatTimer) clearInterval(heartbeatTimer)
|
if (heartbeatTimer) clearInterval(heartbeatTimer)
|
||||||
if (hardCloseTimer) clearTimeout(hardCloseTimer)
|
if (hardCloseTimer) clearTimeout(hardCloseTimer)
|
||||||
try { await pgClient.end() } catch { /* ignore */ }
|
await closePgClientSafely(pgClient, 'realtime/backlog')
|
||||||
try { controller.close() } catch { /* already closed */ }
|
try { controller.close() } catch { /* already closed */ }
|
||||||
if (process.env.NODE_ENV !== 'production') {
|
if (process.env.NODE_ENV !== 'production') {
|
||||||
console.log(`[realtime/backlog] closed: ${reason}`)
|
console.log(`[realtime/backlog] closed: ${reason}`)
|
||||||
|
|
|
||||||
|
|
@ -16,6 +16,7 @@ import { Client } from 'pg'
|
||||||
import { getSession } from '@/lib/auth'
|
import { getSession } from '@/lib/auth'
|
||||||
import { prisma } from '@/lib/prisma'
|
import { prisma } from '@/lib/prisma'
|
||||||
import { productAccessFilter } from '@/lib/product-access'
|
import { productAccessFilter } from '@/lib/product-access'
|
||||||
|
import { closePgClientSafely } from '@/lib/realtime/pg-client-cleanup'
|
||||||
|
|
||||||
export const runtime = 'nodejs'
|
export const runtime = 'nodejs'
|
||||||
export const dynamic = 'force-dynamic'
|
export const dynamic = 'force-dynamic'
|
||||||
|
|
@ -83,11 +84,7 @@ export async function GET(request: NextRequest) {
|
||||||
closed = true
|
closed = true
|
||||||
if (heartbeatTimer) clearInterval(heartbeatTimer)
|
if (heartbeatTimer) clearInterval(heartbeatTimer)
|
||||||
if (hardCloseTimer) clearTimeout(hardCloseTimer)
|
if (hardCloseTimer) clearTimeout(hardCloseTimer)
|
||||||
try {
|
await closePgClientSafely(pgClient, 'realtime/notifications')
|
||||||
await pgClient.end()
|
|
||||||
} catch {
|
|
||||||
// ignore
|
|
||||||
}
|
|
||||||
try {
|
try {
|
||||||
controller.close()
|
controller.close()
|
||||||
} catch {
|
} catch {
|
||||||
|
|
|
||||||
|
|
@ -14,6 +14,7 @@ import { NextRequest } from 'next/server'
|
||||||
import { Client } from 'pg'
|
import { Client } from 'pg'
|
||||||
import { getSession } from '@/lib/auth'
|
import { getSession } from '@/lib/auth'
|
||||||
import { getAccessibleProduct } from '@/lib/product-access'
|
import { getAccessibleProduct } from '@/lib/product-access'
|
||||||
|
import { closePgClientSafely } from '@/lib/realtime/pg-client-cleanup'
|
||||||
|
|
||||||
export const runtime = 'nodejs'
|
export const runtime = 'nodejs'
|
||||||
export const dynamic = 'force-dynamic'
|
export const dynamic = 'force-dynamic'
|
||||||
|
|
@ -145,11 +146,7 @@ export async function GET(request: NextRequest) {
|
||||||
closed = true
|
closed = true
|
||||||
if (heartbeatTimer) clearInterval(heartbeatTimer)
|
if (heartbeatTimer) clearInterval(heartbeatTimer)
|
||||||
if (hardCloseTimer) clearTimeout(hardCloseTimer)
|
if (hardCloseTimer) clearTimeout(hardCloseTimer)
|
||||||
try {
|
await closePgClientSafely(pgClient, 'realtime/solo')
|
||||||
await pgClient.end()
|
|
||||||
} catch {
|
|
||||||
// ignore
|
|
||||||
}
|
|
||||||
try {
|
try {
|
||||||
controller.close()
|
controller.close()
|
||||||
} catch {
|
} catch {
|
||||||
|
|
|
||||||
55
lib/realtime/pg-client-cleanup.ts
Normal file
55
lib/realtime/pg-client-cleanup.ts
Normal file
|
|
@ -0,0 +1,55 @@
|
||||||
|
// Robust pg.Client cleanup for SSE-routes that hold a long-running LISTEN-
|
||||||
|
// connection. Without this helper, `pgClient.end()` can hang silently when
|
||||||
|
// the underlying socket is in a weird state (Fast Refresh, abrupt browser
|
||||||
|
// close, Vercel function recycle), leaving the connection in 'idle' on the
|
||||||
|
// Postgres server. After ~10-20 reconnects the Neon connection-pool fills
|
||||||
|
// up and new SSE-connections fail with ERR_INCOMPLETE_CHUNKED_ENCODING.
|
||||||
|
//
|
||||||
|
// Strategy: race `pgClient.end()` against a short timeout; if the timeout
|
||||||
|
// wins, force-destroy the underlying socket so the OS releases the FD and
|
||||||
|
// Neon notices the disconnect.
|
||||||
|
|
||||||
|
import type { Client } from 'pg'
|
||||||
|
|
||||||
|
const END_TIMEOUT_MS = 2_000
|
||||||
|
|
||||||
|
interface PgClientWithStream {
|
||||||
|
connection?: { stream?: { destroy?: (err?: Error) => void } }
|
||||||
|
}
|
||||||
|
|
||||||
|
export async function closePgClientSafely(
|
||||||
|
client: Client,
|
||||||
|
label: string,
|
||||||
|
): Promise<void> {
|
||||||
|
// Drop notification/error handlers so a late event from the dying
|
||||||
|
// connection cannot trigger downstream cleanup again.
|
||||||
|
client.removeAllListeners('notification')
|
||||||
|
client.removeAllListeners('error')
|
||||||
|
client.on('error', () => {
|
||||||
|
// Swallow: connection is being torn down on purpose.
|
||||||
|
})
|
||||||
|
|
||||||
|
let timer: ReturnType<typeof setTimeout> | null = null
|
||||||
|
const timeout = new Promise<'timeout'>((resolve) => {
|
||||||
|
timer = setTimeout(() => resolve('timeout'), END_TIMEOUT_MS)
|
||||||
|
})
|
||||||
|
|
||||||
|
const result = await Promise.race([
|
||||||
|
client.end().then(() => 'ended' as const),
|
||||||
|
timeout,
|
||||||
|
]).catch(() => 'error' as const)
|
||||||
|
|
||||||
|
if (timer) clearTimeout(timer)
|
||||||
|
|
||||||
|
if (result !== 'ended') {
|
||||||
|
if (process.env.NODE_ENV !== 'production') {
|
||||||
|
console.warn(`[${label}] pgClient.end() did not finish in ${END_TIMEOUT_MS}ms — forcing socket destroy`)
|
||||||
|
}
|
||||||
|
const stream = (client as unknown as PgClientWithStream).connection?.stream
|
||||||
|
try {
|
||||||
|
stream?.destroy?.(new Error(`forced socket destroy from ${label}`))
|
||||||
|
} catch {
|
||||||
|
// best-effort
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
Loading…
Add table
Add a link
Reference in a new issue