using System.Collections.Concurrent;
using ClaudeDo.Data;
using ClaudeDo.Data.Git;
using ClaudeDo.Data.Models;
using ClaudeDo.Worker.Hub;
using ClaudeDo.Worker.Lifecycle;
using ClaudeDo.Worker.State;
using Microsoft.EntityFrameworkCore;
using TaskStatus = ClaudeDo.Data.Models.TaskStatus;

namespace ClaudeDo.Worker.Planning;

/// <summary>
/// Merges the worktrees of a parent task and its subtasks into a target branch, one at a time.
/// A merge session is tracked in memory per parent task id: it is created by
/// <see cref="StartAsync"/>, paused when a subtask merge reports a conflict, resumed by
/// <see cref="ContinueAsync"/> and discarded by <see cref="AbortAsync"/> or when the drain ends.
/// </summary>
public sealed class PlanningMergeOrchestrator
{
    // NOTE(review): IDbContextFactory and ILogger are written without type arguments here;
    // confirm against the project's declarations / DI registration.
    private readonly IDbContextFactory _dbFactory;
    private readonly TaskMergeService _merge;
    private readonly PlanningAggregator _aggregator;
    private readonly HubBroadcaster _broadcaster;
    private readonly GitService _git;
    private readonly ITaskStateService _state;
    private readonly ILogger _logger;

    /// <summary>In-memory bookkeeping for one merge session.</summary>
    private sealed class State
    {
        public required string TargetBranch { get; init; }
        public required Queue<string> RemainingSubtaskIds { get; init; }
        public required bool IsPlanning { get; init; }

        /// <summary>Id of the task currently being merged; null between merges.</summary>
        public string? CurrentSubtaskId { get; set; }
    }

    /// <summary>Detached view of a child task; <c>Worktree</c> is null when the child has no worktree.</summary>
    private sealed record ChildSnapshot(string Id, TaskStatus Status, WorktreeState? Worktree);

    /// <summary>Everything <see cref="StartAsync"/> needs from the database, read in one go.</summary>
    private sealed record MergePlan(
        string WorkingDir,
        bool IsPlanning,
        bool ParentHasWorktree,
        IReadOnlyList<ChildSnapshot> Children);

    // Active merge sessions keyed by parent task id.
    private readonly ConcurrentDictionary<string, State> _states = new();

    public PlanningMergeOrchestrator(
        IDbContextFactory dbFactory,
        TaskMergeService merge,
        PlanningAggregator aggregator,
        HubBroadcaster broadcaster,
        GitService git,
        ITaskStateService state,
        ILogger logger)
    {
        _dbFactory = dbFactory;
        _merge = merge;
        _aggregator = aggregator;
        _broadcaster = broadcaster;
        _git = git;
        _state = state;
        _logger = logger;
    }

    /// <summary>
    /// Validates the parent task and repository, registers a merge session and starts merging.
    /// </summary>
    /// <param name="parentTaskId">Id of the parent task whose unit is merged.</param>
    /// <param name="targetBranch">Branch passed to every individual merge.</param>
    /// <param name="ct">Cancellation token flowed to database, git and merge calls.</param>
    /// <exception cref="KeyNotFoundException">The parent task does not exist.</exception>
    /// <exception cref="InvalidOperationException">
    /// The list has no working directory, a planning subtask is not mergeable, the repository is
    /// mid-merge or dirty, or a session for this parent is already registered.
    /// </exception>
    public async Task StartAsync(string parentTaskId, string targetBranch, CancellationToken ct)
    {
        var plan = await LoadMergePlanAsync(parentTaskId, ct);

        if (plan.IsPlanning)
            EnsureChildrenMergeable(plan.Children);

        if (await _git.IsMidMergeAsync(plan.WorkingDir, ct))
            throw new InvalidOperationException(
                "repo is mid-merge; use AbortPlanningMerge to reset the repository, then Approve again");
        if (await _git.HasChangesAsync(plan.WorkingDir, ct))
            throw new InvalidOperationException("working tree has uncommitted changes");

        var state = new State
        {
            TargetBranch = targetBranch,
            RemainingSubtaskIds = new Queue<string>(BuildMergeOrder(parentTaskId, plan)),
            IsPlanning = plan.IsPlanning,
        };
        if (!_states.TryAdd(parentTaskId, state))
            throw new InvalidOperationException($"Merge already in progress for {parentTaskId}.");

        try
        {
            await _broadcaster.PlanningMergeStarted(parentTaskId, targetBranch);
        }
        catch
        {
            // Without this the entry would outlive the failed start: it has no current subtask,
            // so Continue/Abort reject it while Start keeps reporting "already in progress".
            RemoveState(parentTaskId, state);
            throw;
        }

        await DrainAsync(parentTaskId, ct);
    }

    /// <summary>
    /// Resumes a session that is paused on a subtask: continues that subtask's merge and, when it
    /// reports merged, carries on with the remaining queue.
    /// </summary>
    /// <exception cref="InvalidOperationException">No session with a current subtask exists.</exception>
    public async Task ContinueAsync(string planningTaskId, CancellationToken ct)
    {
        if (!_states.TryGetValue(planningTaskId, out var state) || state.CurrentSubtaskId is null)
            throw new InvalidOperationException(
                "no in-progress merge to continue; if the worker was restarted during a conflict, use AbortPlanningMerge to reset the repository");

        var current = state.CurrentSubtaskId;
        var result = await _merge.ContinueMergeAsync(current, ct);

        if (result.Status == TaskMergeService.StatusConflict)
        {
            // Still conflicted: keep the session so the caller can retry or abort.
            await _broadcaster.PlanningMergeConflict(planningTaskId, current, result.ConflictFiles);
            return;
        }

        if (result.Status != TaskMergeService.StatusMerged)
        {
            _logger.LogWarning(
                "Planning continue blocked on subtask {Subtask}: {Msg}",
                current, result.ErrorMessage);
            RemoveState(planningTaskId, state);
            await _broadcaster.PlanningMergeAborted(planningTaskId);
            return;
        }

        await _broadcaster.PlanningSubtaskMerged(planningTaskId, current);
        state.CurrentSubtaskId = null;
        await DrainAsync(planningTaskId, ct);
    }

    /// <summary>
    /// Aborts the merge of the session's current subtask and discards the session. When no
    /// session with a current subtask exists, falls back to aborting a mid-merge repository.
    /// </summary>
    /// <exception cref="InvalidOperationException">
    /// Thrown by the fallback when there is no working directory or the repository is not mid-merge.
    /// </exception>
    public async Task AbortAsync(string planningTaskId, CancellationToken ct)
    {
        if (!_states.TryGetValue(planningTaskId, out var state) || state.CurrentSubtaskId is null)
        {
            // No in-memory state — worker may have been restarted while a conflict was paused.
            // Check whether the list repo is still mid-merge and abort it directly.
            await AbortStatelessAsync(planningTaskId, ct);
            return;
        }

        await _merge.AbortMergeAsync(state.CurrentSubtaskId, ct);
        RemoveState(planningTaskId, state);
        await _broadcaster.PlanningMergeAborted(planningTaskId);
    }

    /// <summary>
    /// Aborts a git merge in the task's list working directory without any in-memory session.
    /// </summary>
    private async Task AbortStatelessAsync(string planningTaskId, CancellationToken ct)
    {
        string? workingDir;
        await using (var ctx = _dbFactory.CreateDbContext())
        {
            workingDir = await ctx.Tasks
                .Where(t => t.Id == planningTaskId)
                .Select(t => t.List.WorkingDir)
                .FirstOrDefaultAsync(ct);
        }

        if (string.IsNullOrWhiteSpace(workingDir) || !await _git.IsMidMergeAsync(workingDir, ct))
            throw new InvalidOperationException("no in-progress merge to abort");

        await _git.MergeAbortAsync(workingDir, ct);
        _logger.LogInformation(
            "Stateless abort of mid-merge for planning task {ParentId} (post-restart recovery)",
            planningTaskId);
        await _broadcaster.PlanningMergeAborted(planningTaskId);
        // Parent remains WaitingForReview — Approve will restart the unit merge from scratch.
    }

    /// <summary>
    /// Merges queued ids in order until the queue is empty, a conflict pauses the session, or a
    /// merge is blocked. The session entry is kept only when paused on a conflict; on every other
    /// exit, including exceptions, it is removed.
    /// </summary>
    private async Task DrainAsync(string planningTaskId, CancellationToken ct)
    {
        if (!_states.TryGetValue(planningTaskId, out var state))
            return;

        var keepState = false;
        try
        {
            while (state.RemainingSubtaskIds.TryDequeue(out var subtaskId))
            {
                state.CurrentSubtaskId = subtaskId;
                var result = await _merge.MergeAsync(
                    subtaskId,
                    state.TargetBranch,
                    removeWorktree: true,
                    commitMessage: "Merge subtask",
                    leaveConflictsInTree: true,
                    ct);

                if (result.Status == TaskMergeService.StatusConflict)
                {
                    await _broadcaster.PlanningMergeConflict(planningTaskId, subtaskId, result.ConflictFiles);
                    keepState = true;
                    return;
                }

                if (result.Status != TaskMergeService.StatusMerged)
                {
                    _logger.LogWarning(
                        "Planning merge blocked on subtask {Subtask}: {Msg}",
                        subtaskId, result.ErrorMessage);
                    await _broadcaster.PlanningMergeAborted(planningTaskId);
                    return; // keepState stays false → finally removes the state entry
                }

                await _broadcaster.PlanningSubtaskMerged(planningTaskId, subtaskId);
            }

            state.CurrentSubtaskId = null;
            var finalized = await FinalizeParentDoneAsync(planningTaskId, state.IsPlanning, ct);
            if (finalized)
                await _broadcaster.PlanningCompleted(planningTaskId);
        }
        finally
        {
            if (!keepState)
                RemoveState(planningTaskId, state);
        }
    }

    /// <summary>
    /// Approves the parent's review after the queue has been merged and, for planning parents,
    /// attempts integration-branch cleanup (failures are logged, not thrown).
    /// </summary>
    /// <returns>
    /// <c>false</c> when approval failed and the parent's stored status is not Done; otherwise <c>true</c>.
    /// </returns>
    private async Task<bool> FinalizeParentDoneAsync(string parentTaskId, bool isPlanning, CancellationToken ct)
    {
        var result = await _state.ApproveReviewAsync(parentTaskId, ct);
        if (!result.Ok)
        {
            // ApproveReviewAsync requires WaitingForReview. For improvement parents whose own
            // worktree is in the merge queue, TaskMergeService.ApproveIfWaitingForReviewAsync
            // already approved the parent during the drain — check for that expected path.
            await using var ctx = _dbFactory.CreateDbContext();
            var current = await ctx.Tasks
                .Where(t => t.Id == parentTaskId)
                .Select(t => (TaskStatus?)t.Status)
                .FirstOrDefaultAsync(ct);

            if (current != TaskStatus.Done)
            {
                // Parent was cancelled or moved to an unexpected state during the merge drain.
                // Do not overwrite — the external transition takes precedence.
                _logger.LogWarning(
                    "Unit-merge drain completed but parent {ParentTaskId} could not be finalized (status: {Status}): {Reason}",
                    parentTaskId, current, result.Reason);
                return false;
            }
        }

        // Only planning builds an integration branch via the aggregator; skip cleanup otherwise.
        if (isPlanning)
        {
            try
            {
                await _aggregator.CleanupIntegrationBranchAsync(parentTaskId, ct);
            }
            catch (Exception ex)
            {
                _logger.LogWarning(ex, "integration branch cleanup failed");
            }
        }

        return true;
    }

    /// <summary>
    /// Loads the parent with its list, worktree and children, and copies the fields the merge
    /// needs so nothing tracked by the context is used after it is disposed.
    /// </summary>
    private async Task<MergePlan> LoadMergePlanAsync(string parentTaskId, CancellationToken ct)
    {
        await using var ctx = _dbFactory.CreateDbContext();

        var parent = await ctx.Tasks
            .Include(t => t.List)
            .Include(t => t.Worktree)
            .Include(t => t.Children).ThenInclude(c => c.Worktree)
            .SingleOrDefaultAsync(t => t.Id == parentTaskId, ct)
            ?? throw new KeyNotFoundException($"Planning task '{parentTaskId}' not found.");

        var workingDir = parent.List.WorkingDir
            ?? throw new InvalidOperationException("List has no working directory.");

        var children = parent.Children
            .OrderBy(c => c.SortOrder)
            .Select(c => new ChildSnapshot(c.Id, c.Status, c.Worktree?.State))
            .ToList();

        return new MergePlan(
            workingDir,
            IsPlanning: parent.PlanningPhase != PlanningPhase.None,
            ParentHasWorktree: parent.Worktree is { State: WorktreeState.Active },
            children);
    }

    /// <summary>
    /// Planning pre-check: every child must be Done and have a worktree that is Active or Merged.
    /// </summary>
    private static void EnsureChildrenMergeable(IReadOnlyList<ChildSnapshot> children)
    {
        foreach (var c in children)
        {
            if (c.Status != TaskStatus.Done)
                throw new InvalidOperationException($"subtask {c.Id} is not Done (status {c.Status})");
            if (c.Worktree is not { } worktreeState)
                throw new InvalidOperationException($"subtask {c.Id} has no worktree");
            if (worktreeState is not (WorktreeState.Active or WorktreeState.Merged))
                throw new InvalidOperationException(
                    $"subtask {c.Id} worktree state is {worktreeState}");
        }
    }

    /// <summary>
    /// Builds the merge order: the parent first (non-planning parents with an active worktree
    /// only), then every Done child whose worktree is still Active, in child sort order.
    /// </summary>
    private static List<string> BuildMergeOrder(string parentTaskId, MergePlan plan)
    {
        var idsToMerge = new List<string>();
        if (!plan.IsPlanning && plan.ParentHasWorktree)
            idsToMerge.Add(parentTaskId);

        idsToMerge.AddRange(
            plan.Children
                .Where(c => c.Status == TaskStatus.Done && c.Worktree is WorktreeState.Active)
                .Select(c => c.Id));
        return idsToMerge;
    }

    /// <summary>
    /// Removes the session only if the dictionary still maps the id to this exact instance, so a
    /// finished or aborted session can never evict a newer one registered under the same id.
    /// </summary>
    private void RemoveState(string parentTaskId, State state) =>
        _states.TryRemove(new KeyValuePair<string, State>(parentTaskId, state));
}