feat(worker): add PlanningChainCoordinator for sequential subtask execution

Coordinates Waiting -> Queued transitions between sibling subtasks: when a child finishes Done, the next Waiting sibling is promoted to Queued. WorkerHub.QueuePlanningSubtasksAsync exposes this to the UI; TaskRunner advances the chain on completion. Also tightens the planning-session prompt: planner must use MCP tools, not direct edits.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
mika kuns
2026-04-25 09:36:01 +02:00
parent 288d2ece8b
commit 16e1ddd129
6 changed files with 283 additions and 4 deletions

View File

@@ -48,6 +48,7 @@ public sealed class WorkerHub : Microsoft.AspNetCore.SignalR.Hub
private readonly IPlanningTerminalLauncher _launcher; private readonly IPlanningTerminalLauncher _launcher;
private readonly PlanningAggregator _planningAggregator; private readonly PlanningAggregator _planningAggregator;
private readonly PlanningMergeOrchestrator _planningMergeOrchestrator; private readonly PlanningMergeOrchestrator _planningMergeOrchestrator;
private readonly PlanningChainCoordinator _planningChain;
public WorkerHub( public WorkerHub(
QueueService queue, QueueService queue,
@@ -61,7 +62,8 @@ public sealed class WorkerHub : Microsoft.AspNetCore.SignalR.Hub
PlanningSessionManager planning, PlanningSessionManager planning,
IPlanningTerminalLauncher launcher, IPlanningTerminalLauncher launcher,
PlanningAggregator planningAggregator, PlanningAggregator planningAggregator,
PlanningMergeOrchestrator planningMergeOrchestrator) PlanningMergeOrchestrator planningMergeOrchestrator,
PlanningChainCoordinator planningChain)
{ {
_queue = queue; _queue = queue;
_agentService = agentService; _agentService = agentService;
@@ -75,6 +77,30 @@ public sealed class WorkerHub : Microsoft.AspNetCore.SignalR.Hub
_launcher = launcher; _launcher = launcher;
_planningAggregator = planningAggregator; _planningAggregator = planningAggregator;
_planningMergeOrchestrator = planningMergeOrchestrator; _planningMergeOrchestrator = planningMergeOrchestrator;
_planningChain = planningChain;
}
public async Task QueuePlanningSubtasksAsync(string parentTaskId)
{
try
{
await _planningChain.QueueSubtasksSequentiallyAsync(parentTaskId, Context.ConnectionAborted);
}
catch (InvalidOperationException ex)
{
throw new HubException(ex.Message);
}
await using var ctx = await _dbFactory.CreateDbContextAsync();
var childIds = await ctx.Tasks
.Where(t => t.ParentTaskId == parentTaskId)
.Select(t => t.Id)
.ToListAsync();
await _broadcaster.TaskUpdated(parentTaskId);
foreach (var id in childIds)
await _broadcaster.TaskUpdated(id);
_queue.WakeQueue();
} }
public string Ping() => $"pong v{Version}"; public string Ping() => $"pong v{Version}";

View File

@@ -0,0 +1,63 @@
using ClaudeDo.Data;
using ClaudeDo.Data.Models;
using Microsoft.EntityFrameworkCore;
using TaskStatus = ClaudeDo.Data.Models.TaskStatus;
namespace ClaudeDo.Worker.Planning;
public sealed class PlanningChainCoordinator
{
private readonly IDbContextFactory<ClaudeDoDbContext> _dbFactory;
public PlanningChainCoordinator(IDbContextFactory<ClaudeDoDbContext> dbFactory)
=> _dbFactory = dbFactory;
public async Task QueueSubtasksSequentiallyAsync(string parentTaskId, CancellationToken ct = default)
{
await using var ctx = await _dbFactory.CreateDbContextAsync(ct);
var parent = await ctx.Tasks.FirstOrDefaultAsync(t => t.Id == parentTaskId, ct)
?? throw new InvalidOperationException($"Task {parentTaskId} not found.");
var children = await ctx.Tasks
.Where(t => t.ParentTaskId == parentTaskId)
.OrderBy(t => t.SortOrder).ThenBy(t => t.CreatedAt)
.ToListAsync(ct);
if (children.Count == 0)
throw new InvalidOperationException("Parent has no subtasks.");
var bad = children.FirstOrDefault(c =>
c.Status != TaskStatus.Manual && c.Status != TaskStatus.Planned);
if (bad is not null)
throw new InvalidOperationException(
$"Child {bad.Id} is in status {bad.Status}; expected Manual or Planned.");
for (int i = 0; i < children.Count; i++)
children[i].Status = i == 0 ? TaskStatus.Queued : TaskStatus.Waiting;
await ctx.SaveChangesAsync(ct);
}
public async Task<string?> OnChildFinishedAsync(
string childTaskId, TaskStatus finalStatus, CancellationToken ct = default)
{
if (finalStatus != TaskStatus.Done) return null;
await using var ctx = await _dbFactory.CreateDbContextAsync(ct);
var child = await ctx.Tasks
.AsNoTracking()
.FirstOrDefaultAsync(t => t.Id == childTaskId, ct);
if (child?.ParentTaskId is null) return null;
var next = await ctx.Tasks
.Where(t => t.ParentTaskId == child.ParentTaskId
&& t.SortOrder > child.SortOrder
&& t.Status == TaskStatus.Waiting)
.OrderBy(t => t.SortOrder)
.FirstOrDefaultAsync(ct);
if (next is null) return null;
next.Status = TaskStatus.Queued;
await ctx.SaveChangesAsync(ct);
return next.Id;
}
}

View File

@@ -291,6 +291,7 @@ public sealed class PlanningSessionManager
""" """
You are a planning assistant for ClaudeDo. You are a planning assistant for ClaudeDo.
Your role is to help break down a task into smaller, actionable subtasks. Your role is to help break down a task into smaller, actionable subtasks.
Your final goal WILL ALWAYS be the creation of Subtasks
ALWAYS invoke the `superpowers:brainstorming` skill via the Skill tool at the ALWAYS invoke the `superpowers:brainstorming` skill via the Skill tool at the
start of every planning session, and follow its process end-to-end. It guides start of every planning session, and follow its process end-to-end. It guides
@@ -298,7 +299,9 @@ public sealed class PlanningSessionManager
BEFORE any subtasks are created. Do not create child tasks until the user has BEFORE any subtasks are created. Do not create child tasks until the user has
approved a design. approved a design.
Use the available MCP tools (mcp__claudedo__*) to create child tasks once the NEVER Change files yourself.
ALWAYS Use the available MCP tools (mcp__claudedo__*) to create child tasks once the
design is approved. When you are done planning, finalize the session. design is approved. When you are done planning, finalize the session.
Be concise and focused. Each subtask should be independently executable. Be concise and focused. Each subtask should be independently executable.

View File

@@ -38,6 +38,7 @@ builder.Services.AddSingleton<TaskResetService>();
builder.Services.AddSingleton<TaskMergeService>(); builder.Services.AddSingleton<TaskMergeService>();
builder.Services.AddSingleton<PlanningAggregator>(); builder.Services.AddSingleton<PlanningAggregator>();
builder.Services.AddSingleton<PlanningMergeOrchestrator>(); builder.Services.AddSingleton<PlanningMergeOrchestrator>();
builder.Services.AddSingleton<PlanningChainCoordinator>();
// Agent file management. // Agent file management.
var agentsDir = Path.Combine(ClaudeDo.Data.Paths.AppDataRoot(), "agents"); var agentsDir = Path.Combine(ClaudeDo.Data.Paths.AppDataRoot(), "agents");

View File

@@ -3,7 +3,9 @@ using ClaudeDo.Data.Models;
using ClaudeDo.Data.Repositories; using ClaudeDo.Data.Repositories;
using ClaudeDo.Worker.Config; using ClaudeDo.Worker.Config;
using ClaudeDo.Worker.Hub; using ClaudeDo.Worker.Hub;
using ClaudeDo.Worker.Planning;
using Microsoft.EntityFrameworkCore; using Microsoft.EntityFrameworkCore;
using TaskStatus = ClaudeDo.Data.Models.TaskStatus;
namespace ClaudeDo.Worker.Runner; namespace ClaudeDo.Worker.Runner;
@@ -16,6 +18,7 @@ public sealed class TaskRunner
private readonly ClaudeArgsBuilder _argsBuilder; private readonly ClaudeArgsBuilder _argsBuilder;
private readonly WorkerConfig _cfg; private readonly WorkerConfig _cfg;
private readonly ILogger<TaskRunner> _logger; private readonly ILogger<TaskRunner> _logger;
private readonly PlanningChainCoordinator _chain;
public TaskRunner( public TaskRunner(
IClaudeProcess claude, IClaudeProcess claude,
@@ -24,7 +27,8 @@ public sealed class TaskRunner
WorktreeManager wtManager, WorktreeManager wtManager,
ClaudeArgsBuilder argsBuilder, ClaudeArgsBuilder argsBuilder,
WorkerConfig cfg, WorkerConfig cfg,
ILogger<TaskRunner> logger) ILogger<TaskRunner> logger,
PlanningChainCoordinator chain)
{ {
_claude = claude; _claude = claude;
_dbFactory = dbFactory; _dbFactory = dbFactory;
@@ -33,6 +37,7 @@ public sealed class TaskRunner
_argsBuilder = argsBuilder; _argsBuilder = argsBuilder;
_cfg = cfg; _cfg = cfg;
_logger = logger; _logger = logger;
_chain = chain;
} }
public async Task RunAsync(TaskEntity task, string slot, CancellationToken ct) public async Task RunAsync(TaskEntity task, string slot, CancellationToken ct)
@@ -338,6 +343,23 @@ public sealed class TaskRunner
await _broadcaster.TaskFinished(slot, task.Id, "done", finishedAt); await _broadcaster.TaskFinished(slot, task.Id, "done", finishedAt);
_logger.LogInformation("Task {TaskId} completed (turns={Turns}, tokens_in={In}, tokens_out={Out})", _logger.LogInformation("Task {TaskId} completed (turns={Turns}, tokens_in={In}, tokens_out={Out})",
task.Id, result.TurnCount, result.TokensIn, result.TokensOut); task.Id, result.TurnCount, result.TokensIn, result.TokensOut);
// Sequential planning chain: if this task has a parent, flip the next
// Waiting sibling to Queued so the queue pickup loop dispatches it next.
if (task.ParentTaskId is not null)
{
try
{
var advanced = await _chain.OnChildFinishedAsync(
task.Id, TaskStatus.Done, CancellationToken.None);
if (advanced is not null)
await _broadcaster.TaskUpdated(advanced);
}
catch (Exception ex)
{
_logger.LogWarning(ex, "PlanningChain advance failed for {TaskId}", task.Id);
}
}
} }
private async Task HandleFailure(string taskId, string taskTitle, string slot, RunResult result) private async Task HandleFailure(string taskId, string taskTitle, string slot, RunResult result)

View File

@@ -0,0 +1,164 @@
using ClaudeDo.Data;
using ClaudeDo.Data.Models;
using ClaudeDo.Worker.Planning;
using ClaudeDo.Worker.Tests.Infrastructure;
using Microsoft.EntityFrameworkCore;
using TaskStatus = ClaudeDo.Data.Models.TaskStatus;
namespace ClaudeDo.Worker.Tests.Planning;
public sealed class PlanningChainCoordinatorTests : IDisposable
{
private readonly DbFixture _db = new();
private readonly TestDbContextFactory _factory;
private readonly PlanningChainCoordinator _sut;
private readonly string _listId;
public PlanningChainCoordinatorTests()
{
_factory = _db.CreateFactory();
_sut = new PlanningChainCoordinator(_factory);
_listId = Guid.NewGuid().ToString();
using var ctx = _factory.CreateDbContext();
ctx.Lists.Add(new ListEntity
{
Id = _listId,
Name = "Test",
CreatedAt = DateTime.UtcNow,
DefaultCommitType = "chore",
});
ctx.SaveChanges();
}
public void Dispose() => _db.Dispose();
private async Task SeedPlanningFamilyAsync(string parentId, int childCount)
{
await using var ctx = _factory.CreateDbContext();
ctx.Tasks.Add(new TaskEntity
{
Id = parentId,
ListId = _listId,
Title = "Parent",
CreatedAt = DateTime.UtcNow,
Status = TaskStatus.Planned,
});
for (int i = 0; i < childCount; i++)
{
ctx.Tasks.Add(new TaskEntity
{
Id = $"{parentId}-c{i}",
ListId = _listId,
Title = $"Child {i}",
CreatedAt = DateTime.UtcNow,
Status = TaskStatus.Manual,
ParentTaskId = parentId,
SortOrder = i,
});
}
await ctx.SaveChangesAsync();
}
private async Task<List<TaskEntity>> GetChildrenAsync(string parentId)
{
await using var ctx = _factory.CreateDbContext();
return await ctx.Tasks
.AsNoTracking()
.Where(t => t.ParentTaskId == parentId)
.OrderBy(t => t.SortOrder)
.ToListAsync();
}
[Fact]
public async Task QueueSubtasksSequentially_SetsFirstQueued_RestWaiting()
{
await SeedPlanningFamilyAsync("P", 3);
await _sut.QueueSubtasksSequentiallyAsync("P", default);
var kids = await GetChildrenAsync("P");
Assert.Equal(TaskStatus.Queued, kids[0].Status);
Assert.Equal(TaskStatus.Waiting, kids[1].Status);
Assert.Equal(TaskStatus.Waiting, kids[2].Status);
}
[Fact]
public async Task OnChildDone_FlipsNextWaitingToQueued()
{
await SeedPlanningFamilyAsync("P", 3);
await _sut.QueueSubtasksSequentiallyAsync("P", default);
// Simulate first child finishing Done.
await using (var ctx = _factory.CreateDbContext())
{
var first = await ctx.Tasks.FirstAsync(t => t.Id == "P-c0");
first.Status = TaskStatus.Done;
await ctx.SaveChangesAsync();
}
var advanced = await _sut.OnChildFinishedAsync("P-c0", TaskStatus.Done, default);
Assert.Equal("P-c1", advanced);
var kids = await GetChildrenAsync("P");
Assert.Equal(TaskStatus.Done, kids[0].Status);
Assert.Equal(TaskStatus.Queued, kids[1].Status);
Assert.Equal(TaskStatus.Waiting, kids[2].Status);
}
[Fact]
public async Task OnChildFailed_DoesNotAdvanceChain()
{
await SeedPlanningFamilyAsync("P", 3);
await _sut.QueueSubtasksSequentiallyAsync("P", default);
await using (var ctx = _factory.CreateDbContext())
{
var first = await ctx.Tasks.FirstAsync(t => t.Id == "P-c0");
first.Status = TaskStatus.Failed;
await ctx.SaveChangesAsync();
}
var advanced = await _sut.OnChildFinishedAsync("P-c0", TaskStatus.Failed, default);
Assert.Null(advanced);
var kids = await GetChildrenAsync("P");
Assert.Equal(TaskStatus.Failed, kids[0].Status);
Assert.Equal(TaskStatus.Waiting, kids[1].Status);
Assert.Equal(TaskStatus.Waiting, kids[2].Status);
}
[Fact]
public async Task OnChildDone_LastChild_ReturnsNull()
{
await SeedPlanningFamilyAsync("P", 2);
await _sut.QueueSubtasksSequentiallyAsync("P", default);
// Mark both done, simulating chain reaching the end.
await using (var ctx = _factory.CreateDbContext())
{
foreach (var t in ctx.Tasks.Where(t => t.ParentTaskId == "P"))
t.Status = TaskStatus.Done;
await ctx.SaveChangesAsync();
}
var advanced = await _sut.OnChildFinishedAsync("P-c1", TaskStatus.Done, default);
Assert.Null(advanced);
}
[Fact]
public async Task QueueSubtasksSequentially_RejectsNonManualChildren()
{
await SeedPlanningFamilyAsync("P", 2);
// Corrupt one child to be already Queued.
await using (var ctx = _factory.CreateDbContext())
{
var first = await ctx.Tasks.FirstAsync(t => t.Id == "P-c0");
first.Status = TaskStatus.Queued;
await ctx.SaveChangesAsync();
}
await Assert.ThrowsAsync<InvalidOperationException>(
() => _sut.QueueSubtasksSequentiallyAsync("P", default));
}
}