feat(daily-prep): stream prep output via PrepStarted/PrepLine/PrepFinished

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
mika kuns
2026-06-04 08:02:24 +02:00
parent 46f42a4d93
commit e48475d6cd
5 changed files with 70 additions and 7 deletions

View File

@@ -54,4 +54,12 @@ public sealed class HubBroadcaster : IPrimeBroadcaster
Task IPrimeBroadcaster.PrimeFiredAsync(Guid scheduleId, bool success, string message, DateTimeOffset firedAt) =>
PrimeFired(scheduleId, success, message, firedAt);
public Task PrepStarted() => _hub.Clients.All.SendAsync("PrepStarted");
public Task PrepLine(string line) => _hub.Clients.All.SendAsync("PrepLine", line);
public Task PrepFinished(bool success) => _hub.Clients.All.SendAsync("PrepFinished", success);
Task IPrimeBroadcaster.PrepStartedAsync() => PrepStarted();
Task IPrimeBroadcaster.PrepLineAsync(string line) => PrepLine(line);
Task IPrimeBroadcaster.PrepFinishedAsync(bool success) => PrepFinished(success);
}

View File

@@ -3,4 +3,7 @@ namespace ClaudeDo.Worker.Prime;
public interface IPrimeBroadcaster
{
Task PrimeFiredAsync(Guid scheduleId, bool success, string message, DateTimeOffset firedAt);
Task PrepStartedAsync();
Task PrepLineAsync(string line);
Task PrepFinishedAsync(bool success);
}

View File

@@ -14,18 +14,21 @@ public sealed class PrimeRunner : IPrimeRunner
private readonly IDbContextFactory<ClaudeDoDbContext> _dbFactory;
private readonly IPrimeClock _clock;
private readonly ILogger<PrimeRunner> _logger;
private readonly IPrimeBroadcaster _broadcaster;
private readonly SemaphoreSlim _gate = new(1, 1);
public PrimeRunner(
IClaudeProcess claude,
IDbContextFactory<ClaudeDoDbContext> dbFactory,
IPrimeClock clock,
ILogger<PrimeRunner> logger)
ILogger<PrimeRunner> logger,
IPrimeBroadcaster broadcaster)
{
_claude = claude;
_dbFactory = dbFactory;
_clock = clock;
_logger = logger;
_broadcaster = broadcaster;
}
public async Task<PrimeRunOutcome> FireAsync(PrimeScheduleDto schedule, CancellationToken ct)
@@ -33,8 +36,11 @@ public sealed class PrimeRunner : IPrimeRunner
if (!await _gate.WaitAsync(0, ct))
return new PrimeRunOutcome(false, "Daily prep already running");
var success = false;
try
{
await _broadcaster.PrepStartedAsync();
var cwd = Paths.AppDataRoot();
Directory.CreateDirectory(cwd);
@@ -56,10 +62,11 @@ public sealed class PrimeRunner : IPrimeRunner
arguments: args,
prompt: prompt,
workingDirectory: cwd,
onStdoutLine: _ => Task.CompletedTask,
onStdoutLine: line => _broadcaster.PrepLineAsync(line),
ct: timeoutCts.Token);
return result.IsSuccess
success = result.IsSuccess;
return success
? new PrimeRunOutcome(true, "Daily prep complete")
: new PrimeRunOutcome(false, $"exit code {result.ExitCode}");
}
@@ -74,6 +81,7 @@ public sealed class PrimeRunner : IPrimeRunner
}
finally
{
await _broadcaster.PrepFinishedAsync(success);
_gate.Release();
}
}

View File

@@ -19,11 +19,15 @@ public class PrimeRunnerTests : IDisposable
{
private readonly TimeSpan _delay;
private readonly int _exitCode;
private readonly string[] _emitLines;
private readonly string? _result;
public FakeClaudeProcess(TimeSpan delay = default, int exitCode = 0)
public FakeClaudeProcess(TimeSpan delay = default, int exitCode = 0, string[]? emitLines = null, string? result = null)
{
_delay = delay;
_exitCode = exitCode;
_emitLines = emitLines ?? [];
_result = result;
}
public async Task<RunResult> RunAsync(
@@ -36,16 +40,36 @@ public class PrimeRunnerTests : IDisposable
if (_delay > TimeSpan.Zero)
await Task.Delay(_delay, ct);
return new RunResult { ExitCode = _exitCode, ResultMarkdown = _exitCode == 0 ? "ok" : null };
foreach (var line in _emitLines)
await onStdoutLine(line);
return new RunResult { ExitCode = _exitCode, ResultMarkdown = _result ?? (_exitCode == 0 ? "ok" : null) };
}
}
private sealed class RecordingPrimeBroadcaster : IPrimeBroadcaster
{
public int StartedCount { get; private set; }
public List<string> Lines { get; } = [];
public List<bool> FinishedResults { get; } = [];
public Task PrimeFiredAsync(Guid scheduleId, bool success, string message, DateTimeOffset firedAt) => Task.CompletedTask;
public Task PrepStartedAsync() { StartedCount++; return Task.CompletedTask; }
public Task PrepLineAsync(string line) { Lines.Add(line); return Task.CompletedTask; }
public Task PrepFinishedAsync(bool success) { FinishedResults.Add(success); return Task.CompletedTask; }
}
private PrimeRunner NewRunner(TimeSpan claudeDelay = default, int exitCode = 0) =>
NewRunner(new FakeClaudeProcess(claudeDelay, exitCode), new RecordingPrimeBroadcaster());
private PrimeRunner NewRunner(FakeClaudeProcess claude, IPrimeBroadcaster broadcaster) =>
new PrimeRunner(
new FakeClaudeProcess(claudeDelay, exitCode),
claude,
_db.CreateFactory(),
new FakeClock(),
NullLogger<PrimeRunner>.Instance);
NullLogger<PrimeRunner>.Instance,
broadcaster);
private static PrimeScheduleDto DefaultSchedule() =>
new(Guid.Empty, 0, TimeSpan.Zero, true, null, null);
@@ -83,4 +107,21 @@ public class PrimeRunnerTests : IDisposable
Assert.Contains("already running", second.Message, StringComparison.OrdinalIgnoreCase);
await first;
}
[Fact]
public async Task FireAsync_streams_started_lines_and_finished()
{
var broadcaster = new RecordingPrimeBroadcaster();
var claude = new FakeClaudeProcess(emitLines: ["{\"a\":1}", "{\"b\":2}"], exitCode: 0, result: "ok");
var runner = NewRunner(claude, broadcaster);
var schedule = new PrimeScheduleDto(Guid.Empty, 0, TimeSpan.Zero, true, null, null);
var outcome = await runner.FireAsync(schedule, CancellationToken.None);
Assert.True(outcome.Success);
Assert.Equal(1, broadcaster.StartedCount);
Assert.Equal(new[] { "{\"a\":1}", "{\"b\":2}" }, broadcaster.Lines);
Assert.Single(broadcaster.FinishedResults);
Assert.True(broadcaster.FinishedResults[0]);
}
}

View File

@@ -34,6 +34,9 @@ public class PrimeSchedulerTests : IDisposable
Calls.Add((id, ok, msg));
return Task.CompletedTask;
}
public Task PrepStartedAsync() => Task.CompletedTask;
public Task PrepLineAsync(string line) => Task.CompletedTask;
public Task PrepFinishedAsync(bool success) => Task.CompletedTask;
}
[Fact]