feat(hooks): add useFlowRun hook for SSE flow execution
This commit is contained in:
parent
394e8cdde3
commit
f99b12ad5c
1 changed files with 133 additions and 0 deletions
133
hooks/useFlowRun.ts
Normal file
133
hooks/useFlowRun.ts
Normal file
|
|
@ -0,0 +1,133 @@
|
|||
'use client'
|
||||
|
||||
import { useState, useCallback, useRef } from 'react'
|
||||
import type { TerminalLine, TerminalStatus } from '@/components/StreamingTerminal'
|
||||
|
||||
export interface FlowRunState {
|
||||
status: TerminalStatus
|
||||
flowRunId: string | null
|
||||
lines: TerminalLine[]
|
||||
exitCode: number | null
|
||||
error: string | null
|
||||
}
|
||||
|
||||
export function useFlowRun(onComplete?: (flowRunId: string, exitCode: number | null) => void) {
|
||||
const [state, setState] = useState<FlowRunState>({
|
||||
status: 'idle',
|
||||
flowRunId: null,
|
||||
lines: [],
|
||||
exitCode: null,
|
||||
error: null,
|
||||
})
|
||||
|
||||
const abortRef = useRef<AbortController | null>(null)
|
||||
|
||||
const start = useCallback(
|
||||
async (commandKey: string, args: string[] = [], stdin?: string) => {
|
||||
abortRef.current?.abort()
|
||||
const abort = new AbortController()
|
||||
abortRef.current = abort
|
||||
|
||||
setState({ status: 'running', flowRunId: null, lines: [], exitCode: null, error: null })
|
||||
|
||||
let response: Response
|
||||
try {
|
||||
response = await fetch('/api/flows/start', {
|
||||
method: 'POST',
|
||||
headers: { 'Content-Type': 'application/json' },
|
||||
body: JSON.stringify({ command_key: commandKey, args, ...(stdin != null ? { stdin } : {}) }),
|
||||
signal: abort.signal,
|
||||
})
|
||||
} catch (err) {
|
||||
if ((err as Error).name === 'AbortError') return
|
||||
setState((s) => ({
|
||||
...s,
|
||||
status: 'error',
|
||||
error: err instanceof Error ? err.message : 'request failed',
|
||||
}))
|
||||
return
|
||||
}
|
||||
|
||||
if (!response.ok) {
|
||||
const text = await response.text()
|
||||
setState((s) => ({
|
||||
...s,
|
||||
status: 'error',
|
||||
error: `${response.status}: ${text}`,
|
||||
}))
|
||||
return
|
||||
}
|
||||
|
||||
const reader = response.body!.getReader()
|
||||
const decoder = new TextDecoder()
|
||||
let buffer = ''
|
||||
let currentEvent = ''
|
||||
|
||||
try {
|
||||
while (true) {
|
||||
const { done, value } = await reader.read()
|
||||
if (done) break
|
||||
|
||||
buffer += decoder.decode(value, { stream: true })
|
||||
const rawLines = buffer.split('\n')
|
||||
buffer = rawLines.pop() ?? ''
|
||||
|
||||
for (const line of rawLines) {
|
||||
if (line.startsWith('event:')) {
|
||||
currentEvent = line.slice(6).trim()
|
||||
} else if (line.startsWith('data:')) {
|
||||
try {
|
||||
const parsed = JSON.parse(line.slice(5).trim()) as Record<string, unknown>
|
||||
if (currentEvent === 'flow_run_id') {
|
||||
setState((s) => ({ ...s, flowRunId: String(parsed.flow_run_id ?? '') }))
|
||||
} else if (currentEvent === 'stdout') {
|
||||
const text = String(parsed.data ?? '')
|
||||
setState((s) => ({
|
||||
...s,
|
||||
lines: [...s.lines, { type: 'stdout' as const, text }],
|
||||
}))
|
||||
} else if (currentEvent === 'stderr') {
|
||||
const text = String(parsed.data ?? '')
|
||||
setState((s) => ({
|
||||
...s,
|
||||
lines: [...s.lines, { type: 'stderr' as const, text }],
|
||||
}))
|
||||
} else if (currentEvent === 'done') {
|
||||
const exitCode = typeof parsed.exit_code === 'number' ? parsed.exit_code : null
|
||||
const flowRunId = String(parsed.flow_run_id ?? '')
|
||||
setState((s) => ({
|
||||
...s,
|
||||
status: exitCode === 0 ? 'done' : 'failed',
|
||||
exitCode,
|
||||
flowRunId,
|
||||
}))
|
||||
onComplete?.(flowRunId, exitCode)
|
||||
} else if (currentEvent === 'error') {
|
||||
const message = String(parsed.message ?? 'unknown error')
|
||||
setState((s) => ({ ...s, status: 'error', error: message }))
|
||||
}
|
||||
} catch {
|
||||
// ignore malformed SSE data
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (err) {
|
||||
if ((err as Error).name === 'AbortError') return
|
||||
setState((s) => ({
|
||||
...s,
|
||||
status: 'error',
|
||||
error: err instanceof Error ? err.message : 'stream error',
|
||||
}))
|
||||
}
|
||||
},
|
||||
[onComplete],
|
||||
)
|
||||
|
||||
const reset = useCallback(() => {
|
||||
abortRef.current?.abort()
|
||||
setState({ status: 'idle', flowRunId: null, lines: [], exitCode: null, error: null })
|
||||
}, [])
|
||||
|
||||
return { ...state, start, reset }
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue