fix(worker): reap idle interactive sessions so they don't pile up
Interactive/streaming sessions are persistent claude.exe processes that wait on stdin and never exit on their own. The only teardown was an explicit StopInteractiveSession from the UI — there is no client-disconnect or shutdown sweep — so an abandoned chat (UI closed, navigated away, crashed) kept its claude.exe (+ conhost) alive for the worker's whole lifetime. Under a long-running autostart worker these accumulate to dozens of orphaned child processes. LiveSessionRegistry now tracks per-session activity (Touch on every output line and user action) and exposes ReapIdleAsync, which stops sessions idle past a timeout while skipping any with a turn in flight. IdleSessionReaper (BackgroundService) sweeps every 5 min; idle timeout defaults to 30 min, configurable via interactive_idle_timeout_minutes (0 disables).
This commit is contained in:
@@ -41,6 +41,10 @@ public sealed class WorkerConfig
|
|||||||
[JsonPropertyName("external_mcp_api_key")]
|
[JsonPropertyName("external_mcp_api_key")]
|
||||||
public string? ExternalMcpApiKey { get; set; }
|
public string? ExternalMcpApiKey { get; set; }
|
||||||
|
|
||||||
|
/// <summary>Interactive/streaming sessions idle longer than this are stopped by IdleSessionReaper. 0 disables reaping.</summary>
|
||||||
|
[JsonPropertyName("interactive_idle_timeout_minutes")]
|
||||||
|
public int InteractiveIdleTimeoutMinutes { get; set; } = 30;
|
||||||
|
|
||||||
[JsonPropertyName("online_inbox")]
|
[JsonPropertyName("online_inbox")]
|
||||||
public OnlineInboxConfig OnlineInbox { get; set; } = new();
|
public OnlineInboxConfig OnlineInbox { get; set; } = new();
|
||||||
|
|
||||||
|
|||||||
@@ -70,7 +70,11 @@ public sealed class InteractiveSessionService
|
|||||||
"--permission-mode", "auto",
|
"--permission-mode", "auto",
|
||||||
};
|
};
|
||||||
|
|
||||||
Func<string, Task> onLine = line => _broadcaster.TaskMessage(taskId, "[stdout] " + line);
|
Func<string, Task> onLine = line =>
|
||||||
|
{
|
||||||
|
_registry.Touch(taskId);
|
||||||
|
return _broadcaster.TaskMessage(taskId, "[stdout] " + line);
|
||||||
|
};
|
||||||
|
|
||||||
ILiveSession session;
|
ILiveSession session;
|
||||||
Task exitTask;
|
Task exitTask;
|
||||||
@@ -124,20 +128,27 @@ public sealed class InteractiveSessionService
|
|||||||
{
|
{
|
||||||
if (!_registry.TryGet(taskId, out var session))
|
if (!_registry.TryGet(taskId, out var session))
|
||||||
throw new InvalidOperationException("No interactive session is running for this task.");
|
throw new InvalidOperationException("No interactive session is running for this task.");
|
||||||
|
_registry.Touch(taskId);
|
||||||
await session.SendUserMessageAsync(text, ct);
|
await session.SendUserMessageAsync(text, ct);
|
||||||
}
|
}
|
||||||
|
|
||||||
public async Task RemoveQueuedAsync(string taskId, string text, CancellationToken ct)
|
public async Task RemoveQueuedAsync(string taskId, string text, CancellationToken ct)
|
||||||
{
|
{
|
||||||
if (_registry.TryGet(taskId, out var session))
|
if (_registry.TryGet(taskId, out var session))
|
||||||
|
{
|
||||||
|
_registry.Touch(taskId);
|
||||||
await session.RemoveQueuedAsync(text, ct);
|
await session.RemoveQueuedAsync(text, ct);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public async Task InterruptAsync(string taskId, CancellationToken ct)
|
public async Task InterruptAsync(string taskId, CancellationToken ct)
|
||||||
{
|
{
|
||||||
if (_registry.TryGet(taskId, out var session))
|
if (_registry.TryGet(taskId, out var session))
|
||||||
|
{
|
||||||
|
_registry.Touch(taskId);
|
||||||
await session.InterruptAsync(ct);
|
await session.InterruptAsync(ct);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public async Task StopAsync(string taskId, CancellationToken ct)
|
public async Task StopAsync(string taskId, CancellationToken ct)
|
||||||
{
|
{
|
||||||
|
|||||||
@@ -82,6 +82,7 @@ builder.Services.AddSingleton<PlanningMergeOrchestrator>();
|
|||||||
builder.Services.AddSingleton<PlanningChainCoordinator>();
|
builder.Services.AddSingleton<PlanningChainCoordinator>();
|
||||||
builder.Services.AddSingleton<LiveSessionRegistry>();
|
builder.Services.AddSingleton<LiveSessionRegistry>();
|
||||||
builder.Services.AddSingleton<InteractiveSessionService>();
|
builder.Services.AddSingleton<InteractiveSessionService>();
|
||||||
|
builder.Services.AddHostedService<IdleSessionReaper>();
|
||||||
|
|
||||||
// Queue dispatch primitives. QueueWaker holds the wake semaphore; the queue picker
|
// Queue dispatch primitives. QueueWaker holds the wake semaphore; the queue picker
|
||||||
// performs atomic Queued→Running claim. Both injected into the state service so it
|
// performs atomic Queued→Running claim. Both injected into the state service so it
|
||||||
|
|||||||
49
src/ClaudeDo.Worker/Runner/IdleSessionReaper.cs
Normal file
49
src/ClaudeDo.Worker/Runner/IdleSessionReaper.cs
Normal file
@@ -0,0 +1,49 @@
|
|||||||
|
using ClaudeDo.Worker.Config;
|
||||||
|
|
||||||
|
namespace ClaudeDo.Worker.Runner;
|
||||||
|
|
||||||
|
// Stops interactive/streaming sessions that have gone idle. Interactive `claude` processes wait
// on stdin and never exit on their own, and there is no client-disconnect teardown — so an
// abandoned chat (UI closed, navigated away, crashed) keeps its claude.exe (+ conhost) alive for
// the worker's entire lifetime. Under a long-running autostart worker these pile up (observed:
// ~170 child processes). This sweep reaps the idle ones.
public sealed class IdleSessionReaper : BackgroundService
{
    // Fixed cadence of the sweep; the idle timeout itself comes from WorkerConfig.
    private static readonly TimeSpan SweepInterval = TimeSpan.FromMinutes(5);

    private readonly LiveSessionRegistry _registry;
    private readonly WorkerConfig _cfg;
    private readonly ILogger<IdleSessionReaper> _logger;

    public IdleSessionReaper(LiveSessionRegistry registry, WorkerConfig cfg, ILogger<IdleSessionReaper> logger)
    {
        _registry = registry;
        _cfg = cfg;
        _logger = logger;
    }

    /// <summary>
    /// Sweeps the registry every <see cref="SweepInterval"/> and stops sessions idle longer than
    /// the configured timeout. Returns immediately (no sweeping at all) when the configured
    /// timeout is zero or negative.
    /// </summary>
    /// <param name="stoppingToken">Cancelled by the host on shutdown; ends the sweep loop.</param>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        // The timeout is read once here, so a config change needs a worker restart to take effect.
        var idleTimeout = TimeSpan.FromMinutes(_cfg.InteractiveIdleTimeoutMinutes);
        if (idleTimeout <= TimeSpan.Zero)
            return; // reaper disabled

        using var timer = new PeriodicTimer(SweepInterval);
        while (await timer.WaitForNextTickAsync(stoppingToken))
        {
            try
            {
                var reaped = await _registry.ReapIdleAsync(DateTime.UtcNow, idleTimeout);
                if (reaped.Count > 0)
                    _logger.LogInformation(
                        "Reaped {session_count} idle interactive session(s) after {idle_minutes} min: {task_ids}",
                        reaped.Count, _cfg.InteractiveIdleTimeoutMinutes, string.Join(", ", reaped));
            }
            // Only propagate cancellation that belongs to host shutdown. A stray
            // OperationCanceledException from a sweep would otherwise escape the loop and end
            // reaping for the rest of the worker's lifetime; it falls through to the warning below.
            catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
            {
                throw;
            }
            catch (Exception ex)
            {
                // One failed sweep must not stop future sweeps.
                _logger.LogWarning(ex, "Idle session reap sweep failed");
            }
        }
    }
}
|
||||||
@@ -5,28 +5,47 @@ namespace ClaudeDo.Worker.Runner;
|
|||||||
|
|
||||||
// Singleton in-memory registry of active live streaming sessions.
|
// Singleton in-memory registry of active live streaming sessions.
|
||||||
// A session's lifetime matches its associated task run; dead entries are removed by the runner.
|
// A session's lifetime matches its associated task run; dead entries are removed by the runner.
|
||||||
|
//
|
||||||
|
// Interactive (stream-json) sessions never exit on their own — they wait on stdin — and there is
|
||||||
|
// no client-disconnect teardown, so an abandoned chat would otherwise keep its claude.exe alive
|
||||||
|
// for the worker's whole lifetime. IdleSessionReaper periodically stops sessions that have seen
|
||||||
|
// no activity past a timeout (see ReapIdleAsync); Touch() records that activity.
|
||||||
public sealed class LiveSessionRegistry
|
public sealed class LiveSessionRegistry
|
||||||
{
|
{
|
||||||
private readonly ConcurrentDictionary<string, ILiveSession> _sessions = new();
|
private sealed class Entry
|
||||||
|
{
|
||||||
|
public required ILiveSession Session { get; init; }
|
||||||
|
public long LastActivityTicksUtc;
|
||||||
|
}
|
||||||
|
|
||||||
|
private readonly ConcurrentDictionary<string, Entry> _sessions = new();
|
||||||
|
|
||||||
public void Register(string taskId, ILiveSession session)
|
public void Register(string taskId, ILiveSession session)
|
||||||
{
|
{
|
||||||
if (_sessions.TryRemove(taskId, out var existing))
|
if (_sessions.TryRemove(taskId, out var existing))
|
||||||
{
|
{
|
||||||
// Best-effort stop of the replaced session; don't await to avoid deadlock risk.
|
// Best-effort stop of the replaced session; don't await to avoid deadlock risk.
|
||||||
_ = existing.StopAsync().ContinueWith(t =>
|
_ = existing.Session.StopAsync().ContinueWith(t =>
|
||||||
{
|
{
|
||||||
if (t.IsFaulted) { /* swallow — old session is already orphaned */ }
|
if (t.IsFaulted) { /* swallow — old session is already orphaned */ }
|
||||||
}, TaskScheduler.Default);
|
}, TaskScheduler.Default);
|
||||||
}
|
}
|
||||||
_sessions[taskId] = session;
|
_sessions[taskId] = new Entry { Session = session, LastActivityTicksUtc = DateTime.UtcNow.Ticks };
|
||||||
|
}
|
||||||
|
|
||||||
|
// Stamps the session's last-activity time with the current UTC time so the idle reaper
// treats it as recently used. Does nothing when no session is registered under taskId.
public void Touch(string taskId)
{
    if (!_sessions.TryGetValue(taskId, out var tracked))
        return;

    // Written with Interlocked because the reaper reads this field from another thread.
    var nowTicks = DateTime.UtcNow.Ticks;
    Interlocked.Exchange(ref tracked.LastActivityTicksUtc, nowTicks);
}
|
||||||
|
|
||||||
public bool TryGet(string taskId, out ILiveSession session)
|
public bool TryGet(string taskId, out ILiveSession session)
|
||||||
{
|
{
|
||||||
if (_sessions.TryGetValue(taskId, out var s))
|
if (_sessions.TryGetValue(taskId, out var entry))
|
||||||
{
|
{
|
||||||
session = s;
|
session = entry.Session;
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
session = null!;
|
session = null!;
|
||||||
@@ -37,7 +56,32 @@ public sealed class LiveSessionRegistry
|
|||||||
|
|
||||||
public async Task StopAsync(string taskId)
|
public async Task StopAsync(string taskId)
|
||||||
{
|
{
|
||||||
if (_sessions.TryRemove(taskId, out var session))
|
if (_sessions.TryRemove(taskId, out var entry))
|
||||||
await session.StopAsync();
|
await entry.Session.StopAsync();
|
||||||
|
}
|
||||||
|
|
||||||
|
// Stops and removes every session whose last activity is at or before (nowUtc - idleTimeout),
// skipping any session with a turn in flight (an agent that's actively working, even if quiet).
// Sessions are stopped one at a time; a failure to stop is swallowed and the session still
// counts as reaped. Returns the reaped task ids (an empty list when nothing was reaped).
public async Task<IReadOnlyList<string>> ReapIdleAsync(DateTime nowUtc, TimeSpan idleTimeout)
{
    var cutoffTicks = (nowUtc - idleTimeout).Ticks;
    List<string>? reaped = null;

    foreach (var kvp in _sessions)
    {
        var entry = kvp.Value;
        if (entry.Session.IsTurnInFlight) continue;
        if (Interlocked.Read(ref entry.LastActivityTicksUtc) > cutoffTicks) continue;

        // Remove only if the key still maps to the entry that was just judged idle. Removing by
        // key alone could take out (and then stop) a fresh session that Register() swapped in
        // under the same task id between the idle check above and this removal.
        if (!_sessions.TryRemove(kvp)) continue;

        try { await entry.Session.StopAsync(); }
        catch { /* already dead — leave it removed */ }
        (reaped ??= new()).Add(kvp.Key);
    }

    return reaped ?? (IReadOnlyList<string>)Array.Empty<string>();
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -8,7 +8,7 @@ public sealed class LiveSessionRegistryTests
|
|||||||
private sealed class FakeLiveSession : ILiveSession
|
private sealed class FakeLiveSession : ILiveSession
|
||||||
{
|
{
|
||||||
public bool StopCalled { get; private set; }
|
public bool StopCalled { get; private set; }
|
||||||
public bool IsTurnInFlight => false;
|
public bool IsTurnInFlight { get; set; }
|
||||||
|
|
||||||
public Task SendUserMessageAsync(string text, CancellationToken ct) => Task.CompletedTask;
|
public Task SendUserMessageAsync(string text, CancellationToken ct) => Task.CompletedTask;
|
||||||
public Task RemoveQueuedAsync(string text, CancellationToken ct) => Task.CompletedTask;
|
public Task RemoveQueuedAsync(string text, CancellationToken ct) => Task.CompletedTask;
|
||||||
@@ -90,4 +90,56 @@ public sealed class LiveSessionRegistryTests
|
|||||||
var registry = new LiveSessionRegistry();
|
var registry = new LiveSessionRegistry();
|
||||||
await registry.StopAsync("no-such-task"); // should not throw
|
await registry.StopAsync("no-such-task"); // should not throw
|
||||||
}
|
}
|
||||||
|
|
||||||
|
[Fact]
public async Task ReapIdleAsync_StopsAndRemovesIdleSession()
{
    const string taskId = "task-1";
    var idleWindow = TimeSpan.FromMinutes(30);
    var sut = new LiveSessionRegistry();
    var fake = new FakeLiveSession();
    sut.Register(taskId, fake);

    // Run the sweep as if an hour had passed since registration — double the idle window.
    var sweepTime = DateTime.UtcNow.AddMinutes(60);
    var reapedIds = await sut.ReapIdleAsync(sweepTime, idleWindow);

    Assert.Contains(taskId, reapedIds);
    Assert.True(fake.StopCalled);
    Assert.False(sut.TryGet(taskId, out _));
}
|
||||||
|
|
||||||
|
[Fact]
public async Task ReapIdleAsync_KeepsRecentlyActiveSession()
{
    const string taskId = "task-1";
    var idleWindow = TimeSpan.FromMinutes(30);
    var sut = new LiveSessionRegistry();
    var fake = new FakeLiveSession();
    sut.Register(taskId, fake);

    // Sweep right away: the session was registered moments ago, well inside the idle window.
    var reapedIds = await sut.ReapIdleAsync(DateTime.UtcNow, idleWindow);

    Assert.Empty(reapedIds);
    Assert.False(fake.StopCalled);
    Assert.True(sut.TryGet(taskId, out _));
}
|
||||||
|
|
||||||
|
[Fact]
public async Task ReapIdleAsync_SkipsSessionWithTurnInFlight()
{
    const string taskId = "task-1";
    var idleWindow = TimeSpan.FromMinutes(30);
    var sut = new LiveSessionRegistry();
    var busy = new FakeLiveSession { IsTurnInFlight = true };
    sut.Register(taskId, busy);

    // Old enough to be reaped by time alone, but the in-flight turn must protect it.
    var sweepTime = DateTime.UtcNow.AddMinutes(60);
    var reapedIds = await sut.ReapIdleAsync(sweepTime, idleWindow);

    Assert.Empty(reapedIds);
    Assert.False(busy.StopCalled);
    Assert.True(sut.TryGet(taskId, out _));
}
|
||||||
|
|
||||||
|
[Fact]
public async Task ReapIdleAsync_NoSessions_ReturnsEmpty()
{
    // An empty registry has nothing to reap; the sweep should just yield an empty result.
    var sut = new LiveSessionRegistry();
    var idleWindow = TimeSpan.FromMinutes(30);

    var reapedIds = await sut.ReapIdleAsync(DateTime.UtcNow, idleWindow);

    Assert.Empty(reapedIds);
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user