From f99b12ad5cc4a28c7bd8f47a3b0dd47a7e77a712 Mon Sep 17 00:00:00 2001 From: Scrum4Me Agent <30029041+madhura68@users.noreply.github.com> Date: Wed, 13 May 2026 18:00:26 +0200 Subject: [PATCH] feat(hooks): add useFlowRun hook for SSE flow execution --- hooks/useFlowRun.ts | 133 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 133 insertions(+) create mode 100644 hooks/useFlowRun.ts diff --git a/hooks/useFlowRun.ts b/hooks/useFlowRun.ts new file mode 100644 index 0000000..5fd6bd4 --- /dev/null +++ b/hooks/useFlowRun.ts @@ -0,0 +1,133 @@ +'use client' + +import { useState, useCallback, useRef } from 'react' +import type { TerminalLine, TerminalStatus } from '@/components/StreamingTerminal' + +export interface FlowRunState { + status: TerminalStatus + flowRunId: string | null + lines: TerminalLine[] + exitCode: number | null + error: string | null +} + +export function useFlowRun(onComplete?: (flowRunId: string, exitCode: number | null) => void) { + const [state, setState] = useState({ + status: 'idle', + flowRunId: null, + lines: [], + exitCode: null, + error: null, + }) + + const abortRef = useRef(null) + + const start = useCallback( + async (commandKey: string, args: string[] = [], stdin?: string) => { + abortRef.current?.abort() + const abort = new AbortController() + abortRef.current = abort + + setState({ status: 'running', flowRunId: null, lines: [], exitCode: null, error: null }) + + let response: Response + try { + response = await fetch('/api/flows/start', { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ command_key: commandKey, args, ...(stdin != null ? { stdin } : {}) }), + signal: abort.signal, + }) + } catch (err) { + if ((err as Error).name === 'AbortError') return + setState((s) => ({ + ...s, + status: 'error', + error: err instanceof Error ? err.message : 'request failed', + })) + return + } + + if (!response.ok) { + const text = await response.text() + setState((s) => ({ + ...s, + status: 'error', + error: `${response.status}: ${text}`, + })) + return + } + + const reader = response.body!.getReader() + const decoder = new TextDecoder() + let buffer = '' + let currentEvent = '' + + try { + while (true) { + const { done, value } = await reader.read() + if (done) break + + buffer += decoder.decode(value, { stream: true }) + const rawLines = buffer.split('\n') + buffer = rawLines.pop() ?? '' + + for (const line of rawLines) { + if (line.startsWith('event:')) { + currentEvent = line.slice(6).trim() + } else if (line.startsWith('data:')) { + try { + const parsed = JSON.parse(line.slice(5).trim()) as Record + if (currentEvent === 'flow_run_id') { + setState((s) => ({ ...s, flowRunId: String(parsed.flow_run_id ?? '') })) + } else if (currentEvent === 'stdout') { + const text = String(parsed.data ?? '') + setState((s) => ({ + ...s, + lines: [...s.lines, { type: 'stdout' as const, text }], + })) + } else if (currentEvent === 'stderr') { + const text = String(parsed.data ?? '') + setState((s) => ({ + ...s, + lines: [...s.lines, { type: 'stderr' as const, text }], + })) + } else if (currentEvent === 'done') { + const exitCode = typeof parsed.exit_code === 'number' ? parsed.exit_code : null + const flowRunId = String(parsed.flow_run_id ?? '') + setState((s) => ({ + ...s, + status: exitCode === 0 ? 'done' : 'failed', + exitCode, + flowRunId, + })) + onComplete?.(flowRunId, exitCode) + } else if (currentEvent === 'error') { + const message = String(parsed.message ?? 'unknown error') + setState((s) => ({ ...s, status: 'error', error: message })) + } + } catch { + // ignore malformed SSE data + } + } + } + } + } catch (err) { + if ((err as Error).name === 'AbortError') return + setState((s) => ({ + ...s, + status: 'error', + error: err instanceof Error ? err.message : 'stream error', + })) + } + }, + [onComplete], + ) + + const reset = useCallback(() => { + abortRef.current?.abort() + setState({ status: 'idle', flowRunId: null, lines: [], exitCode: null, error: null }) + }, []) + + return { ...state, start, reset } +}