fix(worker): prevent PlanningMergeOrchestrator double-drain race and orphaned state

This commit is contained in:
mika kuns
2026-04-24 18:12:21 +02:00
parent 3142ba203f
commit ef070ddab5

View File

@@ -56,11 +56,9 @@ public sealed class PlanningMergeOrchestrator
.Where(c => c.Worktree is not null && c.Worktree.State != WorktreeState.Merged) .Where(c => c.Worktree is not null && c.Worktree.State != WorktreeState.Merged)
.Select(c => c.Id)); .Select(c => c.Id));
_states[planningTaskId] = new State var state = new State { TargetBranch = targetBranch, RemainingSubtaskIds = queue };
{ if (!_states.TryAdd(planningTaskId, state))
TargetBranch = targetBranch, throw new InvalidOperationException($"Merge already in progress for {planningTaskId}.");
RemainingSubtaskIds = queue,
};
await _broadcaster.PlanningMergeStarted(planningTaskId, targetBranch); await _broadcaster.PlanningMergeStarted(planningTaskId, targetBranch);
await DrainAsync(planningTaskId, ct); await DrainAsync(planningTaskId, ct);
@@ -70,6 +68,9 @@ public sealed class PlanningMergeOrchestrator
{ {
if (!_states.TryGetValue(planningTaskId, out var state)) return; if (!_states.TryGetValue(planningTaskId, out var state)) return;
var keepState = false;
try
{
while (state.RemainingSubtaskIds.TryDequeue(out var subtaskId)) while (state.RemainingSubtaskIds.TryDequeue(out var subtaskId))
{ {
state.CurrentSubtaskId = subtaskId; state.CurrentSubtaskId = subtaskId;
@@ -84,6 +85,7 @@ public sealed class PlanningMergeOrchestrator
if (result.Status == TaskMergeService.StatusConflict) if (result.Status == TaskMergeService.StatusConflict)
{ {
await _broadcaster.PlanningMergeConflict(planningTaskId, subtaskId, result.ConflictFiles); await _broadcaster.PlanningMergeConflict(planningTaskId, subtaskId, result.ConflictFiles);
keepState = true;
return; return;
} }
@@ -92,6 +94,7 @@ public sealed class PlanningMergeOrchestrator
await _broadcaster.PlanningMergeConflict( await _broadcaster.PlanningMergeConflict(
planningTaskId, subtaskId, planningTaskId, subtaskId,
new[] { result.ErrorMessage ?? "merge blocked" }); new[] { result.ErrorMessage ?? "merge blocked" });
keepState = true;
return; return;
} }
@@ -100,9 +103,13 @@ public sealed class PlanningMergeOrchestrator
state.CurrentSubtaskId = null; state.CurrentSubtaskId = null;
await FinalizePlanningDoneAsync(planningTaskId, ct); await FinalizePlanningDoneAsync(planningTaskId, ct);
_states.TryRemove(planningTaskId, out _);
await _broadcaster.PlanningCompleted(planningTaskId); await _broadcaster.PlanningCompleted(planningTaskId);
} }
finally
{
if (!keepState) _states.TryRemove(planningTaskId, out _);
}
}
private async Task FinalizePlanningDoneAsync(string planningTaskId, CancellationToken ct) private async Task FinalizePlanningDoneAsync(string planningTaskId, CancellationToken ct)
{ {