- ops-agent/src/lib/flow-runner.ts: loads YAML flows, validates all steps
  against the command whitelist, executes them sequentially; supports dry_run
  (emits WOULD RUN lines) and on_failure: abort|continue per step
- ops-agent/src/routes/flow.ts: POST /agent/v1/flow { flow_key, dry_run }
  streams step_start/stdout/stderr/step_done/done SSE events
- ops-agent/src/index.ts: register flow route, add FLOWS_PATH env var
- ops-agent/flows.example/: three flow definitions — update_scrum4me_web,
  update_mcp_worker, update_caddy_config; deploy to /etc/ops-agent/flows/
- ops-agent/commands.yml.example: add curl_smoke_scrum4me_web and
  docker_compose_ps_worker smoke-test commands
- app/api/flows/run/route.ts: Next.js proxy — creates FlowRun/FlowStep
  DB records per step, forwards SSE stream to browser
- hooks/useFlowRun.ts: add startFlow(flowKey, dryRun) method; handle
  step_start events to display step headers in the terminal
- components/StreamingTerminal.tsx: add 'info' line type (sky-400) for
  step headers
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
167 lines
5.5 KiB
TypeScript
167 lines
5.5 KiB
TypeScript
'use client'
|
|
|
|
import { useState, useCallback, useRef } from 'react'
|
|
import type { TerminalLine, TerminalStatus } from '@/components/StreamingTerminal'
|
|
|
|
/**
 * Snapshot of a single streamed run as exposed by `useFlowRun`.
 *
 * The hook starts in the `'idle'` status, switches to `'running'` when a run
 * is started, and ends in `'done'`, `'failed'` or `'error'`.
 */
export interface FlowRunState {
  /** Current lifecycle status of the run. */
  status: TerminalStatus

  /** Run identifier reported by the server; `null` until one is received. */
  flowRunId: string | null

  /** Terminal lines accumulated from step headers, stdout and stderr events. */
  lines: TerminalLine[]

  /** Exit code taken from the `done` event; `null` if none (or a non-number) was sent. */
  exitCode: number | null

  /** Message describing a request, stream or server-reported error; `null` otherwise. */
  error: string | null
}
|
|
|
|
/** True when `err` is the rejection raised by aborting a fetch or a stream read. */
function isAbortError(err: unknown): boolean {
  return (
    typeof err === 'object' && err !== null && (err as { name?: unknown }).name === 'AbortError'
  )
}

/** Builds a state carrying no run data and the given status. */
function blankFlowRunState(status: TerminalStatus): FlowRunState {
  return { status, flowRunId: null, lines: [], exitCode: null, error: null }
}

/**
 * React hook that starts a command or flow run over a POST request and
 * consumes the Server-Sent-Events response, exposing the accumulated state.
 *
 * Handled SSE events: `flow_run_id`, `step_start`, `stdout`, `stderr`,
 * `done` and `error`. Each `data:` line is expected to hold one JSON object;
 * lines that fail to parse are ignored.
 *
 * @param onComplete Optional callback invoked once per `done` event with the
 *   run id and the exit code (`null` when the event carried no numeric code).
 * @returns The current {@link FlowRunState} fields plus:
 *   - `start(commandKey, args?, stdin?)` — POSTs to `/api/flows/start`;
 *   - `startFlow(flowKey, dryRun?)` — POSTs to `/api/flows/run`;
 *   - `reset()` — aborts any in-flight run and returns to the idle state.
 *   Starting a new run aborts the previous one.
 */
export function useFlowRun(onComplete?: (flowRunId: string, exitCode: number | null) => void) {
  const [state, setState] = useState<FlowRunState>(() => blankFlowRunState('idle'))

  // Controller of the run currently in flight, so a new run or reset can cancel it.
  const abortRef = useRef<AbortController | null>(null)

  const streamSSE = useCallback(
    async (url: string, body: Record<string, unknown>, signal: AbortSignal) => {
      // Records a failure, unless this run was cancelled: an aborted run must
      // not write into the state that now belongs to its successor.
      const fail = (message: string) => {
        if (signal.aborted) return
        setState((s) => ({ ...s, status: 'error', error: message }))
      }

      const appendLine = (line: TerminalLine) => {
        setState((s) => ({ ...s, lines: [...s.lines, line] }))
      }

      let response: Response
      try {
        response = await fetch(url, {
          method: 'POST',
          headers: { 'Content-Type': 'application/json' },
          body: JSON.stringify(body),
          signal,
        })
        if (!response.ok) {
          // Reading the error body can itself reject (abort, network drop),
          // so it stays inside the try block.
          fail(`${response.status}: ${await response.text()}`)
          return
        }
      } catch (err) {
        if (isAbortError(err)) return
        fail(err instanceof Error ? err.message : 'request failed')
        return
      }

      if (!response.body) {
        fail('response has no body')
        return
      }

      const reader = response.body.getReader()
      const decoder = new TextDecoder()
      let buffer = ''
      let currentEvent = ''
      // Mutable per-run bookkeeping shared by the closures below.
      const run: { id: string | null; finished: boolean } = { id: null, finished: false }

      // Applies one decoded SSE event to the hook state.
      const dispatch = (event: string, payload: Record<string, unknown>) => {
        switch (event) {
          case 'flow_run_id': {
            const id = String(payload.flow_run_id ?? '')
            run.id = id
            setState((s) => ({ ...s, flowRunId: id }))
            break
          }
          case 'step_start': {
            // step_index is zero-based on the wire; show it one-based.
            const rawIndex = payload.step_index
            const index = rawIndex == null ? Number.NaN : Number(rawIndex)
            const stepLabel = Number.isFinite(index) ? String(index + 1) : '?'
            const totalSteps = String(payload.total_steps ?? '?')
            const commandKey = String(payload.command_key ?? '')
            appendLine({
              type: 'info',
              text: `\n── Step ${stepLabel}/${totalSteps}: ${commandKey} ──\n`,
            })
            break
          }
          case 'stdout':
          case 'stderr':
            appendLine({ type: event, text: String(payload.data ?? '') })
            break
          case 'done': {
            const exitCode = typeof payload.exit_code === 'number' ? payload.exit_code : null
            // Fall back to the id announced earlier rather than erasing it.
            const flowRunId =
              payload.flow_run_id != null ? String(payload.flow_run_id) : (run.id ?? '')
            run.finished = true
            setState((s) => ({
              ...s,
              status: exitCode === 0 ? 'done' : 'failed',
              exitCode,
              flowRunId,
            }))
            try {
              onComplete?.(flowRunId, exitCode)
            } catch (err) {
              // A faulty callback must not break stream handling, but should be visible.
              console.error('useFlowRun: onComplete callback threw', err)
            }
            break
          }
          case 'error':
            run.finished = true
            fail(String(payload.message ?? 'unknown error'))
            break
          default:
            break
        }
      }

      // Interprets one line of the event stream.
      const consumeLine = (rawLine: string) => {
        const line = rawLine.endsWith('\r') ? rawLine.slice(0, -1) : rawLine
        if (line === '') {
          // A blank line ends the current event; the event name does not carry over.
          currentEvent = ''
          return
        }
        if (line.startsWith('event:')) {
          currentEvent = line.slice(6).trim()
          return
        }
        if (!line.startsWith('data:')) return

        let payload: unknown
        try {
          payload = JSON.parse(line.slice(5).trim())
        } catch {
          // ignore malformed SSE data
          return
        }
        if (typeof payload !== 'object' || payload === null) return
        dispatch(currentEvent, payload as Record<string, unknown>)
      }

      try {
        for (;;) {
          const { done, value } = await reader.read()
          if (done) break

          buffer += decoder.decode(value, { stream: true })
          const rawLines = buffer.split('\n')
          // The last piece may be an incomplete line; keep it for the next chunk.
          buffer = rawLines.pop() ?? ''
          rawLines.forEach(consumeLine)
        }

        // Flush bytes still held by the decoder and a final unterminated line.
        buffer += decoder.decode()
        if (buffer !== '') consumeLine(buffer)
      } catch (err) {
        if (isAbortError(err)) return
        fail(err instanceof Error ? err.message : 'stream error')
        return
      }

      if (!run.finished && !signal.aborted) {
        // The server closed the stream without a terminal event; do not leave
        // the UI stuck in the running state.
        setState((s) =>
          s.status === 'running'
            ? { ...s, status: 'error', error: 'stream ended before a done event was received' }
            : s,
        )
      }
    },
    [onComplete],
  )

  // Cancels any in-flight run, resets the state and streams a new run.
  const beginRun = useCallback(
    async (url: string, body: Record<string, unknown>) => {
      abortRef.current?.abort()
      const controller = new AbortController()
      abortRef.current = controller
      setState(blankFlowRunState('running'))
      await streamSSE(url, body, controller.signal)
    },
    [streamSSE],
  )

  const start = useCallback(
    async (commandKey: string, args: string[] = [], stdin?: string) => {
      await beginRun('/api/flows/start', {
        command_key: commandKey,
        args,
        ...(stdin != null ? { stdin } : {}),
      })
    },
    [beginRun],
  )

  const startFlow = useCallback(
    async (flowKey: string, dryRun = false) => {
      await beginRun('/api/flows/run', { flow_key: flowKey, dry_run: dryRun })
    },
    [beginRun],
  )

  const reset = useCallback(() => {
    abortRef.current?.abort()
    abortRef.current = null
    setState(blankFlowRunState('idle'))
  }, [])

  return { ...state, start, startFlow, reset }
}
|