From d3b85f223444c4ee23349212dbd0c7bb86d704d4 Mon Sep 17 00:00:00 2001 From: Mika Kuns Date: Wed, 15 Apr 2026 16:27:18 +0200 Subject: [PATCH] fix(worker): address concurrency, cancellation, and resource issues MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - claude process: run stdout/stderr reads without ct; rely on kill-on-cancel closing the pipes to unblock them — previously ReadLineAsync(ct) could hang, stalling task slots and shutdown - task runner: terminal db writes (task_runs, MarkDone, MarkFailed, SetLogPath) now use CancellationToken.None; RunOnceAsync catches OperationCanceledException and finalizes the run row so ContinueAsync can resume - task repository: GetNextQueuedAgentTaskAsync is now a single UPDATE ... RETURNING statement — closes TOCTOU window where two loop iterations could dispatch the same queued task - queue service: dispose CancellationTokenSource in slot-completion ContinueWith to stop leaking wait handles - git service: register a ct callback that kills the git process tree, drain reads without ct, always reap via WaitForExitAsync(CancellationToken.None) — no more git zombies on cancelled worktree ops - worktree manager: branch name uses full task id (dashes stripped) instead of 8-char prefix, eliminating collision risk Co-Authored-By: Claude Opus 4.6 (1M context) --- src/ClaudeDo.Data/Git/GitService.cs | 20 ++++- .../Repositories/TaskRepository.cs | 44 ++++++---- src/ClaudeDo.Worker/Runner/ClaudeProcess.cs | 17 ++-- src/ClaudeDo.Worker/Runner/TaskRunner.cs | 87 ++++++++++++------- src/ClaudeDo.Worker/Runner/WorktreeManager.cs | 6 +- src/ClaudeDo.Worker/Services/QueueService.cs | 3 + .../Runner/WorktreeManagerTests.cs | 2 +- 7 files changed, 122 insertions(+), 57 deletions(-) diff --git a/src/ClaudeDo.Data/Git/GitService.cs b/src/ClaudeDo.Data/Git/GitService.cs index f1e7662..ecd7fdb 100644 --- a/src/ClaudeDo.Data/Git/GitService.cs +++ b/src/ClaudeDo.Data/Git/GitService.cs @@ -104,20 +104,34 @@ public sealed class GitService using var proc = new Process {
StartInfo = psi }; proc.Start(); + // On cancellation: kill the git process tree. Killing closes the + // redirected pipes, which unblocks the ReadToEndAsync calls below + // and lets WaitForExitAsync return so the process is reaped. + // Without this, cancelling mid-git leaves zombie processes. + await using var ctr = ct.Register(() => + { + try { proc.Kill(entireProcessTree: true); } + catch { /* already exited */ } + }); + if (stdinData is not null) { await proc.StandardInput.WriteAsync(stdinData.AsMemory(), ct); proc.StandardInput.Close(); } - var stdoutTask = proc.StandardOutput.ReadToEndAsync(ct); - var stderrTask = proc.StandardError.ReadToEndAsync(ct); + // Drain output without ct — pipes close when the process exits + // (whether naturally or via Kill above), so these always complete. + var stdoutTask = proc.StandardOutput.ReadToEndAsync(); + var stderrTask = proc.StandardError.ReadToEndAsync(); - await proc.WaitForExitAsync(ct); + await proc.WaitForExitAsync(CancellationToken.None); var stdout = await stdoutTask; var stderr = await stderrTask; + ct.ThrowIfCancellationRequested(); + return (proc.ExitCode, stdout.TrimEnd(), stderr.TrimEnd()); } } diff --git a/src/ClaudeDo.Data/Repositories/TaskRepository.cs b/src/ClaudeDo.Data/Repositories/TaskRepository.cs index ba62735..64de4d2 100644 --- a/src/ClaudeDo.Data/Repositories/TaskRepository.cs +++ b/src/ClaudeDo.Data/Repositories/TaskRepository.cs @@ -174,26 +174,36 @@ public sealed class TaskRepository public async Task GetNextQueuedAgentTaskAsync(DateTime now, CancellationToken ct = default) { + // Atomically claim the next queued agent task: the UPDATE flips its + // status to 'running' in the same statement that returns its row, + // eliminating the TOCTOU gap where two queue-loop iterations could + // both select the same queued task before either marked it running. + // The caller is responsible for populating started_at shortly after. 
await using var conn = _factory.Open(); await using var cmd = conn.CreateCommand(); cmd.CommandText = """ - SELECT t.id, t.list_id, t.title, t.description, t.status, t.scheduled_for, - t.result, t.log_path, t.created_at, t.started_at, t.finished_at, t.commit_type, - t.model, t.system_prompt, t.agent_path - FROM tasks t - WHERE t.status = 'queued' - AND (t.scheduled_for IS NULL OR t.scheduled_for <= @now) - AND EXISTS ( - SELECT 1 FROM task_tags tt - JOIN tags tg ON tg.id = tt.tag_id - WHERE tt.task_id = t.id AND tg.name = 'agent' - UNION - SELECT 1 FROM list_tags lt - JOIN tags tg ON tg.id = lt.tag_id - WHERE lt.list_id = t.list_id AND tg.name = 'agent' - ) - ORDER BY t.created_at ASC - LIMIT 1 + UPDATE tasks + SET status = 'running' + WHERE id = ( + SELECT t.id + FROM tasks t + WHERE t.status = 'queued' + AND (t.scheduled_for IS NULL OR t.scheduled_for <= @now) + AND EXISTS ( + SELECT 1 FROM task_tags tt + JOIN tags tg ON tg.id = tt.tag_id + WHERE tt.task_id = t.id AND tg.name = 'agent' + UNION + SELECT 1 FROM list_tags lt + JOIN tags tg ON tg.id = lt.tag_id + WHERE lt.list_id = t.list_id AND tg.name = 'agent' + ) + ORDER BY t.created_at ASC + LIMIT 1 + ) + RETURNING id, list_id, title, description, status, scheduled_for, + result, log_path, created_at, started_at, finished_at, commit_type, + model, system_prompt, agent_path """; cmd.Parameters.AddWithValue("@now", now.ToString("o")); diff --git a/src/ClaudeDo.Worker/Runner/ClaudeProcess.cs b/src/ClaudeDo.Worker/Runner/ClaudeProcess.cs index 1cbe132..39a59cf 100644 --- a/src/ClaudeDo.Worker/Runner/ClaudeProcess.cs +++ b/src/ClaudeDo.Worker/Runner/ClaudeProcess.cs @@ -45,6 +45,9 @@ public sealed class ClaudeProcess : IClaudeProcess var analyzer = new StreamAnalyzer(); var lastStderr = new StringBuilder(); + // On cancellation: kill the tree. Killing closes the redirected pipes, + // which unblocks the ReadLineAsync loops below (which run without ct + // so they reliably drain instead of hanging on cancellation). 
await using var ctr = ct.Register(() => { try { process.Kill(entireProcessTree: true); } @@ -53,26 +56,30 @@ public sealed class ClaudeProcess : IClaudeProcess var stdoutTask = Task.Run(async () => { - while (await process.StandardOutput.ReadLineAsync(ct) is { } line) + while (await process.StandardOutput.ReadLineAsync() is { } line) { if (string.IsNullOrEmpty(line)) continue; await onStdoutLine(line); analyzer.ProcessLine(line); } - }, ct); + }); var stderrTask = Task.Run(async () => { - while (await process.StandardError.ReadLineAsync(ct) is { } line) + while (await process.StandardError.ReadLineAsync() is { } line) { if (string.IsNullOrEmpty(line)) continue; lastStderr.AppendLine(line); await onStdoutLine($"[stderr] {line}"); } - }, ct); + }); await Task.WhenAll(stdoutTask, stderrTask); - await process.WaitForExitAsync(ct); + await process.WaitForExitAsync(CancellationToken.None); + + // If we were asked to cancel, surface that to the caller now that + // the process is fully reaped. + ct.ThrowIfCancellationRequested(); var exitCode = process.ExitCode; var streamResult = analyzer.GetResult(); diff --git a/src/ClaudeDo.Worker/Runner/TaskRunner.cs b/src/ClaudeDo.Worker/Runner/TaskRunner.cs index 63cfcb2..2b13c51 100644 --- a/src/ClaudeDo.Worker/Runner/TaskRunner.cs +++ b/src/ClaudeDo.Worker/Runner/TaskRunner.cs @@ -232,33 +232,56 @@ public sealed class TaskRunner await using var logWriter = new LogWriter(logPath); - var result = await _claude.RunAsync( - arguments, - prompt, - runDir, - async line => + try + { + var result = await _claude.RunAsync( + arguments, + prompt, + runDir, + async line => + { + await logWriter.WriteLineAsync(line, ct); + await _broadcaster.TaskMessage(taskId, line); + }, + ct); + + // Update the run record with results. Use CancellationToken.None: + // this is a terminal write that must always complete, even if the + // caller's token is already cancelled. 
+ run.SessionId = result.SessionId; + run.ResultMarkdown = result.ResultMarkdown; + run.StructuredOutputJson = result.StructuredOutputJson; + run.ErrorMarkdown = result.ErrorMarkdown; + run.ExitCode = result.ExitCode; + run.TurnCount = result.TurnCount; + run.TokensIn = result.TokensIn; + run.TokensOut = result.TokensOut; + run.FinishedAt = DateTime.UtcNow; + await _runRepo.UpdateAsync(run, CancellationToken.None); + + // Update denormalized fields on the task. + await _taskRepo.SetLogPathAsync(taskId, logPath, CancellationToken.None); + + return result; + } + catch (OperationCanceledException) + { + // Ensure the run row is completed so ContinueAsync / inspection + // isn't left staring at a null session_id / finished_at. + run.ErrorMarkdown = "Cancelled."; + run.ExitCode = -1; + run.FinishedAt = DateTime.UtcNow; + try { - await logWriter.WriteLineAsync(line, ct); - await _broadcaster.TaskMessage(taskId, line); - }, - ct); - - // Update the run record with results. - run.SessionId = result.SessionId; - run.ResultMarkdown = result.ResultMarkdown; - run.StructuredOutputJson = result.StructuredOutputJson; - run.ErrorMarkdown = result.ErrorMarkdown; - run.ExitCode = result.ExitCode; - run.TurnCount = result.TurnCount; - run.TokensIn = result.TokensIn; - run.TokensOut = result.TokensOut; - run.FinishedAt = DateTime.UtcNow; - await _runRepo.UpdateAsync(run, ct); - - // Update denormalized fields on the task. - await _taskRepo.SetLogPathAsync(taskId, logPath, ct); - - return result; + await _runRepo.UpdateAsync(run, CancellationToken.None); + await _taskRepo.SetLogPathAsync(taskId, logPath, CancellationToken.None); + } + catch (Exception updateEx) + { + _logger.LogError(updateEx, "Failed to finalize cancelled run {RunId} for task {TaskId}", runId, taskId); + } + throw; + } } private async Task HandleSuccess(TaskEntity task, ListEntity list, string slot, WorktreeContext? 
wtCtx, RunResult result, CancellationToken ct) @@ -270,8 +293,11 @@ public sealed class TaskRunner await _broadcaster.WorktreeUpdated(task.Id); } + // Terminal DB write uses CancellationToken.None so the task status + // is never left as 'running' because of a cancel that arrived + // after the Claude run already succeeded. var finishedAt = DateTime.UtcNow; - await _taskRepo.MarkDoneAsync(task.Id, finishedAt, result.ResultMarkdown, ct); + await _taskRepo.MarkDoneAsync(task.Id, finishedAt, result.ResultMarkdown, CancellationToken.None); await _broadcaster.TaskFinished(slot, task.Id, "done", finishedAt); _logger.LogInformation("Task {TaskId} completed (turns={Turns}, tokens_in={In}, tokens_out={Out})", task.Id, result.TurnCount, result.TokensIn, result.TokensOut); @@ -279,8 +305,10 @@ public sealed class TaskRunner private async Task HandleFailure(string taskId, string slot, RunResult result) { + // Intentionally does not accept a CancellationToken: this is the + // terminal write for a failed task and must always be persisted. var finishedAt = DateTime.UtcNow; - await _taskRepo.MarkFailedAsync(taskId, finishedAt, result.ErrorMarkdown); + await _taskRepo.MarkFailedAsync(taskId, finishedAt, result.ErrorMarkdown, CancellationToken.None); await _broadcaster.TaskFinished(slot, taskId, "failed", finishedAt); _logger.LogWarning("Task {TaskId} failed (turns={Turns}): {Error}", taskId, result.TurnCount, result.ErrorMarkdown); } @@ -290,7 +318,8 @@ public sealed class TaskRunner try { var now = DateTime.UtcNow; - await _taskRepo.MarkFailedAsync(taskId, now, error); + // Terminal write — never cancel. 
+ await _taskRepo.MarkFailedAsync(taskId, now, error, CancellationToken.None); await _broadcaster.TaskFinished(slot, taskId, "failed", now); await _broadcaster.TaskUpdated(taskId); } diff --git a/src/ClaudeDo.Worker/Runner/WorktreeManager.cs b/src/ClaudeDo.Worker/Runner/WorktreeManager.cs index ceabbc6..51b76d6 100644 --- a/src/ClaudeDo.Worker/Runner/WorktreeManager.cs +++ b/src/ClaudeDo.Worker/Runner/WorktreeManager.cs @@ -31,8 +31,10 @@ public sealed class WorktreeManager throw new InvalidOperationException($"working_dir is not a git repository: {workingDir}"); var baseCommit = await _git.RevParseHeadAsync(workingDir, ct); - var shortId = task.Id.Length >= 8 ? task.Id[..8] : task.Id; - var branchName = $"claudedo/{shortId}"; + // Use the full task id (dashes stripped) in the branch name so + // two GUIDs sharing an 8-char prefix cannot collide on the same branch. + var idForBranch = task.Id.Replace("-", ""); + var branchName = $"claudedo/{idForBranch}"; var slug = CommitMessageBuilder.ToSlug(list.Name); var worktreePath = _cfg.WorktreeRootStrategy.Equals("central", StringComparison.OrdinalIgnoreCase) diff --git a/src/ClaudeDo.Worker/Services/QueueService.cs b/src/ClaudeDo.Worker/Services/QueueService.cs index e3912c5..27b256d 100644 --- a/src/ClaudeDo.Worker/Services/QueueService.cs +++ b/src/ClaudeDo.Worker/Services/QueueService.cs @@ -71,6 +71,7 @@ public sealed class QueueService : BackgroundService _ = RunInSlotAsync(task, "override", cts.Token).ContinueWith(_ => { lock (_lock) { _overrideSlot = null; } + cts.Dispose(); }, TaskScheduler.Default); } } @@ -94,6 +95,7 @@ public sealed class QueueService : BackgroundService _ = RunContinueInSlotAsync(taskId, followUpPrompt, cts.Token).ContinueWith(_ => { lock (_lock) { _overrideSlot = null; } + cts.Dispose(); }, TaskScheduler.Default); } @@ -155,6 +157,7 @@ public sealed class QueueService : BackgroundService _ = RunInSlotAsync(task, "queue", cts.Token).ContinueWith(_ => { lock (_lock) { _queueSlot = null; } + 
cts.Dispose(); WakeQueue(); // Check for next task immediately. }, TaskScheduler.Default); } diff --git a/tests/ClaudeDo.Worker.Tests/Runner/WorktreeManagerTests.cs b/tests/ClaudeDo.Worker.Tests/Runner/WorktreeManagerTests.cs index 97803aa..7e5ed68 100644 --- a/tests/ClaudeDo.Worker.Tests/Runner/WorktreeManagerTests.cs +++ b/tests/ClaudeDo.Worker.Tests/Runner/WorktreeManagerTests.cs @@ -63,7 +63,7 @@ public class WorktreeManagerTests : IDisposable Assert.NotNull(ctx); Assert.True(Directory.Exists(ctx.WorktreePath)); - Assert.Equal($"claudedo/{task.Id[..8]}", ctx.BranchName); + Assert.Equal($"claudedo/{task.Id.Replace("-", "")}", ctx.BranchName); Assert.Equal(repo.BaseCommit, ctx.BaseCommit); var row = await wtRepo.GetByTaskIdAsync(task.Id);