feat(ST-1111.10c): forward worker presence events on solo SSE + initial count
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
parent
0fb4227cb1
commit
5c226fb042
1 changed files with 40 additions and 1 deletions
|
|
@ -49,12 +49,23 @@ type JobPayload = {
|
|||
error?: string
|
||||
}
|
||||
|
||||
type NotifyPayload = EntityPayload | JobPayload
|
||||
type WorkerPayload = {
|
||||
type: 'worker_connected' | 'worker_disconnected'
|
||||
user_id: string
|
||||
token_id: string
|
||||
product_id?: string
|
||||
}
|
||||
|
||||
type NotifyPayload = EntityPayload | JobPayload | WorkerPayload
|
||||
|
||||
function isJobPayload(p: NotifyPayload): p is JobPayload {
|
||||
return 'type' in p && (p.type === 'claude_job_enqueued' || p.type === 'claude_job_status')
|
||||
}
|
||||
|
||||
function isWorkerPayload(p: NotifyPayload): p is WorkerPayload {
|
||||
return 'type' in p && (p.type === 'worker_connected' || p.type === 'worker_disconnected')
|
||||
}
|
||||
|
||||
function shouldEmit(
|
||||
payload: NotifyPayload,
|
||||
productId: string,
|
||||
|
|
@ -65,6 +76,10 @@ function shouldEmit(
|
|||
return payload.user_id === userId && payload.product_id === productId
|
||||
}
|
||||
|
||||
if (isWorkerPayload(payload)) {
|
||||
return payload.user_id === userId
|
||||
}
|
||||
|
||||
// M11 (ST-1104): question-events horen op /api/realtime/notifications, niet hier.
|
||||
if (payload.entity === 'question') return false
|
||||
|
||||
|
|
@ -187,6 +202,11 @@ export async function GET(request: NextRequest) {
|
|||
enqueue(`event: claude_jobs_initial\ndata: ${JSON.stringify(activeJobs)}\n\n`)
|
||||
}
|
||||
|
||||
// Stale workers opruimen + actieve count sturen
|
||||
await prisma_workers_cleanup()
|
||||
const workerCount = await prisma_workers_count(userId)
|
||||
enqueue(`event: workers_initial\ndata: ${JSON.stringify({ count: workerCount })}\n\n`)
|
||||
|
||||
// Heartbeat als SSE-comment — voorkomt proxy-timeouts
|
||||
heartbeatTimer = setInterval(() => {
|
||||
enqueue(`: heartbeat\n\n`)
|
||||
|
|
@ -251,3 +271,22 @@ async function prisma_jobs_findActive(userId: string, productId: string) {
|
|||
error: j.error ?? undefined,
|
||||
}))
|
||||
}
|
||||
|
||||
const WORKER_STALE_MS = 60_000
|
||||
|
||||
async function prisma_workers_cleanup() {
|
||||
const { prisma } = await import('@/lib/prisma')
|
||||
await prisma.claudeWorker.deleteMany({
|
||||
where: { last_seen_at: { lt: new Date(Date.now() - WORKER_STALE_MS) } },
|
||||
})
|
||||
}
|
||||
|
||||
async function prisma_workers_count(userId: string): Promise<number> {
|
||||
const { prisma } = await import('@/lib/prisma')
|
||||
return prisma.claudeWorker.count({
|
||||
where: {
|
||||
user_id: userId,
|
||||
last_seen_at: { gt: new Date(Date.now() - 15_000) },
|
||||
},
|
||||
})
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue