diff --git a/src/ClaudeDo.Worker/Hub/WorkerHub.cs b/src/ClaudeDo.Worker/Hub/WorkerHub.cs
index f040fb8..13720df 100644
--- a/src/ClaudeDo.Worker/Hub/WorkerHub.cs
+++ b/src/ClaudeDo.Worker/Hub/WorkerHub.cs
@@ -6,6 +6,7 @@ using ClaudeDo.Data.Repositories;
 using ClaudeDo.Worker.Agents;
 using ClaudeDo.Worker.Config;
 using ClaudeDo.Worker.Lifecycle;
+using ClaudeDo.Worker.Logging;
 using ClaudeDo.Worker.Online;
 using ClaudeDo.Worker.Planning;
 using ClaudeDo.Worker.Prime;
@@ -113,6 +114,7 @@ public sealed class WorkerHub : Microsoft.AspNetCore.SignalR.Hub
     private readonly WorkerConfig _cfg;
     private readonly OnlineInboxConfig _onlineInboxConfig;
     private readonly OnlineTokenStore _onlineTokenStore;
+    private readonly LogRingBuffer? _logBuffer;
 
     public WorkerHub(
         QueueService queue,
@@ -136,7 +138,8 @@ public sealed class WorkerHub : Microsoft.AspNetCore.SignalR.Hub
         IRefineRunner refineRunner,
         WorkerConfig cfg,
         OnlineInboxConfig onlineInboxConfig,
-        OnlineTokenStore onlineTokenStore)
+        OnlineTokenStore onlineTokenStore,
+        LogRingBuffer? logBuffer = null)
     {
         _queue = queue;
         _waker = waker;
@@ -160,8 +163,14 @@ public sealed class WorkerHub : Microsoft.AspNetCore.SignalR.Hub
         _cfg = cfg;
         _onlineInboxConfig = onlineInboxConfig;
         _onlineTokenStore = onlineTokenStore;
+        _logBuffer = logBuffer;
     }
 
+    /// <summary>Recent worker log records (last 30 min, all levels) for the Log Visualizer overlay.</summary>
+    /// <returns>Buffered records, oldest first; an empty list when no <see cref="LogRingBuffer"/> was injected.</returns>
+    public IReadOnlyList<WorkerLogRecord> GetRecentLogs() =>
+        _logBuffer?.Snapshot() ?? Array.Empty<WorkerLogRecord>();
+
     // Maps the two exceptions service methods throw into client-facing HubExceptions:
     // KeyNotFoundException -> notFoundMessage, InvalidOperationException -> its own message.
     private static async Task HubGuard(Func action, string notFoundMessage = "task not found")
diff --git a/src/ClaudeDo.Worker/Logging/BroadcastLogSink.cs b/src/ClaudeDo.Worker/Logging/BroadcastLogSink.cs
new file mode 100644
index 0000000..dcb46e3
--- /dev/null
+++ b/src/ClaudeDo.Worker/Logging/BroadcastLogSink.cs
@@ -0,0 +1,124 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Threading.Tasks;
+using ClaudeDo.Data.Models;
+using Serilog.Core;
+using Serilog.Events;
+
+namespace ClaudeDo.Worker.Logging;
+
+/// <summary>
+/// Serilog sink that (a) buffers every event (all levels) into <see cref="LogRingBuffer"/>
+/// for the Log Visualizer overlay, and (b) broadcasts Warn/Error events to the UI footer
+/// via the attached delegate — deduped within a rate-limit window, with a loop guard so
+/// SignalR's own log output cannot feed back into another broadcast.
+/// </summary>
+public sealed class BroadcastLogSink : ILogEventSink
+{
+    /// <summary>Minimum gap between two broadcasts of the same rendered message.</summary>
+    private static readonly TimeSpan RateLimitWindow = TimeSpan.FromSeconds(120);
+
+    /// <summary>Dedup-table size above which expired entries are swept.</summary>
+    private const int PruneThreshold = 256;
+
+    private readonly LogRingBuffer _buffer;
+    private readonly Func<DateTime> _utcNow;
+    private readonly Dictionary<string, DateTime> _lastBroadcast = new();
+    private readonly object _gate = new();
+
+    // volatile: Attach publishes the delegate from one thread; Emit may read it from another.
+    private volatile Func<string, WorkerLogLevel, DateTime, Task>? _broadcast;
+
+    /// <summary>Creates a sink that appends every event to <paramref name="buffer"/>.</summary>
+    /// <param name="buffer">Ring buffer receiving all events, regardless of level.</param>
+    /// <param name="utcNow">Clock for the rate limiter; defaults to <see cref="DateTime.UtcNow"/>.</param>
+    /// <exception cref="ArgumentNullException"><paramref name="buffer"/> is null.</exception>
+    public BroadcastLogSink(LogRingBuffer buffer, Func<DateTime>? utcNow = null)
+    {
+        _buffer = buffer ?? throw new ArgumentNullException(nameof(buffer));
+        _utcNow = utcNow ?? (() => DateTime.UtcNow);
+    }
+
+    /// <summary>Wired post-build, once the SignalR hub context exists.</summary>
+    /// <param name="broadcast">Called with (message, level, UTC timestamp) for each event that passes the filters.</param>
+    public void Attach(Func<string, WorkerLogLevel, DateTime, Task> broadcast) => _broadcast = broadcast;
+
+    /// <summary>
+    /// Buffers the event, then broadcasts it when it is Warn/Error, does not come from
+    /// SignalR plumbing, a delegate is attached, and the same message is not rate-limited.
+    /// </summary>
+    public void Emit(LogEvent logEvent)
+    {
+        var level = Map(logEvent.Level);
+        var message = Render(logEvent);
+        var tsUtc = logEvent.Timestamp.UtcDateTime;
+
+        _buffer.Append(new WorkerLogRecord(message, level, tsUtc));
+
+        if (level is not (WorkerLogLevel.Warn or WorkerLogLevel.Error))
+            return;
+        if (IsPlumbing(logEvent))
+            return;
+
+        var broadcast = _broadcast;
+        if (broadcast is null)
+            return;
+
+        lock (_gate)
+        {
+            // Read the clock once so the dedup check, the stored stamp and the prune cutoff agree.
+            var now = _utcNow();
+            if (_lastBroadcast.TryGetValue(message, out var last) && now - last < RateLimitWindow)
+                return;
+            _lastBroadcast[message] = now;
+            Prune(now);
+        }
+
+        // Fire-and-forget. A broadcast failure itself logs (re-entering this sink), so it
+        // must never throw out of Emit nor leave an unobserved faulted task.
+        try { _ = broadcast(message, level, tsUtc).ContinueWith(t => { _ = t.Exception; }, TaskScheduler.Default); }
+        catch { /* swallow — logging must not crash on a transport hiccup */ }
+    }
+
+    /// <summary>Maps Serilog levels: Warning to Warn, Error and Fatal to Error, everything else to Info.</summary>
+    private static WorkerLogLevel Map(LogEventLevel level) => level switch
+    {
+        LogEventLevel.Warning => WorkerLogLevel.Warn,
+        LogEventLevel.Error or LogEventLevel.Fatal => WorkerLogLevel.Error,
+        _ => WorkerLogLevel.Info,
+    };
+
+    /// <summary>Rendered message; when an exception is attached, its type name and first message line are appended.</summary>
+    private static string Render(LogEvent e)
+    {
+        var msg = e.RenderMessage();
+        if (e.Exception is { } ex)
+        {
+            var first = ex.Message.Split('\n', 2)[0].Trim();
+            msg = $"{msg}: {ex.GetType().Name}: {first}";
+        }
+        return msg;
+    }
+
+    /// <summary>Loop guard: true when the event's SourceContext starts with a SignalR or HTTP-connections namespace.</summary>
+    private static bool IsPlumbing(LogEvent e)
+    {
+        if (!e.Properties.TryGetValue("SourceContext", out var v) || v is not ScalarValue { Value: string sc })
+            return false;
+        return sc.StartsWith("Microsoft.AspNetCore.SignalR", StringComparison.Ordinal)
+            || sc.StartsWith("Microsoft.AspNetCore.Http.Connections", StringComparison.Ordinal);
+    }
+
+    /// <summary>
+    /// Sweeps dedup entries older than the rate-limit window, but only once the table has
+    /// grown past <see cref="PruneThreshold"/>. Must be called while holding <c>_gate</c>.
+    /// </summary>
+    private void Prune(DateTime now)
+    {
+        if (_lastBroadcast.Count <= PruneThreshold) return;
+        var cutoff = now - RateLimitWindow;
+        foreach (var key in _lastBroadcast.Where(kv => kv.Value < cutoff).Select(kv => kv.Key).ToList())
+            _lastBroadcast.Remove(key);
+    }
+}
diff --git a/src/ClaudeDo.Worker/Logging/LogRingBuffer.cs b/src/ClaudeDo.Worker/Logging/LogRingBuffer.cs
new file mode 100644
index 0000000..05e1a2a
--- /dev/null
+++ b/src/ClaudeDo.Worker/Logging/LogRingBuffer.cs
@@ -0,0 +1,63 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+
+namespace ClaudeDo.Worker.Logging;
+
+/// <summary>
+/// Thread-safe, time-bounded ring of recent log records. On each append it evicts
+/// records older than <c>window</c> and trims to <c>cap</c>. Feeds the Log Visualizer
+/// overlay via <see cref="Snapshot"/> (oldest-first). The clock is injectable for tests.
+/// </summary>
+public sealed class LogRingBuffer
+{
+    private readonly TimeSpan _window;
+    private readonly int _cap;
+    private readonly Func<DateTime> _utcNow;
+    private readonly LinkedList<WorkerLogRecord> _items = new();
+    private readonly object _gate = new();
+
+    /// <summary>Creates an empty buffer.</summary>
+    /// <param name="window">Maximum record age; older records are evicted and hidden from snapshots.</param>
+    /// <param name="cap">Maximum number of records retained.</param>
+    /// <param name="utcNow">Clock used to compute the age cutoff; defaults to <see cref="DateTime.UtcNow"/>.</param>
+    /// <exception cref="ArgumentOutOfRangeException"><paramref name="window"/> is negative or <paramref name="cap"/> is less than 1.</exception>
+    public LogRingBuffer(TimeSpan window, int cap = 5000, Func<DateTime>? utcNow = null)
+    {
+        // A negative window or a non-positive cap would silently evict every record on append.
+        if (window < TimeSpan.Zero)
+            throw new ArgumentOutOfRangeException(nameof(window), window, "Window must not be negative.");
+        if (cap < 1)
+            throw new ArgumentOutOfRangeException(nameof(cap), cap, "Cap must be at least 1.");
+
+        _window = window;
+        _cap = cap;
+        _utcNow = utcNow ?? (() => DateTime.UtcNow);
+    }
+
+    /// <summary>Adds <paramref name="record"/> at the tail, then evicts from the head while the head is expired or the cap is exceeded.</summary>
+    /// <exception cref="ArgumentNullException"><paramref name="record"/> is null.</exception>
+    public void Append(WorkerLogRecord record)
+    {
+        if (record is null) throw new ArgumentNullException(nameof(record));
+
+        lock (_gate)
+        {
+            _items.AddLast(record);
+            var cutoff = _utcNow() - _window;
+            while (_items.First is { } first &&
+                   (first.Value.TimestampUtc < cutoff || _items.Count > _cap))
+                _items.RemoveFirst();
+        }
+    }
+
+    /// <summary>Records still inside the window, oldest-first.</summary>
+    public IReadOnlyList<WorkerLogRecord> Snapshot()
+    {
+        lock (_gate)
+        {
+            var cutoff = _utcNow() - _window;
+            return _items.Where(r => r.TimestampUtc >= cutoff).ToList();
+        }
+    }
+}
diff --git a/src/ClaudeDo.Worker/Logging/WorkerLogRecord.cs b/src/ClaudeDo.Worker/Logging/WorkerLogRecord.cs
new file mode 100644
index 0000000..dd724a8
--- /dev/null
+++ b/src/ClaudeDo.Worker/Logging/WorkerLogRecord.cs
@@ -0,0 +1,7 @@
+using System;
+using ClaudeDo.Data.Models;
+
+namespace ClaudeDo.Worker.Logging;
+
+/// <summary>One captured log line for the in-memory ring buffer / Log Visualizer overlay.</summary>
+public sealed record WorkerLogRecord(string Message, WorkerLogLevel Level, DateTime TimestampUtc);
diff --git a/src/ClaudeDo.Worker/Program.cs b/src/ClaudeDo.Worker/Program.cs
index 92450be..a5daaea 100644
--- a/src/ClaudeDo.Worker/Program.cs
+++ b/src/ClaudeDo.Worker/Program.cs
@@ -7,6 +7,7 @@ using ClaudeDo.Worker.Config;
 using ClaudeDo.Worker.External;
 using ClaudeDo.Worker.Hub;
 using ClaudeDo.Worker.Lifecycle;
+using ClaudeDo.Worker.Logging;
 using ClaudeDo.Worker.Planning;
 using ClaudeDo.Worker.Queue;
 using ClaudeDo.Worker.Runner;
@@ -34,13 +35,22 @@ var builder = WebApplication.CreateBuilder(args);
 
 var logRoot = cfg.LogRoot;
 Directory.CreateDirectory(logRoot);
+
+// In-memory ring + broadcast sink power the footer log strip and the Log Visualizer
+// overlay. Created pre-build so Serilog can write to the sink; the SignalR broadcaster
+// is attached once the host is built (see below).
+var logBuffer = new LogRingBuffer(TimeSpan.FromMinutes(30));
+var broadcastSink = new BroadcastLogSink(logBuffer);
+
 builder.Host.UseSerilog((ctx, lc) => lc
     .MinimumLevel.Information()
     .WriteTo.File(
         System.IO.Path.Combine(logRoot, "worker-.log"),
         rollingInterval: RollingInterval.Day,
         retainedFileCountLimit: 7,
-        shared: true));
+        shared: true)
+    .WriteTo.Sink(broadcastSink));
+builder.Services.AddSingleton(logBuffer);
 
 builder.Services.AddDbContextFactory(opt =>
     opt.UseSqlite($"Data Source={cfg.DbPath}"));
@@ -180,6 +190,10 @@ builder.WebHost.UseUrls($"http://127.0.0.1:{cfg.SignalRPort}");
 
 var app = builder.Build();
 
+// Now that the hub context exists, let the broadcast sink push Warn/Error to the footer.
+var logBroadcaster = app.Services.GetRequiredService();
+broadcastSink.Attach((message, level, ts) => logBroadcaster.WorkerLog(message, level, ts));
+
 using (var scope = app.Services.CreateScope())
 {
     ClaudeDoDbContext.MigrateAndConfigure(
diff --git a/tests/ClaudeDo.Worker.Tests/Logging/BroadcastLogSinkTests.cs b/tests/ClaudeDo.Worker.Tests/Logging/BroadcastLogSinkTests.cs
new file mode 100644
index 0000000..2827486
--- /dev/null
+++ b/tests/ClaudeDo.Worker.Tests/Logging/BroadcastLogSinkTests.cs
@@ -0,0 +1,107 @@
+using ClaudeDo.Data.Models;
+using ClaudeDo.Worker.Logging;
+using Serilog.Events;
+using Serilog.Parsing;
+
+namespace ClaudeDo.Worker.Tests.Logging;
+
+public class BroadcastLogSinkTests
+{
+    private static readonly DateTimeOffset EvtTime = new(2026, 6, 23, 8, 0, 0, TimeSpan.Zero);
+    private static readonly MessageTemplateParser Parser = new();
+
+    private static LogEvent Evt(LogEventLevel level, string template, Exception? ex = null, string? sourceContext = null)
+    {
+        var props = new List<LogEventProperty>();
+        if (sourceContext is not null)
+            props.Add(new LogEventProperty("SourceContext", new ScalarValue(sourceContext)));
+        return new LogEvent(EvtTime, level, ex, Parser.Parse(template), props);
+    }
+
+    private static (BroadcastLogSink sink, LogRingBuffer buffer, List<(string msg, WorkerLogLevel lvl)> calls)
+        NewSink(Func<DateTime> clock)
+    {
+        var buffer = new LogRingBuffer(TimeSpan.FromHours(1), utcNow: () => new DateTime(2026, 6, 23, 8, 0, 0, DateTimeKind.Utc));
+        var sink = new BroadcastLogSink(buffer, clock);
+        var calls = new List<(string, WorkerLogLevel)>();
+        sink.Attach((m, l, _) => { calls.Add((m, l)); return Task.CompletedTask; });
+        return (sink, buffer, calls);
+    }
+
+    [Fact]
+    public void Buffers_all_levels()
+    {
+        var (sink, buffer, _) = NewSink(() => EvtTime.UtcDateTime);
+        sink.Emit(Evt(LogEventLevel.Information, "info"));
+        sink.Emit(Evt(LogEventLevel.Warning, "warn"));
+        sink.Emit(Evt(LogEventLevel.Error, "err"));
+
+        Assert.Equal(3, buffer.Snapshot().Count);
+    }
+
+    [Fact]
+    public void Broadcasts_only_warn_and_error()
+    {
+        var (sink, _, calls) = NewSink(() => EvtTime.UtcDateTime);
+        sink.Emit(Evt(LogEventLevel.Information, "info"));
+        sink.Emit(Evt(LogEventLevel.Warning, "warn"));
+        sink.Emit(Evt(LogEventLevel.Error, "err"));
+
+        Assert.Equal(new[] { WorkerLogLevel.Warn, WorkerLogLevel.Error }, calls.Select(c => c.lvl));
+    }
+
+    [Fact]
+    public void Dedupes_repeat_within_window_but_still_buffers()
+    {
+        var now = EvtTime.UtcDateTime;
+        var (sink, buffer, calls) = NewSink(() => now);
+        sink.Emit(Evt(LogEventLevel.Warning, "same"));
+        now = now.AddSeconds(30);
+        sink.Emit(Evt(LogEventLevel.Warning, "same"));
+
+        Assert.Single(calls); // second broadcast suppressed
+        Assert.Equal(2, buffer.Snapshot().Count); // both buffered
+    }
+
+    [Fact]
+    public void Allows_repeat_after_window()
+    {
+        var now = EvtTime.UtcDateTime;
+        var (sink, _, calls) = NewSink(() => now);
+        sink.Emit(Evt(LogEventLevel.Warning, "same"));
+        now = now.AddSeconds(121);
+        sink.Emit(Evt(LogEventLevel.Warning, "same"));
+
+        Assert.Equal(2, calls.Count);
+    }
+
+    [Fact]
+    public void Appends_exception_to_message()
+    {
+        var (sink, _, calls) = NewSink(() => EvtTime.UtcDateTime);
+        sink.Emit(Evt(LogEventLevel.Error, "boom", new InvalidOperationException("bad dir")));
+
+        Assert.Single(calls);
+        Assert.Equal("boom: InvalidOperationException: bad dir", calls[0].msg);
+    }
+
+    [Fact]
+    public void Plumbing_source_buffered_but_not_broadcast()
+    {
+        var (sink, buffer, calls) = NewSink(() => EvtTime.UtcDateTime);
+        sink.Emit(Evt(LogEventLevel.Error, "transport hiccup", sourceContext: "Microsoft.AspNetCore.SignalR.HubConnectionHandler"));
+
+        Assert.Empty(calls);
+        Assert.Single(buffer.Snapshot());
+    }
+
+    [Fact]
+    public void Does_not_throw_when_detached()
+    {
+        var buffer = new LogRingBuffer(TimeSpan.FromHours(1));
+        var sink = new BroadcastLogSink(buffer);
+        sink.Emit(Evt(LogEventLevel.Error, "no subscriber"));
+
+        Assert.Single(buffer.Snapshot());
+    }
+}
diff --git a/tests/ClaudeDo.Worker.Tests/Logging/LogRingBufferTests.cs b/tests/ClaudeDo.Worker.Tests/Logging/LogRingBufferTests.cs
new file mode 100644
index 0000000..bd35b3d
--- /dev/null
+++ b/tests/ClaudeDo.Worker.Tests/Logging/LogRingBufferTests.cs
@@ -0,0 +1,48 @@
+using ClaudeDo.Data.Models;
+using ClaudeDo.Worker.Logging;
+
+namespace ClaudeDo.Worker.Tests.Logging;
+
+public class LogRingBufferTests
+{
+    private static readonly DateTime Now = new(2026, 6, 23, 8, 0, 0, DateTimeKind.Utc);
+
+    private static WorkerLogRecord Rec(DateTime ts, string msg = "m", WorkerLogLevel lvl = WorkerLogLevel.Info)
+        => new(msg, lvl, ts);
+
+    [Fact]
+    public void Evicts_records_older_than_window_on_append()
+    {
+        var buf = new LogRingBuffer(TimeSpan.FromMinutes(30), utcNow: () => Now);
+        buf.Append(Rec(Now.AddMinutes(-40), "old"));
+        buf.Append(Rec(Now.AddMinutes(-10), "recent"));
+
+        var snap = buf.Snapshot();
+
+        Assert.Single(snap);
+        Assert.Equal("recent", snap[0].Message);
+    }
+
+    [Fact]
+    public void Evicts_oldest_beyond_cap()
+    {
+        var buf = new LogRingBuffer(TimeSpan.FromHours(1), cap: 3, utcNow: () => Now);
+        for (var i = 0; i < 5; i++) buf.Append(Rec(Now, $"m{i}"));
+
+        var snap = buf.Snapshot();
+
+        Assert.Equal(3, snap.Count);
+        Assert.Equal(new[] { "m2", "m3", "m4" }, snap.Select(r => r.Message));
+    }
+
+    [Fact]
+    public void Snapshot_is_oldest_first()
+    {
+        var buf = new LogRingBuffer(TimeSpan.FromHours(1), utcNow: () => Now);
+        buf.Append(Rec(Now.AddMinutes(-3), "a"));
+        buf.Append(Rec(Now.AddMinutes(-2), "b"));
+        buf.Append(Rec(Now.AddMinutes(-1), "c"));
+
+        Assert.Equal(new[] { "a", "b", "c" }, buf.Snapshot().Select(r => r.Message));
+    }
+}