feat(ST-1111.4): forward ClaudeJob events on solo SSE stream + initial state

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Janpeter Visser 2026-04-29 19:03:48 +02:00
parent 9d9fb4b4c0
commit ece0aa963d
3 changed files with 129 additions and 5 deletions

View file

@ -23,7 +23,7 @@ const CHANNEL = 'scrum4me_changes'
const HEARTBEAT_MS = 25_000
const HARD_CLOSE_MS = 240_000
interface NotifyPayload {
type EntityPayload = {
op: 'I' | 'U' | 'D'
// M11 (ST-1101) voegt entity:'question' toe op hetzelfde scrum4me_changes-
// kanaal; we filteren die hieronder weg zodat solo-clients geen
@ -37,12 +37,34 @@ interface NotifyPayload {
changed_fields?: string[]
}
type JobPayload = {
type: 'claude_job_enqueued' | 'claude_job_status'
job_id: string
task_id: string
user_id: string
product_id: string
status: string
branch?: string
summary?: string
error?: string
}
type NotifyPayload = EntityPayload | JobPayload
function isJobPayload(p: NotifyPayload): p is JobPayload {
return 'type' in p && (p.type === 'claude_job_enqueued' || p.type === 'claude_job_status')
}
function shouldEmit(
payload: NotifyPayload,
productId: string,
activeSprintId: string | null,
userId: string,
): boolean {
if (isJobPayload(payload)) {
return payload.user_id === userId && payload.product_id === productId
}
// M11 (ST-1104): question-events horen op /api/realtime/notifications, niet hier.
if (payload.entity === 'question') return false
@ -159,6 +181,12 @@ export async function GET(request: NextRequest) {
})}\n\n`,
)
// Stuur initiële ClaudeJob-state zodat de UI synchroon is bij reconnect
const activeJobs = await prisma_jobs_findActive(userId, productId)
if (activeJobs.length > 0) {
enqueue(`event: claude_jobs_initial\ndata: ${JSON.stringify(activeJobs)}\n\n`)
}
// Heartbeat als SSE-comment — voorkomt proxy-timeouts
heartbeatTimer = setInterval(() => {
enqueue(`: heartbeat\n\n`)
@ -186,8 +214,6 @@ export async function GET(request: NextRequest) {
})
}
// Lokaal helper — Prisma vermijden voor deze ene query om de pg-only flow
// schoon te houden. Geeft de actieve sprint van een product, of null.
async function prisma_sprint_findActive(productId: string): Promise<{ id: string } | null> {
const { prisma } = await import('@/lib/prisma')
return prisma.sprint.findFirst({
@ -196,3 +222,32 @@ async function prisma_sprint_findActive(productId: string): Promise<{ id: string
orderBy: { created_at: 'desc' },
})
}
async function prisma_jobs_findActive(userId: string, productId: string) {
const { prisma } = await import('@/lib/prisma')
const { jobStatusToApi } = await import('@/lib/job-status')
const today = new Date()
today.setHours(0, 0, 0, 0)
const jobs = await prisma.claudeJob.findMany({
where: {
user_id: userId,
product_id: productId,
OR: [
{ status: { in: ['QUEUED', 'CLAIMED', 'RUNNING'] } },
{ status: { in: ['DONE', 'FAILED'] }, finished_at: { gte: today } },
],
},
select: {
id: true, task_id: true, status: true, branch: true, summary: true, error: true,
},
orderBy: { created_at: 'asc' },
})
return jobs.map(j => ({
job_id: j.id,
task_id: j.task_id,
status: jobStatusToApi(j.status),
branch: j.branch ?? undefined,
summary: j.summary ?? undefined,
error: j.error ?? undefined,
}))
}

View file

@ -20,7 +20,7 @@
import { useEffect, useRef } from 'react'
import { flushSync } from 'react-dom'
import { useSoloStore } from '@/stores/solo-store'
import type { RealtimeEvent, RealtimeStatus } from '@/stores/solo-store'
import type { ClaudeJobEvent, JobState, RealtimeEvent, RealtimeStatus } from '@/stores/solo-store'
const BACKOFF_START_MS = 1_000
const BACKOFF_MAX_MS = 30_000
@ -35,6 +35,8 @@ export function useSoloRealtime(productId: string | null) {
useEffect(() => {
const setStatus = useSoloStore.getState().setRealtimeStatus
const handleEvent = useSoloStore.getState().handleRealtimeEvent
const handleJobEvent = useSoloStore.getState().handleJobEvent
const initJobs = useSoloStore.getState().initJobs
if (!productId) {
// Geen actief product (gebruiker zit niet op /solo) — stream uit
@ -84,10 +86,24 @@ export function useSoloRealtime(productId: string | null) {
scheduleIndicator('open')
})
source.addEventListener('claude_jobs_initial', (e) => {
if (!e.data) return
try {
initJobs(JSON.parse(e.data) as JobState[])
} catch {
// ignore malformed payload
}
})
source.onmessage = (e) => {
if (!e.data) return
try {
const payload = JSON.parse(e.data) as RealtimeEvent
const raw = JSON.parse(e.data) as RealtimeEvent | ClaudeJobEvent
if ('type' in raw && (raw.type === 'claude_job_enqueued' || raw.type === 'claude_job_status')) {
handleJobEvent(raw)
return
}
const payload = raw as RealtimeEvent
// Animatie A: kanban-move animeren via View Transitions API. Voor
// task UPDATE-events wrap'en we de store-update in een view
// transition. flushSync forceert React om synchroon te renderen

View file

@ -1,8 +1,22 @@
import { create } from 'zustand'
import type { SoloTask } from '@/components/solo/solo-board'
import type { ClaudeJobStatusApi } from '@/lib/job-status'
type TaskStatus = SoloTask['status']
export interface JobState {
job_id: string
task_id: string
status: ClaudeJobStatusApi
branch?: string
summary?: string
error?: string
}
export type ClaudeJobEvent =
| { type: 'claude_job_enqueued'; job_id: string; task_id: string; user_id: string; product_id: string; status: 'queued' }
| { type: 'claude_job_status'; job_id: string; task_id: string; user_id: string; product_id: string; status: ClaudeJobStatusApi; branch?: string; summary?: string; error?: string }
// Payload-shape gepubliceerd door de Postgres-trigger via pg_notify (ST-801
// + ST-804 prereq). Komt het Solo Paneel binnen via de SSE-stream uit
// /api/realtime/solo (ST-802).
@ -42,6 +56,8 @@ interface SoloStore {
realtimeStatus: RealtimeStatus
showConnectingIndicator: boolean
claudeJobsByTaskId: Record<string, JobState>
initTasks: (tasks: SoloTask[]) => void
optimisticMove: (taskId: string, toStatus: TaskStatus) => TaskStatus | null
rollback: (taskId: string, prevStatus: TaskStatus) => void
@ -52,6 +68,9 @@ interface SoloStore {
setRealtimeStatus: (status: RealtimeStatus, showConnectingIndicator: boolean) => void
initJobs: (jobs: JobState[]) => void
handleJobEvent: (event: ClaudeJobEvent) => void
handleRealtimeEvent: (event: RealtimeEvent) => void
}
@ -60,6 +79,7 @@ export const useSoloStore = create<SoloStore>((set, get) => ({
pendingOps: new Set<string>(),
realtimeStatus: 'connecting',
showConnectingIndicator: false,
claudeJobsByTaskId: {},
initTasks: (tasks) =>
set({ tasks: Object.fromEntries(tasks.map(t => [t.id, t])) }),
@ -101,6 +121,39 @@ export const useSoloStore = create<SoloStore>((set, get) => ({
return { realtimeStatus: status, showConnectingIndicator }
}),
initJobs: (jobs) =>
set({ claudeJobsByTaskId: Object.fromEntries(jobs.map(j => [j.task_id, j])) }),
handleJobEvent: (event) => {
const { job_id, task_id } = event
if (event.type === 'claude_job_enqueued') {
set((s) => ({
claudeJobsByTaskId: {
...s.claudeJobsByTaskId,
[task_id]: { job_id, task_id, status: 'queued' },
},
}))
return
}
if (event.type === 'claude_job_status') {
const { status, branch, summary, error } = event
if (status === 'cancelled') {
set((s) => {
const next = { ...s.claudeJobsByTaskId }
delete next[task_id]
return { claudeJobsByTaskId: next }
})
return
}
set((s) => ({
claudeJobsByTaskId: {
...s.claudeJobsByTaskId,
[task_id]: { job_id, task_id, status, branch, summary, error },
},
}))
}
},
handleRealtimeEvent: (event) => {
if (event.entity === 'task') {
const { id, op } = event