using ClaudeDo.Data.Models;
using ClaudeDo.Data.Repositories;
using ClaudeDo.Worker.Config;
using ClaudeDo.Worker.Hub;

namespace ClaudeDo.Worker.Runner;

/// <summary>
/// Executes tasks by invoking the Claude process in a per-task directory
/// (a git worktree when the task's list has a working directory, otherwise a
/// sandbox folder under <c>SandboxRoot</c>), persisting each run and
/// broadcasting progress through the hub.
/// </summary>
public sealed class TaskRunner
{
    /// <summary>
    /// Model used when neither the task nor its list configures one. Shared by
    /// <see cref="RunAsync"/> and <see cref="ContinueAsync"/> so that a resumed
    /// session is not silently started with a different model.
    /// </summary>
    private const string DefaultModel = "claude-sonnet-4-6";

    private readonly IClaudeProcess _claude;
    private readonly TaskRepository _taskRepo;
    private readonly TaskRunRepository _runRepo;
    private readonly ListRepository _listRepo;
    private readonly WorktreeRepository _wtRepo;
    private readonly SubtaskRepository _subtaskRepo;
    private readonly HubBroadcaster _broadcaster;
    private readonly WorktreeManager _wtManager;
    private readonly ClaudeArgsBuilder _argsBuilder;
    private readonly WorkerConfig _cfg;
    private readonly ILogger _logger;

    public TaskRunner(
        IClaudeProcess claude,
        TaskRepository taskRepo,
        TaskRunRepository runRepo,
        ListRepository listRepo,
        WorktreeRepository wtRepo,
        SubtaskRepository subtaskRepo,
        HubBroadcaster broadcaster,
        WorktreeManager wtManager,
        ClaudeArgsBuilder argsBuilder,
        WorkerConfig cfg,
        ILogger logger)
    {
        _claude = claude;
        _taskRepo = taskRepo;
        _runRepo = runRepo;
        _listRepo = listRepo;
        _wtRepo = wtRepo;
        _subtaskRepo = subtaskRepo;
        _broadcaster = broadcaster;
        _wtManager = wtManager;
        _argsBuilder = argsBuilder;
        _cfg = cfg;
        _logger = logger;
    }

    /// <summary>
    /// Runs <paramref name="task"/> from scratch as run 1. If run 1 fails and
    /// produced a session ID, one automatic retry (run 2) resumes that session
    /// with the error fed back as the prompt.
    /// </summary>
    /// <param name="task">The task to execute.</param>
    /// <param name="slot">Worker slot identifier, forwarded to hub broadcasts.</param>
    /// <param name="ct">Cancellation token for the run.</param>
    /// <remarks>
    /// Exceptions, including cancellation, are caught and recorded by marking
    /// the task failed; they are not propagated to the caller.
    /// </remarks>
    public async Task RunAsync(TaskEntity task, string slot, CancellationToken ct)
    {
        try
        {
            var list = await _listRepo.GetByIdAsync(task.ListId, ct);
            if (list is null)
            {
                await MarkFailed(task.Id, slot, "List not found.");
                return;
            }

            // Determine working directory: worktree or sandbox.
            WorktreeContext? wtCtx = null;
            string runDir;
            if (list.WorkingDir is not null)
            {
                try
                {
                    wtCtx = await _wtManager.CreateAsync(task, list, ct);
                    runDir = wtCtx.WorktreePath;
                }
                catch (Exception ex) when (ex is not OperationCanceledException)
                {
                    // Cancellation is deliberately excluded so it is reported as
                    // "Task cancelled." by the outer handler, not as a worktree error.
                    _logger.LogError(ex, "Failed to create worktree for task {TaskId}", task.Id);
                    await MarkFailed(task.Id, slot, $"Worktree creation failed: {ex.Message}");
                    return;
                }
            }
            else
            {
                runDir = Path.Combine(_cfg.SandboxRoot, task.Id);
                Directory.CreateDirectory(runDir);
            }

            // Resolve config: task overrides > list config > default model.
            var listConfig = await _listRepo.GetConfigAsync(task.ListId, ct);
            var resolvedConfig = new ClaudeRunConfig(
                Model: task.Model ?? listConfig?.Model ?? DefaultModel,
                SystemPrompt: task.SystemPrompt ?? listConfig?.SystemPrompt,
                AgentPath: task.AgentPath ?? listConfig?.AgentPath,
                ResumeSessionId: null);

            var now = DateTime.UtcNow;
            await _taskRepo.MarkRunningAsync(task.Id, now, ct);
            await _broadcaster.TaskStarted(slot, task.Id, now);

            // Build prompt: title, optional trimmed description, then a markdown
            // checklist of sub-tasks (checked when completed).
            var subtasks = await _subtaskRepo.GetByTaskIdAsync(task.Id, ct);
            var sb = new System.Text.StringBuilder(task.Title);
            if (!string.IsNullOrWhiteSpace(task.Description))
            {
                sb.Append("\n\n").Append(task.Description.Trim());
            }
            if (subtasks.Count > 0)
            {
                sb.Append("\n\n## Sub-Tasks\n");
                foreach (var s in subtasks)
                {
                    sb.Append(s.Completed ? "- [x] " : "- [ ] ").Append(s.Title).Append('\n');
                }
            }
            var prompt = sb.ToString();

            // Run 1.
            var result = await RunOnceAsync(task.Id, slot, runDir, resolvedConfig, 1, false, prompt, ct);
            if (result.IsSuccess)
            {
                await HandleSuccess(task, list, slot, wtCtx, result, ct);
            }
            else if (result.SessionId is not null)
            {
                // Auto-retry: exactly one attempt, resuming the failed session.
                _logger.LogInformation("Auto-retrying task {TaskId} with session {SessionId}", task.Id, result.SessionId);
                var retryConfig = resolvedConfig with { ResumeSessionId = result.SessionId };
                var retryPrompt = $"The previous attempt failed with:\n\n{result.ErrorMarkdown}\n\nTry again and fix the issues.";
                await _broadcaster.RunCreated(task.Id, 2, true);
                var retryResult = await RunOnceAsync(task.Id, slot, runDir, retryConfig, 2, true, retryPrompt, ct);
                if (retryResult.IsSuccess)
                {
                    await HandleSuccess(task, list, slot, wtCtx, retryResult, ct);
                }
                else
                {
                    await HandleFailure(task.Id, slot, retryResult);
                }
            }
            else
            {
                // No session to resume, so no retry is possible.
                await HandleFailure(task.Id, slot, result);
            }

            await _broadcaster.TaskUpdated(task.Id);
        }
        catch (OperationCanceledException)
        {
            _logger.LogInformation("Task {TaskId} was cancelled", task.Id);
            await MarkFailed(task.Id, slot, "Task cancelled.");
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Unhandled exception running task {TaskId}", task.Id);
            await MarkFailed(task.Id, slot, $"Unhandled error: {ex.Message}");
        }
    }

    /// <summary>
    /// Resumes the Claude session of the task's latest run with a follow-up
    /// prompt, recording it as the next run number.
    /// </summary>
    /// <param name="taskId">ID of the task to continue.</param>
    /// <param name="followUpPrompt">Prompt sent to the resumed session.</param>
    /// <param name="slot">Worker slot identifier, forwarded to hub broadcasts.</param>
    /// <param name="ct">Cancellation token for the run.</param>
    /// <exception cref="KeyNotFoundException">The task does not exist.</exception>
    /// <exception cref="InvalidOperationException">
    /// There is no previous run, the previous run has no session ID, or the task's list is missing.
    /// </exception>
    /// <remarks>
    /// Once the task has been marked running, any later exception (including
    /// cancellation) first marks the task failed, so it is never left stuck in
    /// 'running'. The exception is then rethrown unchanged.
    /// </remarks>
    public async Task ContinueAsync(string taskId, string followUpPrompt, string slot, CancellationToken ct)
    {
        var task = await _taskRepo.GetByIdAsync(taskId, ct)
            ?? throw new KeyNotFoundException($"Task '{taskId}' not found.");
        var lastRun = await _runRepo.GetLatestByTaskIdAsync(taskId, ct)
            ?? throw new InvalidOperationException("No previous run to continue.");
        if (lastRun.SessionId is null)
        {
            throw new InvalidOperationException("Previous run has no session ID — cannot resume.");
        }
        var list = await _listRepo.GetByIdAsync(task.ListId, ct)
            ?? throw new InvalidOperationException("List not found.");

        // Same resolution order (and default model) as RunAsync.
        var listConfig = await _listRepo.GetConfigAsync(task.ListId, ct);
        var resolvedConfig = new ClaudeRunConfig(
            Model: task.Model ?? listConfig?.Model ?? DefaultModel,
            SystemPrompt: task.SystemPrompt ?? listConfig?.SystemPrompt,
            AgentPath: task.AgentPath ?? listConfig?.AgentPath,
            ResumeSessionId: lastRun.SessionId);

        // Determine run directory from existing worktree or sandbox.
        string runDir;
        WorktreeContext? wtCtx = null;
        var worktree = await _wtRepo.GetByTaskIdAsync(taskId, ct);
        if (worktree is not null)
        {
            runDir = worktree.Path;
            wtCtx = new WorktreeContext(worktree.Path, worktree.BranchName, worktree.BaseCommit);
        }
        else
        {
            runDir = Path.Combine(_cfg.SandboxRoot, taskId);
        }

        var now = DateTime.UtcNow;
        await _taskRepo.MarkRunningAsync(taskId, now, ct);

        try
        {
            await _broadcaster.TaskStarted(slot, taskId, now);

            var nextRunNumber = lastRun.RunNumber + 1;
            await _broadcaster.RunCreated(taskId, nextRunNumber, false);

            var result = await RunOnceAsync(taskId, slot, runDir, resolvedConfig, nextRunNumber, false, followUpPrompt, ct);
            if (result.IsSuccess)
            {
                await HandleSuccess(task, list, slot, wtCtx, result, ct);
            }
            else
            {
                await HandleFailure(taskId, slot, result);
            }

            await _broadcaster.TaskUpdated(taskId);
        }
        catch (OperationCanceledException)
        {
            _logger.LogInformation("Task {TaskId} was cancelled", taskId);
            await MarkFailed(taskId, slot, "Task cancelled.");
            throw;
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Unhandled exception continuing task {TaskId}", taskId);
            await MarkFailed(taskId, slot, $"Unhandled error: {ex.Message}");
            throw;
        }
    }

    /// <summary>
    /// Performs a single Claude invocation. It inserts a run row, streams each
    /// output line to <c>{LogRoot}/{taskId}_run{runNumber}.ndjson</c> and to the
    /// hub, then stores the outcome on the run row and the task's log path.
    /// </summary>
    /// <returns>The result reported by the Claude process.</returns>
    /// <remarks>
    /// If the invocation throws (cancellation or any other error), the run row
    /// is still completed with an error, exit code -1 and a finish time before
    /// the exception is rethrown.
    /// </remarks>
    private async Task<RunResult> RunOnceAsync(
        string taskId,
        string slot,
        string runDir,
        ClaudeRunConfig config,
        int runNumber,
        bool isRetry,
        string prompt,
        CancellationToken ct)
    {
        var runId = Guid.NewGuid().ToString();
        var logPath = Path.Combine(_cfg.LogRoot, $"{taskId}_run{runNumber}.ndjson");
        var run = new TaskRunEntity
        {
            Id = runId,
            TaskId = taskId,
            RunNumber = runNumber,
            IsRetry = isRetry,
            Prompt = prompt,
            LogPath = logPath,
            StartedAt = DateTime.UtcNow,
        };
        await _runRepo.AddAsync(run, ct);

        var arguments = _argsBuilder.Build(config);
        await using var logWriter = new LogWriter(logPath);

        RunResult result;
        try
        {
            result = await _claude.RunAsync(
                arguments,
                prompt,
                runDir,
                async line =>
                {
                    await logWriter.WriteLineAsync(line, ct);
                    await _broadcaster.TaskMessage(taskId, line);
                },
                ct);
        }
        catch (OperationCanceledException)
        {
            await FinalizeAbortedRunAsync(run, taskId, logPath, "Cancelled.");
            throw;
        }
        catch (Exception ex)
        {
            await FinalizeAbortedRunAsync(run, taskId, logPath, $"Run failed: {ex.Message}");
            throw;
        }

        // Update the run record with results. Use CancellationToken.None:
        // this is a terminal write that must always complete, even if the
        // caller's token is already cancelled.
        run.SessionId = result.SessionId;
        run.ResultMarkdown = result.ResultMarkdown;
        run.StructuredOutputJson = result.StructuredOutputJson;
        run.ErrorMarkdown = result.ErrorMarkdown;
        run.ExitCode = result.ExitCode;
        run.TurnCount = result.TurnCount;
        run.TokensIn = result.TokensIn;
        run.TokensOut = result.TokensOut;
        run.FinishedAt = DateTime.UtcNow;
        await _runRepo.UpdateAsync(run, CancellationToken.None);

        // Update denormalized fields on the task.
        await _taskRepo.SetLogPathAsync(taskId, logPath, CancellationToken.None);
        return result;
    }

    /// <summary>
    /// Best-effort completion of a run row whose Claude invocation threw. It
    /// sets the error text, exit code -1 and finish time, and records the task's
    /// log path. Failures here are logged and swallowed so the caller's
    /// original exception is the one that propagates.
    /// </summary>
    private async Task FinalizeAbortedRunAsync(TaskRunEntity run, string taskId, string logPath, string error)
    {
        run.ErrorMarkdown = error;
        run.ExitCode = -1;
        run.FinishedAt = DateTime.UtcNow;
        try
        {
            await _runRepo.UpdateAsync(run, CancellationToken.None);
            await _taskRepo.SetLogPathAsync(taskId, logPath, CancellationToken.None);
        }
        catch (Exception updateEx)
        {
            _logger.LogError(updateEx, "Failed to finalize aborted run {RunId} for task {TaskId}", run.Id, taskId);
        }
    }

    /// <summary>
    /// Commits worktree changes (if any), marks the task done with the run's
    /// result markdown and broadcasts completion.
    /// </summary>
    private async Task HandleSuccess(TaskEntity task, ListEntity list, string slot, WorktreeContext? wtCtx, RunResult result, CancellationToken ct)
    {
        if (wtCtx is not null)
        {
            var committed = await _wtManager.CommitIfChangedAsync(wtCtx, task, list, ct);
            if (committed)
            {
                await _broadcaster.WorktreeUpdated(task.Id);
            }
        }

        // Terminal DB write uses CancellationToken.None so the task status
        // is never left as 'running' because of a cancel that arrived
        // after the Claude run already succeeded.
        var finishedAt = DateTime.UtcNow;
        await _taskRepo.MarkDoneAsync(task.Id, finishedAt, result.ResultMarkdown, CancellationToken.None);
        await _broadcaster.TaskFinished(slot, task.Id, "done", finishedAt);
        _logger.LogInformation("Task {TaskId} completed (turns={Turns}, tokens_in={In}, tokens_out={Out})",
            task.Id, result.TurnCount, result.TokensIn, result.TokensOut);
    }

    /// <summary>
    /// Marks the task failed with the run's error markdown and broadcasts completion.
    /// </summary>
    private async Task HandleFailure(string taskId, string slot, RunResult result)
    {
        // Intentionally does not accept a CancellationToken: this is the
        // terminal write for a failed task and must always be persisted.
        var finishedAt = DateTime.UtcNow;
        await _taskRepo.MarkFailedAsync(taskId, finishedAt, result.ErrorMarkdown, CancellationToken.None);
        await _broadcaster.TaskFinished(slot, taskId, "failed", finishedAt);
        _logger.LogWarning("Task {TaskId} failed (turns={Turns}): {Error}", taskId, result.TurnCount, result.ErrorMarkdown);
    }

    /// <summary>
    /// Best-effort: marks the task failed with <paramref name="error"/> and
    /// broadcasts the finish and update events. Any exception is logged and
    /// swallowed.
    /// </summary>
    private async Task MarkFailed(string taskId, string slot, string error)
    {
        try
        {
            var now = DateTime.UtcNow;
            // Terminal write — never cancel.
            await _taskRepo.MarkFailedAsync(taskId, now, error, CancellationToken.None);
            await _broadcaster.TaskFinished(slot, taskId, "failed", now);
            await _broadcaster.TaskUpdated(taskId);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Failed to mark task {TaskId} as failed", taskId);
        }
    }
}