From 76473dd92a2c5b6045fe1f6798a409b6b4db80c7 Mon Sep 17 00:00:00 2001 From: Mika Kuns Date: Tue, 14 Apr 2026 14:02:57 +0200 Subject: [PATCH] refactor(worker): rewrite TaskRunner with config resolution, retry, and continue support --- src/ClaudeDo.Worker/Runner/TaskRunner.cs | 218 ++++++++++++++++++----- 1 file changed, 178 insertions(+), 40 deletions(-) diff --git a/src/ClaudeDo.Worker/Runner/TaskRunner.cs b/src/ClaudeDo.Worker/Runner/TaskRunner.cs index cab7436..e7ba4f8 100644 --- a/src/ClaudeDo.Worker/Runner/TaskRunner.cs +++ b/src/ClaudeDo.Worker/Runner/TaskRunner.cs @@ -1,3 +1,4 @@ +using ClaudeDo.Data.Models; using ClaudeDo.Data.Repositories; using ClaudeDo.Worker.Config; using ClaudeDo.Worker.Hub; @@ -8,31 +9,40 @@ public sealed class TaskRunner { private readonly IClaudeProcess _claude; private readonly TaskRepository _taskRepo; + private readonly TaskRunRepository _runRepo; private readonly ListRepository _listRepo; + private readonly WorktreeRepository _wtRepo; private readonly HubBroadcaster _broadcaster; private readonly WorktreeManager _wtManager; + private readonly ClaudeArgsBuilder _argsBuilder; private readonly WorkerConfig _cfg; private readonly ILogger _logger; public TaskRunner( IClaudeProcess claude, TaskRepository taskRepo, + TaskRunRepository runRepo, ListRepository listRepo, + WorktreeRepository wtRepo, HubBroadcaster broadcaster, WorktreeManager wtManager, + ClaudeArgsBuilder argsBuilder, WorkerConfig cfg, ILogger logger) { _claude = claude; _taskRepo = taskRepo; + _runRepo = runRepo; _listRepo = listRepo; + _wtRepo = wtRepo; _broadcaster = broadcaster; _wtManager = wtManager; + _argsBuilder = argsBuilder; _cfg = cfg; _logger = logger; } - public async Task RunAsync(Data.Models.TaskEntity task, string slot, CancellationToken ct) + public async Task RunAsync(TaskEntity task, string slot, CancellationToken ct) { try { @@ -63,64 +73,60 @@ public sealed class TaskRunner } else { - // Non-worktree sandbox path. runDir = Path.Combine(_cfg.SandboxRoot, task.Id); Directory.CreateDirectory(runDir); } - var logPath = Path.Combine(_cfg.LogRoot, $"{task.Id}.ndjson"); + // Resolve config: task overrides > list config > null. + var listConfig = await _listRepo.GetConfigAsync(task.ListId, ct); + var resolvedConfig = new ClaudeRunConfig( + Model: task.Model ?? listConfig?.Model, + SystemPrompt: task.SystemPrompt ?? listConfig?.SystemPrompt, + AgentPath: task.AgentPath ?? listConfig?.AgentPath, + ResumeSessionId: null + ); - await _taskRepo.SetLogPathAsync(task.Id, logPath, ct); var now = DateTime.UtcNow; await _taskRepo.MarkRunningAsync(task.Id, now, ct); await _broadcaster.TaskStarted(slot, task.Id, now); - // Build prompt and arguments. + // Build prompt. var prompt = string.IsNullOrWhiteSpace(task.Description) ? task.Title : $"{task.Title}\n\n{task.Description.Trim()}"; - var arguments = new ClaudeArgsBuilder().Build(new ClaudeRunConfig( - Model: null, - SystemPrompt: null, - AgentPath: null, - ResumeSessionId: null)); - - await using var logWriter = new LogWriter(logPath); - - var result = await _claude.RunAsync( - arguments, - prompt, - runDir, - async line => - { - await logWriter.WriteLineAsync(line, ct); - await _broadcaster.TaskMessage(task.Id, line); - }, - ct); - - var finishedAt = DateTime.UtcNow; + // Run 1. + var result = await RunOnceAsync(task.Id, slot, runDir, resolvedConfig, 1, false, prompt, ct); if (result.IsSuccess) { - // Auto-commit if worktree mode and run succeeded. - if (wtCtx is not null) - { - var committed = await _wtManager.CommitIfChangedAsync(wtCtx, task, list, ct); - if (committed) - await _broadcaster.WorktreeUpdated(task.Id); - } - - await _taskRepo.MarkDoneAsync(task.Id, finishedAt, result.ResultMarkdown, ct); - await _broadcaster.TaskFinished(slot, task.Id, "done", finishedAt); - _logger.LogInformation("Task {TaskId} completed successfully", task.Id); + await HandleSuccess(task, list, slot, wtCtx, result, ct); } else { - // Failed run: do NOT commit. Worktree row stays active for inspection. - await _taskRepo.MarkFailedAsync(task.Id, finishedAt, result.ErrorMarkdown, ct); - await _broadcaster.TaskFinished(slot, task.Id, "failed", finishedAt); - _logger.LogWarning("Task {TaskId} failed: {Error}", task.Id, result.ErrorMarkdown); + // Auto-retry: one attempt if we have a session ID. + if (result.SessionId is not null) + { + _logger.LogInformation("Auto-retrying task {TaskId} with session {SessionId}", task.Id, result.SessionId); + var retryConfig = resolvedConfig with { ResumeSessionId = result.SessionId }; + var retryPrompt = $"The previous attempt failed with:\n\n{result.ErrorMarkdown}\n\nTry again and fix the issues."; + + await _broadcaster.RunCreated(task.Id, 2, true); + var retryResult = await RunOnceAsync(task.Id, slot, runDir, retryConfig, 2, true, retryPrompt, ct); + + if (retryResult.IsSuccess) + { + await HandleSuccess(task, list, slot, wtCtx, retryResult, ct); + } + else + { + await HandleFailure(task.Id, slot, retryResult); + } + } + else + { + await HandleFailure(task.Id, slot, result); + } } await _broadcaster.TaskUpdated(task.Id); @@ -137,6 +143,138 @@ public sealed class TaskRunner } } + public async Task ContinueAsync(string taskId, string followUpPrompt, string slot, CancellationToken ct) + { + var task = await _taskRepo.GetByIdAsync(taskId, ct) + ?? throw new KeyNotFoundException($"Task '{taskId}' not found."); + + var lastRun = await _runRepo.GetLatestByTaskIdAsync(taskId, ct) + ?? throw new InvalidOperationException("No previous run to continue."); + + if (lastRun.SessionId is null) + throw new InvalidOperationException("Previous run has no session ID — cannot resume."); + + var list = await _listRepo.GetByIdAsync(task.ListId, ct) + ?? throw new InvalidOperationException("List not found."); + + var listConfig = await _listRepo.GetConfigAsync(task.ListId, ct); + var resolvedConfig = new ClaudeRunConfig( + Model: task.Model ?? listConfig?.Model, + SystemPrompt: task.SystemPrompt ?? listConfig?.SystemPrompt, + AgentPath: task.AgentPath ?? listConfig?.AgentPath, + ResumeSessionId: lastRun.SessionId + ); + + // Determine run directory from existing worktree or sandbox. + string runDir; + WorktreeContext? wtCtx = null; + var worktree = await _wtRepo.GetByTaskIdAsync(taskId, ct); + if (worktree is not null) + { + runDir = worktree.Path; + wtCtx = new WorktreeContext(worktree.Path, worktree.BranchName, worktree.BaseCommit); + } + else + { + runDir = Path.Combine(_cfg.SandboxRoot, taskId); + } + + var now = DateTime.UtcNow; + await _taskRepo.MarkRunningAsync(taskId, now, ct); + await _broadcaster.TaskStarted(slot, taskId, now); + + var nextRunNumber = lastRun.RunNumber + 1; + await _broadcaster.RunCreated(taskId, nextRunNumber, false); + var result = await RunOnceAsync(taskId, slot, runDir, resolvedConfig, nextRunNumber, false, followUpPrompt, ct); + + if (result.IsSuccess) + { + await HandleSuccess(task, list, slot, wtCtx, result, ct); + } + else + { + await HandleFailure(taskId, slot, result); + } + + await _broadcaster.TaskUpdated(taskId); + } + + private async Task RunOnceAsync( + string taskId, string slot, string runDir, ClaudeRunConfig config, + int runNumber, bool isRetry, string prompt, CancellationToken ct) + { + var runId = Guid.NewGuid().ToString(); + var logPath = Path.Combine(_cfg.LogRoot, $"{taskId}_run{runNumber}.ndjson"); + + var run = new TaskRunEntity + { + Id = runId, + TaskId = taskId, + RunNumber = runNumber, + IsRetry = isRetry, + Prompt = prompt, + LogPath = logPath, + StartedAt = DateTime.UtcNow, + }; + await _runRepo.AddAsync(run, ct); + + var arguments = _argsBuilder.Build(config); + + await using var logWriter = new LogWriter(logPath); + + var result = await _claude.RunAsync( + arguments, + prompt, + runDir, + async line => + { + await logWriter.WriteLineAsync(line, ct); + await _broadcaster.TaskMessage(taskId, line); + }, + ct); + + // Update the run record with results. + run.SessionId = result.SessionId; + run.ResultMarkdown = result.ResultMarkdown; + run.StructuredOutputJson = result.StructuredOutputJson; + run.ErrorMarkdown = result.ErrorMarkdown; + run.ExitCode = result.ExitCode; + run.TurnCount = result.TurnCount; + run.TokensIn = result.TokensIn; + run.TokensOut = result.TokensOut; + run.FinishedAt = DateTime.UtcNow; + await _runRepo.UpdateAsync(run, ct); + + // Update denormalized fields on the task. + await _taskRepo.SetLogPathAsync(taskId, logPath, ct); + + return result; + } + + private async Task HandleSuccess(TaskEntity task, ListEntity list, string slot, WorktreeContext? wtCtx, RunResult result, CancellationToken ct) + { + if (wtCtx is not null) + { + var committed = await _wtManager.CommitIfChangedAsync(wtCtx, task, list, ct); + if (committed) + await _broadcaster.WorktreeUpdated(task.Id); + } + + var finishedAt = DateTime.UtcNow; + await _taskRepo.MarkDoneAsync(task.Id, finishedAt, result.ResultMarkdown, ct); + await _broadcaster.TaskFinished(slot, task.Id, "done", finishedAt); + _logger.LogInformation("Task {TaskId} completed (turns={Turns}, tokens_in={In}, tokens_out={Out})", + task.Id, result.TurnCount, result.TokensIn, result.TokensOut); + } + + private async Task HandleFailure(string taskId, string slot, RunResult result) + { + var finishedAt = DateTime.UtcNow; + await _taskRepo.MarkFailedAsync(taskId, finishedAt, result.ErrorMarkdown); + await _broadcaster.TaskFinished(slot, taskId, "failed", finishedAt); + _logger.LogWarning("Task {TaskId} failed (turns={Turns}): {Error}", taskId, result.TurnCount, result.ErrorMarkdown); + } + private async Task MarkFailed(string taskId, string slot, string error) { try