Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions crates/forge_repo/src/provider/openai_responses/repository.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,16 @@ impl<T: HttpInfra> OpenAIResponsesProvider<T> {
.usage(usage),
))))
}
Ok(super::response::ResponsesStreamEvent::ResponseCompleted {
response,
}) => Some(Ok(super::response::StreamItem::Message(Box::new(
super::response::into_response_completed_message(response),
)))),
Ok(super::response::ResponsesStreamEvent::ResponseIncomplete {
response,
}) => Some(Err(super::response::into_response_incomplete_error(
response.incomplete_details.map(|d| d.reason),
))),
Ok(super::response::ResponsesStreamEvent::Unknown(_)) => None,
Ok(super::response::ResponsesStreamEvent::Response(inner)) => {
Some(Ok(super::response::StreamItem::Event(inner)))
Expand Down Expand Up @@ -310,6 +320,16 @@ impl<T: HttpInfra> OpenAIResponsesProvider<T> {
.usage(usage),
))))
}
Ok(super::response::ResponsesStreamEvent::ResponseCompleted {
response,
}) => Some(Ok(super::response::StreamItem::Message(Box::new(
super::response::into_response_completed_message(response),
)))),
Ok(super::response::ResponsesStreamEvent::ResponseIncomplete {
response,
}) => Some(Err(super::response::into_response_incomplete_error(
response.incomplete_details.map(|d| d.reason),
))),
Ok(super::response::ResponsesStreamEvent::Unknown(_)) => None,
Ok(super::response::ResponsesStreamEvent::Response(inner)) => {
Some(Ok(super::response::StreamItem::Event(inner)))
Expand Down
102 changes: 101 additions & 1 deletion crates/forge_repo/src/provider/openai_responses/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,20 @@ pub(super) enum ResponsesStreamEvent {
cost: f64,
},

/// Codex backend `response.completed` event. The Codex backend omits
/// required `oai::Response` fields (e.g. `output`) on this event, so it
/// cannot be parsed via the generic `oai::ResponseStreamEvent`. We
/// deserialize only `end_turn` (backend-only continue-turn signal) and
/// `usage`; output items arrive via earlier streaming events.
#[serde(rename = "response.completed")]
ResponseCompleted { response: ResponseCompletedPayload },

/// Codex backend `response.incomplete` event. Mapped to a hard error so
/// the orchestrator stops the turn instead of looping on a truncated
/// assistant message.
#[serde(rename = "response.incomplete")]
ResponseIncomplete { response: ResponseIncompletePayload },

/// Any standard OpenAI Responses API streaming event.
#[serde(untagged)]
Response(Box<oai::ResponseStreamEvent>),
Expand Down Expand Up @@ -82,10 +96,31 @@ where
pub(super) enum StreamItem {
/// A standard OpenAI Responses API streaming event.
Event(Box<oai::ResponseStreamEvent>),
/// A pre-resolved message (e.g. cost from a proxy ping event, or a
/// Codex `response.completed` event already converted to its terminal
/// `ChatCompletionMessage`).
Message(Box<ChatCompletionMessage>),
}

/// Payload of the Codex `response.completed` event. The Codex backend omits
/// required `oai::Response` fields (e.g. `output`), so we deserialize only
/// the two fields declared here: `end_turn` and `usage`.
#[derive(Debug, Deserialize)]
pub(super) struct ResponseCompletedPayload {
/// Backend-only continue-turn signal; `None` when the field is absent.
#[serde(default)]
pub end_turn: Option<bool>,
/// Token usage reported on the completed response; `None` when the field
/// is absent.
#[serde(default)]
pub usage: Option<oai::ResponseUsage>,
}

/// Payload of the Codex `response.incomplete` event. Carries the
/// `incomplete_details.reason` used to produce a useful error message.
#[derive(Debug, Deserialize)]
pub(super) struct ResponseIncompletePayload {
/// Details explaining why the response is incomplete; `None` when the
/// field is absent.
#[serde(default)]
pub incomplete_details: Option<oai::IncompleteDetails>,
}

/// Converts OpenAI Responses API usage into the domain Usage type.
/// Usage is sent once in the `response.completed` event (not split across
/// events).
Expand Down Expand Up @@ -269,6 +304,32 @@ fn retain_encrypted_reasoning_details(
}
}

/// Builds the terminal `ChatCompletionMessage` for a `response.completed`
/// event.
///
/// The message starts from `ChatCompletionMessage::default()` and receives
/// the payload's usage (converted to the domain type) when one is present.
/// `finish_reason` is set to `FinishReason::Stop` unless the payload carries
/// an explicit `end_turn: false`, in which case it is left untouched.
pub(super) fn into_response_completed_message(
    payload: ResponseCompletedPayload,
) -> ChatCompletionMessage {
    let end_turn = payload.end_turn;

    let message = match payload.usage {
        Some(usage) => ChatCompletionMessage::default().usage(usage.into_domain()),
        None => ChatCompletionMessage::default(),
    };

    if matches!(end_turn, Some(false)) {
        // Server explicitly asks to continue the turn; leave finish_reason
        // unset so the orchestrator loop does not terminate.
        return message;
    }

    message.finish_reason_opt(Some(FinishReason::Stop))
}

/// Converts a `response.incomplete` event into a hard error so the
/// orchestrator stops the turn instead of looping on a truncated assistant
/// message.
///
/// `reason` is embedded in the error text; when it is `None` the literal
/// `unknown` is used in its place.
pub(super) fn into_response_incomplete_error(reason: Option<String>) -> anyhow::Error {
    let reason = reason.as_deref().unwrap_or("unknown");
    anyhow::anyhow!("Upstream response incomplete: {reason}")
}

fn into_response_failed_error(failed: oai::ResponseFailedEvent) -> anyhow::Error {
let Some(error) = failed.response.error else {
return anyhow::anyhow!("Upstream response failed: no error object returned");
Expand Down Expand Up @@ -478,6 +539,7 @@ impl IntoDomain for BoxStream<StreamItem, anyhow::Error> {
#[cfg(test)]
mod tests {
use async_openai::types::responses as oai;
use pretty_assertions::assert_eq;

// Type alias for ResponseStream in tests since it's not provided by
// response-types
Expand Down Expand Up @@ -1713,6 +1775,44 @@ mod tests {
}
}

/// A Codex `response.completed` event whose `response` object has no
/// `output` field must still deserialize into
/// `ResponsesStreamEvent::ResponseCompleted`, exposing `end_turn` and usage.
#[test]
fn test_responses_stream_event_deserializes_codex_response_completed_without_output() {
    let fixture = serde_json::json!({
        "type": "response.completed",
        "response": {
            "id": "resp_1",
            "created_at": 1773422509,
            "model": "gpt-5.3-codex-spark",
            "object": "response",
            "status": "completed",
            "end_turn": false,
            "usage": {
                "input_tokens": 14900,
                "output_tokens": 381,
                "total_tokens": 15281,
                "input_tokens_details": { "cached_tokens": 14720 },
                "output_tokens_details": { "reasoning_tokens": 317 }
            }
        }
    });
    let expected = Usage {
        prompt_tokens: TokenCount::Actual(14900),
        completion_tokens: TokenCount::Actual(381),
        total_tokens: TokenCount::Actual(15281),
        cached_tokens: TokenCount::Actual(14720),
        cost: None,
    };

    let actual: ResponsesStreamEvent = serde_json::from_value(fixture).unwrap();

    // Pull the payload out first so the assertions read flat.
    let response = match actual {
        ResponsesStreamEvent::ResponseCompleted { response } => response,
        other => panic!("Expected ResponseCompleted, got {:?}", other),
    };

    assert_eq!(response.end_turn, Some(false));
    assert_eq!(response.usage.unwrap().into_domain(), expected);
}

/// Simulates the Spark model's streaming pattern: function call arguments
/// are sent only in the `done` event (no deltas). The stream emits:
/// 1. output_item.added (function_call with empty arguments)
Expand Down
Loading