Slice 4 of the worker state consolidation refactor. Eliminates the "queue never picks up planning tasks" bug structurally by routing both the manager and MCP finalize paths through TaskStateService and PlanningChainCoordinator.SetupChainAsync, where the auto-wake on enqueue guarantees the queue picker claims the first child immediately. - Delete TaskRepository.FinalizePlanningAsync; PlanningSessionManager now orchestrates via _state.FinalizePlanningAsync + _chain.SetupChainAsync. - Rename QueueSubtasksSequentiallyAsync to SetupChainAsync (internal); layout is now Status=Queued + BlockedByTaskId, with auto-attached agent tag. - OnChildFinishedAsync looks up the successor by BlockedByTaskId, drops the legacy Waiting status lookup. - PlanningMcpService.Finalize routes through state+chain; EditableStatuses drops Waiting and adds Idle; gate uses PlanningPhase==Active. - TaskStateService.FinalizePlanningAsync clears the planning session token. - UI: TaskRowViewModel adds BlockedByTaskId; IsQueued/IsWaiting reflect the new layout; TasksIslandViewModel.RemoveFromQueueAsync clears BlockedByTaskId on dequeue. - New regression test PlanningEndToEndTests.FinalizeAsync_FirstChildIsClaimedByPicker_WithinDeadline asserts the picker claims the first child within 200ms with no manual WakeQueue. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
261 lines
10 KiB
C#
261 lines
10 KiB
C#
using ClaudeDo.Data;
|
|
using ClaudeDo.Data.Models;
|
|
using ClaudeDo.Data.Repositories;
|
|
using ClaudeDo.Worker.Hub;
|
|
using ClaudeDo.Worker.Planning;
|
|
using ClaudeDo.Worker.Queue;
|
|
using Microsoft.EntityFrameworkCore;
|
|
using TaskStatus = ClaudeDo.Data.Models.TaskStatus;
|
|
|
|
namespace ClaudeDo.Worker.State;
|
|
|
|
public sealed class TaskStateService : ITaskStateService
|
|
{
|
|
/// <summary>Opens a fresh <see cref="ClaudeDoDbContext"/> for each operation of this service.</summary>
private readonly IDbContextFactory<ClaudeDoDbContext> _dbFactory;

/// <summary>Publishes <c>TaskUpdated</c> notifications after a state change was written.</summary>
private readonly HubBroadcaster _broadcaster;

/// <summary>Signalled via <c>Wake()</c> after a task was enqueued or unblocked.</summary>
private readonly IQueueWaker _waker;

/// <summary>Notified through <c>OnChildFinishedAsync</c> when a child task reaches a terminal status.</summary>
private readonly PlanningChainCoordinator _chain;

/// <summary>Receives warnings for follow-up steps that fail after a terminal transition.</summary>
private readonly ILogger<TaskStateService> _logger;

/// <summary>
/// Creates the service and stores its collaborators; no other work is done here.
/// </summary>
/// <param name="dbFactory">Factory for per-operation database contexts.</param>
/// <param name="broadcaster">Broadcaster used to announce task updates.</param>
/// <param name="waker">Queue waker signalled after enqueue/unblock.</param>
/// <param name="chain">Planning chain coordinator informed about finished children.</param>
/// <param name="logger">Logger for this service.</param>
public TaskStateService(
    IDbContextFactory<ClaudeDoDbContext> dbFactory,
    HubBroadcaster broadcaster,
    IQueueWaker waker,
    PlanningChainCoordinator chain,
    ILogger<TaskStateService> logger)
    => (_dbFactory, _broadcaster, _waker, _chain, _logger)
        = (dbFactory, broadcaster, waker, chain, logger);
|
|
|
|
/// <summary>
/// Sets a task's status to <see cref="TaskStatus.Queued"/> unless it is currently
/// <see cref="TaskStatus.Running"/>, then signals the queue waker and broadcasts the update.
/// </summary>
/// <param name="taskId">Id of the task to enqueue.</param>
/// <param name="ct">Token passed to context creation and the update statement.</param>
/// <returns>
/// A failed result ("Task not found or already running.") when no row matched;
/// otherwise a successful result.
/// </returns>
public async Task<TransitionResult> EnqueueAsync(string taskId, CancellationToken ct)
{
    await using var db = await _dbFactory.CreateDbContextAsync(ct);

    var enqueueable = db.Tasks
        .Where(task => task.Id == taskId)
        .Where(task => task.Status != TaskStatus.Running);

    int rowsChanged = await enqueueable.ExecuteUpdateAsync(
        setters => setters.SetProperty(task => task.Status, TaskStatus.Queued),
        ct);

    if (rowsChanged == 0)
    {
        return new TransitionResult(false, "Task not found or already running.");
    }

    // Wake first, then announce; same order as every other enqueue-style transition here.
    _waker.Wake();
    await _broadcaster.TaskUpdated(taskId);

    return new TransitionResult(true, null);
}
|
|
|
|
/// <summary>
/// Marks a task as <see cref="TaskStatus.Running"/> and records when it started,
/// provided the task is not already running.
/// </summary>
/// <param name="taskId">Id of the task that is starting.</param>
/// <param name="startedAt">Timestamp written to <c>StartedAt</c>.</param>
/// <param name="ct">Token passed to context creation and the update statement.</param>
/// <returns>
/// A failed result ("Task already running or not found.") when no row matched;
/// otherwise a successful result.
/// </returns>
public async Task<TransitionResult> StartRunningAsync(string taskId, DateTime startedAt, CancellationToken ct)
{
    await using var db = await _dbFactory.CreateDbContextAsync(ct);

    var startable = db.Tasks
        .Where(task => task.Id == taskId)
        .Where(task => task.Status != TaskStatus.Running);

    int rowsChanged = await startable.ExecuteUpdateAsync(
        setters => setters
            .SetProperty(task => task.Status, TaskStatus.Running)
            .SetProperty(task => task.StartedAt, startedAt),
        ct);

    bool started = rowsChanged != 0;
    if (started)
    {
        await _broadcaster.TaskUpdated(taskId);
    }

    return started
        ? new TransitionResult(true, null)
        : new TransitionResult(false, "Task already running or not found.");
}
|
|
|
|
/// <summary>
/// Moves a running task to <see cref="TaskStatus.Done"/>, storing its finish time and result,
/// then runs the terminal-child follow-up and broadcasts the update.
/// </summary>
/// <param name="taskId">Id of the task that finished.</param>
/// <param name="finishedAt">Timestamp written to <c>FinishedAt</c>.</param>
/// <param name="result">Text written to <c>Result</c>; may be <c>null</c>.</param>
/// <param name="ct">Token passed to context creation and the update statement.</param>
/// <returns>
/// A failed result ("Task not running; cannot complete.") when the task was not in
/// <see cref="TaskStatus.Running"/>; otherwise a successful result.
/// </returns>
public async Task<TransitionResult> CompleteAsync(string taskId, DateTime finishedAt, string? result, CancellationToken ct)
{
    int rowsChanged;

    // The context is scoped to the write only, so it is disposed before the follow-up work.
    await using (var db = await _dbFactory.CreateDbContextAsync(ct))
    {
        rowsChanged = await db.Tasks
            .Where(task => task.Id == taskId)
            .Where(task => task.Status == TaskStatus.Running)
            .ExecuteUpdateAsync(
                setters => setters
                    .SetProperty(task => task.Status, TaskStatus.Done)
                    .SetProperty(task => task.FinishedAt, finishedAt)
                    .SetProperty(task => task.Result, result),
                ct);
    }

    if (rowsChanged == 0)
    {
        return new TransitionResult(false, "Task not running; cannot complete.");
    }

    await OnChildTerminalAsync(taskId, TaskStatus.Done);
    await _broadcaster.TaskUpdated(taskId);

    return new TransitionResult(true, null);
}
|
|
|
|
/// <summary>
/// Moves a task to <see cref="TaskStatus.Failed"/> unless it is already
/// <see cref="TaskStatus.Done"/>, storing the finish time and the error text in <c>Result</c>,
/// then runs the terminal-child follow-up and broadcasts the update.
/// </summary>
/// <param name="taskId">Id of the task that failed.</param>
/// <param name="finishedAt">Timestamp written to <c>FinishedAt</c>.</param>
/// <param name="error">Error text written to <c>Result</c>; may be <c>null</c>.</param>
/// <param name="ct">Token passed to context creation and the update statement.</param>
/// <returns>
/// A failed result ("Task already done; cannot fail.") when no row matched;
/// otherwise a successful result.
/// </returns>
public async Task<TransitionResult> FailAsync(string taskId, DateTime finishedAt, string? error, CancellationToken ct)
{
    int rowsChanged;

    // The context is scoped to the write only, so it is disposed before the follow-up work.
    await using (var db = await _dbFactory.CreateDbContextAsync(ct))
    {
        rowsChanged = await db.Tasks
            .Where(task => task.Id == taskId)
            .Where(task => task.Status != TaskStatus.Done)
            .ExecuteUpdateAsync(
                setters => setters
                    .SetProperty(task => task.Status, TaskStatus.Failed)
                    .SetProperty(task => task.FinishedAt, finishedAt)
                    .SetProperty(task => task.Result, error),
                ct);
    }

    if (rowsChanged == 0)
    {
        return new TransitionResult(false, "Task already done; cannot fail.");
    }

    await OnChildTerminalAsync(taskId, TaskStatus.Failed);
    await _broadcaster.TaskUpdated(taskId);

    return new TransitionResult(true, null);
}
|
|
|
|
/// <summary>
/// Cancels a task that is either <see cref="TaskStatus.Running"/> or
/// <see cref="TaskStatus.Queued"/>: sets <see cref="TaskStatus.Cancelled"/> and the finish
/// time, then runs the terminal-child follow-up and broadcasts the update.
/// </summary>
/// <param name="taskId">Id of the task to cancel.</param>
/// <param name="finishedAt">Timestamp written to <c>FinishedAt</c>.</param>
/// <param name="ct">Token passed to context creation and the update statement.</param>
/// <returns>
/// A failed result ("Task not in cancellable state.") when no row matched;
/// otherwise a successful result.
/// </returns>
public async Task<TransitionResult> CancelAsync(string taskId, DateTime finishedAt, CancellationToken ct)
{
    int rowsChanged;

    // The context is scoped to the write only, so it is disposed before the follow-up work.
    await using (var db = await _dbFactory.CreateDbContextAsync(ct))
    {
        rowsChanged = await db.Tasks
            .Where(task => task.Id == taskId)
            .Where(task => task.Status == TaskStatus.Running || task.Status == TaskStatus.Queued)
            .ExecuteUpdateAsync(
                setters => setters
                    .SetProperty(task => task.Status, TaskStatus.Cancelled)
                    .SetProperty(task => task.FinishedAt, finishedAt),
                ct);
    }

    if (rowsChanged == 0)
    {
        return new TransitionResult(false, "Task not in cancellable state.");
    }

    await OnChildTerminalAsync(taskId, TaskStatus.Cancelled);
    await _broadcaster.TaskUpdated(taskId);

    return new TransitionResult(true, null);
}
|
|
|
|
/// <summary>
/// Returns a non-running task to <see cref="TaskStatus.Idle"/> and clears its
/// <c>StartedAt</c>, <c>FinishedAt</c> and <c>Result</c> values, then broadcasts the update.
/// </summary>
/// <param name="taskId">Id of the task to reset.</param>
/// <param name="ct">Token passed to context creation and the update statement.</param>
/// <returns>
/// A failed result ("Task is running; cannot reset.") when no row matched;
/// otherwise a successful result.
/// </returns>
public async Task<TransitionResult> ResetToIdleAsync(string taskId, CancellationToken ct)
{
    await using var db = await _dbFactory.CreateDbContextAsync(ct);

    var resettable = db.Tasks
        .Where(task => task.Id == taskId)
        .Where(task => task.Status != TaskStatus.Running);

    int rowsChanged = await resettable.ExecuteUpdateAsync(
        setters => setters
            .SetProperty(task => task.Status, TaskStatus.Idle)
            .SetProperty(task => task.StartedAt, (DateTime?)null)
            .SetProperty(task => task.FinishedAt, (DateTime?)null)
            .SetProperty(task => task.Result, (string?)null),
        ct);

    if (rowsChanged == 0)
    {
        return new TransitionResult(false, "Task is running; cannot reset.");
    }

    await _broadcaster.TaskUpdated(taskId);
    return new TransitionResult(true, null);
}
|
|
|
|
/// <summary>
/// Puts a task that is <see cref="TaskStatus.Manual"/> or <see cref="TaskStatus.Idle"/> into
/// <see cref="TaskStatus.Planning"/> with <see cref="PlanningPhase.Active"/>, then broadcasts
/// the update.
/// </summary>
/// <param name="parentId">Id of the task that becomes the planning parent.</param>
/// <param name="ct">Token passed to context creation and the update statement.</param>
/// <returns>
/// A failed result ("Task not in plannable state.") when no row matched;
/// otherwise a successful result.
/// </returns>
public async Task<TransitionResult> StartPlanningAsync(string parentId, CancellationToken ct)
{
    await using var db = await _dbFactory.CreateDbContextAsync(ct);

    var plannable = db.Tasks
        .Where(task => task.Id == parentId)
        .Where(task => task.Status == TaskStatus.Manual || task.Status == TaskStatus.Idle);

    int rowsChanged = await plannable.ExecuteUpdateAsync(
        setters => setters
            .SetProperty(task => task.Status, TaskStatus.Planning)
            .SetProperty(task => task.PlanningPhase, PlanningPhase.Active),
        ct);

    bool started = rowsChanged != 0;
    if (started)
    {
        await _broadcaster.TaskUpdated(parentId);
    }

    return started
        ? new TransitionResult(true, null)
        : new TransitionResult(false, "Task not in plannable state.");
}
|
|
|
|
/// <summary>
/// Closes an active planning session on a parent task: sets the phase to
/// <see cref="PlanningPhase.Finalized"/>, stamps <c>PlanningFinalizedAt</c>, clears the
/// planning session token, and broadcasts the update.
/// </summary>
/// <param name="parentId">Id of the planning parent task.</param>
/// <param name="ct">Token passed to context creation and the update statement.</param>
/// <returns>
/// A failed result ("No active planning session.") when the task's phase was not
/// <see cref="PlanningPhase.Active"/> (or the task does not exist); otherwise a successful result.
/// </returns>
public async Task<TransitionResult> FinalizePlanningAsync(string parentId, CancellationToken ct)
{
    // Read the clock once in the application and pass the value into the update, the same
    // way RecoverStaleRunningAsync stamps FinishedAt. Leaving DateTime.UtcNow inside the
    // update expression hands the timestamp to the query provider, which may translate it
    // into a database-side clock call with a different clock source and precision.
    var finalizedAt = DateTime.UtcNow;

    await using var ctx = await _dbFactory.CreateDbContextAsync(ct);
    var affected = await ctx.Tasks
        .Where(t => t.Id == parentId && t.PlanningPhase == PlanningPhase.Active)
        .ExecuteUpdateAsync(s => s
            .SetProperty(t => t.PlanningPhase, PlanningPhase.Finalized)
            .SetProperty(t => t.PlanningFinalizedAt, finalizedAt)
            .SetProperty(t => t.PlanningSessionToken, (string?)null), ct);

    if (affected == 0)
        return new TransitionResult(false, "No active planning session.");

    await _broadcaster.TaskUpdated(parentId);
    return new TransitionResult(true, null);
}
|
|
|
|
/// <summary>
/// Records that a task is blocked by another task by writing
/// <paramref name="predecessorTaskId"/> to its <c>BlockedByTaskId</c>, then broadcasts the update.
/// </summary>
/// <param name="taskId">Id of the task that should wait.</param>
/// <param name="predecessorTaskId">Id of the task it waits for; must differ from <paramref name="taskId"/>.</param>
/// <param name="ct">Token passed to context creation and the update statement.</param>
/// <returns>
/// A failed result when the task would block on itself or when no task with
/// <paramref name="taskId"/> exists; otherwise a successful result.
/// </returns>
public async Task<TransitionResult> BlockOnAsync(string taskId, string predecessorTaskId, CancellationToken ct)
{
    // A task that names itself as predecessor has no other task whose completion could
    // release it (presumably leaving it stuck in the queue), so reject the request before
    // touching the database.
    if (taskId == predecessorTaskId)
        return new TransitionResult(false, "Task cannot be blocked by itself.");

    await using var ctx = await _dbFactory.CreateDbContextAsync(ct);
    var affected = await ctx.Tasks
        .Where(t => t.Id == taskId)
        .ExecuteUpdateAsync(s => s.SetProperty(t => t.BlockedByTaskId, predecessorTaskId), ct);

    if (affected == 0)
        return new TransitionResult(false, "Task not found.");

    await _broadcaster.TaskUpdated(taskId);
    return new TransitionResult(true, null);
}
|
|
|
|
/// <summary>
/// Clears a task's <c>BlockedByTaskId</c>, promotes it from <see cref="TaskStatus.Waiting"/>
/// to <see cref="TaskStatus.Queued"/> if it still uses the legacy layout, then signals the
/// queue waker and broadcasts the update.
/// </summary>
/// <param name="taskId">Id of the task to release.</param>
/// <param name="ct">Token passed to context creation and both update statements.</param>
/// <returns>
/// A failed result ("Task not found.") when no task with that id exists;
/// otherwise a successful result.
/// </returns>
public async Task<TransitionResult> UnblockAsync(string taskId, CancellationToken ct)
{
    await using var db = await _dbFactory.CreateDbContextAsync(ct);

    int released = await db.Tasks
        .Where(task => task.Id == taskId)
        .ExecuteUpdateAsync(
            setters => setters.SetProperty(task => task.BlockedByTaskId, (string?)null),
            ct);

    if (released == 0)
    {
        return new TransitionResult(false, "Task not found.");
    }

    // Legacy bridge: chains created before Slice 4 parked blocked siblings in Waiting, so
    // such a row has to be flipped to Queued once its predecessor is out of the way.
    // Rows in the newer layout are already Status=Queued with BlockedByTaskId set, so the
    // filter below matches nothing for them.
    var legacyWaiting = db.Tasks
        .Where(task => task.Id == taskId)
        .Where(task => task.Status == TaskStatus.Waiting);

    await legacyWaiting.ExecuteUpdateAsync(
        setters => setters.SetProperty(task => task.Status, TaskStatus.Queued),
        ct);

    _waker.Wake();
    await _broadcaster.TaskUpdated(taskId);

    return new TransitionResult(true, null);
}
|
|
|
|
/// <summary>
/// Marks every task that is still <see cref="TaskStatus.Running"/> as
/// <see cref="TaskStatus.Failed"/>, stamping the current UTC time as <c>FinishedAt</c> and
/// a <c>"[stale] "</c>-prefixed reason as <c>Result</c>. No broadcast is sent from here.
/// </summary>
/// <param name="reason">Explanation appended after the <c>"[stale] "</c> prefix.</param>
/// <param name="ct">Token passed to context creation and the update statement.</param>
/// <returns>The number of rows the update reported as changed.</returns>
public async Task<int> RecoverStaleRunningAsync(string reason, CancellationToken ct)
{
    // Both values are computed up front so the update receives plain captured locals.
    DateTime recoveredAt = DateTime.UtcNow;
    string staleResult = $"[stale] {reason}";

    await using var db = await _dbFactory.CreateDbContextAsync(ct);

    var stillRunning = db.Tasks.Where(task => task.Status == TaskStatus.Running);

    int recovered = await stillRunning.ExecuteUpdateAsync(
        setters => setters
            .SetProperty(task => task.Status, TaskStatus.Failed)
            .SetProperty(task => task.FinishedAt, recoveredAt)
            .SetProperty(task => task.Result, staleResult),
        ct);

    return recovered;
}
|
|
|
|
/// <summary>
/// Best-effort follow-up after a task reached a terminal status. If the task has a parent,
/// the planning chain coordinator is told the child finished and the parent is then offered
/// for completion via <c>TaskRepository.TryCompleteParentAsync</c>.
/// </summary>
/// <remarks>
/// Every step uses <see cref="CancellationToken.None"/> and logs a warning instead of
/// throwing, so a failure here never surfaces to the caller whose status write has already
/// been committed.
/// </remarks>
/// <param name="taskId">Id of the task that just became terminal.</param>
/// <param name="finalStatus">The terminal status that was written for the task.</param>
private async Task OnChildTerminalAsync(string taskId, TaskStatus finalStatus)
{
    // Terminal child writes are best-effort and use CancellationToken.None so the
    // task lifecycle is never left partially completed because a caller cancelled.
    string? parentId;
    try
    {
        await using var ctx = await _dbFactory.CreateDbContextAsync(CancellationToken.None);
        parentId = await ctx.Tasks
            .AsNoTracking()
            .Where(t => t.Id == taskId)
            .Select(t => t.ParentTaskId)
            .FirstOrDefaultAsync(CancellationToken.None);
    }
    catch (Exception ex)
    {
        // The lookup is guarded like the steps below: the caller has already persisted the
        // terminal status, so an exception here must not escape and skip its broadcast.
        // Without a parent id we cannot tell whether chain work applies, so stop here.
        _logger.LogWarning(ex, "Parent lookup failed for {TaskId}", taskId);
        return;
    }

    // Tasks without a parent are not part of a chain; nothing further to do.
    if (parentId is null) return;

    try
    {
        await _chain.OnChildFinishedAsync(taskId, finalStatus, CancellationToken.None);
    }
    catch (Exception ex)
    {
        _logger.LogWarning(ex, "PlanningChain advance failed for {TaskId}", taskId);
    }

    // Attempted even when the chain advance failed; the two steps are independent.
    try
    {
        await using var ctx = await _dbFactory.CreateDbContextAsync(CancellationToken.None);
        await new TaskRepository(ctx).TryCompleteParentAsync(parentId, CancellationToken.None);
    }
    catch (Exception ex)
    {
        _logger.LogWarning(ex, "TryCompleteParent failed for {ParentId}", parentId);
    }
}
|
|
}
|