diff --git a/.claude/settings.json b/.claude/settings.json new file mode 100644 index 0000000..c27299b --- /dev/null +++ b/.claude/settings.json @@ -0,0 +1,15 @@ +{ + "hooks": { + "PostToolUse": [ + { + "matcher": "mcp__scrum4me__update_job_status", + "hooks": [ + { + "type": "command", + "command": "tsx \"${SCRUM4ME_MCP_DIR:-$CLAUDE_PROJECT_DIR}/scripts/persist-job-usage.ts\"" + } + ] + } + ] + } +} diff --git a/CLAUDE.md b/CLAUDE.md index 610eb21..7d53eb6 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -46,6 +46,23 @@ Or add to `~/.scrum4me-agent-config.json`: If no repo root is found, `wait_for_job` rolls the claim back to QUEUED and returns an error. +## Token-usage capture (PostToolUse hook) + +`update_job_status` accepts optional fields `model_id`, `input_tokens`, `output_tokens`, `cache_read_tokens`, `cache_write_tokens`. The agent never has to pass them — `scripts/persist-job-usage.ts` runs as a PostToolUse hook, reads the local Claude Code transcript JSONL (no Anthropic API needed), sums per-job usage, and writes directly to `claude_jobs` via Prisma. Window detection: from the most-recent `wait_for_job` tool_use to EOF. + +The hook is registered in `.claude/settings.json` of this repo. **For agent-worker mode** (Claude Code running with cwd inside a product worktree, not scrum4me-mcp), copy the same hook block into your user settings (`~/.claude/settings.json`) and set `SCRUM4ME_MCP_DIR` so the script resolves regardless of cwd: + +```bash +export SCRUM4ME_MCP_DIR=/absolute/path/to/scrum4me-mcp +``` + +Pricing rows (`model_prices`) are seeded by Scrum4Me's `prisma/seed.ts`. Unknown `model_id`s leave `cost_usd = NULL` in Insights queries — add a row and re-run `npm run seed` to fill them in. + +Robustness notes: +- Subagent (`isSidechain: true`) lines in the main JSONL are skipped to avoid double-counting against `subagents/`-subdirectory transcripts. +- Lines are deduplicated on `uuid` because branching/resumption can rewrite the same message into multiple JSONLs. +- Known Claude Code bug: auto-updates can silently delete files under `~/.claude/projects/`. If you depend on these numbers for billing/reporting, persist `claude_jobs.input_tokens` etc. immediately on `update_job_status` (already what this hook does) and consider an external backup of `~/.claude/projects/` if you want to retain historical detail. + ## Manual worktree cleanup Run `cleanup_my_worktrees` (no arguments) to scan `~/.scrum4me-agent-worktrees/` and remove worktrees for jobs that are in a terminal state (DONE, FAILED, CANCELLED). Worktrees for active jobs (QUEUED, CLAIMED, RUNNING) are left untouched. Returns `{ removed, kept, skipped }`. diff --git a/__tests__/scripts/persist-job-usage.test.ts b/__tests__/scripts/persist-job-usage.test.ts new file mode 100644 index 0000000..7611de1 --- /dev/null +++ b/__tests__/scripts/persist-job-usage.test.ts @@ -0,0 +1,287 @@ +import { describe, it, expect, vi, beforeEach } from 'vitest' +import { mkdtempSync, writeFileSync, rmSync } from 'node:fs' +import { tmpdir } from 'node:os' +import { join } from 'node:path' + +vi.mock('../../src/prisma.js', () => ({ + prisma: { + claudeJob: { update: vi.fn() }, + }, +})) + +import { prisma } from '../../src/prisma.js' +import { + parseTranscript, + computeUsageFromTranscript, + normalizeModelId, + persistJobUsage, +} from '../../scripts/persist-job-usage.js' + +const mockUpdate = (prisma as unknown as { claudeJob: { update: ReturnType } }) + .claudeJob.update + +beforeEach(() => { + mockUpdate.mockReset() +}) + +function assistantLine(opts: { + model?: string + usage?: { + input_tokens?: number + output_tokens?: number + cache_creation_input_tokens?: number + cache_read_input_tokens?: number + } + toolUseName?: string + isSidechain?: boolean + uuid?: string +}) { + const content: Array<{ type: string; name?: string }> = [] + if (opts.toolUseName) content.push({ type: 'tool_use', name: opts.toolUseName }) + return JSON.stringify({ + type: 'assistant', + uuid: opts.uuid, + isSidechain: opts.isSidechain ?? false, + message: { + role: 'assistant', + model: opts.model ?? 'claude-sonnet-4-6', + content, + usage: opts.usage, + }, + }) +} + +describe('normalizeModelId', () => { + it('strips bracket suffix', () => { + expect(normalizeModelId('claude-opus-4-7[1m]')).toBe('claude-opus-4-7-1m') + }) + + it('passes through plain ids', () => { + expect(normalizeModelId('claude-sonnet-4-6')).toBe('claude-sonnet-4-6') + }) +}) + +describe('parseTranscript', () => { + it('skips malformed lines', () => { + const raw = `${assistantLine({})}\nnot-json\n${assistantLine({})}\n` + expect(parseTranscript(raw)).toHaveLength(2) + }) + + it('handles trailing newline + empty lines', () => { + expect(parseTranscript('\n\n')).toEqual([]) + }) + + it('dedups on uuid (branching/resumption)', () => { + const a = assistantLine({ uuid: 'u1', usage: { input_tokens: 5, output_tokens: 5 } }) + const b = assistantLine({ uuid: 'u1', usage: { input_tokens: 99, output_tokens: 99 } }) + const c = assistantLine({ uuid: 'u2', usage: { input_tokens: 1, output_tokens: 1 } }) + const lines = parseTranscript([a, b, c].join('\n')) + expect(lines).toHaveLength(2) + expect(lines[0].uuid).toBe('u1') + expect(lines[1].uuid).toBe('u2') + }) +}) + +describe('computeUsageFromTranscript', () => { + it('sums assistant usage after wait_for_job marker', () => { + const lines = parseTranscript( + [ + assistantLine({ + toolUseName: 'mcp__scrum4me__wait_for_job', + usage: { input_tokens: 999, output_tokens: 999 }, + }), + assistantLine({ + usage: { + input_tokens: 10, + output_tokens: 20, + cache_creation_input_tokens: 30, + cache_read_input_tokens: 40, + }, + }), + assistantLine({ + usage: { + input_tokens: 1, + output_tokens: 2, + cache_creation_input_tokens: 3, + cache_read_input_tokens: 4, + }, + }), + assistantLine({ toolUseName: 'mcp__scrum4me__update_job_status' }), + ].join('\n'), + ) + + const usage = computeUsageFromTranscript(lines) + expect(usage.input_tokens).toBe(11) + expect(usage.output_tokens).toBe(22) + expect(usage.cache_write_tokens).toBe(33) + expect(usage.cache_read_tokens).toBe(44) + expect(usage.model_id).toBe('claude-sonnet-4-6') + }) + + it('sums whole session when no wait_for_job marker', () => { + const lines = parseTranscript( + [ + assistantLine({ usage: { input_tokens: 5, output_tokens: 6 } }), + assistantLine({ usage: { input_tokens: 7, output_tokens: 8 } }), + ].join('\n'), + ) + + const usage = computeUsageFromTranscript(lines) + expect(usage.input_tokens).toBe(12) + expect(usage.output_tokens).toBe(14) + }) + + it('ignores non-assistant lines', () => { + const userLine = JSON.stringify({ + type: 'user', + message: { role: 'user', content: [] }, + }) + const lines = parseTranscript( + [ + assistantLine({ toolUseName: 'mcp__scrum4me__wait_for_job' }), + userLine, + assistantLine({ usage: { input_tokens: 100, output_tokens: 200 } }), + ].join('\n'), + ) + + const usage = computeUsageFromTranscript(lines) + expect(usage.input_tokens).toBe(100) + expect(usage.output_tokens).toBe(200) + }) + + it('returns last model_id and normalizes [1m]-suffix', () => { + const lines = parseTranscript( + [ + assistantLine({ model: 'claude-sonnet-4-6', usage: { input_tokens: 1, output_tokens: 1 } }), + assistantLine({ model: 'claude-opus-4-7[1m]', usage: { input_tokens: 1, output_tokens: 1 } }), + ].join('\n'), + ) + const usage = computeUsageFromTranscript(lines) + expect(usage.model_id).toBe('claude-opus-4-7-1m') + }) + + it('returns null model_id when transcript is empty', () => { + expect(computeUsageFromTranscript([]).model_id).toBe(null) + }) + + it('skips sidechain (subagent) lines to avoid double-counting', () => { + const lines = parseTranscript( + [ + assistantLine({ + toolUseName: 'mcp__scrum4me__wait_for_job', + uuid: 'main-1', + }), + assistantLine({ + isSidechain: true, + uuid: 'sub-1', + usage: { input_tokens: 9999, output_tokens: 9999 }, + }), + assistantLine({ + uuid: 'main-2', + usage: { input_tokens: 50, output_tokens: 60 }, + }), + ].join('\n'), + ) + + const usage = computeUsageFromTranscript(lines) + expect(usage.input_tokens).toBe(50) + expect(usage.output_tokens).toBe(60) + }) +}) + +describe('persistJobUsage', () => { + let tmpDir: string + let transcriptPath: string + + beforeEach(() => { + tmpDir = mkdtempSync(join(tmpdir(), 'persist-job-usage-test-')) + transcriptPath = join(tmpDir, 'session.jsonl') + }) + + function cleanup() { + rmSync(tmpDir, { recursive: true, force: true }) + } + + it('skips when tool_name is not update_job_status', async () => { + const result = await persistJobUsage({ + tool_name: 'mcp__scrum4me__create_task', + tool_input: { job_id: 'j1', status: 'done' }, + transcript_path: transcriptPath, + }) + expect(result).toBe('skipped') + expect(mockUpdate).not.toHaveBeenCalled() + cleanup() + }) + + it('skips on status=running', async () => { + const result = await persistJobUsage({ + tool_name: 'mcp__scrum4me__update_job_status', + tool_input: { job_id: 'j1', status: 'running' }, + transcript_path: transcriptPath, + }) + expect(result).toBe('skipped') + expect(mockUpdate).not.toHaveBeenCalled() + cleanup() + }) + + it('skips when transcript missing', async () => { + const result = await persistJobUsage({ + tool_name: 'mcp__scrum4me__update_job_status', + tool_input: { job_id: 'j1', status: 'done' }, + transcript_path: '/no/such/file.jsonl', + }) + expect(result).toBe('skipped') + expect(mockUpdate).not.toHaveBeenCalled() + }) + + it('writes computed usage on success', async () => { + writeFileSync( + transcriptPath, + [ + assistantLine({ toolUseName: 'mcp__scrum4me__wait_for_job' }), + assistantLine({ + model: 'claude-sonnet-4-6', + usage: { + input_tokens: 10, + output_tokens: 20, + cache_creation_input_tokens: 30, + cache_read_input_tokens: 40, + }, + }), + assistantLine({ toolUseName: 'mcp__scrum4me__update_job_status' }), + ].join('\n'), + ) + + mockUpdate.mockResolvedValue({}) + const result = await persistJobUsage({ + tool_name: 'mcp__scrum4me__update_job_status', + tool_input: { job_id: 'job-123', status: 'done' }, + transcript_path: transcriptPath, + }) + + expect(result).toBe('written') + expect(mockUpdate).toHaveBeenCalledWith({ + where: { id: 'job-123' }, + data: { + model_id: 'claude-sonnet-4-6', + input_tokens: 10, + output_tokens: 20, + cache_read_tokens: 40, + cache_write_tokens: 30, + }, + }) + cleanup() + }) + + it('returns noop when transcript has no usage', async () => { + writeFileSync(transcriptPath, '') + const result = await persistJobUsage({ + tool_name: 'mcp__scrum4me__update_job_status', + tool_input: { job_id: 'job-123', status: 'failed' }, + transcript_path: transcriptPath, + }) + expect(result).toBe('noop') + expect(mockUpdate).not.toHaveBeenCalled() + cleanup() + }) +}) diff --git a/scripts/persist-job-usage.ts b/scripts/persist-job-usage.ts new file mode 100644 index 0000000..152ccb5 --- /dev/null +++ b/scripts/persist-job-usage.ts @@ -0,0 +1,229 @@ +// PostToolUse hook for mcp__scrum4me__update_job_status. +// +// Reads the local Claude Code transcript (no Anthropic API needed) and writes +// per-job token usage + model_id to claude_jobs. The hook receives a JSON +// payload on stdin with { session_id, transcript_path, tool_name, tool_input }. +// +// Window detection: the most-recent assistant message before EOF that issued a +// `mcp__scrum4me__wait_for_job` tool_use marks the job's start. All assistant +// messages after that index, up to and including the one that just called +// update_job_status, are summed. +// +// Idempotent — running twice for the same job overwrites with the same values. +// Designed to never block the agent: any failure logs a warning and exits 0. + +import { readFile } from 'node:fs/promises' +import { prisma } from '../src/prisma.js' + +export type HookInput = { + session_id?: string + transcript_path?: string + tool_name?: string + tool_input?: { job_id?: string; status?: string } +} + +type Usage = { + input_tokens?: number + output_tokens?: number + cache_creation_input_tokens?: number + cache_read_input_tokens?: number +} + +type ContentBlock = { type?: string; name?: string } + +type TranscriptLine = { + type?: string + uuid?: string + isSidechain?: boolean + message?: { + role?: string + model?: string + content?: ContentBlock[] + usage?: Usage + } +} + +export type ComputedUsage = { + model_id: string | null + input_tokens: number + output_tokens: number + cache_read_tokens: number + cache_write_tokens: number +} + +const WAIT_TOOL_NAME = 'mcp__scrum4me__wait_for_job' +const UPDATE_TOOL_NAME = 'mcp__scrum4me__update_job_status' + +export function parseTranscript(raw: string): TranscriptLine[] { + const lines = raw.split('\n') + const out: TranscriptLine[] = [] + const seenUuids = new Set() + for (const line of lines) { + if (!line) continue + let parsed: TranscriptLine + try { + parsed = JSON.parse(line) as TranscriptLine + } catch { + continue // skip malformed lines — transcript may be partially written + } + // Dedup on uuid: branching/resumption can re-write the same message into + // multiple JSONLs. Keep first occurrence. + if (parsed.uuid) { + if (seenUuids.has(parsed.uuid)) continue + seenUuids.add(parsed.uuid) + } + out.push(parsed) + } + return out +} + +function hasToolUse(line: TranscriptLine, toolName: string): boolean { + const content = line.message?.content + if (!Array.isArray(content)) return false + return content.some((c) => c.type === 'tool_use' && c.name === toolName) +} + +export function computeUsageFromTranscript(lines: TranscriptLine[]): ComputedUsage { + // Skip subagent (sidechain) lines: token usage attributed to subagent work + // is reported in the main transcript via assistant messages of the parent + // agent. Counting sidechain lines as well risks double-attribution because + // those same units of work also appear in `subagents/`-subdirectory files. + const main = lines.filter((l) => !l.isSidechain) + + // Find the last main-agent assistant message that called wait_for_job. + let startIdx = -1 + for (let i = main.length - 1; i >= 0; i--) { + if (hasToolUse(main[i], WAIT_TOOL_NAME)) { + startIdx = i + break + } + } + + // Window = (startIdx, end]. If no wait_for_job found, sum the whole session. + const from = startIdx + 1 + const window = main.slice(from) + + let input = 0 + let output = 0 + let cacheRead = 0 + let cacheWrite = 0 + let model: string | null = null + const modelsSeen = new Set() + + for (const line of window) { + if (line.type !== 'assistant') continue + const msg = line.message + if (!msg || msg.role !== 'assistant') continue + const u = msg.usage + if (u) { + input += u.input_tokens ?? 0 + output += u.output_tokens ?? 0 + cacheRead += u.cache_read_input_tokens ?? 0 + cacheWrite += u.cache_creation_input_tokens ?? 0 + } + if (msg.model) { + modelsSeen.add(msg.model) + model = msg.model // keep last + } + } + + if (modelsSeen.size > 1) { + console.warn( + `[persist-job-usage] multiple models in window: ${[...modelsSeen].join(', ')} — using last (${model})`, + ) + } + + return { + model_id: model ? normalizeModelId(model) : null, + input_tokens: input, + output_tokens: output, + cache_read_tokens: cacheRead, + cache_write_tokens: cacheWrite, + } +} + +// Strip wrapping brackets so [1m]-suffix maps cleanly to a model_prices row. +// Example: 'claude-opus-4-7[1m]' → 'claude-opus-4-7-1m'. +export function normalizeModelId(raw: string): string { + return raw.replace(/\[(.*?)\]/g, '-$1') +} + +export async function readHookInput(): Promise { + const chunks: Buffer[] = [] + for await (const chunk of process.stdin) { + chunks.push(chunk as Buffer) + } + const raw = Buffer.concat(chunks).toString('utf8').trim() + if (!raw) return {} + try { + return JSON.parse(raw) as HookInput + } catch { + return {} + } +} + +export async function persistJobUsage(input: HookInput): Promise<'skipped' | 'written' | 'noop'> { + if (input.tool_name !== UPDATE_TOOL_NAME) return 'skipped' + const status = input.tool_input?.status + if (status !== 'done' && status !== 'failed') return 'skipped' + const jobId = input.tool_input?.job_id + if (!jobId) return 'skipped' + const transcriptPath = input.transcript_path + if (!transcriptPath) return 'skipped' + + let raw: string + try { + raw = await readFile(transcriptPath, 'utf8') + } catch (err) { + console.warn(`[persist-job-usage] cannot read transcript ${transcriptPath}:`, err) + return 'skipped' + } + + const lines = parseTranscript(raw) + const usage = computeUsageFromTranscript(lines) + + // Skip pure no-op: no usage data and no model — nothing meaningful to persist. + if ( + usage.model_id === null && + usage.input_tokens === 0 && + usage.output_tokens === 0 && + usage.cache_read_tokens === 0 && + usage.cache_write_tokens === 0 + ) { + return 'noop' + } + + await prisma.claudeJob.update({ + where: { id: jobId }, + data: { + ...(usage.model_id !== null ? { model_id: usage.model_id } : {}), + input_tokens: usage.input_tokens, + output_tokens: usage.output_tokens, + cache_read_tokens: usage.cache_read_tokens, + cache_write_tokens: usage.cache_write_tokens, + }, + }) + return 'written' +} + +async function main(): Promise { + try { + const input = await readHookInput() + const result = await persistJobUsage(input) + if (result === 'written') { + console.log(`[persist-job-usage] persisted usage for job=${input.tool_input?.job_id}`) + } + } catch (err) { + console.warn('[persist-job-usage] error:', err) + } finally { + // Ensure clean exit even if Prisma keeps a connection pool alive. + process.exit(0) + } +} + +const isDirect = + import.meta.url === `file://${process.argv[1]}` || + process.argv[1]?.endsWith('persist-job-usage.ts') +if (isDirect) { + void main() +} diff --git a/src/tools/update-job-status.ts b/src/tools/update-job-status.ts index b979df6..6f21c4a 100644 --- a/src/tools/update-job-status.ts +++ b/src/tools/update-job-status.ts @@ -21,6 +21,11 @@ const inputSchema = z.object({ branch: z.string().min(1).optional(), summary: z.string().max(1_000).optional(), error: z.string().max(2_000).optional(), + model_id: z.string().min(1).max(200).optional(), + input_tokens: z.number().int().nonnegative().optional(), + output_tokens: z.number().int().nonnegative().optional(), + cache_read_tokens: z.number().int().nonnegative().optional(), + cache_write_tokens: z.number().int().nonnegative().optional(), }) export async function cleanupWorktreeForTerminalStatus( @@ -266,10 +271,24 @@ export function registerUpdateJobStatusTool(server: McpServer) { 'PARTIAL/DIVERGENT but requires a non-empty summary (≥20 chars) explaining the drift; ANY ' + 'accepts everything. ' + 'Automatically emits an SSE event so the Scrum4Me UI updates in real time. ' + + 'Optionally accepts token-usage fields (model_id + input/output/cache_read/cache_write tokens) ' + + 'for cost tracking — typically populated by a PostToolUse hook from the local Claude Code transcript, ' + + 'not by the agent itself. ' + 'Response includes next_action: when wait_for_job_again, immediately call wait_for_job again. When queue_empty, the agent batch is done.', inputSchema, }, - async ({ job_id, status, branch, summary, error }) => + async ({ + job_id, + status, + branch, + summary, + error, + model_id, + input_tokens, + output_tokens, + cache_read_tokens, + cache_write_tokens, + }) => withToolErrors(async () => { const auth = await requireWriteAccess() const { tokenId, userId } = auth @@ -371,6 +390,11 @@ export function registerUpdateJobStatusTool(server: McpServer) { ...(summary !== undefined ? { summary } : {}), ...(errorToWrite !== undefined ? { error: errorToWrite } : {}), ...(prUrl !== null ? { pr_url: prUrl } : {}), + ...(model_id !== undefined ? { model_id } : {}), + ...(input_tokens !== undefined ? { input_tokens } : {}), + ...(output_tokens !== undefined ? { output_tokens } : {}), + ...(cache_read_tokens !== undefined ? { cache_read_tokens } : {}), + ...(cache_write_tokens !== undefined ? { cache_write_tokens } : {}), }, select: { id: true,