diff --git a/apps/cli/src/agent/__tests__/json-event-emitter-streaming.test.ts b/apps/cli/src/agent/__tests__/json-event-emitter-streaming.test.ts index 329e06d24a0..d80e66ceb55 100644 --- a/apps/cli/src/agent/__tests__/json-event-emitter-streaming.test.ts +++ b/apps/cli/src/agent/__tests__/json-event-emitter-streaming.test.ts @@ -215,4 +215,132 @@ describe("JsonEventEmitter streaming deltas", () => { expect(output[0]).toMatchObject({ content: "gh" }) expect(output[1]).toMatchObject({ content: " pr" }) }) + + it("streams say:command_output as deltas and correlates tool_result id to execute_command", () => { + const { stdout, lines } = createMockStdout() + const emitter = new JsonEventEmitter({ mode: "stream-json", stdout }) + const commandId = 404 + const outputTs = 405 + + emitMessage( + emitter, + createAskMessage({ + ts: commandId, + ask: "command", + partial: false, + text: "echo hello", + }), + ) + + emitMessage(emitter, { + ts: outputTs, + type: "say", + say: "command_output", + partial: true, + text: "line1\n", + } as ClineMessage) + emitMessage(emitter, { + ts: outputTs, + type: "say", + say: "command_output", + partial: true, + text: "line1\nline2\n", + } as ClineMessage) + emitMessage(emitter, { + ts: outputTs, + type: "say", + say: "command_output", + partial: false, + text: "line1\nline2\n", + } as ClineMessage) + + const output = lines() + expect(output).toHaveLength(4) + expect(output[0]).toMatchObject({ + type: "tool_use", + id: commandId, + subtype: "command", + tool_use: { name: "execute_command", input: { command: "echo hello" } }, + done: true, + }) + expect(output[1]).toMatchObject({ + type: "tool_result", + id: commandId, + subtype: "command", + tool_result: { name: "execute_command", output: "line1\n" }, + }) + expect(output[2]).toMatchObject({ + type: "tool_result", + id: commandId, + subtype: "command", + tool_result: { name: "execute_command", output: "line2\n" }, + }) + expect(output[3]).toMatchObject({ + type: "tool_result", + id: commandId, + subtype: "command", + tool_result: { name: "execute_command" }, + done: true, + }) + expect(output[3]).not.toHaveProperty("tool_result.output") + }) + + it("prefers status-driven command output streaming and suppresses duplicate say completion", () => { + const { stdout, lines } = createMockStdout() + const emitter = new JsonEventEmitter({ mode: "stream-json", stdout }) + const commandId = 505 + + emitMessage( + emitter, + createAskMessage({ + ts: commandId, + ask: "command", + partial: false, + text: "echo streamed", + }), + ) + + emitter.emitCommandOutputChunk("line1\n") + emitter.emitCommandOutputChunk("line1\nline2\n") + emitter.emitCommandOutputDone() + + // This completion say is expected from the extension, but should be suppressed + // because we already streamed and completed via commandExecutionStatus. + emitMessage(emitter, { + ts: 999, + type: "say", + say: "command_output", + partial: false, + text: "line1\nline2\n", + } as ClineMessage) + + const output = lines() + expect(output).toHaveLength(4) + expect(output[0]).toMatchObject({ + type: "tool_use", + id: commandId, + subtype: "command", + tool_use: { name: "execute_command", input: { command: "echo streamed" } }, + done: true, + }) + expect(output[1]).toMatchObject({ + type: "tool_result", + id: commandId, + subtype: "command", + tool_result: { name: "execute_command", output: "line1\n" }, + }) + expect(output[2]).toMatchObject({ + type: "tool_result", + id: commandId, + subtype: "command", + tool_result: { name: "execute_command", output: "line2\n" }, + }) + expect(output[3]).toMatchObject({ + type: "tool_result", + id: commandId, + subtype: "command", + tool_result: { name: "execute_command" }, + done: true, + }) + }) }) diff --git a/apps/cli/src/agent/json-event-emitter.ts b/apps/cli/src/agent/json-event-emitter.ts index f46c39be506..e0f5d5bea64 100644 --- a/apps/cli/src/agent/json-event-emitter.ts +++ b/apps/cli/src/agent/json-event-emitter.ts @@ -107,6 +107,16 @@ export class JsonEventEmitter { private previousContent = new Map() // Track previous tool-use content for structured (non-append-only) delta computation. private previousToolUseContent = new Map() + // Track the currently active execute_command tool_use id for command_output correlation. + private activeCommandToolUseId: number | undefined + // Track command output snapshots by command tool-use id for delta computation. + private previousCommandOutputByToolUseId = new Map() + // Track command ids whose output is being streamed from commandExecutionStatus updates. + private statusDrivenCommandOutputIds = new Set() + // Track command ids that already emitted a terminal command_output done event. + private completedCommandOutputIds = new Set() + // Suppress the next say:command_output completion message after status-driven streaming. + private suppressNextCommandOutputSay = false // Track the completion result content private completionResultContent: string | undefined // Track the latest assistant text as a fallback for result.content. @@ -288,6 +298,90 @@ export class JsonEventEmitter { return this.mode === "stream-json" && content === null } + private computeCommandOutputDelta(commandId: number, fullOutput: string | undefined): string | null { + const normalized = fullOutput ?? "" + const previous = this.previousCommandOutputByToolUseId.get(commandId) || "" + + if (normalized === previous) { + return null + } + + this.previousCommandOutputByToolUseId.set(commandId, normalized) + return normalized.startsWith(previous) ? normalized.slice(previous.length) : normalized + } + + private emitCommandOutputEvent(commandId: number, fullOutput: string | undefined, isDone: boolean): void { + if (this.mode === "stream-json") { + const outputDelta = this.computeCommandOutputDelta(commandId, fullOutput) + const event: JsonEvent = { + type: "tool_result", + id: commandId, + subtype: "command", + tool_result: { name: "execute_command" }, + } + + if (outputDelta !== null && outputDelta.length > 0) { + event.tool_result = { name: "execute_command", output: outputDelta } + } + + if (isDone) { + event.done = true + this.previousCommandOutputByToolUseId.delete(commandId) + this.statusDrivenCommandOutputIds.delete(commandId) + this.completedCommandOutputIds.add(commandId) + if (this.activeCommandToolUseId === commandId) { + this.activeCommandToolUseId = undefined + } + } + + // Suppress empty partial updates that carry no delta. + if (!isDone && outputDelta === null) { + return + } + + this.emitEvent(event) + return + } + + this.emitEvent({ + type: "tool_result", + id: commandId, + subtype: "command", + tool_result: { name: "execute_command", output: fullOutput }, + ...(isDone ? { done: true } : {}), + }) + + if (isDone) { + this.previousCommandOutputByToolUseId.delete(commandId) + this.statusDrivenCommandOutputIds.delete(commandId) + this.completedCommandOutputIds.add(commandId) + if (this.activeCommandToolUseId === commandId) { + this.activeCommandToolUseId = undefined + } + } + } + + public emitCommandOutputChunk(outputSnapshot: string): void { + const commandId = this.activeCommandToolUseId + if (commandId === undefined) { + return + } + + this.statusDrivenCommandOutputIds.add(commandId) + this.emitCommandOutputEvent(commandId, outputSnapshot, false) + } + + public emitCommandOutputDone(): void { + const commandId = this.activeCommandToolUseId + if (commandId === undefined) { + return + } + + this.statusDrivenCommandOutputIds.add(commandId) + this.suppressNextCommandOutputSay = true + this.emitCommandOutputEvent(commandId, undefined, true) + } + /** * Get content to send for a message (delta for streaming, full for json mode). */ @@ -392,10 +486,7 @@ export class JsonEventEmitter { break case "command_output": - this.emitEvent({ - type: "tool_result", - tool_result: { name: "execute_command", output: msg.text }, - }) + this.handleCommandOutputMessage(msg, isDone) break case "user_feedback": @@ -517,6 +608,10 @@ export class JsonEventEmitter { const toolInfo = parseToolInfo(msg.text) if (subtype === "command") { + this.activeCommandToolUseId = msg.ts + this.completedCommandOutputIds.delete(msg.ts) + this.suppressNextCommandOutputSay = false + if (isStreamingPartial) { const commandDelta = this.computeStructuredDelta(msg.ts, msg.text) if (commandDelta === null) { @@ -595,6 +690,21 @@ export class JsonEventEmitter { }) } + private handleCommandOutputMessage(msg: ClineMessage, isDone: boolean): void { + if (this.suppressNextCommandOutputSay) { + if (isDone) { + this.suppressNextCommandOutputSay = false + } + return + } + + const commandId = this.activeCommandToolUseId ?? msg.ts + if (this.statusDrivenCommandOutputIds.has(commandId) || this.completedCommandOutputIds.has(commandId)) { + return + } + this.emitCommandOutputEvent(commandId, msg.text, isDone) + } + /** * Handle task completion and emit result event. */ @@ -711,6 +821,11 @@ export class JsonEventEmitter { this.seenMessageIds.clear() this.previousContent.clear() this.previousToolUseContent.clear() + this.activeCommandToolUseId = undefined + this.previousCommandOutputByToolUseId.clear() + this.statusDrivenCommandOutputIds.clear() + this.completedCommandOutputIds.clear() + this.suppressNextCommandOutputSay = false this.completionResultContent = undefined this.lastAssistantText = undefined this.expectPromptEchoAsUser = true diff --git a/apps/cli/src/commands/cli/stdin-stream.ts b/apps/cli/src/commands/cli/stdin-stream.ts index 9ad89ef1c7d..318328a3961 100644 --- a/apps/cli/src/commands/cli/stdin-stream.ts +++ b/apps/cli/src/commands/cli/stdin-stream.ts @@ -349,12 +349,46 @@ export async function runStdinStreamMode({ host, jsonEmitter, setStreamRequestId const onExtensionMessage = (message: { type?: string + text?: unknown state?: { currentTaskId?: unknown currentTaskItem?: { id?: unknown } messageQueue?: unknown } }) => { + if (message.type === "commandExecutionStatus") { + if (typeof message.text !== "string") { + return + } + + let parsedStatus: unknown + try { + parsedStatus = JSON.parse(message.text) + } catch { + return + } + + if (!isRecord(parsedStatus) || typeof parsedStatus.status !== "string") { + return + } + + if (parsedStatus.status === "output" && typeof parsedStatus.output === "string") { + jsonEmitter.emitCommandOutputChunk(parsedStatus.output) + return + } + + if ( + parsedStatus.status === "exited" || + parsedStatus.status === "timeout" || + parsedStatus.status === "fallback" + ) { + jsonEmitter.emitCommandOutputDone() + return + } + + return + } + if (message.type !== "state") { return } @@ -463,7 +497,7 @@ export async function runStdinStreamMode({ host, jsonEmitter, setStreamRequestId } switch (stdinCommand.command) { - case "start": + case "start": { // A task can emit completion events before runTask() finalizers run. // Wait for full settlement to avoid false "task_busy" on immediate next start. // Safe from races: `for await` processes stdin commands serially, so no @@ -503,8 +537,16 @@ export async function runStdinStreamMode({ host, jsonEmitter, setStreamRequestId success: true, }) + // In CLI stdin-stream mode, default to the execa terminal provider so + // command output can be streamed deterministically. Explicit per-request + // config still wins. + const taskConfiguration = { + terminalShellIntegrationDisabled: true, + ...(stdinCommand.configuration ?? {}), + } + activeTaskPromise = host - .runTask(stdinCommand.prompt, latestTaskId, stdinCommand.configuration) + .runTask(stdinCommand.prompt, latestTaskId, taskConfiguration) .catch((error) => { const message = error instanceof Error ? error.message : String(error) @@ -559,6 +601,7 @@ export async function runStdinStreamMode({ host, jsonEmitter, setStreamRequestId }) break + } case "message": { // If cancel was requested, wait briefly for the task to be rehydrated diff --git a/src/core/task/Task.ts b/src/core/task/Task.ts index 1d4320493a0..4f8a5e49fdc 100644 --- a/src/core/task/Task.ts +++ b/src/core/task/Task.ts @@ -1383,6 +1383,9 @@ export class Task extends EventEmitter implements TaskLike { // block (via the `pWaitFor`). const isBlocking = !(this.askResponse !== undefined || this.lastMessageTs !== askTs) const isMessageQueued = !this.messageQueueService.isEmpty() + // Keep queued user messages intact during command_output asks. Those asks + // are terminal flow-control, not conversational turns. + const shouldDrainQueuedMessageForAsk = type !== "command_output" const isStatusMutable = !partial && isBlocking && !isMessageQueued && approval.decision === "ask" if (isStatusMutable) { @@ -1423,7 +1426,7 @@ export class Task extends EventEmitter implements TaskLike { }, statusMutationTimeout), ) } - } else if (isMessageQueued) { + } else if (isMessageQueued && shouldDrainQueuedMessageForAsk) { const message = this.messageQueueService.dequeueMessage() if (message) { @@ -1450,7 +1453,7 @@ export class Task extends EventEmitter implements TaskLike { // If a queued message arrives while we're blocked on an ask (e.g. a follow-up // suggestion click that was incorrectly queued due to UI state), consume it // immediately so the task doesn't hang. - if (!this.messageQueueService.isEmpty()) { + if (shouldDrainQueuedMessageForAsk && !this.messageQueueService.isEmpty()) { const message = this.messageQueueService.dequeueMessage() if (message) { // If this is a tool approval ask, we need to approve first (yesButtonClicked) diff --git a/src/core/task/__tests__/ask-queued-message-drain.spec.ts b/src/core/task/__tests__/ask-queued-message-drain.spec.ts index 3b4097a9407..06f577881ec 100644 --- a/src/core/task/__tests__/ask-queued-message-drain.spec.ts +++ b/src/core/task/__tests__/ask-queued-message-drain.spec.ts @@ -35,4 +35,38 @@ describe("Task.ask queued message drain", () => { expect(result.response).toBe("messageResponse") expect(result.text).toBe("picked answer") }) + + it("does not consume queued messages for command_output asks", async () => { + const task = Object.create(Task.prototype) as Task + ;(task as any).abort = false + ;(task as any).clineMessages = [] + ;(task as any).askResponse = undefined + ;(task as any).askResponseText = undefined + ;(task as any).askResponseImages = undefined + ;(task as any).lastMessageTs = undefined + + const { MessageQueueService } = await import("../../message-queue/MessageQueueService") + ;(task as any).messageQueueService = new MessageQueueService() + ;(task as any).addToClineMessages = vi.fn(async () => {}) + ;(task as any).saveClineMessages = vi.fn(async () => {}) + ;(task as any).updateClineMessage = vi.fn(async () => {}) + ;(task as any).cancelAutoApprovalTimeout = vi.fn(() => {}) + ;(task as any).checkpointSave = vi.fn(async () => {}) + ;(task as any).emit = vi.fn() + ;(task as any).providerRef = { deref: () => undefined } + + const askPromise = task.ask("command_output", "command is still running...", false) + ;(task as any).messageQueueService.addMessage("1+1=?") + + setTimeout(() => { + task.approveAsk() + }, 0) + + const result = await askPromise + + expect(result.response).toBe("yesButtonClicked") + expect(result.text).toBeUndefined() + expect((task as any).messageQueueService.isEmpty()).toBe(false) + expect((task as any).messageQueueService.messages[0]?.text).toBe("1+1=?") + }) }) diff --git a/src/core/tools/ExecuteCommandTool.ts b/src/core/tools/ExecuteCommandTool.ts index cb6fc6ff023..7d690d67ddd 100644 --- a/src/core/tools/ExecuteCommandTool.ts +++ b/src/core/tools/ExecuteCommandTool.ts @@ -218,6 +218,53 @@ export async function executeCommandInTerminal( // Bound accumulated output buffer size to prevent unbounded memory growth for long-running commands. // The interceptor preserves full output; this buffer is only for UI display (100KB limit). const maxAccumulatedOutputSize = 100_000 + const commandOutputStreamThrottleMs = 150 + let latestCompressedOutput = "" + let lastQueuedCommandOutput = "" + let lastCommandOutputEmitAt = 0 + let pendingCommandOutputEmitTimer: NodeJS.Timeout | undefined + let commandOutputSayChain: Promise = Promise.resolve() + + const queueCommandOutputMessage = (text: string, partial: boolean, force = false): Promise => { + if (!force && text === lastQueuedCommandOutput) { + return commandOutputSayChain + } + + lastQueuedCommandOutput = text + commandOutputSayChain = commandOutputSayChain + .then(async () => { + await task.say("command_output", text, undefined, partial, undefined, undefined, { + isNonInteractive: true, + }) + }) + .catch((error) => { + console.error("[ExecuteCommandTool] Failed to publish command output:", error) + }) + + return commandOutputSayChain + } + + const schedulePartialCommandOutputUpdate = () => { + if (!latestCompressedOutput || completed) { + return + } + + const emitUpdate = () => { + pendingCommandOutputEmitTimer = undefined + lastCommandOutputEmitAt = Date.now() + void queueCommandOutputMessage(latestCompressedOutput, true) + } + + const elapsed = Date.now() - lastCommandOutputEmitAt + if (elapsed >= commandOutputStreamThrottleMs) { + emitUpdate() + return + } + + if (!pendingCommandOutputEmitTimer) { + pendingCommandOutputEmitTimer = setTimeout(emitUpdate, commandOutputStreamThrottleMs - elapsed) + } + } // Track when onCompleted callback finishes to avoid race condition. // The callback is async but Terminal/ExecaTerminal don't await it, so we track completion @@ -242,8 +289,10 @@ export async function executeCommandInTerminal( // Continue sending compressed output to webview for UI display (unchanged behavior) const compressedOutput = Terminal.compressTerminalOutput(accumulatedOutput) + latestCompressedOutput = compressedOutput const status: CommandExecutionStatus = { executionId, status: "output", output: compressedOutput } provider?.postMessageToWebview({ type: "commandExecutionStatus", text: JSON.stringify(status) }) + schedulePartialCommandOutputUpdate() if (runInBackground || hasAskedForCommandOutput) { return @@ -266,6 +315,9 @@ export async function executeCommandInTerminal( }, onCompleted: async (output: string | undefined) => { try { + clearTimeout(pendingCommandOutputEmitTimer) + pendingCommandOutputEmitTimer = undefined + // Finalize interceptor and get persisted result. // We await finalize() to ensure the artifact file is fully flushed // before we advertise the artifact_id to the LLM. @@ -275,8 +327,12 @@ export async function executeCommandInTerminal( // Continue using compressed output for UI display result = Terminal.compressTerminalOutput(output ?? "") + latestCompressedOutput = result - task.say("command_output", result) + // Preserve order: wait for queued partial updates, then emit the final + // non-partial command_output update. + await commandOutputSayChain + await queueCommandOutputMessage(result, false, true) completed = true } finally { // Signal that onCompleted has finished, so the main code can safely use persistedResult @@ -372,6 +428,7 @@ export async function executeCommandInTerminal( } finally { clearTimeout(agentTimeoutId) clearTimeout(userTimeoutId) + clearTimeout(pendingCommandOutputEmitTimer) task.terminalProcess = undefined }