feat(jobs): useJobsRealtime fetch-on-unknown met dedup-Set
Wanneer een SSE-event een onbekend job_id bevat, haalt de hook de volledige job op via GET /api/jobs/[id] en upsert die in de store. Een inFlight-Set voorkomt gelijktijdige dubbele fetches voor hetzelfde job_id. Bekende jobs blijven de bestaande partial-upsert gebruiken. Zelfde logica in jobs_initial. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
parent
c70ec17b79
commit
b76ae290e3
2 changed files with 183 additions and 9 deletions
147
__tests__/hooks/use-jobs-realtime.test.tsx
Normal file
147
__tests__/hooks/use-jobs-realtime.test.tsx
Normal file
|
|
@ -0,0 +1,147 @@
|
|||
// @vitest-environment jsdom
|
||||
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'
|
||||
import { renderHook, act } from '@testing-library/react'
|
||||
import { useJobsStore } from '@/stores/jobs-store'
|
||||
import useJobsRealtime from '@/hooks/use-jobs-realtime'
|
||||
|
||||
type Listener = (event: { data: string }) => void
|
||||
|
||||
class MockEventSource {
|
||||
static instance: MockEventSource | null = null
|
||||
private listeners: Record<string, Listener[]> = {}
|
||||
onerror: (() => void) | null = null
|
||||
|
||||
constructor(_url: string) {
|
||||
MockEventSource.instance = this
|
||||
}
|
||||
|
||||
addEventListener(type: string, listener: Listener) {
|
||||
if (!this.listeners[type]) this.listeners[type] = []
|
||||
this.listeners[type].push(listener)
|
||||
}
|
||||
|
||||
dispatch(type: string, data: unknown) {
|
||||
for (const l of this.listeners[type] ?? []) {
|
||||
l({ data: JSON.stringify(data) })
|
||||
}
|
||||
}
|
||||
|
||||
close() {}
|
||||
}
|
||||
|
||||
const fullJob = {
|
||||
id: 'job-unknown-1',
|
||||
kind: 'TASK_IMPLEMENTATION',
|
||||
status: 'RUNNING',
|
||||
taskCode: 'T-1',
|
||||
taskTitle: 'Test',
|
||||
ideaCode: null,
|
||||
ideaTitle: null,
|
||||
sprintGoal: null,
|
||||
sprintCode: null,
|
||||
productName: 'Scrum4Me',
|
||||
productCode: null,
|
||||
storyCode: null,
|
||||
pbiCode: null,
|
||||
modelId: null,
|
||||
inputTokens: null,
|
||||
outputTokens: null,
|
||||
cacheReadTokens: null,
|
||||
cacheWriteTokens: null,
|
||||
costUsd: null,
|
||||
branch: null,
|
||||
prUrl: null,
|
||||
error: null,
|
||||
summary: null,
|
||||
description: null,
|
||||
verifyResult: null,
|
||||
startedAt: null,
|
||||
finishedAt: null,
|
||||
createdAt: new Date('2026-01-01'),
|
||||
sprintRunId: null,
|
||||
}
|
||||
|
||||
beforeEach(() => {
|
||||
vi.stubGlobal('EventSource', MockEventSource)
|
||||
MockEventSource.instance = null
|
||||
|
||||
// Lege store
|
||||
useJobsStore.setState({ activeJobs: [], doneJobs: [], selectedJobId: null })
|
||||
|
||||
// fetch resolveert naar de volledige job
|
||||
vi.stubGlobal(
|
||||
'fetch',
|
||||
vi.fn().mockImplementation(async () => ({
|
||||
ok: true,
|
||||
json: async () => fullJob,
|
||||
}))
|
||||
)
|
||||
})
|
||||
|
||||
afterEach(() => {
|
||||
vi.unstubAllGlobals()
|
||||
vi.restoreAllMocks()
|
||||
})
|
||||
|
||||
describe('useJobsRealtime: fetch-on-unknown', () => {
|
||||
it('haalt onbekende job op via REST bij message-event', async () => {
|
||||
renderHook(() => useJobsRealtime())
|
||||
const es = MockEventSource.instance!
|
||||
|
||||
// Dispatch twee events met hetzelfde onbekende job_id gelijktijdig
|
||||
act(() => {
|
||||
es.dispatch('message', { job_id: 'job-unknown-1', status: 'RUNNING' })
|
||||
es.dispatch('message', { job_id: 'job-unknown-1', status: 'RUNNING' })
|
||||
})
|
||||
|
||||
// Wacht op alle microtasks / fetch-promises
|
||||
await act(async () => {
|
||||
await Promise.resolve()
|
||||
})
|
||||
|
||||
expect(fetch).toHaveBeenCalledTimes(1)
|
||||
expect(fetch).toHaveBeenCalledWith('/api/jobs/job-unknown-1')
|
||||
|
||||
const { activeJobs } = useJobsStore.getState()
|
||||
expect(activeJobs.some(j => j.id === 'job-unknown-1')).toBe(true)
|
||||
expect(activeJobs.find(j => j.id === 'job-unknown-1')?.taskTitle).toBe('Test')
|
||||
})
|
||||
|
||||
it('gebruikt partial-upsert voor bekende jobs bij message-event', async () => {
|
||||
// Zet een bekende job in de store
|
||||
useJobsStore.setState({
|
||||
activeJobs: [{ ...fullJob, id: 'job-known-1', status: 'QUEUED' } as never],
|
||||
doneJobs: [],
|
||||
selectedJobId: null,
|
||||
})
|
||||
|
||||
renderHook(() => useJobsRealtime())
|
||||
const es = MockEventSource.instance!
|
||||
|
||||
act(() => {
|
||||
es.dispatch('message', { job_id: 'job-known-1', status: 'RUNNING', branch: 'feat/x' })
|
||||
})
|
||||
|
||||
await act(async () => { await Promise.resolve() })
|
||||
|
||||
expect(fetch).not.toHaveBeenCalled()
|
||||
const { activeJobs } = useJobsStore.getState()
|
||||
expect(activeJobs.find(j => j.id === 'job-known-1')?.status).toBe('RUNNING')
|
||||
})
|
||||
|
||||
it('haalt onbekende job op via REST bij jobs_initial-event', async () => {
|
||||
renderHook(() => useJobsRealtime())
|
||||
const es = MockEventSource.instance!
|
||||
|
||||
act(() => {
|
||||
es.dispatch('jobs_initial', [{ job_id: 'job-unknown-1', status: 'RUNNING' }])
|
||||
})
|
||||
|
||||
await act(async () => { await Promise.resolve() })
|
||||
|
||||
expect(fetch).toHaveBeenCalledTimes(1)
|
||||
expect(fetch).toHaveBeenCalledWith('/api/jobs/job-unknown-1')
|
||||
const { activeJobs } = useJobsStore.getState()
|
||||
expect(activeJobs.some(j => j.id === 'job-unknown-1')).toBe(true)
|
||||
})
|
||||
})
|
||||
|
|
@ -24,6 +24,22 @@ export default function useJobsRealtime() {
|
|||
let es: EventSource | null = null
|
||||
let reconnectTimer: ReturnType<typeof setTimeout> | null = null
|
||||
let active = true
|
||||
const inFlight = new Set<string>()
|
||||
|
||||
async function fetchAndUpsert(jobId: string) {
|
||||
if (inFlight.has(jobId)) return
|
||||
inFlight.add(jobId)
|
||||
try {
|
||||
const res = await fetch(`/api/jobs/${jobId}`)
|
||||
if (!res.ok) return
|
||||
const job = await res.json()
|
||||
if (active) upsertJob(job)
|
||||
} catch {
|
||||
// netwerk-/parse-fout: stil
|
||||
} finally {
|
||||
inFlight.delete(jobId)
|
||||
}
|
||||
}
|
||||
|
||||
function connect() {
|
||||
if (!active) return
|
||||
|
|
@ -34,20 +50,25 @@ export default function useJobsRealtime() {
|
|||
// De server stuurt JobPayload[] (met `job_id`), niet JobWithRelations[].
|
||||
// Daarom geen initJobs-overwrite — de SSR-fetch heeft de volledige
|
||||
// shape al in de store geplaatst. We reconcileren alleen status/branch
|
||||
// van bekende jobs en pushen onbekende jobs (nieuw aangemaakt tussen
|
||||
// SSR en SSE-connect) als partials.
|
||||
// van bekende jobs en fetchen onbekende jobs volledig via REST.
|
||||
try {
|
||||
const payload = JSON.parse(event.data)
|
||||
if (!Array.isArray(payload)) return
|
||||
const { activeJobs, doneJobs } = useJobsStore.getState()
|
||||
for (const p of payload as JobStatusPayload[]) {
|
||||
if (!p.job_id) continue
|
||||
upsertJob({
|
||||
id: p.job_id,
|
||||
status: p.status as ClaudeJobStatus,
|
||||
branch: p.branch ?? null,
|
||||
error: p.error ?? null,
|
||||
summary: p.summary ?? null,
|
||||
})
|
||||
const known = activeJobs.some(j => j.id === p.job_id) || doneJobs.some(j => j.id === p.job_id)
|
||||
if (!known) {
|
||||
void fetchAndUpsert(p.job_id)
|
||||
} else {
|
||||
upsertJob({
|
||||
id: p.job_id,
|
||||
status: p.status as ClaudeJobStatus,
|
||||
branch: p.branch ?? null,
|
||||
error: p.error ?? null,
|
||||
summary: p.summary ?? null,
|
||||
})
|
||||
}
|
||||
}
|
||||
} catch {
|
||||
// malformed JSON
|
||||
|
|
@ -58,6 +79,12 @@ export default function useJobsRealtime() {
|
|||
try {
|
||||
const payload = JSON.parse(event.data) as JobStatusPayload
|
||||
if (!payload.job_id) return
|
||||
const { activeJobs, doneJobs } = useJobsStore.getState()
|
||||
const known = activeJobs.some(j => j.id === payload.job_id) || doneJobs.some(j => j.id === payload.job_id)
|
||||
if (!known) {
|
||||
void fetchAndUpsert(payload.job_id)
|
||||
return
|
||||
}
|
||||
upsertJob({
|
||||
id: payload.job_id,
|
||||
status: payload.status as ClaudeJobStatus,
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue