feat(worker): add claude-cli runner, queue service, and hub api

Runner stack (non-worktree path): IClaudeProcess + ClaudeProcess spawning the
CLI with --output-format stream-json, prompt via stdin, parses the final
type:"result" line into RunResult. LogWriter appends ndjson to
~/.todo-app/logs/<taskId>.ndjson. TaskRunner orchestrates DB transitions
(MarkRunning -> MarkDone/Failed) and pushes TaskStarted/Message/Finished/
Updated via HubBroadcaster. Worktree-backed lists short-circuit with a
"Slice E" failure message until git support lands.

QueueService (BackgroundService) holds two in-memory slots (_queueSlot +
_overrideSlot) guarded by a lock. Uses PeriodicTimer + SemaphoreSlim wake
signal so WakeQueue() triggers an instant pickup. RunNow throws
InvalidOperationException when override busy; CancelTask cancels the linked
CTS which kills the child process tree.

WorkerHub extended with GetActive, RunNow (translated to HubException
variants), CancelTask, WakeQueue. HubBroadcaster exposes typed push methods.

Tests: 26 pass (12 new). QueueServiceTests cover override-busy,
schedule-filter, FIFO sequentiality, cancellation, plus a FakeClaudeProcess
that blocks on a TCS for deterministic slot-state assertions.
MessageParserTests cover result extraction + malformed/non-result lines.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Mika Kuns
2026-04-13 12:14:00 +02:00
parent 9f51ff0b17
commit e5038d7e16
14 changed files with 884 additions and 4 deletions

View File

@@ -0,0 +1,25 @@
using Microsoft.AspNetCore.SignalR;
namespace ClaudeDo.Worker.Hub;
public sealed class HubBroadcaster
{
private readonly IHubContext<WorkerHub> _hub;
public HubBroadcaster(IHubContext<WorkerHub> hub) => _hub = hub;
public Task TaskStarted(string slot, string taskId, DateTime startedAt) =>
_hub.Clients.All.SendAsync("TaskStarted", slot, taskId, startedAt);
public Task TaskFinished(string slot, string taskId, string status, DateTime finishedAt) =>
_hub.Clients.All.SendAsync("TaskFinished", slot, taskId, status, finishedAt);
public Task TaskMessage(string taskId, string ndjsonLine) =>
_hub.Clients.All.SendAsync("TaskMessage", taskId, ndjsonLine);
public Task WorktreeUpdated(string taskId) =>
_hub.Clients.All.SendAsync("WorktreeUpdated", taskId);
public Task TaskUpdated(string taskId) =>
_hub.Clients.All.SendAsync("TaskUpdated", taskId);
}

View File

@@ -1,16 +1,44 @@
using System.Reflection;
using ClaudeDo.Worker.Services;
using Microsoft.AspNetCore.SignalR;
namespace ClaudeDo.Worker.Hub;
/// <summary>
/// SignalR hub the UI connects to. Only <see cref="Ping"/> is implemented at this stage;
/// RunNow/CancelTask/WakeQueue/GetActive land here once QueueService exists.
/// </summary>
public sealed class WorkerHub : Microsoft.AspNetCore.SignalR.Hub
{
private static readonly string Version =
Assembly.GetExecutingAssembly().GetName().Version?.ToString(3) ?? "0.0.0";
private readonly QueueService _queue;
public WorkerHub(QueueService queue) => _queue = queue;
public string Ping() => $"pong v{Version}";
public IReadOnlyList<object> GetActive()
{
return _queue.GetActive()
.Select(a => (object)new { slot = a.slot, taskId = a.taskId, startedAt = a.startedAt })
.ToList();
}
public async Task RunNow(string taskId)
{
try
{
await _queue.RunNow(taskId);
}
catch (InvalidOperationException)
{
throw new HubException("override slot busy");
}
catch (KeyNotFoundException)
{
throw new HubException("task not found");
}
}
public bool CancelTask(string taskId) => _queue.CancelTask(taskId);
public void WakeQueue() => _queue.WakeQueue();
}

View File

@@ -2,6 +2,7 @@ using ClaudeDo.Data;
using ClaudeDo.Data.Repositories;
using ClaudeDo.Worker.Config;
using ClaudeDo.Worker.Hub;
using ClaudeDo.Worker.Runner;
using ClaudeDo.Worker.Services;
var cfg = WorkerConfig.Load();
@@ -21,6 +22,15 @@ builder.Services.AddSingleton<WorktreeRepository>();
builder.Services.AddHostedService<StaleTaskRecovery>();
builder.Services.AddSignalR();
// Runner stack.
builder.Services.AddSingleton<IClaudeProcess, ClaudeProcess>();
builder.Services.AddSingleton<HubBroadcaster>();
builder.Services.AddSingleton<TaskRunner>();
// QueueService: singleton + hosted service (same instance).
builder.Services.AddSingleton<QueueService>();
builder.Services.AddHostedService(sp => sp.GetRequiredService<QueueService>());
// Loopback-only bind. Firewall is irrelevant for 127.0.0.1.
builder.WebHost.UseUrls($"http://127.0.0.1:{cfg.SignalRPort}");

View File

@@ -0,0 +1,96 @@
using System.Diagnostics;
using System.Text;
using ClaudeDo.Worker.Config;
namespace ClaudeDo.Worker.Runner;
public sealed class ClaudeProcess : IClaudeProcess
{
private readonly WorkerConfig _cfg;
private readonly ILogger<ClaudeProcess> _logger;
public ClaudeProcess(WorkerConfig cfg, ILogger<ClaudeProcess> logger)
{
_cfg = cfg;
_logger = logger;
}
public async Task<RunResult> RunAsync(
string prompt,
string workingDirectory,
string logPath,
string taskId,
Func<string, Task> onStdoutLine,
CancellationToken ct)
{
var psi = new ProcessStartInfo
{
FileName = _cfg.ClaudeBin,
Arguments = "-p --output-format stream-json --verbose --dangerously-skip-permissions",
WorkingDirectory = workingDirectory,
RedirectStandardInput = true,
RedirectStandardOutput = true,
RedirectStandardError = true,
UseShellExecute = false,
CreateNoWindow = true,
StandardOutputEncoding = Encoding.UTF8,
StandardErrorEncoding = Encoding.UTF8,
};
using var process = new Process { StartInfo = psi };
process.Start();
// Write prompt to stdin, then close.
await process.StandardInput.WriteAsync(prompt);
process.StandardInput.Close();
string? resultMarkdown = null;
var lastStderr = new StringBuilder();
// Register cancellation to kill the process tree.
await using var ctr = ct.Register(() =>
{
try { process.Kill(entireProcessTree: true); }
catch { /* already exited */ }
});
// Read stdout and stderr concurrently.
var stdoutTask = Task.Run(async () =>
{
while (await process.StandardOutput.ReadLineAsync(ct) is { } line)
{
if (string.IsNullOrEmpty(line)) continue;
await onStdoutLine(line);
if (MessageParser.TryExtractResult(line, out var res))
resultMarkdown = res;
}
}, ct);
var stderrTask = Task.Run(async () =>
{
while (await process.StandardError.ReadLineAsync(ct) is { } line)
{
if (string.IsNullOrEmpty(line)) continue;
lastStderr.AppendLine(line);
await onStdoutLine($"[stderr] {line}");
}
}, ct);
await Task.WhenAll(stdoutTask, stderrTask);
await process.WaitForExitAsync(ct);
var exitCode = process.ExitCode;
if (exitCode == 0 && resultMarkdown is not null)
{
return new RunResult { ExitCode = exitCode, ResultMarkdown = resultMarkdown };
}
var error = lastStderr.Length > 0
? lastStderr.ToString().Trim()
: $"Claude exited with code {exitCode} and no result.";
return new RunResult { ExitCode = exitCode, ErrorMarkdown = error };
}
}

View File

@@ -0,0 +1,12 @@
namespace ClaudeDo.Worker.Runner;
public interface IClaudeProcess
{
Task<RunResult> RunAsync(
string prompt,
string workingDirectory,
string logPath,
string taskId,
Func<string, Task> onStdoutLine,
CancellationToken ct);
}

View File

@@ -0,0 +1,26 @@
namespace ClaudeDo.Worker.Runner;
public sealed class LogWriter : IAsyncDisposable
{
private readonly StreamWriter _writer;
public LogWriter(string filePath)
{
var dir = Path.GetDirectoryName(filePath);
if (dir is not null)
Directory.CreateDirectory(dir);
_writer = new StreamWriter(filePath, append: true) { AutoFlush = true };
}
public async Task WriteLineAsync(string line, CancellationToken ct = default)
{
ct.ThrowIfCancellationRequested();
await _writer.WriteLineAsync(line.AsMemory(), ct);
}
public async ValueTask DisposeAsync()
{
await _writer.DisposeAsync();
}
}

View File

@@ -0,0 +1,33 @@
using System.Text.Json;
namespace ClaudeDo.Worker.Runner;
public static class MessageParser
{
public static bool TryExtractResult(string ndjsonLine, out string? result)
{
result = null;
if (string.IsNullOrWhiteSpace(ndjsonLine))
return false;
try
{
using var doc = JsonDocument.Parse(ndjsonLine);
var root = doc.RootElement;
if (root.TryGetProperty("type", out var typeProp) &&
typeProp.GetString() == "result" &&
root.TryGetProperty("result", out var resultProp))
{
result = resultProp.GetString();
return true;
}
}
catch (JsonException)
{
// Malformed JSON — not a result line.
}
return false;
}
}

View File

@@ -0,0 +1,10 @@
namespace ClaudeDo.Worker.Runner;
public sealed class RunResult
{
public required int ExitCode { get; init; }
public string? ResultMarkdown { get; init; }
public string? ErrorMarkdown { get; init; }
public bool IsSuccess => ExitCode == 0 && ResultMarkdown is not null;
}

View File

@@ -0,0 +1,123 @@
using ClaudeDo.Data.Repositories;
using ClaudeDo.Worker.Config;
using ClaudeDo.Worker.Hub;
namespace ClaudeDo.Worker.Runner;
public sealed class TaskRunner
{
private readonly IClaudeProcess _claude;
private readonly TaskRepository _taskRepo;
private readonly ListRepository _listRepo;
private readonly HubBroadcaster _broadcaster;
private readonly WorkerConfig _cfg;
private readonly ILogger<TaskRunner> _logger;
public TaskRunner(
IClaudeProcess claude,
TaskRepository taskRepo,
ListRepository listRepo,
HubBroadcaster broadcaster,
WorkerConfig cfg,
ILogger<TaskRunner> logger)
{
_claude = claude;
_taskRepo = taskRepo;
_listRepo = listRepo;
_broadcaster = broadcaster;
_cfg = cfg;
_logger = logger;
}
public async Task RunAsync(Data.Models.TaskEntity task, string slot, CancellationToken ct)
{
try
{
var list = await _listRepo.GetByIdAsync(task.ListId, ct);
if (list is null)
{
await MarkFailed(task.Id, slot, "List not found.");
return;
}
// Slice D: worktree mode not yet implemented.
if (list.WorkingDir is not null)
{
await MarkFailed(task.Id, slot, "Worktree mode not implemented yet (Slice E)");
return;
}
// Non-worktree sandbox path.
var sandboxDir = Path.Combine(_cfg.SandboxRoot, task.Id);
Directory.CreateDirectory(sandboxDir);
var logPath = Path.Combine(_cfg.LogRoot, $"{task.Id}.ndjson");
await _taskRepo.SetLogPathAsync(task.Id, logPath, ct);
var now = DateTime.UtcNow;
await _taskRepo.MarkRunningAsync(task.Id, now, ct);
await _broadcaster.TaskStarted(slot, task.Id, now);
// Build prompt.
var prompt = string.IsNullOrWhiteSpace(task.Description)
? task.Title
: $"{task.Title}\n\n{task.Description.Trim()}";
await using var logWriter = new LogWriter(logPath);
var result = await _claude.RunAsync(
prompt,
sandboxDir,
logPath,
task.Id,
async line =>
{
await logWriter.WriteLineAsync(line, ct);
await _broadcaster.TaskMessage(task.Id, line);
},
ct);
var finishedAt = DateTime.UtcNow;
if (result.IsSuccess)
{
await _taskRepo.MarkDoneAsync(task.Id, finishedAt, result.ResultMarkdown, ct);
await _broadcaster.TaskFinished(slot, task.Id, "done", finishedAt);
_logger.LogInformation("Task {TaskId} completed successfully", task.Id);
}
else
{
await _taskRepo.MarkFailedAsync(task.Id, finishedAt, result.ErrorMarkdown, ct);
await _broadcaster.TaskFinished(slot, task.Id, "failed", finishedAt);
_logger.LogWarning("Task {TaskId} failed: {Error}", task.Id, result.ErrorMarkdown);
}
await _broadcaster.TaskUpdated(task.Id);
}
catch (OperationCanceledException)
{
_logger.LogInformation("Task {TaskId} was cancelled", task.Id);
await MarkFailed(task.Id, slot, "Task cancelled.");
}
catch (Exception ex)
{
_logger.LogError(ex, "Unhandled exception running task {TaskId}", task.Id);
await MarkFailed(task.Id, slot, $"Unhandled error: {ex.Message}");
}
}
private async Task MarkFailed(string taskId, string slot, string error)
{
try
{
var now = DateTime.UtcNow;
await _taskRepo.MarkFailedAsync(taskId, now, error);
await _broadcaster.TaskFinished(slot, taskId, "failed", now);
await _broadcaster.TaskUpdated(taskId);
}
catch (Exception ex)
{
_logger.LogError(ex, "Failed to mark task {TaskId} as failed", taskId);
}
}
}

View File

@@ -0,0 +1,162 @@
using ClaudeDo.Data.Models;
using ClaudeDo.Data.Repositories;
using ClaudeDo.Worker.Config;
using ClaudeDo.Worker.Runner;
namespace ClaudeDo.Worker.Services;
public sealed class QueueSlotState
{
public required string TaskId { get; init; }
public required DateTime StartedAt { get; init; }
public required CancellationTokenSource Cts { get; init; }
}
public sealed class QueueService : BackgroundService
{
private readonly TaskRepository _taskRepo;
private readonly TaskRunner _runner;
private readonly WorkerConfig _cfg;
private readonly ILogger<QueueService> _logger;
private readonly object _lock = new();
private volatile QueueSlotState? _queueSlot;
private volatile QueueSlotState? _overrideSlot;
private readonly SemaphoreSlim _wakeSignal = new(0, 1);
public QueueService(
TaskRepository taskRepo,
TaskRunner runner,
WorkerConfig cfg,
ILogger<QueueService> logger)
{
_taskRepo = taskRepo;
_runner = runner;
_cfg = cfg;
_logger = logger;
}
public IReadOnlyList<(string slot, string taskId, DateTime startedAt)> GetActive()
{
var list = new List<(string, string, DateTime)>();
var q = _queueSlot;
if (q is not null) list.Add(("queue", q.TaskId, q.StartedAt));
var o = _overrideSlot;
if (o is not null) list.Add(("override", o.TaskId, o.StartedAt));
return list;
}
public void WakeQueue()
{
// Release if not already signalled.
try { _wakeSignal.Release(); }
catch (SemaphoreFullException) { /* already signalled */ }
}
public async Task RunNow(string taskId)
{
var task = await _taskRepo.GetByIdAsync(taskId);
if (task is null)
throw new KeyNotFoundException($"Task '{taskId}' not found.");
lock (_lock)
{
if (_overrideSlot is not null)
throw new InvalidOperationException("override slot busy");
var cts = new CancellationTokenSource();
_overrideSlot = new QueueSlotState { TaskId = taskId, StartedAt = DateTime.UtcNow, Cts = cts };
_ = RunInSlotAsync(task, "override", cts.Token).ContinueWith(_ =>
{
lock (_lock) { _overrideSlot = null; }
}, TaskScheduler.Default);
}
}
public bool CancelTask(string taskId)
{
lock (_lock)
{
if (_queueSlot is not null && _queueSlot.TaskId == taskId)
{
_queueSlot.Cts.Cancel();
return true;
}
if (_overrideSlot is not null && _overrideSlot.TaskId == taskId)
{
_overrideSlot.Cts.Cancel();
return true;
}
}
return false;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
_logger.LogInformation("QueueService started");
using var timer = new PeriodicTimer(TimeSpan.FromMilliseconds(_cfg.QueueBackstopIntervalMs));
while (!stoppingToken.IsCancellationRequested)
{
try
{
// Wait for wake signal or backstop timer.
var wakeTask = _wakeSignal.WaitAsync(stoppingToken);
var timerTask = timer.WaitForNextTickAsync(stoppingToken).AsTask();
await Task.WhenAny(wakeTask, timerTask);
// Drain wake signal if it fired.
if (wakeTask.IsCompletedSuccessfully)
{
// Good — signal consumed.
}
if (_queueSlot is not null) continue;
var task = await _taskRepo.GetNextQueuedAgentTaskAsync(DateTime.UtcNow, stoppingToken);
if (task is null) continue;
lock (_lock)
{
if (_queueSlot is not null) continue;
var cts = CancellationTokenSource.CreateLinkedTokenSource(stoppingToken);
_queueSlot = new QueueSlotState { TaskId = task.Id, StartedAt = DateTime.UtcNow, Cts = cts };
_ = RunInSlotAsync(task, "queue", cts.Token).ContinueWith(_ =>
{
lock (_lock) { _queueSlot = null; }
WakeQueue(); // Check for next task immediately.
}, TaskScheduler.Default);
}
}
catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
{
break;
}
catch (Exception ex)
{
_logger.LogError(ex, "QueueService loop error");
}
}
_logger.LogInformation("QueueService stopping");
}
private async Task RunInSlotAsync(TaskEntity task, string slot, CancellationToken ct)
{
try
{
_logger.LogInformation("Starting task {TaskId} in {Slot} slot", task.Id, slot);
await _runner.RunAsync(task, slot, ct);
}
catch (Exception ex)
{
_logger.LogError(ex, "Slot runner error for task {TaskId}", task.Id);
}
}
}