From d8a043fae7e13dda4325700e00a57533a8c3851a Mon Sep 17 00:00:00 2001
From: Mika Kuns
Date: Fri, 26 Jun 2026 08:56:19 +0200
Subject: [PATCH] feat(worker): persistent streaming Claude session + live session registry

StreamingClaudeSession drives claude --input-format stream-json over a
kept-open stdin: sends user messages, interrupts the in-flight turn via the
verified control_request protocol, and tracks turn state from result events
(treating an interrupt-aborted error_during_execution result as turn-ended).
IClaudeStreamTransport abstracts the process I/O so it is unit-tested with a
fake (no real claude). LiveSessionRegistry maps taskId -> live session for
the hub to route into. Backs the upcoming in-app interactive sessions;
autonomous task execution untouched.
---
 .../Interfaces/IClaudeStreamTransport.cs      |  26 +++
 .../Runner/Interfaces/ILiveSession.cs         |  17 ++
 .../Runner/LiveSessionRegistry.cs             |  66 ++++++
 .../Runner/ProcessClaudeStreamTransport.cs    | 134 ++++++++++++
 .../Runner/StreamingClaudeSession.cs          | 197 ++++++++++++++++++
 .../FakeClaudeStreamTransport.cs              |  48 +++++
 .../Runner/LiveSessionRegistryTests.cs        | 104 +++++++++
 .../Runner/StreamingClaudeSessionTests.cs     | 195 +++++++++++++++++
 8 files changed, 787 insertions(+)
 create mode 100644 src/ClaudeDo.Worker/Runner/Interfaces/IClaudeStreamTransport.cs
 create mode 100644 src/ClaudeDo.Worker/Runner/Interfaces/ILiveSession.cs
 create mode 100644 src/ClaudeDo.Worker/Runner/LiveSessionRegistry.cs
 create mode 100644 src/ClaudeDo.Worker/Runner/ProcessClaudeStreamTransport.cs
 create mode 100644 src/ClaudeDo.Worker/Runner/StreamingClaudeSession.cs
 create mode 100644 tests/ClaudeDo.Worker.Tests/Infrastructure/FakeClaudeStreamTransport.cs
 create mode 100644 tests/ClaudeDo.Worker.Tests/Runner/LiveSessionRegistryTests.cs
 create mode 100644 tests/ClaudeDo.Worker.Tests/Runner/StreamingClaudeSessionTests.cs

diff --git a/src/ClaudeDo.Worker/Runner/Interfaces/IClaudeStreamTransport.cs b/src/ClaudeDo.Worker/Runner/Interfaces/IClaudeStreamTransport.cs
new file mode 100644
index 0000000..94c61ac
--- /dev/null
+++ b/src/ClaudeDo.Worker/Runner/Interfaces/IClaudeStreamTransport.cs
@@ -0,0 +1,26 @@
+namespace ClaudeDo.Worker.Runner.Interfaces;
+
+/// <summary>
+/// Abstracts the process I/O of a streaming Claude CLI so session logic can be unit-tested
+/// against a fake instead of a real process.
+/// </summary>
+public interface IClaudeStreamTransport : IAsyncDisposable
+{
+    /// <summary>Starts the transport with the given CLI arguments and working directory.</summary>
+    Task StartAsync(IReadOnlyList<string> args, string workingDirectory, CancellationToken ct);
+
+    /// <summary>Writes one JSON line to the transport's input.</summary>
+    Task WriteLineAsync(string jsonLine, CancellationToken ct);
+
+    /// <summary>Raised with each line the transport reads from standard output.</summary>
+    event Func<string, Task>? LineReceived;
+
+    /// <summary>Raised with each line the transport reads from standard error.</summary>
+    event Func<string, Task>? StderrReceived;
+
+    /// <summary>Forcibly terminates the underlying process.</summary>
+    void Kill();
+
+    /// <summary>Completes when the underlying process has exited.</summary>
+    Task WaitForExitAsync();
+}
diff --git a/src/ClaudeDo.Worker/Runner/Interfaces/ILiveSession.cs b/src/ClaudeDo.Worker/Runner/Interfaces/ILiveSession.cs
new file mode 100644
index 0000000..c7fb538
--- /dev/null
+++ b/src/ClaudeDo.Worker/Runner/Interfaces/ILiveSession.cs
@@ -0,0 +1,17 @@
+namespace ClaudeDo.Worker.Runner.Interfaces;
+
+/// <summary>A running interactive session that can receive user messages and be interrupted.</summary>
+public interface ILiveSession : IAsyncDisposable
+{
+    /// <summary>Whether a turn has been sent whose result has not been observed yet.</summary>
+    bool IsTurnInFlight { get; }
+
+    /// <summary>Sends <paramref name="text"/> to the session as a user message.</summary>
+    Task SendUserMessageAsync(string text, CancellationToken ct);
+
+    /// <summary>Requests that the turn currently in flight be interrupted.</summary>
+    Task InterruptAsync(CancellationToken ct);
+
+    /// <summary>Stops the session.</summary>
+    Task StopAsync();
+}
diff --git a/src/ClaudeDo.Worker/Runner/LiveSessionRegistry.cs b/src/ClaudeDo.Worker/Runner/LiveSessionRegistry.cs
new file mode 100644
index 0000000..60d4376
--- /dev/null
+++ b/src/ClaudeDo.Worker/Runner/LiveSessionRegistry.cs
@@ -0,0 +1,66 @@
+using System.Collections.Concurrent;
+using ClaudeDo.Worker.Runner.Interfaces;
+
+namespace ClaudeDo.Worker.Runner;
+
+// Singleton in-memory registry of active live streaming sessions.
+// A session's lifetime matches its associated task run; dead entries are removed by the runner.
+public sealed class LiveSessionRegistry
+{
+    private readonly ConcurrentDictionary<string, ILiveSession> _sessions = new();
+
+    /// <summary>
+    /// Registers <paramref name="session"/> under <paramref name="taskId"/>. A different session
+    /// already registered under that id is replaced and stopped in the background.
+    /// </summary>
+    public void Register(string taskId, ILiveSession session)
+    {
+        ILiveSession? replaced = null;
+
+        // Swap in one atomic step so concurrent registrations cannot overwrite each other
+        // without the displaced session being stopped. The callbacks may run more than once
+        // under contention; the last invocation is the one that took effect.
+        _sessions.AddOrUpdate(
+            taskId,
+            _ =>
+            {
+                replaced = null;
+                return session;
+            },
+            (_, existing) =>
+            {
+                replaced = existing;
+                return session;
+            });
+
+        if (replaced is null || ReferenceEquals(replaced, session))
+            return;
+
+        // Best-effort stop of the replaced session; don't await to avoid deadlock risk.
+        _ = replaced.StopAsync().ContinueWith(
+            // Reading Exception marks the fault as observed — the old session is already orphaned.
+            t => { _ = t.Exception; },
+            CancellationToken.None,
+            TaskContinuationOptions.OnlyOnFaulted | TaskContinuationOptions.ExecuteSynchronously,
+            TaskScheduler.Default);
+    }
+
+    /// <summary>Looks up the session registered for <paramref name="taskId"/>.</summary>
+    /// <returns><c>true</c> and the session when one is registered; otherwise <c>false</c> and <c>null</c>.</returns>
+    public bool TryGet(string taskId, out ILiveSession session)
+    {
+        var found = _sessions.TryGetValue(taskId, out var existing);
+        session = existing!;
+        return found;
+    }
+
+    /// <summary>Removes the entry for <paramref name="taskId"/> without stopping the session.</summary>
+    public void Unregister(string taskId) => _sessions.TryRemove(taskId, out _);
+
+    /// <summary>Removes the entry for <paramref name="taskId"/> and, if one existed, stops that session.</summary>
+    public async Task StopAsync(string taskId)
+    {
+        if (_sessions.TryRemove(taskId, out var session))
+            await session.StopAsync();
+    }
+}
diff --git a/src/ClaudeDo.Worker/Runner/ProcessClaudeStreamTransport.cs b/src/ClaudeDo.Worker/Runner/ProcessClaudeStreamTransport.cs
new file mode 100644
index 0000000..ae888ee
--- /dev/null
+++ b/src/ClaudeDo.Worker/Runner/ProcessClaudeStreamTransport.cs
@@ -0,0 +1,134 @@
+using System.Diagnostics;
+using System.Text;
+using ClaudeDo.Worker.Config;
+using ClaudeDo.Worker.Runner.Interfaces;
+
+namespace ClaudeDo.Worker.Runner;
+
+/// <summary>
+/// <see cref="IClaudeStreamTransport"/> backed by a real child process. Standard input stays
+/// open for the life of the process so turns can be written to it one line at a time.
+/// </summary>
+public sealed class ProcessClaudeStreamTransport : IClaudeStreamTransport
+{
+    private readonly WorkerConfig _cfg;
+    private readonly ILogger _logger;
+
+    private Process? _process;
+    private Task? _stdoutTask;
+    private Task? _stderrTask;
+
+    public event Func<string, Task>? LineReceived;
+    public event Func<string, Task>? StderrReceived;
+
+    public ProcessClaudeStreamTransport(WorkerConfig cfg, ILogger logger)
+    {
+        _cfg = cfg;
+        _logger = logger;
+    }
+
+    /// <summary>
+    /// Launches the configured Claude binary with <paramref name="args"/> and begins forwarding
+    /// its stdout and stderr lines to <see cref="LineReceived"/> and <see cref="StderrReceived"/>.
+    /// </summary>
+    public Task StartAsync(IReadOnlyList<string> args, string workingDirectory, CancellationToken ct)
+    {
+        // Don't spawn a process for a request that has already been cancelled.
+        if (ct.IsCancellationRequested)
+            return Task.FromCanceled(ct);
+
+        var psi = new ProcessStartInfo
+        {
+            FileName = _cfg.ClaudeBin,
+            WorkingDirectory = workingDirectory,
+            RedirectStandardInput = true,
+            RedirectStandardOutput = true,
+            RedirectStandardError = true,
+            UseShellExecute = false,
+            CreateNoWindow = true,
+            StandardOutputEncoding = Encoding.UTF8,
+            StandardErrorEncoding = Encoding.UTF8,
+        };
+
+        foreach (var arg in args)
+            psi.ArgumentList.Add(arg);
+
+        psi.Environment["MCP_TOOL_TIMEOUT"] = "200000";
+
+        var process = new Process { StartInfo = psi };
+        try
+        {
+            process.Start();
+        }
+        catch
+        {
+            // Never publish a Process that failed to start: WaitForExitAsync and DisposeAsync
+            // would throw on it instead of being no-ops.
+            process.Dispose();
+            throw;
+        }
+
+        _process = process;
+
+        // Keep stdin open — turns are driven by WriteLineAsync calls.
+        process.StandardInput.AutoFlush = false;
+
+        _stdoutTask = Task.Run(
+            () => PumpAsync(process.StandardOutput, () => LineReceived, nameof(LineReceived)),
+            CancellationToken.None);
+        _stderrTask = Task.Run(
+            () => PumpAsync(process.StandardError, () => StderrReceived, nameof(StderrReceived)),
+            CancellationToken.None);
+
+        return Task.CompletedTask;
+    }
+
+    // Reads lines until end of stream, handing each non-empty one to the current subscriber.
+    // A throwing subscriber is logged and skipped so one bad line cannot stop the pump.
+    private async Task PumpAsync(
+        System.IO.TextReader reader,
+        Func<Func<string, Task>?> currentHandler,
+        string eventName)
+    {
+        while (await reader.ReadLineAsync() is { } line)
+        {
+            if (string.IsNullOrEmpty(line)) continue;
+            if (currentHandler() is not { } handler) continue;
+
+            try { await handler(line); }
+            catch (Exception ex) { _logger.LogWarning(ex, "{Event} handler threw", eventName); }
+        }
+    }
+
+    /// <summary>Writes <paramref name="jsonLine"/> plus a newline to stdin and flushes it.</summary>
+    /// <exception cref="InvalidOperationException">The transport has not been started.</exception>
+    public async Task WriteLineAsync(string jsonLine, CancellationToken ct)
+    {
+        if (_process is null) throw new InvalidOperationException("Transport not started.");
+        await _process.StandardInput.WriteAsync((jsonLine + "\n").AsMemory(), ct);
+        await _process.StandardInput.FlushAsync(ct);
+    }
+
+    /// <summary>Kills the process and its descendants; does nothing if that fails.</summary>
+    public void Kill()
+    {
+        try { _process?.Kill(entireProcessTree: true); }
+        catch { /* already exited */ }
+    }
+
+    /// <summary>Waits for the process to exit and for both output pumps to drain.</summary>
+    public async Task WaitForExitAsync()
+    {
+        if (_process is not null)
+            await _process.WaitForExitAsync(CancellationToken.None);
+        if (_stdoutTask is not null) await _stdoutTask;
+        if (_stderrTask is not null) await _stderrTask;
+    }
+
+    public async ValueTask DisposeAsync()
+    {
+        Kill();
+        await WaitForExitAsync();
+        _process?.Dispose();
+    }
+}
diff --git a/src/ClaudeDo.Worker/Runner/StreamingClaudeSession.cs b/src/ClaudeDo.Worker/Runner/StreamingClaudeSession.cs
new file mode 100644
index 0000000..8a879d4
--- /dev/null
+++ b/src/ClaudeDo.Worker/Runner/StreamingClaudeSession.cs
@@ -0,0 +1,197 @@
+using System.Text.Json;
+using ClaudeDo.Worker.Runner.Interfaces;
+
+namespace ClaudeDo.Worker.Runner;
+
+/// <summary>
+/// Drives one long-lived Claude process through an <see cref="IClaudeStreamTransport"/>: sends
+/// user turns, interrupts the turn in flight, and tracks turn state from <c>result</c> events.
+/// </summary>
+public sealed class StreamingClaudeSession : ILiveSession
+{
+    // How long a send waits for an interrupted turn to report its result before proceeding.
+    private static readonly TimeSpan InterruptWaitTimeout = TimeSpan.FromSeconds(30);
+
+    private readonly IClaudeStreamTransport _transport;
+    private readonly Func<string, Task> _onLine;
+    private readonly ILogger _logger;
+
+    private readonly SemaphoreSlim _sendLock = new(1, 1);
+    private volatile bool _isTurnInFlight;
+    private volatile TaskCompletionSource<bool> _turnTcs = new(TaskCreationOptions.RunContinuationsAsynchronously);
+
+    public bool IsTurnInFlight => _isTurnInFlight;
+
+    public StreamingClaudeSession(
+        IClaudeStreamTransport transport,
+        Func<string, Task> onLine,
+        ILogger logger)
+    {
+        _transport = transport;
+        _onLine = onLine;
+        _logger = logger;
+    }
+
+    /// <summary>Starts the transport and sends <paramref name="firstPrompt"/> as the first turn.</summary>
+    public async Task StartAsync(
+        IReadOnlyList<string> args,
+        string workingDirectory,
+        string firstPrompt,
+        CancellationToken ct)
+    {
+        _transport.LineReceived += HandleLineAsync;
+        try
+        {
+            await _transport.StartAsync(args, workingDirectory, ct);
+        }
+        catch
+        {
+            // A transport that never started will not be read from; drop the subscription.
+            _transport.LineReceived -= HandleLineAsync;
+            throw;
+        }
+
+        await SendTurnAsync(firstPrompt, ct);
+    }
+
+    // Forwards every stdout line to the caller's callback, then ends the turn on a result event.
+    private async Task HandleLineAsync(string line)
+    {
+        try { await _onLine(line); }
+        catch (Exception ex) { _logger.LogWarning(ex, "onLine callback threw"); }
+
+        if (!IsResultEvent(line)) return;
+
+        _isTurnInFlight = false;
+        _turnTcs.TrySetResult(true);
+    }
+
+    // True when the line is a JSON object whose "type" property is the string "result".
+    // Any result ends the turn, including an interrupt-aborted error_during_execution one.
+    private static bool IsResultEvent(string line)
+    {
+        try
+        {
+            using var doc = JsonDocument.Parse(line);
+            var root = doc.RootElement;
+            return root.ValueKind == JsonValueKind.Object
+                && root.TryGetProperty("type", out var type)
+                && type.ValueKind == JsonValueKind.String
+                && type.ValueEquals("result");
+        }
+        catch (JsonException)
+        {
+            // Unparseable line — ignore.
+            return false;
+        }
+    }
+
+    /// <summary>
+    /// Sends <paramref name="text"/> as a new turn. If a turn is in flight it is interrupted
+    /// first and its result is awaited for up to 30 seconds before the new turn is written.
+    /// </summary>
+    public async Task SendUserMessageAsync(string text, CancellationToken ct)
+    {
+        await _sendLock.WaitAsync(ct);
+        try
+        {
+            if (_isTurnInFlight)
+            {
+                // Capture the turn before interrupting it so its own result is the one awaited.
+                var turnDone = _turnTcs.Task;
+                await InterruptInternalAsync(ct);
+
+                try
+                {
+                    // Cancellation surfaces as OperationCanceledException rather than being
+                    // misreported as a timeout; only a real timeout falls through to the send.
+                    await turnDone.WaitAsync(InterruptWaitTimeout, ct);
+                }
+                catch (TimeoutException)
+                {
+                    _logger.LogWarning("Timed out waiting for turn to end after interrupt; proceeding anyway.");
+                }
+            }
+
+            await SendTurnAsync(text, ct);
+        }
+        finally
+        {
+            _sendLock.Release();
+        }
+    }
+
+    /// <summary>Writes an interrupt request for the turn in flight.</summary>
+    public async Task InterruptAsync(CancellationToken ct)
+    {
+        await _sendLock.WaitAsync(ct);
+        try { await InterruptInternalAsync(ct); }
+        finally { _sendLock.Release(); }
+    }
+
+    // Writes the interrupt control_request; a failed write is logged, not thrown.
+    private async Task InterruptInternalAsync(CancellationToken ct)
+    {
+        var requestId = Guid.NewGuid().ToString();
+        var payload = JsonSerializer.Serialize(new
+        {
+            type = "control_request",
+            request_id = requestId,
+            request = new { subtype = "interrupt" }
+        });
+
+        try { await _transport.WriteLineAsync(payload, ct); }
+        catch (Exception ex) { _logger.LogWarning(ex, "Failed to write interrupt control_request; degrading gracefully."); }
+    }
+
+    // Marks a new turn as in flight and writes the user message that starts it.
+    private async Task SendTurnAsync(string text, CancellationToken ct)
+    {
+        var payload = JsonSerializer.Serialize(new
+        {
+            type = "user",
+            message = new
+            {
+                role = "user",
+                content = new[]
+                {
+                    new { type = "text", text }
+                }
+            },
+            parent_tool_use_id = (string?)null
+        });
+
+        // Fresh completion source for this turn; set before writing so a fast result is not missed.
+        var turn = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
+        _turnTcs = turn;
+        _isTurnInFlight = true;
+
+        try
+        {
+            await _transport.WriteLineAsync(payload, ct);
+        }
+        catch
+        {
+            // The write failed, so no result event is expected for this turn. Leaving the flag
+            // set would make the next send interrupt and wait on a turn that never started.
+            _isTurnInFlight = false;
+            turn.TrySetResult(false);
+            throw;
+        }
+    }
+
+    /// <summary>Kills the transport's process and waits for it to exit.</summary>
+    public async Task StopAsync()
+    {
+        _transport.Kill();
+        await _transport.WaitForExitAsync();
+    }
+
+    public async ValueTask DisposeAsync()
+    {
+        await StopAsync();
+        _transport.LineReceived -= HandleLineAsync;
+        await _transport.DisposeAsync();
+        _sendLock.Dispose();
+    }
+}
diff --git a/tests/ClaudeDo.Worker.Tests/Infrastructure/FakeClaudeStreamTransport.cs b/tests/ClaudeDo.Worker.Tests/Infrastructure/FakeClaudeStreamTransport.cs
new file mode 100644
index 0000000..445c099
--- /dev/null
+++ b/tests/ClaudeDo.Worker.Tests/Infrastructure/FakeClaudeStreamTransport.cs
@@ -0,0 +1,48 @@
+using ClaudeDo.Worker.Runner.Interfaces;
+
+namespace ClaudeDo.Worker.Tests.Infrastructure;
+
+// In-memory IClaudeStreamTransport for tests: records writes and lets a test push output lines.
+public sealed class FakeClaudeStreamTransport : IClaudeStreamTransport
+{
+    public List<string> Written { get; } = [];
+    public bool Killed { get; private set; }
+    public bool Started { get; private set; }
+
+    public event Func<string, Task>? LineReceived;
+    public event Func<string, Task>? StderrReceived;
+
+    public Task StartAsync(IReadOnlyList<string> args, string workingDirectory, CancellationToken ct)
+    {
+        Started = true;
+        return Task.CompletedTask;
+    }
+
+    public Task WriteLineAsync(string jsonLine, CancellationToken ct)
+    {
+        Written.Add(jsonLine);
+        return Task.CompletedTask;
+    }
+
+    public void Kill() => Killed = true;
+
+    public Task WaitForExitAsync() => Task.CompletedTask;
+
+    public ValueTask DisposeAsync() => ValueTask.CompletedTask;
+
+    // Test helper: push a simulated stdout line to all LineReceived subscribers.
+    public async Task PushLineAsync(string line)
+    {
+        var handler = LineReceived;
+        if (handler is not null)
+            await handler(line);
+    }
+
+    // Test helper: push a simulated stderr line.
+    public async Task PushStderrAsync(string line)
+    {
+        var handler = StderrReceived;
+        if (handler is not null)
+            await handler(line);
+    }
+}
diff --git a/tests/ClaudeDo.Worker.Tests/Runner/LiveSessionRegistryTests.cs b/tests/ClaudeDo.Worker.Tests/Runner/LiveSessionRegistryTests.cs
new file mode 100644
index 0000000..b028bd0
--- /dev/null
+++ b/tests/ClaudeDo.Worker.Tests/Runner/LiveSessionRegistryTests.cs
@@ -0,0 +1,104 @@
+using ClaudeDo.Worker.Runner;
+using ClaudeDo.Worker.Runner.Interfaces;
+
+namespace ClaudeDo.Worker.Tests.Runner;
+
+public sealed class LiveSessionRegistryTests
+{
+    private sealed class FakeLiveSession : ILiveSession
+    {
+        public bool StopCalled { get; private set; }
+        public bool IsTurnInFlight => false;
+
+        public Task SendUserMessageAsync(string text, CancellationToken ct) => Task.CompletedTask;
+        public Task InterruptAsync(CancellationToken ct) => Task.CompletedTask;
+
+        public Task StopAsync()
+        {
+            StopCalled = true;
+            return Task.CompletedTask;
+        }
+
+        public ValueTask DisposeAsync() => ValueTask.CompletedTask;
+    }
+
+    [Fact]
+    public void Register_ThenTryGet_ReturnsSession()
+    {
+        var registry = new LiveSessionRegistry();
+        var session = new FakeLiveSession();
+
+        registry.Register("task-1", session);
+
+        Assert.True(registry.TryGet("task-1", out var retrieved));
+        Assert.Same(session, retrieved);
+    }
+
+    [Fact]
+    public void TryGet_Missing_ReturnsFalse()
+    {
+        var registry = new LiveSessionRegistry();
+
+        Assert.False(registry.TryGet("no-such-task", out _));
+    }
+
+    [Fact]
+    public void Unregister_RemovesSession()
+    {
+        var registry = new LiveSessionRegistry();
+        registry.Register("task-1", new FakeLiveSession());
+        registry.Unregister("task-1");
+
+        Assert.False(registry.TryGet("task-1", out _));
+    }
+
+    [Fact]
+    public void Register_WhenSessionAlreadyExists_StopsPreviousSession()
+    {
+        var registry = new LiveSessionRegistry();
+        var first = new FakeLiveSession();
+        var second = new FakeLiveSession();
+
+        registry.Register("task-1", first);
+        registry.Register("task-1", second);
+
+        // Register invokes StopAsync on the replaced session before returning, so no delay is needed.
+        Assert.True(first.StopCalled);
+        Assert.True(registry.TryGet("task-1", out var retrieved));
+        Assert.Same(second, retrieved);
+    }
+
+    [Fact]
+    public void Register_SameSessionTwice_DoesNotStopIt()
+    {
+        var registry = new LiveSessionRegistry();
+        var session = new FakeLiveSession();
+
+        registry.Register("task-1", session);
+        registry.Register("task-1", session);
+
+        Assert.False(session.StopCalled);
+        Assert.True(registry.TryGet("task-1", out var retrieved));
+        Assert.Same(session, retrieved);
+    }
+
+    [Fact]
+    public async Task StopAsync_StopsAndRemovesSession()
+    {
+        var registry = new LiveSessionRegistry();
+        var session = new FakeLiveSession();
+        registry.Register("task-1", session);
+
+        await registry.StopAsync("task-1");
+
+        Assert.True(session.StopCalled);
+        Assert.False(registry.TryGet("task-1", out _));
+    }
+
+    [Fact]
+    public async Task StopAsync_MissingTask_DoesNotThrow()
+    {
+        var registry = new LiveSessionRegistry();
+        await registry.StopAsync("no-such-task"); // should not throw
+    }
+}
diff --git a/tests/ClaudeDo.Worker.Tests/Runner/StreamingClaudeSessionTests.cs b/tests/ClaudeDo.Worker.Tests/Runner/StreamingClaudeSessionTests.cs
new file mode 100644
index 0000000..73b1a1e
--- /dev/null
+++ b/tests/ClaudeDo.Worker.Tests/Runner/StreamingClaudeSessionTests.cs
@@ -0,0 +1,195 @@
+using System.Text.Json;
+using ClaudeDo.Worker.Runner;
+using ClaudeDo.Worker.Tests.Infrastructure;
+using Microsoft.Extensions.Logging.Abstractions;
+
+namespace ClaudeDo.Worker.Tests.Runner;
+
+public sealed class StreamingClaudeSessionTests
+{
+    private static StreamingClaudeSession Build(
+        FakeClaudeStreamTransport transport,
+        List<string> received)
+    {
+        return new StreamingClaudeSession(
+            transport,
+            line => { received.Add(line); return Task.CompletedTask; },
+            NullLogger.Instance);
+    }
+
+    private static string ResultLine(bool isError = false, string subtype = "success") =>
+        JsonSerializer.Serialize(new { type = "result", is_error = isError, subtype });
+
+    private static string UserMessageLine(string text) =>
+        JsonSerializer.Serialize(new
+        {
+            type = "user",
+            message = new { role = "user", content = new[] { new { type = "text", text } } },
+            parent_tool_use_id = (string?)null
+        });
+
+    // ---- Start sends first prompt as user-message, IsTurnInFlight = true ----
+
+    [Fact]
+    public async Task Start_SendsFirstPromptAsUserMessage_AndTurnIsInFlight()
+    {
+        var transport = new FakeClaudeStreamTransport();
+        var received = new List<string>();
+        var session = Build(transport, received);
+
+        await session.StartAsync([], "/tmp", "hello world", CancellationToken.None);
+
+        Assert.True(session.IsTurnInFlight);
+        Assert.Single(transport.Written);
+
+        using var doc = JsonDocument.Parse(transport.Written[0]);
+        var root = doc.RootElement;
+        Assert.Equal("user", root.GetProperty("type").GetString());
+        var text = root.GetProperty("message").GetProperty("content")[0].GetProperty("text").GetString();
+        Assert.Equal("hello world", text);
+
+        await session.DisposeAsync();
+    }
+
+    // ---- Pushing a result line flips IsTurnInFlight to false ----
+
+    [Fact]
+    public async Task PushingResultLine_FlipsIsTurnInFlightToFalse()
+    {
+        var transport = new FakeClaudeStreamTransport();
+        var session = Build(transport, []);
+
+        await session.StartAsync([], "/tmp", "prompt", CancellationToken.None);
+        Assert.True(session.IsTurnInFlight);
+
+        await transport.PushLineAsync(ResultLine());
+
+        Assert.False(session.IsTurnInFlight);
+
+        await session.DisposeAsync();
+    }
+
+    // ---- Sending while in-flight: interrupt first, then user message after result ----
+
+    [Fact]
+    public async Task SendWhileInFlight_WritesInterruptFirst_ThenUserMessage()
+    {
+        var transport = new FakeClaudeStreamTransport();
+        var session = Build(transport, []);
+
+        await session.StartAsync([], "/tmp", "first", CancellationToken.None);
+        // Written[0] = first user message. Turn is in flight.
+        Assert.True(session.IsTurnInFlight);
+
+        // Start the second send without awaiting it. With the fake transport every write
+        // completes synchronously, so by the time the call returns the interrupt has been
+        // written and the send is parked waiting for the turn to end.
+        var sendTask = session.SendUserMessageAsync("second", CancellationToken.None);
+        Assert.False(sendTask.IsCompleted);
+        Assert.Equal(2, transport.Written.Count);
+
+        // Push a result line to unblock it.
+        await transport.PushLineAsync(ResultLine());
+
+        await sendTask;
+
+        // Written[0] = first prompt, Written[1] = interrupt, Written[2] = second user message.
+        Assert.True(transport.Written.Count >= 3, $"Expected ≥3 writes, got {transport.Written.Count}");
+
+        // Written[1] must be an interrupt control_request.
+        using var interruptDoc = JsonDocument.Parse(transport.Written[1]);
+        Assert.Equal("control_request", interruptDoc.RootElement.GetProperty("type").GetString());
+        Assert.Equal("interrupt", interruptDoc.RootElement.GetProperty("request").GetProperty("subtype").GetString());
+
+        // Last write must be the user message with "second".
+        using var userDoc = JsonDocument.Parse(transport.Written[^1]);
+        Assert.Equal("user", userDoc.RootElement.GetProperty("type").GetString());
+        var text = userDoc.RootElement.GetProperty("message").GetProperty("content")[0].GetProperty("text").GetString();
+        Assert.Equal("second", text);
+
+        await session.DisposeAsync();
+    }
+
+    // ---- Cancelling while waiting for the interrupted turn surfaces as cancellation ----
+
+    [Fact]
+    public async Task SendWhileInFlight_Cancelled_ThrowsWithoutWritingNewTurn()
+    {
+        var transport = new FakeClaudeStreamTransport();
+        var session = Build(transport, []);
+        using var cts = new CancellationTokenSource();
+
+        await session.StartAsync([], "/tmp", "first", CancellationToken.None);
+        var sendTask = session.SendUserMessageAsync("second", cts.Token);
+
+        cts.Cancel();
+
+        await Assert.ThrowsAnyAsync<OperationCanceledException>(() => sendTask);
+        // Only the first prompt and the interrupt were written.
+        Assert.Equal(2, transport.Written.Count);
+
+        await session.DisposeAsync();
+    }
+
+    // ---- Sending while idle writes user message with no interrupt ----
+
+    [Fact]
+    public async Task SendWhileIdle_WritesUserMessageWithNoInterrupt()
+    {
+        var transport = new FakeClaudeStreamTransport();
+        var session = Build(transport, []);
+
+        await session.StartAsync([], "/tmp", "first", CancellationToken.None);
+        await transport.PushLineAsync(ResultLine()); // end the turn → idle
+
+        Assert.False(session.IsTurnInFlight);
+        var countBefore = transport.Written.Count;
+
+        await session.SendUserMessageAsync("second", CancellationToken.None);
+
+        // Exactly one new write, no interrupt.
+        Assert.Equal(countBefore + 1, transport.Written.Count);
+        using var doc = JsonDocument.Parse(transport.Written[^1]);
+        Assert.Equal("user", doc.RootElement.GetProperty("type").GetString());
+
+        await session.DisposeAsync();
+    }
+
+    // ---- Result with is_error / error_during_execution still ends the turn ----
+
+    [Fact]
+    public async Task ResultWithIsError_StillEndsTurn_NoThrow()
+    {
+        var transport = new FakeClaudeStreamTransport();
+        var session = Build(transport, []);
+
+        await session.StartAsync([], "/tmp", "prompt", CancellationToken.None);
+        Assert.True(session.IsTurnInFlight);
+
+        await transport.PushLineAsync(ResultLine(isError: true, subtype: "error_during_execution"));
+
+        Assert.False(session.IsTurnInFlight);
+
+        await session.DisposeAsync();
+    }
+
+    // ---- onLine receives every pushed stdout line ----
+
+    [Fact]
+    public async Task OnLine_ReceivesEveryPushedLine()
+    {
+        var transport = new FakeClaudeStreamTransport();
+        var received = new List<string>();
+        var session = Build(transport, received);
+
+        await session.StartAsync([], "/tmp", "prompt", CancellationToken.None);
+
+        var lines = new[] { "{\"type\":\"assistant\"}", "{\"type\":\"stream_event\"}", ResultLine() };
+        foreach (var l in lines)
+            await transport.PushLineAsync(l);
+
+        Assert.Equal(lines, received);
+
+        await session.DisposeAsync();
+    }
+}