From bdda98eccd8caa7a43a51d3cc3fe6892f44f4ad7 Mon Sep 17 00:00:00 2001 From: Mika Kuns Date: Fri, 26 Jun 2026 10:35:13 +0200 Subject: [PATCH] feat(worker): queue interactive messages by default, interrupt opt-in StreamingClaudeSession now buffers a mid-turn user message in a FIFO queue and flushes one when the turn's result arrives (no implicit interrupt). InterruptAsync only writes the control_request (no-op when idle); the resulting turn-end then flushes any queued message. New InteractiveSessionService.InterruptAsync + WorkerHub.InterruptInteractiveSession + IWorkerClient.InterruptInteractiveSessionAsync. --- .../Services/Interfaces/IWorkerClient.cs | 1 + src/ClaudeDo.Ui/Services/WorkerClient.cs | 6 + src/ClaudeDo.Worker/Hub/WorkerHub.cs | 3 + .../Planning/InteractiveSessionService.cs | 6 + .../Runner/StreamingClaudeSession.cs | 77 ++++++----- tests/ClaudeDo.Ui.Tests/StubWorkerClient.cs | 6 + .../Runner/StreamingClaudeSessionTests.cs | 120 ++++++++++++++++-- .../UiVm/TasksIslandViewModelPlanningTests.cs | 1 + 8 files changed, 175 insertions(+), 45 deletions(-) diff --git a/src/ClaudeDo.Ui/Services/Interfaces/IWorkerClient.cs b/src/ClaudeDo.Ui/Services/Interfaces/IWorkerClient.cs index d33626f..6476797 100644 --- a/src/ClaudeDo.Ui/Services/Interfaces/IWorkerClient.cs +++ b/src/ClaudeDo.Ui/Services/Interfaces/IWorkerClient.cs @@ -51,6 +51,7 @@ public interface IWorkerClient : INotifyPropertyChanged Task AnswerTaskQuestionAsync(string taskId, string questionId, string answer); Task SendInteractiveMessageAsync(string taskId, string text); Task StopInteractiveSessionAsync(string taskId); + Task InterruptInteractiveSessionAsync(string taskId); /// The question a running task is currently blocked on, if any (for re-attach). Task GetPendingQuestionAsync(string taskId); Task ResetTaskAsync(string taskId); diff --git a/src/ClaudeDo.Ui/Services/WorkerClient.cs b/src/ClaudeDo.Ui/Services/WorkerClient.cs index 02197d9..c0c1eab 100644 --- a/src/ClaudeDo.Ui/Services/WorkerClient.cs +++ b/src/ClaudeDo.Ui/Services/WorkerClient.cs @@ -303,6 +303,12 @@ public partial class WorkerClient : ObservableObject, IAsyncDisposable, IWorkerC catch { /* offline */ } } + public async Task InterruptInteractiveSessionAsync(string taskId) + { + try { await _hub.InvokeAsync("InterruptInteractiveSession", taskId); } + catch { /* offline */ } + } + public Task GetPendingQuestionAsync(string taskId) => TryInvokeAsync("GetPendingQuestion", taskId); diff --git a/src/ClaudeDo.Worker/Hub/WorkerHub.cs b/src/ClaudeDo.Worker/Hub/WorkerHub.cs index 7e8a592..3803fe6 100644 --- a/src/ClaudeDo.Worker/Hub/WorkerHub.cs +++ b/src/ClaudeDo.Worker/Hub/WorkerHub.cs @@ -581,6 +581,9 @@ public sealed class WorkerHub : Microsoft.AspNetCore.SignalR.Hub public Task StopInteractiveSession(string taskId) => _interactive.StopAsync(taskId, Context.ConnectionAborted); + public Task InterruptInteractiveSession(string taskId) => + _interactive.InterruptAsync(taskId, Context.ConnectionAborted); + public async Task DiscardPlanningSessionAsync(string taskId, bool dequeueQueuedChildren = false) { var outcome = await _planning.DiscardAsync(taskId, dequeueQueuedChildren, Context.ConnectionAborted); diff --git a/src/ClaudeDo.Worker/Planning/InteractiveSessionService.cs b/src/ClaudeDo.Worker/Planning/InteractiveSessionService.cs index 3d5b6f7..8495e3d 100644 --- a/src/ClaudeDo.Worker/Planning/InteractiveSessionService.cs +++ b/src/ClaudeDo.Worker/Planning/InteractiveSessionService.cs @@ -125,6 +125,12 @@ public sealed class InteractiveSessionService await session.SendUserMessageAsync(text, ct); } + public async Task InterruptAsync(string taskId, CancellationToken ct) + { + if (_registry.TryGet(taskId, out var session)) + await session.InterruptAsync(ct); + } + public async Task StopAsync(string taskId, CancellationToken ct) { // StopAsync removes from registry and kills the session. diff --git a/src/ClaudeDo.Worker/Runner/StreamingClaudeSession.cs b/src/ClaudeDo.Worker/Runner/StreamingClaudeSession.cs index 8a879d4..1dc2a4c 100644 --- a/src/ClaudeDo.Worker/Runner/StreamingClaudeSession.cs +++ b/src/ClaudeDo.Worker/Runner/StreamingClaudeSession.cs @@ -11,7 +11,7 @@ public sealed class StreamingClaudeSession : ILiveSession private readonly SemaphoreSlim _sendLock = new(1, 1); private volatile bool _isTurnInFlight; - private TaskCompletionSource _turnTcs = new(TaskCreationOptions.RunContinuationsAsynchronously); + private readonly Queue _pending = new(); public bool IsTurnInFlight => _isTurnInFlight; @@ -41,17 +41,32 @@ public sealed class StreamingClaudeSession : ILiveSession try { await _onLine(line); } catch (Exception ex) { _logger.LogWarning(ex, "onLine callback threw"); } + bool isResult; try { using var doc = JsonDocument.Parse(line); - if (doc.RootElement.TryGetProperty("type", out var typeProp) - && typeProp.GetString() == "result") + isResult = doc.RootElement.TryGetProperty("type", out var typeProp) + && typeProp.GetString() == "result"; + } + catch { isResult = false; } + + if (!isResult) return; + + // Turn ended — flush one queued message if available. + await _sendLock.WaitAsync(); + try + { + _isTurnInFlight = false; + if (_pending.Count > 0) { - _isTurnInFlight = false; - _turnTcs.TrySetResult(true); + var next = _pending.Dequeue(); + await SendTurnAsync(next, CancellationToken.None); } } - catch { /* unparseable line — ignore */ } + finally + { + _sendLock.Release(); + } } public async Task SendUserMessageAsync(string text, CancellationToken ct) @@ -59,19 +74,14 @@ public sealed class StreamingClaudeSession : ILiveSession await _sendLock.WaitAsync(ct); try { - if (_isTurnInFlight) + if (_isTurnInFlight || _pending.Count > 0) { - await InterruptInternalAsync(ct); - - // Wait for the current turn to end (interrupt-aborted result), with timeout. - var turnDone = _turnTcs.Task; - var timeout = Task.Delay(TimeSpan.FromSeconds(30), ct); - var winner = await Task.WhenAny(turnDone, timeout); - if (winner == timeout) - _logger.LogWarning("Timed out waiting for turn to end after interrupt; proceeding anyway."); + _pending.Enqueue(text); + } + else + { + await SendTurnAsync(text, ct); } - - await SendTurnAsync(text, ct); } finally { @@ -82,28 +92,29 @@ public sealed class StreamingClaudeSession : ILiveSession public async Task InterruptAsync(CancellationToken ct) { await _sendLock.WaitAsync(ct); - try { await InterruptInternalAsync(ct); } - finally { _sendLock.Release(); } - } - - private async Task InterruptInternalAsync(CancellationToken ct) - { - var requestId = Guid.NewGuid().ToString(); - var payload = JsonSerializer.Serialize(new + try { - type = "control_request", - request_id = requestId, - request = new { subtype = "interrupt" } - }); + if (!_isTurnInFlight) return; - try { await _transport.WriteLineAsync(payload, ct); } - catch (Exception ex) { _logger.LogWarning(ex, "Failed to write interrupt control_request; degrading gracefully."); } + var requestId = Guid.NewGuid().ToString(); + var payload = JsonSerializer.Serialize(new + { + type = "control_request", + request_id = requestId, + request = new { subtype = "interrupt" } + }); + + try { await _transport.WriteLineAsync(payload, ct); } + catch (Exception ex) { _logger.LogWarning(ex, "Failed to write interrupt control_request; degrading gracefully."); } + } + finally + { + _sendLock.Release(); + } } private async Task SendTurnAsync(string text, CancellationToken ct) { - // Reset the TCS for this new turn. - _turnTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); _isTurnInFlight = true; var payload = JsonSerializer.Serialize(new diff --git a/tests/ClaudeDo.Ui.Tests/StubWorkerClient.cs b/tests/ClaudeDo.Ui.Tests/StubWorkerClient.cs index 5005274..2c3a110 100644 --- a/tests/ClaudeDo.Ui.Tests/StubWorkerClient.cs +++ b/tests/ClaudeDo.Ui.Tests/StubWorkerClient.cs @@ -150,6 +150,12 @@ public abstract class StubWorkerClient : IWorkerClient StoppedInteractive.Add(taskId); return Task.CompletedTask; } + public List InterruptedInteractive { get; } = new(); + public virtual Task InterruptInteractiveSessionAsync(string taskId) + { + InterruptedInteractive.Add(taskId); + return Task.CompletedTask; + } protected void RaisePropertyChanged(string name) => PropertyChanged?.Invoke(this, new PropertyChangedEventArgs(name)); } diff --git a/tests/ClaudeDo.Worker.Tests/Runner/StreamingClaudeSessionTests.cs b/tests/ClaudeDo.Worker.Tests/Runner/StreamingClaudeSessionTests.cs index 73b1a1e..0297b2e 100644 --- a/tests/ClaudeDo.Worker.Tests/Runner/StreamingClaudeSessionTests.cs +++ b/tests/ClaudeDo.Worker.Tests/Runner/StreamingClaudeSessionTests.cs @@ -69,10 +69,10 @@ public sealed class StreamingClaudeSessionTests await session.DisposeAsync(); } - // ---- Sending while in-flight: interrupt first, then user message after result ---- + // ---- Sending while in-flight queues the message; no interrupt written ---- [Fact] - public async Task SendWhileInFlight_WritesInterruptFirst_ThenUserMessage() + public async Task SendWhileInFlight_QueuesMessage_NoInterrupt() { var transport = new FakeClaudeStreamTransport(); var session = Build(transport, []); @@ -80,27 +80,67 @@ public sealed class StreamingClaudeSessionTests await session.StartAsync([], "/tmp", "first", CancellationToken.None); // Written[0] = first user message. Turn is in flight. Assert.True(session.IsTurnInFlight); + var countBefore = transport.Written.Count; - // Fire the second send on a background task (it will block waiting for the turn to end). - var sendTask = Task.Run(() => session.SendUserMessageAsync("second", CancellationToken.None)); + await session.SendUserMessageAsync("second", CancellationToken.None); - // Give the background task time to reach the await-turn-ended point. - await Task.Delay(50); + // Nothing extra written yet — message is queued, no interrupt issued. + Assert.Equal(countBefore, transport.Written.Count); + Assert.True(session.IsTurnInFlight); - // Push a result line to unblock it. + await session.DisposeAsync(); + } + + // ---- Queued message flushes automatically when result arrives ---- + + [Fact] + public async Task QueuedMessage_FlushesOnResult() + { + var transport = new FakeClaudeStreamTransport(); + var session = Build(transport, []); + + await session.StartAsync([], "/tmp", "first", CancellationToken.None); + await session.SendUserMessageAsync("second", CancellationToken.None); + + // Push result — should dequeue "second" and send it. await transport.PushLineAsync(ResultLine()); - await sendTask; + // After flush: IsTurnInFlight is true again for the second turn. + Assert.True(session.IsTurnInFlight); - // Written[0] = first prompt, Written[1] = interrupt, Written[2] = second user message. - Assert.True(transport.Written.Count >= 3, $"Expected ≥3 writes, got {transport.Written.Count}"); + // Written[0] = "first", Written[1] = "second" user message. + Assert.Equal(2, transport.Written.Count); + using var doc = JsonDocument.Parse(transport.Written[1]); + Assert.Equal("user", doc.RootElement.GetProperty("type").GetString()); + var text = doc.RootElement.GetProperty("message").GetProperty("content")[0].GetProperty("text").GetString(); + Assert.Equal("second", text); - // Written[1] must be an interrupt control_request. + await session.DisposeAsync(); + } + + // ---- Interrupt writes control_request when in-flight ---- + + [Fact] + public async Task Interrupt_WritesControlRequest_WhenInFlight() + { + var transport = new FakeClaudeStreamTransport(); + var session = Build(transport, []); + + await session.StartAsync([], "/tmp", "first", CancellationToken.None); + await session.SendUserMessageAsync("second", CancellationToken.None); // queued + + await session.InterruptAsync(CancellationToken.None); + + // Written[0] = first user message, Written[1] = interrupt control_request. + Assert.True(transport.Written.Count >= 2); using var interruptDoc = JsonDocument.Parse(transport.Written[1]); Assert.Equal("control_request", interruptDoc.RootElement.GetProperty("type").GetString()); Assert.Equal("interrupt", interruptDoc.RootElement.GetProperty("request").GetProperty("subtype").GetString()); - // Last write must be the user message with "second". + // Now push result — queued "second" must flush. + await transport.PushLineAsync(ResultLine()); + + Assert.True(session.IsTurnInFlight); using var userDoc = JsonDocument.Parse(transport.Written[^1]); Assert.Equal("user", userDoc.RootElement.GetProperty("type").GetString()); var text = userDoc.RootElement.GetProperty("message").GetProperty("content")[0].GetProperty("text").GetString(); @@ -109,6 +149,26 @@ public sealed class StreamingClaudeSessionTests await session.DisposeAsync(); } + // ---- Interrupt is a no-op when idle ---- + + [Fact] + public async Task Interrupt_NoOp_WhenIdle() + { + var transport = new FakeClaudeStreamTransport(); + var session = Build(transport, []); + + await session.StartAsync([], "/tmp", "first", CancellationToken.None); + await transport.PushLineAsync(ResultLine()); // idle now + Assert.False(session.IsTurnInFlight); + + var countBefore = transport.Written.Count; + await session.InterruptAsync(CancellationToken.None); + + Assert.Equal(countBefore, transport.Written.Count); + + await session.DisposeAsync(); + } + // ---- Sending while idle writes user message with no interrupt ---- [Fact] @@ -170,4 +230,40 @@ public sealed class StreamingClaudeSessionTests await session.DisposeAsync(); } + + // ---- Multiple queued messages flush one-per-result in FIFO order ---- + + [Fact] + public async Task MultipleQueued_FlushInFifoOrder() + { + var transport = new FakeClaudeStreamTransport(); + var session = Build(transport, []); + + await session.StartAsync([], "/tmp", "first", CancellationToken.None); + await session.SendUserMessageAsync("second", CancellationToken.None); + await session.SendUserMessageAsync("third", CancellationToken.None); + + // Both "second" and "third" are queued; nothing extra written yet. + Assert.Single(transport.Written); + + // Result 1 → flushes "second". + await transport.PushLineAsync(ResultLine()); + Assert.Equal(2, transport.Written.Count); + using var doc2 = JsonDocument.Parse(transport.Written[1]); + Assert.Equal("second", doc2.RootElement.GetProperty("message").GetProperty("content")[0].GetProperty("text").GetString()); + Assert.True(session.IsTurnInFlight); + + // Result 2 → flushes "third". + await transport.PushLineAsync(ResultLine()); + Assert.Equal(3, transport.Written.Count); + using var doc3 = JsonDocument.Parse(transport.Written[2]); + Assert.Equal("third", doc3.RootElement.GetProperty("message").GetProperty("content")[0].GetProperty("text").GetString()); + Assert.True(session.IsTurnInFlight); + + // Result 3 → queue empty, idle. + await transport.PushLineAsync(ResultLine()); + Assert.False(session.IsTurnInFlight); + + await session.DisposeAsync(); + } } diff --git a/tests/ClaudeDo.Worker.Tests/UiVm/TasksIslandViewModelPlanningTests.cs b/tests/ClaudeDo.Worker.Tests/UiVm/TasksIslandViewModelPlanningTests.cs index 7b5afff..d5c07bb 100644 --- a/tests/ClaudeDo.Worker.Tests/UiVm/TasksIslandViewModelPlanningTests.cs +++ b/tests/ClaudeDo.Worker.Tests/UiVm/TasksIslandViewModelPlanningTests.cs @@ -126,6 +126,7 @@ sealed class FakeWorkerClient : IWorkerClient public Task ClearOnlineInboxAuthAsync() => Task.CompletedTask; public Task SendInteractiveMessageAsync(string taskId, string text) => Task.CompletedTask; public Task StopInteractiveSessionAsync(string taskId) => Task.CompletedTask; + public Task InterruptInteractiveSessionAsync(string taskId) => Task.CompletedTask; public IReadOnlyList GetActiveTasks() => System.Array.Empty(); }