Files
ClaudeDo/tests/ClaudeDo.Worker.Tests/Services/QueueServiceTests.cs
mika kuns 36484ed45a feat(worker,ui): wire EF Core into DI and update all consumers to IDbContextFactory
Worker and App Program.cs: replace SqliteConnectionFactory+SchemaInitializer
with AddDbContextFactory<ClaudeDoDbContext> + Database.Migrate(). Repos
changed from AddSingleton to AddScoped.

All singleton services (QueueService, StaleTaskRecovery, WorktreeManager,
TaskRunner) and singleton ViewModels (MainWindowViewModel, TaskDetailViewModel,
TaskListViewModel, TaskEditorViewModel) now take IDbContextFactory<ClaudeDoDbContext>
and create short-lived contexts per operation.

Test infrastructure: DbFixture now uses EF migrations instead of SchemaInitializer;
all test classes create contexts via DbFixture.CreateContext().

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-16 08:59:24 +02:00

332 lines
11 KiB
C#

using ClaudeDo.Data;
using ClaudeDo.Data.Git;
using ClaudeDo.Data.Models;
using ClaudeDo.Data.Repositories;
using ClaudeDo.Worker.Config;
using ClaudeDo.Worker.Hub;
using ClaudeDo.Worker.Runner;
using ClaudeDo.Worker.Services;
using ClaudeDo.Worker.Tests.Infrastructure;
using Microsoft.AspNetCore.SignalR;
using Microsoft.Extensions.Logging.Abstractions;
using TaskStatus = ClaudeDo.Data.Models.TaskStatus;
namespace ClaudeDo.Worker.Tests.Services;
public sealed class QueueServiceTests : IDisposable
{
private readonly DbFixture _db = new();
private readonly ClaudeDoDbContext _ctx;
private readonly TaskRepository _taskRepo;
private readonly ListRepository _listRepo;
private readonly TagRepository _tagRepo;
private readonly WorkerConfig _cfg;
private readonly string _tempDir;
/// <summary>
/// Builds the per-test environment: a database context with the three repositories
/// bound to it, a uniquely named temp directory, and a <see cref="WorkerConfig"/>
/// whose sandbox and log roots are sub-paths of that directory.
/// </summary>
public QueueServiceTests()
{
    _ctx = _db.CreateContext();
    _taskRepo = new TaskRepository(_ctx);
    _listRepo = new ListRepository(_ctx);
    _tagRepo = new TagRepository(_ctx);

    // A GUID suffix gives every test instance its own directory.
    var uniqueName = "claudedo_test_" + Guid.NewGuid().ToString("N");
    _tempDir = Path.Combine(Path.GetTempPath(), uniqueName);
    Directory.CreateDirectory(_tempDir);

    var sandboxRoot = Path.Combine(_tempDir, "sandbox");
    var logRoot = Path.Combine(_tempDir, "logs");
    _cfg = new WorkerConfig
    {
        SandboxRoot = sandboxRoot,
        LogRoot = logRoot,
        // Short backstop interval keeps the tests fast.
        QueueBackstopIntervalMs = 50,
    };
}
/// <summary>
/// Tears down the per-test environment: disposes the context and the fixture,
/// then removes the temp directory on a best-effort basis.
/// </summary>
public void Dispose()
{
    _ctx.Dispose();
    _db.Dispose();

    try
    {
        Directory.Delete(_tempDir, recursive: true);
    }
    catch
    {
        // Deliberately swallowed: any failure to delete the temp directory is ignored.
    }
}
/// <summary>
/// Wires up a <see cref="QueueService"/> whose runner executes a
/// <see cref="FakeClaudeProcess"/> instead of a real process.
/// </summary>
/// <param name="handler">
/// Optional callback handed to the fake process; when <c>null</c> the fake falls back
/// to its own default handler.
/// </param>
/// <returns>The service under test together with the fake process driving it.</returns>
private (QueueService service, FakeClaudeProcess fakeProcess) CreateService(
    Func<string, string, string, Func<string, Task>, CancellationToken, Task<RunResult>>? handler = null)
{
    var fakeProcess = new FakeClaudeProcess(handler);
    var hub = new HubBroadcaster(new FakeHubContext());
    var contextFactory = _db.CreateFactory();

    var worktrees = new WorktreeManager(
        new GitService(),
        contextFactory,
        _cfg,
        NullLogger<WorktreeManager>.Instance);

    var taskRunner = new TaskRunner(
        fakeProcess,
        contextFactory,
        hub,
        worktrees,
        new ClaudeArgsBuilder(),
        _cfg,
        NullLogger<TaskRunner>.Instance);

    var queue = new QueueService(contextFactory, taskRunner, _cfg, NullLogger<QueueService>.Instance);
    return (queue, fakeProcess);
}
/// <summary>
/// Inserts a new list named "Test" and attaches the existing tag named "agent" to it.
/// </summary>
/// <returns>The id of the new list and the id of the "agent" tag.</returns>
private async Task<(string listId, long agentTagId)> SeedListWithAgentTag()
{
    var newListId = Guid.NewGuid().ToString();
    var list = new ListEntity { Id = newListId, Name = "Test", CreatedAt = DateTime.UtcNow };
    await _listRepo.AddAsync(list);

    // First() throws when no tag named "agent" is present
    // (presumably it is seeded by the migrations — confirm against DbFixture).
    var allTags = await _tagRepo.GetAllAsync();
    var agentTagId = allTags.First(tag => tag.Name == "agent").Id;

    await _listRepo.AddTagAsync(newListId, agentTagId);
    return (newListId, agentTagId);
}
/// <summary>
/// Inserts a task with status <c>Queued</c> into the given list.
/// </summary>
/// <param name="listId">Id of the list the task belongs to.</param>
/// <param name="scheduledFor">Optional schedule time stored on the task.</param>
/// <param name="createdAt">Creation timestamp; the current UTC time when omitted.</param>
/// <returns>The entity that was added.</returns>
private async Task<TaskEntity> SeedQueuedTask(string listId, DateTime? scheduledFor = null, DateTime? createdAt = null)
{
    var taskId = Guid.NewGuid().ToString();
    var created = createdAt is { } explicitTimestamp ? explicitTimestamp : DateTime.UtcNow;

    var queued = new TaskEntity
    {
        Id = taskId,
        ListId = listId,
        Title = "Test task",
        Description = "Do something",
        Status = TaskStatus.Queued,
        ScheduledFor = scheduledFor,
        CreatedAt = created,
    };

    await _taskRepo.AddAsync(queued);
    return queued;
}
/// <summary>
/// Verifies that a second <c>RunNow</c> call is rejected with "override slot busy"
/// while the run started by the first call is still pending.
/// </summary>
[Fact]
public async Task RunNow_Throws_When_Override_Slot_Busy()
{
    var (listId, _) = await SeedListWithAgentTag();

    // Keeps the first run pending until the test releases it. Continuations are forced
    // to run asynchronously so completing the source never executes the awaiting code
    // inline on the test's thread.
    var release = new TaskCompletionSource<RunResult>(TaskCreationOptions.RunContinuationsAsynchronously);
    var (service, _) = CreateService((_, _, _, _, _) => release.Task);
    var task1 = await SeedQueuedTask(listId);
    var task2 = await SeedQueuedTask(listId);

    try
    {
        await service.RunNow(task1.Id);

        var ex = await Assert.ThrowsAsync<InvalidOperationException>(() => service.RunNow(task2.Id));
        Assert.Equal("override slot busy", ex.Message);
    }
    finally
    {
        // Always unblock the fake process, even when an assertion above fails;
        // otherwise the pending run would outlive the test.
        release.TrySetResult(new RunResult { ExitCode = 0, ResultMarkdown = "ok" });
    }
}
/// <summary>
/// Verifies that <c>RunNow</c> throws <see cref="KeyNotFoundException"/> for an id
/// that was never seeded.
/// </summary>
[Fact]
public async Task RunNow_Throws_For_Unknown_Task()
{
    var queue = CreateService().service;

    Task RunMissing() => queue.RunNow("nonexistent");

    await Assert.ThrowsAsync<KeyNotFoundException>(RunMissing);
}
/// <summary>
/// Verifies that a queued task scheduled one hour in the future is not handed to the
/// process while the queue loop runs for a short while.
/// </summary>
[Fact]
public async Task Schedule_Filter_Skips_Future_Tasks()
{
    var (listId, _) = await SeedListWithAgentTag();
    var oneHourAhead = DateTime.UtcNow.AddHours(1);
    await SeedQueuedTask(listId, scheduledFor: oneHourAhead);

    var (queue, process) = CreateService((_, _, _, _, _) =>
    {
        var ok = new RunResult { ExitCode = 0, ResultMarkdown = "ok" };
        return Task.FromResult(ok);
    });

    using var stopSource = new CancellationTokenSource();

    // Run the loop, nudge it, and give it a moment to (not) pick anything up.
    await queue.StartAsync(stopSource.Token);
    queue.WakeQueue();
    await Task.Delay(200);
    stopSource.Cancel();

    // The task is not due yet, so the fake process must never have been invoked.
    Assert.Equal(0, process.CallCount);
}
/// <summary>
/// Verifies that two queued tasks are executed one after the other: the second
/// invocation of the process only happens after the first one has been released.
/// </summary>
[Fact]
public async Task Queue_FIFO_Sequentiality()
{
    var (listId, _) = await SeedListWithAgentTag();
    var order = new List<string>();

    // Continuations run asynchronously so completing a gate never executes the
    // awaiting code inline on the completing thread.
    var gate1 = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
    var gate2 = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
    var callCount = 0;

    // The handler appends to 'order' on another thread, so every read on the test
    // thread takes the same lock and works on a private copy.
    string[] Snapshot()
    {
        lock (order)
        {
            return order.ToArray();
        }
    }

    var (service, _) = CreateService(async (_, _, _, _, _) =>
    {
        var n = Interlocked.Increment(ref callCount);
        lock (order) { order.Add(n.ToString()); }
        if (n == 1) await gate1.Task;   // first run blocks until the test releases it
        if (n == 2) gate2.SetResult();  // second run signals that it was reached
        return new RunResult { ExitCode = 0, ResultMarkdown = "ok" };
    });

    await SeedQueuedTask(listId, createdAt: DateTime.UtcNow.AddSeconds(-2));
    await SeedQueuedTask(listId, createdAt: DateTime.UtcNow.AddSeconds(-1));

    using var cts = new CancellationTokenSource();
    try
    {
        await service.StartAsync(cts.Token);
        service.WakeQueue();

        // Wait until the first run has been picked up (poll instead of a fixed delay
        // to avoid flakiness under load).
        var deadline = DateTime.UtcNow.AddSeconds(5);
        while (Snapshot().Length == 0 && DateTime.UtcNow < deadline)
        {
            await Task.Delay(20);
        }

        // Only the first run may have started; the second must still be waiting.
        var started = Snapshot();
        Assert.Single(started);
        Assert.Equal("1", started[0]);

        // Release the first run.
        gate1.SetResult();

        // Wait for the second run to be reached.
        await gate2.Task.WaitAsync(TimeSpan.FromSeconds(5));

        var finished = Snapshot();
        Assert.Equal(2, finished.Length);
        Assert.Equal("2", finished[1]);
    }
    finally
    {
        // Never leave the first run blocked or the loop running when an assertion fails.
        gate1.TrySetResult();
        cts.Cancel();
    }
}
/// <summary>
/// Verifies that <c>CancelTask</c> returns <c>true</c> for a running task and that the
/// cancellation token passed to the process is actually cancelled.
/// </summary>
[Fact]
public async Task CancelTask_Triggers_Cancellation()
{
    var (listId, _) = await SeedListWithAgentTag();

    // Both signals are set from the handler's thread and awaited on the test thread.
    // Using completion sources (instead of a shared bool plus a fixed sleep) makes the
    // hand-off race-free and lets the test finish as soon as cancellation is observed.
    var running = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
    var cancelled = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);

    var (service, _) = CreateService(async (_, _, _, _, ct) =>
    {
        running.TrySetResult();
        try
        {
            // Blocks until the token is cancelled.
            await Task.Delay(Timeout.Infinite, ct);
        }
        catch (OperationCanceledException)
        {
            cancelled.TrySetResult();
            throw;
        }
        return new RunResult { ExitCode = 0, ResultMarkdown = "ok" };
    });

    var task = await SeedQueuedTask(listId);
    await service.RunNow(task.Id);
    await running.Task.WaitAsync(TimeSpan.FromSeconds(5));

    var result = service.CancelTask(task.Id);
    Assert.True(result);

    // Throws TimeoutException (failing the test) if the handler never sees the cancellation.
    await cancelled.Task.WaitAsync(TimeSpan.FromSeconds(5));
}
/// <summary>
/// Verifies that a run which fails with a session id is executed a second time and
/// that the task ends up in status <c>Done</c> once the second run succeeds.
/// </summary>
[Fact]
public async Task RunNow_AutoRetries_On_Failure_With_SessionId()
{
    var (listId, _) = await SeedListWithAgentTag();
    var task = await SeedQueuedTask(listId);

    // Incremented on the handler's thread and read on the test thread, hence the
    // Interlocked/Volatile accessors.
    var callCount = 0;
    var (service, _) = CreateService((_, _, _, _, _) =>
    {
        var attempt = Interlocked.Increment(ref callCount);
        if (attempt == 1)
        {
            // First attempt fails but reports a session id.
            return Task.FromResult(new RunResult
            {
                ExitCode = 1,
                ErrorMarkdown = "something broke",
                SessionId = "sess-retry-test",
            });
        }

        return Task.FromResult(new RunResult
        {
            ExitCode = 0,
            ResultMarkdown = "fixed it",
            SessionId = "sess-retry-test",
        });
    });

    await service.StartAsync(CancellationToken.None);
    try
    {
        await service.RunNow(task.Id);

        // Wait for both runs to complete. Poll with a deadline instead of a fixed sleep:
        // the test finishes as soon as the outcome is visible and tolerates slow machines.
        // A fresh context is used per poll so the shared context is not touched while
        // background work is in flight.
        var deadline = DateTime.UtcNow.AddSeconds(10);
        while (DateTime.UtcNow < deadline)
        {
            if (Volatile.Read(ref callCount) >= 2)
            {
                using var pollCtx = _db.CreateContext();
                var snapshot = await new TaskRepository(pollCtx).GetByIdAsync(task.Id);
                if (snapshot?.Status == TaskStatus.Done)
                {
                    break;
                }
            }

            await Task.Delay(20);
        }
    }
    finally
    {
        // Stop the service even when RunNow or the polling throws.
        await service.StopAsync(CancellationToken.None);
    }

    Assert.Equal(2, Volatile.Read(ref callCount));
    var finalTask = await _taskRepo.GetByIdAsync(task.Id);
    Assert.NotNull(finalTask);
    Assert.Equal(TaskStatus.Done, finalTask.Status);
}
/// <summary>
/// Verifies that <c>GetActive</c> reports exactly one entry — slot "override" with the
/// task's id — while a run started through <c>RunNow</c> is still pending.
/// </summary>
[Fact]
public async Task GetActive_Returns_Running_Slots()
{
    var (listId, _) = await SeedListWithAgentTag();

    // Keeps the run pending until the test releases it. Continuations are forced to
    // run asynchronously so completing the source never executes the awaiting code
    // inline on the test's thread.
    var release = new TaskCompletionSource<RunResult>(TaskCreationOptions.RunContinuationsAsynchronously);
    var (service, _) = CreateService((_, _, _, _, _) => release.Task);
    var task = await SeedQueuedTask(listId);

    try
    {
        await service.RunNow(task.Id);

        var active = service.GetActive();
        Assert.Single(active);
        Assert.Equal("override", active[0].slot);
        Assert.Equal(task.Id, active[0].taskId);
    }
    finally
    {
        // Always unblock the fake process, even when an assertion above fails;
        // otherwise the pending run would outlive the test.
        release.TrySetResult(new RunResult { ExitCode = 0, ResultMarkdown = "ok" });
    }
}
}
#region Test doubles
/// <summary>
/// Test double for <see cref="IClaudeProcess"/> that counts invocations and delegates
/// each run to a caller-supplied handler.
/// </summary>
internal sealed class FakeClaudeProcess : IClaudeProcess
{
    private readonly Func<string, string, string, Func<string, Task>, CancellationToken, Task<RunResult>> _handler;
    private int _callCount;

    /// <summary>
    /// Creates the fake.
    /// </summary>
    /// <param name="handler">
    /// Callback invoked for every run; when <c>null</c>, a handler that immediately
    /// returns exit code 0 with result "ok" is used.
    /// </param>
    public FakeClaudeProcess(
        Func<string, string, string, Func<string, Task>, CancellationToken, Task<RunResult>>? handler = null)
    {
        if (handler is not null)
        {
            _handler = handler;
        }
        else
        {
            _handler = SucceedImmediately;
        }
    }

    /// <summary>Number of times <see cref="RunAsync"/> has been entered.</summary>
    public int CallCount
    {
        get { return _callCount; }
    }

    /// <summary>
    /// Records the call and forwards it to the handler.
    /// </summary>
    /// <remarks>
    /// The handler receives the arguments in the order (prompt, workingDirectory,
    /// arguments, onStdoutLine, ct), which differs from this method's parameter order.
    /// </remarks>
    public async Task<RunResult> RunAsync(string arguments, string prompt, string workingDirectory,
        Func<string, Task> onStdoutLine, CancellationToken ct)
    {
        Interlocked.Increment(ref _callCount);
        var outcome = await _handler(prompt, workingDirectory, arguments, onStdoutLine, ct);
        return outcome;
    }

    // Default handler: an already-completed successful run.
    private static Task<RunResult> SucceedImmediately(
        string prompt, string workingDirectory, string arguments,
        Func<string, Task> onStdoutLine, CancellationToken ct)
    {
        return Task.FromResult(new RunResult { ExitCode = 0, ResultMarkdown = "ok" });
    }
}
/// <summary>
/// Minimal <see cref="IHubContext{THub}"/> for <see cref="WorkerHub"/>: exposes a
/// <see cref="FakeHubClients"/> and does not support groups.
/// </summary>
internal sealed class FakeHubContext : IHubContext<WorkerHub>
{
    private readonly IHubClients _clients = new FakeHubClients();

    /// <summary>The single fake clients object created with this context.</summary>
    public IHubClients Clients
    {
        get { return _clients; }
    }

    /// <summary>Not implemented; accessing it throws <see cref="NotImplementedException"/>.</summary>
    public IGroupManager Groups
    {
        get { throw new NotImplementedException(); }
    }
}
/// <summary>
/// <see cref="IHubClients"/> test double: every selector returns the same
/// <see cref="FakeClientProxy"/> instance, regardless of its arguments.
/// </summary>
internal sealed class FakeHubClients : IHubClients
{
    private readonly FakeClientProxy _sink = new();

    public IClientProxy All
    {
        get { return _sink; }
    }

    public IClientProxy AllExcept(IReadOnlyList<string> excludedConnectionIds)
    {
        return _sink;
    }

    public IClientProxy Client(string connectionId)
    {
        return _sink;
    }

    public IClientProxy Clients(IReadOnlyList<string> connectionIds)
    {
        return _sink;
    }

    public IClientProxy Group(string groupName)
    {
        return _sink;
    }

    public IClientProxy GroupExcept(string groupName, IReadOnlyList<string> excludedConnectionIds)
    {
        return _sink;
    }

    public IClientProxy Groups(IReadOnlyList<string> groupNames)
    {
        return _sink;
    }

    public IClientProxy User(string userId)
    {
        return _sink;
    }

    public IClientProxy Users(IReadOnlyList<string> userIds)
    {
        return _sink;
    }
}
/// <summary>
/// <see cref="IClientProxy"/> test double that discards every message.
/// </summary>
internal sealed class FakeClientProxy : IClientProxy
{
    /// <summary>Ignores all arguments and returns an already-completed task.</summary>
    public Task SendCoreAsync(string method, object?[] args, CancellationToken cancellationToken = default)
    {
        return Task.CompletedTask;
    }
}
#endregion