// Files: ClaudeDo/src/ClaudeDo.Worker/Hub/WorkerHub.cs
// 2026-05-19 11:08:52 +02:00
//
// 529 lines
// 20 KiB
// C#

using System.Reflection;
using ClaudeDo.Data;
using ClaudeDo.Data.Models;
using ClaudeDo.Data.Repositories;
using ClaudeDo.Worker.Agents;
using ClaudeDo.Worker.Lifecycle;
using ClaudeDo.Worker.Planning;
using ClaudeDo.Worker.Prime;
using ClaudeDo.Worker.Queue;
using ClaudeDo.Worker.State;
using ClaudeDo.Worker.Worktrees;
using TaskStatus = ClaudeDo.Data.Models.TaskStatus;
using Microsoft.AspNetCore.SignalR;
using Microsoft.EntityFrameworkCore;
namespace ClaudeDo.Worker.Hub;
/// <summary>
/// One entry of the list returned by <see cref="WorkerHub.GetActive"/>: the slot,
/// task id and start time of an entry reported by the queue service.
/// </summary>
public record ActiveTaskDto(string Slot, string TaskId, DateTime StartedAt);
/// <summary>
/// Application settings as exchanged by <see cref="WorkerHub.GetAppSettings"/> and
/// <see cref="WorkerHub.UpdateAppSettings"/>.
/// </summary>
/// <remarks>
/// When used as input to <c>UpdateAppSettings</c>, null string values are replaced by
/// defaults before being stored; <c>CentralWorktreeRoot</c> is stored as given.
/// </remarks>
public record AppSettingsDto(
string DefaultClaudeInstructions,
string DefaultModel,
int DefaultMaxTurns,
string DefaultPermissionMode,
string WorktreeStrategy,
string? CentralWorktreeRoot,
bool WorktreeAutoCleanupEnabled,
int WorktreeAutoCleanupDays);
/// <summary>
/// Result of <see cref="WorkerHub.CleanupFinishedWorktrees"/>; carries the <c>Removed</c>
/// count reported by the worktree maintenance service.
/// </summary>
public record WorktreeCleanupDto(int Removed);
/// <summary>
/// Result of <see cref="WorkerHub.ResetAllWorktrees"/>; mirrors the four values
/// (<c>Removed</c>, <c>TasksAffected</c>, <c>Blocked</c>, <c>RunningTasks</c>) reported
/// by the worktree maintenance service.
/// </summary>
public record WorktreeResetDto(int Removed, int TasksAffected, bool Blocked, int RunningTasks);
/// <summary>
/// One row of the overview returned by <see cref="WorkerHub.GetWorktreesOverview"/>.
/// Each value is copied one-to-one from the corresponding row produced by the
/// worktree maintenance service.
/// </summary>
public record WorktreeOverviewDto(
string TaskId,
string TaskTitle,
ClaudeDo.Data.Models.TaskStatus TaskStatus,
string ListId,
string ListName,
string Path,
string BranchName,
string BaseCommit,
WorktreeState State,
string? DiffStat,
DateTime CreatedAt,
bool PathExistsOnDisk);
/// <summary>
/// Result of <see cref="WorkerHub.ForceRemoveWorktree"/>: whether the worktree was
/// removed, plus the optional reason reported by the maintenance service.
/// </summary>
public record ForceRemoveResultDto(bool Removed, string? Reason);
/// <summary>
/// Result of <see cref="WorkerHub.MergeTask"/>: the status, conflict files and optional
/// error message reported by the merge service.
/// </summary>
public record MergeResultDto(string Status, IReadOnlyList<string> ConflictFiles, string? ErrorMessage);
/// <summary>
/// Result of <see cref="WorkerHub.GetMergeTargets"/>: the default branch and the local
/// branches reported by the merge service.
/// </summary>
public record MergeTargetsDto(string DefaultBranch, IReadOnlyList<string> LocalBranches);
/// <summary>
/// Input of <see cref="WorkerHub.UpdateList"/>. A blank <c>WorkingDir</c> is stored as
/// null and a blank <c>DefaultCommitType</c> is replaced by
/// <c>CommitTypeRegistry.DefaultType</c>.
/// </summary>
public record UpdateListDto(string Id, string Name, string? WorkingDir, string DefaultCommitType);
/// <summary>
/// Input of <see cref="WorkerHub.UpdateListConfig"/>. Blank values are treated as null;
/// when all three values are null the list's config is deleted instead of saved.
/// </summary>
public record UpdateListConfigDto(string ListId, string? Model, string? SystemPrompt, string? AgentPath);
/// <summary>
/// Input of <see cref="WorkerHub.UpdateTaskAgentSettings"/>. Blank values are passed
/// to the task repository as null.
/// </summary>
public record UpdateTaskAgentSettingsDto(string TaskId, string? Model, string? SystemPrompt, string? AgentPath);
/// <summary>Result of <see cref="WorkerHub.GetListConfig"/> when the list has a stored config.</summary>
public record ListConfigDto(string? Model, string? SystemPrompt, string? AgentPath);
/// <summary>
/// Result of <see cref="WorkerHub.RestoreDefaultAgents"/>: the <c>Copied</c> and
/// <c>Skipped</c> counts reported by the default-agent seeder.
/// </summary>
public record SeedResultDto(int Copied, int Skipped);
public sealed class WorkerHub : Microsoft.AspNetCore.SignalR.Hub
{
/// <summary>
/// Three-component version of the executing assembly, used by <see cref="Ping"/>;
/// "0.0.0" when the assembly reports no version.
/// </summary>
private static readonly string Version =
    Assembly.GetExecutingAssembly().GetName().Version is { } assemblyVersion
        ? assemblyVersion.ToString(3)
        : "0.0.0";

// Queue, task state and reset/merge services.
private readonly QueueService _queue;
private readonly IQueueWaker _waker;
private readonly ITaskStateService _state;
private readonly TaskResetService _resetService;
private readonly TaskMergeService _mergeService;

// Agent files.
private readonly AgentFileService _agentService;
private readonly DefaultAgentSeeder _seeder;

// Persistence and client notifications.
private readonly IDbContextFactory<ClaudeDoDbContext> _dbFactory;
private readonly HubBroadcaster _broadcaster;

// Worktree maintenance.
private readonly WorktreeMaintenanceService _wtMaintenance;

// Planning sessions.
private readonly PlanningSessionManager _planning;
private readonly IPlanningTerminalLauncher _launcher;
private readonly PlanningAggregator _planningAggregator;
private readonly PlanningMergeOrchestrator _planningMergeOrchestrator;
private readonly PlanningChainCoordinator _planningChain;

// Prime schedules.
private readonly IPrimeScheduleSignal _primeSignal;

/// <summary>
/// Creates the hub and stores every injected collaborator in its field.
/// No other work is done here.
/// </summary>
public WorkerHub(
    QueueService queue,
    IQueueWaker waker,
    AgentFileService agentService,
    DefaultAgentSeeder seeder,
    HubBroadcaster broadcaster,
    IDbContextFactory<ClaudeDoDbContext> dbFactory,
    WorktreeMaintenanceService wtMaintenance,
    TaskResetService resetService,
    TaskMergeService mergeService,
    PlanningSessionManager planning,
    IPlanningTerminalLauncher launcher,
    PlanningAggregator planningAggregator,
    PlanningMergeOrchestrator planningMergeOrchestrator,
    PlanningChainCoordinator planningChain,
    IPrimeScheduleSignal primeSignal,
    ITaskStateService state)
{
    (_queue, _waker, _state) = (queue, waker, state);
    (_resetService, _mergeService) = (resetService, mergeService);
    (_agentService, _seeder) = (agentService, seeder);
    (_dbFactory, _broadcaster) = (dbFactory, broadcaster);
    _wtMaintenance = wtMaintenance;
    (_planning, _launcher) = (planning, launcher);
    (_planningAggregator, _planningMergeOrchestrator, _planningChain) =
        (planningAggregator, planningMergeOrchestrator, planningChain);
    _primeSignal = primeSignal;
}
/// <summary>
/// Sets up the planning chain for <paramref name="parentTaskId"/>, then broadcasts a
/// task update for the parent followed by one for every task whose
/// <c>ParentTaskId</c> equals it.
/// </summary>
/// <param name="parentTaskId">Id of the parent (planning) task.</param>
/// <exception cref="HubException">
/// The chain coordinator threw an <see cref="InvalidOperationException"/>; its message is forwarded.
/// </exception>
public async Task QueuePlanningSubtasksAsync(string parentTaskId)
{
    try
    {
        await _planningChain.SetupChainAsync(parentTaskId, Context.ConnectionAborted);
    }
    catch (InvalidOperationException setupError)
    {
        throw new HubException(setupError.Message);
    }

    await using var db = await _dbFactory.CreateDbContextAsync();
    var childQuery =
        from task in db.Tasks
        where task.ParentTaskId == parentTaskId
        select task.Id;
    var children = await childQuery.ToListAsync();

    // Parent first, then each child in query order.
    await _broadcaster.TaskUpdated(parentTaskId);
    foreach (var childId in children)
    {
        await _broadcaster.TaskUpdated(childId);
    }
}
/// <summary>Liveness probe.</summary>
/// <returns>The text "pong v" followed by the worker's version string.</returns>
public string Ping()
{
    return "pong v" + Version;
}
/// <summary>
/// Returns the queue's active entries, each converted to an <see cref="ActiveTaskDto"/>.
/// </summary>
public IReadOnlyList<ActiveTaskDto> GetActive()
{
    var snapshot = new List<ActiveTaskDto>();
    foreach (var entry in _queue.GetActive())
    {
        snapshot.Add(new ActiveTaskDto(entry.slot, entry.taskId, entry.startedAt));
    }

    return snapshot;
}
/// <summary>Asks the queue to run <paramref name="taskId"/> immediately.</summary>
/// <param name="taskId">Id of the task to run.</param>
/// <exception cref="HubException">
/// "task not found" when the queue throws <see cref="KeyNotFoundException"/>;
/// "override slot busy" when it throws <see cref="InvalidOperationException"/>.
/// </exception>
public async Task RunNow(string taskId)
{
    try
    {
        await _queue.RunNow(taskId);
    }
    catch (KeyNotFoundException)
    {
        throw new HubException("task not found");
    }
    catch (InvalidOperationException)
    {
        throw new HubException("override slot busy");
    }
}
/// <summary>
/// Continues <paramref name="taskId"/> with a follow-up prompt via the queue service.
/// </summary>
/// <param name="taskId">Id of the task to continue.</param>
/// <param name="followUpPrompt">Prompt text handed to the queue service.</param>
/// <returns>The string returned by the queue service.</returns>
/// <exception cref="HubException">
/// "task not found" for <see cref="KeyNotFoundException"/>; the original message for
/// <see cref="InvalidOperationException"/>.
/// </exception>
public async Task<string> ContinueTask(string taskId, string followUpPrompt)
{
    try
    {
        var continuation = await _queue.ContinueTask(taskId, followUpPrompt);
        return continuation;
    }
    catch (KeyNotFoundException)
    {
        throw new HubException("task not found");
    }
    catch (InvalidOperationException failure)
    {
        throw new HubException(failure.Message);
    }
}
/// <summary>
/// Resets <paramref name="taskId"/> through the reset service. The call is made with
/// <see cref="CancellationToken.None"/>, so it is not tied to the caller's connection.
/// </summary>
/// <param name="taskId">Id of the task to reset.</param>
/// <exception cref="HubException">
/// "task not found" for <see cref="KeyNotFoundException"/>; the original message for
/// <see cref="InvalidOperationException"/>.
/// </exception>
public async Task ResetTask(string taskId)
{
    try
    {
        await _resetService.ResetAsync(taskId, CancellationToken.None);
    }
    catch (KeyNotFoundException)
    {
        throw new HubException("task not found");
    }
    catch (InvalidOperationException failure)
    {
        throw new HubException(failure.Message);
    }
}
/// <summary>Forwards a cancel request for <paramref name="taskId"/> to the queue service.</summary>
/// <returns>The value returned by the queue service's <c>CancelTask</c>.</returns>
public bool CancelTask(string taskId)
{
    return _queue.CancelTask(taskId);
}
/// <summary>Calls <c>Wake()</c> on the injected queue waker.</summary>
public void WakeQueue()
{
    _waker.Wake();
}
/// <summary>Runs an agent scan and returns its result.</summary>
public async Task<List<AgentInfo>> GetAgents()
{
    var agents = await _agentService.ScanAsync();
    return agents;
}
/// <summary>Runs an agent scan and discards its result.</summary>
public async Task RefreshAgents()
{
    _ = await _agentService.ScanAsync();
}
/// <summary>
/// Asks the default-agent seeder to seed missing agents and reports its counts.
/// </summary>
/// <returns>The seeder's <c>Copied</c> and <c>Skipped</c> values.</returns>
public async Task<SeedResultDto> RestoreDefaultAgents()
{
    var seeded = await _seeder.SeedMissingAsync();
    return new SeedResultDto(Copied: seeded.Copied, Skipped: seeded.Skipped);
}
/// <summary>Loads the application settings row and returns it as a DTO.</summary>
/// <returns>The stored settings, copied field-by-field.</returns>
public async Task<AppSettingsDto> GetAppSettings()
{
    // Create and dispose the context asynchronously so this async hub method
    // does not block a thread on either step.
    await using var ctx = await _dbFactory.CreateDbContextAsync();
    var row = await new AppSettingsRepository(ctx).GetAsync();
    return new AppSettingsDto(
        row.DefaultClaudeInstructions,
        row.DefaultModel,
        row.DefaultMaxTurns,
        row.DefaultPermissionMode,
        row.WorktreeStrategy,
        row.CentralWorktreeRoot,
        row.WorktreeAutoCleanupEnabled,
        row.WorktreeAutoCleanupDays);
}
/// <summary>
/// Overwrites the singleton application settings row with the values in
/// <paramref name="dto"/>.
/// </summary>
/// <param name="dto">
/// New settings. Null strings are replaced by defaults: empty instructions,
/// <c>ModelRegistry.DefaultAlias</c>, <c>PermissionModeRegistry.DefaultMode</c> and the
/// "sibling" worktree strategy. <c>CentralWorktreeRoot</c> is stored as given.
/// </param>
public async Task UpdateAppSettings(AppSettingsDto dto)
{
    // Create and dispose the context asynchronously so this async hub method
    // does not block a thread on either step.
    await using var ctx = await _dbFactory.CreateDbContextAsync();
    var repo = new AppSettingsRepository(ctx);
    await repo.UpdateAsync(new AppSettingsEntity
    {
        Id = AppSettingsEntity.SingletonId,
        DefaultClaudeInstructions = dto.DefaultClaudeInstructions ?? "",
        DefaultModel = dto.DefaultModel ?? ModelRegistry.DefaultAlias,
        DefaultMaxTurns = dto.DefaultMaxTurns,
        DefaultPermissionMode = dto.DefaultPermissionMode ?? PermissionModeRegistry.DefaultMode,
        WorktreeStrategy = dto.WorktreeStrategy ?? "sibling",
        CentralWorktreeRoot = dto.CentralWorktreeRoot,
        WorktreeAutoCleanupEnabled = dto.WorktreeAutoCleanupEnabled,
        WorktreeAutoCleanupDays = dto.WorktreeAutoCleanupDays,
    });
}
/// <summary>
/// Runs the maintenance service's finished-worktree cleanup, bound to the caller's
/// connection-abort token.
/// </summary>
/// <param name="listId">Optional list id handed to the maintenance service.</param>
/// <returns>The <c>Removed</c> count reported by the maintenance service.</returns>
public async Task<WorktreeCleanupDto> CleanupFinishedWorktrees(string? listId = null)
{
    var cancellation = Context.ConnectionAborted;
    var cleanup = await _wtMaintenance.CleanupFinishedAsync(listId, cancellation);
    return new WorktreeCleanupDto(Removed: cleanup.Removed);
}
/// <summary>Runs the maintenance service's reset-all operation and reports its outcome.</summary>
/// <returns>The four values of the maintenance service's result, copied as-is.</returns>
public async Task<WorktreeResetDto> ResetAllWorktrees()
{
    var reset = await _wtMaintenance.ResetAllAsync();
    return new WorktreeResetDto(
        Removed: reset.Removed,
        TasksAffected: reset.TasksAffected,
        Blocked: reset.Blocked,
        RunningTasks: reset.RunningTasks);
}
/// <summary>
/// Fetches the worktree overview from the maintenance service and converts every row
/// to a <see cref="WorktreeOverviewDto"/>.
/// </summary>
/// <param name="listId">Optional list id handed to the maintenance service.</param>
public async Task<List<WorktreeOverviewDto>> GetWorktreesOverview(string? listId)
{
    var source = await _wtMaintenance.GetOverviewAsync(listId, Context.ConnectionAborted);

    var overview = new List<WorktreeOverviewDto>();
    foreach (var row in source)
    {
        overview.Add(new WorktreeOverviewDto(
            row.TaskId,
            row.TaskTitle,
            row.TaskStatus,
            row.ListId,
            row.ListName,
            row.Path,
            row.BranchName,
            row.BaseCommit,
            row.State,
            row.DiffStat,
            row.CreatedAt,
            row.PathExistsOnDisk));
    }

    return overview;
}
/// <summary>
/// Sets the state of the worktree belonging to <paramref name="taskId"/> and
/// broadcasts a worktree update.
/// </summary>
/// <param name="taskId">Id of the task whose worktree is updated.</param>
/// <param name="newState">State to store.</param>
/// <returns>Always <c>true</c>; failure is signalled by an exception.</returns>
/// <exception cref="HubException">"worktree not found" when no worktree exists for the task.</exception>
public async Task<bool> SetWorktreeState(string taskId, WorktreeState newState)
{
    var cancellation = Context.ConnectionAborted;

    // Create and dispose the context asynchronously, honouring the same abort token
    // the repository calls below already use.
    await using var ctx = await _dbFactory.CreateDbContextAsync(cancellation);
    var repo = new WorktreeRepository(ctx);

    var existing = await repo.GetByTaskIdAsync(taskId, cancellation);
    if (existing is null)
    {
        throw new HubException("worktree not found");
    }

    await repo.SetStateAsync(taskId, newState, cancellation);
    await _broadcaster.WorktreeUpdated(taskId);
    return true;
}
/// <summary>
/// Force-removes the worktree of <paramref name="taskId"/> via the maintenance service
/// and broadcasts a worktree update only when something was removed.
/// </summary>
/// <returns>The removal flag and optional reason reported by the maintenance service.</returns>
public async Task<ForceRemoveResultDto> ForceRemoveWorktree(string taskId)
{
    var removal = await _wtMaintenance.ForceRemoveAsync(taskId, Context.ConnectionAborted);

    if (removal.Removed)
    {
        await _broadcaster.WorktreeUpdated(taskId);
    }

    return new ForceRemoveResultDto(Removed: removal.Removed, Reason: removal.Reason);
}
/// <summary>
/// Merges the work of <paramref name="taskId"/> through the merge service. The call is
/// made with <see cref="CancellationToken.None"/>.
/// </summary>
/// <param name="taskId">Id of the task to merge.</param>
/// <param name="targetBranch">Target branch; null is passed on as an empty string.</param>
/// <param name="removeWorktree">Forwarded unchanged to the merge service.</param>
/// <param name="commitMessage">Commit message; blank input is replaced by "Merge task".</param>
/// <returns>Status, conflict files and error message reported by the merge service.</returns>
/// <exception cref="HubException">
/// "task not found" for <see cref="KeyNotFoundException"/>; the original message for
/// <see cref="InvalidOperationException"/>.
/// </exception>
public async Task<MergeResultDto> MergeTask(
    string taskId, string targetBranch, bool removeWorktree, string commitMessage)
{
    var branch = targetBranch ?? "";
    var message = string.IsNullOrWhiteSpace(commitMessage) ? "Merge task" : commitMessage;

    try
    {
        var merged = await _mergeService.MergeAsync(
            taskId, branch, removeWorktree, message, CancellationToken.None);
        return new MergeResultDto(merged.Status, merged.ConflictFiles, merged.ErrorMessage);
    }
    catch (InvalidOperationException failure)
    {
        throw new HubException(failure.Message);
    }
    catch (KeyNotFoundException)
    {
        throw new HubException("task not found");
    }
}
/// <summary>
/// Asks the merge service for the merge targets of <paramref name="taskId"/>.
/// </summary>
/// <returns>The default branch and local branches reported by the merge service.</returns>
/// <exception cref="HubException">
/// "task not found" for <see cref="KeyNotFoundException"/>; the original message for
/// <see cref="InvalidOperationException"/>.
/// </exception>
public async Task<MergeTargetsDto> GetMergeTargets(string taskId)
{
    try
    {
        var targets = await _mergeService.GetTargetsAsync(taskId, CancellationToken.None);
        return new MergeTargetsDto(
            DefaultBranch: targets.DefaultBranch,
            LocalBranches: targets.LocalBranches);
    }
    catch (InvalidOperationException failure)
    {
        throw new HubException(failure.Message);
    }
    catch (KeyNotFoundException)
    {
        throw new HubException("task not found");
    }
}
/// <summary>
/// Updates name, working directory and default commit type of a list, then broadcasts
/// a list update.
/// </summary>
/// <param name="dto">
/// New values. A blank <c>WorkingDir</c> is stored as null; a blank
/// <c>DefaultCommitType</c> is replaced by <c>CommitTypeRegistry.DefaultType</c>.
/// </param>
/// <exception cref="HubException">"list not found" when no list has the given id.</exception>
public async Task UpdateList(UpdateListDto dto)
{
    // Create and dispose the context asynchronously so this async hub method
    // does not block a thread on either step.
    await using var ctx = await _dbFactory.CreateDbContextAsync();
    var repo = new ListRepository(ctx);

    var entity = await repo.GetByIdAsync(dto.Id);
    if (entity is null)
    {
        throw new HubException("list not found");
    }

    entity.Name = dto.Name;
    entity.WorkingDir = string.IsNullOrWhiteSpace(dto.WorkingDir) ? null : dto.WorkingDir;
    entity.DefaultCommitType = string.IsNullOrWhiteSpace(dto.DefaultCommitType)
        ? CommitTypeRegistry.DefaultType
        : dto.DefaultCommitType;

    await repo.UpdateAsync(entity);
    await _broadcaster.ListUpdated(dto.Id);
}
/// <summary>
/// Saves or deletes the per-list agent configuration, then broadcasts a list update.
/// </summary>
/// <param name="dto">
/// New configuration. Blank values are treated as null; when model, system prompt and
/// agent path are all null the stored config is deleted instead of saved.
/// </param>
public async Task UpdateListConfig(UpdateListConfigDto dto)
{
    // Create and dispose the context asynchronously so this async hub method
    // does not block a thread on either step.
    await using var ctx = await _dbFactory.CreateDbContextAsync();
    var repo = new ListRepository(ctx);

    var model = Nullify(dto.Model);
    var systemPrompt = Nullify(dto.SystemPrompt);
    var agentPath = Nullify(dto.AgentPath);

    if (model is null && systemPrompt is null && agentPath is null)
    {
        // Nothing left to store: remove the config row rather than keep an empty one.
        await repo.DeleteConfigAsync(dto.ListId);
    }
    else
    {
        await repo.SetConfigAsync(new ListConfigEntity
        {
            ListId = dto.ListId,
            Model = model,
            SystemPrompt = systemPrompt,
            AgentPath = agentPath,
        });
    }

    await _broadcaster.ListUpdated(dto.ListId);
}
/// <summary>Loads the per-list agent configuration.</summary>
/// <param name="listId">Id of the list.</param>
/// <returns>The stored configuration, or null when the repository returns none.</returns>
public async Task<ListConfigDto?> GetListConfig(string listId)
{
    // Create and dispose the context asynchronously so this async hub method
    // does not block a thread on either step.
    await using var ctx = await _dbFactory.CreateDbContextAsync();
    var repo = new ListRepository(ctx);

    var config = await repo.GetConfigAsync(listId);
    return config is null
        ? null
        : new ListConfigDto(config.Model, config.SystemPrompt, config.AgentPath);
}
/// <summary>
/// Forces the status of <paramref name="taskId"/> to the status named by
/// <paramref name="status"/>.
/// </summary>
/// <param name="taskId">Id of the task.</param>
/// <param name="status">Status name, matched case-insensitively against the task status enum.</param>
/// <exception cref="HubException">
/// "unknown status: …" when <paramref name="status"/> is not a defined status; otherwise the
/// state service's reason (or "set status failed") when the change is rejected.
/// </exception>
public async Task SetTaskStatus(string taskId, string status)
{
    // Enum.TryParse also succeeds for numeric strings ("999") and comma-separated
    // lists, producing values that are not members of the enum. Reject those too.
    var isKnown = Enum.TryParse<TaskStatus>(status, ignoreCase: true, out var parsed)
        && Enum.IsDefined(typeof(TaskStatus), parsed);
    if (!isKnown)
    {
        throw new HubException($"unknown status: {status}");
    }

    var result = await _state.ForceSetStatusAsync(taskId, parsed, Context.ConnectionAborted);
    if (!result.Ok)
    {
        throw new HubException(result.Reason ?? "set status failed");
    }
}
/// <summary>
/// Stores the per-task agent settings and broadcasts a task update.
/// </summary>
/// <param name="dto">New settings; blank values are passed to the repository as null.</param>
public async Task UpdateTaskAgentSettings(UpdateTaskAgentSettingsDto dto)
{
    // Create and dispose the context asynchronously so this async hub method
    // does not block a thread on either step.
    await using var ctx = await _dbFactory.CreateDbContextAsync();
    var repo = new TaskRepository(ctx);

    await repo.UpdateAgentSettingsAsync(
        dto.TaskId,
        Nullify(dto.Model),
        Nullify(dto.SystemPrompt),
        Nullify(dto.AgentPath));

    await _broadcaster.TaskUpdated(dto.TaskId);
}
/// <summary>
/// Starts a planning session for <paramref name="taskId"/>, launches its terminal and
/// notifies all clients with a "TaskUpdated" message.
/// </summary>
/// <param name="taskId">Id of the task to plan.</param>
/// <returns>The start context produced by the planning session manager.</returns>
/// <exception cref="PlanningLaunchException">
/// The launcher failed; the session is discarded before the exception is rethrown.
/// </exception>
public async Task<PlanningSessionStartContext> StartPlanningSessionAsync(string taskId)
{
    var ctx = await _planning.StartAsync(taskId, Context.ConnectionAborted);
    try
    {
        await _launcher.LaunchStartAsync(ctx, Context.ConnectionAborted);
    }
    catch (PlanningLaunchException)
    {
        // Launch failed before any children could be created; force-cleanup is safe.
        // The cleanup must not be tied to the connection token: if the caller has
        // already disconnected, a cancelled discard would leave the session behind
        // and replace the launch error with an OperationCanceledException.
        await _planning.DiscardAsync(taskId, dequeueQueuedChildren: true, CancellationToken.None);
        throw;
    }

    await Clients.All.SendAsync("TaskUpdated", taskId);
    return ctx;
}
/// <summary>
/// Resumes the planning session of <paramref name="taskId"/> and launches its terminal.
/// </summary>
/// <returns>The resume context produced by the planning session manager.</returns>
public async Task<PlanningSessionResumeContext> ResumePlanningSessionAsync(string taskId)
{
    var aborted = Context.ConnectionAborted;

    var resumeContext = await _planning.ResumeAsync(taskId, aborted);
    await _launcher.LaunchResumeAsync(resumeContext, aborted);

    return resumeContext;
}
/// <summary>
/// Obtains an interactive context for <paramref name="taskId"/> from the planning
/// session manager and hands it to the terminal launcher.
/// </summary>
public async Task OpenInteractiveTerminalAsync(string taskId)
{
    var aborted = Context.ConnectionAborted;

    var interactiveContext = await _planning.OpenInteractiveAsync(taskId, aborted);
    await _launcher.LaunchInteractiveAsync(interactiveContext, aborted);
}
/// <summary>
/// Discards the planning session of <paramref name="taskId"/>. All clients receive a
/// "TaskUpdated" message only when the outcome's result is <c>Discarded</c>.
/// </summary>
/// <param name="taskId">Id of the planning task.</param>
/// <param name="dequeueQueuedChildren">Forwarded unchanged to the planning session manager.</param>
/// <returns>The outcome reported by the planning session manager.</returns>
public async Task<DiscardPlanningOutcome> DiscardPlanningSessionAsync(string taskId, bool dequeueQueuedChildren = false)
{
    var discardOutcome = await _planning.DiscardAsync(
        taskId, dequeueQueuedChildren, Context.ConnectionAborted);

    var wasDiscarded = discardOutcome.Result == DiscardPlanningResult.Discarded;
    if (wasDiscarded)
    {
        await Clients.All.SendAsync("TaskUpdated", taskId);
    }

    return discardOutcome;
}
/// <summary>
/// Finalizes the planning session of <paramref name="taskId"/> and notifies all
/// clients with a "TaskUpdated" message.
/// </summary>
/// <param name="taskId">Id of the planning task.</param>
/// <param name="queueAgentTasks">Forwarded unchanged to the planning session manager.</param>
/// <returns>The integer returned by the planning session manager.</returns>
public async Task<int> FinalizePlanningSessionAsync(string taskId, bool queueAgentTasks = true)
{
    var finalized = await _planning.FinalizeAsync(
        taskId, queueAgentTasks, Context.ConnectionAborted);

    await Clients.All.SendAsync("TaskUpdated", taskId);

    return finalized;
}
/// <summary>
/// Returns the planning session manager's pending-draft count for
/// <paramref name="taskId"/>, bound to the caller's connection-abort token.
/// </summary>
public Task<int> GetPendingDraftCountAsync(string taskId)
{
    return _planning.GetPendingDraftCountAsync(taskId, Context.ConnectionAborted);
}
/// <summary>
/// Fetches the aggregated diff of a planning task and converts every entry to a
/// <see cref="SubtaskDiffDto"/>.
/// </summary>
/// <param name="planningTaskId">Id of the planning task.</param>
/// <exception cref="HubException">
/// "planning task not found" for <see cref="KeyNotFoundException"/>; the original message
/// for <see cref="InvalidOperationException"/>.
/// </exception>
public async Task<IReadOnlyList<SubtaskDiffDto>> GetPlanningAggregate(string planningTaskId)
{
    try
    {
        var aggregated = await _planningAggregator.GetAggregatedDiffAsync(
            planningTaskId, CancellationToken.None);

        var subtaskDiffs = new List<SubtaskDiffDto>();
        foreach (var diff in aggregated)
        {
            subtaskDiffs.Add(new SubtaskDiffDto(
                diff.SubtaskId,
                diff.Title,
                diff.BranchName,
                diff.BaseCommit,
                diff.HeadCommit,
                diff.DiffStat,
                diff.UnifiedDiff));
        }

        return subtaskDiffs;
    }
    catch (InvalidOperationException failure)
    {
        throw new HubException(failure.Message);
    }
    catch (KeyNotFoundException)
    {
        throw new HubException("planning task not found");
    }
}
/// <summary>
/// Builds the integration branch for a planning task and maps the aggregator's result
/// to a <see cref="CombinedDiffResultDto"/>.
/// </summary>
/// <param name="planningTaskId">Id of the planning task.</param>
/// <param name="targetBranch">Target branch; null is passed on as an empty string.</param>
/// <returns>
/// For an <c>Ok</c> result: success flag true with integration branch and unified diff.
/// For a <c>Failed</c> result: success flag false with the first conflicting subtask id
/// and the conflicted files.
/// </returns>
/// <exception cref="HubException">
/// "planning task not found" for <see cref="KeyNotFoundException"/>; the original message
/// for <see cref="InvalidOperationException"/>, including "unknown result type" when the
/// result is neither <c>Ok</c> nor <c>Failed</c>.
/// </exception>
public async Task<CombinedDiffResultDto> BuildPlanningIntegrationBranch(string planningTaskId, string targetBranch)
{
    try
    {
        var outcome = await _planningAggregator.BuildIntegrationBranchAsync(
            planningTaskId, targetBranch ?? "", CancellationToken.None);

        if (outcome is CombinedDiffResult.Ok success)
        {
            return new CombinedDiffResultDto(
                true, success.Value.IntegrationBranch, success.Value.UnifiedDiff, null, null);
        }

        if (outcome is CombinedDiffResult.Failed failure)
        {
            return new CombinedDiffResultDto(
                false, null, null, failure.Value.FirstConflictSubtaskId, failure.Value.ConflictedFiles);
        }

        // Thrown inside the try on purpose: the handler below turns it into a HubException.
        throw new InvalidOperationException("unknown result type");
    }
    catch (InvalidOperationException invalid)
    {
        throw new HubException(invalid.Message);
    }
    catch (KeyNotFoundException)
    {
        throw new HubException("planning task not found");
    }
}
/// <summary>
/// Starts the planning merge orchestrator for a planning task. The call is made with
/// <see cref="CancellationToken.None"/>.
/// </summary>
/// <param name="planningTaskId">Id of the planning task.</param>
/// <param name="targetBranch">Target branch; null is passed on as an empty string.</param>
/// <exception cref="HubException">
/// "planning task not found" for <see cref="KeyNotFoundException"/>; the original message
/// for <see cref="InvalidOperationException"/>.
/// </exception>
public async Task MergeAllPlanning(string planningTaskId, string targetBranch)
{
    var branch = targetBranch ?? "";
    try
    {
        await _planningMergeOrchestrator.StartAsync(planningTaskId, branch, CancellationToken.None);
    }
    catch (InvalidOperationException failure)
    {
        throw new HubException(failure.Message);
    }
    catch (KeyNotFoundException)
    {
        throw new HubException("planning task not found");
    }
}
/// <summary>
/// Continues a planning merge through the merge orchestrator. The call is made with
/// <see cref="CancellationToken.None"/>.
/// </summary>
/// <param name="planningTaskId">Id of the planning task.</param>
/// <exception cref="HubException">
/// "planning task not found" for <see cref="KeyNotFoundException"/>; the original message
/// for <see cref="InvalidOperationException"/>.
/// </exception>
public async Task ContinuePlanningMerge(string planningTaskId)
{
    try
    {
        await _planningMergeOrchestrator.ContinueAsync(planningTaskId, CancellationToken.None);
    }
    catch (KeyNotFoundException)
    {
        // Same mapping as MergeAllPlanning, so an unknown id reaches the client as a
        // readable HubException instead of a generic invocation error.
        throw new HubException("planning task not found");
    }
    catch (InvalidOperationException ex)
    {
        throw new HubException(ex.Message);
    }
}
/// <summary>
/// Aborts a planning merge through the merge orchestrator. The call is made with
/// <see cref="CancellationToken.None"/>.
/// </summary>
/// <param name="planningTaskId">Id of the planning task.</param>
/// <exception cref="HubException">
/// "planning task not found" for <see cref="KeyNotFoundException"/>; the original message
/// for <see cref="InvalidOperationException"/>.
/// </exception>
public async Task AbortPlanningMerge(string planningTaskId)
{
    try
    {
        await _planningMergeOrchestrator.AbortAsync(planningTaskId, CancellationToken.None);
    }
    catch (KeyNotFoundException)
    {
        // Same mapping as MergeAllPlanning, so an unknown id reaches the client as a
        // readable HubException instead of a generic invocation error.
        throw new HubException("planning task not found");
    }
    catch (InvalidOperationException ex)
    {
        throw new HubException(ex.Message);
    }
}
/// <summary>Loads all prime schedules and returns them as DTOs.</summary>
public async Task<List<PrimeScheduleDto>> ListPrimeSchedules()
{
    // Create and dispose the context asynchronously so this async hub method
    // does not block a thread on either step.
    await using var ctx = await _dbFactory.CreateDbContextAsync();
    var rows = await new PrimeScheduleRepository(ctx).ListAsync();
    return rows.Select(e => new PrimeScheduleDto(
        e.Id, e.StartDate, e.EndDate, e.TimeOfDay,
        e.WorkdaysOnly, e.Enabled, e.LastRunAt, e.PromptOverride)).ToList();
}
/// <summary>
/// Creates or updates a prime schedule, then signals the prime scheduler.
/// </summary>
/// <param name="dto">
/// Schedule values. An empty <c>Id</c> creates a new schedule with a fresh id; otherwise
/// the stored row's <c>CreatedAt</c> and <c>LastRunAt</c> are carried over when it exists.
/// </param>
/// <returns>The saved schedule, including the id that was actually used.</returns>
public async Task<PrimeScheduleDto> UpsertPrimeSchedule(PrimeScheduleDto dto)
{
    // Create and dispose the context asynchronously so this async hub method
    // does not block a thread on either step.
    await using var ctx = await _dbFactory.CreateDbContextAsync();
    var repo = new PrimeScheduleRepository(ctx);

    // An empty id means "create": the entity gets a new id, so there is no stored
    // row whose timestamps could be carried over and the lookup is skipped.
    var isNew = dto.Id == Guid.Empty;
    var existing = isNew ? null : await repo.GetAsync(dto.Id);

    var entity = new ClaudeDo.Data.Models.PrimeScheduleEntity
    {
        Id = isNew ? Guid.NewGuid() : dto.Id,
        StartDate = dto.StartDate,
        EndDate = dto.EndDate,
        TimeOfDay = dto.TimeOfDay,
        WorkdaysOnly = dto.WorkdaysOnly,
        Enabled = dto.Enabled,
        PromptOverride = dto.PromptOverride,
        CreatedAt = existing?.CreatedAt ?? DateTimeOffset.UtcNow,
        LastRunAt = existing?.LastRunAt,
    };

    await repo.UpsertAsync(entity);
    _primeSignal.Signal();

    return new PrimeScheduleDto(entity.Id, entity.StartDate, entity.EndDate, entity.TimeOfDay,
        entity.WorkdaysOnly, entity.Enabled, entity.LastRunAt, entity.PromptOverride);
}
/// <summary>Deletes a prime schedule, then signals the prime scheduler.</summary>
/// <param name="id">Id of the schedule to delete.</param>
public async Task DeletePrimeSchedule(Guid id)
{
    // Create and dispose the context asynchronously so this async hub method
    // does not block a thread on either step.
    await using var ctx = await _dbFactory.CreateDbContextAsync();
    await new PrimeScheduleRepository(ctx).DeleteAsync(id);
    _primeSignal.Signal();
}
/// <summary>
/// Normalizes optional text: null, empty and whitespace-only input becomes null;
/// anything else is returned unchanged (no trimming).
/// </summary>
private static string? Nullify(string? s)
{
    if (string.IsNullOrWhiteSpace(s))
    {
        return null;
    }

    return s;
}
}