using ClaudeDo.Data;
using ClaudeDo.Data.Models;
using ClaudeDo.Data.Repositories;
using ClaudeDo.Worker.Config;
using ClaudeDo.Worker.Runner;
using ClaudeDo.Worker.State;
using Microsoft.EntityFrameworkCore;
using TaskStatus = ClaudeDo.Data.Models.TaskStatus;

namespace ClaudeDo.Worker.Queue;

/// <summary>
/// Background service that claims tasks through the <see cref="IQueuePicker"/> and runs them
/// in "queue" slots, up to the configured maximum number of parallel executions.
/// Run-now and continue requests are delegated to the <see cref="OverrideSlotService"/>.
/// </summary>
public sealed class QueueService : BackgroundService
{
    // NOTE(review): IDbContextFactory and ILogger appear without generic type arguments in the
    // reviewed text; they are kept verbatim here (fields and constructor parameters). Confirm
    // against the real declarations before merging.
    private readonly IDbContextFactory _dbFactory;
    private readonly TaskRunner _runner;
    private readonly WorkerConfig _cfg;
    private readonly ILogger _logger;
    private readonly QueueWaker _waker;
    private readonly IQueuePicker _picker;
    private readonly OverrideSlotService _override;
    private readonly ITaskStateService _state;

    // Guards _queueSlots, which is touched by the service loop, slot continuations and callers
    // of the public methods.
    private readonly object _lock = new();

    // Running queue slots keyed by task id.
    private readonly Dictionary<string, QueueSlotState> _queueSlots = new();

    public QueueService(
        IDbContextFactory dbFactory,
        TaskRunner runner,
        WorkerConfig cfg,
        ILogger logger,
        QueueWaker waker,
        IQueuePicker picker,
        OverrideSlotService overrideSlot,
        ITaskStateService state)
    {
        _dbFactory = dbFactory;
        _runner = runner;
        _cfg = cfg;
        _logger = logger;
        _waker = waker;
        _picker = picker;
        _override = overrideSlot;
        _state = state;
    }

    /// <summary>
    /// Returns a snapshot of the currently running tasks: one entry labelled <c>"queue"</c> per
    /// occupied queue slot, followed by an <c>"override"</c> entry when the override slot is occupied.
    /// </summary>
    public IReadOnlyList<(string slot, string taskId, DateTime startedAt)> GetActive()
    {
        var list = new List<(string, string, DateTime)>();

        lock (_lock)
        {
            foreach (var slot in _queueSlots.Values)
            {
                list.Add(("queue", slot.TaskId, slot.StartedAt));
            }
        }

        var o = _override.CurrentSlot;
        if (o is not null)
        {
            list.Add(("override", o.TaskId, o.StartedAt));
        }

        return list;
    }

    /// <summary>Runs the task through the override slot.</summary>
    /// <exception cref="InvalidOperationException">The task is already running in a queue slot.</exception>
    public Task RunNow(string taskId)
    {
        EnsureNotInQueueSlot(taskId);
        return _override.RunNow(taskId);
    }

    /// <summary>Continues the task with a follow-up prompt through the override slot.</summary>
    /// <exception cref="InvalidOperationException">The task is already running in a queue slot.</exception>
    public Task ContinueTask(string taskId, string followUpPrompt)
    {
        EnsureNotInQueueSlot(taskId);
        return _override.ContinueTask(taskId, followUpPrompt);
    }

    /// <summary>Throws when the given task currently occupies a queue slot.</summary>
    private void EnsureNotInQueueSlot(string taskId)
    {
        lock (_lock)
        {
            if (_queueSlots.ContainsKey(taskId))
            {
                throw new InvalidOperationException("task is already running in queue slot");
            }
        }
    }

    /// <summary>
    /// Requests cancellation of a running task. The override slot is tried first; otherwise the
    /// matching queue slot's token source is cancelled.
    /// </summary>
    /// <returns><c>true</c> when a running task with that id was found; otherwise <c>false</c>.</returns>
    public bool CancelTask(string taskId)
    {
        if (_override.TryCancel(taskId))
        {
            return true;
        }

        lock (_lock)
        {
            // Cancelling under the lock guarantees the token source has not been disposed yet:
            // the slot continuation removes the entry (under this lock) before disposing it.
            if (_queueSlots.TryGetValue(taskId, out var slot))
            {
                slot.Cts.Cancel();
                return true;
            }
        }

        return false;
    }

    /// <summary>
    /// Service loop: waits for a wake signal or the backstop timer, then claims tasks and starts
    /// them until the parallel limit is reached or no task can be claimed.
    /// </summary>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _logger.LogInformation("QueueService started");

        using var timer = new PeriodicTimer(TimeSpan.FromMilliseconds(_cfg.QueueBackstopIntervalMs));

        // PeriodicTimer allows only one WaitForNextTickAsync call in flight at a time. When the
        // wake signal wins the race the tick wait is still pending, so it is kept and reused on
        // the next iteration instead of issuing a second (throwing) call.
        Task<bool>? pendingTick = null;

        while (!stoppingToken.IsCancellationRequested)
        {
            try
            {
                // Wait for wake signal or backstop timer.
                var wakeTask = _waker.WaitAsync(stoppingToken);
                pendingTick ??= timer.WaitForNextTickAsync(stoppingToken).AsTask();
                await Task.WhenAny(wakeTask, pendingTick);

                if (pendingTick.IsCompleted)
                {
                    pendingTick = null;
                }

                var maxParallel = await GetMaxParallelAsync(stoppingToken);

                // Fill as many free slots as the limit allows.
                while (!stoppingToken.IsCancellationRequested)
                {
                    lock (_lock)
                    {
                        if (_queueSlots.Count >= maxParallel)
                        {
                            break;
                        }
                    }

                    var task = await _picker.ClaimNextAsync(DateTime.UtcNow, stoppingToken);
                    if (task is null)
                    {
                        break;
                    }

                    StartInSlot(task.Id, stoppingToken);
                }
            }
            catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
            {
                break;
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "QueueService loop error");
            }
        }

        _logger.LogInformation("QueueService stopping");
    }

    /// <summary>
    /// Registers a queue slot for the task and starts running it. When the run finishes the slot
    /// is removed, its token source disposed, and the loop is woken to look for the next task.
    /// </summary>
    private void StartInSlot(string taskId, CancellationToken stoppingToken)
    {
        var cts = CancellationTokenSource.CreateLinkedTokenSource(stoppingToken);

        lock (_lock)
        {
            _queueSlots[taskId] = new QueueSlotState { TaskId = taskId, StartedAt = DateTime.UtcNow, Cts = cts };
        }

        // Started outside the lock so the synchronous part of the run does not hold it. The slot
        // is already registered, so the continuation's removal always happens after the add.
        _ = RunInSlotAsync(taskId, cts.Token).ContinueWith(t =>
        {
            if (t.IsFaulted)
            {
                _logger.LogError(t.Exception, "RunInSlotAsync failed for task {TaskId} in queue slot", taskId);
            }

            lock (_lock)
            {
                _queueSlots.Remove(taskId);
            }

            cts.Dispose();
            _waker.Wake(); // Check for next task immediately.
        }, TaskScheduler.Default);
    }

    /// <summary>
    /// Reads the configured maximum number of parallel executions, clamped to at least 1.
    /// Falls back to 1 when the settings cannot be read.
    /// </summary>
    private async Task<int> GetMaxParallelAsync(CancellationToken ct)
    {
        try
        {
            using var context = _dbFactory.CreateDbContext();
            var settings = await new AppSettingsRepository(context).GetAsync(ct);
            return Math.Max(1, settings.MaxParallelExecutions);
        }
        catch (OperationCanceledException) when (ct.IsCancellationRequested)
        {
            // Shutdown is not a settings failure; let the service loop handle the cancellation.
            throw;
        }
        catch (Exception ex)
        {
            _logger.LogWarning(ex, "Failed to read max parallel executions; defaulting to 1");
            return 1;
        }
    }

    /// <summary>
    /// Loads the task and runs it in a queue slot. All exceptions are logged rather than
    /// propagated, so the returned task does not fault.
    /// </summary>
    private async Task RunInSlotAsync(string taskId, CancellationToken ct)
    {
        try
        {
            _logger.LogInformation("Starting task {TaskId} in queue slot", taskId);

            TaskEntity task;
            using (var context = _dbFactory.CreateDbContext())
            {
                var taskRepo = new TaskRepository(context);
                task = await taskRepo.GetByIdAsync(taskId, ct)
                    ?? throw new KeyNotFoundException($"Task '{taskId}' not found.");
            }

            if (string.IsNullOrWhiteSpace(task.ReviewFeedback))
            {
                await _runner.RunAsync(task, "queue", ct);
                return;
            }

            // A task re-queued from review carries reviewer feedback. Resume the prior
            // Claude session with that feedback as the next turn when a session exists;
            // otherwise fall back to a fresh run with the feedback folded into the prompt.
            var feedback = task.ReviewFeedback!;

            string? sessionId;
            using (var context = _dbFactory.CreateDbContext())
            {
                sessionId = (await new TaskRunRepository(context).GetLatestByTaskIdAsync(taskId, ct))?.SessionId;
            }

            if (sessionId is not null)
            {
                await _runner.ContinueAsync(taskId, feedback, "queue", ct);
            }
            else
            {
                task.Description = string.IsNullOrWhiteSpace(task.Description)
                    ? $"Reviewer feedback: {feedback}"
                    : $"{task.Description}\n\nReviewer feedback: {feedback}";
                await _runner.RunAsync(task, "queue", ct);
            }

            // Clear the consumed feedback only once the run reached a successful
            // terminal state, so a failed or cancelled run keeps it for a manual retry.
            // CancellationToken.None: this bookkeeping is not tied to the slot's token.
            TaskStatus statusAfter;
            using (var context = _dbFactory.CreateDbContext())
            {
                statusAfter = await context.Tasks
                    .Where(t => t.Id == taskId)
                    .Select(t => t.Status)
                    .FirstAsync(CancellationToken.None);
            }

            if (statusAfter is TaskStatus.WaitingForReview or TaskStatus.Done)
            {
                await _state.ClearReviewFeedbackAsync(taskId, CancellationToken.None);
            }
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Slot runner error for task {TaskId}", taskId);
        }
    }
}