From 8823265e5afa38250a150ec3fb449bcc0cef9a14 Mon Sep 17 00:00:00 2001 From: Mika Kuns Date: Mon, 27 Apr 2026 11:31:57 +0200 Subject: [PATCH] refactor(worker/state): introduce TaskStateService and route mutations through it Slice 2 of the worker state consolidation refactor (spec sections 2 and 8). Adds Worker/State/ITaskStateService + TaskStateService as the single component that mutates Status, PlanningPhase, and BlockedByTaskId. Each transition is one atomic ExecuteUpdate with a WHERE filter on the expected source status, so parallel claims are TOCTOU-free. Side effects (queue wake on -> Queued, hub TaskUpdated broadcast, chain advance + parent completion on terminal child) are owned by the service so callers no longer need to remember them. Migrated callers (mechanical, behavior preserved): - TaskRunner: HandleSuccess/HandleFailure/MarkFailed/RunAsync/ContinueAsync - StaleTaskRecovery: bulk recover stale Running tasks - TaskResetService: status flip (worktree cleanup stays in service) - PlanningSessionManager.StartAsync: status flip via state, token write via repo - PlanningChainCoordinator.OnChildFinishedAsync: routes the next-sibling write through state.UnblockAsync (Slice 4 finishes the rewrite) - ExternalMcpService.UpdateTaskStatus: Queued case via state.EnqueueAsync Repo Mark*Async helpers (MarkRunning/MarkDone/MarkFailed/FlipAllRunningToFailed) are now internal; ClaudeDo.Data grants InternalsVisibleTo to ClaudeDo.Worker and ClaudeDo.Worker.Tests for the existing repo-level tests. DI: TaskStateService is registered as Singleton in both the main app and the external-MCP app; the queue-wake delegate captures sp -> QueueService.WakeQueue to break the TaskStateService -> QueueService -> TaskRunner -> TaskStateService construction cycle. PlanningChainCoordinator takes Func for the same reason; Slice 3 will replace both with IQueueWaker. Tests: TaskStateServiceTests covers happy + reject for every transition, the parallel StartRunningAsync claim race, child-terminal chain advancement, and stale recovery. Existing service/repo tests are updated to construct the new state-service via a TaskStateServiceBuilder helper. Pre-existing constructor drift in QueueService/ExternalMcp/PlanningHub tests is patched to keep the test project building (the surrounding test logic is otherwise untouched). Co-Authored-By: Claude Opus 4.7 (1M context) --- src/ClaudeDo.Data/ClaudeDo.Data.csproj | 5 + .../Repositories/TaskRepository.cs | 19 +- .../External/ExternalMcpService.cs | 16 +- .../Planning/PlanningChainCoordinator.cs | 15 +- .../Planning/PlanningSessionManager.cs | 23 +- src/ClaudeDo.Worker/Program.cs | 14 + src/ClaudeDo.Worker/Runner/TaskRunner.cs | 60 +-- .../Services/StaleTaskRecovery.cs | 14 +- .../Services/TaskResetService.cs | 11 +- .../State/ITaskStateService.cs | 19 + src/ClaudeDo.Worker/State/TaskStateService.cs | 258 ++++++++++++ src/ClaudeDo.Worker/State/TransitionResult.cs | 3 + .../External/ExternalMcpServiceTests.cs | 5 +- .../Hub/PlanningHubTests.cs | 2 +- .../Infrastructure/FakeHubContext.cs | 39 ++ .../Infrastructure/TaskStateServiceBuilder.cs | 37 ++ .../Planning/PlanningChainCoordinatorTests.cs | 2 +- .../Services/QueueServiceSlotGuardTests.cs | 2 +- .../Services/QueueServiceTests.cs | 2 +- .../Services/StaleTaskRecoveryTests.cs | 3 +- .../Services/TaskResetServiceTests.cs | 4 +- .../State/TaskStateServiceTests.cs | 383 ++++++++++++++++++ 22 files changed, 845 insertions(+), 91 deletions(-) create mode 100644 src/ClaudeDo.Worker/State/ITaskStateService.cs create mode 100644 src/ClaudeDo.Worker/State/TaskStateService.cs create mode 100644 src/ClaudeDo.Worker/State/TransitionResult.cs create mode 100644 tests/ClaudeDo.Worker.Tests/Infrastructure/FakeHubContext.cs create mode 100644 tests/ClaudeDo.Worker.Tests/Infrastructure/TaskStateServiceBuilder.cs create mode 100644 tests/ClaudeDo.Worker.Tests/State/TaskStateServiceTests.cs diff --git a/src/ClaudeDo.Data/ClaudeDo.Data.csproj b/src/ClaudeDo.Data/ClaudeDo.Data.csproj index ec9a23e..81866bf 100644 --- a/src/ClaudeDo.Data/ClaudeDo.Data.csproj +++ b/src/ClaudeDo.Data/ClaudeDo.Data.csproj @@ -14,4 +14,9 @@ + + + + + diff --git a/src/ClaudeDo.Data/Repositories/TaskRepository.cs b/src/ClaudeDo.Data/Repositories/TaskRepository.cs index 48ca97a..5c76d14 100644 --- a/src/ClaudeDo.Data/Repositories/TaskRepository.cs +++ b/src/ClaudeDo.Data/Repositories/TaskRepository.cs @@ -88,7 +88,7 @@ public sealed class TaskRepository #region Status transitions - public async Task MarkRunningAsync(string taskId, DateTime startedAt, CancellationToken ct = default) + internal async Task MarkRunningAsync(string taskId, DateTime startedAt, CancellationToken ct = default) { await _context.Tasks .Where(t => t.Id == taskId) @@ -97,7 +97,7 @@ public sealed class TaskRepository .SetProperty(t => t.StartedAt, startedAt), ct); } - public async Task MarkDoneAsync(string taskId, DateTime finishedAt, string? result, CancellationToken ct = default) + internal async Task MarkDoneAsync(string taskId, DateTime finishedAt, string? result, CancellationToken ct = default) { await _context.Tasks .Where(t => t.Id == taskId) @@ -107,7 +107,7 @@ public sealed class TaskRepository .SetProperty(t => t.Result, result), ct); } - public async Task MarkFailedAsync(string taskId, DateTime finishedAt, string? result, CancellationToken ct = default) + internal async Task MarkFailedAsync(string taskId, DateTime finishedAt, string? result, CancellationToken ct = default) { await _context.Tasks .Where(t => t.Id == taskId) @@ -124,7 +124,7 @@ public sealed class TaskRepository .ExecuteUpdateAsync(s => s.SetProperty(t => t.LogPath, logPath), ct); } - public async Task FlipAllRunningToFailedAsync(string reason, CancellationToken ct = default) + internal async Task FlipAllRunningToFailedAsync(string reason, CancellationToken ct = default) { var resultText = "[stale] " + reason; var now = DateTime.UtcNow; @@ -364,6 +364,17 @@ public sealed class TaskRepository return await _context.Tasks.AsNoTracking().FirstOrDefaultAsync(t => t.Id == taskId, ct); } + public async Task SetPlanningSessionTokenAsync( + string taskId, + string sessionToken, + CancellationToken ct = default) + { + await _context.Tasks + .Where(t => t.Id == taskId) + .ExecuteUpdateAsync(s => s + .SetProperty(t => t.PlanningSessionToken, sessionToken), ct); + } + public async Task UpdatePlanningSessionIdAsync( string parentId, string sessionId, diff --git a/src/ClaudeDo.Worker/External/ExternalMcpService.cs b/src/ClaudeDo.Worker/External/ExternalMcpService.cs index 3db5ef5..4573cbe 100644 --- a/src/ClaudeDo.Worker/External/ExternalMcpService.cs +++ b/src/ClaudeDo.Worker/External/ExternalMcpService.cs @@ -3,6 +3,7 @@ using ClaudeDo.Data.Models; using ClaudeDo.Data.Repositories; using ClaudeDo.Worker.Hub; using ClaudeDo.Worker.Services; +using ClaudeDo.Worker.State; using ModelContextProtocol.Server; using TaskStatus = ClaudeDo.Data.Models.TaskStatus; @@ -32,19 +33,22 @@ public sealed class ExternalMcpService private readonly QueueService _queue; private readonly HubBroadcaster _broadcaster; private readonly TagRepository _tags; + private readonly ITaskStateService _state; public ExternalMcpService( TaskRepository tasks, ListRepository lists, QueueService queue, HubBroadcaster broadcaster, - TagRepository tags) + TagRepository tags, + ITaskStateService state) { _tasks = tasks; _lists = lists; _queue = queue; _broadcaster = broadcaster; _tags = tags; + _state = state; } [McpServerTool, Description("List all task lists available in ClaudeDo.")] @@ -173,14 +177,13 @@ public sealed class ExternalMcpService { case TaskStatus.Manual: await _tasks.ResetToManualAsync(taskId, cancellationToken); + await _broadcaster.TaskUpdated(taskId); break; case TaskStatus.Queued: - if (task.Status is TaskStatus.Running) - throw new InvalidOperationException("Cannot enqueue a running task."); - task.Status = TaskStatus.Queued; - await _tasks.UpdateAsync(task, cancellationToken); - _queue.WakeQueue(); + var enqueueResult = await _state.EnqueueAsync(taskId, cancellationToken); + if (!enqueueResult.Ok) + throw new InvalidOperationException(enqueueResult.Reason ?? "Cannot enqueue task."); break; default: @@ -189,7 +192,6 @@ public sealed class ExternalMcpService } var reload = (await _tasks.GetByIdAsync(taskId, cancellationToken))!; - await _broadcaster.TaskUpdated(taskId); return ToDto(reload); } diff --git a/src/ClaudeDo.Worker/Planning/PlanningChainCoordinator.cs b/src/ClaudeDo.Worker/Planning/PlanningChainCoordinator.cs index ce608ea..873d93c 100644 --- a/src/ClaudeDo.Worker/Planning/PlanningChainCoordinator.cs +++ b/src/ClaudeDo.Worker/Planning/PlanningChainCoordinator.cs @@ -1,5 +1,6 @@ using ClaudeDo.Data; using ClaudeDo.Data.Models; +using ClaudeDo.Worker.State; using Microsoft.EntityFrameworkCore; using TaskStatus = ClaudeDo.Data.Models.TaskStatus; @@ -8,9 +9,15 @@ namespace ClaudeDo.Worker.Planning; public sealed class PlanningChainCoordinator { private readonly IDbContextFactory _dbFactory; + private readonly Func _state; - public PlanningChainCoordinator(IDbContextFactory dbFactory) - => _dbFactory = dbFactory; + public PlanningChainCoordinator( + IDbContextFactory dbFactory, + Func state) + { + _dbFactory = dbFactory; + _state = state; + } public async Task QueueSubtasksSequentiallyAsync(string parentTaskId, CancellationToken ct = default) { @@ -56,6 +63,7 @@ public sealed class PlanningChainCoordinator if (child?.ParentTaskId is null) return null; var next = await ctx.Tasks + .AsNoTracking() .Where(t => t.ParentTaskId == child.ParentTaskId && t.SortOrder > child.SortOrder && t.Status == TaskStatus.Waiting) @@ -63,8 +71,7 @@ public sealed class PlanningChainCoordinator .FirstOrDefaultAsync(ct); if (next is null) return null; - next.Status = TaskStatus.Queued; - await ctx.SaveChangesAsync(ct); + await _state().UnblockAsync(next.Id, ct); return next.Id; } } diff --git a/src/ClaudeDo.Worker/Planning/PlanningSessionManager.cs b/src/ClaudeDo.Worker/Planning/PlanningSessionManager.cs index 2d82d8d..875dcb4 100644 --- a/src/ClaudeDo.Worker/Planning/PlanningSessionManager.cs +++ b/src/ClaudeDo.Worker/Planning/PlanningSessionManager.cs @@ -6,6 +6,7 @@ using ClaudeDo.Data.Git; using ClaudeDo.Data.Models; using ClaudeDo.Data.Repositories; using ClaudeDo.Worker.Config; +using ClaudeDo.Worker.State; using Microsoft.EntityFrameworkCore; using TaskStatus = ClaudeDo.Data.Models.TaskStatus; @@ -22,17 +23,20 @@ public sealed class PlanningSessionManager private readonly GitService _git; private readonly WorkerConfig _cfg; private readonly string _rootDirectory; + private readonly ITaskStateService? _state; // DI constructor. public PlanningSessionManager( IDbContextFactory factory, GitService git, WorkerConfig cfg, + ITaskStateService state, string rootDirectory) { _factory = factory; _git = git; _cfg = cfg; + _state = state; _rootDirectory = rootDirectory; } @@ -43,13 +47,15 @@ public sealed class PlanningSessionManager AppSettingsRepository settings, GitService git, WorkerConfig cfg, - string rootDirectory) + string rootDirectory, + ITaskStateService? state = null) { _tasksOverride = tasks; _listsOverride = lists; _settingsOverride = settings; _git = git; _cfg = cfg; + _state = state; _rootDirectory = rootDirectory; } @@ -114,8 +120,19 @@ public sealed class PlanningSessionManager // Session dir + token + prompt files. var token = GenerateToken(); - var started = await tasks.SetPlanningStartedAsync(taskId, token, ct) - ?? throw new InvalidOperationException("Failed to transition task to Planning."); + if (_state is not null) + { + var startResult = await _state.StartPlanningAsync(taskId, ct); + if (!startResult.Ok) + throw new InvalidOperationException(startResult.Reason ?? "Failed to transition task to Planning."); + await tasks.SetPlanningSessionTokenAsync(taskId, token, ct); + } + else + { + // Test fallback when no state-service is provided. + if (await tasks.SetPlanningStartedAsync(taskId, token, ct) is null) + throw new InvalidOperationException("Failed to transition task to Planning."); + } var sessionDir = Path.Combine(_rootDirectory, taskId); Directory.CreateDirectory(sessionDir); diff --git a/src/ClaudeDo.Worker/Program.cs b/src/ClaudeDo.Worker/Program.cs index 2e19c36..bbd9044 100644 --- a/src/ClaudeDo.Worker/Program.cs +++ b/src/ClaudeDo.Worker/Program.cs @@ -7,6 +7,7 @@ using ClaudeDo.Worker.Hub; using ClaudeDo.Worker.Planning; using ClaudeDo.Worker.Runner; using ClaudeDo.Worker.Services; +using ClaudeDo.Worker.State; using Microsoft.EntityFrameworkCore; var cfg = WorkerConfig.Load(); @@ -41,6 +42,17 @@ builder.Services.AddSingleton(); builder.Services.AddSingleton(); builder.Services.AddSingleton(); +// Centralized status mutation. Use a delegate for WakeQueue to break the +// TaskStateService → QueueService → TaskRunner → TaskStateService DI cycle; +// Slice 3 will replace this with IQueueWaker. +builder.Services.AddSingleton>(sp => () => sp.GetRequiredService()); +builder.Services.AddSingleton(sp => new TaskStateService( + sp.GetRequiredService>(), + sp.GetRequiredService(), + () => sp.GetRequiredService().WakeQueue(), + sp.GetRequiredService(), + sp.GetRequiredService>())); + // Agent file management. var agentsDir = Path.Combine(ClaudeDo.Data.Paths.AppDataRoot(), "agents"); Directory.CreateDirectory(agentsDir); @@ -65,6 +77,7 @@ builder.Services.AddSingleton(sp => sp.GetRequiredService>(), sp.GetRequiredService(), cfg, + sp.GetRequiredService(), planningSessionsDir)); builder.Services.AddSingleton(sp => new WindowsTerminalPlanningLauncher("wt.exe", cfg.ClaudeBin)); @@ -123,6 +136,7 @@ if (cfg.ExternalMcpPort > 0) externalBuilder.Services.AddSingleton(app.Services.GetRequiredService()); externalBuilder.Services.AddSingleton(app.Services.GetRequiredService()); externalBuilder.Services.AddSingleton(app.Services.GetRequiredService>()); + externalBuilder.Services.AddSingleton(app.Services.GetRequiredService()); externalBuilder.Services.AddScoped(sp => sp.GetRequiredService>().CreateDbContext()); externalBuilder.Services.AddScoped(); diff --git a/src/ClaudeDo.Worker/Runner/TaskRunner.cs b/src/ClaudeDo.Worker/Runner/TaskRunner.cs index aae0ea0..cdada3d 100644 --- a/src/ClaudeDo.Worker/Runner/TaskRunner.cs +++ b/src/ClaudeDo.Worker/Runner/TaskRunner.cs @@ -3,7 +3,7 @@ using ClaudeDo.Data.Models; using ClaudeDo.Data.Repositories; using ClaudeDo.Worker.Config; using ClaudeDo.Worker.Hub; -using ClaudeDo.Worker.Planning; +using ClaudeDo.Worker.State; using Microsoft.EntityFrameworkCore; using TaskStatus = ClaudeDo.Data.Models.TaskStatus; @@ -18,7 +18,7 @@ public sealed class TaskRunner private readonly ClaudeArgsBuilder _argsBuilder; private readonly WorkerConfig _cfg; private readonly ILogger _logger; - private readonly PlanningChainCoordinator _chain; + private readonly ITaskStateService _state; public TaskRunner( IClaudeProcess claude, @@ -28,7 +28,7 @@ public sealed class TaskRunner ClaudeArgsBuilder argsBuilder, WorkerConfig cfg, ILogger logger, - PlanningChainCoordinator chain) + ITaskStateService state) { _claude = claude; _dbFactory = dbFactory; @@ -37,7 +37,7 @@ public sealed class TaskRunner _argsBuilder = argsBuilder; _cfg = cfg; _logger = logger; - _chain = chain; + _state = state; } public async Task RunAsync(TaskEntity task, string slot, CancellationToken ct) @@ -91,11 +91,7 @@ public sealed class TaskRunner var resolvedConfig = await ResolveConfigAsync(task, listConfig, null, ct); var now = DateTime.UtcNow; - using (var context = _dbFactory.CreateDbContext()) - { - var taskRepo = new TaskRepository(context); - await taskRepo.MarkRunningAsync(task.Id, now, ct); - } + await _state.StartRunningAsync(task.Id, now, ct); await _broadcaster.TaskStarted(slot, task.Id, now); // Build prompt. @@ -202,11 +198,7 @@ public sealed class TaskRunner } var now = DateTime.UtcNow; - using (var context = _dbFactory.CreateDbContext()) - { - var taskRepo = new TaskRepository(context); - await taskRepo.MarkRunningAsync(taskId, now, ct); - } + await _state.StartRunningAsync(taskId, now, ct); await _broadcaster.TaskStarted(slot, taskId, now); var nextRunNumber = lastRun.RunNumber + 1; @@ -332,34 +324,11 @@ public sealed class TaskRunner // is never left as 'running' because of a cancel that arrived // after the Claude run already succeeded. var finishedAt = DateTime.UtcNow; - using (var context = _dbFactory.CreateDbContext()) - { - var taskRepo = new TaskRepository(context); - await taskRepo.MarkDoneAsync(task.Id, finishedAt, result.ResultMarkdown, CancellationToken.None); - if (task.ParentTaskId is not null) - await taskRepo.TryCompleteParentAsync(task.ParentTaskId, CancellationToken.None); - } + await _state.CompleteAsync(task.Id, finishedAt, result.ResultMarkdown, CancellationToken.None); await _broadcaster.WorkerLog($"Finished \"{task.Title}\" (done)", WorkerLogLevel.Success, DateTime.UtcNow); await _broadcaster.TaskFinished(slot, task.Id, "done", finishedAt); _logger.LogInformation("Task {TaskId} completed (turns={Turns}, tokens_in={In}, tokens_out={Out})", task.Id, result.TurnCount, result.TokensIn, result.TokensOut); - - // Sequential planning chain: if this task has a parent, flip the next - // Waiting sibling to Queued so the queue pickup loop dispatches it next. - if (task.ParentTaskId is not null) - { - try - { - var advanced = await _chain.OnChildFinishedAsync( - task.Id, TaskStatus.Done, CancellationToken.None); - if (advanced is not null) - await _broadcaster.TaskUpdated(advanced); - } - catch (Exception ex) - { - _logger.LogWarning(ex, "PlanningChain advance failed for {TaskId}", task.Id); - } - } } private async Task HandleFailure(string taskId, string taskTitle, string slot, RunResult result) @@ -367,12 +336,7 @@ public sealed class TaskRunner // Intentionally does not accept a CancellationToken: this is the // terminal write for a failed task and must always be persisted. var finishedAt = DateTime.UtcNow; - using var context = _dbFactory.CreateDbContext(); - var taskRepo = new TaskRepository(context); - await taskRepo.MarkFailedAsync(taskId, finishedAt, result.ErrorMarkdown, CancellationToken.None); - var justFailed = await taskRepo.GetByIdAsync(taskId, CancellationToken.None); - if (justFailed?.ParentTaskId is not null) - await taskRepo.TryCompleteParentAsync(justFailed.ParentTaskId, CancellationToken.None); + await _state.FailAsync(taskId, finishedAt, result.ErrorMarkdown, CancellationToken.None); await _broadcaster.WorkerLog($"Finished \"{taskTitle}\" (failed)", WorkerLogLevel.Error, DateTime.UtcNow); await _broadcaster.TaskFinished(slot, taskId, "failed", finishedAt); _logger.LogWarning("Task {TaskId} failed (turns={Turns}): {Error}", taskId, result.TurnCount, result.ErrorMarkdown); @@ -384,15 +348,9 @@ public sealed class TaskRunner { var now = DateTime.UtcNow; // Terminal write — never cancel. - using var context = _dbFactory.CreateDbContext(); - var taskRepo = new TaskRepository(context); - await taskRepo.MarkFailedAsync(taskId, now, error, CancellationToken.None); - var justFailed = await taskRepo.GetByIdAsync(taskId, CancellationToken.None); - if (justFailed?.ParentTaskId is not null) - await taskRepo.TryCompleteParentAsync(justFailed.ParentTaskId, CancellationToken.None); + await _state.FailAsync(taskId, now, error, CancellationToken.None); await _broadcaster.WorkerLog($"Finished \"{taskTitle}\" (failed)", WorkerLogLevel.Error, DateTime.UtcNow); await _broadcaster.TaskFinished(slot, taskId, "failed", now); - await _broadcaster.TaskUpdated(taskId); } catch (Exception ex) { diff --git a/src/ClaudeDo.Worker/Services/StaleTaskRecovery.cs b/src/ClaudeDo.Worker/Services/StaleTaskRecovery.cs index 7ad3df7..c45ab4f 100644 --- a/src/ClaudeDo.Worker/Services/StaleTaskRecovery.cs +++ b/src/ClaudeDo.Worker/Services/StaleTaskRecovery.cs @@ -1,25 +1,21 @@ -using ClaudeDo.Data; -using ClaudeDo.Data.Repositories; -using Microsoft.EntityFrameworkCore; +using ClaudeDo.Worker.State; namespace ClaudeDo.Worker.Services; public sealed class StaleTaskRecovery : IHostedService { - private readonly IDbContextFactory _dbFactory; + private readonly ITaskStateService _state; private readonly ILogger _logger; - public StaleTaskRecovery(IDbContextFactory dbFactory, ILogger logger) + public StaleTaskRecovery(ITaskStateService state, ILogger logger) { - _dbFactory = dbFactory; + _state = state; _logger = logger; } public async Task StartAsync(CancellationToken cancellationToken) { - using var context = _dbFactory.CreateDbContext(); - var tasks = new TaskRepository(context); - var flipped = await tasks.FlipAllRunningToFailedAsync("worker restart", cancellationToken); + var flipped = await _state.RecoverStaleRunningAsync("worker restart", cancellationToken); if (flipped > 0) _logger.LogWarning("Stale task recovery: flipped {Count} running task(s) to failed", flipped); else diff --git a/src/ClaudeDo.Worker/Services/TaskResetService.cs b/src/ClaudeDo.Worker/Services/TaskResetService.cs index f8b3372..314dff5 100644 --- a/src/ClaudeDo.Worker/Services/TaskResetService.cs +++ b/src/ClaudeDo.Worker/Services/TaskResetService.cs @@ -3,6 +3,7 @@ using ClaudeDo.Data.Models; using ClaudeDo.Data.Repositories; using ClaudeDo.Worker.Hub; using ClaudeDo.Worker.Runner; +using ClaudeDo.Worker.State; using Microsoft.EntityFrameworkCore; using TaskStatus = ClaudeDo.Data.Models.TaskStatus; @@ -13,17 +14,20 @@ public sealed class TaskResetService private readonly IDbContextFactory _dbFactory; private readonly WorktreeManager _wtManager; private readonly HubBroadcaster _broadcaster; + private readonly ITaskStateService _state; private readonly ILogger _logger; public TaskResetService( IDbContextFactory dbFactory, WorktreeManager wtManager, HubBroadcaster broadcaster, + ITaskStateService state, ILogger logger) { _dbFactory = dbFactory; _wtManager = wtManager; _broadcaster = broadcaster; + _state = state; _logger = logger; } @@ -55,16 +59,13 @@ public sealed class TaskResetService worktreeChanged = true; } - using (var ctx = _dbFactory.CreateDbContext()) - { - await new TaskRepository(ctx).ResetToManualAsync(taskId, ct); - } + await _state.ResetToIdleAsync(taskId, ct); await _broadcaster.TaskUpdated(taskId); if (worktreeChanged) await _broadcaster.WorktreeUpdated(taskId); - _logger.LogInformation("Reset task {TaskId} to Manual (worktree discarded: {Discarded})", taskId, worktreeChanged); + _logger.LogInformation("Reset task {TaskId} to Idle (worktree discarded: {Discarded})", taskId, worktreeChanged); await _broadcaster.WorkerLog($"Reset \"{task.Title}\"", WorkerLogLevel.Warn, DateTime.UtcNow); } } diff --git a/src/ClaudeDo.Worker/State/ITaskStateService.cs b/src/ClaudeDo.Worker/State/ITaskStateService.cs new file mode 100644 index 0000000..9d22df9 --- /dev/null +++ b/src/ClaudeDo.Worker/State/ITaskStateService.cs @@ -0,0 +1,19 @@ +namespace ClaudeDo.Worker.State; + +public interface ITaskStateService +{ + Task EnqueueAsync(string taskId, CancellationToken ct); + Task StartRunningAsync(string taskId, DateTime startedAt, CancellationToken ct); + Task CompleteAsync(string taskId, DateTime finishedAt, string? result, CancellationToken ct); + Task FailAsync(string taskId, DateTime finishedAt, string? error, CancellationToken ct); + Task CancelAsync(string taskId, DateTime finishedAt, CancellationToken ct); + Task ResetToIdleAsync(string taskId, CancellationToken ct); + + Task StartPlanningAsync(string parentId, CancellationToken ct); + Task FinalizePlanningAsync(string parentId, CancellationToken ct); + + Task BlockOnAsync(string taskId, string predecessorTaskId, CancellationToken ct); + Task UnblockAsync(string taskId, CancellationToken ct); + + Task RecoverStaleRunningAsync(string reason, CancellationToken ct); +} diff --git a/src/ClaudeDo.Worker/State/TaskStateService.cs b/src/ClaudeDo.Worker/State/TaskStateService.cs new file mode 100644 index 0000000..dc976b0 --- /dev/null +++ b/src/ClaudeDo.Worker/State/TaskStateService.cs @@ -0,0 +1,258 @@ +using ClaudeDo.Data; +using ClaudeDo.Data.Models; +using ClaudeDo.Data.Repositories; +using ClaudeDo.Worker.Hub; +using ClaudeDo.Worker.Planning; +using Microsoft.EntityFrameworkCore; +using TaskStatus = ClaudeDo.Data.Models.TaskStatus; + +namespace ClaudeDo.Worker.State; + +public sealed class TaskStateService : ITaskStateService +{ + private readonly IDbContextFactory _dbFactory; + private readonly HubBroadcaster _broadcaster; + private readonly Action _wakeQueue; + private readonly PlanningChainCoordinator _chain; + private readonly ILogger _logger; + + public TaskStateService( + IDbContextFactory dbFactory, + HubBroadcaster broadcaster, + Action wakeQueue, + PlanningChainCoordinator chain, + ILogger logger) + { + _dbFactory = dbFactory; + _broadcaster = broadcaster; + _wakeQueue = wakeQueue; + _chain = chain; + _logger = logger; + } + + public async Task EnqueueAsync(string taskId, CancellationToken ct) + { + await using var ctx = await _dbFactory.CreateDbContextAsync(ct); + var affected = await ctx.Tasks + .Where(t => t.Id == taskId && t.Status != TaskStatus.Running) + .ExecuteUpdateAsync(s => s.SetProperty(t => t.Status, TaskStatus.Queued), ct); + + if (affected == 0) + return new TransitionResult(false, "Task not found or already running."); + + _wakeQueue(); + await _broadcaster.TaskUpdated(taskId); + return new TransitionResult(true, null); + } + + public async Task StartRunningAsync(string taskId, DateTime startedAt, CancellationToken ct) + { + await using var ctx = await _dbFactory.CreateDbContextAsync(ct); + var affected = await ctx.Tasks + .Where(t => t.Id == taskId && t.Status != TaskStatus.Running) + .ExecuteUpdateAsync(s => s + .SetProperty(t => t.Status, TaskStatus.Running) + .SetProperty(t => t.StartedAt, startedAt), ct); + + if (affected == 0) + return new TransitionResult(false, "Task already running or not found."); + + await _broadcaster.TaskUpdated(taskId); + return new TransitionResult(true, null); + } + + public async Task CompleteAsync(string taskId, DateTime finishedAt, string? result, CancellationToken ct) + { + await using (var ctx = await _dbFactory.CreateDbContextAsync(ct)) + { + var affected = await ctx.Tasks + .Where(t => t.Id == taskId && t.Status == TaskStatus.Running) + .ExecuteUpdateAsync(s => s + .SetProperty(t => t.Status, TaskStatus.Done) + .SetProperty(t => t.FinishedAt, finishedAt) + .SetProperty(t => t.Result, result), ct); + + if (affected == 0) + return new TransitionResult(false, "Task not running; cannot complete."); + } + + await OnChildTerminalAsync(taskId, TaskStatus.Done); + await _broadcaster.TaskUpdated(taskId); + return new TransitionResult(true, null); + } + + public async Task FailAsync(string taskId, DateTime finishedAt, string? error, CancellationToken ct) + { + await using (var ctx = await _dbFactory.CreateDbContextAsync(ct)) + { + var affected = await ctx.Tasks + .Where(t => t.Id == taskId && t.Status != TaskStatus.Done) + .ExecuteUpdateAsync(s => s + .SetProperty(t => t.Status, TaskStatus.Failed) + .SetProperty(t => t.FinishedAt, finishedAt) + .SetProperty(t => t.Result, error), ct); + + if (affected == 0) + return new TransitionResult(false, "Task already done; cannot fail."); + } + + await OnChildTerminalAsync(taskId, TaskStatus.Failed); + await _broadcaster.TaskUpdated(taskId); + return new TransitionResult(true, null); + } + + public async Task CancelAsync(string taskId, DateTime finishedAt, CancellationToken ct) + { + await using (var ctx = await _dbFactory.CreateDbContextAsync(ct)) + { + var affected = await ctx.Tasks + .Where(t => t.Id == taskId && + (t.Status == TaskStatus.Running || t.Status == TaskStatus.Queued)) + .ExecuteUpdateAsync(s => s + .SetProperty(t => t.Status, TaskStatus.Cancelled) + .SetProperty(t => t.FinishedAt, finishedAt), ct); + + if (affected == 0) + return new TransitionResult(false, "Task not in cancellable state."); + } + + await OnChildTerminalAsync(taskId, TaskStatus.Cancelled); + await _broadcaster.TaskUpdated(taskId); + return new TransitionResult(true, null); + } + + public async Task ResetToIdleAsync(string taskId, CancellationToken ct) + { + await using var ctx = await _dbFactory.CreateDbContextAsync(ct); + var affected = await ctx.Tasks + .Where(t => t.Id == taskId && t.Status != TaskStatus.Running) + .ExecuteUpdateAsync(s => s + .SetProperty(t => t.Status, TaskStatus.Idle) + .SetProperty(t => t.StartedAt, (DateTime?)null) + .SetProperty(t => t.FinishedAt, (DateTime?)null) + .SetProperty(t => t.Result, (string?)null), ct); + + if (affected == 0) + return new TransitionResult(false, "Task is running; cannot reset."); + + await _broadcaster.TaskUpdated(taskId); + return new TransitionResult(true, null); + } + + public async Task StartPlanningAsync(string parentId, CancellationToken ct) + { + await using var ctx = await _dbFactory.CreateDbContextAsync(ct); + var affected = await ctx.Tasks + .Where(t => t.Id == parentId && + (t.Status == TaskStatus.Manual || t.Status == TaskStatus.Idle)) + .ExecuteUpdateAsync(s => s + .SetProperty(t => t.Status, TaskStatus.Planning) + .SetProperty(t => t.PlanningPhase, PlanningPhase.Active), ct); + + if (affected == 0) + return new TransitionResult(false, "Task not in plannable state."); + + await _broadcaster.TaskUpdated(parentId); + return new TransitionResult(true, null); + } + + public async Task FinalizePlanningAsync(string parentId, CancellationToken ct) + { + await using var ctx = await _dbFactory.CreateDbContextAsync(ct); + var affected = await ctx.Tasks + .Where(t => t.Id == parentId && t.PlanningPhase == PlanningPhase.Active) + .ExecuteUpdateAsync(s => s + .SetProperty(t => t.PlanningPhase, PlanningPhase.Finalized) + .SetProperty(t => t.PlanningFinalizedAt, DateTime.UtcNow), ct); + + if (affected == 0) + return new TransitionResult(false, "No active planning session."); + + await _broadcaster.TaskUpdated(parentId); + return new TransitionResult(true, null); + } + + public async Task BlockOnAsync(string taskId, string predecessorTaskId, CancellationToken ct) + { + await using var ctx = await _dbFactory.CreateDbContextAsync(ct); + var affected = await ctx.Tasks + .Where(t => t.Id == taskId) + .ExecuteUpdateAsync(s => s.SetProperty(t => t.BlockedByTaskId, predecessorTaskId), ct); + + if (affected == 0) + return new TransitionResult(false, "Task not found."); + + await _broadcaster.TaskUpdated(taskId); + return new TransitionResult(true, null); + } + + public async Task UnblockAsync(string taskId, CancellationToken ct) + { + await using var ctx = await _dbFactory.CreateDbContextAsync(ct); + var affected = await ctx.Tasks + .Where(t => t.Id == taskId) + .ExecuteUpdateAsync(s => s.SetProperty(t => t.BlockedByTaskId, (string?)null), ct); + + if (affected == 0) + return new TransitionResult(false, "Task not found."); + + // Bridge to legacy chain layout: a Waiting predecessor-blocked sibling becomes Queued + // when its predecessor finishes. New layout (post-Slice 4) stores siblings as + // Status=Queued + BlockedByTaskId set, so this is a no-op for them. + await ctx.Tasks + .Where(t => t.Id == taskId && t.Status == TaskStatus.Waiting) + .ExecuteUpdateAsync(s => s.SetProperty(t => t.Status, TaskStatus.Queued), ct); + + _wakeQueue(); + await _broadcaster.TaskUpdated(taskId); + return new TransitionResult(true, null); + } + + public async Task RecoverStaleRunningAsync(string reason, CancellationToken ct) + { + var resultText = "[stale] " + reason; + var now = DateTime.UtcNow; + await using var ctx = await _dbFactory.CreateDbContextAsync(ct); + return await ctx.Tasks + .Where(t => t.Status == TaskStatus.Running) + .ExecuteUpdateAsync(s => s + .SetProperty(t => t.Status, TaskStatus.Failed) + .SetProperty(t => t.FinishedAt, now) + .SetProperty(t => t.Result, resultText), ct); + } + + private async Task OnChildTerminalAsync(string taskId, TaskStatus finalStatus) + { + // Terminal child writes are best-effort and use CancellationToken.None so the + // task lifecycle is never left partially completed because a caller cancelled. + string? parentId; + await using (var ctx = await _dbFactory.CreateDbContextAsync(CancellationToken.None)) + { + parentId = await ctx.Tasks + .AsNoTracking() + .Where(t => t.Id == taskId) + .Select(t => t.ParentTaskId) + .FirstOrDefaultAsync(CancellationToken.None); + } + if (parentId is null) return; + + try + { + await _chain.OnChildFinishedAsync(taskId, finalStatus, CancellationToken.None); + } + catch (Exception ex) + { + _logger.LogWarning(ex, "PlanningChain advance failed for {TaskId}", taskId); + } + + try + { + await using var ctx = await _dbFactory.CreateDbContextAsync(CancellationToken.None); + await new TaskRepository(ctx).TryCompleteParentAsync(parentId, CancellationToken.None); + } + catch (Exception ex) + { + _logger.LogWarning(ex, "TryCompleteParent failed for {ParentId}", parentId); + } + } +} diff --git a/src/ClaudeDo.Worker/State/TransitionResult.cs b/src/ClaudeDo.Worker/State/TransitionResult.cs new file mode 100644 index 0000000..fc7d66b --- /dev/null +++ b/src/ClaudeDo.Worker/State/TransitionResult.cs @@ -0,0 +1,3 @@ +namespace ClaudeDo.Worker.State; + +public sealed record TransitionResult(bool Ok, string? Reason); diff --git a/tests/ClaudeDo.Worker.Tests/External/ExternalMcpServiceTests.cs b/tests/ClaudeDo.Worker.Tests/External/ExternalMcpServiceTests.cs index 4ab79ba..1f1ce0c 100644 --- a/tests/ClaudeDo.Worker.Tests/External/ExternalMcpServiceTests.cs +++ b/tests/ClaudeDo.Worker.Tests/External/ExternalMcpServiceTests.cs @@ -94,7 +94,8 @@ public sealed class ExternalMcpServiceTests : IDisposable // we never call its WakeQueue/RunNow/CancelTask paths, so a real QueueService // built with the same approach used in QueueServiceTests is sufficient. private ExternalMcpService BuildSut(QueueService queue) => - new(_tasks, _lists, queue, _broadcaster, _tags); + new(_tasks, _lists, queue, _broadcaster, _tags, + TaskStateServiceBuilder.Build(_db.CreateFactory()).State); private QueueService CreateQueue() { @@ -113,7 +114,7 @@ public sealed class ExternalMcpServiceTests : IDisposable var wtManager = new WorktreeManager(new GitService(), dbFactory, cfg, NullLogger.Instance); var argsBuilder = new ClaudeArgsBuilder(); var runner = new TaskRunner(fake, dbFactory, broadcaster, wtManager, argsBuilder, cfg, - NullLogger.Instance); + NullLogger.Instance, TaskStateServiceBuilder.Build(dbFactory).State); return new QueueService(dbFactory, runner, cfg, NullLogger.Instance); } diff --git a/tests/ClaudeDo.Worker.Tests/Hub/PlanningHubTests.cs b/tests/ClaudeDo.Worker.Tests/Hub/PlanningHubTests.cs index b0abc28..6e78a03 100644 --- a/tests/ClaudeDo.Worker.Tests/Hub/PlanningHubTests.cs +++ b/tests/ClaudeDo.Worker.Tests/Hub/PlanningHubTests.cs @@ -50,7 +50,7 @@ public sealed class PlanningHubTests : IDisposable { var hub = new WorkerHub( null!, null!, null!, null!, null!, null!, null!, null!, - _planning, _launcher, null!, null!); + _planning, _launcher, null!, null!, null!); hub.Clients = new FakeHubCallerClients(_proxy); hub.Context = new FakeHubCallerContext(); return hub; diff --git a/tests/ClaudeDo.Worker.Tests/Infrastructure/FakeHubContext.cs b/tests/ClaudeDo.Worker.Tests/Infrastructure/FakeHubContext.cs new file mode 100644 index 0000000..ee89bfa --- /dev/null +++ b/tests/ClaudeDo.Worker.Tests/Infrastructure/FakeHubContext.cs @@ -0,0 +1,39 @@ +using ClaudeDo.Worker.Hub; +using Microsoft.AspNetCore.SignalR; + +namespace ClaudeDo.Worker.Tests.Infrastructure; + +public sealed record CapturedHubCall(string Method, object?[] Args); + +public sealed class CapturingClientProxy : IClientProxy +{ + public readonly List Calls = new(); + + public Task SendCoreAsync(string method, object?[] args, CancellationToken cancellationToken = default) + { + Calls.Add(new CapturedHubCall(method, args)); + return Task.CompletedTask; + } +} + +public sealed class CapturingHubClients : IHubClients +{ + public CapturingClientProxy AllProxy { get; } = new(); + public IClientProxy All => AllProxy; + public IClientProxy AllExcept(IReadOnlyList excludedConnectionIds) => AllProxy; + public IClientProxy Client(string connectionId) => AllProxy; + public IClientProxy Clients(IReadOnlyList connectionIds) => AllProxy; + public IClientProxy Group(string groupName) => AllProxy; + public IClientProxy GroupExcept(string groupName, IReadOnlyList excludedConnectionIds) => AllProxy; + public IClientProxy Groups(IReadOnlyList groupNames) => AllProxy; + public IClientProxy User(string userId) => AllProxy; + public IClientProxy Users(IReadOnlyList userIds) => AllProxy; +} + +public sealed class CapturingHubContext : IHubContext +{ + private readonly CapturingHubClients _clients = new(); + public CapturingClientProxy Proxy => _clients.AllProxy; + public IHubClients Clients => _clients; + public IGroupManager Groups => throw new NotImplementedException(); +} diff --git a/tests/ClaudeDo.Worker.Tests/Infrastructure/TaskStateServiceBuilder.cs b/tests/ClaudeDo.Worker.Tests/Infrastructure/TaskStateServiceBuilder.cs new file mode 100644 index 0000000..3551677 --- /dev/null +++ b/tests/ClaudeDo.Worker.Tests/Infrastructure/TaskStateServiceBuilder.cs @@ -0,0 +1,37 @@ +using ClaudeDo.Data; +using ClaudeDo.Worker.Hub; +using ClaudeDo.Worker.Planning; +using ClaudeDo.Worker.State; +using Microsoft.EntityFrameworkCore; +using Microsoft.Extensions.Logging.Abstractions; + +namespace ClaudeDo.Worker.Tests.Infrastructure; + +/// Test-only helper that wires TaskStateService and PlanningChainCoordinator +/// against a shared DB factory, breaking the Func cycle between them. +public static class TaskStateServiceBuilder +{ + public sealed record Built( + TaskStateService State, + PlanningChainCoordinator Chain, + CapturingHubContext Hub, + Func WakeCount); + + public static Built Build(IDbContextFactory dbFactory) + { + var hub = new CapturingHubContext(); + var broadcaster = new HubBroadcaster(hub); + var wakeCount = new int[1]; + + TaskStateService? state = null; + var chain = new PlanningChainCoordinator(dbFactory, () => state!); + state = new TaskStateService( + dbFactory, + broadcaster, + () => Interlocked.Increment(ref wakeCount[0]), + chain, + NullLogger.Instance); + + return new Built(state, chain, hub, () => Volatile.Read(ref wakeCount[0])); + } +} diff --git a/tests/ClaudeDo.Worker.Tests/Planning/PlanningChainCoordinatorTests.cs b/tests/ClaudeDo.Worker.Tests/Planning/PlanningChainCoordinatorTests.cs index 5abea99..dc7af96 100644 --- a/tests/ClaudeDo.Worker.Tests/Planning/PlanningChainCoordinatorTests.cs +++ b/tests/ClaudeDo.Worker.Tests/Planning/PlanningChainCoordinatorTests.cs @@ -17,7 +17,7 @@ public sealed class PlanningChainCoordinatorTests : IDisposable public PlanningChainCoordinatorTests() { _factory = _db.CreateFactory(); - _sut = new PlanningChainCoordinator(_factory); + _sut = TaskStateServiceBuilder.Build(_factory).Chain; _listId = Guid.NewGuid().ToString(); using var ctx = _factory.CreateDbContext(); ctx.Lists.Add(new ListEntity diff --git a/tests/ClaudeDo.Worker.Tests/Services/QueueServiceSlotGuardTests.cs b/tests/ClaudeDo.Worker.Tests/Services/QueueServiceSlotGuardTests.cs index 5157372..73de686 100644 --- a/tests/ClaudeDo.Worker.Tests/Services/QueueServiceSlotGuardTests.cs +++ b/tests/ClaudeDo.Worker.Tests/Services/QueueServiceSlotGuardTests.cs @@ -54,7 +54,7 @@ public sealed class QueueServiceSlotGuardTests : IDisposable var wtManager = new WorktreeManager(new ClaudeDo.Data.Git.GitService(), dbFactory, _cfg, NullLogger.Instance); var argsBuilder = new ClaudeArgsBuilder(); var runner = new TaskRunner(fake, dbFactory, broadcaster, wtManager, argsBuilder, _cfg, - NullLogger.Instance); + NullLogger.Instance, TaskStateServiceBuilder.Build(dbFactory).State); var service = new QueueService(dbFactory, runner, _cfg, NullLogger.Instance); return (service, fake); } diff --git a/tests/ClaudeDo.Worker.Tests/Services/QueueServiceTests.cs b/tests/ClaudeDo.Worker.Tests/Services/QueueServiceTests.cs index 3b5a88a..50410f2 100644 --- a/tests/ClaudeDo.Worker.Tests/Services/QueueServiceTests.cs +++ b/tests/ClaudeDo.Worker.Tests/Services/QueueServiceTests.cs @@ -55,7 +55,7 @@ public sealed class QueueServiceTests : IDisposable var wtManager = new WorktreeManager(new GitService(), dbFactory, _cfg, NullLogger.Instance); var argsBuilder = new ClaudeArgsBuilder(); var runner = new TaskRunner(fake, dbFactory, broadcaster, wtManager, argsBuilder, _cfg, - NullLogger.Instance); + NullLogger.Instance, TaskStateServiceBuilder.Build(dbFactory).State); var service = new QueueService(dbFactory, runner, _cfg, NullLogger.Instance); return (service, fake); } diff --git a/tests/ClaudeDo.Worker.Tests/Services/StaleTaskRecoveryTests.cs b/tests/ClaudeDo.Worker.Tests/Services/StaleTaskRecoveryTests.cs index f6452d6..993a513 100644 --- a/tests/ClaudeDo.Worker.Tests/Services/StaleTaskRecoveryTests.cs +++ b/tests/ClaudeDo.Worker.Tests/Services/StaleTaskRecoveryTests.cs @@ -54,7 +54,8 @@ public sealed class StaleTaskRecoveryTests : IDisposable await _tasks.AddAsync(running); await _tasks.AddAsync(queued); - var recovery = new StaleTaskRecovery(_db.CreateFactory(), NullLogger.Instance); + var built = TaskStateServiceBuilder.Build(_db.CreateFactory()); + var recovery = new StaleTaskRecovery(built.State, NullLogger.Instance); await recovery.StartAsync(CancellationToken.None); var r = await _tasks.GetByIdAsync(running.Id); diff --git a/tests/ClaudeDo.Worker.Tests/Services/TaskResetServiceTests.cs b/tests/ClaudeDo.Worker.Tests/Services/TaskResetServiceTests.cs index 3c147d0..5ef3054 100644 --- a/tests/ClaudeDo.Worker.Tests/Services/TaskResetServiceTests.cs +++ b/tests/ClaudeDo.Worker.Tests/Services/TaskResetServiceTests.cs @@ -34,10 +34,12 @@ public class TaskResetServiceTests : IDisposable { var fakeHub = new RecordingHubContext(); var broadcaster = new HubBroadcaster(fakeHub); + var built = TaskStateServiceBuilder.Build(db.CreateFactory()); var svc = new TaskResetService( db.CreateFactory(), wtMgr, broadcaster, + built.State, NullLogger.Instance); return (svc, fakeHub.Proxy); } @@ -111,7 +113,7 @@ public class TaskResetServiceTests : IDisposable { var updated = await new TaskRepository(ctx).GetByIdAsync(task.Id); Assert.NotNull(updated); - Assert.Equal(TaskStatus.Manual, updated!.Status); + Assert.Equal(TaskStatus.Idle, updated!.Status); Assert.Null(updated.Result); Assert.Null(updated.StartedAt); Assert.Null(updated.FinishedAt); diff --git a/tests/ClaudeDo.Worker.Tests/State/TaskStateServiceTests.cs b/tests/ClaudeDo.Worker.Tests/State/TaskStateServiceTests.cs new file mode 100644 index 0000000..9028cad --- /dev/null +++ b/tests/ClaudeDo.Worker.Tests/State/TaskStateServiceTests.cs @@ -0,0 +1,383 @@ +using ClaudeDo.Data; +using ClaudeDo.Data.Models; +using ClaudeDo.Data.Repositories; +using ClaudeDo.Worker.State; +using ClaudeDo.Worker.Tests.Infrastructure; +using Microsoft.EntityFrameworkCore; +using TaskStatus = ClaudeDo.Data.Models.TaskStatus; + +namespace ClaudeDo.Worker.Tests.State; + +public sealed class TaskStateServiceTests : IDisposable +{ + private readonly DbFixture _db = new(); + private readonly TestDbContextFactory _factory; + private readonly TaskStateServiceBuilder.Built _built; + private readonly ITaskStateService _sut; + private readonly string _listId; + + public TaskStateServiceTests() + { + _factory = _db.CreateFactory(); + _built = TaskStateServiceBuilder.Build(_factory); + _sut = _built.State; + + _listId = Guid.NewGuid().ToString(); + using var ctx = _factory.CreateDbContext(); + ctx.Lists.Add(new ListEntity + { + Id = _listId, + Name = "Test", + CreatedAt = DateTime.UtcNow, + DefaultCommitType = "chore", + }); + ctx.SaveChanges(); + } + + public void Dispose() => _db.Dispose(); + + private async Task SeedTaskAsync(TaskStatus status, string? parentId = null, int sortOrder = 0) + { + var id = Guid.NewGuid().ToString(); + await using var ctx = _factory.CreateDbContext(); + ctx.Tasks.Add(new TaskEntity + { + Id = id, + ListId = _listId, + Title = "task", + Status = status, + CreatedAt = DateTime.UtcNow, + ParentTaskId = parentId, + SortOrder = sortOrder, + }); + await ctx.SaveChangesAsync(); + return id; + } + + private async Task GetStatusAsync(string id) + { + await using var ctx = _factory.CreateDbContext(); + return await ctx.Tasks.Where(t => t.Id == id).Select(t => t.Status).FirstAsync(); + } + + private async Task GetTaskAsync(string id) + { + await using var ctx = _factory.CreateDbContext(); + return await new TaskRepository(ctx).GetByIdAsync(id) ?? throw new InvalidOperationException($"task {id} not found"); + } + + // ─── EnqueueAsync ───────────────────────────────────────────────────── + + [Fact] + public async Task EnqueueAsync_FromIdle_TransitionsToQueued_AndWakesQueue() + { + var id = await SeedTaskAsync(TaskStatus.Idle); + var wakesBefore = _built.WakeCount(); + + var result = await _sut.EnqueueAsync(id, default); + + Assert.True(result.Ok); + Assert.Equal(TaskStatus.Queued, await GetStatusAsync(id)); + Assert.True(_built.WakeCount() > wakesBefore); + Assert.Contains(_built.Hub.Proxy.Calls, c => c.Method == "TaskUpdated"); + } + + [Fact] + public async Task EnqueueAsync_FromRunning_Rejects_AndDoesNotMutate() + { + var id = await SeedTaskAsync(TaskStatus.Running); + + var result = await _sut.EnqueueAsync(id, default); + + Assert.False(result.Ok); + Assert.Equal(TaskStatus.Running, await GetStatusAsync(id)); + } + + // ─── StartRunningAsync ──────────────────────────────────────────────── + + [Fact] + public async Task StartRunningAsync_FromQueued_TransitionsToRunning_AndStampsStartedAt() + { + var id = await SeedTaskAsync(TaskStatus.Queued); + var startedAt = new DateTime(2026, 4, 27, 10, 0, 0, DateTimeKind.Utc); + + var result = await _sut.StartRunningAsync(id, startedAt, default); + + Assert.True(result.Ok); + var t = await GetTaskAsync(id); + Assert.Equal(TaskStatus.Running, t.Status); + Assert.Equal(startedAt, t.StartedAt); + } + + [Fact] + public async Task StartRunningAsync_FromRunning_Rejects() + { + var id = await SeedTaskAsync(TaskStatus.Running); + + var result = await _sut.StartRunningAsync(id, DateTime.UtcNow, default); + + Assert.False(result.Ok); + } + + [Fact] + public async Task StartRunningAsync_TwoParallelClaims_ExactlyOneWins() + { + var id = await SeedTaskAsync(TaskStatus.Queued); + var startedAt = DateTime.UtcNow; + + // Two concurrent calls: only one ExecuteUpdate should affect a row. + var t1 = Task.Run(() => _sut.StartRunningAsync(id, startedAt, default)); + var t2 = Task.Run(() => _sut.StartRunningAsync(id, startedAt, default)); + var results = await Task.WhenAll(t1, t2); + + var winners = results.Count(r => r.Ok); + Assert.Equal(1, winners); + Assert.Equal(TaskStatus.Running, await GetStatusAsync(id)); + } + + // ─── CompleteAsync ──────────────────────────────────────────────────── + + [Fact] + public async Task CompleteAsync_FromRunning_TransitionsToDone() + { + var id = await SeedTaskAsync(TaskStatus.Running); + + var result = await _sut.CompleteAsync(id, DateTime.UtcNow, "ok", default); + + Assert.True(result.Ok); + var t = await GetTaskAsync(id); + Assert.Equal(TaskStatus.Done, t.Status); + Assert.Equal("ok", t.Result); + Assert.NotNull(t.FinishedAt); + } + + [Fact] + public async Task CompleteAsync_FromQueued_Rejects() + { + var id = await SeedTaskAsync(TaskStatus.Queued); + + var result = await _sut.CompleteAsync(id, DateTime.UtcNow, "ok", default); + + Assert.False(result.Ok); + Assert.Equal(TaskStatus.Queued, await GetStatusAsync(id)); + } + + // ─── FailAsync ──────────────────────────────────────────────────────── + + [Fact] + public async Task FailAsync_FromRunning_TransitionsToFailed() + { + var id = await SeedTaskAsync(TaskStatus.Running); + + var result = await _sut.FailAsync(id, DateTime.UtcNow, "boom", default); + + Assert.True(result.Ok); + var t = await GetTaskAsync(id); + Assert.Equal(TaskStatus.Failed, t.Status); + Assert.Equal("boom", t.Result); + } + + [Fact] + public async Task FailAsync_FromDone_Rejects() + { + var id = await SeedTaskAsync(TaskStatus.Done); + + var result = await _sut.FailAsync(id, DateTime.UtcNow, "boom", default); + + Assert.False(result.Ok); + Assert.Equal(TaskStatus.Done, await GetStatusAsync(id)); + } + + // ─── CancelAsync ────────────────────────────────────────────────────── + + [Fact] + public async Task CancelAsync_FromRunning_TransitionsToCancelled() + { + var id = await SeedTaskAsync(TaskStatus.Running); + + var result = await _sut.CancelAsync(id, DateTime.UtcNow, default); + + Assert.True(result.Ok); + Assert.Equal(TaskStatus.Cancelled, await GetStatusAsync(id)); + } + + [Fact] + public async Task CancelAsync_FromDone_Rejects() + { + var id = await SeedTaskAsync(TaskStatus.Done); + + var result = await _sut.CancelAsync(id, DateTime.UtcNow, default); + + Assert.False(result.Ok); + Assert.Equal(TaskStatus.Done, await GetStatusAsync(id)); + } + + // ─── ResetToIdleAsync ───────────────────────────────────────────────── + + [Fact] + public async Task ResetToIdleAsync_FromFailed_ClearsTimestamps() + { + var id = await SeedTaskAsync(TaskStatus.Failed); + await using (var ctx = _factory.CreateDbContext()) + { + await ctx.Tasks.Where(t => t.Id == id) + .ExecuteUpdateAsync(s => s + .SetProperty(t => t.StartedAt, DateTime.UtcNow.AddMinutes(-5)) + .SetProperty(t => t.FinishedAt, DateTime.UtcNow.AddMinutes(-1)) + .SetProperty(t => t.Result, "old")); + } + + var result = await _sut.ResetToIdleAsync(id, default); + + Assert.True(result.Ok); + var t = await GetTaskAsync(id); + Assert.Equal(TaskStatus.Idle, t.Status); + Assert.Null(t.StartedAt); + Assert.Null(t.FinishedAt); + Assert.Null(t.Result); + } + + [Fact] + public async Task ResetToIdleAsync_FromRunning_Rejects() + { + var id = await SeedTaskAsync(TaskStatus.Running); + + var result = await _sut.ResetToIdleAsync(id, default); + + Assert.False(result.Ok); + Assert.Equal(TaskStatus.Running, await GetStatusAsync(id)); + } + + // ─── StartPlanningAsync ─────────────────────────────────────────────── + + [Fact] + public async Task StartPlanningAsync_FromManual_FlipsStatus_AndPlanningPhase() + { + var id = await SeedTaskAsync(TaskStatus.Manual); + + var result = await _sut.StartPlanningAsync(id, default); + + Assert.True(result.Ok); + var t = await GetTaskAsync(id); + Assert.Equal(TaskStatus.Planning, t.Status); + Assert.Equal(PlanningPhase.Active, t.PlanningPhase); + } + + [Fact] + public async Task StartPlanningAsync_FromRunning_Rejects() + { + var id = await SeedTaskAsync(TaskStatus.Running); + + var result = await _sut.StartPlanningAsync(id, default); + + Assert.False(result.Ok); + } + + // ─── FinalizePlanningAsync ──────────────────────────────────────────── + + [Fact] + public async Task FinalizePlanningAsync_OnActivePhase_TransitionsToFinalized() + { + var id = await SeedTaskAsync(TaskStatus.Manual); + await _sut.StartPlanningAsync(id, default); + + var result = await _sut.FinalizePlanningAsync(id, default); + + Assert.True(result.Ok); + var t = await GetTaskAsync(id); + Assert.Equal(PlanningPhase.Finalized, t.PlanningPhase); + Assert.NotNull(t.PlanningFinalizedAt); + } + + [Fact] + public async Task FinalizePlanningAsync_OnNonePhase_Rejects() + { + var id = await SeedTaskAsync(TaskStatus.Manual); + + var result = await _sut.FinalizePlanningAsync(id, default); + + Assert.False(result.Ok); + } + + // ─── BlockOnAsync / UnblockAsync ───────────────────────────────────── + + [Fact] + public async Task BlockOnAsync_SetsBlockedByTaskId() + { + var pred = await SeedTaskAsync(TaskStatus.Queued); + var task = await SeedTaskAsync(TaskStatus.Queued); + + var result = await _sut.BlockOnAsync(task, pred, default); + + Assert.True(result.Ok); + var t = await GetTaskAsync(task); + Assert.Equal(pred, t.BlockedByTaskId); + } + + [Fact] + public async Task UnblockAsync_ClearsBlockedByTaskId_AndWakesQueue() + { + var pred = await SeedTaskAsync(TaskStatus.Queued); + var task = await SeedTaskAsync(TaskStatus.Queued); + await _sut.BlockOnAsync(task, pred, default); + var wakesBefore = _built.WakeCount(); + + var result = await _sut.UnblockAsync(task, default); + + Assert.True(result.Ok); + var t = await GetTaskAsync(task); + Assert.Null(t.BlockedByTaskId); + Assert.True(_built.WakeCount() > wakesBefore); + } + + [Fact] + public async Task UnblockAsync_OnWaitingTask_FlipsToQueued() + { + // Bridge to legacy chain layout: a Status=Waiting sibling becomes Queued on unblock. + var task = await SeedTaskAsync(TaskStatus.Waiting); + + var result = await _sut.UnblockAsync(task, default); + + Assert.True(result.Ok); + Assert.Equal(TaskStatus.Queued, await GetStatusAsync(task)); + } + + // ─── RecoverStaleRunningAsync ───────────────────────────────────────── + + [Fact] + public async Task RecoverStaleRunningAsync_FlipsAllRunningToFailed_ReturnsCount() + { + var r1 = await SeedTaskAsync(TaskStatus.Running); + var r2 = await SeedTaskAsync(TaskStatus.Running); + var q = await SeedTaskAsync(TaskStatus.Queued); + + var count = await _sut.RecoverStaleRunningAsync("worker restart", default); + + Assert.Equal(2, count); + Assert.Equal(TaskStatus.Failed, await GetStatusAsync(r1)); + Assert.Equal(TaskStatus.Failed, await GetStatusAsync(r2)); + Assert.Equal(TaskStatus.Queued, await GetStatusAsync(q)); + var t = await GetTaskAsync(r1); + Assert.StartsWith("[stale] ", t.Result); + } + + // ─── Child terminal → chain advance ─────────────────────────────────── + + [Fact] + public async Task CompleteAsync_OnChild_AdvancesNextWaitingSibling() + { + var parent = await SeedTaskAsync(TaskStatus.Planned); + var c0 = await SeedTaskAsync(TaskStatus.Running, parentId: parent, sortOrder: 0); + var c1 = await SeedTaskAsync(TaskStatus.Waiting, parentId: parent, sortOrder: 1); + var c2 = await SeedTaskAsync(TaskStatus.Waiting, parentId: parent, sortOrder: 2); + + var result = await _sut.CompleteAsync(c0, DateTime.UtcNow, "ok", default); + + Assert.True(result.Ok); + Assert.Equal(TaskStatus.Done, await GetStatusAsync(c0)); + // Next sibling was Waiting → chain coordinator unblocks → Queued. + Assert.Equal(TaskStatus.Queued, await GetStatusAsync(c1)); + // Subsequent sibling untouched. + Assert.Equal(TaskStatus.Waiting, await GetStatusAsync(c2)); + } +}