feat(daily-prep): stream prep output via PrepStarted/PrepLine/PrepFinished

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
mika kuns
2026-06-04 08:02:24 +02:00
parent 46f42a4d93
commit e48475d6cd
5 changed files with 70 additions and 7 deletions

View File

@@ -54,4 +54,12 @@ public sealed class HubBroadcaster : IPrimeBroadcaster
Task IPrimeBroadcaster.PrimeFiredAsync(Guid scheduleId, bool success, string message, DateTimeOffset firedAt) => Task IPrimeBroadcaster.PrimeFiredAsync(Guid scheduleId, bool success, string message, DateTimeOffset firedAt) =>
PrimeFired(scheduleId, success, message, firedAt); PrimeFired(scheduleId, success, message, firedAt);
public Task PrepStarted() => _hub.Clients.All.SendAsync("PrepStarted");
public Task PrepLine(string line) => _hub.Clients.All.SendAsync("PrepLine", line);
public Task PrepFinished(bool success) => _hub.Clients.All.SendAsync("PrepFinished", success);
Task IPrimeBroadcaster.PrepStartedAsync() => PrepStarted();
Task IPrimeBroadcaster.PrepLineAsync(string line) => PrepLine(line);
Task IPrimeBroadcaster.PrepFinishedAsync(bool success) => PrepFinished(success);
} }

View File

@@ -3,4 +3,7 @@ namespace ClaudeDo.Worker.Prime;
public interface IPrimeBroadcaster public interface IPrimeBroadcaster
{ {
Task PrimeFiredAsync(Guid scheduleId, bool success, string message, DateTimeOffset firedAt); Task PrimeFiredAsync(Guid scheduleId, bool success, string message, DateTimeOffset firedAt);
Task PrepStartedAsync();
Task PrepLineAsync(string line);
Task PrepFinishedAsync(bool success);
} }

View File

@@ -14,18 +14,21 @@ public sealed class PrimeRunner : IPrimeRunner
private readonly IDbContextFactory<ClaudeDoDbContext> _dbFactory; private readonly IDbContextFactory<ClaudeDoDbContext> _dbFactory;
private readonly IPrimeClock _clock; private readonly IPrimeClock _clock;
private readonly ILogger<PrimeRunner> _logger; private readonly ILogger<PrimeRunner> _logger;
private readonly IPrimeBroadcaster _broadcaster;
private readonly SemaphoreSlim _gate = new(1, 1); private readonly SemaphoreSlim _gate = new(1, 1);
public PrimeRunner( public PrimeRunner(
IClaudeProcess claude, IClaudeProcess claude,
IDbContextFactory<ClaudeDoDbContext> dbFactory, IDbContextFactory<ClaudeDoDbContext> dbFactory,
IPrimeClock clock, IPrimeClock clock,
ILogger<PrimeRunner> logger) ILogger<PrimeRunner> logger,
IPrimeBroadcaster broadcaster)
{ {
_claude = claude; _claude = claude;
_dbFactory = dbFactory; _dbFactory = dbFactory;
_clock = clock; _clock = clock;
_logger = logger; _logger = logger;
_broadcaster = broadcaster;
} }
public async Task<PrimeRunOutcome> FireAsync(PrimeScheduleDto schedule, CancellationToken ct) public async Task<PrimeRunOutcome> FireAsync(PrimeScheduleDto schedule, CancellationToken ct)
@@ -33,8 +36,11 @@ public sealed class PrimeRunner : IPrimeRunner
if (!await _gate.WaitAsync(0, ct)) if (!await _gate.WaitAsync(0, ct))
return new PrimeRunOutcome(false, "Daily prep already running"); return new PrimeRunOutcome(false, "Daily prep already running");
var success = false;
try try
{ {
await _broadcaster.PrepStartedAsync();
var cwd = Paths.AppDataRoot(); var cwd = Paths.AppDataRoot();
Directory.CreateDirectory(cwd); Directory.CreateDirectory(cwd);
@@ -56,10 +62,11 @@ public sealed class PrimeRunner : IPrimeRunner
arguments: args, arguments: args,
prompt: prompt, prompt: prompt,
workingDirectory: cwd, workingDirectory: cwd,
onStdoutLine: _ => Task.CompletedTask, onStdoutLine: line => _broadcaster.PrepLineAsync(line),
ct: timeoutCts.Token); ct: timeoutCts.Token);
return result.IsSuccess success = result.IsSuccess;
return success
? new PrimeRunOutcome(true, "Daily prep complete") ? new PrimeRunOutcome(true, "Daily prep complete")
: new PrimeRunOutcome(false, $"exit code {result.ExitCode}"); : new PrimeRunOutcome(false, $"exit code {result.ExitCode}");
} }
@@ -74,6 +81,7 @@ public sealed class PrimeRunner : IPrimeRunner
} }
finally finally
{ {
await _broadcaster.PrepFinishedAsync(success);
_gate.Release(); _gate.Release();
} }
} }

View File

@@ -19,11 +19,15 @@ public class PrimeRunnerTests : IDisposable
{ {
private readonly TimeSpan _delay; private readonly TimeSpan _delay;
private readonly int _exitCode; private readonly int _exitCode;
private readonly string[] _emitLines;
private readonly string? _result;
public FakeClaudeProcess(TimeSpan delay = default, int exitCode = 0) public FakeClaudeProcess(TimeSpan delay = default, int exitCode = 0, string[]? emitLines = null, string? result = null)
{ {
_delay = delay; _delay = delay;
_exitCode = exitCode; _exitCode = exitCode;
_emitLines = emitLines ?? [];
_result = result;
} }
public async Task<RunResult> RunAsync( public async Task<RunResult> RunAsync(
@@ -36,16 +40,36 @@ public class PrimeRunnerTests : IDisposable
if (_delay > TimeSpan.Zero) if (_delay > TimeSpan.Zero)
await Task.Delay(_delay, ct); await Task.Delay(_delay, ct);
return new RunResult { ExitCode = _exitCode, ResultMarkdown = _exitCode == 0 ? "ok" : null }; foreach (var line in _emitLines)
await onStdoutLine(line);
return new RunResult { ExitCode = _exitCode, ResultMarkdown = _result ?? (_exitCode == 0 ? "ok" : null) };
} }
} }
private sealed class RecordingPrimeBroadcaster : IPrimeBroadcaster
{
public int StartedCount { get; private set; }
public List<string> Lines { get; } = [];
public List<bool> FinishedResults { get; } = [];
public Task PrimeFiredAsync(Guid scheduleId, bool success, string message, DateTimeOffset firedAt) => Task.CompletedTask;
public Task PrepStartedAsync() { StartedCount++; return Task.CompletedTask; }
public Task PrepLineAsync(string line) { Lines.Add(line); return Task.CompletedTask; }
public Task PrepFinishedAsync(bool success) { FinishedResults.Add(success); return Task.CompletedTask; }
}
private PrimeRunner NewRunner(TimeSpan claudeDelay = default, int exitCode = 0) => private PrimeRunner NewRunner(TimeSpan claudeDelay = default, int exitCode = 0) =>
NewRunner(new FakeClaudeProcess(claudeDelay, exitCode), new RecordingPrimeBroadcaster());
private PrimeRunner NewRunner(FakeClaudeProcess claude, IPrimeBroadcaster broadcaster) =>
new PrimeRunner( new PrimeRunner(
new FakeClaudeProcess(claudeDelay, exitCode), claude,
_db.CreateFactory(), _db.CreateFactory(),
new FakeClock(), new FakeClock(),
NullLogger<PrimeRunner>.Instance); NullLogger<PrimeRunner>.Instance,
broadcaster);
private static PrimeScheduleDto DefaultSchedule() => private static PrimeScheduleDto DefaultSchedule() =>
new(Guid.Empty, 0, TimeSpan.Zero, true, null, null); new(Guid.Empty, 0, TimeSpan.Zero, true, null, null);
@@ -83,4 +107,21 @@ public class PrimeRunnerTests : IDisposable
Assert.Contains("already running", second.Message, StringComparison.OrdinalIgnoreCase); Assert.Contains("already running", second.Message, StringComparison.OrdinalIgnoreCase);
await first; await first;
} }
[Fact]
public async Task FireAsync_streams_started_lines_and_finished()
{
var broadcaster = new RecordingPrimeBroadcaster();
var claude = new FakeClaudeProcess(emitLines: ["{\"a\":1}", "{\"b\":2}"], exitCode: 0, result: "ok");
var runner = NewRunner(claude, broadcaster);
var schedule = new PrimeScheduleDto(Guid.Empty, 0, TimeSpan.Zero, true, null, null);
var outcome = await runner.FireAsync(schedule, CancellationToken.None);
Assert.True(outcome.Success);
Assert.Equal(1, broadcaster.StartedCount);
Assert.Equal(new[] { "{\"a\":1}", "{\"b\":2}" }, broadcaster.Lines);
Assert.Single(broadcaster.FinishedResults);
Assert.True(broadcaster.FinishedResults[0]);
}
} }

View File

@@ -34,6 +34,9 @@ public class PrimeSchedulerTests : IDisposable
Calls.Add((id, ok, msg)); Calls.Add((id, ok, msg));
return Task.CompletedTask; return Task.CompletedTask;
} }
public Task PrepStartedAsync() => Task.CompletedTask;
public Task PrepLineAsync(string line) => Task.CompletedTask;
public Task PrepFinishedAsync(bool success) => Task.CompletedTask;
} }
[Fact] [Fact]