From 5c226fb042e116b3a36de8b923ddcdda6084a519 Mon Sep 17 00:00:00 2001 From: Madhura68 Date: Wed, 29 Apr 2026 19:11:55 +0200 Subject: [PATCH] feat(ST-1111.10c): forward worker presence events on solo SSE + initial count Co-Authored-By: Claude Sonnet 4.6 --- app/api/realtime/solo/route.ts | 41 +++++++++++++++++++++++++++++++++- 1 file changed, 40 insertions(+), 1 deletion(-) diff --git a/app/api/realtime/solo/route.ts b/app/api/realtime/solo/route.ts index 539eb17..4e93ba8 100644 --- a/app/api/realtime/solo/route.ts +++ b/app/api/realtime/solo/route.ts @@ -49,12 +49,23 @@ type JobPayload = { error?: string } -type NotifyPayload = EntityPayload | JobPayload +type WorkerPayload = { + type: 'worker_connected' | 'worker_disconnected' + user_id: string + token_id: string + product_id?: string +} + +type NotifyPayload = EntityPayload | JobPayload | WorkerPayload function isJobPayload(p: NotifyPayload): p is JobPayload { return 'type' in p && (p.type === 'claude_job_enqueued' || p.type === 'claude_job_status') } +function isWorkerPayload(p: NotifyPayload): p is WorkerPayload { + return 'type' in p && (p.type === 'worker_connected' || p.type === 'worker_disconnected') +} + function shouldEmit( payload: NotifyPayload, productId: string, @@ -65,6 +76,10 @@ function shouldEmit( return payload.user_id === userId && payload.product_id === productId } + if (isWorkerPayload(payload)) { + return payload.user_id === userId + } + // M11 (ST-1104): question-events horen op /api/realtime/notifications, niet hier. if (payload.entity === 'question') return false @@ -187,6 +202,11 @@ export async function GET(request: NextRequest) { enqueue(`event: claude_jobs_initial\ndata: ${JSON.stringify(activeJobs)}\n\n`) } + // Stale workers opruimen + actieve count sturen + await prisma_workers_cleanup() + const workerCount = await prisma_workers_count(userId) + enqueue(`event: workers_initial\ndata: ${JSON.stringify({ count: workerCount })}\n\n`) + // Heartbeat als SSE-comment — voorkomt proxy-timeouts heartbeatTimer = setInterval(() => { enqueue(`: heartbeat\n\n`) @@ -251,3 +271,22 @@ async function prisma_jobs_findActive(userId: string, productId: string) { error: j.error ?? undefined, })) } + +const WORKER_STALE_MS = 60_000 + +async function prisma_workers_cleanup() { + const { prisma } = await import('@/lib/prisma') + await prisma.claudeWorker.deleteMany({ + where: { last_seen_at: { lt: new Date(Date.now() - WORKER_STALE_MS) } }, + }) +} + +async function prisma_workers_count(userId: string): Promise { + const { prisma } = await import('@/lib/prisma') + return prisma.claudeWorker.count({ + where: { + user_id: userId, + last_seen_at: { gt: new Date(Date.now() - 15_000) }, + }, + }) +}