Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 33 additions & 1 deletion packages/opencode/src/session/message-v2.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,26 @@ interface FetchDecompressionError extends Error {
path: string
}

// Extract the raw streamed value embedded in an AI SDK "Type validation failed"
// message and parse it as JSON. Returns undefined when the message does not
// match the expected shape or the captured value is not valid JSON.
function streamValue(msg: string) {
  const pattern = /^Type validation failed: Value: (.+?)\.\nError message:/s
  const match = pattern.exec(msg)
  if (match === null) return
  try {
    return JSON.parse(match[1])
  } catch {
    return
  }
}

// Detect an SDK union-validation failure that actually wraps a provider
// { error: { type: "server_error" } } payload. Returns the re-serialized
// payload when it matches, otherwise undefined.
function wrappedServerError(msg: string) {
  // Cheap substring prefilter before attempting any parsing.
  const looksWrapped = msg.includes("Type validation failed") && msg.includes("invalid_union")
  if (!looksWrapped) return
  const value = streamValue(msg)
  if (!value || typeof value !== "object") return
  if (!("error" in value)) return
  const inner = (value as { error?: unknown }).error
  if (inner === null || typeof inner !== "object") return
  if ((inner as { type?: unknown }).type !== "server_error") return
  return JSON.stringify(value)
}

export namespace MessageV2 {
export const SYNTHETIC_ATTACHMENT_PROMPT = "Attached image(s) from tool result:"

Expand Down Expand Up @@ -1025,7 +1045,19 @@ export namespace MessageV2 {
{ cause: e },
).toObject()
case e instanceof Error:
return new NamedError.Unknown({ message: errorMessage(e) }, { cause: e }).toObject()
const msg = errorMessage(e)
const body = wrappedServerError(msg)
if (body) {
return new MessageV2.APIError(
{
message: "Stream failed before terminal event",
isRetryable: true,
responseBody: body,
},
{ cause: e },
).toObject()
}
return new NamedError.Unknown({ message: msg }, { cause: e }).toObject()
default:
try {
const parsed = ProviderError.parseStreamError(e)
Expand Down
25 changes: 24 additions & 1 deletion packages/opencode/test/lib/llm-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type Sse = {
hang?: boolean
error?: unknown
reset?: boolean
passthrough?: boolean
}

type HttpError = {
Expand Down Expand Up @@ -582,6 +583,28 @@ export function raw(input: {
hang: input.hang,
error: input.error,
reset: input.reset,
passthrough: false,
}
}

// Build a passthrough SSE item for the Responses-API mode: the chunks are
// forwarded to the client verbatim instead of being wrapped into synthetic
// Responses events (compare with raw(), which sets passthrough: false).
export function rawResponses(input: {
  chunks?: unknown[]
  head?: unknown[]
  tail?: unknown[]
  wait?: PromiseLike<unknown>
  hang?: boolean
  error?: unknown
  reset?: boolean
}): Item {
  const { head, chunks, tail, wait, hang, error, reset } = input
  return {
    type: "sse",
    passthrough: true,
    // Explicit head wins; otherwise chunks double as the head.
    head: head ?? chunks ?? [],
    tail: tail ?? [],
    wait,
    hang,
    error,
    reset,
  }
}

Expand Down Expand Up @@ -683,7 +706,7 @@ export class TestLLMServer extends Context.Service<TestLLMServer, TestLLMServer.
hits = [...hits, current]
yield* notify()
if (next.type !== "sse") return fail(next)
if (mode === "responses") return send(responses(next, modelFrom(body)))
if (mode === "responses") return send(next.passthrough ? next : responses(next, modelFrom(body)))
if (next.reset) {
yield* reset(next)
return HttpServerResponse.empty()
Expand Down
124 changes: 123 additions & 1 deletion packages/opencode/test/session/processor-effect.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,24 @@ import { MessageID, PartID, SessionID } from "../../src/session/schema"
import { SessionStatus } from "../../src/session/status"
import { SessionSummary } from "../../src/session/summary"
import { Snapshot } from "../../src/snapshot"
import { Filesystem } from "../../src/util/filesystem"
import { Log } from "../../src/util/log"
import * as CrossSpawnSpawner from "../../src/effect/cross-spawn-spawner"
import { provideTmpdirServer } from "../fixture/fixture"
import { testEffect } from "../lib/effect"
import { raw, reply, TestLLMServer } from "../lib/llm-server"
import { raw, rawResponses, reply, TestLLMServer } from "../lib/llm-server"

// Read the shared models catalog fixture and pull out one provider/model
// pair. Throws with a descriptive message when either id is missing so the
// test fails loudly instead of with an opaque undefined access.
async function loadFixture(providerID: string, modelID: string) {
  const catalog = await Filesystem.readJson<Record<string, Provider.Info & { models: Record<string, Provider.Model> }>>(
    path.join(import.meta.dir, "../tool/fixtures/models-api.json"),
  )
  const provider = catalog[providerID]
  if (!provider) throw new Error(`Missing provider in fixture: ${providerID}`)
  const model = provider.models[modelID]
  if (!model) throw new Error(`Missing model in fixture: ${modelID}`)
  return { provider, model }
}

Log.init({ print: false })

Expand Down Expand Up @@ -85,6 +98,43 @@ function providerCfg(url: string) {
}
}

// Build an opencode config that registers a single fake "openai" provider
// pointed at the local test LLM server (`url`), exposing one model so the
// Responses-API code path can be exercised without real credentials.
function openaiCfg(url: string) {
  const model = {
    id: "gpt-5.4",
    name: "gpt-5.4",
    attachment: true,
    reasoning: true,
    tool_call: true,
    temperature: false,
    release_date: "2025-01-01",
    limit: { context: 400000, output: 128000 },
    cost: { input: 0, output: 0 },
    options: {},
  }
  return {
    enabled_providers: ["openai"],
    provider: {
      openai: {
        name: "OpenAI",
        env: ["OPENAI_API_KEY"],
        npm: "@ai-sdk/openai",
        api: "https://api.openai.com/v1",
        models: { "gpt-5.4": model },
        // baseURL redirects all traffic to the in-process test server.
        options: { apiKey: "test-openai-key", baseURL: url },
      },
    },
  }
}

// Provider/model identifier pair for the fake provider registered by openaiCfg();
// the ids must stay in sync with the "openai" / "gpt-5.4" entries there.
const openai = {
  providerID: ProviderID.make("openai"),
  modelID: ModelID.make("gpt-5.4"),
}

function agent(): Agent.Info {
return {
name: "build",
Expand Down Expand Up @@ -840,3 +890,75 @@ it.live("session.processor effect tests mark interruptions aborted without manua
{ git: true, config: (url) => providerCfg(url) },
),
)

// Regression test: the test server streams a bare
// { error: { type: "server_error" } } payload in Responses mode, which the SDK
// rejects with a "Type validation failed" / "invalid_union" error. The
// processor should recognize the wrapped server_error as retryable, retry
// once, succeed on the second (plain text) reply, and leave no error on the
// stored assistant message or on the session error bus.
it.live("session.processor effect tests surface wrapped responses validation failures through session errors", () =>
  provideTmpdirServer(
    ({ dir, llm }) =>
      Effect.gen(function* () {
        const seen = defer<void>() // NOTE(review): appears unused in this test — confirm before removing
        const { processors, session, provider } = yield* boot()
        const fixture = yield* Effect.promise(() => loadFixture("openai", "gpt-5.4"))
        const bus = yield* Bus.Service
        const sts = yield* SessionStatus.Service // NOTE(review): appears unused in this test — confirm before removing

        // First request: passthrough SSE payload that fails SDK stream
        // validation, driving the processor into its retry path.
        yield* llm.push(
          rawResponses({
            chunks: [{ error: { code: "", message: "", type: "server_error" }, request_id: "" }],
          }),
        )
        // Second request: a well-formed text reply so the retry succeeds.
        yield* llm.text("after")

        const chat = yield* session.create({})
        const parent = yield* user(chat.id, "wrapped error")
        const msg = yield* assistant(chat.id, parent.id, path.resolve(dir))
        const mdl = yield* provider.getModel(openai.providerID, openai.modelID)
        const errs: NonNullable<MessageV2.Assistant["error"]>[] = []
        const states: number[] = []
        // Collect retry attempt numbers published for this session.
        const st = yield* bus.subscribeCallback(SessionStatus.Event.Status, (evt) => {
          if (evt.properties.sessionID !== chat.id) return
          if (evt.properties.status.type === "retry") states.push(evt.properties.status.attempt)
        })
        // Collect session-level errors; the wrapped failure must NOT surface here.
        const off = yield* bus.subscribeCallback(Session.Event.Error, (evt) => {
          if (evt.properties.sessionID !== chat.id) return
          if (!evt.properties.error) return
          errs.push(evt.properties.error)
        })
        const handle = yield* processors.create({
          assistantMessage: msg,
          sessionID: chat.id,
          model: mdl,
        })

        const value = yield* handle.process({
          user: {
            id: parent.id,
            sessionID: chat.id,
            role: "user",
            time: parent.time,
            agent: parent.agent,
            model: { providerID: openai.providerID, modelID: fixture.model.id },
          } satisfies MessageV2.User,
          sessionID: chat.id,
          model: mdl,
          agent: agent(),
          system: [],
          messages: [{ role: "user", content: "wrapped error" }],
          tools: {},
        }).pipe(Effect.timeout("6 seconds"))
        const stored = MessageV2.get({ sessionID: chat.id, messageID: msg.id })
        // Unsubscribe before asserting so late events cannot mutate the arrays.
        off()
        st()

        expect(value).toBe("continue")
        expect(yield* llm.calls).toBe(2) // failed first attempt + successful retry
        expect(states).toStrictEqual([1]) // exactly one retry, attempt #1
        expect(handle.message.error).toBeUndefined()
        expect(errs).toHaveLength(0)
        expect(stored.info.role).toBe("assistant")
        if (stored.info.role === "assistant") {
          expect(stored.info.error).toBeUndefined()
        }
      }),
    { git: true, config: (url) => openaiCfg(url) },
  ),
)
Loading