From b564da44fb798e9e8e5c89ff912ab72a70a2ba9c Mon Sep 17 00:00:00 2001 From: pakrym-oai Date: Fri, 5 Jun 2026 21:05:34 -0700 Subject: [PATCH 1/5] Handle Ctrl-C for non-TTY unified exec --- codex-rs/core/src/unified_exec/process.rs | 15 ++- .../core/src/unified_exec/process_manager.rs | 40 +++--- .../core/src/unified_exec/process_tests.rs | 5 + codex-rs/core/tests/suite/unified_exec.rs | 116 ++++++++++++++++++ codex-rs/exec-server/src/client.rs | 25 ++++ codex-rs/exec-server/src/lib.rs | 3 + codex-rs/exec-server/src/local_process.rs | 54 +++++++- codex-rs/exec-server/src/process.rs | 3 + codex-rs/exec-server/src/protocol.rs | 18 +++ codex-rs/exec-server/src/remote_process.rs | 6 + codex-rs/exec-server/src/server/handler.rs | 10 ++ .../exec-server/src/server/process_handler.rs | 9 ++ codex-rs/exec-server/src/server/registry.rs | 8 ++ codex-rs/exec-server/tests/exec_process.rs | 64 ++++++++++ codex-rs/utils/pty/src/lib.rs | 2 + codex-rs/utils/pty/src/pipe.rs | 17 +++ codex-rs/utils/pty/src/process.rs | 22 ++++ codex-rs/utils/pty/src/process_group.rs | 43 ++++--- codex-rs/utils/pty/src/pty.rs | 22 ++++ 19 files changed, 442 insertions(+), 40 deletions(-) diff --git a/codex-rs/core/src/unified_exec/process.rs b/codex-rs/core/src/unified_exec/process.rs index 9671429d01d1..229e5b52f80e 100644 --- a/codex-rs/core/src/unified_exec/process.rs +++ b/codex-rs/core/src/unified_exec/process.rs @@ -14,6 +14,7 @@ use tokio_util::sync::CancellationToken; use crate::exec::is_likely_sandbox_denied; use codex_exec_server::ExecProcess; +use codex_exec_server::ProcessSignal as ExecServerProcessSignal; use codex_exec_server::ReadResponse as ExecReadResponse; use codex_exec_server::StartedExecProcess; use codex_exec_server::WriteStatus; @@ -23,6 +24,7 @@ use codex_protocol::protocol::TruncationPolicy; use codex_sandboxing::SandboxType; use codex_utils_output_truncation::formatted_truncate_text; use codex_utils_pty::ExecCommandSession; +use codex_utils_pty::ProcessSignal as PtyProcessSignal; use codex_utils_pty::SpawnedPty; use super::UNIFIED_EXEC_OUTPUT_MAX_TOKENS; @@ -31,7 +33,6 @@ use super::head_tail_buffer::HeadTailBuffer; use super::process_state::ProcessState; const EARLY_EXIT_GRACE_PERIOD: Duration = Duration::from_millis(150); - pub(crate) trait SpawnLifecycle: std::fmt::Debug + Send + Sync { /// Returns file descriptors that must stay open across the child `exec()`. /// @@ -212,6 +213,18 @@ impl UnifiedExecProcess { } } + pub(super) async fn interrupt(&self) -> Result<(), UnifiedExecError> { + match &self.process_handle { + ProcessHandle::Local(process_handle) => process_handle + .signal(PtyProcessSignal::Interrupt) + .map_err(|err| UnifiedExecError::process_failed(err.to_string())), + ProcessHandle::ExecServer(process_handle) => process_handle + .signal(ExecServerProcessSignal::Interrupt) + .await + .map_err(|err| UnifiedExecError::process_failed(err.to_string())), + } + } + pub(super) fn fail_and_terminate(&self, message: String) { let state = self.state_rx.borrow().clone(); if state.failure_message.is_none() { diff --git a/codex-rs/core/src/unified_exec/process_manager.rs b/codex-rs/core/src/unified_exec/process_manager.rs index cb455b113b6d..c9e6f3fdca4d 100644 --- a/codex-rs/core/src/unified_exec/process_manager.rs +++ b/codex-rs/core/src/unified_exec/process_manager.rs @@ -72,6 +72,7 @@ const UNIFIED_EXEC_ENV: [(&str, &str); 10] = [ const NETWORK_ACCESS_DENIED_MESSAGE: &str = "Network access was denied by the Codex sandbox network proxy."; const LATE_NETWORK_DENIAL_GRACE_PERIOD: Duration = Duration::from_millis(100); +const INTERRUPT: &str = "\u{3}"; /// Test-only override for deterministic unified exec process IDs. /// @@ -617,24 +618,29 @@ impl UnifiedExecProcessManager { if !request.input.is_empty() { if !tty { - return Err(UnifiedExecError::StdinClosed); - } - match process.write(request.input.as_bytes()).await { - Ok(()) => { - // Give the remote process a brief window to react so that we are - // more likely to capture its output in the poll below. - tokio::time::sleep(Duration::from_millis(100)).await; + if request.input == INTERRUPT { + process.interrupt().await?; + } else { + return Err(UnifiedExecError::StdinClosed); } - Err(err) => { - let status = self.refresh_process_state(process_id).await; - if matches!(status, ProcessStatus::Exited { .. }) { - status_after_write = Some(status); - } else if matches!(err, UnifiedExecError::ProcessFailed { .. }) { - process.terminate(); - self.release_process_id(process_id).await; - return Err(err); - } else { - return Err(err); + } else { + match process.write(request.input.as_bytes()).await { + Ok(()) => { + // Give the remote process a brief window to react so that we are + // more likely to capture its output in the poll below. + tokio::time::sleep(Duration::from_millis(100)).await; + } + Err(err) => { + let status = self.refresh_process_state(process_id).await; + if matches!(status, ProcessStatus::Exited { .. }) { + status_after_write = Some(status); + } else if matches!(err, UnifiedExecError::ProcessFailed { .. }) { + process.terminate(); + self.release_process_id(process_id).await; + return Err(err); + } else { + return Err(err); + } } } } diff --git a/codex-rs/core/src/unified_exec/process_tests.rs b/codex-rs/core/src/unified_exec/process_tests.rs index ee0ea1cba33d..42db18ff78c4 100644 --- a/codex-rs/core/src/unified_exec/process_tests.rs +++ b/codex-rs/core/src/unified_exec/process_tests.rs @@ -5,6 +5,7 @@ use codex_exec_server::ExecProcess; use codex_exec_server::ExecProcessEventReceiver; use codex_exec_server::ExecServerError; use codex_exec_server::ProcessId; +use codex_exec_server::ProcessSignal; use codex_exec_server::ReadResponse; use codex_exec_server::StartedExecProcess; use codex_exec_server::WriteResponse; @@ -63,6 +64,10 @@ impl ExecProcess for MockExecProcess { Ok(self.write_response.clone()) } + async fn signal(&self, _signal: ProcessSignal) -> Result<(), ExecServerError> { + Ok(()) + } + async fn terminate(&self) -> Result<(), ExecServerError> { Ok(()) } diff --git a/codex-rs/core/tests/suite/unified_exec.rs b/codex-rs/core/tests/suite/unified_exec.rs index 3eb2c6a6d3bf..39a2749ce7ac 100644 --- a/codex-rs/core/tests/suite/unified_exec.rs +++ b/codex-rs/core/tests/suite/unified_exec.rs @@ -1996,6 +1996,122 @@ async fn write_stdin_returns_exit_metadata_and_clears_session() -> Result<()> { Ok(()) } +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn write_stdin_ctrl_c_interrupts_non_tty_session() -> Result<()> { + skip_if_no_network!(Ok(())); + skip_if_sandbox!(Ok(())); + skip_if_windows!(Ok(())); + + let server = start_mock_server().await; + + let mut builder = test_codex().with_config(|config| { + config + .features + .enable(Feature::UnifiedExec) + .expect("test config should allow feature update"); + }); + let test = builder.build_with_remote_env(&server).await?; + + let start_call_id = "uexec-non-tty-interrupt-start"; + let interrupt_call_id = "uexec-non-tty-interrupt"; + + let start_args = serde_json::json!({ + "cmd": "trap 'echo INT-TRAP; exit 42' INT; echo READY; while true; do sleep 30; done", + "yield_time_ms": 250, + "tty": false, + }); + let interrupt_args = serde_json::json!({ + "chars": "\u{3}", + "session_id": 1000, + "yield_time_ms": 1000, + }); + + let responses = vec![ + sse(vec![ + ev_response_created("resp-1"), + ev_function_call( + start_call_id, + "exec_command", + &serde_json::to_string(&start_args)?, + ), + ev_completed("resp-1"), + ]), + sse(vec![ + ev_response_created("resp-2"), + ev_function_call( + interrupt_call_id, + "write_stdin", + &serde_json::to_string(&interrupt_args)?, + ), + ev_completed("resp-2"), + ]), + sse(vec![ + ev_assistant_message("msg-1", "done"), + ev_completed("resp-3"), + ]), + ]; + let request_log = mount_sse_sequence(&server, responses).await; + + submit_unified_exec_turn( + &test, + "interrupt non-tty unified exec", + PermissionProfile::Disabled, + ) + .await?; + + wait_for_event(&test.codex, |event| { + matches!(event, EventMsg::TurnComplete(_)) + }) + .await; + + let requests = request_log.requests(); + assert!(!requests.is_empty(), "expected at least one POST request"); + let bodies = requests + .into_iter() + .map(|request| request.body_json()) + .collect::>(); + + let outputs = collect_tool_outputs(&bodies)?; + + let start_output = outputs + .get(start_call_id) + .expect("missing start output for exec_command"); + assert_eq!( + start_output.process_id.as_deref(), + Some("1000"), + "exec_command should leave a running non-TTY session" + ); + assert!( + start_output.exit_code.is_none(), + "initial exec_command should not include exit_code while session is running" + ); + assert!( + start_output.output.contains("READY"), + "start output should include command readiness marker, got {:?}", + start_output.output + ); + + let interrupt_output = outputs + .get(interrupt_call_id) + .expect("missing interrupt output for write_stdin"); + assert!( + interrupt_output.process_id.is_none(), + "interrupted process should be cleared from the session map" + ); + assert_eq!( + interrupt_output.exit_code, + Some(42), + "interrupt should preserve the process-reported exit code" + ); + assert!( + interrupt_output.output.contains("INT-TRAP"), + "interrupt should drain output from the signal handler, got {:?}", + interrupt_output.output + ); + + Ok(()) +} + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn unified_exec_emits_end_event_when_session_dies_via_stdin() -> Result<()> { skip_if_no_network!(Ok(())); diff --git a/codex-rs/exec-server/src/client.rs b/codex-rs/exec-server/src/client.rs index 8cf18e7d1f43..26b3e46e8859 100644 --- a/codex-rs/exec-server/src/client.rs +++ b/codex-rs/exec-server/src/client.rs @@ -35,6 +35,7 @@ use crate::protocol::EXEC_EXITED_METHOD; use crate::protocol::EXEC_METHOD; use crate::protocol::EXEC_OUTPUT_DELTA_METHOD; use crate::protocol::EXEC_READ_METHOD; +use crate::protocol::EXEC_SIGNAL_METHOD; use crate::protocol::EXEC_TERMINATE_METHOD; use crate::protocol::EXEC_WRITE_METHOD; use crate::protocol::EnvironmentInfo; @@ -80,8 +81,11 @@ use crate::protocol::INITIALIZED_METHOD; use crate::protocol::InitializeParams; use crate::protocol::InitializeResponse; use crate::protocol::ProcessOutputChunk; +use crate::protocol::ProcessSignal; use crate::protocol::ReadParams; use crate::protocol::ReadResponse; +use crate::protocol::SignalParams; +use crate::protocol::SignalResponse; use crate::protocol::TerminateParams; use crate::protocol::TerminateResponse; use crate::protocol::WriteParams; @@ -394,6 +398,23 @@ impl ExecServerClient { .await } + pub async fn signal( + &self, + process_id: &ProcessId, + signal: ProcessSignal, + ) -> Result<(), ExecServerError> { + let _response: SignalResponse = self + .call( + EXEC_SIGNAL_METHOD, + &SignalParams { + process_id: process_id.clone(), + signal, + }, + ) + .await?; + Ok(()) + } + pub async fn terminate( &self, process_id: &ProcessId, @@ -763,6 +784,10 @@ impl Session { self.client.write(&self.process_id, chunk).await } + pub(crate) async fn signal(&self, signal: ProcessSignal) -> Result<(), ExecServerError> { + self.client.signal(&self.process_id, signal).await + } + pub(crate) async fn terminate(&self) -> Result<(), ExecServerError> { self.client.terminate(&self.process_id).await?; Ok(()) diff --git a/codex-rs/exec-server/src/lib.rs b/codex-rs/exec-server/src/lib.rs index e306ec395e7b..378d12e632e0 100644 --- a/codex-rs/exec-server/src/lib.rs +++ b/codex-rs/exec-server/src/lib.rs @@ -93,9 +93,12 @@ pub use protocol::HttpRequestResponse; pub use protocol::InitializeParams; pub use protocol::InitializeResponse; pub use protocol::ProcessOutputChunk; +pub use protocol::ProcessSignal; pub use protocol::ReadParams; pub use protocol::ReadResponse; pub use protocol::ShellInfo; +pub use protocol::SignalParams; +pub use protocol::SignalResponse; pub use protocol::TerminateParams; pub use protocol::TerminateResponse; pub use protocol::WriteParams; diff --git a/codex-rs/exec-server/src/local_process.rs b/codex-rs/exec-server/src/local_process.rs index bc69ec6105cf..691f807032ce 100644 --- a/codex-rs/exec-server/src/local_process.rs +++ b/codex-rs/exec-server/src/local_process.rs @@ -10,6 +10,7 @@ use codex_protocol::config_types::EnvironmentVariablePattern; use codex_protocol::config_types::ShellEnvironmentPolicy; use codex_protocol::shell_environment; use codex_utils_pty::ExecCommandSession; +use codex_utils_pty::ProcessSignal as PtyProcessSignal; use codex_utils_pty::TerminalSize; use tokio::sync::Mutex; use tokio::sync::Notify; @@ -33,8 +34,11 @@ use crate::protocol::ExecOutputStream; use crate::protocol::ExecParams; use crate::protocol::ExecResponse; use crate::protocol::ProcessOutputChunk; +use crate::protocol::ProcessSignal; use crate::protocol::ReadParams; use crate::protocol::ReadResponse; +use crate::protocol::SignalParams; +use crate::protocol::SignalResponse; use crate::protocol::TerminateParams; use crate::protocol::TerminateResponse; use crate::protocol::WriteParams; @@ -272,7 +276,6 @@ impl LocalProcess { &self, params: ReadParams, ) -> Result { - let _process_id = params.process_id.clone(); let after_seq = params.after_seq.unwrap_or(0); let max_bytes = params.max_bytes.unwrap_or(usize::MAX); let wait = Duration::from_millis(params.wait_ms.unwrap_or(0)); @@ -351,7 +354,6 @@ impl LocalProcess { &self, params: WriteParams, ) -> Result { - let _process_id = params.process_id.clone(); let _input_bytes = params.chunk.0.len(); let writer_tx = { let process_map = self.inner.processes.lock().await; @@ -383,11 +385,33 @@ impl LocalProcess { }) } + pub(crate) async fn signal_process( + &self, + params: SignalParams, + ) -> Result { + { + let process_map = self.inner.processes.lock().await; + match process_map.get(¶ms.process_id) { + Some(ProcessEntry::Running(process)) => { + if process.exit_code.is_some() { + return Ok(SignalResponse {}); + } + process + .session + .signal(pty_process_signal(params.signal)) + .map_err(|err| internal_error(format!("failed to signal process: {err}")))? + } + Some(ProcessEntry::Starting) | None => {} + } + } + + Ok(SignalResponse {}) + } + pub(crate) async fn terminate_process( &self, params: TerminateParams, ) -> Result { - let _process_id = params.process_id.clone(); let running = { let process_map = self.inner.processes.lock().await; match process_map.get(¶ms.process_id) { @@ -483,6 +507,10 @@ impl ExecProcess for LocalExecProcess { self.backend.write(&self.process_id, chunk).await } + async fn signal(&self, signal: ProcessSignal) -> Result<(), ExecServerError> { + self.backend.signal(&self.process_id, signal).await + } + async fn terminate(&self) -> Result<(), ExecServerError> { self.backend.terminate(&self.process_id).await } @@ -519,6 +547,20 @@ impl LocalProcess { .map_err(map_handler_error) } + async fn signal( + &self, + process_id: &ProcessId, + signal: ProcessSignal, + ) -> Result<(), ExecServerError> { + self.signal_process(SignalParams { + process_id: process_id.clone(), + signal, + }) + .await + .map_err(map_handler_error)?; + Ok(()) + } + async fn terminate(&self, process_id: &ProcessId) -> Result<(), ExecServerError> { self.terminate_process(TerminateParams { process_id: process_id.clone(), @@ -529,6 +571,12 @@ impl LocalProcess { } } +fn pty_process_signal(signal: ProcessSignal) -> PtyProcessSignal { + match signal { + ProcessSignal::Interrupt => PtyProcessSignal::Interrupt, + } +} + fn map_handler_error(error: JSONRPCErrorError) -> ExecServerError { ExecServerError::Server { code: error.code, diff --git a/codex-rs/exec-server/src/process.rs b/codex-rs/exec-server/src/process.rs index cb6c83213806..97f53aaefbc7 100644 --- a/codex-rs/exec-server/src/process.rs +++ b/codex-rs/exec-server/src/process.rs @@ -10,6 +10,7 @@ use crate::ExecServerError; use crate::ProcessId; use crate::protocol::ExecParams; use crate::protocol::ProcessOutputChunk; +use crate::protocol::ProcessSignal; use crate::protocol::ReadResponse; use crate::protocol::WriteResponse; @@ -178,6 +179,8 @@ pub trait ExecProcess: Send + Sync { async fn write(&self, chunk: Vec) -> Result; + async fn signal(&self, signal: ProcessSignal) -> Result<(), ExecServerError>; + async fn terminate(&self) -> Result<(), ExecServerError>; } diff --git a/codex-rs/exec-server/src/protocol.rs b/codex-rs/exec-server/src/protocol.rs index 6eeeeefdd73f..b85bff3577f4 100644 --- a/codex-rs/exec-server/src/protocol.rs +++ b/codex-rs/exec-server/src/protocol.rs @@ -15,6 +15,7 @@ pub const INITIALIZED_METHOD: &str = "initialized"; pub const EXEC_METHOD: &str = "process/start"; pub const EXEC_READ_METHOD: &str = "process/read"; pub const EXEC_WRITE_METHOD: &str = "process/write"; +pub const EXEC_SIGNAL_METHOD: &str = "process/signal"; pub const EXEC_TERMINATE_METHOD: &str = "process/terminate"; pub const EXEC_OUTPUT_DELTA_METHOD: &str = "process/output"; pub const EXEC_EXITED_METHOD: &str = "process/exited"; @@ -166,6 +167,23 @@ pub struct WriteResponse { pub status: WriteStatus, } +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub enum ProcessSignal { + Interrupt, +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct SignalParams { + pub process_id: ProcessId, + pub signal: ProcessSignal, +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct SignalResponse {} + #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct TerminateParams { diff --git a/codex-rs/exec-server/src/remote_process.rs b/codex-rs/exec-server/src/remote_process.rs index d8d06735cdb9..c7d16d4d4105 100644 --- a/codex-rs/exec-server/src/remote_process.rs +++ b/codex-rs/exec-server/src/remote_process.rs @@ -12,6 +12,7 @@ use crate::StartedExecProcess; use crate::client::LazyRemoteExecServerClient; use crate::client::Session; use crate::protocol::ExecParams; +use crate::protocol::ProcessSignal; use crate::protocol::ReadResponse; use crate::protocol::WriteResponse; @@ -76,6 +77,11 @@ impl ExecProcess for RemoteExecProcess { self.session.write(chunk).await } + async fn signal(&self, signal: ProcessSignal) -> Result<(), ExecServerError> { + trace!("exec process signal"); + self.session.signal(signal).await + } + async fn terminate(&self) -> Result<(), ExecServerError> { trace!("exec process terminate"); self.session.terminate().await diff --git a/codex-rs/exec-server/src/server/handler.rs b/codex-rs/exec-server/src/server/handler.rs index b8934705d2b8..395ce6773050 100644 --- a/codex-rs/exec-server/src/server/handler.rs +++ b/codex-rs/exec-server/src/server/handler.rs @@ -42,6 +42,8 @@ use crate::protocol::InitializeParams; use crate::protocol::InitializeResponse; use crate::protocol::ReadParams; use crate::protocol::ReadResponse; +use crate::protocol::SignalParams; +use crate::protocol::SignalResponse; use crate::protocol::TerminateParams; use crate::protocol::TerminateResponse; use crate::protocol::WriteParams; @@ -171,6 +173,14 @@ impl ExecServerHandler { session.process().exec_write(params).await } + pub(crate) async fn signal( + &self, + params: SignalParams, + ) -> Result { + let session = self.require_initialized_for("exec")?; + session.process().signal(params).await + } + pub(crate) async fn terminate( &self, params: TerminateParams, diff --git a/codex-rs/exec-server/src/server/process_handler.rs b/codex-rs/exec-server/src/server/process_handler.rs index 38fbace1cd6f..9fced9c166aa 100644 --- a/codex-rs/exec-server/src/server/process_handler.rs +++ b/codex-rs/exec-server/src/server/process_handler.rs @@ -5,6 +5,8 @@ use crate::protocol::ExecParams; use crate::protocol::ExecResponse; use crate::protocol::ReadParams; use crate::protocol::ReadResponse; +use crate::protocol::SignalParams; +use crate::protocol::SignalResponse; use crate::protocol::TerminateParams; use crate::protocol::TerminateResponse; use crate::protocol::WriteParams; @@ -49,6 +51,13 @@ impl ProcessHandler { self.process.exec_write(params).await } + pub(crate) async fn signal( + &self, + params: SignalParams, + ) -> Result { + self.process.signal_process(params).await + } + pub(crate) async fn terminate( &self, params: TerminateParams, diff --git a/codex-rs/exec-server/src/server/registry.rs b/codex-rs/exec-server/src/server/registry.rs index 26d2876c6383..1f652c1c4c3d 100644 --- a/codex-rs/exec-server/src/server/registry.rs +++ b/codex-rs/exec-server/src/server/registry.rs @@ -3,6 +3,7 @@ use std::sync::Arc; use crate::protocol::ENVIRONMENT_INFO_METHOD; use crate::protocol::EXEC_METHOD; use crate::protocol::EXEC_READ_METHOD; +use crate::protocol::EXEC_SIGNAL_METHOD; use crate::protocol::EXEC_TERMINATE_METHOD; use crate::protocol::EXEC_WRITE_METHOD; use crate::protocol::ExecParams; @@ -32,6 +33,7 @@ use crate::protocol::INITIALIZE_METHOD; use crate::protocol::INITIALIZED_METHOD; use crate::protocol::InitializeParams; use crate::protocol::ReadParams; +use crate::protocol::SignalParams; use crate::protocol::TerminateParams; use crate::protocol::WriteParams; use crate::rpc::RpcRouter; @@ -77,6 +79,12 @@ pub(crate) fn build_router() -> RpcRouter { handler.exec_write(params).await }, ); + router.request( + EXEC_SIGNAL_METHOD, + |handler: Arc, params: SignalParams| async move { + handler.signal(params).await + }, + ); router.request( EXEC_TERMINATE_METHOD, |handler: Arc, params: TerminateParams| async move { diff --git a/codex-rs/exec-server/tests/exec_process.rs b/codex-rs/exec-server/tests/exec_process.rs index e1f330fc4ed0..c480a6f8c46f 100644 --- a/codex-rs/exec-server/tests/exec_process.rs +++ b/codex-rs/exec-server/tests/exec_process.rs @@ -13,6 +13,7 @@ use codex_exec_server::ExecParams; use codex_exec_server::ExecProcess; use codex_exec_server::ExecProcessEvent; use codex_exec_server::ProcessId; +use codex_exec_server::ProcessSignal; use codex_exec_server::ReadResponse; use codex_exec_server::StartedExecProcess; use codex_exec_server::WriteStatus; @@ -505,6 +506,60 @@ async fn assert_exec_process_rejects_write_without_pipe_stdin(use_remote: bool) Ok(()) } +async fn assert_exec_process_signal_interrupts_process(use_remote: bool) -> Result<()> { + let context = create_process_context(use_remote).await?; + let process_id = "proc-signal".to_string(); + let session = context + .backend + .start(ExecParams { + process_id: process_id.clone().into(), + argv: vec![ + "/bin/sh".to_string(), + "-c".to_string(), + "trap 'printf \"signal:2\\n\"; exit 7' INT; printf 'ready\\n'; while :; do :; done".to_string(), + ], + cwd: std::env::current_dir()?, + env_policy: /*env_policy*/ None, + env: Default::default(), + tty: false, + pipe_stdin: false, + arg0: None, + }) + .await?; + assert_eq!(session.process.process_id().as_str(), process_id); + + let StartedExecProcess { process } = session; + let mut wake_rx = process.subscribe_wake(); + let mut ready_output = String::new(); + let mut after_seq = None; + loop { + let response = + read_process_until_change(Arc::clone(&process), &mut wake_rx, after_seq).await?; + for chunk in response.chunks { + ready_output.push_str(&String::from_utf8_lossy(&chunk.chunk.into_inner())); + after_seq = Some(chunk.seq); + } + if ready_output.contains("ready\n") { + break; + } + if response.closed { + anyhow::bail!("process closed before readiness marker: {ready_output:?}"); + } + after_seq = response.next_seq.checked_sub(1).or(after_seq); + } + + process.signal(ProcessSignal::Interrupt).await?; + let (output, exit_code, closed) = collect_process_output_from_reads(process, wake_rx).await?; + + assert!( + output.contains("signal:2"), + "expected signal handler output, got {output:?}" + ); + assert_eq!(exit_code, Some(7)); + assert!(closed); + Ok(()) +} + async fn assert_exec_process_preserves_queued_events_before_subscribe( use_remote: bool, ) -> Result<()> { @@ -702,6 +757,15 @@ async fn exec_process_rejects_write_without_pipe_stdin(use_remote: bool) -> Resu assert_exec_process_rejects_write_without_pipe_stdin(use_remote).await } +#[test_case(false ; "local")] +#[test_case(true ; "remote")] +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +// Serialize tests that launch a real exec-server process through the full CLI. +#[serial_test::serial(remote_exec_server)] +async fn exec_process_signal_interrupts_process(use_remote: bool) -> Result<()> { + assert_exec_process_signal_interrupts_process(use_remote).await +} + #[test_case(false ; "local")] #[test_case(true ; "remote")] #[tokio::test(flavor = "multi_thread", worker_threads = 2)] diff --git a/codex-rs/utils/pty/src/lib.rs b/codex-rs/utils/pty/src/lib.rs index 39fc9b5522ef..9f0cb442a244 100644 --- a/codex-rs/utils/pty/src/lib.rs +++ b/codex-rs/utils/pty/src/lib.rs @@ -17,6 +17,8 @@ pub use pipe::spawn_process_no_stdin as spawn_pipe_process_no_stdin; pub use process::ProcessDriver; /// Handle for interacting with a spawned process (PTY or pipe). pub use process::ProcessHandle; +/// Process signal supported by spawned-process handles. +pub use process::ProcessSignal; /// Bundle of process handles plus split output and exit receivers returned by spawn helpers. pub use process::SpawnedProcess; /// Terminal size in character cells used for PTY spawn and resize operations. diff --git a/codex-rs/utils/pty/src/pipe.rs b/codex-rs/utils/pty/src/pipe.rs index 541a2ecf2f47..a5ddb0e326d8 100644 --- a/codex-rs/utils/pty/src/pipe.rs +++ b/codex-rs/utils/pty/src/pipe.rs @@ -19,6 +19,7 @@ use tokio::task::JoinHandle; use crate::process::ChildTerminator; use crate::process::ProcessHandle; +use crate::process::ProcessSignal; use crate::process::SpawnedProcess; #[cfg(target_os = "linux")] @@ -32,6 +33,22 @@ struct PipeChildTerminator { } impl ChildTerminator for PipeChildTerminator { + fn signal(&mut self, signal: ProcessSignal) -> io::Result<()> { + match signal { + ProcessSignal::Interrupt => { + #[cfg(unix)] + { + crate::process_group::interrupt_process_group(self.process_group_id) + } + + #[cfg(not(unix))] + { + Ok(()) + } + } + } + } + fn kill(&mut self) -> io::Result<()> { #[cfg(unix)] { diff --git a/codex-rs/utils/pty/src/process.rs b/codex-rs/utils/pty/src/process.rs index 898a3d90ffa7..79f9273df7d2 100644 --- a/codex-rs/utils/pty/src/process.rs +++ b/codex-rs/utils/pty/src/process.rs @@ -17,7 +17,14 @@ use tokio::sync::watch; use tokio::task::AbortHandle; use tokio::task::JoinHandle; +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub enum ProcessSignal { + Interrupt, +} + pub(crate) trait ChildTerminator: Send + Sync { + fn signal(&mut self, signal: ProcessSignal) -> io::Result<()>; + fn kill(&mut self) -> io::Result<()>; } @@ -193,6 +200,17 @@ impl ProcessHandle { } } + pub fn signal(&self, signal: ProcessSignal) -> io::Result<()> { + let Ok(mut killer_opt) = self.killer.lock() else { + return Ok(()); + }; + let Some(killer) = killer_opt.as_mut() else { + return Ok(()); + }; + + killer.signal(signal) + } + /// Attempts to kill the child and abort helper tasks. pub fn terminate(&self) { self.request_terminate(); @@ -232,6 +250,10 @@ struct ClosureTerminator { } impl ChildTerminator for ClosureTerminator { + fn signal(&mut self, _signal: ProcessSignal) -> io::Result<()> { + Ok(()) + } + fn kill(&mut self) -> io::Result<()> { if let Some(inner) = self.inner.as_mut() { (inner)(); diff --git a/codex-rs/utils/pty/src/process_group.rs b/codex-rs/utils/pty/src/process_group.rs index 22a934fab7e6..9d10d7c368cc 100644 --- a/codex-rs/utils/pty/src/process_group.rs +++ b/codex-rs/utils/pty/src/process_group.rs @@ -118,15 +118,10 @@ pub fn kill_process_group_by_pid(_pid: u32) -> io::Result<()> { } #[cfg(unix)] -/// Send SIGTERM to a specific process group ID (best-effort). -/// -/// Returns `Ok(true)` when SIGTERM was delivered to an existing group and -/// `Ok(false)` when the group no longer exists. -pub fn terminate_process_group(process_group_id: u32) -> io::Result { +fn signal_process_group_id(pgid: libc::pid_t, signal: libc::c_int) -> io::Result { use std::io::ErrorKind; - let pgid = process_group_id as libc::pid_t; - let result = unsafe { libc::killpg(pgid, libc::SIGTERM) }; + let result = unsafe { libc::killpg(pgid, signal) }; if result == -1 { let err = io::Error::last_os_error(); if err.kind() == ErrorKind::NotFound || err.raw_os_error() == Some(libc::ESRCH) { @@ -138,6 +133,15 @@ pub fn terminate_process_group(process_group_id: u32) -> io::Result { Ok(true) } +#[cfg(unix)] +/// Send SIGTERM to a specific process group ID (best-effort). +/// +/// Returns `Ok(true)` when SIGTERM was delivered to an existing group and +/// `Ok(false)` when the group no longer exists. +pub fn terminate_process_group(process_group_id: u32) -> io::Result { + signal_process_group_id(process_group_id as libc::pid_t, libc::SIGTERM) +} + #[cfg(not(unix))] /// No-op on non-Unix platforms. pub fn terminate_process_group(_process_group_id: u32) -> io::Result { @@ -145,22 +149,23 @@ pub fn terminate_process_group(_process_group_id: u32) -> io::Result { } #[cfg(unix)] -/// Kill a specific process group ID (best-effort). -pub fn kill_process_group(process_group_id: u32) -> io::Result<()> { - use std::io::ErrorKind; - - let pgid = process_group_id as libc::pid_t; - let result = unsafe { libc::killpg(pgid, libc::SIGKILL) }; - if result == -1 { - let err = io::Error::last_os_error(); - if err.kind() != ErrorKind::NotFound && err.raw_os_error() != Some(libc::ESRCH) { - return Err(err); - } - } +/// Send SIGINT to a specific process group ID (best-effort). +pub fn interrupt_process_group(process_group_id: u32) -> io::Result<()> { + signal_process_group_id(process_group_id as libc::pid_t, libc::SIGINT).map(|_| ()) +} +#[cfg(not(unix))] +/// No-op on non-Unix platforms. +pub fn interrupt_process_group(_process_group_id: u32) -> io::Result<()> { Ok(()) } +#[cfg(unix)] +/// Kill a specific process group ID (best-effort). +pub fn kill_process_group(process_group_id: u32) -> io::Result<()> { + signal_process_group_id(process_group_id as libc::pid_t, libc::SIGKILL).map(|_| ()) +} + #[cfg(not(unix))] /// No-op on non-Unix platforms. pub fn kill_process_group(_process_group_id: u32) -> io::Result<()> { diff --git a/codex-rs/utils/pty/src/pty.rs b/codex-rs/utils/pty/src/pty.rs index 45c587b3287b..de1e290b45e4 100644 --- a/codex-rs/utils/pty/src/pty.rs +++ b/codex-rs/utils/pty/src/pty.rs @@ -30,6 +30,7 @@ use tokio::task::JoinHandle; use crate::process::ChildTerminator; use crate::process::ProcessHandle; +use crate::process::ProcessSignal; use crate::process::PtyHandles; use crate::process::PtyMasterHandle; use crate::process::SpawnedProcess; @@ -54,6 +55,19 @@ struct PtyChildTerminator { } impl ChildTerminator for PtyChildTerminator { + fn signal(&mut self, signal: ProcessSignal) -> std::io::Result<()> { + match signal { + ProcessSignal::Interrupt => { + #[cfg(unix)] + if let Some(process_group_id) = self.process_group_id { + return crate::process_group::interrupt_process_group(process_group_id); + } + + Ok(()) + } + } + } + fn kill(&mut self) -> std::io::Result<()> { #[cfg(unix)] if let Some(process_group_id) = self.process_group_id { @@ -81,6 +95,14 @@ struct RawPidTerminator { #[cfg(unix)] impl ChildTerminator for RawPidTerminator { + fn signal(&mut self, signal: ProcessSignal) -> std::io::Result<()> { + match signal { + ProcessSignal::Interrupt => { + crate::process_group::interrupt_process_group(self.process_group_id) + } + } + } + fn kill(&mut self) -> std::io::Result<()> { crate::process_group::kill_process_group(self.process_group_id) } From 5b5caa9431c818837eadc6d01cdd64e3b597ce9d Mon Sep 17 00:00:00 2001 From: pakrym-oai Date: Mon, 8 Jun 2026 14:35:29 -0700 Subject: [PATCH 2/5] Report unsupported process interrupts --- codex-rs/core/tests/suite/unified_exec.rs | 100 +++++++++++++++++++++ codex-rs/exec-server/tests/exec_process.rs | 62 ++++++++++++- codex-rs/utils/pty/src/pipe.rs | 2 +- codex-rs/utils/pty/src/process.rs | 13 ++- codex-rs/utils/pty/src/pty.rs | 2 +- 5 files changed, 173 insertions(+), 6 deletions(-) diff --git a/codex-rs/core/tests/suite/unified_exec.rs b/codex-rs/core/tests/suite/unified_exec.rs index 39a2749ce7ac..0a286424b662 100644 --- a/codex-rs/core/tests/suite/unified_exec.rs +++ b/codex-rs/core/tests/suite/unified_exec.rs @@ -2112,6 +2112,106 @@ async fn write_stdin_ctrl_c_interrupts_non_tty_session() -> Result<()> { Ok(()) } +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +#[cfg_attr(not(windows), ignore = "Windows-only unified exec interrupt test")] +async fn write_stdin_ctrl_c_reports_unsupported_interrupt_to_model_on_windows() -> Result<()> { + skip_if_no_network!(Ok(())); + skip_if_sandbox!(Ok(())); + + let server = start_mock_server().await; + + let mut builder = test_codex().with_config(|config| { + config + .features + .enable(Feature::UnifiedExec) + .expect("test config should allow feature update"); + }); + let test = builder.build_with_remote_env(&server).await?; + + let start_call_id = "uexec-windows-interrupt-start"; + let interrupt_call_id = "uexec-windows-interrupt"; + + let start_args = serde_json::json!({ + "shell": "cmd", + "cmd": "echo READY && ping -n 30 127.0.0.1 >NUL", + "yield_time_ms": 250, + "tty": false, + }); + let interrupt_args = serde_json::json!({ + "chars": "\u{3}", + "session_id": 1000, + "yield_time_ms": 1000, + }); + + let responses = vec![ + sse(vec![ + ev_response_created("resp-1"), + ev_function_call( + start_call_id, + "exec_command", + &serde_json::to_string(&start_args)?, + ), + ev_completed("resp-1"), + ]), + sse(vec![ + ev_response_created("resp-2"), + ev_function_call( + interrupt_call_id, + "write_stdin", + &serde_json::to_string(&interrupt_args)?, + ), + ev_completed("resp-2"), + ]), + sse(vec![ + ev_response_created("resp-3"), + ev_assistant_message("msg-1", "done"), + ev_completed("resp-3"), + ]), + ]; + let request_log = mount_sse_sequence(&server, responses).await; + + submit_unified_exec_turn( + &test, + "interrupt non-tty unified exec on Windows", + PermissionProfile::Disabled, + ) + .await?; + + wait_for_event(&test.codex, |event| { + matches!(event, EventMsg::TurnComplete(_)) + }) + .await; + + let start_output = request_log + .function_call_output_text(start_call_id) + .expect("missing start output for exec_command"); + let start_output = parse_unified_exec_output(&start_output)?; + assert_eq!( + start_output.process_id.as_deref(), + Some("1000"), + "exec_command should leave a running non-TTY session" + ); + assert!( + start_output.output.contains("READY"), + "start output should include command readiness marker, got {:?}", + start_output.output + ); + + let interrupt_output = request_log + .function_call_output_text(interrupt_call_id) + .expect("missing interrupt output for write_stdin"); + assert!( + interrupt_output.contains("write_stdin failed"), + "model-visible write_stdin output should report failure, got {interrupt_output:?}" + ); + assert!( + interrupt_output.contains("process interrupt is not supported by this process backend"), + "model-visible write_stdin output should explain unsupported interrupt, got {interrupt_output:?}" + ); + + Ok(()) +} + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn unified_exec_emits_end_event_when_session_dies_via_stdin() -> Result<()> { skip_if_no_network!(Ok(())); diff --git a/codex-rs/exec-server/tests/exec_process.rs b/codex-rs/exec-server/tests/exec_process.rs index c480a6f8c46f..f9f47b653806 100644 --- a/codex-rs/exec-server/tests/exec_process.rs +++ b/codex-rs/exec-server/tests/exec_process.rs @@ -1,5 +1,3 @@ -#![cfg(unix)] - mod common; use std::sync::Arc; @@ -560,6 +558,45 @@ async fn assert_exec_process_signal_interrupts_process(use_remote: bool) -> Resu Ok(()) } +async fn assert_exec_process_signal_reports_unsupported_on_windows(use_remote: bool) -> Result<()> { + let context = create_process_context(use_remote).await?; + let session = context + .backend + .start(ExecParams { + process_id: ProcessId::from("proc-windows-signal"), + argv: vec![ + "cmd".to_string(), + "/C".to_string(), + "echo ready && ping -n 30 127.0.0.1 >NUL".to_string(), + ], + cwd: std::env::current_dir()?, + env_policy: /*env_policy*/ None, + env: Default::default(), + tty: false, + pipe_stdin: false, + arg0: None, + }) + .await?; + + let err = session + .process + .signal(ProcessSignal::Interrupt) + .await + .expect_err("Windows non-TTY signal should report unsupported"); + let message = err.to_string(); + assert!( + message.contains("failed to signal process"), + "unexpected signal error: {message}" + ); + assert!( + message.contains("process interrupt is not supported by this process backend"), + "unexpected signal error: {message}" + ); + + session.process.terminate().await?; + Ok(()) +} + async fn assert_exec_process_preserves_queued_events_before_subscribe( use_remote: bool, ) -> Result<()> { @@ -594,6 +631,7 @@ async fn assert_exec_process_preserves_queued_events_before_subscribe( } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] +#[cfg_attr(not(unix), ignore = "Unix-only exec-server process test")] // Serialize tests that launch a real exec-server process through the full CLI. #[serial_test::serial(remote_exec_server)] async fn remote_exec_process_reports_transport_disconnect() -> Result<()> { @@ -685,6 +723,7 @@ async fn remote_exec_process_reports_transport_disconnect() -> Result<()> { #[test_case(false ; "local")] #[test_case(true ; "remote")] +#[cfg_attr(not(unix), ignore = "Unix-only exec-server process test")] #[tokio::test(flavor = "multi_thread", worker_threads = 2)] // Serialize tests that launch a real exec-server process through the full CLI. #[serial_test::serial(remote_exec_server)] @@ -694,6 +733,7 @@ async fn exec_process_starts_and_exits(use_remote: bool) -> Result<()> { #[test_case(false ; "local")] #[test_case(true ; "remote")] +#[cfg_attr(not(unix), ignore = "Unix-only exec-server process test")] #[tokio::test(flavor = "multi_thread", worker_threads = 2)] // Serialize tests that launch a real exec-server process through the full CLI. #[serial_test::serial(remote_exec_server)] @@ -703,6 +743,7 @@ async fn exec_process_streams_output(use_remote: bool) -> Result<()> { #[test_case(false ; "local")] #[test_case(true ; "remote")] +#[cfg_attr(not(unix), ignore = "Unix-only exec-server process test")] #[tokio::test(flavor = "multi_thread", worker_threads = 2)] // Serialize tests that launch a real exec-server process through the full CLI. #[serial_test::serial(remote_exec_server)] @@ -712,6 +753,7 @@ async fn exec_process_pushes_events(use_remote: bool) -> Result<()> { #[test_case(false ; "local")] #[test_case(true ; "remote")] +#[cfg_attr(not(unix), ignore = "Unix-only exec-server process test")] #[tokio::test(flavor = "multi_thread", worker_threads = 2)] // Serialize tests that launch a real exec-server process through the full CLI. #[serial_test::serial(remote_exec_server)] @@ -721,6 +763,7 @@ async fn exec_process_replays_events_after_close(use_remote: bool) -> Result<()> #[test_case(false ; "local")] #[test_case(true ; "remote")] +#[cfg_attr(not(unix), ignore = "Unix-only exec-server process test")] #[tokio::test(flavor = "multi_thread", worker_threads = 2)] // Serialize tests that launch a real exec-server process through the full CLI. #[serial_test::serial(remote_exec_server)] @@ -732,6 +775,7 @@ async fn exec_process_retains_output_after_exit_until_streams_close( #[test_case(false ; "local")] #[test_case(true ; "remote")] +#[cfg_attr(not(unix), ignore = "Unix-only exec-server process test")] #[tokio::test(flavor = "multi_thread", worker_threads = 2)] // Serialize tests that launch a real exec-server process through the full CLI. #[serial_test::serial(remote_exec_server)] @@ -741,6 +785,7 @@ async fn exec_process_write_then_read(use_remote: bool) -> Result<()> { #[test_case(false ; "local")] #[test_case(true ; "remote")] +#[cfg_attr(not(unix), ignore = "Unix-only exec-server process test")] #[tokio::test(flavor = "multi_thread", worker_threads = 2)] // Serialize tests that launch a real exec-server process through the full CLI. #[serial_test::serial(remote_exec_server)] @@ -750,6 +795,7 @@ async fn exec_process_write_then_read_without_tty(use_remote: bool) -> Result<() #[test_case(false ; "local")] #[test_case(true ; "remote")] +#[cfg_attr(not(unix), ignore = "Unix-only exec-server process test")] #[tokio::test(flavor = "multi_thread", worker_threads = 2)] // Serialize tests that launch a real exec-server process through the full CLI. #[serial_test::serial(remote_exec_server)] @@ -759,6 +805,7 @@ async fn exec_process_rejects_write_without_pipe_stdin(use_remote: bool) -> Resu #[test_case(false ; "local")] #[test_case(true ; "remote")] +#[cfg_attr(not(unix), ignore = "Unix-only exec-server process test")] #[tokio::test(flavor = "multi_thread", worker_threads = 2)] // Serialize tests that launch a real exec-server process through the full CLI. #[serial_test::serial(remote_exec_server)] @@ -768,6 +815,17 @@ async fn exec_process_signal_interrupts_process(use_remote: bool) -> Result<()> #[test_case(false ; "local")] #[test_case(true ; "remote")] +#[cfg_attr(not(windows), ignore = "Windows-only exec-server process test")] +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +// Serialize tests that launch a real exec-server process through the full CLI. +#[serial_test::serial(remote_exec_server)] +async fn exec_process_signal_reports_unsupported_on_windows(use_remote: bool) -> Result<()> { + assert_exec_process_signal_reports_unsupported_on_windows(use_remote).await +} + +#[test_case(false ; "local")] +#[test_case(true ; "remote")] +#[cfg_attr(not(unix), ignore = "Unix-only exec-server process test")] #[tokio::test(flavor = "multi_thread", worker_threads = 2)] // Serialize tests that launch a real exec-server process through the full CLI. #[serial_test::serial(remote_exec_server)] diff --git a/codex-rs/utils/pty/src/pipe.rs b/codex-rs/utils/pty/src/pipe.rs index a5ddb0e326d8..4bfe8e5305ac 100644 --- a/codex-rs/utils/pty/src/pipe.rs +++ b/codex-rs/utils/pty/src/pipe.rs @@ -43,7 +43,7 @@ impl ChildTerminator for PipeChildTerminator { #[cfg(not(unix))] { - Ok(()) + Err(crate::process::unsupported_signal(signal)) } } } diff --git a/codex-rs/utils/pty/src/process.rs b/codex-rs/utils/pty/src/process.rs index 79f9273df7d2..425885886b8c 100644 --- a/codex-rs/utils/pty/src/process.rs +++ b/codex-rs/utils/pty/src/process.rs @@ -22,6 +22,15 @@ pub enum ProcessSignal { Interrupt, } +pub(crate) fn unsupported_signal(signal: ProcessSignal) -> io::Error { + match signal { + ProcessSignal::Interrupt => io::Error::new( + io::ErrorKind::Unsupported, + "process interrupt is not supported by this process backend", + ), + } +} + pub(crate) trait ChildTerminator: Send + Sync { fn signal(&mut self, signal: ProcessSignal) -> io::Result<()>; @@ -250,8 +259,8 @@ struct ClosureTerminator { } impl ChildTerminator for ClosureTerminator { - fn signal(&mut self, _signal: ProcessSignal) -> io::Result<()> { - Ok(()) + fn signal(&mut self, signal: ProcessSignal) -> io::Result<()> { + Err(unsupported_signal(signal)) } fn kill(&mut self) -> io::Result<()> { diff --git a/codex-rs/utils/pty/src/pty.rs b/codex-rs/utils/pty/src/pty.rs index de1e290b45e4..73bbd76e5c74 100644 --- a/codex-rs/utils/pty/src/pty.rs +++ b/codex-rs/utils/pty/src/pty.rs @@ -63,7 +63,7 @@ impl ChildTerminator for PtyChildTerminator { return crate::process_group::interrupt_process_group(process_group_id); } - Ok(()) + Err(crate::process::unsupported_signal(signal)) } } } From 83c0b87e952501c98f96957c3df2b2efa45352f7 Mon Sep 17 00:00:00 2001 From: pakrym-oai Date: Mon, 8 Jun 2026 15:00:38 -0700 Subject: [PATCH 3/5] Avoid expect_err in exec server test --- codex-rs/exec-server/tests/exec_process.rs | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/codex-rs/exec-server/tests/exec_process.rs b/codex-rs/exec-server/tests/exec_process.rs index f9f47b653806..af8e722c6667 100644 --- a/codex-rs/exec-server/tests/exec_process.rs +++ b/codex-rs/exec-server/tests/exec_process.rs @@ -578,11 +578,10 @@ async fn assert_exec_process_signal_reports_unsupported_on_windows(use_remote: b }) .await?; - let err = session - .process - .signal(ProcessSignal::Interrupt) - .await - .expect_err("Windows non-TTY signal should report unsupported"); + let err = match session.process.signal(ProcessSignal::Interrupt).await { + Ok(()) => anyhow::bail!("Windows non-TTY signal should report unsupported"), + Err(err) => err, + }; let message = err.to_string(); assert!( message.contains("failed to signal process"), From 9ff7581214ba17137fd351893766e6be3612ff05 Mon Sep 17 00:00:00 2001 From: pakrym-oai Date: Tue, 9 Jun 2026 08:23:10 -0700 Subject: [PATCH 4/5] Report SIGINT exit code for unified exec --- codex-rs/core/tests/suite/unified_exec.rs | 54 +++++++++++++++++------ codex-rs/utils/pty/src/pipe.rs | 3 +- codex-rs/utils/pty/src/process.rs | 17 +++++++ codex-rs/utils/pty/src/pty.rs | 4 +- 4 files changed, 63 insertions(+), 15 deletions(-) diff --git a/codex-rs/core/tests/suite/unified_exec.rs b/codex-rs/core/tests/suite/unified_exec.rs index 0a286424b662..885a411630e4 100644 --- a/codex-rs/core/tests/suite/unified_exec.rs +++ b/codex-rs/core/tests/suite/unified_exec.rs @@ -1998,6 +1998,32 @@ async fn write_stdin_returns_exit_metadata_and_clears_session() -> Result<()> { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn write_stdin_ctrl_c_interrupts_non_tty_session() -> Result<()> { + assert_write_stdin_ctrl_c_interrupts_non_tty_session( + "trap", + "trap 'echo INT-TRAP; exit 42' INT; echo READY; while true; do sleep 30; done", + /*expected_exit_code*/ 42, + Some("INT-TRAP"), + ) + .await +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn write_stdin_ctrl_c_default_interrupt_reports_130_for_non_tty_session() -> Result<()> { + assert_write_stdin_ctrl_c_interrupts_non_tty_session( + "default", + "echo READY; exec sleep 30", + /*expected_exit_code*/ 130, + /*expected_interrupt_output*/ None, + ) + .await +} + +async fn assert_write_stdin_ctrl_c_interrupts_non_tty_session( + test_name: &str, + command: &str, + expected_exit_code: i32, + expected_interrupt_output: Option<&str>, +) -> Result<()> { skip_if_no_network!(Ok(())); skip_if_sandbox!(Ok(())); skip_if_windows!(Ok(())); @@ -2012,11 +2038,11 @@ async fn write_stdin_ctrl_c_interrupts_non_tty_session() -> Result<()> { }); let test = builder.build_with_remote_env(&server).await?; - let start_call_id = "uexec-non-tty-interrupt-start"; - let interrupt_call_id = "uexec-non-tty-interrupt"; + let start_call_id = format!("uexec-non-tty-interrupt-{test_name}-start"); + let interrupt_call_id = format!("uexec-non-tty-interrupt-{test_name}"); let start_args = serde_json::json!({ - "cmd": "trap 'echo INT-TRAP; exit 42' INT; echo READY; while true; do sleep 30; done", + "cmd": command, "yield_time_ms": 250, "tty": false, }); @@ -2030,7 +2056,7 @@ async fn write_stdin_ctrl_c_interrupts_non_tty_session() -> Result<()> { sse(vec![ ev_response_created("resp-1"), ev_function_call( - start_call_id, + &start_call_id, "exec_command", &serde_json::to_string(&start_args)?, ), @@ -2039,7 +2065,7 @@ async fn write_stdin_ctrl_c_interrupts_non_tty_session() -> Result<()> { sse(vec![ ev_response_created("resp-2"), ev_function_call( - interrupt_call_id, + &interrupt_call_id, "write_stdin", &serde_json::to_string(&interrupt_args)?, ), @@ -2074,7 +2100,7 @@ async fn write_stdin_ctrl_c_interrupts_non_tty_session() -> Result<()> { let outputs = collect_tool_outputs(&bodies)?; let start_output = outputs - .get(start_call_id) + .get(&start_call_id) .expect("missing start output for exec_command"); assert_eq!( start_output.process_id.as_deref(), @@ -2092,7 +2118,7 @@ async fn write_stdin_ctrl_c_interrupts_non_tty_session() -> Result<()> { ); let interrupt_output = outputs - .get(interrupt_call_id) + .get(&interrupt_call_id) .expect("missing interrupt output for write_stdin"); assert!( interrupt_output.process_id.is_none(), @@ -2100,14 +2126,16 @@ async fn write_stdin_ctrl_c_interrupts_non_tty_session() -> Result<()> { ); assert_eq!( interrupt_output.exit_code, - Some(42), + Some(expected_exit_code), "interrupt should preserve the process-reported exit code" ); - assert!( - interrupt_output.output.contains("INT-TRAP"), - "interrupt should drain output from the signal handler, got {:?}", - interrupt_output.output - ); + if let Some(expected_interrupt_output) = expected_interrupt_output { + assert!( + interrupt_output.output.contains(expected_interrupt_output), + "interrupt should drain output from the signal handler, got {:?}", + interrupt_output.output + ); + } Ok(()) } diff --git a/codex-rs/utils/pty/src/pipe.rs b/codex-rs/utils/pty/src/pipe.rs index 4bfe8e5305ac..3a9b62d9b7b0 100644 --- a/codex-rs/utils/pty/src/pipe.rs +++ b/codex-rs/utils/pty/src/pipe.rs @@ -21,6 +21,7 @@ use crate::process::ChildTerminator; use crate::process::ProcessHandle; use crate::process::ProcessSignal; use crate::process::SpawnedProcess; +use crate::process::exit_code_from_status; #[cfg(target_os = "linux")] use libc; @@ -226,7 +227,7 @@ async fn spawn_process_with_stdin_mode( let wait_exit_code = Arc::clone(&exit_code); let wait_handle: JoinHandle<()> = tokio::spawn(async move { let code = match child.wait().await { - Ok(status) => status.code().unwrap_or(-1), + Ok(status) => exit_code_from_status(status), Err(_) => -1, }; wait_exit_status.store(true, std::sync::atomic::Ordering::SeqCst); diff --git a/codex-rs/utils/pty/src/process.rs b/codex-rs/utils/pty/src/process.rs index 425885886b8c..fdd693f43d6c 100644 --- a/codex-rs/utils/pty/src/process.rs +++ b/codex-rs/utils/pty/src/process.rs @@ -2,6 +2,7 @@ use core::fmt; use std::io; #[cfg(unix)] use std::os::fd::RawFd; +use std::process::ExitStatus; use std::sync::Arc; use std::sync::Mutex as StdMutex; use std::sync::atomic::AtomicBool; @@ -31,6 +32,22 @@ pub(crate) fn unsupported_signal(signal: ProcessSignal) -> io::Error { } } +pub(crate) fn exit_code_from_status(status: ExitStatus) -> i32 { + if let Some(code) = status.code() { + return code; + } + + #[cfg(unix)] + { + use std::os::unix::process::ExitStatusExt; + if let Some(signal) = status.signal() { + return 128 + signal; + } + } + + -1 +} + pub(crate) trait ChildTerminator: Send + Sync { fn signal(&mut self, signal: ProcessSignal) -> io::Result<()>; diff --git a/codex-rs/utils/pty/src/pty.rs b/codex-rs/utils/pty/src/pty.rs index 73bbd76e5c74..951818a80d33 100644 --- a/codex-rs/utils/pty/src/pty.rs +++ b/codex-rs/utils/pty/src/pty.rs @@ -35,6 +35,8 @@ use crate::process::PtyHandles; use crate::process::PtyMasterHandle; use crate::process::SpawnedProcess; use crate::process::TerminalSize; +#[cfg(unix)] +use crate::process::exit_code_from_status; /// Returns true when ConPTY support is available (Windows only). #[cfg(windows)] @@ -390,7 +392,7 @@ async fn spawn_process_preserving_fds( let wait_exit_code = Arc::clone(&exit_code); let wait_handle: JoinHandle<()> = tokio::task::spawn_blocking(move || { let code = match child.wait() { - Ok(status) => status.code().unwrap_or(-1), + Ok(status) => exit_code_from_status(status), Err(_) => -1, }; wait_exit_status.store(true, std::sync::atomic::Ordering::SeqCst); From e605b381502ef535c95f6e57798cc78a08703912 Mon Sep 17 00:00:00 2001 From: pakrym-oai Date: Tue, 9 Jun 2026 09:59:29 -0700 Subject: [PATCH 5/5] codex: fix CI failure on PR #26734 --- codex-rs/core/tests/suite/unified_exec.rs | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/codex-rs/core/tests/suite/unified_exec.rs b/codex-rs/core/tests/suite/unified_exec.rs index 885a411630e4..f6a9b3a76826 100644 --- a/codex-rs/core/tests/suite/unified_exec.rs +++ b/codex-rs/core/tests/suite/unified_exec.rs @@ -2031,10 +2031,9 @@ async fn assert_write_stdin_ctrl_c_interrupts_non_tty_session( let server = start_mock_server().await; let mut builder = test_codex().with_config(|config| { - config - .features - .enable(Feature::UnifiedExec) - .expect("test config should allow feature update"); + if let Err(err) = config.features.enable(Feature::UnifiedExec) { + panic!("test config should allow feature update: {err}"); + } }); let test = builder.build_with_remote_env(&server).await?; @@ -2101,7 +2100,7 @@ async fn assert_write_stdin_ctrl_c_interrupts_non_tty_session( let start_output = outputs .get(&start_call_id) - .expect("missing start output for exec_command"); + .with_context(|| format!("missing start output for exec_command {start_call_id}"))?; assert_eq!( start_output.process_id.as_deref(), Some("1000"), @@ -2119,7 +2118,7 @@ async fn assert_write_stdin_ctrl_c_interrupts_non_tty_session( let interrupt_output = outputs .get(&interrupt_call_id) - .expect("missing interrupt output for write_stdin"); + .with_context(|| format!("missing interrupt output for write_stdin {interrupt_call_id}"))?; assert!( interrupt_output.process_id.is_none(), "interrupted process should be cleared from the session map"