using System.Reflection;
using ClaudeDo.Data;
using ClaudeDo.Data.Models;
using ClaudeDo.Data.Repositories;
using ClaudeDo.Worker.Agents;
using ClaudeDo.Worker.Lifecycle;
using ClaudeDo.Worker.Planning;
using ClaudeDo.Worker.Prime;
using ClaudeDo.Worker.Queue;
using ClaudeDo.Worker.State;
using ClaudeDo.Worker.Worktrees;
using TaskStatus = ClaudeDo.Data.Models.TaskStatus;
using Microsoft.AspNetCore.SignalR;
using Microsoft.EntityFrameworkCore;

namespace ClaudeDo.Worker.Hub;

// NOTE(review): several generic type arguments are missing from this file as received
// (for example `IReadOnlyList ConflictFiles`, `IDbContextFactory _dbFactory`,
// `Task> GetAgents()`, and `async Task` methods that return a value). The declarations
// below are reproduced exactly as found because the missing arguments cannot all be
// determined from this file alone - confirm them against version control before building.

/// <summary>One running task as reported by <c>GetActive</c>: the slot it occupies, its id and its start time.</summary>
public record ActiveTaskDto(string Slot, string TaskId, DateTime StartedAt);

/// <summary>Wire shape of the application settings row read and written by the hub.</summary>
public record AppSettingsDto(
    string DefaultClaudeInstructions,
    string DefaultModel,
    int DefaultMaxTurns,
    string DefaultPermissionMode,
    string WorktreeStrategy,
    string? CentralWorktreeRoot,
    bool WorktreeAutoCleanupEnabled,
    int WorktreeAutoCleanupDays);

/// <summary>Result of <c>CleanupFinishedWorktrees</c>: the number of worktrees removed.</summary>
public record WorktreeCleanupDto(int Removed);

/// <summary>Result of <c>ResetAllWorktrees</c>, copied field-for-field from the maintenance service result.</summary>
public record WorktreeResetDto(int Removed, int TasksAffected, bool Blocked, int RunningTasks);

/// <summary>Result of <c>MergeTask</c>: status, conflicting files and an optional error message.</summary>
public record MergeResultDto(string Status, IReadOnlyList ConflictFiles, string? ErrorMessage);

/// <summary>Result of <c>GetMergeTargets</c>: the default branch and the local branches.</summary>
public record MergeTargetsDto(string DefaultBranch, IReadOnlyList LocalBranches);

/// <summary>Request payload for <c>UpdateList</c>.</summary>
public record UpdateListDto(string Id, string Name, string? WorkingDir, string DefaultCommitType);

/// <summary>Request payload for <c>UpdateListConfig</c>; blank values are treated as "not set".</summary>
public record UpdateListConfigDto(string ListId, string? Model, string? SystemPrompt, string? AgentPath);

/// <summary>Request payload for <c>UpdateTaskAgentSettings</c>; blank values are treated as "not set".</summary>
public record UpdateTaskAgentSettingsDto(string TaskId, string? Model, string? SystemPrompt, string? AgentPath);

/// <summary>Per-list agent configuration returned by <c>GetListConfig</c>.</summary>
public record ListConfigDto(string? Model, string? SystemPrompt, string? AgentPath);

/// <summary>Result of <c>RestoreDefaultAgents</c>: how many agent files were copied and how many skipped.</summary>
public record SeedResultDto(int Copied, int Skipped);

/// <summary>
/// SignalR hub exposing the worker's queue, settings, worktree, merge, planning and
/// prime-schedule operations. Expected failures (<see cref="InvalidOperationException"/>,
/// <see cref="KeyNotFoundException"/>) are translated into <see cref="HubException"/>
/// so that the message reaches the calling client.
/// </summary>
public sealed class WorkerHub : Microsoft.AspNetCore.SignalR.Hub
{
    // Three-component assembly version, or "0.0.0" when the assembly carries no version.
    private static readonly string Version =
        Assembly.GetExecutingAssembly().GetName().Version?.ToString(3) ?? "0.0.0";

    private readonly QueueService _queue;
    private readonly IQueueWaker _waker;
    private readonly AgentFileService _agentService;
    private readonly DefaultAgentSeeder _seeder;
    private readonly HubBroadcaster _broadcaster;
    private readonly IDbContextFactory _dbFactory;
    private readonly WorktreeMaintenanceService _wtMaintenance;
    private readonly TaskResetService _resetService;
    private readonly TaskMergeService _mergeService;
    private readonly PlanningSessionManager _planning;
    private readonly IPlanningTerminalLauncher _launcher;
    private readonly PlanningAggregator _planningAggregator;
    private readonly PlanningMergeOrchestrator _planningMergeOrchestrator;
    private readonly PlanningChainCoordinator _planningChain;
    private readonly IPrimeScheduleSignal _primeSignal;
    private readonly ITaskStateService _state;

    /// <summary>Stores the injected services; performs no other work.</summary>
    public WorkerHub(
        QueueService queue,
        IQueueWaker waker,
        AgentFileService agentService,
        DefaultAgentSeeder seeder,
        HubBroadcaster broadcaster,
        IDbContextFactory dbFactory,
        WorktreeMaintenanceService wtMaintenance,
        TaskResetService resetService,
        TaskMergeService mergeService,
        PlanningSessionManager planning,
        IPlanningTerminalLauncher launcher,
        PlanningAggregator planningAggregator,
        PlanningMergeOrchestrator planningMergeOrchestrator,
        PlanningChainCoordinator planningChain,
        IPrimeScheduleSignal primeSignal,
        ITaskStateService state)
    {
        _queue = queue;
        _waker = waker;
        _agentService = agentService;
        _seeder = seeder;
        _broadcaster = broadcaster;
        _dbFactory = dbFactory;
        _wtMaintenance = wtMaintenance;
        _resetService = resetService;
        _mergeService = mergeService;
        _planning = planning;
        _launcher = launcher;
        _planningAggregator = planningAggregator;
        _planningMergeOrchestrator = planningMergeOrchestrator;
        _planningChain = planningChain;
        _primeSignal = primeSignal;
        _state = state;
    }

    /// <summary>
    /// Sets up the planning chain for <paramref name="parentTaskId"/> and then broadcasts
    /// a task update for the parent and for every task whose parent id matches it.
    /// </summary>
    /// <exception cref="HubException">The chain coordinator threw <see cref="InvalidOperationException"/>.</exception>
    public async Task QueuePlanningSubtasksAsync(string parentTaskId)
    {
        try
        {
            await _planningChain.SetupChainAsync(parentTaskId, Context.ConnectionAborted);
        }
        catch (InvalidOperationException ex)
        {
            throw new HubException(ex.Message);
        }

        await using var ctx = await _dbFactory.CreateDbContextAsync();
        var childIds = await ctx.Tasks
            .Where(t => t.ParentTaskId == parentTaskId)
            .Select(t => t.Id)
            .ToListAsync();

        await _broadcaster.TaskUpdated(parentTaskId);
        foreach (var id in childIds)
        {
            await _broadcaster.TaskUpdated(id);
        }
    }

    /// <summary>Liveness probe; returns <c>pong v</c> followed by the worker version.</summary>
    public string Ping() => $"pong v{Version}";

    /// <summary>Returns the queue's currently active entries mapped to <see cref="ActiveTaskDto"/>.</summary>
    public IReadOnlyList GetActive()
    {
        return _queue.GetActive()
            .Select(a => new ActiveTaskDto(a.slot, a.taskId, a.startedAt))
            .ToList();
    }

    /// <summary>Asks the queue to run <paramref name="taskId"/> immediately.</summary>
    /// <exception cref="HubException">
    /// "override slot busy" when the queue throws <see cref="InvalidOperationException"/>;
    /// "task not found" when it throws <see cref="KeyNotFoundException"/>.
    /// </exception>
    public async Task RunNow(string taskId)
    {
        try
        {
            await _queue.RunNow(taskId);
        }
        catch (InvalidOperationException)
        {
            throw new HubException("override slot busy");
        }
        catch (KeyNotFoundException)
        {
            throw new HubException("task not found");
        }
    }

    /// <summary>Continues <paramref name="taskId"/> with a follow-up prompt and returns the queue's result.</summary>
    /// <exception cref="HubException">The queue rejected the request or the task does not exist.</exception>
    public async Task ContinueTask(string taskId, string followUpPrompt)
    {
        try
        {
            return await _queue.ContinueTask(taskId, followUpPrompt);
        }
        catch (InvalidOperationException ex)
        {
            throw new HubException(ex.Message);
        }
        catch (KeyNotFoundException)
        {
            throw new HubException("task not found");
        }
    }

    /// <summary>
    /// Resets <paramref name="taskId"/> through the reset service. The call is made with
    /// <see cref="CancellationToken.None"/>, i.e. it is not tied to the connection token.
    /// </summary>
    /// <exception cref="HubException">The reset was rejected or the task does not exist.</exception>
    public async Task ResetTask(string taskId)
    {
        try
        {
            await _resetService.ResetAsync(taskId, CancellationToken.None);
        }
        catch (InvalidOperationException ex)
        {
            throw new HubException(ex.Message);
        }
        catch (KeyNotFoundException)
        {
            throw new HubException("task not found");
        }
    }

    /// <summary>Forwards a cancel request to the queue and returns its answer.</summary>
    public bool CancelTask(string taskId) => _queue.CancelTask(taskId);

    /// <summary>Wakes the queue via the injected waker.</summary>
    public void WakeQueue() => _waker.Wake();

    /// <summary>Scans for agents and returns the scan result.</summary>
    public async Task> GetAgents() => await _agentService.ScanAsync();

    /// <summary>Runs the same agent scan as <c>GetAgents</c>.</summary>
    public async Task RefreshAgents() => await _agentService.ScanAsync();

    /// <summary>Seeds any missing default agents and reports copied/skipped counts.</summary>
    public async Task RestoreDefaultAgents()
    {
        var result = await _seeder.SeedMissingAsync();
        return new SeedResultDto(result.Copied, result.Skipped);
    }

    /// <summary>Reads the application settings row and maps it to <see cref="AppSettingsDto"/>.</summary>
    public async Task GetAppSettings()
    {
        await using var ctx = await _dbFactory.CreateDbContextAsync();
        var row = await new AppSettingsRepository(ctx).GetAsync();
        return new AppSettingsDto(
            row.DefaultClaudeInstructions,
            row.DefaultModel,
            row.DefaultMaxTurns,
            row.DefaultPermissionMode,
            row.WorktreeStrategy,
            row.CentralWorktreeRoot,
            row.WorktreeAutoCleanupEnabled,
            row.WorktreeAutoCleanupDays);
    }

    /// <summary>
    /// Overwrites the singleton settings row with <paramref name="dto"/>. Null string
    /// values fall back to an empty string, the registry defaults, or "sibling".
    /// </summary>
    public async Task UpdateAppSettings(AppSettingsDto dto)
    {
        await using var ctx = await _dbFactory.CreateDbContextAsync();
        var repo = new AppSettingsRepository(ctx);
        await repo.UpdateAsync(new AppSettingsEntity
        {
            Id = AppSettingsEntity.SingletonId,
            DefaultClaudeInstructions = dto.DefaultClaudeInstructions ?? "",
            DefaultModel = dto.DefaultModel ?? ModelRegistry.DefaultAlias,
            DefaultMaxTurns = dto.DefaultMaxTurns,
            DefaultPermissionMode = dto.DefaultPermissionMode ?? PermissionModeRegistry.DefaultMode,
            WorktreeStrategy = dto.WorktreeStrategy ?? "sibling",
            CentralWorktreeRoot = dto.CentralWorktreeRoot,
            WorktreeAutoCleanupEnabled = dto.WorktreeAutoCleanupEnabled,
            WorktreeAutoCleanupDays = dto.WorktreeAutoCleanupDays,
        });
    }

    /// <summary>Runs the "cleanup finished" worktree maintenance and reports how many were removed.</summary>
    public async Task CleanupFinishedWorktrees()
    {
        var result = await _wtMaintenance.CleanupFinishedAsync();
        return new WorktreeCleanupDto(result.Removed);
    }

    /// <summary>Runs the "reset all" worktree maintenance and returns its result fields.</summary>
    public async Task ResetAllWorktrees()
    {
        var result = await _wtMaintenance.ResetAllAsync();
        return new WorktreeResetDto(result.Removed, result.TasksAffected, result.Blocked, result.RunningTasks);
    }

    /// <summary>
    /// Merges a task through the merge service. A null <paramref name="targetBranch"/> is
    /// passed on as an empty string and a blank <paramref name="commitMessage"/> is
    /// replaced by "Merge task". The merge is not tied to the connection token.
    /// </summary>
    /// <exception cref="HubException">The task does not exist or the merge was rejected.</exception>
    public async Task MergeTask(
        string taskId,
        string targetBranch,
        bool removeWorktree,
        string commitMessage)
    {
        try
        {
            var r = await _mergeService.MergeAsync(
                taskId,
                targetBranch ?? "",
                removeWorktree,
                string.IsNullOrWhiteSpace(commitMessage) ? "Merge task" : commitMessage,
                CancellationToken.None);
            return new MergeResultDto(r.Status, r.ConflictFiles, r.ErrorMessage);
        }
        catch (KeyNotFoundException)
        {
            throw new HubException("task not found");
        }
        catch (InvalidOperationException ex)
        {
            throw new HubException(ex.Message);
        }
    }

    /// <summary>Returns the merge targets (default branch and local branches) for a task.</summary>
    /// <exception cref="HubException">The task does not exist or the lookup was rejected.</exception>
    public async Task GetMergeTargets(string taskId)
    {
        try
        {
            var t = await _mergeService.GetTargetsAsync(taskId, CancellationToken.None);
            return new MergeTargetsDto(t.DefaultBranch, t.LocalBranches);
        }
        catch (KeyNotFoundException)
        {
            throw new HubException("task not found");
        }
        catch (InvalidOperationException ex)
        {
            throw new HubException(ex.Message);
        }
    }

    /// <summary>
    /// Updates a list's name, working directory and default commit type, then broadcasts
    /// a list update. A blank working directory is stored as null and a blank commit
    /// type as "chore".
    /// </summary>
    /// <exception cref="HubException">No list with the given id exists.</exception>
    public async Task UpdateList(UpdateListDto dto)
    {
        await using var ctx = await _dbFactory.CreateDbContextAsync();
        var repo = new ListRepository(ctx);
        var entity = await repo.GetByIdAsync(dto.Id);
        if (entity is null)
        {
            throw new HubException("list not found");
        }

        entity.Name = dto.Name;
        entity.WorkingDir = string.IsNullOrWhiteSpace(dto.WorkingDir) ? null : dto.WorkingDir;
        entity.DefaultCommitType = string.IsNullOrWhiteSpace(dto.DefaultCommitType) ? "chore" : dto.DefaultCommitType;
        await repo.UpdateAsync(entity);
        await _broadcaster.ListUpdated(dto.Id);
    }

    /// <summary>
    /// Stores the per-list agent configuration, or deletes it when every value is blank,
    /// then broadcasts a list update.
    /// </summary>
    public async Task UpdateListConfig(UpdateListConfigDto dto)
    {
        await using var ctx = await _dbFactory.CreateDbContextAsync();
        var repo = new ListRepository(ctx);

        var model = Nullify(dto.Model);
        var systemPrompt = Nullify(dto.SystemPrompt);
        var agentPath = Nullify(dto.AgentPath);

        if (model is null && systemPrompt is null && agentPath is null)
        {
            // Nothing left to store: remove the config row instead of keeping an empty one.
            await repo.DeleteConfigAsync(dto.ListId);
        }
        else
        {
            await repo.SetConfigAsync(new ListConfigEntity
            {
                ListId = dto.ListId,
                Model = model,
                SystemPrompt = systemPrompt,
                AgentPath = agentPath,
            });
        }

        await _broadcaster.ListUpdated(dto.ListId);
    }

    /// <summary>Returns the stored configuration for a list, or null when none exists.</summary>
    public async Task GetListConfig(string listId)
    {
        await using var ctx = await _dbFactory.CreateDbContextAsync();
        var repo = new ListRepository(ctx);
        var config = await repo.GetConfigAsync(listId);
        if (config is null)
        {
            return null;
        }

        return new ListConfigDto(config.Model, config.SystemPrompt, config.AgentPath);
    }

    /// <summary>
    /// Parses <paramref name="status"/> case-insensitively and force-sets it on the task
    /// through the state service.
    /// </summary>
    /// <exception cref="HubException">The status name is unknown or the state service reported failure.</exception>
    public async Task SetTaskStatus(string taskId, string status)
    {
        // TaskStatus is the ClaudeDo.Data.Models alias declared at the top of the file.
        if (!Enum.TryParse<TaskStatus>(status, ignoreCase: true, out var parsed))
        {
            throw new HubException($"unknown status: {status}");
        }

        var result = await _state.ForceSetStatusAsync(taskId, parsed, Context.ConnectionAborted);
        if (!result.Ok)
        {
            throw new HubException(result.Reason ?? "set status failed");
        }
    }

    /// <summary>
    /// Updates a task's agent settings (blank values become null) and broadcasts a task update.
    /// </summary>
    public async Task UpdateTaskAgentSettings(UpdateTaskAgentSettingsDto dto)
    {
        await using var ctx = await _dbFactory.CreateDbContextAsync();
        var repo = new TaskRepository(ctx);
        await repo.UpdateAgentSettingsAsync(
            dto.TaskId,
            Nullify(dto.Model),
            Nullify(dto.SystemPrompt),
            Nullify(dto.AgentPath));
        await _broadcaster.TaskUpdated(dto.TaskId);
    }

    /// <summary>
    /// Starts a planning session for the task, launches its terminal, notifies all clients
    /// with "TaskUpdated" and returns the session context. If the launch throws
    /// <c>PlanningLaunchException</c> the session is discarded and the exception rethrown.
    /// </summary>
    public async Task StartPlanningSessionAsync(string taskId)
    {
        var ctx = await _planning.StartAsync(taskId, Context.ConnectionAborted);
        try
        {
            await _launcher.LaunchStartAsync(ctx, Context.ConnectionAborted);
        }
        catch (PlanningLaunchException)
        {
            // Launch failed before any children could be created; force-cleanup is safe.
            // The cleanup is not bound to the connection token: a client that disconnects
            // while the failure is being handled must not cancel the discard and leave a
            // half-started session behind.
            await _planning.DiscardAsync(taskId, dequeueQueuedChildren: true, CancellationToken.None);
            throw;
        }

        await Clients.All.SendAsync("TaskUpdated", taskId);
        return ctx;
    }

    /// <summary>Resumes a planning session, launches its terminal and returns the session context.</summary>
    public async Task ResumePlanningSessionAsync(string taskId)
    {
        var ctx = await _planning.ResumeAsync(taskId, Context.ConnectionAborted);
        await _launcher.LaunchResumeAsync(ctx, Context.ConnectionAborted);
        return ctx;
    }

    /// <summary>Opens an interactive session for the task and launches its terminal.</summary>
    public async Task OpenInteractiveTerminalAsync(string taskId)
    {
        var ctx = await _planning.OpenInteractiveAsync(taskId, Context.ConnectionAborted);
        await _launcher.LaunchInteractiveAsync(ctx, Context.ConnectionAborted);
    }

    /// <summary>
    /// Discards a planning session and returns the outcome. All clients are notified with
    /// "TaskUpdated" only when the outcome is <c>DiscardPlanningResult.Discarded</c>.
    /// </summary>
    public async Task DiscardPlanningSessionAsync(string taskId, bool dequeueQueuedChildren = false)
    {
        var outcome = await _planning.DiscardAsync(taskId, dequeueQueuedChildren, Context.ConnectionAborted);
        if (outcome.Result == DiscardPlanningResult.Discarded)
        {
            await Clients.All.SendAsync("TaskUpdated", taskId);
        }

        return outcome;
    }

    /// <summary>
    /// Finalizes a planning session, notifies all clients with "TaskUpdated" and returns
    /// the value produced by the session manager.
    /// </summary>
    public async Task FinalizePlanningSessionAsync(string taskId, bool queueAgentTasks = true)
    {
        var count = await _planning.FinalizeAsync(taskId, queueAgentTasks, Context.ConnectionAborted);
        await Clients.All.SendAsync("TaskUpdated", taskId);
        return count;
    }

    /// <summary>Forwards to the session manager's pending-draft count for the task.</summary>
    public Task GetPendingDraftCountAsync(string taskId)
        => _planning.GetPendingDraftCountAsync(taskId, Context.ConnectionAborted);

    /// <summary>Returns the aggregated per-subtask diffs of a planning task as <c>SubtaskDiffDto</c> items.</summary>
    /// <exception cref="HubException">The planning task does not exist or aggregation was rejected.</exception>
    public async Task> GetPlanningAggregate(string planningTaskId)
    {
        try
        {
            var diffs = await _planningAggregator.GetAggregatedDiffAsync(planningTaskId, CancellationToken.None);
            return diffs.Select(d => new SubtaskDiffDto(
                d.SubtaskId,
                d.Title,
                d.BranchName,
                d.BaseCommit,
                d.HeadCommit,
                d.DiffStat,
                d.UnifiedDiff)).ToList();
        }
        catch (KeyNotFoundException)
        {
            throw new HubException("planning task not found");
        }
        catch (InvalidOperationException ex)
        {
            throw new HubException(ex.Message);
        }
    }

    /// <summary>
    /// Builds the integration branch for a planning task (a null target branch is passed
    /// as an empty string) and maps the outcome to <c>CombinedDiffResultDto</c>: success
    /// carries the branch and diff, failure carries the first conflicting subtask and
    /// the conflicted files.
    /// </summary>
    /// <exception cref="HubException">
    /// The planning task does not exist, the build was rejected, or the result type is not recognised.
    /// </exception>
    public async Task BuildPlanningIntegrationBranch(string planningTaskId, string targetBranch)
    {
        try
        {
            var result = await _planningAggregator.BuildIntegrationBranchAsync(
                planningTaskId, targetBranch ?? "", CancellationToken.None);

            return result switch
            {
                CombinedDiffResult.Ok ok => new CombinedDiffResultDto(
                    true, ok.Value.IntegrationBranch, ok.Value.UnifiedDiff, null, null),
                CombinedDiffResult.Failed f => new CombinedDiffResultDto(
                    false, null, null, f.Value.FirstConflictSubtaskId, f.Value.ConflictedFiles),
                // Thrown inside the try, so it is reported to the client by the catch below.
                _ => throw new InvalidOperationException("unknown result type"),
            };
        }
        catch (KeyNotFoundException)
        {
            throw new HubException("planning task not found");
        }
        catch (InvalidOperationException ex)
        {
            throw new HubException(ex.Message);
        }
    }

    /// <summary>Starts merging all subtasks of a planning task into the target branch.</summary>
    /// <exception cref="HubException">The planning task does not exist or the merge was rejected.</exception>
    public async Task MergeAllPlanning(string planningTaskId, string targetBranch)
    {
        try
        {
            await _planningMergeOrchestrator.StartAsync(planningTaskId, targetBranch ?? "", CancellationToken.None);
        }
        catch (KeyNotFoundException)
        {
            throw new HubException("planning task not found");
        }
        catch (InvalidOperationException ex)
        {
            throw new HubException(ex.Message);
        }
    }

    /// <summary>Continues a planning merge through the merge orchestrator.</summary>
    /// <exception cref="HubException">The planning task does not exist or the request was rejected.</exception>
    public async Task ContinuePlanningMerge(string planningTaskId)
    {
        try
        {
            await _planningMergeOrchestrator.ContinueAsync(planningTaskId, CancellationToken.None);
        }
        catch (KeyNotFoundException)
        {
            // Same mapping as MergeAllPlanning so an unknown id reaches the client as a readable error.
            throw new HubException("planning task not found");
        }
        catch (InvalidOperationException ex)
        {
            throw new HubException(ex.Message);
        }
    }

    /// <summary>Aborts a planning merge through the merge orchestrator.</summary>
    /// <exception cref="HubException">The planning task does not exist or the request was rejected.</exception>
    public async Task AbortPlanningMerge(string planningTaskId)
    {
        try
        {
            await _planningMergeOrchestrator.AbortAsync(planningTaskId, CancellationToken.None);
        }
        catch (KeyNotFoundException)
        {
            // Same mapping as MergeAllPlanning so an unknown id reaches the client as a readable error.
            throw new HubException("planning task not found");
        }
        catch (InvalidOperationException ex)
        {
            throw new HubException(ex.Message);
        }
    }

    /// <summary>Lists all prime schedules as <c>PrimeScheduleDto</c> items.</summary>
    public async Task> ListPrimeSchedules()
    {
        await using var ctx = await _dbFactory.CreateDbContextAsync();
        var rows = await new PrimeScheduleRepository(ctx).ListAsync();
        return rows.Select(e => new PrimeScheduleDto(
            e.Id,
            e.StartDate,
            e.EndDate,
            e.TimeOfDay,
            e.WorkdaysOnly,
            e.Enabled,
            e.LastRunAt,
            e.PromptOverride)).ToList();
    }

    /// <summary>
    /// Creates or updates a prime schedule, signals the scheduler and returns the stored
    /// values. An empty id gets a fresh <see cref="Guid"/>; <c>CreatedAt</c> and
    /// <c>LastRunAt</c> are carried over from the existing row when there is one.
    /// </summary>
    public async Task UpsertPrimeSchedule(PrimeScheduleDto dto)
    {
        await using var ctx = await _dbFactory.CreateDbContextAsync();
        var repo = new PrimeScheduleRepository(ctx);
        var existing = await repo.GetAsync(dto.Id);

        var entity = new ClaudeDo.Data.Models.PrimeScheduleEntity
        {
            Id = dto.Id == Guid.Empty ? Guid.NewGuid() : dto.Id,
            StartDate = dto.StartDate,
            EndDate = dto.EndDate,
            TimeOfDay = dto.TimeOfDay,
            WorkdaysOnly = dto.WorkdaysOnly,
            Enabled = dto.Enabled,
            PromptOverride = dto.PromptOverride,
            CreatedAt = existing?.CreatedAt ?? DateTimeOffset.UtcNow,
            LastRunAt = existing?.LastRunAt,
        };

        await repo.UpsertAsync(entity);
        _primeSignal.Signal();

        return new PrimeScheduleDto(
            entity.Id,
            entity.StartDate,
            entity.EndDate,
            entity.TimeOfDay,
            entity.WorkdaysOnly,
            entity.Enabled,
            entity.LastRunAt,
            entity.PromptOverride);
    }

    /// <summary>Deletes a prime schedule and signals the scheduler.</summary>
    public async Task DeletePrimeSchedule(Guid id)
    {
        await using var ctx = await _dbFactory.CreateDbContextAsync();
        await new PrimeScheduleRepository(ctx).DeleteAsync(id);
        _primeSignal.Signal();
    }

    /// <summary>Returns null for a null, empty or whitespace-only string; otherwise the string unchanged.</summary>
    private static string? Nullify(string? s) => string.IsNullOrWhiteSpace(s) ? null : s;
}