refactor(worker/state): introduce TaskStateService and route mutations through it

Slice 2 of the worker state consolidation refactor (spec sections 2 and 8).

Adds Worker/State/ITaskStateService + TaskStateService as the single component
that mutates Status, PlanningPhase, and BlockedByTaskId. Each transition is one
atomic ExecuteUpdate with a WHERE filter on the expected source status, so
parallel claims are TOCTOU-free. Side effects (queue wake on -> Queued, hub
TaskUpdated broadcast, chain advance + parent completion on terminal child)
are owned by the service so callers no longer need to remember them.

Migrated callers (mechanical, behavior preserved):
- TaskRunner: HandleSuccess/HandleFailure/MarkFailed/RunAsync/ContinueAsync
- StaleTaskRecovery: bulk recover stale Running tasks
- TaskResetService: status flip (worktree cleanup stays in service)
- PlanningSessionManager.StartAsync: status flip via state, token write via repo
- PlanningChainCoordinator.OnChildFinishedAsync: routes the next-sibling write
  through state.UnblockAsync (Slice 4 finishes the rewrite)
- ExternalMcpService.UpdateTaskStatus: Queued case via state.EnqueueAsync

Repo Mark*Async helpers (MarkRunning/MarkDone/MarkFailed/FlipAllRunningToFailed)
are now internal; ClaudeDo.Data grants InternalsVisibleTo to ClaudeDo.Worker
and ClaudeDo.Worker.Tests for the existing repo-level tests.

DI: TaskStateService is registered as Singleton in both the main app and the
external-MCP app; the queue-wake delegate captures sp -> QueueService.WakeQueue
to break the TaskStateService -> QueueService -> TaskRunner -> TaskStateService
construction cycle. PlanningChainCoordinator takes Func<ITaskStateService> for
the same reason; Slice 3 will replace both with IQueueWaker.

Tests: TaskStateServiceTests covers happy + reject for every transition, the
parallel StartRunningAsync claim race, child-terminal chain advancement, and
stale recovery. Existing service/repo tests are updated to construct the new
state-service via a TaskStateServiceBuilder helper. Pre-existing constructor
drift in QueueService/ExternalMcp/PlanningHub tests is patched to keep the
test project building (the surrounding test logic is otherwise untouched).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Mika Kuns
2026-04-27 11:31:57 +02:00
parent cf7a6e413c
commit 8823265e5a
22 changed files with 845 additions and 91 deletions

View File

@@ -14,4 +14,9 @@
</PackageReference>
</ItemGroup>
<ItemGroup>
<InternalsVisibleTo Include="ClaudeDo.Worker" />
<InternalsVisibleTo Include="ClaudeDo.Worker.Tests" />
</ItemGroup>
</Project>

View File

@@ -88,7 +88,7 @@ public sealed class TaskRepository
#region Status transitions
public async Task MarkRunningAsync(string taskId, DateTime startedAt, CancellationToken ct = default)
internal async Task MarkRunningAsync(string taskId, DateTime startedAt, CancellationToken ct = default)
{
await _context.Tasks
.Where(t => t.Id == taskId)
@@ -97,7 +97,7 @@ public sealed class TaskRepository
.SetProperty(t => t.StartedAt, startedAt), ct);
}
public async Task MarkDoneAsync(string taskId, DateTime finishedAt, string? result, CancellationToken ct = default)
internal async Task MarkDoneAsync(string taskId, DateTime finishedAt, string? result, CancellationToken ct = default)
{
await _context.Tasks
.Where(t => t.Id == taskId)
@@ -107,7 +107,7 @@ public sealed class TaskRepository
.SetProperty(t => t.Result, result), ct);
}
public async Task MarkFailedAsync(string taskId, DateTime finishedAt, string? result, CancellationToken ct = default)
internal async Task MarkFailedAsync(string taskId, DateTime finishedAt, string? result, CancellationToken ct = default)
{
await _context.Tasks
.Where(t => t.Id == taskId)
@@ -124,7 +124,7 @@ public sealed class TaskRepository
.ExecuteUpdateAsync(s => s.SetProperty(t => t.LogPath, logPath), ct);
}
public async Task<int> FlipAllRunningToFailedAsync(string reason, CancellationToken ct = default)
internal async Task<int> FlipAllRunningToFailedAsync(string reason, CancellationToken ct = default)
{
var resultText = "[stale] " + reason;
var now = DateTime.UtcNow;
@@ -364,6 +364,17 @@ public sealed class TaskRepository
return await _context.Tasks.AsNoTracking().FirstOrDefaultAsync(t => t.Id == taskId, ct);
}
public async Task SetPlanningSessionTokenAsync(
string taskId,
string sessionToken,
CancellationToken ct = default)
{
await _context.Tasks
.Where(t => t.Id == taskId)
.ExecuteUpdateAsync(s => s
.SetProperty(t => t.PlanningSessionToken, sessionToken), ct);
}
public async Task UpdatePlanningSessionIdAsync(
string parentId,
string sessionId,

View File

@@ -3,6 +3,7 @@ using ClaudeDo.Data.Models;
using ClaudeDo.Data.Repositories;
using ClaudeDo.Worker.Hub;
using ClaudeDo.Worker.Services;
using ClaudeDo.Worker.State;
using ModelContextProtocol.Server;
using TaskStatus = ClaudeDo.Data.Models.TaskStatus;
@@ -32,19 +33,22 @@ public sealed class ExternalMcpService
private readonly QueueService _queue;
private readonly HubBroadcaster _broadcaster;
private readonly TagRepository _tags;
private readonly ITaskStateService _state;
public ExternalMcpService(
TaskRepository tasks,
ListRepository lists,
QueueService queue,
HubBroadcaster broadcaster,
TagRepository tags)
TagRepository tags,
ITaskStateService state)
{
_tasks = tasks;
_lists = lists;
_queue = queue;
_broadcaster = broadcaster;
_tags = tags;
_state = state;
}
[McpServerTool, Description("List all task lists available in ClaudeDo.")]
@@ -173,14 +177,13 @@ public sealed class ExternalMcpService
{
case TaskStatus.Manual:
await _tasks.ResetToManualAsync(taskId, cancellationToken);
await _broadcaster.TaskUpdated(taskId);
break;
case TaskStatus.Queued:
if (task.Status is TaskStatus.Running)
throw new InvalidOperationException("Cannot enqueue a running task.");
task.Status = TaskStatus.Queued;
await _tasks.UpdateAsync(task, cancellationToken);
_queue.WakeQueue();
var enqueueResult = await _state.EnqueueAsync(taskId, cancellationToken);
if (!enqueueResult.Ok)
throw new InvalidOperationException(enqueueResult.Reason ?? "Cannot enqueue task.");
break;
default:
@@ -189,7 +192,6 @@ public sealed class ExternalMcpService
}
var reload = (await _tasks.GetByIdAsync(taskId, cancellationToken))!;
await _broadcaster.TaskUpdated(taskId);
return ToDto(reload);
}

View File

@@ -1,5 +1,6 @@
using ClaudeDo.Data;
using ClaudeDo.Data.Models;
using ClaudeDo.Worker.State;
using Microsoft.EntityFrameworkCore;
using TaskStatus = ClaudeDo.Data.Models.TaskStatus;
@@ -8,9 +9,15 @@ namespace ClaudeDo.Worker.Planning;
public sealed class PlanningChainCoordinator
{
private readonly IDbContextFactory<ClaudeDoDbContext> _dbFactory;
private readonly Func<ITaskStateService> _state;
public PlanningChainCoordinator(IDbContextFactory<ClaudeDoDbContext> dbFactory)
=> _dbFactory = dbFactory;
public PlanningChainCoordinator(
IDbContextFactory<ClaudeDoDbContext> dbFactory,
Func<ITaskStateService> state)
{
_dbFactory = dbFactory;
_state = state;
}
public async Task QueueSubtasksSequentiallyAsync(string parentTaskId, CancellationToken ct = default)
{
@@ -56,6 +63,7 @@ public sealed class PlanningChainCoordinator
if (child?.ParentTaskId is null) return null;
var next = await ctx.Tasks
.AsNoTracking()
.Where(t => t.ParentTaskId == child.ParentTaskId
&& t.SortOrder > child.SortOrder
&& t.Status == TaskStatus.Waiting)
@@ -63,8 +71,7 @@ public sealed class PlanningChainCoordinator
.FirstOrDefaultAsync(ct);
if (next is null) return null;
next.Status = TaskStatus.Queued;
await ctx.SaveChangesAsync(ct);
await _state().UnblockAsync(next.Id, ct);
return next.Id;
}
}

View File

@@ -6,6 +6,7 @@ using ClaudeDo.Data.Git;
using ClaudeDo.Data.Models;
using ClaudeDo.Data.Repositories;
using ClaudeDo.Worker.Config;
using ClaudeDo.Worker.State;
using Microsoft.EntityFrameworkCore;
using TaskStatus = ClaudeDo.Data.Models.TaskStatus;
@@ -22,17 +23,20 @@ public sealed class PlanningSessionManager
private readonly GitService _git;
private readonly WorkerConfig _cfg;
private readonly string _rootDirectory;
private readonly ITaskStateService? _state;
// DI constructor.
public PlanningSessionManager(
IDbContextFactory<ClaudeDoDbContext> factory,
GitService git,
WorkerConfig cfg,
ITaskStateService state,
string rootDirectory)
{
_factory = factory;
_git = git;
_cfg = cfg;
_state = state;
_rootDirectory = rootDirectory;
}
@@ -43,13 +47,15 @@ public sealed class PlanningSessionManager
AppSettingsRepository settings,
GitService git,
WorkerConfig cfg,
string rootDirectory)
string rootDirectory,
ITaskStateService? state = null)
{
_tasksOverride = tasks;
_listsOverride = lists;
_settingsOverride = settings;
_git = git;
_cfg = cfg;
_state = state;
_rootDirectory = rootDirectory;
}
@@ -114,8 +120,19 @@ public sealed class PlanningSessionManager
// Session dir + token + prompt files.
var token = GenerateToken();
var started = await tasks.SetPlanningStartedAsync(taskId, token, ct)
?? throw new InvalidOperationException("Failed to transition task to Planning.");
if (_state is not null)
{
var startResult = await _state.StartPlanningAsync(taskId, ct);
if (!startResult.Ok)
throw new InvalidOperationException(startResult.Reason ?? "Failed to transition task to Planning.");
await tasks.SetPlanningSessionTokenAsync(taskId, token, ct);
}
else
{
// Test fallback when no state-service is provided.
if (await tasks.SetPlanningStartedAsync(taskId, token, ct) is null)
throw new InvalidOperationException("Failed to transition task to Planning.");
}
var sessionDir = Path.Combine(_rootDirectory, taskId);
Directory.CreateDirectory(sessionDir);

View File

@@ -7,6 +7,7 @@ using ClaudeDo.Worker.Hub;
using ClaudeDo.Worker.Planning;
using ClaudeDo.Worker.Runner;
using ClaudeDo.Worker.Services;
using ClaudeDo.Worker.State;
using Microsoft.EntityFrameworkCore;
var cfg = WorkerConfig.Load();
@@ -41,6 +42,17 @@ builder.Services.AddSingleton<PlanningAggregator>();
builder.Services.AddSingleton<PlanningMergeOrchestrator>();
builder.Services.AddSingleton<PlanningChainCoordinator>();
// Centralized status mutation. Use a delegate for WakeQueue to break the
// TaskStateService → QueueService → TaskRunner → TaskStateService DI cycle;
// Slice 3 will replace this with IQueueWaker.
builder.Services.AddSingleton<Func<ITaskStateService>>(sp => () => sp.GetRequiredService<ITaskStateService>());
builder.Services.AddSingleton<ITaskStateService>(sp => new TaskStateService(
sp.GetRequiredService<IDbContextFactory<ClaudeDoDbContext>>(),
sp.GetRequiredService<HubBroadcaster>(),
() => sp.GetRequiredService<QueueService>().WakeQueue(),
sp.GetRequiredService<PlanningChainCoordinator>(),
sp.GetRequiredService<ILogger<TaskStateService>>()));
// Agent file management.
var agentsDir = Path.Combine(ClaudeDo.Data.Paths.AppDataRoot(), "agents");
Directory.CreateDirectory(agentsDir);
@@ -65,6 +77,7 @@ builder.Services.AddSingleton(sp =>
sp.GetRequiredService<IDbContextFactory<ClaudeDoDbContext>>(),
sp.GetRequiredService<GitService>(),
cfg,
sp.GetRequiredService<ITaskStateService>(),
planningSessionsDir));
builder.Services.AddSingleton<IPlanningTerminalLauncher>(sp =>
new WindowsTerminalPlanningLauncher("wt.exe", cfg.ClaudeBin));
@@ -123,6 +136,7 @@ if (cfg.ExternalMcpPort > 0)
externalBuilder.Services.AddSingleton(app.Services.GetRequiredService<HubBroadcaster>());
externalBuilder.Services.AddSingleton(app.Services.GetRequiredService<QueueService>());
externalBuilder.Services.AddSingleton(app.Services.GetRequiredService<IDbContextFactory<ClaudeDoDbContext>>());
externalBuilder.Services.AddSingleton(app.Services.GetRequiredService<ITaskStateService>());
externalBuilder.Services.AddScoped<ClaudeDoDbContext>(sp =>
sp.GetRequiredService<IDbContextFactory<ClaudeDoDbContext>>().CreateDbContext());
externalBuilder.Services.AddScoped<TaskRepository>();

View File

@@ -3,7 +3,7 @@ using ClaudeDo.Data.Models;
using ClaudeDo.Data.Repositories;
using ClaudeDo.Worker.Config;
using ClaudeDo.Worker.Hub;
using ClaudeDo.Worker.Planning;
using ClaudeDo.Worker.State;
using Microsoft.EntityFrameworkCore;
using TaskStatus = ClaudeDo.Data.Models.TaskStatus;
@@ -18,7 +18,7 @@ public sealed class TaskRunner
private readonly ClaudeArgsBuilder _argsBuilder;
private readonly WorkerConfig _cfg;
private readonly ILogger<TaskRunner> _logger;
private readonly PlanningChainCoordinator _chain;
private readonly ITaskStateService _state;
public TaskRunner(
IClaudeProcess claude,
@@ -28,7 +28,7 @@ public sealed class TaskRunner
ClaudeArgsBuilder argsBuilder,
WorkerConfig cfg,
ILogger<TaskRunner> logger,
PlanningChainCoordinator chain)
ITaskStateService state)
{
_claude = claude;
_dbFactory = dbFactory;
@@ -37,7 +37,7 @@ public sealed class TaskRunner
_argsBuilder = argsBuilder;
_cfg = cfg;
_logger = logger;
_chain = chain;
_state = state;
}
public async Task RunAsync(TaskEntity task, string slot, CancellationToken ct)
@@ -91,11 +91,7 @@ public sealed class TaskRunner
var resolvedConfig = await ResolveConfigAsync(task, listConfig, null, ct);
var now = DateTime.UtcNow;
using (var context = _dbFactory.CreateDbContext())
{
var taskRepo = new TaskRepository(context);
await taskRepo.MarkRunningAsync(task.Id, now, ct);
}
await _state.StartRunningAsync(task.Id, now, ct);
await _broadcaster.TaskStarted(slot, task.Id, now);
// Build prompt.
@@ -202,11 +198,7 @@ public sealed class TaskRunner
}
var now = DateTime.UtcNow;
using (var context = _dbFactory.CreateDbContext())
{
var taskRepo = new TaskRepository(context);
await taskRepo.MarkRunningAsync(taskId, now, ct);
}
await _state.StartRunningAsync(taskId, now, ct);
await _broadcaster.TaskStarted(slot, taskId, now);
var nextRunNumber = lastRun.RunNumber + 1;
@@ -332,34 +324,11 @@ public sealed class TaskRunner
// is never left as 'running' because of a cancel that arrived
// after the Claude run already succeeded.
var finishedAt = DateTime.UtcNow;
using (var context = _dbFactory.CreateDbContext())
{
var taskRepo = new TaskRepository(context);
await taskRepo.MarkDoneAsync(task.Id, finishedAt, result.ResultMarkdown, CancellationToken.None);
if (task.ParentTaskId is not null)
await taskRepo.TryCompleteParentAsync(task.ParentTaskId, CancellationToken.None);
}
await _state.CompleteAsync(task.Id, finishedAt, result.ResultMarkdown, CancellationToken.None);
await _broadcaster.WorkerLog($"Finished \"{task.Title}\" (done)", WorkerLogLevel.Success, DateTime.UtcNow);
await _broadcaster.TaskFinished(slot, task.Id, "done", finishedAt);
_logger.LogInformation("Task {TaskId} completed (turns={Turns}, tokens_in={In}, tokens_out={Out})",
task.Id, result.TurnCount, result.TokensIn, result.TokensOut);
// Sequential planning chain: if this task has a parent, flip the next
// Waiting sibling to Queued so the queue pickup loop dispatches it next.
if (task.ParentTaskId is not null)
{
try
{
var advanced = await _chain.OnChildFinishedAsync(
task.Id, TaskStatus.Done, CancellationToken.None);
if (advanced is not null)
await _broadcaster.TaskUpdated(advanced);
}
catch (Exception ex)
{
_logger.LogWarning(ex, "PlanningChain advance failed for {TaskId}", task.Id);
}
}
}
private async Task HandleFailure(string taskId, string taskTitle, string slot, RunResult result)
@@ -367,12 +336,7 @@ public sealed class TaskRunner
// Intentionally does not accept a CancellationToken: this is the
// terminal write for a failed task and must always be persisted.
var finishedAt = DateTime.UtcNow;
using var context = _dbFactory.CreateDbContext();
var taskRepo = new TaskRepository(context);
await taskRepo.MarkFailedAsync(taskId, finishedAt, result.ErrorMarkdown, CancellationToken.None);
var justFailed = await taskRepo.GetByIdAsync(taskId, CancellationToken.None);
if (justFailed?.ParentTaskId is not null)
await taskRepo.TryCompleteParentAsync(justFailed.ParentTaskId, CancellationToken.None);
await _state.FailAsync(taskId, finishedAt, result.ErrorMarkdown, CancellationToken.None);
await _broadcaster.WorkerLog($"Finished \"{taskTitle}\" (failed)", WorkerLogLevel.Error, DateTime.UtcNow);
await _broadcaster.TaskFinished(slot, taskId, "failed", finishedAt);
_logger.LogWarning("Task {TaskId} failed (turns={Turns}): {Error}", taskId, result.TurnCount, result.ErrorMarkdown);
@@ -384,15 +348,9 @@ public sealed class TaskRunner
{
var now = DateTime.UtcNow;
// Terminal write — never cancel.
using var context = _dbFactory.CreateDbContext();
var taskRepo = new TaskRepository(context);
await taskRepo.MarkFailedAsync(taskId, now, error, CancellationToken.None);
var justFailed = await taskRepo.GetByIdAsync(taskId, CancellationToken.None);
if (justFailed?.ParentTaskId is not null)
await taskRepo.TryCompleteParentAsync(justFailed.ParentTaskId, CancellationToken.None);
await _state.FailAsync(taskId, now, error, CancellationToken.None);
await _broadcaster.WorkerLog($"Finished \"{taskTitle}\" (failed)", WorkerLogLevel.Error, DateTime.UtcNow);
await _broadcaster.TaskFinished(slot, taskId, "failed", now);
await _broadcaster.TaskUpdated(taskId);
}
catch (Exception ex)
{

View File

@@ -1,25 +1,21 @@
using ClaudeDo.Data;
using ClaudeDo.Data.Repositories;
using Microsoft.EntityFrameworkCore;
using ClaudeDo.Worker.State;
namespace ClaudeDo.Worker.Services;
public sealed class StaleTaskRecovery : IHostedService
{
private readonly IDbContextFactory<ClaudeDoDbContext> _dbFactory;
private readonly ITaskStateService _state;
private readonly ILogger<StaleTaskRecovery> _logger;
public StaleTaskRecovery(IDbContextFactory<ClaudeDoDbContext> dbFactory, ILogger<StaleTaskRecovery> logger)
public StaleTaskRecovery(ITaskStateService state, ILogger<StaleTaskRecovery> logger)
{
_dbFactory = dbFactory;
_state = state;
_logger = logger;
}
public async Task StartAsync(CancellationToken cancellationToken)
{
using var context = _dbFactory.CreateDbContext();
var tasks = new TaskRepository(context);
var flipped = await tasks.FlipAllRunningToFailedAsync("worker restart", cancellationToken);
var flipped = await _state.RecoverStaleRunningAsync("worker restart", cancellationToken);
if (flipped > 0)
_logger.LogWarning("Stale task recovery: flipped {Count} running task(s) to failed", flipped);
else

View File

@@ -3,6 +3,7 @@ using ClaudeDo.Data.Models;
using ClaudeDo.Data.Repositories;
using ClaudeDo.Worker.Hub;
using ClaudeDo.Worker.Runner;
using ClaudeDo.Worker.State;
using Microsoft.EntityFrameworkCore;
using TaskStatus = ClaudeDo.Data.Models.TaskStatus;
@@ -13,17 +14,20 @@ public sealed class TaskResetService
private readonly IDbContextFactory<ClaudeDoDbContext> _dbFactory;
private readonly WorktreeManager _wtManager;
private readonly HubBroadcaster _broadcaster;
private readonly ITaskStateService _state;
private readonly ILogger<TaskResetService> _logger;
public TaskResetService(
IDbContextFactory<ClaudeDoDbContext> dbFactory,
WorktreeManager wtManager,
HubBroadcaster broadcaster,
ITaskStateService state,
ILogger<TaskResetService> logger)
{
_dbFactory = dbFactory;
_wtManager = wtManager;
_broadcaster = broadcaster;
_state = state;
_logger = logger;
}
@@ -55,16 +59,13 @@ public sealed class TaskResetService
worktreeChanged = true;
}
using (var ctx = _dbFactory.CreateDbContext())
{
await new TaskRepository(ctx).ResetToManualAsync(taskId, ct);
}
await _state.ResetToIdleAsync(taskId, ct);
await _broadcaster.TaskUpdated(taskId);
if (worktreeChanged)
await _broadcaster.WorktreeUpdated(taskId);
_logger.LogInformation("Reset task {TaskId} to Manual (worktree discarded: {Discarded})", taskId, worktreeChanged);
_logger.LogInformation("Reset task {TaskId} to Idle (worktree discarded: {Discarded})", taskId, worktreeChanged);
await _broadcaster.WorkerLog($"Reset \"{task.Title}\"", WorkerLogLevel.Warn, DateTime.UtcNow);
}
}

View File

@@ -0,0 +1,19 @@
namespace ClaudeDo.Worker.State;
public interface ITaskStateService
{
Task<TransitionResult> EnqueueAsync(string taskId, CancellationToken ct);
Task<TransitionResult> StartRunningAsync(string taskId, DateTime startedAt, CancellationToken ct);
Task<TransitionResult> CompleteAsync(string taskId, DateTime finishedAt, string? result, CancellationToken ct);
Task<TransitionResult> FailAsync(string taskId, DateTime finishedAt, string? error, CancellationToken ct);
Task<TransitionResult> CancelAsync(string taskId, DateTime finishedAt, CancellationToken ct);
Task<TransitionResult> ResetToIdleAsync(string taskId, CancellationToken ct);
Task<TransitionResult> StartPlanningAsync(string parentId, CancellationToken ct);
Task<TransitionResult> FinalizePlanningAsync(string parentId, CancellationToken ct);
Task<TransitionResult> BlockOnAsync(string taskId, string predecessorTaskId, CancellationToken ct);
Task<TransitionResult> UnblockAsync(string taskId, CancellationToken ct);
Task<int> RecoverStaleRunningAsync(string reason, CancellationToken ct);
}

View File

@@ -0,0 +1,258 @@
using ClaudeDo.Data;
using ClaudeDo.Data.Models;
using ClaudeDo.Data.Repositories;
using ClaudeDo.Worker.Hub;
using ClaudeDo.Worker.Planning;
using Microsoft.EntityFrameworkCore;
using TaskStatus = ClaudeDo.Data.Models.TaskStatus;
namespace ClaudeDo.Worker.State;
public sealed class TaskStateService : ITaskStateService
{
private readonly IDbContextFactory<ClaudeDoDbContext> _dbFactory;
private readonly HubBroadcaster _broadcaster;
private readonly Action _wakeQueue;
private readonly PlanningChainCoordinator _chain;
private readonly ILogger<TaskStateService> _logger;
public TaskStateService(
IDbContextFactory<ClaudeDoDbContext> dbFactory,
HubBroadcaster broadcaster,
Action wakeQueue,
PlanningChainCoordinator chain,
ILogger<TaskStateService> logger)
{
_dbFactory = dbFactory;
_broadcaster = broadcaster;
_wakeQueue = wakeQueue;
_chain = chain;
_logger = logger;
}
public async Task<TransitionResult> EnqueueAsync(string taskId, CancellationToken ct)
{
await using var ctx = await _dbFactory.CreateDbContextAsync(ct);
var affected = await ctx.Tasks
.Where(t => t.Id == taskId && t.Status != TaskStatus.Running)
.ExecuteUpdateAsync(s => s.SetProperty(t => t.Status, TaskStatus.Queued), ct);
if (affected == 0)
return new TransitionResult(false, "Task not found or already running.");
_wakeQueue();
await _broadcaster.TaskUpdated(taskId);
return new TransitionResult(true, null);
}
public async Task<TransitionResult> StartRunningAsync(string taskId, DateTime startedAt, CancellationToken ct)
{
await using var ctx = await _dbFactory.CreateDbContextAsync(ct);
var affected = await ctx.Tasks
.Where(t => t.Id == taskId && t.Status != TaskStatus.Running)
.ExecuteUpdateAsync(s => s
.SetProperty(t => t.Status, TaskStatus.Running)
.SetProperty(t => t.StartedAt, startedAt), ct);
if (affected == 0)
return new TransitionResult(false, "Task already running or not found.");
await _broadcaster.TaskUpdated(taskId);
return new TransitionResult(true, null);
}
public async Task<TransitionResult> CompleteAsync(string taskId, DateTime finishedAt, string? result, CancellationToken ct)
{
await using (var ctx = await _dbFactory.CreateDbContextAsync(ct))
{
var affected = await ctx.Tasks
.Where(t => t.Id == taskId && t.Status == TaskStatus.Running)
.ExecuteUpdateAsync(s => s
.SetProperty(t => t.Status, TaskStatus.Done)
.SetProperty(t => t.FinishedAt, finishedAt)
.SetProperty(t => t.Result, result), ct);
if (affected == 0)
return new TransitionResult(false, "Task not running; cannot complete.");
}
await OnChildTerminalAsync(taskId, TaskStatus.Done);
await _broadcaster.TaskUpdated(taskId);
return new TransitionResult(true, null);
}
public async Task<TransitionResult> FailAsync(string taskId, DateTime finishedAt, string? error, CancellationToken ct)
{
await using (var ctx = await _dbFactory.CreateDbContextAsync(ct))
{
var affected = await ctx.Tasks
.Where(t => t.Id == taskId && t.Status != TaskStatus.Done)
.ExecuteUpdateAsync(s => s
.SetProperty(t => t.Status, TaskStatus.Failed)
.SetProperty(t => t.FinishedAt, finishedAt)
.SetProperty(t => t.Result, error), ct);
if (affected == 0)
return new TransitionResult(false, "Task already done; cannot fail.");
}
await OnChildTerminalAsync(taskId, TaskStatus.Failed);
await _broadcaster.TaskUpdated(taskId);
return new TransitionResult(true, null);
}
public async Task<TransitionResult> CancelAsync(string taskId, DateTime finishedAt, CancellationToken ct)
{
await using (var ctx = await _dbFactory.CreateDbContextAsync(ct))
{
var affected = await ctx.Tasks
.Where(t => t.Id == taskId &&
(t.Status == TaskStatus.Running || t.Status == TaskStatus.Queued))
.ExecuteUpdateAsync(s => s
.SetProperty(t => t.Status, TaskStatus.Cancelled)
.SetProperty(t => t.FinishedAt, finishedAt), ct);
if (affected == 0)
return new TransitionResult(false, "Task not in cancellable state.");
}
await OnChildTerminalAsync(taskId, TaskStatus.Cancelled);
await _broadcaster.TaskUpdated(taskId);
return new TransitionResult(true, null);
}
public async Task<TransitionResult> ResetToIdleAsync(string taskId, CancellationToken ct)
{
await using var ctx = await _dbFactory.CreateDbContextAsync(ct);
var affected = await ctx.Tasks
.Where(t => t.Id == taskId && t.Status != TaskStatus.Running)
.ExecuteUpdateAsync(s => s
.SetProperty(t => t.Status, TaskStatus.Idle)
.SetProperty(t => t.StartedAt, (DateTime?)null)
.SetProperty(t => t.FinishedAt, (DateTime?)null)
.SetProperty(t => t.Result, (string?)null), ct);
if (affected == 0)
return new TransitionResult(false, "Task is running; cannot reset.");
await _broadcaster.TaskUpdated(taskId);
return new TransitionResult(true, null);
}
public async Task<TransitionResult> StartPlanningAsync(string parentId, CancellationToken ct)
{
await using var ctx = await _dbFactory.CreateDbContextAsync(ct);
var affected = await ctx.Tasks
.Where(t => t.Id == parentId &&
(t.Status == TaskStatus.Manual || t.Status == TaskStatus.Idle))
.ExecuteUpdateAsync(s => s
.SetProperty(t => t.Status, TaskStatus.Planning)
.SetProperty(t => t.PlanningPhase, PlanningPhase.Active), ct);
if (affected == 0)
return new TransitionResult(false, "Task not in plannable state.");
await _broadcaster.TaskUpdated(parentId);
return new TransitionResult(true, null);
}
public async Task<TransitionResult> FinalizePlanningAsync(string parentId, CancellationToken ct)
{
await using var ctx = await _dbFactory.CreateDbContextAsync(ct);
var affected = await ctx.Tasks
.Where(t => t.Id == parentId && t.PlanningPhase == PlanningPhase.Active)
.ExecuteUpdateAsync(s => s
.SetProperty(t => t.PlanningPhase, PlanningPhase.Finalized)
.SetProperty(t => t.PlanningFinalizedAt, DateTime.UtcNow), ct);
if (affected == 0)
return new TransitionResult(false, "No active planning session.");
await _broadcaster.TaskUpdated(parentId);
return new TransitionResult(true, null);
}
public async Task<TransitionResult> BlockOnAsync(string taskId, string predecessorTaskId, CancellationToken ct)
{
await using var ctx = await _dbFactory.CreateDbContextAsync(ct);
var affected = await ctx.Tasks
.Where(t => t.Id == taskId)
.ExecuteUpdateAsync(s => s.SetProperty(t => t.BlockedByTaskId, predecessorTaskId), ct);
if (affected == 0)
return new TransitionResult(false, "Task not found.");
await _broadcaster.TaskUpdated(taskId);
return new TransitionResult(true, null);
}
public async Task<TransitionResult> UnblockAsync(string taskId, CancellationToken ct)
{
await using var ctx = await _dbFactory.CreateDbContextAsync(ct);
var affected = await ctx.Tasks
.Where(t => t.Id == taskId)
.ExecuteUpdateAsync(s => s.SetProperty(t => t.BlockedByTaskId, (string?)null), ct);
if (affected == 0)
return new TransitionResult(false, "Task not found.");
// Bridge to legacy chain layout: a Waiting predecessor-blocked sibling becomes Queued
// when its predecessor finishes. New layout (post-Slice 4) stores siblings as
// Status=Queued + BlockedByTaskId set, so this is a no-op for them.
await ctx.Tasks
.Where(t => t.Id == taskId && t.Status == TaskStatus.Waiting)
.ExecuteUpdateAsync(s => s.SetProperty(t => t.Status, TaskStatus.Queued), ct);
_wakeQueue();
await _broadcaster.TaskUpdated(taskId);
return new TransitionResult(true, null);
}
public async Task<int> RecoverStaleRunningAsync(string reason, CancellationToken ct)
{
var resultText = "[stale] " + reason;
var now = DateTime.UtcNow;
await using var ctx = await _dbFactory.CreateDbContextAsync(ct);
return await ctx.Tasks
.Where(t => t.Status == TaskStatus.Running)
.ExecuteUpdateAsync(s => s
.SetProperty(t => t.Status, TaskStatus.Failed)
.SetProperty(t => t.FinishedAt, now)
.SetProperty(t => t.Result, resultText), ct);
}
private async Task OnChildTerminalAsync(string taskId, TaskStatus finalStatus)
{
// Terminal child writes are best-effort and use CancellationToken.None so the
// task lifecycle is never left partially completed because a caller cancelled.
string? parentId;
await using (var ctx = await _dbFactory.CreateDbContextAsync(CancellationToken.None))
{
parentId = await ctx.Tasks
.AsNoTracking()
.Where(t => t.Id == taskId)
.Select(t => t.ParentTaskId)
.FirstOrDefaultAsync(CancellationToken.None);
}
if (parentId is null) return;
try
{
await _chain.OnChildFinishedAsync(taskId, finalStatus, CancellationToken.None);
}
catch (Exception ex)
{
_logger.LogWarning(ex, "PlanningChain advance failed for {TaskId}", taskId);
}
try
{
await using var ctx = await _dbFactory.CreateDbContextAsync(CancellationToken.None);
await new TaskRepository(ctx).TryCompleteParentAsync(parentId, CancellationToken.None);
}
catch (Exception ex)
{
_logger.LogWarning(ex, "TryCompleteParent failed for {ParentId}", parentId);
}
}
}

View File

@@ -0,0 +1,3 @@
namespace ClaudeDo.Worker.State;
public sealed record TransitionResult(bool Ok, string? Reason);