From a2e262317dea3a1b9a59e7b2ca0c3627e54a24a2 Mon Sep 17 00:00:00 2001 From: Scrum4Me Agent <30029041+madhura68@users.noreply.github.com> Date: Thu, 7 May 2026 18:35:40 +0200 Subject: [PATCH] feat(PBI-59): SSE-route /api/realtime/jobs voor user-scoped job-events Co-Authored-By: Claude Sonnet 4.6 --- app/api/realtime/jobs/route.ts | 170 +++++++++++++++++++++++++++++++++ 1 file changed, 170 insertions(+) create mode 100644 app/api/realtime/jobs/route.ts diff --git a/app/api/realtime/jobs/route.ts b/app/api/realtime/jobs/route.ts new file mode 100644 index 0000000..67edefd --- /dev/null +++ b/app/api/realtime/jobs/route.ts @@ -0,0 +1,170 @@ +import { NextRequest } from 'next/server' +import { Client } from 'pg' +import { getSession } from '@/lib/auth' +import { closePgClientSafely } from '@/lib/realtime/pg-client-cleanup' + +export const runtime = 'nodejs' +export const dynamic = 'force-dynamic' +export const maxDuration = 300 + +const CHANNEL = 'scrum4me_changes' +const HEARTBEAT_MS = 25_000 +const HARD_CLOSE_MS = 240_000 + +type JobPayload = { + type: 'claude_job_enqueued' | 'claude_job_status' + job_id: string + task_id?: string | null + idea_id?: string | null + sprint_run_id?: string | null + kind?: string + user_id: string + status: string + branch?: string + pushed_at?: string + pr_url?: string + verify_result?: string + summary?: string + error?: string +} + +function shouldEmit(raw: unknown, userId: string): boolean { + if (!raw || typeof raw !== 'object') return false + const p = raw as Record + return 'type' in p && typeof p.user_id === 'string' && p.user_id === userId +} + +export async function GET(request: NextRequest) { + const session = await getSession() + if (!session.userId) { + return Response.json({ error: 'Niet ingelogd' }, { status: 401 }) + } + const userId = session.userId + + const directUrl = process.env.DIRECT_URL ?? process.env.DATABASE_URL + if (!directUrl) { + return Response.json({ error: 'DIRECT_URL/DATABASE_URL niet geconfigureerd' }, { status: 500 }) + } + + const encoder = new TextEncoder() + const pgClient = new Client({ connectionString: directUrl }) + + let heartbeatTimer: ReturnType | null = null + let hardCloseTimer: ReturnType | null = null + let closed = false + + const stream = new ReadableStream({ + async start(controller) { + const enqueue = (chunk: string) => { + if (closed) return + try { + controller.enqueue(encoder.encode(chunk)) + } catch { + // Stream al gesloten + } + } + + const cleanup = async (reason: string) => { + if (closed) return + closed = true + if (heartbeatTimer) clearInterval(heartbeatTimer) + if (hardCloseTimer) clearTimeout(hardCloseTimer) + await closePgClientSafely(pgClient, 'realtime/jobs') + try { + controller.close() + } catch { + // already closed + } + if (process.env.NODE_ENV !== 'production') { + console.log(`[realtime/jobs] closed: ${reason}`) + } + } + + try { + await pgClient.connect() + await pgClient.query(`LISTEN ${CHANNEL}`) + } catch (err) { + console.error('[realtime/jobs] pg connect/listen failed:', err) + enqueue(`event: error\ndata: ${JSON.stringify({ message: 'pg connect failed' })}\n\n`) + await cleanup('pg connect failed') + return + } + + pgClient.on('notification', (msg) => { + if (!msg.payload) return + let payload: unknown + try { + payload = JSON.parse(msg.payload) + } catch { + return + } + if (!shouldEmit(payload, userId)) return + enqueue(`data: ${msg.payload}\n\n`) + }) + + pgClient.on('error', async (err) => { + console.error('[realtime/jobs] pg client error:', err) + await cleanup('pg error') + }) + + enqueue(`event: ready\ndata: ${JSON.stringify({ user_id: userId })}\n\n`) + + const activeJobs = await prisma_jobs_findActive(userId) + if (activeJobs.length > 0) { + enqueue(`event: jobs_initial\ndata: ${JSON.stringify(activeJobs)}\n\n`) + } + + heartbeatTimer = setInterval(() => { + enqueue(`: heartbeat\n\n`) + }, HEARTBEAT_MS) + + hardCloseTimer = setTimeout(() => { + cleanup('hard close 240s') + }, HARD_CLOSE_MS) + + request.signal.addEventListener('abort', () => { + cleanup('client aborted') + }) + }, + }) + + return new Response(stream, { + headers: { + 'Content-Type': 'text/event-stream; charset=utf-8', + 'Cache-Control': 'no-cache, no-transform', + Connection: 'keep-alive', + 'X-Accel-Buffering': 'no', + }, + }) +} + +async function prisma_jobs_findActive(userId: string): Promise { + const { prisma } = await import('@/lib/prisma') + const jobs = await prisma.claudeJob.findMany({ + where: { user_id: userId, status: { notIn: ['DONE'] } }, + select: { + id: true, + kind: true, + status: true, + task_id: true, + idea_id: true, + sprint_run_id: true, + branch: true, + error: true, + summary: true, + }, + }) + return jobs.map(j => ({ + type: 'claude_job_status' as const, + job_id: j.id, + kind: j.kind, + user_id: userId, + status: j.status, + task_id: j.task_id, + idea_id: j.idea_id, + sprint_run_id: j.sprint_run_id, + branch: j.branch ?? undefined, + error: j.error ?? undefined, + summary: j.summary ?? undefined, + })) +}