feat(worker): add PrimeScheduler hosted service
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
6
src/ClaudeDo.Worker/Prime/IPrimeBroadcaster.cs
Normal file
6
src/ClaudeDo.Worker/Prime/IPrimeBroadcaster.cs
Normal file
@@ -0,0 +1,6 @@
|
|||||||
|
namespace ClaudeDo.Worker.Prime;
|
||||||
|
|
||||||
|
public interface IPrimeBroadcaster
|
||||||
|
{
|
||||||
|
Task PrimeFiredAsync(Guid scheduleId, bool success, string message, DateTimeOffset firedAt);
|
||||||
|
}
|
||||||
111
src/ClaudeDo.Worker/Prime/PrimeScheduler.cs
Normal file
111
src/ClaudeDo.Worker/Prime/PrimeScheduler.cs
Normal file
@@ -0,0 +1,111 @@
|
|||||||
|
using ClaudeDo.Data;
|
||||||
|
using ClaudeDo.Data.Repositories;
|
||||||
|
using Microsoft.EntityFrameworkCore;
|
||||||
|
using Microsoft.Extensions.Hosting;
|
||||||
|
|
||||||
|
namespace ClaudeDo.Worker.Prime;
|
||||||
|
|
||||||
|
public sealed class PrimeScheduler : BackgroundService
|
||||||
|
{
|
||||||
|
private readonly IDbContextFactory<ClaudeDoDbContext> _dbFactory;
|
||||||
|
private readonly IPrimeRunner _runner;
|
||||||
|
private readonly IPrimeClock _clock;
|
||||||
|
private readonly IPrimeScheduleSignal _signal;
|
||||||
|
private readonly IPrimeBroadcaster _broadcaster;
|
||||||
|
private readonly PrimeSchedulerOptions _options;
|
||||||
|
private readonly ILogger<PrimeScheduler> _logger;
|
||||||
|
|
||||||
|
public PrimeScheduler(
|
||||||
|
IDbContextFactory<ClaudeDoDbContext> dbFactory,
|
||||||
|
IPrimeRunner runner,
|
||||||
|
IPrimeClock clock,
|
||||||
|
IPrimeScheduleSignal signal,
|
||||||
|
IPrimeBroadcaster broadcaster,
|
||||||
|
PrimeSchedulerOptions options,
|
||||||
|
ILogger<PrimeScheduler> logger)
|
||||||
|
{
|
||||||
|
_dbFactory = dbFactory;
|
||||||
|
_runner = runner;
|
||||||
|
_clock = clock;
|
||||||
|
_signal = signal;
|
||||||
|
_broadcaster = broadcaster;
|
||||||
|
_options = options;
|
||||||
|
_logger = logger;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
|
||||||
|
{
|
||||||
|
while (!stoppingToken.IsCancellationRequested)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
await TickAsync(stoppingToken);
|
||||||
|
}
|
||||||
|
catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
|
||||||
|
{
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
catch (Exception ex)
|
||||||
|
{
|
||||||
|
_logger.LogWarning(ex, "PrimeScheduler tick failed; backing off");
|
||||||
|
await Task.Delay(TimeSpan.FromSeconds(5), stoppingToken);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private async Task TickAsync(CancellationToken stoppingToken)
|
||||||
|
{
|
||||||
|
var schedules = await LoadAsync(stoppingToken);
|
||||||
|
var now = _clock.Now;
|
||||||
|
var due = NextDueCalculator.Compute(schedules, now, _options.CatchUpWindow);
|
||||||
|
|
||||||
|
var signalToken = _signal.CurrentToken;
|
||||||
|
using var linked = CancellationTokenSource.CreateLinkedTokenSource(stoppingToken, signalToken);
|
||||||
|
|
||||||
|
if (due is null)
|
||||||
|
{
|
||||||
|
try { await Task.Delay(TimeSpan.FromHours(1), linked.Token); }
|
||||||
|
catch (OperationCanceledException) { /* signal or shutdown */ }
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
var delay = due.FireImmediately ? TimeSpan.Zero : due.At - now;
|
||||||
|
if (delay > TimeSpan.Zero)
|
||||||
|
{
|
||||||
|
try { await Task.Delay(delay, linked.Token); }
|
||||||
|
catch (OperationCanceledException)
|
||||||
|
{
|
||||||
|
if (signalToken.IsCancellationRequested) return;
|
||||||
|
throw;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
await FireAsync(due.Schedule, stoppingToken);
|
||||||
|
}
|
||||||
|
|
||||||
|
private async Task<IReadOnlyList<PrimeScheduleDto>> LoadAsync(CancellationToken ct)
|
||||||
|
{
|
||||||
|
await using var ctx = await _dbFactory.CreateDbContextAsync(ct);
|
||||||
|
var rows = await new PrimeScheduleRepository(ctx).ListAsync(ct);
|
||||||
|
return rows.Select(ToDto).ToList();
|
||||||
|
}
|
||||||
|
|
||||||
|
private static PrimeScheduleDto ToDto(Data.Models.PrimeScheduleEntity e) =>
|
||||||
|
new(e.Id, e.StartDate, e.EndDate, e.TimeOfDay, e.WorkdaysOnly, e.Enabled, e.LastRunAt, e.PromptOverride);
|
||||||
|
|
||||||
|
private async Task FireAsync(PrimeScheduleDto schedule, CancellationToken ct)
|
||||||
|
{
|
||||||
|
var firedAt = _clock.Now;
|
||||||
|
var outcome = await _runner.FireAsync(schedule, ct);
|
||||||
|
|
||||||
|
await using (var ctx = await _dbFactory.CreateDbContextAsync(ct))
|
||||||
|
await new PrimeScheduleRepository(ctx).UpdateLastRunAsync(schedule.Id, firedAt, ct);
|
||||||
|
|
||||||
|
await _broadcaster.PrimeFiredAsync(schedule.Id, outcome.Success, outcome.Message, firedAt);
|
||||||
|
|
||||||
|
if (outcome.Success)
|
||||||
|
_logger.LogInformation("Prime fired {Id} at {When}", schedule.Id, firedAt);
|
||||||
|
else
|
||||||
|
_logger.LogWarning("Prime failed {Id}: {Msg}", schedule.Id, outcome.Message);
|
||||||
|
}
|
||||||
|
}
|
||||||
153
tests/ClaudeDo.Worker.Tests/Prime/PrimeSchedulerTests.cs
Normal file
153
tests/ClaudeDo.Worker.Tests/Prime/PrimeSchedulerTests.cs
Normal file
@@ -0,0 +1,153 @@
|
|||||||
|
using ClaudeDo.Data.Models;
|
||||||
|
using ClaudeDo.Data.Repositories;
|
||||||
|
using ClaudeDo.Worker.Prime;
|
||||||
|
using ClaudeDo.Worker.Tests.Infrastructure;
|
||||||
|
using Microsoft.Extensions.Logging.Abstractions;
|
||||||
|
|
||||||
|
namespace ClaudeDo.Worker.Tests.Prime;
|
||||||
|
|
||||||
|
public class PrimeSchedulerTests : IDisposable
|
||||||
|
{
|
||||||
|
private readonly DbFixture _db = new();
|
||||||
|
public void Dispose() => _db.Dispose();
|
||||||
|
|
||||||
|
private sealed class FakeClock : IPrimeClock
|
||||||
|
{
|
||||||
|
public DateTimeOffset Now { get; set; }
|
||||||
|
}
|
||||||
|
|
||||||
|
private sealed class FakeRunner : IPrimeRunner
|
||||||
|
{
|
||||||
|
public List<Guid> FiredIds { get; } = new();
|
||||||
|
public Task<PrimeRunOutcome> FireAsync(PrimeScheduleDto s, CancellationToken ct)
|
||||||
|
{
|
||||||
|
FiredIds.Add(s.Id);
|
||||||
|
return Task.FromResult(new PrimeRunOutcome(true, "ok"));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private sealed class FakeBroadcaster : IPrimeBroadcaster
|
||||||
|
{
|
||||||
|
public List<(Guid id, bool ok, string msg)> Calls { get; } = new();
|
||||||
|
public Task PrimeFiredAsync(Guid id, bool ok, string msg, DateTimeOffset firedAt)
|
||||||
|
{
|
||||||
|
Calls.Add((id, ok, msg));
|
||||||
|
return Task.CompletedTask;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task Fires_Schedule_When_Within_CatchUp_On_Startup()
|
||||||
|
{
|
||||||
|
var id = Guid.NewGuid();
|
||||||
|
using (var ctx = _db.CreateContext())
|
||||||
|
await new PrimeScheduleRepository(ctx).UpsertAsync(new PrimeScheduleEntity
|
||||||
|
{
|
||||||
|
Id = id,
|
||||||
|
StartDate = new DateOnly(2026, 5, 5),
|
||||||
|
EndDate = new DateOnly(2026, 5, 5),
|
||||||
|
TimeOfDay = new TimeSpan(7, 0, 0),
|
||||||
|
WorkdaysOnly = false,
|
||||||
|
Enabled = true,
|
||||||
|
CreatedAt = DateTimeOffset.UtcNow,
|
||||||
|
});
|
||||||
|
|
||||||
|
var clock = new FakeClock { Now = new DateTimeOffset(2026, 5, 5, 7, 10, 0, TimeSpan.FromHours(2)) };
|
||||||
|
var runner = new FakeRunner();
|
||||||
|
var broadcaster = new FakeBroadcaster();
|
||||||
|
var signal = new PrimeScheduleSignal();
|
||||||
|
var scheduler = new PrimeScheduler(
|
||||||
|
_db.CreateFactory(), runner, clock, signal, broadcaster,
|
||||||
|
PrimeSchedulerOptions.Default, NullLogger<PrimeScheduler>.Instance);
|
||||||
|
|
||||||
|
using var cts = new CancellationTokenSource();
|
||||||
|
var run = scheduler.StartAsync(cts.Token);
|
||||||
|
|
||||||
|
await WaitFor(() => runner.FiredIds.Count >= 1, TimeSpan.FromSeconds(3));
|
||||||
|
|
||||||
|
cts.Cancel();
|
||||||
|
await scheduler.StopAsync(CancellationToken.None);
|
||||||
|
|
||||||
|
Assert.Single(runner.FiredIds);
|
||||||
|
Assert.Equal(id, runner.FiredIds[0]);
|
||||||
|
Assert.Single(broadcaster.Calls);
|
||||||
|
Assert.True(broadcaster.Calls[0].ok);
|
||||||
|
|
||||||
|
using var read = _db.CreateContext();
|
||||||
|
var row = await new PrimeScheduleRepository(read).GetAsync(id);
|
||||||
|
Assert.NotNull(row!.LastRunAt);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task Does_Not_Fire_Past_CatchUp_Window()
|
||||||
|
{
|
||||||
|
var id = Guid.NewGuid();
|
||||||
|
using (var ctx = _db.CreateContext())
|
||||||
|
await new PrimeScheduleRepository(ctx).UpsertAsync(new PrimeScheduleEntity
|
||||||
|
{
|
||||||
|
Id = id,
|
||||||
|
StartDate = new DateOnly(2026, 5, 5),
|
||||||
|
EndDate = new DateOnly(2026, 5, 5),
|
||||||
|
TimeOfDay = new TimeSpan(7, 0, 0),
|
||||||
|
WorkdaysOnly = false,
|
||||||
|
Enabled = true,
|
||||||
|
CreatedAt = DateTimeOffset.UtcNow,
|
||||||
|
});
|
||||||
|
|
||||||
|
var clock = new FakeClock { Now = new DateTimeOffset(2026, 5, 5, 9, 0, 0, TimeSpan.FromHours(2)) };
|
||||||
|
var runner = new FakeRunner();
|
||||||
|
var signal = new PrimeScheduleSignal();
|
||||||
|
var scheduler = new PrimeScheduler(
|
||||||
|
_db.CreateFactory(), runner, clock, signal, new FakeBroadcaster(),
|
||||||
|
PrimeSchedulerOptions.Default, NullLogger<PrimeScheduler>.Instance);
|
||||||
|
|
||||||
|
using var cts = new CancellationTokenSource();
|
||||||
|
await scheduler.StartAsync(cts.Token);
|
||||||
|
await Task.Delay(200);
|
||||||
|
cts.Cancel();
|
||||||
|
await scheduler.StopAsync(CancellationToken.None);
|
||||||
|
|
||||||
|
Assert.Empty(runner.FiredIds);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task Signal_Recomputes_Mid_Wait()
|
||||||
|
{
|
||||||
|
var clock = new FakeClock { Now = new DateTimeOffset(2026, 5, 5, 7, 10, 0, TimeSpan.FromHours(2)) };
|
||||||
|
var runner = new FakeRunner();
|
||||||
|
var signal = new PrimeScheduleSignal();
|
||||||
|
var scheduler = new PrimeScheduler(
|
||||||
|
_db.CreateFactory(), runner, clock, signal, new FakeBroadcaster(),
|
||||||
|
PrimeSchedulerOptions.Default, NullLogger<PrimeScheduler>.Instance);
|
||||||
|
|
||||||
|
using var cts = new CancellationTokenSource();
|
||||||
|
await scheduler.StartAsync(cts.Token);
|
||||||
|
|
||||||
|
var id = Guid.NewGuid();
|
||||||
|
using (var ctx = _db.CreateContext())
|
||||||
|
await new PrimeScheduleRepository(ctx).UpsertAsync(new PrimeScheduleEntity
|
||||||
|
{
|
||||||
|
Id = id,
|
||||||
|
StartDate = new DateOnly(2026, 5, 5),
|
||||||
|
EndDate = new DateOnly(2026, 5, 5),
|
||||||
|
TimeOfDay = new TimeSpan(7, 0, 0),
|
||||||
|
WorkdaysOnly = false,
|
||||||
|
Enabled = true,
|
||||||
|
CreatedAt = DateTimeOffset.UtcNow,
|
||||||
|
});
|
||||||
|
signal.Signal();
|
||||||
|
|
||||||
|
await WaitFor(() => runner.FiredIds.Count >= 1, TimeSpan.FromSeconds(3));
|
||||||
|
cts.Cancel();
|
||||||
|
await scheduler.StopAsync(CancellationToken.None);
|
||||||
|
|
||||||
|
Assert.Single(runner.FiredIds);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static async Task WaitFor(Func<bool> cond, TimeSpan timeout)
|
||||||
|
{
|
||||||
|
var deadline = DateTime.UtcNow + timeout;
|
||||||
|
while (!cond() && DateTime.UtcNow < deadline)
|
||||||
|
await Task.Delay(20);
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user