From e48475d6cd649b89bb83c872d1de388d8543e41f Mon Sep 17 00:00:00 2001 From: mika kuns Date: Thu, 4 Jun 2026 08:02:24 +0200 Subject: [PATCH] feat(daily-prep): stream prep output via PrepStarted/PrepLine/PrepFinished Co-Authored-By: Claude Sonnet 4.6 --- src/ClaudeDo.Worker/Hub/HubBroadcaster.cs | 8 +++ .../Prime/Interfaces/IPrimeBroadcaster.cs | 3 ++ src/ClaudeDo.Worker/Prime/PrimeRunner.cs | 14 ++++-- .../Prime/PrimeRunnerTests.cs | 49 +++++++++++++++++-- .../Prime/PrimeSchedulerTests.cs | 3 ++ 5 files changed, 70 insertions(+), 7 deletions(-) diff --git a/src/ClaudeDo.Worker/Hub/HubBroadcaster.cs b/src/ClaudeDo.Worker/Hub/HubBroadcaster.cs index 0483fbc..467fae9 100644 --- a/src/ClaudeDo.Worker/Hub/HubBroadcaster.cs +++ b/src/ClaudeDo.Worker/Hub/HubBroadcaster.cs @@ -54,4 +54,12 @@ public sealed class HubBroadcaster : IPrimeBroadcaster Task IPrimeBroadcaster.PrimeFiredAsync(Guid scheduleId, bool success, string message, DateTimeOffset firedAt) => PrimeFired(scheduleId, success, message, firedAt); + + public Task PrepStarted() => _hub.Clients.All.SendAsync("PrepStarted"); + public Task PrepLine(string line) => _hub.Clients.All.SendAsync("PrepLine", line); + public Task PrepFinished(bool success) => _hub.Clients.All.SendAsync("PrepFinished", success); + + Task IPrimeBroadcaster.PrepStartedAsync() => PrepStarted(); + Task IPrimeBroadcaster.PrepLineAsync(string line) => PrepLine(line); + Task IPrimeBroadcaster.PrepFinishedAsync(bool success) => PrepFinished(success); } diff --git a/src/ClaudeDo.Worker/Prime/Interfaces/IPrimeBroadcaster.cs b/src/ClaudeDo.Worker/Prime/Interfaces/IPrimeBroadcaster.cs index 8be317a..548032d 100644 --- a/src/ClaudeDo.Worker/Prime/Interfaces/IPrimeBroadcaster.cs +++ b/src/ClaudeDo.Worker/Prime/Interfaces/IPrimeBroadcaster.cs @@ -3,4 +3,7 @@ namespace ClaudeDo.Worker.Prime; public interface IPrimeBroadcaster { Task PrimeFiredAsync(Guid scheduleId, bool success, string message, DateTimeOffset firedAt); + Task PrepStartedAsync(); + Task PrepLineAsync(string line); + Task PrepFinishedAsync(bool success); } diff --git a/src/ClaudeDo.Worker/Prime/PrimeRunner.cs b/src/ClaudeDo.Worker/Prime/PrimeRunner.cs index 41b0836..0577716 100644 --- a/src/ClaudeDo.Worker/Prime/PrimeRunner.cs +++ b/src/ClaudeDo.Worker/Prime/PrimeRunner.cs @@ -14,18 +14,21 @@ public sealed class PrimeRunner : IPrimeRunner private readonly IDbContextFactory _dbFactory; private readonly IPrimeClock _clock; private readonly ILogger _logger; + private readonly IPrimeBroadcaster _broadcaster; private readonly SemaphoreSlim _gate = new(1, 1); public PrimeRunner( IClaudeProcess claude, IDbContextFactory dbFactory, IPrimeClock clock, - ILogger logger) + ILogger logger, + IPrimeBroadcaster broadcaster) { _claude = claude; _dbFactory = dbFactory; _clock = clock; _logger = logger; + _broadcaster = broadcaster; } public async Task FireAsync(PrimeScheduleDto schedule, CancellationToken ct) @@ -33,8 +36,11 @@ public sealed class PrimeRunner : IPrimeRunner if (!await _gate.WaitAsync(0, ct)) return new PrimeRunOutcome(false, "Daily prep already running"); + var success = false; try { + await _broadcaster.PrepStartedAsync(); + var cwd = Paths.AppDataRoot(); Directory.CreateDirectory(cwd); @@ -56,10 +62,11 @@ public sealed class PrimeRunner : IPrimeRunner arguments: args, prompt: prompt, workingDirectory: cwd, - onStdoutLine: _ => Task.CompletedTask, + onStdoutLine: line => _broadcaster.PrepLineAsync(line), ct: timeoutCts.Token); - return result.IsSuccess + success = result.IsSuccess; + return success ? new PrimeRunOutcome(true, "Daily prep complete") : new PrimeRunOutcome(false, $"exit code {result.ExitCode}"); } @@ -74,6 +81,7 @@ public sealed class PrimeRunner : IPrimeRunner } finally { + await _broadcaster.PrepFinishedAsync(success); _gate.Release(); } } diff --git a/tests/ClaudeDo.Worker.Tests/Prime/PrimeRunnerTests.cs b/tests/ClaudeDo.Worker.Tests/Prime/PrimeRunnerTests.cs index 628cb83..d2af495 100644 --- a/tests/ClaudeDo.Worker.Tests/Prime/PrimeRunnerTests.cs +++ b/tests/ClaudeDo.Worker.Tests/Prime/PrimeRunnerTests.cs @@ -19,11 +19,15 @@ public class PrimeRunnerTests : IDisposable { private readonly TimeSpan _delay; private readonly int _exitCode; + private readonly string[] _emitLines; + private readonly string? _result; - public FakeClaudeProcess(TimeSpan delay = default, int exitCode = 0) + public FakeClaudeProcess(TimeSpan delay = default, int exitCode = 0, string[]? emitLines = null, string? result = null) { _delay = delay; _exitCode = exitCode; + _emitLines = emitLines ?? []; + _result = result; } public async Task RunAsync( @@ -36,16 +40,36 @@ public class PrimeRunnerTests : IDisposable if (_delay > TimeSpan.Zero) await Task.Delay(_delay, ct); - return new RunResult { ExitCode = _exitCode, ResultMarkdown = _exitCode == 0 ? "ok" : null }; + foreach (var line in _emitLines) + await onStdoutLine(line); + + return new RunResult { ExitCode = _exitCode, ResultMarkdown = _result ?? (_exitCode == 0 ? "ok" : null) }; } } + private sealed class RecordingPrimeBroadcaster : IPrimeBroadcaster + { + public int StartedCount { get; private set; } + public List Lines { get; } = []; + public List FinishedResults { get; } = []; + + public Task PrimeFiredAsync(Guid scheduleId, bool success, string message, DateTimeOffset firedAt) => Task.CompletedTask; + + public Task PrepStartedAsync() { StartedCount++; return Task.CompletedTask; } + public Task PrepLineAsync(string line) { Lines.Add(line); return Task.CompletedTask; } + public Task PrepFinishedAsync(bool success) { FinishedResults.Add(success); return Task.CompletedTask; } + } + private PrimeRunner NewRunner(TimeSpan claudeDelay = default, int exitCode = 0) => + NewRunner(new FakeClaudeProcess(claudeDelay, exitCode), new RecordingPrimeBroadcaster()); + + private PrimeRunner NewRunner(FakeClaudeProcess claude, IPrimeBroadcaster broadcaster) => new PrimeRunner( - new FakeClaudeProcess(claudeDelay, exitCode), + claude, _db.CreateFactory(), new FakeClock(), - NullLogger.Instance); + NullLogger.Instance, + broadcaster); private static PrimeScheduleDto DefaultSchedule() => new(Guid.Empty, 0, TimeSpan.Zero, true, null, null); @@ -83,4 +107,21 @@ public class PrimeRunnerTests : IDisposable Assert.Contains("already running", second.Message, StringComparison.OrdinalIgnoreCase); await first; } + + [Fact] + public async Task FireAsync_streams_started_lines_and_finished() + { + var broadcaster = new RecordingPrimeBroadcaster(); + var claude = new FakeClaudeProcess(emitLines: ["{\"a\":1}", "{\"b\":2}"], exitCode: 0, result: "ok"); + var runner = NewRunner(claude, broadcaster); + var schedule = new PrimeScheduleDto(Guid.Empty, 0, TimeSpan.Zero, true, null, null); + + var outcome = await runner.FireAsync(schedule, CancellationToken.None); + + Assert.True(outcome.Success); + Assert.Equal(1, broadcaster.StartedCount); + Assert.Equal(new[] { "{\"a\":1}", "{\"b\":2}" }, broadcaster.Lines); + Assert.Single(broadcaster.FinishedResults); + Assert.True(broadcaster.FinishedResults[0]); + } } diff --git a/tests/ClaudeDo.Worker.Tests/Prime/PrimeSchedulerTests.cs b/tests/ClaudeDo.Worker.Tests/Prime/PrimeSchedulerTests.cs index de34a60..ee204e0 100644 --- a/tests/ClaudeDo.Worker.Tests/Prime/PrimeSchedulerTests.cs +++ b/tests/ClaudeDo.Worker.Tests/Prime/PrimeSchedulerTests.cs @@ -34,6 +34,9 @@ public class PrimeSchedulerTests : IDisposable Calls.Add((id, ok, msg)); return Task.CompletedTask; } + public Task PrepStartedAsync() => Task.CompletedTask; + public Task PrepLineAsync(string line) => Task.CompletedTask; + public Task PrepFinishedAsync(bool success) => Task.CompletedTask; } [Fact]