fix(worker): route FinalizeParentDoneAsync through TaskStateService
Replaces the direct EF Status write in PlanningMergeOrchestrator with _state.ApproveReviewAsync, enforcing the TaskStateService invariant as sole owner of Status writes. Handles the improvement-parent path where TaskMergeService already approved the parent's own worktree during the drain (status == Done on entry → still success). If the parent was concurrently cancelled, the transition guard rejects the approve, PlanningCompleted is not broadcast, and the cancelled status is preserved. ApproveReviewAsync now also sets FinishedAt. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -4,6 +4,7 @@ using ClaudeDo.Data.Git;
|
|||||||
using ClaudeDo.Data.Models;
|
using ClaudeDo.Data.Models;
|
||||||
using ClaudeDo.Worker.Hub;
|
using ClaudeDo.Worker.Hub;
|
||||||
using ClaudeDo.Worker.Lifecycle;
|
using ClaudeDo.Worker.Lifecycle;
|
||||||
|
using ClaudeDo.Worker.State;
|
||||||
using Microsoft.EntityFrameworkCore;
|
using Microsoft.EntityFrameworkCore;
|
||||||
using TaskStatus = ClaudeDo.Data.Models.TaskStatus;
|
using TaskStatus = ClaudeDo.Data.Models.TaskStatus;
|
||||||
|
|
||||||
@@ -16,6 +17,7 @@ public sealed class PlanningMergeOrchestrator
|
|||||||
private readonly PlanningAggregator _aggregator;
|
private readonly PlanningAggregator _aggregator;
|
||||||
private readonly HubBroadcaster _broadcaster;
|
private readonly HubBroadcaster _broadcaster;
|
||||||
private readonly GitService _git;
|
private readonly GitService _git;
|
||||||
|
private readonly ITaskStateService _state;
|
||||||
private readonly ILogger<PlanningMergeOrchestrator> _logger;
|
private readonly ILogger<PlanningMergeOrchestrator> _logger;
|
||||||
|
|
||||||
private sealed class State
|
private sealed class State
|
||||||
@@ -34,6 +36,7 @@ public sealed class PlanningMergeOrchestrator
|
|||||||
PlanningAggregator aggregator,
|
PlanningAggregator aggregator,
|
||||||
HubBroadcaster broadcaster,
|
HubBroadcaster broadcaster,
|
||||||
GitService git,
|
GitService git,
|
||||||
|
ITaskStateService state,
|
||||||
ILogger<PlanningMergeOrchestrator> logger)
|
ILogger<PlanningMergeOrchestrator> logger)
|
||||||
{
|
{
|
||||||
_dbFactory = dbFactory;
|
_dbFactory = dbFactory;
|
||||||
@@ -41,6 +44,7 @@ public sealed class PlanningMergeOrchestrator
|
|||||||
_aggregator = aggregator;
|
_aggregator = aggregator;
|
||||||
_broadcaster = broadcaster;
|
_broadcaster = broadcaster;
|
||||||
_git = git;
|
_git = git;
|
||||||
|
_state = state;
|
||||||
_logger = logger;
|
_logger = logger;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -181,8 +185,9 @@ public sealed class PlanningMergeOrchestrator
|
|||||||
}
|
}
|
||||||
|
|
||||||
state.CurrentSubtaskId = null;
|
state.CurrentSubtaskId = null;
|
||||||
await FinalizeParentDoneAsync(planningTaskId, state.IsPlanning, ct);
|
var finalized = await FinalizeParentDoneAsync(planningTaskId, state.IsPlanning, ct);
|
||||||
await _broadcaster.PlanningCompleted(planningTaskId);
|
if (finalized)
|
||||||
|
await _broadcaster.PlanningCompleted(planningTaskId);
|
||||||
}
|
}
|
||||||
finally
|
finally
|
||||||
{
|
{
|
||||||
@@ -190,18 +195,30 @@ public sealed class PlanningMergeOrchestrator
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private async Task FinalizeParentDoneAsync(string parentTaskId, bool isPlanning, CancellationToken ct)
|
private async Task<bool> FinalizeParentDoneAsync(string parentTaskId, bool isPlanning, CancellationToken ct)
|
||||||
{
|
{
|
||||||
using var ctx = _dbFactory.CreateDbContext();
|
var result = await _state.ApproveReviewAsync(parentTaskId, ct);
|
||||||
var parent = await ctx.Tasks.SingleOrDefaultAsync(t => t.Id == parentTaskId, ct);
|
if (!result.Ok)
|
||||||
if (parent is null) return;
|
{
|
||||||
parent.Status = TaskStatus.Done;
|
// ApproveReviewAsync requires WaitingForReview. For improvement parents whose own
|
||||||
parent.FinishedAt = DateTime.UtcNow;
|
// worktree is in the merge queue, TaskMergeService.ApproveIfWaitingForReviewAsync
|
||||||
await ctx.SaveChangesAsync(ct);
|
// already approved the parent during the drain — check for that expected path.
|
||||||
|
await using var ctx = _dbFactory.CreateDbContext();
|
||||||
|
var current = await ctx.Tasks
|
||||||
|
.Where(t => t.Id == parentTaskId)
|
||||||
|
.Select(t => (TaskStatus?)t.Status)
|
||||||
|
.FirstOrDefaultAsync(ct);
|
||||||
|
|
||||||
// Surface the Done transition to the UI. Without this the parent row stays
|
if (current != TaskStatus.Done)
|
||||||
// visibly stuck in WaitingForReview even though the unit merge completed.
|
{
|
||||||
await _broadcaster.TaskUpdated(parentTaskId);
|
// Parent was cancelled or moved to an unexpected state during the merge drain.
|
||||||
|
// Do not overwrite — the external transition takes precedence.
|
||||||
|
_logger.LogWarning(
|
||||||
|
"Unit-merge drain completed but parent {ParentTaskId} could not be finalized (status: {Status}): {Reason}",
|
||||||
|
parentTaskId, current, result.Reason);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Only planning builds an integration branch via the aggregator; skip cleanup otherwise.
|
// Only planning builds an integration branch via the aggregator; skip cleanup otherwise.
|
||||||
if (isPlanning)
|
if (isPlanning)
|
||||||
@@ -209,5 +226,7 @@ public sealed class PlanningMergeOrchestrator
|
|||||||
try { await _aggregator.CleanupIntegrationBranchAsync(parentTaskId, ct); }
|
try { await _aggregator.CleanupIntegrationBranchAsync(parentTaskId, ct); }
|
||||||
catch (Exception ex) { _logger.LogWarning(ex, "integration branch cleanup failed"); }
|
catch (Exception ex) { _logger.LogWarning(ex, "integration branch cleanup failed"); }
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -126,11 +126,14 @@ public sealed class TaskStateService : ITaskStateService
|
|||||||
|
|
||||||
public async Task<TransitionResult> ApproveReviewAsync(string taskId, CancellationToken ct)
|
public async Task<TransitionResult> ApproveReviewAsync(string taskId, CancellationToken ct)
|
||||||
{
|
{
|
||||||
|
var now = DateTime.UtcNow;
|
||||||
await using (var ctx = await _dbFactory.CreateDbContextAsync(ct))
|
await using (var ctx = await _dbFactory.CreateDbContextAsync(ct))
|
||||||
{
|
{
|
||||||
var affected = await ctx.Tasks
|
var affected = await ctx.Tasks
|
||||||
.Where(t => t.Id == taskId && t.Status == TaskStatus.WaitingForReview)
|
.Where(t => t.Id == taskId && t.Status == TaskStatus.WaitingForReview)
|
||||||
.ExecuteUpdateAsync(s => s.SetProperty(t => t.Status, TaskStatus.Done), ct);
|
.ExecuteUpdateAsync(s => s
|
||||||
|
.SetProperty(t => t.Status, TaskStatus.Done)
|
||||||
|
.SetProperty(t => t.FinishedAt, now), ct);
|
||||||
|
|
||||||
if (affected == 0)
|
if (affected == 0)
|
||||||
return new TransitionResult(false, "Task is not waiting for review; cannot approve.");
|
return new TransitionResult(false, "Task is not waiting for review; cannot approve.");
|
||||||
|
|||||||
@@ -101,7 +101,7 @@ public sealed class PlanningMergeOrchestratorTests : IDisposable
|
|||||||
ctx.Tasks.Add(new TaskEntity
|
ctx.Tasks.Add(new TaskEntity
|
||||||
{
|
{
|
||||||
Id = parentId, ListId = listId, Title = "plan", CreatedAt = DateTime.UtcNow,
|
Id = parentId, ListId = listId, Title = "plan", CreatedAt = DateTime.UtcNow,
|
||||||
Status = TaskStatus.Idle, PlanningPhase = PlanningPhase.Finalized, SortOrder = 0,
|
Status = TaskStatus.WaitingForReview, PlanningPhase = PlanningPhase.Finalized, SortOrder = 0,
|
||||||
});
|
});
|
||||||
|
|
||||||
var subA = Guid.NewGuid().ToString();
|
var subA = Guid.NewGuid().ToString();
|
||||||
@@ -169,7 +169,7 @@ public sealed class PlanningMergeOrchestratorTests : IDisposable
|
|||||||
ctx.Tasks.Add(new TaskEntity
|
ctx.Tasks.Add(new TaskEntity
|
||||||
{
|
{
|
||||||
Id = parentId, ListId = listId, Title = "plan", CreatedAt = DateTime.UtcNow,
|
Id = parentId, ListId = listId, Title = "plan", CreatedAt = DateTime.UtcNow,
|
||||||
Status = TaskStatus.Idle, PlanningPhase = PlanningPhase.Finalized, SortOrder = 0,
|
Status = TaskStatus.WaitingForReview, PlanningPhase = PlanningPhase.Finalized, SortOrder = 0,
|
||||||
});
|
});
|
||||||
var subA = Guid.NewGuid().ToString();
|
var subA = Guid.NewGuid().ToString();
|
||||||
var subB = Guid.NewGuid().ToString();
|
var subB = Guid.NewGuid().ToString();
|
||||||
@@ -252,15 +252,17 @@ public sealed class PlanningMergeOrchestratorTests : IDisposable
|
|||||||
var broadcaster = new HubBroadcaster(fakeHub);
|
var broadcaster = new HubBroadcaster(fakeHub);
|
||||||
var git = new GitService();
|
var git = new GitService();
|
||||||
var factory = db.CreateFactory();
|
var factory = db.CreateFactory();
|
||||||
|
var built = TaskStateServiceBuilder.Build(factory);
|
||||||
var merge = new TaskMergeService(
|
var merge = new TaskMergeService(
|
||||||
factory, git, broadcaster,
|
factory, git, broadcaster,
|
||||||
TaskStateServiceBuilder.Build(factory).State,
|
built.State,
|
||||||
NullLogger<TaskMergeService>.Instance);
|
NullLogger<TaskMergeService>.Instance);
|
||||||
var aggregator = new PlanningAggregator(
|
var aggregator = new PlanningAggregator(
|
||||||
factory, git,
|
factory, git,
|
||||||
NullLogger<PlanningAggregator>.Instance);
|
NullLogger<PlanningAggregator>.Instance);
|
||||||
var orch = new PlanningMergeOrchestrator(
|
var orch = new PlanningMergeOrchestrator(
|
||||||
factory, merge, aggregator, broadcaster, git,
|
factory, merge, aggregator, broadcaster, git,
|
||||||
|
built.State,
|
||||||
NullLogger<PlanningMergeOrchestrator>.Instance);
|
NullLogger<PlanningMergeOrchestrator>.Instance);
|
||||||
return (orch, spy.Calls);
|
return (orch, spy.Calls);
|
||||||
}
|
}
|
||||||
@@ -426,4 +428,72 @@ public sealed class PlanningMergeOrchestratorTests : IDisposable
|
|||||||
|
|
||||||
return (parentId, subA, subB);
|
return (parentId, subA, subB);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Parent is Cancelled before the orchestrator finalizes (simulates a race where the user
|
||||||
|
/// cancels the parent while the merge drain is in progress). After the drain completes,
|
||||||
|
/// ApproveReviewAsync sees Status != WaitingForReview and refuses — parent must stay
|
||||||
|
/// Cancelled and PlanningCompleted must not be broadcast.
|
||||||
|
/// </summary>
|
||||||
|
[Fact]
|
||||||
|
public async Task StartAsync_ParentCancelledBeforeFinalize_StatusRemainsAndNoPlanningCompleted()
|
||||||
|
{
|
||||||
|
var db = NewDb();
|
||||||
|
var repo = NewRepo();
|
||||||
|
GitRepoFixture.RunGit(repo.RepoDir, "branch", "-m", "main");
|
||||||
|
|
||||||
|
// Improvement parent (PlanningPhase.None) seeded as Cancelled — simulates the race
|
||||||
|
// where a user or another thread cancelled the parent during the merge drain.
|
||||||
|
var (parentId, subA, subB) = await SeedCancelledParentWithDoneChildrenAsync(db, repo);
|
||||||
|
|
||||||
|
var (orch, calls) = BuildOrchestrator(db);
|
||||||
|
await orch.StartAsync(parentId, "main", CancellationToken.None);
|
||||||
|
|
||||||
|
using var ctx = db.CreateContext();
|
||||||
|
Assert.Equal(TaskStatus.Cancelled, ctx.Tasks.Single(t => t.Id == parentId).Status);
|
||||||
|
Assert.DoesNotContain(calls, c => c.Method == "PlanningCompleted");
|
||||||
|
// Child worktrees were still merged during the drain
|
||||||
|
Assert.Equal(WorktreeState.Merged, ctx.Worktrees.Single(w => w.TaskId == subA).State);
|
||||||
|
Assert.Equal(WorktreeState.Merged, ctx.Worktrees.Single(w => w.TaskId == subB).State);
|
||||||
|
}
|
||||||
|
|
||||||
|
private async Task<(string parentId, string subA, string subB)> SeedCancelledParentWithDoneChildrenAsync(
|
||||||
|
DbFixture db, GitRepoFixture repo)
|
||||||
|
{
|
||||||
|
using var ctx = db.CreateContext();
|
||||||
|
|
||||||
|
var listId = Guid.NewGuid().ToString();
|
||||||
|
ctx.Lists.Add(new ListEntity
|
||||||
|
{
|
||||||
|
Id = listId, Name = "test", CreatedAt = DateTime.UtcNow,
|
||||||
|
WorkingDir = repo.RepoDir,
|
||||||
|
});
|
||||||
|
|
||||||
|
var parentId = Guid.NewGuid().ToString();
|
||||||
|
ctx.Tasks.Add(new TaskEntity
|
||||||
|
{
|
||||||
|
Id = parentId, ListId = listId, Title = "improve", CreatedAt = DateTime.UtcNow,
|
||||||
|
Status = TaskStatus.Cancelled, PlanningPhase = PlanningPhase.None, SortOrder = 0,
|
||||||
|
});
|
||||||
|
|
||||||
|
var subA = Guid.NewGuid().ToString();
|
||||||
|
var subB = Guid.NewGuid().ToString();
|
||||||
|
ctx.Tasks.Add(new TaskEntity
|
||||||
|
{
|
||||||
|
Id = subA, ListId = listId, Title = "child A", CreatedAt = DateTime.UtcNow,
|
||||||
|
ParentTaskId = parentId, Status = TaskStatus.Done, SortOrder = 1,
|
||||||
|
});
|
||||||
|
ctx.Tasks.Add(new TaskEntity
|
||||||
|
{
|
||||||
|
Id = subB, ListId = listId, Title = "child B", CreatedAt = DateTime.UtcNow,
|
||||||
|
ParentTaskId = parentId, Status = TaskStatus.Done, SortOrder = 2,
|
||||||
|
});
|
||||||
|
await ctx.SaveChangesAsync();
|
||||||
|
|
||||||
|
SeedWorktree(ctx, repo, subA, "fileA.txt", "content A");
|
||||||
|
SeedWorktree(ctx, repo, subB, "fileB.txt", "content B");
|
||||||
|
await ctx.SaveChangesAsync();
|
||||||
|
|
||||||
|
return (parentId, subA, subB);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -126,15 +126,17 @@ public sealed class TreeMergeTests : IDisposable
|
|||||||
var broadcaster = new HubBroadcaster(fakeHub);
|
var broadcaster = new HubBroadcaster(fakeHub);
|
||||||
var git = new GitService();
|
var git = new GitService();
|
||||||
var factory = db.CreateFactory();
|
var factory = db.CreateFactory();
|
||||||
|
var built = TaskStateServiceBuilder.Build(factory);
|
||||||
var merge = new TaskMergeService(
|
var merge = new TaskMergeService(
|
||||||
factory, git, broadcaster,
|
factory, git, broadcaster,
|
||||||
TaskStateServiceBuilder.Build(factory).State,
|
built.State,
|
||||||
NullLogger<TaskMergeService>.Instance);
|
NullLogger<TaskMergeService>.Instance);
|
||||||
var aggregator = new PlanningAggregator(
|
var aggregator = new PlanningAggregator(
|
||||||
factory, git,
|
factory, git,
|
||||||
NullLogger<PlanningAggregator>.Instance);
|
NullLogger<PlanningAggregator>.Instance);
|
||||||
var orch = new PlanningMergeOrchestrator(
|
var orch = new PlanningMergeOrchestrator(
|
||||||
factory, merge, aggregator, broadcaster, git,
|
factory, merge, aggregator, broadcaster, git,
|
||||||
|
built.State,
|
||||||
NullLogger<PlanningMergeOrchestrator>.Instance);
|
NullLogger<PlanningMergeOrchestrator>.Instance);
|
||||||
return (orch, spy.Calls);
|
return (orch, spy.Calls);
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user