Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions core/common/src/error/iggy_error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
// under the License.

use crate::Identifier;
use crate::IggyDuration;
use crate::utils::topic_size::MaxTopicSize;
use crate::{IggyMessage, utils::byte_size::IggyByteSize};
use std::sync::Arc;
Expand Down Expand Up @@ -502,6 +503,8 @@ pub enum IggyError {
CannotBindToSocket(String) = 12000,
#[error("Task execution timeout")]
TaskTimeout = 12001,
#[error("Request timeout after {0}")]
RequestTimeout(IggyDuration) = 12002,

#[error("IO error: {0}")]
IoError(String) = 13000,
Expand Down
14 changes: 14 additions & 0 deletions core/common/src/types/args/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,13 @@ pub struct ArgsOptional {
#[arg(long)]
#[serde(skip_serializing_if = "Option::is_none")]
pub websocket_reconnection_interval: Option<String>,

/// The optional per-request timeout for send/receive operations
///
/// [default: "300s"]
#[arg(long)]
#[serde(skip_serializing_if = "Option::is_none")]
pub request_timeout: Option<String>,
}

/// The arguments used by the `ClientProviderConfig` to create a client.
Expand Down Expand Up @@ -351,6 +358,9 @@ pub struct Args {

/// The optional TLS validate certificate for the WebSocket transport
pub websocket_tls_validate_certificate: bool,

/// The per-request timeout for send/receive operations
pub request_timeout: String,
}

const QUIC_TRANSPORT: &str = "quic";
Expand Down Expand Up @@ -416,6 +426,7 @@ impl Default for Args {
websocket_tls_domain: "localhost".to_string(),
websocket_tls_ca_file: None,
websocket_tls_validate_certificate: false,
request_timeout: "300s".to_string(),
}
}
}
Expand Down Expand Up @@ -517,6 +528,9 @@ impl From<Vec<ArgsOptional>> for Args {
{
args.websocket_reconnection_interval = websocket_reconnection_interval;
}
if let Some(request_timeout) = optional_args.request_timeout {
args.request_timeout = request_timeout;
}
}

args
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ pub struct QuicClientConfig {
pub validate_certificate: bool,
/// Interval of heartbeats sent by the client
pub heartbeat_interval: IggyDuration,
/// Per-request timeout for send/receive operations.
pub request_timeout: IggyDuration,
}

impl Default for QuicClientConfig {
Expand All @@ -64,6 +66,7 @@ impl Default for QuicClientConfig {
server_name: "localhost".to_string(),
auto_login: AutoLogin::Disabled,
heartbeat_interval: IggyDuration::from_str("5s").unwrap(),
request_timeout: IggyDuration::from_str("300s").unwrap(),

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

300s is IMHO overkill. make it 10s or so.

reconnection: QuicClientReconnectionConfig::default(),
response_buffer_size: 1000 * 1000 * 10,
max_concurrent_bidi_streams: 10000,
Expand Down Expand Up @@ -96,6 +99,7 @@ impl From<ConnectionString<QuicConnectionStringOptions>> for QuicClientConfig {
max_idle_timeout: connection_string.options().max_idle_timeout(),
validate_certificate: connection_string.options().validate_certificate(),
heartbeat_interval: connection_string.options().heartbeat_interval(),
request_timeout: IggyDuration::from_str("300s").unwrap(),
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,12 @@ impl QuicClientConfigBuilder {
self
}

/// Sets the per-request timeout for send/receive operations.
pub fn with_request_timeout(mut self, request_timeout: IggyDuration) -> Self {
self.config.request_timeout = request_timeout;
self
}

/// Finalizes the builder and returns the `QuicClientConfig`.
pub fn build(mut self) -> Result<QuicClientConfig, IggyError> {
self.config.server_address = self.config.server_address.trim().to_owned();
Expand All @@ -164,6 +170,7 @@ impl QuicClientConfigBuilder {
#[cfg(test)]
mod tests {
use super::*;
use std::str::FromStr;

#[test]
fn build_should_trim_and_validate_server_address() {
Expand Down Expand Up @@ -193,4 +200,16 @@ mod tests {

assert!(result.is_err());
}

#[test]
fn with_request_timeout_should_override_default() {
let config = QuicClientConfigBuilder::default()
.with_request_timeout(IggyDuration::from_str("60s").unwrap())
.build()
.unwrap();
assert_eq!(
config.request_timeout,
IggyDuration::from_str("60s").unwrap()
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ pub struct TcpClientConfig {
pub reconnection: TcpClientReconnectionConfig,
/// Interval of heartbeats sent by the client
pub heartbeat_interval: IggyDuration,
/// Per-request timeout for send/receive operations.
pub request_timeout: IggyDuration,

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

adding a pub field here is a breaking change for anyone constructing this struct with a literal without ..Default::default(), since it isn't #[non_exhaustive]. fine for now while the crate is pre-1.0, but if you plan to keep adding config fields it's worth marking the three client config structs #[non_exhaustive] once and steering people to the builders.

/// Disable Nagle algorithm for the TCP socket.
pub nodelay: bool,
}
Expand All @@ -54,6 +56,7 @@ impl Default for TcpClientConfig {
tls_ca_file: None,
tls_validate_certificate: true,
heartbeat_interval: IggyDuration::from_str("5s").unwrap(),
request_timeout: IggyDuration::from_str("300s").unwrap(),

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

heads up this is a behavior change - before this, requests had no deadline, now they default to 300s. anything that legitimately takes longer than 300s on a connection now returns RequestTimeout (and trips the desync above). worth a changelog / release-note callout so users who relied on unbounded requests aren't surprised.

auto_login: AutoLogin::Disabled,
reconnection: TcpClientReconnectionConfig::default(),
nodelay: false,
Expand All @@ -73,6 +76,7 @@ impl From<ConnectionString<TcpConnectionStringOptions>> for TcpClientConfig {
tls_validate_certificate: true,
reconnection: connection_string.options().reconnection().to_owned(),
heartbeat_interval: connection_string.options().heartbeat_interval(),
request_timeout: IggyDuration::from_str("300s").unwrap(),

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this hardcodes 300s instead of reading from the connection string like the field right above it (heartbeat_interval uses connection_string.options()). same in the quic and websocket From<ConnectionString> impls. so request_timeout is settable via the builder and cli but silently pinned to 300s for anyone configuring through a connection string. either parse it from the options or document that it's not supported there.

nodelay: connection_string.options().nodelay(),
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,12 @@ impl TcpClientConfigBuilder {
self
}

/// Sets the per-request timeout for send/receive operations.
pub fn with_request_timeout(mut self, request_timeout: IggyDuration) -> Self {
self.config.request_timeout = request_timeout;
self
}

/// Builds the TCP client configuration.
pub fn build(mut self) -> Result<TcpClientConfig, IggyError> {
self.config.server_address = self.config.server_address.trim().to_owned();
Expand All @@ -114,6 +120,7 @@ impl TcpClientConfigBuilder {
mod tests {
use super::*;
use crate::IggyError;
use std::str::FromStr;

fn builder_with_address(addr: &str) -> TcpClientConfigBuilder {
let mut builder = TcpClientConfigBuilder::default();
Expand Down Expand Up @@ -187,4 +194,16 @@ mod tests {
let builder = builder_with_address("iggy-server:8090");
assert!(builder.build().is_ok());
}

#[test]
fn with_request_timeout_should_override_default() {
let config = TcpClientConfigBuilder::default()
.with_request_timeout(IggyDuration::from_str("60s").unwrap())
.build()
.unwrap();
assert_eq!(
config.request_timeout,
IggyDuration::from_str("60s").unwrap()
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ pub struct WebSocketClientConfig {
pub reconnection: WebSocketClientReconnectionConfig,
/// Interval of heartbeats sent by the client
pub heartbeat_interval: IggyDuration,
/// Per-request timeout for send/receive operations.
pub request_timeout: IggyDuration,
/// WebSocket-specific configuration.
pub ws_config: WebSocketConfig,
/// Whether tls is enabled
Expand Down Expand Up @@ -70,6 +72,7 @@ impl Default for WebSocketClientConfig {
auto_login: AutoLogin::Disabled,
reconnection: WebSocketClientReconnectionConfig::default(),
heartbeat_interval: IggyDuration::from_str("5s").unwrap(),
request_timeout: IggyDuration::from_str("300s").unwrap(),
ws_config: WebSocketConfig::default(),
tls_enabled: false,
tls_domain: "localhost".to_string(),
Expand Down Expand Up @@ -156,6 +159,7 @@ impl From<ConnectionString<WebSocketConnectionStringOptions>> for WebSocketClien
auto_login: connection_string.auto_login().to_owned(),
reconnection: connection_string.options().reconnection().to_owned(),
heartbeat_interval: connection_string.options().heartbeat_interval(),
request_timeout: IggyDuration::from_str("300s").unwrap(),
ws_config,
tls_enabled: options.tls_enabled(),
tls_domain: options.tls_domain().into(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,12 @@ impl WebSocketClientConfigBuilder {
self
}

/// Sets the per-request timeout for send/receive operations.
pub fn with_request_timeout(mut self, request_timeout: IggyDuration) -> Self {
self.config.request_timeout = request_timeout;
self
}

/// Builds the WebSocket client configuration.
pub fn build(mut self) -> Result<WebSocketClientConfig, IggyError> {
self.config.server_address = self.config.server_address.trim().to_owned();
Expand All @@ -148,6 +154,7 @@ impl WebSocketClientConfigBuilder {
#[cfg(test)]
mod tests {
use super::*;
use std::str::FromStr;

#[test]
fn build_should_trim_and_validate_server_address() {
Expand Down Expand Up @@ -177,4 +184,16 @@ mod tests {

assert!(result.is_err());
}

#[test]
fn with_request_timeout_should_override_default() {
let config = WebSocketClientConfigBuilder::default()
.with_request_timeout(IggyDuration::from_str("60s").unwrap())
.build()
.unwrap();
assert_eq!(
config.request_timeout,
IggyDuration::from_str("60s").unwrap()
);
}
}
5 changes: 5 additions & 0 deletions core/integration/tests/cli/general/test_help_command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,11 @@ Options:
{CLAP_INDENT}
[default: "1s"]

--request-timeout <REQUEST_TIMEOUT>
The optional per-request timeout for send/receive operations
{CLAP_INDENT}
[default: "300s"]

-q, --quiet
Quiet mode (disabled stdout printing)

Expand Down
3 changes: 3 additions & 0 deletions core/sdk/src/client_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ impl ClientProviderConfig {
keep_alive_interval: args.quic_keep_alive_interval,
max_idle_timeout: args.quic_max_idle_timeout,
validate_certificate: args.quic_validate_certificate,
request_timeout: IggyDuration::from_str(&args.request_timeout).unwrap(),

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

from_str(&args.request_timeout).unwrap() panics on a bad --request-timeout value since it's a free-form string with no validation. also at lines 152 and 186. matches the existing pattern for the other duration args here so it's not new, but it adds another panic point on user input - a clap value_parser on the arg would reject bad values cleanly instead.

}));
}
TransportProtocol::Http => {
Expand Down Expand Up @@ -148,6 +149,7 @@ impl ClientProviderConfig {
)
.unwrap(),
},
request_timeout: IggyDuration::from_str(&args.request_timeout).unwrap(),
auto_login: if auto_login {
AutoLogin::Enabled(Credentials::UsernamePassword(
args.username,
Expand Down Expand Up @@ -181,6 +183,7 @@ impl ClientProviderConfig {
} else {
AutoLogin::Disabled
},
request_timeout: IggyDuration::from_str(&args.request_timeout).unwrap(),
ws_config: WebSocketConfig::default(),
tls_enabled: args.websocket_tls_enabled,
tls_domain: args.websocket_tls_domain,
Expand Down
18 changes: 18 additions & 0 deletions core/sdk/src/clients/client_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,12 @@ impl TcpClientBuilder {
self
}

/// Sets the per-request timeout for send/receive operations.
pub fn with_request_timeout(mut self, request_timeout: IggyDuration) -> Self {
self.config = self.config.with_request_timeout(request_timeout);
self
}

/// Builds the parent `IggyClient` with TCP configuration.
pub fn build(self) -> Result<IggyClient, IggyError> {
let client = TcpClient::create(Arc::new(self.config.build()?))?;
Expand Down Expand Up @@ -277,6 +283,12 @@ impl QuicClientBuilder {
self
}

/// Sets the per-request timeout for send/receive operations.
pub fn with_request_timeout(mut self, request_timeout: IggyDuration) -> Self {
self.config = self.config.with_request_timeout(request_timeout);
self
}

/// Builds the parent `IggyClient` with QUIC configuration.
pub fn build(self) -> Result<IggyClient, IggyError> {
let client = QuicClient::create(Arc::new(self.config.build()?))?;
Expand Down Expand Up @@ -390,6 +402,12 @@ impl WebSocketClientBuilder {
self
}

/// Sets the per-request timeout for send/receive operations.
pub fn with_request_timeout(mut self, request_timeout: IggyDuration) -> Self {
self.config = self.config.with_request_timeout(request_timeout);
self
}

/// Builds the parent `IggyClient` with WebSocket configuration.
pub fn build(self) -> Result<IggyClient, IggyError> {
let client = WebSocketClient::create(Arc::new(self.config.build()?))?;
Expand Down
Loading
Loading