refactor(worker/queue): split queue waker and picker, auto-wake on enqueue
Slice 3 of the worker state and queue consolidation refactor.

- Add IQueueWaker / QueueWaker (singleton holding the wake semaphore).
- Add IQueuePicker / QueuePicker; the raw SQL UPDATE...RETURNING moves out of TaskRepository.GetNextQueuedAgentTaskAsync (deleted) and now also filters on blocked_by_task_id IS NULL and writes started_at on claim.
- TaskStateService takes IQueueWaker directly; the Func<QueueService> indirection is gone. State transitions to Queued auto-wake the dispatcher.
- QueueService waits via the shared waker and dispatches via the picker.
- Drop explicit _queue.WakeQueue() calls in WorkerHub.QueuePlanningSubtasksAsync and ExternalMcpService.AddTask. The hub WakeQueue endpoint stays for diagnostics, delegating to _waker.Wake().
- Migrate tests; pre-existing flaky AppSettings/ExternalMcp tests untouched.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -498,43 +498,4 @@ public sealed class TaskRepository
|
||||
}
|
||||
|
||||
#endregion
|
||||
|
||||
#region Queue selection
|
||||
|
||||
/// <summary>
/// Atomically claims the next eligible queued task by flipping its status to
/// <c>running</c> and returning the updated row.
/// </summary>
/// <param name="now">
/// Reference time. It is converted to UTC and compared against <c>scheduled_for</c>;
/// tasks scheduled later than this are not eligible.
/// </param>
/// <param name="ct">Cancellation token forwarded to the database query.</param>
/// <returns>The claimed task, or <c>null</c> when no task matches.</returns>
public async Task<TaskEntity?> GetNextQueuedAgentTaskAsync(DateTime now, CancellationToken ct = default)
{
    // Atomic queue claim: UPDATE + RETURNING in one statement prevents TOCTOU races.
    // Uses raw SQL because EF cannot express UPDATE...RETURNING.
    // Includes both task-level and list-level "agent" tag so lists tagged "agent"
    // automatically enqueue all their tasks without per-task tagging.
    // EF SQLite stores DateTime as "yyyy-MM-dd HH:mm:ss.fffffff" — use the same format for comparison.
    // The invariant culture is required: ':' in a custom format string is the
    // culture's time separator and the year follows the culture's calendar, so the
    // current culture could otherwise produce text that does not compare correctly.
    var nowStr = now.ToUniversalTime().ToString(
        "yyyy-MM-dd HH:mm:ss.fffffff",
        System.Globalization.CultureInfo.InvariantCulture);

    // {0} is bound as a SQL parameter by FromSqlRaw, not concatenated into the text.
    var result = await _context.Tasks.FromSqlRaw("""
        UPDATE tasks SET status = 'running'
        WHERE id = (
        SELECT t.id FROM tasks t
        WHERE t.status = 'queued'
        AND (t.scheduled_for IS NULL OR t.scheduled_for <= {0})
        AND (
        EXISTS (
        SELECT 1 FROM task_tags tt
        JOIN tags tg ON tg.id = tt.tag_id
        WHERE tt.task_id = t.id AND tg.name = 'agent'
        )
        OR EXISTS (
        SELECT 1 FROM list_tags lt
        JOIN tags tg ON tg.id = lt.tag_id
        WHERE lt.list_id = t.list_id AND tg.name = 'agent'
        )
        )
        ORDER BY t.sort_order ASC, t.created_at ASC
        LIMIT 1
        )
        RETURNING *
        """, nowStr).ToListAsync(ct);

    // LIMIT 1 in the sub-select means at most one row is updated and returned.
    return result.FirstOrDefault();
}
|
||||
|
||||
#endregion
|
||||
}
|
||||
|
||||
@@ -117,7 +117,7 @@ public sealed class ExternalMcpService
|
||||
ListId = listId,
|
||||
Title = title,
|
||||
Description = description,
|
||||
Status = queueImmediately ? TaskStatus.Queued : TaskStatus.Manual,
|
||||
Status = TaskStatus.Manual,
|
||||
CreatedAt = DateTime.UtcNow,
|
||||
CommitType = list.DefaultCommitType,
|
||||
CreatedBy = createdBy,
|
||||
@@ -128,7 +128,13 @@ public sealed class ExternalMcpService
|
||||
await _tasks.SetTagsAsync(entity.Id, tags, cancellationToken);
|
||||
|
||||
if (queueImmediately)
|
||||
_queue.WakeQueue();
|
||||
{
|
||||
// Routes through TaskStateService so the queue is woken automatically.
|
||||
var enqueue = await _state.EnqueueAsync(entity.Id, cancellationToken);
|
||||
if (!enqueue.Ok)
|
||||
throw new InvalidOperationException(enqueue.Reason ?? "Cannot enqueue task.");
|
||||
entity.Status = TaskStatus.Queued;
|
||||
}
|
||||
|
||||
await _broadcaster.TaskUpdated(entity.Id);
|
||||
return ToDto(entity);
|
||||
|
||||
@@ -3,6 +3,7 @@ using ClaudeDo.Data;
|
||||
using ClaudeDo.Data.Models;
|
||||
using ClaudeDo.Data.Repositories;
|
||||
using ClaudeDo.Worker.Planning;
|
||||
using ClaudeDo.Worker.Queue;
|
||||
using ClaudeDo.Worker.Services;
|
||||
using Microsoft.AspNetCore.SignalR;
|
||||
using Microsoft.EntityFrameworkCore;
|
||||
@@ -37,6 +38,7 @@ public sealed class WorkerHub : Microsoft.AspNetCore.SignalR.Hub
|
||||
Assembly.GetExecutingAssembly().GetName().Version?.ToString(3) ?? "0.0.0";
|
||||
|
||||
private readonly QueueService _queue;
|
||||
private readonly IQueueWaker _waker;
|
||||
private readonly AgentFileService _agentService;
|
||||
private readonly DefaultAgentSeeder _seeder;
|
||||
private readonly HubBroadcaster _broadcaster;
|
||||
@@ -52,6 +54,7 @@ public sealed class WorkerHub : Microsoft.AspNetCore.SignalR.Hub
|
||||
|
||||
public WorkerHub(
|
||||
QueueService queue,
|
||||
IQueueWaker waker,
|
||||
AgentFileService agentService,
|
||||
DefaultAgentSeeder seeder,
|
||||
HubBroadcaster broadcaster,
|
||||
@@ -66,6 +69,7 @@ public sealed class WorkerHub : Microsoft.AspNetCore.SignalR.Hub
|
||||
PlanningChainCoordinator planningChain)
|
||||
{
|
||||
_queue = queue;
|
||||
_waker = waker;
|
||||
_agentService = agentService;
|
||||
_seeder = seeder;
|
||||
_broadcaster = broadcaster;
|
||||
@@ -99,8 +103,6 @@ public sealed class WorkerHub : Microsoft.AspNetCore.SignalR.Hub
|
||||
await _broadcaster.TaskUpdated(parentTaskId);
|
||||
foreach (var id in childIds)
|
||||
await _broadcaster.TaskUpdated(id);
|
||||
|
||||
_queue.WakeQueue();
|
||||
}
|
||||
|
||||
public string Ping() => $"pong v{Version}";
|
||||
@@ -162,7 +164,7 @@ public sealed class WorkerHub : Microsoft.AspNetCore.SignalR.Hub
|
||||
|
||||
public bool CancelTask(string taskId) => _queue.CancelTask(taskId);
|
||||
|
||||
public void WakeQueue() => _queue.WakeQueue();
|
||||
public void WakeQueue() => _waker.Wake();
|
||||
|
||||
public async Task<List<AgentInfo>> GetAgents() => await _agentService.ScanAsync();
|
||||
|
||||
|
||||
@@ -5,6 +5,7 @@ using ClaudeDo.Worker.Config;
|
||||
using ClaudeDo.Worker.External;
|
||||
using ClaudeDo.Worker.Hub;
|
||||
using ClaudeDo.Worker.Planning;
|
||||
using ClaudeDo.Worker.Queue;
|
||||
using ClaudeDo.Worker.Runner;
|
||||
using ClaudeDo.Worker.Services;
|
||||
using ClaudeDo.Worker.State;
|
||||
@@ -42,14 +43,18 @@ builder.Services.AddSingleton<PlanningAggregator>();
|
||||
builder.Services.AddSingleton<PlanningMergeOrchestrator>();
|
||||
builder.Services.AddSingleton<PlanningChainCoordinator>();
|
||||
|
||||
// Centralized status mutation. Use a delegate for WakeQueue to break the
|
||||
// TaskStateService → QueueService → TaskRunner → TaskStateService DI cycle;
|
||||
// Slice 3 will replace this with IQueueWaker.
|
||||
// Queue dispatch primitives. QueueWaker holds the wake semaphore; the queue picker
|
||||
// performs atomic Queued→Running claim. Both injected into the state service so it
|
||||
// can wake the dispatcher without depending on QueueService directly.
|
||||
builder.Services.AddSingleton<QueueWaker>();
|
||||
builder.Services.AddSingleton<IQueueWaker>(sp => sp.GetRequiredService<QueueWaker>());
|
||||
builder.Services.AddSingleton<IQueuePicker, QueuePicker>();
|
||||
|
||||
builder.Services.AddSingleton<Func<ITaskStateService>>(sp => () => sp.GetRequiredService<ITaskStateService>());
|
||||
builder.Services.AddSingleton<ITaskStateService>(sp => new TaskStateService(
|
||||
sp.GetRequiredService<IDbContextFactory<ClaudeDoDbContext>>(),
|
||||
sp.GetRequiredService<HubBroadcaster>(),
|
||||
() => sp.GetRequiredService<QueueService>().WakeQueue(),
|
||||
sp.GetRequiredService<IQueueWaker>(),
|
||||
sp.GetRequiredService<PlanningChainCoordinator>(),
|
||||
sp.GetRequiredService<ILogger<TaskStateService>>()));
|
||||
|
||||
@@ -137,6 +142,7 @@ if (cfg.ExternalMcpPort > 0)
|
||||
externalBuilder.Services.AddSingleton(app.Services.GetRequiredService<QueueService>());
|
||||
externalBuilder.Services.AddSingleton(app.Services.GetRequiredService<IDbContextFactory<ClaudeDoDbContext>>());
|
||||
externalBuilder.Services.AddSingleton(app.Services.GetRequiredService<ITaskStateService>());
|
||||
externalBuilder.Services.AddSingleton(app.Services.GetRequiredService<IQueueWaker>());
|
||||
externalBuilder.Services.AddScoped<ClaudeDoDbContext>(sp =>
|
||||
sp.GetRequiredService<IDbContextFactory<ClaudeDoDbContext>>().CreateDbContext());
|
||||
externalBuilder.Services.AddScoped<TaskRepository>();
|
||||
|
||||
12
src/ClaudeDo.Worker/Queue/IQueuePicker.cs
Normal file
12
src/ClaudeDo.Worker/Queue/IQueuePicker.cs
Normal file
@@ -0,0 +1,12 @@
|
||||
using ClaudeDo.Data.Models;
|
||||
|
||||
namespace ClaudeDo.Worker.Queue;
|
||||
|
||||
/// <summary>
/// Claims work from the queue atomically.
/// </summary>
public interface IQueuePicker
{
    /// <summary>
    /// Claims the next eligible task, if any.
    /// </summary>
    /// <param name="now">Reference time used to decide whether a scheduled task is due.</param>
    /// <param name="ct">Token used to cancel the claim.</param>
    /// <returns>
    /// The claimed task, already switched to Running with StartedAt set, or
    /// <c>null</c> when no eligible task is available.
    /// </returns>
    Task<TaskEntity?> ClaimNextAsync(DateTime now, CancellationToken ct);
}
|
||||
11
src/ClaudeDo.Worker/Queue/IQueueWaker.cs
Normal file
11
src/ClaudeDo.Worker/Queue/IQueueWaker.cs
Normal file
@@ -0,0 +1,11 @@
|
||||
namespace ClaudeDo.Worker.Queue;
|
||||
|
||||
/// <summary>
/// Lets producers tell the queue dispatcher that there may be new work to pick up.
/// </summary>
public interface IQueueWaker
{
    /// <summary>
    /// Signals the dispatcher without blocking. The call is idempotent: several
    /// calls made before the dispatcher consumes the signal collapse into a
    /// single wake-up.
    /// </summary>
    void Wake();
}
|
||||
52
src/ClaudeDo.Worker/Queue/QueuePicker.cs
Normal file
52
src/ClaudeDo.Worker/Queue/QueuePicker.cs
Normal file
@@ -0,0 +1,52 @@
|
||||
using ClaudeDo.Data;
|
||||
using ClaudeDo.Data.Models;
|
||||
using Microsoft.EntityFrameworkCore;
|
||||
|
||||
namespace ClaudeDo.Worker.Queue;
|
||||
|
||||
/// <summary>
/// <see cref="IQueuePicker"/> implementation that claims the next eligible task with a
/// single <c>UPDATE ... RETURNING</c> statement on a DbContext created per call.
/// </summary>
public sealed class QueuePicker : IQueuePicker
{
    // Text layout of the timestamp parameters; per the note in ClaimNextAsync this is
    // the format EF SQLite uses when it stores DateTime values.
    private const string TimestampFormat = "yyyy-MM-dd HH:mm:ss.fffffff";

    private readonly IDbContextFactory<ClaudeDoDbContext> _dbFactory;

    /// <summary>Creates a picker that obtains its DbContext from <paramref name="dbFactory"/>.</summary>
    public QueuePicker(IDbContextFactory<ClaudeDoDbContext> dbFactory)
        => _dbFactory = dbFactory;

    /// <summary>
    /// Claims the next eligible task: sets its status to <c>running</c>, writes
    /// <c>started_at</c>, and returns the updated row.
    /// </summary>
    /// <param name="now">
    /// Reference time. It is converted to UTC and compared against <c>scheduled_for</c>.
    /// </param>
    /// <param name="ct">Cancellation token for context creation and the query.</param>
    /// <returns>The claimed task, or <c>null</c> when no task matches.</returns>
    public async Task<TaskEntity?> ClaimNextAsync(DateTime now, CancellationToken ct)
    {
        // Atomic queue claim: UPDATE + RETURNING in a single statement prevents TOCTOU races.
        // Raw SQL because EF cannot express UPDATE...RETURNING.
        // Eligible task must be Queued, unblocked, due (or unscheduled), and tagged 'agent'
        // either directly or via its list. EF SQLite stores DateTime as
        // "yyyy-MM-dd HH:mm:ss.fffffff" — same format used here for comparison.
        await using var ctx = await _dbFactory.CreateDbContextAsync(ct);

        // Format with the invariant culture: ':' in a custom format string is the
        // culture's time separator and the year follows the culture's calendar, so the
        // current culture could otherwise yield text that neither compares correctly
        // with scheduled_for nor round-trips when written to started_at.
        var invariant = System.Globalization.CultureInfo.InvariantCulture;
        var nowStr = now.ToUniversalTime().ToString(TimestampFormat, invariant);
        // started_at records the actual claim time, independent of the caller's 'now'.
        var startedAtStr = DateTime.UtcNow.ToString(TimestampFormat, invariant);

        // {0} and {1} are bound as SQL parameters by FromSqlRaw, not concatenated.
        var rows = await ctx.Tasks.FromSqlRaw("""
            UPDATE tasks SET status = 'running', started_at = {1}
            WHERE id = (
            SELECT t.id FROM tasks t
            WHERE t.status = 'queued'
            AND t.blocked_by_task_id IS NULL
            AND (t.scheduled_for IS NULL OR t.scheduled_for <= {0})
            AND (
            EXISTS (
            SELECT 1 FROM task_tags tt
            JOIN tags tg ON tg.id = tt.tag_id
            WHERE tt.task_id = t.id AND tg.name = 'agent'
            )
            OR EXISTS (
            SELECT 1 FROM list_tags lt
            JOIN tags tg ON tg.id = lt.tag_id
            WHERE lt.list_id = t.list_id AND tg.name = 'agent'
            )
            )
            ORDER BY t.sort_order ASC, t.created_at ASC
            LIMIT 1
            )
            RETURNING *
            """, nowStr, startedAtStr).ToListAsync(ct);

        // LIMIT 1 in the sub-select means at most one row is updated and returned.
        return rows.FirstOrDefault();
    }
}
|
||||
18
src/ClaudeDo.Worker/Queue/QueueWaker.cs
Normal file
18
src/ClaudeDo.Worker/Queue/QueueWaker.cs
Normal file
@@ -0,0 +1,18 @@
|
||||
namespace ClaudeDo.Worker.Queue;
|
||||
|
||||
/// <summary>
/// Holds the semaphore that wakes the queue dispatcher. Producers (state
/// mutations, hub) call <see cref="Wake"/>; the dispatcher awaits
/// <see cref="WaitAsync"/>.
/// </summary>
public sealed class QueueWaker : IQueueWaker
{
    // Starts unsignalled; a maximum count of 1 means at most one wake-up is pending.
    private readonly SemaphoreSlim _wakeSignal = new SemaphoreSlim(initialCount: 0, maxCount: 1);

    /// <summary>
    /// Signals the dispatcher. If a signal is already pending the call is a no-op.
    /// </summary>
    public void Wake()
    {
        try
        {
            _wakeSignal.Release();
        }
        catch (SemaphoreFullException)
        {
            // A wake-up is already pending; nothing more to do.
        }
    }

    /// <summary>
    /// Returns a task that completes once a wake signal has been consumed.
    /// </summary>
    /// <param name="ct">Token that cancels the wait.</param>
    public Task WaitAsync(CancellationToken ct)
    {
        return _wakeSignal.WaitAsync(ct);
    }
}
|
||||
@@ -2,6 +2,7 @@ using ClaudeDo.Data;
|
||||
using ClaudeDo.Data.Models;
|
||||
using ClaudeDo.Data.Repositories;
|
||||
using ClaudeDo.Worker.Config;
|
||||
using ClaudeDo.Worker.Queue;
|
||||
using ClaudeDo.Worker.Runner;
|
||||
using Microsoft.EntityFrameworkCore;
|
||||
|
||||
@@ -20,23 +21,27 @@ public sealed class QueueService : BackgroundService
|
||||
private readonly TaskRunner _runner;
|
||||
private readonly WorkerConfig _cfg;
|
||||
private readonly ILogger<QueueService> _logger;
|
||||
private readonly QueueWaker _waker;
|
||||
private readonly IQueuePicker _picker;
|
||||
|
||||
private readonly object _lock = new();
|
||||
private volatile QueueSlotState? _queueSlot;
|
||||
private volatile QueueSlotState? _overrideSlot;
|
||||
|
||||
private readonly SemaphoreSlim _wakeSignal = new(0, 1);
|
||||
|
||||
public QueueService(
|
||||
IDbContextFactory<ClaudeDoDbContext> dbFactory,
|
||||
TaskRunner runner,
|
||||
WorkerConfig cfg,
|
||||
ILogger<QueueService> logger)
|
||||
ILogger<QueueService> logger,
|
||||
QueueWaker waker,
|
||||
IQueuePicker picker)
|
||||
{
|
||||
_dbFactory = dbFactory;
|
||||
_runner = runner;
|
||||
_cfg = cfg;
|
||||
_logger = logger;
|
||||
_waker = waker;
|
||||
_picker = picker;
|
||||
}
|
||||
|
||||
public IReadOnlyList<(string slot, string taskId, DateTime startedAt)> GetActive()
|
||||
@@ -49,13 +54,6 @@ public sealed class QueueService : BackgroundService
|
||||
return list;
|
||||
}
|
||||
|
||||
/// <summary>
/// Signals the wake semaphore so the dispatcher re-checks the queue. If the
/// semaphore is already signalled the call has no further effect.
/// </summary>
public void WakeQueue()
{
    try
    {
        _wakeSignal.Release();
    }
    catch (SemaphoreFullException)
    {
        // A wake-up is already pending; the semaphore is at its maximum count.
    }
}
|
||||
|
||||
public async Task RunNow(string taskId)
|
||||
{
|
||||
using (var context = _dbFactory.CreateDbContext())
|
||||
@@ -147,25 +145,14 @@ public sealed class QueueService : BackgroundService
|
||||
try
|
||||
{
|
||||
// Wait for wake signal or backstop timer.
|
||||
var wakeTask = _wakeSignal.WaitAsync(stoppingToken);
|
||||
var wakeTask = _waker.WaitAsync(stoppingToken);
|
||||
var timerTask = timer.WaitForNextTickAsync(stoppingToken).AsTask();
|
||||
|
||||
await Task.WhenAny(wakeTask, timerTask);
|
||||
|
||||
// Drain wake signal if it fired.
|
||||
if (wakeTask.IsCompletedSuccessfully)
|
||||
{
|
||||
// Good — signal consumed.
|
||||
}
|
||||
|
||||
if (_queueSlot is not null) continue;
|
||||
|
||||
TaskEntity? task;
|
||||
using (var context = _dbFactory.CreateDbContext())
|
||||
{
|
||||
var taskRepo = new TaskRepository(context);
|
||||
task = await taskRepo.GetNextQueuedAgentTaskAsync(DateTime.UtcNow, stoppingToken);
|
||||
}
|
||||
var task = await _picker.ClaimNextAsync(DateTime.UtcNow, stoppingToken);
|
||||
if (task is null) continue;
|
||||
|
||||
lock (_lock)
|
||||
@@ -181,7 +168,7 @@ public sealed class QueueService : BackgroundService
|
||||
_logger.LogError(t.Exception, "RunInSlotAsync failed for task {TaskId} in queue slot", task.Id);
|
||||
lock (_lock) { _queueSlot = null; }
|
||||
cts.Dispose();
|
||||
WakeQueue(); // Check for next task immediately.
|
||||
_waker.Wake(); // Check for next task immediately.
|
||||
}, TaskScheduler.Default);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3,6 +3,7 @@ using ClaudeDo.Data.Models;
|
||||
using ClaudeDo.Data.Repositories;
|
||||
using ClaudeDo.Worker.Hub;
|
||||
using ClaudeDo.Worker.Planning;
|
||||
using ClaudeDo.Worker.Queue;
|
||||
using Microsoft.EntityFrameworkCore;
|
||||
using TaskStatus = ClaudeDo.Data.Models.TaskStatus;
|
||||
|
||||
@@ -12,20 +13,20 @@ public sealed class TaskStateService : ITaskStateService
|
||||
{
|
||||
private readonly IDbContextFactory<ClaudeDoDbContext> _dbFactory;
|
||||
private readonly HubBroadcaster _broadcaster;
|
||||
private readonly Action _wakeQueue;
|
||||
private readonly IQueueWaker _waker;
|
||||
private readonly PlanningChainCoordinator _chain;
|
||||
private readonly ILogger<TaskStateService> _logger;
|
||||
|
||||
public TaskStateService(
|
||||
IDbContextFactory<ClaudeDoDbContext> dbFactory,
|
||||
HubBroadcaster broadcaster,
|
||||
Action wakeQueue,
|
||||
IQueueWaker waker,
|
||||
PlanningChainCoordinator chain,
|
||||
ILogger<TaskStateService> logger)
|
||||
{
|
||||
_dbFactory = dbFactory;
|
||||
_broadcaster = broadcaster;
|
||||
_wakeQueue = wakeQueue;
|
||||
_waker = waker;
|
||||
_chain = chain;
|
||||
_logger = logger;
|
||||
}
|
||||
@@ -40,7 +41,7 @@ public sealed class TaskStateService : ITaskStateService
|
||||
if (affected == 0)
|
||||
return new TransitionResult(false, "Task not found or already running.");
|
||||
|
||||
_wakeQueue();
|
||||
_waker.Wake();
|
||||
await _broadcaster.TaskUpdated(taskId);
|
||||
return new TransitionResult(true, null);
|
||||
}
|
||||
@@ -203,7 +204,7 @@ public sealed class TaskStateService : ITaskStateService
|
||||
.Where(t => t.Id == taskId && t.Status == TaskStatus.Waiting)
|
||||
.ExecuteUpdateAsync(s => s.SetProperty(t => t.Status, TaskStatus.Queued), ct);
|
||||
|
||||
_wakeQueue();
|
||||
_waker.Wake();
|
||||
await _broadcaster.TaskUpdated(taskId);
|
||||
return new TransitionResult(true, null);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user