From 711374e85838eaf4efaafc4afb9831e4031d8e18 Mon Sep 17 00:00:00 2001 From: Mika Kuns Date: Fri, 26 Jun 2026 14:34:44 +0200 Subject: [PATCH] fix(worker): reap idle interactive sessions so they don't pile up MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Interactive/streaming sessions are persistent claude.exe processes that wait on stdin and never exit on their own. The only teardown was an explicit StopInteractiveSession from the UI — there is no client-disconnect or shutdown sweep — so an abandoned chat (UI closed, navigated away, crashed) kept its claude.exe (+ conhost) alive for the worker's whole lifetime. Under a long-running autostart worker these accumulate to dozens of orphaned child processes. LiveSessionRegistry now tracks per-session activity (Touch on every output line and user action) and exposes ReapIdleAsync, which stops sessions idle past a timeout while skipping any with a turn in flight. IdleSessionReaper (BackgroundService) sweeps every 5 min; idle timeout defaults to 30 min, configurable via interactive_idle_timeout_minutes (0 disables). --- src/ClaudeDo.Worker/Config/WorkerConfig.cs | 4 ++ .../Planning/InteractiveSessionService.cs | 13 ++++- src/ClaudeDo.Worker/Program.cs | 1 + .../Runner/IdleSessionReaper.cs | 49 ++++++++++++++++ .../Runner/LiveSessionRegistry.cs | 58 ++++++++++++++++--- .../Runner/LiveSessionRegistryTests.cs | 54 ++++++++++++++++- 6 files changed, 170 insertions(+), 9 deletions(-) create mode 100644 src/ClaudeDo.Worker/Runner/IdleSessionReaper.cs diff --git a/src/ClaudeDo.Worker/Config/WorkerConfig.cs b/src/ClaudeDo.Worker/Config/WorkerConfig.cs index afcdff9..f060517 100644 --- a/src/ClaudeDo.Worker/Config/WorkerConfig.cs +++ b/src/ClaudeDo.Worker/Config/WorkerConfig.cs @@ -41,6 +41,10 @@ public sealed class WorkerConfig [JsonPropertyName("external_mcp_api_key")] public string? 
ExternalMcpApiKey { get; set; } + /// Interactive/streaming sessions idle longer than this are stopped by IdleSessionReaper. 0 disables reaping. + [JsonPropertyName("interactive_idle_timeout_minutes")] + public int InteractiveIdleTimeoutMinutes { get; set; } = 30; + [JsonPropertyName("online_inbox")] public OnlineInboxConfig OnlineInbox { get; set; } = new(); diff --git a/src/ClaudeDo.Worker/Planning/InteractiveSessionService.cs b/src/ClaudeDo.Worker/Planning/InteractiveSessionService.cs index 099790a..2a8cb21 100644 --- a/src/ClaudeDo.Worker/Planning/InteractiveSessionService.cs +++ b/src/ClaudeDo.Worker/Planning/InteractiveSessionService.cs @@ -70,7 +70,11 @@ public sealed class InteractiveSessionService "--permission-mode", "auto", }; - Func onLine = line => _broadcaster.TaskMessage(taskId, "[stdout] " + line); + Func onLine = line => + { + _registry.Touch(taskId); + return _broadcaster.TaskMessage(taskId, "[stdout] " + line); + }; ILiveSession session; Task exitTask; @@ -124,19 +128,26 @@ public sealed class InteractiveSessionService { if (!_registry.TryGet(taskId, out var session)) throw new InvalidOperationException("No interactive session is running for this task."); + _registry.Touch(taskId); await session.SendUserMessageAsync(text, ct); } public async Task RemoveQueuedAsync(string taskId, string text, CancellationToken ct) { if (_registry.TryGet(taskId, out var session)) + { + _registry.Touch(taskId); await session.RemoveQueuedAsync(text, ct); + } } public async Task InterruptAsync(string taskId, CancellationToken ct) { if (_registry.TryGet(taskId, out var session)) + { + _registry.Touch(taskId); await session.InterruptAsync(ct); + } } public async Task StopAsync(string taskId, CancellationToken ct) diff --git a/src/ClaudeDo.Worker/Program.cs b/src/ClaudeDo.Worker/Program.cs index 29f4c48..7716b7a 100644 --- a/src/ClaudeDo.Worker/Program.cs +++ b/src/ClaudeDo.Worker/Program.cs @@ -82,6 +82,7 @@ builder.Services.AddSingleton(); 
builder.Services.AddSingleton(); builder.Services.AddSingleton(); builder.Services.AddSingleton(); +builder.Services.AddHostedService(); // Queue dispatch primitives. QueueWaker holds the wake semaphore; the queue picker // performs atomic Queued→Running claim. Both injected into the state service so it diff --git a/src/ClaudeDo.Worker/Runner/IdleSessionReaper.cs b/src/ClaudeDo.Worker/Runner/IdleSessionReaper.cs new file mode 100644 index 0000000..9d4a0c1 --- /dev/null +++ b/src/ClaudeDo.Worker/Runner/IdleSessionReaper.cs @@ -0,0 +1,49 @@ +using ClaudeDo.Worker.Config; + +namespace ClaudeDo.Worker.Runner; + +// Stops interactive/streaming sessions that have gone idle. Interactive `claude` processes wait +// on stdin and never exit on their own, and there is no client-disconnect teardown — so an +// abandoned chat (UI closed, navigated away, crashed) keeps its claude.exe (+ conhost) alive for +// the worker's entire lifetime. Under a long-running autostart worker these pile up (observed: +// ~170 child processes). This sweep reaps the idle ones. 
+public sealed class IdleSessionReaper : BackgroundService
+{
+    private static readonly TimeSpan SweepInterval = TimeSpan.FromMinutes(5); // how often idle sessions are swept
+
+    private readonly LiveSessionRegistry _registry;
+    private readonly WorkerConfig _cfg;
+    private readonly ILogger<IdleSessionReaper> _logger; // typed logger: the container only registers ILogger<T>
+
+    public IdleSessionReaper(LiveSessionRegistry registry, WorkerConfig cfg, ILogger<IdleSessionReaper> logger)
+    {
+        _registry = registry;
+        _cfg = cfg;
+        _logger = logger;
+    }
+
+    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
+    {
+        var idleTimeout = TimeSpan.FromMinutes(_cfg.InteractiveIdleTimeoutMinutes); // read once at startup
+        if (idleTimeout <= TimeSpan.Zero)
+            return; // reaper disabled (0 or negative timeout)
+
+        using var timer = new PeriodicTimer(SweepInterval);
+        while (await timer.WaitForNextTickAsync(stoppingToken))
+        {
+            try
+            {
+                var reaped = await _registry.ReapIdleAsync(DateTime.UtcNow, idleTimeout);
+                if (reaped.Count > 0)
+                    _logger.LogInformation(
+                        "Reaped {session_count} idle interactive session(s) after {idle_minutes} min: {task_ids}",
+                        reaped.Count, _cfg.InteractiveIdleTimeoutMinutes, string.Join(", ", reaped));
+            }
+            catch (OperationCanceledException) { throw; } // never swallow cancellation
+            catch (Exception ex)
+            {
+                _logger.LogWarning(ex, "Idle session reap sweep failed"); // keep sweeping on the next tick
+            }
+        }
+    }
+}
diff --git a/src/ClaudeDo.Worker/Runner/LiveSessionRegistry.cs b/src/ClaudeDo.Worker/Runner/LiveSessionRegistry.cs
index 60d4376..90a0c64 100644
--- a/src/ClaudeDo.Worker/Runner/LiveSessionRegistry.cs
+++ b/src/ClaudeDo.Worker/Runner/LiveSessionRegistry.cs
@@ -5,28 +5,47 @@ namespace ClaudeDo.Worker.Runner;
 
 // Singleton in-memory registry of active live streaming sessions.
 // A session's lifetime matches its associated task run; dead entries are removed by the runner.
+//
+// Interactive (stream-json) sessions never exit on their own — they wait on stdin — and there is
+// no client-disconnect teardown, so an abandoned chat would otherwise keep its claude.exe alive
+// for the worker's whole lifetime. 
IdleSessionReaper periodically stops sessions that have seen
+// no activity past a timeout (see ReapIdleAsync); Touch() records that activity.
 public sealed class LiveSessionRegistry
 {
-    private readonly ConcurrentDictionary<string, ILiveSession> _sessions = new();
+    private sealed class Entry
+    {
+        public required ILiveSession Session { get; init; }
+        public long LastActivityTicksUtc;
+    }
+
+    private readonly ConcurrentDictionary<string, Entry> _sessions = new();
 
     public void Register(string taskId, ILiveSession session)
     {
         if (_sessions.TryRemove(taskId, out var existing))
         {
             // Best-effort stop of the replaced session; don't await to avoid deadlock risk.
-            _ = existing.StopAsync().ContinueWith(t =>
+            _ = existing.Session.StopAsync().ContinueWith(t =>
             {
                 if (t.IsFaulted) { /* swallow — old session is already orphaned */ }
             }, TaskScheduler.Default);
         }
-        _sessions[taskId] = session;
+        _sessions[taskId] = new Entry { Session = session, LastActivityTicksUtc = DateTime.UtcNow.Ticks };
+    }
+
+    // Marks a session as active so the idle reaper leaves it alone. Called on every user
+    // message and every output line. No-op if the session is not (yet) registered.
+    public void Touch(string taskId)
+    {
+        if (_sessions.TryGetValue(taskId, out var entry))
+            Interlocked.Exchange(ref entry.LastActivityTicksUtc, DateTime.UtcNow.Ticks);
     }
 
     public bool TryGet(string taskId, out ILiveSession session)
     {
-        if (_sessions.TryGetValue(taskId, out var s))
+        if (_sessions.TryGetValue(taskId, out var entry))
         {
-            session = s;
+            session = entry.Session;
             return true;
         }
         session = null!;
@@ -37,7 +56,32 @@ public sealed class LiveSessionRegistry
 
     public async Task StopAsync(string taskId)
     {
-        if (_sessions.TryRemove(taskId, out var session))
-            await session.StopAsync();
+        if (_sessions.TryRemove(taskId, out var entry))
+            await entry.Session.StopAsync();
+    }
+
+    // Stops and removes every session whose last activity is older than (nowUtc - idleTimeout),
+    // skipping any with a turn in flight. Removal matches key AND entry, so a session that was
+    // re-registered under the same task id mid-sweep is never stopped. Returns the reaped task ids.
+    public async Task<IReadOnlyList<string>> ReapIdleAsync(DateTime nowUtc, TimeSpan idleTimeout)
+    {
+        var cutoffTicks = (nowUtc - idleTimeout).Ticks;
+        List<string>? reaped = null;
+
+        foreach (var kvp in _sessions)
+        {
+            var entry = kvp.Value;
+            if (entry.Session.IsTurnInFlight) continue;
+            if (Interlocked.Read(ref entry.LastActivityTicksUtc) > cutoffTicks) continue;
+
+            if (_sessions.TryRemove(kvp))
+            {
+                try { await entry.Session.StopAsync(); }
+                catch { /* best-effort: session may already be dead — it stays removed */ }
+                (reaped ??= new()).Add(kvp.Key);
+            }
+        }
+
+        return reaped ?? (IReadOnlyList<string>)Array.Empty<string>();
     }
 }
diff --git a/tests/ClaudeDo.Worker.Tests/Runner/LiveSessionRegistryTests.cs b/tests/ClaudeDo.Worker.Tests/Runner/LiveSessionRegistryTests.cs
index 54ab2c8..89b48b3 100644
--- a/tests/ClaudeDo.Worker.Tests/Runner/LiveSessionRegistryTests.cs
+++ b/tests/ClaudeDo.Worker.Tests/Runner/LiveSessionRegistryTests.cs
@@ -8,7 +8,7 @@ public sealed class LiveSessionRegistryTests
     private sealed class FakeLiveSession : ILiveSession
     {
         public bool StopCalled { get; private set; }
-        public bool IsTurnInFlight => false;
+        public bool IsTurnInFlight { get; set; }
 
         public Task SendUserMessageAsync(string text, CancellationToken ct) => Task.CompletedTask;
         public Task RemoveQueuedAsync(string text, CancellationToken ct) => Task.CompletedTask;
@@ -90,4 +90,56 @@ public sealed class LiveSessionRegistryTests
         var registry = new LiveSessionRegistry();
         await registry.StopAsync("no-such-task"); // should not throw
     }
+
+    [Fact]
+    public async Task ReapIdleAsync_StopsAndRemovesIdleSession()
+    {
+        var registry = new LiveSessionRegistry();
+        var session = new FakeLiveSession();
+        registry.Register("task-1", session);
+
+        // Sweep "now" is an hour past registration, well beyond the 30-min idle window. 
+ var reaped = await registry.ReapIdleAsync(DateTime.UtcNow.AddMinutes(60), TimeSpan.FromMinutes(30)); + + Assert.Contains("task-1", reaped); + Assert.True(session.StopCalled); + Assert.False(registry.TryGet("task-1", out _)); + } + + [Fact] + public async Task ReapIdleAsync_KeepsRecentlyActiveSession() + { + var registry = new LiveSessionRegistry(); + var session = new FakeLiveSession(); + registry.Register("task-1", session); + + var reaped = await registry.ReapIdleAsync(DateTime.UtcNow, TimeSpan.FromMinutes(30)); + + Assert.Empty(reaped); + Assert.False(session.StopCalled); + Assert.True(registry.TryGet("task-1", out _)); + } + + [Fact] + public async Task ReapIdleAsync_SkipsSessionWithTurnInFlight() + { + var registry = new LiveSessionRegistry(); + var session = new FakeLiveSession { IsTurnInFlight = true }; + registry.Register("task-1", session); + + // Idle long enough to reap, but a turn is in flight → must be left alone. + var reaped = await registry.ReapIdleAsync(DateTime.UtcNow.AddMinutes(60), TimeSpan.FromMinutes(30)); + + Assert.Empty(reaped); + Assert.False(session.StopCalled); + Assert.True(registry.TryGet("task-1", out _)); + } + + [Fact] + public async Task ReapIdleAsync_NoSessions_ReturnsEmpty() + { + var registry = new LiveSessionRegistry(); + var reaped = await registry.ReapIdleAsync(DateTime.UtcNow, TimeSpan.FromMinutes(30)); + Assert.Empty(reaped); + } }