feat(worker): persistent streaming Claude session + live session registry

StreamingClaudeSession drives claude --input-format stream-json over a kept-
open stdin: sends user messages, interrupts the in-flight turn via the verified
control_request protocol, and tracks turn state from result events (treating an
interrupt-aborted error_during_execution result as turn-ended). IClaudeStreamTransport
abstracts the process I/O so it is unit-tested with a fake (no real claude).
LiveSessionRegistry maps taskId -> live session for the hub to route into.

Backs the upcoming in-app interactive sessions; autonomous task execution untouched.
This commit is contained in:
Mika Kuns
2026-06-26 08:56:19 +02:00
parent 10342bc562
commit d8a043fae7
8 changed files with 623 additions and 0 deletions

View File

@@ -0,0 +1,11 @@
namespace ClaudeDo.Worker.Runner.Interfaces;
public interface IClaudeStreamTransport : IAsyncDisposable
{
Task StartAsync(IReadOnlyList<string> args, string workingDirectory, CancellationToken ct);
Task WriteLineAsync(string jsonLine, CancellationToken ct);
event Func<string, Task>? LineReceived;
event Func<string, Task>? StderrReceived;
void Kill();
Task WaitForExitAsync();
}

View File

@@ -0,0 +1,9 @@
namespace ClaudeDo.Worker.Runner.Interfaces;
public interface ILiveSession : IAsyncDisposable
{
bool IsTurnInFlight { get; }
Task SendUserMessageAsync(string text, CancellationToken ct);
Task InterruptAsync(CancellationToken ct);
Task StopAsync();
}

View File

@@ -0,0 +1,43 @@
using System.Collections.Concurrent;
using ClaudeDo.Worker.Runner.Interfaces;
namespace ClaudeDo.Worker.Runner;
// Singleton in-memory registry of active live streaming sessions.
// A session's lifetime matches its associated task run; dead entries are removed by the runner.
public sealed class LiveSessionRegistry
{
private readonly ConcurrentDictionary<string, ILiveSession> _sessions = new();
public void Register(string taskId, ILiveSession session)
{
if (_sessions.TryRemove(taskId, out var existing))
{
// Best-effort stop of the replaced session; don't await to avoid deadlock risk.
_ = existing.StopAsync().ContinueWith(t =>
{
if (t.IsFaulted) { /* swallow — old session is already orphaned */ }
}, TaskScheduler.Default);
}
_sessions[taskId] = session;
}
public bool TryGet(string taskId, out ILiveSession session)
{
if (_sessions.TryGetValue(taskId, out var s))
{
session = s;
return true;
}
session = null!;
return false;
}
public void Unregister(string taskId) => _sessions.TryRemove(taskId, out _);
public async Task StopAsync(string taskId)
{
if (_sessions.TryRemove(taskId, out var session))
await session.StopAsync();
}
}

View File

@@ -0,0 +1,110 @@
using System.Diagnostics;
using System.Text;
using ClaudeDo.Worker.Config;
using ClaudeDo.Worker.Runner.Interfaces;
namespace ClaudeDo.Worker.Runner;
public sealed class ProcessClaudeStreamTransport : IClaudeStreamTransport
{
private readonly WorkerConfig _cfg;
private readonly ILogger<ProcessClaudeStreamTransport> _logger;
private Process? _process;
private Task? _stdoutTask;
private Task? _stderrTask;
public event Func<string, Task>? LineReceived;
public event Func<string, Task>? StderrReceived;
public ProcessClaudeStreamTransport(WorkerConfig cfg, ILogger<ProcessClaudeStreamTransport> logger)
{
_cfg = cfg;
_logger = logger;
}
public Task StartAsync(IReadOnlyList<string> args, string workingDirectory, CancellationToken ct)
{
var psi = new ProcessStartInfo
{
FileName = _cfg.ClaudeBin,
WorkingDirectory = workingDirectory,
RedirectStandardInput = true,
RedirectStandardOutput = true,
RedirectStandardError = true,
UseShellExecute = false,
CreateNoWindow = true,
StandardOutputEncoding = Encoding.UTF8,
StandardErrorEncoding = Encoding.UTF8,
};
foreach (var arg in args)
psi.ArgumentList.Add(arg);
psi.Environment["MCP_TOOL_TIMEOUT"] = "200000";
_process = new Process { StartInfo = psi };
_process.Start();
// Keep stdin open — turns are driven by WriteLineAsync calls.
_process.StandardInput.AutoFlush = false;
_stdoutTask = Task.Run(async () =>
{
while (await _process.StandardOutput.ReadLineAsync() is { } line)
{
if (string.IsNullOrEmpty(line)) continue;
var handler = LineReceived;
if (handler is not null)
{
try { await handler(line); }
catch (Exception ex) { _logger.LogWarning(ex, "LineReceived handler threw"); }
}
}
}, CancellationToken.None);
_stderrTask = Task.Run(async () =>
{
while (await _process.StandardError.ReadLineAsync() is { } line)
{
if (string.IsNullOrEmpty(line)) continue;
var handler = StderrReceived;
if (handler is not null)
{
try { await handler(line); }
catch (Exception ex) { _logger.LogWarning(ex, "StderrReceived handler threw"); }
}
}
}, CancellationToken.None);
return Task.CompletedTask;
}
public async Task WriteLineAsync(string jsonLine, CancellationToken ct)
{
if (_process is null) throw new InvalidOperationException("Transport not started.");
await _process.StandardInput.WriteAsync((jsonLine + "\n").AsMemory(), ct);
await _process.StandardInput.FlushAsync(ct);
}
public void Kill()
{
try { _process?.Kill(entireProcessTree: true); }
catch { /* already exited */ }
}
public async Task WaitForExitAsync()
{
if (_process is not null)
await _process.WaitForExitAsync(CancellationToken.None);
if (_stdoutTask is not null) await _stdoutTask;
if (_stderrTask is not null) await _stderrTask;
}
public async ValueTask DisposeAsync()
{
Kill();
await WaitForExitAsync();
_process?.Dispose();
}
}

View File

@@ -0,0 +1,138 @@
using System.Text.Json;
using ClaudeDo.Worker.Runner.Interfaces;
namespace ClaudeDo.Worker.Runner;
public sealed class StreamingClaudeSession : ILiveSession
{
private readonly IClaudeStreamTransport _transport;
private readonly Func<string, Task> _onLine;
private readonly ILogger<StreamingClaudeSession> _logger;
private readonly SemaphoreSlim _sendLock = new(1, 1);
private volatile bool _isTurnInFlight;
private TaskCompletionSource<bool> _turnTcs = new(TaskCreationOptions.RunContinuationsAsynchronously);
public bool IsTurnInFlight => _isTurnInFlight;
public StreamingClaudeSession(
IClaudeStreamTransport transport,
Func<string, Task> onLine,
ILogger<StreamingClaudeSession> logger)
{
_transport = transport;
_onLine = onLine;
_logger = logger;
}
public async Task StartAsync(
IReadOnlyList<string> args,
string workingDirectory,
string firstPrompt,
CancellationToken ct)
{
_transport.LineReceived += HandleLineAsync;
await _transport.StartAsync(args, workingDirectory, ct);
await SendTurnAsync(firstPrompt, ct);
}
private async Task HandleLineAsync(string line)
{
try { await _onLine(line); }
catch (Exception ex) { _logger.LogWarning(ex, "onLine callback threw"); }
try
{
using var doc = JsonDocument.Parse(line);
if (doc.RootElement.TryGetProperty("type", out var typeProp)
&& typeProp.GetString() == "result")
{
_isTurnInFlight = false;
_turnTcs.TrySetResult(true);
}
}
catch { /* unparseable line — ignore */ }
}
public async Task SendUserMessageAsync(string text, CancellationToken ct)
{
await _sendLock.WaitAsync(ct);
try
{
if (_isTurnInFlight)
{
await InterruptInternalAsync(ct);
// Wait for the current turn to end (interrupt-aborted result), with timeout.
var turnDone = _turnTcs.Task;
var timeout = Task.Delay(TimeSpan.FromSeconds(30), ct);
var winner = await Task.WhenAny(turnDone, timeout);
if (winner == timeout)
_logger.LogWarning("Timed out waiting for turn to end after interrupt; proceeding anyway.");
}
await SendTurnAsync(text, ct);
}
finally
{
_sendLock.Release();
}
}
public async Task InterruptAsync(CancellationToken ct)
{
await _sendLock.WaitAsync(ct);
try { await InterruptInternalAsync(ct); }
finally { _sendLock.Release(); }
}
private async Task InterruptInternalAsync(CancellationToken ct)
{
var requestId = Guid.NewGuid().ToString();
var payload = JsonSerializer.Serialize(new
{
type = "control_request",
request_id = requestId,
request = new { subtype = "interrupt" }
});
try { await _transport.WriteLineAsync(payload, ct); }
catch (Exception ex) { _logger.LogWarning(ex, "Failed to write interrupt control_request; degrading gracefully."); }
}
private async Task SendTurnAsync(string text, CancellationToken ct)
{
// Reset the TCS for this new turn.
_turnTcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
_isTurnInFlight = true;
var payload = JsonSerializer.Serialize(new
{
type = "user",
message = new
{
role = "user",
content = new[]
{
new { type = "text", text }
}
},
parent_tool_use_id = (string?)null
});
await _transport.WriteLineAsync(payload, ct);
}
public async Task StopAsync()
{
_transport.Kill();
await _transport.WaitForExitAsync();
}
public async ValueTask DisposeAsync()
{
await StopAsync();
await _transport.DisposeAsync();
_sendLock.Dispose();
}
}