refactor(worker/queue): split queue waker and picker, auto-wake on enqueue
Slice 3 of the worker state and queue consolidation refactor. - Add IQueueWaker / QueueWaker (singleton holding the wake semaphore). - Add IQueuePicker / QueuePicker; raw SQL UPDATE...RETURNING moves out of TaskRepository.GetNextQueuedAgentTaskAsync (deleted) and now also filters on blocked_by_task_id IS NULL and writes started_at on claim. - TaskStateService takes IQueueWaker directly; the Func<QueueService> indirection is gone. State transitions to Queued auto-wake the dispatcher. - QueueService waits via the shared waker and dispatches via the picker. - Drop explicit _queue.WakeQueue() calls in WorkerHub.QueuePlanningSubtasksAsync and ExternalMcpService.AddTask. The hub WakeQueue endpoint stays for diagnostics, delegating to _waker.Wake(). - Migrate tests; pre-existing flaky AppSettings/ExternalMcp tests untouched. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -498,43 +498,4 @@ public sealed class TaskRepository
|
|||||||
}
|
}
|
||||||
|
|
||||||
#endregion
|
#endregion
|
||||||
|
|
||||||
#region Queue selection
|
|
||||||
|
|
||||||
public async Task<TaskEntity?> GetNextQueuedAgentTaskAsync(DateTime now, CancellationToken ct = default)
|
|
||||||
{
|
|
||||||
// Atomic queue claim: UPDATE + RETURNING in one statement prevents TOCTOU races.
|
|
||||||
// Uses raw SQL because EF cannot express UPDATE...RETURNING.
|
|
||||||
// Includes both task-level and list-level "agent" tag so lists tagged "agent"
|
|
||||||
// automatically enqueue all their tasks without per-task tagging.
|
|
||||||
// EF SQLite stores DateTime as "yyyy-MM-dd HH:mm:ss.fffffff" — use the same format for comparison.
|
|
||||||
var nowStr = now.ToUniversalTime().ToString("yyyy-MM-dd HH:mm:ss.fffffff");
|
|
||||||
var result = await _context.Tasks.FromSqlRaw("""
|
|
||||||
UPDATE tasks SET status = 'running'
|
|
||||||
WHERE id = (
|
|
||||||
SELECT t.id FROM tasks t
|
|
||||||
WHERE t.status = 'queued'
|
|
||||||
AND (t.scheduled_for IS NULL OR t.scheduled_for <= {0})
|
|
||||||
AND (
|
|
||||||
EXISTS (
|
|
||||||
SELECT 1 FROM task_tags tt
|
|
||||||
JOIN tags tg ON tg.id = tt.tag_id
|
|
||||||
WHERE tt.task_id = t.id AND tg.name = 'agent'
|
|
||||||
)
|
|
||||||
OR EXISTS (
|
|
||||||
SELECT 1 FROM list_tags lt
|
|
||||||
JOIN tags tg ON tg.id = lt.tag_id
|
|
||||||
WHERE lt.list_id = t.list_id AND tg.name = 'agent'
|
|
||||||
)
|
|
||||||
)
|
|
||||||
ORDER BY t.sort_order ASC, t.created_at ASC
|
|
||||||
LIMIT 1
|
|
||||||
)
|
|
||||||
RETURNING *
|
|
||||||
""", nowStr).ToListAsync(ct);
|
|
||||||
|
|
||||||
return result.FirstOrDefault();
|
|
||||||
}
|
|
||||||
|
|
||||||
#endregion
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -117,7 +117,7 @@ public sealed class ExternalMcpService
|
|||||||
ListId = listId,
|
ListId = listId,
|
||||||
Title = title,
|
Title = title,
|
||||||
Description = description,
|
Description = description,
|
||||||
Status = queueImmediately ? TaskStatus.Queued : TaskStatus.Manual,
|
Status = TaskStatus.Manual,
|
||||||
CreatedAt = DateTime.UtcNow,
|
CreatedAt = DateTime.UtcNow,
|
||||||
CommitType = list.DefaultCommitType,
|
CommitType = list.DefaultCommitType,
|
||||||
CreatedBy = createdBy,
|
CreatedBy = createdBy,
|
||||||
@@ -128,7 +128,13 @@ public sealed class ExternalMcpService
|
|||||||
await _tasks.SetTagsAsync(entity.Id, tags, cancellationToken);
|
await _tasks.SetTagsAsync(entity.Id, tags, cancellationToken);
|
||||||
|
|
||||||
if (queueImmediately)
|
if (queueImmediately)
|
||||||
_queue.WakeQueue();
|
{
|
||||||
|
// Routes through TaskStateService so the queue is woken automatically.
|
||||||
|
var enqueue = await _state.EnqueueAsync(entity.Id, cancellationToken);
|
||||||
|
if (!enqueue.Ok)
|
||||||
|
throw new InvalidOperationException(enqueue.Reason ?? "Cannot enqueue task.");
|
||||||
|
entity.Status = TaskStatus.Queued;
|
||||||
|
}
|
||||||
|
|
||||||
await _broadcaster.TaskUpdated(entity.Id);
|
await _broadcaster.TaskUpdated(entity.Id);
|
||||||
return ToDto(entity);
|
return ToDto(entity);
|
||||||
|
|||||||
@@ -3,6 +3,7 @@ using ClaudeDo.Data;
|
|||||||
using ClaudeDo.Data.Models;
|
using ClaudeDo.Data.Models;
|
||||||
using ClaudeDo.Data.Repositories;
|
using ClaudeDo.Data.Repositories;
|
||||||
using ClaudeDo.Worker.Planning;
|
using ClaudeDo.Worker.Planning;
|
||||||
|
using ClaudeDo.Worker.Queue;
|
||||||
using ClaudeDo.Worker.Services;
|
using ClaudeDo.Worker.Services;
|
||||||
using Microsoft.AspNetCore.SignalR;
|
using Microsoft.AspNetCore.SignalR;
|
||||||
using Microsoft.EntityFrameworkCore;
|
using Microsoft.EntityFrameworkCore;
|
||||||
@@ -37,6 +38,7 @@ public sealed class WorkerHub : Microsoft.AspNetCore.SignalR.Hub
|
|||||||
Assembly.GetExecutingAssembly().GetName().Version?.ToString(3) ?? "0.0.0";
|
Assembly.GetExecutingAssembly().GetName().Version?.ToString(3) ?? "0.0.0";
|
||||||
|
|
||||||
private readonly QueueService _queue;
|
private readonly QueueService _queue;
|
||||||
|
private readonly IQueueWaker _waker;
|
||||||
private readonly AgentFileService _agentService;
|
private readonly AgentFileService _agentService;
|
||||||
private readonly DefaultAgentSeeder _seeder;
|
private readonly DefaultAgentSeeder _seeder;
|
||||||
private readonly HubBroadcaster _broadcaster;
|
private readonly HubBroadcaster _broadcaster;
|
||||||
@@ -52,6 +54,7 @@ public sealed class WorkerHub : Microsoft.AspNetCore.SignalR.Hub
|
|||||||
|
|
||||||
public WorkerHub(
|
public WorkerHub(
|
||||||
QueueService queue,
|
QueueService queue,
|
||||||
|
IQueueWaker waker,
|
||||||
AgentFileService agentService,
|
AgentFileService agentService,
|
||||||
DefaultAgentSeeder seeder,
|
DefaultAgentSeeder seeder,
|
||||||
HubBroadcaster broadcaster,
|
HubBroadcaster broadcaster,
|
||||||
@@ -66,6 +69,7 @@ public sealed class WorkerHub : Microsoft.AspNetCore.SignalR.Hub
|
|||||||
PlanningChainCoordinator planningChain)
|
PlanningChainCoordinator planningChain)
|
||||||
{
|
{
|
||||||
_queue = queue;
|
_queue = queue;
|
||||||
|
_waker = waker;
|
||||||
_agentService = agentService;
|
_agentService = agentService;
|
||||||
_seeder = seeder;
|
_seeder = seeder;
|
||||||
_broadcaster = broadcaster;
|
_broadcaster = broadcaster;
|
||||||
@@ -99,8 +103,6 @@ public sealed class WorkerHub : Microsoft.AspNetCore.SignalR.Hub
|
|||||||
await _broadcaster.TaskUpdated(parentTaskId);
|
await _broadcaster.TaskUpdated(parentTaskId);
|
||||||
foreach (var id in childIds)
|
foreach (var id in childIds)
|
||||||
await _broadcaster.TaskUpdated(id);
|
await _broadcaster.TaskUpdated(id);
|
||||||
|
|
||||||
_queue.WakeQueue();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public string Ping() => $"pong v{Version}";
|
public string Ping() => $"pong v{Version}";
|
||||||
@@ -162,7 +164,7 @@ public sealed class WorkerHub : Microsoft.AspNetCore.SignalR.Hub
|
|||||||
|
|
||||||
public bool CancelTask(string taskId) => _queue.CancelTask(taskId);
|
public bool CancelTask(string taskId) => _queue.CancelTask(taskId);
|
||||||
|
|
||||||
public void WakeQueue() => _queue.WakeQueue();
|
public void WakeQueue() => _waker.Wake();
|
||||||
|
|
||||||
public async Task<List<AgentInfo>> GetAgents() => await _agentService.ScanAsync();
|
public async Task<List<AgentInfo>> GetAgents() => await _agentService.ScanAsync();
|
||||||
|
|
||||||
|
|||||||
@@ -5,6 +5,7 @@ using ClaudeDo.Worker.Config;
|
|||||||
using ClaudeDo.Worker.External;
|
using ClaudeDo.Worker.External;
|
||||||
using ClaudeDo.Worker.Hub;
|
using ClaudeDo.Worker.Hub;
|
||||||
using ClaudeDo.Worker.Planning;
|
using ClaudeDo.Worker.Planning;
|
||||||
|
using ClaudeDo.Worker.Queue;
|
||||||
using ClaudeDo.Worker.Runner;
|
using ClaudeDo.Worker.Runner;
|
||||||
using ClaudeDo.Worker.Services;
|
using ClaudeDo.Worker.Services;
|
||||||
using ClaudeDo.Worker.State;
|
using ClaudeDo.Worker.State;
|
||||||
@@ -42,14 +43,18 @@ builder.Services.AddSingleton<PlanningAggregator>();
|
|||||||
builder.Services.AddSingleton<PlanningMergeOrchestrator>();
|
builder.Services.AddSingleton<PlanningMergeOrchestrator>();
|
||||||
builder.Services.AddSingleton<PlanningChainCoordinator>();
|
builder.Services.AddSingleton<PlanningChainCoordinator>();
|
||||||
|
|
||||||
// Centralized status mutation. Use a delegate for WakeQueue to break the
|
// Queue dispatch primitives. QueueWaker holds the wake semaphore; the queue picker
|
||||||
// TaskStateService → QueueService → TaskRunner → TaskStateService DI cycle;
|
// performs atomic Queued→Running claim. Both injected into the state service so it
|
||||||
// Slice 3 will replace this with IQueueWaker.
|
// can wake the dispatcher without depending on QueueService directly.
|
||||||
|
builder.Services.AddSingleton<QueueWaker>();
|
||||||
|
builder.Services.AddSingleton<IQueueWaker>(sp => sp.GetRequiredService<QueueWaker>());
|
||||||
|
builder.Services.AddSingleton<IQueuePicker, QueuePicker>();
|
||||||
|
|
||||||
builder.Services.AddSingleton<Func<ITaskStateService>>(sp => () => sp.GetRequiredService<ITaskStateService>());
|
builder.Services.AddSingleton<Func<ITaskStateService>>(sp => () => sp.GetRequiredService<ITaskStateService>());
|
||||||
builder.Services.AddSingleton<ITaskStateService>(sp => new TaskStateService(
|
builder.Services.AddSingleton<ITaskStateService>(sp => new TaskStateService(
|
||||||
sp.GetRequiredService<IDbContextFactory<ClaudeDoDbContext>>(),
|
sp.GetRequiredService<IDbContextFactory<ClaudeDoDbContext>>(),
|
||||||
sp.GetRequiredService<HubBroadcaster>(),
|
sp.GetRequiredService<HubBroadcaster>(),
|
||||||
() => sp.GetRequiredService<QueueService>().WakeQueue(),
|
sp.GetRequiredService<IQueueWaker>(),
|
||||||
sp.GetRequiredService<PlanningChainCoordinator>(),
|
sp.GetRequiredService<PlanningChainCoordinator>(),
|
||||||
sp.GetRequiredService<ILogger<TaskStateService>>()));
|
sp.GetRequiredService<ILogger<TaskStateService>>()));
|
||||||
|
|
||||||
@@ -137,6 +142,7 @@ if (cfg.ExternalMcpPort > 0)
|
|||||||
externalBuilder.Services.AddSingleton(app.Services.GetRequiredService<QueueService>());
|
externalBuilder.Services.AddSingleton(app.Services.GetRequiredService<QueueService>());
|
||||||
externalBuilder.Services.AddSingleton(app.Services.GetRequiredService<IDbContextFactory<ClaudeDoDbContext>>());
|
externalBuilder.Services.AddSingleton(app.Services.GetRequiredService<IDbContextFactory<ClaudeDoDbContext>>());
|
||||||
externalBuilder.Services.AddSingleton(app.Services.GetRequiredService<ITaskStateService>());
|
externalBuilder.Services.AddSingleton(app.Services.GetRequiredService<ITaskStateService>());
|
||||||
|
externalBuilder.Services.AddSingleton(app.Services.GetRequiredService<IQueueWaker>());
|
||||||
externalBuilder.Services.AddScoped<ClaudeDoDbContext>(sp =>
|
externalBuilder.Services.AddScoped<ClaudeDoDbContext>(sp =>
|
||||||
sp.GetRequiredService<IDbContextFactory<ClaudeDoDbContext>>().CreateDbContext());
|
sp.GetRequiredService<IDbContextFactory<ClaudeDoDbContext>>().CreateDbContext());
|
||||||
externalBuilder.Services.AddScoped<TaskRepository>();
|
externalBuilder.Services.AddScoped<TaskRepository>();
|
||||||
|
|||||||
12
src/ClaudeDo.Worker/Queue/IQueuePicker.cs
Normal file
12
src/ClaudeDo.Worker/Queue/IQueuePicker.cs
Normal file
@@ -0,0 +1,12 @@
|
|||||||
|
using ClaudeDo.Data.Models;
|
||||||
|
|
||||||
|
namespace ClaudeDo.Worker.Queue;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Atomic queue claim. Returns the claimed task (already flipped to Running with
|
||||||
|
/// StartedAt set) or null if no eligible task is available.
|
||||||
|
/// </summary>
|
||||||
|
public interface IQueuePicker
|
||||||
|
{
|
||||||
|
Task<TaskEntity?> ClaimNextAsync(DateTime now, CancellationToken ct);
|
||||||
|
}
|
||||||
11
src/ClaudeDo.Worker/Queue/IQueueWaker.cs
Normal file
11
src/ClaudeDo.Worker/Queue/IQueueWaker.cs
Normal file
@@ -0,0 +1,11 @@
|
|||||||
|
namespace ClaudeDo.Worker.Queue;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Signals the queue dispatcher to check for new work. Wake() is non-blocking and
|
||||||
|
/// idempotent — multiple calls before the dispatcher consumes the signal collapse
|
||||||
|
/// into a single wake-up.
|
||||||
|
/// </summary>
|
||||||
|
public interface IQueueWaker
|
||||||
|
{
|
||||||
|
void Wake();
|
||||||
|
}
|
||||||
52
src/ClaudeDo.Worker/Queue/QueuePicker.cs
Normal file
52
src/ClaudeDo.Worker/Queue/QueuePicker.cs
Normal file
@@ -0,0 +1,52 @@
|
|||||||
|
using ClaudeDo.Data;
|
||||||
|
using ClaudeDo.Data.Models;
|
||||||
|
using Microsoft.EntityFrameworkCore;
|
||||||
|
|
||||||
|
namespace ClaudeDo.Worker.Queue;
|
||||||
|
|
||||||
|
public sealed class QueuePicker : IQueuePicker
|
||||||
|
{
|
||||||
|
private readonly IDbContextFactory<ClaudeDoDbContext> _dbFactory;
|
||||||
|
|
||||||
|
public QueuePicker(IDbContextFactory<ClaudeDoDbContext> dbFactory)
|
||||||
|
=> _dbFactory = dbFactory;
|
||||||
|
|
||||||
|
public async Task<TaskEntity?> ClaimNextAsync(DateTime now, CancellationToken ct)
|
||||||
|
{
|
||||||
|
// Atomic queue claim: UPDATE + RETURNING in a single statement prevents TOCTOU races.
|
||||||
|
// Raw SQL because EF cannot express UPDATE...RETURNING.
|
||||||
|
// Eligible task must be Queued, unblocked, due (or unscheduled), and tagged 'agent'
|
||||||
|
// either directly or via its list. EF SQLite stores DateTime as
|
||||||
|
// "yyyy-MM-dd HH:mm:ss.fffffff" — same format used here for comparison.
|
||||||
|
await using var ctx = await _dbFactory.CreateDbContextAsync(ct);
|
||||||
|
var nowStr = now.ToUniversalTime().ToString("yyyy-MM-dd HH:mm:ss.fffffff");
|
||||||
|
var startedAtStr = DateTime.UtcNow.ToString("yyyy-MM-dd HH:mm:ss.fffffff");
|
||||||
|
|
||||||
|
var rows = await ctx.Tasks.FromSqlRaw("""
|
||||||
|
UPDATE tasks SET status = 'running', started_at = {1}
|
||||||
|
WHERE id = (
|
||||||
|
SELECT t.id FROM tasks t
|
||||||
|
WHERE t.status = 'queued'
|
||||||
|
AND t.blocked_by_task_id IS NULL
|
||||||
|
AND (t.scheduled_for IS NULL OR t.scheduled_for <= {0})
|
||||||
|
AND (
|
||||||
|
EXISTS (
|
||||||
|
SELECT 1 FROM task_tags tt
|
||||||
|
JOIN tags tg ON tg.id = tt.tag_id
|
||||||
|
WHERE tt.task_id = t.id AND tg.name = 'agent'
|
||||||
|
)
|
||||||
|
OR EXISTS (
|
||||||
|
SELECT 1 FROM list_tags lt
|
||||||
|
JOIN tags tg ON tg.id = lt.tag_id
|
||||||
|
WHERE lt.list_id = t.list_id AND tg.name = 'agent'
|
||||||
|
)
|
||||||
|
)
|
||||||
|
ORDER BY t.sort_order ASC, t.created_at ASC
|
||||||
|
LIMIT 1
|
||||||
|
)
|
||||||
|
RETURNING *
|
||||||
|
""", nowStr, startedAtStr).ToListAsync(ct);
|
||||||
|
|
||||||
|
return rows.FirstOrDefault();
|
||||||
|
}
|
||||||
|
}
|
||||||
18
src/ClaudeDo.Worker/Queue/QueueWaker.cs
Normal file
18
src/ClaudeDo.Worker/Queue/QueueWaker.cs
Normal file
@@ -0,0 +1,18 @@
|
|||||||
|
namespace ClaudeDo.Worker.Queue;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Owns the wake semaphore. Producers (state mutations, hub) call Wake();
|
||||||
|
/// the queue dispatcher awaits WaitAsync.
|
||||||
|
/// </summary>
|
||||||
|
public sealed class QueueWaker : IQueueWaker
|
||||||
|
{
|
||||||
|
private readonly SemaphoreSlim _signal = new(0, 1);
|
||||||
|
|
||||||
|
public void Wake()
|
||||||
|
{
|
||||||
|
try { _signal.Release(); }
|
||||||
|
catch (SemaphoreFullException) { /* already signalled */ }
|
||||||
|
}
|
||||||
|
|
||||||
|
public Task WaitAsync(CancellationToken ct) => _signal.WaitAsync(ct);
|
||||||
|
}
|
||||||
@@ -2,6 +2,7 @@ using ClaudeDo.Data;
|
|||||||
using ClaudeDo.Data.Models;
|
using ClaudeDo.Data.Models;
|
||||||
using ClaudeDo.Data.Repositories;
|
using ClaudeDo.Data.Repositories;
|
||||||
using ClaudeDo.Worker.Config;
|
using ClaudeDo.Worker.Config;
|
||||||
|
using ClaudeDo.Worker.Queue;
|
||||||
using ClaudeDo.Worker.Runner;
|
using ClaudeDo.Worker.Runner;
|
||||||
using Microsoft.EntityFrameworkCore;
|
using Microsoft.EntityFrameworkCore;
|
||||||
|
|
||||||
@@ -20,23 +21,27 @@ public sealed class QueueService : BackgroundService
|
|||||||
private readonly TaskRunner _runner;
|
private readonly TaskRunner _runner;
|
||||||
private readonly WorkerConfig _cfg;
|
private readonly WorkerConfig _cfg;
|
||||||
private readonly ILogger<QueueService> _logger;
|
private readonly ILogger<QueueService> _logger;
|
||||||
|
private readonly QueueWaker _waker;
|
||||||
|
private readonly IQueuePicker _picker;
|
||||||
|
|
||||||
private readonly object _lock = new();
|
private readonly object _lock = new();
|
||||||
private volatile QueueSlotState? _queueSlot;
|
private volatile QueueSlotState? _queueSlot;
|
||||||
private volatile QueueSlotState? _overrideSlot;
|
private volatile QueueSlotState? _overrideSlot;
|
||||||
|
|
||||||
private readonly SemaphoreSlim _wakeSignal = new(0, 1);
|
|
||||||
|
|
||||||
public QueueService(
|
public QueueService(
|
||||||
IDbContextFactory<ClaudeDoDbContext> dbFactory,
|
IDbContextFactory<ClaudeDoDbContext> dbFactory,
|
||||||
TaskRunner runner,
|
TaskRunner runner,
|
||||||
WorkerConfig cfg,
|
WorkerConfig cfg,
|
||||||
ILogger<QueueService> logger)
|
ILogger<QueueService> logger,
|
||||||
|
QueueWaker waker,
|
||||||
|
IQueuePicker picker)
|
||||||
{
|
{
|
||||||
_dbFactory = dbFactory;
|
_dbFactory = dbFactory;
|
||||||
_runner = runner;
|
_runner = runner;
|
||||||
_cfg = cfg;
|
_cfg = cfg;
|
||||||
_logger = logger;
|
_logger = logger;
|
||||||
|
_waker = waker;
|
||||||
|
_picker = picker;
|
||||||
}
|
}
|
||||||
|
|
||||||
public IReadOnlyList<(string slot, string taskId, DateTime startedAt)> GetActive()
|
public IReadOnlyList<(string slot, string taskId, DateTime startedAt)> GetActive()
|
||||||
@@ -49,13 +54,6 @@ public sealed class QueueService : BackgroundService
|
|||||||
return list;
|
return list;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void WakeQueue()
|
|
||||||
{
|
|
||||||
// Release if not already signalled.
|
|
||||||
try { _wakeSignal.Release(); }
|
|
||||||
catch (SemaphoreFullException) { /* already signalled */ }
|
|
||||||
}
|
|
||||||
|
|
||||||
public async Task RunNow(string taskId)
|
public async Task RunNow(string taskId)
|
||||||
{
|
{
|
||||||
using (var context = _dbFactory.CreateDbContext())
|
using (var context = _dbFactory.CreateDbContext())
|
||||||
@@ -147,25 +145,14 @@ public sealed class QueueService : BackgroundService
|
|||||||
try
|
try
|
||||||
{
|
{
|
||||||
// Wait for wake signal or backstop timer.
|
// Wait for wake signal or backstop timer.
|
||||||
var wakeTask = _wakeSignal.WaitAsync(stoppingToken);
|
var wakeTask = _waker.WaitAsync(stoppingToken);
|
||||||
var timerTask = timer.WaitForNextTickAsync(stoppingToken).AsTask();
|
var timerTask = timer.WaitForNextTickAsync(stoppingToken).AsTask();
|
||||||
|
|
||||||
await Task.WhenAny(wakeTask, timerTask);
|
await Task.WhenAny(wakeTask, timerTask);
|
||||||
|
|
||||||
// Drain wake signal if it fired.
|
|
||||||
if (wakeTask.IsCompletedSuccessfully)
|
|
||||||
{
|
|
||||||
// Good — signal consumed.
|
|
||||||
}
|
|
||||||
|
|
||||||
if (_queueSlot is not null) continue;
|
if (_queueSlot is not null) continue;
|
||||||
|
|
||||||
TaskEntity? task;
|
var task = await _picker.ClaimNextAsync(DateTime.UtcNow, stoppingToken);
|
||||||
using (var context = _dbFactory.CreateDbContext())
|
|
||||||
{
|
|
||||||
var taskRepo = new TaskRepository(context);
|
|
||||||
task = await taskRepo.GetNextQueuedAgentTaskAsync(DateTime.UtcNow, stoppingToken);
|
|
||||||
}
|
|
||||||
if (task is null) continue;
|
if (task is null) continue;
|
||||||
|
|
||||||
lock (_lock)
|
lock (_lock)
|
||||||
@@ -181,7 +168,7 @@ public sealed class QueueService : BackgroundService
|
|||||||
_logger.LogError(t.Exception, "RunInSlotAsync failed for task {TaskId} in queue slot", task.Id);
|
_logger.LogError(t.Exception, "RunInSlotAsync failed for task {TaskId} in queue slot", task.Id);
|
||||||
lock (_lock) { _queueSlot = null; }
|
lock (_lock) { _queueSlot = null; }
|
||||||
cts.Dispose();
|
cts.Dispose();
|
||||||
WakeQueue(); // Check for next task immediately.
|
_waker.Wake(); // Check for next task immediately.
|
||||||
}, TaskScheduler.Default);
|
}, TaskScheduler.Default);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -3,6 +3,7 @@ using ClaudeDo.Data.Models;
|
|||||||
using ClaudeDo.Data.Repositories;
|
using ClaudeDo.Data.Repositories;
|
||||||
using ClaudeDo.Worker.Hub;
|
using ClaudeDo.Worker.Hub;
|
||||||
using ClaudeDo.Worker.Planning;
|
using ClaudeDo.Worker.Planning;
|
||||||
|
using ClaudeDo.Worker.Queue;
|
||||||
using Microsoft.EntityFrameworkCore;
|
using Microsoft.EntityFrameworkCore;
|
||||||
using TaskStatus = ClaudeDo.Data.Models.TaskStatus;
|
using TaskStatus = ClaudeDo.Data.Models.TaskStatus;
|
||||||
|
|
||||||
@@ -12,20 +13,20 @@ public sealed class TaskStateService : ITaskStateService
|
|||||||
{
|
{
|
||||||
private readonly IDbContextFactory<ClaudeDoDbContext> _dbFactory;
|
private readonly IDbContextFactory<ClaudeDoDbContext> _dbFactory;
|
||||||
private readonly HubBroadcaster _broadcaster;
|
private readonly HubBroadcaster _broadcaster;
|
||||||
private readonly Action _wakeQueue;
|
private readonly IQueueWaker _waker;
|
||||||
private readonly PlanningChainCoordinator _chain;
|
private readonly PlanningChainCoordinator _chain;
|
||||||
private readonly ILogger<TaskStateService> _logger;
|
private readonly ILogger<TaskStateService> _logger;
|
||||||
|
|
||||||
public TaskStateService(
|
public TaskStateService(
|
||||||
IDbContextFactory<ClaudeDoDbContext> dbFactory,
|
IDbContextFactory<ClaudeDoDbContext> dbFactory,
|
||||||
HubBroadcaster broadcaster,
|
HubBroadcaster broadcaster,
|
||||||
Action wakeQueue,
|
IQueueWaker waker,
|
||||||
PlanningChainCoordinator chain,
|
PlanningChainCoordinator chain,
|
||||||
ILogger<TaskStateService> logger)
|
ILogger<TaskStateService> logger)
|
||||||
{
|
{
|
||||||
_dbFactory = dbFactory;
|
_dbFactory = dbFactory;
|
||||||
_broadcaster = broadcaster;
|
_broadcaster = broadcaster;
|
||||||
_wakeQueue = wakeQueue;
|
_waker = waker;
|
||||||
_chain = chain;
|
_chain = chain;
|
||||||
_logger = logger;
|
_logger = logger;
|
||||||
}
|
}
|
||||||
@@ -40,7 +41,7 @@ public sealed class TaskStateService : ITaskStateService
|
|||||||
if (affected == 0)
|
if (affected == 0)
|
||||||
return new TransitionResult(false, "Task not found or already running.");
|
return new TransitionResult(false, "Task not found or already running.");
|
||||||
|
|
||||||
_wakeQueue();
|
_waker.Wake();
|
||||||
await _broadcaster.TaskUpdated(taskId);
|
await _broadcaster.TaskUpdated(taskId);
|
||||||
return new TransitionResult(true, null);
|
return new TransitionResult(true, null);
|
||||||
}
|
}
|
||||||
@@ -203,7 +204,7 @@ public sealed class TaskStateService : ITaskStateService
|
|||||||
.Where(t => t.Id == taskId && t.Status == TaskStatus.Waiting)
|
.Where(t => t.Id == taskId && t.Status == TaskStatus.Waiting)
|
||||||
.ExecuteUpdateAsync(s => s.SetProperty(t => t.Status, TaskStatus.Queued), ct);
|
.ExecuteUpdateAsync(s => s.SetProperty(t => t.Status, TaskStatus.Queued), ct);
|
||||||
|
|
||||||
_wakeQueue();
|
_waker.Wake();
|
||||||
await _broadcaster.TaskUpdated(taskId);
|
await _broadcaster.TaskUpdated(taskId);
|
||||||
return new TransitionResult(true, null);
|
return new TransitionResult(true, null);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -115,7 +115,9 @@ public sealed class ExternalMcpServiceTests : IDisposable
|
|||||||
var argsBuilder = new ClaudeArgsBuilder();
|
var argsBuilder = new ClaudeArgsBuilder();
|
||||||
var runner = new TaskRunner(fake, dbFactory, broadcaster, wtManager, argsBuilder, cfg,
|
var runner = new TaskRunner(fake, dbFactory, broadcaster, wtManager, argsBuilder, cfg,
|
||||||
NullLogger<TaskRunner>.Instance, TaskStateServiceBuilder.Build(dbFactory).State);
|
NullLogger<TaskRunner>.Instance, TaskStateServiceBuilder.Build(dbFactory).State);
|
||||||
return new QueueService(dbFactory, runner, cfg, NullLogger<QueueService>.Instance);
|
var waker = new ClaudeDo.Worker.Queue.QueueWaker();
|
||||||
|
var picker = new ClaudeDo.Worker.Queue.QueuePicker(dbFactory);
|
||||||
|
return new QueueService(dbFactory, runner, cfg, NullLogger<QueueService>.Instance, waker, picker);
|
||||||
}
|
}
|
||||||
|
|
||||||
[Fact]
|
[Fact]
|
||||||
|
|||||||
@@ -49,7 +49,7 @@ public sealed class PlanningHubTests : IDisposable
|
|||||||
private WorkerHub CreateHub()
|
private WorkerHub CreateHub()
|
||||||
{
|
{
|
||||||
var hub = new WorkerHub(
|
var hub = new WorkerHub(
|
||||||
null!, null!, null!, null!, null!, null!, null!, null!,
|
null!, null!, null!, null!, null!, null!, null!, null!, null!,
|
||||||
_planning, _launcher, null!, null!, null!);
|
_planning, _launcher, null!, null!, null!);
|
||||||
hub.Clients = new FakeHubCallerClients(_proxy);
|
hub.Clients = new FakeHubCallerClients(_proxy);
|
||||||
hub.Context = new FakeHubCallerContext();
|
hub.Context = new FakeHubCallerContext();
|
||||||
|
|||||||
@@ -1,6 +1,7 @@
|
|||||||
using ClaudeDo.Data;
|
using ClaudeDo.Data;
|
||||||
using ClaudeDo.Worker.Hub;
|
using ClaudeDo.Worker.Hub;
|
||||||
using ClaudeDo.Worker.Planning;
|
using ClaudeDo.Worker.Planning;
|
||||||
|
using ClaudeDo.Worker.Queue;
|
||||||
using ClaudeDo.Worker.State;
|
using ClaudeDo.Worker.State;
|
||||||
using Microsoft.EntityFrameworkCore;
|
using Microsoft.EntityFrameworkCore;
|
||||||
using Microsoft.Extensions.Logging.Abstractions;
|
using Microsoft.Extensions.Logging.Abstractions;
|
||||||
@@ -15,23 +16,31 @@ public static class TaskStateServiceBuilder
|
|||||||
TaskStateService State,
|
TaskStateService State,
|
||||||
PlanningChainCoordinator Chain,
|
PlanningChainCoordinator Chain,
|
||||||
CapturingHubContext Hub,
|
CapturingHubContext Hub,
|
||||||
Func<int> WakeCount);
|
Func<int> WakeCount,
|
||||||
|
CountingQueueWaker Waker);
|
||||||
|
|
||||||
public static Built Build(IDbContextFactory<ClaudeDoDbContext> dbFactory)
|
public static Built Build(IDbContextFactory<ClaudeDoDbContext> dbFactory)
|
||||||
{
|
{
|
||||||
var hub = new CapturingHubContext();
|
var hub = new CapturingHubContext();
|
||||||
var broadcaster = new HubBroadcaster(hub);
|
var broadcaster = new HubBroadcaster(hub);
|
||||||
var wakeCount = new int[1];
|
var waker = new CountingQueueWaker();
|
||||||
|
|
||||||
TaskStateService? state = null;
|
TaskStateService? state = null;
|
||||||
var chain = new PlanningChainCoordinator(dbFactory, () => state!);
|
var chain = new PlanningChainCoordinator(dbFactory, () => state!);
|
||||||
state = new TaskStateService(
|
state = new TaskStateService(
|
||||||
dbFactory,
|
dbFactory,
|
||||||
broadcaster,
|
broadcaster,
|
||||||
() => Interlocked.Increment(ref wakeCount[0]),
|
waker,
|
||||||
chain,
|
chain,
|
||||||
NullLogger<TaskStateService>.Instance);
|
NullLogger<TaskStateService>.Instance);
|
||||||
|
|
||||||
return new Built(state, chain, hub, () => Volatile.Read(ref wakeCount[0]));
|
return new Built(state, chain, hub, () => waker.Count, waker);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public sealed class CountingQueueWaker : IQueueWaker
|
||||||
|
{
|
||||||
|
private int _count;
|
||||||
|
public int Count => Volatile.Read(ref _count);
|
||||||
|
public void Wake() => Interlocked.Increment(ref _count);
|
||||||
|
}
|
||||||
|
|||||||
188
tests/ClaudeDo.Worker.Tests/Queue/QueuePickerTests.cs
Normal file
188
tests/ClaudeDo.Worker.Tests/Queue/QueuePickerTests.cs
Normal file
@@ -0,0 +1,188 @@
|
|||||||
|
using ClaudeDo.Data;
|
||||||
|
using ClaudeDo.Data.Models;
|
||||||
|
using ClaudeDo.Data.Repositories;
|
||||||
|
using ClaudeDo.Worker.Queue;
|
||||||
|
using ClaudeDo.Worker.Tests.Infrastructure;
|
||||||
|
using TaskStatus = ClaudeDo.Data.Models.TaskStatus;
|
||||||
|
|
||||||
|
namespace ClaudeDo.Worker.Tests.Queue;
|
||||||
|
|
||||||
|
public sealed class QueuePickerTests : IDisposable
|
||||||
|
{
|
||||||
|
private readonly DbFixture _db = new();
|
||||||
|
private readonly ClaudeDoDbContext _ctx;
|
||||||
|
private readonly TaskRepository _tasks;
|
||||||
|
private readonly ListRepository _lists;
|
||||||
|
private readonly TagRepository _tags;
|
||||||
|
private readonly QueuePicker _picker;
|
||||||
|
|
||||||
|
public QueuePickerTests()
|
||||||
|
{
|
||||||
|
_ctx = _db.CreateContext();
|
||||||
|
_tasks = new TaskRepository(_ctx);
|
||||||
|
_lists = new ListRepository(_ctx);
|
||||||
|
_tags = new TagRepository(_ctx);
|
||||||
|
_picker = new QueuePicker(_db.CreateFactory());
|
||||||
|
}
|
||||||
|
|
||||||
|
public void Dispose()
|
||||||
|
{
|
||||||
|
_ctx.Dispose();
|
||||||
|
_db.Dispose();
|
||||||
|
}
|
||||||
|
|
||||||
|
private async Task<string> CreateListAsync(bool listAgentTag = false)
|
||||||
|
{
|
||||||
|
var listId = Guid.NewGuid().ToString();
|
||||||
|
await _lists.AddAsync(new ListEntity
|
||||||
|
{
|
||||||
|
Id = listId,
|
||||||
|
Name = "Test",
|
||||||
|
CreatedAt = DateTime.UtcNow,
|
||||||
|
});
|
||||||
|
if (listAgentTag)
|
||||||
|
{
|
||||||
|
var tagId = await _tags.GetOrCreateAsync("agent");
|
||||||
|
await _lists.AddTagAsync(listId, tagId);
|
||||||
|
}
|
||||||
|
return listId;
|
||||||
|
}
|
||||||
|
|
||||||
|
private async Task<TaskEntity> SeedAsync(
|
||||||
|
string listId,
|
||||||
|
TaskStatus status = TaskStatus.Queued,
|
||||||
|
DateTime? createdAt = null,
|
||||||
|
DateTime? scheduledFor = null,
|
||||||
|
string? blockedBy = null,
|
||||||
|
bool taskAgentTag = false,
|
||||||
|
int? sortOrder = null)
|
||||||
|
{
|
||||||
|
var task = new TaskEntity
|
||||||
|
{
|
||||||
|
Id = Guid.NewGuid().ToString(),
|
||||||
|
ListId = listId,
|
||||||
|
Title = "T",
|
||||||
|
Status = status,
|
||||||
|
CreatedAt = createdAt ?? DateTime.UtcNow,
|
||||||
|
ScheduledFor = scheduledFor,
|
||||||
|
BlockedByTaskId = blockedBy,
|
||||||
|
CommitType = "feat",
|
||||||
|
};
|
||||||
|
await _tasks.AddAsync(task);
|
||||||
|
if (taskAgentTag)
|
||||||
|
{
|
||||||
|
var tagId = await _tags.GetOrCreateAsync("agent");
|
||||||
|
await _tasks.AddTagAsync(task.Id, tagId);
|
||||||
|
}
|
||||||
|
if (sortOrder is not null)
|
||||||
|
{
|
||||||
|
task.SortOrder = sortOrder.Value;
|
||||||
|
await _tasks.UpdateAsync(task);
|
||||||
|
}
|
||||||
|
return task;
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task ClaimNextAsync_Skips_TasksWithBlockedByTaskId()
|
||||||
|
{
|
||||||
|
var listId = await CreateListAsync(listAgentTag: true);
|
||||||
|
var blocker = await SeedAsync(listId);
|
||||||
|
await SeedAsync(listId, blockedBy: blocker.Id);
|
||||||
|
|
||||||
|
// Only `blocker` is unblocked → it should be claimed; the second pick is null.
|
||||||
|
var first = await _picker.ClaimNextAsync(DateTime.UtcNow, CancellationToken.None);
|
||||||
|
Assert.NotNull(first);
|
||||||
|
Assert.Equal(blocker.Id, first!.Id);
|
||||||
|
|
||||||
|
var second = await _picker.ClaimNextAsync(DateTime.UtcNow, CancellationToken.None);
|
||||||
|
Assert.Null(second);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task ClaimNextAsync_Skips_TasksWithoutAgentTag()
|
||||||
|
{
|
||||||
|
var listId = await CreateListAsync(listAgentTag: false);
|
||||||
|
await SeedAsync(listId);
|
||||||
|
|
||||||
|
var picked = await _picker.ClaimNextAsync(DateTime.UtcNow, CancellationToken.None);
|
||||||
|
Assert.Null(picked);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task ClaimNextAsync_Skips_FutureScheduledFor()
|
||||||
|
{
|
||||||
|
var listId = await CreateListAsync(listAgentTag: true);
|
||||||
|
await SeedAsync(listId, scheduledFor: DateTime.UtcNow.AddHours(1));
|
||||||
|
|
||||||
|
var picked = await _picker.ClaimNextAsync(DateTime.UtcNow, CancellationToken.None);
|
||||||
|
Assert.Null(picked);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task ClaimNextAsync_Skips_NonQueuedStatuses()
|
||||||
|
{
|
||||||
|
var listId = await CreateListAsync(listAgentTag: true);
|
||||||
|
await SeedAsync(listId, status: TaskStatus.Idle);
|
||||||
|
await SeedAsync(listId, status: TaskStatus.Running);
|
||||||
|
await SeedAsync(listId, status: TaskStatus.Done);
|
||||||
|
await SeedAsync(listId, status: TaskStatus.Failed);
|
||||||
|
await SeedAsync(listId, status: TaskStatus.Cancelled);
|
||||||
|
await SeedAsync(listId, status: TaskStatus.Manual);
|
||||||
|
await SeedAsync(listId, status: TaskStatus.Draft);
|
||||||
|
await SeedAsync(listId, status: TaskStatus.Planning);
|
||||||
|
await SeedAsync(listId, status: TaskStatus.Planned);
|
||||||
|
await SeedAsync(listId, status: TaskStatus.Waiting);
|
||||||
|
|
||||||
|
var picked = await _picker.ClaimNextAsync(DateTime.UtcNow, CancellationToken.None);
|
||||||
|
Assert.Null(picked);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task ClaimNextAsync_Picks_ByUserSortOrder_ThenCreatedAt()
|
||||||
|
{
|
||||||
|
var listId = await CreateListAsync(listAgentTag: true);
|
||||||
|
|
||||||
|
// Created in order first, second; reorder so second is sort-order 0.
|
||||||
|
var first = await SeedAsync(listId, createdAt: DateTime.UtcNow.AddMinutes(-10));
|
||||||
|
var second = await SeedAsync(listId, createdAt: DateTime.UtcNow);
|
||||||
|
await _tasks.ReorderAsync(listId, new[] { second.Id, first.Id });
|
||||||
|
|
||||||
|
var picked = await _picker.ClaimNextAsync(DateTime.UtcNow, CancellationToken.None);
|
||||||
|
Assert.NotNull(picked);
|
||||||
|
Assert.Equal(second.Id, picked!.Id);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task ClaimNextAsync_FlipsToRunning_WithStartedAt()
|
||||||
|
{
|
||||||
|
var listId = await CreateListAsync(listAgentTag: true);
|
||||||
|
var task = await SeedAsync(listId);
|
||||||
|
|
||||||
|
var before = DateTime.UtcNow;
|
||||||
|
var picked = await _picker.ClaimNextAsync(before, CancellationToken.None);
|
||||||
|
Assert.NotNull(picked);
|
||||||
|
|
||||||
|
var loaded = await _tasks.GetByIdAsync(task.Id);
|
||||||
|
Assert.Equal(TaskStatus.Running, loaded!.Status);
|
||||||
|
Assert.NotNull(loaded.StartedAt);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task ClaimNextAsync_TwoParallelPickers_OnlyOneClaimsRow()
|
||||||
|
{
|
||||||
|
var listId = await CreateListAsync(listAgentTag: true);
|
||||||
|
await SeedAsync(listId);
|
||||||
|
|
||||||
|
// Two pickers, same DB factory, racing each other.
|
||||||
|
var picker1 = new QueuePicker(_db.CreateFactory());
|
||||||
|
var picker2 = new QueuePicker(_db.CreateFactory());
|
||||||
|
|
||||||
|
var t1 = Task.Run(() => picker1.ClaimNextAsync(DateTime.UtcNow, CancellationToken.None));
|
||||||
|
var t2 = Task.Run(() => picker2.ClaimNextAsync(DateTime.UtcNow, CancellationToken.None));
|
||||||
|
|
||||||
|
var results = await Task.WhenAll(t1, t2);
|
||||||
|
|
||||||
|
var nonNull = results.Where(r => r is not null).ToList();
|
||||||
|
Assert.Single(nonNull);
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -302,28 +302,4 @@ public sealed class TaskRepositoryPlanningTests : IDisposable
|
|||||||
Assert.NotNull(stillThere);
|
Assert.NotNull(stillThere);
|
||||||
}
|
}
|
||||||
|
|
||||||
[Fact]
|
|
||||||
public async Task GetNextQueuedAgentTask_SkipsDraftPlanningPlanned()
|
|
||||||
{
|
|
||||||
var listId = await CreateListAsync();
|
|
||||||
var agentTagId = await _tags.GetOrCreateAsync("agent");
|
|
||||||
|
|
||||||
async Task<TaskEntity> T(TaskStatus s, bool withTag, string? parent = null)
|
|
||||||
{
|
|
||||||
var t = MakeTask(listId, s, parentId: parent);
|
|
||||||
await _tasks.AddAsync(t);
|
|
||||||
if (withTag) await _tasks.AddTagAsync(t.Id, agentTagId);
|
|
||||||
return t;
|
|
||||||
}
|
|
||||||
|
|
||||||
var planning = await T(TaskStatus.Planning, withTag: true);
|
|
||||||
var planned = await T(TaskStatus.Planned, withTag: true);
|
|
||||||
var draft = await T(TaskStatus.Draft, withTag: true, parent: planning.Id);
|
|
||||||
var queued = await T(TaskStatus.Queued, withTag: true);
|
|
||||||
|
|
||||||
var picked = await _tasks.GetNextQueuedAgentTaskAsync(DateTime.UtcNow);
|
|
||||||
|
|
||||||
Assert.NotNull(picked);
|
|
||||||
Assert.Equal(queued.Id, picked!.Id);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -87,64 +87,6 @@ public sealed class TaskRepositoryTests : IDisposable
|
|||||||
Assert.Equal(entity.CommitType, loaded.CommitType);
|
Assert.Equal(entity.CommitType, loaded.CommitType);
|
||||||
}
|
}
|
||||||
|
|
||||||
[Fact]
|
|
||||||
public async Task GetNextQueuedAgentTaskAsync_Returns_OldestWithAgentTag_ViaTaskTag()
|
|
||||||
{
|
|
||||||
var listId = await CreateListAsync();
|
|
||||||
var agentTagId = await _tags.GetOrCreateAsync("agent");
|
|
||||||
|
|
||||||
var older = MakeTask(listId, createdAt: DateTime.UtcNow.AddMinutes(-10));
|
|
||||||
var newer = MakeTask(listId, createdAt: DateTime.UtcNow);
|
|
||||||
await _tasks.AddAsync(older);
|
|
||||||
await _tasks.AddAsync(newer);
|
|
||||||
await _tasks.AddTagAsync(older.Id, agentTagId);
|
|
||||||
await _tasks.AddTagAsync(newer.Id, agentTagId);
|
|
||||||
|
|
||||||
var picked = await _tasks.GetNextQueuedAgentTaskAsync(DateTime.UtcNow);
|
|
||||||
Assert.NotNull(picked);
|
|
||||||
Assert.Equal(older.Id, picked.Id);
|
|
||||||
}
|
|
||||||
|
|
||||||
[Fact]
|
|
||||||
public async Task GetNextQueuedAgentTaskAsync_Returns_TaskWithAgentTag_ViaListTag()
|
|
||||||
{
|
|
||||||
var listId = await CreateListAsync();
|
|
||||||
var agentTagId = await _tags.GetOrCreateAsync("agent");
|
|
||||||
await _lists.AddTagAsync(listId, agentTagId);
|
|
||||||
|
|
||||||
var task = MakeTask(listId);
|
|
||||||
await _tasks.AddAsync(task);
|
|
||||||
|
|
||||||
var picked = await _tasks.GetNextQueuedAgentTaskAsync(DateTime.UtcNow);
|
|
||||||
Assert.NotNull(picked);
|
|
||||||
Assert.Equal(task.Id, picked.Id);
|
|
||||||
}
|
|
||||||
|
|
||||||
[Fact]
|
|
||||||
public async Task GetNextQueuedAgentTaskAsync_ReturnsNull_WhenNoAgentTag()
|
|
||||||
{
|
|
||||||
var listId = await CreateListAsync();
|
|
||||||
var task = MakeTask(listId);
|
|
||||||
await _tasks.AddAsync(task);
|
|
||||||
|
|
||||||
var picked = await _tasks.GetNextQueuedAgentTaskAsync(DateTime.UtcNow);
|
|
||||||
Assert.Null(picked);
|
|
||||||
}
|
|
||||||
|
|
||||||
[Fact]
|
|
||||||
public async Task GetNextQueuedAgentTaskAsync_Skips_FutureScheduledFor()
|
|
||||||
{
|
|
||||||
var listId = await CreateListAsync();
|
|
||||||
var agentTagId = await _tags.GetOrCreateAsync("agent");
|
|
||||||
|
|
||||||
var task = MakeTask(listId, scheduledFor: DateTime.UtcNow.AddHours(1));
|
|
||||||
await _tasks.AddAsync(task);
|
|
||||||
await _tasks.AddTagAsync(task.Id, agentTagId);
|
|
||||||
|
|
||||||
var picked = await _tasks.GetNextQueuedAgentTaskAsync(DateTime.UtcNow);
|
|
||||||
Assert.Null(picked);
|
|
||||||
}
|
|
||||||
|
|
||||||
[Fact]
|
[Fact]
|
||||||
public async Task Transitions_MarkRunning_ThenMarkDone()
|
public async Task Transitions_MarkRunning_ThenMarkDone()
|
||||||
{
|
{
|
||||||
@@ -297,26 +239,6 @@ public sealed class TaskRepositoryTests : IDisposable
|
|||||||
Assert.Equal(0, reloadB!.SortOrder);
|
Assert.Equal(0, reloadB!.SortOrder);
|
||||||
}
|
}
|
||||||
|
|
||||||
[Fact]
|
|
||||||
public async Task GetNextQueuedAgentTaskAsync_Picks_ByUserSortOrder()
|
|
||||||
{
|
|
||||||
var listId = await CreateListAsync();
|
|
||||||
var agentTagId = await _tags.GetOrCreateAsync("agent");
|
|
||||||
await _lists.AddTagAsync(listId, agentTagId);
|
|
||||||
|
|
||||||
// created in order first, second; then user reorders to put second on top.
|
|
||||||
var first = MakeTask(listId, createdAt: DateTime.UtcNow.AddMinutes(-10));
|
|
||||||
var second = MakeTask(listId, createdAt: DateTime.UtcNow);
|
|
||||||
await _tasks.AddAsync(first);
|
|
||||||
await _tasks.AddAsync(second);
|
|
||||||
|
|
||||||
await _tasks.ReorderAsync(listId, new[] { second.Id, first.Id });
|
|
||||||
|
|
||||||
var picked = await _tasks.GetNextQueuedAgentTaskAsync(DateTime.UtcNow);
|
|
||||||
Assert.NotNull(picked);
|
|
||||||
Assert.Equal(second.Id, picked!.Id);
|
|
||||||
}
|
|
||||||
|
|
||||||
[Fact]
|
[Fact]
|
||||||
public async Task GetEffectiveTagsAsync_Returns_Union_Of_ListTags_And_TaskTags()
|
public async Task GetEffectiveTagsAsync_Returns_Union_Of_ListTags_And_TaskTags()
|
||||||
{
|
{
|
||||||
|
|||||||
@@ -3,6 +3,7 @@ using ClaudeDo.Data.Models;
|
|||||||
using ClaudeDo.Data.Repositories;
|
using ClaudeDo.Data.Repositories;
|
||||||
using ClaudeDo.Worker.Config;
|
using ClaudeDo.Worker.Config;
|
||||||
using ClaudeDo.Worker.Hub;
|
using ClaudeDo.Worker.Hub;
|
||||||
|
using ClaudeDo.Worker.Queue;
|
||||||
using ClaudeDo.Worker.Runner;
|
using ClaudeDo.Worker.Runner;
|
||||||
using ClaudeDo.Worker.Services;
|
using ClaudeDo.Worker.Services;
|
||||||
using ClaudeDo.Worker.Tests.Infrastructure;
|
using ClaudeDo.Worker.Tests.Infrastructure;
|
||||||
@@ -45,6 +46,8 @@ public sealed class QueueServiceSlotGuardTests : IDisposable
|
|||||||
try { Directory.Delete(_tempDir, true); } catch { }
|
try { Directory.Delete(_tempDir, true); } catch { }
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private QueueWaker _waker = null!;
|
||||||
|
|
||||||
private (QueueService service, FakeClaudeProcess fakeProcess) CreateService(
|
private (QueueService service, FakeClaudeProcess fakeProcess) CreateService(
|
||||||
Func<string, string, string, Func<string, Task>, CancellationToken, Task<RunResult>>? handler = null)
|
Func<string, string, string, Func<string, Task>, CancellationToken, Task<RunResult>>? handler = null)
|
||||||
{
|
{
|
||||||
@@ -55,7 +58,9 @@ public sealed class QueueServiceSlotGuardTests : IDisposable
|
|||||||
var argsBuilder = new ClaudeArgsBuilder();
|
var argsBuilder = new ClaudeArgsBuilder();
|
||||||
var runner = new TaskRunner(fake, dbFactory, broadcaster, wtManager, argsBuilder, _cfg,
|
var runner = new TaskRunner(fake, dbFactory, broadcaster, wtManager, argsBuilder, _cfg,
|
||||||
NullLogger<TaskRunner>.Instance, TaskStateServiceBuilder.Build(dbFactory).State);
|
NullLogger<TaskRunner>.Instance, TaskStateServiceBuilder.Build(dbFactory).State);
|
||||||
var service = new QueueService(dbFactory, runner, _cfg, NullLogger<QueueService>.Instance);
|
_waker = new QueueWaker();
|
||||||
|
var picker = new QueuePicker(dbFactory);
|
||||||
|
var service = new QueueService(dbFactory, runner, _cfg, NullLogger<QueueService>.Instance, _waker, picker);
|
||||||
return (service, fake);
|
return (service, fake);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -102,7 +107,7 @@ public sealed class QueueServiceSlotGuardTests : IDisposable
|
|||||||
|
|
||||||
using var cts = new CancellationTokenSource();
|
using var cts = new CancellationTokenSource();
|
||||||
await service.StartAsync(cts.Token);
|
await service.StartAsync(cts.Token);
|
||||||
service.WakeQueue();
|
_waker.Wake();
|
||||||
|
|
||||||
// Wait until the queue slot has actually picked up the task.
|
// Wait until the queue slot has actually picked up the task.
|
||||||
await queuePickedUp.Task.WaitAsync(TimeSpan.FromSeconds(5));
|
await queuePickedUp.Task.WaitAsync(TimeSpan.FromSeconds(5));
|
||||||
@@ -132,7 +137,7 @@ public sealed class QueueServiceSlotGuardTests : IDisposable
|
|||||||
|
|
||||||
using var cts = new CancellationTokenSource();
|
using var cts = new CancellationTokenSource();
|
||||||
await service.StartAsync(cts.Token);
|
await service.StartAsync(cts.Token);
|
||||||
service.WakeQueue();
|
_waker.Wake();
|
||||||
|
|
||||||
await queuePickedUp.Task.WaitAsync(TimeSpan.FromSeconds(5));
|
await queuePickedUp.Task.WaitAsync(TimeSpan.FromSeconds(5));
|
||||||
|
|
||||||
|
|||||||
@@ -4,6 +4,7 @@ using ClaudeDo.Data.Models;
|
|||||||
using ClaudeDo.Data.Repositories;
|
using ClaudeDo.Data.Repositories;
|
||||||
using ClaudeDo.Worker.Config;
|
using ClaudeDo.Worker.Config;
|
||||||
using ClaudeDo.Worker.Hub;
|
using ClaudeDo.Worker.Hub;
|
||||||
|
using ClaudeDo.Worker.Queue;
|
||||||
using ClaudeDo.Worker.Runner;
|
using ClaudeDo.Worker.Runner;
|
||||||
using ClaudeDo.Worker.Services;
|
using ClaudeDo.Worker.Services;
|
||||||
using ClaudeDo.Worker.Tests.Infrastructure;
|
using ClaudeDo.Worker.Tests.Infrastructure;
|
||||||
@@ -46,6 +47,8 @@ public sealed class QueueServiceTests : IDisposable
|
|||||||
try { Directory.Delete(_tempDir, true); } catch { }
|
try { Directory.Delete(_tempDir, true); } catch { }
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private QueueWaker _waker = null!;
|
||||||
|
|
||||||
private (QueueService service, FakeClaudeProcess fakeProcess) CreateService(
|
private (QueueService service, FakeClaudeProcess fakeProcess) CreateService(
|
||||||
Func<string, string, string, Func<string, Task>, CancellationToken, Task<RunResult>>? handler = null)
|
Func<string, string, string, Func<string, Task>, CancellationToken, Task<RunResult>>? handler = null)
|
||||||
{
|
{
|
||||||
@@ -56,7 +59,9 @@ public sealed class QueueServiceTests : IDisposable
|
|||||||
var argsBuilder = new ClaudeArgsBuilder();
|
var argsBuilder = new ClaudeArgsBuilder();
|
||||||
var runner = new TaskRunner(fake, dbFactory, broadcaster, wtManager, argsBuilder, _cfg,
|
var runner = new TaskRunner(fake, dbFactory, broadcaster, wtManager, argsBuilder, _cfg,
|
||||||
NullLogger<TaskRunner>.Instance, TaskStateServiceBuilder.Build(dbFactory).State);
|
NullLogger<TaskRunner>.Instance, TaskStateServiceBuilder.Build(dbFactory).State);
|
||||||
var service = new QueueService(dbFactory, runner, _cfg, NullLogger<QueueService>.Instance);
|
_waker = new QueueWaker();
|
||||||
|
var picker = new QueuePicker(dbFactory);
|
||||||
|
var service = new QueueService(dbFactory, runner, _cfg, NullLogger<QueueService>.Instance, _waker, picker);
|
||||||
return (service, fake);
|
return (service, fake);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -126,7 +131,7 @@ public sealed class QueueServiceTests : IDisposable
|
|||||||
|
|
||||||
// Start the service loop, wake it, give it time.
|
// Start the service loop, wake it, give it time.
|
||||||
await service.StartAsync(cts.Token);
|
await service.StartAsync(cts.Token);
|
||||||
service.WakeQueue();
|
_waker.Wake();
|
||||||
await Task.Delay(200);
|
await Task.Delay(200);
|
||||||
cts.Cancel();
|
cts.Cancel();
|
||||||
|
|
||||||
@@ -158,7 +163,7 @@ public sealed class QueueServiceTests : IDisposable
|
|||||||
|
|
||||||
using var cts = new CancellationTokenSource();
|
using var cts = new CancellationTokenSource();
|
||||||
await service.StartAsync(cts.Token);
|
await service.StartAsync(cts.Token);
|
||||||
service.WakeQueue();
|
_waker.Wake();
|
||||||
|
|
||||||
// Wait until task1 has been picked up (poll instead of fixed delay to avoid flake under load).
|
// Wait until task1 has been picked up (poll instead of fixed delay to avoid flake under load).
|
||||||
var deadline = DateTime.UtcNow.AddSeconds(5);
|
var deadline = DateTime.UtcNow.AddSeconds(5);
|
||||||
|
|||||||
Reference in New Issue
Block a user