feat(worker): route standalone success to review and resume on re-queue
Standalone tasks now enter WaitingForReview on success; re-queued tasks carrying reviewer feedback resume the prior Claude session with that feedback as the next turn. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
@@ -3,6 +3,7 @@ using ClaudeDo.Data.Models;
|
|||||||
using ClaudeDo.Data.Repositories;
|
using ClaudeDo.Data.Repositories;
|
||||||
using ClaudeDo.Worker.Config;
|
using ClaudeDo.Worker.Config;
|
||||||
using ClaudeDo.Worker.Runner;
|
using ClaudeDo.Worker.Runner;
|
||||||
|
using ClaudeDo.Worker.State;
|
||||||
using Microsoft.EntityFrameworkCore;
|
using Microsoft.EntityFrameworkCore;
|
||||||
|
|
||||||
namespace ClaudeDo.Worker.Queue;
|
namespace ClaudeDo.Worker.Queue;
|
||||||
@@ -16,6 +17,7 @@ public sealed class QueueService : BackgroundService
|
|||||||
private readonly QueueWaker _waker;
|
private readonly QueueWaker _waker;
|
||||||
private readonly IQueuePicker _picker;
|
private readonly IQueuePicker _picker;
|
||||||
private readonly OverrideSlotService _override;
|
private readonly OverrideSlotService _override;
|
||||||
|
private readonly ITaskStateService _state;
|
||||||
|
|
||||||
private readonly object _lock = new();
|
private readonly object _lock = new();
|
||||||
private readonly Dictionary<string, QueueSlotState> _queueSlots = new();
|
private readonly Dictionary<string, QueueSlotState> _queueSlots = new();
|
||||||
@@ -27,7 +29,8 @@ public sealed class QueueService : BackgroundService
|
|||||||
ILogger<QueueService> logger,
|
ILogger<QueueService> logger,
|
||||||
QueueWaker waker,
|
QueueWaker waker,
|
||||||
IQueuePicker picker,
|
IQueuePicker picker,
|
||||||
OverrideSlotService overrideSlot)
|
OverrideSlotService overrideSlot,
|
||||||
|
ITaskStateService state)
|
||||||
{
|
{
|
||||||
_dbFactory = dbFactory;
|
_dbFactory = dbFactory;
|
||||||
_runner = runner;
|
_runner = runner;
|
||||||
@@ -36,6 +39,7 @@ public sealed class QueueService : BackgroundService
|
|||||||
_waker = waker;
|
_waker = waker;
|
||||||
_picker = picker;
|
_picker = picker;
|
||||||
_override = overrideSlot;
|
_override = overrideSlot;
|
||||||
|
_state = state;
|
||||||
}
|
}
|
||||||
|
|
||||||
public IReadOnlyList<(string slot, string taskId, DateTime startedAt)> GetActive()
|
public IReadOnlyList<(string slot, string taskId, DateTime startedAt)> GetActive()
|
||||||
@@ -174,6 +178,29 @@ public sealed class QueueService : BackgroundService
|
|||||||
?? throw new KeyNotFoundException($"Task '{taskId}' not found.");
|
?? throw new KeyNotFoundException($"Task '{taskId}' not found.");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// A task re-queued from review carries reviewer feedback. Resume the prior
|
||||||
|
// Claude session with that feedback as the next turn when a session exists;
|
||||||
|
// otherwise fall back to a fresh run with the feedback folded into the prompt.
|
||||||
|
if (!string.IsNullOrWhiteSpace(task.ReviewFeedback))
|
||||||
|
{
|
||||||
|
var feedback = task.ReviewFeedback!;
|
||||||
|
string? sessionId;
|
||||||
|
using (var context = _dbFactory.CreateDbContext())
|
||||||
|
sessionId = (await new TaskRunRepository(context).GetLatestByTaskIdAsync(taskId, ct))?.SessionId;
|
||||||
|
|
||||||
|
await _state.ClearReviewFeedbackAsync(taskId, ct);
|
||||||
|
|
||||||
|
if (sessionId is not null)
|
||||||
|
{
|
||||||
|
await _runner.ContinueAsync(taskId, feedback, "queue", ct);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
task.Description = string.IsNullOrWhiteSpace(task.Description)
|
||||||
|
? $"Reviewer feedback: {feedback}"
|
||||||
|
: $"{task.Description}\n\nReviewer feedback: {feedback}";
|
||||||
|
}
|
||||||
|
|
||||||
await _runner.RunAsync(task, "queue", ct);
|
await _runner.RunAsync(task, "queue", ct);
|
||||||
}
|
}
|
||||||
catch (Exception ex)
|
catch (Exception ex)
|
||||||
|
|||||||
@@ -322,10 +322,21 @@ public sealed class TaskRunner
|
|||||||
// Terminal DB write uses CancellationToken.None so the task status
|
// Terminal DB write uses CancellationToken.None so the task status
|
||||||
// is never left as 'running' because of a cancel that arrived
|
// is never left as 'running' because of a cancel that arrived
|
||||||
// after the Claude run already succeeded.
|
// after the Claude run already succeeded.
|
||||||
|
// Standalone tasks gate on review; planning children go straight to Done
|
||||||
|
// so the sequential chain (which advances on terminal states) is unaffected.
|
||||||
var finishedAt = DateTime.UtcNow;
|
var finishedAt = DateTime.UtcNow;
|
||||||
|
if (task.ParentTaskId is null)
|
||||||
|
{
|
||||||
|
await _state.SubmitForReviewAsync(task.Id, finishedAt, result.ResultMarkdown, CancellationToken.None);
|
||||||
|
await _broadcaster.WorkerLog($"Finished \"{task.Title}\" (waiting for review)", WorkerLogLevel.Success, DateTime.UtcNow);
|
||||||
|
await _broadcaster.TaskFinished(slot, task.Id, "waiting_for_review", finishedAt);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
await _state.CompleteAsync(task.Id, finishedAt, result.ResultMarkdown, CancellationToken.None);
|
await _state.CompleteAsync(task.Id, finishedAt, result.ResultMarkdown, CancellationToken.None);
|
||||||
await _broadcaster.WorkerLog($"Finished \"{task.Title}\" (done)", WorkerLogLevel.Success, DateTime.UtcNow);
|
await _broadcaster.WorkerLog($"Finished \"{task.Title}\" (done)", WorkerLogLevel.Success, DateTime.UtcNow);
|
||||||
await _broadcaster.TaskFinished(slot, task.Id, "done", finishedAt);
|
await _broadcaster.TaskFinished(slot, task.Id, "done", finishedAt);
|
||||||
|
}
|
||||||
_logger.LogInformation("Task {TaskId} completed (turns={Turns}, tokens_in={In}, tokens_out={Out})",
|
_logger.LogInformation("Task {TaskId} completed (turns={Turns}, tokens_in={In}, tokens_out={Out})",
|
||||||
task.Id, result.TurnCount, result.TokensIn, result.TokensOut);
|
task.Id, result.TurnCount, result.TokensIn, result.TokensOut);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -117,12 +117,13 @@ public sealed class ExternalMcpServiceTests : IDisposable
|
|||||||
var dbFactory = _db.CreateFactory();
|
var dbFactory = _db.CreateFactory();
|
||||||
var wtManager = new WorktreeManager(new GitService(), dbFactory, cfg, NullLogger<WorktreeManager>.Instance);
|
var wtManager = new WorktreeManager(new GitService(), dbFactory, cfg, NullLogger<WorktreeManager>.Instance);
|
||||||
var argsBuilder = new ClaudeArgsBuilder();
|
var argsBuilder = new ClaudeArgsBuilder();
|
||||||
|
var state = TaskStateServiceBuilder.Build(dbFactory).State;
|
||||||
var runner = new TaskRunner(fake, dbFactory, broadcaster, wtManager, argsBuilder, cfg,
|
var runner = new TaskRunner(fake, dbFactory, broadcaster, wtManager, argsBuilder, cfg,
|
||||||
NullLogger<TaskRunner>.Instance, TaskStateServiceBuilder.Build(dbFactory).State);
|
NullLogger<TaskRunner>.Instance, state);
|
||||||
var waker = new ClaudeDo.Worker.Queue.QueueWaker();
|
var waker = new ClaudeDo.Worker.Queue.QueueWaker();
|
||||||
var picker = new ClaudeDo.Worker.Queue.QueuePicker(dbFactory);
|
var picker = new ClaudeDo.Worker.Queue.QueuePicker(dbFactory);
|
||||||
var overrideSlot = new OverrideSlotService(dbFactory, runner, NullLogger<OverrideSlotService>.Instance);
|
var overrideSlot = new OverrideSlotService(dbFactory, runner, NullLogger<OverrideSlotService>.Instance);
|
||||||
return new QueueService(dbFactory, runner, cfg, NullLogger<QueueService>.Instance, waker, picker, overrideSlot);
|
return new QueueService(dbFactory, runner, cfg, NullLogger<QueueService>.Instance, waker, picker, overrideSlot, state);
|
||||||
}
|
}
|
||||||
|
|
||||||
[Fact]
|
[Fact]
|
||||||
|
|||||||
@@ -53,12 +53,13 @@ public sealed class QueueServiceSlotGuardTests : IDisposable
|
|||||||
var dbFactory = _db.CreateFactory();
|
var dbFactory = _db.CreateFactory();
|
||||||
var wtManager = new WorktreeManager(new ClaudeDo.Data.Git.GitService(), dbFactory, _cfg, NullLogger<WorktreeManager>.Instance);
|
var wtManager = new WorktreeManager(new ClaudeDo.Data.Git.GitService(), dbFactory, _cfg, NullLogger<WorktreeManager>.Instance);
|
||||||
var argsBuilder = new ClaudeArgsBuilder();
|
var argsBuilder = new ClaudeArgsBuilder();
|
||||||
|
var state = TaskStateServiceBuilder.Build(dbFactory).State;
|
||||||
var runner = new TaskRunner(fake, dbFactory, broadcaster, wtManager, argsBuilder, _cfg,
|
var runner = new TaskRunner(fake, dbFactory, broadcaster, wtManager, argsBuilder, _cfg,
|
||||||
NullLogger<TaskRunner>.Instance, TaskStateServiceBuilder.Build(dbFactory).State);
|
NullLogger<TaskRunner>.Instance, state);
|
||||||
_waker = new QueueWaker();
|
_waker = new QueueWaker();
|
||||||
var picker = new QueuePicker(dbFactory);
|
var picker = new QueuePicker(dbFactory);
|
||||||
var overrideSlot = new OverrideSlotService(dbFactory, runner, NullLogger<OverrideSlotService>.Instance);
|
var overrideSlot = new OverrideSlotService(dbFactory, runner, NullLogger<OverrideSlotService>.Instance);
|
||||||
var service = new QueueService(dbFactory, runner, _cfg, NullLogger<QueueService>.Instance, _waker, picker, overrideSlot);
|
var service = new QueueService(dbFactory, runner, _cfg, NullLogger<QueueService>.Instance, _waker, picker, overrideSlot, state);
|
||||||
return (service, fake);
|
return (service, fake);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -54,12 +54,13 @@ public sealed class QueueServiceTests : IDisposable
|
|||||||
var dbFactory = _db.CreateFactory();
|
var dbFactory = _db.CreateFactory();
|
||||||
var wtManager = new WorktreeManager(new GitService(), dbFactory, _cfg, NullLogger<WorktreeManager>.Instance);
|
var wtManager = new WorktreeManager(new GitService(), dbFactory, _cfg, NullLogger<WorktreeManager>.Instance);
|
||||||
var argsBuilder = new ClaudeArgsBuilder();
|
var argsBuilder = new ClaudeArgsBuilder();
|
||||||
|
var state = TaskStateServiceBuilder.Build(dbFactory).State;
|
||||||
var runner = new TaskRunner(fake, dbFactory, broadcaster, wtManager, argsBuilder, _cfg,
|
var runner = new TaskRunner(fake, dbFactory, broadcaster, wtManager, argsBuilder, _cfg,
|
||||||
NullLogger<TaskRunner>.Instance, TaskStateServiceBuilder.Build(dbFactory).State);
|
NullLogger<TaskRunner>.Instance, state);
|
||||||
_waker = new QueueWaker();
|
_waker = new QueueWaker();
|
||||||
var picker = new QueuePicker(dbFactory);
|
var picker = new QueuePicker(dbFactory);
|
||||||
var overrideSlot = new OverrideSlotService(dbFactory, runner, NullLogger<OverrideSlotService>.Instance);
|
var overrideSlot = new OverrideSlotService(dbFactory, runner, NullLogger<OverrideSlotService>.Instance);
|
||||||
var service = new QueueService(dbFactory, runner, _cfg, NullLogger<QueueService>.Instance, _waker, picker, overrideSlot);
|
var service = new QueueService(dbFactory, runner, _cfg, NullLogger<QueueService>.Instance, _waker, picker, overrideSlot, state);
|
||||||
return (service, fake);
|
return (service, fake);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -112,6 +113,58 @@ public sealed class QueueServiceTests : IDisposable
|
|||||||
await Assert.ThrowsAsync<KeyNotFoundException>(() => service.RunNow("nonexistent"));
|
await Assert.ThrowsAsync<KeyNotFoundException>(() => service.RunNow("nonexistent"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task ReQueuedReviewTask_ResumesSession_WithFeedbackPrompt_AndClearsFeedback()
|
||||||
|
{
|
||||||
|
var (listId, _) = await SeedListWithAgentTag();
|
||||||
|
|
||||||
|
string? capturedArgs = null;
|
||||||
|
string? capturedPrompt = null;
|
||||||
|
var done = new TaskCompletionSource();
|
||||||
|
|
||||||
|
var (service, _) = CreateService((prompt, _, args, _, _) =>
|
||||||
|
{
|
||||||
|
capturedPrompt = prompt;
|
||||||
|
capturedArgs = args;
|
||||||
|
done.TrySetResult();
|
||||||
|
return Task.FromResult(new RunResult { ExitCode = 0, SessionId = "sess-2", ResultMarkdown = "ok" });
|
||||||
|
});
|
||||||
|
|
||||||
|
// A task that was reviewed and rejected: Queued + ReviewFeedback, with a prior run carrying a session id.
|
||||||
|
var task = new TaskEntity
|
||||||
|
{
|
||||||
|
Id = Guid.NewGuid().ToString(),
|
||||||
|
ListId = listId,
|
||||||
|
Title = "Reviewed task",
|
||||||
|
Status = TaskStatus.Queued,
|
||||||
|
ReviewFeedback = "fix the bug",
|
||||||
|
CreatedAt = DateTime.UtcNow,
|
||||||
|
};
|
||||||
|
await _taskRepo.AddAsync(task);
|
||||||
|
await new TaskRunRepository(_ctx).AddAsync(new TaskRunEntity
|
||||||
|
{
|
||||||
|
Id = Guid.NewGuid().ToString(),
|
||||||
|
TaskId = task.Id,
|
||||||
|
RunNumber = 1,
|
||||||
|
IsRetry = false,
|
||||||
|
Prompt = "original",
|
||||||
|
SessionId = "sess-1",
|
||||||
|
StartedAt = DateTime.UtcNow.AddMinutes(-1),
|
||||||
|
});
|
||||||
|
|
||||||
|
using var cts = new CancellationTokenSource();
|
||||||
|
await service.StartAsync(cts.Token);
|
||||||
|
_waker.Wake();
|
||||||
|
await done.Task.WaitAsync(TimeSpan.FromSeconds(5));
|
||||||
|
cts.Cancel();
|
||||||
|
|
||||||
|
Assert.Contains("--resume sess-1", capturedArgs);
|
||||||
|
Assert.Equal("fix the bug", capturedPrompt);
|
||||||
|
|
||||||
|
var reloaded = await _taskRepo.GetByIdAsync(task.Id);
|
||||||
|
Assert.Null(reloaded!.ReviewFeedback);
|
||||||
|
}
|
||||||
|
|
||||||
[Fact]
|
[Fact]
|
||||||
public async Task Schedule_Filter_Skips_Future_Tasks()
|
public async Task Schedule_Filter_Skips_Future_Tasks()
|
||||||
{
|
{
|
||||||
@@ -254,7 +307,8 @@ public sealed class QueueServiceTests : IDisposable
|
|||||||
|
|
||||||
var finalTask = await _taskRepo.GetByIdAsync(task.Id);
|
var finalTask = await _taskRepo.GetByIdAsync(task.Id);
|
||||||
Assert.NotNull(finalTask);
|
Assert.NotNull(finalTask);
|
||||||
Assert.Equal(TaskStatus.Done, finalTask.Status);
|
// A standalone task that completes successfully now gates on review.
|
||||||
|
Assert.Equal(TaskStatus.WaitingForReview, finalTask.Status);
|
||||||
}
|
}
|
||||||
|
|
||||||
[Fact]
|
[Fact]
|
||||||
|
|||||||
Reference in New Issue
Block a user