feat(worker): AskUser MCP tool so a running task can ask the user mid-run

A running task can call mcp__claudedo_run__AskUser(question) to block (up to 3
min) on a human answer. PendingQuestionRegistry holds the pending question +
TaskCompletionSource; the tool broadcasts TaskQuestionAsked, awaits the answer
(WorkerHub.AnswerTaskQuestion resolves it), and returns it as the tool result —
or a 'proceed on your judgment' fallback on timeout. The run stays Running
throughout (no status/schema change). ClaudeProcess raises MCP_TOOL_TIMEOUT so
the 60s HTTP-MCP cap doesn't kill the wait; the run MCP is now wired for every
task, not just standalone ones. System prompt updated to reconcile 'unattended'.
This commit is contained in:
Mika Kuns
2026-06-25 22:53:34 +02:00
parent bec26b2232
commit c7f8280106
14 changed files with 308 additions and 26 deletions

View File

@@ -26,6 +26,12 @@ public sealed class HubBroadcaster : IPrimeBroadcaster, IRefineBroadcaster
public Task TaskUpdated(string taskId) =>
_hub.Clients.All.SendAsync("TaskUpdated", taskId);
public Task TaskQuestionAsked(string taskId, string questionId, string question) =>
_hub.Clients.All.SendAsync("TaskQuestionAsked", taskId, questionId, question);
public Task TaskQuestionResolved(string taskId, string questionId) =>
_hub.Clients.All.SendAsync("TaskQuestionResolved", taskId, questionId);
public Task ListUpdated(string listId) =>
_hub.Clients.All.SendAsync("ListUpdated", listId);

View File

@@ -56,6 +56,7 @@ public record WorktreeOverviewDto(
bool PathExistsOnDisk);
public record ForceRemoveResultDto(bool Removed, string? Reason);
public record PendingQuestionDto(string TaskId, string QuestionId, string Question);
public record MergeResultDto(string Status, IReadOnlyList<string> ConflictFiles, string? ErrorMessage);
public record MergePreviewDto(string Status, IReadOnlyList<string> ConflictFiles, int ChangedFileCount);
public record MergeTargetsDto(string DefaultBranch, IReadOnlyList<string> LocalBranches);
@@ -114,6 +115,7 @@ public sealed class WorkerHub : Microsoft.AspNetCore.SignalR.Hub
private readonly WorkerConfig _cfg;
private readonly OnlineInboxConfig _onlineInboxConfig;
private readonly OnlineTokenStore _onlineTokenStore;
private readonly Runner.PendingQuestionRegistry _pendingQuestions;
private readonly LogRingBuffer? _logBuffer;
public WorkerHub(
@@ -139,6 +141,7 @@ public sealed class WorkerHub : Microsoft.AspNetCore.SignalR.Hub
WorkerConfig cfg,
OnlineInboxConfig onlineInboxConfig,
OnlineTokenStore onlineTokenStore,
Runner.PendingQuestionRegistry pendingQuestions,
LogRingBuffer? logBuffer = null)
{
_queue = queue;
@@ -163,9 +166,22 @@ public sealed class WorkerHub : Microsoft.AspNetCore.SignalR.Hub
_cfg = cfg;
_onlineInboxConfig = onlineInboxConfig;
_onlineTokenStore = onlineTokenStore;
_pendingQuestions = pendingQuestions;
_logBuffer = logBuffer;
}
/// <summary>Deliver the user's answer to a question a running task raised via AskUser.
/// Returns false if no matching question is still pending (already answered or timed out).</summary>
public bool AnswerTaskQuestion(string taskId, string questionId, string answer) =>
_pendingQuestions.TryAnswer(taskId, questionId, answer ?? string.Empty);
/// <summary>The question a running task is currently blocked on, if any (for UI re-attach).</summary>
public PendingQuestionDto? GetPendingQuestion(string taskId)
{
var q = _pendingQuestions.Get(taskId);
return q is null ? null : new PendingQuestionDto(q.TaskId, q.QuestionId, q.Question);
}
/// <summary>Recent worker log records (last 30 min, all levels) for the Log Visualizer overlay.</summary>
public IReadOnlyList<WorkerLogRecord> GetRecentLogs() =>
_logBuffer?.Snapshot() ?? Array.Empty<WorkerLogRecord>();

View File

@@ -72,6 +72,7 @@ builder.Services.AddSingleton<GitService>();
builder.Services.AddSingleton<WorktreeManager>();
builder.Services.AddSingleton<ClaudeArgsBuilder>();
builder.Services.AddSingleton<TaskRunTokenRegistry>();
builder.Services.AddSingleton<PendingQuestionRegistry>();
builder.Services.AddSingleton<TaskRunner>();
builder.Services.AddSingleton<WorktreeMaintenanceService>();
builder.Services.AddSingleton<TaskResetService>();

View File

@@ -39,6 +39,11 @@ public sealed class ClaudeProcess : IClaudeProcess
foreach (var arg in arguments)
psi.ArgumentList.Add(arg);
// Claude Code caps HTTP MCP tool calls at 60 s unless MCP_TOOL_TIMEOUT is raised.
// The in-task AskUser tool blocks up to 3 min waiting for the user, so lift the cap
// (with margin) or that wait would be killed early. Harmless for every other tool.
psi.Environment["MCP_TOOL_TIMEOUT"] = "200000";
using var process = new Process { StartInfo = psi };
process.Start();

View File

@@ -0,0 +1,51 @@
using System.Collections.Concurrent;
namespace ClaudeDo.Worker.Runner;
public sealed record PendingQuestion(string TaskId, string QuestionId, string Question);
// In-memory store of questions a running task has raised via the AskUser MCP tool and is
// blocking on. One pending question per task (the run's process is blocked mid-tool-call,
// so it cannot ask twice at once). Kept out of the DB on purpose: a question that outlives
// a Worker restart is already dead (StaleTaskRecovery flips the run to Failed). Singleton.
public sealed class PendingQuestionRegistry
{
private readonly ConcurrentDictionary<string, Entry> _byTask = new();
private sealed record Entry(string QuestionId, string Question, TaskCompletionSource<string> Answer);
// Registers a question for the task and returns its id plus the awaitable answer.
// A second register for the same task replaces any stale entry.
public (string QuestionId, Task<string> Answer) Register(string taskId, string question)
{
var questionId = Guid.NewGuid().ToString("N");
var tcs = new TaskCompletionSource<string>(TaskCreationOptions.RunContinuationsAsynchronously);
_byTask[taskId] = new Entry(questionId, question, tcs);
return (questionId, tcs.Task);
}
// Delivers the answer to a waiting question. Returns false if no matching question is
// pending (already answered, timed out, or stale id).
public bool TryAnswer(string taskId, string questionId, string answer)
{
if (_byTask.TryGetValue(taskId, out var entry)
&& entry.QuestionId == questionId
&& _byTask.TryRemove(taskId, out _))
{
return entry.Answer.TrySetResult(answer ?? string.Empty);
}
return false;
}
public PendingQuestion? Get(string taskId) =>
_byTask.TryGetValue(taskId, out var entry)
? new PendingQuestion(taskId, entry.QuestionId, entry.Question)
: null;
// Drops a pending question without delivering an answer (timeout/cancel cleanup).
public void Remove(string taskId, string questionId)
{
if (_byTask.TryGetValue(taskId, out var entry) && entry.QuestionId == questionId)
_byTask.TryRemove(taskId, out _);
}
}

View File

@@ -13,12 +13,28 @@ public sealed class TaskRunMcpService
private readonly TaskRepository _tasks;
private readonly TaskRunMcpContextAccessor _ctx;
private readonly HubBroadcaster _broadcaster;
private readonly PendingQuestionRegistry _pending;
public TaskRunMcpService(TaskRepository tasks, TaskRunMcpContextAccessor ctx, HubBroadcaster broadcaster)
// How long a running task blocks waiting for the user to answer an AskUser question
// before it gives up and proceeds autonomously. NOTE: the spawned claude process must
// run with MCP_TOOL_TIMEOUT raised above this (ClaudeProcess sets it) — Claude Code
// otherwise caps HTTP MCP tool calls at 60 s and would kill the call early.
internal static readonly TimeSpan QuestionWindow = TimeSpan.FromMinutes(3);
internal const string TimeoutFallback =
"No response received within 3 minutes — proceed using your best judgment, "
+ "note the assumption you made, and continue.";
public TaskRunMcpService(
TaskRepository tasks,
TaskRunMcpContextAccessor ctx,
HubBroadcaster broadcaster,
PendingQuestionRegistry pending)
{
_tasks = tasks;
_ctx = ctx;
_broadcaster = broadcaster;
_pending = pending;
}
[McpServerTool, Description(
@@ -47,4 +63,57 @@ public sealed class TaskRunMcpService
await _broadcaster.TaskUpdated(callerId);
return new SuggestedImprovementDto(child.Id);
}
[McpServerTool, Description(
"Ask the user a question and wait up to 3 minutes for their answer. Use this ONLY "
+ "when you genuinely need a human decision to proceed correctly and a wrong guess "
+ "would be costly or hard to undo (an irreversible action, contradictory "
+ "requirements, or a real fork where both options have meaningful consequences). "
+ "Do NOT use it for routine choices you can reasonably make yourself — for those, "
+ "pick the most sensible option and continue. The returned string is the user's "
+ "answer; if no one responds in time it tells you to proceed on your own judgment.")]
public Task<string> AskUser(string question, CancellationToken cancellationToken)
{
var callerId = _ctx.Current.CallerTaskId;
return AwaitAnswerAsync(
_pending, callerId, question ?? string.Empty,
(questionId, q) => _broadcaster.TaskQuestionAsked(callerId, questionId, q),
questionId => _broadcaster.TaskQuestionResolved(callerId, questionId),
QuestionWindow, cancellationToken);
}
// Registers the question, signals it, and blocks until the user answers, the window
// elapses (returns the fallback), or the run is cancelled (rethrows). Pure except for
// the two broadcast callbacks, so it is unit-testable without a real hub or DbContext.
internal static async Task<string> AwaitAnswerAsync(
PendingQuestionRegistry pending,
string taskId,
string question,
Func<string, string, Task> onAsked,
Func<string, Task> onResolved,
TimeSpan timeout,
CancellationToken ct)
{
var (questionId, answer) = pending.Register(taskId, question);
await onAsked(questionId, question);
try
{
using var timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(ct);
timeoutCts.CancelAfter(timeout);
try
{
return await answer.WaitAsync(timeoutCts.Token);
}
catch (OperationCanceledException) when (!ct.IsCancellationRequested)
{
// The window elapsed (not a run cancellation) — let the agent carry on.
return TimeoutFallback;
}
}
finally
{
pending.Remove(taskId, questionId);
await onResolved(questionId);
}
}
}

View File

@@ -90,21 +90,23 @@ public sealed class TaskRunner
var resolvedConfig = await ResolveConfigAsync(task, listConfig, null, ct);
// Improvement-eligible runs get a per-run MCP identity so the agent can file
// out-of-scope follow-ups via SuggestImprovement. Children and planning runs do not.
if (task.ParentTaskId is null && task.PlanningPhase == PlanningPhase.None)
// Every run gets a per-run MCP identity so the agent can ask the user a
// mid-run question via AskUser. Improvement-eligible (standalone top-level)
// runs additionally get SuggestImprovement for filing out-of-scope follow-ups.
mcpToken = TaskRunTokenRegistry.GenerateToken();
_tokens.Register(mcpToken, task.Id);
Directory.CreateDirectory(_cfg.LogRoot);
mcpConfigPath = Path.Combine(_cfg.LogRoot, $"{task.Id}_mcp.json");
await File.WriteAllTextAsync(mcpConfigPath, BuildRunMcpConfigJson(mcpToken), ct);
var improvementEligible = task.ParentTaskId is null && task.PlanningPhase == PlanningPhase.None;
resolvedConfig = resolvedConfig with
{
mcpToken = TaskRunTokenRegistry.GenerateToken();
_tokens.Register(mcpToken, task.Id);
Directory.CreateDirectory(_cfg.LogRoot);
mcpConfigPath = Path.Combine(_cfg.LogRoot, $"{task.Id}_mcp.json");
await File.WriteAllTextAsync(mcpConfigPath, BuildRunMcpConfigJson(mcpToken), ct);
resolvedConfig = resolvedConfig with
{
McpConfigPath = mcpConfigPath,
AllowedTools = "mcp__claudedo_run__SuggestImprovement",
};
}
McpConfigPath = mcpConfigPath,
AllowedTools = improvementEligible
? "mcp__claudedo_run__AskUser,mcp__claudedo_run__SuggestImprovement"
: "mcp__claudedo_run__AskUser",
};
var now = DateTime.UtcNow;
// The queue picker claims Queued→Running atomically (incl. StartedAt) before