Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions codex-rs/app-server-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
//! bridging async `mpsc` channels on both sides. Queues are bounded so overload
//! surfaces as channel-full errors rather than unbounded memory growth.

mod path;
mod remote;

use std::error::Error;
Expand Down Expand Up @@ -58,13 +59,15 @@ pub use codex_exec_server::EnvironmentManager;
pub use codex_exec_server::ExecServerRuntimePaths;
use codex_feedback::CodexFeedback;
use codex_protocol::protocol::SessionSource;
use codex_utils_absolute_path::AbsolutePathBuf;
use serde::de::DeserializeOwned;
use tokio::sync::mpsc;
use tokio::sync::oneshot;
use tokio::time::timeout;
use toml::Value as TomlValue;
use tracing::warn;

pub use crate::path::AppServerPath;
pub use crate::remote::RemoteAppServerClient;
pub use crate::remote::RemoteAppServerConnectArgs;
pub use crate::remote::RemoteAppServerEndpoint;
Expand Down Expand Up @@ -845,6 +848,15 @@ impl AppServerRequestHandle {
}

impl AppServerClient {
pub fn codex_home(&self, local_codex_home: &AbsolutePathBuf) -> Option<AppServerPath> {
match self {
Self::InProcess(_) => Some(AppServerPath::from_app_server(
local_codex_home.display().to_string(),
)),
Self::Remote(client) => client.codex_home().map(AppServerPath::from_app_server),
}
}

pub async fn request(&self, request: ClientRequest) -> IoResult<RequestResult> {
match self {
Self::InProcess(client) => client.request(request).await,
Expand Down Expand Up @@ -1110,6 +1122,7 @@ mod tests {
id: request.id,
result: serde_json::json!({
"userAgent": "codex_cli_rs/9.8.7-test (Test OS; x86_64) rust",
"codexHome": "/server/.codex",
}),
}),
)
Expand Down Expand Up @@ -1446,6 +1459,7 @@ mod tests {
.expect("remote client should connect");

assert_eq!(client.server_version(), Some("9.8.7-test"));
assert_eq!(client.codex_home(), Some("/server/.codex"));
let response: GetAccountResponse = client
.request_typed(ClientRequest::GetAccount {
request_id: RequestId::Integer(1),
Expand Down
58 changes: 58 additions & 0 deletions codex-rs/app-server-client/src/path.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
//! Paths resolved using the app-server host's platform rules.

use std::fmt;

#[derive(Clone, Debug, PartialEq, Eq)]
pub struct AppServerPath(String);

impl AppServerPath {
pub fn from_app_server(path: impl Into<String>) -> Self {
Self(path.into())
}

pub fn from_absolute_str(raw: &str) -> Option<Self> {
(raw.starts_with('/') || is_windows_absolute_path(raw)).then(|| Self(raw.to_string()))
}

pub fn as_str(&self) -> &str {
&self.0
}

pub fn components(&self) -> Vec<&str> {
let separators = if is_windows_absolute_path(&self.0) {
&['/', '\\'][..]
} else {
&['/'][..]
};
self.0
.split(separators)
.filter(|part| !part.is_empty())
.collect()
Comment thread
etraut-openai marked this conversation as resolved.
}

pub fn join(&self, segment: impl AsRef<str>) -> Self {
let is_windows = is_windows_absolute_path(&self.0);
let (path, separator) = if is_windows {
(self.0.trim_end_matches(['/', '\\']), '\\')
} else {
(self.0.trim_end_matches('/'), '/')
};
Self(format!("{path}{separator}{}", segment.as_ref()))
}
}

impl fmt::Display for AppServerPath {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.0.fmt(f)
}
}

fn is_windows_absolute_path(path: &str) -> bool {
let bytes = path.as_bytes();
(bytes.len() >= 3
&& bytes[0].is_ascii_alphabetic()
&& bytes[1] == b':'
&& matches!(bytes[2], b'\\' | b'/'))
|| path.starts_with("\\\\")
|| path.starts_with("//")
}
56 changes: 27 additions & 29 deletions codex-rs/app-server-client/src/remote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ pub(crate) fn websocket_url_supports_auth_token(url: &Url) -> bool {

enum RemoteClientCommand {
Request {
request: Box<ClientRequest>,
request: Box<JSONRPCRequest>,
response_tx: oneshot::Sender<IoResult<RequestResult>>,
},
Notify {
Expand All @@ -151,6 +151,7 @@ pub struct RemoteAppServerClient {
event_rx: mpsc::UnboundedReceiver<AppServerEvent>,
pending_events: VecDeque<AppServerEvent>,
server_version: Option<String>,
codex_home: Option<String>,
worker_handle: tokio::task::JoinHandle<()>,
}

Expand Down Expand Up @@ -185,6 +186,10 @@ impl RemoteAppServerClient {
self.server_version.as_deref()
}

pub fn codex_home(&self) -> Option<&str> {
self.codex_home.as_deref()
}

async fn connect_with_stream<S>(
channel_capacity: usize,
endpoint: String,
Expand All @@ -195,7 +200,7 @@ impl RemoteAppServerClient {
S: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{
let mut stream = stream;
let (pending_events, server_version) = initialize_remote_connection(
let (pending_events, server_version, codex_home) = initialize_remote_connection(
&mut stream,
&endpoint,
initialize_params,
Expand All @@ -218,7 +223,7 @@ impl RemoteAppServerClient {
};
match command {
RemoteClientCommand::Request { request, response_tx } => {
let request_id = request_id_from_client_request(&request);
let request_id = request.id.clone();
if pending_requests.contains_key(&request_id) {
let _ = response_tx.send(Err(IoError::new(
ErrorKind::InvalidInput,
Expand All @@ -229,7 +234,7 @@ impl RemoteAppServerClient {
pending_requests.insert(request_id.clone(), response_tx);
if let Err(err) = write_jsonrpc_message(
&mut stream,
JSONRPCMessage::Request(jsonrpc_request_from_client_request(*request)),
JSONRPCMessage::Request(*request),
&endpoint,
)
.await
Expand Down Expand Up @@ -472,6 +477,7 @@ impl RemoteAppServerClient {
event_rx,
pending_events: pending_events.into(),
server_version,
codex_home,
worker_handle,
})
}
Expand All @@ -483,25 +489,7 @@ impl RemoteAppServerClient {
}

pub async fn request(&self, request: ClientRequest) -> IoResult<RequestResult> {
let (response_tx, response_rx) = oneshot::channel();
self.command_tx
.send(RemoteClientCommand::Request {
request: Box::new(request),
response_tx,
})
.await
.map_err(|_| {
IoError::new(
ErrorKind::BrokenPipe,
"remote app-server worker channel is closed",
)
})?;
response_rx.await.map_err(|_| {
IoError::new(
ErrorKind::BrokenPipe,
"remote app-server request channel is closed",
)
})?
self.request_handle().request(request).await
}

pub async fn request_typed<T>(&self, request: ClientRequest) -> Result<T, TypedRequestError>
Expand Down Expand Up @@ -613,6 +601,7 @@ impl RemoteAppServerClient {
event_rx,
pending_events: _pending_events,
server_version: _server_version,
codex_home: _codex_home,
worker_handle,
} = self;
let mut worker_handle = worker_handle;
Expand All @@ -637,6 +626,11 @@ impl RemoteAppServerClient {

impl RemoteAppServerRequestHandle {
pub async fn request(&self, request: ClientRequest) -> IoResult<RequestResult> {
self.request_json_rpc(jsonrpc_request_from_client_request(request))
.await
}

pub async fn request_json_rpc(&self, request: JSONRPCRequest) -> IoResult<RequestResult> {
let (response_tx, response_rx) = oneshot::channel();
self.command_tx
.send(RemoteClientCommand::Request {
Expand Down Expand Up @@ -800,13 +794,14 @@ async fn initialize_remote_connection<S>(
endpoint: &str,
params: InitializeParams,
initialize_timeout: Duration,
) -> IoResult<(Vec<AppServerEvent>, Option<String>)>
) -> IoResult<(Vec<AppServerEvent>, Option<String>, Option<String>)>
where
S: AsyncRead + AsyncWrite + Unpin,
{
let initialize_request_id = RequestId::String("initialize".to_string());
let mut pending_events = Vec::new();
let mut server_version = None;
let mut codex_home = None;
write_jsonrpc_message(
stream,
JSONRPCMessage::Request(jsonrpc_request_from_client_request(
Expand Down Expand Up @@ -838,6 +833,12 @@ where
let (_, rest) = user_agent.split_once('/')?;
rest.split_whitespace().next().map(str::to_string)
});
codex_home = response
.result
.get("codexHome")
.and_then(serde_json::Value::as_str)
.filter(|codex_home| !codex_home.is_empty())
.map(str::to_string);
break Ok(());
}
JSONRPCMessage::Error(error) if error.id == initialize_request_id => {
Expand Down Expand Up @@ -929,7 +930,7 @@ where
)
.await?;

Ok((pending_events, server_version))
Ok((pending_events, server_version, codex_home))
}

fn app_server_event_from_notification(notification: JSONRPCNotification) -> Option<AppServerEvent> {
Expand All @@ -951,10 +952,6 @@ fn deliver_event(
})
}

fn request_id_from_client_request(request: &ClientRequest) -> RequestId {
jsonrpc_request_from_client_request(request.clone()).id
}

fn jsonrpc_request_from_client_request(request: ClientRequest) -> JSONRPCRequest {
let value = match serde_json::to_value(request) {
Ok(value) => value,
Expand Down Expand Up @@ -1024,6 +1021,7 @@ mod tests {
event_rx,
pending_events: VecDeque::new(),
server_version: None,
codex_home: None,
worker_handle,
};

Expand Down
Loading
Loading