From 6d48a9be46a644c4b8142bc4194cebe94c5ed85e Mon Sep 17 00:00:00 2001 From: chengxi Date: Fri, 5 Jun 2026 21:32:48 -0400 Subject: [PATCH 1/2] feat(sdk): support per-request timeout to TCP/QUIC/WebSocket clients --- core/common/src/error/iggy_error.rs | 3 + core/common/src/types/args/mod.rs | 14 ++ .../quic_config/quic_client_config.rs | 4 + .../quic_config/quic_client_config_builder.rs | 19 ++ .../tcp_config/tcp_client_config.rs | 4 + .../tcp_config/tcp_client_config_builder.rs | 19 ++ .../websocket_client_config.rs | 4 + .../websocket_client_config_builder.rs | 19 ++ .../tests/cli/general/test_help_command.rs | 5 + core/sdk/src/client_provider.rs | 3 + core/sdk/src/clients/client_builder.rs | 18 ++ core/sdk/src/quic/quic_client.rs | 132 +++++----- core/sdk/src/tcp/tcp_client.rs | 212 +++++++++------- core/sdk/src/websocket/websocket_client.rs | 233 +++++++++--------- examples/rust/src/shared/args.rs | 5 + 15 files changed, 436 insertions(+), 258 deletions(-) diff --git a/core/common/src/error/iggy_error.rs b/core/common/src/error/iggy_error.rs index 816d8fac57..0bf203cd1a 100644 --- a/core/common/src/error/iggy_error.rs +++ b/core/common/src/error/iggy_error.rs @@ -16,6 +16,7 @@ // under the License. use crate::Identifier; +use crate::IggyDuration; use crate::utils::topic_size::MaxTopicSize; use crate::{IggyMessage, utils::byte_size::IggyByteSize}; use std::sync::Arc; @@ -502,6 +503,8 @@ pub enum IggyError { CannotBindToSocket(String) = 12000, #[error("Task execution timeout")] TaskTimeout = 12001, + #[error("Request timeout after {0}")] + RequestTimeout(IggyDuration) = 12002, #[error("IO error: {0}")] IoError(String) = 13000, diff --git a/core/common/src/types/args/mod.rs b/core/common/src/types/args/mod.rs index f60b509740..f590da907c 100644 --- a/core/common/src/types/args/mod.rs +++ b/core/common/src/types/args/mod.rs @@ -218,6 +218,13 @@ pub struct ArgsOptional { #[arg(long)] #[serde(skip_serializing_if = "Option::is_none")] pub websocket_reconnection_interval: Option, + + /// The optional per-request timeout for send/receive operations + /// + /// [default: "300s"] + #[arg(long)] + #[serde(skip_serializing_if = "Option::is_none")] + pub request_timeout: Option, } /// The arguments used by the `ClientProviderConfig` to create a client. @@ -351,6 +358,9 @@ pub struct Args { /// The optional TLS validate certificate for the WebSocket transport pub websocket_tls_validate_certificate: bool, + + /// The per-request timeout for send/receive operations + pub request_timeout: String, } const QUIC_TRANSPORT: &str = "quic"; @@ -416,6 +426,7 @@ impl Default for Args { websocket_tls_domain: "localhost".to_string(), websocket_tls_ca_file: None, websocket_tls_validate_certificate: false, + request_timeout: "300s".to_string(), } } } @@ -517,6 +528,9 @@ impl From> for Args { { args.websocket_reconnection_interval = websocket_reconnection_interval; } + if let Some(request_timeout) = optional_args.request_timeout { + args.request_timeout = request_timeout; + } } args diff --git a/core/common/src/types/configuration/quic_config/quic_client_config.rs b/core/common/src/types/configuration/quic_config/quic_client_config.rs index b6e2b95e52..f186281770 100644 --- a/core/common/src/types/configuration/quic_config/quic_client_config.rs +++ b/core/common/src/types/configuration/quic_config/quic_client_config.rs @@ -54,6 +54,8 @@ pub struct QuicClientConfig { pub validate_certificate: bool, /// Interval of heartbeats sent by the client pub heartbeat_interval: IggyDuration, + /// Per-request timeout for send/receive operations. + pub request_timeout: IggyDuration, } impl Default for QuicClientConfig { @@ -64,6 +66,7 @@ impl Default for QuicClientConfig { server_name: "localhost".to_string(), auto_login: AutoLogin::Disabled, heartbeat_interval: IggyDuration::from_str("5s").unwrap(), + request_timeout: IggyDuration::from_str("300s").unwrap(), reconnection: QuicClientReconnectionConfig::default(), response_buffer_size: 1000 * 1000 * 10, max_concurrent_bidi_streams: 10000, @@ -96,6 +99,7 @@ impl From> for QuicClientConfig { max_idle_timeout: connection_string.options().max_idle_timeout(), validate_certificate: connection_string.options().validate_certificate(), heartbeat_interval: connection_string.options().heartbeat_interval(), + request_timeout: IggyDuration::from_str("300s").unwrap(), } } } diff --git a/core/common/src/types/configuration/quic_config/quic_client_config_builder.rs b/core/common/src/types/configuration/quic_config/quic_client_config_builder.rs index 9af4e85024..62ec1a2431 100644 --- a/core/common/src/types/configuration/quic_config/quic_client_config_builder.rs +++ b/core/common/src/types/configuration/quic_config/quic_client_config_builder.rs @@ -152,6 +152,12 @@ impl QuicClientConfigBuilder { self } + /// Sets the per-request timeout for send/receive operations. + pub fn with_request_timeout(mut self, request_timeout: IggyDuration) -> Self { + self.config.request_timeout = request_timeout; + self + } + /// Finalizes the builder and returns the `QuicClientConfig`. pub fn build(mut self) -> Result { self.config.server_address = self.config.server_address.trim().to_owned(); @@ -164,6 +170,7 @@ impl QuicClientConfigBuilder { #[cfg(test)] mod tests { use super::*; + use std::str::FromStr; #[test] fn build_should_trim_and_validate_server_address() { @@ -193,4 +200,16 @@ mod tests { assert!(result.is_err()); } + + #[test] + fn with_request_timeout_should_override_default() { + let config = QuicClientConfigBuilder::default() + .with_request_timeout(IggyDuration::from_str("60s").unwrap()) + .build() + .unwrap(); + assert_eq!( + config.request_timeout, + IggyDuration::from_str("60s").unwrap() + ); + } } diff --git a/core/common/src/types/configuration/tcp_config/tcp_client_config.rs b/core/common/src/types/configuration/tcp_config/tcp_client_config.rs index 86997c6d7a..dd76ac85dc 100644 --- a/core/common/src/types/configuration/tcp_config/tcp_client_config.rs +++ b/core/common/src/types/configuration/tcp_config/tcp_client_config.rs @@ -41,6 +41,8 @@ pub struct TcpClientConfig { pub reconnection: TcpClientReconnectionConfig, /// Interval of heartbeats sent by the client pub heartbeat_interval: IggyDuration, + /// Per-request timeout for send/receive operations. + pub request_timeout: IggyDuration, /// Disable Nagle algorithm for the TCP socket. pub nodelay: bool, } @@ -54,6 +56,7 @@ impl Default for TcpClientConfig { tls_ca_file: None, tls_validate_certificate: true, heartbeat_interval: IggyDuration::from_str("5s").unwrap(), + request_timeout: IggyDuration::from_str("300s").unwrap(), auto_login: AutoLogin::Disabled, reconnection: TcpClientReconnectionConfig::default(), nodelay: false, @@ -73,6 +76,7 @@ impl From> for TcpClientConfig { tls_validate_certificate: true, reconnection: connection_string.options().reconnection().to_owned(), heartbeat_interval: connection_string.options().heartbeat_interval(), + request_timeout: IggyDuration::from_str("300s").unwrap(), nodelay: connection_string.options().nodelay(), } } diff --git a/core/common/src/types/configuration/tcp_config/tcp_client_config_builder.rs b/core/common/src/types/configuration/tcp_config/tcp_client_config_builder.rs index 6f665a5777..54944ad4a1 100644 --- a/core/common/src/types/configuration/tcp_config/tcp_client_config_builder.rs +++ b/core/common/src/types/configuration/tcp_config/tcp_client_config_builder.rs @@ -101,6 +101,12 @@ impl TcpClientConfigBuilder { self } + /// Sets the per-request timeout for send/receive operations. + pub fn with_request_timeout(mut self, request_timeout: IggyDuration) -> Self { + self.config.request_timeout = request_timeout; + self + } + /// Builds the TCP client configuration. pub fn build(mut self) -> Result { self.config.server_address = self.config.server_address.trim().to_owned(); @@ -114,6 +120,7 @@ impl TcpClientConfigBuilder { mod tests { use super::*; use crate::IggyError; + use std::str::FromStr; fn builder_with_address(addr: &str) -> TcpClientConfigBuilder { let mut builder = TcpClientConfigBuilder::default(); @@ -187,4 +194,16 @@ mod tests { let builder = builder_with_address("iggy-server:8090"); assert!(builder.build().is_ok()); } + + #[test] + fn with_request_timeout_should_override_default() { + let config = TcpClientConfigBuilder::default() + .with_request_timeout(IggyDuration::from_str("60s").unwrap()) + .build() + .unwrap(); + assert_eq!( + config.request_timeout, + IggyDuration::from_str("60s").unwrap() + ); + } } diff --git a/core/common/src/types/configuration/websocket_config/websocket_client_config.rs b/core/common/src/types/configuration/websocket_config/websocket_client_config.rs index d93c168a95..f757eab24a 100644 --- a/core/common/src/types/configuration/websocket_config/websocket_client_config.rs +++ b/core/common/src/types/configuration/websocket_config/websocket_client_config.rs @@ -33,6 +33,8 @@ pub struct WebSocketClientConfig { pub reconnection: WebSocketClientReconnectionConfig, /// Interval of heartbeats sent by the client pub heartbeat_interval: IggyDuration, + /// Per-request timeout for send/receive operations. + pub request_timeout: IggyDuration, /// WebSocket-specific configuration. pub ws_config: WebSocketConfig, /// Whether tls is enabled @@ -70,6 +72,7 @@ impl Default for WebSocketClientConfig { auto_login: AutoLogin::Disabled, reconnection: WebSocketClientReconnectionConfig::default(), heartbeat_interval: IggyDuration::from_str("5s").unwrap(), + request_timeout: IggyDuration::from_str("300s").unwrap(), ws_config: WebSocketConfig::default(), tls_enabled: false, tls_domain: "localhost".to_string(), @@ -156,6 +159,7 @@ impl From> for WebSocketClien auto_login: connection_string.auto_login().to_owned(), reconnection: connection_string.options().reconnection().to_owned(), heartbeat_interval: connection_string.options().heartbeat_interval(), + request_timeout: IggyDuration::from_str("300s").unwrap(), ws_config, tls_enabled: options.tls_enabled(), tls_domain: options.tls_domain().into(), diff --git a/core/common/src/types/configuration/websocket_config/websocket_client_config_builder.rs b/core/common/src/types/configuration/websocket_config/websocket_client_config_builder.rs index 626c0e683e..1b1f16d0ba 100644 --- a/core/common/src/types/configuration/websocket_config/websocket_client_config_builder.rs +++ b/core/common/src/types/configuration/websocket_config/websocket_client_config_builder.rs @@ -136,6 +136,12 @@ impl WebSocketClientConfigBuilder { self } + /// Sets the per-request timeout for send/receive operations. + pub fn with_request_timeout(mut self, request_timeout: IggyDuration) -> Self { + self.config.request_timeout = request_timeout; + self + } + /// Builds the WebSocket client configuration. pub fn build(mut self) -> Result { self.config.server_address = self.config.server_address.trim().to_owned(); @@ -148,6 +154,7 @@ impl WebSocketClientConfigBuilder { #[cfg(test)] mod tests { use super::*; + use std::str::FromStr; #[test] fn build_should_trim_and_validate_server_address() { @@ -177,4 +184,16 @@ mod tests { assert!(result.is_err()); } + + #[test] + fn with_request_timeout_should_override_default() { + let config = WebSocketClientConfigBuilder::default() + .with_request_timeout(IggyDuration::from_str("60s").unwrap()) + .build() + .unwrap(); + assert_eq!( + config.request_timeout, + IggyDuration::from_str("60s").unwrap() + ); + } } diff --git a/core/integration/tests/cli/general/test_help_command.rs b/core/integration/tests/cli/general/test_help_command.rs index 91a55099b8..5da76ae228 100644 --- a/core/integration/tests/cli/general/test_help_command.rs +++ b/core/integration/tests/cli/general/test_help_command.rs @@ -190,6 +190,11 @@ Options: {CLAP_INDENT} [default: "1s"] + --request-timeout + The optional per-request timeout for send/receive operations +{CLAP_INDENT} + [default: "300s"] + -q, --quiet Quiet mode (disabled stdout printing) diff --git a/core/sdk/src/client_provider.rs b/core/sdk/src/client_provider.rs index 423b7048ea..60f55e1f9b 100644 --- a/core/sdk/src/client_provider.rs +++ b/core/sdk/src/client_provider.rs @@ -120,6 +120,7 @@ impl ClientProviderConfig { keep_alive_interval: args.quic_keep_alive_interval, max_idle_timeout: args.quic_max_idle_timeout, validate_certificate: args.quic_validate_certificate, + request_timeout: IggyDuration::from_str(&args.request_timeout).unwrap(), })); } TransportProtocol::Http => { @@ -148,6 +149,7 @@ impl ClientProviderConfig { ) .unwrap(), }, + request_timeout: IggyDuration::from_str(&args.request_timeout).unwrap(), auto_login: if auto_login { AutoLogin::Enabled(Credentials::UsernamePassword( args.username, @@ -181,6 +183,7 @@ impl ClientProviderConfig { } else { AutoLogin::Disabled }, + request_timeout: IggyDuration::from_str(&args.request_timeout).unwrap(), ws_config: WebSocketConfig::default(), tls_enabled: args.websocket_tls_enabled, tls_domain: args.websocket_tls_domain, diff --git a/core/sdk/src/clients/client_builder.rs b/core/sdk/src/clients/client_builder.rs index 0af473b554..f07e6db5fa 100644 --- a/core/sdk/src/clients/client_builder.rs +++ b/core/sdk/src/clients/client_builder.rs @@ -219,6 +219,12 @@ impl TcpClientBuilder { self } + /// Sets the per-request timeout for send/receive operations. + pub fn with_request_timeout(mut self, request_timeout: IggyDuration) -> Self { + self.config = self.config.with_request_timeout(request_timeout); + self + } + /// Builds the parent `IggyClient` with TCP configuration. pub fn build(self) -> Result { let client = TcpClient::create(Arc::new(self.config.build()?))?; @@ -277,6 +283,12 @@ impl QuicClientBuilder { self } + /// Sets the per-request timeout for send/receive operations. + pub fn with_request_timeout(mut self, request_timeout: IggyDuration) -> Self { + self.config = self.config.with_request_timeout(request_timeout); + self + } + /// Builds the parent `IggyClient` with QUIC configuration. pub fn build(self) -> Result { let client = QuicClient::create(Arc::new(self.config.build()?))?; @@ -390,6 +402,12 @@ impl WebSocketClientBuilder { self } + /// Sets the per-request timeout for send/receive operations. + pub fn with_request_timeout(mut self, request_timeout: IggyDuration) -> Self { + self.config = self.config.with_request_timeout(request_timeout); + self + } + /// Builds the parent `IggyClient` with WebSocket configuration. pub fn build(self) -> Result { let client = WebSocketClient::create(Arc::new(self.config.build()?))?; diff --git a/core/sdk/src/quic/quic_client.rs b/core/sdk/src/quic/quic_client.rs index 2aacab7185..d9017039dc 100644 --- a/core/sdk/src/quic/quic_client.rs +++ b/core/sdk/src/quic/quic_client.rs @@ -642,78 +642,86 @@ impl QuicClient { let connection = self.connection.clone(); let response_buffer_size = self.config.response_buffer_size; + let request_timeout = self.config.request_timeout; #[cfg(feature = "vsr")] let consensus_session = self.consensus_session.clone(); // SAFETY: we run code holding the `connection` lock in a task so we can't be cancelled while holding the lock. tokio::spawn(async move { - let connection = connection.lock().await; - if let Some(connection) = connection.as_ref() { - #[cfg(feature = "vsr")] - let (request_header, request_size) = { - let mut consensus_session = consensus_session - .lock() - .expect("consensus session mutex poisoned"); - crate::vsr::encode_request_header(&mut consensus_session, code, &payload)? - }; - #[cfg(not(feature = "vsr"))] - let payload_length = payload.len() + REQUEST_INITIAL_BYTES_LENGTH; - let (mut send, mut recv) = connection.open_bi().await.map_err(|error| { - error!("Failed to open a bidirectional stream: {error}"); - IggyError::QuicError - })?; - trace!("Sending a QUIC request with code: {code}"); - #[cfg(feature = "vsr")] - trace!( - "Sending a QUIC VSR request of size {} with code: {code}", - request_size - ); - #[cfg(feature = "vsr")] - send.write_all(bytemuck::bytes_of(&request_header)) - .await - .map_err(|error| { - error!("Failed to write VSR request header: {error}"); + let io = async { + let connection = connection.lock().await; + if let Some(connection) = connection.as_ref() { + #[cfg(feature = "vsr")] + let (request_header, request_size) = { + let mut consensus_session = consensus_session + .lock() + .expect("consensus session mutex poisoned"); + crate::vsr::encode_request_header(&mut consensus_session, code, &payload)? + }; + #[cfg(not(feature = "vsr"))] + let payload_length = payload.len() + REQUEST_INITIAL_BYTES_LENGTH; + let (mut send, mut recv) = connection.open_bi().await.map_err(|error| { + error!("Failed to open a bidirectional stream: {error}"); + IggyError::QuicError + })?; + trace!("Sending a QUIC request with code: {code}"); + #[cfg(feature = "vsr")] + trace!( + "Sending a QUIC VSR request of size {} with code: {code}", + request_size + ); + #[cfg(feature = "vsr")] + send.write_all(bytemuck::bytes_of(&request_header)) + .await + .map_err(|error| { + error!("Failed to write VSR request header: {error}"); + IggyError::QuicError + })?; + #[cfg(feature = "vsr")] + if !payload.is_empty() { + send.write_all(&payload).await.map_err(|error| { + error!("Failed to write VSR request payload: {error}"); + IggyError::QuicError + })?; + } + #[cfg(feature = "vsr")] + send.finish().map_err(|error| { + error!("Failed to finish VSR request stream: {error}"); IggyError::QuicError })?; - #[cfg(feature = "vsr")] - if !payload.is_empty() { + #[cfg(not(feature = "vsr"))] + send.write_all(&(payload_length as u32).to_le_bytes()) + .await + .map_err(|error| { + error!("Failed to write payload length: {error}"); + IggyError::QuicError + })?; + #[cfg(not(feature = "vsr"))] + send.write_all(&code.to_le_bytes()).await.map_err(|error| { + error!("Failed to write payload code: {error}"); + IggyError::QuicError + })?; + #[cfg(not(feature = "vsr"))] send.write_all(&payload).await.map_err(|error| { - error!("Failed to write VSR request payload: {error}"); + error!("Failed to write payload: {error}"); IggyError::QuicError })?; - } - #[cfg(feature = "vsr")] - send.finish().map_err(|error| { - error!("Failed to finish VSR request stream: {error}"); - IggyError::QuicError - })?; - #[cfg(not(feature = "vsr"))] - send.write_all(&(payload_length as u32).to_le_bytes()) - .await - .map_err(|error| { - error!("Failed to write payload length: {error}"); + #[cfg(not(feature = "vsr"))] + send.finish().map_err(|error| { + error!("Failed to finish sending data: {error}"); IggyError::QuicError })?; - #[cfg(not(feature = "vsr"))] - send.write_all(&code.to_le_bytes()).await.map_err(|error| { - error!("Failed to write payload code: {error}"); - IggyError::QuicError - })?; - #[cfg(not(feature = "vsr"))] - send.write_all(&payload).await.map_err(|error| { - error!("Failed to write payload: {error}"); - IggyError::QuicError - })?; - #[cfg(not(feature = "vsr"))] - send.finish().map_err(|error| { - error!("Failed to finish sending data: {error}"); - IggyError::QuicError - })?; - trace!("Sent a QUIC request with code: {code}, waiting for a response..."); - return QuicClient::handle_response(&mut recv, response_buffer_size as usize).await; - } + trace!("Sent a QUIC request with code: {code}, waiting for a response..."); + return QuicClient::handle_response(&mut recv, response_buffer_size as usize) + .await; + } - error!("Cannot send data. Client is not connected."); - Err(IggyError::NotConnected) + error!("Cannot send data. Client is not connected."); + Err(IggyError::NotConnected) + }; + + tokio::time::timeout(request_timeout.get_duration(), io) + .await + .map_err(|_| IggyError::RequestTimeout(request_timeout))? }) .await .map_err(|e| { @@ -968,6 +976,10 @@ mod tests { quic_client_config.heartbeat_interval, IggyDuration::from_str("5s").unwrap() ); + assert_eq!( + quic_client_config.request_timeout, + IggyDuration::from_str("300s").unwrap() + ); assert!(quic_client_config.reconnection.enabled); assert!(quic_client_config.reconnection.max_retries.is_none()); diff --git a/core/sdk/src/tcp/tcp_client.rs b/core/sdk/src/tcp/tcp_client.rs index 8868b622ab..423e4096fe 100644 --- a/core/sdk/src/tcp/tcp_client.rs +++ b/core/sdk/src/tcp/tcp_client.rs @@ -105,7 +105,6 @@ impl Client for TcpClient { } } -#[async_trait] #[async_trait] impl BinaryTransport for TcpClient { async fn get_state(&self) -> ClientState { @@ -663,111 +662,118 @@ impl TcpClient { } let stream = self.stream.clone(); + let request_timeout = self.config.request_timeout; #[cfg(feature = "vsr")] let consensus_session = self.consensus_session.clone(); // SAFETY: we run code holding the `stream` lock in a task so we can't be cancelled while holding the lock. tokio::spawn(async move { - let mut stream = stream.lock().await; - if let Some(stream) = stream.as_mut() { - #[cfg(feature = "vsr")] - let (request_header, request_size) = { - let mut consensus_session = consensus_session - .lock() - .expect("consensus session mutex poisoned"); - crate::vsr::encode_request_header(&mut consensus_session, code, &payload)? - }; - #[cfg(not(feature = "vsr"))] - let payload_length = payload.len() + REQUEST_INITIAL_BYTES_LENGTH; - #[cfg(feature = "vsr")] - trace!( - "Sending a TCP VSR request of size {} with code: {code}", - request_size - ); - #[cfg(not(feature = "vsr"))] - trace!("Sending a TCP request of size {payload_length} with code: {code}"); - #[cfg(feature = "vsr")] - stream.write(bytemuck::bytes_of(&request_header)).await?; - #[cfg(feature = "vsr")] - if !payload.is_empty() { + let io = async { + let mut stream = stream.lock().await; + if let Some(stream) = stream.as_mut() { + #[cfg(feature = "vsr")] + let (request_header, request_size) = { + let mut consensus_session = consensus_session + .lock() + .expect("consensus session mutex poisoned"); + crate::vsr::encode_request_header(&mut consensus_session, code, &payload)? + }; + #[cfg(not(feature = "vsr"))] + let payload_length = payload.len() + REQUEST_INITIAL_BYTES_LENGTH; + #[cfg(feature = "vsr")] + trace!( + "Sending a TCP VSR request of size {} with code: {code}", + request_size + ); + #[cfg(not(feature = "vsr"))] + trace!("Sending a TCP request of size {payload_length} with code: {code}"); + #[cfg(feature = "vsr")] + stream.write(bytemuck::bytes_of(&request_header)).await?; + #[cfg(feature = "vsr")] + if !payload.is_empty() { + stream.write(&payload).await?; + } + #[cfg(not(feature = "vsr"))] + stream.write(&(payload_length as u32).to_le_bytes()).await?; + #[cfg(not(feature = "vsr"))] + stream.write(&code.to_le_bytes()).await?; + #[cfg(not(feature = "vsr"))] stream.write(&payload).await?; - } - #[cfg(not(feature = "vsr"))] - stream.write(&(payload_length as u32).to_le_bytes()).await?; - #[cfg(not(feature = "vsr"))] - stream.write(&code.to_le_bytes()).await?; - #[cfg(not(feature = "vsr"))] - stream.write(&payload).await?; - stream.flush().await?; - trace!("Sent a TCP request with code: {code}, waiting for a response..."); - #[cfg(feature = "vsr")] - { - let mut response_header = [0u8; iggy_binary_protocol::HEADER_SIZE]; - // `stream.read` delegates to `read_exact`; on success it - // always returns the requested length, so no short-read - // guard is needed here. - stream.read(&mut response_header).await.map_err(|error| { - error!( - "Failed to read VSR response header for TCP request with code: {code}: {error}", - code = code, - error = error - ); - IggyError::Disconnected - })?; + stream.flush().await?; + trace!("Sent a TCP request with code: {code}, waiting for a response..."); + #[cfg(feature = "vsr")] + { + let mut response_header = [0u8; iggy_binary_protocol::HEADER_SIZE]; + // `stream.read` delegates to `read_exact`; on success it + // always returns the requested length, so no short-read + // guard is needed here. + stream.read(&mut response_header).await.map_err(|error| { + error!( + "Failed to read VSR response header for TCP request with code: {code}: {error}", + code = code, + error = error + ); + IggyError::Disconnected + })?; - let response_size = crate::vsr::response_size(&response_header)?; + let response_size = crate::vsr::response_size(&response_header)?; - let body_size = response_size - iggy_binary_protocol::HEADER_SIZE; - let body = if body_size > 0 { - let mut body = BytesMut::with_capacity(body_size); - stream.read_buf(&mut body, body_size).await.map_err(|error| { + let body_size = response_size - iggy_binary_protocol::HEADER_SIZE; + let body = if body_size > 0 { + let mut body = BytesMut::with_capacity(body_size); + stream.read_buf(&mut body, body_size).await.map_err(|error| { + error!( + "Failed to read VSR response body for TCP request with code: {code}: {error}", + code = code, + error = error + ); + IggyError::Disconnected + })?; + body.freeze() + } else { + Bytes::new() + }; + + return crate::vsr::decode_response_split(&response_header, body); + } + + #[cfg(not(feature = "vsr"))] + { + let mut response_buffer = [0u8; RESPONSE_INITIAL_BYTES_LENGTH]; + let read_bytes = stream.read(&mut response_buffer).await.map_err(|error| { error!( - "Failed to read VSR response body for TCP request with code: {code}: {error}", + "Failed to read response for TCP request with code: {code}: {error}", code = code, error = error ); IggyError::Disconnected })?; - body.freeze() - } else { - Bytes::new() - }; - return crate::vsr::decode_response_split(&response_header, body); - } + if read_bytes != RESPONSE_INITIAL_BYTES_LENGTH { + error!("Received an invalid or empty response."); + return Err(IggyError::EmptyResponse); + } - #[cfg(not(feature = "vsr"))] - { - let mut response_buffer = [0u8; RESPONSE_INITIAL_BYTES_LENGTH]; - let read_bytes = stream.read(&mut response_buffer).await.map_err(|error| { - error!( - "Failed to read response for TCP request with code: {code}: {error}", - code = code, - error = error + let status = u32::from_le_bytes( + response_buffer[..4] + .try_into() + .map_err(|_| IggyError::InvalidNumberEncoding)?, ); - IggyError::Disconnected - })?; - - if read_bytes != RESPONSE_INITIAL_BYTES_LENGTH { - error!("Received an invalid or empty response."); - return Err(IggyError::EmptyResponse); + let length = u32::from_le_bytes( + response_buffer[4..] + .try_into() + .map_err(|_| IggyError::InvalidNumberEncoding)?, + ); + return TcpClient::handle_response(status, length, stream).await; } - - let status = u32::from_le_bytes( - response_buffer[..4] - .try_into() - .map_err(|_| IggyError::InvalidNumberEncoding)?, - ); - let length = u32::from_le_bytes( - response_buffer[4..] - .try_into() - .map_err(|_| IggyError::InvalidNumberEncoding)?, - ); - return TcpClient::handle_response(status, length, stream).await; } - } - error!("Cannot send data. Client is not connected."); - Err(IggyError::NotConnected) + error!("Cannot send data. Client is not connected."); + Err(IggyError::NotConnected) + }; + + tokio::time::timeout(request_timeout.get_duration(), io) + .await + .map_err(|_| IggyError::RequestTimeout(request_timeout))? }) .await .map_err(|e| { @@ -978,6 +984,10 @@ mod tests { tcp_client_config.heartbeat_interval, IggyDuration::from_str("5s").unwrap() ); + assert_eq!( + tcp_client_config.request_timeout, + IggyDuration::from_str("300s").unwrap() + ); assert!(tcp_client_config.reconnection.enabled); assert!(tcp_client_config.reconnection.max_retries.is_none()); @@ -1125,4 +1135,32 @@ mod tests { let tcp_client = TcpClient::handle_response(0, 50, &mut stream).await; assert!(tcp_client.is_err()); } + + #[cfg(not(feature = "vsr"))] + #[tokio::test] + async fn should_return_request_timeout_when_server_does_not_respond() { + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + + tokio::spawn(async move { + let (_stream, _) = listener.accept().await.unwrap(); + std::future::pending::<()>().await; + }); + + let config = TcpClientConfig { + server_address: addr.to_string(), + request_timeout: IggyDuration::from_str("100ms").unwrap(), + ..Default::default() + }; + let client = TcpClient::create(Arc::new(config)).unwrap(); + + let tcp_stream = TcpStream::connect(addr).await.unwrap(); + let client_addr = tcp_stream.local_addr().unwrap(); + let stream = ConnectionStreamKind::Tcp(TcpConnectionStream::new(client_addr, tcp_stream)); + *client.stream.lock().await = Some(stream); + *client.state.lock().await = ClientState::Connected; + + let result = client.send_raw(1, Bytes::new()).await; + assert!(matches!(result, Err(IggyError::RequestTimeout(_)))); + } } diff --git a/core/sdk/src/websocket/websocket_client.rs b/core/sdk/src/websocket/websocket_client.rs index 5e5e35e2ad..73502bc137 100644 --- a/core/sdk/src/websocket/websocket_client.rs +++ b/core/sdk/src/websocket/websocket_client.rs @@ -651,129 +651,136 @@ impl WebSocketClient { ClientState::Connected | ClientState::Authenticating | ClientState::Authenticated => {} } - let mut stream_guard = self.stream.lock().await; - let stream = stream_guard.as_mut().ok_or_else(|| { - trace!("Cannot send data. Client is not connected."); - IggyError::NotConnected - })?; - - #[cfg(feature = "vsr")] - let request = { - let mut consensus_session = self - .consensus_session - .lock() - .expect("consensus session mutex poisoned"); - crate::vsr::encode_contiguous_request(&mut consensus_session, code, &payload)? - }; - #[cfg(not(feature = "vsr"))] - let payload_length = payload.len() + REQUEST_INITIAL_BYTES_LENGTH; - #[cfg(not(feature = "vsr"))] - let mut request = BytesMut::with_capacity(4 + REQUEST_INITIAL_BYTES_LENGTH + payload.len()); - #[cfg(not(feature = "vsr"))] - request.put_u32_le(payload_length as u32); - #[cfg(not(feature = "vsr"))] - request.put_u32_le(code); - #[cfg(not(feature = "vsr"))] - request.put_slice(&payload); - - trace!( - "Sending {NAME} message with code: {}, payload size: {} bytes", - code, - payload.len() - ); - #[cfg(feature = "vsr")] - trace!( - "Sending {NAME} VSR request of size {} with code: {code}", - request.len() - ); - - stream.write(&request).await?; - stream.flush().await?; + let io = async { + let mut stream_guard = self.stream.lock().await; + let stream = stream_guard.as_mut().ok_or_else(|| { + trace!("Cannot send data. Client is not connected."); + IggyError::NotConnected + })?; - #[cfg(feature = "vsr")] - { - // Mirror the TCP path: header onto stack, body into its own - // buffer, `decode_response_split` slices without concatenation. - // Old path did `vec![0; HEADER_SIZE]` + `vec![0; body_size]` - // (two zero-fills) + `BytesMut::with_capacity(response_size)` + - // two `put_slice` memcopies per reply. - let mut response_header = [0u8; iggy_binary_protocol::HEADER_SIZE]; - stream.read(&mut response_header).await?; - - let response_size = crate::vsr::response_size(&response_header)?; - let body_size = response_size - iggy_binary_protocol::HEADER_SIZE; - let body = if body_size > 0 { - // `WebSocketStreamKind::read` reads into a slice without a - // zero-fill prerequisite; we still allocate `body_size` but - // skip the header concatenation. - let mut body = vec![0u8; body_size]; - stream.read(&mut body).await?; - Bytes::from(body) - } else { - Bytes::new() + #[cfg(feature = "vsr")] + let request = { + let mut consensus_session = self + .consensus_session + .lock() + .expect("consensus session mutex poisoned"); + crate::vsr::encode_contiguous_request(&mut consensus_session, code, &payload)? }; - - crate::vsr::decode_response_split(&response_header, body) - } - - #[cfg(not(feature = "vsr"))] - { - let mut response_initial_buffer = vec![0u8; RESPONSE_INITIAL_BYTES_LENGTH]; - stream.read(&mut response_initial_buffer).await?; - - let status = u32::from_le_bytes([ - response_initial_buffer[0], - response_initial_buffer[1], - response_initial_buffer[2], - response_initial_buffer[3], - ]); - - let length = u32::from_le_bytes([ - response_initial_buffer[4], - response_initial_buffer[5], - response_initial_buffer[6], - response_initial_buffer[7], - ]) as usize; + #[cfg(not(feature = "vsr"))] + let payload_length = payload.len() + REQUEST_INITIAL_BYTES_LENGTH; + #[cfg(not(feature = "vsr"))] + let mut request = + BytesMut::with_capacity(4 + REQUEST_INITIAL_BYTES_LENGTH + payload.len()); + #[cfg(not(feature = "vsr"))] + request.put_u32_le(payload_length as u32); + #[cfg(not(feature = "vsr"))] + request.put_u32_le(code); + #[cfg(not(feature = "vsr"))] + request.put_slice(&payload); trace!( - "Received {NAME} response status: {}, length: {} bytes", - status, length + "Sending {NAME} message with code: {}, payload size: {} bytes", + code, + payload.len() + ); + #[cfg(feature = "vsr")] + trace!( + "Sending {NAME} VSR request of size {} with code: {code}", + request.len() ); - if status != 0 { - // TEMP: See https://github.com/apache/iggy/pull/604 for context. - if status == IggyErrorDiscriminants::TopicNameAlreadyExists as u32 - || status == IggyErrorDiscriminants::StreamNameAlreadyExists as u32 - || status == IggyErrorDiscriminants::UserAlreadyExists as u32 - || status == IggyErrorDiscriminants::PersonalAccessTokenAlreadyExists as u32 - || status == IggyErrorDiscriminants::ConsumerGroupNameAlreadyExists as u32 - { - debug!( - "Received a server resource already exists response: {} ({})", - status, - IggyError::from_code_as_string(status) - ) + stream.write(&request).await?; + stream.flush().await?; + + #[cfg(feature = "vsr")] + { + // Mirror the TCP path: header onto stack, body into its own + // buffer, `decode_response_split` slices without concatenation. + // Old path did `vec![0; HEADER_SIZE]` + `vec![0; body_size]` + // (two zero-fills) + `BytesMut::with_capacity(response_size)` + + // two `put_slice` memcopies per reply. + let mut response_header = [0u8; iggy_binary_protocol::HEADER_SIZE]; + stream.read(&mut response_header).await?; + + let response_size = crate::vsr::response_size(&response_header)?; + let body_size = response_size - iggy_binary_protocol::HEADER_SIZE; + let body = if body_size > 0 { + // `WebSocketStreamKind::read` reads into a slice without a + // zero-fill prerequisite; we still allocate `body_size` but + // skip the header concatenation. + let mut body = vec![0u8; body_size]; + stream.read(&mut body).await?; + Bytes::from(body) } else { - error!( - "Received an invalid response with status: {} ({}).", - status, - IggyError::from_code_as_string(status), - ); - } + Bytes::new() + }; - return Err(IggyError::from_code(status)); + crate::vsr::decode_response_split(&response_header, body) } - if length == 0 { - return Ok(Bytes::new()); - } + #[cfg(not(feature = "vsr"))] + { + let mut response_initial_buffer = vec![0u8; RESPONSE_INITIAL_BYTES_LENGTH]; + stream.read(&mut response_initial_buffer).await?; + + let status = u32::from_le_bytes([ + response_initial_buffer[0], + response_initial_buffer[1], + response_initial_buffer[2], + response_initial_buffer[3], + ]); + + let length = u32::from_le_bytes([ + response_initial_buffer[4], + response_initial_buffer[5], + response_initial_buffer[6], + response_initial_buffer[7], + ]) as usize; + + trace!( + "Received {NAME} response status: {}, length: {} bytes", + status, length + ); - let mut response_buffer = vec![0u8; length]; - stream.read(&mut response_buffer).await?; + if status != 0 { + // TEMP: See https://github.com/apache/iggy/pull/604 for context. + if status == IggyErrorDiscriminants::TopicNameAlreadyExists as u32 + || status == IggyErrorDiscriminants::StreamNameAlreadyExists as u32 + || status == IggyErrorDiscriminants::UserAlreadyExists as u32 + || status == IggyErrorDiscriminants::PersonalAccessTokenAlreadyExists as u32 + || status == IggyErrorDiscriminants::ConsumerGroupNameAlreadyExists as u32 + { + debug!( + "Received a server resource already exists response: {} ({})", + status, + IggyError::from_code_as_string(status) + ) + } else { + error!( + "Received an invalid response with status: {} ({}).", + status, + IggyError::from_code_as_string(status), + ); + } - trace!("Received {NAME} response payload, size: {} bytes", length); - Ok(Bytes::from(response_buffer)) - } + return Err(IggyError::from_code(status)); + } + + if length == 0 { + return Ok(Bytes::new()); + } + + let mut response_buffer = vec![0u8; length]; + stream.read(&mut response_buffer).await?; + + trace!("Received {NAME} response payload, size: {} bytes", length); + Ok(Bytes::from(response_buffer)) + } + }; + + tokio::time::timeout(self.config.request_timeout.get_duration(), io) + .await + .map_err(|_| IggyError::RequestTimeout(self.config.request_timeout))? } } @@ -795,6 +802,10 @@ mod tests { client.config.heartbeat_interval, IggyDuration::from_str("5s").unwrap() ); + assert_eq!( + client.config.request_timeout, + IggyDuration::from_str("300s").unwrap() + ); assert!(matches!(client.config.auto_login, AutoLogin::Disabled)); assert!(client.config.reconnection.enabled); } diff --git a/examples/rust/src/shared/args.rs b/examples/rust/src/shared/args.rs index ef43f5fb40..4b61a48215 100644 --- a/examples/rust/src/shared/args.rs +++ b/examples/rust/src/shared/args.rs @@ -190,6 +190,9 @@ pub struct Args { #[arg(long, default_value = "false")] pub websocket_tls_validate_certificate: bool, + + #[arg(long, default_value = "300s")] + pub request_timeout: String, } impl Args { @@ -262,6 +265,7 @@ impl Default for Args { websocket_tls_domain: "localhost".to_string(), websocket_tls_ca_file: None, websocket_tls_validate_certificate: false, + request_timeout: "300s".to_string(), } } } @@ -373,6 +377,7 @@ impl Args { websocket_tls_domain: self.websocket_tls_domain.clone(), websocket_tls_ca_file: self.websocket_tls_ca_file.clone(), websocket_tls_validate_certificate: self.websocket_tls_validate_certificate, + request_timeout: self.request_timeout.clone(), } } From 005b84e25fe99bcbd3ab059c4c9f21b0bb69bbbb Mon Sep 17 00:00:00 2001 From: chengxi Date: Sun, 14 Jun 2026 22:59:30 -0400 Subject: [PATCH 2/2] chore(sdk): reduce request timeout from 300s to 30s --- core/common/src/types/args/mod.rs | 4 ++-- .../src/types/configuration/quic_config/quic_client_config.rs | 4 ++-- .../src/types/configuration/tcp_config/tcp_client_config.rs | 4 ++-- .../configuration/websocket_config/websocket_client_config.rs | 4 ++-- core/integration/tests/cli/general/test_help_command.rs | 2 +- core/sdk/src/quic/quic_client.rs | 2 +- core/sdk/src/tcp/tcp_client.rs | 2 +- core/sdk/src/websocket/websocket_client.rs | 2 +- examples/rust/src/shared/args.rs | 4 ++-- 9 files changed, 14 insertions(+), 14 deletions(-) diff --git a/core/common/src/types/args/mod.rs b/core/common/src/types/args/mod.rs index f590da907c..d9ea93167b 100644 --- a/core/common/src/types/args/mod.rs +++ b/core/common/src/types/args/mod.rs @@ -221,7 +221,7 @@ pub struct ArgsOptional { /// The optional per-request timeout for send/receive operations /// - /// [default: "300s"] + /// [default: "30s"] #[arg(long)] #[serde(skip_serializing_if = "Option::is_none")] pub request_timeout: Option, @@ -426,7 +426,7 @@ impl Default for Args { websocket_tls_domain: "localhost".to_string(), websocket_tls_ca_file: None, websocket_tls_validate_certificate: false, - request_timeout: "300s".to_string(), + request_timeout: "30s".to_string(), } } } diff --git a/core/common/src/types/configuration/quic_config/quic_client_config.rs b/core/common/src/types/configuration/quic_config/quic_client_config.rs index f186281770..7350f6c04b 100644 --- a/core/common/src/types/configuration/quic_config/quic_client_config.rs +++ b/core/common/src/types/configuration/quic_config/quic_client_config.rs @@ -66,7 +66,7 @@ impl Default for QuicClientConfig { server_name: "localhost".to_string(), auto_login: AutoLogin::Disabled, heartbeat_interval: IggyDuration::from_str("5s").unwrap(), - request_timeout: IggyDuration::from_str("300s").unwrap(), + request_timeout: IggyDuration::from_str("30s").unwrap(), reconnection: QuicClientReconnectionConfig::default(), response_buffer_size: 1000 * 1000 * 10, max_concurrent_bidi_streams: 10000, @@ -99,7 +99,7 @@ impl From> for QuicClientConfig { max_idle_timeout: connection_string.options().max_idle_timeout(), validate_certificate: connection_string.options().validate_certificate(), heartbeat_interval: connection_string.options().heartbeat_interval(), - request_timeout: IggyDuration::from_str("300s").unwrap(), + request_timeout: IggyDuration::from_str("30s").unwrap(), } } } diff --git a/core/common/src/types/configuration/tcp_config/tcp_client_config.rs b/core/common/src/types/configuration/tcp_config/tcp_client_config.rs index dd76ac85dc..578b70be13 100644 --- a/core/common/src/types/configuration/tcp_config/tcp_client_config.rs +++ b/core/common/src/types/configuration/tcp_config/tcp_client_config.rs @@ -56,7 +56,7 @@ impl Default for TcpClientConfig { tls_ca_file: None, tls_validate_certificate: true, heartbeat_interval: IggyDuration::from_str("5s").unwrap(), - request_timeout: IggyDuration::from_str("300s").unwrap(), + request_timeout: IggyDuration::from_str("30s").unwrap(), auto_login: AutoLogin::Disabled, reconnection: TcpClientReconnectionConfig::default(), nodelay: false, @@ -76,7 +76,7 @@ impl From> for TcpClientConfig { tls_validate_certificate: true, reconnection: connection_string.options().reconnection().to_owned(), heartbeat_interval: connection_string.options().heartbeat_interval(), - request_timeout: IggyDuration::from_str("300s").unwrap(), + request_timeout: IggyDuration::from_str("30s").unwrap(), nodelay: connection_string.options().nodelay(), } } diff --git a/core/common/src/types/configuration/websocket_config/websocket_client_config.rs b/core/common/src/types/configuration/websocket_config/websocket_client_config.rs index f757eab24a..f50183183f 100644 --- a/core/common/src/types/configuration/websocket_config/websocket_client_config.rs +++ b/core/common/src/types/configuration/websocket_config/websocket_client_config.rs @@ -72,7 +72,7 @@ impl Default for WebSocketClientConfig { auto_login: AutoLogin::Disabled, reconnection: WebSocketClientReconnectionConfig::default(), heartbeat_interval: IggyDuration::from_str("5s").unwrap(), - request_timeout: IggyDuration::from_str("300s").unwrap(), + request_timeout: IggyDuration::from_str("30s").unwrap(), ws_config: WebSocketConfig::default(), tls_enabled: false, tls_domain: "localhost".to_string(), @@ -159,7 +159,7 @@ impl From> for WebSocketClien auto_login: connection_string.auto_login().to_owned(), reconnection: connection_string.options().reconnection().to_owned(), heartbeat_interval: connection_string.options().heartbeat_interval(), - request_timeout: IggyDuration::from_str("300s").unwrap(), + request_timeout: IggyDuration::from_str("30s").unwrap(), ws_config, tls_enabled: options.tls_enabled(), tls_domain: options.tls_domain().into(), diff --git a/core/integration/tests/cli/general/test_help_command.rs b/core/integration/tests/cli/general/test_help_command.rs index 5da76ae228..266bd9da9c 100644 --- a/core/integration/tests/cli/general/test_help_command.rs +++ b/core/integration/tests/cli/general/test_help_command.rs @@ -193,7 +193,7 @@ Options: --request-timeout The optional per-request timeout for send/receive operations {CLAP_INDENT} - [default: "300s"] + [default: "30s"] -q, --quiet Quiet mode (disabled stdout printing) diff --git a/core/sdk/src/quic/quic_client.rs b/core/sdk/src/quic/quic_client.rs index d9017039dc..8f26fd29c4 100644 --- a/core/sdk/src/quic/quic_client.rs +++ b/core/sdk/src/quic/quic_client.rs @@ -978,7 +978,7 @@ mod tests { ); assert_eq!( quic_client_config.request_timeout, - IggyDuration::from_str("300s").unwrap() + IggyDuration::from_str("30s").unwrap() ); assert!(quic_client_config.reconnection.enabled); diff --git a/core/sdk/src/tcp/tcp_client.rs b/core/sdk/src/tcp/tcp_client.rs index 423e4096fe..cac6ed03c6 100644 --- a/core/sdk/src/tcp/tcp_client.rs +++ b/core/sdk/src/tcp/tcp_client.rs @@ -986,7 +986,7 @@ mod tests { ); assert_eq!( tcp_client_config.request_timeout, - IggyDuration::from_str("300s").unwrap() + IggyDuration::from_str("30s").unwrap() ); assert!(tcp_client_config.reconnection.enabled); diff --git a/core/sdk/src/websocket/websocket_client.rs b/core/sdk/src/websocket/websocket_client.rs index 73502bc137..22ffd4ecc1 100644 --- a/core/sdk/src/websocket/websocket_client.rs +++ b/core/sdk/src/websocket/websocket_client.rs @@ -804,7 +804,7 @@ mod tests { ); assert_eq!( client.config.request_timeout, - IggyDuration::from_str("300s").unwrap() + IggyDuration::from_str("30s").unwrap() ); assert!(matches!(client.config.auto_login, AutoLogin::Disabled)); assert!(client.config.reconnection.enabled); diff --git a/examples/rust/src/shared/args.rs b/examples/rust/src/shared/args.rs index 4b61a48215..c95451fdba 100644 --- a/examples/rust/src/shared/args.rs +++ b/examples/rust/src/shared/args.rs @@ -191,7 +191,7 @@ pub struct Args { #[arg(long, default_value = "false")] pub websocket_tls_validate_certificate: bool, - #[arg(long, default_value = "300s")] + #[arg(long, default_value = "30s")] pub request_timeout: String, } @@ -265,7 +265,7 @@ impl Default for Args { websocket_tls_domain: "localhost".to_string(), websocket_tls_ca_file: None, websocket_tls_validate_certificate: false, - request_timeout: "300s".to_string(), + request_timeout: "30s".to_string(), } } }