feat(worker): in-app interactive session service, replacing the wt terminal launch
InteractiveSessionService resolves a task's list working dir + seeded prompt, spawns a StreamingClaudeSession (claude stream-json in the list dir, model+auto as before), registers it in LiveSessionRegistry, streams output over TaskMessage, and broadcasts InteractiveSessionStarted/Ended (an exit watcher fires Ended). The hub's OpenInteractiveTerminalAsync now starts this in-app session; SendInteractiveMessage and StopInteractiveSession route to it. The external Windows-Terminal interactive launch (LaunchInteractiveAsync / InteractiveLaunchContext / OpenInteractiveAsync) is removed; planning sessions keep their terminal launch.
This commit is contained in:
@@ -77,4 +77,10 @@ public sealed class HubBroadcaster : IPrimeBroadcaster, IRefineBroadcaster
|
|||||||
Task IRefineBroadcaster.RefineStartedAsync(string taskId) => RefineStarted(taskId);
|
Task IRefineBroadcaster.RefineStartedAsync(string taskId) => RefineStarted(taskId);
|
||||||
Task IRefineBroadcaster.RefineFinishedAsync(string taskId, bool success, string? error) =>
|
Task IRefineBroadcaster.RefineFinishedAsync(string taskId, bool success, string? error) =>
|
||||||
RefineFinished(taskId, success, error);
|
RefineFinished(taskId, success, error);
|
||||||
|
|
||||||
|
public Task InteractiveSessionStarted(string taskId) =>
|
||||||
|
_hub.Clients.All.SendAsync("InteractiveSessionStarted", taskId);
|
||||||
|
|
||||||
|
public Task InteractiveSessionEnded(string taskId) =>
|
||||||
|
_hub.Clients.All.SendAsync("InteractiveSessionEnded", taskId);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -9,6 +9,7 @@ using ClaudeDo.Worker.Lifecycle;
|
|||||||
using ClaudeDo.Worker.Logging;
|
using ClaudeDo.Worker.Logging;
|
||||||
using ClaudeDo.Worker.Online;
|
using ClaudeDo.Worker.Online;
|
||||||
using ClaudeDo.Worker.Planning;
|
using ClaudeDo.Worker.Planning;
|
||||||
|
using ClaudeDo.Worker.Runner;
|
||||||
using ClaudeDo.Worker.Prime;
|
using ClaudeDo.Worker.Prime;
|
||||||
using ClaudeDo.Worker.Queue;
|
using ClaudeDo.Worker.Queue;
|
||||||
using ClaudeDo.Worker.Refine;
|
using ClaudeDo.Worker.Refine;
|
||||||
@@ -116,6 +117,7 @@ public sealed class WorkerHub : Microsoft.AspNetCore.SignalR.Hub
|
|||||||
private readonly OnlineInboxConfig _onlineInboxConfig;
|
private readonly OnlineInboxConfig _onlineInboxConfig;
|
||||||
private readonly OnlineTokenStore _onlineTokenStore;
|
private readonly OnlineTokenStore _onlineTokenStore;
|
||||||
private readonly Runner.PendingQuestionRegistry _pendingQuestions;
|
private readonly Runner.PendingQuestionRegistry _pendingQuestions;
|
||||||
|
private readonly InteractiveSessionService _interactive;
|
||||||
private readonly LogRingBuffer? _logBuffer;
|
private readonly LogRingBuffer? _logBuffer;
|
||||||
|
|
||||||
public WorkerHub(
|
public WorkerHub(
|
||||||
@@ -142,6 +144,7 @@ public sealed class WorkerHub : Microsoft.AspNetCore.SignalR.Hub
|
|||||||
OnlineInboxConfig onlineInboxConfig,
|
OnlineInboxConfig onlineInboxConfig,
|
||||||
OnlineTokenStore onlineTokenStore,
|
OnlineTokenStore onlineTokenStore,
|
||||||
Runner.PendingQuestionRegistry pendingQuestions,
|
Runner.PendingQuestionRegistry pendingQuestions,
|
||||||
|
InteractiveSessionService interactive,
|
||||||
LogRingBuffer? logBuffer = null)
|
LogRingBuffer? logBuffer = null)
|
||||||
{
|
{
|
||||||
_queue = queue;
|
_queue = queue;
|
||||||
@@ -167,6 +170,7 @@ public sealed class WorkerHub : Microsoft.AspNetCore.SignalR.Hub
|
|||||||
_onlineInboxConfig = onlineInboxConfig;
|
_onlineInboxConfig = onlineInboxConfig;
|
||||||
_onlineTokenStore = onlineTokenStore;
|
_onlineTokenStore = onlineTokenStore;
|
||||||
_pendingQuestions = pendingQuestions;
|
_pendingQuestions = pendingQuestions;
|
||||||
|
_interactive = interactive;
|
||||||
_logBuffer = logBuffer;
|
_logBuffer = logBuffer;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -568,11 +572,14 @@ public sealed class WorkerHub : Microsoft.AspNetCore.SignalR.Hub
|
|||||||
return ctx;
|
return ctx;
|
||||||
}
|
}
|
||||||
|
|
||||||
public async Task OpenInteractiveTerminalAsync(string taskId)
|
public Task OpenInteractiveTerminalAsync(string taskId) =>
|
||||||
{
|
_interactive.StartAsync(taskId, Context.ConnectionAborted);
|
||||||
var ctx = await _planning.OpenInteractiveAsync(taskId, Context.ConnectionAborted);
|
|
||||||
await _launcher.LaunchInteractiveAsync(ctx, Context.ConnectionAborted);
|
public Task SendInteractiveMessage(string taskId, string text) =>
|
||||||
}
|
_interactive.SendAsync(taskId, text, Context.ConnectionAborted);
|
||||||
|
|
||||||
|
public Task StopInteractiveSession(string taskId) =>
|
||||||
|
_interactive.StopAsync(taskId, Context.ConnectionAborted);
|
||||||
|
|
||||||
public async Task<DiscardPlanningOutcome> DiscardPlanningSessionAsync(string taskId, bool dequeueQueuedChildren = false)
|
public async Task<DiscardPlanningOutcome> DiscardPlanningSessionAsync(string taskId, bool dequeueQueuedChildren = false)
|
||||||
{
|
{
|
||||||
|
|||||||
147
src/ClaudeDo.Worker/Planning/InteractiveSessionService.cs
Normal file
147
src/ClaudeDo.Worker/Planning/InteractiveSessionService.cs
Normal file
@@ -0,0 +1,147 @@
|
|||||||
|
using System.Text;
|
||||||
|
using ClaudeDo.Data;
|
||||||
|
using ClaudeDo.Data.Models;
|
||||||
|
using ClaudeDo.Data.Repositories;
|
||||||
|
using ClaudeDo.Worker.Config;
|
||||||
|
using ClaudeDo.Worker.Hub;
|
||||||
|
using ClaudeDo.Worker.Runner;
|
||||||
|
using ClaudeDo.Worker.Runner.Interfaces;
|
||||||
|
using Microsoft.EntityFrameworkCore;
|
||||||
|
|
||||||
|
namespace ClaudeDo.Worker.Planning;
|
||||||
|
|
||||||
|
public sealed class InteractiveSessionService
|
||||||
|
{
|
||||||
|
private readonly IDbContextFactory<ClaudeDoDbContext> _dbFactory;
|
||||||
|
private readonly WorkerConfig _cfg;
|
||||||
|
private readonly HubBroadcaster _broadcaster;
|
||||||
|
private readonly LiveSessionRegistry _registry;
|
||||||
|
private readonly ILoggerFactory _loggerFactory;
|
||||||
|
|
||||||
|
// Optional factory for tests. Signature: (onLine) -> (session, waitForExitTask).
|
||||||
|
// The waitForExitTask completes when the underlying process has exited.
|
||||||
|
private readonly Func<string, IReadOnlyList<string>, Func<string, Task>, (ILiveSession session, Task exitTask)>? _sessionFactory;
|
||||||
|
|
||||||
|
public InteractiveSessionService(
|
||||||
|
IDbContextFactory<ClaudeDoDbContext> dbFactory,
|
||||||
|
WorkerConfig cfg,
|
||||||
|
HubBroadcaster broadcaster,
|
||||||
|
LiveSessionRegistry registry,
|
||||||
|
ILoggerFactory loggerFactory,
|
||||||
|
Func<string, IReadOnlyList<string>, Func<string, Task>, (ILiveSession session, Task exitTask)>? sessionFactory = null)
|
||||||
|
{
|
||||||
|
_dbFactory = dbFactory;
|
||||||
|
_cfg = cfg;
|
||||||
|
_broadcaster = broadcaster;
|
||||||
|
_registry = registry;
|
||||||
|
_loggerFactory = loggerFactory;
|
||||||
|
_sessionFactory = sessionFactory;
|
||||||
|
}
|
||||||
|
|
||||||
|
public async Task StartAsync(string taskId, CancellationToken ct)
|
||||||
|
{
|
||||||
|
if (_registry.TryGet(taskId, out _))
|
||||||
|
throw new InvalidOperationException("An interactive session is already running for this task.");
|
||||||
|
|
||||||
|
await using var ctx = _dbFactory.CreateDbContext();
|
||||||
|
var tasks = new TaskRepository(ctx);
|
||||||
|
var lists = new ListRepository(ctx);
|
||||||
|
|
||||||
|
var task = await tasks.GetByIdAsync(taskId, ct)
|
||||||
|
?? throw new InvalidOperationException($"Task {taskId} not found.");
|
||||||
|
var list = await lists.GetByIdAsync(task.ListId, ct)
|
||||||
|
?? throw new InvalidOperationException($"List {task.ListId} not found.");
|
||||||
|
|
||||||
|
var workingDir = list.WorkingDir;
|
||||||
|
if (string.IsNullOrWhiteSpace(workingDir) || !Directory.Exists(workingDir))
|
||||||
|
throw new InvalidOperationException(
|
||||||
|
$"List '{list.Name}' has no valid working directory configured.");
|
||||||
|
|
||||||
|
var seededPrompt = BuildInteractivePrompt(task);
|
||||||
|
|
||||||
|
var args = new[]
|
||||||
|
{
|
||||||
|
"-p",
|
||||||
|
"--input-format", "stream-json",
|
||||||
|
"--output-format", "stream-json",
|
||||||
|
"--verbose",
|
||||||
|
"--replay-user-messages",
|
||||||
|
"--model", ModelRegistry.PlanningAlias,
|
||||||
|
"--permission-mode", "auto",
|
||||||
|
};
|
||||||
|
|
||||||
|
Func<string, Task> onLine = line => _broadcaster.TaskMessage(taskId, "[stdout] " + line);
|
||||||
|
|
||||||
|
ILiveSession session;
|
||||||
|
Task exitTask;
|
||||||
|
|
||||||
|
if (_sessionFactory is not null)
|
||||||
|
{
|
||||||
|
// Factory is responsible for providing a ready-to-use session and its exit signal.
|
||||||
|
(session, exitTask) = _sessionFactory(workingDir, args, onLine);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
var transport = new ProcessClaudeStreamTransport(
|
||||||
|
_cfg,
|
||||||
|
_loggerFactory.CreateLogger<ProcessClaudeStreamTransport>());
|
||||||
|
var streamingSession = new StreamingClaudeSession(
|
||||||
|
transport,
|
||||||
|
onLine,
|
||||||
|
_loggerFactory.CreateLogger<StreamingClaudeSession>());
|
||||||
|
await streamingSession.StartAsync(args, workingDir, seededPrompt, ct);
|
||||||
|
session = streamingSession;
|
||||||
|
exitTask = transport.WaitForExitAsync();
|
||||||
|
}
|
||||||
|
_registry.Register(taskId, session);
|
||||||
|
await _broadcaster.InteractiveSessionStarted(taskId);
|
||||||
|
|
||||||
|
var logger = _loggerFactory.CreateLogger<InteractiveSessionService>();
|
||||||
|
_ = WatchExitAsync(taskId, exitTask, logger);
|
||||||
|
}
|
||||||
|
|
||||||
|
private async Task WatchExitAsync(string taskId, Task exitTask, ILogger logger)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
await exitTask;
|
||||||
|
}
|
||||||
|
catch (Exception ex)
|
||||||
|
{
|
||||||
|
logger.LogWarning(ex, "Interactive session exit watcher caught an exception for task {task_id}", taskId);
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
_registry.Unregister(taskId);
|
||||||
|
try { await _broadcaster.InteractiveSessionEnded(taskId); }
|
||||||
|
catch (Exception ex) { logger.LogWarning(ex, "InteractiveSessionEnded broadcast failed for task {task_id}", taskId); }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public async Task SendAsync(string taskId, string text, CancellationToken ct)
|
||||||
|
{
|
||||||
|
if (!_registry.TryGet(taskId, out var session))
|
||||||
|
throw new InvalidOperationException("No interactive session is running for this task.");
|
||||||
|
await session.SendUserMessageAsync(text, ct);
|
||||||
|
}
|
||||||
|
|
||||||
|
public async Task StopAsync(string taskId, CancellationToken ct)
|
||||||
|
{
|
||||||
|
// StopAsync removes from registry and kills the session.
|
||||||
|
// The exit watcher will fire InteractiveSessionEnded once the process exits,
|
||||||
|
// so we don't broadcast here — the watcher is the single authoritative source.
|
||||||
|
await _registry.StopAsync(taskId);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static string BuildInteractivePrompt(TaskEntity task)
|
||||||
|
{
|
||||||
|
var sb = new StringBuilder();
|
||||||
|
sb.AppendLine($"# Task: {task.Title}");
|
||||||
|
if (!string.IsNullOrWhiteSpace(task.Description))
|
||||||
|
{
|
||||||
|
sb.AppendLine();
|
||||||
|
sb.AppendLine(task.Description);
|
||||||
|
}
|
||||||
|
return sb.ToString();
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -1,13 +1,12 @@
|
|||||||
namespace ClaudeDo.Worker.Planning;
|
namespace ClaudeDo.Worker.Planning;
|
||||||
|
|
||||||
// Launches the Claude CLI in a visible terminal for human-driven sessions:
|
// Launches the Claude CLI in a visible terminal for human-driven planning sessions.
|
||||||
// planning (start/resume) and the ad-hoc "Run interactively" action. Not used for
|
// Not used for headless task execution (that path is ClaudeProcess, prompt over stdin)
|
||||||
// headless task execution (that path is ClaudeProcess, prompt over stdin).
|
// nor for interactive sessions (those use InteractiveSessionService + StreamingClaudeSession).
|
||||||
public interface ITerminalLauncher
|
public interface ITerminalLauncher
|
||||||
{
|
{
|
||||||
Task LaunchPlanningStartAsync(PlanningSessionStartContext ctx, CancellationToken cancellationToken);
|
Task LaunchPlanningStartAsync(PlanningSessionStartContext ctx, CancellationToken cancellationToken);
|
||||||
Task LaunchPlanningResumeAsync(PlanningSessionResumeContext ctx, CancellationToken cancellationToken);
|
Task LaunchPlanningResumeAsync(PlanningSessionResumeContext ctx, CancellationToken cancellationToken);
|
||||||
Task LaunchInteractiveAsync(InteractiveLaunchContext ctx, CancellationToken cancellationToken);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public sealed class TerminalLaunchException : Exception
|
public sealed class TerminalLaunchException : Exception
|
||||||
|
|||||||
@@ -5,11 +5,6 @@ public sealed record PlanningSessionFiles(
|
|||||||
string SystemPromptPath,
|
string SystemPromptPath,
|
||||||
string InitialPromptPath);
|
string InitialPromptPath);
|
||||||
|
|
||||||
public sealed record InteractiveLaunchContext(
|
|
||||||
string TaskId,
|
|
||||||
string WorkingDir,
|
|
||||||
string InitialPrompt);
|
|
||||||
|
|
||||||
public sealed class PlanningMcpContext
|
public sealed class PlanningMcpContext
|
||||||
{
|
{
|
||||||
public required string ParentTaskId { get; init; }
|
public required string ParentTaskId { get; init; }
|
||||||
|
|||||||
@@ -1,5 +1,4 @@
|
|||||||
using System.Security.Cryptography;
|
using System.Security.Cryptography;
|
||||||
using System.Text;
|
|
||||||
using System.Text.Json;
|
using System.Text.Json;
|
||||||
using ClaudeDo.Data;
|
using ClaudeDo.Data;
|
||||||
using ClaudeDo.Data.Git;
|
using ClaudeDo.Data.Git;
|
||||||
@@ -161,40 +160,6 @@ public sealed class PlanningSessionManager
|
|||||||
Files: files);
|
Files: files);
|
||||||
}
|
}
|
||||||
|
|
||||||
public async Task<InteractiveLaunchContext> OpenInteractiveAsync(string taskId, CancellationToken ct)
|
|
||||||
{
|
|
||||||
var (tasks, lists, _, ctx) = CreateRepos();
|
|
||||||
await using var __ = ctx;
|
|
||||||
|
|
||||||
var task = await tasks.GetByIdAsync(taskId, ct)
|
|
||||||
?? throw new InvalidOperationException($"Task {taskId} not found.");
|
|
||||||
|
|
||||||
var list = await lists.GetByIdAsync(task.ListId, ct)
|
|
||||||
?? throw new InvalidOperationException($"List {task.ListId} not found.");
|
|
||||||
|
|
||||||
var workingDir = list.WorkingDir;
|
|
||||||
if (string.IsNullOrWhiteSpace(workingDir) || !Directory.Exists(workingDir))
|
|
||||||
throw new InvalidOperationException(
|
|
||||||
$"List '{list.Name}' has no valid working directory configured.");
|
|
||||||
|
|
||||||
return new InteractiveLaunchContext(
|
|
||||||
TaskId: taskId,
|
|
||||||
WorkingDir: workingDir,
|
|
||||||
InitialPrompt: BuildInteractivePrompt(task));
|
|
||||||
}
|
|
||||||
|
|
||||||
private static string BuildInteractivePrompt(TaskEntity task)
|
|
||||||
{
|
|
||||||
var sb = new StringBuilder();
|
|
||||||
sb.AppendLine($"# Task: {task.Title}");
|
|
||||||
if (!string.IsNullOrWhiteSpace(task.Description))
|
|
||||||
{
|
|
||||||
sb.AppendLine();
|
|
||||||
sb.AppendLine(task.Description);
|
|
||||||
}
|
|
||||||
return sb.ToString();
|
|
||||||
}
|
|
||||||
|
|
||||||
public async Task<int> FinalizeAsync(string taskId, bool queueAgentTasks, CancellationToken ct)
|
public async Task<int> FinalizeAsync(string taskId, bool queueAgentTasks, CancellationToken ct)
|
||||||
{
|
{
|
||||||
var (tasks, lists, settings, ctx) = CreateRepos();
|
var (tasks, lists, settings, ctx) = CreateRepos();
|
||||||
|
|||||||
@@ -74,29 +74,6 @@ public sealed class WindowsTerminalLauncher : ITerminalLauncher
|
|||||||
return Task.CompletedTask;
|
return Task.CompletedTask;
|
||||||
}
|
}
|
||||||
|
|
||||||
public Task LaunchInteractiveAsync(InteractiveLaunchContext ctx, CancellationToken cancellationToken)
|
|
||||||
{
|
|
||||||
if (!Directory.Exists(ctx.WorkingDir))
|
|
||||||
throw new TerminalLaunchException($"Working directory does not exist: {ctx.WorkingDir}");
|
|
||||||
|
|
||||||
var resolvedWt = ResolveWtOrThrow();
|
|
||||||
var resolvedClaude = ResolveClaudeOrThrow();
|
|
||||||
|
|
||||||
var command = BuildPwshCommand(resolvedClaude, new[]
|
|
||||||
{
|
|
||||||
"--model", Model,
|
|
||||||
"--permission-mode", "auto",
|
|
||||||
}, appendPrompt: true);
|
|
||||||
|
|
||||||
StartInWindowsTerminal(resolvedWt, ctx.WorkingDir, command, env =>
|
|
||||||
{
|
|
||||||
env["MAX_THINKING_TOKENS"] = "20000";
|
|
||||||
env[PromptEnvVar] = ctx.InitialPrompt;
|
|
||||||
});
|
|
||||||
|
|
||||||
return Task.CompletedTask;
|
|
||||||
}
|
|
||||||
|
|
||||||
public Task LaunchPlanningResumeAsync(PlanningSessionResumeContext ctx, CancellationToken cancellationToken)
|
public Task LaunchPlanningResumeAsync(PlanningSessionResumeContext ctx, CancellationToken cancellationToken)
|
||||||
{
|
{
|
||||||
if (!Directory.Exists(ctx.WorkingDir))
|
if (!Directory.Exists(ctx.WorkingDir))
|
||||||
|
|||||||
@@ -80,6 +80,8 @@ builder.Services.AddSingleton<TaskMergeService>();
|
|||||||
builder.Services.AddSingleton<PlanningAggregator>();
|
builder.Services.AddSingleton<PlanningAggregator>();
|
||||||
builder.Services.AddSingleton<PlanningMergeOrchestrator>();
|
builder.Services.AddSingleton<PlanningMergeOrchestrator>();
|
||||||
builder.Services.AddSingleton<PlanningChainCoordinator>();
|
builder.Services.AddSingleton<PlanningChainCoordinator>();
|
||||||
|
builder.Services.AddSingleton<LiveSessionRegistry>();
|
||||||
|
builder.Services.AddSingleton<InteractiveSessionService>();
|
||||||
|
|
||||||
// Queue dispatch primitives. QueueWaker holds the wake semaphore; the queue picker
|
// Queue dispatch primitives. QueueWaker holds the wake semaphore; the queue picker
|
||||||
// performs atomic Queued→Running claim. Both injected into the state service so it
|
// performs atomic Queued→Running claim. Both injected into the state service so it
|
||||||
|
|||||||
@@ -21,7 +21,7 @@ public sealed class ClearMyDayHubTests : IDisposable
|
|||||||
null!, null!, null!, null!, broadcaster, _db.CreateFactory(),
|
null!, null!, null!, null!, broadcaster, _db.CreateFactory(),
|
||||||
null!, null!, null!, null!, null!, null!, null!, null!, null!, null!, null!, null!, null!,
|
null!, null!, null!, null!, null!, null!, null!, null!, null!, null!, null!, null!, null!,
|
||||||
null!, new ClaudeDo.Worker.Online.OnlineInboxConfig(), new ClaudeDo.Worker.Online.OnlineTokenStore(),
|
null!, new ClaudeDo.Worker.Online.OnlineInboxConfig(), new ClaudeDo.Worker.Online.OnlineTokenStore(),
|
||||||
new ClaudeDo.Worker.Runner.PendingQuestionRegistry());
|
new ClaudeDo.Worker.Runner.PendingQuestionRegistry(), null!);
|
||||||
hub.Clients = new FakeHubCallerClients(new RecordingClientProxy());
|
hub.Clients = new FakeHubCallerClients(new RecordingClientProxy());
|
||||||
hub.Context = new FakeHubCallerContext();
|
hub.Context = new FakeHubCallerContext();
|
||||||
return hub;
|
return hub;
|
||||||
|
|||||||
@@ -31,7 +31,7 @@ public sealed class OnlineInboxHubTests : IDisposable
|
|||||||
var hub = new WorkerHub(
|
var hub = new WorkerHub(
|
||||||
null!, null!, null!, null!, broadcaster, null!,
|
null!, null!, null!, null!, broadcaster, null!,
|
||||||
null!, null!, null!, null!, null!, null!, null!, null!, null!, null!, null!, null!, null!,
|
null!, null!, null!, null!, null!, null!, null!, null!, null!, null!, null!, null!, null!,
|
||||||
cfg, inboxCfg, store, new ClaudeDo.Worker.Runner.PendingQuestionRegistry());
|
cfg, inboxCfg, store, new ClaudeDo.Worker.Runner.PendingQuestionRegistry(), null!);
|
||||||
hub.Clients = new FakeHubCallerClients(new RecordingClientProxy());
|
hub.Clients = new FakeHubCallerClients(new RecordingClientProxy());
|
||||||
hub.Context = new FakeHubCallerContext();
|
hub.Context = new FakeHubCallerContext();
|
||||||
return (hub, inboxCfg, store);
|
return (hub, inboxCfg, store);
|
||||||
|
|||||||
@@ -57,7 +57,7 @@ public sealed class PlanningHubTests : IDisposable
|
|||||||
null!, null!, null!, null!, null!, null!, null!, null!, null!,
|
null!, null!, null!, null!, null!, null!, null!, null!, null!,
|
||||||
_planning, _launcher, null!, null!, null!, null!, null!, null!, null!, null!,
|
_planning, _launcher, null!, null!, null!, null!, null!, null!, null!, null!,
|
||||||
null!, new ClaudeDo.Worker.Online.OnlineInboxConfig(), new ClaudeDo.Worker.Online.OnlineTokenStore(),
|
null!, new ClaudeDo.Worker.Online.OnlineInboxConfig(), new ClaudeDo.Worker.Online.OnlineTokenStore(),
|
||||||
new ClaudeDo.Worker.Runner.PendingQuestionRegistry());
|
new ClaudeDo.Worker.Runner.PendingQuestionRegistry(), null!);
|
||||||
hub.Clients = new FakeHubCallerClients(_proxy);
|
hub.Clients = new FakeHubCallerClients(_proxy);
|
||||||
hub.Context = new FakeHubCallerContext();
|
hub.Context = new FakeHubCallerContext();
|
||||||
return hub;
|
return hub;
|
||||||
@@ -179,7 +179,6 @@ internal sealed class FakeTerminalLauncher : ITerminalLauncher
|
|||||||
public bool ShouldThrow { get; set; }
|
public bool ShouldThrow { get; set; }
|
||||||
public int LaunchStartCalls { get; private set; }
|
public int LaunchStartCalls { get; private set; }
|
||||||
public int LaunchResumeCalls { get; private set; }
|
public int LaunchResumeCalls { get; private set; }
|
||||||
public int LaunchInteractiveCalls { get; private set; }
|
|
||||||
|
|
||||||
public Task LaunchPlanningStartAsync(PlanningSessionStartContext ctx, CancellationToken cancellationToken)
|
public Task LaunchPlanningStartAsync(PlanningSessionStartContext ctx, CancellationToken cancellationToken)
|
||||||
{
|
{
|
||||||
@@ -193,13 +192,6 @@ internal sealed class FakeTerminalLauncher : ITerminalLauncher
|
|||||||
LaunchResumeCalls++;
|
LaunchResumeCalls++;
|
||||||
return Task.CompletedTask;
|
return Task.CompletedTask;
|
||||||
}
|
}
|
||||||
|
|
||||||
public Task LaunchInteractiveAsync(InteractiveLaunchContext ctx, CancellationToken cancellationToken)
|
|
||||||
{
|
|
||||||
if (ShouldThrow) throw new TerminalLaunchException("fake launch failure");
|
|
||||||
LaunchInteractiveCalls++;
|
|
||||||
return Task.CompletedTask;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
internal sealed class RecordingClientProxy : IClientProxy
|
internal sealed class RecordingClientProxy : IClientProxy
|
||||||
|
|||||||
@@ -21,7 +21,7 @@ public sealed class WorktreeStateHubTests : IDisposable
|
|||||||
null!, null!, null!, null!, broadcaster, _db.CreateFactory(),
|
null!, null!, null!, null!, broadcaster, _db.CreateFactory(),
|
||||||
null!, null!, null!, null!, null!, null!, null!, null!, null!, null!, null!, null!, null!,
|
null!, null!, null!, null!, null!, null!, null!, null!, null!, null!, null!, null!, null!,
|
||||||
null!, new ClaudeDo.Worker.Online.OnlineInboxConfig(), new ClaudeDo.Worker.Online.OnlineTokenStore(),
|
null!, new ClaudeDo.Worker.Online.OnlineInboxConfig(), new ClaudeDo.Worker.Online.OnlineTokenStore(),
|
||||||
new ClaudeDo.Worker.Runner.PendingQuestionRegistry());
|
new ClaudeDo.Worker.Runner.PendingQuestionRegistry(), null!);
|
||||||
hub.Clients = new FakeHubCallerClients(new RecordingClientProxy());
|
hub.Clients = new FakeHubCallerClients(new RecordingClientProxy());
|
||||||
hub.Context = new FakeHubCallerContext();
|
hub.Context = new FakeHubCallerContext();
|
||||||
return hub;
|
return hub;
|
||||||
|
|||||||
@@ -0,0 +1,301 @@
|
|||||||
|
using ClaudeDo.Data;
|
||||||
|
using ClaudeDo.Data.Models;
|
||||||
|
using ClaudeDo.Data.Repositories;
|
||||||
|
using ClaudeDo.Worker.Config;
|
||||||
|
using ClaudeDo.Worker.Hub;
|
||||||
|
using ClaudeDo.Worker.Planning;
|
||||||
|
using ClaudeDo.Worker.Runner;
|
||||||
|
using ClaudeDo.Worker.Runner.Interfaces;
|
||||||
|
using ClaudeDo.Worker.Tests.Infrastructure;
|
||||||
|
using Microsoft.Extensions.Logging.Abstractions;
|
||||||
|
using TaskStatus = ClaudeDo.Data.Models.TaskStatus;
|
||||||
|
|
||||||
|
namespace ClaudeDo.Worker.Tests.Planning;
|
||||||
|
|
||||||
|
public sealed class InteractiveSessionServiceTests : IDisposable
|
||||||
|
{
|
||||||
|
private readonly DbFixture _db = new();
|
||||||
|
private readonly ClaudeDoDbContext _ctx;
|
||||||
|
private readonly TaskRepository _tasks;
|
||||||
|
private readonly ListRepository _lists;
|
||||||
|
private readonly CapturingHubContext _hubCtx;
|
||||||
|
private readonly HubBroadcaster _broadcaster;
|
||||||
|
private readonly LiveSessionRegistry _registry;
|
||||||
|
private readonly WorkerConfig _cfg;
|
||||||
|
|
||||||
|
public InteractiveSessionServiceTests()
|
||||||
|
{
|
||||||
|
_ctx = _db.CreateContext();
|
||||||
|
_tasks = new TaskRepository(_ctx);
|
||||||
|
_lists = new ListRepository(_ctx);
|
||||||
|
_hubCtx = new CapturingHubContext();
|
||||||
|
_broadcaster = new HubBroadcaster(_hubCtx);
|
||||||
|
_registry = new LiveSessionRegistry();
|
||||||
|
_cfg = new WorkerConfig();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void Dispose()
|
||||||
|
{
|
||||||
|
_ctx.Dispose();
|
||||||
|
_db.Dispose();
|
||||||
|
}
|
||||||
|
|
||||||
|
private InteractiveSessionService CreateService(
|
||||||
|
Func<string, IReadOnlyList<string>, Func<string, Task>, (ILiveSession session, Task exitTask)>? factory = null)
|
||||||
|
{
|
||||||
|
return new InteractiveSessionService(
|
||||||
|
_db.CreateFactory(),
|
||||||
|
_cfg,
|
||||||
|
_broadcaster,
|
||||||
|
_registry,
|
||||||
|
NullLoggerFactory.Instance,
|
||||||
|
factory);
|
||||||
|
}
|
||||||
|
|
||||||
|
private async Task<(string listId, string taskId, string workingDir)> SeedAsync()
|
||||||
|
{
|
||||||
|
var wd = Path.Combine(Path.GetTempPath(), $"iss_wd_{Guid.NewGuid():N}");
|
||||||
|
Directory.CreateDirectory(wd);
|
||||||
|
var listId = Guid.NewGuid().ToString();
|
||||||
|
await _lists.AddAsync(new ListEntity
|
||||||
|
{
|
||||||
|
Id = listId,
|
||||||
|
Name = "L",
|
||||||
|
WorkingDir = wd,
|
||||||
|
CreatedAt = DateTime.UtcNow,
|
||||||
|
});
|
||||||
|
var task = new TaskEntity
|
||||||
|
{
|
||||||
|
Id = Guid.NewGuid().ToString(),
|
||||||
|
ListId = listId,
|
||||||
|
Title = "My task",
|
||||||
|
Description = "Do the thing",
|
||||||
|
Status = TaskStatus.Idle,
|
||||||
|
CreatedAt = DateTime.UtcNow,
|
||||||
|
CommitType = "feat",
|
||||||
|
};
|
||||||
|
await _tasks.AddAsync(task);
|
||||||
|
return (listId, task.Id, wd);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task StartAsync_MissingWorkingDir_Throws()
|
||||||
|
{
|
||||||
|
var listId = Guid.NewGuid().ToString();
|
||||||
|
await _lists.AddAsync(new ListEntity
|
||||||
|
{
|
||||||
|
Id = listId,
|
||||||
|
Name = "NoDir",
|
||||||
|
WorkingDir = "/no/such/dir/ever/exists",
|
||||||
|
CreatedAt = DateTime.UtcNow,
|
||||||
|
});
|
||||||
|
var task = new TaskEntity
|
||||||
|
{
|
||||||
|
Id = Guid.NewGuid().ToString(),
|
||||||
|
ListId = listId,
|
||||||
|
Title = "T",
|
||||||
|
Status = TaskStatus.Idle,
|
||||||
|
CreatedAt = DateTime.UtcNow,
|
||||||
|
CommitType = "feat",
|
||||||
|
};
|
||||||
|
await _tasks.AddAsync(task);
|
||||||
|
|
||||||
|
var svc = CreateService();
|
||||||
|
await Assert.ThrowsAsync<InvalidOperationException>(
|
||||||
|
() => svc.StartAsync(task.Id, CancellationToken.None));
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task StartAsync_NullWorkingDir_Throws()
|
||||||
|
{
|
||||||
|
var listId = Guid.NewGuid().ToString();
|
||||||
|
await _lists.AddAsync(new ListEntity
|
||||||
|
{
|
||||||
|
Id = listId,
|
||||||
|
Name = "NullDir",
|
||||||
|
WorkingDir = null,
|
||||||
|
CreatedAt = DateTime.UtcNow,
|
||||||
|
});
|
||||||
|
var task = new TaskEntity
|
||||||
|
{
|
||||||
|
Id = Guid.NewGuid().ToString(),
|
||||||
|
ListId = listId,
|
||||||
|
Title = "T",
|
||||||
|
Status = TaskStatus.Idle,
|
||||||
|
CreatedAt = DateTime.UtcNow,
|
||||||
|
CommitType = "feat",
|
||||||
|
};
|
||||||
|
await _tasks.AddAsync(task);
|
||||||
|
|
||||||
|
var svc = CreateService();
|
||||||
|
await Assert.ThrowsAsync<InvalidOperationException>(
|
||||||
|
() => svc.StartAsync(task.Id, CancellationToken.None));
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task StartAsync_RegistersSessionAndBroadcastsStarted()
|
||||||
|
{
|
||||||
|
var (_, taskId, _) = await SeedAsync();
|
||||||
|
var fakeSession = new FakeLiveSession();
|
||||||
|
var exitTcs = new TaskCompletionSource<bool>();
|
||||||
|
|
||||||
|
var svc = CreateService((_, __, ___) => (fakeSession, exitTcs.Task));
|
||||||
|
await svc.StartAsync(taskId, CancellationToken.None);
|
||||||
|
|
||||||
|
// Session registered
|
||||||
|
Assert.True(_registry.TryGet(taskId, out var registered));
|
||||||
|
Assert.Same(fakeSession, registered);
|
||||||
|
|
||||||
|
// InteractiveSessionStarted broadcast
|
||||||
|
Assert.Contains(_hubCtx.Proxy.Calls, c => c.Method == "InteractiveSessionStarted");
|
||||||
|
|
||||||
|
// Cleanup
|
||||||
|
exitTcs.SetResult(true);
|
||||||
|
await Task.Delay(50); // let watcher fire
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task StartAsync_AlreadyRunning_Throws()
|
||||||
|
{
|
||||||
|
var (_, taskId, _) = await SeedAsync();
|
||||||
|
var fakeSession = new FakeLiveSession();
|
||||||
|
var exitTcs = new TaskCompletionSource<bool>();
|
||||||
|
|
||||||
|
var svc = CreateService((_, __, ___) => (fakeSession, exitTcs.Task));
|
||||||
|
await svc.StartAsync(taskId, CancellationToken.None);
|
||||||
|
|
||||||
|
await Assert.ThrowsAsync<InvalidOperationException>(
|
||||||
|
() => svc.StartAsync(taskId, CancellationToken.None));
|
||||||
|
|
||||||
|
exitTcs.SetResult(true);
|
||||||
|
await Task.Delay(50);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task ExitWatcher_UnregistersAndBroadcastsEnded()
|
||||||
|
{
|
||||||
|
var (_, taskId, _) = await SeedAsync();
|
||||||
|
var fakeSession = new FakeLiveSession();
|
||||||
|
var exitTcs = new TaskCompletionSource<bool>();
|
||||||
|
|
||||||
|
var svc = CreateService((_, __, ___) => (fakeSession, exitTcs.Task));
|
||||||
|
await svc.StartAsync(taskId, CancellationToken.None);
|
||||||
|
|
||||||
|
// Process exits naturally
|
||||||
|
exitTcs.SetResult(true);
|
||||||
|
|
||||||
|
// Give the watcher time to run
|
||||||
|
var deadline = DateTime.UtcNow.AddSeconds(2);
|
||||||
|
while (DateTime.UtcNow < deadline)
|
||||||
|
{
|
||||||
|
if (_registry.TryGet(taskId, out _) == false) break;
|
||||||
|
await Task.Delay(10);
|
||||||
|
}
|
||||||
|
|
||||||
|
Assert.False(_registry.TryGet(taskId, out _));
|
||||||
|
Assert.Contains(_hubCtx.Proxy.Calls, c => c.Method == "InteractiveSessionEnded");
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task SendAsync_RoutesToSession()
|
||||||
|
{
|
||||||
|
var (_, taskId, _) = await SeedAsync();
|
||||||
|
var fakeSession = new FakeLiveSession();
|
||||||
|
var exitTcs = new TaskCompletionSource<bool>();
|
||||||
|
|
||||||
|
var svc = CreateService((_, __, ___) => (fakeSession, exitTcs.Task));
|
||||||
|
await svc.StartAsync(taskId, CancellationToken.None);
|
||||||
|
|
||||||
|
await svc.SendAsync(taskId, "hello", CancellationToken.None);
|
||||||
|
|
||||||
|
Assert.Equal(1, fakeSession.SendCalls);
|
||||||
|
Assert.Equal("hello", fakeSession.LastSentText);
|
||||||
|
|
||||||
|
exitTcs.SetResult(true);
|
||||||
|
await Task.Delay(50);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task SendAsync_NoSession_Throws()
|
||||||
|
{
|
||||||
|
var svc = CreateService();
|
||||||
|
await Assert.ThrowsAsync<InvalidOperationException>(
|
||||||
|
() => svc.SendAsync("nonexistent-task", "text", CancellationToken.None));
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task StopAsync_UnregistersSessionAndStopsIt()
|
||||||
|
{
|
||||||
|
var (_, taskId, _) = await SeedAsync();
|
||||||
|
var fakeSession = new FakeLiveSession();
|
||||||
|
// Keep the exit task pending so the exit watcher doesn't race with StopAsync.
|
||||||
|
var exitTcs = new TaskCompletionSource<bool>();
|
||||||
|
|
||||||
|
var svc = CreateService((_, __, ___) => (fakeSession, exitTcs.Task));
|
||||||
|
await svc.StartAsync(taskId, CancellationToken.None);
|
||||||
|
|
||||||
|
// Stop before the process exits naturally.
|
||||||
|
await svc.StopAsync(taskId, CancellationToken.None);
|
||||||
|
|
||||||
|
// Registry is cleared by StopAsync (which calls _registry.StopAsync -> session.StopAsync + TryRemove).
|
||||||
|
Assert.False(_registry.TryGet(taskId, out _));
|
||||||
|
Assert.True(fakeSession.Stopped);
|
||||||
|
|
||||||
|
// Let the watcher complete harmlessly.
|
||||||
|
exitTcs.SetResult(true);
|
||||||
|
await Task.Delay(50);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task OnLineCallback_BroadcastsTaskMessageWithPrefix()
|
||||||
|
{
|
||||||
|
var (_, taskId, _) = await SeedAsync();
|
||||||
|
Func<string, Task>? capturedOnLine = null;
|
||||||
|
var fakeSession = new FakeLiveSession();
|
||||||
|
var exitTcs = new TaskCompletionSource<bool>();
|
||||||
|
|
||||||
|
var svc = CreateService((_, __, onLine) =>
|
||||||
|
{
|
||||||
|
capturedOnLine = onLine;
|
||||||
|
return (fakeSession, exitTcs.Task);
|
||||||
|
});
|
||||||
|
|
||||||
|
await svc.StartAsync(taskId, CancellationToken.None);
|
||||||
|
|
||||||
|
Assert.NotNull(capturedOnLine);
|
||||||
|
await capturedOnLine!("some line");
|
||||||
|
|
||||||
|
Assert.Contains(_hubCtx.Proxy.Calls, c =>
|
||||||
|
c.Method == "TaskMessage" &&
|
||||||
|
c.Args.Length >= 2 &&
|
||||||
|
c.Args[1] is string s && s.StartsWith("[stdout] "));
|
||||||
|
|
||||||
|
exitTcs.SetResult(true);
|
||||||
|
await Task.Delay(50);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
internal sealed class FakeLiveSession : ILiveSession
|
||||||
|
{
|
||||||
|
public bool IsTurnInFlight => false;
|
||||||
|
public int SendCalls { get; private set; }
|
||||||
|
public string? LastSentText { get; private set; }
|
||||||
|
public bool Stopped { get; private set; }
|
||||||
|
|
||||||
|
public Task SendUserMessageAsync(string text, CancellationToken ct)
|
||||||
|
{
|
||||||
|
SendCalls++;
|
||||||
|
LastSentText = text;
|
||||||
|
return Task.CompletedTask;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Task InterruptAsync(CancellationToken ct) => Task.CompletedTask;
|
||||||
|
|
||||||
|
public Task StopAsync()
|
||||||
|
{
|
||||||
|
Stopped = true;
|
||||||
|
return Task.CompletedTask;
|
||||||
|
}
|
||||||
|
|
||||||
|
public ValueTask DisposeAsync() => ValueTask.CompletedTask;
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user