Ops-dashboard/ops-agent/src/lib/flow-runner.ts
Scrum4Me Agent bdc24b57ba feat(flows): add YAML flow format, flow-runner, and /agent/v1/flow endpoint
- ops-agent/src/lib/flow-runner.ts: loads YAML flows, validates all steps
  against the command whitelist, executes sequentially; supports dry_run
  (emits WOULD RUN lines) and on_failure: abort|continue per step
- ops-agent/src/routes/flow.ts: POST /agent/v1/flow { flow_key, dry_run }
  streams step_start/stdout/stderr/step_done/done SSE events
- ops-agent/src/index.ts: register flow route, add FLOWS_PATH env var
- ops-agent/flows.example/: three flow definitions — update_scrum4me_web,
  update_mcp_worker, update_caddy_config; deploy to /etc/ops-agent/flows/
- ops-agent/commands.yml.example: add curl_smoke_scrum4me_web and
  docker_compose_ps_worker smoke-test commands
- app/api/flows/run/route.ts: Next.js proxy — creates FlowRun/FlowStep
  DB records per step, forwards SSE stream to browser
- hooks/useFlowRun.ts: add startFlow(flowKey, dryRun) method; handle
  step_start events to display step headers in the terminal
- components/StreamingTerminal.tsx: add 'info' line type (sky-400) for
  step headers

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-13 19:22:34 +02:00

168 lines
4.7 KiB
TypeScript

import fs from 'fs';
import path from 'path';
import { spawn } from 'child_process';
import yaml from 'js-yaml';
import { getCommand, validateArgs, validateCwd } from '../whitelist.js';
/**
 * One step of a flow, as written in the flow's YAML file.
 */
export interface FlowStepDef {
  /** Key used to look the command up in the whitelist. */
  command_key: string;

  /**
   * Arguments supplied to the command. When the whitelisted command declares
   * a `cwd_pattern`, the first argument is used as the working directory and
   * only the remaining ones are passed on the command line.
   */
  args?: string[];

  /**
   * What the runner does when the step exits non-zero: `'abort'` (the default
   * when omitted) ends the flow, `'continue'` moves on to the next step.
   */
  on_failure?: 'abort' | 'continue';
}
/**
 * A flow definition as stored in `<flow_key>.yml`.
 */
export interface FlowDef {
  /** Name of the flow (not consulted by the runner in this module). */
  name: string;

  /** Optional free-text description (not consulted by the runner in this module). */
  description?: string;

  /** Steps in execution order; `loadFlow` rejects a flow with no steps. */
  steps: FlowStepDef[];
}
/**
 * Callback through which the runner reports progress.
 *
 * `event` is the event name; this module emits `error`, `step_start`,
 * `stdout`, `stderr`, `step_done` and `done`. `data` is the payload object
 * that belongs to that event.
 */
export type SendEvent = (event: string, data: unknown) => void;
/**
 * Loads and validates the flow stored in `<flowsDir>/<flowKey>.yml`.
 *
 * @param flowsDir Directory that holds the flow definition files.
 * @param flowKey  Flow identifier; must name a file directly inside `flowsDir`.
 * @returns The parsed flow, with every step checked for a well-formed shape.
 * @throws Error `flow '<key>' not found` when the file does not exist or the
 *         key would resolve outside `flowsDir`; `flow '<key>' has no steps`
 *         when `steps` is missing or empty; a `flow '<key>' step <n>: ...`
 *         error when a step is malformed. Errors raised while reading or
 *         parsing the file propagate unchanged.
 */
export function loadFlow(flowsDir: string, flowKey: string): FlowDef {
  const baseDir = path.resolve(flowsDir);
  const filePath = path.resolve(baseDir, `${flowKey}.yml`);

  // flowKey can originate from a request body, so refuse any key (e.g.
  // "../x" or "sub/x") whose file would not sit directly inside flowsDir.
  if (path.dirname(filePath) !== baseDir || !fs.existsSync(filePath)) {
    throw new Error(`flow '${flowKey}' not found`);
  }

  const raw = fs.readFileSync(filePath, 'utf8');
  // NOTE(review): js-yaml >= 4 parses with its safe default schema in `load`;
  // on js-yaml 3.x this call should be `safeLoad` — confirm the installed version.
  const parsed: unknown = yaml.load(raw);

  const steps: unknown =
    typeof parsed === 'object' && parsed !== null
      ? (parsed as { steps?: unknown }).steps
      : undefined;
  if (!Array.isArray(steps) || steps.length === 0) {
    throw new Error(`flow '${flowKey}' has no steps`);
  }

  // The YAML content is untyped: check each step so a malformed file fails
  // here with a clear message instead of misbehaving while the flow runs.
  for (const [index, step] of steps.entries()) {
    const where = `flow '${flowKey}' step ${index}`;
    if (typeof step !== 'object' || step === null || Array.isArray(step)) {
      throw new Error(`${where}: must be a mapping`);
    }
    const { command_key, args, on_failure } = step as Record<string, unknown>;
    if (typeof command_key !== 'string' || command_key === '') {
      throw new Error(`${where}: command_key must be a non-empty string`);
    }
    if (
      args !== undefined &&
      !(Array.isArray(args) && args.every((arg) => typeof arg === 'string'))
    ) {
      throw new Error(`${where}: args must be a list of strings`);
    }
    // Without this check a misspelled policy would silently act like 'continue'.
    if (on_failure !== undefined && on_failure !== 'abort' && on_failure !== 'continue') {
      throw new Error(`${where}: on_failure must be 'abort' or 'continue'`);
    }
  }

  return parsed as FlowDef;
}
/**
 * Lists the flow keys available in `flowsDir`: the name of every directory
 * entry ending in `.yml`, with that suffix removed.
 *
 * @param flowsDir Directory that holds the flow definition files.
 * @returns The flow keys, or an empty array when the directory does not exist.
 */
export function listFlowKeys(flowsDir: string): string[] {
  const suffix = '.yml';
  const keys: string[] = [];

  if (fs.existsSync(flowsDir)) {
    for (const entry of fs.readdirSync(flowsDir)) {
      if (entry.endsWith(suffix)) {
        keys.push(entry.slice(0, entry.length - suffix.length));
      }
    }
  }

  return keys;
}
/**
 * Runs a named flow from flowsDir, emitting SSE-style events via sendEvent.
 * Returns the final exit code (0 = success).
 *
 * The run has two phases:
 *  1. Every step is resolved against the command whitelist and its arguments
 *     are validated. Nothing is executed until all steps have passed, so an
 *     invalid later step can no longer leave the flow half-applied.
 *  2. The steps run sequentially. A non-zero exit ends the flow unless the
 *     step declares `on_failure: continue`.
 *
 * Events:
 *  - `error { message }`: the flow could not be loaded or a step failed
 *    validation; the function returns 1 and no `done` event follows.
 *  - `step_start { step_index, total_steps, command_key, args }`
 *  - `stdout { data }` / `stderr { data }`
 *  - `step_done { step_index, exit_code }`
 *  - `done { exit_code }`: the flow finished or was aborted by a failing step
 *    or precondition.
 *
 * @param flowsDir  Directory that holds the flow definition files.
 * @param flowKey   Key of the flow to run.
 * @param dryRun    When true nothing is spawned and preconditions are not
 *                  checked; each step emits a `WOULD RUN: ...` stdout line and
 *                  is reported as having exited with 0.
 * @param sendEvent Receives the events listed above.
 * @returns 0 when the flow ran to the end (including steps that failed under
 *          `on_failure: continue`), otherwise 1 or the exit code of the step
 *          that aborted the flow.
 */
export async function runFlow(
  flowsDir: string,
  flowKey: string,
  dryRun: boolean,
  sendEvent: SendEvent,
): Promise<number> {
  let flow: FlowDef;
  try {
    flow = loadFlow(flowsDir, flowKey);
  } catch (err) {
    sendEvent('error', { message: err instanceof Error ? err.message : String(err) });
    return 1;
  }

  type CommandDef = NonNullable<ReturnType<typeof getCommand>>;
  interface PlannedStep {
    def: CommandDef;
    command_key: string;
    args: string[];
    on_failure: NonNullable<FlowStepDef['on_failure']>;
  }

  // Phase 1: validate every step before running anything.
  // NOTE(review): this assumes validateCwd/validateArgs are pure checks that do
  // not depend on side effects of earlier steps — confirm against the whitelist module.
  const plan: PlannedStep[] = [];
  for (const [i, step] of flow.steps.entries()) {
    // The flow file is untyped YAML; a null or scalar step must not crash the runner.
    if (typeof step !== 'object' || step === null) {
      sendEvent('error', { message: `step ${i}: step definition is not a mapping` });
      return 1;
    }
    const { command_key, args = [], on_failure = 'abort' } = step;

    const def = getCommand(command_key);
    if (!def) {
      sendEvent('error', {
        message: `step ${i}: command_key '${command_key}' is not in the whitelist`,
      });
      return 1;
    }
    const cwdError = validateCwd(def, args);
    if (cwdError) {
      sendEvent('error', { message: `step ${i}: ${cwdError}` });
      return 1;
    }
    const argError = validateArgs(def, args);
    if (argError) {
      sendEvent('error', { message: `step ${i}: ${argError}` });
      return 1;
    }
    plan.push({ def, command_key, args, on_failure });
  }

  // Phase 2: execute (or, for a dry run, describe) the validated steps in order.
  const totalSteps = plan.length;
  for (const [i, { def, command_key, args, on_failure }] of plan.entries()) {
    sendEvent('step_start', {
      step_index: i,
      total_steps: totalSteps,
      command_key,
      args,
    });

    // With a cwd_pattern the first argument is the working directory and is
    // not passed on the command line; otherwise the command's fixed cwd is used.
    const cwd = def.cwd_pattern ? args[0] : def.cwd;
    const [bin, ...staticArgs] = def.cmd;
    const effectiveArgs = def.cwd_pattern ? args.slice(1) : args;

    if (dryRun) {
      const fullCmd = [...def.cmd, ...effectiveArgs].join(' ');
      const cwdNote = cwd ? ` (cwd: ${cwd})` : '';
      sendEvent('stdout', { data: `WOULD RUN: ${fullCmd}${cwdNote}\n` });
      sendEvent('step_done', { step_index: i, exit_code: 0 });
      continue;
    }

    // Check preconditions before executing. Only 'git_status_clean' is handled
    // here; any other precondition name is skipped. A failed precondition
    // always ends the flow, whatever the step's on_failure policy says.
    if (def.preconditions) {
      for (const pre of def.preconditions) {
        if (pre === 'git_status_clean') {
          const clean = await checkGitStatusClean(cwd);
          if (!clean) {
            sendEvent('stderr', {
              data: `precondition 'git_status_clean' failed: working tree is not clean\n`,
            });
            sendEvent('step_done', { step_index: i, exit_code: 1 });
            sendEvent('done', { exit_code: 1 });
            return 1;
          }
        }
      }
    }

    const exitCode = await spawnStep(bin, [...staticArgs, ...effectiveArgs], cwd, sendEvent);
    sendEvent('step_done', { step_index: i, exit_code: exitCode });
    if (exitCode !== 0 && on_failure === 'abort') {
      sendEvent('done', { exit_code: exitCode });
      return exitCode;
    }
  }

  sendEvent('done', { exit_code: 0 });
  return 0;
}
/**
 * Spawns one command without a shell and streams its output as events.
 *
 * @param bin       Executable to run.
 * @param args      Arguments passed to the executable.
 * @param cwd       Working directory for the child; undefined inherits the
 *                  current process's directory.
 * @param sendEvent Receives `stdout { data }` and `stderr { data }` events.
 * @returns A promise that never rejects. It resolves with the child's exit
 *          code, or with 1 when the child ends without an exit code or
 *          cannot be started at all.
 */
function spawnStep(
  bin: string,
  args: string[],
  cwd: string | undefined,
  sendEvent: SendEvent,
): Promise<number> {
  return new Promise((resolve) => {
    try {
      const child = spawn(bin, args, { shell: false, cwd });

      // Decode at the stream level: converting each Buffer chunk on its own
      // corrupts multi-byte UTF-8 characters that straddle a chunk boundary.
      child.stdout.setEncoding('utf8');
      child.stderr.setEncoding('utf8');

      child.stdout.on('data', (chunk: string) => {
        sendEvent('stdout', { data: chunk });
      });
      child.stderr.on('data', (chunk: string) => {
        sendEvent('stderr', { data: chunk });
      });
      // A null exit code means the child ended without one (e.g. it was
      // terminated by a signal); report that as a failure.
      child.on('close', (code) => resolve(code ?? 1));
      child.on('error', (err) => {
        sendEvent('stderr', { data: `spawn error: ${err.message}\n` });
        resolve(1);
      });
    } catch (err) {
      // spawn() throws synchronously for invalid arguments; report it the same
      // way as an asynchronous spawn failure instead of rejecting the promise.
      const message = err instanceof Error ? err.message : String(err);
      sendEvent('stderr', { data: `spawn error: ${message}\n` });
      resolve(1);
    }
  });
}
/**
 * Checks whether the git working tree at `cwd` is clean.
 *
 * Runs `git status --porcelain` and treats the tree as clean only when the
 * command exits with 0 and prints nothing. Judging by empty output alone would
 * report "clean" whenever git itself fails, because a failing git writes its
 * message to stderr and leaves stdout empty.
 *
 * @param cwd Directory to run git in; undefined inherits the current
 *            process's directory.
 * @returns A promise that never rejects: true when the tree is clean, false
 *          when it has changes, git fails, or git cannot be started.
 */
function checkGitStatusClean(cwd: string | undefined): Promise<boolean> {
  return new Promise((resolve) => {
    try {
      const child = spawn('git', ['status', '--porcelain'], { shell: false, cwd });
      let output = '';

      child.stdout.setEncoding('utf8');
      child.stdout.on('data', (chunk: string) => {
        output += chunk;
      });
      // stderr is not needed, but it must be drained so a chatty git cannot
      // block on a full pipe.
      child.stderr.resume();

      child.on('close', (code) => resolve(code === 0 && output.trim() === ''));
      child.on('error', () => resolve(false));
    } catch {
      // spawn() throws synchronously for invalid arguments: not verifiably clean.
      resolve(false);
    }
  });
}