using ClaudeDo.Data.Git;
using ClaudeDo.Data.Models;
using ClaudeDo.Data.Repositories;
using ClaudeDo.Worker.Config;
using ClaudeDo.Worker.Hub;
using ClaudeDo.Worker.Runner;
using ClaudeDo.Worker.Services;
using ClaudeDo.Worker.Tests.Infrastructure;
using Microsoft.AspNetCore.SignalR;
using Microsoft.Extensions.Logging.Abstractions;
using TaskStatus = ClaudeDo.Data.Models.TaskStatus;

namespace ClaudeDo.Worker.Tests.Services;

// NOTE(review): this file was recovered from a copy in which generic type arguments had been
// stripped. Arguments that the surrounding code determines were restored; the remaining ones are
// marked with NOTE(review) below and must be confirmed against the production signatures.

/// <summary>
/// Tests for <see cref="QueueService"/> wired to real repositories (backed by <see cref="DbFixture"/>)
/// and a real <see cref="TaskRunner"/> whose Claude process is replaced by <see cref="FakeClaudeProcess"/>.
/// </summary>
public sealed class QueueServiceTests : IDisposable
{
    /// <summary>Upper bound for every "wait until the background work got here" step.</summary>
    private static readonly TimeSpan WaitTimeout = TimeSpan.FromSeconds(5);

    private readonly DbFixture _db = new();
    private readonly TaskRepository _taskRepo;
    private readonly ListRepository _listRepo;
    private readonly TagRepository _tagRepo;
    private readonly WorkerConfig _cfg;
    private readonly string _tempDir;

    /// <summary>
    /// Creates the repositories on the fixture's factory and a unique temp directory that hosts
    /// the sandbox and log roots for this test instance.
    /// </summary>
    public QueueServiceTests()
    {
        _taskRepo = new TaskRepository(_db.Factory);
        _listRepo = new ListRepository(_db.Factory);
        _tagRepo = new TagRepository(_db.Factory);

        _tempDir = Path.Combine(Path.GetTempPath(), $"claudedo_test_{Guid.NewGuid():N}");
        Directory.CreateDirectory(_tempDir);

        _cfg = new WorkerConfig
        {
            SandboxRoot = Path.Combine(_tempDir, "sandbox"),
            LogRoot = Path.Combine(_tempDir, "logs"),
            QueueBackstopIntervalMs = 50, // fast for tests
        };
    }

    /// <summary>Disposes the database fixture and removes the temp directory (best effort).</summary>
    public void Dispose()
    {
        _db.Dispose();

        try
        {
            Directory.Delete(_tempDir, true);
        }
        catch (IOException)
        {
            // Best-effort cleanup: a leftover temp directory must not fail the test run.
        }
        catch (UnauthorizedAccessException)
        {
            // Best-effort cleanup, see above.
        }
    }

    /// <summary>The successful run result every fake handler in this file returns.</summary>
    private static RunResult OkResult() => new RunResult { ExitCode = 0, ResultMarkdown = "ok" };

    /// <summary>
    /// Builds a <see cref="QueueService"/> whose runner drives a <see cref="FakeClaudeProcess"/>.
    /// </summary>
    /// <param name="handler">
    /// Optional fake process behaviour, invoked as (prompt, workingDirectory, arguments,
    /// onStdoutLine, ct). When <c>null</c> the fake uses its own default handler.
    /// </param>
    /// <returns>The service under test and the fake process it ends up calling.</returns>
    private (QueueService service, FakeClaudeProcess fakeProcess) CreateService(
        Func<string, string, string, Func<string, Task>, CancellationToken, Task<RunResult>>? handler = null)
    {
        var fake = new FakeClaudeProcess(handler);
        var broadcaster = new HubBroadcaster(new FakeHubContext());
        var wtRepo = new WorktreeRepository(_db.Factory);
        var runRepo = new TaskRunRepository(_db.Factory);

        // NOTE(review): the NullLogger<T> category types are assumed to be the consuming classes;
        // confirm against the constructors.
        var wtManager = new WorktreeManager(
            new GitService(), wtRepo, _cfg, NullLogger<WorktreeManager>.Instance);
        var argsBuilder = new ClaudeArgsBuilder();
        var runner = new TaskRunner(
            fake, _taskRepo, runRepo, _listRepo, wtRepo, broadcaster, wtManager, argsBuilder, _cfg,
            NullLogger<TaskRunner>.Instance);
        var service = new QueueService(_taskRepo, runner, _cfg, NullLogger<QueueService>.Instance);

        return (service, fake);
    }

    /// <summary>Adds a list and attaches the existing tag named "agent" to it.</summary>
    /// <returns>The new list id and the id of the "agent" tag.</returns>
    private async Task<(string listId, long agentTagId)> SeedListWithAgentTag()
    {
        var listId = Guid.NewGuid().ToString();
        await _listRepo.AddAsync(new ListEntity { Id = listId, Name = "Test", CreatedAt = DateTime.UtcNow });

        var tags = await _tagRepo.GetAllAsync();
        var agentTag = tags.First(t => t.Name == "agent");
        await _listRepo.AddTagAsync(listId, agentTag.Id);

        return (listId, agentTag.Id);
    }

    /// <summary>Adds a task with status <see cref="TaskStatus.Queued"/> to the given list.</summary>
    /// <param name="listId">List the task belongs to.</param>
    /// <param name="scheduledFor">Optional schedule time stored on the task.</param>
    /// <param name="createdAt">Creation time; defaults to <see cref="DateTime.UtcNow"/>.</param>
    /// <returns>The entity that was added.</returns>
    private async Task<TaskEntity> SeedQueuedTask(
        string listId, DateTime? scheduledFor = null, DateTime? createdAt = null)
    {
        var task = new TaskEntity
        {
            Id = Guid.NewGuid().ToString(),
            ListId = listId,
            Title = "Test task",
            Description = "Do something",
            Status = TaskStatus.Queued,
            ScheduledFor = scheduledFor,
            CreatedAt = createdAt ?? DateTime.UtcNow,
        };

        await _taskRepo.AddAsync(task);
        return task;
    }

    [Fact]
    public async Task RunNow_Throws_When_Override_Slot_Busy()
    {
        var (listId, _) = await SeedListWithAgentTag();
        var tcs = new TaskCompletionSource<RunResult>(TaskCreationOptions.RunContinuationsAsynchronously);
        var (service, _) = CreateService((_, _, _, _, _) => tcs.Task);
        var task1 = await SeedQueuedTask(listId);
        var task2 = await SeedQueuedTask(listId);

        try
        {
            await service.RunNow(task1.Id);

            // NOTE(review): the expected exception type was not recoverable from the source;
            // tighten ThrowsAnyAsync<Exception> to ThrowsAsync<TheConcreteType> once confirmed.
            var ex = await Assert.ThrowsAnyAsync<Exception>(() => service.RunNow(task2.Id));

            Assert.Equal("override slot busy", ex.Message);
        }
        finally
        {
            // Always let the first run finish, even when an assertion above failed.
            tcs.TrySetResult(OkResult());
        }
    }

    [Fact]
    public async Task RunNow_Throws_For_Unknown_Task()
    {
        var (service, _) = CreateService();

        // NOTE(review): the expected exception type was not recoverable from the source;
        // tighten ThrowsAnyAsync<Exception> to ThrowsAsync<TheConcreteType> once confirmed.
        await Assert.ThrowsAnyAsync<Exception>(() => service.RunNow("nonexistent"));
    }

    [Fact]
    public async Task Schedule_Filter_Skips_Future_Tasks()
    {
        var (listId, _) = await SeedListWithAgentTag();
        await SeedQueuedTask(listId, scheduledFor: DateTime.UtcNow.AddHours(1));
        var (service, fake) = CreateService((_, _, _, _, _) => Task.FromResult(OkResult()));
        using var cts = new CancellationTokenSource();

        // Start the service loop, wake it, give it time.
        await service.StartAsync(cts.Token);
        service.WakeQueue();
        await Task.Delay(200);
        cts.Cancel();

        // The fake should never have been called because the task is scheduled in the future.
        Assert.Equal(0, fake.CallCount);
    }

    [Fact]
    public async Task Queue_FIFO_Sequentiality()
    {
        var (listId, _) = await SeedListWithAgentTag();
        var order = new List<string>();
        var gate1 = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
        var gate2 = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
        var callCount = 0;

        var (service, _) = CreateService(async (_, _, _, _, _) =>
        {
            var n = Interlocked.Increment(ref callCount);
            lock (order)
            {
                order.Add(n.ToString());
            }

            if (n == 1)
            {
                await gate1.Task;
            }

            if (n == 2)
            {
                gate2.SetResult();
            }

            return OkResult();
        });

        // The handler appends to 'order' on another thread, so every read goes through the same lock.
        string[] StartedSoFar()
        {
            lock (order)
            {
                return order.ToArray();
            }
        }

        await SeedQueuedTask(listId, createdAt: DateTime.UtcNow.AddSeconds(-2));
        await SeedQueuedTask(listId, createdAt: DateTime.UtcNow.AddSeconds(-1));

        using var cts = new CancellationTokenSource();
        try
        {
            await service.StartAsync(cts.Token);
            service.WakeQueue();

            // Wait until task1 has been picked up (poll instead of fixed delay to avoid flake under load).
            var deadline = DateTime.UtcNow + WaitTimeout;
            while (StartedSoFar().Length == 0 && DateTime.UtcNow < deadline)
            {
                await Task.Delay(20);
            }

            // Only task1 should be running (task2 waiting on the queue slot).
            var started = StartedSoFar();
            Assert.Single(started);
            Assert.Equal("1", started[0]);

            // Release first task.
            gate1.SetResult();

            // Wait for second task to complete.
            await gate2.Task.WaitAsync(WaitTimeout);

            started = StartedSoFar();
            Assert.Equal(2, started.Length);
            Assert.Equal("2", started[1]);
        }
        finally
        {
            // Never leave the first handler parked on its gate when an assertion failed.
            gate1.TrySetResult();
            cts.Cancel();
        }
    }

    [Fact]
    public async Task CancelTask_Triggers_Cancellation()
    {
        var (listId, _) = await SeedListWithAgentTag();
        var running = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
        var cancelled = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);

        var (service, _) = CreateService(async (_, _, _, _, ct) =>
        {
            running.TrySetResult();
            try
            {
                await Task.Delay(Timeout.Infinite, ct);
            }
            catch (OperationCanceledException)
            {
                // Signal the test instead of flipping a flag it would have to poll or sleep for.
                cancelled.TrySetResult();
                throw;
            }

            return OkResult();
        });

        var task = await SeedQueuedTask(listId);
        await service.RunNow(task.Id);
        await running.Task.WaitAsync(WaitTimeout);

        var result = service.CancelTask(task.Id);

        Assert.True(result);

        // Throws TimeoutException (failing the test) if the handler never observes the cancellation.
        await cancelled.Task.WaitAsync(WaitTimeout);
    }

    [Fact]
    public async Task GetActive_Returns_Running_Slots()
    {
        var (listId, _) = await SeedListWithAgentTag();
        var tcs = new TaskCompletionSource<RunResult>(TaskCreationOptions.RunContinuationsAsynchronously);
        var (service, _) = CreateService((_, _, _, _, _) => tcs.Task);
        var task = await SeedQueuedTask(listId);

        try
        {
            await service.RunNow(task.Id);

            var active = service.GetActive();

            Assert.Single(active);
            Assert.Equal("override", active[0].slot);
            Assert.Equal(task.Id, active[0].taskId);
        }
        finally
        {
            // Always let the run finish, even when an assertion above failed.
            tcs.TrySetResult(OkResult());
        }
    }
}

#region Test doubles

/// <summary>
/// <see cref="IClaudeProcess"/> stand-in that counts invocations and delegates to a caller-supplied handler.
/// </summary>
internal sealed class FakeClaudeProcess : IClaudeProcess
{
    // NOTE(review): Func<string, Task> for the stdout callback is an assumption; confirm
    // against IClaudeProcess.RunAsync.
    private readonly Func<string, string, string, Func<string, Task>, CancellationToken, Task<RunResult>> _handler;
    private int _callCount;

    /// <summary>Number of times <see cref="RunAsync"/> has been entered.</summary>
    public int CallCount => _callCount;

    /// <param name="handler">
    /// Behaviour to run, invoked as (prompt, workingDirectory, arguments, onStdoutLine, ct).
    /// Defaults to immediately returning exit code 0 with result markdown "ok".
    /// </param>
    public FakeClaudeProcess(
        Func<string, string, string, Func<string, Task>, CancellationToken, Task<RunResult>>? handler = null)
    {
        _handler = handler
            ?? ((_, _, _, _, _) => Task.FromResult(new RunResult { ExitCode = 0, ResultMarkdown = "ok" }));
    }

    /// <summary>Counts the call, then forwards to the handler (note the reordered arguments).</summary>
    public async Task<RunResult> RunAsync(
        string arguments,
        string prompt,
        string workingDirectory,
        Func<string, Task> onStdoutLine,
        CancellationToken ct)
    {
        Interlocked.Increment(ref _callCount);
        return await _handler(prompt, workingDirectory, arguments, onStdoutLine, ct);
    }
}

/// <summary>Hub context whose clients are no-ops; <see cref="Groups"/> is not supported.</summary>
// NOTE(review): a hub type argument (IHubContext<THub>) appears to have been lost here; the hub
// class name is not visible in this file, so restore it to match HubBroadcaster's constructor.
internal sealed class FakeHubContext : IHubContext
{
    public IHubClients Clients { get; } = new FakeHubClients();

    public IGroupManager Groups => throw new NotImplementedException();
}

/// <summary>Client selector that hands out the same no-op proxy for every target.</summary>
internal sealed class FakeHubClients : IHubClients
{
    private readonly FakeClientProxy _proxy = new();

    public IClientProxy All => _proxy;

    public IClientProxy AllExcept(IReadOnlyList<string> excludedConnectionIds) => _proxy;

    public IClientProxy Client(string connectionId) => _proxy;

    public IClientProxy Clients(IReadOnlyList<string> connectionIds) => _proxy;

    public IClientProxy Group(string groupName) => _proxy;

    public IClientProxy GroupExcept(string groupName, IReadOnlyList<string> excludedConnectionIds) => _proxy;

    public IClientProxy Groups(IReadOnlyList<string> groupNames) => _proxy;

    public IClientProxy User(string userId) => _proxy;

    public IClientProxy Users(IReadOnlyList<string> userIds) => _proxy;
}

/// <summary>Client proxy that drops every message and completes immediately.</summary>
internal sealed class FakeClientProxy : IClientProxy
{
    public Task SendCoreAsync(string method, object?[] args, CancellationToken cancellationToken = default)
        => Task.CompletedTask;
}

#endregion