refactor(worker): extract OverrideSlotService and reorganize Worker/Services into domain folders
Slice 5 of the worker state consolidation refactor.
OverrideSlotService (new in Worker/Queue/) owns RunNow, ContinueTask,
and the override-slot piece of CancelTask. QueueService keeps the
queue-slot guard for "task is already running" rejection and delegates
to OverrideSlotService for execution; CancelTask tries the override
slot first, then the queue slot. QueueSlotState is extracted to its own
file.
Folder reorg (via git mv to preserve history):
- Worker/Queue/ QueueService, OverrideSlotService, QueueSlotState
(alongside existing waker/picker)
- Worker/Lifecycle/ StaleTaskRecovery, TaskResetService, TaskMergeService
- Worker/Worktrees/ WorktreeMaintenanceService
- Worker/Agents/ AgentFileService, DefaultAgentSeeder
Worker/Services/ folder removed. All consumers updated to the new
namespaces (Program.cs, WorkerHub, ExternalMcpService,
PlanningMergeOrchestrator, all Worker tests).
OverrideSlotService is registered as a DI singleton in both the main
worker app and the external MCP app.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
159
src/ClaudeDo.Worker/Queue/QueueService.cs
Normal file
159
src/ClaudeDo.Worker/Queue/QueueService.cs
Normal file
@@ -0,0 +1,159 @@
|
||||
using ClaudeDo.Data;
|
||||
using ClaudeDo.Data.Models;
|
||||
using ClaudeDo.Data.Repositories;
|
||||
using ClaudeDo.Worker.Config;
|
||||
using ClaudeDo.Worker.Runner;
|
||||
using Microsoft.EntityFrameworkCore;
|
||||
|
||||
namespace ClaudeDo.Worker.Queue;
|
||||
|
||||
public sealed class QueueService : BackgroundService
|
||||
{
|
||||
private readonly IDbContextFactory<ClaudeDoDbContext> _dbFactory;
|
||||
private readonly TaskRunner _runner;
|
||||
private readonly WorkerConfig _cfg;
|
||||
private readonly ILogger<QueueService> _logger;
|
||||
private readonly QueueWaker _waker;
|
||||
private readonly IQueuePicker _picker;
|
||||
private readonly OverrideSlotService _override;
|
||||
|
||||
private readonly object _lock = new();
|
||||
private volatile QueueSlotState? _queueSlot;
|
||||
|
||||
public QueueService(
|
||||
IDbContextFactory<ClaudeDoDbContext> dbFactory,
|
||||
TaskRunner runner,
|
||||
WorkerConfig cfg,
|
||||
ILogger<QueueService> logger,
|
||||
QueueWaker waker,
|
||||
IQueuePicker picker,
|
||||
OverrideSlotService overrideSlot)
|
||||
{
|
||||
_dbFactory = dbFactory;
|
||||
_runner = runner;
|
||||
_cfg = cfg;
|
||||
_logger = logger;
|
||||
_waker = waker;
|
||||
_picker = picker;
|
||||
_override = overrideSlot;
|
||||
}
|
||||
|
||||
public IReadOnlyList<(string slot, string taskId, DateTime startedAt)> GetActive()
|
||||
{
|
||||
var list = new List<(string, string, DateTime)>();
|
||||
var q = _queueSlot;
|
||||
if (q is not null) list.Add(("queue", q.TaskId, q.StartedAt));
|
||||
var o = _override.CurrentSlot;
|
||||
if (o is not null) list.Add(("override", o.TaskId, o.StartedAt));
|
||||
return list;
|
||||
}
|
||||
|
||||
public Task RunNow(string taskId)
|
||||
{
|
||||
EnsureNotInQueueSlot(taskId);
|
||||
return _override.RunNow(taskId);
|
||||
}
|
||||
|
||||
public Task<string> ContinueTask(string taskId, string followUpPrompt)
|
||||
{
|
||||
EnsureNotInQueueSlot(taskId);
|
||||
return _override.ContinueTask(taskId, followUpPrompt);
|
||||
}
|
||||
|
||||
private void EnsureNotInQueueSlot(string taskId)
|
||||
{
|
||||
lock (_lock)
|
||||
{
|
||||
if (_queueSlot?.TaskId == taskId)
|
||||
throw new InvalidOperationException("task is already running in queue slot");
|
||||
}
|
||||
}
|
||||
|
||||
public bool CancelTask(string taskId)
|
||||
{
|
||||
if (_override.TryCancel(taskId)) return true;
|
||||
|
||||
lock (_lock)
|
||||
{
|
||||
if (_queueSlot is not null && _queueSlot.TaskId == taskId)
|
||||
{
|
||||
_queueSlot.Cts.Cancel();
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
|
||||
{
|
||||
_logger.LogInformation("QueueService started");
|
||||
|
||||
using var timer = new PeriodicTimer(TimeSpan.FromMilliseconds(_cfg.QueueBackstopIntervalMs));
|
||||
|
||||
while (!stoppingToken.IsCancellationRequested)
|
||||
{
|
||||
try
|
||||
{
|
||||
// Wait for wake signal or backstop timer.
|
||||
var wakeTask = _waker.WaitAsync(stoppingToken);
|
||||
var timerTask = timer.WaitForNextTickAsync(stoppingToken).AsTask();
|
||||
|
||||
await Task.WhenAny(wakeTask, timerTask);
|
||||
|
||||
if (_queueSlot is not null) continue;
|
||||
|
||||
var task = await _picker.ClaimNextAsync(DateTime.UtcNow, stoppingToken);
|
||||
if (task is null) continue;
|
||||
|
||||
lock (_lock)
|
||||
{
|
||||
if (_queueSlot is not null) continue;
|
||||
|
||||
var cts = CancellationTokenSource.CreateLinkedTokenSource(stoppingToken);
|
||||
_queueSlot = new QueueSlotState { TaskId = task.Id, StartedAt = DateTime.UtcNow, Cts = cts };
|
||||
|
||||
_ = RunInSlotAsync(task.Id, cts.Token).ContinueWith(t =>
|
||||
{
|
||||
if (t.IsFaulted)
|
||||
_logger.LogError(t.Exception, "RunInSlotAsync failed for task {TaskId} in queue slot", task.Id);
|
||||
lock (_lock) { _queueSlot = null; }
|
||||
cts.Dispose();
|
||||
_waker.Wake(); // Check for next task immediately.
|
||||
}, TaskScheduler.Default);
|
||||
}
|
||||
}
|
||||
catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
|
||||
{
|
||||
break;
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogError(ex, "QueueService loop error");
|
||||
}
|
||||
}
|
||||
|
||||
_logger.LogInformation("QueueService stopping");
|
||||
}
|
||||
|
||||
private async Task RunInSlotAsync(string taskId, CancellationToken ct)
|
||||
{
|
||||
try
|
||||
{
|
||||
_logger.LogInformation("Starting task {TaskId} in queue slot", taskId);
|
||||
|
||||
TaskEntity task;
|
||||
using (var context = _dbFactory.CreateDbContext())
|
||||
{
|
||||
var taskRepo = new TaskRepository(context);
|
||||
task = await taskRepo.GetByIdAsync(taskId, ct)
|
||||
?? throw new KeyNotFoundException($"Task '{taskId}' not found.");
|
||||
}
|
||||
|
||||
await _runner.RunAsync(task, "queue", ct);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogError(ex, "Slot runner error for task {TaskId}", taskId);
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user