- Extract FakeClaudeProcess to Infrastructure/FakeClaudeProcess.cs (was defined inline in QueueServiceTests #region); all consumers updated - Replace duplicate FakeHubContext/FakeHubClients/FakeClientProxy (QueueServiceTests) with existing CapturingHubContext from Infrastructure across all 7 affected files; Planning's file-local FakeHubContext kept - Rename SeedListWithAgentTag → SeedListAsync (return Task<string>, drop unused agentTagId tuple element) and SeedListWithAgentTagAsync → SeedListAsync - PrimeRunnerTests keeps its private nested FakeClaudeProcess: constructor API (delay/exitCode/lines/result params) differs from the shared one and replacement would require rewriting every test in that file Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
345 lines
12 KiB
C#
345 lines
12 KiB
C#
using ClaudeDo.Data;
|
|
using ClaudeDo.Data.Git;
|
|
using ClaudeDo.Data.Models;
|
|
using ClaudeDo.Data.Repositories;
|
|
using ClaudeDo.Worker.Config;
|
|
using ClaudeDo.Worker.Hub;
|
|
using ClaudeDo.Worker.Queue;
|
|
using ClaudeDo.Worker.Runner;
|
|
using ClaudeDo.Worker.Tests.Infrastructure;
|
|
using Microsoft.Extensions.Logging.Abstractions;
|
|
using TaskStatus = ClaudeDo.Data.Models.TaskStatus;
|
|
|
|
namespace ClaudeDo.Worker.Tests.Services;
|
|
|
|
public sealed class QueueServiceTests : IDisposable
|
|
{
|
|
private readonly DbFixture _db = new();
|
|
private readonly ClaudeDoDbContext _ctx;
|
|
private readonly TaskRepository _taskRepo;
|
|
private readonly ListRepository _listRepo;
|
|
private readonly WorkerConfig _cfg;
|
|
private readonly string _tempDir;
|
|
|
|
/// <summary>
/// Prepares the per-test state: a database context with task/list repositories on top of it,
/// a uniquely named temp directory, and a worker configuration whose sandbox and log roots
/// live inside that directory.
/// </summary>
public QueueServiceTests()
{
    _ctx = _db.CreateContext();
    _taskRepo = new TaskRepository(_ctx);
    _listRepo = new ListRepository(_ctx);

    // Each test instance gets its own directory so runs cannot collide on disk.
    var uniqueFolder = $"claudedo_test_{Guid.NewGuid():N}";
    _tempDir = Path.Combine(Path.GetTempPath(), uniqueFolder);
    Directory.CreateDirectory(_tempDir);

    var sandboxRoot = Path.Combine(_tempDir, "sandbox");
    var logRoot = Path.Combine(_tempDir, "logs");

    _cfg = new WorkerConfig
    {
        SandboxRoot = sandboxRoot,
        LogRoot = logRoot,
        // Short backstop interval keeps the tests fast.
        QueueBackstopIntervalMs = 50,
    };
}
|
|
|
|
/// <summary>
/// Tears down the per-test state: disposes the database context and fixture, then removes
/// the temp directory created by the constructor.
/// </summary>
public void Dispose()
{
    _ctx.Dispose();
    _db.Dispose();

    try
    {
        if (Directory.Exists(_tempDir))
        {
            Directory.Delete(_tempDir, recursive: true);
        }
    }
    catch (Exception ex) when (ex is IOException or UnauthorizedAccessException)
    {
        // Best-effort cleanup: a file that is still locked or read-only must not fail the
        // test. Only filesystem failures are ignored; anything else surfaces.
    }
}
|
|
|
|
private QueueWaker _waker = null!;
|
|
|
|
/// <summary>
/// Wires up a <see cref="QueueService"/> with a <see cref="FakeClaudeProcess"/> in place of
/// the real process and returns both. Also replaces <c>_waker</c> with the fresh
/// <see cref="QueueWaker"/> handed to the service, so tests can wake the queue loop.
/// </summary>
/// <param name="handler">
/// Optional callback passed to the <see cref="FakeClaudeProcess"/> constructor.
/// </param>
/// <returns>The constructed service together with the fake process it runs against.</returns>
private (QueueService service, FakeClaudeProcess fakeProcess) CreateService(
    Func<string, string, IReadOnlyList<string>, Func<string, Task>, CancellationToken, Task<RunResult>>? handler = null)
{
    var claude = new FakeClaudeProcess(handler);
    var hub = new HubBroadcaster(new CapturingHubContext());
    var contextFactory = _db.CreateFactory();

    var worktrees = new WorktreeManager(
        new GitService(),
        contextFactory,
        _cfg,
        NullLogger<WorktreeManager>.Instance);
    var claudeArgs = new ClaudeArgsBuilder();
    var taskState = TaskStateServiceBuilder.Build(contextFactory).State;

    var taskRunner = new TaskRunner(
        claude,
        contextFactory,
        hub,
        worktrees,
        claudeArgs,
        _cfg,
        NullLogger<TaskRunner>.Instance,
        taskState,
        new TaskRunTokenRegistry());

    // Stored on the fixture so individual tests can call _waker.Wake().
    _waker = new QueueWaker();

    var queuePicker = new QueuePicker(contextFactory);
    var overrides = new OverrideSlotService(
        contextFactory,
        taskRunner,
        NullLogger<OverrideSlotService>.Instance);

    var queue = new QueueService(
        contextFactory,
        taskRunner,
        _cfg,
        NullLogger<QueueService>.Instance,
        _waker,
        queuePicker,
        overrides,
        taskState);

    return (queue, claude);
}
|
|
|
|
/// <summary>
/// Inserts a list named "Test" with a freshly generated id through the list repository.
/// </summary>
/// <returns>The id of the inserted list.</returns>
private async Task<string> SeedListAsync()
{
    var newListId = Guid.NewGuid().ToString();

    var list = new ListEntity
    {
        Id = newListId,
        Name = "Test",
        CreatedAt = DateTime.UtcNow,
    };
    await _listRepo.AddAsync(list);

    return newListId;
}
|
|
|
|
/// <summary>
/// Inserts a task with status <see cref="TaskStatus.Queued"/> into the given list.
/// </summary>
/// <param name="listId">Id of the list the task belongs to.</param>
/// <param name="scheduledFor">Optional value stored in <c>ScheduledFor</c>.</param>
/// <param name="createdAt">Creation timestamp; the current UTC time is used when omitted.</param>
/// <returns>The entity that was passed to the task repository.</returns>
private async Task<TaskEntity> SeedQueuedTask(string listId, DateTime? scheduledFor = null, DateTime? createdAt = null)
{
    var queued = new TaskEntity
    {
        Id = Guid.NewGuid().ToString(),
        ListId = listId,
        Title = "Test task",
        Description = "Do something",
        Status = TaskStatus.Queued,
        ScheduledFor = scheduledFor,
        CreatedAt = createdAt.HasValue ? createdAt.Value : DateTime.UtcNow,
    };

    await _taskRepo.AddAsync(queued);
    return queued;
}
|
|
|
|
/// <summary>
/// While a first RunNow call is still executing (the fake process is parked on a gate),
/// a second RunNow call must throw <see cref="InvalidOperationException"/> with the message
/// "override slot busy".
/// </summary>
[Fact]
public async Task RunNow_Throws_When_Override_Slot_Busy()
{
    var listId = await SeedListAsync();
    var tcs = new TaskCompletionSource<RunResult>();

    // The fake process stays pending until the gate is completed.
    var (service, _) = CreateService((_, _, _, _, _) => tcs.Task);

    var task1 = await SeedQueuedTask(listId);
    var task2 = await SeedQueuedTask(listId);

    try
    {
        await service.RunNow(task1.Id);

        var ex = await Assert.ThrowsAsync<InvalidOperationException>(() => service.RunNow(task2.Id));
        Assert.Equal("override slot busy", ex.Message);
    }
    finally
    {
        // Always release the parked run, even when an assertion above failed, so the
        // fake process does not stay pending after the test has finished.
        tcs.TrySetResult(new RunResult { ExitCode = 0, ResultMarkdown = "ok" });
    }
}
|
|
|
|
/// <summary>
/// RunNow with an id that was never seeded must throw <see cref="KeyNotFoundException"/>.
/// </summary>
[Fact]
public async Task RunNow_Throws_For_Unknown_Task()
{
    var (queue, _) = CreateService();

    Func<Task> runMissingTask = () => queue.RunNow("nonexistent");

    await Assert.ThrowsAsync<KeyNotFoundException>(runMissingTask);
}
|
|
|
|
/// <summary>
/// A queued task that carries review feedback and has a prior run with a session id must be
/// executed with "--resume" and that session id in the arguments and the feedback text as
/// the prompt. Afterwards the task must be in <see cref="TaskStatus.WaitingForReview"/> with
/// its feedback cleared.
/// </summary>
[Fact]
public async Task ReQueuedReviewTask_ResumesSession_WithFeedbackPrompt_AndClearsFeedback()
{
    var listId = await SeedListAsync();

    IReadOnlyList<string>? capturedArgs = null;
    string? capturedPrompt = null;
    // Completed from inside the fake-process handler; continuations run asynchronously so
    // the test body never resumes on the handler's call stack.
    var done = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);

    var (service, _) = CreateService((prompt, _, args, _, _) =>
    {
        capturedPrompt = prompt;
        capturedArgs = args;
        done.TrySetResult();
        return Task.FromResult(new RunResult { ExitCode = 0, SessionId = "sess-2", ResultMarkdown = "ok" });
    });

    // A task that was reviewed and rejected: Queued + ReviewFeedback, with a prior run carrying a session id.
    var task = new TaskEntity
    {
        Id = Guid.NewGuid().ToString(),
        ListId = listId,
        Title = "Reviewed task",
        Status = TaskStatus.Queued,
        ReviewFeedback = "fix the bug",
        CreatedAt = DateTime.UtcNow,
    };
    await _taskRepo.AddAsync(task);
    await new TaskRunRepository(_ctx).AddAsync(new TaskRunEntity
    {
        Id = Guid.NewGuid().ToString(),
        TaskId = task.Id,
        RunNumber = 1,
        IsRetry = false,
        Prompt = "original",
        SessionId = "sess-1",
        StartedAt = DateTime.UtcNow.AddMinutes(-1),
    });

    using var cts = new CancellationTokenSource();
    TaskEntity? reloaded = null;
    try
    {
        await service.StartAsync(cts.Token);
        _waker.Wake();
        await done.Task.WaitAsync(TimeSpan.FromSeconds(5));

        Assert.NotNull(capturedArgs);
        Assert.Contains("--resume", capturedArgs);
        Assert.Contains("sess-1", capturedArgs);
        Assert.Equal("fix the bug", capturedPrompt);

        // Feedback is cleared after the run reaches a successful terminal state (post-run),
        // so poll rather than asserting on the handler-fired instant.
        var deadline = DateTime.UtcNow.AddSeconds(5);
        do
        {
            // A fresh context per poll, disposed right after the read so contexts do not
            // pile up across iterations.
            using (var pollContext = _db.CreateContext())
            {
                reloaded = await new TaskRepository(pollContext).GetByIdAsync(task.Id);
            }

            if (reloaded?.ReviewFeedback is null)
            {
                break;
            }

            await Task.Delay(25);
        } while (DateTime.UtcNow < deadline);
    }
    finally
    {
        // Signal the service's token even when an assertion or timeout above threw.
        cts.Cancel();
    }

    Assert.NotNull(reloaded);
    Assert.Equal(TaskStatus.WaitingForReview, reloaded.Status);
    Assert.Null(reloaded.ReviewFeedback);
}
|
|
|
|
/// <summary>
/// A queued task whose <c>ScheduledFor</c> lies one hour in the future must not be handed
/// to the process while the queue loop is running.
/// </summary>
[Fact]
public async Task Schedule_Filter_Skips_Future_Tasks()
{
    var listId = await SeedListAsync();
    var oneHourAhead = DateTime.UtcNow.AddHours(1);
    await SeedQueuedTask(listId, scheduledFor: oneHourAhead);

    var (queue, process) = CreateService((_, _, _, _, _) =>
    {
        var success = new RunResult { ExitCode = 0, ResultMarkdown = "ok" };
        return Task.FromResult(success);
    });

    using var stopSource = new CancellationTokenSource();

    // Run the loop, nudge it once, and leave it a short window in which it could
    // (wrongly) pick the task up.
    await queue.StartAsync(stopSource.Token);
    _waker.Wake();
    await Task.Delay(200);
    stopSource.Cancel();

    // Scheduled in the future, so the fake process must never have been invoked.
    Assert.Equal(0, process.CallCount);
}
|
|
|
|
/// <summary>
/// With two queued tasks, the second invocation of the process must not start until the
/// first one has been released.
/// </summary>
[Fact]
public async Task Queue_FIFO_Sequentiality()
{
    var listId = await SeedListAsync();

    var order = new List<string>();
    var gate1 = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
    var gate2 = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
    var callCount = 0;

    // The handler appends to `order` under a lock on another thread; every read on the
    // test side takes the same lock and works on a copy.
    List<string> Snapshot()
    {
        lock (order)
        {
            return new List<string>(order);
        }
    }

    var (service, _) = CreateService(async (_, _, _, _, _) =>
    {
        var n = Interlocked.Increment(ref callCount);
        lock (order) { order.Add(n.ToString()); }
        if (n == 1) await gate1.Task;
        if (n == 2) gate2.TrySetResult();
        return new RunResult { ExitCode = 0, ResultMarkdown = "ok" };
    });

    await SeedQueuedTask(listId, createdAt: DateTime.UtcNow.AddSeconds(-2));
    await SeedQueuedTask(listId, createdAt: DateTime.UtcNow.AddSeconds(-1));

    using var cts = new CancellationTokenSource();
    try
    {
        await service.StartAsync(cts.Token);
        _waker.Wake();

        // Wait until task1 has been picked up (poll instead of fixed delay to avoid flake under load).
        var deadline = DateTime.UtcNow.AddSeconds(5);
        while (Snapshot().Count == 0 && DateTime.UtcNow < deadline)
        {
            await Task.Delay(20);
        }

        // Only task1 should be running (task2 waiting on the queue slot).
        var started = Snapshot();
        Assert.Single(started);
        Assert.Equal("1", started[0]);

        // Release first task.
        gate1.SetResult();

        // Wait for second task to complete.
        await gate2.Task.WaitAsync(TimeSpan.FromSeconds(5));

        var finished = Snapshot();
        Assert.Equal(2, finished.Count);
        Assert.Equal("2", finished[1]);
    }
    finally
    {
        // Release the first run and signal the service's token even when an assertion
        // above failed, so nothing stays parked after the test.
        gate1.TrySetResult();
        cts.Cancel();
    }
}
|
|
|
|
/// <summary>
/// CancelTask on a running task must return <c>true</c> and cancel the token passed to the
/// process.
/// </summary>
[Fact]
public async Task CancelTask_Triggers_Cancellation()
{
    var listId = await SeedListAsync();

    // Both signals are completed from the handler; awaiting them replaces a fixed sleep
    // plus an unsynchronised cross-thread bool read.
    var running = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
    var cancelled = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);

    var (service, _) = CreateService(async (_, _, _, _, ct) =>
    {
        running.TrySetResult();
        try
        {
            // Parks until the token is cancelled.
            await Task.Delay(Timeout.Infinite, ct);
        }
        catch (OperationCanceledException)
        {
            cancelled.TrySetResult();
            throw;
        }
        return new RunResult { ExitCode = 0, ResultMarkdown = "ok" };
    });

    var task = await SeedQueuedTask(listId);
    await service.RunNow(task.Id);

    await running.Task.WaitAsync(TimeSpan.FromSeconds(5));

    var result = service.CancelTask(task.Id);
    Assert.True(result);

    // Throws TimeoutException (failing the test) if the handler never observes cancellation.
    await cancelled.Task.WaitAsync(TimeSpan.FromSeconds(5));
    Assert.True(cancelled.Task.IsCompletedSuccessfully);
}
|
|
|
|
/// <summary>
/// When the first run fails (exit code 1) but reports a session id, the process must be
/// invoked a second time, and after that successful run the task must end up in
/// <see cref="TaskStatus.WaitingForReview"/>.
/// </summary>
[Fact]
public async Task RunNow_AutoRetries_On_Failure_With_SessionId()
{
    var listId = await SeedListAsync();
    var task = await SeedQueuedTask(listId);

    var callCount = 0;
    var (service, _) = CreateService((_, _, _, _, _) =>
    {
        // Atomic increment: the handler is not guaranteed to run on the test thread.
        var attempt = Interlocked.Increment(ref callCount);
        if (attempt == 1)
        {
            return Task.FromResult(new RunResult
            {
                ExitCode = 1,
                ErrorMarkdown = "something broke",
                SessionId = "sess-retry-test",
            });
        }
        return Task.FromResult(new RunResult
        {
            ExitCode = 0,
            ResultMarkdown = "fixed it",
            SessionId = "sess-retry-test",
        });
    });

    await service.StartAsync(CancellationToken.None);

    TaskEntity? finalTask = null;
    try
    {
        await service.RunNow(task.Id);

        // Poll until both runs have happened and the final status is persisted, instead of
        // sleeping for a fixed interval.
        var deadline = DateTime.UtcNow.AddSeconds(10);
        do
        {
            // Fresh context per poll, disposed right after the read.
            using (var pollContext = _db.CreateContext())
            {
                finalTask = await new TaskRepository(pollContext).GetByIdAsync(task.Id);
            }

            if (Volatile.Read(ref callCount) >= 2 && finalTask?.Status == TaskStatus.WaitingForReview)
            {
                break;
            }

            await Task.Delay(25);
        } while (DateTime.UtcNow < deadline);
    }
    finally
    {
        // Stop the service even when RunNow or the polling above threw.
        await service.StopAsync(CancellationToken.None);
    }

    Assert.Equal(2, Volatile.Read(ref callCount));

    Assert.NotNull(finalTask);
    // A standalone task that completes successfully now gates on review.
    Assert.Equal(TaskStatus.WaitingForReview, finalTask.Status);
}
|
|
|
|
/// <summary>
/// While a RunNow task is still executing, GetActive must report exactly one entry whose
/// slot is "override" and whose task id matches the running task.
/// </summary>
[Fact]
public async Task GetActive_Returns_Running_Slots()
{
    var listId = await SeedListAsync();
    var tcs = new TaskCompletionSource<RunResult>();

    // The fake process stays pending until the gate is completed.
    var (service, _) = CreateService((_, _, _, _, _) => tcs.Task);

    var task = await SeedQueuedTask(listId);

    try
    {
        await service.RunNow(task.Id);

        var active = service.GetActive();
        Assert.Single(active);
        Assert.Equal("override", active[0].slot);
        Assert.Equal(task.Id, active[0].taskId);
    }
    finally
    {
        // Always release the parked run, even when an assertion above failed.
        tcs.TrySetResult(new RunResult { ExitCode = 0, ResultMarkdown = "ok" });
    }
}
|
|
}
|