using System.Text.Json;
using ClaudeDo.Data;
using ClaudeDo.Data.Models;
using ClaudeDo.Data.Repositories;
using ClaudeDo.Worker.Config;
using ClaudeDo.Worker.Hub;
using ClaudeDo.Worker.State;
using Microsoft.EntityFrameworkCore;
using TaskStatus = ClaudeDo.Data.Models.TaskStatus;

namespace ClaudeDo.Worker.Runner;

/// <summary>
/// Runs a task through the Claude process: loads the task's list and configuration,
/// prepares a worktree or sandbox directory, records every run in the database,
/// retries once when a failed run left a resumable session, and publishes the
/// resulting state changes through the state service and the hub broadcaster.
/// </summary>
public sealed class TaskRunner
{
    // Serializer settings for the per-run MCP config file. Cached so the options
    // object (and the metadata it caches) is not rebuilt on every run.
    private static readonly JsonSerializerOptions s_mcpConfigJsonOptions = new() { WriteIndented = true };

    private readonly IClaudeProcess _claude;

    // NOTE(review): the generic type argument of IDbContextFactory is not visible in the
    // reviewed text — confirm the context type here and on the constructor parameter.
    private readonly IDbContextFactory _dbFactory;
    private readonly HubBroadcaster _broadcaster;
    private readonly WorktreeManager _wtManager;
    private readonly ClaudeArgsBuilder _argsBuilder;
    private readonly WorkerConfig _cfg;
    private readonly ILogger _logger;
    private readonly ITaskStateService _state;
    private readonly TaskRunTokenRegistry _tokens;

    /// <summary>Creates a runner from its collaborators; all arguments are stored as-is.</summary>
    public TaskRunner(
        IClaudeProcess claude,
        IDbContextFactory dbFactory,
        HubBroadcaster broadcaster,
        WorktreeManager wtManager,
        ClaudeArgsBuilder argsBuilder,
        WorkerConfig cfg,
        ILogger logger,
        ITaskStateService state,
        TaskRunTokenRegistry tokens)
    {
        _claude = claude;
        _dbFactory = dbFactory;
        _broadcaster = broadcaster;
        _wtManager = wtManager;
        _argsBuilder = argsBuilder;
        _cfg = cfg;
        _logger = logger;
        _state = state;
        _tokens = tokens;
    }

    /// <summary>
    /// Runs <paramref name="task"/> from scratch (run 1, plus one automatic retry when the
    /// failed run produced a session ID) and records the outcome.
    /// </summary>
    /// <param name="task">The task to execute.</param>
    /// <param name="slot">Worker slot identifier forwarded to the broadcaster.</param>
    /// <param name="ct">Cancels the run; a cancelled task is recorded as failed.</param>
    /// <remarks>
    /// Cancellation and other exceptions raised during the run are caught here and
    /// recorded as a failed task rather than propagated.
    /// </remarks>
    public async Task RunAsync(TaskEntity task, string slot, CancellationToken ct)
    {
        string? mcpToken = null;
        string? mcpConfigPath = null;

        try
        {
            ListEntity? list;
            ListConfigEntity? listConfig;

            // NOTE(review): the element type of this list is not visible in the reviewed text.
            List subtasks;

            using (var context = _dbFactory.CreateDbContext())
            {
                var listRepo = new ListRepository(context);
                list = await listRepo.GetByIdAsync(task.ListId, ct);
                if (list is null)
                {
                    await MarkFailed(task.Id, task.Title, slot, "List not found.");
                    return;
                }

                listConfig = await listRepo.GetConfigAsync(task.ListId, ct);

                var subtaskRepo = new SubtaskRepository(context);
                subtasks = await subtaskRepo.GetByTaskIdAsync(task.Id, ct);
            }

            // Determine working directory: worktree or sandbox.
            var prep = await PrepareRunDirectoryAsync(task, list, ct);
            if (prep.FailureReason is not null)
            {
                await MarkFailed(task.Id, task.Title, slot, prep.FailureReason);
                return;
            }

            var wtCtx = prep.WtCtx;
            var runDir = prep.RunDir!;

            var resolvedConfig = await ResolveConfigAsync(task, listConfig, null, ct);

            // Improvement-eligible runs get a per-run MCP identity so the agent can file
            // out-of-scope follow-ups via SuggestImprovement. Children and planning runs do not.
            if (task.ParentTaskId is null && task.PlanningPhase == PlanningPhase.None)
            {
                mcpToken = TaskRunTokenRegistry.GenerateToken();
                _tokens.Register(mcpToken, task.Id);

                Directory.CreateDirectory(_cfg.LogRoot);
                mcpConfigPath = Path.Combine(_cfg.LogRoot, $"{task.Id}_mcp.json");
                await File.WriteAllTextAsync(mcpConfigPath, BuildRunMcpConfigJson(mcpToken), ct);

                resolvedConfig = resolvedConfig with
                {
                    McpConfigPath = mcpConfigPath,
                    AllowedTools = "mcp__claudedo_run__SuggestImprovement",
                };
            }

            var now = DateTime.UtcNow;
            await _state.StartRunningAsync(task.Id, now, ct);
            await _broadcaster.TaskStarted(slot, task.Id, now);

            // Build prompt: title + description + only the OPEN sub-tasks (resolved ones are dropped).
            var prompt = TaskPromptComposer.Compose(
                task.Title,
                task.Description,
                subtasks.Select(s => (s.Title, s.Completed)));

            // Run 1.
            var result = await RunOnceAsync(task.Id, task.Title, slot, runDir, resolvedConfig, 1, false, prompt, ct);

            // Auto-retry: one attempt, and only if the failed run left a session ID to resume.
            if (!result.IsSuccess && result.SessionId is not null)
            {
                _logger.LogInformation("Auto-retrying task {TaskId} with session {SessionId}", task.Id, result.SessionId);

                var retryConfig = resolvedConfig with { ResumeSessionId = result.SessionId };
                var retryPrompt = BuildRetryPrompt(result.ErrorMarkdown);
                result = await RunOnceAsync(task.Id, task.Title, slot, runDir, retryConfig, 2, true, retryPrompt, ct);
            }

            // `result` is now the last run that was attempted (run 1 or the retry).
            if (result.IsSuccess)
            {
                await HandleSuccess(task, list, slot, wtCtx, result, ct);
            }
            else
            {
                await MarkFailed(task.Id, task.Title, slot, result.ErrorMarkdown, result.TurnCount);
            }

            await _broadcaster.TaskUpdated(task.Id);
        }
        catch (OperationCanceledException)
        {
            _logger.LogInformation("Task {TaskId} was cancelled", task.Id);
            await MarkFailed(task.Id, task.Title, slot, "Task cancelled.");
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Unhandled exception running task {TaskId}", task.Id);
            await MarkFailed(task.Id, task.Title, slot, $"Unhandled error: {ex.Message}");
        }
        finally
        {
            if (mcpToken is not null)
            {
                _tokens.Unregister(mcpToken);

                if (mcpConfigPath is not null)
                {
                    try
                    {
                        File.Delete(mcpConfigPath);
                    }
                    catch (Exception ex)
                    {
                        // Best effort: the token is already unregistered, but the file still
                        // contains it, so a failed delete is worth surfacing in the log.
                        _logger.LogWarning(ex, "Failed to delete MCP config {Path} for task {TaskId}", mcpConfigPath, task.Id);
                    }
                }
            }
        }
    }

    /// <summary>
    /// Resumes the session of the task's latest run with a follow-up prompt, as the next run number.
    /// </summary>
    /// <param name="taskId">ID of the task to continue.</param>
    /// <param name="followUpPrompt">Prompt sent to the resumed session.</param>
    /// <param name="slot">Worker slot identifier forwarded to the broadcaster.</param>
    /// <param name="ct">Cancellation token for the database reads and the run.</param>
    /// <exception cref="KeyNotFoundException">The task does not exist.</exception>
    /// <exception cref="InvalidOperationException">
    /// There is no previous run, the previous run has no session ID, or the task's list is missing.
    /// </exception>
    public async Task ContinueAsync(string taskId, string followUpPrompt, string slot, CancellationToken ct)
    {
        TaskEntity task;
        TaskRunEntity lastRun;
        ListEntity list;
        ListConfigEntity? listConfig;
        WorktreeEntity? worktree;

        using (var context = _dbFactory.CreateDbContext())
        {
            var taskRepo = new TaskRepository(context);
            task = await taskRepo.GetByIdAsync(taskId, ct)
                ?? throw new KeyNotFoundException($"Task '{taskId}' not found.");

            var runRepo = new TaskRunRepository(context);
            lastRun = await runRepo.GetLatestByTaskIdAsync(taskId, ct)
                ?? throw new InvalidOperationException("No previous run to continue.");

            if (lastRun.SessionId is null)
            {
                throw new InvalidOperationException("Previous run has no session ID — cannot resume.");
            }

            var listRepo = new ListRepository(context);
            list = await listRepo.GetByIdAsync(task.ListId, ct)
                ?? throw new InvalidOperationException("List not found.");
            listConfig = await listRepo.GetConfigAsync(task.ListId, ct);

            var wtRepo = new WorktreeRepository(context);
            worktree = await wtRepo.GetByTaskIdAsync(taskId, ct);
        }

        var resolvedConfig = await ResolveConfigAsync(task, listConfig, lastRun.SessionId, ct);

        // Determine run directory from existing worktree or sandbox.
        string runDir;
        WorktreeContext? wtCtx = null;
        if (worktree is not null)
        {
            runDir = worktree.Path;
            wtCtx = new WorktreeContext(worktree.Path, worktree.BranchName, worktree.BaseCommit);
        }
        else
        {
            runDir = Path.Combine(_cfg.SandboxRoot, taskId);

            // Same as the first run: make sure the sandbox exists. This is a no-op when the
            // directory is still there and recreates it if it was removed in the meantime.
            Directory.CreateDirectory(runDir);
        }

        // NOTE(review): unlike RunAsync, nothing below is wrapped in try/catch, so an exception
        // (including cancellation) propagates to the caller without the task being marked
        // failed here — confirm the caller takes care of that.
        var now = DateTime.UtcNow;
        await _state.StartRunningAsync(taskId, now, ct);
        await _broadcaster.TaskStarted(slot, taskId, now);

        var nextRunNumber = lastRun.RunNumber + 1;
        var result = await RunOnceAsync(taskId, task.Title, slot, runDir, resolvedConfig, nextRunNumber, false, followUpPrompt, ct);

        if (result.IsSuccess)
        {
            await HandleSuccess(task, list, slot, wtCtx, result, ct);
        }
        else
        {
            await MarkFailed(taskId, task.Title, slot, result.ErrorMarkdown, result.TurnCount);
        }

        await _broadcaster.TaskUpdated(taskId);
    }

    /// <summary>
    /// Outcome of preparing a run directory: either a directory (with an optional worktree
    /// context) or a failure reason.
    /// </summary>
    private readonly record struct RunDirResult(string? RunDir, WorktreeContext? WtCtx, string? FailureReason);

    /// <summary>
    /// Creates a worktree when the list has a working directory, otherwise a per-task sandbox
    /// directory under the configured sandbox root.
    /// </summary>
    /// <returns>
    /// The directory to run in, or a result whose <c>FailureReason</c> is set when worktree
    /// creation threw.
    /// </returns>
    private async Task<RunDirResult> PrepareRunDirectoryAsync(TaskEntity task, ListEntity list, CancellationToken ct)
    {
        if (list.WorkingDir is not null)
        {
            try
            {
                var wtCtx = await _wtManager.CreateAsync(task, list, ct);
                await _broadcaster.WorkerLog($"Created worktree for \"{task.Title}\"", WorkerLogLevel.Info, DateTime.UtcNow);
                return new RunDirResult(wtCtx.WorktreePath, wtCtx, null);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Failed to create worktree for task {TaskId}", task.Id);
                return new RunDirResult(null, null, $"Worktree creation failed: {ex.Message}");
            }
        }

        var sandboxDir = Path.Combine(_cfg.SandboxRoot, task.Id);
        Directory.CreateDirectory(sandboxDir);
        return new RunDirResult(sandboxDir, null, null);
    }

    /// <summary>
    /// Executes one Claude invocation: inserts the run row, streams stdout lines to the run's
    /// log file and the broadcaster, then stores the result on the run row.
    /// </summary>
    /// <returns>The result reported by the Claude process.</returns>
    /// <remarks>
    /// If the invocation is cancelled or throws before the process returns, the run row is
    /// still finalised (error text, exit code -1, finish time) and the exception is rethrown.
    /// </remarks>
    private async Task<RunResult> RunOnceAsync(
        string taskId,
        string taskTitle,
        string slot,
        string runDir,
        ClaudeRunConfig config,
        int runNumber,
        bool isRetry,
        string prompt,
        CancellationToken ct)
    {
        var runId = Guid.NewGuid().ToString();
        var logPath = Path.Combine(_cfg.LogRoot, $"{taskId}_run{runNumber}.ndjson");

        var run = new TaskRunEntity
        {
            Id = runId,
            TaskId = taskId,
            RunNumber = runNumber,
            IsRetry = isRetry,
            Prompt = prompt,
            LogPath = logPath,
            StartedAt = DateTime.UtcNow,
        };

        using (var context = _dbFactory.CreateDbContext())
        {
            var runRepo = new TaskRunRepository(context);
            await runRepo.AddAsync(run, ct);

            // Point the task at this run's log immediately so the UI can replay
            // live output when the user navigates away and back mid-run.
            var taskRepo = new TaskRepository(context);
            await taskRepo.SetLogPathAsync(taskId, logPath, ct);
        }

        await _broadcaster.RunCreated(taskId, runNumber, isRetry);

        var arguments = _argsBuilder.Build(config);

        await using var logWriter = new LogWriter(logPath);

        // Becomes true once the process has returned a result. The catch below only
        // handles failures before that point, so it can never overwrite a real result.
        var processCompleted = false;
        try
        {
            await _broadcaster.WorkerLog($"Started Claude for \"{taskTitle}\"", WorkerLogLevel.Info, DateTime.UtcNow);

            var result = await _claude.RunAsync(
                arguments,
                prompt,
                runDir,
                async line =>
                {
                    await logWriter.WriteLineAsync(line, ct);
                    await _broadcaster.TaskMessage(taskId, "[stdout] " + line);
                },
                ct);
            processCompleted = true;

            // Update the run record with results. Use CancellationToken.None:
            // this is a terminal write that must always complete, even if the
            // caller's token is already cancelled.
            run.SessionId = result.SessionId;
            run.ResultMarkdown = result.ResultMarkdown;
            run.StructuredOutputJson = result.StructuredOutputJson;
            run.ErrorMarkdown = result.ErrorMarkdown;
            run.ExitCode = result.ExitCode;
            run.TurnCount = result.TurnCount;
            run.TokensIn = result.TokensIn;
            run.TokensOut = result.TokensOut;
            run.FinishedAt = DateTime.UtcNow;

            using (var context = _dbFactory.CreateDbContext())
            {
                var runRepo = new TaskRunRepository(context);
                await runRepo.UpdateAsync(run, CancellationToken.None);
            }

            return result;
        }
        catch (Exception ex) when (!processCompleted)
        {
            // Ensure the run row is completed so ContinueAsync / inspection
            // isn't left staring at a null session_id / finished_at. This covers
            // cancellation as well as any other failure while the process was running.
            run.ErrorMarkdown = ex is OperationCanceledException
                ? "Cancelled."
                : $"Unhandled error: {ex.Message}";
            run.ExitCode = -1;
            run.FinishedAt = DateTime.UtcNow;

            try
            {
                using var context = _dbFactory.CreateDbContext();
                var runRepo = new TaskRunRepository(context);
                await runRepo.UpdateAsync(run, CancellationToken.None);
            }
            catch (Exception updateEx)
            {
                // A bookkeeping failure must not replace the original exception.
                _logger.LogError(updateEx, "Failed to finalize aborted run {RunId} for task {TaskId}", runId, taskId);
            }

            throw;
        }
    }

    /// <summary>
    /// Records a successful run: commits worktree changes (if any), stores the roadblock count,
    /// and moves the task to its next state.
    /// </summary>
    /// <remarks>
    /// Standalone tasks with idle or queued children wait for those children (which are enqueued
    /// here); other standalone tasks wait for review; every other task is completed directly.
    /// </remarks>
    private async Task HandleSuccess(TaskEntity task, ListEntity list, string slot, WorktreeContext? wtCtx, RunResult result, CancellationToken ct)
    {
        if (wtCtx is not null)
        {
            var committed = await _wtManager.CommitIfChangedAsync(wtCtx, task, list, ct);
            if (committed)
            {
                await _broadcaster.WorkerLog($"Committed changes in \"{task.Title}\"", WorkerLogLevel.Info, DateTime.UtcNow);
                await _broadcaster.WorktreeUpdated(task.Id);
            }
        }

        // Terminal DB write uses CancellationToken.None so the task status
        // is never left as 'running' because of a cancel that arrived
        // after the Claude run already succeeded.
        // Standalone tasks gate on review; planning children go straight to Done
        // so the sequential chain (which advances on terminal states) is unaffected.
        // Planning parents (PlanningPhase != None) are containers, not reviewable work.
        var finishedAt = DateTime.UtcNow;

        using (var ctx = _dbFactory.CreateDbContext())
        {
            await new TaskRepository(ctx).SetRoadblockCountAsync(task.Id, result.Blocks.Count, CancellationToken.None);
        }

        var reviewResult = ComposeReviewResult(result.ResultMarkdown, result.Blocks);

        bool isStandalone = task.ParentTaskId is null && task.PlanningPhase == PlanningPhase.None;

        List<TaskEntity> pendingChildren = new();
        if (isStandalone)
        {
            using var ctx = _dbFactory.CreateDbContext();
            var children = await new TaskRepository(ctx).GetChildrenAsync(task.Id, CancellationToken.None);
            pendingChildren = children
                .Where(c => c.Status is TaskStatus.Idle or TaskStatus.Queued)
                .ToList();
        }

        if (isStandalone && pendingChildren.Count > 0)
        {
            await _state.SubmitForChildrenAsync(task.Id, finishedAt, reviewResult, CancellationToken.None);
            foreach (var child in pendingChildren)
            {
                await _state.EnqueueAsync(child.Id, CancellationToken.None);
            }

            await _broadcaster.WorkerLog(
                $"Finished \"{task.Title}\" (waiting on {pendingChildren.Count} improvement(s))",
                WorkerLogLevel.Success,
                DateTime.UtcNow);
            await _broadcaster.TaskFinished(slot, task.Id, "waiting_for_children", finishedAt);
        }
        else if (isStandalone)
        {
            await _state.SubmitForReviewAsync(task.Id, finishedAt, reviewResult, CancellationToken.None);
            await _broadcaster.WorkerLog($"Finished \"{task.Title}\" (waiting for review)", WorkerLogLevel.Success, DateTime.UtcNow);
            await _broadcaster.TaskFinished(slot, task.Id, "waiting_for_review", finishedAt);
        }
        else
        {
            await _state.CompleteAsync(task.Id, finishedAt, reviewResult, CancellationToken.None);
            await _broadcaster.WorkerLog($"Finished \"{task.Title}\" (done)", WorkerLogLevel.Success, DateTime.UtcNow);
            await _broadcaster.TaskFinished(slot, task.Id, "done", finishedAt);
        }

        _logger.LogInformation(
            "Task {TaskId} completed (turns={Turns}, tokens_in={In}, tokens_out={Out})",
            task.Id, result.TurnCount, result.TokensIn, result.TokensOut);
    }

    /// <summary>
    /// Records the task as failed and announces it. Exceptions raised while doing so are
    /// logged and swallowed.
    /// </summary>
    private async Task MarkFailed(string taskId, string taskTitle, string slot, string? error, int turnCount = 0)
    {
        // Terminal write for a failed task: never cancel (the status must always
        // be persisted) and never throw (a logging failure must not mask the error).
        try
        {
            var finishedAt = DateTime.UtcNow;
            await _state.FailAsync(taskId, finishedAt, error, CancellationToken.None);
            await _broadcaster.WorkerLog($"Finished \"{taskTitle}\" (failed)", WorkerLogLevel.Error, DateTime.UtcNow);
            await _broadcaster.TaskFinished(slot, taskId, "failed", finishedAt);
            _logger.LogWarning("Task {TaskId} failed (turns={Turns}): {Error}", taskId, turnCount, error);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Failed to mark task {TaskId} as failed", taskId);
        }
    }

    /// <summary>
    /// Builds the JSON for the per-run MCP config: one HTTP server entry named
    /// <c>claudedo_run</c> on 127.0.0.1 at the configured SignalR port, with
    /// <paramref name="token"/> sent as a bearer Authorization header.
    /// </summary>
    private string BuildRunMcpConfigJson(string token)
    {
        var payload = new
        {
            mcpServers = new
            {
                claudedo_run = new
                {
                    type = "http",
                    url = $"http://127.0.0.1:{_cfg.SignalRPort}/mcp",
                    headers = new Dictionary<string, string>
                    {
                        ["Authorization"] = $"Bearer {token}",
                    },
                },
            },
        };

        return JsonSerializer.Serialize(payload, s_mcpConfigJsonOptions);
    }

    /// <summary>
    /// Resolves the effective run configuration. Model, agent path and max turns fall back
    /// task → list → global (agent path has no global fallback); the system prompt is the
    /// merge of all instruction sources; permission mode always comes from global settings.
    /// </summary>
    /// <param name="resumeSessionId">Session to resume, or <c>null</c> for a fresh session.</param>
    private async Task<ClaudeRunConfig> ResolveConfigAsync(
        TaskEntity task,
        ListConfigEntity? listConfig,
        string? resumeSessionId,
        CancellationToken ct)
    {
        AppSettingsEntity global;
        using (var ctx = _dbFactory.CreateDbContext())
        {
            var settingsRepo = new AppSettingsRepository(ctx);
            global = await settingsRepo.GetAsync(ct);
        }

        var systemFile = PromptFiles.ReadOrDefault(PromptKind.System);

        // Improvement children (filed via SuggestImprovement; CreatedBy == ParentTaskId) get a
        // narrow follow-up prompt so they stay tightly scoped instead of doing "too much".
        var isImprovementChild = task.ParentTaskId is not null && task.CreatedBy == task.ParentTaskId;
        var improvementPrompt = isImprovementChild
            ? PromptFiles.ReadOrDefault(PromptKind.ImprovementChild)
            : null;

        var instructions = MergeInstructions(
            systemFile,
            improvementPrompt,
            global.DefaultClaudeInstructions,
            listConfig?.SystemPrompt,
            task.SystemPrompt);

        return new ClaudeRunConfig(
            Model: task.Model ?? listConfig?.Model ?? global.DefaultModel,
            SystemPrompt: string.IsNullOrWhiteSpace(instructions) ? null : instructions,
            AgentPath: task.AgentPath ?? listConfig?.AgentPath,
            ResumeSessionId: resumeSessionId,
            MaxTurns: ResolveMaxTurns(task.MaxTurns, listConfig?.MaxTurns, global.DefaultMaxTurns),
            PermissionMode: global.DefaultPermissionMode);
    }

    /// <summary>Returns the first available value in the order task, list, global default.</summary>
    internal static int? ResolveMaxTurns(int? taskTurns, int? listTurns, int globalDefault)
        => taskTurns ?? listTurns ?? globalDefault;

    /// <summary>
    /// Joins the non-blank <paramref name="parts"/>, each trimmed, with a blank line between
    /// them. Returns an empty string when every part is null or whitespace.
    /// </summary>
    public static string MergeInstructions(params string?[] parts)
    {
        var trimmed = parts
            .Where(p => !string.IsNullOrWhiteSpace(p))
            .Select(p => p!.Trim());

        return string.Join("\n\n", trimmed);
    }

    /// <summary>
    /// Builds the prompt for the automatic retry: the retry prompt file, followed by the
    /// captured error unless that error is blank or starts with
    /// <c>ClaudeProcess.NoResultPrefix</c>.
    /// </summary>
    public static string BuildRetryPrompt(string? capturedError)
    {
        var basePrompt = PromptFiles.ReadOrDefault(PromptKind.Retry);

        var isReal = !string.IsNullOrWhiteSpace(capturedError)
            && !capturedError!.StartsWith(ClaudeProcess.NoResultPrefix, StringComparison.Ordinal);

        return isReal
            ? $"{basePrompt}\n\nCaptured error from the failed run:\n\n{capturedError!.Trim()}"
            : basePrompt;
    }

    /// <summary>
    /// Appends a bulleted roadblock section to <paramref name="result"/>. Returns
    /// <paramref name="result"/> unchanged when there are no blocks, and only the section when
    /// the result is blank.
    /// </summary>
    // NOTE(review): the element type of `blocks` is not visible in the reviewed text; each
    // element is rendered through string interpolation.
    public static string? ComposeReviewResult(string? result, IReadOnlyList blocks)
    {
        if (blocks.Count == 0)
        {
            return result;
        }

        var section = "⚠ Roadblocks reported during the run:\n"
            + string.Join('\n', blocks.Select(b => $"- {b}"));

        return string.IsNullOrWhiteSpace(result) ? section : $"{result}\n\n{section}";
    }
}