feat(worker): Online Inbox sync engine (Phase 1)

Optional, opt-in (online_inbox.enabled, default false → zero network).
Worker-side reconcile loop: pull web-created tasks down as Idle, push the
list catalog and the Idle backlog mirror up. Auth behind IOnlineAuthProvider
(StaticTokenAuthProvider default; ZitadelAuthProvider stubbed for Phase 2).
DPAPI refresh-token store. 35 tests, no real network/Zitadel/Claude.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
mika kuns
2026-06-10 09:55:20 +02:00
parent 8cbe1adb32
commit 1ac9ced0bd
22 changed files with 1196 additions and 0 deletions

View File

@@ -87,6 +87,22 @@ public sealed class TaskRepository
.ToListAsync(ct);
}
/// <summary>
/// Returns all tasks that qualify as "real" Idle backlog items for online mirroring:
/// Status==Idle, no parent, PlanningPhase==None, not blocked.
/// </summary>
public async Task<List<TaskEntity>> GetAllIdleBacklogAsync(CancellationToken ct = default)
{
return await _context.Tasks
.AsNoTracking()
.Where(t => t.Status == TaskStatus.Idle
&& t.ParentTaskId == null
&& t.PlanningPhase == PlanningPhase.None
&& t.BlockedByTaskId == null)
.OrderBy(t => t.SortOrder).ThenBy(t => t.CreatedAt)
.ToListAsync(ct);
}
#endregion
#region Status transitions

View File

@@ -19,6 +19,7 @@ Worker/
Hub/ — WorkerHub, HubBroadcaster
Report/ — ClaudeHistoryReader, WeekReportPromptBuilder, WeekReportService; interfaces in Report/Interfaces/
Prime/ — daily-prep ("Prime Claude"): PrimeScheduler (BackgroundService), PrimeRunner (runs the daily prep), DailyPrepPrompt (fixed prompt + CLI args + LogPath() helper), NextDueCalculator, PrimeScheduleSignal; interfaces in Prime/Interfaces/ (IPrimeRunner, IPrimeClock, IPrimeScheduleSignal, IPrimeBroadcaster)
Online/ — optional Online Inbox sync: OnlineInboxConfig (config record), Dtos (RemoteList/RemoteTask/MirrorTask), IOnlineInboxApi, OnlineInboxApiClient (typed HttpClient, bearer auth, HTTPS guard), OnlineTokenStore (DPAPI refresh-token store, Windows-only), StaticTokenAuthProvider (default/test IOnlineAuthProvider), ZitadelAuthProvider (stub — TODO(online-inbox) Phase 2), OnlineSyncService (BackgroundService: reconcile loop), OnlineBacklog (Idle-backlog filter/query); interface in Online/Interfaces/ (IOnlineAuthProvider)
```
Interfaces (e.g. `IQueueWaker`, `IPrimeClock`, `ITaskStateService`) live in an `Interfaces/` subfolder within their area; the namespace stays the area namespace.
@@ -165,6 +166,12 @@ Loaded from `~/.todo-app/worker.config.json`:
- `queue_backstop_interval_ms` (default 30000)
- `signalr_port` (default 47821)
- `claude_bin` (path to claude CLI)
- `online_inbox` — Online Inbox config (default: `enabled=false`, zero network when disabled):
- `enabled` (bool, default false) — when false the entire `Online/` stack is not registered
- `api_base_url` (string) — must be HTTPS or loopback; validated at startup when enabled
- `poll_interval_seconds` (int, default 60)
- `zitadel.authority`, `zitadel.client_id`, `zitadel.scopes` (Phase 2; not used until ZitadelAuthProvider is wired)
- The refresh token is NOT in this file — stored encrypted via DPAPI at `~/.todo-app/online-inbox.token`
Per-list config (`list_config` in DB) provides defaults for `model`, `system_prompt`, `agent_path`; tasks can override each individually. Task-generating MCP tools (`AddTask`, planning `CreateChildTask`, `SuggestImprovement`) accept an optional `model` (alias-validated via `ModelRegistry.NormalizeAlias` — `haiku`/`sonnet`/`opus`, blank = inherit) so Claude assigns the cheapest capable model at creation time; the planning/system/improvement prompts instruct it to do so (`ModelRegistry.ByCostAscending` = the cost order).

View File

@@ -14,6 +14,7 @@
<PackageReference Include="Serilog.Sinks.File" Version="6.0.0" />
<PackageReference Include="ModelContextProtocol" Version="1.2.0" />
<PackageReference Include="ModelContextProtocol.AspNetCore" Version="1.2.0" />
<PackageReference Include="System.Security.Cryptography.ProtectedData" Version="8.0.0" />
</ItemGroup>
<ItemGroup>

View File

@@ -1,6 +1,7 @@
using System.Text.Json;
using System.Text.Json.Serialization;
using ClaudeDo.Data;
using ClaudeDo.Worker.Online;
namespace ClaudeDo.Worker.Config;
@@ -39,6 +40,9 @@ public sealed class WorkerConfig
[JsonPropertyName("external_mcp_api_key")]
public string? ExternalMcpApiKey { get; set; }
[JsonPropertyName("online_inbox")]
public OnlineInboxConfig OnlineInbox { get; set; } = new();
public static string DefaultConfigPath =>
Path.Combine(Paths.AppDataRoot(), "worker.config.json");

View File

@@ -0,0 +1,20 @@
using System.Text.Json.Serialization;
namespace ClaudeDo.Worker.Online;
public sealed record RemoteList(
[property: JsonPropertyName("id")] string Id,
[property: JsonPropertyName("name")] string Name);
public sealed record RemoteTask(
[property: JsonPropertyName("id")] string Id,
[property: JsonPropertyName("listId")] string ListId,
[property: JsonPropertyName("title")] string Title,
[property: JsonPropertyName("description")] string? Description,
[property: JsonPropertyName("createdAt")] DateTimeOffset CreatedAt);
public sealed record MirrorTask(
[property: JsonPropertyName("id")] string Id,
[property: JsonPropertyName("listId")] string ListId,
[property: JsonPropertyName("title")] string Title,
[property: JsonPropertyName("description")] string? Description);

View File

@@ -0,0 +1,9 @@
namespace ClaudeDo.Worker.Online;
public interface IOnlineInboxApi
{
Task PutListsAsync(IReadOnlyList<RemoteList> lists, CancellationToken ct = default);
Task<IReadOnlyList<RemoteTask>> GetUnimportedTasksAsync(CancellationToken ct = default);
Task MarkImportedAsync(string id, CancellationToken ct = default);
Task PutMirrorAsync(IReadOnlyList<MirrorTask> tasks, CancellationToken ct = default);
}

View File

@@ -0,0 +1,6 @@
namespace ClaudeDo.Worker.Online.Interfaces;
public interface IOnlineAuthProvider
{
Task<string?> GetAccessTokenAsync(CancellationToken ct = default);
}

View File

@@ -0,0 +1,27 @@
using ClaudeDo.Data.Models;
using ClaudeDo.Data.Repositories;
using Microsoft.EntityFrameworkCore;
using TaskStatus = ClaudeDo.Data.Models.TaskStatus;
namespace ClaudeDo.Worker.Online;
public static class OnlineBacklog
{
/// <summary>
/// Returns the current Idle backlog: Status==Idle, no parent, PlanningPhase==None, not blocked.
/// These are the tasks mirrored to the online store (§2 of the contract).
/// </summary>
public static async Task<List<MirrorTask>> CurrentAsync(
TaskRepository tasks,
CancellationToken ct = default)
{
var all = await tasks.GetAllIdleBacklogAsync(ct);
return all.Select(t => new MirrorTask(t.Id, t.ListId, t.Title, t.Description)).ToList();
}
internal static bool IsBacklogItem(TaskEntity t) =>
t.Status == TaskStatus.Idle
&& t.ParentTaskId == null
&& t.PlanningPhase == PlanningPhase.None
&& t.BlockedByTaskId == null;
}

View File

@@ -0,0 +1,84 @@
using System.Net.Http.Json;
using ClaudeDo.Worker.Online.Interfaces;
namespace ClaudeDo.Worker.Online;
public sealed class OnlineInboxApiClient : IOnlineInboxApi
{
private readonly HttpClient _http;
private readonly IOnlineAuthProvider _auth;
public OnlineInboxApiClient(HttpClient http, IOnlineAuthProvider auth)
{
_http = http;
_auth = auth;
}
/// <summary>
/// Validates that <paramref name="baseUrl"/> is HTTPS or a loopback address.
/// Throws <see cref="InvalidOperationException"/> for non-HTTPS non-loopback URLs.
/// </summary>
public static void ValidateBaseUrl(string baseUrl)
{
if (string.IsNullOrWhiteSpace(baseUrl))
throw new InvalidOperationException("online_inbox.api_base_url is not configured.");
var uri = new Uri(baseUrl, UriKind.Absolute);
if (uri.Scheme != "https" && !uri.IsLoopback)
throw new InvalidOperationException(
$"online_inbox.api_base_url must be HTTPS or loopback. Got: {baseUrl}");
}
public async Task PutListsAsync(IReadOnlyList<RemoteList> lists, CancellationToken ct = default)
{
using var req = await BuildAsync(HttpMethod.Put, "/lists", lists, ct);
using var resp = await _http.SendAsync(req, ct);
await EnsureSuccessAsync(resp, ct);
}
public async Task<IReadOnlyList<RemoteTask>> GetUnimportedTasksAsync(CancellationToken ct = default)
{
using var req = await BuildAsync(HttpMethod.Get, "/tasks?imported=false", null, ct);
using var resp = await _http.SendAsync(req, ct);
await EnsureSuccessAsync(resp, ct);
var result = await resp.Content.ReadFromJsonAsync<List<RemoteTask>>(ct);
return result ?? [];
}
public async Task MarkImportedAsync(string id, CancellationToken ct = default)
{
using var req = await BuildAsync(HttpMethod.Post, $"/tasks/{Uri.EscapeDataString(id)}/imported", null, ct);
using var resp = await _http.SendAsync(req, ct);
await EnsureSuccessAsync(resp, ct);
}
public async Task PutMirrorAsync(IReadOnlyList<MirrorTask> tasks, CancellationToken ct = default)
{
using var req = await BuildAsync(HttpMethod.Put, "/tasks/mirror", tasks, ct);
using var resp = await _http.SendAsync(req, ct);
await EnsureSuccessAsync(resp, ct);
}
private async Task<HttpRequestMessage> BuildAsync(
HttpMethod method,
string path,
object? body,
CancellationToken ct)
{
var token = await _auth.GetAccessTokenAsync(ct);
var req = new HttpRequestMessage(method, path);
if (token is not null)
req.Headers.Authorization = new System.Net.Http.Headers.AuthenticationHeaderValue("Bearer", token);
if (body is not null)
req.Content = JsonContent.Create(body);
return req;
}
private static async Task EnsureSuccessAsync(HttpResponseMessage resp, CancellationToken ct)
{
if (resp.IsSuccessStatusCode) return;
var body = await resp.Content.ReadAsStringAsync(ct);
throw new OnlineInboxException((int)resp.StatusCode,
$"Online Inbox API error {(int)resp.StatusCode}: {body}");
}
}

View File

@@ -0,0 +1,30 @@
using System.Text.Json.Serialization;
namespace ClaudeDo.Worker.Online;
public sealed class ZitadelClientConfig
{
[JsonPropertyName("authority")]
public string Authority { get; set; } = "";
[JsonPropertyName("client_id")]
public string ClientId { get; set; } = "";
[JsonPropertyName("scopes")]
public string Scopes { get; set; } = "openid offline_access";
}
public sealed class OnlineInboxConfig
{
[JsonPropertyName("enabled")]
public bool Enabled { get; set; } = false;
[JsonPropertyName("api_base_url")]
public string ApiBaseUrl { get; set; } = "";
[JsonPropertyName("poll_interval_seconds")]
public int PollIntervalSeconds { get; set; } = 60;
[JsonPropertyName("zitadel")]
public ZitadelClientConfig Zitadel { get; set; } = new();
}

View File

@@ -0,0 +1,12 @@
namespace ClaudeDo.Worker.Online;
public sealed class OnlineInboxException : Exception
{
public int StatusCode { get; }
public OnlineInboxException(int statusCode, string message)
: base(message)
{
StatusCode = statusCode;
}
}

View File

@@ -0,0 +1,121 @@
using ClaudeDo.Data;
using ClaudeDo.Data.Models;
using ClaudeDo.Data.Repositories;
using ClaudeDo.Worker.Online.Interfaces;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Hosting;
using TaskStatus = ClaudeDo.Data.Models.TaskStatus;
namespace ClaudeDo.Worker.Online;
public sealed class OnlineSyncService : BackgroundService
{
private readonly IDbContextFactory<ClaudeDoDbContext> _dbFactory;
private readonly IOnlineInboxApi _api;
private readonly IOnlineAuthProvider _auth;
private readonly OnlineInboxConfig _config;
private readonly ILogger<OnlineSyncService> _logger;
public OnlineSyncService(
IDbContextFactory<ClaudeDoDbContext> dbFactory,
IOnlineInboxApi api,
IOnlineAuthProvider auth,
OnlineInboxConfig config,
ILogger<OnlineSyncService> logger)
{
_dbFactory = dbFactory;
_api = api;
_auth = auth;
_config = config;
_logger = logger;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
try
{
await TickAsync(stoppingToken);
}
catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
{
return;
}
catch (Exception ex)
{
_logger.LogWarning(ex, "OnlineSyncService cycle failed; backing off to next interval");
}
try
{
await Task.Delay(TimeSpan.FromSeconds(_config.PollIntervalSeconds), stoppingToken);
}
catch (OperationCanceledException)
{
return;
}
}
}
internal async Task TickAsync(CancellationToken ct)
{
var token = await _auth.GetAccessTokenAsync(ct);
if (token is null)
{
_logger.LogDebug("OnlineSyncService: no access token, skipping cycle");
return;
}
await using var ctx = await _dbFactory.CreateDbContextAsync(ct);
var tasks = new TaskRepository(ctx);
var lists = new ListRepository(ctx);
// Step 1: pull unimported tasks, import them locally, mark each imported.
var unimported = await _api.GetUnimportedTasksAsync(ct);
foreach (var remote in unimported)
{
var existing = await tasks.GetByIdAsync(remote.Id, ct);
if (existing is not null)
{
// Already imported locally; just mark it on the server.
await _api.MarkImportedAsync(remote.Id, ct);
continue;
}
var list = await lists.GetByIdAsync(remote.ListId, ct);
if (list is null)
{
_logger.LogWarning(
"OnlineSyncService: remote task {Id} references unknown list {ListId}; skipping",
remote.Id, remote.ListId);
continue;
}
var entity = new TaskEntity
{
Id = remote.Id,
ListId = remote.ListId,
Title = remote.Title,
Description = remote.Description,
Status = TaskStatus.Idle,
CreatedBy = "online",
CreatedAt = remote.CreatedAt.UtcDateTime,
CommitType = CommitTypeRegistry.DefaultType,
};
await tasks.AddAsync(entity, ct);
await _api.MarkImportedAsync(remote.Id, ct);
_logger.LogInformation("OnlineSyncService: imported task {Id} ('{Title}')", remote.Id, remote.Title);
}
// Step 2: push full list catalog.
var allLists = await lists.GetAllAsync(ct);
var remoteLists = allLists.Select(l => new RemoteList(l.Id, l.Name)).ToList();
await _api.PutListsAsync(remoteLists, ct);
// Step 3: push current Idle backlog mirror.
var mirror = await OnlineBacklog.CurrentAsync(tasks, ct);
await _api.PutMirrorAsync(mirror, ct);
}
}

View File

@@ -0,0 +1,54 @@
using System.Runtime.Versioning;
using System.Security.Cryptography;
using System.Text;
using ClaudeDo.Data;
namespace ClaudeDo.Worker.Online;
/// <summary>
/// Persists the Zitadel refresh token encrypted with DPAPI (CurrentUser scope).
/// Windows-only; the file lives at ~/.todo-app/online-inbox.token.
/// </summary>
[SupportedOSPlatform("windows")]
public sealed class OnlineTokenStore
{
private readonly string _tokenPath;
public OnlineTokenStore()
: this(Path.Combine(Paths.AppDataRoot(), "online-inbox.token")) { }
internal OnlineTokenStore(string tokenPath)
{
_tokenPath = tokenPath;
}
public void Save(string refreshToken)
{
ArgumentException.ThrowIfNullOrEmpty(refreshToken);
var plain = Encoding.UTF8.GetBytes(refreshToken);
var cipher = ProtectedData.Protect(plain, null, DataProtectionScope.CurrentUser);
Directory.CreateDirectory(Path.GetDirectoryName(_tokenPath)!);
File.WriteAllBytes(_tokenPath, cipher);
}
public string? Read()
{
if (!File.Exists(_tokenPath)) return null;
try
{
var cipher = File.ReadAllBytes(_tokenPath);
var plain = ProtectedData.Unprotect(cipher, null, DataProtectionScope.CurrentUser);
return Encoding.UTF8.GetString(plain);
}
catch
{
return null;
}
}
public void Clear()
{
if (File.Exists(_tokenPath))
File.Delete(_tokenPath);
}
}

View File

@@ -0,0 +1,21 @@
using ClaudeDo.Worker.Online.Interfaces;
namespace ClaudeDo.Worker.Online;
/// <summary>
/// Simple <see cref="IOnlineAuthProvider"/> that returns a fixed token supplied at construction.
/// Used as the default DI registration until <c>ZitadelAuthProvider</c> is wired (Phase 2).
/// Also serves as the test double.
/// </summary>
public sealed class StaticTokenAuthProvider : IOnlineAuthProvider
{
private readonly string? _token;
public StaticTokenAuthProvider(string? token = null)
{
_token = token;
}
public Task<string?> GetAccessTokenAsync(CancellationToken ct = default)
=> Task.FromResult(_token);
}

View File

@@ -0,0 +1,13 @@
using ClaudeDo.Worker.Online.Interfaces;
namespace ClaudeDo.Worker.Online;
// TODO(online-inbox): wire the Zitadel package once client config is known (Phase 2).
// Replace this stub with a real implementation that uses OnlineTokenStore to read the
// refresh token and exchanges it for an access token via the Zitadel OIDC endpoint,
// caching the access token until near expiry.
public sealed class ZitadelAuthProvider : IOnlineAuthProvider
{
public Task<string?> GetAccessTokenAsync(CancellationToken ct = default)
=> Task.FromResult<string?>(null);
}

View File

@@ -11,6 +11,8 @@ using ClaudeDo.Worker.Planning;
using ClaudeDo.Worker.Queue;
using ClaudeDo.Worker.Runner;
using ClaudeDo.Worker.State;
using ClaudeDo.Worker.Online;
using ClaudeDo.Worker.Online.Interfaces;
using ClaudeDo.Worker.Prime;
using ClaudeDo.Worker.Refine;
using ClaudeDo.Worker.Report;
@@ -149,6 +151,22 @@ builder.Services.AddMcpServer()
.WithTools<PlanningMcpService>()
.WithTools<TaskRunMcpService>();
// Online Inbox — registered only when enabled.
if (cfg.OnlineInbox.Enabled)
{
OnlineInboxApiClient.ValidateBaseUrl(cfg.OnlineInbox.ApiBaseUrl);
builder.Services.AddSingleton(cfg.OnlineInbox);
builder.Services.AddSingleton<IOnlineAuthProvider, StaticTokenAuthProvider>();
#pragma warning disable CA1416 // ClaudeDo.Worker is Windows-only; DPAPI is fine here.
builder.Services.AddSingleton<OnlineTokenStore>();
#pragma warning restore CA1416
builder.Services.AddHttpClient<IOnlineInboxApi, OnlineInboxApiClient>(client =>
{
client.BaseAddress = new Uri(cfg.OnlineInbox.ApiBaseUrl.TrimEnd('/') + "/");
});
builder.Services.AddHostedService<OnlineSyncService>();
}
// Loopback-only bind. Firewall is irrelevant for 127.0.0.1.
builder.WebHost.UseUrls($"http://127.0.0.1:{cfg.SignalRPort}");