diff --git a/src/ClaudeDo.Worker/Services/QueueService.cs b/src/ClaudeDo.Worker/Services/QueueService.cs index 5850cbb..33a35f4 100644 --- a/src/ClaudeDo.Worker/Services/QueueService.cs +++ b/src/ClaudeDo.Worker/Services/QueueService.cs @@ -68,6 +68,8 @@ public sealed class QueueService : BackgroundService lock (_lock) { + if (_queueSlot?.TaskId == taskId) + throw new InvalidOperationException("task is already running in queue slot"); if (_overrideSlot is not null) throw new InvalidOperationException("override slot busy"); @@ -92,10 +94,12 @@ public sealed class QueueService : BackgroundService ?? throw new KeyNotFoundException($"Task '{taskId}' not found."); if (task.Status == Data.Models.TaskStatus.Running) - throw new InvalidOperationException("Task is currently running."); + throw new InvalidOperationException("task is already running"); lock (_lock) { + if (_queueSlot?.TaskId == taskId) + throw new InvalidOperationException("task is already running in queue slot"); if (_overrideSlot is not null) throw new InvalidOperationException("override slot busy"); diff --git a/tests/ClaudeDo.Worker.Tests/Services/QueueServiceSlotGuardTests.cs b/tests/ClaudeDo.Worker.Tests/Services/QueueServiceSlotGuardTests.cs new file mode 100644 index 0000000..5157372 --- /dev/null +++ b/tests/ClaudeDo.Worker.Tests/Services/QueueServiceSlotGuardTests.cs @@ -0,0 +1,146 @@ +using ClaudeDo.Data; +using ClaudeDo.Data.Models; +using ClaudeDo.Data.Repositories; +using ClaudeDo.Worker.Config; +using ClaudeDo.Worker.Hub; +using ClaudeDo.Worker.Runner; +using ClaudeDo.Worker.Services; +using ClaudeDo.Worker.Tests.Infrastructure; +using Microsoft.AspNetCore.SignalR; +using Microsoft.Extensions.Logging.Abstractions; +using TaskStatus = ClaudeDo.Data.Models.TaskStatus; + +namespace ClaudeDo.Worker.Tests.Services; + +public sealed class QueueServiceSlotGuardTests : IDisposable +{ + private readonly DbFixture _db = new(); + private readonly ClaudeDoDbContext _ctx; + private readonly TaskRepository _taskRepo; + private readonly ListRepository _listRepo; + private readonly TagRepository _tagRepo; + private readonly WorkerConfig _cfg; + private readonly string _tempDir; + + public QueueServiceSlotGuardTests() + { + _ctx = _db.CreateContext(); + _taskRepo = new TaskRepository(_ctx); + _listRepo = new ListRepository(_ctx); + _tagRepo = new TagRepository(_ctx); + _tempDir = Path.Combine(Path.GetTempPath(), $"claudedo_slotguard_{Guid.NewGuid():N}"); + Directory.CreateDirectory(_tempDir); + _cfg = new WorkerConfig + { + SandboxRoot = Path.Combine(_tempDir, "sandbox"), + LogRoot = Path.Combine(_tempDir, "logs"), + QueueBackstopIntervalMs = 50, + }; + } + + public void Dispose() + { + _ctx.Dispose(); + _db.Dispose(); + try { Directory.Delete(_tempDir, true); } catch { } + } + + private (QueueService service, FakeClaudeProcess fakeProcess) CreateService( + Func, CancellationToken, Task>? handler = null) + { + var fake = new FakeClaudeProcess(handler); + var broadcaster = new HubBroadcaster(new FakeHubContext()); + var dbFactory = _db.CreateFactory(); + var wtManager = new WorktreeManager(new ClaudeDo.Data.Git.GitService(), dbFactory, _cfg, NullLogger.Instance); + var argsBuilder = new ClaudeArgsBuilder(); + var runner = new TaskRunner(fake, dbFactory, broadcaster, wtManager, argsBuilder, _cfg, + NullLogger.Instance); + var service = new QueueService(dbFactory, runner, _cfg, NullLogger.Instance); + return (service, fake); + } + + private async Task SeedListWithAgentTagAsync() + { + var listId = Guid.NewGuid().ToString(); + await _listRepo.AddAsync(new ListEntity { Id = listId, Name = "Test", CreatedAt = DateTime.UtcNow }); + var tags = await _tagRepo.GetAllAsync(); + var agentTag = tags.First(t => t.Name == "agent"); + await _listRepo.AddTagAsync(listId, agentTag.Id); + return listId; + } + + private async Task SeedQueuedTaskAsync(string listId) + { + var task = new TaskEntity + { + Id = Guid.NewGuid().ToString(), + ListId = listId, + Title = "Guard test task", + Description = "Test", + Status = TaskStatus.Queued, + CreatedAt = DateTime.UtcNow, + }; + await _taskRepo.AddAsync(task); + return task; + } + + [Fact] + public async Task RunNow_Throws_When_Task_Already_Running_In_Queue_Slot() + { + var listId = await SeedListWithAgentTagAsync(); + var task = await SeedQueuedTaskAsync(listId); + + // Gate keeps the queue slot occupied indefinitely. + var tcs = new TaskCompletionSource(); + var queuePickedUp = new TaskCompletionSource(); + + var (service, _) = CreateService(async (_, _, _, _, ct) => + { + queuePickedUp.TrySetResult(); + return await tcs.Task; + }); + + using var cts = new CancellationTokenSource(); + await service.StartAsync(cts.Token); + service.WakeQueue(); + + // Wait until the queue slot has actually picked up the task. + await queuePickedUp.Task.WaitAsync(TimeSpan.FromSeconds(5)); + + // Now the same taskId is in the queue slot — RunNow must reject it. + var ex = await Assert.ThrowsAsync(() => service.RunNow(task.Id)); + Assert.Contains("already running", ex.Message); + + tcs.SetResult(new RunResult { ExitCode = 0, ResultMarkdown = "ok" }); + cts.Cancel(); + } + + [Fact] + public async Task ContinueTask_Throws_When_Task_Already_Running_In_Queue_Slot() + { + var listId = await SeedListWithAgentTagAsync(); + var task = await SeedQueuedTaskAsync(listId); + + var tcs = new TaskCompletionSource(); + var queuePickedUp = new TaskCompletionSource(); + + var (service, _) = CreateService(async (_, _, _, _, ct) => + { + queuePickedUp.TrySetResult(); + return await tcs.Task; + }); + + using var cts = new CancellationTokenSource(); + await service.StartAsync(cts.Token); + service.WakeQueue(); + + await queuePickedUp.Task.WaitAsync(TimeSpan.FromSeconds(5)); + + var ex = await Assert.ThrowsAsync(() => + service.ContinueTask(task.Id, "follow-up")); + Assert.Contains("already running", ex.Message); + + tcs.SetResult(new RunResult { ExitCode = 0, ResultMarkdown = "ok" }); + cts.Cancel(); + } +}