refactor(worker): extract OverrideSlotService and reorganize Worker/Services into domain folders
Slice 5 of the worker state consolidation refactor.
OverrideSlotService (new in Worker/Queue/) owns RunNow, ContinueTask,
and the override-slot piece of CancelTask. QueueService keeps the
queue-slot guard for "task is already running" rejection and delegates
to OverrideSlotService for execution; CancelTask tries the override
slot first, then the queue slot. QueueSlotState is extracted to its own
file.
Folder reorg (via git mv to preserve history):
- Worker/Queue/ QueueService, OverrideSlotService, QueueSlotState
(alongside existing waker/picker)
- Worker/Lifecycle/ StaleTaskRecovery, TaskResetService, TaskMergeService
- Worker/Worktrees/ WorktreeMaintenanceService
- Worker/Agents/ AgentFileService, DefaultAgentSeeder
Worker/Services/ folder removed. All consumers updated to the new
namespaces (Program.cs, WorkerHub, ExternalMcpService,
PlanningMergeOrchestrator, all Worker tests).
OverrideSlotService is registered as a DI singleton in both the main
worker app and the external MCP app.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -1,6 +1,6 @@
|
|||||||
using ClaudeDo.Data.Models;
|
using ClaudeDo.Data.Models;
|
||||||
|
|
||||||
namespace ClaudeDo.Worker.Services;
|
namespace ClaudeDo.Worker.Agents;
|
||||||
|
|
||||||
public sealed class AgentFileService
|
public sealed class AgentFileService
|
||||||
{
|
{
|
||||||
@@ -1,6 +1,6 @@
|
|||||||
using Microsoft.Extensions.Logging;
|
using Microsoft.Extensions.Logging;
|
||||||
|
|
||||||
namespace ClaudeDo.Worker.Services;
|
namespace ClaudeDo.Worker.Agents;
|
||||||
|
|
||||||
public sealed record SeedResult(int Copied, int Skipped);
|
public sealed record SeedResult(int Copied, int Skipped);
|
||||||
|
|
||||||
@@ -2,7 +2,7 @@ using System.ComponentModel;
|
|||||||
using ClaudeDo.Data.Models;
|
using ClaudeDo.Data.Models;
|
||||||
using ClaudeDo.Data.Repositories;
|
using ClaudeDo.Data.Repositories;
|
||||||
using ClaudeDo.Worker.Hub;
|
using ClaudeDo.Worker.Hub;
|
||||||
using ClaudeDo.Worker.Services;
|
using ClaudeDo.Worker.Queue;
|
||||||
using ClaudeDo.Worker.State;
|
using ClaudeDo.Worker.State;
|
||||||
using ModelContextProtocol.Server;
|
using ModelContextProtocol.Server;
|
||||||
using TaskStatus = ClaudeDo.Data.Models.TaskStatus;
|
using TaskStatus = ClaudeDo.Data.Models.TaskStatus;
|
||||||
|
|||||||
@@ -2,9 +2,11 @@ using System.Reflection;
|
|||||||
using ClaudeDo.Data;
|
using ClaudeDo.Data;
|
||||||
using ClaudeDo.Data.Models;
|
using ClaudeDo.Data.Models;
|
||||||
using ClaudeDo.Data.Repositories;
|
using ClaudeDo.Data.Repositories;
|
||||||
|
using ClaudeDo.Worker.Agents;
|
||||||
|
using ClaudeDo.Worker.Lifecycle;
|
||||||
using ClaudeDo.Worker.Planning;
|
using ClaudeDo.Worker.Planning;
|
||||||
using ClaudeDo.Worker.Queue;
|
using ClaudeDo.Worker.Queue;
|
||||||
using ClaudeDo.Worker.Services;
|
using ClaudeDo.Worker.Worktrees;
|
||||||
using Microsoft.AspNetCore.SignalR;
|
using Microsoft.AspNetCore.SignalR;
|
||||||
using Microsoft.EntityFrameworkCore;
|
using Microsoft.EntityFrameworkCore;
|
||||||
|
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
using ClaudeDo.Worker.State;
|
using ClaudeDo.Worker.State;
|
||||||
|
|
||||||
namespace ClaudeDo.Worker.Services;
|
namespace ClaudeDo.Worker.Lifecycle;
|
||||||
|
|
||||||
public sealed class StaleTaskRecovery : IHostedService
|
public sealed class StaleTaskRecovery : IHostedService
|
||||||
{
|
{
|
||||||
@@ -6,7 +6,7 @@ using ClaudeDo.Worker.Hub;
|
|||||||
using Microsoft.EntityFrameworkCore;
|
using Microsoft.EntityFrameworkCore;
|
||||||
using TaskStatus = ClaudeDo.Data.Models.TaskStatus;
|
using TaskStatus = ClaudeDo.Data.Models.TaskStatus;
|
||||||
|
|
||||||
namespace ClaudeDo.Worker.Services;
|
namespace ClaudeDo.Worker.Lifecycle;
|
||||||
|
|
||||||
public sealed record MergeResult(
|
public sealed record MergeResult(
|
||||||
string Status,
|
string Status,
|
||||||
@@ -7,7 +7,7 @@ using ClaudeDo.Worker.State;
|
|||||||
using Microsoft.EntityFrameworkCore;
|
using Microsoft.EntityFrameworkCore;
|
||||||
using TaskStatus = ClaudeDo.Data.Models.TaskStatus;
|
using TaskStatus = ClaudeDo.Data.Models.TaskStatus;
|
||||||
|
|
||||||
namespace ClaudeDo.Worker.Services;
|
namespace ClaudeDo.Worker.Lifecycle;
|
||||||
|
|
||||||
public sealed class TaskResetService
|
public sealed class TaskResetService
|
||||||
{
|
{
|
||||||
@@ -3,7 +3,7 @@ using ClaudeDo.Data;
|
|||||||
using ClaudeDo.Data.Git;
|
using ClaudeDo.Data.Git;
|
||||||
using ClaudeDo.Data.Models;
|
using ClaudeDo.Data.Models;
|
||||||
using ClaudeDo.Worker.Hub;
|
using ClaudeDo.Worker.Hub;
|
||||||
using ClaudeDo.Worker.Services;
|
using ClaudeDo.Worker.Lifecycle;
|
||||||
using Microsoft.EntityFrameworkCore;
|
using Microsoft.EntityFrameworkCore;
|
||||||
using TaskStatus = ClaudeDo.Data.Models.TaskStatus;
|
using TaskStatus = ClaudeDo.Data.Models.TaskStatus;
|
||||||
|
|
||||||
|
|||||||
@@ -1,14 +1,16 @@
|
|||||||
using ClaudeDo.Data;
|
using ClaudeDo.Data;
|
||||||
using ClaudeDo.Data.Git;
|
using ClaudeDo.Data.Git;
|
||||||
using ClaudeDo.Data.Repositories;
|
using ClaudeDo.Data.Repositories;
|
||||||
|
using ClaudeDo.Worker.Agents;
|
||||||
using ClaudeDo.Worker.Config;
|
using ClaudeDo.Worker.Config;
|
||||||
using ClaudeDo.Worker.External;
|
using ClaudeDo.Worker.External;
|
||||||
using ClaudeDo.Worker.Hub;
|
using ClaudeDo.Worker.Hub;
|
||||||
|
using ClaudeDo.Worker.Lifecycle;
|
||||||
using ClaudeDo.Worker.Planning;
|
using ClaudeDo.Worker.Planning;
|
||||||
using ClaudeDo.Worker.Queue;
|
using ClaudeDo.Worker.Queue;
|
||||||
using ClaudeDo.Worker.Runner;
|
using ClaudeDo.Worker.Runner;
|
||||||
using ClaudeDo.Worker.Services;
|
|
||||||
using ClaudeDo.Worker.State;
|
using ClaudeDo.Worker.State;
|
||||||
|
using ClaudeDo.Worker.Worktrees;
|
||||||
using Microsoft.EntityFrameworkCore;
|
using Microsoft.EntityFrameworkCore;
|
||||||
|
|
||||||
var cfg = WorkerConfig.Load();
|
var cfg = WorkerConfig.Load();
|
||||||
@@ -69,6 +71,9 @@ builder.Services.AddSingleton(sp => new DefaultAgentSeeder(
|
|||||||
agentsDir,
|
agentsDir,
|
||||||
sp.GetService<Microsoft.Extensions.Logging.ILogger<DefaultAgentSeeder>>()));
|
sp.GetService<Microsoft.Extensions.Logging.ILogger<DefaultAgentSeeder>>()));
|
||||||
|
|
||||||
|
// Override slot owns RunNow / ContinueTask. Queue slot is the BackgroundService.
|
||||||
|
builder.Services.AddSingleton<OverrideSlotService>();
|
||||||
|
|
||||||
// QueueService: singleton + hosted service (same instance).
|
// QueueService: singleton + hosted service (same instance).
|
||||||
builder.Services.AddSingleton<QueueService>();
|
builder.Services.AddSingleton<QueueService>();
|
||||||
builder.Services.AddHostedService(sp => sp.GetRequiredService<QueueService>());
|
builder.Services.AddHostedService(sp => sp.GetRequiredService<QueueService>());
|
||||||
@@ -141,6 +146,7 @@ if (cfg.ExternalMcpPort > 0)
|
|||||||
externalBuilder.Services.AddSingleton(cfg);
|
externalBuilder.Services.AddSingleton(cfg);
|
||||||
externalBuilder.Services.AddSingleton(app.Services.GetRequiredService<HubBroadcaster>());
|
externalBuilder.Services.AddSingleton(app.Services.GetRequiredService<HubBroadcaster>());
|
||||||
externalBuilder.Services.AddSingleton(app.Services.GetRequiredService<QueueService>());
|
externalBuilder.Services.AddSingleton(app.Services.GetRequiredService<QueueService>());
|
||||||
|
externalBuilder.Services.AddSingleton(app.Services.GetRequiredService<OverrideSlotService>());
|
||||||
externalBuilder.Services.AddSingleton(app.Services.GetRequiredService<IDbContextFactory<ClaudeDoDbContext>>());
|
externalBuilder.Services.AddSingleton(app.Services.GetRequiredService<IDbContextFactory<ClaudeDoDbContext>>());
|
||||||
externalBuilder.Services.AddSingleton(app.Services.GetRequiredService<ITaskStateService>());
|
externalBuilder.Services.AddSingleton(app.Services.GetRequiredService<ITaskStateService>());
|
||||||
externalBuilder.Services.AddSingleton(app.Services.GetRequiredService<IQueueWaker>());
|
externalBuilder.Services.AddSingleton(app.Services.GetRequiredService<IQueueWaker>());
|
||||||
|
|||||||
134
src/ClaudeDo.Worker/Queue/OverrideSlotService.cs
Normal file
134
src/ClaudeDo.Worker/Queue/OverrideSlotService.cs
Normal file
@@ -0,0 +1,134 @@
|
|||||||
|
using ClaudeDo.Data;
|
||||||
|
using ClaudeDo.Data.Repositories;
|
||||||
|
using ClaudeDo.Worker.Runner;
|
||||||
|
using Microsoft.EntityFrameworkCore;
|
||||||
|
|
||||||
|
namespace ClaudeDo.Worker.Queue;
|
||||||
|
|
||||||
|
public sealed class OverrideSlotService
|
||||||
|
{
|
||||||
|
private readonly IDbContextFactory<ClaudeDoDbContext> _dbFactory;
|
||||||
|
private readonly TaskRunner _runner;
|
||||||
|
private readonly ILogger<OverrideSlotService> _logger;
|
||||||
|
|
||||||
|
private readonly object _lock = new();
|
||||||
|
private volatile QueueSlotState? _slot;
|
||||||
|
|
||||||
|
public OverrideSlotService(
|
||||||
|
IDbContextFactory<ClaudeDoDbContext> dbFactory,
|
||||||
|
TaskRunner runner,
|
||||||
|
ILogger<OverrideSlotService> logger)
|
||||||
|
{
|
||||||
|
_dbFactory = dbFactory;
|
||||||
|
_runner = runner;
|
||||||
|
_logger = logger;
|
||||||
|
}
|
||||||
|
|
||||||
|
public QueueSlotState? CurrentSlot => _slot;
|
||||||
|
|
||||||
|
public async Task RunNow(string taskId)
|
||||||
|
{
|
||||||
|
using (var context = _dbFactory.CreateDbContext())
|
||||||
|
{
|
||||||
|
var taskRepo = new TaskRepository(context);
|
||||||
|
var exists = await taskRepo.GetByIdAsync(taskId);
|
||||||
|
if (exists is null)
|
||||||
|
throw new KeyNotFoundException($"Task '{taskId}' not found.");
|
||||||
|
}
|
||||||
|
|
||||||
|
lock (_lock)
|
||||||
|
{
|
||||||
|
if (_slot is not null)
|
||||||
|
throw new InvalidOperationException("override slot busy");
|
||||||
|
|
||||||
|
var cts = new CancellationTokenSource();
|
||||||
|
_slot = new QueueSlotState { TaskId = taskId, StartedAt = DateTime.UtcNow, Cts = cts };
|
||||||
|
|
||||||
|
_ = RunInSlotAsync(taskId, cts.Token).ContinueWith(t =>
|
||||||
|
{
|
||||||
|
if (t.IsFaulted)
|
||||||
|
_logger.LogError(t.Exception, "RunInSlotAsync failed for task {TaskId}", taskId);
|
||||||
|
lock (_lock) { _slot = null; }
|
||||||
|
cts.Dispose();
|
||||||
|
}, TaskScheduler.Default);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public async Task<string> ContinueTask(string taskId, string followUpPrompt)
|
||||||
|
{
|
||||||
|
using var context = _dbFactory.CreateDbContext();
|
||||||
|
var taskRepo = new TaskRepository(context);
|
||||||
|
var task = await taskRepo.GetByIdAsync(taskId)
|
||||||
|
?? throw new KeyNotFoundException($"Task '{taskId}' not found.");
|
||||||
|
|
||||||
|
if (task.Status == Data.Models.TaskStatus.Running)
|
||||||
|
throw new InvalidOperationException("task is already running");
|
||||||
|
|
||||||
|
lock (_lock)
|
||||||
|
{
|
||||||
|
if (_slot is not null)
|
||||||
|
throw new InvalidOperationException("override slot busy");
|
||||||
|
|
||||||
|
var cts = new CancellationTokenSource();
|
||||||
|
_slot = new QueueSlotState { TaskId = taskId, StartedAt = DateTime.UtcNow, Cts = cts };
|
||||||
|
|
||||||
|
_ = RunContinueInSlotAsync(taskId, followUpPrompt, cts.Token).ContinueWith(t =>
|
||||||
|
{
|
||||||
|
if (t.IsFaulted)
|
||||||
|
_logger.LogError(t.Exception, "RunContinueInSlotAsync failed for task {TaskId}", taskId);
|
||||||
|
lock (_lock) { _slot = null; }
|
||||||
|
cts.Dispose();
|
||||||
|
}, TaskScheduler.Default);
|
||||||
|
}
|
||||||
|
|
||||||
|
return taskId;
|
||||||
|
}
|
||||||
|
|
||||||
|
public bool TryCancel(string taskId)
|
||||||
|
{
|
||||||
|
lock (_lock)
|
||||||
|
{
|
||||||
|
if (_slot is not null && _slot.TaskId == taskId)
|
||||||
|
{
|
||||||
|
_slot.Cts.Cancel();
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
private async Task RunInSlotAsync(string taskId, CancellationToken ct)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
_logger.LogInformation("Starting task {TaskId} in override slot", taskId);
|
||||||
|
|
||||||
|
Data.Models.TaskEntity task;
|
||||||
|
using (var context = _dbFactory.CreateDbContext())
|
||||||
|
{
|
||||||
|
var taskRepo = new TaskRepository(context);
|
||||||
|
task = await taskRepo.GetByIdAsync(taskId, ct)
|
||||||
|
?? throw new KeyNotFoundException($"Task '{taskId}' not found.");
|
||||||
|
}
|
||||||
|
|
||||||
|
await _runner.RunAsync(task, "override", ct);
|
||||||
|
}
|
||||||
|
catch (Exception ex)
|
||||||
|
{
|
||||||
|
_logger.LogError(ex, "Override slot runner error for task {TaskId}", taskId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private async Task RunContinueInSlotAsync(string taskId, string followUpPrompt, CancellationToken ct)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
_logger.LogInformation("Continuing task {TaskId} in override slot", taskId);
|
||||||
|
await _runner.ContinueAsync(taskId, followUpPrompt, "override", ct);
|
||||||
|
}
|
||||||
|
catch (Exception ex)
|
||||||
|
{
|
||||||
|
_logger.LogError(ex, "Continue runner error for task {TaskId}", taskId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -2,18 +2,10 @@ using ClaudeDo.Data;
|
|||||||
using ClaudeDo.Data.Models;
|
using ClaudeDo.Data.Models;
|
||||||
using ClaudeDo.Data.Repositories;
|
using ClaudeDo.Data.Repositories;
|
||||||
using ClaudeDo.Worker.Config;
|
using ClaudeDo.Worker.Config;
|
||||||
using ClaudeDo.Worker.Queue;
|
|
||||||
using ClaudeDo.Worker.Runner;
|
using ClaudeDo.Worker.Runner;
|
||||||
using Microsoft.EntityFrameworkCore;
|
using Microsoft.EntityFrameworkCore;
|
||||||
|
|
||||||
namespace ClaudeDo.Worker.Services;
|
namespace ClaudeDo.Worker.Queue;
|
||||||
|
|
||||||
public sealed class QueueSlotState
|
|
||||||
{
|
|
||||||
public required string TaskId { get; init; }
|
|
||||||
public required DateTime StartedAt { get; init; }
|
|
||||||
public required CancellationTokenSource Cts { get; init; }
|
|
||||||
}
|
|
||||||
|
|
||||||
public sealed class QueueService : BackgroundService
|
public sealed class QueueService : BackgroundService
|
||||||
{
|
{
|
||||||
@@ -23,10 +15,10 @@ public sealed class QueueService : BackgroundService
|
|||||||
private readonly ILogger<QueueService> _logger;
|
private readonly ILogger<QueueService> _logger;
|
||||||
private readonly QueueWaker _waker;
|
private readonly QueueWaker _waker;
|
||||||
private readonly IQueuePicker _picker;
|
private readonly IQueuePicker _picker;
|
||||||
|
private readonly OverrideSlotService _override;
|
||||||
|
|
||||||
private readonly object _lock = new();
|
private readonly object _lock = new();
|
||||||
private volatile QueueSlotState? _queueSlot;
|
private volatile QueueSlotState? _queueSlot;
|
||||||
private volatile QueueSlotState? _overrideSlot;
|
|
||||||
|
|
||||||
public QueueService(
|
public QueueService(
|
||||||
IDbContextFactory<ClaudeDoDbContext> dbFactory,
|
IDbContextFactory<ClaudeDoDbContext> dbFactory,
|
||||||
@@ -34,7 +26,8 @@ public sealed class QueueService : BackgroundService
|
|||||||
WorkerConfig cfg,
|
WorkerConfig cfg,
|
||||||
ILogger<QueueService> logger,
|
ILogger<QueueService> logger,
|
||||||
QueueWaker waker,
|
QueueWaker waker,
|
||||||
IQueuePicker picker)
|
IQueuePicker picker,
|
||||||
|
OverrideSlotService overrideSlot)
|
||||||
{
|
{
|
||||||
_dbFactory = dbFactory;
|
_dbFactory = dbFactory;
|
||||||
_runner = runner;
|
_runner = runner;
|
||||||
@@ -42,6 +35,7 @@ public sealed class QueueService : BackgroundService
|
|||||||
_logger = logger;
|
_logger = logger;
|
||||||
_waker = waker;
|
_waker = waker;
|
||||||
_picker = picker;
|
_picker = picker;
|
||||||
|
_override = overrideSlot;
|
||||||
}
|
}
|
||||||
|
|
||||||
public IReadOnlyList<(string slot, string taskId, DateTime startedAt)> GetActive()
|
public IReadOnlyList<(string slot, string taskId, DateTime startedAt)> GetActive()
|
||||||
@@ -49,75 +43,36 @@ public sealed class QueueService : BackgroundService
|
|||||||
var list = new List<(string, string, DateTime)>();
|
var list = new List<(string, string, DateTime)>();
|
||||||
var q = _queueSlot;
|
var q = _queueSlot;
|
||||||
if (q is not null) list.Add(("queue", q.TaskId, q.StartedAt));
|
if (q is not null) list.Add(("queue", q.TaskId, q.StartedAt));
|
||||||
var o = _overrideSlot;
|
var o = _override.CurrentSlot;
|
||||||
if (o is not null) list.Add(("override", o.TaskId, o.StartedAt));
|
if (o is not null) list.Add(("override", o.TaskId, o.StartedAt));
|
||||||
return list;
|
return list;
|
||||||
}
|
}
|
||||||
|
|
||||||
public async Task RunNow(string taskId)
|
public Task RunNow(string taskId)
|
||||||
{
|
{
|
||||||
using (var context = _dbFactory.CreateDbContext())
|
EnsureNotInQueueSlot(taskId);
|
||||||
{
|
return _override.RunNow(taskId);
|
||||||
var taskRepo = new TaskRepository(context);
|
|
||||||
var exists = await taskRepo.GetByIdAsync(taskId);
|
|
||||||
if (exists is null)
|
|
||||||
throw new KeyNotFoundException($"Task '{taskId}' not found.");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Task<string> ContinueTask(string taskId, string followUpPrompt)
|
||||||
|
{
|
||||||
|
EnsureNotInQueueSlot(taskId);
|
||||||
|
return _override.ContinueTask(taskId, followUpPrompt);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void EnsureNotInQueueSlot(string taskId)
|
||||||
|
{
|
||||||
lock (_lock)
|
lock (_lock)
|
||||||
{
|
{
|
||||||
if (_queueSlot?.TaskId == taskId)
|
if (_queueSlot?.TaskId == taskId)
|
||||||
throw new InvalidOperationException("task is already running in queue slot");
|
throw new InvalidOperationException("task is already running in queue slot");
|
||||||
if (_overrideSlot is not null)
|
|
||||||
throw new InvalidOperationException("override slot busy");
|
|
||||||
|
|
||||||
var cts = new CancellationTokenSource();
|
|
||||||
_overrideSlot = new QueueSlotState { TaskId = taskId, StartedAt = DateTime.UtcNow, Cts = cts };
|
|
||||||
|
|
||||||
_ = RunInSlotAsync(taskId, "override", cts.Token).ContinueWith(t =>
|
|
||||||
{
|
|
||||||
if (t.IsFaulted)
|
|
||||||
_logger.LogError(t.Exception, "RunInSlotAsync failed for task {TaskId}", taskId);
|
|
||||||
lock (_lock) { _overrideSlot = null; }
|
|
||||||
cts.Dispose();
|
|
||||||
}, TaskScheduler.Default);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public async Task<string> ContinueTask(string taskId, string followUpPrompt)
|
|
||||||
{
|
|
||||||
using var context = _dbFactory.CreateDbContext();
|
|
||||||
var taskRepo = new TaskRepository(context);
|
|
||||||
var task = await taskRepo.GetByIdAsync(taskId)
|
|
||||||
?? throw new KeyNotFoundException($"Task '{taskId}' not found.");
|
|
||||||
|
|
||||||
if (task.Status == Data.Models.TaskStatus.Running)
|
|
||||||
throw new InvalidOperationException("task is already running");
|
|
||||||
|
|
||||||
lock (_lock)
|
|
||||||
{
|
|
||||||
if (_queueSlot?.TaskId == taskId)
|
|
||||||
throw new InvalidOperationException("task is already running in queue slot");
|
|
||||||
if (_overrideSlot is not null)
|
|
||||||
throw new InvalidOperationException("override slot busy");
|
|
||||||
|
|
||||||
var cts = new CancellationTokenSource();
|
|
||||||
_overrideSlot = new QueueSlotState { TaskId = taskId, StartedAt = DateTime.UtcNow, Cts = cts };
|
|
||||||
|
|
||||||
_ = RunContinueInSlotAsync(taskId, followUpPrompt, cts.Token).ContinueWith(t =>
|
|
||||||
{
|
|
||||||
if (t.IsFaulted)
|
|
||||||
_logger.LogError(t.Exception, "RunContinueInSlotAsync failed for task {TaskId}", taskId);
|
|
||||||
lock (_lock) { _overrideSlot = null; }
|
|
||||||
cts.Dispose();
|
|
||||||
}, TaskScheduler.Default);
|
|
||||||
}
|
|
||||||
|
|
||||||
return taskId;
|
|
||||||
}
|
|
||||||
|
|
||||||
public bool CancelTask(string taskId)
|
public bool CancelTask(string taskId)
|
||||||
{
|
{
|
||||||
|
if (_override.TryCancel(taskId)) return true;
|
||||||
|
|
||||||
lock (_lock)
|
lock (_lock)
|
||||||
{
|
{
|
||||||
if (_queueSlot is not null && _queueSlot.TaskId == taskId)
|
if (_queueSlot is not null && _queueSlot.TaskId == taskId)
|
||||||
@@ -125,11 +80,6 @@ public sealed class QueueService : BackgroundService
|
|||||||
_queueSlot.Cts.Cancel();
|
_queueSlot.Cts.Cancel();
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
if (_overrideSlot is not null && _overrideSlot.TaskId == taskId)
|
|
||||||
{
|
|
||||||
_overrideSlot.Cts.Cancel();
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
@@ -162,7 +112,7 @@ public sealed class QueueService : BackgroundService
|
|||||||
var cts = CancellationTokenSource.CreateLinkedTokenSource(stoppingToken);
|
var cts = CancellationTokenSource.CreateLinkedTokenSource(stoppingToken);
|
||||||
_queueSlot = new QueueSlotState { TaskId = task.Id, StartedAt = DateTime.UtcNow, Cts = cts };
|
_queueSlot = new QueueSlotState { TaskId = task.Id, StartedAt = DateTime.UtcNow, Cts = cts };
|
||||||
|
|
||||||
_ = RunInSlotAsync(task.Id, "queue", cts.Token).ContinueWith(t =>
|
_ = RunInSlotAsync(task.Id, cts.Token).ContinueWith(t =>
|
||||||
{
|
{
|
||||||
if (t.IsFaulted)
|
if (t.IsFaulted)
|
||||||
_logger.LogError(t.Exception, "RunInSlotAsync failed for task {TaskId} in queue slot", task.Id);
|
_logger.LogError(t.Exception, "RunInSlotAsync failed for task {TaskId} in queue slot", task.Id);
|
||||||
@@ -185,11 +135,11 @@ public sealed class QueueService : BackgroundService
|
|||||||
_logger.LogInformation("QueueService stopping");
|
_logger.LogInformation("QueueService stopping");
|
||||||
}
|
}
|
||||||
|
|
||||||
private async Task RunInSlotAsync(string taskId, string slot, CancellationToken ct)
|
private async Task RunInSlotAsync(string taskId, CancellationToken ct)
|
||||||
{
|
{
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
_logger.LogInformation("Starting task {TaskId} in {Slot} slot", taskId, slot);
|
_logger.LogInformation("Starting task {TaskId} in queue slot", taskId);
|
||||||
|
|
||||||
TaskEntity task;
|
TaskEntity task;
|
||||||
using (var context = _dbFactory.CreateDbContext())
|
using (var context = _dbFactory.CreateDbContext())
|
||||||
@@ -199,24 +149,11 @@ public sealed class QueueService : BackgroundService
|
|||||||
?? throw new KeyNotFoundException($"Task '{taskId}' not found.");
|
?? throw new KeyNotFoundException($"Task '{taskId}' not found.");
|
||||||
}
|
}
|
||||||
|
|
||||||
await _runner.RunAsync(task, slot, ct);
|
await _runner.RunAsync(task, "queue", ct);
|
||||||
}
|
}
|
||||||
catch (Exception ex)
|
catch (Exception ex)
|
||||||
{
|
{
|
||||||
_logger.LogError(ex, "Slot runner error for task {TaskId}", taskId);
|
_logger.LogError(ex, "Slot runner error for task {TaskId}", taskId);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private async Task RunContinueInSlotAsync(string taskId, string followUpPrompt, CancellationToken ct)
|
|
||||||
{
|
|
||||||
try
|
|
||||||
{
|
|
||||||
_logger.LogInformation("Continuing task {TaskId} in override slot", taskId);
|
|
||||||
await _runner.ContinueAsync(taskId, followUpPrompt, "override", ct);
|
|
||||||
}
|
|
||||||
catch (Exception ex)
|
|
||||||
{
|
|
||||||
_logger.LogError(ex, "Continue runner error for task {TaskId}", taskId);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
8
src/ClaudeDo.Worker/Queue/QueueSlotState.cs
Normal file
8
src/ClaudeDo.Worker/Queue/QueueSlotState.cs
Normal file
@@ -0,0 +1,8 @@
|
|||||||
|
namespace ClaudeDo.Worker.Queue;
|
||||||
|
|
||||||
|
public sealed class QueueSlotState
|
||||||
|
{
|
||||||
|
public required string TaskId { get; init; }
|
||||||
|
public required DateTime StartedAt { get; init; }
|
||||||
|
public required CancellationTokenSource Cts { get; init; }
|
||||||
|
}
|
||||||
@@ -3,7 +3,7 @@ using ClaudeDo.Data.Git;
|
|||||||
using ClaudeDo.Data.Models;
|
using ClaudeDo.Data.Models;
|
||||||
using Microsoft.EntityFrameworkCore;
|
using Microsoft.EntityFrameworkCore;
|
||||||
|
|
||||||
namespace ClaudeDo.Worker.Services;
|
namespace ClaudeDo.Worker.Worktrees;
|
||||||
|
|
||||||
public sealed class WorktreeMaintenanceService
|
public sealed class WorktreeMaintenanceService
|
||||||
{
|
{
|
||||||
@@ -5,8 +5,8 @@ using ClaudeDo.Data.Repositories;
|
|||||||
using ClaudeDo.Worker.Config;
|
using ClaudeDo.Worker.Config;
|
||||||
using ClaudeDo.Worker.External;
|
using ClaudeDo.Worker.External;
|
||||||
using ClaudeDo.Worker.Hub;
|
using ClaudeDo.Worker.Hub;
|
||||||
|
using ClaudeDo.Worker.Queue;
|
||||||
using ClaudeDo.Worker.Runner;
|
using ClaudeDo.Worker.Runner;
|
||||||
using ClaudeDo.Worker.Services;
|
|
||||||
using ClaudeDo.Worker.Tests.Infrastructure;
|
using ClaudeDo.Worker.Tests.Infrastructure;
|
||||||
using ClaudeDo.Worker.Tests.Services;
|
using ClaudeDo.Worker.Tests.Services;
|
||||||
using Microsoft.AspNetCore.SignalR;
|
using Microsoft.AspNetCore.SignalR;
|
||||||
@@ -117,7 +117,8 @@ public sealed class ExternalMcpServiceTests : IDisposable
|
|||||||
NullLogger<TaskRunner>.Instance, TaskStateServiceBuilder.Build(dbFactory).State);
|
NullLogger<TaskRunner>.Instance, TaskStateServiceBuilder.Build(dbFactory).State);
|
||||||
var waker = new ClaudeDo.Worker.Queue.QueueWaker();
|
var waker = new ClaudeDo.Worker.Queue.QueueWaker();
|
||||||
var picker = new ClaudeDo.Worker.Queue.QueuePicker(dbFactory);
|
var picker = new ClaudeDo.Worker.Queue.QueuePicker(dbFactory);
|
||||||
return new QueueService(dbFactory, runner, cfg, NullLogger<QueueService>.Instance, waker, picker);
|
var overrideSlot = new OverrideSlotService(dbFactory, runner, NullLogger<OverrideSlotService>.Instance);
|
||||||
|
return new QueueService(dbFactory, runner, cfg, NullLogger<QueueService>.Instance, waker, picker, overrideSlot);
|
||||||
}
|
}
|
||||||
|
|
||||||
[Fact]
|
[Fact]
|
||||||
|
|||||||
@@ -57,7 +57,7 @@ public sealed class AgentSettingsHubTests : IDisposable
|
|||||||
Directory.CreateDirectory(targetDir);
|
Directory.CreateDirectory(targetDir);
|
||||||
await File.WriteAllTextAsync(Path.Combine(bundleDir, "code-reviewer.md"), "body");
|
await File.WriteAllTextAsync(Path.Combine(bundleDir, "code-reviewer.md"), "body");
|
||||||
|
|
||||||
var seeder = new ClaudeDo.Worker.Services.DefaultAgentSeeder(bundleDir, targetDir);
|
var seeder = new ClaudeDo.Worker.Agents.DefaultAgentSeeder(bundleDir, targetDir);
|
||||||
var result = await seeder.SeedMissingAsync();
|
var result = await seeder.SeedMissingAsync();
|
||||||
|
|
||||||
Assert.Equal(1, result.Copied);
|
Assert.Equal(1, result.Copied);
|
||||||
|
|||||||
@@ -4,8 +4,11 @@ using ClaudeDo.Data.Models;
|
|||||||
using ClaudeDo.Data.Repositories;
|
using ClaudeDo.Data.Repositories;
|
||||||
using ClaudeDo.Worker.Config;
|
using ClaudeDo.Worker.Config;
|
||||||
using ClaudeDo.Worker.Hub;
|
using ClaudeDo.Worker.Hub;
|
||||||
|
using ClaudeDo.Worker.Agents;
|
||||||
|
using ClaudeDo.Worker.Lifecycle;
|
||||||
using ClaudeDo.Worker.Planning;
|
using ClaudeDo.Worker.Planning;
|
||||||
using ClaudeDo.Worker.Services;
|
using ClaudeDo.Worker.Queue;
|
||||||
|
using ClaudeDo.Worker.Worktrees;
|
||||||
using ClaudeDo.Worker.Tests.Infrastructure;
|
using ClaudeDo.Worker.Tests.Infrastructure;
|
||||||
using Microsoft.AspNetCore.SignalR;
|
using Microsoft.AspNetCore.SignalR;
|
||||||
using Xunit;
|
using Xunit;
|
||||||
|
|||||||
@@ -2,8 +2,8 @@ using ClaudeDo.Data;
|
|||||||
using ClaudeDo.Data.Git;
|
using ClaudeDo.Data.Git;
|
||||||
using ClaudeDo.Data.Models;
|
using ClaudeDo.Data.Models;
|
||||||
using ClaudeDo.Worker.Hub;
|
using ClaudeDo.Worker.Hub;
|
||||||
|
using ClaudeDo.Worker.Lifecycle;
|
||||||
using ClaudeDo.Worker.Planning;
|
using ClaudeDo.Worker.Planning;
|
||||||
using ClaudeDo.Worker.Services;
|
|
||||||
using ClaudeDo.Worker.Tests.Infrastructure;
|
using ClaudeDo.Worker.Tests.Infrastructure;
|
||||||
using Microsoft.AspNetCore.SignalR;
|
using Microsoft.AspNetCore.SignalR;
|
||||||
using Microsoft.Extensions.Logging.Abstractions;
|
using Microsoft.Extensions.Logging.Abstractions;
|
||||||
|
|||||||
@@ -1,4 +1,4 @@
|
|||||||
using ClaudeDo.Worker.Services;
|
using ClaudeDo.Worker.Agents;
|
||||||
|
|
||||||
namespace ClaudeDo.Worker.Tests.Services;
|
namespace ClaudeDo.Worker.Tests.Services;
|
||||||
|
|
||||||
|
|||||||
@@ -1,4 +1,4 @@
|
|||||||
using ClaudeDo.Worker.Services;
|
using ClaudeDo.Worker.Agents;
|
||||||
|
|
||||||
namespace ClaudeDo.Worker.Tests.Services;
|
namespace ClaudeDo.Worker.Tests.Services;
|
||||||
|
|
||||||
|
|||||||
@@ -5,7 +5,6 @@ using ClaudeDo.Worker.Config;
|
|||||||
using ClaudeDo.Worker.Hub;
|
using ClaudeDo.Worker.Hub;
|
||||||
using ClaudeDo.Worker.Queue;
|
using ClaudeDo.Worker.Queue;
|
||||||
using ClaudeDo.Worker.Runner;
|
using ClaudeDo.Worker.Runner;
|
||||||
using ClaudeDo.Worker.Services;
|
|
||||||
using ClaudeDo.Worker.Tests.Infrastructure;
|
using ClaudeDo.Worker.Tests.Infrastructure;
|
||||||
using Microsoft.AspNetCore.SignalR;
|
using Microsoft.AspNetCore.SignalR;
|
||||||
using Microsoft.Extensions.Logging.Abstractions;
|
using Microsoft.Extensions.Logging.Abstractions;
|
||||||
@@ -60,7 +59,8 @@ public sealed class QueueServiceSlotGuardTests : IDisposable
|
|||||||
NullLogger<TaskRunner>.Instance, TaskStateServiceBuilder.Build(dbFactory).State);
|
NullLogger<TaskRunner>.Instance, TaskStateServiceBuilder.Build(dbFactory).State);
|
||||||
_waker = new QueueWaker();
|
_waker = new QueueWaker();
|
||||||
var picker = new QueuePicker(dbFactory);
|
var picker = new QueuePicker(dbFactory);
|
||||||
var service = new QueueService(dbFactory, runner, _cfg, NullLogger<QueueService>.Instance, _waker, picker);
|
var overrideSlot = new OverrideSlotService(dbFactory, runner, NullLogger<OverrideSlotService>.Instance);
|
||||||
|
var service = new QueueService(dbFactory, runner, _cfg, NullLogger<QueueService>.Instance, _waker, picker, overrideSlot);
|
||||||
return (service, fake);
|
return (service, fake);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -6,7 +6,6 @@ using ClaudeDo.Worker.Config;
|
|||||||
using ClaudeDo.Worker.Hub;
|
using ClaudeDo.Worker.Hub;
|
||||||
using ClaudeDo.Worker.Queue;
|
using ClaudeDo.Worker.Queue;
|
||||||
using ClaudeDo.Worker.Runner;
|
using ClaudeDo.Worker.Runner;
|
||||||
using ClaudeDo.Worker.Services;
|
|
||||||
using ClaudeDo.Worker.Tests.Infrastructure;
|
using ClaudeDo.Worker.Tests.Infrastructure;
|
||||||
using Microsoft.AspNetCore.SignalR;
|
using Microsoft.AspNetCore.SignalR;
|
||||||
using Microsoft.Extensions.Logging.Abstractions;
|
using Microsoft.Extensions.Logging.Abstractions;
|
||||||
@@ -61,7 +60,8 @@ public sealed class QueueServiceTests : IDisposable
|
|||||||
NullLogger<TaskRunner>.Instance, TaskStateServiceBuilder.Build(dbFactory).State);
|
NullLogger<TaskRunner>.Instance, TaskStateServiceBuilder.Build(dbFactory).State);
|
||||||
_waker = new QueueWaker();
|
_waker = new QueueWaker();
|
||||||
var picker = new QueuePicker(dbFactory);
|
var picker = new QueuePicker(dbFactory);
|
||||||
var service = new QueueService(dbFactory, runner, _cfg, NullLogger<QueueService>.Instance, _waker, picker);
|
var overrideSlot = new OverrideSlotService(dbFactory, runner, NullLogger<OverrideSlotService>.Instance);
|
||||||
|
var service = new QueueService(dbFactory, runner, _cfg, NullLogger<QueueService>.Instance, _waker, picker, overrideSlot);
|
||||||
return (service, fake);
|
return (service, fake);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -1,7 +1,7 @@
|
|||||||
using ClaudeDo.Data;
|
using ClaudeDo.Data;
|
||||||
using ClaudeDo.Data.Models;
|
using ClaudeDo.Data.Models;
|
||||||
using ClaudeDo.Data.Repositories;
|
using ClaudeDo.Data.Repositories;
|
||||||
using ClaudeDo.Worker.Services;
|
using ClaudeDo.Worker.Lifecycle;
|
||||||
using ClaudeDo.Worker.Tests.Infrastructure;
|
using ClaudeDo.Worker.Tests.Infrastructure;
|
||||||
using Microsoft.Extensions.Logging.Abstractions;
|
using Microsoft.Extensions.Logging.Abstractions;
|
||||||
using TaskStatus = ClaudeDo.Data.Models.TaskStatus;
|
using TaskStatus = ClaudeDo.Data.Models.TaskStatus;
|
||||||
|
|||||||
@@ -3,7 +3,7 @@ using ClaudeDo.Data.Models;
|
|||||||
using ClaudeDo.Data.Repositories;
|
using ClaudeDo.Data.Repositories;
|
||||||
using ClaudeDo.Worker.Hub;
|
using ClaudeDo.Worker.Hub;
|
||||||
using ClaudeDo.Worker.Runner;
|
using ClaudeDo.Worker.Runner;
|
||||||
using ClaudeDo.Worker.Services;
|
using ClaudeDo.Worker.Lifecycle;
|
||||||
using ClaudeDo.Worker.Tests.Infrastructure;
|
using ClaudeDo.Worker.Tests.Infrastructure;
|
||||||
using Microsoft.AspNetCore.SignalR;
|
using Microsoft.AspNetCore.SignalR;
|
||||||
using Microsoft.Extensions.Logging.Abstractions;
|
using Microsoft.Extensions.Logging.Abstractions;
|
||||||
|
|||||||
@@ -3,7 +3,7 @@ using ClaudeDo.Data.Repositories;
|
|||||||
using ClaudeDo.Worker.Config;
|
using ClaudeDo.Worker.Config;
|
||||||
using ClaudeDo.Worker.Hub;
|
using ClaudeDo.Worker.Hub;
|
||||||
using ClaudeDo.Worker.Runner;
|
using ClaudeDo.Worker.Runner;
|
||||||
using ClaudeDo.Worker.Services;
|
using ClaudeDo.Worker.Lifecycle;
|
||||||
using ClaudeDo.Worker.Tests.Infrastructure;
|
using ClaudeDo.Worker.Tests.Infrastructure;
|
||||||
using Microsoft.AspNetCore.SignalR;
|
using Microsoft.AspNetCore.SignalR;
|
||||||
using Microsoft.Extensions.Logging.Abstractions;
|
using Microsoft.Extensions.Logging.Abstractions;
|
||||||
|
|||||||
@@ -1,7 +1,7 @@
|
|||||||
using ClaudeDo.Data.Git;
|
using ClaudeDo.Data.Git;
|
||||||
using ClaudeDo.Data.Models;
|
using ClaudeDo.Data.Models;
|
||||||
using ClaudeDo.Data.Repositories;
|
using ClaudeDo.Data.Repositories;
|
||||||
using ClaudeDo.Worker.Services;
|
using ClaudeDo.Worker.Worktrees;
|
||||||
using ClaudeDo.Worker.Tests.Infrastructure;
|
using ClaudeDo.Worker.Tests.Infrastructure;
|
||||||
using Microsoft.Extensions.Logging.Abstractions;
|
using Microsoft.Extensions.Logging.Abstractions;
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user