using ClaudeDo.Data;
using ClaudeDo.Data.Models;
using ClaudeDo.Data.Repositories;
using ClaudeDo.Worker.Config;
using ClaudeDo.Worker.Runner;
using Microsoft.EntityFrameworkCore;

namespace ClaudeDo.Worker.Services;

/// <summary>
/// Describes the task currently occupying one of the <see cref="QueueService"/> slots.
/// </summary>
public sealed class QueueSlotState
{
    /// <summary>Id of the task occupying the slot.</summary>
    public required string TaskId { get; init; }

    /// <summary>Time at which the slot was claimed (the service passes <c>DateTime.UtcNow</c>).</summary>
    public required DateTime StartedAt { get; init; }

    /// <summary>Cancellation source whose token is handed to the run; cancelled by <see cref="QueueService.CancelTask"/>.</summary>
    public required CancellationTokenSource Cts { get; init; }
}

/// <summary>
/// Background service that runs at most one task in a "queue" slot (picked automatically
/// from the repository) and at most one task in an "override" slot (started on request
/// through <see cref="RunNow"/> or <see cref="ContinueTask"/>).
/// </summary>
public sealed class QueueService : BackgroundService
{
    // NOTE(review): the factory and logger are declared without type arguments here;
    // confirm against the DI registrations that this is intended.
    private readonly IDbContextFactory _dbFactory;
    private readonly TaskRunner _runner;
    private readonly WorkerConfig _cfg;
    private readonly ILogger _logger;

    // Guards every write to the two slot fields and the check-then-claim sequences.
    private readonly object _lock = new();
    private volatile QueueSlotState? _queueSlot;
    private volatile QueueSlotState? _overrideSlot;

    // Maximum count of 1: any number of wake requests collapse into one pending signal.
    private readonly SemaphoreSlim _wakeSignal = new(0, 1);

    public QueueService(
        IDbContextFactory dbFactory,
        TaskRunner runner,
        WorkerConfig cfg,
        ILogger logger)
    {
        _dbFactory = dbFactory;
        _runner = runner;
        _cfg = cfg;
        _logger = logger;
    }

    /// <summary>
    /// Returns a snapshot of the occupied slots, labelled <c>"queue"</c> and <c>"override"</c>.
    /// </summary>
    /// <returns>Zero, one or two entries; the queue slot (if occupied) comes first.</returns>
    public IReadOnlyList<(string slot, string taskId, DateTime startedAt)> GetActive()
    {
        var list = new List<(string, string, DateTime)>();

        // Copy each volatile field once so the null check and the reads see the same object.
        var q = _queueSlot;
        if (q is not null)
        {
            list.Add(("queue", q.TaskId, q.StartedAt));
        }

        var o = _overrideSlot;
        if (o is not null)
        {
            list.Add(("override", o.TaskId, o.StartedAt));
        }

        return list;
    }

    /// <summary>
    /// Asks the queue loop to look for work now instead of waiting for the backstop timer.
    /// </summary>
    public void WakeQueue()
    {
        try
        {
            _wakeSignal.Release();
        }
        catch (SemaphoreFullException)
        {
            // Already signalled: the pending signal covers this request too.
        }
    }

    /// <summary>
    /// Starts the given task in the override slot. The returned task completes once the
    /// run has been started, not when it finishes.
    /// </summary>
    /// <param name="taskId">Id of the task to run.</param>
    /// <exception cref="KeyNotFoundException">No task with that id exists.</exception>
    /// <exception cref="InvalidOperationException">
    /// The task is already running in the queue slot, or the override slot is occupied.
    /// </exception>
    public async Task RunNow(string taskId)
    {
        using (var context = _dbFactory.CreateDbContext())
        {
            var taskRepo = new TaskRepository(context);
            var exists = await taskRepo.GetByIdAsync(taskId);
            if (exists is null)
            {
                throw new KeyNotFoundException($"Task '{taskId}' not found.");
            }
        }

        lock (_lock)
        {
            if (_queueSlot?.TaskId == taskId)
            {
                throw new InvalidOperationException("task is already running in queue slot");
            }

            if (_overrideSlot is not null)
            {
                throw new InvalidOperationException("override slot busy");
            }

            var cts = new CancellationTokenSource();
            _overrideSlot = new QueueSlotState { TaskId = taskId, StartedAt = DateTime.UtcNow, Cts = cts };

            _ = RunInSlotAsync(taskId, "override", cts.Token).ContinueWith(t =>
            {
                if (t.IsFaulted)
                {
                    _logger.LogError(t.Exception, "RunInSlotAsync failed for task {TaskId}", taskId);
                }

                // Free the slot before disposing, so CancelTask (which runs under the
                // same lock) can never reach a disposed source through the slot.
                lock (_lock)
                {
                    _overrideSlot = null;
                }

                cts.Dispose();
            }, TaskScheduler.Default);
        }
    }

    /// <summary>
    /// Continues an existing task with a follow-up prompt in the override slot. The
    /// returned task completes once the continuation has been started.
    /// </summary>
    /// <param name="taskId">Id of the task to continue.</param>
    /// <param name="followUpPrompt">Prompt forwarded to the runner.</param>
    /// <returns>The id of the task that was continued (the same <paramref name="taskId"/>).</returns>
    /// <exception cref="KeyNotFoundException">No task with that id exists.</exception>
    /// <exception cref="InvalidOperationException">
    /// The task is marked running, is running in the queue slot, or the override slot is occupied.
    /// </exception>
    public async Task<string> ContinueTask(string taskId, string followUpPrompt)
    {
        using var context = _dbFactory.CreateDbContext();
        var taskRepo = new TaskRepository(context);
        var task = await taskRepo.GetByIdAsync(taskId)
            ?? throw new KeyNotFoundException($"Task '{taskId}' not found.");

        if (task.Status == Data.Models.TaskStatus.Running)
        {
            throw new InvalidOperationException("task is already running");
        }

        lock (_lock)
        {
            if (_queueSlot?.TaskId == taskId)
            {
                throw new InvalidOperationException("task is already running in queue slot");
            }

            if (_overrideSlot is not null)
            {
                throw new InvalidOperationException("override slot busy");
            }

            var cts = new CancellationTokenSource();
            _overrideSlot = new QueueSlotState { TaskId = taskId, StartedAt = DateTime.UtcNow, Cts = cts };

            _ = RunContinueInSlotAsync(taskId, followUpPrompt, cts.Token).ContinueWith(t =>
            {
                if (t.IsFaulted)
                {
                    _logger.LogError(t.Exception, "RunContinueInSlotAsync failed for task {TaskId}", taskId);
                }

                // Free the slot before disposing (see RunNow for the reason).
                lock (_lock)
                {
                    _overrideSlot = null;
                }

                cts.Dispose();
            }, TaskScheduler.Default);
        }

        return taskId;
    }

    /// <summary>
    /// Requests cancellation of the given task if it occupies either slot.
    /// </summary>
    /// <param name="taskId">Id of the task to cancel.</param>
    /// <returns><c>true</c> if a matching slot was found and its token cancelled; otherwise <c>false</c>.</returns>
    public bool CancelTask(string taskId)
    {
        lock (_lock)
        {
            if (_queueSlot is not null && _queueSlot.TaskId == taskId)
            {
                _queueSlot.Cts.Cancel();
                return true;
            }

            if (_overrideSlot is not null && _overrideSlot.TaskId == taskId)
            {
                _overrideSlot.Cts.Cancel();
                return true;
            }
        }

        return false;
    }

    /// <summary>
    /// Queue loop: waits for a wake signal or a backstop timer tick, then starts the next
    /// queued task in the queue slot if that slot is free.
    /// </summary>
    /// <param name="stoppingToken">Signalled when the host is stopping; ends the loop.</param>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _logger.LogInformation("QueueService started");

        using var timer = new PeriodicTimer(TimeSpan.FromMilliseconds(_cfg.QueueBackstopIntervalMs));

        // The two waits are kept across iterations and only re-armed once they complete.
        // PeriodicTimer allows a single tick wait in flight, and an abandoned semaphore
        // waiter would silently consume the next wake signal.
        Task? wakeTask = null;
        Task<bool>? timerTask = null;

        while (!stoppingToken.IsCancellationRequested)
        {
            try
            {
                wakeTask ??= _wakeSignal.WaitAsync(stoppingToken);
                timerTask ??= timer.WaitForNextTickAsync(stoppingToken).AsTask();

                await Task.WhenAny(wakeTask, timerTask);

                // Clear each completed wait *before* awaiting it, so that a cancelled or
                // faulted task is never reused by a later iteration.
                if (wakeTask.IsCompleted)
                {
                    var finished = wakeTask;
                    wakeTask = null;
                    await finished;
                }

                if (timerTask.IsCompleted)
                {
                    var finished = timerTask;
                    timerTask = null;
                    await finished;
                }

                // Cheap unlocked pre-check; the authoritative check is repeated under the lock.
                if (_queueSlot is not null)
                {
                    continue;
                }

                TaskEntity? task;
                using (var context = _dbFactory.CreateDbContext())
                {
                    var taskRepo = new TaskRepository(context);
                    task = await taskRepo.GetNextQueuedAgentTaskAsync(DateTime.UtcNow, stoppingToken);
                }

                if (task is null)
                {
                    continue;
                }

                var nextTaskId = task.Id;

                lock (_lock)
                {
                    if (_queueSlot is not null)
                    {
                        continue;
                    }

                    // Linked to the stopping token so host shutdown also cancels the run.
                    var cts = CancellationTokenSource.CreateLinkedTokenSource(stoppingToken);
                    _queueSlot = new QueueSlotState { TaskId = nextTaskId, StartedAt = DateTime.UtcNow, Cts = cts };

                    _ = RunInSlotAsync(nextTaskId, "queue", cts.Token).ContinueWith(t =>
                    {
                        if (t.IsFaulted)
                        {
                            _logger.LogError(t.Exception, "RunInSlotAsync failed for task {TaskId} in queue slot", nextTaskId);
                        }

                        lock (_lock)
                        {
                            _queueSlot = null;
                        }

                        cts.Dispose();
                        WakeQueue(); // Check for the next task immediately.
                    }, TaskScheduler.Default);
                }
            }
            catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
            {
                break;
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "QueueService loop error");
            }
        }

        _logger.LogInformation("QueueService stopping");
    }

    /// <summary>
    /// Loads the task and hands it to the runner. Every exception is logged and
    /// swallowed, so the returned task does not fault.
    /// </summary>
    private async Task RunInSlotAsync(string taskId, string slot, CancellationToken ct)
    {
        try
        {
            _logger.LogInformation("Starting task {TaskId} in {Slot} slot", taskId, slot);

            TaskEntity task;
            using (var context = _dbFactory.CreateDbContext())
            {
                var taskRepo = new TaskRepository(context);
                task = await taskRepo.GetByIdAsync(taskId, ct)
                    ?? throw new KeyNotFoundException($"Task '{taskId}' not found.");
            }

            await _runner.RunAsync(task, slot, ct);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Slot runner error for task {TaskId}", taskId);
        }
    }

    /// <summary>
    /// Asks the runner to continue the task with a follow-up prompt in the override slot.
    /// Every exception is logged and swallowed, so the returned task does not fault.
    /// </summary>
    private async Task RunContinueInSlotAsync(string taskId, string followUpPrompt, CancellationToken ct)
    {
        try
        {
            _logger.LogInformation("Continuing task {TaskId} in override slot", taskId);
            await _runner.ContinueAsync(taskId, followUpPrompt, "override", ct);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Continue runner error for task {TaskId}", taskId);
        }
    }
}