Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions apps/cli/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
"build:extension": "pnpm --filter roo-cline bundle",
"dev": "ROO_AUTH_BASE_URL=https://app.roocode.com ROO_SDK_BASE_URL=https://cloud-api.roocode.com ROO_CODE_PROVIDER_URL=https://api.roocode.com/proxy tsx src/index.ts",
"dev:local": "ROO_AUTH_BASE_URL=http://localhost:3000 ROO_SDK_BASE_URL=http://localhost:3001 ROO_CODE_PROVIDER_URL=http://localhost:8080/proxy tsx src/index.ts",
"dev:test-stdin": "tsx scripts/test-stdin-stream.ts",
"clean": "rimraf dist .turbo"
},
"dependencies": {
Expand Down
67 changes: 67 additions & 0 deletions apps/cli/scripts/test-stdin-stream.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import path from "path"
import { fileURLToPath } from "url"
import readline from "readline"

import { execa } from "execa"

const __dirname = path.dirname(fileURLToPath(import.meta.url))
const cliRoot = path.resolve(__dirname, "..")

async function main() {
const child = execa(
"pnpm",
["dev", "--print", "--stdin-prompt-stream", "--provider", "roo", "--output-format", "stream-json"],
{
cwd: cliRoot,
stdin: "pipe",
stdout: "pipe",
stderr: "pipe",
reject: false,
forceKillAfterDelay: 2_000,
},
)

child.stdout?.on("data", (chunk) => process.stdout.write(chunk))
child.stderr?.on("data", (chunk) => process.stderr.write(chunk))

console.log("[wrapper] Type a message and press Enter to send it.")
console.log("[wrapper] Type /exit to close stdin and let the CLI finish.")

const rl = readline.createInterface({
input: process.stdin,
output: process.stdout,
terminal: true,
})

rl.on("line", (line) => {
if (line.trim() === "/exit") {
console.log("[wrapper] Closing stdin...")
child.stdin?.end()
rl.close()
return
}

if (!child.stdin?.destroyed) {
child.stdin?.write(`${line}\n`)
}
})

const onSignal = (signal: NodeJS.Signals) => {
console.log(`[wrapper] Received ${signal}, forwarding to CLI...`)
rl.close()
child.kill(signal)
}

process.on("SIGINT", () => onSignal("SIGINT"))
process.on("SIGTERM", () => onSignal("SIGTERM"))

const result = await child
rl.close()
console.log(`[wrapper] CLI exited with code ${result.exitCode}`)
process.exit(result.exitCode ?? 1)
}

main().catch((error) => {
console.error("[wrapper] Fatal error:", error)
process.exit(1)
})
22 changes: 17 additions & 5 deletions apps/cli/src/agent/json-event-emitter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ import type { ClineMessage } from "@roo-code/types"
import type { JsonEvent, JsonEventCost, JsonFinalOutput } from "@/types/json-events.js"

import type { ExtensionClient } from "./extension-client.js"
import type { TaskCompletedEvent } from "./events.js"
import type { AgentStateChangeEvent, TaskCompletedEvent } from "./events.js"
import { AgentLoopState } from "./agent-state.js"

/**
* Options for JsonEventEmitter.
Expand Down Expand Up @@ -108,10 +109,11 @@ export class JsonEventEmitter {
// Subscribe to message events
const unsubMessage = client.on("message", (msg) => this.handleMessage(msg, false))
const unsubMessageUpdated = client.on("messageUpdated", (msg) => this.handleMessage(msg, true))
const unsubStateChange = client.on("stateChange", (event) => this.handleStateChange(event))
const unsubTaskCompleted = client.on("taskCompleted", (event) => this.handleTaskCompleted(event))
const unsubError = client.on("error", (error) => this.handleError(error))

this.unsubscribers.push(unsubMessage, unsubMessageUpdated, unsubTaskCompleted, unsubError)
this.unsubscribers.push(unsubMessage, unsubMessageUpdated, unsubStateChange, unsubTaskCompleted, unsubError)

// Emit init event
this.emitEvent({
Expand All @@ -121,6 +123,16 @@ export class JsonEventEmitter {
})
}

private handleStateChange(event: AgentStateChangeEvent): void {
// Only treat the next say:text as a prompt echo when a new task starts.
if (
event.previousState.state === AgentLoopState.NO_TASK &&
event.currentState.state !== AgentLoopState.NO_TASK
) {
this.expectPromptEchoAsUser = true
}
}

/**
* Detach from the client and clean up subscriptions.
*/
Expand Down Expand Up @@ -257,6 +269,9 @@ export class JsonEventEmitter {
case "user_feedback":
case "user_feedback_diff":
this.emitEvent(this.buildTextEvent("user", msg.ts, contentToSend, isDone))
if (isDone) {
this.expectPromptEchoAsUser = false
}
break

case "api_req_started": {
Expand Down Expand Up @@ -387,9 +402,6 @@ export class JsonEventEmitter {
if (this.mode === "json") {
this.outputFinalResult(event.success, resultContent)
}

// Next task in the same process starts with a new echoed prompt.
this.expectPromptEchoAsUser = true
}

/**
Expand Down
148 changes: 142 additions & 6 deletions apps/cli/src/commands/cli/run.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { createInterface } from "readline"
import { fileURLToPath } from "url"

import { createElement } from "react"
import pWaitFor from "p-wait-for"

import { setLogger } from "@roo-code/vscode-shim"

Expand Down Expand Up @@ -306,15 +307,149 @@ export async function run(promptArg: string | undefined, flagOptions: FlagOption

if (useStdinPromptStream) {
let hasReceivedStdinPrompt = false
// stdin stream mode may start at most one task in this process.
let startedTaskFromStdin = false
let activeTaskPromise: Promise<void> | null = null
let fatalStreamError: Error | null = null
// Extension-owned queue depth mirrored from state pushes.
// CLI does not maintain its own prompt queue.
let extensionQueueDepth = 0

const waitForInitialState = async () => {
// Give the extension a brief chance to publish initial state so
// we can continue an existing task instead of creating a new one.
await pWaitFor(
() => {
if (fatalStreamError) {
throw fatalStreamError
}

return host.client.isInitialized()
},
{ interval: 25, timeout: 2_000 },
).catch(() => {
// Best-effort wait only; continuing preserves previous behavior.
})

if (fatalStreamError) {
throw fatalStreamError
}
}

for await (const stdinPrompt of readPromptsFromStdinLines()) {
hasReceivedStdinPrompt = true
await host.runTask(stdinPrompt)
jsonEmitter?.clear()
const waitForActiveTask = async () => {
await pWaitFor(
() => {
if (fatalStreamError) {
throw fatalStreamError
}

if (!host.client.hasActiveTask()) {
if (!activeTaskPromise && startedTaskFromStdin) {
throw new Error("task is no longer active; cannot continue conversation from stdin")
}

return false
}

return true
},
{ interval: 25, timeout: 5_000 },
)
}

if (!hasReceivedStdinPrompt) {
throw new Error("no prompt provided via stdin")
const startInitialTask = async (taskPrompt: string) => {
startedTaskFromStdin = true

activeTaskPromise = host
.runTask(taskPrompt)
.catch((error) => {
fatalStreamError = error instanceof Error ? error : new Error(String(error))
})
.finally(() => {
activeTaskPromise = null
})

await waitForActiveTask()
}

const enqueueContinuation = async (text: string) => {
if (!host.client.hasActiveTask()) {
await waitForActiveTask()
}

// Delegate ordering/drain behavior to the extension message queue.
host.sendToExtension({ type: "queueMessage", text })
}

const offClientError = host.client.on("error", (error) => {
fatalStreamError = error
})

const onExtensionMessage = (message: { type?: string; state?: { messageQueue?: unknown } }) => {
if (message.type !== "state") {
return
}

const messageQueue = message.state?.messageQueue
extensionQueueDepth = Array.isArray(messageQueue) ? messageQueue.length : 0
}

host.on("extensionWebviewMessage", onExtensionMessage)

try {
await waitForInitialState()

for await (const stdinPrompt of readPromptsFromStdinLines()) {
hasReceivedStdinPrompt = true

// Start once, then always continue via extension queue.
if (!host.client.hasActiveTask() && !startedTaskFromStdin) {
await startInitialTask(stdinPrompt)
} else {
await enqueueContinuation(stdinPrompt)
}

if (fatalStreamError) {
throw fatalStreamError
}
}

if (!hasReceivedStdinPrompt) {
throw new Error("no prompt provided via stdin")
}

await pWaitFor(
() => {
if (fatalStreamError) {
throw fatalStreamError
}

const isSettled =
!host.client.hasActiveTask() && !activeTaskPromise && extensionQueueDepth === 0

if (isSettled) {
return true
}

if (host.isWaitingForInput() && extensionQueueDepth === 0) {
const currentAsk = host.client.getCurrentAsk()

if (currentAsk === "completion_result") {
return true
}

if (currentAsk) {
throw new Error(`stdin ended while task was waiting for input (${currentAsk})`)
}
}

return false
},
{ interval: 50 },
)
} finally {
offClientError()
host.off("extensionWebviewMessage", onExtensionMessage)
}
} else {
await host.runTask(prompt!)
Expand All @@ -331,6 +466,7 @@ export async function run(promptArg: string | undefined, flagOptions: FlagOption
process.stdout.write(JSON.stringify(errorEvent) + "\n")
} else {
console.error("[CLI] Error:", errorMessage)

if (error instanceof Error) {
console.error(error.stack)
}
Expand Down
Loading