feat(daily-prep): stream prep output via PrepStarted/PrepLine/PrepFinished
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -54,4 +54,12 @@ public sealed class HubBroadcaster : IPrimeBroadcaster
|
|||||||
|
|
||||||
Task IPrimeBroadcaster.PrimeFiredAsync(Guid scheduleId, bool success, string message, DateTimeOffset firedAt) =>
|
Task IPrimeBroadcaster.PrimeFiredAsync(Guid scheduleId, bool success, string message, DateTimeOffset firedAt) =>
|
||||||
PrimeFired(scheduleId, success, message, firedAt);
|
PrimeFired(scheduleId, success, message, firedAt);
|
||||||
|
|
||||||
|
public Task PrepStarted() => _hub.Clients.All.SendAsync("PrepStarted");
|
||||||
|
public Task PrepLine(string line) => _hub.Clients.All.SendAsync("PrepLine", line);
|
||||||
|
public Task PrepFinished(bool success) => _hub.Clients.All.SendAsync("PrepFinished", success);
|
||||||
|
|
||||||
|
Task IPrimeBroadcaster.PrepStartedAsync() => PrepStarted();
|
||||||
|
Task IPrimeBroadcaster.PrepLineAsync(string line) => PrepLine(line);
|
||||||
|
Task IPrimeBroadcaster.PrepFinishedAsync(bool success) => PrepFinished(success);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -3,4 +3,7 @@ namespace ClaudeDo.Worker.Prime;
|
|||||||
public interface IPrimeBroadcaster
|
public interface IPrimeBroadcaster
|
||||||
{
|
{
|
||||||
Task PrimeFiredAsync(Guid scheduleId, bool success, string message, DateTimeOffset firedAt);
|
Task PrimeFiredAsync(Guid scheduleId, bool success, string message, DateTimeOffset firedAt);
|
||||||
|
Task PrepStartedAsync();
|
||||||
|
Task PrepLineAsync(string line);
|
||||||
|
Task PrepFinishedAsync(bool success);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -14,18 +14,21 @@ public sealed class PrimeRunner : IPrimeRunner
|
|||||||
private readonly IDbContextFactory<ClaudeDoDbContext> _dbFactory;
|
private readonly IDbContextFactory<ClaudeDoDbContext> _dbFactory;
|
||||||
private readonly IPrimeClock _clock;
|
private readonly IPrimeClock _clock;
|
||||||
private readonly ILogger<PrimeRunner> _logger;
|
private readonly ILogger<PrimeRunner> _logger;
|
||||||
|
private readonly IPrimeBroadcaster _broadcaster;
|
||||||
private readonly SemaphoreSlim _gate = new(1, 1);
|
private readonly SemaphoreSlim _gate = new(1, 1);
|
||||||
|
|
||||||
public PrimeRunner(
|
public PrimeRunner(
|
||||||
IClaudeProcess claude,
|
IClaudeProcess claude,
|
||||||
IDbContextFactory<ClaudeDoDbContext> dbFactory,
|
IDbContextFactory<ClaudeDoDbContext> dbFactory,
|
||||||
IPrimeClock clock,
|
IPrimeClock clock,
|
||||||
ILogger<PrimeRunner> logger)
|
ILogger<PrimeRunner> logger,
|
||||||
|
IPrimeBroadcaster broadcaster)
|
||||||
{
|
{
|
||||||
_claude = claude;
|
_claude = claude;
|
||||||
_dbFactory = dbFactory;
|
_dbFactory = dbFactory;
|
||||||
_clock = clock;
|
_clock = clock;
|
||||||
_logger = logger;
|
_logger = logger;
|
||||||
|
_broadcaster = broadcaster;
|
||||||
}
|
}
|
||||||
|
|
||||||
public async Task<PrimeRunOutcome> FireAsync(PrimeScheduleDto schedule, CancellationToken ct)
|
public async Task<PrimeRunOutcome> FireAsync(PrimeScheduleDto schedule, CancellationToken ct)
|
||||||
@@ -33,8 +36,11 @@ public sealed class PrimeRunner : IPrimeRunner
|
|||||||
if (!await _gate.WaitAsync(0, ct))
|
if (!await _gate.WaitAsync(0, ct))
|
||||||
return new PrimeRunOutcome(false, "Daily prep already running");
|
return new PrimeRunOutcome(false, "Daily prep already running");
|
||||||
|
|
||||||
|
var success = false;
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
|
await _broadcaster.PrepStartedAsync();
|
||||||
|
|
||||||
var cwd = Paths.AppDataRoot();
|
var cwd = Paths.AppDataRoot();
|
||||||
Directory.CreateDirectory(cwd);
|
Directory.CreateDirectory(cwd);
|
||||||
|
|
||||||
@@ -56,10 +62,11 @@ public sealed class PrimeRunner : IPrimeRunner
|
|||||||
arguments: args,
|
arguments: args,
|
||||||
prompt: prompt,
|
prompt: prompt,
|
||||||
workingDirectory: cwd,
|
workingDirectory: cwd,
|
||||||
onStdoutLine: _ => Task.CompletedTask,
|
onStdoutLine: line => _broadcaster.PrepLineAsync(line),
|
||||||
ct: timeoutCts.Token);
|
ct: timeoutCts.Token);
|
||||||
|
|
||||||
return result.IsSuccess
|
success = result.IsSuccess;
|
||||||
|
return success
|
||||||
? new PrimeRunOutcome(true, "Daily prep complete")
|
? new PrimeRunOutcome(true, "Daily prep complete")
|
||||||
: new PrimeRunOutcome(false, $"exit code {result.ExitCode}");
|
: new PrimeRunOutcome(false, $"exit code {result.ExitCode}");
|
||||||
}
|
}
|
||||||
@@ -74,6 +81,7 @@ public sealed class PrimeRunner : IPrimeRunner
|
|||||||
}
|
}
|
||||||
finally
|
finally
|
||||||
{
|
{
|
||||||
|
await _broadcaster.PrepFinishedAsync(success);
|
||||||
_gate.Release();
|
_gate.Release();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -19,11 +19,15 @@ public class PrimeRunnerTests : IDisposable
|
|||||||
{
|
{
|
||||||
private readonly TimeSpan _delay;
|
private readonly TimeSpan _delay;
|
||||||
private readonly int _exitCode;
|
private readonly int _exitCode;
|
||||||
|
private readonly string[] _emitLines;
|
||||||
|
private readonly string? _result;
|
||||||
|
|
||||||
public FakeClaudeProcess(TimeSpan delay = default, int exitCode = 0)
|
public FakeClaudeProcess(TimeSpan delay = default, int exitCode = 0, string[]? emitLines = null, string? result = null)
|
||||||
{
|
{
|
||||||
_delay = delay;
|
_delay = delay;
|
||||||
_exitCode = exitCode;
|
_exitCode = exitCode;
|
||||||
|
_emitLines = emitLines ?? [];
|
||||||
|
_result = result;
|
||||||
}
|
}
|
||||||
|
|
||||||
public async Task<RunResult> RunAsync(
|
public async Task<RunResult> RunAsync(
|
||||||
@@ -36,16 +40,36 @@ public class PrimeRunnerTests : IDisposable
|
|||||||
if (_delay > TimeSpan.Zero)
|
if (_delay > TimeSpan.Zero)
|
||||||
await Task.Delay(_delay, ct);
|
await Task.Delay(_delay, ct);
|
||||||
|
|
||||||
return new RunResult { ExitCode = _exitCode, ResultMarkdown = _exitCode == 0 ? "ok" : null };
|
foreach (var line in _emitLines)
|
||||||
|
await onStdoutLine(line);
|
||||||
|
|
||||||
|
return new RunResult { ExitCode = _exitCode, ResultMarkdown = _result ?? (_exitCode == 0 ? "ok" : null) };
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private sealed class RecordingPrimeBroadcaster : IPrimeBroadcaster
|
||||||
|
{
|
||||||
|
public int StartedCount { get; private set; }
|
||||||
|
public List<string> Lines { get; } = [];
|
||||||
|
public List<bool> FinishedResults { get; } = [];
|
||||||
|
|
||||||
|
public Task PrimeFiredAsync(Guid scheduleId, bool success, string message, DateTimeOffset firedAt) => Task.CompletedTask;
|
||||||
|
|
||||||
|
public Task PrepStartedAsync() { StartedCount++; return Task.CompletedTask; }
|
||||||
|
public Task PrepLineAsync(string line) { Lines.Add(line); return Task.CompletedTask; }
|
||||||
|
public Task PrepFinishedAsync(bool success) { FinishedResults.Add(success); return Task.CompletedTask; }
|
||||||
|
}
|
||||||
|
|
||||||
private PrimeRunner NewRunner(TimeSpan claudeDelay = default, int exitCode = 0) =>
|
private PrimeRunner NewRunner(TimeSpan claudeDelay = default, int exitCode = 0) =>
|
||||||
|
NewRunner(new FakeClaudeProcess(claudeDelay, exitCode), new RecordingPrimeBroadcaster());
|
||||||
|
|
||||||
|
private PrimeRunner NewRunner(FakeClaudeProcess claude, IPrimeBroadcaster broadcaster) =>
|
||||||
new PrimeRunner(
|
new PrimeRunner(
|
||||||
new FakeClaudeProcess(claudeDelay, exitCode),
|
claude,
|
||||||
_db.CreateFactory(),
|
_db.CreateFactory(),
|
||||||
new FakeClock(),
|
new FakeClock(),
|
||||||
NullLogger<PrimeRunner>.Instance);
|
NullLogger<PrimeRunner>.Instance,
|
||||||
|
broadcaster);
|
||||||
|
|
||||||
private static PrimeScheduleDto DefaultSchedule() =>
|
private static PrimeScheduleDto DefaultSchedule() =>
|
||||||
new(Guid.Empty, 0, TimeSpan.Zero, true, null, null);
|
new(Guid.Empty, 0, TimeSpan.Zero, true, null, null);
|
||||||
@@ -83,4 +107,21 @@ public class PrimeRunnerTests : IDisposable
|
|||||||
Assert.Contains("already running", second.Message, StringComparison.OrdinalIgnoreCase);
|
Assert.Contains("already running", second.Message, StringComparison.OrdinalIgnoreCase);
|
||||||
await first;
|
await first;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task FireAsync_streams_started_lines_and_finished()
|
||||||
|
{
|
||||||
|
var broadcaster = new RecordingPrimeBroadcaster();
|
||||||
|
var claude = new FakeClaudeProcess(emitLines: ["{\"a\":1}", "{\"b\":2}"], exitCode: 0, result: "ok");
|
||||||
|
var runner = NewRunner(claude, broadcaster);
|
||||||
|
var schedule = new PrimeScheduleDto(Guid.Empty, 0, TimeSpan.Zero, true, null, null);
|
||||||
|
|
||||||
|
var outcome = await runner.FireAsync(schedule, CancellationToken.None);
|
||||||
|
|
||||||
|
Assert.True(outcome.Success);
|
||||||
|
Assert.Equal(1, broadcaster.StartedCount);
|
||||||
|
Assert.Equal(new[] { "{\"a\":1}", "{\"b\":2}" }, broadcaster.Lines);
|
||||||
|
Assert.Single(broadcaster.FinishedResults);
|
||||||
|
Assert.True(broadcaster.FinishedResults[0]);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -34,6 +34,9 @@ public class PrimeSchedulerTests : IDisposable
|
|||||||
Calls.Add((id, ok, msg));
|
Calls.Add((id, ok, msg));
|
||||||
return Task.CompletedTask;
|
return Task.CompletedTask;
|
||||||
}
|
}
|
||||||
|
public Task PrepStartedAsync() => Task.CompletedTask;
|
||||||
|
public Task PrepLineAsync(string line) => Task.CompletedTask;
|
||||||
|
public Task PrepFinishedAsync(bool success) => Task.CompletedTask;
|
||||||
}
|
}
|
||||||
|
|
||||||
[Fact]
|
[Fact]
|
||||||
|
|||||||
Reference in New Issue
Block a user