Scrum4Me/app/api/realtime/user-settings/route.ts
Madhura68 eda131dbde feat(PBI-76): SSE route for user-settings
User-scoped /api/realtime/user-settings stream that filters
scrum4me_changes notifications on kind=user_settings and matching
userId. Forwards the patch as a data: event so other tabs can
applyServerPatch without re-fetching settings.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-10 11:38:11 +02:00

146 lines
4.3 KiB
TypeScript

// PBI-76: User-scoped SSE stream voor user-settings cross-tab/cross-device sync.
//
// Wordt door <UserSettingsBridge /> in app/(app)/layout.tsx geopend zodra de
// gebruiker is ingelogd. Filtert pg_notify-payloads op
// `kind === 'user_settings' && userId === session.userId`. Settings worden
// via prop al gehydrateerd; deze route levert alleen incrementele patches.
//
// Auth: iron-session cookie. Demo-tokens openen geen subscription (bridge
// skipt voor isDemo).
// Output: text/event-stream — `data:` met de patch (Partial<UserSettings>).
// Sluit zelf na 240s als safety-net; client herconnect.
import { NextRequest } from 'next/server'
import { Client } from 'pg'
import { getSession } from '@/lib/auth'
import { closePgClientSafely } from '@/lib/realtime/pg-client-cleanup'
// Next.js route segment config. The handler holds a raw `pg` connection open,
// so it runs on the Node.js runtime and is never statically rendered or cached.
export const runtime = 'nodejs'
export const dynamic = 'force-dynamic'
// Upper bound (in seconds) for the function; kept above HARD_CLOSE_MS so the
// route can close the stream itself before being cut off.
export const maxDuration = 300
// Postgres NOTIFY channel this route LISTENs on.
const CHANNEL = 'scrum4me_changes'
// Interval between `: heartbeat` SSE comment frames.
const HEARTBEAT_MS = 25_000
// Safety net: the stream closes itself after this long and the client reconnects.
const HARD_CLOSE_MS = 240_000
/**
 * Notification payload that carries an incremental user-settings patch.
 * Only payloads with `kind === 'user_settings'` match this shape.
 */
interface UserSettingsPayload {
  kind: 'user_settings'
  userId: string
  patch: Record<string, unknown>
}

/**
 * Runtime type guard for {@link UserSettingsPayload}.
 *
 * @param p Arbitrary value, typically the result of `JSON.parse` on a
 *   notification payload.
 * @returns `true` only when `p` is a non-null object whose `kind` is
 *   `'user_settings'`, whose `userId` is a string and whose `patch` is a
 *   plain (non-null, non-array) object.
 */
function isUserSettingsPayload(p: unknown): p is UserSettingsPayload {
  if (typeof p !== 'object' || p === null) return false
  const obj = p as Record<string, unknown>
  const { patch } = obj
  return (
    obj.kind === 'user_settings' &&
    typeof obj.userId === 'string' &&
    typeof patch === 'object' &&
    patch !== null &&
    // `typeof [] === 'object'`, but an array is not a Record<string, unknown>
    // patch; letting it through would make the predicate lie about the type.
    !Array.isArray(patch)
  )
}
/**
 * GET /api/realtime/user-settings — Server-Sent Events stream that pushes
 * user-settings patches to the logged-in user.
 *
 * Flow:
 *  1. Responds 401 when the session has no `userId`, and 500 when neither
 *     `DIRECT_URL` nor `DATABASE_URL` is configured.
 *  2. Opens a dedicated `pg` client and LISTENs on `CHANNEL`.
 *  3. Forwards every notification that passes `isUserSettingsPayload` and
 *     whose `userId` equals the session user as a `data:` frame containing
 *     the JSON-encoded patch.
 *  4. Sends a `: heartbeat` comment frame every `HEARTBEAT_MS` and closes the
 *     stream after `HARD_CLOSE_MS`, on a pg client error, when the request is
 *     aborted, or when the stream consumer cancels.
 *
 * @param request Incoming request; its `signal` is used to detect client
 *   disconnects.
 * @returns A JSON error response (401/500) or a `text/event-stream` response.
 */
export async function GET(request: NextRequest): Promise<Response> {
  const session = await getSession()
  if (!session.userId) {
    return Response.json({ error: 'Niet ingelogd' }, { status: 401 })
  }
  const userId = session.userId

  const directUrl = process.env.DIRECT_URL ?? process.env.DATABASE_URL
  if (!directUrl) {
    return Response.json(
      { error: 'DIRECT_URL/DATABASE_URL niet geconfigureerd' },
      { status: 500 },
    )
  }

  const encoder = new TextEncoder()
  const pgClient = new Client({ connectionString: directUrl })
  let heartbeatTimer: ReturnType<typeof setInterval> | null = null
  let hardCloseTimer: ReturnType<typeof setTimeout> | null = null
  // Set in `start`; kept in the outer scope so `cancel` can share `cleanup`.
  let controller: ReadableStreamDefaultController<Uint8Array> | null = null
  let closed = false

  // Writes one SSE frame; silently drops it once the stream is shut down.
  const enqueue = (chunk: string): void => {
    if (closed || controller === null) return
    try {
      controller.enqueue(encoder.encode(chunk))
    } catch {
      // controller already closed
    }
  }

  // Idempotent teardown: the `closed` flag is flipped synchronously, so
  // concurrent triggers (abort + hard close + pg error) release resources once.
  const cleanup = async (reason: string): Promise<void> => {
    if (closed) return
    closed = true
    if (heartbeatTimer) clearInterval(heartbeatTimer)
    if (hardCloseTimer) clearTimeout(hardCloseTimer)
    await closePgClientSafely(pgClient, 'realtime/user-settings')
    try {
      controller?.close()
    } catch {
      // already closed (or cancelled by the consumer)
    }
    if (process.env.NODE_ENV !== 'production') {
      console.log(`[realtime/user-settings] closed: ${reason}`)
    }
  }

  // For event handlers and timers, which cannot await: run the teardown and
  // make sure a rejection is logged instead of becoming an unhandled rejection.
  const closeInBackground = (reason: string): void => {
    cleanup(reason).catch((err: unknown) => {
      console.error('[realtime/user-settings] cleanup failed:', err)
    })
  }

  const stream = new ReadableStream<Uint8Array>({
    async start(ctrl) {
      controller = ctrl

      // Listeners are attached BEFORE connect/LISTEN:
      //  - an 'error' event emitted while no listener is attached is thrown by
      //    Node's EventEmitter as an uncaught exception;
      //  - a notification delivered right after LISTEN completes would
      //    otherwise have no handler yet and be dropped.
      pgClient.on('error', (err) => {
        console.error('[realtime/user-settings] pg client error:', err)
        closeInBackground('pg error')
      })
      pgClient.on('notification', (msg) => {
        if (!msg.payload) return
        let payload: unknown
        try {
          payload = JSON.parse(msg.payload)
        } catch {
          // Not JSON: not a payload this route understands.
          return
        }
        if (!isUserSettingsPayload(payload)) return
        // The channel is shared; only forward patches for the session user.
        if (payload.userId !== userId) return
        enqueue(`data: ${JSON.stringify(payload.patch)}\n\n`)
      })

      try {
        await pgClient.connect()
        await pgClient.query(`LISTEN ${CHANNEL}`)
      } catch (err) {
        console.error('[realtime/user-settings] pg connect/listen failed:', err)
        enqueue(
          `event: error\ndata: ${JSON.stringify({ message: 'pg connect failed' })}\n\n`,
        )
        await cleanup('pg connect failed')
        return
      }

      // The stream may have been torn down (cancel / pg error) while we were
      // awaiting the connection; do not arm timers for a dead stream.
      if (closed) return
      // An 'abort' that fired during the awaits above will not fire again for
      // a listener added afterwards, so check the flag explicitly. Without
      // this the pg connection would stay open until the hard-close timer.
      if (request.signal.aborted) {
        await cleanup('client aborted')
        return
      }

      enqueue(`: connected\n\n`)
      heartbeatTimer = setInterval(() => {
        enqueue(`: heartbeat\n\n`)
      }, HEARTBEAT_MS)
      hardCloseTimer = setTimeout(() => {
        closeInBackground('hard close 240s')
      }, HARD_CLOSE_MS)
      request.signal.addEventListener(
        'abort',
        () => {
          closeInBackground('client aborted')
        },
        { once: true },
      )
    },
    // Called when the consumer of the response body cancels the stream;
    // release the pg connection even if the request signal never aborts.
    cancel() {
      closeInBackground('stream cancelled')
    },
  })

  return new Response(stream, {
    headers: {
      'Content-Type': 'text/event-stream; charset=utf-8',
      'Cache-Control': 'no-cache, no-transform',
      Connection: 'keep-alive',
      // Hint for nginx-style proxies not to buffer the event stream.
      'X-Accel-Buffering': 'no',
    },
  })
}