diff --git a/src/ClaudeDo.Data/Repositories/TagRepository.cs b/src/ClaudeDo.Data/Repositories/TagRepository.cs index d73d9b7..d15fa0e 100644 --- a/src/ClaudeDo.Data/Repositories/TagRepository.cs +++ b/src/ClaudeDo.Data/Repositories/TagRepository.cs @@ -1,3 +1,4 @@ +using ClaudeDo.Data.Models; using Microsoft.Data.Sqlite; namespace ClaudeDo.Data.Repositories; @@ -8,6 +9,19 @@ public sealed class TagRepository public TagRepository(SqliteConnectionFactory factory) => _factory = factory; + public async Task> GetAllAsync(CancellationToken ct = default) + { + await using var conn = _factory.Open(); + await using var cmd = conn.CreateCommand(); + cmd.CommandText = "SELECT id, name FROM tags ORDER BY id"; + + await using var reader = await cmd.ExecuteReaderAsync(ct); + var result = new List(); + while (await reader.ReadAsync(ct)) + result.Add(new TagEntity { Id = reader.GetInt64(0), Name = reader.GetString(1) }); + return result; + } + public async Task GetOrCreateAsync(string name, CancellationToken ct = default) { await using var conn = _factory.Open(); diff --git a/src/ClaudeDo.Data/Repositories/TaskRepository.cs b/src/ClaudeDo.Data/Repositories/TaskRepository.cs index 5bbb404..8db5b23 100644 --- a/src/ClaudeDo.Data/Repositories/TaskRepository.cs +++ b/src/ClaudeDo.Data/Repositories/TaskRepository.cs @@ -202,6 +202,16 @@ public sealed class TaskRepository #region Transitions + public async Task SetLogPathAsync(string taskId, string logPath, CancellationToken ct = default) + { + await using var conn = _factory.Open(); + await using var cmd = conn.CreateCommand(); + cmd.CommandText = "UPDATE tasks SET log_path = @log_path WHERE id = @id"; + cmd.Parameters.AddWithValue("@id", taskId); + cmd.Parameters.AddWithValue("@log_path", logPath); + await cmd.ExecuteNonQueryAsync(ct); + } + public async Task MarkRunningAsync(string taskId, DateTime startedAt, CancellationToken ct = default) { await using var conn = _factory.Open(); diff --git a/src/ClaudeDo.Worker/Hub/HubBroadcaster.cs b/src/ClaudeDo.Worker/Hub/HubBroadcaster.cs new file mode 100644 index 0000000..d46d4bb --- /dev/null +++ b/src/ClaudeDo.Worker/Hub/HubBroadcaster.cs @@ -0,0 +1,25 @@ +using Microsoft.AspNetCore.SignalR; + +namespace ClaudeDo.Worker.Hub; + +public sealed class HubBroadcaster +{ + private readonly IHubContext _hub; + + public HubBroadcaster(IHubContext hub) => _hub = hub; + + public Task TaskStarted(string slot, string taskId, DateTime startedAt) => + _hub.Clients.All.SendAsync("TaskStarted", slot, taskId, startedAt); + + public Task TaskFinished(string slot, string taskId, string status, DateTime finishedAt) => + _hub.Clients.All.SendAsync("TaskFinished", slot, taskId, status, finishedAt); + + public Task TaskMessage(string taskId, string ndjsonLine) => + _hub.Clients.All.SendAsync("TaskMessage", taskId, ndjsonLine); + + public Task WorktreeUpdated(string taskId) => + _hub.Clients.All.SendAsync("WorktreeUpdated", taskId); + + public Task TaskUpdated(string taskId) => + _hub.Clients.All.SendAsync("TaskUpdated", taskId); +} diff --git a/src/ClaudeDo.Worker/Hub/WorkerHub.cs b/src/ClaudeDo.Worker/Hub/WorkerHub.cs index 24d32ab..6acbea3 100644 --- a/src/ClaudeDo.Worker/Hub/WorkerHub.cs +++ b/src/ClaudeDo.Worker/Hub/WorkerHub.cs @@ -1,16 +1,44 @@ using System.Reflection; +using ClaudeDo.Worker.Services; using Microsoft.AspNetCore.SignalR; namespace ClaudeDo.Worker.Hub; -/// -/// SignalR hub the UI connects to. Only is implemented at this stage; -/// RunNow/CancelTask/WakeQueue/GetActive land here once QueueService exists. -/// public sealed class WorkerHub : Microsoft.AspNetCore.SignalR.Hub { private static readonly string Version = Assembly.GetExecutingAssembly().GetName().Version?.ToString(3) ?? "0.0.0"; + private readonly QueueService _queue; + + public WorkerHub(QueueService queue) => _queue = queue; + public string Ping() => $"pong v{Version}"; + + public IReadOnlyList GetActive() + { + return _queue.GetActive() + .Select(a => (object)new { slot = a.slot, taskId = a.taskId, startedAt = a.startedAt }) + .ToList(); + } + + public async Task RunNow(string taskId) + { + try + { + await _queue.RunNow(taskId); + } + catch (InvalidOperationException) + { + throw new HubException("override slot busy"); + } + catch (KeyNotFoundException) + { + throw new HubException("task not found"); + } + } + + public bool CancelTask(string taskId) => _queue.CancelTask(taskId); + + public void WakeQueue() => _queue.WakeQueue(); } diff --git a/src/ClaudeDo.Worker/Program.cs b/src/ClaudeDo.Worker/Program.cs index a3e06ac..0f3d109 100644 --- a/src/ClaudeDo.Worker/Program.cs +++ b/src/ClaudeDo.Worker/Program.cs @@ -2,6 +2,7 @@ using ClaudeDo.Data; using ClaudeDo.Data.Repositories; using ClaudeDo.Worker.Config; using ClaudeDo.Worker.Hub; +using ClaudeDo.Worker.Runner; using ClaudeDo.Worker.Services; var cfg = WorkerConfig.Load(); @@ -21,6 +22,15 @@ builder.Services.AddSingleton(); builder.Services.AddHostedService(); builder.Services.AddSignalR(); +// Runner stack. +builder.Services.AddSingleton(); +builder.Services.AddSingleton(); +builder.Services.AddSingleton(); + +// QueueService: singleton + hosted service (same instance). +builder.Services.AddSingleton(); +builder.Services.AddHostedService(sp => sp.GetRequiredService()); + // Loopback-only bind. Firewall is irrelevant for 127.0.0.1. builder.WebHost.UseUrls($"http://127.0.0.1:{cfg.SignalRPort}"); diff --git a/src/ClaudeDo.Worker/Runner/ClaudeProcess.cs b/src/ClaudeDo.Worker/Runner/ClaudeProcess.cs new file mode 100644 index 0000000..1a4d0a5 --- /dev/null +++ b/src/ClaudeDo.Worker/Runner/ClaudeProcess.cs @@ -0,0 +1,96 @@ +using System.Diagnostics; +using System.Text; +using ClaudeDo.Worker.Config; + +namespace ClaudeDo.Worker.Runner; + +public sealed class ClaudeProcess : IClaudeProcess +{ + private readonly WorkerConfig _cfg; + private readonly ILogger _logger; + + public ClaudeProcess(WorkerConfig cfg, ILogger logger) + { + _cfg = cfg; + _logger = logger; + } + + public async Task RunAsync( + string prompt, + string workingDirectory, + string logPath, + string taskId, + Func onStdoutLine, + CancellationToken ct) + { + var psi = new ProcessStartInfo + { + FileName = _cfg.ClaudeBin, + Arguments = "-p --output-format stream-json --verbose --dangerously-skip-permissions", + WorkingDirectory = workingDirectory, + RedirectStandardInput = true, + RedirectStandardOutput = true, + RedirectStandardError = true, + UseShellExecute = false, + CreateNoWindow = true, + StandardOutputEncoding = Encoding.UTF8, + StandardErrorEncoding = Encoding.UTF8, + }; + + using var process = new Process { StartInfo = psi }; + process.Start(); + + // Write prompt to stdin, then close. + await process.StandardInput.WriteAsync(prompt); + process.StandardInput.Close(); + + string? resultMarkdown = null; + var lastStderr = new StringBuilder(); + + // Register cancellation to kill the process tree. + await using var ctr = ct.Register(() => + { + try { process.Kill(entireProcessTree: true); } + catch { /* already exited */ } + }); + + // Read stdout and stderr concurrently. + var stdoutTask = Task.Run(async () => + { + while (await process.StandardOutput.ReadLineAsync(ct) is { } line) + { + if (string.IsNullOrEmpty(line)) continue; + await onStdoutLine(line); + + if (MessageParser.TryExtractResult(line, out var res)) + resultMarkdown = res; + } + }, ct); + + var stderrTask = Task.Run(async () => + { + while (await process.StandardError.ReadLineAsync(ct) is { } line) + { + if (string.IsNullOrEmpty(line)) continue; + lastStderr.AppendLine(line); + await onStdoutLine($"[stderr] {line}"); + } + }, ct); + + await Task.WhenAll(stdoutTask, stderrTask); + await process.WaitForExitAsync(ct); + + var exitCode = process.ExitCode; + + if (exitCode == 0 && resultMarkdown is not null) + { + return new RunResult { ExitCode = exitCode, ResultMarkdown = resultMarkdown }; + } + + var error = lastStderr.Length > 0 + ? lastStderr.ToString().Trim() + : $"Claude exited with code {exitCode} and no result."; + + return new RunResult { ExitCode = exitCode, ErrorMarkdown = error }; + } +} diff --git a/src/ClaudeDo.Worker/Runner/IClaudeProcess.cs b/src/ClaudeDo.Worker/Runner/IClaudeProcess.cs new file mode 100644 index 0000000..e3fb057 --- /dev/null +++ b/src/ClaudeDo.Worker/Runner/IClaudeProcess.cs @@ -0,0 +1,12 @@ +namespace ClaudeDo.Worker.Runner; + +public interface IClaudeProcess +{ + Task RunAsync( + string prompt, + string workingDirectory, + string logPath, + string taskId, + Func onStdoutLine, + CancellationToken ct); +} diff --git a/src/ClaudeDo.Worker/Runner/LogWriter.cs b/src/ClaudeDo.Worker/Runner/LogWriter.cs new file mode 100644 index 0000000..a49b35c --- /dev/null +++ b/src/ClaudeDo.Worker/Runner/LogWriter.cs @@ -0,0 +1,26 @@ +namespace ClaudeDo.Worker.Runner; + +public sealed class LogWriter : IAsyncDisposable +{ + private readonly StreamWriter _writer; + + public LogWriter(string filePath) + { + var dir = Path.GetDirectoryName(filePath); + if (dir is not null) + Directory.CreateDirectory(dir); + + _writer = new StreamWriter(filePath, append: true) { AutoFlush = true }; + } + + public async Task WriteLineAsync(string line, CancellationToken ct = default) + { + ct.ThrowIfCancellationRequested(); + await _writer.WriteLineAsync(line.AsMemory(), ct); + } + + public async ValueTask DisposeAsync() + { + await _writer.DisposeAsync(); + } +} diff --git a/src/ClaudeDo.Worker/Runner/MessageParser.cs b/src/ClaudeDo.Worker/Runner/MessageParser.cs new file mode 100644 index 0000000..9343569 --- /dev/null +++ b/src/ClaudeDo.Worker/Runner/MessageParser.cs @@ -0,0 +1,33 @@ +using System.Text.Json; + +namespace ClaudeDo.Worker.Runner; + +public static class MessageParser +{ + public static bool TryExtractResult(string ndjsonLine, out string? result) + { + result = null; + if (string.IsNullOrWhiteSpace(ndjsonLine)) + return false; + + try + { + using var doc = JsonDocument.Parse(ndjsonLine); + var root = doc.RootElement; + + if (root.TryGetProperty("type", out var typeProp) && + typeProp.GetString() == "result" && + root.TryGetProperty("result", out var resultProp)) + { + result = resultProp.GetString(); + return true; + } + } + catch (JsonException) + { + // Malformed JSON — not a result line. + } + + return false; + } +} diff --git a/src/ClaudeDo.Worker/Runner/RunResult.cs b/src/ClaudeDo.Worker/Runner/RunResult.cs new file mode 100644 index 0000000..bed0cff --- /dev/null +++ b/src/ClaudeDo.Worker/Runner/RunResult.cs @@ -0,0 +1,10 @@ +namespace ClaudeDo.Worker.Runner; + +public sealed class RunResult +{ + public required int ExitCode { get; init; } + public string? ResultMarkdown { get; init; } + public string? ErrorMarkdown { get; init; } + + public bool IsSuccess => ExitCode == 0 && ResultMarkdown is not null; +} diff --git a/src/ClaudeDo.Worker/Runner/TaskRunner.cs b/src/ClaudeDo.Worker/Runner/TaskRunner.cs new file mode 100644 index 0000000..93f3f0e --- /dev/null +++ b/src/ClaudeDo.Worker/Runner/TaskRunner.cs @@ -0,0 +1,123 @@ +using ClaudeDo.Data.Repositories; +using ClaudeDo.Worker.Config; +using ClaudeDo.Worker.Hub; + +namespace ClaudeDo.Worker.Runner; + +public sealed class TaskRunner +{ + private readonly IClaudeProcess _claude; + private readonly TaskRepository _taskRepo; + private readonly ListRepository _listRepo; + private readonly HubBroadcaster _broadcaster; + private readonly WorkerConfig _cfg; + private readonly ILogger _logger; + + public TaskRunner( + IClaudeProcess claude, + TaskRepository taskRepo, + ListRepository listRepo, + HubBroadcaster broadcaster, + WorkerConfig cfg, + ILogger logger) + { + _claude = claude; + _taskRepo = taskRepo; + _listRepo = listRepo; + _broadcaster = broadcaster; + _cfg = cfg; + _logger = logger; + } + + public async Task RunAsync(Data.Models.TaskEntity task, string slot, CancellationToken ct) + { + try + { + var list = await _listRepo.GetByIdAsync(task.ListId, ct); + if (list is null) + { + await MarkFailed(task.Id, slot, "List not found."); + return; + } + + // Slice D: worktree mode not yet implemented. + if (list.WorkingDir is not null) + { + await MarkFailed(task.Id, slot, "Worktree mode not implemented yet (Slice E)"); + return; + } + + // Non-worktree sandbox path. + var sandboxDir = Path.Combine(_cfg.SandboxRoot, task.Id); + Directory.CreateDirectory(sandboxDir); + + var logPath = Path.Combine(_cfg.LogRoot, $"{task.Id}.ndjson"); + + await _taskRepo.SetLogPathAsync(task.Id, logPath, ct); + var now = DateTime.UtcNow; + await _taskRepo.MarkRunningAsync(task.Id, now, ct); + await _broadcaster.TaskStarted(slot, task.Id, now); + + // Build prompt. + var prompt = string.IsNullOrWhiteSpace(task.Description) + ? task.Title + : $"{task.Title}\n\n{task.Description.Trim()}"; + + await using var logWriter = new LogWriter(logPath); + + var result = await _claude.RunAsync( + prompt, + sandboxDir, + logPath, + task.Id, + async line => + { + await logWriter.WriteLineAsync(line, ct); + await _broadcaster.TaskMessage(task.Id, line); + }, + ct); + + var finishedAt = DateTime.UtcNow; + + if (result.IsSuccess) + { + await _taskRepo.MarkDoneAsync(task.Id, finishedAt, result.ResultMarkdown, ct); + await _broadcaster.TaskFinished(slot, task.Id, "done", finishedAt); + _logger.LogInformation("Task {TaskId} completed successfully", task.Id); + } + else + { + await _taskRepo.MarkFailedAsync(task.Id, finishedAt, result.ErrorMarkdown, ct); + await _broadcaster.TaskFinished(slot, task.Id, "failed", finishedAt); + _logger.LogWarning("Task {TaskId} failed: {Error}", task.Id, result.ErrorMarkdown); + } + + await _broadcaster.TaskUpdated(task.Id); + } + catch (OperationCanceledException) + { + _logger.LogInformation("Task {TaskId} was cancelled", task.Id); + await MarkFailed(task.Id, slot, "Task cancelled."); + } + catch (Exception ex) + { + _logger.LogError(ex, "Unhandled exception running task {TaskId}", task.Id); + await MarkFailed(task.Id, slot, $"Unhandled error: {ex.Message}"); + } + } + + private async Task MarkFailed(string taskId, string slot, string error) + { + try + { + var now = DateTime.UtcNow; + await _taskRepo.MarkFailedAsync(taskId, now, error); + await _broadcaster.TaskFinished(slot, taskId, "failed", now); + await _broadcaster.TaskUpdated(taskId); + } + catch (Exception ex) + { + _logger.LogError(ex, "Failed to mark task {TaskId} as failed", taskId); + } + } +} diff --git a/src/ClaudeDo.Worker/Services/QueueService.cs b/src/ClaudeDo.Worker/Services/QueueService.cs new file mode 100644 index 0000000..19f8bed --- /dev/null +++ b/src/ClaudeDo.Worker/Services/QueueService.cs @@ -0,0 +1,162 @@ +using ClaudeDo.Data.Models; +using ClaudeDo.Data.Repositories; +using ClaudeDo.Worker.Config; +using ClaudeDo.Worker.Runner; + +namespace ClaudeDo.Worker.Services; + +public sealed class QueueSlotState +{ + public required string TaskId { get; init; } + public required DateTime StartedAt { get; init; } + public required CancellationTokenSource Cts { get; init; } +} + +public sealed class QueueService : BackgroundService +{ + private readonly TaskRepository _taskRepo; + private readonly TaskRunner _runner; + private readonly WorkerConfig _cfg; + private readonly ILogger _logger; + + private readonly object _lock = new(); + private volatile QueueSlotState? _queueSlot; + private volatile QueueSlotState? _overrideSlot; + + private readonly SemaphoreSlim _wakeSignal = new(0, 1); + + public QueueService( + TaskRepository taskRepo, + TaskRunner runner, + WorkerConfig cfg, + ILogger logger) + { + _taskRepo = taskRepo; + _runner = runner; + _cfg = cfg; + _logger = logger; + } + + public IReadOnlyList<(string slot, string taskId, DateTime startedAt)> GetActive() + { + var list = new List<(string, string, DateTime)>(); + var q = _queueSlot; + if (q is not null) list.Add(("queue", q.TaskId, q.StartedAt)); + var o = _overrideSlot; + if (o is not null) list.Add(("override", o.TaskId, o.StartedAt)); + return list; + } + + public void WakeQueue() + { + // Release if not already signalled. + try { _wakeSignal.Release(); } + catch (SemaphoreFullException) { /* already signalled */ } + } + + public async Task RunNow(string taskId) + { + var task = await _taskRepo.GetByIdAsync(taskId); + if (task is null) + throw new KeyNotFoundException($"Task '{taskId}' not found."); + + lock (_lock) + { + if (_overrideSlot is not null) + throw new InvalidOperationException("override slot busy"); + + var cts = new CancellationTokenSource(); + _overrideSlot = new QueueSlotState { TaskId = taskId, StartedAt = DateTime.UtcNow, Cts = cts }; + + _ = RunInSlotAsync(task, "override", cts.Token).ContinueWith(_ => + { + lock (_lock) { _overrideSlot = null; } + }, TaskScheduler.Default); + } + } + + public bool CancelTask(string taskId) + { + lock (_lock) + { + if (_queueSlot is not null && _queueSlot.TaskId == taskId) + { + _queueSlot.Cts.Cancel(); + return true; + } + if (_overrideSlot is not null && _overrideSlot.TaskId == taskId) + { + _overrideSlot.Cts.Cancel(); + return true; + } + } + return false; + } + + protected override async Task ExecuteAsync(CancellationToken stoppingToken) + { + _logger.LogInformation("QueueService started"); + + using var timer = new PeriodicTimer(TimeSpan.FromMilliseconds(_cfg.QueueBackstopIntervalMs)); + + while (!stoppingToken.IsCancellationRequested) + { + try + { + // Wait for wake signal or backstop timer. + var wakeTask = _wakeSignal.WaitAsync(stoppingToken); + var timerTask = timer.WaitForNextTickAsync(stoppingToken).AsTask(); + + await Task.WhenAny(wakeTask, timerTask); + + // Drain wake signal if it fired. + if (wakeTask.IsCompletedSuccessfully) + { + // Good — signal consumed. + } + + if (_queueSlot is not null) continue; + + var task = await _taskRepo.GetNextQueuedAgentTaskAsync(DateTime.UtcNow, stoppingToken); + if (task is null) continue; + + lock (_lock) + { + if (_queueSlot is not null) continue; + + var cts = CancellationTokenSource.CreateLinkedTokenSource(stoppingToken); + _queueSlot = new QueueSlotState { TaskId = task.Id, StartedAt = DateTime.UtcNow, Cts = cts }; + + _ = RunInSlotAsync(task, "queue", cts.Token).ContinueWith(_ => + { + lock (_lock) { _queueSlot = null; } + WakeQueue(); // Check for next task immediately. + }, TaskScheduler.Default); + } + } + catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested) + { + break; + } + catch (Exception ex) + { + _logger.LogError(ex, "QueueService loop error"); + } + } + + _logger.LogInformation("QueueService stopping"); + } + + private async Task RunInSlotAsync(TaskEntity task, string slot, CancellationToken ct) + { + try + { + _logger.LogInformation("Starting task {TaskId} in {Slot} slot", task.Id, slot); + await _runner.RunAsync(task, slot, ct); + } + catch (Exception ex) + { + _logger.LogError(ex, "Slot runner error for task {TaskId}", task.Id); + } + } +} diff --git a/tests/ClaudeDo.Worker.Tests/Runner/MessageParserTests.cs b/tests/ClaudeDo.Worker.Tests/Runner/MessageParserTests.cs new file mode 100644 index 0000000..56b4faf --- /dev/null +++ b/tests/ClaudeDo.Worker.Tests/Runner/MessageParserTests.cs @@ -0,0 +1,53 @@ +using ClaudeDo.Worker.Runner; + +namespace ClaudeDo.Worker.Tests.Runner; + +public sealed class MessageParserTests +{ + [Fact] + public void WellFormed_Result_Line_Extracts_Result() + { + var line = """{"type":"result","result":"Hello **world**"}"""; + Assert.True(MessageParser.TryExtractResult(line, out var result)); + Assert.Equal("Hello **world**", result); + } + + [Fact] + public void Non_Result_Type_Returns_False() + { + var line = """{"type":"assistant","message":"hi"}"""; + Assert.False(MessageParser.TryExtractResult(line, out var result)); + Assert.Null(result); + } + + [Fact] + public void Missing_Type_Property_Returns_False() + { + var line = """{"result":"data"}"""; + Assert.False(MessageParser.TryExtractResult(line, out var result)); + Assert.Null(result); + } + + [Fact] + public void Malformed_Json_Returns_False_No_Throw() + { + var line = "this is not json {{{"; + Assert.False(MessageParser.TryExtractResult(line, out var result)); + Assert.Null(result); + } + + [Fact] + public void Empty_Line_Returns_False() + { + Assert.False(MessageParser.TryExtractResult("", out _)); + Assert.False(MessageParser.TryExtractResult(" ", out _)); + } + + [Fact] + public void Null_Result_Value_Returns_True_With_Null() + { + var line = """{"type":"result","result":null}"""; + Assert.True(MessageParser.TryExtractResult(line, out var result)); + Assert.Null(result); + } +} diff --git a/tests/ClaudeDo.Worker.Tests/Services/QueueServiceTests.cs b/tests/ClaudeDo.Worker.Tests/Services/QueueServiceTests.cs new file mode 100644 index 0000000..7a4e7e6 --- /dev/null +++ b/tests/ClaudeDo.Worker.Tests/Services/QueueServiceTests.cs @@ -0,0 +1,278 @@ +using ClaudeDo.Data.Models; +using ClaudeDo.Data.Repositories; +using ClaudeDo.Worker.Config; +using ClaudeDo.Worker.Hub; +using ClaudeDo.Worker.Runner; +using ClaudeDo.Worker.Services; +using ClaudeDo.Worker.Tests.Infrastructure; +using Microsoft.AspNetCore.SignalR; +using Microsoft.Extensions.Logging.Abstractions; +using TaskStatus = ClaudeDo.Data.Models.TaskStatus; + +namespace ClaudeDo.Worker.Tests.Services; + +public sealed class QueueServiceTests : IDisposable +{ + private readonly DbFixture _db = new(); + private readonly TaskRepository _taskRepo; + private readonly ListRepository _listRepo; + private readonly TagRepository _tagRepo; + private readonly WorkerConfig _cfg; + private readonly string _tempDir; + + public QueueServiceTests() + { + _taskRepo = new TaskRepository(_db.Factory); + _listRepo = new ListRepository(_db.Factory); + _tagRepo = new TagRepository(_db.Factory); + _tempDir = Path.Combine(Path.GetTempPath(), $"claudedo_test_{Guid.NewGuid():N}"); + Directory.CreateDirectory(_tempDir); + _cfg = new WorkerConfig + { + SandboxRoot = Path.Combine(_tempDir, "sandbox"), + LogRoot = Path.Combine(_tempDir, "logs"), + QueueBackstopIntervalMs = 50, // fast for tests + }; + } + + public void Dispose() + { + _db.Dispose(); + try { Directory.Delete(_tempDir, true); } catch { } + } + + private (QueueService service, FakeClaudeProcess fakeProcess) CreateService( + Func, CancellationToken, Task>? handler = null) + { + var fake = new FakeClaudeProcess(handler); + var broadcaster = new HubBroadcaster(new FakeHubContext()); + var runner = new TaskRunner(fake, _taskRepo, _listRepo, broadcaster, _cfg, + NullLogger.Instance); + var service = new QueueService(_taskRepo, runner, _cfg, NullLogger.Instance); + return (service, fake); + } + + private async Task<(string listId, long agentTagId)> SeedListWithAgentTag() + { + var listId = Guid.NewGuid().ToString(); + await _listRepo.AddAsync(new ListEntity { Id = listId, Name = "Test", CreatedAt = DateTime.UtcNow }); + + var tags = await _tagRepo.GetAllAsync(); + var agentTag = tags.First(t => t.Name == "agent"); + await _listRepo.AddTagAsync(listId, agentTag.Id); + return (listId, agentTag.Id); + } + + private async Task SeedQueuedTask(string listId, DateTime? scheduledFor = null, DateTime? createdAt = null) + { + var task = new TaskEntity + { + Id = Guid.NewGuid().ToString(), + ListId = listId, + Title = "Test task", + Description = "Do something", + Status = TaskStatus.Queued, + ScheduledFor = scheduledFor, + CreatedAt = createdAt ?? DateTime.UtcNow, + }; + await _taskRepo.AddAsync(task); + return task; + } + + [Fact] + public async Task RunNow_Throws_When_Override_Slot_Busy() + { + var (listId, _) = await SeedListWithAgentTag(); + var tcs = new TaskCompletionSource(); + + var (service, _) = CreateService((_, _, _, _, _, ct) => tcs.Task); + + var task1 = await SeedQueuedTask(listId); + var task2 = await SeedQueuedTask(listId); + + await service.RunNow(task1.Id); + + var ex = await Assert.ThrowsAsync(() => service.RunNow(task2.Id)); + Assert.Equal("override slot busy", ex.Message); + + tcs.SetResult(new RunResult { ExitCode = 0, ResultMarkdown = "ok" }); + } + + [Fact] + public async Task RunNow_Throws_For_Unknown_Task() + { + var (service, _) = CreateService(); + await Assert.ThrowsAsync(() => service.RunNow("nonexistent")); + } + + [Fact] + public async Task Schedule_Filter_Skips_Future_Tasks() + { + var (listId, _) = await SeedListWithAgentTag(); + await SeedQueuedTask(listId, scheduledFor: DateTime.UtcNow.AddHours(1)); + + var (service, fake) = CreateService((_, _, _, _, _, _) => + Task.FromResult(new RunResult { ExitCode = 0, ResultMarkdown = "ok" })); + + using var cts = new CancellationTokenSource(); + + // Start the service loop, wake it, give it time. + await service.StartAsync(cts.Token); + service.WakeQueue(); + await Task.Delay(200); + cts.Cancel(); + + // The fake should never have been called because the task is scheduled in the future. + Assert.Equal(0, fake.CallCount); + } + + [Fact] + public async Task Queue_FIFO_Sequentiality() + { + var (listId, _) = await SeedListWithAgentTag(); + + var order = new List(); + var gate1 = new TaskCompletionSource(); + var gate2 = new TaskCompletionSource(); + var callCount = 0; + + var (service, _) = CreateService(async (prompt, _, _, taskId, _, ct) => + { + var n = Interlocked.Increment(ref callCount); + lock (order) { order.Add(taskId); } + if (n == 1) await gate1.Task; + if (n == 2) gate2.SetResult(); + return new RunResult { ExitCode = 0, ResultMarkdown = "ok" }; + }); + + var task1 = await SeedQueuedTask(listId, createdAt: DateTime.UtcNow.AddSeconds(-2)); + var task2 = await SeedQueuedTask(listId, createdAt: DateTime.UtcNow.AddSeconds(-1)); + + using var cts = new CancellationTokenSource(); + await service.StartAsync(cts.Token); + service.WakeQueue(); + + await Task.Delay(200); + + // Only task1 should be running (task2 waiting). + Assert.Single(order); + Assert.Equal(task1.Id, order[0]); + + // Release first task. + gate1.SetResult(); + + // Wait for second task to complete. + await gate2.Task.WaitAsync(TimeSpan.FromSeconds(5)); + + Assert.Equal(2, order.Count); + Assert.Equal(task2.Id, order[1]); + + cts.Cancel(); + } + + [Fact] + public async Task CancelTask_Triggers_Cancellation() + { + var (listId, _) = await SeedListWithAgentTag(); + + var running = new TaskCompletionSource(); + var cancelled = false; + + var (service, _) = CreateService(async (_, _, _, _, _, ct) => + { + running.SetResult(); + try + { + await Task.Delay(Timeout.Infinite, ct); + } + catch (OperationCanceledException) + { + cancelled = true; + throw; + } + return new RunResult { ExitCode = 0, ResultMarkdown = "ok" }; + }); + + var task = await SeedQueuedTask(listId); + await service.RunNow(task.Id); + + await running.Task.WaitAsync(TimeSpan.FromSeconds(5)); + + var result = service.CancelTask(task.Id); + Assert.True(result); + + await Task.Delay(200); + Assert.True(cancelled); + } + + [Fact] + public async Task GetActive_Returns_Running_Slots() + { + var (listId, _) = await SeedListWithAgentTag(); + var tcs = new TaskCompletionSource(); + + var (service, _) = CreateService((_, _, _, _, _, _) => tcs.Task); + + var task = await SeedQueuedTask(listId); + await service.RunNow(task.Id); + + var active = service.GetActive(); + Assert.Single(active); + Assert.Equal("override", active[0].slot); + Assert.Equal(task.Id, active[0].taskId); + + tcs.SetResult(new RunResult { ExitCode = 0, ResultMarkdown = "ok" }); + } +} + +#region Test doubles + +internal sealed class FakeClaudeProcess : IClaudeProcess +{ + private readonly Func, CancellationToken, Task> _handler; + private int _callCount; + + public int CallCount => _callCount; + + public FakeClaudeProcess( + Func, CancellationToken, Task>? handler = null) + { + _handler = handler ?? ((_, _, _, _, _, _) => + Task.FromResult(new RunResult { ExitCode = 0, ResultMarkdown = "ok" })); + } + + public async Task RunAsync(string prompt, string workingDirectory, string logPath, string taskId, + Func onStdoutLine, CancellationToken ct) + { + Interlocked.Increment(ref _callCount); + return await _handler(prompt, workingDirectory, logPath, taskId, onStdoutLine, ct); + } +} + +internal sealed class FakeHubContext : IHubContext +{ + public IHubClients Clients { get; } = new FakeHubClients(); + public IGroupManager Groups => throw new NotImplementedException(); +} + +internal sealed class FakeHubClients : IHubClients +{ + private readonly FakeClientProxy _proxy = new(); + public IClientProxy All => _proxy; + public IClientProxy AllExcept(IReadOnlyList excludedConnectionIds) => _proxy; + public IClientProxy Client(string connectionId) => _proxy; + public IClientProxy Clients(IReadOnlyList connectionIds) => _proxy; + public IClientProxy Group(string groupName) => _proxy; + public IClientProxy GroupExcept(string groupName, IReadOnlyList excludedConnectionIds) => _proxy; + public IClientProxy Groups(IReadOnlyList groupNames) => _proxy; + public IClientProxy User(string userId) => _proxy; + public IClientProxy Users(IReadOnlyList userIds) => _proxy; +} + +internal sealed class FakeClientProxy : IClientProxy +{ + public Task SendCoreAsync(string method, object?[] args, CancellationToken cancellationToken = default) => + Task.CompletedTask; +} + +#endregion