From aee456ef18c23bd2227f9e42d04caf35b17b4512 Mon Sep 17 00:00:00 2001 From: jif-oai Date: Tue, 10 Feb 2026 14:08:48 +0000 Subject: [PATCH 1/6] Document app-server backpressure --- codex-rs/app-server/README.md | 6 + codex-rs/app-server/src/error_code.rs | 1 + codex-rs/app-server/src/lib.rs | 106 +++++++++++++++-- codex-rs/app-server/src/transport.rs | 158 +++++++++++++++++++++++--- 4 files changed, 248 insertions(+), 23 deletions(-) diff --git a/codex-rs/app-server/README.md b/codex-rs/app-server/README.md index 66d4a501ec5f..c40693c601a3 100644 --- a/codex-rs/app-server/README.md +++ b/codex-rs/app-server/README.md @@ -28,6 +28,12 @@ Supported transports: Websocket transport is currently experimental and unsupported. Do not rely on it for production workloads. +Backpressure behavior: + +- The server uses bounded queues between transport ingress, request processing, and outbound writes. +- When request ingress is saturated, new requests are rejected with a JSON-RPC error code `-32001` and message `"Server overloaded; retry later."`. +- Clients should treat this as retryable and use exponential backoff with jitter. + ## Message Schema Currently, you can dump a TypeScript version of the schema using `codex app-server generate-ts`, or a JSON Schema bundle via `codex app-server generate-json-schema`. Each output is specific to the version of Codex you used to run the command, so the generated artifacts are guaranteed to match that version. diff --git a/codex-rs/app-server/src/error_code.rs b/codex-rs/app-server/src/error_code.rs index 1ffd889d404b..ca93b2f2d335 100644 --- a/codex-rs/app-server/src/error_code.rs +++ b/codex-rs/app-server/src/error_code.rs @@ -1,2 +1,3 @@ pub(crate) const INVALID_REQUEST_ERROR_CODE: i64 = -32600; pub(crate) const INTERNAL_ERROR_CODE: i64 = -32603; +pub(crate) const OVERLOADED_ERROR_CODE: i64 = -32001; diff --git a/codex-rs/app-server/src/lib.rs b/codex-rs/app-server/src/lib.rs index ad049ad30550..0de4ddbf8e6d 100644 --- a/codex-rs/app-server/src/lib.rs +++ b/codex-rs/app-server/src/lib.rs @@ -21,6 +21,7 @@ use crate::outgoing_message::OutgoingEnvelope; use crate::outgoing_message::OutgoingMessageSender; use crate::transport::CHANNEL_CAPACITY; use crate::transport::ConnectionState; +use crate::transport::OutboundConnectionState; use crate::transport::TransportEvent; use crate::transport::has_initialized_connections; use crate::transport::route_outgoing_envelope; @@ -37,6 +38,7 @@ use codex_core::config_loader::ConfigLoadError; use codex_core::config_loader::TextRange as CoreTextRange; use codex_feedback::CodexFeedback; use tokio::sync::mpsc; +use tokio::sync::oneshot; use tokio::task::JoinHandle; use toml::Value as TomlValue; use tracing::error; @@ -61,6 +63,29 @@ mod transport; pub use crate::transport::AppServerTransport; +/// Control-plane messages from the processor/transport side to the outbound router task. +/// +/// `run_main_with_transport` now uses two loops/tasks: +/// - processor loop: handles incoming JSON-RPC and request dispatch +/// - outbound loop: performs potentially slow writes to per-connection writers +/// +/// `OutboundControlEvent` keeps those loops coordinated without sharing mutable +/// connection state directly. In particular, the outbound loop needs to know: +/// - when a connection opens/closes so it can route messages correctly +/// - when a connection becomes initialized so broadcast semantics remain unchanged +enum OutboundControlEvent { + /// Register a new writer for an opened connection. + Opened { + connection_id: ConnectionId, + writer: mpsc::Sender, + ready: oneshot::Sender<()>, + }, + /// Remove state for a closed/disconnected connection. + Closed { connection_id: ConnectionId }, + /// Mark the connection as initialized, enabling broadcast delivery. + Initialized { connection_id: ConnectionId }, +} + fn config_warning_from_error( summary: impl Into, err: &std::io::Error, @@ -197,6 +222,8 @@ pub async fn run_main_with_transport( let (transport_event_tx, mut transport_event_rx) = mpsc::channel::(CHANNEL_CAPACITY); let (outgoing_tx, mut outgoing_rx) = mpsc::channel::(CHANNEL_CAPACITY); + let (outbound_control_tx, mut outbound_control_rx) = + mpsc::channel::(CHANNEL_CAPACITY); let mut stdio_handles = Vec::>::new(); let mut websocket_accept_handle = None; @@ -336,8 +363,47 @@ pub async fn run_main_with_transport( } } + let outbound_handle = tokio::spawn(async move { + let mut outbound_connections = HashMap::::new(); + loop { + tokio::select! { + envelope = outgoing_rx.recv() => { + let Some(envelope) = envelope else { + break; + }; + route_outgoing_envelope(&mut outbound_connections, envelope).await; + } + event = outbound_control_rx.recv() => { + let Some(event) = event else { + break; + }; + match event { + OutboundControlEvent::Opened { + connection_id, + writer, + ready, + } => { + outbound_connections.insert(connection_id, OutboundConnectionState::new(writer)); + let _ = ready.send(()); + } + OutboundControlEvent::Closed { connection_id } => { + outbound_connections.remove(&connection_id); + } + OutboundControlEvent::Initialized { connection_id } => { + if let Some(connection_state) = outbound_connections.get_mut(&connection_id) { + connection_state.initialized = true; + } + } + } + } + } + } + info!("outbound router task exited (channel closed)"); + }); + let processor_handle = tokio::spawn({ let outgoing_message_sender = Arc::new(OutgoingMessageSender::new(outgoing_tx)); + let outbound_control_tx = outbound_control_tx; let cli_overrides: Vec<(String, TomlValue)> = cli_kv_overrides.clone(); let loader_overrides = loader_overrides_for_config_api; let mut processor = MessageProcessor::new(MessageProcessorArgs { @@ -362,9 +428,31 @@ pub async fn run_main_with_transport( }; match event { TransportEvent::ConnectionOpened { connection_id, writer } => { - connections.insert(connection_id, ConnectionState::new(writer)); + let (ready_tx, ready_rx) = oneshot::channel(); + if outbound_control_tx + .send(OutboundControlEvent::Opened { + connection_id, + writer: writer.clone(), + ready: ready_tx, + }) + .await + .is_err() + { + break; + } + if ready_rx.await.is_err() { + break; + } + connections.insert(connection_id, ConnectionState::new()); } TransportEvent::ConnectionClosed { connection_id } => { + if outbound_control_tx + .send(OutboundControlEvent::Closed { connection_id }) + .await + .is_err() + { + break; + } connections.remove(&connection_id); if shutdown_when_no_connections && connections.is_empty() { break; @@ -377,6 +465,7 @@ pub async fn run_main_with_transport( warn!("dropping request from unknown connection: {:?}", connection_id); continue; }; + let was_initialized = connection_state.session.initialized; processor .process_request( connection_id, @@ -384,6 +473,14 @@ pub async fn run_main_with_transport( &mut connection_state.session, ) .await; + if !was_initialized && connection_state.session.initialized { + let send_result = outbound_control_tx + .send(OutboundControlEvent::Initialized { connection_id }) + .await; + if send_result.is_err() { + break; + } + } } JSONRPCMessage::Response(response) => { processor.process_response(response).await; @@ -398,12 +495,6 @@ pub async fn run_main_with_transport( } } } - envelope = outgoing_rx.recv() => { - let Some(envelope) = envelope else { - break; - }; - route_outgoing_envelope(&mut connections, envelope).await; - } created = thread_created_rx.recv(), if listen_for_threads => { match created { Ok(thread_id) => { @@ -433,6 +524,7 @@ pub async fn run_main_with_transport( drop(transport_event_tx); let _ = processor_handle.await; + let _ = outbound_handle.await; if let Some(handle) = websocket_accept_handle { handle.abort(); diff --git a/codex-rs/app-server/src/transport.rs b/codex-rs/app-server/src/transport.rs index 39fd13212cf7..80a7c3424ec9 100644 --- a/codex-rs/app-server/src/transport.rs +++ b/codex-rs/app-server/src/transport.rs @@ -1,8 +1,12 @@ +use crate::error_code::OVERLOADED_ERROR_CODE; use crate::message_processor::ConnectionSessionState; use crate::outgoing_message::ConnectionId; use crate::outgoing_message::OutgoingEnvelope; +use crate::outgoing_message::OutgoingError; use crate::outgoing_message::OutgoingMessage; +use codex_app_server_protocol::JSONRPCErrorError; use codex_app_server_protocol::JSONRPCMessage; +use codex_app_server_protocol::JSONRPCRequest; use futures::SinkExt; use futures::StreamExt; use owo_colors::OwoColorize; @@ -140,15 +144,27 @@ pub(crate) enum TransportEvent { } pub(crate) struct ConnectionState { - pub(crate) writer: mpsc::Sender, pub(crate) session: ConnectionSessionState, } impl ConnectionState { + pub(crate) fn new() -> Self { + Self { + session: ConnectionSessionState::default(), + } + } +} + +pub(crate) struct OutboundConnectionState { + pub(crate) writer: mpsc::Sender, + pub(crate) initialized: bool, +} + +impl OutboundConnectionState { pub(crate) fn new(writer: mpsc::Sender) -> Self { Self { writer, - session: ConnectionSessionState::default(), + initialized: false, } } } @@ -159,6 +175,7 @@ pub(crate) async fn start_stdio_connection( ) -> IoResult<()> { let connection_id = ConnectionId(0); let (writer_tx, mut writer_rx) = mpsc::channel::(CHANNEL_CAPACITY); + let writer_tx_for_reader = writer_tx.clone(); transport_event_tx .send(TransportEvent::ConnectionOpened { connection_id, @@ -178,11 +195,10 @@ pub(crate) async fn start_stdio_connection( Ok(Some(line)) => { if !forward_incoming_message( &transport_event_tx_for_reader, + &writer_tx_for_reader, connection_id, &line, - ) - .await - { + ) { break; } } @@ -267,6 +283,7 @@ async fn run_websocket_connection( }; let (writer_tx, mut writer_rx) = mpsc::channel::(CHANNEL_CAPACITY); + let writer_tx_for_reader = writer_tx.clone(); if transport_event_tx .send(TransportEvent::ConnectionOpened { connection_id, @@ -295,7 +312,12 @@ async fn run_websocket_connection( incoming_message = websocket_reader.next() => { match incoming_message { Some(Ok(WebSocketMessage::Text(text))) => { - if !forward_incoming_message(&transport_event_tx, connection_id, &text).await { + if !forward_incoming_message( + &transport_event_tx, + &writer_tx_for_reader, + connection_id, + &text, + ) { break; } } @@ -324,19 +346,14 @@ async fn run_websocket_connection( .await; } -async fn forward_incoming_message( +fn forward_incoming_message( transport_event_tx: &mpsc::Sender, + writer: &mpsc::Sender, connection_id: ConnectionId, payload: &str, ) -> bool { match serde_json::from_str::(payload) { - Ok(message) => transport_event_tx - .send(TransportEvent::IncomingMessage { - connection_id, - message, - }) - .await - .is_ok(), + Ok(message) => enqueue_incoming_message(transport_event_tx, writer, connection_id, message), Err(err) => { error!("Failed to deserialize JSONRPCMessage: {err}"); true @@ -344,6 +361,50 @@ async fn forward_incoming_message( } } +fn enqueue_incoming_message( + transport_event_tx: &mpsc::Sender, + writer: &mpsc::Sender, + connection_id: ConnectionId, + message: JSONRPCMessage, +) -> bool { + let event = TransportEvent::IncomingMessage { + connection_id, + message, + }; + match transport_event_tx.try_send(event) { + Ok(()) => true, + Err(mpsc::error::TrySendError::Closed(_)) => false, + Err(mpsc::error::TrySendError::Full(TransportEvent::IncomingMessage { + connection_id, + message: JSONRPCMessage::Request(request), + })) => { + if writer + .try_send(overloaded_error_for_request(request)) + .is_err() + { + warn!("failed to enqueue overload response for connection: {connection_id:?}"); + } + true + } + Err(mpsc::error::TrySendError::Full(TransportEvent::IncomingMessage { .. })) => { + warn!("dropping non-request incoming message because processor queue is full"); + true + } + Err(mpsc::error::TrySendError::Full(_)) => true, + } +} + +fn overloaded_error_for_request(request: JSONRPCRequest) -> OutgoingMessage { + OutgoingMessage::Error(OutgoingError { + id: request.id, + error: JSONRPCErrorError { + code: OVERLOADED_ERROR_CODE, + message: "Server overloaded; retry later.".to_string(), + data: None, + }, + }) +} + fn serialize_outgoing_message(outgoing_message: OutgoingMessage) -> Option { let value = match serde_json::to_value(outgoing_message) { Ok(value) => value, @@ -362,7 +423,7 @@ fn serialize_outgoing_message(outgoing_message: OutgoingMessage) -> Option, + connections: &mut HashMap, envelope: OutgoingEnvelope, ) { match envelope { @@ -385,7 +446,7 @@ pub(crate) async fn route_outgoing_envelope( let target_connections: Vec = connections .iter() .filter_map(|(connection_id, connection_state)| { - if connection_state.session.initialized { + if connection_state.initialized { Some(*connection_id) } else { None @@ -416,7 +477,9 @@ pub(crate) fn has_initialized_connections( #[cfg(test)] mod tests { use super::*; + use crate::error_code::OVERLOADED_ERROR_CODE; use pretty_assertions::assert_eq; + use serde_json::json; #[test] fn app_server_transport_parses_stdio_listen_url() { @@ -456,4 +519,67 @@ mod tests { "unsupported --listen URL `http://127.0.0.1:1234`; expected `stdio://` or `ws://IP:PORT`" ); } + + #[tokio::test] + async fn enqueue_incoming_request_returns_overload_error_when_queue_is_full() { + let connection_id = ConnectionId(42); + let (transport_event_tx, mut transport_event_rx) = mpsc::channel(1); + let (writer_tx, mut writer_rx) = mpsc::channel(1); + + let first_message = + JSONRPCMessage::Notification(codex_app_server_protocol::JSONRPCNotification { + method: "initialized".to_string(), + params: None, + }); + transport_event_tx + .send(TransportEvent::IncomingMessage { + connection_id, + message: first_message.clone(), + }) + .await + .expect("queue should accept first message"); + + let request = JSONRPCMessage::Request(codex_app_server_protocol::JSONRPCRequest { + id: codex_app_server_protocol::RequestId::Integer(7), + method: "config/read".to_string(), + params: Some(json!({ "includeLayers": false })), + }); + assert!(enqueue_incoming_message( + &transport_event_tx, + &writer_tx, + connection_id, + request + )); + + let queued_event = transport_event_rx + .recv() + .await + .expect("first event should stay queued"); + match queued_event { + TransportEvent::IncomingMessage { + connection_id: queued_connection_id, + message, + } => { + assert_eq!(queued_connection_id, connection_id); + assert_eq!(message, first_message); + } + _ => panic!("expected queued incoming message"), + } + + let overload = writer_rx + .recv() + .await + .expect("request should receive overload error"); + let overload_json = serde_json::to_value(overload).expect("serialize overload error"); + assert_eq!( + overload_json, + json!({ + "id": 7, + "error": { + "code": OVERLOADED_ERROR_CODE, + "message": "Server overloaded; retry later." + } + }) + ); + } } From f742019bec0c91db03663256b0c8c3f3df099e4a Mon Sep 17 00:00:00 2001 From: jif-oai Date: Tue, 10 Feb 2026 14:15:50 +0000 Subject: [PATCH 2/6] Discuss outbound routing fixes --- codex-rs/app-server/src/lib.rs | 21 +++- codex-rs/app-server/src/message_processor.rs | 16 +-- codex-rs/app-server/src/transport.rs | 123 ++++++++++++++----- 3 files changed, 120 insertions(+), 40 deletions(-) diff --git a/codex-rs/app-server/src/lib.rs b/codex-rs/app-server/src/lib.rs index 0de4ddbf8e6d..3d78dcdadf60 100644 --- a/codex-rs/app-server/src/lib.rs +++ b/codex-rs/app-server/src/lib.rs @@ -83,7 +83,10 @@ enum OutboundControlEvent { /// Remove state for a closed/disconnected connection. Closed { connection_id: ConnectionId }, /// Mark the connection as initialized, enabling broadcast delivery. - Initialized { connection_id: ConnectionId }, + Initialized { + connection_id: ConnectionId, + ready: oneshot::Sender<()>, + }, } fn config_warning_from_error( @@ -389,10 +392,14 @@ pub async fn run_main_with_transport( OutboundControlEvent::Closed { connection_id } => { outbound_connections.remove(&connection_id); } - OutboundControlEvent::Initialized { connection_id } => { + OutboundControlEvent::Initialized { + connection_id, + ready, + } => { if let Some(connection_state) = outbound_connections.get_mut(&connection_id) { connection_state.initialized = true; } + let _ = ready.send(()); } } } @@ -474,12 +481,20 @@ pub async fn run_main_with_transport( ) .await; if !was_initialized && connection_state.session.initialized { + let (ready_tx, ready_rx) = oneshot::channel(); let send_result = outbound_control_tx - .send(OutboundControlEvent::Initialized { connection_id }) + .send(OutboundControlEvent::Initialized { + connection_id, + ready: ready_tx, + }) .await; if send_result.is_err() { break; } + if ready_rx.await.is_err() { + break; + } + processor.send_initialize_notifications().await; } } JSONRPCMessage::Response(response) => { diff --git a/codex-rs/app-server/src/message_processor.rs b/codex-rs/app-server/src/message_processor.rs index 26da44df3113..fdb981cff1db 100644 --- a/codex-rs/app-server/src/message_processor.rs +++ b/codex-rs/app-server/src/message_processor.rs @@ -286,14 +286,6 @@ impl MessageProcessor { self.outgoing.send_response(request_id, response).await; session.initialized = true; - for notification in self.config_warnings.iter().cloned() { - self.outgoing - .send_server_notification(ServerNotification::ConfigWarning( - notification, - )) - .await; - } - return; } } @@ -381,6 +373,14 @@ impl MessageProcessor { self.codex_message_processor.thread_created_receiver() } + pub(crate) async fn send_initialize_notifications(&self) { + for notification in self.config_warnings.iter().cloned() { + self.outgoing + .send_server_notification(ServerNotification::ConfigWarning(notification)) + .await; + } + } + pub(crate) async fn try_attach_thread_listener(&mut self, thread_id: ThreadId) { self.codex_message_processor .try_attach_thread_listener(thread_id) diff --git a/codex-rs/app-server/src/transport.rs b/codex-rs/app-server/src/transport.rs index 80a7c3424ec9..e4155cf4635a 100644 --- a/codex-rs/app-server/src/transport.rs +++ b/codex-rs/app-server/src/transport.rs @@ -6,7 +6,6 @@ use crate::outgoing_message::OutgoingError; use crate::outgoing_message::OutgoingMessage; use codex_app_server_protocol::JSONRPCErrorError; use codex_app_server_protocol::JSONRPCMessage; -use codex_app_server_protocol::JSONRPCRequest; use futures::SinkExt; use futures::StreamExt; use owo_colors::OwoColorize; @@ -198,7 +197,9 @@ pub(crate) async fn start_stdio_connection( &writer_tx_for_reader, connection_id, &line, - ) { + ) + .await + { break; } } @@ -317,7 +318,9 @@ async fn run_websocket_connection( &writer_tx_for_reader, connection_id, &text, - ) { + ) + .await + { break; } } @@ -346,14 +349,16 @@ async fn run_websocket_connection( .await; } -fn forward_incoming_message( +async fn forward_incoming_message( transport_event_tx: &mpsc::Sender, writer: &mpsc::Sender, connection_id: ConnectionId, payload: &str, ) -> bool { match serde_json::from_str::(payload) { - Ok(message) => enqueue_incoming_message(transport_event_tx, writer, connection_id, message), + Ok(message) => { + enqueue_incoming_message(transport_event_tx, writer, connection_id, message).await + } Err(err) => { error!("Failed to deserialize JSONRPCMessage: {err}"); true @@ -361,7 +366,7 @@ fn forward_incoming_message( } } -fn enqueue_incoming_message( +async fn enqueue_incoming_message( transport_event_tx: &mpsc::Sender, writer: &mpsc::Sender, connection_id: ConnectionId, @@ -379,32 +384,24 @@ fn enqueue_incoming_message( message: JSONRPCMessage::Request(request), })) => { if writer - .try_send(overloaded_error_for_request(request)) + .try_send(OutgoingMessage::Error(OutgoingError { + id: request.id, + error: JSONRPCErrorError { + code: OVERLOADED_ERROR_CODE, + message: "Server overloaded; retry later.".to_string(), + data: None, + }, + })) .is_err() { warn!("failed to enqueue overload response for connection: {connection_id:?}"); } true } - Err(mpsc::error::TrySendError::Full(TransportEvent::IncomingMessage { .. })) => { - warn!("dropping non-request incoming message because processor queue is full"); - true - } - Err(mpsc::error::TrySendError::Full(_)) => true, + Err(mpsc::error::TrySendError::Full(event)) => transport_event_tx.send(event).await.is_ok(), } } -fn overloaded_error_for_request(request: JSONRPCRequest) -> OutgoingMessage { - OutgoingMessage::Error(OutgoingError { - id: request.id, - error: JSONRPCErrorError { - code: OVERLOADED_ERROR_CODE, - message: "Server overloaded; retry later.".to_string(), - data: None, - }, - }) -} - fn serialize_outgoing_message(outgoing_message: OutgoingMessage) -> Option { let value = match serde_json::to_value(outgoing_message) { Ok(value) => value, @@ -544,12 +541,9 @@ mod tests { method: "config/read".to_string(), params: Some(json!({ "includeLayers": false })), }); - assert!(enqueue_incoming_message( - &transport_event_tx, - &writer_tx, - connection_id, - request - )); + assert!( + enqueue_incoming_message(&transport_event_tx, &writer_tx, connection_id, request).await + ); let queued_event = transport_event_rx .recv() @@ -582,4 +576,75 @@ mod tests { }) ); } + + #[tokio::test] + async fn enqueue_incoming_response_waits_instead_of_dropping_when_queue_is_full() { + let connection_id = ConnectionId(42); + let (transport_event_tx, mut transport_event_rx) = mpsc::channel(1); + let (writer_tx, _writer_rx) = mpsc::channel(1); + + let first_message = + JSONRPCMessage::Notification(codex_app_server_protocol::JSONRPCNotification { + method: "initialized".to_string(), + params: None, + }); + transport_event_tx + .send(TransportEvent::IncomingMessage { + connection_id, + message: first_message.clone(), + }) + .await + .expect("queue should accept first message"); + + let response = JSONRPCMessage::Response(codex_app_server_protocol::JSONRPCResponse { + id: codex_app_server_protocol::RequestId::Integer(7), + result: json!({"ok": true}), + }); + let transport_event_tx_for_enqueue = transport_event_tx.clone(); + let writer_tx_for_enqueue = writer_tx.clone(); + let enqueue_handle = tokio::spawn(async move { + enqueue_incoming_message( + &transport_event_tx_for_enqueue, + &writer_tx_for_enqueue, + connection_id, + response, + ) + .await + }); + + let queued_event = transport_event_rx + .recv() + .await + .expect("first event should be dequeued"); + match queued_event { + TransportEvent::IncomingMessage { + connection_id: queued_connection_id, + message, + } => { + assert_eq!(queued_connection_id, connection_id); + assert_eq!(message, first_message); + } + _ => panic!("expected queued incoming message"), + } + + let enqueue_result = enqueue_handle.await.expect("enqueue task should not panic"); + assert!(enqueue_result); + + let forwarded_event = transport_event_rx + .recv() + .await + .expect("response should be forwarded instead of dropped"); + match forwarded_event { + TransportEvent::IncomingMessage { + connection_id: queued_connection_id, + message: + JSONRPCMessage::Response(codex_app_server_protocol::JSONRPCResponse { id, result }), + } => { + assert_eq!(queued_connection_id, connection_id); + assert_eq!(id, codex_app_server_protocol::RequestId::Integer(7)); + assert_eq!(result, json!({"ok": true})); + } + _ => panic!("expected forwarded response message"), + } + } } From bd89f60348adda35e19685550174977749f15a8e Mon Sep 17 00:00:00 2001 From: jif-oai Date: Tue, 10 Feb 2026 14:24:07 +0000 Subject: [PATCH 3/6] Plan outbound router ordering fixes --- codex-rs/app-server/src/lib.rs | 18 +++++++++++++++++- codex-rs/app-server/src/transport.rs | 8 ++++++-- 2 files changed, 23 insertions(+), 3 deletions(-) diff --git a/codex-rs/app-server/src/lib.rs b/codex-rs/app-server/src/lib.rs index 3d78dcdadf60..fed9314f2597 100644 --- a/codex-rs/app-server/src/lib.rs +++ b/codex-rs/app-server/src/lib.rs @@ -366,6 +366,7 @@ pub async fn run_main_with_transport( } } + let transport_event_tx_for_outbound = transport_event_tx.clone(); let outbound_handle = tokio::spawn(async move { let mut outbound_connections = HashMap::::new(); loop { @@ -374,7 +375,22 @@ pub async fn run_main_with_transport( let Some(envelope) = envelope else { break; }; - route_outgoing_envelope(&mut outbound_connections, envelope).await; + let disconnected_connections = + route_outgoing_envelope(&mut outbound_connections, envelope).await; + let mut should_exit = false; + for connection_id in disconnected_connections { + if transport_event_tx_for_outbound + .send(TransportEvent::ConnectionClosed { connection_id }) + .await + .is_err() + { + should_exit = true; + break; + } + } + if should_exit { + break; + } } event = outbound_control_rx.recv() => { let Some(event) = event else { diff --git a/codex-rs/app-server/src/transport.rs b/codex-rs/app-server/src/transport.rs index e4155cf4635a..ae25720474c6 100644 --- a/codex-rs/app-server/src/transport.rs +++ b/codex-rs/app-server/src/transport.rs @@ -422,7 +422,8 @@ fn serialize_outgoing_message(outgoing_message: OutgoingMessage) -> Option, envelope: OutgoingEnvelope, -) { +) -> Vec { + let mut disconnected = Vec::new(); match envelope { OutgoingEnvelope::ToConnection { connection_id, @@ -433,10 +434,11 @@ pub(crate) async fn route_outgoing_envelope( "dropping message for disconnected connection: {:?}", connection_id ); - return; + return disconnected; }; if connection_state.writer.send(message).await.is_err() { connections.remove(&connection_id); + disconnected.push(connection_id); } } OutgoingEnvelope::Broadcast { message } => { @@ -457,10 +459,12 @@ pub(crate) async fn route_outgoing_envelope( }; if connection_state.writer.send(message.clone()).await.is_err() { connections.remove(&connection_id); + disconnected.push(connection_id); } } } } + disconnected } pub(crate) fn has_initialized_connections( From 9a64991e6908b324376480718e8e037d2c27cefe Mon Sep 17 00:00:00 2001 From: jif-oai Date: Tue, 10 Feb 2026 14:38:45 +0000 Subject: [PATCH 4/6] Plan outbound routing fixes --- codex-rs/app-server/src/lib.rs | 45 ++++++-------------- codex-rs/app-server/src/message_processor.rs | 4 ++ codex-rs/app-server/src/transport.rs | 45 ++++++++++++-------- 3 files changed, 44 insertions(+), 50 deletions(-) diff --git a/codex-rs/app-server/src/lib.rs b/codex-rs/app-server/src/lib.rs index fed9314f2597..b20aaa53128d 100644 --- a/codex-rs/app-server/src/lib.rs +++ b/codex-rs/app-server/src/lib.rs @@ -13,6 +13,7 @@ use std::io::ErrorKind; use std::io::Result as IoResult; use std::path::PathBuf; use std::sync::Arc; +use std::sync::atomic::AtomicBool; use crate::message_processor::MessageProcessor; use crate::message_processor::MessageProcessorArgs; @@ -70,23 +71,18 @@ pub use crate::transport::AppServerTransport; /// - outbound loop: performs potentially slow writes to per-connection writers /// /// `OutboundControlEvent` keeps those loops coordinated without sharing mutable -/// connection state directly. In particular, the outbound loop needs to know: -/// - when a connection opens/closes so it can route messages correctly -/// - when a connection becomes initialized so broadcast semantics remain unchanged +/// connection state directly. In particular, the outbound loop needs to know +/// when a connection opens/closes so it can route messages correctly. enum OutboundControlEvent { /// Register a new writer for an opened connection. Opened { connection_id: ConnectionId, writer: mpsc::Sender, + initialized: Arc, ready: oneshot::Sender<()>, }, /// Remove state for a closed/disconnected connection. Closed { connection_id: ConnectionId }, - /// Mark the connection as initialized, enabling broadcast delivery. - Initialized { - connection_id: ConnectionId, - ready: oneshot::Sender<()>, - }, } fn config_warning_from_error( @@ -400,23 +396,18 @@ pub async fn run_main_with_transport( OutboundControlEvent::Opened { connection_id, writer, + initialized, ready, } => { - outbound_connections.insert(connection_id, OutboundConnectionState::new(writer)); + outbound_connections.insert( + connection_id, + OutboundConnectionState::new(writer, initialized), + ); let _ = ready.send(()); } OutboundControlEvent::Closed { connection_id } => { outbound_connections.remove(&connection_id); } - OutboundControlEvent::Initialized { - connection_id, - ready, - } => { - if let Some(connection_state) = outbound_connections.get_mut(&connection_id) { - connection_state.initialized = true; - } - let _ = ready.send(()); - } } } } @@ -451,11 +442,13 @@ pub async fn run_main_with_transport( }; match event { TransportEvent::ConnectionOpened { connection_id, writer } => { + let outbound_initialized = Arc::new(AtomicBool::new(false)); let (ready_tx, ready_rx) = oneshot::channel(); if outbound_control_tx .send(OutboundControlEvent::Opened { connection_id, writer: writer.clone(), + initialized: Arc::clone(&outbound_initialized), ready: ready_tx, }) .await @@ -466,7 +459,7 @@ pub async fn run_main_with_transport( if ready_rx.await.is_err() { break; } - connections.insert(connection_id, ConnectionState::new()); + connections.insert(connection_id, ConnectionState::new(outbound_initialized)); } TransportEvent::ConnectionClosed { connection_id } => { if outbound_control_tx @@ -494,22 +487,10 @@ pub async fn run_main_with_transport( connection_id, request, &mut connection_state.session, + &connection_state.outbound_initialized, ) .await; if !was_initialized && connection_state.session.initialized { - let (ready_tx, ready_rx) = oneshot::channel(); - let send_result = outbound_control_tx - .send(OutboundControlEvent::Initialized { - connection_id, - ready: ready_tx, - }) - .await; - if send_result.is_err() { - break; - } - if ready_rx.await.is_err() { - break; - } processor.send_initialize_notifications().await; } } diff --git a/codex-rs/app-server/src/message_processor.rs b/codex-rs/app-server/src/message_processor.rs index fdb981cff1db..b962d52902ec 100644 --- a/codex-rs/app-server/src/message_processor.rs +++ b/codex-rs/app-server/src/message_processor.rs @@ -1,6 +1,8 @@ use std::path::PathBuf; use std::sync::Arc; use std::sync::RwLock; +use std::sync::atomic::AtomicBool; +use std::sync::atomic::Ordering; use crate::codex_message_processor::CodexMessageProcessor; use crate::codex_message_processor::CodexMessageProcessorArgs; @@ -191,6 +193,7 @@ impl MessageProcessor { connection_id: ConnectionId, request: JSONRPCRequest, session: &mut ConnectionSessionState, + outbound_initialized: &AtomicBool, ) { let request_id = ConnectionRequestId { connection_id, @@ -286,6 +289,7 @@ impl MessageProcessor { self.outgoing.send_response(request_id, response).await; session.initialized = true; + outbound_initialized.store(true, Ordering::Release); return; } } diff --git a/codex-rs/app-server/src/transport.rs b/codex-rs/app-server/src/transport.rs index ae25720474c6..702c28ed695d 100644 --- a/codex-rs/app-server/src/transport.rs +++ b/codex-rs/app-server/src/transport.rs @@ -17,6 +17,7 @@ use std::io::Result as IoResult; use std::net::SocketAddr; use std::str::FromStr; use std::sync::Arc; +use std::sync::atomic::AtomicBool; use std::sync::atomic::AtomicU64; use std::sync::atomic::Ordering; use tokio::io::AsyncBufReadExt; @@ -143,27 +144,29 @@ pub(crate) enum TransportEvent { } pub(crate) struct ConnectionState { + pub(crate) outbound_initialized: Arc, pub(crate) session: ConnectionSessionState, } impl ConnectionState { - pub(crate) fn new() -> Self { + pub(crate) fn new(outbound_initialized: Arc) -> Self { Self { + outbound_initialized, session: ConnectionSessionState::default(), } } } pub(crate) struct OutboundConnectionState { + pub(crate) initialized: Arc, pub(crate) writer: mpsc::Sender, - pub(crate) initialized: bool, } impl OutboundConnectionState { - pub(crate) fn new(writer: mpsc::Sender) -> Self { + pub(crate) fn new(writer: mpsc::Sender, initialized: Arc) -> Self { Self { + initialized, writer, - initialized: false, } } } @@ -383,20 +386,26 @@ async fn enqueue_incoming_message( connection_id, message: JSONRPCMessage::Request(request), })) => { - if writer - .try_send(OutgoingMessage::Error(OutgoingError { - id: request.id, - error: JSONRPCErrorError { - code: OVERLOADED_ERROR_CODE, - message: "Server overloaded; retry later.".to_string(), - data: None, - }, - })) - .is_err() - { - warn!("failed to enqueue overload response for connection: {connection_id:?}"); + let overload_error = OutgoingMessage::Error(OutgoingError { + id: request.id, + error: JSONRPCErrorError { + code: OVERLOADED_ERROR_CODE, + message: "Server overloaded; retry later.".to_string(), + data: None, + }, + }); + match writer.try_send(overload_error) { + Ok(()) => true, + Err(mpsc::error::TrySendError::Closed(_)) => false, + Err(mpsc::error::TrySendError::Full(overload_error)) => { + if writer.send(overload_error).await.is_err() { + warn!("failed to send overload response for connection: {connection_id:?}"); + false + } else { + true + } + } } - true } Err(mpsc::error::TrySendError::Full(event)) => transport_event_tx.send(event).await.is_ok(), } @@ -445,7 +454,7 @@ pub(crate) async fn route_outgoing_envelope( let target_connections: Vec = connections .iter() .filter_map(|(connection_id, connection_state)| { - if connection_state.initialized { + if connection_state.initialized.load(Ordering::Acquire) { Some(*connection_id) } else { None From 66f9f631a219e303489c1a455fbb1d4a0d49c58e Mon Sep 17 00:00:00 2001 From: jif-oai Date: Tue, 10 Feb 2026 14:46:51 +0000 Subject: [PATCH 5/6] Fix app-server outbound connection --- codex-rs/app-server/src/lib.rs | 52 ++++++++++------------ codex-rs/app-server/src/transport.rs | 64 +++++++++++++++++++++++++--- 2 files changed, 79 insertions(+), 37 deletions(-) diff --git a/codex-rs/app-server/src/lib.rs b/codex-rs/app-server/src/lib.rs index b20aaa53128d..c09595428c4d 100644 --- a/codex-rs/app-server/src/lib.rs +++ b/codex-rs/app-server/src/lib.rs @@ -39,7 +39,6 @@ use codex_core::config_loader::ConfigLoadError; use codex_core::config_loader::TextRange as CoreTextRange; use codex_feedback::CodexFeedback; use tokio::sync::mpsc; -use tokio::sync::oneshot; use tokio::task::JoinHandle; use toml::Value as TomlValue; use tracing::error; @@ -79,7 +78,6 @@ enum OutboundControlEvent { connection_id: ConnectionId, writer: mpsc::Sender, initialized: Arc, - ready: oneshot::Sender<()>, }, /// Remove state for a closed/disconnected connection. Closed { connection_id: ConnectionId }, @@ -367,6 +365,27 @@ pub async fn run_main_with_transport( let mut outbound_connections = HashMap::::new(); loop { tokio::select! { + biased; + event = outbound_control_rx.recv() => { + let Some(event) = event else { + break; + }; + match event { + OutboundControlEvent::Opened { + connection_id, + writer, + initialized, + } => { + outbound_connections.insert( + connection_id, + OutboundConnectionState::new(writer, initialized), + ); + } + OutboundControlEvent::Closed { connection_id } => { + outbound_connections.remove(&connection_id); + } + } + } envelope = outgoing_rx.recv() => { let Some(envelope) = envelope else { break; @@ -388,28 +407,6 @@ pub async fn run_main_with_transport( break; } } - event = outbound_control_rx.recv() => { - let Some(event) = event else { - break; - }; - match event { - OutboundControlEvent::Opened { - connection_id, - writer, - initialized, - ready, - } => { - outbound_connections.insert( - connection_id, - OutboundConnectionState::new(writer, initialized), - ); - let _ = ready.send(()); - } - OutboundControlEvent::Closed { connection_id } => { - outbound_connections.remove(&connection_id); - } - } - } } } info!("outbound router task exited (channel closed)"); @@ -443,22 +440,17 @@ pub async fn run_main_with_transport( match event { TransportEvent::ConnectionOpened { connection_id, writer } => { let outbound_initialized = Arc::new(AtomicBool::new(false)); - let (ready_tx, ready_rx) = oneshot::channel(); if outbound_control_tx .send(OutboundControlEvent::Opened { connection_id, - writer: writer.clone(), + writer, initialized: Arc::clone(&outbound_initialized), - ready: ready_tx, }) .await .is_err() { break; } - if ready_rx.await.is_err() { - break; - } connections.insert(connection_id, ConnectionState::new(outbound_initialized)); } TransportEvent::ConnectionClosed { connection_id } => { diff --git a/codex-rs/app-server/src/transport.rs b/codex-rs/app-server/src/transport.rs index 702c28ed695d..d70eb3ffd8d8 100644 --- a/codex-rs/app-server/src/transport.rs +++ b/codex-rs/app-server/src/transport.rs @@ -397,13 +397,12 @@ async fn enqueue_incoming_message( match writer.try_send(overload_error) { Ok(()) => true, Err(mpsc::error::TrySendError::Closed(_)) => false, - Err(mpsc::error::TrySendError::Full(overload_error)) => { - if writer.send(overload_error).await.is_err() { - warn!("failed to send overload response for connection: {connection_id:?}"); - false - } else { - true - } + Err(mpsc::error::TrySendError::Full(_overload_error)) => { + warn!( + "dropping overload response for connection {:?}: outbound queue is full", + connection_id + ); + true } } } @@ -660,4 +659,55 @@ mod tests { _ => panic!("expected forwarded response message"), } } + + #[tokio::test] + async fn enqueue_incoming_request_does_not_block_when_writer_queue_is_full() { + let connection_id = ConnectionId(42); + let (transport_event_tx, _transport_event_rx) = mpsc::channel(1); + let (writer_tx, mut writer_rx) = mpsc::channel(1); + + transport_event_tx + .send(TransportEvent::IncomingMessage { + connection_id, + message: JSONRPCMessage::Notification( + codex_app_server_protocol::JSONRPCNotification { + method: "initialized".to_string(), + params: None, + }, + ), + }) + .await + .expect("transport queue should accept first message"); + + writer_tx + .send(OutgoingMessage::Notification( + crate::outgoing_message::OutgoingNotification { + method: "queued".to_string(), + params: None, + }, + )) + .await + .expect("writer queue should accept first message"); + + let request = JSONRPCMessage::Request(codex_app_server_protocol::JSONRPCRequest { + id: codex_app_server_protocol::RequestId::Integer(7), + method: "config/read".to_string(), + params: Some(json!({ "includeLayers": false })), + }); + + let enqueue_result = tokio::time::timeout( + std::time::Duration::from_millis(100), + enqueue_incoming_message(&transport_event_tx, &writer_tx, connection_id, request), + ) + .await + .expect("enqueue should not block while writer queue is full"); + assert!(enqueue_result); + + let queued_outgoing = writer_rx + .recv() + .await + .expect("writer queue should still contain original message"); + let queued_json = serde_json::to_value(queued_outgoing).expect("serialize queued message"); + assert_eq!(queued_json, json!({ "method": "queued" })); + } } From 5cba950f2514059b869b3504a8e8d6bb6aa17ee3 Mon Sep 17 00:00:00 2001 From: jif-oai Date: Tue, 10 Feb 2026 15:00:35 +0000 Subject: [PATCH 6/6] Review app-server request loop --- codex-rs/app-server/src/lib.rs | 27 ++++++++++++++++----------- 1 file changed, 16 insertions(+), 11 deletions(-) diff --git a/codex-rs/app-server/src/lib.rs b/codex-rs/app-server/src/lib.rs index c09595428c4d..0b23fed74ed7 100644 --- a/codex-rs/app-server/src/lib.rs +++ b/codex-rs/app-server/src/lib.rs @@ -9,6 +9,7 @@ use codex_core::config_loader::CloudRequirementsLoader; use codex_core::config_loader::ConfigLayerStackOrdering; use codex_core::config_loader::LoaderOverrides; use std::collections::HashMap; +use std::collections::VecDeque; use std::io::ErrorKind; use std::io::Result as IoResult; use std::path::PathBuf; @@ -363,6 +364,7 @@ pub async fn run_main_with_transport( let transport_event_tx_for_outbound = transport_event_tx.clone(); let outbound_handle = tokio::spawn(async move { let mut outbound_connections = HashMap::::new(); + let mut pending_closed_connections = VecDeque::::new(); loop { tokio::select! { biased; @@ -392,20 +394,23 @@ pub async fn run_main_with_transport( }; let disconnected_connections = route_outgoing_envelope(&mut outbound_connections, envelope).await; - let mut should_exit = false; - for connection_id in disconnected_connections { - if transport_event_tx_for_outbound - .send(TransportEvent::ConnectionClosed { connection_id }) - .await - .is_err() - { - should_exit = true; - break; - } + pending_closed_connections.extend(disconnected_connections); + } + } + + while let Some(connection_id) = pending_closed_connections.front().copied() { + match transport_event_tx_for_outbound + .try_send(TransportEvent::ConnectionClosed { connection_id }) + { + Ok(()) => { + pending_closed_connections.pop_front(); } - if should_exit { + Err(mpsc::error::TrySendError::Full(_)) => { break; } + Err(mpsc::error::TrySendError::Closed(_)) => { + return; + } } } }