feat(worker): add pre-flight checks and idempotent restart to PlanningMergeOrchestrator
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -1,5 +1,6 @@
|
|||||||
using System.Collections.Concurrent;
|
using System.Collections.Concurrent;
|
||||||
using ClaudeDo.Data;
|
using ClaudeDo.Data;
|
||||||
|
using ClaudeDo.Data.Git;
|
||||||
using ClaudeDo.Data.Models;
|
using ClaudeDo.Data.Models;
|
||||||
using ClaudeDo.Worker.Hub;
|
using ClaudeDo.Worker.Hub;
|
||||||
using ClaudeDo.Worker.Services;
|
using ClaudeDo.Worker.Services;
|
||||||
@@ -14,6 +15,7 @@ public sealed class PlanningMergeOrchestrator
|
|||||||
private readonly TaskMergeService _merge;
|
private readonly TaskMergeService _merge;
|
||||||
private readonly PlanningAggregator _aggregator;
|
private readonly PlanningAggregator _aggregator;
|
||||||
private readonly HubBroadcaster _broadcaster;
|
private readonly HubBroadcaster _broadcaster;
|
||||||
|
private readonly GitService _git;
|
||||||
private readonly ILogger<PlanningMergeOrchestrator> _logger;
|
private readonly ILogger<PlanningMergeOrchestrator> _logger;
|
||||||
|
|
||||||
private sealed class State
|
private sealed class State
|
||||||
@@ -30,30 +32,53 @@ public sealed class PlanningMergeOrchestrator
|
|||||||
TaskMergeService merge,
|
TaskMergeService merge,
|
||||||
PlanningAggregator aggregator,
|
PlanningAggregator aggregator,
|
||||||
HubBroadcaster broadcaster,
|
HubBroadcaster broadcaster,
|
||||||
|
GitService git,
|
||||||
ILogger<PlanningMergeOrchestrator> logger)
|
ILogger<PlanningMergeOrchestrator> logger)
|
||||||
{
|
{
|
||||||
_dbFactory = dbFactory;
|
_dbFactory = dbFactory;
|
||||||
_merge = merge;
|
_merge = merge;
|
||||||
_aggregator = aggregator;
|
_aggregator = aggregator;
|
||||||
_broadcaster = broadcaster;
|
_broadcaster = broadcaster;
|
||||||
|
_git = git;
|
||||||
_logger = logger;
|
_logger = logger;
|
||||||
}
|
}
|
||||||
|
|
||||||
public async Task StartAsync(string planningTaskId, string targetBranch, CancellationToken ct)
|
public async Task StartAsync(string planningTaskId, string targetBranch, CancellationToken ct)
|
||||||
{
|
{
|
||||||
|
string workingDir;
|
||||||
List<TaskEntity> children;
|
List<TaskEntity> children;
|
||||||
|
|
||||||
using (var ctx = _dbFactory.CreateDbContext())
|
using (var ctx = _dbFactory.CreateDbContext())
|
||||||
{
|
{
|
||||||
children = await ctx.Tasks
|
var planning = await ctx.Tasks
|
||||||
.Include(t => t.Worktree)
|
.Include(t => t.List)
|
||||||
.Where(t => t.ParentTaskId == planningTaskId)
|
.Include(t => t.Children).ThenInclude(c => c.Worktree)
|
||||||
.OrderBy(t => t.SortOrder)
|
.SingleOrDefaultAsync(t => t.Id == planningTaskId, ct)
|
||||||
.ToListAsync(ct);
|
?? throw new KeyNotFoundException($"Planning task '{planningTaskId}' not found.");
|
||||||
|
workingDir = planning.List.WorkingDir
|
||||||
|
?? throw new InvalidOperationException("List has no working directory.");
|
||||||
|
children = planning.Children.OrderBy(c => c.SortOrder).ToList();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
foreach (var c in children)
|
||||||
|
{
|
||||||
|
if (c.Status != TaskStatus.Done)
|
||||||
|
throw new InvalidOperationException($"subtask {c.Id} is not Done (status {c.Status})");
|
||||||
|
if (c.Worktree is null)
|
||||||
|
throw new InvalidOperationException($"subtask {c.Id} has no worktree");
|
||||||
|
if (c.Worktree.State != WorktreeState.Active && c.Worktree.State != WorktreeState.Merged)
|
||||||
|
throw new InvalidOperationException(
|
||||||
|
$"subtask {c.Id} worktree state is {c.Worktree.State}");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (await _git.IsMidMergeAsync(workingDir, ct))
|
||||||
|
throw new InvalidOperationException("repo is mid-merge");
|
||||||
|
if (await _git.HasChangesAsync(workingDir, ct))
|
||||||
|
throw new InvalidOperationException("working tree has uncommitted changes");
|
||||||
|
|
||||||
var queue = new Queue<string>(
|
var queue = new Queue<string>(
|
||||||
children
|
children
|
||||||
.Where(c => c.Worktree is not null && c.Worktree.State != WorktreeState.Merged)
|
.Where(c => c.Worktree!.State == WorktreeState.Active)
|
||||||
.Select(c => c.Id));
|
.Select(c => c.Id));
|
||||||
|
|
||||||
var state = new State { TargetBranch = targetBranch, RemainingSubtaskIds = queue };
|
var state = new State { TargetBranch = targetBranch, RemainingSubtaskIds = queue };
|
||||||
|
|||||||
@@ -259,8 +259,96 @@ public sealed class PlanningMergeOrchestratorTests : IDisposable
|
|||||||
factory, git,
|
factory, git,
|
||||||
NullLogger<PlanningAggregator>.Instance);
|
NullLogger<PlanningAggregator>.Instance);
|
||||||
var orch = new PlanningMergeOrchestrator(
|
var orch = new PlanningMergeOrchestrator(
|
||||||
factory, merge, aggregator, broadcaster,
|
factory, merge, aggregator, broadcaster, git,
|
||||||
NullLogger<PlanningMergeOrchestrator>.Instance);
|
NullLogger<PlanningMergeOrchestrator>.Instance);
|
||||||
return (orch, spy.Calls);
|
return (orch, spy.Calls);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task StartAsync_SubtaskStillRunning_ThrowsWithoutSideEffects()
|
||||||
|
{
|
||||||
|
var db = NewDb();
|
||||||
|
var repo = NewRepo();
|
||||||
|
GitRepoFixture.RunGit(repo.RepoDir, "branch", "-m", "main");
|
||||||
|
|
||||||
|
var (parentId, runningSub) = await SeedPlanningWithOneRunningChildAsync(db, repo);
|
||||||
|
|
||||||
|
var (orch, spy) = BuildOrchestrator(db);
|
||||||
|
|
||||||
|
var ex = await Assert.ThrowsAsync<InvalidOperationException>(
|
||||||
|
() => orch.StartAsync(parentId, "main", CancellationToken.None));
|
||||||
|
Assert.Contains(runningSub, ex.Message);
|
||||||
|
|
||||||
|
using var ctx = db.CreateContext();
|
||||||
|
Assert.Equal(TaskStatus.Planned, ctx.Tasks.Single(t => t.Id == parentId).Status);
|
||||||
|
Assert.Empty(spy);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task StartAsync_DirtyRepo_ThrowsWithoutSideEffects()
|
||||||
|
{
|
||||||
|
var db = NewDb();
|
||||||
|
var repo = NewRepo();
|
||||||
|
GitRepoFixture.RunGit(repo.RepoDir, "branch", "-m", "main");
|
||||||
|
var (parentId, _, _) = await SeedPlanningWithTwoNonConflictingChildrenAsync(db, repo);
|
||||||
|
|
||||||
|
File.WriteAllText(Path.Combine(repo.RepoDir, "dirty.txt"), "unstaged\n");
|
||||||
|
|
||||||
|
var (orch, _) = BuildOrchestrator(db);
|
||||||
|
|
||||||
|
var ex = await Assert.ThrowsAsync<InvalidOperationException>(
|
||||||
|
() => orch.StartAsync(parentId, "main", CancellationToken.None));
|
||||||
|
Assert.Contains("uncommitted", ex.Message);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task StartAsync_IdempotentRestart_SkipsAlreadyMergedWorktrees()
|
||||||
|
{
|
||||||
|
var db = NewDb();
|
||||||
|
var repo = NewRepo();
|
||||||
|
GitRepoFixture.RunGit(repo.RepoDir, "branch", "-m", "main");
|
||||||
|
|
||||||
|
var (parentId, subA, subB) = await SeedPlanningWithTwoNonConflictingChildrenAsync(db, repo);
|
||||||
|
using (var setup = db.CreateContext())
|
||||||
|
{
|
||||||
|
var wt = setup.Worktrees.Single(w => w.TaskId == subA);
|
||||||
|
wt.State = WorktreeState.Merged;
|
||||||
|
await setup.SaveChangesAsync();
|
||||||
|
}
|
||||||
|
|
||||||
|
var (orch, spy) = BuildOrchestrator(db);
|
||||||
|
await orch.StartAsync(parentId, "main", CancellationToken.None);
|
||||||
|
|
||||||
|
Assert.DoesNotContain(spy, c => c.Method == "PlanningSubtaskMerged" && (string)c.Args[1]! == subA);
|
||||||
|
Assert.Contains(spy, c => c.Method == "PlanningSubtaskMerged" && (string)c.Args[1]! == subB);
|
||||||
|
Assert.Contains(spy, c => c.Method == "PlanningCompleted");
|
||||||
|
}
|
||||||
|
|
||||||
|
private async Task<(string parentId, string runningChild)> SeedPlanningWithOneRunningChildAsync(
|
||||||
|
DbFixture db, GitRepoFixture repo)
|
||||||
|
{
|
||||||
|
using var ctx = db.CreateContext();
|
||||||
|
var listId = Guid.NewGuid().ToString();
|
||||||
|
ctx.Lists.Add(new ListEntity
|
||||||
|
{
|
||||||
|
Id = listId, Name = "test", CreatedAt = DateTime.UtcNow, WorkingDir = repo.RepoDir,
|
||||||
|
});
|
||||||
|
var parentId = Guid.NewGuid().ToString();
|
||||||
|
ctx.Tasks.Add(new TaskEntity
|
||||||
|
{
|
||||||
|
Id = parentId, ListId = listId, Title = "plan", CreatedAt = DateTime.UtcNow,
|
||||||
|
Status = TaskStatus.Planned, SortOrder = 0,
|
||||||
|
});
|
||||||
|
var running = Guid.NewGuid().ToString();
|
||||||
|
ctx.Tasks.Add(new TaskEntity
|
||||||
|
{
|
||||||
|
Id = running, ListId = listId, Title = "still running",
|
||||||
|
CreatedAt = DateTime.UtcNow, ParentTaskId = parentId,
|
||||||
|
Status = TaskStatus.Running, SortOrder = 1,
|
||||||
|
});
|
||||||
|
await ctx.SaveChangesAsync();
|
||||||
|
SeedWorktreeWithFile(ctx, repo, running, "fileR.txt", "R\n");
|
||||||
|
await ctx.SaveChangesAsync();
|
||||||
|
return (parentId, running);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user