fix(worker): address concurrency, cancellation, and resource issues

- claude process: run stdout/stderr reads without ct; rely on
  kill-on-cancel closing the pipes to unblock them — previously
  ReadLineAsync(ct) could hang, stalling task slots and shutdown
- task runner: terminal db writes (task_runs, MarkDone, MarkFailed,
  SetLogPath) now use CancellationToken.None; RunOnceAsync catches
  OCE and finalizes the run row so ContinueAsync can resume
- task repository: GetNextQueuedAgentTaskAsync is now a single
  UPDATE ... RETURNING statement — closes TOCTOU window where two
  loop iterations could dispatch the same queued task
- queue service: dispose CancellationTokenSource in slot-completion
  ContinueWith to stop leaking wait handles
- git service: register ct.Kill(processTree), drain reads without ct,
  always reap via WaitForExitAsync(None) — no more git zombies on
  cancelled worktree ops
- worktree manager: branch name uses full task id (dashes stripped)
  instead of 8-char prefix, eliminating collision risk

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Mika Kuns
2026-04-15 16:27:18 +02:00
parent fc9029de97
commit d3b85f2234
7 changed files with 122 additions and 57 deletions

View File

@@ -104,20 +104,34 @@ public sealed class GitService
using var proc = new Process { StartInfo = psi }; using var proc = new Process { StartInfo = psi };
proc.Start(); proc.Start();
// On cancellation: kill the git process tree. Killing closes the
// redirected pipes, which unblocks the ReadToEndAsync calls below
// and lets WaitForExitAsync return so the process is reaped.
// Without this, cancelling mid-git leaves zombie processes.
await using var ctr = ct.Register(() =>
{
try { proc.Kill(entireProcessTree: true); }
catch { /* already exited */ }
});
if (stdinData is not null) if (stdinData is not null)
{ {
await proc.StandardInput.WriteAsync(stdinData.AsMemory(), ct); await proc.StandardInput.WriteAsync(stdinData.AsMemory(), ct);
proc.StandardInput.Close(); proc.StandardInput.Close();
} }
var stdoutTask = proc.StandardOutput.ReadToEndAsync(ct); // Drain output without ct — pipes close when the process exits
var stderrTask = proc.StandardError.ReadToEndAsync(ct); // (whether naturally or via Kill above), so these always complete.
var stdoutTask = proc.StandardOutput.ReadToEndAsync();
var stderrTask = proc.StandardError.ReadToEndAsync();
await proc.WaitForExitAsync(ct); await proc.WaitForExitAsync(CancellationToken.None);
var stdout = await stdoutTask; var stdout = await stdoutTask;
var stderr = await stderrTask; var stderr = await stderrTask;
ct.ThrowIfCancellationRequested();
return (proc.ExitCode, stdout.TrimEnd(), stderr.TrimEnd()); return (proc.ExitCode, stdout.TrimEnd(), stderr.TrimEnd());
} }
} }

View File

@@ -174,26 +174,36 @@ public sealed class TaskRepository
public async Task<TaskEntity?> GetNextQueuedAgentTaskAsync(DateTime now, CancellationToken ct = default) public async Task<TaskEntity?> GetNextQueuedAgentTaskAsync(DateTime now, CancellationToken ct = default)
{ {
// Atomically claim the next queued agent task: the UPDATE flips its
// status to 'running' in the same statement that returns its row,
// eliminating the TOCTOU gap where two queue-loop iterations could
// both select the same queued task before either marked it running.
// The caller is responsible for populating started_at shortly after.
await using var conn = _factory.Open(); await using var conn = _factory.Open();
await using var cmd = conn.CreateCommand(); await using var cmd = conn.CreateCommand();
cmd.CommandText = """ cmd.CommandText = """
SELECT t.id, t.list_id, t.title, t.description, t.status, t.scheduled_for, UPDATE tasks
t.result, t.log_path, t.created_at, t.started_at, t.finished_at, t.commit_type, SET status = 'running'
t.model, t.system_prompt, t.agent_path WHERE id = (
FROM tasks t SELECT t.id
WHERE t.status = 'queued' FROM tasks t
AND (t.scheduled_for IS NULL OR t.scheduled_for <= @now) WHERE t.status = 'queued'
AND EXISTS ( AND (t.scheduled_for IS NULL OR t.scheduled_for <= @now)
SELECT 1 FROM task_tags tt AND EXISTS (
JOIN tags tg ON tg.id = tt.tag_id SELECT 1 FROM task_tags tt
WHERE tt.task_id = t.id AND tg.name = 'agent' JOIN tags tg ON tg.id = tt.tag_id
UNION WHERE tt.task_id = t.id AND tg.name = 'agent'
SELECT 1 FROM list_tags lt UNION
JOIN tags tg ON tg.id = lt.tag_id SELECT 1 FROM list_tags lt
WHERE lt.list_id = t.list_id AND tg.name = 'agent' JOIN tags tg ON tg.id = lt.tag_id
) WHERE lt.list_id = t.list_id AND tg.name = 'agent'
ORDER BY t.created_at ASC )
LIMIT 1 ORDER BY t.created_at ASC
LIMIT 1
)
RETURNING id, list_id, title, description, status, scheduled_for,
result, log_path, created_at, started_at, finished_at, commit_type,
model, system_prompt, agent_path
"""; """;
cmd.Parameters.AddWithValue("@now", now.ToString("o")); cmd.Parameters.AddWithValue("@now", now.ToString("o"));

View File

@@ -45,6 +45,9 @@ public sealed class ClaudeProcess : IClaudeProcess
var analyzer = new StreamAnalyzer(); var analyzer = new StreamAnalyzer();
var lastStderr = new StringBuilder(); var lastStderr = new StringBuilder();
// On cancellation: kill the tree. Killing closes the redirected pipes,
// which unblocks the ReadLineAsync loops below (which run without ct
// so they reliably drain instead of hanging on cancellation).
await using var ctr = ct.Register(() => await using var ctr = ct.Register(() =>
{ {
try { process.Kill(entireProcessTree: true); } try { process.Kill(entireProcessTree: true); }
@@ -53,26 +56,30 @@ public sealed class ClaudeProcess : IClaudeProcess
var stdoutTask = Task.Run(async () => var stdoutTask = Task.Run(async () =>
{ {
while (await process.StandardOutput.ReadLineAsync(ct) is { } line) while (await process.StandardOutput.ReadLineAsync() is { } line)
{ {
if (string.IsNullOrEmpty(line)) continue; if (string.IsNullOrEmpty(line)) continue;
await onStdoutLine(line); await onStdoutLine(line);
analyzer.ProcessLine(line); analyzer.ProcessLine(line);
} }
}, ct); });
var stderrTask = Task.Run(async () => var stderrTask = Task.Run(async () =>
{ {
while (await process.StandardError.ReadLineAsync(ct) is { } line) while (await process.StandardError.ReadLineAsync() is { } line)
{ {
if (string.IsNullOrEmpty(line)) continue; if (string.IsNullOrEmpty(line)) continue;
lastStderr.AppendLine(line); lastStderr.AppendLine(line);
await onStdoutLine($"[stderr] {line}"); await onStdoutLine($"[stderr] {line}");
} }
}, ct); });
await Task.WhenAll(stdoutTask, stderrTask); await Task.WhenAll(stdoutTask, stderrTask);
await process.WaitForExitAsync(ct); await process.WaitForExitAsync(CancellationToken.None);
// If we were asked to cancel, surface that to the caller now that
// the process is fully reaped.
ct.ThrowIfCancellationRequested();
var exitCode = process.ExitCode; var exitCode = process.ExitCode;
var streamResult = analyzer.GetResult(); var streamResult = analyzer.GetResult();

View File

@@ -232,33 +232,56 @@ public sealed class TaskRunner
await using var logWriter = new LogWriter(logPath); await using var logWriter = new LogWriter(logPath);
var result = await _claude.RunAsync( try
arguments, {
prompt, var result = await _claude.RunAsync(
runDir, arguments,
async line => prompt,
runDir,
async line =>
{
await logWriter.WriteLineAsync(line, ct);
await _broadcaster.TaskMessage(taskId, line);
},
ct);
// Update the run record with results. Use CancellationToken.None:
// this is a terminal write that must always complete, even if the
// caller's token is already cancelled.
run.SessionId = result.SessionId;
run.ResultMarkdown = result.ResultMarkdown;
run.StructuredOutputJson = result.StructuredOutputJson;
run.ErrorMarkdown = result.ErrorMarkdown;
run.ExitCode = result.ExitCode;
run.TurnCount = result.TurnCount;
run.TokensIn = result.TokensIn;
run.TokensOut = result.TokensOut;
run.FinishedAt = DateTime.UtcNow;
await _runRepo.UpdateAsync(run, CancellationToken.None);
// Update denormalized fields on the task.
await _taskRepo.SetLogPathAsync(taskId, logPath, CancellationToken.None);
return result;
}
catch (OperationCanceledException)
{
// Ensure the run row is completed so ContinueAsync / inspection
// isn't left staring at a null session_id / finished_at.
run.ErrorMarkdown = "Cancelled.";
run.ExitCode = -1;
run.FinishedAt = DateTime.UtcNow;
try
{ {
await logWriter.WriteLineAsync(line, ct); await _runRepo.UpdateAsync(run, CancellationToken.None);
await _broadcaster.TaskMessage(taskId, line); await _taskRepo.SetLogPathAsync(taskId, logPath, CancellationToken.None);
}, }
ct); catch (Exception updateEx)
{
// Update the run record with results. _logger.LogError(updateEx, "Failed to finalize cancelled run {RunId} for task {TaskId}", runId, taskId);
run.SessionId = result.SessionId; }
run.ResultMarkdown = result.ResultMarkdown; throw;
run.StructuredOutputJson = result.StructuredOutputJson; }
run.ErrorMarkdown = result.ErrorMarkdown;
run.ExitCode = result.ExitCode;
run.TurnCount = result.TurnCount;
run.TokensIn = result.TokensIn;
run.TokensOut = result.TokensOut;
run.FinishedAt = DateTime.UtcNow;
await _runRepo.UpdateAsync(run, ct);
// Update denormalized fields on the task.
await _taskRepo.SetLogPathAsync(taskId, logPath, ct);
return result;
} }
private async Task HandleSuccess(TaskEntity task, ListEntity list, string slot, WorktreeContext? wtCtx, RunResult result, CancellationToken ct) private async Task HandleSuccess(TaskEntity task, ListEntity list, string slot, WorktreeContext? wtCtx, RunResult result, CancellationToken ct)
@@ -270,8 +293,11 @@ public sealed class TaskRunner
await _broadcaster.WorktreeUpdated(task.Id); await _broadcaster.WorktreeUpdated(task.Id);
} }
// Terminal DB write uses CancellationToken.None so the task status
// is never left as 'running' because of a cancel that arrived
// after the Claude run already succeeded.
var finishedAt = DateTime.UtcNow; var finishedAt = DateTime.UtcNow;
await _taskRepo.MarkDoneAsync(task.Id, finishedAt, result.ResultMarkdown, ct); await _taskRepo.MarkDoneAsync(task.Id, finishedAt, result.ResultMarkdown, CancellationToken.None);
await _broadcaster.TaskFinished(slot, task.Id, "done", finishedAt); await _broadcaster.TaskFinished(slot, task.Id, "done", finishedAt);
_logger.LogInformation("Task {TaskId} completed (turns={Turns}, tokens_in={In}, tokens_out={Out})", _logger.LogInformation("Task {TaskId} completed (turns={Turns}, tokens_in={In}, tokens_out={Out})",
task.Id, result.TurnCount, result.TokensIn, result.TokensOut); task.Id, result.TurnCount, result.TokensIn, result.TokensOut);
@@ -279,8 +305,10 @@ public sealed class TaskRunner
private async Task HandleFailure(string taskId, string slot, RunResult result) private async Task HandleFailure(string taskId, string slot, RunResult result)
{ {
// Intentionally does not accept a CancellationToken: this is the
// terminal write for a failed task and must always be persisted.
var finishedAt = DateTime.UtcNow; var finishedAt = DateTime.UtcNow;
await _taskRepo.MarkFailedAsync(taskId, finishedAt, result.ErrorMarkdown); await _taskRepo.MarkFailedAsync(taskId, finishedAt, result.ErrorMarkdown, CancellationToken.None);
await _broadcaster.TaskFinished(slot, taskId, "failed", finishedAt); await _broadcaster.TaskFinished(slot, taskId, "failed", finishedAt);
_logger.LogWarning("Task {TaskId} failed (turns={Turns}): {Error}", taskId, result.TurnCount, result.ErrorMarkdown); _logger.LogWarning("Task {TaskId} failed (turns={Turns}): {Error}", taskId, result.TurnCount, result.ErrorMarkdown);
} }
@@ -290,7 +318,8 @@ public sealed class TaskRunner
try try
{ {
var now = DateTime.UtcNow; var now = DateTime.UtcNow;
await _taskRepo.MarkFailedAsync(taskId, now, error); // Terminal write — never cancel.
await _taskRepo.MarkFailedAsync(taskId, now, error, CancellationToken.None);
await _broadcaster.TaskFinished(slot, taskId, "failed", now); await _broadcaster.TaskFinished(slot, taskId, "failed", now);
await _broadcaster.TaskUpdated(taskId); await _broadcaster.TaskUpdated(taskId);
} }

View File

@@ -31,8 +31,10 @@ public sealed class WorktreeManager
throw new InvalidOperationException($"working_dir is not a git repository: {workingDir}"); throw new InvalidOperationException($"working_dir is not a git repository: {workingDir}");
var baseCommit = await _git.RevParseHeadAsync(workingDir, ct); var baseCommit = await _git.RevParseHeadAsync(workingDir, ct);
var shortId = task.Id.Length >= 8 ? task.Id[..8] : task.Id; // Use the full task id (dashes stripped) in the branch name so
var branchName = $"claudedo/{shortId}"; // two GUIDs sharing an 8-char prefix cannot collide on the same branch.
var idForBranch = task.Id.Replace("-", "");
var branchName = $"claudedo/{idForBranch}";
var slug = CommitMessageBuilder.ToSlug(list.Name); var slug = CommitMessageBuilder.ToSlug(list.Name);
var worktreePath = _cfg.WorktreeRootStrategy.Equals("central", StringComparison.OrdinalIgnoreCase) var worktreePath = _cfg.WorktreeRootStrategy.Equals("central", StringComparison.OrdinalIgnoreCase)

View File

@@ -71,6 +71,7 @@ public sealed class QueueService : BackgroundService
_ = RunInSlotAsync(task, "override", cts.Token).ContinueWith(_ => _ = RunInSlotAsync(task, "override", cts.Token).ContinueWith(_ =>
{ {
lock (_lock) { _overrideSlot = null; } lock (_lock) { _overrideSlot = null; }
cts.Dispose();
}, TaskScheduler.Default); }, TaskScheduler.Default);
} }
} }
@@ -94,6 +95,7 @@ public sealed class QueueService : BackgroundService
_ = RunContinueInSlotAsync(taskId, followUpPrompt, cts.Token).ContinueWith(_ => _ = RunContinueInSlotAsync(taskId, followUpPrompt, cts.Token).ContinueWith(_ =>
{ {
lock (_lock) { _overrideSlot = null; } lock (_lock) { _overrideSlot = null; }
cts.Dispose();
}, TaskScheduler.Default); }, TaskScheduler.Default);
} }
@@ -155,6 +157,7 @@ public sealed class QueueService : BackgroundService
_ = RunInSlotAsync(task, "queue", cts.Token).ContinueWith(_ => _ = RunInSlotAsync(task, "queue", cts.Token).ContinueWith(_ =>
{ {
lock (_lock) { _queueSlot = null; } lock (_lock) { _queueSlot = null; }
cts.Dispose();
WakeQueue(); // Check for next task immediately. WakeQueue(); // Check for next task immediately.
}, TaskScheduler.Default); }, TaskScheduler.Default);
} }

View File

@@ -63,7 +63,7 @@ public class WorktreeManagerTests : IDisposable
Assert.NotNull(ctx); Assert.NotNull(ctx);
Assert.True(Directory.Exists(ctx.WorktreePath)); Assert.True(Directory.Exists(ctx.WorktreePath));
Assert.Equal($"claudedo/{task.Id[..8]}", ctx.BranchName); Assert.Equal($"claudedo/{task.Id.Replace("-", "")}", ctx.BranchName);
Assert.Equal(repo.BaseCommit, ctx.BaseCommit); Assert.Equal(repo.BaseCommit, ctx.BaseCommit);
var row = await wtRepo.GetByTaskIdAsync(task.Id); var row = await wtRepo.GetByTaskIdAsync(task.Id);