Queueing a task is itself the explicit "run me" signal — the extra tag/list filter was redundant and surprised users whose queued tasks were silently skipped. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
40 lines
1.6 KiB
C#
40 lines
1.6 KiB
C#
using ClaudeDo.Data;
|
|
using ClaudeDo.Data.Models;
|
|
using Microsoft.EntityFrameworkCore;
|
|
|
|
namespace ClaudeDo.Worker.Queue;
|
|
|
|
/// <summary>
/// Claims the next runnable task from the <c>tasks</c> table by flipping it from
/// <c>queued</c> to <c>running</c> in a single SQL statement.
/// </summary>
public sealed class QueuePicker : IQueuePicker
{
    // Text layout used for the timestamps sent to SQLite. Per the original author's note,
    // this mirrors how the EF SQLite provider stores DateTime values, which lets the
    // scheduled_for comparison in the query work as a plain string comparison.
    private const string SqliteTimestampFormat = "yyyy-MM-dd HH:mm:ss.fffffff";

    private readonly IDbContextFactory<ClaudeDoDbContext> _dbFactory;

    /// <summary>Creates a picker that opens a fresh context from <paramref name="dbFactory"/> per claim.</summary>
    /// <param name="dbFactory">Factory used to create the database context for each claim attempt.</param>
    public QueuePicker(IDbContextFactory<ClaudeDoDbContext> dbFactory)
        => _dbFactory = dbFactory;

    /// <summary>
    /// Marks the first eligible task as <c>running</c> and returns it.
    /// </summary>
    /// <param name="now">
    /// Reference time for the <c>scheduled_for</c> check; converted with
    /// <see cref="DateTime.ToUniversalTime"/> before it is compared.
    /// </param>
    /// <param name="ct">Cancellation token passed to context creation and query execution.</param>
    /// <returns>The row returned by the update, or <c>null</c> when the statement returned no row.</returns>
    public async Task<TaskEntity?> ClaimNextAsync(DateTime now, CancellationToken ct)
    {
        // Atomic queue claim: UPDATE + RETURNING in a single statement prevents TOCTOU races.
        // Raw SQL because EF cannot express UPDATE...RETURNING.
        // Eligible task must be Queued, unblocked, and due (or unscheduled).
        await using var ctx = await _dbFactory.CreateDbContextAsync(ct);

        // Format with the invariant culture: in a custom format string ':' is the culture's
        // time separator and the year comes from the culture's calendar, so formatting with
        // the current culture can yield text that no longer matches the stored layout.
        var invariant = System.Globalization.CultureInfo.InvariantCulture;
        var nowStr = now.ToUniversalTime().ToString(SqliteTimestampFormat, invariant);

        // started_at is taken from the wall clock rather than from the 'now' argument.
        var startedAtStr = DateTime.UtcNow.ToString(SqliteTimestampFormat, invariant);

        // {0} and {1} are bound as query parameters by FromSqlRaw, not spliced into the SQL text.
        var rows = await ctx.Tasks.FromSqlRaw("""
            UPDATE tasks SET status = 'running', started_at = {1}
            WHERE id = (
                SELECT t.id FROM tasks t
                WHERE t.status = 'queued'
                  AND t.blocked_by_task_id IS NULL
                  AND (t.scheduled_for IS NULL OR t.scheduled_for <= {0})
                ORDER BY t.sort_order ASC, t.created_at ASC
                LIMIT 1
            )
            RETURNING *
            """, nowStr, startedAtStr).ToListAsync(ct);

        // The subquery's LIMIT 1 means at most one row is updated, so at most one comes back.
        return rows.FirstOrDefault();
    }
}
|