feat(data,worker): add repositories, stale-task recovery, and test foundation

Data: TagRepository, ListRepository, TaskRepository (incl. queue selection via
effective agent tag + scheduled_for filter), WorktreeRepository. All CRUD via
parameterized SqliteCommand; enums roundtrip as lowercase strings matching the
schema CHECK constraints.

Worker: StaleTaskRecovery IHostedService flips running -> failed on startup and
marks the result column with a [stale] reason. All four repositories registered
as singletons.

Tests: DbFixture with temp-file SQLite + schema bootstrap, covering
TaskRepository (queue pick via list-tag and task-tag, schedule filter,
transitions, stale flip), ListRepository CRUD + junctions, and
StaleTaskRecovery. 14 tests pass.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Mika Kuns
2026-04-13 12:08:06 +02:00
parent f81ef02273
commit 9f51ff0b17
11 changed files with 989 additions and 0 deletions

View File

@@ -0,0 +1,289 @@
using ClaudeDo.Data.Models;
using Microsoft.Data.Sqlite;
using TaskStatus = ClaudeDo.Data.Models.TaskStatus;
namespace ClaudeDo.Data.Repositories;
public sealed class TaskRepository
{
private readonly SqliteConnectionFactory _factory;
public TaskRepository(SqliteConnectionFactory factory) => _factory = factory;
#region Status mapping
private static string ToDb(TaskStatus s) => s switch
{
TaskStatus.Manual => "manual",
TaskStatus.Queued => "queued",
TaskStatus.Running => "running",
TaskStatus.Done => "done",
TaskStatus.Failed => "failed",
_ => throw new ArgumentOutOfRangeException(nameof(s)),
};
private static TaskStatus FromDb(string s) => s switch
{
"manual" => TaskStatus.Manual,
"queued" => TaskStatus.Queued,
"running" => TaskStatus.Running,
"done" => TaskStatus.Done,
"failed" => TaskStatus.Failed,
_ => throw new ArgumentOutOfRangeException(nameof(s)),
};
#endregion
#region CRUD
public async Task AddAsync(TaskEntity entity, CancellationToken ct = default)
{
await using var conn = _factory.Open();
await using var cmd = conn.CreateCommand();
cmd.CommandText = """
INSERT INTO tasks (id, list_id, title, description, status, scheduled_for,
result, log_path, created_at, started_at, finished_at, commit_type)
VALUES (@id, @list_id, @title, @description, @status, @scheduled_for,
@result, @log_path, @created_at, @started_at, @finished_at, @commit_type)
""";
BindTask(cmd, entity);
await cmd.ExecuteNonQueryAsync(ct);
}
public async Task UpdateAsync(TaskEntity entity, CancellationToken ct = default)
{
await using var conn = _factory.Open();
await using var cmd = conn.CreateCommand();
cmd.CommandText = """
UPDATE tasks SET list_id = @list_id, title = @title, description = @description,
status = @status, scheduled_for = @scheduled_for, result = @result,
log_path = @log_path, started_at = @started_at,
finished_at = @finished_at, commit_type = @commit_type
WHERE id = @id
""";
BindTask(cmd, entity);
await cmd.ExecuteNonQueryAsync(ct);
}
public async Task DeleteAsync(string taskId, CancellationToken ct = default)
{
await using var conn = _factory.Open();
await using var cmd = conn.CreateCommand();
cmd.CommandText = "DELETE FROM tasks WHERE id = @id";
cmd.Parameters.AddWithValue("@id", taskId);
await cmd.ExecuteNonQueryAsync(ct);
}
public async Task<TaskEntity?> GetByIdAsync(string taskId, CancellationToken ct = default)
{
await using var conn = _factory.Open();
await using var cmd = conn.CreateCommand();
cmd.CommandText = "SELECT id, list_id, title, description, status, scheduled_for, result, log_path, created_at, started_at, finished_at, commit_type FROM tasks WHERE id = @id";
cmd.Parameters.AddWithValue("@id", taskId);
await using var reader = await cmd.ExecuteReaderAsync(ct);
if (!await reader.ReadAsync(ct)) return null;
return ReadTask(reader);
}
public async Task<List<TaskEntity>> GetByListAsync(string listId, CancellationToken ct = default)
{
await using var conn = _factory.Open();
await using var cmd = conn.CreateCommand();
cmd.CommandText = "SELECT id, list_id, title, description, status, scheduled_for, result, log_path, created_at, started_at, finished_at, commit_type FROM tasks WHERE list_id = @list_id ORDER BY created_at";
cmd.Parameters.AddWithValue("@list_id", listId);
await using var reader = await cmd.ExecuteReaderAsync(ct);
var result = new List<TaskEntity>();
while (await reader.ReadAsync(ct))
result.Add(ReadTask(reader));
return result;
}
#endregion
#region Tag junction
public async Task<List<TagEntity>> GetTagsAsync(string taskId, CancellationToken ct = default)
{
await using var conn = _factory.Open();
await using var cmd = conn.CreateCommand();
cmd.CommandText = """
SELECT t.id, t.name FROM tags t
JOIN task_tags tt ON tt.tag_id = t.id
WHERE tt.task_id = @task_id
""";
cmd.Parameters.AddWithValue("@task_id", taskId);
await using var reader = await cmd.ExecuteReaderAsync(ct);
var result = new List<TagEntity>();
while (await reader.ReadAsync(ct))
result.Add(new TagEntity { Id = reader.GetInt64(0), Name = reader.GetString(1) });
return result;
}
public async Task AddTagAsync(string taskId, long tagId, CancellationToken ct = default)
{
await using var conn = _factory.Open();
await using var cmd = conn.CreateCommand();
cmd.CommandText = "INSERT OR IGNORE INTO task_tags (task_id, tag_id) VALUES (@task_id, @tag_id)";
cmd.Parameters.AddWithValue("@task_id", taskId);
cmd.Parameters.AddWithValue("@tag_id", tagId);
await cmd.ExecuteNonQueryAsync(ct);
}
public async Task RemoveTagAsync(string taskId, long tagId, CancellationToken ct = default)
{
await using var conn = _factory.Open();
await using var cmd = conn.CreateCommand();
cmd.CommandText = "DELETE FROM task_tags WHERE task_id = @task_id AND tag_id = @tag_id";
cmd.Parameters.AddWithValue("@task_id", taskId);
cmd.Parameters.AddWithValue("@tag_id", tagId);
await cmd.ExecuteNonQueryAsync(ct);
}
public async Task<List<TagEntity>> GetEffectiveTagsAsync(string taskId, CancellationToken ct = default)
{
await using var conn = _factory.Open();
await using var cmd = conn.CreateCommand();
cmd.CommandText = """
SELECT DISTINCT t.id, t.name FROM tags t
WHERE t.id IN (
SELECT tag_id FROM task_tags WHERE task_id = @task_id
UNION
SELECT lt.tag_id FROM list_tags lt
JOIN tasks tk ON tk.list_id = lt.list_id
WHERE tk.id = @task_id
)
""";
cmd.Parameters.AddWithValue("@task_id", taskId);
await using var reader = await cmd.ExecuteReaderAsync(ct);
var result = new List<TagEntity>();
while (await reader.ReadAsync(ct))
result.Add(new TagEntity { Id = reader.GetInt64(0), Name = reader.GetString(1) });
return result;
}
#endregion
#region Queue selection
public async Task<TaskEntity?> GetNextQueuedAgentTaskAsync(DateTime now, CancellationToken ct = default)
{
await using var conn = _factory.Open();
await using var cmd = conn.CreateCommand();
cmd.CommandText = """
SELECT t.id, t.list_id, t.title, t.description, t.status, t.scheduled_for,
t.result, t.log_path, t.created_at, t.started_at, t.finished_at, t.commit_type
FROM tasks t
WHERE t.status = 'queued'
AND (t.scheduled_for IS NULL OR t.scheduled_for <= @now)
AND EXISTS (
SELECT 1 FROM task_tags tt
JOIN tags tg ON tg.id = tt.tag_id
WHERE tt.task_id = t.id AND tg.name = 'agent'
UNION
SELECT 1 FROM list_tags lt
JOIN tags tg ON tg.id = lt.tag_id
WHERE lt.list_id = t.list_id AND tg.name = 'agent'
)
ORDER BY t.created_at ASC
LIMIT 1
""";
cmd.Parameters.AddWithValue("@now", now.ToString("o"));
await using var reader = await cmd.ExecuteReaderAsync(ct);
if (!await reader.ReadAsync(ct)) return null;
return ReadTask(reader);
}
#endregion
#region Transitions
public async Task MarkRunningAsync(string taskId, DateTime startedAt, CancellationToken ct = default)
{
await using var conn = _factory.Open();
await using var cmd = conn.CreateCommand();
cmd.CommandText = "UPDATE tasks SET status = 'running', started_at = @started_at WHERE id = @id";
cmd.Parameters.AddWithValue("@id", taskId);
cmd.Parameters.AddWithValue("@started_at", startedAt.ToString("o"));
await cmd.ExecuteNonQueryAsync(ct);
}
public async Task MarkDoneAsync(string taskId, DateTime finishedAt, string? result, CancellationToken ct = default)
{
await using var conn = _factory.Open();
await using var cmd = conn.CreateCommand();
cmd.CommandText = "UPDATE tasks SET status = 'done', finished_at = @finished_at, result = @result WHERE id = @id";
cmd.Parameters.AddWithValue("@id", taskId);
cmd.Parameters.AddWithValue("@finished_at", finishedAt.ToString("o"));
cmd.Parameters.AddWithValue("@result", (object?)result ?? DBNull.Value);
await cmd.ExecuteNonQueryAsync(ct);
}
public async Task MarkFailedAsync(string taskId, DateTime finishedAt, string? errorMarkdown, CancellationToken ct = default)
{
await using var conn = _factory.Open();
await using var cmd = conn.CreateCommand();
cmd.CommandText = "UPDATE tasks SET status = 'failed', finished_at = @finished_at, result = @result WHERE id = @id";
cmd.Parameters.AddWithValue("@id", taskId);
cmd.Parameters.AddWithValue("@finished_at", finishedAt.ToString("o"));
cmd.Parameters.AddWithValue("@result", (object?)errorMarkdown ?? DBNull.Value);
await cmd.ExecuteNonQueryAsync(ct);
}
public async Task<int> FlipAllRunningToFailedAsync(string reason, CancellationToken ct = default)
{
await using var conn = _factory.Open();
await using var cmd = conn.CreateCommand();
cmd.CommandText = """
UPDATE tasks SET status = 'failed',
finished_at = @now,
result = '[stale] ' || @reason
WHERE status = 'running'
""";
cmd.Parameters.AddWithValue("@now", DateTime.UtcNow.ToString("o"));
cmd.Parameters.AddWithValue("@reason", reason);
return await cmd.ExecuteNonQueryAsync(ct);
}
#endregion
#region Helpers
private static void BindTask(SqliteCommand cmd, TaskEntity e)
{
cmd.Parameters.AddWithValue("@id", e.Id);
cmd.Parameters.AddWithValue("@list_id", e.ListId);
cmd.Parameters.AddWithValue("@title", e.Title);
cmd.Parameters.AddWithValue("@description", (object?)e.Description ?? DBNull.Value);
cmd.Parameters.AddWithValue("@status", ToDb(e.Status));
cmd.Parameters.AddWithValue("@scheduled_for", e.ScheduledFor.HasValue ? e.ScheduledFor.Value.ToString("o") : DBNull.Value);
cmd.Parameters.AddWithValue("@result", (object?)e.Result ?? DBNull.Value);
cmd.Parameters.AddWithValue("@log_path", (object?)e.LogPath ?? DBNull.Value);
cmd.Parameters.AddWithValue("@created_at", e.CreatedAt.ToString("o"));
cmd.Parameters.AddWithValue("@started_at", e.StartedAt.HasValue ? e.StartedAt.Value.ToString("o") : DBNull.Value);
cmd.Parameters.AddWithValue("@finished_at", e.FinishedAt.HasValue ? e.FinishedAt.Value.ToString("o") : DBNull.Value);
cmd.Parameters.AddWithValue("@commit_type", e.CommitType);
}
private static TaskEntity ReadTask(SqliteDataReader r) => new()
{
Id = r.GetString(0),
ListId = r.GetString(1),
Title = r.GetString(2),
Description = r.IsDBNull(3) ? null : r.GetString(3),
Status = FromDb(r.GetString(4)),
ScheduledFor = r.IsDBNull(5) ? null : DateTime.Parse(r.GetString(5)),
Result = r.IsDBNull(6) ? null : r.GetString(6),
LogPath = r.IsDBNull(7) ? null : r.GetString(7),
CreatedAt = DateTime.Parse(r.GetString(8)),
StartedAt = r.IsDBNull(9) ? null : DateTime.Parse(r.GetString(9)),
FinishedAt = r.IsDBNull(10) ? null : DateTime.Parse(r.GetString(10)),
CommitType = r.GetString(11),
};
#endregion
}