diff --git a/app/api/flows/run/route.ts b/app/api/flows/run/route.ts new file mode 100644 index 0000000..1c6c2f4 --- /dev/null +++ b/app/api/flows/run/route.ts @@ -0,0 +1,197 @@ +import { NextRequest } from 'next/server' +import { getCurrentUser } from '@/lib/session' +import { prisma } from '@/lib/prisma' +import { FlowStatus } from '@prisma/client' + +export const dynamic = 'force-dynamic' + +const AGENT_URL = process.env.OPS_AGENT_URL ?? 'http://127.0.0.1:3099' +const AGENT_SECRET = process.env.OPS_AGENT_SECRET ?? '' + +const TRUNCATE_BYTES = 64 * 1024 +const truncate = (s: string) => (s.length > TRUNCATE_BYTES ? s.slice(-TRUNCATE_BYTES) : s) + +export async function POST(request: NextRequest) { + const user = await getCurrentUser() + if (!user) { + return Response.json({ error: 'unauthorized' }, { status: 401 }) + } + + let body: { flow_key?: string; dry_run?: boolean } + try { + body = await request.json() + } catch { + return Response.json({ error: 'invalid JSON body' }, { status: 400 }) + } + + const { flow_key, dry_run = false } = body + if (!flow_key) { + return Response.json({ error: 'flow_key required' }, { status: 400 }) + } + + const flowRun = await prisma.flowRun.create({ + data: { + user_id: user.id, + flow_key, + status: FlowStatus.running, + dry_run, + }, + }) + + const encoder = new TextEncoder() + + const stream = new ReadableStream({ + async start(controller) { + const enqueue = (event: string, data: unknown) => { + controller.enqueue( + encoder.encode(`event: ${event}\ndata: ${JSON.stringify(data)}\n\n`), + ) + } + + enqueue('flow_run_id', { flow_run_id: flowRun.id }) + + let agentResponse: Response + try { + agentResponse = await fetch(`${AGENT_URL}/agent/v1/flow`, { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + Authorization: `Bearer ${AGENT_SECRET}`, + }, + body: JSON.stringify({ flow_key, dry_run }), + }) + } catch (err) { + const message = err instanceof Error ? err.message : 'agent unreachable' + await prisma.flowRun.update({ + where: { id: flowRun.id }, + data: { status: FlowStatus.failed, ended_at: new Date() }, + }) + enqueue('error', { message }) + controller.close() + return + } + + if (!agentResponse.ok) { + const text = await agentResponse.text() + await prisma.flowRun.update({ + where: { id: flowRun.id }, + data: { status: FlowStatus.failed, ended_at: new Date() }, + }) + enqueue('error', { message: `agent ${agentResponse.status}: ${text}` }) + controller.close() + return + } + + const reader = agentResponse.body!.getReader() + const decoder = new TextDecoder() + let buffer = '' + let currentEvent = '' + + // Per-step accumulators for DB writes + const stepRecordIds = new Map() + const stepStdout = new Map() + const stepStderr = new Map() + let currentStepIndex = -1 + + try { + while (true) { + const { done, value } = await reader.read() + if (done) break + + buffer += decoder.decode(value, { stream: true }) + const lines = buffer.split('\n') + buffer = lines.pop() ?? '' + + for (const line of lines) { + if (line.startsWith('event:')) { + currentEvent = line.slice(6).trim() + } else if (line.startsWith('data:')) { + try { + const parsed = JSON.parse(line.slice(5).trim()) as Record + + if (currentEvent === 'step_start') { + const stepIndex = parsed.step_index as number + currentStepIndex = stepIndex + const command_key = String(parsed.command_key ?? '') + const args = Array.isArray(parsed.args) ? (parsed.args as string[]) : [] + const flowStep = await prisma.flowStep.create({ + data: { + flow_run_id: flowRun.id, + step_index: stepIndex, + command_key, + args_json: JSON.stringify(args), + }, + }) + stepRecordIds.set(stepIndex, flowStep.id) + stepStdout.set(stepIndex, '') + stepStderr.set(stepIndex, '') + enqueue('step_start', parsed) + } else if (currentEvent === 'stdout') { + const chunk = String(parsed.data ?? '') + if (currentStepIndex >= 0) { + stepStdout.set(currentStepIndex, (stepStdout.get(currentStepIndex) ?? '') + chunk) + } + enqueue('stdout', { data: chunk }) + } else if (currentEvent === 'stderr') { + const chunk = String(parsed.data ?? '') + if (currentStepIndex >= 0) { + stepStderr.set(currentStepIndex, (stepStderr.get(currentStepIndex) ?? '') + chunk) + } + enqueue('stderr', { data: chunk }) + } else if (currentEvent === 'step_done') { + const stepIndex = parsed.step_index as number + const exitCode = typeof parsed.exit_code === 'number' ? parsed.exit_code : null + const stepId = stepRecordIds.get(stepIndex) + if (stepId) { + await prisma.flowStep.update({ + where: { id: stepId }, + data: { + exit_code: exitCode, + ended_at: new Date(), + stdout: truncate(stepStdout.get(stepIndex) ?? ''), + stderr: truncate(stepStderr.get(stepIndex) ?? ''), + }, + }) + } + enqueue('step_done', parsed) + } else if (currentEvent === 'done') { + const exitCode = typeof parsed.exit_code === 'number' ? parsed.exit_code : null + await prisma.flowRun.update({ + where: { id: flowRun.id }, + data: { + status: exitCode === 0 ? FlowStatus.success : FlowStatus.failed, + exit_code: exitCode, + ended_at: new Date(), + }, + }) + enqueue('done', { flow_run_id: flowRun.id, exit_code: exitCode }) + } else if (currentEvent === 'error') { + const message = String(parsed.message ?? 'unknown error') + await prisma.flowRun.update({ + where: { id: flowRun.id }, + data: { status: FlowStatus.failed, ended_at: new Date() }, + }) + enqueue('error', { message }) + } + } catch { + // ignore malformed SSE data + } + } + } + } + } catch { + // stream ended unexpectedly + } + + controller.close() + }, + }) + + return new Response(stream, { + headers: { + 'Content-Type': 'text/event-stream', + 'Cache-Control': 'no-cache', + Connection: 'keep-alive', + }, + }) +} diff --git a/components/StreamingTerminal.tsx b/components/StreamingTerminal.tsx index 856806e..3902dbe 100644 --- a/components/StreamingTerminal.tsx +++ b/components/StreamingTerminal.tsx @@ -3,7 +3,7 @@ import { useEffect, useRef } from 'react' export type TerminalLine = { - type: 'stdout' | 'stderr' + type: 'stdout' | 'stderr' | 'info' text: string } @@ -75,7 +75,11 @@ export default function StreamingTerminal({ lines, status, error, className = '' key={i} className={ 'whitespace-pre-wrap break-all leading-5 ' + - (line.type === 'stderr' ? 'text-red-400' : 'text-zinc-100') + (line.type === 'stderr' + ? 'text-red-400' + : line.type === 'info' + ? 'text-sky-400' + : 'text-zinc-100') } > {line.text} diff --git a/hooks/useFlowRun.ts b/hooks/useFlowRun.ts index 5fd6bd4..238fba3 100644 --- a/hooks/useFlowRun.ts +++ b/hooks/useFlowRun.ts @@ -22,21 +22,15 @@ export function useFlowRun(onComplete?: (flowRunId: string, exitCode: number | n const abortRef = useRef(null) - const start = useCallback( - async (commandKey: string, args: string[] = [], stdin?: string) => { - abortRef.current?.abort() - const abort = new AbortController() - abortRef.current = abort - - setState({ status: 'running', flowRunId: null, lines: [], exitCode: null, error: null }) - + const streamSSE = useCallback( + async (url: string, body: Record, signal: AbortSignal) => { let response: Response try { - response = await fetch('/api/flows/start', { + response = await fetch(url, { method: 'POST', headers: { 'Content-Type': 'application/json' }, - body: JSON.stringify({ command_key: commandKey, args, ...(stdin != null ? { stdin } : {}) }), - signal: abort.signal, + body: JSON.stringify(body), + signal, }) } catch (err) { if ((err as Error).name === 'AbortError') return @@ -80,6 +74,20 @@ export function useFlowRun(onComplete?: (flowRunId: string, exitCode: number | n const parsed = JSON.parse(line.slice(5).trim()) as Record if (currentEvent === 'flow_run_id') { setState((s) => ({ ...s, flowRunId: String(parsed.flow_run_id ?? '') })) + } else if (currentEvent === 'step_start') { + const stepIndex = (parsed.step_index as number) + 1 + const totalSteps = parsed.total_steps as number + const commandKey = String(parsed.command_key ?? '') + setState((s) => ({ + ...s, + lines: [ + ...s.lines, + { + type: 'info' as const, + text: `\n── Step ${stepIndex}/${totalSteps}: ${commandKey} ──\n`, + }, + ], + })) } else if (currentEvent === 'stdout') { const text = String(parsed.data ?? '') setState((s) => ({ @@ -124,10 +132,36 @@ export function useFlowRun(onComplete?: (flowRunId: string, exitCode: number | n [onComplete], ) + const start = useCallback( + async (commandKey: string, args: string[] = [], stdin?: string) => { + abortRef.current?.abort() + const abort = new AbortController() + abortRef.current = abort + setState({ status: 'running', flowRunId: null, lines: [], exitCode: null, error: null }) + await streamSSE( + '/api/flows/start', + { command_key: commandKey, args, ...(stdin != null ? { stdin } : {}) }, + abort.signal, + ) + }, + [streamSSE], + ) + + const startFlow = useCallback( + async (flowKey: string, dryRun = false) => { + abortRef.current?.abort() + const abort = new AbortController() + abortRef.current = abort + setState({ status: 'running', flowRunId: null, lines: [], exitCode: null, error: null }) + await streamSSE('/api/flows/run', { flow_key: flowKey, dry_run: dryRun }, abort.signal) + }, + [streamSSE], + ) + const reset = useCallback(() => { abortRef.current?.abort() setState({ status: 'idle', flowRunId: null, lines: [], exitCode: null, error: null }) }, []) - return { ...state, start, reset } + return { ...state, start, startFlow, reset } } diff --git a/ops-agent/commands.yml.example b/ops-agent/commands.yml.example index 8ad2c02..e4c5197 100644 --- a/ops-agent/commands.yml.example +++ b/ops-agent/commands.yml.example @@ -151,3 +151,14 @@ commands: - "cat > /srv/scrum4me/caddy/Caddyfile.new && mv /srv/scrum4me/caddy/Caddyfile.new /srv/scrum4me/caddy/Caddyfile" stdin_from_body: true description: "Atomically replace /srv/scrum4me/caddy/Caddyfile (write stdin to .new, then mv)" + + # ── Smoke tests / health checks ─────────────────────────────────────────── + + curl_smoke_scrum4me_web: + cmd: ["curl", "-sf", "--max-time", "10", "https://scrum4me.com"] + description: "HTTP smoke test — fails (non-zero) if the site is unreachable or returns a non-2xx status" + + docker_compose_ps_worker: + cmd: ["docker", "compose", "ps", "--filter", "status=running", "worker-idea"] + cwd: "/srv/scrum4me/compose" + description: "Verify worker-idea container is in the running state" diff --git a/ops-agent/flows.example/update_caddy_config.yml b/ops-agent/flows.example/update_caddy_config.yml new file mode 100644 index 0000000..7c5cee9 --- /dev/null +++ b/ops-agent/flows.example/update_caddy_config.yml @@ -0,0 +1,22 @@ +# Reload Caddy after a config change. +# Copy to /etc/ops-agent/flows/update_caddy_config.yml on the host. +# +# Assumes the new Caddyfile is already written to /srv/scrum4me/caddy/Caddyfile +# (e.g. via the caddy_write_config command from the Ops Dashboard editor). +# +# Steps: +# 1. Validate the Caddyfile +# 2. Reload Caddy (zero-downtime config swap) +# 3. Smoke-test HTTPS connectivity + +name: Update Caddy Config +description: Validate and reload the Caddy configuration +steps: + - command_key: caddy_validate + on_failure: abort + + - command_key: caddy_reload + on_failure: abort + + - command_key: curl_smoke_scrum4me_web + on_failure: continue diff --git a/ops-agent/flows.example/update_mcp_worker.yml b/ops-agent/flows.example/update_mcp_worker.yml new file mode 100644 index 0000000..ef87339 --- /dev/null +++ b/ops-agent/flows.example/update_mcp_worker.yml @@ -0,0 +1,31 @@ +# Deploy the latest MCP worker image. +# Copy to /etc/ops-agent/flows/update_mcp_worker.yml on the host. +# +# Steps: +# 1. Fetch remote refs +# 2. Fast-forward pull (aborts if working tree is dirty) +# 3. Rebuild the Docker image +# 4. Recreate the container in detached mode +# 5. Verify the container is running + +name: Update MCP Worker +description: Pull latest code, rebuild Docker image, and restart the MCP worker service +steps: + - command_key: git_fetch + args: ["/srv/scrum4me"] + on_failure: abort + + - command_key: git_pull + args: ["/srv/scrum4me"] + on_failure: abort + + - command_key: docker_compose_build + args: ["worker-idea"] + on_failure: abort + + - command_key: docker_compose_up + args: ["worker-idea"] + on_failure: abort + + - command_key: docker_compose_ps_worker + on_failure: continue diff --git a/ops-agent/flows.example/update_scrum4me_web.yml b/ops-agent/flows.example/update_scrum4me_web.yml new file mode 100644 index 0000000..34a36fd --- /dev/null +++ b/ops-agent/flows.example/update_scrum4me_web.yml @@ -0,0 +1,31 @@ +# Deploy the latest Scrum4Me web image. +# Copy to /etc/ops-agent/flows/update_scrum4me_web.yml on the host. +# +# Steps: +# 1. Fetch remote refs +# 2. Fast-forward pull (aborts if working tree is dirty) +# 3. Rebuild the Docker image +# 4. Recreate the container in detached mode +# 5. Smoke-test the public endpoint + +name: Update Scrum4Me Web +description: Pull latest code, rebuild Docker image, and restart the Scrum4Me web service +steps: + - command_key: git_fetch + args: ["/srv/scrum4me"] + on_failure: abort + + - command_key: git_pull + args: ["/srv/scrum4me"] + on_failure: abort + + - command_key: docker_compose_build + args: ["scrum4me-web"] + on_failure: abort + + - command_key: docker_compose_up + args: ["scrum4me-web"] + on_failure: abort + + - command_key: curl_smoke_scrum4me_web + on_failure: continue diff --git a/ops-agent/src/index.ts b/ops-agent/src/index.ts index 73cf770..a0a88ef 100644 --- a/ops-agent/src/index.ts +++ b/ops-agent/src/index.ts @@ -4,8 +4,10 @@ import { loadWhitelist } from './whitelist.js'; import { loadSecret, authHook } from './auth.js'; import { healthRoutes } from './routes/health.js'; import { execRoutes } from './routes/exec.js'; +import { makeFlowRoutes } from './routes/flow.js'; const WHITELIST_PATH = process.env.OPS_AGENT_WHITELIST_PATH ?? '/etc/ops-agent/commands.yml'; +const FLOWS_PATH = process.env.OPS_AGENT_FLOWS_PATH ?? '/etc/ops-agent/flows'; const PORT = parseInt(process.env.OPS_AGENT_PORT ?? '3099', 10); const HOST = process.env.OPS_AGENT_HOST ?? '127.0.0.1'; @@ -21,6 +23,7 @@ async function main() { await app.register(healthRoutes); await app.register(execRoutes); + await app.register(makeFlowRoutes(FLOWS_PATH)); await app.listen({ port: PORT, host: HOST }); console.log(`ops-agent listening on ${HOST}:${PORT}`); diff --git a/ops-agent/src/lib/flow-runner.ts b/ops-agent/src/lib/flow-runner.ts new file mode 100644 index 0000000..2d3e584 --- /dev/null +++ b/ops-agent/src/lib/flow-runner.ts @@ -0,0 +1,168 @@ +import fs from 'fs'; +import path from 'path'; +import { spawn } from 'child_process'; +import yaml from 'js-yaml'; +import { getCommand, validateArgs, validateCwd } from '../whitelist.js'; + +export interface FlowStepDef { + command_key: string; + args?: string[]; + on_failure?: 'abort' | 'continue'; +} + +export interface FlowDef { + name: string; + description?: string; + steps: FlowStepDef[]; +} + +export type SendEvent = (event: string, data: unknown) => void; + +export function loadFlow(flowsDir: string, flowKey: string): FlowDef { + const filePath = path.join(flowsDir, `${flowKey}.yml`); + if (!fs.existsSync(filePath)) { + throw new Error(`flow '${flowKey}' not found`); + } + const raw = fs.readFileSync(filePath, 'utf8'); + const parsed = yaml.load(raw) as FlowDef; + if (!parsed?.steps || !Array.isArray(parsed.steps) || parsed.steps.length === 0) { + throw new Error(`flow '${flowKey}' has no steps`); + } + return parsed; +} + +export function listFlowKeys(flowsDir: string): string[] { + if (!fs.existsSync(flowsDir)) return []; + return fs + .readdirSync(flowsDir) + .filter((f) => f.endsWith('.yml')) + .map((f) => f.slice(0, -4)); +} + +/** + * Runs a named flow from flowsDir, emitting SSE-style events via sendEvent. + * Returns the final exit code (0 = success). + */ +export async function runFlow( + flowsDir: string, + flowKey: string, + dryRun: boolean, + sendEvent: SendEvent, +): Promise { + let flow: FlowDef; + try { + flow = loadFlow(flowsDir, flowKey); + } catch (err) { + sendEvent('error', { message: (err as Error).message }); + return 1; + } + + const totalSteps = flow.steps.length; + + for (let i = 0; i < totalSteps; i++) { + const step = flow.steps[i]; + const { command_key, args = [], on_failure = 'abort' } = step; + + const def = getCommand(command_key); + if (!def) { + sendEvent('error', { + message: `step ${i}: command_key '${command_key}' is not in the whitelist`, + }); + return 1; + } + + const cwdError = validateCwd(def, args); + if (cwdError) { + sendEvent('error', { message: `step ${i}: ${cwdError}` }); + return 1; + } + + const argError = validateArgs(def, args); + if (argError) { + sendEvent('error', { message: `step ${i}: ${argError}` }); + return 1; + } + + sendEvent('step_start', { + step_index: i, + total_steps: totalSteps, + command_key, + args, + }); + + const cwd = def.cwd_pattern ? args[0] : def.cwd; + const [bin, ...staticArgs] = def.cmd; + const effectiveArgs = def.cwd_pattern ? args.slice(1) : args; + + if (dryRun) { + const fullCmd = [...def.cmd, ...effectiveArgs].join(' '); + const cwdNote = cwd ? ` (cwd: ${cwd})` : ''; + sendEvent('stdout', { data: `WOULD RUN: ${fullCmd}${cwdNote}\n` }); + sendEvent('step_done', { step_index: i, exit_code: 0 }); + continue; + } + + // Check preconditions before executing + if (def.preconditions) { + for (const pre of def.preconditions) { + if (pre === 'git_status_clean') { + const clean = await checkGitStatusClean(cwd); + if (!clean) { + sendEvent('stderr', { + data: `precondition 'git_status_clean' failed: working tree is not clean\n`, + }); + sendEvent('step_done', { step_index: i, exit_code: 1 }); + sendEvent('done', { exit_code: 1 }); + return 1; + } + } + } + } + + const exitCode = await spawnStep(bin, [...staticArgs, ...effectiveArgs], cwd, sendEvent); + sendEvent('step_done', { step_index: i, exit_code: exitCode }); + + if (exitCode !== 0 && on_failure === 'abort') { + sendEvent('done', { exit_code: exitCode }); + return exitCode; + } + } + + sendEvent('done', { exit_code: 0 }); + return 0; +} + +function spawnStep( + bin: string, + args: string[], + cwd: string | undefined, + sendEvent: SendEvent, +): Promise { + return new Promise((resolve) => { + const child = spawn(bin, args, { shell: false, cwd }); + + child.stdout.on('data', (chunk: Buffer) => { + sendEvent('stdout', { data: chunk.toString() }); + }); + child.stderr.on('data', (chunk: Buffer) => { + sendEvent('stderr', { data: chunk.toString() }); + }); + child.on('close', (code) => resolve(code ?? 1)); + child.on('error', (err) => { + sendEvent('stderr', { data: `spawn error: ${err.message}\n` }); + resolve(1); + }); + }); +} + +function checkGitStatusClean(cwd: string | undefined): Promise { + return new Promise((resolve) => { + const child = spawn('git', ['status', '--porcelain'], { shell: false, cwd }); + let output = ''; + child.stdout.on('data', (chunk: Buffer) => { + output += chunk.toString(); + }); + child.on('close', () => resolve(output.trim() === '')); + child.on('error', () => resolve(false)); + }); +} diff --git a/ops-agent/src/routes/flow.ts b/ops-agent/src/routes/flow.ts new file mode 100644 index 0000000..7896795 --- /dev/null +++ b/ops-agent/src/routes/flow.ts @@ -0,0 +1,37 @@ +import { FastifyInstance, FastifyRequest } from 'fastify'; +import { runFlow } from '../lib/flow-runner.js'; + +interface FlowBody { + flow_key: string; + dry_run?: boolean; +} + +export function makeFlowRoutes(flowsDir: string) { + return async function flowRoutes(app: FastifyInstance): Promise { + app.post('/agent/v1/flow', async (req: FastifyRequest<{ Body: FlowBody }>, reply) => { + const { flow_key, dry_run = false } = req.body ?? {}; + + if (!flow_key) { + return reply.status(400).send({ error: 'flow_key required' }); + } + + reply.raw.writeHead(200, { + 'Content-Type': 'text/event-stream', + 'Cache-Control': 'no-cache', + Connection: 'keep-alive', + }); + + const sendEvent = (event: string, data: unknown) => { + reply.raw.write(`event: ${event}\ndata: ${JSON.stringify(data)}\n\n`); + }; + + req.raw.on('close', () => { + // client disconnected — runFlow will still complete the current step + // but we won't write any more events after the socket closes + }); + + await runFlow(flowsDir, flow_key, dry_run, sendEvent); + reply.raw.end(); + }); + }; +}