refactor(worker): remove MessageParser (replaced by StreamAnalyzer)

This commit is contained in:
Mika Kuns
2026-04-14 14:12:21 +02:00
parent 03728c8e4a
commit c1c4c75979
26 changed files with 3978 additions and 88 deletions

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,514 @@
# Worker CLI Modernization
**Date:** 2026-04-14
**Status:** Approved
**Scope:** ClaudeDo.Worker — CLI invocation, execution tracking, per-task configuration, multi-turn support
## Problem
The Worker currently invokes Claude CLI with hardcoded flags (`-p --output-format stream-json --verbose --dangerously-skip-permissions`). There is no way to configure model, system prompt, or agent per list or task. Execution is single-shot with no retry or follow-up capability. Results are stored as a single markdown blob on the `tasks` row with no structured metadata, token usage, or turn count.
## Goals
1. Per-list configuration (model, system prompt, agent file) with per-task overrides
2. Execution history — each CLI invocation tracked as its own `task_runs` row
3. Multi-turn support — manual continue and auto-retry via `--resume`
4. Structured output alongside markdown via `--json-schema`
5. Agent file management — filesystem-based `.md` agents with UI to browse/create/edit
6. Richer stream parsing — token usage, turn count, session ID, retry events
## Non-Goals (Deferred)
- `--bare` mode (forces API key; user relies on OAuth/keychain auth)
- `--allowedTools` / permission modes (keep `--dangerously-skip-permissions`)
- Schema migration framework (use `IF NOT EXISTS` / `INSERT OR IGNORE` for additive changes)
---
## 1. Schema Changes
### 1.1 New table: `list_config`
One-to-one with `lists`. Stores per-list defaults for CLI invocation.
```sql
CREATE TABLE IF NOT EXISTS list_config (
list_id TEXT PRIMARY KEY REFERENCES lists(id) ON DELETE CASCADE,
model TEXT NULL, -- 'opus-4-6' | 'sonnet-4-6' | 'haiku-4-5'
system_prompt TEXT NULL, -- appended via --append-system-prompt
agent_path TEXT NULL -- path to agent .md file, passed via --agents
);
```
### 1.2 New columns on `tasks`
Per-task overrides. All nullable — NULL means "use list default".
```sql
ALTER TABLE tasks ADD COLUMN model TEXT NULL;
ALTER TABLE tasks ADD COLUMN system_prompt TEXT NULL;
ALTER TABLE tasks ADD COLUMN agent_path TEXT NULL;
```
Since schema uses `IF NOT EXISTS` and is re-applied on startup, these are added via `ALTER TABLE ... ADD COLUMN` wrapped in a try/catch (SQLite raises "duplicate column" if already present — safe to ignore).
### 1.3 New table: `task_runs`
One row per CLI invocation. Supports multi-turn and retry tracking.
```sql
CREATE TABLE IF NOT EXISTS task_runs (
id TEXT PRIMARY KEY,
task_id TEXT NOT NULL REFERENCES tasks(id) ON DELETE CASCADE,
run_number INTEGER NOT NULL, -- 1, 2, 3... sequential per task
session_id TEXT NULL, -- Claude CLI session ID (for --resume)
is_retry INTEGER NOT NULL DEFAULT 0, -- 0 = normal/continue, 1 = auto-retry
prompt TEXT NOT NULL, -- the prompt sent for this run
result_markdown TEXT NULL, -- free-form result from 'result' field
structured_output TEXT NULL, -- JSON from 'structured_output' field
error_markdown TEXT NULL, -- error output on failure
exit_code INTEGER NULL, -- CLI exit code
turn_count INTEGER NULL, -- number of agent loop turns
tokens_in INTEGER NULL, -- total input tokens
tokens_out INTEGER NULL, -- total output tokens
log_path TEXT NULL, -- NDJSON log file for this run
started_at TIMESTAMP NULL,
finished_at TIMESTAMP NULL
);
CREATE INDEX IF NOT EXISTS idx_task_runs_task_id ON task_runs(task_id);
```
### 1.4 Denormalized fields on `tasks`
Keep existing `result`, `log_path`, `started_at`, `finished_at` on the `tasks` table. After each run completes, update them with the latest run's values. This preserves backward compatibility for UI queries that read `tasks` directly.
### 1.5 Model validation
Valid model values: `opus-4-6`, `sonnet-4-6`, `haiku-4-5`. Validated at the application layer (repository/service), not via SQL CHECK constraint, to allow easy future additions.
---
## 2. Agent File Management
### 2.1 Directory
Agents live in `~/.todo-app/agents/`. The directory is created on Worker startup if absent.
### 2.2 File format
Standard Claude agent markdown with YAML frontmatter:
```markdown
---
name: .NET Developer
description: Senior .NET developer focused on clean architecture
---
You are a senior .NET developer. Follow existing project patterns...
```
### 2.3 AgentFileService
New service in `ClaudeDo.Worker` (not a repository — operates on filesystem, not DB):
| Method | Description |
|--------|-------------|
| `ScanAsync()` | Returns `List<AgentInfo>` — parse frontmatter for name/description from all `*.md` in agents dir |
| `ReadAsync(string path)` | Full file content |
| `WriteAsync(string path, string content)` | Create or overwrite |
| `DeleteAsync(string path)` | Remove file |
### 2.4 AgentInfo DTO
```csharp
public sealed record AgentInfo(string Name, string Description, string Path);
```
### 2.5 Discovery
- Worker scans on startup and exposes agents via a new SignalR method `GetAgents()`.
- UI calls `GetAgents()` to populate dropdowns.
- A `RefreshAgents()` hub method triggers a re-scan (for after UI creates/edits a file).
---
## 3. CLI Invocation Changes
### 3.1 Current invocation
```
claude -p --output-format stream-json --verbose --dangerously-skip-permissions
```
Prompt written to stdin. Single-shot, no config, no structured output.
### 3.2 New invocation
Built dynamically per run by `ClaudeArgsBuilder`:
```
claude -p
--output-format stream-json
--verbose
--dangerously-skip-permissions
--model <resolved-model> # if set
--append-system-prompt <resolved-prompt> # if set
--agents '[{"file":"<resolved-agent-path>"}]' # if set
--json-schema <schema-json> # always
--resume <session-id> # only for multi-turn/retry
```
### 3.3 Config resolution
```
resolved_model = task.model ?? list_config.model ?? null (omit --model)
resolved_prompt = task.system_prompt ?? list_config.system_prompt ?? null (omit --append-system-prompt)
resolved_agent = task.agent_path ?? list_config.agent_path ?? null (omit --agents)
```
### 3.4 Structured output schema
Passed via `--json-schema` on every invocation:
```json
{
"type": "object",
"properties": {
"summary": { "type": "string" },
"files_changed": {
"type": "array",
"items": { "type": "string" }
},
"commit_type": { "type": "string" }
},
"required": ["summary"]
}
```
The CLI returns this in the `structured_output` field of the JSON result event. The markdown result remains in the `result` field.
### 3.5 ClaudeArgsBuilder
New class, single responsibility for argument construction:
```csharp
public sealed class ClaudeArgsBuilder
{
// Returns the full argument string for ProcessStartInfo.Arguments
public string Build(ClaudeRunConfig config);
}
public sealed record ClaudeRunConfig(
string? Model,
string? SystemPrompt,
string? AgentPath,
string? ResumeSessionId
);
```
Testable in isolation — no process spawning, just string building.
---
## 4. Stream Parsing
### 4.1 StreamAnalyzer (replaces MessageParser)
Processes each NDJSON line and accumulates metrics:
| Responsibility | How |
|---|---|
| Extract result markdown | Look for `type: "result"`, read `.result` field |
| Extract structured output | Same event, read `.structured_output` field |
| Extract session ID | Read `.session_id` from the result event |
| Count turns | Count events where `.type == "assistant"` |
| Accumulate tokens | Sum `.usage.input_tokens` and `.usage.output_tokens` from each turn |
| Track retries | Count `system/api_retry` events (informational logging) |
### 4.2 StreamResult
```csharp
public sealed class StreamResult
{
public string? ResultMarkdown { get; set; }
public string? StructuredOutputJson { get; set; }
public string? SessionId { get; set; }
public int TurnCount { get; set; }
public int TokensIn { get; set; }
public int TokensOut { get; set; }
public int ApiRetryCount { get; set; }
}
```
### 4.3 Extended RunResult
```csharp
public sealed class RunResult
{
public required int ExitCode { get; init; }
public string? ResultMarkdown { get; init; }
public string? ErrorMarkdown { get; init; }
public string? StructuredOutputJson { get; init; }
public string? SessionId { get; init; }
public int TurnCount { get; init; }
public int TokensIn { get; init; }
public int TokensOut { get; init; }
public bool IsSuccess => ExitCode == 0 && ResultMarkdown is not null;
}
```
---
## 5. Multi-Turn & Auto-Retry
### 5.1 Execution flow
```
Task queued
-> Run 1 (run_number=1, is_retry=0)
-> Resolve config (list defaults + task overrides)
-> Build CLI args (no --resume on first run)
-> Spawn claude, stream output, parse via StreamAnalyzer
-> Create task_runs row with all metrics
-> Update denormalized tasks fields
If failure (exit_code != 0):
-> Auto-retry: Run 2 (run_number=2, is_retry=1)
-> Prompt: "The previous attempt failed with:\n\n{error_markdown}\n\nTry again and fix the issues."
-> Uses --resume <session_id> from Run 1
-> Same worktree, same config
-> Create new task_runs row
-> If still fails: mark task Failed, stop
If success (exit_code == 0):
-> Auto-commit in worktree if changes
-> Mark task Done
User triggers "Continue" on finished/failed task:
-> New run (run_number=N+1, is_retry=0)
-> User-provided follow-up prompt
-> Uses --resume <session_id> from last run
-> Task status -> Running -> Done/Failed
```
### 5.2 Rules
- Max 1 auto-retry per task execution (no retry loops)
- Auto-retry reuses the session via `--resume` (full context of prior failure)
- Manual continue works on both Done and Failed tasks
- Each run gets its own log file: `{task_id}_run{N}.ndjson`
- Worktree commit happens only after a successful run
- If Run 1 has no session_id (edge case: CLI crashed before producing one), skip auto-retry
### 5.3 Continue via SignalR
New hub method: `ContinueTask(string taskId, string followUpPrompt)` -> returns `string runId`
Validation:
- Task must exist
- Task must not be currently running
- Previous run must have a session_id
---
## 6. TaskRunner Refactoring
### 6.1 Current flow (TaskRunner.RunAsync)
1. Load list, create worktree/sandbox, mark running
2. Build prompt from title + description
3. Call `_claude.RunAsync(prompt, dir, logPath, taskId, callback, ct)`
4. Handle result: commit on success, mark done/failed
### 6.2 New flow
```csharp
public async Task RunAsync(TaskEntity task, string slot, CancellationToken ct)
{
// 1. Load list + list_config
// 2. Resolve config (merge list_config + task overrides)
// 3. Create worktree/sandbox (unchanged)
// 4. Execute run (see RunOnceAsync below)
// 5. If failed and no prior retry: auto-retry
// 6. Final status update
}
public async Task ContinueAsync(string taskId, string followUpPrompt, string slot, CancellationToken ct)
{
// 1. Load task, last run (for session_id)
// 2. Mark task running
// 3. Execute run with --resume
// 4. Commit if success + worktree
// 5. Final status update
}
private async Task<RunResult> RunOnceAsync(
TaskEntity task, string slot, string runDir, ClaudeRunConfig config,
int runNumber, bool isRetry, string prompt, CancellationToken ct)
{
// 1. Create task_runs row (started_at = now)
// 2. Build log path: {task_id}_run{runNumber}.ndjson
// 3. Build CLI args via ClaudeArgsBuilder
// 4. Spawn ClaudeProcess
// 5. Stream lines to LogWriter + StreamAnalyzer + HubBroadcaster
// 6. Build RunResult from StreamAnalyzer
// 7. Update task_runs row (finished_at, metrics, result)
// 8. Update denormalized tasks fields
// 9. Return RunResult
}
```
### 6.3 ClaudeProcess changes
Simplified — receives pre-built args, no longer constructs its own:
```csharp
public async Task<RunResult> RunAsync(
string arguments, // pre-built by ClaudeArgsBuilder
string prompt, // written to stdin
string workingDirectory,
Func<string, Task> onStdoutLine,
CancellationToken ct)
```
The `StreamAnalyzer` instance is owned by the caller (TaskRunner), not ClaudeProcess. ClaudeProcess just feeds lines via the callback.
---
## 7. Repository Changes
### 7.1 New: TaskRunRepository
| Method | Description |
|--------|-------------|
| `AddAsync(TaskRunEntity)` | Insert new run |
| `UpdateAsync(TaskRunEntity)` | Update after completion |
| `GetByTaskIdAsync(string taskId)` | All runs for a task, ordered by run_number |
| `GetLatestByTaskIdAsync(string taskId)` | Most recent run (for session_id lookup) |
| `GetByIdAsync(string runId)` | Single run |
### 7.2 Extended: ListRepository
| Method | Description |
|--------|-------------|
| `GetConfigAsync(string listId)` | Returns `ListConfigEntity?` |
| `SetConfigAsync(ListConfigEntity)` | Upsert via INSERT OR REPLACE |
### 7.3 New models
```csharp
public sealed class TaskRunEntity
{
public required string Id { get; init; }
public required string TaskId { get; init; }
public required int RunNumber { get; init; }
public string? SessionId { get; set; }
public required bool IsRetry { get; init; }
public required string Prompt { get; init; }
public string? ResultMarkdown { get; set; }
public string? StructuredOutputJson { get; set; }
public string? ErrorMarkdown { get; set; }
public int? ExitCode { get; set; }
public int? TurnCount { get; set; }
public int? TokensIn { get; set; }
public int? TokensOut { get; set; }
public string? LogPath { get; set; }
public DateTime? StartedAt { get; set; }
public DateTime? FinishedAt { get; set; }
}
public sealed class ListConfigEntity
{
public required string ListId { get; init; }
public string? Model { get; set; }
public string? SystemPrompt { get; set; }
public string? AgentPath { get; set; }
}
```
---
## 8. SignalR Hub Changes
### 8.1 New server methods
| Method | Description |
|--------|-------------|
| `ContinueTask(string taskId, string followUpPrompt)` | Trigger follow-up run. Returns `string runId`. Throws if running or no session. |
| `GetAgents()` | Returns `List<AgentInfo>` from AgentFileService scan |
| `RefreshAgents()` | Re-scan agents directory |
### 8.2 Updated broadcasts
| Event | Change |
|-------|--------|
| `TaskStarted(slot, taskId, runId, runNumber, startedAt)` | Added `runId`, `runNumber` |
| `TaskFinished(slot, taskId, runId, status, finishedAt)` | Added `runId` |
| `TaskMessage(taskId, runId, ndjsonLine)` | Added `runId` |
| `RunCreated(taskId, runId, runNumber, isRetry)` | New — signals retry/continue started |
### 8.3 Unchanged
`Ping`, `GetActive`, `CancelTask`, `WakeQueue`, `WorktreeUpdated`, `TaskUpdated` — no changes.
---
## 9. File Structure (New/Changed)
```
src/ClaudeDo.Worker/
Runner/
ClaudeArgsBuilder.cs NEW — CLI argument construction
StreamAnalyzer.cs NEW — replaces MessageParser
StreamResult.cs NEW — accumulated stream metrics
RunResult.cs CHANGED — extended with tokens, turns, session_id
ClaudeProcess.cs CHANGED — simplified, takes pre-built args
TaskRunner.cs CHANGED — retry/continue logic, config resolution
MessageParser.cs DELETED — replaced by StreamAnalyzer
Services/
AgentFileService.cs NEW — filesystem agent management
src/ClaudeDo.Data/
Models/
TaskRunEntity.cs NEW
ListConfigEntity.cs NEW
AgentInfo.cs NEW — DTO (name, description, path)
Repositories/
TaskRunRepository.cs NEW
ListRepository.cs CHANGED — GetConfigAsync, SetConfigAsync
schema/
schema.sql CHANGED — list_config table, task_runs table, tasks columns
```
---
## 10. Testing Strategy
### 10.1 Unit tests (new)
| Test class | Covers |
|------------|--------|
| `ClaudeArgsBuilderTests` | Arg construction with all config combos, omitted flags for null values |
| `StreamAnalyzerTests` | Turn counting, token accumulation, result extraction, session_id, retry events, malformed input |
| `AgentFileServiceTests` | Scan, frontmatter parsing, read/write/delete, missing directory handling |
### 10.2 Unit tests (updated)
| Test class | Changes |
|------------|---------|
| `TaskRunnerTests` | New: auto-retry flow, continue flow, config resolution |
| `QueueServiceTests` | New: continue task routing |
### 10.3 Integration tests (new)
| Test class | Covers |
|------------|--------|
| `TaskRunRepositoryTests` | CRUD, ordering, latest-by-task queries |
| `ListRepositoryConfigTests` | GetConfig, SetConfig upsert behavior |
### 10.4 Existing tests (MessageParserTests)
Removed along with `MessageParser`. Equivalent coverage moves to `StreamAnalyzerTests`.