feat(worker): queue interactive messages by default, interrupt opt-in
StreamingClaudeSession now buffers a mid-turn user message in a FIFO queue and flushes one when the turn's result arrives (no implicit interrupt). InterruptAsync only writes the control_request (no-op when idle); the resulting turn-end then flushes any queued message. New InteractiveSessionService.InterruptAsync + WorkerHub.InterruptInteractiveSession + IWorkerClient.InterruptInteractiveSessionAsync.
This commit is contained in:
@@ -51,6 +51,7 @@ public interface IWorkerClient : INotifyPropertyChanged
|
||||
Task AnswerTaskQuestionAsync(string taskId, string questionId, string answer);
|
||||
Task SendInteractiveMessageAsync(string taskId, string text);
|
||||
Task StopInteractiveSessionAsync(string taskId);
|
||||
Task InterruptInteractiveSessionAsync(string taskId);
|
||||
/// <summary>The question a running task is currently blocked on, if any (for re-attach).</summary>
|
||||
Task<PendingQuestionDto?> GetPendingQuestionAsync(string taskId);
|
||||
Task ResetTaskAsync(string taskId);
|
||||
|
||||
@@ -303,6 +303,12 @@ public partial class WorkerClient : ObservableObject, IAsyncDisposable, IWorkerC
|
||||
catch { /* offline */ }
|
||||
}
|
||||
|
||||
public async Task InterruptInteractiveSessionAsync(string taskId)
|
||||
{
|
||||
try { await _hub.InvokeAsync("InterruptInteractiveSession", taskId); }
|
||||
catch { /* offline */ }
|
||||
}
|
||||
|
||||
public Task<PendingQuestionDto?> GetPendingQuestionAsync(string taskId)
|
||||
=> TryInvokeAsync<PendingQuestionDto>("GetPendingQuestion", taskId);
|
||||
|
||||
|
||||
@@ -581,6 +581,9 @@ public sealed class WorkerHub : Microsoft.AspNetCore.SignalR.Hub
|
||||
public Task StopInteractiveSession(string taskId) =>
|
||||
_interactive.StopAsync(taskId, Context.ConnectionAborted);
|
||||
|
||||
public Task InterruptInteractiveSession(string taskId) =>
|
||||
_interactive.InterruptAsync(taskId, Context.ConnectionAborted);
|
||||
|
||||
public async Task<DiscardPlanningOutcome> DiscardPlanningSessionAsync(string taskId, bool dequeueQueuedChildren = false)
|
||||
{
|
||||
var outcome = await _planning.DiscardAsync(taskId, dequeueQueuedChildren, Context.ConnectionAborted);
|
||||
|
||||
@@ -125,6 +125,12 @@ public sealed class InteractiveSessionService
|
||||
await session.SendUserMessageAsync(text, ct);
|
||||
}
|
||||
|
||||
public async Task InterruptAsync(string taskId, CancellationToken ct)
|
||||
{
|
||||
if (_registry.TryGet(taskId, out var session))
|
||||
await session.InterruptAsync(ct);
|
||||
}
|
||||
|
||||
public async Task StopAsync(string taskId, CancellationToken ct)
|
||||
{
|
||||
// StopAsync removes from registry and kills the session.
|
||||
|
||||
@@ -11,7 +11,7 @@ public sealed class StreamingClaudeSession : ILiveSession
|
||||
|
||||
private readonly SemaphoreSlim _sendLock = new(1, 1);
|
||||
private volatile bool _isTurnInFlight;
|
||||
private TaskCompletionSource<bool> _turnTcs = new(TaskCreationOptions.RunContinuationsAsynchronously);
|
||||
private readonly Queue<string> _pending = new();
|
||||
|
||||
public bool IsTurnInFlight => _isTurnInFlight;
|
||||
|
||||
@@ -41,17 +41,32 @@ public sealed class StreamingClaudeSession : ILiveSession
|
||||
try { await _onLine(line); }
|
||||
catch (Exception ex) { _logger.LogWarning(ex, "onLine callback threw"); }
|
||||
|
||||
bool isResult;
|
||||
try
|
||||
{
|
||||
using var doc = JsonDocument.Parse(line);
|
||||
if (doc.RootElement.TryGetProperty("type", out var typeProp)
|
||||
&& typeProp.GetString() == "result")
|
||||
isResult = doc.RootElement.TryGetProperty("type", out var typeProp)
|
||||
&& typeProp.GetString() == "result";
|
||||
}
|
||||
catch { isResult = false; }
|
||||
|
||||
if (!isResult) return;
|
||||
|
||||
// Turn ended — flush one queued message if available.
|
||||
await _sendLock.WaitAsync();
|
||||
try
|
||||
{
|
||||
_isTurnInFlight = false;
|
||||
_turnTcs.TrySetResult(true);
|
||||
if (_pending.Count > 0)
|
||||
{
|
||||
var next = _pending.Dequeue();
|
||||
await SendTurnAsync(next, CancellationToken.None);
|
||||
}
|
||||
}
|
||||
catch { /* unparseable line — ignore */ }
|
||||
finally
|
||||
{
|
||||
_sendLock.Release();
|
||||
}
|
||||
}
|
||||
|
||||
public async Task SendUserMessageAsync(string text, CancellationToken ct)
|
||||
@@ -59,20 +74,15 @@ public sealed class StreamingClaudeSession : ILiveSession
|
||||
await _sendLock.WaitAsync(ct);
|
||||
try
|
||||
{
|
||||
if (_isTurnInFlight)
|
||||
if (_isTurnInFlight || _pending.Count > 0)
|
||||
{
|
||||
await InterruptInternalAsync(ct);
|
||||
|
||||
// Wait for the current turn to end (interrupt-aborted result), with timeout.
|
||||
var turnDone = _turnTcs.Task;
|
||||
var timeout = Task.Delay(TimeSpan.FromSeconds(30), ct);
|
||||
var winner = await Task.WhenAny(turnDone, timeout);
|
||||
if (winner == timeout)
|
||||
_logger.LogWarning("Timed out waiting for turn to end after interrupt; proceeding anyway.");
|
||||
_pending.Enqueue(text);
|
||||
}
|
||||
|
||||
else
|
||||
{
|
||||
await SendTurnAsync(text, ct);
|
||||
}
|
||||
}
|
||||
finally
|
||||
{
|
||||
_sendLock.Release();
|
||||
@@ -82,12 +92,10 @@ public sealed class StreamingClaudeSession : ILiveSession
|
||||
public async Task InterruptAsync(CancellationToken ct)
|
||||
{
|
||||
await _sendLock.WaitAsync(ct);
|
||||
try { await InterruptInternalAsync(ct); }
|
||||
finally { _sendLock.Release(); }
|
||||
}
|
||||
|
||||
private async Task InterruptInternalAsync(CancellationToken ct)
|
||||
try
|
||||
{
|
||||
if (!_isTurnInFlight) return;
|
||||
|
||||
var requestId = Guid.NewGuid().ToString();
|
||||
var payload = JsonSerializer.Serialize(new
|
||||
{
|
||||
@@ -99,11 +107,14 @@ public sealed class StreamingClaudeSession : ILiveSession
|
||||
try { await _transport.WriteLineAsync(payload, ct); }
|
||||
catch (Exception ex) { _logger.LogWarning(ex, "Failed to write interrupt control_request; degrading gracefully."); }
|
||||
}
|
||||
finally
|
||||
{
|
||||
_sendLock.Release();
|
||||
}
|
||||
}
|
||||
|
||||
private async Task SendTurnAsync(string text, CancellationToken ct)
|
||||
{
|
||||
// Reset the TCS for this new turn.
|
||||
_turnTcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
|
||||
_isTurnInFlight = true;
|
||||
|
||||
var payload = JsonSerializer.Serialize(new
|
||||
|
||||
@@ -150,6 +150,12 @@ public abstract class StubWorkerClient : IWorkerClient
|
||||
StoppedInteractive.Add(taskId);
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
public List<string> InterruptedInteractive { get; } = new();
|
||||
public virtual Task InterruptInteractiveSessionAsync(string taskId)
|
||||
{
|
||||
InterruptedInteractive.Add(taskId);
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
protected void RaisePropertyChanged(string name) => PropertyChanged?.Invoke(this, new PropertyChangedEventArgs(name));
|
||||
}
|
||||
|
||||
@@ -69,10 +69,10 @@ public sealed class StreamingClaudeSessionTests
|
||||
await session.DisposeAsync();
|
||||
}
|
||||
|
||||
// ---- Sending while in-flight: interrupt first, then user message after result ----
|
||||
// ---- Sending while in-flight queues the message; no interrupt written ----
|
||||
|
||||
[Fact]
|
||||
public async Task SendWhileInFlight_WritesInterruptFirst_ThenUserMessage()
|
||||
public async Task SendWhileInFlight_QueuesMessage_NoInterrupt()
|
||||
{
|
||||
var transport = new FakeClaudeStreamTransport();
|
||||
var session = Build(transport, []);
|
||||
@@ -80,27 +80,67 @@ public sealed class StreamingClaudeSessionTests
|
||||
await session.StartAsync([], "/tmp", "first", CancellationToken.None);
|
||||
// Written[0] = first user message. Turn is in flight.
|
||||
Assert.True(session.IsTurnInFlight);
|
||||
var countBefore = transport.Written.Count;
|
||||
|
||||
// Fire the second send on a background task (it will block waiting for the turn to end).
|
||||
var sendTask = Task.Run(() => session.SendUserMessageAsync("second", CancellationToken.None));
|
||||
await session.SendUserMessageAsync("second", CancellationToken.None);
|
||||
|
||||
// Give the background task time to reach the await-turn-ended point.
|
||||
await Task.Delay(50);
|
||||
// Nothing extra written yet — message is queued, no interrupt issued.
|
||||
Assert.Equal(countBefore, transport.Written.Count);
|
||||
Assert.True(session.IsTurnInFlight);
|
||||
|
||||
// Push a result line to unblock it.
|
||||
await session.DisposeAsync();
|
||||
}
|
||||
|
||||
// ---- Queued message flushes automatically when result arrives ----
|
||||
|
||||
[Fact]
|
||||
public async Task QueuedMessage_FlushesOnResult()
|
||||
{
|
||||
var transport = new FakeClaudeStreamTransport();
|
||||
var session = Build(transport, []);
|
||||
|
||||
await session.StartAsync([], "/tmp", "first", CancellationToken.None);
|
||||
await session.SendUserMessageAsync("second", CancellationToken.None);
|
||||
|
||||
// Push result — should dequeue "second" and send it.
|
||||
await transport.PushLineAsync(ResultLine());
|
||||
|
||||
await sendTask;
|
||||
// After flush: IsTurnInFlight is true again for the second turn.
|
||||
Assert.True(session.IsTurnInFlight);
|
||||
|
||||
// Written[0] = first prompt, Written[1] = interrupt, Written[2] = second user message.
|
||||
Assert.True(transport.Written.Count >= 3, $"Expected ≥3 writes, got {transport.Written.Count}");
|
||||
// Written[0] = "first", Written[1] = "second" user message.
|
||||
Assert.Equal(2, transport.Written.Count);
|
||||
using var doc = JsonDocument.Parse(transport.Written[1]);
|
||||
Assert.Equal("user", doc.RootElement.GetProperty("type").GetString());
|
||||
var text = doc.RootElement.GetProperty("message").GetProperty("content")[0].GetProperty("text").GetString();
|
||||
Assert.Equal("second", text);
|
||||
|
||||
// Written[1] must be an interrupt control_request.
|
||||
await session.DisposeAsync();
|
||||
}
|
||||
|
||||
// ---- Interrupt writes control_request when in-flight ----
|
||||
|
||||
[Fact]
|
||||
public async Task Interrupt_WritesControlRequest_WhenInFlight()
|
||||
{
|
||||
var transport = new FakeClaudeStreamTransport();
|
||||
var session = Build(transport, []);
|
||||
|
||||
await session.StartAsync([], "/tmp", "first", CancellationToken.None);
|
||||
await session.SendUserMessageAsync("second", CancellationToken.None); // queued
|
||||
|
||||
await session.InterruptAsync(CancellationToken.None);
|
||||
|
||||
// Written[0] = first user message, Written[1] = interrupt control_request.
|
||||
Assert.True(transport.Written.Count >= 2);
|
||||
using var interruptDoc = JsonDocument.Parse(transport.Written[1]);
|
||||
Assert.Equal("control_request", interruptDoc.RootElement.GetProperty("type").GetString());
|
||||
Assert.Equal("interrupt", interruptDoc.RootElement.GetProperty("request").GetProperty("subtype").GetString());
|
||||
|
||||
// Last write must be the user message with "second".
|
||||
// Now push result — queued "second" must flush.
|
||||
await transport.PushLineAsync(ResultLine());
|
||||
|
||||
Assert.True(session.IsTurnInFlight);
|
||||
using var userDoc = JsonDocument.Parse(transport.Written[^1]);
|
||||
Assert.Equal("user", userDoc.RootElement.GetProperty("type").GetString());
|
||||
var text = userDoc.RootElement.GetProperty("message").GetProperty("content")[0].GetProperty("text").GetString();
|
||||
@@ -109,6 +149,26 @@ public sealed class StreamingClaudeSessionTests
|
||||
await session.DisposeAsync();
|
||||
}
|
||||
|
||||
// ---- Interrupt is a no-op when idle ----
|
||||
|
||||
[Fact]
|
||||
public async Task Interrupt_NoOp_WhenIdle()
|
||||
{
|
||||
var transport = new FakeClaudeStreamTransport();
|
||||
var session = Build(transport, []);
|
||||
|
||||
await session.StartAsync([], "/tmp", "first", CancellationToken.None);
|
||||
await transport.PushLineAsync(ResultLine()); // idle now
|
||||
Assert.False(session.IsTurnInFlight);
|
||||
|
||||
var countBefore = transport.Written.Count;
|
||||
await session.InterruptAsync(CancellationToken.None);
|
||||
|
||||
Assert.Equal(countBefore, transport.Written.Count);
|
||||
|
||||
await session.DisposeAsync();
|
||||
}
|
||||
|
||||
// ---- Sending while idle writes user message with no interrupt ----
|
||||
|
||||
[Fact]
|
||||
@@ -170,4 +230,40 @@ public sealed class StreamingClaudeSessionTests
|
||||
|
||||
await session.DisposeAsync();
|
||||
}
|
||||
|
||||
// ---- Multiple queued messages flush one-per-result in FIFO order ----
|
||||
|
||||
[Fact]
|
||||
public async Task MultipleQueued_FlushInFifoOrder()
|
||||
{
|
||||
var transport = new FakeClaudeStreamTransport();
|
||||
var session = Build(transport, []);
|
||||
|
||||
await session.StartAsync([], "/tmp", "first", CancellationToken.None);
|
||||
await session.SendUserMessageAsync("second", CancellationToken.None);
|
||||
await session.SendUserMessageAsync("third", CancellationToken.None);
|
||||
|
||||
// Both "second" and "third" are queued; nothing extra written yet.
|
||||
Assert.Single(transport.Written);
|
||||
|
||||
// Result 1 → flushes "second".
|
||||
await transport.PushLineAsync(ResultLine());
|
||||
Assert.Equal(2, transport.Written.Count);
|
||||
using var doc2 = JsonDocument.Parse(transport.Written[1]);
|
||||
Assert.Equal("second", doc2.RootElement.GetProperty("message").GetProperty("content")[0].GetProperty("text").GetString());
|
||||
Assert.True(session.IsTurnInFlight);
|
||||
|
||||
// Result 2 → flushes "third".
|
||||
await transport.PushLineAsync(ResultLine());
|
||||
Assert.Equal(3, transport.Written.Count);
|
||||
using var doc3 = JsonDocument.Parse(transport.Written[2]);
|
||||
Assert.Equal("third", doc3.RootElement.GetProperty("message").GetProperty("content")[0].GetProperty("text").GetString());
|
||||
Assert.True(session.IsTurnInFlight);
|
||||
|
||||
// Result 3 → queue empty, idle.
|
||||
await transport.PushLineAsync(ResultLine());
|
||||
Assert.False(session.IsTurnInFlight);
|
||||
|
||||
await session.DisposeAsync();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -126,6 +126,7 @@ sealed class FakeWorkerClient : IWorkerClient
|
||||
public Task ClearOnlineInboxAuthAsync() => Task.CompletedTask;
|
||||
public Task SendInteractiveMessageAsync(string taskId, string text) => Task.CompletedTask;
|
||||
public Task StopInteractiveSessionAsync(string taskId) => Task.CompletedTask;
|
||||
public Task InterruptInteractiveSessionAsync(string taskId) => Task.CompletedTask;
|
||||
public IReadOnlyList<ActiveTask> GetActiveTasks() => System.Array.Empty<ActiveTask>();
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user