Files
ClaudeDo/src/ClaudeDo.Ui/Services/WorkerClient.cs
mika kuns b1f4349dab feat(worker): configurable max parallel task executions
Add a "Max parallel executions" setting to the General settings tab so
the queue can run more than one task concurrently. QueueService now
tracks multiple active slots and reads the limit from app settings each
cycle, so changes take effect without restarting the worker.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-01 15:51:12 +02:00

484 lines
18 KiB
C#

using System.Collections.ObjectModel;
using Avalonia.Threading;
using ClaudeDo.Data.Models;
using ClaudeDo.Data.Repositories;
using CommunityToolkit.Mvvm.ComponentModel;
using Microsoft.AspNetCore.SignalR;
using Microsoft.AspNetCore.SignalR.Client;
using Microsoft.Extensions.DependencyInjection;
namespace ClaudeDo.Ui.Services;
public record ActiveTask(string Slot, string TaskId, DateTime StartedAt);
public sealed record WorkerLogEntry(string Message, WorkerLogLevel Level, DateTime TimestampUtc);
sealed class IndefiniteRetryPolicy : IRetryPolicy
{
private static readonly TimeSpan[] _delays =
[
TimeSpan.Zero,
TimeSpan.FromSeconds(2),
TimeSpan.FromSeconds(5),
TimeSpan.FromSeconds(10),
TimeSpan.FromSeconds(30),
];
public TimeSpan? NextRetryDelay(RetryContext retryContext) =>
_delays[Math.Min(retryContext.PreviousRetryCount, _delays.Length - 1)];
}
public partial class WorkerClient : ObservableObject, IAsyncDisposable, IWorkerClient
{
private readonly HubConnection _hub;
private CancellationTokenSource? _startCts;
private Task _retryLoopTask = Task.CompletedTask;
private readonly object _startLock = new();
[ObservableProperty]
private bool _isConnected;
[ObservableProperty]
private bool _isReconnecting;
public ObservableCollection<ActiveTask> ActiveTasks { get; } = new();
public event Action<string, string, DateTime>? TaskStartedEvent;
public event Action<string, string, string, DateTime>? TaskFinishedEvent;
public event Action<string, string>? TaskMessageEvent;
public event Action<string>? TaskUpdatedEvent;
public event Action? ConnectionRestoredEvent;
public event Action<string>? WorktreeUpdatedEvent;
public event Action<string>? ListUpdatedEvent;
public event Action<WorkerLogEntry>? WorkerLogReceivedEvent;
public event Action<string, string>? PlanningMergeStartedEvent;
public event Action<string, string>? PlanningSubtaskMergedEvent;
public event Action<string, string, IReadOnlyList<string>>? PlanningMergeConflictEvent;
public event Action<string>? PlanningMergeAbortedEvent;
public event Action<string>? PlanningCompletedEvent;
public event Action<PrimeFiredEvent>? PrimeFired;
public string? LastMergeAllTarget { get; private set; }
public WorkerClient(string signalRUrl)
{
_hub = new HubConnectionBuilder()
.WithUrl(signalRUrl)
.WithAutomaticReconnect(new IndefiniteRetryPolicy())
.AddJsonProtocol(options =>
{
options.PayloadSerializerOptions.Converters.Add(new System.Text.Json.Serialization.JsonStringEnumConverter());
})
.Build();
_hub.Reconnected += async _ =>
{
Dispatcher.UIThread.Post(() => { IsConnected = true; IsReconnecting = false; });
await SeedActiveTasksAsync();
Dispatcher.UIThread.Post(() => ConnectionRestoredEvent?.Invoke());
};
_hub.Reconnecting += _ =>
{
Dispatcher.UIThread.Post(() => { IsConnected = false; IsReconnecting = true; });
return Task.CompletedTask;
};
_hub.Closed += _ =>
{
Dispatcher.UIThread.Post(() =>
{
IsConnected = false;
IsReconnecting = false;
ActiveTasks.Clear();
});
return Task.CompletedTask;
};
_hub.On<string, string, DateTime>("TaskStarted", (slot, taskId, startedAt) =>
{
Dispatcher.UIThread.Post(() =>
{
ActiveTasks.Add(new ActiveTask(slot, taskId, startedAt));
TaskStartedEvent?.Invoke(slot, taskId, startedAt);
});
});
_hub.On<string, string, string, DateTime>("TaskFinished", (slot, taskId, status, finishedAt) =>
{
Dispatcher.UIThread.Post(() =>
{
var existing = ActiveTasks.FirstOrDefault(t => t.TaskId == taskId);
if (existing is not null)
ActiveTasks.Remove(existing);
TaskFinishedEvent?.Invoke(slot, taskId, status, finishedAt);
});
});
_hub.On<string, string>("TaskMessage", (taskId, line) =>
{
Dispatcher.UIThread.Post(() => TaskMessageEvent?.Invoke(taskId, line));
});
_hub.On<string>("TaskUpdated", taskId =>
{
Dispatcher.UIThread.Post(() => TaskUpdatedEvent?.Invoke(taskId));
});
_hub.On<string>("WorktreeUpdated", taskId =>
{
Dispatcher.UIThread.Post(() => WorktreeUpdatedEvent?.Invoke(taskId));
});
_hub.On<string>("ListUpdated", listId =>
{
Dispatcher.UIThread.Post(() => ListUpdatedEvent?.Invoke(listId));
});
_hub.On<string, WorkerLogLevel, DateTime>("WorkerLog", (message, level, timestampUtc) =>
{
Dispatcher.UIThread.Post(() =>
WorkerLogReceivedEvent?.Invoke(new WorkerLogEntry(message, level, timestampUtc)));
});
_hub.On<string, string>("PlanningMergeStarted", (planningTaskId, targetBranch) =>
{
Dispatcher.UIThread.Post(() => PlanningMergeStartedEvent?.Invoke(planningTaskId, targetBranch));
});
_hub.On<string, string>("PlanningSubtaskMerged", (planningTaskId, subtaskId) =>
{
Dispatcher.UIThread.Post(() => PlanningSubtaskMergedEvent?.Invoke(planningTaskId, subtaskId));
});
_hub.On<string, string, IReadOnlyList<string>>("PlanningMergeConflict", (planningTaskId, subtaskId, conflictedFiles) =>
{
Dispatcher.UIThread.Post(() => PlanningMergeConflictEvent?.Invoke(planningTaskId, subtaskId, conflictedFiles));
});
_hub.On<string>("PlanningMergeAborted", planningTaskId =>
{
Dispatcher.UIThread.Post(() => PlanningMergeAbortedEvent?.Invoke(planningTaskId));
});
_hub.On<string>("PlanningCompleted", planningTaskId =>
{
Dispatcher.UIThread.Post(() => PlanningCompletedEvent?.Invoke(planningTaskId));
});
_hub.On<Guid, bool, string, DateTimeOffset>("PrimeFired", (id, ok, msg, when) =>
{
Dispatcher.UIThread.Post(() => PrimeFired?.Invoke(new PrimeFiredEvent(id, ok, msg, when)));
});
}
public Task StartAsync()
{
lock (_startLock)
{
if (!_retryLoopTask.IsCompleted)
return Task.CompletedTask;
var old = _startCts;
_startCts = new CancellationTokenSource();
old?.Cancel();
old?.Dispose();
_retryLoopTask = ConnectWithRetryAsync(_startCts.Token);
}
return Task.CompletedTask;
}
private async Task ConnectWithRetryAsync(CancellationToken ct)
{
var delays = new[] { 0, 2, 5, 10, 30 };
int attempt = 0;
Dispatcher.UIThread.Post(() => IsReconnecting = true);
while (!ct.IsCancellationRequested)
{
try
{
await _hub.StartAsync(ct);
Dispatcher.UIThread.Post(() => { IsConnected = true; IsReconnecting = false; });
await SeedActiveTasksAsync();
Dispatcher.UIThread.Post(() => ConnectionRestoredEvent?.Invoke());
return;
}
catch (OperationCanceledException)
{
return;
}
catch
{
var delay = delays[Math.Min(attempt++, delays.Length - 1)];
try { await Task.Delay(TimeSpan.FromSeconds(delay), ct); }
catch (OperationCanceledException) { return; }
}
}
}
public async Task StopAsync()
{
_startCts?.Cancel();
try { await _retryLoopTask; } catch (OperationCanceledException) { } catch { /* swallow */ }
try { await _hub.StopAsync(); } catch { /* swallow */ }
}
/// <summary>Invoke a hub method, returning default (null) when the worker is offline or errors.</summary>
private async Task<T?> TryInvokeAsync<T>(string method, params object?[] args)
{
try { return await _hub.InvokeCoreAsync<T>(method, args); }
catch { return default; }
}
public async Task RunNowAsync(string taskId)
{
await _hub.InvokeAsync("RunNow", taskId);
}
public async Task ContinueTaskAsync(string taskId, string followUpPrompt)
{
await _hub.InvokeAsync("ContinueTask", taskId, followUpPrompt);
}
public async Task ResetTaskAsync(string taskId)
{
await _hub.InvokeAsync("ResetTask", taskId);
}
public async Task<MergeResultDto> MergeTaskAsync(string taskId, string targetBranch, bool removeWorktree, string commitMessage)
{
return await _hub.InvokeAsync<MergeResultDto>(
"MergeTask", taskId, targetBranch, removeWorktree, commitMessage);
}
public Task<MergeTargetsDto?> GetMergeTargetsAsync(string taskId)
=> TryInvokeAsync<MergeTargetsDto>("GetMergeTargets", taskId);
public async Task CancelTaskAsync(string taskId)
{
await _hub.InvokeAsync("CancelTask", taskId);
}
public async Task WakeQueueAsync()
{
await _hub.InvokeAsync("WakeQueue");
}
public async Task<List<AgentInfo>> GetAgentsAsync()
=> await TryInvokeAsync<List<AgentInfo>>("GetAgents") ?? [];
public async Task RefreshAgentsAsync()
{
await _hub.InvokeAsync("RefreshAgents");
}
public Task<SeedResultDto?> RestoreDefaultAgentsAsync()
=> TryInvokeAsync<SeedResultDto>("RestoreDefaultAgents");
private async Task SeedActiveTasksAsync()
{
try
{
var active = await _hub.InvokeAsync<List<ActiveTaskDto>>("GetActive");
Dispatcher.UIThread.Post(() =>
{
ActiveTasks.Clear();
foreach (var a in active)
ActiveTasks.Add(new ActiveTask(a.Slot, a.TaskId, a.StartedAt));
});
}
catch (HubException)
{
// Expected: worker doesn't support GetActive yet
}
catch (Exception ex)
{
System.Diagnostics.Debug.WriteLine($"SeedActiveTasksAsync failed: {ex}");
}
}
public async ValueTask DisposeAsync()
{
_startCts?.Cancel();
try { await _retryLoopTask; } catch (OperationCanceledException) { } catch { /* swallow */ }
await _hub.DisposeAsync();
}
public Task<AppSettingsDto?> GetAppSettingsAsync()
=> TryInvokeAsync<AppSettingsDto>("GetAppSettings");
public async Task UpdateAppSettingsAsync(AppSettingsDto dto)
{
await _hub.InvokeAsync("UpdateAppSettings", dto);
}
public async Task<List<PrimeScheduleDto>> GetPrimeSchedulesAsync()
=> await TryInvokeAsync<List<PrimeScheduleDto>>("ListPrimeSchedules") ?? new List<PrimeScheduleDto>();
public Task<PrimeScheduleDto?> UpsertPrimeScheduleAsync(PrimeScheduleDto dto)
=> TryInvokeAsync<PrimeScheduleDto>("UpsertPrimeSchedule", dto);
public async Task DeletePrimeScheduleAsync(Guid id)
{
try { await _hub.InvokeAsync("DeletePrimeSchedule", id); }
catch { /* offline */ }
}
public async Task UpdateListAsync(UpdateListDto dto)
{
await _hub.InvokeAsync("UpdateList", dto);
}
public async Task UpdateListConfigAsync(UpdateListConfigDto dto)
{
await _hub.InvokeAsync("UpdateListConfig", dto);
}
public Task<ListConfigDto?> GetListConfigAsync(string listId)
=> TryInvokeAsync<ListConfigDto>("GetListConfig", listId);
public async Task UpdateTaskAgentSettingsAsync(UpdateTaskAgentSettingsDto dto)
{
await _hub.InvokeAsync("UpdateTaskAgentSettings", dto);
}
public async Task SetTaskStatusAsync(string taskId, ClaudeDo.Data.Models.TaskStatus status)
{
await _hub.InvokeAsync("SetTaskStatus", taskId, status.ToString());
}
public Task<WorktreeCleanupDto?> CleanupFinishedWorktreesAsync(string? listId = null)
=> TryInvokeAsync<WorktreeCleanupDto>("CleanupFinishedWorktrees", listId);
public Task<WorktreeResetDto?> ResetAllWorktreesAsync()
=> TryInvokeAsync<WorktreeResetDto>("ResetAllWorktrees");
public async Task<List<WorktreeOverviewDto>> GetWorktreesOverviewAsync(string? listId)
=> await TryInvokeAsync<List<WorktreeOverviewDto>>("GetWorktreesOverview", listId)
?? new List<WorktreeOverviewDto>();
public async Task<(bool Ok, string? Error)> SetWorktreeStateAsync(string taskId, WorktreeState newState)
{
try
{
var ok = await _hub.InvokeAsync<bool>("SetWorktreeState", taskId, newState);
return (ok, null);
}
catch (HubException ex)
{
return (false, ex.Message);
}
catch (Exception)
{
return (false, "Worker offline.");
}
}
public Task<ForceRemoveResultDto?> ForceRemoveWorktreeAsync(string taskId)
=> TryInvokeAsync<ForceRemoveResultDto>("ForceRemoveWorktree", taskId);
public async Task<PlanningSessionStartInfo> StartPlanningSessionAsync(string taskId, CancellationToken ct = default)
=> await _hub.InvokeAsync<PlanningSessionStartInfo>("StartPlanningSessionAsync", taskId, ct);
public async Task<PlanningSessionResumeInfo> ResumePlanningSessionAsync(string taskId, CancellationToken ct = default)
=> await _hub.InvokeAsync<PlanningSessionResumeInfo>("ResumePlanningSessionAsync", taskId, ct);
public async Task OpenInteractiveTerminalAsync(string taskId, CancellationToken ct = default)
=> await _hub.InvokeAsync("OpenInteractiveTerminalAsync", taskId, ct);
public async Task<DiscardPlanningOutcome> DiscardPlanningSessionAsync(string taskId, bool dequeueQueuedChildren = false, CancellationToken ct = default)
=> await _hub.InvokeAsync<DiscardPlanningOutcome>("DiscardPlanningSessionAsync", taskId, dequeueQueuedChildren, ct);
public async Task<int> FinalizePlanningSessionAsync(string taskId, bool queueAgentTasks = true, CancellationToken ct = default)
=> await _hub.InvokeAsync<int>("FinalizePlanningSessionAsync", taskId, queueAgentTasks, ct);
public async Task<int> GetPendingDraftCountAsync(string taskId, CancellationToken ct = default)
=> await _hub.InvokeAsync<int>("GetPendingDraftCountAsync", taskId, ct);
public async Task<IReadOnlyList<SubtaskDiffDto>> GetPlanningAggregateAsync(string planningTaskId)
=> await TryInvokeAsync<List<SubtaskDiffDto>>("GetPlanningAggregate", planningTaskId) ?? [];
public Task<CombinedDiffResultDto?> BuildPlanningIntegrationBranchAsync(string planningTaskId, string targetBranch)
=> TryInvokeAsync<CombinedDiffResultDto>("BuildPlanningIntegrationBranch", planningTaskId, targetBranch);
public async Task MergeAllPlanningAsync(string planningTaskId, string targetBranch)
{
LastMergeAllTarget = targetBranch;
await _hub.InvokeAsync("MergeAllPlanning", planningTaskId, targetBranch);
}
public async Task ContinuePlanningMergeAsync(string planningTaskId)
{
await _hub.InvokeAsync("ContinuePlanningMerge", planningTaskId);
}
public async Task AbortPlanningMergeAsync(string planningTaskId)
{
await _hub.InvokeAsync("AbortPlanningMerge", planningTaskId);
}
public async Task QueuePlanningSubtasksAsync(string parentTaskId, CancellationToken ct = default)
{
await _hub.InvokeAsync("QueuePlanningSubtasksAsync", parentTaskId, ct);
}
// IWorkerClient explicit implementations (drop typed return values)
async Task IWorkerClient.StartPlanningSessionAsync(string taskId, CancellationToken ct)
=> await StartPlanningSessionAsync(taskId, ct);
async Task IWorkerClient.ResumePlanningSessionAsync(string taskId, CancellationToken ct)
=> await ResumePlanningSessionAsync(taskId, ct);
async Task<DiscardPlanningOutcome> IWorkerClient.DiscardPlanningSessionAsync(string taskId, bool dequeueQueuedChildren, CancellationToken ct)
=> await DiscardPlanningSessionAsync(taskId, dequeueQueuedChildren, ct);
async Task IWorkerClient.FinalizePlanningSessionAsync(string taskId, bool queueAgentTasks, CancellationToken ct)
=> await FinalizePlanningSessionAsync(taskId, queueAgentTasks, ct);
async Task<int> IWorkerClient.GetPendingDraftCountAsync(string taskId, CancellationToken ct)
=> await GetPendingDraftCountAsync(taskId, ct);
// DTOs for deserializing hub responses
private sealed class ActiveTaskDto
{
public string Slot { get; set; } = "";
public string TaskId { get; set; } = "";
public DateTime StartedAt { get; set; }
}
}
public sealed record AppSettingsDto(
string DefaultClaudeInstructions,
string DefaultModel,
int DefaultMaxTurns,
string DefaultPermissionMode,
int MaxParallelExecutions,
string WorktreeStrategy,
string? CentralWorktreeRoot,
bool WorktreeAutoCleanupEnabled,
int WorktreeAutoCleanupDays);
public sealed record WorktreeCleanupDto(int Removed);
public sealed record WorktreeResetDto(int Removed, int TasksAffected, bool Blocked, int RunningTasks);
public record MergeResultDto(string Status, IReadOnlyList<string> ConflictFiles, string? ErrorMessage);
public record MergeTargetsDto(string DefaultBranch, IReadOnlyList<string> LocalBranches);
public sealed record UpdateListDto(string Id, string Name, string? WorkingDir, string DefaultCommitType);
public sealed record UpdateListConfigDto(string ListId, string? Model, string? SystemPrompt, string? AgentPath);
public sealed record UpdateTaskAgentSettingsDto(string TaskId, string? Model, string? SystemPrompt, string? AgentPath);
public sealed record ListConfigDto(string? Model, string? SystemPrompt, string? AgentPath);
public sealed record SeedResultDto(int Copied, int Skipped);
public sealed record WorktreeOverviewDto(
string TaskId,
string TaskTitle,
ClaudeDo.Data.Models.TaskStatus TaskStatus,
string ListId,
string ListName,
string Path,
string BranchName,
string BaseCommit,
WorktreeState State,
string? DiffStat,
DateTime CreatedAt,
bool PathExistsOnDisk);
public sealed record ForceRemoveResultDto(bool Removed, string? Reason);