- Truncate accumulated stdout/stderr to the last 64 KB before persisting FlowStep, to prevent unbounded DB growth on verbose commands
- Add @@index([user_id, started_at(sort: Desc)]) to the FlowRun schema so audit list queries (WHERE user_id = ? ORDER BY started_at DESC) use the index
- Add migration 20260513200000_flowrun_user_idx

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
169 lines
5.4 KiB
TypeScript
169 lines
5.4 KiB
TypeScript
import { NextRequest } from 'next/server'
|
|
import { getCurrentUser } from '@/lib/session'
|
|
import { prisma } from '@/lib/prisma'
|
|
import { FlowStatus } from '@prisma/client'
|
|
|
|
/**
 * Next.js route segment option: always run this handler at request time
 * instead of caching or statically evaluating it.
 */
export const dynamic = 'force-dynamic'

/** Agent endpoint used when OPS_AGENT_URL is unset or blank. */
const DEFAULT_AGENT_URL = 'http://127.0.0.1:3099'

/**
 * Normalizes the configured agent base URL.
 *
 * @param raw - Value of the OPS_AGENT_URL environment variable, if any.
 * @returns The trimmed URL without trailing slashes, or the default when
 *   `raw` is undefined, empty, or only whitespace.
 */
function resolveAgentUrl(raw: string | undefined): string {
  const trimmed = raw?.trim() ?? ''
  const base = trimmed === '' ? DEFAULT_AGENT_URL : trimmed
  // Callers append "/agent/..." themselves; dropping trailing slashes avoids "//agent/...".
  return base.replace(/\/+$/, '')
}

/** Base URL of the ops agent that executes commands. */
const AGENT_URL = resolveAgentUrl(process.env.OPS_AGENT_URL)

/** Shared secret sent to the agent as a bearer token; empty when OPS_AGENT_SECRET is unset. */
const AGENT_SECRET = process.env.OPS_AGENT_SECRET ?? ''
|
|
|
|
/** Upper bound, in UTF-16 code units, on the stdout/stderr tail kept per stream. */
const MAX_OUTPUT_CHARS = 64 * 1024

/**
 * Returns at most the last MAX_OUTPUT_CHARS code units of `text`.
 *
 * Applied on every append so memory stays bounded while a verbose command
 * runs; the persisted value is still the tail of the full output.
 */
function tailOutput(text: string): string {
  if (text.length <= MAX_OUTPUT_CHARS) return text
  const tail = text.slice(-MAX_OUTPUT_CHARS)
  const lead = tail.charCodeAt(0)
  // A cut in the middle of a surrogate pair would leave a lone low surrogate; drop it.
  return lead >= 0xdc00 && lead <= 0xdfff ? tail.slice(1) : tail
}

/** Type guard: true when `value` is an array whose elements are all strings. */
function isStringArray(value: unknown): value is string[] {
  return Array.isArray(value) && value.every((item) => typeof item === 'string')
}

/**
 * Runs a single command through the ops agent and relays its output as
 * Server-Sent Events while recording the run in the database.
 *
 * Request body (JSON): `{ command_key: string, args?: string[], stdin?: string }`.
 *
 * Events written to the response stream:
 * - `flow_run_id` — id of the created FlowRun, sent first
 * - `stdout` / `stderr` — output chunks as received from the agent
 * - `done` — the agent reported an exit code; FlowRun/FlowStep were updated
 * - `error` — the agent was unreachable, answered with a non-2xx status,
 *   reported an error, or its stream ended without an exit status
 *
 * @param request - Incoming request carrying the JSON body described above.
 * @returns 401 when there is no current user, 400 for an invalid body,
 *   otherwise a `text/event-stream` response.
 */
export async function POST(request: NextRequest) {
  const user = await getCurrentUser()
  if (!user) {
    return Response.json({ error: 'unauthorized' }, { status: 401 })
  }

  let payload: unknown
  try {
    payload = await request.json()
  } catch {
    return Response.json({ error: 'invalid JSON body' }, { status: 400 })
  }
  // JSON such as `null`, `42` or `[]` parses fine but is not a request object.
  if (typeof payload !== 'object' || payload === null || Array.isArray(payload)) {
    return Response.json({ error: 'invalid JSON body' }, { status: 400 })
  }

  const body = payload as { command_key?: unknown; args?: unknown; stdin?: unknown }

  const command_key = body.command_key
  if (!command_key) {
    return Response.json({ error: 'command_key required' }, { status: 400 })
  }
  if (typeof command_key !== 'string') {
    return Response.json({ error: 'command_key must be a string' }, { status: 400 })
  }

  const args: unknown = body.args ?? []
  if (!isStringArray(args)) {
    return Response.json({ error: 'args must be an array of strings' }, { status: 400 })
  }

  const stdin = body.stdin
  if (stdin != null && typeof stdin !== 'string') {
    return Response.json({ error: 'stdin must be a string' }, { status: 400 })
  }

  // Serialized up front so the stream callbacks only deal with plain strings.
  const argsJson = JSON.stringify(args)
  const execPayload = JSON.stringify({ command_key, args, ...(stdin != null ? { stdin } : {}) })

  const flowRun = await prisma.flowRun.create({
    data: {
      user_id: user.id,
      flow_key: command_key,
      status: FlowStatus.running,
    },
  })

  const flowStep = await prisma.flowStep.create({
    data: {
      flow_run_id: flowRun.id,
      step_index: 0,
      command_key,
      args_json: argsJson,
    },
  })

  const encoder = new TextEncoder()
  // Set once the client stops reading. The agent stream is still drained so
  // the run's final state is recorded even when nobody is listening.
  let clientGone = false

  const stream = new ReadableStream<Uint8Array>({
    async start(controller) {
      const enqueue = (event: string, data: unknown) => {
        if (clientGone) return
        try {
          controller.enqueue(
            encoder.encode(`event: ${event}\ndata: ${JSON.stringify(data)}\n\n`),
          )
        } catch {
          // enqueue throws once the stream is closed or cancelled.
          clientGone = true
        }
      }

      const close = () => {
        if (clientGone) return
        try {
          controller.close()
        } catch {
          // already closed or errored; nothing left to do
        }
      }

      let stdout = ''
      let stderr = ''
      // True once the FlowRun has been moved out of `running`.
      let finalized = false

      /** Marks the run as failed (at most once), keeps captured output, notifies the client. */
      const markFailed = async (message: string) => {
        if (finalized) return
        finalized = true
        const now = new Date()
        const results = await Promise.allSettled([
          prisma.flowRun.update({
            where: { id: flowRun.id },
            data: { status: FlowStatus.failed, ended_at: now },
          }),
          prisma.flowStep.update({
            where: { id: flowStep.id },
            data: {
              ...(stdout !== '' ? { stdout } : {}),
              ...(stderr !== '' ? { stderr } : {}),
              ended_at: now,
            },
          }),
        ])
        for (const result of results) {
          if (result.status === 'rejected') {
            console.error(`failed to record failure of flow run ${flowRun.id}`, result.reason)
          }
        }
        enqueue('error', { message })
      }

      enqueue('flow_run_id', { flow_run_id: flowRun.id })

      try {
        let agentResponse: Response
        try {
          agentResponse = await fetch(`${AGENT_URL}/agent/v1/exec`, {
            method: 'POST',
            headers: {
              'Content-Type': 'application/json',
              Authorization: `Bearer ${AGENT_SECRET}`,
            },
            body: execPayload,
          })
        } catch (err) {
          await markFailed(err instanceof Error ? err.message : 'agent unreachable')
          return
        }

        if (!agentResponse.ok) {
          const text = await agentResponse.text()
          await markFailed(`agent ${agentResponse.status}: ${text}`)
          return
        }

        if (!agentResponse.body) {
          await markFailed('agent returned an empty response body')
          return
        }

        let currentEvent = ''

        /** Handles one line of the agent's SSE stream. */
        const handleLine = async (line: string) => {
          if (line.startsWith('event:')) {
            currentEvent = line.slice(6).trim()
            return
          }
          if (!line.startsWith('data:')) return

          // Only the parse is guarded: database errors must not be mistaken
          // for malformed SSE data and silently dropped.
          let decoded: unknown
          try {
            decoded = JSON.parse(line.slice(5).trim())
          } catch {
            return
          }
          if (typeof decoded !== 'object' || decoded === null) return
          const parsed = decoded as Record<string, unknown>

          switch (currentEvent) {
            case 'stdout': {
              const chunk = String(parsed.data ?? '')
              stdout = tailOutput(stdout + chunk)
              enqueue('stdout', { data: chunk })
              break
            }
            case 'stderr': {
              const chunk = String(parsed.data ?? '')
              stderr = tailOutput(stderr + chunk)
              enqueue('stderr', { data: chunk })
              break
            }
            case 'exit': {
              if (finalized) break
              const exitCode = typeof parsed.code === 'number' ? parsed.code : null
              const now = new Date()
              await prisma.flowStep.update({
                where: { id: flowStep.id },
                data: { stdout, stderr, exit_code: exitCode, ended_at: now },
              })
              await prisma.flowRun.update({
                where: { id: flowRun.id },
                data: {
                  status: exitCode === 0 ? FlowStatus.success : FlowStatus.failed,
                  exit_code: exitCode,
                  ended_at: now,
                },
              })
              finalized = true
              enqueue('done', { flow_run_id: flowRun.id, exit_code: exitCode })
              break
            }
            case 'error': {
              await markFailed(String(parsed.message ?? 'unknown error'))
              break
            }
            default:
              break
          }
        }

        const reader = agentResponse.body.getReader()
        const decoder = new TextDecoder()
        let buffer = ''

        while (true) {
          const { done, value } = await reader.read()
          if (done) break

          buffer += decoder.decode(value, { stream: true })
          const lines = buffer.split('\n')
          // The last piece may be an incomplete line; keep it for the next chunk.
          buffer = lines.pop() ?? ''

          for (const line of lines) {
            await handleLine(line)
          }
        }

        // Flush the decoder and process a final line that lacked a trailing newline.
        buffer += decoder.decode()
        if (buffer !== '') {
          await handleLine(buffer)
        }

        // No-op when an exit or error event already settled the run.
        await markFailed('agent stream ended without an exit status')
      } catch (err) {
        console.error(`flow run ${flowRun.id} aborted`, err)
        await markFailed('agent stream interrupted')
      } finally {
        close()
      }
    },

    cancel() {
      clientGone = true
    },
  })

  return new Response(stream, {
    headers: {
      'Content-Type': 'text/event-stream',
      'Cache-Control': 'no-cache',
      Connection: 'keep-alive',
    },
  })
}
|