// Source: ClaudeDo/src/ClaudeDo.Worker/Hub/WorkerHub.cs
// Snapshot: 2026-04-24 18:28:38 +02:00 (391 lines, 14 KiB, C#)

using System.Reflection;
using ClaudeDo.Data;
using ClaudeDo.Data.Models;
using ClaudeDo.Data.Repositories;
using ClaudeDo.Worker.Planning;
using ClaudeDo.Worker.Services;
using Microsoft.AspNetCore.SignalR;
using Microsoft.EntityFrameworkCore;
namespace ClaudeDo.Worker.Hub;
/// <summary>One entry of the list returned by <c>WorkerHub.GetActive</c>: slot, task id and start time of an active task.</summary>
public record ActiveTaskDto(string Slot, string TaskId, DateTime StartedAt);
/// <summary>
/// Application settings payload: returned by <c>WorkerHub.GetAppSettings</c> and accepted by
/// <c>WorkerHub.UpdateAppSettings</c>, which substitutes defaults for null string values.
/// </summary>
public record AppSettingsDto(
string DefaultClaudeInstructions,
string DefaultModel,
int DefaultMaxTurns,
string DefaultPermissionMode,
string WorktreeStrategy,
string? CentralWorktreeRoot,
bool WorktreeAutoCleanupEnabled,
int WorktreeAutoCleanupDays);
/// <summary>Result of <c>WorkerHub.CleanupFinishedWorktrees</c>; <c>Removed</c> is copied from the maintenance service result.</summary>
public record WorktreeCleanupDto(int Removed);
/// <summary>Result of <c>WorkerHub.ResetAllWorktrees</c>; every value is copied from the maintenance service result.</summary>
public record WorktreeResetDto(int Removed, int TasksAffected, bool Blocked, int RunningTasks);
/// <summary>Result of <c>WorkerHub.MergeTask</c>: status, conflicting files and optional error message as reported by the merge service.</summary>
public record MergeResultDto(string Status, IReadOnlyList<string> ConflictFiles, string? ErrorMessage);
/// <summary>Result of <c>WorkerHub.GetMergeTargets</c>: the default branch and the local branches reported by the merge service.</summary>
public record MergeTargetsDto(string DefaultBranch, IReadOnlyList<string> LocalBranches);
/// <summary>
/// Payload of <c>WorkerHub.UpdateList</c>. A blank <c>WorkingDir</c> is stored as null and a blank
/// <c>DefaultCommitType</c> is stored as "chore".
/// </summary>
public record UpdateListDto(string Id, string Name, string? WorkingDir, string DefaultCommitType);
/// <summary>
/// Payload of <c>WorkerHub.UpdateListConfig</c>. Blank values are treated as null; when all three are
/// null the list's config is deleted instead of saved.
/// </summary>
public record UpdateListConfigDto(string ListId, string? Model, string? SystemPrompt, string? AgentPath);
/// <summary>Payload of <c>WorkerHub.UpdateTaskAgentSettings</c>. Blank values are treated as null.</summary>
public record UpdateTaskAgentSettingsDto(string TaskId, string? Model, string? SystemPrompt, string? AgentPath);
/// <summary>Per-list configuration returned by <c>WorkerHub.GetListConfig</c>.</summary>
public record ListConfigDto(string? Model, string? SystemPrompt, string? AgentPath);
/// <summary>Result of <c>WorkerHub.RestoreDefaultAgents</c>; both counts are copied from the seeder result.</summary>
public record SeedResultDto(int Copied, int Skipped);
public sealed class WorkerHub : Microsoft.AspNetCore.SignalR.Hub
{
// First three components of the executing assembly's version (e.g. "1.2.3"), reported by Ping.
// Falls back to "0.0.0" when the assembly carries no version.
private static readonly string Version =
Assembly.GetExecutingAssembly().GetName().Version?.ToString(3) ?? "0.0.0";
// Collaborators below are all supplied through the constructor.
// Task queue: active list, run-now, continue, cancel and wake.
private readonly QueueService _queue;
// Agent scan used by GetAgents / RefreshAgents.
private readonly AgentFileService _agentService;
// Seeds missing default agents (RestoreDefaultAgents).
private readonly DefaultAgentSeeder _seeder;
// Sends ListUpdated / TaskUpdated notifications after writes.
private readonly HubBroadcaster _broadcaster;
// Creates a ClaudeDoDbContext for each hub call that touches the database.
private readonly IDbContextFactory<ClaudeDoDbContext> _dbFactory;
// Worktree cleanup and reset operations.
private readonly WorktreeMaintenanceService _wtMaintenance;
// Backs ResetTask.
private readonly TaskResetService _resetService;
// Backs MergeTask and GetMergeTargets.
private readonly TaskMergeService _mergeService;
// Planning session lifecycle: start, resume, discard, finalize, pending draft count.
private readonly PlanningSessionManager _planning;
// Launches the terminal for a started or resumed planning session.
private readonly IPlanningTerminalLauncher _launcher;
// Aggregated subtask diffs and integration-branch builds for a planning task.
private readonly PlanningAggregator _planningAggregator;
// Start / continue / abort of the "merge all" flow for a planning task.
private readonly PlanningMergeOrchestrator _planningMergeOrchestrator;
/// <summary>
/// Creates the hub and stores each injected collaborator in its backing field.
/// No validation or other work is performed here.
/// </summary>
public WorkerHub(
    QueueService queue,
    AgentFileService agentService,
    DefaultAgentSeeder seeder,
    HubBroadcaster broadcaster,
    IDbContextFactory<ClaudeDoDbContext> dbFactory,
    WorktreeMaintenanceService wtMaintenance,
    TaskResetService resetService,
    TaskMergeService mergeService,
    PlanningSessionManager planning,
    IPlanningTerminalLauncher launcher,
    PlanningAggregator planningAggregator,
    PlanningMergeOrchestrator planningMergeOrchestrator)
{
    // Queue, agents and notifications.
    (_queue, _agentService, _seeder, _broadcaster) = (queue, agentService, seeder, broadcaster);

    // Persistence and worktree/task maintenance.
    (_dbFactory, _wtMaintenance, _resetService, _mergeService) =
        (dbFactory, wtMaintenance, resetService, mergeService);

    // Planning sessions and their merge tooling.
    (_planning, _launcher) = (planning, launcher);
    (_planningAggregator, _planningMergeOrchestrator) = (planningAggregator, planningMergeOrchestrator);
}
/// <summary>Liveness probe.</summary>
/// <returns>The text "pong v" followed by the hub's <c>Version</c> string.</returns>
public string Ping()
{
    return "pong v" + Version;
}
/// <summary>
/// Projects every entry reported by the queue's <c>GetActive</c> into an <see cref="ActiveTaskDto"/>.
/// </summary>
/// <returns>A materialized list with one DTO per active entry.</returns>
public IReadOnlyList<ActiveTaskDto> GetActive() =>
    (from entry in _queue.GetActive()
     select new ActiveTaskDto(entry.slot, entry.taskId, entry.startedAt)).ToList();
/// <summary>Asks the queue to run the given task immediately.</summary>
/// <param name="taskId">Id of the task to run.</param>
/// <exception cref="HubException">
/// "override slot busy" when the queue throws <see cref="InvalidOperationException"/>;
/// "task not found" when it throws <see cref="KeyNotFoundException"/>.
/// </exception>
public async Task RunNow(string taskId)
{
    try
    {
        await _queue.RunNow(taskId);
    }
    catch (Exception ex) when (ex is InvalidOperationException or KeyNotFoundException)
    {
        // The two exception types are unrelated, so one filtered handler can tell them apart.
        var reason = ex is KeyNotFoundException ? "task not found" : "override slot busy";
        throw new HubException(reason);
    }
}
/// <summary>Forwards a follow-up prompt for an existing task to the queue.</summary>
/// <param name="taskId">Id of the task to continue.</param>
/// <param name="followUpPrompt">Prompt text passed through to the queue unchanged.</param>
/// <returns>The string produced by the queue's <c>ContinueTask</c> call.</returns>
/// <exception cref="HubException">
/// "task not found" for <see cref="KeyNotFoundException"/>; the original message for
/// <see cref="InvalidOperationException"/>.
/// </exception>
public async Task<string> ContinueTask(string taskId, string followUpPrompt)
{
    try
    {
        var outcome = await _queue.ContinueTask(taskId, followUpPrompt);
        return outcome;
    }
    catch (KeyNotFoundException)
    {
        throw new HubException("task not found");
    }
    catch (InvalidOperationException failure)
    {
        throw new HubException(failure.Message);
    }
}
/// <summary>Resets the given task through the reset service (not cancellable by the caller).</summary>
/// <param name="taskId">Id of the task to reset.</param>
/// <exception cref="HubException">
/// "task not found" for <see cref="KeyNotFoundException"/>; the original message for
/// <see cref="InvalidOperationException"/>.
/// </exception>
public async Task ResetTask(string taskId)
{
    try
    {
        await _resetService.ResetAsync(taskId, CancellationToken.None);
    }
    catch (Exception ex) when (ex is InvalidOperationException or KeyNotFoundException)
    {
        throw new HubException(ex switch
        {
            KeyNotFoundException => "task not found",
            _ => ex.Message,
        });
    }
}
/// <summary>Requests cancellation of a task via the queue.</summary>
/// <param name="taskId">Id of the task to cancel.</param>
/// <returns>The boolean returned by the queue's <c>CancelTask</c>.</returns>
public bool CancelTask(string taskId)
{
    return _queue.CancelTask(taskId);
}
/// <summary>Forwards a wake request to the queue.</summary>
public void WakeQueue()
{
    _queue.WakeQueue();
}
/// <summary>Runs an agent scan and returns its result.</summary>
/// <returns>The list produced by the agent service's <c>ScanAsync</c>.</returns>
public async Task<List<AgentInfo>> GetAgents()
{
    var agents = await _agentService.ScanAsync();
    return agents;
}
/// <summary>Runs an agent scan and discards the result.</summary>
public async Task RefreshAgents()
{
    _ = await _agentService.ScanAsync();
}
/// <summary>Asks the seeder to seed missing default agents.</summary>
/// <returns>The seeder's <c>Copied</c> and <c>Skipped</c> counts.</returns>
public async Task<SeedResultDto> RestoreDefaultAgents()
{
    var seeded = await _seeder.SeedMissingAsync();
    return new SeedResultDto(Copied: seeded.Copied, Skipped: seeded.Skipped);
}
/// <summary>Loads the application settings row and maps it field-for-field onto a DTO.</summary>
/// <returns>The current settings.</returns>
public async Task<AppSettingsDto> GetAppSettings()
{
    using var db = _dbFactory.CreateDbContext();
    var settings = new AppSettingsRepository(db);
    var current = await settings.GetAsync();

    return new AppSettingsDto(
        DefaultClaudeInstructions: current.DefaultClaudeInstructions,
        DefaultModel: current.DefaultModel,
        DefaultMaxTurns: current.DefaultMaxTurns,
        DefaultPermissionMode: current.DefaultPermissionMode,
        WorktreeStrategy: current.WorktreeStrategy,
        CentralWorktreeRoot: current.CentralWorktreeRoot,
        WorktreeAutoCleanupEnabled: current.WorktreeAutoCleanupEnabled,
        WorktreeAutoCleanupDays: current.WorktreeAutoCleanupDays);
}
/// <summary>
/// Replaces the singleton application settings row with the values from <paramref name="dto"/>.
/// </summary>
/// <param name="dto">
/// New settings. Null strings fall back to defaults: empty instructions, model "sonnet",
/// permission mode "bypassPermissions" and worktree strategy "sibling".
/// </param>
public async Task UpdateAppSettings(AppSettingsDto dto)
{
    using var db = _dbFactory.CreateDbContext();
    var settings = new AppSettingsRepository(db);

    var replacement = new AppSettingsEntity
    {
        Id = AppSettingsEntity.SingletonId,
        DefaultClaudeInstructions = dto.DefaultClaudeInstructions ?? "",
        DefaultModel = dto.DefaultModel ?? "sonnet",
        DefaultMaxTurns = dto.DefaultMaxTurns,
        DefaultPermissionMode = dto.DefaultPermissionMode ?? "bypassPermissions",
        WorktreeStrategy = dto.WorktreeStrategy ?? "sibling",
        CentralWorktreeRoot = dto.CentralWorktreeRoot,
        WorktreeAutoCleanupEnabled = dto.WorktreeAutoCleanupEnabled,
        WorktreeAutoCleanupDays = dto.WorktreeAutoCleanupDays,
    };

    await settings.UpdateAsync(replacement);
}
/// <summary>Runs the maintenance service's finished-worktree cleanup.</summary>
/// <returns>The <c>Removed</c> count reported by the maintenance service.</returns>
public async Task<WorktreeCleanupDto> CleanupFinishedWorktrees()
{
    var cleanup = await _wtMaintenance.CleanupFinishedAsync();
    return new WorktreeCleanupDto(Removed: cleanup.Removed);
}
/// <summary>Runs the maintenance service's reset-all operation.</summary>
/// <returns>The counts and blocked flag reported by the maintenance service, copied as-is.</returns>
public async Task<WorktreeResetDto> ResetAllWorktrees()
{
    var reset = await _wtMaintenance.ResetAllAsync();
    return new WorktreeResetDto(
        Removed: reset.Removed,
        TasksAffected: reset.TasksAffected,
        Blocked: reset.Blocked,
        RunningTasks: reset.RunningTasks);
}
/// <summary>Merges a task's work into a target branch through the merge service.</summary>
/// <param name="taskId">Id of the task to merge.</param>
/// <param name="targetBranch">Branch to merge into; null is passed on as an empty string.</param>
/// <param name="removeWorktree">Passed through to the merge service unchanged.</param>
/// <param name="commitMessage">Commit message; a blank value is replaced by "Merge task".</param>
/// <returns>Status, conflict files and error message as reported by the merge service.</returns>
/// <exception cref="HubException">
/// "task not found" for <see cref="KeyNotFoundException"/>; the original message for
/// <see cref="InvalidOperationException"/>.
/// </exception>
public async Task<MergeResultDto> MergeTask(
    string taskId, string targetBranch, bool removeWorktree, string commitMessage)
{
    try
    {
        var branch = targetBranch ?? "";
        var message = string.IsNullOrWhiteSpace(commitMessage) ? "Merge task" : commitMessage;

        var outcome = await _mergeService.MergeAsync(
            taskId, branch, removeWorktree, message, CancellationToken.None);

        return new MergeResultDto(outcome.Status, outcome.ConflictFiles, outcome.ErrorMessage);
    }
    catch (InvalidOperationException failure)
    {
        throw new HubException(failure.Message);
    }
    catch (KeyNotFoundException)
    {
        throw new HubException("task not found");
    }
}
/// <summary>Looks up the branches a task can be merged into.</summary>
/// <param name="taskId">Id of the task.</param>
/// <returns>The default branch and local branches reported by the merge service.</returns>
/// <exception cref="HubException">
/// "task not found" for <see cref="KeyNotFoundException"/>; the original message for
/// <see cref="InvalidOperationException"/>.
/// </exception>
public async Task<MergeTargetsDto> GetMergeTargets(string taskId)
{
    try
    {
        var targets = await _mergeService.GetTargetsAsync(taskId, CancellationToken.None);
        return new MergeTargetsDto(targets.DefaultBranch, targets.LocalBranches);
    }
    catch (Exception ex) when (ex is KeyNotFoundException or InvalidOperationException)
    {
        var reason = ex is KeyNotFoundException ? "task not found" : ex.Message;
        throw new HubException(reason);
    }
}
/// <summary>Updates a list's name, working directory and default commit type, then broadcasts the change.</summary>
/// <param name="dto">
/// New values. A blank working directory is stored as null; a blank commit type is stored as "chore".
/// </param>
/// <exception cref="HubException">"list not found" when no list has the given id.</exception>
public async Task UpdateList(UpdateListDto dto)
{
    using var db = _dbFactory.CreateDbContext();
    var lists = new ListRepository(db);

    var list = await lists.GetByIdAsync(dto.Id)
        ?? throw new HubException("list not found");

    var hasWorkingDir = !string.IsNullOrWhiteSpace(dto.WorkingDir);
    var hasCommitType = !string.IsNullOrWhiteSpace(dto.DefaultCommitType);

    list.Name = dto.Name;
    list.WorkingDir = hasWorkingDir ? dto.WorkingDir : null;
    list.DefaultCommitType = hasCommitType ? dto.DefaultCommitType : "chore";

    await lists.UpdateAsync(list);
    await _broadcaster.ListUpdated(dto.Id);
}
/// <summary>
/// Saves or removes a list's configuration, then broadcasts that the list changed.
/// </summary>
/// <param name="dto">
/// Config values; blank strings count as null. When model, system prompt and agent path are all
/// null the stored config is deleted, otherwise it is saved with the normalized values.
/// </param>
public async Task UpdateListConfig(UpdateListConfigDto dto)
{
    using var db = _dbFactory.CreateDbContext();
    var lists = new ListRepository(db);

    var model = Nullify(dto.Model);
    var systemPrompt = Nullify(dto.SystemPrompt);
    var agentPath = Nullify(dto.AgentPath);

    var hasAnyValue = model is not null || systemPrompt is not null || agentPath is not null;
    if (hasAnyValue)
    {
        var config = new ListConfigEntity
        {
            ListId = dto.ListId,
            Model = model,
            SystemPrompt = systemPrompt,
            AgentPath = agentPath,
        };
        await lists.SetConfigAsync(config);
    }
    else
    {
        await lists.DeleteConfigAsync(dto.ListId);
    }

    await _broadcaster.ListUpdated(dto.ListId);
}
/// <summary>Reads the stored configuration of a list.</summary>
/// <param name="listId">Id of the list.</param>
/// <returns>The config values, or null when the repository has no config for the list.</returns>
public async Task<ListConfigDto?> GetListConfig(string listId)
{
    using var db = _dbFactory.CreateDbContext();
    var lists = new ListRepository(db);
    var stored = await lists.GetConfigAsync(listId);

    return stored is null
        ? null
        : new ListConfigDto(stored.Model, stored.SystemPrompt, stored.AgentPath);
}
/// <summary>Stores per-task agent settings, then broadcasts that the task changed.</summary>
/// <param name="dto">Task id plus model, system prompt and agent path; blank strings are stored as null.</param>
public async Task UpdateTaskAgentSettings(UpdateTaskAgentSettingsDto dto)
{
    using var db = _dbFactory.CreateDbContext();
    var tasks = new TaskRepository(db);

    var model = Nullify(dto.Model);
    var systemPrompt = Nullify(dto.SystemPrompt);
    var agentPath = Nullify(dto.AgentPath);

    await tasks.UpdateAgentSettingsAsync(dto.TaskId, model, systemPrompt, agentPath);
    await _broadcaster.TaskUpdated(dto.TaskId);
}
/// <summary>
/// Starts a planning session for a task, launches its terminal and notifies all clients
/// with a "TaskUpdated" message.
/// </summary>
/// <param name="taskId">Id of the task to plan.</param>
/// <returns>The start context produced by the planning session manager.</returns>
/// <exception cref="PlanningLaunchException">
/// Rethrown when the terminal launch fails; the freshly started session is discarded first.
/// </exception>
public async Task<PlanningSessionStartContext> StartPlanningSessionAsync(string taskId)
{
    var ctx = await _planning.StartAsync(taskId, Context.ConnectionAborted);
    try
    {
        await _launcher.LaunchStartAsync(ctx, Context.ConnectionAborted);
    }
    catch (PlanningLaunchException)
    {
        // Compensating rollback: it must run even if the caller has already disconnected.
        // With Context.ConnectionAborted a cancelled token would skip the discard and
        // replace the launch failure with an OperationCanceledException.
        await _planning.DiscardAsync(taskId, CancellationToken.None);
        throw;
    }
    await Clients.All.SendAsync("TaskUpdated", taskId);
    return ctx;
}
/// <summary>Resumes a task's planning session and launches its terminal.</summary>
/// <param name="taskId">Id of the task whose session is resumed.</param>
/// <returns>The resume context produced by the planning session manager.</returns>
public async Task<PlanningSessionResumeContext> ResumePlanningSessionAsync(string taskId)
{
    var aborted = Context.ConnectionAborted;

    var resumed = await _planning.ResumeAsync(taskId, aborted);
    await _launcher.LaunchResumeAsync(resumed, aborted);

    return resumed;
}
/// <summary>Discards a task's planning session and sends "TaskUpdated" to all clients.</summary>
/// <param name="taskId">Id of the task whose session is discarded.</param>
public async Task DiscardPlanningSessionAsync(string taskId)
{
    var aborted = Context.ConnectionAborted;
    await _planning.DiscardAsync(taskId, aborted);

    var everyone = Clients.All;
    await everyone.SendAsync("TaskUpdated", taskId);
}
/// <summary>Finalizes a task's planning session and sends "TaskUpdated" to all clients.</summary>
/// <param name="taskId">Id of the planning task.</param>
/// <param name="queueAgentTasks">Passed through to the planning session manager; defaults to true.</param>
/// <returns>The count returned by the planning session manager's <c>FinalizeAsync</c>.</returns>
public async Task<int> FinalizePlanningSessionAsync(string taskId, bool queueAgentTasks = true)
{
    var aborted = Context.ConnectionAborted;
    var finalized = await _planning.FinalizeAsync(taskId, queueAgentTasks, aborted);

    await Clients.All.SendAsync("TaskUpdated", taskId);

    return finalized;
}
/// <summary>Asks the planning session manager for a task's pending draft count.</summary>
/// <param name="taskId">Id of the planning task.</param>
/// <returns>The manager's task, returned directly without awaiting.</returns>
public Task<int> GetPendingDraftCountAsync(string taskId)
{
    var aborted = Context.ConnectionAborted;
    return _planning.GetPendingDraftCountAsync(taskId, aborted);
}
/// <summary>Returns the aggregated per-subtask diffs of a planning task.</summary>
/// <param name="planningTaskId">Id of the planning task.</param>
/// <returns>One <see cref="SubtaskDiffDto"/> per diff reported by the aggregator.</returns>
/// <exception cref="HubException">
/// "planning task not found" for <see cref="KeyNotFoundException"/>; the original message for
/// <see cref="InvalidOperationException"/>.
/// </exception>
public async Task<IReadOnlyList<SubtaskDiffDto>> GetPlanningAggregate(string planningTaskId)
{
    try
    {
        var aggregated = await _planningAggregator.GetAggregatedDiffAsync(
            planningTaskId, CancellationToken.None);

        return (from diff in aggregated
                select new SubtaskDiffDto(
                    diff.SubtaskId,
                    diff.Title,
                    diff.BranchName,
                    diff.BaseCommit,
                    diff.HeadCommit,
                    diff.DiffStat,
                    diff.UnifiedDiff)).ToList();
    }
    catch (InvalidOperationException failure)
    {
        throw new HubException(failure.Message);
    }
    catch (KeyNotFoundException)
    {
        throw new HubException("planning task not found");
    }
}
/// <summary>Builds an integration branch for a planning task and reports the outcome.</summary>
/// <param name="planningTaskId">Id of the planning task.</param>
/// <param name="targetBranch">Target branch; null is passed on as an empty string.</param>
/// <returns>
/// On success: flag true with the integration branch and unified diff. On failure: flag false
/// with the first conflicting subtask id and the conflicted files.
/// </returns>
/// <exception cref="HubException">
/// "planning task not found" for <see cref="KeyNotFoundException"/>; the original message for
/// <see cref="InvalidOperationException"/>, including "unknown result type" for an unexpected result.
/// </exception>
public async Task<CombinedDiffResultDto> BuildPlanningIntegrationBranch(string planningTaskId, string targetBranch)
{
    try
    {
        var outcome = await _planningAggregator.BuildIntegrationBranchAsync(
            planningTaskId, targetBranch ?? "", CancellationToken.None);

        if (outcome is CombinedDiffResult.Ok success)
        {
            return new CombinedDiffResultDto(
                true, success.Value.IntegrationBranch, success.Value.UnifiedDiff, null, null);
        }

        if (outcome is CombinedDiffResult.Failed failure)
        {
            return new CombinedDiffResultDto(
                false, null, null, failure.Value.FirstConflictSubtaskId, failure.Value.ConflictedFiles);
        }

        // Thrown inside the try on purpose: the handler below turns it into a HubException.
        throw new InvalidOperationException("unknown result type");
    }
    catch (KeyNotFoundException)
    {
        throw new HubException("planning task not found");
    }
    catch (InvalidOperationException ex)
    {
        throw new HubException(ex.Message);
    }
}
/// <summary>Starts the merge orchestrator for a planning task.</summary>
/// <param name="planningTaskId">Id of the planning task.</param>
/// <param name="targetBranch">Target branch; null is passed on as an empty string.</param>
/// <exception cref="HubException">
/// "planning task not found" for <see cref="KeyNotFoundException"/>; the original message for
/// <see cref="InvalidOperationException"/>.
/// </exception>
public async Task MergeAllPlanning(string planningTaskId, string targetBranch)
{
    try
    {
        var branch = targetBranch ?? "";
        await _planningMergeOrchestrator.StartAsync(planningTaskId, branch, CancellationToken.None);
    }
    catch (Exception ex) when (ex is KeyNotFoundException or InvalidOperationException)
    {
        var reason = ex is KeyNotFoundException ? "planning task not found" : ex.Message;
        throw new HubException(reason);
    }
}
/// <summary>Continues a planning merge through the merge orchestrator.</summary>
/// <param name="planningTaskId">Id of the planning task.</param>
/// <exception cref="HubException">
/// "planning task not found" for <see cref="KeyNotFoundException"/>; the original message for
/// <see cref="InvalidOperationException"/>.
/// </exception>
public async Task ContinuePlanningMerge(string planningTaskId)
{
    try
    {
        await _planningMergeOrchestrator.ContinueAsync(planningTaskId, CancellationToken.None);
    }
    catch (KeyNotFoundException)
    {
        // Same mapping as MergeAllPlanning, so an unknown id reaches the client as a readable
        // HubException instead of a generic invocation error.
        throw new HubException("planning task not found");
    }
    catch (InvalidOperationException ex)
    {
        throw new HubException(ex.Message);
    }
}
/// <summary>Aborts a planning merge through the merge orchestrator.</summary>
/// <param name="planningTaskId">Id of the planning task.</param>
/// <exception cref="HubException">
/// "planning task not found" for <see cref="KeyNotFoundException"/>; the original message for
/// <see cref="InvalidOperationException"/>.
/// </exception>
public async Task AbortPlanningMerge(string planningTaskId)
{
    try
    {
        await _planningMergeOrchestrator.AbortAsync(planningTaskId, CancellationToken.None);
    }
    catch (KeyNotFoundException)
    {
        // Same mapping as MergeAllPlanning, so an unknown id reaches the client as a readable
        // HubException instead of a generic invocation error.
        throw new HubException("planning task not found");
    }
    catch (InvalidOperationException ex)
    {
        throw new HubException(ex.Message);
    }
}
/// <summary>Normalizes optional text: null, empty or whitespace-only input becomes null.</summary>
/// <param name="s">Text to normalize.</param>
/// <returns>Null for blank input; otherwise <paramref name="s"/> unchanged (not trimmed).</returns>
private static string? Nullify(string? s)
{
    if (string.IsNullOrWhiteSpace(s))
    {
        return null;
    }

    return s;
}
}