import { NextRequest } from 'next/server'
import { getCurrentUser } from '@/lib/session'
import { prisma } from '@/lib/prisma'
import { FlowStatus } from '@prisma/client'

// Route segment config export; the value is consumed by the framework, not by this module.
export const dynamic = 'force-dynamic'

// Base URL and bearer secret used when calling the ops agent's flow endpoint.
const AGENT_URL = process.env.OPS_AGENT_URL ?? 'http://127.0.0.1:3099'
const AGENT_SECRET = process.env.OPS_AGENT_SECRET ?? ''

// Upper bound on the stdout/stderr text retained per step.
// NOTE: this is compared against `string.length` (UTF-16 code units), not
// encoded bytes, despite the name.
const TRUNCATE_BYTES = 64 * 1024

/** Keeps only the trailing `TRUNCATE_BYTES` characters of `s`. */
const truncate = (s: string) => (s.length > TRUNCATE_BYTES ? s.slice(-TRUNCATE_BYTES) : s)

/** Narrows a parsed JSON value to a non-null object so its fields can be read safely. */
const isRecord = (value: unknown): value is Record<string, unknown> =>
  typeof value === 'object' && value !== null

/**
 * Starts a flow on the ops agent and relays its progress to the caller as a
 * `text/event-stream` response, while recording the run and its steps in the
 * database.
 *
 * Request body (JSON): `{ flow_key: string, dry_run?: boolean }`.
 *
 * Events written to the response stream:
 * - `flow_run_id` — emitted first, carries the id of the created flow run
 * - `step_start` / `step_done` — forwarded from the agent as received
 * - `stdout` / `stderr` — output chunks, `{ data: string }`
 * - `done` — `{ flow_run_id, exit_code }`
 * - `error` — `{ message }`
 *
 * @returns 401 when there is no current user, 400 for an unparsable body or
 *   invalid `flow_key` / `dry_run`, otherwise the event stream.
 */
export async function POST(request: NextRequest) {
  const user = await getCurrentUser()
  if (!user) {
    return Response.json({ error: 'unauthorized' }, { status: 401 })
  }

  let body: unknown
  try {
    body = await request.json()
  } catch {
    return Response.json({ error: 'invalid JSON body' }, { status: 400 })
  }

  // `null` and primitives are valid JSON but carry no fields; treat them as an
  // empty object so they are rejected below instead of throwing on field access.
  const payload: Record<string, unknown> = isRecord(body) ? body : {}
  const flow_key = payload.flow_key
  const dry_run = payload.dry_run === undefined ? false : payload.dry_run

  if (typeof flow_key !== 'string' || flow_key === '') {
    return Response.json({ error: 'flow_key required' }, { status: 400 })
  }
  // Reject rather than coerce: silently turning a malformed flag into `false`
  // would run the flow for real when the caller may have intended a dry run.
  if (typeof dry_run !== 'boolean') {
    return Response.json({ error: 'dry_run must be a boolean' }, { status: 400 })
  }

  const flowRun = await prisma.flowRun.create({
    data: {
      user_id: user.id,
      flow_key,
      status: FlowStatus.running,
      dry_run,
    },
  })

  const encoder = new TextEncoder()
  const stream = new ReadableStream<Uint8Array>({
    async start(controller) {
      // Set once writing to the response fails (e.g. the client cancelled).
      // The agent stream is still consumed so the run keeps being recorded.
      let clientGone = false
      // Set once a terminal state (`done` or a failure) has been handled.
      let finalized = false

      /** Writes one SSE frame to the response; becomes a no-op after the client is gone. */
      const enqueue = (event: string, data: unknown) => {
        if (clientGone) return
        try {
          controller.enqueue(encoder.encode(`event: ${event}\ndata: ${JSON.stringify(data)}\n\n`))
        } catch {
          clientGone = true
        }
      }

      /** Closes the response stream, tolerating a stream that is already closed or cancelled. */
      const close = () => {
        try {
          controller.close()
        } catch {
          // nothing left to close
        }
      }

      /** Marks the run as failed in the database and reports `message` to the client. */
      const failRun = async (message: string) => {
        finalized = true
        try {
          await prisma.flowRun.update({
            where: { id: flowRun.id },
            data: { status: FlowStatus.failed, ended_at: new Date() },
          })
        } catch (err) {
          console.error(`flow run ${flowRun.id}: could not record failure`, err)
        }
        enqueue('error', { message })
      }

      enqueue('flow_run_id', { flow_run_id: flowRun.id })

      let agentResponse: Response
      try {
        agentResponse = await fetch(`${AGENT_URL}/agent/v1/flow`, {
          method: 'POST',
          headers: {
            'Content-Type': 'application/json',
            Authorization: `Bearer ${AGENT_SECRET}`,
          },
          body: JSON.stringify({ flow_key, dry_run }),
        })
      } catch (err) {
        await failRun(err instanceof Error ? err.message : 'agent unreachable')
        close()
        return
      }

      if (!agentResponse.ok) {
        let text = ''
        try {
          text = await agentResponse.text()
        } catch {
          // body unreadable; report the status code alone
        }
        await failRun(`agent ${agentResponse.status}: ${text}`)
        close()
        return
      }

      if (!agentResponse.body) {
        await failRun('agent returned an empty response body')
        close()
        return
      }

      /** Inserts the database row for a step that has just started. */
      const createStepRecord = (stepIndex: number, commandKey: string, args: unknown[]) =>
        prisma.flowStep.create({
          data: {
            flow_run_id: flowRun.id,
            step_index: stepIndex,
            command_key: commandKey,
            args_json: JSON.stringify(args),
          },
        })
      // Derive the id type from the query itself so the map matches the schema.
      type StepRecordId = Awaited<ReturnType<typeof createStepRecord>>['id']

      // Per-step accumulators for DB writes, keyed by step index.
      const stepRecordIds = new Map<number, StepRecordId>()
      const stepStdout = new Map<number, string>()
      const stepStderr = new Map<number, string>()
      let currentStepIndex = -1

      /**
       * Appends `chunk` to the current step's buffer. Truncating on every
       * append keeps memory bounded and yields the same tail as truncating
       * once at the end.
       */
      const appendOutput = (sink: Map<number, string>, chunk: string) => {
        if (currentStepIndex >= 0) {
          sink.set(currentStepIndex, truncate((sink.get(currentStepIndex) ?? '') + chunk))
        }
      }

      /** Persists and forwards a single agent event. May throw on database errors. */
      const handleEvent = async (event: string, parsed: Record<string, unknown>) => {
        switch (event) {
          case 'step_start': {
            const stepIndex = parsed.step_index
            if (typeof stepIndex !== 'number') {
              // No usable index: make sure following output is not attributed
              // to the previous step.
              currentStepIndex = -1
              break
            }
            currentStepIndex = stepIndex
            const commandKey = String(parsed.command_key ?? '')
            const args: unknown[] = Array.isArray(parsed.args) ? parsed.args : []
            const flowStep = await createStepRecord(stepIndex, commandKey, args)
            stepRecordIds.set(stepIndex, flowStep.id)
            stepStdout.set(stepIndex, '')
            stepStderr.set(stepIndex, '')
            enqueue('step_start', parsed)
            break
          }
          case 'stdout':
          case 'stderr': {
            const chunk = String(parsed.data ?? '')
            appendOutput(event === 'stdout' ? stepStdout : stepStderr, chunk)
            enqueue(event, { data: chunk })
            break
          }
          case 'step_done': {
            const stepIndex = parsed.step_index
            const exitCode = typeof parsed.exit_code === 'number' ? parsed.exit_code : null
            if (typeof stepIndex === 'number') {
              const stepId = stepRecordIds.get(stepIndex)
              if (stepId !== undefined) {
                await prisma.flowStep.update({
                  where: { id: stepId },
                  data: {
                    exit_code: exitCode,
                    ended_at: new Date(),
                    stdout: truncate(stepStdout.get(stepIndex) ?? ''),
                    stderr: truncate(stepStderr.get(stepIndex) ?? ''),
                  },
                })
              }
            }
            enqueue('step_done', parsed)
            break
          }
          case 'done': {
            const exitCode = typeof parsed.exit_code === 'number' ? parsed.exit_code : null
            // The agent reported a terminal state; the end-of-stream fallback
            // below must not overwrite it.
            finalized = true
            await prisma.flowRun.update({
              where: { id: flowRun.id },
              data: {
                status: exitCode === 0 ? FlowStatus.success : FlowStatus.failed,
                exit_code: exitCode,
                ended_at: new Date(),
              },
            })
            enqueue('done', { flow_run_id: flowRun.id, exit_code: exitCode })
            break
          }
          case 'error': {
            await failRun(String(parsed.message ?? 'unknown error'))
            break
          }
          default:
            break
        }
      }

      const reader = agentResponse.body.getReader()
      const decoder = new TextDecoder()
      let buffer = ''
      let currentEvent = ''
      let streamFailure: string | null = null

      try {
        while (true) {
          const { done, value } = await reader.read()
          if (done) break

          buffer += decoder.decode(value, { stream: true })
          const lines = buffer.split('\n')
          // The last element is an incomplete line (or ''); keep it for the next read.
          buffer = lines.pop() ?? ''

          for (const line of lines) {
            if (line.startsWith('event:')) {
              currentEvent = line.slice(6).trim()
              continue
            }
            if (!line.startsWith('data:')) continue

            let parsed: unknown
            try {
              parsed = JSON.parse(line.slice(5).trim())
            } catch {
              // malformed SSE data line: skip it and keep the stream going
              continue
            }
            if (!isRecord(parsed)) continue

            try {
              await handleEvent(currentEvent, parsed)
            } catch (err) {
              // Best effort: one failed write must not abort the relay, but it
              // should not disappear silently either.
              console.error(`flow run ${flowRun.id}: failed to handle '${currentEvent}' event`, err)
            }
          }
        }
      } catch (err) {
        streamFailure = err instanceof Error ? err.message : 'agent stream interrupted'
      }

      // The agent stream ended (or broke) without a `done`/`error` event:
      // do not leave the run stuck in `running`.
      if (!finalized) {
        await failRun(streamFailure ?? 'agent stream ended before the flow reported completion')
      }
      close()
    },
  })

  return new Response(stream, {
    headers: {
      'Content-Type': 'text/event-stream',
      'Cache-Control': 'no-cache',
      Connection: 'keep-alive',
    },
  })
}