using ClaudeDo.Data; using ClaudeDo.Data.Models; using ClaudeDo.Data.Repositories; using ClaudeDo.Worker.Config; using ClaudeDo.Worker.Hub; using ClaudeDo.Worker.Runner; using ClaudeDo.Worker.Services; using ClaudeDo.Worker.Tests.Infrastructure; using Microsoft.AspNetCore.SignalR; using Microsoft.Extensions.Logging.Abstractions; using TaskStatus = ClaudeDo.Data.Models.TaskStatus; namespace ClaudeDo.Worker.Tests.Services; public sealed class QueueServiceSlotGuardTests : IDisposable { private readonly DbFixture _db = new(); private readonly ClaudeDoDbContext _ctx; private readonly TaskRepository _taskRepo; private readonly ListRepository _listRepo; private readonly TagRepository _tagRepo; private readonly WorkerConfig _cfg; private readonly string _tempDir; public QueueServiceSlotGuardTests() { _ctx = _db.CreateContext(); _taskRepo = new TaskRepository(_ctx); _listRepo = new ListRepository(_ctx); _tagRepo = new TagRepository(_ctx); _tempDir = Path.Combine(Path.GetTempPath(), $"claudedo_slotguard_{Guid.NewGuid():N}"); Directory.CreateDirectory(_tempDir); _cfg = new WorkerConfig { SandboxRoot = Path.Combine(_tempDir, "sandbox"), LogRoot = Path.Combine(_tempDir, "logs"), QueueBackstopIntervalMs = 50, }; } public void Dispose() { _ctx.Dispose(); _db.Dispose(); try { Directory.Delete(_tempDir, true); } catch { } } private (QueueService service, FakeClaudeProcess fakeProcess) CreateService( Func, CancellationToken, Task>? handler = null) { var fake = new FakeClaudeProcess(handler); var broadcaster = new HubBroadcaster(new FakeHubContext()); var dbFactory = _db.CreateFactory(); var wtManager = new WorktreeManager(new ClaudeDo.Data.Git.GitService(), dbFactory, _cfg, NullLogger.Instance); var argsBuilder = new ClaudeArgsBuilder(); var runner = new TaskRunner(fake, dbFactory, broadcaster, wtManager, argsBuilder, _cfg, NullLogger.Instance); var service = new QueueService(dbFactory, runner, _cfg, NullLogger.Instance); return (service, fake); } private async Task SeedListWithAgentTagAsync() { var listId = Guid.NewGuid().ToString(); await _listRepo.AddAsync(new ListEntity { Id = listId, Name = "Test", CreatedAt = DateTime.UtcNow }); var tags = await _tagRepo.GetAllAsync(); var agentTag = tags.First(t => t.Name == "agent"); await _listRepo.AddTagAsync(listId, agentTag.Id); return listId; } private async Task SeedQueuedTaskAsync(string listId) { var task = new TaskEntity { Id = Guid.NewGuid().ToString(), ListId = listId, Title = "Guard test task", Description = "Test", Status = TaskStatus.Queued, CreatedAt = DateTime.UtcNow, }; await _taskRepo.AddAsync(task); return task; } [Fact] public async Task RunNow_Throws_When_Task_Already_Running_In_Queue_Slot() { var listId = await SeedListWithAgentTagAsync(); var task = await SeedQueuedTaskAsync(listId); // Gate keeps the queue slot occupied indefinitely. var tcs = new TaskCompletionSource(); var queuePickedUp = new TaskCompletionSource(); var (service, _) = CreateService(async (_, _, _, _, ct) => { queuePickedUp.TrySetResult(); return await tcs.Task; }); using var cts = new CancellationTokenSource(); await service.StartAsync(cts.Token); service.WakeQueue(); // Wait until the queue slot has actually picked up the task. await queuePickedUp.Task.WaitAsync(TimeSpan.FromSeconds(5)); // Now the same taskId is in the queue slot — RunNow must reject it. var ex = await Assert.ThrowsAsync(() => service.RunNow(task.Id)); Assert.Contains("already running", ex.Message); tcs.SetResult(new RunResult { ExitCode = 0, ResultMarkdown = "ok" }); cts.Cancel(); } [Fact] public async Task ContinueTask_Throws_When_Task_Already_Running_In_Queue_Slot() { var listId = await SeedListWithAgentTagAsync(); var task = await SeedQueuedTaskAsync(listId); var tcs = new TaskCompletionSource(); var queuePickedUp = new TaskCompletionSource(); var (service, _) = CreateService(async (_, _, _, _, ct) => { queuePickedUp.TrySetResult(); return await tcs.Task; }); using var cts = new CancellationTokenSource(); await service.StartAsync(cts.Token); service.WakeQueue(); await queuePickedUp.Task.WaitAsync(TimeSpan.FromSeconds(5)); var ex = await Assert.ThrowsAsync(() => service.ContinueTask(task.Id, "follow-up")); Assert.Contains("already running", ex.Message); tcs.SetResult(new RunResult { ExitCode = 0, ResultMarkdown = "ok" }); cts.Cancel(); } }