feat(worker): configurable max parallel task executions
Add a "Max parallel executions" setting to the General settings tab so the queue can run more than one task concurrently. QueueService now tracks multiple active slots and reads the limit from app settings each cycle, so changes take effect without restarting the worker. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
@@ -18,7 +18,7 @@ public sealed class QueueService : BackgroundService
|
||||
private readonly OverrideSlotService _override;
|
||||
|
||||
private readonly object _lock = new();
|
||||
private volatile QueueSlotState? _queueSlot;
|
||||
private readonly Dictionary<string, QueueSlotState> _queueSlots = new();
|
||||
|
||||
public QueueService(
|
||||
IDbContextFactory<ClaudeDoDbContext> dbFactory,
|
||||
@@ -41,8 +41,11 @@ public sealed class QueueService : BackgroundService
|
||||
public IReadOnlyList<(string slot, string taskId, DateTime startedAt)> GetActive()
|
||||
{
|
||||
var list = new List<(string, string, DateTime)>();
|
||||
var q = _queueSlot;
|
||||
if (q is not null) list.Add(("queue", q.TaskId, q.StartedAt));
|
||||
lock (_lock)
|
||||
{
|
||||
foreach (var slot in _queueSlots.Values)
|
||||
list.Add(("queue", slot.TaskId, slot.StartedAt));
|
||||
}
|
||||
var o = _override.CurrentSlot;
|
||||
if (o is not null) list.Add(("override", o.TaskId, o.StartedAt));
|
||||
return list;
|
||||
@@ -64,7 +67,7 @@ public sealed class QueueService : BackgroundService
|
||||
{
|
||||
lock (_lock)
|
||||
{
|
||||
if (_queueSlot?.TaskId == taskId)
|
||||
if (_queueSlots.ContainsKey(taskId))
|
||||
throw new InvalidOperationException("task is already running in queue slot");
|
||||
}
|
||||
}
|
||||
@@ -75,9 +78,9 @@ public sealed class QueueService : BackgroundService
|
||||
|
||||
lock (_lock)
|
||||
{
|
||||
if (_queueSlot is not null && _queueSlot.TaskId == taskId)
|
||||
if (_queueSlots.TryGetValue(taskId, out var slot))
|
||||
{
|
||||
_queueSlot.Cts.Cancel();
|
||||
slot.Cts.Cancel();
|
||||
return true;
|
||||
}
|
||||
}
|
||||
@@ -100,26 +103,33 @@ public sealed class QueueService : BackgroundService
|
||||
|
||||
await Task.WhenAny(wakeTask, timerTask);
|
||||
|
||||
if (_queueSlot is not null) continue;
|
||||
var maxParallel = await GetMaxParallelAsync(stoppingToken);
|
||||
|
||||
var task = await _picker.ClaimNextAsync(DateTime.UtcNow, stoppingToken);
|
||||
if (task is null) continue;
|
||||
|
||||
lock (_lock)
|
||||
// Fill as many free slots as the limit allows.
|
||||
while (!stoppingToken.IsCancellationRequested)
|
||||
{
|
||||
if (_queueSlot is not null) continue;
|
||||
|
||||
var cts = CancellationTokenSource.CreateLinkedTokenSource(stoppingToken);
|
||||
_queueSlot = new QueueSlotState { TaskId = task.Id, StartedAt = DateTime.UtcNow, Cts = cts };
|
||||
|
||||
_ = RunInSlotAsync(task.Id, cts.Token).ContinueWith(t =>
|
||||
lock (_lock)
|
||||
{
|
||||
if (t.IsFaulted)
|
||||
_logger.LogError(t.Exception, "RunInSlotAsync failed for task {TaskId} in queue slot", task.Id);
|
||||
lock (_lock) { _queueSlot = null; }
|
||||
cts.Dispose();
|
||||
_waker.Wake(); // Check for next task immediately.
|
||||
}, TaskScheduler.Default);
|
||||
if (_queueSlots.Count >= maxParallel) break;
|
||||
}
|
||||
|
||||
var task = await _picker.ClaimNextAsync(DateTime.UtcNow, stoppingToken);
|
||||
if (task is null) break;
|
||||
|
||||
lock (_lock)
|
||||
{
|
||||
var cts = CancellationTokenSource.CreateLinkedTokenSource(stoppingToken);
|
||||
_queueSlots[task.Id] = new QueueSlotState { TaskId = task.Id, StartedAt = DateTime.UtcNow, Cts = cts };
|
||||
|
||||
_ = RunInSlotAsync(task.Id, cts.Token).ContinueWith(t =>
|
||||
{
|
||||
if (t.IsFaulted)
|
||||
_logger.LogError(t.Exception, "RunInSlotAsync failed for task {TaskId} in queue slot", task.Id);
|
||||
lock (_lock) { _queueSlots.Remove(task.Id); }
|
||||
cts.Dispose();
|
||||
_waker.Wake(); // Check for next task immediately.
|
||||
}, TaskScheduler.Default);
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
|
||||
@@ -135,6 +145,21 @@ public sealed class QueueService : BackgroundService
|
||||
_logger.LogInformation("QueueService stopping");
|
||||
}
|
||||
|
||||
private async Task<int> GetMaxParallelAsync(CancellationToken ct)
|
||||
{
|
||||
try
|
||||
{
|
||||
using var context = _dbFactory.CreateDbContext();
|
||||
var settings = await new AppSettingsRepository(context).GetAsync(ct);
|
||||
return Math.Max(1, settings.MaxParallelExecutions);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogWarning(ex, "Failed to read max parallel executions; defaulting to 1");
|
||||
return 1;
|
||||
}
|
||||
}
|
||||
|
||||
private async Task RunInSlotAsync(string taskId, CancellationToken ct)
|
||||
{
|
||||
try
|
||||
|
||||
Reference in New Issue
Block a user