Files
ClaudeDo/tests/ClaudeDo.Worker.Tests/Services/QueueServiceTests.cs
Mika Kuns 01235d986f feat(worker,data): add git worktree support and conventional commits
GitService (in ClaudeDo.Data so the UI can reuse it) wraps the git CLI:
IsGitRepo, RevParseHead, WorktreeAdd/Remove, HasChanges, AddAll, Commit
(multi-line via -F -), DiffStat, BranchDelete, MergeFfOnly. Throws with
stderr on failure.

WorktreeManager owns the per-task lifecycle: validate working_dir is a
git repo (throws if not, no DB row written), create the worktree at
<repo>/../.claudedo-worktrees/<slug>/<id>/ (or central root per config),
insert the worktrees row. CommitIfChangedAsync skips when there are no
changes, otherwise commits and updates head_commit + diff_stat.

CommitMessageBuilder produces "{type}({list-slug}): {title<=60}" with a
blank-line-separated description (truncated to 400) and a permanent
"ClaudeDo-Task: <id>" trailer. Slug normalises whitespace + strips
non-alphanumerics. Newlines hard-coded to \n so git on Windows doesn't
choke on \r\n.

TaskRunner branches on list.WorkingDir: worktree path runs Claude in the
worktree, commits on success, broadcasts WorktreeUpdated; failure leaves
the worktree row active for inspection. Sandbox path unchanged.

Tests: 38 pass (12 new). GitRepoFixture spins up a real temp repo with a
seed commit; tests skip gracefully if `git` isn't on PATH.
CommitMessageBuilder fully unit-tested. WorktreeManager covers create,
no-change skip, real-commit, and non-repo failure.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-13 13:29:26 +02:00

282 lines
9.6 KiB
C#

using ClaudeDo.Data.Git;
using ClaudeDo.Data.Models;
using ClaudeDo.Data.Repositories;
using ClaudeDo.Worker.Config;
using ClaudeDo.Worker.Hub;
using ClaudeDo.Worker.Runner;
using ClaudeDo.Worker.Services;
using ClaudeDo.Worker.Tests.Infrastructure;
using Microsoft.AspNetCore.SignalR;
using Microsoft.Extensions.Logging.Abstractions;
using TaskStatus = ClaudeDo.Data.Models.TaskStatus;
namespace ClaudeDo.Worker.Tests.Services;
/// <summary>
/// Tests for <see cref="QueueService"/> wired to a real <see cref="TaskRunner"/>, a
/// <see cref="DbFixture"/>-backed set of repositories and a <see cref="FakeClaudeProcess"/>
/// whose behaviour each test scripts through a handler delegate.
/// </summary>
public sealed class QueueServiceTests : IDisposable
{
    /// <summary>Upper bound for waiting on a signal that is expected to arrive.</summary>
    private static readonly TimeSpan SignalTimeout = TimeSpan.FromSeconds(5);

    /// <summary>
    /// Grace period used when a test has to show that something did NOT happen
    /// (there is no positive signal to wait for in that case).
    /// </summary>
    private static readonly TimeSpan SettleDelay = TimeSpan.FromMilliseconds(200);

    private readonly DbFixture _db = new();
    private readonly TaskRepository _taskRepo;
    private readonly ListRepository _listRepo;
    private readonly TagRepository _tagRepo;
    private readonly WorkerConfig _cfg;
    private readonly string _tempDir;

    /// <summary>
    /// Creates the repositories on top of the fixture database and a unique temp directory
    /// that holds the sandbox and log roots for this test instance.
    /// </summary>
    public QueueServiceTests()
    {
        _taskRepo = new TaskRepository(_db.Factory);
        _listRepo = new ListRepository(_db.Factory);
        _tagRepo = new TagRepository(_db.Factory);
        _tempDir = Path.Combine(Path.GetTempPath(), $"claudedo_test_{Guid.NewGuid():N}");
        Directory.CreateDirectory(_tempDir);
        _cfg = new WorkerConfig
        {
            SandboxRoot = Path.Combine(_tempDir, "sandbox"),
            LogRoot = Path.Combine(_tempDir, "logs"),
            QueueBackstopIntervalMs = 50, // fast for tests
        };
    }

    /// <summary>Disposes the database fixture and removes the temp directory (best effort).</summary>
    public void Dispose()
    {
        try
        {
            _db.Dispose();
        }
        finally
        {
            // Run the directory cleanup even if disposing the fixture throws.
            try
            {
                Directory.Delete(_tempDir, recursive: true);
            }
            catch (Exception ex) when (ex is IOException or UnauthorizedAccessException)
            {
                // Best-effort cleanup: a file that is still locked or already gone must not
                // fail an otherwise passing test.
            }
        }
    }

    /// <summary>The successful result every scripted handler in this class returns.</summary>
    private static RunResult OkResult() => new RunResult { ExitCode = 0, ResultMarkdown = "ok" };

    /// <summary>
    /// Builds a <see cref="QueueService"/> whose runner executes <paramref name="handler"/>
    /// instead of a real Claude process.
    /// </summary>
    /// <param name="handler">
    /// Scripted behaviour for <see cref="FakeClaudeProcess.RunAsync"/>; when <c>null</c> the fake
    /// falls back to its own default handler.
    /// </param>
    /// <returns>The service under test together with the fake so tests can inspect call counts.</returns>
    private (QueueService service, FakeClaudeProcess fakeProcess) CreateService(
        Func<string, string, string, string, Func<string, Task>, CancellationToken, Task<RunResult>>? handler = null)
    {
        var fake = new FakeClaudeProcess(handler);
        var broadcaster = new HubBroadcaster(new FakeHubContext());
        var wtRepo = new WorktreeRepository(_db.Factory);
        var wtManager = new WorktreeManager(new GitService(), wtRepo, _cfg, NullLogger<WorktreeManager>.Instance);
        var runner = new TaskRunner(fake, _taskRepo, _listRepo, broadcaster, wtManager, _cfg,
            NullLogger<TaskRunner>.Instance);
        var service = new QueueService(_taskRepo, runner, _cfg, NullLogger<QueueService>.Instance);
        return (service, fake);
    }

    /// <summary>Inserts a list named "Test" and attaches the existing "agent" tag to it.</summary>
    /// <returns>The new list id and the id of the "agent" tag.</returns>
    private async Task<(string listId, long agentTagId)> SeedListWithAgentTag()
    {
        var listId = Guid.NewGuid().ToString();
        await _listRepo.AddAsync(new ListEntity { Id = listId, Name = "Test", CreatedAt = DateTime.UtcNow });
        var tags = await _tagRepo.GetAllAsync();
        // Presumably the "agent" tag is seeded by the fixture's schema; First throws if it is missing.
        var agentTag = tags.First(t => t.Name == "agent");
        await _listRepo.AddTagAsync(listId, agentTag.Id);
        return (listId, agentTag.Id);
    }

    /// <summary>Inserts a task in <see cref="TaskStatus.Queued"/> state into the given list.</summary>
    /// <param name="listId">Id of the list the task belongs to.</param>
    /// <param name="scheduledFor">Optional schedule time stored on the task.</param>
    /// <param name="createdAt">Creation timestamp; defaults to <see cref="DateTime.UtcNow"/>.</param>
    /// <returns>The entity that was stored.</returns>
    private async Task<TaskEntity> SeedQueuedTask(string listId, DateTime? scheduledFor = null, DateTime? createdAt = null)
    {
        var task = new TaskEntity
        {
            Id = Guid.NewGuid().ToString(),
            ListId = listId,
            Title = "Test task",
            Description = "Do something",
            Status = TaskStatus.Queued,
            ScheduledFor = scheduledFor,
            CreatedAt = createdAt ?? DateTime.UtcNow,
        };
        await _taskRepo.AddAsync(task);
        return task;
    }

    [Fact]
    public async Task RunNow_Throws_When_Override_Slot_Busy()
    {
        var (listId, _) = await SeedListWithAgentTag();
        var tcs = new TaskCompletionSource<RunResult>(TaskCreationOptions.RunContinuationsAsynchronously);
        var (service, _) = CreateService((_, _, _, _, _, _) => tcs.Task);
        var task1 = await SeedQueuedTask(listId);
        var task2 = await SeedQueuedTask(listId);

        // The handler is parked on tcs, so the first run is still in flight for the second call.
        await service.RunNow(task1.Id);
        try
        {
            var ex = await Assert.ThrowsAsync<InvalidOperationException>(() => service.RunNow(task2.Id));
            Assert.Equal("override slot busy", ex.Message);
        }
        finally
        {
            // Always release the parked handler, even when an assertion above fails.
            tcs.TrySetResult(OkResult());
        }
    }

    [Fact]
    public async Task RunNow_Throws_For_Unknown_Task()
    {
        var (service, _) = CreateService();

        await Assert.ThrowsAsync<KeyNotFoundException>(() => service.RunNow("nonexistent"));
    }

    [Fact]
    public async Task Schedule_Filter_Skips_Future_Tasks()
    {
        var (listId, _) = await SeedListWithAgentTag();
        await SeedQueuedTask(listId, scheduledFor: DateTime.UtcNow.AddHours(1));
        var (service, fake) = CreateService((_, _, _, _, _, _) => Task.FromResult(OkResult()));
        using var cts = new CancellationTokenSource();

        // Start the service loop, wake it, give it time.
        await service.StartAsync(cts.Token);
        try
        {
            service.WakeQueue();
            await Task.Delay(SettleDelay);
        }
        finally
        {
            cts.Cancel();
        }

        // The fake should never have been called because the task is scheduled in the future.
        Assert.Equal(0, fake.CallCount);
    }

    [Fact]
    public async Task Queue_FIFO_Sequentiality()
    {
        var (listId, _) = await SeedListWithAgentTag();
        var order = new List<string>();
        var orderGate = new object();
        var gate1 = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
        var gate2 = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
        var callCount = 0;
        var (service, _) = CreateService(async (_, _, _, taskId, _, _) =>
        {
            var n = Interlocked.Increment(ref callCount);
            lock (orderGate)
            {
                order.Add(taskId);
            }

            if (n == 1)
            {
                // Hold the first run open until the test releases it.
                await gate1.Task;
            }

            if (n == 2)
            {
                gate2.TrySetResult();
            }

            return OkResult();
        });

        // The handler appends from another thread, so the test only ever reads a locked snapshot.
        string[] Snapshot()
        {
            lock (orderGate)
            {
                return order.ToArray();
            }
        }

        var task1 = await SeedQueuedTask(listId, createdAt: DateTime.UtcNow.AddSeconds(-2));
        var task2 = await SeedQueuedTask(listId, createdAt: DateTime.UtcNow.AddSeconds(-1));
        using var cts = new CancellationTokenSource();
        await service.StartAsync(cts.Token);
        try
        {
            service.WakeQueue();
            await Task.Delay(SettleDelay);

            // Only task1 should be running (task2 waiting).
            var started = Snapshot();
            Assert.Single(started);
            Assert.Equal(task1.Id, started[0]);

            // Release first task.
            gate1.TrySetResult();

            // Wait for second task to complete.
            await gate2.Task.WaitAsync(SignalTimeout);
            var finished = Snapshot();
            Assert.Equal(2, finished.Length);
            Assert.Equal(task2.Id, finished[1]);
        }
        finally
        {
            // Never leave the first handler parked or the queue loop running after a failure.
            gate1.TrySetResult();
            cts.Cancel();
        }
    }

    [Fact]
    public async Task CancelTask_Triggers_Cancellation()
    {
        var (listId, _) = await SeedListWithAgentTag();
        var running = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
        var cancelObserved = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
        var (service, _) = CreateService(async (_, _, _, _, _, ct) =>
        {
            running.TrySetResult();
            try
            {
                await Task.Delay(Timeout.Infinite, ct);
            }
            catch (OperationCanceledException)
            {
                // Signal the test instead of flipping a flag it would have to poll for.
                cancelObserved.TrySetResult();
                throw;
            }

            return OkResult();
        });
        var task = await SeedQueuedTask(listId);
        await service.RunNow(task.Id);
        await running.Task.WaitAsync(SignalTimeout);

        var result = service.CancelTask(task.Id);

        Assert.True(result);
        // Times out (and fails the test) if the handler's token is never cancelled.
        await cancelObserved.Task.WaitAsync(SignalTimeout);
    }

    [Fact]
    public async Task GetActive_Returns_Running_Slots()
    {
        var (listId, _) = await SeedListWithAgentTag();
        var tcs = new TaskCompletionSource<RunResult>(TaskCreationOptions.RunContinuationsAsynchronously);
        var (service, _) = CreateService((_, _, _, _, _, _) => tcs.Task);
        var task = await SeedQueuedTask(listId);

        await service.RunNow(task.Id);
        try
        {
            var active = service.GetActive();

            Assert.Single(active);
            Assert.Equal("override", active[0].slot);
            Assert.Equal(task.Id, active[0].taskId);
        }
        finally
        {
            // Always release the parked handler, even when an assertion above fails.
            tcs.TrySetResult(OkResult());
        }
    }
}
#region Test doubles
/// <summary>
/// <see cref="IClaudeProcess"/> test double: counts invocations and forwards every call to a
/// scripted delegate.
/// </summary>
internal sealed class FakeClaudeProcess : IClaudeProcess
{
    private readonly Func<string, string, string, string, Func<string, Task>, CancellationToken, Task<RunResult>> _onRun;
    private int _invocations;

    /// <param name="handler">
    /// Delegate executed for each run; when <c>null</c>, runs complete immediately with
    /// exit code 0 and the markdown "ok".
    /// </param>
    public FakeClaudeProcess(
        Func<string, string, string, string, Func<string, Task>, CancellationToken, Task<RunResult>>? handler = null)
    {
        if (handler is null)
        {
            _onRun = SucceedImmediately;
        }
        else
        {
            _onRun = handler;
        }
    }

    /// <summary>Number of times <see cref="RunAsync"/> has been entered.</summary>
    public int CallCount
    {
        get { return _invocations; }
    }

    /// <summary>Records the call, then awaits the scripted delegate with the same arguments.</summary>
    public async Task<RunResult> RunAsync(string prompt, string workingDirectory, string logPath, string taskId,
        Func<string, Task> onStdoutLine, CancellationToken ct)
    {
        // Count before delegating so the call is visible even while the delegate is still pending.
        Interlocked.Increment(ref _invocations);
        var outcome = await _onRun(prompt, workingDirectory, logPath, taskId, onStdoutLine, ct);
        return outcome;
    }

    /// <summary>Default behaviour used when no handler is supplied.</summary>
    private static Task<RunResult> SucceedImmediately(string prompt, string workingDirectory, string logPath,
        string taskId, Func<string, Task> onStdoutLine, CancellationToken ct)
    {
        return Task.FromResult(new RunResult { ExitCode = 0, ResultMarkdown = "ok" });
    }
}
/// <summary>
/// <see cref="IHubContext{THub}"/> test double for <see cref="WorkerHub"/> that exposes a
/// <see cref="FakeHubClients"/> instance; group management is not implemented.
/// </summary>
internal sealed class FakeHubContext : IHubContext<WorkerHub>
{
    public FakeHubContext()
    {
        Clients = new FakeHubClients();
    }

    /// <summary>The fake client collection created with this context.</summary>
    public IHubClients Clients { get; }

    /// <summary>Not supported by this fake; reading it always throws.</summary>
    public IGroupManager Groups
    {
        get { throw new NotImplementedException(); }
    }
}
/// <summary>
/// <see cref="IHubClients"/> test double: every selector returns the same
/// <see cref="FakeClientProxy"/> instance, whatever connection, user or group is requested.
/// </summary>
internal sealed class FakeHubClients : IHubClients
{
    // One proxy per FakeHubClients instance, shared by all selectors below.
    private readonly FakeClientProxy _sharedProxy = new FakeClientProxy();

    // Broadcast selectors.
    public IClientProxy All => _sharedProxy;
    public IClientProxy AllExcept(IReadOnlyList<string> excludedConnectionIds) => _sharedProxy;

    // Connection selectors.
    public IClientProxy Client(string connectionId) => _sharedProxy;
    public IClientProxy Clients(IReadOnlyList<string> connectionIds) => _sharedProxy;

    // Group selectors.
    public IClientProxy Group(string groupName) => _sharedProxy;
    public IClientProxy Groups(IReadOnlyList<string> groupNames) => _sharedProxy;
    public IClientProxy GroupExcept(string groupName, IReadOnlyList<string> excludedConnectionIds) => _sharedProxy;

    // User selectors.
    public IClientProxy User(string userId) => _sharedProxy;
    public IClientProxy Users(IReadOnlyList<string> userIds) => _sharedProxy;
}
/// <summary>
/// <see cref="IClientProxy"/> test double that ignores every message and reports the send as
/// already completed.
/// </summary>
internal sealed class FakeClientProxy : IClientProxy
{
    /// <summary>Ignores all arguments and returns an already-completed task.</summary>
    public Task SendCoreAsync(string method, object?[] args, CancellationToken cancellationToken = default)
    {
        return Task.CompletedTask;
    }
}
#endregion