M13: Claude job queue — 'Voer uit'-knop + worker presence (ST-1111) (#18)
* feat(ST-1111.1): add ClaudeJob model and state-machine enum Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * feat(ST-1111.2): add ClaudeJob status API mappers Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * feat(ST-1111.3): add enqueue/cancel ClaudeJob server actions with idempotency + NOTIFY Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * feat(ST-1111.4): forward ClaudeJob events on solo SSE stream + initial state Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * feat(ST-1111.6): add 'Voer uit' + cancel buttons to task detail dialog Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * feat(ST-1111.7): add job status pill with spinner on solo task cards Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * test(ST-1111.8): cover job-status mappers and enqueue/cancel actions Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * docs(ST-1111.9): document Claude job queue architecture and agent flow Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * feat(ST-1111.10a): add ClaudeWorker presence model Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * feat(ST-1111.10c): forward worker presence events on solo SSE + initial count Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * feat(ST-1111.10d): show worker presence indicator and gate 'Voer uit' on connected workers Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> --------- Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
parent
1cb5772edd
commit
73087e9705
18 changed files with 921 additions and 27 deletions
|
|
@ -23,7 +23,7 @@ const CHANNEL = 'scrum4me_changes'
|
|||
const HEARTBEAT_MS = 25_000
|
||||
const HARD_CLOSE_MS = 240_000
|
||||
|
||||
interface NotifyPayload {
|
||||
type EntityPayload = {
|
||||
op: 'I' | 'U' | 'D'
|
||||
// M11 (ST-1101) voegt entity:'question' toe op hetzelfde scrum4me_changes-
|
||||
// kanaal; we filteren die hieronder weg zodat solo-clients geen
|
||||
|
|
@ -37,12 +37,49 @@ interface NotifyPayload {
|
|||
changed_fields?: string[]
|
||||
}
|
||||
|
||||
type JobPayload = {
|
||||
type: 'claude_job_enqueued' | 'claude_job_status'
|
||||
job_id: string
|
||||
task_id: string
|
||||
user_id: string
|
||||
product_id: string
|
||||
status: string
|
||||
branch?: string
|
||||
summary?: string
|
||||
error?: string
|
||||
}
|
||||
|
||||
type WorkerPayload = {
|
||||
type: 'worker_connected' | 'worker_disconnected'
|
||||
user_id: string
|
||||
token_id: string
|
||||
product_id?: string
|
||||
}
|
||||
|
||||
type NotifyPayload = EntityPayload | JobPayload | WorkerPayload
|
||||
|
||||
function isJobPayload(p: NotifyPayload): p is JobPayload {
|
||||
return 'type' in p && (p.type === 'claude_job_enqueued' || p.type === 'claude_job_status')
|
||||
}
|
||||
|
||||
function isWorkerPayload(p: NotifyPayload): p is WorkerPayload {
|
||||
return 'type' in p && (p.type === 'worker_connected' || p.type === 'worker_disconnected')
|
||||
}
|
||||
|
||||
function shouldEmit(
|
||||
payload: NotifyPayload,
|
||||
productId: string,
|
||||
activeSprintId: string | null,
|
||||
userId: string,
|
||||
): boolean {
|
||||
if (isJobPayload(payload)) {
|
||||
return payload.user_id === userId && payload.product_id === productId
|
||||
}
|
||||
|
||||
if (isWorkerPayload(payload)) {
|
||||
return payload.user_id === userId
|
||||
}
|
||||
|
||||
// M11 (ST-1104): question-events horen op /api/realtime/notifications, niet hier.
|
||||
if (payload.entity === 'question') return false
|
||||
|
||||
|
|
@ -159,6 +196,17 @@ export async function GET(request: NextRequest) {
|
|||
})}\n\n`,
|
||||
)
|
||||
|
||||
// Stuur initiële ClaudeJob-state zodat de UI synchroon is bij reconnect
|
||||
const activeJobs = await prisma_jobs_findActive(userId, productId)
|
||||
if (activeJobs.length > 0) {
|
||||
enqueue(`event: claude_jobs_initial\ndata: ${JSON.stringify(activeJobs)}\n\n`)
|
||||
}
|
||||
|
||||
// Stale workers opruimen + actieve count sturen
|
||||
await prisma_workers_cleanup()
|
||||
const workerCount = await prisma_workers_count(userId)
|
||||
enqueue(`event: workers_initial\ndata: ${JSON.stringify({ count: workerCount })}\n\n`)
|
||||
|
||||
// Heartbeat als SSE-comment — voorkomt proxy-timeouts
|
||||
heartbeatTimer = setInterval(() => {
|
||||
enqueue(`: heartbeat\n\n`)
|
||||
|
|
@ -186,8 +234,6 @@ export async function GET(request: NextRequest) {
|
|||
})
|
||||
}
|
||||
|
||||
// Lokaal helper — Prisma vermijden voor deze ene query om de pg-only flow
|
||||
// schoon te houden. Geeft de actieve sprint van een product, of null.
|
||||
async function prisma_sprint_findActive(productId: string): Promise<{ id: string } | null> {
|
||||
const { prisma } = await import('@/lib/prisma')
|
||||
return prisma.sprint.findFirst({
|
||||
|
|
@ -196,3 +242,51 @@ async function prisma_sprint_findActive(productId: string): Promise<{ id: string
|
|||
orderBy: { created_at: 'desc' },
|
||||
})
|
||||
}
|
||||
|
||||
async function prisma_jobs_findActive(userId: string, productId: string) {
|
||||
const { prisma } = await import('@/lib/prisma')
|
||||
const { jobStatusToApi } = await import('@/lib/job-status')
|
||||
const today = new Date()
|
||||
today.setHours(0, 0, 0, 0)
|
||||
const jobs = await prisma.claudeJob.findMany({
|
||||
where: {
|
||||
user_id: userId,
|
||||
product_id: productId,
|
||||
OR: [
|
||||
{ status: { in: ['QUEUED', 'CLAIMED', 'RUNNING'] } },
|
||||
{ status: { in: ['DONE', 'FAILED'] }, finished_at: { gte: today } },
|
||||
],
|
||||
},
|
||||
select: {
|
||||
id: true, task_id: true, status: true, branch: true, summary: true, error: true,
|
||||
},
|
||||
orderBy: { created_at: 'asc' },
|
||||
})
|
||||
return jobs.map(j => ({
|
||||
job_id: j.id,
|
||||
task_id: j.task_id,
|
||||
status: jobStatusToApi(j.status),
|
||||
branch: j.branch ?? undefined,
|
||||
summary: j.summary ?? undefined,
|
||||
error: j.error ?? undefined,
|
||||
}))
|
||||
}
|
||||
|
||||
const WORKER_STALE_MS = 60_000
|
||||
|
||||
async function prisma_workers_cleanup() {
|
||||
const { prisma } = await import('@/lib/prisma')
|
||||
await prisma.claudeWorker.deleteMany({
|
||||
where: { last_seen_at: { lt: new Date(Date.now() - WORKER_STALE_MS) } },
|
||||
})
|
||||
}
|
||||
|
||||
async function prisma_workers_count(userId: string): Promise<number> {
|
||||
const { prisma } = await import('@/lib/prisma')
|
||||
return prisma.claudeWorker.count({
|
||||
where: {
|
||||
user_id: userId,
|
||||
last_seen_at: { gt: new Date(Date.now() - 15_000) },
|
||||
},
|
||||
})
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue