Files
ClaudeDo/src/ClaudeDo.Worker/State/TaskStateService.cs
2026-06-04 15:38:15 +02:00

397 lines
16 KiB
C#

using ClaudeDo.Data;
using ClaudeDo.Data.Models;
using ClaudeDo.Data.Repositories;
using ClaudeDo.Worker.Hub;
using ClaudeDo.Worker.Planning;
using ClaudeDo.Worker.Queue;
using Microsoft.EntityFrameworkCore;
using TaskStatus = ClaudeDo.Data.Models.TaskStatus;
namespace ClaudeDo.Worker.State;
/// <summary>
/// Writes task status, planning-phase and blocking changes to the task table, then
/// broadcasts <c>TaskUpdated</c> through <see cref="HubBroadcaster"/> and, where a task
/// may have become runnable, wakes the queue through <see cref="IQueueWaker"/>.
/// </summary>
/// <remarks>
/// Transitions are written with a filtered <c>ExecuteUpdateAsync</c>: the permitted source
/// states are part of the query filter, so zero affected rows means the transition was
/// rejected (the task does not exist or is in a state the filter excludes).
/// </remarks>
public sealed class TaskStateService : ITaskStateService
{
    private readonly IDbContextFactory<ClaudeDoDbContext> _dbFactory;
    private readonly HubBroadcaster _broadcaster;
    private readonly IQueueWaker _waker;
    private readonly PlanningChainCoordinator _chain;
    private readonly ILogger<TaskStateService> _logger;

    /// <summary>Creates the service from its injected collaborators.</summary>
    /// <param name="dbFactory">Factory used to open a short-lived context per operation.</param>
    /// <param name="broadcaster">Receives a <c>TaskUpdated</c> call after successful changes.</param>
    /// <param name="waker">Woken after changes that put or release work for the queue.</param>
    /// <param name="chain">Informed when a subtask reaches a terminal status.</param>
    /// <param name="logger">Receives warnings from the best-effort parent bookkeeping.</param>
    public TaskStateService(
        IDbContextFactory<ClaudeDoDbContext> dbFactory,
        HubBroadcaster broadcaster,
        IQueueWaker waker,
        PlanningChainCoordinator chain,
        ILogger<TaskStateService> logger)
    {
        _dbFactory = dbFactory;
        _broadcaster = broadcaster;
        _waker = waker;
        _chain = chain;
        _logger = logger;
    }

    /// <summary>
    /// Sets the task to <c>Queued</c> unless it is currently <c>Running</c> or is a draft
    /// subtask, then wakes the queue and broadcasts the update.
    /// </summary>
    /// <param name="taskId">Id of the task to queue.</param>
    /// <param name="ct">Cancels the database work.</param>
    /// <returns>A successful result, or a failed result carrying the rejection reason.</returns>
    public async Task<TransitionResult> EnqueueAsync(string taskId, CancellationToken ct)
    {
        await using var ctx = await _dbFactory.CreateDbContextAsync(ct);

        if (await IsDraftChildAsync(ctx, taskId, ct))
        {
            return Rejected("Draft subtask: finalize the plan before queuing it.");
        }

        var affected = await ctx.Tasks
            .Where(t => t.Id == taskId && t.Status != TaskStatus.Running)
            .ExecuteUpdateAsync(s => s.SetProperty(t => t.Status, TaskStatus.Queued), ct);
        if (affected == 0)
        {
            return Rejected("Task not found or already running.");
        }

        _waker.Wake();
        await _broadcaster.TaskUpdated(taskId);
        return Ok();
    }

    /// <summary>
    /// Sets the task to <c>Running</c> and records <paramref name="startedAt"/>, unless it is
    /// already <c>Running</c> or is a draft subtask.
    /// </summary>
    /// <param name="taskId">Id of the task being started.</param>
    /// <param name="startedAt">Timestamp stored in <c>StartedAt</c>.</param>
    /// <param name="ct">Cancels the database work.</param>
    /// <returns>A successful result, or a failed result carrying the rejection reason.</returns>
    public async Task<TransitionResult> StartRunningAsync(string taskId, DateTime startedAt, CancellationToken ct)
    {
        await using var ctx = await _dbFactory.CreateDbContextAsync(ct);

        if (await IsDraftChildAsync(ctx, taskId, ct))
        {
            return Rejected("Draft subtask: finalize the plan before running it.");
        }

        var affected = await ctx.Tasks
            .Where(t => t.Id == taskId && t.Status != TaskStatus.Running)
            .ExecuteUpdateAsync(s => s
                .SetProperty(t => t.Status, TaskStatus.Running)
                .SetProperty(t => t.StartedAt, startedAt), ct);
        if (affected == 0)
        {
            return Rejected("Task already running or not found.");
        }

        await _broadcaster.TaskUpdated(taskId);
        return Ok();
    }

    /// <summary>
    /// Moves a <c>Running</c> task to <c>Done</c>, storing the finish time and result, then
    /// runs the parent bookkeeping and broadcasts the update.
    /// </summary>
    /// <param name="taskId">Id of the task that finished.</param>
    /// <param name="finishedAt">Timestamp stored in <c>FinishedAt</c>.</param>
    /// <param name="result">Text stored in <c>Result</c>; may be null.</param>
    /// <param name="ct">Cancels the status write only; the parent bookkeeping ignores it.</param>
    /// <returns>A successful result, or a failed result when the task was not running.</returns>
    public async Task<TransitionResult> CompleteAsync(string taskId, DateTime finishedAt, string? result, CancellationToken ct)
    {
        // Scoped so the context is disposed before the parent bookkeeping opens its own.
        await using (var ctx = await _dbFactory.CreateDbContextAsync(ct))
        {
            var affected = await ctx.Tasks
                .Where(t => t.Id == taskId && t.Status == TaskStatus.Running)
                .ExecuteUpdateAsync(s => s
                    .SetProperty(t => t.Status, TaskStatus.Done)
                    .SetProperty(t => t.FinishedAt, finishedAt)
                    .SetProperty(t => t.Result, result), ct);
            if (affected == 0)
            {
                return Rejected("Task not running; cannot complete.");
            }
        }

        await OnChildTerminalAsync(taskId, TaskStatus.Done);
        await _broadcaster.TaskUpdated(taskId);
        return Ok();
    }

    /// <summary>
    /// Moves a <c>Running</c> task to <c>WaitingForReview</c>, storing the finish time and result.
    /// </summary>
    /// <param name="taskId">Id of the task that finished its run.</param>
    /// <param name="finishedAt">Timestamp stored in <c>FinishedAt</c>.</param>
    /// <param name="result">Text stored in <c>Result</c>; may be null.</param>
    /// <param name="ct">Cancels the database work.</param>
    /// <returns>A successful result, or a failed result when the task was not running.</returns>
    public async Task<TransitionResult> SubmitForReviewAsync(string taskId, DateTime finishedAt, string? result, CancellationToken ct)
    {
        await using var ctx = await _dbFactory.CreateDbContextAsync(ct);

        var affected = await ctx.Tasks
            .Where(t => t.Id == taskId && t.Status == TaskStatus.Running)
            .ExecuteUpdateAsync(s => s
                .SetProperty(t => t.Status, TaskStatus.WaitingForReview)
                .SetProperty(t => t.FinishedAt, finishedAt)
                .SetProperty(t => t.Result, result), ct);
        if (affected == 0)
        {
            return Rejected("Task not running; cannot submit for review.");
        }

        await _broadcaster.TaskUpdated(taskId);
        return Ok();
    }

    /// <summary>
    /// Moves a <c>Running</c> task to <c>WaitingForChildren</c>, storing the finish time and result.
    /// </summary>
    /// <param name="taskId">Id of the task that finished its run.</param>
    /// <param name="finishedAt">Timestamp stored in <c>FinishedAt</c>.</param>
    /// <param name="result">Text stored in <c>Result</c>; may be null.</param>
    /// <param name="ct">Cancels the database work.</param>
    /// <returns>A successful result, or a failed result when the task was not running.</returns>
    public async Task<TransitionResult> SubmitForChildrenAsync(string taskId, DateTime finishedAt, string? result, CancellationToken ct)
    {
        await using var ctx = await _dbFactory.CreateDbContextAsync(ct);

        var affected = await ctx.Tasks
            .Where(t => t.Id == taskId && t.Status == TaskStatus.Running)
            .ExecuteUpdateAsync(s => s
                .SetProperty(t => t.Status, TaskStatus.WaitingForChildren)
                .SetProperty(t => t.FinishedAt, finishedAt)
                .SetProperty(t => t.Result, result), ct);
        if (affected == 0)
        {
            return Rejected("Task not running; cannot submit for children.");
        }

        await _broadcaster.TaskUpdated(taskId);
        return Ok();
    }

    /// <summary>
    /// Moves a task from <c>WaitingForReview</c> to <c>Done</c>, then runs the parent
    /// bookkeeping and broadcasts the update.
    /// </summary>
    /// <param name="taskId">Id of the reviewed task.</param>
    /// <param name="ct">Cancels the status write only; the parent bookkeeping ignores it.</param>
    /// <returns>A successful result, or a failed result when the task was not waiting for review.</returns>
    public async Task<TransitionResult> ApproveReviewAsync(string taskId, CancellationToken ct)
    {
        // Scoped so the context is disposed before the parent bookkeeping opens its own.
        await using (var ctx = await _dbFactory.CreateDbContextAsync(ct))
        {
            var affected = await ctx.Tasks
                .Where(t => t.Id == taskId && t.Status == TaskStatus.WaitingForReview)
                .ExecuteUpdateAsync(s => s.SetProperty(t => t.Status, TaskStatus.Done), ct);
            if (affected == 0)
            {
                return Rejected("Task is not waiting for review; cannot approve.");
            }
        }

        await OnChildTerminalAsync(taskId, TaskStatus.Done);
        await _broadcaster.TaskUpdated(taskId);
        return Ok();
    }

    /// <summary>
    /// Sends a task from <c>WaitingForReview</c> back to <c>Queued</c>, storing the reviewer
    /// feedback and clearing the start/finish timestamps, then wakes the queue.
    /// </summary>
    /// <param name="taskId">Id of the reviewed task.</param>
    /// <param name="feedback">Required, non-blank feedback stored in <c>ReviewFeedback</c>.</param>
    /// <param name="ct">Cancels the database work.</param>
    /// <returns>
    /// A successful result, or a failed result when the feedback is blank or the task was not
    /// waiting for review.
    /// </returns>
    public async Task<TransitionResult> RejectToQueueAsync(string taskId, string feedback, CancellationToken ct)
    {
        // Validate before opening a context: a blank rejection never touches the database.
        if (string.IsNullOrWhiteSpace(feedback))
        {
            return Rejected("Feedback is required to reject for re-run.");
        }

        await using var ctx = await _dbFactory.CreateDbContextAsync(ct);

        var affected = await ctx.Tasks
            .Where(t => t.Id == taskId && t.Status == TaskStatus.WaitingForReview)
            .ExecuteUpdateAsync(s => s
                .SetProperty(t => t.Status, TaskStatus.Queued)
                .SetProperty(t => t.ReviewFeedback, feedback)
                .SetProperty(t => t.StartedAt, (DateTime?)null)
                .SetProperty(t => t.FinishedAt, (DateTime?)null), ct);
        if (affected == 0)
        {
            return Rejected("Task is not waiting for review; cannot reject.");
        }

        _waker.Wake();
        await _broadcaster.TaskUpdated(taskId);
        return Ok();
    }

    /// <summary>
    /// Parks a task from <c>WaitingForReview</c> in <c>Idle</c> and clears any stored
    /// review feedback.
    /// </summary>
    /// <param name="taskId">Id of the reviewed task.</param>
    /// <param name="ct">Cancels the database work.</param>
    /// <returns>A successful result, or a failed result when the task was not waiting for review.</returns>
    public async Task<TransitionResult> RejectToIdleAsync(string taskId, CancellationToken ct)
    {
        await using var ctx = await _dbFactory.CreateDbContextAsync(ct);

        var affected = await ctx.Tasks
            .Where(t => t.Id == taskId && t.Status == TaskStatus.WaitingForReview)
            .ExecuteUpdateAsync(s => s
                .SetProperty(t => t.Status, TaskStatus.Idle)
                .SetProperty(t => t.ReviewFeedback, (string?)null), ct);
        if (affected == 0)
        {
            return Rejected("Task is not waiting for review; cannot park.");
        }

        await _broadcaster.TaskUpdated(taskId);
        return Ok();
    }

    /// <summary>Clears <c>ReviewFeedback</c> on the task regardless of its status.</summary>
    /// <param name="taskId">Id of the task to update.</param>
    /// <param name="ct">Cancels the database work.</param>
    /// <returns>A successful result, or a failed result when no task has that id.</returns>
    public async Task<TransitionResult> ClearReviewFeedbackAsync(string taskId, CancellationToken ct)
    {
        await using var ctx = await _dbFactory.CreateDbContextAsync(ct);

        // NOTE(review): unlike the other mutators this does not broadcast TaskUpdated;
        // kept as-is — confirm that is intentional.
        var affected = await ctx.Tasks
            .Where(t => t.Id == taskId)
            .ExecuteUpdateAsync(s => s.SetProperty(t => t.ReviewFeedback, (string?)null), ct);

        return affected == 0
            ? Rejected("Task not found.")
            : Ok();
    }

    /// <summary>
    /// Marks any task that is not <c>Done</c> as <c>Failed</c>, storing the finish time and
    /// the error text in <c>Result</c>, then runs the parent bookkeeping and broadcasts.
    /// </summary>
    /// <param name="taskId">Id of the failed task.</param>
    /// <param name="finishedAt">Timestamp stored in <c>FinishedAt</c>.</param>
    /// <param name="error">Error text stored in <c>Result</c>; may be null.</param>
    /// <param name="ct">Cancels the status write only; the parent bookkeeping ignores it.</param>
    /// <returns>
    /// A successful result, or a failed result when no row matched. The rejection message
    /// says "already done", but a missing task produces the same result.
    /// </returns>
    public async Task<TransitionResult> FailAsync(string taskId, DateTime finishedAt, string? error, CancellationToken ct)
    {
        // Scoped so the context is disposed before the parent bookkeeping opens its own.
        await using (var ctx = await _dbFactory.CreateDbContextAsync(ct))
        {
            var affected = await ctx.Tasks
                .Where(t => t.Id == taskId && t.Status != TaskStatus.Done)
                .ExecuteUpdateAsync(s => s
                    .SetProperty(t => t.Status, TaskStatus.Failed)
                    .SetProperty(t => t.FinishedAt, finishedAt)
                    .SetProperty(t => t.Result, error), ct);
            if (affected == 0)
            {
                return Rejected("Task already done; cannot fail.");
            }
        }

        await OnChildTerminalAsync(taskId, TaskStatus.Failed);
        await _broadcaster.TaskUpdated(taskId);
        return Ok();
    }

    /// <summary>
    /// Marks a <c>Running</c>, <c>Queued</c> or <c>WaitingForReview</c> task as
    /// <c>Cancelled</c>, then runs the parent bookkeeping and broadcasts the update.
    /// </summary>
    /// <param name="taskId">Id of the task to cancel.</param>
    /// <param name="finishedAt">Timestamp stored in <c>FinishedAt</c>.</param>
    /// <param name="ct">Cancels the status write only; the parent bookkeeping ignores it.</param>
    /// <returns>A successful result, or a failed result when the task is in any other state.</returns>
    public async Task<TransitionResult> CancelAsync(string taskId, DateTime finishedAt, CancellationToken ct)
    {
        // Scoped so the context is disposed before the parent bookkeeping opens its own.
        await using (var ctx = await _dbFactory.CreateDbContextAsync(ct))
        {
            var affected = await ctx.Tasks
                .Where(t => t.Id == taskId &&
                    (t.Status == TaskStatus.Running || t.Status == TaskStatus.Queued
                        || t.Status == TaskStatus.WaitingForReview))
                .ExecuteUpdateAsync(s => s
                    .SetProperty(t => t.Status, TaskStatus.Cancelled)
                    .SetProperty(t => t.FinishedAt, finishedAt), ct);
            if (affected == 0)
            {
                return Rejected("Task not in cancellable state.");
            }
        }

        await OnChildTerminalAsync(taskId, TaskStatus.Cancelled);
        await _broadcaster.TaskUpdated(taskId);
        return Ok();
    }

    /// <summary>
    /// Returns any task that is not <c>Running</c> to <c>Idle</c> and clears its start time,
    /// finish time and result.
    /// </summary>
    /// <param name="taskId">Id of the task to reset.</param>
    /// <param name="ct">Cancels the database work.</param>
    /// <returns>
    /// A successful result, or a failed result when no row matched. The rejection message
    /// says "is running", but a missing task produces the same result.
    /// </returns>
    public async Task<TransitionResult> ResetToIdleAsync(string taskId, CancellationToken ct)
    {
        await using var ctx = await _dbFactory.CreateDbContextAsync(ct);

        var affected = await ctx.Tasks
            .Where(t => t.Id == taskId && t.Status != TaskStatus.Running)
            .ExecuteUpdateAsync(s => s
                .SetProperty(t => t.Status, TaskStatus.Idle)
                .SetProperty(t => t.StartedAt, (DateTime?)null)
                .SetProperty(t => t.FinishedAt, (DateTime?)null)
                .SetProperty(t => t.Result, (string?)null), ct);
        if (affected == 0)
        {
            return Rejected("Task is running; cannot reset.");
        }

        await _broadcaster.TaskUpdated(taskId);
        return Ok();
    }

    /// <summary>
    /// Writes <paramref name="status"/> unconditionally, bypassing every transition rule.
    /// </summary>
    /// <remarks>
    /// Used by the UI's "set status freely" affordance; intentionally no guards (the caller
    /// may strand the runner if this is used while a task is executing). The queue is woken
    /// only when the new status is <c>Queued</c>.
    /// </remarks>
    /// <param name="taskId">Id of the task to update.</param>
    /// <param name="status">Status to store.</param>
    /// <param name="ct">Cancels the database work.</param>
    /// <returns>A successful result, or a failed result when no task has that id.</returns>
    public async Task<TransitionResult> ForceSetStatusAsync(string taskId, TaskStatus status, CancellationToken ct)
    {
        await using var ctx = await _dbFactory.CreateDbContextAsync(ct);

        var affected = await ctx.Tasks
            .Where(t => t.Id == taskId)
            .ExecuteUpdateAsync(s => s.SetProperty(t => t.Status, status), ct);
        if (affected == 0)
        {
            return Rejected("Task not found.");
        }

        if (status == TaskStatus.Queued)
        {
            _waker.Wake();
        }

        await _broadcaster.TaskUpdated(taskId);
        return Ok();
    }

    /// <summary>
    /// Sets <c>PlanningPhase</c> to <c>Active</c> on a task that is <c>Idle</c> and has no
    /// planning phase yet.
    /// </summary>
    /// <param name="parentId">Id of the task that becomes the planning parent.</param>
    /// <param name="ct">Cancels the database work.</param>
    /// <returns>A successful result, or a failed result when the task is not plannable.</returns>
    public async Task<TransitionResult> StartPlanningAsync(string parentId, CancellationToken ct)
    {
        await using var ctx = await _dbFactory.CreateDbContextAsync(ct);

        var affected = await ctx.Tasks
            .Where(t => t.Id == parentId
                && t.Status == TaskStatus.Idle
                && t.PlanningPhase == PlanningPhase.None)
            .ExecuteUpdateAsync(s => s
                .SetProperty(t => t.PlanningPhase, PlanningPhase.Active), ct);
        if (affected == 0)
        {
            return Rejected("Task not in plannable state.");
        }

        await _broadcaster.TaskUpdated(parentId);
        return Ok();
    }

    /// <summary>
    /// Moves a task whose planning is <c>Active</c> to <c>Finalized</c>, stamping
    /// <c>PlanningFinalizedAt</c> with the current UTC time and clearing the session token.
    /// </summary>
    /// <param name="parentId">Id of the planning parent.</param>
    /// <param name="ct">Cancels the database work.</param>
    /// <returns>A successful result, or a failed result when no planning session is active.</returns>
    public async Task<TransitionResult> FinalizePlanningAsync(string parentId, CancellationToken ct)
    {
        await using var ctx = await _dbFactory.CreateDbContextAsync(ct);

        var affected = await ctx.Tasks
            .Where(t => t.Id == parentId && t.PlanningPhase == PlanningPhase.Active)
            .ExecuteUpdateAsync(s => s
                .SetProperty(t => t.PlanningPhase, PlanningPhase.Finalized)
                .SetProperty(t => t.PlanningFinalizedAt, DateTime.UtcNow)
                .SetProperty(t => t.PlanningSessionToken, (string?)null), ct);
        if (affected == 0)
        {
            return Rejected("No active planning session.");
        }

        await _broadcaster.TaskUpdated(parentId);
        return Ok();
    }

    /// <summary>Records <paramref name="predecessorTaskId"/> as the task's blocker.</summary>
    /// <param name="taskId">Id of the task to block.</param>
    /// <param name="predecessorTaskId">Id stored in <c>BlockedByTaskId</c>.</param>
    /// <param name="ct">Cancels the database work.</param>
    /// <returns>
    /// A successful result, or a failed result when the task would block on itself or no
    /// task has that id.
    /// </returns>
    public async Task<TransitionResult> BlockOnAsync(string taskId, string predecessorTaskId, CancellationToken ct)
    {
        // A task blocked by itself has no predecessor that could ever release it, so the
        // self-reference is rejected before anything is written.
        if (taskId == predecessorTaskId)
        {
            return Rejected("Task cannot be blocked on itself.");
        }

        await using var ctx = await _dbFactory.CreateDbContextAsync(ct);

        var affected = await ctx.Tasks
            .Where(t => t.Id == taskId)
            .ExecuteUpdateAsync(s => s.SetProperty(t => t.BlockedByTaskId, predecessorTaskId), ct);
        if (affected == 0)
        {
            return Rejected("Task not found.");
        }

        await _broadcaster.TaskUpdated(taskId);
        return Ok();
    }

    /// <summary>Clears the task's blocker, wakes the queue and broadcasts the update.</summary>
    /// <param name="taskId">Id of the task to unblock.</param>
    /// <param name="ct">Cancels the database work.</param>
    /// <returns>A successful result, or a failed result when no task has that id.</returns>
    public async Task<TransitionResult> UnblockAsync(string taskId, CancellationToken ct)
    {
        await using var ctx = await _dbFactory.CreateDbContextAsync(ct);

        var affected = await ctx.Tasks
            .Where(t => t.Id == taskId)
            .ExecuteUpdateAsync(s => s.SetProperty(t => t.BlockedByTaskId, (string?)null), ct);
        if (affected == 0)
        {
            return Rejected("Task not found.");
        }

        _waker.Wake();
        await _broadcaster.TaskUpdated(taskId);
        return Ok();
    }

    /// <summary>
    /// Marks every <c>Running</c> task as <c>Failed</c> in one statement, stamping the current
    /// UTC time and a result of <c>"[stale] "</c> followed by <paramref name="reason"/>.
    /// </summary>
    /// <param name="reason">Explanation appended to the stored result text.</param>
    /// <param name="ct">Cancels the database work.</param>
    /// <returns>The number of tasks that were marked as failed.</returns>
    public async Task<int> RecoverStaleRunningAsync(string reason, CancellationToken ct)
    {
        var resultText = "[stale] " + reason;
        var now = DateTime.UtcNow;

        await using var ctx = await _dbFactory.CreateDbContextAsync(ct);

        // NOTE(review): this bulk write neither broadcasts TaskUpdated nor runs the parent
        // bookkeeping that FailAsync performs; kept as-is — confirm callers handle that.
        return await ctx.Tasks
            .Where(t => t.Status == TaskStatus.Running)
            .ExecuteUpdateAsync(s => s
                .SetProperty(t => t.Status, TaskStatus.Failed)
                .SetProperty(t => t.FinishedAt, now)
                .SetProperty(t => t.Result, resultText), ct);
    }

    /// <summary>
    /// Tells whether the task is a subtask whose parent has not finalized planning.
    /// </summary>
    /// <remarks>
    /// A subtask is "draft" until its planning parent is finalized. Draft subtasks must not be
    /// queued or run by any path (UI, queue, RunNow, MCP). Standalone tasks are never draft.
    /// A task id that does not exist yields <c>false</c>; a subtask whose parent row cannot
    /// be found yields <c>true</c>.
    /// </remarks>
    /// <param name="ctx">Open context used for both read-only lookups.</param>
    /// <param name="taskId">Id of the task to inspect.</param>
    /// <param name="ct">Cancels the database work.</param>
    /// <returns><c>true</c> when the task has a parent that is not in the finalized phase.</returns>
    private static async Task<bool> IsDraftChildAsync(ClaudeDoDbContext ctx, string taskId, CancellationToken ct)
    {
        var parentId = await ctx.Tasks.AsNoTracking()
            .Where(t => t.Id == taskId)
            .Select(t => t.ParentTaskId)
            .FirstOrDefaultAsync(ct);
        if (parentId is null)
        {
            return false;
        }

        var parentFinalized = await ctx.Tasks.AsNoTracking()
            .AnyAsync(p => p.Id == parentId && p.PlanningPhase == PlanningPhase.Finalized, ct);
        return !parentFinalized;
    }

    /// <summary>
    /// Best-effort bookkeeping after a task reached a terminal status: when the task has a
    /// parent, notifies the planning chain and then asks the repository to try completing
    /// the parent.
    /// </summary>
    /// <remarks>
    /// Every step uses <see cref="CancellationToken.None"/> so the task lifecycle is never
    /// left partially completed because a caller cancelled. Failures in any step are logged
    /// as warnings and never propagate: the status row is already written by the time this
    /// runs, and the caller still has to broadcast the update.
    /// </remarks>
    /// <param name="taskId">Id of the task that just became terminal.</param>
    /// <param name="finalStatus">Terminal status that was written for the task.</param>
    private async Task OnChildTerminalAsync(string taskId, TaskStatus finalStatus)
    {
        string? parentId;
        try
        {
            await using var ctx = await _dbFactory.CreateDbContextAsync(CancellationToken.None);
            parentId = await ctx.Tasks
                .AsNoTracking()
                .Where(t => t.Id == taskId)
                .Select(t => t.ParentTaskId)
                .FirstOrDefaultAsync(CancellationToken.None);
        }
        catch (Exception ex)
        {
            // Without the parent id neither follow-up step can run; give up quietly rather
            // than failing a transition that has already been persisted.
            _logger.LogWarning(ex, "Parent lookup failed for {TaskId}", taskId);
            return;
        }

        if (parentId is null)
        {
            return;
        }

        try
        {
            await _chain.OnChildFinishedAsync(taskId, finalStatus, CancellationToken.None);
        }
        catch (Exception ex)
        {
            _logger.LogWarning(ex, "PlanningChain advance failed for {TaskId}", taskId);
        }

        try
        {
            await using var ctx = await _dbFactory.CreateDbContextAsync(CancellationToken.None);
            await new TaskRepository(ctx).TryCompleteParentAsync(parentId, CancellationToken.None);
        }
        catch (Exception ex)
        {
            _logger.LogWarning(ex, "TryCompleteParent failed for {ParentId}", parentId);
        }
    }

    /// <summary>Builds the result returned by every accepted transition.</summary>
    private static TransitionResult Ok() => new TransitionResult(true, null);

    /// <summary>Builds the result returned by a rejected transition.</summary>
    /// <param name="reason">Human-readable explanation of the rejection.</param>
    private static TransitionResult Rejected(string reason) => new TransitionResult(false, reason);
}