diff --git a/src/ClaudeDo.Worker/Runner/ClaudeProcess.cs b/src/ClaudeDo.Worker/Runner/ClaudeProcess.cs index 1a4d0a5..1cbe132 100644 --- a/src/ClaudeDo.Worker/Runner/ClaudeProcess.cs +++ b/src/ClaudeDo.Worker/Runner/ClaudeProcess.cs @@ -16,17 +16,16 @@ public sealed class ClaudeProcess : IClaudeProcess } public async Task RunAsync( + string arguments, string prompt, string workingDirectory, - string logPath, - string taskId, Func onStdoutLine, CancellationToken ct) { var psi = new ProcessStartInfo { FileName = _cfg.ClaudeBin, - Arguments = "-p --output-format stream-json --verbose --dangerously-skip-permissions", + Arguments = arguments, WorkingDirectory = workingDirectory, RedirectStandardInput = true, RedirectStandardOutput = true, @@ -40,30 +39,25 @@ public sealed class ClaudeProcess : IClaudeProcess using var process = new Process { StartInfo = psi }; process.Start(); - // Write prompt to stdin, then close. await process.StandardInput.WriteAsync(prompt); process.StandardInput.Close(); - string? resultMarkdown = null; + var analyzer = new StreamAnalyzer(); var lastStderr = new StringBuilder(); - // Register cancellation to kill the process tree. await using var ctr = ct.Register(() => { try { process.Kill(entireProcessTree: true); } catch { /* already exited */ } }); - // Read stdout and stderr concurrently. var stdoutTask = Task.Run(async () => { while (await process.StandardOutput.ReadLineAsync(ct) is { } line) { if (string.IsNullOrEmpty(line)) continue; await onStdoutLine(line); - - if (MessageParser.TryExtractResult(line, out var res)) - resultMarkdown = res; + analyzer.ProcessLine(line); } }, ct); @@ -81,16 +75,34 @@ public sealed class ClaudeProcess : IClaudeProcess await process.WaitForExitAsync(ct); var exitCode = process.ExitCode; + var streamResult = analyzer.GetResult(); - if (exitCode == 0 && resultMarkdown is not null) + if (exitCode == 0 && streamResult.ResultMarkdown is not null) { - return new RunResult { ExitCode = exitCode, ResultMarkdown = resultMarkdown }; + return new RunResult + { + ExitCode = exitCode, + ResultMarkdown = streamResult.ResultMarkdown, + StructuredOutputJson = streamResult.StructuredOutputJson, + SessionId = streamResult.SessionId, + TurnCount = streamResult.TurnCount, + TokensIn = streamResult.TokensIn, + TokensOut = streamResult.TokensOut, + }; } var error = lastStderr.Length > 0 ? lastStderr.ToString().Trim() : $"Claude exited with code {exitCode} and no result."; - return new RunResult { ExitCode = exitCode, ErrorMarkdown = error }; + return new RunResult + { + ExitCode = exitCode, + ErrorMarkdown = error, + SessionId = streamResult.SessionId, + TurnCount = streamResult.TurnCount, + TokensIn = streamResult.TokensIn, + TokensOut = streamResult.TokensOut, + }; } } diff --git a/src/ClaudeDo.Worker/Runner/IClaudeProcess.cs b/src/ClaudeDo.Worker/Runner/IClaudeProcess.cs index e3fb057..ebd18b8 100644 --- a/src/ClaudeDo.Worker/Runner/IClaudeProcess.cs +++ b/src/ClaudeDo.Worker/Runner/IClaudeProcess.cs @@ -3,10 +3,9 @@ namespace ClaudeDo.Worker.Runner; public interface IClaudeProcess { Task RunAsync( + string arguments, string prompt, string workingDirectory, - string logPath, - string taskId, Func onStdoutLine, CancellationToken ct); } diff --git a/src/ClaudeDo.Worker/Runner/TaskRunner.cs b/src/ClaudeDo.Worker/Runner/TaskRunner.cs index 1b9f517..cab7436 100644 --- a/src/ClaudeDo.Worker/Runner/TaskRunner.cs +++ b/src/ClaudeDo.Worker/Runner/TaskRunner.cs @@ -75,18 +75,23 @@ public sealed class TaskRunner await _taskRepo.MarkRunningAsync(task.Id, now, ct); await _broadcaster.TaskStarted(slot, task.Id, now); - // Build prompt. + // Build prompt and arguments. var prompt = string.IsNullOrWhiteSpace(task.Description) ? task.Title : $"{task.Title}\n\n{task.Description.Trim()}"; + var arguments = new ClaudeArgsBuilder().Build(new ClaudeRunConfig( + Model: null, + SystemPrompt: null, + AgentPath: null, + ResumeSessionId: null)); + await using var logWriter = new LogWriter(logPath); var result = await _claude.RunAsync( + arguments, prompt, runDir, - logPath, - task.Id, async line => { await logWriter.WriteLineAsync(line, ct); diff --git a/tests/ClaudeDo.Worker.Tests/Services/QueueServiceTests.cs b/tests/ClaudeDo.Worker.Tests/Services/QueueServiceTests.cs index 9a8ecb9..44ed765 100644 --- a/tests/ClaudeDo.Worker.Tests/Services/QueueServiceTests.cs +++ b/tests/ClaudeDo.Worker.Tests/Services/QueueServiceTests.cs @@ -43,7 +43,7 @@ public sealed class QueueServiceTests : IDisposable } private (QueueService service, FakeClaudeProcess fakeProcess) CreateService( - Func, CancellationToken, Task>? handler = null) + Func, CancellationToken, Task>? handler = null) { var fake = new FakeClaudeProcess(handler); var broadcaster = new HubBroadcaster(new FakeHubContext()); @@ -88,7 +88,7 @@ public sealed class QueueServiceTests : IDisposable var (listId, _) = await SeedListWithAgentTag(); var tcs = new TaskCompletionSource(); - var (service, _) = CreateService((_, _, _, _, _, ct) => tcs.Task); + var (service, _) = CreateService((_, _, _, _, ct) => tcs.Task); var task1 = await SeedQueuedTask(listId); var task2 = await SeedQueuedTask(listId); @@ -114,7 +114,7 @@ public sealed class QueueServiceTests : IDisposable var (listId, _) = await SeedListWithAgentTag(); await SeedQueuedTask(listId, scheduledFor: DateTime.UtcNow.AddHours(1)); - var (service, fake) = CreateService((_, _, _, _, _, _) => + var (service, fake) = CreateService((_, _, _, _, _) => Task.FromResult(new RunResult { ExitCode = 0, ResultMarkdown = "ok" })); using var cts = new CancellationTokenSource(); @@ -139,17 +139,17 @@ public sealed class QueueServiceTests : IDisposable var gate2 = new TaskCompletionSource(); var callCount = 0; - var (service, _) = CreateService(async (prompt, _, _, taskId, _, ct) => + var (service, _) = CreateService(async (_, _, _, _, ct) => { var n = Interlocked.Increment(ref callCount); - lock (order) { order.Add(taskId); } + lock (order) { order.Add(n.ToString()); } if (n == 1) await gate1.Task; if (n == 2) gate2.SetResult(); return new RunResult { ExitCode = 0, ResultMarkdown = "ok" }; }); - var task1 = await SeedQueuedTask(listId, createdAt: DateTime.UtcNow.AddSeconds(-2)); - var task2 = await SeedQueuedTask(listId, createdAt: DateTime.UtcNow.AddSeconds(-1)); + await SeedQueuedTask(listId, createdAt: DateTime.UtcNow.AddSeconds(-2)); + await SeedQueuedTask(listId, createdAt: DateTime.UtcNow.AddSeconds(-1)); using var cts = new CancellationTokenSource(); await service.StartAsync(cts.Token); @@ -162,7 +162,7 @@ public sealed class QueueServiceTests : IDisposable // Only task1 should be running (task2 waiting on the queue slot). Assert.Single(order); - Assert.Equal(task1.Id, order[0]); + Assert.Equal("1", order[0]); // Release first task. gate1.SetResult(); @@ -171,7 +171,7 @@ public sealed class QueueServiceTests : IDisposable await gate2.Task.WaitAsync(TimeSpan.FromSeconds(5)); Assert.Equal(2, order.Count); - Assert.Equal(task2.Id, order[1]); + Assert.Equal("2", order[1]); cts.Cancel(); } @@ -184,7 +184,7 @@ public sealed class QueueServiceTests : IDisposable var running = new TaskCompletionSource(); var cancelled = false; - var (service, _) = CreateService(async (_, _, _, _, _, ct) => + var (service, _) = CreateService(async (_, _, _, _, ct) => { running.SetResult(); try @@ -217,7 +217,7 @@ public sealed class QueueServiceTests : IDisposable var (listId, _) = await SeedListWithAgentTag(); var tcs = new TaskCompletionSource(); - var (service, _) = CreateService((_, _, _, _, _, _) => tcs.Task); + var (service, _) = CreateService((_, _, _, _, _) => tcs.Task); var task = await SeedQueuedTask(listId); await service.RunNow(task.Id); @@ -235,23 +235,23 @@ public sealed class QueueServiceTests : IDisposable internal sealed class FakeClaudeProcess : IClaudeProcess { - private readonly Func, CancellationToken, Task> _handler; + private readonly Func, CancellationToken, Task> _handler; private int _callCount; public int CallCount => _callCount; public FakeClaudeProcess( - Func, CancellationToken, Task>? handler = null) + Func, CancellationToken, Task>? handler = null) { - _handler = handler ?? ((_, _, _, _, _, _) => + _handler = handler ?? ((_, _, _, _, _) => Task.FromResult(new RunResult { ExitCode = 0, ResultMarkdown = "ok" })); } - public async Task RunAsync(string prompt, string workingDirectory, string logPath, string taskId, + public async Task RunAsync(string arguments, string prompt, string workingDirectory, Func onStdoutLine, CancellationToken ct) { Interlocked.Increment(ref _callCount); - return await _handler(prompt, workingDirectory, logPath, taskId, onStdoutLine, ct); + return await _handler(prompt, workingDirectory, arguments, onStdoutLine, ct); } }