feat(worker): drop 'agent' tag gate from queue claim
Queueing a task is itself the explicit "run me" signal — the extra tag/list filter was redundant and surprised users whose queued tasks were silently skipped. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -21,7 +21,7 @@ Worker/
|
|||||||
|
|
||||||
- **Program.cs** — loads config, inits schema, registers DI, configures SignalR on `/hub`, binds to `127.0.0.1:47821`
|
- **Program.cs** — loads config, inits schema, registers DI, configures SignalR on `/hub`, binds to `127.0.0.1:47821`
|
||||||
- **TaskStateService** — only component that writes `Status`, `PlanningPhase`, `BlockedByTaskId`. All transitions return a `TransitionResult` (no exceptions on invalid moves). Wakes the queue and broadcasts `TaskUpdated` automatically; advances the planning chain on child terminal transitions.
|
- **TaskStateService** — only component that writes `Status`, `PlanningPhase`, `BlockedByTaskId`. All transitions return a `TransitionResult` (no exceptions on invalid moves). Wakes the queue and broadcasts `TaskUpdated` automatically; advances the planning chain on child terminal transitions.
|
||||||
- **IQueueWaker / IQueuePicker / QueueService** — waker is a singleton `SemaphoreSlim`; picker performs the atomic `Queued → Running` claim filtered by `BlockedByTaskId IS NULL`, schedule, and the `agent` tag; QueueService is a thin `BackgroundService` that loops on the waker and dispatches via `TaskRunner`.
|
- **IQueueWaker / IQueuePicker / QueueService** — waker is a singleton `SemaphoreSlim`; picker performs the atomic `Queued → Running` claim filtered by `BlockedByTaskId IS NULL` and schedule; QueueService is a thin `BackgroundService` that loops on the waker and dispatches via `TaskRunner`.
|
||||||
- **OverrideSlotService** — owns `RunNow` / `ContinueTask`; goes through `TaskStateService.StartRunningAsync` (caller-driven, serialized by slot lock).
|
- **OverrideSlotService** — owns `RunNow` / `ContinueTask`; goes through `TaskStateService.StartRunningAsync` (caller-driven, serialized by slot lock).
|
||||||
- **StaleTaskRecovery** — startup-only service; calls `TaskStateService.RecoverStaleRunningAsync` to flip orphaned `Running` rows to `Failed`.
|
- **StaleTaskRecovery** — startup-only service; calls `TaskStateService.RecoverStaleRunningAsync` to flip orphaned `Running` rows to `Failed`.
|
||||||
- **External/ExternalMcpService** — always-on MCP tools for general Claude sessions: `ListTaskLists`, `ListTasks`, `GetTask`, `AddTask` (with tags), `UpdateTask`, `UpdateTaskStatus` (`Idle` / `Queued`), `SetTaskTags`, `ListTags`, `DeleteTask`, `RunTaskNow`, `CancelTask`. Auth via optional `X-ClaudeDo-Key` header.
|
- **External/ExternalMcpService** — always-on MCP tools for general Claude sessions: `ListTaskLists`, `ListTasks`, `GetTask`, `AddTask` (with tags), `UpdateTask`, `UpdateTaskStatus` (`Idle` / `Queued`), `SetTaskTags`, `ListTags`, `DeleteTask`, `RunTaskNow`, `CancelTask`. Auth via optional `X-ClaudeDo-Key` header.
|
||||||
|
|||||||
@@ -15,9 +15,8 @@ public sealed class QueuePicker : IQueuePicker
|
|||||||
{
|
{
|
||||||
// Atomic queue claim: UPDATE + RETURNING in a single statement prevents TOCTOU races.
|
// Atomic queue claim: UPDATE + RETURNING in a single statement prevents TOCTOU races.
|
||||||
// Raw SQL because EF cannot express UPDATE...RETURNING.
|
// Raw SQL because EF cannot express UPDATE...RETURNING.
|
||||||
// Eligible task must be Queued, unblocked, due (or unscheduled), and tagged 'agent'
|
// Eligible task must be Queued, unblocked, and due (or unscheduled).
|
||||||
// either directly or via its list. EF SQLite stores DateTime as
|
// EF SQLite stores DateTime as "yyyy-MM-dd HH:mm:ss.fffffff" — same format used here for comparison.
|
||||||
// "yyyy-MM-dd HH:mm:ss.fffffff" — same format used here for comparison.
|
|
||||||
await using var ctx = await _dbFactory.CreateDbContextAsync(ct);
|
await using var ctx = await _dbFactory.CreateDbContextAsync(ct);
|
||||||
var nowStr = now.ToUniversalTime().ToString("yyyy-MM-dd HH:mm:ss.fffffff");
|
var nowStr = now.ToUniversalTime().ToString("yyyy-MM-dd HH:mm:ss.fffffff");
|
||||||
var startedAtStr = DateTime.UtcNow.ToString("yyyy-MM-dd HH:mm:ss.fffffff");
|
var startedAtStr = DateTime.UtcNow.ToString("yyyy-MM-dd HH:mm:ss.fffffff");
|
||||||
@@ -29,18 +28,6 @@ public sealed class QueuePicker : IQueuePicker
|
|||||||
WHERE t.status = 'queued'
|
WHERE t.status = 'queued'
|
||||||
AND t.blocked_by_task_id IS NULL
|
AND t.blocked_by_task_id IS NULL
|
||||||
AND (t.scheduled_for IS NULL OR t.scheduled_for <= {0})
|
AND (t.scheduled_for IS NULL OR t.scheduled_for <= {0})
|
||||||
AND (
|
|
||||||
EXISTS (
|
|
||||||
SELECT 1 FROM task_tags tt
|
|
||||||
JOIN tags tg ON tg.id = tt.tag_id
|
|
||||||
WHERE tt.task_id = t.id AND tg.name = 'agent'
|
|
||||||
)
|
|
||||||
OR EXISTS (
|
|
||||||
SELECT 1 FROM list_tags lt
|
|
||||||
JOIN tags tg ON tg.id = lt.tag_id
|
|
||||||
WHERE lt.list_id = t.list_id AND tg.name = 'agent'
|
|
||||||
)
|
|
||||||
)
|
|
||||||
ORDER BY t.sort_order ASC, t.created_at ASC
|
ORDER BY t.sort_order ASC, t.created_at ASC
|
||||||
LIMIT 1
|
LIMIT 1
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -99,13 +99,15 @@ public sealed class QueuePickerTests : IDisposable
|
|||||||
}
|
}
|
||||||
|
|
||||||
[Fact]
|
[Fact]
|
||||||
public async Task ClaimNextAsync_Skips_TasksWithoutAgentTag()
|
public async Task ClaimNextAsync_Picks_TasksWithoutAgentTag()
|
||||||
{
|
{
|
||||||
|
// Queueing a task is itself the explicit "run me" signal — no tag gate.
|
||||||
var listId = await CreateListAsync(listAgentTag: false);
|
var listId = await CreateListAsync(listAgentTag: false);
|
||||||
await SeedAsync(listId);
|
var task = await SeedAsync(listId);
|
||||||
|
|
||||||
var picked = await _picker.ClaimNextAsync(DateTime.UtcNow, CancellationToken.None);
|
var picked = await _picker.ClaimNextAsync(DateTime.UtcNow, CancellationToken.None);
|
||||||
Assert.Null(picked);
|
Assert.NotNull(picked);
|
||||||
|
Assert.Equal(task.Id, picked!.Id);
|
||||||
}
|
}
|
||||||
|
|
||||||
[Fact]
|
[Fact]
|
||||||
|
|||||||
Reference in New Issue
Block a user