refactor(worker): rewrite TaskRunner with config resolution, retry, and continue support

This commit is contained in:
Mika Kuns
2026-04-14 14:02:57 +02:00
parent 1cdaaf9fd2
commit 76473dd92a

View File

@@ -1,3 +1,4 @@
using ClaudeDo.Data.Models;
using ClaudeDo.Data.Repositories; using ClaudeDo.Data.Repositories;
using ClaudeDo.Worker.Config; using ClaudeDo.Worker.Config;
using ClaudeDo.Worker.Hub; using ClaudeDo.Worker.Hub;
@@ -8,31 +9,40 @@ public sealed class TaskRunner
{ {
private readonly IClaudeProcess _claude; private readonly IClaudeProcess _claude;
private readonly TaskRepository _taskRepo; private readonly TaskRepository _taskRepo;
private readonly TaskRunRepository _runRepo;
private readonly ListRepository _listRepo; private readonly ListRepository _listRepo;
private readonly WorktreeRepository _wtRepo;
private readonly HubBroadcaster _broadcaster; private readonly HubBroadcaster _broadcaster;
private readonly WorktreeManager _wtManager; private readonly WorktreeManager _wtManager;
private readonly ClaudeArgsBuilder _argsBuilder;
private readonly WorkerConfig _cfg; private readonly WorkerConfig _cfg;
private readonly ILogger<TaskRunner> _logger; private readonly ILogger<TaskRunner> _logger;
public TaskRunner( public TaskRunner(
IClaudeProcess claude, IClaudeProcess claude,
TaskRepository taskRepo, TaskRepository taskRepo,
TaskRunRepository runRepo,
ListRepository listRepo, ListRepository listRepo,
WorktreeRepository wtRepo,
HubBroadcaster broadcaster, HubBroadcaster broadcaster,
WorktreeManager wtManager, WorktreeManager wtManager,
ClaudeArgsBuilder argsBuilder,
WorkerConfig cfg, WorkerConfig cfg,
ILogger<TaskRunner> logger) ILogger<TaskRunner> logger)
{ {
_claude = claude; _claude = claude;
_taskRepo = taskRepo; _taskRepo = taskRepo;
_runRepo = runRepo;
_listRepo = listRepo; _listRepo = listRepo;
_wtRepo = wtRepo;
_broadcaster = broadcaster; _broadcaster = broadcaster;
_wtManager = wtManager; _wtManager = wtManager;
_argsBuilder = argsBuilder;
_cfg = cfg; _cfg = cfg;
_logger = logger; _logger = logger;
} }
public async Task RunAsync(Data.Models.TaskEntity task, string slot, CancellationToken ct) public async Task RunAsync(TaskEntity task, string slot, CancellationToken ct)
{ {
try try
{ {
@@ -63,64 +73,60 @@ public sealed class TaskRunner
} }
else else
{ {
// Non-worktree sandbox path.
runDir = Path.Combine(_cfg.SandboxRoot, task.Id); runDir = Path.Combine(_cfg.SandboxRoot, task.Id);
Directory.CreateDirectory(runDir); Directory.CreateDirectory(runDir);
} }
var logPath = Path.Combine(_cfg.LogRoot, $"{task.Id}.ndjson"); // Resolve config: task overrides > list config > null.
var listConfig = await _listRepo.GetConfigAsync(task.ListId, ct);
var resolvedConfig = new ClaudeRunConfig(
Model: task.Model ?? listConfig?.Model,
SystemPrompt: task.SystemPrompt ?? listConfig?.SystemPrompt,
AgentPath: task.AgentPath ?? listConfig?.AgentPath,
ResumeSessionId: null
);
await _taskRepo.SetLogPathAsync(task.Id, logPath, ct);
var now = DateTime.UtcNow; var now = DateTime.UtcNow;
await _taskRepo.MarkRunningAsync(task.Id, now, ct); await _taskRepo.MarkRunningAsync(task.Id, now, ct);
await _broadcaster.TaskStarted(slot, task.Id, now); await _broadcaster.TaskStarted(slot, task.Id, now);
// Build prompt and arguments. // Build prompt.
var prompt = string.IsNullOrWhiteSpace(task.Description) var prompt = string.IsNullOrWhiteSpace(task.Description)
? task.Title ? task.Title
: $"{task.Title}\n\n{task.Description.Trim()}"; : $"{task.Title}\n\n{task.Description.Trim()}";
var arguments = new ClaudeArgsBuilder().Build(new ClaudeRunConfig( // Run 1.
Model: null, var result = await RunOnceAsync(task.Id, slot, runDir, resolvedConfig, 1, false, prompt, ct);
SystemPrompt: null,
AgentPath: null,
ResumeSessionId: null));
await using var logWriter = new LogWriter(logPath);
var result = await _claude.RunAsync(
arguments,
prompt,
runDir,
async line =>
{
await logWriter.WriteLineAsync(line, ct);
await _broadcaster.TaskMessage(task.Id, line);
},
ct);
var finishedAt = DateTime.UtcNow;
if (result.IsSuccess) if (result.IsSuccess)
{ {
// Auto-commit if worktree mode and run succeeded. await HandleSuccess(task, list, slot, wtCtx, result, ct);
if (wtCtx is not null)
{
var committed = await _wtManager.CommitIfChangedAsync(wtCtx, task, list, ct);
if (committed)
await _broadcaster.WorktreeUpdated(task.Id);
}
await _taskRepo.MarkDoneAsync(task.Id, finishedAt, result.ResultMarkdown, ct);
await _broadcaster.TaskFinished(slot, task.Id, "done", finishedAt);
_logger.LogInformation("Task {TaskId} completed successfully", task.Id);
} }
else else
{ {
// Failed run: do NOT commit. Worktree row stays active for inspection. // Auto-retry: one attempt if we have a session ID.
await _taskRepo.MarkFailedAsync(task.Id, finishedAt, result.ErrorMarkdown, ct); if (result.SessionId is not null)
await _broadcaster.TaskFinished(slot, task.Id, "failed", finishedAt); {
_logger.LogWarning("Task {TaskId} failed: {Error}", task.Id, result.ErrorMarkdown); _logger.LogInformation("Auto-retrying task {TaskId} with session {SessionId}", task.Id, result.SessionId);
var retryConfig = resolvedConfig with { ResumeSessionId = result.SessionId };
var retryPrompt = $"The previous attempt failed with:\n\n{result.ErrorMarkdown}\n\nTry again and fix the issues.";
await _broadcaster.RunCreated(task.Id, 2, true);
var retryResult = await RunOnceAsync(task.Id, slot, runDir, retryConfig, 2, true, retryPrompt, ct);
if (retryResult.IsSuccess)
{
await HandleSuccess(task, list, slot, wtCtx, retryResult, ct);
}
else
{
await HandleFailure(task.Id, slot, retryResult);
}
}
else
{
await HandleFailure(task.Id, slot, result);
}
} }
await _broadcaster.TaskUpdated(task.Id); await _broadcaster.TaskUpdated(task.Id);
@@ -137,6 +143,138 @@ public sealed class TaskRunner
} }
} }
public async Task ContinueAsync(string taskId, string followUpPrompt, string slot, CancellationToken ct)
{
var task = await _taskRepo.GetByIdAsync(taskId, ct)
?? throw new KeyNotFoundException($"Task '{taskId}' not found.");
var lastRun = await _runRepo.GetLatestByTaskIdAsync(taskId, ct)
?? throw new InvalidOperationException("No previous run to continue.");
if (lastRun.SessionId is null)
throw new InvalidOperationException("Previous run has no session ID — cannot resume.");
var list = await _listRepo.GetByIdAsync(task.ListId, ct)
?? throw new InvalidOperationException("List not found.");
var listConfig = await _listRepo.GetConfigAsync(task.ListId, ct);
var resolvedConfig = new ClaudeRunConfig(
Model: task.Model ?? listConfig?.Model,
SystemPrompt: task.SystemPrompt ?? listConfig?.SystemPrompt,
AgentPath: task.AgentPath ?? listConfig?.AgentPath,
ResumeSessionId: lastRun.SessionId
);
// Determine run directory from existing worktree or sandbox.
string runDir;
WorktreeContext? wtCtx = null;
var worktree = await _wtRepo.GetByTaskIdAsync(taskId, ct);
if (worktree is not null)
{
runDir = worktree.Path;
wtCtx = new WorktreeContext(worktree.Path, worktree.BranchName, worktree.BaseCommit);
}
else
{
runDir = Path.Combine(_cfg.SandboxRoot, taskId);
}
var now = DateTime.UtcNow;
await _taskRepo.MarkRunningAsync(taskId, now, ct);
await _broadcaster.TaskStarted(slot, taskId, now);
var nextRunNumber = lastRun.RunNumber + 1;
await _broadcaster.RunCreated(taskId, nextRunNumber, false);
var result = await RunOnceAsync(taskId, slot, runDir, resolvedConfig, nextRunNumber, false, followUpPrompt, ct);
if (result.IsSuccess)
{
await HandleSuccess(task, list, slot, wtCtx, result, ct);
}
else
{
await HandleFailure(taskId, slot, result);
}
await _broadcaster.TaskUpdated(taskId);
}
private async Task<RunResult> RunOnceAsync(
string taskId, string slot, string runDir, ClaudeRunConfig config,
int runNumber, bool isRetry, string prompt, CancellationToken ct)
{
var runId = Guid.NewGuid().ToString();
var logPath = Path.Combine(_cfg.LogRoot, $"{taskId}_run{runNumber}.ndjson");
var run = new TaskRunEntity
{
Id = runId,
TaskId = taskId,
RunNumber = runNumber,
IsRetry = isRetry,
Prompt = prompt,
LogPath = logPath,
StartedAt = DateTime.UtcNow,
};
await _runRepo.AddAsync(run, ct);
var arguments = _argsBuilder.Build(config);
await using var logWriter = new LogWriter(logPath);
var result = await _claude.RunAsync(
arguments,
prompt,
runDir,
async line =>
{
await logWriter.WriteLineAsync(line, ct);
await _broadcaster.TaskMessage(taskId, line);
},
ct);
// Update the run record with results.
run.SessionId = result.SessionId;
run.ResultMarkdown = result.ResultMarkdown;
run.StructuredOutputJson = result.StructuredOutputJson;
run.ErrorMarkdown = result.ErrorMarkdown;
run.ExitCode = result.ExitCode;
run.TurnCount = result.TurnCount;
run.TokensIn = result.TokensIn;
run.TokensOut = result.TokensOut;
run.FinishedAt = DateTime.UtcNow;
await _runRepo.UpdateAsync(run, ct);
// Update denormalized fields on the task.
await _taskRepo.SetLogPathAsync(taskId, logPath, ct);
return result;
}
private async Task HandleSuccess(TaskEntity task, ListEntity list, string slot, WorktreeContext? wtCtx, RunResult result, CancellationToken ct)
{
if (wtCtx is not null)
{
var committed = await _wtManager.CommitIfChangedAsync(wtCtx, task, list, ct);
if (committed)
await _broadcaster.WorktreeUpdated(task.Id);
}
var finishedAt = DateTime.UtcNow;
await _taskRepo.MarkDoneAsync(task.Id, finishedAt, result.ResultMarkdown, ct);
await _broadcaster.TaskFinished(slot, task.Id, "done", finishedAt);
_logger.LogInformation("Task {TaskId} completed (turns={Turns}, tokens_in={In}, tokens_out={Out})",
task.Id, result.TurnCount, result.TokensIn, result.TokensOut);
}
private async Task HandleFailure(string taskId, string slot, RunResult result)
{
var finishedAt = DateTime.UtcNow;
await _taskRepo.MarkFailedAsync(taskId, finishedAt, result.ErrorMarkdown);
await _broadcaster.TaskFinished(slot, taskId, "failed", finishedAt);
_logger.LogWarning("Task {TaskId} failed (turns={Turns}): {Error}", taskId, result.TurnCount, result.ErrorMarkdown);
}
private async Task MarkFailed(string taskId, string slot, string error) private async Task MarkFailed(string taskId, string slot, string error)
{ {
try try