refactor(worker): simplify ClaudeProcess to accept pre-built args and use StreamAnalyzer

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Mika Kuns
2026-04-14 11:45:23 +02:00
parent 8825351526
commit 1cdaaf9fd2
4 changed files with 50 additions and 34 deletions

View File

@@ -16,17 +16,16 @@ public sealed class ClaudeProcess : IClaudeProcess
} }
public async Task<RunResult> RunAsync( public async Task<RunResult> RunAsync(
string arguments,
string prompt, string prompt,
string workingDirectory, string workingDirectory,
string logPath,
string taskId,
Func<string, Task> onStdoutLine, Func<string, Task> onStdoutLine,
CancellationToken ct) CancellationToken ct)
{ {
var psi = new ProcessStartInfo var psi = new ProcessStartInfo
{ {
FileName = _cfg.ClaudeBin, FileName = _cfg.ClaudeBin,
Arguments = "-p --output-format stream-json --verbose --dangerously-skip-permissions", Arguments = arguments,
WorkingDirectory = workingDirectory, WorkingDirectory = workingDirectory,
RedirectStandardInput = true, RedirectStandardInput = true,
RedirectStandardOutput = true, RedirectStandardOutput = true,
@@ -40,30 +39,25 @@ public sealed class ClaudeProcess : IClaudeProcess
using var process = new Process { StartInfo = psi }; using var process = new Process { StartInfo = psi };
process.Start(); process.Start();
// Write prompt to stdin, then close.
await process.StandardInput.WriteAsync(prompt); await process.StandardInput.WriteAsync(prompt);
process.StandardInput.Close(); process.StandardInput.Close();
string? resultMarkdown = null; var analyzer = new StreamAnalyzer();
var lastStderr = new StringBuilder(); var lastStderr = new StringBuilder();
// Register cancellation to kill the process tree.
await using var ctr = ct.Register(() => await using var ctr = ct.Register(() =>
{ {
try { process.Kill(entireProcessTree: true); } try { process.Kill(entireProcessTree: true); }
catch { /* already exited */ } catch { /* already exited */ }
}); });
// Read stdout and stderr concurrently.
var stdoutTask = Task.Run(async () => var stdoutTask = Task.Run(async () =>
{ {
while (await process.StandardOutput.ReadLineAsync(ct) is { } line) while (await process.StandardOutput.ReadLineAsync(ct) is { } line)
{ {
if (string.IsNullOrEmpty(line)) continue; if (string.IsNullOrEmpty(line)) continue;
await onStdoutLine(line); await onStdoutLine(line);
analyzer.ProcessLine(line);
if (MessageParser.TryExtractResult(line, out var res))
resultMarkdown = res;
} }
}, ct); }, ct);
@@ -81,16 +75,34 @@ public sealed class ClaudeProcess : IClaudeProcess
await process.WaitForExitAsync(ct); await process.WaitForExitAsync(ct);
var exitCode = process.ExitCode; var exitCode = process.ExitCode;
var streamResult = analyzer.GetResult();
if (exitCode == 0 && resultMarkdown is not null) if (exitCode == 0 && streamResult.ResultMarkdown is not null)
{ {
return new RunResult { ExitCode = exitCode, ResultMarkdown = resultMarkdown }; return new RunResult
{
ExitCode = exitCode,
ResultMarkdown = streamResult.ResultMarkdown,
StructuredOutputJson = streamResult.StructuredOutputJson,
SessionId = streamResult.SessionId,
TurnCount = streamResult.TurnCount,
TokensIn = streamResult.TokensIn,
TokensOut = streamResult.TokensOut,
};
} }
var error = lastStderr.Length > 0 var error = lastStderr.Length > 0
? lastStderr.ToString().Trim() ? lastStderr.ToString().Trim()
: $"Claude exited with code {exitCode} and no result."; : $"Claude exited with code {exitCode} and no result.";
return new RunResult { ExitCode = exitCode, ErrorMarkdown = error }; return new RunResult
{
ExitCode = exitCode,
ErrorMarkdown = error,
SessionId = streamResult.SessionId,
TurnCount = streamResult.TurnCount,
TokensIn = streamResult.TokensIn,
TokensOut = streamResult.TokensOut,
};
} }
} }

View File

@@ -3,10 +3,9 @@ namespace ClaudeDo.Worker.Runner;
public interface IClaudeProcess public interface IClaudeProcess
{ {
Task<RunResult> RunAsync( Task<RunResult> RunAsync(
string arguments,
string prompt, string prompt,
string workingDirectory, string workingDirectory,
string logPath,
string taskId,
Func<string, Task> onStdoutLine, Func<string, Task> onStdoutLine,
CancellationToken ct); CancellationToken ct);
} }

View File

@@ -75,18 +75,23 @@ public sealed class TaskRunner
await _taskRepo.MarkRunningAsync(task.Id, now, ct); await _taskRepo.MarkRunningAsync(task.Id, now, ct);
await _broadcaster.TaskStarted(slot, task.Id, now); await _broadcaster.TaskStarted(slot, task.Id, now);
// Build prompt. // Build prompt and arguments.
var prompt = string.IsNullOrWhiteSpace(task.Description) var prompt = string.IsNullOrWhiteSpace(task.Description)
? task.Title ? task.Title
: $"{task.Title}\n\n{task.Description.Trim()}"; : $"{task.Title}\n\n{task.Description.Trim()}";
var arguments = new ClaudeArgsBuilder().Build(new ClaudeRunConfig(
Model: null,
SystemPrompt: null,
AgentPath: null,
ResumeSessionId: null));
await using var logWriter = new LogWriter(logPath); await using var logWriter = new LogWriter(logPath);
var result = await _claude.RunAsync( var result = await _claude.RunAsync(
arguments,
prompt, prompt,
runDir, runDir,
logPath,
task.Id,
async line => async line =>
{ {
await logWriter.WriteLineAsync(line, ct); await logWriter.WriteLineAsync(line, ct);

View File

@@ -43,7 +43,7 @@ public sealed class QueueServiceTests : IDisposable
} }
private (QueueService service, FakeClaudeProcess fakeProcess) CreateService( private (QueueService service, FakeClaudeProcess fakeProcess) CreateService(
Func<string, string, string, string, Func<string, Task>, CancellationToken, Task<RunResult>>? handler = null) Func<string, string, string, Func<string, Task>, CancellationToken, Task<RunResult>>? handler = null)
{ {
var fake = new FakeClaudeProcess(handler); var fake = new FakeClaudeProcess(handler);
var broadcaster = new HubBroadcaster(new FakeHubContext()); var broadcaster = new HubBroadcaster(new FakeHubContext());
@@ -88,7 +88,7 @@ public sealed class QueueServiceTests : IDisposable
var (listId, _) = await SeedListWithAgentTag(); var (listId, _) = await SeedListWithAgentTag();
var tcs = new TaskCompletionSource<RunResult>(); var tcs = new TaskCompletionSource<RunResult>();
var (service, _) = CreateService((_, _, _, _, _, ct) => tcs.Task); var (service, _) = CreateService((_, _, _, _, ct) => tcs.Task);
var task1 = await SeedQueuedTask(listId); var task1 = await SeedQueuedTask(listId);
var task2 = await SeedQueuedTask(listId); var task2 = await SeedQueuedTask(listId);
@@ -114,7 +114,7 @@ public sealed class QueueServiceTests : IDisposable
var (listId, _) = await SeedListWithAgentTag(); var (listId, _) = await SeedListWithAgentTag();
await SeedQueuedTask(listId, scheduledFor: DateTime.UtcNow.AddHours(1)); await SeedQueuedTask(listId, scheduledFor: DateTime.UtcNow.AddHours(1));
var (service, fake) = CreateService((_, _, _, _, _, _) => var (service, fake) = CreateService((_, _, _, _, _) =>
Task.FromResult(new RunResult { ExitCode = 0, ResultMarkdown = "ok" })); Task.FromResult(new RunResult { ExitCode = 0, ResultMarkdown = "ok" }));
using var cts = new CancellationTokenSource(); using var cts = new CancellationTokenSource();
@@ -139,17 +139,17 @@ public sealed class QueueServiceTests : IDisposable
var gate2 = new TaskCompletionSource(); var gate2 = new TaskCompletionSource();
var callCount = 0; var callCount = 0;
var (service, _) = CreateService(async (prompt, _, _, taskId, _, ct) => var (service, _) = CreateService(async (_, _, _, _, ct) =>
{ {
var n = Interlocked.Increment(ref callCount); var n = Interlocked.Increment(ref callCount);
lock (order) { order.Add(taskId); } lock (order) { order.Add(n.ToString()); }
if (n == 1) await gate1.Task; if (n == 1) await gate1.Task;
if (n == 2) gate2.SetResult(); if (n == 2) gate2.SetResult();
return new RunResult { ExitCode = 0, ResultMarkdown = "ok" }; return new RunResult { ExitCode = 0, ResultMarkdown = "ok" };
}); });
var task1 = await SeedQueuedTask(listId, createdAt: DateTime.UtcNow.AddSeconds(-2)); await SeedQueuedTask(listId, createdAt: DateTime.UtcNow.AddSeconds(-2));
var task2 = await SeedQueuedTask(listId, createdAt: DateTime.UtcNow.AddSeconds(-1)); await SeedQueuedTask(listId, createdAt: DateTime.UtcNow.AddSeconds(-1));
using var cts = new CancellationTokenSource(); using var cts = new CancellationTokenSource();
await service.StartAsync(cts.Token); await service.StartAsync(cts.Token);
@@ -162,7 +162,7 @@ public sealed class QueueServiceTests : IDisposable
// Only task1 should be running (task2 waiting on the queue slot). // Only task1 should be running (task2 waiting on the queue slot).
Assert.Single(order); Assert.Single(order);
Assert.Equal(task1.Id, order[0]); Assert.Equal("1", order[0]);
// Release first task. // Release first task.
gate1.SetResult(); gate1.SetResult();
@@ -171,7 +171,7 @@ public sealed class QueueServiceTests : IDisposable
await gate2.Task.WaitAsync(TimeSpan.FromSeconds(5)); await gate2.Task.WaitAsync(TimeSpan.FromSeconds(5));
Assert.Equal(2, order.Count); Assert.Equal(2, order.Count);
Assert.Equal(task2.Id, order[1]); Assert.Equal("2", order[1]);
cts.Cancel(); cts.Cancel();
} }
@@ -184,7 +184,7 @@ public sealed class QueueServiceTests : IDisposable
var running = new TaskCompletionSource(); var running = new TaskCompletionSource();
var cancelled = false; var cancelled = false;
var (service, _) = CreateService(async (_, _, _, _, _, ct) => var (service, _) = CreateService(async (_, _, _, _, ct) =>
{ {
running.SetResult(); running.SetResult();
try try
@@ -217,7 +217,7 @@ public sealed class QueueServiceTests : IDisposable
var (listId, _) = await SeedListWithAgentTag(); var (listId, _) = await SeedListWithAgentTag();
var tcs = new TaskCompletionSource<RunResult>(); var tcs = new TaskCompletionSource<RunResult>();
var (service, _) = CreateService((_, _, _, _, _, _) => tcs.Task); var (service, _) = CreateService((_, _, _, _, _) => tcs.Task);
var task = await SeedQueuedTask(listId); var task = await SeedQueuedTask(listId);
await service.RunNow(task.Id); await service.RunNow(task.Id);
@@ -235,23 +235,23 @@ public sealed class QueueServiceTests : IDisposable
internal sealed class FakeClaudeProcess : IClaudeProcess internal sealed class FakeClaudeProcess : IClaudeProcess
{ {
private readonly Func<string, string, string, string, Func<string, Task>, CancellationToken, Task<RunResult>> _handler; private readonly Func<string, string, string, Func<string, Task>, CancellationToken, Task<RunResult>> _handler;
private int _callCount; private int _callCount;
public int CallCount => _callCount; public int CallCount => _callCount;
public FakeClaudeProcess( public FakeClaudeProcess(
Func<string, string, string, string, Func<string, Task>, CancellationToken, Task<RunResult>>? handler = null) Func<string, string, string, Func<string, Task>, CancellationToken, Task<RunResult>>? handler = null)
{ {
_handler = handler ?? ((_, _, _, _, _, _) => _handler = handler ?? ((_, _, _, _, _) =>
Task.FromResult(new RunResult { ExitCode = 0, ResultMarkdown = "ok" })); Task.FromResult(new RunResult { ExitCode = 0, ResultMarkdown = "ok" }));
} }
public async Task<RunResult> RunAsync(string prompt, string workingDirectory, string logPath, string taskId, public async Task<RunResult> RunAsync(string arguments, string prompt, string workingDirectory,
Func<string, Task> onStdoutLine, CancellationToken ct) Func<string, Task> onStdoutLine, CancellationToken ct)
{ {
Interlocked.Increment(ref _callCount); Interlocked.Increment(ref _callCount);
return await _handler(prompt, workingDirectory, logPath, taskId, onStdoutLine, ct); return await _handler(prompt, workingDirectory, arguments, onStdoutLine, ct);
} }
} }