OverrideSlotService dispatches RunAsync before calling StartRunningAsync, so a preflight failure (list not found, worktree setup) can reach MarkFailed while the task is still Queued. The Queued branch of the FailAsync guard is therefore intentional, not dead code.

- Add comment in FailAsync explaining the OverrideSlotService preflight gap
- Add FailAsync_FromQueued_TransitionsToFailed test
- Update CLAUDE.md transition table with the precise rationale

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
450 lines
19 KiB
C#
450 lines
19 KiB
C#
using ClaudeDo.Data;
|
|
using ClaudeDo.Data.Models;
|
|
using ClaudeDo.Data.Repositories;
|
|
using ClaudeDo.Worker.Hub;
|
|
using ClaudeDo.Worker.Planning;
|
|
using ClaudeDo.Worker.Queue;
|
|
using Microsoft.EntityFrameworkCore;
|
|
using TaskStatus = ClaudeDo.Data.Models.TaskStatus;
|
|
|
|
namespace ClaudeDo.Worker.State;
|
|
|
|
/// <summary>
/// Applies task lifecycle transitions (status, planning phase, blockers) against the database.
/// </summary>
/// <remarks>
/// Each transition is one <c>ExecuteUpdateAsync</c> whose <c>Where</c> clause encodes the allowed
/// source state(s). When the guard does not match (wrong state or unknown id), zero rows are
/// affected and a failed <see cref="TransitionResult"/> carrying a reason is returned instead of
/// throwing. Complete/Approve/Fail/Cancel additionally notify the planning chain and try to move
/// the task's parent out of WaitingForChildren.
/// </remarks>
public sealed class TaskStateService : ITaskStateService
{
    private readonly IDbContextFactory<ClaudeDoDbContext> _dbFactory;
    private readonly HubBroadcaster _broadcaster;
    private readonly IQueueWaker _waker;
    private readonly PlanningChainCoordinator _chain;
    private readonly ILogger<TaskStateService> _logger;

    public TaskStateService(
        IDbContextFactory<ClaudeDoDbContext> dbFactory,
        HubBroadcaster broadcaster,
        IQueueWaker waker,
        PlanningChainCoordinator chain,
        ILogger<TaskStateService> logger)
    {
        _dbFactory = dbFactory;
        _broadcaster = broadcaster;
        _waker = waker;
        _chain = chain;
        _logger = logger;
    }

    /// <summary>
    /// Moves a task to <c>Queued</c> and wakes the queue.
    /// </summary>
    /// <remarks>
    /// Refused for draft subtasks (parent has an Active planning session) and for tasks that are
    /// currently Running. Any other current status is accepted as the source state.
    /// </remarks>
    public async Task<TransitionResult> EnqueueAsync(string taskId, CancellationToken ct)
    {
        await using var ctx = await _dbFactory.CreateDbContextAsync(ct);

        if (await IsDraftChildAsync(ctx, taskId, ct))
            return new TransitionResult(false, "Draft subtask: finalize the plan before queuing it.");

        var affected = await ctx.Tasks
            .Where(t => t.Id == taskId && t.Status != TaskStatus.Running)
            .ExecuteUpdateAsync(s => s.SetProperty(t => t.Status, TaskStatus.Queued), ct);

        if (affected == 0)
            return new TransitionResult(false, "Task not found or already running.");

        _waker.Wake();
        await _broadcaster.TaskUpdated(taskId);
        return new TransitionResult(true, null);
    }

    /// <summary>
    /// Marks a task <c>Running</c> and stamps <paramref name="startedAt"/>.
    /// </summary>
    /// <remarks>
    /// Refused for draft subtasks and for tasks already Running; any other status is accepted.
    /// </remarks>
    public async Task<TransitionResult> StartRunningAsync(string taskId, DateTime startedAt, CancellationToken ct)
    {
        await using var ctx = await _dbFactory.CreateDbContextAsync(ct);

        if (await IsDraftChildAsync(ctx, taskId, ct))
            return new TransitionResult(false, "Draft subtask: finalize the plan before running it.");

        var affected = await ctx.Tasks
            .Where(t => t.Id == taskId && t.Status != TaskStatus.Running)
            .ExecuteUpdateAsync(s => s
                .SetProperty(t => t.Status, TaskStatus.Running)
                .SetProperty(t => t.StartedAt, startedAt), ct);

        if (affected == 0)
            return new TransitionResult(false, "Task already running or not found.");

        await _broadcaster.TaskUpdated(taskId);
        return new TransitionResult(true, null);
    }

    /// <summary>
    /// Running → Done. Records <paramref name="finishedAt"/> and <paramref name="result"/>, then runs
    /// the child-terminal hooks (planning chain + parent advance).
    /// </summary>
    public async Task<TransitionResult> CompleteAsync(string taskId, DateTime finishedAt, string? result, CancellationToken ct)
    {
        await using (var ctx = await _dbFactory.CreateDbContextAsync(ct))
        {
            var affected = await ctx.Tasks
                .Where(t => t.Id == taskId && t.Status == TaskStatus.Running)
                .ExecuteUpdateAsync(s => s
                    .SetProperty(t => t.Status, TaskStatus.Done)
                    .SetProperty(t => t.FinishedAt, finishedAt)
                    .SetProperty(t => t.Result, result), ct);

            if (affected == 0)
                return new TransitionResult(false, "Task not running; cannot complete.");
        }

        await OnChildTerminalAsync(taskId, TaskStatus.Done);
        await _broadcaster.TaskUpdated(taskId);
        return new TransitionResult(true, null);
    }

    /// <summary>
    /// Running → WaitingForReview. Records <paramref name="finishedAt"/> and <paramref name="result"/>.
    /// </summary>
    public async Task<TransitionResult> SubmitForReviewAsync(string taskId, DateTime finishedAt, string? result, CancellationToken ct)
    {
        await using var ctx = await _dbFactory.CreateDbContextAsync(ct);
        var affected = await ctx.Tasks
            .Where(t => t.Id == taskId && t.Status == TaskStatus.Running)
            .ExecuteUpdateAsync(s => s
                .SetProperty(t => t.Status, TaskStatus.WaitingForReview)
                .SetProperty(t => t.FinishedAt, finishedAt)
                .SetProperty(t => t.Result, result), ct);

        if (affected == 0)
            return new TransitionResult(false, "Task not running; cannot submit for review.");

        await _broadcaster.TaskUpdated(taskId);
        return new TransitionResult(true, null);
    }

    /// <summary>
    /// Running → WaitingForChildren. Records <paramref name="finishedAt"/> and <paramref name="result"/>.
    /// </summary>
    /// <remarks>
    /// After the transition the parent is re-checked immediately. Children may have reached a
    /// terminal state while this task was still Running. Their terminal hook saw a parent that was
    /// not yet WaitingForChildren and did nothing, so without this re-check the parent would wait
    /// forever.
    /// </remarks>
    public async Task<TransitionResult> SubmitForChildrenAsync(string taskId, DateTime finishedAt, string? result, CancellationToken ct)
    {
        await using (var ctx = await _dbFactory.CreateDbContextAsync(ct))
        {
            var affected = await ctx.Tasks
                .Where(t => t.Id == taskId && t.Status == TaskStatus.Running)
                .ExecuteUpdateAsync(s => s
                    .SetProperty(t => t.Status, TaskStatus.WaitingForChildren)
                    .SetProperty(t => t.FinishedAt, finishedAt)
                    .SetProperty(t => t.Result, result), ct);

            if (affected == 0)
                return new TransitionResult(false, "Task not running; cannot submit for children.");
        }

        await _broadcaster.TaskUpdated(taskId);
        await AdvanceParentBestEffortAsync(taskId);
        return new TransitionResult(true, null);
    }

    /// <summary>
    /// WaitingForReview → Done, stamping FinishedAt with the current UTC time, then runs the
    /// child-terminal hooks.
    /// </summary>
    public async Task<TransitionResult> ApproveReviewAsync(string taskId, CancellationToken ct)
    {
        var now = DateTime.UtcNow;
        await using (var ctx = await _dbFactory.CreateDbContextAsync(ct))
        {
            var affected = await ctx.Tasks
                .Where(t => t.Id == taskId && t.Status == TaskStatus.WaitingForReview)
                .ExecuteUpdateAsync(s => s
                    .SetProperty(t => t.Status, TaskStatus.Done)
                    .SetProperty(t => t.FinishedAt, now), ct);

            if (affected == 0)
                return new TransitionResult(false, "Task is not waiting for review; cannot approve.");
        }

        await OnChildTerminalAsync(taskId, TaskStatus.Done);
        await _broadcaster.TaskUpdated(taskId);
        return new TransitionResult(true, null);
    }

    /// <summary>
    /// WaitingForReview → Queued with reviewer feedback, for a re-run. Clears StartedAt/FinishedAt
    /// and wakes the queue. Blank or whitespace-only feedback is rejected before the database is touched.
    /// </summary>
    public async Task<TransitionResult> RejectToQueueAsync(string taskId, string feedback, CancellationToken ct)
    {
        if (string.IsNullOrWhiteSpace(feedback))
            return new TransitionResult(false, "Feedback is required to reject for re-run.");

        await using var ctx = await _dbFactory.CreateDbContextAsync(ct);
        var affected = await ctx.Tasks
            .Where(t => t.Id == taskId && t.Status == TaskStatus.WaitingForReview)
            .ExecuteUpdateAsync(s => s
                .SetProperty(t => t.Status, TaskStatus.Queued)
                .SetProperty(t => t.ReviewFeedback, feedback)
                .SetProperty(t => t.StartedAt, (DateTime?)null)
                .SetProperty(t => t.FinishedAt, (DateTime?)null), ct);

        if (affected == 0)
            return new TransitionResult(false, "Task is not waiting for review; cannot reject.");

        _waker.Wake();
        await _broadcaster.TaskUpdated(taskId);
        return new TransitionResult(true, null);
    }

    /// <summary>
    /// WaitingForReview → Idle ("park"), clearing any stored review feedback.
    /// </summary>
    public async Task<TransitionResult> RejectToIdleAsync(string taskId, CancellationToken ct)
    {
        await using var ctx = await _dbFactory.CreateDbContextAsync(ct);
        var affected = await ctx.Tasks
            .Where(t => t.Id == taskId && t.Status == TaskStatus.WaitingForReview)
            .ExecuteUpdateAsync(s => s
                .SetProperty(t => t.Status, TaskStatus.Idle)
                .SetProperty(t => t.ReviewFeedback, (string?)null), ct);

        if (affected == 0)
            return new TransitionResult(false, "Task is not waiting for review; cannot park.");

        await _broadcaster.TaskUpdated(taskId);
        return new TransitionResult(true, null);
    }

    /// <summary>
    /// Clears ReviewFeedback regardless of status. Does not broadcast.
    /// </summary>
    public async Task<TransitionResult> ClearReviewFeedbackAsync(string taskId, CancellationToken ct)
    {
        await using var ctx = await _dbFactory.CreateDbContextAsync(ct);
        var affected = await ctx.Tasks
            .Where(t => t.Id == taskId)
            .ExecuteUpdateAsync(s => s.SetProperty(t => t.ReviewFeedback, (string?)null), ct);

        return affected == 0
            ? new TransitionResult(false, "Task not found.")
            : new TransitionResult(true, null);
    }

    /// <summary>
    /// Running or Queued → Failed. Records <paramref name="finishedAt"/> and stores
    /// <paramref name="error"/> in Result, then runs the child-terminal hooks.
    /// </summary>
    public async Task<TransitionResult> FailAsync(string taskId, DateTime finishedAt, string? error, CancellationToken ct)
    {
        await using (var ctx = await _dbFactory.CreateDbContextAsync(ct))
        {
            // Queued is intentional: OverrideSlotService dispatches RunAsync before calling
            // StartRunningAsync, so a preflight failure (list not found, worktree setup) can
            // reach MarkFailed while the task is still Queued in the DB.
            var affected = await ctx.Tasks
                .Where(t => t.Id == taskId &&
                            (t.Status == TaskStatus.Running || t.Status == TaskStatus.Queued))
                .ExecuteUpdateAsync(s => s
                    .SetProperty(t => t.Status, TaskStatus.Failed)
                    .SetProperty(t => t.FinishedAt, finishedAt)
                    .SetProperty(t => t.Result, error), ct);

            if (affected == 0)
                return new TransitionResult(false, "Task not in a failable state (must be Running or Queued).");
        }

        await OnChildTerminalAsync(taskId, TaskStatus.Failed);
        await _broadcaster.TaskUpdated(taskId);
        return new TransitionResult(true, null);
    }

    /// <summary>
    /// Running/Queued/WaitingForReview/WaitingForChildren → Cancelled, stamping
    /// <paramref name="finishedAt"/>, then runs the child-terminal hooks.
    /// </summary>
    /// <remarks>
    /// Only the stored status changes here; this method does not itself stop an in-flight run.
    /// </remarks>
    public async Task<TransitionResult> CancelAsync(string taskId, DateTime finishedAt, CancellationToken ct)
    {
        await using (var ctx = await _dbFactory.CreateDbContextAsync(ct))
        {
            var affected = await ctx.Tasks
                .Where(t => t.Id == taskId &&
                            (t.Status == TaskStatus.Running || t.Status == TaskStatus.Queued
                             || t.Status == TaskStatus.WaitingForReview
                             || t.Status == TaskStatus.WaitingForChildren))
                .ExecuteUpdateAsync(s => s
                    .SetProperty(t => t.Status, TaskStatus.Cancelled)
                    .SetProperty(t => t.FinishedAt, finishedAt), ct);

            if (affected == 0)
                return new TransitionResult(false, "Task not in cancellable state.");
        }

        await OnChildTerminalAsync(taskId, TaskStatus.Cancelled);
        await _broadcaster.TaskUpdated(taskId);
        return new TransitionResult(true, null);
    }

    /// <summary>
    /// Any non-Running status → Idle, clearing StartedAt, FinishedAt and Result.
    /// ReviewFeedback is left untouched.
    /// </summary>
    public async Task<TransitionResult> ResetToIdleAsync(string taskId, CancellationToken ct)
    {
        await using var ctx = await _dbFactory.CreateDbContextAsync(ct);
        var affected = await ctx.Tasks
            .Where(t => t.Id == taskId && t.Status != TaskStatus.Running)
            .ExecuteUpdateAsync(s => s
                .SetProperty(t => t.Status, TaskStatus.Idle)
                .SetProperty(t => t.StartedAt, (DateTime?)null)
                .SetProperty(t => t.FinishedAt, (DateTime?)null)
                .SetProperty(t => t.Result, (string?)null), ct);

        if (affected == 0)
            return new TransitionResult(false, "Task is running; cannot reset.");

        await _broadcaster.TaskUpdated(taskId);
        return new TransitionResult(true, null);
    }

    // Unconditional status write — bypasses transition rules. Used by the UI's
    // "set status freely" affordance; intentionally no guards (caller may strand
    // the runner if used while a task is executing).
    public async Task<TransitionResult> ForceSetStatusAsync(string taskId, TaskStatus status, CancellationToken ct)
    {
        await using var ctx = await _dbFactory.CreateDbContextAsync(ct);
        var affected = await ctx.Tasks
            .Where(t => t.Id == taskId)
            .ExecuteUpdateAsync(s => s.SetProperty(t => t.Status, status), ct);

        if (affected == 0)
            return new TransitionResult(false, "Task not found.");

        if (status == TaskStatus.Queued) _waker.Wake();
        await _broadcaster.TaskUpdated(taskId);
        return new TransitionResult(true, null);
    }

    /// <summary>
    /// Opens a planning session: an Idle task with PlanningPhase.None moves to PlanningPhase.Active.
    /// The task's status is not changed.
    /// </summary>
    public async Task<TransitionResult> StartPlanningAsync(string parentId, CancellationToken ct)
    {
        await using var ctx = await _dbFactory.CreateDbContextAsync(ct);
        var affected = await ctx.Tasks
            .Where(t => t.Id == parentId
                        && t.Status == TaskStatus.Idle
                        && t.PlanningPhase == PlanningPhase.None)
            .ExecuteUpdateAsync(s => s
                .SetProperty(t => t.PlanningPhase, PlanningPhase.Active), ct);

        if (affected == 0)
            return new TransitionResult(false, "Task not in plannable state.");

        await _broadcaster.TaskUpdated(parentId);
        return new TransitionResult(true, null);
    }

    /// <summary>
    /// Closes an Active planning session: PlanningPhase → Finalized, stamps PlanningFinalizedAt,
    /// and clears PlanningSessionToken. Status becomes WaitingForChildren if the task has any
    /// children, otherwise WaitingForReview.
    /// </summary>
    public async Task<TransitionResult> FinalizePlanningAsync(string parentId, CancellationToken ct)
    {
        // Captured client-side, as in ApproveReviewAsync. Inside the ExecuteUpdate expression,
        // DateTime.UtcNow would be translated to a database-side function instead.
        var now = DateTime.UtcNow;
        bool hasChildren;

        await using (var ctx = await _dbFactory.CreateDbContextAsync(ct))
        {
            hasChildren = await ctx.Tasks.AnyAsync(t => t.ParentTaskId == parentId, ct);
            var newStatus = hasChildren ? TaskStatus.WaitingForChildren : TaskStatus.WaitingForReview;

            var affected = await ctx.Tasks
                .Where(t => t.Id == parentId && t.PlanningPhase == PlanningPhase.Active)
                .ExecuteUpdateAsync(s => s
                    .SetProperty(t => t.PlanningPhase, PlanningPhase.Finalized)
                    .SetProperty(t => t.PlanningFinalizedAt, now)
                    .SetProperty(t => t.PlanningSessionToken, (string?)null)
                    .SetProperty(t => t.Status, newStatus), ct);

            if (affected == 0)
                return new TransitionResult(false, "No active planning session.");
        }

        await _broadcaster.TaskUpdated(parentId);

        // If every child is already terminal, no later child transition will advance this
        // parent, so check once now. This is a no-op otherwise.
        if (hasChildren)
            await AdvanceParentBestEffortAsync(parentId);

        return new TransitionResult(true, null);
    }

    /// <summary>
    /// Records <paramref name="predecessorTaskId"/> as the task's blocker. Status is not changed
    /// and the predecessor id is not validated here.
    /// </summary>
    public async Task<TransitionResult> BlockOnAsync(string taskId, string predecessorTaskId, CancellationToken ct)
    {
        await using var ctx = await _dbFactory.CreateDbContextAsync(ct);
        var affected = await ctx.Tasks
            .Where(t => t.Id == taskId)
            .ExecuteUpdateAsync(s => s.SetProperty(t => t.BlockedByTaskId, predecessorTaskId), ct);

        if (affected == 0)
            return new TransitionResult(false, "Task not found.");

        await _broadcaster.TaskUpdated(taskId);
        return new TransitionResult(true, null);
    }

    /// <summary>
    /// Clears the task's blocker and wakes the queue.
    /// </summary>
    public async Task<TransitionResult> UnblockAsync(string taskId, CancellationToken ct)
    {
        await using var ctx = await _dbFactory.CreateDbContextAsync(ct);
        var affected = await ctx.Tasks
            .Where(t => t.Id == taskId)
            .ExecuteUpdateAsync(s => s.SetProperty(t => t.BlockedByTaskId, (string?)null), ct);

        if (affected == 0)
            return new TransitionResult(false, "Task not found.");

        _waker.Wake();
        await _broadcaster.TaskUpdated(taskId);
        return new TransitionResult(true, null);
    }

    /// <summary>
    /// Fails every task currently in Running. Each one gets FinishedAt = now and
    /// Result = "[stale] " + <paramref name="reason"/>.
    /// </summary>
    /// <returns>The number of tasks moved to Failed.</returns>
    /// <remarks>
    /// The parents of the recovered tasks are then re-checked, so a WaitingForChildren parent
    /// whose last open child was stale does not stay stuck. The planning-chain hook is not invoked
    /// and the recovered tasks themselves are not broadcast individually.
    /// NOTE(review): presumably called when no runner is live (e.g. at worker startup) — confirm with callers.
    /// </remarks>
    public async Task<int> RecoverStaleRunningAsync(string reason, CancellationToken ct)
    {
        var resultText = "[stale] " + reason;
        var now = DateTime.UtcNow;
        List<string> parentIds;
        int recovered;

        await using (var ctx = await _dbFactory.CreateDbContextAsync(ct))
        {
            parentIds = await ctx.Tasks.AsNoTracking()
                .Where(t => t.Status == TaskStatus.Running && t.ParentTaskId != null)
                .Select(t => t.ParentTaskId!)
                .Distinct()
                .ToListAsync(ct);

            recovered = await ctx.Tasks
                .Where(t => t.Status == TaskStatus.Running)
                .ExecuteUpdateAsync(s => s
                    .SetProperty(t => t.Status, TaskStatus.Failed)
                    .SetProperty(t => t.FinishedAt, now)
                    .SetProperty(t => t.Result, resultText), ct);
        }

        foreach (var parentId in parentIds)
            await AdvanceParentBestEffortAsync(parentId);

        return recovered;
    }

    // A subtask is "draft" only while its planning parent has an open (Active) session.
    // Improvement children whose parent has PlanningPhase.None are not drafts and may be
    // queued freely. Standalone tasks (no parent) are never draft.
    private static async Task<bool> IsDraftChildAsync(ClaudeDoDbContext ctx, string taskId, CancellationToken ct)
    {
        var parentId = await ctx.Tasks.AsNoTracking()
            .Where(t => t.Id == taskId)
            .Select(t => t.ParentTaskId)
            .FirstOrDefaultAsync(ct);
        if (parentId is null) return false;

        return await ctx.Tasks.AsNoTracking()
            .AnyAsync(p => p.Id == parentId && p.PlanningPhase == PlanningPhase.Active, ct);
    }

    // Runs after a task reaches Done/Failed/Cancelled: notifies the planning chain, then tries
    // to advance the parent. Both steps are best-effort (logged, never thrown) and use
    // CancellationToken.None so the task lifecycle is never left partially completed because
    // a caller cancelled.
    private async Task OnChildTerminalAsync(string taskId, TaskStatus finalStatus)
    {
        string? parentId;
        await using (var ctx = await _dbFactory.CreateDbContextAsync(CancellationToken.None))
        {
            parentId = await ctx.Tasks
                .AsNoTracking()
                .Where(t => t.Id == taskId)
                .Select(t => t.ParentTaskId)
                .FirstOrDefaultAsync(CancellationToken.None);
        }
        if (parentId is null) return;

        try
        {
            await _chain.OnChildFinishedAsync(taskId, finalStatus, CancellationToken.None);
        }
        catch (Exception ex)
        {
            _logger.LogWarning(ex, "PlanningChain advance failed for {TaskId}", taskId);
        }

        await AdvanceParentBestEffortAsync(parentId);
    }

    // Best-effort wrapper around TryAdvanceParentAsync. Failures are logged and swallowed so a
    // parent-advance problem never surfaces as a failure of the caller's committed transition.
    private async Task AdvanceParentBestEffortAsync(string parentId)
    {
        try
        {
            await TryAdvanceParentAsync(parentId);
        }
        catch (Exception ex)
        {
            _logger.LogWarning(ex, "TryAdvanceParent failed for {ParentId}", parentId);
        }
    }

    // Any parent (planning or improvement) sitting in WaitingForChildren surfaces for review
    // once every child is terminal (Done/Failed/Cancelled). A failed or cancelled child does
    // not wedge the parent — it is flagged on the result.
    private async Task TryAdvanceParentAsync(string parentId)
    {
        string? parentResult;
        List<TaskStatus> childStatuses;
        await using (var ctx = await _dbFactory.CreateDbContextAsync(CancellationToken.None))
        {
            var parent = await ctx.Tasks.AsNoTracking()
                .FirstOrDefaultAsync(t => t.Id == parentId, CancellationToken.None);
            if (parent is null || parent.Status != TaskStatus.WaitingForChildren) return;
            parentResult = parent.Result;
            childStatuses = await ctx.Tasks
                .Where(t => t.ParentTaskId == parentId)
                .Select(t => t.Status)
                .ToListAsync(CancellationToken.None);
        }

        if (childStatuses.Count == 0) return;

        bool allTerminal = childStatuses.All(s =>
            s == TaskStatus.Done || s == TaskStatus.Failed || s == TaskStatus.Cancelled);
        if (!allTerminal) return;

        int failed = childStatuses.Count(s => s == TaskStatus.Failed);
        int cancelled = childStatuses.Count(s => s == TaskStatus.Cancelled);
        var newResult = parentResult;
        if (failed + cancelled > 0)
        {
            var note = $"⚠ Children: {failed} failed, {cancelled} cancelled.";
            newResult = string.IsNullOrWhiteSpace(parentResult) ? note : $"{parentResult}\n\n{note}";
        }

        await using var writeCtx = await _dbFactory.CreateDbContextAsync(CancellationToken.None);
        var advanced = await writeCtx.Tasks
            .Where(t => t.Id == parentId && t.Status == TaskStatus.WaitingForChildren)
            .ExecuteUpdateAsync(s => s
                .SetProperty(t => t.Status, TaskStatus.WaitingForReview)
                .SetProperty(t => t.Result, newResult), CancellationToken.None);

        // Concurrent child completions can both reach this point. The status guard lets only one
        // of them apply the update, and only that one broadcasts.
        if (advanced > 0)
            await _broadcaster.TaskUpdated(parentId);
    }
}
|