Slice 5 of the worker state consolidation refactor.
OverrideSlotService (new in Worker/Queue/) owns RunNow, ContinueTask,
and the override-slot piece of CancelTask. QueueService keeps the
queue-slot guard for "task is already running" rejection and delegates
to OverrideSlotService for execution; CancelTask tries the override
slot first, then the queue slot. QueueSlotState is extracted to its own
file.
Folder reorg (via git mv to preserve history):
- Worker/Queue/: QueueService, OverrideSlotService, QueueSlotState
  (alongside the existing waker/picker)
- Worker/Lifecycle/: StaleTaskRecovery, TaskResetService, TaskMergeService
- Worker/Worktrees/: WorktreeMaintenanceService
- Worker/Agents/: AgentFileService, DefaultAgentSeeder
Worker/Services/ folder removed. All consumers updated to the new
namespaces (Program.cs, WorkerHub, ExternalMcpService,
PlanningMergeOrchestrator, all Worker tests).
OverrideSlotService is registered as a DI singleton in both the main
worker app and the external MCP app.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
152 lines
5.5 KiB
C#
152 lines
5.5 KiB
C#
using ClaudeDo.Data;
|
|
using ClaudeDo.Data.Models;
|
|
using ClaudeDo.Data.Repositories;
|
|
using ClaudeDo.Worker.Config;
|
|
using ClaudeDo.Worker.Hub;
|
|
using ClaudeDo.Worker.Queue;
|
|
using ClaudeDo.Worker.Runner;
|
|
using ClaudeDo.Worker.Tests.Infrastructure;
|
|
using Microsoft.AspNetCore.SignalR;
|
|
using Microsoft.Extensions.Logging.Abstractions;
|
|
using TaskStatus = ClaudeDo.Data.Models.TaskStatus;
|
|
|
|
namespace ClaudeDo.Worker.Tests.Services;
|
|
|
|
/// <summary>
/// Verifies that <see cref="QueueService"/> rejects <c>RunNow</c> and <c>ContinueTask</c>
/// with an <see cref="InvalidOperationException"/> ("already running") while the same task
/// is still executing in the queue slot.
/// </summary>
public sealed class QueueServiceSlotGuardTests : IDisposable
{
    private readonly DbFixture _db = new();
    private readonly ClaudeDoDbContext _ctx;
    private readonly TaskRepository _taskRepo;
    private readonly ListRepository _listRepo;
    private readonly TagRepository _tagRepo;
    private readonly WorkerConfig _cfg;
    private readonly string _tempDir;

    // Assigned by CreateService; tests use it to nudge the queue loop after StartAsync.
    private QueueWaker _waker = null!;

    /// <summary>
    /// Creates a fresh database context with its repositories and a unique temp directory
    /// that hosts the sandbox and log roots for this test instance.
    /// </summary>
    public QueueServiceSlotGuardTests()
    {
        _ctx = _db.CreateContext();
        _taskRepo = new TaskRepository(_ctx);
        _listRepo = new ListRepository(_ctx);
        _tagRepo = new TagRepository(_ctx);

        _tempDir = Path.Combine(Path.GetTempPath(), $"claudedo_slotguard_{Guid.NewGuid():N}");
        Directory.CreateDirectory(_tempDir);

        _cfg = new WorkerConfig
        {
            SandboxRoot = Path.Combine(_tempDir, "sandbox"),
            LogRoot = Path.Combine(_tempDir, "logs"),
            // NOTE(review): presumably a short backstop poll so the tests do not wait long — confirm.
            QueueBackstopIntervalMs = 50,
        };
    }

    /// <summary>
    /// Disposes the context and database fixture, then removes the temp directory on a
    /// best-effort basis.
    /// </summary>
    public void Dispose()
    {
        _ctx.Dispose();
        _db.Dispose();

        try
        {
            Directory.Delete(_tempDir, true);
        }
        catch (IOException)
        {
            // Best effort: a file may still be locked or the directory may already be gone.
        }
        catch (UnauthorizedAccessException)
        {
            // Best effort: leftover temp files must not fail the test run.
        }
    }

    /// <summary>
    /// Wires a <see cref="QueueService"/> to real collaborators and a
    /// <see cref="FakeClaudeProcess"/> driven by <paramref name="handler"/>.
    /// Also stores the created <see cref="QueueWaker"/> in <c>_waker</c>.
    /// </summary>
    /// <param name="handler">Optional callback passed to the fake process; <c>null</c> uses the fake's default.</param>
    /// <returns>The service under test together with the fake process it runs.</returns>
    private (QueueService service, FakeClaudeProcess fakeProcess) CreateService(
        Func<string, string, string, Func<string, Task>, CancellationToken, Task<RunResult>>? handler = null)
    {
        var fake = new FakeClaudeProcess(handler);
        var broadcaster = new HubBroadcaster(new FakeHubContext());
        var dbFactory = _db.CreateFactory();
        var wtManager = new WorktreeManager(new ClaudeDo.Data.Git.GitService(), dbFactory, _cfg, NullLogger<WorktreeManager>.Instance);
        var argsBuilder = new ClaudeArgsBuilder();
        var runner = new TaskRunner(fake, dbFactory, broadcaster, wtManager, argsBuilder, _cfg,
            NullLogger<TaskRunner>.Instance, TaskStateServiceBuilder.Build(dbFactory).State);

        _waker = new QueueWaker();
        var picker = new QueuePicker(dbFactory);
        var overrideSlot = new OverrideSlotService(dbFactory, runner, NullLogger<OverrideSlotService>.Instance);
        var service = new QueueService(dbFactory, runner, _cfg, NullLogger<QueueService>.Instance, _waker, picker, overrideSlot);

        return (service, fake);
    }

    /// <summary>
    /// Inserts a new list and attaches the existing tag named "agent" to it.
    /// </summary>
    /// <returns>The id of the created list.</returns>
    private async Task<string> SeedListWithAgentTagAsync()
    {
        var listId = Guid.NewGuid().ToString();
        await _listRepo.AddAsync(new ListEntity { Id = listId, Name = "Test", CreatedAt = DateTime.UtcNow });

        // First() throws if no "agent" tag exists — assumes the fixture pre-seeds it; TODO confirm.
        var tags = await _tagRepo.GetAllAsync();
        var agentTag = tags.First(t => t.Name == "agent");
        await _listRepo.AddTagAsync(listId, agentTag.Id);

        return listId;
    }

    /// <summary>
    /// Inserts a task with status <see cref="TaskStatus.Queued"/> into the given list.
    /// </summary>
    /// <param name="listId">Id of the list that owns the task.</param>
    /// <returns>The persisted task entity.</returns>
    private async Task<TaskEntity> SeedQueuedTaskAsync(string listId)
    {
        var task = new TaskEntity
        {
            Id = Guid.NewGuid().ToString(),
            ListId = listId,
            Title = "Guard test task",
            Description = "Test",
            Status = TaskStatus.Queued,
            CreatedAt = DateTime.UtcNow,
        };

        await _taskRepo.AddAsync(task);
        return task;
    }

    /// <summary>
    /// Shared scenario: seeds a queued task, lets the queue pick it up and park on a gate,
    /// then asserts that <paramref name="attempt"/> is rejected with an
    /// <see cref="InvalidOperationException"/> whose message contains "already running".
    /// </summary>
    /// <param name="attempt">Operation to invoke against the service with the running task's id.</param>
    private async Task AssertRejectedWhileQueueSlotBusyAsync(Func<QueueService, string, Task> attempt)
    {
        var listId = await SeedListWithAgentTagAsync();
        var task = await SeedQueuedTaskAsync(listId);

        // Gate keeps the queue slot occupied until the test releases it.
        // RunContinuationsAsynchronously keeps the test's continuation off the fake
        // process handler's call stack (and vice versa when the gate is released).
        var release = new TaskCompletionSource<RunResult>(TaskCreationOptions.RunContinuationsAsynchronously);
        var queuePickedUp = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);

        var (service, _) = CreateService(async (_, _, _, _, _) =>
        {
            queuePickedUp.TrySetResult();
            return await release.Task;
        });

        using var cts = new CancellationTokenSource();
        try
        {
            await service.StartAsync(cts.Token);
            _waker.Wake();

            // Wait until the queue slot has actually picked up the task.
            await queuePickedUp.Task.WaitAsync(TimeSpan.FromSeconds(5));

            // Now the same taskId is in the queue slot — the operation must be rejected.
            var ex = await Assert.ThrowsAsync<InvalidOperationException>(() => attempt(service, task.Id));
            Assert.Contains("already running", ex.Message);
        }
        finally
        {
            // Always open the gate and cancel, even when an assertion or the wait failed,
            // so the queue loop is not left parked while Dispose tears down the fixture.
            release.TrySetResult(new RunResult { ExitCode = 0, ResultMarkdown = "ok" });
            cts.Cancel();
        }
    }

    [Fact]
    public Task RunNow_Throws_When_Task_Already_Running_In_Queue_Slot()
        => AssertRejectedWhileQueueSlotBusyAsync((service, taskId) => service.RunNow(taskId));

    [Fact]
    public Task ContinueTask_Throws_When_Task_Already_Running_In_Queue_Slot()
        => AssertRejectedWhileQueueSlotBusyAsync((service, taskId) => service.ContinueTask(taskId, "follow-up"));
}
|