using System.Text; using ClaudeDo.Data; using ClaudeDo.Data.Models; using ClaudeDo.Data.Repositories; using ClaudeDo.Worker.Config; using ClaudeDo.Worker.Hub; using ClaudeDo.Worker.Runner; using ClaudeDo.Worker.Runner.Interfaces; using Microsoft.EntityFrameworkCore; namespace ClaudeDo.Worker.Planning; public sealed class InteractiveSessionService { private readonly IDbContextFactory _dbFactory; private readonly WorkerConfig _cfg; private readonly HubBroadcaster _broadcaster; private readonly LiveSessionRegistry _registry; private readonly ILoggerFactory _loggerFactory; // Optional factory for tests. Signature: (onLine) -> (session, waitForExitTask). // The waitForExitTask completes when the underlying process has exited. private readonly Func, Func, (ILiveSession session, Task exitTask)>? _sessionFactory; public InteractiveSessionService( IDbContextFactory dbFactory, WorkerConfig cfg, HubBroadcaster broadcaster, LiveSessionRegistry registry, ILoggerFactory loggerFactory, Func, Func, (ILiveSession session, Task exitTask)>? sessionFactory = null) { _dbFactory = dbFactory; _cfg = cfg; _broadcaster = broadcaster; _registry = registry; _loggerFactory = loggerFactory; _sessionFactory = sessionFactory; } public async Task StartAsync(string taskId, CancellationToken ct) { if (_registry.TryGet(taskId, out _)) throw new InvalidOperationException("An interactive session is already running for this task."); await using var ctx = _dbFactory.CreateDbContext(); var tasks = new TaskRepository(ctx); var lists = new ListRepository(ctx); var task = await tasks.GetByIdAsync(taskId, ct) ?? throw new InvalidOperationException($"Task {taskId} not found."); var list = await lists.GetByIdAsync(task.ListId, ct) ?? throw new InvalidOperationException($"List {task.ListId} not found."); var workingDir = list.WorkingDir; if (string.IsNullOrWhiteSpace(workingDir) || !Directory.Exists(workingDir)) throw new InvalidOperationException( $"List '{list.Name}' has no valid working directory configured."); var seededPrompt = BuildInteractivePrompt(task); var args = new[] { "-p", "--input-format", "stream-json", "--output-format", "stream-json", "--verbose", "--replay-user-messages", "--model", ModelRegistry.PlanningAlias, "--permission-mode", "auto", }; Func onLine = line => _broadcaster.TaskMessage(taskId, "[stdout] " + line); ILiveSession session; Task exitTask; if (_sessionFactory is not null) { // Factory is responsible for providing a ready-to-use session and its exit signal. (session, exitTask) = _sessionFactory(workingDir, args, onLine); } else { var transport = new ProcessClaudeStreamTransport( _cfg, _loggerFactory.CreateLogger()); var streamingSession = new StreamingClaudeSession( transport, onLine, _loggerFactory.CreateLogger(), onQueueChanged: pending => _ = _broadcaster.InteractiveQueueChanged(taskId, pending), onUserMessageSent: text => _ = _broadcaster.InteractiveMessageSent(taskId, text)); await streamingSession.StartAsync(args, workingDir, seededPrompt, ct); session = streamingSession; exitTask = transport.WaitForExitAsync(); } _registry.Register(taskId, session); await _broadcaster.InteractiveSessionStarted(taskId); var logger = _loggerFactory.CreateLogger(); _ = WatchExitAsync(taskId, exitTask, logger); } private async Task WatchExitAsync(string taskId, Task exitTask, ILogger logger) { try { await exitTask; } catch (Exception ex) { logger.LogWarning(ex, "Interactive session exit watcher caught an exception for task {task_id}", taskId); } finally { _registry.Unregister(taskId); try { await _broadcaster.InteractiveSessionEnded(taskId); } catch (Exception ex) { logger.LogWarning(ex, "InteractiveSessionEnded broadcast failed for task {task_id}", taskId); } } } public async Task SendAsync(string taskId, string text, CancellationToken ct) { if (!_registry.TryGet(taskId, out var session)) throw new InvalidOperationException("No interactive session is running for this task."); await session.SendUserMessageAsync(text, ct); } public async Task InterruptAsync(string taskId, CancellationToken ct) { if (_registry.TryGet(taskId, out var session)) await session.InterruptAsync(ct); } public async Task StopAsync(string taskId, CancellationToken ct) { // StopAsync removes from registry and kills the session. // The exit watcher will fire InteractiveSessionEnded once the process exits, // so we don't broadcast here — the watcher is the single authoritative source. await _registry.StopAsync(taskId); } private static string BuildInteractivePrompt(TaskEntity task) { var sb = new StringBuilder(); sb.AppendLine($"# Task: {task.Title}"); if (!string.IsNullOrWhiteSpace(task.Description)) { sb.AppendLine(); sb.AppendLine(task.Description); } return sb.ToString(); } }