using System.Collections.Concurrent; using ClaudeDo.Data; using ClaudeDo.Data.Git; using ClaudeDo.Data.Models; using ClaudeDo.Worker.Hub; using ClaudeDo.Worker.Services; using Microsoft.EntityFrameworkCore; using TaskStatus = ClaudeDo.Data.Models.TaskStatus; namespace ClaudeDo.Worker.Planning; public sealed class PlanningMergeOrchestrator { private readonly IDbContextFactory _dbFactory; private readonly TaskMergeService _merge; private readonly PlanningAggregator _aggregator; private readonly HubBroadcaster _broadcaster; private readonly GitService _git; private readonly ILogger _logger; private sealed class State { public required string TargetBranch { get; init; } public required Queue RemainingSubtaskIds { get; init; } public string? CurrentSubtaskId { get; set; } } private readonly ConcurrentDictionary _states = new(); public PlanningMergeOrchestrator( IDbContextFactory dbFactory, TaskMergeService merge, PlanningAggregator aggregator, HubBroadcaster broadcaster, GitService git, ILogger logger) { _dbFactory = dbFactory; _merge = merge; _aggregator = aggregator; _broadcaster = broadcaster; _git = git; _logger = logger; } public async Task StartAsync(string planningTaskId, string targetBranch, CancellationToken ct) { string workingDir; List children; using (var ctx = _dbFactory.CreateDbContext()) { var planning = await ctx.Tasks .Include(t => t.List) .Include(t => t.Children).ThenInclude(c => c.Worktree) .SingleOrDefaultAsync(t => t.Id == planningTaskId, ct) ?? throw new KeyNotFoundException($"Planning task '{planningTaskId}' not found."); workingDir = planning.List.WorkingDir ?? throw new InvalidOperationException("List has no working directory."); children = planning.Children.OrderBy(c => c.SortOrder).ToList(); } foreach (var c in children) { if (c.Status != TaskStatus.Done) throw new InvalidOperationException($"subtask {c.Id} is not Done (status {c.Status})"); if (c.Worktree is null) throw new InvalidOperationException($"subtask {c.Id} has no worktree"); if (c.Worktree.State != WorktreeState.Active && c.Worktree.State != WorktreeState.Merged) throw new InvalidOperationException( $"subtask {c.Id} worktree state is {c.Worktree.State}"); } if (await _git.IsMidMergeAsync(workingDir, ct)) throw new InvalidOperationException("repo is mid-merge"); if (await _git.HasChangesAsync(workingDir, ct)) throw new InvalidOperationException("working tree has uncommitted changes"); var queue = new Queue( children .Where(c => c.Worktree!.State == WorktreeState.Active) .Select(c => c.Id)); var state = new State { TargetBranch = targetBranch, RemainingSubtaskIds = queue }; if (!_states.TryAdd(planningTaskId, state)) throw new InvalidOperationException($"Merge already in progress for {planningTaskId}."); await _broadcaster.PlanningMergeStarted(planningTaskId, targetBranch); await DrainAsync(planningTaskId, ct); } public async Task ContinueAsync(string planningTaskId, CancellationToken ct) { if (!_states.TryGetValue(planningTaskId, out var state) || state.CurrentSubtaskId is null) throw new InvalidOperationException("no in-progress merge to continue"); var current = state.CurrentSubtaskId; var result = await _merge.ContinueMergeAsync(current, ct); if (result.Status == TaskMergeService.StatusConflict) { await _broadcaster.PlanningMergeConflict(planningTaskId, current, result.ConflictFiles); return; } if (result.Status != TaskMergeService.StatusMerged) { _logger.LogWarning( "Planning continue blocked on subtask {Subtask}: {Msg}", current, result.ErrorMessage); _states.TryRemove(planningTaskId, out _); await _broadcaster.PlanningMergeAborted(planningTaskId); return; } await _broadcaster.PlanningSubtaskMerged(planningTaskId, current); state.CurrentSubtaskId = null; await DrainAsync(planningTaskId, ct); } public async Task AbortAsync(string planningTaskId, CancellationToken ct) { if (!_states.TryGetValue(planningTaskId, out var state) || state.CurrentSubtaskId is null) throw new InvalidOperationException("no in-progress merge to abort"); await _merge.AbortMergeAsync(state.CurrentSubtaskId, ct); _states.TryRemove(planningTaskId, out _); await _broadcaster.PlanningMergeAborted(planningTaskId); } private async Task DrainAsync(string planningTaskId, CancellationToken ct) { if (!_states.TryGetValue(planningTaskId, out var state)) return; var keepState = false; try { while (state.RemainingSubtaskIds.TryDequeue(out var subtaskId)) { state.CurrentSubtaskId = subtaskId; var result = await _merge.MergeAsync( subtaskId, state.TargetBranch, removeWorktree: true, commitMessage: "Merge subtask", leaveConflictsInTree: true, ct); if (result.Status == TaskMergeService.StatusConflict) { await _broadcaster.PlanningMergeConflict(planningTaskId, subtaskId, result.ConflictFiles); keepState = true; return; } if (result.Status != TaskMergeService.StatusMerged) { _logger.LogWarning( "Planning merge blocked on subtask {Subtask}: {Msg}", subtaskId, result.ErrorMessage); await _broadcaster.PlanningMergeAborted(planningTaskId); return; // keepState stays false → finally removes the state entry } await _broadcaster.PlanningSubtaskMerged(planningTaskId, subtaskId); } state.CurrentSubtaskId = null; await FinalizePlanningDoneAsync(planningTaskId, ct); await _broadcaster.PlanningCompleted(planningTaskId); } finally { if (!keepState) _states.TryRemove(planningTaskId, out _); } } private async Task FinalizePlanningDoneAsync(string planningTaskId, CancellationToken ct) { using var ctx = _dbFactory.CreateDbContext(); var planning = await ctx.Tasks.SingleOrDefaultAsync(t => t.Id == planningTaskId, ct); if (planning is null) return; planning.Status = TaskStatus.Done; planning.FinishedAt = DateTime.UtcNow; await ctx.SaveChangesAsync(ct); try { await _aggregator.CleanupIntegrationBranchAsync(planningTaskId, ct); } catch (Exception ex) { _logger.LogWarning(ex, "integration branch cleanup failed"); } } }