feat(worker): refine planning chain re-shape on re-run

SetupChainAsync now sequences only non-terminal children (Idle/Queued).
Done/Failed/Cancelled rows are left in place so a re-run on a partially
executed chain keeps history intact and only reshapes the tail. Running
children abort the op since the chain cannot be reshaped mid-flight.
First non-terminal child is explicitly unblocked.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
mika kuns
2026-04-30 14:17:29 +02:00
parent 9ba238f4ad
commit d4d5a4b8e7
2 changed files with 83 additions and 10 deletions

View File

@@ -20,9 +20,14 @@ public sealed class PlanningChainCoordinator
}
// Sets up a sequential queue chain over a planning parent's children.
// - First child gets Status=Queued (auto-wakes the queue picker).
// - Each subsequent child gets Status=Queued + BlockedByTaskId=<predecessor>,
// - First non-terminal child gets Status=Queued, BlockedByTaskId=null.
// - Each subsequent non-terminal child gets Status=Queued + BlockedByTaskId=<predecessor>,
// so the picker skips them until the predecessor finishes.
// - Terminal children (Done/Failed/Cancelled) are left untouched; they are
// skipped when computing predecessors so a re-run on a partially executed
// chain leaves history alone but still reshapes the tail.
// - Running children abort the operation — the chain cannot be reshaped while
// one of its members is mid-flight.
// The "agent" tag is auto-attached to every child so the picker can claim them.
// Returns the number of children placed in the chain.
internal async Task<int> SetupChainAsync(string parentTaskId, CancellationToken ct = default)
@@ -39,10 +44,10 @@ public sealed class PlanningChainCoordinator
if (children.Count == 0)
throw new InvalidOperationException("Parent has no subtasks.");
var bad = children.FirstOrDefault(c => c.Status != TaskStatus.Idle);
if (bad is not null)
var running = children.FirstOrDefault(c => c.Status == TaskStatus.Running);
if (running is not null)
throw new InvalidOperationException(
$"Child {bad.Id} is in status {bad.Status}; expected Idle.");
$"Child {running.Id} is running; cannot reshape chain.");
// Worker queue picker requires the "agent" tag — attach it so children are pickable.
var agentTag = await ctx.Tags.FirstOrDefaultAsync(t => t.Name == "agent", ct);
@@ -56,15 +61,23 @@ public sealed class PlanningChainCoordinator
await ctx.SaveChangesAsync(ct);
}
// Re-shape over Idle and Queued children only; leave Done/Failed/Cancelled
// (terminal) results in place.
var sequenceable = children
.Where(c => c.Status == TaskStatus.Idle || c.Status == TaskStatus.Queued)
.ToList();
var state = _state();
for (int i = 0; i < children.Count; i++)
for (int i = 0; i < sequenceable.Count; i++)
{
await state.EnqueueAsync(children[i].Id, ct);
if (i > 0)
await state.BlockOnAsync(children[i].Id, children[i - 1].Id, ct);
await state.EnqueueAsync(sequenceable[i].Id, ct);
if (i == 0)
await state.UnblockAsync(sequenceable[i].Id, ct);
else
await state.BlockOnAsync(sequenceable[i].Id, sequenceable[i - 1].Id, ct);
}
return children.Count;
return sequenceable.Count;
}
public async Task<string?> OnChildFinishedAsync(