feat(worker): in-app interactive session service, replacing the wt terminal launch

InteractiveSessionService resolves a task's list working dir + seeded prompt,
spawns a StreamingClaudeSession (claude stream-json in the list dir, model+auto
as before), registers it in LiveSessionRegistry, streams output over TaskMessage,
and broadcasts InteractiveSessionStarted/Ended (an exit watcher fires Ended). The
hub's OpenInteractiveTerminalAsync now starts this in-app session; SendInteractiveMessage
and StopInteractiveSession route to it. The external Windows-Terminal interactive
launch (LaunchInteractiveAsync / InteractiveLaunchContext / OpenInteractiveAsync) is
removed; planning sessions keep their terminal launch.
This commit is contained in:
Mika Kuns
2026-06-26 09:17:11 +02:00
parent d8a043fae7
commit 30e87e698e
13 changed files with 475 additions and 84 deletions

View File

@@ -77,4 +77,10 @@ public sealed class HubBroadcaster : IPrimeBroadcaster, IRefineBroadcaster
Task IRefineBroadcaster.RefineStartedAsync(string taskId) => RefineStarted(taskId); Task IRefineBroadcaster.RefineStartedAsync(string taskId) => RefineStarted(taskId);
Task IRefineBroadcaster.RefineFinishedAsync(string taskId, bool success, string? error) => Task IRefineBroadcaster.RefineFinishedAsync(string taskId, bool success, string? error) =>
RefineFinished(taskId, success, error); RefineFinished(taskId, success, error);
public Task InteractiveSessionStarted(string taskId) =>
_hub.Clients.All.SendAsync("InteractiveSessionStarted", taskId);
public Task InteractiveSessionEnded(string taskId) =>
_hub.Clients.All.SendAsync("InteractiveSessionEnded", taskId);
} }

View File

@@ -9,6 +9,7 @@ using ClaudeDo.Worker.Lifecycle;
using ClaudeDo.Worker.Logging; using ClaudeDo.Worker.Logging;
using ClaudeDo.Worker.Online; using ClaudeDo.Worker.Online;
using ClaudeDo.Worker.Planning; using ClaudeDo.Worker.Planning;
using ClaudeDo.Worker.Runner;
using ClaudeDo.Worker.Prime; using ClaudeDo.Worker.Prime;
using ClaudeDo.Worker.Queue; using ClaudeDo.Worker.Queue;
using ClaudeDo.Worker.Refine; using ClaudeDo.Worker.Refine;
@@ -116,6 +117,7 @@ public sealed class WorkerHub : Microsoft.AspNetCore.SignalR.Hub
private readonly OnlineInboxConfig _onlineInboxConfig; private readonly OnlineInboxConfig _onlineInboxConfig;
private readonly OnlineTokenStore _onlineTokenStore; private readonly OnlineTokenStore _onlineTokenStore;
private readonly Runner.PendingQuestionRegistry _pendingQuestions; private readonly Runner.PendingQuestionRegistry _pendingQuestions;
private readonly InteractiveSessionService _interactive;
private readonly LogRingBuffer? _logBuffer; private readonly LogRingBuffer? _logBuffer;
public WorkerHub( public WorkerHub(
@@ -142,6 +144,7 @@ public sealed class WorkerHub : Microsoft.AspNetCore.SignalR.Hub
OnlineInboxConfig onlineInboxConfig, OnlineInboxConfig onlineInboxConfig,
OnlineTokenStore onlineTokenStore, OnlineTokenStore onlineTokenStore,
Runner.PendingQuestionRegistry pendingQuestions, Runner.PendingQuestionRegistry pendingQuestions,
InteractiveSessionService interactive,
LogRingBuffer? logBuffer = null) LogRingBuffer? logBuffer = null)
{ {
_queue = queue; _queue = queue;
@@ -167,6 +170,7 @@ public sealed class WorkerHub : Microsoft.AspNetCore.SignalR.Hub
_onlineInboxConfig = onlineInboxConfig; _onlineInboxConfig = onlineInboxConfig;
_onlineTokenStore = onlineTokenStore; _onlineTokenStore = onlineTokenStore;
_pendingQuestions = pendingQuestions; _pendingQuestions = pendingQuestions;
_interactive = interactive;
_logBuffer = logBuffer; _logBuffer = logBuffer;
} }
@@ -568,11 +572,14 @@ public sealed class WorkerHub : Microsoft.AspNetCore.SignalR.Hub
return ctx; return ctx;
} }
public async Task OpenInteractiveTerminalAsync(string taskId) public Task OpenInteractiveTerminalAsync(string taskId) =>
{ _interactive.StartAsync(taskId, Context.ConnectionAborted);
var ctx = await _planning.OpenInteractiveAsync(taskId, Context.ConnectionAborted);
await _launcher.LaunchInteractiveAsync(ctx, Context.ConnectionAborted); public Task SendInteractiveMessage(string taskId, string text) =>
} _interactive.SendAsync(taskId, text, Context.ConnectionAborted);
public Task StopInteractiveSession(string taskId) =>
_interactive.StopAsync(taskId, Context.ConnectionAborted);
public async Task<DiscardPlanningOutcome> DiscardPlanningSessionAsync(string taskId, bool dequeueQueuedChildren = false) public async Task<DiscardPlanningOutcome> DiscardPlanningSessionAsync(string taskId, bool dequeueQueuedChildren = false)
{ {

View File

@@ -0,0 +1,147 @@
using System.Text;
using ClaudeDo.Data;
using ClaudeDo.Data.Models;
using ClaudeDo.Data.Repositories;
using ClaudeDo.Worker.Config;
using ClaudeDo.Worker.Hub;
using ClaudeDo.Worker.Runner;
using ClaudeDo.Worker.Runner.Interfaces;
using Microsoft.EntityFrameworkCore;
namespace ClaudeDo.Worker.Planning;
public sealed class InteractiveSessionService
{
private readonly IDbContextFactory<ClaudeDoDbContext> _dbFactory;
private readonly WorkerConfig _cfg;
private readonly HubBroadcaster _broadcaster;
private readonly LiveSessionRegistry _registry;
private readonly ILoggerFactory _loggerFactory;
// Optional factory for tests. Signature: (onLine) -> (session, waitForExitTask).
// The waitForExitTask completes when the underlying process has exited.
private readonly Func<string, IReadOnlyList<string>, Func<string, Task>, (ILiveSession session, Task exitTask)>? _sessionFactory;
public InteractiveSessionService(
IDbContextFactory<ClaudeDoDbContext> dbFactory,
WorkerConfig cfg,
HubBroadcaster broadcaster,
LiveSessionRegistry registry,
ILoggerFactory loggerFactory,
Func<string, IReadOnlyList<string>, Func<string, Task>, (ILiveSession session, Task exitTask)>? sessionFactory = null)
{
_dbFactory = dbFactory;
_cfg = cfg;
_broadcaster = broadcaster;
_registry = registry;
_loggerFactory = loggerFactory;
_sessionFactory = sessionFactory;
}
public async Task StartAsync(string taskId, CancellationToken ct)
{
if (_registry.TryGet(taskId, out _))
throw new InvalidOperationException("An interactive session is already running for this task.");
await using var ctx = _dbFactory.CreateDbContext();
var tasks = new TaskRepository(ctx);
var lists = new ListRepository(ctx);
var task = await tasks.GetByIdAsync(taskId, ct)
?? throw new InvalidOperationException($"Task {taskId} not found.");
var list = await lists.GetByIdAsync(task.ListId, ct)
?? throw new InvalidOperationException($"List {task.ListId} not found.");
var workingDir = list.WorkingDir;
if (string.IsNullOrWhiteSpace(workingDir) || !Directory.Exists(workingDir))
throw new InvalidOperationException(
$"List '{list.Name}' has no valid working directory configured.");
var seededPrompt = BuildInteractivePrompt(task);
var args = new[]
{
"-p",
"--input-format", "stream-json",
"--output-format", "stream-json",
"--verbose",
"--replay-user-messages",
"--model", ModelRegistry.PlanningAlias,
"--permission-mode", "auto",
};
Func<string, Task> onLine = line => _broadcaster.TaskMessage(taskId, "[stdout] " + line);
ILiveSession session;
Task exitTask;
if (_sessionFactory is not null)
{
// Factory is responsible for providing a ready-to-use session and its exit signal.
(session, exitTask) = _sessionFactory(workingDir, args, onLine);
}
else
{
var transport = new ProcessClaudeStreamTransport(
_cfg,
_loggerFactory.CreateLogger<ProcessClaudeStreamTransport>());
var streamingSession = new StreamingClaudeSession(
transport,
onLine,
_loggerFactory.CreateLogger<StreamingClaudeSession>());
await streamingSession.StartAsync(args, workingDir, seededPrompt, ct);
session = streamingSession;
exitTask = transport.WaitForExitAsync();
}
_registry.Register(taskId, session);
await _broadcaster.InteractiveSessionStarted(taskId);
var logger = _loggerFactory.CreateLogger<InteractiveSessionService>();
_ = WatchExitAsync(taskId, exitTask, logger);
}
private async Task WatchExitAsync(string taskId, Task exitTask, ILogger logger)
{
try
{
await exitTask;
}
catch (Exception ex)
{
logger.LogWarning(ex, "Interactive session exit watcher caught an exception for task {task_id}", taskId);
}
finally
{
_registry.Unregister(taskId);
try { await _broadcaster.InteractiveSessionEnded(taskId); }
catch (Exception ex) { logger.LogWarning(ex, "InteractiveSessionEnded broadcast failed for task {task_id}", taskId); }
}
}
public async Task SendAsync(string taskId, string text, CancellationToken ct)
{
if (!_registry.TryGet(taskId, out var session))
throw new InvalidOperationException("No interactive session is running for this task.");
await session.SendUserMessageAsync(text, ct);
}
public async Task StopAsync(string taskId, CancellationToken ct)
{
// StopAsync removes from registry and kills the session.
// The exit watcher will fire InteractiveSessionEnded once the process exits,
// so we don't broadcast here — the watcher is the single authoritative source.
await _registry.StopAsync(taskId);
}
private static string BuildInteractivePrompt(TaskEntity task)
{
var sb = new StringBuilder();
sb.AppendLine($"# Task: {task.Title}");
if (!string.IsNullOrWhiteSpace(task.Description))
{
sb.AppendLine();
sb.AppendLine(task.Description);
}
return sb.ToString();
}
}

View File

@@ -1,13 +1,12 @@
namespace ClaudeDo.Worker.Planning; namespace ClaudeDo.Worker.Planning;
// Launches the Claude CLI in a visible terminal for human-driven sessions: // Launches the Claude CLI in a visible terminal for human-driven planning sessions.
// planning (start/resume) and the ad-hoc "Run interactively" action. Not used for // Not used for headless task execution (that path is ClaudeProcess, prompt over stdin)
// headless task execution (that path is ClaudeProcess, prompt over stdin). // nor for interactive sessions (those use InteractiveSessionService + StreamingClaudeSession).
public interface ITerminalLauncher public interface ITerminalLauncher
{ {
Task LaunchPlanningStartAsync(PlanningSessionStartContext ctx, CancellationToken cancellationToken); Task LaunchPlanningStartAsync(PlanningSessionStartContext ctx, CancellationToken cancellationToken);
Task LaunchPlanningResumeAsync(PlanningSessionResumeContext ctx, CancellationToken cancellationToken); Task LaunchPlanningResumeAsync(PlanningSessionResumeContext ctx, CancellationToken cancellationToken);
Task LaunchInteractiveAsync(InteractiveLaunchContext ctx, CancellationToken cancellationToken);
} }
public sealed class TerminalLaunchException : Exception public sealed class TerminalLaunchException : Exception

View File

@@ -5,11 +5,6 @@ public sealed record PlanningSessionFiles(
string SystemPromptPath, string SystemPromptPath,
string InitialPromptPath); string InitialPromptPath);
public sealed record InteractiveLaunchContext(
string TaskId,
string WorkingDir,
string InitialPrompt);
public sealed class PlanningMcpContext public sealed class PlanningMcpContext
{ {
public required string ParentTaskId { get; init; } public required string ParentTaskId { get; init; }

View File

@@ -1,5 +1,4 @@
using System.Security.Cryptography; using System.Security.Cryptography;
using System.Text;
using System.Text.Json; using System.Text.Json;
using ClaudeDo.Data; using ClaudeDo.Data;
using ClaudeDo.Data.Git; using ClaudeDo.Data.Git;
@@ -161,40 +160,6 @@ public sealed class PlanningSessionManager
Files: files); Files: files);
} }
public async Task<InteractiveLaunchContext> OpenInteractiveAsync(string taskId, CancellationToken ct)
{
var (tasks, lists, _, ctx) = CreateRepos();
await using var __ = ctx;
var task = await tasks.GetByIdAsync(taskId, ct)
?? throw new InvalidOperationException($"Task {taskId} not found.");
var list = await lists.GetByIdAsync(task.ListId, ct)
?? throw new InvalidOperationException($"List {task.ListId} not found.");
var workingDir = list.WorkingDir;
if (string.IsNullOrWhiteSpace(workingDir) || !Directory.Exists(workingDir))
throw new InvalidOperationException(
$"List '{list.Name}' has no valid working directory configured.");
return new InteractiveLaunchContext(
TaskId: taskId,
WorkingDir: workingDir,
InitialPrompt: BuildInteractivePrompt(task));
}
private static string BuildInteractivePrompt(TaskEntity task)
{
var sb = new StringBuilder();
sb.AppendLine($"# Task: {task.Title}");
if (!string.IsNullOrWhiteSpace(task.Description))
{
sb.AppendLine();
sb.AppendLine(task.Description);
}
return sb.ToString();
}
public async Task<int> FinalizeAsync(string taskId, bool queueAgentTasks, CancellationToken ct) public async Task<int> FinalizeAsync(string taskId, bool queueAgentTasks, CancellationToken ct)
{ {
var (tasks, lists, settings, ctx) = CreateRepos(); var (tasks, lists, settings, ctx) = CreateRepos();

View File

@@ -74,29 +74,6 @@ public sealed class WindowsTerminalLauncher : ITerminalLauncher
return Task.CompletedTask; return Task.CompletedTask;
} }
public Task LaunchInteractiveAsync(InteractiveLaunchContext ctx, CancellationToken cancellationToken)
{
if (!Directory.Exists(ctx.WorkingDir))
throw new TerminalLaunchException($"Working directory does not exist: {ctx.WorkingDir}");
var resolvedWt = ResolveWtOrThrow();
var resolvedClaude = ResolveClaudeOrThrow();
var command = BuildPwshCommand(resolvedClaude, new[]
{
"--model", Model,
"--permission-mode", "auto",
}, appendPrompt: true);
StartInWindowsTerminal(resolvedWt, ctx.WorkingDir, command, env =>
{
env["MAX_THINKING_TOKENS"] = "20000";
env[PromptEnvVar] = ctx.InitialPrompt;
});
return Task.CompletedTask;
}
public Task LaunchPlanningResumeAsync(PlanningSessionResumeContext ctx, CancellationToken cancellationToken) public Task LaunchPlanningResumeAsync(PlanningSessionResumeContext ctx, CancellationToken cancellationToken)
{ {
if (!Directory.Exists(ctx.WorkingDir)) if (!Directory.Exists(ctx.WorkingDir))

View File

@@ -80,6 +80,8 @@ builder.Services.AddSingleton<TaskMergeService>();
builder.Services.AddSingleton<PlanningAggregator>(); builder.Services.AddSingleton<PlanningAggregator>();
builder.Services.AddSingleton<PlanningMergeOrchestrator>(); builder.Services.AddSingleton<PlanningMergeOrchestrator>();
builder.Services.AddSingleton<PlanningChainCoordinator>(); builder.Services.AddSingleton<PlanningChainCoordinator>();
builder.Services.AddSingleton<LiveSessionRegistry>();
builder.Services.AddSingleton<InteractiveSessionService>();
// Queue dispatch primitives. QueueWaker holds the wake semaphore; the queue picker // Queue dispatch primitives. QueueWaker holds the wake semaphore; the queue picker
// performs atomic Queued→Running claim. Both injected into the state service so it // performs atomic Queued→Running claim. Both injected into the state service so it

View File

@@ -21,7 +21,7 @@ public sealed class ClearMyDayHubTests : IDisposable
null!, null!, null!, null!, broadcaster, _db.CreateFactory(), null!, null!, null!, null!, broadcaster, _db.CreateFactory(),
null!, null!, null!, null!, null!, null!, null!, null!, null!, null!, null!, null!, null!, null!, null!, null!, null!, null!, null!, null!, null!, null!, null!, null!, null!, null!,
null!, new ClaudeDo.Worker.Online.OnlineInboxConfig(), new ClaudeDo.Worker.Online.OnlineTokenStore(), null!, new ClaudeDo.Worker.Online.OnlineInboxConfig(), new ClaudeDo.Worker.Online.OnlineTokenStore(),
new ClaudeDo.Worker.Runner.PendingQuestionRegistry()); new ClaudeDo.Worker.Runner.PendingQuestionRegistry(), null!);
hub.Clients = new FakeHubCallerClients(new RecordingClientProxy()); hub.Clients = new FakeHubCallerClients(new RecordingClientProxy());
hub.Context = new FakeHubCallerContext(); hub.Context = new FakeHubCallerContext();
return hub; return hub;

View File

@@ -31,7 +31,7 @@ public sealed class OnlineInboxHubTests : IDisposable
var hub = new WorkerHub( var hub = new WorkerHub(
null!, null!, null!, null!, broadcaster, null!, null!, null!, null!, null!, broadcaster, null!,
null!, null!, null!, null!, null!, null!, null!, null!, null!, null!, null!, null!, null!, null!, null!, null!, null!, null!, null!, null!, null!, null!, null!, null!, null!, null!,
cfg, inboxCfg, store, new ClaudeDo.Worker.Runner.PendingQuestionRegistry()); cfg, inboxCfg, store, new ClaudeDo.Worker.Runner.PendingQuestionRegistry(), null!);
hub.Clients = new FakeHubCallerClients(new RecordingClientProxy()); hub.Clients = new FakeHubCallerClients(new RecordingClientProxy());
hub.Context = new FakeHubCallerContext(); hub.Context = new FakeHubCallerContext();
return (hub, inboxCfg, store); return (hub, inboxCfg, store);

View File

@@ -57,7 +57,7 @@ public sealed class PlanningHubTests : IDisposable
null!, null!, null!, null!, null!, null!, null!, null!, null!, null!, null!, null!, null!, null!, null!, null!, null!, null!,
_planning, _launcher, null!, null!, null!, null!, null!, null!, null!, null!, _planning, _launcher, null!, null!, null!, null!, null!, null!, null!, null!,
null!, new ClaudeDo.Worker.Online.OnlineInboxConfig(), new ClaudeDo.Worker.Online.OnlineTokenStore(), null!, new ClaudeDo.Worker.Online.OnlineInboxConfig(), new ClaudeDo.Worker.Online.OnlineTokenStore(),
new ClaudeDo.Worker.Runner.PendingQuestionRegistry()); new ClaudeDo.Worker.Runner.PendingQuestionRegistry(), null!);
hub.Clients = new FakeHubCallerClients(_proxy); hub.Clients = new FakeHubCallerClients(_proxy);
hub.Context = new FakeHubCallerContext(); hub.Context = new FakeHubCallerContext();
return hub; return hub;
@@ -179,7 +179,6 @@ internal sealed class FakeTerminalLauncher : ITerminalLauncher
public bool ShouldThrow { get; set; } public bool ShouldThrow { get; set; }
public int LaunchStartCalls { get; private set; } public int LaunchStartCalls { get; private set; }
public int LaunchResumeCalls { get; private set; } public int LaunchResumeCalls { get; private set; }
public int LaunchInteractiveCalls { get; private set; }
public Task LaunchPlanningStartAsync(PlanningSessionStartContext ctx, CancellationToken cancellationToken) public Task LaunchPlanningStartAsync(PlanningSessionStartContext ctx, CancellationToken cancellationToken)
{ {
@@ -193,13 +192,6 @@ internal sealed class FakeTerminalLauncher : ITerminalLauncher
LaunchResumeCalls++; LaunchResumeCalls++;
return Task.CompletedTask; return Task.CompletedTask;
} }
public Task LaunchInteractiveAsync(InteractiveLaunchContext ctx, CancellationToken cancellationToken)
{
if (ShouldThrow) throw new TerminalLaunchException("fake launch failure");
LaunchInteractiveCalls++;
return Task.CompletedTask;
}
} }
internal sealed class RecordingClientProxy : IClientProxy internal sealed class RecordingClientProxy : IClientProxy

View File

@@ -21,7 +21,7 @@ public sealed class WorktreeStateHubTests : IDisposable
null!, null!, null!, null!, broadcaster, _db.CreateFactory(), null!, null!, null!, null!, broadcaster, _db.CreateFactory(),
null!, null!, null!, null!, null!, null!, null!, null!, null!, null!, null!, null!, null!, null!, null!, null!, null!, null!, null!, null!, null!, null!, null!, null!, null!, null!,
null!, new ClaudeDo.Worker.Online.OnlineInboxConfig(), new ClaudeDo.Worker.Online.OnlineTokenStore(), null!, new ClaudeDo.Worker.Online.OnlineInboxConfig(), new ClaudeDo.Worker.Online.OnlineTokenStore(),
new ClaudeDo.Worker.Runner.PendingQuestionRegistry()); new ClaudeDo.Worker.Runner.PendingQuestionRegistry(), null!);
hub.Clients = new FakeHubCallerClients(new RecordingClientProxy()); hub.Clients = new FakeHubCallerClients(new RecordingClientProxy());
hub.Context = new FakeHubCallerContext(); hub.Context = new FakeHubCallerContext();
return hub; return hub;

View File

@@ -0,0 +1,301 @@
using ClaudeDo.Data;
using ClaudeDo.Data.Models;
using ClaudeDo.Data.Repositories;
using ClaudeDo.Worker.Config;
using ClaudeDo.Worker.Hub;
using ClaudeDo.Worker.Planning;
using ClaudeDo.Worker.Runner;
using ClaudeDo.Worker.Runner.Interfaces;
using ClaudeDo.Worker.Tests.Infrastructure;
using Microsoft.Extensions.Logging.Abstractions;
using TaskStatus = ClaudeDo.Data.Models.TaskStatus;
namespace ClaudeDo.Worker.Tests.Planning;
public sealed class InteractiveSessionServiceTests : IDisposable
{
private readonly DbFixture _db = new();
private readonly ClaudeDoDbContext _ctx;
private readonly TaskRepository _tasks;
private readonly ListRepository _lists;
private readonly CapturingHubContext _hubCtx;
private readonly HubBroadcaster _broadcaster;
private readonly LiveSessionRegistry _registry;
private readonly WorkerConfig _cfg;
public InteractiveSessionServiceTests()
{
_ctx = _db.CreateContext();
_tasks = new TaskRepository(_ctx);
_lists = new ListRepository(_ctx);
_hubCtx = new CapturingHubContext();
_broadcaster = new HubBroadcaster(_hubCtx);
_registry = new LiveSessionRegistry();
_cfg = new WorkerConfig();
}
public void Dispose()
{
_ctx.Dispose();
_db.Dispose();
}
private InteractiveSessionService CreateService(
Func<string, IReadOnlyList<string>, Func<string, Task>, (ILiveSession session, Task exitTask)>? factory = null)
{
return new InteractiveSessionService(
_db.CreateFactory(),
_cfg,
_broadcaster,
_registry,
NullLoggerFactory.Instance,
factory);
}
private async Task<(string listId, string taskId, string workingDir)> SeedAsync()
{
var wd = Path.Combine(Path.GetTempPath(), $"iss_wd_{Guid.NewGuid():N}");
Directory.CreateDirectory(wd);
var listId = Guid.NewGuid().ToString();
await _lists.AddAsync(new ListEntity
{
Id = listId,
Name = "L",
WorkingDir = wd,
CreatedAt = DateTime.UtcNow,
});
var task = new TaskEntity
{
Id = Guid.NewGuid().ToString(),
ListId = listId,
Title = "My task",
Description = "Do the thing",
Status = TaskStatus.Idle,
CreatedAt = DateTime.UtcNow,
CommitType = "feat",
};
await _tasks.AddAsync(task);
return (listId, task.Id, wd);
}
[Fact]
public async Task StartAsync_MissingWorkingDir_Throws()
{
var listId = Guid.NewGuid().ToString();
await _lists.AddAsync(new ListEntity
{
Id = listId,
Name = "NoDir",
WorkingDir = "/no/such/dir/ever/exists",
CreatedAt = DateTime.UtcNow,
});
var task = new TaskEntity
{
Id = Guid.NewGuid().ToString(),
ListId = listId,
Title = "T",
Status = TaskStatus.Idle,
CreatedAt = DateTime.UtcNow,
CommitType = "feat",
};
await _tasks.AddAsync(task);
var svc = CreateService();
await Assert.ThrowsAsync<InvalidOperationException>(
() => svc.StartAsync(task.Id, CancellationToken.None));
}
[Fact]
public async Task StartAsync_NullWorkingDir_Throws()
{
var listId = Guid.NewGuid().ToString();
await _lists.AddAsync(new ListEntity
{
Id = listId,
Name = "NullDir",
WorkingDir = null,
CreatedAt = DateTime.UtcNow,
});
var task = new TaskEntity
{
Id = Guid.NewGuid().ToString(),
ListId = listId,
Title = "T",
Status = TaskStatus.Idle,
CreatedAt = DateTime.UtcNow,
CommitType = "feat",
};
await _tasks.AddAsync(task);
var svc = CreateService();
await Assert.ThrowsAsync<InvalidOperationException>(
() => svc.StartAsync(task.Id, CancellationToken.None));
}
[Fact]
public async Task StartAsync_RegistersSessionAndBroadcastsStarted()
{
var (_, taskId, _) = await SeedAsync();
var fakeSession = new FakeLiveSession();
var exitTcs = new TaskCompletionSource<bool>();
var svc = CreateService((_, __, ___) => (fakeSession, exitTcs.Task));
await svc.StartAsync(taskId, CancellationToken.None);
// Session registered
Assert.True(_registry.TryGet(taskId, out var registered));
Assert.Same(fakeSession, registered);
// InteractiveSessionStarted broadcast
Assert.Contains(_hubCtx.Proxy.Calls, c => c.Method == "InteractiveSessionStarted");
// Cleanup
exitTcs.SetResult(true);
await Task.Delay(50); // let watcher fire
}
[Fact]
public async Task StartAsync_AlreadyRunning_Throws()
{
var (_, taskId, _) = await SeedAsync();
var fakeSession = new FakeLiveSession();
var exitTcs = new TaskCompletionSource<bool>();
var svc = CreateService((_, __, ___) => (fakeSession, exitTcs.Task));
await svc.StartAsync(taskId, CancellationToken.None);
await Assert.ThrowsAsync<InvalidOperationException>(
() => svc.StartAsync(taskId, CancellationToken.None));
exitTcs.SetResult(true);
await Task.Delay(50);
}
[Fact]
public async Task ExitWatcher_UnregistersAndBroadcastsEnded()
{
var (_, taskId, _) = await SeedAsync();
var fakeSession = new FakeLiveSession();
var exitTcs = new TaskCompletionSource<bool>();
var svc = CreateService((_, __, ___) => (fakeSession, exitTcs.Task));
await svc.StartAsync(taskId, CancellationToken.None);
// Process exits naturally
exitTcs.SetResult(true);
// Give the watcher time to run
var deadline = DateTime.UtcNow.AddSeconds(2);
while (DateTime.UtcNow < deadline)
{
if (_registry.TryGet(taskId, out _) == false) break;
await Task.Delay(10);
}
Assert.False(_registry.TryGet(taskId, out _));
Assert.Contains(_hubCtx.Proxy.Calls, c => c.Method == "InteractiveSessionEnded");
}
[Fact]
public async Task SendAsync_RoutesToSession()
{
var (_, taskId, _) = await SeedAsync();
var fakeSession = new FakeLiveSession();
var exitTcs = new TaskCompletionSource<bool>();
var svc = CreateService((_, __, ___) => (fakeSession, exitTcs.Task));
await svc.StartAsync(taskId, CancellationToken.None);
await svc.SendAsync(taskId, "hello", CancellationToken.None);
Assert.Equal(1, fakeSession.SendCalls);
Assert.Equal("hello", fakeSession.LastSentText);
exitTcs.SetResult(true);
await Task.Delay(50);
}
[Fact]
public async Task SendAsync_NoSession_Throws()
{
var svc = CreateService();
await Assert.ThrowsAsync<InvalidOperationException>(
() => svc.SendAsync("nonexistent-task", "text", CancellationToken.None));
}
[Fact]
public async Task StopAsync_UnregistersSessionAndStopsIt()
{
var (_, taskId, _) = await SeedAsync();
var fakeSession = new FakeLiveSession();
// Keep the exit task pending so the exit watcher doesn't race with StopAsync.
var exitTcs = new TaskCompletionSource<bool>();
var svc = CreateService((_, __, ___) => (fakeSession, exitTcs.Task));
await svc.StartAsync(taskId, CancellationToken.None);
// Stop before the process exits naturally.
await svc.StopAsync(taskId, CancellationToken.None);
// Registry is cleared by StopAsync (which calls _registry.StopAsync -> session.StopAsync + TryRemove).
Assert.False(_registry.TryGet(taskId, out _));
Assert.True(fakeSession.Stopped);
// Let the watcher complete harmlessly.
exitTcs.SetResult(true);
await Task.Delay(50);
}
[Fact]
public async Task OnLineCallback_BroadcastsTaskMessageWithPrefix()
{
var (_, taskId, _) = await SeedAsync();
Func<string, Task>? capturedOnLine = null;
var fakeSession = new FakeLiveSession();
var exitTcs = new TaskCompletionSource<bool>();
var svc = CreateService((_, __, onLine) =>
{
capturedOnLine = onLine;
return (fakeSession, exitTcs.Task);
});
await svc.StartAsync(taskId, CancellationToken.None);
Assert.NotNull(capturedOnLine);
await capturedOnLine!("some line");
Assert.Contains(_hubCtx.Proxy.Calls, c =>
c.Method == "TaskMessage" &&
c.Args.Length >= 2 &&
c.Args[1] is string s && s.StartsWith("[stdout] "));
exitTcs.SetResult(true);
await Task.Delay(50);
}
}
internal sealed class FakeLiveSession : ILiveSession
{
public bool IsTurnInFlight => false;
public int SendCalls { get; private set; }
public string? LastSentText { get; private set; }
public bool Stopped { get; private set; }
public Task SendUserMessageAsync(string text, CancellationToken ct)
{
SendCalls++;
LastSentText = text;
return Task.CompletedTask;
}
public Task InterruptAsync(CancellationToken ct) => Task.CompletedTask;
public Task StopAsync()
{
Stopped = true;
return Task.CompletedTask;
}
public ValueTask DisposeAsync() => ValueTask.CompletedTask;
}