From 09e3e7e8b50934426d2edaf09a698772ad69aefc Mon Sep 17 00:00:00 2001 From: Mika Kuns Date: Tue, 28 Apr 2026 09:02:12 +0200 Subject: [PATCH] feat(worker): add PrimeScheduler hosted service Co-Authored-By: Claude Opus 4.7 (1M context) --- .../Prime/IPrimeBroadcaster.cs | 6 + src/ClaudeDo.Worker/Prime/PrimeScheduler.cs | 111 +++++++++++++ .../Prime/PrimeSchedulerTests.cs | 153 ++++++++++++++++++ 3 files changed, 270 insertions(+) create mode 100644 src/ClaudeDo.Worker/Prime/IPrimeBroadcaster.cs create mode 100644 src/ClaudeDo.Worker/Prime/PrimeScheduler.cs create mode 100644 tests/ClaudeDo.Worker.Tests/Prime/PrimeSchedulerTests.cs diff --git a/src/ClaudeDo.Worker/Prime/IPrimeBroadcaster.cs b/src/ClaudeDo.Worker/Prime/IPrimeBroadcaster.cs new file mode 100644 index 0000000..8be317a --- /dev/null +++ b/src/ClaudeDo.Worker/Prime/IPrimeBroadcaster.cs @@ -0,0 +1,6 @@ +namespace ClaudeDo.Worker.Prime; + +public interface IPrimeBroadcaster +{ + Task PrimeFiredAsync(Guid scheduleId, bool success, string message, DateTimeOffset firedAt); +} diff --git a/src/ClaudeDo.Worker/Prime/PrimeScheduler.cs b/src/ClaudeDo.Worker/Prime/PrimeScheduler.cs new file mode 100644 index 0000000..c8eb5cb --- /dev/null +++ b/src/ClaudeDo.Worker/Prime/PrimeScheduler.cs @@ -0,0 +1,111 @@ +using ClaudeDo.Data; +using ClaudeDo.Data.Repositories; +using Microsoft.EntityFrameworkCore; +using Microsoft.Extensions.Hosting; + +namespace ClaudeDo.Worker.Prime; + +public sealed class PrimeScheduler : BackgroundService +{ + private readonly IDbContextFactory _dbFactory; + private readonly IPrimeRunner _runner; + private readonly IPrimeClock _clock; + private readonly IPrimeScheduleSignal _signal; + private readonly IPrimeBroadcaster _broadcaster; + private readonly PrimeSchedulerOptions _options; + private readonly ILogger _logger; + + public PrimeScheduler( + IDbContextFactory dbFactory, + IPrimeRunner runner, + IPrimeClock clock, + IPrimeScheduleSignal signal, + IPrimeBroadcaster broadcaster, + PrimeSchedulerOptions options, + ILogger logger) + { + _dbFactory = dbFactory; + _runner = runner; + _clock = clock; + _signal = signal; + _broadcaster = broadcaster; + _options = options; + _logger = logger; + } + + protected override async Task ExecuteAsync(CancellationToken stoppingToken) + { + while (!stoppingToken.IsCancellationRequested) + { + try + { + await TickAsync(stoppingToken); + } + catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested) + { + return; + } + catch (Exception ex) + { + _logger.LogWarning(ex, "PrimeScheduler tick failed; backing off"); + await Task.Delay(TimeSpan.FromSeconds(5), stoppingToken); + } + } + } + + private async Task TickAsync(CancellationToken stoppingToken) + { + var schedules = await LoadAsync(stoppingToken); + var now = _clock.Now; + var due = NextDueCalculator.Compute(schedules, now, _options.CatchUpWindow); + + var signalToken = _signal.CurrentToken; + using var linked = CancellationTokenSource.CreateLinkedTokenSource(stoppingToken, signalToken); + + if (due is null) + { + try { await Task.Delay(TimeSpan.FromHours(1), linked.Token); } + catch (OperationCanceledException) { /* signal or shutdown */ } + return; + } + + var delay = due.FireImmediately ? TimeSpan.Zero : due.At - now; + if (delay > TimeSpan.Zero) + { + try { await Task.Delay(delay, linked.Token); } + catch (OperationCanceledException) + { + if (signalToken.IsCancellationRequested) return; + throw; + } + } + + await FireAsync(due.Schedule, stoppingToken); + } + + private async Task> LoadAsync(CancellationToken ct) + { + await using var ctx = await _dbFactory.CreateDbContextAsync(ct); + var rows = await new PrimeScheduleRepository(ctx).ListAsync(ct); + return rows.Select(ToDto).ToList(); + } + + private static PrimeScheduleDto ToDto(Data.Models.PrimeScheduleEntity e) => + new(e.Id, e.StartDate, e.EndDate, e.TimeOfDay, e.WorkdaysOnly, e.Enabled, e.LastRunAt, e.PromptOverride); + + private async Task FireAsync(PrimeScheduleDto schedule, CancellationToken ct) + { + var firedAt = _clock.Now; + var outcome = await _runner.FireAsync(schedule, ct); + + await using (var ctx = await _dbFactory.CreateDbContextAsync(ct)) + await new PrimeScheduleRepository(ctx).UpdateLastRunAsync(schedule.Id, firedAt, ct); + + await _broadcaster.PrimeFiredAsync(schedule.Id, outcome.Success, outcome.Message, firedAt); + + if (outcome.Success) + _logger.LogInformation("Prime fired {Id} at {When}", schedule.Id, firedAt); + else + _logger.LogWarning("Prime failed {Id}: {Msg}", schedule.Id, outcome.Message); + } +} diff --git a/tests/ClaudeDo.Worker.Tests/Prime/PrimeSchedulerTests.cs b/tests/ClaudeDo.Worker.Tests/Prime/PrimeSchedulerTests.cs new file mode 100644 index 0000000..39a9f53 --- /dev/null +++ b/tests/ClaudeDo.Worker.Tests/Prime/PrimeSchedulerTests.cs @@ -0,0 +1,153 @@ +using ClaudeDo.Data.Models; +using ClaudeDo.Data.Repositories; +using ClaudeDo.Worker.Prime; +using ClaudeDo.Worker.Tests.Infrastructure; +using Microsoft.Extensions.Logging.Abstractions; + +namespace ClaudeDo.Worker.Tests.Prime; + +public class PrimeSchedulerTests : IDisposable +{ + private readonly DbFixture _db = new(); + public void Dispose() => _db.Dispose(); + + private sealed class FakeClock : IPrimeClock + { + public DateTimeOffset Now { get; set; } + } + + private sealed class FakeRunner : IPrimeRunner + { + public List FiredIds { get; } = new(); + public Task FireAsync(PrimeScheduleDto s, CancellationToken ct) + { + FiredIds.Add(s.Id); + return Task.FromResult(new PrimeRunOutcome(true, "ok")); + } + } + + private sealed class FakeBroadcaster : IPrimeBroadcaster + { + public List<(Guid id, bool ok, string msg)> Calls { get; } = new(); + public Task PrimeFiredAsync(Guid id, bool ok, string msg, DateTimeOffset firedAt) + { + Calls.Add((id, ok, msg)); + return Task.CompletedTask; + } + } + + [Fact] + public async Task Fires_Schedule_When_Within_CatchUp_On_Startup() + { + var id = Guid.NewGuid(); + using (var ctx = _db.CreateContext()) + await new PrimeScheduleRepository(ctx).UpsertAsync(new PrimeScheduleEntity + { + Id = id, + StartDate = new DateOnly(2026, 5, 5), + EndDate = new DateOnly(2026, 5, 5), + TimeOfDay = new TimeSpan(7, 0, 0), + WorkdaysOnly = false, + Enabled = true, + CreatedAt = DateTimeOffset.UtcNow, + }); + + var clock = new FakeClock { Now = new DateTimeOffset(2026, 5, 5, 7, 10, 0, TimeSpan.FromHours(2)) }; + var runner = new FakeRunner(); + var broadcaster = new FakeBroadcaster(); + var signal = new PrimeScheduleSignal(); + var scheduler = new PrimeScheduler( + _db.CreateFactory(), runner, clock, signal, broadcaster, + PrimeSchedulerOptions.Default, NullLogger.Instance); + + using var cts = new CancellationTokenSource(); + var run = scheduler.StartAsync(cts.Token); + + await WaitFor(() => runner.FiredIds.Count >= 1, TimeSpan.FromSeconds(3)); + + cts.Cancel(); + await scheduler.StopAsync(CancellationToken.None); + + Assert.Single(runner.FiredIds); + Assert.Equal(id, runner.FiredIds[0]); + Assert.Single(broadcaster.Calls); + Assert.True(broadcaster.Calls[0].ok); + + using var read = _db.CreateContext(); + var row = await new PrimeScheduleRepository(read).GetAsync(id); + Assert.NotNull(row!.LastRunAt); + } + + [Fact] + public async Task Does_Not_Fire_Past_CatchUp_Window() + { + var id = Guid.NewGuid(); + using (var ctx = _db.CreateContext()) + await new PrimeScheduleRepository(ctx).UpsertAsync(new PrimeScheduleEntity + { + Id = id, + StartDate = new DateOnly(2026, 5, 5), + EndDate = new DateOnly(2026, 5, 5), + TimeOfDay = new TimeSpan(7, 0, 0), + WorkdaysOnly = false, + Enabled = true, + CreatedAt = DateTimeOffset.UtcNow, + }); + + var clock = new FakeClock { Now = new DateTimeOffset(2026, 5, 5, 9, 0, 0, TimeSpan.FromHours(2)) }; + var runner = new FakeRunner(); + var signal = new PrimeScheduleSignal(); + var scheduler = new PrimeScheduler( + _db.CreateFactory(), runner, clock, signal, new FakeBroadcaster(), + PrimeSchedulerOptions.Default, NullLogger.Instance); + + using var cts = new CancellationTokenSource(); + await scheduler.StartAsync(cts.Token); + await Task.Delay(200); + cts.Cancel(); + await scheduler.StopAsync(CancellationToken.None); + + Assert.Empty(runner.FiredIds); + } + + [Fact] + public async Task Signal_Recomputes_Mid_Wait() + { + var clock = new FakeClock { Now = new DateTimeOffset(2026, 5, 5, 7, 10, 0, TimeSpan.FromHours(2)) }; + var runner = new FakeRunner(); + var signal = new PrimeScheduleSignal(); + var scheduler = new PrimeScheduler( + _db.CreateFactory(), runner, clock, signal, new FakeBroadcaster(), + PrimeSchedulerOptions.Default, NullLogger.Instance); + + using var cts = new CancellationTokenSource(); + await scheduler.StartAsync(cts.Token); + + var id = Guid.NewGuid(); + using (var ctx = _db.CreateContext()) + await new PrimeScheduleRepository(ctx).UpsertAsync(new PrimeScheduleEntity + { + Id = id, + StartDate = new DateOnly(2026, 5, 5), + EndDate = new DateOnly(2026, 5, 5), + TimeOfDay = new TimeSpan(7, 0, 0), + WorkdaysOnly = false, + Enabled = true, + CreatedAt = DateTimeOffset.UtcNow, + }); + signal.Signal(); + + await WaitFor(() => runner.FiredIds.Count >= 1, TimeSpan.FromSeconds(3)); + cts.Cancel(); + await scheduler.StopAsync(CancellationToken.None); + + Assert.Single(runner.FiredIds); + } + + private static async Task WaitFor(Func cond, TimeSpan timeout) + { + var deadline = DateTime.UtcNow + timeout; + while (!cond() && DateTime.UtcNow < deadline) + await Task.Delay(20); + } +}