feat(worker): add PrimeScheduler hosted service
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
6
src/ClaudeDo.Worker/Prime/IPrimeBroadcaster.cs
Normal file
6
src/ClaudeDo.Worker/Prime/IPrimeBroadcaster.cs
Normal file
@@ -0,0 +1,6 @@
|
||||
namespace ClaudeDo.Worker.Prime;
|
||||
|
||||
public interface IPrimeBroadcaster
|
||||
{
|
||||
Task PrimeFiredAsync(Guid scheduleId, bool success, string message, DateTimeOffset firedAt);
|
||||
}
|
||||
111
src/ClaudeDo.Worker/Prime/PrimeScheduler.cs
Normal file
111
src/ClaudeDo.Worker/Prime/PrimeScheduler.cs
Normal file
@@ -0,0 +1,111 @@
|
||||
using ClaudeDo.Data;
|
||||
using ClaudeDo.Data.Repositories;
|
||||
using Microsoft.EntityFrameworkCore;
|
||||
using Microsoft.Extensions.Hosting;
|
||||
|
||||
namespace ClaudeDo.Worker.Prime;
|
||||
|
||||
public sealed class PrimeScheduler : BackgroundService
|
||||
{
|
||||
private readonly IDbContextFactory<ClaudeDoDbContext> _dbFactory;
|
||||
private readonly IPrimeRunner _runner;
|
||||
private readonly IPrimeClock _clock;
|
||||
private readonly IPrimeScheduleSignal _signal;
|
||||
private readonly IPrimeBroadcaster _broadcaster;
|
||||
private readonly PrimeSchedulerOptions _options;
|
||||
private readonly ILogger<PrimeScheduler> _logger;
|
||||
|
||||
public PrimeScheduler(
|
||||
IDbContextFactory<ClaudeDoDbContext> dbFactory,
|
||||
IPrimeRunner runner,
|
||||
IPrimeClock clock,
|
||||
IPrimeScheduleSignal signal,
|
||||
IPrimeBroadcaster broadcaster,
|
||||
PrimeSchedulerOptions options,
|
||||
ILogger<PrimeScheduler> logger)
|
||||
{
|
||||
_dbFactory = dbFactory;
|
||||
_runner = runner;
|
||||
_clock = clock;
|
||||
_signal = signal;
|
||||
_broadcaster = broadcaster;
|
||||
_options = options;
|
||||
_logger = logger;
|
||||
}
|
||||
|
||||
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
|
||||
{
|
||||
while (!stoppingToken.IsCancellationRequested)
|
||||
{
|
||||
try
|
||||
{
|
||||
await TickAsync(stoppingToken);
|
||||
}
|
||||
catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
|
||||
{
|
||||
return;
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogWarning(ex, "PrimeScheduler tick failed; backing off");
|
||||
await Task.Delay(TimeSpan.FromSeconds(5), stoppingToken);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private async Task TickAsync(CancellationToken stoppingToken)
|
||||
{
|
||||
var schedules = await LoadAsync(stoppingToken);
|
||||
var now = _clock.Now;
|
||||
var due = NextDueCalculator.Compute(schedules, now, _options.CatchUpWindow);
|
||||
|
||||
var signalToken = _signal.CurrentToken;
|
||||
using var linked = CancellationTokenSource.CreateLinkedTokenSource(stoppingToken, signalToken);
|
||||
|
||||
if (due is null)
|
||||
{
|
||||
try { await Task.Delay(TimeSpan.FromHours(1), linked.Token); }
|
||||
catch (OperationCanceledException) { /* signal or shutdown */ }
|
||||
return;
|
||||
}
|
||||
|
||||
var delay = due.FireImmediately ? TimeSpan.Zero : due.At - now;
|
||||
if (delay > TimeSpan.Zero)
|
||||
{
|
||||
try { await Task.Delay(delay, linked.Token); }
|
||||
catch (OperationCanceledException)
|
||||
{
|
||||
if (signalToken.IsCancellationRequested) return;
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
await FireAsync(due.Schedule, stoppingToken);
|
||||
}
|
||||
|
||||
private async Task<IReadOnlyList<PrimeScheduleDto>> LoadAsync(CancellationToken ct)
|
||||
{
|
||||
await using var ctx = await _dbFactory.CreateDbContextAsync(ct);
|
||||
var rows = await new PrimeScheduleRepository(ctx).ListAsync(ct);
|
||||
return rows.Select(ToDto).ToList();
|
||||
}
|
||||
|
||||
private static PrimeScheduleDto ToDto(Data.Models.PrimeScheduleEntity e) =>
|
||||
new(e.Id, e.StartDate, e.EndDate, e.TimeOfDay, e.WorkdaysOnly, e.Enabled, e.LastRunAt, e.PromptOverride);
|
||||
|
||||
private async Task FireAsync(PrimeScheduleDto schedule, CancellationToken ct)
|
||||
{
|
||||
var firedAt = _clock.Now;
|
||||
var outcome = await _runner.FireAsync(schedule, ct);
|
||||
|
||||
await using (var ctx = await _dbFactory.CreateDbContextAsync(ct))
|
||||
await new PrimeScheduleRepository(ctx).UpdateLastRunAsync(schedule.Id, firedAt, ct);
|
||||
|
||||
await _broadcaster.PrimeFiredAsync(schedule.Id, outcome.Success, outcome.Message, firedAt);
|
||||
|
||||
if (outcome.Success)
|
||||
_logger.LogInformation("Prime fired {Id} at {When}", schedule.Id, firedAt);
|
||||
else
|
||||
_logger.LogWarning("Prime failed {Id}: {Msg}", schedule.Id, outcome.Message);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user