Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion codex-rs/core/src/unified_exec/process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use tokio_util::sync::CancellationToken;

use crate::exec::is_likely_sandbox_denied;
use codex_exec_server::ExecProcess;
use codex_exec_server::ProcessSignal as ExecServerProcessSignal;
use codex_exec_server::ReadResponse as ExecReadResponse;
use codex_exec_server::StartedExecProcess;
use codex_exec_server::WriteStatus;
Expand All @@ -23,6 +24,7 @@ use codex_protocol::protocol::TruncationPolicy;
use codex_sandboxing::SandboxType;
use codex_utils_output_truncation::formatted_truncate_text;
use codex_utils_pty::ExecCommandSession;
use codex_utils_pty::ProcessSignal as PtyProcessSignal;
use codex_utils_pty::SpawnedPty;

use super::UNIFIED_EXEC_OUTPUT_MAX_TOKENS;
Expand All @@ -31,7 +33,6 @@ use super::head_tail_buffer::HeadTailBuffer;
use super::process_state::ProcessState;

const EARLY_EXIT_GRACE_PERIOD: Duration = Duration::from_millis(150);

pub(crate) trait SpawnLifecycle: std::fmt::Debug + Send + Sync {
/// Returns file descriptors that must stay open across the child `exec()`.
///
Expand Down Expand Up @@ -212,6 +213,18 @@ impl UnifiedExecProcess {
}
}

pub(super) async fn interrupt(&self) -> Result<(), UnifiedExecError> {
match &self.process_handle {
ProcessHandle::Local(process_handle) => process_handle

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(found by Codex)
This only signals the spawned zsh process group. With unified-exec zsh-fork, approved commands run under EscalateServer outside that group, and the super-exec path still does not forward signal

@pakrym-oai pakrym-oai Jun 9, 2026

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll punt on zsh-fork for now.

.signal(PtyProcessSignal::Interrupt)
.map_err(|err| UnifiedExecError::process_failed(err.to_string())),
ProcessHandle::ExecServer(process_handle) => process_handle
.signal(ExecServerProcessSignal::Interrupt)
.await
.map_err(|err| UnifiedExecError::process_failed(err.to_string())),
Comment thread
pakrym-oai marked this conversation as resolved.
}
}

pub(super) fn fail_and_terminate(&self, message: String) {
let state = self.state_rx.borrow().clone();
if state.failure_message.is_none() {
Expand Down
40 changes: 23 additions & 17 deletions codex-rs/core/src/unified_exec/process_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ const UNIFIED_EXEC_ENV: [(&str, &str); 10] = [
const NETWORK_ACCESS_DENIED_MESSAGE: &str =
"Network access was denied by the Codex sandbox network proxy.";
const LATE_NETWORK_DENIAL_GRACE_PERIOD: Duration = Duration::from_millis(100);
const INTERRUPT: &str = "\u{3}";

/// Test-only override for deterministic unified exec process IDs.
///
Expand Down Expand Up @@ -617,24 +618,29 @@ impl UnifiedExecProcessManager {

if !request.input.is_empty() {
if !tty {
return Err(UnifiedExecError::StdinClosed);
}
match process.write(request.input.as_bytes()).await {
Ok(()) => {
// Give the remote process a brief window to react so that we are
// more likely to capture its output in the poll below.
tokio::time::sleep(Duration::from_millis(100)).await;
if request.input == INTERRUPT {
process.interrupt().await?;
} else {
return Err(UnifiedExecError::StdinClosed);
}
Err(err) => {
let status = self.refresh_process_state(process_id).await;
if matches!(status, ProcessStatus::Exited { .. }) {
status_after_write = Some(status);
} else if matches!(err, UnifiedExecError::ProcessFailed { .. }) {
process.terminate();
self.release_process_id(process_id).await;
return Err(err);
} else {
return Err(err);
} else {
match process.write(request.input.as_bytes()).await {
Ok(()) => {
// Give the remote process a brief window to react so that we are
// more likely to capture its output in the poll below.
tokio::time::sleep(Duration::from_millis(100)).await;
}
Err(err) => {
let status = self.refresh_process_state(process_id).await;
if matches!(status, ProcessStatus::Exited { .. }) {
status_after_write = Some(status);
} else if matches!(err, UnifiedExecError::ProcessFailed { .. }) {
process.terminate();
self.release_process_id(process_id).await;
return Err(err);
} else {
return Err(err);
}
}
}
}
Expand Down
5 changes: 5 additions & 0 deletions codex-rs/core/src/unified_exec/process_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use codex_exec_server::ExecProcess;
use codex_exec_server::ExecProcessEventReceiver;
use codex_exec_server::ExecServerError;
use codex_exec_server::ProcessId;
use codex_exec_server::ProcessSignal;
use codex_exec_server::ReadResponse;
use codex_exec_server::StartedExecProcess;
use codex_exec_server::WriteResponse;
Expand Down Expand Up @@ -63,6 +64,10 @@ impl ExecProcess for MockExecProcess {
Ok(self.write_response.clone())
}

async fn signal(&self, _signal: ProcessSignal) -> Result<(), ExecServerError> {
Ok(())
}

async fn terminate(&self) -> Result<(), ExecServerError> {
Ok(())
}
Expand Down
243 changes: 243 additions & 0 deletions codex-rs/core/tests/suite/unified_exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1995,6 +1995,249 @@ async fn write_stdin_returns_exit_metadata_and_clears_session() -> Result<()> {
Ok(())
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn write_stdin_ctrl_c_interrupts_non_tty_session() -> Result<()> {
assert_write_stdin_ctrl_c_interrupts_non_tty_session(
"trap",
"trap 'echo INT-TRAP; exit 42' INT; echo READY; while true; do sleep 30; done",
/*expected_exit_code*/ 42,
Some("INT-TRAP"),
)
.await
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn write_stdin_ctrl_c_default_interrupt_reports_130_for_non_tty_session() -> Result<()> {
assert_write_stdin_ctrl_c_interrupts_non_tty_session(
"default",
"echo READY; exec sleep 30",
/*expected_exit_code*/ 130,
/*expected_interrupt_output*/ None,
)
.await
}

async fn assert_write_stdin_ctrl_c_interrupts_non_tty_session(
test_name: &str,
command: &str,
expected_exit_code: i32,
expected_interrupt_output: Option<&str>,
) -> Result<()> {
skip_if_no_network!(Ok(()));
skip_if_sandbox!(Ok(()));
skip_if_windows!(Ok(()));

let server = start_mock_server().await;

let mut builder = test_codex().with_config(|config| {
if let Err(err) = config.features.enable(Feature::UnifiedExec) {
panic!("test config should allow feature update: {err}");
}
});
let test = builder.build_with_remote_env(&server).await?;

let start_call_id = format!("uexec-non-tty-interrupt-{test_name}-start");
let interrupt_call_id = format!("uexec-non-tty-interrupt-{test_name}");

let start_args = serde_json::json!({
"cmd": command,
"yield_time_ms": 250,
"tty": false,
});
let interrupt_args = serde_json::json!({
"chars": "\u{3}",
"session_id": 1000,
"yield_time_ms": 1000,
});

let responses = vec![
sse(vec![
ev_response_created("resp-1"),
ev_function_call(
&start_call_id,
"exec_command",
&serde_json::to_string(&start_args)?,
),
ev_completed("resp-1"),
]),
sse(vec![
ev_response_created("resp-2"),
ev_function_call(
&interrupt_call_id,
"write_stdin",
&serde_json::to_string(&interrupt_args)?,
),
ev_completed("resp-2"),
]),
sse(vec![
ev_assistant_message("msg-1", "done"),
ev_completed("resp-3"),
]),
];
let request_log = mount_sse_sequence(&server, responses).await;

submit_unified_exec_turn(
&test,
"interrupt non-tty unified exec",
PermissionProfile::Disabled,
)
.await?;

wait_for_event(&test.codex, |event| {
matches!(event, EventMsg::TurnComplete(_))
})
.await;

let requests = request_log.requests();
assert!(!requests.is_empty(), "expected at least one POST request");
let bodies = requests
.into_iter()
.map(|request| request.body_json())
.collect::<Vec<_>>();

let outputs = collect_tool_outputs(&bodies)?;

let start_output = outputs
.get(&start_call_id)
.with_context(|| format!("missing start output for exec_command {start_call_id}"))?;
assert_eq!(
start_output.process_id.as_deref(),
Some("1000"),
"exec_command should leave a running non-TTY session"
);
assert!(
start_output.exit_code.is_none(),
"initial exec_command should not include exit_code while session is running"
);
assert!(
start_output.output.contains("READY"),
"start output should include command readiness marker, got {:?}",
start_output.output
);

let interrupt_output = outputs
.get(&interrupt_call_id)
.with_context(|| format!("missing interrupt output for write_stdin {interrupt_call_id}"))?;
assert!(
interrupt_output.process_id.is_none(),
"interrupted process should be cleared from the session map"
);
assert_eq!(
interrupt_output.exit_code,
Some(expected_exit_code),
"interrupt should preserve the process-reported exit code"
);
if let Some(expected_interrupt_output) = expected_interrupt_output {
assert!(
interrupt_output.output.contains(expected_interrupt_output),
"interrupt should drain output from the signal handler, got {:?}",
interrupt_output.output
);
}

Ok(())
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[cfg_attr(not(windows), ignore = "Windows-only unified exec interrupt test")]
async fn write_stdin_ctrl_c_reports_unsupported_interrupt_to_model_on_windows() -> Result<()> {
skip_if_no_network!(Ok(()));
skip_if_sandbox!(Ok(()));

let server = start_mock_server().await;

let mut builder = test_codex().with_config(|config| {
config
.features
.enable(Feature::UnifiedExec)
.expect("test config should allow feature update");
});
let test = builder.build_with_remote_env(&server).await?;

let start_call_id = "uexec-windows-interrupt-start";
let interrupt_call_id = "uexec-windows-interrupt";

let start_args = serde_json::json!({
"shell": "cmd",
"cmd": "echo READY && ping -n 30 127.0.0.1 >NUL",
"yield_time_ms": 250,
"tty": false,
});
let interrupt_args = serde_json::json!({
"chars": "\u{3}",
"session_id": 1000,
"yield_time_ms": 1000,
});

let responses = vec![
sse(vec![
ev_response_created("resp-1"),
ev_function_call(
start_call_id,
"exec_command",
&serde_json::to_string(&start_args)?,
),
ev_completed("resp-1"),
]),
sse(vec![
ev_response_created("resp-2"),
ev_function_call(
interrupt_call_id,
"write_stdin",
&serde_json::to_string(&interrupt_args)?,
),
ev_completed("resp-2"),
]),
sse(vec![
ev_response_created("resp-3"),
ev_assistant_message("msg-1", "done"),
ev_completed("resp-3"),
]),
];
let request_log = mount_sse_sequence(&server, responses).await;

submit_unified_exec_turn(
&test,
"interrupt non-tty unified exec on Windows",
PermissionProfile::Disabled,
)
.await?;

wait_for_event(&test.codex, |event| {
matches!(event, EventMsg::TurnComplete(_))
})
.await;

let start_output = request_log
.function_call_output_text(start_call_id)
.expect("missing start output for exec_command");
let start_output = parse_unified_exec_output(&start_output)?;
assert_eq!(
start_output.process_id.as_deref(),
Some("1000"),
"exec_command should leave a running non-TTY session"
);
assert!(
start_output.output.contains("READY"),
"start output should include command readiness marker, got {:?}",
start_output.output
);

let interrupt_output = request_log
.function_call_output_text(interrupt_call_id)
.expect("missing interrupt output for write_stdin");
assert!(
interrupt_output.contains("write_stdin failed"),
"model-visible write_stdin output should report failure, got {interrupt_output:?}"
);
assert!(
interrupt_output.contains("process interrupt is not supported by this process backend"),
"model-visible write_stdin output should explain unsupported interrupt, got {interrupt_output:?}"
);

Ok(())
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn unified_exec_emits_end_event_when_session_dies_via_stdin() -> Result<()> {
skip_if_no_network!(Ok(()));
Expand Down
Loading
Loading