diff --git a/apps/cli/package.json b/apps/cli/package.json index b00805b0589..e5b2c889d65 100644 --- a/apps/cli/package.json +++ b/apps/cli/package.json @@ -17,6 +17,7 @@ "build:extension": "pnpm --filter roo-cline bundle", "dev": "ROO_AUTH_BASE_URL=https://app.roocode.com ROO_SDK_BASE_URL=https://cloud-api.roocode.com ROO_CODE_PROVIDER_URL=https://api.roocode.com/proxy tsx src/index.ts", "dev:local": "ROO_AUTH_BASE_URL=http://localhost:3000 ROO_SDK_BASE_URL=http://localhost:3001 ROO_CODE_PROVIDER_URL=http://localhost:8080/proxy tsx src/index.ts", + "dev:test-stdin": "tsx scripts/test-stdin-stream.ts", "clean": "rimraf dist .turbo" }, "dependencies": { diff --git a/apps/cli/scripts/test-stdin-stream.ts b/apps/cli/scripts/test-stdin-stream.ts new file mode 100644 index 00000000000..5212df5b335 --- /dev/null +++ b/apps/cli/scripts/test-stdin-stream.ts @@ -0,0 +1,67 @@ +import path from "path" +import { fileURLToPath } from "url" +import readline from "readline" + +import { execa } from "execa" + +const __dirname = path.dirname(fileURLToPath(import.meta.url)) +const cliRoot = path.resolve(__dirname, "..") + +async function main() { + const child = execa( + "pnpm", + ["dev", "--print", "--stdin-prompt-stream", "--provider", "roo", "--output-format", "stream-json"], + { + cwd: cliRoot, + stdin: "pipe", + stdout: "pipe", + stderr: "pipe", + reject: false, + forceKillAfterDelay: 2_000, + }, + ) + + child.stdout?.on("data", (chunk) => process.stdout.write(chunk)) + child.stderr?.on("data", (chunk) => process.stderr.write(chunk)) + + console.log("[wrapper] Type a message and press Enter to send it.") + console.log("[wrapper] Type /exit to close stdin and let the CLI finish.") + + const rl = readline.createInterface({ + input: process.stdin, + output: process.stdout, + terminal: true, + }) + + rl.on("line", (line) => { + if (line.trim() === "/exit") { + console.log("[wrapper] Closing stdin...") + child.stdin?.end() + rl.close() + return + } + + if (!child.stdin?.destroyed) { + child.stdin?.write(`${line}\n`) + } + }) + + const onSignal = (signal: NodeJS.Signals) => { + console.log(`[wrapper] Received ${signal}, forwarding to CLI...`) + rl.close() + child.kill(signal) + } + + process.on("SIGINT", () => onSignal("SIGINT")) + process.on("SIGTERM", () => onSignal("SIGTERM")) + + const result = await child + rl.close() + console.log(`[wrapper] CLI exited with code ${result.exitCode}`) + process.exit(result.exitCode ?? 1) +} + +main().catch((error) => { + console.error("[wrapper] Fatal error:", error) + process.exit(1) +}) diff --git a/apps/cli/src/agent/json-event-emitter.ts b/apps/cli/src/agent/json-event-emitter.ts index 4a6d2629aee..bdf96a763de 100644 --- a/apps/cli/src/agent/json-event-emitter.ts +++ b/apps/cli/src/agent/json-event-emitter.ts @@ -19,7 +19,8 @@ import type { ClineMessage } from "@roo-code/types" import type { JsonEvent, JsonEventCost, JsonFinalOutput } from "@/types/json-events.js" import type { ExtensionClient } from "./extension-client.js" -import type { TaskCompletedEvent } from "./events.js" +import type { AgentStateChangeEvent, TaskCompletedEvent } from "./events.js" +import { AgentLoopState } from "./agent-state.js" /** * Options for JsonEventEmitter. @@ -108,10 +109,11 @@ export class JsonEventEmitter { // Subscribe to message events const unsubMessage = client.on("message", (msg) => this.handleMessage(msg, false)) const unsubMessageUpdated = client.on("messageUpdated", (msg) => this.handleMessage(msg, true)) + const unsubStateChange = client.on("stateChange", (event) => this.handleStateChange(event)) const unsubTaskCompleted = client.on("taskCompleted", (event) => this.handleTaskCompleted(event)) const unsubError = client.on("error", (error) => this.handleError(error)) - this.unsubscribers.push(unsubMessage, unsubMessageUpdated, unsubTaskCompleted, unsubError) + this.unsubscribers.push(unsubMessage, unsubMessageUpdated, unsubStateChange, unsubTaskCompleted, unsubError) // Emit init event this.emitEvent({ @@ -121,6 +123,16 @@ export class JsonEventEmitter { }) } + private handleStateChange(event: AgentStateChangeEvent): void { + // Only treat the next say:text as a prompt echo when a new task starts. + if ( + event.previousState.state === AgentLoopState.NO_TASK && + event.currentState.state !== AgentLoopState.NO_TASK + ) { + this.expectPromptEchoAsUser = true + } + } + /** * Detach from the client and clean up subscriptions. */ @@ -257,6 +269,9 @@ export class JsonEventEmitter { case "user_feedback": case "user_feedback_diff": this.emitEvent(this.buildTextEvent("user", msg.ts, contentToSend, isDone)) + if (isDone) { + this.expectPromptEchoAsUser = false + } break case "api_req_started": { @@ -387,9 +402,6 @@ export class JsonEventEmitter { if (this.mode === "json") { this.outputFinalResult(event.success, resultContent) } - - // Next task in the same process starts with a new echoed prompt. - this.expectPromptEchoAsUser = true } /** diff --git a/apps/cli/src/commands/cli/run.ts b/apps/cli/src/commands/cli/run.ts index c7a01450a48..365febb9f86 100644 --- a/apps/cli/src/commands/cli/run.ts +++ b/apps/cli/src/commands/cli/run.ts @@ -4,6 +4,7 @@ import { createInterface } from "readline" import { fileURLToPath } from "url" import { createElement } from "react" +import pWaitFor from "p-wait-for" import { setLogger } from "@roo-code/vscode-shim" @@ -306,15 +307,149 @@ export async function run(promptArg: string | undefined, flagOptions: FlagOption if (useStdinPromptStream) { let hasReceivedStdinPrompt = false + // stdin stream mode may start at most one task in this process. + let startedTaskFromStdin = false + let activeTaskPromise: Promise | null = null + let fatalStreamError: Error | null = null + // Extension-owned queue depth mirrored from state pushes. + // CLI does not maintain its own prompt queue. + let extensionQueueDepth = 0 + + const waitForInitialState = async () => { + // Give the extension a brief chance to publish initial state so + // we can continue an existing task instead of creating a new one. + await pWaitFor( + () => { + if (fatalStreamError) { + throw fatalStreamError + } + + return host.client.isInitialized() + }, + { interval: 25, timeout: 2_000 }, + ).catch(() => { + // Best-effort wait only; continuing preserves previous behavior. + }) + + if (fatalStreamError) { + throw fatalStreamError + } + } - for await (const stdinPrompt of readPromptsFromStdinLines()) { - hasReceivedStdinPrompt = true - await host.runTask(stdinPrompt) - jsonEmitter?.clear() + const waitForActiveTask = async () => { + await pWaitFor( + () => { + if (fatalStreamError) { + throw fatalStreamError + } + + if (!host.client.hasActiveTask()) { + if (!activeTaskPromise && startedTaskFromStdin) { + throw new Error("task is no longer active; cannot continue conversation from stdin") + } + + return false + } + + return true + }, + { interval: 25, timeout: 5_000 }, + ) } - if (!hasReceivedStdinPrompt) { - throw new Error("no prompt provided via stdin") + const startInitialTask = async (taskPrompt: string) => { + startedTaskFromStdin = true + + activeTaskPromise = host + .runTask(taskPrompt) + .catch((error) => { + fatalStreamError = error instanceof Error ? error : new Error(String(error)) + }) + .finally(() => { + activeTaskPromise = null + }) + + await waitForActiveTask() + } + + const enqueueContinuation = async (text: string) => { + if (!host.client.hasActiveTask()) { + await waitForActiveTask() + } + + // Delegate ordering/drain behavior to the extension message queue. + host.sendToExtension({ type: "queueMessage", text }) + } + + const offClientError = host.client.on("error", (error) => { + fatalStreamError = error + }) + + const onExtensionMessage = (message: { type?: string; state?: { messageQueue?: unknown } }) => { + if (message.type !== "state") { + return + } + + const messageQueue = message.state?.messageQueue + extensionQueueDepth = Array.isArray(messageQueue) ? messageQueue.length : 0 + } + + host.on("extensionWebviewMessage", onExtensionMessage) + + try { + await waitForInitialState() + + for await (const stdinPrompt of readPromptsFromStdinLines()) { + hasReceivedStdinPrompt = true + + // Start once, then always continue via extension queue. + if (!host.client.hasActiveTask() && !startedTaskFromStdin) { + await startInitialTask(stdinPrompt) + } else { + await enqueueContinuation(stdinPrompt) + } + + if (fatalStreamError) { + throw fatalStreamError + } + } + + if (!hasReceivedStdinPrompt) { + throw new Error("no prompt provided via stdin") + } + + await pWaitFor( + () => { + if (fatalStreamError) { + throw fatalStreamError + } + + const isSettled = + !host.client.hasActiveTask() && !activeTaskPromise && extensionQueueDepth === 0 + + if (isSettled) { + return true + } + + if (host.isWaitingForInput() && extensionQueueDepth === 0) { + const currentAsk = host.client.getCurrentAsk() + + if (currentAsk === "completion_result") { + return true + } + + if (currentAsk) { + throw new Error(`stdin ended while task was waiting for input (${currentAsk})`) + } + } + + return false + }, + { interval: 50 }, + ) + } finally { + offClientError() + host.off("extensionWebviewMessage", onExtensionMessage) } } else { await host.runTask(prompt!) @@ -331,6 +466,7 @@ export async function run(promptArg: string | undefined, flagOptions: FlagOption process.stdout.write(JSON.stringify(errorEvent) + "\n") } else { console.error("[CLI] Error:", errorMessage) + if (error instanceof Error) { console.error(error.stack) }