fix: resolve critical bugs and improve reliability across worker, data, UI
- Fix worker using wrong DB by defaulting to CurrentUser service account and expanding ~ to absolute paths at install time - Fix DbContext disposed before fire-and-forget by passing taskId instead of TaskEntity into RunInSlotAsync, which creates its own context - Fix ActiveTaskDto property casing mismatch between hub and client - Move WAL mode PRAGMA before migrations to prevent concurrent lock issues - Replace FirstAsync with FirstOrDefaultAsync + null guards in tag operations - Add delete confirmation flow for lists - Log fire-and-forget exceptions instead of swallowing them - Broadcast RunCreated event from WorkerHub.RunNow - Add IDisposable to MainWindowViewModel for event handler cleanup - Preserve subtask CreatedAt on updates instead of overwriting - Replace bare catch blocks with Debug.WriteLine logging Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -30,10 +30,21 @@ public class ClaudeDoDbContext : DbContext
|
|||||||
/// </summary>
|
/// </summary>
|
||||||
public static void MigrateAndConfigure(ClaudeDoDbContext db)
|
public static void MigrateAndConfigure(ClaudeDoDbContext db)
|
||||||
{
|
{
|
||||||
|
var conn = db.Database.GetDbConnection();
|
||||||
|
try
|
||||||
|
{
|
||||||
|
conn.Open();
|
||||||
|
|
||||||
|
// Set WAL FIRST, before migrations — prevents write-lock contention
|
||||||
|
// when UI and Worker start simultaneously.
|
||||||
|
using (var walCmd = conn.CreateCommand())
|
||||||
|
{
|
||||||
|
walCmd.CommandText = "PRAGMA journal_mode=wal;";
|
||||||
|
walCmd.ExecuteNonQuery();
|
||||||
|
}
|
||||||
|
|
||||||
// If the 'lists' table exists but __EFMigrationsHistory does not,
|
// If the 'lists' table exists but __EFMigrationsHistory does not,
|
||||||
// this is a pre-EF database. Baseline the InitialCreate migration.
|
// this is a pre-EF database. Baseline the InitialCreate migration.
|
||||||
var conn = db.Database.GetDbConnection();
|
|
||||||
conn.Open();
|
|
||||||
using (var cmd = conn.CreateCommand())
|
using (var cmd = conn.CreateCommand())
|
||||||
{
|
{
|
||||||
cmd.CommandText = "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='lists'";
|
cmd.CommandText = "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='lists'";
|
||||||
@@ -44,7 +55,6 @@ public class ClaudeDoDbContext : DbContext
|
|||||||
|
|
||||||
if (hasLists && !hasHistory)
|
if (hasLists && !hasHistory)
|
||||||
{
|
{
|
||||||
// Create the history table and mark InitialCreate as applied.
|
|
||||||
cmd.CommandText = """
|
cmd.CommandText = """
|
||||||
CREATE TABLE "__EFMigrationsHistory" (
|
CREATE TABLE "__EFMigrationsHistory" (
|
||||||
"MigrationId" TEXT NOT NULL CONSTRAINT "PK___EFMigrationsHistory" PRIMARY KEY,
|
"MigrationId" TEXT NOT NULL CONSTRAINT "PK___EFMigrationsHistory" PRIMARY KEY,
|
||||||
@@ -56,9 +66,12 @@ public class ClaudeDoDbContext : DbContext
|
|||||||
cmd.ExecuteNonQuery();
|
cmd.ExecuteNonQuery();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
conn.Close();
|
conn.Close();
|
||||||
|
}
|
||||||
|
|
||||||
db.Database.Migrate();
|
db.Database.Migrate();
|
||||||
db.Database.ExecuteSqlRaw("PRAGMA journal_mode=WAL");
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -46,7 +46,8 @@ public sealed class ListRepository
|
|||||||
|
|
||||||
public async Task AddTagAsync(string listId, long tagId, CancellationToken ct = default)
|
public async Task AddTagAsync(string listId, long tagId, CancellationToken ct = default)
|
||||||
{
|
{
|
||||||
var list = await _context.Lists.Include(l => l.Tags).FirstAsync(l => l.Id == listId, ct);
|
var list = await _context.Lists.Include(l => l.Tags).FirstOrDefaultAsync(l => l.Id == listId, ct);
|
||||||
|
if (list is null) return;
|
||||||
var tag = await _context.Tags.FindAsync([tagId], ct);
|
var tag = await _context.Tags.FindAsync([tagId], ct);
|
||||||
if (tag is not null && !list.Tags.Any(t => t.Id == tagId))
|
if (tag is not null && !list.Tags.Any(t => t.Id == tagId))
|
||||||
{
|
{
|
||||||
@@ -57,7 +58,8 @@ public sealed class ListRepository
|
|||||||
|
|
||||||
public async Task RemoveTagAsync(string listId, long tagId, CancellationToken ct = default)
|
public async Task RemoveTagAsync(string listId, long tagId, CancellationToken ct = default)
|
||||||
{
|
{
|
||||||
var list = await _context.Lists.Include(l => l.Tags).FirstAsync(l => l.Id == listId, ct);
|
var list = await _context.Lists.Include(l => l.Tags).FirstOrDefaultAsync(l => l.Id == listId, ct);
|
||||||
|
if (list is null) return;
|
||||||
var tag = list.Tags.FirstOrDefault(t => t.Id == tagId);
|
var tag = list.Tags.FirstOrDefault(t => t.Id == tagId);
|
||||||
if (tag is not null)
|
if (tag is not null)
|
||||||
{
|
{
|
||||||
|
|||||||
@@ -104,7 +104,8 @@ public sealed class TaskRepository
|
|||||||
|
|
||||||
public async Task AddTagAsync(string taskId, long tagId, CancellationToken ct = default)
|
public async Task AddTagAsync(string taskId, long tagId, CancellationToken ct = default)
|
||||||
{
|
{
|
||||||
var task = await _context.Tasks.Include(t => t.Tags).FirstAsync(t => t.Id == taskId, ct);
|
var task = await _context.Tasks.Include(t => t.Tags).FirstOrDefaultAsync(t => t.Id == taskId, ct);
|
||||||
|
if (task is null) return;
|
||||||
var tag = await _context.Tags.FindAsync([tagId], ct);
|
var tag = await _context.Tags.FindAsync([tagId], ct);
|
||||||
if (tag is not null && !task.Tags.Any(t => t.Id == tagId))
|
if (tag is not null && !task.Tags.Any(t => t.Id == tagId))
|
||||||
{
|
{
|
||||||
@@ -115,7 +116,8 @@ public sealed class TaskRepository
|
|||||||
|
|
||||||
public async Task RemoveTagAsync(string taskId, long tagId, CancellationToken ct = default)
|
public async Task RemoveTagAsync(string taskId, long tagId, CancellationToken ct = default)
|
||||||
{
|
{
|
||||||
var task = await _context.Tasks.Include(t => t.Tags).FirstAsync(t => t.Id == taskId, ct);
|
var task = await _context.Tasks.Include(t => t.Tags).FirstOrDefaultAsync(t => t.Id == taskId, ct);
|
||||||
|
if (task is null) return;
|
||||||
var tag = task.Tags.FirstOrDefault(t => t.Id == tagId);
|
var tag = task.Tags.FirstOrDefault(t => t.Id == tagId);
|
||||||
if (tag is not null)
|
if (tag is not null)
|
||||||
{
|
{
|
||||||
|
|||||||
@@ -22,7 +22,7 @@ public sealed class InstallContext
|
|||||||
public int SignalRPort { get; set; } = 47_821;
|
public int SignalRPort { get; set; } = 47_821;
|
||||||
public int QueueBackstopIntervalMs { get; set; } = 30_000;
|
public int QueueBackstopIntervalMs { get; set; } = 30_000;
|
||||||
public string ClaudeBin { get; set; } = "claude";
|
public string ClaudeBin { get; set; } = "claude";
|
||||||
public string ServiceAccount { get; set; } = "LocalSystem";
|
public string ServiceAccount { get; set; } = "CurrentUser";
|
||||||
public bool AutoStart { get; set; } = true;
|
public bool AutoStart { get; set; } = true;
|
||||||
public int RestartDelayMs { get; set; } = 5000;
|
public int RestartDelayMs { get; set; } = 5000;
|
||||||
|
|
||||||
|
|||||||
@@ -1,3 +1,4 @@
|
|||||||
|
using ClaudeDo.Data;
|
||||||
using ClaudeDo.Installer.Core;
|
using ClaudeDo.Installer.Core;
|
||||||
|
|
||||||
namespace ClaudeDo.Installer.Steps;
|
namespace ClaudeDo.Installer.Steps;
|
||||||
@@ -10,13 +11,15 @@ public sealed class WriteConfigStep : IInstallStep
|
|||||||
{
|
{
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
|
// Expand ~ to the installing user's absolute path so the worker
|
||||||
|
// service always finds the correct DB regardless of service account.
|
||||||
var workerCfg = new InstallerWorkerConfig
|
var workerCfg = new InstallerWorkerConfig
|
||||||
{
|
{
|
||||||
DbPath = ctx.DbPath,
|
DbPath = Paths.Expand(ctx.DbPath),
|
||||||
SandboxRoot = ctx.SandboxRoot,
|
SandboxRoot = Paths.Expand(ctx.SandboxRoot),
|
||||||
LogRoot = ctx.LogRoot,
|
LogRoot = Paths.Expand(ctx.LogRoot),
|
||||||
WorktreeRootStrategy = ctx.WorktreeRootStrategy,
|
WorktreeRootStrategy = ctx.WorktreeRootStrategy,
|
||||||
CentralWorktreeRoot = ctx.CentralWorktreeRoot,
|
CentralWorktreeRoot = Paths.Expand(ctx.CentralWorktreeRoot),
|
||||||
QueueBackstopIntervalMs = ctx.QueueBackstopIntervalMs,
|
QueueBackstopIntervalMs = ctx.QueueBackstopIntervalMs,
|
||||||
SignalRPort = ctx.SignalRPort,
|
SignalRPort = ctx.SignalRPort,
|
||||||
ClaudeBin = ctx.ClaudeBin,
|
ClaudeBin = ctx.ClaudeBin,
|
||||||
|
|||||||
@@ -2,6 +2,7 @@ using System.Collections.ObjectModel;
|
|||||||
using Avalonia.Threading;
|
using Avalonia.Threading;
|
||||||
using ClaudeDo.Data.Models;
|
using ClaudeDo.Data.Models;
|
||||||
using CommunityToolkit.Mvvm.ComponentModel;
|
using CommunityToolkit.Mvvm.ComponentModel;
|
||||||
|
using Microsoft.AspNetCore.SignalR;
|
||||||
using Microsoft.AspNetCore.SignalR.Client;
|
using Microsoft.AspNetCore.SignalR.Client;
|
||||||
|
|
||||||
namespace ClaudeDo.Ui.Services;
|
namespace ClaudeDo.Ui.Services;
|
||||||
@@ -208,9 +209,13 @@ public partial class WorkerClient : ObservableObject, IAsyncDisposable
|
|||||||
ActiveTasks.Add(new ActiveTask(a.Slot, a.TaskId, a.StartedAt));
|
ActiveTasks.Add(new ActiveTask(a.Slot, a.TaskId, a.StartedAt));
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
catch
|
catch (HubException)
|
||||||
{
|
{
|
||||||
// Worker might not support GetActive yet
|
// Expected: worker doesn't support GetActive yet
|
||||||
|
}
|
||||||
|
catch (Exception ex)
|
||||||
|
{
|
||||||
|
System.Diagnostics.Debug.WriteLine($"SeedActiveTasksAsync failed: {ex}");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -13,7 +13,7 @@ using Microsoft.EntityFrameworkCore;
|
|||||||
|
|
||||||
namespace ClaudeDo.Ui.ViewModels;
|
namespace ClaudeDo.Ui.ViewModels;
|
||||||
|
|
||||||
public partial class MainWindowViewModel : ViewModelBase
|
public partial class MainWindowViewModel : ViewModelBase, IDisposable
|
||||||
{
|
{
|
||||||
private readonly IDbContextFactory<ClaudeDoDbContext> _dbFactory;
|
private readonly IDbContextFactory<ClaudeDoDbContext> _dbFactory;
|
||||||
private readonly WorkerClient _worker;
|
private readonly WorkerClient _worker;
|
||||||
@@ -27,6 +27,8 @@ public partial class MainWindowViewModel : ViewModelBase
|
|||||||
public TaskDetailViewModel TaskDetail { get; }
|
public TaskDetailViewModel TaskDetail { get; }
|
||||||
public StatusBarViewModel StatusBar { get; }
|
public StatusBarViewModel StatusBar { get; }
|
||||||
|
|
||||||
|
private readonly Action<string> _onTaskChanged;
|
||||||
|
|
||||||
public MainWindowViewModel(
|
public MainWindowViewModel(
|
||||||
IDbContextFactory<ClaudeDoDbContext> dbFactory,
|
IDbContextFactory<ClaudeDoDbContext> dbFactory,
|
||||||
WorkerClient worker,
|
WorkerClient worker,
|
||||||
@@ -42,8 +44,15 @@ public partial class MainWindowViewModel : ViewModelBase
|
|||||||
TaskDetail = taskDetail;
|
TaskDetail = taskDetail;
|
||||||
StatusBar = statusBar;
|
StatusBar = statusBar;
|
||||||
|
|
||||||
|
_onTaskChanged = taskId => _ = TaskList.RefreshSingleAsync(taskId);
|
||||||
TaskList.SelectedTaskChanged += OnSelectedTaskChanged;
|
TaskList.SelectedTaskChanged += OnSelectedTaskChanged;
|
||||||
TaskDetail.TaskChanged += taskId => _ = TaskList.RefreshSingleAsync(taskId);
|
TaskDetail.TaskChanged += _onTaskChanged;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void Dispose()
|
||||||
|
{
|
||||||
|
TaskList.SelectedTaskChanged -= OnSelectedTaskChanged;
|
||||||
|
TaskDetail.TaskChanged -= _onTaskChanged;
|
||||||
}
|
}
|
||||||
|
|
||||||
public async Task InitializeAsync()
|
public async Task InitializeAsync()
|
||||||
@@ -61,7 +70,11 @@ public partial class MainWindowViewModel : ViewModelBase
|
|||||||
StatusBar.ShowMessage($"Error loading lists: {ex.Message}");
|
StatusBar.ShowMessage($"Error loading lists: {ex.Message}");
|
||||||
}
|
}
|
||||||
|
|
||||||
_ = _worker.StartAsync();
|
_ = _worker.StartAsync().ContinueWith(t =>
|
||||||
|
{
|
||||||
|
if (t.IsFaulted)
|
||||||
|
System.Diagnostics.Debug.WriteLine($"Worker connection failed: {t.Exception?.Message}");
|
||||||
|
}, TaskScheduler.Default);
|
||||||
}
|
}
|
||||||
|
|
||||||
partial void OnSelectedListChanged(ListItemViewModel? value)
|
partial void OnSelectedListChanged(ListItemViewModel? value)
|
||||||
@@ -154,23 +167,46 @@ public partial class MainWindowViewModel : ViewModelBase
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
[ObservableProperty] private bool _isDeleteConfirmVisible;
|
||||||
|
private ListItemViewModel? _pendingDeleteList;
|
||||||
|
|
||||||
[RelayCommand]
|
[RelayCommand]
|
||||||
private async Task DeleteList()
|
private void DeleteList()
|
||||||
{
|
{
|
||||||
if (SelectedList is null) return;
|
if (SelectedList is null) return;
|
||||||
// TODO: confirmation dialog
|
_pendingDeleteList = SelectedList;
|
||||||
|
IsDeleteConfirmVisible = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
[RelayCommand]
|
||||||
|
private async Task ConfirmDeleteList()
|
||||||
|
{
|
||||||
|
IsDeleteConfirmVisible = false;
|
||||||
|
if (_pendingDeleteList is null) return;
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
using var context = _dbFactory.CreateDbContext();
|
using var context = _dbFactory.CreateDbContext();
|
||||||
var listRepo = new ListRepository(context);
|
var listRepo = new ListRepository(context);
|
||||||
await listRepo.DeleteAsync(SelectedList.Id);
|
await listRepo.DeleteAsync(_pendingDeleteList.Id);
|
||||||
Lists.Remove(SelectedList);
|
Lists.Remove(_pendingDeleteList);
|
||||||
|
if (SelectedList == _pendingDeleteList)
|
||||||
SelectedList = null;
|
SelectedList = null;
|
||||||
}
|
}
|
||||||
catch (Exception ex)
|
catch (Exception ex)
|
||||||
{
|
{
|
||||||
StatusBar.ShowMessage($"Error deleting list: {ex.Message}");
|
StatusBar.ShowMessage($"Error deleting list: {ex.Message}");
|
||||||
}
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
_pendingDeleteList = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
[RelayCommand]
|
||||||
|
private void CancelDeleteList()
|
||||||
|
{
|
||||||
|
IsDeleteConfirmVisible = false;
|
||||||
|
_pendingDeleteList = null;
|
||||||
}
|
}
|
||||||
|
|
||||||
private static async Task ShowDialogAsync(Window dialog)
|
private static async Task ShowDialogAsync(Window dialog)
|
||||||
|
|||||||
@@ -282,16 +282,18 @@ public partial class TaskDetailViewModel : ViewModelBase
|
|||||||
if (e.PropertyName is not (nameof(SubtaskItemViewModel.Title) or nameof(SubtaskItemViewModel.Completed))) return;
|
if (e.PropertyName is not (nameof(SubtaskItemViewModel.Title) or nameof(SubtaskItemViewModel.Completed))) return;
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
|
if (_taskId is null) return;
|
||||||
using var context = _dbFactory.CreateDbContext();
|
using var context = _dbFactory.CreateDbContext();
|
||||||
|
var orig = await context.Subtasks.AsNoTracking().FirstOrDefaultAsync(s => s.Id == vm.Id);
|
||||||
var subtaskRepo = new SubtaskRepository(context);
|
var subtaskRepo = new SubtaskRepository(context);
|
||||||
await subtaskRepo.UpdateAsync(new SubtaskEntity
|
await subtaskRepo.UpdateAsync(new SubtaskEntity
|
||||||
{
|
{
|
||||||
Id = vm.Id,
|
Id = vm.Id,
|
||||||
TaskId = _taskId ?? "",
|
TaskId = _taskId,
|
||||||
Title = vm.Title,
|
Title = vm.Title,
|
||||||
Completed = vm.Completed,
|
Completed = vm.Completed,
|
||||||
OrderNum = Subtasks.IndexOf(vm),
|
OrderNum = Subtasks.IndexOf(vm),
|
||||||
CreatedAt = DateTime.UtcNow,
|
CreatedAt = orig?.CreatedAt ?? DateTime.UtcNow,
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
catch (Exception ex)
|
catch (Exception ex)
|
||||||
@@ -378,13 +380,15 @@ public partial class TaskDetailViewModel : ViewModelBase
|
|||||||
UseShellExecute = true,
|
UseShellExecute = true,
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
catch { /* best effort */ }
|
catch (Exception ex)
|
||||||
|
{
|
||||||
|
Debug.WriteLine($"Failed to open worktree: {ex.Message}");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
[RelayCommand]
|
[RelayCommand]
|
||||||
private void ShowDiff()
|
private void ShowDiff()
|
||||||
{
|
{
|
||||||
// TODO: open a proper diff viewer; for now open git diff in a console
|
|
||||||
if (WorktreePath is null) return;
|
if (WorktreePath is null) return;
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
@@ -395,7 +399,10 @@ public partial class TaskDetailViewModel : ViewModelBase
|
|||||||
UseShellExecute = true,
|
UseShellExecute = true,
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
catch { /* best effort */ }
|
catch (Exception ex)
|
||||||
|
{
|
||||||
|
Debug.WriteLine($"Failed to show diff: {ex.Message}");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
[RelayCommand]
|
[RelayCommand]
|
||||||
|
|||||||
@@ -215,7 +215,10 @@ public partial class TaskEditorViewModel : ViewModelBase
|
|||||||
{
|
{
|
||||||
if (vm.Id == "") continue;
|
if (vm.Id == "") continue;
|
||||||
if (vm.Title != vm.OriginalTitle || vm.Completed != vm.OriginalCompleted)
|
if (vm.Title != vm.OriginalTitle || vm.Completed != vm.OriginalCompleted)
|
||||||
await subtaskRepo.UpdateAsync(new SubtaskEntity { Id = vm.Id, TaskId = taskId, Title = vm.Title, Completed = vm.Completed, OrderNum = idx, CreatedAt = DateTime.UtcNow });
|
{
|
||||||
|
var origSub = existing.FirstOrDefault(e => e.Id == vm.Id);
|
||||||
|
await subtaskRepo.UpdateAsync(new SubtaskEntity { Id = vm.Id, TaskId = taskId, Title = vm.Title, Completed = vm.Completed, OrderNum = idx, CreatedAt = origSub?.CreatedAt ?? DateTime.UtcNow });
|
||||||
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
// update order_num if position changed
|
// update order_num if position changed
|
||||||
|
|||||||
@@ -5,6 +5,8 @@ using Microsoft.AspNetCore.SignalR;
|
|||||||
|
|
||||||
namespace ClaudeDo.Worker.Hub;
|
namespace ClaudeDo.Worker.Hub;
|
||||||
|
|
||||||
|
public record ActiveTaskDto(string Slot, string TaskId, DateTime StartedAt);
|
||||||
|
|
||||||
public sealed class WorkerHub : Microsoft.AspNetCore.SignalR.Hub
|
public sealed class WorkerHub : Microsoft.AspNetCore.SignalR.Hub
|
||||||
{
|
{
|
||||||
private static readonly string Version =
|
private static readonly string Version =
|
||||||
@@ -12,19 +14,21 @@ public sealed class WorkerHub : Microsoft.AspNetCore.SignalR.Hub
|
|||||||
|
|
||||||
private readonly QueueService _queue;
|
private readonly QueueService _queue;
|
||||||
private readonly AgentFileService _agentService;
|
private readonly AgentFileService _agentService;
|
||||||
|
private readonly HubBroadcaster _broadcaster;
|
||||||
|
|
||||||
public WorkerHub(QueueService queue, AgentFileService agentService)
|
public WorkerHub(QueueService queue, AgentFileService agentService, HubBroadcaster broadcaster)
|
||||||
{
|
{
|
||||||
_queue = queue;
|
_queue = queue;
|
||||||
_agentService = agentService;
|
_agentService = agentService;
|
||||||
|
_broadcaster = broadcaster;
|
||||||
}
|
}
|
||||||
|
|
||||||
public string Ping() => $"pong v{Version}";
|
public string Ping() => $"pong v{Version}";
|
||||||
|
|
||||||
public IReadOnlyList<object> GetActive()
|
public IReadOnlyList<ActiveTaskDto> GetActive()
|
||||||
{
|
{
|
||||||
return _queue.GetActive()
|
return _queue.GetActive()
|
||||||
.Select(a => (object)new { slot = a.slot, taskId = a.taskId, startedAt = a.startedAt })
|
.Select(a => new ActiveTaskDto(a.slot, a.taskId, a.startedAt))
|
||||||
.ToList();
|
.ToList();
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -33,6 +37,7 @@ public sealed class WorkerHub : Microsoft.AspNetCore.SignalR.Hub
|
|||||||
try
|
try
|
||||||
{
|
{
|
||||||
await _queue.RunNow(taskId);
|
await _queue.RunNow(taskId);
|
||||||
|
await _broadcaster.RunCreated(taskId, 1, false);
|
||||||
}
|
}
|
||||||
catch (InvalidOperationException)
|
catch (InvalidOperationException)
|
||||||
{
|
{
|
||||||
|
|||||||
@@ -58,11 +58,13 @@ public sealed class QueueService : BackgroundService
|
|||||||
|
|
||||||
public async Task RunNow(string taskId)
|
public async Task RunNow(string taskId)
|
||||||
{
|
{
|
||||||
using var context = _dbFactory.CreateDbContext();
|
using (var context = _dbFactory.CreateDbContext())
|
||||||
|
{
|
||||||
var taskRepo = new TaskRepository(context);
|
var taskRepo = new TaskRepository(context);
|
||||||
var task = await taskRepo.GetByIdAsync(taskId);
|
var exists = await taskRepo.GetByIdAsync(taskId);
|
||||||
if (task is null)
|
if (exists is null)
|
||||||
throw new KeyNotFoundException($"Task '{taskId}' not found.");
|
throw new KeyNotFoundException($"Task '{taskId}' not found.");
|
||||||
|
}
|
||||||
|
|
||||||
lock (_lock)
|
lock (_lock)
|
||||||
{
|
{
|
||||||
@@ -72,8 +74,10 @@ public sealed class QueueService : BackgroundService
|
|||||||
var cts = new CancellationTokenSource();
|
var cts = new CancellationTokenSource();
|
||||||
_overrideSlot = new QueueSlotState { TaskId = taskId, StartedAt = DateTime.UtcNow, Cts = cts };
|
_overrideSlot = new QueueSlotState { TaskId = taskId, StartedAt = DateTime.UtcNow, Cts = cts };
|
||||||
|
|
||||||
_ = RunInSlotAsync(task, "override", cts.Token).ContinueWith(_ =>
|
_ = RunInSlotAsync(taskId, "override", cts.Token).ContinueWith(t =>
|
||||||
{
|
{
|
||||||
|
if (t.IsFaulted)
|
||||||
|
_logger.LogError(t.Exception, "RunInSlotAsync failed for task {TaskId}", taskId);
|
||||||
lock (_lock) { _overrideSlot = null; }
|
lock (_lock) { _overrideSlot = null; }
|
||||||
cts.Dispose();
|
cts.Dispose();
|
||||||
}, TaskScheduler.Default);
|
}, TaskScheduler.Default);
|
||||||
@@ -98,8 +102,10 @@ public sealed class QueueService : BackgroundService
|
|||||||
var cts = new CancellationTokenSource();
|
var cts = new CancellationTokenSource();
|
||||||
_overrideSlot = new QueueSlotState { TaskId = taskId, StartedAt = DateTime.UtcNow, Cts = cts };
|
_overrideSlot = new QueueSlotState { TaskId = taskId, StartedAt = DateTime.UtcNow, Cts = cts };
|
||||||
|
|
||||||
_ = RunContinueInSlotAsync(taskId, followUpPrompt, cts.Token).ContinueWith(_ =>
|
_ = RunContinueInSlotAsync(taskId, followUpPrompt, cts.Token).ContinueWith(t =>
|
||||||
{
|
{
|
||||||
|
if (t.IsFaulted)
|
||||||
|
_logger.LogError(t.Exception, "RunContinueInSlotAsync failed for task {TaskId}", taskId);
|
||||||
lock (_lock) { _overrideSlot = null; }
|
lock (_lock) { _overrideSlot = null; }
|
||||||
cts.Dispose();
|
cts.Dispose();
|
||||||
}, TaskScheduler.Default);
|
}, TaskScheduler.Default);
|
||||||
@@ -165,8 +171,10 @@ public sealed class QueueService : BackgroundService
|
|||||||
var cts = CancellationTokenSource.CreateLinkedTokenSource(stoppingToken);
|
var cts = CancellationTokenSource.CreateLinkedTokenSource(stoppingToken);
|
||||||
_queueSlot = new QueueSlotState { TaskId = task.Id, StartedAt = DateTime.UtcNow, Cts = cts };
|
_queueSlot = new QueueSlotState { TaskId = task.Id, StartedAt = DateTime.UtcNow, Cts = cts };
|
||||||
|
|
||||||
_ = RunInSlotAsync(task, "queue", cts.Token).ContinueWith(_ =>
|
_ = RunInSlotAsync(task.Id, "queue", cts.Token).ContinueWith(t =>
|
||||||
{
|
{
|
||||||
|
if (t.IsFaulted)
|
||||||
|
_logger.LogError(t.Exception, "RunInSlotAsync failed for task {TaskId} in queue slot", task.Id);
|
||||||
lock (_lock) { _queueSlot = null; }
|
lock (_lock) { _queueSlot = null; }
|
||||||
cts.Dispose();
|
cts.Dispose();
|
||||||
WakeQueue(); // Check for next task immediately.
|
WakeQueue(); // Check for next task immediately.
|
||||||
@@ -186,16 +194,25 @@ public sealed class QueueService : BackgroundService
|
|||||||
_logger.LogInformation("QueueService stopping");
|
_logger.LogInformation("QueueService stopping");
|
||||||
}
|
}
|
||||||
|
|
||||||
private async Task RunInSlotAsync(TaskEntity task, string slot, CancellationToken ct)
|
private async Task RunInSlotAsync(string taskId, string slot, CancellationToken ct)
|
||||||
{
|
{
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
_logger.LogInformation("Starting task {TaskId} in {Slot} slot", task.Id, slot);
|
_logger.LogInformation("Starting task {TaskId} in {Slot} slot", taskId, slot);
|
||||||
|
|
||||||
|
TaskEntity task;
|
||||||
|
using (var context = _dbFactory.CreateDbContext())
|
||||||
|
{
|
||||||
|
var taskRepo = new TaskRepository(context);
|
||||||
|
task = await taskRepo.GetByIdAsync(taskId, ct)
|
||||||
|
?? throw new KeyNotFoundException($"Task '{taskId}' not found.");
|
||||||
|
}
|
||||||
|
|
||||||
await _runner.RunAsync(task, slot, ct);
|
await _runner.RunAsync(task, slot, ct);
|
||||||
}
|
}
|
||||||
catch (Exception ex)
|
catch (Exception ex)
|
||||||
{
|
{
|
||||||
_logger.LogError(ex, "Slot runner error for task {TaskId}", task.Id);
|
_logger.LogError(ex, "Slot runner error for task {TaskId}", taskId);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user