diff --git a/src/ClaudeDo.Ui/Services/Interfaces/IWorkerClient.cs b/src/ClaudeDo.Ui/Services/Interfaces/IWorkerClient.cs
index d33626f..6476797 100644
--- a/src/ClaudeDo.Ui/Services/Interfaces/IWorkerClient.cs
+++ b/src/ClaudeDo.Ui/Services/Interfaces/IWorkerClient.cs
@@ -51,6 +51,7 @@ public interface IWorkerClient : INotifyPropertyChanged
Task AnswerTaskQuestionAsync(string taskId, string questionId, string answer);
Task SendInteractiveMessageAsync(string taskId, string text);
Task StopInteractiveSessionAsync(string taskId);
+ Task InterruptInteractiveSessionAsync(string taskId);
/// The question a running task is currently blocked on, if any (for re-attach).
Task GetPendingQuestionAsync(string taskId);
Task ResetTaskAsync(string taskId);
diff --git a/src/ClaudeDo.Ui/Services/WorkerClient.cs b/src/ClaudeDo.Ui/Services/WorkerClient.cs
index 02197d9..c0c1eab 100644
--- a/src/ClaudeDo.Ui/Services/WorkerClient.cs
+++ b/src/ClaudeDo.Ui/Services/WorkerClient.cs
@@ -303,6 +303,12 @@ public partial class WorkerClient : ObservableObject, IAsyncDisposable, IWorkerC
catch { /* offline */ }
}
+ public async Task InterruptInteractiveSessionAsync(string taskId)
+ {
+ try { await _hub.InvokeAsync("InterruptInteractiveSession", taskId); }
+ catch { /* offline */ }
+ }
+
public Task GetPendingQuestionAsync(string taskId)
=> TryInvokeAsync("GetPendingQuestion", taskId);
diff --git a/src/ClaudeDo.Worker/Hub/WorkerHub.cs b/src/ClaudeDo.Worker/Hub/WorkerHub.cs
index 7e8a592..3803fe6 100644
--- a/src/ClaudeDo.Worker/Hub/WorkerHub.cs
+++ b/src/ClaudeDo.Worker/Hub/WorkerHub.cs
@@ -581,6 +581,9 @@ public sealed class WorkerHub : Microsoft.AspNetCore.SignalR.Hub
public Task StopInteractiveSession(string taskId) =>
_interactive.StopAsync(taskId, Context.ConnectionAborted);
+ public Task InterruptInteractiveSession(string taskId) =>
+ _interactive.InterruptAsync(taskId, Context.ConnectionAborted);
+
public async Task DiscardPlanningSessionAsync(string taskId, bool dequeueQueuedChildren = false)
{
var outcome = await _planning.DiscardAsync(taskId, dequeueQueuedChildren, Context.ConnectionAborted);
diff --git a/src/ClaudeDo.Worker/Planning/InteractiveSessionService.cs b/src/ClaudeDo.Worker/Planning/InteractiveSessionService.cs
index 3d5b6f7..8495e3d 100644
--- a/src/ClaudeDo.Worker/Planning/InteractiveSessionService.cs
+++ b/src/ClaudeDo.Worker/Planning/InteractiveSessionService.cs
@@ -125,6 +125,12 @@ public sealed class InteractiveSessionService
await session.SendUserMessageAsync(text, ct);
}
+ public async Task InterruptAsync(string taskId, CancellationToken ct)
+ {
+ if (_registry.TryGet(taskId, out var session))
+ await session.InterruptAsync(ct);
+ }
+
public async Task StopAsync(string taskId, CancellationToken ct)
{
// StopAsync removes from registry and kills the session.
diff --git a/src/ClaudeDo.Worker/Runner/StreamingClaudeSession.cs b/src/ClaudeDo.Worker/Runner/StreamingClaudeSession.cs
index 8a879d4..1dc2a4c 100644
--- a/src/ClaudeDo.Worker/Runner/StreamingClaudeSession.cs
+++ b/src/ClaudeDo.Worker/Runner/StreamingClaudeSession.cs
@@ -11,7 +11,7 @@ public sealed class StreamingClaudeSession : ILiveSession
private readonly SemaphoreSlim _sendLock = new(1, 1);
private volatile bool _isTurnInFlight;
- private TaskCompletionSource _turnTcs = new(TaskCreationOptions.RunContinuationsAsynchronously);
+ private readonly Queue _pending = new();
public bool IsTurnInFlight => _isTurnInFlight;
@@ -41,17 +41,32 @@ public sealed class StreamingClaudeSession : ILiveSession
try { await _onLine(line); }
catch (Exception ex) { _logger.LogWarning(ex, "onLine callback threw"); }
+ bool isResult;
try
{
using var doc = JsonDocument.Parse(line);
- if (doc.RootElement.TryGetProperty("type", out var typeProp)
- && typeProp.GetString() == "result")
+ isResult = doc.RootElement.TryGetProperty("type", out var typeProp)
+ && typeProp.GetString() == "result";
+ }
+ catch { isResult = false; }
+
+ if (!isResult) return;
+
+ // Turn ended — flush one queued message if available.
+ await _sendLock.WaitAsync();
+ try
+ {
+ _isTurnInFlight = false;
+ if (_pending.Count > 0)
{
- _isTurnInFlight = false;
- _turnTcs.TrySetResult(true);
+ var next = _pending.Dequeue();
+ await SendTurnAsync(next, CancellationToken.None);
}
}
- catch { /* unparseable line — ignore */ }
+ finally
+ {
+ _sendLock.Release();
+ }
}
public async Task SendUserMessageAsync(string text, CancellationToken ct)
@@ -59,19 +74,14 @@ public sealed class StreamingClaudeSession : ILiveSession
await _sendLock.WaitAsync(ct);
try
{
- if (_isTurnInFlight)
+ if (_isTurnInFlight || _pending.Count > 0)
{
- await InterruptInternalAsync(ct);
-
- // Wait for the current turn to end (interrupt-aborted result), with timeout.
- var turnDone = _turnTcs.Task;
- var timeout = Task.Delay(TimeSpan.FromSeconds(30), ct);
- var winner = await Task.WhenAny(turnDone, timeout);
- if (winner == timeout)
- _logger.LogWarning("Timed out waiting for turn to end after interrupt; proceeding anyway.");
+ _pending.Enqueue(text);
+ }
+ else
+ {
+ await SendTurnAsync(text, ct);
}
-
- await SendTurnAsync(text, ct);
}
finally
{
@@ -82,28 +92,29 @@ public sealed class StreamingClaudeSession : ILiveSession
public async Task InterruptAsync(CancellationToken ct)
{
await _sendLock.WaitAsync(ct);
- try { await InterruptInternalAsync(ct); }
- finally { _sendLock.Release(); }
- }
-
- private async Task InterruptInternalAsync(CancellationToken ct)
- {
- var requestId = Guid.NewGuid().ToString();
- var payload = JsonSerializer.Serialize(new
+ try
{
- type = "control_request",
- request_id = requestId,
- request = new { subtype = "interrupt" }
- });
+ if (!_isTurnInFlight) return;
- try { await _transport.WriteLineAsync(payload, ct); }
- catch (Exception ex) { _logger.LogWarning(ex, "Failed to write interrupt control_request; degrading gracefully."); }
+ var requestId = Guid.NewGuid().ToString();
+ var payload = JsonSerializer.Serialize(new
+ {
+ type = "control_request",
+ request_id = requestId,
+ request = new { subtype = "interrupt" }
+ });
+
+ try { await _transport.WriteLineAsync(payload, ct); }
+ catch (Exception ex) { _logger.LogWarning(ex, "Failed to write interrupt control_request; degrading gracefully."); }
+ }
+ finally
+ {
+ _sendLock.Release();
+ }
}
private async Task SendTurnAsync(string text, CancellationToken ct)
{
- // Reset the TCS for this new turn.
- _turnTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
_isTurnInFlight = true;
var payload = JsonSerializer.Serialize(new
diff --git a/tests/ClaudeDo.Ui.Tests/StubWorkerClient.cs b/tests/ClaudeDo.Ui.Tests/StubWorkerClient.cs
index 5005274..2c3a110 100644
--- a/tests/ClaudeDo.Ui.Tests/StubWorkerClient.cs
+++ b/tests/ClaudeDo.Ui.Tests/StubWorkerClient.cs
@@ -150,6 +150,12 @@ public abstract class StubWorkerClient : IWorkerClient
StoppedInteractive.Add(taskId);
return Task.CompletedTask;
}
+ public List InterruptedInteractive { get; } = new();
+ public virtual Task InterruptInteractiveSessionAsync(string taskId)
+ {
+ InterruptedInteractive.Add(taskId);
+ return Task.CompletedTask;
+ }
protected void RaisePropertyChanged(string name) => PropertyChanged?.Invoke(this, new PropertyChangedEventArgs(name));
}
diff --git a/tests/ClaudeDo.Worker.Tests/Runner/StreamingClaudeSessionTests.cs b/tests/ClaudeDo.Worker.Tests/Runner/StreamingClaudeSessionTests.cs
index 73b1a1e..0297b2e 100644
--- a/tests/ClaudeDo.Worker.Tests/Runner/StreamingClaudeSessionTests.cs
+++ b/tests/ClaudeDo.Worker.Tests/Runner/StreamingClaudeSessionTests.cs
@@ -69,10 +69,10 @@ public sealed class StreamingClaudeSessionTests
await session.DisposeAsync();
}
- // ---- Sending while in-flight: interrupt first, then user message after result ----
+ // ---- Sending while in-flight queues the message; no interrupt written ----
[Fact]
- public async Task SendWhileInFlight_WritesInterruptFirst_ThenUserMessage()
+ public async Task SendWhileInFlight_QueuesMessage_NoInterrupt()
{
var transport = new FakeClaudeStreamTransport();
var session = Build(transport, []);
@@ -80,27 +80,67 @@ public sealed class StreamingClaudeSessionTests
await session.StartAsync([], "/tmp", "first", CancellationToken.None);
// Written[0] = first user message. Turn is in flight.
Assert.True(session.IsTurnInFlight);
+ var countBefore = transport.Written.Count;
- // Fire the second send on a background task (it will block waiting for the turn to end).
- var sendTask = Task.Run(() => session.SendUserMessageAsync("second", CancellationToken.None));
+ await session.SendUserMessageAsync("second", CancellationToken.None);
- // Give the background task time to reach the await-turn-ended point.
- await Task.Delay(50);
+ // Nothing extra written yet — message is queued, no interrupt issued.
+ Assert.Equal(countBefore, transport.Written.Count);
+ Assert.True(session.IsTurnInFlight);
- // Push a result line to unblock it.
+ await session.DisposeAsync();
+ }
+
+ // ---- Queued message flushes automatically when result arrives ----
+
+ [Fact]
+ public async Task QueuedMessage_FlushesOnResult()
+ {
+ var transport = new FakeClaudeStreamTransport();
+ var session = Build(transport, []);
+
+ await session.StartAsync([], "/tmp", "first", CancellationToken.None);
+ await session.SendUserMessageAsync("second", CancellationToken.None);
+
+ // Push result — should dequeue "second" and send it.
await transport.PushLineAsync(ResultLine());
- await sendTask;
+ // After flush: IsTurnInFlight is true again for the second turn.
+ Assert.True(session.IsTurnInFlight);
- // Written[0] = first prompt, Written[1] = interrupt, Written[2] = second user message.
- Assert.True(transport.Written.Count >= 3, $"Expected ≥3 writes, got {transport.Written.Count}");
+ // Written[0] = "first", Written[1] = "second" user message.
+ Assert.Equal(2, transport.Written.Count);
+ using var doc = JsonDocument.Parse(transport.Written[1]);
+ Assert.Equal("user", doc.RootElement.GetProperty("type").GetString());
+ var text = doc.RootElement.GetProperty("message").GetProperty("content")[0].GetProperty("text").GetString();
+ Assert.Equal("second", text);
- // Written[1] must be an interrupt control_request.
+ await session.DisposeAsync();
+ }
+
+ // ---- Interrupt writes control_request when in-flight ----
+
+ [Fact]
+ public async Task Interrupt_WritesControlRequest_WhenInFlight()
+ {
+ var transport = new FakeClaudeStreamTransport();
+ var session = Build(transport, []);
+
+ await session.StartAsync([], "/tmp", "first", CancellationToken.None);
+ await session.SendUserMessageAsync("second", CancellationToken.None); // queued
+
+ await session.InterruptAsync(CancellationToken.None);
+
+ // Written[0] = first user message, Written[1] = interrupt control_request.
+ Assert.True(transport.Written.Count >= 2);
using var interruptDoc = JsonDocument.Parse(transport.Written[1]);
Assert.Equal("control_request", interruptDoc.RootElement.GetProperty("type").GetString());
Assert.Equal("interrupt", interruptDoc.RootElement.GetProperty("request").GetProperty("subtype").GetString());
- // Last write must be the user message with "second".
+ // Now push result — queued "second" must flush.
+ await transport.PushLineAsync(ResultLine());
+
+ Assert.True(session.IsTurnInFlight);
using var userDoc = JsonDocument.Parse(transport.Written[^1]);
Assert.Equal("user", userDoc.RootElement.GetProperty("type").GetString());
var text = userDoc.RootElement.GetProperty("message").GetProperty("content")[0].GetProperty("text").GetString();
@@ -109,6 +149,26 @@ public sealed class StreamingClaudeSessionTests
await session.DisposeAsync();
}
+ // ---- Interrupt is a no-op when idle ----
+
+ [Fact]
+ public async Task Interrupt_NoOp_WhenIdle()
+ {
+ var transport = new FakeClaudeStreamTransport();
+ var session = Build(transport, []);
+
+ await session.StartAsync([], "/tmp", "first", CancellationToken.None);
+ await transport.PushLineAsync(ResultLine()); // idle now
+ Assert.False(session.IsTurnInFlight);
+
+ var countBefore = transport.Written.Count;
+ await session.InterruptAsync(CancellationToken.None);
+
+ Assert.Equal(countBefore, transport.Written.Count);
+
+ await session.DisposeAsync();
+ }
+
// ---- Sending while idle writes user message with no interrupt ----
[Fact]
@@ -170,4 +230,40 @@ public sealed class StreamingClaudeSessionTests
await session.DisposeAsync();
}
+
+ // ---- Multiple queued messages flush one-per-result in FIFO order ----
+
+ [Fact]
+ public async Task MultipleQueued_FlushInFifoOrder()
+ {
+ var transport = new FakeClaudeStreamTransport();
+ var session = Build(transport, []);
+
+ await session.StartAsync([], "/tmp", "first", CancellationToken.None);
+ await session.SendUserMessageAsync("second", CancellationToken.None);
+ await session.SendUserMessageAsync("third", CancellationToken.None);
+
+ // Both "second" and "third" are queued; nothing extra written yet.
+ Assert.Single(transport.Written);
+
+ // Result 1 → flushes "second".
+ await transport.PushLineAsync(ResultLine());
+ Assert.Equal(2, transport.Written.Count);
+ using var doc2 = JsonDocument.Parse(transport.Written[1]);
+ Assert.Equal("second", doc2.RootElement.GetProperty("message").GetProperty("content")[0].GetProperty("text").GetString());
+ Assert.True(session.IsTurnInFlight);
+
+ // Result 2 → flushes "third".
+ await transport.PushLineAsync(ResultLine());
+ Assert.Equal(3, transport.Written.Count);
+ using var doc3 = JsonDocument.Parse(transport.Written[2]);
+ Assert.Equal("third", doc3.RootElement.GetProperty("message").GetProperty("content")[0].GetProperty("text").GetString());
+ Assert.True(session.IsTurnInFlight);
+
+ // Result 3 → queue empty, idle.
+ await transport.PushLineAsync(ResultLine());
+ Assert.False(session.IsTurnInFlight);
+
+ await session.DisposeAsync();
+ }
}
diff --git a/tests/ClaudeDo.Worker.Tests/UiVm/TasksIslandViewModelPlanningTests.cs b/tests/ClaudeDo.Worker.Tests/UiVm/TasksIslandViewModelPlanningTests.cs
index 7b5afff..d5c07bb 100644
--- a/tests/ClaudeDo.Worker.Tests/UiVm/TasksIslandViewModelPlanningTests.cs
+++ b/tests/ClaudeDo.Worker.Tests/UiVm/TasksIslandViewModelPlanningTests.cs
@@ -126,6 +126,7 @@ sealed class FakeWorkerClient : IWorkerClient
public Task ClearOnlineInboxAuthAsync() => Task.CompletedTask;
public Task SendInteractiveMessageAsync(string taskId, string text) => Task.CompletedTask;
public Task StopInteractiveSessionAsync(string taskId) => Task.CompletedTask;
+ public Task InterruptInteractiveSessionAsync(string taskId) => Task.CompletedTask;
public IReadOnlyList GetActiveTasks() => System.Array.Empty();
}