using System.Text.Json; using ClaudeDo.Worker.Runner; using ClaudeDo.Worker.Tests.Infrastructure; using Microsoft.Extensions.Logging.Abstractions; namespace ClaudeDo.Worker.Tests.Runner; public sealed class StreamingClaudeSessionTests { private static StreamingClaudeSession Build( FakeClaudeStreamTransport transport, List received) { return new StreamingClaudeSession( transport, line => { received.Add(line); return Task.CompletedTask; }, NullLogger.Instance); } private static string ResultLine(bool isError = false, string subtype = "success") => JsonSerializer.Serialize(new { type = "result", is_error = isError, subtype }); private static string UserMessageLine(string text) => JsonSerializer.Serialize(new { type = "user", message = new { role = "user", content = new[] { new { type = "text", text } } }, parent_tool_use_id = (string?)null }); // ---- Start sends first prompt as user-message, IsTurnInFlight = true ---- [Fact] public async Task Start_SendsFirstPromptAsUserMessage_AndTurnIsInFlight() { var transport = new FakeClaudeStreamTransport(); var received = new List(); var session = Build(transport, received); await session.StartAsync([], "/tmp", "hello world", CancellationToken.None); Assert.True(session.IsTurnInFlight); Assert.Single(transport.Written); using var doc = JsonDocument.Parse(transport.Written[0]); var root = doc.RootElement; Assert.Equal("user", root.GetProperty("type").GetString()); var text = root.GetProperty("message").GetProperty("content")[0].GetProperty("text").GetString(); Assert.Equal("hello world", text); await session.DisposeAsync(); } // ---- Pushing a result line flips IsTurnInFlight to false ---- [Fact] public async Task PushingResultLine_FlipsIsTurnInFlightToFalse() { var transport = new FakeClaudeStreamTransport(); var session = Build(transport, []); await session.StartAsync([], "/tmp", "prompt", CancellationToken.None); Assert.True(session.IsTurnInFlight); await transport.PushLineAsync(ResultLine()); Assert.False(session.IsTurnInFlight); await session.DisposeAsync(); } // ---- Sending while in-flight queues the message; no interrupt written ---- [Fact] public async Task SendWhileInFlight_QueuesMessage_NoInterrupt() { var transport = new FakeClaudeStreamTransport(); var session = Build(transport, []); await session.StartAsync([], "/tmp", "first", CancellationToken.None); // Written[0] = first user message. Turn is in flight. Assert.True(session.IsTurnInFlight); var countBefore = transport.Written.Count; await session.SendUserMessageAsync("second", CancellationToken.None); // Nothing extra written yet — message is queued, no interrupt issued. Assert.Equal(countBefore, transport.Written.Count); Assert.True(session.IsTurnInFlight); await session.DisposeAsync(); } // ---- Queued message flushes automatically when result arrives ---- [Fact] public async Task QueuedMessage_FlushesOnResult() { var transport = new FakeClaudeStreamTransport(); var session = Build(transport, []); await session.StartAsync([], "/tmp", "first", CancellationToken.None); await session.SendUserMessageAsync("second", CancellationToken.None); // Push result — should dequeue "second" and send it. await transport.PushLineAsync(ResultLine()); // After flush: IsTurnInFlight is true again for the second turn. Assert.True(session.IsTurnInFlight); // Written[0] = "first", Written[1] = "second" user message. Assert.Equal(2, transport.Written.Count); using var doc = JsonDocument.Parse(transport.Written[1]); Assert.Equal("user", doc.RootElement.GetProperty("type").GetString()); var text = doc.RootElement.GetProperty("message").GetProperty("content")[0].GetProperty("text").GetString(); Assert.Equal("second", text); await session.DisposeAsync(); } // ---- Interrupt writes control_request when in-flight ---- [Fact] public async Task Interrupt_WritesControlRequest_WhenInFlight() { var transport = new FakeClaudeStreamTransport(); var session = Build(transport, []); await session.StartAsync([], "/tmp", "first", CancellationToken.None); await session.SendUserMessageAsync("second", CancellationToken.None); // queued await session.InterruptAsync(CancellationToken.None); // Written[0] = first user message, Written[1] = interrupt control_request. Assert.True(transport.Written.Count >= 2); using var interruptDoc = JsonDocument.Parse(transport.Written[1]); Assert.Equal("control_request", interruptDoc.RootElement.GetProperty("type").GetString()); Assert.Equal("interrupt", interruptDoc.RootElement.GetProperty("request").GetProperty("subtype").GetString()); // Now push result — queued "second" must flush. await transport.PushLineAsync(ResultLine()); Assert.True(session.IsTurnInFlight); using var userDoc = JsonDocument.Parse(transport.Written[^1]); Assert.Equal("user", userDoc.RootElement.GetProperty("type").GetString()); var text = userDoc.RootElement.GetProperty("message").GetProperty("content")[0].GetProperty("text").GetString(); Assert.Equal("second", text); await session.DisposeAsync(); } // ---- Interrupt is a no-op when idle ---- [Fact] public async Task Interrupt_NoOp_WhenIdle() { var transport = new FakeClaudeStreamTransport(); var session = Build(transport, []); await session.StartAsync([], "/tmp", "first", CancellationToken.None); await transport.PushLineAsync(ResultLine()); // idle now Assert.False(session.IsTurnInFlight); var countBefore = transport.Written.Count; await session.InterruptAsync(CancellationToken.None); Assert.Equal(countBefore, transport.Written.Count); await session.DisposeAsync(); } // ---- Sending while idle writes user message with no interrupt ---- [Fact] public async Task SendWhileIdle_WritesUserMessageWithNoInterrupt() { var transport = new FakeClaudeStreamTransport(); var session = Build(transport, []); await session.StartAsync([], "/tmp", "first", CancellationToken.None); await transport.PushLineAsync(ResultLine()); // end the turn → idle Assert.False(session.IsTurnInFlight); var countBefore = transport.Written.Count; await session.SendUserMessageAsync("second", CancellationToken.None); // Exactly one new write, no interrupt. Assert.Equal(countBefore + 1, transport.Written.Count); using var doc = JsonDocument.Parse(transport.Written[^1]); Assert.Equal("user", doc.RootElement.GetProperty("type").GetString()); await session.DisposeAsync(); } // ---- Result with is_error / error_during_execution still ends the turn ---- [Fact] public async Task ResultWithIsError_StillEndsTurn_NoThrow() { var transport = new FakeClaudeStreamTransport(); var session = Build(transport, []); await session.StartAsync([], "/tmp", "prompt", CancellationToken.None); Assert.True(session.IsTurnInFlight); await transport.PushLineAsync(ResultLine(isError: true, subtype: "error_during_execution")); Assert.False(session.IsTurnInFlight); await session.DisposeAsync(); } // ---- onLine receives every pushed stdout line ---- [Fact] public async Task OnLine_ReceivesEveryPushedLine() { var transport = new FakeClaudeStreamTransport(); var received = new List(); var session = Build(transport, received); await session.StartAsync([], "/tmp", "prompt", CancellationToken.None); var lines = new[] { "{\"type\":\"assistant\"}", "{\"type\":\"stream_event\"}", ResultLine() }; foreach (var l in lines) await transport.PushLineAsync(l); Assert.Equal(lines, received); await session.DisposeAsync(); } // ---- Multiple queued messages flush one-per-result in FIFO order ---- [Fact] public async Task MultipleQueued_FlushInFifoOrder() { var transport = new FakeClaudeStreamTransport(); var session = Build(transport, []); await session.StartAsync([], "/tmp", "first", CancellationToken.None); await session.SendUserMessageAsync("second", CancellationToken.None); await session.SendUserMessageAsync("third", CancellationToken.None); // Both "second" and "third" are queued; nothing extra written yet. Assert.Single(transport.Written); // Result 1 → flushes "second". await transport.PushLineAsync(ResultLine()); Assert.Equal(2, transport.Written.Count); using var doc2 = JsonDocument.Parse(transport.Written[1]); Assert.Equal("second", doc2.RootElement.GetProperty("message").GetProperty("content")[0].GetProperty("text").GetString()); Assert.True(session.IsTurnInFlight); // Result 2 → flushes "third". await transport.PushLineAsync(ResultLine()); Assert.Equal(3, transport.Written.Count); using var doc3 = JsonDocument.Parse(transport.Written[2]); Assert.Equal("third", doc3.RootElement.GetProperty("message").GetProperty("content")[0].GetProperty("text").GetString()); Assert.True(session.IsTurnInFlight); // Result 3 → queue empty, idle. await transport.PushLineAsync(ResultLine()); Assert.False(session.IsTurnInFlight); await session.DisposeAsync(); } // ────────────────────────────────────────────────────────────────────────── // Callback tests (onQueueChanged / onUserMessageSent) // ────────────────────────────────────────────────────────────────────────── private static StreamingClaudeSession BuildWithCallbacks( FakeClaudeStreamTransport transport, List> queueChanges, List sent) { return new StreamingClaudeSession( transport, line => Task.CompletedTask, NullLogger.Instance, onQueueChanged: snapshot => queueChanges.Add(snapshot), onUserMessageSent: text => sent.Add(text)); } [Fact] public async Task Start_InvokesOnUserMessageSent_WithFirstPrompt() { var transport = new FakeClaudeStreamTransport(); var sent = new List(); var session = BuildWithCallbacks(transport, [], sent); await session.StartAsync([], "/tmp", "hello", CancellationToken.None); Assert.Single(sent); Assert.Equal("hello", sent[0]); await session.DisposeAsync(); } [Fact] public async Task SendWhileInFlight_InvokesOnQueueChanged_NotOnUserMessageSent() { var transport = new FakeClaudeStreamTransport(); var queueChanges = new List>(); var sent = new List(); var session = BuildWithCallbacks(transport, queueChanges, sent); await session.StartAsync([], "/tmp", "first", CancellationToken.None); sent.Clear(); // ignore the initial prompt notification await session.SendUserMessageAsync("queued-msg", CancellationToken.None); Assert.Single(queueChanges); Assert.Contains("queued-msg", queueChanges[0]); Assert.DoesNotContain("queued-msg", sent); await session.DisposeAsync(); } [Fact] public async Task PushResult_FlushesPending_InvokesQueueClearThenUserMessageSent() { var transport = new FakeClaudeStreamTransport(); var queueChanges = new List>(); var sent = new List(); var session = BuildWithCallbacks(transport, queueChanges, sent); await session.StartAsync([], "/tmp", "first", CancellationToken.None); sent.Clear(); await session.SendUserMessageAsync("queued-msg", CancellationToken.None); queueChanges.Clear(); // ignore the enqueue snapshot await transport.PushLineAsync(ResultLine()); // After flush: one queueChanged with empty list, then sent contains flushed text. Assert.Single(queueChanges); Assert.Empty(queueChanges[0]); Assert.Single(sent); Assert.Equal("queued-msg", sent[0]); await session.DisposeAsync(); } [Fact] public async Task SendWhileIdle_InvokesOnUserMessageSent_NoQueueChanged() { var transport = new FakeClaudeStreamTransport(); var queueChanges = new List>(); var sent = new List(); var session = BuildWithCallbacks(transport, queueChanges, sent); await session.StartAsync([], "/tmp", "first", CancellationToken.None); await transport.PushLineAsync(ResultLine()); // go idle sent.Clear(); queueChanges.Clear(); await session.SendUserMessageAsync("idle-msg", CancellationToken.None); Assert.Empty(queueChanges); Assert.Single(sent); Assert.Equal("idle-msg", sent[0]); await session.DisposeAsync(); } [Fact] public async Task TwoMessagesQueued_FlushFifo_QueueSnapshotsShrink() { var transport = new FakeClaudeStreamTransport(); var queueChanges = new List>(); var sent = new List(); var session = BuildWithCallbacks(transport, queueChanges, sent); await session.StartAsync([], "/tmp", "first", CancellationToken.None); sent.Clear(); await session.SendUserMessageAsync("second", CancellationToken.None); await session.SendUserMessageAsync("third", CancellationToken.None); // queueChanges[0] = ["second"], queueChanges[1] = ["second","third"] Assert.Equal(2, queueChanges.Count); Assert.Equal(new[] { "second" }, queueChanges[0]); Assert.Equal(new[] { "second", "third" }, queueChanges[1]); queueChanges.Clear(); // Result 1 → flushes "second"; remaining queue = ["third"] await transport.PushLineAsync(ResultLine()); Assert.Single(queueChanges); Assert.Equal(new[] { "third" }, queueChanges[0]); Assert.Single(sent); Assert.Equal("second", sent[0]); sent.Clear(); queueChanges.Clear(); // Result 2 → flushes "third"; remaining queue = [] await transport.PushLineAsync(ResultLine()); Assert.Single(queueChanges); Assert.Empty(queueChanges[0]); Assert.Single(sent); Assert.Equal("third", sent[0]); await session.DisposeAsync(); } // ────────────────────────────────────────────────────────────────────────── // RemoveQueuedAsync tests // ────────────────────────────────────────────────────────────────────────── [Fact] public async Task RemoveQueued_RemovesFirstOccurrence_SnapshotContainsOnlySecond_AndSecondDeliveredOnResult() { var transport = new FakeClaudeStreamTransport(); var queueChanges = new List>(); var sent = new List(); var session = BuildWithCallbacks(transport, queueChanges, sent); await session.StartAsync([], "/tmp", "first", CancellationToken.None); sent.Clear(); // Enqueue two messages while turn is in flight. await session.SendUserMessageAsync("alpha", CancellationToken.None); await session.SendUserMessageAsync("beta", CancellationToken.None); queueChanges.Clear(); // Remove "alpha" from the queue. await session.RemoveQueuedAsync("alpha", CancellationToken.None); // Snapshot emitted and contains only "beta". Assert.Single(queueChanges); Assert.Equal(new[] { "beta" }, queueChanges[0]); // Push result → only "beta" is flushed, not "alpha". sent.Clear(); queueChanges.Clear(); await transport.PushLineAsync(ResultLine()); Assert.Single(sent); Assert.Equal("beta", sent[0]); // Queue now empty; next result leaves us idle. await transport.PushLineAsync(ResultLine()); Assert.False(session.IsTurnInFlight); await session.DisposeAsync(); } [Fact] public async Task RemoveQueued_NotFound_NoQueueChangedCallback() { var transport = new FakeClaudeStreamTransport(); var queueChanges = new List>(); var sent = new List(); var session = BuildWithCallbacks(transport, queueChanges, sent); await session.StartAsync([], "/tmp", "first", CancellationToken.None); await session.SendUserMessageAsync("alpha", CancellationToken.None); queueChanges.Clear(); // Try to remove a message that is not in the queue. await session.RemoveQueuedAsync("nope", CancellationToken.None); // No new snapshot emitted. Assert.Empty(queueChanges); await session.DisposeAsync(); } }