diff --git a/app/api/flows/start/route.ts b/app/api/flows/start/route.ts new file mode 100644 index 0000000..6fded15 --- /dev/null +++ b/app/api/flows/start/route.ts @@ -0,0 +1,165 @@ +import { NextRequest } from 'next/server' +import { getCurrentUser } from '@/lib/session' +import { prisma } from '@/lib/prisma' +import { FlowStatus } from '@prisma/client' + +export const dynamic = 'force-dynamic' + +const AGENT_URL = process.env.OPS_AGENT_URL ?? 'http://127.0.0.1:3099' +const AGENT_SECRET = process.env.OPS_AGENT_SECRET ?? '' + +export async function POST(request: NextRequest) { + const user = await getCurrentUser() + if (!user) { + return Response.json({ error: 'unauthorized' }, { status: 401 }) + } + + let body: { command_key?: string; args?: string[]; stdin?: string } + try { + body = await request.json() + } catch { + return Response.json({ error: 'invalid JSON body' }, { status: 400 }) + } + + const { command_key, args = [], stdin } = body + if (!command_key) { + return Response.json({ error: 'command_key required' }, { status: 400 }) + } + + const flowRun = await prisma.flowRun.create({ + data: { + user_id: user.id, + flow_key: command_key, + status: FlowStatus.running, + }, + }) + + const flowStep = await prisma.flowStep.create({ + data: { + flow_run_id: flowRun.id, + step_index: 0, + command_key, + args_json: JSON.stringify(args), + }, + }) + + const encoder = new TextEncoder() + + const stream = new ReadableStream({ + async start(controller) { + const enqueue = (event: string, data: unknown) => { + controller.enqueue( + encoder.encode(`event: ${event}\ndata: ${JSON.stringify(data)}\n\n`), + ) + } + + enqueue('flow_run_id', { flow_run_id: flowRun.id }) + + let agentResponse: Response + try { + agentResponse = await fetch(`${AGENT_URL}/agent/v1/exec`, { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + Authorization: `Bearer ${AGENT_SECRET}`, + }, + body: JSON.stringify({ command_key, args, ...(stdin != null ? { stdin } : {}) }), + }) + } catch (err) { + const message = err instanceof Error ? err.message : 'agent unreachable' + await prisma.flowRun.update({ + where: { id: flowRun.id }, + data: { status: FlowStatus.failed, ended_at: new Date() }, + }) + enqueue('error', { message }) + controller.close() + return + } + + if (!agentResponse.ok) { + const text = await agentResponse.text() + await prisma.flowRun.update({ + where: { id: flowRun.id }, + data: { status: FlowStatus.failed, ended_at: new Date() }, + }) + enqueue('error', { message: `agent ${agentResponse.status}: ${text}` }) + controller.close() + return + } + + const reader = agentResponse.body!.getReader() + const decoder = new TextDecoder() + let buffer = '' + let currentEvent = '' + let stdout = '' + let stderr = '' + + try { + while (true) { + const { done, value } = await reader.read() + if (done) break + + buffer += decoder.decode(value, { stream: true }) + const lines = buffer.split('\n') + buffer = lines.pop() ?? '' + + for (const line of lines) { + if (line.startsWith('event:')) { + currentEvent = line.slice(6).trim() + } else if (line.startsWith('data:')) { + try { + const parsed = JSON.parse(line.slice(5).trim()) as Record + if (currentEvent === 'stdout') { + const chunk = String(parsed.data ?? '') + stdout += chunk + enqueue('stdout', { data: chunk }) + } else if (currentEvent === 'stderr') { + const chunk = String(parsed.data ?? '') + stderr += chunk + enqueue('stderr', { data: chunk }) + } else if (currentEvent === 'exit') { + const exitCode = typeof parsed.code === 'number' ? parsed.code : null + const now = new Date() + await prisma.flowStep.update({ + where: { id: flowStep.id }, + data: { stdout, stderr, exit_code: exitCode, ended_at: now }, + }) + await prisma.flowRun.update({ + where: { id: flowRun.id }, + data: { + status: exitCode === 0 ? FlowStatus.success : FlowStatus.failed, + exit_code: exitCode, + ended_at: now, + }, + }) + enqueue('done', { flow_run_id: flowRun.id, exit_code: exitCode }) + } else if (currentEvent === 'error') { + const message = String(parsed.message ?? 'unknown error') + await prisma.flowRun.update({ + where: { id: flowRun.id }, + data: { status: FlowStatus.failed, ended_at: new Date() }, + }) + enqueue('error', { message }) + } + } catch { + // ignore malformed SSE data + } + } + } + } + } catch { + // stream ended unexpectedly + } + + controller.close() + }, + }) + + return new Response(stream, { + headers: { + 'Content-Type': 'text/event-stream', + 'Cache-Control': 'no-cache', + Connection: 'keep-alive', + }, + }) +}