feat(worker): add PlanningMergeOrchestrator happy path with merge event broadcasts
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -32,4 +32,19 @@ public sealed class HubBroadcaster
|
|||||||
|
|
||||||
public Task WorkerLog(string message, WorkerLogLevel level, DateTime timestampUtc) =>
|
public Task WorkerLog(string message, WorkerLogLevel level, DateTime timestampUtc) =>
|
||||||
_hub.Clients.All.SendAsync("WorkerLog", message, level, timestampUtc);
|
_hub.Clients.All.SendAsync("WorkerLog", message, level, timestampUtc);
|
||||||
|
|
||||||
|
public Task PlanningMergeStarted(string planningTaskId, string targetBranch) =>
|
||||||
|
_hub.Clients.All.SendAsync("PlanningMergeStarted", planningTaskId, targetBranch);
|
||||||
|
|
||||||
|
public Task PlanningSubtaskMerged(string planningTaskId, string subtaskId) =>
|
||||||
|
_hub.Clients.All.SendAsync("PlanningSubtaskMerged", planningTaskId, subtaskId);
|
||||||
|
|
||||||
|
public Task PlanningMergeConflict(string planningTaskId, string subtaskId, IReadOnlyList<string> files) =>
|
||||||
|
_hub.Clients.All.SendAsync("PlanningMergeConflict", planningTaskId, subtaskId, files);
|
||||||
|
|
||||||
|
public Task PlanningMergeAborted(string planningTaskId) =>
|
||||||
|
_hub.Clients.All.SendAsync("PlanningMergeAborted", planningTaskId);
|
||||||
|
|
||||||
|
public Task PlanningCompleted(string planningTaskId) =>
|
||||||
|
_hub.Clients.All.SendAsync("PlanningCompleted", planningTaskId);
|
||||||
}
|
}
|
||||||
|
|||||||
8
src/ClaudeDo.Worker/Planning/PlanningMergeEvents.cs
Normal file
8
src/ClaudeDo.Worker/Planning/PlanningMergeEvents.cs
Normal file
@@ -0,0 +1,8 @@
|
|||||||
|
namespace ClaudeDo.Worker.Planning;
|
||||||
|
|
||||||
|
public sealed record PlanningMergeStarted(string PlanningTaskId, string TargetBranch);
|
||||||
|
public sealed record PlanningSubtaskMerged(string PlanningTaskId, string SubtaskId);
|
||||||
|
public sealed record PlanningMergeConflict(
|
||||||
|
string PlanningTaskId, string SubtaskId, IReadOnlyList<string> ConflictedFiles);
|
||||||
|
public sealed record PlanningMergeAborted(string PlanningTaskId);
|
||||||
|
public sealed record PlanningCompleted(string PlanningTaskId);
|
||||||
119
src/ClaudeDo.Worker/Planning/PlanningMergeOrchestrator.cs
Normal file
119
src/ClaudeDo.Worker/Planning/PlanningMergeOrchestrator.cs
Normal file
@@ -0,0 +1,119 @@
|
|||||||
|
using System.Collections.Concurrent;
|
||||||
|
using ClaudeDo.Data;
|
||||||
|
using ClaudeDo.Data.Models;
|
||||||
|
using ClaudeDo.Worker.Hub;
|
||||||
|
using ClaudeDo.Worker.Services;
|
||||||
|
using Microsoft.EntityFrameworkCore;
|
||||||
|
using TaskStatus = ClaudeDo.Data.Models.TaskStatus;
|
||||||
|
|
||||||
|
namespace ClaudeDo.Worker.Planning;
|
||||||
|
|
||||||
|
public sealed class PlanningMergeOrchestrator
|
||||||
|
{
|
||||||
|
private readonly IDbContextFactory<ClaudeDoDbContext> _dbFactory;
|
||||||
|
private readonly TaskMergeService _merge;
|
||||||
|
private readonly PlanningAggregator _aggregator;
|
||||||
|
private readonly HubBroadcaster _broadcaster;
|
||||||
|
private readonly ILogger<PlanningMergeOrchestrator> _logger;
|
||||||
|
|
||||||
|
private sealed class State
|
||||||
|
{
|
||||||
|
public required string TargetBranch { get; init; }
|
||||||
|
public required Queue<string> RemainingSubtaskIds { get; init; }
|
||||||
|
public string? CurrentSubtaskId { get; set; }
|
||||||
|
}
|
||||||
|
|
||||||
|
private readonly ConcurrentDictionary<string, State> _states = new();
|
||||||
|
|
||||||
|
public PlanningMergeOrchestrator(
|
||||||
|
IDbContextFactory<ClaudeDoDbContext> dbFactory,
|
||||||
|
TaskMergeService merge,
|
||||||
|
PlanningAggregator aggregator,
|
||||||
|
HubBroadcaster broadcaster,
|
||||||
|
ILogger<PlanningMergeOrchestrator> logger)
|
||||||
|
{
|
||||||
|
_dbFactory = dbFactory;
|
||||||
|
_merge = merge;
|
||||||
|
_aggregator = aggregator;
|
||||||
|
_broadcaster = broadcaster;
|
||||||
|
_logger = logger;
|
||||||
|
}
|
||||||
|
|
||||||
|
public async Task StartAsync(string planningTaskId, string targetBranch, CancellationToken ct)
|
||||||
|
{
|
||||||
|
List<TaskEntity> children;
|
||||||
|
using (var ctx = _dbFactory.CreateDbContext())
|
||||||
|
{
|
||||||
|
children = await ctx.Tasks
|
||||||
|
.Include(t => t.Worktree)
|
||||||
|
.Where(t => t.ParentTaskId == planningTaskId)
|
||||||
|
.OrderBy(t => t.SortOrder)
|
||||||
|
.ToListAsync(ct);
|
||||||
|
}
|
||||||
|
|
||||||
|
var queue = new Queue<string>(
|
||||||
|
children
|
||||||
|
.Where(c => c.Worktree is not null && c.Worktree.State != WorktreeState.Merged)
|
||||||
|
.Select(c => c.Id));
|
||||||
|
|
||||||
|
_states[planningTaskId] = new State
|
||||||
|
{
|
||||||
|
TargetBranch = targetBranch,
|
||||||
|
RemainingSubtaskIds = queue,
|
||||||
|
};
|
||||||
|
|
||||||
|
await _broadcaster.PlanningMergeStarted(planningTaskId, targetBranch);
|
||||||
|
await DrainAsync(planningTaskId, ct);
|
||||||
|
}
|
||||||
|
|
||||||
|
private async Task DrainAsync(string planningTaskId, CancellationToken ct)
|
||||||
|
{
|
||||||
|
if (!_states.TryGetValue(planningTaskId, out var state)) return;
|
||||||
|
|
||||||
|
while (state.RemainingSubtaskIds.TryDequeue(out var subtaskId))
|
||||||
|
{
|
||||||
|
state.CurrentSubtaskId = subtaskId;
|
||||||
|
var result = await _merge.MergeAsync(
|
||||||
|
subtaskId,
|
||||||
|
state.TargetBranch,
|
||||||
|
removeWorktree: true,
|
||||||
|
commitMessage: "Merge subtask",
|
||||||
|
leaveConflictsInTree: true,
|
||||||
|
ct);
|
||||||
|
|
||||||
|
if (result.Status == TaskMergeService.StatusConflict)
|
||||||
|
{
|
||||||
|
await _broadcaster.PlanningMergeConflict(planningTaskId, subtaskId, result.ConflictFiles);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (result.Status != TaskMergeService.StatusMerged)
|
||||||
|
{
|
||||||
|
await _broadcaster.PlanningMergeConflict(
|
||||||
|
planningTaskId, subtaskId,
|
||||||
|
new[] { result.ErrorMessage ?? "merge blocked" });
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
await _broadcaster.PlanningSubtaskMerged(planningTaskId, subtaskId);
|
||||||
|
}
|
||||||
|
|
||||||
|
state.CurrentSubtaskId = null;
|
||||||
|
await FinalizePlanningDoneAsync(planningTaskId, ct);
|
||||||
|
_states.TryRemove(planningTaskId, out _);
|
||||||
|
await _broadcaster.PlanningCompleted(planningTaskId);
|
||||||
|
}
|
||||||
|
|
||||||
|
private async Task FinalizePlanningDoneAsync(string planningTaskId, CancellationToken ct)
|
||||||
|
{
|
||||||
|
using var ctx = _dbFactory.CreateDbContext();
|
||||||
|
var planning = await ctx.Tasks.SingleOrDefaultAsync(t => t.Id == planningTaskId, ct);
|
||||||
|
if (planning is null) return;
|
||||||
|
planning.Status = TaskStatus.Done;
|
||||||
|
planning.FinishedAt = DateTime.UtcNow;
|
||||||
|
await ctx.SaveChangesAsync(ct);
|
||||||
|
|
||||||
|
try { await _aggregator.CleanupIntegrationBranchAsync(planningTaskId, ct); }
|
||||||
|
catch (Exception ex) { _logger.LogWarning(ex, "integration branch cleanup failed"); }
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,170 @@
|
|||||||
|
using ClaudeDo.Data;
|
||||||
|
using ClaudeDo.Data.Git;
|
||||||
|
using ClaudeDo.Data.Models;
|
||||||
|
using ClaudeDo.Worker.Hub;
|
||||||
|
using ClaudeDo.Worker.Planning;
|
||||||
|
using ClaudeDo.Worker.Services;
|
||||||
|
using ClaudeDo.Worker.Tests.Infrastructure;
|
||||||
|
using Microsoft.AspNetCore.SignalR;
|
||||||
|
using Microsoft.Extensions.Logging.Abstractions;
|
||||||
|
using TaskStatus = ClaudeDo.Data.Models.TaskStatus;
|
||||||
|
|
||||||
|
namespace ClaudeDo.Worker.Tests.Planning;
|
||||||
|
|
||||||
|
file sealed class OrchestratorRecordingHubClients : IHubClients
|
||||||
|
{
|
||||||
|
public OrchestratorRecordingClientProxy Proxy { get; } = new();
|
||||||
|
public IClientProxy All => Proxy;
|
||||||
|
public IClientProxy AllExcept(IReadOnlyList<string> excludedConnectionIds) => Proxy;
|
||||||
|
public IClientProxy Client(string connectionId) => Proxy;
|
||||||
|
public IClientProxy Clients(IReadOnlyList<string> connectionIds) => Proxy;
|
||||||
|
public IClientProxy Group(string groupName) => Proxy;
|
||||||
|
public IClientProxy GroupExcept(string groupName, IReadOnlyList<string> excludedConnectionIds) => Proxy;
|
||||||
|
public IClientProxy Groups(IReadOnlyList<string> groupNames) => Proxy;
|
||||||
|
public IClientProxy User(string userId) => Proxy;
|
||||||
|
public IClientProxy Users(IReadOnlyList<string> userIds) => Proxy;
|
||||||
|
}
|
||||||
|
|
||||||
|
file sealed class OrchestratorRecordingClientProxy : IClientProxy
|
||||||
|
{
|
||||||
|
public List<(string Method, object?[] Args)> Calls { get; } = new();
|
||||||
|
public Task SendCoreAsync(string method, object?[] args, CancellationToken cancellationToken = default)
|
||||||
|
{
|
||||||
|
Calls.Add((method, args));
|
||||||
|
return Task.CompletedTask;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
file sealed class OrchestratorFakeHubContext : IHubContext<WorkerHub>
|
||||||
|
{
|
||||||
|
public OrchestratorRecordingHubClients RecordingClients { get; } = new();
|
||||||
|
public IHubClients Clients => RecordingClients;
|
||||||
|
public IGroupManager Groups => throw new NotImplementedException();
|
||||||
|
}
|
||||||
|
|
||||||
|
public sealed class PlanningMergeOrchestratorTests : IDisposable
|
||||||
|
{
|
||||||
|
private readonly List<DbFixture> _dbs = new();
|
||||||
|
private readonly List<GitRepoFixture> _repos = new();
|
||||||
|
private readonly List<(string repoDir, string wtPath)> _wtCleanups = new();
|
||||||
|
|
||||||
|
private DbFixture NewDb() { var d = new DbFixture(); _dbs.Add(d); return d; }
|
||||||
|
private GitRepoFixture NewRepo() { var r = new GitRepoFixture(); _repos.Add(r); return r; }
|
||||||
|
|
||||||
|
public void Dispose()
|
||||||
|
{
|
||||||
|
foreach (var (repo, wt) in _wtCleanups)
|
||||||
|
try { GitRepoFixture.RunGit(repo, "worktree", "remove", "--force", wt); } catch { }
|
||||||
|
foreach (var d in _dbs) try { d.Dispose(); } catch { }
|
||||||
|
foreach (var r in _repos) try { r.Dispose(); } catch { }
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task StartAsync_AllChildrenMergeCleanly_MarksPlanningDoneAndEmitsCompleted()
|
||||||
|
{
|
||||||
|
var db = NewDb();
|
||||||
|
var repo = NewRepo();
|
||||||
|
GitRepoFixture.RunGit(repo.RepoDir, "branch", "-m", "main");
|
||||||
|
|
||||||
|
var (parentId, subA, subB) = await SeedPlanningWithTwoNonConflictingChildrenAsync(db, repo);
|
||||||
|
|
||||||
|
var (orch, calls) = BuildOrchestrator(db);
|
||||||
|
|
||||||
|
await orch.StartAsync(parentId, "main", CancellationToken.None);
|
||||||
|
|
||||||
|
using var ctx = db.CreateContext();
|
||||||
|
var planning = ctx.Tasks.Single(t => t.Id == parentId);
|
||||||
|
Assert.Equal(TaskStatus.Done, planning.Status);
|
||||||
|
Assert.NotNull(planning.FinishedAt);
|
||||||
|
|
||||||
|
Assert.Equal(WorktreeState.Merged, ctx.Worktrees.Single(w => w.TaskId == subA).State);
|
||||||
|
Assert.Equal(WorktreeState.Merged, ctx.Worktrees.Single(w => w.TaskId == subB).State);
|
||||||
|
|
||||||
|
Assert.Contains(calls, c => c.Method == "PlanningMergeStarted");
|
||||||
|
Assert.Equal(2, calls.Count(c => c.Method == "PlanningSubtaskMerged"));
|
||||||
|
Assert.Contains(calls, c => c.Method == "PlanningCompleted" && (string)c.Args[0]! == parentId);
|
||||||
|
}
|
||||||
|
|
||||||
|
private async Task<(string parentId, string subA, string subB)> SeedPlanningWithTwoNonConflictingChildrenAsync(
|
||||||
|
DbFixture db, GitRepoFixture repo)
|
||||||
|
{
|
||||||
|
using var ctx = db.CreateContext();
|
||||||
|
|
||||||
|
var listId = Guid.NewGuid().ToString();
|
||||||
|
ctx.Lists.Add(new ListEntity
|
||||||
|
{
|
||||||
|
Id = listId, Name = "test", CreatedAt = DateTime.UtcNow,
|
||||||
|
WorkingDir = repo.RepoDir,
|
||||||
|
});
|
||||||
|
|
||||||
|
var parentId = Guid.NewGuid().ToString();
|
||||||
|
ctx.Tasks.Add(new TaskEntity
|
||||||
|
{
|
||||||
|
Id = parentId, ListId = listId, Title = "plan", CreatedAt = DateTime.UtcNow,
|
||||||
|
Status = TaskStatus.Planned, SortOrder = 0,
|
||||||
|
});
|
||||||
|
|
||||||
|
var subA = Guid.NewGuid().ToString();
|
||||||
|
var subB = Guid.NewGuid().ToString();
|
||||||
|
ctx.Tasks.Add(new TaskEntity
|
||||||
|
{
|
||||||
|
Id = subA, ListId = listId, Title = "child A", CreatedAt = DateTime.UtcNow,
|
||||||
|
ParentTaskId = parentId, Status = TaskStatus.Done, SortOrder = 1,
|
||||||
|
});
|
||||||
|
ctx.Tasks.Add(new TaskEntity
|
||||||
|
{
|
||||||
|
Id = subB, ListId = listId, Title = "child B", CreatedAt = DateTime.UtcNow,
|
||||||
|
ParentTaskId = parentId, Status = TaskStatus.Done, SortOrder = 2,
|
||||||
|
});
|
||||||
|
await ctx.SaveChangesAsync();
|
||||||
|
|
||||||
|
SeedWorktree(ctx, repo, subA, "fileA.txt", "content A");
|
||||||
|
SeedWorktree(ctx, repo, subB, "fileB.txt", "content B");
|
||||||
|
await ctx.SaveChangesAsync();
|
||||||
|
|
||||||
|
return (parentId, subA, subB);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void SeedWorktree(ClaudeDoDbContext ctx, GitRepoFixture repo, string taskId, string filename, string content)
|
||||||
|
{
|
||||||
|
var wtPath = Path.Combine(Path.GetTempPath(), $"wt_{Guid.NewGuid():N}");
|
||||||
|
_wtCleanups.Add((repo.RepoDir, wtPath));
|
||||||
|
var branch = $"claudedo/{taskId[..8]}";
|
||||||
|
GitRepoFixture.RunGit(repo.RepoDir, "worktree", "add", "-b", branch, wtPath, repo.BaseCommit);
|
||||||
|
File.WriteAllText(Path.Combine(wtPath, filename), content);
|
||||||
|
GitRepoFixture.RunGit(wtPath, "add", filename);
|
||||||
|
GitRepoFixture.RunGit(wtPath, "commit", "-m", $"add {filename}");
|
||||||
|
var head = GitRepoFixture.RunGit(wtPath, "rev-parse", "HEAD").Trim();
|
||||||
|
|
||||||
|
ctx.Worktrees.Add(new WorktreeEntity
|
||||||
|
{
|
||||||
|
TaskId = taskId,
|
||||||
|
Path = wtPath,
|
||||||
|
BranchName = branch,
|
||||||
|
BaseCommit = repo.BaseCommit,
|
||||||
|
HeadCommit = head,
|
||||||
|
DiffStat = null,
|
||||||
|
State = WorktreeState.Active,
|
||||||
|
CreatedAt = DateTime.UtcNow,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
private (PlanningMergeOrchestrator orch, List<(string Method, object?[] Args)> calls) BuildOrchestrator(DbFixture db)
|
||||||
|
{
|
||||||
|
var fakeHub = new OrchestratorFakeHubContext();
|
||||||
|
var spy = fakeHub.RecordingClients.Proxy;
|
||||||
|
var broadcaster = new HubBroadcaster(fakeHub);
|
||||||
|
var git = new GitService();
|
||||||
|
var factory = db.CreateFactory();
|
||||||
|
var merge = new TaskMergeService(
|
||||||
|
factory, git, broadcaster,
|
||||||
|
NullLogger<TaskMergeService>.Instance);
|
||||||
|
var aggregator = new PlanningAggregator(
|
||||||
|
factory, git,
|
||||||
|
NullLogger<PlanningAggregator>.Instance);
|
||||||
|
var orch = new PlanningMergeOrchestrator(
|
||||||
|
factory, merge, aggregator, broadcaster,
|
||||||
|
NullLogger<PlanningMergeOrchestrator>.Instance);
|
||||||
|
return (orch, spy.Calls);
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user