From 3142ba203fa55caba4781b34f966ab1a32075c9c Mon Sep 17 00:00:00 2001 From: mika kuns Date: Fri, 24 Apr 2026 18:08:58 +0200 Subject: [PATCH] feat(worker): add PlanningMergeOrchestrator happy path with merge event broadcasts Co-Authored-By: Claude Sonnet 4.6 --- src/ClaudeDo.Worker/Hub/HubBroadcaster.cs | 15 ++ .../Planning/PlanningMergeEvents.cs | 8 + .../Planning/PlanningMergeOrchestrator.cs | 119 ++++++++++++ .../PlanningMergeOrchestratorTests.cs | 170 ++++++++++++++++++ 4 files changed, 312 insertions(+) create mode 100644 src/ClaudeDo.Worker/Planning/PlanningMergeEvents.cs create mode 100644 src/ClaudeDo.Worker/Planning/PlanningMergeOrchestrator.cs create mode 100644 tests/ClaudeDo.Worker.Tests/Planning/PlanningMergeOrchestratorTests.cs diff --git a/src/ClaudeDo.Worker/Hub/HubBroadcaster.cs b/src/ClaudeDo.Worker/Hub/HubBroadcaster.cs index 17263ba..e927ef3 100644 --- a/src/ClaudeDo.Worker/Hub/HubBroadcaster.cs +++ b/src/ClaudeDo.Worker/Hub/HubBroadcaster.cs @@ -32,4 +32,19 @@ public sealed class HubBroadcaster public Task WorkerLog(string message, WorkerLogLevel level, DateTime timestampUtc) => _hub.Clients.All.SendAsync("WorkerLog", message, level, timestampUtc); + + public Task PlanningMergeStarted(string planningTaskId, string targetBranch) => + _hub.Clients.All.SendAsync("PlanningMergeStarted", planningTaskId, targetBranch); + + public Task PlanningSubtaskMerged(string planningTaskId, string subtaskId) => + _hub.Clients.All.SendAsync("PlanningSubtaskMerged", planningTaskId, subtaskId); + + public Task PlanningMergeConflict(string planningTaskId, string subtaskId, IReadOnlyList files) => + _hub.Clients.All.SendAsync("PlanningMergeConflict", planningTaskId, subtaskId, files); + + public Task PlanningMergeAborted(string planningTaskId) => + _hub.Clients.All.SendAsync("PlanningMergeAborted", planningTaskId); + + public Task PlanningCompleted(string planningTaskId) => + _hub.Clients.All.SendAsync("PlanningCompleted", planningTaskId); } diff --git a/src/ClaudeDo.Worker/Planning/PlanningMergeEvents.cs b/src/ClaudeDo.Worker/Planning/PlanningMergeEvents.cs new file mode 100644 index 0000000..566454f --- /dev/null +++ b/src/ClaudeDo.Worker/Planning/PlanningMergeEvents.cs @@ -0,0 +1,8 @@ +namespace ClaudeDo.Worker.Planning; + +public sealed record PlanningMergeStarted(string PlanningTaskId, string TargetBranch); +public sealed record PlanningSubtaskMerged(string PlanningTaskId, string SubtaskId); +public sealed record PlanningMergeConflict( + string PlanningTaskId, string SubtaskId, IReadOnlyList ConflictedFiles); +public sealed record PlanningMergeAborted(string PlanningTaskId); +public sealed record PlanningCompleted(string PlanningTaskId); diff --git a/src/ClaudeDo.Worker/Planning/PlanningMergeOrchestrator.cs b/src/ClaudeDo.Worker/Planning/PlanningMergeOrchestrator.cs new file mode 100644 index 0000000..338d269 --- /dev/null +++ b/src/ClaudeDo.Worker/Planning/PlanningMergeOrchestrator.cs @@ -0,0 +1,119 @@ +using System.Collections.Concurrent; +using ClaudeDo.Data; +using ClaudeDo.Data.Models; +using ClaudeDo.Worker.Hub; +using ClaudeDo.Worker.Services; +using Microsoft.EntityFrameworkCore; +using TaskStatus = ClaudeDo.Data.Models.TaskStatus; + +namespace ClaudeDo.Worker.Planning; + +public sealed class PlanningMergeOrchestrator +{ + private readonly IDbContextFactory _dbFactory; + private readonly TaskMergeService _merge; + private readonly PlanningAggregator _aggregator; + private readonly HubBroadcaster _broadcaster; + private readonly ILogger _logger; + + private sealed class State + { + public required string TargetBranch { get; init; } + public required Queue RemainingSubtaskIds { get; init; } + public string? CurrentSubtaskId { get; set; } + } + + private readonly ConcurrentDictionary _states = new(); + + public PlanningMergeOrchestrator( + IDbContextFactory dbFactory, + TaskMergeService merge, + PlanningAggregator aggregator, + HubBroadcaster broadcaster, + ILogger logger) + { + _dbFactory = dbFactory; + _merge = merge; + _aggregator = aggregator; + _broadcaster = broadcaster; + _logger = logger; + } + + public async Task StartAsync(string planningTaskId, string targetBranch, CancellationToken ct) + { + List children; + using (var ctx = _dbFactory.CreateDbContext()) + { + children = await ctx.Tasks + .Include(t => t.Worktree) + .Where(t => t.ParentTaskId == planningTaskId) + .OrderBy(t => t.SortOrder) + .ToListAsync(ct); + } + + var queue = new Queue( + children + .Where(c => c.Worktree is not null && c.Worktree.State != WorktreeState.Merged) + .Select(c => c.Id)); + + _states[planningTaskId] = new State + { + TargetBranch = targetBranch, + RemainingSubtaskIds = queue, + }; + + await _broadcaster.PlanningMergeStarted(planningTaskId, targetBranch); + await DrainAsync(planningTaskId, ct); + } + + private async Task DrainAsync(string planningTaskId, CancellationToken ct) + { + if (!_states.TryGetValue(planningTaskId, out var state)) return; + + while (state.RemainingSubtaskIds.TryDequeue(out var subtaskId)) + { + state.CurrentSubtaskId = subtaskId; + var result = await _merge.MergeAsync( + subtaskId, + state.TargetBranch, + removeWorktree: true, + commitMessage: "Merge subtask", + leaveConflictsInTree: true, + ct); + + if (result.Status == TaskMergeService.StatusConflict) + { + await _broadcaster.PlanningMergeConflict(planningTaskId, subtaskId, result.ConflictFiles); + return; + } + + if (result.Status != TaskMergeService.StatusMerged) + { + await _broadcaster.PlanningMergeConflict( + planningTaskId, subtaskId, + new[] { result.ErrorMessage ?? "merge blocked" }); + return; + } + + await _broadcaster.PlanningSubtaskMerged(planningTaskId, subtaskId); + } + + state.CurrentSubtaskId = null; + await FinalizePlanningDoneAsync(planningTaskId, ct); + _states.TryRemove(planningTaskId, out _); + await _broadcaster.PlanningCompleted(planningTaskId); + } + + private async Task FinalizePlanningDoneAsync(string planningTaskId, CancellationToken ct) + { + using var ctx = _dbFactory.CreateDbContext(); + var planning = await ctx.Tasks.SingleOrDefaultAsync(t => t.Id == planningTaskId, ct); + if (planning is null) return; + planning.Status = TaskStatus.Done; + planning.FinishedAt = DateTime.UtcNow; + await ctx.SaveChangesAsync(ct); + + try { await _aggregator.CleanupIntegrationBranchAsync(planningTaskId, ct); } + catch (Exception ex) { _logger.LogWarning(ex, "integration branch cleanup failed"); } + } +} diff --git a/tests/ClaudeDo.Worker.Tests/Planning/PlanningMergeOrchestratorTests.cs b/tests/ClaudeDo.Worker.Tests/Planning/PlanningMergeOrchestratorTests.cs new file mode 100644 index 0000000..317086d --- /dev/null +++ b/tests/ClaudeDo.Worker.Tests/Planning/PlanningMergeOrchestratorTests.cs @@ -0,0 +1,170 @@ +using ClaudeDo.Data; +using ClaudeDo.Data.Git; +using ClaudeDo.Data.Models; +using ClaudeDo.Worker.Hub; +using ClaudeDo.Worker.Planning; +using ClaudeDo.Worker.Services; +using ClaudeDo.Worker.Tests.Infrastructure; +using Microsoft.AspNetCore.SignalR; +using Microsoft.Extensions.Logging.Abstractions; +using TaskStatus = ClaudeDo.Data.Models.TaskStatus; + +namespace ClaudeDo.Worker.Tests.Planning; + +file sealed class OrchestratorRecordingHubClients : IHubClients +{ + public OrchestratorRecordingClientProxy Proxy { get; } = new(); + public IClientProxy All => Proxy; + public IClientProxy AllExcept(IReadOnlyList excludedConnectionIds) => Proxy; + public IClientProxy Client(string connectionId) => Proxy; + public IClientProxy Clients(IReadOnlyList connectionIds) => Proxy; + public IClientProxy Group(string groupName) => Proxy; + public IClientProxy GroupExcept(string groupName, IReadOnlyList excludedConnectionIds) => Proxy; + public IClientProxy Groups(IReadOnlyList groupNames) => Proxy; + public IClientProxy User(string userId) => Proxy; + public IClientProxy Users(IReadOnlyList userIds) => Proxy; +} + +file sealed class OrchestratorRecordingClientProxy : IClientProxy +{ + public List<(string Method, object?[] Args)> Calls { get; } = new(); + public Task SendCoreAsync(string method, object?[] args, CancellationToken cancellationToken = default) + { + Calls.Add((method, args)); + return Task.CompletedTask; + } +} + +file sealed class OrchestratorFakeHubContext : IHubContext +{ + public OrchestratorRecordingHubClients RecordingClients { get; } = new(); + public IHubClients Clients => RecordingClients; + public IGroupManager Groups => throw new NotImplementedException(); +} + +public sealed class PlanningMergeOrchestratorTests : IDisposable +{ + private readonly List _dbs = new(); + private readonly List _repos = new(); + private readonly List<(string repoDir, string wtPath)> _wtCleanups = new(); + + private DbFixture NewDb() { var d = new DbFixture(); _dbs.Add(d); return d; } + private GitRepoFixture NewRepo() { var r = new GitRepoFixture(); _repos.Add(r); return r; } + + public void Dispose() + { + foreach (var (repo, wt) in _wtCleanups) + try { GitRepoFixture.RunGit(repo, "worktree", "remove", "--force", wt); } catch { } + foreach (var d in _dbs) try { d.Dispose(); } catch { } + foreach (var r in _repos) try { r.Dispose(); } catch { } + } + + [Fact] + public async Task StartAsync_AllChildrenMergeCleanly_MarksPlanningDoneAndEmitsCompleted() + { + var db = NewDb(); + var repo = NewRepo(); + GitRepoFixture.RunGit(repo.RepoDir, "branch", "-m", "main"); + + var (parentId, subA, subB) = await SeedPlanningWithTwoNonConflictingChildrenAsync(db, repo); + + var (orch, calls) = BuildOrchestrator(db); + + await orch.StartAsync(parentId, "main", CancellationToken.None); + + using var ctx = db.CreateContext(); + var planning = ctx.Tasks.Single(t => t.Id == parentId); + Assert.Equal(TaskStatus.Done, planning.Status); + Assert.NotNull(planning.FinishedAt); + + Assert.Equal(WorktreeState.Merged, ctx.Worktrees.Single(w => w.TaskId == subA).State); + Assert.Equal(WorktreeState.Merged, ctx.Worktrees.Single(w => w.TaskId == subB).State); + + Assert.Contains(calls, c => c.Method == "PlanningMergeStarted"); + Assert.Equal(2, calls.Count(c => c.Method == "PlanningSubtaskMerged")); + Assert.Contains(calls, c => c.Method == "PlanningCompleted" && (string)c.Args[0]! == parentId); + } + + private async Task<(string parentId, string subA, string subB)> SeedPlanningWithTwoNonConflictingChildrenAsync( + DbFixture db, GitRepoFixture repo) + { + using var ctx = db.CreateContext(); + + var listId = Guid.NewGuid().ToString(); + ctx.Lists.Add(new ListEntity + { + Id = listId, Name = "test", CreatedAt = DateTime.UtcNow, + WorkingDir = repo.RepoDir, + }); + + var parentId = Guid.NewGuid().ToString(); + ctx.Tasks.Add(new TaskEntity + { + Id = parentId, ListId = listId, Title = "plan", CreatedAt = DateTime.UtcNow, + Status = TaskStatus.Planned, SortOrder = 0, + }); + + var subA = Guid.NewGuid().ToString(); + var subB = Guid.NewGuid().ToString(); + ctx.Tasks.Add(new TaskEntity + { + Id = subA, ListId = listId, Title = "child A", CreatedAt = DateTime.UtcNow, + ParentTaskId = parentId, Status = TaskStatus.Done, SortOrder = 1, + }); + ctx.Tasks.Add(new TaskEntity + { + Id = subB, ListId = listId, Title = "child B", CreatedAt = DateTime.UtcNow, + ParentTaskId = parentId, Status = TaskStatus.Done, SortOrder = 2, + }); + await ctx.SaveChangesAsync(); + + SeedWorktree(ctx, repo, subA, "fileA.txt", "content A"); + SeedWorktree(ctx, repo, subB, "fileB.txt", "content B"); + await ctx.SaveChangesAsync(); + + return (parentId, subA, subB); + } + + private void SeedWorktree(ClaudeDoDbContext ctx, GitRepoFixture repo, string taskId, string filename, string content) + { + var wtPath = Path.Combine(Path.GetTempPath(), $"wt_{Guid.NewGuid():N}"); + _wtCleanups.Add((repo.RepoDir, wtPath)); + var branch = $"claudedo/{taskId[..8]}"; + GitRepoFixture.RunGit(repo.RepoDir, "worktree", "add", "-b", branch, wtPath, repo.BaseCommit); + File.WriteAllText(Path.Combine(wtPath, filename), content); + GitRepoFixture.RunGit(wtPath, "add", filename); + GitRepoFixture.RunGit(wtPath, "commit", "-m", $"add {filename}"); + var head = GitRepoFixture.RunGit(wtPath, "rev-parse", "HEAD").Trim(); + + ctx.Worktrees.Add(new WorktreeEntity + { + TaskId = taskId, + Path = wtPath, + BranchName = branch, + BaseCommit = repo.BaseCommit, + HeadCommit = head, + DiffStat = null, + State = WorktreeState.Active, + CreatedAt = DateTime.UtcNow, + }); + } + + private (PlanningMergeOrchestrator orch, List<(string Method, object?[] Args)> calls) BuildOrchestrator(DbFixture db) + { + var fakeHub = new OrchestratorFakeHubContext(); + var spy = fakeHub.RecordingClients.Proxy; + var broadcaster = new HubBroadcaster(fakeHub); + var git = new GitService(); + var factory = db.CreateFactory(); + var merge = new TaskMergeService( + factory, git, broadcaster, + NullLogger.Instance); + var aggregator = new PlanningAggregator( + factory, git, + NullLogger.Instance); + var orch = new PlanningMergeOrchestrator( + factory, merge, aggregator, broadcaster, + NullLogger.Instance); + return (orch, spy.Calls); + } +}