feat(worker): broadcast interactive message queue + delivery

StreamingClaudeSession raises onQueueChanged (pending snapshot) and onUserMessageSent
(on delivery, incl. the seeded first prompt); InteractiveSessionService forwards these
as InteractiveQueueChanged/InteractiveMessageSent broadcasts. Lets the UI show queued
messages above the input and move a message into the transcript only when actually
delivered to Claude. Client events + fakes updated.
This commit is contained in:
Mika Kuns
2026-06-26 10:57:20 +02:00
parent 8e1732a3a0
commit 84034e8395
8 changed files with 198 additions and 4 deletions

View File

@@ -27,6 +27,8 @@ public interface IWorkerClient : INotifyPropertyChanged
event Action<string>? InteractiveSessionStartedEvent; event Action<string>? InteractiveSessionStartedEvent;
event Action<string>? InteractiveSessionEndedEvent; event Action<string>? InteractiveSessionEndedEvent;
event Action<string, IReadOnlyList<string>>? InteractiveQueueChangedEvent;
event Action<string, string>? InteractiveMessageSentEvent;
event Action? PrepStartedEvent; event Action? PrepStartedEvent;
event Action<string>? PrepLineEvent; event Action<string>? PrepLineEvent;

View File

@@ -51,6 +51,8 @@ public partial class WorkerClient : ObservableObject, IAsyncDisposable, IWorkerC
public event Action<string, string>? TaskQuestionResolvedEvent; public event Action<string, string>? TaskQuestionResolvedEvent;
public event Action<string>? InteractiveSessionStartedEvent; public event Action<string>? InteractiveSessionStartedEvent;
public event Action<string>? InteractiveSessionEndedEvent; public event Action<string>? InteractiveSessionEndedEvent;
public event Action<string, IReadOnlyList<string>>? InteractiveQueueChangedEvent;
public event Action<string, string>? InteractiveMessageSentEvent;
public event Action? ConnectionRestoredEvent; public event Action? ConnectionRestoredEvent;
public event Action<string>? WorktreeUpdatedEvent; public event Action<string>? WorktreeUpdatedEvent;
public event Action<string>? ListUpdatedEvent; public event Action<string>? ListUpdatedEvent;
@@ -160,6 +162,16 @@ public partial class WorkerClient : ObservableObject, IAsyncDisposable, IWorkerC
Dispatcher.UIThread.Post(() => InteractiveSessionEndedEvent?.Invoke(taskId)); Dispatcher.UIThread.Post(() => InteractiveSessionEndedEvent?.Invoke(taskId));
}); });
_hub.On<string, IReadOnlyList<string>>("InteractiveQueueChanged", (taskId, pending) =>
{
Dispatcher.UIThread.Post(() => InteractiveQueueChangedEvent?.Invoke(taskId, pending));
});
_hub.On<string, string>("InteractiveMessageSent", (taskId, text) =>
{
Dispatcher.UIThread.Post(() => InteractiveMessageSentEvent?.Invoke(taskId, text));
});
_hub.On<string>("WorktreeUpdated", taskId => _hub.On<string>("WorktreeUpdated", taskId =>
{ {
Dispatcher.UIThread.Post(() => WorktreeUpdatedEvent?.Invoke(taskId)); Dispatcher.UIThread.Post(() => WorktreeUpdatedEvent?.Invoke(taskId));

View File

@@ -83,4 +83,10 @@ public sealed class HubBroadcaster : IPrimeBroadcaster, IRefineBroadcaster
public Task InteractiveSessionEnded(string taskId) => public Task InteractiveSessionEnded(string taskId) =>
_hub.Clients.All.SendAsync("InteractiveSessionEnded", taskId); _hub.Clients.All.SendAsync("InteractiveSessionEnded", taskId);
public Task InteractiveQueueChanged(string taskId, IReadOnlyList<string> pending) =>
_hub.Clients.All.SendAsync("InteractiveQueueChanged", taskId, pending);
public Task InteractiveMessageSent(string taskId, string text) =>
_hub.Clients.All.SendAsync("InteractiveMessageSent", taskId, text);
} }

View File

@@ -88,7 +88,9 @@ public sealed class InteractiveSessionService
var streamingSession = new StreamingClaudeSession( var streamingSession = new StreamingClaudeSession(
transport, transport,
onLine, onLine,
_loggerFactory.CreateLogger<StreamingClaudeSession>()); _loggerFactory.CreateLogger<StreamingClaudeSession>(),
onQueueChanged: pending => _ = _broadcaster.InteractiveQueueChanged(taskId, pending),
onUserMessageSent: text => _ = _broadcaster.InteractiveMessageSent(taskId, text));
await streamingSession.StartAsync(args, workingDir, seededPrompt, ct); await streamingSession.StartAsync(args, workingDir, seededPrompt, ct);
session = streamingSession; session = streamingSession;
exitTask = transport.WaitForExitAsync(); exitTask = transport.WaitForExitAsync();

View File

@@ -8,6 +8,8 @@ public sealed class StreamingClaudeSession : ILiveSession
private readonly IClaudeStreamTransport _transport; private readonly IClaudeStreamTransport _transport;
private readonly Func<string, Task> _onLine; private readonly Func<string, Task> _onLine;
private readonly ILogger<StreamingClaudeSession> _logger; private readonly ILogger<StreamingClaudeSession> _logger;
private readonly Action<IReadOnlyList<string>>? _onQueueChanged;
private readonly Action<string>? _onUserMessageSent;
private readonly SemaphoreSlim _sendLock = new(1, 1); private readonly SemaphoreSlim _sendLock = new(1, 1);
private volatile bool _isTurnInFlight; private volatile bool _isTurnInFlight;
@@ -18,13 +20,19 @@ public sealed class StreamingClaudeSession : ILiveSession
public StreamingClaudeSession( public StreamingClaudeSession(
IClaudeStreamTransport transport, IClaudeStreamTransport transport,
Func<string, Task> onLine, Func<string, Task> onLine,
ILogger<StreamingClaudeSession> logger) ILogger<StreamingClaudeSession> logger,
Action<IReadOnlyList<string>>? onQueueChanged = null,
Action<string>? onUserMessageSent = null)
{ {
_transport = transport; _transport = transport;
_onLine = onLine; _onLine = onLine;
_logger = logger; _logger = logger;
_onQueueChanged = onQueueChanged;
_onUserMessageSent = onUserMessageSent;
} }
private IReadOnlyList<string> SnapshotPending() => _pending.ToArray();
public async Task StartAsync( public async Task StartAsync(
IReadOnlyList<string> args, IReadOnlyList<string> args,
string workingDirectory, string workingDirectory,
@@ -34,6 +42,7 @@ public sealed class StreamingClaudeSession : ILiveSession
_transport.LineReceived += HandleLineAsync; _transport.LineReceived += HandleLineAsync;
await _transport.StartAsync(args, workingDirectory, ct); await _transport.StartAsync(args, workingDirectory, ct);
await SendTurnAsync(firstPrompt, ct); await SendTurnAsync(firstPrompt, ct);
_onUserMessageSent?.Invoke(firstPrompt);
} }
private async Task HandleLineAsync(string line) private async Task HandleLineAsync(string line)
@@ -53,30 +62,45 @@ public sealed class StreamingClaudeSession : ILiveSession
if (!isResult) return; if (!isResult) return;
// Turn ended — flush one queued message if available. // Turn ended — flush one queued message if available.
string? flushedText = null;
IReadOnlyList<string>? remainingSnapshot = null;
await _sendLock.WaitAsync(); await _sendLock.WaitAsync();
try try
{ {
_isTurnInFlight = false; _isTurnInFlight = false;
if (_pending.Count > 0) if (_pending.Count > 0)
{ {
var next = _pending.Dequeue(); flushedText = _pending.Dequeue();
await SendTurnAsync(next, CancellationToken.None); remainingSnapshot = SnapshotPending();
await SendTurnAsync(flushedText, CancellationToken.None);
} }
} }
finally finally
{ {
_sendLock.Release(); _sendLock.Release();
} }
if (flushedText is not null)
{
_onQueueChanged?.Invoke(remainingSnapshot!);
_onUserMessageSent?.Invoke(flushedText);
}
} }
public async Task SendUserMessageAsync(string text, CancellationToken ct) public async Task SendUserMessageAsync(string text, CancellationToken ct)
{ {
bool enqueued = false;
IReadOnlyList<string>? snapshot = null;
await _sendLock.WaitAsync(ct); await _sendLock.WaitAsync(ct);
try try
{ {
if (_isTurnInFlight || _pending.Count > 0) if (_isTurnInFlight || _pending.Count > 0)
{ {
_pending.Enqueue(text); _pending.Enqueue(text);
snapshot = SnapshotPending();
enqueued = true;
} }
else else
{ {
@@ -87,6 +111,11 @@ public sealed class StreamingClaudeSession : ILiveSession
{ {
_sendLock.Release(); _sendLock.Release();
} }
if (enqueued)
_onQueueChanged?.Invoke(snapshot!);
else
_onUserMessageSent?.Invoke(text);
} }
public async Task InterruptAsync(CancellationToken ct) public async Task InterruptAsync(CancellationToken ct)

View File

@@ -27,6 +27,8 @@ public abstract class StubWorkerClient : IWorkerClient
public event Action<string, string>? TaskQuestionResolvedEvent; public event Action<string, string>? TaskQuestionResolvedEvent;
public event Action<string>? InteractiveSessionStartedEvent; public event Action<string>? InteractiveSessionStartedEvent;
public event Action<string>? InteractiveSessionEndedEvent; public event Action<string>? InteractiveSessionEndedEvent;
public event Action<string, IReadOnlyList<string>>? InteractiveQueueChangedEvent;
public event Action<string, string>? InteractiveMessageSentEvent;
public event Action? PrepStartedEvent; public event Action? PrepStartedEvent;
public event Action<string>? PrepLineEvent; public event Action<string>? PrepLineEvent;
public event Action<bool>? PrepFinishedEvent; public event Action<bool>? PrepFinishedEvent;
@@ -59,6 +61,8 @@ public abstract class StubWorkerClient : IWorkerClient
public void RaiseInteractiveStarted(string taskId) => InteractiveSessionStartedEvent?.Invoke(taskId); public void RaiseInteractiveStarted(string taskId) => InteractiveSessionStartedEvent?.Invoke(taskId);
public void RaiseInteractiveEnded(string taskId) => InteractiveSessionEndedEvent?.Invoke(taskId); public void RaiseInteractiveEnded(string taskId) => InteractiveSessionEndedEvent?.Invoke(taskId);
public void RaiseInteractiveQueueChanged(string taskId, IReadOnlyList<string> pending) => InteractiveQueueChangedEvent?.Invoke(taskId, pending);
public void RaiseInteractiveMessageSent(string taskId, string text) => InteractiveMessageSentEvent?.Invoke(taskId, text);
public virtual bool IsConnected => false; public virtual bool IsConnected => false;
public virtual bool IsReconnecting => false; public virtual bool IsReconnecting => false;

View File

@@ -266,4 +266,141 @@ public sealed class StreamingClaudeSessionTests
await session.DisposeAsync(); await session.DisposeAsync();
} }
// ──────────────────────────────────────────────────────────────────────────
// Callback tests (onQueueChanged / onUserMessageSent)
// ──────────────────────────────────────────────────────────────────────────
private static StreamingClaudeSession BuildWithCallbacks(
FakeClaudeStreamTransport transport,
List<IReadOnlyList<string>> queueChanges,
List<string> sent)
{
return new StreamingClaudeSession(
transport,
line => Task.CompletedTask,
NullLogger<StreamingClaudeSession>.Instance,
onQueueChanged: snapshot => queueChanges.Add(snapshot),
onUserMessageSent: text => sent.Add(text));
}
[Fact]
public async Task Start_InvokesOnUserMessageSent_WithFirstPrompt()
{
var transport = new FakeClaudeStreamTransport();
var sent = new List<string>();
var session = BuildWithCallbacks(transport, [], sent);
await session.StartAsync([], "/tmp", "hello", CancellationToken.None);
Assert.Single(sent);
Assert.Equal("hello", sent[0]);
await session.DisposeAsync();
}
[Fact]
public async Task SendWhileInFlight_InvokesOnQueueChanged_NotOnUserMessageSent()
{
var transport = new FakeClaudeStreamTransport();
var queueChanges = new List<IReadOnlyList<string>>();
var sent = new List<string>();
var session = BuildWithCallbacks(transport, queueChanges, sent);
await session.StartAsync([], "/tmp", "first", CancellationToken.None);
sent.Clear(); // ignore the initial prompt notification
await session.SendUserMessageAsync("queued-msg", CancellationToken.None);
Assert.Single(queueChanges);
Assert.Contains("queued-msg", queueChanges[0]);
Assert.DoesNotContain("queued-msg", sent);
await session.DisposeAsync();
}
[Fact]
public async Task PushResult_FlushesPending_InvokesQueueClearThenUserMessageSent()
{
var transport = new FakeClaudeStreamTransport();
var queueChanges = new List<IReadOnlyList<string>>();
var sent = new List<string>();
var session = BuildWithCallbacks(transport, queueChanges, sent);
await session.StartAsync([], "/tmp", "first", CancellationToken.None);
sent.Clear();
await session.SendUserMessageAsync("queued-msg", CancellationToken.None);
queueChanges.Clear(); // ignore the enqueue snapshot
await transport.PushLineAsync(ResultLine());
// After flush: one queueChanged with empty list, then sent contains flushed text.
Assert.Single(queueChanges);
Assert.Empty(queueChanges[0]);
Assert.Single(sent);
Assert.Equal("queued-msg", sent[0]);
await session.DisposeAsync();
}
[Fact]
public async Task SendWhileIdle_InvokesOnUserMessageSent_NoQueueChanged()
{
var transport = new FakeClaudeStreamTransport();
var queueChanges = new List<IReadOnlyList<string>>();
var sent = new List<string>();
var session = BuildWithCallbacks(transport, queueChanges, sent);
await session.StartAsync([], "/tmp", "first", CancellationToken.None);
await transport.PushLineAsync(ResultLine()); // go idle
sent.Clear();
queueChanges.Clear();
await session.SendUserMessageAsync("idle-msg", CancellationToken.None);
Assert.Empty(queueChanges);
Assert.Single(sent);
Assert.Equal("idle-msg", sent[0]);
await session.DisposeAsync();
}
[Fact]
public async Task TwoMessagesQueued_FlushFifo_QueueSnapshotsShrink()
{
var transport = new FakeClaudeStreamTransport();
var queueChanges = new List<IReadOnlyList<string>>();
var sent = new List<string>();
var session = BuildWithCallbacks(transport, queueChanges, sent);
await session.StartAsync([], "/tmp", "first", CancellationToken.None);
sent.Clear();
await session.SendUserMessageAsync("second", CancellationToken.None);
await session.SendUserMessageAsync("third", CancellationToken.None);
// queueChanges[0] = ["second"], queueChanges[1] = ["second","third"]
Assert.Equal(2, queueChanges.Count);
Assert.Equal(new[] { "second" }, queueChanges[0]);
Assert.Equal(new[] { "second", "third" }, queueChanges[1]);
queueChanges.Clear();
// Result 1 → flushes "second"; remaining queue = ["third"]
await transport.PushLineAsync(ResultLine());
Assert.Single(queueChanges);
Assert.Equal(new[] { "third" }, queueChanges[0]);
Assert.Single(sent);
Assert.Equal("second", sent[0]);
sent.Clear();
queueChanges.Clear();
// Result 2 → flushes "third"; remaining queue = []
await transport.PushLineAsync(ResultLine());
Assert.Single(queueChanges);
Assert.Empty(queueChanges[0]);
Assert.Single(sent);
Assert.Equal("third", sent[0]);
await session.DisposeAsync();
}
} }

View File

@@ -38,6 +38,8 @@ sealed class FakeWorkerClient : IWorkerClient
public event Action<string, string>? TaskQuestionResolvedEvent; public event Action<string, string>? TaskQuestionResolvedEvent;
public event Action<string>? InteractiveSessionStartedEvent; public event Action<string>? InteractiveSessionStartedEvent;
public event Action<string>? InteractiveSessionEndedEvent; public event Action<string>? InteractiveSessionEndedEvent;
public event Action<string, IReadOnlyList<string>>? InteractiveQueueChangedEvent;
public event Action<string, string>? InteractiveMessageSentEvent;
public void RaiseTaskUpdated(string taskId) => TaskUpdatedEvent?.Invoke(taskId); public void RaiseTaskUpdated(string taskId) => TaskUpdatedEvent?.Invoke(taskId);
public void RaiseWorktreeUpdated(string taskId) => WorktreeUpdatedEvent?.Invoke(taskId); public void RaiseWorktreeUpdated(string taskId) => WorktreeUpdatedEvent?.Invoke(taskId);
public void RaiseTaskMessage(string taskId, string line) => TaskMessageEvent?.Invoke(taskId, line); public void RaiseTaskMessage(string taskId, string line) => TaskMessageEvent?.Invoke(taskId, line);