Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 128 additions & 0 deletions apps/cli/src/agent/__tests__/json-event-emitter-streaming.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -215,4 +215,132 @@ describe("JsonEventEmitter streaming deltas", () => {
expect(output[0]).toMatchObject({ content: "gh" })
expect(output[1]).toMatchObject({ content: " pr" })
})

it("streams say:command_output as deltas and correlates tool_result id to execute_command", () => {
const { stdout, lines } = createMockStdout()
const emitter = new JsonEventEmitter({ mode: "stream-json", stdout })
const commandId = 404
const outputTs = 405

emitMessage(
emitter,
createAskMessage({
ts: commandId,
ask: "command",
partial: false,
text: "echo hello",
}),
)

emitMessage(emitter, {
ts: outputTs,
type: "say",
say: "command_output",
partial: true,
text: "line1\n",
} as ClineMessage)
emitMessage(emitter, {
ts: outputTs,
type: "say",
say: "command_output",
partial: true,
text: "line1\nline2\n",
} as ClineMessage)
emitMessage(emitter, {
ts: outputTs,
type: "say",
say: "command_output",
partial: false,
text: "line1\nline2\n",
} as ClineMessage)

const output = lines()
expect(output).toHaveLength(4)
expect(output[0]).toMatchObject({
type: "tool_use",
id: commandId,
subtype: "command",
tool_use: { name: "execute_command", input: { command: "echo hello" } },
done: true,
})
expect(output[1]).toMatchObject({
type: "tool_result",
id: commandId,
subtype: "command",
tool_result: { name: "execute_command", output: "line1\n" },
})
expect(output[2]).toMatchObject({
type: "tool_result",
id: commandId,
subtype: "command",
tool_result: { name: "execute_command", output: "line2\n" },
})
expect(output[3]).toMatchObject({
type: "tool_result",
id: commandId,
subtype: "command",
tool_result: { name: "execute_command" },
done: true,
})
expect(output[3]).not.toHaveProperty("tool_result.output")
})

it("prefers status-driven command output streaming and suppresses duplicate say completion", () => {
const { stdout, lines } = createMockStdout()
const emitter = new JsonEventEmitter({ mode: "stream-json", stdout })
const commandId = 505

emitMessage(
emitter,
createAskMessage({
ts: commandId,
ask: "command",
partial: false,
text: "echo streamed",
}),
)

emitter.emitCommandOutputChunk("line1\n")
emitter.emitCommandOutputChunk("line1\nline2\n")
emitter.emitCommandOutputDone()

// This completion say is expected from the extension, but should be suppressed
// because we already streamed and completed via commandExecutionStatus.
emitMessage(emitter, {
ts: 999,
type: "say",
say: "command_output",
partial: false,
text: "line1\nline2\n",
} as ClineMessage)

const output = lines()
expect(output).toHaveLength(4)
expect(output[0]).toMatchObject({
type: "tool_use",
id: commandId,
subtype: "command",
tool_use: { name: "execute_command", input: { command: "echo streamed" } },
done: true,
})
expect(output[1]).toMatchObject({
type: "tool_result",
id: commandId,
subtype: "command",
tool_result: { name: "execute_command", output: "line1\n" },
})
expect(output[2]).toMatchObject({
type: "tool_result",
id: commandId,
subtype: "command",
tool_result: { name: "execute_command", output: "line2\n" },
})
expect(output[3]).toMatchObject({
type: "tool_result",
id: commandId,
subtype: "command",
tool_result: { name: "execute_command" },
done: true,
})
})
})
123 changes: 119 additions & 4 deletions apps/cli/src/agent/json-event-emitter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,16 @@ export class JsonEventEmitter {
private previousContent = new Map<number, string>()
// Track previous tool-use content for structured (non-append-only) delta computation.
private previousToolUseContent = new Map<number, string>()
// Track the currently active execute_command tool_use id for command_output correlation.
private activeCommandToolUseId: number | undefined
// Track command output snapshots by command tool-use id for delta computation.
private previousCommandOutputByToolUseId = new Map<number, string>()
// Track command ids whose output is being streamed from commandExecutionStatus updates.
private statusDrivenCommandOutputIds = new Set<number>()
// Track command ids that already emitted a terminal command_output done event.
private completedCommandOutputIds = new Set<number>()
// Suppress the next say:command_output completion message after status-driven streaming.
private suppressNextCommandOutputSay = false
// Track the completion result content
private completionResultContent: string | undefined
// Track the latest assistant text as a fallback for result.content.
Expand Down Expand Up @@ -288,6 +298,90 @@ export class JsonEventEmitter {
return this.mode === "stream-json" && content === null
}

private computeCommandOutputDelta(commandId: number, fullOutput: string | undefined): string | null {
const normalized = fullOutput ?? ""
const previous = this.previousCommandOutputByToolUseId.get(commandId) || ""

if (normalized === previous) {
return null
}

this.previousCommandOutputByToolUseId.set(commandId, normalized)
return normalized.startsWith(previous) ? normalized.slice(previous.length) : normalized
}

private emitCommandOutputEvent(commandId: number, fullOutput: string | undefined, isDone: boolean): void {
if (this.mode === "stream-json") {
const outputDelta = this.computeCommandOutputDelta(commandId, fullOutput)
const event: JsonEvent = {
type: "tool_result",
id: commandId,
subtype: "command",
tool_result: { name: "execute_command" },
}

if (outputDelta !== null && outputDelta.length > 0) {
event.tool_result = { name: "execute_command", output: outputDelta }
}

if (isDone) {
event.done = true
this.previousCommandOutputByToolUseId.delete(commandId)
this.statusDrivenCommandOutputIds.delete(commandId)
this.completedCommandOutputIds.add(commandId)
if (this.activeCommandToolUseId === commandId) {
this.activeCommandToolUseId = undefined
}
}

// Suppress empty partial updates that carry no delta.
if (!isDone && outputDelta === null) {
return
}

this.emitEvent(event)
return
}

this.emitEvent({
type: "tool_result",
id: commandId,
subtype: "command",
tool_result: { name: "execute_command", output: fullOutput },
...(isDone ? { done: true } : {}),
})

if (isDone) {
this.previousCommandOutputByToolUseId.delete(commandId)
this.statusDrivenCommandOutputIds.delete(commandId)
this.completedCommandOutputIds.add(commandId)
if (this.activeCommandToolUseId === commandId) {
this.activeCommandToolUseId = undefined
}
}
}

public emitCommandOutputChunk(outputSnapshot: string): void {
const commandId = this.activeCommandToolUseId
if (commandId === undefined) {
return
}

this.statusDrivenCommandOutputIds.add(commandId)
this.emitCommandOutputEvent(commandId, outputSnapshot, false)
}

public emitCommandOutputDone(): void {
const commandId = this.activeCommandToolUseId
if (commandId === undefined) {
return
}

this.statusDrivenCommandOutputIds.add(commandId)
this.suppressNextCommandOutputSay = true
this.emitCommandOutputEvent(commandId, undefined, true)
}

/**
* Get content to send for a message (delta for streaming, full for json mode).
*/
Expand Down Expand Up @@ -392,10 +486,7 @@ export class JsonEventEmitter {
break

case "command_output":
this.emitEvent({
type: "tool_result",
tool_result: { name: "execute_command", output: msg.text },
})
this.handleCommandOutputMessage(msg, isDone)
break

case "user_feedback":
Expand Down Expand Up @@ -517,6 +608,10 @@ export class JsonEventEmitter {
const toolInfo = parseToolInfo(msg.text)

if (subtype === "command") {
this.activeCommandToolUseId = msg.ts
this.completedCommandOutputIds.delete(msg.ts)
this.suppressNextCommandOutputSay = false

if (isStreamingPartial) {
const commandDelta = this.computeStructuredDelta(msg.ts, msg.text)
if (commandDelta === null) {
Expand Down Expand Up @@ -595,6 +690,21 @@ export class JsonEventEmitter {
})
}

private handleCommandOutputMessage(msg: ClineMessage, isDone: boolean): void {
if (this.suppressNextCommandOutputSay) {
if (isDone) {
this.suppressNextCommandOutputSay = false
}
return
}

const commandId = this.activeCommandToolUseId ?? msg.ts
if (this.statusDrivenCommandOutputIds.has(commandId) || this.completedCommandOutputIds.has(commandId)) {
return
}
this.emitCommandOutputEvent(commandId, msg.text, isDone)
}

/**
* Handle task completion and emit result event.
*/
Expand Down Expand Up @@ -711,6 +821,11 @@ export class JsonEventEmitter {
this.seenMessageIds.clear()
this.previousContent.clear()
this.previousToolUseContent.clear()
this.activeCommandToolUseId = undefined
this.previousCommandOutputByToolUseId.clear()
this.statusDrivenCommandOutputIds.clear()
this.completedCommandOutputIds.clear()
this.suppressNextCommandOutputSay = false
this.completionResultContent = undefined
this.lastAssistantText = undefined
this.expectPromptEchoAsUser = true
Expand Down
47 changes: 45 additions & 2 deletions apps/cli/src/commands/cli/stdin-stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -349,12 +349,46 @@ export async function runStdinStreamMode({ host, jsonEmitter, setStreamRequestId

const onExtensionMessage = (message: {
type?: string
text?: unknown
state?: {
currentTaskId?: unknown
currentTaskItem?: { id?: unknown }
messageQueue?: unknown
}
}) => {
if (message.type === "commandExecutionStatus") {
if (typeof message.text !== "string") {
return
}

let parsedStatus: unknown
try {
parsedStatus = JSON.parse(message.text)
} catch {
return
}

if (!isRecord(parsedStatus) || typeof parsedStatus.status !== "string") {
return
}

if (parsedStatus.status === "output" && typeof parsedStatus.output === "string") {
jsonEmitter.emitCommandOutputChunk(parsedStatus.output)
return
}

if (
parsedStatus.status === "exited" ||
parsedStatus.status === "timeout" ||
parsedStatus.status === "fallback"
) {
jsonEmitter.emitCommandOutputDone()
return
}

return
}

if (message.type !== "state") {
return
}
Expand Down Expand Up @@ -463,7 +497,7 @@ export async function runStdinStreamMode({ host, jsonEmitter, setStreamRequestId
}

switch (stdinCommand.command) {
case "start":
case "start": {
// A task can emit completion events before runTask() finalizers run.
// Wait for full settlement to avoid false "task_busy" on immediate next start.
// Safe from races: `for await` processes stdin commands serially, so no
Expand Down Expand Up @@ -503,8 +537,16 @@ export async function runStdinStreamMode({ host, jsonEmitter, setStreamRequestId
success: true,
})

// In CLI stdin-stream mode, default to the execa terminal provider so
// command output can be streamed deterministically. Explicit per-request
// config still wins.
const taskConfiguration = {
terminalShellIntegrationDisabled: true,
...(stdinCommand.configuration ?? {}),
}

activeTaskPromise = host
.runTask(stdinCommand.prompt, latestTaskId, stdinCommand.configuration)
.runTask(stdinCommand.prompt, latestTaskId, taskConfiguration)
.catch((error) => {
const message = error instanceof Error ? error.message : String(error)

Expand Down Expand Up @@ -559,6 +601,7 @@ export async function runStdinStreamMode({ host, jsonEmitter, setStreamRequestId
})

break
}

case "message": {
// If cancel was requested, wait briefly for the task to be rehydrated
Expand Down
Loading
Loading