From 8b342bcc218fe317d02a6f2669c3020408be7202 Mon Sep 17 00:00:00 2001 From: Mika Kuns Date: Tue, 14 Apr 2026 11:36:58 +0200 Subject: [PATCH] feat(worker): add StreamAnalyzer for rich NDJSON stream parsing --- src/ClaudeDo.Worker/Runner/StreamAnalyzer.cs | 79 ++++++++++++++++++ src/ClaudeDo.Worker/Runner/StreamResult.cs | 12 +++ .../Runner/StreamAnalyzerTests.cs | 82 +++++++++++++++++++ 3 files changed, 173 insertions(+) create mode 100644 src/ClaudeDo.Worker/Runner/StreamAnalyzer.cs create mode 100644 src/ClaudeDo.Worker/Runner/StreamResult.cs create mode 100644 tests/ClaudeDo.Worker.Tests/Runner/StreamAnalyzerTests.cs diff --git a/src/ClaudeDo.Worker/Runner/StreamAnalyzer.cs b/src/ClaudeDo.Worker/Runner/StreamAnalyzer.cs new file mode 100644 index 0000000..f50d386 --- /dev/null +++ b/src/ClaudeDo.Worker/Runner/StreamAnalyzer.cs @@ -0,0 +1,79 @@ +using System.Text.Json; + +namespace ClaudeDo.Worker.Runner; + +public sealed class StreamAnalyzer +{ + private string? _resultMarkdown; + private string? _structuredOutputJson; + private string? _sessionId; + private int _turnCount; + private int _tokensIn; + private int _tokensOut; + private int _apiRetryCount; + + public void ProcessLine(string ndjsonLine) + { + if (string.IsNullOrWhiteSpace(ndjsonLine)) return; + + try + { + using var doc = JsonDocument.Parse(ndjsonLine); + var root = doc.RootElement; + + if (!root.TryGetProperty("type", out var typeProp)) return; + var type = typeProp.GetString(); + + switch (type) + { + case "result": + if (root.TryGetProperty("result", out var resultProp)) + _resultMarkdown = resultProp.GetString(); + if (root.TryGetProperty("structured_output", out var structuredProp)) + _structuredOutputJson = structuredProp.ToString(); + if (root.TryGetProperty("session_id", out var sessionProp)) + _sessionId = sessionProp.GetString(); + break; + + case "assistant": + _turnCount++; + break; + + case "system": + if (root.TryGetProperty("subtype", out var subtypeProp) && + subtypeProp.GetString() == "api_retry") + _apiRetryCount++; + break; + + case "stream_event": + TryAccumulateUsage(root); + break; + } + } + catch (JsonException) { /* Malformed JSON — skip */ } + } + + public StreamResult GetResult() => new() + { + ResultMarkdown = _resultMarkdown, + StructuredOutputJson = _structuredOutputJson, + SessionId = _sessionId, + TurnCount = _turnCount, + TokensIn = _tokensIn, + TokensOut = _tokensOut, + ApiRetryCount = _apiRetryCount, + }; + + private void TryAccumulateUsage(JsonElement root) + { + if (!root.TryGetProperty("event", out var eventProp)) return; + if (eventProp.TryGetProperty("message", out var msgProp) && + msgProp.TryGetProperty("usage", out var usageProp)) + { + if (usageProp.TryGetProperty("input_tokens", out var inp)) + _tokensIn += inp.GetInt32(); + if (usageProp.TryGetProperty("output_tokens", out var outp)) + _tokensOut += outp.GetInt32(); + } + } +} diff --git a/src/ClaudeDo.Worker/Runner/StreamResult.cs b/src/ClaudeDo.Worker/Runner/StreamResult.cs new file mode 100644 index 0000000..212e5ad --- /dev/null +++ b/src/ClaudeDo.Worker/Runner/StreamResult.cs @@ -0,0 +1,12 @@ +namespace ClaudeDo.Worker.Runner; + +public sealed class StreamResult +{ + public string? ResultMarkdown { get; set; } + public string? StructuredOutputJson { get; set; } + public string? SessionId { get; set; } + public int TurnCount { get; set; } + public int TokensIn { get; set; } + public int TokensOut { get; set; } + public int ApiRetryCount { get; set; } +} diff --git a/tests/ClaudeDo.Worker.Tests/Runner/StreamAnalyzerTests.cs b/tests/ClaudeDo.Worker.Tests/Runner/StreamAnalyzerTests.cs new file mode 100644 index 0000000..009df99 --- /dev/null +++ b/tests/ClaudeDo.Worker.Tests/Runner/StreamAnalyzerTests.cs @@ -0,0 +1,82 @@ +using ClaudeDo.Worker.Runner; + +namespace ClaudeDo.Worker.Tests.Runner; + +public sealed class StreamAnalyzerTests +{ + [Fact] + public void Extracts_Result_Markdown() + { + var analyzer = new StreamAnalyzer(); + analyzer.ProcessLine("""{"type":"result","result":"## Done","session_id":"sess-1"}"""); + var result = analyzer.GetResult(); + Assert.Equal("## Done", result.ResultMarkdown); + Assert.Equal("sess-1", result.SessionId); + } + + [Fact] + public void Extracts_Structured_Output() + { + var analyzer = new StreamAnalyzer(); + analyzer.ProcessLine("""{"type":"result","result":"ok","structured_output":{"summary":"all good"},"session_id":"s1"}"""); + var result = analyzer.GetResult(); + Assert.Equal("ok", result.ResultMarkdown); + Assert.Contains("all good", result.StructuredOutputJson); + } + + [Fact] + public void Counts_Assistant_Turns() + { + var analyzer = new StreamAnalyzer(); + analyzer.ProcessLine("""{"type":"assistant","message":"hi"}"""); + analyzer.ProcessLine("""{"type":"assistant","message":"working on it"}"""); + analyzer.ProcessLine("""{"type":"result","result":"done","session_id":"s1"}"""); + var result = analyzer.GetResult(); + Assert.Equal(2, result.TurnCount); + } + + [Fact] + public void Accumulates_Token_Usage() + { + var analyzer = new StreamAnalyzer(); + analyzer.ProcessLine("""{"type":"stream_event","event":{"type":"message_start","message":{"usage":{"input_tokens":100,"output_tokens":50}}}}"""); + analyzer.ProcessLine("""{"type":"stream_event","event":{"type":"message_start","message":{"usage":{"input_tokens":200,"output_tokens":80}}}}"""); + analyzer.ProcessLine("""{"type":"result","result":"done","session_id":"s1"}"""); + var result = analyzer.GetResult(); + Assert.Equal(300, result.TokensIn); + Assert.Equal(130, result.TokensOut); + } + + [Fact] + public void Counts_Api_Retry_Events() + { + var analyzer = new StreamAnalyzer(); + analyzer.ProcessLine("""{"type":"system","subtype":"api_retry","attempt":1,"error":"rate_limit"}"""); + analyzer.ProcessLine("""{"type":"system","subtype":"api_retry","attempt":2,"error":"rate_limit"}"""); + analyzer.ProcessLine("""{"type":"result","result":"done","session_id":"s1"}"""); + var result = analyzer.GetResult(); + Assert.Equal(2, result.ApiRetryCount); + } + + [Fact] + public void Malformed_Json_Is_Ignored() + { + var analyzer = new StreamAnalyzer(); + analyzer.ProcessLine("not json {{{"); + analyzer.ProcessLine(""); + analyzer.ProcessLine(" "); + var result = analyzer.GetResult(); + Assert.Null(result.ResultMarkdown); + Assert.Equal(0, result.TurnCount); + } + + [Fact] + public void No_Result_Event_Returns_Null_Fields() + { + var analyzer = new StreamAnalyzer(); + analyzer.ProcessLine("""{"type":"assistant","message":"hi"}"""); + var result = analyzer.GetResult(); + Assert.Null(result.ResultMarkdown); + Assert.Null(result.SessionId); + } +}