feat(worker): refine planning chain re-shape on re-run
SetupChainAsync now sequences only non-terminal children (Idle/Queued). Done/Failed/Cancelled rows are left in place so a re-run on a partially executed chain keeps history intact and only reshapes the tail. Running children abort the op since the chain cannot be reshaped mid-flight. First non-terminal child is explicitly unblocked. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -20,9 +20,14 @@ public sealed class PlanningChainCoordinator
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Sets up a sequential queue chain over a planning parent's children.
|
// Sets up a sequential queue chain over a planning parent's children.
|
||||||
// - First child gets Status=Queued (auto-wakes the queue picker).
|
// - First non-terminal child gets Status=Queued, BlockedByTaskId=null.
|
||||||
// - Each subsequent child gets Status=Queued + BlockedByTaskId=<predecessor>,
|
// - Each subsequent non-terminal child gets Status=Queued + BlockedByTaskId=<predecessor>,
|
||||||
// so the picker skips them until the predecessor finishes.
|
// so the picker skips them until the predecessor finishes.
|
||||||
|
// - Terminal children (Done/Failed/Cancelled) are left untouched; they are
|
||||||
|
// skipped when computing predecessors so a re-run on a partially executed
|
||||||
|
// chain leaves history alone but still reshapes the tail.
|
||||||
|
// - Running children abort the operation — the chain cannot be reshaped while
|
||||||
|
// one of its members is mid-flight.
|
||||||
// The "agent" tag is auto-attached to every child so the picker can claim them.
|
// The "agent" tag is auto-attached to every child so the picker can claim them.
|
||||||
// Returns the number of children placed in the chain.
|
// Returns the number of children placed in the chain.
|
||||||
internal async Task<int> SetupChainAsync(string parentTaskId, CancellationToken ct = default)
|
internal async Task<int> SetupChainAsync(string parentTaskId, CancellationToken ct = default)
|
||||||
@@ -39,10 +44,10 @@ public sealed class PlanningChainCoordinator
|
|||||||
if (children.Count == 0)
|
if (children.Count == 0)
|
||||||
throw new InvalidOperationException("Parent has no subtasks.");
|
throw new InvalidOperationException("Parent has no subtasks.");
|
||||||
|
|
||||||
var bad = children.FirstOrDefault(c => c.Status != TaskStatus.Idle);
|
var running = children.FirstOrDefault(c => c.Status == TaskStatus.Running);
|
||||||
if (bad is not null)
|
if (running is not null)
|
||||||
throw new InvalidOperationException(
|
throw new InvalidOperationException(
|
||||||
$"Child {bad.Id} is in status {bad.Status}; expected Idle.");
|
$"Child {running.Id} is running; cannot reshape chain.");
|
||||||
|
|
||||||
// Worker queue picker requires the "agent" tag — attach it so children are pickable.
|
// Worker queue picker requires the "agent" tag — attach it so children are pickable.
|
||||||
var agentTag = await ctx.Tags.FirstOrDefaultAsync(t => t.Name == "agent", ct);
|
var agentTag = await ctx.Tags.FirstOrDefaultAsync(t => t.Name == "agent", ct);
|
||||||
@@ -56,15 +61,23 @@ public sealed class PlanningChainCoordinator
|
|||||||
await ctx.SaveChangesAsync(ct);
|
await ctx.SaveChangesAsync(ct);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Re-shape over Idle and Queued children only; leave Done/Failed/Cancelled
|
||||||
|
// (terminal) results in place.
|
||||||
|
var sequenceable = children
|
||||||
|
.Where(c => c.Status == TaskStatus.Idle || c.Status == TaskStatus.Queued)
|
||||||
|
.ToList();
|
||||||
|
|
||||||
var state = _state();
|
var state = _state();
|
||||||
for (int i = 0; i < children.Count; i++)
|
for (int i = 0; i < sequenceable.Count; i++)
|
||||||
{
|
{
|
||||||
await state.EnqueueAsync(children[i].Id, ct);
|
await state.EnqueueAsync(sequenceable[i].Id, ct);
|
||||||
if (i > 0)
|
if (i == 0)
|
||||||
await state.BlockOnAsync(children[i].Id, children[i - 1].Id, ct);
|
await state.UnblockAsync(sequenceable[i].Id, ct);
|
||||||
|
else
|
||||||
|
await state.BlockOnAsync(sequenceable[i].Id, sequenceable[i - 1].Id, ct);
|
||||||
}
|
}
|
||||||
|
|
||||||
return children.Count;
|
return sequenceable.Count;
|
||||||
}
|
}
|
||||||
|
|
||||||
public async Task<string?> OnChildFinishedAsync(
|
public async Task<string?> OnChildFinishedAsync(
|
||||||
|
|||||||
@@ -191,4 +191,64 @@ public sealed class PlanningChainCoordinatorTests : IDisposable
|
|||||||
await Assert.ThrowsAsync<InvalidOperationException>(
|
await Assert.ThrowsAsync<InvalidOperationException>(
|
||||||
() => _sut.SetupChainAsync("P", default));
|
() => _sut.SetupChainAsync("P", default));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task SetupChain_AcceptsPartiallyQueuedChildren_IsIdempotent()
|
||||||
|
{
|
||||||
|
// Mirrors the BoxDataReader scenario: chain was partially set up earlier,
|
||||||
|
// user re-runs "Queue subtasks sequentially" — should re-establish the chain
|
||||||
|
// without throwing.
|
||||||
|
await SeedPlanningFamilyAsync("P", 4);
|
||||||
|
await using (var ctx = _factory.CreateDbContext())
|
||||||
|
{
|
||||||
|
// Pre-state: c0,c1 Idle; c2,c3 already Queued+blocked.
|
||||||
|
var c2 = await ctx.Tasks.FirstAsync(t => t.Id == "P-c2");
|
||||||
|
c2.Status = TaskStatus.Queued;
|
||||||
|
c2.BlockedByTaskId = "P-c1";
|
||||||
|
var c3 = await ctx.Tasks.FirstAsync(t => t.Id == "P-c3");
|
||||||
|
c3.Status = TaskStatus.Queued;
|
||||||
|
c3.BlockedByTaskId = "P-c2";
|
||||||
|
await ctx.SaveChangesAsync();
|
||||||
|
}
|
||||||
|
|
||||||
|
var count = await _sut.SetupChainAsync("P", default);
|
||||||
|
|
||||||
|
Assert.Equal(4, count);
|
||||||
|
var kids = await GetChildrenAsync("P");
|
||||||
|
Assert.Equal(TaskStatus.Queued, kids[0].Status);
|
||||||
|
Assert.Null(kids[0].BlockedByTaskId);
|
||||||
|
Assert.Equal(TaskStatus.Queued, kids[1].Status);
|
||||||
|
Assert.Equal("P-c0", kids[1].BlockedByTaskId);
|
||||||
|
Assert.Equal(TaskStatus.Queued, kids[2].Status);
|
||||||
|
Assert.Equal("P-c1", kids[2].BlockedByTaskId);
|
||||||
|
Assert.Equal(TaskStatus.Queued, kids[3].Status);
|
||||||
|
Assert.Equal("P-c2", kids[3].BlockedByTaskId);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task SetupChain_SkipsTerminalChildren_DoesNotResurrectThem()
|
||||||
|
{
|
||||||
|
await SeedPlanningFamilyAsync("P", 4);
|
||||||
|
await using (var ctx = _factory.CreateDbContext())
|
||||||
|
{
|
||||||
|
var c0 = await ctx.Tasks.FirstAsync(t => t.Id == "P-c0");
|
||||||
|
c0.Status = TaskStatus.Done;
|
||||||
|
var c1 = await ctx.Tasks.FirstAsync(t => t.Id == "P-c1");
|
||||||
|
c1.Status = TaskStatus.Failed;
|
||||||
|
await ctx.SaveChangesAsync();
|
||||||
|
}
|
||||||
|
|
||||||
|
var count = await _sut.SetupChainAsync("P", default);
|
||||||
|
|
||||||
|
// Only the two non-terminal tail children get chained.
|
||||||
|
Assert.Equal(2, count);
|
||||||
|
var kids = await GetChildrenAsync("P");
|
||||||
|
Assert.Equal(TaskStatus.Done, kids[0].Status);
|
||||||
|
Assert.Equal(TaskStatus.Failed, kids[1].Status);
|
||||||
|
// First non-terminal becomes the chain head (unblocked).
|
||||||
|
Assert.Equal(TaskStatus.Queued, kids[2].Status);
|
||||||
|
Assert.Null(kids[2].BlockedByTaskId);
|
||||||
|
Assert.Equal(TaskStatus.Queued, kids[3].Status);
|
||||||
|
Assert.Equal("P-c2", kids[3].BlockedByTaskId);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user