refactor(worker/state): introduce TaskStateService and route mutations through it
Slice 2 of the worker state consolidation refactor (spec sections 2 and 8). Adds Worker/State/ITaskStateService + TaskStateService as the single component that mutates Status, PlanningPhase, and BlockedByTaskId. Each transition is one atomic ExecuteUpdate with a WHERE filter on the expected source status, so parallel claims are TOCTOU-free. Side effects (queue wake on -> Queued, hub TaskUpdated broadcast, chain advance + parent completion on terminal child) are owned by the service so callers no longer need to remember them. Migrated callers (mechanical, behavior preserved): - TaskRunner: HandleSuccess/HandleFailure/MarkFailed/RunAsync/ContinueAsync - StaleTaskRecovery: bulk recover stale Running tasks - TaskResetService: status flip (worktree cleanup stays in service) - PlanningSessionManager.StartAsync: status flip via state, token write via repo - PlanningChainCoordinator.OnChildFinishedAsync: routes the next-sibling write through state.UnblockAsync (Slice 4 finishes the rewrite) - ExternalMcpService.UpdateTaskStatus: Queued case via state.EnqueueAsync Repo Mark*Async helpers (MarkRunning/MarkDone/MarkFailed/FlipAllRunningToFailed) are now internal; ClaudeDo.Data grants InternalsVisibleTo to ClaudeDo.Worker and ClaudeDo.Worker.Tests for the existing repo-level tests. DI: TaskStateService is registered as Singleton in both the main app and the external-MCP app; the queue-wake delegate captures sp -> QueueService.WakeQueue to break the TaskStateService -> QueueService -> TaskRunner -> TaskStateService construction cycle. PlanningChainCoordinator takes Func<ITaskStateService> for the same reason; Slice 3 will replace both with IQueueWaker. Tests: TaskStateServiceTests covers happy + reject for every transition, the parallel StartRunningAsync claim race, child-terminal chain advancement, and stale recovery. Existing service/repo tests are updated to construct the new state-service via a TaskStateServiceBuilder helper. Pre-existing constructor drift in QueueService/ExternalMcp/PlanningHub tests is patched to keep the test project building (the surrounding test logic is otherwise untouched). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -14,4 +14,9 @@
|
|||||||
</PackageReference>
|
</PackageReference>
|
||||||
</ItemGroup>
|
</ItemGroup>
|
||||||
|
|
||||||
|
<ItemGroup>
|
||||||
|
<InternalsVisibleTo Include="ClaudeDo.Worker" />
|
||||||
|
<InternalsVisibleTo Include="ClaudeDo.Worker.Tests" />
|
||||||
|
</ItemGroup>
|
||||||
|
|
||||||
</Project>
|
</Project>
|
||||||
|
|||||||
@@ -88,7 +88,7 @@ public sealed class TaskRepository
|
|||||||
|
|
||||||
#region Status transitions
|
#region Status transitions
|
||||||
|
|
||||||
public async Task MarkRunningAsync(string taskId, DateTime startedAt, CancellationToken ct = default)
|
internal async Task MarkRunningAsync(string taskId, DateTime startedAt, CancellationToken ct = default)
|
||||||
{
|
{
|
||||||
await _context.Tasks
|
await _context.Tasks
|
||||||
.Where(t => t.Id == taskId)
|
.Where(t => t.Id == taskId)
|
||||||
@@ -97,7 +97,7 @@ public sealed class TaskRepository
|
|||||||
.SetProperty(t => t.StartedAt, startedAt), ct);
|
.SetProperty(t => t.StartedAt, startedAt), ct);
|
||||||
}
|
}
|
||||||
|
|
||||||
public async Task MarkDoneAsync(string taskId, DateTime finishedAt, string? result, CancellationToken ct = default)
|
internal async Task MarkDoneAsync(string taskId, DateTime finishedAt, string? result, CancellationToken ct = default)
|
||||||
{
|
{
|
||||||
await _context.Tasks
|
await _context.Tasks
|
||||||
.Where(t => t.Id == taskId)
|
.Where(t => t.Id == taskId)
|
||||||
@@ -107,7 +107,7 @@ public sealed class TaskRepository
|
|||||||
.SetProperty(t => t.Result, result), ct);
|
.SetProperty(t => t.Result, result), ct);
|
||||||
}
|
}
|
||||||
|
|
||||||
public async Task MarkFailedAsync(string taskId, DateTime finishedAt, string? result, CancellationToken ct = default)
|
internal async Task MarkFailedAsync(string taskId, DateTime finishedAt, string? result, CancellationToken ct = default)
|
||||||
{
|
{
|
||||||
await _context.Tasks
|
await _context.Tasks
|
||||||
.Where(t => t.Id == taskId)
|
.Where(t => t.Id == taskId)
|
||||||
@@ -124,7 +124,7 @@ public sealed class TaskRepository
|
|||||||
.ExecuteUpdateAsync(s => s.SetProperty(t => t.LogPath, logPath), ct);
|
.ExecuteUpdateAsync(s => s.SetProperty(t => t.LogPath, logPath), ct);
|
||||||
}
|
}
|
||||||
|
|
||||||
public async Task<int> FlipAllRunningToFailedAsync(string reason, CancellationToken ct = default)
|
internal async Task<int> FlipAllRunningToFailedAsync(string reason, CancellationToken ct = default)
|
||||||
{
|
{
|
||||||
var resultText = "[stale] " + reason;
|
var resultText = "[stale] " + reason;
|
||||||
var now = DateTime.UtcNow;
|
var now = DateTime.UtcNow;
|
||||||
@@ -364,6 +364,17 @@ public sealed class TaskRepository
|
|||||||
return await _context.Tasks.AsNoTracking().FirstOrDefaultAsync(t => t.Id == taskId, ct);
|
return await _context.Tasks.AsNoTracking().FirstOrDefaultAsync(t => t.Id == taskId, ct);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public async Task SetPlanningSessionTokenAsync(
|
||||||
|
string taskId,
|
||||||
|
string sessionToken,
|
||||||
|
CancellationToken ct = default)
|
||||||
|
{
|
||||||
|
await _context.Tasks
|
||||||
|
.Where(t => t.Id == taskId)
|
||||||
|
.ExecuteUpdateAsync(s => s
|
||||||
|
.SetProperty(t => t.PlanningSessionToken, sessionToken), ct);
|
||||||
|
}
|
||||||
|
|
||||||
public async Task UpdatePlanningSessionIdAsync(
|
public async Task UpdatePlanningSessionIdAsync(
|
||||||
string parentId,
|
string parentId,
|
||||||
string sessionId,
|
string sessionId,
|
||||||
|
|||||||
@@ -3,6 +3,7 @@ using ClaudeDo.Data.Models;
|
|||||||
using ClaudeDo.Data.Repositories;
|
using ClaudeDo.Data.Repositories;
|
||||||
using ClaudeDo.Worker.Hub;
|
using ClaudeDo.Worker.Hub;
|
||||||
using ClaudeDo.Worker.Services;
|
using ClaudeDo.Worker.Services;
|
||||||
|
using ClaudeDo.Worker.State;
|
||||||
using ModelContextProtocol.Server;
|
using ModelContextProtocol.Server;
|
||||||
using TaskStatus = ClaudeDo.Data.Models.TaskStatus;
|
using TaskStatus = ClaudeDo.Data.Models.TaskStatus;
|
||||||
|
|
||||||
@@ -32,19 +33,22 @@ public sealed class ExternalMcpService
|
|||||||
private readonly QueueService _queue;
|
private readonly QueueService _queue;
|
||||||
private readonly HubBroadcaster _broadcaster;
|
private readonly HubBroadcaster _broadcaster;
|
||||||
private readonly TagRepository _tags;
|
private readonly TagRepository _tags;
|
||||||
|
private readonly ITaskStateService _state;
|
||||||
|
|
||||||
public ExternalMcpService(
|
public ExternalMcpService(
|
||||||
TaskRepository tasks,
|
TaskRepository tasks,
|
||||||
ListRepository lists,
|
ListRepository lists,
|
||||||
QueueService queue,
|
QueueService queue,
|
||||||
HubBroadcaster broadcaster,
|
HubBroadcaster broadcaster,
|
||||||
TagRepository tags)
|
TagRepository tags,
|
||||||
|
ITaskStateService state)
|
||||||
{
|
{
|
||||||
_tasks = tasks;
|
_tasks = tasks;
|
||||||
_lists = lists;
|
_lists = lists;
|
||||||
_queue = queue;
|
_queue = queue;
|
||||||
_broadcaster = broadcaster;
|
_broadcaster = broadcaster;
|
||||||
_tags = tags;
|
_tags = tags;
|
||||||
|
_state = state;
|
||||||
}
|
}
|
||||||
|
|
||||||
[McpServerTool, Description("List all task lists available in ClaudeDo.")]
|
[McpServerTool, Description("List all task lists available in ClaudeDo.")]
|
||||||
@@ -173,14 +177,13 @@ public sealed class ExternalMcpService
|
|||||||
{
|
{
|
||||||
case TaskStatus.Manual:
|
case TaskStatus.Manual:
|
||||||
await _tasks.ResetToManualAsync(taskId, cancellationToken);
|
await _tasks.ResetToManualAsync(taskId, cancellationToken);
|
||||||
|
await _broadcaster.TaskUpdated(taskId);
|
||||||
break;
|
break;
|
||||||
|
|
||||||
case TaskStatus.Queued:
|
case TaskStatus.Queued:
|
||||||
if (task.Status is TaskStatus.Running)
|
var enqueueResult = await _state.EnqueueAsync(taskId, cancellationToken);
|
||||||
throw new InvalidOperationException("Cannot enqueue a running task.");
|
if (!enqueueResult.Ok)
|
||||||
task.Status = TaskStatus.Queued;
|
throw new InvalidOperationException(enqueueResult.Reason ?? "Cannot enqueue task.");
|
||||||
await _tasks.UpdateAsync(task, cancellationToken);
|
|
||||||
_queue.WakeQueue();
|
|
||||||
break;
|
break;
|
||||||
|
|
||||||
default:
|
default:
|
||||||
@@ -189,7 +192,6 @@ public sealed class ExternalMcpService
|
|||||||
}
|
}
|
||||||
|
|
||||||
var reload = (await _tasks.GetByIdAsync(taskId, cancellationToken))!;
|
var reload = (await _tasks.GetByIdAsync(taskId, cancellationToken))!;
|
||||||
await _broadcaster.TaskUpdated(taskId);
|
|
||||||
return ToDto(reload);
|
return ToDto(reload);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -1,5 +1,6 @@
|
|||||||
using ClaudeDo.Data;
|
using ClaudeDo.Data;
|
||||||
using ClaudeDo.Data.Models;
|
using ClaudeDo.Data.Models;
|
||||||
|
using ClaudeDo.Worker.State;
|
||||||
using Microsoft.EntityFrameworkCore;
|
using Microsoft.EntityFrameworkCore;
|
||||||
using TaskStatus = ClaudeDo.Data.Models.TaskStatus;
|
using TaskStatus = ClaudeDo.Data.Models.TaskStatus;
|
||||||
|
|
||||||
@@ -8,9 +9,15 @@ namespace ClaudeDo.Worker.Planning;
|
|||||||
public sealed class PlanningChainCoordinator
|
public sealed class PlanningChainCoordinator
|
||||||
{
|
{
|
||||||
private readonly IDbContextFactory<ClaudeDoDbContext> _dbFactory;
|
private readonly IDbContextFactory<ClaudeDoDbContext> _dbFactory;
|
||||||
|
private readonly Func<ITaskStateService> _state;
|
||||||
|
|
||||||
public PlanningChainCoordinator(IDbContextFactory<ClaudeDoDbContext> dbFactory)
|
public PlanningChainCoordinator(
|
||||||
=> _dbFactory = dbFactory;
|
IDbContextFactory<ClaudeDoDbContext> dbFactory,
|
||||||
|
Func<ITaskStateService> state)
|
||||||
|
{
|
||||||
|
_dbFactory = dbFactory;
|
||||||
|
_state = state;
|
||||||
|
}
|
||||||
|
|
||||||
public async Task QueueSubtasksSequentiallyAsync(string parentTaskId, CancellationToken ct = default)
|
public async Task QueueSubtasksSequentiallyAsync(string parentTaskId, CancellationToken ct = default)
|
||||||
{
|
{
|
||||||
@@ -56,6 +63,7 @@ public sealed class PlanningChainCoordinator
|
|||||||
if (child?.ParentTaskId is null) return null;
|
if (child?.ParentTaskId is null) return null;
|
||||||
|
|
||||||
var next = await ctx.Tasks
|
var next = await ctx.Tasks
|
||||||
|
.AsNoTracking()
|
||||||
.Where(t => t.ParentTaskId == child.ParentTaskId
|
.Where(t => t.ParentTaskId == child.ParentTaskId
|
||||||
&& t.SortOrder > child.SortOrder
|
&& t.SortOrder > child.SortOrder
|
||||||
&& t.Status == TaskStatus.Waiting)
|
&& t.Status == TaskStatus.Waiting)
|
||||||
@@ -63,8 +71,7 @@ public sealed class PlanningChainCoordinator
|
|||||||
.FirstOrDefaultAsync(ct);
|
.FirstOrDefaultAsync(ct);
|
||||||
if (next is null) return null;
|
if (next is null) return null;
|
||||||
|
|
||||||
next.Status = TaskStatus.Queued;
|
await _state().UnblockAsync(next.Id, ct);
|
||||||
await ctx.SaveChangesAsync(ct);
|
|
||||||
return next.Id;
|
return next.Id;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -6,6 +6,7 @@ using ClaudeDo.Data.Git;
|
|||||||
using ClaudeDo.Data.Models;
|
using ClaudeDo.Data.Models;
|
||||||
using ClaudeDo.Data.Repositories;
|
using ClaudeDo.Data.Repositories;
|
||||||
using ClaudeDo.Worker.Config;
|
using ClaudeDo.Worker.Config;
|
||||||
|
using ClaudeDo.Worker.State;
|
||||||
using Microsoft.EntityFrameworkCore;
|
using Microsoft.EntityFrameworkCore;
|
||||||
using TaskStatus = ClaudeDo.Data.Models.TaskStatus;
|
using TaskStatus = ClaudeDo.Data.Models.TaskStatus;
|
||||||
|
|
||||||
@@ -22,17 +23,20 @@ public sealed class PlanningSessionManager
|
|||||||
private readonly GitService _git;
|
private readonly GitService _git;
|
||||||
private readonly WorkerConfig _cfg;
|
private readonly WorkerConfig _cfg;
|
||||||
private readonly string _rootDirectory;
|
private readonly string _rootDirectory;
|
||||||
|
private readonly ITaskStateService? _state;
|
||||||
|
|
||||||
// DI constructor.
|
// DI constructor.
|
||||||
public PlanningSessionManager(
|
public PlanningSessionManager(
|
||||||
IDbContextFactory<ClaudeDoDbContext> factory,
|
IDbContextFactory<ClaudeDoDbContext> factory,
|
||||||
GitService git,
|
GitService git,
|
||||||
WorkerConfig cfg,
|
WorkerConfig cfg,
|
||||||
|
ITaskStateService state,
|
||||||
string rootDirectory)
|
string rootDirectory)
|
||||||
{
|
{
|
||||||
_factory = factory;
|
_factory = factory;
|
||||||
_git = git;
|
_git = git;
|
||||||
_cfg = cfg;
|
_cfg = cfg;
|
||||||
|
_state = state;
|
||||||
_rootDirectory = rootDirectory;
|
_rootDirectory = rootDirectory;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -43,13 +47,15 @@ public sealed class PlanningSessionManager
|
|||||||
AppSettingsRepository settings,
|
AppSettingsRepository settings,
|
||||||
GitService git,
|
GitService git,
|
||||||
WorkerConfig cfg,
|
WorkerConfig cfg,
|
||||||
string rootDirectory)
|
string rootDirectory,
|
||||||
|
ITaskStateService? state = null)
|
||||||
{
|
{
|
||||||
_tasksOverride = tasks;
|
_tasksOverride = tasks;
|
||||||
_listsOverride = lists;
|
_listsOverride = lists;
|
||||||
_settingsOverride = settings;
|
_settingsOverride = settings;
|
||||||
_git = git;
|
_git = git;
|
||||||
_cfg = cfg;
|
_cfg = cfg;
|
||||||
|
_state = state;
|
||||||
_rootDirectory = rootDirectory;
|
_rootDirectory = rootDirectory;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -114,8 +120,19 @@ public sealed class PlanningSessionManager
|
|||||||
|
|
||||||
// Session dir + token + prompt files.
|
// Session dir + token + prompt files.
|
||||||
var token = GenerateToken();
|
var token = GenerateToken();
|
||||||
var started = await tasks.SetPlanningStartedAsync(taskId, token, ct)
|
if (_state is not null)
|
||||||
?? throw new InvalidOperationException("Failed to transition task to Planning.");
|
{
|
||||||
|
var startResult = await _state.StartPlanningAsync(taskId, ct);
|
||||||
|
if (!startResult.Ok)
|
||||||
|
throw new InvalidOperationException(startResult.Reason ?? "Failed to transition task to Planning.");
|
||||||
|
await tasks.SetPlanningSessionTokenAsync(taskId, token, ct);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
// Test fallback when no state-service is provided.
|
||||||
|
if (await tasks.SetPlanningStartedAsync(taskId, token, ct) is null)
|
||||||
|
throw new InvalidOperationException("Failed to transition task to Planning.");
|
||||||
|
}
|
||||||
|
|
||||||
var sessionDir = Path.Combine(_rootDirectory, taskId);
|
var sessionDir = Path.Combine(_rootDirectory, taskId);
|
||||||
Directory.CreateDirectory(sessionDir);
|
Directory.CreateDirectory(sessionDir);
|
||||||
|
|||||||
@@ -7,6 +7,7 @@ using ClaudeDo.Worker.Hub;
|
|||||||
using ClaudeDo.Worker.Planning;
|
using ClaudeDo.Worker.Planning;
|
||||||
using ClaudeDo.Worker.Runner;
|
using ClaudeDo.Worker.Runner;
|
||||||
using ClaudeDo.Worker.Services;
|
using ClaudeDo.Worker.Services;
|
||||||
|
using ClaudeDo.Worker.State;
|
||||||
using Microsoft.EntityFrameworkCore;
|
using Microsoft.EntityFrameworkCore;
|
||||||
|
|
||||||
var cfg = WorkerConfig.Load();
|
var cfg = WorkerConfig.Load();
|
||||||
@@ -41,6 +42,17 @@ builder.Services.AddSingleton<PlanningAggregator>();
|
|||||||
builder.Services.AddSingleton<PlanningMergeOrchestrator>();
|
builder.Services.AddSingleton<PlanningMergeOrchestrator>();
|
||||||
builder.Services.AddSingleton<PlanningChainCoordinator>();
|
builder.Services.AddSingleton<PlanningChainCoordinator>();
|
||||||
|
|
||||||
|
// Centralized status mutation. Use a delegate for WakeQueue to break the
|
||||||
|
// TaskStateService → QueueService → TaskRunner → TaskStateService DI cycle;
|
||||||
|
// Slice 3 will replace this with IQueueWaker.
|
||||||
|
builder.Services.AddSingleton<Func<ITaskStateService>>(sp => () => sp.GetRequiredService<ITaskStateService>());
|
||||||
|
builder.Services.AddSingleton<ITaskStateService>(sp => new TaskStateService(
|
||||||
|
sp.GetRequiredService<IDbContextFactory<ClaudeDoDbContext>>(),
|
||||||
|
sp.GetRequiredService<HubBroadcaster>(),
|
||||||
|
() => sp.GetRequiredService<QueueService>().WakeQueue(),
|
||||||
|
sp.GetRequiredService<PlanningChainCoordinator>(),
|
||||||
|
sp.GetRequiredService<ILogger<TaskStateService>>()));
|
||||||
|
|
||||||
// Agent file management.
|
// Agent file management.
|
||||||
var agentsDir = Path.Combine(ClaudeDo.Data.Paths.AppDataRoot(), "agents");
|
var agentsDir = Path.Combine(ClaudeDo.Data.Paths.AppDataRoot(), "agents");
|
||||||
Directory.CreateDirectory(agentsDir);
|
Directory.CreateDirectory(agentsDir);
|
||||||
@@ -65,6 +77,7 @@ builder.Services.AddSingleton(sp =>
|
|||||||
sp.GetRequiredService<IDbContextFactory<ClaudeDoDbContext>>(),
|
sp.GetRequiredService<IDbContextFactory<ClaudeDoDbContext>>(),
|
||||||
sp.GetRequiredService<GitService>(),
|
sp.GetRequiredService<GitService>(),
|
||||||
cfg,
|
cfg,
|
||||||
|
sp.GetRequiredService<ITaskStateService>(),
|
||||||
planningSessionsDir));
|
planningSessionsDir));
|
||||||
builder.Services.AddSingleton<IPlanningTerminalLauncher>(sp =>
|
builder.Services.AddSingleton<IPlanningTerminalLauncher>(sp =>
|
||||||
new WindowsTerminalPlanningLauncher("wt.exe", cfg.ClaudeBin));
|
new WindowsTerminalPlanningLauncher("wt.exe", cfg.ClaudeBin));
|
||||||
@@ -123,6 +136,7 @@ if (cfg.ExternalMcpPort > 0)
|
|||||||
externalBuilder.Services.AddSingleton(app.Services.GetRequiredService<HubBroadcaster>());
|
externalBuilder.Services.AddSingleton(app.Services.GetRequiredService<HubBroadcaster>());
|
||||||
externalBuilder.Services.AddSingleton(app.Services.GetRequiredService<QueueService>());
|
externalBuilder.Services.AddSingleton(app.Services.GetRequiredService<QueueService>());
|
||||||
externalBuilder.Services.AddSingleton(app.Services.GetRequiredService<IDbContextFactory<ClaudeDoDbContext>>());
|
externalBuilder.Services.AddSingleton(app.Services.GetRequiredService<IDbContextFactory<ClaudeDoDbContext>>());
|
||||||
|
externalBuilder.Services.AddSingleton(app.Services.GetRequiredService<ITaskStateService>());
|
||||||
externalBuilder.Services.AddScoped<ClaudeDoDbContext>(sp =>
|
externalBuilder.Services.AddScoped<ClaudeDoDbContext>(sp =>
|
||||||
sp.GetRequiredService<IDbContextFactory<ClaudeDoDbContext>>().CreateDbContext());
|
sp.GetRequiredService<IDbContextFactory<ClaudeDoDbContext>>().CreateDbContext());
|
||||||
externalBuilder.Services.AddScoped<TaskRepository>();
|
externalBuilder.Services.AddScoped<TaskRepository>();
|
||||||
|
|||||||
@@ -3,7 +3,7 @@ using ClaudeDo.Data.Models;
|
|||||||
using ClaudeDo.Data.Repositories;
|
using ClaudeDo.Data.Repositories;
|
||||||
using ClaudeDo.Worker.Config;
|
using ClaudeDo.Worker.Config;
|
||||||
using ClaudeDo.Worker.Hub;
|
using ClaudeDo.Worker.Hub;
|
||||||
using ClaudeDo.Worker.Planning;
|
using ClaudeDo.Worker.State;
|
||||||
using Microsoft.EntityFrameworkCore;
|
using Microsoft.EntityFrameworkCore;
|
||||||
using TaskStatus = ClaudeDo.Data.Models.TaskStatus;
|
using TaskStatus = ClaudeDo.Data.Models.TaskStatus;
|
||||||
|
|
||||||
@@ -18,7 +18,7 @@ public sealed class TaskRunner
|
|||||||
private readonly ClaudeArgsBuilder _argsBuilder;
|
private readonly ClaudeArgsBuilder _argsBuilder;
|
||||||
private readonly WorkerConfig _cfg;
|
private readonly WorkerConfig _cfg;
|
||||||
private readonly ILogger<TaskRunner> _logger;
|
private readonly ILogger<TaskRunner> _logger;
|
||||||
private readonly PlanningChainCoordinator _chain;
|
private readonly ITaskStateService _state;
|
||||||
|
|
||||||
public TaskRunner(
|
public TaskRunner(
|
||||||
IClaudeProcess claude,
|
IClaudeProcess claude,
|
||||||
@@ -28,7 +28,7 @@ public sealed class TaskRunner
|
|||||||
ClaudeArgsBuilder argsBuilder,
|
ClaudeArgsBuilder argsBuilder,
|
||||||
WorkerConfig cfg,
|
WorkerConfig cfg,
|
||||||
ILogger<TaskRunner> logger,
|
ILogger<TaskRunner> logger,
|
||||||
PlanningChainCoordinator chain)
|
ITaskStateService state)
|
||||||
{
|
{
|
||||||
_claude = claude;
|
_claude = claude;
|
||||||
_dbFactory = dbFactory;
|
_dbFactory = dbFactory;
|
||||||
@@ -37,7 +37,7 @@ public sealed class TaskRunner
|
|||||||
_argsBuilder = argsBuilder;
|
_argsBuilder = argsBuilder;
|
||||||
_cfg = cfg;
|
_cfg = cfg;
|
||||||
_logger = logger;
|
_logger = logger;
|
||||||
_chain = chain;
|
_state = state;
|
||||||
}
|
}
|
||||||
|
|
||||||
public async Task RunAsync(TaskEntity task, string slot, CancellationToken ct)
|
public async Task RunAsync(TaskEntity task, string slot, CancellationToken ct)
|
||||||
@@ -91,11 +91,7 @@ public sealed class TaskRunner
|
|||||||
var resolvedConfig = await ResolveConfigAsync(task, listConfig, null, ct);
|
var resolvedConfig = await ResolveConfigAsync(task, listConfig, null, ct);
|
||||||
|
|
||||||
var now = DateTime.UtcNow;
|
var now = DateTime.UtcNow;
|
||||||
using (var context = _dbFactory.CreateDbContext())
|
await _state.StartRunningAsync(task.Id, now, ct);
|
||||||
{
|
|
||||||
var taskRepo = new TaskRepository(context);
|
|
||||||
await taskRepo.MarkRunningAsync(task.Id, now, ct);
|
|
||||||
}
|
|
||||||
await _broadcaster.TaskStarted(slot, task.Id, now);
|
await _broadcaster.TaskStarted(slot, task.Id, now);
|
||||||
|
|
||||||
// Build prompt.
|
// Build prompt.
|
||||||
@@ -202,11 +198,7 @@ public sealed class TaskRunner
|
|||||||
}
|
}
|
||||||
|
|
||||||
var now = DateTime.UtcNow;
|
var now = DateTime.UtcNow;
|
||||||
using (var context = _dbFactory.CreateDbContext())
|
await _state.StartRunningAsync(taskId, now, ct);
|
||||||
{
|
|
||||||
var taskRepo = new TaskRepository(context);
|
|
||||||
await taskRepo.MarkRunningAsync(taskId, now, ct);
|
|
||||||
}
|
|
||||||
await _broadcaster.TaskStarted(slot, taskId, now);
|
await _broadcaster.TaskStarted(slot, taskId, now);
|
||||||
|
|
||||||
var nextRunNumber = lastRun.RunNumber + 1;
|
var nextRunNumber = lastRun.RunNumber + 1;
|
||||||
@@ -332,34 +324,11 @@ public sealed class TaskRunner
|
|||||||
// is never left as 'running' because of a cancel that arrived
|
// is never left as 'running' because of a cancel that arrived
|
||||||
// after the Claude run already succeeded.
|
// after the Claude run already succeeded.
|
||||||
var finishedAt = DateTime.UtcNow;
|
var finishedAt = DateTime.UtcNow;
|
||||||
using (var context = _dbFactory.CreateDbContext())
|
await _state.CompleteAsync(task.Id, finishedAt, result.ResultMarkdown, CancellationToken.None);
|
||||||
{
|
|
||||||
var taskRepo = new TaskRepository(context);
|
|
||||||
await taskRepo.MarkDoneAsync(task.Id, finishedAt, result.ResultMarkdown, CancellationToken.None);
|
|
||||||
if (task.ParentTaskId is not null)
|
|
||||||
await taskRepo.TryCompleteParentAsync(task.ParentTaskId, CancellationToken.None);
|
|
||||||
}
|
|
||||||
await _broadcaster.WorkerLog($"Finished \"{task.Title}\" (done)", WorkerLogLevel.Success, DateTime.UtcNow);
|
await _broadcaster.WorkerLog($"Finished \"{task.Title}\" (done)", WorkerLogLevel.Success, DateTime.UtcNow);
|
||||||
await _broadcaster.TaskFinished(slot, task.Id, "done", finishedAt);
|
await _broadcaster.TaskFinished(slot, task.Id, "done", finishedAt);
|
||||||
_logger.LogInformation("Task {TaskId} completed (turns={Turns}, tokens_in={In}, tokens_out={Out})",
|
_logger.LogInformation("Task {TaskId} completed (turns={Turns}, tokens_in={In}, tokens_out={Out})",
|
||||||
task.Id, result.TurnCount, result.TokensIn, result.TokensOut);
|
task.Id, result.TurnCount, result.TokensIn, result.TokensOut);
|
||||||
|
|
||||||
// Sequential planning chain: if this task has a parent, flip the next
|
|
||||||
// Waiting sibling to Queued so the queue pickup loop dispatches it next.
|
|
||||||
if (task.ParentTaskId is not null)
|
|
||||||
{
|
|
||||||
try
|
|
||||||
{
|
|
||||||
var advanced = await _chain.OnChildFinishedAsync(
|
|
||||||
task.Id, TaskStatus.Done, CancellationToken.None);
|
|
||||||
if (advanced is not null)
|
|
||||||
await _broadcaster.TaskUpdated(advanced);
|
|
||||||
}
|
|
||||||
catch (Exception ex)
|
|
||||||
{
|
|
||||||
_logger.LogWarning(ex, "PlanningChain advance failed for {TaskId}", task.Id);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private async Task HandleFailure(string taskId, string taskTitle, string slot, RunResult result)
|
private async Task HandleFailure(string taskId, string taskTitle, string slot, RunResult result)
|
||||||
@@ -367,12 +336,7 @@ public sealed class TaskRunner
|
|||||||
// Intentionally does not accept a CancellationToken: this is the
|
// Intentionally does not accept a CancellationToken: this is the
|
||||||
// terminal write for a failed task and must always be persisted.
|
// terminal write for a failed task and must always be persisted.
|
||||||
var finishedAt = DateTime.UtcNow;
|
var finishedAt = DateTime.UtcNow;
|
||||||
using var context = _dbFactory.CreateDbContext();
|
await _state.FailAsync(taskId, finishedAt, result.ErrorMarkdown, CancellationToken.None);
|
||||||
var taskRepo = new TaskRepository(context);
|
|
||||||
await taskRepo.MarkFailedAsync(taskId, finishedAt, result.ErrorMarkdown, CancellationToken.None);
|
|
||||||
var justFailed = await taskRepo.GetByIdAsync(taskId, CancellationToken.None);
|
|
||||||
if (justFailed?.ParentTaskId is not null)
|
|
||||||
await taskRepo.TryCompleteParentAsync(justFailed.ParentTaskId, CancellationToken.None);
|
|
||||||
await _broadcaster.WorkerLog($"Finished \"{taskTitle}\" (failed)", WorkerLogLevel.Error, DateTime.UtcNow);
|
await _broadcaster.WorkerLog($"Finished \"{taskTitle}\" (failed)", WorkerLogLevel.Error, DateTime.UtcNow);
|
||||||
await _broadcaster.TaskFinished(slot, taskId, "failed", finishedAt);
|
await _broadcaster.TaskFinished(slot, taskId, "failed", finishedAt);
|
||||||
_logger.LogWarning("Task {TaskId} failed (turns={Turns}): {Error}", taskId, result.TurnCount, result.ErrorMarkdown);
|
_logger.LogWarning("Task {TaskId} failed (turns={Turns}): {Error}", taskId, result.TurnCount, result.ErrorMarkdown);
|
||||||
@@ -384,15 +348,9 @@ public sealed class TaskRunner
|
|||||||
{
|
{
|
||||||
var now = DateTime.UtcNow;
|
var now = DateTime.UtcNow;
|
||||||
// Terminal write — never cancel.
|
// Terminal write — never cancel.
|
||||||
using var context = _dbFactory.CreateDbContext();
|
await _state.FailAsync(taskId, now, error, CancellationToken.None);
|
||||||
var taskRepo = new TaskRepository(context);
|
|
||||||
await taskRepo.MarkFailedAsync(taskId, now, error, CancellationToken.None);
|
|
||||||
var justFailed = await taskRepo.GetByIdAsync(taskId, CancellationToken.None);
|
|
||||||
if (justFailed?.ParentTaskId is not null)
|
|
||||||
await taskRepo.TryCompleteParentAsync(justFailed.ParentTaskId, CancellationToken.None);
|
|
||||||
await _broadcaster.WorkerLog($"Finished \"{taskTitle}\" (failed)", WorkerLogLevel.Error, DateTime.UtcNow);
|
await _broadcaster.WorkerLog($"Finished \"{taskTitle}\" (failed)", WorkerLogLevel.Error, DateTime.UtcNow);
|
||||||
await _broadcaster.TaskFinished(slot, taskId, "failed", now);
|
await _broadcaster.TaskFinished(slot, taskId, "failed", now);
|
||||||
await _broadcaster.TaskUpdated(taskId);
|
|
||||||
}
|
}
|
||||||
catch (Exception ex)
|
catch (Exception ex)
|
||||||
{
|
{
|
||||||
|
|||||||
@@ -1,25 +1,21 @@
|
|||||||
using ClaudeDo.Data;
|
using ClaudeDo.Worker.State;
|
||||||
using ClaudeDo.Data.Repositories;
|
|
||||||
using Microsoft.EntityFrameworkCore;
|
|
||||||
|
|
||||||
namespace ClaudeDo.Worker.Services;
|
namespace ClaudeDo.Worker.Services;
|
||||||
|
|
||||||
public sealed class StaleTaskRecovery : IHostedService
|
public sealed class StaleTaskRecovery : IHostedService
|
||||||
{
|
{
|
||||||
private readonly IDbContextFactory<ClaudeDoDbContext> _dbFactory;
|
private readonly ITaskStateService _state;
|
||||||
private readonly ILogger<StaleTaskRecovery> _logger;
|
private readonly ILogger<StaleTaskRecovery> _logger;
|
||||||
|
|
||||||
public StaleTaskRecovery(IDbContextFactory<ClaudeDoDbContext> dbFactory, ILogger<StaleTaskRecovery> logger)
|
public StaleTaskRecovery(ITaskStateService state, ILogger<StaleTaskRecovery> logger)
|
||||||
{
|
{
|
||||||
_dbFactory = dbFactory;
|
_state = state;
|
||||||
_logger = logger;
|
_logger = logger;
|
||||||
}
|
}
|
||||||
|
|
||||||
public async Task StartAsync(CancellationToken cancellationToken)
|
public async Task StartAsync(CancellationToken cancellationToken)
|
||||||
{
|
{
|
||||||
using var context = _dbFactory.CreateDbContext();
|
var flipped = await _state.RecoverStaleRunningAsync("worker restart", cancellationToken);
|
||||||
var tasks = new TaskRepository(context);
|
|
||||||
var flipped = await tasks.FlipAllRunningToFailedAsync("worker restart", cancellationToken);
|
|
||||||
if (flipped > 0)
|
if (flipped > 0)
|
||||||
_logger.LogWarning("Stale task recovery: flipped {Count} running task(s) to failed", flipped);
|
_logger.LogWarning("Stale task recovery: flipped {Count} running task(s) to failed", flipped);
|
||||||
else
|
else
|
||||||
|
|||||||
@@ -3,6 +3,7 @@ using ClaudeDo.Data.Models;
|
|||||||
using ClaudeDo.Data.Repositories;
|
using ClaudeDo.Data.Repositories;
|
||||||
using ClaudeDo.Worker.Hub;
|
using ClaudeDo.Worker.Hub;
|
||||||
using ClaudeDo.Worker.Runner;
|
using ClaudeDo.Worker.Runner;
|
||||||
|
using ClaudeDo.Worker.State;
|
||||||
using Microsoft.EntityFrameworkCore;
|
using Microsoft.EntityFrameworkCore;
|
||||||
using TaskStatus = ClaudeDo.Data.Models.TaskStatus;
|
using TaskStatus = ClaudeDo.Data.Models.TaskStatus;
|
||||||
|
|
||||||
@@ -13,17 +14,20 @@ public sealed class TaskResetService
|
|||||||
private readonly IDbContextFactory<ClaudeDoDbContext> _dbFactory;
|
private readonly IDbContextFactory<ClaudeDoDbContext> _dbFactory;
|
||||||
private readonly WorktreeManager _wtManager;
|
private readonly WorktreeManager _wtManager;
|
||||||
private readonly HubBroadcaster _broadcaster;
|
private readonly HubBroadcaster _broadcaster;
|
||||||
|
private readonly ITaskStateService _state;
|
||||||
private readonly ILogger<TaskResetService> _logger;
|
private readonly ILogger<TaskResetService> _logger;
|
||||||
|
|
||||||
public TaskResetService(
|
public TaskResetService(
|
||||||
IDbContextFactory<ClaudeDoDbContext> dbFactory,
|
IDbContextFactory<ClaudeDoDbContext> dbFactory,
|
||||||
WorktreeManager wtManager,
|
WorktreeManager wtManager,
|
||||||
HubBroadcaster broadcaster,
|
HubBroadcaster broadcaster,
|
||||||
|
ITaskStateService state,
|
||||||
ILogger<TaskResetService> logger)
|
ILogger<TaskResetService> logger)
|
||||||
{
|
{
|
||||||
_dbFactory = dbFactory;
|
_dbFactory = dbFactory;
|
||||||
_wtManager = wtManager;
|
_wtManager = wtManager;
|
||||||
_broadcaster = broadcaster;
|
_broadcaster = broadcaster;
|
||||||
|
_state = state;
|
||||||
_logger = logger;
|
_logger = logger;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -55,16 +59,13 @@ public sealed class TaskResetService
|
|||||||
worktreeChanged = true;
|
worktreeChanged = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
using (var ctx = _dbFactory.CreateDbContext())
|
await _state.ResetToIdleAsync(taskId, ct);
|
||||||
{
|
|
||||||
await new TaskRepository(ctx).ResetToManualAsync(taskId, ct);
|
|
||||||
}
|
|
||||||
|
|
||||||
await _broadcaster.TaskUpdated(taskId);
|
await _broadcaster.TaskUpdated(taskId);
|
||||||
if (worktreeChanged)
|
if (worktreeChanged)
|
||||||
await _broadcaster.WorktreeUpdated(taskId);
|
await _broadcaster.WorktreeUpdated(taskId);
|
||||||
|
|
||||||
_logger.LogInformation("Reset task {TaskId} to Manual (worktree discarded: {Discarded})", taskId, worktreeChanged);
|
_logger.LogInformation("Reset task {TaskId} to Idle (worktree discarded: {Discarded})", taskId, worktreeChanged);
|
||||||
await _broadcaster.WorkerLog($"Reset \"{task.Title}\"", WorkerLogLevel.Warn, DateTime.UtcNow);
|
await _broadcaster.WorkerLog($"Reset \"{task.Title}\"", WorkerLogLevel.Warn, DateTime.UtcNow);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
19
src/ClaudeDo.Worker/State/ITaskStateService.cs
Normal file
19
src/ClaudeDo.Worker/State/ITaskStateService.cs
Normal file
@@ -0,0 +1,19 @@
|
|||||||
|
namespace ClaudeDo.Worker.State;
|
||||||
|
|
||||||
|
public interface ITaskStateService
|
||||||
|
{
|
||||||
|
Task<TransitionResult> EnqueueAsync(string taskId, CancellationToken ct);
|
||||||
|
Task<TransitionResult> StartRunningAsync(string taskId, DateTime startedAt, CancellationToken ct);
|
||||||
|
Task<TransitionResult> CompleteAsync(string taskId, DateTime finishedAt, string? result, CancellationToken ct);
|
||||||
|
Task<TransitionResult> FailAsync(string taskId, DateTime finishedAt, string? error, CancellationToken ct);
|
||||||
|
Task<TransitionResult> CancelAsync(string taskId, DateTime finishedAt, CancellationToken ct);
|
||||||
|
Task<TransitionResult> ResetToIdleAsync(string taskId, CancellationToken ct);
|
||||||
|
|
||||||
|
Task<TransitionResult> StartPlanningAsync(string parentId, CancellationToken ct);
|
||||||
|
Task<TransitionResult> FinalizePlanningAsync(string parentId, CancellationToken ct);
|
||||||
|
|
||||||
|
Task<TransitionResult> BlockOnAsync(string taskId, string predecessorTaskId, CancellationToken ct);
|
||||||
|
Task<TransitionResult> UnblockAsync(string taskId, CancellationToken ct);
|
||||||
|
|
||||||
|
Task<int> RecoverStaleRunningAsync(string reason, CancellationToken ct);
|
||||||
|
}
|
||||||
258
src/ClaudeDo.Worker/State/TaskStateService.cs
Normal file
258
src/ClaudeDo.Worker/State/TaskStateService.cs
Normal file
@@ -0,0 +1,258 @@
|
|||||||
|
using ClaudeDo.Data;
|
||||||
|
using ClaudeDo.Data.Models;
|
||||||
|
using ClaudeDo.Data.Repositories;
|
||||||
|
using ClaudeDo.Worker.Hub;
|
||||||
|
using ClaudeDo.Worker.Planning;
|
||||||
|
using Microsoft.EntityFrameworkCore;
|
||||||
|
using TaskStatus = ClaudeDo.Data.Models.TaskStatus;
|
||||||
|
|
||||||
|
namespace ClaudeDo.Worker.State;
|
||||||
|
|
||||||
|
public sealed class TaskStateService : ITaskStateService
|
||||||
|
{
|
||||||
|
private readonly IDbContextFactory<ClaudeDoDbContext> _dbFactory;
|
||||||
|
private readonly HubBroadcaster _broadcaster;
|
||||||
|
private readonly Action _wakeQueue;
|
||||||
|
private readonly PlanningChainCoordinator _chain;
|
||||||
|
private readonly ILogger<TaskStateService> _logger;
|
||||||
|
|
||||||
|
public TaskStateService(
|
||||||
|
IDbContextFactory<ClaudeDoDbContext> dbFactory,
|
||||||
|
HubBroadcaster broadcaster,
|
||||||
|
Action wakeQueue,
|
||||||
|
PlanningChainCoordinator chain,
|
||||||
|
ILogger<TaskStateService> logger)
|
||||||
|
{
|
||||||
|
_dbFactory = dbFactory;
|
||||||
|
_broadcaster = broadcaster;
|
||||||
|
_wakeQueue = wakeQueue;
|
||||||
|
_chain = chain;
|
||||||
|
_logger = logger;
|
||||||
|
}
|
||||||
|
|
||||||
|
public async Task<TransitionResult> EnqueueAsync(string taskId, CancellationToken ct)
|
||||||
|
{
|
||||||
|
await using var ctx = await _dbFactory.CreateDbContextAsync(ct);
|
||||||
|
var affected = await ctx.Tasks
|
||||||
|
.Where(t => t.Id == taskId && t.Status != TaskStatus.Running)
|
||||||
|
.ExecuteUpdateAsync(s => s.SetProperty(t => t.Status, TaskStatus.Queued), ct);
|
||||||
|
|
||||||
|
if (affected == 0)
|
||||||
|
return new TransitionResult(false, "Task not found or already running.");
|
||||||
|
|
||||||
|
_wakeQueue();
|
||||||
|
await _broadcaster.TaskUpdated(taskId);
|
||||||
|
return new TransitionResult(true, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
public async Task<TransitionResult> StartRunningAsync(string taskId, DateTime startedAt, CancellationToken ct)
|
||||||
|
{
|
||||||
|
await using var ctx = await _dbFactory.CreateDbContextAsync(ct);
|
||||||
|
var affected = await ctx.Tasks
|
||||||
|
.Where(t => t.Id == taskId && t.Status != TaskStatus.Running)
|
||||||
|
.ExecuteUpdateAsync(s => s
|
||||||
|
.SetProperty(t => t.Status, TaskStatus.Running)
|
||||||
|
.SetProperty(t => t.StartedAt, startedAt), ct);
|
||||||
|
|
||||||
|
if (affected == 0)
|
||||||
|
return new TransitionResult(false, "Task already running or not found.");
|
||||||
|
|
||||||
|
await _broadcaster.TaskUpdated(taskId);
|
||||||
|
return new TransitionResult(true, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
public async Task<TransitionResult> CompleteAsync(string taskId, DateTime finishedAt, string? result, CancellationToken ct)
|
||||||
|
{
|
||||||
|
await using (var ctx = await _dbFactory.CreateDbContextAsync(ct))
|
||||||
|
{
|
||||||
|
var affected = await ctx.Tasks
|
||||||
|
.Where(t => t.Id == taskId && t.Status == TaskStatus.Running)
|
||||||
|
.ExecuteUpdateAsync(s => s
|
||||||
|
.SetProperty(t => t.Status, TaskStatus.Done)
|
||||||
|
.SetProperty(t => t.FinishedAt, finishedAt)
|
||||||
|
.SetProperty(t => t.Result, result), ct);
|
||||||
|
|
||||||
|
if (affected == 0)
|
||||||
|
return new TransitionResult(false, "Task not running; cannot complete.");
|
||||||
|
}
|
||||||
|
|
||||||
|
await OnChildTerminalAsync(taskId, TaskStatus.Done);
|
||||||
|
await _broadcaster.TaskUpdated(taskId);
|
||||||
|
return new TransitionResult(true, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
public async Task<TransitionResult> FailAsync(string taskId, DateTime finishedAt, string? error, CancellationToken ct)
|
||||||
|
{
|
||||||
|
await using (var ctx = await _dbFactory.CreateDbContextAsync(ct))
|
||||||
|
{
|
||||||
|
var affected = await ctx.Tasks
|
||||||
|
.Where(t => t.Id == taskId && t.Status != TaskStatus.Done)
|
||||||
|
.ExecuteUpdateAsync(s => s
|
||||||
|
.SetProperty(t => t.Status, TaskStatus.Failed)
|
||||||
|
.SetProperty(t => t.FinishedAt, finishedAt)
|
||||||
|
.SetProperty(t => t.Result, error), ct);
|
||||||
|
|
||||||
|
if (affected == 0)
|
||||||
|
return new TransitionResult(false, "Task already done; cannot fail.");
|
||||||
|
}
|
||||||
|
|
||||||
|
await OnChildTerminalAsync(taskId, TaskStatus.Failed);
|
||||||
|
await _broadcaster.TaskUpdated(taskId);
|
||||||
|
return new TransitionResult(true, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
public async Task<TransitionResult> CancelAsync(string taskId, DateTime finishedAt, CancellationToken ct)
|
||||||
|
{
|
||||||
|
await using (var ctx = await _dbFactory.CreateDbContextAsync(ct))
|
||||||
|
{
|
||||||
|
var affected = await ctx.Tasks
|
||||||
|
.Where(t => t.Id == taskId &&
|
||||||
|
(t.Status == TaskStatus.Running || t.Status == TaskStatus.Queued))
|
||||||
|
.ExecuteUpdateAsync(s => s
|
||||||
|
.SetProperty(t => t.Status, TaskStatus.Cancelled)
|
||||||
|
.SetProperty(t => t.FinishedAt, finishedAt), ct);
|
||||||
|
|
||||||
|
if (affected == 0)
|
||||||
|
return new TransitionResult(false, "Task not in cancellable state.");
|
||||||
|
}
|
||||||
|
|
||||||
|
await OnChildTerminalAsync(taskId, TaskStatus.Cancelled);
|
||||||
|
await _broadcaster.TaskUpdated(taskId);
|
||||||
|
return new TransitionResult(true, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
public async Task<TransitionResult> ResetToIdleAsync(string taskId, CancellationToken ct)
|
||||||
|
{
|
||||||
|
await using var ctx = await _dbFactory.CreateDbContextAsync(ct);
|
||||||
|
var affected = await ctx.Tasks
|
||||||
|
.Where(t => t.Id == taskId && t.Status != TaskStatus.Running)
|
||||||
|
.ExecuteUpdateAsync(s => s
|
||||||
|
.SetProperty(t => t.Status, TaskStatus.Idle)
|
||||||
|
.SetProperty(t => t.StartedAt, (DateTime?)null)
|
||||||
|
.SetProperty(t => t.FinishedAt, (DateTime?)null)
|
||||||
|
.SetProperty(t => t.Result, (string?)null), ct);
|
||||||
|
|
||||||
|
if (affected == 0)
|
||||||
|
return new TransitionResult(false, "Task is running; cannot reset.");
|
||||||
|
|
||||||
|
await _broadcaster.TaskUpdated(taskId);
|
||||||
|
return new TransitionResult(true, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
public async Task<TransitionResult> StartPlanningAsync(string parentId, CancellationToken ct)
|
||||||
|
{
|
||||||
|
await using var ctx = await _dbFactory.CreateDbContextAsync(ct);
|
||||||
|
var affected = await ctx.Tasks
|
||||||
|
.Where(t => t.Id == parentId &&
|
||||||
|
(t.Status == TaskStatus.Manual || t.Status == TaskStatus.Idle))
|
||||||
|
.ExecuteUpdateAsync(s => s
|
||||||
|
.SetProperty(t => t.Status, TaskStatus.Planning)
|
||||||
|
.SetProperty(t => t.PlanningPhase, PlanningPhase.Active), ct);
|
||||||
|
|
||||||
|
if (affected == 0)
|
||||||
|
return new TransitionResult(false, "Task not in plannable state.");
|
||||||
|
|
||||||
|
await _broadcaster.TaskUpdated(parentId);
|
||||||
|
return new TransitionResult(true, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
public async Task<TransitionResult> FinalizePlanningAsync(string parentId, CancellationToken ct)
|
||||||
|
{
|
||||||
|
await using var ctx = await _dbFactory.CreateDbContextAsync(ct);
|
||||||
|
var affected = await ctx.Tasks
|
||||||
|
.Where(t => t.Id == parentId && t.PlanningPhase == PlanningPhase.Active)
|
||||||
|
.ExecuteUpdateAsync(s => s
|
||||||
|
.SetProperty(t => t.PlanningPhase, PlanningPhase.Finalized)
|
||||||
|
.SetProperty(t => t.PlanningFinalizedAt, DateTime.UtcNow), ct);
|
||||||
|
|
||||||
|
if (affected == 0)
|
||||||
|
return new TransitionResult(false, "No active planning session.");
|
||||||
|
|
||||||
|
await _broadcaster.TaskUpdated(parentId);
|
||||||
|
return new TransitionResult(true, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
public async Task<TransitionResult> BlockOnAsync(string taskId, string predecessorTaskId, CancellationToken ct)
|
||||||
|
{
|
||||||
|
await using var ctx = await _dbFactory.CreateDbContextAsync(ct);
|
||||||
|
var affected = await ctx.Tasks
|
||||||
|
.Where(t => t.Id == taskId)
|
||||||
|
.ExecuteUpdateAsync(s => s.SetProperty(t => t.BlockedByTaskId, predecessorTaskId), ct);
|
||||||
|
|
||||||
|
if (affected == 0)
|
||||||
|
return new TransitionResult(false, "Task not found.");
|
||||||
|
|
||||||
|
await _broadcaster.TaskUpdated(taskId);
|
||||||
|
return new TransitionResult(true, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
public async Task<TransitionResult> UnblockAsync(string taskId, CancellationToken ct)
|
||||||
|
{
|
||||||
|
await using var ctx = await _dbFactory.CreateDbContextAsync(ct);
|
||||||
|
var affected = await ctx.Tasks
|
||||||
|
.Where(t => t.Id == taskId)
|
||||||
|
.ExecuteUpdateAsync(s => s.SetProperty(t => t.BlockedByTaskId, (string?)null), ct);
|
||||||
|
|
||||||
|
if (affected == 0)
|
||||||
|
return new TransitionResult(false, "Task not found.");
|
||||||
|
|
||||||
|
// Bridge to legacy chain layout: a Waiting predecessor-blocked sibling becomes Queued
|
||||||
|
// when its predecessor finishes. New layout (post-Slice 4) stores siblings as
|
||||||
|
// Status=Queued + BlockedByTaskId set, so this is a no-op for them.
|
||||||
|
await ctx.Tasks
|
||||||
|
.Where(t => t.Id == taskId && t.Status == TaskStatus.Waiting)
|
||||||
|
.ExecuteUpdateAsync(s => s.SetProperty(t => t.Status, TaskStatus.Queued), ct);
|
||||||
|
|
||||||
|
_wakeQueue();
|
||||||
|
await _broadcaster.TaskUpdated(taskId);
|
||||||
|
return new TransitionResult(true, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
public async Task<int> RecoverStaleRunningAsync(string reason, CancellationToken ct)
|
||||||
|
{
|
||||||
|
var resultText = "[stale] " + reason;
|
||||||
|
var now = DateTime.UtcNow;
|
||||||
|
await using var ctx = await _dbFactory.CreateDbContextAsync(ct);
|
||||||
|
return await ctx.Tasks
|
||||||
|
.Where(t => t.Status == TaskStatus.Running)
|
||||||
|
.ExecuteUpdateAsync(s => s
|
||||||
|
.SetProperty(t => t.Status, TaskStatus.Failed)
|
||||||
|
.SetProperty(t => t.FinishedAt, now)
|
||||||
|
.SetProperty(t => t.Result, resultText), ct);
|
||||||
|
}
|
||||||
|
|
||||||
|
private async Task OnChildTerminalAsync(string taskId, TaskStatus finalStatus)
|
||||||
|
{
|
||||||
|
// Terminal child writes are best-effort and use CancellationToken.None so the
|
||||||
|
// task lifecycle is never left partially completed because a caller cancelled.
|
||||||
|
string? parentId;
|
||||||
|
await using (var ctx = await _dbFactory.CreateDbContextAsync(CancellationToken.None))
|
||||||
|
{
|
||||||
|
parentId = await ctx.Tasks
|
||||||
|
.AsNoTracking()
|
||||||
|
.Where(t => t.Id == taskId)
|
||||||
|
.Select(t => t.ParentTaskId)
|
||||||
|
.FirstOrDefaultAsync(CancellationToken.None);
|
||||||
|
}
|
||||||
|
if (parentId is null) return;
|
||||||
|
|
||||||
|
try
|
||||||
|
{
|
||||||
|
await _chain.OnChildFinishedAsync(taskId, finalStatus, CancellationToken.None);
|
||||||
|
}
|
||||||
|
catch (Exception ex)
|
||||||
|
{
|
||||||
|
_logger.LogWarning(ex, "PlanningChain advance failed for {TaskId}", taskId);
|
||||||
|
}
|
||||||
|
|
||||||
|
try
|
||||||
|
{
|
||||||
|
await using var ctx = await _dbFactory.CreateDbContextAsync(CancellationToken.None);
|
||||||
|
await new TaskRepository(ctx).TryCompleteParentAsync(parentId, CancellationToken.None);
|
||||||
|
}
|
||||||
|
catch (Exception ex)
|
||||||
|
{
|
||||||
|
_logger.LogWarning(ex, "TryCompleteParent failed for {ParentId}", parentId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
3
src/ClaudeDo.Worker/State/TransitionResult.cs
Normal file
3
src/ClaudeDo.Worker/State/TransitionResult.cs
Normal file
@@ -0,0 +1,3 @@
|
|||||||
|
namespace ClaudeDo.Worker.State;
|
||||||
|
|
||||||
|
public sealed record TransitionResult(bool Ok, string? Reason);
|
||||||
@@ -94,7 +94,8 @@ public sealed class ExternalMcpServiceTests : IDisposable
|
|||||||
// we never call its WakeQueue/RunNow/CancelTask paths, so a real QueueService
|
// we never call its WakeQueue/RunNow/CancelTask paths, so a real QueueService
|
||||||
// built with the same approach used in QueueServiceTests is sufficient.
|
// built with the same approach used in QueueServiceTests is sufficient.
|
||||||
private ExternalMcpService BuildSut(QueueService queue) =>
|
private ExternalMcpService BuildSut(QueueService queue) =>
|
||||||
new(_tasks, _lists, queue, _broadcaster, _tags);
|
new(_tasks, _lists, queue, _broadcaster, _tags,
|
||||||
|
TaskStateServiceBuilder.Build(_db.CreateFactory()).State);
|
||||||
|
|
||||||
private QueueService CreateQueue()
|
private QueueService CreateQueue()
|
||||||
{
|
{
|
||||||
@@ -113,7 +114,7 @@ public sealed class ExternalMcpServiceTests : IDisposable
|
|||||||
var wtManager = new WorktreeManager(new GitService(), dbFactory, cfg, NullLogger<WorktreeManager>.Instance);
|
var wtManager = new WorktreeManager(new GitService(), dbFactory, cfg, NullLogger<WorktreeManager>.Instance);
|
||||||
var argsBuilder = new ClaudeArgsBuilder();
|
var argsBuilder = new ClaudeArgsBuilder();
|
||||||
var runner = new TaskRunner(fake, dbFactory, broadcaster, wtManager, argsBuilder, cfg,
|
var runner = new TaskRunner(fake, dbFactory, broadcaster, wtManager, argsBuilder, cfg,
|
||||||
NullLogger<TaskRunner>.Instance);
|
NullLogger<TaskRunner>.Instance, TaskStateServiceBuilder.Build(dbFactory).State);
|
||||||
return new QueueService(dbFactory, runner, cfg, NullLogger<QueueService>.Instance);
|
return new QueueService(dbFactory, runner, cfg, NullLogger<QueueService>.Instance);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -50,7 +50,7 @@ public sealed class PlanningHubTests : IDisposable
|
|||||||
{
|
{
|
||||||
var hub = new WorkerHub(
|
var hub = new WorkerHub(
|
||||||
null!, null!, null!, null!, null!, null!, null!, null!,
|
null!, null!, null!, null!, null!, null!, null!, null!,
|
||||||
_planning, _launcher, null!, null!);
|
_planning, _launcher, null!, null!, null!);
|
||||||
hub.Clients = new FakeHubCallerClients(_proxy);
|
hub.Clients = new FakeHubCallerClients(_proxy);
|
||||||
hub.Context = new FakeHubCallerContext();
|
hub.Context = new FakeHubCallerContext();
|
||||||
return hub;
|
return hub;
|
||||||
|
|||||||
39
tests/ClaudeDo.Worker.Tests/Infrastructure/FakeHubContext.cs
Normal file
39
tests/ClaudeDo.Worker.Tests/Infrastructure/FakeHubContext.cs
Normal file
@@ -0,0 +1,39 @@
|
|||||||
|
using ClaudeDo.Worker.Hub;
|
||||||
|
using Microsoft.AspNetCore.SignalR;
|
||||||
|
|
||||||
|
namespace ClaudeDo.Worker.Tests.Infrastructure;
|
||||||
|
|
||||||
|
public sealed record CapturedHubCall(string Method, object?[] Args);
|
||||||
|
|
||||||
|
public sealed class CapturingClientProxy : IClientProxy
|
||||||
|
{
|
||||||
|
public readonly List<CapturedHubCall> Calls = new();
|
||||||
|
|
||||||
|
public Task SendCoreAsync(string method, object?[] args, CancellationToken cancellationToken = default)
|
||||||
|
{
|
||||||
|
Calls.Add(new CapturedHubCall(method, args));
|
||||||
|
return Task.CompletedTask;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public sealed class CapturingHubClients : IHubClients
|
||||||
|
{
|
||||||
|
public CapturingClientProxy AllProxy { get; } = new();
|
||||||
|
public IClientProxy All => AllProxy;
|
||||||
|
public IClientProxy AllExcept(IReadOnlyList<string> excludedConnectionIds) => AllProxy;
|
||||||
|
public IClientProxy Client(string connectionId) => AllProxy;
|
||||||
|
public IClientProxy Clients(IReadOnlyList<string> connectionIds) => AllProxy;
|
||||||
|
public IClientProxy Group(string groupName) => AllProxy;
|
||||||
|
public IClientProxy GroupExcept(string groupName, IReadOnlyList<string> excludedConnectionIds) => AllProxy;
|
||||||
|
public IClientProxy Groups(IReadOnlyList<string> groupNames) => AllProxy;
|
||||||
|
public IClientProxy User(string userId) => AllProxy;
|
||||||
|
public IClientProxy Users(IReadOnlyList<string> userIds) => AllProxy;
|
||||||
|
}
|
||||||
|
|
||||||
|
public sealed class CapturingHubContext : IHubContext<WorkerHub>
|
||||||
|
{
|
||||||
|
private readonly CapturingHubClients _clients = new();
|
||||||
|
public CapturingClientProxy Proxy => _clients.AllProxy;
|
||||||
|
public IHubClients Clients => _clients;
|
||||||
|
public IGroupManager Groups => throw new NotImplementedException();
|
||||||
|
}
|
||||||
@@ -0,0 +1,37 @@
|
|||||||
|
using ClaudeDo.Data;
|
||||||
|
using ClaudeDo.Worker.Hub;
|
||||||
|
using ClaudeDo.Worker.Planning;
|
||||||
|
using ClaudeDo.Worker.State;
|
||||||
|
using Microsoft.EntityFrameworkCore;
|
||||||
|
using Microsoft.Extensions.Logging.Abstractions;
|
||||||
|
|
||||||
|
namespace ClaudeDo.Worker.Tests.Infrastructure;
|
||||||
|
|
||||||
|
/// Test-only helper that wires TaskStateService and PlanningChainCoordinator
|
||||||
|
/// against a shared DB factory, breaking the Func cycle between them.
|
||||||
|
public static class TaskStateServiceBuilder
|
||||||
|
{
|
||||||
|
public sealed record Built(
|
||||||
|
TaskStateService State,
|
||||||
|
PlanningChainCoordinator Chain,
|
||||||
|
CapturingHubContext Hub,
|
||||||
|
Func<int> WakeCount);
|
||||||
|
|
||||||
|
public static Built Build(IDbContextFactory<ClaudeDoDbContext> dbFactory)
|
||||||
|
{
|
||||||
|
var hub = new CapturingHubContext();
|
||||||
|
var broadcaster = new HubBroadcaster(hub);
|
||||||
|
var wakeCount = new int[1];
|
||||||
|
|
||||||
|
TaskStateService? state = null;
|
||||||
|
var chain = new PlanningChainCoordinator(dbFactory, () => state!);
|
||||||
|
state = new TaskStateService(
|
||||||
|
dbFactory,
|
||||||
|
broadcaster,
|
||||||
|
() => Interlocked.Increment(ref wakeCount[0]),
|
||||||
|
chain,
|
||||||
|
NullLogger<TaskStateService>.Instance);
|
||||||
|
|
||||||
|
return new Built(state, chain, hub, () => Volatile.Read(ref wakeCount[0]));
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -17,7 +17,7 @@ public sealed class PlanningChainCoordinatorTests : IDisposable
|
|||||||
public PlanningChainCoordinatorTests()
|
public PlanningChainCoordinatorTests()
|
||||||
{
|
{
|
||||||
_factory = _db.CreateFactory();
|
_factory = _db.CreateFactory();
|
||||||
_sut = new PlanningChainCoordinator(_factory);
|
_sut = TaskStateServiceBuilder.Build(_factory).Chain;
|
||||||
_listId = Guid.NewGuid().ToString();
|
_listId = Guid.NewGuid().ToString();
|
||||||
using var ctx = _factory.CreateDbContext();
|
using var ctx = _factory.CreateDbContext();
|
||||||
ctx.Lists.Add(new ListEntity
|
ctx.Lists.Add(new ListEntity
|
||||||
|
|||||||
@@ -54,7 +54,7 @@ public sealed class QueueServiceSlotGuardTests : IDisposable
|
|||||||
var wtManager = new WorktreeManager(new ClaudeDo.Data.Git.GitService(), dbFactory, _cfg, NullLogger<WorktreeManager>.Instance);
|
var wtManager = new WorktreeManager(new ClaudeDo.Data.Git.GitService(), dbFactory, _cfg, NullLogger<WorktreeManager>.Instance);
|
||||||
var argsBuilder = new ClaudeArgsBuilder();
|
var argsBuilder = new ClaudeArgsBuilder();
|
||||||
var runner = new TaskRunner(fake, dbFactory, broadcaster, wtManager, argsBuilder, _cfg,
|
var runner = new TaskRunner(fake, dbFactory, broadcaster, wtManager, argsBuilder, _cfg,
|
||||||
NullLogger<TaskRunner>.Instance);
|
NullLogger<TaskRunner>.Instance, TaskStateServiceBuilder.Build(dbFactory).State);
|
||||||
var service = new QueueService(dbFactory, runner, _cfg, NullLogger<QueueService>.Instance);
|
var service = new QueueService(dbFactory, runner, _cfg, NullLogger<QueueService>.Instance);
|
||||||
return (service, fake);
|
return (service, fake);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -55,7 +55,7 @@ public sealed class QueueServiceTests : IDisposable
|
|||||||
var wtManager = new WorktreeManager(new GitService(), dbFactory, _cfg, NullLogger<WorktreeManager>.Instance);
|
var wtManager = new WorktreeManager(new GitService(), dbFactory, _cfg, NullLogger<WorktreeManager>.Instance);
|
||||||
var argsBuilder = new ClaudeArgsBuilder();
|
var argsBuilder = new ClaudeArgsBuilder();
|
||||||
var runner = new TaskRunner(fake, dbFactory, broadcaster, wtManager, argsBuilder, _cfg,
|
var runner = new TaskRunner(fake, dbFactory, broadcaster, wtManager, argsBuilder, _cfg,
|
||||||
NullLogger<TaskRunner>.Instance);
|
NullLogger<TaskRunner>.Instance, TaskStateServiceBuilder.Build(dbFactory).State);
|
||||||
var service = new QueueService(dbFactory, runner, _cfg, NullLogger<QueueService>.Instance);
|
var service = new QueueService(dbFactory, runner, _cfg, NullLogger<QueueService>.Instance);
|
||||||
return (service, fake);
|
return (service, fake);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -54,7 +54,8 @@ public sealed class StaleTaskRecoveryTests : IDisposable
|
|||||||
await _tasks.AddAsync(running);
|
await _tasks.AddAsync(running);
|
||||||
await _tasks.AddAsync(queued);
|
await _tasks.AddAsync(queued);
|
||||||
|
|
||||||
var recovery = new StaleTaskRecovery(_db.CreateFactory(), NullLogger<StaleTaskRecovery>.Instance);
|
var built = TaskStateServiceBuilder.Build(_db.CreateFactory());
|
||||||
|
var recovery = new StaleTaskRecovery(built.State, NullLogger<StaleTaskRecovery>.Instance);
|
||||||
await recovery.StartAsync(CancellationToken.None);
|
await recovery.StartAsync(CancellationToken.None);
|
||||||
|
|
||||||
var r = await _tasks.GetByIdAsync(running.Id);
|
var r = await _tasks.GetByIdAsync(running.Id);
|
||||||
|
|||||||
@@ -34,10 +34,12 @@ public class TaskResetServiceTests : IDisposable
|
|||||||
{
|
{
|
||||||
var fakeHub = new RecordingHubContext();
|
var fakeHub = new RecordingHubContext();
|
||||||
var broadcaster = new HubBroadcaster(fakeHub);
|
var broadcaster = new HubBroadcaster(fakeHub);
|
||||||
|
var built = TaskStateServiceBuilder.Build(db.CreateFactory());
|
||||||
var svc = new TaskResetService(
|
var svc = new TaskResetService(
|
||||||
db.CreateFactory(),
|
db.CreateFactory(),
|
||||||
wtMgr,
|
wtMgr,
|
||||||
broadcaster,
|
broadcaster,
|
||||||
|
built.State,
|
||||||
NullLogger<TaskResetService>.Instance);
|
NullLogger<TaskResetService>.Instance);
|
||||||
return (svc, fakeHub.Proxy);
|
return (svc, fakeHub.Proxy);
|
||||||
}
|
}
|
||||||
@@ -111,7 +113,7 @@ public class TaskResetServiceTests : IDisposable
|
|||||||
{
|
{
|
||||||
var updated = await new TaskRepository(ctx).GetByIdAsync(task.Id);
|
var updated = await new TaskRepository(ctx).GetByIdAsync(task.Id);
|
||||||
Assert.NotNull(updated);
|
Assert.NotNull(updated);
|
||||||
Assert.Equal(TaskStatus.Manual, updated!.Status);
|
Assert.Equal(TaskStatus.Idle, updated!.Status);
|
||||||
Assert.Null(updated.Result);
|
Assert.Null(updated.Result);
|
||||||
Assert.Null(updated.StartedAt);
|
Assert.Null(updated.StartedAt);
|
||||||
Assert.Null(updated.FinishedAt);
|
Assert.Null(updated.FinishedAt);
|
||||||
|
|||||||
383
tests/ClaudeDo.Worker.Tests/State/TaskStateServiceTests.cs
Normal file
383
tests/ClaudeDo.Worker.Tests/State/TaskStateServiceTests.cs
Normal file
@@ -0,0 +1,383 @@
|
|||||||
|
using ClaudeDo.Data;
|
||||||
|
using ClaudeDo.Data.Models;
|
||||||
|
using ClaudeDo.Data.Repositories;
|
||||||
|
using ClaudeDo.Worker.State;
|
||||||
|
using ClaudeDo.Worker.Tests.Infrastructure;
|
||||||
|
using Microsoft.EntityFrameworkCore;
|
||||||
|
using TaskStatus = ClaudeDo.Data.Models.TaskStatus;
|
||||||
|
|
||||||
|
namespace ClaudeDo.Worker.Tests.State;
|
||||||
|
|
||||||
|
public sealed class TaskStateServiceTests : IDisposable
|
||||||
|
{
|
||||||
|
private readonly DbFixture _db = new();
|
||||||
|
private readonly TestDbContextFactory _factory;
|
||||||
|
private readonly TaskStateServiceBuilder.Built _built;
|
||||||
|
private readonly ITaskStateService _sut;
|
||||||
|
private readonly string _listId;
|
||||||
|
|
||||||
|
public TaskStateServiceTests()
|
||||||
|
{
|
||||||
|
_factory = _db.CreateFactory();
|
||||||
|
_built = TaskStateServiceBuilder.Build(_factory);
|
||||||
|
_sut = _built.State;
|
||||||
|
|
||||||
|
_listId = Guid.NewGuid().ToString();
|
||||||
|
using var ctx = _factory.CreateDbContext();
|
||||||
|
ctx.Lists.Add(new ListEntity
|
||||||
|
{
|
||||||
|
Id = _listId,
|
||||||
|
Name = "Test",
|
||||||
|
CreatedAt = DateTime.UtcNow,
|
||||||
|
DefaultCommitType = "chore",
|
||||||
|
});
|
||||||
|
ctx.SaveChanges();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void Dispose() => _db.Dispose();
|
||||||
|
|
||||||
|
private async Task<string> SeedTaskAsync(TaskStatus status, string? parentId = null, int sortOrder = 0)
|
||||||
|
{
|
||||||
|
var id = Guid.NewGuid().ToString();
|
||||||
|
await using var ctx = _factory.CreateDbContext();
|
||||||
|
ctx.Tasks.Add(new TaskEntity
|
||||||
|
{
|
||||||
|
Id = id,
|
||||||
|
ListId = _listId,
|
||||||
|
Title = "task",
|
||||||
|
Status = status,
|
||||||
|
CreatedAt = DateTime.UtcNow,
|
||||||
|
ParentTaskId = parentId,
|
||||||
|
SortOrder = sortOrder,
|
||||||
|
});
|
||||||
|
await ctx.SaveChangesAsync();
|
||||||
|
return id;
|
||||||
|
}
|
||||||
|
|
||||||
|
private async Task<TaskStatus> GetStatusAsync(string id)
|
||||||
|
{
|
||||||
|
await using var ctx = _factory.CreateDbContext();
|
||||||
|
return await ctx.Tasks.Where(t => t.Id == id).Select(t => t.Status).FirstAsync();
|
||||||
|
}
|
||||||
|
|
||||||
|
private async Task<TaskEntity> GetTaskAsync(string id)
|
||||||
|
{
|
||||||
|
await using var ctx = _factory.CreateDbContext();
|
||||||
|
return await new TaskRepository(ctx).GetByIdAsync(id) ?? throw new InvalidOperationException($"task {id} not found");
|
||||||
|
}
|
||||||
|
|
||||||
|
// ─── EnqueueAsync ─────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task EnqueueAsync_FromIdle_TransitionsToQueued_AndWakesQueue()
|
||||||
|
{
|
||||||
|
var id = await SeedTaskAsync(TaskStatus.Idle);
|
||||||
|
var wakesBefore = _built.WakeCount();
|
||||||
|
|
||||||
|
var result = await _sut.EnqueueAsync(id, default);
|
||||||
|
|
||||||
|
Assert.True(result.Ok);
|
||||||
|
Assert.Equal(TaskStatus.Queued, await GetStatusAsync(id));
|
||||||
|
Assert.True(_built.WakeCount() > wakesBefore);
|
||||||
|
Assert.Contains(_built.Hub.Proxy.Calls, c => c.Method == "TaskUpdated");
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task EnqueueAsync_FromRunning_Rejects_AndDoesNotMutate()
|
||||||
|
{
|
||||||
|
var id = await SeedTaskAsync(TaskStatus.Running);
|
||||||
|
|
||||||
|
var result = await _sut.EnqueueAsync(id, default);
|
||||||
|
|
||||||
|
Assert.False(result.Ok);
|
||||||
|
Assert.Equal(TaskStatus.Running, await GetStatusAsync(id));
|
||||||
|
}
|
||||||
|
|
||||||
|
// ─── StartRunningAsync ────────────────────────────────────────────────
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task StartRunningAsync_FromQueued_TransitionsToRunning_AndStampsStartedAt()
|
||||||
|
{
|
||||||
|
var id = await SeedTaskAsync(TaskStatus.Queued);
|
||||||
|
var startedAt = new DateTime(2026, 4, 27, 10, 0, 0, DateTimeKind.Utc);
|
||||||
|
|
||||||
|
var result = await _sut.StartRunningAsync(id, startedAt, default);
|
||||||
|
|
||||||
|
Assert.True(result.Ok);
|
||||||
|
var t = await GetTaskAsync(id);
|
||||||
|
Assert.Equal(TaskStatus.Running, t.Status);
|
||||||
|
Assert.Equal(startedAt, t.StartedAt);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task StartRunningAsync_FromRunning_Rejects()
|
||||||
|
{
|
||||||
|
var id = await SeedTaskAsync(TaskStatus.Running);
|
||||||
|
|
||||||
|
var result = await _sut.StartRunningAsync(id, DateTime.UtcNow, default);
|
||||||
|
|
||||||
|
Assert.False(result.Ok);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task StartRunningAsync_TwoParallelClaims_ExactlyOneWins()
|
||||||
|
{
|
||||||
|
var id = await SeedTaskAsync(TaskStatus.Queued);
|
||||||
|
var startedAt = DateTime.UtcNow;
|
||||||
|
|
||||||
|
// Two concurrent calls: only one ExecuteUpdate should affect a row.
|
||||||
|
var t1 = Task.Run(() => _sut.StartRunningAsync(id, startedAt, default));
|
||||||
|
var t2 = Task.Run(() => _sut.StartRunningAsync(id, startedAt, default));
|
||||||
|
var results = await Task.WhenAll(t1, t2);
|
||||||
|
|
||||||
|
var winners = results.Count(r => r.Ok);
|
||||||
|
Assert.Equal(1, winners);
|
||||||
|
Assert.Equal(TaskStatus.Running, await GetStatusAsync(id));
|
||||||
|
}
|
||||||
|
|
||||||
|
// ─── CompleteAsync ────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task CompleteAsync_FromRunning_TransitionsToDone()
|
||||||
|
{
|
||||||
|
var id = await SeedTaskAsync(TaskStatus.Running);
|
||||||
|
|
||||||
|
var result = await _sut.CompleteAsync(id, DateTime.UtcNow, "ok", default);
|
||||||
|
|
||||||
|
Assert.True(result.Ok);
|
||||||
|
var t = await GetTaskAsync(id);
|
||||||
|
Assert.Equal(TaskStatus.Done, t.Status);
|
||||||
|
Assert.Equal("ok", t.Result);
|
||||||
|
Assert.NotNull(t.FinishedAt);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task CompleteAsync_FromQueued_Rejects()
|
||||||
|
{
|
||||||
|
var id = await SeedTaskAsync(TaskStatus.Queued);
|
||||||
|
|
||||||
|
var result = await _sut.CompleteAsync(id, DateTime.UtcNow, "ok", default);
|
||||||
|
|
||||||
|
Assert.False(result.Ok);
|
||||||
|
Assert.Equal(TaskStatus.Queued, await GetStatusAsync(id));
|
||||||
|
}
|
||||||
|
|
||||||
|
// ─── FailAsync ────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task FailAsync_FromRunning_TransitionsToFailed()
|
||||||
|
{
|
||||||
|
var id = await SeedTaskAsync(TaskStatus.Running);
|
||||||
|
|
||||||
|
var result = await _sut.FailAsync(id, DateTime.UtcNow, "boom", default);
|
||||||
|
|
||||||
|
Assert.True(result.Ok);
|
||||||
|
var t = await GetTaskAsync(id);
|
||||||
|
Assert.Equal(TaskStatus.Failed, t.Status);
|
||||||
|
Assert.Equal("boom", t.Result);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task FailAsync_FromDone_Rejects()
|
||||||
|
{
|
||||||
|
var id = await SeedTaskAsync(TaskStatus.Done);
|
||||||
|
|
||||||
|
var result = await _sut.FailAsync(id, DateTime.UtcNow, "boom", default);
|
||||||
|
|
||||||
|
Assert.False(result.Ok);
|
||||||
|
Assert.Equal(TaskStatus.Done, await GetStatusAsync(id));
|
||||||
|
}
|
||||||
|
|
||||||
|
// ─── CancelAsync ──────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task CancelAsync_FromRunning_TransitionsToCancelled()
|
||||||
|
{
|
||||||
|
var id = await SeedTaskAsync(TaskStatus.Running);
|
||||||
|
|
||||||
|
var result = await _sut.CancelAsync(id, DateTime.UtcNow, default);
|
||||||
|
|
||||||
|
Assert.True(result.Ok);
|
||||||
|
Assert.Equal(TaskStatus.Cancelled, await GetStatusAsync(id));
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task CancelAsync_FromDone_Rejects()
|
||||||
|
{
|
||||||
|
var id = await SeedTaskAsync(TaskStatus.Done);
|
||||||
|
|
||||||
|
var result = await _sut.CancelAsync(id, DateTime.UtcNow, default);
|
||||||
|
|
||||||
|
Assert.False(result.Ok);
|
||||||
|
Assert.Equal(TaskStatus.Done, await GetStatusAsync(id));
|
||||||
|
}
|
||||||
|
|
||||||
|
// ─── ResetToIdleAsync ─────────────────────────────────────────────────
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task ResetToIdleAsync_FromFailed_ClearsTimestamps()
|
||||||
|
{
|
||||||
|
var id = await SeedTaskAsync(TaskStatus.Failed);
|
||||||
|
await using (var ctx = _factory.CreateDbContext())
|
||||||
|
{
|
||||||
|
await ctx.Tasks.Where(t => t.Id == id)
|
||||||
|
.ExecuteUpdateAsync(s => s
|
||||||
|
.SetProperty(t => t.StartedAt, DateTime.UtcNow.AddMinutes(-5))
|
||||||
|
.SetProperty(t => t.FinishedAt, DateTime.UtcNow.AddMinutes(-1))
|
||||||
|
.SetProperty(t => t.Result, "old"));
|
||||||
|
}
|
||||||
|
|
||||||
|
var result = await _sut.ResetToIdleAsync(id, default);
|
||||||
|
|
||||||
|
Assert.True(result.Ok);
|
||||||
|
var t = await GetTaskAsync(id);
|
||||||
|
Assert.Equal(TaskStatus.Idle, t.Status);
|
||||||
|
Assert.Null(t.StartedAt);
|
||||||
|
Assert.Null(t.FinishedAt);
|
||||||
|
Assert.Null(t.Result);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task ResetToIdleAsync_FromRunning_Rejects()
|
||||||
|
{
|
||||||
|
var id = await SeedTaskAsync(TaskStatus.Running);
|
||||||
|
|
||||||
|
var result = await _sut.ResetToIdleAsync(id, default);
|
||||||
|
|
||||||
|
Assert.False(result.Ok);
|
||||||
|
Assert.Equal(TaskStatus.Running, await GetStatusAsync(id));
|
||||||
|
}
|
||||||
|
|
||||||
|
// ─── StartPlanningAsync ───────────────────────────────────────────────
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task StartPlanningAsync_FromManual_FlipsStatus_AndPlanningPhase()
|
||||||
|
{
|
||||||
|
var id = await SeedTaskAsync(TaskStatus.Manual);
|
||||||
|
|
||||||
|
var result = await _sut.StartPlanningAsync(id, default);
|
||||||
|
|
||||||
|
Assert.True(result.Ok);
|
||||||
|
var t = await GetTaskAsync(id);
|
||||||
|
Assert.Equal(TaskStatus.Planning, t.Status);
|
||||||
|
Assert.Equal(PlanningPhase.Active, t.PlanningPhase);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task StartPlanningAsync_FromRunning_Rejects()
|
||||||
|
{
|
||||||
|
var id = await SeedTaskAsync(TaskStatus.Running);
|
||||||
|
|
||||||
|
var result = await _sut.StartPlanningAsync(id, default);
|
||||||
|
|
||||||
|
Assert.False(result.Ok);
|
||||||
|
}
|
||||||
|
|
||||||
|
// ─── FinalizePlanningAsync ────────────────────────────────────────────
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task FinalizePlanningAsync_OnActivePhase_TransitionsToFinalized()
|
||||||
|
{
|
||||||
|
var id = await SeedTaskAsync(TaskStatus.Manual);
|
||||||
|
await _sut.StartPlanningAsync(id, default);
|
||||||
|
|
||||||
|
var result = await _sut.FinalizePlanningAsync(id, default);
|
||||||
|
|
||||||
|
Assert.True(result.Ok);
|
||||||
|
var t = await GetTaskAsync(id);
|
||||||
|
Assert.Equal(PlanningPhase.Finalized, t.PlanningPhase);
|
||||||
|
Assert.NotNull(t.PlanningFinalizedAt);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task FinalizePlanningAsync_OnNonePhase_Rejects()
|
||||||
|
{
|
||||||
|
var id = await SeedTaskAsync(TaskStatus.Manual);
|
||||||
|
|
||||||
|
var result = await _sut.FinalizePlanningAsync(id, default);
|
||||||
|
|
||||||
|
Assert.False(result.Ok);
|
||||||
|
}
|
||||||
|
|
||||||
|
// ─── BlockOnAsync / UnblockAsync ─────────────────────────────────────
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task BlockOnAsync_SetsBlockedByTaskId()
|
||||||
|
{
|
||||||
|
var pred = await SeedTaskAsync(TaskStatus.Queued);
|
||||||
|
var task = await SeedTaskAsync(TaskStatus.Queued);
|
||||||
|
|
||||||
|
var result = await _sut.BlockOnAsync(task, pred, default);
|
||||||
|
|
||||||
|
Assert.True(result.Ok);
|
||||||
|
var t = await GetTaskAsync(task);
|
||||||
|
Assert.Equal(pred, t.BlockedByTaskId);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task UnblockAsync_ClearsBlockedByTaskId_AndWakesQueue()
|
||||||
|
{
|
||||||
|
var pred = await SeedTaskAsync(TaskStatus.Queued);
|
||||||
|
var task = await SeedTaskAsync(TaskStatus.Queued);
|
||||||
|
await _sut.BlockOnAsync(task, pred, default);
|
||||||
|
var wakesBefore = _built.WakeCount();
|
||||||
|
|
||||||
|
var result = await _sut.UnblockAsync(task, default);
|
||||||
|
|
||||||
|
Assert.True(result.Ok);
|
||||||
|
var t = await GetTaskAsync(task);
|
||||||
|
Assert.Null(t.BlockedByTaskId);
|
||||||
|
Assert.True(_built.WakeCount() > wakesBefore);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task UnblockAsync_OnWaitingTask_FlipsToQueued()
|
||||||
|
{
|
||||||
|
// Bridge to legacy chain layout: a Status=Waiting sibling becomes Queued on unblock.
|
||||||
|
var task = await SeedTaskAsync(TaskStatus.Waiting);
|
||||||
|
|
||||||
|
var result = await _sut.UnblockAsync(task, default);
|
||||||
|
|
||||||
|
Assert.True(result.Ok);
|
||||||
|
Assert.Equal(TaskStatus.Queued, await GetStatusAsync(task));
|
||||||
|
}
|
||||||
|
|
||||||
|
// ─── RecoverStaleRunningAsync ─────────────────────────────────────────
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task RecoverStaleRunningAsync_FlipsAllRunningToFailed_ReturnsCount()
|
||||||
|
{
|
||||||
|
var r1 = await SeedTaskAsync(TaskStatus.Running);
|
||||||
|
var r2 = await SeedTaskAsync(TaskStatus.Running);
|
||||||
|
var q = await SeedTaskAsync(TaskStatus.Queued);
|
||||||
|
|
||||||
|
var count = await _sut.RecoverStaleRunningAsync("worker restart", default);
|
||||||
|
|
||||||
|
Assert.Equal(2, count);
|
||||||
|
Assert.Equal(TaskStatus.Failed, await GetStatusAsync(r1));
|
||||||
|
Assert.Equal(TaskStatus.Failed, await GetStatusAsync(r2));
|
||||||
|
Assert.Equal(TaskStatus.Queued, await GetStatusAsync(q));
|
||||||
|
var t = await GetTaskAsync(r1);
|
||||||
|
Assert.StartsWith("[stale] ", t.Result);
|
||||||
|
}
|
||||||
|
|
||||||
|
// ─── Child terminal → chain advance ───────────────────────────────────
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task CompleteAsync_OnChild_AdvancesNextWaitingSibling()
|
||||||
|
{
|
||||||
|
var parent = await SeedTaskAsync(TaskStatus.Planned);
|
||||||
|
var c0 = await SeedTaskAsync(TaskStatus.Running, parentId: parent, sortOrder: 0);
|
||||||
|
var c1 = await SeedTaskAsync(TaskStatus.Waiting, parentId: parent, sortOrder: 1);
|
||||||
|
var c2 = await SeedTaskAsync(TaskStatus.Waiting, parentId: parent, sortOrder: 2);
|
||||||
|
|
||||||
|
var result = await _sut.CompleteAsync(c0, DateTime.UtcNow, "ok", default);
|
||||||
|
|
||||||
|
Assert.True(result.Ok);
|
||||||
|
Assert.Equal(TaskStatus.Done, await GetStatusAsync(c0));
|
||||||
|
// Next sibling was Waiting → chain coordinator unblocks → Queued.
|
||||||
|
Assert.Equal(TaskStatus.Queued, await GetStatusAsync(c1));
|
||||||
|
// Subsequent sibling untouched.
|
||||||
|
Assert.Equal(TaskStatus.Waiting, await GetStatusAsync(c2));
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user