diff --git a/crates/forge_repo/src/provider/openai_responses/repository.rs b/crates/forge_repo/src/provider/openai_responses/repository.rs index ac28697650..d709972170 100644 --- a/crates/forge_repo/src/provider/openai_responses/repository.rs +++ b/crates/forge_repo/src/provider/openai_responses/repository.rs @@ -231,6 +231,16 @@ impl OpenAIResponsesProvider { .usage(usage), )))) } + Ok(super::response::ResponsesStreamEvent::ResponseCompleted { + response, + }) => Some(Ok(super::response::StreamItem::Message(Box::new( + super::response::into_response_completed_message(response), + )))), + Ok(super::response::ResponsesStreamEvent::ResponseIncomplete { + response, + }) => Some(Err(super::response::into_response_incomplete_error( + response.incomplete_details.map(|d| d.reason), + ))), Ok(super::response::ResponsesStreamEvent::Unknown(_)) => None, Ok(super::response::ResponsesStreamEvent::Response(inner)) => { Some(Ok(super::response::StreamItem::Event(inner))) @@ -310,6 +320,16 @@ impl OpenAIResponsesProvider { .usage(usage), )))) } + Ok(super::response::ResponsesStreamEvent::ResponseCompleted { + response, + }) => Some(Ok(super::response::StreamItem::Message(Box::new( + super::response::into_response_completed_message(response), + )))), + Ok(super::response::ResponsesStreamEvent::ResponseIncomplete { + response, + }) => Some(Err(super::response::into_response_incomplete_error( + response.incomplete_details.map(|d| d.reason), + ))), Ok(super::response::ResponsesStreamEvent::Unknown(_)) => None, Ok(super::response::ResponsesStreamEvent::Response(inner)) => { Some(Ok(super::response::StreamItem::Event(inner))) diff --git a/crates/forge_repo/src/provider/openai_responses/response.rs b/crates/forge_repo/src/provider/openai_responses/response.rs index 201df2ce57..2e58f824c5 100644 --- a/crates/forge_repo/src/provider/openai_responses/response.rs +++ b/crates/forge_repo/src/provider/openai_responses/response.rs @@ -43,6 +43,20 @@ pub(super) enum ResponsesStreamEvent { cost: f64, }, + /// Codex backend `response.completed` event. The Codex backend omits + /// required `oai::Response` fields (e.g. `output`) on this event, so it + /// cannot be parsed via the generic `oai::ResponseStreamEvent`. We + /// deserialize only `end_turn` (backend-only continue-turn signal); other + /// data (output items, usage) arrives via earlier streaming events. + #[serde(rename = "response.completed")] + ResponseCompleted { response: ResponseCompletedPayload }, + + /// Codex backend `response.incomplete` event. Mapped to a hard error so + /// the orchestrator stops the turn instead of looping on a truncated + /// assistant message. + #[serde(rename = "response.incomplete")] + ResponseIncomplete { response: ResponseIncompletePayload }, + /// Any standard OpenAI Responses API streaming event. #[serde(untagged)] Response(Box), @@ -82,10 +96,31 @@ where pub(super) enum StreamItem { /// A standard OpenAI Responses API streaming event. Event(Box), - /// A pre-resolved message (e.g. cost from a proxy ping event). + /// A pre-resolved message (e.g. cost from a proxy ping event, or a + /// Codex `response.completed` event already converted to its terminal + /// `ChatCompletionMessage`). Message(Box), } +/// Payload of the Codex `response.completed` event. The Codex backend omits +/// required `oai::Response` fields (e.g. `output`), so we deserialize only +/// `end_turn` (backend-only continue-turn signal). +#[derive(Debug, Deserialize)] +pub(super) struct ResponseCompletedPayload { + #[serde(default)] + pub end_turn: Option, + #[serde(default)] + pub usage: Option, +} + +/// Payload of the Codex `response.incomplete` event. Carries the +/// `incomplete_details.reason` used to produce a useful error message. +#[derive(Debug, Deserialize)] +pub(super) struct ResponseIncompletePayload { + #[serde(default)] + pub incomplete_details: Option, +} + /// Converts OpenAI Responses API usage into the domain Usage type. /// Usage is sent once in the `response.completed` event (not split across /// events). @@ -269,6 +304,32 @@ fn retain_encrypted_reasoning_details( } } +/// Builds the terminal `ChatCompletionMessage` for a `response.completed` +/// event. Deduplicates content/reasoning/tool_calls that were already streamed +/// via deltas and applies the Codex `end_turn` override when present. +pub(super) fn into_response_completed_message( + payload: ResponseCompletedPayload, +) -> ChatCompletionMessage { + let mut message = ChatCompletionMessage::default(); + if let Some(usage) = payload.usage { + message = message.usage(usage.into_domain()); + } + if payload.end_turn == Some(false) { + // Server explicitly asks to continue the turn; leave finish_reason + // unset so the orchestrator loop does not terminate. + message + } else { + message.finish_reason_opt(Some(FinishReason::Stop)) + } +} + +/// Maps a `response.incomplete` event into a hard error so the orchestrator +/// stops the turn instead of looping on a truncated assistant message. +pub(super) fn into_response_incomplete_error(reason: Option) -> anyhow::Error { + let reason = reason.unwrap_or_else(|| "unknown".to_string()); + anyhow::anyhow!("Upstream response incomplete: {reason}") +} + fn into_response_failed_error(failed: oai::ResponseFailedEvent) -> anyhow::Error { let Some(error) = failed.response.error else { return anyhow::anyhow!("Upstream response failed: no error object returned"); @@ -478,6 +539,7 @@ impl IntoDomain for BoxStream { #[cfg(test)] mod tests { use async_openai::types::responses as oai; + use pretty_assertions::assert_eq; // Type alias for ResponseStream in tests since it's not provided by // response-types @@ -1713,6 +1775,44 @@ mod tests { } } + #[test] + fn test_responses_stream_event_deserializes_codex_response_completed_without_output() { + let fixture = serde_json::json!({ + "type": "response.completed", + "response": { + "id": "resp_1", + "created_at": 1773422509, + "model": "gpt-5.3-codex-spark", + "object": "response", + "status": "completed", + "end_turn": false, + "usage": { + "input_tokens": 14900, + "output_tokens": 381, + "total_tokens": 15281, + "input_tokens_details": { "cached_tokens": 14720 }, + "output_tokens_details": { "reasoning_tokens": 317 } + } + } + }); + let actual: ResponsesStreamEvent = serde_json::from_value(fixture).unwrap(); + let expected = Usage { + prompt_tokens: TokenCount::Actual(14900), + completion_tokens: TokenCount::Actual(381), + total_tokens: TokenCount::Actual(15281), + cached_tokens: TokenCount::Actual(14720), + cost: None, + }; + + match actual { + ResponsesStreamEvent::ResponseCompleted { response } => { + assert_eq!(response.end_turn, Some(false)); + assert_eq!(response.usage.unwrap().into_domain(), expected); + } + other => panic!("Expected ResponseCompleted, got {:?}", other), + } + } + /// Simulates the Spark model's streaming pattern: function call arguments /// are sent only in the `done` event (no deltas). The stream emits: /// 1. output_item.added (function_call with empty arguments)