* feat(PBI-58): Vitest-tests voor SoloTaskCard veldmapping en 4-regels layout Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * feat(PBI-59): server action fetchJobsPageData voor jobs-pagina Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * feat(PBI-59): SSE-route /api/realtime/jobs voor user-scoped job-events Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * feat(PBI-59): JobCard component voor jobs-pagina Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * feat(PBI-59): JobDetailPane component voor jobs-pagina Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * feat(PBI-59): API route GET /api/jobs/[id]/sub-tasks voor sprint task executions Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> --------- Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
170 lines
4.7 KiB
TypeScript
170 lines
4.7 KiB
TypeScript
import { NextRequest } from 'next/server'
|
|
import { Client } from 'pg'
|
|
import { getSession } from '@/lib/auth'
|
|
import { closePgClientSafely } from '@/lib/realtime/pg-client-cleanup'
|
|
|
|
// Route-level settings exported for the framework to pick up. The values are
// kept as plain literals on purpose.
// NOTE(review): these look like Next.js route segment config options — confirm
// against the Next.js version in use.

/** Run this handler on the Node.js runtime. */
export const runtime = 'nodejs'

/** Never serve this route from a static/cached render. */
export const dynamic = 'force-dynamic'

/** Upper bound, in seconds, the platform may keep one invocation alive. */
export const maxDuration = 300
|
|
|
|
/** Postgres NOTIFY channel this route subscribes to with LISTEN. */
const CHANNEL = 'scrum4me_changes'

/** Interval between SSE comment frames that keep the connection alive (25 s). */
const HEARTBEAT_MS = 25 * 1_000

/** Time after which the stream is closed server-side (240 s, below maxDuration of 300 s). */
const HARD_CLOSE_MS = 4 * 60 * 1_000
|
|
|
|
/**
 * Shape of a job event sent to the client over the SSE stream.
 *
 * Used as the element type of the initial snapshot built by
 * `prisma_jobs_findActive`. Live notifications are forwarded as received and
 * are not validated against this type at runtime.
 */
type JobPayload = {
  // --- always present ---
  /** Event discriminator. */
  type: 'claude_job_enqueued' | 'claude_job_status'
  /** Identifier of the job the event is about. */
  job_id: string
  /** Owner of the job; events are only forwarded when it matches the session user. */
  user_id: string
  /** Job status as a string. */
  status: string

  // --- optional relations / classification ---
  task_id?: string | null
  idea_id?: string | null
  sprint_run_id?: string | null
  kind?: string

  // --- optional result details ---
  branch?: string
  pushed_at?: string
  pr_url?: string
  verify_result?: string
  summary?: string
  error?: string
}
|
|
|
|
/**
 * Decides whether a parsed notification payload may be forwarded to a user.
 *
 * @param raw - Parsed notification payload of unknown shape.
 * @param userId - Id of the user that owns the current stream.
 * @returns `true` only when `raw` is a non-null object that has a `type` key
 *   and a string `user_id` equal to `userId`.
 */
function shouldEmit(raw: unknown, userId: string): boolean {
  // Reject primitives, undefined and null up front.
  if (raw === null || raw === undefined || typeof raw !== 'object') {
    return false
  }

  const candidate = raw as Record<string, unknown>

  // Only the presence of `type` is checked, not its value.
  if (!('type' in candidate)) {
    return false
  }

  const owner = candidate.user_id
  if (typeof owner !== 'string') {
    return false
  }
  return owner === userId
}
|
|
|
|
/**
 * Server-Sent Events endpoint that streams job events for the logged-in user.
 *
 * Flow:
 *  1. Reject with 401 when there is no session user, and with 500 when neither
 *     DIRECT_URL nor DATABASE_URL is configured.
 *  2. Open a dedicated pg connection and LISTEN on `CHANNEL`.
 *  3. Send a `ready` event, then a `jobs_initial` event with the user's
 *     currently active jobs (only when there are any).
 *  4. Forward every notification accepted by `shouldEmit` as a `data:` frame,
 *     send a comment heartbeat every `HEARTBEAT_MS`, and close the stream
 *     after `HARD_CLOSE_MS`.
 *
 * The pg connection and timers are released through a single idempotent
 * `cleanup`, which runs on client abort, stream cancellation, pg errors,
 * setup failures and the hard-close timer.
 *
 * @param request - Incoming request; its abort signal ends the stream.
 * @returns A JSON error response, or a `text/event-stream` response.
 */
export async function GET(request: NextRequest) {
  const session = await getSession()
  if (!session.userId) {
    return Response.json({ error: 'Niet ingelogd' }, { status: 401 })
  }
  const userId = session.userId

  // DIRECT_URL is preferred; DATABASE_URL is the fallback.
  const directUrl = process.env.DIRECT_URL ?? process.env.DATABASE_URL
  if (!directUrl) {
    return Response.json({ error: 'DIRECT_URL/DATABASE_URL niet geconfigureerd' }, { status: 500 })
  }

  const encoder = new TextEncoder()
  const pgClient = new Client({ connectionString: directUrl })

  let heartbeatTimer: ReturnType<typeof setInterval> | null = null
  let hardCloseTimer: ReturnType<typeof setTimeout> | null = null
  let closed = false
  // Set in start(); kept outside so cancel() and cleanup() can reach it.
  let streamController: ReadableStreamDefaultController<Uint8Array> | null = null

  /** Writes one raw SSE chunk; a no-op once the stream has been closed. */
  const enqueue = (chunk: string) => {
    if (closed || !streamController) return
    try {
      streamController.enqueue(encoder.encode(chunk))
    } catch {
      // Stream already closed
    }
  }

  /** Releases timers, the pg connection and the stream. Safe to call repeatedly. */
  const cleanup = async (reason: string) => {
    if (closed) return
    closed = true
    if (heartbeatTimer) clearInterval(heartbeatTimer)
    if (hardCloseTimer) clearTimeout(hardCloseTimer)
    await closePgClientSafely(pgClient, 'realtime/jobs')
    try {
      streamController?.close()
    } catch {
      // already closed
    }
    if (process.env.NODE_ENV !== 'production') {
      console.log(`[realtime/jobs] closed: ${reason}`)
    }
  }

  const stream = new ReadableStream<Uint8Array>({
    async start(controller) {
      streamController = controller

      // Attach listeners before connecting so that no 'error' event can be
      // emitted without a handler and no notification is missed after LISTEN.
      pgClient.on('notification', (msg) => {
        if (!msg.payload) return
        let payload: unknown
        try {
          payload = JSON.parse(msg.payload)
        } catch {
          return
        }
        if (!shouldEmit(payload, userId)) return
        // An SSE data field ends at the first line break. Forward the payload
        // verbatim unless it contains one; then re-serialise to a single line.
        const data = /[\r\n]/.test(msg.payload) ? JSON.stringify(payload) : msg.payload
        enqueue(`data: ${data}\n\n`)
      })

      pgClient.on('error', (err) => {
        console.error('[realtime/jobs] pg client error:', err)
        void cleanup('pg error')
      })

      try {
        await pgClient.connect()
        await pgClient.query(`LISTEN ${CHANNEL}`)
      } catch (err) {
        console.error('[realtime/jobs] pg connect/listen failed:', err)
        enqueue(`event: error\ndata: ${JSON.stringify({ message: 'pg connect failed' })}\n\n`)
        await cleanup('pg connect failed')
        return
      }
      // The stream may have been cancelled or errored while connecting.
      if (closed) return

      enqueue(`event: ready\ndata: ${JSON.stringify({ user_id: userId })}\n\n`)

      try {
        const activeJobs = await prisma_jobs_findActive(userId)
        if (activeJobs.length > 0) {
          enqueue(`event: jobs_initial\ndata: ${JSON.stringify(activeJobs)}\n\n`)
        }
      } catch (err) {
        // Without this guard a failing snapshot query would reject start()
        // and leave the pg connection open.
        console.error('[realtime/jobs] initial jobs query failed:', err)
        enqueue(`event: error\ndata: ${JSON.stringify({ message: 'initial jobs query failed' })}\n\n`)
        await cleanup('initial jobs query failed')
        return
      }
      // cleanup() may have run during the awaited query; creating timers now
      // would leave an interval that nothing ever clears.
      if (closed) return

      // An abort that fired during the awaited setup would not reach a
      // listener attached afterwards, so check the flag first.
      if (request.signal.aborted) {
        await cleanup('client aborted')
        return
      }

      heartbeatTimer = setInterval(() => {
        enqueue(`: heartbeat\n\n`)
      }, HEARTBEAT_MS)

      hardCloseTimer = setTimeout(() => {
        void cleanup('hard close 240s')
      }, HARD_CLOSE_MS)

      request.signal.addEventListener(
        'abort',
        () => {
          void cleanup('client aborted')
        },
        { once: true },
      )
    },

    // Called when the consumer cancels the stream; release resources then too.
    async cancel() {
      await cleanup('stream cancelled')
    },
  })

  return new Response(stream, {
    headers: {
      'Content-Type': 'text/event-stream; charset=utf-8',
      'Cache-Control': 'no-cache, no-transform',
      Connection: 'keep-alive',
      'X-Accel-Buffering': 'no',
    },
  })
}
|
|
|
|
/**
 * Loads the snapshot of a user's jobs that are not yet finished.
 *
 * Queries `claudeJob` rows belonging to `userId` whose status is not `'DONE'`
 * and converts each row into a `claude_job_status` payload. Null `branch`,
 * `error` and `summary` values become `undefined`.
 *
 * @param userId - Owner whose jobs are fetched; also copied into each payload.
 * @returns One payload per matching job, in the order the query returned them.
 */
async function prisma_jobs_findActive(userId: string): Promise<JobPayload[]> {
  // The Prisma client is loaded on demand rather than at module load.
  const prismaModule = await import('@/lib/prisma')

  const rows = await prismaModule.prisma.claudeJob.findMany({
    where: { user_id: userId, status: { notIn: ['DONE'] } },
    select: {
      id: true,
      kind: true,
      status: true,
      task_id: true,
      idea_id: true,
      sprint_run_id: true,
      branch: true,
      error: true,
      summary: true,
    },
  })

  const snapshot: JobPayload[] = []
  for (const row of rows) {
    // Key order is kept stable so the serialised JSON stays the same.
    snapshot.push({
      type: 'claude_job_status',
      job_id: row.id,
      kind: row.kind,
      user_id: userId,
      status: row.status,
      task_id: row.task_id,
      idea_id: row.idea_id,
      sprint_run_id: row.sprint_run_id,
      branch: row.branch ?? undefined,
      error: row.error ?? undefined,
      summary: row.summary ?? undefined,
    })
  }
  return snapshot
}
|