diff --git a/src/ClaudeDo.Data/Repositories/ListRepository.cs b/src/ClaudeDo.Data/Repositories/ListRepository.cs new file mode 100644 index 0000000..eb185ae --- /dev/null +++ b/src/ClaudeDo.Data/Repositories/ListRepository.cs @@ -0,0 +1,124 @@ +using ClaudeDo.Data.Models; +using Microsoft.Data.Sqlite; + +namespace ClaudeDo.Data.Repositories; + +public sealed class ListRepository +{ + private readonly SqliteConnectionFactory _factory; + + public ListRepository(SqliteConnectionFactory factory) => _factory = factory; + + public async Task AddAsync(ListEntity entity, CancellationToken ct = default) + { + await using var conn = _factory.Open(); + await using var cmd = conn.CreateCommand(); + cmd.CommandText = """ + INSERT INTO lists (id, name, created_at, working_dir, default_commit_type) + VALUES (@id, @name, @created_at, @working_dir, @default_commit_type) + """; + cmd.Parameters.AddWithValue("@id", entity.Id); + cmd.Parameters.AddWithValue("@name", entity.Name); + cmd.Parameters.AddWithValue("@created_at", entity.CreatedAt.ToString("o")); + cmd.Parameters.AddWithValue("@working_dir", (object?)entity.WorkingDir ?? DBNull.Value); + cmd.Parameters.AddWithValue("@default_commit_type", entity.DefaultCommitType); + await cmd.ExecuteNonQueryAsync(ct); + } + + public async Task UpdateAsync(ListEntity entity, CancellationToken ct = default) + { + await using var conn = _factory.Open(); + await using var cmd = conn.CreateCommand(); + cmd.CommandText = """ + UPDATE lists SET name = @name, working_dir = @working_dir, + default_commit_type = @default_commit_type + WHERE id = @id + """; + cmd.Parameters.AddWithValue("@id", entity.Id); + cmd.Parameters.AddWithValue("@name", entity.Name); + cmd.Parameters.AddWithValue("@working_dir", (object?)entity.WorkingDir ?? DBNull.Value); + cmd.Parameters.AddWithValue("@default_commit_type", entity.DefaultCommitType); + await cmd.ExecuteNonQueryAsync(ct); + } + + public async Task DeleteAsync(string listId, CancellationToken ct = default) + { + await using var conn = _factory.Open(); + await using var cmd = conn.CreateCommand(); + cmd.CommandText = "DELETE FROM lists WHERE id = @id"; + cmd.Parameters.AddWithValue("@id", listId); + await cmd.ExecuteNonQueryAsync(ct); + } + + public async Task GetByIdAsync(string listId, CancellationToken ct = default) + { + await using var conn = _factory.Open(); + await using var cmd = conn.CreateCommand(); + cmd.CommandText = "SELECT id, name, created_at, working_dir, default_commit_type FROM lists WHERE id = @id"; + cmd.Parameters.AddWithValue("@id", listId); + + await using var reader = await cmd.ExecuteReaderAsync(ct); + if (!await reader.ReadAsync(ct)) return null; + return ReadList(reader); + } + + public async Task> GetAllAsync(CancellationToken ct = default) + { + await using var conn = _factory.Open(); + await using var cmd = conn.CreateCommand(); + cmd.CommandText = "SELECT id, name, created_at, working_dir, default_commit_type FROM lists ORDER BY created_at"; + + await using var reader = await cmd.ExecuteReaderAsync(ct); + var result = new List(); + while (await reader.ReadAsync(ct)) + result.Add(ReadList(reader)); + return result; + } + + public async Task> GetTagsAsync(string listId, CancellationToken ct = default) + { + await using var conn = _factory.Open(); + await using var cmd = conn.CreateCommand(); + cmd.CommandText = """ + SELECT t.id, t.name FROM tags t + JOIN list_tags lt ON lt.tag_id = t.id + WHERE lt.list_id = @list_id + """; + cmd.Parameters.AddWithValue("@list_id", listId); + + await using var reader = await cmd.ExecuteReaderAsync(ct); + var result = new List(); + while (await reader.ReadAsync(ct)) + result.Add(new TagEntity { Id = reader.GetInt64(0), Name = reader.GetString(1) }); + return result; + } + + public async Task AddTagAsync(string listId, long tagId, CancellationToken ct = default) + { + await using var conn = _factory.Open(); + await using var cmd = conn.CreateCommand(); + cmd.CommandText = "INSERT OR IGNORE INTO list_tags (list_id, tag_id) VALUES (@list_id, @tag_id)"; + cmd.Parameters.AddWithValue("@list_id", listId); + cmd.Parameters.AddWithValue("@tag_id", tagId); + await cmd.ExecuteNonQueryAsync(ct); + } + + public async Task RemoveTagAsync(string listId, long tagId, CancellationToken ct = default) + { + await using var conn = _factory.Open(); + await using var cmd = conn.CreateCommand(); + cmd.CommandText = "DELETE FROM list_tags WHERE list_id = @list_id AND tag_id = @tag_id"; + cmd.Parameters.AddWithValue("@list_id", listId); + cmd.Parameters.AddWithValue("@tag_id", tagId); + await cmd.ExecuteNonQueryAsync(ct); + } + + private static ListEntity ReadList(SqliteDataReader reader) => new() + { + Id = reader.GetString(0), + Name = reader.GetString(1), + CreatedAt = DateTime.Parse(reader.GetString(2)), + WorkingDir = reader.IsDBNull(3) ? null : reader.GetString(3), + DefaultCommitType = reader.GetString(4), + }; +} diff --git a/src/ClaudeDo.Data/Repositories/TagRepository.cs b/src/ClaudeDo.Data/Repositories/TagRepository.cs new file mode 100644 index 0000000..d73d9b7 --- /dev/null +++ b/src/ClaudeDo.Data/Repositories/TagRepository.cs @@ -0,0 +1,33 @@ +using Microsoft.Data.Sqlite; + +namespace ClaudeDo.Data.Repositories; + +public sealed class TagRepository +{ + private readonly SqliteConnectionFactory _factory; + + public TagRepository(SqliteConnectionFactory factory) => _factory = factory; + + public async Task GetOrCreateAsync(string name, CancellationToken ct = default) + { + await using var conn = _factory.Open(); + return await GetOrCreateAsync(conn, name, ct); + } + + public static async Task GetOrCreateAsync(SqliteConnection conn, string name, CancellationToken ct = default) + { + await using var sel = conn.CreateCommand(); + sel.CommandText = "SELECT id FROM tags WHERE name = @name"; + sel.Parameters.AddWithValue("@name", name); + + var existing = await sel.ExecuteScalarAsync(ct); + if (existing is not null) + return (long)existing; + + await using var ins = conn.CreateCommand(); + ins.CommandText = "INSERT INTO tags (name) VALUES (@name) RETURNING id"; + ins.Parameters.AddWithValue("@name", name); + + return (long)(await ins.ExecuteScalarAsync(ct))!; + } +} diff --git a/src/ClaudeDo.Data/Repositories/TaskRepository.cs b/src/ClaudeDo.Data/Repositories/TaskRepository.cs new file mode 100644 index 0000000..5bbb404 --- /dev/null +++ b/src/ClaudeDo.Data/Repositories/TaskRepository.cs @@ -0,0 +1,289 @@ +using ClaudeDo.Data.Models; +using Microsoft.Data.Sqlite; +using TaskStatus = ClaudeDo.Data.Models.TaskStatus; + +namespace ClaudeDo.Data.Repositories; + +public sealed class TaskRepository +{ + private readonly SqliteConnectionFactory _factory; + + public TaskRepository(SqliteConnectionFactory factory) => _factory = factory; + + #region Status mapping + + private static string ToDb(TaskStatus s) => s switch + { + TaskStatus.Manual => "manual", + TaskStatus.Queued => "queued", + TaskStatus.Running => "running", + TaskStatus.Done => "done", + TaskStatus.Failed => "failed", + _ => throw new ArgumentOutOfRangeException(nameof(s)), + }; + + private static TaskStatus FromDb(string s) => s switch + { + "manual" => TaskStatus.Manual, + "queued" => TaskStatus.Queued, + "running" => TaskStatus.Running, + "done" => TaskStatus.Done, + "failed" => TaskStatus.Failed, + _ => throw new ArgumentOutOfRangeException(nameof(s)), + }; + + #endregion + + #region CRUD + + public async Task AddAsync(TaskEntity entity, CancellationToken ct = default) + { + await using var conn = _factory.Open(); + await using var cmd = conn.CreateCommand(); + cmd.CommandText = """ + INSERT INTO tasks (id, list_id, title, description, status, scheduled_for, + result, log_path, created_at, started_at, finished_at, commit_type) + VALUES (@id, @list_id, @title, @description, @status, @scheduled_for, + @result, @log_path, @created_at, @started_at, @finished_at, @commit_type) + """; + BindTask(cmd, entity); + await cmd.ExecuteNonQueryAsync(ct); + } + + public async Task UpdateAsync(TaskEntity entity, CancellationToken ct = default) + { + await using var conn = _factory.Open(); + await using var cmd = conn.CreateCommand(); + cmd.CommandText = """ + UPDATE tasks SET list_id = @list_id, title = @title, description = @description, + status = @status, scheduled_for = @scheduled_for, result = @result, + log_path = @log_path, started_at = @started_at, + finished_at = @finished_at, commit_type = @commit_type + WHERE id = @id + """; + BindTask(cmd, entity); + await cmd.ExecuteNonQueryAsync(ct); + } + + public async Task DeleteAsync(string taskId, CancellationToken ct = default) + { + await using var conn = _factory.Open(); + await using var cmd = conn.CreateCommand(); + cmd.CommandText = "DELETE FROM tasks WHERE id = @id"; + cmd.Parameters.AddWithValue("@id", taskId); + await cmd.ExecuteNonQueryAsync(ct); + } + + public async Task GetByIdAsync(string taskId, CancellationToken ct = default) + { + await using var conn = _factory.Open(); + await using var cmd = conn.CreateCommand(); + cmd.CommandText = "SELECT id, list_id, title, description, status, scheduled_for, result, log_path, created_at, started_at, finished_at, commit_type FROM tasks WHERE id = @id"; + cmd.Parameters.AddWithValue("@id", taskId); + + await using var reader = await cmd.ExecuteReaderAsync(ct); + if (!await reader.ReadAsync(ct)) return null; + return ReadTask(reader); + } + + public async Task> GetByListAsync(string listId, CancellationToken ct = default) + { + await using var conn = _factory.Open(); + await using var cmd = conn.CreateCommand(); + cmd.CommandText = "SELECT id, list_id, title, description, status, scheduled_for, result, log_path, created_at, started_at, finished_at, commit_type FROM tasks WHERE list_id = @list_id ORDER BY created_at"; + cmd.Parameters.AddWithValue("@list_id", listId); + + await using var reader = await cmd.ExecuteReaderAsync(ct); + var result = new List(); + while (await reader.ReadAsync(ct)) + result.Add(ReadTask(reader)); + return result; + } + + #endregion + + #region Tag junction + + public async Task> GetTagsAsync(string taskId, CancellationToken ct = default) + { + await using var conn = _factory.Open(); + await using var cmd = conn.CreateCommand(); + cmd.CommandText = """ + SELECT t.id, t.name FROM tags t + JOIN task_tags tt ON tt.tag_id = t.id + WHERE tt.task_id = @task_id + """; + cmd.Parameters.AddWithValue("@task_id", taskId); + + await using var reader = await cmd.ExecuteReaderAsync(ct); + var result = new List(); + while (await reader.ReadAsync(ct)) + result.Add(new TagEntity { Id = reader.GetInt64(0), Name = reader.GetString(1) }); + return result; + } + + public async Task AddTagAsync(string taskId, long tagId, CancellationToken ct = default) + { + await using var conn = _factory.Open(); + await using var cmd = conn.CreateCommand(); + cmd.CommandText = "INSERT OR IGNORE INTO task_tags (task_id, tag_id) VALUES (@task_id, @tag_id)"; + cmd.Parameters.AddWithValue("@task_id", taskId); + cmd.Parameters.AddWithValue("@tag_id", tagId); + await cmd.ExecuteNonQueryAsync(ct); + } + + public async Task RemoveTagAsync(string taskId, long tagId, CancellationToken ct = default) + { + await using var conn = _factory.Open(); + await using var cmd = conn.CreateCommand(); + cmd.CommandText = "DELETE FROM task_tags WHERE task_id = @task_id AND tag_id = @tag_id"; + cmd.Parameters.AddWithValue("@task_id", taskId); + cmd.Parameters.AddWithValue("@tag_id", tagId); + await cmd.ExecuteNonQueryAsync(ct); + } + + public async Task> GetEffectiveTagsAsync(string taskId, CancellationToken ct = default) + { + await using var conn = _factory.Open(); + await using var cmd = conn.CreateCommand(); + cmd.CommandText = """ + SELECT DISTINCT t.id, t.name FROM tags t + WHERE t.id IN ( + SELECT tag_id FROM task_tags WHERE task_id = @task_id + UNION + SELECT lt.tag_id FROM list_tags lt + JOIN tasks tk ON tk.list_id = lt.list_id + WHERE tk.id = @task_id + ) + """; + cmd.Parameters.AddWithValue("@task_id", taskId); + + await using var reader = await cmd.ExecuteReaderAsync(ct); + var result = new List(); + while (await reader.ReadAsync(ct)) + result.Add(new TagEntity { Id = reader.GetInt64(0), Name = reader.GetString(1) }); + return result; + } + + #endregion + + #region Queue selection + + public async Task GetNextQueuedAgentTaskAsync(DateTime now, CancellationToken ct = default) + { + await using var conn = _factory.Open(); + await using var cmd = conn.CreateCommand(); + cmd.CommandText = """ + SELECT t.id, t.list_id, t.title, t.description, t.status, t.scheduled_for, + t.result, t.log_path, t.created_at, t.started_at, t.finished_at, t.commit_type + FROM tasks t + WHERE t.status = 'queued' + AND (t.scheduled_for IS NULL OR t.scheduled_for <= @now) + AND EXISTS ( + SELECT 1 FROM task_tags tt + JOIN tags tg ON tg.id = tt.tag_id + WHERE tt.task_id = t.id AND tg.name = 'agent' + UNION + SELECT 1 FROM list_tags lt + JOIN tags tg ON tg.id = lt.tag_id + WHERE lt.list_id = t.list_id AND tg.name = 'agent' + ) + ORDER BY t.created_at ASC + LIMIT 1 + """; + cmd.Parameters.AddWithValue("@now", now.ToString("o")); + + await using var reader = await cmd.ExecuteReaderAsync(ct); + if (!await reader.ReadAsync(ct)) return null; + return ReadTask(reader); + } + + #endregion + + #region Transitions + + public async Task MarkRunningAsync(string taskId, DateTime startedAt, CancellationToken ct = default) + { + await using var conn = _factory.Open(); + await using var cmd = conn.CreateCommand(); + cmd.CommandText = "UPDATE tasks SET status = 'running', started_at = @started_at WHERE id = @id"; + cmd.Parameters.AddWithValue("@id", taskId); + cmd.Parameters.AddWithValue("@started_at", startedAt.ToString("o")); + await cmd.ExecuteNonQueryAsync(ct); + } + + public async Task MarkDoneAsync(string taskId, DateTime finishedAt, string? result, CancellationToken ct = default) + { + await using var conn = _factory.Open(); + await using var cmd = conn.CreateCommand(); + cmd.CommandText = "UPDATE tasks SET status = 'done', finished_at = @finished_at, result = @result WHERE id = @id"; + cmd.Parameters.AddWithValue("@id", taskId); + cmd.Parameters.AddWithValue("@finished_at", finishedAt.ToString("o")); + cmd.Parameters.AddWithValue("@result", (object?)result ?? DBNull.Value); + await cmd.ExecuteNonQueryAsync(ct); + } + + public async Task MarkFailedAsync(string taskId, DateTime finishedAt, string? errorMarkdown, CancellationToken ct = default) + { + await using var conn = _factory.Open(); + await using var cmd = conn.CreateCommand(); + cmd.CommandText = "UPDATE tasks SET status = 'failed', finished_at = @finished_at, result = @result WHERE id = @id"; + cmd.Parameters.AddWithValue("@id", taskId); + cmd.Parameters.AddWithValue("@finished_at", finishedAt.ToString("o")); + cmd.Parameters.AddWithValue("@result", (object?)errorMarkdown ?? DBNull.Value); + await cmd.ExecuteNonQueryAsync(ct); + } + + public async Task FlipAllRunningToFailedAsync(string reason, CancellationToken ct = default) + { + await using var conn = _factory.Open(); + await using var cmd = conn.CreateCommand(); + cmd.CommandText = """ + UPDATE tasks SET status = 'failed', + finished_at = @now, + result = '[stale] ' || @reason + WHERE status = 'running' + """; + cmd.Parameters.AddWithValue("@now", DateTime.UtcNow.ToString("o")); + cmd.Parameters.AddWithValue("@reason", reason); + return await cmd.ExecuteNonQueryAsync(ct); + } + + #endregion + + #region Helpers + + private static void BindTask(SqliteCommand cmd, TaskEntity e) + { + cmd.Parameters.AddWithValue("@id", e.Id); + cmd.Parameters.AddWithValue("@list_id", e.ListId); + cmd.Parameters.AddWithValue("@title", e.Title); + cmd.Parameters.AddWithValue("@description", (object?)e.Description ?? DBNull.Value); + cmd.Parameters.AddWithValue("@status", ToDb(e.Status)); + cmd.Parameters.AddWithValue("@scheduled_for", e.ScheduledFor.HasValue ? e.ScheduledFor.Value.ToString("o") : DBNull.Value); + cmd.Parameters.AddWithValue("@result", (object?)e.Result ?? DBNull.Value); + cmd.Parameters.AddWithValue("@log_path", (object?)e.LogPath ?? DBNull.Value); + cmd.Parameters.AddWithValue("@created_at", e.CreatedAt.ToString("o")); + cmd.Parameters.AddWithValue("@started_at", e.StartedAt.HasValue ? e.StartedAt.Value.ToString("o") : DBNull.Value); + cmd.Parameters.AddWithValue("@finished_at", e.FinishedAt.HasValue ? e.FinishedAt.Value.ToString("o") : DBNull.Value); + cmd.Parameters.AddWithValue("@commit_type", e.CommitType); + } + + private static TaskEntity ReadTask(SqliteDataReader r) => new() + { + Id = r.GetString(0), + ListId = r.GetString(1), + Title = r.GetString(2), + Description = r.IsDBNull(3) ? null : r.GetString(3), + Status = FromDb(r.GetString(4)), + ScheduledFor = r.IsDBNull(5) ? null : DateTime.Parse(r.GetString(5)), + Result = r.IsDBNull(6) ? null : r.GetString(6), + LogPath = r.IsDBNull(7) ? null : r.GetString(7), + CreatedAt = DateTime.Parse(r.GetString(8)), + StartedAt = r.IsDBNull(9) ? null : DateTime.Parse(r.GetString(9)), + FinishedAt = r.IsDBNull(10) ? null : DateTime.Parse(r.GetString(10)), + CommitType = r.GetString(11), + }; + + #endregion +} diff --git a/src/ClaudeDo.Data/Repositories/WorktreeRepository.cs b/src/ClaudeDo.Data/Repositories/WorktreeRepository.cs new file mode 100644 index 0000000..ae96a54 --- /dev/null +++ b/src/ClaudeDo.Data/Repositories/WorktreeRepository.cs @@ -0,0 +1,102 @@ +using ClaudeDo.Data.Models; +using Microsoft.Data.Sqlite; + +namespace ClaudeDo.Data.Repositories; + +public sealed class WorktreeRepository +{ + private readonly SqliteConnectionFactory _factory; + + public WorktreeRepository(SqliteConnectionFactory factory) => _factory = factory; + + private static string ToDb(WorktreeState s) => s switch + { + WorktreeState.Active => "active", + WorktreeState.Merged => "merged", + WorktreeState.Discarded => "discarded", + WorktreeState.Kept => "kept", + _ => throw new ArgumentOutOfRangeException(nameof(s)), + }; + + private static WorktreeState FromDb(string s) => s switch + { + "active" => WorktreeState.Active, + "merged" => WorktreeState.Merged, + "discarded" => WorktreeState.Discarded, + "kept" => WorktreeState.Kept, + _ => throw new ArgumentOutOfRangeException(nameof(s)), + }; + + public async Task AddAsync(WorktreeEntity entity, CancellationToken ct = default) + { + await using var conn = _factory.Open(); + await using var cmd = conn.CreateCommand(); + cmd.CommandText = """ + INSERT INTO worktrees (task_id, path, branch_name, base_commit, head_commit, diff_stat, state, created_at) + VALUES (@task_id, @path, @branch_name, @base_commit, @head_commit, @diff_stat, @state, @created_at) + """; + cmd.Parameters.AddWithValue("@task_id", entity.TaskId); + cmd.Parameters.AddWithValue("@path", entity.Path); + cmd.Parameters.AddWithValue("@branch_name", entity.BranchName); + cmd.Parameters.AddWithValue("@base_commit", entity.BaseCommit); + cmd.Parameters.AddWithValue("@head_commit", (object?)entity.HeadCommit ?? DBNull.Value); + cmd.Parameters.AddWithValue("@diff_stat", (object?)entity.DiffStat ?? DBNull.Value); + cmd.Parameters.AddWithValue("@state", ToDb(entity.State)); + cmd.Parameters.AddWithValue("@created_at", entity.CreatedAt.ToString("o")); + await cmd.ExecuteNonQueryAsync(ct); + } + + public async Task GetByTaskIdAsync(string taskId, CancellationToken ct = default) + { + await using var conn = _factory.Open(); + await using var cmd = conn.CreateCommand(); + cmd.CommandText = "SELECT task_id, path, branch_name, base_commit, head_commit, diff_stat, state, created_at FROM worktrees WHERE task_id = @task_id"; + cmd.Parameters.AddWithValue("@task_id", taskId); + + await using var reader = await cmd.ExecuteReaderAsync(ct); + if (!await reader.ReadAsync(ct)) return null; + return ReadWorktree(reader); + } + + public async Task UpdateHeadAsync(string taskId, string headCommit, string? diffStat, CancellationToken ct = default) + { + await using var conn = _factory.Open(); + await using var cmd = conn.CreateCommand(); + cmd.CommandText = "UPDATE worktrees SET head_commit = @head_commit, diff_stat = @diff_stat WHERE task_id = @task_id"; + cmd.Parameters.AddWithValue("@task_id", taskId); + cmd.Parameters.AddWithValue("@head_commit", headCommit); + cmd.Parameters.AddWithValue("@diff_stat", (object?)diffStat ?? DBNull.Value); + await cmd.ExecuteNonQueryAsync(ct); + } + + public async Task SetStateAsync(string taskId, WorktreeState state, CancellationToken ct = default) + { + await using var conn = _factory.Open(); + await using var cmd = conn.CreateCommand(); + cmd.CommandText = "UPDATE worktrees SET state = @state WHERE task_id = @task_id"; + cmd.Parameters.AddWithValue("@task_id", taskId); + cmd.Parameters.AddWithValue("@state", ToDb(state)); + await cmd.ExecuteNonQueryAsync(ct); + } + + public async Task DeleteAsync(string taskId, CancellationToken ct = default) + { + await using var conn = _factory.Open(); + await using var cmd = conn.CreateCommand(); + cmd.CommandText = "DELETE FROM worktrees WHERE task_id = @task_id"; + cmd.Parameters.AddWithValue("@task_id", taskId); + await cmd.ExecuteNonQueryAsync(ct); + } + + private static WorktreeEntity ReadWorktree(SqliteDataReader r) => new() + { + TaskId = r.GetString(0), + Path = r.GetString(1), + BranchName = r.GetString(2), + BaseCommit = r.GetString(3), + HeadCommit = r.IsDBNull(4) ? null : r.GetString(4), + DiffStat = r.IsDBNull(5) ? null : r.GetString(5), + State = FromDb(r.GetString(6)), + CreatedAt = DateTime.Parse(r.GetString(7)), + }; +} diff --git a/src/ClaudeDo.Worker/Program.cs b/src/ClaudeDo.Worker/Program.cs index 55386fb..a3e06ac 100644 --- a/src/ClaudeDo.Worker/Program.cs +++ b/src/ClaudeDo.Worker/Program.cs @@ -1,6 +1,8 @@ using ClaudeDo.Data; +using ClaudeDo.Data.Repositories; using ClaudeDo.Worker.Config; using ClaudeDo.Worker.Hub; +using ClaudeDo.Worker.Services; var cfg = WorkerConfig.Load(); @@ -12,6 +14,11 @@ SchemaInitializer.Apply(dbFactory); builder.Services.AddSingleton(cfg); builder.Services.AddSingleton(dbFactory); +builder.Services.AddSingleton(); +builder.Services.AddSingleton(); +builder.Services.AddSingleton(); +builder.Services.AddSingleton(); +builder.Services.AddHostedService(); builder.Services.AddSignalR(); // Loopback-only bind. Firewall is irrelevant for 127.0.0.1. diff --git a/src/ClaudeDo.Worker/Services/StaleTaskRecovery.cs b/src/ClaudeDo.Worker/Services/StaleTaskRecovery.cs new file mode 100644 index 0000000..aa43d66 --- /dev/null +++ b/src/ClaudeDo.Worker/Services/StaleTaskRecovery.cs @@ -0,0 +1,26 @@ +using ClaudeDo.Data.Repositories; + +namespace ClaudeDo.Worker.Services; + +public sealed class StaleTaskRecovery : IHostedService +{ + private readonly TaskRepository _tasks; + private readonly ILogger _logger; + + public StaleTaskRecovery(TaskRepository tasks, ILogger logger) + { + _tasks = tasks; + _logger = logger; + } + + public async Task StartAsync(CancellationToken cancellationToken) + { + var flipped = await _tasks.FlipAllRunningToFailedAsync("worker restart", cancellationToken); + if (flipped > 0) + _logger.LogWarning("Stale task recovery: flipped {Count} running task(s) to failed", flipped); + else + _logger.LogInformation("Stale task recovery: no stale tasks found"); + } + + public Task StopAsync(CancellationToken cancellationToken) => Task.CompletedTask; +} diff --git a/tests/ClaudeDo.Worker.Tests/ClaudeDo.Worker.Tests.csproj b/tests/ClaudeDo.Worker.Tests/ClaudeDo.Worker.Tests.csproj index 21410a0..bfa12a3 100644 --- a/tests/ClaudeDo.Worker.Tests/ClaudeDo.Worker.Tests.csproj +++ b/tests/ClaudeDo.Worker.Tests/ClaudeDo.Worker.Tests.csproj @@ -11,6 +11,7 @@ + diff --git a/tests/ClaudeDo.Worker.Tests/Infrastructure/DbFixture.cs b/tests/ClaudeDo.Worker.Tests/Infrastructure/DbFixture.cs new file mode 100644 index 0000000..ebd8ff5 --- /dev/null +++ b/tests/ClaudeDo.Worker.Tests/Infrastructure/DbFixture.cs @@ -0,0 +1,23 @@ +using ClaudeDo.Data; + +namespace ClaudeDo.Worker.Tests.Infrastructure; + +public sealed class DbFixture : IDisposable +{ + public string DbPath { get; } + public SqliteConnectionFactory Factory { get; } + + public DbFixture() + { + DbPath = Path.Combine(Path.GetTempPath(), $"claudedo_test_{Guid.NewGuid():N}.db"); + Factory = new SqliteConnectionFactory(DbPath); + SchemaInitializer.Apply(Factory); + } + + public void Dispose() + { + try { File.Delete(DbPath); } catch { /* best effort */ } + try { File.Delete(DbPath + "-wal"); } catch { } + try { File.Delete(DbPath + "-shm"); } catch { } + } +} diff --git a/tests/ClaudeDo.Worker.Tests/Repositories/ListRepositoryTests.cs b/tests/ClaudeDo.Worker.Tests/Repositories/ListRepositoryTests.cs new file mode 100644 index 0000000..f1e931d --- /dev/null +++ b/tests/ClaudeDo.Worker.Tests/Repositories/ListRepositoryTests.cs @@ -0,0 +1,107 @@ +using ClaudeDo.Data.Models; +using ClaudeDo.Data.Repositories; +using ClaudeDo.Worker.Tests.Infrastructure; + +namespace ClaudeDo.Worker.Tests.Repositories; + +public sealed class ListRepositoryTests : IDisposable +{ + private readonly DbFixture _db = new(); + private readonly ListRepository _lists; + private readonly TagRepository _tags; + + public ListRepositoryTests() + { + _lists = new ListRepository(_db.Factory); + _tags = new TagRepository(_db.Factory); + } + + public void Dispose() => _db.Dispose(); + + [Fact] + public async Task AddAsync_And_GetByIdAsync_Roundtrips() + { + var entity = new ListEntity + { + Id = Guid.NewGuid().ToString(), + Name = "Shopping", + CreatedAt = new DateTime(2026, 1, 1, 0, 0, 0, DateTimeKind.Utc), + WorkingDir = @"C:\Repos\Test", + DefaultCommitType = "feat", + }; + + await _lists.AddAsync(entity); + var loaded = await _lists.GetByIdAsync(entity.Id); + + Assert.NotNull(loaded); + Assert.Equal(entity.Id, loaded.Id); + Assert.Equal(entity.Name, loaded.Name); + Assert.Equal(entity.WorkingDir, loaded.WorkingDir); + Assert.Equal(entity.DefaultCommitType, loaded.DefaultCommitType); + } + + [Fact] + public async Task UpdateAsync_Changes_Fields() + { + var entity = new ListEntity + { + Id = Guid.NewGuid().ToString(), + Name = "Original", + CreatedAt = DateTime.UtcNow, + }; + await _lists.AddAsync(entity); + + entity.Name = "Updated"; + entity.WorkingDir = @"C:\New"; + await _lists.UpdateAsync(entity); + + var loaded = await _lists.GetByIdAsync(entity.Id); + Assert.Equal("Updated", loaded!.Name); + Assert.Equal(@"C:\New", loaded.WorkingDir); + } + + [Fact] + public async Task DeleteAsync_Removes_List() + { + var entity = new ListEntity + { + Id = Guid.NewGuid().ToString(), + Name = "ToDelete", + CreatedAt = DateTime.UtcNow, + }; + await _lists.AddAsync(entity); + await _lists.DeleteAsync(entity.Id); + + var loaded = await _lists.GetByIdAsync(entity.Id); + Assert.Null(loaded); + } + + [Fact] + public async Task GetAllAsync_Returns_All_Lists() + { + var a = new ListEntity { Id = Guid.NewGuid().ToString(), Name = "A", CreatedAt = DateTime.UtcNow.AddMinutes(-1) }; + var b = new ListEntity { Id = Guid.NewGuid().ToString(), Name = "B", CreatedAt = DateTime.UtcNow }; + await _lists.AddAsync(a); + await _lists.AddAsync(b); + + var all = await _lists.GetAllAsync(); + Assert.True(all.Count >= 2); + } + + [Fact] + public async Task TagJunction_AddAndRemove() + { + var listId = Guid.NewGuid().ToString(); + await _lists.AddAsync(new ListEntity { Id = listId, Name = "Tagged", CreatedAt = DateTime.UtcNow }); + var tagId = await _tags.GetOrCreateAsync("agent"); + + await _lists.AddTagAsync(listId, tagId); + var tags = await _lists.GetTagsAsync(listId); + Assert.Single(tags); + Assert.Equal("agent", tags[0].Name); + + await _lists.RemoveTagAsync(listId, tagId); + tags = await _lists.GetTagsAsync(listId); + Assert.Empty(tags); + } +} diff --git a/tests/ClaudeDo.Worker.Tests/Repositories/TaskRepositoryTests.cs b/tests/ClaudeDo.Worker.Tests/Repositories/TaskRepositoryTests.cs new file mode 100644 index 0000000..e99c376 --- /dev/null +++ b/tests/ClaudeDo.Worker.Tests/Repositories/TaskRepositoryTests.cs @@ -0,0 +1,217 @@ +using ClaudeDo.Data.Models; +using ClaudeDo.Data.Repositories; +using ClaudeDo.Worker.Tests.Infrastructure; +using TaskStatus = ClaudeDo.Data.Models.TaskStatus; + +namespace ClaudeDo.Worker.Tests.Repositories; + +public sealed class TaskRepositoryTests : IDisposable +{ + private readonly DbFixture _db = new(); + private readonly TaskRepository _tasks; + private readonly ListRepository _lists; + private readonly TagRepository _tags; + + public TaskRepositoryTests() + { + _tasks = new TaskRepository(_db.Factory); + _lists = new ListRepository(_db.Factory); + _tags = new TagRepository(_db.Factory); + } + + public void Dispose() => _db.Dispose(); + + private async Task CreateListAsync(string? id = null) + { + var listId = id ?? Guid.NewGuid().ToString(); + await _lists.AddAsync(new ListEntity + { + Id = listId, + Name = "Test List", + CreatedAt = DateTime.UtcNow, + }); + return listId; + } + + private TaskEntity MakeTask(string listId, TaskStatus status = TaskStatus.Queued, DateTime? createdAt = null, DateTime? scheduledFor = null) => new() + { + Id = Guid.NewGuid().ToString(), + ListId = listId, + Title = "Test Task", + Description = "A description", + Status = status, + ScheduledFor = scheduledFor, + CreatedAt = createdAt ?? DateTime.UtcNow, + CommitType = "feat", + }; + + [Fact] + public async Task AddAsync_Roundtrips_AllFields() + { + var listId = await CreateListAsync(); + var entity = new TaskEntity + { + Id = Guid.NewGuid().ToString(), + ListId = listId, + Title = "My Task", + Description = "Desc", + Status = TaskStatus.Queued, + ScheduledFor = new DateTime(2026, 1, 1, 12, 0, 0, DateTimeKind.Utc), + Result = "some result", + LogPath = "/tmp/log.ndjson", + CreatedAt = new DateTime(2026, 1, 1, 0, 0, 0, DateTimeKind.Utc), + StartedAt = new DateTime(2026, 1, 1, 1, 0, 0, DateTimeKind.Utc), + FinishedAt = new DateTime(2026, 1, 1, 2, 0, 0, DateTimeKind.Utc), + CommitType = "feat", + }; + + await _tasks.AddAsync(entity); + var loaded = await _tasks.GetByIdAsync(entity.Id); + + Assert.NotNull(loaded); + Assert.Equal(entity.Id, loaded.Id); + Assert.Equal(entity.ListId, loaded.ListId); + Assert.Equal(entity.Title, loaded.Title); + Assert.Equal(entity.Description, loaded.Description); + Assert.Equal(entity.Status, loaded.Status); + Assert.Equal(entity.ScheduledFor!.Value.Date, loaded.ScheduledFor!.Value.Date); + Assert.Equal(entity.Result, loaded.Result); + Assert.Equal(entity.LogPath, loaded.LogPath); + Assert.Equal(entity.CommitType, loaded.CommitType); + } + + [Fact] + public async Task GetNextQueuedAgentTaskAsync_Returns_OldestWithAgentTag_ViaTaskTag() + { + var listId = await CreateListAsync(); + var agentTagId = await _tags.GetOrCreateAsync("agent"); + + var older = MakeTask(listId, createdAt: DateTime.UtcNow.AddMinutes(-10)); + var newer = MakeTask(listId, createdAt: DateTime.UtcNow); + await _tasks.AddAsync(older); + await _tasks.AddAsync(newer); + await _tasks.AddTagAsync(older.Id, agentTagId); + await _tasks.AddTagAsync(newer.Id, agentTagId); + + var picked = await _tasks.GetNextQueuedAgentTaskAsync(DateTime.UtcNow); + Assert.NotNull(picked); + Assert.Equal(older.Id, picked.Id); + } + + [Fact] + public async Task GetNextQueuedAgentTaskAsync_Returns_TaskWithAgentTag_ViaListTag() + { + var listId = await CreateListAsync(); + var agentTagId = await _tags.GetOrCreateAsync("agent"); + await _lists.AddTagAsync(listId, agentTagId); + + var task = MakeTask(listId); + await _tasks.AddAsync(task); + + var picked = await _tasks.GetNextQueuedAgentTaskAsync(DateTime.UtcNow); + Assert.NotNull(picked); + Assert.Equal(task.Id, picked.Id); + } + + [Fact] + public async Task GetNextQueuedAgentTaskAsync_ReturnsNull_WhenNoAgentTag() + { + var listId = await CreateListAsync(); + var task = MakeTask(listId); + await _tasks.AddAsync(task); + + var picked = await _tasks.GetNextQueuedAgentTaskAsync(DateTime.UtcNow); + Assert.Null(picked); + } + + [Fact] + public async Task GetNextQueuedAgentTaskAsync_Skips_FutureScheduledFor() + { + var listId = await CreateListAsync(); + var agentTagId = await _tags.GetOrCreateAsync("agent"); + + var task = MakeTask(listId, scheduledFor: DateTime.UtcNow.AddHours(1)); + await _tasks.AddAsync(task); + await _tasks.AddTagAsync(task.Id, agentTagId); + + var picked = await _tasks.GetNextQueuedAgentTaskAsync(DateTime.UtcNow); + Assert.Null(picked); + } + + [Fact] + public async Task Transitions_MarkRunning_ThenMarkDone() + { + var listId = await CreateListAsync(); + var task = MakeTask(listId); + await _tasks.AddAsync(task); + + var startedAt = DateTime.UtcNow; + await _tasks.MarkRunningAsync(task.Id, startedAt); + var running = await _tasks.GetByIdAsync(task.Id); + Assert.Equal(TaskStatus.Running, running!.Status); + Assert.NotNull(running.StartedAt); + + var finishedAt = DateTime.UtcNow; + await _tasks.MarkDoneAsync(task.Id, finishedAt, "All good"); + var done = await _tasks.GetByIdAsync(task.Id); + Assert.Equal(TaskStatus.Done, done!.Status); + Assert.Equal("All good", done.Result); + Assert.NotNull(done.FinishedAt); + } + + [Fact] + public async Task FlipAllRunningToFailedAsync_FlipsOnlyRunningRows() + { + var listId = await CreateListAsync(); + var running1 = MakeTask(listId, status: TaskStatus.Running); + var running2 = MakeTask(listId, status: TaskStatus.Running); + var queued = MakeTask(listId, status: TaskStatus.Queued); + var done = MakeTask(listId, status: TaskStatus.Done); + + await _tasks.AddAsync(running1); + await _tasks.AddAsync(running2); + await _tasks.AddAsync(queued); + await _tasks.AddAsync(done); + + var flipped = await _tasks.FlipAllRunningToFailedAsync("worker restart"); + + Assert.Equal(2, flipped); + + var r1 = await _tasks.GetByIdAsync(running1.Id); + Assert.Equal(TaskStatus.Failed, r1!.Status); + Assert.StartsWith("[stale] ", r1.Result); + + var r2 = await _tasks.GetByIdAsync(running2.Id); + Assert.Equal(TaskStatus.Failed, r2!.Status); + + var q = await _tasks.GetByIdAsync(queued.Id); + Assert.Equal(TaskStatus.Queued, q!.Status); + + var d = await _tasks.GetByIdAsync(done.Id); + Assert.Equal(TaskStatus.Done, d!.Status); + } + + [Fact] + public async Task GetEffectiveTagsAsync_Returns_Union_Of_ListTags_And_TaskTags() + { + var listId = await CreateListAsync(); + var agentTagId = await _tags.GetOrCreateAsync("agent"); + var manualTagId = await _tags.GetOrCreateAsync("manual"); + var codeTagId = await TagRepository.GetOrCreateAsync(_db.Factory.Open(), "code"); + + await _lists.AddTagAsync(listId, agentTagId); + + var task = MakeTask(listId); + await _tasks.AddAsync(task); + await _tasks.AddTagAsync(task.Id, manualTagId); + await _tasks.AddTagAsync(task.Id, codeTagId); + + var effective = await _tasks.GetEffectiveTagsAsync(task.Id); + var names = effective.Select(t => t.Name).OrderBy(n => n).ToList(); + + Assert.Equal(3, names.Count); + Assert.Contains("agent", names); + Assert.Contains("code", names); + Assert.Contains("manual", names); + } +} diff --git a/tests/ClaudeDo.Worker.Tests/Services/StaleTaskRecoveryTests.cs b/tests/ClaudeDo.Worker.Tests/Services/StaleTaskRecoveryTests.cs new file mode 100644 index 0000000..20d54ca --- /dev/null +++ b/tests/ClaudeDo.Worker.Tests/Services/StaleTaskRecoveryTests.cs @@ -0,0 +1,60 @@ +using ClaudeDo.Data.Models; +using ClaudeDo.Data.Repositories; +using ClaudeDo.Worker.Services; +using ClaudeDo.Worker.Tests.Infrastructure; +using Microsoft.Extensions.Logging.Abstractions; +using TaskStatus = ClaudeDo.Data.Models.TaskStatus; + +namespace ClaudeDo.Worker.Tests.Services; + +public sealed class StaleTaskRecoveryTests : IDisposable +{ + private readonly DbFixture _db = new(); + private readonly TaskRepository _tasks; + private readonly ListRepository _lists; + + public StaleTaskRecoveryTests() + { + _tasks = new TaskRepository(_db.Factory); + _lists = new ListRepository(_db.Factory); + } + + public void Dispose() => _db.Dispose(); + + [Fact] + public async Task StartAsync_Flips_Running_Tasks_To_Failed() + { + var listId = Guid.NewGuid().ToString(); + await _lists.AddAsync(new ListEntity { Id = listId, Name = "Test", CreatedAt = DateTime.UtcNow }); + + var running = new TaskEntity + { + Id = Guid.NewGuid().ToString(), + ListId = listId, + Title = "Running task", + Status = TaskStatus.Running, + CreatedAt = DateTime.UtcNow, + }; + var queued = new TaskEntity + { + Id = Guid.NewGuid().ToString(), + ListId = listId, + Title = "Queued task", + Status = TaskStatus.Queued, + CreatedAt = DateTime.UtcNow, + }; + + await _tasks.AddAsync(running); + await _tasks.AddAsync(queued); + + var recovery = new StaleTaskRecovery(_tasks, NullLogger.Instance); + await recovery.StartAsync(CancellationToken.None); + + var r = await _tasks.GetByIdAsync(running.Id); + Assert.Equal(TaskStatus.Failed, r!.Status); + Assert.StartsWith("[stale] ", r.Result); + + var q = await _tasks.GetByIdAsync(queued.Id); + Assert.Equal(TaskStatus.Queued, q!.Status); + } +}