From fd1e38fb7f3e2ce96f2a295db5ba972ab0082f2c Mon Sep 17 00:00:00 2001 From: Mika Kuns Date: Fri, 26 Jun 2026 11:15:10 +0200 Subject: [PATCH] feat(worker): remove a queued interactive message StreamingClaudeSession.RemoveQueuedAsync drops the first occurrence of a queued message from _pending and re-broadcasts the updated queue. Wired through InteractiveSessionService + WorkerHub.RemoveQueuedInteractiveMessage + IWorkerClient.RemoveQueuedInteractiveMessageAsync. Removal by text (first match) is robust to a turn flushing mid-click. Fakes + ILiveSession impls updated. --- .../Services/Interfaces/IWorkerClient.cs | 1 + src/ClaudeDo.Ui/Services/WorkerClient.cs | 6 ++ src/ClaudeDo.Worker/Hub/WorkerHub.cs | 3 + .../Planning/InteractiveSessionService.cs | 6 ++ .../Runner/Interfaces/ILiveSession.cs | 1 + .../Runner/StreamingClaudeSession.cs | 29 +++++++++ tests/ClaudeDo.Ui.Tests/StubWorkerClient.cs | 6 ++ .../InteractiveSessionServiceTests.cs | 2 + .../Runner/LiveSessionRegistryTests.cs | 1 + .../Runner/StreamingClaudeSessionTests.cs | 63 +++++++++++++++++++ .../UiVm/TasksIslandViewModelPlanningTests.cs | 1 + 11 files changed, 119 insertions(+) diff --git a/src/ClaudeDo.Ui/Services/Interfaces/IWorkerClient.cs b/src/ClaudeDo.Ui/Services/Interfaces/IWorkerClient.cs index 77bad36..77d74fd 100644 --- a/src/ClaudeDo.Ui/Services/Interfaces/IWorkerClient.cs +++ b/src/ClaudeDo.Ui/Services/Interfaces/IWorkerClient.cs @@ -52,6 +52,7 @@ public interface IWorkerClient : INotifyPropertyChanged /// Answer a question a running task raised via AskUser. Task AnswerTaskQuestionAsync(string taskId, string questionId, string answer); Task SendInteractiveMessageAsync(string taskId, string text); + Task RemoveQueuedInteractiveMessageAsync(string taskId, string text); Task StopInteractiveSessionAsync(string taskId); Task InterruptInteractiveSessionAsync(string taskId); /// The question a running task is currently blocked on, if any (for re-attach). diff --git a/src/ClaudeDo.Ui/Services/WorkerClient.cs b/src/ClaudeDo.Ui/Services/WorkerClient.cs index a27971d..ca04d53 100644 --- a/src/ClaudeDo.Ui/Services/WorkerClient.cs +++ b/src/ClaudeDo.Ui/Services/WorkerClient.cs @@ -309,6 +309,12 @@ public partial class WorkerClient : ObservableObject, IAsyncDisposable, IWorkerC catch { /* offline or session already ended */ } } + public async Task RemoveQueuedInteractiveMessageAsync(string taskId, string text) + { + try { await _hub.InvokeAsync("RemoveQueuedInteractiveMessage", taskId, text); } + catch { /* offline or session already ended */ } + } + public async Task StopInteractiveSessionAsync(string taskId) { try { await _hub.InvokeAsync("StopInteractiveSession", taskId); } diff --git a/src/ClaudeDo.Worker/Hub/WorkerHub.cs b/src/ClaudeDo.Worker/Hub/WorkerHub.cs index 3803fe6..e8f3287 100644 --- a/src/ClaudeDo.Worker/Hub/WorkerHub.cs +++ b/src/ClaudeDo.Worker/Hub/WorkerHub.cs @@ -584,6 +584,9 @@ public sealed class WorkerHub : Microsoft.AspNetCore.SignalR.Hub public Task InterruptInteractiveSession(string taskId) => _interactive.InterruptAsync(taskId, Context.ConnectionAborted); + public Task RemoveQueuedInteractiveMessage(string taskId, string text) => + _interactive.RemoveQueuedAsync(taskId, text, Context.ConnectionAborted); + public async Task DiscardPlanningSessionAsync(string taskId, bool dequeueQueuedChildren = false) { var outcome = await _planning.DiscardAsync(taskId, dequeueQueuedChildren, Context.ConnectionAborted); diff --git a/src/ClaudeDo.Worker/Planning/InteractiveSessionService.cs b/src/ClaudeDo.Worker/Planning/InteractiveSessionService.cs index e32c523..099790a 100644 --- a/src/ClaudeDo.Worker/Planning/InteractiveSessionService.cs +++ b/src/ClaudeDo.Worker/Planning/InteractiveSessionService.cs @@ -127,6 +127,12 @@ public sealed class InteractiveSessionService await session.SendUserMessageAsync(text, ct); } + public async Task RemoveQueuedAsync(string taskId, string text, CancellationToken ct) + { + if (_registry.TryGet(taskId, out var session)) + await session.RemoveQueuedAsync(text, ct); + } + public async Task InterruptAsync(string taskId, CancellationToken ct) { if (_registry.TryGet(taskId, out var session)) diff --git a/src/ClaudeDo.Worker/Runner/Interfaces/ILiveSession.cs b/src/ClaudeDo.Worker/Runner/Interfaces/ILiveSession.cs index c7fb538..2fe72be 100644 --- a/src/ClaudeDo.Worker/Runner/Interfaces/ILiveSession.cs +++ b/src/ClaudeDo.Worker/Runner/Interfaces/ILiveSession.cs @@ -4,6 +4,7 @@ public interface ILiveSession : IAsyncDisposable { bool IsTurnInFlight { get; } Task SendUserMessageAsync(string text, CancellationToken ct); + Task RemoveQueuedAsync(string text, CancellationToken ct); Task InterruptAsync(CancellationToken ct); Task StopAsync(); } diff --git a/src/ClaudeDo.Worker/Runner/StreamingClaudeSession.cs b/src/ClaudeDo.Worker/Runner/StreamingClaudeSession.cs index 271c5ca..57de070 100644 --- a/src/ClaudeDo.Worker/Runner/StreamingClaudeSession.cs +++ b/src/ClaudeDo.Worker/Runner/StreamingClaudeSession.cs @@ -118,6 +118,35 @@ public sealed class StreamingClaudeSession : ILiveSession _onUserMessageSent?.Invoke(text); } + public async Task RemoveQueuedAsync(string text, CancellationToken ct) + { + IReadOnlyList? snapshot = null; + + await _sendLock.WaitAsync(ct); + try + { + if (_pending.Count == 0) return; + + var list = _pending.ToList(); + var idx = list.IndexOf(text); + if (idx < 0) return; + + list.RemoveAt(idx); + _pending.Clear(); + foreach (var item in list) + _pending.Enqueue(item); + + snapshot = SnapshotPending(); + } + finally + { + _sendLock.Release(); + } + + if (snapshot is not null) + _onQueueChanged?.Invoke(snapshot); + } + public async Task InterruptAsync(CancellationToken ct) { await _sendLock.WaitAsync(ct); diff --git a/tests/ClaudeDo.Ui.Tests/StubWorkerClient.cs b/tests/ClaudeDo.Ui.Tests/StubWorkerClient.cs index 4fe2de2..3733047 100644 --- a/tests/ClaudeDo.Ui.Tests/StubWorkerClient.cs +++ b/tests/ClaudeDo.Ui.Tests/StubWorkerClient.cs @@ -148,6 +148,12 @@ public abstract class StubWorkerClient : IWorkerClient SentInteractive.Add((taskId, text)); return Task.CompletedTask; } + public List<(string TaskId, string Text)> RemovedQueued { get; } = new(); + public virtual Task RemoveQueuedInteractiveMessageAsync(string taskId, string text) + { + RemovedQueued.Add((taskId, text)); + return Task.CompletedTask; + } public List StoppedInteractive { get; } = new(); public virtual Task StopInteractiveSessionAsync(string taskId) { diff --git a/tests/ClaudeDo.Worker.Tests/Planning/InteractiveSessionServiceTests.cs b/tests/ClaudeDo.Worker.Tests/Planning/InteractiveSessionServiceTests.cs index 80da00a..9e384b2 100644 --- a/tests/ClaudeDo.Worker.Tests/Planning/InteractiveSessionServiceTests.cs +++ b/tests/ClaudeDo.Worker.Tests/Planning/InteractiveSessionServiceTests.cs @@ -289,6 +289,8 @@ internal sealed class FakeLiveSession : ILiveSession return Task.CompletedTask; } + public Task RemoveQueuedAsync(string text, CancellationToken ct) => Task.CompletedTask; + public Task InterruptAsync(CancellationToken ct) => Task.CompletedTask; public Task StopAsync() diff --git a/tests/ClaudeDo.Worker.Tests/Runner/LiveSessionRegistryTests.cs b/tests/ClaudeDo.Worker.Tests/Runner/LiveSessionRegistryTests.cs index b028bd0..54ab2c8 100644 --- a/tests/ClaudeDo.Worker.Tests/Runner/LiveSessionRegistryTests.cs +++ b/tests/ClaudeDo.Worker.Tests/Runner/LiveSessionRegistryTests.cs @@ -11,6 +11,7 @@ public sealed class LiveSessionRegistryTests public bool IsTurnInFlight => false; public Task SendUserMessageAsync(string text, CancellationToken ct) => Task.CompletedTask; + public Task RemoveQueuedAsync(string text, CancellationToken ct) => Task.CompletedTask; public Task InterruptAsync(CancellationToken ct) => Task.CompletedTask; public Task StopAsync() diff --git a/tests/ClaudeDo.Worker.Tests/Runner/StreamingClaudeSessionTests.cs b/tests/ClaudeDo.Worker.Tests/Runner/StreamingClaudeSessionTests.cs index 01791a9..634fed6 100644 --- a/tests/ClaudeDo.Worker.Tests/Runner/StreamingClaudeSessionTests.cs +++ b/tests/ClaudeDo.Worker.Tests/Runner/StreamingClaudeSessionTests.cs @@ -403,4 +403,67 @@ public sealed class StreamingClaudeSessionTests await session.DisposeAsync(); } + + // ────────────────────────────────────────────────────────────────────────── + // RemoveQueuedAsync tests + // ────────────────────────────────────────────────────────────────────────── + + [Fact] + public async Task RemoveQueued_RemovesFirstOccurrence_SnapshotContainsOnlySecond_AndSecondDeliveredOnResult() + { + var transport = new FakeClaudeStreamTransport(); + var queueChanges = new List>(); + var sent = new List(); + var session = BuildWithCallbacks(transport, queueChanges, sent); + + await session.StartAsync([], "/tmp", "first", CancellationToken.None); + sent.Clear(); + + // Enqueue two messages while turn is in flight. + await session.SendUserMessageAsync("alpha", CancellationToken.None); + await session.SendUserMessageAsync("beta", CancellationToken.None); + queueChanges.Clear(); + + // Remove "alpha" from the queue. + await session.RemoveQueuedAsync("alpha", CancellationToken.None); + + // Snapshot emitted and contains only "beta". + Assert.Single(queueChanges); + Assert.Equal(new[] { "beta" }, queueChanges[0]); + + // Push result → only "beta" is flushed, not "alpha". + sent.Clear(); + queueChanges.Clear(); + await transport.PushLineAsync(ResultLine()); + + Assert.Single(sent); + Assert.Equal("beta", sent[0]); + + // Queue now empty; next result leaves us idle. + await transport.PushLineAsync(ResultLine()); + Assert.False(session.IsTurnInFlight); + + await session.DisposeAsync(); + } + + [Fact] + public async Task RemoveQueued_NotFound_NoQueueChangedCallback() + { + var transport = new FakeClaudeStreamTransport(); + var queueChanges = new List>(); + var sent = new List(); + var session = BuildWithCallbacks(transport, queueChanges, sent); + + await session.StartAsync([], "/tmp", "first", CancellationToken.None); + await session.SendUserMessageAsync("alpha", CancellationToken.None); + queueChanges.Clear(); + + // Try to remove a message that is not in the queue. + await session.RemoveQueuedAsync("nope", CancellationToken.None); + + // No new snapshot emitted. + Assert.Empty(queueChanges); + + await session.DisposeAsync(); + } } diff --git a/tests/ClaudeDo.Worker.Tests/UiVm/TasksIslandViewModelPlanningTests.cs b/tests/ClaudeDo.Worker.Tests/UiVm/TasksIslandViewModelPlanningTests.cs index f5099d2..052394f 100644 --- a/tests/ClaudeDo.Worker.Tests/UiVm/TasksIslandViewModelPlanningTests.cs +++ b/tests/ClaudeDo.Worker.Tests/UiVm/TasksIslandViewModelPlanningTests.cs @@ -127,6 +127,7 @@ sealed class FakeWorkerClient : IWorkerClient public Task SetOnlineInboxAuthAsync(string refreshToken) => Task.CompletedTask; public Task ClearOnlineInboxAuthAsync() => Task.CompletedTask; public Task SendInteractiveMessageAsync(string taskId, string text) => Task.CompletedTask; + public Task RemoveQueuedInteractiveMessageAsync(string taskId, string text) => Task.CompletedTask; public Task StopInteractiveSessionAsync(string taskId) => Task.CompletedTask; public Task InterruptInteractiveSessionAsync(string taskId) => Task.CompletedTask; public IReadOnlyList GetActiveTasks() => System.Array.Empty();