diff --git a/src/ClaudeDo.Ui/Services/Interfaces/IWorkerClient.cs b/src/ClaudeDo.Ui/Services/Interfaces/IWorkerClient.cs index 6476797..77bad36 100644 --- a/src/ClaudeDo.Ui/Services/Interfaces/IWorkerClient.cs +++ b/src/ClaudeDo.Ui/Services/Interfaces/IWorkerClient.cs @@ -27,6 +27,8 @@ public interface IWorkerClient : INotifyPropertyChanged event Action? InteractiveSessionStartedEvent; event Action? InteractiveSessionEndedEvent; + event Action>? InteractiveQueueChangedEvent; + event Action? InteractiveMessageSentEvent; event Action? PrepStartedEvent; event Action? PrepLineEvent; diff --git a/src/ClaudeDo.Ui/Services/WorkerClient.cs b/src/ClaudeDo.Ui/Services/WorkerClient.cs index c0c1eab..a27971d 100644 --- a/src/ClaudeDo.Ui/Services/WorkerClient.cs +++ b/src/ClaudeDo.Ui/Services/WorkerClient.cs @@ -51,6 +51,8 @@ public partial class WorkerClient : ObservableObject, IAsyncDisposable, IWorkerC public event Action? TaskQuestionResolvedEvent; public event Action? InteractiveSessionStartedEvent; public event Action? InteractiveSessionEndedEvent; + public event Action>? InteractiveQueueChangedEvent; + public event Action? InteractiveMessageSentEvent; public event Action? ConnectionRestoredEvent; public event Action? WorktreeUpdatedEvent; public event Action? ListUpdatedEvent; @@ -160,6 +162,16 @@ public partial class WorkerClient : ObservableObject, IAsyncDisposable, IWorkerC Dispatcher.UIThread.Post(() => InteractiveSessionEndedEvent?.Invoke(taskId)); }); + _hub.On>("InteractiveQueueChanged", (taskId, pending) => + { + Dispatcher.UIThread.Post(() => InteractiveQueueChangedEvent?.Invoke(taskId, pending)); + }); + + _hub.On("InteractiveMessageSent", (taskId, text) => + { + Dispatcher.UIThread.Post(() => InteractiveMessageSentEvent?.Invoke(taskId, text)); + }); + _hub.On("WorktreeUpdated", taskId => { Dispatcher.UIThread.Post(() => WorktreeUpdatedEvent?.Invoke(taskId)); diff --git a/src/ClaudeDo.Worker/Hub/HubBroadcaster.cs b/src/ClaudeDo.Worker/Hub/HubBroadcaster.cs index a311f7d..7ff40ea 100644 --- a/src/ClaudeDo.Worker/Hub/HubBroadcaster.cs +++ b/src/ClaudeDo.Worker/Hub/HubBroadcaster.cs @@ -83,4 +83,10 @@ public sealed class HubBroadcaster : IPrimeBroadcaster, IRefineBroadcaster public Task InteractiveSessionEnded(string taskId) => _hub.Clients.All.SendAsync("InteractiveSessionEnded", taskId); + + public Task InteractiveQueueChanged(string taskId, IReadOnlyList pending) => + _hub.Clients.All.SendAsync("InteractiveQueueChanged", taskId, pending); + + public Task InteractiveMessageSent(string taskId, string text) => + _hub.Clients.All.SendAsync("InteractiveMessageSent", taskId, text); } diff --git a/src/ClaudeDo.Worker/Planning/InteractiveSessionService.cs b/src/ClaudeDo.Worker/Planning/InteractiveSessionService.cs index 8495e3d..e32c523 100644 --- a/src/ClaudeDo.Worker/Planning/InteractiveSessionService.cs +++ b/src/ClaudeDo.Worker/Planning/InteractiveSessionService.cs @@ -88,7 +88,9 @@ public sealed class InteractiveSessionService var streamingSession = new StreamingClaudeSession( transport, onLine, - _loggerFactory.CreateLogger()); + _loggerFactory.CreateLogger(), + onQueueChanged: pending => _ = _broadcaster.InteractiveQueueChanged(taskId, pending), + onUserMessageSent: text => _ = _broadcaster.InteractiveMessageSent(taskId, text)); await streamingSession.StartAsync(args, workingDir, seededPrompt, ct); session = streamingSession; exitTask = transport.WaitForExitAsync(); diff --git a/src/ClaudeDo.Worker/Runner/StreamingClaudeSession.cs b/src/ClaudeDo.Worker/Runner/StreamingClaudeSession.cs index 1dc2a4c..271c5ca 100644 --- a/src/ClaudeDo.Worker/Runner/StreamingClaudeSession.cs +++ b/src/ClaudeDo.Worker/Runner/StreamingClaudeSession.cs @@ -8,6 +8,8 @@ public sealed class StreamingClaudeSession : ILiveSession private readonly IClaudeStreamTransport _transport; private readonly Func _onLine; private readonly ILogger _logger; + private readonly Action>? _onQueueChanged; + private readonly Action? _onUserMessageSent; private readonly SemaphoreSlim _sendLock = new(1, 1); private volatile bool _isTurnInFlight; @@ -18,13 +20,19 @@ public sealed class StreamingClaudeSession : ILiveSession public StreamingClaudeSession( IClaudeStreamTransport transport, Func onLine, - ILogger logger) + ILogger logger, + Action>? onQueueChanged = null, + Action? onUserMessageSent = null) { _transport = transport; _onLine = onLine; _logger = logger; + _onQueueChanged = onQueueChanged; + _onUserMessageSent = onUserMessageSent; } + private IReadOnlyList SnapshotPending() => _pending.ToArray(); + public async Task StartAsync( IReadOnlyList args, string workingDirectory, @@ -34,6 +42,7 @@ public sealed class StreamingClaudeSession : ILiveSession _transport.LineReceived += HandleLineAsync; await _transport.StartAsync(args, workingDirectory, ct); await SendTurnAsync(firstPrompt, ct); + _onUserMessageSent?.Invoke(firstPrompt); } private async Task HandleLineAsync(string line) @@ -53,30 +62,45 @@ public sealed class StreamingClaudeSession : ILiveSession if (!isResult) return; // Turn ended — flush one queued message if available. + string? flushedText = null; + IReadOnlyList? remainingSnapshot = null; + await _sendLock.WaitAsync(); try { _isTurnInFlight = false; if (_pending.Count > 0) { - var next = _pending.Dequeue(); - await SendTurnAsync(next, CancellationToken.None); + flushedText = _pending.Dequeue(); + remainingSnapshot = SnapshotPending(); + await SendTurnAsync(flushedText, CancellationToken.None); } } finally { _sendLock.Release(); } + + if (flushedText is not null) + { + _onQueueChanged?.Invoke(remainingSnapshot!); + _onUserMessageSent?.Invoke(flushedText); + } } public async Task SendUserMessageAsync(string text, CancellationToken ct) { + bool enqueued = false; + IReadOnlyList? snapshot = null; + await _sendLock.WaitAsync(ct); try { if (_isTurnInFlight || _pending.Count > 0) { _pending.Enqueue(text); + snapshot = SnapshotPending(); + enqueued = true; } else { @@ -87,6 +111,11 @@ public sealed class StreamingClaudeSession : ILiveSession { _sendLock.Release(); } + + if (enqueued) + _onQueueChanged?.Invoke(snapshot!); + else + _onUserMessageSent?.Invoke(text); } public async Task InterruptAsync(CancellationToken ct) diff --git a/tests/ClaudeDo.Ui.Tests/StubWorkerClient.cs b/tests/ClaudeDo.Ui.Tests/StubWorkerClient.cs index 2c3a110..4fe2de2 100644 --- a/tests/ClaudeDo.Ui.Tests/StubWorkerClient.cs +++ b/tests/ClaudeDo.Ui.Tests/StubWorkerClient.cs @@ -27,6 +27,8 @@ public abstract class StubWorkerClient : IWorkerClient public event Action? TaskQuestionResolvedEvent; public event Action? InteractiveSessionStartedEvent; public event Action? InteractiveSessionEndedEvent; + public event Action>? InteractiveQueueChangedEvent; + public event Action? InteractiveMessageSentEvent; public event Action? PrepStartedEvent; public event Action? PrepLineEvent; public event Action? PrepFinishedEvent; @@ -59,6 +61,8 @@ public abstract class StubWorkerClient : IWorkerClient public void RaiseInteractiveStarted(string taskId) => InteractiveSessionStartedEvent?.Invoke(taskId); public void RaiseInteractiveEnded(string taskId) => InteractiveSessionEndedEvent?.Invoke(taskId); + public void RaiseInteractiveQueueChanged(string taskId, IReadOnlyList pending) => InteractiveQueueChangedEvent?.Invoke(taskId, pending); + public void RaiseInteractiveMessageSent(string taskId, string text) => InteractiveMessageSentEvent?.Invoke(taskId, text); public virtual bool IsConnected => false; public virtual bool IsReconnecting => false; diff --git a/tests/ClaudeDo.Worker.Tests/Runner/StreamingClaudeSessionTests.cs b/tests/ClaudeDo.Worker.Tests/Runner/StreamingClaudeSessionTests.cs index 0297b2e..01791a9 100644 --- a/tests/ClaudeDo.Worker.Tests/Runner/StreamingClaudeSessionTests.cs +++ b/tests/ClaudeDo.Worker.Tests/Runner/StreamingClaudeSessionTests.cs @@ -266,4 +266,141 @@ public sealed class StreamingClaudeSessionTests await session.DisposeAsync(); } + + // ────────────────────────────────────────────────────────────────────────── + // Callback tests (onQueueChanged / onUserMessageSent) + // ────────────────────────────────────────────────────────────────────────── + + private static StreamingClaudeSession BuildWithCallbacks( + FakeClaudeStreamTransport transport, + List> queueChanges, + List sent) + { + return new StreamingClaudeSession( + transport, + line => Task.CompletedTask, + NullLogger.Instance, + onQueueChanged: snapshot => queueChanges.Add(snapshot), + onUserMessageSent: text => sent.Add(text)); + } + + [Fact] + public async Task Start_InvokesOnUserMessageSent_WithFirstPrompt() + { + var transport = new FakeClaudeStreamTransport(); + var sent = new List(); + var session = BuildWithCallbacks(transport, [], sent); + + await session.StartAsync([], "/tmp", "hello", CancellationToken.None); + + Assert.Single(sent); + Assert.Equal("hello", sent[0]); + + await session.DisposeAsync(); + } + + [Fact] + public async Task SendWhileInFlight_InvokesOnQueueChanged_NotOnUserMessageSent() + { + var transport = new FakeClaudeStreamTransport(); + var queueChanges = new List>(); + var sent = new List(); + var session = BuildWithCallbacks(transport, queueChanges, sent); + + await session.StartAsync([], "/tmp", "first", CancellationToken.None); + sent.Clear(); // ignore the initial prompt notification + + await session.SendUserMessageAsync("queued-msg", CancellationToken.None); + + Assert.Single(queueChanges); + Assert.Contains("queued-msg", queueChanges[0]); + Assert.DoesNotContain("queued-msg", sent); + + await session.DisposeAsync(); + } + + [Fact] + public async Task PushResult_FlushesPending_InvokesQueueClearThenUserMessageSent() + { + var transport = new FakeClaudeStreamTransport(); + var queueChanges = new List>(); + var sent = new List(); + var session = BuildWithCallbacks(transport, queueChanges, sent); + + await session.StartAsync([], "/tmp", "first", CancellationToken.None); + sent.Clear(); + + await session.SendUserMessageAsync("queued-msg", CancellationToken.None); + queueChanges.Clear(); // ignore the enqueue snapshot + + await transport.PushLineAsync(ResultLine()); + + // After flush: one queueChanged with empty list, then sent contains flushed text. + Assert.Single(queueChanges); + Assert.Empty(queueChanges[0]); + Assert.Single(sent); + Assert.Equal("queued-msg", sent[0]); + + await session.DisposeAsync(); + } + + [Fact] + public async Task SendWhileIdle_InvokesOnUserMessageSent_NoQueueChanged() + { + var transport = new FakeClaudeStreamTransport(); + var queueChanges = new List>(); + var sent = new List(); + var session = BuildWithCallbacks(transport, queueChanges, sent); + + await session.StartAsync([], "/tmp", "first", CancellationToken.None); + await transport.PushLineAsync(ResultLine()); // go idle + sent.Clear(); + queueChanges.Clear(); + + await session.SendUserMessageAsync("idle-msg", CancellationToken.None); + + Assert.Empty(queueChanges); + Assert.Single(sent); + Assert.Equal("idle-msg", sent[0]); + + await session.DisposeAsync(); + } + + [Fact] + public async Task TwoMessagesQueued_FlushFifo_QueueSnapshotsShrink() + { + var transport = new FakeClaudeStreamTransport(); + var queueChanges = new List>(); + var sent = new List(); + var session = BuildWithCallbacks(transport, queueChanges, sent); + + await session.StartAsync([], "/tmp", "first", CancellationToken.None); + sent.Clear(); + + await session.SendUserMessageAsync("second", CancellationToken.None); + await session.SendUserMessageAsync("third", CancellationToken.None); + // queueChanges[0] = ["second"], queueChanges[1] = ["second","third"] + Assert.Equal(2, queueChanges.Count); + Assert.Equal(new[] { "second" }, queueChanges[0]); + Assert.Equal(new[] { "second", "third" }, queueChanges[1]); + queueChanges.Clear(); + + // Result 1 → flushes "second"; remaining queue = ["third"] + await transport.PushLineAsync(ResultLine()); + Assert.Single(queueChanges); + Assert.Equal(new[] { "third" }, queueChanges[0]); + Assert.Single(sent); + Assert.Equal("second", sent[0]); + sent.Clear(); + queueChanges.Clear(); + + // Result 2 → flushes "third"; remaining queue = [] + await transport.PushLineAsync(ResultLine()); + Assert.Single(queueChanges); + Assert.Empty(queueChanges[0]); + Assert.Single(sent); + Assert.Equal("third", sent[0]); + + await session.DisposeAsync(); + } } diff --git a/tests/ClaudeDo.Worker.Tests/UiVm/TasksIslandViewModelPlanningTests.cs b/tests/ClaudeDo.Worker.Tests/UiVm/TasksIslandViewModelPlanningTests.cs index d5c07bb..f5099d2 100644 --- a/tests/ClaudeDo.Worker.Tests/UiVm/TasksIslandViewModelPlanningTests.cs +++ b/tests/ClaudeDo.Worker.Tests/UiVm/TasksIslandViewModelPlanningTests.cs @@ -38,6 +38,8 @@ sealed class FakeWorkerClient : IWorkerClient public event Action? TaskQuestionResolvedEvent; public event Action? InteractiveSessionStartedEvent; public event Action? InteractiveSessionEndedEvent; + public event Action>? InteractiveQueueChangedEvent; + public event Action? InteractiveMessageSentEvent; public void RaiseTaskUpdated(string taskId) => TaskUpdatedEvent?.Invoke(taskId); public void RaiseWorktreeUpdated(string taskId) => WorktreeUpdatedEvent?.Invoke(taskId); public void RaiseTaskMessage(string taskId, string line) => TaskMessageEvent?.Invoke(taskId, line);