fix(worker): queue dispatches skip the StartRunning re-claim

The picker claims Queued->Running atomically before dispatch; the new
StartRunningAsync guard then rejected every queue-dispatched run. Add
alreadyClaimed to RunAsync/ContinueAsync (queue passes true, override
slot keeps the guard) and align the routing tests.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
This commit is contained in:
mika kuns
2026-06-09 23:59:56 +02:00
parent 0cba9f9640
commit 74ca2e0dcd
3 changed files with 24 additions and 15 deletions

View File

@@ -191,14 +191,14 @@ public sealed class QueueService : BackgroundService
if (sessionId is not null) if (sessionId is not null)
{ {
await _runner.ContinueAsync(taskId, feedback, "queue", ct); await _runner.ContinueAsync(taskId, feedback, "queue", ct, alreadyClaimed: true);
} }
else else
{ {
task.Description = string.IsNullOrWhiteSpace(task.Description) task.Description = string.IsNullOrWhiteSpace(task.Description)
? $"Reviewer feedback: {feedback}" ? $"Reviewer feedback: {feedback}"
: $"{task.Description}\n\nReviewer feedback: {feedback}"; : $"{task.Description}\n\nReviewer feedback: {feedback}";
await _runner.RunAsync(task, "queue", ct); await _runner.RunAsync(task, "queue", ct, alreadyClaimed: true);
} }
// Clear the consumed feedback only once the run reached a successful // Clear the consumed feedback only once the run reached a successful
@@ -212,7 +212,7 @@ public sealed class QueueService : BackgroundService
return; return;
} }
await _runner.RunAsync(task, "queue", ct); await _runner.RunAsync(task, "queue", ct, alreadyClaimed: true);
} }
catch (Exception ex) catch (Exception ex)
{ {

View File

@@ -44,7 +44,7 @@ public sealed class TaskRunner
_tokens = tokens; _tokens = tokens;
} }
public async Task RunAsync(TaskEntity task, string slot, CancellationToken ct) public async Task RunAsync(TaskEntity task, string slot, CancellationToken ct, bool alreadyClaimed = false)
{ {
string? mcpToken = null; string? mcpToken = null;
string? mcpConfigPath = null; string? mcpConfigPath = null;
@@ -98,12 +98,17 @@ public sealed class TaskRunner
} }
var now = DateTime.UtcNow; var now = DateTime.UtcNow;
// The queue picker claims Queued→Running atomically (incl. StartedAt) before
// dispatching; only unclaimed dispatches (override slot) claim here.
if (!alreadyClaimed)
{
var startResult = await _state.StartRunningAsync(task.Id, now, ct); var startResult = await _state.StartRunningAsync(task.Id, now, ct);
if (!startResult.Ok) if (!startResult.Ok)
{ {
_logger.LogWarning("Task {TaskId} skipped: StartRunningAsync rejected ({Reason})", task.Id, startResult.Reason); _logger.LogWarning("Task {TaskId} skipped: StartRunningAsync rejected ({Reason})", task.Id, startResult.Reason);
return; return;
} }
}
await _broadcaster.TaskStarted(slot, task.Id, now); await _broadcaster.TaskStarted(slot, task.Id, now);
// Build prompt: title + description + only the OPEN sub-tasks (resolved ones are dropped). // Build prompt: title + description + only the OPEN sub-tasks (resolved ones are dropped).
@@ -167,7 +172,7 @@ public sealed class TaskRunner
} }
} }
public async Task ContinueAsync(string taskId, string followUpPrompt, string slot, CancellationToken ct) public async Task ContinueAsync(string taskId, string followUpPrompt, string slot, CancellationToken ct, bool alreadyClaimed = false)
{ {
TaskEntity task; TaskEntity task;
TaskRunEntity lastRun; TaskRunEntity lastRun;
@@ -213,12 +218,16 @@ public sealed class TaskRunner
} }
var now = DateTime.UtcNow; var now = DateTime.UtcNow;
// See RunAsync: queue dispatches arrive pre-claimed by the picker.
if (!alreadyClaimed)
{
var startResult = await _state.StartRunningAsync(taskId, now, ct); var startResult = await _state.StartRunningAsync(taskId, now, ct);
if (!startResult.Ok) if (!startResult.Ok)
{ {
_logger.LogWarning("Task {TaskId} skipped: StartRunningAsync rejected ({Reason})", taskId, startResult.Reason); _logger.LogWarning("Task {TaskId} skipped: StartRunningAsync rejected ({Reason})", taskId, startResult.Reason);
return; return;
} }
}
await _broadcaster.TaskStarted(slot, taskId, now); await _broadcaster.TaskStarted(slot, taskId, now);
try try

View File

@@ -47,7 +47,7 @@ public sealed class StandaloneChildrenRoutingTests : IDisposable
NullLogger<TaskRunner>.Instance, state, new TaskRunTokenRegistry()); NullLogger<TaskRunner>.Instance, state, new TaskRunTokenRegistry());
using (var ctx = _db.CreateContext()) using (var ctx = _db.CreateContext())
await runner.RunAsync((await new TaskRepository(ctx).GetByIdAsync("p1"))!, "slot-1", default); await runner.RunAsync((await new TaskRepository(ctx).GetByIdAsync("p1"))!, "slot-1", default, alreadyClaimed: true);
using var verify = _db.CreateContext(); using var verify = _db.CreateContext();
var repo = new TaskRepository(verify); var repo = new TaskRepository(verify);
@@ -74,7 +74,7 @@ public sealed class StandaloneChildrenRoutingTests : IDisposable
new ClaudeArgsBuilder(), _cfg, NullLogger<TaskRunner>.Instance, state, new TaskRunTokenRegistry()); new ClaudeArgsBuilder(), _cfg, NullLogger<TaskRunner>.Instance, state, new TaskRunTokenRegistry());
using (var ctx = _db.CreateContext()) using (var ctx = _db.CreateContext())
await runner.RunAsync((await new TaskRepository(ctx).GetByIdAsync("solo"))!, "slot-1", default); await runner.RunAsync((await new TaskRepository(ctx).GetByIdAsync("solo"))!, "slot-1", default, alreadyClaimed: true);
using var verify = _db.CreateContext(); using var verify = _db.CreateContext();
Assert.Equal(TaskStatus.WaitingForReview, (await new TaskRepository(verify).GetByIdAsync("solo"))!.Status); Assert.Equal(TaskStatus.WaitingForReview, (await new TaskRepository(verify).GetByIdAsync("solo"))!.Status);