fix(worker): reap idle interactive sessions so they don't pile up

Interactive/streaming sessions are persistent claude.exe processes that
wait on stdin and never exit on their own. The only teardown was an
explicit StopInteractiveSession from the UI — there is no client-disconnect
or shutdown sweep — so an abandoned chat (UI closed, navigated away,
crashed) kept its claude.exe (+ conhost) alive for the worker's whole
lifetime. Under a long-running autostart worker these accumulate to dozens
of orphaned child processes.

LiveSessionRegistry now tracks per-session activity (Touch on every output
line and user action) and exposes ReapIdleAsync, which stops sessions idle
past a timeout while skipping any with a turn in flight. IdleSessionReaper
(BackgroundService) sweeps every 5 min; idle timeout defaults to 30 min,
configurable via interactive_idle_timeout_minutes (0 disables).
This commit is contained in:
Mika Kuns
2026-06-26 14:34:44 +02:00
parent faf6104645
commit a53ae29799
6 changed files with 170 additions and 9 deletions

View File

@@ -41,6 +41,10 @@ public sealed class WorkerConfig
[JsonPropertyName("external_mcp_api_key")] [JsonPropertyName("external_mcp_api_key")]
public string? ExternalMcpApiKey { get; set; } public string? ExternalMcpApiKey { get; set; }
/// <summary>Interactive/streaming sessions idle longer than this are stopped by IdleSessionReaper. 0 disables reaping.</summary>
[JsonPropertyName("interactive_idle_timeout_minutes")]
public int InteractiveIdleTimeoutMinutes { get; set; } = 30;
[JsonPropertyName("online_inbox")] [JsonPropertyName("online_inbox")]
public OnlineInboxConfig OnlineInbox { get; set; } = new(); public OnlineInboxConfig OnlineInbox { get; set; } = new();

View File

@@ -70,7 +70,11 @@ public sealed class InteractiveSessionService
"--permission-mode", "auto", "--permission-mode", "auto",
}; };
Func<string, Task> onLine = line => _broadcaster.TaskMessage(taskId, "[stdout] " + line); Func<string, Task> onLine = line =>
{
_registry.Touch(taskId);
return _broadcaster.TaskMessage(taskId, "[stdout] " + line);
};
ILiveSession session; ILiveSession session;
Task exitTask; Task exitTask;
@@ -124,19 +128,26 @@ public sealed class InteractiveSessionService
{ {
if (!_registry.TryGet(taskId, out var session)) if (!_registry.TryGet(taskId, out var session))
throw new InvalidOperationException("No interactive session is running for this task."); throw new InvalidOperationException("No interactive session is running for this task.");
_registry.Touch(taskId);
await session.SendUserMessageAsync(text, ct); await session.SendUserMessageAsync(text, ct);
} }
public async Task RemoveQueuedAsync(string taskId, string text, CancellationToken ct) public async Task RemoveQueuedAsync(string taskId, string text, CancellationToken ct)
{ {
if (_registry.TryGet(taskId, out var session)) if (_registry.TryGet(taskId, out var session))
{
_registry.Touch(taskId);
await session.RemoveQueuedAsync(text, ct); await session.RemoveQueuedAsync(text, ct);
}
} }
public async Task InterruptAsync(string taskId, CancellationToken ct) public async Task InterruptAsync(string taskId, CancellationToken ct)
{ {
if (_registry.TryGet(taskId, out var session)) if (_registry.TryGet(taskId, out var session))
{
_registry.Touch(taskId);
await session.InterruptAsync(ct); await session.InterruptAsync(ct);
}
} }
public async Task StopAsync(string taskId, CancellationToken ct) public async Task StopAsync(string taskId, CancellationToken ct)

View File

@@ -82,6 +82,7 @@ builder.Services.AddSingleton<PlanningMergeOrchestrator>();
builder.Services.AddSingleton<PlanningChainCoordinator>(); builder.Services.AddSingleton<PlanningChainCoordinator>();
builder.Services.AddSingleton<LiveSessionRegistry>(); builder.Services.AddSingleton<LiveSessionRegistry>();
builder.Services.AddSingleton<InteractiveSessionService>(); builder.Services.AddSingleton<InteractiveSessionService>();
builder.Services.AddHostedService<IdleSessionReaper>();
// Queue dispatch primitives. QueueWaker holds the wake semaphore; the queue picker // Queue dispatch primitives. QueueWaker holds the wake semaphore; the queue picker
// performs atomic Queued→Running claim. Both injected into the state service so it // performs atomic Queued→Running claim. Both injected into the state service so it

View File

@@ -0,0 +1,49 @@
using ClaudeDo.Worker.Config;
namespace ClaudeDo.Worker.Runner;
// Stops interactive/streaming sessions that have gone idle. Interactive `claude` processes wait
// on stdin and never exit on their own, and there is no client-disconnect teardown — so an
// abandoned chat (UI closed, navigated away, crashed) keeps its claude.exe (+ conhost) alive for
// the worker's entire lifetime. Under a long-running autostart worker these pile up (observed:
// ~170 child processes). This sweep reaps the idle ones.
public sealed class IdleSessionReaper : BackgroundService
{
private static readonly TimeSpan SweepInterval = TimeSpan.FromMinutes(5);
private readonly LiveSessionRegistry _registry;
private readonly WorkerConfig _cfg;
private readonly ILogger<IdleSessionReaper> _logger;
public IdleSessionReaper(LiveSessionRegistry registry, WorkerConfig cfg, ILogger<IdleSessionReaper> logger)
{
_registry = registry;
_cfg = cfg;
_logger = logger;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
var idleTimeout = TimeSpan.FromMinutes(_cfg.InteractiveIdleTimeoutMinutes);
if (idleTimeout <= TimeSpan.Zero)
return; // reaper disabled
using var timer = new PeriodicTimer(SweepInterval);
while (await timer.WaitForNextTickAsync(stoppingToken))
{
try
{
var reaped = await _registry.ReapIdleAsync(DateTime.UtcNow, idleTimeout);
if (reaped.Count > 0)
_logger.LogInformation(
"Reaped {session_count} idle interactive session(s) after {idle_minutes} min: {task_ids}",
reaped.Count, _cfg.InteractiveIdleTimeoutMinutes, string.Join(", ", reaped));
}
catch (OperationCanceledException) { throw; }
catch (Exception ex)
{
_logger.LogWarning(ex, "Idle session reap sweep failed");
}
}
}
}

View File

@@ -5,28 +5,47 @@ namespace ClaudeDo.Worker.Runner;
// Singleton in-memory registry of active live streaming sessions. // Singleton in-memory registry of active live streaming sessions.
// A session's lifetime matches its associated task run; dead entries are removed by the runner. // A session's lifetime matches its associated task run; dead entries are removed by the runner.
//
// Interactive (stream-json) sessions never exit on their own — they wait on stdin — and there is
// no client-disconnect teardown, so an abandoned chat would otherwise keep its claude.exe alive
// for the worker's whole lifetime. IdleSessionReaper periodically stops sessions that have seen
// no activity past a timeout (see ReapIdleAsync); Touch() records that activity.
public sealed class LiveSessionRegistry public sealed class LiveSessionRegistry
{ {
private readonly ConcurrentDictionary<string, ILiveSession> _sessions = new(); private sealed class Entry
{
public required ILiveSession Session { get; init; }
public long LastActivityTicksUtc;
}
private readonly ConcurrentDictionary<string, Entry> _sessions = new();
public void Register(string taskId, ILiveSession session) public void Register(string taskId, ILiveSession session)
{ {
if (_sessions.TryRemove(taskId, out var existing)) if (_sessions.TryRemove(taskId, out var existing))
{ {
// Best-effort stop of the replaced session; don't await to avoid deadlock risk. // Best-effort stop of the replaced session; don't await to avoid deadlock risk.
_ = existing.StopAsync().ContinueWith(t => _ = existing.Session.StopAsync().ContinueWith(t =>
{ {
if (t.IsFaulted) { /* swallow — old session is already orphaned */ } if (t.IsFaulted) { /* swallow — old session is already orphaned */ }
}, TaskScheduler.Default); }, TaskScheduler.Default);
} }
_sessions[taskId] = session; _sessions[taskId] = new Entry { Session = session, LastActivityTicksUtc = DateTime.UtcNow.Ticks };
}
// Marks a session as active so the idle reaper leaves it alone. Called on every user
// message and every output line. No-op if the session is not (yet) registered.
public void Touch(string taskId)
{
if (_sessions.TryGetValue(taskId, out var entry))
Interlocked.Exchange(ref entry.LastActivityTicksUtc, DateTime.UtcNow.Ticks);
} }
public bool TryGet(string taskId, out ILiveSession session) public bool TryGet(string taskId, out ILiveSession session)
{ {
if (_sessions.TryGetValue(taskId, out var s)) if (_sessions.TryGetValue(taskId, out var entry))
{ {
session = s; session = entry.Session;
return true; return true;
} }
session = null!; session = null!;
@@ -37,7 +56,32 @@ public sealed class LiveSessionRegistry
public async Task StopAsync(string taskId) public async Task StopAsync(string taskId)
{ {
if (_sessions.TryRemove(taskId, out var session)) if (_sessions.TryRemove(taskId, out var entry))
await session.StopAsync(); await entry.Session.StopAsync();
}
// Stops and removes every session whose last activity is older than (nowUtc - idleTimeout),
// skipping any session with a turn in flight (an agent that's actively working, even if quiet).
// Returns the reaped task ids.
public async Task<IReadOnlyList<string>> ReapIdleAsync(DateTime nowUtc, TimeSpan idleTimeout)
{
var cutoffTicks = (nowUtc - idleTimeout).Ticks;
List<string>? reaped = null;
foreach (var kvp in _sessions)
{
var entry = kvp.Value;
if (entry.Session.IsTurnInFlight) continue;
if (Interlocked.Read(ref entry.LastActivityTicksUtc) > cutoffTicks) continue;
if (_sessions.TryRemove(kvp.Key, out var removed))
{
try { await removed.Session.StopAsync(); }
catch { /* already dead — leave it removed */ }
(reaped ??= new()).Add(kvp.Key);
}
}
return reaped ?? (IReadOnlyList<string>)Array.Empty<string>();
} }
} }

View File

@@ -8,7 +8,7 @@ public sealed class LiveSessionRegistryTests
private sealed class FakeLiveSession : ILiveSession private sealed class FakeLiveSession : ILiveSession
{ {
public bool StopCalled { get; private set; } public bool StopCalled { get; private set; }
public bool IsTurnInFlight => false; public bool IsTurnInFlight { get; set; }
public Task SendUserMessageAsync(string text, CancellationToken ct) => Task.CompletedTask; public Task SendUserMessageAsync(string text, CancellationToken ct) => Task.CompletedTask;
public Task RemoveQueuedAsync(string text, CancellationToken ct) => Task.CompletedTask; public Task RemoveQueuedAsync(string text, CancellationToken ct) => Task.CompletedTask;
@@ -90,4 +90,56 @@ public sealed class LiveSessionRegistryTests
var registry = new LiveSessionRegistry(); var registry = new LiveSessionRegistry();
await registry.StopAsync("no-such-task"); // should not throw await registry.StopAsync("no-such-task"); // should not throw
} }
[Fact]
public async Task ReapIdleAsync_StopsAndRemovesIdleSession()
{
var registry = new LiveSessionRegistry();
var session = new FakeLiveSession();
registry.Register("task-1", session);
// Sweep "now" is an hour past registration, well beyond the 30-min idle window.
var reaped = await registry.ReapIdleAsync(DateTime.UtcNow.AddMinutes(60), TimeSpan.FromMinutes(30));
Assert.Contains("task-1", reaped);
Assert.True(session.StopCalled);
Assert.False(registry.TryGet("task-1", out _));
}
[Fact]
public async Task ReapIdleAsync_KeepsRecentlyActiveSession()
{
var registry = new LiveSessionRegistry();
var session = new FakeLiveSession();
registry.Register("task-1", session);
var reaped = await registry.ReapIdleAsync(DateTime.UtcNow, TimeSpan.FromMinutes(30));
Assert.Empty(reaped);
Assert.False(session.StopCalled);
Assert.True(registry.TryGet("task-1", out _));
}
[Fact]
public async Task ReapIdleAsync_SkipsSessionWithTurnInFlight()
{
var registry = new LiveSessionRegistry();
var session = new FakeLiveSession { IsTurnInFlight = true };
registry.Register("task-1", session);
// Idle long enough to reap, but a turn is in flight → must be left alone.
var reaped = await registry.ReapIdleAsync(DateTime.UtcNow.AddMinutes(60), TimeSpan.FromMinutes(30));
Assert.Empty(reaped);
Assert.False(session.StopCalled);
Assert.True(registry.TryGet("task-1", out _));
}
[Fact]
public async Task ReapIdleAsync_NoSessions_ReturnsEmpty()
{
var registry = new LiveSessionRegistry();
var reaped = await registry.ReapIdleAsync(DateTime.UtcNow, TimeSpan.FromMinutes(30));
Assert.Empty(reaped);
}
} }