using System.Collections.ObjectModel; using Avalonia.Threading; using ClaudeDo.Data.Models; using ClaudeDo.Data.Repositories; using CommunityToolkit.Mvvm.ComponentModel; using Microsoft.AspNetCore.SignalR; using Microsoft.AspNetCore.SignalR.Client; using Microsoft.Extensions.DependencyInjection; namespace ClaudeDo.Ui.Services; public record ActiveTask(string Slot, string TaskId, DateTime StartedAt); public sealed record WorkerLogEntry(string Message, WorkerLogLevel Level, DateTime TimestampUtc); sealed class IndefiniteRetryPolicy : IRetryPolicy { private static readonly TimeSpan[] _delays = [ TimeSpan.Zero, TimeSpan.FromSeconds(2), TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(10), TimeSpan.FromSeconds(30), ]; public TimeSpan? NextRetryDelay(RetryContext retryContext) => _delays[Math.Min(retryContext.PreviousRetryCount, _delays.Length - 1)]; } public partial class WorkerClient : ObservableObject, IAsyncDisposable, IWorkerClient { private readonly HubConnection _hub; private CancellationTokenSource? _startCts; private Task _retryLoopTask = Task.CompletedTask; private readonly object _startLock = new(); [ObservableProperty] private bool _isConnected; [ObservableProperty] private bool _isReconnecting; public ObservableCollection ActiveTasks { get; } = new(); public event Action? TaskStartedEvent; public event Action? TaskFinishedEvent; public event Action? TaskMessageEvent; public event Action? TaskUpdatedEvent; public event Action? ConnectionRestoredEvent; public event Action? WorktreeUpdatedEvent; public event Action? ListUpdatedEvent; public event Action? WorkerLogReceivedEvent; public event Action? PrepStartedEvent; public event Action? PrepLineEvent; public event Action? PrepFinishedEvent; public event Action? PlanningMergeStartedEvent; public event Action? PlanningSubtaskMergedEvent; public event Action>? PlanningMergeConflictEvent; public event Action? PlanningMergeAbortedEvent; public event Action? PlanningCompletedEvent; public event Action? PrimeFired; public string? LastMergeAllTarget { get; private set; } public WorkerClient(string signalRUrl) { _hub = new HubConnectionBuilder() .WithUrl(signalRUrl) .WithAutomaticReconnect(new IndefiniteRetryPolicy()) .AddJsonProtocol(options => { options.PayloadSerializerOptions.Converters.Add(new System.Text.Json.Serialization.JsonStringEnumConverter()); }) .Build(); _hub.Reconnected += async _ => { Dispatcher.UIThread.Post(() => { IsConnected = true; IsReconnecting = false; }); await SeedActiveTasksAsync(); Dispatcher.UIThread.Post(() => ConnectionRestoredEvent?.Invoke()); }; _hub.Reconnecting += _ => { Dispatcher.UIThread.Post(() => { IsConnected = false; IsReconnecting = true; }); return Task.CompletedTask; }; _hub.Closed += _ => { Dispatcher.UIThread.Post(() => { IsConnected = false; IsReconnecting = false; ActiveTasks.Clear(); }); return Task.CompletedTask; }; _hub.On("TaskStarted", (slot, taskId, startedAt) => { Dispatcher.UIThread.Post(() => { ActiveTasks.Add(new ActiveTask(slot, taskId, startedAt)); TaskStartedEvent?.Invoke(slot, taskId, startedAt); }); }); _hub.On("TaskFinished", (slot, taskId, status, finishedAt) => { Dispatcher.UIThread.Post(() => { var existing = ActiveTasks.FirstOrDefault(t => t.TaskId == taskId); if (existing is not null) ActiveTasks.Remove(existing); TaskFinishedEvent?.Invoke(slot, taskId, status, finishedAt); }); }); _hub.On("TaskMessage", (taskId, line) => { Dispatcher.UIThread.Post(() => TaskMessageEvent?.Invoke(taskId, line)); }); _hub.On("TaskUpdated", taskId => { Dispatcher.UIThread.Post(() => TaskUpdatedEvent?.Invoke(taskId)); }); _hub.On("WorktreeUpdated", taskId => { Dispatcher.UIThread.Post(() => WorktreeUpdatedEvent?.Invoke(taskId)); }); _hub.On("ListUpdated", listId => { Dispatcher.UIThread.Post(() => ListUpdatedEvent?.Invoke(listId)); }); _hub.On("WorkerLog", (message, level, timestampUtc) => { Dispatcher.UIThread.Post(() => WorkerLogReceivedEvent?.Invoke(new WorkerLogEntry(message, level, timestampUtc))); }); _hub.On("PlanningMergeStarted", (planningTaskId, targetBranch) => { Dispatcher.UIThread.Post(() => PlanningMergeStartedEvent?.Invoke(planningTaskId, targetBranch)); }); _hub.On("PlanningSubtaskMerged", (planningTaskId, subtaskId) => { Dispatcher.UIThread.Post(() => PlanningSubtaskMergedEvent?.Invoke(planningTaskId, subtaskId)); }); _hub.On>("PlanningMergeConflict", (planningTaskId, subtaskId, conflictedFiles) => { Dispatcher.UIThread.Post(() => PlanningMergeConflictEvent?.Invoke(planningTaskId, subtaskId, conflictedFiles)); }); _hub.On("PlanningMergeAborted", planningTaskId => { Dispatcher.UIThread.Post(() => PlanningMergeAbortedEvent?.Invoke(planningTaskId)); }); _hub.On("PlanningCompleted", planningTaskId => { Dispatcher.UIThread.Post(() => PlanningCompletedEvent?.Invoke(planningTaskId)); }); _hub.On("PrimeFired", (id, ok, msg, when) => { Dispatcher.UIThread.Post(() => PrimeFired?.Invoke(new PrimeFiredEvent(id, ok, msg, when))); }); _hub.On("PrepStarted", () => Dispatcher.UIThread.Post(() => PrepStartedEvent?.Invoke())); _hub.On("PrepLine", line => Dispatcher.UIThread.Post(() => PrepLineEvent?.Invoke(line))); _hub.On("PrepFinished", ok => Dispatcher.UIThread.Post(() => PrepFinishedEvent?.Invoke(ok))); } public Task StartAsync() { lock (_startLock) { if (!_retryLoopTask.IsCompleted) return Task.CompletedTask; var old = _startCts; _startCts = new CancellationTokenSource(); old?.Cancel(); old?.Dispose(); _retryLoopTask = ConnectWithRetryAsync(_startCts.Token); } return Task.CompletedTask; } private async Task ConnectWithRetryAsync(CancellationToken ct) { var delays = new[] { 0, 2, 5, 10, 30 }; int attempt = 0; Dispatcher.UIThread.Post(() => IsReconnecting = true); while (!ct.IsCancellationRequested) { try { await _hub.StartAsync(ct); Dispatcher.UIThread.Post(() => { IsConnected = true; IsReconnecting = false; }); await SeedActiveTasksAsync(); Dispatcher.UIThread.Post(() => ConnectionRestoredEvent?.Invoke()); return; } catch (OperationCanceledException) { return; } catch { var delay = delays[Math.Min(attempt++, delays.Length - 1)]; try { await Task.Delay(TimeSpan.FromSeconds(delay), ct); } catch (OperationCanceledException) { return; } } } } public async Task StopAsync() { _startCts?.Cancel(); try { await _retryLoopTask; } catch (OperationCanceledException) { } catch { /* swallow */ } try { await _hub.StopAsync(); } catch { /* swallow */ } } /// Invoke a hub method, returning default (null) when the worker is offline or errors. private async Task TryInvokeAsync(string method, params object?[] args) { try { return await _hub.InvokeCoreAsync(method, args); } catch { return default; } } public async Task RunNowAsync(string taskId) { await _hub.InvokeAsync("RunNow", taskId); } public async Task ContinueTaskAsync(string taskId, string followUpPrompt) { await _hub.InvokeAsync("ContinueTask", taskId, followUpPrompt); } public async Task ResetTaskAsync(string taskId) { await _hub.InvokeAsync("ResetTask", taskId); } public async Task MergeTaskAsync(string taskId, string targetBranch, bool removeWorktree, string commitMessage) { return await _hub.InvokeAsync( "MergeTask", taskId, targetBranch, removeWorktree, commitMessage); } public Task GetMergeTargetsAsync(string taskId) => TryInvokeAsync("GetMergeTargets", taskId); public async Task CancelTaskAsync(string taskId) { await _hub.InvokeAsync("CancelTask", taskId); } public async Task WakeQueueAsync() { await _hub.InvokeAsync("WakeQueue"); } public async Task> GetAgentsAsync() => await TryInvokeAsync>("GetAgents") ?? []; public async Task RefreshAgentsAsync() { await _hub.InvokeAsync("RefreshAgents"); } public Task RestoreDefaultAgentsAsync() => TryInvokeAsync("RestoreDefaultAgents"); private async Task SeedActiveTasksAsync() { try { var active = await _hub.InvokeAsync>("GetActive"); Dispatcher.UIThread.Post(() => { ActiveTasks.Clear(); foreach (var a in active) ActiveTasks.Add(new ActiveTask(a.Slot, a.TaskId, a.StartedAt)); }); } catch (HubException) { // Expected: worker doesn't support GetActive yet } catch (Exception ex) { System.Diagnostics.Debug.WriteLine($"SeedActiveTasksAsync failed: {ex}"); } } public async ValueTask DisposeAsync() { _startCts?.Cancel(); try { await _retryLoopTask; } catch (OperationCanceledException) { } catch { /* swallow */ } await _hub.DisposeAsync(); } public Task GetAppSettingsAsync() => TryInvokeAsync("GetAppSettings"); public async Task UpdateAppSettingsAsync(AppSettingsDto dto) { await _hub.InvokeAsync("UpdateAppSettings", dto); } public async Task> GetPrimeSchedulesAsync() => await TryInvokeAsync>("ListPrimeSchedules") ?? new List(); public Task UpsertPrimeScheduleAsync(PrimeScheduleDto dto) => TryInvokeAsync("UpsertPrimeSchedule", dto); public async Task DeletePrimeScheduleAsync(Guid id) { try { await _hub.InvokeAsync("DeletePrimeSchedule", id); } catch { /* offline */ } } private static string IsoDay(DateOnly d) => d.ToString("yyyy-MM-dd"); public Task GetWeekReportAsync(DateOnly start, DateOnly end) => TryInvokeAsync("GetWeekReport", IsoDay(start), IsoDay(end)); public Task GenerateWeekReportAsync(DateOnly start, DateOnly end) => _hub.InvokeAsync("GenerateWeekReport", IsoDay(start), IsoDay(end)); public Task RunDailyPrepNowAsync() => _hub.InvokeAsync("RunDailyPrepNow"); public Task ClearMyDayAsync() => _hub.InvokeAsync("ClearMyDay"); public async Task> GetDailyNotesAsync(DateOnly day) => await TryInvokeAsync>("GetDailyNotes", IsoDay(day)) ?? new List(); public Task AddDailyNoteAsync(DateOnly day, string text) => TryInvokeAsync("AddDailyNote", IsoDay(day), text); public async Task UpdateDailyNoteAsync(string id, string text) => await _hub.InvokeAsync("UpdateDailyNote", id, text); public async Task DeleteDailyNoteAsync(string id) => await _hub.InvokeAsync("DeleteDailyNote", id); public async Task GetLastPrepLogAsync() => await TryInvokeAsync("GetLastPrepLog") ?? string.Empty; public async Task UpdateListAsync(UpdateListDto dto) { await _hub.InvokeAsync("UpdateList", dto); } public async Task UpdateListConfigAsync(UpdateListConfigDto dto) { await _hub.InvokeAsync("UpdateListConfig", dto); } public Task GetListConfigAsync(string listId) => TryInvokeAsync("GetListConfig", listId); public async Task UpdateTaskAgentSettingsAsync(UpdateTaskAgentSettingsDto dto) { await _hub.InvokeAsync("UpdateTaskAgentSettings", dto); } public async Task SetTaskStatusAsync(string taskId, ClaudeDo.Data.Models.TaskStatus status) { await _hub.InvokeAsync("SetTaskStatus", taskId, status.ToString()); } public async Task ApproveReviewAsync(string taskId) { await _hub.InvokeAsync("ApproveReview", taskId); } public async Task RejectReviewToQueueAsync(string taskId, string feedback) { await _hub.InvokeAsync("RejectReviewToQueue", taskId, feedback); } public async Task RejectReviewToIdleAsync(string taskId) { await _hub.InvokeAsync("RejectReviewToIdle", taskId); } public async Task CancelReviewAsync(string taskId) { await _hub.InvokeAsync("CancelReview", taskId); } public Task CleanupFinishedWorktreesAsync(string? listId = null) => TryInvokeAsync("CleanupFinishedWorktrees", listId); public Task ResetAllWorktreesAsync() => TryInvokeAsync("ResetAllWorktrees"); public async Task> GetWorktreesOverviewAsync(string? listId) => await TryInvokeAsync>("GetWorktreesOverview", listId) ?? new List(); public async Task<(bool Ok, string? Error)> SetWorktreeStateAsync(string taskId, WorktreeState newState) { try { var ok = await _hub.InvokeAsync("SetWorktreeState", taskId, newState); return (ok, null); } catch (HubException ex) { return (false, ex.Message); } catch (Exception) { return (false, "Worker offline."); } } public Task ForceRemoveWorktreeAsync(string taskId) => TryInvokeAsync("ForceRemoveWorktree", taskId); public async Task StartPlanningSessionAsync(string taskId, CancellationToken ct = default) => await _hub.InvokeAsync("StartPlanningSessionAsync", taskId, ct); public async Task ResumePlanningSessionAsync(string taskId, CancellationToken ct = default) => await _hub.InvokeAsync("ResumePlanningSessionAsync", taskId, ct); public async Task OpenInteractiveTerminalAsync(string taskId, CancellationToken ct = default) => await _hub.InvokeAsync("OpenInteractiveTerminalAsync", taskId, ct); public async Task DiscardPlanningSessionAsync(string taskId, bool dequeueQueuedChildren = false, CancellationToken ct = default) => await _hub.InvokeAsync("DiscardPlanningSessionAsync", taskId, dequeueQueuedChildren, ct); public async Task FinalizePlanningSessionAsync(string taskId, bool queueAgentTasks = true, CancellationToken ct = default) => await _hub.InvokeAsync("FinalizePlanningSessionAsync", taskId, queueAgentTasks, ct); public async Task GetPendingDraftCountAsync(string taskId, CancellationToken ct = default) => await _hub.InvokeAsync("GetPendingDraftCountAsync", taskId, ct); public async Task> GetPlanningAggregateAsync(string planningTaskId) => await TryInvokeAsync>("GetPlanningAggregate", planningTaskId) ?? []; public Task BuildPlanningIntegrationBranchAsync(string planningTaskId, string targetBranch) => TryInvokeAsync("BuildPlanningIntegrationBranch", planningTaskId, targetBranch); public async Task MergeAllPlanningAsync(string planningTaskId, string targetBranch) { LastMergeAllTarget = targetBranch; await _hub.InvokeAsync("MergeAllPlanning", planningTaskId, targetBranch); } public async Task ContinuePlanningMergeAsync(string planningTaskId) { await _hub.InvokeAsync("ContinuePlanningMerge", planningTaskId); } public async Task AbortPlanningMergeAsync(string planningTaskId) { await _hub.InvokeAsync("AbortPlanningMerge", planningTaskId); } public async Task QueuePlanningSubtasksAsync(string parentTaskId, CancellationToken ct = default) { await _hub.InvokeAsync("QueuePlanningSubtasksAsync", parentTaskId, ct); } // IWorkerClient explicit implementations (drop typed return values) async Task IWorkerClient.StartPlanningSessionAsync(string taskId, CancellationToken ct) => await StartPlanningSessionAsync(taskId, ct); async Task IWorkerClient.ResumePlanningSessionAsync(string taskId, CancellationToken ct) => await ResumePlanningSessionAsync(taskId, ct); async Task IWorkerClient.DiscardPlanningSessionAsync(string taskId, bool dequeueQueuedChildren, CancellationToken ct) => await DiscardPlanningSessionAsync(taskId, dequeueQueuedChildren, ct); async Task IWorkerClient.FinalizePlanningSessionAsync(string taskId, bool queueAgentTasks, CancellationToken ct) => await FinalizePlanningSessionAsync(taskId, queueAgentTasks, ct); async Task IWorkerClient.GetPendingDraftCountAsync(string taskId, CancellationToken ct) => await GetPendingDraftCountAsync(taskId, ct); // DTOs for deserializing hub responses private sealed class ActiveTaskDto { public string Slot { get; set; } = ""; public string TaskId { get; set; } = ""; public DateTime StartedAt { get; set; } } } public sealed record AppSettingsDto( string DefaultClaudeInstructions, string DefaultModel, int DefaultMaxTurns, string DefaultPermissionMode, int MaxParallelExecutions, string WorktreeStrategy, string? CentralWorktreeRoot, bool WorktreeAutoCleanupEnabled, int WorktreeAutoCleanupDays, string? ReportExcludedPaths, int StandupWeekday, int DailyPrepMaxTasks); public sealed record WorktreeCleanupDto(int Removed); public sealed record WorktreeResetDto(int Removed, int TasksAffected, bool Blocked, int RunningTasks); public record MergeResultDto(string Status, IReadOnlyList ConflictFiles, string? ErrorMessage); public record MergeTargetsDto(string DefaultBranch, IReadOnlyList LocalBranches); public sealed record UpdateListDto(string Id, string Name, string? WorkingDir, string DefaultCommitType); public sealed record UpdateListConfigDto(string ListId, string? Model, string? SystemPrompt, string? AgentPath, int? MaxTurns = null); public sealed record UpdateTaskAgentSettingsDto(string TaskId, string? Model, string? SystemPrompt, string? AgentPath, int? MaxTurns = null); public sealed record ListConfigDto(string? Model, string? SystemPrompt, string? AgentPath, int? MaxTurns = null); public sealed record SeedResultDto(int Copied, int Skipped); public sealed record WorktreeOverviewDto( string TaskId, string TaskTitle, ClaudeDo.Data.Models.TaskStatus TaskStatus, string ListId, string ListName, string Path, string BranchName, string BaseCommit, WorktreeState State, string? DiffStat, DateTime CreatedAt, bool PathExistsOnDisk); public sealed record ForceRemoveResultDto(bool Removed, string? Reason);