fix(worker): queue dispatches skip the StartRunning re-claim
The picker claims Queued->Running atomically before dispatch; the new StartRunningAsync guard then rejected every queue-dispatched run. Add alreadyClaimed to RunAsync/ContinueAsync (queue passes true, override slot keeps the guard) and align the routing tests. Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
This commit is contained in:
@@ -191,14 +191,14 @@ public sealed class QueueService : BackgroundService
|
|||||||
|
|
||||||
if (sessionId is not null)
|
if (sessionId is not null)
|
||||||
{
|
{
|
||||||
await _runner.ContinueAsync(taskId, feedback, "queue", ct);
|
await _runner.ContinueAsync(taskId, feedback, "queue", ct, alreadyClaimed: true);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
task.Description = string.IsNullOrWhiteSpace(task.Description)
|
task.Description = string.IsNullOrWhiteSpace(task.Description)
|
||||||
? $"Reviewer feedback: {feedback}"
|
? $"Reviewer feedback: {feedback}"
|
||||||
: $"{task.Description}\n\nReviewer feedback: {feedback}";
|
: $"{task.Description}\n\nReviewer feedback: {feedback}";
|
||||||
await _runner.RunAsync(task, "queue", ct);
|
await _runner.RunAsync(task, "queue", ct, alreadyClaimed: true);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Clear the consumed feedback only once the run reached a successful
|
// Clear the consumed feedback only once the run reached a successful
|
||||||
@@ -212,7 +212,7 @@ public sealed class QueueService : BackgroundService
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
await _runner.RunAsync(task, "queue", ct);
|
await _runner.RunAsync(task, "queue", ct, alreadyClaimed: true);
|
||||||
}
|
}
|
||||||
catch (Exception ex)
|
catch (Exception ex)
|
||||||
{
|
{
|
||||||
|
|||||||
@@ -44,7 +44,7 @@ public sealed class TaskRunner
|
|||||||
_tokens = tokens;
|
_tokens = tokens;
|
||||||
}
|
}
|
||||||
|
|
||||||
public async Task RunAsync(TaskEntity task, string slot, CancellationToken ct)
|
public async Task RunAsync(TaskEntity task, string slot, CancellationToken ct, bool alreadyClaimed = false)
|
||||||
{
|
{
|
||||||
string? mcpToken = null;
|
string? mcpToken = null;
|
||||||
string? mcpConfigPath = null;
|
string? mcpConfigPath = null;
|
||||||
@@ -98,11 +98,16 @@ public sealed class TaskRunner
|
|||||||
}
|
}
|
||||||
|
|
||||||
var now = DateTime.UtcNow;
|
var now = DateTime.UtcNow;
|
||||||
var startResult = await _state.StartRunningAsync(task.Id, now, ct);
|
// The queue picker claims Queued→Running atomically (incl. StartedAt) before
|
||||||
if (!startResult.Ok)
|
// dispatching; only unclaimed dispatches (override slot) claim here.
|
||||||
|
if (!alreadyClaimed)
|
||||||
{
|
{
|
||||||
_logger.LogWarning("Task {TaskId} skipped: StartRunningAsync rejected ({Reason})", task.Id, startResult.Reason);
|
var startResult = await _state.StartRunningAsync(task.Id, now, ct);
|
||||||
return;
|
if (!startResult.Ok)
|
||||||
|
{
|
||||||
|
_logger.LogWarning("Task {TaskId} skipped: StartRunningAsync rejected ({Reason})", task.Id, startResult.Reason);
|
||||||
|
return;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
await _broadcaster.TaskStarted(slot, task.Id, now);
|
await _broadcaster.TaskStarted(slot, task.Id, now);
|
||||||
|
|
||||||
@@ -167,7 +172,7 @@ public sealed class TaskRunner
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public async Task ContinueAsync(string taskId, string followUpPrompt, string slot, CancellationToken ct)
|
public async Task ContinueAsync(string taskId, string followUpPrompt, string slot, CancellationToken ct, bool alreadyClaimed = false)
|
||||||
{
|
{
|
||||||
TaskEntity task;
|
TaskEntity task;
|
||||||
TaskRunEntity lastRun;
|
TaskRunEntity lastRun;
|
||||||
@@ -213,11 +218,15 @@ public sealed class TaskRunner
|
|||||||
}
|
}
|
||||||
|
|
||||||
var now = DateTime.UtcNow;
|
var now = DateTime.UtcNow;
|
||||||
var startResult = await _state.StartRunningAsync(taskId, now, ct);
|
// See RunAsync: queue dispatches arrive pre-claimed by the picker.
|
||||||
if (!startResult.Ok)
|
if (!alreadyClaimed)
|
||||||
{
|
{
|
||||||
_logger.LogWarning("Task {TaskId} skipped: StartRunningAsync rejected ({Reason})", taskId, startResult.Reason);
|
var startResult = await _state.StartRunningAsync(taskId, now, ct);
|
||||||
return;
|
if (!startResult.Ok)
|
||||||
|
{
|
||||||
|
_logger.LogWarning("Task {TaskId} skipped: StartRunningAsync rejected ({Reason})", taskId, startResult.Reason);
|
||||||
|
return;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
await _broadcaster.TaskStarted(slot, taskId, now);
|
await _broadcaster.TaskStarted(slot, taskId, now);
|
||||||
|
|
||||||
|
|||||||
@@ -47,7 +47,7 @@ public sealed class StandaloneChildrenRoutingTests : IDisposable
|
|||||||
NullLogger<TaskRunner>.Instance, state, new TaskRunTokenRegistry());
|
NullLogger<TaskRunner>.Instance, state, new TaskRunTokenRegistry());
|
||||||
|
|
||||||
using (var ctx = _db.CreateContext())
|
using (var ctx = _db.CreateContext())
|
||||||
await runner.RunAsync((await new TaskRepository(ctx).GetByIdAsync("p1"))!, "slot-1", default);
|
await runner.RunAsync((await new TaskRepository(ctx).GetByIdAsync("p1"))!, "slot-1", default, alreadyClaimed: true);
|
||||||
|
|
||||||
using var verify = _db.CreateContext();
|
using var verify = _db.CreateContext();
|
||||||
var repo = new TaskRepository(verify);
|
var repo = new TaskRepository(verify);
|
||||||
@@ -74,7 +74,7 @@ public sealed class StandaloneChildrenRoutingTests : IDisposable
|
|||||||
new ClaudeArgsBuilder(), _cfg, NullLogger<TaskRunner>.Instance, state, new TaskRunTokenRegistry());
|
new ClaudeArgsBuilder(), _cfg, NullLogger<TaskRunner>.Instance, state, new TaskRunTokenRegistry());
|
||||||
|
|
||||||
using (var ctx = _db.CreateContext())
|
using (var ctx = _db.CreateContext())
|
||||||
await runner.RunAsync((await new TaskRepository(ctx).GetByIdAsync("solo"))!, "slot-1", default);
|
await runner.RunAsync((await new TaskRepository(ctx).GetByIdAsync("solo"))!, "slot-1", default, alreadyClaimed: true);
|
||||||
|
|
||||||
using var verify = _db.CreateContext();
|
using var verify = _db.CreateContext();
|
||||||
Assert.Equal(TaskStatus.WaitingForReview, (await new TaskRepository(verify).GetByIdAsync("solo"))!.Status);
|
Assert.Equal(TaskStatus.WaitingForReview, (await new TaskRepository(verify).GetByIdAsync("solo"))!.Status);
|
||||||
|
|||||||
Reference in New Issue
Block a user