diff --git a/src/ClaudeDo.Worker/Hub/HubBroadcaster.cs b/src/ClaudeDo.Worker/Hub/HubBroadcaster.cs index 0bbb8f8..a311f7d 100644 --- a/src/ClaudeDo.Worker/Hub/HubBroadcaster.cs +++ b/src/ClaudeDo.Worker/Hub/HubBroadcaster.cs @@ -77,4 +77,10 @@ public sealed class HubBroadcaster : IPrimeBroadcaster, IRefineBroadcaster Task IRefineBroadcaster.RefineStartedAsync(string taskId) => RefineStarted(taskId); Task IRefineBroadcaster.RefineFinishedAsync(string taskId, bool success, string? error) => RefineFinished(taskId, success, error); + + public Task InteractiveSessionStarted(string taskId) => + _hub.Clients.All.SendAsync("InteractiveSessionStarted", taskId); + + public Task InteractiveSessionEnded(string taskId) => + _hub.Clients.All.SendAsync("InteractiveSessionEnded", taskId); } diff --git a/src/ClaudeDo.Worker/Hub/WorkerHub.cs b/src/ClaudeDo.Worker/Hub/WorkerHub.cs index 2864fb8..7e8a592 100644 --- a/src/ClaudeDo.Worker/Hub/WorkerHub.cs +++ b/src/ClaudeDo.Worker/Hub/WorkerHub.cs @@ -9,6 +9,7 @@ using ClaudeDo.Worker.Lifecycle; using ClaudeDo.Worker.Logging; using ClaudeDo.Worker.Online; using ClaudeDo.Worker.Planning; +using ClaudeDo.Worker.Runner; using ClaudeDo.Worker.Prime; using ClaudeDo.Worker.Queue; using ClaudeDo.Worker.Refine; @@ -116,6 +117,7 @@ public sealed class WorkerHub : Microsoft.AspNetCore.SignalR.Hub private readonly OnlineInboxConfig _onlineInboxConfig; private readonly OnlineTokenStore _onlineTokenStore; private readonly Runner.PendingQuestionRegistry _pendingQuestions; + private readonly InteractiveSessionService _interactive; private readonly LogRingBuffer? _logBuffer; public WorkerHub( @@ -142,6 +144,7 @@ public sealed class WorkerHub : Microsoft.AspNetCore.SignalR.Hub OnlineInboxConfig onlineInboxConfig, OnlineTokenStore onlineTokenStore, Runner.PendingQuestionRegistry pendingQuestions, + InteractiveSessionService interactive, LogRingBuffer? logBuffer = null) { _queue = queue; @@ -167,6 +170,7 @@ public sealed class WorkerHub : Microsoft.AspNetCore.SignalR.Hub _onlineInboxConfig = onlineInboxConfig; _onlineTokenStore = onlineTokenStore; _pendingQuestions = pendingQuestions; + _interactive = interactive; _logBuffer = logBuffer; } @@ -568,11 +572,14 @@ public sealed class WorkerHub : Microsoft.AspNetCore.SignalR.Hub return ctx; } - public async Task OpenInteractiveTerminalAsync(string taskId) - { - var ctx = await _planning.OpenInteractiveAsync(taskId, Context.ConnectionAborted); - await _launcher.LaunchInteractiveAsync(ctx, Context.ConnectionAborted); - } + public Task OpenInteractiveTerminalAsync(string taskId) => + _interactive.StartAsync(taskId, Context.ConnectionAborted); + + public Task SendInteractiveMessage(string taskId, string text) => + _interactive.SendAsync(taskId, text, Context.ConnectionAborted); + + public Task StopInteractiveSession(string taskId) => + _interactive.StopAsync(taskId, Context.ConnectionAborted); public async Task DiscardPlanningSessionAsync(string taskId, bool dequeueQueuedChildren = false) { diff --git a/src/ClaudeDo.Worker/Planning/InteractiveSessionService.cs b/src/ClaudeDo.Worker/Planning/InteractiveSessionService.cs new file mode 100644 index 0000000..3d5b6f7 --- /dev/null +++ b/src/ClaudeDo.Worker/Planning/InteractiveSessionService.cs @@ -0,0 +1,147 @@ +using System.Text; +using ClaudeDo.Data; +using ClaudeDo.Data.Models; +using ClaudeDo.Data.Repositories; +using ClaudeDo.Worker.Config; +using ClaudeDo.Worker.Hub; +using ClaudeDo.Worker.Runner; +using ClaudeDo.Worker.Runner.Interfaces; +using Microsoft.EntityFrameworkCore; + +namespace ClaudeDo.Worker.Planning; + +public sealed class InteractiveSessionService +{ + private readonly IDbContextFactory _dbFactory; + private readonly WorkerConfig _cfg; + private readonly HubBroadcaster _broadcaster; + private readonly LiveSessionRegistry _registry; + private readonly ILoggerFactory _loggerFactory; + + // Optional factory for tests. Signature: (onLine) -> (session, waitForExitTask). + // The waitForExitTask completes when the underlying process has exited. + private readonly Func, Func, (ILiveSession session, Task exitTask)>? _sessionFactory; + + public InteractiveSessionService( + IDbContextFactory dbFactory, + WorkerConfig cfg, + HubBroadcaster broadcaster, + LiveSessionRegistry registry, + ILoggerFactory loggerFactory, + Func, Func, (ILiveSession session, Task exitTask)>? sessionFactory = null) + { + _dbFactory = dbFactory; + _cfg = cfg; + _broadcaster = broadcaster; + _registry = registry; + _loggerFactory = loggerFactory; + _sessionFactory = sessionFactory; + } + + public async Task StartAsync(string taskId, CancellationToken ct) + { + if (_registry.TryGet(taskId, out _)) + throw new InvalidOperationException("An interactive session is already running for this task."); + + await using var ctx = _dbFactory.CreateDbContext(); + var tasks = new TaskRepository(ctx); + var lists = new ListRepository(ctx); + + var task = await tasks.GetByIdAsync(taskId, ct) + ?? throw new InvalidOperationException($"Task {taskId} not found."); + var list = await lists.GetByIdAsync(task.ListId, ct) + ?? throw new InvalidOperationException($"List {task.ListId} not found."); + + var workingDir = list.WorkingDir; + if (string.IsNullOrWhiteSpace(workingDir) || !Directory.Exists(workingDir)) + throw new InvalidOperationException( + $"List '{list.Name}' has no valid working directory configured."); + + var seededPrompt = BuildInteractivePrompt(task); + + var args = new[] + { + "-p", + "--input-format", "stream-json", + "--output-format", "stream-json", + "--verbose", + "--replay-user-messages", + "--model", ModelRegistry.PlanningAlias, + "--permission-mode", "auto", + }; + + Func onLine = line => _broadcaster.TaskMessage(taskId, "[stdout] " + line); + + ILiveSession session; + Task exitTask; + + if (_sessionFactory is not null) + { + // Factory is responsible for providing a ready-to-use session and its exit signal. + (session, exitTask) = _sessionFactory(workingDir, args, onLine); + } + else + { + var transport = new ProcessClaudeStreamTransport( + _cfg, + _loggerFactory.CreateLogger()); + var streamingSession = new StreamingClaudeSession( + transport, + onLine, + _loggerFactory.CreateLogger()); + await streamingSession.StartAsync(args, workingDir, seededPrompt, ct); + session = streamingSession; + exitTask = transport.WaitForExitAsync(); + } + _registry.Register(taskId, session); + await _broadcaster.InteractiveSessionStarted(taskId); + + var logger = _loggerFactory.CreateLogger(); + _ = WatchExitAsync(taskId, exitTask, logger); + } + + private async Task WatchExitAsync(string taskId, Task exitTask, ILogger logger) + { + try + { + await exitTask; + } + catch (Exception ex) + { + logger.LogWarning(ex, "Interactive session exit watcher caught an exception for task {task_id}", taskId); + } + finally + { + _registry.Unregister(taskId); + try { await _broadcaster.InteractiveSessionEnded(taskId); } + catch (Exception ex) { logger.LogWarning(ex, "InteractiveSessionEnded broadcast failed for task {task_id}", taskId); } + } + } + + public async Task SendAsync(string taskId, string text, CancellationToken ct) + { + if (!_registry.TryGet(taskId, out var session)) + throw new InvalidOperationException("No interactive session is running for this task."); + await session.SendUserMessageAsync(text, ct); + } + + public async Task StopAsync(string taskId, CancellationToken ct) + { + // StopAsync removes from registry and kills the session. + // The exit watcher will fire InteractiveSessionEnded once the process exits, + // so we don't broadcast here — the watcher is the single authoritative source. + await _registry.StopAsync(taskId); + } + + private static string BuildInteractivePrompt(TaskEntity task) + { + var sb = new StringBuilder(); + sb.AppendLine($"# Task: {task.Title}"); + if (!string.IsNullOrWhiteSpace(task.Description)) + { + sb.AppendLine(); + sb.AppendLine(task.Description); + } + return sb.ToString(); + } +} diff --git a/src/ClaudeDo.Worker/Planning/Interfaces/ITerminalLauncher.cs b/src/ClaudeDo.Worker/Planning/Interfaces/ITerminalLauncher.cs index 2700e07..3b93832 100644 --- a/src/ClaudeDo.Worker/Planning/Interfaces/ITerminalLauncher.cs +++ b/src/ClaudeDo.Worker/Planning/Interfaces/ITerminalLauncher.cs @@ -1,13 +1,12 @@ namespace ClaudeDo.Worker.Planning; -// Launches the Claude CLI in a visible terminal for human-driven sessions: -// planning (start/resume) and the ad-hoc "Run interactively" action. Not used for -// headless task execution (that path is ClaudeProcess, prompt over stdin). +// Launches the Claude CLI in a visible terminal for human-driven planning sessions. +// Not used for headless task execution (that path is ClaudeProcess, prompt over stdin) +// nor for interactive sessions (those use InteractiveSessionService + StreamingClaudeSession). public interface ITerminalLauncher { Task LaunchPlanningStartAsync(PlanningSessionStartContext ctx, CancellationToken cancellationToken); Task LaunchPlanningResumeAsync(PlanningSessionResumeContext ctx, CancellationToken cancellationToken); - Task LaunchInteractiveAsync(InteractiveLaunchContext ctx, CancellationToken cancellationToken); } public sealed class TerminalLaunchException : Exception diff --git a/src/ClaudeDo.Worker/Planning/PlanningSessionContext.cs b/src/ClaudeDo.Worker/Planning/PlanningSessionContext.cs index e183665..3bd8628 100644 --- a/src/ClaudeDo.Worker/Planning/PlanningSessionContext.cs +++ b/src/ClaudeDo.Worker/Planning/PlanningSessionContext.cs @@ -5,11 +5,6 @@ public sealed record PlanningSessionFiles( string SystemPromptPath, string InitialPromptPath); -public sealed record InteractiveLaunchContext( - string TaskId, - string WorkingDir, - string InitialPrompt); - public sealed class PlanningMcpContext { public required string ParentTaskId { get; init; } diff --git a/src/ClaudeDo.Worker/Planning/PlanningSessionManager.cs b/src/ClaudeDo.Worker/Planning/PlanningSessionManager.cs index 7a56ca3..6dc5864 100644 --- a/src/ClaudeDo.Worker/Planning/PlanningSessionManager.cs +++ b/src/ClaudeDo.Worker/Planning/PlanningSessionManager.cs @@ -1,5 +1,4 @@ using System.Security.Cryptography; -using System.Text; using System.Text.Json; using ClaudeDo.Data; using ClaudeDo.Data.Git; @@ -161,40 +160,6 @@ public sealed class PlanningSessionManager Files: files); } - public async Task OpenInteractiveAsync(string taskId, CancellationToken ct) - { - var (tasks, lists, _, ctx) = CreateRepos(); - await using var __ = ctx; - - var task = await tasks.GetByIdAsync(taskId, ct) - ?? throw new InvalidOperationException($"Task {taskId} not found."); - - var list = await lists.GetByIdAsync(task.ListId, ct) - ?? throw new InvalidOperationException($"List {task.ListId} not found."); - - var workingDir = list.WorkingDir; - if (string.IsNullOrWhiteSpace(workingDir) || !Directory.Exists(workingDir)) - throw new InvalidOperationException( - $"List '{list.Name}' has no valid working directory configured."); - - return new InteractiveLaunchContext( - TaskId: taskId, - WorkingDir: workingDir, - InitialPrompt: BuildInteractivePrompt(task)); - } - - private static string BuildInteractivePrompt(TaskEntity task) - { - var sb = new StringBuilder(); - sb.AppendLine($"# Task: {task.Title}"); - if (!string.IsNullOrWhiteSpace(task.Description)) - { - sb.AppendLine(); - sb.AppendLine(task.Description); - } - return sb.ToString(); - } - public async Task FinalizeAsync(string taskId, bool queueAgentTasks, CancellationToken ct) { var (tasks, lists, settings, ctx) = CreateRepos(); diff --git a/src/ClaudeDo.Worker/Planning/WindowsTerminalLauncher.cs b/src/ClaudeDo.Worker/Planning/WindowsTerminalLauncher.cs index b8b4e4f..a162869 100644 --- a/src/ClaudeDo.Worker/Planning/WindowsTerminalLauncher.cs +++ b/src/ClaudeDo.Worker/Planning/WindowsTerminalLauncher.cs @@ -74,29 +74,6 @@ public sealed class WindowsTerminalLauncher : ITerminalLauncher return Task.CompletedTask; } - public Task LaunchInteractiveAsync(InteractiveLaunchContext ctx, CancellationToken cancellationToken) - { - if (!Directory.Exists(ctx.WorkingDir)) - throw new TerminalLaunchException($"Working directory does not exist: {ctx.WorkingDir}"); - - var resolvedWt = ResolveWtOrThrow(); - var resolvedClaude = ResolveClaudeOrThrow(); - - var command = BuildPwshCommand(resolvedClaude, new[] - { - "--model", Model, - "--permission-mode", "auto", - }, appendPrompt: true); - - StartInWindowsTerminal(resolvedWt, ctx.WorkingDir, command, env => - { - env["MAX_THINKING_TOKENS"] = "20000"; - env[PromptEnvVar] = ctx.InitialPrompt; - }); - - return Task.CompletedTask; - } - public Task LaunchPlanningResumeAsync(PlanningSessionResumeContext ctx, CancellationToken cancellationToken) { if (!Directory.Exists(ctx.WorkingDir)) diff --git a/src/ClaudeDo.Worker/Program.cs b/src/ClaudeDo.Worker/Program.cs index 3c4270e..29f4c48 100644 --- a/src/ClaudeDo.Worker/Program.cs +++ b/src/ClaudeDo.Worker/Program.cs @@ -80,6 +80,8 @@ builder.Services.AddSingleton(); builder.Services.AddSingleton(); builder.Services.AddSingleton(); builder.Services.AddSingleton(); +builder.Services.AddSingleton(); +builder.Services.AddSingleton(); // Queue dispatch primitives. QueueWaker holds the wake semaphore; the queue picker // performs atomic Queued→Running claim. Both injected into the state service so it diff --git a/tests/ClaudeDo.Worker.Tests/Hub/ClearMyDayHubTests.cs b/tests/ClaudeDo.Worker.Tests/Hub/ClearMyDayHubTests.cs index fbbd52a..7e4ac85 100644 --- a/tests/ClaudeDo.Worker.Tests/Hub/ClearMyDayHubTests.cs +++ b/tests/ClaudeDo.Worker.Tests/Hub/ClearMyDayHubTests.cs @@ -21,7 +21,7 @@ public sealed class ClearMyDayHubTests : IDisposable null!, null!, null!, null!, broadcaster, _db.CreateFactory(), null!, null!, null!, null!, null!, null!, null!, null!, null!, null!, null!, null!, null!, null!, new ClaudeDo.Worker.Online.OnlineInboxConfig(), new ClaudeDo.Worker.Online.OnlineTokenStore(), - new ClaudeDo.Worker.Runner.PendingQuestionRegistry()); + new ClaudeDo.Worker.Runner.PendingQuestionRegistry(), null!); hub.Clients = new FakeHubCallerClients(new RecordingClientProxy()); hub.Context = new FakeHubCallerContext(); return hub; diff --git a/tests/ClaudeDo.Worker.Tests/Hub/OnlineInboxHubTests.cs b/tests/ClaudeDo.Worker.Tests/Hub/OnlineInboxHubTests.cs index 9d7c406..c654fd1 100644 --- a/tests/ClaudeDo.Worker.Tests/Hub/OnlineInboxHubTests.cs +++ b/tests/ClaudeDo.Worker.Tests/Hub/OnlineInboxHubTests.cs @@ -31,7 +31,7 @@ public sealed class OnlineInboxHubTests : IDisposable var hub = new WorkerHub( null!, null!, null!, null!, broadcaster, null!, null!, null!, null!, null!, null!, null!, null!, null!, null!, null!, null!, null!, null!, - cfg, inboxCfg, store, new ClaudeDo.Worker.Runner.PendingQuestionRegistry()); + cfg, inboxCfg, store, new ClaudeDo.Worker.Runner.PendingQuestionRegistry(), null!); hub.Clients = new FakeHubCallerClients(new RecordingClientProxy()); hub.Context = new FakeHubCallerContext(); return (hub, inboxCfg, store); diff --git a/tests/ClaudeDo.Worker.Tests/Hub/PlanningHubTests.cs b/tests/ClaudeDo.Worker.Tests/Hub/PlanningHubTests.cs index eca57bd..ff4c6b5 100644 --- a/tests/ClaudeDo.Worker.Tests/Hub/PlanningHubTests.cs +++ b/tests/ClaudeDo.Worker.Tests/Hub/PlanningHubTests.cs @@ -57,7 +57,7 @@ public sealed class PlanningHubTests : IDisposable null!, null!, null!, null!, null!, null!, null!, null!, null!, _planning, _launcher, null!, null!, null!, null!, null!, null!, null!, null!, null!, new ClaudeDo.Worker.Online.OnlineInboxConfig(), new ClaudeDo.Worker.Online.OnlineTokenStore(), - new ClaudeDo.Worker.Runner.PendingQuestionRegistry()); + new ClaudeDo.Worker.Runner.PendingQuestionRegistry(), null!); hub.Clients = new FakeHubCallerClients(_proxy); hub.Context = new FakeHubCallerContext(); return hub; @@ -179,7 +179,6 @@ internal sealed class FakeTerminalLauncher : ITerminalLauncher public bool ShouldThrow { get; set; } public int LaunchStartCalls { get; private set; } public int LaunchResumeCalls { get; private set; } - public int LaunchInteractiveCalls { get; private set; } public Task LaunchPlanningStartAsync(PlanningSessionStartContext ctx, CancellationToken cancellationToken) { @@ -193,13 +192,6 @@ internal sealed class FakeTerminalLauncher : ITerminalLauncher LaunchResumeCalls++; return Task.CompletedTask; } - - public Task LaunchInteractiveAsync(InteractiveLaunchContext ctx, CancellationToken cancellationToken) - { - if (ShouldThrow) throw new TerminalLaunchException("fake launch failure"); - LaunchInteractiveCalls++; - return Task.CompletedTask; - } } internal sealed class RecordingClientProxy : IClientProxy diff --git a/tests/ClaudeDo.Worker.Tests/Hub/WorktreeStateHubTests.cs b/tests/ClaudeDo.Worker.Tests/Hub/WorktreeStateHubTests.cs index a46be04..ee80390 100644 --- a/tests/ClaudeDo.Worker.Tests/Hub/WorktreeStateHubTests.cs +++ b/tests/ClaudeDo.Worker.Tests/Hub/WorktreeStateHubTests.cs @@ -21,7 +21,7 @@ public sealed class WorktreeStateHubTests : IDisposable null!, null!, null!, null!, broadcaster, _db.CreateFactory(), null!, null!, null!, null!, null!, null!, null!, null!, null!, null!, null!, null!, null!, null!, new ClaudeDo.Worker.Online.OnlineInboxConfig(), new ClaudeDo.Worker.Online.OnlineTokenStore(), - new ClaudeDo.Worker.Runner.PendingQuestionRegistry()); + new ClaudeDo.Worker.Runner.PendingQuestionRegistry(), null!); hub.Clients = new FakeHubCallerClients(new RecordingClientProxy()); hub.Context = new FakeHubCallerContext(); return hub; diff --git a/tests/ClaudeDo.Worker.Tests/Planning/InteractiveSessionServiceTests.cs b/tests/ClaudeDo.Worker.Tests/Planning/InteractiveSessionServiceTests.cs new file mode 100644 index 0000000..80da00a --- /dev/null +++ b/tests/ClaudeDo.Worker.Tests/Planning/InteractiveSessionServiceTests.cs @@ -0,0 +1,301 @@ +using ClaudeDo.Data; +using ClaudeDo.Data.Models; +using ClaudeDo.Data.Repositories; +using ClaudeDo.Worker.Config; +using ClaudeDo.Worker.Hub; +using ClaudeDo.Worker.Planning; +using ClaudeDo.Worker.Runner; +using ClaudeDo.Worker.Runner.Interfaces; +using ClaudeDo.Worker.Tests.Infrastructure; +using Microsoft.Extensions.Logging.Abstractions; +using TaskStatus = ClaudeDo.Data.Models.TaskStatus; + +namespace ClaudeDo.Worker.Tests.Planning; + +public sealed class InteractiveSessionServiceTests : IDisposable +{ + private readonly DbFixture _db = new(); + private readonly ClaudeDoDbContext _ctx; + private readonly TaskRepository _tasks; + private readonly ListRepository _lists; + private readonly CapturingHubContext _hubCtx; + private readonly HubBroadcaster _broadcaster; + private readonly LiveSessionRegistry _registry; + private readonly WorkerConfig _cfg; + + public InteractiveSessionServiceTests() + { + _ctx = _db.CreateContext(); + _tasks = new TaskRepository(_ctx); + _lists = new ListRepository(_ctx); + _hubCtx = new CapturingHubContext(); + _broadcaster = new HubBroadcaster(_hubCtx); + _registry = new LiveSessionRegistry(); + _cfg = new WorkerConfig(); + } + + public void Dispose() + { + _ctx.Dispose(); + _db.Dispose(); + } + + private InteractiveSessionService CreateService( + Func, Func, (ILiveSession session, Task exitTask)>? factory = null) + { + return new InteractiveSessionService( + _db.CreateFactory(), + _cfg, + _broadcaster, + _registry, + NullLoggerFactory.Instance, + factory); + } + + private async Task<(string listId, string taskId, string workingDir)> SeedAsync() + { + var wd = Path.Combine(Path.GetTempPath(), $"iss_wd_{Guid.NewGuid():N}"); + Directory.CreateDirectory(wd); + var listId = Guid.NewGuid().ToString(); + await _lists.AddAsync(new ListEntity + { + Id = listId, + Name = "L", + WorkingDir = wd, + CreatedAt = DateTime.UtcNow, + }); + var task = new TaskEntity + { + Id = Guid.NewGuid().ToString(), + ListId = listId, + Title = "My task", + Description = "Do the thing", + Status = TaskStatus.Idle, + CreatedAt = DateTime.UtcNow, + CommitType = "feat", + }; + await _tasks.AddAsync(task); + return (listId, task.Id, wd); + } + + [Fact] + public async Task StartAsync_MissingWorkingDir_Throws() + { + var listId = Guid.NewGuid().ToString(); + await _lists.AddAsync(new ListEntity + { + Id = listId, + Name = "NoDir", + WorkingDir = "/no/such/dir/ever/exists", + CreatedAt = DateTime.UtcNow, + }); + var task = new TaskEntity + { + Id = Guid.NewGuid().ToString(), + ListId = listId, + Title = "T", + Status = TaskStatus.Idle, + CreatedAt = DateTime.UtcNow, + CommitType = "feat", + }; + await _tasks.AddAsync(task); + + var svc = CreateService(); + await Assert.ThrowsAsync( + () => svc.StartAsync(task.Id, CancellationToken.None)); + } + + [Fact] + public async Task StartAsync_NullWorkingDir_Throws() + { + var listId = Guid.NewGuid().ToString(); + await _lists.AddAsync(new ListEntity + { + Id = listId, + Name = "NullDir", + WorkingDir = null, + CreatedAt = DateTime.UtcNow, + }); + var task = new TaskEntity + { + Id = Guid.NewGuid().ToString(), + ListId = listId, + Title = "T", + Status = TaskStatus.Idle, + CreatedAt = DateTime.UtcNow, + CommitType = "feat", + }; + await _tasks.AddAsync(task); + + var svc = CreateService(); + await Assert.ThrowsAsync( + () => svc.StartAsync(task.Id, CancellationToken.None)); + } + + [Fact] + public async Task StartAsync_RegistersSessionAndBroadcastsStarted() + { + var (_, taskId, _) = await SeedAsync(); + var fakeSession = new FakeLiveSession(); + var exitTcs = new TaskCompletionSource(); + + var svc = CreateService((_, __, ___) => (fakeSession, exitTcs.Task)); + await svc.StartAsync(taskId, CancellationToken.None); + + // Session registered + Assert.True(_registry.TryGet(taskId, out var registered)); + Assert.Same(fakeSession, registered); + + // InteractiveSessionStarted broadcast + Assert.Contains(_hubCtx.Proxy.Calls, c => c.Method == "InteractiveSessionStarted"); + + // Cleanup + exitTcs.SetResult(true); + await Task.Delay(50); // let watcher fire + } + + [Fact] + public async Task StartAsync_AlreadyRunning_Throws() + { + var (_, taskId, _) = await SeedAsync(); + var fakeSession = new FakeLiveSession(); + var exitTcs = new TaskCompletionSource(); + + var svc = CreateService((_, __, ___) => (fakeSession, exitTcs.Task)); + await svc.StartAsync(taskId, CancellationToken.None); + + await Assert.ThrowsAsync( + () => svc.StartAsync(taskId, CancellationToken.None)); + + exitTcs.SetResult(true); + await Task.Delay(50); + } + + [Fact] + public async Task ExitWatcher_UnregistersAndBroadcastsEnded() + { + var (_, taskId, _) = await SeedAsync(); + var fakeSession = new FakeLiveSession(); + var exitTcs = new TaskCompletionSource(); + + var svc = CreateService((_, __, ___) => (fakeSession, exitTcs.Task)); + await svc.StartAsync(taskId, CancellationToken.None); + + // Process exits naturally + exitTcs.SetResult(true); + + // Give the watcher time to run + var deadline = DateTime.UtcNow.AddSeconds(2); + while (DateTime.UtcNow < deadline) + { + if (_registry.TryGet(taskId, out _) == false) break; + await Task.Delay(10); + } + + Assert.False(_registry.TryGet(taskId, out _)); + Assert.Contains(_hubCtx.Proxy.Calls, c => c.Method == "InteractiveSessionEnded"); + } + + [Fact] + public async Task SendAsync_RoutesToSession() + { + var (_, taskId, _) = await SeedAsync(); + var fakeSession = new FakeLiveSession(); + var exitTcs = new TaskCompletionSource(); + + var svc = CreateService((_, __, ___) => (fakeSession, exitTcs.Task)); + await svc.StartAsync(taskId, CancellationToken.None); + + await svc.SendAsync(taskId, "hello", CancellationToken.None); + + Assert.Equal(1, fakeSession.SendCalls); + Assert.Equal("hello", fakeSession.LastSentText); + + exitTcs.SetResult(true); + await Task.Delay(50); + } + + [Fact] + public async Task SendAsync_NoSession_Throws() + { + var svc = CreateService(); + await Assert.ThrowsAsync( + () => svc.SendAsync("nonexistent-task", "text", CancellationToken.None)); + } + + [Fact] + public async Task StopAsync_UnregistersSessionAndStopsIt() + { + var (_, taskId, _) = await SeedAsync(); + var fakeSession = new FakeLiveSession(); + // Keep the exit task pending so the exit watcher doesn't race with StopAsync. + var exitTcs = new TaskCompletionSource(); + + var svc = CreateService((_, __, ___) => (fakeSession, exitTcs.Task)); + await svc.StartAsync(taskId, CancellationToken.None); + + // Stop before the process exits naturally. + await svc.StopAsync(taskId, CancellationToken.None); + + // Registry is cleared by StopAsync (which calls _registry.StopAsync -> session.StopAsync + TryRemove). + Assert.False(_registry.TryGet(taskId, out _)); + Assert.True(fakeSession.Stopped); + + // Let the watcher complete harmlessly. + exitTcs.SetResult(true); + await Task.Delay(50); + } + + [Fact] + public async Task OnLineCallback_BroadcastsTaskMessageWithPrefix() + { + var (_, taskId, _) = await SeedAsync(); + Func? capturedOnLine = null; + var fakeSession = new FakeLiveSession(); + var exitTcs = new TaskCompletionSource(); + + var svc = CreateService((_, __, onLine) => + { + capturedOnLine = onLine; + return (fakeSession, exitTcs.Task); + }); + + await svc.StartAsync(taskId, CancellationToken.None); + + Assert.NotNull(capturedOnLine); + await capturedOnLine!("some line"); + + Assert.Contains(_hubCtx.Proxy.Calls, c => + c.Method == "TaskMessage" && + c.Args.Length >= 2 && + c.Args[1] is string s && s.StartsWith("[stdout] ")); + + exitTcs.SetResult(true); + await Task.Delay(50); + } +} + +internal sealed class FakeLiveSession : ILiveSession +{ + public bool IsTurnInFlight => false; + public int SendCalls { get; private set; } + public string? LastSentText { get; private set; } + public bool Stopped { get; private set; } + + public Task SendUserMessageAsync(string text, CancellationToken ct) + { + SendCalls++; + LastSentText = text; + return Task.CompletedTask; + } + + public Task InterruptAsync(CancellationToken ct) => Task.CompletedTask; + + public Task StopAsync() + { + Stopped = true; + return Task.CompletedTask; + } + + public ValueTask DisposeAsync() => ValueTask.CompletedTask; +}