feat(worker): add StreamAnalyzer for rich NDJSON stream parsing

This commit is contained in:
Mika Kuns
2026-04-14 11:36:58 +02:00
parent dab461cc41
commit 8b342bcc21
3 changed files with 173 additions and 0 deletions

View File

@@ -0,0 +1,79 @@
using System.Text.Json;
namespace ClaudeDo.Worker.Runner;
/// <summary>
/// Consumes an NDJSON stream one line at a time and collects the final result
/// text, structured output, session id, and a few counters (assistant turns,
/// token usage, API retries). Call <see cref="ProcessLine"/> for every line and
/// <see cref="GetResult"/> to read what has been collected so far.
/// </summary>
public sealed class StreamAnalyzer
{
    private string? _resultMarkdown;
    private string? _structuredOutputJson;
    private string? _sessionId;
    private int _turnCount;
    private int _tokensIn;
    private int _tokensOut;
    private int _apiRetryCount;

    /// <summary>
    /// Parses a single NDJSON line and updates the collected state according to
    /// the line's <c>type</c> property.
    /// </summary>
    /// <param name="ndjsonLine">
    /// One line of the stream. Null, empty, whitespace-only, malformed, or
    /// unexpectedly shaped lines are skipped without throwing.
    /// </param>
    public void ProcessLine(string ndjsonLine)
    {
        if (string.IsNullOrWhiteSpace(ndjsonLine))
        {
            return;
        }

        try
        {
            using var doc = JsonDocument.Parse(ndjsonLine);
            var root = doc.RootElement;

            // TryGetProperty throws InvalidOperationException on anything that is
            // not a JSON object, so valid-but-unexpected lines such as `42` or
            // `[]` must be filtered out here; the catch below only covers syntax
            // errors.
            if (root.ValueKind != JsonValueKind.Object)
            {
                return;
            }

            switch (ReadString(root, "type"))
            {
                case "result":
                    CaptureResult(root);
                    break;

                case "assistant":
                    _turnCount++;
                    break;

                case "system":
                    if (ReadString(root, "subtype") == "api_retry")
                    {
                        _apiRetryCount++;
                    }
                    break;

                case "stream_event":
                    TryAccumulateUsage(root);
                    break;
            }
        }
        catch (JsonException) { /* Malformed JSON — skip */ }
    }

    /// <summary>
    /// Builds a new <see cref="StreamResult"/> holding the values collected so far.
    /// </summary>
    /// <returns>A fresh snapshot object; fields never seen in the stream stay null or zero.</returns>
    public StreamResult GetResult() => new()
    {
        ResultMarkdown = _resultMarkdown,
        StructuredOutputJson = _structuredOutputJson,
        SessionId = _sessionId,
        TurnCount = _turnCount,
        TokensIn = _tokensIn,
        TokensOut = _tokensOut,
        ApiRetryCount = _apiRetryCount,
    };

    /// <summary>
    /// Copies <c>result</c>, <c>structured_output</c> and <c>session_id</c> from a
    /// <c>result</c> event. Properties of an unexpected JSON kind are ignored
    /// instead of throwing.
    /// </summary>
    private void CaptureResult(JsonElement root)
    {
        if (root.TryGetProperty("result", out var resultProp) && IsStringOrNull(resultProp))
        {
            _resultMarkdown = resultProp.GetString();
        }

        if (root.TryGetProperty("structured_output", out var structuredProp))
        {
            // GetRawText keeps the value as real JSON. JsonElement.ToString()
            // would unquote strings, render booleans as "True"/"False" and turn
            // JSON null into an empty string.
            _structuredOutputJson = structuredProp.ValueKind == JsonValueKind.Null
                ? null
                : structuredProp.GetRawText();
        }

        if (root.TryGetProperty("session_id", out var sessionProp) && IsStringOrNull(sessionProp))
        {
            _sessionId = sessionProp.GetString();
        }
    }

    /// <summary>
    /// Adds <c>event.message.usage.input_tokens</c> / <c>output_tokens</c> to the
    /// running totals when that path exists and holds 32-bit integers.
    /// </summary>
    private void TryAccumulateUsage(JsonElement root)
    {
        if (!TryGetObject(root, "event", out var evt)) return;
        if (!TryGetObject(evt, "message", out var message)) return;
        if (!TryGetObject(message, "usage", out var usage)) return;

        _tokensIn += ReadInt32OrZero(usage, "input_tokens");
        _tokensOut += ReadInt32OrZero(usage, "output_tokens");
    }

    /// <summary>Returns the named property when it is a JSON string; otherwise null.</summary>
    private static string? ReadString(JsonElement obj, string name) =>
        obj.TryGetProperty(name, out var prop) && prop.ValueKind == JsonValueKind.String
            ? prop.GetString()
            : null;

    /// <summary>Returns the named property when it is a number that fits in an int; otherwise 0.</summary>
    private static int ReadInt32OrZero(JsonElement obj, string name) =>
        obj.TryGetProperty(name, out var prop)
        && prop.ValueKind == JsonValueKind.Number
        && prop.TryGetInt32(out var value)
            ? value
            : 0;

    /// <summary>Fetches the named property only when it is itself a JSON object.</summary>
    private static bool TryGetObject(JsonElement parent, string name, out JsonElement child) =>
        parent.TryGetProperty(name, out child) && child.ValueKind == JsonValueKind.Object;

    /// <summary>True for the two kinds on which <c>GetString()</c> does not throw.</summary>
    private static bool IsStringOrNull(JsonElement element) =>
        element.ValueKind is JsonValueKind.String or JsonValueKind.Null;
}

View File

@@ -0,0 +1,12 @@
namespace ClaudeDo.Worker.Runner;
/// <summary>
/// Plain data holder for the values that <c>StreamAnalyzer.GetResult()</c>
/// collects from an NDJSON stream. All properties are freely settable; string
/// properties are null and counters are zero until assigned.
/// </summary>
public sealed class StreamResult
{
/// <summary>Text of the <c>result</c> property of a <c>result</c> event, or null if none was captured.</summary>
public string? ResultMarkdown { get; set; }
/// <summary>Textual form of the <c>structured_output</c> property of a <c>result</c> event, or null if none was captured.</summary>
public string? StructuredOutputJson { get; set; }
/// <summary>Value of the <c>session_id</c> property of a <c>result</c> event, or null if none was captured.</summary>
public string? SessionId { get; set; }
/// <summary>Number of stream lines whose <c>type</c> was <c>assistant</c>.</summary>
public int TurnCount { get; set; }
/// <summary>Sum of <c>input_tokens</c> values found in <c>stream_event</c> usage data.</summary>
public int TokensIn { get; set; }
/// <summary>Sum of <c>output_tokens</c> values found in <c>stream_event</c> usage data.</summary>
public int TokensOut { get; set; }
/// <summary>Number of <c>system</c> lines whose <c>subtype</c> was <c>api_retry</c>.</summary>
public int ApiRetryCount { get; set; }
}

View File

@@ -0,0 +1,82 @@
using ClaudeDo.Worker.Runner;
namespace ClaudeDo.Worker.Tests.Runner;
/// <summary>
/// Unit tests for <see cref="StreamAnalyzer"/>: each test feeds a handful of
/// NDJSON lines into a fresh analyzer and checks the resulting snapshot.
/// </summary>
public sealed class StreamAnalyzerTests
{
    /// <summary>
    /// Runs the given lines, in order, through a new analyzer and returns its result.
    /// </summary>
    private static StreamResult Analyze(params string[] lines)
    {
        var sut = new StreamAnalyzer();
        foreach (var line in lines)
        {
            sut.ProcessLine(line);
        }

        return sut.GetResult();
    }

    [Fact]
    public void Extracts_Result_Markdown()
    {
        var snapshot = Analyze(
            """{"type":"result","result":"## Done","session_id":"sess-1"}""");

        Assert.Equal("## Done", snapshot.ResultMarkdown);
        Assert.Equal("sess-1", snapshot.SessionId);
    }

    [Fact]
    public void Extracts_Structured_Output()
    {
        var snapshot = Analyze(
            """{"type":"result","result":"ok","structured_output":{"summary":"all good"},"session_id":"s1"}""");

        Assert.Equal("ok", snapshot.ResultMarkdown);
        Assert.Contains("all good", snapshot.StructuredOutputJson);
    }

    [Fact]
    public void Counts_Assistant_Turns()
    {
        var snapshot = Analyze(
            """{"type":"assistant","message":"hi"}""",
            """{"type":"assistant","message":"working on it"}""",
            """{"type":"result","result":"done","session_id":"s1"}""");

        Assert.Equal(2, snapshot.TurnCount);
    }

    [Fact]
    public void Accumulates_Token_Usage()
    {
        var snapshot = Analyze(
            """{"type":"stream_event","event":{"type":"message_start","message":{"usage":{"input_tokens":100,"output_tokens":50}}}}""",
            """{"type":"stream_event","event":{"type":"message_start","message":{"usage":{"input_tokens":200,"output_tokens":80}}}}""",
            """{"type":"result","result":"done","session_id":"s1"}""");

        Assert.Equal(300, snapshot.TokensIn);
        Assert.Equal(130, snapshot.TokensOut);
    }

    [Fact]
    public void Counts_Api_Retry_Events()
    {
        var snapshot = Analyze(
            """{"type":"system","subtype":"api_retry","attempt":1,"error":"rate_limit"}""",
            """{"type":"system","subtype":"api_retry","attempt":2,"error":"rate_limit"}""",
            """{"type":"result","result":"done","session_id":"s1"}""");

        Assert.Equal(2, snapshot.ApiRetryCount);
    }

    [Fact]
    public void Malformed_Json_Is_Ignored()
    {
        // Garbage, empty and whitespace-only lines must all be skipped silently.
        var snapshot = Analyze("not json {{{", "", " ");

        Assert.Null(snapshot.ResultMarkdown);
        Assert.Equal(0, snapshot.TurnCount);
    }

    [Fact]
    public void No_Result_Event_Returns_Null_Fields()
    {
        var snapshot = Analyze(
            """{"type":"assistant","message":"hi"}""");

        Assert.Null(snapshot.ResultMarkdown);
        Assert.Null(snapshot.SessionId);
    }
}