diff --git a/src/ClaudeDo.Ui/Services/Interfaces/IWorkerClient.cs b/src/ClaudeDo.Ui/Services/Interfaces/IWorkerClient.cs
index 77bad36..77d74fd 100644
--- a/src/ClaudeDo.Ui/Services/Interfaces/IWorkerClient.cs
+++ b/src/ClaudeDo.Ui/Services/Interfaces/IWorkerClient.cs
@@ -52,6 +52,7 @@ public interface IWorkerClient : INotifyPropertyChanged
/// Answer a question a running task raised via AskUser.
Task AnswerTaskQuestionAsync(string taskId, string questionId, string answer);
Task SendInteractiveMessageAsync(string taskId, string text);
+ Task RemoveQueuedInteractiveMessageAsync(string taskId, string text);
Task StopInteractiveSessionAsync(string taskId);
Task InterruptInteractiveSessionAsync(string taskId);
/// The question a running task is currently blocked on, if any (for re-attach).
diff --git a/src/ClaudeDo.Ui/Services/WorkerClient.cs b/src/ClaudeDo.Ui/Services/WorkerClient.cs
index a27971d..ca04d53 100644
--- a/src/ClaudeDo.Ui/Services/WorkerClient.cs
+++ b/src/ClaudeDo.Ui/Services/WorkerClient.cs
@@ -309,6 +309,12 @@ public partial class WorkerClient : ObservableObject, IAsyncDisposable, IWorkerC
catch { /* offline or session already ended */ }
}
+ public async Task RemoveQueuedInteractiveMessageAsync(string taskId, string text)
+ {
+ try { await _hub.InvokeAsync("RemoveQueuedInteractiveMessage", taskId, text); }
+ catch { /* offline or session already ended */ }
+ }
+
public async Task StopInteractiveSessionAsync(string taskId)
{
try { await _hub.InvokeAsync("StopInteractiveSession", taskId); }
diff --git a/src/ClaudeDo.Worker/Hub/WorkerHub.cs b/src/ClaudeDo.Worker/Hub/WorkerHub.cs
index 3803fe6..e8f3287 100644
--- a/src/ClaudeDo.Worker/Hub/WorkerHub.cs
+++ b/src/ClaudeDo.Worker/Hub/WorkerHub.cs
@@ -584,6 +584,9 @@ public sealed class WorkerHub : Microsoft.AspNetCore.SignalR.Hub
public Task InterruptInteractiveSession(string taskId) =>
_interactive.InterruptAsync(taskId, Context.ConnectionAborted);
+ public Task RemoveQueuedInteractiveMessage(string taskId, string text) =>
+ _interactive.RemoveQueuedAsync(taskId, text, Context.ConnectionAborted);
+
public async Task DiscardPlanningSessionAsync(string taskId, bool dequeueQueuedChildren = false)
{
var outcome = await _planning.DiscardAsync(taskId, dequeueQueuedChildren, Context.ConnectionAborted);
diff --git a/src/ClaudeDo.Worker/Planning/InteractiveSessionService.cs b/src/ClaudeDo.Worker/Planning/InteractiveSessionService.cs
index e32c523..099790a 100644
--- a/src/ClaudeDo.Worker/Planning/InteractiveSessionService.cs
+++ b/src/ClaudeDo.Worker/Planning/InteractiveSessionService.cs
@@ -127,6 +127,12 @@ public sealed class InteractiveSessionService
await session.SendUserMessageAsync(text, ct);
}
+ public async Task RemoveQueuedAsync(string taskId, string text, CancellationToken ct)
+ {
+ if (_registry.TryGet(taskId, out var session))
+ await session.RemoveQueuedAsync(text, ct);
+ }
+
public async Task InterruptAsync(string taskId, CancellationToken ct)
{
if (_registry.TryGet(taskId, out var session))
diff --git a/src/ClaudeDo.Worker/Runner/Interfaces/ILiveSession.cs b/src/ClaudeDo.Worker/Runner/Interfaces/ILiveSession.cs
index c7fb538..2fe72be 100644
--- a/src/ClaudeDo.Worker/Runner/Interfaces/ILiveSession.cs
+++ b/src/ClaudeDo.Worker/Runner/Interfaces/ILiveSession.cs
@@ -4,6 +4,7 @@ public interface ILiveSession : IAsyncDisposable
{
bool IsTurnInFlight { get; }
Task SendUserMessageAsync(string text, CancellationToken ct);
+ Task RemoveQueuedAsync(string text, CancellationToken ct);
Task InterruptAsync(CancellationToken ct);
Task StopAsync();
}
diff --git a/src/ClaudeDo.Worker/Runner/StreamingClaudeSession.cs b/src/ClaudeDo.Worker/Runner/StreamingClaudeSession.cs
index 271c5ca..57de070 100644
--- a/src/ClaudeDo.Worker/Runner/StreamingClaudeSession.cs
+++ b/src/ClaudeDo.Worker/Runner/StreamingClaudeSession.cs
@@ -118,6 +118,35 @@ public sealed class StreamingClaudeSession : ILiveSession
_onUserMessageSent?.Invoke(text);
}
+ public async Task RemoveQueuedAsync(string text, CancellationToken ct)
+ {
+ IReadOnlyList? snapshot = null;
+
+ await _sendLock.WaitAsync(ct);
+ try
+ {
+ if (_pending.Count == 0) return;
+
+ var list = _pending.ToList();
+ var idx = list.IndexOf(text);
+ if (idx < 0) return;
+
+ list.RemoveAt(idx);
+ _pending.Clear();
+ foreach (var item in list)
+ _pending.Enqueue(item);
+
+ snapshot = SnapshotPending();
+ }
+ finally
+ {
+ _sendLock.Release();
+ }
+
+ if (snapshot is not null)
+ _onQueueChanged?.Invoke(snapshot);
+ }
+
public async Task InterruptAsync(CancellationToken ct)
{
await _sendLock.WaitAsync(ct);
diff --git a/tests/ClaudeDo.Ui.Tests/StubWorkerClient.cs b/tests/ClaudeDo.Ui.Tests/StubWorkerClient.cs
index 4fe2de2..3733047 100644
--- a/tests/ClaudeDo.Ui.Tests/StubWorkerClient.cs
+++ b/tests/ClaudeDo.Ui.Tests/StubWorkerClient.cs
@@ -148,6 +148,12 @@ public abstract class StubWorkerClient : IWorkerClient
SentInteractive.Add((taskId, text));
return Task.CompletedTask;
}
+ public List<(string TaskId, string Text)> RemovedQueued { get; } = new();
+ public virtual Task RemoveQueuedInteractiveMessageAsync(string taskId, string text)
+ {
+ RemovedQueued.Add((taskId, text));
+ return Task.CompletedTask;
+ }
public List StoppedInteractive { get; } = new();
public virtual Task StopInteractiveSessionAsync(string taskId)
{
diff --git a/tests/ClaudeDo.Worker.Tests/Planning/InteractiveSessionServiceTests.cs b/tests/ClaudeDo.Worker.Tests/Planning/InteractiveSessionServiceTests.cs
index 80da00a..9e384b2 100644
--- a/tests/ClaudeDo.Worker.Tests/Planning/InteractiveSessionServiceTests.cs
+++ b/tests/ClaudeDo.Worker.Tests/Planning/InteractiveSessionServiceTests.cs
@@ -289,6 +289,8 @@ internal sealed class FakeLiveSession : ILiveSession
return Task.CompletedTask;
}
+ public Task RemoveQueuedAsync(string text, CancellationToken ct) => Task.CompletedTask;
+
public Task InterruptAsync(CancellationToken ct) => Task.CompletedTask;
public Task StopAsync()
diff --git a/tests/ClaudeDo.Worker.Tests/Runner/LiveSessionRegistryTests.cs b/tests/ClaudeDo.Worker.Tests/Runner/LiveSessionRegistryTests.cs
index b028bd0..54ab2c8 100644
--- a/tests/ClaudeDo.Worker.Tests/Runner/LiveSessionRegistryTests.cs
+++ b/tests/ClaudeDo.Worker.Tests/Runner/LiveSessionRegistryTests.cs
@@ -11,6 +11,7 @@ public sealed class LiveSessionRegistryTests
public bool IsTurnInFlight => false;
public Task SendUserMessageAsync(string text, CancellationToken ct) => Task.CompletedTask;
+ public Task RemoveQueuedAsync(string text, CancellationToken ct) => Task.CompletedTask;
public Task InterruptAsync(CancellationToken ct) => Task.CompletedTask;
public Task StopAsync()
diff --git a/tests/ClaudeDo.Worker.Tests/Runner/StreamingClaudeSessionTests.cs b/tests/ClaudeDo.Worker.Tests/Runner/StreamingClaudeSessionTests.cs
index 01791a9..634fed6 100644
--- a/tests/ClaudeDo.Worker.Tests/Runner/StreamingClaudeSessionTests.cs
+++ b/tests/ClaudeDo.Worker.Tests/Runner/StreamingClaudeSessionTests.cs
@@ -403,4 +403,67 @@ public sealed class StreamingClaudeSessionTests
await session.DisposeAsync();
}
+
+ // ──────────────────────────────────────────────────────────────────────────
+ // RemoveQueuedAsync tests
+ // ──────────────────────────────────────────────────────────────────────────
+
+ [Fact]
+ public async Task RemoveQueued_RemovesFirstOccurrence_SnapshotContainsOnlySecond_AndSecondDeliveredOnResult()
+ {
+ var transport = new FakeClaudeStreamTransport();
+ var queueChanges = new List>();
+ var sent = new List();
+ var session = BuildWithCallbacks(transport, queueChanges, sent);
+
+ await session.StartAsync([], "/tmp", "first", CancellationToken.None);
+ sent.Clear();
+
+ // Enqueue two messages while turn is in flight.
+ await session.SendUserMessageAsync("alpha", CancellationToken.None);
+ await session.SendUserMessageAsync("beta", CancellationToken.None);
+ queueChanges.Clear();
+
+ // Remove "alpha" from the queue.
+ await session.RemoveQueuedAsync("alpha", CancellationToken.None);
+
+ // Snapshot emitted and contains only "beta".
+ Assert.Single(queueChanges);
+ Assert.Equal(new[] { "beta" }, queueChanges[0]);
+
+ // Push result → only "beta" is flushed, not "alpha".
+ sent.Clear();
+ queueChanges.Clear();
+ await transport.PushLineAsync(ResultLine());
+
+ Assert.Single(sent);
+ Assert.Equal("beta", sent[0]);
+
+ // Queue now empty; next result leaves us idle.
+ await transport.PushLineAsync(ResultLine());
+ Assert.False(session.IsTurnInFlight);
+
+ await session.DisposeAsync();
+ }
+
+ [Fact]
+ public async Task RemoveQueued_NotFound_NoQueueChangedCallback()
+ {
+ var transport = new FakeClaudeStreamTransport();
+ var queueChanges = new List>();
+ var sent = new List();
+ var session = BuildWithCallbacks(transport, queueChanges, sent);
+
+ await session.StartAsync([], "/tmp", "first", CancellationToken.None);
+ await session.SendUserMessageAsync("alpha", CancellationToken.None);
+ queueChanges.Clear();
+
+ // Try to remove a message that is not in the queue.
+ await session.RemoveQueuedAsync("nope", CancellationToken.None);
+
+ // No new snapshot emitted.
+ Assert.Empty(queueChanges);
+
+ await session.DisposeAsync();
+ }
}
diff --git a/tests/ClaudeDo.Worker.Tests/UiVm/TasksIslandViewModelPlanningTests.cs b/tests/ClaudeDo.Worker.Tests/UiVm/TasksIslandViewModelPlanningTests.cs
index f5099d2..052394f 100644
--- a/tests/ClaudeDo.Worker.Tests/UiVm/TasksIslandViewModelPlanningTests.cs
+++ b/tests/ClaudeDo.Worker.Tests/UiVm/TasksIslandViewModelPlanningTests.cs
@@ -127,6 +127,7 @@ sealed class FakeWorkerClient : IWorkerClient
public Task SetOnlineInboxAuthAsync(string refreshToken) => Task.CompletedTask;
public Task ClearOnlineInboxAuthAsync() => Task.CompletedTask;
public Task SendInteractiveMessageAsync(string taskId, string text) => Task.CompletedTask;
+ public Task RemoveQueuedInteractiveMessageAsync(string taskId, string text) => Task.CompletedTask;
public Task StopInteractiveSessionAsync(string taskId) => Task.CompletedTask;
public Task InterruptInteractiveSessionAsync(string taskId) => Task.CompletedTask;
public IReadOnlyList GetActiveTasks() => System.Array.Empty();