From 064a9030764395ce805999c9a7c4b4cff0cfad25 Mon Sep 17 00:00:00 2001 From: Mika Kuns Date: Mon, 27 Apr 2026 12:05:54 +0200 Subject: [PATCH] refactor(worker/queue): split queue waker and picker, auto-wake on enqueue Slice 3 of the worker state and queue consolidation refactor. - Add IQueueWaker / QueueWaker (singleton holding the wake semaphore). - Add IQueuePicker / QueuePicker; raw SQL UPDATE...RETURNING moves out of TaskRepository.GetNextQueuedAgentTaskAsync (deleted) and now also filters on blocked_by_task_id IS NULL and writes started_at on claim. - TaskStateService takes IQueueWaker directly; the Func indirection is gone. State transitions to Queued auto-wake the dispatcher. - QueueService waits via the shared waker and dispatches via the picker. - Drop explicit _queue.WakeQueue() calls in WorkerHub.QueuePlanningSubtasksAsync and ExternalMcpService.AddTask. The hub WakeQueue endpoint stays for diagnostics, delegating to _waker.Wake(). - Migrate tests; pre-existing flaky AppSettings/ExternalMcp tests untouched. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../Repositories/TaskRepository.cs | 39 ---- .../External/ExternalMcpService.cs | 10 +- src/ClaudeDo.Worker/Hub/WorkerHub.cs | 8 +- src/ClaudeDo.Worker/Program.cs | 14 +- src/ClaudeDo.Worker/Queue/IQueuePicker.cs | 12 ++ src/ClaudeDo.Worker/Queue/IQueueWaker.cs | 11 + src/ClaudeDo.Worker/Queue/QueuePicker.cs | 52 +++++ src/ClaudeDo.Worker/Queue/QueueWaker.cs | 18 ++ src/ClaudeDo.Worker/Services/QueueService.cs | 35 +--- src/ClaudeDo.Worker/State/TaskStateService.cs | 11 +- .../External/ExternalMcpServiceTests.cs | 4 +- .../Hub/PlanningHubTests.cs | 2 +- .../Infrastructure/TaskStateServiceBuilder.cs | 17 +- .../Queue/QueuePickerTests.cs | 188 ++++++++++++++++++ .../TaskRepositoryPlanningTests.cs | 24 --- .../Repositories/TaskRepositoryTests.cs | 78 -------- .../Services/QueueServiceSlotGuardTests.cs | 11 +- .../Services/QueueServiceTests.cs | 11 +- 18 files changed, 354 insertions(+), 191 deletions(-) create mode 100644 src/ClaudeDo.Worker/Queue/IQueuePicker.cs create mode 100644 src/ClaudeDo.Worker/Queue/IQueueWaker.cs create mode 100644 src/ClaudeDo.Worker/Queue/QueuePicker.cs create mode 100644 src/ClaudeDo.Worker/Queue/QueueWaker.cs create mode 100644 tests/ClaudeDo.Worker.Tests/Queue/QueuePickerTests.cs diff --git a/src/ClaudeDo.Data/Repositories/TaskRepository.cs b/src/ClaudeDo.Data/Repositories/TaskRepository.cs index 5c76d14..d383d2c 100644 --- a/src/ClaudeDo.Data/Repositories/TaskRepository.cs +++ b/src/ClaudeDo.Data/Repositories/TaskRepository.cs @@ -498,43 +498,4 @@ public sealed class TaskRepository } #endregion - - #region Queue selection - - public async Task GetNextQueuedAgentTaskAsync(DateTime now, CancellationToken ct = default) - { - // Atomic queue claim: UPDATE + RETURNING in one statement prevents TOCTOU races. - // Uses raw SQL because EF cannot express UPDATE...RETURNING. - // Includes both task-level and list-level "agent" tag so lists tagged "agent" - // automatically enqueue all their tasks without per-task tagging. - // EF SQLite stores DateTime as "yyyy-MM-dd HH:mm:ss.fffffff" — use the same format for comparison. - var nowStr = now.ToUniversalTime().ToString("yyyy-MM-dd HH:mm:ss.fffffff"); - var result = await _context.Tasks.FromSqlRaw(""" - UPDATE tasks SET status = 'running' - WHERE id = ( - SELECT t.id FROM tasks t - WHERE t.status = 'queued' - AND (t.scheduled_for IS NULL OR t.scheduled_for <= {0}) - AND ( - EXISTS ( - SELECT 1 FROM task_tags tt - JOIN tags tg ON tg.id = tt.tag_id - WHERE tt.task_id = t.id AND tg.name = 'agent' - ) - OR EXISTS ( - SELECT 1 FROM list_tags lt - JOIN tags tg ON tg.id = lt.tag_id - WHERE lt.list_id = t.list_id AND tg.name = 'agent' - ) - ) - ORDER BY t.sort_order ASC, t.created_at ASC - LIMIT 1 - ) - RETURNING * - """, nowStr).ToListAsync(ct); - - return result.FirstOrDefault(); - } - - #endregion } diff --git a/src/ClaudeDo.Worker/External/ExternalMcpService.cs b/src/ClaudeDo.Worker/External/ExternalMcpService.cs index 4573cbe..d13e396 100644 --- a/src/ClaudeDo.Worker/External/ExternalMcpService.cs +++ b/src/ClaudeDo.Worker/External/ExternalMcpService.cs @@ -117,7 +117,7 @@ public sealed class ExternalMcpService ListId = listId, Title = title, Description = description, - Status = queueImmediately ? TaskStatus.Queued : TaskStatus.Manual, + Status = TaskStatus.Manual, CreatedAt = DateTime.UtcNow, CommitType = list.DefaultCommitType, CreatedBy = createdBy, @@ -128,7 +128,13 @@ public sealed class ExternalMcpService await _tasks.SetTagsAsync(entity.Id, tags, cancellationToken); if (queueImmediately) - _queue.WakeQueue(); + { + // Routes through TaskStateService so the queue is woken automatically. + var enqueue = await _state.EnqueueAsync(entity.Id, cancellationToken); + if (!enqueue.Ok) + throw new InvalidOperationException(enqueue.Reason ?? "Cannot enqueue task."); + entity.Status = TaskStatus.Queued; + } await _broadcaster.TaskUpdated(entity.Id); return ToDto(entity); diff --git a/src/ClaudeDo.Worker/Hub/WorkerHub.cs b/src/ClaudeDo.Worker/Hub/WorkerHub.cs index d1d4479..4e11591 100644 --- a/src/ClaudeDo.Worker/Hub/WorkerHub.cs +++ b/src/ClaudeDo.Worker/Hub/WorkerHub.cs @@ -3,6 +3,7 @@ using ClaudeDo.Data; using ClaudeDo.Data.Models; using ClaudeDo.Data.Repositories; using ClaudeDo.Worker.Planning; +using ClaudeDo.Worker.Queue; using ClaudeDo.Worker.Services; using Microsoft.AspNetCore.SignalR; using Microsoft.EntityFrameworkCore; @@ -37,6 +38,7 @@ public sealed class WorkerHub : Microsoft.AspNetCore.SignalR.Hub Assembly.GetExecutingAssembly().GetName().Version?.ToString(3) ?? "0.0.0"; private readonly QueueService _queue; + private readonly IQueueWaker _waker; private readonly AgentFileService _agentService; private readonly DefaultAgentSeeder _seeder; private readonly HubBroadcaster _broadcaster; @@ -52,6 +54,7 @@ public sealed class WorkerHub : Microsoft.AspNetCore.SignalR.Hub public WorkerHub( QueueService queue, + IQueueWaker waker, AgentFileService agentService, DefaultAgentSeeder seeder, HubBroadcaster broadcaster, @@ -66,6 +69,7 @@ public sealed class WorkerHub : Microsoft.AspNetCore.SignalR.Hub PlanningChainCoordinator planningChain) { _queue = queue; + _waker = waker; _agentService = agentService; _seeder = seeder; _broadcaster = broadcaster; @@ -99,8 +103,6 @@ public sealed class WorkerHub : Microsoft.AspNetCore.SignalR.Hub await _broadcaster.TaskUpdated(parentTaskId); foreach (var id in childIds) await _broadcaster.TaskUpdated(id); - - _queue.WakeQueue(); } public string Ping() => $"pong v{Version}"; @@ -162,7 +164,7 @@ public sealed class WorkerHub : Microsoft.AspNetCore.SignalR.Hub public bool CancelTask(string taskId) => _queue.CancelTask(taskId); - public void WakeQueue() => _queue.WakeQueue(); + public void WakeQueue() => _waker.Wake(); public async Task> GetAgents() => await _agentService.ScanAsync(); diff --git a/src/ClaudeDo.Worker/Program.cs b/src/ClaudeDo.Worker/Program.cs index bbd9044..60e2e25 100644 --- a/src/ClaudeDo.Worker/Program.cs +++ b/src/ClaudeDo.Worker/Program.cs @@ -5,6 +5,7 @@ using ClaudeDo.Worker.Config; using ClaudeDo.Worker.External; using ClaudeDo.Worker.Hub; using ClaudeDo.Worker.Planning; +using ClaudeDo.Worker.Queue; using ClaudeDo.Worker.Runner; using ClaudeDo.Worker.Services; using ClaudeDo.Worker.State; @@ -42,14 +43,18 @@ builder.Services.AddSingleton(); builder.Services.AddSingleton(); builder.Services.AddSingleton(); -// Centralized status mutation. Use a delegate for WakeQueue to break the -// TaskStateService → QueueService → TaskRunner → TaskStateService DI cycle; -// Slice 3 will replace this with IQueueWaker. +// Queue dispatch primitives. QueueWaker holds the wake semaphore; the queue picker +// performs atomic Queued→Running claim. Both injected into the state service so it +// can wake the dispatcher without depending on QueueService directly. +builder.Services.AddSingleton(); +builder.Services.AddSingleton(sp => sp.GetRequiredService()); +builder.Services.AddSingleton(); + builder.Services.AddSingleton>(sp => () => sp.GetRequiredService()); builder.Services.AddSingleton(sp => new TaskStateService( sp.GetRequiredService>(), sp.GetRequiredService(), - () => sp.GetRequiredService().WakeQueue(), + sp.GetRequiredService(), sp.GetRequiredService(), sp.GetRequiredService>())); @@ -137,6 +142,7 @@ if (cfg.ExternalMcpPort > 0) externalBuilder.Services.AddSingleton(app.Services.GetRequiredService()); externalBuilder.Services.AddSingleton(app.Services.GetRequiredService>()); externalBuilder.Services.AddSingleton(app.Services.GetRequiredService()); + externalBuilder.Services.AddSingleton(app.Services.GetRequiredService()); externalBuilder.Services.AddScoped(sp => sp.GetRequiredService>().CreateDbContext()); externalBuilder.Services.AddScoped(); diff --git a/src/ClaudeDo.Worker/Queue/IQueuePicker.cs b/src/ClaudeDo.Worker/Queue/IQueuePicker.cs new file mode 100644 index 0000000..7b8dad2 --- /dev/null +++ b/src/ClaudeDo.Worker/Queue/IQueuePicker.cs @@ -0,0 +1,12 @@ +using ClaudeDo.Data.Models; + +namespace ClaudeDo.Worker.Queue; + +/// +/// Atomic queue claim. Returns the claimed task (already flipped to Running with +/// StartedAt set) or null if no eligible task is available. +/// +public interface IQueuePicker +{ + Task ClaimNextAsync(DateTime now, CancellationToken ct); +} diff --git a/src/ClaudeDo.Worker/Queue/IQueueWaker.cs b/src/ClaudeDo.Worker/Queue/IQueueWaker.cs new file mode 100644 index 0000000..644859c --- /dev/null +++ b/src/ClaudeDo.Worker/Queue/IQueueWaker.cs @@ -0,0 +1,11 @@ +namespace ClaudeDo.Worker.Queue; + +/// +/// Signals the queue dispatcher to check for new work. Wake() is non-blocking and +/// idempotent — multiple calls before the dispatcher consumes the signal collapse +/// into a single wake-up. +/// +public interface IQueueWaker +{ + void Wake(); +} diff --git a/src/ClaudeDo.Worker/Queue/QueuePicker.cs b/src/ClaudeDo.Worker/Queue/QueuePicker.cs new file mode 100644 index 0000000..aff911e --- /dev/null +++ b/src/ClaudeDo.Worker/Queue/QueuePicker.cs @@ -0,0 +1,52 @@ +using ClaudeDo.Data; +using ClaudeDo.Data.Models; +using Microsoft.EntityFrameworkCore; + +namespace ClaudeDo.Worker.Queue; + +public sealed class QueuePicker : IQueuePicker +{ + private readonly IDbContextFactory _dbFactory; + + public QueuePicker(IDbContextFactory dbFactory) + => _dbFactory = dbFactory; + + public async Task ClaimNextAsync(DateTime now, CancellationToken ct) + { + // Atomic queue claim: UPDATE + RETURNING in a single statement prevents TOCTOU races. + // Raw SQL because EF cannot express UPDATE...RETURNING. + // Eligible task must be Queued, unblocked, due (or unscheduled), and tagged 'agent' + // either directly or via its list. EF SQLite stores DateTime as + // "yyyy-MM-dd HH:mm:ss.fffffff" — same format used here for comparison. + await using var ctx = await _dbFactory.CreateDbContextAsync(ct); + var nowStr = now.ToUniversalTime().ToString("yyyy-MM-dd HH:mm:ss.fffffff"); + var startedAtStr = DateTime.UtcNow.ToString("yyyy-MM-dd HH:mm:ss.fffffff"); + + var rows = await ctx.Tasks.FromSqlRaw(""" + UPDATE tasks SET status = 'running', started_at = {1} + WHERE id = ( + SELECT t.id FROM tasks t + WHERE t.status = 'queued' + AND t.blocked_by_task_id IS NULL + AND (t.scheduled_for IS NULL OR t.scheduled_for <= {0}) + AND ( + EXISTS ( + SELECT 1 FROM task_tags tt + JOIN tags tg ON tg.id = tt.tag_id + WHERE tt.task_id = t.id AND tg.name = 'agent' + ) + OR EXISTS ( + SELECT 1 FROM list_tags lt + JOIN tags tg ON tg.id = lt.tag_id + WHERE lt.list_id = t.list_id AND tg.name = 'agent' + ) + ) + ORDER BY t.sort_order ASC, t.created_at ASC + LIMIT 1 + ) + RETURNING * + """, nowStr, startedAtStr).ToListAsync(ct); + + return rows.FirstOrDefault(); + } +} diff --git a/src/ClaudeDo.Worker/Queue/QueueWaker.cs b/src/ClaudeDo.Worker/Queue/QueueWaker.cs new file mode 100644 index 0000000..26c1053 --- /dev/null +++ b/src/ClaudeDo.Worker/Queue/QueueWaker.cs @@ -0,0 +1,18 @@ +namespace ClaudeDo.Worker.Queue; + +/// +/// Owns the wake semaphore. Producers (state mutations, hub) call Wake(); +/// the queue dispatcher awaits WaitAsync. +/// +public sealed class QueueWaker : IQueueWaker +{ + private readonly SemaphoreSlim _signal = new(0, 1); + + public void Wake() + { + try { _signal.Release(); } + catch (SemaphoreFullException) { /* already signalled */ } + } + + public Task WaitAsync(CancellationToken ct) => _signal.WaitAsync(ct); +} diff --git a/src/ClaudeDo.Worker/Services/QueueService.cs b/src/ClaudeDo.Worker/Services/QueueService.cs index 33a35f4..ba42d46 100644 --- a/src/ClaudeDo.Worker/Services/QueueService.cs +++ b/src/ClaudeDo.Worker/Services/QueueService.cs @@ -2,6 +2,7 @@ using ClaudeDo.Data; using ClaudeDo.Data.Models; using ClaudeDo.Data.Repositories; using ClaudeDo.Worker.Config; +using ClaudeDo.Worker.Queue; using ClaudeDo.Worker.Runner; using Microsoft.EntityFrameworkCore; @@ -20,23 +21,27 @@ public sealed class QueueService : BackgroundService private readonly TaskRunner _runner; private readonly WorkerConfig _cfg; private readonly ILogger _logger; + private readonly QueueWaker _waker; + private readonly IQueuePicker _picker; private readonly object _lock = new(); private volatile QueueSlotState? _queueSlot; private volatile QueueSlotState? _overrideSlot; - private readonly SemaphoreSlim _wakeSignal = new(0, 1); - public QueueService( IDbContextFactory dbFactory, TaskRunner runner, WorkerConfig cfg, - ILogger logger) + ILogger logger, + QueueWaker waker, + IQueuePicker picker) { _dbFactory = dbFactory; _runner = runner; _cfg = cfg; _logger = logger; + _waker = waker; + _picker = picker; } public IReadOnlyList<(string slot, string taskId, DateTime startedAt)> GetActive() @@ -49,13 +54,6 @@ public sealed class QueueService : BackgroundService return list; } - public void WakeQueue() - { - // Release if not already signalled. - try { _wakeSignal.Release(); } - catch (SemaphoreFullException) { /* already signalled */ } - } - public async Task RunNow(string taskId) { using (var context = _dbFactory.CreateDbContext()) @@ -147,25 +145,14 @@ public sealed class QueueService : BackgroundService try { // Wait for wake signal or backstop timer. - var wakeTask = _wakeSignal.WaitAsync(stoppingToken); + var wakeTask = _waker.WaitAsync(stoppingToken); var timerTask = timer.WaitForNextTickAsync(stoppingToken).AsTask(); await Task.WhenAny(wakeTask, timerTask); - // Drain wake signal if it fired. - if (wakeTask.IsCompletedSuccessfully) - { - // Good — signal consumed. - } - if (_queueSlot is not null) continue; - TaskEntity? task; - using (var context = _dbFactory.CreateDbContext()) - { - var taskRepo = new TaskRepository(context); - task = await taskRepo.GetNextQueuedAgentTaskAsync(DateTime.UtcNow, stoppingToken); - } + var task = await _picker.ClaimNextAsync(DateTime.UtcNow, stoppingToken); if (task is null) continue; lock (_lock) @@ -181,7 +168,7 @@ public sealed class QueueService : BackgroundService _logger.LogError(t.Exception, "RunInSlotAsync failed for task {TaskId} in queue slot", task.Id); lock (_lock) { _queueSlot = null; } cts.Dispose(); - WakeQueue(); // Check for next task immediately. + _waker.Wake(); // Check for next task immediately. }, TaskScheduler.Default); } } diff --git a/src/ClaudeDo.Worker/State/TaskStateService.cs b/src/ClaudeDo.Worker/State/TaskStateService.cs index dc976b0..dc6ca66 100644 --- a/src/ClaudeDo.Worker/State/TaskStateService.cs +++ b/src/ClaudeDo.Worker/State/TaskStateService.cs @@ -3,6 +3,7 @@ using ClaudeDo.Data.Models; using ClaudeDo.Data.Repositories; using ClaudeDo.Worker.Hub; using ClaudeDo.Worker.Planning; +using ClaudeDo.Worker.Queue; using Microsoft.EntityFrameworkCore; using TaskStatus = ClaudeDo.Data.Models.TaskStatus; @@ -12,20 +13,20 @@ public sealed class TaskStateService : ITaskStateService { private readonly IDbContextFactory _dbFactory; private readonly HubBroadcaster _broadcaster; - private readonly Action _wakeQueue; + private readonly IQueueWaker _waker; private readonly PlanningChainCoordinator _chain; private readonly ILogger _logger; public TaskStateService( IDbContextFactory dbFactory, HubBroadcaster broadcaster, - Action wakeQueue, + IQueueWaker waker, PlanningChainCoordinator chain, ILogger logger) { _dbFactory = dbFactory; _broadcaster = broadcaster; - _wakeQueue = wakeQueue; + _waker = waker; _chain = chain; _logger = logger; } @@ -40,7 +41,7 @@ public sealed class TaskStateService : ITaskStateService if (affected == 0) return new TransitionResult(false, "Task not found or already running."); - _wakeQueue(); + _waker.Wake(); await _broadcaster.TaskUpdated(taskId); return new TransitionResult(true, null); } @@ -203,7 +204,7 @@ public sealed class TaskStateService : ITaskStateService .Where(t => t.Id == taskId && t.Status == TaskStatus.Waiting) .ExecuteUpdateAsync(s => s.SetProperty(t => t.Status, TaskStatus.Queued), ct); - _wakeQueue(); + _waker.Wake(); await _broadcaster.TaskUpdated(taskId); return new TransitionResult(true, null); } diff --git a/tests/ClaudeDo.Worker.Tests/External/ExternalMcpServiceTests.cs b/tests/ClaudeDo.Worker.Tests/External/ExternalMcpServiceTests.cs index 1f1ce0c..ce5e3f5 100644 --- a/tests/ClaudeDo.Worker.Tests/External/ExternalMcpServiceTests.cs +++ b/tests/ClaudeDo.Worker.Tests/External/ExternalMcpServiceTests.cs @@ -115,7 +115,9 @@ public sealed class ExternalMcpServiceTests : IDisposable var argsBuilder = new ClaudeArgsBuilder(); var runner = new TaskRunner(fake, dbFactory, broadcaster, wtManager, argsBuilder, cfg, NullLogger.Instance, TaskStateServiceBuilder.Build(dbFactory).State); - return new QueueService(dbFactory, runner, cfg, NullLogger.Instance); + var waker = new ClaudeDo.Worker.Queue.QueueWaker(); + var picker = new ClaudeDo.Worker.Queue.QueuePicker(dbFactory); + return new QueueService(dbFactory, runner, cfg, NullLogger.Instance, waker, picker); } [Fact] diff --git a/tests/ClaudeDo.Worker.Tests/Hub/PlanningHubTests.cs b/tests/ClaudeDo.Worker.Tests/Hub/PlanningHubTests.cs index 6e78a03..d112b7a 100644 --- a/tests/ClaudeDo.Worker.Tests/Hub/PlanningHubTests.cs +++ b/tests/ClaudeDo.Worker.Tests/Hub/PlanningHubTests.cs @@ -49,7 +49,7 @@ public sealed class PlanningHubTests : IDisposable private WorkerHub CreateHub() { var hub = new WorkerHub( - null!, null!, null!, null!, null!, null!, null!, null!, + null!, null!, null!, null!, null!, null!, null!, null!, null!, _planning, _launcher, null!, null!, null!); hub.Clients = new FakeHubCallerClients(_proxy); hub.Context = new FakeHubCallerContext(); diff --git a/tests/ClaudeDo.Worker.Tests/Infrastructure/TaskStateServiceBuilder.cs b/tests/ClaudeDo.Worker.Tests/Infrastructure/TaskStateServiceBuilder.cs index 3551677..5529b47 100644 --- a/tests/ClaudeDo.Worker.Tests/Infrastructure/TaskStateServiceBuilder.cs +++ b/tests/ClaudeDo.Worker.Tests/Infrastructure/TaskStateServiceBuilder.cs @@ -1,6 +1,7 @@ using ClaudeDo.Data; using ClaudeDo.Worker.Hub; using ClaudeDo.Worker.Planning; +using ClaudeDo.Worker.Queue; using ClaudeDo.Worker.State; using Microsoft.EntityFrameworkCore; using Microsoft.Extensions.Logging.Abstractions; @@ -15,23 +16,31 @@ public static class TaskStateServiceBuilder TaskStateService State, PlanningChainCoordinator Chain, CapturingHubContext Hub, - Func WakeCount); + Func WakeCount, + CountingQueueWaker Waker); public static Built Build(IDbContextFactory dbFactory) { var hub = new CapturingHubContext(); var broadcaster = new HubBroadcaster(hub); - var wakeCount = new int[1]; + var waker = new CountingQueueWaker(); TaskStateService? state = null; var chain = new PlanningChainCoordinator(dbFactory, () => state!); state = new TaskStateService( dbFactory, broadcaster, - () => Interlocked.Increment(ref wakeCount[0]), + waker, chain, NullLogger.Instance); - return new Built(state, chain, hub, () => Volatile.Read(ref wakeCount[0])); + return new Built(state, chain, hub, () => waker.Count, waker); } } + +public sealed class CountingQueueWaker : IQueueWaker +{ + private int _count; + public int Count => Volatile.Read(ref _count); + public void Wake() => Interlocked.Increment(ref _count); +} diff --git a/tests/ClaudeDo.Worker.Tests/Queue/QueuePickerTests.cs b/tests/ClaudeDo.Worker.Tests/Queue/QueuePickerTests.cs new file mode 100644 index 0000000..0ca4772 --- /dev/null +++ b/tests/ClaudeDo.Worker.Tests/Queue/QueuePickerTests.cs @@ -0,0 +1,188 @@ +using ClaudeDo.Data; +using ClaudeDo.Data.Models; +using ClaudeDo.Data.Repositories; +using ClaudeDo.Worker.Queue; +using ClaudeDo.Worker.Tests.Infrastructure; +using TaskStatus = ClaudeDo.Data.Models.TaskStatus; + +namespace ClaudeDo.Worker.Tests.Queue; + +public sealed class QueuePickerTests : IDisposable +{ + private readonly DbFixture _db = new(); + private readonly ClaudeDoDbContext _ctx; + private readonly TaskRepository _tasks; + private readonly ListRepository _lists; + private readonly TagRepository _tags; + private readonly QueuePicker _picker; + + public QueuePickerTests() + { + _ctx = _db.CreateContext(); + _tasks = new TaskRepository(_ctx); + _lists = new ListRepository(_ctx); + _tags = new TagRepository(_ctx); + _picker = new QueuePicker(_db.CreateFactory()); + } + + public void Dispose() + { + _ctx.Dispose(); + _db.Dispose(); + } + + private async Task CreateListAsync(bool listAgentTag = false) + { + var listId = Guid.NewGuid().ToString(); + await _lists.AddAsync(new ListEntity + { + Id = listId, + Name = "Test", + CreatedAt = DateTime.UtcNow, + }); + if (listAgentTag) + { + var tagId = await _tags.GetOrCreateAsync("agent"); + await _lists.AddTagAsync(listId, tagId); + } + return listId; + } + + private async Task SeedAsync( + string listId, + TaskStatus status = TaskStatus.Queued, + DateTime? createdAt = null, + DateTime? scheduledFor = null, + string? blockedBy = null, + bool taskAgentTag = false, + int? sortOrder = null) + { + var task = new TaskEntity + { + Id = Guid.NewGuid().ToString(), + ListId = listId, + Title = "T", + Status = status, + CreatedAt = createdAt ?? DateTime.UtcNow, + ScheduledFor = scheduledFor, + BlockedByTaskId = blockedBy, + CommitType = "feat", + }; + await _tasks.AddAsync(task); + if (taskAgentTag) + { + var tagId = await _tags.GetOrCreateAsync("agent"); + await _tasks.AddTagAsync(task.Id, tagId); + } + if (sortOrder is not null) + { + task.SortOrder = sortOrder.Value; + await _tasks.UpdateAsync(task); + } + return task; + } + + [Fact] + public async Task ClaimNextAsync_Skips_TasksWithBlockedByTaskId() + { + var listId = await CreateListAsync(listAgentTag: true); + var blocker = await SeedAsync(listId); + await SeedAsync(listId, blockedBy: blocker.Id); + + // Only `blocker` is unblocked → it should be claimed; the second pick is null. + var first = await _picker.ClaimNextAsync(DateTime.UtcNow, CancellationToken.None); + Assert.NotNull(first); + Assert.Equal(blocker.Id, first!.Id); + + var second = await _picker.ClaimNextAsync(DateTime.UtcNow, CancellationToken.None); + Assert.Null(second); + } + + [Fact] + public async Task ClaimNextAsync_Skips_TasksWithoutAgentTag() + { + var listId = await CreateListAsync(listAgentTag: false); + await SeedAsync(listId); + + var picked = await _picker.ClaimNextAsync(DateTime.UtcNow, CancellationToken.None); + Assert.Null(picked); + } + + [Fact] + public async Task ClaimNextAsync_Skips_FutureScheduledFor() + { + var listId = await CreateListAsync(listAgentTag: true); + await SeedAsync(listId, scheduledFor: DateTime.UtcNow.AddHours(1)); + + var picked = await _picker.ClaimNextAsync(DateTime.UtcNow, CancellationToken.None); + Assert.Null(picked); + } + + [Fact] + public async Task ClaimNextAsync_Skips_NonQueuedStatuses() + { + var listId = await CreateListAsync(listAgentTag: true); + await SeedAsync(listId, status: TaskStatus.Idle); + await SeedAsync(listId, status: TaskStatus.Running); + await SeedAsync(listId, status: TaskStatus.Done); + await SeedAsync(listId, status: TaskStatus.Failed); + await SeedAsync(listId, status: TaskStatus.Cancelled); + await SeedAsync(listId, status: TaskStatus.Manual); + await SeedAsync(listId, status: TaskStatus.Draft); + await SeedAsync(listId, status: TaskStatus.Planning); + await SeedAsync(listId, status: TaskStatus.Planned); + await SeedAsync(listId, status: TaskStatus.Waiting); + + var picked = await _picker.ClaimNextAsync(DateTime.UtcNow, CancellationToken.None); + Assert.Null(picked); + } + + [Fact] + public async Task ClaimNextAsync_Picks_ByUserSortOrder_ThenCreatedAt() + { + var listId = await CreateListAsync(listAgentTag: true); + + // Created in order first, second; reorder so second is sort-order 0. + var first = await SeedAsync(listId, createdAt: DateTime.UtcNow.AddMinutes(-10)); + var second = await SeedAsync(listId, createdAt: DateTime.UtcNow); + await _tasks.ReorderAsync(listId, new[] { second.Id, first.Id }); + + var picked = await _picker.ClaimNextAsync(DateTime.UtcNow, CancellationToken.None); + Assert.NotNull(picked); + Assert.Equal(second.Id, picked!.Id); + } + + [Fact] + public async Task ClaimNextAsync_FlipsToRunning_WithStartedAt() + { + var listId = await CreateListAsync(listAgentTag: true); + var task = await SeedAsync(listId); + + var before = DateTime.UtcNow; + var picked = await _picker.ClaimNextAsync(before, CancellationToken.None); + Assert.NotNull(picked); + + var loaded = await _tasks.GetByIdAsync(task.Id); + Assert.Equal(TaskStatus.Running, loaded!.Status); + Assert.NotNull(loaded.StartedAt); + } + + [Fact] + public async Task ClaimNextAsync_TwoParallelPickers_OnlyOneClaimsRow() + { + var listId = await CreateListAsync(listAgentTag: true); + await SeedAsync(listId); + + // Two pickers, same DB factory, racing each other. + var picker1 = new QueuePicker(_db.CreateFactory()); + var picker2 = new QueuePicker(_db.CreateFactory()); + + var t1 = Task.Run(() => picker1.ClaimNextAsync(DateTime.UtcNow, CancellationToken.None)); + var t2 = Task.Run(() => picker2.ClaimNextAsync(DateTime.UtcNow, CancellationToken.None)); + + var results = await Task.WhenAll(t1, t2); + + var nonNull = results.Where(r => r is not null).ToList(); + Assert.Single(nonNull); + } +} diff --git a/tests/ClaudeDo.Worker.Tests/Repositories/TaskRepositoryPlanningTests.cs b/tests/ClaudeDo.Worker.Tests/Repositories/TaskRepositoryPlanningTests.cs index 94a4e9c..c1a6b9a 100644 --- a/tests/ClaudeDo.Worker.Tests/Repositories/TaskRepositoryPlanningTests.cs +++ b/tests/ClaudeDo.Worker.Tests/Repositories/TaskRepositoryPlanningTests.cs @@ -302,28 +302,4 @@ public sealed class TaskRepositoryPlanningTests : IDisposable Assert.NotNull(stillThere); } - [Fact] - public async Task GetNextQueuedAgentTask_SkipsDraftPlanningPlanned() - { - var listId = await CreateListAsync(); - var agentTagId = await _tags.GetOrCreateAsync("agent"); - - async Task T(TaskStatus s, bool withTag, string? parent = null) - { - var t = MakeTask(listId, s, parentId: parent); - await _tasks.AddAsync(t); - if (withTag) await _tasks.AddTagAsync(t.Id, agentTagId); - return t; - } - - var planning = await T(TaskStatus.Planning, withTag: true); - var planned = await T(TaskStatus.Planned, withTag: true); - var draft = await T(TaskStatus.Draft, withTag: true, parent: planning.Id); - var queued = await T(TaskStatus.Queued, withTag: true); - - var picked = await _tasks.GetNextQueuedAgentTaskAsync(DateTime.UtcNow); - - Assert.NotNull(picked); - Assert.Equal(queued.Id, picked!.Id); - } } diff --git a/tests/ClaudeDo.Worker.Tests/Repositories/TaskRepositoryTests.cs b/tests/ClaudeDo.Worker.Tests/Repositories/TaskRepositoryTests.cs index 017f0d6..8516d20 100644 --- a/tests/ClaudeDo.Worker.Tests/Repositories/TaskRepositoryTests.cs +++ b/tests/ClaudeDo.Worker.Tests/Repositories/TaskRepositoryTests.cs @@ -87,64 +87,6 @@ public sealed class TaskRepositoryTests : IDisposable Assert.Equal(entity.CommitType, loaded.CommitType); } - [Fact] - public async Task GetNextQueuedAgentTaskAsync_Returns_OldestWithAgentTag_ViaTaskTag() - { - var listId = await CreateListAsync(); - var agentTagId = await _tags.GetOrCreateAsync("agent"); - - var older = MakeTask(listId, createdAt: DateTime.UtcNow.AddMinutes(-10)); - var newer = MakeTask(listId, createdAt: DateTime.UtcNow); - await _tasks.AddAsync(older); - await _tasks.AddAsync(newer); - await _tasks.AddTagAsync(older.Id, agentTagId); - await _tasks.AddTagAsync(newer.Id, agentTagId); - - var picked = await _tasks.GetNextQueuedAgentTaskAsync(DateTime.UtcNow); - Assert.NotNull(picked); - Assert.Equal(older.Id, picked.Id); - } - - [Fact] - public async Task GetNextQueuedAgentTaskAsync_Returns_TaskWithAgentTag_ViaListTag() - { - var listId = await CreateListAsync(); - var agentTagId = await _tags.GetOrCreateAsync("agent"); - await _lists.AddTagAsync(listId, agentTagId); - - var task = MakeTask(listId); - await _tasks.AddAsync(task); - - var picked = await _tasks.GetNextQueuedAgentTaskAsync(DateTime.UtcNow); - Assert.NotNull(picked); - Assert.Equal(task.Id, picked.Id); - } - - [Fact] - public async Task GetNextQueuedAgentTaskAsync_ReturnsNull_WhenNoAgentTag() - { - var listId = await CreateListAsync(); - var task = MakeTask(listId); - await _tasks.AddAsync(task); - - var picked = await _tasks.GetNextQueuedAgentTaskAsync(DateTime.UtcNow); - Assert.Null(picked); - } - - [Fact] - public async Task GetNextQueuedAgentTaskAsync_Skips_FutureScheduledFor() - { - var listId = await CreateListAsync(); - var agentTagId = await _tags.GetOrCreateAsync("agent"); - - var task = MakeTask(listId, scheduledFor: DateTime.UtcNow.AddHours(1)); - await _tasks.AddAsync(task); - await _tasks.AddTagAsync(task.Id, agentTagId); - - var picked = await _tasks.GetNextQueuedAgentTaskAsync(DateTime.UtcNow); - Assert.Null(picked); - } - [Fact] public async Task Transitions_MarkRunning_ThenMarkDone() { @@ -297,26 +239,6 @@ public sealed class TaskRepositoryTests : IDisposable Assert.Equal(0, reloadB!.SortOrder); } - [Fact] - public async Task GetNextQueuedAgentTaskAsync_Picks_ByUserSortOrder() - { - var listId = await CreateListAsync(); - var agentTagId = await _tags.GetOrCreateAsync("agent"); - await _lists.AddTagAsync(listId, agentTagId); - - // created in order first, second; then user reorders to put second on top. - var first = MakeTask(listId, createdAt: DateTime.UtcNow.AddMinutes(-10)); - var second = MakeTask(listId, createdAt: DateTime.UtcNow); - await _tasks.AddAsync(first); - await _tasks.AddAsync(second); - - await _tasks.ReorderAsync(listId, new[] { second.Id, first.Id }); - - var picked = await _tasks.GetNextQueuedAgentTaskAsync(DateTime.UtcNow); - Assert.NotNull(picked); - Assert.Equal(second.Id, picked!.Id); - } - [Fact] public async Task GetEffectiveTagsAsync_Returns_Union_Of_ListTags_And_TaskTags() { diff --git a/tests/ClaudeDo.Worker.Tests/Services/QueueServiceSlotGuardTests.cs b/tests/ClaudeDo.Worker.Tests/Services/QueueServiceSlotGuardTests.cs index 73de686..0b701b7 100644 --- a/tests/ClaudeDo.Worker.Tests/Services/QueueServiceSlotGuardTests.cs +++ b/tests/ClaudeDo.Worker.Tests/Services/QueueServiceSlotGuardTests.cs @@ -3,6 +3,7 @@ using ClaudeDo.Data.Models; using ClaudeDo.Data.Repositories; using ClaudeDo.Worker.Config; using ClaudeDo.Worker.Hub; +using ClaudeDo.Worker.Queue; using ClaudeDo.Worker.Runner; using ClaudeDo.Worker.Services; using ClaudeDo.Worker.Tests.Infrastructure; @@ -45,6 +46,8 @@ public sealed class QueueServiceSlotGuardTests : IDisposable try { Directory.Delete(_tempDir, true); } catch { } } + private QueueWaker _waker = null!; + private (QueueService service, FakeClaudeProcess fakeProcess) CreateService( Func, CancellationToken, Task>? handler = null) { @@ -55,7 +58,9 @@ public sealed class QueueServiceSlotGuardTests : IDisposable var argsBuilder = new ClaudeArgsBuilder(); var runner = new TaskRunner(fake, dbFactory, broadcaster, wtManager, argsBuilder, _cfg, NullLogger.Instance, TaskStateServiceBuilder.Build(dbFactory).State); - var service = new QueueService(dbFactory, runner, _cfg, NullLogger.Instance); + _waker = new QueueWaker(); + var picker = new QueuePicker(dbFactory); + var service = new QueueService(dbFactory, runner, _cfg, NullLogger.Instance, _waker, picker); return (service, fake); } @@ -102,7 +107,7 @@ public sealed class QueueServiceSlotGuardTests : IDisposable using var cts = new CancellationTokenSource(); await service.StartAsync(cts.Token); - service.WakeQueue(); + _waker.Wake(); // Wait until the queue slot has actually picked up the task. await queuePickedUp.Task.WaitAsync(TimeSpan.FromSeconds(5)); @@ -132,7 +137,7 @@ public sealed class QueueServiceSlotGuardTests : IDisposable using var cts = new CancellationTokenSource(); await service.StartAsync(cts.Token); - service.WakeQueue(); + _waker.Wake(); await queuePickedUp.Task.WaitAsync(TimeSpan.FromSeconds(5)); diff --git a/tests/ClaudeDo.Worker.Tests/Services/QueueServiceTests.cs b/tests/ClaudeDo.Worker.Tests/Services/QueueServiceTests.cs index 50410f2..6f1de6b 100644 --- a/tests/ClaudeDo.Worker.Tests/Services/QueueServiceTests.cs +++ b/tests/ClaudeDo.Worker.Tests/Services/QueueServiceTests.cs @@ -4,6 +4,7 @@ using ClaudeDo.Data.Models; using ClaudeDo.Data.Repositories; using ClaudeDo.Worker.Config; using ClaudeDo.Worker.Hub; +using ClaudeDo.Worker.Queue; using ClaudeDo.Worker.Runner; using ClaudeDo.Worker.Services; using ClaudeDo.Worker.Tests.Infrastructure; @@ -46,6 +47,8 @@ public sealed class QueueServiceTests : IDisposable try { Directory.Delete(_tempDir, true); } catch { } } + private QueueWaker _waker = null!; + private (QueueService service, FakeClaudeProcess fakeProcess) CreateService( Func, CancellationToken, Task>? handler = null) { @@ -56,7 +59,9 @@ public sealed class QueueServiceTests : IDisposable var argsBuilder = new ClaudeArgsBuilder(); var runner = new TaskRunner(fake, dbFactory, broadcaster, wtManager, argsBuilder, _cfg, NullLogger.Instance, TaskStateServiceBuilder.Build(dbFactory).State); - var service = new QueueService(dbFactory, runner, _cfg, NullLogger.Instance); + _waker = new QueueWaker(); + var picker = new QueuePicker(dbFactory); + var service = new QueueService(dbFactory, runner, _cfg, NullLogger.Instance, _waker, picker); return (service, fake); } @@ -126,7 +131,7 @@ public sealed class QueueServiceTests : IDisposable // Start the service loop, wake it, give it time. await service.StartAsync(cts.Token); - service.WakeQueue(); + _waker.Wake(); await Task.Delay(200); cts.Cancel(); @@ -158,7 +163,7 @@ public sealed class QueueServiceTests : IDisposable using var cts = new CancellationTokenSource(); await service.StartAsync(cts.Token); - service.WakeQueue(); + _waker.Wake(); // Wait until task1 has been picked up (poll instead of fixed delay to avoid flake under load). var deadline = DateTime.UtcNow.AddSeconds(5);