From 74ca2e0dcd3feeb8200afe2c5c23540764e336c3 Mon Sep 17 00:00:00 2001 From: mika kuns Date: Tue, 9 Jun 2026 23:59:56 +0200 Subject: [PATCH] fix(worker): queue dispatches skip the StartRunning re-claim The picker claims Queued->Running atomically before dispatch; the new StartRunningAsync guard then rejected every queue-dispatched run. Add alreadyClaimed to RunAsync/ContinueAsync (queue passes true, override slot keeps the guard) and align the routing tests. Co-Authored-By: Claude Fable 5 --- src/ClaudeDo.Worker/Queue/QueueService.cs | 6 ++-- src/ClaudeDo.Worker/Runner/TaskRunner.cs | 29 ++++++++++++------- .../Runner/StandaloneChildrenRoutingTests.cs | 4 +-- 3 files changed, 24 insertions(+), 15 deletions(-) diff --git a/src/ClaudeDo.Worker/Queue/QueueService.cs b/src/ClaudeDo.Worker/Queue/QueueService.cs index 11b99f2..949de0e 100644 --- a/src/ClaudeDo.Worker/Queue/QueueService.cs +++ b/src/ClaudeDo.Worker/Queue/QueueService.cs @@ -191,14 +191,14 @@ public sealed class QueueService : BackgroundService if (sessionId is not null) { - await _runner.ContinueAsync(taskId, feedback, "queue", ct); + await _runner.ContinueAsync(taskId, feedback, "queue", ct, alreadyClaimed: true); } else { task.Description = string.IsNullOrWhiteSpace(task.Description) ? $"Reviewer feedback: {feedback}" : $"{task.Description}\n\nReviewer feedback: {feedback}"; - await _runner.RunAsync(task, "queue", ct); + await _runner.RunAsync(task, "queue", ct, alreadyClaimed: true); } // Clear the consumed feedback only once the run reached a successful @@ -212,7 +212,7 @@ public sealed class QueueService : BackgroundService return; } - await _runner.RunAsync(task, "queue", ct); + await _runner.RunAsync(task, "queue", ct, alreadyClaimed: true); } catch (Exception ex) { diff --git a/src/ClaudeDo.Worker/Runner/TaskRunner.cs b/src/ClaudeDo.Worker/Runner/TaskRunner.cs index fd4591b..7866445 100644 --- a/src/ClaudeDo.Worker/Runner/TaskRunner.cs +++ b/src/ClaudeDo.Worker/Runner/TaskRunner.cs @@ -44,7 +44,7 @@ public sealed class TaskRunner _tokens = tokens; } - public async Task RunAsync(TaskEntity task, string slot, CancellationToken ct) + public async Task RunAsync(TaskEntity task, string slot, CancellationToken ct, bool alreadyClaimed = false) { string? mcpToken = null; string? mcpConfigPath = null; @@ -98,11 +98,16 @@ public sealed class TaskRunner } var now = DateTime.UtcNow; - var startResult = await _state.StartRunningAsync(task.Id, now, ct); - if (!startResult.Ok) + // The queue picker claims Queued→Running atomically (incl. StartedAt) before + // dispatching; only unclaimed dispatches (override slot) claim here. + if (!alreadyClaimed) { - _logger.LogWarning("Task {TaskId} skipped: StartRunningAsync rejected ({Reason})", task.Id, startResult.Reason); - return; + var startResult = await _state.StartRunningAsync(task.Id, now, ct); + if (!startResult.Ok) + { + _logger.LogWarning("Task {TaskId} skipped: StartRunningAsync rejected ({Reason})", task.Id, startResult.Reason); + return; + } } await _broadcaster.TaskStarted(slot, task.Id, now); @@ -167,7 +172,7 @@ public sealed class TaskRunner } } - public async Task ContinueAsync(string taskId, string followUpPrompt, string slot, CancellationToken ct) + public async Task ContinueAsync(string taskId, string followUpPrompt, string slot, CancellationToken ct, bool alreadyClaimed = false) { TaskEntity task; TaskRunEntity lastRun; @@ -213,11 +218,15 @@ public sealed class TaskRunner } var now = DateTime.UtcNow; - var startResult = await _state.StartRunningAsync(taskId, now, ct); - if (!startResult.Ok) + // See RunAsync: queue dispatches arrive pre-claimed by the picker. + if (!alreadyClaimed) { - _logger.LogWarning("Task {TaskId} skipped: StartRunningAsync rejected ({Reason})", taskId, startResult.Reason); - return; + var startResult = await _state.StartRunningAsync(taskId, now, ct); + if (!startResult.Ok) + { + _logger.LogWarning("Task {TaskId} skipped: StartRunningAsync rejected ({Reason})", taskId, startResult.Reason); + return; + } } await _broadcaster.TaskStarted(slot, taskId, now); diff --git a/tests/ClaudeDo.Worker.Tests/Runner/StandaloneChildrenRoutingTests.cs b/tests/ClaudeDo.Worker.Tests/Runner/StandaloneChildrenRoutingTests.cs index 83df840..5d3fe6d 100644 --- a/tests/ClaudeDo.Worker.Tests/Runner/StandaloneChildrenRoutingTests.cs +++ b/tests/ClaudeDo.Worker.Tests/Runner/StandaloneChildrenRoutingTests.cs @@ -47,7 +47,7 @@ public sealed class StandaloneChildrenRoutingTests : IDisposable NullLogger.Instance, state, new TaskRunTokenRegistry()); using (var ctx = _db.CreateContext()) - await runner.RunAsync((await new TaskRepository(ctx).GetByIdAsync("p1"))!, "slot-1", default); + await runner.RunAsync((await new TaskRepository(ctx).GetByIdAsync("p1"))!, "slot-1", default, alreadyClaimed: true); using var verify = _db.CreateContext(); var repo = new TaskRepository(verify); @@ -74,7 +74,7 @@ public sealed class StandaloneChildrenRoutingTests : IDisposable new ClaudeArgsBuilder(), _cfg, NullLogger.Instance, state, new TaskRunTokenRegistry()); using (var ctx = _db.CreateContext()) - await runner.RunAsync((await new TaskRepository(ctx).GetByIdAsync("solo"))!, "slot-1", default); + await runner.RunAsync((await new TaskRepository(ctx).GetByIdAsync("solo"))!, "slot-1", default, alreadyClaimed: true); using var verify = _db.CreateContext(); Assert.Equal(TaskStatus.WaitingForReview, (await new TaskRepository(verify).GetByIdAsync("solo"))!.Status);