From 16e1ddd1291e83ba3b3aa1239ff181f48c534e7a Mon Sep 17 00:00:00 2001 From: mika kuns Date: Sat, 25 Apr 2026 09:36:01 +0200 Subject: [PATCH] feat(worker): add PlanningChainCoordinator for sequential subtask execution Coordinates Waiting -> Queued transitions between sibling subtasks: when a child finishes Done, the next Waiting sibling is promoted to Queued. WorkerHub.QueuePlanningSubtasksAsync exposes this to the UI; TaskRunner advances the chain on completion. Also tightens the planning-session prompt: planner must use MCP tools, not direct edits. Co-Authored-By: Claude Opus 4.7 (1M context) --- src/ClaudeDo.Worker/Hub/WorkerHub.cs | 28 ++- .../Planning/PlanningChainCoordinator.cs | 63 +++++++ .../Planning/PlanningSessionManager.cs | 7 +- src/ClaudeDo.Worker/Program.cs | 1 + src/ClaudeDo.Worker/Runner/TaskRunner.cs | 24 ++- .../Planning/PlanningChainCoordinatorTests.cs | 164 ++++++++++++++++++ 6 files changed, 283 insertions(+), 4 deletions(-) create mode 100644 src/ClaudeDo.Worker/Planning/PlanningChainCoordinator.cs create mode 100644 tests/ClaudeDo.Worker.Tests/Planning/PlanningChainCoordinatorTests.cs diff --git a/src/ClaudeDo.Worker/Hub/WorkerHub.cs b/src/ClaudeDo.Worker/Hub/WorkerHub.cs index f38e8e0..f2176af 100644 --- a/src/ClaudeDo.Worker/Hub/WorkerHub.cs +++ b/src/ClaudeDo.Worker/Hub/WorkerHub.cs @@ -48,6 +48,7 @@ public sealed class WorkerHub : Microsoft.AspNetCore.SignalR.Hub private readonly IPlanningTerminalLauncher _launcher; private readonly PlanningAggregator _planningAggregator; private readonly PlanningMergeOrchestrator _planningMergeOrchestrator; + private readonly PlanningChainCoordinator _planningChain; public WorkerHub( QueueService queue, @@ -61,7 +62,8 @@ public sealed class WorkerHub : Microsoft.AspNetCore.SignalR.Hub PlanningSessionManager planning, IPlanningTerminalLauncher launcher, PlanningAggregator planningAggregator, - PlanningMergeOrchestrator planningMergeOrchestrator) + PlanningMergeOrchestrator planningMergeOrchestrator, + PlanningChainCoordinator planningChain) { _queue = queue; _agentService = agentService; @@ -75,6 +77,30 @@ public sealed class WorkerHub : Microsoft.AspNetCore.SignalR.Hub _launcher = launcher; _planningAggregator = planningAggregator; _planningMergeOrchestrator = planningMergeOrchestrator; + _planningChain = planningChain; + } + + public async Task QueuePlanningSubtasksAsync(string parentTaskId) + { + try + { + await _planningChain.QueueSubtasksSequentiallyAsync(parentTaskId, Context.ConnectionAborted); + } + catch (InvalidOperationException ex) + { + throw new HubException(ex.Message); + } + + await using var ctx = await _dbFactory.CreateDbContextAsync(); + var childIds = await ctx.Tasks + .Where(t => t.ParentTaskId == parentTaskId) + .Select(t => t.Id) + .ToListAsync(); + await _broadcaster.TaskUpdated(parentTaskId); + foreach (var id in childIds) + await _broadcaster.TaskUpdated(id); + + _queue.WakeQueue(); } public string Ping() => $"pong v{Version}"; diff --git a/src/ClaudeDo.Worker/Planning/PlanningChainCoordinator.cs b/src/ClaudeDo.Worker/Planning/PlanningChainCoordinator.cs new file mode 100644 index 0000000..2421257 --- /dev/null +++ b/src/ClaudeDo.Worker/Planning/PlanningChainCoordinator.cs @@ -0,0 +1,63 @@ +using ClaudeDo.Data; +using ClaudeDo.Data.Models; +using Microsoft.EntityFrameworkCore; +using TaskStatus = ClaudeDo.Data.Models.TaskStatus; + +namespace ClaudeDo.Worker.Planning; + +public sealed class PlanningChainCoordinator +{ + private readonly IDbContextFactory _dbFactory; + + public PlanningChainCoordinator(IDbContextFactory dbFactory) + => _dbFactory = dbFactory; + + public async Task QueueSubtasksSequentiallyAsync(string parentTaskId, CancellationToken ct = default) + { + await using var ctx = await _dbFactory.CreateDbContextAsync(ct); + var parent = await ctx.Tasks.FirstOrDefaultAsync(t => t.Id == parentTaskId, ct) + ?? throw new InvalidOperationException($"Task {parentTaskId} not found."); + + var children = await ctx.Tasks + .Where(t => t.ParentTaskId == parentTaskId) + .OrderBy(t => t.SortOrder).ThenBy(t => t.CreatedAt) + .ToListAsync(ct); + if (children.Count == 0) + throw new InvalidOperationException("Parent has no subtasks."); + + var bad = children.FirstOrDefault(c => + c.Status != TaskStatus.Manual && c.Status != TaskStatus.Planned); + if (bad is not null) + throw new InvalidOperationException( + $"Child {bad.Id} is in status {bad.Status}; expected Manual or Planned."); + + for (int i = 0; i < children.Count; i++) + children[i].Status = i == 0 ? TaskStatus.Queued : TaskStatus.Waiting; + + await ctx.SaveChangesAsync(ct); + } + + public async Task OnChildFinishedAsync( + string childTaskId, TaskStatus finalStatus, CancellationToken ct = default) + { + if (finalStatus != TaskStatus.Done) return null; + + await using var ctx = await _dbFactory.CreateDbContextAsync(ct); + var child = await ctx.Tasks + .AsNoTracking() + .FirstOrDefaultAsync(t => t.Id == childTaskId, ct); + if (child?.ParentTaskId is null) return null; + + var next = await ctx.Tasks + .Where(t => t.ParentTaskId == child.ParentTaskId + && t.SortOrder > child.SortOrder + && t.Status == TaskStatus.Waiting) + .OrderBy(t => t.SortOrder) + .FirstOrDefaultAsync(ct); + if (next is null) return null; + + next.Status = TaskStatus.Queued; + await ctx.SaveChangesAsync(ct); + return next.Id; + } +} diff --git a/src/ClaudeDo.Worker/Planning/PlanningSessionManager.cs b/src/ClaudeDo.Worker/Planning/PlanningSessionManager.cs index 7f35756..2af7d06 100644 --- a/src/ClaudeDo.Worker/Planning/PlanningSessionManager.cs +++ b/src/ClaudeDo.Worker/Planning/PlanningSessionManager.cs @@ -291,14 +291,17 @@ public sealed class PlanningSessionManager """ You are a planning assistant for ClaudeDo. Your role is to help break down a task into smaller, actionable subtasks. + Your final goal WILL ALWAYS be the creation of Subtasks ALWAYS invoke the `superpowers:brainstorming` skill via the Skill tool at the start of every planning session, and follow its process end-to-end. It guides you through clarifying questions, approach exploration, and design approval BEFORE any subtasks are created. Do not create child tasks until the user has approved a design. - - Use the available MCP tools (mcp__claudedo__*) to create child tasks once the + + NEVER Change files yourself. + + ALWAYS Use the available MCP tools (mcp__claudedo__*) to create child tasks once the design is approved. When you are done planning, finalize the session. Be concise and focused. Each subtask should be independently executable. diff --git a/src/ClaudeDo.Worker/Program.cs b/src/ClaudeDo.Worker/Program.cs index 7b73972..cefec7e 100644 --- a/src/ClaudeDo.Worker/Program.cs +++ b/src/ClaudeDo.Worker/Program.cs @@ -38,6 +38,7 @@ builder.Services.AddSingleton(); builder.Services.AddSingleton(); builder.Services.AddSingleton(); builder.Services.AddSingleton(); +builder.Services.AddSingleton(); // Agent file management. var agentsDir = Path.Combine(ClaudeDo.Data.Paths.AppDataRoot(), "agents"); diff --git a/src/ClaudeDo.Worker/Runner/TaskRunner.cs b/src/ClaudeDo.Worker/Runner/TaskRunner.cs index 240aa32..33d99d8 100644 --- a/src/ClaudeDo.Worker/Runner/TaskRunner.cs +++ b/src/ClaudeDo.Worker/Runner/TaskRunner.cs @@ -3,7 +3,9 @@ using ClaudeDo.Data.Models; using ClaudeDo.Data.Repositories; using ClaudeDo.Worker.Config; using ClaudeDo.Worker.Hub; +using ClaudeDo.Worker.Planning; using Microsoft.EntityFrameworkCore; +using TaskStatus = ClaudeDo.Data.Models.TaskStatus; namespace ClaudeDo.Worker.Runner; @@ -16,6 +18,7 @@ public sealed class TaskRunner private readonly ClaudeArgsBuilder _argsBuilder; private readonly WorkerConfig _cfg; private readonly ILogger _logger; + private readonly PlanningChainCoordinator _chain; public TaskRunner( IClaudeProcess claude, @@ -24,7 +27,8 @@ public sealed class TaskRunner WorktreeManager wtManager, ClaudeArgsBuilder argsBuilder, WorkerConfig cfg, - ILogger logger) + ILogger logger, + PlanningChainCoordinator chain) { _claude = claude; _dbFactory = dbFactory; @@ -33,6 +37,7 @@ public sealed class TaskRunner _argsBuilder = argsBuilder; _cfg = cfg; _logger = logger; + _chain = chain; } public async Task RunAsync(TaskEntity task, string slot, CancellationToken ct) @@ -338,6 +343,23 @@ public sealed class TaskRunner await _broadcaster.TaskFinished(slot, task.Id, "done", finishedAt); _logger.LogInformation("Task {TaskId} completed (turns={Turns}, tokens_in={In}, tokens_out={Out})", task.Id, result.TurnCount, result.TokensIn, result.TokensOut); + + // Sequential planning chain: if this task has a parent, flip the next + // Waiting sibling to Queued so the queue pickup loop dispatches it next. + if (task.ParentTaskId is not null) + { + try + { + var advanced = await _chain.OnChildFinishedAsync( + task.Id, TaskStatus.Done, CancellationToken.None); + if (advanced is not null) + await _broadcaster.TaskUpdated(advanced); + } + catch (Exception ex) + { + _logger.LogWarning(ex, "PlanningChain advance failed for {TaskId}", task.Id); + } + } } private async Task HandleFailure(string taskId, string taskTitle, string slot, RunResult result) diff --git a/tests/ClaudeDo.Worker.Tests/Planning/PlanningChainCoordinatorTests.cs b/tests/ClaudeDo.Worker.Tests/Planning/PlanningChainCoordinatorTests.cs new file mode 100644 index 0000000..5abea99 --- /dev/null +++ b/tests/ClaudeDo.Worker.Tests/Planning/PlanningChainCoordinatorTests.cs @@ -0,0 +1,164 @@ +using ClaudeDo.Data; +using ClaudeDo.Data.Models; +using ClaudeDo.Worker.Planning; +using ClaudeDo.Worker.Tests.Infrastructure; +using Microsoft.EntityFrameworkCore; +using TaskStatus = ClaudeDo.Data.Models.TaskStatus; + +namespace ClaudeDo.Worker.Tests.Planning; + +public sealed class PlanningChainCoordinatorTests : IDisposable +{ + private readonly DbFixture _db = new(); + private readonly TestDbContextFactory _factory; + private readonly PlanningChainCoordinator _sut; + private readonly string _listId; + + public PlanningChainCoordinatorTests() + { + _factory = _db.CreateFactory(); + _sut = new PlanningChainCoordinator(_factory); + _listId = Guid.NewGuid().ToString(); + using var ctx = _factory.CreateDbContext(); + ctx.Lists.Add(new ListEntity + { + Id = _listId, + Name = "Test", + CreatedAt = DateTime.UtcNow, + DefaultCommitType = "chore", + }); + ctx.SaveChanges(); + } + + public void Dispose() => _db.Dispose(); + + private async Task SeedPlanningFamilyAsync(string parentId, int childCount) + { + await using var ctx = _factory.CreateDbContext(); + ctx.Tasks.Add(new TaskEntity + { + Id = parentId, + ListId = _listId, + Title = "Parent", + CreatedAt = DateTime.UtcNow, + Status = TaskStatus.Planned, + }); + for (int i = 0; i < childCount; i++) + { + ctx.Tasks.Add(new TaskEntity + { + Id = $"{parentId}-c{i}", + ListId = _listId, + Title = $"Child {i}", + CreatedAt = DateTime.UtcNow, + Status = TaskStatus.Manual, + ParentTaskId = parentId, + SortOrder = i, + }); + } + await ctx.SaveChangesAsync(); + } + + private async Task> GetChildrenAsync(string parentId) + { + await using var ctx = _factory.CreateDbContext(); + return await ctx.Tasks + .AsNoTracking() + .Where(t => t.ParentTaskId == parentId) + .OrderBy(t => t.SortOrder) + .ToListAsync(); + } + + [Fact] + public async Task QueueSubtasksSequentially_SetsFirstQueued_RestWaiting() + { + await SeedPlanningFamilyAsync("P", 3); + + await _sut.QueueSubtasksSequentiallyAsync("P", default); + + var kids = await GetChildrenAsync("P"); + Assert.Equal(TaskStatus.Queued, kids[0].Status); + Assert.Equal(TaskStatus.Waiting, kids[1].Status); + Assert.Equal(TaskStatus.Waiting, kids[2].Status); + } + + [Fact] + public async Task OnChildDone_FlipsNextWaitingToQueued() + { + await SeedPlanningFamilyAsync("P", 3); + await _sut.QueueSubtasksSequentiallyAsync("P", default); + + // Simulate first child finishing Done. + await using (var ctx = _factory.CreateDbContext()) + { + var first = await ctx.Tasks.FirstAsync(t => t.Id == "P-c0"); + first.Status = TaskStatus.Done; + await ctx.SaveChangesAsync(); + } + + var advanced = await _sut.OnChildFinishedAsync("P-c0", TaskStatus.Done, default); + + Assert.Equal("P-c1", advanced); + var kids = await GetChildrenAsync("P"); + Assert.Equal(TaskStatus.Done, kids[0].Status); + Assert.Equal(TaskStatus.Queued, kids[1].Status); + Assert.Equal(TaskStatus.Waiting, kids[2].Status); + } + + [Fact] + public async Task OnChildFailed_DoesNotAdvanceChain() + { + await SeedPlanningFamilyAsync("P", 3); + await _sut.QueueSubtasksSequentiallyAsync("P", default); + + await using (var ctx = _factory.CreateDbContext()) + { + var first = await ctx.Tasks.FirstAsync(t => t.Id == "P-c0"); + first.Status = TaskStatus.Failed; + await ctx.SaveChangesAsync(); + } + + var advanced = await _sut.OnChildFinishedAsync("P-c0", TaskStatus.Failed, default); + + Assert.Null(advanced); + var kids = await GetChildrenAsync("P"); + Assert.Equal(TaskStatus.Failed, kids[0].Status); + Assert.Equal(TaskStatus.Waiting, kids[1].Status); + Assert.Equal(TaskStatus.Waiting, kids[2].Status); + } + + [Fact] + public async Task OnChildDone_LastChild_ReturnsNull() + { + await SeedPlanningFamilyAsync("P", 2); + await _sut.QueueSubtasksSequentiallyAsync("P", default); + + // Mark both done, simulating chain reaching the end. + await using (var ctx = _factory.CreateDbContext()) + { + foreach (var t in ctx.Tasks.Where(t => t.ParentTaskId == "P")) + t.Status = TaskStatus.Done; + await ctx.SaveChangesAsync(); + } + + var advanced = await _sut.OnChildFinishedAsync("P-c1", TaskStatus.Done, default); + + Assert.Null(advanced); + } + + [Fact] + public async Task QueueSubtasksSequentially_RejectsNonManualChildren() + { + await SeedPlanningFamilyAsync("P", 2); + // Corrupt one child to be already Queued. + await using (var ctx = _factory.CreateDbContext()) + { + var first = await ctx.Tasks.FirstAsync(t => t.Id == "P-c0"); + first.Status = TaskStatus.Queued; + await ctx.SaveChangesAsync(); + } + + await Assert.ThrowsAsync( + () => _sut.QueueSubtasksSequentiallyAsync("P", default)); + } +}